目录

引子

LangChain v1.0 刚刚发布时,我曾经写过一篇关于其新架构的分析。当时 LangChain 正在经历一次比较大的框架重构:Agent、Tool、Runtime 等概念被重新整理,LangGraph 也逐渐成为 Agent 运行时的核心基础设施。

不过,在那个时间点上,很多设计仍然处在快速演进阶段。不少 API 和概念还在不断调整,一些官方推荐的使用方式也并没有完全稳定下来。因此,当时的分析更多只能算是一次“阶段性观察”。

随着 LangChain 版本逐渐演进到 1.2,整个框架的核心理念开始变得更加清晰:
Agent 的运行模型、工具调用机制、状态管理方式以及可观测体系,都逐渐形成了一套相对完整的工程化思路。

因此,这篇文章并不是简单介绍 LangChain 的使用方法,而是尝试在 v1.2 的语境下重新审视 LangChain 的一些核心概念,看看在这段时间的迭代中:

  • 哪些设计思路被保留下来了
  • 哪些概念发生了演进
  • LangChain 在 Agent 框架方向上到底想解决什么问题

如果你之前接触过 LangChain,或者正在设计自己的 Agent 系统,希望这次基于 LangChain v1.2 的再探索,能够帮助你更清晰地理解这个框架的整体架构。

Langchain更新日志

2026-03-10 langgraph v1.1

类型安全的流式输出(version=“v2”)

  • stream() / astream() 中传入 version="v2",即可获得统一的 StreamPart 输出结构。

  • 每个数据块都包含以下字段:

    • type
    • ns
    • data
  • 每种流式模式都有对应的 TypedDict 类型定义,均可从 langgraph.types 导入。

类型安全的 invoke 调用(version=“v2”)

  • invoke() / ainvoke() 中传入 version="v2",返回一个 GraphOutput 对象

    • .value
    • .interrupts

Pydantic 与 dataclass 自动转换

  • version="v2" 模式下:

    • invoke() 返回结果
    • values 模式的 stream 输出

会自动转换为你声明的 Pydantic 模型或 dataclass 类型

修复 time travel 与 interrupt / subgraph 的问题

  • 回放(replay)不再复用旧的 RESUME
  • 子图(subgraph)现在可以正确恢复父图的历史 checkpoint

完全向后兼容

  • version="v2"可选功能(opt-in)
  • GraphOutput 仍支持 旧版 dict 风格访问,方便渐进迁移

2026-02-10 deepagents v0.4

新增可插拔沙箱集成包

新增三个集成:

  • langchain-modal
  • langchain-daytona
  • langchain-runloop

可用于可插拔的沙箱环境。

参考:

  • sandboxes guide
  • data analysis 示例教程

对会话历史总结机制的修改

  • Summarization 现在在 model node 中通过 wrap_model_call 事件触发
  • 因此现在会 在 graph state 中保留完整的 message history

更准确的 Token 计算

自动触发上下文总结

如果模型抛出:

ContextOverflowError

将自动触发 summarization。

目前支持:

  • langchain-anthropic
  • langchain-openai

OpenAI Responses API 成为默认

当模型字符串以:

openai:

开头时,默认使用 Responses API

2025-12-15 langchain v1.2.0

create_agent 改进

通过工具上的新属性 extras,简化对 provider-specific tool 参数与定义 的支持。

示例能力:

  • Provider 特定配置

    • Anthropic 的 programmatic tool calling
    • tool search
  • 客户端执行的内置工具

    • Anthropic
    • OpenAI
    • 其他 provider 均支持
  • 支持 严格 schema 的 agent 输出格式

2025-12-08 langchain-google-genai v4.0.0

Google GenAI 集成已被完全重写,现在使用 Google 的统一 Generative AI SDK

新 SDK 同时支持:

  • Gemini API
  • Vertex AI Platform

通过同一接口访问。

变化包括:

  • 少量 breaking change
  • langchain-google-vertexai 中部分包被弃用

2025-11-25 langchain v1.1.0

Model Profiles

Chat Model 现在通过 .profile 属性暴露模型能力,例如:

  • 支持的功能
  • 能力列表

Summarization Middleware

  • 支持更灵活的触发机制
  • 可以结合 model profile 进行上下文感知总结

Structured Output

  • ProviderStrategy(原生结构化输出)现在可以通过 model profile 自动推断

create_agent 支持 SystemMessage

现在可以直接向 system_prompt 传入:

SystemMessage

支持:

  • cache control
  • structured content blocks

Model Retry Middleware

新增中间件:

  • 自动重试失败的模型调用
  • 支持可配置的 指数退避(exponential backoff)

内容审核 Middleware

新增 OpenAI 内容审核中间件,用于检测不安全内容:

支持检测:

  • 用户输入
  • 模型输出
  • 工具执行结果

OK,接下来让我们看看Langchain又整了哪些花活出来。

Agent Loop

在这里插入图片描述
一个 LLM Agent 会在一个循环中调用工具来完成目标任务。Agent 会持续运行,直到满足某个 停止条件,例如:

  • 模型生成了最终输出(final output)
  • 达到预设的迭代次数上限

其基本流程可以理解为:

在这里插入图片描述

即:

  1. 接收输入(input)
  2. 模型进行推理(model)
  3. 决定要执行的操作(action)
  4. 调用工具(tools)
  5. 获得工具返回结果(observation)
  6. 再次进行推理(model)
  7. 最终输出结果(finish)

create_agent 基于 LangGraph 构建了一个 图结构的 Agent 运行时(runtime)

在这个图中:

  • 节点(nodes) 表示执行步骤
  • 边(edges) 表示步骤之间的连接关系

这些节点和连接共同定义了 Agent 如何处理信息以及执行任务流程。Agent 会在这个图结构中不断移动,并执行不同的节点,例如:

  • model node
    调用语言模型进行推理

  • tools node
    执行工具调用

  • middleware
    在执行过程中进行拦截、增强或处理逻辑

LangGraph Runtime

Pregel 实现了 LangGraph 的运行时(runtime),负责管理 LangGraph 应用的执行。

当你:

  • 编译一个 StateGraph
  • 或创建一个 @entrypoint

时,最终都会生成一个 Pregel 实例,这个实例可以通过输入参数进行调用(invoke)。

注意:
Pregel runtime 的名称来源于 Google 的 Pregel 算法,该算法描述了一种利用图结构进行大规模并行计算的高效方法

LangGraph 中,Pregel 将 Actor(执行单元)Channel(通信通道) 组合成一个完整的应用。

基本机制是:

  • Actor 从 Channel 读取数据
  • Actor 向 Channel 写入数据

Pregel 会按照 Pregel Algorithm / Bulk Synchronous Parallel(BSP)模型,将应用执行组织成多个步骤(steps)。

每个步骤包含 三个阶段

1.Plan(规划阶段):确定本步骤需要执行哪些 Actor。

例如:

  • 第一步:选择订阅了 输入 channel 的 Actor
  • 后续步骤:选择订阅了 上一轮更新过的 channel 的 Actor

2.Execution(执行阶段):并行执行所有选定的 Actor,直到:

  • 所有 Actor 执行完成
  • 某个 Actor 执行失败
  • 达到超时时间

在此阶段:

  • Actor 写入 channel 的数据 不会立即可见
  • 这些更新 只会在下一步中生效

3.Update(更新阶段):将 Actor 在本步骤中写入的值 更新到对应的 Channel 中

整个流程会不断重复,直到没有新的 Actor 需要执行或达到最大执行步数

Actors

Actor 在 LangGraph 中对应一个 PregelNode。它会订阅若干 Channel,从这些 Channel 中读取数据,并将结果写回到 Channel 中。

可以将其理解为 Pregel 算法中的计算节点(Actor)。在实现上,PregelNode 实现了 LangChain 的 Runnable 接口

Channels

Channel 用于在 Actor(PregelNode)之间传递数据。每个 Channel 都包含三个核心要素:

  • value type(值类型)
  • update type(更新类型)
  • update function(更新函数)

更新函数会接收一组更新,并据此修改 Channel 当前存储的值。

Channel 可以用于:

  • 在不同 chain 之间传递数据
  • 将数据在未来的执行步骤中再次发送给当前 chain

LangGraph 提供了多种内置 Channel 类型:

  • LastValue:默认的 Channel 类型,用于保存最近一次发送到该 Channel 的值。适合用于输入输出数据,或在不同执行步骤之间传递数据。

  • Topic:一种可配置的 发布-订阅(Pub/Sub)Topic。适合在 Actor 之间传递多个值,或用于累积输出结果。可以配置为对数据进行去重,或在多个步骤中持续累积数据。

  • BinaryOperatorAggregate:用于存储一个持续累积的值。每次更新时,会通过一个二元操作符,将当前值与新发送到 Channel 的更新值进行合并。适合用于跨多个步骤进行聚合计算,例如:

    total = BinaryOperatorAggregate(int, operator.add)
    

示例1:单节点

大多数用户通常通过:

  • StateGraph API
  • @entrypoint 装饰器

来间接使用 Pregel,但实际上也可以 直接使用 Pregel API 构建应用

下面是一个简单示例。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})

输出结果:

{'b': 'foofoo'}

这个示例的逻辑是:

  1. node1 订阅 channel a
  2. 接收到数据后执行函数 lambda x: x + x
  3. 将结果写入 channel b
  4. 输入 "foo",输出 "foofoo"

示例2:多节点

代码中定义了两个节点:

  • node1:订阅 Channel a,将接收到的字符串进行一次拼接(x + x),并将结果写入 Channel b
  • node2:订阅 Channel b,对其结果再次进行拼接(x + x),并将结果写入 Channel c
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

接下来创建一个 Pregel 应用实例。在这个实例中:

  • nodes 定义了参与执行的节点
  • channels 定义了各个 Channel 及其类型
  • input_channels 指定输入数据来源
  • output_channels 指定最终输出的 Channel
app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

其中:

  • EphemeralValue 表示临时 Channel,仅在当前执行过程中使用
  • LastValue 表示保存最近一次写入值的 Channel

最后调用 invoke 执行该流程:

app.invoke({"a": "foo"})

执行结果为:

{'b': 'foofoo', 'c': 'foofoofoofoo'}

执行过程可以理解为:

  1. 输入 "foo" 写入 Channel a
  2. node1 读取 a"foo" → 处理后得到 "foofoo" → 写入 b
  3. node2 读取 b"foofoo" → 处理后得到 "foofoofoofoo" → 写入 c

因此最终输出:

  • b = "foofoo"
  • c = "foofoofoofoo"

示例3:Topic

下面的代码示例展示了 Topic 类型 Channel 的使用方式,用于在多个节点之间传递并累积多个结果。

代码中定义了两个节点:

  • node1:订阅 Channel a,将输入字符串进行一次拼接(x + x),并将结果同时写入 Channel bc
  • node2:订阅 Channel b,读取 b 中的数据并再次拼接(x["b"] + x["b"]),然后将结果写入 Channel c
from langgraph.channels import EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)

随后创建一个 Pregel 应用实例

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

其中:

  • a:使用 EphemeralValue,表示临时 Channel
  • b:使用 EphemeralValue,用于在节点之间传递中间结果
  • c:使用 Topic 类型 Channel,并设置 accumulate=True,表示该 Channel 会 累积多个结果值

最后执行:

app.invoke({"a": "foo"})

执行结果:

{'c': ['foofoo', 'foofoofoofoo']}

执行过程可以理解为:

  1. 输入 "foo" 写入 Channel a
  2. node1 读取 a"foo" → 处理后得到 "foofoo" → 写入 bc
  3. node2 读取 b"foofoo" → 处理后得到 "foofoofoofoo" → 写入 c
  4. 由于 cTopic(accumulate=True),因此会累积所有写入结果

最终 c 中保存:

['foofoo', 'foofoofoofoo']

示例4:BinaryOperatorAggregate

该示例演示了如何使用 BinaryOperatorAggregate Channel 来实现一个 reducer(聚合函数)

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder

首先定义两个节点:

  • node1:订阅 Channel a,将输入字符串进行拼接(x + x),并将结果写入 Channel bc
  • node2:订阅 Channel b,对其结果再次拼接(x + x),并将结果写入 Channel c
node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)

接着定义一个 reducer 函数,用于指定 Channel c 在接收到多个更新值时如何进行合并:

def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update

该函数的逻辑是:

  • 如果 Channel 中已经存在值,则使用 " | " 将新值追加到后面
  • 如果当前还没有值,则直接返回新值

然后创建 Pregel 应用实例

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

其中:

  • a:使用 EphemeralValue,用于输入数据
  • b:使用 EphemeralValue,用于中间数据传递
  • c:使用 BinaryOperatorAggregate,并指定 reducer 作为聚合函数

最后执行:

app.invoke({"a": "foo"})

执行流程为:

  1. 输入 "foo" 写入 Channel a
  2. node1 读取 a"foo" → 处理后得到 "foofoo" → 写入 bc
  3. node2 读取 b"foofoo" → 处理后得到 "foofoofoofoo" → 写入 c
  4. Channel c 使用 reducer 对多个更新值进行聚合

最终 c 的结果为:

foofoo | foofoofoofoo

示例5:Cycle

该示例演示了如何在 图(graph)中引入循环(cycle):让一个节点将结果写回到它自己订阅的 Channel 中。这样,执行过程会不断循环,直到某次写入的值为 None 时才停止。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

首先定义一个节点:

  • 订阅 Channel value
  • 如果字符串长度小于 10,就执行 x + x(字符串翻倍)
  • 如果长度已经达到或超过 10,则返回 None
example_node = (
    NodeBuilder().subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry("value", skip_none=True))
)

这里的关键点是:

  • write_to("value") 会把结果重新写回 同一个 Channel
  • skip_none=True 表示如果结果为 None,则不会写入 Channel
    这样就不会再触发下一轮执行,从而结束循环

接着创建一个 Pregel 应用实例

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

其中:

  • value 使用 EphemeralValue,作为输入和输出的 Channel

最后执行:

app.invoke({"value": "a"})

执行过程大致如下:

  1. "a""aa"
  2. "aa""aaaa"
  3. "aaaa""aaaaaaaa"
  4. "aaaaaaaa""aaaaaaaaaaaaaaaa"
  5. 此时长度 ≥ 10,返回 None,循环停止

最终输出:

{'value': 'aaaaaaaaaaaaaaaa'}

StateGraph(Graph API)

StateGraph(Graph API) 是一种更高层的抽象,用于简化 Pregel 应用的构建过程。
通过该 API,可以定义一个由 节点(nodes)边(edges) 组成的图结构。

当图被 编译(compile) 时,StateGraph API 会自动为你生成对应的 Pregel 应用实例

from typing import TypedDict

from langgraph.constants import START
from langgraph.graph import StateGraph

首先定义一个状态结构 Essay

class Essay(TypedDict):
    topic: str
    content: str | None
    score: float | None

然后定义两个处理节点:

def write_essay(essay: Essay):
    return {
        "content": f"Essay about {essay['topic']}",
    }

def score_essay(essay: Essay):
    return {
        "score": 10
    }

接下来创建一个 StateGraph

builder = StateGraph(Essay)
builder.add_node(write_essay)
builder.add_node(score_essay)
builder.add_edge(START, "write_essay")
builder.add_edge("write_essay", "score_essay")

最后对图进行编译:

graph = builder.compile()

编译完成后,这个 Pregel 实例会包含一组 节点(nodes)Channel

可以通过打印它们进行查看。

print(graph.nodes)

可能会看到类似输出:

{
 '__start__': <langgraph.pregel.read.PregelNode ...>,
 'write_essay': <langgraph.pregel.read.PregelNode ...>,
 'score_essay': <langgraph.pregel.read.PregelNode ...>
}

说明 Graph API 会自动创建:

  • __start__ 节点
  • write_essay 节点
  • score_essay 节点

同样可以查看 Channel:

print(graph.channels)

可能会看到类似输出:

{
 'topic': <LastValue ...>,
 'content': <LastValue ...>,
 'score': <LastValue ...>,
 '__start__': <EphemeralValue ...>,
 'write_essay': <EphemeralValue ...>,
 'score_essay': <EphemeralValue ...>,
 ...
}

这些 Channel 包括:

  • 状态字段对应的 Channel

    • topic
    • content
    • score
  • 节点之间通信使用的 Channel

    • __start__
    • write_essay
    • score_essay
    • 以及内部自动生成的 branch Channel

这些 Channel 由 StateGraph 在编译时自动创建,用于实现节点之间的数据传递。

create_agent

create_agent 提供了一个 可用于生产环境的 Agent 实现

上面我们知道langgraph实现了langchain-core的Runable协议,并且提供了一个图运行时,该运行时可以将输入,输出,节点,状态转换串联起来,而langchain基于langgraph构建,其底层必然是一个图结构,这里就从源码开始入手解析create_agent是如何构建这个图结构的:

def create_agent(
    model: str | BaseChatModel,
    tools: Sequence[BaseTool | Callable[..., Any] | dict[str, Any]] | None = None,
    *,
    system_prompt: str | SystemMessage | None = None,
    middleware: Sequence[AgentMiddleware[StateT_co, ContextT]] = (),
    response_format: ResponseFormat[ResponseT] | type[ResponseT] | dict[str, Any] | None = None,
    state_schema: type[AgentState[ResponseT]] | None = None,
    context_schema: type[ContextT] | None = None,
    checkpointer: Checkpointer | None = None,
    store: BaseStore | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    debug: bool = False,
    name: str | None = None,
    cache: BaseCache[Any] | None = None,
) -> CompiledStateGraph[
    AgentState[ResponseT], ContextT, _InputAgentState, _OutputAgentState[ResponseT]
]:

我们快速构建一个agent,然后输出其channels和nodes:

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from pydantic import SecretStr
from pprint import pprint as print

@tool
def add(a:int, b:int) -> int:
    """
    Add two integers
    """
    return a + b

agent = create_agent(
    system_prompt="你是一个有用的助手",
    tools=[add],
    model=ChatOpenAI(
        model="deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
        base_url="https://api.siliconflow.cn/v1",
        api_key=SecretStr("****")
    )
)

if __name__ == '__main__':
    print("-"*20+"nodes"+"-"*20)
    print(agent.nodes)
    print("-" * 20 + "channels" + "-" * 20)
    print(agent.channels)
'--------------------nodes--------------------'
{'__start__': <langgraph.pregel._read.PregelNode object at 0x000001D65A3BDBE0>,
 'model': <langgraph.pregel._read.PregelNode object at 0x000001D65A24FC50>,
 'tools': <langgraph.pregel._read.PregelNode object at 0x000001D65A24FED0>}
'--------------------channels--------------------'
{'__pregel_tasks': <langgraph.channels.topic.Topic object at 0x000001D65A1A7400>,
 '__start__': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001D65A0FD540>,
 'branch:to:model': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001D65A44BCC0>,
 'branch:to:tools': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001D65A475280>,
 'jump_to': <langgraph.channels.ephemeral_value.EphemeralValue object at 0x000001D65A44B040>,
 'messages': <langgraph.channels.binop.BinaryOperatorAggregate object at 0x000001D65A44AF00>,
 'structured_response': <langgraph.channels.last_value.LastValue object at 0x000001D65A44B2C0>}

这个输出其实非常有价值,因为它把 create_agent()LangGraph Runtime(Pregel)层面真实构建出来的 Graph暴露出来了。我们逐层拆一下。

nodes:Agent 实际是 3 个 PregelNode

{'__start__': PregelNode,
 'model': PregelNode,
 'tools': PregelNode}

说明 Agent Graph 非常简单

START
  ↓
model
  ↓
tools
  ↓
model
  ↓
...

循环执行直到模型停止调用 tool。

  1. __start__

    这是 Graph API 自动生成的 起始节点

    作用:

    输入 state
       ↓
    触发 model
    

    类似:

    START → model
    
  2. model

    这是 LLM 调用节点

    职责:

    messages
       ↓
    LLM
       ↓
    AIMessage
    

    如果模型输出:

    tool_calls
    

    就跳转:

    model → tools
    

    否则结束。

3 tools

这是 **工具执行节点**。

职责:

```
读取 AIMessage.tool_calls
    ↓
执行 tool
    ↓
生成 ToolMessage
```

然后再把 ToolMessage 写回:

```
messages
```

再回到:

```
tools → model
```

所以 Agent Loop 是:

```
model → tools → model → tools ...
```

channels:Agent 的 State

{
 '__pregel_tasks': Topic,
 '__start__': EphemeralValue,
 'branch:to:model': EphemeralValue,
 'branch:to:tools': EphemeralValue,
 'jump_to': EphemeralValue,
 'messages': BinaryOperatorAggregate,
 'structured_response': LastValue
}

这些其实就是 Graph State + Runtime Control Channels

我们分两类看。

  1. messages

    messages: BinaryOperatorAggregate
    

    这是 最核心的 channel

    因为 Agent State 是:

    messages: list[BaseMessage]
    

    但 Pregel 是 step-based execution,所以要用 reducer。

    这里使用:

    BinaryOperatorAggregate
    

    作用是:

    messages = messages + new_messages
    

    类似 reducer:

    state.messages.append(...)
    

    所以执行流程:

    HumanMessage
        ↓
    AIMessage
        ↓
    ToolMessage
        ↓
    AIMessage
    

    全部累计在 messages 里。

  2. structured_response

    structured_response: LastValue
    

    只有当你使用:

    response_format=...
    

    才会真正用到。

    作用:

    最终结构化输出
    

    LastValue 表示:

    只保留最后一次写入
    

Runtime 控制 channels

这些是 LangGraph 内部控制执行流的 channel

  1. __pregel_tasks

    Topic
    

    这是 Pregel runtime 用来调度 下一轮执行任务 的。

    类似:

    scheduler queue
    
  2. branch:to:model

    EphemeralValue
    

    用于:

    触发 model node
    

    类似:

    edge → model
    
  3. branch:to:tools

    EphemeralValue
    

    触发:

    model → tools
    

    当 LLM 生成 tool_calls。

  4. jump_to

    EphemeralValue
    

    用于:

    动态跳转节点
    

    例如:

    middleware
    interrupt
    resume
    

    都会用这个。

  5. __start__

    Graph start signal。

    invoke()
       ↓
    写入 __start__
       ↓
    触发 model
    

完整执行流程

根据这些 node + channel,可以推导完整运行过程:

invoke()
   ↓
__start__ channel
   ↓
branch:to:model
   ↓
model node
   ↓
LLM output
   ↓
messages reducer
   ↓
如果有 tool_calls
      ↓
branch:to:tools
      ↓
tools node
      ↓
ToolMessage
      ↓
messages reducer
      ↓
branch:to:model

循环:

model → tools → model

直到:

没有 tool_calls

结束。

为了加深对create_agent的理解,现在使用低级API来模仿实现一个Agent Loop:

"""
基于 LangGraph Pregel + NodeBuilder 完全手写实现的 Agent Loop。

与官方 create_agent 编译后的结构对齐:
- nodes: __start__, model, tools
- channels: messages, branch:to:model, branch:to:tools, jump_to, structured_response 等
"""

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.pregel._write import ChannelWriteEntry
from langgraph.channels import (
    EphemeralValue,
    BinaryOperatorAggregate,
    LastValue,
)
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
    AnyMessage,
)
from pydantic import SecretStr


# ---------------------------------------------------------------------------
# 工具定义
# ---------------------------------------------------------------------------

@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


TOOLS_BY_NAME = {"add": add}


# ---------------------------------------------------------------------------
# 模型
# ---------------------------------------------------------------------------

LLM = ChatOpenAI(
    model="deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    base_url="https://api.siliconflow.cn/v1",
    api_key=SecretStr("sk-qluvqlqupbpcoirxikfukgtmiiovvcpjxxyzhytizbntsxvq"),
)
LLM_BOUND = LLM.bind_tools([add])

SYSTEM_PROMPT = "你是一个有用的助手,在涉及数学计算时请调用 add 工具。"


# ---------------------------------------------------------------------------
# messages 通道的 reducer:累加消息列表
# ---------------------------------------------------------------------------

def messages_reducer(current: list[AnyMessage] | None, update: list[AnyMessage] | None):
    if update is None:
        return current or []
    if current is None:
        return list(update) if update else []
    return current + list(update)


# ---------------------------------------------------------------------------
# __start__ 节点:仅在「尚无 AI 回复」时写入 branch:to:model,触发首轮 model
# ---------------------------------------------------------------------------

def start_node(messages: list[AnyMessage] | None) -> dict:
    if not messages:
        return {"branch:to:model": True}
    if any(isinstance(m, AIMessage) for m in messages):
        return {}
    return {"branch:to:model": True}


def _write_branch_to_model(out: dict):
    v = out.get("branch:to:model")
    return v if v is not None else (object() if "branch:to:model" in out else None)  # skip_none 只跳过 None

start_node_builder = (
    NodeBuilder()
    .subscribe_only("messages")
    .do(start_node)
    .write_to(ChannelWriteEntry("branch:to:model", skip_none=True, mapper=lambda o: o.get("branch:to:model")))
)


# ---------------------------------------------------------------------------
# model 节点:订阅 branch:to:model,读取 messages,调用 LLM,写 messages 和可选的 branch:to:tools
# ---------------------------------------------------------------------------

def call_model(data: dict) -> dict:
    messages: list[AnyMessage] = data.get("messages") or []
    if not messages:
        return {}
    with_system = [SystemMessage(content=SYSTEM_PROMPT)] + list(messages)
    response: AIMessage = LLM_BOUND.invoke(with_system)

    out = {"messages": [response]}
    if response.tool_calls:
        out["branch:to:tools"] = True
    return out


model_node_builder = (
    NodeBuilder()
    .subscribe_to("branch:to:model", read=True)
    .read_from("messages")
    .do(call_model)
    .write_to(
        ChannelWriteEntry("messages", mapper=lambda o: o.get("messages")),
        ChannelWriteEntry("branch:to:tools", skip_none=True, mapper=lambda o: o.get("branch:to:tools")),
    )
)


# ---------------------------------------------------------------------------
# tools 节点:订阅 branch:to:tools,读取 messages,执行 tool_calls,写 messages 和 branch:to:model
# ---------------------------------------------------------------------------

def call_tools(data: dict) -> dict:
    messages: list[AnyMessage] = data.get("messages") or []
    if not messages:
        return {}
    last = messages[-1]
    if not isinstance(last, AIMessage) or not last.tool_calls:
        return {}

    tool_messages = []
    for tc in last.tool_calls:
        name = tc["name"]
        args = tc.get("args") or {}
        tool_fn = TOOLS_BY_NAME.get(name)
        if not tool_fn:
            tool_messages.append(
                ToolMessage(content=f"Unknown tool: {name}", tool_call_id=tc["id"])
            )
            continue
        result = tool_fn.invoke(args)
        tool_messages.append(
            ToolMessage(content=str(result), tool_call_id=tc["id"])
        )

    return {
        "messages": tool_messages,
        "branch:to:model": True,
    }


tools_node_builder = (
    NodeBuilder()
    .subscribe_to("branch:to:tools", read=True)
    .read_from("messages")
    .do(call_tools)
    .write_to(
        ChannelWriteEntry("messages", mapper=lambda o: o.get("messages")),
        ChannelWriteEntry("branch:to:model", mapper=lambda o: o.get("branch:to:model")),
    )
)


# ---------------------------------------------------------------------------
# 构建 Pregel 图(对齐官方 channels / nodes 命名)
# ---------------------------------------------------------------------------

app = Pregel(
    nodes={
        "__start__": start_node_builder.build(),
        "model": model_node_builder.build(),
        "tools": tools_node_builder.build(),
    },
    channels={
        "messages": BinaryOperatorAggregate(list, operator=messages_reducer),
        "branch:to:model": EphemeralValue(bool),
        "branch:to:tools": EphemeralValue(bool),
        "jump_to": EphemeralValue(str),
        "structured_response": LastValue(str),
    },
    input_channels=["messages"],
    output_channels=["messages"],
)


# ---------------------------------------------------------------------------
# 运行
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    from pprint import pprint

    result = app.invoke({
        "messages": [
            HumanMessage(content="请用 add 工具算 1+2 等于多少"),
        ]
    })
    pprint(result)

在这里插入图片描述

中间件

Middleware(中间件)提供了一种方式,可以 更精细地控制 Agent 内部发生的事情。中间件在以下场景中非常有用:

  • 通过 日志、分析和调试 来跟踪 Agent 的行为
  • 修改或转换 Prompt、工具选择以及输出格式
  • 添加 重试(retry)、回退(fallback)和提前终止(early termination) 的逻辑
  • 实施 速率限制(rate limit)、安全护栏(guardrails)以及 PII(敏感信息)检测

Agent 的核心循环包括以下步骤:

  1. 调用模型(Model)
  2. 让模型决定要执行哪些工具(Tools)
  3. 执行工具
  4. 如果模型不再调用任何工具,则结束

这张图片需要记住:
在这里插入图片描述

Middleware 可以在这些步骤 前后插入 Hook,从而对整个流程进行干预或增强。

Hooks

可以通过实现 Hook(钩子) 来构建自定义 Middleware,这些 Hook 会在 Agent 执行流程的特定阶段 被触发。

Middleware 提供两种类型的 Hook,用于拦截和控制 Agent 的执行流程:

Node 风格 Hook(Node-style hooks)

特定执行阶段按顺序运行

常见用途:

  • 日志记录(logging)
  • 数据校验(validation)
  • 状态更新(state updates)

可用 Hook

  • before_agent
    在 Agent 开始执行之前触发(每次调用只执行一次)

  • before_model
    在每一次调用模型之前触发

  • after_model
    在每一次模型返回结果之后触发

  • after_agent
    在 Agent 执行结束之后触发(每次调用只执行一次)

示例(推荐:类写法)

from langchain.agents.middleware import AgentMiddleware, AgentState, hook_config
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


class MessageLimitMiddleware(AgentMiddleware):

    def __init__(self, max_messages: int = 50):
        super().__init__()
        self.max_messages = max_messages

    @hook_config(can_jump_to=["end"])
    def before_model(
        self,
        state: AgentState,
        runtime: Runtime
    ) -> dict[str, Any] | None:

        if len(state["messages"]) >= self.max_messages:
            return {
                "messages": [AIMessage("Conversation limit reached.")],
                "jump_to": "end"
            }

        return None

    def after_model(
        self,
        state: AgentState,
        runtime: Runtime
    ) -> dict[str, Any] | None:

        print(f"Model returned: {state['messages'][-1].content}")
        return None

这个 Middleware 实现了两个功能:

1️⃣ 对话长度限制

  • before_model 阶段检查 messages
  • 如果消息数量超过设定值
  • 直接返回 "jump_to": "end" 提前结束 Agent

2️⃣ 模型返回日志

  • after_model 阶段打印模型输出
  • 方便调试或记录日志
Wrap 风格 Hook(Wrap-style hooks)

Wrap-style Hook 会 包裹(wrap)模型调用或工具调用

适合用于:

  • 重试机制(retry)
  • 缓存(caching)
  • 输入输出转换(transformation)

与 Node-style Hook 的区别是:

你可以控制 handler 被调用的次数

  • 0 次 → 直接短路(short-circuit)
  • 1 次 → 正常执行
  • 多次 → 实现重试逻辑

可用 Hook:

  • wrap_model_call
    包裹每一次模型调用

  • wrap_tool_call
    包裹每一次工具调用

示例(推荐:类写法)

from langchain.agents.middleware import AgentMiddleware, ModelRequest, ModelResponse
from typing import Callable


class RetryMiddleware(AgentMiddleware):

    def __init__(self, max_retries: int = 3):
        super().__init__()
        self.max_retries = max_retries

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ModelResponse:

        for attempt in range(self.max_retries):
            try:
                return handler(request)

            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise

                print(f"Retry {attempt + 1}/{self.max_retries} after error: {e}")

这个 Middleware 实现了 模型调用自动重试

  • 每次模型调用都会被 wrap_model_call 包裹

  • 如果调用失败:

    • 自动重试
    • 最多重试 max_retries
  • 最后一次仍然失败则抛出异常

两类Hock的区别

AgentMiddleware 接口实际上定义了所有 Hook 方法,因此从 底层实现角度 来看,并不存在严格的 “Hook 类型” 或 “Middleware 类型” 的区分。无论是 Node-style 还是 Wrap-style,本质上都是在 Agent 执行流程中的某个阶段插入自定义逻辑。

不过在设计和文档层面,LangChain 仍然将其划分为两类,这样做主要有两个原因:

  1. 功能解耦
    将 Middleware 按执行模式划分,可以让开发者更容易理解各类逻辑的职责边界。例如日志记录、状态检查等逻辑适合放在 Node-style Hook 中,而重试、缓存、调用拦截等逻辑则更适合使用 Wrap-style Hook。这种划分能够使 Middleware 的职责更加清晰,避免在同一逻辑中混杂过多不同类型的控制行为。

  2. 执行语义不同
    两类 Hook 在执行方式上存在明显差异。Node-style Hook 是在固定节点按顺序执行的,它们更像是 Agent 流程中的“生命周期回调”;而 Wrap-style Hook 则是包裹在模型或工具调用外部,通过接管 handler 来控制实际执行过程。开发者可以决定是否调用 handler、调用几次,甚至完全替换执行逻辑,因此具有更强的控制能力。

简单来说,Node-style Hook 更侧重于 在流程节点上扩展行为,而 Wrap-style Hook 更侧重于 拦截并控制具体调用过程。这种区分并不是底层强制的,而是为了让 Middleware 的使用方式更加清晰和可维护。

还记得之前的动态替换模型的中间件吗,

from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse


basic_model = ChatOpenAI(model="gpt-4.1-mini")
advanced_model = ChatOpenAI(model="gpt-4.1")

@wrap_model_call
def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse:
    """Choose model based on conversation complexity."""
    message_count = len(request.state["messages"])

    if message_count > 10:
        # Use an advanced model for longer conversations
        model = advanced_model
    else:
        model = basic_model

    return handler(request.override(model=model))

