一份基于真实事故的深度教学文档。
覆盖 stream mode、user/agent/tool 三种消息、子图到主图事件传播、过滤链设计、values vs updates。


零、前置知识(10 分钟速通)

如果你已经了解 LangGraph 的基本概念,可以直接跳到第一章。

0.1 这个系统在做什么

用户在前端发一条消息 → 后端调用 LLM 生成回复 → 回复以逐字流式(类似 ChatGPT 打字效果)推送到前端。

听起来简单,但实际系统里这条链路经过了很多环节。LangGraph 就是负责编排这些环节的框架。

0.2 什么是 LangGraph

LangGraph 是一个有状态的工作流编排框架。你可以把它理解为"带状态的流水线"。

三个核心概念

StateGraph(状态图)
  = 节点(Nodes)+ 边(Edges)+ 共享状态(State)
  • State(状态):一个共享的数据容器,在节点之间传递。比如我们系统的 ChatState 包含 messages(对话历史)、scenarios(当前场景)等字段。每个节点都可以读写它。
  • Node(节点):一个函数,接收 State,做一些事情(比如调 LLM、查数据库),返回更新后的 State。
  • Edge(边):连接节点的箭头。普通边是"A 之后执行 B",条件边是"A 之后根据 State 的某个值决定走 B 还是 C"。

类比:如果把一次对话比作一条流水线,State 就是流水线上的托盘,Node 就是每个工位(工人从托盘拿东西、加工、放回去),Edge 就是传送带。

0.3 什么是 astream()(流式执行)

执行一个图有两种方式:

# 方式 1:invoke — 全部执行完再返回结果
result = await graph.ainvoke(input)

# 方式 2:astream — 边执行边往外发事件(实时推送)
async for event in graph.astream(input):
    # 每完成一个节点、每产出一条消息,都会触发一次循环
    yield event  # 推送到前端

我们用的是 astream(),因为需要实时流式地把 LLM 的输出逐字推给前端。

astream() 的几个关键参数:

  • stream_mode:控制"往外发什么类型的事件"(后面详讲)
  • subgraphsTrue 时,子图内部的事件也会上报(这是很多问题的根源)

0.4 四种消息类型(一分钟认全)

LangChain 把对话中的每条消息都表示为一个对象:

# 用户发的消息
HumanMessage(content="帮我分析一下这个文件")

# AI 的回复(分两种形态)
AIMessageChunk(content="好")     # 流式片段:模型正在生成,每次只出一个 token
AIMessage(content="好的,我来帮你分析...")  # 完整回复:模型生成完毕后的最终版

# 工具调用结果
ToolMessage(content="查询结果:123", tool_call_id="call_xxx")

Chunk 和完整 Message 的关系:同一个 AI 回复,先以 Chunk 形式逐 token 下发(前端一个个字拼起来),最后以完整 AIMessage 下发(前端整体替换确保数据一致)。就像你在看 ChatGPT 打字,打完后再做一次最终确认。

0.5 什么是子图(Subgraph)

子图就是一个被当作节点使用的图

主图(Main Graph)
├── node_a()          ← 普通函数节点
├── node_b()          ← 普通函数节点
├── subgraph_x        ← 这本身也是一个完整的 StateGraph!
│   ├── inner_node_1()
│   ├── inner_node_2()
│   └── inner_node_3()
└── node_c()          ← 普通函数节点

为什么要用子图?当某个功能足够复杂(有自己的内部状态、自己的 LLM 调用链),把它封装成子图比在主图里堆节点更清晰。

我们系统里有几个子图:router_subgraphknowledge_subgraphinsight_subgraph 等。

0.6 什么是 SSE(Server-Sent Events)

前端和 gateway 之间用 SSE 协议通信。SSE 就是服务器向浏览器单向推送事件:

event: message
data: {"msgId": "xxx", "type": "AGENT", "contents": [...]}

event: message
data: {"msgId": "xxx", "type": "AGENT", "contents": [...]}

前端收到这些事件后,实时渲染到聊天界面。每条事件有一个 mergeType

  • APPEND:追加到现有消息末尾(流式打字)
  • UPDATE:替换整条消息(最终态覆盖)

0.7 create_agent 是什么

create_agent 是 LangGraph 提供的一个工厂函数,它自动创建一个包含 LLM 调用的子图

agent_graph = create_agent(
    model,          # 用哪个 LLM
    tools=[...],    # 可以调哪些工具
    state_schema=ChatState,
    response_format=ProviderStrategy(SomeOutputClass),  # 结构化输出
)

它内部会自动创建 model 节点(调 LLM)和 tools 节点(执行工具),组成一个 model ↔ tools 的循环。

关键问题:如果 create_agent 被用在子图内部,且 response_format 设了结构化输出(如要求 LLM 输出 JSON),那么这个JSON 会作为 AIMessage 通过 messages 流上报。它就是本次 JSON 泄漏的源头。

0.8 整体链路(把前面的串起来)

前端发送消息
  │
  ▼
gateway (Python) — 接收请求,构建 ChatUIState
  │
  ▼
agent_server (Python) — LangGraph 执行
  │
  ├── core_workflow.astream()
  │     ├─ 执行 input_parser_node → 解析用户输入
  │     ├─ 执行 router_subgraph 子图 → 判断场景
  │     ├─ 执行 knowledge_subgraph 子图 → 检索知识
  │     ├─ 执行 response_node → LLM 生成最终回复
  │     └─ ... 每个节点执行时通过 astream 往外发事件
  │
  ├── 事件类型(stream_mode 控制):
  │   ├─ messages: LLM 的逐字输出 (AIMessageChunk)
  │   ├─ custom:   节点自定义推送(卡片、选择器等)
  │   └─ values:   完整 state 快照(用于最终态补齐)
  │
  ▼
gateway — 接收 SSE 事件,写入 Redis Stream
  │
  ▼
前端 — 消费 SSE,实时渲染

这份文档的核心问题就是astream() 发出来的事件里,哪些应该展示给用户,哪些是内部推理过程不该展示?这就是过滤链要解决的事。


