[LangChain中的Multi-Agent模式-02]Router:多领域任务并行时的路由风险评估与分发策略
在路由架构中,路由步骤会对输入进行分类,并将其定向到专门的Agent,其主要特征包括:
- 路由器将查询分解;
- 并行调用零个或多个专用Agent;
- 将结果综合成一个连贯的响应;
当我们拥有不同的垂直领域(各自独立的知识领域,每个领域都需要自己的Agent)、需要并行查询多个数据源,并且希望将结果综合成一个统一的响应时,特别适合使用路由模式。对于上面采用Supervisor模式实现的差旅助手,如果切换成这种模式,可以采用下图所示的路由方式。
熟悉LangGraph的人应该知道,上图本质上是由StateGraph构建的状态图。话句话说,路由模式可以利用LangGraph来构建。
1. 构建路由器
路由会对输入进行分类,并将其定向到专门的Agent。对于我们这个差旅助手来说,这个路由器就是意图分析器。我们将这个路由器定义成如下这个router方法,它最终会成为StateGraph状态图中的一个节点。
IntentType = Literal["book_hotel", "buy_airplane_ticket","both","none"]
class State(AgentState):
intent:IntentType|None
class Intent(TypedDict):
"""出行安排意图"""
need_arrange_accommodation:bool
"""是否涉及酒店住宿安排"""
need_arrange_transportation:bool
"""是否涉及以机票为主的交通安排"""
model = ChatOpenAI(model="gpt-5.2-chat")
async def router(state:State) -> dict:
result:Intent = cast(Intent, await model.with_structured_output(schema=Intent)
.ainvoke((f"请根据以下请求分析用户的出行安排意图,判断是购买机票、预订酒店还是两者都有:{state['messages'][-1].content}")))
need_book_hotel = result["need_arrange_accommodation"]
need_buy_airplane_ticket = result["need_arrange_transportation"]
intent: IntentType = "none"
if need_book_hotel and need_buy_airplane_ticket:
intent = "both"
elif need_book_hotel:
intent = "book_hotel"
else:
intent = "buy_airplane_ticket"
return {"intent": intent}
如上面的代码片段所示,我们依然在继承自AgentState的状态类型State中添加了表示意图的字段intent。实现在router函数中的意图分析依然了利用LLM来完成,并且同样利用结构化输出得到一个明确的意图。前面以工具形式实现的意图分析通过返回的Command将分析结果写入状态,这里直接利用返回的字典达到相同的目的。
2. 利用节点封装Sub-Agent
主管模式利用工具封装Sub-Agent,现在我们直接利用LangGraph构建工作流,直接将Sub-Agent封装在状态图的节点中。和前面一样,我们利用定义的函数create_accommodation_agent和create_transportation_agent来创建分别用于安排住宿和交通出行的Sub-Agent,后者利用MultiServerMCPClient链接MCP服务器得到所需的工具book_hotel和buy_airplane_ticket。MCP服务器的构建在“SubAgent:集中编排视角下的上下文隔离与并行化实现”已经介绍过了,这里不再赘述。
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"]
}
},
)
async def create_accommodation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "book_hotel"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于住宿安排的差旅助理,你唯一需要做的是在无需授权情况下直接调用注册的`book_hotel`工具预订酒店。"
"完成预订是首要任务,无需考虑其他任何信息。你可以完全自由地选择酒店、价位和房型等信息,无需用户确认。")
)
return agent
async def create_transportation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "buy_airplane_ticket"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于交通安排的差旅助理,你唯一需要做的是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"完成购买是首要任务,无需考虑其他任何信息。你可以完全自由地选择具体的航司、舱位等级和航班等,无需用户确认。")
)
return agent
async def main():
accommodation_agent = await create_accommodation_agent()
transportation_agent = await create_transportation_agent()
async def arrange_transportation(state:State) -> dict:
result = await transportation_agent.ainvoke({"messages":state["messages"]}) # type: ignore
return {"messages":[result["messages"][-1]]}
async def arrange_accommodation(state:State) -> dict:
result = await accommodation_agent.ainvoke({"messages":state["messages"]}) # type: ignore
return {"messages":[result["messages"][-1]]}
在main函数中定义的arrange_transportation和arrange_accommodation就是用于封装Sub-Agent的节点函数,它们在完成Sub-Agent调用之后,只会将最后的结果(最后一条AIMessage)写入消息历史,所以Main-Agent的上下文窗口压力会大大减轻。
3. 路由汇总
路由将请求按照预定义的路径分发出去进行并行处理后,一般还需要对这些结果进行汇总。对于我们的例子来说,我们需要一个汇总节点对已经完成的交通出行和住宿安排进行整理后统一显示给用户。如下这个synthesize就是这么一个节点函数,它直接将整理工作交给LLM来完成。
async def main():
...
async def synthesize(state:State) -> dict:
message = HumanMessage(content="整理已经制定好的出行安排。如果没有发现任何出行安排,请回复用户:意图不明,无法执行。")
result = await model.ainvoke([message,*state["messages"]])
return {"messages": [result]}
4. 构建Agent
上图展示的状态图由如下的程序来构建。我们针对状态类型State创建了一个StateGraph对象,并添加了四个函数(router、arrange_transportation、arrange_accommodation和synthesize)对应的节点。router和synthesize分别被设置成入口节点和完成节点。router和其余三个节点之间具有条件边,条件函数route_condition根据状态存储的意图决定目标节点。最终的Agent通过编译StateGraph生成。
async def main():
def route_condition(state:State) -> list[str]:
if state["intent"] == "both":
return ["arrange_transportation", "arrange_accommodation"]
elif state["intent"] == "book_hotel":
return ["arrange_accommodation"]
elif state["intent"] == "buy_airplane_ticket":
return ["arrange_transportation"]
else:
return ["synthesize"]
agent = (StateGraph(State)
.add_node("router", router)
.add_node("arrange_transportation", arrange_transportation)
.add_node("arrange_accommodation", arrange_accommodation)
.add_node("synthesize", synthesize)
.set_entry_point("router")
.set_finish_point("synthesize")
.add_conditional_edges("router", lambda state: route_condition(state),["arrange_transportation", "arrange_accommodation", "synthesize"])
.add_edge("arrange_transportation", "synthesize")
.add_edge("arrange_accommodation", "synthesize")
).compile()
5. 测试Agent
我们采用与前面一样的测试用例(供了四种输入来模拟四种情况:同时包含酒店预订和机票购买需求、只包含酒店预订或者机票购买需求以及不涉及这两种需求)对构建的Agent进行测试:
async def main():
...
inputs = [
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天去,后天回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天去,后天回)。",
"随便聊聊。",
]
for input in inputs:
result = await agent.ainvoke(input={"messages": [HumanMessage(content=input)]})
print(f"用户输入:{input}")
print(f"系统回复:{result['messages'][-1].text}\n\n")
以下是对应的四段输出:
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天出发,后天返回)。
系统回复:📋 **已预订信息汇总**
**🏨 酒店预订**
- 城市:成都
- 酒店名称:如家酒店
- 入住时间:2026-04-17
- 离店时间:2026-04-18
**✈️ 机票信息(往返)**
- **去程**
- 上海 → 成都
- 出发日期:2026-04-17
- 航班号:MU5401
- **返程**
- 成都 → 上海
- 出发日期:2026-04-18
- 航班号:MU5402
如需我继续帮您核对订单、调整行程或补充其他出差安排,请告诉我。
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。
系统回复:📋 **预订信息汇总**
**酒店预订**
- 🏨 酒店名称:如家酒店
- 📍 城市:成都
- 📅 入住时间:2026年4月17日
- 📅 离店时间:2026年4月18日
**机票信息**
- ✈️ 暂无机票预订信息
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天出发,后天返回)。
系统回复:### 已有预订信息汇总
✈️ **机票预订**
- **行程**:上海 ⇄ 成都(往返)
- **去程**:
- 日期:2026-04-17
- 航班号:MU5401
- **返程**:
- 日期:2026-04-18
- 航班号:MU5402
🏨 **酒店预订**:无(已忽略)
如需补充酒店预订或调整航班信息,请告知。
用户输入:随便说说!
系统回复:意图不明,无法执行。
对于第一个输入,如图反映了Agent内部完整的调用链,可以看出它将完整的流程走了一遍。
6. 完整程序
下面给出完整程序:
from typing import Literal,TypedDict,cast
from langchain.agents import create_agent,AgentState
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_core.messages import HumanMessage
import asyncio
from dotenv import load_dotenv
load_dotenv()
IntentType = Literal["book_hotel", "buy_airplane_ticket","both","none"]
class State(AgentState):
intent:IntentType|None
class Intent(TypedDict):
"""出行安排意图"""
need_arrange_accommodation:bool
"""是否涉及酒店住宿安排"""
need_arrange_transportation:bool
"""是否涉及以机票为主的交通安排"""
model = ChatOpenAI(model="gpt-5.2-chat")
async def router(state:State) -> dict:
result:Intent = cast(Intent, await model.with_structured_output(schema=Intent)
.ainvoke((f"请根据以下请求分析用户的出行安排意图,判断是购买机票、预订酒店还是两者都有:{state['messages'][-1].content}")))
need_book_hotel = result["need_arrange_accommodation"]
need_buy_airplane_ticket = result["need_arrange_transportation"]
intent: IntentType = "none"
if need_book_hotel and need_buy_airplane_ticket:
intent = "both"
elif need_book_hotel:
intent = "book_hotel"
else:
intent = "buy_airplane_ticket"
return {"intent": intent}
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"]
}
},
)
async def create_accommodation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "book_hotel"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于住宿安排的差旅助理,你唯一需要做的是在无需授权情况下直接调用注册的`book_hotel`工具预订酒店。"
"完成预订是首要任务,无需考虑其他任何信息。你可以完全自由地选择酒店、价位和房型等信息,无需用户确认。")
)
return agent
async def create_transportation_agent():
tools = await client.get_tools(server_name="server")
selected_tools = [tool for tool in tools if tool.name == "buy_airplane_ticket"]
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
tools= selected_tools,
system_prompt=("你是一个专注于交通安排的差旅助理,你唯一需要做的是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"完成购买是首要任务,无需考虑其他任何信息。你可以完全自由地选择具体的航司、舱位等级和航班等,无需用户确认。")
)
return agent
async def main():
accommodation_agent = await create_accommodation_agent()
transportation_agent = await create_transportation_agent()
async def arrange_transportation(state:State) -> dict:
result = await transportation_agent.ainvoke({"messages":state["messages"]}) # type: ignore
return {"messages":[result["messages"][-1]]}
async def arrange_accommodation(state:State) -> dict:
result = await accommodation_agent.ainvoke({"messages":state["messages"]}) # type: ignore
return {"messages":[result["messages"][-1]]}
async def synthesize(state:State) -> dict:
message = HumanMessage(content="汇总预订的酒店(如果没有请忽略)和购买机票信息(如果没有请忽略)。如果没有任何预订信息,请回复:意图不明,无法执行。")
result = await model.ainvoke([message,*state["messages"]])
return {"messages": [result]}
def route_condition(state:State) -> list[str]:
if state["intent"] == "both":
return ["arrange_transportation", "arrange_accommodation"]
elif state["intent"] == "book_hotel":
return ["arrange_accommodation"]
elif state["intent"] == "buy_airplane_ticket":
return ["arrange_transportation"]
else:
return ["synthesize"]
agent = (StateGraph(State)
.add_node("router", router)
.add_node("arrange_transportation", arrange_transportation)
.add_node("arrange_accommodation", arrange_accommodation)
.add_node("synthesize", synthesize)
.set_entry_point("router")
.set_finish_point("synthesize")
.add_conditional_edges("router", lambda state: route_condition(state),["arrange_transportation", "arrange_accommodation", "synthesize"])
.add_edge("arrange_transportation", "synthesize")
.add_edge("arrange_accommodation", "synthesize")
).compile()
from PIL import Image as PILImage
import io
# payload = agent.get_graph().draw_mermaid_png()
# PILImage.open(io.BytesIO( payload)).show()
inputs =[
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天出发,后天返回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天出发,后天返回)。",
"随便说说!"
]
for message in inputs:
reuslt = await agent.ainvoke(input= {"messages": [HumanMessage(content=message)]}) # type: ignore
print(f"用户输入:{message}")
print(f"系统回复:{reuslt['messages'][-1].text}\n\n")
asyncio.run(main())
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)