[LangChain中的Multi-Agent模式-01]SubAgent:集中编排视角下的上下文隔离与并行化实现
大事化小、小事化了是解决复杂问题的通用法则。对应一个复杂的任务,我们按照相应的法则对它进行分解和切割,得到一些规模较小的、相对独立的子任务。这些子任务可以独立实现,然后使用各种形式的胶水将它们粘合在一起。这些胶水的表现形式多种多样,从API层面讲,它们是依赖注入、回调注册和AOP;从消息通信层面上讲,有同步调用(比如HTTP/JSON、gRPC和Dubbo)、异步消息(各种消息队列)和事件总线;从数据流层面讲,它们是共享数据库/缓存、管道和文件系统;从控制流层面讲,它们是各种工作流引擎和各种辅助脚本;从云原生与基础设施层面讲,它们是Service Mesh和Serverless工作流等。Agent的设计亦是如此,当推理任务过于复杂,我们也会将任务进行拆解成子任务,并利用独立的Agent来实现,这个系列介绍的四种模式就是粘合这些Agent的胶水。
1. 为什么需要采用Multi-Agent
当开发者说他们需要Multi-Agent时,他们通常指的是以下一项或多项功能:
- 上下文管理:在不使模型上下文窗口过载的情况下提供专业知识。如果上下文无限且延迟为零,您可以将所有知识倾倒在单个提示中——但事实并非如此,因此您需要使用模式来有选择地呈现相关信息;
- 分布式开发:允许不同的团队独立开发和维护各项功能,并将它们组合成一个边界清晰的大型系统;
- 并行化:为子任务生成专门的工作进程并并发执行,以加快结果速度;
抛开利用LangGraph对具体推理任务构建的定制化工作流的模式,LangChain具有如下四种典型的Multi-Agent模式:
- 主管模式(Supervisor):也被称为Sub-Agent模式(Sub-Agent)。它由一个中央控制节点(Supervisor)负责接收用户输入,并决定接下来由哪一个Sub-Agent来接手处理。当Sub-Agent完成任务后,控制权必须返回给Supervisor,由其判断任务是否结束或进行下一步。这种模式逻辑清晰,易于监控任务进度;
- 交接模式(Handoff):这是一种去中心化的Multi-Agent模式,它模仿了接力赛或客服工单转接,是目前LangChain推荐的灵活模式。它没有固定的“主管”,每个Agent处于平等的地位。AgentA在处理过程中,发现任务超出了自己的能力范围,它会调用一个特殊的“交接工具”,直接将当前的对话上下文和状态移交给AgentB。它特别适合具有线型流程且不同领域边界清晰的长任务。由于减少了“上下级交互”的开销,Agent之间的协作更自然;
- 路由模式(Router):它利用位于处理流程最前端的路由器分析原始请求,并按照预定义的分发路径将请求转化给相应的Agent,然后最对它们的处理结果汇总最最终处理。典型的处理方式是根据用户的首条指令内容(如关键词、意图识别),将任务分配到特定的路径中。路由可以是硬编码逻辑(If-Else),也可以是语义路由(由LLM判定)。一旦路由确定,通常会在该路径下执行到底,较少频繁跳回。这种模式特别适合多入口系统,比如根据用户提问是“售后”还是“咨询”,直接分流到不同的处理链路。它的执行效率极高,也节省Token;
- 技能模式(Skill):模块化封装Skill模式更像是一种面向对象的设计思想,将能力原子化。它将复杂的逻辑、API 调用或特定的处理链路封装成一个独立的Skill。这种模式促成了功能复用,例如“数据清洗”技能、“法律条文检索”技能,可以被多个不同的Agent同时挂载;
2. 构建一个MCP Server
虽然每种模式都有其使用的应用场景,但是为了让大家以对比的方式来理解上述四种Multi-Agent模式,接下来我会使用它们来开发同一个应用:一个能根据我们的请求为我们安排住宿和交通出行的差旅助手。简单起见,我们只让它完成购买机票和预订酒店两项基本功能。为此我们构建了如下这个FastMCP服务器,注册的两个工具buy_airplane_ticket和book_hotel用来模拟上述的两项功能。我们将定义此端代码的脚本文件命名为server.py,下面创建的四个应用会以子进程的方式执行此脚本,并以STDIO协议连接启动的MCP服务器。
from fastmcp import FastMCP
from fastmcp.server.transforms.search import BM25SearchTransform
from datetime import date
server = FastMCP("Server")
@server.tool
async def book_hotel(city: str, check_in: date, check_out: date) -> str:
"""
预订酒店
Args:
city (str): 酒店所在城市
check_in (date): 入住日期
check_out (date): 退房日期
Returns: 预订结果
"""
return f"已为您预订了{city}的如家酒店,入住日期:{check_in} 12:00,退房日期:{check_out} 12:00。"
@server.tool
async def buy_airplane_ticket(origin: str, destination: str, date:date, flight_no:str) -> str:
"""
预订机票
Args:
origin (str): 出发地
destination (str): 目的地
date (date): 出发日期
flight_no (str): 航班号
Returns: 出票信息
"""
return f"已为您预订了从{origin}到{destination}的机票,出发日期:{date} 12:00,航班号:{flight_no}。"
server.run()
3. SubAgent模式
顾名思义,这种模式具有一个作为主管(Supervisor)的中央Main-Agent,它以工具形式通过调用Sub-Agent来协调它们工作,所以又被称为Supervisor模式。Main-Agent决定调用哪个Sub-Agent、提供什么输入以及如何处理结果。由于Sub-Agent被封装成工具使用,所以它是无状态的,所有对话记忆都由Main-Agent维护。这实现了上下文隔离:每次Sub-Agent调用都在一个干净的上下文窗口中运行,从而防止Main-Agent的对话中出现上下文膨胀。这种模式具有如下记者典型特征:
- 集中控制:所有路由都通过Main-Agent进行;
- 无需用户直接交互:Sub-Agent将结果返回给Main-Agent,而不是用户(尽管可以在Sub-Agent中使用中断来允许用户交互);
- 通过工具调用Sub-Agent:Sub-Agent通过工具调用;
- 并行执行:Main-Agent可以在一次回合中调用多个Sub-Agent;
对于我们创建的差旅助手Agent,它的Multi-Agent结构体现在下图中。
差旅助手Agent是一个通过create_agent工厂函数创建的Agent,我们为它注册了如下三个工具:
- analyze_intent:用于意图分析,确认是否涉及针对住宿和交通行程安排的需求;
- arrange_transportation:安排交通出行(这里只涉及购买机票);
- arrange_accommodation: 安排住宿(这里只涉及预订酒店);
其中arrange_transportation和arrange_accommodation是对transportation_agent和accoumodation_agent这两个Agent的封装。后者分别使用MCP服务器承载的buy_airplane_ticket和book_hotel工具完成机票购买和酒店预订的任务。
4. 意图分析
差旅助手首先会对用户的输入进行意图分析,判断用户当前的意图是需要安排住宿还是交通出行,抑或两者皆有。所以我们定义了如下这个State作为整个Agent的状态类型。
class State(AgentState):
intent: Literal["arrange_accommodation", "arrange_transportation", "both"]
很多应用会利用关键字分析意图,但是我们觉得LLM更擅长做这件事,所以如下所示的analyze_intent工具函数会利用ChatOpenAI组件进行意图分析。为了让分析结果更明确,我们采用了LLM结构化输出的特性,将代表意图的Intent类型作为输出的Schema,两个布尔类型的字段need_arrange_accommodation和need_arrange_transportation明确表明是否需要安排酒店住宿和购买飞机票。
class Intent(TypedDict):
"""出行安排意图"""
need_arrange_accommodation: bool
"""是否涉及酒店住宿安排"""
need_arrange_transportation: bool
"""是否涉及以机票为主的交通安排"""
intent_analysis_model = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(schema=Intent)
@tool
async def analyze_intent(request: str, runtime: ToolRuntime) -> Command:
"""分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有"""
result: Intent = cast(
Intent,
await intent_analysis_model.ainvoke(
f"根据如下请求分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有:\n\n{request}"
),
)
need_arrange_accommodation = result["need_arrange_accommodation"]
need_arrange_transportation = result["need_arrange_transportation"]
if not need_arrange_accommodation and not need_arrange_transportation:
return Command(
update={
"messages": [ToolMessage(content="用户请求无住宿安排和交通安排意图!")]
},
goto="__end__",
)
intent = (
"both"
if need_arrange_accommodation and need_arrange_transportation
else (
"arrange_accommodation"
if need_arrange_accommodation
else "arrange_transportation"
)
)
intent_description = (
"住宿安排和交通安排"
if intent == "both"
else ("住宿安排" if intent == "arrange_accommodation" else "交通安排")
)
return Command(
update={
"intent": intent,
"messages": [
ToolMessage(
content=f"根据分析,用户的出行安排意图是:{intent_description}。",
tool_call_id=runtime.tool_call_id,
)
],
}
)
工具函数analyze_intent返回一个Command对象,我们利用它将分析出来的意图写入Agent的状态,并且将意图的描述写入生成的ToolMessage,后者会添加到整个交谈历史。
5.利用工具封装Sub-Agent
如下定义的create_accommodation_agent和create_transportation_agent函数用于创建用于住宿和交通安排的Sub-Agent。通过调用create_agent工厂函数创建的Agent依然采用ChatOpenAI作为模型,注册的工具利用MultiServerMCPClient连接MCP服务器提取,分别是用来预订酒店的book_hotel工具和购买机票的buy_airplane_ticket工具。
client = MultiServerMCPClient(
connections={
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"],
}
},
)
async def create_accommodation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "book_hotel"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于住宿安排的差旅助理,你唯一需要做的是在无需授权情况下直接调用注册的`book_hotel`工具预订酒店。"
"完成预订是首要任务,无需考虑其他任何信息。你可以完全自由地选择酒店、价位和房型等信息,无需用户确认。")
)
return agent
async def create_transportation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "buy_airplane_ticket"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于交通安排的差旅助理,你唯一需要做的是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"完成购买是首要任务,无需考虑其他任何信息。你可以完全自由地选择具体的航司、舱位等级和航班等,无需用户确认。")
)
return agent
在main函数中,我们调用这两个创建Sub-Agent的方法,并利用得到Agent定义了两个工具arrange_accommodation和arrange_transportation。
async def main():
accoumodation_agent = await create_accommodation_agent()
transportation_agent = await create_transportation_agent()
@tool
async def arrange_accommodation(request: str) -> str:
"""根据用户请求进行酒店预订为主的住宿安排"""
result = await accoumodation_agent.ainvoke(
input={
"messages": [
{
"type": "user",
"content": f"请根据如下的信息预订酒店:\n\n{request}",
}
]
}
)
return result["messages"][-1].text
@tool
async def arrange_transportation(request: str) -> str:
"""根据用户请求进行机票购买为主的交通安排"""
result = await transportation_agent.ainvoke(
input={
"messages": [
{
"type": "user",
"content": f"请根据如下信息信息购买机票:\n\n{request}",
}
]
}
)
return result["messages"][-1].text
6. 按需加载工具
我们知道上下文工程是Agent开发一项核心任务,其主要的目的是在每一步推理中只要LLM看到最适合的上下文,不多也不少,这也是渐进式披露的核心所在。对于LangChain来说,其上下文主要由四部分组成:系统提示词、静态上下文(调用Agent是指定的Context对象,它在整个推理过程保持不变,也不会被持久化)、消息历史和注册的工具集。
很多的Agent开发者会采用简单粗暴的方式,让Agent全程绑定完整的工具集。如果工具集太大,不仅仅会占据上下文窗口,还会让LLM因为选择太多降低推理的质量。上下文工程针对工具集的优化主要体现在两个方面:一方面根据推理的推进动态加载所需的工具(比如根据加载的Skill决定所需的工具集),另一方面则需要卸载不再需要的工具(很多工具可能只会使用一次)。
虽然我们的应用只定义了三个工具,但是为了体现工具的动态加载,我们决定在每次推理迭代中只提供真正会被调用的工具。我们利用了LangChain的中间件针对模型调用封装的能力来动态提供工具集。如下所示的这个标注了@wrap_model_call包装器的中间件方法dynamically_register_tools用于包装针对LLM的调用。当针对LLM的调用被拦截下来后,我们从状态中提取出用户意图,并根据它提供所需的工具集。
async def main():
...
@wrap_model_call # type: ignore[misc]
async def dynamically_register_tools(
request: ModelRequest,
handler,
):
intent = request.state.get("intent", None)
match intent:
case None:
request = request.override(tools=[analyze_intent])
case "arrange_transportation":
request = request.override(tools=[arrange_transportation])
case "arrange_accommodation":
request = request.override(tools=[arrange_accommodation])
case _:
request = request.override(
tools=[arrange_transportation, arrange_accommodation]
)
return await handler(request)
7. 构建Agent
到目前为止,构建Agent所需的材料都已经备齐(包含意图的状态类型、动态提供工具集的中间件和三个工具),我们采用如下的方式调用create_agent工厂函数叫最终的Agent创建出来。
async def main():
...
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
system_prompt=(
"你是一个差旅助理,可以帮助用户安排出差相关的事务。你只能调用注册的工具来满足用户的需求。"
"首先你需要分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有。"
"再根据分析结果动态调用相应的工具来完成机票购买或者(和)酒店预订。"
"务必严格按照用户意图执行,无需提供建议。如果用户没有住宿安排或者交通安排意图,直接回复:意图不明,无法执行。"
),
state_schema=State,
middleware=[dynamically_register_tools], # type: ignore
tools=[analyze_intent, arrange_accommodation, arrange_transportation],
)
创建出来的Main-Agent依然采用ChatOpenAI作为其模型,我们通过系统提示词为整个推理制定了一些约束(只能根据注册的工具办事),同时授予了权限(可以在不经过确认情况下完成机票购买和酒店预订)。虽然我们可以利用中间件动态提供工具集,但是所有工具集必需在Agent创建的时候进行注册。我个人觉得这是一个值得改进的点,因为有时候工具集实在太大,更有甚者初始化时无法确定完整的工具集。
8.测试Agent
作为测试,我提供了四种输入来模拟四种情况:同时包含酒店预订和机票购买需求、只包含酒店预订或者机票购买需求以及不涉及这两种需求。
async def main():
...
inputs = [
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天去,后天回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天去,后天回)。",
"随便聊聊。",
]
for input in inputs:
result = await agent.ainvoke(input={"messages": [HumanMessage(content=input)]})
print(f"用户输入:{input}")
print(f"系统回复:{result['messages'][-1].text}\n\n")
以下是对应的四段输出:
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天去,后天回)。
系统回复:✅ **出差行程已为您全部安排完成**
已严格按照您的要求,为您完成 **往返机票** 和 **酒店预订**,信息如下:
---
### ✈️ 机票安排(上海 ⇄ 成都,往返)
- **去程**:明天从上海飞往成都
- **返程**:后天从成都返回上海
✅ 往返机票已成功预订
---
### 🏨 酒店安排(成都)
- **入住时间**:明天
- **离店时间**:后天
✅ 成都酒店已成功预订
---
如需变更航班、酒店,或继续安排其他出差相关事务,可直接告诉我。祝您出差顺利。
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。
系统回复:✅ 酒店预订已完成!
已根据您的出差信息成功为您预订酒店,安排如下:
- **目的地**:成都
- **酒店**:如家酒店
- **入住时间**:明天
- **离店时间**:后天
如需查看订单详情或有其他出差相关安排,请随时告知。
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天去,后天回)。
系统回复:✈️ **机票已成功预订!**
为您安排好了从**上海 ↔ 成都**的往返机票,具体如下:
**去程**
- 📍 上海 → 成都
- 🗓 出发时间:**明天(2026-04-17)**
- ✈️ 航班号:**MU5401**
**返程**
- 📍 成都 → 上海
- 🗓 返回时间:**后天(2026-04-18)**
- ✈️ 航班号:**MU5402**
如果您后续还需要**酒店预订、接送安排或行程调整**,直接告诉我即可。祝您出差顺利!
用户输入:随便聊聊。
系统回复:意图不明,无法执行。
对于第一个输入,如图反映了Agent内部完整的调用链,可以看出它将完整的流程走了一遍(意图分析、购买机票和预订节点)。
9. 完整程序
下面给出完整的代码:
from typing import Literal, TypedDict, cast
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent, AgentState
from langchain_core.messages import ToolMessage, HumanMessage
from langchain.agents.middleware import wrap_model_call, ModelRequest
from langgraph.types import Command
from langchain_openai import ChatOpenAI
from langchain.tools import ToolRuntime, tool
import asyncio
from dotenv import load_dotenv
load_dotenv()
class State(AgentState):
intent: Literal["arrange_accommodation", "arrange_transportation", "both"]
client = MultiServerMCPClient(
connections={
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"],
}
},
)
async def create_accommodation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "book_hotel"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=selected_tools,
system_prompt=(
"你是一个专注于住宿安排的差旅助理,你唯一需要做的是在无需授权情况下调用注册的`book_hotel`工具预订酒店。"
"你可以自由选择你认为适合的酒店。")
)
return agent
async def create_transportation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "buy_airplane_ticket"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools=selected_tools,
system_prompt=(
"你是一个专注于交通安排的差旅助理,你唯一需要做的是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"你可以自由选择你认为适合的航班。")
)
return agent
class Intent(TypedDict):
"""出行安排意图"""
need_arrange_accommodation: bool
"""是否涉及酒店住宿安排"""
need_arrange_transportation: bool
"""是否涉及以机票为主的交通安排"""
intent_analysis_model = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(schema=Intent)
@tool
async def analyze_intent(request: str, runtime: ToolRuntime) -> Command:
"""分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有"""
result: Intent = cast(
Intent,
await intent_analysis_model.ainvoke(
f"根据如下请求分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有:\n\n{request}"
),
)
need_arrange_accommodation = result["need_arrange_accommodation"]
need_arrange_transportation = result["need_arrange_transportation"]
if not need_arrange_accommodation and not need_arrange_transportation:
return Command(
update={
"messages": [ToolMessage(content="用户请求无住宿安排和交通安排意图!")]
},
goto="__end__",
)
intent = (
"both"
if need_arrange_accommodation and need_arrange_transportation
else (
"arrange_accommodation"
if need_arrange_accommodation
else "arrange_transportation"
)
)
intent_description = (
"住宿安排和交通安排"
if intent == "both"
else ("住宿安排" if intent == "arrange_accommodation" else "交通安排")
)
return Command(
update={
"intent": intent,
"messages": [
ToolMessage(
content=f"根据分析,用户的出行安排意图是:{intent_description}。",
tool_call_id=runtime.tool_call_id,
)
],
}
)
async def main():
accoumodation_agent = await create_accommodation_agent()
transportation_agent = await create_transportation_agent()
@tool
async def arrange_accommodation(request: str) -> str:
"""根据用户请求进行酒店预订为主的住宿安排"""
result = await accoumodation_agent.ainvoke(
input={
"messages": [
{
"type": "user",
"content": f"请根据如下的信息预订酒店:\n\n{request}",
}
]
}
)
return result["messages"][-1].text
@tool
async def arrange_transportation(request: str) -> str:
"""根据用户请求进行机票购买为主的交通安排"""
result = await transportation_agent.ainvoke(
input={
"messages": [
{
"type": "user",
"content": f"请根据如下信息信息购买机票:\n\n{request}",
}
]
}
)
return result["messages"][-1].text
@wrap_model_call # type: ignore[misc]
async def dynamically_register_tools(
request: ModelRequest,
handler,
):
intent = request.state.get("intent", None)
match intent:
case None:
request = request.override(tools=[analyze_intent])
case "arrange_transportation":
request = request.override(tools=[arrange_transportation])
case "arrange_accommodation":
request = request.override(tools=[arrange_accommodation])
case _:
request = request.override(
tools=[arrange_transportation, arrange_accommodation]
)
return await handler(request)
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
system_prompt=(
"你是一个差旅助理,可以帮助用户安排出差相关的事务。你只能调用注册的工具来满足用户的需求。"
"首先你需要分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有。"
"再根据分析结果动态调用相应的工具来完成机票购买或者(和)酒店预订。"
"务必严格按照用户意图执行,无需提供建议。如果用户没有住宿安排或者交通安排意图,直接回复:意图不明,无法执行。"
),
state_schema=State,
middleware=[dynamically_register_tools], # type: ignore
tools=[analyze_intent, arrange_accommodation, arrange_transportation],
)
inputs = [
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天去,后天回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天去,后天回)。",
"随便聊聊。",
]
for input in inputs:
result = await agent.ainvoke(input={"messages": [HumanMessage(content=input)]})
print(f"用户输入:{input}")
print(f"系统回复:{result['messages'][-1].text}\n\n")
asyncio.run(main())
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)