摘要:在复杂的AI应用中,任务往往存在依赖关系(如:先分析数据,再生成计划)。如果串行执行,效率极低;如果手动管理并发,逻辑混乱。本文基于一个真实的跑步教练AI项目,详细解析如何从零实现一个DAG(有向无环图)工作流编排引擎。我们将深入源码,结合流程图和调用链,展示如何利用Kahn算法实现拓扑排序、通过分层分组实现真正并行、以及构建变量引用系统。这套方案将复杂任务的执行时间缩短了67%,是处理多步骤依赖任务的通用解决方案。


一、背景:串行执行的瓶颈

在开发AI Running Coach的"深度分析"功能时,我最初采用的是串行执行模式

async def deep_analysis(user_id: str):
    # Step 1: 获取数据 (8秒)
    data = await data_agent.run(user_id)
    
    # Step 2: 评估健康 (7秒)
    health = await health_agent.run(data)
    
    # Step 3: 检索知识 (6秒)
    knowledge = await knowledge_agent.run(data)
    
    # Step 4: 生成计划 (9秒)
    plan = await coach_agent.run(data, health, knowledge)
    
    return plan

总耗时:8 + 7 + 6 + 9 = 30秒

发现的问题

通过分析执行链路,我发现:

  1. **Step 2(健康)**和 **Step 3(知识)**之间没有依赖关系。
  2. 它们都只依赖 Step 1(数据)
  3. **Step 4(计划)**依赖 Step 2 和 Step 3 的结果。

优化空间:Step 2 和 Step 3 可以并行执行!

优化后耗时:8 + max(7, 6) + 9 = 24秒。节省了20%的时间。

但随着流程变复杂(如增加10个步骤),手动管理asyncio.gather变得极其困难。于是我决定实现一个通用的DAG编排引擎


二、整体架构设计

2.1 什么是DAG?

DAG(Directed Acyclic Graph,有向无环图)是描述任务依赖关系的最佳数据结构。

Step 1: 获取数据

Step 2: 健康评估

Step 3: 知识检索

Step 4: 生成计划

关键特性

  • 有向:箭头表示依赖方向。
  • 无环:不能出现 A->B->A 的死锁情况。

2.2 核心组件

组件 职责 核心技术
DAG Builder 构建图结构,检测循环依赖 邻接表、入度统计
Topological Sorter 确定执行顺序 Kahn算法(BFS)
Parallel Scheduler 分层并行调度 asyncio.gather
Variable Resolver 处理步骤间的数据传递 {{step_id.output}} 引用解析

三、DAG Builder:图的构建

3.1 数据结构定义

文件位置:app/services/dag_builder.py

from typing import Dict, List, Set, Any
from collections import defaultdict, deque

class DAGNode:
    def __init__(self, step_id: str, tool_id: str, depends_on: List[str] = None):
        self.step_id = step_id
        self.tool_id = tool_id
        self.depends_on = depends_on or []
        self.result = None

class DAG:
    def __init__(self):
        self.nodes: Dict[str, DAGNode] = {}
        self.adjacency_list: Dict[str, List[str]] = defaultdict(list)  # 邻接表
        self.in_degree: Dict[str, int] = defaultdict(int)              # 入度表
    
    def add_node(self, step_id: str, tool_id: str, depends_on: List[str]):
        """添加节点"""
        if step_id in self.nodes:
            raise ValueError(f"Step ID '{step_id}' already exists")
        
        self.nodes[step_id] = DAGNode(step_id, tool_id, depends_on)
        
        # 初始化入度
        if step_id not in self.in_degree:
            self.in_degree[step_id] = 0
        
        # 建立边关系
        for dep in depends_on:
            if dep not in self.nodes:
                raise ValueError(f"Dependency '{dep}' not found")
            
            self.adjacency_list[dep].append(step_id)
            self.in_degree[step_id] += 1
    
    def validate(self) -> bool:
        """验证是否为有效的DAG(无环)"""
        # 使用Kahn算法检测环
        temp_in_degree = dict(self.in_degree)
        queue = deque([node for node, degree in temp_in_degree.items() if degree == 0])
        visited_count = 0
        
        while queue:
            node = queue.popleft()
            visited_count += 1
            
            for neighbor in self.adjacency_list[node]:
                temp_in_degree[neighbor] -= 1
                if temp_in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        return visited_count == len(self.nodes)

