作者:WangQiaomei版本:1.0(2026/3/16)

一、核心概念解析

1.1 Orchestrator-Worker 模式是什么?

Orchestrator-Worker(编排器 - 工作者)是 LangGraph 中极具灵活性的工作流模式,核心逻辑是 **“先规划、再执行、最后汇总”**:

  • 编排器(Orchestrator):AI 根据输入动态拆分任务(比如写报告时自动规划章节数量),而非预先固定任务列表
  • 工作者(Worker):按编排器的规划,逐个执行子任务(比如为每个章节生成内容)
  • 合成器(Synthesizer):汇总所有子任务结果,输出最终产物(比如整合所有章节成完整报告)

1.2 与 Routing 模式的核心区别

表格

模式 核心特点 适用场景
Routing 任务固定(如选 “写故事 / 写笑话 / 写诗” 其一),仅做选择执行 任务类型明确、数量固定的场景
Orchestrator-Worker 任务数量不固定,由 AI 动态决定拆分数量 报告撰写、多文件代码更新、未知数量子任务的场景

1.3 实现目标

输入任意报告主题(如 “人工智能的发展历史”),程序自动完成:

  1. AI 智能规划报告章节(2-3 个)
  2. 为每个章节生成针对性内容
  3. 自动汇总所有章节,输出格式规整的完整报告

1.4 工作流流程图

plaintext

用户输入: "写一篇关于人工智能的报告"
              ↓
    ┌─────────────────────┐
    │   编排器 (Orchestrator)  │  ← AI 规划:需要写3个章节
    │   拆分任务              │
    └─────────────────────┘
              ↓
    ┌───┬───┬───┐
    │   │   │   │
    ↓   ↓   ↓   ↓
 章节1 章节2 章节3   ← 工作者(Worker)分别执行
    │   │   │   │
    └───┴───┴───┘
              ↓
         汇总结果(合成器)

流程执行链路:START → orchestrator → worker → synthesizer → END

二、环境准备

2.1 安装依赖库

执行以下命令安装所需 Python 库,建议在虚拟环境中操作:

bash

运行

# 升级pip
python.exe -m pip install --upgrade pip
# 安装核心依赖
pip install --upgrade langchain langchain-core langchain-openai langgraph pydantic

2.2 大模型初始化说明

本文示例使用通用 OpenAI 兼容接口的大模型,你可根据实际使用的模型(如 OpenAI GPT-4、智谱 AI、通义千问等)调整初始化参数,核心代码框架完全通用:

python

运行

from langchain_openai import ChatOpenAI

# 通用大模型初始化模板(按需替换参数)
llm = ChatOpenAI(
    model="你使用的模型名称",  # 如gpt-4o、glm-4、qwen-plus等
    api_key="你的API密钥",     # 替换为自己的API Key
    base_url="模型接口地址"    # 非OpenAI官方模型需填写对应base_url
)

三、分步实现详解

步骤 1:定义结构化输出模板(Schema)

核心目的:让 AI 返回固定格式的结构化数据,而非自由文本,方便后续程序直接解析使用。

1.1 定义单个章节结构

python

运行

from pydantic import BaseModel, Field
from typing import List

# 单个章节的结构定义
class Section(BaseModel):
    name: str = Field(description="Name for this section of the report.")  # 章节名称
    description: str = Field(description="Brief overview of the main topics and concepts to be covered in this section.")  # 章节描述

作用:强制 AI 返回的每个章节必须包含 “名称” 和 “描述” 两个字段,避免格式混乱。

1.2 定义多章节列表结构

python

运行

# 多个章节的列表结构
class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report.")  # 章节列表

作用:让 AI 返回的章节规划以列表形式呈现,方便后续遍历执行。

1.3 绑定结构化输出

python

运行

# 将结构化模板绑定到LLM,确保输出符合定义
planner = llm.with_structured_output(Sections)
为什么需要结构化?
  • 非结构化输出:AI 可能返回 “我建议分三章写,第一章讲起源...”,需手动解析字符串,易出错
  • 结构化输出:AI 直接返回Sections(sections=[Section(name="早期发展", ...), ...]),可直接用for循环遍历,代码更健壮

