基于 AgentScope Java SDK 的 AG-UI 独立示例项目,深度拆解 AI Agent 前端通信的协议设计、状态同步与中断机制。

一、引言

当我们在浏览器中与 ChatGPT、通义千问等 AI 产品对话时,最直观的体验是:回答不是一次性出现的,而是逐字逐句"流"出来的。这种流式体验背后,是 SSE(Server-Sent Events)技术在支撑。

但对于更复杂的 AI Agent 场景——Agent 不仅回复文本,还会调用工具、展示推理过程、维护会话状态——单纯的"文本流"就不够了。我们需要一种更结构化的协议,把 Agent 运行的全过程拆成不同类型的事件,让前端能够精确地渲染每一步。

AG-UI 协议正是为此而生。它定义了 Agent 与前端之间的标准事件模型,将一次 Agent 运行拆解为:生命周期事件、文本消息事件、工具调用事件、状态同步事件、推理事件等,通过 SSE 推送给前端。

本文基于 AgentScope Java SDK 的 AG-UI 独立示例项目,从一行前端代码出发,逐层追踪到后端 Agent 执行,完整拆解 AG-UI 协议的核心机制。

目标读者:有 Web 开发基础、对 AI Agent 前端集成感兴趣的开发者。

 项目地址https://github.com/redants-101/agui-standalone-demo

二、技术原理:AG-UI 协议的核心设计

2.1 为什么是 SSE 而不是 WebSocket?

在 Agent 场景中,前后端通信有一个鲜明特征:前端发起一次请求,后端持续推送事件直到运行结束。这是典型的"一次请求,流式响应"模式。

维度 SSE WebSocket
通信方向 单向(服务端 → 客户端) 双向
协议基础 基于 HTTP/1.1,无需升级 需要协议升级握手
断线重连 浏览器原生自动重连 需要自己实现
基础设施兼容 天然兼容代理、CDN、CORS 部分代理不支持
中断方式 直接断开 HTTP 连接 需要发送关闭帧

AG-UI 选择 SSE 的核心理由:Agent 运行过程中,前端不需要向服务端发送任何数据。用户消息在请求体中一次性提交,运行过程中前端只是被动接收事件。这完美匹配 SSE 的单向推送模型。

2.2 AG-UI 事件类型体系

AG-UI 协议将一次 Agent 运行拆解为以下事件类型:

RUN_STARTED                    ← 运行开始
│
├── TEXT_MESSAGE_START         ← 文本消息开始
│   ├── TEXT_MESSAGE_CONTENT   ← 文本增量(逐 token)
│   └── TEXT_MESSAGE_END       ← 文本消息结束
│
├── TOOL_CALL_START            ← 工具调用开始
│   ├── TOOL_CALL_ARGS         ← 工具参数流式到达
│   ├── TOOL_CALL_END          ← 工具调用结束
│   └── TOOL_CALL_RESULT       ← 工具执行结果
│
├── STATE_SNAPSHOT             ← 运行前完整状态快照
├── STATE_DELTA                ← 运行后状态增量(JSON Patch)
│
├── REASONING_MESSAGE_START    ← 推理过程开始
│   ├── REASONING_MESSAGE_CONTENT ← 推理增量
│   └── REASONING_MESSAGE_END  ← 推理过程结束
│
└── RUN_FINISHED               ← 运行结束

每个事件都携带 threadId 和 runId,前端可以精确地将事件归属到某次运行和某个会话。

2.3 SSE 数据格式

后端推送给前端的原始数据遵循 W3C SSE 规范:

data: {"type":"RUN_STARTED","threadId":"thread-xxx","runId":"run-xxx"}

data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"msg-xxx","delta":"AG-UI"}

data: {"type":"RUN_FINISHED","threadId":"thread-xxx","runId":"run-xxx"}

每条事件格式为 data: <JSON>\n\n,两个换行符表示事件结束。

三、实践过程:从前端一行代码到后端完整链路

3.1 项目架构概览

项目采用 Spring Boot WebFlux(响应式) 后端 + 原生 HTML/JS + @ag-ui/client 前端,前后端同源部署在 8090 端口。

agui-standalone-demo/
├── src/main/java/com/example/agui/
│   ├── AguiStandaloneApplication.java   # 启动入口
│   ├── config/
│   │   ├── AgentConfiguration.java      # Agent 注册(3 个 Agent + 工厂模式)
│   │   ├── DashScopeProperties.java     # 模型 API Key 配置
│   │   └── WebFluxConfiguration.java    # 路由 + CORS + 会话管理
│   ├── web/
│   │   └── AguiStandaloneHandler.java   # 核心Handler:路由→Agent→SSE
│   ├── agent/
│   │   └── LocalDemoAgent.java          # 本地模拟Agent(无Key时兜底)
│   ├── state/
│   │   └── AguiStateStore.java          # 服务端状态仓库
│   └── tool/
│       └── DemoTools.java               # 示例工具集
└── src/main/resources/static/
    ├── index.html                        # 页面结构
    ├── app.js                           # 核心交互逻辑
    └── style.css                        # 样式

