1. 本期目标

上一期我们分析了 OpenClaw 的 Channel 接入机制:外部平台消息会先经过 Channel adapter,再经过访问控制、群聊激活、Agent binding、sessionKey 生成,最后进入 Agent run。

这一期继续往下看一个更贴近真实聊天场景的问题:

如果用户连续发很多条消息怎么办?
如果平台重复投递同一条消息怎么办?
如果 Agent 正在执行任务,新的消息又来了怎么办?
如果回复太长,Channel 平台发不出去怎么办?

这些问题都属于 OpenClaw 的 Messages and delivery 机制。官方文档把整体消息流程概括为:

Inbound message
  -> routing/bindings
  -> session key
  -> queue
  -> agent run
  -> outbound replies

也就是说,外部消息进入系统后,不会立刻无脑触发模型,而是会经过路由、会话、队列、运行和投递等多个阶段。(OpenClaw)

本期重点分析五个机制:

1. inbound dedupe:入站去重
2. inbound debounce:入站防抖
3. command queue:会话级运行队列
4. steering queue:运行中的消息注入
5. streaming / chunking / retry:输出投递控制

2. 为什么需要消息队列?

在命令行环境中,用户通常是一条命令一条命令地执行。

但在聊天平台里,消息到达具有明显的不确定性:

用户可能连续发送多条短消息;
群聊里可能多人同时 @ Agent;
Webhook 可能重试投递;
Channel SDK 可能重放事件;
Agent 正在执行长任务时,用户又补充新要求;
平台接口可能限流或发送失败。

如果没有消息队列,OpenClaw 很容易出现几个问题:

同一个 session 同时启动多个 Agent run;
多个 run 同时写 transcript;
旧回复覆盖新回复;
同一条消息被处理多次;
用户连续消息被拆成多个无意义回合;
Channel 回复顺序错乱。

所以消息队列的核心目标不是“让消息慢一点处理”,而是:

在真实聊天环境中,让消息处理有顺序、有边界、可控、可恢复。

官方 Command queue 文档也明确说,OpenClaw 会通过一个轻量的进程内队列序列化 inbound auto-reply runs,以避免多个 agent run 互相碰撞,同时允许不同 session 之间安全并行。(OpenClaw)


3. 整体消息处理链路

先给出一张整体流程图:

外部平台消息
    ↓
Channel adapter 标准化
    ↓
访问控制 / mention gating
    ↓
生成 agentId 和 sessionKey
    ↓
入站去重 dedupe
    ↓
入站防抖 debounce
    ↓
判断该 session 是否已有 active run
    ↓
根据 queue mode 选择处理方式
    ↓
Agent run / steer / followup / collect / interrupt
    ↓
生成回复
    ↓
streaming / chunking / retry
    ↓
投递回原 Channel

这条链路中,模型调用只是中间一环。OpenClaw 真正复杂的地方,是模型调用前后的工程控制。


4. 第一层:Inbound dedupe 入站去重

外部 Channel 经常会重复投递同一条消息。

比如:

Telegram bot 重连;
Discord gateway 事件重放;
Webhook 超时后重试;
Slack event API 重新发送;
Gateway 重启后恢复事件。

如果没有 dedupe,同一条用户消息可能触发两次 Agent run,导致重复回复、重复工具调用、重复写入 transcript。

官方文档说明,OpenClaw 会维护一个短期缓存,并用 channel、account、peer、session、message id 等信息作为 key,避免重复投递再次触发 Agent run。(OpenClaw)

可以理解为:

收到消息
    ↓
生成 dedupe key
    ↓
判断短期缓存里是否见过
    ├─ 见过:跳过
    └─ 没见过:继续处理,并写入缓存

举个例子:

第一次收到:
telegram / default / group:-100123 / msg:7788
    ↓
缓存未命中
    ↓
触发 Agent run

第二次收到:
telegram / default / group:-100123 / msg:7788
    ↓
缓存命中
    ↓
跳过

所以 dedupe 解决的是:

同一条外部消息不要被处理多次。

5. 第二层:Inbound debounce 入站防抖

去重解决“重复消息”的问题,防抖解决“连续短消息”的问题。

人在聊天时经常不是一次性把问题说完,而是这样发:

你帮我看一下
这个日志
最后几行有问题
可能是权限错误

如果每条消息都触发一次 Agent run,就会出现四次模型调用,而且 Agent 可能在用户还没说完时就开始回答。

OpenClaw 支持通过 messages.inbound 把同一发送者在短时间内连续发来的文本消息合并成一个 Agent turn。官方文档说明,debouncing 按 channel + conversation 作用域生效,并且会使用最后一条消息作为 reply threading / IDs 的依据;文本消息可以 debounce,媒体和附件会立即 flush,控制命令会绕过 debounce。(OpenClaw)