为什么有这份文档

2026年6月,我们遇到了两次叠加的问题:

  1. 结构化 JSON 泄漏:子图内部 create_agent 的结构化输出({"normalized_question": "xxx", ...})裸 JSON 出现在用户聊天界面
  2. 子图消息误过滤:修复 JSON 泄漏时用了 startswith('{') 一刀切,导致某些正常子图输出被误杀

我和同事 A 同时在修这件事,两个人都对 subgraphs=True + values 模式 + create_agent 的事件传播链路不够熟悉,各自在正确方向上努力,但合在一起才暴露出知识点盲区。

这份文档把这次实战中学到的知识体系化,让自己以后不再踩同样的坑。


一、LangGraph 流式基础:三种 Stream Mode

1.1 我们的配置

# core_workflow.py:369-375
async for chunk in self.graph.astream(
    input=user_input,
    config=config,
    subgraphs=True,                            # 关键:子图事件会上报
    stream_mode=['messages', 'custom', 'values'],  # 三种模式
)

subgraphs=True 时,astream 返回的不再是简单的 (event, content),而是 三元组

graph_chain = chunk[0]   # 如 ('knowledge_subgraph', 'model')
graph_event = chunk[1]   # 'messages' | 'custom' | 'values'
graph_content = chunk[2] # 内容,格式取决于 mode
  • graph_chain 为空元组 () 表示事件来自主图顶层节点
  • graph_chain 非空表示事件来自子图内部,最后一个元素是最内层节点名

1.2 messages 模式

产出(message_obj, metainfo_dict) 元组

# core_workflow.py:381
message = graph_content[0]   # AIMessageChunk / AIMessage / ToolMessage
metainfo = graph_content[1]  # {'lc_source': '...', 'thread_id': '...', ...}

这是最核心的模式,LLM 的流式 token 通过它逐字推送:

  • AIMessageChunk:模型正在生成时的逐 token 片段。content 可能只是 "你""好" 这样的单字。
  • AIMessage:模型完成一次完整回复后的完整消息。content 是完整的回复文本。
  • ToolMessage:工具执行完成后产出的结果。

1.3 custom 模式

产出:节点内部调用 get_stream_writer()(payload) 推送的任意内容

# 节点内部
writer = get_stream_writer()
writer(create_custom_message(id, "card_type_a", content, extra_data))

这是最干净的消息通道。消息不经过 state,直接以 SSE 事件的形式透传给前端。insight_subgraph 就完全走这条路——它的 card_type_a、card_type_b、card_type_c 等内容全部通过 custom 事件推送,不依赖 messages/values 路径

1.4 values 模式

产出:每一步执行后的完整 state 快照(不是增量 diff)

# core_workflow.py:589-593
elif graph_event == 'values':
    messages = graph_content.get('messages', [])  # 完整 state 的 messages 数组

