1、Learn Harness Engineering介绍

通过网盘分享的文件:学习 Harness Engineering
链接: https://pan.baidu.com/s/1Z9YVAvccnw5qY3azpxWnNw?pwd=tyui 提取码: tyui

2-核心循环与工具调度

一、什么是 Agent Harness?

Agent Harness(代理线束)是连接 LLM 和真实世界的"桥梁"。它的核心职责只有一件事:

循环调用 LLM,执行工具,把结果反馈给 LLM,直到 LLM 决定停止。

这个模式是所有 AI Agent 系统的基石。无论是 Claude Code、Cursor、Devin,还是任何 AI 编程助手,底层都是这个 while 循环。

核心公式

while stop_reason == "tool_use":
    response = LLM(messages, tools)
    execute tools
    append results

流程图

+----------+      +-------+      +---------+
|   用户   | ---> |  LLM  | ---> |  工具   |
|  输入    |      |       |      |  执行   |
+----------+      +---+---+      +----+----+
                      ^               |
                      |   tool_result |
                      +---------------+
                      (循环继续,直到模型决定停止)

二、s01:最简核心循环

2.1 关键组件

组件 作用
client Anthropic SDK 客户端,负责调用 LLM API
MODEL 模型 ID,如 claude-sonnet-4-20250514
SYSTEM 系统提示词,告诉模型它的角色和规则
TOOLS 工具定义列表,描述模型可以调用什么工具
messages 完整的对话历史,跨轮次保持上下文

2.2 工具定义格式

工具定义是一个 JSON Schema,告诉 LLM "你有什么工具可用、每个工具接受什么参数":

TOOLS = [{
    "name": "bash",                              # 工具名
    "description": "Run a shell command.",        # 描述(模型据此判断何时使用)
    "input_schema": {                             # 参数 Schema
        "type": "object",
        "properties": {
            "command": {"type": "string"}         # 参数名和类型
        },
        "required": ["command"],                  # 必填参数
    },
}]

重要理解:这个定义不是给人看的,是给 LLM 看的。LLM 读了这个"说明书"后,就知道可以调用名为 bash 的工具,传入一个 command 字符串参数。

2.3 核心循环代码解析

def agent_loop(messages: list):
    while True:
        # 第一步:调用 LLM
        response = client.messages.create(
            model=MODEL,
            system=SYSTEM,
            messages=messages,
            tools=TOOLS,
            max_tokens=8000,
        )
​
        # 第二步:保存 LLM 回复到消息历史
        messages.append({"role": "assistant", "content": response.content})
​
        # 第三步:检查是否还要继续
        # stop_reason == "tool_use" → 模型想调用工具,继续
        # stop_reason == "end_turn" → 模型说完了,退出
        if response.stop_reason != "tool_use":
            return
​
        # 第四步:执行工具调用
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,   # 必须匹配!
                    "content": output,
                })
​
        # 第五步:工具结果以 user 角色加入历史
        messages.append({"role": "user", "content": results})
        # → 回到 while True 开头

2.4 关键细节

  1. tool_use_id 必须匹配:API 用 block.id 来关联"哪个工具调用"对应"哪个结果"。如果 ID 不匹配,API 会报错。

  2. tool_result 必须以 user 角色发送:这是 Anthropic API 的规定。即使结果来自工具执行,也要包装成 user 消息。

  3. response.content 是一个列表:可能同时包含 text 块(文字回复)和 tool_use 块(工具调用请求)。

  4. 安全措施

    • 危险命令检测(rm -rf /sudo 等)

    • 超时限制(120 秒)

    • 输出长度限制(50000 字符,防止撑爆上下文)


三、s02:工具调度模式

3.1 从一个工具到多个工具

s01 只有 bash 一个工具,s02 扩展到了四个:

工具名 功能 典型用途
bash 执行 shell 命令 ls -lagit status
read_file 读取文件内容 查看代码、读配置
write_file 写入文件 创建新文件
edit_file 精确替换文本 修改代码中的某一处

3.2 工具调度表(Dispatch Map)

核心改进:用字典把工具名映射到处理函数,替代 if-elif 链:

TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}

使用时只需一行:

handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler else f"Unknown tool: {block.name}"

关键洞察**block.input 把字典展开为关键字参数。比如 LLM 返回 {"command": "ls -la"},展开后就是 run_bash(command="ls -la")

3.3 路径安全检查

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

为什么需要这个:LLM 可能试图访问 ../../etc/passwd 这样的路径。safe_path 确保所有文件操作都在工作目录内,防止越权。

3.4 循环没有变!

"The loop didn't change at all. I just added tools."

s01 到 s02,agent_loop 函数的结构完全一样!只是:

  • TOOLS 列表从 1 个变成 4 个

  • 工具执行从直接调用 run_bash 变成查表调度

这就是 Harness 架构的优雅之处:核心循环是稳定的,所有扩展都是"往循环里加东西"。


四、消息历史的数据结构

理解消息历史的结构对于理解整个 Harness 至关重要:

messages = [
    # 用户输入
    {"role": "user", "content": "帮我看看当前目录有什么文件"},
​
    # LLM 回复(包含工具调用请求)
    {"role": "assistant", "content": [
        TextBlock(type="text", text="好的,让我查看一下"),
        ToolUseBlock(type="tool_use", id="toolu_123", name="bash",
                     input={"command": "ls -la"})
    ]},
​
    # 工具执行结果
    {"role": "user", "content": [
        {"type": "tool_result", "tool_use_id": "toolu_123",
         "content": "total 32\ndrwxr-xr-x  5 user ..."}
    ]},
​
    # LLM 最终回复(没有工具调用,循环结束)
    {"role": "assistant", "content": [
        TextBlock(type="text", text="当前目录有 5 个文件...")
    ]},
]

消息交替规则userassistantuserassistant,必须严格交替。


五、REPL(交互式命令行)

每个版本都有一个 REPL 入口,让你可以直接运行和测试:

if __name__ == "__main__":
    history = []  # 跨轮次的消息历史
​
    while True:
        query = input("\033[36ms01 >> \033[0m")  # 彩色提示符
        if query.strip().lower() in ("q", "exit", ""):
            break
​
        history.append({"role": "user", "content": query})
        agent_loop(history)  # 运行 agent
​
        # 打印模型回复
        for block in history[-1]["content"]:
            if hasattr(block, "text"):
                print(block.text)

history 跨轮次保持:这意味着你和 agent 的对话是有上下文的。你可以说"把刚才那个文件改一下",agent 能理解"刚才那个"指的是什么。


六、实战练习

练习 1:运行 s01

cd test/agents
python s01_agent_loop.py

试试输入:

  • 帮我看看当前目录有什么文件

  • 找出项目中所有 Python 文件

  • 统计代码行数

观察 agent 是如何一步步调用 bash 工具来完成任务的。

练习 2:扩展 s02 的工具

尝试添加一个新工具 list_dir,功能是列出目录内容(不需要 shell):

  1. 写工具处理函数

  2. 加入 TOOL_HANDLERS

  3. 加入 TOOLS 定义

  4. 运行测试

练习 3:观察消息历史

agent_loop 中加一行 print(json.dumps(messages, default=str, indent=2)),观察每一轮循环后消息历史是如何增长的。


七、核心要点总结

概念 说明
while 循环 Agent 的心脏,不断调用 LLM 直到任务完成
stop_reason "tool_use" = 继续,"end_turn" = 停止
tool_use_id 关联请求和结果的唯一标识
dispatch map {工具名: 处理函数} 的路由表,优雅的扩展模式
safe_path 路径安全检查,防止越权访问
messages 完整的对话历史,跨轮次保持上下文

