AI 的“打字机效果”到底怎么实现?从我的聊天项目说起
从项目中学习 NDJSON 流式协议
本文基于 X-Chat 项目中的 AI 流式回复链路整理。项目由 Vue3 前端、Spring Boot 主后端、FastAPI AI 服务组成。本文重点讲清楚一个问题:Python AI 服务如何一边生成答案,一边把内容传给 Java 后端,再实时显示到前端页面上?
1. 先看项目背景
X-Chat 是一个即时通讯项目,同时接入了一个独立的 AI 服务。它的整体架构大致是:
Vue3 前端
-> Spring Boot 主后端
-> 用户鉴权 / 会话管理 / 消息落库 / WebSocket 推送
-> HTTP 调用 FastAPI AI 服务 /chat/stream
-> 意图判断
-> RAG 检索 / Agent 调用
-> NDJSON 流式返回
也就是说,前端不会直接访问 Python AI 服务。前端只和 Java 后端通信;Java 后端再根据业务权限、会话、群聊等信息去调用 Python。
这样设计有两个好处:
- 业务权限更安全:群成员校验、聊天会话校验都在 Java 后端完成,避免前端绕过 Java 直接访问 AI 服务。
- AI 服务更独立:Python 只专心处理 RAG、Agent、模型调用和流式输出,Java 继续负责主业务。
在这个项目里,AI 流式回复的核心协议就是:NDJSON。
2. NDJSON 是什么?
NDJSON 全称是 Newline Delimited JSON,可以翻译成“换行分隔的 JSON”。
它的格式非常简单:
{"type":"delta","delta":"你好"}
{"type":"delta","delta":",我是"}
{"type":"delta","delta":" X-Chat AI"}
{"type":"done","content":"你好,我是 X-Chat AI"}
每一行都是一个完整的 JSON 对象,行与行之间用换行符 \n 分隔。
这和普通 JSON 最大的区别是:普通 JSON 通常要等整个数据结构完整之后才能解析,比如:
[
{"type":"delta","delta":"你好"},
{"type":"delta","delta":",我是"},
{"type":"done","content":"你好,我是 X-Chat AI"}
]
这种数组形式必须等最后的 ] 出现,整体才是合法 JSON。对于 AI 回复来说,这就不够友好,因为模型可能要几秒钟才生成完整答案。
而 NDJSON 是“一行一个事件”。服务端每生成一点内容,就可以立刻发一行;客户端读到一行,就可以立刻处理一行。这样用户看到的就是类似 ChatGPT 的“打字机效果”。
3. 为什么这个项目适合用 NDJSON?
AI 回复天然是流式的。
用户问一句话,模型不是一次性吐出完整答案,而是不断生成 token 或文本片段。项目希望前端可以边生成边展示,而不是等模型完全答完才显示。
因此,Python AI 服务不能只返回一个普通 JSON:
{"content":"完整回答"}
因为这种方式必须等完整回答生成完。
更适合的方式是:
{"type":"delta","delta":"第一小段"}
{"type":"delta","delta":"第二小段"}
{"type":"delta","delta":"第三小段"}
{"type":"done","content":"完整回答"}
这正是 NDJSON 的优势:
- 结构简单;
- 每行独立可解析;
- 适合 HTTP 长连接;
- Java、Python、JavaScript 都很好处理;
- 不需要前后端直接建立 AI 专用 WebSocket。
4. 项目里的完整调用链路
在 X-Chat 中,一次 AI 回复大致经历下面几步:
用户发送消息给 X-Chat AI
-> Vue 调用 Java 后端 /api/chat/sendMessage
-> Java 判断这是发给 AI 机器人的消息
-> 用户消息先落库并推送
-> 事务提交后,Java 异步调用 Python /chat/stream
-> Python 一行一行返回 NDJSON 事件
-> Java 用 OkHttp 按行读取
-> Java 把事件转换成 WebSocket 消息
-> Vue 根据 messageId 更新同一条 AI 消息
这里要注意一个关键点:
NDJSON 只存在于 Python 到 Java 这一段。Java 到前端不是 NDJSON,而是 WebSocket 事件。
也就是说:
Python FastAPI --NDJSON over HTTP--> Java Spring Boot --WebSocket--> Vue 前端
5. Python 端:如何输出 NDJSON?
对应文件:
X-RAG Agent/api.py
Python 的流式接口是:
@app.post("/chat/stream")
def chat_stream(request: ChatStreamRequest):
return StreamingResponse(
stream_xchat_answer(request),
media_type="application/x-ndjson; charset=utf-8",
)
这里有两个重点。
第一个重点是 StreamingResponse。
普通接口一般是先算完结果,再一次性返回。但 StreamingResponse 可以接收一个生成器。生成器每 yield 一段内容,FastAPI 就可以把这一段内容写回 HTTP 响应里。
第二个重点是 media_type:
media_type="application/x-ndjson; charset=utf-8"
这表示接口返回的是 NDJSON,并且使用 UTF-8 编码。
项目里还有一个非常关键的函数:
def encode_event(payload: dict) -> str:
return json.dumps(payload, ensure_ascii=False) + "\n"
这个函数做了两件事:
- 把 Python 字典转成 JSON 字符串;
- 在末尾加上
\n。
其中 \n 就是 NDJSON 的核心。没有这个换行,Java 后端就没法稳定地按行读取一个个事件。
6. Python 端:流式事件有哪些?
这个项目中常见的 NDJSON 事件有五类。
6.1 delta:增量文本
{"type":"delta","delta":"回答片段"}
delta 表示 AI 新生成的一小段文本。Java 收到后会立刻推给前端,前端把它追加到当前 AI 消息后面。
6.2 done:回答完成
{"type":"done","content":"完整回答"}
done 表示本次 AI 回复结束。它通常会带上完整内容,方便 Java 后端最终落库。
6.3 error:错误事件
{"type":"error","message":"错误原因"}
如果 Python 侧模型调用、知识库检索或其他逻辑出错,就可以通过 error 告诉 Java。Java 再把占位消息更新成用户可读的错误提示。
6.4 tool_call:工具开始执行
{
"type": "tool_call",
"tool_name": "rag_summarize",
"display_name": "知识库检索",
"status": "start",
"message": "正在检索知识库"
}
这个事件表示 AI 准备调用某个工具,例如知识库检索、意图判断等。
6.5 tool_result:工具执行结果
{
"type": "tool_result",
"tool_name": "rag_summarize",
"display_name": "知识库检索",
"status": "success",
"summary": "命中 3 条知识库片段"
}
这个事件表示工具执行完成,并返回一个摘要。前端可以把它展示成“执行轨迹”,让用户知道 AI 不是凭空回答,而是先进行了检索或判断。
7. Python 端:普通 Agent 回复是怎么流式输出的?
项目里的普通 Agent 流式输出逻辑可以简化成这样:
def stream_agent(prompt: str):
full_text = []
try:
for delta in get_agent().execute_stream(prompt):
if not delta:
continue
full_text.append(delta)
yield encode_event({"type": "delta", "delta": delta})
yield encode_event({"type": "done", "content": "".join(full_text)})
except Exception as exc:
yield encode_event({"type": "error", "message": str(exc)})
这段代码的思路非常清楚:
- 模型每生成一小段
delta; - Python 就马上
yield一个delta事件; - 同时用
full_text累计完整答案; - 生成结束后,再发一个
done事件; - 如果异常,则发
error事件。
换成更直观的例子,假设模型生成了:
你好,我是 X-Chat AI。
Python 可能实际返回:
{"type":"delta","delta":"你好"}
{"type":"delta","delta":",我是"}
{"type":"delta","delta":" X-Chat AI。"}
{"type":"done","content":"你好,我是 X-Chat AI。"}
8. Python 端:RAG 问答为什么还有 tool_call 和 tool_result?
X-Chat 不只是普通 AI 聊天,它还支持知识库问答。比如用户问:
项目里的 Netty WebSocket 是怎么推送消息的?
这类问题可能需要先查知识库,再组织答案。项目中的 RAG 链路大致是:
意图判断
-> 知识库检索
-> 命中片段
-> 生成回答
-> 返回来源和完整答案
因此,Python 不只返回文本,还会返回工具事件。例如:
{"type":"tool_call","tool_name":"intent_router","display_name":"意图判断","status":"start","message":"正在判断应该使用哪类能力"}
{"type":"tool_result","tool_name":"intent_router","display_name":"意图判断","status":"success","summary":"识别为:知识库问答"}
{"type":"tool_call","tool_name":"rag_summarize","display_name":"知识库检索","status":"start","message":"正在检索知识库"}
{"type":"tool_result","tool_name":"rag_summarize","display_name":"知识库检索","status":"success","summary":"命中 3 条知识库片段"}
{"type":"delta","delta":"根据知识库资料,Netty WebSocket 在项目中负责..."}
{"type":"done","content":"根据知识库资料,Netty WebSocket 在项目中负责..."}
这样前端就可以展示类似:
正在判断意图...
识别为:知识库问答
正在检索知识库...
命中 3 条知识库片段
AI 正在回答...
对于用户来说,这种过程展示会更可信,也更容易理解 AI 为什么这么回答。
9. Java 端:如何按行读取 NDJSON?
对应文件:
src/main/java/com/xchat/backend/ai/AiFastApiClient.java
Java 后端通过 OkHttp 调用 Python:
Request request = new Request.Builder()
.url(buildUrl("/chat/stream"))
.post(RequestBody.create(json, JSON))
.build();
try (Response response = client.newCall(request).execute()) {
ResponseBody body = response.body();
readNdjson(body.source(), listener);
}
真正处理 NDJSON 的逻辑可以简化成这样:
private void readNdjson(BufferedSource source, StreamListener listener) throws IOException {
String line;
while ((line = source.readUtf8Line()) != null) {
if (line.isBlank()) {
continue;
}
JsonNode node = objectMapper.readTree(line);
String type = node.path("type").asText();
if ("delta".equals(type)) {
listener.onDelta(node.path("delta").asText(""));
} else if ("tool_call".equals(type)) {
listener.onToolCall(node);
} else if ("tool_result".equals(type)) {
listener.onToolResult(node);
} else if ("done".equals(type)) {
listener.onDone(node.path("content").asText(""));
} else if ("error".equals(type)) {
throw new IOException(node.path("message").asText("AI service error"));
}
}
}
这段代码就是 NDJSON 协议在 Java 端的核心。
它的处理步骤是:
readUtf8Line()读取一行;- 空行跳过;
- 用
ObjectMapper把这一行解析成 JSON; - 根据
type分发给不同回调; - 每读到一个
delta就立即通知上层。
这里有一个容易忽略的点:
Java 不是等 Python 整个响应结束后才处理,而是 Python 发一行,Java 就可以读一行。
这正是流式效果成立的关键。
10. Java 端:为什么还要把 NDJSON 转成 WebSocket?
因为前端没有直接连接 Python 服务。前端连接的是 Java 后端的 WebSocket。
所以 Java 收到 Python 的 NDJSON 事件后,需要转换成前端能识别的 WebSocket 事件。
对应文件:
src/main/java/com/xchat/backend/ai/AiChatService.java
核心逻辑可以简化成:
aiFastApiClient.streamChat(request, new AiFastApiClient.StreamListener() {
@Override
public void onDelta(String delta) {
contentBuilder.append(delta);
pushAiDelta(userId, replyMessage.getMessageId(), delta);
}
@Override
public void onDone(String content) {
doneContent[0] = content;
}
@Override
public void onToolCall(JsonNode event) {
pushAiToolEvent(userId, replyMessage.getMessageId(), "ai_tool_call", event);
}
@Override
public void onToolResult(JsonNode event) {
pushAiToolEvent(userId, replyMessage.getMessageId(), "ai_tool_result", event);
}
});
这里可以看到映射关系:
| Python NDJSON 事件 | Java 回调 | 前端 WebSocket 事件 |
|---|---|---|
delta |
onDelta |
ai_stream_delta |
done |
onDone |
ai_stream_done |
error |
抛异常后处理 | ai_stream_error |
tool_call |
onToolCall |
ai_tool_call |
tool_result |
onToolResult |
ai_tool_result |
也就是说,Java 后端在这里扮演了一个“协议转换器”的角色:
NDJSON 事件 -> Java 回调 -> WebSocket 事件
11. Java 端:为什么要先创建一条 AI 占位消息?
在真正开始读取 Python 流之前,Java 会先创建一条 AI 回复消息。此时这条消息可能还是空内容,它的作用是给前端一个稳定的 messageId。
然后 Java 先推送:
{"type":"ai_stream_start","data":{"messageId":123,"messageContent":""}}
前端收到后,先在聊天窗口里显示一个空的 AI 气泡。
后面每次收到 delta,Java 都带着同一个 messageId 推给前端:
{"type":"ai_stream_delta","data":{"messageId":123,"delta":"你好"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":",我是"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":" X-Chat AI"}}
前端根据 messageId=123 找到同一条消息,并把内容不断追加上去。
这样做有一个非常重要的好处:
多个 delta 不会变成多条聊天气泡,而是合并成同一条 AI 消息。
如果没有占位消息和 messageId,前端可能会显示成这样:
AI:你好
AI:,我是
AI: X-Chat AI
这是错误体验。
正确体验应该是:
AI:你好,我是 X-Chat AI
12. 前端:如何把 delta 追加到同一条消息?
对应文件:
frontend/src/api/websocket.js
frontend/src/stores/chatStore.js
前端收到 ai_stream_start 时,把空消息加入消息列表:
if (data.type === 'ai_stream_start') {
chatStore.addMessage(data.data)
}
收到 ai_stream_delta 时,不新增消息,而是追加内容:
if (data.type === 'ai_stream_delta') {
chatStore.appendAiDelta(data.data.messageId, data.data.delta)
}
appendAiDelta 的核心逻辑是:
appendAiDelta(messageId, delta) {
if (!delta) return
const target = this.findMessageById(messageId)
if (!target) return
target.messageContent = (target.messageContent || '') + delta
target.status = 0
}
通过这段代码理解流式 UI:
- 根据
messageId找到那条 AI 占位消息; - 把新来的
delta拼到messageContent后面; - Vue 响应式数据变化,页面自动刷新;
- 用户就看到了“文字逐渐出现”的效果。
13. 一次完整示例
假设用户问:
知识库中 Netty 是什么?
13.1 Vue 调用 Java
前端先调用普通发送消息接口:
POST /api/chat/sendMessage
此时前端并不直接请求 Python。
13.2 Java 识别这是 AI 消息
Java 发现接收人是固定 AI 联系人,于是:
- 先保存用户消息;
- 推送用户消息;
- 等事务提交后,异步启动 AI 回复。
这样即使 AI 服务慢,也不会阻塞用户发送消息。
13.3 Java 请求 Python
Java 向 Python /chat/stream 发送请求体,大致如下:
{
"mode": "chat",
"user_id": "10001",
"message": "知识库中 Netty 是什么?",
"context_messages": [],
"scope_type": "global",
"scope_id": ""
}
这些字段的含义是:
| 字段 | 含义 |
|---|---|
mode |
AI 工作模式,例如普通聊天、上下文问答、聊天总结 |
user_id |
当前用户 ID |
message |
用户问题 |
context_messages |
最近聊天上下文 |
scope_type |
知识库范围,例如全局或群聊 |
scope_id |
具体群 ID,或全局为空 |
13.4 Python 返回 NDJSON
Python 可能返回:
{"type":"tool_call","tool_name":"intent_router","display_name":"意图判断","status":"start","message":"正在判断应该使用哪类能力"}
{"type":"tool_result","tool_name":"intent_router","display_name":"意图判断","status":"success","summary":"识别为:知识库问答"}
{"type":"tool_call","tool_name":"rag_summarize","display_name":"知识库检索","status":"start","message":"正在检索知识库"}
{"type":"tool_result","tool_name":"rag_summarize","display_name":"知识库检索","status":"success","summary":"命中 3 条知识库片段"}
{"type":"delta","delta":"根据知识库资料,Netty 是一个高性能网络通信框架..."}
{"type":"done","content":"根据知识库资料,Netty 是一个高性能网络通信框架..."}
13.5 Java 转成 WebSocket
Java 再推给前端:
{"type":"ai_stream_start","data":{"messageId":123,"messageContent":""}}
{"type":"ai_tool_call","data":{"messageId":123,"displayName":"意图判断","status":"start"}}
{"type":"ai_tool_result","data":{"messageId":123,"displayName":"意图判断","summary":"识别为:知识库问答"}}
{"type":"ai_tool_call","data":{"messageId":123,"displayName":"知识库检索","status":"start"}}
{"type":"ai_tool_result","data":{"messageId":123,"displayName":"知识库检索","summary":"命中 3 条知识库片段"}}
{"type":"ai_stream_delta","data":{"messageId":123,"delta":"根据知识库资料,Netty 是一个高性能网络通信框架..."}}
{"type":"ai_stream_done","data":{"messageId":123,"messageContent":"根据知识库资料,Netty 是一个高性能网络通信框架..."}}
13.6 Vue 更新页面
前端最终做三件事:
ai_stream_start:创建空 AI 气泡;ai_stream_delta:不断追加文本;ai_stream_done:用后端最终保存的消息覆盖本地临时消息。
最终用户看到的就是一条逐步生成的 AI 回复。
14. 这里为什么不用普通 HTTP JSON?
普通 HTTP JSON 当然也能实现 AI 回复,但用户体验会差一些。
普通 JSON 的流程是:
用户提问
-> 后端等待 AI 生成完整答案
-> 一次性返回完整 JSON
-> 前端显示答案
这意味着用户在等待过程中什么也看不到。
NDJSON 的流程是:
用户提问
-> 后端收到第一段内容
-> 前端显示第一段
-> 后端收到第二段内容
-> 前端追加第二段
-> ...
-> 最终完成
这会让用户感觉系统更快、更流畅。
15. 那为什么不用 SSE?
SSE,也就是 Server-Sent Events,也常用于服务端向客户端推送流式数据。它的格式通常是:
data: {"delta":"你好"}
data: {"delta":",我是"}
SSE 很适合浏览器直接接收服务端流式事件。
但在这个项目里,前端并不直接请求 Python,而是 Java 后端作为中间层:
Python -> Java -> Vue
Java 只需要简单地按行读取 Python 的事件,然后再转 WebSocket。NDJSON 在这种“服务端到服务端”的场景里非常轻量,处理起来也很直接。
所以这个项目的选择是合理的:
- Python 到 Java:用 NDJSON,简单、轻量、易解析;
- Java 到 Vue:用已有 Netty WebSocket,和聊天系统实时推送能力复用。
16. 那为什么不让 Python 直接 WebSocket 推给前端?
表面上看,Python 直接 WebSocket 给前端也可以实现流式回复。但这样会带来几个问题:
- 前端要同时连接 Java 和 Python 两套服务;
- Python 需要理解用户身份、群权限、会话权限;
- 消息落库和最终状态同步会变复杂;
- AI 服务会和业务系统强耦合。
X-Chat 的设计是让 Java 作为统一入口:
前端只信任 Java
Java 再调用 Python
Python 只负责 AI 能力
这种方式更适合业务型项目。
17. 项目里的一个细节:事务提交后再调用 AI
Java 后端并不是一收到用户消息就立刻调用 AI,而是在用户消息落库并且事务提交后才启动 AI 回复。
这个设计很重要。
如果事务还没提交,AI 就开始回复,可能出现一种尴尬情况:
用户消息写库失败了
但 AI 回复已经生成并推给前端了
这样聊天记录就会出现不一致。
所以项目中使用了类似这样的逻辑:
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
aiChatService.scheduleChatReply(userId, messageContent);
}
});
意思是:只有数据库事务真正提交成功后,才启动 AI 回复。
18. 项目里的另一个细节:异常也要更新占位消息
流式系统里很容易出现一种问题:前端已经创建了 AI 占位气泡,但后端调用 AI 失败了。如果不处理异常,用户就会看到一条永远空白或一直加载中的消息。
项目里的做法是:
- 先创建 AI 占位消息;
- 如果 Python 正常返回,就不断追加 delta;
- 如果 Python 抛错,就把占位消息更新成错误提示;
- 再通过
ai_stream_error推给前端。
这说明流式系统不能只考虑“成功路径”,还必须考虑:
- Python 服务未启动;
- 模型接口失败;
- RAG 检索失败;
- HTTP 流中断;
- 前端没有找到对应 messageId。
19. 一个可以继续优化的点:sources 事件
Python 的 RAG 链路中会返回 sources 事件,用于携带知识库来源信息,例如文档名、片段标题等。
不过 Java 当前主要处理:
delta / done / error / tool_call / tool_result
对于未知事件会忽略。因此如果想进一步优化前端展示,可以扩展一条链路:
Python sources
-> Java onSources
-> WebSocket ai_sources
-> 前端挂到同一条 message 的 sources 字段
-> 页面展示引用来源
这样用户不仅能看到“命中 3 条知识库片段”,还能看到具体引用来自哪些文档。
这是一个很适合作为后续功能增强的小任务。
20. 项目重点学到了什么?
通过这个项目,可以把 NDJSON 流式协议理解成一句话:
服务端把多个 JSON 事件按行输出,客户端按行读取并立即处理,从而实现流式效果。
在 X-Chat 中,它的落地方式是:
Python:StreamingResponse + yield + 每行 JSON + \n
Java:OkHttp + readUtf8Line + ObjectMapper + listener 回调
前端:WebSocket + messageId + 追加 delta 到同一条消息
整个链路最核心的不是某一个框架,而是这几个设计思想:
- 一行一个事件:每条 JSON 都能独立解析;
- 事件类型分发:通过
type区分 delta、done、error、tool_call、tool_result; - 后端协议转换:Java 把 Python 的 NDJSON 转成前端 WebSocket;
- 同一消息聚合:前端通过
messageId把多个增量合成一条 AI 回复; - 最终状态落库:done 后使用完整内容保存,保证刷新后聊天记录一致;
- 异常兜底:流式过程中失败,也要更新前端状态。
21. 总结
NDJSON 并不复杂,它甚至可以说是最简单的流式协议之一:
JSON + 换行
但放到真实项目里,它能解决一个很实际的问题:AI 回复生成慢,用户不想等到最后才看到结果。
X-Chat 项目中,Python FastAPI 使用 StreamingResponse 一行一行返回 NDJSON;Java 后端使用 OkHttp 按行读取,并把事件转换成 WebSocket;Vue 前端根据 messageId 把增量文本追加到同一条消息里。
这套设计既保持了 AI 服务和业务系统的解耦,又实现了自然的流式聊天体验。
最后用一张简化图收尾:
用户提问
↓
Vue 发送消息
↓
Spring Boot 落库 + 异步调用 AI
↓
FastAPI 返回 NDJSON:
{"type":"delta","delta":"..."}\n
{"type":"done","content":"..."}\n
↓
Spring Boot 按行读取并转 WebSocket
↓
Vue 按 messageId 追加文本
↓
用户看到流式 AI 回复
核心模型: 生成一点,发送一行;读取一行,更新一次。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)