这是最容易出问题的模式。因为:

  1. 子图的 values 事件包含子图内部的全部 state(包括 create_agent 内部的结构化 JSON)
  2. 主图的 values handler 不知道哪些消息来自子图内部、哪些来自根图
  3. 它会遍历所有 state.messages 里的 AI 消息,把"可见过的消息"补发完整版(mergeType: UPDATE

1.5 三模式对比

messages custom values
产出的内容 单个消息对象 任意自定义 payload 完整 state 快照
触发时机 每条消息产生时 节点主动 push 每步执行后
是否经过 state 否(旁路) 是(全部)
子图影响 会冒泡上来 会冒泡上来 会冒泡上来
主要用途 流式 LLM 输出 组件级推送(卡片等) 最终态补齐(UPDATE)
典型陷阱 子图内部 JSON 泄漏 子图 state 误补发

二、三种消息类型:User / Agent / Tool

2.1 HumanMessage — 用户消息

用户发的消息。在 stream handler 中被直接跳过

# core_workflow.py:411-412
if not isinstance(message, (AIMessage, AIMessageChunk, ToolMessage)):
    continue  # HumanMessage 在这里被过滤

原因:用户的原始输入已经在前端展示过了,不需要在 SSE 流中再次下发。

2.2 AIMessageChunk vs AIMessage — AI 消息的两种形态

这是最重要的区分:

AIMessageChunk AIMessage
阶段 流式生成中 生成完成后
content 单 token 片段 完整回复
下发方式 mergeType: APPEND mergeType: UPDATE
message.content 格式 字符串或 list[dict] 同左
携带工具调用 tool_call_chunks(增量) tool_calls(完整)

关键认知:同一个 AI 回复,先以 Chunk 形式逐 token 下发(APPEND),最后以完整 AIMessage 形式下发(UPDATE)。前端用 ID 关联,先拼接后替换。

# Chunk 下发 — 流式逐字
chatui_message = build_ai_message_emit(message_id, message_content, message_kind='chunk')
# → 前端 append 到消息末尾

# 完整消息下发 — 最终态覆盖
chatui_message = build_ai_message_emit(message_id, message_content, message_kind='update')
# → 前端替换整个消息内容

2.3 ToolMessage — 工具执行结果

工具调用完成后产生。结构:

ToolMessage(
    content="工具执行的结果字符串",
    tool_call_id="call_xxx",
    name="tool_name",
    status="completed"  # 或 "failed"
)

handler 把它解析成 tool_result 事件推给前端,前端可以展示"工具 xxx 执行完成"。

2.4 message.content 的真实结构

不是所有 content 都是纯字符串。text_content() 函数做了规范化:

# core_workflow.py:100-112
def text_content(content):
    if isinstance(content, str):        # 纯文本 → 直接返回
        return content
    if isinstance(content, list):       # 多模态 → 提取 type=text 的块
        return ''.join(item['text'] for item in content if item.get('type') == 'text')
    if content is None:
        return ''

三、子图到主图:事件传播链路

3.1 什么是子图

主图的某些节点本身是一个编译后的 StateGraph。例如:

主图 StateGraph (ChatState)
├── input_parser_node       ← 普通函数节点
├── router_subgraph          ← 编译后的子图(create_agent)
├── knowledge_subgraph ← 编译后的子图(create_agent)
├── insight_subgraph       ← 编译后的子图(自定义 3 节点)
├── response_node           ← 普通函数节点
└── ...

当一个节点是编译后的子图时,它内部有自己的 state、自己的节点、自己的消息流。

3.2 subgraphs=True 的效应

主图 astream(subgraphs=True)
  │
  ├─ [主图] input_parser_node 运行
  │   └─ graph_chain=() → graph_event='custom' → graph_content=...
  │
  ├─ [子图] router_subgraph 运行
  │   ├─ 内部 create_agent.ainvoke()
  │   │   ├─ model node 产生 AIMessageChunk
  │   │   │   └─ graph_chain=('router_subgraph', 'model')
  │   │   │      graph_event='messages'
  │   │   │      graph_content=(AIMessageChunk(content='{"scenarios"...'), {...})
  │   │   │        → 🔴 主图 handler 收到这个消息!
  │   │   │
  │   │   └─ 最终产出 AIMessage(content='{"scenarios":"场景A",...}')
  │   │       └─ graph_chain=('router_subgraph', 'model')
  │   │          graph_event='messages'
  │   │          graph_content=(AIMessage(content='{"scenarios":...'), {...})
  │   │            → 🔴 主图 handler 也收到这个消息!
  │   │
  │   └─ 子图 state values 事件
  │       └─ graph_chain=('router_subgraph',)
  │          graph_event='values'
  │          graph_content={'messages': [..., AIMessage('{"scenarios":...'), ...], ...}
  │            → 🔴 values handler 遍历到这条 AIMessage → 补发 UPDATE!
  │
  ├─ [主图] insight_subgraph 子图运行
  │   ├─ 内部 writer(create_card_message(...))
  │   │   └─ graph_chain=('insight_subgraph',)
  │   │      graph_event='custom'
  │   │      graph_content={'name': 'custom_push', 'type': 'card_type_a', ...}
  │   │        → ✅ custom handler 直接透传(clean path)
  │   │
  │   └─ 内部 create_planning_subgraph 在 response_node 中
  │       └─ graph_chain=(..., 'response_node')
  │          graph_event='messages'
  │          graph_content=(AIMessageChunk(content='场景A结果...'), {...})
  │            → ✅ messages handler → 以 chunk 下发(用户可见)

3.3 核心矛盾

subgraphs=True 把子图的所有 events 上报到主图,但主图 handler 无法区分哪些消息应该展示给用户、哪些是内部逻辑。

这就是 JSON 泄漏的根因:router_subgraph 子图内部的 create_agent 产出的 {"scenarios":"场景A",...}内部推理结果(用于场景路由决策),但它通过 messages 流上报到了主图 handler,被当作"需要展示给用户的消息"推到了前端。

3.4 事件传播全景图

这张图是本文档最重要的图。它把 astream(subgraphs=True, stream_mode=['messages','custom','values'])所有事件的来源、路径、去向画在了一起。

                          ┌─────────────────────────────────────────────────────┐
                          │              主图 (CoreWorkflow)                     │
                          │                                                     │
                          │  astream(subgraphs=True,                             │
                          │          stream_mode=['messages','custom','values']) │
                          │                                                     │
                          └─────────────────────────────────────────────────────┘
                                                    │
                    ┌───────────────────────────────┼───────────────────────────────┐
                    │                               │                               │
                    ▼                               ▼                               ▼
          ┌─────────────┐                 ┌──────────────┐                ┌──────────────┐
          │  messages   │                 │    custom    │                │    values    │
          │  事件流      │                 │   事件流      │                │   事件流      │
          └─────────────┘                 └──────────────┘                └──────────────┘
                    │                               │                               │
    ┌───────────────┼───────────────┐               │               ┌───────────────┼───────────────┐
    │               │               │               │               │               │               │
    ▼               ▼               ▼               ▼               ▼               ▼               ▼
┌───────┐    ┌────────────┐  ┌───────────┐   ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────────┐
│ 根图   │    │router      │  │knowledge  │   │insight    │  │ 根图       │  │router      │  │knowledge   │
│ 节点   │    │_subgraph  │  │_subgraph  │   │_subgraph  │  │ 节点       │  │_subgraph  │  │_subgraph   │
│       │    │            │  │           │   │            │  │            │  │            │  │            │
└───────┘    └────────────┘  └───────────┘   └───────────┘  └───────────┘  └───────────┘  └────────────┘
    │               │               │               │               │               │               │
    │               │               │               │               │               │               │
    ▼               ▼               ▼               ▼               ▼               ▼               ▼
┌───────┐    ┌────────────┐  ┌───────────┐   ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────────┐
│AIMes- │    │create_agent│  │create_agent│   │writer()   │  │完整 state │  │完整 state  │  │完整 state  │
│sage-  │    │内部 model  │  │内部 model  │   │推         │  │快照中的   │  │快照中的    │  │快照中的    │
│Chunk  │    │节点 产出   │  │节点 产出   │   │custom事件 │  │AI消息     │  │AI消息      │  │AI消息      │
│       │    │            │  │            │   │           │  │           │  │            │  │            │
│       │    │产出:       │  │产出:       │   │产出:      │  │           │  │产出:       │  │产出:       │
│消息:  │    │{"scenarios│  │{"normalized│   │card_type_a │  │消息:      │  │{"scenarios│  │{"normalized│
│"你好" │    │":"场景A"│  │_question"  │   │card_type_b│  │"你好"     │  │":"场景A"│  │_question"  │
│       │    │...}        │  │:"xxx"...}  │   │           │  │           │  │...}        │  │:"xxx"...}  │
└───────┘    └────────────┘  └───────────┘   └───────────┘  └───────────┘  └───────────┘  └────────────┘
    │               │               │               │               │               │               │
    │               │               │               │               │               │               │
    ▼               ▼               ▼               ▼               ▼               ▼               ▼