3-任务规划与进度追踪

一、为什么需要任务规划?

在 s01/s02 中,agent 只是"收到指令 → 执行",没有计划性。面对复杂任务时,它可能:

  • 做到一半忘了总目标

  • 在细节中迷失方向

  • 用户不知道它在做什么、做到哪了

解决方案:让 agent 自己管理一个任务清单,规划步骤、跟踪进度。


二、s03:内存中的 TodoManager

2.1 设计思路

TodoManager 不是给人用的 todo app,而是给 LLM 用的"进度跟踪器":

+----------+      +-------+      +---------+
|   User   | ---> |  LLM  | ---> | Tools   |
|  prompt  |      |       |      | + todo  |
+----------+      +---+---+      +----+----+
                      ^               |
                      |   tool_result |
                      +---------------+
                            |
                +-----------+-----------+
                | TodoManager state     |
                | [ ] task A            |  <-- 待办
                | [>] task B <- doing   |  <-- 正在做
                | [x] task C            |  <-- 已完成
                +-----------------------+

2.2 TodoManager 实现

class TodoManager:
    def __init__(self):
        self.items = []  # 存储所有任务
​
    def update(self, items: list) -> str:
        """模型每次调用 todo 工具时,传入完整的任务列表来替换旧列表"""
        validated = []
        in_progress_count = 0
​
        for item in items:
            text = str(item.get("text", "")).strip()
            status = str(item.get("status", "pending")).lower()
​
            # 校验状态只能是三种
            if status not in ("pending", "in_progress", "completed"):
                raise ValueError(f"invalid status '{status}'")
​
            if status == "in_progress":
                in_progress_count += 1
​
            validated.append({"id": item_id, "text": text, "status": status})
​
        # 限制:同一时间只能有一个任务处于"进行中"
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress at a time")
​
        self.items = validated
        return self.render()

2.3 关键设计决策

决策 原因
全量替换(不是增量更新) 让模型每次传完整列表,避免增量同步的复杂性
最多 20 个任务 防止模型创建过多任务撑爆上下文
只允许 1 个 in_progress 强制模型专注,不要同时做多件事
三种状态 pending → in_progress → completed,简洁清晰

2.4 催促机制(Nudge)

这是 s03 最巧妙的设计:

def agent_loop(messages: list):
    rounds_since_todo = 0  # 模型多久没更新 todo 了
​
    while True:
        response = client.messages.create(...)
        messages.append({"role": "assistant", "content": response.content})
​
        if response.stop_reason != "tool_use":
            return
​
        # 执行工具,检查是否用了 todo
        used_todo = False
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "todo":
                    used_todo = True
                # ... 执行工具
​
        # 更新催促计数器
        rounds_since_todo = 0 if used_todo else rounds_since_todo + 1
​
        # 如果连续 3 轮没更新 todo,注入提醒
        if rounds_since_todo >= 3:
            results.insert(0, {
                "type": "text",
                "text": "<reminder>Update your todos.</reminder>"
            })

工作原理:如果模型连续 3 轮都在调用工具但没有更新 todo,说明它可能忘了更新进度。这时自动注入一条 <reminder> 消息,模型看到后通常会去更新 todo。

为什么不用系统提示词催促? 因为系统提示词是静态的,每轮都一样。催促应该是动态的——只在需要时出现。


三、s07:持久化任务系统

3.1 s03 的局限

s03 把任务存在内存里(Python 对象)。问题:

  • 上下文被压缩(s06)后,任务信息丢失

  • 程序重启后,任务信息丢失

  • 无法表达任务之间的依赖关系

3.2 文件存储

s07 把任务存成 JSON 文件:

.tasks/
  task_1.json  {"id":1, "subject":"...", "status":"completed", ...}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending", ...}
  task_3.json  {"id":3, "blockedBy":[2], "blocks":[], ...}

每个任务的完整结构:

{
    "id": 1,
    "subject": "写登录接口",
    "description": "实现 JWT 认证的登录 API",
    "status": "pending",
    "blockedBy": [],
    "blocks": [2, 3],
    "owner": ""
}

3.3 依赖关系图

s07 最重要的新增能力是任务依赖

+----------+     +----------+     +----------+
| task 1   | --> | task 2   | --> | task 3   |
| 已完成   |     | 被1阻塞  |     | 被2阻塞  |
+----------+     +----------+     +----------+
     |                ^
     +--- 当 task 1 完成时,自动从 task 2 的 blockedBy 中移除
自动解除依赖
def update(self, task_id, status=None, ...):
    task = self._load(task_id)
​
    if status:
        task["status"] = status
​
        # 核心机制:完成任务时,自动解除其他任务对它的依赖
        if status == "completed":
            self._clear_dependency(task_id)
​
def _clear_dependency(self, completed_id):
    """从所有任务的 blockedBy 中移除已完成的任务 ID"""
    for f in self.dir.glob("task_*.json"):
        task = json.loads(f.read_text())
        if completed_id in task.get("blockedBy", []):
            task["blockedBy"].remove(completed_id)
            self._save(task)

示例

  1. task_2 的 blockedBy = [1]

  2. 执行 update(task_id=1, status="completed")

  3. task_1 标记为 completed

  4. task_2 的 blockedBy 自动从 [1] 变成 []

  5. agent 看到 task_2 不再被阻塞,可以开始执行

3.4 四个任务工具

工具名 功能 使用场景
task_create 创建任务 规划阶段
task_update 更新状态/依赖 执行过程中
task_list 列出所有任务 查看进度
task_get 获取单个任务详情 开始工作前

3.5 显示格式

[x] #1: 搭建项目框架
[>] #2: 写登录接口
[ ] #3: 写测试 (blocked by: [2])
[ ] #4: 部署 (blocked by: [2, 3])

四、s03 vs s07 对比

维度 s03 TodoManager s07 TaskManager
存储 内存(Python 对象) 磁盘(JSON 文件)
持久性 压缩/重启后丢失 永久保留
依赖关系 有(blockedBy/blocks)
适用场景 短期简单任务 长期复杂项目
更新方式 全量替换 单个任务增量更新
ID 管理 用户指定 自增

什么时候用哪个?

  • 快速的临时任务列表 → s03 TodoManager

  • 需要依赖管理的复杂项目 → s07 TaskManager

  • s_full.py 中两者并存:TodoWrite 用于快速清单,task_* 用于持久化管理


五、核心设计模式

5.1 状态外化(Externalize State)

"状态存在对话之外,就不怕对话被压缩。"

s03 的任务状态在对话消息里(内存),s07 的任务状态在文件系统中。即使对话被压缩到只剩一条摘要消息,agent 仍然可以通过 task_list 工具读取到所有任务的当前状态。

5.2 自我监督(Self-Monitoring)

通过催促机制,Harness 实现了对 agent 行为的监督:

  • 不是通过复杂的逻辑判断

  • 而是通过简单的计数 + 消息注入

  • LLM 收到 <reminder> 后,自己就会去更新进度

这体现了 Harness 的设计哲学:用简单的机制引导复杂的行为


六、实战练习

练习 1:运行 s03,观察催促机制

python s03_todo_write.py

输入一个复杂任务(如"分析所有 Python 文件的代码质量"),观察:

  1. 模型是否会主动创建 todo 列表?

  2. 连续几轮后是否会收到催促?

  3. 催促后模型是否会更新 todo?

