上一章——> 【LangGraph 】custom自定义流式数据实战

在这里插入图片描述

前言

在聊天类 AI 应用中,用户期望看到 AI 像真人一样一个字一个字地输出回答(打字机效果),而不是等待整个生成完成后才一次性显示

LangGraph 的 stream_mode=“messages” 模式支持逐 token 输出 LLM 的响应
更进一步,如果你的图包含多个 LLM 节点(比如一个负责写笑话,一个负责写诗歌),你还可以通过 metadata 中的 tags 或langgraph_node 字段,精确过滤出你关心的那个模型的输出

本文将带您掌握这些高级技巧:


1、messages 模式的基本原理

当你在 stream() 或 astream() 中设置 stream_mode=“messages” 时
LangGraph 会拦截图内所有 LLM 调用,并逐 token 输出 (message_chunk, metadata) 元组

  • message_chunk:一个 AIMessageChunk 对象,通过 .content 可以获取当前 token 的文本

  • metadata:一个字典,包含该 token 的来源信息,例如:

    • “langgraph_node”: 生成该 token 的节点名称

    • “tags”: 你在 LLM 调用时设置的标签列表(通过 model_kwargs={“tags”: […]})

    • “model_name”: 使用的模型名称

    • 其他调试信息

利用这些元数据,我们可以实现非常细粒度的控制和过滤


2、按 Tags 过滤:同一个图中的多个 LLM 任务

假设我们有一个节点,它同时调用两个不同的 LLM:一个生成笑话,另一个生成诗歌
我们希望只将笑话的生成过程流式输出给用户,而诗歌的结果在后台默默完成

2.1 为每个模型分配不同的标签

在初始化模型时,通过 model_kwargs 传递 tags:

from langchain_openai import ChatOpenAI

joke_model = ChatOpenAI(
    model="your-model",
    api_key="your-key",
    temperature=0,
    model_kwargs={"tags": ["joke"]}
)

poem_model = ChatOpenAI(
    model="your-model",
    model_kwargs={"tags": ["poem"]}
)

2.2 在一个节点中顺序调用两个模型

def generate_creative_content(state: CreativeState):
    topic = state["topic"]
    joke_response = joke_model.invoke([{"role": "user", "content": f"讲一个关于 {topic} 的笑话"}])
    poem_response = poem_model.invoke([{"role": "user", "content": f"写一首关于 {topic} 的短诗"}])
    return {"joke": joke_response.content, "poem": poem_response.content}

2.3 流式输出时只显示笑话的 token

for token_chunk, metadata in graph.stream(
    {"topic": "猫"},
    stream_mode="messages"
):
    tags = metadata.get("tags", [])
    if "joke" in tags:
        print(token_chunk.content, end="", flush=True)

运行后,你只会看到笑话模型逐字输出,而诗歌模型的输出被静默忽略
这对于需要向用户展示“主要回答”、同时后台做额外处理的应用非常有用

我们也可以只输出诗歌模型,改个参数就可以:

if "poem" in tags:
        print(token_chunk.content, end="", flush=True)

3、按节点名称过滤:区分并行节点中的不同任务

如果你的图中有多个 LLM 节点(例如一个负责摘要,一个负责翻译),并且它们可能并行执行,你可以根据 metadata 中的 langgraph_node 字段来区分来自哪个节点的 token

3.1 构建并行图

builder = StateGraph(State)
builder.add_node("summarize", generate_summary)
builder.add_node("translate", generate_translation)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)
graph = builder.compile()

3.2 流式输出时仅显示摘要节点的输出

target_node = "summarize"
for token_chunk, metadata in graph.stream(
    {"query": "人工智能是计算机科学的一个分支,致力于创造能够执行通常需要人类智能的任务的机器。"},
    stream_mode="messages"
):
    node_name = metadata.get("langgraph_node", "")
    if token_chunk.content and node_name == target_node:
        prefix = "[摘要] " if node_name == "summarize" else "[翻译] "
        print(f"{prefix}{token_chunk.content}", end="", flush=True)

这样,终端只会输出摘要的逐 token 内容,而翻译的输出被过滤掉了


3.3 动态切换过滤节点

你可以根据用户选择或某个条件,动态改变 target_node
例如,用户点击“显示翻译”按钮后,前端重新连接并只输出翻译节点

4、综合实战:一个多语言新闻摘要 Agent

让我们构建一个更贴近真实业务场景的例子:
一个新闻处理 Agent,它接收到一篇英文新闻稿后,并行地做两件事:

1.生成中文摘要(使用一个较快的模型,适合摘要)

2.将原文翻译成英文(使用一个更强大的模型,适合翻译)

我们希望前端可以选择「只看摘要的生成过程」或「只看翻译的生成过程」,甚至同时显示但用不同颜色区分。这完美体现了按节点名称过滤的能力


4.1 定义状态和节点

from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI

class NewsState(TypedDict):
    original: str      # 原始英文新闻
    summary: str       # 中文摘要结果
    translation: str   # 英文翻译结果


# 模型1:用于摘要(速度快、成本低)
summarize_model = ChatOpenAI(
    model="gpt-3.5-turbo",
    streaming=True,   # 开启流式支持
    model_kwargs={"tags": ["summary"]}
)