步骤 2:定义工作流状态(State)

State 是 LangGraph 的核心,用于存储工作流执行过程中所有数据,相当于 “全局数据容器”:

python

运行

from typing_extensions import TypedDict
from typing import Annotated, List
import operator

class State(TypedDict):
    topic: str                                              # 用户输入的报告主题
    sections: List[Section]                                 # 编排器规划的章节列表
    completed_sections: Annotated[List[str], operator.add]  # 已完成的章节内容(累加存储)
    final_report: str                                       # 最终生成的完整报告
  • operator.add:确保多轮执行中,completed_sections能累加存储所有章节内容,而非覆盖

步骤 3:实现核心节点函数

3.1 编排器(Orchestrator)- 规划章节

python

运行

from langchain.messages import HumanMessage, SystemMessage

def orchestrator(state: State):
    """
    编排器节点:根据用户输入的主题,规划报告章节
    :param state: 工作流状态(包含topic字段)
    :return: 更新后的状态(添加sections字段)
    """
    # 调用LLM生成章节规划
    report_sections = planner.invoke(
        [
            # 系统提示:限定生成2-3个章节
            SystemMessage(content="Generate a plan for a report. Return a list of 2-3 sections."),
            # 用户输入:传入报告主题
            HumanMessage(content=f"Topic: {state['topic']}"),
        ]
    )
    # 返回规划的章节列表,更新状态
    return {"sections": report_sections.sections}
3.2 工作者(Worker)- 生成章节内容

python

运行

def worker(state: State):
    """
    工作者节点:遍历章节列表,为每个章节生成内容
    :param state: 工作流状态(包含sections字段)
    :return: 更新后的状态(添加completed_sections字段)
    """
    completed = []
    # 遍历编排器规划的每个章节
    for section in state["sections"]:
        # 为当前章节生成内容
        result = llm.invoke(
            [
                SystemMessage(content="Write a short paragraph (2-3 sentences) for the given section."),
                HumanMessage(content=f"Section: {section.name}\nDescription: {section.description}"),
            ]
        )
        # 按Markdown格式存储章节内容
        completed.append(f"## {section.name}\n{result.content}\n")
    # 返回已完成的章节内容,更新状态
    return {"completed_sections": completed}
3.3 合成器(Synthesizer)- 汇总最终报告

python

运行

def synthesizer(state: State):
    """
    合成器节点:汇总所有章节内容,生成最终报告
    :param state: 工作流状态(包含completed_sections字段)
    :return: 更新后的状态(添加final_report字段)
    """
    # 拼接最终报告(标题+所有章节)
    final_report = f"# Report: {state['topic']}\n\n"
    final_report += "\n".join(state["completed_sections"])
    # 返回最终报告,更新状态
    return {"final_report": final_report}

步骤 4:构建并编译工作流

python

运行

from langgraph.graph import StateGraph, START, END

# 1. 初始化状态图(传入State定义)
builder = StateGraph(State)

# 2. 添加节点(将函数绑定为工作流节点)
builder.add_node("orchestrator", orchestrator)  # 编排器节点
builder.add_node("worker", worker)              # 工作者节点
builder.add_node("synthesizer", synthesizer)    # 合成器节点

# 3. 添加执行边(定义节点执行顺序)
builder.add_edge(START, "orchestrator")         # 开始 → 编排器
builder.add_edge("orchestrator", "worker")      # 编排器 → 工作者
builder.add_edge("worker", "synthesizer")       # 工作者 → 合成器
builder.add_edge("synthesizer", END)            # 合成器 → 结束

# 4. 编译工作流(生成可执行的chain)
workflow = builder.compile()

步骤 5:执行工作流并输出结果

python

运行

def safe_print(text):
    """
    兼容GBK编码的打印函数,避免特殊字符导致的打印错误
    :param text: 要打印的文本
    """
    print(text.encode('gbk', errors='ignore').decode('gbk'), end="", flush=True)