练习 2:运行 s07,测试依赖关系

python s07_task_system.py

输入:"帮我创建一个有依赖关系的任务计划:先搭建框架,然后写登录,最后写测试。测试依赖登录,登录依赖框架。"

观察 agent 如何:

  1. 创建三个任务

  2. 设置依赖关系

  3. 按依赖顺序执行

  4. 完成一个任务后自动解锁下一个

练习 3:把 s03 的催促机制加到 s07

s07 没有催促机制。尝试把 s03 的 rounds_since_todo 逻辑移植到 s07,当模型连续 3 轮没有调用 task_update 时注入提醒。


七、核心要点总结

概念 说明
TodoManager 内存中的轻量级任务清单,全量替换
TaskManager 磁盘上的持久化任务系统,带依赖图
催促机制 计数器 + 消息注入,引导 LLM 更新进度
依赖自动解除 任务完成时,自动从其他任务的 blockedBy 中移除
状态外化 把状态存到对话之外(文件),防止压缩丢失

4-子代理与上下文隔离

一、问题:上下文污染

当 agent 处理复杂任务时,消息历史会越来越长。比如让 agent "分析项目结构并重构":

  1. 探索阶段:调用 20 次 bash/read_file 来了解项目

  2. 重构阶段:需要根据分析结果修改代码

问题是,探索阶段产生的大量输出(文件列表、代码内容、目录结构)会塞满上下文窗口,导致后续重构时 agent "记不住"重要的分析结论。

这就像你的桌子上堆满了调研材料,已经没地方放电脑写代码了。


二、解决方案:子代理

把"探索"这种子任务交给一个全新的子代理去做:

  • 子代理有自己独立的、干净的消息历史

  • 做完后只返回一个摘要给父代理

  • 父代理的上下文保持干净

架构图

父代理(Parent)                  子代理(Subagent)
+------------------+             +------------------+
| messages=[...]   |             | messages=[]      |  <-- 全新的空历史
|                  |  派遣任务    |                  |
| tool: task       | ----------> | while tool_use:  |
|   prompt="..."   |             |   调用工具       |
|   description="" |             |   收集结果       |
|                  |  只返回摘要  |                  |
|   result = "..." | <---------- | return 最后的文字 |
+------------------+             +------------------+
          |
父代理的上下文保持干净。
子代理的上下文在完成后被丢弃。

类比理解

就像公司里老板(父代理)把调研任务交给实习生(子代理),实习生自己去查资料、跑数据,最后只交一份报告给老板。老板不需要知道实习生中间查了哪些网页、试了哪些方法。


三、核心实现

3.1 子代理执行函数

def run_subagent(prompt: str) -> str:
    """
    启动一个子代理来执行任务。
​
    关键特点:
    1. messages=[] —— 全新的消息历史,不继承父代理的任何上下文
    2. 只有基础工具 —— 不能再派遣子代理(防止递归)
    3. 只返回摘要 —— 子代理的完整对话历史在函数结束后被丢弃
    """
    # 全新的消息历史!这就是"上下文隔离"的关键
    sub_messages = [{"role": "user", "content": prompt}]
​
    # 安全限制:最多循环 30 轮
    for _ in range(30):
        response = client.messages.create(
            model=MODEL,
            system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=CHILD_TOOLS,      # 只有基础工具,没有 task 工具
            max_tokens=8000,
        )
        sub_messages.append({"role": "assistant", "content": response.content})
​
        if response.stop_reason != "tool_use":
            break
​
        # 执行工具,收集结果
        results = []
        for block in response.content:
            if block.type == "tool_use":
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler else f"Unknown"
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output)[:50000],
                })
        sub_messages.append({"role": "user", "content": results})
​
    # 只返回最后一轮的文字(摘要)
    # sub_messages(完整对话)在这里被丢弃
    return "".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or "(no summary)"

3.2 防递归设计

# 子代理的工具列表:只有基础工具
CHILD_TOOLS = [bash, read_file, write_file, edit_file]  # 没有 task 工具!
​
# 父代理的工具列表:基础工具 + task 工具
PARENT_TOOLS = CHILD_TOOLS + [task]  # 可以派遣子代理

为什么子代理没有 task 工具?

防止无限递归:

父 → 子 → 孙 → 曾孙 → ...  (永远不停!)

子代理没有 task 工具,就不能再派遣子代理,递归自然终止。

3.3 父代理的工具调用分支

def agent_loop(messages: list):
    while True:
        response = client.messages.create(...)
        messages.append({"role": "assistant", "content": response.content})
​
        if response.stop_reason != "tool_use":
            return
​
        results = []
        for block in response.content:
            if block.type == "tool_use":
                if block.name == "task":
                    # 启动子代理!
                    desc = block.input.get("description", "subtask")
                    print(f"> task ({desc}): {block.input['prompt'][:80]}")
                    output = run_subagent(block.input["prompt"])
                else:
                    # 普通工具直接执行
                    handler = TOOL_HANDLERS.get(block.name)
                    output = handler(**block.input)
​
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": str(output),
                })
​
        messages.append({"role": "user", "content": results})

四、上下文隔离的效果

不用子代理(s01/s02)

messages 长度随时间增长:
轮次 1: [user, assistant, user]                      → 3 条
轮次 5: [user, assistant, user, ..., assistant, user] → 11 条
轮次 20: 超过 40 条消息,上下文快满了

使用子代理(s04)

父代理的 messages 保持精简:
[user, assistant(task调用), user(摘要结果)]  → 始终只有几条
​
子代理的 sub_messages 做完就丢弃:
[user(任务), assistant, user(工具结果), ..., assistant(摘要)]
                    ↑ 完成后全部丢弃,只保留最后的摘要文字

五、s_full.py 中的增强子代理

在完整版本中,子代理增加了 agent_type 参数:

def run_subagent(prompt: str, agent_type: str = "Explore") -> str:
    sub_tools = [bash, read_file]  # 基础:只读
​
    # general-purpose 类型的子代理可以修改文件
    if agent_type != "Explore":
        sub_tools += [write_file, edit_file]
类型 工具 用途
Explore bash + read_file 只读探索(安全)
general-purpose 全部基础工具 需要修改文件的子任务

六、子代理的设计原则

6.1 输入明确

给子代理的 prompt 应该是一个清晰、自包含的任务描述:

# 好的 prompt
"列出项目中所有 Python 文件,统计每个文件的行数,并按行数排序"
​
# 不好的 prompt
"看看那个文件"  # 子代理不知道"那个文件"是哪个

6.2 输出是摘要

子代理的返回值是最后一轮回复中的文字。系统提示词告诉它要"总结发现":

SUBAGENT_SYSTEM = f"You are a coding subagent at {WORKDIR}. "
                  f"Complete the given task, then summarize your findings."

6.3 共享文件系统

子代理和父代理在同一个工作目录下操作。它们不共享对话历史,但共享文件系统。这意味着:

  • 子代理创建的文件,父代理可以看到

  • 子代理修改的文件,父代理读到的是修改后的版本

  • 文件系统是它们之间的"持久化通信通道"


七、实战练习

练习 1:观察上下文隔离

python s04_subagent.py

输入:"先帮我调研这个项目中有哪些 Python 文件和它们的功能,然后基于调研结果,给我一个重构建议。"

观察:

  1. agent 是否使用了 task 工具来派遣子代理?

  2. 子代理的输出是否只有摘要?

  3. 父代理收到摘要后,是否能给出合理的建议?

练习 2:添加子代理轮次限制