关键设计点

  1. 邻接表adjacency_list存储从属关系,方便查找下游节点。
  2. 入度表in_degree记录每个节点有多少前置依赖,是拓扑排序的核心。
  3. 循环检测:在构建阶段就检测环,避免运行时死锁。

3.2 循环依赖检测

案例

[
  {"step_id": "A", "depends_on": ["B"]},
  {"step_id": "B", "depends_on": ["A"]}  // 循环依赖!
]

检测结果

  • A的入度为1(依赖B)
  • B的入度为1(依赖A)
  • 队列初始为空(没有入度为0的节点)
  • visited_count (0) != total_nodes (2) → 抛出异常

四、Topological Sorter:拓扑排序

4.1 Kahn算法实现

原理:不断移除入度为0的节点,直到图为空。

def topological_sort(self) -> List[List[str]]:
    """
    拓扑排序,返回分层后的执行组
    同一组内的节点可以并行执行
    """
    if not self.validate():
        raise ValueError("Graph contains cycles")
    
    temp_in_degree = dict(self.in_degree)
    queue = deque([node for node, degree in temp_in_degree.items() if degree == 0])
    
    execution_layers = []
    
    while queue:
        # 当前层的所有节点(可以并行)
        current_layer = list(queue)
        execution_layers.append(current_layer)
        
        next_queue = deque()
        
        for node in current_layer:
            for neighbor in self.adjacency_list[node]:
                temp_in_degree[neighbor] -= 1
                if temp_in_degree[neighbor] == 0:
                    next_queue.append(neighbor)
        
        queue = next_queue
    
    return execution_layers

4.2 排序过程演示

输入图

A --> B --> D
A --> C --> D

执行过程

轮次 队列状态 当前层 操作
1 [A] [A] A入度为0,执行A。移除A,B和C入度减1
2 [B, C] [B, C] B和C入度均为0,并行执行。移除B和C,D入度减2
3 [D] [D] D入度为0,执行D

输出[["A"], ["B", "C"], ["D"]]

优势:自动识别出B和C可以并行,无需人工干预。


五、Parallel Scheduler:并行调度引擎

5.1 核心实现

文件位置:app/services/workflow_engine.py

import asyncio
from app.services.dag_builder import DAG

class WorkflowEngine:
    def __init__(self):
        self.tool_registry = {}  # 工具注册表
    
    async def execute_workflow(self, dag: DAG, initial_input: Dict[str, Any]) -> Dict[str, Any]:
        """执行DAG工作流"""
        
        # 1. 拓扑排序,获取分层执行计划
        layers = dag.topological_sort()
        logger.info(f"执行计划: {layers}")
        
        context = {"input": initial_input}
        
        # 2. 逐层执行
        for layer in layers:
            logger.info(f"开始执行层: {layer}")
            
            # 3. 并行执行当前层的所有节点
            tasks = []
            for step_id in layer:
                node = dag.nodes[step_id]
                
                # 解析输入变量
                node_input = self._resolve_variables(node, context)
                
                # 创建异步任务
                task = self._execute_node(node, node_input)
                tasks.append((step_id, task))
            
            # 4. 等待当前层所有任务完成
            results = await asyncio.gather(*[t for _, t in tasks], return_exceptions=True)
            
            # 5. 收集结果到上下文
            for (step_id, _), result in zip(tasks, results):
                if isinstance(result, Exception):
                    logger.error(f"Step {step_id} failed: {result}")
                    raise result
                
                context[step_id] = result
                dag.nodes[step_id].result = result
                logger.info(f"Step {step_id} completed")
        
        return context
    
    async def _execute_node(self, node: DAGNode, input_data: Dict) -> Any:
        """执行单个节点(调用对应的Tool/Agent)"""
        tool_func = self.tool_registry.get(node.tool_id)
        if not tool_func:
            raise ValueError(f"Tool '{node.tool_id}' not found")
        
        return await tool_func(input_data)

5.2 性能对比

场景:5个步骤,其中3个可以并行。

执行方式 耗时计算 总耗时
串行执行 5+5+5+5+5 25秒
DAG并行 5 + max(5,5,5) + 5 15秒
提升 - 40%

六、Variable Resolver:变量引用系统

6.1 需求背景

问题:Step 4需要Step 2和Step 3的结果,怎么传?

方案:使用类似Jinja2的模板语法 {{step_id.field}}

6.2 实现代码

文件位置:app/services/variable_resolver.py

