从消息交换的角度来看,Agent提供一个用于LLM请求和相应的管道。不我们属性的Web框架的处理管道不太一样的是,这个管道涉及的消息交换模式并非简单的请求-响应模式,内部会涉及ReAct循环。接下来我们LangChain和MAF的Agent管道在设计上有什么异同。

1. LangChain

对于LangGraph创建的Agent来说,根本没有所谓的管道一说,因为在这种编程模式下,我们会利用StateGraph根据推理流程构建一个对应的图,编译生成的Agent会严格按照这个图来执行。我们所谓的管道指的是,当我们使用create_agent函数来创建Agent是,由注册的中间件来构建的一个用于调用LLM和工具的消息处理管道。

1.1 基于AgentState的双节点状态图

create_agent函数根据指定的模型和工具集创建的Agent,底层使用的依然还是LangGraph的那一套。构建的图默认使用AgentState作为其状态类型,表示对话历史的messages是它的核心状态成员。状态图具有两个节点,一个用于封装模型(节点默认名称为model),另一个承载所有的工具(节点默认名称为tools)。模型节点与工具节点之间具有一条动态条件边,当LLM返回的AIMessage包含工具调用时,被激活的这条边会路由到tools节点完成工具调用,反之则意味着AIMessage携带的就是最终的结果,整个推理过程就此结束。工具节点与模型节点有一条静态边,所以工具执行后会再次回到模型节点,后者在新的状态下完成下一步推理。

LangChain利用create_agent工厂函数创建Agent,一个最简单的Agent只需要指定模型即可,但一般情况我们都需要注册相应的工具。下面的代码利用create_agent函数创建了一个由ChatOpenAI模型和两个注册工具组成的Agent:

from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from PIL import Image
from dotenv import load_dotenv
import io

load_dotenv()

@tool
async def foo():
    """test tool foo"""

@tool
async def bar():
    """test tool bar"""

agent = create_agent(
    model= ChatOpenAI(name= "gpt-5.2-chat"),
    tools=[foo,bar]
)

Image.open(io.BytesIO(agent.get_graph().draw_mermaid_png())).show()

当这个Agent被转换成图后具有如下的结构。这是一个包含两个节点的状态图,一个是用于决策和推理的model节点,作为决策执行者的工具全部被封装到tools节点中。

Alternative Text

1.2 中间件的作用

这种AgentState + 双节点的结构虽然简单,但却能满足常规的推理任务。对于更复杂的推理任务,一方面我们可以扩展AgentState,提供更多的状态成员来承载更多的上下文信息;另一方面我们也可以通过注册AgentMiddleware来添加更多的节点来完善工作流。具体来说,注册的AgentMiddleware提供了如下的功能:

  • 添加状态字段:如果AgentMiddleware涉及到针对状态更新,对应的状态字段会定义在state_schema字段返回的状态类型中。此状态状态类型通常是AgentState的子类,定义其中的字段最终会转换成通道;
  • 用于注册工具:当中间件被注册到创建的Agent上时,存储在其tools字段中的工具会自动注册到Agent上。这相当于提供了一种模块化的工具开发和注册的方式;
  • 添加节点:当中间件重写了before_agent/abefore_agentbefore_model/abefore_modelafter_model/aafter_modelafter_agent/aafter_agent方法,都会在状态图中相应的位置添加一个节点。Middleware相当于利用此方式完善了Agent的工作流;
  • 包装模型和工具调用:中间件利用重写的wrap_model_call/awrap_model_callwrap_tool_call/awrap_tool_call方法对模型和工具的调用进行包装,将AOP引入到模型和工具的调用中,使得在调用前后添加一些额外的操作变得非常简单。比如很多中间件都具有各自的系统提示词,它们基本上都是利用重写的wrap_model_call/awrap_model_call方法的方式实现针对系统提示词的注入;
class AgentMiddleware(Generic[StateT, ContextT]):

    def before_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
    async def abefore_agent(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None

    def before_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
    async def abefore_model(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None

    def after_model(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
    async def aafter_model(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None

    def after_agent(self, state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None
    async def aafter_agent(
        self, state: StateT, runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelCallResult
    async def awrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
    ) -> ModelCallResult

    def wrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]],
    ) -> ToolMessage | Command[Any]
    async def awrap_tool_call(
        self,
        request: ToolCallRequest,
        handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command[Any]]],
    ) -> ToolMessage | Command[Any]

1.3 Agent管道的构建

在下面的程序中,我们定义了一个FooMiddlewareBarMiddleware,并将其注册到Agent上

