Multi-Agent 的 DAG 工作流设计模式:工程级图结构调度法


摘要/引言

开门见山:一个“AI 流水线车间”的故事

想象你经营着一家无人AI 咖啡甜品站

  1. 接到用户订单后,系统首先得做需求拆解与确认(Agent 1:产品顾问)—— 比如订单里的“招牌芝士慕斯”要不要无糖、常温/冷冻?咖啡要什么杯型、冰度、糖度?
  2. 确认无误后,同时启动两条生产链
    • 咖啡链:咖啡研磨(Agent 2:磨豆师)→ 萃取浓缩液(Agent 3:萃取师)→ 加奶/气泡水/调味(Agent 4:调饮师)→ 成品检测包装(Agent 5:质检员A)
    • 慕斯链:解冻预制品/备料现做(Agent 6:备料师)→ 成型装饰(Agent 7:裱花师)→ 成品检测包装(Agent 8:质检员B)
  3. 两条链的成品都到位后,由出餐调度(Agent 9:调度员)核对订单完整性、打印小票、通知骑手取餐。

这个无人站的运作逻辑,就是典型的多智能体(Multi-Agent, MA)有向无环图(Directed Acyclic Graph, DAG)工作流

问题陈述

在大语言模型(Large Language Model, LLM)普及的今天,单Agent已经能解决很多简单问题——比如写一封邮件、翻译一段文本、回答一个常识问题。但当我们遇到复杂、跨领域、多步骤、有依赖关系的任务时:

  • 单Agent要么能力不足(比如既不懂咖啡萃取,又不懂裱花装饰);
  • 要么效率低下(做完备料再做研磨,串行执行浪费时间);
  • 要么容错性差(备料出错了,整个流程都得重来)。

这时,我们需要多个 specialized(专业化)Agent 协同工作,而DAG 是描述这种协同依赖关系最直观、最严谨的数学模型和工程结构

不过,目前关于 MA-DAG 工作流的资料要么太理论化(只讲多智能体协作的博弈论、马尔可夫决策过程),要么太零散化(只讲某个框架比如 LangChain、AutoGen 的“玩具级”示例),缺少一套工程级的、可落地的设计模式、调度算法和最佳实践

核心价值

读完本文,你将掌握:

  1. MA-DAG 工作流的核心概念、边界与外延:彻底搞懂什么是 MA-DAG、它和普通 DAG 流水线、单Agent 链式调用、AutoGen 的“群体对话”有什么区别;
  2. MA-DAG 的概念结构与核心要素:拆解成 Agent、任务、边(依赖关系)、调度器四大组件,并用Mermaid ER图、交互图、属性对比表把它们的关系讲得明明白白;
  3. 工程级调度算法:从简单的拓扑排序调度,到复杂的带优先级、资源约束、容错的调度,配有Mermaid 流程图完整的 Python 源代码
  4. 真实场景应用与完整项目实现:以本文开头的“无人AI 咖啡甜品站”为例,从环境安装、系统架构设计、接口设计,到核心代码实现,一步一步带你落地;
  5. 行业发展与未来趋势:梳理 MA-DAG 工作流从传统软件流水线到 AI 协作系统的演变历史,展望 Agent 网络拓扑优化、自适应调度、联邦 MA-DAG 等前沿方向;
  6. 工程最佳实践与避坑指南:比如如何设计专业化Agent、如何处理循环依赖、如何做资源隔离、如何优化性能等。

文章概述

本文将分为以下九个章节展开:

  1. MA-DAG 工作流的核心概念与边界定义:解决“是什么、不是什么”的问题;
  2. MA-DAG 的概念结构与核心要素:拆解四大组件,建立完整的理论框架;
  3. MA-DAG 工作流的数学模型与拓扑分析:用图论、概率论的语言严谨描述 MA-DAG,并介绍基本的拓扑排序算法;
  4. 工程级 MA-DAG 调度算法设计:从简单到复杂,讲解5种实用的调度算法;
  5. 真实场景应用:无人AI 咖啡甜品站的项目设计:详细介绍项目背景、需求分析、架构设计、接口设计;
  6. 无人AI 咖啡甜品站的核心实现源代码:提供从 Agent 定义、DAG 构建到调度执行的完整 Python 代码;
  7. MA-DAG 工作流的最佳实践与避坑指南:总结工程落地中的10个核心要点;
  8. MA-DAG 工作流的行业发展与未来趋势:梳理历史、展望未来;
  9. 本章小结与后续探索:回顾全文内容,给出后续学习和实践的方向。