当前子代理最多循环 30 轮。尝试:

  1. 把限制改为 5 轮,观察子代理是否能在有限轮次内完成任务

  2. 如果不能完成,子代理的"摘要"是什么样的?

练习 3:多层子代理

取消防递归限制(给子代理也加上 task 工具),观察会发生什么。

  • 子代理是否会再次派遣子子代理?

  • 如果会,什么情况下会停止?

  • 这样做有什么风险?


八、核心要点总结

概念 说明
上下文隔离 子代理有独立的消息历史,不污染父代理
摘要返回 子代理只返回文字摘要,完整对话被丢弃
防递归 子代理没有 task 工具,不能再派遣子代理
共享文件系统 上下文隔离,文件系统不隔离
agent_type 区分只读探索和读写操作

核心公式子代理 = 新消息历史 + 受限工具 + 摘要返回

5-技能加载与上下文压缩

一、s05:技能按需加载

1.1 问题

LLM 不可能什么都懂。比如让它处理 PDF,它可能不知道最佳实践。你可以把专业知识写成 "技能文件",让 LLM 需要时自己去加载。

但如果把所有技能的完整内容都塞进系统提示词:

  • 10 个技能 × 2000 token = 20000 token,太浪费了

  • 大部分时候只需要其中一两个

1.2 两层加载机制

第一层(便宜):只在系统提示词里放技能的名称和简介(~100 token/技能)
    → 模型知道"有哪些技能可用"
​
第二层(按需):模型调用 load_skill 工具时,才把完整内容返回
    → 只在真正需要时加载
​
类比:
  第一层 = 书的目录(告诉你有哪些章节)
  第二层 = 翻到那一章去读(只在需要时才翻开)

1.3 技能文件结构

skills/
  pdf/
    SKILL.md          <-- 技能文件
  code-review/
    SKILL.md

SKILL.md 格式(YAML 前置元数据 + Markdown 正文):

---
name: pdf
description: Process PDF files using Python
tags: file-processing
---
## 如何处理 PDF
第一步:安装 PyPDF2 ...
第二步:...

1.4 SkillLoader 核心代码

class SkillLoader:
    def __init__(self, skills_dir: Path):
        self.skills = {}
        self._load_all()  # 启动时一次性扫描
​
    def _load_all(self):
        """扫描 skills/ 目录下所有 SKILL.md"""
        for f in sorted(self.skills_dir.rglob("SKILL.md")):
            text = f.read_text()
            meta, body = self._parse_frontmatter(text)
            name = meta.get("name", f.parent.name)
            self.skills[name] = {"meta": meta, "body": body}
​
    def get_descriptions(self) -> str:
        """第一层:生成技能简介,嵌入系统提示词"""
        # 输出:  - pdf: Process PDF files [file-processing]
        lines = []
        for name, skill in self.skills.items():
            desc = skill["meta"].get("description", "No description")
            lines.append(f"  - {name}: {desc}")
        return "\n".join(lines)
​
    def get_content(self, name: str) -> str:
        """第二层:返回完整技能内容"""
        skill = self.skills.get(name)
        if not skill:
            return f"Error: Unknown skill '{name}'"
        return f'<skill name="{name}">\n{skill["body"]}\n</skill>'

1.5 系统提示词嵌入

SYSTEM = f"""You are a coding agent at {WORKDIR}.
Use load_skill to access specialized knowledge before tackling unfamiliar topics.
​
Skills available:
{SKILL_LOADER.get_descriptions()}"""

模型看到的效果:

You are a coding agent at /workspace.
Use load_skill to access specialized knowledge.
​
Skills available:
  - pdf: Process PDF files using Python [file-processing]
  - code-review: Review code quality

当模型遇到 PDF 相关任务时,它会先调用 load_skill("pdf"),获取完整的处理指南,然后按照指南操作。


二、s06:三层上下文压缩

2.1 问题

LLM 的上下文窗口有限(如 200k token)。agent 工作久了,消息历史会越来越长。怎么让 agent "永远"工作下去?

答案:有策略地遗忘。

2.2 三层压缩流水线

每一轮对话都经过:
​
[第一层: micro_compact]           (每轮静默执行)
  把 3 轮之前的工具结果替换为占位符
  → 省掉大量旧的命令输出
​
[检查: token 数 > 50000?]
   |               |
   否              是
   |               |
   继续执行    [第二层: auto_compact]   (自动触发)
                保存完整对话到磁盘
                让 LLM 生成摘要
                用摘要替换所有消息
​
[第三层: compact 工具]    (模型主动触发)
  模型调用 compact 工具主动压缩
  效果和第二层一样
​
类比:
  第一层 = 做笔记时只详细记最近 3 页,之前的只写"参考了 XX"
  第二层 = 笔记本快写满了,总结成一页摘要,翻开新本子
  第三层 = 你自己觉得笔记太乱了,主动做一次整理

2.3 第一层:micro_compact

def micro_compact(messages: list) -> list:
    """把旧的工具结果替换为简短占位符,只保留最近 3 次"""
​
    # 收集所有 tool_result 的位置
    tool_results = []
    for msg_idx, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for part_idx, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((msg_idx, part_idx, part))
​
    # 不超过 3 个就不压缩
    if len(tool_results) <= KEEP_RECENT:
        return messages
​
    # 建立 tool_use_id → tool_name 的映射
    tool_name_map = {}
    for msg in messages:
        if msg["role"] == "assistant":
            for block in msg.get("content", []):
                if hasattr(block, "type") and block.type == "tool_use":
                    tool_name_map[block.id] = block.name
​
    # 压缩旧的(保留最近 3 个)
    to_clear = tool_results[:-KEEP_RECENT]
    for _, _, result in to_clear:
        if isinstance(result.get("content"), str) and len(result["content"]) > 100:
            tool_id = result.get("tool_use_id", "")
            tool_name = tool_name_map.get(tool_id, "unknown")
            result["content"] = f"[Previous: used {tool_name}]"
​
    return messages

效果

压缩前:

tool_result: "a.py\nb.py\nc.py\n..."   (2000 字符)
tool_result: "150 lines in a.py"         (50 字符)
tool_result: "整个 a.py 的内容..."       (5000 字符)  ← 保留
tool_result: "整个 b.py 的内容..."       (3000 字符)  ← 保留
tool_result: "test output: 5 passed"     (100 字符)   ← 保留

压缩后:

tool_result: "[Previous: used bash]"     (22 字符)    ← 压缩了!
tool_result: "[Previous: used bash]"     (22 字符)    ← 压缩了!
tool_result: "整个 a.py 的内容..."       (5000 字符)  ← 保留
tool_result: "整个 b.py 的内容..."       (3000 字符)  ← 保留
tool_result: "test output: 5 passed"     (100 字符)   ← 保留

为什么安全? 旧的工具输出通常已经"被模型消化了"——模型已经根据输出做了决策。一个占位符足够让模型知道"之前做过什么"。

2.4 第二层:auto_compact

def auto_compact(messages: list) -> list:
    """完整的上下文压缩"""
​
    # 1. 保存完整对话到磁盘(备份)
    TRANSCRIPT_DIR.mkdir(exist_ok=True)
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    with open(transcript_path, "w") as f:
        for msg in messages:
            f.write(json.dumps(msg, default=str) + "\n")
