[MAF的Agent管道详解-06]ChatClientAgent对IChatClient和输入输出增强管道的整合
上面我们介绍了与LLM交互的IChatClient管道、持久化对话消息的ChatHistoryProvider、以及实现输入和输出增强的AIContextProvider,接下来我们来看看ChatClientAgent是如何将它们整合在一起的。
1. ChatClientAgent的构建
如下面的代码片段所示,ChatClientAgent定义了两个构造函数,我们可以指定一个IChatClient对象、一个ChatClientAgentOptions以及可选的ILoggerFactory和IServiceProvider对象来创建一个ChatClientAgent对象。ChatClientAgentOptions承载的配置选项包括Agent的名称、描述、指令、工具集等也可以通过调用另一个构造函数利用对应的参数来提供。
public sealed class ChatClientAgent : AIAgent
{
public IChatClient ChatClient { get; }
public ChatHistoryProvider? ChatHistoryProvider { get; private set; }
public IReadOnlyList<AIContextProvider>? AIContextProviders { get; }
protected override string? IdCore { get; }
public override string? Name { get; }
public override string? Description { get; }
public string? Instructions { get; }
public ChatClientAgent(
IChatClient chatClient,
string? instructions = null,
string? name = null,
string? description = null,
IList<AITool>? tools = null,
ILoggerFactory? loggerFactory = null,
IServiceProvider? services = null);
public ChatClientAgent(
IChatClient chatClient,
ChatClientAgentOptions? options,
ILoggerFactory? loggerFactory = null,
IServiceProvider? services = null)
...
1.1 ChatClientAgentOptions
ChatClientAgent除ChatClient之外的其他属性都会从ChatClientAgentOptions中获取。如下所示的是ChatClientAgentOptions类型的定义:
public sealed class ChatClientAgentOptions
{
public string? Id { get; set; }
public string? Name { get; set; }
public string? Description { get; set; }
public ChatOptions? ChatOptions { get; set; }
public ChatHistoryProvider? ChatHistoryProvider { get; set; }
public IEnumerable<AIContextProvider>? AIContextProviders { get; set; }
public bool UseProvidedChatClientAsIs { get; set; }
public bool ClearOnChatHistoryProviderConflict { get; set; } = true;
public bool WarnOnChatHistoryProviderConflict { get; set; } = true;
public bool ThrowOnChatHistoryProviderConflict { get; set; } = true;
public bool RequirePerServiceCallChatHistoryPersistence { get; set; }
}
具体的配置选项包括:
- Id:Agent的唯一标识符,如果没有提供,系统会自动生成一个GUID字符串作为Id;
- Name:Agent的名称,用于描述Agent的功能或角色;
- Description:Agent的描述信息,用于提供更详细的Agent功能说明;
- ChatOptions:一个
ChatOptions对象,调用IChatClient时会使用的选项配置); - ChatHistoryProvider:一个
ChatHistoryProvider对象,用于管理Agent的对话历史; - AIContextProviders:一个
AIContextProvider对象的集合,用于提供Agent的上下文定制功能; - UseProvidedChatClientAsIs:一个布尔值,指示是否直接使用提供的
IChatClient对象而不进行任何包装或增强,默认为false; - RequirePerServiceCallChatHistoryPersistence:一个布尔值,指示是否要求每次服务调用都持久化
ChatHistoryProvider提供的对话历史,默认为false;
1.2 构建规则
ChatClientAgent在构建过程中会采用如下的规则:
- 如果指定的
ChatClientAgentOptions没有提供ChatHistoryProvider,会默认使用InMemoryChatHistoryProvider作为ChatHistoryProvider; - 如果指定的
ChatClientAgentOptions的UseProvidedChatClientAsIs属性被设置成True,会直接使用提供的IChatClient对象;否则会利用配置选项添加响应的中间件:
默认利用中间件对IChatClient的装饰规则如下:
- 自动添加一个
FunctionInvokingChatClient中间件来支持函数调用功能,正是这个中间件实现ReAct循环; ChatClientAgentOptions的Tools属性提供的工具集将会作为FunctionInvokingChatClient的AdditionalTools;- 如果
ChatClientAgentOptions的RequirePerServiceCallChatHistoryPersistence属性被设置成True,会自动添加一个PerServiceCallChatHistoryPersistingChatClient中间件。对于每次Agent调用,这个中间件都会确保对话历史被持久化。与此同时,会屏蔽ChatHistoryProvider针对对话历史的持久化,否则就出现了重复的持久化操作;
如果IChatClient对象已经内置了对话历史管理功能(比如采用OpenAI的Responses API),指定的ChatClientAgentOptions又同时提供了ChatHistoryProvider,此时就会产生冲突,ChatClientAgentOptions提供了三个选项来处理这种冲突:
ClearOnChatHistoryProviderConflict:是否直接将ChatClientAgentOptions的ChatHistoryProvider属性设置为Null,默认为true;WarnOnChatHistoryProviderConflict:是否输出一个警告日志,默认为true;ThrowOnChatHistoryProviderConflict:是否抛出异常,默认为true;
在如下的演示程序中,我们调用OpenAIClient的GetResponsesClient方法来获取一个ResponsesClient对象,并将它转换成一个IChatClient对象来创建ChatClientAgent。在创建ChatClientAgent时,我们提供了一个ChatClientAgentOptions对象,并指定了一个InMemoryChatHistoryProvider作为ChatHistoryProvider。
using dotenv.net;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using OpenAI;
using System.ClientModel;
DotEnv.Load();
var model = Environment.GetEnvironmentVariable("MODEL")!;
var apiKey = Environment.GetEnvironmentVariable("API_KEY")!;
var openAIUrl = Environment.GetEnvironmentVariable("OPENAI_URL")!;
var prompt = "目前的天气是晴天,气温25°C。请给我一些穿衣建议。";
var openAIClient = new OpenAIClient(
credential: new ApiKeyCredential(key: apiKey),
options: new OpenAIClientOptions
{
Endpoint = new Uri(openAIUrl)
});
var chatClient = openAIClient.GetResponsesClient()
.AsIChatClient(defaultModelId:model);
var agent = new ChatClientAgent(
chatClient:chatClient,
options: new ChatClientAgentOptions { ChatHistoryProvider = new InMemoryChatHistoryProvider() });
var session = await agent.CreateSessionAsync();
await agent.RunAsync(message: prompt,session:session);
我们调用ChatClientAgent的RunAsync方法来发送一个请求消息,此时会得到如下所示的InvalidOperationException异常,提示ChatClientAgentOptions的ChatHistoryProvider属性与IChatClient对象内置的对话历史管理功能产生了冲突。
2. 会话的创建和序列化
由ChatClientAgent创建的Session类型为ChatClientAgentSession,它在基类AgentSession的基础上增加了一个ConversationId属性,用于标识与LLM交互的会话ID。所以ChatClientAgent重写了基类的CreateSessionCoreAsync方法来创建一个ChatClientAgentSession对象,并重写了SerializeSessionCoreAsync和DeserializeSessionCoreAsync方法来支持ChatClientAgentSession对象的序列化和反序列化。它还额外提供了一个CreateSessionAsync方法来创建一个带有指定ConversationId的ChatClientAgentSession对象。
public sealed class ChatClientAgent
{
protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default);
public ValueTask<AgentSession> CreateSessionAsync(
string conversationId,
CancellationToken cancellationToken = default);
protected override ValueTask<JsonElement> SerializeSessionCoreAsync(
AgentSession session,
JsonSerializerOptions? jsonSerializerOptions = null,
CancellationToken cancellationToken = default);
protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(
JsonElement serializedState,
JsonSerializerOptions? jsonSerializerOptions = null,
CancellationToken cancellationToken = default)
}
public sealed class ChatClientAgentSession : AgentSession
{
public string? ConversationId{get;set;}
}
3. 执行流程
除了具有专属的AgentSession类型之外,ChatClientAgent还有一个属于直接的AgentRunOptions类型,名为ChatClientAgentRunOptions。它在基类AgentRunOptions的基础上增加了一个ChatOptions属性,返回的ChatOptions将会作为调用IChatClient的GetResponseAsync和GetStreamingResponseAsync方法的参数。另一个ChatClientFactory属性返回的Func<IChatClient, IChatClient>委托可以用来为指定的IChatClient对象装饰任意的中间件。
如果基于IChatClient的中间件适用于每次调用,就应该将它置于预先构建好的IChatClient管道中,并利用它来创建ChatClientAgent。如果中间件只适用于某些特殊的调用,ChatClientAgentRunOptions的ChatClientFactory赋予我们对它进行动态注册的机会。ChatClientAgent重新定义了一系列重载的RunAsync和RunStreamingAsync方法来接受ChatClientAgentRunOptions参数。
public sealed class ChatClientAgentRunOptions : AgentRunOptions
{
public ChatOptions? ChatOptions { get; set; }
public Func<IChatClient, IChatClient>? ChatClientFactory { get; set; }
}
public sealed class ChatClientAgent
{
public Task<AgentResponse> RunAsync(
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse> RunAsync(
string message,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse> RunAsync(
ChatMessage message,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse> RunAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(
string message,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(
ChatMessage message,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
执行ChatClientAgent真正的逻辑实现在重写的RunCoreAsync和RunCoreStreamingAsync方法中。
public sealed class ChatClientAgent
{
protected override async Task<AgentResponse> RunCoreAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
protected override IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(
IEnumerable<ChatMessage> messages,
AgentSession? session = null,
AgentRunOptions? options = null,
CancellationToken cancellationToken = default);
}
}
我们来具体说说实现在RunCoreAsync方法中的调用流程,我们将整个流程分为准备阶段、调用阶段和收尾阶段。准备阶段流程如下:
- 如果指定的session为null,则调用
CreateSessionAsync方法来创建一个新的ChatClientAgentSession对象;也就是说孤立的单次调用也会绑定一个Session; - 如果
RequirePerServiceCallChatHistoryPersistence选项为false,会利用ChatHistoryProvider加载作为当前对话历史的消息列表,并与指定的消息列表进行合并(如果为true,额外注册的PerServiceCallChatHistoryPersistingChatClient会在每次调用时完成此工作); - 根据得到的消息列表和从
ChatOptions提取的系统指令和工具集创建AIContext上下文,并基于它调用每个注册的AIContextProvider的InvokingAsync方法来定制这个上下文。被定制的系统指令和工具集最终回ChatOptions中,从而实现针对输入的增强;
如果指定的AgentRunOptions参数是一个ChatClientAgentRunOptions对象,并且它的ChatClientFactory属性不为null,会将当前IChatClient作为参数调用这个ChatClientFactory委托对它进行装饰,以得到一个新的IChatClient对象来调用;否则直接使用ChatClientAgent的ChatClient属性来调用。调用之后进入收尾阶段,如果调用过程没有发生异常,并得到作为响应结果的ChatResponse,则按照如下的流程执行:
- 如果
ChatResponse的ContinuationToken不为null,意味这得到的并非完整的结果,还有后续;在RequiresPerServiceCallChatHistoryPersistence=False的情况下(如果为True的化,额外注册的PerServiceCallChatHistoryPersistingChatClient会在每次调用时完成此工作),需要将响应返回的ConversationId保存到当前ChatClientAgentSession的ConversationId属性上; - 如果响应消息的
AuthorName属性没有设置,会设置为当前ChatClientAgent的Name属性; - 如果
RequirePerServiceCallChatHistoryPersistence选项为false,并且ChatResponse的ContinuationToken不为null或者chatOptions.AllowBackgroundResponses为True(表明响应尚未结束),此时需要:- 将响应消息列表与当前对话历史进行合并后存储到
ChatHistoryProvider中; - 调用每个注册的
AIContextProvider的InvokedAsync方法来对结果进行再加工;
- 将响应消息列表与当前对话历史进行合并后存储到
如果调用过程中发生了异常,并且RequirePerServiceCallChatHistoryPersistence选项为False,此时需要:
- 调用每个注册的
AIContextProvider的InvokedAsync方法来作响应的异常处理; - 调用每个注册的
AIContextProvider的InvokedAsync方法来作响应的异常处理;
对于RunCoreStreamingAsync方法,调用流程与RunCoreAsync方法基本一致,区别在于它会处理来自IChatClient的流式响应,并将每个响应更新(AgentResponseUpdate)逐一返回。
如果Agent执行尚未结束,响应的ChatResponse或者ChatResponseUpdate的ContiuationToken属性将会返回一个ResponseContinuationToken对象。如果希望得到后续的响应,需要指定一个ChatClientAgentRunOptions对象,并将这个ContinuationToken对象保存到ChatClientAgentRunOptions对象的ContinuationToken属性中。
对于ChatClientAgent来说,这个ContinuationToken是一个ChatClientAgentContinuationToken对象。从如下的代码可以看出,ChatClientAgentContinuationToken是对另一个ResponseContinuationToken对象的包装,并在此基础上增加了InputMessages和ResponseUpdates属性来保存之前的输入消息列表和响应更新列表,以便在获取后续响应时继续使用。
internal class ChatClientAgentContinuationToken : ResponseContinuationToken
{
internal ResponseContinuationToken InnerToken { get; }
internal IEnumerable<ChatMessage>? InputMessages { get; set; }
internal IReadOnlyList<ChatResponseUpdate>? ResponseUpdates { get; set; }
internal ChatClientAgentContinuationToken(ResponseContinuationToken innerToken);
}
4. 结构化输出
让Agent返回指定结构的输出一直是Agent开发中的一个重要需求。ChatClientAgent通过提供一系列重载的RunAsync方法来满足这个需求,这些方法允许调用者指定一个泛型参数T来表示期望的响应结果类型。ChatClientAgent会尝试将LLM的响应内容反序列化成T类型的对象,并将其封装在AgentResponse<T>对象中返回。
public sealed class ChatClientAgent : AIAgent
{
public Task<AgentResponse<T>> RunAsync<T>(
AgentSession? session,
JsonSerializerOptions? serializerOptions,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse<T>> RunAsync<T>(
string message, AgentSession? session,
JsonSerializerOptions? serializerOptions,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse<T>> RunAsync<T>(
ChatMessage message, AgentSession? session,
JsonSerializerOptions? serializerOptions,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
public Task<AgentResponse<T>> RunAsync<T>(
IEnumerable<ChatMessage> messages, AgentSession? session,
JsonSerializerOptions? serializerOptions,
ChatClientAgentRunOptions? options,
CancellationToken cancellationToken = default);
}
结构化输出的核心是对AgentRunOptions的ResponseFormat属性进行配置,该属性定义了LLM响应内容的期望格式。通过设置ResponseFormat,开发者可以指定响应应如何被序列化和反序列化,从而确保Agent返回的数据结构符合预期。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)