from langchain.agents import create_agent
from dotenv import load_dotenv
from langchain.agents.middleware.types import AgentState, ExtendedModelResponse, ModelRequest, ModelResponse
from langchain_openai import ChatOpenAI
from langchain.agents.middleware import AgentMiddleware
from langgraph.prebuilt.tool_node import ToolCallRequest
from langgraph.runtime import Runtime
from typing import  Any,Callable
from langchain_core.messages import  AIMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.types import Command
from PIL import Image
import io

load_dotenv()

@tool
def get_weather(city:str) -> str:
    """Get weather information for given city"""
    return f"It's sunny today in {city}."

class BaseMiddleware(AgentMiddleware):
    def before_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        print(f"{self.name}.before_agent")
        return super().before_agent(state, runtime) 
    def before_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        print(f"{self.name}.before_model")
        return super().before_model(state, runtime)
    def after_agent(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        print(f"{self.name}.after_agent")
        return super().after_agent(state, runtime)
    def after_model(self, state: AgentState[Any], runtime: Runtime[None]) -> dict[str, Any] | None:
        print(f"{self.name}.after_model")
        return super().after_model(state, runtime)   
    def wrap_tool_call(self, request: ToolCallRequest, handler: Callable[[ToolCallRequest], ToolMessage | Command[Any]]) -> ToolMessage | Command[Any]:
        print(f"{self.name}.wrap_tool_call")
        return handler(request)
    def wrap_model_call(self, request: ModelRequest[None], handler: Callable[[ModelRequest[None]], ModelResponse[Any]]) -> ModelResponse[Any] | AIMessage | ExtendedModelResponse[Any]:
        print(f"{self.name}.wrap_model_call")
        return handler(request)
    
class FooMiddleware(BaseMiddleware):
    pass
class BarMiddleware(BaseMiddleware):
    pass

agent = create_agent(
    model= ChatOpenAI(model="gpt-5.2-chat"),
    tools=[get_weather],
    middleware=[FooMiddleware(), BarMiddleware()])

result = agent.invoke(input={
    "messages":[{"role":"user", "content":"What is the weather like in Suzhou?"}]
})
Image.open(io.BytesIO(agent.get_graph().draw_mermaid_png())).show()
print(result["messages"][-1].content)

我们调用Agent的get_graph方法将它转换成一个Graph对象,并调用Graph对象的draw_mermaid_png方法将图以Mermaid格式绘制成PNG图片。从下图可以看出,由于我们注册了两个中间件,所以会在状态图中添加八个节点,至于整张图前后的两个节点对应中间件的before_agentafter_agent方法,位于模型节点前后的四个节点对应中间件的before_modelafter_model方法。中间件的wrap_tool_callwrap_model_call方法会用于包装现有针对模型和工具的调用。

Alternative Text

在定义注册中间件的基类时,我们在每个方法上都添加了输出语句,以便我们在调用Agent时能够清晰地看到每个方法的调用顺序。我们的调用会产生如下的输出:

FooMiddleware.before_agent
BarMiddleware.before_agent

FooMiddleware.before_model
BarMiddleware.before_model
FooMiddleware.wrap_model_call
BarMiddleware.wrap_model_call
BarMiddleware.after_model
FooMiddleware.after_model

FooMiddleware.wrap_tool_call
BarMiddleware.wrap_tool_call

FooMiddleware.before_model
BarMiddleware.before_model
FooMiddleware.wrap_model_call
BarMiddleware.wrap_model_call
BarMiddleware.after_model
FooMiddleware.after_model

BarMiddleware.after_agent
FooMiddleware.after_agent

It’s sunny in Suzhou today. ☀️

我们可以简单分析一下这个输出体现的执行流程:

  • 对于每次Agent的调用,中间件重写的before_agent方法会在Agent执行前被调用,after_agent方法会在Agent执行后被调用;
  • 涉及两次针对LLM的调用,所以中间件的before_modelafter_modelwrap_model_call方法会被调用两次;
  • 涉及一次工具调用,所以中间件的wrap_tool_call方法会被调用一次;

2. MAF

MAF具有不同类型的AIAgent类型,这里我们只关注最常用的ChatClientAgent。它具有如下图所示的管道结构:

Alternative Text

从右到左看,整个管道由三个部分组成:

  • IChatClient管道:与LLM交互的是由LLM客户端构建的IChatClient对象,我们可以采用装饰器的设计模式为它添加任意作为装饰器的DelegatingChatClient对象。DelegatingChatClient就是IChatClient的中间件,整个IChatClient管道就是由这些中间件装饰而成的一个链式结构;
  • 输入输出增强ChatClientAgent利用注册的AIContextProvider为输入输出增强提供了一个非常灵活的机制,我们可以利用注册的AIContextProvider在调用IChatClient之前对输入消息列表、注册的工具集以及调用LLM的系统指令进行加工;同样的,在得到LLM的响应后,我们也可以利用AIContextProvider对响应消息进行加工处理。输入输出增强机制为我们提供了一个非常灵活的方式来完善Agent的工作流;
  • Agent中间件:和构建IChatClient管道的中间件类似,我们也可以利用DelegatingAIAgent作为ChatClientAgent的中间件来完善Agent的工作流。

下面的演示程序演示了如何使用上面介绍的这些对象来构建一个ChatClientAgent的管道。我们自定义了一个FakeChatClient来模拟与LLM交互的IChatClient,并定义了ChatClientMiddlewareAgentMiddleware作为IChatClient管道和Agent管道的中间件,NamedAIContextProvider作为输入输出增强的AIContextProvider。我们将它们组合在一起构建了一个ChatClientAgent,并调用RunAsync方法来运行这个Agent。

using Azure.AI.Projects;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

var agent = new FakeChatClient()
    .AsBuilder()
    .Use(inner => new ChatClientMiddleware(inner, "FooChatClientMiddleware"))
    .Use(inner => new ChatClientMiddleware(inner, "BarChatClientMiddleware"))
    .Build()
    .AsAIAgent(new ChatClientAgentOptions { AIContextProviders = [new NamedAIContextProvider("FooAIContextProvider"), new NamedAIContextProvider("BarAIContextProvider")] })
    .AsBuilder()
    .Use(inner => new AgentMiddleware(inner, "FooAgentMiddleware"))
    .Use(inner => new AgentMiddleware(inner, "BarAgentMiddleware"))
    .Build()
    ;
await agent.RunAsync();

class FakeChatClient : IChatClient
{
    public void Dispose() { }
    public Task<ChatResponse> GetResponseAsync(
      IEnumerable<ChatMessage> messages, 
      ChatOptions? options = null, 
      CancellationToken cancellationToken = default)
    {
        Console.WriteLine("FakeChatClient.GetResponseAsync");
        return Task.FromResult(new ChatResponse(new[] { new ChatMessage(ChatRole.Assistant, "Fake response") }));
    }
    public object? GetService(Type serviceType, object? serviceKey = null)=>null;
    public IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
      IEnumerable<ChatMessage> messages, 
      ChatOptions? options = null,
      CancellationToken cancellationToken = default)
    => throw new NotImplementedException();
}