这里实现 @wrap_model_call 的目的,其实就是 “狸猫换太子”——在真正调用模型之前,动态替换掉原本要使用的模型实例。

之所以必须使用 Wrap-style Hook,是因为它可以直接拦截模型调用,并拿到当前的 ModelRequest。开发者可以在这个阶段:

  • 修改请求参数
  • 替换模型实例
  • 修改 Prompt
  • 或者直接短路调用

在这个例子中,中间件通过读取 request.state["messages"] 的长度来判断当前对话复杂度,然后使用 request.override(model=model) 替换实际调用的模型。

最终流程实际上变成了:

Agent -> wrap_model_call -> 选择模型 -> handler() -> 实际调用模型

如果使用 Node-style Hook(例如 before_model)其实也能读取状态,但它 无法直接控制模型调用本身,只能通过返回新的 state 来影响后续流程。因此像 模型切换、调用重试、缓存命中、请求改写 这类需要拦截调用的逻辑,更适合使用 Wrap-style Hook。

换句话说:

  • Node-style Hook 更像是 Agent 生命周期中的“观察点”和“修正点”
  • Wrap-style Hook 更像是对关键调用过程的“拦截器”或“代理层”

因此在实际开发中,一个复杂的 Agent 系统往往会同时使用两种 Middleware:

  • Node-style 用于 监控与流程控制,而 Wrap-style 用于 调用层面的增强与接管

状态更新(State updates)

无论是 Node-style Hook 还是 Wrap-style Hook,都可以对 Agent 的状态(state)进行更新。不过两者的实现机制有所不同。

Node-style Hook
before_agentbefore_modelafter_modelafter_agent

  • 直接 返回一个 dict
  • 这个 dict 会通过图(graph)的 reducers 合并到 Agent 的状态中

Wrap-style Hook
wrap_model_callwrap_tool_call

  • 在模型调用时,需要返回 ExtendedModelResponse
  • 同时通过 Command 注入状态更新
  • 在工具调用时,可以 直接返回 Command

这种方式适用于以下场景:

  • 在模型调用过程中统计使用情况(usage metadata)
  • 在某些触发点进行总结(summarization)
  • 根据请求或响应计算新的状态字段
  • 记录自定义信息
Node-style Hook 更新状态

Node-style Hook 只需要返回一个 dict,其中的 key 会映射到 Agent 的 state 字段。

from langchain.agents.middleware import after_model, AgentState
from langgraph.runtime import Runtime
from typing import Any
from typing_extensions import NotRequired


class TrackingState(AgentState):
    model_call_count: NotRequired[int]


@after_model(state_schema=TrackingState)
def increment_after_model(
    state: TrackingState,
    runtime: Runtime
) -> dict[str, Any] | None:

    return {
        "model_call_count": state.get("model_call_count", 0) + 1
    }

这个示例实现的逻辑是:

  • 每次模型调用完成后
  • model_call_count 自动加 1
  • 最终统计 Agent 总共调用了多少次模型
Wrap-style Hook 更新状态

wrap_model_call 中,需要返回 ExtendedModelResponse,并使用 Command 注入状态更新。

from typing import Callable
from langchain.agents.middleware import (
    wrap_model_call,
    ModelRequest,
    ModelResponse,
    AgentState,
    ExtendedModelResponse
)
from langgraph.types import Command
from typing_extensions import NotRequired


class UsageTrackingState(AgentState):
    """带 token 使用统计的 Agent state"""

    last_model_call_tokens: NotRequired[int]


@wrap_model_call(state_schema=UsageTrackingState)
def track_usage(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ExtendedModelResponse:

    response = handler(request)

    return ExtendedModelResponse(
        model_response=response,
        command=Command(
            update={"last_model_call_tokens": 150}
        ),
    )

在这个例子中:

  • 先调用 handler(request) 执行真实模型调用
  • 然后通过 Command 更新 Agent 状态
  • 最终返回 ExtendedModelResponse

Command 会通过 Graph 的 reducer 机制 合并到状态中,因此:

  • state 更新是 正确合并的
  • messages 这类字段是 追加(additive) 而不是覆盖
多 Middleware 组合(Composition)

当多个 Middleware 同时返回 ExtendedModelResponse 时,它们的 Command 会自动组合。

合并规则:

1️⃣ 通过 reducer 合并

每个 Command 都会被当作一次独立的 state 更新。

对于 messages 这种字段:

  • 更新是 追加的
  • 不会覆盖之前的消息

2️⃣ 冲突时外层优先(Outer wins)

对于没有 reducer 的普通字段:

  • 更新顺序是 inner → outer
  • 如果 key 冲突,最外层 Middleware 的值会覆盖内部值

3️⃣ 支持重试安全(Retry-safe)

如果外层 Middleware 存在 重试逻辑(例如多次调用 handler()):

  • 只有 最终成功的那一次调用Command 会被保留
  • 之前失败调用产生的更新会被丢弃

示例:多 Middleware 组合

from typing import Annotated, Callable

from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ExtendedModelResponse,
    ModelRequest,
    ModelResponse,
)
from langchain.messages import SystemMessage
from langgraph.types import Command
from typing_extensions import NotRequired

定义 reducer:

def _last_wins(_a: str, b: str) -> str:
    """Reducer:最后写入者生效(outer 覆盖 inner)"""
    return b

定义 Agent state:

class CustomMiddlewareState(AgentState):
    """
    Agent state:
    trace_layer 使用 last-wins reducer
    messages 使用追加 reducer
    """

    trace_layer: NotRequired[Annotated[str, _last_wins]]
class OuterMiddleware(AgentMiddleware):

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:

        response = handler(request)

        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "outer",
                "messages": [SystemMessage(content="[Outer ran]")],
            }),
        )

class InnerMiddleware(AgentMiddleware):
    """
    同样写入 trace_layer 和 messages

    规则:
    trace_layer → outer 覆盖 inner
    messages → 追加
    """

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ):

        response = handler(request)

        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "inner",
                "messages": [SystemMessage(content="[Inner ran]")],
            }),
        )

假设两个 Middleware 同时执行:

  • trace_layer

    inner → outer
    

    最终值为:

    outer
    
  • messages

    [Inner ran]
    [Outer ran]
    

消息会 按顺序追加,而不会覆盖

LangChain 的 Middleware 状态更新本质依赖于 LangGraph 的 reducer 机制

  • Node-style Hook:直接返回 dict
  • Wrap-style Hook:通过 Command 注入 state 更新
  • 多 Middleware:通过 reducer 自动组合状态更新

这种设计使得 复杂 Middleware 组合、重试机制、并发更新 都能够稳定工作。

中间件状态机

Middleware 可以通过 扩展 Agent 的 state 来添加自定义字段。这使得 Middleware 能够在 Agent 执行过程中维护和共享额外的信息。

from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, AgentMiddleware
from typing_extensions import NotRequired
from typing import Any

定义自定义 state:

class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]

在这个 state 中新增了两个字段:

  • model_call_count:记录模型调用次数
  • user_id:记录当前用户 ID

定义 Middleware:

class CallCounterMiddleware(AgentMiddleware[CustomState]):

    state_schema = CustomState

    def before_model(self, state: CustomState, runtime) -> dict[str, Any] | None:

        count = state.get("model_call_count", 0)

        if count > 10:
            return {"jump_to": "end"}

        return None

    def after_model(self, state: CustomState, runtime) -> dict[str, Any] | None:

        return {
            "model_call_count": state.get("model_call_count", 0) + 1
        }

这个 Middleware 实现的逻辑:

  • before_model 中检查模型调用次数
  • 如果超过 10 次,则通过 jump_to="end" 提前终止 Agent
  • after_model 中累加 model_call_count

创建 Agent:

agent = create_agent(
    model="gpt-4.1",
    middleware=[CallCounterMiddleware()],
    tools=[],
)

调用 Agent,并传入自定义 state:

result = agent.invoke({
    "messages": [HumanMessage("Hello")],
    "model_call_count": 0,
    "user_id": "user-123",
})

这里的调用参数中:

  • messages 是默认 state 字段
  • model_call_countuser_id自定义 state 字段

Middleware 可以在整个 Agent 执行过程中 持续读取和更新这些值

有人可能会好奇,这里的状态机和create_agent时传入的state_schema有什么区别?

我们都知道,LangChain 会做一件事情:

把所有 middleware 声明的 state_schema 自动合并成一个最终 schema。

类似:

FinalAgentState =
    AgentState
  + MiddlewareAState
  + MiddlewareBState
  + ...

LangChain 这样设计其实是为了实现:

Middleware 可插拔(Plug-in Architecture)

如果 middleware 需要字段:

model_call_count

它不需要修改 Agent:

create_agent(...)

只需要:

middleware=[CallCounterMiddleware()]

state 就会自动扩展,这就是 插件式 state 扩展

执行顺序(Execution order)

当同时使用多个 Middleware 时,需要理解它们的执行顺序。例如:

agent = create_agent(
    model="gpt-4.1",
    middleware=[middleware1, middleware2, middleware3],
    tools=[...],
)

1️⃣ before_* Hook 按顺序执行

middleware1.before_agent()
middleware2.before_agent()
middleware3.before_agent()

随后 Agent Loop 开始:

middleware1.before_model()
middleware2.before_model()
middleware3.before_model()

2️⃣ wrap_* Hook 像函数调用一样嵌套

middleware1.wrap_model_call()
    → middleware2.wrap_model_call()
        → middleware3.wrap_model_call()
            → model

也就是说:

第一个 middleware 会包裹后面所有 middleware。

3️⃣ after_* Hook 按相反顺序执行

middleware3.after_model()
middleware2.after_model()
middleware1.after_model()

4️⃣ Agent Loop 结束

middleware3.after_agent()
middleware2.after_agent()
middleware1.after_agent()
Hook 类型 执行顺序
before_* hooks 从前到后执行
after_* hooks 从后到前执行(逆序)
wrap_* hooks 嵌套执行(类似函数包装)

这种执行顺序其实和很多框架中的 Middleware / Interceptor / Filter 机制类似。

Agent 跳转(Agent jumps)

如果希望在 Middleware 中 提前结束或改变执行流程,可以返回一个包含 jump_to 的字典。

  • "end"
    跳转到 Agent 执行结束(或第一个 after_agent Hook)

  • "tools"
    跳转到 tools 节点

  • "model"
    跳转到 model 节点(或第一个 before_model Hook)

from langchain.agents.middleware import AgentMiddleware, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


class BlockedContentMiddleware(AgentMiddleware):

    @hook_config(can_jump_to=["end"])
    def after_model(
        self,
        state: AgentState,
        runtime: Runtime
    ) -> dict[str, Any] | None:

        last_message = state["messages"][-1]

        if "BLOCKED" in last_message.content:
            return {
                "messages": [AIMessage("I cannot respond to that request.")],
                "jump_to": "end"
            }

        return None

这个 Middleware 的逻辑是:

  1. after_model 阶段检查模型返回内容
  2. 如果检测到 "BLOCKED" 关键词
  3. 则返回新的回复
  4. 并通过 jump_to="end" 直接结束 Agent 执行

这里@hook_config(can_jump_to=["end"]) 的作用其实是:

声明这个 Hook 允许跳转到哪些节点。

换句话说,它是在 编译 Agent Graph 时给 Hook 配置“合法跳转目标”

在 Middleware 里你可以返回:

return {
    "jump_to": "end"
}

但 Agent 的执行实际上是一个 LangGraph Graph

Graph 在编译时必须提前知道:

这个节点可以跳到哪些节点

否则 Graph 是 无法构建边(edge) 的。

所以需要:

@hook_config(can_jump_to=["end"])

来告诉系统:

这个 Hook 可能会跳到 end 节点

LangChain 在内部编译 Graph 时会创建对应的边。

如果不写会发生什么?如果你这样写:

class BlockedMiddleware(AgentMiddleware):

    def after_model(self, state, runtime):
        return {"jump_to": "end"}

但没有:

@hook_config(can_jump_to=["end"])

那么在运行时通常会报类似错误:

Invalid jump target 'end'

或者 Graph 编译阶段报错。

因为 Graph 不知道:

after_model
   ↓
可以跳到哪里

hook_config 本质干了什么

它只是给 Hook 附加一些 元信息(metadata)

类似:

hook_config = {
    "can_jump_to": ["end"]
}

LangChain 在编译 Agent 时会读取这个配置,然后:

after_model node
    ├── normal edge → next step
    └── jump edge → end

可以允许多个跳转

例如:

@hook_config(can_jump_to=["end", "tools"])

那这个 Hook 就可以返回:

return {"jump_to": "tools"}

或者:

return {"jump_to": "end"}

为什么官方要设计成装饰器

因为 Hook 只是普通函数:

def after_model(...)

LangChain 需要一种方式给函数附加 Graph 配置

所以用了 decorator。

其实效果类似:

after_model._hook_config = {
    "can_jump_to": ["end"]
}

如果你在研究 Agent Loop + Pregel,这里其实还有一个很有意思的点:

LangChain 的 jump_to 在 LangGraph 里其实是一个:

Command(goto=...)

也就是说:

return {"jump_to": "end"}

最后会变成:

Command(goto="end")

这也是为什么 Agent Middleware 本质就是 LangGraph 的语法糖

中间件最佳实践(Best practices)

保持 Middleware 职责单一
每个 Middleware 应该只负责一件事情。

优雅处理错误
不要让 Middleware 内部错误导致 Agent 崩溃。

选择合适的 Hook 类型

  • Node-style Hook:适合顺序逻辑,例如

    • 日志记录
    • 数据校验
  • Wrap-style Hook:适合控制执行流程,例如

    • 重试(retry)
    • 回退(fallback)
    • 缓存(cache)

清晰记录自定义 State 字段
如果 Middleware 扩展了 state,需要明确说明这些字段的含义。

在集成前单独测试 Middleware
Middleware 最好先进行单元测试,再加入 Agent。

注意 Middleware 执行顺序
如果某些 Middleware 非常关键,应当放在列表的前面。

尽量使用内置 Middleware
如果官方已经提供了对应功能,优先使用官方实现。

Agent 的运行时上下文管理

Runtime 和 Context Engineering 在 Agent 里是强相关的两个概念,因为:

  • Runtime 负责在执行过程中提供环境信息(state、config、runtime object)
  • Context Engineering 负责把这些信息组织成 模型真正看到的上下文

Runtime

在 LangChain 中,create_agent 实际上是运行在 LangGraph 的 Runtime 之上的。

LangGraph 会向 Agent 暴露一个 Runtime 对象,其中包含以下信息:

  • Context:静态信息,例如用户 ID、数据库连接或其他与一次 Agent 调用相关的依赖。
  • Store:一个 BaseStore 实例,用于实现 长期记忆(long-term memory)
  • Stream writer:用于在 "custom" stream 模式下 实时流式输出信息 的对象。

Runtime 中的 context 提供了一种 依赖注入(Dependency Injection)机制,可用于工具(tools)和中间件(middleware)。

相比于将这些信息写死在代码里,或者使用全局变量,你可以在 调用 Agent 时动态注入运行时依赖,例如:

  • 数据库连接
  • 用户 ID
  • 运行配置

这种方式可以让工具更加:

  • 易于测试
  • 可复用
  • 灵活可配置

在工具和中间件中都可以访问 Runtime 中的信息。

在使用 create_agent 创建 Agent 时,可以通过 context_schema 指定 Runtime context 的结构

在调用 Agent 时,通过 context 参数传入对应的运行时信息。

from dataclasses import dataclass

from langchain.agents import create_agent


@dataclass
class Context:
    user_name: str

agent = create_agent(
    model="gpt-5-nano",
    tools=[...],
    context_schema=Context  
)

agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    context=Context(user_name="John Smith")
)

在工具内部可以通过 Runtime 完成以下操作:

  • 访问 context
  • 读取或写入 长期记忆(store)
  • custom stream 写入信息(例如工具执行进度)

可以使用 ToolRuntime 参数在工具中访问 Runtime。

from dataclasses import dataclass
from langchain.tools import tool, ToolRuntime  

@dataclass
class Context:
    user_id: str

@tool
def fetch_user_email_preferences(runtime: ToolRuntime[Context]) -> str:
    """Fetch the user's email preferences from the store."""
    user_id = runtime.context.user_id  

    preferences: str = "The user prefers you to write a brief and polite email."
    if runtime.store:
        if memory := runtime.store.get(("users",), user_id):
            preferences = memory.value["preferences"]

    return preferences

在这个示例中:

  1. runtime.context 中读取 user_id
  2. runtime.store 中读取用户的长期偏好信息
  3. 返回用户偏好的邮件风格

在中间件中也可以访问 Runtime 信息,例如:

  • 构建 动态 Prompt
  • 修改消息
  • 根据用户上下文控制 Agent 行为

访问方式:

  • Node-style hooks:通过 Runtime 参数获取
  • Wrap-style hooks:通过 ModelRequest.runtime 获取

示例:

from dataclasses import dataclass

from langchain.messages import AnyMessage
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import dynamic_prompt, ModelRequest, before_model, after_model
from langgraph.runtime import Runtime


@dataclass
class Context:
    user_name: str

