使用Learn Harness Engineering开发agent
1、Learn Harness Engineering介绍
通过网盘分享的文件:学习 Harness Engineering
链接: https://pan.baidu.com/s/1Z9YVAvccnw5qY3azpxWnNw?pwd=tyui 提取码: tyui
2-核心循环与工具调度
一、什么是 Agent Harness?
Agent Harness(代理线束)是连接 LLM 和真实世界的"桥梁"。它的核心职责只有一件事:
循环调用 LLM,执行工具,把结果反馈给 LLM,直到 LLM 决定停止。
这个模式是所有 AI Agent 系统的基石。无论是 Claude Code、Cursor、Devin,还是任何 AI 编程助手,底层都是这个 while 循环。
核心公式
while stop_reason == "tool_use": response = LLM(messages, tools) execute tools append results
流程图
+----------+ +-------+ +---------+ | 用户 | ---> | LLM | ---> | 工具 | | 输入 | | | | 执行 | +----------+ +---+---+ +----+----+ ^ | | tool_result | +---------------+ (循环继续,直到模型决定停止)
二、s01:最简核心循环
2.1 关键组件
| 组件 | 作用 |
|---|---|
client |
Anthropic SDK 客户端,负责调用 LLM API |
MODEL |
模型 ID,如 claude-sonnet-4-20250514 |
SYSTEM |
系统提示词,告诉模型它的角色和规则 |
TOOLS |
工具定义列表,描述模型可以调用什么工具 |
messages |
完整的对话历史,跨轮次保持上下文 |
2.2 工具定义格式
工具定义是一个 JSON Schema,告诉 LLM "你有什么工具可用、每个工具接受什么参数":
TOOLS = [{
"name": "bash", # 工具名
"description": "Run a shell command.", # 描述(模型据此判断何时使用)
"input_schema": { # 参数 Schema
"type": "object",
"properties": {
"command": {"type": "string"} # 参数名和类型
},
"required": ["command"], # 必填参数
},
}]
重要理解:这个定义不是给人看的,是给 LLM 看的。LLM 读了这个"说明书"后,就知道可以调用名为 bash 的工具,传入一个 command 字符串参数。
2.3 核心循环代码解析
def agent_loop(messages: list):
while True:
# 第一步:调用 LLM
response = client.messages.create(
model=MODEL,
system=SYSTEM,
messages=messages,
tools=TOOLS,
max_tokens=8000,
)
# 第二步:保存 LLM 回复到消息历史
messages.append({"role": "assistant", "content": response.content})
# 第三步:检查是否还要继续
# stop_reason == "tool_use" → 模型想调用工具,继续
# stop_reason == "end_turn" → 模型说完了,退出
if response.stop_reason != "tool_use":
return
# 第四步:执行工具调用
results = []
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id, # 必须匹配!
"content": output,
})
# 第五步:工具结果以 user 角色加入历史
messages.append({"role": "user", "content": results})
# → 回到 while True 开头
2.4 关键细节
-
tool_use_id必须匹配:API 用block.id来关联"哪个工具调用"对应"哪个结果"。如果 ID 不匹配,API 会报错。 -
tool_result必须以user角色发送:这是 Anthropic API 的规定。即使结果来自工具执行,也要包装成 user 消息。 -
response.content是一个列表:可能同时包含text块(文字回复)和tool_use块(工具调用请求)。 -
安全措施:
-
危险命令检测(
rm -rf /、sudo等) -
超时限制(120 秒)
-
输出长度限制(50000 字符,防止撑爆上下文)
-
三、s02:工具调度模式
3.1 从一个工具到多个工具
s01 只有 bash 一个工具,s02 扩展到了四个:
| 工具名 | 功能 | 典型用途 |
|---|---|---|
bash |
执行 shell 命令 | ls -la、git status |
read_file |
读取文件内容 | 查看代码、读配置 |
write_file |
写入文件 | 创建新文件 |
edit_file |
精确替换文本 | 修改代码中的某一处 |
3.2 工具调度表(Dispatch Map)
核心改进:用字典把工具名映射到处理函数,替代 if-elif 链:
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}
使用时只需一行:
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
关键洞察:**block.input 把字典展开为关键字参数。比如 LLM 返回 {"command": "ls -la"},展开后就是 run_bash(command="ls -la")。
3.3 路径安全检查
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
为什么需要这个:LLM 可能试图访问 ../../etc/passwd 这样的路径。safe_path 确保所有文件操作都在工作目录内,防止越权。
3.4 循环没有变!
"The loop didn't change at all. I just added tools."
s01 到 s02,agent_loop 函数的结构完全一样!只是:
-
TOOLS列表从 1 个变成 4 个 -
工具执行从直接调用
run_bash变成查表调度
这就是 Harness 架构的优雅之处:核心循环是稳定的,所有扩展都是"往循环里加东西"。
四、消息历史的数据结构
理解消息历史的结构对于理解整个 Harness 至关重要:
messages = [
# 用户输入
{"role": "user", "content": "帮我看看当前目录有什么文件"},
# LLM 回复(包含工具调用请求)
{"role": "assistant", "content": [
TextBlock(type="text", text="好的,让我查看一下"),
ToolUseBlock(type="tool_use", id="toolu_123", name="bash",
input={"command": "ls -la"})
]},
# 工具执行结果
{"role": "user", "content": [
{"type": "tool_result", "tool_use_id": "toolu_123",
"content": "total 32\ndrwxr-xr-x 5 user ..."}
]},
# LLM 最终回复(没有工具调用,循环结束)
{"role": "assistant", "content": [
TextBlock(type="text", text="当前目录有 5 个文件...")
]},
]
消息交替规则:user → assistant → user → assistant,必须严格交替。
五、REPL(交互式命令行)
每个版本都有一个 REPL 入口,让你可以直接运行和测试:
if __name__ == "__main__":
history = [] # 跨轮次的消息历史
while True:
query = input("\033[36ms01 >> \033[0m") # 彩色提示符
if query.strip().lower() in ("q", "exit", ""):
break
history.append({"role": "user", "content": query})
agent_loop(history) # 运行 agent
# 打印模型回复
for block in history[-1]["content"]:
if hasattr(block, "text"):
print(block.text)
history 跨轮次保持:这意味着你和 agent 的对话是有上下文的。你可以说"把刚才那个文件改一下",agent 能理解"刚才那个"指的是什么。
六、实战练习
练习 1:运行 s01
cd test/agents python s01_agent_loop.py
试试输入:
-
帮我看看当前目录有什么文件 -
找出项目中所有 Python 文件 -
统计代码行数
观察 agent 是如何一步步调用 bash 工具来完成任务的。
练习 2:扩展 s02 的工具
尝试添加一个新工具 list_dir,功能是列出目录内容(不需要 shell):
-
写工具处理函数
-
加入
TOOL_HANDLERS -
加入
TOOLS定义 -
运行测试
练习 3:观察消息历史
在 agent_loop 中加一行 print(json.dumps(messages, default=str, indent=2)),观察每一轮循环后消息历史是如何增长的。
七、核心要点总结
| 概念 | 说明 |
|---|---|
| while 循环 | Agent 的心脏,不断调用 LLM 直到任务完成 |
| stop_reason | "tool_use" = 继续,"end_turn" = 停止 |
| tool_use_id | 关联请求和结果的唯一标识 |
| dispatch map | {工具名: 处理函数} 的路由表,优雅的扩展模式 |
| safe_path | 路径安全检查,防止越权访问 |
| messages | 完整的对话历史,跨轮次保持上下文 |
3-任务规划与进度追踪
一、为什么需要任务规划?
在 s01/s02 中,agent 只是"收到指令 → 执行",没有计划性。面对复杂任务时,它可能:
-
做到一半忘了总目标
-
在细节中迷失方向
-
用户不知道它在做什么、做到哪了
解决方案:让 agent 自己管理一个任务清单,规划步骤、跟踪进度。
二、s03:内存中的 TodoManager
2.1 设计思路
TodoManager 不是给人用的 todo app,而是给 LLM 用的"进度跟踪器":
+----------+ +-------+ +---------+ | User | ---> | LLM | ---> | Tools | | prompt | | | | + todo | +----------+ +---+---+ +----+----+ ^ | | tool_result | +---------------+ | +-----------+-----------+ | TodoManager state | | [ ] task A | <-- 待办 | [>] task B <- doing | <-- 正在做 | [x] task C | <-- 已完成 +-----------------------+
2.2 TodoManager 实现
class TodoManager:
def __init__(self):
self.items = [] # 存储所有任务
def update(self, items: list) -> str:
"""模型每次调用 todo 工具时,传入完整的任务列表来替换旧列表"""
validated = []
in_progress_count = 0
for item in items:
text = str(item.get("text", "")).strip()
status = str(item.get("status", "pending")).lower()
# 校验状态只能是三种
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"invalid status '{status}'")
if status == "in_progress":
in_progress_count += 1
validated.append({"id": item_id, "text": text, "status": status})
# 限制:同一时间只能有一个任务处于"进行中"
if in_progress_count > 1:
raise ValueError("Only one task can be in_progress at a time")
self.items = validated
return self.render()
2.3 关键设计决策
| 决策 | 原因 |
|---|---|
| 全量替换(不是增量更新) | 让模型每次传完整列表,避免增量同步的复杂性 |
| 最多 20 个任务 | 防止模型创建过多任务撑爆上下文 |
| 只允许 1 个 in_progress | 强制模型专注,不要同时做多件事 |
| 三种状态 | pending → in_progress → completed,简洁清晰 |
2.4 催促机制(Nudge)
这是 s03 最巧妙的设计:
def agent_loop(messages: list):
rounds_since_todo = 0 # 模型多久没更新 todo 了
while True:
response = client.messages.create(...)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
# 执行工具,检查是否用了 todo
used_todo = False
for block in response.content:
if block.type == "tool_use":
if block.name == "todo":
used_todo = True
# ... 执行工具
# 更新催促计数器
rounds_since_todo = 0 if used_todo else rounds_since_todo + 1
# 如果连续 3 轮没更新 todo,注入提醒
if rounds_since_todo >= 3:
results.insert(0, {
"type": "text",
"text": "<reminder>Update your todos.</reminder>"
})
工作原理:如果模型连续 3 轮都在调用工具但没有更新 todo,说明它可能忘了更新进度。这时自动注入一条 <reminder> 消息,模型看到后通常会去更新 todo。
为什么不用系统提示词催促? 因为系统提示词是静态的,每轮都一样。催促应该是动态的——只在需要时出现。
三、s07:持久化任务系统
3.1 s03 的局限
s03 把任务存在内存里(Python 对象)。问题:
-
上下文被压缩(s06)后,任务信息丢失
-
程序重启后,任务信息丢失
-
无法表达任务之间的依赖关系
3.2 文件存储
s07 把任务存成 JSON 文件:
.tasks/
task_1.json {"id":1, "subject":"...", "status":"completed", ...}
task_2.json {"id":2, "blockedBy":[1], "status":"pending", ...}
task_3.json {"id":3, "blockedBy":[2], "blocks":[], ...}
每个任务的完整结构:
{
"id": 1,
"subject": "写登录接口",
"description": "实现 JWT 认证的登录 API",
"status": "pending",
"blockedBy": [],
"blocks": [2, 3],
"owner": ""
}
3.3 依赖关系图
s07 最重要的新增能力是任务依赖:
+----------+ +----------+ +----------+ | task 1 | --> | task 2 | --> | task 3 | | 已完成 | | 被1阻塞 | | 被2阻塞 | +----------+ +----------+ +----------+ | ^ +--- 当 task 1 完成时,自动从 task 2 的 blockedBy 中移除
自动解除依赖
def update(self, task_id, status=None, ...):
task = self._load(task_id)
if status:
task["status"] = status
# 核心机制:完成任务时,自动解除其他任务对它的依赖
if status == "completed":
self._clear_dependency(task_id)
def _clear_dependency(self, completed_id):
"""从所有任务的 blockedBy 中移除已完成的任务 ID"""
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
示例:
-
task_2 的
blockedBy = [1] -
执行
update(task_id=1, status="completed") -
task_1 标记为 completed
-
task_2 的
blockedBy自动从[1]变成[] -
agent 看到 task_2 不再被阻塞,可以开始执行
3.4 四个任务工具
| 工具名 | 功能 | 使用场景 |
|---|---|---|
task_create |
创建任务 | 规划阶段 |
task_update |
更新状态/依赖 | 执行过程中 |
task_list |
列出所有任务 | 查看进度 |
task_get |
获取单个任务详情 | 开始工作前 |
3.5 显示格式
[x] #1: 搭建项目框架 [>] #2: 写登录接口 [ ] #3: 写测试 (blocked by: [2]) [ ] #4: 部署 (blocked by: [2, 3])
四、s03 vs s07 对比
| 维度 | s03 TodoManager | s07 TaskManager |
|---|---|---|
| 存储 | 内存(Python 对象) | 磁盘(JSON 文件) |
| 持久性 | 压缩/重启后丢失 | 永久保留 |
| 依赖关系 | 无 | 有(blockedBy/blocks) |
| 适用场景 | 短期简单任务 | 长期复杂项目 |
| 更新方式 | 全量替换 | 单个任务增量更新 |
| ID 管理 | 用户指定 | 自增 |
什么时候用哪个?
-
快速的临时任务列表 → s03 TodoManager
-
需要依赖管理的复杂项目 → s07 TaskManager
-
s_full.py 中两者并存:TodoWrite 用于快速清单,task_* 用于持久化管理
五、核心设计模式
5.1 状态外化(Externalize State)
"状态存在对话之外,就不怕对话被压缩。"
s03 的任务状态在对话消息里(内存),s07 的任务状态在文件系统中。即使对话被压缩到只剩一条摘要消息,agent 仍然可以通过 task_list 工具读取到所有任务的当前状态。
5.2 自我监督(Self-Monitoring)
通过催促机制,Harness 实现了对 agent 行为的监督:
-
不是通过复杂的逻辑判断
-
而是通过简单的计数 + 消息注入
-
LLM 收到
<reminder>后,自己就会去更新进度
这体现了 Harness 的设计哲学:用简单的机制引导复杂的行为。
六、实战练习
练习 1:运行 s03,观察催促机制
python s03_todo_write.py
输入一个复杂任务(如"分析所有 Python 文件的代码质量"),观察:
-
模型是否会主动创建 todo 列表?
-
连续几轮后是否会收到催促?
-
催促后模型是否会更新 todo?
练习 2:运行 s07,测试依赖关系
python s07_task_system.py
输入:"帮我创建一个有依赖关系的任务计划:先搭建框架,然后写登录,最后写测试。测试依赖登录,登录依赖框架。"
观察 agent 如何:
-
创建三个任务
-
设置依赖关系
-
按依赖顺序执行
-
完成一个任务后自动解锁下一个
练习 3:把 s03 的催促机制加到 s07
s07 没有催促机制。尝试把 s03 的 rounds_since_todo 逻辑移植到 s07,当模型连续 3 轮没有调用 task_update 时注入提醒。
七、核心要点总结
| 概念 | 说明 |
|---|---|
| TodoManager | 内存中的轻量级任务清单,全量替换 |
| TaskManager | 磁盘上的持久化任务系统,带依赖图 |
| 催促机制 | 计数器 + 消息注入,引导 LLM 更新进度 |
| 依赖自动解除 | 任务完成时,自动从其他任务的 blockedBy 中移除 |
| 状态外化 | 把状态存到对话之外(文件),防止压缩丢失 |
4-子代理与上下文隔离
一、问题:上下文污染
当 agent 处理复杂任务时,消息历史会越来越长。比如让 agent "分析项目结构并重构":
-
探索阶段:调用 20 次 bash/read_file 来了解项目
-
重构阶段:需要根据分析结果修改代码
问题是,探索阶段产生的大量输出(文件列表、代码内容、目录结构)会塞满上下文窗口,导致后续重构时 agent "记不住"重要的分析结论。
这就像你的桌子上堆满了调研材料,已经没地方放电脑写代码了。
二、解决方案:子代理
把"探索"这种子任务交给一个全新的子代理去做:
-
子代理有自己独立的、干净的消息历史
-
做完后只返回一个摘要给父代理
-
父代理的上下文保持干净
架构图
父代理(Parent) 子代理(Subagent) +------------------+ +------------------+ | messages=[...] | | messages=[] | <-- 全新的空历史 | | 派遣任务 | | | tool: task | ----------> | while tool_use: | | prompt="..." | | 调用工具 | | description="" | | 收集结果 | | | 只返回摘要 | | | result = "..." | <---------- | return 最后的文字 | +------------------+ +------------------+ | 父代理的上下文保持干净。 子代理的上下文在完成后被丢弃。
类比理解
就像公司里老板(父代理)把调研任务交给实习生(子代理),实习生自己去查资料、跑数据,最后只交一份报告给老板。老板不需要知道实习生中间查了哪些网页、试了哪些方法。
三、核心实现
3.1 子代理执行函数
def run_subagent(prompt: str) -> str:
"""
启动一个子代理来执行任务。
关键特点:
1. messages=[] —— 全新的消息历史,不继承父代理的任何上下文
2. 只有基础工具 —— 不能再派遣子代理(防止递归)
3. 只返回摘要 —— 子代理的完整对话历史在函数结束后被丢弃
"""
# 全新的消息历史!这就是"上下文隔离"的关键
sub_messages = [{"role": "user", "content": prompt}]
# 安全限制:最多循环 30 轮
for _ in range(30):
response = client.messages.create(
model=MODEL,
system=SUBAGENT_SYSTEM,
messages=sub_messages,
tools=CHILD_TOOLS, # 只有基础工具,没有 task 工具
max_tokens=8000,
)
sub_messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
break
# 执行工具,收集结果
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler else f"Unknown"
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output)[:50000],
})
sub_messages.append({"role": "user", "content": results})
# 只返回最后一轮的文字(摘要)
# sub_messages(完整对话)在这里被丢弃
return "".join(
b.text for b in response.content if hasattr(b, "text")
) or "(no summary)"
3.2 防递归设计
# 子代理的工具列表:只有基础工具 CHILD_TOOLS = [bash, read_file, write_file, edit_file] # 没有 task 工具! # 父代理的工具列表:基础工具 + task 工具 PARENT_TOOLS = CHILD_TOOLS + [task] # 可以派遣子代理
为什么子代理没有 task 工具?
防止无限递归:
父 → 子 → 孙 → 曾孙 → ... (永远不停!)
子代理没有 task 工具,就不能再派遣子代理,递归自然终止。
3.3 父代理的工具调用分支
def agent_loop(messages: list):
while True:
response = client.messages.create(...)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
if block.name == "task":
# 启动子代理!
desc = block.input.get("description", "subtask")
print(f"> task ({desc}): {block.input['prompt'][:80]}")
output = run_subagent(block.input["prompt"])
else:
# 普通工具直接执行
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
})
messages.append({"role": "user", "content": results})
四、上下文隔离的效果
不用子代理(s01/s02)
messages 长度随时间增长: 轮次 1: [user, assistant, user] → 3 条 轮次 5: [user, assistant, user, ..., assistant, user] → 11 条 轮次 20: 超过 40 条消息,上下文快满了
使用子代理(s04)
父代理的 messages 保持精简: [user, assistant(task调用), user(摘要结果)] → 始终只有几条 子代理的 sub_messages 做完就丢弃: [user(任务), assistant, user(工具结果), ..., assistant(摘要)] ↑ 完成后全部丢弃,只保留最后的摘要文字
五、s_full.py 中的增强子代理
在完整版本中,子代理增加了 agent_type 参数:
def run_subagent(prompt: str, agent_type: str = "Explore") -> str: sub_tools = [bash, read_file] # 基础:只读 # general-purpose 类型的子代理可以修改文件 if agent_type != "Explore": sub_tools += [write_file, edit_file]
| 类型 | 工具 | 用途 |
|---|---|---|
Explore |
bash + read_file | 只读探索(安全) |
general-purpose |
全部基础工具 | 需要修改文件的子任务 |
六、子代理的设计原则
6.1 输入明确
给子代理的 prompt 应该是一个清晰、自包含的任务描述:
# 好的 prompt "列出项目中所有 Python 文件,统计每个文件的行数,并按行数排序" # 不好的 prompt "看看那个文件" # 子代理不知道"那个文件"是哪个
6.2 输出是摘要
子代理的返回值是最后一轮回复中的文字。系统提示词告诉它要"总结发现":
SUBAGENT_SYSTEM = f"You are a coding subagent at {WORKDIR}. "
f"Complete the given task, then summarize your findings."
6.3 共享文件系统
子代理和父代理在同一个工作目录下操作。它们不共享对话历史,但共享文件系统。这意味着:
-
子代理创建的文件,父代理可以看到
-
子代理修改的文件,父代理读到的是修改后的版本
-
文件系统是它们之间的"持久化通信通道"
七、实战练习
练习 1:观察上下文隔离
python s04_subagent.py
输入:"先帮我调研这个项目中有哪些 Python 文件和它们的功能,然后基于调研结果,给我一个重构建议。"
观察:
-
agent 是否使用了 task 工具来派遣子代理?
-
子代理的输出是否只有摘要?
-
父代理收到摘要后,是否能给出合理的建议?
练习 2:添加子代理轮次限制
当前子代理最多循环 30 轮。尝试:
-
把限制改为 5 轮,观察子代理是否能在有限轮次内完成任务
-
如果不能完成,子代理的"摘要"是什么样的?
练习 3:多层子代理
取消防递归限制(给子代理也加上 task 工具),观察会发生什么。
-
子代理是否会再次派遣子子代理?
-
如果会,什么情况下会停止?
-
这样做有什么风险?
八、核心要点总结
| 概念 | 说明 |
|---|---|
| 上下文隔离 | 子代理有独立的消息历史,不污染父代理 |
| 摘要返回 | 子代理只返回文字摘要,完整对话被丢弃 |
| 防递归 | 子代理没有 task 工具,不能再派遣子代理 |
| 共享文件系统 | 上下文隔离,文件系统不隔离 |
| agent_type | 区分只读探索和读写操作 |
核心公式:子代理 = 新消息历史 + 受限工具 + 摘要返回
5-技能加载与上下文压缩
一、s05:技能按需加载
1.1 问题
LLM 不可能什么都懂。比如让它处理 PDF,它可能不知道最佳实践。你可以把专业知识写成 "技能文件",让 LLM 需要时自己去加载。
但如果把所有技能的完整内容都塞进系统提示词:
-
10 个技能 × 2000 token = 20000 token,太浪费了
-
大部分时候只需要其中一两个
1.2 两层加载机制
第一层(便宜):只在系统提示词里放技能的名称和简介(~100 token/技能) → 模型知道"有哪些技能可用" 第二层(按需):模型调用 load_skill 工具时,才把完整内容返回 → 只在真正需要时加载 类比: 第一层 = 书的目录(告诉你有哪些章节) 第二层 = 翻到那一章去读(只在需要时才翻开)
1.3 技能文件结构
skills/ pdf/ SKILL.md <-- 技能文件 code-review/ SKILL.md
SKILL.md 格式(YAML 前置元数据 + Markdown 正文):
--- name: pdf description: Process PDF files using Python tags: file-processing --- ## 如何处理 PDF 第一步:安装 PyPDF2 ... 第二步:...
1.4 SkillLoader 核心代码
class SkillLoader:
def __init__(self, skills_dir: Path):
self.skills = {}
self._load_all() # 启动时一次性扫描
def _load_all(self):
"""扫描 skills/ 目录下所有 SKILL.md"""
for f in sorted(self.skills_dir.rglob("SKILL.md")):
text = f.read_text()
meta, body = self._parse_frontmatter(text)
name = meta.get("name", f.parent.name)
self.skills[name] = {"meta": meta, "body": body}
def get_descriptions(self) -> str:
"""第一层:生成技能简介,嵌入系统提示词"""
# 输出: - pdf: Process PDF files [file-processing]
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "No description")
lines.append(f" - {name}: {desc}")
return "\n".join(lines)
def get_content(self, name: str) -> str:
"""第二层:返回完整技能内容"""
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'"
return f'<skill name="{name}">\n{skill["body"]}\n</skill>'
1.5 系统提示词嵌入
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Use load_skill to access specialized knowledge before tackling unfamiliar topics.
Skills available:
{SKILL_LOADER.get_descriptions()}"""
模型看到的效果:
You are a coding agent at /workspace. Use load_skill to access specialized knowledge. Skills available: - pdf: Process PDF files using Python [file-processing] - code-review: Review code quality
当模型遇到 PDF 相关任务时,它会先调用 load_skill("pdf"),获取完整的处理指南,然后按照指南操作。
二、s06:三层上下文压缩
2.1 问题
LLM 的上下文窗口有限(如 200k token)。agent 工作久了,消息历史会越来越长。怎么让 agent "永远"工作下去?
答案:有策略地遗忘。
2.2 三层压缩流水线
每一轮对话都经过: [第一层: micro_compact] (每轮静默执行) 把 3 轮之前的工具结果替换为占位符 → 省掉大量旧的命令输出 [检查: token 数 > 50000?] | | 否 是 | | 继续执行 [第二层: auto_compact] (自动触发) 保存完整对话到磁盘 让 LLM 生成摘要 用摘要替换所有消息 [第三层: compact 工具] (模型主动触发) 模型调用 compact 工具主动压缩 效果和第二层一样 类比: 第一层 = 做笔记时只详细记最近 3 页,之前的只写"参考了 XX" 第二层 = 笔记本快写满了,总结成一页摘要,翻开新本子 第三层 = 你自己觉得笔记太乱了,主动做一次整理
2.3 第一层:micro_compact
def micro_compact(messages: list) -> list:
"""把旧的工具结果替换为简短占位符,只保留最近 3 次"""
# 收集所有 tool_result 的位置
tool_results = []
for msg_idx, msg in enumerate(messages):
if msg["role"] == "user" and isinstance(msg.get("content"), list):
for part_idx, part in enumerate(msg["content"]):
if isinstance(part, dict) and part.get("type") == "tool_result":
tool_results.append((msg_idx, part_idx, part))
# 不超过 3 个就不压缩
if len(tool_results) <= KEEP_RECENT:
return messages
# 建立 tool_use_id → tool_name 的映射
tool_name_map = {}
for msg in messages:
if msg["role"] == "assistant":
for block in msg.get("content", []):
if hasattr(block, "type") and block.type == "tool_use":
tool_name_map[block.id] = block.name
# 压缩旧的(保留最近 3 个)
to_clear = tool_results[:-KEEP_RECENT]
for _, _, result in to_clear:
if isinstance(result.get("content"), str) and len(result["content"]) > 100:
tool_id = result.get("tool_use_id", "")
tool_name = tool_name_map.get(tool_id, "unknown")
result["content"] = f"[Previous: used {tool_name}]"
return messages
效果:
压缩前:
tool_result: "a.py\nb.py\nc.py\n..." (2000 字符) tool_result: "150 lines in a.py" (50 字符) tool_result: "整个 a.py 的内容..." (5000 字符) ← 保留 tool_result: "整个 b.py 的内容..." (3000 字符) ← 保留 tool_result: "test output: 5 passed" (100 字符) ← 保留
压缩后:
tool_result: "[Previous: used bash]" (22 字符) ← 压缩了! tool_result: "[Previous: used bash]" (22 字符) ← 压缩了! tool_result: "整个 a.py 的内容..." (5000 字符) ← 保留 tool_result: "整个 b.py 的内容..." (3000 字符) ← 保留 tool_result: "test output: 5 passed" (100 字符) ← 保留
为什么安全? 旧的工具输出通常已经"被模型消化了"——模型已经根据输出做了决策。一个占位符足够让模型知道"之前做过什么"。
2.4 第二层:auto_compact
def auto_compact(messages: list) -> list:
"""完整的上下文压缩"""
# 1. 保存完整对话到磁盘(备份)
TRANSCRIPT_DIR.mkdir(exist_ok=True)
transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg, default=str) + "\n")
# 2. 让 LLM 生成摘要
conversation_text = json.dumps(messages, default=str)[:80000]
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content":
"Summarize this conversation for continuity. Include: "
"1) What was accomplished, 2) Current state, 3) Key decisions made. "
"Be concise but preserve critical details.\n\n" + conversation_text}],
max_tokens=2000,
)
summary = response.content[0].text
# 3. 用摘要替换所有消息
return [
{"role": "user", "content":
f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
{"role": "assistant", "content":
"Understood. I have the context from the summary. Continuing."},
]
关键点:
-
完整对话保存到磁盘(JSONL 格式),不会永久丢失
-
摘要由 LLM 自己生成,它知道什么信息最重要
-
压缩后只剩 2 条消息(user + assistant),从 50000+ token 变成 ~1000 token
2.5 第三层:模型主动触发
# 在 agent_loop 中
for block in response.content:
if block.type == "tool_use":
if block.name == "compact":
manual_compact = True # 标记
output = "Compressing..."
# 工具调用处理完后
if manual_compact:
print("[manual compact]")
messages[:] = auto_compact(messages) # 原地替换
messages[:] = ... 的技巧:不能写 messages = auto_compact(messages),因为那只是改了局部变量。用 [:] 才能真正修改外部传入的 list 对象(原地替换)。
2.6 在主循环中的集成
def agent_loop(messages: list): while True: # ★ 第一层:每轮都执行 micro_compact(messages) # ★ 第二层:超限自动触发 if estimate_tokens(messages) > THRESHOLD: messages[:] = auto_compact(messages) # 调用 LLM response = client.messages.create(...) # ... 执行工具 ... # ★ 第三层:模型主动触发 if manual_compact: messages[:] = auto_compact(messages)
2.7 Token 估算
def estimate_tokens(messages: list) -> int: """经验法则:平均每 4 个字符约等于 1 个 token""" return len(str(messages)) // 4
不需要精确,只要能判断"是否快超限了"就行。
三、两个机制的协同
s05 和 s06 解决的是相反的问题:
-
s05(技能加载):按需增加上下文中的知识
-
s06(上下文压缩):有策略地减少上下文中的信息
它们共同让 agent 的上下文窗口保持在一个健康的范围内:
-
需要专业知识时 → 加载技能(增加)
-
历史信息太多时 → 压缩清理(减少)
四、实战练习
练习 1:创建一个技能
在项目中创建 skills/python-testing/SKILL.md:
--- name: python-testing description: Best practices for Python unit testing tags: testing --- ## Python 测试最佳实践 1. 使用 pytest 框架 2. 测试文件命名为 test_*.py ...
运行 s05,看模型是否能发现并使用这个技能。
练习 2:观察压缩效果
在 s06 的 agent_loop 开头加上 token 计数打印:
print(f"[tokens: {estimate_tokens(messages)}]")
给 agent 一个复杂任务,观察 token 数如何增长、何时触发压缩、压缩后变成多少。
练习 3:调整压缩参数
-
把
KEEP_RECENT从 3 改为 1,观察 agent 是否因为丢失太多信息而困惑 -
把
THRESHOLD从 50000 改为 10000,观察频繁压缩的效果
五、核心要点总结
| 概念 | 说明 |
|---|---|
| 两层技能加载 | 简介在系统提示词,完整内容按需加载 |
| 三层压缩 | micro(每轮)→ auto(超限)→ manual(主动) |
| 对话备份 | 压缩前保存到磁盘,信息不永久丢失 |
| LLM 自摘要 | 让 LLM 自己决定什么信息最重要 |
| 原地替换 | messages[:] = ... 修改外部列表 |
6-后台任务与异步执行
一、问题:阻塞等待
之前所有命令都是"阻塞式"的——模型调用 bash 后必须等命令跑完才能继续。如果命令要跑 5 分钟(比如跑测试、编译大项目),模型就干等着。
阻塞式执行: Agent ----[调用bash]----等待5分钟----[拿到结果]----继续工作 ↑ 浪费时间
解决方案
让耗时命令在后台线程中运行,模型立即拿到 task_id,可以继续做别的事。
后台执行: Agent ----[启动A]----[启动B]----[做其他事]----[读取结果]---- | | v v [A在跑] [B在跑] (并行!) | | +-- 通知队列 --> [结果注入到下一轮对话]
类比
阻塞式(bash) → 你去餐厅点菜,站在厨房门口等菜做好才离开 后台式(bg_run) → 你点完菜拿个号,去逛街,手机响了再回来取
二、BackgroundManager 实现
2.1 核心数据结构
class BackgroundManager:
def __init__(self):
self.tasks = {} # task_id → {status, result, command}
self._notification_queue = [] # 已完成任务的通知
self._lock = threading.Lock() # 保护通知队列的并发安全
| 组件 | 作用 |
|---|---|
tasks |
所有任务的状态和结果 |
_notification_queue |
已完成的结果,等待被主线程读取 |
_lock |
线程锁,防止主线程和后台线程同时操作队列 |
2.2 启动后台任务
def run(self, command: str) -> str:
"""启动后台任务,立即返回 task_id"""
task_id = str(uuid.uuid4())[:8] # 如 "a3f2b1c4"
self.tasks[task_id] = {"status": "running", "result": None, "command": command}
# daemon=True → 主程序退出时自动终止
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True,
)
thread.start()
# 立即返回!不等执行完
return f"Background task {task_id} started: {command[:80]}"
2.3 后台执行线程
def _execute(self, task_id: str, command: str):
"""在后台线程中运行命令"""
try:
r = subprocess.run(
command, shell=True, cwd=WORKDIR,
capture_output=True, text=True,
timeout=300, # 后台任务超时更长:5 分钟
)
output = (r.stdout + r.stderr).strip()[:50000]
status = "completed"
except subprocess.TimeoutExpired:
output = "Error: Timeout (300s)"
status = "timeout"
# 更新任务状态
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output
# 放入通知队列(需要加锁!)
with self._lock:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
})
2.4 清空通知队列
def drain_notifications(self) -> list: """清空并返回所有已完成的通知""" with self._lock: notifs = list(self._notification_queue) # 复制 self._notification_queue.clear() # 清空 return notifs
"drain"(排空)命名:就像把水池里的水全放掉——读完就清空,下次不会重复读取。
三、通知注入机制
3.1 在 agent_loop 中注入
def agent_loop(messages: list):
while True:
# ★ 核心:每次调用 LLM 前,检查后台有没有任务完成了
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
# 以 user 消息的形式注入
messages.append({
"role": "user",
"content": f"<background-results>\n{notif_text}\n</background-results>",
})
# 保持消息交替
messages.append({
"role": "assistant",
"content": "Noted background results.",
})
# 调用 LLM(此时模型能"看到"后台任务的结果)
response = client.messages.create(...)
3.2 时间线示例
时刻 0: 用户 → "帮我跑测试并同时修 bug"
时刻 1: Agent → background_run("pytest tests/") → 返回 task_id=abc
时刻 2: Agent → bash("sed -i 's/old/new/' bug.py") → 修 bug
时刻 3: Agent → bash("git diff") → 查看修改
时刻 4: [后台] pytest 完成,结果放入通知队列
时刻 5: [agent_loop] 调用 LLM 前发现通知 → 注入结果
时刻 6: Agent 看到测试结果 → "5 tests passed, 1 failed"
时刻 7: Agent → bash("cat tests/test_auth.py") → 检查失败的测试
四、阻塞 vs 后台的选择
| 场景 | 建议 | 原因 |
|---|---|---|
ls -la |
bash(阻塞) | 瞬间完成,不需要后台 |
cat file.py |
bash(阻塞) | 读文件很快 |
pytest |
background_run | 可能跑几分钟 |
npm install |
background_run | 安装依赖耗时 |
docker build |
background_run | 构建镜像很慢 |
git status |
bash(阻塞) | 几乎瞬间完成 |
原则:快速命令用 bash(阻塞),耗时命令用 background_run。
五、线程安全
为什么需要锁?
主线程(agent_loop) 后台线程(_execute) | drain_notifications() 同时写入 _notification_queue | | | ← 竞态条件!→ | v v
如果不加锁,可能发生:
-
主线程正在读队列
-
后台线程同时往队列里写
-
数据损坏或丢失
s08 的方案:Lock
self._lock = threading.Lock() # 写入时加锁 with self._lock: self._notification_queue.append(...) # 读取时加锁 with self._lock: notifs = list(self._notification_queue) self._notification_queue.clear()
s_full 的优化方案:Queue
from queue import Queue self.notifications = Queue() # 线程安全的队列,无需手动加锁 # 写入(线程安全) self.notifications.put(notif) # 读取(线程安全) while not self.notifications.empty(): notifs.append(self.notifications.get_nowait())
Queue 内部已经处理了线程安全,代码更简洁。
六、实战练习
练习 1:运行 s08
python s08_background_tasks.py
输入:"帮我在后台跑一个慢命令 sleep 10 && echo done,同时查看当前目录文件"
观察:
-
agent 是否用了
background_run? -
在等待期间是否继续做其他事?
-
后台任务完成后结果是否被自动注入?
练习 2:手动查询后台状态
运行 s08 后,输入:"检查一下后台任务的状态"
观察 agent 是否使用 check_background 工具。
练习 3:并发后台任务
输入:"同时在后台启动三个命令",观察多个后台任务是否能正确并行。
七、核心要点总结
| 概念 | 说明 |
|---|---|
| background_run | 在后台线程启动命令,立即返回 task_id |
| 通知队列 | 后台完成的结果暂存,主线程下一轮读取 |
| drain 模式 | 读完即清空,同一结果不会读两次 |
| 线程锁 | 保护共享数据的并发安全 |
| 消息注入 | 后台结果以 <background-results> 标签注入对话 |
核心公式:后台执行 + 通知队列 + 调用前注入 = 异步 Agent
7-多代理团队协作
一、从子代理到团队
| 版本 | 模式 | 类比 |
|---|---|---|
| s04 子代理 | 派遣 → 执行 → 返回摘要 → 销毁 | 叫外卖(下单 → 等送到 → 骑手走了) |
| s09 队友 | 派遣 → 工作 → 空闲 → 工作 → ... → 关闭 | 组建团队(招人 → 分工 → 互相沟通) |
s04 的子代理是一次性的,s09 的队友是持久的——它们在自己的线程中长期运行,通过文件系统的"收件箱"互相通信。
二、s09:基础团队
2.1 架构
.team/config.json .team/inbox/
+----------------------------+ +------------------+
| {"team_name": "default", | | alice.jsonl | ← alice 的收件箱
| "members": [ | | bob.jsonl | ← bob 的收件箱
| {"name":"alice", | | lead.jsonl | ← 领导的收件箱
| "role":"coder", | +------------------+
| "status":"idle"}
| ]}
+----------------------------+
线程结构:
主线程: 领导(lead) 工作线程: alice 工作线程: bob
+------------------+ +------------------+ +------------------+
| agent_loop | | agent_loop | | agent_loop |
| 派遣任务 | | 执行工具 | | 等待消息 |
| 收发消息 | | 收发消息 | | 执行工具 |
+------------------+ +------------------+ +------------------+
↕ 消息通过文件收件箱传递 ↕ ↕
2.2 MessageBus(消息总线)
每个队友有一个 .jsonl 文件作为收件箱(每行一个 JSON):
class MessageBus:
def send(self, sender, to, content, msg_type="message", extra=None):
"""往对方的 .jsonl 文件末尾追加一行"""
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
with open(self.dir / f"{to}.jsonl", "a") as f:
f.write(json.dumps(msg) + "\n")
def read_inbox(self, name):
"""读取并清空收件箱(drain 模式)"""
inbox_path = self.dir / f"{name}.jsonl"
messages = [json.loads(l) for l in inbox_path.read_text().splitlines() if l]
inbox_path.write_text("") # 清空
return messages
为什么用文件而不用内存?
-
文件天然持久化(程序重启后消息还在)
-
不需要复杂的进程间通信
-
JSONL 格式简单、易追踪调试
2.3 领导 vs 队友的工具差异
| 工具 | 领导 | 队友 |
|---|---|---|
| bash, read_file, write_file, edit_file | 有 | 有 |
| send_message | 有 | 有 |
| read_inbox | 有 | 有 |
| spawn_teammate | 有 | 无 |
| list_teammates | 有 | 无 |
| broadcast | 有 | 无 |
关键设计:队友不能创建新队友(类似 s04 子代理不能再派遣子代理),只有领导有这个权限。
2.4 5 种消息类型
VALID_MSG_TYPES = {
"message", # 普通文本消息
"broadcast", # 广播给所有人
"shutdown_request", # 请求关闭(s10)
"shutdown_response", # 关闭回复(s10)
"plan_approval_response", # 方案审批回复(s10)
}
三、s10:团队协议
3.1 问题
s09 的队友之间只能发普通消息,没有"规矩":
-
领导想关闭一个队友,直接 kill 线程?太粗暴了
-
队友想做大改动,直接就做了?万一做错了怎么办?
3.2 关闭协议(Shutdown Protocol)
领导 队友
+---------------------+ +---------------------+
| shutdown_request | | |
| { | -------> | 收到关闭请求 |
| request_id: abc | | 自己决定:同意吗? |
| } | | |
+---------------------+ +---------------------+
|
+---------------------+ +-------v-------------+
| shutdown_response | <------- | shutdown_response |
| { | | { |
| request_id: abc | | request_id: abc | ← 同一个 ID
| approve: true | | approve: true |
| } | | } |
+---------------------+ +---------------------+
|
v
状态 → "shutdown",线程停止
3.3 方案审批协议(Plan Approval Protocol)
队友 领导
+---------------------+ +---------------------+
| plan_approval | | |
| {plan: "重构方案..."}| -------> | 审阅方案内容 |
+---------------------+ | 通过还是拒绝? |
+---------------------+
|
+---------------------+ +-------v-------------+
| plan_approval_resp | <------- | plan_approval |
| {approve: true} | | {request_id: abc, |
+---------------------+ | approve: true} |
+---------------------+
3.4 核心模式:request_id 关联
两个协议共用同一个模式:
# 发起方生成唯一 ID
req_id = str(uuid.uuid4())[:8]
# 在追踪器中记录(状态为 pending)
shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
# 发送请求
BUS.send("lead", teammate, "Please shut down.",
"shutdown_request", {"request_id": req_id})
# 接收方回复时带上同一个 ID
BUS.send(sender, "lead", reason,
"shutdown_response", {"request_id": req_id, "approve": True})
类比:
-
关闭协议 = 公司裁员流程(先协商,不是直接开除)
-
方案审批 = 项目审批流程(先提方案,等批准再做)
-
request_id = 工单编号(追踪从发起到完成的全过程)
四、s11:自主代理
4.1 问题
s09/s10 的队友做完当前任务就变 idle,线程结束。有新活还得重新 spawn。
4.2 自主生命周期
+-------+ | spawn | 创建并启动 +---+---+ | v +-------+ tool_use +-------+ | 工作 | <----------- | LLM | 正常的工具调用循环 +---+---+ +-------+ | | stop_reason != tool_use(或调用了 idle 工具) v +--------+ | 空闲 | 每 5 秒轮询一次,最多等 60 秒 +---+----+ | +---> 检查收件箱 → 有消息? → 回到工作状态 | +---> 扫描 .tasks/ → 有未认领的任务? → 认领 → 回到工作 | +---> 超时(60秒) → 关闭线程
4.3 核心代码:外层 while True
def _loop(self, name, role, prompt):
messages = [{"role": "user", "content": prompt}]
while True: # ★ 外层循环:工作和空闲交替
# === 工作阶段 ===
for _ in range(50):
# 检查收件箱
inbox = BUS.read_inbox(name)
for msg in inbox:
if msg.get("type") == "shutdown_request":
return # 收到关闭请求,直接退出
response = client.messages.create(...)
if response.stop_reason != "tool_use":
break # 当前任务完成
# 执行工具...
if idle_requested:
break # 模型主动要求空闲
# === 空闲阶段 ===
self._set_status(name, "idle")
resume = False
for _ in range(IDLE_TIMEOUT // POLL_INTERVAL): # 60秒/5秒 = 12次
time.sleep(POLL_INTERVAL)
# 优先级 1:检查收件箱
inbox = BUS.read_inbox(name)
if inbox:
resume = True
break
# 优先级 2:扫描任务板
unclaimed = scan_unclaimed_tasks()
if unclaimed:
task = unclaimed[0]
claim_task(task["id"], name) # 认领
resume = True
break
if not resume:
return # 60 秒没找到新活,关闭
self._set_status(name, "working") # 回到工作
4.4 任务认领与竞态防护
_claim_lock = threading.Lock() def claim_task(task_id, owner): """加锁认领,防止两个队友同时认领同一个任务""" with _claim_lock: task = json.loads(path.read_text()) task["owner"] = owner task["status"] = "in_progress" path.write_text(json.dumps(task, indent=2))
4.5 身份重注入
当消息历史被压缩到很短(≤3 条)时,队友可能"忘了自己是谁":
if len(messages) <= 3:
messages.insert(0, {
"role": "user",
"content": f"<identity>You are '{name}', role: {role}, "
f"team: {team_name}. Continue your work.</identity>"
})
messages.insert(1, {
"role": "assistant",
"content": f"I am {name}. Continuing."
})
五、演进总结
s04: 一次性子代理 → 派遣 → 做完 → 销毁 s09: 持久化队友 → 派遣 → 工作 → 空闲 → 再工作 s10: + 协议 → 有规矩的协作(关闭协议、方案审批) s11: + 自主能力 → 自己找活干,不需要分配
| 能力 | s04 | s09 | s10 | s11 |
|---|---|---|---|---|
| 工具调用 | 有 | 有 | 有 | 有 |
| 持久化 | 无 | 有 | 有 | 有 |
| 收件箱通信 | 无 | 有 | 有 | 有 |
| 关闭协议 | 无 | 无 | 有 | 有 |
| 方案审批 | 无 | 无 | 有 | 有 |
| 自动认领 | 无 | 无 | 无 | 有 |
| 空闲轮询 | 无 | 无 | 无 | 有 |
| 身份重注入 | 无 | 无 | 无 | 有 |
六、实战练习
练习 1:运行 s09,组建团队
python s09_agent_teams.py
输入:"创建两个队友:alice 负责写代码,bob 负责写测试。让 alice 先写一个简单的 Python 函数,然后让 bob 为它写测试。"
特殊命令:
-
/team— 查看团队状态 -
/inbox— 查看领导收件箱
练习 2:观察协议交互
python s10_team_protocols.py
创建一个队友,然后用 shutdown_request 请求关闭。观察协议交互过程。
练习 3:测试自主认领
python s11_autonomous_agents.py
-
先创建几个任务(通过 task_create)
-
再创建一个自主队友
-
用
/tasks查看任务被自动认领的过程
七、核心要点总结
| 概念 | 说明 |
|---|---|
| MessageBus | 基于 JSONL 文件的消息总线,drain 模式 |
| TeammateManager | 管理队友的创建、运行、状态追踪 |
| 关闭协议 | request_id 关联的请求-回复握手 |
| 方案审批 | 队友提交 → 领导审批 → 执行或修改 |
| 自主轮询 | 空闲时主动检查收件箱和任务板 |
| 任务认领锁 | 防止并发认领同一任务 |
| 身份重注入 | 上下文压缩后防止"失忆" |
8-工作树隔离与生产实战
第一部分:s12 工作树隔离
一、问题
之前所有队友/子代理都在同一个目录下工作。如果 alice 在改 auth.py,bob 同时也在改 auth.py,文件就冲突了。
就像两个人同时编辑同一个文档,互相覆盖对方的修改。
二、Git Worktree 是什么
普通情况:一个 git 仓库只有一个工作目录。 Worktree:同一个仓库可以有多个工作目录,每个在不同的分支上。
项目根目录/ src/ ← 主工作目录 .worktrees/ auth-refactor/ ← 独立工作目录(alice 在这里改代码) src/ ← 主目录的完整副本 fix-bug/ ← 另一个独立工作目录(bob 在这里) src/ index.json ← 工作树注册表 events.jsonl ← 生命周期事件日志
三、控制面 vs 执行面
s12 的核心设计思想是分离控制和执行:
控制面(.tasks/) 执行面(.worktrees/)
task_12.json auth-refactor/
{ ← alice 在这里工作
"id": 12, ← 独立的代码副本
"subject": "重构认证模块", ← 互不干扰
"worktree": "auth-refactor" ─────→
}
task_13.json fix-bug/
{ ← bob 在这里工作
"id": 13,
"subject": "修复登录 bug",
"worktree": "fix-bug" ──────────→
}
四、WorktreeManager 核心操作
4.1 创建工作树
def create(self, name, task_id=None, base_ref="HEAD"):
"""
执行: git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
效果: 创建独立工作目录 + 新分支
"""
path = self.dir / name
branch = f"wt/{name}"
self._run_git(["worktree", "add", "-b", branch, str(path), base_ref])
# 注册到 index.json
entry = {
"name": name,
"path": str(path),
"branch": branch,
"task_id": task_id,
"status": "active",
}
# 绑定任务
if task_id is not None:
self.tasks.bind_worktree(task_id, name)
4.2 在工作树中执行命令
def run(self, name, command): """在指定工作树目录中执行命令(目录隔离的关键)""" wt = self._find(name) path = Path(wt["path"]) r = subprocess.run( command, shell=True, cwd=path, # ← 关键:在工作树目录中执行! capture_output=True, text=True, timeout=300, ) return (r.stdout + r.stderr).strip()[:50000]
4.3 删除或保留
def remove(self, name, force=False, complete_task=False):
"""删除工作树,可选同时完成任务"""
self._run_git(["worktree", "remove", wt["path"]])
if complete_task and wt.get("task_id"):
self.tasks.update(wt["task_id"], status="completed")
def keep(self, name):
"""标记为保留(不删除,留着以后用)"""
item["status"] = "kept"
五、EventBus(事件日志)
class EventBus:
def emit(self, event, task=None, worktree=None, error=None):
"""记录生命周期事件到 JSONL"""
payload = {
"event": event, # 如 "worktree.create.after"
"ts": time.time(),
"task": task or {},
"worktree": worktree or {},
}
with self.path.open("a") as f:
f.write(json.dumps(payload) + "\n")
事件类型:
| 事件 | 触发时机 |
|---|---|
worktree.create.before |
创建前 |
worktree.create.after |
创建成功后 |
worktree.create.failed |
创建失败 |
worktree.remove.before |
删除前 |
worktree.remove.after |
删除成功后 |
worktree.keep |
标记保留 |
task.completed |
任务完成 |
六、16 个工具
s12 拥有全系列最多的工具(16 个):
| 分类 | 工具 |
|---|---|
| 基础(4) | bash, read_file, write_file, edit_file |
| 任务(5) | task_create, task_list, task_get, task_update, task_bind_worktree |
| 工作树(6) | worktree_create, worktree_list, worktree_status, worktree_run, worktree_keep, worktree_remove |
| 事件(1) | worktree_events |
第二部分:生产级 Harness(zidonghua-test)
七、从教学到生产
zidonghua-test/core/harness.py 是一个真实的生产级 Harness,它把 s01-s05 的模式应用到了自动化测试场景。
7.1 架构对比
| 教学版本 | 生产版本 |
|---|---|
| 通用编程 agent | 专注于自动化测试 |
| bash/read/write/edit | read_document/http_request/db_query/assert_check |
| 自由探索 | 两阶段工作流(规划 → 执行) |
| 无人工确认 | submit_plan 暂停点 |
| 控制台输出 | Markdown 报告生成 |
7.2 两阶段工作流
阶段1 - 规划 +------------------+ +------------------+ +------------------+ | read_document | --> | 分析需求 | --> | submit_plan | | 读取需求文档 | | 生成测试计划 | | 人工确认暂停点 | +------------------+ +------------------+ +------------------+ | [用户确认] | 阶段2 - 执行 v +------------------+ +------------------+ +------------------+ | prepare_test_data| --> | http_request | --> | db_query | | 准备测试数据 | | 执行HTTP请求 | | 查询数据库验证 | +------------------+ +------------------+ +------------------+ | +------------------+ +------v-----------+ | report_result | <-- | assert_check | | 记录PASS/FAIL | | 断言验证 | +------------------+ +------------------+
7.3 submit_plan:人工确认暂停点
def _handle_plan_pause(self, plan_input):
"""人工确认暂停点"""
print("=" * 60)
print("测试计划")
for case in plan_input.get("test_cases", []):
print(f" {case.get('id')}: {case.get('name')}")
while True:
answer = input("确认执行?[Y/n/q] ").strip().lower()
if answer in ("", "y", "yes"):
return "用户已确认,请开始执行测试用例。"
elif answer in ("n", "no", "q"):
return "用户已取消,请停止执行。"
设计理念:Agent 自动生成测试计划,但在执行前必须经过人工审批。这是一种 "人在回路"(Human-in-the-Loop)的设计模式。
7.4 7 个专业工具
| 工具 | 功能 | 对应教学版本 |
|---|---|---|
read_document |
读取需求文档 | s02 read_file |
submit_plan |
提交测试计划 | 新增(暂停点) |
http_request |
调用测试服务 | s01 bash |
db_query |
查询数据库 | 新增 |
assert_check |
断言验证 | 新增 |
prepare_test_data |
准备测试数据 | 新增 |
report_result |
记录测试结果 | 新增 |
7.5 入口程序(main.py)
# @文件引用解析 message = resolve_at_references(user_input) # 例如 "测试 @../docs/xxx.md" → 自动读取文件内容注入 # 运行 Harness result = harness.run_chat(message) # 生成报告 if result["results"]: report_path = reporter.generate( doc_path=doc_path, plan=result["plan"], results=result["results"], summary=result["summary"], )
第三部分:s_full 完整参考
八、全部组装
s_full.py 是 s01-s11 的集大成者,22 个工具:
+------------------------------------------------------------------+ | 完整 Agent | | | | 每次调用 LLM 前执行: | | +--------------------+ +------------------+ +--------------+ | | | 微压缩 + 自动压缩 | | 清空后台通知 | | 检查收件箱 | | | | (s06) | | (s08) | | (s09) | | | +--------------------+ +------------------+ +--------------+ | | | | 工具调度 (s02 模式): | | bash | read | write | edit | TodoWrite | task | load_skill | | compress | bg_run | bg_check | task_create | task_get | | task_update | task_list | spawn_teammate | list_teammates | | send_message | read_inbox | broadcast | shutdown_request | | plan_approval | idle | claim_task | +------------------------------------------------------------------+
8.1 主循环中的三件前置处理
def agent_loop(messages):
while True:
# 1. 压缩流水线(s06)
microcompact(messages)
if estimate_tokens(messages) > TOKEN_THRESHOLD:
messages[:] = auto_compact(messages)
# 2. 后台通知注入(s08)
notifs = BG.drain()
if notifs:
messages.append({"role": "user", "content": f"<background-results>..."})
# 3. 收件箱注入(s09)
inbox = BUS.read_inbox("lead")
if inbox:
messages.append({"role": "user", "content": f"<inbox>..."})
# 调用 LLM
response = client.messages.create(...)
九、全系列架构总览
s01 核心循环 while + tool_use + messages
↓
s02 工具调度 dispatch map {name: handler}
↓
s03 任务规划 TodoManager + 催促机制
↓
s04 子代理 上下文隔离 + 摘要返回
↓
s05 技能加载 两层机制(目录 + 按需加载)
↓
s06 上下文压缩 三层流水线(micro + auto + manual)
↓
s07 持久化任务 文件存储 + 依赖图
↓
s08 后台任务 线程 + 通知队列
↓
s09 团队协作 MessageBus + TeammateManager
↓
s10 协议 request_id 关联的握手
↓
s11 自主代理 空闲轮询 + 任务认领
↓
s12 工作树隔离 Git Worktree + EventBus
每一步都在前一步的基础上叠加一个新能力,核心循环始终不变。
十、设计原则总结
| 原则 | 体现 |
|---|---|
| 循环不变 | s01-s12 的 agent_loop 结构一致 |
| 功能叠加 | 每个版本在前一版基础上加一个能力 |
| 工具即能力 | 新功能 = 新工具 + 新 handler |
| 状态外化 | 任务、消息、配置都存文件,不依赖内存 |
| 消息注入 | 催促、后台结果、收件箱都通过注入消息实现 |
| 防递归 | 子代理/队友的工具集是父级的子集 |
| 渐进压缩 | 从轻量级(占位符)到重量级(LLM 摘要) |
| 协议握手 | request_id 关联请求和回复 |
| 目录隔离 | Git Worktree 实现并行不冲突 |
十一、实战建议
从教学到生产的路径
-
理解核心循环(s01-s02)→ 这是一切的基础
-
加上规划能力(s03/s07)→ 让 agent 有计划地工作
-
加上压缩能力(s06)→ 让 agent 能长时间工作
-
加上子代理(s04)→ 处理复杂任务时保持清晰
-
定制专业工具(参考 zidonghua-test)→ 针对你的业务场景
你不需要全部
不是每个项目都需要团队协作(s09-s11)或工作树隔离(s12)。根据实际需求选择:
| 场景 | 推荐级别 |
|---|---|
| 简单的命令行助手 | s01-s02 |
| 需要规划的复杂任务 | + s03 或 s07 |
| 长时间运行的 agent | + s06 |
| 需要并行探索的场景 | + s04 |
| 需要专业领域知识 | + s05 |
| 需要并行执行的耗时操作 | + s08 |
| 多角色协作系统 | + s09-s11 |
| 多任务并行开发 | + s12 |
9-资料下载地址
通过网盘分享的文件:harness智能体.zip
链接: https://pan.baidu.com/s/11jr93x41ITQHh0y-BCln0Q?pwd=tyui 提取码: tyui
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)