╔═══════╗    ╔════════════╗  ╔═══════════╗   ╔═══════════╗  ╔═══════════╗  ╔═══════════╗  ╔═══════════╗
║ ✅     ║    ║ ❌ 需要过滤 ║  ║ ❌ 需要过滤 ║   ║ ✅ 直接透传║  ║ ✅ 守卫通过║  ║ ❌ 守卫拦截║  ║ ❌ 守卫拦截║
║ 下发    ║    ║ 内部JSON  ║  ║ 内部JSON  ║   ║ (旁路通道) ║  ║ 消息已在   ║  ║ 不在       ║  ║ 不在       ║
║ 前端    ║    ║ 不应展示  ║  ║ 不应展示  ║   ║           ║  ║ visible中  ║  ║ visible中  ║  ║ visible中  ║
╚═══════╝    ╚════════════╝  ╚═══════════╝   ╚═══════════╝  ╚═══════════╝  ╚═══════════╝  ╚═══════════╝
    │               │               │               │               │               │               │
    │               │               │               │               │               │               │
    ▼               ▼               ▼               ▼               ▼               ▼               ▼
┌───────┐    ┌────────────┐  ┌───────────┐   ┌───────────┐  ┌───────────┐  ┌───────────┐  ┌────────────┐
│APPEND │    │   SKIP     │  │   SKIP    │   │  custom- │  │  UPDATE   │  │   SKIP    │  │   SKIP     │
│打字   │    │  (过滤链   │  │  (过滤链  │   │  Push     │  │  最终补齐  │  │  (不补齐) │  │  (不补齐)  │
│效果   │    │  ⑦拦截)   │  │  ⑦拦截)  │   │  "卡片"   │  │            │  │           │  │            │
└───────┘    └────────────┘  └───────────┘   └───────────┘  └───────────┘  └───────────┘  └────────────┘
                    │                               │
                    │                               │
        ┌───────────┴───────────┐                   │
        │                       │                   │
        ▼                       ▼                   ▼
  ┌───────────┐         ┌───────────┐       ┌───────────┐
  │ 如果漏过滤 │         │ 即使漏过滤 │       │ 不经过    │
  │ → 前端看到 │         │ → values   │       │ messages  │
  │ raw JSON  │         │ handler    │       │ 流,直接透 │
  │ (第1条泄漏 │         │ 补发完整版 │       │ 传到前端  │
  │  路径)    │         │ (第2条泄漏 │       │           │
  │           │         │  路径)     │       │           │
  └───────────┘         └───────────┘       └───────────┘

看图说话

  1. 根图节点产生的 messages(比如 response_node 的 LLM 回复 “你好”)→ ✅ 直接下发,前端看到打字效果
  2. 子图 create_agent 内部 model 节点产生的 messages(比如 {"scenarios":"场景A",...})→ ❌ 需要被过滤链拦截。如果漏了 → 前端看到裸 JSON
  3. 子图内部 writer() 推送的 custom 事件(比如 insight_subgraph 的 card_type_a)→ ✅ 不经过 messages/values 路径,完全干净的旁路通道
  4. 根图节点的 values(完整 state 快照中的 AI 消息 “你好”)→ ✅ 消息 ID 在 visible_ai_message_ids 中 → 补发 UPDATE(最终态确认)
  5. 子图节点的 values(完整 state 快照中的 JSON 消息)→ ❌ 消息 ID 不在 visible_ai_message_ids 中(因为 messages 层已过滤)→ 守卫拦截,不补齐

两条泄漏路径

  • 路径 1(messages 层):子图 create_agent JSON → messages handler 没过滤掉 → 以 chunk 形式逐字下发 → 前端展示 raw JSON
  • 路径 2(values 层):路径 1 发生的同时 → message_id 被加入 visible_ai_message_ids → values handler 遍历到这条消息 → 补发完整版 UPDATE → 前端看到完整 JSON(第二次泄漏,且是完整版)

两条路径必须同时堵住:只堵 messages(chunk 过滤)不够——如果某条 JSON 消息绕过了 chunk 过滤进入了 visible_ai_message_ids,values 路径还会补发完整版。只堵 values(不补发子图消息)也不够——messages 路径会先逐字泄漏。


四、消息过滤链:哪些要渲染,哪些不要

stream handler 里有层层过滤,理解每个过滤器的存在理由,才能知道新增子图时该怎么处理。

4.1 过滤全景图

