[LangChain中的Multi-Agent模式-03]Handoffs:状态驱动的多阶段流程编排与状态机管理
在交接(Handoffs)模式中,整个Agent被构建为一个状态机,当前的状态决定了可执行的操作,操作的执行又会导致状态的改变。该模式的一个典型实现方案为:利用工具更新一个持久存储的状态变量(例如current_step或active_agent),系统读取该变量来调整行为,包括应用不同的配置(系统提示、工具)或将请求路由到不同的Agent。这种模式既支持不同Agent之间的交接,也支持单个Agent内部的动态配置更改。该模式的典型特征包括:
- 状态驱动行为:行为会根据状态变量而改变;
- 基于工具的转换:工具更新状态变量以实现状态间的转换;
- 直接用户交互:每个状态的配置直接处理用户消息;
- 持久状态:状态在对话回合中保持不变。
交接模式特别适合使用在需要强制执行顺序约束的应用场景,比如客服人员需要在不同状态下与用户直接对话,或者构建多阶段对话流程时。对于我们构建的差旅助手,前面已经通过Sub-Agent和Router模式提供了实现,现在我们将它改成Handoffs交接模式重新实现一遍。
上图反映了采用Handoffs模式的执行流程:先进行意图分析,根据得到意图执行相应的状态:
- 没有机票购买和酒店预订需求:直接退出;
- 只有机票购买需求:完成机票购买任务后退出;
- 只有酒店预订需求:完成酒店预订任务后退出;
- 同时具有两种需求:先后完成机票购买和酒店预订后退出(不采用并行执行);
1. 意图分析
虽然这个系列的主题是Multi-Agent,但是如果考虑到我们将Sub-Agent封装在工具之中,交接模式与是否采用Multi-Agent没有关系,所以接下来我们采用Single-Agent方法来演示,我们直接调用MCP服务器提供的buy_airplane_ticket和book_hotel工具。
由于交接模式采用基于状态的顺序执行流程,所以状态类型State中除了定义表示意图的intent字段,还定义了current_step字段表示当前所在步骤。交接模式没有Sub-Agent架构中的Supervisor,也没有路由架构的汇总节点,所以我们定义额外的字段is_last_step表示当前是否是流程的最后一环,因为它需求对前面完成的工作做一个总结。
class State(AgentState):
intent:Literal["book_hotel", "buy_airplane_ticket","both"] |None
current_step: Annotated[Literal[ "analyze_intent","buy_airplane_ticket","book_hotel"] | None,AnyValue]
is_last_step: Annotated[bool,AnyValue]
分析意图的工具函数analyze_intent与之前的实现基本一致:我们将用于请求原封不动发送给LLM,并且利用结构化输出返回明确的出行安排意图。唯一的区别在于,每一步任务完成之后需要将下一步,以及是否是最后一步写入状态。按照我们的分析:如果请求包含机票购买的意图,下一步任务就是购买机票;如果只包含酒店预订需求,下一步就应该预订酒店,这一点体现在返回的Command针对current_step和is_last_step通道的写入上。如果两种需求皆无,整个流程结束,此时analyze_intent方法利用Command对goto字段的设置结束整个处理流程。
class Intent(TypedDict):
"""出行安排意图"""
need_book_hotel:bool
"""是否涉及预订酒店"""
need_buy_airplane_ticket:bool
"""是否涉及购买机票"""
intent_analysis_model = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(schema=Intent)
@tool
async def analyze_intent(request:str, runtime:ToolRuntime) -> Command:
"""分析用户的出行安排意图,判断是购买机票、预订酒店还是两者都有"""
result:Intent = cast(Intent, await intent_analysis_model.ainvoke(f"根据如下请求分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有:\n\n{request}"))
need_book_hotel = result["need_book_hotel"]
need_buy_airplane_ticket = result["need_buy_airplane_ticket"]
if not need_book_hotel and not need_buy_airplane_ticket:
return Command(
update={"messages":[ToolMessage(content="用户请求无住宿安排和交通安排意图!")]},
goto="__end__",)
intent = "both" if need_book_hotel and need_buy_airplane_ticket else ("book_hotel" if need_book_hotel else "buy_airplane_ticket")
intent_description = "预订酒店和购买机票" if intent == "both" else ("预订酒店" if intent == "book_hotel" else "购买机票")
next_step = "buy_airplane_ticket" if need_buy_airplane_ticket else "book_hotel"
return Command(
update={
"intent": intent,
"current_step": next_step,
"is_last_step": intent != "both",
"messages":[ToolMessage(
content=f"根据分析,用户的出行安排意图是:{intent_description}。",
tool_call_id = runtime.tool_call_id,)]
})
2. 购买机票后如何预订酒店
由于意图分析是我们自定义的工具,我们可以利用返回的Command设置下一步。如果同时具有机票购买和酒店预订的需求,在购买机票后需要将下一步设置为酒店预订,但是购买机票是MCP服务器提供的工具,该如何实现的。这可以借助于langchain_mcp_adapters提供的ToolCallInterceptor来拦截调用MCP工具的结果,并将其封装成返回的Command来完成对应通道的写入。这个ToolCallInterceptor是通过如下这个transfor_to_next_step函数实现的。
async def transfor_to_next_step(request: MCPToolCallRequest, handler):
runtime: ToolRuntime[Any,State] = cast(ToolRuntime[Any,State], request.runtime)
result = cast(CallToolResult, await handler(request))
tool_name = request.name
intent = runtime.state["intent"]
if intent == "both" and tool_name == "buy_airplane_ticket":
contents, aircraft = _convert_call_tool_result(result)
contentBlocks = cast(list[ContentBlock], contents)
tool_message = ToolMessage(content_blocks = contentBlocks, aircraft=aircraft, tool_call_id = runtime.tool_call_id)
return Command(
update={
"messages": [tool_message],
"current_step": "book_hotel",
"is_last_step": True,
})
return result
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"]
}
},
tool_interceptors=[transfor_to_next_step])
3. 利用系统提示词和工具提供控制流程
和前面介绍的主管模式和路由模式相比,你会发现交接模式最为简洁:它不需要一个协调者(Supervisor或者路由器),完全由参与者自行完成交接任务。由于工具永远是被动的执行者,所以流程的流转还得由LLM来完成,所以我们必需在某一步都为它提供精准的系统提示词指导它工作,并且提供给它所需的工具集去执行它的决策,所以这里依然会使用到LangChain的中间件。
这个为每一步设置系统提示词和工具集的中间件,体现在如下所示的标注了@wrap_model_call包装器的configure_step函数上。如代码片段所示,我们为每一步设置可准确的系统提示词和工具集。中间件在拦截针对模型的调用后,从状态中提取当前的步骤后,对当前使用的提示词和工具集进行设置。如果当前为最后异步,还需要借助于系统提示词让它汇总出行信息。
async def main():
tools = {tool.name:tool for tool in await client.get_tools(server_name="server")}
tools["analyze_intent"] = analyze_intent
step_based_tooks = {
"analyze_intent":[analyze_intent],
"buy_airplane_ticket":[tools["buy_airplane_ticket"]],
"book_hotel":[tools["book_hotel"]],
}
step_based_system_messages = {
"analyze_intent": SystemMessage(content=(
"你是一个善于语义分析的差旅助理,你唯一的工作是调用`analyze_intent`工具根据请求分析出行安排意图,判断是需要预订酒店、购买机票还是两者都需要。"
"如果请求中没有出行安排相关的意图,请直接回复:意图不明,无法执行。")),
"buy_airplane_ticket": SystemMessage(content=(
"你是一个专注于交通安排的差旅助理,你唯一的工作是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"完成购买是首要任务,无需考虑其他任何信息。你可以自行决定具体的航司、舱位等级和航班。")),
"book_hotel": SystemMessage(content=(
"你是一个专注于住宿安排的差旅助理,你唯一的工作是在无需授权情况下调用注册的`book_hotel`工具预订酒店。"
"完成预订是首要任务,无需考虑其他任何信息。你可以自行决定具体的酒店、价位和房型等。"
)),
}
@wrap_model_call # type: ignore[misc]
async def configure_step(
request: ModelRequest,
handler,
) :
current_step = request.state.get("current_step", "analyze_intent")
is_last_step = request.state.get("is_last_step", False)
system_message = step_based_system_messages[current_step]
if is_last_step:
system_message = SystemMessage(f"{system_message.content} 汇总预订的酒店(如果没有请忽略)和购买机票信息(如果没有请忽略)。如果没有任何预订信息,请回复:意图不明,无法执行。")
request = request.override(
tools= step_based_tooks[current_step], # type: ignore
system_message=system_message,
)
return await handler(request) # type: ignore
4. 构建和测试Agent
我们利用MultiServerMCPClient连接“SubAgent:集中编排视角下的上下文隔离与并行化实现”中构建的MCP服务器,进而获取可用的工具。我们通过指定状态类型State、工具(包括自定义的用于意图分析的analyze_intent工具和MCP服务器提供的buy_airplane_ticket和book_hotel工具)和为每一步设置系统提示词和工具集的中间件调用create_agent函数将Agent创建出来,然后采用与前面一样的测试用例(供了四种输入来模拟四种情况:同时包含酒店预订和机票购买需求、只包含酒店预订或者机票购买需求以及不涉及这两种需求)对构建的Agent进行测试:
async def main():
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"]
}
},
tool_interceptors=[transfor_to_next_step])
tools = {tool.name:tool for tool in await client.get_tools(server_name="server")}
...
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
state_schema=State,
tools=[tool for tool in tools.values()],
middleware=[configure_step], # type: ignore
)
inputs =["我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天出发,后天返回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天出发,后天返回)。",
"随便聊聊!"
]
for message in inputs:
reuslt = await agent.ainvoke(input= {"messages": [HumanMessage(content=message)]})
print(f"用户输入:{message}")
print(f"系统回复:{reuslt['messages'][-1].text}\n\n")
以下是对应的四段输出:
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天出发,后天返回)。
系统回复:✅ 已识别您的出行安排意图:**需要同时预订机票和酒店**。
根据您的需求,已为您完成以下安排:
### ✈️ 机票预订(往返)
- **去程**:上海 → 成都
📅 出发时间:明天(2026-04-18)12:00
✈️ 航班号:MU5401
- **返程**:成都 → 上海
📅 出发时间:后天(2026-04-19)12:00
✈️ 航班号:MU5402
### 🏨 酒店预订
- **城市**:成都
- **酒店**:如家酒店
- **入住时间**:2026-04-18
- **退房时间**:2026-04-19
如需调整航班时间、酒店档次或添加其他出行服务(如接送机),随时告诉我 👍
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。
系统回复:✅ **酒店预订已完成**
已为您成功预订以下酒店:
- **城市**:成都
- **酒店**:如家酒店
- **入住时间**:2026年4月18日
- **退房时间**:2026年4月19日
祝您成都出差顺利,如需调整或再次预订住宿,随时告诉我。
用户输入:我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天出发,后天返回)。
系统回复:✅ **出行安排已识别并处理**
我已识别到您的出行意图为:**购买机票(往返)**。
✈️ **机票预订信息如下:**
- **去程**:上海 → 成都
出发时间:明天(4月18日)12:00
航班号:MU5401
- **返程**:成都 → 上海
出发时间:后天(4月19日)12:00
航班号:MU5402
机票已成功为您预订。如您还需要 **酒店预订、座位选择或改签提醒**,欢迎随时告诉我。祝您出差顺利!
用户输入:随便聊聊!
系统回复:意图不明,无法执行
再来看针对第一个请求,反映Agent内部的调用链,流转方式是不是比前面两种模式简单。但是由于交接模式一般采用顺序执行流程,如是涉及大量的并行计算(比如本例中购买机票和预订酒店没有实现为并行)就需要考虑一下是否适合了。如果一定要采用这种模式,将涉及大量并行推理的逻辑实现在一个Sub-Agent中,然后封装成一个工具作为顺序流程中的一个环节也是很容易实现的。
5. 完整程序
如下给出了完整的代码:
from typing import Any, Literal,Annotated, TypedDict,cast
from click import Command
from langchain_mcp_adapters.client import MultiServerMCPClient
from langgraph.channels import AnyValue
from langchain_mcp_adapters.interceptors import MCPToolCallRequest
from langchain.agents import AgentState, create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest
from langchain_openai import ChatOpenAI
from langchain.tools import tool,ToolRuntime
from langchain_core.messages import SystemMessage,ToolMessage,HumanMessage
from langchain_core.messages.content import ContentBlock
from langgraph.types import Command
from mcp.types import CallToolResult
from langchain_mcp_adapters.tools import _convert_call_tool_result
import asyncio
from dotenv import load_dotenv
load_dotenv()
class Intent(TypedDict):
"""出行安排意图"""
need_book_hotel:bool
"""是否涉及预订酒店"""
need_buy_airplane_ticket:bool
"""是否涉及以购买机票"""
intent_analysis_model = ChatOpenAI(model="gpt-5.2-chat").with_structured_output(schema=Intent)
@tool
async def analyze_intent(request:str, runtime:ToolRuntime) -> Command:
"""分析用户的出行安排意图,判断是购买机票、预订酒店还是两者都有"""
result:Intent = cast(Intent, await intent_analysis_model.ainvoke(f"根据如下请求分析用户的出行安排意图,判断是住宿安排、交通安排还是两者都有:\n\n{request}"))
need_book_hotel = result["need_book_hotel"]
need_buy_airplane_ticket = result["need_buy_airplane_ticket"]
if not need_book_hotel and not need_buy_airplane_ticket:
return Command(
update={"messages":[ToolMessage(content="用户请求无住宿安排和交通安排意图!")]},
goto="__end__",)
intent = "both" if need_book_hotel and need_buy_airplane_ticket else ("book_hotel" if need_book_hotel else "buy_airplane_ticket")
intent_description = "预订酒店和购买机票" if intent == "both" else ("预订酒店" if intent == "book_hotel" else "购买机票")
next_step = "buy_airplane_ticket" if need_buy_airplane_ticket else "book_hotel"
return Command(
update={
"intent": intent,
"current_step": next_step,
"is_last_step": intent != "both",
"messages":[ToolMessage(
content=f"根据分析,用户的出行安排意图是:{intent_description}。",
tool_call_id = runtime.tool_call_id,)]
})
class State(AgentState):
intent:Literal["book_hotel", "buy_airplane_ticket","both"] |None
current_step: Annotated[Literal[ "analyze_intent","buy_airplane_ticket","book_hotel"] | None,AnyValue]
is_last_step: Annotated[bool,AnyValue]
async def transfor_to_next_step(request: MCPToolCallRequest, handler):
runtime: ToolRuntime[Any,State] = cast(ToolRuntime[Any,State], request.runtime)
result = cast(CallToolResult, await handler(request))
tool_name = request.name
intent = runtime.state["intent"]
if intent == "both" and tool_name == "buy_airplane_ticket":
contents, aircraft = _convert_call_tool_result(result)
contentBlocks = cast(list[ContentBlock], contents)
tool_message = ToolMessage(content_blocks = contentBlocks, aircraft=aircraft, tool_call_id = runtime.tool_call_id)
return Command(
update={
"messages": [tool_message],
"current_step": "book_hotel",
"is_last_step": True,
})
return result
client = MultiServerMCPClient(
connections= {
"server": {
"transport": "stdio",
"command": "python",
"args": ["server.py"]
}
},
tool_interceptors=[transfor_to_next_step])
async def main():
tools = {tool.name:tool for tool in await client.get_tools(server_name="server")}
tools["analyze_intent"] = analyze_intent
step_based_tooks = {
"analyze_intent":[analyze_intent],
"buy_airplane_ticket":[tools["buy_airplane_ticket"]],
"book_hotel":[tools["book_hotel"]],
}
step_based_system_messages = {
"analyze_intent": SystemMessage(content=(
"你是一个善于语义分析的差旅助理,你唯一的工作是调用`analyze_intent`工具根据请求分析出行安排意图,判断是需要预订酒店、购买机票还是两者都需要。"
"如果请求中没有出行安排相关的意图,请直接回复:意图不明,无法执行。")),
"buy_airplane_ticket": SystemMessage(content=(
"你是一个专注于交通安排的差旅助理,你唯一的工作是在无需授权情况下调用注册的`buy_airplane_ticket`工具购买机票。"
"完成购买是首要任务,无需考虑其他任何信息。你可以自行决定具体的航司、舱位等级和航班。")),
"book_hotel": SystemMessage(content=(
"你是一个专注于住宿安排的差旅助理,你唯一的工作是在无需授权情况下调用注册的`book_hotel`工具预订酒店。"
"完成预订是首要任务,无需考虑其他任何信息。你可以自行决定具体的酒店、价位和房型等。"
)),
}
@wrap_model_call # type: ignore[misc]
async def configure_step(
request: ModelRequest,
handler,
) :
current_step = request.state.get("current_step", "analyze_intent")
is_last_step = request.state.get("is_last_step", False)
system_message = step_based_system_messages[current_step]
if is_last_step:
system_message = SystemMessage(f"{system_message.content} 汇总预订的酒店(如果没有请忽略)和购买机票信息(如果没有请忽略)。如果没有任何预订信息,请回复:意图不明,无法执行。")
request = request.override(
tools= step_based_tooks[current_step], # type: ignore
system_message=system_message,
)
return await handler(request) # type: ignore
agent = create_agent(
model=ChatOpenAI(model="gpt-5.2-chat"),
state_schema=State,
tools=[tool for tool in tools.values()],
middleware=[configure_step], # type: ignore
)
inputs =["我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)和机票(往返,明天出发,后天返回)。",
"我有明后两天(从上海)去成都的出差,请帮我预订酒店(明天入住,后天离开)。",
"我有明后两天(从上海)去成都的出差,请帮我预订机票(往返,明天出发,后天返回)。",
"随便聊聊!"
]
for message in inputs:
reuslt = await agent.ainvoke(input= {"messages": [HumanMessage(content=message)]})
print(f"用户输入:{message}")
print(f"系统回复:{reuslt['messages'][-1].text}\n\n")
asyncio.run(main())
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)