Multi-Agent 的 DAG 工作流设计模式:工程级图结构调度法
Multi-Agent 的 DAG 工作流设计模式:工程级图结构调度法
摘要/引言
开门见山:一个“AI 流水线车间”的故事
想象你经营着一家无人AI 咖啡甜品站:
- 接到用户订单后,系统首先得做需求拆解与确认(Agent 1:产品顾问)—— 比如订单里的“招牌芝士慕斯”要不要无糖、常温/冷冻?咖啡要什么杯型、冰度、糖度?
- 确认无误后,同时启动两条生产链:
- 咖啡链:咖啡研磨(Agent 2:磨豆师)→ 萃取浓缩液(Agent 3:萃取师)→ 加奶/气泡水/调味(Agent 4:调饮师)→ 成品检测包装(Agent 5:质检员A)
- 慕斯链:解冻预制品/备料现做(Agent 6:备料师)→ 成型装饰(Agent 7:裱花师)→ 成品检测包装(Agent 8:质检员B)
- 两条链的成品都到位后,由出餐调度(Agent 9:调度员)核对订单完整性、打印小票、通知骑手取餐。
这个无人站的运作逻辑,就是典型的多智能体(Multi-Agent, MA)有向无环图(Directed Acyclic Graph, DAG)工作流!
问题陈述
在大语言模型(Large Language Model, LLM)普及的今天,单Agent已经能解决很多简单问题——比如写一封邮件、翻译一段文本、回答一个常识问题。但当我们遇到复杂、跨领域、多步骤、有依赖关系的任务时:
- 单Agent要么能力不足(比如既不懂咖啡萃取,又不懂裱花装饰);
- 要么效率低下(做完备料再做研磨,串行执行浪费时间);
- 要么容错性差(备料出错了,整个流程都得重来)。
这时,我们需要多个 specialized(专业化)Agent 协同工作,而DAG 是描述这种协同依赖关系最直观、最严谨的数学模型和工程结构。
不过,目前关于 MA-DAG 工作流的资料要么太理论化(只讲多智能体协作的博弈论、马尔可夫决策过程),要么太零散化(只讲某个框架比如 LangChain、AutoGen 的“玩具级”示例),缺少一套工程级的、可落地的设计模式、调度算法和最佳实践。
核心价值
读完本文,你将掌握:
- MA-DAG 工作流的核心概念、边界与外延:彻底搞懂什么是 MA-DAG、它和普通 DAG 流水线、单Agent 链式调用、AutoGen 的“群体对话”有什么区别;
- MA-DAG 的概念结构与核心要素:拆解成 Agent、任务、边(依赖关系)、调度器四大组件,并用Mermaid ER图、交互图、属性对比表把它们的关系讲得明明白白;
- 工程级调度算法:从简单的拓扑排序调度,到复杂的带优先级、资源约束、容错的调度,配有Mermaid 流程图和完整的 Python 源代码;
- 真实场景应用与完整项目实现:以本文开头的“无人AI 咖啡甜品站”为例,从环境安装、系统架构设计、接口设计,到核心代码实现,一步一步带你落地;
- 行业发展与未来趋势:梳理 MA-DAG 工作流从传统软件流水线到 AI 协作系统的演变历史,展望 Agent 网络拓扑优化、自适应调度、联邦 MA-DAG 等前沿方向;
- 工程最佳实践与避坑指南:比如如何设计专业化Agent、如何处理循环依赖、如何做资源隔离、如何优化性能等。
文章概述
本文将分为以下九个章节展开:
- MA-DAG 工作流的核心概念与边界定义:解决“是什么、不是什么”的问题;
- MA-DAG 的概念结构与核心要素:拆解四大组件,建立完整的理论框架;
- MA-DAG 工作流的数学模型与拓扑分析:用图论、概率论的语言严谨描述 MA-DAG,并介绍基本的拓扑排序算法;
- 工程级 MA-DAG 调度算法设计:从简单到复杂,讲解5种实用的调度算法;
- 真实场景应用:无人AI 咖啡甜品站的项目设计:详细介绍项目背景、需求分析、架构设计、接口设计;
- 无人AI 咖啡甜品站的核心实现源代码:提供从 Agent 定义、DAG 构建到调度执行的完整 Python 代码;
- MA-DAG 工作流的最佳实践与避坑指南:总结工程落地中的10个核心要点;
- MA-DAG 工作流的行业发展与未来趋势:梳理历史、展望未来;
- 本章小结与后续探索:回顾全文内容,给出后续学习和实践的方向。
一、 MA-DAG 工作流的核心概念与边界定义
核心概念
在深入讲解之前,我们先把几个最基础、最容易混淆的概念定义清楚:
1.1 多智能体系统(Multi-Agent System, MAS)
核心概念:MAS 是由多个**自主(autonomous)、交互(interactive)、智能(intelligent)的实体(Agent)组成的系统,这些 Agent 通过通信(communication)、协作(cooperation)、协调(coordination)甚至竞争(competition)**来共同解决单个 Agent 无法解决或解决不好的问题。
MAS 的四大核心特征(Wooldridge & Jennings 经典定义):
- 自主性(Autonomy):Agent 能在没有人类或其他 Agent 直接干预的情况下,自主控制自身的行为和内部状态;
- 社会性(Sociality):Agent 能通过某种通信机制(比如消息队列、API 调用、自然语言对话)与其他 Agent 交互;
- 反应性(Reactivity):Agent 能感知环境的变化,并及时做出响应;
- 主动性(Pro-activity):Agent 不仅能被动响应环境,还能主动采取行动以实现预设的目标。
1.2 有向无环图(Directed Acyclic Graph, DAG)
核心概念:DAG 是一种**没有有向环(Directed Cycle)的图,由顶点(Vertex/Node)和有向边(Directed Edge)**组成。
DAG 的三大核心特征:
- 有向性(Directedness):所有的边都有明确的方向(比如从顶点 A 指向顶点 B);
- 无环性(Acyclicity):不存在一条路径,能从某个顶点出发,沿着有向边走,最终回到这个顶点;
- 拓扑排序性(Topological Sortability):可以对顶点进行线性排序,使得对于任意一条有向边 u→vu \rightarrow vu→v,顶点 uuu 在排序中都出现在顶点 vvv 的前面(拓扑排序可能不唯一)。
DAG 的常见应用场景:
- 软件工程中的依赖管理(比如 Maven/Gradle 的项目依赖、Webpack 的模块依赖);
- 数据工程中的ETL 流水线(比如 Apache Airflow、Prefect 的任务调度);
- 计算机科学中的编译优化(比如指令调度、数据流分析);
- 机器学习中的神经网络推理(比如 Transformer 的注意力机制计算、模型并行训练)。
1.3 工作流(Workflow)
核心概念:工作流是对业务流程或任务流程的自动化或半自动化建模,它定义了流程中各个步骤的执行顺序、输入输出、依赖关系、资源需求以及异常处理规则。
工作流的三大核心要素(Workflow Management Coalition, WfMC 经典定义):
- 流程定义(Process Definition):用某种可视化或形式化语言(比如 BPMN、DAG、YAML)对流程进行建模;
- 流程实例(Process Instance):流程定义的一次具体执行(比如某个用户的咖啡订单,就是“咖啡甜品生产流程”的一个实例);
- 工作流引擎(Workflow Engine):负责解析流程定义、创建流程实例、调度执行各个步骤、处理异常和监控流程状态的软件组件。
1.4 MA-DAG 工作流(Multi-Agent Directed Acyclic Graph Workflow)
核心概念:MA-DAG 工作流是一种以 DAG 为流程建模工具、以多个专业化 Agent 为流程执行单元、以工作流引擎为调度核心的多智能体协作系统。
在 MA-DAG 工作流中:
- DAG 的顶点(Node):对应一个任务(Task),每个任务由一个指定的专业化 Agent 执行(当然,也可以有多个 Agent 竞争执行同一个任务,或者一个 Agent 负责执行多个任务);
- DAG 的有向边(Edge):对应任务之间的依赖关系—— 只有当“源顶点(Source Node)”对应的任务执行成功并输出了结果后,“目标顶点(Target Node)”对应的任务才能开始执行;
- 工作流引擎(Scheduler/Orchestrator):除了普通工作流引擎的功能外,还需要负责Agent 的注册与发现、资源分配、负载均衡、容错处理、通信协调等 MAS 特有的功能。
问题背景
MA-DAG 工作流的出现,是技术发展和业务需求共同驱动的结果:
2.1 技术背景
- 大语言模型(LLM)的普及:2022年11月 ChatGPT 发布以来,LLM 的能力得到了爆发式提升——不仅能理解自然语言、生成文本,还能调用工具(Tool Calling)、编写代码、进行推理。这使得我们可以用 LLM 快速构建专业化的智能 Agent(比如专门做产品顾问的 Agent、专门做磨豆师的 Agent);
- 微服务架构的成熟:微服务架构将一个大型应用拆分成多个独立部署、独立扩展的服务,每个服务负责一个特定的功能。这为Agent 的分布式部署和管理提供了很好的技术基础;
- DAG 工作流引擎的完善:Apache Airflow、Prefect、Dagster 等传统 DAG 工作流引擎已经非常成熟,它们提供了强大的流程定义、调度执行、监控告警功能。我们可以在这些引擎的基础上,扩展支持 MAS 特有的功能,快速构建 MA-DAG 工作流系统;
- 消息队列和事件驱动架构的发展:Kafka、RabbitMQ、Redis Stream 等消息队列的普及,使得Agent 之间的通信变得更加高效、可靠、异步化。
2.2 业务背景
- 复杂任务的需求增加:在金融、医疗、教育、制造业等领域,越来越多的任务需要跨领域、多步骤、有依赖关系地完成——比如金融风控中的“用户画像生成→信用评分计算→风险等级评估→贷款审批决策”,医疗诊断中的“症状收集→初步诊断→检查建议→检查结果分析→最终诊断→治疗方案制定”;
- 效率和成本的要求提高:企业需要更快、更便宜、更准确地完成这些复杂任务——单Agent 串行执行太慢,群体对话式协作不确定性太高,MA-DAG 工作流既能并行执行无依赖的任务,又能通过专业化 Agent 提高准确性和效率;
- 容错性和可扩展性的要求提高:企业需要系统在某个环节出错时,能快速恢复或降级处理,同时需要系统能根据业务量的变化,灵活扩展 Agent 的数量——MA-DAG 工作流的无环性和分布式部署特性,正好满足了这些要求。
问题描述
虽然 MA-DAG 工作流的概念听起来很美好,但在工程落地时,我们会遇到很多具体的、棘手的问题:
3.1 理论层面的问题
- 如何定义 MA-DAG 的数学模型? 普通 DAG 的数学模型已经很成熟,但 MA-DAG 涉及到 Agent、任务、资源、通信、容错等多个维度,需要建立一个更复杂、更全面的数学模型;
- 如何分析 MA-DAG 的性能? 比如如何计算 MA-DAG 的最短完成时间(Makespan)、资源利用率、Agent 负载均衡度?
- 如何处理 MA-DAG 中的不确定性? 比如 Agent 可能会超时、出错,任务的执行时间可能是随机的,如何在这种情况下保证系统的可靠性和稳定性?
3.2 工程层面的问题
- 如何设计专业化的 Agent? 比如 Agent 的职责边界是什么?Agent 需要具备哪些能力?Agent 的输入输出格式是什么?
- 如何构建 MA-DAG? 比如如何可视化地构建 DAG?如何避免循环依赖?如何定义任务的依赖关系(比如数据依赖、控制依赖、时间依赖)?
- 如何调度 MA-DAG? 比如如何选择下一个要执行的任务?如何分配 Agent 给任务?如何处理资源约束?如何处理任务超时和出错?
- 如何实现 Agent 之间的通信? 比如用什么通信协议?用什么消息格式?如何保证通信的可靠性和安全性?
- 如何监控和运维 MA-DAG 工作流系统? 比如如何监控 Agent 的状态?如何监控任务的执行情况?如何做日志收集和分析?如何做告警和故障恢复?
问题解决
本文将重点解决工程层面的问题,同时会给出理论层面的基础框架和分析方法:
-
针对理论层面的问题:
- 在第三章,我们会用图论、概率论、排队论的语言,建立 MA-DAG 的数学模型;
- 在第三章和第四章,我们会介绍如何计算 MA-DAG 的最短完成时间、资源利用率等性能指标;
- 在第四章,我们会介绍如何处理任务超时和出错的容错调度算法。
-
针对工程层面的问题:
- 在第二章,我们会详细讲解如何设计专业化的 Agent;
- 在第二章和第六章,我们会讲解如何用 YAML 文件和 Python 代码构建 MA-DAG,以及如何避免循环依赖;
- 在第四章和第六章,我们会详细讲解5种工程级的调度算法,并提供完整的 Python 源代码;
- 在第六章,我们会讲解如何用 Redis Stream 实现 Agent 之间的通信;
- 在第七章,我们会给出监控和运维 MA-DAG 工作流系统的最佳实践。
边界与外延
为了避免概念混淆,我们需要明确MA-DAG 工作流的边界——即它是什么,不是什么,以及它和其他相关概念的关系:
5.1 MA-DAG 工作流 vs 普通 DAG 工作流
| 对比维度 | 普通 DAG 工作流 | MA-DAG 工作流 |
|---|---|---|
| 执行单元 | 普通的函数/脚本/容器化任务 | 专业化的、自主的、交互的智能 Agent |
| 依赖关系 | 主要是数据依赖和控制依赖 | 除了数据依赖和控制依赖,还可能有 Agent 通信依赖 |
| 调度逻辑 | 主要考虑拓扑排序、资源约束、优先级 | 除了普通调度逻辑,还要考虑 Agent 注册发现、负载均衡、容错、通信协调 |
| 不确定性 | 较低(任务执行时间、结果通常可预测) | 较高(Agent 可能超时、出错,结果可能不确定) |
| 扩展性 | 主要是任务级的扩展(增加更多任务) | 主要是 Agent 级的扩展(增加更多 Agent) |
| 典型应用场景 | ETL 流水线、CI/CD 流水线、批量数据处理 | 复杂 AI 协作任务、跨领域业务流程自动化、无人系统 |
简单来说:普通 DAG 工作流是“自动化的任务流水线”,而 MA-DAG 工作流是“自动化的智能 Agent 协作流水线”。
5.2 MA-DAG 工作流 vs 单 Agent 链式调用
| 对比维度 | 单 Agent 链式调用 | MA-DAG 工作流 |
|---|---|---|
| Agent 数量 | 1个(通用 Agent) | N个(专业化 Agent,N≥2) |
| 执行方式 | 串行执行(一步一步来) | 并行/串行混合执行(无依赖的任务可以并行) |
| 能力范围 | 受限于单个 Agent 的能力 | 可以整合多个 Agent 的能力,解决更复杂的问题 |
| 效率 | 较低(串行执行浪费时间) | 较高(并行执行提高效率) |
| 容错性 | 较差(某个步骤出错,整个流程都得重来) | 较好(可以针对单个任务/Agent 做容错处理) |
| 典型示例 | LangChain 的 Sequential Chain | 本文开头的无人AI 咖啡甜品站 |
简单来说:单 Agent 链式调用是“一个人包办所有事情”,而 MA-DAG 工作流是“一群专业的人分工协作”。
5.3 MA-DAG 工作流 vs AutoGen 的“群体对话”
| 对比维度 | AutoGen 的群体对话 | MA-DAG 工作流 |
|---|---|---|
| 流程建模方式 | 自然语言对话、角色定义、规则约束(无严格的拓扑结构) | DAG(严格的有向无环拓扑结构) |
| 执行方式 | 异步/同步对话,Agent 可以自由发言(不确定性高) | 严格按照 DAG 的拓扑顺序执行(不确定性低) |
| 依赖关系 | 隐式的(通过对话内容体现) | 显式的(通过 DAG 的边体现) |
| 效率 | 较低(可能会有冗余对话) | 较高(无冗余对话,无依赖的任务可以并行) |
| 可控性 | 较低(很难预测 Agent 接下来会说什么、做什么) | 较高(可以严格控制任务的执行顺序和 Agent 的行为) |
| 典型应用场景 | 创意写作、头脑风暴、开放式问题讨论 | 复杂但结构化的业务流程自动化、无人系统、批量处理 |
简单来说:AutoGen 的群体对话是“一群人开会讨论,自由发言”,而 MA-DAG 工作流是“一群人按照严格的 SOP 分工协作”。
5.4 MA-DAG 工作流的外延
MA-DAG 工作流的外延非常丰富,目前比较热门的研究和应用方向包括:
- 自适应 MA-DAG(Adaptive MA-DAG):系统能根据环境的变化(比如业务量的变化、Agent 的状态变化、任务执行时间的变化),动态调整 DAG 的拓扑结构和调度策略;
- 联邦 MA-DAG(Federated MA-DAG):多个 MA-DAG 工作流系统分布在不同的组织或地理位置,它们通过联邦学习或联邦通信的方式,协同完成一个更大的任务,同时保护各自的数据隐私;
- 强化学习驱动的 MA-DAG(RL-driven MA-DAG):用强化学习算法优化 DAG 的拓扑结构、调度策略、Agent 的行为,以提高系统的性能;
- MA-DAG 与知识图谱的结合:用知识图谱补充 Agent 的背景知识,用 MA-DAG 工作流自动化知识图谱的构建、更新、推理;
- MA-DAG 与机器人流程自动化(RPA)的结合:用 RPA 机器人替代或补充 Agent 执行一些需要物理操作的任务,用 MA-DAG 工作流整合 RPA 机器人和 AI Agent 的能力。
二、 MA-DAG 的概念结构与核心要素
核心概念
在第一章,我们已经把 MA-DAG 工作流定义为“以 DAG 为流程建模工具、以多个专业化 Agent 为流程执行单元、以工作流引擎为调度核心的多智能体协作系统”。
在本章,我们将把 MA-DAG 工作流的概念结构拆解成四大核心组件:
- Agent(智能体):流程的执行单元;
- Task(任务):流程的基本执行步骤;
- Edge(边/依赖关系):连接任务的桥梁,定义任务之间的执行顺序和数据传递规则;
- Scheduler/Orchestrator(调度器/编排器):MA-DAG 工作流的核心大脑,负责流程定义的解析、任务的调度执行、Agent 的管理、异常的处理等。
接下来,我们将详细讲解每个核心组件的定义、核心属性、类型、设计方法,并用Mermaid ER图、交互图、属性对比表把它们的关系讲得明明白白。
概念结构与核心要素组成
1.1 核心要素 ER 实体关系图
首先,我们用 Mermaid ER图 来展示四大核心组件之间的关系:
1.2 核心要素交互关系图
接下来,我们用 Mermaid 交互图(Sequence Diagram)来展示四大核心组件在一次简单的 MA-DAG 工作流执行过程中的交互:
假设我们的 MA-DAG 工作流只有两个任务:
- 任务 A:产品顾问(Agent 1 执行)
- 任务 B:咖啡研磨(Agent 2 执行)
- 边 A→B:数据依赖(任务 A 的输出作为任务 B 的输入)
交互流程如下:
核心要素详解
接下来,我们将详细讲解每个核心要素的定义、核心属性、类型、设计方法:
2.1 Agent(智能体)
2.1.1 定义
Agent 是 MA-DAG 工作流的执行单元,它是一个自主的、交互的、智能的软件实体(或硬件实体,比如 RPA 机器人、无人机),负责执行一个或多个特定的任务。
2.1.2 核心属性
根据第一章的 Wooldridge & Jennings 经典定义,以及工程落地的实际需求,我们总结出 Agent 的10个核心属性:
| 属性名称 | 定义 | 示例 |
|---|---|---|
| agent_id | Agent 的唯一标识符,用于区分不同的 Agent | "agent_001_product_consultant" |
| agent_name | Agent 的人类可读名称 | "产品顾问 Agent" |
| agent_type | Agent 的类型,用于区分不同能力的 Agent | LLM_AGENT, TOOL_AGENT, HUMAN_AGENT, HYBRID_AGENT |
| capabilities | Agent 的能力列表,用于描述 Agent 能执行哪些任务 | ["product_consultation", "order_confirmation"] |
| status | Agent 的当前状态,用于调度器判断是否能分配任务给该 Agent | REGISTERED, ONLINE, OFFLINE, BUSY, ERROR |
| current_load | Agent 当前正在执行的任务数,用于负载均衡 | 0(空闲),2(正在执行2个任务) |
| max_load | Agent 最大能并发执行的任务数,用于资源约束调度 | 5(最多同时执行5个任务) |
| input_schema | Agent 输入数据的 Schema(JSON Schema 格式),用于数据校验和转换 | 要求输入包含 user_id 和 order_details 字段 |
| output_schema | Agent 输出数据的 Schema(JSON Schema 格式),用于数据校验和转换 | 要求输出包含 confirmed_order 和 special_requirements 字段 |
| agent_config | Agent 的配置信息,比如 LLM 模型名称、API 密钥、最大并发数、超时时间等 | {"llm_model": "gpt-4o", "api_key": "sk-xxx", "timeout": 30, "max_retries": 3} |
2.1.3 类型
根据 Agent 的实现方式和能力范围,我们可以将 Agent 分为以下5种常见类型:
| Agent 类型 | 定义 | 实现方式 | 典型应用场景 |
|---|---|---|---|
| LLM Agent | 主要基于大语言模型构建的 Agent,能理解自然语言、生成文本、调用工具、进行推理 | LangChain, AutoGen, LlamaIndex, 自定义框架 | 产品顾问、文案撰写、代码生成、初步诊断 |
| Tool Agent | 主要基于传统软件工具或 API 构建的 Agent,能执行特定的软件操作或硬件操作 | 封装 REST API、数据库操作、RPA 机器人脚本、硬件驱动程序 | 咖啡研磨、萃取浓缩液、发送邮件、查询数据库、控制无人机 |
| Human Agent | 由人类操作员扮演的 Agent,用于执行那些 AI 暂时无法完成或需要人类监督的任务 | Web 界面、移动端 App、企业微信/钉钉机器人 | 复杂问题的最终决策、高风险任务的审核、创意内容的润色、故障的人工排查 |
| Hybrid Agent | 结合了 LLM Agent、Tool Agent 和/或 Human Agent 能力的 Agent | 先由 LLM Agent 理解需求,然后调用 Tool Agent 执行操作,最后由 Human Agent 审核 | 金融贷款审批、医疗最终诊断、法律合同审查、复杂订单的处理 |
| Swarm Agent | 由多个小型 Agent 组成的群体 Agent,能通过协作完成更复杂的任务 | 基于 AutoGen Swarm、Custom Swarm 框架构建 | 大规模文本分类、分布式数据处理、复杂优化问题的求解 |
2.1.4 工程级设计方法
设计一个工程级的、可落地的、专业化的 Agent,需要遵循以下6个核心步骤:
-
明确 Agent 的职责边界:
- 遵循单一职责原则(Single Responsibility Principle, SRP):一个 Agent 只负责一个或少数几个高度相关的任务,不要让 Agent 承担太多职责;
- 比如不要设计一个“全能咖啡师 Agent”,而是设计“产品顾问 Agent”、“磨豆师 Agent”、“萃取师 Agent”、“调饮师 Agent”、“裱花师 Agent”、“质检员 Agent”等专业化 Agent。
-
定义 Agent 的能力列表:
- 用清晰、明确、可衡量的语言描述 Agent 能执行哪些任务;
- 比如“产品顾问 Agent”的能力列表可以是:
["product_consultation: 根据用户的模糊需求推荐合适的咖啡和甜品", "order_confirmation: 确认用户订单的所有细节(比如杯型、冰度、糖度、特殊要求)", "question_answering: 回答用户关于产品、价格、配送时间等方面的问题"]
-
定义 Agent 的输入输出 Schema:
- 使用JSON Schema 格式严格定义 Agent 的输入输出数据结构,这样可以:
- 实现数据的自动校验,避免因数据格式错误导致任务失败;
- 实现数据的自动转换,方便不同 Agent 之间的数据传递;
- 实现 Agent 的自动发现,调度器可以根据任务的输入输出 Schema 匹配合适的 Agent;
- 比如“产品顾问 Agent”的输入 Schema 可以是:
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Product Consultant Input", "type": "object", "properties": { "user_id": { "type": "string", "description": "用户唯一标识符" }, "user_query": { "type": "string", "description": "用户的原始需求或问题" }, "context": { "type": "object", "description": "上下文信息(比如用户的历史订单、产品目录、当前促销活动)", "properties": { "historical_orders": { "type": "array", "items": { "type": "object" } }, "product_catalog": { "type": "array", "items": { "type": "object" } }, "promotions": { "type": "array", "items": { "type": "object" } } }, "required": ["product_catalog"] } }, "required": ["user_id", "user_query", "context"] } - “产品顾问 Agent”的输出 Schema 可以是:
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Product Consultant Output", "type": "object", "properties": { "task_type": { "type": "string", "enum": ["product_recommendation", "order_confirmation", "question_answer"], "description": "本次执行的任务类型" }, "result": { "type": "object", "description": "任务执行结果" } }, "required": ["task_type", "result"] }
- 使用JSON Schema 格式严格定义 Agent 的输入输出数据结构,这样可以:
-
实现 Agent 的核心逻辑:
- 根据 Agent 的类型,选择合适的实现方式:
- 如果是 LLM Agent:可以使用 LangChain、AutoGen、LlamaIndex 等框架快速构建,核心逻辑包括提示词工程(Prompt Engineering)、工具调用(Tool Calling)、记忆管理(Memory Management)、推理链(Chain of Thought)等;
- 如果是 Tool Agent:可以封装现有的 REST API、数据库操作、RPA 机器人脚本、硬件驱动程序,核心逻辑包括输入数据的解析、工具的调用、输出数据的格式化、异常的处理等;
- 如果是 Human Agent:可以开发一个 Web 界面或移动端 App,让人类操作员可以查看任务、输入结果、提交审核意见,核心逻辑包括任务的推送、结果的收集、状态的更新等;
- 比如“产品顾问 Agent”(LLM Agent)的核心提示词可以是:
PRODUCT_CONSULTANT_PROMPT = """ 你是一位专业的无人AI 咖啡甜品站的产品顾问,你的职责是: 1. 根据用户的模糊需求推荐合适的咖啡和甜品; 2. 确认用户订单的所有细节(比如杯型、冰度、糖度、特殊要求); 3. 回答用户关于产品、价格、配送时间等方面的问题。 你的输入数据包含: - user_id:用户唯一标识符; - user_query:用户的原始需求或问题; - context:上下文信息,包含: - historical_orders:用户的历史订单(可选); - product_catalog:产品目录(必填),包含所有咖啡和甜品的名称、价格、描述、配料、可选配置等信息; - promotions:当前促销活动(可选)。 你的输出数据必须严格符合以下 JSON Schema 格式: {output_schema} 请你先分析用户的需求或问题,然后选择合适的任务类型,最后输出符合要求的结果。 如果用户的需求不明确,请不要猜测,而是在 result 中要求用户补充更多信息。 """
- 根据 Agent 的类型,选择合适的实现方式:
-
实现 Agent 的通信与心跳机制:
- Agent 需要与调度器和任务队列进行通信,主要通信内容包括:
- 注册与注销:Agent 启动时向调度器注册,停止时向调度器注销;
- 心跳:Agent 定期(比如每10秒)向调度器发送心跳,报告自己的当前状态(ONLINE/OFFLINE/BUSY/ERROR)和当前负载;
- 任务的接收与执行结果的报告:Agent 从任务队列中接收任务,执行完成后向任务队列报告执行结果(SUCCESS/FAILED/TIMEOUT);
- 通信协议的选择:
- 注册、注销、心跳:可以使用 REST API 或 gRPC(如果对性能要求较高);
- 任务的接收与执行结果的报告:可以使用 消息队列(比如 Redis Stream、Kafka、RabbitMQ),这样可以实现异步通信,提高系统的可靠性和可扩展性;
- 比如 Agent 的心跳机制的 Python 伪代码可以是:
import time import requests class Agent: def __init__(self, agent_id, scheduler_url, heartbeat_interval=10): self.agent_id = agent_id self.scheduler_url = scheduler_url self.heartbeat_interval = heartbeat_interval self.status = "REGISTERED" self.current_load = 0 self.max_load = 5 self.running = False def start(self): self.running = True # 启动时向调度器注册 self.register() # 启动心跳线程 import threading heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True) heartbeat_thread.start() # 启动任务接收线程 task_thread = threading.Thread(target=self.receive_tasks, daemon=True) task_thread.start() print(f"Agent {self.agent_id} started successfully") def stop(self): self.running = False # 停止时向调度器注销 self.unregister() print(f"Agent {self.agent_id} stopped successfully") def register(self): try: response = requests.post( f"{self.scheduler_url}/agents/register", json={ "agent_id": self.agent_id, "agent_name": "产品顾问 Agent", "agent_type": "LLM_AGENT", "capabilities": ["product_consultation", "order_confirmation", "question_answering"], "max_load": self.max_load, "input_schema": INPUT_SCHEMA, "output_schema": OUTPUT_SCHEMA, "agent_config": AGENT_CONFIG }, timeout=5 ) response.raise_for_status() self.status = "ONLINE" print(f"Agent {self.agent_id} registered successfully") except Exception as e: print(f"Failed to register Agent {self.agent_id}: {e}") self.status = "ERROR" def unregister(self): try: response = requests.post( f"{self.scheduler_url}/agents/unregister", json={"agent_id": self.agent_id}, timeout=5 ) response.raise_for_status() print(f"Agent {self.agent_id} unregistered successfully") except Exception as e: print(f"Failed to unregister Agent {self.agent_id}: {e}") def send_heartbeat(self): while self.running: try: response = requests.post( f"{self.scheduler_url}/agents/heartbeat", json={ "agent_id": self.agent_id, "status": self.status, "current_load": self.current_load }, timeout=5 ) response.raise_for_status() # print(f"Agent {self.agent_id} sent heartbeat successfully") except Exception as e: print(f"Failed to send heartbeat for Agent {self.agent_id}: {e}") self.status = "OFFLINE" time.sleep(self.heartbeat_interval) def receive_tasks
- Agent 需要与调度器和任务队列进行通信,主要通信内容包括:
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)