配置示例:

{
  "messages": {
    "inbound": {
      "debounceMs": 2000,
      "byChannel": {
        "whatsapp": 5000,
        "slack": 1500,
        "discord": 1500
      }
    }
  }
}

可以这样理解:

用户连续发多条文本
    ↓
先不立刻触发 Agent
    ↓
等待 debounceMs
    ↓
如果窗口内继续收到同 conversation 消息,就继续合并
    ↓
窗口结束后,一次性触发一个 Agent turn

源码中可以重点看:

src/auto-reply/inbound-debounce.ts

这个文件中有 resolveInboundDebounceMscreateInboundDebouncer,前者负责解析全局和 channel 级 debounce 时间,后者维护按 key 分组的缓冲区、定时 flush、立即 flush、取消 key 等逻辑。(GitHub)


6. dedupe 和 debounce 的区别

这两个机制很容易混淆。

可以这样区分:

dedupe:
同一条消息重复到达,只处理一次。

debounce:
同一个会话中多条连续消息,合并成一次处理。

对应例子:

dedupe 场景:
用户只发了一条消息,但平台重复投递了两次。

debounce 场景:
用户真的发了四条消息,但这四条本来就是同一个问题。

所以它们的位置也不同:

dedupe 更靠近“消息事件可靠性”;
debounce 更靠近“聊天输入体验”。

7. 第三层:Command queue 会话级运行队列

当消息通过去重和防抖之后,还要判断当前 session 是否已经有 Agent run 正在执行。

如果没有 active run,消息可以直接启动一次 Agent run。

如果有 active run,就不能简单再开一个 run。因为同一个 session 同时运行多个 Agent,可能会同时读写 transcript、同时修改会话状态、同时发送回复,造成上下文混乱。

官方文档说明,OpenClaw 使用 lane-aware FIFO queue,并且 runEmbeddedPiAgent 会按 session:<key> 入队,以保证每个 session 同一时间只有一个 active run;随后每个 session run 还会进入一个全局 lane,用 agents.defaults.maxConcurrent 控制整体并行度。(OpenClaw)

可以画成:

不同 session:
session:A  ── run1 ── run2
session:B  ── run1 ── run2

全局 lane:
main lane 控制整体并发上限

这里有两个层次:

session lane:
保证同一个 session 内串行。

global lane:
控制整个 Gateway 中同时运行多少个 Agent run。

这就实现了一个平衡:

同一会话不并发,避免上下文冲突;
不同会话可并行,提高整体吞吐。

8. 第四层:queue mode 决定 active run 期间如何处理新消息

如果当前 session 没有 active run,消息直接运行。

真正复杂的是:当前 session 已经有 Agent 正在运行时,新消息该怎么办?

OpenClaw 提供四种 queue mode:

steer
followup
collect
interrupt

官方文档中说明,默认模式是 steer,并且默认配置包括 debounceMs: 500cap: 20drop: "summarize"。(OpenClaw)

配置示例:

{
  "messages": {
    "queue": {
      "mode": "steer",
      "debounceMs": 500,
      "cap": 20,
      "drop": "summarize",
      "byChannel": {
        "discord": "collect"
      }
    }
  }
}

下面分别解释四种模式。


9. steer:把新消息注入当前运行

steer 是默认模式。

它的含义是:如果用户在 Agent 正在运行时又发来一条新消息,OpenClaw 会尽量把这条消息注入当前 active runtime,而不是等当前 run 结束后再开一个新 run。

官方 Steering queue 文档说明,当普通 prompt 在一个 session run 正在 streaming 时到达,OpenClaw 默认会尝试把该 prompt 发送进 active runtime;如果运行时不支持 steering,则会等待 active run 结束后再处理。(OpenClaw)

可以理解为:

Agent 正在执行
    ↓
用户补充:“等一下,把第二步改成这样”
    ↓
steer 模式尝试把补充消息注入当前 run
    ↓
下一次模型决策时看到新消息

这适合用户在长任务中途补充需求的场景。

但要注意:steering 通常不会打断正在执行的工具调用。官方文档说明,Pi 会在模型边界检查 queued steering messages,也就是工具调用批次执行结束、turn end 之后,再把新消息追加为 user message,供下一次 LLM call 使用。(OpenClaw)

所以 steer 更像是:

软修正当前任务

而不是:

立即强杀当前任务

10. followup:排队等待下一轮

followup 的逻辑比较简单。

