本文以 earendil-works/pi-mono 为样本,分析其 agent 核心包 @earendil-works/pi-agent-core 的三层 API 切分。所引用的代码位于 packages/agent/src/

引子:为什么要把 agent 拆成三层

绝大多数 agent 框架只暴露一个"大入口":传入消息和工具,等待最终回答。这种设计在 demo 阶段够用,但在工程落地时会撞到几堵墙:

  • 想做流式 UI,事件粒度不够细;
  • 想接持久化,框架内部状态拿不到;
  • 想在长时间运行时改模型 / 改工具 / 中途打断,没有安全的边界;
  • 想做崩溃恢复,事件流和持久化是耦合的。

pi 的解法是把 agent 拆成 三层 API,每层独立可用,能力递增

文件 角色
agentLoop packages/agent/src/agent-loop.ts 纯流式生成器,无状态
Agent packages/agent/src/agent.ts 有状态、可订阅、含队列与中途打断
AgentHarness packages/agent/src/harness/agent-harness.ts 持久化、phase 状态机、save point、hook

下文逐层拆解。


第一层:agentLoop —— 纯生成器,无状态

agentLoop 是整个 pi agent 体系的"原子 API"。它的签名极简:

// packages/agent/src/agent-loop.ts
export function agentLoop(
    prompts: AgentMessage[],
    context: AgentContext,
    config: AgentLoopConfig,
    signal?: AbortSignal,
    streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>;

export function agentLoopContinue(
    context: AgentContext,
    config: AgentLoopConfig,
    signal?: AbortSignal,
    streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]>;

它只做一件事:在 (messages, tools, model) 上循环执行 LLM 调用 → 解析 toolCall → 执行工具 → 把 toolResult 喂回,直到模型自然停止或工具显式 terminate它本身不持有任何状态——所有上下文通过参数传入,所有结果通过事件流和返回值传出。

1.1 事件序列

agentLoop 是一个 async generator(封装在 EventStream 里),输出固定语义的事件序列。一次没有工具调用的简单对话长这样:

agent_start
  turn_start
    message_start  { message: userMsg }
    message_end    { message: userMsg }
    message_start  { message: <partial assistant> }
    message_update { delta, contentIndex, partial } x N
    message_end    { message: <complete assistant> }
  turn_end         { message, toolResults: [] }
agent_end          { messages: [...] }

如果 assistant 触发了工具调用,会在 message_end 之后插入工具执行序列,然后进入下一个 turn

turn_end       { message, toolResults: [] }     ← 之前的 turn 结束
  tool_execution_start  { toolCallId, toolName, args }
  tool_execution_update { partial }              ← 工具流式回报(可选)
  tool_execution_end    { toolCallId, result }
  message_start         { toolResultMsg }
  message_end           { toolResultMsg }
turn_start                                       ← 新 turn 开始
  message_start { partial assistant }
  ...

每个事件都带有 contentIndex 这种字段,用于把 streaming delta 对应回一个 content block。消费者必须用 contentIndex 关联,不能假设 *_start / *_delta / *_end 是连续不交错的——因为同一个 provider chunk 里可能同时包含 text、thinking、toolCall 三种 delta。这点 README 里有专门强调。

1.2 主循环结构

runLoop 的核心简化一下:

async function runLoop(initialContext, newMessages, initialConfig, signal, emit, streamFn) {
    let currentContext = initialContext;
    let config = initialConfig;
    let pendingMessages = (await config.getSteeringMessages?.()) || [];

    // 外层循环:处理 follow-up(agent 本来要停时追加的消息)
    while (true) {
        let hasMoreToolCalls = true;

        // 内层循环:一个 turn 接一个 turn
        while (hasMoreToolCalls || pendingMessages.length > 0) {
            await emit({ type: "turn_start" });

            // 1) 先把 steer 队列里的消息注入
            for (const message of pendingMessages) {
                await emit({ type: "message_start", message });
                await emit({ type: "message_end", message });
                currentContext.messages.push(message);
                newMessages.push(message);
            }
            pendingMessages = [];

            // 2) 调 LLM
            const message = await streamAssistantResponse(currentContext, config, signal, emit, streamFn);
            newMessages.push(message);

            // 3) 处理工具调用
            const toolCalls = message.content.filter((c) => c.type === "toolCall");
            const toolResults = [];
            hasMoreToolCalls = false;
            if (toolCalls.length > 0) {
                const batch = await executeToolCalls(currentContext, message, config, signal, emit);
                toolResults.push(...batch.messages);
                hasMoreToolCalls = !batch.terminate;
                for (const r of toolResults) {
                    currentContext.messages.push(r);
                    newMessages.push(r);
                }
            }
            await emit({ type: "turn_end", message, toolResults });

            // 4) save point:可让上层刷盘 / 替换 snapshot
            const snapshot = await config.prepareNextTurn?.({ message, toolResults, context: currentContext, newMessages });
            if (snapshot) {
                currentContext = snapshot.context ?? currentContext;
                config = { ...config, model: snapshot.model ?? config.model, ... };
            }

            // 5) 显式停止信号
            if (await config.shouldStopAfterTurn?.({ ... })) {
                await emit({ type: "agent_end", messages: newMessages });
                return;
            }

            // 6) 检查 steer 队列
            pendingMessages = (await config.getSteeringMessages?.()) || [];
        }

        // 内层退出 → agent 本来要停。检查 follow-up 队列
        const followUps = (await config.getFollowUpMessages?.()) || [];
        if (followUps.length > 0) {
            pendingMessages = followUps;
            continue;
        }
        break;
    }
    await emit({ type: "agent_end", messages: newMessages });
}

几处关键设计:

消息流转的边界:context 里的消息一直是 AgentMessage[](pi 自己的扩展类型,可包含 custom 消息),只有在调 LLM 那一刻才用 convertToLlm 转成标准 Message[]。这让上层可以在 context 里塞应用层消息(通知、模型切换标记等),不影响 LLM 调用。

prepareNextTurn 是 save point 的钩子:每个 turn 结束后调一次,上层可以返回新的 context / model / thinkingLevel,下个 turn 自动应用。这是后面 AgentHarness 实现"运行中改模型"的关键基础设施。

工具执行模式

// packages/agent/src/agent-loop.ts
const hasSequentialToolCall = toolCalls.some(
    (tc) => currentContext.tools?.find((t) => t.name === tc.name)?.executionMode === "sequential",
);
if (config.toolExecution === "sequential" || hasSequentialToolCall) {
    return executeToolCallsSequential(...);
}
return executeToolCallsParallel(...);

只要单批工具调用里有任何一个工具被声明为 sequential,整批降级串行。这是保守设计——避免"串行写文件"和"并行 grep" 同时存在时把 grep 跑在 write 之前。

工具结果的 terminate 仲裁

工具可以返回 terminate: true 提示 agent “完事了别再 LLM 调用”。但只有整批工具结果都说 terminate 时才生效,混合 batch 仍然继续。

1.3 适用场景

直接用 agentLoop 的场景:

  • 完全无状态的批处理(“读取这 100 个 issue,对每个跑一次 agent”);
  • 只想要事件流,自己实现状态管理;
  • 嵌入到现有框架里,不需要 pi 的 state 抽象。

它不解决以下问题:

  • 多个订阅者同时监听事件;
  • 跑到一半改 model / tools / systemPrompt;
  • 把消息持久化到磁盘;
  • 中途打断(abort 之外的精细控制)。

需要这些能力,就要用第二层。


第二层:Agent —— 加状态、订阅、队列

Agent 是把 agentLoop 封进一个有状态对象

// packages/agent/src/agent.ts
class Agent {
    state: {
        systemPrompt: string;
        model: Model<any>;
        thinkingLevel: ThinkingLevel;
        tools: AgentTool[];
        messages: AgentMessage[];
        readonly isStreaming: boolean;
        readonly streamingMessage?: AgentMessage;
        readonly pendingToolCalls: ReadonlySet<string>;
    };

    subscribe(listener: (event: AgentEvent, signal?: AbortSignal) => Promise<void> | void): () => void;
    async prompt(message: string, images?: ImageContent[]): Promise<void>;
    async continue(): Promise<void>;          // 不加新消息,从当前 context 续跑
    steer(message: AgentMessage): void;        // turn 边界注入(打断)
    followUp(message: AgentMessage): void;     // agent 本要停时追加
    abort(): void;
    async waitForIdle(): Promise<void>;
    reset(): void;
}

2.1 它额外做了什么

第一,agentLoop 需要的 AgentLoopConfig 全部从 state 派生。改 agent.state.toolsagent.state.systemPromptagent.state.model 都立刻反映到下一个 turn。

第二,提供 steer / followUp 队列

// packages/agent/src/agent.ts
class PendingMessageQueue {
    private messages: AgentMessage[] = [];
    public mode: QueueMode;          // "all" | "one-at-a-time"

    enqueue(message: AgentMessage): void {
        this.messages.push(message);
    }

    drain(): AgentMessage[] {
        if (this.mode === "all") {
            const drained = this.messages.slice();
            this.messages = [];
            return drained;
        }
        const drained = this.messages.length ? [this.messages.shift()!] : [];
        return drained;
    }
}

Agent 持有两个队列:steeringQueuefollowUpQueue。它把这两个队列的 drain() 接成 agentLoop 配置里的 getSteeringMessages / getFollowUpMessages 钩子,于是 agentLoop 在两个标准切点(turn 边界、agent 本要 end 时)会自动 poll 队列。

第三,订阅者按注册顺序串行 await

agent.subscribe(listenerA);
agent.subscribe(listenerB);
// 每个事件先 await listenerA 再 await listenerB
// agent_end 事件的 listener 完成后,prompt() 才 resolve

这很重要,意味着 await agent.prompt(...) 完成时,所有订阅者已经处理完最后一个事件。pi-coding-agent 利用这点在 message_end 上做"先持久化再通知 UI"。

第四,agent.continue():从当前 context 续跑(不加新消息)。最后一条消息必须是 usertoolResult。专门为重试设计——比如 provider 临时报错,捕获后调 continue(),agent 会用同一段消息再发一次请求。

2.2 适用场景

Agent 已经能撑起一个浏览器内的对话 UI:

  • React/Vue 组件订阅事件渲染流式消息;
  • 用户点"打断"按钮调 agent.steer(...)
  • 用户改模型直接 agent.state.model = newModel,下个 turn 生效;
  • 重试就是 agent.continue()

它不解决的:

  • 把消息按事件落到 .jsonl 文件,崩溃后能恢复;
  • 长时间运行中改配置时的 race(运行中改 model 会污染当前 provider 请求吗);
  • 多种"结构性操作"互斥(比如不能同时跑 prompt 和 compact);
  • 对外提供 hook / extension。

需要这些就要用第三层。


第三层:AgentHarness —— phase 状态机 + 持久化 + hook

AgentHarness 是 pi 的"工程级"agent 抽象,文件位于 packages/agent/src/harness/agent-harness.ts,配套文档在 packages/agent/docs/agent-harness.md

3.1 phase 状态机

最核心的概念:

type AgentHarnessPhase = "idle" | "turn" | "compaction" | "branch_summary" | "retry";

哪些操作要 idle、哪些可以在 turn 中调,规定得很死:

操作 要求
prompt / skill / promptFromTemplate / compact / navigateTree 必须 idle,否则 AgentHarnessError(code: "busy")
steer / followUp / nextTurn / abort turn 中允许
所有 setter(setModel / setTools / setResources / …) 任何时候都允许,但只影响下一个 turn snapshot

进入结构性操作前同步置位 phase,第一个 await 之前就把状态设好;操作结束(无论成功失败)finally 复位。这样 race 完全可控。

3.2 双层 state:harness config vs turn snapshot

这是整个 harness 设计中最值得抄走的一招。

harness config 是上层最新设置,由 getter 返回:

harness.getModel()         → 当前最新 model
harness.getTools()         → 当前最新 tools
harness.getResources()     → 当前最新 resources
harness.getSystemPrompt()  → 最新 system prompt

turn snapshot 是这一轮 LLM 调用真正用的快照,由 createTurnState() 在 turn 开始时生成:

[turn_start 时刻]
  snapshot = createTurnState(harness)
    ├─ messages         ← 从 session 读
    ├─ resources        ← shallow copy harness.resources
    ├─ systemPrompt     ← 调用 systemPromptProvider 一次
    ├─ model            ← 当前 harness.model
    ├─ thinkingLevel    ← 当前 harness.thinkingLevel
    ├─ tools / activeTools
    ├─ streamOptions    ← shallow copy
    └─ sessionId

整个 turn 内的 agentLoop 都用这个 snapshot。turn 进行中改 harness.model 不会污染当前的 provider 请求——但下一个 turn(save point 之后)会立刻看到新值。

代码上,harness 把 prepareNextTurn 钩子接到自己的 save point 逻辑上,每个 turn 结束时:

  1. flush pending session writes(hook / 扩展产生的写入按 FIFO 落盘)
  2. 重建 turn snapshot
  3. 把 snapshot 的 context / model / reasoning / streamOptions 应用到 agentLoop 配置
// packages/agent/src/agent-loop.ts
const nextTurnSnapshot = await config.prepareNextTurn?.(nextTurnContext);
if (nextTurnSnapshot) {
    currentContext = nextTurnSnapshot.context ?? currentContext;
    config = {
        ...config,
        model: nextTurnSnapshot.model ?? config.model,
        reasoning:
            nextTurnSnapshot.thinkingLevel === undefined
                ? config.reasoning
                : nextTurnSnapshot.thinkingLevel === "off"
                    ? undefined
                    : nextTurnSnapshot.thinkingLevel,
    };
}

3.3 Pending session writes:保证写入顺序

session 是底层持久化抽象(见 packages/agent/src/harness/session/),在 idle 时直接写、在 turn 中写要排队。

为什么要排队:agent 自己的消息(assistant message、toolResult)由 message_end 事件立刻落盘,而 hook / extension 在 turn 中产生的写入必须等待 agent 这一轮的消息全部落盘后再写,否则文件里的事件顺序就和实际事件顺序对不上。

排队规则:

  • idle 时写 → 立刻持久化;
  • 结构性操作进行中写 → 入队 pending session writes
  • save point(每个 turn 结束)→ 按 FIFO flush;
  • agent_end → 再 flush 一次;
  • 操作失败的 cleanup 路径也会 flush;
  • abort 不丢 pending writes,下次 save point 还在。

3.4 错误分层:低层 Result,高层 throw

低层 capability(filesystem、shell、resource loading、compaction helpers)用 Result<T, E>

type Result<TValue, TError> =
    | { ok: true; value: TValue }
    | { ok: false; error: TError };

这些层级不抛异常,调用者必须显式处理 result.ok

高层(SessionAgentHarness)回到 throw + AgentHarnessError,并保留底层错误为 cause

class AgentHarnessError extends Error {
    code: "busy" | "hook" | "session" | "provider" | ...;
    cause?: unknown;
}

好处:低层不会因为某次 fs 失败把整个 harness 炸了;高层 API 又有清晰的错误语义供调用者 catch。

3.5 hook 系统

AgentHarness 是 pi 唯一的扩展注入点,主要 hook:

Hook 时机 用途
beforeToolCall tool 参数验证后、execute 前 拒绝 / 改参数 / 注入 audit
afterToolCall execute 后、tool_execution_end 前 改结果 / 强制 terminate
before_provider_request LLM 请求发出前 改 stream options / 注入 secrets / 拒绝
before_provider_payload payload 序列化前 修改最终 payload
after_provider_response 响应回到 agent 前 审计 / 记账

Hook 设计正在向 typed reducer 演进(packages/agent/docs/hooks.md),目标是每个事件类型对应一个 reducer,extension 只需声明感兴趣的事件 + 提供 reducer。

3.6 durable harness:正在做的事

packages/agent/docs/durable-harness.md 描述了一个进行中的方向:把 harness 自己的 state(队列、pending writes、turn / provider request / tool call 边界)也作为 entry 写进 session.jsonl,让宿主进程崩溃后能从 session 重放出 harness state。这是把 append-only event log 思想推到极致——session 不只是消息历史,而是整个 agent 的 durable state machine。


三层 API 的取舍总结

维度 agentLoop Agent AgentHarness
状态 内存 内存 + 持久化
订阅者 单一 emit 回调 多订阅、按序 await 同 Agent + hook
队列 getSteeringMessages 钩子 steer / followUp 队列 同 Agent + nextTurn
配置变更 调用方负责 改 state 立即生效 turn snapshot 隔离 race
持久化 自动 + pending writes
错误 任由抛出 任由抛出 AgentHarnessError 归一
扩展 hook + extension
适用 批处理、嵌入 单页面对话 工程化产品

实际项目中这三层经常叠加使用coding-agent 包里的 AgentSession 就是在 AgentHarness 上再封一层(加内置工具、bash 执行、retry、auto-compaction),而它内部仍然是用 agentLoop 在跑。


写在最后

把 agent 的能力切成三层,而不是用一个大对象兜住所有概念,是 pi 给同类项目最有价值的启发。它意味着:

  • 测试可分层:测 agentLoop 不需要持久化,测 Agent 不需要 hook,测 AgentHarness 才关心边界与 race;
  • 复用可分层:实现一个"无界面批处理"只用 agentLoop,几十行;
  • 稳定性可分层:底层 generator 一旦稳定,上层可以快速迭代。

对正在自研 agent 框架的工程团队,最直接的可借鉴点:把"状态"、“事件”、“持久化”、"配置变更安全性"这四件事拆开实现,每一件都有明确的边界。而 turn snapshot vs harness config 的双层 state 模型,是处理"运行时配置变更"最干净的解法之一,值得直接抄进自己的项目。

仓库地址:https://github.com/earendil-works/pi-mono
关键文档:packages/agent/docs/agent-harness.mdpackages/agent/docs/durable-harness.mdpackages/agent/docs/hooks.md

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