# 动态 Prompt
@dynamic_prompt
def dynamic_system_prompt(request: ModelRequest) -> str:
    user_name = request.runtime.context.user_name  
    system_prompt = f"You are a helpful assistant. Address the user as {user_name}."
    return system_prompt

# before_model hook
@before_model
def log_before_model(state: AgentState, runtime: Runtime[Context]) -> dict | None:
    print(f"Processing request for user: {runtime.context.user_name}")
    return None

# after_model hook
@after_model
def log_after_model(state: AgentState, runtime: Runtime[Context]) -> dict | None:
    print(f"Completed request for user: {runtime.context.user_name}")
    return None

agent = create_agent(
    model="gpt-5-nano",
    tools=[...],
    middleware=[dynamic_system_prompt, log_before_model, log_after_model],
    context_schema=Context
)

agent.invoke(
    {"messages": [{"role": "user", "content": "What's my name?"}]},
    context=Context(user_name="John Smith")
)

在这个示例中:

  • dynamic_system_prompt 根据 Runtime 中的用户信息动态生成 System Prompt
  • before_model 在模型调用前记录日志
  • after_model 在模型调用后记录日志

所有这些逻辑都可以通过 Runtime context 获取当前用户信息,而不需要依赖全局变量或硬编码配置。

Context

在Runtime小节中,我们提到了Context,什么是Context?从源码可以直观的看出,Context是Runtime的组成部分:

@dataclass(**_DC_KWARGS)
class Runtime(Generic[ContextT]):
context: ContextT = field(default=None)  # type: ignore[assignment]
    """Static context for the graph run, like `user_id`, `db_conn`, etc.
    
    Can also be thought of as 'run dependencies'."""

    store: BaseStore | None = field(default=None)
    """Store for the graph run, enabling persistence and memory."""

    stream_writer: StreamWriter = field(default=_no_op_stream_writer)
    """Function that writes to the custom stream."""

    previous: Any = field(default=None)
    """The previous return value for the given thread.
    
    Only available with the functional API when a checkpointer is provided.
    """

只要是一个思维正常的开发者,都应该明白,运行时的概念是最大的,比如Go的各种特性都是基于Runtime去实现的,而langchain的Context也是Langchain Runtime的一种特性。

要构建可靠的 Agent,你需要控制:

  • Agent Loop 每个步骤发生的事情
  • 各个步骤之间发生的事情
Context 类型 可控制内容 生命周期
Model Context 模型调用时输入的内容(指令、消息历史、工具、返回格式) 瞬时
Tool Context 工具可以访问或生成的数据(state、store、runtime context) 持久
Life-cycle Context 模型调用和工具调用之间发生的逻辑(摘要、guardrails、日志等) 持久

瞬时上下文(Transient Context)

某一次 LLM 调用时看到的上下文

例如:

  • 修改 messages
  • 修改 prompt
  • 修改 tools

这些修改 不会改变 state 中保存的数据

持久上下文(Persistent Context)

跨多轮对话持续存在的数据

例如:

  • 生命周期 hook 修改 state
  • tool 写入 store
  • 系统记录用户信息

这些数据会在之后的执行中继续存在。

在整个执行过程中,Agent 会访问不同的数据来源(读或写)。

数据来源 也称为 作用范围 示例
Runtime Context 静态配置 单次对话 用户 ID、API Key、数据库连接、权限、环境配置
State 短期记忆 单次对话 当前消息、上传文件、认证状态、工具结果
Store 长期记忆 跨对话 用户偏好、提取的知识、历史数据

在 LangChain 中,Middleware(中间件)是实现 Context Engineering 的核心机制

Middleware 允许你在 Agent 生命周期的任何阶段进行干预,例如:

  • 更新上下文
  • 跳转到 Agent 生命周期中的其他步骤

在接下来的内容中,你会看到 Middleware API 被频繁使用,因为它是 实现 Context Engineering 的核心工具

Model context(模型上下文)

Model Context 指的是:

每一次模型调用时传递给 LLM 的所有信息。

它包括:

  • 指令(instructions)
  • 可用工具
  • 使用哪个模型
  • 输出格式

这些决策会 直接影响 Agent 的可靠性和成本

模型上下文通常由以下几个部分组成:

System Prompt

开发者提供给 LLM 的基础指令,用于定义模型的行为方式。

Messages

发送给 LLM 的完整消息列表(即对话历史)。

Tools

Agent 可以使用的工具,用于执行操作或获取外部信息。

Model

实际调用的模型(包括模型配置)。

Response Format

模型最终输出的 结构化格式或 Schema 约束

System Prompt

System Prompt 用于定义 LLM 的行为和能力。

在不同的情况下,Agent 可能需要不同的指令,例如:

  • 不同用户
  • 不同上下文
  • 不同对话阶段

一个优秀的 Agent 会根据:

  • 用户历史记忆
  • 用户偏好
  • 当前配置

动态生成 适合当前对话状态的系统指令

示例:基于 State 的动态 System Prompt

下面的示例根据 对话轮数 动态调整系统提示词。

from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest

@dynamic_prompt
def state_aware_prompt(request: ModelRequest) -> str:
    # request.messages 是 request.state["messages"] 的快捷方式
    message_count = len(request.messages)

    base = "You are a helpful assistant."

    if message_count > 10:
        base += "\nThis is a long conversation - be extra concise."

    return base

agent = create_agent(
    model="gpt-4.1",
    tools=[...],
    middleware=[state_aware_prompt]
)

在这个例子中:

  • request.messages 用于访问当前对话历史
  • 当对话超过 10 条消息时
  • System Prompt 会增加额外指令,要求模型 更加简洁地回答

这就是 基于 State 的动态 Prompt 构建

我们打开dynamic_prompt的源码部分:

在这里插入图片描述

可以看到dynamic_prompt这个语法糖将我们传入的函数的返回值作为了系统的提示词,并且使用override覆盖写入,而这个System Prompt会替换请求的messages列表中的System Prompt!

示例:从长期记忆(Store)中访问用户偏好

下面的示例展示了如何从 Store(长期记忆) 中读取用户偏好,并动态构建 System Prompt。

from dataclasses import dataclass
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langgraph.store.memory import InMemoryStore

@dataclass
class Context:
    user_id: str

@dynamic_prompt
def store_aware_prompt(request: ModelRequest) -> str:
    user_id = request.runtime.context.user_id

    # 从 Store 中读取用户偏好
    store = request.runtime.store
    user_prefs = store.get(("preferences",), user_id)

    base = "You are a helpful assistant."

    if user_prefs:
        style = user_prefs.value.get("communication_style", "balanced")
        base += f"\nUser prefers {style} responses."

    return base

agent = create_agent(
    model="gpt-4.1",
    tools=[...],
    middleware=[store_aware_prompt],
    context_schema=Context,
    store=InMemoryStore()
)

在这个示例中:

  1. Runtime Context 提供 user_id
  2. 使用 runtime.store 访问 Store(长期记忆)
  3. "preferences" 命名空间中读取用户偏好
  4. 根据用户的 communication_style 动态调整 System Prompt

例如:

如果用户偏好是:

communication_style = concise

那么最终生成的 System Prompt 可能会变成:

You are a helpful assistant.
User prefers concise responses.

这种方式可以让 Agent 根据用户历史偏好动态调整行为,从而提升个性化体验和回答质量。

我们打开ModelRequest的源码:

@dataclass(init=False)
class ModelRequest(Generic[ContextT]):
    """Model request information for the agent.

    Type Parameters:
        ContextT: The type of the runtime context. Defaults to `None` if not specified.
    """

    model: BaseChatModel
    messages: list[AnyMessage]  # excluding system message
    system_message: SystemMessage | None
    tool_choice: Any | None
    tools: list[BaseTool | dict[str, Any]]
    response_format: ResponseFormat[Any] | None
    state: AgentState[Any]
    runtime: Runtime[ContextT]
    model_settings: dict[str, Any] = field(default_factory=dict)

可以看到request里面是提供了runtime结构的,除此之外的一些字段则可以通过override方法进行覆写。

示例:基于 Runtime Context 的用户权限过滤工具

下面的示例展示了如何根据 Runtime Context 中的用户权限,动态过滤 Agent 可使用的工具。

from dataclasses import dataclass
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable

@dataclass
class Context:
    user_role: str

@wrap_model_call
def context_based_tools(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse]
) -> ModelResponse:
    """根据 Runtime Context 中的权限过滤工具。"""
    # 从 Runtime Context 中读取用户角色
    user_role = request.runtime.context.user_role

    if user_role == "admin":
        # 管理员可以使用所有工具
        pass
    elif user_role == "editor":
        # 编辑者不能删除数据
        tools = [t for t in request.tools if t.name != "delete_data"]
        request = request.override(tools=tools)
    else:
        # 只读用户只能使用读取类工具
        tools = [t for t in request.tools if t.name.startswith("read_")]
        request = request.override(tools=tools)

    return handler(request)

agent = create_agent(
    model="gpt-4.1",
    tools=[read_data, write_data, delete_data],
    middleware=[context_based_tools],
    context_schema=Context
)

在这个示例中:

  1. Runtime Context 提供 user_role(用户角色)。

  2. 中间件根据角色 动态过滤工具列表

    • admin:可以使用所有工具
    • editor:不能使用 delete_data
    • viewer:只能使用以 read_ 开头的只读工具
  3. 通过 request.override(tools=tools) 修改当前模型调用时可见的工具。

这种方式可以实现 基于权限的工具访问控制(RBAC),确保不同角色的用户只能调用符合权限范围的工具,同时避免将权限逻辑硬编码到 Agent 或工具内部。

总之大体上就是这三类,你只需要逮着一个参数点点点,总能达到你的目的!

Tool Context(工具上下文)

工具(Tools)的特殊之处在于:

它们既可以读取上下文,也可以写入上下文。

在最基本的情况下,当一个工具被执行时:

  1. 工具接收 LLM 生成的调用参数
  2. 执行对应逻辑
  3. 返回 Tool Message 作为执行结果

也就是说:

  • 工具完成具体任务
  • 并将执行结果返回给 Agent

除此之外,工具还可以 获取重要信息并提供给模型,从而帮助模型更好地完成任务。

Reads(读取上下文)

在真实场景中,大多数工具不仅仅依赖 LLM 提供的参数,还需要额外信息,例如:

  • 用户 ID(用于数据库查询)
  • API Key(用于调用外部服务)
  • 当前会话状态(用于决策逻辑)

因此,工具通常会从以下数据源读取信息:

  • State(短期记忆)
  • Store(长期记忆)
  • Runtime Context(运行时配置)

下面的示例展示了如何从 State(短期记忆) 中读取当前会话信息。

from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent

@tool
def check_authentication(
    runtime: ToolRuntime
) -> str:
    """检查用户是否已经认证。"""
    # 从 State 中读取当前认证状态
    current_state = runtime.state
    is_authenticated = current_state.get("authenticated", False)

    if is_authenticated:
        return "User is authenticated"
    else:
        return "User is not authenticated"

agent = create_agent(
    model="gpt-4.1",
    tools=[check_authentication]
)

在这个例子中:

  • runtime.state 提供当前会话的 短期状态
  • 工具通过 authenticated 字段判断用户是否已登录
  • 然后返回对应的认证状态信息

这种方式可以让工具根据 当前会话状态 做出不同的行为决策。

而从Store以及Context中读取数据只需要用runtime去点出各种属性即可!毕竟runtime都有了,相当于整个agent运行的”堆栈“都在你手中,还有什么数据是拿不到的。

store = runtime.store
existing_prefs = store.get(("preferences",), user_id)

db_connection = runtime.context.db_connection
Writes(写入上下文)

工具执行的结果不仅可以帮助 Agent 完成当前任务,还可以 更新 Agent 的记忆,从而让后续步骤能够访问这些重要信息。

工具可以通过两种方式产生影响:

  1. 直接将结果返回给模型
  2. 更新 Agent 的记忆(State 或 Store)

下面的示例展示了如何通过 Command 向 State 写入数据,以记录当前会话中的状态信息。

当工具需要 更新图(graph)的状态 时,应返回一个 Command(例如设置用户偏好或应用状态)。

Command 可以 只包含状态更新,也可以 同时包含一个 ToolMessage

如果模型需要看到工具执行成功的信息(例如确认用户偏好已经修改),那么在 update 中应包含一个 ToolMessage,并使用runtime.tool_call_id 作为 tool_call_id 参数。

from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langgraph.types import Command

@tool
def authenticate_user(
    password: str,
    runtime: ToolRuntime
) -> Command:
    """Authenticate user and update State."""
    # 执行认证逻辑(简化示例)
    if password == "correct":
        # 写入 State:标记用户已认证
        return Command(
            update={"authenticated": True},
        )
    else:
        return Command(update={"authenticated": False})

agent = create_agent(
    model="gpt-4.1",
    tools=[authenticate_user]
)

在这个示例中:

  1. 工具接收用户输入的 password
  2. 执行简单的认证逻辑
  3. 使用 Command(update=...) 更新 State
  4. authenticated 字段写入当前会话状态

这样,在后续步骤中:

  • 其他工具
  • Middleware
  • Agent 逻辑

都可以通过 state["authenticated"] 判断用户是否已经登录。

ToolRuntime

ToolRuntimeRuntime 的一个“受限视图(restricted view)”,专门给 Tool 使用。

也就是说:

Runtime        → Agent / Middleware 用
ToolRuntime    → Tool 用

Tool 是 模型可以调用的东西,而 Middleware / Agent 是 开发者控制的代码

所以框架必须保证:

Tool 不能随便操作整个 Runtime

如果直接给 Tool 完整 Runtime,理论上工具就可以:

  • 改 Agent 执行流程
  • 修改 Graph 运行结构
  • 跳节点
  • 改调度逻辑

这显然不合理。

所以 ToolRuntime 只暴露 工具真正需要的能力

常见字段:

runtime.state
runtime.context
runtime.store
runtime.stream_writer
runtime.tool_call_id

但不会暴露:

graph
scheduler
node control
jump control
middleware control

所以:

Runtime = 完整执行环境
ToolRuntime = 工具执行环境

类似 沙箱化 API

如果工具直接接收 Runtime

def my_tool(runtime: Runtime)

开发者会误以为:

工具可以控制 Agent 运行

但实际上工具应该只是:

读取上下文
执行动作
返回结果

而不是:

控制 Agent Loop

所以 LangChain 用 ToolRuntime 明确表达:

这是工具的运行环境
不是 Agent 的运行环境

这是一种 语义边界(semantic boundary)

LangChain 会通过 参数类型自动注入依赖

例如:

ToolRuntime → 注入工具运行环境
Runtime     → 注入 middleware runtime

如果统一用 Runtime,就会出现:

工具 runtime
middleware runtime

混在一起,容易出错。

现在类型是:

ToolRuntime
Runtime

框架一眼就知道该注入哪个对象。

LangGraph 未来可能会让 ToolRuntime 支持:

  • 限制 Store 访问
  • 限制 Namespace
  • 限制 Stream
  • 沙箱权限

例如:

ToolRuntime(read_only_store=True)

如果工具拿到完整 Runtime,这些控制就很难做。

为什么 Tool 写 state 要用 Command,而不是直接改 runtime.state

这个设计其实是 LangGraph 的核心思想之一:让 Agent 执行过程保持“可控、可回放、可合成”
如果 Tool 可以直接改 runtime.state,很多关键能力都会被破坏。

保证 Graph 执行是“可回放”的(deterministic execution)

LangGraph 本质上是一个 状态机 + 图执行引擎

每一步执行都会形成:

(state_before) 
      ↓
   Node 执行
      ↓
(state_update)
      ↓
(state_after)

如果 Tool 直接这样改:

runtime.state["authenticated"] = True

那么问题来了:

state 是什么时候被改的?

是:

工具内部

Graph 引擎是 不知道的

但如果用:

return Command(update={"authenticated": True})

Graph 会记录:

node: authenticate_user
update: {"authenticated": True}

于是整个执行变成:

(state_before)
   ↓
tool node
   ↓
Command(update)
   ↓
reducer
   ↓
(state_after)

这样就可以 完全回放执行过程

这对很多能力很重要:

  • Debug
  • Replay
  • Agent trace
  • LangSmith 可视化

统一 State 更新入口(Reducer 机制)

LangGraph 的 State 更新不是简单的 dict.update,而是通过 Reducer

例如 messages:

messages: Annotated[list, add_messages]

更新时不是:

覆盖

而是:

append

如果 Tool 直接写:

runtime.state["messages"] = [...]

就会破坏 reducer 逻辑。

Command(update=...) 会经过 reducer:

Command
 ↓
Reducer
 ↓
State update

例如:

old messages = [A, B]
update = [C]

结果 = [A, B, C]

而不是覆盖。

支持 Middleware 组合(非常关键)

LangChain 支持多个 middleware 叠加。

比如:

retry middleware
logging middleware
tracking middleware

这些 middleware 都可能返回:

ExtendedModelResponse(
    command=Command(...)
)

LangGraph 会做:

Command 合成

执行顺序:

inner middleware
   ↓
outer middleware

如果 Tool 直接改 state:

runtime.state["x"] = ...

那 middleware 就 没法合成这些更新

但用 Command:

Command(update={...})

Graph 可以统一处理:

Command1
Command2
Command3
 ↓
Reducer
 ↓
Final State

支持“失败回滚”

想象一种情况:

Tool 执行
↓
State 已经被改
↓
Tool 抛异常

如果 Tool 直接改 state:

状态已经被污染

但用 Command

Tool 成功 → Command 才会应用
Tool 失败 → Command 丢弃

这样状态就不会被破坏。

还有一个很高级的能力:Execution Trace

LangGraph 内部可以记录:

step1
step2
step3

每一步都有:

state diff

例如:

step 5
node: authenticate_user

state diff:
authenticated: False → True

如果 Tool 直接改 state:

这个 diff 就没法计算

Human-in-the-loop

某些工具操作可能具有较高风险,因此在执行之前需要 人工审批

Deep Agents 通过 LangGraph 的 interrupt(中断)机制 来支持 Human-in-the-loop 工作流

你可以通过 interrupt_on 参数指定 哪些工具在执行前需要人工确认

基本流程如下:

在这里插入图片描述

人工可以:

  • approve:批准执行
  • edit:修改参数后执行
  • reject:拒绝执行

interrupt_on 参数接收一个字典,用于 将工具名称映射到对应的中断配置

每个工具可以配置为:

  • True
    启用中断,使用默认行为(允许 approve / edit / reject)

  • False
    禁用中断,工具可以直接执行

  • {“allowed_decisions”: […]}
    自定义允许的人工决策类型

示例:

from langchain.tools import tool
from deepagents import create_deep_agent
from langgraph.checkpoint.memory import MemorySaver

@tool
def delete_file(path: str) -> str:
    """Delete a file from the filesystem."""
    return f"Deleted {path}"