SSE 事件进入 handler
  │
  ├─ ① internal_msg_prefix 前缀检查(所有消息)
  │    ID 以 'internal-marker' 开头 → skip
  │    用途:planning_subgraph / specialist_subgraph 内部消息
  │
  ├─ ② thread_id 前缀检查(所有消息)
  │    thread_id 以 'query-rewrite-' 开头 → skip
  │    用途:knowledge_subgraph 的 query rewriter / pattern recognition
  │
  ├─ ③ 消息类型检查(所有消息)
  │    不是 AIMessage / AIMessageChunk / ToolMessage → skip
  │    HumanMessage 在此被过滤
  │
  ├─ ④ editor_subgraph 检查(仅 Chunk)
  │    graph_chain 包含 'editor_subgraph' → skip
  │    原因:editor_subgraph 走 custom 事件,不走 messages
  │
  ├─ ⑤ router_subgraph JSON 检查(仅 Chunk)
  │    content 以 '{"scenarios":' 开头 → skip
  │    原因:router_subgraph 的 create_agent 结构化输出
  │
  ├─ ⑥ COMPILATION_MARKER 摘要检查(仅 Chunk,仅 response_node)
  │    content 包含 'COMPILATION_MARKER' / '## SUMMARY' 等 → skip
  │    原因:CompactionMiddleware 产生的压缩摘要不应展示
  │
  ├─ ⑦ startswith('{') JSON 检查(仅 Chunk,非 response_node 子图)
  │    content 以 '{' 开头 → 标记 message_id 并 skip 后续所有 chunk
  │    原因:knowledge_subgraph 等子图 create_agent 结构化输出  │    ⚠️ 范围过大:任何非 response_node 子图的 { 开头内容都被过滤
  │
  └─ ⑧ 正常输出
       ├─ AIMessageChunk → build_ai_message_emit(chunk) → yield → 前端 APPEND
       └─ AIMessage → build_ai_message_emit(update) → yield → 前端 UPDATE

4.2 为什么需要这么多过滤器

每一种过滤器背后都是一次"这里不应该输出"的发现。它们存在的共同原因是:

LangGraph 没有一个内置机制区分"展示给用户的输出"和"内部推理的输出"。

create_agent 内部调用 LLM 时,产出的消息和其他 LLM 调用没有结构上的区别——它们都是 AIMessage。主图 handler 只能靠"猜"(检查 thread_id 前缀、检查 content 内容特征)来区分。

4.3 现有过滤方式的分类

过滤方式 粒度 可靠性 维护成本
internal_msg_prefix 前缀 消息级 高(源头标记)
thread_id 前缀 线程级 中(新子图需注册)
graph_chain 节点名匹配 节点级
内容特征 startswith('{') 内容级 低(heuristic) 高(误判风险)
内容特征 COMPILATION_MARKER 内容级 低(heuristic)

五、values vs updates:数据更新那点事

5.1 为什么不用 updates

早期代码用 stream_mode=['updates']updates 只发 state 的增量 diff

# updates 示例:只包含变化的字段
{'scenarios': {'name': '文件分析总结'}}

问题:tool call 的"内容填充"不是"字段变化"——工具名从 tool_name: '' 变成 tool_name: 'search' 是字段更新,但工具参数从 args: '{}' 填充为 args: '{"query": "xxx"}' 在某些情况下不被视为"变化",导致前端拿不到完整工具调用信息。

5.2 values 做了什么

values 在每一步后发送完整 state 快照。handler 利用 TurnRuntime 的追踪机制,只对"已经在消息流中出现过的 ID"做补齐:

# TurnRuntime (stream_runtime.py:34-61)
visible_ai_message_ids: set[str]       # 已通过 messages 流发送的 AI 消息 ID
visible_tool_call_ids: set[str]        # 已发送的工具调用 ID
emitted_values_ai_message_ids: set[str]  # 已通过 values 补齐过的 ID(防重复)
emitted_values_ai_tool_call_ids: set[str]
emitted_values_tool_message_ids: set[str]

核心守卫

# core_workflow.py:602-604 — 只有"已经可见但尚未补齐"的消息才触发 UPDATE
if (message_id in turn_runtime.visible_ai_message_ids
    and message_id not in turn_runtime.emitted_values_ai_message_ids):
    # 补发完整版 → mergeType: UPDATE

这确保:

  1. 子图内部消息不会泄漏:子图 create_agent 的 AIMessage 从未被加入 visible_ai_message_ids(因为它的 chunk 在过滤链中被跳过了),所以 values handler 不会补齐它
  2. 同一条消息不会重复 UPDATEemitted_values_ai_message_ids 记录已补齐的 ID

5.3 子图 + values 组合风险

子图执行 → 内部 produce AIMessage {"normalized_question": "xxx", ...}
  │
  ├─ 路径 1: messages handler
  │   └─ chunk 被 startswith('{') 过滤 → 不加入 visible_ai_message_ids ✅
  │
  └─ 路径 2: values handler
      └─ 子图 state 含这条 AIMessage
          └─ message_id NOT IN visible_ai_message_ids → skip ✅
              (没有被补齐,因为从没"可见"过)

但如果过滤链漏了某个 chunk(比如首 chunk 不巧没有以 { 开头),message_id 就会被加入 visible_ai_message_ids,values handler 就会把它当作用户可见消息,补发完整 JSON 到前端。


六、实战教训:我和同事 A 为什么会撞车

6.1 两个改动的交集

时间线:

同事 A 的改动(先):
  ① stream_mode 从 ['updates'] 改为 ['values']
  ② 新增 subgraphs=True
  ③ 新增 values handler(遍历 state.messages,补齐已可见消息的完整版)

我的改动(后):
  ④ 把 insight_subgraph 从普通节点改为编译后的子图(StateGraph)

两个改动各自测试通过。合并后:
  → subgraphs=True 把 insight_subgraph 子图内部 events 上报到主图
  → 这些 events 包括 knowledge_subgraph 等旧有子图的 create_agent JSON
  → values handler 收到子图 state,遍历到 JSON 消息 → 补齐 UPDATE → 泄漏到前端

6.2 为什么各自都没发现

同事 A 的角度:他加 subgraphs=True + values handler 时,主图还没有子图节点(insight_subgraph 还是普通节点)。在他当时的环境下,values 事件里不会出现子图内部消息,values handler 的行为完全正确。

我的角度:我把 insight_subgraph 改成子图时,知道它的输出走 custom 事件(不经过 messages/values),所以认为改子图是安全的。但我没考虑到 subgraphs=True 已经打开了,也没排查其他已存在的子图knowledge_subgraphrouter_subgraph)的内部 events 会被影响。

6.3 根本原因

没有系统性的知识。两个人都不知道:

  1. subgraphs=True 后子图内部事件的完整传播路径
  2. create_agent 内部 model 节点会通过 messages 流产出结构化 JSON
  3. values handler 的 visible_ai_message_ids 守卫依赖消息必须先经过 messages handler 的过滤

6.4 正确做法

改动 stream_mode / subgraphs 等全局配置时,必须扫一遍所有 handler

□ stream_mode 增删了哪个模式?
  → 已有 handler 对新模式的事件会怎么处理?
□ subgraphs 开关变了?
  → 所有现有子图的新增事件会被哪些 handler 误处理?
□ 新增了 handler 逻辑?
  → 它会处理哪些来源的事件?能区分根图 vs 子图吗?

新增子图节点时,排查"我的内部事件会不会被主图 handler 误处理"

□ 子图内部有哪些 LLM 调用(create_agent / direct invoke)?
  → 它们的输出会以 messages/values 事件上报给主图
□ 主图的 messages handler 会不会误发我的内部消息?
  → 检查所有过滤条件是否覆盖了我的消息
□ 主图的 values handler 会不会补齐我的内部消息?
  → 确认我的内部消息不会进入 visible_ai_message_ids

七、最佳实践:如何不让消息泄漏

7.1 当前最佳方案:消息打 tag(同事 A 的方案)

在消息源头create_agent 调用处)给 AIMessage 加 metadata tag:

# 内部推理消息 — 打上 internal tag
AIMessage(
    content='{"normalized_question": "xxx", ...}',
    response_metadata={"visibility": "internal"}
)

# 用户可见消息 — 默认不打 tag 或打 user_visible
AIMessage(
    content="这是展示给用户的回复",
    response_metadata={"visibility": "user_visible"}
)

stream handler 只需一条规则:

if message.response_metadata.get("visibility") == "internal":
    continue

优点

  • 责任在消息生产者(他知道自己在产什么),不在消费者(不用猜)
  • 不管新增多少子图,只要有 tag 就不会漏
  • 不受内容特征变化影响(明天 JSON 格式变了也不怕)

这比 startswith('{') 精确无数倍,也比写死子图名白名单干净。

7.2 长期方向:子图输出约定

所有子图约定:

  1. 用户可见的输出custom 事件,不在 messages 路径上依赖主图 handler
  2. 内部推理消息create_agent 结构化输出等)不期望透出到前端
  3. 主图的 messages handler 只处理根图 LLM 的直接输出

insight_subgraph 已经符合这个模式。未来新增子图都应按此约定。


八、速查表

8.1 我要新增一个子图,该关注什么

问题 检查项
子图内部有 LLM 调用吗? 消息会通过 messages 流上报到主图
LLM 输出是给用户看的还是内部用的? 内部用的要打 tag 或走 custom 事件
子图有 create_agent 调用吗? create_agent 内部的 model 节点会独立产生 messages 事件
子图 state 里有不该展示的消息吗? 会通过 values handler 的 visible_ai_message_ids 被补齐
子图需要自定义内容推送吗? get_stream_writer() 发 custom 事件

8.2 消息在哪个阶段被过滤

消息来源 过滤方式 安全性
planning_subgraph 内部 internal_msg_prefix 前缀 ✅ 可靠
query-rewrite- 子图 thread_id 前缀 ✅ 可靠
editor_subgraph graph_chain 节点名 ✅ 可靠
router_subgraph 内容特征 + graph_chain ⚠️ heuristic
knowledge_subgraph startswith('{') ❌ 范围过大
response_node 压缩摘要 内容特征(COMPILATION_MARKER 等) ⚠️ heuristic

8.3 三种消息事件模式对比

用户消息(HumanMessage)
  → 在 stream handler 中被过滤(不推送给前端)
  → 只作为对话历史保存在 state.messages 中

AI 消息(AIMessageChunk → AIMessage)
  → Chunk:流式 APPEND,逐 token 推送给前端
  → Message:最终 UPDATE,补发完整内容
  → 工具调用参数在 Chunk.tool_call_chunks 中流式下发

工具消息(ToolMessage)
  → 包含 tool_call_id(关联回对应的 AIMessage 工具调用)
  → content 是工具执行结果
  → values handler 用 build_merged_tool_payload 合并最终状态

九、面试吹牛篇:如何把这次实战讲成高分故事

面试中最大的优势不是你解决了多少 bug,而是你能把解决 bug 的过程讲出系统性思考

9.1 开场 30 秒:一句话定调

面试官问"你做过最有挑战的事情是什么",不要直接进技术细节。先给一个有画面感的问题描述

"我在做的是一个企业级 AI 聊天系统。有次我们发现,用户正常聊天的时候,聊天界面突然出现了一坨裸 JSON——{"normalized_question":"xxx","search_queries":["yyy"]}。用户完全看不懂,这是系统内部的推理结果泄漏到了前端。

这个问题的根因涉及到 LangGraph 框架的三个核心机制——stream mode、子图事件传播、消息过滤链。我和另一个同事同时在对这个模块做改动,两个改动各自都是对的,但组合在一起触发了这个泄漏。

最终我不仅修了 bug,还写了一份从零到一的教学文档,覆盖了整个 LangGraph 流式事件处理体系。"

为什么这样开场有效:有画面(裸 JSON 出现在聊天框)、有技术深度(三个核心机制)、有协作冲突(两个正确改动组合出 bug)、有产出(教学文档)。面试官马上知道你不是在修表面 bug,而是在理解系统。

9.2 故事主线:5 分钟讲清楚来龙去脉

按这个顺序讲,面试官最容易跟上:

第一幕:背景(30 秒)

"我们的聊天系统用 LangGraph 编排 LLM 调用。主图里挂了好几个子图节点——router_subgraph 负责场景路由,knowledge_subgraph 负责知识检索,insight_subgraph 负责智能分析。

LangGraph 的 astream() 有三种 stream mode:messages(流式 LLM 输出)、custom(节点自定义推送)、values(每步后的完整 state 快照)。我们三种全用了,并且开了 subgraphs=True,子图内部的事件会冒泡到主图。"

第二幕:问题(1 分钟)

"有一天我们发现,用户界面出现了 {"normalized_question":"xxx","search_queries":["yyy"]} 这样的裸 JSON。排查后发现,这是子图内部 create_agent 调用产出的结构化推理结果——它本来只是 agent 之间通信用的内部数据,不应该给用户看。

泄漏路径是这样的:子图的 create_agent 内部 model 节点产生 AIMessage(内容是 JSON),通过 subgraphs=True 上报到主图的 messages handler → handler 把它当成普通 AI 回复 → 推送给前端。同时 values handler 遍历 state.messages,发现这条消息的 ID 在可见列表里,又补发了一次 UPDATE 版本。"

第三幕:冲突(1 分钟)

"更有意思的是,这个问题是我和另一个同事的改动组合触发的。他先改了:把 stream_mode 从 updates 换成 values,加了 subgraphs=True,新增了 values handler。当时主图还没有子图节点,他的改动测试完全正常。

我后改了:把 insight_subgraph 从普通节点改成编译后的子图。我知道这个子图的输出走 custom 事件,不经过 messages/values 路径,所以认为改子图是安全的,测试也过了。

