OpenClaw 源码解析(十):消息队列、去重与防抖机制
1. 本期目标
上一期我们分析了 OpenClaw 的 Channel 接入机制:外部平台消息会先经过 Channel adapter,再经过访问控制、群聊激活、Agent binding、sessionKey 生成,最后进入 Agent run。
这一期继续往下看一个更贴近真实聊天场景的问题:
如果用户连续发很多条消息怎么办?
如果平台重复投递同一条消息怎么办?
如果 Agent 正在执行任务,新的消息又来了怎么办?
如果回复太长,Channel 平台发不出去怎么办?
这些问题都属于 OpenClaw 的 Messages and delivery 机制。官方文档把整体消息流程概括为:
Inbound message
-> routing/bindings
-> session key
-> queue
-> agent run
-> outbound replies
也就是说,外部消息进入系统后,不会立刻无脑触发模型,而是会经过路由、会话、队列、运行和投递等多个阶段。(OpenClaw)
本期重点分析五个机制:
1. inbound dedupe:入站去重
2. inbound debounce:入站防抖
3. command queue:会话级运行队列
4. steering queue:运行中的消息注入
5. streaming / chunking / retry:输出投递控制
2. 为什么需要消息队列?
在命令行环境中,用户通常是一条命令一条命令地执行。
但在聊天平台里,消息到达具有明显的不确定性:
用户可能连续发送多条短消息;
群聊里可能多人同时 @ Agent;
Webhook 可能重试投递;
Channel SDK 可能重放事件;
Agent 正在执行长任务时,用户又补充新要求;
平台接口可能限流或发送失败。
如果没有消息队列,OpenClaw 很容易出现几个问题:
同一个 session 同时启动多个 Agent run;
多个 run 同时写 transcript;
旧回复覆盖新回复;
同一条消息被处理多次;
用户连续消息被拆成多个无意义回合;
Channel 回复顺序错乱。
所以消息队列的核心目标不是“让消息慢一点处理”,而是:
在真实聊天环境中,让消息处理有顺序、有边界、可控、可恢复。
官方 Command queue 文档也明确说,OpenClaw 会通过一个轻量的进程内队列序列化 inbound auto-reply runs,以避免多个 agent run 互相碰撞,同时允许不同 session 之间安全并行。(OpenClaw)
3. 整体消息处理链路
先给出一张整体流程图:
外部平台消息
↓
Channel adapter 标准化
↓
访问控制 / mention gating
↓
生成 agentId 和 sessionKey
↓
入站去重 dedupe
↓
入站防抖 debounce
↓
判断该 session 是否已有 active run
↓
根据 queue mode 选择处理方式
↓
Agent run / steer / followup / collect / interrupt
↓
生成回复
↓
streaming / chunking / retry
↓
投递回原 Channel
这条链路中,模型调用只是中间一环。OpenClaw 真正复杂的地方,是模型调用前后的工程控制。
4. 第一层:Inbound dedupe 入站去重
外部 Channel 经常会重复投递同一条消息。
比如:
Telegram bot 重连;
Discord gateway 事件重放;
Webhook 超时后重试;
Slack event API 重新发送;
Gateway 重启后恢复事件。
如果没有 dedupe,同一条用户消息可能触发两次 Agent run,导致重复回复、重复工具调用、重复写入 transcript。
官方文档说明,OpenClaw 会维护一个短期缓存,并用 channel、account、peer、session、message id 等信息作为 key,避免重复投递再次触发 Agent run。(OpenClaw)
可以理解为:
收到消息
↓
生成 dedupe key
↓
判断短期缓存里是否见过
├─ 见过:跳过
└─ 没见过:继续处理,并写入缓存
举个例子:
第一次收到:
telegram / default / group:-100123 / msg:7788
↓
缓存未命中
↓
触发 Agent run
第二次收到:
telegram / default / group:-100123 / msg:7788
↓
缓存命中
↓
跳过
所以 dedupe 解决的是:
同一条外部消息不要被处理多次。
5. 第二层:Inbound debounce 入站防抖
去重解决“重复消息”的问题,防抖解决“连续短消息”的问题。
人在聊天时经常不是一次性把问题说完,而是这样发:
你帮我看一下
这个日志
最后几行有问题
可能是权限错误
如果每条消息都触发一次 Agent run,就会出现四次模型调用,而且 Agent 可能在用户还没说完时就开始回答。
OpenClaw 支持通过 messages.inbound 把同一发送者在短时间内连续发来的文本消息合并成一个 Agent turn。官方文档说明,debouncing 按 channel + conversation 作用域生效,并且会使用最后一条消息作为 reply threading / IDs 的依据;文本消息可以 debounce,媒体和附件会立即 flush,控制命令会绕过 debounce。(OpenClaw)
配置示例:
{
"messages": {
"inbound": {
"debounceMs": 2000,
"byChannel": {
"whatsapp": 5000,
"slack": 1500,
"discord": 1500
}
}
}
}
可以这样理解:
用户连续发多条文本
↓
先不立刻触发 Agent
↓
等待 debounceMs
↓
如果窗口内继续收到同 conversation 消息,就继续合并
↓
窗口结束后,一次性触发一个 Agent turn
源码中可以重点看:
src/auto-reply/inbound-debounce.ts
这个文件中有 resolveInboundDebounceMs 和 createInboundDebouncer,前者负责解析全局和 channel 级 debounce 时间,后者维护按 key 分组的缓冲区、定时 flush、立即 flush、取消 key 等逻辑。(GitHub)
6. dedupe 和 debounce 的区别
这两个机制很容易混淆。
可以这样区分:
dedupe:
同一条消息重复到达,只处理一次。
debounce:
同一个会话中多条连续消息,合并成一次处理。
对应例子:
dedupe 场景:
用户只发了一条消息,但平台重复投递了两次。
debounce 场景:
用户真的发了四条消息,但这四条本来就是同一个问题。
所以它们的位置也不同:
dedupe 更靠近“消息事件可靠性”;
debounce 更靠近“聊天输入体验”。
7. 第三层:Command queue 会话级运行队列
当消息通过去重和防抖之后,还要判断当前 session 是否已经有 Agent run 正在执行。
如果没有 active run,消息可以直接启动一次 Agent run。
如果有 active run,就不能简单再开一个 run。因为同一个 session 同时运行多个 Agent,可能会同时读写 transcript、同时修改会话状态、同时发送回复,造成上下文混乱。
官方文档说明,OpenClaw 使用 lane-aware FIFO queue,并且 runEmbeddedPiAgent 会按 session:<key> 入队,以保证每个 session 同一时间只有一个 active run;随后每个 session run 还会进入一个全局 lane,用 agents.defaults.maxConcurrent 控制整体并行度。(OpenClaw)
可以画成:
不同 session:
session:A ── run1 ── run2
session:B ── run1 ── run2
全局 lane:
main lane 控制整体并发上限
这里有两个层次:
session lane:
保证同一个 session 内串行。
global lane:
控制整个 Gateway 中同时运行多少个 Agent run。
这就实现了一个平衡:
同一会话不并发,避免上下文冲突;
不同会话可并行,提高整体吞吐。
8. 第四层:queue mode 决定 active run 期间如何处理新消息
如果当前 session 没有 active run,消息直接运行。
真正复杂的是:当前 session 已经有 Agent 正在运行时,新消息该怎么办?
OpenClaw 提供四种 queue mode:
steer
followup
collect
interrupt
官方文档中说明,默认模式是 steer,并且默认配置包括 debounceMs: 500、cap: 20、drop: "summarize"。(OpenClaw)
配置示例:
{
"messages": {
"queue": {
"mode": "steer",
"debounceMs": 500,
"cap": 20,
"drop": "summarize",
"byChannel": {
"discord": "collect"
}
}
}
}
下面分别解释四种模式。
9. steer:把新消息注入当前运行
steer 是默认模式。
它的含义是:如果用户在 Agent 正在运行时又发来一条新消息,OpenClaw 会尽量把这条消息注入当前 active runtime,而不是等当前 run 结束后再开一个新 run。
官方 Steering queue 文档说明,当普通 prompt 在一个 session run 正在 streaming 时到达,OpenClaw 默认会尝试把该 prompt 发送进 active runtime;如果运行时不支持 steering,则会等待 active run 结束后再处理。(OpenClaw)
可以理解为:
Agent 正在执行
↓
用户补充:“等一下,把第二步改成这样”
↓
steer 模式尝试把补充消息注入当前 run
↓
下一次模型决策时看到新消息
这适合用户在长任务中途补充需求的场景。
但要注意:steering 通常不会打断正在执行的工具调用。官方文档说明,Pi 会在模型边界检查 queued steering messages,也就是工具调用批次执行结束、turn end 之后,再把新消息追加为 user message,供下一次 LLM call 使用。(OpenClaw)
所以 steer 更像是:
软修正当前任务
而不是:
立即强杀当前任务
10. followup:排队等待下一轮
followup 的逻辑比较简单。
当 active run 期间又收到新消息时,不注入当前 run,而是等当前 run 完成后,再把新消息作为下一次 Agent turn 执行。官方文档对 followup 的定义就是:不进行 steer,消息会在当前 run 结束后排队执行。(OpenClaw)
可以理解为:
当前任务继续完成;
新消息作为下一轮任务处理。
适合这种场景:
用户发来的新消息和当前任务关系不大;
希望每条消息保持独立上下文;
不希望中途修改当前 run。
11. collect:把等待中的消息合并为一个 followup
collect 和 followup 的区别在于:
followup:
每条消息之后单独跑一轮。
collect:
把兼容的排队消息合并成一个后续回合。
官方文档说明,collect 不进行 steering,而是把 queued messages 在 quiet window 后合并成一个 later turn;如果消息目标不同 channel 或 thread,会分别 drain,以保持路由正确性。(OpenClaw)
例如用户在 active run 期间连续发:
还有一点
把表格也加上
顺便输出 markdown
collect 模式更可能把它们合并成:
还有一点。把表格也加上。顺便输出 markdown。
然后在当前 run 结束后作为一个 followup turn 执行。
这对聊天体验很有用,因为它避免用户的补充说明变成多个碎片化回合。
12. interrupt:中断当前任务,执行最新消息
interrupt 是最强的模式。
官方文档对它的解释是:abort 当前 session 的 active run,然后运行最新消息。(OpenClaw)
可以理解为:
当前任务不重要了;
以新消息为准;
先终止旧 run,再执行新的 run。
适合场景:
用户说“停”;
用户发现前面任务方向错了;
用户发来更紧急的任务;
长任务正在跑,但用户明确要求换方向。
所以四种模式可以这样对比:
steer:
把新消息注入当前 run。
followup:
当前 run 完成后,新消息单独执行。
collect:
当前 run 完成后,多条新消息合并执行。
interrupt:
终止当前 run,执行最新消息。
13. queue mode 的优先级
OpenClaw 的 queue mode 不只来自全局配置,也可以来自当前 session 的命令覆盖。
官方文档说明,模式选择优先级大致是:
1. inline 或已保存的 per-session /queue override
2. messages.queue.byChannel.<channel>
3. messages.queue.mode
4. 默认 steer
同时,debounceMs、cap、drop 等参数也有配置和会话覆盖的优先级。(OpenClaw)
也就是说,用户可以在某个聊天会话里直接发:
/queue collect
/queue interrupt
/queue default
来改变当前 session 的排队行为。
这个设计很实用:
日常聊天:
用 steer,方便中途补充。
群聊:
用 collect,避免碎片化回复。
高风险任务:
用 interrupt,让用户能及时纠偏。
脚本任务:
用 followup,让任务顺序更稳定。
14. cap 和 drop:队列满了怎么办?
真实系统中,队列不能无限增长。
如果用户或群聊在 Agent 忙碌时持续发消息,队列可能被撑爆。因此 OpenClaw 提供了:
cap:
每个 session 最大排队消息数。
drop:
超过 cap 后如何处理旧消息或新消息。
官方文档列出三种 drop 策略:
drop: "summarize"
丢弃最旧 queued entries,但保留紧凑摘要,并作为 synthetic followup prompt 注入。
drop: "old"
丢弃最旧消息,不保留摘要。
drop: "new"
当队列已满时,拒绝最新消息。
默认是 drop: "summarize"。(OpenClaw)
这说明 OpenClaw 不只是“排队”,还会考虑队列过载后的信息保留策略。
15. steering queue 的运行边界
steer 模式最容易误解的一点是:它不是随时随地把消息插入模型内部。
官方文档明确说明,steering 不会中断已经在运行的 tool call。Pi 会在模型边界检查 queued steering messages:assistant 请求工具调用、执行当前工具调用批次、发出 turn end event、drain steering messages,然后把这些消息作为 user messages 追加到下一次 LLM call 前。(OpenClaw)
这可以画成:
LLM 生成工具调用
↓
执行工具调用 batch
↓
用户中途发来 steer 消息
↓
工具 batch 继续执行
↓
到达模型边界
↓
drain steering messages
↓
下一次 LLM call 看到用户补充
这个边界设计很重要。
因为如果在工具调用执行到一半时强行插入新消息,可能破坏:
工具调用和工具结果的对应关系;
assistant message 与 tool result 的配对;
transcript 的结构完整性;
运行时状态的一致性。
所以 steer 是一种“边界处注入”,不是“任意时刻打断”。
16. active run、steer、followup 的关系
可以用一个例子理解。
用户先发:
帮我分析这个仓库的代码结构
Agent 开始跑,正在读取文件。
此时用户又发:
重点看 src/channels
如果 queue mode 是 steer:
当前 run 到达模型边界后,把“重点看 src/channels”加入当前上下文。
如果 queue mode 是 followup:
当前 run 先完成;
然后再开一轮处理“重点看 src/channels”。
如果 queue mode 是 collect:
如果用户后面又发“顺便总结配置文件”,这些补充会被合并成一个 followup。
如果 queue mode 是 interrupt:
当前分析任务被终止;
系统直接开始处理最新消息。
所以 queue mode 的本质是:
决定 active run 期间新消息和当前任务的关系。
17. 输出侧:Streaming 和 chunking
消息队列解决输入侧的顺序问题,输出侧还要解决投递问题。
不同平台对消息长度和格式有不同限制。比如:
Discord 消息长度有限;
Telegram Markdown 可能解析失败;
WhatsApp 长文本需要拆分;
Slack thread 需要保持 reply threading;
代码块不能随便从中间切开。
OpenClaw 的 Streaming and chunking 文档说明,它有两层 streaming:
Block streaming:
模型生成过程中,把完成的文本块作为普通 channel messages 发出。
Preview streaming:
在 Telegram、Discord、Slack 等平台上更新临时 preview message。
同时,文档强调目前没有真正把 token delta 直接流式发成 channel messages,preview streaming 是基于 message 的 send + edit / append。(OpenClaw)
输出链路可以理解为:
模型输出 text_delta / events
↓
chunker 根据配置切分文本块
↓
按 channel textChunkLimit 限制长度
↓
避免破坏代码块、段落和句子结构
↓
发送普通消息或更新 preview
官方文档还说明,EmbeddedBlockChunker 会使用低 / 高边界、break preference、代码围栏保护等策略;如果达到强制切分,还会关闭并重新打开代码围栏,保证 Markdown 仍然有效。(OpenClaw)
18. 输出侧:Retry policy
即使消息成功生成,发送到平台时也可能失败。
例如:
HTTP 429 rate limit;
网络超时;
DNS 临时失败;
平台 5xx;
连接被关闭;
Telegram temporarily unavailable;
Discord rate limit。
OpenClaw 的 Retry policy 目标是:按单个 HTTP request 重试,而不是重试整个多步骤流程;同时保持顺序,并避免重复执行非幂等操作。默认 attempts 是 3,max delay cap 是 30000ms,jitter 是 0.1。(OpenClaw)
这点很关键。
如果一个回复包含:
发送文本;
上传图片;
发送文件;
发送 reaction;
某一步失败时,系统应该重试当前失败的 request,而不是从头重发整个复合流程。官方文档也明确说,retries apply per request,composite flows 不会重试已完成步骤。(OpenClaw)
可以理解为:
好的 retry:
只重试失败的那一步。
不好的 retry:
整个流程从头来一遍,导致重复发消息或重复上传文件。
19. 一条真实消息的完整例子
假设用户在 Slack thread 里连续发:
帮我看一下部署失败原因
日志在这里
重点关注权限错误
而此时 Agent 正在处理同一个 thread 的上一条任务。
在 OpenClaw 中可能发生:
Slack adapter 收到三条消息
↓
dedupe 检查:确认不是平台重复投递
↓
debounce 检查:三条短消息合并
↓
生成同一个 sessionKey:
agent:main:slack:channel:C123:thread:172...
↓
发现该 session 有 active run
↓
读取 queue mode
↓
如果是 steer:
在模型边界注入补充消息
↓
如果是 collect:
当前 run 结束后合并成一个 followup
↓
Agent 生成回复
↓
chunking 处理长回复和代码块
↓
retry 处理 Slack 发送失败
↓
回复回原 Slack thread
这条链路说明:
消息不是简单地从 Channel 到模型;
它经过了可靠性、体验、上下文、并发、投递多层控制。
20. 初学者容易混淆的几个点
20.1 debounce 不是 queue mode
debounce:
Agent run 启动前,把连续文本消息合并。
queue mode:
Agent run 已经 active 时,决定新消息怎么处理。
20.2 steer 不是 interrupt
steer:
在运行边界把新消息加入当前 run。
interrupt:
终止当前 run,执行新消息。
20.3 collect 不是简单延迟
collect:
等待当前 run 结束后,把兼容的 queued messages 合并成一个 followup。
20.4 retry 不应该重跑整个 Agent
retry:
主要用于投递请求或平台 API 请求失败后的局部重试。
Agent run:
是模型和工具执行过程,不能随便整体重试,否则可能重复执行工具动作。
20.5 同一 session 串行,不等于全局串行
同一个 session:
一般保证一个 active run。
不同 session:
可以通过全局 lane 和并发配置并行运行。
21. 源码阅读建议
这一期建议重点看这些文件:
第一组:入站防抖
src/auto-reply/inbound-debounce.ts
第二组:入站分发
src/auto-reply/dispatch.ts
src/auto-reply/reply/dispatch-from-config.ts
src/auto-reply/reply/session.ts
第三组:Agent run 与 session 队列
src/agents/agent-command.ts
src/gateway/server-methods/agent.ts
第四组:队列命令和会话覆盖
src/auto-reply/command-detection.ts
src/auto-reply/command-control.ts
src/auto-reply/command-turn-context.ts
第五组:输出投递
src/auto-reply/chunk.ts
src/auto-reply/reply/reply-dispatcher.ts
src/auto-reply/reply/delivery.ts
第六组:文档对照
docs/concepts/messages.md
docs/concepts/queue.md
docs/concepts/queue-steering.md
docs/concepts/streaming.md
docs/concepts/retry.md
阅读时可以带着这些问题:
1. debounce key 是如何构造的?
2. 哪些消息会绕过 debounce?
3. active run 是如何判断的?
4. queue mode 是在哪里解析的?
5. /queue 命令如何写入当前 session override?
6. steer 消息如何进入 active runtime?
7. followup 和 collect 在 drain 时有什么区别?
8. interrupt 如何触发 abort?
9. 长回复如何拆成多个 channel message?
10. retry 为什么只作用于单个 request?
22. 我的理解
我认为这一期是理解 OpenClaw 工程化能力的关键。
因为很多 AI Agent 项目在 demo 阶段只需要处理:
用户发一条消息;
模型回一条消息。
但真实聊天环境更复杂:
用户会连发;
平台会重投;
群聊会多人说话;
Agent 会跑长任务;
回复会很长;
平台会限流;
用户会中途纠正;
用户会要求停止。
OpenClaw 的 messages pipeline 正是在解决这些真实问题。
它用:
dedupe 解决重复投递;
debounce 解决连续短消息;
session lane 解决同一上下文并发冲突;
global lane 控制整体并发;
steer 支持中途补充;
followup 保持顺序;
collect 合并碎片输入;
interrupt 支持任务纠偏;
chunking 适配平台限制;
retry 提升投递可靠性。
这也是 OpenClaw 不只是“LLM 包装器”的原因。它把聊天平台中的不确定消息流,整理成 Agent 可以稳定处理的任务流。
23. 本期重点理解
这一期可以总结为五点:
第一,OpenClaw 的消息链路不是直接从 Channel 到模型,而是经过 routing、session、dedupe、debounce、queue、agent run 和 delivery。
第二,dedupe 用来避免平台重复投递导致重复 Agent run。
第三,debounce 用来把用户连续发送的短文本合并成一个 Agent turn。
第四,queue mode 决定 active run 期间新消息如何处理,主要包括 steer、followup、collect 和 interrupt。
第五,输出侧通过 streaming、chunking 和 retry 处理长回复、平台限制和临时发送失败。
一句话概括:
OpenClaw 的消息队列机制,本质上是把真实聊天平台里的混乱消息流,转换成有序、可控、可恢复的 Agent 任务流。
24. 本期小结
本期主要分析了 OpenClaw 的消息队列、去重与防抖机制。入站消息进入 OpenClaw 后,会先经过 dedupe,避免平台重复投递导致重复执行;再经过 debounce,把短时间内连续到达的文本消息合并成一个 Agent turn。进入 session 后,如果当前 session 已有 active run,OpenClaw 会根据 queue mode 决定如何处理新消息:steer 会尽量注入当前运行,followup 会排队到下一轮,collect 会合并多个后续消息,interrupt 会中断当前运行并执行最新消息。输出阶段,OpenClaw 还会通过 streaming、chunking 和 retry 处理平台消息长度限制、临时发送失败和长回复投递问题。
这一期可以用一句话总结:
dedupe 保证同一条消息不重复处理,debounce 保证连续消息不碎片化,queue mode 保证 active run 期间的新消息有明确处理策略。
下一期可以继续分析:
OpenClaw 源码解析(十一):Tools 工具系统与 Agent 能力扩展
下一期重点看 OpenClaw 如何组织工具目录、如何暴露 tools.* Gateway 方法、Agent 如何调用工具、工具结果如何进入 transcript,以及工具调用如何和插件、权限、sandbox 结合起来。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)