​
    # 2. 让 LLM 生成摘要
    conversation_text = json.dumps(messages, default=str)[:80000]
    response = client.messages.create(
        model=MODEL,
        messages=[{"role": "user", "content":
            "Summarize this conversation for continuity. Include: "
            "1) What was accomplished, 2) Current state, 3) Key decisions made. "
            "Be concise but preserve critical details.\n\n" + conversation_text}],
        max_tokens=2000,
    )
    summary = response.content[0].text
​
    # 3. 用摘要替换所有消息
    return [
        {"role": "user", "content":
            f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
        {"role": "assistant", "content":
            "Understood. I have the context from the summary. Continuing."},
    ]

关键点

  • 完整对话保存到磁盘(JSONL 格式),不会永久丢失

  • 摘要由 LLM 自己生成,它知道什么信息最重要

  • 压缩后只剩 2 条消息(user + assistant),从 50000+ token 变成 ~1000 token

2.5 第三层:模型主动触发

# 在 agent_loop 中
for block in response.content:
    if block.type == "tool_use":
        if block.name == "compact":
            manual_compact = True  # 标记
            output = "Compressing..."
​
# 工具调用处理完后
if manual_compact:
    print("[manual compact]")
    messages[:] = auto_compact(messages)  # 原地替换

messages[:] = ... 的技巧:不能写 messages = auto_compact(messages),因为那只是改了局部变量。用 [:] 才能真正修改外部传入的 list 对象(原地替换)。

2.6 在主循环中的集成

def agent_loop(messages: list):
    while True:
        # ★ 第一层:每轮都执行
        micro_compact(messages)
​
        # ★ 第二层:超限自动触发
        if estimate_tokens(messages) > THRESHOLD:
            messages[:] = auto_compact(messages)
​
        # 调用 LLM
        response = client.messages.create(...)
​
        # ... 执行工具 ...
​
        # ★ 第三层:模型主动触发
        if manual_compact:
            messages[:] = auto_compact(messages)

2.7 Token 估算

def estimate_tokens(messages: list) -> int:
    """经验法则:平均每 4 个字符约等于 1 个 token"""
    return len(str(messages)) // 4

不需要精确,只要能判断"是否快超限了"就行。


三、两个机制的协同

s05 和 s06 解决的是相反的问题:

  • s05(技能加载):按需增加上下文中的知识

  • s06(上下文压缩):有策略地减少上下文中的信息

它们共同让 agent 的上下文窗口保持在一个健康的范围内:

  • 需要专业知识时 → 加载技能(增加)

  • 历史信息太多时 → 压缩清理(减少)


四、实战练习

练习 1:创建一个技能

在项目中创建 skills/python-testing/SKILL.md

---
name: python-testing
description: Best practices for Python unit testing
tags: testing
---
## Python 测试最佳实践
1. 使用 pytest 框架
2. 测试文件命名为 test_*.py
...

运行 s05,看模型是否能发现并使用这个技能。

练习 2:观察压缩效果

在 s06 的 agent_loop 开头加上 token 计数打印:

print(f"[tokens: {estimate_tokens(messages)}]")

给 agent 一个复杂任务,观察 token 数如何增长、何时触发压缩、压缩后变成多少。

练习 3:调整压缩参数

  • KEEP_RECENT 从 3 改为 1,观察 agent 是否因为丢失太多信息而困惑

  • THRESHOLD 从 50000 改为 10000,观察频繁压缩的效果


五、核心要点总结

概念 说明
两层技能加载 简介在系统提示词,完整内容按需加载
三层压缩 micro(每轮)→ auto(超限)→ manual(主动)
对话备份 压缩前保存到磁盘,信息不永久丢失
LLM 自摘要 让 LLM 自己决定什么信息最重要
原地替换 messages[:] = ... 修改外部列表

6-后台任务与异步执行

一、问题:阻塞等待

之前所有命令都是"阻塞式"的——模型调用 bash 后必须等命令跑完才能继续。如果命令要跑 5 分钟(比如跑测试、编译大项目),模型就干等着。

阻塞式执行:
Agent ----[调用bash]----等待5分钟----[拿到结果]----继续工作
                        ↑ 浪费时间

解决方案

让耗时命令在后台线程中运行,模型立即拿到 task_id,可以继续做别的事。

后台执行:
Agent ----[启动A]----[启动B]----[做其他事]----[读取结果]----
               |            |
               v            v
            [A在跑]      [B在跑]        (并行!)
               |            |
               +-- 通知队列 --> [结果注入到下一轮对话]

类比

阻塞式(bash)    → 你去餐厅点菜,站在厨房门口等菜做好才离开
后台式(bg_run)  → 你点完菜拿个号,去逛街,手机响了再回来取

二、BackgroundManager 实现

2.1 核心数据结构

class BackgroundManager:
    def __init__(self):
        self.tasks = {}                # task_id → {status, result, command}
        self._notification_queue = []  # 已完成任务的通知
        self._lock = threading.Lock()  # 保护通知队列的并发安全
组件 作用
tasks 所有任务的状态和结果
_notification_queue 已完成的结果,等待被主线程读取
_lock 线程锁,防止主线程和后台线程同时操作队列

2.2 启动后台任务

def run(self, command: str) -> str:
    """启动后台任务,立即返回 task_id"""
    task_id = str(uuid.uuid4())[:8]  # 如 "a3f2b1c4"
    self.tasks[task_id] = {"status": "running", "result": None, "command": command}
​
    # daemon=True → 主程序退出时自动终止
    thread = threading.Thread(
        target=self._execute,
        args=(task_id, command),
        daemon=True,
    )
    thread.start()
​
    # 立即返回!不等执行完
    return f"Background task {task_id} started: {command[:80]}"

2.3 后台执行线程

def _execute(self, task_id: str, command: str):
    """在后台线程中运行命令"""
    try:
        r = subprocess.run(
            command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True,
            timeout=300,  # 后台任务超时更长:5 分钟
        )
        output = (r.stdout + r.stderr).strip()[:50000]
        status = "completed"
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"
        status = "timeout"
​
    # 更新任务状态
    self.tasks[task_id]["status"] = status
    self.tasks[task_id]["result"] = output
​
    # 放入通知队列(需要加锁!)
    with self._lock:
        self._notification_queue.append({
            "task_id": task_id,
            "status": status,
            "command": command[:80],
            "result": (output or "(no output)")[:500],
        })

2.4 清空通知队列

def drain_notifications(self) -> list:
    """清空并返回所有已完成的通知"""
    with self._lock:
        notifs = list(self._notification_queue)  # 复制
        self._notification_queue.clear()          # 清空
    return notifs

"drain"(排空)命名:就像把水池里的水全放掉——读完就清空,下次不会重复读取。


三、通知注入机制

3.1 在 agent_loop 中注入

def agent_loop(messages: list):
    while True:
        # ★ 核心:每次调用 LLM 前,检查后台有没有任务完成了
        notifs = BG.drain_notifications()
        if notifs and messages:
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
            )
            # 以 user 消息的形式注入
            messages.append({
                "role": "user",
                "content": f"<background-results>\n{notif_text}\n</background-results>",
            })
            # 保持消息交替
            messages.append({
                "role": "assistant",
                "content": "Noted background results.",
            })
​
        # 调用 LLM(此时模型能"看到"后台任务的结果)
        response = client.messages.create(...)

3.2 时间线示例

时刻 0: 用户 → "帮我跑测试并同时修 bug"
时刻 1: Agent → background_run("pytest tests/")  → 返回 task_id=abc
时刻 2: Agent → bash("sed -i 's/old/new/' bug.py")  → 修 bug
时刻 3: Agent → bash("git diff")  → 查看修改
时刻 4: [后台] pytest 完成,结果放入通知队列
时刻 5: [agent_loop] 调用 LLM 前发现通知 → 注入结果
时刻 6: Agent 看到测试结果 → "5 tests passed, 1 failed"
时刻 7: Agent → bash("cat tests/test_auth.py")  → 检查失败的测试