3.2 前端:一行代码发起 Run

前端通过 @ag-ui/client 的 HttpAgent 发起请求:

// 创建 Agent 客户端
const agent = new HttpAgent({
    url: "/agui/run",
    threadId: "thread-xxx",
    initialMessages: [],
    initialState: {}
});

// 订阅事件
agent.subscribe({
    onTextMessageContentEvent: ({ event }) => {
        // 文本增量到达,更新 UI
    },
    onToolCallStartEvent: ({ event }) => {
        // 工具调用开始
    }
});

// 发起运行
await agent.runAgent({
    runId: "run-xxx",
    forwardedProps: { agentId: "calculator" }
});

runAgent() 内部做了四件事:

  1. 将 messagesstateforwardedProps 组装成 RunAgentInput JSON
  2. 发起 fetch(POST, url, { body: RunAgentInput }) 请求
  3. 解析 SSE 响应流,按事件类型触发 subscriber 回调
  4. Promise 在事件流结束(RUN_FINISHED)或出错时 resolve/reject

3.3 后端:六阶段请求处理管道

后端的核心处理方法 processInput() 串联了六个阶段:

private Mono<ServerResponse> processInput(
        RunAgentInput input, ServerRequest request, String pathAgentId) {

    // 第一阶段:路由决策
    // 优先级:路径参数 > Header > forwardedProps > 默认值 > fallback
    RoutingDecision routingDecision = resolveRouting(input, request, pathAgentId);

    // 第二阶段:判断服务端记忆 + 解析 Agent 实例
    boolean hadMemoryBeforeRun = properties.isServerSideMemory()
            && (agentResolver.hasMemory(threadId)
                || threadSessionManager.getSession(threadId).isPresent());
    Agent agent = agentResolver.resolveAgent(routingDecision.agentId(), threadId);

    // 第三阶段:消息裁剪(有记忆时去重)
    RunAgentInput effectiveInput = hadMemoryBeforeRun
            ? extractLatestUserMessage(input) : input;

    // 第四阶段:打开状态会话 + 创建事件收集器
    AguiStateStore.StateSession stateSession = stateStore.openRun(...);
    AguiStateStore.TraceCollector collector = stateStore.newCollector(input);

    // 第五阶段:构建 SSE 事件流(Reactor 响应式管道)
    Flux<ServerSentEvent<String>> events = new AguiAgentAdapter(agent, adapterConfig)
            .run(effectiveInput)                              // 5.1 Agent 运行
            .timeout(properties.getRunTimeout(), Flux.defer(  // 5.2 超时兜底
                    () -> { agent.interrupt(); ... }))
            .concatMap(event ->                               // 5.3 插入状态事件
                    decorateEventFlow(event, stateSession, collector))
            .map(event -> ServerSentEvent.<String>builder()   // 5.4 编码为 SSE
                    .data(encoder.encodeToJson(event).trim()).build())
            .doOnCancel(() -> agent.interrupt());             // 5.5 客户端取消

    // 第六阶段:返回 SSE 响应
    return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(events, ServerSentEvent.class);
}

关键设计Flux<ServerSentEvent> 是惰性的——方法返回时 Agent 还未真正运行,只有当 Spring WebFlux 开始订阅这个 Flux 时,事件流才会被驱动。

3.4 多 Agent 路由:四种方式与优先级

项目注册了三个 Agent(defaultchatcalculator),支持四种路由方式:

private RoutingDecision resolveRouting(
        RunAgentInput input, ServerRequest request, String pathAgentId) {
    // 优先级 1:路径参数 → POST /agui/run/calculator
    if (pathAgentId != null && !pathAgentId.isBlank()) {
        return new RoutingDecision(pathAgentId, "path");
    }
    // 优先级 2:Header → X-Agent-Id: chat
    String headerAgentId = request.headers().firstHeader(properties.getAgentIdHeader());
    if (headerAgentId != null && !headerAgentId.isBlank()) {
        return new RoutingDecision(headerAgentId, "header");
    }
    // 优先级 3:forwardedProps → { "agentId": "calculator" }
    Object forwardedAgentId = input.getForwardedProp("agentId");
    if (forwardedAgentId != null && !String.valueOf(forwardedAgentId).isBlank()) {
        return new RoutingDecision(String.valueOf(forwardedAgentId), "forwardedProps");
    }
    // 优先级 4:默认配置 → default-agent-id: default
    // 优先级 5:兜底 → "default"
    ...
}