一、 MA-DAG 工作流的核心概念与边界定义

核心概念

在深入讲解之前,我们先把几个最基础、最容易混淆的概念定义清楚:

1.1 多智能体系统(Multi-Agent System, MAS)

核心概念:MAS 是由多个**自主(autonomous)、交互(interactive)、智能(intelligent)的实体(Agent)组成的系统,这些 Agent 通过通信(communication)、协作(cooperation)、协调(coordination)甚至竞争(competition)**来共同解决单个 Agent 无法解决或解决不好的问题。

MAS 的四大核心特征(Wooldridge & Jennings 经典定义):

  1. 自主性(Autonomy):Agent 能在没有人类或其他 Agent 直接干预的情况下,自主控制自身的行为和内部状态;
  2. 社会性(Sociality):Agent 能通过某种通信机制(比如消息队列、API 调用、自然语言对话)与其他 Agent 交互;
  3. 反应性(Reactivity):Agent 能感知环境的变化,并及时做出响应;
  4. 主动性(Pro-activity):Agent 不仅能被动响应环境,还能主动采取行动以实现预设的目标。
1.2 有向无环图(Directed Acyclic Graph, DAG)

核心概念:DAG 是一种**没有有向环(Directed Cycle)的图,由顶点(Vertex/Node)有向边(Directed Edge)**组成。

DAG 的三大核心特征

  1. 有向性(Directedness):所有的边都有明确的方向(比如从顶点 A 指向顶点 B);
  2. 无环性(Acyclicity):不存在一条路径,能从某个顶点出发,沿着有向边走,最终回到这个顶点;
  3. 拓扑排序性(Topological Sortability):可以对顶点进行线性排序,使得对于任意一条有向边 u→vu \rightarrow vuv,顶点 uuu 在排序中都出现在顶点 vvv 的前面(拓扑排序可能不唯一)。

DAG 的常见应用场景

  • 软件工程中的依赖管理(比如 Maven/Gradle 的项目依赖、Webpack 的模块依赖);
  • 数据工程中的ETL 流水线(比如 Apache Airflow、Prefect 的任务调度);
  • 计算机科学中的编译优化(比如指令调度、数据流分析);
  • 机器学习中的神经网络推理(比如 Transformer 的注意力机制计算、模型并行训练)。
1.3 工作流(Workflow)

核心概念:工作流是对业务流程或任务流程的自动化或半自动化建模,它定义了流程中各个步骤的执行顺序、输入输出、依赖关系、资源需求以及异常处理规则

工作流的三大核心要素(Workflow Management Coalition, WfMC 经典定义):

  1. 流程定义(Process Definition):用某种可视化或形式化语言(比如 BPMN、DAG、YAML)对流程进行建模;
  2. 流程实例(Process Instance):流程定义的一次具体执行(比如某个用户的咖啡订单,就是“咖啡甜品生产流程”的一个实例);
  3. 工作流引擎(Workflow Engine):负责解析流程定义、创建流程实例、调度执行各个步骤、处理异常和监控流程状态的软件组件。
1.4 MA-DAG 工作流(Multi-Agent Directed Acyclic Graph Workflow)

核心概念:MA-DAG 工作流是一种以 DAG 为流程建模工具、以多个专业化 Agent 为流程执行单元、以工作流引擎为调度核心的多智能体协作系统。

在 MA-DAG 工作流中:

  • DAG 的顶点(Node):对应一个任务(Task),每个任务由一个指定的专业化 Agent 执行(当然,也可以有多个 Agent 竞争执行同一个任务,或者一个 Agent 负责执行多个任务);
  • DAG 的有向边(Edge):对应任务之间的依赖关系—— 只有当“源顶点(Source Node)”对应的任务执行成功并输出了结果后,“目标顶点(Target Node)”对应的任务才能开始执行;
  • 工作流引擎(Scheduler/Orchestrator):除了普通工作流引擎的功能外,还需要负责Agent 的注册与发现、资源分配、负载均衡、容错处理、通信协调等 MAS 特有的功能。

