Temporal介绍(持久化、可靠、可恢复的分布式工作流引擎)Event History事件历史、Deterministic确定性、副作用操作、Saga分布式事务、temporalio
文章目录
- Temporal 完全指南:下一代分布式工作流引擎
- 什么是 Temporal?
- 为什么需要 Temporal?
- Temporal 的核心思想
- Temporal 架构
- Temporal 最核心的概念
- 1. Workflow
- 2. Activity
- (解释)
- 3. Worker
- 4. Event History(最核心)
- Temporal 的“魔法”
- (解释)关于“睡眠任务持久化存储”
- Deterministic(确定性)
- Retry(重试)
- Saga(分布式事务)
- Continue-As-New
- Child Workflow
- Signal(超强能力)
- Query
- Temporal 与消息队列的区别
- Temporal 与 Airflow 的区别
- Temporal 与 AI Agent
- Python SDK 示例
- Temporal 的优势
- Temporal 的缺点
- 1. 学习曲线高
- 2. Debug 比传统程序复杂
- 3. History 膨胀问题
- 4. 运维复杂度
- Temporal Cloud
- 适合 Temporal 的场景
- 一个非常经典的理解
- 总结
Temporal 完全指南:下一代分布式工作流引擎
在现代后端系统中,业务流程越来越复杂:
-
用户下单后,需要:
- 扣库存
- 创建支付单
- 调用第三方支付
- 等待支付回调
- 发货
- 发送通知
-
AI Agent 系统中,需要:
- 多步骤推理
- 长时间等待
- 人工审批
- 自动重试
- 状态恢复
问题来了:
如果服务重启了怎么办?
某一步失败怎么办?
长达几天的任务怎么恢复?
如何保证流程不会丢失状态?
这正是 Temporal 解决的问题。
什么是 Temporal?
Temporal Technologies 是一个:
“持久化、可靠、可恢复”的分布式工作流引擎(Workflow Engine)
官方网站:
GitHub:
它最初来自:
Uber 内部的 Cadence 项目。
Temporal 的核心目标是:
把“业务流程状态管理”从应用代码中抽离出来。
为什么需要 Temporal?
传统微服务里:
Service A -> Service B -> Service C
看似简单,但实际上:
- 网络可能失败
- 服务可能重启
- 第三方 API 不稳定
- 消息队列可能重复消费
- 定时任务可能丢失
于是开发者开始写:
retry()
timeout()
dead_letter_queue()
cron()
state_machine()
compensation()
系统逐渐变成:
业务逻辑 20%
异常处理 80%
Temporal 的目标就是:
让开发者只写业务逻辑。
Temporal 的核心思想
Temporal 的核心理念非常重要:
Workflow 是“可恢复的程序”
也就是说:
- 程序执行到一半
- 服务宕机
- 容器销毁
- 几天后恢复
Workflow 仍然能继续执行。
Temporal 架构
整体架构:
核心组件:
| 组件 | 作用 |
|---|---|
| Temporal Server | 工作流协调中心 |
| Worker | 执行业务代码 |
| Workflow | 持久化状态机 |
| Activity | 真正执行副作用 |
| Task Queue | 调度队列 |
| Persistence DB | 持久化事件历史 |
Temporal 最核心的概念
1. Workflow
Workflow:
一个“可持久化”的函数。
例如:
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str):
await workflow.execute_activity(
reserve_inventory,
order_id
)
await workflow.execute_activity(
create_payment,
order_id
)
await workflow.sleep(days=3)
await workflow.execute_activity(
ship_order,
order_id
)
注意:
await workflow.sleep(days=3)
即使:
- Worker 重启
- Pod 被删除
- 服务器宕机
3 天后依然会恢复执行。
这是 Temporal 最震撼的能力之一。
2. Activity
Activity:
真正执行“副作用”的地方。
例如:
- HTTP 请求
- 数据库写入
- 发邮件
- 调用 OpenAI
- 调用 Stripe
因为 Workflow 必须是“确定性的”。
所以:
time.time()
random.random()
requests.post()
这些都不能直接在 Workflow 中执行。
必须放进 Activity。
(解释)
这段内容是关于 Temporal 工作流引擎中 “Activity” 概念 的核心解释,核心逻辑是 “Workflow 必须保持确定性,而外部副作用操作必须放在 Activity 中执行”。以下是逐层拆解:
1️⃣ 核心原则:Workflow 必须是“确定性的”
-
确定性(Determinism):
指 Workflow 在相同输入 + 相同历史事件下,每次执行必须产生完全一致的结果。
这是 Temporal 的核心设计原则,目的是保证:- 工作流可以无限重试(如机器崩溃后恢复)
- 工作流可以精确回放(用于调试、审计)
- 避免因外部不确定性导致状态不一致
-
为什么需要确定性?
举个反例:如果 Workflow 直接调用time.time(),第一次执行时返回1717020000,第二次执行时可能返回1717020001(时间变化了),这就破坏了确定性 → 工作流无法重放/恢复。
2️⃣ “副作用”操作为什么不能直接在 Workflow 中执行?
-
什么是“副作用”?
指与外部系统交互或引入不确定性的操作,例如:- 网络请求(HTTP 请求、调用 OpenAI/Stripe API)
- 数据库写入(可能失败/超时)
- 发送邮件(依赖外部服务)
- 生成随机数(
random.random()) - 获取当前时间(
time.time())
-
为什么不能直接在 Workflow 中执行?
这些操作具有 “非确定性”:time.time():每次执行返回不同时间戳random.random():每次生成不同随机数requests.post():可能因网络波动返回不同结果
→ 直接在 Workflow 中执行会导致 “相同输入但结果不同”,破坏确定性原则。
3️⃣ 解决方案:把副作用操作放进 Activity
-
Activity 的作用:
专门执行“副作用”操作的单元,它负责:- 与外部系统交互(如调用 API、写数据库)
- 处理不确定性(如重试失败的请求)
- 将非确定性操作隔离在 Workflow 之外
-
Workflow 的职责:
仅负责编排逻辑(如“先 A 再 B,B 失败则重试”),不直接操作外部系统。
Workflow 通过 “调度 Activity” 来间接执行副作用操作,例如:# Workflow 代码(确定性逻辑) def my_workflow(): # 1. 调度一个 Activity 执行 HTTP 请求 result = await execute_activity(http_request_activity, url="https://example.com") # 2. 根据结果决定下一步(确定性逻辑) if result.status == 200: await execute_activity(send_email_activity) -
为什么这样设计?
- Activity 执行时,Temporal 会记录其输入/输出,但不记录内部细节(如具体时间戳、随机数)。
- 当 Workflow 需要重放时,Temporal 会直接使用之前记录的 Activity 结果,而非重新执行 → 保证确定性。
🌰 举个实际例子
假设你有一个订单处理 Workflow:
# 错误:直接在 Workflow 中调用外部 API(破坏确定性!)
def process_order():
# 1. 获取当前时间(非确定性!)
current_time = time.time()
# 2. 调用支付 API(可能失败/超时)
payment_result = requests.post("https://stripe.com/charge", ...)
✅ 正确做法:用 Activity 封装副作用
# Activity:专门处理支付(非确定性操作)
@activity.defn
def charge_payment(amount: int):
return requests.post("https://stripe.com/charge", amount=amount)
# Workflow:仅编排逻辑(确定性)
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_id: str):
# 1. 调度 Activity 执行支付
payment_result = await workflow.execute_activity(
charge_payment,
args=[100],
schedule_to_close_timeout=timedelta(seconds=10)
)
# 2. 根据结果决定下一步(确定性逻辑)
if payment_result.success:
await workflow.execute_activity(send_confirmation_email)
💡 总结
| 组件 | 职责 | 是否允许非确定性 | 例子 |
|---|---|---|---|
| Workflow | 编排业务逻辑 | ❌ 必须确定性 | “如果支付成功则发邮件” |
| Activity | 执行外部副作用操作 | ✅ 允许非确定性 | 调用 API、写数据库、发邮件 |
- 关键结论:
所有与外部系统交互或引入不确定性的操作(HTTP 请求、数据库写入、随机数等),必须封装在 Activity 中,而不能直接写在 Workflow 里。
这是 Temporal 保证工作流可重放、可恢复、可审计的核心机制。
这种设计让开发者能专注业务逻辑(Workflow),同时安全地处理外部依赖(Activity),是 Temporal 区别于普通任务队列(如 Celery)的核心优势。
3. Worker
Worker:
负责真正执行 Workflow / Activity 的进程。
可以理解成:
Temporal Server = 大脑
Worker = 四肢
Worker 是无状态的。
即使 Worker 崩溃:
- Temporal 会重新调度
- Workflow 会继续执行
4. Event History(最核心)
Temporal 最重要的设计:
Event Sourcing
Workflow 的每一步都会记录成 Event。
例如:
WorkflowStarted
ActivityScheduled
ActivityCompleted
TimerStarted
TimerFired
WorkflowCompleted
Temporal 可以通过:
Replay Event History
重新恢复 Workflow 状态。
这也是:
“为什么 Workflow 可以恢复”
的根本原因。
Temporal 的“魔法”
很多人第一次接触 Temporal 时会震惊:
await workflow.sleep(days=30)
为什么不会阻塞线程?
原因是:
Temporal 并不是传统 sleep。
而是:
1. 记录 Timer Event
2. 挂起 Workflow
3. 释放 Worker
4. 到时间后重新调度
因此:
- 数百万 Workflow
- 几乎不占资源
(解释)关于“睡眠任务持久化存储”
Temporal 框架中 workflow.sleep() 的特殊设计逻辑,核心是说明它为什么“看起来像阻塞线程,但实际不会阻塞”,从而实现高并发处理。以下是分步解读:
1️⃣ 问题背景
很多人第一次看到 Temporal 的代码:
await workflow.sleep(days=30)
会感到震惊,因为传统编程中的 sleep 会阻塞当前线程(线程会一直占用资源等待,无法处理其他任务)。
但 Temporal 的 workflow.sleep 并不会阻塞线程,这是它的“魔法”所在。
2️⃣ 为什么不会阻塞?
Temporal 的 workflow.sleep 不是传统意义上的“休眠”,而是通过以下 4 步实现“逻辑上的等待”,同时释放资源:
- 记录 Timer Event
→ 把“30 天后唤醒”的任务记录到 Temporal 的持久化存储中(比如数据库)。 - 挂起 Workflow
→ 当前 Workflow 的执行状态被暂停,不再占用计算资源。 - 释放 Worker
→ 执行这个 Workflow 的 Worker(比如服务器进程)被释放,可以去处理其他任务。 - 到时间后重新调度
→ 30 天后,Temporal 检测到 Timer Event 到期,重新调度该 Workflow 继续执行。
3️⃣ 带来的优势
- 支持数百万 Workflow
因为 Worker 不会被“卡在 sleep”上,可以复用资源处理其他任务。 - 几乎不占资源
等待期间,Workflow 仅占用少量存储(记录状态),不消耗 CPU/内存等计算资源。
✅ 通俗理解
想象你让一个员工(Worker)去等一个 30 天后的电话:
- 传统
sleep:员工坐在电话旁干等 30 天,期间不能做其他事(资源浪费)。 - Temporal
workflow.sleep:员工记下“30 天后有电话”,然后去忙其他工作;到期时系统自动提醒他回来处理(资源高效复用)。
💡 关键结论
Temporal 通过 “持久化状态 + 异步调度” 的设计,把“等待”变成一个可恢复的、非阻塞的事件,从而实现超大规模工作流的高效管理。
这正是它能处理“数百万 Workflow”却“几乎不占资源”的核心原因。
Deterministic(确定性)
这是 Temporal 最难理解的部分。
Workflow 必须:
相同 Event History => 相同执行结果
因此 Workflow 里:
❌ 禁止:
random.random()
datetime.now()
uuid.uuid4()
因为 Replay 时结果会不同。
Temporal 提供了:
workflow.now()
workflow.random()
等机制保证一致性。
Retry(重试)
Temporal 内建:
- Retry
- Backoff
- Timeout
- Heartbeat
例如:
retry_policy=RetryPolicy(
maximum_attempts=5
)
无需手写:
while True:
try:
...
Saga(分布式事务)
Temporal 非常适合:
Saga Pattern
例如:
1. 扣库存
2. 扣款
3. 创建订单
如果第 3 步失败:
补偿:
- 退款
- 恢复库存
Temporal 天然适合做:
- 补偿逻辑
- 长事务
- 跨服务事务
Continue-As-New
Workflow 可能无限增长。
例如:
- 聊天机器人
- 长生命周期 Agent
- 永久运行流程
Temporal 提供:
workflow.continue_as_new()
把旧 History 截断。
避免:
- Replay 太慢
- History 爆炸
这是生产环境非常关键的机制。
Child Workflow
Temporal 支持:
Workflow -> Child Workflow
例如:
主订单流程
├─ 支付流程
├─ 发货流程
└─ 通知流程
非常适合:
- 多 Agent
- DAG
- 复杂编排
Signal(超强能力)
Signal:
外部动态通知 Workflow
例如:
Workflow 正在等待支付
外部系统:
支付成功
发送:
workflow.signal("payment_received")
Workflow 立即恢复。
这是:
“长时间等待外部事件”
的核心能力。
Query
Query:
查询 Workflow 当前状态
例如:
workflow.query("current_status")
不影响执行。
非常适合:
- 订单状态查询
- Agent 状态查询
Temporal 与消息队列的区别
很多人会问:
Kafka / RabbitMQ 不行吗?
区别很大。
| 能力 | MQ | Temporal |
|---|---|---|
| 消息传递 | ✅ | ✅ |
| 长时间状态管理 | ❌ | ✅ |
| 自动恢复 | ❌ | ✅ |
| Workflow 编排 | ❌ | ✅ |
| Saga | ❌ | ✅ |
| Retry | 部分 | ✅ |
| Durable Timer | ❌ | ✅ |
Temporal 更像:
“有状态的业务流程引擎”
而不是简单 MQ。
Temporal 与 Airflow 的区别
| 对比 | Airflow | Temporal |
|---|---|---|
| 主要场景 | 数据管道 | 应用工作流 |
| 编排方式 | DAG | 代码 |
| 长时间等待 | 一般 | 极强 |
| 实时业务 | 不适合 | 非常适合 |
| 状态恢复 | 一般 | 极强 |
| 微服务集成 | 一般 | 极强 |
Temporal 与 AI Agent
Temporal 最近在 AI Agent 圈非常火。
因为 Agent 天生需要:
- 长生命周期
- 多步骤推理
- Retry
- Human-in-the-loop
- 状态恢复
- 工具调用
- DAG 编排
Temporal 非常适合:
Planner Agent
↓
Research Agent
↓
Code Agent
↓
Review Agent
很多 AI 公司正在使用:
- Temporal
- LangGraph
- Durable Execution(确保智能体的任务执行即使在系统崩溃、网络中断、服务重启等异常情况下,也能从断点恢复,避免任务丢失或重复执行。)
构建 Agent Runtime。
Python SDK 示例
安装:
pip install temporalio
简单示例:
from temporalio import workflow, activity
@activity.defn
async def hello(name: str):
return f"Hello {name}"
@workflow.defn
class HelloWorkflow:
@workflow.run
async def run(self, name: str):
result = await workflow.execute_activity(
hello,
name,
start_to_close_timeout=timedelta(seconds=10),
)
return result
启动 Worker:
worker = Worker(
client,
task_queue="hello-task-queue",
workflows=[HelloWorkflow],
activities=[hello],
)
Temporal 的优势
1. Durable Execution
最大的核心价值:
程序永远不会“忘记”执行到哪里。
2. 极强的故障恢复
即使:
- 服务器崩溃
- Kubernetes 重建
- Worker 升级
Workflow 仍然继续。
3. 天然支持长时间任务
例如:
等待用户审批 7 天
传统系统很难做。
Temporal 天然支持。
4. 开发体验极好
开发者:
用普通代码写工作流
而不是:
- YAML
- JSON DAG
- 状态机配置
Temporal 的缺点
1. 学习曲线高
尤其:
- Deterministic
- Replay
- Versioning
初学者容易踩坑。
2. Debug 比传统程序复杂
因为:
- Workflow 会 Replay
- 并非普通函数调用
理解成本较高。
3. History 膨胀问题
长生命周期 Workflow:
可能:
数十万 Event
需要:
- Continue-As-New
- History 管理
4. 运维复杂度
自建 Temporal:
涉及:
- Frontend
- History Service
- Matching Service
- Persistence
生产部署并不简单。
Temporal Cloud
官方也提供托管服务:
适合:
- 不想自建集群
- 快速上线
- SaaS 场景
适合 Temporal 的场景
非常适合:
✅ 订单系统
✅ 支付系统
✅ AI Agent
✅ 人工审批
✅ 长事务
✅ 微服务编排
✅ ETL Pipeline
✅ DevOps Pipeline
✅ 异步任务系统
不太适合:
❌ 超简单 CRUD
❌ 极短生命周期任务
❌ 单体同步系统
一个非常经典的理解
可以这样理解:
传统程序:
程序状态在内存
Temporal:
程序状态在数据库
这就是它最革命性的地方。
总结
Temporal 本质上是:
“把程序执行过程持久化”
它解决的是:
分布式系统中的状态可靠性问题
它最核心的价值:
- Durable Execution
- Workflow Orchestration
- Fault Tolerance
- Long-running Process
- Stateful Workflow
如果你正在构建:
- 微服务
- AI Agent
- 长事务系统
- 分布式业务流程
Temporal 几乎是当前最强的基础设施之一。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)