从项目中学习 NDJSON 流式协议

本文基于 X-Chat 项目中的 AI 流式回复链路整理。项目由 Vue3 前端、Spring Boot 主后端、FastAPI AI 服务组成。本文重点讲清楚一个问题:Python AI 服务如何一边生成答案,一边把内容传给 Java 后端,再实时显示到前端页面上?


1. 先看项目背景

X-Chat 是一个即时通讯项目,同时接入了一个独立的 AI 服务。它的整体架构大致是:

Vue3 前端
  -> Spring Boot 主后端
    -> 用户鉴权 / 会话管理 / 消息落库 / WebSocket 推送
    -> HTTP 调用 FastAPI AI 服务 /chat/stream
      -> 意图判断
      -> RAG 检索 / Agent 调用
      -> NDJSON 流式返回

也就是说,前端不会直接访问 Python AI 服务。前端只和 Java 后端通信;Java 后端再根据业务权限、会话、群聊等信息去调用 Python。

这样设计有两个好处:

  1. 业务权限更安全:群成员校验、聊天会话校验都在 Java 后端完成,避免前端绕过 Java 直接访问 AI 服务。
  2. AI 服务更独立:Python 只专心处理 RAG、Agent、模型调用和流式输出,Java 继续负责主业务。

在这个项目里,AI 流式回复的核心协议就是:NDJSON


2. NDJSON 是什么?

NDJSON 全称是 Newline Delimited JSON,可以翻译成“换行分隔的 JSON”。

它的格式非常简单:

{"type":"delta","delta":"你好"}
{"type":"delta","delta":",我是"}
{"type":"delta","delta":" X-Chat AI"}
{"type":"done","content":"你好,我是 X-Chat AI"}

每一行都是一个完整的 JSON 对象,行与行之间用换行符 \n 分隔。

这和普通 JSON 最大的区别是:普通 JSON 通常要等整个数据结构完整之后才能解析,比如:

[
  {"type":"delta","delta":"你好"},
  {"type":"delta","delta":",我是"},
  {"type":"done","content":"你好,我是 X-Chat AI"}
]

这种数组形式必须等最后的 ] 出现,整体才是合法 JSON。对于 AI 回复来说,这就不够友好,因为模型可能要几秒钟才生成完整答案。

而 NDJSON 是“一行一个事件”。服务端每生成一点内容,就可以立刻发一行;客户端读到一行,就可以立刻处理一行。这样用户看到的就是类似 ChatGPT 的“打字机效果”。


3. 为什么这个项目适合用 NDJSON?

AI 回复天然是流式的。

用户问一句话,模型不是一次性吐出完整答案,而是不断生成 token 或文本片段。项目希望前端可以边生成边展示,而不是等模型完全答完才显示。

因此,Python AI 服务不能只返回一个普通 JSON:

{"content":"完整回答"}

因为这种方式必须等完整回答生成完。

更适合的方式是:

{"type":"delta","delta":"第一小段"}
{"type":"delta","delta":"第二小段"}
{"type":"delta","delta":"第三小段"}
{"type":"done","content":"完整回答"}

这正是 NDJSON 的优势:

  • 结构简单;
  • 每行独立可解析;
  • 适合 HTTP 长连接;
  • Java、Python、JavaScript 都很好处理;
  • 不需要前后端直接建立 AI 专用 WebSocket。

4. 项目里的完整调用链路

在 X-Chat 中,一次 AI 回复大致经历下面几步:

用户发送消息给 X-Chat AI
-> Vue 调用 Java 后端 /api/chat/sendMessage
-> Java 判断这是发给 AI 机器人的消息
-> 用户消息先落库并推送
-> 事务提交后,Java 异步调用 Python /chat/stream
-> Python 一行一行返回 NDJSON 事件
-> Java 用 OkHttp 按行读取
-> Java 把事件转换成 WebSocket 消息
-> Vue 根据 messageId 更新同一条 AI 消息

这里要注意一个关键点:

NDJSON 只存在于 Python 到 Java 这一段。Java 到前端不是 NDJSON,而是 WebSocket 事件。

也就是说:

Python FastAPI  --NDJSON over HTTP-->  Java Spring Boot  --WebSocket-->  Vue 前端

5. Python 端:如何输出 NDJSON?

对应文件:

X-RAG Agent/api.py

Python 的流式接口是:

@app.post("/chat/stream")
def chat_stream(request: ChatStreamRequest):
    return StreamingResponse(
        stream_xchat_answer(request),
        media_type="application/x-ndjson; charset=utf-8",
    )

这里有两个重点。

第一个重点是 StreamingResponse

普通接口一般是先算完结果,再一次性返回。但 StreamingResponse 可以接收一个生成器。生成器每 yield 一段内容,FastAPI 就可以把这一段内容写回 HTTP 响应里。

第二个重点是 media_type

media_type="application/x-ndjson; charset=utf-8"

这表示接口返回的是 NDJSON,并且使用 UTF-8 编码。

项目里还有一个非常关键的函数:

def encode_event(payload: dict) -> str:
    return json.dumps(payload, ensure_ascii=False) + "\n"

这个函数做了两件事:

  1. 把 Python 字典转成 JSON 字符串;
  2. 在末尾加上 \n

其中 \n 就是 NDJSON 的核心。没有这个换行,Java 后端就没法稳定地按行读取一个个事件。


6. Python 端:流式事件有哪些?

这个项目中常见的 NDJSON 事件有五类。

6.1 delta:增量文本

{"type":"delta","delta":"回答片段"}

delta 表示 AI 新生成的一小段文本。Java 收到后会立刻推给前端,前端把它追加到当前 AI 消息后面。

6.2 done:回答完成

{"type":"done","content":"完整回答"}

done 表示本次 AI 回复结束。它通常会带上完整内容,方便 Java 后端最终落库。

6.3 error:错误事件

{"type":"error","message":"错误原因"}

如果 Python 侧模型调用、知识库检索或其他逻辑出错,就可以通过 error 告诉 Java。Java 再把占位消息更新成用户可读的错误提示。

6.4 tool_call:工具开始执行

{
  "type": "tool_call",
  "tool_name": "rag_summarize",
  "display_name": "知识库检索",
  "status": "start",
  "message": "正在检索知识库"
}

这个事件表示 AI 准备调用某个工具,例如知识库检索、意图判断等。

6.5 tool_result:工具执行结果

{
  "type": "tool_result",
  "tool_name": "rag_summarize",
  "display_name": "知识库检索",
  "status": "success",
  "summary": "命中 3 条知识库片段"
}

这个事件表示工具执行完成,并返回一个摘要。前端可以把它展示成“执行轨迹”,让用户知道 AI 不是凭空回答,而是先进行了检索或判断。


7. Python 端:普通 Agent 回复是怎么流式输出的?

项目里的普通 Agent 流式输出逻辑可以简化成这样:

def stream_agent(prompt: str):
    full_text = []
    try:
        for delta in get_agent().execute_stream(prompt):
            if not delta:
                continue

            full_text.append(delta)
            yield encode_event({"type": "delta", "delta": delta})

        yield encode_event({"type": "done", "content": "".join(full_text)})
    except Exception as exc:
        yield encode_event({"type": "error", "message": str(exc)})

这段代码的思路非常清楚:

  1. 模型每生成一小段 delta
  2. Python 就马上 yield 一个 delta 事件;
  3. 同时用 full_text 累计完整答案;
  4. 生成结束后,再发一个 done 事件;
  5. 如果异常,则发 error 事件。

换成更直观的例子,假设模型生成了:

你好,我是 X-Chat AI。

Python 可能实际返回:

{"type":"delta","delta":"你好"}
{"type":"delta","delta":",我是"}
{"type":"delta","delta":" X-Chat AI。"}
{"type":"done","content":"你好,我是 X-Chat AI。"}

8. Python 端:RAG 问答为什么还有 tool_call 和 tool_result?

