从零理解 AG-UI 协议:AI Agent 与前端的流式事件通信实战
基于 AgentScope Java SDK 的 AG-UI 独立示例项目,深度拆解 AI Agent 前端通信的协议设计、状态同步与中断机制。
一、引言
当我们在浏览器中与 ChatGPT、通义千问等 AI 产品对话时,最直观的体验是:回答不是一次性出现的,而是逐字逐句"流"出来的。这种流式体验背后,是 SSE(Server-Sent Events)技术在支撑。
但对于更复杂的 AI Agent 场景——Agent 不仅回复文本,还会调用工具、展示推理过程、维护会话状态——单纯的"文本流"就不够了。我们需要一种更结构化的协议,把 Agent 运行的全过程拆成不同类型的事件,让前端能够精确地渲染每一步。
AG-UI 协议正是为此而生。它定义了 Agent 与前端之间的标准事件模型,将一次 Agent 运行拆解为:生命周期事件、文本消息事件、工具调用事件、状态同步事件、推理事件等,通过 SSE 推送给前端。
本文基于 AgentScope Java SDK 的 AG-UI 独立示例项目,从一行前端代码出发,逐层追踪到后端 Agent 执行,完整拆解 AG-UI 协议的核心机制。
目标读者:有 Web 开发基础、对 AI Agent 前端集成感兴趣的开发者。
项目地址:https://github.com/redants-101/agui-standalone-demo
二、技术原理:AG-UI 协议的核心设计
2.1 为什么是 SSE 而不是 WebSocket?
在 Agent 场景中,前后端通信有一个鲜明特征:前端发起一次请求,后端持续推送事件直到运行结束。这是典型的"一次请求,流式响应"模式。
| 维度 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务端 → 客户端) | 双向 |
| 协议基础 | 基于 HTTP/1.1,无需升级 | 需要协议升级握手 |
| 断线重连 | 浏览器原生自动重连 | 需要自己实现 |
| 基础设施兼容 | 天然兼容代理、CDN、CORS | 部分代理不支持 |
| 中断方式 | 直接断开 HTTP 连接 | 需要发送关闭帧 |
AG-UI 选择 SSE 的核心理由:Agent 运行过程中,前端不需要向服务端发送任何数据。用户消息在请求体中一次性提交,运行过程中前端只是被动接收事件。这完美匹配 SSE 的单向推送模型。
2.2 AG-UI 事件类型体系
AG-UI 协议将一次 Agent 运行拆解为以下事件类型:
RUN_STARTED ← 运行开始
│
├── TEXT_MESSAGE_START ← 文本消息开始
│ ├── TEXT_MESSAGE_CONTENT ← 文本增量(逐 token)
│ └── TEXT_MESSAGE_END ← 文本消息结束
│
├── TOOL_CALL_START ← 工具调用开始
│ ├── TOOL_CALL_ARGS ← 工具参数流式到达
│ ├── TOOL_CALL_END ← 工具调用结束
│ └── TOOL_CALL_RESULT ← 工具执行结果
│
├── STATE_SNAPSHOT ← 运行前完整状态快照
├── STATE_DELTA ← 运行后状态增量(JSON Patch)
│
├── REASONING_MESSAGE_START ← 推理过程开始
│ ├── REASONING_MESSAGE_CONTENT ← 推理增量
│ └── REASONING_MESSAGE_END ← 推理过程结束
│
└── RUN_FINISHED ← 运行结束
每个事件都携带 threadId 和 runId,前端可以精确地将事件归属到某次运行和某个会话。
2.3 SSE 数据格式
后端推送给前端的原始数据遵循 W3C SSE 规范:
data: {"type":"RUN_STARTED","threadId":"thread-xxx","runId":"run-xxx"}
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"msg-xxx","delta":"AG-UI"}
data: {"type":"RUN_FINISHED","threadId":"thread-xxx","runId":"run-xxx"}
每条事件格式为 data: <JSON>\n\n,两个换行符表示事件结束。
三、实践过程:从前端一行代码到后端完整链路
3.1 项目架构概览
项目采用 Spring Boot WebFlux(响应式) 后端 + 原生 HTML/JS + @ag-ui/client 前端,前后端同源部署在 8090 端口。
agui-standalone-demo/
├── src/main/java/com/example/agui/
│ ├── AguiStandaloneApplication.java # 启动入口
│ ├── config/
│ │ ├── AgentConfiguration.java # Agent 注册(3 个 Agent + 工厂模式)
│ │ ├── DashScopeProperties.java # 模型 API Key 配置
│ │ └── WebFluxConfiguration.java # 路由 + CORS + 会话管理
│ ├── web/
│ │ └── AguiStandaloneHandler.java # 核心Handler:路由→Agent→SSE
│ ├── agent/
│ │ └── LocalDemoAgent.java # 本地模拟Agent(无Key时兜底)
│ ├── state/
│ │ └── AguiStateStore.java # 服务端状态仓库
│ └── tool/
│ └── DemoTools.java # 示例工具集
└── src/main/resources/static/
├── index.html # 页面结构
├── app.js # 核心交互逻辑
└── style.css # 样式
3.2 前端:一行代码发起 Run
前端通过 @ag-ui/client 的 HttpAgent 发起请求:
// 创建 Agent 客户端
const agent = new HttpAgent({
url: "/agui/run",
threadId: "thread-xxx",
initialMessages: [],
initialState: {}
});
// 订阅事件
agent.subscribe({
onTextMessageContentEvent: ({ event }) => {
// 文本增量到达,更新 UI
},
onToolCallStartEvent: ({ event }) => {
// 工具调用开始
}
});
// 发起运行
await agent.runAgent({
runId: "run-xxx",
forwardedProps: { agentId: "calculator" }
});
runAgent() 内部做了四件事:
- 将
messages、state、forwardedProps组装成RunAgentInputJSON - 发起
fetch(POST, url, { body: RunAgentInput })请求 - 解析 SSE 响应流,按事件类型触发 subscriber 回调
- Promise 在事件流结束(
RUN_FINISHED)或出错时 resolve/reject
3.3 后端:六阶段请求处理管道
后端的核心处理方法 processInput() 串联了六个阶段:
private Mono<ServerResponse> processInput(
RunAgentInput input, ServerRequest request, String pathAgentId) {
// 第一阶段:路由决策
// 优先级:路径参数 > Header > forwardedProps > 默认值 > fallback
RoutingDecision routingDecision = resolveRouting(input, request, pathAgentId);
// 第二阶段:判断服务端记忆 + 解析 Agent 实例
boolean hadMemoryBeforeRun = properties.isServerSideMemory()
&& (agentResolver.hasMemory(threadId)
|| threadSessionManager.getSession(threadId).isPresent());
Agent agent = agentResolver.resolveAgent(routingDecision.agentId(), threadId);
// 第三阶段:消息裁剪(有记忆时去重)
RunAgentInput effectiveInput = hadMemoryBeforeRun
? extractLatestUserMessage(input) : input;
// 第四阶段:打开状态会话 + 创建事件收集器
AguiStateStore.StateSession stateSession = stateStore.openRun(...);
AguiStateStore.TraceCollector collector = stateStore.newCollector(input);
// 第五阶段:构建 SSE 事件流(Reactor 响应式管道)
Flux<ServerSentEvent<String>> events = new AguiAgentAdapter(agent, adapterConfig)
.run(effectiveInput) // 5.1 Agent 运行
.timeout(properties.getRunTimeout(), Flux.defer( // 5.2 超时兜底
() -> { agent.interrupt(); ... }))
.concatMap(event -> // 5.3 插入状态事件
decorateEventFlow(event, stateSession, collector))
.map(event -> ServerSentEvent.<String>builder() // 5.4 编码为 SSE
.data(encoder.encodeToJson(event).trim()).build())
.doOnCancel(() -> agent.interrupt()); // 5.5 客户端取消
// 第六阶段:返回 SSE 响应
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(events, ServerSentEvent.class);
}
关键设计:Flux<ServerSentEvent> 是惰性的——方法返回时 Agent 还未真正运行,只有当 Spring WebFlux 开始订阅这个 Flux 时,事件流才会被驱动。
3.4 多 Agent 路由:四种方式与优先级
项目注册了三个 Agent(default、chat、calculator),支持四种路由方式:
private RoutingDecision resolveRouting(
RunAgentInput input, ServerRequest request, String pathAgentId) {
// 优先级 1:路径参数 → POST /agui/run/calculator
if (pathAgentId != null && !pathAgentId.isBlank()) {
return new RoutingDecision(pathAgentId, "path");
}
// 优先级 2:Header → X-Agent-Id: chat
String headerAgentId = request.headers().firstHeader(properties.getAgentIdHeader());
if (headerAgentId != null && !headerAgentId.isBlank()) {
return new RoutingDecision(headerAgentId, "header");
}
// 优先级 3:forwardedProps → { "agentId": "calculator" }
Object forwardedAgentId = input.getForwardedProp("agentId");
if (forwardedAgentId != null && !String.valueOf(forwardedAgentId).isBlank()) {
return new RoutingDecision(String.valueOf(forwardedAgentId), "forwardedProps");
}
// 优先级 4:默认配置 → default-agent-id: default
// 优先级 5:兜底 → "default"
...
}
前端根据用户选择构造不同的请求:
function buildRouteConfig() {
const baseUrl = "/agui/run";
if (dom.routeMode.value === "path") {
return { url: `${baseUrl}/${effectiveAgentId()}`, headers: {} };
}
if (dom.routeMode.value === "header") {
return { url: baseUrl, headers: { "X-Agent-Id": effectiveAgentId() } };
}
// forwardedProps 在 runAgent() 的参数中传递
return { url: baseUrl, headers: {} };
}
3.5 状态同步:SNAPSHOT + DELTA 的双事件模型
AG-UI 的状态同步采用"快照 + 增量"双事件模型:
- STATE_SNAPSHOT:Run 开始时推送完整状态,前端直接替换本地状态
- STATE_DELTA:Run 结束时推送 JSON Patch(RFC 6902),前端增量更新
后端通过 AguiStateConverter 计算增量:
// 递归对比 before 和 after,生成 JSON Patch 操作
private List<JsonPatchOperation> computeDelta(
Map<String, Object> before, Map<String, Object> after, String basePath) {
for (String key : allKeys) {
if (!before.containsKey(key)) {
operations.add(JsonPatchOperation.add(path, afterValue)); // 新增字段
} else if (!after.containsKey(key)) {
operations.add(JsonPatchOperation.remove(path)); // 删除字段
} else if (!Objects.equals(beforeValue, afterValue)) {
if (beforeValue instanceof Map && afterValue instanceof Map) {
operations.addAll(computeDelta(...)); // 递归深入
} else {
operations.add(JsonPatchOperation.replace(path, afterValue)); // 替换值
}
}
}
}
递归 diff 的意义:如果 metrics 下有 10 个字段只变了 1 个,只产生 1 个 replace 操作,而不是替换整个 metrics 对象。
事件装饰逻辑在 decorateEventFlow() 中:
private Flux<AguiEvent> decorateEventFlow(AguiEvent event, ...) {
collector.accept(event); // 旁路收集事件摘要
if (event instanceof AguiEvent.RunStarted) {
// RUN_STARTED 后立即推送状态快照
output.add(event);
output.add(stateStore.createSnapshot(stateSession));
return Flux.fromIterable(output);
}
if (event instanceof AguiEvent.RunFinished) {
// RUN_FINISHED 前推送状态增量
AguiEvent.StateDelta delta = stateStore.completeRun(stateSession, collector);
output.add(delta);
output.add(event);
return Flux.fromIterable(output);
}
return Flux.just(event); // 其他事件原样通过
}
四、遇到的问题与解决方案
4.1 会话记忆与上下文重复
问题:前端 @ag-ui/client 会在每次 runAgent() 时发送完整的消息历史(包含之前的 user + assistant 消息),而服务端 Agent 的 InMemoryMemory 也保存了同样的历史。如果两者直接叠加,第一轮对话会在上下文中出现两次。
解决方案:后端在检测到已有服务端记忆时,执行消息裁剪——只提取最后一条 user message 提交给 Agent:
private RunAgentInput extractLatestUserMessage(RunAgentInput input) {
AguiMessage lastUserMessage = null;
for (int index = messages.size() - 1; index >= 0; index--) {
if ("user".equalsIgnoreCase(messages.get(index).getRole())) {
lastUserMessage = messages.get(index);
break;
}
}
return RunAgentInput.builder()
.threadId(input.getThreadId())
.runId(input.getRunId())
.messages(List.of(lastUserMessage)) // 只提交最后一条
.build();
}
这样 Agent 看到的上下文 = InMemoryMemory 中的历史 + 本轮新消息,不会重复。
4.2 超时不是静默断开
问题:如果 Agent 运行超时,直接断开 SSE 连接会导致前端无法区分"正常结束"和"超时异常"。
解决方案:使用 Reactor 的 timeout() 操作符,超时时补发 RAW(timeout) + RUN_FINISHED 事件,而不是让连接静默断开:
.timeout(
properties.getRunTimeout(),
Flux.defer(() -> {
agent.interrupt();
return Flux.just(
new AguiEvent.Raw(threadId, runId,
Map.of("kind", "timeout", "error", "运行超时")),
new AguiEvent.RunFinished(threadId, runId)
);
})
)
前端 subscriber 收到 RAW(kind=timeout) 后可以在事件时间线中展示超时原因。
4.3 错误也走 SSE 通道
问题:如果 Agent 不存在或请求体解析失败,传统做法是返回 HTTP 400/500 JSON 错误。但前端 @ag-ui/client 只消费 SSE 流,无法统一处理 HTTP 错误响应。
解决方案:所有错误都包装成 SSE 事件流返回:
private Mono<ServerResponse> createErrorResponse(
String threadId, String runId, String errorMessage, String kind) {
return ServerResponse.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.body(Flux.just(
new AguiEvent.Raw(threadId, runId,
Map.of("kind", kind, "error", errorMessage)),
new AguiEvent.RunFinished(threadId, runId)
).map(...), ServerSentEvent.class);
}
这样前端的 subscriber 可以统一处理所有场景,无需区分 SSE 流和 HTTP 错误。
4.4 中断信号的双路径传播
问题:用户点击"停止"按钮时,需要同时中断前端请求和后端 Agent 执行。
解决方案:利用 SSE 基于 HTTP 的特性——前端断开连接后,后端自动感知:
前端 abortRun()
└─ AbortController.abort()
└─ 浏览器关闭 TCP 连接
└─ 后端 WebFlux 检测到连接断开
└─ Reactor doOnCancel 回调
└─ agent.interrupt()
└─ interrupted.set(true)
└─ takeUntil() 截断事件流
前端通过 catch (AbortError) 感知中断,后端通过 doOnCancel 回调中断 Agent。两条路径同时发生,无需额外的"中断协议"。
用户中断 vs 超时中断的区别:
| 维度 | 用户点击"停止" | 超时中断 |
|---|---|---|
| 触发方 | 前端 abortRun() |
后端 .timeout() 操作符 |
| 前端感知方式 | catch (AbortError) |
收到 SSE 事件 RAW(kind=timeout) + RUN_FINISHED |
| SSE 连接 | 立即断开 | 保持连接,正常发送完事件后关闭 |
| 后端 Agent 中断 | doOnCancel → agent.interrupt() |
Flux.defer 内 → agent.interrupt() |
| 前端能否收到 RUN_FINISHED | 不能(连接已断) | 能(连接还在) |
五、总结与展望
核心经验
-
SSE 是 Agent 前端通信的最佳选择:Agent 运行是"一次请求,流式响应"模式,SSE 的单向推送完美匹配,且天然兼容 HTTP 基础设施
-
事件类型体系是协议的灵魂:AG-UI 把 Agent 运行拆解为生命周期、文本、工具、状态、推理五类事件,前端可以精确渲染每一步,而不是只看到一个"正在生成"的光标
-
两层记忆需要协调:前端记忆负责 UI 展示,后端记忆负责模型上下文,两者通过 threadId 关联,通过消息裁剪避免上下文重复
-
错误也是事件:将错误包装成 SSE 事件而非 HTTP 错误响应,让前端可以统一处理所有场景
-
中断就是断开连接:不需要额外的中断协议,前端断开 TCP 连接,后端通过 Reactor 的
doOnCancel感知并中断 Agent
项目成果
本项目作为 AG-UI 协议的教学演示,实现了 6 个递进示例,覆盖了协议的核心能力:
| 示例 | 核心事件类型 | 学习目标 |
|---|---|---|
| 01 curl 最小闭环 | RUN_STARTED / TEXT_MESSAGE_* / RUN_FINISHED | 理解 SSE 生命周期 |
| 02 HTML + @ag-ui/client | TEXT_MESSAGE_CONTENT(流式增量) | 浏览器消费流式文本 |
| 03 工具调用可视化 | TOOL_CALL_START / ARGS / END / RESULT | 工具参数流与结果展示 |
| 04 会话与状态同步 | STATE_SNAPSHOT / STATE_DELTA | threadId 复用与状态增量 |
| 05 多 Agent 路由 | RAW(routing 元信息) | 四种路由方式与优先级 |
| 06 推理/停止/超时 | REASONING_MESSAGE_* / RAW(timeout) | 中断与异常处理 |
未来优化方向
- 持久化存储:当前状态和会话都存储在内存中,生产环境应替换为 Redis 或数据库
- 权限控制:当前 CORS 允许所有来源,生产环境应限制为可信前端域名
- WebSocket 支持:对于需要双向通信的场景(如 Agent 主动推送通知),可以扩展 WebSocket 传输层
- 前端框架集成:当前使用原生 JS,可以封装 React/Vue 组件库降低集成成本
- 多模态事件:扩展 IMAGE_MESSAGE、AUDIO_MESSAGE 等事件类型,支持多模态 Agent
相关资源
- 项目地址:https://github.com/redants-101/agui-standalone-demo
- AgentScope Java 官方文档:https://java.agentscope.io/zh/intro.html
- AgentScope Java GitHub:https://github.com/agentscope-ai/agentscope-java
- @ag-ui/client NPM 包:https://www.npmjs.com/package/@ag-ui/client
- SSE 规范 (W3C):https://html.spec.whatwg.org/multipage/server-sent-events.html
- JSON Patch (RFC 6902):https://tools.ietf.org/html/rfc6902
- JSON Pointer (RFC 6901):https://tools.ietf.org/html/rfc6901
- Spring WebFlux 文档:https://docs.spring.io/spring-framework/reference/web/webflux.html
- Reactor 核心文档:https://projectreactor.io/docs/core/release/reference/
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)