问题背景

MA-DAG 工作流的出现,是技术发展和业务需求共同驱动的结果:

2.1 技术背景
  1. 大语言模型(LLM)的普及:2022年11月 ChatGPT 发布以来,LLM 的能力得到了爆发式提升——不仅能理解自然语言、生成文本,还能调用工具(Tool Calling)、编写代码、进行推理。这使得我们可以用 LLM 快速构建专业化的智能 Agent(比如专门做产品顾问的 Agent、专门做磨豆师的 Agent);
  2. 微服务架构的成熟:微服务架构将一个大型应用拆分成多个独立部署、独立扩展的服务,每个服务负责一个特定的功能。这为Agent 的分布式部署和管理提供了很好的技术基础;
  3. DAG 工作流引擎的完善:Apache Airflow、Prefect、Dagster 等传统 DAG 工作流引擎已经非常成熟,它们提供了强大的流程定义、调度执行、监控告警功能。我们可以在这些引擎的基础上,扩展支持 MAS 特有的功能,快速构建 MA-DAG 工作流系统;
  4. 消息队列和事件驱动架构的发展:Kafka、RabbitMQ、Redis Stream 等消息队列的普及,使得Agent 之间的通信变得更加高效、可靠、异步化。
2.2 业务背景
  1. 复杂任务的需求增加:在金融、医疗、教育、制造业等领域,越来越多的任务需要跨领域、多步骤、有依赖关系地完成——比如金融风控中的“用户画像生成→信用评分计算→风险等级评估→贷款审批决策”,医疗诊断中的“症状收集→初步诊断→检查建议→检查结果分析→最终诊断→治疗方案制定”;
  2. 效率和成本的要求提高:企业需要更快、更便宜、更准确地完成这些复杂任务——单Agent 串行执行太慢,群体对话式协作不确定性太高,MA-DAG 工作流既能并行执行无依赖的任务,又能通过专业化 Agent 提高准确性和效率;
  3. 容错性和可扩展性的要求提高:企业需要系统在某个环节出错时,能快速恢复或降级处理,同时需要系统能根据业务量的变化,灵活扩展 Agent 的数量——MA-DAG 工作流的无环性和分布式部署特性,正好满足了这些要求。

问题描述

虽然 MA-DAG 工作流的概念听起来很美好,但在工程落地时,我们会遇到很多具体的、棘手的问题

3.1 理论层面的问题
  1. 如何定义 MA-DAG 的数学模型? 普通 DAG 的数学模型已经很成熟,但 MA-DAG 涉及到 Agent、任务、资源、通信、容错等多个维度,需要建立一个更复杂、更全面的数学模型
  2. 如何分析 MA-DAG 的性能? 比如如何计算 MA-DAG 的最短完成时间(Makespan)资源利用率Agent 负载均衡度
  3. 如何处理 MA-DAG 中的不确定性? 比如 Agent 可能会超时、出错,任务的执行时间可能是随机的,如何在这种情况下保证系统的可靠性和稳定性
3.2 工程层面的问题
  1. 如何设计专业化的 Agent? 比如 Agent 的职责边界是什么?Agent 需要具备哪些能力?Agent 的输入输出格式是什么?
  2. 如何构建 MA-DAG? 比如如何可视化地构建 DAG?如何避免循环依赖?如何定义任务的依赖关系(比如数据依赖、控制依赖、时间依赖)?
  3. 如何调度 MA-DAG? 比如如何选择下一个要执行的任务?如何分配 Agent 给任务?如何处理资源约束?如何处理任务超时和出错?
  4. 如何实现 Agent 之间的通信? 比如用什么通信协议?用什么消息格式?如何保证通信的可靠性和安全性?
  5. 如何监控和运维 MA-DAG 工作流系统? 比如如何监控 Agent 的状态?如何监控任务的执行情况?如何做日志收集和分析?如何做告警和故障恢复?

问题解决