前端根据用户选择构造不同的请求:

function buildRouteConfig() {
    const baseUrl = "/agui/run";
    if (dom.routeMode.value === "path") {
        return { url: `${baseUrl}/${effectiveAgentId()}`, headers: {} };
    }
    if (dom.routeMode.value === "header") {
        return { url: baseUrl, headers: { "X-Agent-Id": effectiveAgentId() } };
    }
    // forwardedProps 在 runAgent() 的参数中传递
    return { url: baseUrl, headers: {} };
}

3.5 状态同步:SNAPSHOT + DELTA 的双事件模型

AG-UI 的状态同步采用"快照 + 增量"双事件模型:

  • STATE_SNAPSHOT:Run 开始时推送完整状态,前端直接替换本地状态
  • STATE_DELTA:Run 结束时推送 JSON Patch(RFC 6902),前端增量更新

后端通过 AguiStateConverter 计算增量:

// 递归对比 before 和 after,生成 JSON Patch 操作
private List<JsonPatchOperation> computeDelta(
        Map<String, Object> before, Map<String, Object> after, String basePath) {
    for (String key : allKeys) {
        if (!before.containsKey(key)) {
            operations.add(JsonPatchOperation.add(path, afterValue));    // 新增字段
        } else if (!after.containsKey(key)) {
            operations.add(JsonPatchOperation.remove(path));             // 删除字段
        } else if (!Objects.equals(beforeValue, afterValue)) {
            if (beforeValue instanceof Map && afterValue instanceof Map) {
                operations.addAll(computeDelta(...));                    // 递归深入
            } else {
                operations.add(JsonPatchOperation.replace(path, afterValue)); // 替换值
            }
        }
    }
}

递归 diff 的意义:如果 metrics 下有 10 个字段只变了 1 个,只产生 1 个 replace 操作,而不是替换整个 metrics 对象。

事件装饰逻辑在 decorateEventFlow() 中:

private Flux<AguiEvent> decorateEventFlow(AguiEvent event, ...) {
    collector.accept(event);  // 旁路收集事件摘要

    if (event instanceof AguiEvent.RunStarted) {
        // RUN_STARTED 后立即推送状态快照
        output.add(event);
        output.add(stateStore.createSnapshot(stateSession));
        return Flux.fromIterable(output);
    }
    if (event instanceof AguiEvent.RunFinished) {
        // RUN_FINISHED 前推送状态增量
        AguiEvent.StateDelta delta = stateStore.completeRun(stateSession, collector);
        output.add(delta);
        output.add(event);
        return Flux.fromIterable(output);
    }
    return Flux.just(event);  // 其他事件原样通过
}

四、遇到的问题与解决方案

4.1 会话记忆与上下文重复

问题:前端 @ag-ui/client 会在每次 runAgent() 时发送完整的消息历史(包含之前的 user + assistant 消息),而服务端 Agent 的 InMemoryMemory 也保存了同样的历史。如果两者直接叠加,第一轮对话会在上下文中出现两次。

解决方案:后端在检测到已有服务端记忆时,执行消息裁剪——只提取最后一条 user message 提交给 Agent:

private RunAgentInput extractLatestUserMessage(RunAgentInput input) {
    AguiMessage lastUserMessage = null;
    for (int index = messages.size() - 1; index >= 0; index--) {
        if ("user".equalsIgnoreCase(messages.get(index).getRole())) {
            lastUserMessage = messages.get(index);
            break;
        }
    }
    return RunAgentInput.builder()
            .threadId(input.getThreadId())
            .runId(input.getRunId())
            .messages(List.of(lastUserMessage))  // 只提交最后一条
            .build();
}

这样 Agent 看到的上下文 = InMemoryMemory 中的历史 + 本轮新消息,不会重复。

4.2 超时不是静默断开

问题:如果 Agent 运行超时,直接断开 SSE 连接会导致前端无法区分"正常结束"和"超时异常"。

解决方案:使用 Reactor 的 timeout() 操作符,超时时补发 RAW(timeout) + RUN_FINISHED 事件,而不是让连接静默断开:

.timeout(
    properties.getRunTimeout(),
    Flux.defer(() -> {
        agent.interrupt();
        return Flux.just(
            new AguiEvent.Raw(threadId, runId,
                Map.of("kind", "timeout", "error", "运行超时")),
            new AguiEvent.RunFinished(threadId, runId)
        );
    })
)