四、阻塞 vs 后台的选择

场景 建议 原因
ls -la bash(阻塞) 瞬间完成,不需要后台
cat file.py bash(阻塞) 读文件很快
pytest background_run 可能跑几分钟
npm install background_run 安装依赖耗时
docker build background_run 构建镜像很慢
git status bash(阻塞) 几乎瞬间完成

原则:快速命令用 bash(阻塞),耗时命令用 background_run。


五、线程安全

为什么需要锁?

主线程(agent_loop)              后台线程(_execute)
                                       |
drain_notifications()             同时写入 _notification_queue
      |                                 |
      |    ← 竞态条件!→               |
      v                                 v

如果不加锁,可能发生:

  1. 主线程正在读队列

  2. 后台线程同时往队列里写

  3. 数据损坏或丢失

s08 的方案:Lock

self._lock = threading.Lock()
​
# 写入时加锁
with self._lock:
    self._notification_queue.append(...)
​
# 读取时加锁
with self._lock:
    notifs = list(self._notification_queue)
    self._notification_queue.clear()

s_full 的优化方案:Queue

from queue import Queue
self.notifications = Queue()  # 线程安全的队列,无需手动加锁
​
# 写入(线程安全)
self.notifications.put(notif)
​
# 读取(线程安全)
while not self.notifications.empty():
    notifs.append(self.notifications.get_nowait())

Queue 内部已经处理了线程安全,代码更简洁。


六、实战练习

练习 1:运行 s08

python s08_background_tasks.py

输入:"帮我在后台跑一个慢命令 sleep 10 && echo done,同时查看当前目录文件"

观察:

  1. agent 是否用了 background_run

  2. 在等待期间是否继续做其他事?

  3. 后台任务完成后结果是否被自动注入?

练习 2:手动查询后台状态

运行 s08 后,输入:"检查一下后台任务的状态"

观察 agent 是否使用 check_background 工具。

练习 3:并发后台任务

输入:"同时在后台启动三个命令",观察多个后台任务是否能正确并行。


七、核心要点总结

概念 说明
background_run 在后台线程启动命令,立即返回 task_id
通知队列 后台完成的结果暂存,主线程下一轮读取
drain 模式 读完即清空,同一结果不会读两次
线程锁 保护共享数据的并发安全
消息注入 后台结果以 <background-results> 标签注入对话

核心公式后台执行 + 通知队列 + 调用前注入 = 异步 Agent

7-多代理团队协作

一、从子代理到团队

版本 模式 类比
s04 子代理 派遣 → 执行 → 返回摘要 → 销毁 叫外卖(下单 → 等送到 → 骑手走了)
s09 队友 派遣 → 工作 → 空闲 → 工作 → ... → 关闭 组建团队(招人 → 分工 → 互相沟通)

s04 的子代理是一次性的,s09 的队友是持久的——它们在自己的线程中长期运行,通过文件系统的"收件箱"互相通信。


二、s09:基础团队

2.1 架构

.team/config.json                   .team/inbox/
+----------------------------+      +------------------+
| {"team_name": "default",   |      | alice.jsonl      |  ← alice 的收件箱
|  "members": [              |      | bob.jsonl        |  ← bob 的收件箱
|    {"name":"alice",         |      | lead.jsonl       |  ← 领导的收件箱
|     "role":"coder",         |      +------------------+
|     "status":"idle"}
|  ]}
+----------------------------+
​
线程结构:
主线程: 领导(lead)         工作线程: alice        工作线程: bob
+------------------+        +------------------+   +------------------+
| agent_loop       |        | agent_loop       |   | agent_loop       |
| 派遣任务         |        | 执行工具         |   | 等待消息         |
| 收发消息         |        | 收发消息         |   | 执行工具         |
+------------------+        +------------------+   +------------------+
       ↕ 消息通过文件收件箱传递 ↕                         ↕

2.2 MessageBus(消息总线)

每个队友有一个 .jsonl 文件作为收件箱(每行一个 JSON):

class MessageBus:
    def send(self, sender, to, content, msg_type="message", extra=None):
        """往对方的 .jsonl 文件末尾追加一行"""
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        with open(self.dir / f"{to}.jsonl", "a") as f:
            f.write(json.dumps(msg) + "\n")
​
    def read_inbox(self, name):
        """读取并清空收件箱(drain 模式)"""
        inbox_path = self.dir / f"{name}.jsonl"
        messages = [json.loads(l) for l in inbox_path.read_text().splitlines() if l]
        inbox_path.write_text("")  # 清空
        return messages

为什么用文件而不用内存?

  • 文件天然持久化(程序重启后消息还在)

  • 不需要复杂的进程间通信

  • JSONL 格式简单、易追踪调试

2.3 领导 vs 队友的工具差异

工具 领导 队友
bash, read_file, write_file, edit_file
send_message
read_inbox
spawn_teammate
list_teammates
broadcast

关键设计:队友不能创建新队友(类似 s04 子代理不能再派遣子代理),只有领导有这个权限。

2.4 5 种消息类型

VALID_MSG_TYPES = {
    "message",                # 普通文本消息
    "broadcast",              # 广播给所有人
    "shutdown_request",       # 请求关闭(s10)
    "shutdown_response",      # 关闭回复(s10)
    "plan_approval_response", # 方案审批回复(s10)
}

三、s10:团队协议

3.1 问题

s09 的队友之间只能发普通消息,没有"规矩":

  • 领导想关闭一个队友,直接 kill 线程?太粗暴了

  • 队友想做大改动,直接就做了?万一做错了怎么办?

3.2 关闭协议(Shutdown Protocol)

领导                              队友
+---------------------+          +---------------------+
| shutdown_request     |          |                     |
| {                    | -------> | 收到关闭请求        |
|   request_id: abc    |          | 自己决定:同意吗?   |
| }                    |          |                     |
+---------------------+          +---------------------+
                                         |
+---------------------+          +-------v-------------+
| shutdown_response    | <------- | shutdown_response   |
| {                    |          | {                   |
|   request_id: abc    |          |   request_id: abc   |  ← 同一个 ID
|   approve: true      |          |   approve: true     |
| }                    |          | }                   |
+---------------------+          +---------------------+
        |
        v
状态 → "shutdown",线程停止

3.3 方案审批协议(Plan Approval Protocol)

队友                              领导
+---------------------+          +---------------------+
| plan_approval        |          |                     |
| {plan: "重构方案..."}| -------> | 审阅方案内容         |
+---------------------+          | 通过还是拒绝?       |
                                 +---------------------+
                                         |
+---------------------+          +-------v-------------+
| plan_approval_resp   | <------- | plan_approval       |
| {approve: true}      |          | {request_id: abc,   |
+---------------------+          |  approve: true}      |
                                 +---------------------+

3.4 核心模式:request_id 关联

两个协议共用同一个模式:

# 发起方生成唯一 ID
req_id = str(uuid.uuid4())[:8]
​
# 在追踪器中记录(状态为 pending)
shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
​
# 发送请求
BUS.send("lead", teammate, "Please shut down.",
         "shutdown_request", {"request_id": req_id})
​
# 接收方回复时带上同一个 ID
BUS.send(sender, "lead", reason,
         "shutdown_response", {"request_id": req_id, "approve": True})