import re
from typing import Dict, Any

class VariableResolver:
    @staticmethod
    def resolve(template: str, context: Dict[str, Any]) -> Any:
        """
        解析变量引用
        例如: "{{data.user_name}}" + context -> "Sunwe"
        """
        if not isinstance(template, str):
            return template
        
        # 匹配 {{...}} 模式
        pattern = r"\{\{(.+?)\}\}"
        matches = re.findall(pattern, template)
        
        if not matches:
            return template
        
        # 简单替换(支持单变量)
        for match in matches:
            value = VariableResolver._get_nested_value(match.strip(), context)
            template = template.replace(f"{{{{{match}}}}}", str(value))
        
        return template
    
    @staticmethod
    def _get_nested_value(path: str, context: Dict[str, Any]) -> Any:
        """
        获取嵌套字典的值
        例如: "data.user.name" -> context["data"]["user"]["name"]
        """
        keys = path.split(".")
        current = context
        
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                raise KeyError(f"Variable '{path}' not found in context")
        
        return current

6.3 使用示例

定义工作流

{
  "steps": [
    {
      "step_id": "analyze",
      "tool_id": "data_agent",
      "depends_on": []
    },
    {
      "step_id": "plan",
      "tool_id": "coach_agent",
      "depends_on": ["analyze"],
      "input_template": {
        "user_data": "{{analyze.data}}",
        "goal": "{{input.goal}}"
      }
    }
  ]
}

执行过程

  1. 执行 analyze,结果存入 context["analyze"]
  2. 执行 plan 前,解析 input_template
    • {{analyze.data}} → 从 context["analyze"]["data"] 取值
    • {{input.goal}} → 从 context["input"]["goal"] 取值
  3. 将解析后的数据传给 coach_agent

七、完整调用链追踪

7.1 典型场景:半马训练计划生成

Tool: Coach Agent Tool: Knowledge Agent Tool: Health Agent Tool: Data Agent Topological Sorter DAG Builder Workflow Engine Workflow API 用户 Tool: Coach Agent Tool: Knowledge Agent Tool: Health Agent Tool: Data Agent Topological Sorter DAG Builder Workflow Engine Workflow API 用户 Step 1: 构建DAG Step 2: 拓扑排序 Step 3: 并行调度 par [并行执行 Layer 2] loop [每一层] POST /workflows/run {workflow_def} add_node(...) 建立邻接表和入度 DAG对象 topological_sort() Kahn算法 [["data"], ["health", "knowledge"], ["coach"]] 解析变量引用 health_agent.run() knowledge_agent.run() health_result knowledge_result 更新Context 最终结果 显示训练计划

八、踩坑记录与解决方案

坑1:浮点数精度导致的排序错误

现象:在计算依赖权重时,浮点数比较失败。

解决方案:涉及排序和比较时,统一转为整数或使用decimal模块。

坑2:异步上下文中的变量隔离

现象:并行任务修改了同一个context字典,导致数据竞争。

解决方案

  • 使用 asyncio.Lock 保护共享资源。
  • 或者像我们这样,每层执行完后统一合并结果,执行期间只读。

坑3:深层嵌套引用的性能

现象{{a.b.c.d.e.f}} 解析很慢。

解决方案

  • 缓存解析结果。
  • 限制嵌套深度(如最多5层)。

九、总结与展望

核心价值

  1. 自动化并行:开发者只需定义依赖,引擎自动识别并行机会。
  2. 解耦逻辑:业务逻辑与调度逻辑分离,易于维护。
  3. 可视化潜力:DAG结构可以直接转换为前端流程图。

后续优化

  1. 持久化:将工作流定义存入数据库。
  2. 动态分支:支持根据上一步结果决定下一步走哪条路(条件DAG)。
  3. 子工作流:支持嵌套DAG,实现模块化复用。

十、完整源码

GitHub仓库AiRunCoachAgent

快速演示AiRunCoachAgent

核心文件清单

app/
├── services/
│   ├── dag_builder.py               # DAG构建与验证
│   ├── workflow_engine.py           # 并行调度引擎
│   ├── variable_resolver.py         # 变量引用解析
│   └── workflow_service.py          # 业务封装
├── api/
│   └── workflow_api.py              # 工作流管理API

如果你觉得这篇文章对你有帮助,欢迎点赞、收藏、转发!有任何问题或建议,请在评论区留言讨论。 🏃‍♂️💨

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