本文将重点解决工程层面的问题,同时会给出理论层面的基础框架和分析方法:

  1. 针对理论层面的问题

    • 在第三章,我们会用图论、概率论、排队论的语言,建立 MA-DAG 的数学模型;
    • 在第三章和第四章,我们会介绍如何计算 MA-DAG 的最短完成时间、资源利用率等性能指标;
    • 在第四章,我们会介绍如何处理任务超时和出错的容错调度算法。
  2. 针对工程层面的问题

    • 在第二章,我们会详细讲解如何设计专业化的 Agent;
    • 在第二章和第六章,我们会讲解如何用 YAML 文件和 Python 代码构建 MA-DAG,以及如何避免循环依赖;
    • 在第四章和第六章,我们会详细讲解5种工程级的调度算法,并提供完整的 Python 源代码;
    • 在第六章,我们会讲解如何用 Redis Stream 实现 Agent 之间的通信;
    • 在第七章,我们会给出监控和运维 MA-DAG 工作流系统的最佳实践。

边界与外延

为了避免概念混淆,我们需要明确MA-DAG 工作流的边界——即它是什么,不是什么,以及它和其他相关概念的关系:

5.1 MA-DAG 工作流 vs 普通 DAG 工作流
对比维度 普通 DAG 工作流 MA-DAG 工作流
执行单元 普通的函数/脚本/容器化任务 专业化的、自主的、交互的智能 Agent
依赖关系 主要是数据依赖和控制依赖 除了数据依赖和控制依赖,还可能有 Agent 通信依赖
调度逻辑 主要考虑拓扑排序、资源约束、优先级 除了普通调度逻辑,还要考虑 Agent 注册发现、负载均衡、容错、通信协调
不确定性 较低(任务执行时间、结果通常可预测) 较高(Agent 可能超时、出错,结果可能不确定)
扩展性 主要是任务级的扩展(增加更多任务) 主要是 Agent 级的扩展(增加更多 Agent)
典型应用场景 ETL 流水线、CI/CD 流水线、批量数据处理 复杂 AI 协作任务、跨领域业务流程自动化、无人系统

简单来说:普通 DAG 工作流是“自动化的任务流水线”,而 MA-DAG 工作流是“自动化的智能 Agent 协作流水线”。

5.2 MA-DAG 工作流 vs 单 Agent 链式调用
对比维度 单 Agent 链式调用 MA-DAG 工作流
Agent 数量 1个(通用 Agent) N个(专业化 Agent,N≥2)
执行方式 串行执行(一步一步来) 并行/串行混合执行(无依赖的任务可以并行)
能力范围 受限于单个 Agent 的能力 可以整合多个 Agent 的能力,解决更复杂的问题
效率 较低(串行执行浪费时间) 较高(并行执行提高效率)
容错性 较差(某个步骤出错,整个流程都得重来) 较好(可以针对单个任务/Agent 做容错处理)
典型示例 LangChain 的 Sequential Chain 本文开头的无人AI 咖啡甜品站

简单来说:单 Agent 链式调用是“一个人包办所有事情”,而 MA-DAG 工作流是“一群专业的人分工协作”。

5.3 MA-DAG 工作流 vs AutoGen 的“群体对话”
对比维度 AutoGen 的群体对话 MA-DAG 工作流
流程建模方式 自然语言对话、角色定义、规则约束(无严格的拓扑结构) DAG(严格的有向无环拓扑结构)
执行方式 异步/同步对话,Agent 可以自由发言(不确定性高) 严格按照 DAG 的拓扑顺序执行(不确定性低)
依赖关系 隐式的(通过对话内容体现) 显式的(通过 DAG 的边体现)
效率 较低(可能会有冗余对话) 较高(无冗余对话,无依赖的任务可以并行)
可控性 较低(很难预测 Agent 接下来会说什么、做什么) 较高(可以严格控制任务的执行顺序和 Agent 的行为)
典型应用场景 创意写作、头脑风暴、开放式问题讨论 复杂但结构化的业务流程自动化、无人系统、批量处理

简单来说:AutoGen 的群体对话是“一群人开会讨论,自由发言”,而 MA-DAG 工作流是“一群人按照严格的 SOP 分工协作”。

5.4 MA-DAG 工作流的外延