类比

  • 关闭协议 = 公司裁员流程(先协商,不是直接开除)

  • 方案审批 = 项目审批流程(先提方案,等批准再做)

  • request_id = 工单编号(追踪从发起到完成的全过程)


四、s11:自主代理

4.1 问题

s09/s10 的队友做完当前任务就变 idle,线程结束。有新活还得重新 spawn。

4.2 自主生命周期

+-------+
| spawn |  创建并启动
+---+---+
    |
    v
+-------+  tool_use    +-------+
| 工作  | <----------- |  LLM  |  正常的工具调用循环
+---+---+              +-------+
    |
    | stop_reason != tool_use(或调用了 idle 工具)
    v
+--------+
| 空闲   |  每 5 秒轮询一次,最多等 60 秒
+---+----+
    |
    +---> 检查收件箱 → 有消息? → 回到工作状态
    |
    +---> 扫描 .tasks/ → 有未认领的任务? → 认领 → 回到工作
    |
    +---> 超时(60秒) → 关闭线程

4.3 核心代码:外层 while True

def _loop(self, name, role, prompt):
    messages = [{"role": "user", "content": prompt}]
​
    while True:  # ★ 外层循环:工作和空闲交替
​
        # === 工作阶段 ===
        for _ in range(50):
            # 检查收件箱
            inbox = BUS.read_inbox(name)
            for msg in inbox:
                if msg.get("type") == "shutdown_request":
                    return  # 收到关闭请求,直接退出
​
            response = client.messages.create(...)
            if response.stop_reason != "tool_use":
                break  # 当前任务完成
​
            # 执行工具...
            if idle_requested:
                break  # 模型主动要求空闲
​
        # === 空闲阶段 ===
        self._set_status(name, "idle")
        resume = False
​
        for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):  # 60秒/5秒 = 12次
            time.sleep(POLL_INTERVAL)
​
            # 优先级 1:检查收件箱
            inbox = BUS.read_inbox(name)
            if inbox:
                resume = True
                break
​
            # 优先级 2:扫描任务板
            unclaimed = scan_unclaimed_tasks()
            if unclaimed:
                task = unclaimed[0]
                claim_task(task["id"], name)  # 认领
                resume = True
                break
​
        if not resume:
            return  # 60 秒没找到新活,关闭
​
        self._set_status(name, "working")  # 回到工作

4.4 任务认领与竞态防护

_claim_lock = threading.Lock()
​
def claim_task(task_id, owner):
    """加锁认领,防止两个队友同时认领同一个任务"""
    with _claim_lock:
        task = json.loads(path.read_text())
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))

4.5 身份重注入

当消息历史被压缩到很短(≤3 条)时,队友可能"忘了自己是谁":

if len(messages) <= 3:
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"
    })
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing."
    })

五、演进总结

s04: 一次性子代理     → 派遣 → 做完 → 销毁
s09: 持久化队友       → 派遣 → 工作 → 空闲 → 再工作
s10: + 协议           → 有规矩的协作(关闭协议、方案审批)
s11: + 自主能力       → 自己找活干,不需要分配
能力 s04 s09 s10 s11
工具调用
持久化
收件箱通信
关闭协议
方案审批
自动认领
空闲轮询
身份重注入

六、实战练习

练习 1:运行 s09,组建团队

python s09_agent_teams.py

输入:"创建两个队友:alice 负责写代码,bob 负责写测试。让 alice 先写一个简单的 Python 函数,然后让 bob 为它写测试。"

特殊命令:

  • /team — 查看团队状态

  • /inbox — 查看领导收件箱

练习 2:观察协议交互

python s10_team_protocols.py

创建一个队友,然后用 shutdown_request 请求关闭。观察协议交互过程。

练习 3:测试自主认领

python s11_autonomous_agents.py
  1. 先创建几个任务(通过 task_create)

  2. 再创建一个自主队友

  3. /tasks 查看任务被自动认领的过程


七、核心要点总结

概念 说明
MessageBus 基于 JSONL 文件的消息总线,drain 模式
TeammateManager 管理队友的创建、运行、状态追踪
关闭协议 request_id 关联的请求-回复握手
方案审批 队友提交 → 领导审批 → 执行或修改
自主轮询 空闲时主动检查收件箱和任务板
任务认领锁 防止并发认领同一任务
身份重注入 上下文压缩后防止"失忆"

8-工作树隔离与生产实战

第一部分:s12 工作树隔离

一、问题

之前所有队友/子代理都在同一个目录下工作。如果 alice 在改 auth.py,bob 同时也在改 auth.py,文件就冲突了。

就像两个人同时编辑同一个文档,互相覆盖对方的修改。

二、Git Worktree 是什么

普通情况:一个 git 仓库只有一个工作目录。 Worktree:同一个仓库可以有多个工作目录,每个在不同的分支上。

项目根目录/
  src/                    ← 主工作目录
  .worktrees/
    auth-refactor/        ← 独立工作目录(alice 在这里改代码)
      src/                ← 主目录的完整副本
    fix-bug/              ← 另一个独立工作目录(bob 在这里)
      src/
    index.json            ← 工作树注册表
    events.jsonl          ← 生命周期事件日志

三、控制面 vs 执行面

s12 的核心设计思想是分离控制和执行

控制面(.tasks/)                    执行面(.worktrees/)
task_12.json                        auth-refactor/
  {                                   ← alice 在这里工作
    "id": 12,                         ← 独立的代码副本
    "subject": "重构认证模块",         ← 互不干扰
    "worktree": "auth-refactor"  ─────→
  }
​
task_13.json                        fix-bug/
  {                                   ← bob 在这里工作
    "id": 13,
    "subject": "修复登录 bug",
    "worktree": "fix-bug"  ──────────→
  }

四、WorktreeManager 核心操作

4.1 创建工作树
def create(self, name, task_id=None, base_ref="HEAD"):
    """
    执行: git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
    效果: 创建独立工作目录 + 新分支
    """
    path = self.dir / name
    branch = f"wt/{name}"
​
    self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
​
    # 注册到 index.json
    entry = {
        "name": name,
        "path": str(path),
        "branch": branch,
        "task_id": task_id,
        "status": "active",
    }
​
    # 绑定任务
    if task_id is not None:
        self.tasks.bind_worktree(task_id, name)
4.2 在工作树中执行命令
def run(self, name, command):
    """在指定工作树目录中执行命令(目录隔离的关键)"""
    wt = self._find(name)
    path = Path(wt["path"])
​
    r = subprocess.run(
        command, shell=True,
        cwd=path,           # ← 关键:在工作树目录中执行!
        capture_output=True, text=True, timeout=300,
    )
    return (r.stdout + r.stderr).strip()[:50000]
4.3 删除或保留
def remove(self, name, force=False, complete_task=False):
    """删除工作树,可选同时完成任务"""
    self._run_git(["worktree", "remove", wt["path"]])
    if complete_task and wt.get("task_id"):
        self.tasks.update(wt["task_id"], status="completed")
​
def keep(self, name):
    """标记为保留(不删除,留着以后用)"""
    item["status"] = "kept"

五、EventBus(事件日志)

class EventBus:
    def emit(self, event, task=None, worktree=None, error=None):
        """记录生命周期事件到 JSONL"""
        payload = {
            "event": event,            # 如 "worktree.create.after"
            "ts": time.time(),
            "task": task or {},
            "worktree": worktree or {},
        }
        with self.path.open("a") as f:
            f.write(json.dumps(payload) + "\n")