X-Chat 不只是普通 AI 聊天,它还支持知识库问答。比如用户问:

项目里的 Netty WebSocket 是怎么推送消息的?

这类问题可能需要先查知识库,再组织答案。项目中的 RAG 链路大致是:

意图判断
-> 知识库检索
-> 命中片段
-> 生成回答
-> 返回来源和完整答案

因此,Python 不只返回文本,还会返回工具事件。例如:

{"type":"tool_call","tool_name":"intent_router","display_name":"意图判断","status":"start","message":"正在判断应该使用哪类能力"}
{"type":"tool_result","tool_name":"intent_router","display_name":"意图判断","status":"success","summary":"识别为:知识库问答"}
{"type":"tool_call","tool_name":"rag_summarize","display_name":"知识库检索","status":"start","message":"正在检索知识库"}
{"type":"tool_result","tool_name":"rag_summarize","display_name":"知识库检索","status":"success","summary":"命中 3 条知识库片段"}
{"type":"delta","delta":"根据知识库资料,Netty WebSocket 在项目中负责..."}
{"type":"done","content":"根据知识库资料,Netty WebSocket 在项目中负责..."}

这样前端就可以展示类似:

正在判断意图...
识别为:知识库问答
正在检索知识库...
命中 3 条知识库片段
AI 正在回答...

对于用户来说,这种过程展示会更可信,也更容易理解 AI 为什么这么回答。


9. Java 端:如何按行读取 NDJSON?

对应文件:

src/main/java/com/xchat/backend/ai/AiFastApiClient.java

Java 后端通过 OkHttp 调用 Python:

Request request = new Request.Builder()
        .url(buildUrl("/chat/stream"))
        .post(RequestBody.create(json, JSON))
        .build();

try (Response response = client.newCall(request).execute()) {
    ResponseBody body = response.body();
    readNdjson(body.source(), listener);
}

真正处理 NDJSON 的逻辑可以简化成这样:

private void readNdjson(BufferedSource source, StreamListener listener) throws IOException {
    String line;
    while ((line = source.readUtf8Line()) != null) {
        if (line.isBlank()) {
            continue;
        }

        JsonNode node = objectMapper.readTree(line);
        String type = node.path("type").asText();

        if ("delta".equals(type)) {
            listener.onDelta(node.path("delta").asText(""));
        } else if ("tool_call".equals(type)) {
            listener.onToolCall(node);
        } else if ("tool_result".equals(type)) {
            listener.onToolResult(node);
        } else if ("done".equals(type)) {
            listener.onDone(node.path("content").asText(""));
        } else if ("error".equals(type)) {
            throw new IOException(node.path("message").asText("AI service error"));
        }
    }
}

这段代码就是 NDJSON 协议在 Java 端的核心。

它的处理步骤是:

  1. readUtf8Line() 读取一行;
  2. 空行跳过;
  3. ObjectMapper 把这一行解析成 JSON;
  4. 根据 type 分发给不同回调;
  5. 每读到一个 delta 就立即通知上层。

这里有一个容易忽略的点:

Java 不是等 Python 整个响应结束后才处理,而是 Python 发一行,Java 就可以读一行。

这正是流式效果成立的关键。


10. Java 端:为什么还要把 NDJSON 转成 WebSocket?

因为前端没有直接连接 Python 服务。前端连接的是 Java 后端的 WebSocket。

所以 Java 收到 Python 的 NDJSON 事件后,需要转换成前端能识别的 WebSocket 事件。

对应文件:

src/main/java/com/xchat/backend/ai/AiChatService.java

核心逻辑可以简化成:

aiFastApiClient.streamChat(request, new AiFastApiClient.StreamListener() {
    @Override
    public void onDelta(String delta) {
        contentBuilder.append(delta);
        pushAiDelta(userId, replyMessage.getMessageId(), delta);
    }

    @Override
    public void onDone(String content) {
        doneContent[0] = content;
    }

    @Override
    public void onToolCall(JsonNode event) {
        pushAiToolEvent(userId, replyMessage.getMessageId(), "ai_tool_call", event);
    }

    @Override
    public void onToolResult(JsonNode event) {
        pushAiToolEvent(userId, replyMessage.getMessageId(), "ai_tool_result", event);
    }
});