if __name__ == "__main__":
    # 执行工作流(传入初始状态:主题+空的已完成章节列表)
    state = workflow.invoke({"topic": "人工智能的发展历史", "completed_sections": []})
    # 输出最终报告
    safe_print(state["final_report"])

四、完整可运行源码

python

运行

# -*- coding: utf-8 -*-
"""
langchain_test6_orchestrator-worker
~~~~~~~~~~~~

:copyright: (c) 2026
:authors: WangQiaomei
:version: 1.0 of 2026/3/16
"""
import warnings



# ====================== 1. 导入核心库 ======================
from typing import Annotated, List
import operator



# ====================== 2. 定义结构化输出模板 ======================
class Section(BaseModel):
    name: str = Field(description="Name for this section of the report.")
    description: str = Field(description="Brief overview of the main topics and concepts to be covered in this section.")

class Sections(BaseModel):
    sections: List[Section] = Field(description="Sections of the report.")

# 绑定结构化输出
planner = llm.with_structured_output(Sections)

# ====================== 3. 定义工作流状态 ======================
class State(TypedDict):
    topic: str                                              # 用户输入的主题
    sections: List[Section]                                 # 编排器规划的章节列表
    completed_sections: Annotated[List[str], operator.add]  # 已完成的章节内容
    final_report: str                                       # 最终报告

# ====================== 4. 实现节点函数 ======================
def orchestrator(state: State):
    """编排器:分析主题,规划报告章节"""
    report_sections = planner.invoke(
        [
            SystemMessage(content="Generate a plan for a report. Return a list of 2-3 sections."),
            HumanMessage(content=f"Topic: {state['topic']}"),
        ]
    )
    return {"sections": report_sections.sections}

def worker(state: State):
    """工作者:为每个章节生成内容"""
    completed = []
    for section in state["sections"]:
        result = llm.invoke(
            [
                SystemMessage(content="Write a short paragraph (2-3 sentences) for the given section."),
                HumanMessage(content=f"Section: {section.name}\nDescription: {section.description}"),
            ]
        )
        completed.append(f"## {section.name}\n{result.content}\n")
    return {"completed_sections": completed}

def synthesizer(state: State):
    """合成器:汇总所有章节为最终报告"""
    final_report = f"# Report: {state['topic']}\n\n"
    final_report += "\n".join(state["completed_sections"])
    return {"final_report": final_report}

# ====================== 5. 构建工作流 ======================
builder = StateGraph(State)

# 添加节点
builder.add_node("orchestrator", orchestrator)
builder.add_node("worker", worker)
builder.add_node("synthesizer", synthesizer)

# 添加执行边
builder.add_edge(START, "orchestrator")
builder.add_edge("orchestrator", "worker")
builder.add_edge("worker", "synthesizer")
builder.add_edge("synthesizer", END)

# 编译工作流
workflow = builder.compile()



if __name__ == "__main__":
    # 执行工作流
    state = workflow.invoke({"topic": "人工智能的发展历史", "completed_sections": []})
    # 输出结果
    safe_print(state["final_report"])

五、预期输出示例

plaintext

# Report: 人工智能的发展历史

## 早期发展
人工智能的概念起源于20世纪50年代,1956年达特茅斯会议正式提出了"人工智能"这一术语,早期研究聚焦于逻辑推理和符号系统,如通用问题求解器等。

## 机器学习时代
21世纪初,机器学习开始快速发展,随着数据量的增长和计算能力的提升,支持向量机、决策树等算法逐步落地,让AI从理论走向实际应用。

## 深度学习革命
2010年代,深度学习带来了重大突破,卷积神经网络(CNN)、循环神经网络(RNN)等模型的出现,推动AI在计算机视觉、自然语言处理等领域实现跨越式发展。

六、关键知识点总结

  1. Orchestrator-Worker 核心:AI 动态拆分任务数量,而非固定任务列表,适配报告撰写、多文件处理等灵活场景;
  2. 结构化输出的价值:通过 Pydantic 定义 Schema,让 AI 返回可直接解析的结构化数据,避免自由文本解析的繁琐与错误;
  3. LangGraph 状态管理:State 作为全局数据容器,通过operator.add实现数据累加,保障工作流数据流转的完整性。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