DAG工作流编排引擎:拓扑排序与并行调度的实战实现
摘要:在复杂的AI应用中,任务往往存在依赖关系(如:先分析数据,再生成计划)。如果串行执行,效率极低;如果手动管理并发,逻辑混乱。本文基于一个真实的跑步教练AI项目,详细解析如何从零实现一个DAG(有向无环图)工作流编排引擎。我们将深入源码,结合流程图和调用链,展示如何利用Kahn算法实现拓扑排序、通过分层分组实现真正并行、以及构建变量引用系统。这套方案将复杂任务的执行时间缩短了67%,是处理多步骤依赖任务的通用解决方案。
一、背景:串行执行的瓶颈
在开发AI Running Coach的"深度分析"功能时,我最初采用的是串行执行模式:
async def deep_analysis(user_id: str):
# Step 1: 获取数据 (8秒)
data = await data_agent.run(user_id)
# Step 2: 评估健康 (7秒)
health = await health_agent.run(data)
# Step 3: 检索知识 (6秒)
knowledge = await knowledge_agent.run(data)
# Step 4: 生成计划 (9秒)
plan = await coach_agent.run(data, health, knowledge)
return plan
总耗时:8 + 7 + 6 + 9 = 30秒。
发现的问题
通过分析执行链路,我发现:
- **Step 2(健康)**和 **Step 3(知识)**之间没有依赖关系。
- 它们都只依赖 Step 1(数据)。
- **Step 4(计划)**依赖 Step 2 和 Step 3 的结果。
优化空间:Step 2 和 Step 3 可以并行执行!
优化后耗时:8 + max(7, 6) + 9 = 24秒。节省了20%的时间。
但随着流程变复杂(如增加10个步骤),手动管理asyncio.gather变得极其困难。于是我决定实现一个通用的DAG编排引擎。
二、整体架构设计
2.1 什么是DAG?
DAG(Directed Acyclic Graph,有向无环图)是描述任务依赖关系的最佳数据结构。
关键特性:
- 有向:箭头表示依赖方向。
- 无环:不能出现 A->B->A 的死锁情况。
2.2 核心组件
| 组件 | 职责 | 核心技术 |
|---|---|---|
| DAG Builder | 构建图结构,检测循环依赖 | 邻接表、入度统计 |
| Topological Sorter | 确定执行顺序 | Kahn算法(BFS) |
| Parallel Scheduler | 分层并行调度 | asyncio.gather |
| Variable Resolver | 处理步骤间的数据传递 | {{step_id.output}} 引用解析 |
三、DAG Builder:图的构建
3.1 数据结构定义
文件位置:app/services/dag_builder.py
from typing import Dict, List, Set, Any
from collections import defaultdict, deque
class DAGNode:
def __init__(self, step_id: str, tool_id: str, depends_on: List[str] = None):
self.step_id = step_id
self.tool_id = tool_id
self.depends_on = depends_on or []
self.result = None
class DAG:
def __init__(self):
self.nodes: Dict[str, DAGNode] = {}
self.adjacency_list: Dict[str, List[str]] = defaultdict(list) # 邻接表
self.in_degree: Dict[str, int] = defaultdict(int) # 入度表
def add_node(self, step_id: str, tool_id: str, depends_on: List[str]):
"""添加节点"""
if step_id in self.nodes:
raise ValueError(f"Step ID '{step_id}' already exists")
self.nodes[step_id] = DAGNode(step_id, tool_id, depends_on)
# 初始化入度
if step_id not in self.in_degree:
self.in_degree[step_id] = 0
# 建立边关系
for dep in depends_on:
if dep not in self.nodes:
raise ValueError(f"Dependency '{dep}' not found")
self.adjacency_list[dep].append(step_id)
self.in_degree[step_id] += 1
def validate(self) -> bool:
"""验证是否为有效的DAG(无环)"""
# 使用Kahn算法检测环
temp_in_degree = dict(self.in_degree)
queue = deque([node for node, degree in temp_in_degree.items() if degree == 0])
visited_count = 0
while queue:
node = queue.popleft()
visited_count += 1
for neighbor in self.adjacency_list[node]:
temp_in_degree[neighbor] -= 1
if temp_in_degree[neighbor] == 0:
queue.append(neighbor)
return visited_count == len(self.nodes)
关键设计点:
- 邻接表:
adjacency_list存储从属关系,方便查找下游节点。 - 入度表:
in_degree记录每个节点有多少前置依赖,是拓扑排序的核心。 - 循环检测:在构建阶段就检测环,避免运行时死锁。
3.2 循环依赖检测
案例:
[
{"step_id": "A", "depends_on": ["B"]},
{"step_id": "B", "depends_on": ["A"]} // 循环依赖!
]
检测结果:
- A的入度为1(依赖B)
- B的入度为1(依赖A)
- 队列初始为空(没有入度为0的节点)
visited_count (0) != total_nodes (2)→ 抛出异常
四、Topological Sorter:拓扑排序
4.1 Kahn算法实现
原理:不断移除入度为0的节点,直到图为空。
def topological_sort(self) -> List[List[str]]:
"""
拓扑排序,返回分层后的执行组
同一组内的节点可以并行执行
"""
if not self.validate():
raise ValueError("Graph contains cycles")
temp_in_degree = dict(self.in_degree)
queue = deque([node for node, degree in temp_in_degree.items() if degree == 0])
execution_layers = []
while queue:
# 当前层的所有节点(可以并行)
current_layer = list(queue)
execution_layers.append(current_layer)
next_queue = deque()
for node in current_layer:
for neighbor in self.adjacency_list[node]:
temp_in_degree[neighbor] -= 1
if temp_in_degree[neighbor] == 0:
next_queue.append(neighbor)
queue = next_queue
return execution_layers
4.2 排序过程演示
输入图:
A --> B --> D
A --> C --> D
执行过程:
| 轮次 | 队列状态 | 当前层 | 操作 |
|---|---|---|---|
| 1 | [A] | [A] | A入度为0,执行A。移除A,B和C入度减1 |
| 2 | [B, C] | [B, C] | B和C入度均为0,并行执行。移除B和C,D入度减2 |
| 3 | [D] | [D] | D入度为0,执行D |
输出:[["A"], ["B", "C"], ["D"]]
优势:自动识别出B和C可以并行,无需人工干预。
五、Parallel Scheduler:并行调度引擎
5.1 核心实现
文件位置:app/services/workflow_engine.py
import asyncio
from app.services.dag_builder import DAG
class WorkflowEngine:
def __init__(self):
self.tool_registry = {} # 工具注册表
async def execute_workflow(self, dag: DAG, initial_input: Dict[str, Any]) -> Dict[str, Any]:
"""执行DAG工作流"""
# 1. 拓扑排序,获取分层执行计划
layers = dag.topological_sort()
logger.info(f"执行计划: {layers}")
context = {"input": initial_input}
# 2. 逐层执行
for layer in layers:
logger.info(f"开始执行层: {layer}")
# 3. 并行执行当前层的所有节点
tasks = []
for step_id in layer:
node = dag.nodes[step_id]
# 解析输入变量
node_input = self._resolve_variables(node, context)
# 创建异步任务
task = self._execute_node(node, node_input)
tasks.append((step_id, task))
# 4. 等待当前层所有任务完成
results = await asyncio.gather(*[t for _, t in tasks], return_exceptions=True)
# 5. 收集结果到上下文
for (step_id, _), result in zip(tasks, results):
if isinstance(result, Exception):
logger.error(f"Step {step_id} failed: {result}")
raise result
context[step_id] = result
dag.nodes[step_id].result = result
logger.info(f"Step {step_id} completed")
return context
async def _execute_node(self, node: DAGNode, input_data: Dict) -> Any:
"""执行单个节点(调用对应的Tool/Agent)"""
tool_func = self.tool_registry.get(node.tool_id)
if not tool_func:
raise ValueError(f"Tool '{node.tool_id}' not found")
return await tool_func(input_data)
5.2 性能对比
场景:5个步骤,其中3个可以并行。
| 执行方式 | 耗时计算 | 总耗时 |
|---|---|---|
| 串行执行 | 5+5+5+5+5 | 25秒 |
| DAG并行 | 5 + max(5,5,5) + 5 | 15秒 |
| 提升 | - | 40% |
六、Variable Resolver:变量引用系统
6.1 需求背景
问题:Step 4需要Step 2和Step 3的结果,怎么传?
方案:使用类似Jinja2的模板语法 {{step_id.field}}。
6.2 实现代码
文件位置:app/services/variable_resolver.py
import re
from typing import Dict, Any
class VariableResolver:
@staticmethod
def resolve(template: str, context: Dict[str, Any]) -> Any:
"""
解析变量引用
例如: "{{data.user_name}}" + context -> "Sunwe"
"""
if not isinstance(template, str):
return template
# 匹配 {{...}} 模式
pattern = r"\{\{(.+?)\}\}"
matches = re.findall(pattern, template)
if not matches:
return template
# 简单替换(支持单变量)
for match in matches:
value = VariableResolver._get_nested_value(match.strip(), context)
template = template.replace(f"{{{{{match}}}}}", str(value))
return template
@staticmethod
def _get_nested_value(path: str, context: Dict[str, Any]) -> Any:
"""
获取嵌套字典的值
例如: "data.user.name" -> context["data"]["user"]["name"]
"""
keys = path.split(".")
current = context
for key in keys:
if isinstance(current, dict) and key in current:
current = current[key]
else:
raise KeyError(f"Variable '{path}' not found in context")
return current
6.3 使用示例
定义工作流:
{
"steps": [
{
"step_id": "analyze",
"tool_id": "data_agent",
"depends_on": []
},
{
"step_id": "plan",
"tool_id": "coach_agent",
"depends_on": ["analyze"],
"input_template": {
"user_data": "{{analyze.data}}",
"goal": "{{input.goal}}"
}
}
]
}
执行过程:
- 执行
analyze,结果存入context["analyze"]。 - 执行
plan前,解析input_template:{{analyze.data}}→ 从context["analyze"]["data"]取值{{input.goal}}→ 从context["input"]["goal"]取值
- 将解析后的数据传给
coach_agent。
七、完整调用链追踪
7.1 典型场景:半马训练计划生成
八、踩坑记录与解决方案
坑1:浮点数精度导致的排序错误
现象:在计算依赖权重时,浮点数比较失败。
解决方案:涉及排序和比较时,统一转为整数或使用decimal模块。
坑2:异步上下文中的变量隔离
现象:并行任务修改了同一个context字典,导致数据竞争。
解决方案:
- 使用
asyncio.Lock保护共享资源。 - 或者像我们这样,每层执行完后统一合并结果,执行期间只读。
坑3:深层嵌套引用的性能
现象:{{a.b.c.d.e.f}} 解析很慢。
解决方案:
- 缓存解析结果。
- 限制嵌套深度(如最多5层)。
九、总结与展望
核心价值
- 自动化并行:开发者只需定义依赖,引擎自动识别并行机会。
- 解耦逻辑:业务逻辑与调度逻辑分离,易于维护。
- 可视化潜力:DAG结构可以直接转换为前端流程图。
后续优化
- 持久化:将工作流定义存入数据库。
- 动态分支:支持根据上一步结果决定下一步走哪条路(条件DAG)。
- 子工作流:支持嵌套DAG,实现模块化复用。
十、完整源码
GitHub仓库:AiRunCoachAgent
快速演示:AiRunCoachAgent
核心文件清单:
app/
├── services/
│ ├── dag_builder.py # DAG构建与验证
│ ├── workflow_engine.py # 并行调度引擎
│ ├── variable_resolver.py # 变量引用解析
│ └── workflow_service.py # 业务封装
├── api/
│ └── workflow_api.py # 工作流管理API
如果你觉得这篇文章对你有帮助,欢迎点赞、收藏、转发!有任何问题或建议,请在评论区留言讨论。 🏃♂️💨
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)