但合并后,subgraphs=True 把 insight_subgraph 和其他已有子图的内部 events 全部上报到主图。其中 knowledge_subgraph 等旧子图的 create_agent JSON 开始通过 messages 和 values 两条路径泄漏。

这是典型的组合式缺陷(Combinatorial Bug):两个改动各自逻辑正确、测试通过,组合后产生未预期的副作用。"

第四幕:根因与解决(1 分钟)

"根本原因不是代码写错了,而是 LangGraph 没有一个内置机制区分’展示给用户的输出’和’内部推理的输出’。create_agent 内部的 LLM 调用和其他 LLM 调用产生的都是 AIMessage,结构上没有任何区别。主图 handler 只能靠猜——检查 thread_id 前缀、检查 content 是否以 { 开头、检查 graph_chain 节点名。

修复分三个阶段:第一阶段,case-by-case 打补丁,用 startswith('{') 过滤所有非 response_node 子图的 JSON 内容——但范围太大,会误杀正常输出。第二阶段,我的同事 同事 A 提出了更好的方案:在消息源头打 tag(response_metadata={"visibility":"internal"}),handler 只检查 tag 不检查内容特征。第三阶段,我写了完整的事件处理体系文档,覆盖了三种 stream mode、三种消息类型、子图传播链路、整个过滤链的设计理由。"

第五幕:收获(30 秒)

"这次经历让我对 Agent 工程有了几个关键认知:

  1. 框架能力(subgraphs=True)打开容易,但理解它对下游 handler 的连锁影响需要系统性排查
  2. 内容特征的过滤是不可靠的——过滤应该基于元数据(tag),不应该基于内容值
  3. 流式系统中,同一个消息会走多条路径(messages + values),需要理解每条路径的触发条件和相互作用"

9.3 可以吹的技术点(逐个攻克)

面试官可能会针对具体技术点追问。下面每个点都拆成"你是怎么理解的" + “可以用什么话术”。

技术点 1:Stream Mode 设计(messages / custom / values)

你的理解

三种 mode 各有分工,缺一不可:

  • messages:LLM 流式输出,逐 token 推送。这是实时性的核心——用户看到打字效果。
  • custom:旁路通道,节点可以任意推数据,不经过 state。这是组件化推送的核心——卡片、选择器、canvas 文档都走这里。
  • values:每步后发完整 state 快照,用于最终态补齐。解决流式输出中可能丢帧或内容不完整的问题。

话术

"这三种 mode 不是随便选的,它们对应了三种不同的信息传递需求。messages 解决的是’用户要看到模型在思考’的实时性问题。custom 解决的是’有些东西不是纯文本’——比如选择器卡片、授权卡片,这些东西不应该混在 markdown 流里。values 解决的是’流式输出可能丢信息’的问题——在每一步后用完整 state 做最终确认。

但 values 也是危险的——它包含子图的完整 state。如果不加守卫,子图内部的推理消息会被误当成用户消息补发。这就是为什么我们在 values handler 里用了 visible_ai_message_ids 追踪:只有已经在 messages 流中’见光’的消息,才允许 values 补发。"

技术点 2:子图事件传播

你的理解

subgraphs=True 把子图内部所有节点的事件上报到主图。这是双刃剑:好的一面是让主图能统一处理所有流式输出;坏的一面是子图的内部推理消息也上报了,主图 handler 无法区分来源。

话术

"LangGraph 的子图机制有个隐含设计假设:子图内部的消息和根图的消息是同质的——都是 AIMessage。框架本身没有提供’这条消息是内部推理结果’的标记。所以当 subgraphs=True 时,子图 create_agent 的结构化 JSON 和用户可见的 LLM 回复,在 handler 眼里长得一模一样。

这就逼着我们自己在应用层建立区分机制。最初我们用内容特征(startswith('{')),后来演进到 metadata tag(visibility: internal)。这个演进过程本身说明了一个问题:过滤应该基于消息的意图(它是给谁看的),而不是消息的长相(它长什么样)。"

技术点 3:过滤链的设计哲学

你的理解

现有 8 层过滤,每层都是一次"这里不该输出"的发现。按可靠性分:

  • 源头标记(internal_msg_prefix、thread_id 前缀):最可靠,因为消息生产者明确标记了它不该展示
  • graph_chain 节点名匹配:中等可靠,因为节点名稳定但需要维护
  • 内容特征匹配(startswith('{')、COMPILATION_MARKER):最不可靠,纯 heuristic

话术

"这 8 层过滤是’长出来的’,不是设计出来的。每加一层,都是因为发现了一种新的泄漏路径。如果让我重新设计,我会在消息层面加一个统一的 visibility 字段——消息生产者自己标记’intended_for_user’还是’internal’。这样 handler 只需要一条规则,不需要猜。

这也反映了 Agent 系统的一个本质挑战:LLM 的输出是文本,文本本身不携带’intent’信息。当多个 agent 通过同一条消息管道通信时,接收方只能通过上下文推断哪些是给自己的、哪些是给用户的。这就是为什么 metadata tag 比内容特征好——metadata 是结构化的 intent。"

技术点 4:组合式缺陷(Combinatorial Bug)

你的理解

两个正确的改动组合后产生 bug。这类 bug 很难在 code review 中发现,因为 reviewer 通常只看自己关注的改动范围,不会去推演对方改动对全局事件路径的影响。

话术

"这类组合式缺陷在分布式和流式系统中特别常见。核心问题是:改动 A 打开了新的能力(subgraphs=True),改动 B 添加了新的数据(子图节点),但没有人检查’新能力处理新数据时会怎样’。

我们的教训是:改动全局配置(stream_mode、subgraphs)时,必须扫一遍所有 handler;新增子图时,必须排查所有已有子图的内部事件会不会被误处理。这两个 checklist 已经在我们的文档里固化了。"

技术点 5:values 和 updates 的选择

你的理解

updates 是增量 diff,values 是完整快照。从 updates 切到 values 是因为 tool call 的内容填充在 updates 模式下有时不会被识别为"变化"。