# 模型2:用于翻译(质量更高)
translate_model = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    model_kwargs={"tags": ["translation"]}
)

def node_summarize(state: NewsState):
    """生成中文摘要节点"""
    response = summarize_model.invoke([
        {"role": "system", "content": "请将以下英文新闻用中文总结成一段话。"},
        {"role": "user", "content": state["original"]}
    ])
    return {"summary": response.content}

def node_translate(state: NewsState):
    """翻译成英文节点"""
    response = translate_model.invoke([
        {"role": "system", "content": "请将以下新闻翻译成英文,保持新闻专业风格。"},
        {"role": "user", "content": state["original"]}
    ])
    return {"translation": response.content}

4.2 构建并行图

builder = StateGraph(NewsState)
builder.add_node("summarize", node_summarize)
builder.add_node("translate", node_translate)
builder.add_edge(START, "summarize")
builder.add_edge(START, "translate")
builder.add_edge("summarize", END)
builder.add_edge("translate", END)

graph = builder.compile()

4.3 流式输出并动态选择节点

现在我们可以通过 stream_mode=“messages” 获取两个节点产生的 token,并根据用户的选择(比如通过命令行参数或前端传递的 target)只显示其中一个节点的输出

# 示例新闻原文
news_text = """Meta has announced a new AI model called Llama 4, which is 
trained on 10 trillion tokens and can handle 1 million tokens of context. 
It is designed to power advanced AI agents that can reason across very long 
documents, such as entire books or movie scripts."""

# 用户选择查看哪一个节点的输出)
target_node = "summarize"   # 可选 "summarize" 或 "translate"

print(f"正在实时生成{ '摘要' if target_node == 'summarize' else '翻译'},请稍候...\n")

for token_chunk, metadata in graph.stream(
    {
    "original": news_text
    },
    stream_mode="messages"
):
    node_name = metadata.get("langgraph_node", "")
    if node_name == target_node and token_chunk.content:
        print(token_chunk.content, end="", flush=True)

print("\n\n生成完成!")

运行效果(target_node = “summarize”):

正在实时生成摘要,请稍候...

Meta公司宣布推出一款名为Llama 4的新型AI模型,该模型基于10万亿个token进行训练,可处理高达100万个token的上下文内容。它旨在支持能够推理长篇文档(如整本书或电影剧本)的高级AI代理。

生成完成!
如果改为 target_node = "translate",则输出:

Meta has released a new AI model called Llama 4, trained on 10 trillion tokens and capable of handling 1 million tokens of context. It is designed to power advanced AI agents that can reason across long documents, such as entire books or movie scripts.

4.4 高级扩展:将两个节点的输出合并为一个统一的 UI 流

有时我们希望同时显示两个节点的输出,但用不同颜色或前缀区分
我们可以修改循环,打印前缀即可:

prefix_map = {"summarize": "[摘要] ", "translate": "[翻译] "}
for token_chunk, metadata in graph.stream(...):
    node_name = metadata.get("langgraph_node", "")
    if token_chunk.content and node_name in prefix_map:
        print(prefix_map[node_name] + token_chunk.content, end="", flush=True)

这样终端会交错输出两个模型生成的文字(但通常由于两个模型处理速度不同,两个流的 token 会交替出现,可能造成混乱)

总结:
通过这个综合示例,我们可以看到 messages 模式在复杂多说话人/多任务场景下的强大潜力,并结合用户交互实现高度可控的流式体验


5、messages 模式的注意事项

5.1 只有 LLM 节点会产生流

如果一个节点不调用 LLM(例如简单的数据转换节点),它不会产生 messages 流数据

5.2 需要模型支持流式输出

ChatOpenAI 默认支持流式输出,但某些本地模型或封装可能需要显式设置 streaming=True
在 LangChain 中,通常只需在调用 invoke 时通过配置启用,但 stream_mode=“messages” 会自动处理

5.3 多节点并行时的顺序

当多个 LLM 节点并行执行时,它们输出的 token 会交错出现,但每个 token 的 metadata 都正确标识了来源
你可以放心地根据标签或节点名重新组织显示

5.4 性能开销

逐 token 传输会增加网络消息数量,但对于现代应用(WebSocket)来说完全可以接受LangGraph 内部使用了高效的事件循环


6、总结与全系列回顾

本文详细讲解了 LangGraph 的 stream_mode=“messages” 模式:

  • 与 custom 模式不同,messages 模式专为 LLM 生成内容设计,提供逐 token 输出

  • 通过 metadata 中的 tags 和 langgraph_node,你可以精确过滤来自不同模型或节点的输出

  • 结合 updates 模式,可以实现更复杂的交互逻辑

至此,LangGraph 流式处理系列的三篇文章已全部完成:

掌握了这些能力,我们可以构建出响应迅速、交互丰富、可观测性强的 AI Agent 应用
希望你在自己的项目中大胆尝试,并享受流式编程带来的乐趣!!!!


ok啊,流系列到此为止啊,我们下系列再会~~~(写博客好累)
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