当 active run 期间又收到新消息时,不注入当前 run,而是等当前 run 完成后,再把新消息作为下一次 Agent turn 执行。官方文档对 followup 的定义就是:不进行 steer,消息会在当前 run 结束后排队执行。(OpenClaw)

可以理解为:

当前任务继续完成;
新消息作为下一轮任务处理。

适合这种场景:

用户发来的新消息和当前任务关系不大;
希望每条消息保持独立上下文;
不希望中途修改当前 run。

11. collect:把等待中的消息合并为一个 followup

collectfollowup 的区别在于:

followup:
每条消息之后单独跑一轮。

collect:
把兼容的排队消息合并成一个后续回合。

官方文档说明,collect 不进行 steering,而是把 queued messages 在 quiet window 后合并成一个 later turn;如果消息目标不同 channel 或 thread,会分别 drain,以保持路由正确性。(OpenClaw)

例如用户在 active run 期间连续发:

还有一点
把表格也加上
顺便输出 markdown

collect 模式更可能把它们合并成:

还有一点。把表格也加上。顺便输出 markdown。

然后在当前 run 结束后作为一个 followup turn 执行。

这对聊天体验很有用,因为它避免用户的补充说明变成多个碎片化回合。


12. interrupt:中断当前任务,执行最新消息

interrupt 是最强的模式。

官方文档对它的解释是:abort 当前 session 的 active run,然后运行最新消息。(OpenClaw)

可以理解为:

当前任务不重要了;
以新消息为准;
先终止旧 run,再执行新的 run。

适合场景:

用户说“停”;
用户发现前面任务方向错了;
用户发来更紧急的任务;
长任务正在跑,但用户明确要求换方向。

所以四种模式可以这样对比:

steer:
把新消息注入当前 run。

followup:
当前 run 完成后,新消息单独执行。

collect:
当前 run 完成后,多条新消息合并执行。

interrupt:
终止当前 run,执行最新消息。

13. queue mode 的优先级

OpenClaw 的 queue mode 不只来自全局配置,也可以来自当前 session 的命令覆盖。

官方文档说明,模式选择优先级大致是:

1. inline 或已保存的 per-session /queue override
2. messages.queue.byChannel.<channel>
3. messages.queue.mode
4. 默认 steer

同时,debounceMscapdrop 等参数也有配置和会话覆盖的优先级。(OpenClaw)

也就是说,用户可以在某个聊天会话里直接发:

/queue collect
/queue interrupt
/queue default

来改变当前 session 的排队行为。

这个设计很实用:

日常聊天:
用 steer,方便中途补充。

群聊:
用 collect,避免碎片化回复。

高风险任务:
用 interrupt,让用户能及时纠偏。

脚本任务:
用 followup,让任务顺序更稳定。

14. cap 和 drop:队列满了怎么办?

真实系统中,队列不能无限增长。

如果用户或群聊在 Agent 忙碌时持续发消息,队列可能被撑爆。因此 OpenClaw 提供了:

cap:
每个 session 最大排队消息数。

drop:
超过 cap 后如何处理旧消息或新消息。

官方文档列出三种 drop 策略:

drop: "summarize"
    丢弃最旧 queued entries,但保留紧凑摘要,并作为 synthetic followup prompt 注入。

drop: "old"
    丢弃最旧消息,不保留摘要。

drop: "new"
    当队列已满时,拒绝最新消息。

默认是 drop: "summarize"。(OpenClaw)

这说明 OpenClaw 不只是“排队”,还会考虑队列过载后的信息保留策略。


15. steering queue 的运行边界

steer 模式最容易误解的一点是:它不是随时随地把消息插入模型内部。

官方文档明确说明,steering 不会中断已经在运行的 tool call。Pi 会在模型边界检查 queued steering messages:assistant 请求工具调用、执行当前工具调用批次、发出 turn end event、drain steering messages,然后把这些消息作为 user messages 追加到下一次 LLM call 前。(OpenClaw)

这可以画成:

LLM 生成工具调用
    ↓
执行工具调用 batch
    ↓
用户中途发来 steer 消息
    ↓
工具 batch 继续执行
    ↓
到达模型边界
    ↓
drain steering messages
    ↓
下一次 LLM call 看到用户补充

这个边界设计很重要。

因为如果在工具调用执行到一半时强行插入新消息,可能破坏:

工具调用和工具结果的对应关系;
assistant message 与 tool result 的配对;
transcript 的结构完整性;
运行时状态的一致性。

所以 steer 是一种“边界处注入”,不是“任意时刻打断”。


16. active run、steer、followup 的关系

可以用一个例子理解。

用户先发:

帮我分析这个仓库的代码结构

Agent 开始跑,正在读取文件。

此时用户又发:

重点看 src/channels

如果 queue mode 是 steer

当前 run 到达模型边界后,把“重点看 src/channels”加入当前上下文。

如果 queue mode 是 followup

当前 run 先完成;
然后再开一轮处理“重点看 src/channels”。

如果 queue mode 是 collect

如果用户后面又发“顺便总结配置文件”,这些补充会被合并成一个 followup。

如果 queue mode 是 interrupt

当前分析任务被终止;
系统直接开始处理最新消息。

所以 queue mode 的本质是:

决定 active run 期间新消息和当前任务的关系。

17. 输出侧:Streaming 和 chunking

消息队列解决输入侧的顺序问题,输出侧还要解决投递问题。

不同平台对消息长度和格式有不同限制。比如:

Discord 消息长度有限;
Telegram Markdown 可能解析失败;
WhatsApp 长文本需要拆分;
Slack thread 需要保持 reply threading;
代码块不能随便从中间切开。

OpenClaw 的 Streaming and chunking 文档说明,它有两层 streaming:

Block streaming:
模型生成过程中,把完成的文本块作为普通 channel messages 发出。

Preview streaming:
在 Telegram、Discord、Slack 等平台上更新临时 preview message。

同时,文档强调目前没有真正把 token delta 直接流式发成 channel messages,preview streaming 是基于 message 的 send + edit / append。(OpenClaw)

输出链路可以理解为:

模型输出 text_delta / events
    ↓
chunker 根据配置切分文本块
    ↓
按 channel textChunkLimit 限制长度
    ↓
避免破坏代码块、段落和句子结构
    ↓
发送普通消息或更新 preview

官方文档还说明,EmbeddedBlockChunker 会使用低 / 高边界、break preference、代码围栏保护等策略;如果达到强制切分,还会关闭并重新打开代码围栏,保证 Markdown 仍然有效。(OpenClaw)


18. 输出侧:Retry policy

即使消息成功生成,发送到平台时也可能失败。

例如:

HTTP 429 rate limit;
网络超时;
DNS 临时失败;
平台 5xx;
连接被关闭;
Telegram temporarily unavailable;
Discord rate limit。

OpenClaw 的 Retry policy 目标是:按单个 HTTP request 重试,而不是重试整个多步骤流程;同时保持顺序,并避免重复执行非幂等操作。默认 attempts 是 3,max delay cap 是 30000ms,jitter 是 0.1。(OpenClaw)

这点很关键。

如果一个回复包含:

发送文本;
上传图片;
发送文件;
发送 reaction;

某一步失败时,系统应该重试当前失败的 request,而不是从头重发整个复合流程。官方文档也明确说,retries apply per request,composite flows 不会重试已完成步骤。(OpenClaw)

可以理解为:

好的 retry:
只重试失败的那一步。

不好的 retry:
整个流程从头来一遍,导致重复发消息或重复上传文件。

19. 一条真实消息的完整例子

假设用户在 Slack thread 里连续发:

帮我看一下部署失败原因
日志在这里
重点关注权限错误

而此时 Agent 正在处理同一个 thread 的上一条任务。

在 OpenClaw 中可能发生:

Slack adapter 收到三条消息
    ↓
dedupe 检查:确认不是平台重复投递
    ↓
debounce 检查:三条短消息合并
    ↓
生成同一个 sessionKey:
agent:main:slack:channel:C123:thread:172...
    ↓
发现该 session 有 active run
    ↓
读取 queue mode
    ↓
如果是 steer:
    在模型边界注入补充消息
    ↓
如果是 collect:
    当前 run 结束后合并成一个 followup
    ↓
Agent 生成回复
    ↓
chunking 处理长回复和代码块
    ↓
retry 处理 Slack 发送失败
    ↓
回复回原 Slack thread

这条链路说明:

消息不是简单地从 Channel 到模型;
它经过了可靠性、体验、上下文、并发、投递多层控制。

20. 初学者容易混淆的几个点

20.1 debounce 不是 queue mode

debounce:
Agent run 启动前,把连续文本消息合并。

queue mode:
Agent run 已经 active 时,决定新消息怎么处理。

20.2 steer 不是 interrupt

steer:
在运行边界把新消息加入当前 run。

interrupt:
终止当前 run,执行新消息。

20.3 collect 不是简单延迟

collect:
等待当前 run 结束后,把兼容的 queued messages 合并成一个 followup。

20.4 retry 不应该重跑整个 Agent

retry:
主要用于投递请求或平台 API 请求失败后的局部重试。