前端 subscriber 收到 RAW(kind=timeout) 后可以在事件时间线中展示超时原因。

4.3 错误也走 SSE 通道

问题:如果 Agent 不存在或请求体解析失败,传统做法是返回 HTTP 400/500 JSON 错误。但前端 @ag-ui/client 只消费 SSE 流,无法统一处理 HTTP 错误响应。

解决方案:所有错误都包装成 SSE 事件流返回:

private Mono<ServerResponse> createErrorResponse(
        String threadId, String runId, String errorMessage, String kind) {
    return ServerResponse.ok()
            .contentType(MediaType.TEXT_EVENT_STREAM)
            .body(Flux.just(
                new AguiEvent.Raw(threadId, runId,
                    Map.of("kind", kind, "error", errorMessage)),
                new AguiEvent.RunFinished(threadId, runId)
            ).map(...), ServerSentEvent.class);
}

这样前端的 subscriber 可以统一处理所有场景,无需区分 SSE 流和 HTTP 错误。

4.4 中断信号的双路径传播

问题:用户点击"停止"按钮时,需要同时中断前端请求和后端 Agent 执行。

解决方案:利用 SSE 基于 HTTP 的特性——前端断开连接后,后端自动感知:

前端 abortRun()
  └─ AbortController.abort()
       └─ 浏览器关闭 TCP 连接
            └─ 后端 WebFlux 检测到连接断开
                 └─ Reactor doOnCancel 回调
                      └─ agent.interrupt()
                           └─ interrupted.set(true)
                                └─ takeUntil() 截断事件流

前端通过 catch (AbortError) 感知中断,后端通过 doOnCancel 回调中断 Agent。两条路径同时发生,无需额外的"中断协议"。

用户中断 vs 超时中断的区别

维度 用户点击"停止" 超时中断
触发方 前端 abortRun() 后端 .timeout() 操作符
前端感知方式 catch (AbortError) 收到 SSE 事件 RAW(kind=timeout) + RUN_FINISHED
SSE 连接 立即断开 保持连接,正常发送完事件后关闭
后端 Agent 中断 doOnCancel → agent.interrupt() Flux.defer 内 → agent.interrupt()
前端能否收到 RUN_FINISHED 不能(连接已断) 能(连接还在)

五、总结与展望

核心经验

  1. SSE 是 Agent 前端通信的最佳选择:Agent 运行是"一次请求,流式响应"模式,SSE 的单向推送完美匹配,且天然兼容 HTTP 基础设施

  2. 事件类型体系是协议的灵魂:AG-UI 把 Agent 运行拆解为生命周期、文本、工具、状态、推理五类事件,前端可以精确渲染每一步,而不是只看到一个"正在生成"的光标

  3. 两层记忆需要协调:前端记忆负责 UI 展示,后端记忆负责模型上下文,两者通过 threadId 关联,通过消息裁剪避免上下文重复

  4. 错误也是事件:将错误包装成 SSE 事件而非 HTTP 错误响应,让前端可以统一处理所有场景

  5. 中断就是断开连接:不需要额外的中断协议,前端断开 TCP 连接,后端通过 Reactor 的 doOnCancel 感知并中断 Agent

项目成果

本项目作为 AG-UI 协议的教学演示,实现了 6 个递进示例,覆盖了协议的核心能力:

示例 核心事件类型 学习目标
01 curl 最小闭环 RUN_STARTED / TEXT_MESSAGE_* / RUN_FINISHED 理解 SSE 生命周期
02 HTML + @ag-ui/client TEXT_MESSAGE_CONTENT(流式增量) 浏览器消费流式文本
03 工具调用可视化 TOOL_CALL_START / ARGS / END / RESULT 工具参数流与结果展示
04 会话与状态同步 STATE_SNAPSHOT / STATE_DELTA threadId 复用与状态增量
05 多 Agent 路由 RAW(routing 元信息) 四种路由方式与优先级
06 推理/停止/超时 REASONING_MESSAGE_* / RAW(timeout) 中断与异常处理

未来优化方向

  • 持久化存储:当前状态和会话都存储在内存中,生产环境应替换为 Redis 或数据库
  • 权限控制:当前 CORS 允许所有来源,生产环境应限制为可信前端域名
  • WebSocket 支持:对于需要双向通信的场景(如 Agent 主动推送通知),可以扩展 WebSocket 传输层
  • 前端框架集成:当前使用原生 JS,可以封装 React/Vue 组件库降低集成成本
  • 多模态事件:扩展 IMAGE_MESSAGE、AUDIO_MESSAGE 等事件类型,支持多模态 Agent

相关资源

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