class ChatClientMiddleware(IChatClient innerClient, string name) : DelegatingChatClient(innerClient)
{
    public override Task<ChatResponse> GetResponseAsync(
      IEnumerable<ChatMessage> messages, 
      ChatOptions? options = null, 
      CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{name}.GetResponseAsync[before]");
        var result = base.GetResponseAsync(messages, options, cancellationToken);
        Console.WriteLine($"{name}.GetResponseAsync[after]");
        return result;
    }
}

class AgentMiddleware(AIAgent innerAgent, string name) : DelegatingAIAgent(innerAgent)
{        
    protected override Task<AgentResponse> RunCoreAsync(
      IEnumerable<ChatMessage> messages, 
      AgentSession? session = null, 
      AgentRunOptions? options = null, 
      CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{name}.RunAsync[before]");
        var result = base.RunCoreAsync(messages, session, options, cancellationToken);
        Console.WriteLine($"{name}.RunAsync[after]");
        return result;
    }
}

class NamedAIContextProvider(string name) : AIContextProvider
{
    protected override ValueTask<AIContext> InvokingCoreAsync(
      InvokingContext context, 
      CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{name}.InvokingAsync");
        return base.InvokingCoreAsync(context, cancellationToken);
    }
    protected override ValueTask InvokedCoreAsync(
      InvokedContext context, 
      CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"{name}.InvokedAsync");
        return  base.InvokedCoreAsync(context, cancellationToken);
    }
    public override IReadOnlyList<string> StateKeys => [$"{name}.{GetType().Name}"];
}

程序运行后会产生如下的输出。对照一下上面的管道图,我们会发现输出的顺序完全符合管道图所示的执行流程:

FooAgentMiddleware.RunAsync[before]
BarAgentMiddleware.RunAsync[before]

FooAIContextProvider.InvokingAsync
BarAIContextProvider.InvokingAsync

FooChatClientMiddleware.GetResponseAsync[before]
BarChatClientMiddleware.GetResponseAsync[before]

FakeChatClient.GetResponseAsync

BarChatClientMiddleware.GetResponseAsync[after]
FooChatClientMiddleware.GetResponseAsync[after]

FooAIContextProvider.InvokedAsync
BarAIContextProvider.InvokedAsync

BarAgentMiddleware.RunAsync[after]
FooAgentMiddleware.RunAsync[after]

我的博文系列“MAF的Agent管道详解”提供了对MAF的Agent管道的详细解读,感兴趣的读者可以前往阅读。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