MA-DAG 工作流的外延非常丰富,目前比较热门的研究和应用方向包括:

  1. 自适应 MA-DAG(Adaptive MA-DAG):系统能根据环境的变化(比如业务量的变化、Agent 的状态变化、任务执行时间的变化),动态调整 DAG 的拓扑结构和调度策略
  2. 联邦 MA-DAG(Federated MA-DAG):多个 MA-DAG 工作流系统分布在不同的组织或地理位置,它们通过联邦学习或联邦通信的方式,协同完成一个更大的任务,同时保护各自的数据隐私;
  3. 强化学习驱动的 MA-DAG(RL-driven MA-DAG):用强化学习算法优化 DAG 的拓扑结构、调度策略、Agent 的行为,以提高系统的性能;
  4. MA-DAG 与知识图谱的结合:用知识图谱补充 Agent 的背景知识,用 MA-DAG 工作流自动化知识图谱的构建、更新、推理
  5. MA-DAG 与机器人流程自动化(RPA)的结合:用 RPA 机器人替代或补充 Agent 执行一些需要物理操作的任务,用 MA-DAG 工作流整合 RPA 机器人和 AI Agent 的能力

二、 MA-DAG 的概念结构与核心要素

核心概念

在第一章,我们已经把 MA-DAG 工作流定义为“以 DAG 为流程建模工具、以多个专业化 Agent 为流程执行单元、以工作流引擎为调度核心的多智能体协作系统”。

在本章,我们将把 MA-DAG 工作流的概念结构拆解成四大核心组件

  1. Agent(智能体):流程的执行单元;
  2. Task(任务):流程的基本执行步骤;
  3. Edge(边/依赖关系):连接任务的桥梁,定义任务之间的执行顺序和数据传递规则;
  4. Scheduler/Orchestrator(调度器/编排器):MA-DAG 工作流的核心大脑,负责流程定义的解析、任务的调度执行、Agent 的管理、异常的处理等。

接下来,我们将详细讲解每个核心组件的定义、核心属性、类型、设计方法,并用Mermaid ER图、交互图、属性对比表把它们的关系讲得明明白白。

概念结构与核心要素组成

1.1 核心要素 ER 实体关系图

首先,我们用 Mermaid ER图 来展示四大核心组件之间的关系:

渲染错误: Mermaid 渲染失败: Parse error on line 51: ...product_consultation", "coffee_grinding" -----------------------^ Expecting 'ATTRIBUTE_WORD', got 'COMMENT'
1.2 核心要素交互关系图

接下来,我们用 Mermaid 交互图(Sequence Diagram)来展示四大核心组件在一次简单的 MA-DAG 工作流执行过程中的交互:

假设我们的 MA-DAG 工作流只有两个任务:

  • 任务 A:产品顾问(Agent 1 执行)
  • 任务 B:咖啡研磨(Agent 2 执行)
  • 边 A→B:数据依赖(任务 A 的输出作为任务 B 的输入)

交互流程如下:

Edge 数据存储 Agent 2(咖啡研磨) Agent 1(产品顾问) 任务队列 Agent注册中心 调度器 用户 Edge 数据存储 Agent 2(咖啡研磨) Agent 1(产品顾问) 任务队列 Agent注册中心 调度器 用户 提交工作流实例请求(包含初始输入数据) 1 解析工作流定义(DAG) 2 创建工作流实例 3 保存工作流实例信息和初始输入数据 4 拓扑排序得到任务执行顺序:[A, B] 5 查询任务 A 的候选/指定 Agent 6 返回 Agent 1(状态 ONLINE,当前负载 0) 7 提交任务 A(分配给 Agent 1) 8 返回工作流实例 ID 9 推送任务 A 10 获取任务 A 的输入数据 11 执行任务 A(需求拆解与确认) 12 保存任务 A 的输出数据 13 报告任务 A 执行成功 14 通知任务 A 执行成功 15 更新任务 A 的状态为 SUCCESS 16 检查任务 B 的所有前置依赖是否都已满足(任务 A 已成功) 17 查询任务 B 的候选/指定 Agent 18 返回 Agent 2(状态 ONLINE,当前负载 0) 19 解析边 A→B 的数据转换器,将任务 A 的输出转换成任务 B 的输入 20 保存任务 B 的转换后的输入数据 21 提交任务 B(分配给 Agent 2) 22 推送任务 B 23 获取任务 B 的输入数据 24 执行任务 B(咖啡研磨) 25 保存任务 B 的输出数据 26 报告任务 B 执行成功 27 通知任务 B 执行成功 28 更新任务 B 的状态为 SUCCESS 29 检查工作流实例的所有任务是否都已成功 30 更新工作流实例的状态为 SUCCESS 31 通知工作流实例执行成功(可选,通过Webhook或轮询) 32