但 values 带来了新问题:子图 state 被完整暴露。

话术

"从 updates 换到 values,本质是在’信息完整性’和’信息安全性’之间做权衡。values 让消息补齐更可靠,但也让子图内部 state 暴露在了 handler 面前。

我们的解决方案不是退回 updates,而是给 values handler 加了守卫——visible_ai_message_ids。只有已经在 messages 流中’见光’的消息 ID,才被 values 补发。这个守卫本质上是在说:‘我只补齐用户已经看到的东西,不揭示用户不该看到的东西’。"

技术点 6:消息打 tag(同事 A 的方案)

你的理解

这是目前最好的方案——在消息源头标记 visibility。比内容特征过滤精确,比子图名白名单干净。

话术

"消息打 tag 的思路在软件工程里有个经典对应:‘Provenance(来源标记)’。与其在消费者端猜’这条消息该不该展示’,不如让生产者声明’我这条消息是给谁看的’。责任在消息生产者,因为他最清楚自己的输出性质。

这个方案的好处是:不管以后新增多少子图、多少 agent,只要产消息的时候正确标记了 visibility,handler 不需要任何改动。这是 scalable 的。"

9.4 对 Agent 工程的理解(升华)

这一节是面试中"加分项"——从具体 bug 上升到方法论。

理解 1:Agent 系统最大的挑战不是模型能力,是消息路由

"很多人以为做 Agent 就是调 prompt、选模型。但实际上,当系统里有 10+ 个 agent 在同一个 StateGraph 里协作时,最难的问题是:谁的消息给谁看?

一个 agent 产出的结构化推理结果(比如场景判断 JSON),可能是给另一个 agent 的输入,也可能是给用户的回复。同一类消息对象(AIMessage),在不同上下文里有完全不同的含义。LangGraph 框架不解决这个问题,得我们自己建约定。"

理解 2:流式系统中的"最终一致性"

"流式 LLM 输出天然是’先发后补’的——先发 Chunk 打打字,最后发完整 Message 做最终确认。这和分布式系统里的’最终一致性’是同一个思想:先尽快给用户反馈(可用性),再在后台确保数据准确(一致性)。

values 模式就是做这个’最终确认’的——在每一步后用完整 state 发一次 UPDATE,确保前端拿到的内容和后端 state 一致。"

理解 3:好的架构让组件各走各路

"insight_subgraph 是我们架构里最干净的组件——它的输出全部走 custom 事件,不碰 messages/values 路径。这意味着它不需要依赖主图 handler 的任何过滤逻辑,自己控制自己的输出通道。

这就是好的 Agent 架构:每个 agent 对自己的输出边界负责,不依赖下游消费者去猜’这个该不该展示’。未来新增 agent 都应该按这个模式设计。"

理解 4:heuristic 过滤是技术债

"任何基于内容特征的过滤(startswith('{')COMPILATION_MARKER 等)都是技术债。它今天能用,但明天内容格式一变就失效,而且有误杀风险。

技术债不可避免——遇到紧急问题先打补丁是合理的。但打完补丁后要有计划地还债:用 metadata tag 替代内容特征,用结构化字段替代字符串匹配。"

9.5 常见追问 & 回答预案

Q1:为什么不用 stream_mode=['messages'] 一种就够了?

“只用 messages 解决不了组件化推送的问题——你能想象一个 card_type_b 卡片被放在 markdown 流里逐字展示吗?也不能解决最终态补齐的问题——流式输出可能丢帧,没有 values 的话,某些消息永远不会以完整形式到达前端。”

Q2:为什么不让每个 agent 控制它自己是否输出?

“这是我们最终要走的方向——消息打 tag 就是这个思路。但 LangGraph 框架目前不提供内置的消息 visibility 机制,所以我们自己在应用层用 response_metadata 实现。未来如果 LangGraph 原生支持,我们可以直接迁移。”

Q3:如果有 100 个 agent,你们的过滤方案还撑得住吗?

“撑不住。内容特征过滤和 graph_chain 白名单都是不可扩展的。100 个 agent 的场景下,只有 metadata tag 方案能工作——因为它是 decentralized 的,每个 agent 自己决定 message visibility,不需要中央 handler 知道所有 agent 的名字。”

Q4:这次事故如果重来,你会怎么做?

“我不会先修 bug,我会先画一张事件传播路径图——把 subgraphs=True 后所有子图的所有模式的事件路径画出来,标注每个 handler 会怎么处理。这张图画完,泄漏点就一目了然了。这也是为什么我事后写了这份文档——文档就是这张图。”

Q5:你觉得 LangGraph 这个框架有什么设计缺陷?

"最大的问题是消息没有 visibility 标记。AIMessage 既表示’给用户看的回复’,也表示’给下游 agent 的内部推理结果’。框架没有提供区分机制,把这个责任完全甩给了应用层。

第二个问题是 subgraphs=True 的默认行为——子图内部事件默认上报,但很多情况下应用层并不想要这个行为。如果让我设计,我会让子图显式声明’哪些事件需要上报到主图’。"

9.6 面试节奏建议

面试阶段 讲什么 时长
开场定调 问题画面 + 三个核心机制 + 产出 30 秒
讲故事 背景 → 问题 → 冲突 → 解决 → 收获 5 分钟
技术深挖 面试官问哪个点就讲哪个,用 9.3 的话术 1-2 分钟/点
升华 对 Agent 工程的理解(9.4),展现系统性思考 2 分钟
收尾 “如果重来我会先画事件传播图”(展现反思能力) 30 秒

核心原则

  1. 先给结论再解释:面试官注意力有限,先说"根因是消息没有 visibility 标记",再展开
  2. 用类比:StateGraph = 流水线、values = 最终确认、custom = 旁路通道。抽象概念一定要接地气
  3. 承认不完美:说"我们的过滤链是长出来的,不是设计出来的",比说"我们的系统很完美"更让人信服
  4. 展现系统性:不只是修了 bug,还写了文档、建了 checklist、提出了长期方案。这证明你不是头痛医头
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