Agent 三层 API 设计:agentLoop / Agent / AgentHarness 一个比一个高级
本文以 earendil-works/pi-mono 为样本,分析其 agent 核心包
@earendil-works/pi-agent-core的三层 API 切分。所引用的代码位于packages/agent/src/。
引子:为什么要把 agent 拆成三层
绝大多数 agent 框架只暴露一个"大入口":传入消息和工具,等待最终回答。这种设计在 demo 阶段够用,但在工程落地时会撞到几堵墙:
- 想做流式 UI,事件粒度不够细;
- 想接持久化,框架内部状态拿不到;
- 想在长时间运行时改模型 / 改工具 / 中途打断,没有安全的边界;
- 想做崩溃恢复,事件流和持久化是耦合的。
pi 的解法是把 agent 拆成 三层 API,每层独立可用,能力递增:
| 层 | 文件 | 角色 |
|---|---|---|
agentLoop |
packages/agent/src/agent-loop.ts |
纯流式生成器,无状态 |
Agent |
packages/agent/src/agent.ts |
有状态、可订阅、含队列与中途打断 |
AgentHarness |
packages/agent/src/harness/agent-harness.ts |
持久化、phase 状态机、save point、hook |
下文逐层拆解。
第一层:agentLoop —— 纯生成器,无状态
agentLoop 是整个 pi agent 体系的"原子 API"。它的签名极简:
// packages/agent/src/agent-loop.ts
export function agentLoop(
prompts: AgentMessage[],
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>;
export function agentLoopContinue(
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>;
它只做一件事:在 (messages, tools, model) 上循环执行 LLM 调用 → 解析 toolCall → 执行工具 → 把 toolResult 喂回,直到模型自然停止或工具显式 terminate。它本身不持有任何状态——所有上下文通过参数传入,所有结果通过事件流和返回值传出。
1.1 事件序列
agentLoop 是一个 async generator(封装在 EventStream 里),输出固定语义的事件序列。一次没有工具调用的简单对话长这样:
agent_start
turn_start
message_start { message: userMsg }
message_end { message: userMsg }
message_start { message: <partial assistant> }
message_update { delta, contentIndex, partial } x N
message_end { message: <complete assistant> }
turn_end { message, toolResults: [] }
agent_end { messages: [...] }
如果 assistant 触发了工具调用,会在 message_end 之后插入工具执行序列,然后进入下一个 turn:
turn_end { message, toolResults: [] } ← 之前的 turn 结束
tool_execution_start { toolCallId, toolName, args }
tool_execution_update { partial } ← 工具流式回报(可选)
tool_execution_end { toolCallId, result }
message_start { toolResultMsg }
message_end { toolResultMsg }
turn_start ← 新 turn 开始
message_start { partial assistant }
...
每个事件都带有 contentIndex 这种字段,用于把 streaming delta 对应回一个 content block。消费者必须用 contentIndex 关联,不能假设 *_start / *_delta / *_end 是连续不交错的——因为同一个 provider chunk 里可能同时包含 text、thinking、toolCall 三种 delta。这点 README 里有专门强调。
1.2 主循环结构
把 runLoop 的核心简化一下:
async function runLoop(initialContext, newMessages, initialConfig, signal, emit, streamFn) {
let currentContext = initialContext;
let config = initialConfig;
let pendingMessages = (await config.getSteeringMessages?.()) || [];
// 外层循环:处理 follow-up(agent 本来要停时追加的消息)
while (true) {
let hasMoreToolCalls = true;
// 内层循环:一个 turn 接一个 turn
while (hasMoreToolCalls || pendingMessages.length > 0) {
await emit({ type: "turn_start" });
// 1) 先把 steer 队列里的消息注入
for (const message of pendingMessages) {
await emit({ type: "message_start", message });
await emit({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
}
pendingMessages = [];
// 2) 调 LLM
const message = await streamAssistantResponse(currentContext, config, signal, emit, streamFn);
newMessages.push(message);
// 3) 处理工具调用
const toolCalls = message.content.filter((c) => c.type === "toolCall");
const toolResults = [];
hasMoreToolCalls = false;
if (toolCalls.length > 0) {
const batch = await executeToolCalls(currentContext, message, config, signal, emit);
toolResults.push(...batch.messages);
hasMoreToolCalls = !batch.terminate;
for (const r of toolResults) {
currentContext.messages.push(r);
newMessages.push(r);
}
}
await emit({ type: "turn_end", message, toolResults });
// 4) save point:可让上层刷盘 / 替换 snapshot
const snapshot = await config.prepareNextTurn?.({ message, toolResults, context: currentContext, newMessages });
if (snapshot) {
currentContext = snapshot.context ?? currentContext;
config = { ...config, model: snapshot.model ?? config.model, ... };
}
// 5) 显式停止信号
if (await config.shouldStopAfterTurn?.({ ... })) {
await emit({ type: "agent_end", messages: newMessages });
return;
}
// 6) 检查 steer 队列
pendingMessages = (await config.getSteeringMessages?.()) || [];
}
// 内层退出 → agent 本来要停。检查 follow-up 队列
const followUps = (await config.getFollowUpMessages?.()) || [];
if (followUps.length > 0) {
pendingMessages = followUps;
continue;
}
break;
}
await emit({ type: "agent_end", messages: newMessages });
}
几处关键设计:
消息流转的边界:context 里的消息一直是 AgentMessage[](pi 自己的扩展类型,可包含 custom 消息),只有在调 LLM 那一刻才用 convertToLlm 转成标准 Message[]。这让上层可以在 context 里塞应用层消息(通知、模型切换标记等),不影响 LLM 调用。
prepareNextTurn 是 save point 的钩子:每个 turn 结束后调一次,上层可以返回新的 context / model / thinkingLevel,下个 turn 自动应用。这是后面 AgentHarness 实现"运行中改模型"的关键基础设施。
工具执行模式:
// packages/agent/src/agent-loop.ts
const hasSequentialToolCall = toolCalls.some(
(tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential",
);
if (config.toolExecution === "sequential" || hasSequentialToolCall) {
return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);
只要单批工具调用里有任何一个工具被声明为 sequential,整批降级串行。这是保守设计——避免"串行写文件"和"并行 grep" 同时存在时把 grep 跑在 write 之前。
工具结果的 terminate 仲裁:
工具可以返回 terminate: true 提示 agent “完事了别再 LLM 调用”。但只有整批工具结果都说 terminate 时才生效,混合 batch 仍然继续。
1.3 适用场景
直接用 agentLoop 的场景:
- 完全无状态的批处理(“读取这 100 个 issue,对每个跑一次 agent”);
- 只想要事件流,自己实现状态管理;
- 嵌入到现有框架里,不需要 pi 的 state 抽象。
它不解决以下问题:
- 多个订阅者同时监听事件;
- 跑到一半改 model / tools / systemPrompt;
- 把消息持久化到磁盘;
- 中途打断(abort 之外的精细控制)。
需要这些能力,就要用第二层。
第二层:Agent —— 加状态、订阅、队列
Agent 是把 agentLoop 封进一个有状态对象:
// packages/agent/src/agent.ts
class Agent {
state: {
systemPrompt: string;
model: Model<any>;
thinkingLevel: ThinkingLevel;
tools: AgentTool[];
messages: AgentMessage[];
readonly isStreaming: boolean;
readonly streamingMessage?: AgentMessage;
readonly pendingToolCalls: ReadonlySet<string>;
};
subscribe(listener: (event: AgentEvent, signal?: AbortSignal) => Promise<void> | void): () => void;
async prompt(message: string, images?: ImageContent[]): Promise<void>;
async continue(): Promise<void>; // 不加新消息,从当前 context 续跑
steer(message: AgentMessage): void; // turn 边界注入(打断)
followUp(message: AgentMessage): void; // agent 本要停时追加
abort(): void;
async waitForIdle(): Promise<void>;
reset(): void;
}
2.1 它额外做了什么
第一,把 agentLoop 需要的 AgentLoopConfig 全部从 state 派生。改 agent.state.tools、agent.state.systemPrompt、agent.state.model 都立刻反映到下一个 turn。
第二,提供 steer / followUp 队列:
// packages/agent/src/agent.ts
class PendingMessageQueue {
private messages: AgentMessage[] = [];
public mode: QueueMode; // "all" | "one-at-a-time"
enqueue(message: AgentMessage): void {
this.messages.push(message);
}
drain(): AgentMessage[] {
if (this.mode === "all") {
const drained = this.messages.slice();
this.messages = [];
return drained;
}
const drained = this.messages.length ? [this.messages.shift()!] : [];
return drained;
}
}
Agent 持有两个队列:steeringQueue 和 followUpQueue。它把这两个队列的 drain() 接成 agentLoop 配置里的 getSteeringMessages / getFollowUpMessages 钩子,于是 agentLoop 在两个标准切点(turn 边界、agent 本要 end 时)会自动 poll 队列。
第三,订阅者按注册顺序串行 await:
agent.subscribe(listenerA);
agent.subscribe(listenerB);
// 每个事件先 await listenerA 再 await listenerB
// agent_end 事件的 listener 完成后,prompt() 才 resolve
这很重要,意味着 await agent.prompt(...) 完成时,所有订阅者已经处理完最后一个事件。pi-coding-agent 利用这点在 message_end 上做"先持久化再通知 UI"。
第四,agent.continue():从当前 context 续跑(不加新消息)。最后一条消息必须是 user 或 toolResult。专门为重试设计——比如 provider 临时报错,捕获后调 continue(),agent 会用同一段消息再发一次请求。
2.2 适用场景
Agent 已经能撑起一个浏览器内的对话 UI:
- React/Vue 组件订阅事件渲染流式消息;
- 用户点"打断"按钮调
agent.steer(...); - 用户改模型直接
agent.state.model = newModel,下个 turn 生效; - 重试就是
agent.continue()。
它不解决的:
- 把消息按事件落到
.jsonl文件,崩溃后能恢复; - 长时间运行中改配置时的 race(运行中改 model 会污染当前 provider 请求吗);
- 多种"结构性操作"互斥(比如不能同时跑 prompt 和 compact);
- 对外提供 hook / extension。
需要这些就要用第三层。
第三层:AgentHarness —— phase 状态机 + 持久化 + hook
AgentHarness 是 pi 的"工程级"agent 抽象,文件位于 packages/agent/src/harness/agent-harness.ts,配套文档在 packages/agent/docs/agent-harness.md。
3.1 phase 状态机
最核心的概念:
type AgentHarnessPhase = "idle" | "turn" | "compaction" | "branch_summary" | "retry";
哪些操作要 idle、哪些可以在 turn 中调,规定得很死:
| 操作 | 要求 |
|---|---|
prompt / skill / promptFromTemplate / compact / navigateTree |
必须 idle,否则 AgentHarnessError(code: "busy") |
steer / followUp / nextTurn / abort |
turn 中允许 |
| 所有 setter(setModel / setTools / setResources / …) | 任何时候都允许,但只影响下一个 turn snapshot |
进入结构性操作前同步置位 phase,第一个 await 之前就把状态设好;操作结束(无论成功失败)finally 复位。这样 race 完全可控。
3.2 双层 state:harness config vs turn snapshot
这是整个 harness 设计中最值得抄走的一招。
harness config 是上层最新设置,由 getter 返回:
harness.getModel() → 当前最新 model
harness.getTools() → 当前最新 tools
harness.getResources() → 当前最新 resources
harness.getSystemPrompt() → 最新 system prompt
turn snapshot 是这一轮 LLM 调用真正用的快照,由 createTurnState() 在 turn 开始时生成:
[turn_start 时刻]
snapshot = createTurnState(harness)
├─ messages ← 从 session 读
├─ resources ← shallow copy harness.resources
├─ systemPrompt ← 调用 systemPromptProvider 一次
├─ model ← 当前 harness.model
├─ thinkingLevel ← 当前 harness.thinkingLevel
├─ tools / activeTools
├─ streamOptions ← shallow copy
└─ sessionId
整个 turn 内的 agentLoop 都用这个 snapshot。turn 进行中改 harness.model 不会污染当前的 provider 请求——但下一个 turn(save point 之后)会立刻看到新值。
代码上,harness 把 prepareNextTurn 钩子接到自己的 save point 逻辑上,每个 turn 结束时:
- flush pending session writes(hook / 扩展产生的写入按 FIFO 落盘)
- 重建 turn snapshot
- 把 snapshot 的
context / model / reasoning / streamOptions应用到agentLoop配置
// packages/agent/src/agent-loop.ts
const nextTurnSnapshot = await config.prepareNextTurn?.(nextTurnContext);
if (nextTurnSnapshot) {
currentContext = nextTurnSnapshot.context ?? currentContext;
config = {
...config,
model: nextTurnSnapshot.model ?? config.model,
reasoning:
nextTurnSnapshot.thinkingLevel === undefined
? config.reasoning
: nextTurnSnapshot.thinkingLevel === "off"
? undefined
: nextTurnSnapshot.thinkingLevel,
};
}
3.3 Pending session writes:保证写入顺序
session 是底层持久化抽象(见 packages/agent/src/harness/session/),在 idle 时直接写、在 turn 中写要排队。
为什么要排队:agent 自己的消息(assistant message、toolResult)由 message_end 事件立刻落盘,而 hook / extension 在 turn 中产生的写入必须等待 agent 这一轮的消息全部落盘后再写,否则文件里的事件顺序就和实际事件顺序对不上。
排队规则:
- idle 时写 → 立刻持久化;
- 结构性操作进行中写 → 入队
pending session writes; - save point(每个 turn 结束)→ 按 FIFO flush;
- agent_end → 再 flush 一次;
- 操作失败的 cleanup 路径也会 flush;
- abort 不丢 pending writes,下次 save point 还在。
3.4 错误分层:低层 Result,高层 throw
低层 capability(filesystem、shell、resource loading、compaction helpers)用 Result<T, E>:
type Result<TValue, TError> =
| { ok: true; value: TValue }
| { ok: false; error: TError };
这些层级不抛异常,调用者必须显式处理 result.ok。
高层(Session、AgentHarness)回到 throw + AgentHarnessError,并保留底层错误为 cause:
class AgentHarnessError extends Error {
code: "busy" | "hook" | "session" | "provider" | ...;
cause?: unknown;
}
好处:低层不会因为某次 fs 失败把整个 harness 炸了;高层 API 又有清晰的错误语义供调用者 catch。
3.5 hook 系统
AgentHarness 是 pi 唯一的扩展注入点,主要 hook:
| Hook | 时机 | 用途 |
|---|---|---|
beforeToolCall |
tool 参数验证后、execute 前 | 拒绝 / 改参数 / 注入 audit |
afterToolCall |
execute 后、tool_execution_end 前 | 改结果 / 强制 terminate |
before_provider_request |
LLM 请求发出前 | 改 stream options / 注入 secrets / 拒绝 |
before_provider_payload |
payload 序列化前 | 修改最终 payload |
after_provider_response |
响应回到 agent 前 | 审计 / 记账 |
Hook 设计正在向 typed reducer 演进(packages/agent/docs/hooks.md),目标是每个事件类型对应一个 reducer,extension 只需声明感兴趣的事件 + 提供 reducer。
3.6 durable harness:正在做的事
packages/agent/docs/durable-harness.md 描述了一个进行中的方向:把 harness 自己的 state(队列、pending writes、turn / provider request / tool call 边界)也作为 entry 写进 session.jsonl,让宿主进程崩溃后能从 session 重放出 harness state。这是把 append-only event log 思想推到极致——session 不只是消息历史,而是整个 agent 的 durable state machine。
三层 API 的取舍总结
| 维度 | agentLoop |
Agent |
AgentHarness |
|---|---|---|---|
| 状态 | 无 | 内存 | 内存 + 持久化 |
| 订阅者 | 单一 emit 回调 | 多订阅、按序 await | 同 Agent + hook |
| 队列 | getSteeringMessages 钩子 |
steer / followUp 队列 | 同 Agent + nextTurn |
| 配置变更 | 调用方负责 | 改 state 立即生效 | turn snapshot 隔离 race |
| 持久化 | 无 | 无 | 自动 + pending writes |
| 错误 | 任由抛出 | 任由抛出 | AgentHarnessError 归一 |
| 扩展 | 无 | 无 | hook + extension |
| 适用 | 批处理、嵌入 | 单页面对话 | 工程化产品 |
实际项目中这三层经常叠加使用:coding-agent 包里的 AgentSession 就是在 AgentHarness 上再封一层(加内置工具、bash 执行、retry、auto-compaction),而它内部仍然是用 agentLoop 在跑。
写在最后
把 agent 的能力切成三层,而不是用一个大对象兜住所有概念,是 pi 给同类项目最有价值的启发。它意味着:
- 测试可分层:测
agentLoop不需要持久化,测Agent不需要 hook,测AgentHarness才关心边界与 race; - 复用可分层:实现一个"无界面批处理"只用
agentLoop,几十行; - 稳定性可分层:底层 generator 一旦稳定,上层可以快速迭代。
对正在自研 agent 框架的工程团队,最直接的可借鉴点:把"状态"、“事件”、“持久化”、"配置变更安全性"这四件事拆开实现,每一件都有明确的边界。而 turn snapshot vs harness config 的双层 state 模型,是处理"运行时配置变更"最干净的解法之一,值得直接抄进自己的项目。
仓库地址:https://github.com/earendil-works/pi-mono
关键文档:packages/agent/docs/agent-harness.md、packages/agent/docs/durable-harness.md、packages/agent/docs/hooks.md
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)