【LangGraph 】 流式传输 LLM Token
【LangGraph】新篇章:LangGraph 流式传输 LLM Token —— 细粒度过滤与多模型并发控制
上一章——> 【LangGraph 】custom自定义流式数据实战

前言
在聊天类 AI 应用中,用户期望看到 AI 像真人一样一个字一个字地输出回答(打字机效果),而不是等待整个生成完成后才一次性显示
LangGraph 的 stream_mode=“messages” 模式支持逐 token 输出 LLM 的响应
更进一步,如果你的图包含多个 LLM 节点(比如一个负责写笑话,一个负责写诗歌),你还可以通过 metadata 中的 tags 或langgraph_node 字段,精确过滤出你关心的那个模型的输出
本文将带您掌握这些高级技巧:
1、messages 模式的基本原理
当你在 stream() 或 astream() 中设置 stream_mode=“messages” 时
LangGraph 会拦截图内所有 LLM 调用,并逐 token 输出 (message_chunk, metadata) 元组
-
message_chunk:一个 AIMessageChunk 对象,通过 .content 可以获取当前 token 的文本
-
metadata:一个字典,包含该 token 的来源信息,例如:
-
“langgraph_node”: 生成该 token 的节点名称
-
“tags”: 你在 LLM 调用时设置的标签列表(通过 model_kwargs={“tags”: […]})
-
“model_name”: 使用的模型名称
-
其他调试信息
-
利用这些元数据,我们可以实现非常细粒度的控制和过滤
2、按 Tags 过滤:同一个图中的多个 LLM 任务
假设我们有一个节点,它同时调用两个不同的 LLM:一个生成笑话,另一个生成诗歌
我们希望只将笑话的生成过程流式输出给用户,而诗歌的结果在后台默默完成
2.1 为每个模型分配不同的标签
在初始化模型时,通过 model_kwargs 传递 tags:
from langchain_openai import ChatOpenAI
joke_model = ChatOpenAI(
model="your-model",
api_key="your-key",
temperature=0,
model_kwargs={"tags": ["joke"]}
)
poem_model = ChatOpenAI(
model="your-model",
model_kwargs={"tags": ["poem"]}
)
2.2 在一个节点中顺序调用两个模型
def generate_creative_content(state: CreativeState):
topic = state["topic"]
joke_response = joke_model.invoke([{"role": "user", "content": f"讲一个关于 {topic} 的笑话"}])
poem_response = poem_model.invoke([{"role": "user", "content": f"写一首关于 {topic} 的短诗"}])
return {"joke": joke_response.content, "poem": poem_response.content}
2.3 流式输出时只显示笑话的 token
for token_chunk, metadata in graph.stream(
{"topic": "猫"},
stream_mode="messages"
):
tags = metadata.get("tags", [])
if "joke" in tags:
print(token_chunk.content, end="", flush=True)
运行后,你只会看到笑话模型逐字输出,而诗歌模型的输出被静默忽略
这对于需要向用户展示“主要回答”、同时后台做额外处理的应用非常有用
我们也可以只输出诗歌模型,改个参数就可以:
if "poem" in tags:
print(token_chunk.content, end="", flush=True)
3、按节点名称过滤:区分并行节点中的不同任务
如果你的图中有多个 LLM 节点(例如一个负责摘要,一个负责翻译),并且它们可能并行执行,你可以根据 metadata 中的 langgraph_node 字段来区分来自哪个节点的 token
3.1 构建并行图
builder = StateGraph(State)
builder.add_node("summarize", generate_summary)
builder.add_node("translate", generate_translation)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)
graph = builder.compile()
3.2 流式输出时仅显示摘要节点的输出
target_node = "summarize"
for token_chunk, metadata in graph.stream(
{"query": "人工智能是计算机科学的一个分支,致力于创造能够执行通常需要人类智能的任务的机器。"},
stream_mode="messages"
):
node_name = metadata.get("langgraph_node", "")
if token_chunk.content and node_name == target_node:
prefix = "[摘要] " if node_name == "summarize" else "[翻译] "
print(f"{prefix}{token_chunk.content}", end="", flush=True)
这样,终端只会输出摘要的逐 token 内容,而翻译的输出被过滤掉了
3.3 动态切换过滤节点
你可以根据用户选择或某个条件,动态改变 target_node
例如,用户点击“显示翻译”按钮后,前端重新连接并只输出翻译节点
4、综合实战:一个多语言新闻摘要 Agent
让我们构建一个更贴近真实业务场景的例子:
一个新闻处理 Agent,它接收到一篇英文新闻稿后,并行地做两件事:
1.生成中文摘要(使用一个较快的模型,适合摘要)
2.将原文翻译成英文(使用一个更强大的模型,适合翻译)
我们希望前端可以选择「只看摘要的生成过程」或「只看翻译的生成过程」,甚至同时显示但用不同颜色区分。这完美体现了按节点名称过滤的能力
4.1 定义状态和节点
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
class NewsState(TypedDict):
original: str # 原始英文新闻
summary: str # 中文摘要结果
translation: str # 英文翻译结果
# 模型1:用于摘要(速度快、成本低)
summarize_model = ChatOpenAI(
model="gpt-3.5-turbo",
streaming=True, # 开启流式支持
model_kwargs={"tags": ["summary"]}
)
# 模型2:用于翻译(质量更高)
translate_model = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
model_kwargs={"tags": ["translation"]}
)
def node_summarize(state: NewsState):
"""生成中文摘要节点"""
response = summarize_model.invoke([
{"role": "system", "content": "请将以下英文新闻用中文总结成一段话。"},
{"role": "user", "content": state["original"]}
])
return {"summary": response.content}
def node_translate(state: NewsState):
"""翻译成英文节点"""
response = translate_model.invoke([
{"role": "system", "content": "请将以下新闻翻译成英文,保持新闻专业风格。"},
{"role": "user", "content": state["original"]}
])
return {"translation": response.content}
4.2 构建并行图
builder = StateGraph(NewsState)
builder.add_node("summarize", node_summarize)
builder.add_node("translate", node_translate)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)
graph = builder.compile()
4.3 流式输出并动态选择节点
现在我们可以通过 stream_mode=“messages” 获取两个节点产生的 token,并根据用户的选择(比如通过命令行参数或前端传递的 target)只显示其中一个节点的输出
# 示例新闻原文
news_text = """Meta has announced a new AI model called Llama 4, which is
trained on 10 trillion tokens and can handle 1 million tokens of context.
It is designed to power advanced AI agents that can reason across very long
documents, such as entire books or movie scripts."""
# 用户选择查看哪一个节点的输出)
target_node = "summarize" # 可选 "summarize" 或 "translate"
print(f"正在实时生成{ '摘要' if target_node == 'summarize' else '翻译'},请稍候...\n")
for token_chunk, metadata in graph.stream(
{
"original": news_text
},
stream_mode="messages"
):
node_name = metadata.get("langgraph_node", "")
if node_name == target_node and token_chunk.content:
print(token_chunk.content, end="", flush=True)
print("\n\n生成完成!")
运行效果(target_node = “summarize”):
正在实时生成摘要,请稍候...
Meta公司宣布推出一款名为Llama 4的新型AI模型,该模型基于10万亿个token进行训练,可处理高达100万个token的上下文内容。它旨在支持能够推理长篇文档(如整本书或电影剧本)的高级AI代理。
生成完成!
如果改为 target_node = "translate",则输出:
Meta has released a new AI model called Llama 4, trained on 10 trillion tokens and capable of handling 1 million tokens of context. It is designed to power advanced AI agents that can reason across long documents, such as entire books or movie scripts.
4.4 高级扩展:将两个节点的输出合并为一个统一的 UI 流
有时我们希望同时显示两个节点的输出,但用不同颜色或前缀区分
我们可以修改循环,打印前缀即可:
prefix_map = {"summarize": "[摘要] ", "translate": "[翻译] "}
for token_chunk, metadata in graph.stream(...):
node_name = metadata.get("langgraph_node", "")
if token_chunk.content and node_name in prefix_map:
print(prefix_map[node_name] + token_chunk.content, end="", flush=True)
这样终端会交错输出两个模型生成的文字(但通常由于两个模型处理速度不同,两个流的 token 会交替出现,可能造成混乱)
总结:
通过这个综合示例,我们可以看到 messages 模式在复杂多说话人/多任务场景下的强大潜力,并结合用户交互实现高度可控的流式体验
5、messages 模式的注意事项
5.1 只有 LLM 节点会产生流
如果一个节点不调用 LLM(例如简单的数据转换节点),它不会产生 messages 流数据
5.2 需要模型支持流式输出
ChatOpenAI 默认支持流式输出,但某些本地模型或封装可能需要显式设置 streaming=True
在 LangChain 中,通常只需在调用 invoke 时通过配置启用,但 stream_mode=“messages” 会自动处理
5.3 多节点并行时的顺序
当多个 LLM 节点并行执行时,它们输出的 token 会交错出现,但每个 token 的 metadata 都正确标识了来源
你可以放心地根据标签或节点名重新组织显示
5.4 性能开销
逐 token 传输会增加网络消息数量,但对于现代应用(WebSocket)来说完全可以接受LangGraph 内部使用了高效的事件循环
6、总结与全系列回顾
本文详细讲解了 LangGraph 的 stream_mode=“messages” 模式:
-
与 custom 模式不同,messages 模式专为 LLM 生成内容设计,提供逐 token 输出
-
通过 metadata 中的 tags 和 langgraph_node,你可以精确过滤来自不同模型或节点的输出
-
结合 updates 模式,可以实现更复杂的交互逻辑
至此,LangGraph 流式处理系列的三篇文章已全部完成:
掌握了这些能力,我们可以构建出响应迅速、交互丰富、可观测性强的 AI Agent 应用
希望你在自己的项目中大胆尝试,并享受流式编程带来的乐趣!!!!
ok啊,流系列到此为止啊,我们下系列再会~~~(写博客好累)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)