Ai读码,OpenClaw 源码深度解析(二):执行篇 — Agent Loop 如何调用 LLM 和工具
OpenClaw 源码深度解析(二):执行篇 — Agent Loop 如何调用 LLM 和工具
OpenClaw v2026.2.25 | Node.js 22+ | TypeScript ESM
引言
规划阶段准备好了一切——session 恢复了、system prompt 组装好了、工具集过滤完毕、模型和 API Key 就位。现在,session.prompt("帮我查一下天气") 被调用,Agent 正式进入执行阶段。
这是 OpenClaw 最核心、最精妙的代码路径。它不是简单的"调一次 LLM 拿结果",而是一个循环驱动的 Agentic Loop——LLM 可以反复调用工具、根据工具结果继续思考、直到生成最终回复。本文深入这个循环的每一个齿轮。
一、Agent Loop 全景
session.prompt(userMessage)
│
│ ┌─────────────────────────────────────────────────┐
│ │ Pi Agent Loop │
│ │ │
│ │ ┌──── 迭代 N ────────────────────────────┐ │
│ │ │ │ │
│ │ │ 1. 构建上下文 (system + history + msg)│ │
│ │ │ 2. 调用 LLM API (流式) │ │
│ │ │ 3. 解析响应 │ │
│ │ │ ├── 纯文本 → 回复用户 → 结束 │ │
│ │ │ └── tool_calls → 进入步骤 4 │ │
│ │ │ 4. 逐个执行工具 │ │
│ │ │ 5. 结果追加到历史 │ │
│ │ │ 6. → 回到步骤 1 (迭代 N+1) │ │
│ │ │ │ │
│ │ └────────────────────────────────────────┘ │
│ │ │
│ │ 终止条件: LLM 返回纯文本(无 tool_calls) │
│ └─────────────────────────────────────────────────┘
一次"帮我查天气"的请求,通常需要 2-3 次迭代:
- LLM 返回
web_fetch工具调用 - 执行工具获取天气数据,结果送回 LLM
- LLM 返回纯文本回复
而复杂任务(“帮我改这个 bug”)可能需要 10+ 次迭代——读文件、运行测试、编辑代码、再运行测试…
二、迭代 Step 1:构建上下文
每次迭代开始前,Pi SDK 需要构建发给 LLM 的完整上下文。
2.1 上下文组成
// 发给 LLM 的消息序列
const messages = [
// 1. System Prompt (由 OpenClaw 预构建)
{ role: 'system', content: systemPrompt },
// 2. 历史对话 (从 jsonl transcript 读取)
...historyMessages, // user/assistant/tool 消息链
// 3. 当前用户消息
{ role: 'user', content: '帮我查一下天气' },
// 4. 之前的工具结果 (如果是第 2+ 次迭代)
// ...{ role: 'tool', content: '北京 晴 15-25°C' }
];
2.2 历史裁剪
历史消息不是无限注入的。OpenClaw 会根据场景裁剪:
// src/agents/pi-embedded-runner/history.ts
function limitHistoryTurns(params: {
history: Message[];
chatType: 'direct' | 'group';
maxTurns?: number;
}): Message[] {
// 私聊: 保留更多历史 (完整上下文)
// 群聊: 保留较少 (节省 token,群聊对话通常较碎片化)
const defaultMaxTurns = params.chatType === 'group' ? 10 : 50;
const maxTurns = params.maxTurns ?? defaultMaxTurns;
if (params.history.length <= maxTurns) return params.history;
// 保留最近的 turns,裁剪掉旧的
return params.history.slice(-maxTurns);
}
2.3 Transcript Hygiene (转录卫生)
在构建上下文之前,OpenClaw 会对 transcript 做提供商特定的修复:
// src/agents/pi-embedded-runner/google.ts
function sanitizeSessionHistory(history: Message[], provider: string): Message[] {
let sanitized = history;
// 全局: 图片降采样 (防止 token 爆炸)
sanitized = sanitizeImages(sanitized);
// 全局: 清除不完整的 tool_calls
sanitized = dropMalformedToolCalls(sanitized);
// Google/Gemini 特有:
if (provider === 'google') {
sanitized = sanitizeToolCallIdsStrict(sanitized); // ID 必须纯字母数字
sanitized = repairToolResultPairing(sanitized); // 修复 tool_use/tool_result 配对
sanitized = applyGoogleTurnOrderingFix(sanitized); // 修复对话顺序
}
// Anthropic 特有:
if (provider === 'anthropic') {
sanitized = mergeConsecutiveUserTurns(sanitized); // 合并连续 user 消息
sanitized = repairToolResultPairing(sanitized); // 修复配对
}
// Mistral 特有:
if (provider === 'mistral') {
sanitized = sanitizeToolCallIdsLength9(sanitized); // ID 必须 9 位字母数字
}
return sanitized;
}
这是一个被低估的设计——不同 LLM 对 API 格式的要求差异很大。如果不做这些修复,provider 会直接拒绝请求。OpenClaw 选择在上游解决这些问题,而不是让用户看到莫名其妙的报错。
三、迭代 Step 2:调用 LLM API
3.1 流式调用
// Pi SDK 内部 (简化)
import { streamSimple } from '@mariozechner/pi-ai';
const stream = streamSimple(model, messages, {
maxTokens: contextWindow - currentTokens,
thinking: thinkingLevel,
// provider 特定参数
...getProviderExtraParams(provider),
});
3.2 Provider Extra Params
不同 provider 需要不同的额外参数:
// src/agents/pi-embedded-runner/extra-params.ts
function getProviderExtraParams(provider: string) {
switch (provider) {
case 'google':
return {
// Gemini 需要的特定配置
turnLocation: 'last',
};
case 'openai':
return {
// OpenAI 的 reasoning effort
reasoningEffort: mapThinkingToOpenAI(thinkingLevel),
};
case 'anthropic':
return {
// Claude 的特定参数
cacheControl: getCacheControl(),
};
default:
return {};
}
}
3.3 AbortSignal 支持
每次 LLM 调用都绑定一个 AbortSignal,用户发 /stop 时可以立即中断:
// src/agents/pi-tools.abort.ts
function wrapToolWithAbort(tool: AnyAgentTool, getSignal: () => AbortSignal) {
return {
...tool,
execute: async (id, params, signal, onUpdate) => {
const abortSignal = getSignal();
// 组合两个 signal:工具自己的 + 外部 abort
const combined = combineAbortSignals(signal, abortSignal);
return tool.execute(id, params, combined, onUpdate);
},
};
}
四、迭代 Step 3:解析 LLM 响应
LLM 的流式响应分为两种情况:
情况 A:纯文本回复 → 结束循环
{
"type": "text",
"text": "今天北京晴,气温 15-25°C,适合外出。"
}
Agent Loop 结束,进入流式返回阶段。
情况 B:包含 tool_calls → 继续循环
{
"type": "tool_calls",
"text": "我来帮你查一下天气数据...",
"tool_calls": [
{
"id": "call_abc123",
"name": "web_fetch",
"params": { "url": "https://wttr.in/Beijing?format=j1" }
}
]
}
进入工具执行阶段。
五、迭代 Step 4:工具执行
5.1 执行流程
// Pi SDK 内部 (简化)
for (const toolCall of response.toolCalls) {
// 1. 触发事件
emit('tool_execution_start', { toolName: toolCall.name, callId: toolCall.id });
// 2. 查找工具
const tool = toolRegistry.get(toolCall.name);
if (!tool) {
emit('tool_execution_end', { error: `Unknown tool: ${toolCall.name}` });
continue;
}
// 3. 执行
try {
const result = await tool.execute(
toolCall.id,
toolCall.params,
abortSignal,
(update) => {
// 流式更新 (长耗时工具可以报告进度)
emit('tool_execution_update', update);
}
);
// 4. 触发完成事件
emit('tool_execution_end', { callId: toolCall.id, result });
// 5. 追加到历史
history.push({ role: 'tool', content: result, toolCallId: toolCall.id });
} catch (error) {
if (error.name === 'AbortError') {
emit('tool_execution_end', { callId: toolCall.id, error: 'aborted' });
throw error; // 中断整个循环
}
history.push({ role: 'tool', content: `Error: ${error.message}`, toolCallId: toolCall.id });
}
}
5.2 工具定义适配器
OpenClaw 的工具需要适配两个不同的签名——pi-agent-core 的 AgentTool 和 pi-coding-agent 的 ToolDefinition:
// src/agents/pi-tool-definition-adapter.ts
export function toToolDefinitions(tools: AnyAgentTool[]): ToolDefinition[] {
return tools.map(tool => ({
name: tool.name,
label: tool.label ?? tool.name,
description: tool.description ?? '',
parameters: tool.parameters,
execute: async (toolCallId, params, _ctx, signal, onUpdate) => {
// 签名转换: pi-coding-agent 的 execute 有 5 个参数
// AgentTool 的 execute 有 4 个参数
// 这里做桥接
return await tool.execute(toolCallId, params, signal, onUpdate);
},
}));
}
这看起来很小,但在实际中是一个反复出 bug 的地方——两个 SDK 版本的签名差异导致各种运行时错误。OpenClaw 用适配器层统一处理。
5.3 典型工具执行示例
以 web_fetch 为例:
// src/agents/tools/web-fetch.ts (简化)
const webFetchTool = {
name: 'web_fetch',
description: 'Fetch and extract readable content from a URL',
parameters: {
type: 'object',
properties: {
url: { type: 'string', description: 'HTTP(S) URL to fetch' },
extractMode: { type: 'string', enum: ['markdown', 'text'] },
maxChars: { type: 'number', default: 50000 },
},
required: ['url'],
},
execute: async (id, params, signal) => {
// 1. 安全校验 (ClawDefender)
validateUrl(params.url); // 防止 SSRF
// 2. 发起 HTTP 请求
const response = await fetch(params.url, { signal });
// 3. 提取正文
const html = await response.text();
const readable = extractReadability(html); // 用 @mozilla/readability
// 4. 格式化输出
const output = params.extractMode === 'markdown'
? toMarkdown(readable)
: toText(readable);
// 5. 截断保护
return output.slice(0, params.maxChars ?? 50000);
},
};
以 exec(命令执行)为例:
// src/agents/tools/exec-tool.ts (简化)
const execTool = {
name: 'exec',
description: 'Run shell commands',
parameters: {
type: 'object',
properties: {
command: { type: 'string' },
timeout: { type: 'number' },
workdir: { type: 'string' },
background: { type: 'boolean' },
},
required: ['command'],
},
execute: async (id, params, signal, onUpdate) => {
// 1. 如果启用了沙箱,在容器中执行
if (sandboxEnabled) {
return await execInContainer(params, signal);
}
// 2. 否则在宿主机执行
const child = spawn('powershell', ['/c', params.command], {
cwd: params.workdir,
signal,
timeout: params.timeout ?? 30000,
});
// 3. 流式报告输出
child.stdout.on('data', (chunk) => {
onUpdate({ type: 'stdout', text: chunk.toString() });
});
// 4. 等待完成
return await child.complete();
},
};
六、流式响应处理
LLM 的输出不是等全部生成完才返回的。OpenClaw 通过事件订阅实时处理流式输出。
6.1 事件订阅系统
// src/agents/pi-embedded-subscribe.ts
function subscribeEmbeddedPiSession(params) {
const { session, onBlockReply, onPartialReply, onToolResult, ... } = params;
// 监听所有事件
session.on('message_start', () => { /* 新消息开始 */ });
session.on('message_update', (chunk) => { /* 流式文本块 */ });
session.on('message_end', () => { /* 消息结束 */ });
session.on('tool_execution_start', (event) => { /* 工具开始 */ });
session.on('tool_execution_update', (event) => { /* 工具进度 */ });
session.on('tool_execution_end', (event) => { /* 工具完成 */ });
session.on('turn_start', () => { /* 一轮开始 */ });
session.on('turn_end', () => { /* 一轮结束 */ });
session.on('agent_start', () => { /* 整个 agent 运行开始 */ });
session.on('agent_end', () => { /* 整个 agent 运行结束 */ });
session.on('auto_compaction_start', () => { /* 压缩开始 */ });
session.on('auto_compaction_end', (result) => { /* 压缩完成 */ });
}
6.2 Block Chunker(分块器)
流式输出是逐 token 到达的,但直接逐 token 发给用户会体验很差。OpenClaw 用 EmbeddedBlockChunker 做智能分块:
// src/agents/pi-embedded-block-chunker.ts
class EmbeddedBlockChunker {
private buffer = '';
private pendingCodeBlock = false;
addChunk(text: string): string[] {
this.buffer += text;
const blocks: string[] = [];
// 策略 1: 遇到双换行分段
if (this.buffer.includes('\n\n')) {
const parts = this.buffer.split('\n\n');
blocks.push(...parts.slice(0, -1));
this.buffer = parts[parts.length - 1];
}
// 策略 2: 代码块完整后发送
if (this.buffer.includes('```')) {
this.pendingCodeBlock = !this.pendingCodeBlock;
if (!this.pendingCodeBlock) {
blocks.push(this.buffer);
this.buffer = '';
}
}
// 策略 3: 超时强制发送 (避免长时间无输出)
// ...
return blocks;
}
}
6.3 回复指令解析
Agent 可以在回复中嵌入特殊指令:
// src/agents/pi-embedded-subscribe.handlers.ts
function consumeReplyDirectives(chunk: string) {
let text = chunk;
const mediaUrls: string[] = [];
let audioAsVoice = false;
let replyToId: string | null = null;
// [[media:url]] → 附加媒体文件
text = text.replace(/\[\[media:(https?:\/\/[^\]]+)\]\]/g, (_, url) => {
mediaUrls.push(url);
return '';
});
// [[voice]] → 用 TTS 语音发送
text = text.replace(/\[\[voice\]\]/g, () => {
audioAsVoice = true;
return '';
});
// [[reply_to:id]] → 引用回复
text = text.replace(/\[\[reply_to:(\d+)\]\]/g, (_, id) => {
replyToId = id;
return '';
});
return { text, mediaUrls, audioAsVoice, replyToId };
}
6.4 Thinking 标签剥离
LLM 可能输出 thinking 内容(思考过程),这些不应该发给用户:
function stripBlockTags(text: string, state: { thinking: boolean }) {
// 处理 🤔... 区块 (thinking 标记)
let result = '';
let i = 0;
while (i < text.length) {
if (text.startsWith('🤔', i)) {
state.thinking = true;
i += '🤔'.length;
continue;
}
if (text.startsWith('</thinking>', i)) {
state.thinking = false;
i += '</thinking>'.length;
continue;
}
if (!state.thinking) {
result += text[i];
}
i++;
}
// 如果 enforceFinalTag 模式,只提取 <final>...</final>
// ...
return result;
}
七、并发控制与队列
7.1 同一 Session 串行执行
// src/agents/pi-embedded-runner/runs.ts
const activeRuns = new Map<string, RunEntry>();
async function enqueueRun(sessionKey: string, runFn: () => Promise<void>) {
const existing = activeRuns.get(sessionKey);
if (existing) {
// 当前 session 有 run 在执行,排队
existing.queue.push(runFn);
return;
}
const entry = { current: runFn(), queue: [] };
activeRuns.set(sessionKey, entry);
try {
await entry.current;
} finally {
// 执行队列中的下一个
const next = entry.queue.shift();
if (next) {
activeRuns.set(sessionKey, { current: next(), queue: entry.queue });
} else {
activeRuns.delete(sessionKey);
}
}
}
为什么同一 session 必须串行? 因为 session 的 transcript 是 append-only 的,并发写入会导致数据损坏。
7.2 超时控制
const runResult = await Promise.race([
session.prompt(prompt, { images }),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('timeout')), params.timeoutMs)
),
]);
默认超时 120 秒。超时后自动清理 run 状态,释放并发锁。
八、错误处理与重试
8.1 错误分类
// src/agents/pi-embedded-helpers.ts
function classifyError(errorText: string): ErrorClass {
if (isContextOverflowError(errorText)) return 'context_overflow';
if (isCompactionFailureError(errorText)) return 'compaction_failure';
if (isAuthError(errorText)) return 'auth';
if (isRateLimitError(errorText)) return 'rate_limit';
if (isQuotaError(errorText)) return 'quota';
if (isTimeoutError(errorText)) return 'timeout';
return 'unknown';
}
8.2 Context Overflow 自动恢复
// 上下文溢出时自动压缩后重试
if (classifyError(errorText) === 'context_overflow') {
// 1. 触发 compaction (让 LLM 总结旧对话)
await compactSession(sessionManager, model, provider);
// 2. 用压缩后的历史重试
return await retryPrompt(compactedHistory);
}
8.3 Thinking Level 降级
// 如果 thinking level 不被模型支持,自动降级
if (isThinkingLevelError(errorText)) {
const fallback = pickFallbackThinkingLevel({
attempted: currentThinkingLevel,
message: errorText,
});
if (fallback) {
return await retryWithThinking(fallback);
}
}
8.4 Failover 链
// 可配置的多级降级
// models.providers.<p>.failover: { targets: [...] }
async function handleWithFailover(error: Error) {
if (!config.failover?.targets?.length) throw error;
const targets = config.failover.targets;
for (const target of targets) {
try {
return await runWithModel(target.provider, target.model);
} catch (e) {
continue; // 尝试下一个
}
}
throw error; // 所有 target 都失败
}
九、执行阶段的完整时序图
session.prompt("帮我查一下天气")
│
▼
┌─ 迭代 1 ─────────────────────────────────────────────┐
│ ① 构建 messages: [system, ...history, user_msg] │
│ ② sanitizeSessionHistory() (provider 适配) │
│ ③ limitHistoryTurns() (裁剪历史) │
│ ④ streamSimple(model, messages) → 调用 LLM │
│ ⑤ 响应: tool_calls: [{ name: "web_fetch", │
│ params: { url: "wttr.in" } }]│
│ ⑥ 执行 web_fetch("wttr.in/Beijing") │
│ ⑦ 结果: "北京 晴 15-25°C..." │
│ ⑧ 追加到 history → 继续迭代 │
└──────────────────────┬──────────────────────────────┘
│
▼
┌─ 迭代 2 ─────────────────────────────────────────────┐
│ ① 构建 messages: [system, ...history, tool_result] │
│ ② streamSimple(model, messages) → 调用 LLM │
│ ③ 响应: text: "今天北京晴,15-25°C,适合外出。" │
│ ④ 无 tool_calls → 循环结束 │
└──────────────────────┬──────────────────────────────┘
│
▼
┌─ 流式返回 ───────────────────────────────────────────┐
│ ① BlockChunker 分块 │
│ ② stripBlockTags() 剥离 thinking │
│ ③ consumeReplyDirectives() 解析 [[media]] 等 │
│ ④ onBlockReply() → Channel Provider 发送到 Telegram │
│ ⑤ 用户看到: "今天北京晴,15-25°C,适合外出。" │
└──────────────────────┬──────────────────────────────┘
│
▼
┌─ 后处理 ─────────────────────────────────────────────┐
│ ① 更新 sessions.json (token 计数, timestamp) │
│ ② SessionManager 追加到 jsonl │
│ ③ 检查 contextTokens vs contextWindow │
│ ④ 如果接近上限 → 触发 auto-compaction │
│ ⑤ 释放 run 锁 │
└──────────────────────────────────────────────────────┘
设计亮点总结
- Agentic Loop 本质 — 不是单次调用,而是"思考→行动→观察→再思考"的循环
- Transcript Hygiene — 上游解决 provider 差异,用户无感知
- Block Chunker 智能分块 — 不是逐 token 推送,而是按语义分段
- 工具签名适配器 — 桥接两个 SDK 的接口差异
- 同 Session 串行执行 — 保护 transcript 一致性
- 多层错误恢复 — overflow→压缩重试、thinking→降级、auth→failover
- 回复指令系统 — Agent 可以声明式地控制发送格式(媒体/语音/引用)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)