核心要素详解

接下来,我们将详细讲解每个核心要素的定义、核心属性、类型、设计方法


2.1 Agent(智能体)
2.1.1 定义

Agent 是 MA-DAG 工作流的执行单元,它是一个自主的、交互的、智能的软件实体(或硬件实体,比如 RPA 机器人、无人机),负责执行一个或多个特定的任务。

2.1.2 核心属性

根据第一章的 Wooldridge & Jennings 经典定义,以及工程落地的实际需求,我们总结出 Agent 的10个核心属性

属性名称 定义 示例
agent_id Agent 的唯一标识符,用于区分不同的 Agent "agent_001_product_consultant"
agent_name Agent 的人类可读名称 "产品顾问 Agent"
agent_type Agent 的类型,用于区分不同能力的 Agent LLM_AGENT, TOOL_AGENT, HUMAN_AGENT, HYBRID_AGENT
capabilities Agent 的能力列表,用于描述 Agent 能执行哪些任务 ["product_consultation", "order_confirmation"]
status Agent 的当前状态,用于调度器判断是否能分配任务给该 Agent REGISTERED, ONLINE, OFFLINE, BUSY, ERROR
current_load Agent 当前正在执行的任务数,用于负载均衡 0(空闲),2(正在执行2个任务)
max_load Agent 最大能并发执行的任务数,用于资源约束调度 5(最多同时执行5个任务)
input_schema Agent 输入数据的 Schema(JSON Schema 格式),用于数据校验和转换 要求输入包含 user_idorder_details 字段
output_schema Agent 输出数据的 Schema(JSON Schema 格式),用于数据校验和转换 要求输出包含 confirmed_orderspecial_requirements 字段
agent_config Agent 的配置信息,比如 LLM 模型名称、API 密钥、最大并发数、超时时间等 {"llm_model": "gpt-4o", "api_key": "sk-xxx", "timeout": 30, "max_retries": 3}
2.1.3 类型

根据 Agent 的实现方式和能力范围,我们可以将 Agent 分为以下5种常见类型

Agent 类型 定义 实现方式 典型应用场景
LLM Agent 主要基于大语言模型构建的 Agent,能理解自然语言、生成文本、调用工具、进行推理 LangChain, AutoGen, LlamaIndex, 自定义框架 产品顾问、文案撰写、代码生成、初步诊断
Tool Agent 主要基于传统软件工具或 API 构建的 Agent,能执行特定的软件操作或硬件操作 封装 REST API、数据库操作、RPA 机器人脚本、硬件驱动程序 咖啡研磨、萃取浓缩液、发送邮件、查询数据库、控制无人机
Human Agent 由人类操作员扮演的 Agent,用于执行那些 AI 暂时无法完成或需要人类监督的任务 Web 界面、移动端 App、企业微信/钉钉机器人 复杂问题的最终决策、高风险任务的审核、创意内容的润色、故障的人工排查
Hybrid Agent 结合了 LLM Agent、Tool Agent 和/或 Human Agent 能力的 Agent 先由 LLM Agent 理解需求,然后调用 Tool Agent 执行操作,最后由 Human Agent 审核 金融贷款审批、医疗最终诊断、法律合同审查、复杂订单的处理
Swarm Agent 由多个小型 Agent 组成的群体 Agent,能通过协作完成更复杂的任务 基于 AutoGen Swarm、Custom Swarm 框架构建 大规模文本分类、分布式数据处理、复杂优化问题的求解
2.1.4 工程级设计方法