这里可以看到映射关系:

Python NDJSON 事件 Java 回调 前端 WebSocket 事件
delta onDelta ai_stream_delta
done onDone ai_stream_done
error 抛异常后处理 ai_stream_error
tool_call onToolCall ai_tool_call
tool_result onToolResult ai_tool_result

也就是说,Java 后端在这里扮演了一个“协议转换器”的角色:

NDJSON 事件 -> Java 回调 -> WebSocket 事件

11. Java 端:为什么要先创建一条 AI 占位消息?

在真正开始读取 Python 流之前,Java 会先创建一条 AI 回复消息。此时这条消息可能还是空内容,它的作用是给前端一个稳定的 messageId

然后 Java 先推送:

{"type":"ai_stream_start","data":{"messageId":123,"messageContent":""}}

前端收到后,先在聊天窗口里显示一个空的 AI 气泡。

后面每次收到 delta,Java 都带着同一个 messageId 推给前端:

{"type":"ai_stream_delta","data":{"messageId":123,"delta":"你好"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":",我是"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":" X-Chat AI"}}

前端根据 messageId=123 找到同一条消息,并把内容不断追加上去。

这样做有一个非常重要的好处:

多个 delta 不会变成多条聊天气泡,而是合并成同一条 AI 消息。

如果没有占位消息和 messageId,前端可能会显示成这样:

AI:你好
AI:,我是
AI: X-Chat AI

这是错误体验。

正确体验应该是:

AI:你好,我是 X-Chat AI

12. 前端:如何把 delta 追加到同一条消息?

对应文件:

frontend/src/api/websocket.js
frontend/src/stores/chatStore.js

前端收到 ai_stream_start 时,把空消息加入消息列表:

if (data.type === 'ai_stream_start') {
  chatStore.addMessage(data.data)
}

收到 ai_stream_delta 时,不新增消息,而是追加内容:

if (data.type === 'ai_stream_delta') {
  chatStore.appendAiDelta(data.data.messageId, data.data.delta)
}

appendAiDelta 的核心逻辑是:

appendAiDelta(messageId, delta) {
  if (!delta) return

  const target = this.findMessageById(messageId)
  if (!target) return

  target.messageContent = (target.messageContent || '') + delta
  target.status = 0
}

通过这段代码理解流式 UI:

  1. 根据 messageId 找到那条 AI 占位消息;
  2. 把新来的 delta 拼到 messageContent 后面;
  3. Vue 响应式数据变化,页面自动刷新;
  4. 用户就看到了“文字逐渐出现”的效果。

13. 一次完整示例

假设用户问:

知识库中 Netty 是什么?

13.1 Vue 调用 Java

前端先调用普通发送消息接口:

POST /api/chat/sendMessage

此时前端并不直接请求 Python。

13.2 Java 识别这是 AI 消息

Java 发现接收人是固定 AI 联系人,于是:

  1. 先保存用户消息;
  2. 推送用户消息;
  3. 等事务提交后,异步启动 AI 回复。

这样即使 AI 服务慢,也不会阻塞用户发送消息。

13.3 Java 请求 Python

Java 向 Python /chat/stream 发送请求体,大致如下:

{
  "mode": "chat",
  "user_id": "10001",
  "message": "知识库中 Netty 是什么?",
  "context_messages": [],
  "scope_type": "global",
  "scope_id": ""
}

这些字段的含义是:

字段 含义
mode AI 工作模式,例如普通聊天、上下文问答、聊天总结
user_id 当前用户 ID
message 用户问题
context_messages 最近聊天上下文
scope_type 知识库范围,例如全局或群聊
scope_id 具体群 ID,或全局为空

13.4 Python 返回 NDJSON

Python 可能返回:

{"type":"tool_call","tool_name":"intent_router","display_name":"意图判断","status":"start","message":"正在判断应该使用哪类能力"}
{"type":"tool_result","tool_name":"intent_router","display_name":"意图判断","status":"success","summary":"识别为:知识库问答"}
{"type":"tool_call","tool_name":"rag_summarize","display_name":"知识库检索","status":"start","message":"正在检索知识库"}
{"type":"tool_result","tool_name":"rag_summarize","display_name":"知识库检索","status":"success","summary":"命中 3 条知识库片段"}
{"type":"delta","delta":"根据知识库资料,Netty 是一个高性能网络通信框架..."}
{"type":"done","content":"根据知识库资料,Netty 是一个高性能网络通信框架..."}

13.5 Java 转成 WebSocket

Java 再推给前端:

{"type":"ai_stream_start","data":{"messageId":123,"messageContent":""}}
{"type":"ai_tool_call","data":{"messageId":123,"displayName":"意图判断","status":"start"}}
{"type":"ai_tool_result","data":{"messageId":123,"displayName":"意图判断","summary":"识别为:知识库问答"}}
{"type":"ai_tool_call","data":{"messageId":123,"displayName":"知识库检索","status":"start"}}
{"type":"ai_tool_result","data":{"messageId":123,"displayName":"知识库检索","summary":"命中 3 条知识库片段"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":"根据知识库资料,Netty 是一个高性能网络通信框架..."}}
{"type":"ai_stream_done","data":{"messageId":123,"messageContent":"根据知识库资料,Netty 是一个高性能网络通信框架..."}}

13.6 Vue 更新页面

前端最终做三件事:

  1. ai_stream_start:创建空 AI 气泡;
  2. ai_stream_delta:不断追加文本;
  3. ai_stream_done:用后端最终保存的消息覆盖本地临时消息。

最终用户看到的就是一条逐步生成的 AI 回复。


14. 这里为什么不用普通 HTTP JSON?

普通 HTTP JSON 当然也能实现 AI 回复,但用户体验会差一些。

普通 JSON 的流程是:

用户提问
-> 后端等待 AI 生成完整答案
-> 一次性返回完整 JSON
-> 前端显示答案

这意味着用户在等待过程中什么也看不到。

NDJSON 的流程是:

用户提问
-> 后端收到第一段内容
-> 前端显示第一段
-> 后端收到第二段内容
-> 前端追加第二段
-> ...
-> 最终完成

这会让用户感觉系统更快、更流畅。


15. 那为什么不用 SSE?

SSE,也就是 Server-Sent Events,也常用于服务端向客户端推送流式数据。它的格式通常是:

data: {"delta":"你好"}

data: {"delta":",我是"}

SSE 很适合浏览器直接接收服务端流式事件。

但在这个项目里,前端并不直接请求 Python,而是 Java 后端作为中间层:

Python -> Java -> Vue

Java 只需要简单地按行读取 Python 的事件,然后再转 WebSocket。NDJSON 在这种“服务端到服务端”的场景里非常轻量,处理起来也很直接。

所以这个项目的选择是合理的:

  • Python 到 Java:用 NDJSON,简单、轻量、易解析;
  • Java 到 Vue:用已有 Netty WebSocket,和聊天系统实时推送能力复用。

16. 那为什么不让 Python 直接 WebSocket 推给前端?

表面上看,Python 直接 WebSocket 给前端也可以实现流式回复。但这样会带来几个问题:

  1. 前端要同时连接 Java 和 Python 两套服务;
  2. Python 需要理解用户身份、群权限、会话权限;
  3. 消息落库和最终状态同步会变复杂;
  4. AI 服务会和业务系统强耦合。

X-Chat 的设计是让 Java 作为统一入口:

前端只信任 Java
Java 再调用 Python
Python 只负责 AI 能力

这种方式更适合业务型项目。


17. 项目里的一个细节:事务提交后再调用 AI

Java 后端并不是一收到用户消息就立刻调用 AI,而是在用户消息落库并且事务提交后才启动 AI 回复。

这个设计很重要。

如果事务还没提交,AI 就开始回复,可能出现一种尴尬情况:

用户消息写库失败了
但 AI 回复已经生成并推给前端了

这样聊天记录就会出现不一致。

所以项目中使用了类似这样的逻辑:

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
    @Override
    public void afterCommit() {
        aiChatService.scheduleChatReply(userId, messageContent);
    }
});