Agent run:
是模型和工具执行过程,不能随便整体重试,否则可能重复执行工具动作。

20.5 同一 session 串行,不等于全局串行

同一个 session:
一般保证一个 active run。

不同 session:
可以通过全局 lane 和并发配置并行运行。

21. 源码阅读建议

这一期建议重点看这些文件:

第一组:入站防抖
src/auto-reply/inbound-debounce.ts

第二组:入站分发
src/auto-reply/dispatch.ts
src/auto-reply/reply/dispatch-from-config.ts
src/auto-reply/reply/session.ts

第三组:Agent run 与 session 队列
src/agents/agent-command.ts
src/gateway/server-methods/agent.ts

第四组:队列命令和会话覆盖
src/auto-reply/command-detection.ts
src/auto-reply/command-control.ts
src/auto-reply/command-turn-context.ts

第五组:输出投递
src/auto-reply/chunk.ts
src/auto-reply/reply/reply-dispatcher.ts
src/auto-reply/reply/delivery.ts

第六组:文档对照
docs/concepts/messages.md
docs/concepts/queue.md
docs/concepts/queue-steering.md
docs/concepts/streaming.md
docs/concepts/retry.md

阅读时可以带着这些问题:

1. debounce key 是如何构造的?
2. 哪些消息会绕过 debounce?
3. active run 是如何判断的?
4. queue mode 是在哪里解析的?
5. /queue 命令如何写入当前 session override?
6. steer 消息如何进入 active runtime?
7. followup 和 collect 在 drain 时有什么区别?
8. interrupt 如何触发 abort?
9. 长回复如何拆成多个 channel message?
10. retry 为什么只作用于单个 request?

22. 我的理解

我认为这一期是理解 OpenClaw 工程化能力的关键。

因为很多 AI Agent 项目在 demo 阶段只需要处理:

用户发一条消息;
模型回一条消息。

但真实聊天环境更复杂:

用户会连发;
平台会重投;
群聊会多人说话;
Agent 会跑长任务;
回复会很长;
平台会限流;
用户会中途纠正;
用户会要求停止。

OpenClaw 的 messages pipeline 正是在解决这些真实问题。

它用:

dedupe 解决重复投递;
debounce 解决连续短消息;
session lane 解决同一上下文并发冲突;
global lane 控制整体并发;
steer 支持中途补充;
followup 保持顺序;
collect 合并碎片输入;
interrupt 支持任务纠偏;
chunking 适配平台限制;
retry 提升投递可靠性。

这也是 OpenClaw 不只是“LLM 包装器”的原因。它把聊天平台中的不确定消息流,整理成 Agent 可以稳定处理的任务流。


23. 本期重点理解

这一期可以总结为五点:

第一,OpenClaw 的消息链路不是直接从 Channel 到模型,而是经过 routing、session、dedupe、debounce、queue、agent run 和 delivery。

第二,dedupe 用来避免平台重复投递导致重复 Agent run。

第三,debounce 用来把用户连续发送的短文本合并成一个 Agent turn。

第四,queue mode 决定 active run 期间新消息如何处理,主要包括 steer、followup、collect 和 interrupt。

第五,输出侧通过 streaming、chunking 和 retry 处理长回复、平台限制和临时发送失败。

一句话概括:

OpenClaw 的消息队列机制,本质上是把真实聊天平台里的混乱消息流,转换成有序、可控、可恢复的 Agent 任务流。

24. 本期小结

本期主要分析了 OpenClaw 的消息队列、去重与防抖机制。入站消息进入 OpenClaw 后,会先经过 dedupe,避免平台重复投递导致重复执行;再经过 debounce,把短时间内连续到达的文本消息合并成一个 Agent turn。进入 session 后,如果当前 session 已有 active run,OpenClaw 会根据 queue mode 决定如何处理新消息:steer 会尽量注入当前运行,followup 会排队到下一轮,collect 会合并多个后续消息,interrupt 会中断当前运行并执行最新消息。输出阶段,OpenClaw 还会通过 streaming、chunking 和 retry 处理平台消息长度限制、临时发送失败和长回复投递问题。

这一期可以用一句话总结:

dedupe 保证同一条消息不重复处理,debounce 保证连续消息不碎片化,queue mode 保证 active run 期间的新消息有明确处理策略。

下一期可以继续分析:

OpenClaw 源码解析(十一):Tools 工具系统与 Agent 能力扩展

下一期重点看 OpenClaw 如何组织工具目录、如何暴露 tools.* Gateway 方法、Agent 如何调用工具、工具结果如何进入 transcript,以及工具调用如何和插件、权限、sandbox 结合起来。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