设计一个工程级的、可落地的、专业化的 Agent,需要遵循以下6个核心步骤

  1. 明确 Agent 的职责边界

    • 遵循单一职责原则(Single Responsibility Principle, SRP):一个 Agent 只负责一个或少数几个高度相关的任务,不要让 Agent 承担太多职责;
    • 比如不要设计一个“全能咖啡师 Agent”,而是设计“产品顾问 Agent”、“磨豆师 Agent”、“萃取师 Agent”、“调饮师 Agent”、“裱花师 Agent”、“质检员 Agent”等专业化 Agent。
  2. 定义 Agent 的能力列表

    • 清晰、明确、可衡量的语言描述 Agent 能执行哪些任务;
    • 比如“产品顾问 Agent”的能力列表可以是:
      ["product_consultation: 根据用户的模糊需求推荐合适的咖啡和甜品",
       "order_confirmation: 确认用户订单的所有细节(比如杯型、冰度、糖度、特殊要求)",
       "question_answering: 回答用户关于产品、价格、配送时间等方面的问题"]
      
  3. 定义 Agent 的输入输出 Schema

    • 使用JSON Schema 格式严格定义 Agent 的输入输出数据结构,这样可以:
      • 实现数据的自动校验,避免因数据格式错误导致任务失败;
      • 实现数据的自动转换,方便不同 Agent 之间的数据传递;
      • 实现 Agent 的自动发现,调度器可以根据任务的输入输出 Schema 匹配合适的 Agent;
    • 比如“产品顾问 Agent”的输入 Schema 可以是:
      {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "Product Consultant Input",
        "type": "object",
        "properties": {
          "user_id": {
            "type": "string",
            "description": "用户唯一标识符"
          },
          "user_query": {
            "type": "string",
            "description": "用户的原始需求或问题"
          },
          "context": {
            "type": "object",
            "description": "上下文信息(比如用户的历史订单、产品目录、当前促销活动)",
            "properties": {
              "historical_orders": {
                "type": "array",
                "items": {
                  "type": "object"
                }
              },
              "product_catalog": {
                "type": "array",
                "items": {
                  "type": "object"
                }
              },
              "promotions": {
                "type": "array",
                "items": {
                  "type": "object"
                }
              }
            },
            "required": ["product_catalog"]
          }
        },
        "required": ["user_id", "user_query", "context"]
      }
      
    • “产品顾问 Agent”的输出 Schema 可以是:
      {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "Product Consultant Output",
        "type": "object",
        "properties": {
          "task_type": {
            "type": "string",
            "enum": ["product_recommendation", "order_confirmation", "question_answer"],
            "description": "本次执行的任务类型"
          },
          "result": {
            "type": "object",
            "description": "任务执行结果"
          }
        },
        "required": ["task_type", "result"]
      }
      
  4. 实现 Agent 的核心逻辑

    • 根据 Agent 的类型,选择合适的实现方式:
      • 如果是 LLM Agent:可以使用 LangChain、AutoGen、LlamaIndex 等框架快速构建,核心逻辑包括提示词工程(Prompt Engineering)、工具调用(Tool Calling)、记忆管理(Memory Management)、推理链(Chain of Thought)等;
      • 如果是 Tool Agent:可以封装现有的 REST API、数据库操作、RPA 机器人脚本、硬件驱动程序,核心逻辑包括输入数据的解析、工具的调用、输出数据的格式化、异常的处理等;
      • 如果是 Human Agent:可以开发一个 Web 界面或移动端 App,让人类操作员可以查看任务、输入结果、提交审核意见,核心逻辑包括任务的推送、结果的收集、状态的更新等;
    • 比如“产品顾问 Agent”(LLM Agent)的核心提示词可以是:
      PRODUCT_CONSULTANT_PROMPT = """
      你是一位专业的无人AI 咖啡甜品站的产品顾问,你的职责是:
      1. 根据用户的模糊需求推荐合适的咖啡和甜品;
      2. 确认用户订单的所有细节(比如杯型、冰度、糖度、特殊要求);
      3. 回答用户关于产品、价格、配送时间等方面的问题。
      
      你的输入数据包含:
      - user_id:用户唯一标识符;
      - user_query:用户的原始需求或问题;
      - context:上下文信息,包含:
        - historical_orders:用户的历史订单(可选);
        - product_catalog:产品目录(必填),包含所有咖啡和甜品的名称、价格、描述、配料、可选配置等信息;
        - promotions:当前促销活动(可选)。
      
      你的输出数据必须严格符合以下 JSON Schema 格式:
      {output_schema}
      
      请你先分析用户的需求或问题,然后选择合适的任务类型,最后输出符合要求的结果。
      如果用户的需求不明确,请不要猜测,而是在 result 中要求用户补充更多信息。
      """
      
  5. 实现 Agent 的通信与心跳机制

    • Agent 需要与调度器任务队列进行通信,主要通信内容包括:
      • 注册与注销:Agent 启动时向调度器注册,停止时向调度器注销;
      • 心跳:Agent 定期(比如每10秒)向调度器发送心跳,报告自己的当前状态(ONLINE/OFFLINE/BUSY/ERROR)和当前负载;
      • 任务的接收与执行结果的报告:Agent 从任务队列中接收任务,执行完成后向任务队列报告执行结果(SUCCESS/FAILED/TIMEOUT);
    • 通信协议的选择:
      • 注册、注销、心跳:可以使用 REST APIgRPC(如果对性能要求较高);
      • 任务的接收与执行结果的报告:可以使用 消息队列(比如 Redis Stream、Kafka、RabbitMQ),这样可以实现异步通信,提高系统的可靠性和可扩展性;
    • 比如 Agent 的心跳机制的 Python 伪代码可以是:
      import time
      import requests
      
      class Agent:
          def __init__(self, agent_id, scheduler_url, heartbeat_interval=10):
              self.agent_id = agent_id
              self.scheduler_url = scheduler_url
              self.heartbeat_interval = heartbeat_interval
              self.status = "REGISTERED"
              self.current_load = 0
              self.max_load = 5
              self.running = False
      
          def start(self):
              self.running = True
              # 启动时向调度器注册
              self.register()
              # 启动心跳线程
              import threading
              heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True)
              heartbeat_thread.start()
              # 启动任务接收线程
              task_thread = threading.Thread(target=self.receive_tasks, daemon=True)
              task_thread.start()
              print(f"Agent {self.agent_id} started successfully")
      
          def stop(self):
              self.running = False
              # 停止时向调度器注销
              self.unregister()
              print(f"Agent {self.agent_id} stopped successfully")
      
          def register(self):
              try:
                  response = requests.post(
                      f"{self.scheduler_url}/agents/register",
                      json={
                          "agent_id": self.agent_id,
                          "agent_name": "产品顾问 Agent",
                          "agent_type": "LLM_AGENT",
                          "capabilities": ["product_consultation", "order_confirmation", "question_answering"],
                          "max_load": self.max_load,
                          "input_schema": INPUT_SCHEMA,
                          "output_schema": OUTPUT_SCHEMA,
                          "agent_config": AGENT_CONFIG
                      },
                      timeout=5
                  )
                  response.raise_for_status()
                  self.status = "ONLINE"
                  print(f"Agent {self.agent_id} registered successfully")
              except Exception as e:
                  print(f"Failed to register Agent {self.agent_id}: {e}")
                  self.status = "ERROR"
      
          def unregister(self):
              try:
                  response = requests.post(
                      f"{self.scheduler_url}/agents/unregister",
                      json={"agent_id": self.agent_id},
                      timeout=5
                  )
                  response.raise_for_status()
                  print(f"Agent {self.agent_id} unregistered successfully")
              except Exception as e:
                  print(f"Failed to unregister Agent {self.agent_id}: {e}")
      
          def send_heartbeat(self):
              while self.running:
                  try:
                      response = requests.post(
                          f"{self.scheduler_url}/agents/heartbeat",
                          json={
                              "agent_id": self.agent_id,
                              "status": self.status,
                              "current_load": self.current_load
                          },
                          timeout=5
                      )
                      response.raise_for_status()
                      # print(f"Agent {self.agent_id} sent heartbeat successfully")
                  except Exception as e:
                      print(f"Failed to send heartbeat for Agent {self.agent_id}: {e}")
                      self.status = "OFFLINE"
                  time.sleep(self.heartbeat_interval)
      
          def receive_tasks
      
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