@tool
def read_file(path: str) -> str:
    """Read a file from the filesystem."""
    return f"Contents of {path}"

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email."""
    return f"Sent email to {to}"

# Human-in-the-loop 必须使用 Checkpointer
checkpointer = MemorySaver()

agent = create_deep_agent(
    model="claude-sonnet-4-6",
    tools=[delete_file, read_file, send_email],
    interrupt_on={
        "delete_file": True,   # 默认允许 approve / edit / reject
        "read_file": False,    # 不需要人工确认
        "send_email": {"allowed_decisions": ["approve", "reject"]},  # 不允许修改参数
    },
    checkpointer=checkpointer  # 必须提供
)

需要注意的是:

Human-in-the-loop 功能必须启用 Checkpointer,因为中断时需要保存当前执行状态,以便人工处理完成后继续执行。

决策类型(Decision Types):

allowed_decisions 用于控制 人工可以对工具调用做出哪些操作

可选值包括:

“approve”

按照 Agent 提出的原始参数 直接执行工具

“edit”

在执行前 修改工具参数

“reject”

完全跳过这次工具调用,不执行该工具。

你可以为不同风险级别的工具配置不同的审批策略:

interrupt_on = {
    # 高风险操作:允许所有选项
    "delete_file": {"allowed_decisions": ["approve", "edit", "reject"]},

    # 中等风险:只能批准或拒绝
    "write_file": {"allowed_decisions": ["approve", "reject"]},

    # 必须执行(只能批准)
    "critical_operation": {"allowed_decisions": ["approve"]},
}

通过这种方式,可以为 Agent 的工具调用建立 细粒度的安全控制机制,确保关键操作在执行前得到人工确认。

delete_file,write_file,critical_operation 均为工具名字

处理中断(Handle interrupts)

当触发中断时,Agent 会暂停执行并将控制权返回。你需要在结果中检查是否存在中断,并根据情况进行处理。

import uuid
from langgraph.types import Command

# 创建包含 thread_id 的配置,用于状态持久化
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# 调用 agent
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Delete the file temp.txt"}]},
    config=config,
    version="v2",
)

# 检查执行是否被中断
if result.interrupts:
    # 提取中断信息
    interrupt_value = result.interrupts[0].value  
    action_requests = interrupt_value["action_requests"]
    review_configs = interrupt_value["review_configs"]

    # 创建从工具名称到 review_config 的映射
    config_map = {cfg["action_name"]: cfg for cfg in review_configs}

    # 向用户展示待处理的操作
    for action in action_requests:
        review_config = config_map[action["name"]]
        print(f"Tool: {action['name']}")
        print(f"Arguments: {action['args']}")
        print(f"Allowed decisions: {review_config['allowed_decisions']}")

    # 获取用户决策(按 action_request 顺序,一一对应)
    decisions = [
        {"type": "approve"}  # 用户批准删除操作
    ]

    # 根据用户决策恢复执行
    result = agent.invoke(
        Command(resume={"decisions": decisions}),
        config=config,  # 必须使用相同的 config!
        version="v2",
    )

# 处理最终结果
print(result.value["messages"][-1].content)

当 Agent 调用多个需要审批的工具时,所有中断会被 合并(batch)到一个 interrupt 中

你必须 按顺序为每个工具提供对应的决策

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

result = agent.invoke(
    {"messages": [{
        "role": "user",
        "content": "Delete temp.txt and send an email to admin@example.com"
    }]},
    config=config,
    version="v2",
)

if result.interrupts:
    interrupt_value = result.interrupts[0].value  
    action_requests = interrupt_value["action_requests"]

    # 有两个工具需要审批
    assert len(action_requests) == 2

    # 决策必须与 action_requests 顺序一致
    decisions = [
        {"type": "approve"},  # 第一个工具:delete_file
        {"type": "reject"}    # 第二个工具:send_email
    ]

    result = agent.invoke(
        Command(resume={"decisions": decisions}),
        config=config,
        version="v2",
    )

如果 allowed_decisions 中包含 "edit",你可以在执行工具之前 修改工具参数

if result.interrupts:
    interrupt_value = result.interrupts[0].value  
    action_request = interrupt_value["action_requests"][0]

    # Agent 原始参数
    print(action_request["args"])  # {"to": "everyone@company.com", ...}

    # 用户决定修改收件人
    decisions = [{
        "type": "edit",
        "edited_action": {
            "name": action_request["name"],  # 必须包含工具名称
            "args": {"to": "team@company.com", "subject": "...", "body": "..."}
        }
    }]

    result = agent.invoke(
        Command(resume={"decisions": decisions}),
        config=config,
        version="v2",
    )

另外需要注意的是,在前后端交互的环境中,我们需要把中断内容发给前端进行渲染,并且需要实现一个resume接口进行重启会话:

def chat():
    config = {"configurable": {"thread_id": "generalzy"}}
    # Invoke the agent
    result:GraphOutput = agent.invoke(
        {"messages": [{"role": "user", "content": "调用本地add工具计算1+1等于几"}]},
        config=config,
        version="v2",
    )

    # Check if execution was interrupted
    if result.interrupts:
        # Extract interrupt information
        interrupt_value = result.interrupts[0].value
        action_requests = interrupt_value["action_requests"]
        review_configs = interrupt_value["review_configs"]

        # Create a lookup map from tool name to review config
        config_map = {cfg["action_name"]: cfg for cfg in review_configs}

        # Display the pending actions to the user
        for action in action_requests:
            review_config = config_map[action["name"]]
            print(f"Tool: {action['name']}")
            print(f"Arguments: {action['args']}")
            print(f"Allowed decisions: {review_config['allowed_decisions']}")
            # 生产环境中直接把中断发给前端
            return {
                "status": "interrupt",
                "Tool": action['name'],
                "Arguments": action['args'],
                "Allowed decisions": review_config['allowed_decisions']
            }
    else:
        pprint(result)
        # 正常返回
        return {
            "status": "completed",
            "message": result.value["messages"][-1].content
        }

def resume(decisions):
    config = {"configurable": {"thread_id": "generalzy"}}

    result = agent.invoke(
        Command(resume={"decisions": decisions}),
        config=config,
        version="v2",
    )

    # Check if execution was interrupted
    if result.interrupts:
        # Extract interrupt information
        interrupt_value = result.interrupts[0].value
        action_requests = interrupt_value["action_requests"]
        review_configs = interrupt_value["review_configs"]

        # Create a lookup map from tool name to review config
        config_map = {cfg["action_name"]: cfg for cfg in review_configs}

        # Display the pending actions to the user
        for action in action_requests:
            review_config = config_map[action["name"]]
            print(f"Tool: {action['name']}")
            print(f"Arguments: {action['args']}")
            print(f"Allowed decisions: {review_config['allowed_decisions']}")
            # 生产环境中直接把中断发给前端
            return {
                "status": "interrupt",
                "Tool": action['name'],
                "Arguments": action['args'],
                "Allowed decisions": review_config['allowed_decisions']
            }
    else:
        pprint(result)
        # 正常返回
        return {
            "status": "completed",
            "message": result.value["messages"][-1].content
        }


def main():
    result = chat()
    if result["status"] == "completed":
        print(result["message"])
    elif result["status"] == "interrupt":
        result = resume(decisions=[{"type": "approve"}])
        print(result["message"])


    print("bye bye!")



if __name__ == '__main__':
    main()

可以看到,我们在前端允许了add操作,后台agent自然进行了一次Tool Call得到了ToolMessage:
在这里插入图片描述
其他类型的中断处理大致一样,这里就不再赘述!

子 Agent 中的中断(Subagent interrupts)

在使用 子 Agent(subagents) 时,可以在两种地方使用中断:

  1. 工具调用时触发中断(Interrupts on tool calls)
  2. 工具内部触发中断(Interrupts within tool calls)

工具调用时的中断(Interrupts on tool calls)

每个 子 Agent 都可以拥有自己的 interrupt_on 配置,用于 覆盖主 Agent 的设置

agent = create_deep_agent(
    tools=[delete_file, read_file],
    interrupt_on={
        "delete_file": True,
        "read_file": False,
    },
    subagents=[{
        "name": "file-manager",
        "description": "Manages file operations",
        "system_prompt": "You are a file management assistant.",
        "tools": [delete_file, read_file],
        "interrupt_on": {
            # 覆盖设置:在这个子 agent 中读取文件也需要审批
            "delete_file": True,
            "read_file": True,  # 与主 agent 不同
        }
    }],
    checkpointer=checkpointer
)

含义是:

主 Agent:

delete_file → 需要审批
read_file → 不需要审批

子 Agent:

delete_file → 需要审批
read_file → 需要审批

子 Agent 触发中断 时,处理方式与普通 Agent 相同:

  1. 在结果中检查 result.interrupts
  2. 使用 Command(resume=...) 恢复执行

工具内部触发中断(Interrupts within tool calls)

子 Agent 的工具也可以 直接调用 interrupt() 来暂停执行并等待用户审批。

示例:

from langchain.agents import create_agent
from langchain_anthropic import ChatAnthropic
from langchain.messages import HumanMessage
from langchain.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, interrupt

from deepagents.graph import create_deep_agent
from deepagents.middleware.subagents import CompiledSubAgent

定义一个需要审批的工具:

@tool(description="Request human approval before proceeding with an action.")
def request_approval(action_description: str) -> str:
    """使用 interrupt() 请求人工审批"""

    # interrupt() 会暂停执行,并返回 Command(resume=...) 中传入的值
    approval = interrupt({
        "type": "approval_request",
        "action": action_description,
        "message": f"Please approve or reject: {action_description}",
    })

    if approval.get("approved"):
        return f"Action '{action_description}' was APPROVED. Proceeding..."
    else:
        return f"Action '{action_description}' was REJECTED. Reason: {approval.get('reason', 'No reason provided')}"

完整示例

def main():
    checkpointer = InMemorySaver()

    model = ChatAnthropic(
        model_name="claude-sonnet-4-6",
        max_tokens=4096,
    )

    compiled_subagent = create_agent(
        model=model,
        tools=[request_approval],
        name="approval-agent",
    )

    parent_agent = create_deep_agent(
        checkpointer=checkpointer,
        subagents=[
            CompiledSubAgent(
                name="approval-agent",
                description="An agent that can request approvals",
                runnable=compiled_subagent,
            )
        ],
    )

    thread_id = "test_interrupt_directly"
    config = {"configurable": {"thread_id": thread_id}}

    print("Invoking agent - sub-agent will use request_approval tool...")

    result = parent_agent.invoke(
        {
            "messages": [
                HumanMessage(
                    content="Use the task tool to launch the approval-agent sub-agent. "
                    "Tell it to use the request_approval tool to request approval for 'deploying to production'."
                )
            ]
        },
        config=config,
        version="v2",
    )

检测中断并恢复执行

# 检查是否触发中断
if result.interrupts:
    interrupt_value = result.interrupts[0].value  

    print("\nInterrupt received!")
    print(f"  Type: {interrupt_value.get('type')}")
    print(f"  Action: {interrupt_value.get('action')}")
    print(f"  Message: {interrupt_value.get('message')}")

    print("\nResuming with Command(resume={'approved': True})...")

    result2 = parent_agent.invoke(
        Command(resume={"approved": True}),
        config=config,
        version="v2",
    )

    if not result2.interrupts:
        print("\nExecution completed!")

        # 找到工具执行结果
        tool_msgs = [m for m in result2.value.get("messages", []) if m.type == "tool"]

        if tool_msgs:
            print(f"  Tool result: {tool_msgs[-1].content}")
    else:
        print("\nAnother interrupt occurred")

else:
    print("\n  No interrupt - the model may not have called request_approval")

运行输出示例

Invoking agent - sub-agent will use request_approval tool...

Interrupt received!
  Type: approval_request
  Action: deploying to production
  Message: Please approve or reject: deploying to production

Resuming with Command(resume={'approved': True})...

Execution completed!
  Tool result: Great! The approval request has been processed. The action "deploying to production" was APPROVED. You can now proceed with the production deployment.

在工具函数内部调用:

interrupt({...})

这种方式更灵活,可以:

  • 动态构造审批信息
  • 实现复杂的 human-in-the-loop 流程
  • 自定义审批逻辑

interrupt_on和 interrupt的使用场景

1️⃣ interrupt_on(Agent 自动拦截工具调用)

如果你配置了:

interrupt_on = {
    "delete_file": True
}

当 Agent 准备调用这个工具时,LangGraph 会 自动触发 interrupt

返回的数据类似:

{
  "action_requests": [
    {
      "name": "delete_file",
      "args": {"path": "temp.txt"}
    }
  ],
  "review_configs": [
    {
      "action_name": "delete_file",
      "allowed_decisions": ["approve","reject","edit"]
    }
  ]
}

特点:

  • 不需要在工具里写 interrupt()
  • 工具调用之前暂停
  • 属于 Agent runtime 控制

流程:

LLM
 ↓
决定调用 tool
 ↓
interrupt_on 拦截
 ↓
interrupt
 ↓
等待用户 approve
 ↓
resume
 ↓
真正执行 tool

2️⃣ 工具内部 interrupt()(工具主动暂停)

如果你在工具里写:

approval = interrupt({
    "type": "approval_request",
    "action": "deploy",
})

此时:

  • 工具执行过程中暂停
  • interrupt 返回的是你传入的数据

例如:

{
  "type": "approval_request",
  "action": "deploy"
}

恢复时:

Command(resume={"approved": True})

这个数据会返回给:

approval = interrupt(...)

也就是说:

approval == {"approved": True}

特点:

  • 完全自定义 interrupt 数据
  • 工具内部逻辑中断

实际生产系统一般同时用两种

例如安全 Agent:

run_shell → interrupt_on
deploy_prod → interrupt_on

而发布流程:

create_release
   ↓
interrupt() 请求审批

interrupt() 实际做的是:

raise GraphInterrupt

LangGraph 捕获后:

保存 graph state
返回 interrupt
暂停执行

resume 时:

恢复 graph stack
继续执行

所以它其实是 Graph execution pause

Streaming(流式输出V2)

该功能要求:

LangGraph >= 1.1

并且需要在调用 stream()astream() 时指定:

version="v2"

v2 版本中,所有返回的数据块都采用统一格式:

{
    "type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
    "ns": (),        # namespace(子图时会使用)
    "data": ...,     # 实际数据(取决于 stream mode)
}

其中:

字段 含义
type 数据类型
ns 命名空间(子图事件会使用)
data 具体数据

每种 stream mode 对应一个 TypedDict 类型:

  • ValuesStreamPart
  • UpdatesStreamPart
  • MessagesStreamPart
  • CustomStreamPart
  • CheckpointStreamPart
  • TasksStreamPart
  • DebugStreamPart

这些类型可以从langgraph.types导入。

所有类型的联合类型是:

StreamPart

它是一个 part["type"] 为判别字段的联合类型(discriminated union),因此在编辑器或类型检查器中可以实现 精确的类型推断(type narrowing)

v1 与 v2 的区别

默认版本是 v1,其输出结构会根据 streaming 配置发生变化,例如:

  • 单个 mode → 返回原始数据
  • 多个 mode → 返回 (mode, data)
  • subgraph → 返回 (namespace, data)

v2 的格式始终统一

for chunk in graph.stream(inputs, stream_mode="updates", version="v2"):
    print(chunk["type"])  # "updates"
    print(chunk["ns"])    # ()
    print(chunk["data"])  # {"node_name": {"key": "value"}}

由于 type 字段是判别字段,可以按类型分支处理数据:

for part in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["values", "updates", "messages", "custom"],
    version="v2",
):

    if part["type"] == "values":
        # 完整 state 快照
        print(f"State: topic={part['data']['topic']}")

    elif part["type"] == "updates":
        # 仅包含节点更新的数据
        for node_name, state in part["data"].items():
            print(f"Node `{node_name}` updated: {state}")

    elif part["type"] == "messages":
        # LLM message chunk
        msg, metadata = part["data"]
        print(msg.content, end="", flush=True)

    elif part["type"] == "custom":
        # 自定义流式数据
        print(f"Progress: {part['data']['progress']}%")

Stream 模式(Stream modes)

在调用 stream()astream() 方法时,可以传入一个 stream modes 列表,用于指定你希望接收的流式数据类型。

Mode Type 说明
values ValuesStreamPart 每一步执行后返回 完整的 state
updates UpdatesStreamPart 每一步执行后返回 state 的更新部分。同一步中多个更新会分别流式输出
messages MessagesStreamPart LLM 调用产生的 (token, metadata) 二元组
custom CustomStreamPart 通过 get_stream_writer() 从节点中发送的 自定义数据
checkpoints CheckpointStreamPart Checkpoint 事件(格式与 get_state() 相同),需要配置 checkpointer
tasks TasksStreamPart 任务开始/结束事件,包含执行结果和错误信息,需要 checkpointer
debug DebugStreamPart 所有可用信息(包含 checkpoints 和 tasks,并附加额外 metadata)

这些 stream modes 本质上是在解决一个问题:

Agent / Graph 执行过程中,你希望实时看到什么信息?

不同 mode 对应 不同粒度的运行信息。如果你在做 Agent 平台 / UI / 调试工具,这些 mode 的用途会非常明显。

生产环境最常用的是这四个:

messages
updates
values
custom

1️⃣ messages —— LLM token streaming

用途:

实现 ChatGPT 那种逐字输出

返回:

(token, metadata)

示例:

for part in graph.stream(..., stream_mode="messages"):
    msg, metadata = part["data"]
    print(msg.content, end="")

效果:

The answer is...
The answer is 42

使用场景:

  • 聊天 UI
  • token streaming
  • 减少 LLM latency 体感

典型 UI:

Assistant typing...

如果你做 聊天界面,这个基本是 必须用的

2️⃣ updates —— 每个节点的 state 变化

用途:

观察 graph 每个 node 执行后的变化

返回:

{
  node_name: {state_update}
}

示例:

Node refine_topic updated:
{'topic': 'ice cream and cats'}

使用场景:

Agent execution timeline

例如 UI:

Step 1 refine_topic
Step 2 generate_joke
Step 3 call_tool

很多 Agent 可视化工具都用这个。

优点:

数据小
只返回变化

3️⃣ values —— 每一步完整 state

用途:

获取完整 graph state

示例:

{
  topic: "ice cream and cats",
  joke: "This is a joke..."
}

updates 的区别:

mode 返回
updates 只返回变化
values 返回完整 state

例如:

Step1

updates → {"topic": "..."}
values → {"topic": "...", "joke": ""}

Step2

updates → {"joke": "..."}
values → {"topic": "...", "joke": "..."}

使用场景:

state debugging
state replay
state visualization

但缺点:

数据量更大

所以生产系统 通常用 updates 而不是 values

4️⃣ custom —— 节点主动发消息

用途:

节点主动发进度信息

需要用:

writer = get_stream_writer()

writer({"progress": 30})

流输出:

Progress: 30%

使用场景:

例如:

检索 10000 条漏洞

UI:

Scanning CVE database...
Progress 10%
Progress 30%
Progress 80%

特别适合:

长任务
RAG
数据处理

下面这三个更多是 系统级功能

5️⃣ checkpoints

用途:

graph checkpoint 保存事件

前提:

必须使用 checkpointer

例如:

InMemorySaver
RedisSaver
PostgresSaver

返回:

checkpoint state

使用场景:

恢复执行
time travel debugging
state replay

例如:

resume agent

6️⃣ tasks

用途:

任务开始 / 结束

返回:

task started
task finished
task error

示例:

Task generate_joke started
Task generate_joke finished

使用场景:

Agent execution timeline

UI:

Task1 running
Task2 finished
Task3 error

7️⃣ debug

用途:

所有调试信息

包括:

checkpoints
tasks
metadata
internal info

相当于:

debug = checkpoints + tasks + extra

使用场景:

调试 graph
开发 agent

生产环境一般不会开。

实际系统通常这样用:

聊天 UI

messages

实现:

token streaming

Agent 可视化 UI

messages
updates

效果:

LLM token
+
agent step

类似:

Thinking...
Calling tool...
Tool finished
Answer:

长任务

messages
updates
custom

例如:

progress
step
token

调试

updates
debug

一个真实 Agent UI 的 streaming 结构

例如:

User: 查找最近的漏洞

stream:

messages → "Searching"
updates → node search_cve started
custom → progress 20%
custom → progress 80%
updates → node summarize finished
messages → "Found 3 vulnerabilities..."

UI:

Searching...
[████████] 80%

Found 3 vulnerabilities...

如果你做 Agent 平台(很关键)

通常 streaming 会组合:

messages  → token streaming
updates   → agent step
custom    → progress
tasks     → monitoring

这样 UI 才完整。

messages 是 LLM streaming

updates 是 agent streaming

两者是不同层:

LLM
 ↓
messages

Agent Graph
 ↓
updates

与任意 LLM 一起使用(Use with any LLM)

你可以使用 stream_mode="custom"从任意 LLM API 进行流式输出,即使该 API 没有实现 LangChain 的 Chat Model 接口

这意味着你可以集成:

  • 原生 LLM 客户端(raw LLM clients)
  • 外部服务提供的 streaming API
  • 自定义模型接口

示例:从任意模型流式输出

from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """示例节点:调用任意模型并流式返回输出"""

    # 获取 stream writer 用于发送自定义数据
    writer = get_stream_writer()

    # 假设你有一个 streaming client 会不断产生数据块
    for chunk in your_custom_streaming_client(state["topic"]):

        # 将数据写入 streaming channel
        writer({"custom_llm_chunk": chunk})

    return {"result": "completed"}

构建 graph:

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # 可以添加其他节点和边
    .compile()
)

运行 graph,并使用 stream_mode="custom" 接收流数据:

for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        # data 中包含从 LLM 流出的自定义数据
        print(chunk["data"])

扩展示例:流式调用任意 Chat Model

import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

初始化 OpenAI 客户端:

openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"

定义一个 token streaming 生成器

async def stream_tokens(model_name: str, messages: list[dict]):

    response = await openai_client.chat.completions.create(
        messages=messages,
        model=model_name,
        stream=True
    )

    role = None

    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}

定义工具函数

async def get_items(place: str) -> str:
    """列出某个地方可能存在的物品"""

    writer = get_stream_writer()

    response = ""

    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]

        # 将 token streaming 发送到 LangGraph stream
        writer(msg_chunk)

    return response

Graph State

class State(TypedDict):
    messages: Annotated[list[dict], operator.add]

Tool 调用节点

async def call_tool(state: State):

    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]

    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)

    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }

    return {"messages": [tool_message]}

构建 Graph

graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)

触发 Tool Call

传入一个包含 tool call 的 AIMessage:

inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

运行:

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"]["content"], end="|", flush=True)

为特定 Chat Model 禁用 Streaming

如果你的应用同时使用:

  • 支持 streaming 的模型
  • 不支持 streaming 的模型

则可能需要 显式禁用 streaming

初始化模型时设置:

streaming=False

例如使用 LangChain:

from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-6",
    # 禁用 streaming
    streaming=False
)

有些模型集成不支持 streaming 参数。

这时可以使用:

disable_streaming=True

该参数由 ChatModel 的 基类提供,因此 所有 Chat Model 都支持

Model profiles(模型配置档)

Model profiles 需要 langchain >= 1.1

在 LangChain 中,Chat Model 可以通过 .profile 属性暴露一个字典,用于描述 模型支持的能力与限制

示例:

model.profile
# {
#   "max_input_tokens": 400000,
#   "image_inputs": True,
#   "reasoning_output": True,
#   "tool_calling": True,
#   ...
# }

大部分模型 profile 数据来自 开源项目

  • models.dev

该项目提供:

  • 模型能力数据
  • 上下文窗口
  • multimodal 支持
  • tool calling 支持等

LangChain 在此基础上会增加一些字段,以便更好地与 LangChain 生态集成,并持续与 upstream 项目保持同步。

Model profile 允许应用 根据模型能力动态调整行为

例如:

1️⃣ 自动触发 summarization

Middleware 可以根据:

max_input_tokens

判断上下文窗口大小。

如果上下文接近上限:

自动触发 summarization

2️⃣ 自动选择 structured output 策略

create_agent 中:

可以通过检测模型是否支持:

structured_output

自动选择:

  • native structured output
  • tool calling fallback

3️⃣ 限制输入内容

可以根据 profile 做输入校验,例如:

  • 是否支持图片
  • 最大 token 输入
  • 是否支持 reasoning

如果 profile 数据:

  • 缺失
  • 过期
  • 错误

可以进行修改。

方法 1:快速修复(本地覆盖)

创建一个自定义 profile:

custom_profile = {
    "max_input_tokens": 100_000,
    "tool_calling": True,
    "structured_output": True,
}

初始化模型:

model = init_chat_model("...", profile=custom_profile)

更新 profile

profile 是普通 dict,可以直接更新:

new_profile = model.profile | {"key": "value"}

model.model_copy(update={"profile": new_profile})

如果 model 在多个地方共享:

⚠️ 建议使用 model_copy,避免修改共享状态。

方法 2:修复上游数据(推荐)

模型 profile 的主要数据来源是:

models.dev

更新流程:

Step 1

在 models.dev GitHub 提交 PR 更新数据。

Step 2

如果需要增加 LangChain 特有字段:

修改:

langchain_<package>/data/profile_augmentations.toml

例如:

langchain_anthropic/data/profile_augmentations.toml

Step 3

使用 CLI 工具更新 profile:

pip install langchain-model-profiles

运行:

langchain-profiles refresh \
  --provider <provider> \
  --data-dir <data_dir>

该命令会:

1️⃣ 从 models.dev 下载最新数据
2️⃣ 合并 profile_augmentations.toml
3️⃣ 写入 profiles.py

示例

在 LangChain monorepo 中:

uv run --with langchain-model-profiles \
  --provider anthropic \
  --data-dir langchain_anthropic/data

⚠️ 注意

Model profile 目前仍然是 Beta 功能,格式未来可能发生变化。

Model profiles 和传入参数的区别

Model profiles ≠ 传入参数

它们解决的是 两个完全不同的问题

概念 作用 谁决定
Model profiles 描述模型“能做什么” 模型/SDK提供
传入参数 控制模型“这次怎么做” 调用者决定

可以理解成:

Model profiles = 模型能力说明书
调用参数 = 本次调用的配置

Model profiles 是模型的能力描述

LangChain 1.1 之后,引入了 model.profile

它是一个 只读能力信息,例如:

model.profile

可能返回:

{
  "max_input_tokens": 400000,
  "image_inputs": True,
  "reasoning_output": True,
  "tool_calling": True
}

这些信息说明:

字段 含义
max_input_tokens 最大上下文
image_inputs 是否支持图像
reasoning_output 是否支持推理输出
tool_calling 是否支持工具调用

注意:

这是模型能力,不是调用配置。

不能传入这些值改变模型能力

例如:

model.profile["image_inputs"] = False

是没有意义的。

传入参数是调用时的配置

调用模型时你传的:

model.invoke(
    messages,
    temperature=0.7,
    max_tokens=1000,
)

这些是:

参数 作用
temperature 随机性
max_tokens 最大生成
stop 停止符
tools 工具
response_format 输出结构

这些是 runtime配置

举个最直观的例子

假设你有一个模型:

model = ChatOpenAI(...)

Model profile

model.profile

输出:

{
  "image_inputs": True
}

表示:

这个模型 支持图像输入

你调用:

model.invoke([
    {"role":"user","content":[
        {"type":"text","text":"这是什么"},
        {"type":"image_url","image_url":"..."}
    ]}
])

这里你 实际使用了这个能力

为什么要有 Model Profiles

LangChain 引入它的目的主要是:

让 Agent 自动适配模型能力。

例如在 LangChain Agents 里:

Agent 可以自动判断:

if model.profile["tool_calling"]:
    使用 function calling
else:
    使用 text parsing

再比如:

if model.profile["image_inputs"]:
    允许上传图片

这样 同一个 Agent 可以适配不同模型

Harness capabilities(Harness 能力)

在 LangGraph 中,agent harness 是多种能力的组合,这些能力可以让构建 长时间运行的 agent 更加容易。

一个 agent harness 包含以下能力:

  • Planning capabilities(规划能力)
  • Virtual filesystem(虚拟文件系统)
  • Task delegation(任务委派 / 子代理)
  • Context 和 token 管理
  • Code execution(代码执行)
  • Human-in-the-loop(人类参与决策)

除了这些能力之外,deep agents 还会使用 Skills 和 Memory 来提供额外的上下文和指令。

其中规划能力,虚拟文件系统,代码执行,Context 和 token 管理都与backends有莫大的关系,因此deepagent的backends概念一定要弄明白。

Backends(后端)

Deep agents 通过诸如 lsread_filewrite_fileedit_fileglobgrep 等工具向 agent 暴露一个文件系统界面。这些工具通过可插拔的 backend(后端)来运行。read_file 工具在所有 backend 上都原生支持图像文件(.png.jpg.jpeg.gif.webp),并将其作为多模态内容块返回。

Sandbox 和 LocalShellBackend 还提供 execute 工具。

Filesystem Tools
        ↓
      Backend
   ├ State
   ├ Filesystem
   ├ Store
   ├ Sandbox
   ├ Local Shell
   ├ Composite
   └ Custom
        ↓
       Routes
        ↓
   + execute tool

在这里插入图片描述

下面是一些预构建的 filesystem backend,可以快速用于你的 deep agent:

Built-in backend Description
Default agent = create_deep_agent()
在 state 中为临时(ephemeral)。Agent 的默认 filesystem backend 存储在 langgraph state 中。注意该 filesystem 只会在单个 thread 生命周期内存在。
Local filesystem persistence agent = create_deep_agent(backend=FilesystemBackend(root_dir="/Users/nh/Desktop/"))
让 deep agent 可以访问本地机器的文件系统。你可以指定 agent 可以访问的根目录。注意 root_dir 必须是绝对路径。
Durable store (LangGraph store) agent = create_deep_agent(backend=lambda rt: StoreBackend(rt))
让 agent 可以访问跨 thread 持久化的长期存储。这非常适合存储长期记忆或在多次执行之间适用的指令。
Sandbox agent = create_deep_agent(backend=sandbox)
在隔离环境中执行代码。Sandbox 提供 filesystem 工具以及用于运行 shell 命令的 execute 工具。可以选择 Modal、Daytona、Deno 或本地 VFS。
Local shell agent = create_deep_agent(backend=LocalShellBackend(root_dir=".", env={"PATH": "/usr/bin:/bin"}))
在宿主机上直接进行 filesystem 和 shell 执行。没有隔离——只应在受控的开发环境中使用。参见下方的安全注意事项。
Composite 默认 /memories/ 持久化,其余为临时(ephemeral)。Composite backend 最为灵活。你可以指定文件系统中的不同路径路由到不同 backend。下面的 Composite routing 提供了一个可以直接使用的示例。

StateBackend(临时 / ephemeral)

# 默认提供 StateBackend
agent = create_deep_agent()

# 实际上底层等价于
from deepagents.backends import StateBackend

agent = create_deep_agent(
    backend=(lambda rt: StateBackend(rt))   # 注意:这些工具通过 runtime.state 访问 State
)

工作方式

  • 将文件存储在 LangGraph agent 的 state 中,作用范围为当前线程。
  • 通过 checkpoint,在同一线程的多个 agent 轮次之间保持持久化。

适用场景

  • 作为 agent 的 临时工作区(scratch pad),用于写入中间结果。
  • 自动清理大型工具输出:agent 可以按需分块重新读取这些输出。

需要注意:

  • 该 backend 在 supervisor agent 和 subagent 之间共享
  • 如果某个 subagent 写入文件,即使该 subagent 执行结束,这些文件仍然会 保留在 LangGraph agent state 中
  • 这些文件之后仍然可以被 supervisor agent 或其他 subagent 访问

FilesystemBackend(本地磁盘)

该 backend 允许 agent 直接读写真实文件系统
需要谨慎使用,只适用于合适的环境。

适用场景

  • 本地开发 CLI

    • 例如:代码助手、开发工具
  • CI/CD pipeline

    • (下文包含安全注意事项)

不适用场景

  • Web 服务器或 HTTP API

    • 建议使用:

      • StateBackend
      • StoreBackend
      • sandbox backend

安全风险

  • Agent 可以读取 任何可访问文件,包括敏感信息:

    • API keys
    • credentials
    • .env 文件
  • 如果结合网络工具,可能通过 SSRF 攻击外泄这些敏感信息。

  • 文件修改是 永久性的且不可恢复

推荐防护措施

  • 启用 Human-in-the-Loop (HITL) middleware,对敏感操作进行人工审核。

  • 将敏感信息排除在 agent 可访问的文件路径之外(尤其是在 CI/CD 环境中)。

  • 在生产环境中,如果需要文件系统交互,建议使用 sandbox backend

  • 始终结合 root_dir 使用 virtual_mode=True,以启用基于路径的访问限制:

    • 阻止 ..
    • 阻止 ~
    • 阻止访问 root_dir 之外的绝对路径

注意:

  • 默认 virtual_mode=False 不会提供任何安全保护,即使设置了 root_dir 也是如此。
from deepagents.backends import FilesystemBackend

agent = create_deep_agent(
    backend=FilesystemBackend(root_dir=".", virtual_mode=True)
)

工作方式

  • 在可配置的 root_dir 目录下读取和写入真实文件。
  • 可以设置 virtual_mode=True,在 root_dir 下对路径进行 沙箱化和标准化
  • 使用安全的路径解析机制,在可能的情况下防止 不安全的符号链接(symlink)遍历
  • 可以使用 ripgrep 实现高速 grep 搜索。

适用场景

  • 本地机器上的项目
  • CI 沙箱环境
  • 挂载的持久化存储卷

LocalShellBackend(本地 Shell)

该 backend 允许 agent 直接读写文件系统,并在宿主机上执行任意 shell 命令

必须 极其谨慎使用,仅适用于特定环境。

适用场景

  • 本地开发 CLI

    • 如:代码助手、开发工具
  • 个人开发环境

    • 当你信任 agent 生成的代码时
  • CI/CD 流水线

    • 并且已做好 secret 管理

不适用场景

  • 生产环境

    • 如 Web 服务器
    • API 服务
    • 多租户系统
  • 处理不可信用户输入

  • 执行不可信代码

安全风险

  • Agent 可以 执行任意 shell 命令,权限等同于当前用户。

  • Agent 可以读取 任何可访问文件,包括敏感信息:

    • API keys
    • credentials
    • .env 文件
  • 敏感信息可能被泄露

  • 文件修改和命令执行 都是永久且不可逆的

  • 命令 直接在宿主机运行

  • 命令可能 无限消耗资源

    • CPU
    • 内存
    • 磁盘

推荐安全措施

  • 启用 Human-in-the-Loop (HITL) middleware,在执行前审核并批准操作(强烈推荐)。
  • 仅在 专用开发环境中运行,不要在共享或生产系统上使用
  • 如果生产环境需要 shell 执行,建议使用 sandbox backend

注意:
即使设置 virtual_mode=True在启用 shell 访问时也不会提供任何安全性,因为 shell 命令可以访问系统中的 任意路径

from deepagents.backends import LocalShellBackend

agent = create_deep_agent(
    backend=LocalShellBackend(root_dir=".", env={"PATH": "/usr/bin:/bin"})
)

工作机制

  • FilesystemBackend 的基础上扩展,增加了 execute 工具,用于在宿主机执行 shell 命令。
  • 命令通过:
subprocess.run(shell=True)

直接在本机执行,没有任何沙箱隔离

支持以下参数:

  • timeout(默认 120 秒
  • max_output_bytes(默认 100000
  • env(自定义环境变量)
  • inherit_env(是否继承系统环境变量)

Shell 命令默认使用 root_dir 作为 工作目录,但仍然可以访问 系统任意路径

StoreBackend(LangGraph Store)

from langgraph.store.memory import InMemoryStore
from deepagents.backends import StoreBackend

agent = create_deep_agent(
    backend=(lambda rt: StoreBackend(rt)),
    store=InMemoryStore() 
)

工作机制

  • 文件存储在 LangGraph 的 BaseStore 中。
  • BaseStore 由 runtime 提供,从而实现 跨线程(cross-thread)的持久化存储

适用场景

  • 你已经在使用 LangGraph store(例如基于 Redis、Postgres 或云端实现的 BaseStore)。

CompositeBackend(路由型 Backend)

from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.store.memory import InMemoryStore

composite_backend = lambda rt: CompositeBackend(
    default=StateBackend(rt),
    routes={
        "/memories/": StoreBackend(rt),
    }
)

agent = create_deep_agent(
    backend=composite_backend,
    store=InMemoryStore()  # store 是传给 create_deep_agent,而不是 backend
)

工作机制

  • 根据 路径前缀(path prefix) 将文件操作路由到不同 backend。
  • 文件列表和搜索结果中会保留原始路径前缀

适用场景

1️⃣ 同时使用临时存储 + 长期存储

如果你希望 agent 同时拥有:

  • 临时存储(ephemeral)

    • 使用 StateBackend
  • 跨线程持久存储

    • 使用 StoreBackend

那么可以使用 CompositeBackend 同时提供这两种能力。

2️⃣ 为 agent 提供多个信息源

当你有 多个数据来源,希望统一暴露成一个文件系统 时非常有用。

例如:

  • /memories/

    • 长期记忆(存储在某个 Store)
  • /docs/

    • 文档数据(来自自定义 backend)

这样 agent 看起来像在访问一个 统一的虚拟文件系统,但实际数据来自不同 backend。

路径分发

按路径前缀匹配 backend

CompositeBackend(
    default=StateBackend(rt),
    routes={
        "/memories/": StoreBackend(rt),
    }
)

含义其实是:

路径前缀 使用 backend
/memories/* StoreBackend
其他路径 StateBackend

所以:

Agent 操作 实际 backend
/memories/user.txt StoreBackend
/workspace/a.txt StateBackend
/tmp/b.txt StateBackend

Agent 并不是直接操作 backend,它调用的是 filesystem tools

ls
read_file
write_file
edit_file
grep
glob

这些工具内部会调用 backend。

假设 agent 执行:

write_file("/memories/profile.txt")

流程是:

Tool: write_file
      ↓
Filesystem middleware
      ↓
CompositeBackend.write()
      ↓
匹配 path prefix
      ↓
StoreBackend.write()

假设 agent 写三个文件:

写临时计划

write_file("/workspace/plan.md")

匹配:

/workspace/plan.md
   ↓
不匹配 /memories/
   ↓
default backend
   ↓
StateBackend

结果:

存在 LangGraph state
线程结束可能消失

写长期记忆

write_file("/memories/user_profile.md")

匹配:

/memories/user_profile.md
   ↓
匹配 /memories/
   ↓
StoreBackend

结果:

存入 LangGraph store
跨线程持久

那么为什么会形成这样的分发机制呢?

工具不会主动加 /memories/ 前缀。
前缀是由 LLM 决定的,而 LLM 是否会这么做,通常是由 prompt(系统提示)或文档规范引导的。

工具其实只是普通函数,deepagents 暴露给模型的工具类似这样:

write_file(path: str, content: str)
read_file(path: str)
ls(path: str)
grep(pattern: str, path: str)

注意:

工具并不知道 /memories/ 有任何特殊含义。

它只接受字符串:

path="/memories/profile.txt"

然后 backend 决定去哪写。

/memories/ 只是一个“约定路径”

在:

CompositeBackend(
    default=StateBackend(rt),
    routes={
        "/memories/": StoreBackend(rt),
    }
)

这里定义的是:

如果 path 以 /memories/ 开头
→ StoreBackend

所以:

/workspace/a.txt → StateBackend
/memories/a.txt → StoreBackend

谁写 /memories/

答案是:

LLM

LLM 为什么会写 /memories/

原因通常有三个。

情况1:Prompt 明确告诉模型

很多 agent framework 会在 system prompt 写:

例如:

You have a filesystem available.

Use the following directories:

/workspace/ - temporary files for the current task
/memories/ - persistent memory across conversations
/docs/ - documentation

If you want to store long-term memory, write files under /memories/.

这样模型就学会:

write_file("/memories/user_profile.txt")

情况2:工具文档说明

工具 schema 也可能写说明:

例如:

write_file(path, content)

Paths starting with:
/memories/ -> persistent storage
/workspace/ -> temporary storage

模型会读这些描述。

情况3:示例(few-shot)

Prompt 里可能有例子:

Example:

User: remember that my name is Bob

Assistant:
write_file("/memories/user_name.txt", "Bob")

模型就会模仿。

如果 prompt 不写会发生什么

如果 prompt 没告诉模型:

/memories/

模型通常会写:

write_file("notes.txt")

或者:

write_file("/workspace/notes.txt")

这时就会:

StateBackend

不会触发持久化。

很多 Agent 会定义一个“虚拟文件系统”。

例如:

/
├─ workspace/
├─ memories/
├─ docs/
├─ repo/
└─ outputs/

Prompt 会写:

Filesystem layout:

/workspace/ -> temporary files
/memories/ -> long-term memory
/docs/ -> documentation
/repo/ -> source code

模型就会根据任务选择路径。

你可能会想到,为什么不直接设计:

write_memory()
write_temp()

原因是:

文件系统抽象更通用。

统一工具:

read_file
write_file
ls
grep
glob

但 backend 决定存哪里。

这其实就是:

Agent Virtual Filesystem

很多 coding agent 都是这个设计。

这种设计是为了让 agent 可以同时访问:

代码
记忆
文档
任务文件

统一接口:

read
write
grep
glob

而不是几十个工具。

举一个真实例子,一个 coding agent 的 FS 可能是:

/
├─ workspace/       (当前任务)
├─ repo/            (代码仓库)
├─ memories/        (长期记忆)
├─ docs/            (文档RAG)
└─ logs/            (日志)

背后 backend:

workspace → StateBackend
repo → GitBackend
memories → StoreBackend
docs → VectorBackend
logs → FilesystemBackend

LLM 只看到:

read_file("/repo/main.py")

长期记忆(Long-term memory)

Deep agents 自带一个 本地文件系统,用于存放 agent 的数据。

默认情况下,这个文件系统存储在 agent 的 state 中(因为默认是StateBackend),并且只在 单个线程内有效(也就是说,当对话结束时,这些文件就会丢失)。

如果希望让 agent 具备 长期记忆能力,可以使用 CompositeBackend,将特定路径路由到 持久化存储

这样就可以实现一种 混合存储(hybrid storage)

  • 一部分文件 跨线程持久存在
  • 另一部分文件 仍然是临时的

架构示意:

在这里插入图片描述

含义:

路径 存储类型
/memories/* 持久存储
其他路径 临时存储

可以通过 CompositeBackend,把 /memories/ 路径路由到 StoreBackend

from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

def make_backend(runtime):
    return CompositeBackend(
        default=StateBackend(runtime),  # 临时存储
        routes={
            "/memories/": StoreBackend(runtime)  # 持久存储
        }
    )

agent = create_deep_agent(
    store=InMemoryStore(), 
    backend=make_backend,
    checkpointer=checkpointer
)

当 agent 执行文件操作时:

写入

/memories/profile.txt

→ 会进入 StoreBackend
跨线程持久化

而写入:

/workspace/plan.txt

→ 会进入 StateBackend
只在当前会话存在

工作原理(How it works)

当使用 CompositeBackend 时,deep agents 会维护 两个独立的文件系统

1️⃣ 短期(临时)文件系统

  • 存储在 agent 的 state 中(通过 StateBackend)。
  • 只在 单个线程内持久存在
  • 当线程结束时,文件会 被删除

访问路径示例:

/notes.txt
/workspace/draft.md

2️⃣ 长期(持久)文件系统

  • 存储在 LangGraph Store 中(通过 StoreBackend)。
  • 可以 跨所有线程和对话持久存在
  • 即使 agent 重启后仍然存在

访问路径示例:

/memories/preferences.txt

CompositeBackend 会根据 路径前缀 将文件操作路由到不同的 backend:

  • 路径以 /memories/ 开头

    • 存储到 Store(持久存储)
  • 没有该前缀

    • 存储到 临时 state

所有文件系统工具都可以同时访问这两类存储:

  • ls
  • read_file
  • write_file
  • edit_file

路由前缀处理:

CompositeBackend 在存储时会 去掉路由前缀

例如:

/memories/preferences.txt

StoreBackend 中实际存储为:

/preferences.txt

agent 始终使用完整路径

示例:

临时文件(线程结束后会丢失)

agent.invoke({
    "messages": [
        {"role": "user", "content": "Write draft to /draft.txt"}
    ]
})

持久文件(跨线程存在)

agent.invoke({
    "messages": [
        {"role": "user", "content": "Save final report to /memories/report.txt"}
    ]
})

使用场景(Use cases)

用户偏好(User preferences)

可以将用户的偏好保存下来,使其 跨会话持久存在

agent = create_deep_agent(
    store=InMemoryStore(),
    backend=lambda rt: CompositeBackend(
        default=StateBackend(rt),
        routes={"/memories/": StoreBackend(rt)}
    ),
    system_prompt="""当用户告诉你他们的偏好时,请将其保存到
    /memories/user_preferences.txt,这样你在未来的对话中也能记住这些信息。"""
)

这样 agent 就可以在未来的对话中 记住用户习惯

自我改进的指令(Self-improving instructions)

Agent 可以根据用户反馈 动态更新自己的指令

agent = create_deep_agent(
    store=InMemoryStore(),
    backend=lambda rt: CompositeBackend(
        default=StateBackend(rt),
        routes={"/memories/": StoreBackend(rt)}
    ),
    system_prompt="""你有一个文件 /memories/instructions.txt,其中包含额外的
    指令和用户偏好。

    在每次对话开始时读取这个文件,以了解用户的偏好。

    当用户提供类似 “请始终执行 X” 或 “我更喜欢 Y” 的反馈时,
    使用 edit_file 工具更新 /memories/instructions.txt。"""
)

随着时间推移,这个 instructions 文件会不断积累用户偏好,从而让 agent 逐渐优化自身行为

知识库(Knowledge base)

Agent 可以在多次对话之间 逐渐构建知识库

第一次对话:学习信息

agent.invoke({
    "messages": [
        {"role": "user", "content": "我们正在用 React 构建一个 Web 应用,请保存项目笔记。"}
    ]
})

第二次对话:利用之前的知识

agent.invoke({
    "messages": [
        {"role": "user", "content": "我们使用的是什么框架?"}
    ]
})

此时 agent 会读取:

/memories/project_notes.txt

这是 之前对话中保存的内容

研究项目(Research projects)

可以在多次会话之间 持续维护研究进度

research_agent = create_deep_agent(
    store=InMemoryStore(),
    backend=lambda rt: CompositeBackend(
        default=StateBackend(rt),
        routes={"/memories/": StoreBackend(rt)}
    ),
    system_prompt="""你是一名研究助手。

    将研究进度保存到 /memories/research/:

    - /memories/research/sources.txt    发现的资料来源
    - /memories/research/notes.txt      关键发现和笔记
    - /memories/research/report.md      最终报告草稿

    这样研究工作可以在多个会话之间持续推进。"""
)

💡 核心思想

通过 /memories/ 路径实现 长期存储,agent 可以逐渐积累:

  • 用户偏好
  • 自我改进规则
  • 项目知识
  • 研究笔记

从而让 agent 具备 跨会话学习和持续工作的能力

FileData 数据结构(FileData schema)

通过 StoreBackend 存储的文件使用如下数据结构:

{
    "content": ["line 1", "line 2", "line 3"],  // 字符串列表(每个元素表示一行)
    "created_at": "2024-01-15T10:30:00Z",       // ISO 8601 时间戳
    "modified_at": "2024-01-15T11:45:00Z"       // ISO 8601 时间戳
}

字段说明:

字段 含义
content 文件内容,按行存储的字符串列表
created_at 文件创建时间(ISO 8601 格式)
modified_at 文件最后修改时间(ISO 8601 格式)

可以使用 create_file_data 工具函数来生成 格式正确的文件数据

from deepagents.backends.utils import create_file_data

file_data = create_file_data("Hello\nWorld")

生成结果类似:

{
  'content': ['Hello', 'World'],
  'created_at': '...',
  'modified_at': '...'
}

该函数会:

  • 自动按 换行符拆分内容
  • 自动生成 创建时间
  • 自动生成 修改时间

最佳实践(Best practices)

使用描述性路径

为持久化文件使用 清晰、具有语义的路径结构

/memories/user_preferences.txt
/memories/research/topic_a/sources.txt
/memories/research/topic_a/notes.txt
/memories/project/requirements.md

这样可以让 agent 更容易 理解和组织长期存储的数据


记录内存结构

system prompt 中明确告诉 agent 不同路径存储的内容。例如:

Your persistent memory structure:

- /memories/preferences.txt: 用户偏好和设置
- /memories/context/: 用户的长期上下文信息
- /memories/knowledge/: 随时间学习到的事实和知识

这样 agent 在使用文件系统时会 更清楚每个目录的用途


清理旧数据

定期清理 过时的持久化文件,以保持存储空间可控。

可以实现:

  • 定期清理脚本
  • 生命周期管理
  • 版本归档策略

选择合适的存储

根据环境选择不同的存储方案:

场景 推荐存储
开发环境 InMemoryStore(方便快速迭代)
生产环境 PostgresStore 或其他持久化存储
多租户系统 在 store 中使用 assistant_id 作为命名空间

使用 assistant_id 命名空间可以确保 不同 agent 或用户的数据相互隔离

configurable字段

configurable 本身没有任何固定字段。
thread_idassistant_id 等都不是框架强制规定的字段,而是上层组件约定使用的字段名

官方这段文档其实已经把关键说出来了:

configurable: dict[str, Any]
Runtime values for attributes previously made configurable on this Runnable
through configurable_fields or configurable_alternatives.

核心意思是:

configurable = 给 Runnable 注入运行时参数

哪些参数可以注入,取决于:

这个 Runnable 声明了哪些 configurable_fields

LCEL / LangGraph 里一个 Runnable 可以声明:

.configurable_fields(...)

例如:

model = ChatOpenAI().configurable_fields(
    model_name="model"
)

这样就声明了:

model_name 是 configurable

然后你才能这样传:

config = {
  "configurable": {
     "model_name": "gpt-4"
  }
}

所以:

configurable字段 = Runnable声明的字段

而不是框架固定的。

那 thread_id 是怎么来的?

thread_id 不是 Runnable configurable 字段

它是:

checkpointer 约定的 config key

例如:

MemorySaver
SqliteCheckpointer
PostgresCheckpointer

这些组件在内部会读取:

config["configurable"]["thread_id"]

在这里插入图片描述

用来决定:

checkpoint namespace

所以:

thread_id 是 checkpointer 约定
不是 framework schema

assistant_id 也是同样的

例如:

StoreBackend
LangSmith deployment
multi-assistant runtime

这些组件约定读取:

configurable["assistant_id"]

在这里插入图片描述

用于:

store namespace

但 LangGraph 本身 没有强制要求这个字段存在

从架构上说:

configurable = runtime dependency injection

它允许 runtime 注入:

类型 示例
checkpoint key thread_id
store namespace assistant_id
model selection model
prompt override system_prompt
tenant info tenant_id

但:

LangGraph 不规定字段组件自己约定

为什么官方不列字段

因为 LangGraph 是 Graph runtime,而不是 agent framework

它不知道:

你要做 chat
workflow
agent
RAG
SaaS
multi-tenant

所以不能规定:

thread_id
assistant_id
user_id

这些都是 上层框架语义

例如:

框架 使用字段
LangGraph checkpointer thread_id
deepagents assistant_id
LangSmith run_id
SaaS agent tenant_id

LangChain / LangGraph 的设计是:

configurable = 可配置运行时参数

但:

schema 由 Runnable 决定

所以文档才说:

Check output_schema for a description of configurable attributes

意思是:

去看 runnable 自己声明了哪些 configurable 字段。

Code execution(代码执行)

当你使用 sandbox(沙箱)后端 时,运行环境(harness)会暴露一个 execute 工具,允许 Agent 在一个 隔离环境 中运行 shell 命令。

这使得 Agent 可以在执行任务时 安装依赖、运行脚本以及执行代码

工作原理:

  • 沙箱后端实现了 SandboxBackendProtocol

  • 当系统检测到使用了这种后端时,运行环境会 自动为 Agent 添加 execute 工具,并将其加入可用工具列表。

  • 如果 没有使用沙箱后端,Agent 只会拥有 文件系统相关工具(例如 read_filewrite_file 等),无法执行命令

  • execute 工具会返回:

    • 合并后的 stdout / stderr
    • 退出码(exit code)
    • 对于过大的输出会进行截断,并保存到文件中,Agent 可以再通过文件读取工具 逐步读取这些内容

为什么有用:

安全性
代码在隔离环境中运行,可以保护宿主系统不被 Agent 的操作影响。

干净的环境
可以使用特定的依赖或操作系统配置,而无需在本地手动安装或配置。

可复现性
团队成员之间可以共享一致的执行环境,保证运行结果的一致性。

Shell 工具(Shell tool)

将一个持久化的 Shell 会话暴露给 Agent,用于执行命令。Shell 工具中间件适用于以下场景:

  • 需要执行系统命令的 Agent
  • 开发和部署自动化任务
  • 测试与验证流程
  • 文件系统操作脚本执行

安全注意事项:

请根据你的部署安全需求选择合适的执行策略(execution policy):

  • HostExecutionPolicy
  • DockerExecutionPolicy
  • CodexSandboxExecutionPolicy

限制:

当前持久化 Shell 会话不支持**中断(human-in-the-loop)**机制。未来版本预计会增加该能力。

from langchain.agents import create_agent
from langchain.agents.middleware import (
    ShellToolMiddleware,
    HostExecutionPolicy,
)

agent = create_agent(
    model="gpt-4.1",
    tools=[search_tool],
    middleware=[
        ShellToolMiddleware(
            workspace_root="/workspace",
            execution_policy=HostExecutionPolicy(),
        ),
    ],
)

配置选项(Configuration options)

workspace_root

str | Path | None

Shell 会话的基础目录。
如果省略,当 Agent 启动时会创建一个临时目录,并在 Agent 结束时删除。


startup_commands

tuple[str, ...] | list[str] | str | None

可选参数。
在 Shell 会话启动后按顺序执行的命令


shutdown_commands

tuple[str, ...] | list[str] | str | None

可选参数。
在 Shell 会话关闭前执行的命令。


execution_policy

BaseExecutionPolicy | None

执行策略,用于控制:

  • 超时时间
  • 输出限制
  • 资源配置

可选策略:

HostExecutionPolicy

  • 完整的宿主机访问权限(默认)
  • 适用于 Agent 已经运行在容器或虚拟机中的可信环境

DockerExecutionPolicy

  • 每次 Agent 运行时都会启动一个独立的 Docker 容器
  • 提供更强的隔离性

CodexSandboxExecutionPolicy

  • 复用 Codex CLI sandbox
  • 提供额外的 系统调用 / 文件系统限制

redaction_rules

tuple[RedactionRule, ...] | list[RedactionRule] | None

可选的输出脱敏规则
在命令执行完成后,对返回给模型的输出进行敏感信息清理

⚠️ 注意:
在使用 HostExecutionPolicy 时,脱敏规则不会阻止敏感信息泄露,只是在执行完成后对输出进行处理。


tool_description

str | None

可选参数。
用于覆盖默认注册的 Shell 工具描述


shell_command

Sequence[str] | str | None

可选参数。
用于指定启动持久化 Shell 会话的Shell 可执行程序

默认值:

/bin/bash

env

Mapping[str, Any] | None

可选参数。
提供给 Shell 会话的环境变量

所有值在执行命令前都会被转换为 字符串


完整示例(Full example)

该中间件会提供一个单一持久化 Shell 会话,Agent 可以在其中按顺序执行命令

执行策略

  • HostExecutionPolicy(默认)
    原生执行,拥有完整宿主机访问权限

  • DockerExecutionPolicy
    在隔离的 Docker 容器中执行

  • CodexSandboxExecutionPolicy
    通过 Codex CLI 提供沙箱执行

基础示例:宿主机执行

from langchain.agents import create_agent
from langchain.agents.middleware import (
    ShellToolMiddleware,
    HostExecutionPolicy,
    DockerExecutionPolicy,
    RedactionRule,
)

agent = create_agent(
    model="gpt-4.1",
    tools=[search_tool],
    middleware=[
        ShellToolMiddleware(
            workspace_root="/workspace",
            execution_policy=HostExecutionPolicy(),
        ),
    ],
)

使用 Docker 隔离,并带启动命令

agent_docker = create_agent(
    model="gpt-4.1",
    tools=[],
    middleware=[
        ShellToolMiddleware(
            workspace_root="/workspace",
            startup_commands=["pip install requests", "export PYTHONPATH=/workspace"],
            execution_policy=DockerExecutionPolicy(
                image="python:3.11-slim",
                command_timeout=60.0,
            ),
        ),
    ],
)

带输出脱敏规则

agent_redacted = create_agent(
    model="gpt-4.1",
    tools=[],
    middleware=[
        ShellToolMiddleware(
            workspace_root="/workspace",
            redaction_rules=[
                RedactionRule(pii_type="api_key", detector=r"sk-[a-zA-Z0-9]{32}"),
            ],
        ),
    ],
)
和沙箱的区别

简单说:两者解决的是不同层级的问题

  • Sandbox(沙箱):解决 运行环境隔离(security boundary)
  • ShellToolMiddleware:解决 给 Agent 提供 shell 能力(tool capability)

可以理解为:

Sandbox = 安全的运行环境
ShellToolMiddleware = 一个工具

Sandbox 是 Agent 运行的环境 backend

它的核心作用是:

隔离 Agent 和宿主机

防止 Agent:

  • 读取你的本地文件
  • 访问凭据(API key)
  • 破坏系统
  • 执行危险命令

Sandbox 本质是:

Agent
  ↓
Sandbox backend
  ↓
Isolated environment
  ↓
Host system

配置 sandbox 后:

Agent 获得:

  • 文件系统工具
    ls
    read_file
    write_file
    edit_file
    glob
    grep

  • execute 工具
    (执行 shell 命令)

  • 隔离环境

例如:

Docker sandbox
VM sandbox
Remote sandbox

Agent 执行:

rm -rf /

只会影响 sandbox 容器,不会影响你的电脑。


ShellToolMiddleware:工具级别(Tool Layer)

ShellToolMiddleware 只是:

给 Agent 加一个 shell tool

它做的事情只有:

Agent
  ↓
Shell tool
  ↓
Shell session

例如 Agent 可以执行:

ls
pip install
python script.py

但关键问题是:

Shell 在哪里执行?

默认:

HostExecutionPolicy

也就是:

Agent → Shell → 你的宿主机

这其实是:

没有隔离的

两者最关键区别

对比 Sandbox ShellToolMiddleware
层级 环境 工具
作用 隔离 Agent 提供 shell 命令
是否安全 取决于 execution_policy
是否改变运行环境
是否提供 shell 提供 execute 提供 shell tool

Sandbox 其实 已经包含 shell 执行能力

sandbox backend
    ↓
execute tool

所以:

有 sandbox 时

Agent 仍然可以执行:

pip install
git clone
python script.py

但这些都在 sandbox 内部

为什么还要 ShellToolMiddleware?

因为 LangChain 有两个不同设计:

Deep Agents 设计

backend
   └ sandbox
        └ execute tool

shell 是 backend提供


LangChain middleware 设计

Agent
  └ middleware
       └ shell tool

shell 是 tool

最安全的组合(推荐)

如果做 Autonomous Agent / Coding Agent

推荐架构:

Agent
   ↓
ShellToolMiddleware
   ↓
DockerExecutionPolicy
   ↓
Docker container

或者:

Agent
   ↓
Sandbox backend
   ↓
Isolated environment

避免:

HostExecutionPolicy

否则 Agent 理论上可以:

rm -rf /
curl your_keys

可以这样理解:

Sandbox = 房子
ShellToolMiddleware = 工具箱
  • Sandbox:决定 你在哪里工作
  • ShellTool:决定 你有什么工具

Deepagent上下文管理

Deep Agents 通过有效的上下文管理来处理长时间运行的任务

Agent 可以访问多种类型的上下文:

  • 一些上下文在 Agent 启动时提供
  • 另一些在 运行过程中动态产生(例如用户输入)

输入上下文(Input context)

输入上下文指的是在 Agent 启动时提供给 Deep Agent 的信息源,这些信息会被加入到 prompt 中。

Deep Agents 使用 system prompt(系统提示词) 来定义:

  • Agent 的角色
  • 行为方式
  • 能力
  • 知识范围

如果你提供了 自定义 system prompt,它会被添加在内置 system prompt 之前

内置 system prompt 包含了对以下内置工具的详细使用指导

  • planning tool(规划工具)
  • filesystem tools(文件系统工具)
  • subagents(子 Agent)

如果 middleware 增加了新的工具(例如 filesystem middleware),

它会自动把工具使用说明附加到 system prompt 中。

这些说明会形成 tool prompts,用于指导 Agent 如何正确使用这些工具。

Deep Agent 最终构造的 Prompt 包含以下部分:

  1. 自定义 system_prompt(如果提供)

  2. 基础 agent prompt

  3. To-do list prompt
    关于如何使用 待办列表进行任务规划的说明

  4. Memory prompt

    • AGENTS.md
    • memory 使用指南
      (仅在提供 memory 时启用)
  5. Skills prompt
    包含:

    • skills 的存放位置
    • skills 列表
    • frontmatter 信息
    • 使用说明
      (仅在提供 skills 时启用)
  6. Virtual filesystem prompt
    包含:

    • filesystem 工具说明
    • execute 工具说明(如果存在)
  7. Subagent prompt
    关于如何使用 task tool 调用子 Agent

  8. 用户提供的 middleware prompts
    (当使用自定义 middleware 时)

  9. Human-in-the-loop prompt
    当启用 interrupt_on 时使用

  10. Local context prompt
    包含例如:

    • 当前目录
    • 项目信息

(在本地 CLI 运行 Agent 时提供)

运行时上下文(Runtime context)

Deep Agents 使用一种称为 上下文压缩(context compression) 的模式。
其原理是:减少 Agent 工作记忆中的信息大小,同时保留与当前任务相关的重要细节

为了确保传递给 LLM 的上下文不会超出模型的 context window 限制,Deep Agents 内置了以下机制:

  • Offloading(卸载)大型工具输入与结果
  • Summarization(摘要)

此外,你还可以配置 long-term memory(长期记忆),让 Agent 在不同线程和对话之间保存信息。

Offloading(卸载)大型工具输入和结果

Deep Agents 使用内置的 filesystem tools 自动将大内容写入文件系统,并在需要时进行搜索和读取。

内容卸载会在以下两种情况下发生:

1.Tool 调用输入超过 20,000 tokens

(可通过 tool_token_limit_before_evict 配置)

当 Agent 进行 文件写入或编辑操作时,tool call 通常会在对话历史中留下完整文件内容

但这些内容已经保存到文件系统中,因此在上下文中再次保存是冗余的

当会话上下文达到 模型上下文窗口的 85% 时:

Deep Agents 会:

  • 截断旧的 tool call
  • 用一个 指向磁盘文件的指针 替换
  • 从而减少当前上下文大小

示例:

大输入内容被保存到磁盘,而 tool call 中只保留简化版本。

在这里插入图片描述

2.Tool 调用结果超过 20,000 tokens

(同样由 tool_token_limit_before_evict 控制)

当工具返回的结果过大时:

Deep Agent 会:

  1. 将完整结果 写入 backend

  2. 在上下文中只保留:

    • 文件路径引用
    • 结果前 10 行预览

Agent 在需要时可以:

  • 重新读取
  • 搜索该文件内容

示例:
大型工具结果被替换为:

  • 文件位置
  • 前 10 行内容

在这里插入图片描述

Summarization(摘要)

在这里插入图片描述

当上下文大小超过模型的 context window 限制
(例如达到 85% 的 max_input_tokens),并且没有更多内容可以卸载时:

Deep Agent 会对 消息历史进行摘要

这个过程包括两个部分:

1.上下文摘要(In-context summary)

LLM 会生成一个 结构化摘要,包括:

  • 当前会话目标
  • 已创建的产物(artifacts)
  • 下一步任务

这个摘要会替代完整的对话历史,成为 Agent 的工作记忆。

2.文件系统保存(Filesystem preservation)

完整的原始对话记录会:

  • 写入文件系统
  • 作为完整的历史记录保存

这种方式可以:

  • 让 Agent 保持对目标和进度的理解(通过摘要)
  • 同时保留 恢复细节的能力(通过文件系统搜索)

示例:
Agent 的历史对话被压缩为摘要,只保留关键步骤。

Summarization 配置:

默认配置:

  • 模型 max_input_tokens 的 85% 时触发

  • 保留 10% tokens 作为最近上下文

  • 如果模型 profile 不可用:

    • 使用 170,000 tokens 作为触发阈值
    • 保留 6 条消息

如果模型调用触发:

ContextOverflowError

Deep Agents 会:

  1. 立即进行摘要
  2. 保留最近消息
  3. 使用新的上下文重新调用模型

旧消息会被模型总结。

Summarization Tool(摘要工具)

Deep Agents 还提供一个可选工具,允许 Agent 在合适的时间主动触发摘要

例如:

  • 完成一个任务之后
  • 开始新任务之前

而不是只在 token 达到阈值时触发。

启用方法:

from deepagents import create_deep_agent
from deepagents.backends import StateBackend
from deepagents.middleware.summarization import (
    create_summarization_tool_middleware,
)

backend = StateBackend  # 默认 backend

model = "openai:gpt-5.4"

agent = create_deep_agent(
    model=model,
    middleware=[
        create_summarization_tool_middleware(model, backend),
    ],
)

启用该功能后:

  • 不会关闭默认的 85% 自动摘要机制

Deep Agents 的 上下文管理体系是三层压缩结构

LLM Context
     ↓
Summary
     ↓
Filesystem artifacts

这种设计实际上就是 Coding Agent 能跑长任务的核心机制

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