[对比学习LangChain和MAF-05]两种截然不同的Agent管道设计
从消息交换的角度来看,Agent提供一个用于LLM请求和相应的管道。不我们属性的Web框架的处理管道不太一样的是,这个管道涉及的消息交换模式并非简单的请求-响应模式,内部会涉及ReAct循环。接下来我们LangChain和MAF的Agent管道在设计上有什么异同。
1. LangChain
对于LangGraph创建的Agent来说,根本没有所谓的管道一说,因为在这种编程模式下,我们会利用StateGraph根据推理流程构建一个对应的图,编译生成的Agent会严格按照这个图来执行。我们所谓的管道指的是,当我们使用create_agent函数来创建Agent是,由注册的中间件来构建的一个用于调用LLM和工具的消息处理管道。
1.1 基于AgentState的双节点状态图
create_agent函数根据指定的模型和工具集创建的Agent,底层使用的依然还是LangGraph的那一套。构建的图默认使用AgentState作为其状态类型,表示对话历史的messages是它的核心状态成员。状态图具有两个节点,一个用于封装模型(节点默认名称为model),另一个承载所有的工具(节点默认名称为tools)。模型节点与工具节点之间具有一条动态条件边,当LLM返回的AIMessage包含工具调用时,被激活的这条边会路由到tools节点完成工具调用,反之则意味着AIMessage携带的就是最终的结果,整个推理过程就此结束。工具节点与模型节点有一条静态边,所以工具执行后会再次回到模型节点,后者在新的状态下完成下一步推理。
LangChain利用create_agent工厂函数创建Agent,一个最简单的Agent只需要指定模型即可,但一般情况我们都需要注册相应的工具。下面的代码利用create_agent函数创建了一个由ChatOpenAI模型和两个注册工具组成的Agent:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from PIL import Image
from dotenv import load_dotenv
import io
load_dotenv()
@tool
async def foo():
"""test tool foo"""
@tool
async def bar():
"""test tool bar"""
agent = create_agent(
model= ChatOpenAI(name= "gpt-5.2-chat"),
tools=[foo,bar]
)
Image.open(io.BytesIO(agent.get_graph().draw_mermaid_png())).show()
当这个Agent被转换成图后具有如下的结构。这是一个包含两个节点的状态图,一个是用于决策和推理的model节点,作为决策执行者的工具全部被封装到tools节点中。
1.2 中间件的作用
这种AgentState + 双节点的结构虽然简单,但却能满足常规的推理任务。对于更复杂的推理任务,一方面我们可以扩展AgentState,提供更多的状态成员来承载更多的上下文信息;另一方面我们也可以通过注册AgentMiddleware来添加更多的节点来完善工作流。具体来说,注册的AgentMiddleware提供了如下的功能:
- 添加状态字段:如果
AgentMiddleware涉及到针对状态更新,对应的状态字段会定义在state_schema字段返回的状态类型中。此状态状态类型通常是AgentState的子类,定义其中的字段最终会转换成通道; - 用于注册工具:当中间件被注册到创建的Agent上时,存储在其
tools字段中的工具会自动注册到Agent上。这相当于提供了一种模块化的工具开发和注册的方式; - 添加节点:当中间件重写了
before_agent/abefore_agent、before_model/abefore_model、after_model/aafter_model、after_agent/aafter_agent方法,都会在状态图中相应的位置添加一个节点。Middleware相当于利用此方式完善了Agent的工作流; - 包装模型和工具调用:中间件利用重写的
wrap_model_call/awrap_model_call、wrap_tool_call/awrap_tool_call方法对模型和工具的调用进行包装,将AOP引入到模型和工具的调用中,使得在调用前后添加一些额外的操作变得非常简单。比如很多中间件都具有各自的系统提示词,它们基本上都是利用重写的wrap_model_call/awrap_model_call方法的方式实现针对系统提示词的注入;
class AgentMiddleware(Generic[StateT, ContextT]):
def before_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
async def abefore_agent(
self, state: StateT, runtime: Runtime[ContextT]
) -> dict[str, Any] | None
def before_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
async def abefore_model(
self, state: StateT, runtime: Runtime[ContextT]
) -> dict[str, Any] | None
def after_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
async def aafter_model(
self, state: StateT, runtime: Runtime[ContextT]
) -> dict[str, Any] | None
def after_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
async def aafter_agent(
self, state: StateT, runtime: Runtime[ContextT]
) -> dict[str, Any] | None
def wrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], ModelResponse],
) -> ModelCallResult
async def awrap_model_call(
self,
request: ModelRequest,
handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
) -> ModelCallResult
def wrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]],
) -> ToolMessage | Command[Any]
async def awrap_tool_call(
self,
request: ToolCallRequest,
handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]],
) -> ToolMessage | Command[Any]
1.3 Agent管道的构建
在下面的程序中,我们定义了一个FooMiddleware和BarMiddleware,并将其注册到Agent上
from langchain.agents import create_agent
from dotenv import load_dotenv
from langchain.agents.middleware.types import AgentState, ExtendedModelResponse, ModelRequest, ModelResponse
from langchain_openai import ChatOpenAI
from langchain.agents.middleware import AgentMiddleware
from langgraph.prebuilt.tool_node import ToolCallRequest
from langgraph.runtime import Runtime
from typing import Any,Callable
from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.types import Command
from PIL import Image
import io
load_dotenv()
@tool
def get_weather(city:str) -> str:
"""Get weather information for given city"""
return f"It's sunny today in {city}."
class BaseMiddleware(AgentMiddleware):
def before_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
print(f"{self.name}.before_agent")
return super().before_agent(state, runtime)
def before_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
print(f"{self.name}.before_model")
return super().before_model(state, runtime)
def after_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
print(f"{self.name}.after_agent")
return super().after_agent(state, runtime)
def after_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
print(f"{self.name}.after_model")
return super().after_model(state, runtime)
def wrap_tool_call(self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]]) -> ToolMessage | Command[Any]:
print(f"{self.name}.wrap_tool_call")
return handler(request)
def wrap_model_call(self, request: ModelRequest[None], handler: Callable[[ModelRequest[None]], ModelResponse[Any]]) -> ModelResponse[Any] | AIMessage | ExtendedModelResponse[Any]:
print(f"{self.name}.wrap_model_call")
return handler(request)
class FooMiddleware(BaseMiddleware):
pass
class BarMiddleware(BaseMiddleware):
pass
agent = create_agent(
model= ChatOpenAI(model="gpt-5.2-chat"),
tools=[get_weather],
middleware=[FooMiddleware(), BarMiddleware()])
result = agent.invoke(input={
"messages":[{"role":"user", "content":"What is the weather like in Suzhou?"}]
})
Image.open(io.BytesIO(agent.get_graph().draw_mermaid_png())).show()
print(result["messages"][-1].content)
我们调用Agent的get_graph方法将它转换成一个Graph对象,并调用Graph对象的draw_mermaid_png方法将图以Mermaid格式绘制成PNG图片。从下图可以看出,由于我们注册了两个中间件,所以会在状态图中添加八个节点,至于整张图前后的两个节点对应中间件的before_agent和after_agent方法,位于模型节点前后的四个节点对应中间件的before_model和after_model方法。中间件的wrap_tool_call和wrap_model_call方法会用于包装现有针对模型和工具的调用。
在定义注册中间件的基类时,我们在每个方法上都添加了输出语句,以便我们在调用Agent时能够清晰地看到每个方法的调用顺序。我们的调用会产生如下的输出:
FooMiddleware.before_agent
BarMiddleware.before_agent
FooMiddleware.before_model
BarMiddleware.before_model
FooMiddleware.wrap_model_call
BarMiddleware.wrap_model_call
BarMiddleware.after_model
FooMiddleware.after_model
FooMiddleware.wrap_tool_call
BarMiddleware.wrap_tool_call
FooMiddleware.before_model
BarMiddleware.before_model
FooMiddleware.wrap_model_call
BarMiddleware.wrap_model_call
BarMiddleware.after_model
FooMiddleware.after_model
BarMiddleware.after_agent
FooMiddleware.after_agent
It’s sunny in Suzhou today. ☀️
我们可以简单分析一下这个输出体现的执行流程:
- 对于每次Agent的调用,中间件重写的
before_agent方法会在Agent执行前被调用,after_agent方法会在Agent执行后被调用; - 涉及两次针对LLM的调用,所以中间件的
before_model、after_model和wrap_model_call方法会被调用两次; - 涉及一次工具调用,所以中间件的
wrap_tool_call方法会被调用一次;
2. MAF
MAF具有不同类型的AIAgent类型,这里我们只关注最常用的ChatClientAgent。它具有如下图所示的管道结构:
从右到左看,整个管道由三个部分组成:
- IChatClient管道:与LLM交互的是由LLM客户端构建的
IChatClient对象,我们可以采用装饰器的设计模式为它添加任意作为装饰器的DelegatingChatClient对象。DelegatingChatClient就是IChatClient的中间件,整个IChatClient管道就是由这些中间件装饰而成的一个链式结构; - 输入输出增强:
ChatClientAgent利用注册的AIContextProvider为输入输出增强提供了一个非常灵活的机制,我们可以利用注册的AIContextProvider在调用IChatClient之前对输入消息列表、注册的工具集以及调用LLM的系统指令进行加工;同样的,在得到LLM的响应后,我们也可以利用AIContextProvider对响应消息进行加工处理。输入输出增强机制为我们提供了一个非常灵活的方式来完善Agent的工作流; - Agent中间件:和构建
IChatClient管道的中间件类似,我们也可以利用DelegatingAIAgent作为ChatClientAgent的中间件来完善Agent的工作流。
下面的演示程序演示了如何使用上面介绍的这些对象来构建一个ChatClientAgent的管道。我们自定义了一个FakeChatClient来模拟与LLM交互的IChatClient,并定义了ChatClientMiddleware和AgentMiddleware作为IChatClient管道和Agent管道的中间件,NamedAIContextProvider作为输入输出增强的AIContextProvider。我们将它们组合在一起构建了一个ChatClientAgent,并调用RunAsync方法来运行这个Agent。
using Azure.AI.Projects;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
var agent = new FakeChatClient()
.AsBuilder()
.Use(inner => new ChatClientMiddleware(inner, "FooChatClientMiddleware"))
.Use(inner => new ChatClientMiddleware(inner, "BarChatClientMiddleware"))
.Build()
.AsAIAgent(new ChatClientAgentOptions { AIContextProviders = [new NamedAIContextProvider("FooAIContextProvider"), new NamedAIContextProvider("BarAIContextProvider")] })
.AsBuilder()
.Use(inner => new AgentMiddleware(inner, "FooAgentMiddleware"))
.Use(inner => new AgentMiddleware(inner, "BarAgentMiddleware"))
.Build()
;
await agent.RunAsync();
class FakeChatClient : IChatClient
{
public void Dispose() { }
public Task<ChatResponse> GetResponseAsync(
IEnumerable<ChatMessage> messages,
ChatOptions? options = null,
CancellationToken cancellationToken = default)
{
Console.WriteLine("FakeChatClient.GetResponseAsync");
return Task.FromResult(new ChatResponse(new[] { new ChatMessage(ChatRole.Assistant, "Fake response") }));
}
public object? GetService(Type serviceType, object? serviceKey = null)=>null;
public IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
IEnumerable<ChatMessage> messages,
ChatOptions? options = null,
CancellationToken cancellationToken = default)
=> throw new NotImplementedException();
}
class ChatClientMiddleware(IChatClient innerClient, string name) : DelegatingChatClient(innerClient)
{
public override Task<ChatResponse> GetResponseAsync(
IEnumerable<ChatMessage> messages,
ChatOptions? options = null,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"{name}.GetResponseAsync[before]");
var result = base.GetResponseAsync(messages, options, cancellationToken);
Console.WriteLine($"{name}.GetResponseAsync[after]");
return result;
}
}
class AgentMiddleware(AIAgent innerAgent, string name) : DelegatingAIAgent(innerAgent)
{
protected override Task<AgentResponse> RunCoreAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"{name}.RunAsync[before]");
var result = base.RunCoreAsync(messages, session, options, cancellationToken);
Console.WriteLine($"{name}.RunAsync[after]");
return result;
}
}
class NamedAIContextProvider(string name) : AIContextProvider
{
protected override ValueTask<AIContext> InvokingCoreAsync(
InvokingContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"{name}.InvokingAsync");
return base.InvokingCoreAsync(context, cancellationToken);
}
protected override ValueTask InvokedCoreAsync(
InvokedContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine($"{name}.InvokedAsync");
return base.InvokedCoreAsync(context, cancellationToken);
}
public override IReadOnlyList<string> StateKeys => [$"{name}.{GetType().Name}"];
}
程序运行后会产生如下的输出。对照一下上面的管道图,我们会发现输出的顺序完全符合管道图所示的执行流程:
FooAgentMiddleware.RunAsync[before]
BarAgentMiddleware.RunAsync[before]
FooAIContextProvider.InvokingAsync
BarAIContextProvider.InvokingAsync
FooChatClientMiddleware.GetResponseAsync[before]
BarChatClientMiddleware.GetResponseAsync[before]
FakeChatClient.GetResponseAsync
BarChatClientMiddleware.GetResponseAsync[after]
FooChatClientMiddleware.GetResponseAsync[after]
FooAIContextProvider.InvokedAsync
BarAIContextProvider.InvokedAsync
BarAgentMiddleware.RunAsync[after]
FooAgentMiddleware.RunAsync[after]
我的博文系列“MAF的Agent管道详解”提供了对MAF的Agent管道的详细解读,感兴趣的读者可以前往阅读。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)