意思是:只有数据库事务真正提交成功后,才启动 AI 回复。


18. 项目里的另一个细节:异常也要更新占位消息

流式系统里很容易出现一种问题:前端已经创建了 AI 占位气泡,但后端调用 AI 失败了。如果不处理异常,用户就会看到一条永远空白或一直加载中的消息。

项目里的做法是:

  1. 先创建 AI 占位消息;
  2. 如果 Python 正常返回,就不断追加 delta;
  3. 如果 Python 抛错,就把占位消息更新成错误提示;
  4. 再通过 ai_stream_error 推给前端。

这说明流式系统不能只考虑“成功路径”,还必须考虑:

  • Python 服务未启动;
  • 模型接口失败;
  • RAG 检索失败;
  • HTTP 流中断;
  • 前端没有找到对应 messageId。

19. 一个可以继续优化的点:sources 事件

Python 的 RAG 链路中会返回 sources 事件,用于携带知识库来源信息,例如文档名、片段标题等。

不过 Java 当前主要处理:

delta / done / error / tool_call / tool_result

对于未知事件会忽略。因此如果想进一步优化前端展示,可以扩展一条链路:

Python sources
-> Java onSources
-> WebSocket ai_sources
-> 前端挂到同一条 message 的 sources 字段
-> 页面展示引用来源

这样用户不仅能看到“命中 3 条知识库片段”,还能看到具体引用来自哪些文档。

这是一个很适合作为后续功能增强的小任务。


20. 项目重点学到了什么?

通过这个项目,可以把 NDJSON 流式协议理解成一句话:

服务端把多个 JSON 事件按行输出,客户端按行读取并立即处理,从而实现流式效果。

在 X-Chat 中,它的落地方式是:

Python:StreamingResponse + yield + 每行 JSON + \n
Java:OkHttp + readUtf8Line + ObjectMapper + listener 回调

前端:WebSocket + messageId + 追加 delta 到同一条消息

整个链路最核心的不是某一个框架,而是这几个设计思想:

  1. 一行一个事件:每条 JSON 都能独立解析;
  2. 事件类型分发:通过 type 区分 delta、done、error、tool_call、tool_result;
  3. 后端协议转换:Java 把 Python 的 NDJSON 转成前端 WebSocket;
  4. 同一消息聚合:前端通过 messageId 把多个增量合成一条 AI 回复;
  5. 最终状态落库:done 后使用完整内容保存,保证刷新后聊天记录一致;
  6. 异常兜底:流式过程中失败,也要更新前端状态。

21. 总结

NDJSON 并不复杂,它甚至可以说是最简单的流式协议之一:

JSON + 换行

但放到真实项目里,它能解决一个很实际的问题:AI 回复生成慢,用户不想等到最后才看到结果。

X-Chat 项目中,Python FastAPI 使用 StreamingResponse 一行一行返回 NDJSON;Java 后端使用 OkHttp 按行读取,并把事件转换成 WebSocket;Vue 前端根据 messageId 把增量文本追加到同一条消息里。

这套设计既保持了 AI 服务和业务系统的解耦,又实现了自然的流式聊天体验。

最后用一张简化图收尾:

用户提问
  ↓
Vue 发送消息
  ↓
Spring Boot 落库 + 异步调用 AI
  ↓
FastAPI 返回 NDJSON:
  {"type":"delta","delta":"..."}\n
  {"type":"done","content":"..."}\n
  ↓
Spring Boot 按行读取并转 WebSocket
  ↓
Vue 按 messageId 追加文本
  ↓
用户看到流式 AI 回复

核心模型: 生成一点,发送一行;读取一行,更新一次。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