事件类型:

事件 触发时机
worktree.create.before 创建前
worktree.create.after 创建成功后
worktree.create.failed 创建失败
worktree.remove.before 删除前
worktree.remove.after 删除成功后
worktree.keep 标记保留
task.completed 任务完成

六、16 个工具

s12 拥有全系列最多的工具(16 个):

分类 工具
基础(4) bash, read_file, write_file, edit_file
任务(5) task_create, task_list, task_get, task_update, task_bind_worktree
工作树(6) worktree_create, worktree_list, worktree_status, worktree_run, worktree_keep, worktree_remove
事件(1) worktree_events

第二部分:生产级 Harness(zidonghua-test)

七、从教学到生产

zidonghua-test/core/harness.py 是一个真实的生产级 Harness,它把 s01-s05 的模式应用到了自动化测试场景。

7.1 架构对比

教学版本 生产版本
通用编程 agent 专注于自动化测试
bash/read/write/edit read_document/http_request/db_query/assert_check
自由探索 两阶段工作流(规划 → 执行)
无人工确认 submit_plan 暂停点
控制台输出 Markdown 报告生成

7.2 两阶段工作流

阶段1 - 规划
+------------------+     +------------------+     +------------------+
| read_document    | --> | 分析需求          | --> | submit_plan      |
| 读取需求文档      |     | 生成测试计划      |     | 人工确认暂停点    |
+------------------+     +------------------+     +------------------+
                                                         |
                                                    [用户确认]
                                                         |
阶段2 - 执行                                              v
+------------------+     +------------------+     +------------------+
| prepare_test_data| --> | http_request     | --> | db_query         |
| 准备测试数据      |     | 执行HTTP请求      |     | 查询数据库验证    |
+------------------+     +------------------+     +------------------+
                                                         |
                         +------------------+     +------v-----------+
                         | report_result    | <-- | assert_check     |
                         | 记录PASS/FAIL    |     | 断言验证          |
                         +------------------+     +------------------+

7.3 submit_plan:人工确认暂停点

def _handle_plan_pause(self, plan_input):
    """人工确认暂停点"""
    print("=" * 60)
    print("测试计划")
    for case in plan_input.get("test_cases", []):
        print(f"  {case.get('id')}: {case.get('name')}")
​
    while True:
        answer = input("确认执行?[Y/n/q] ").strip().lower()
        if answer in ("", "y", "yes"):
            return "用户已确认,请开始执行测试用例。"
        elif answer in ("n", "no", "q"):
            return "用户已取消,请停止执行。"

设计理念:Agent 自动生成测试计划,但在执行前必须经过人工审批。这是一种 "人在回路"(Human-in-the-Loop)的设计模式。

7.4 7 个专业工具

工具 功能 对应教学版本
read_document 读取需求文档 s02 read_file
submit_plan 提交测试计划 新增(暂停点)
http_request 调用测试服务 s01 bash
db_query 查询数据库 新增
assert_check 断言验证 新增
prepare_test_data 准备测试数据 新增
report_result 记录测试结果 新增

7.5 入口程序(main.py)

# @文件引用解析
message = resolve_at_references(user_input)
# 例如 "测试 @../docs/xxx.md" → 自动读取文件内容注入
​
# 运行 Harness
result = harness.run_chat(message)
​
# 生成报告
if result["results"]:
    report_path = reporter.generate(
        doc_path=doc_path,
        plan=result["plan"],
        results=result["results"],
        summary=result["summary"],
    )

第三部分:s_full 完整参考

八、全部组装

s_full.py 是 s01-s11 的集大成者,22 个工具:

+------------------------------------------------------------------+
|                        完整 Agent                                 |
|                                                                   |
|  每次调用 LLM 前执行:                                             |
|  +--------------------+  +------------------+  +--------------+   |
|  | 微压缩 + 自动压缩  |  | 清空后台通知      |  | 检查收件箱    |   |
|  | (s06)              |  | (s08)             |  | (s09)        |   |
|  +--------------------+  +------------------+  +--------------+   |
|                                                                   |
|  工具调度 (s02 模式):                                              |
|  bash | read | write | edit | TodoWrite | task | load_skill       |
|  compress | bg_run | bg_check | task_create | task_get            |
|  task_update | task_list | spawn_teammate | list_teammates         |
|  send_message | read_inbox | broadcast | shutdown_request          |
|  plan_approval | idle | claim_task                                 |
+------------------------------------------------------------------+

8.1 主循环中的三件前置处理

def agent_loop(messages):
    while True:
        # 1. 压缩流水线(s06)
        microcompact(messages)
        if estimate_tokens(messages) > TOKEN_THRESHOLD:
            messages[:] = auto_compact(messages)
​
        # 2. 后台通知注入(s08)
        notifs = BG.drain()
        if notifs:
            messages.append({"role": "user", "content": f"<background-results>..."})
​
        # 3. 收件箱注入(s09)
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({"role": "user", "content": f"<inbox>..."})
​
        # 调用 LLM
        response = client.messages.create(...)

九、全系列架构总览

s01  核心循环        while + tool_use + messages
 ↓
s02  工具调度        dispatch map {name: handler}
 ↓
s03  任务规划        TodoManager + 催促机制
 ↓
s04  子代理          上下文隔离 + 摘要返回
 ↓
s05  技能加载        两层机制(目录 + 按需加载)
 ↓
s06  上下文压缩      三层流水线(micro + auto + manual)
 ↓
s07  持久化任务      文件存储 + 依赖图
 ↓
s08  后台任务        线程 + 通知队列
 ↓
s09  团队协作        MessageBus + TeammateManager
 ↓
s10  协议            request_id 关联的握手
 ↓
s11  自主代理        空闲轮询 + 任务认领
 ↓
s12  工作树隔离      Git Worktree + EventBus

每一步都在前一步的基础上叠加一个新能力,核心循环始终不变。


十、设计原则总结

原则 体现
循环不变 s01-s12 的 agent_loop 结构一致
功能叠加 每个版本在前一版基础上加一个能力
工具即能力 新功能 = 新工具 + 新 handler
状态外化 任务、消息、配置都存文件,不依赖内存
消息注入 催促、后台结果、收件箱都通过注入消息实现
防递归 子代理/队友的工具集是父级的子集
渐进压缩 从轻量级(占位符)到重量级(LLM 摘要)
协议握手 request_id 关联请求和回复
目录隔离 Git Worktree 实现并行不冲突

十一、实战建议

从教学到生产的路径

  1. 理解核心循环(s01-s02)→ 这是一切的基础

  2. 加上规划能力(s03/s07)→ 让 agent 有计划地工作

  3. 加上压缩能力(s06)→ 让 agent 能长时间工作

  4. 加上子代理(s04)→ 处理复杂任务时保持清晰

  5. 定制专业工具(参考 zidonghua-test)→ 针对你的业务场景

你不需要全部

不是每个项目都需要团队协作(s09-s11)或工作树隔离(s12)。根据实际需求选择:

场景 推荐级别
简单的命令行助手 s01-s02
需要规划的复杂任务 + s03 或 s07
长时间运行的 agent + s06
需要并行探索的场景 + s04
需要专业领域知识 + s05
需要并行执行的耗时操作 + s08
多角色协作系统 + s09-s11
多任务并行开发 + s12

9-资料下载地址

通过网盘分享的文件:harness智能体.zip
链接: https://pan.baidu.com/s/11jr93x41ITQHh0y-BCln0Q?pwd=tyui 提取码: tyui

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