AI 编程3:LangGraph 实现 Orchestrator-Worker(编排器 - 工作者)工作流-test6
作者:WangQiaomei版本:1.0(2026/3/16)
一、核心概念解析
1.1 Orchestrator-Worker 模式是什么?
Orchestrator-Worker(编排器 - 工作者)是 LangGraph 中极具灵活性的工作流模式,核心逻辑是 **“先规划、再执行、最后汇总”**:
- 编排器(Orchestrator):AI 根据输入动态拆分任务(比如写报告时自动规划章节数量),而非预先固定任务列表
- 工作者(Worker):按编排器的规划,逐个执行子任务(比如为每个章节生成内容)
- 合成器(Synthesizer):汇总所有子任务结果,输出最终产物(比如整合所有章节成完整报告)
1.2 与 Routing 模式的核心区别
表格
| 模式 | 核心特点 | 适用场景 |
|---|---|---|
| Routing | 任务固定(如选 “写故事 / 写笑话 / 写诗” 其一),仅做选择执行 | 任务类型明确、数量固定的场景 |
| Orchestrator-Worker | 任务数量不固定,由 AI 动态决定拆分数量 | 报告撰写、多文件代码更新、未知数量子任务的场景 |
1.3 实现目标
输入任意报告主题(如 “人工智能的发展历史”),程序自动完成:
- AI 智能规划报告章节(2-3 个)
- 为每个章节生成针对性内容
- 自动汇总所有章节,输出格式规整的完整报告
1.4 工作流流程图
plaintext
用户输入: "写一篇关于人工智能的报告"
↓
┌─────────────────────┐
│ 编排器 (Orchestrator) │ ← AI 规划:需要写3个章节
│ 拆分任务 │
└─────────────────────┘
↓
┌───┬───┬───┐
│ │ │ │
↓ ↓ ↓ ↓
章节1 章节2 章节3 ← 工作者(Worker)分别执行
│ │ │ │
└───┴───┴───┘
↓
汇总结果(合成器)
流程执行链路:START → orchestrator → worker → synthesizer → END
二、环境准备
2.1 安装依赖库
执行以下命令安装所需 Python 库,建议在虚拟环境中操作:
bash
运行
# 升级pip
python.exe -m pip install --upgrade pip
# 安装核心依赖
pip install --upgrade langchain langchain-core langchain-openai langgraph pydantic
2.2 大模型初始化说明
本文示例使用通用 OpenAI 兼容接口的大模型,你可根据实际使用的模型(如 OpenAI GPT-4、智谱 AI、通义千问等)调整初始化参数,核心代码框架完全通用:
python
运行
from langchain_openai import ChatOpenAI
# 通用大模型初始化模板(按需替换参数)
llm = ChatOpenAI(
model="你使用的模型名称", # 如gpt-4o、glm-4、qwen-plus等
api_key="你的API密钥", # 替换为自己的API Key
base_url="模型接口地址" # 非OpenAI官方模型需填写对应base_url
)
三、分步实现详解
步骤 1:定义结构化输出模板(Schema)
核心目的:让 AI 返回固定格式的结构化数据,而非自由文本,方便后续程序直接解析使用。
1.1 定义单个章节结构
python
运行
from pydantic import BaseModel, Field
from typing import List
# 单个章节的结构定义
class Section(BaseModel):
name: str = Field(description="Name for this section of the report.") # 章节名称
description: str = Field(description="Brief overview of the main topics and concepts to be covered in this section.") # 章节描述
作用:强制 AI 返回的每个章节必须包含 “名称” 和 “描述” 两个字段,避免格式混乱。
1.2 定义多章节列表结构
python
运行
# 多个章节的列表结构
class Sections(BaseModel):
sections: List[Section] = Field(description="Sections of the report.") # 章节列表
作用:让 AI 返回的章节规划以列表形式呈现,方便后续遍历执行。
1.3 绑定结构化输出
python
运行
# 将结构化模板绑定到LLM,确保输出符合定义
planner = llm.with_structured_output(Sections)
为什么需要结构化?
- 非结构化输出:AI 可能返回 “我建议分三章写,第一章讲起源...”,需手动解析字符串,易出错
- 结构化输出:AI 直接返回
Sections(sections=[Section(name="早期发展", ...), ...]),可直接用for循环遍历,代码更健壮
步骤 2:定义工作流状态(State)
State 是 LangGraph 的核心,用于存储工作流执行过程中所有数据,相当于 “全局数据容器”:
python
运行
from typing_extensions import TypedDict
from typing import Annotated, List
import operator
class State(TypedDict):
topic: str # 用户输入的报告主题
sections: List[Section] # 编排器规划的章节列表
completed_sections: Annotated[List[str], operator.add] # 已完成的章节内容(累加存储)
final_report: str # 最终生成的完整报告
operator.add:确保多轮执行中,completed_sections能累加存储所有章节内容,而非覆盖
步骤 3:实现核心节点函数
3.1 编排器(Orchestrator)- 规划章节
python
运行
from langchain.messages import HumanMessage, SystemMessage
def orchestrator(state: State):
"""
编排器节点:根据用户输入的主题,规划报告章节
:param state: 工作流状态(包含topic字段)
:return: 更新后的状态(添加sections字段)
"""
# 调用LLM生成章节规划
report_sections = planner.invoke(
[
# 系统提示:限定生成2-3个章节
SystemMessage(content="Generate a plan for a report. Return a list of 2-3 sections."),
# 用户输入:传入报告主题
HumanMessage(content=f"Topic: {state['topic']}"),
]
)
# 返回规划的章节列表,更新状态
return {"sections": report_sections.sections}
3.2 工作者(Worker)- 生成章节内容
python
运行
def worker(state: State):
"""
工作者节点:遍历章节列表,为每个章节生成内容
:param state: 工作流状态(包含sections字段)
:return: 更新后的状态(添加completed_sections字段)
"""
completed = []
# 遍历编排器规划的每个章节
for section in state["sections"]:
# 为当前章节生成内容
result = llm.invoke(
[
SystemMessage(content="Write a short paragraph (2-3 sentences) for the given section."),
HumanMessage(content=f"Section: {section.name}\nDescription: {section.description}"),
]
)
# 按Markdown格式存储章节内容
completed.append(f"## {section.name}\n{result.content}\n")
# 返回已完成的章节内容,更新状态
return {"completed_sections": completed}
3.3 合成器(Synthesizer)- 汇总最终报告
python
运行
def synthesizer(state: State):
"""
合成器节点:汇总所有章节内容,生成最终报告
:param state: 工作流状态(包含completed_sections字段)
:return: 更新后的状态(添加final_report字段)
"""
# 拼接最终报告(标题+所有章节)
final_report = f"# Report: {state['topic']}\n\n"
final_report += "\n".join(state["completed_sections"])
# 返回最终报告,更新状态
return {"final_report": final_report}
步骤 4:构建并编译工作流
python
运行
from langgraph.graph import StateGraph, START, END
# 1. 初始化状态图(传入State定义)
builder = StateGraph(State)
# 2. 添加节点(将函数绑定为工作流节点)
builder.add_node("orchestrator", orchestrator) # 编排器节点
builder.add_node("worker", worker) # 工作者节点
builder.add_node("synthesizer", synthesizer) # 合成器节点
# 3. 添加执行边(定义节点执行顺序)
builder.add_edge(START, "orchestrator") # 开始 → 编排器
builder.add_edge("orchestrator", "worker") # 编排器 → 工作者
builder.add_edge("worker", "synthesizer") # 工作者 → 合成器
builder.add_edge("synthesizer", END) # 合成器 → 结束
# 4. 编译工作流(生成可执行的chain)
workflow = builder.compile()
步骤 5:执行工作流并输出结果
python
运行
def safe_print(text):
"""
兼容GBK编码的打印函数,避免特殊字符导致的打印错误
:param text: 要打印的文本
"""
print(text.encode('gbk', errors='ignore').decode('gbk'), end="", flush=True)
if __name__ == "__main__":
# 执行工作流(传入初始状态:主题+空的已完成章节列表)
state = workflow.invoke({"topic": "人工智能的发展历史", "completed_sections": []})
# 输出最终报告
safe_print(state["final_report"])
四、完整可运行源码
python
运行
# -*- coding: utf-8 -*-
"""
langchain_test6_orchestrator-worker
~~~~~~~~~~~~
:copyright: (c) 2026
:authors: WangQiaomei
:version: 1.0 of 2026/3/16
"""
import warnings
# ====================== 1. 导入核心库 ======================
from typing import Annotated, List
import operator
# ====================== 2. 定义结构化输出模板 ======================
class Section(BaseModel):
name: str = Field(description="Name for this section of the report.")
description: str = Field(description="Brief overview of the main topics and concepts to be covered in this section.")
class Sections(BaseModel):
sections: List[Section] = Field(description="Sections of the report.")
# 绑定结构化输出
planner = llm.with_structured_output(Sections)
# ====================== 3. 定义工作流状态 ======================
class State(TypedDict):
topic: str # 用户输入的主题
sections: List[Section] # 编排器规划的章节列表
completed_sections: Annotated[List[str], operator.add] # 已完成的章节内容
final_report: str # 最终报告
# ====================== 4. 实现节点函数 ======================
def orchestrator(state: State):
"""编排器:分析主题,规划报告章节"""
report_sections = planner.invoke(
[
SystemMessage(content="Generate a plan for a report. Return a list of 2-3 sections."),
HumanMessage(content=f"Topic: {state['topic']}"),
]
)
return {"sections": report_sections.sections}
def worker(state: State):
"""工作者:为每个章节生成内容"""
completed = []
for section in state["sections"]:
result = llm.invoke(
[
SystemMessage(content="Write a short paragraph (2-3 sentences) for the given section."),
HumanMessage(content=f"Section: {section.name}\nDescription: {section.description}"),
]
)
completed.append(f"## {section.name}\n{result.content}\n")
return {"completed_sections": completed}
def synthesizer(state: State):
"""合成器:汇总所有章节为最终报告"""
final_report = f"# Report: {state['topic']}\n\n"
final_report += "\n".join(state["completed_sections"])
return {"final_report": final_report}
# ====================== 5. 构建工作流 ======================
builder = StateGraph(State)
# 添加节点
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)
builder.add_node("synthesizer", synthesizer)
# 添加执行边
builder.add_edge(START, "orchestrator")
builder.add_edge("orchestrator", "worker")
builder.add_edge("worker", "synthesizer")
builder.add_edge("synthesizer", END)
# 编译工作流
workflow = builder.compile()
if __name__ == "__main__":
# 执行工作流
state = workflow.invoke({"topic": "人工智能的发展历史", "completed_sections": []})
# 输出结果
safe_print(state["final_report"])
五、预期输出示例
plaintext
# Report: 人工智能的发展历史
## 早期发展
人工智能的概念起源于20世纪50年代,1956年达特茅斯会议正式提出了"人工智能"这一术语,早期研究聚焦于逻辑推理和符号系统,如通用问题求解器等。
## 机器学习时代
21世纪初,机器学习开始快速发展,随着数据量的增长和计算能力的提升,支持向量机、决策树等算法逐步落地,让AI从理论走向实际应用。
## 深度学习革命
2010年代,深度学习带来了重大突破,卷积神经网络(CNN)、循环神经网络(RNN)等模型的出现,推动AI在计算机视觉、自然语言处理等领域实现跨越式发展。
六、关键知识点总结
- Orchestrator-Worker 核心:AI 动态拆分任务数量,而非固定任务列表,适配报告撰写、多文件处理等灵活场景;
- 结构化输出的价值:通过 Pydantic 定义 Schema,让 AI 返回可直接解析的结构化数据,避免自由文本解析的繁琐与错误;
- LangGraph 状态管理:State 作为全局数据容器,通过
operator.add实现数据累加,保障工作流数据流转的完整性。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)