Claude Code架构深度解读(二)
这是第二篇文章,第三篇文章在我的blog:https://www.catchcat.eu.cc/post/claude-code%E6%9E%B6%E6%9E%84%E6%B7%B1%E5%BA%A6%E8%A7%A3%E8%AF%BB%E4%B8%89
s06: Context Compact (上下文压缩)(Harness 层: 压缩 – 干净的记忆, 无限的会话。)
问题
上下文窗口是有限的。读一个 1000 行的文件就吃掉 ~4000 token; 读 30 个文件、跑 20 条命令, 轻松突破 100k token。不压缩, Agent 根本没法在大项目里干活。
解决方案
三层压缩, 激进程度递增:
Every turn:
+------------------+
| Tool call result |
+------------------+
|
v
[Layer 1: micro_compact] (silent, every turn)
Replace tool_result > 3 turns old
with "[Previous: used {tool_name}]"
|
v
[Check: tokens > 50000?]
| |
no yes
| |
v v
continue [Layer 2: auto_compact]
Save transcript to .transcripts/
LLM summarizes conversation.
Replace all messages with [summary].
|
v
[Layer 3: compact tool]
Model calls compact explicitly.
Same summarization as auto_compact.
Layer1:micro_compact(每轮必执行):保留最近 3 个工具结果,非 read_file 的旧结果替换为 [Previous: used 工具名]
Layer2:auto_compact:token>50000 自动总结(本次指令 token 不足,不触发)
Layer3:compact 工具:手动压缩(一般不给大模型说就不触发)
相对 s05 的变更
| 组件 | 之前 (s05) | 之后 (s06) |
|---|---|---|
| Tools | 5 | 5 (基础 + compact) |
| 上下文管理 | 无 | 三层压缩 |
| Micro-compact | 无 | 旧结果 -> 占位符 |
| Auto-compact | 无 | token 阈值触发 |
| Transcripts | 无 | 保存到 .transcripts/ |
阶段 1:用户输入指令 → history = 1 条(初始状态)
[
{
"role": "user",
"content": "逐一阅读agents/目录中的每个Python文件"
}
]
阶段 2:进入 agent_loop → 执行 Layer1 micro_compact
代码执行:
micro_compact(messages) # 此时只有1条消息,无tool_result → 不做任何修改
messages保持不变,token<50000 → 不触发 auto_compact
阶段 3:调用大模型 → 返回 tool_use(bash 列出文件)
大模型返回 stop_reason: tool_use,代码追加助手消息 → history = 2 条
[
{
"role": "user",
"content": "逐一阅读agents/目录中的每个Python文件"
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_001",
"name": "bash",
"input": {
"command": "ls agents/*.py"
}
}
]
}
]
阶段 4:执行 bash 工具 → 追加 tool_result → history = 3 条
执行 run_bash,返回文件列表,封装结果追加:
[
{
"role": "user",
"content": "逐一阅读agents/目录中的每个Python文件"
},
{
"role": "assistant",
"content": [{"type":"tool_use","id":"tu_001","name":"bash","input":{"command":"ls agents/*.py"}}]
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "s05_agent.py\ns06_context_compact.py"
}
]
}
]
执行micro_compact:工具结果数量 = 1 < KEEP_RECENT=3 → 不压缩
阶段 5:第二轮循环 → LLM 调用 read_file 读第一个文件
追加助手消息 → history = 4 条
[
// 前3条省略
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_002",
"name": "read_file",
"input": {
"path": "s05_agent.py"
}
}
]
}
]
阶段 6:执行 read_file → 追加结果 → history = 5 条
read_file 受保护,不压缩,返回文件内容:
[
// 前4条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_002",
"content": "import os\nimport re\n...(s05_agent.py完整代码)"
}
]
}
]
micro_compact:结果数量 = 2 < 3 → 不压缩
阶段 7:第三轮循环 → LLM 调用 read_file 读第二个文件
追加助手消息 → history = 6 条
[
// 前5条省略
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_003",
"name": "read_file",
"input": {
"path": "s06_context_compact.py"
}
}
]
}
]
阶段 8:执行 read_file → 追加结果 → history = 7 条
[
// 前6条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_003",
"content": "import json\nimport os\n...(s06_context_compact.py完整代码)"
}
]
}
]
阶段 9:micro_compact 压缩逻辑触发演示(这个很关键)
当前工具结果列表:
- tu_001: bash(非保护工具)
- tu_002: read_file(保护)
- tu_003: read_file(保护)
总数量 = 3 = KEEP_RECENT=3 → 刚好不压缩
模拟:再多执行 1 个工具(达到 4 个,触发压缩)
如果再执行 1 个工具(tu_004: bash),工具结果 = 4,micro_compact 会自动替换最早的非保护结果:
// 被压缩后的旧结果(tu_001 bash)
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "[Previous: used bash]" // 被微压缩替换!
}
read_file 结果永久保留,不会被压缩
阶段 10:LLM 结束任务 → 最终 history
大模型返回 end_turn,追加文本回答,任务完成:
[
// 前7条省略
{
"role": "assistant",
"content": "已成功逐一阅读agents/目录下的所有Python文件:\n1. s05_agent.py(基础Agent)\n2. s06_context_compact.py(三层压缩Agent)"
}
]
压缩过程也就是这个代码
THRESHOLD = 50000
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
KEEP_RECENT = 3
PRESERVE_RESULT_TOOLS = {"read_file"}
def estimate_tokens(messages: list) -> int:
"""Rough token count: ~4 chars per token."""
return len(str(messages)) // 4
# -- Layer 1: micro_compact - replace old tool results with placeholders --
def micro_compact(messages: list) -> list:
# Collect (msg_index, part_index, tool_result_dict) for all tool_result entries
tool_results = []
for msg_idx, msg in enumerate(messages):
if msg["role"] == "user" and isinstance(msg.get("content"), list):
for part_idx, part in enumerate(msg["content"]):
if isinstance(part, dict) and part.get("type") == "tool_result":
tool_results.append((msg_idx, part_idx, part))
if len(tool_results) <= KEEP_RECENT:
return messages
# Find tool_name for each result by matching tool_use_id in prior assistant messages
tool_name_map = {}
for msg in messages:
if msg["role"] == "assistant":
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if hasattr(block, "type") and block.type == "tool_use":
tool_name_map[block.id] = block.name
# Clear old results (keep last KEEP_RECENT). Preserve read_file outputs because
# they are reference material; compacting them forces the agent to re-read files.
to_clear = tool_results[:-KEEP_RECENT]
for _, _, result in to_clear:
if not isinstance(result.get("content"), str) or len(result["content"]) <= 100:
continue
tool_id = result.get("tool_use_id", "")
tool_name = tool_name_map.get(tool_id, "unknown")
if tool_name in PRESERVE_RESULT_TOOLS:
continue
result["content"] = f"[Previous: used {tool_name}]"
return messages
# -- Layer 2: auto_compact - save transcript, summarize, replace messages --
def auto_compact(messages: list) -> list:
# Save full transcript to disk
TRANSCRIPT_DIR.mkdir(exist_ok=True)
transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg, default=str) + "\n")
print(f"[transcript saved: {transcript_path}]")
# Ask LLM to summarize
conversation_text = json.dumps(messages, default=str)[-80000:]
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content":
"Summarize this conversation for continuity. Include: "
"1) What was accomplished, 2) Current state, 3) Key decisions made. "
"Be concise but preserve critical details.\n\n" + conversation_text}],
max_tokens=2000,
)
summary = next((block.text for block in response.content if hasattr(block, "text")), "")
if not summary:
summary = "No summary generated."
# Replace all messages with compressed summary
return [
{"role": "user", "content": f"[Conversation compressed. Transcript: {transcript_path}]\n\n{summary}"},
]
这里顺便解释一下enumerate这个函数(秒懂没!!!)(给 enumerate 传一个列表,遍历 / 转换后,每一项都是 (索引,元素) 的元组)
my_list = ["张三", "李四", "王五"]
result = enumerate(my_list)
print(result)
结果:
[(0, '张三'), (1, '李四'), (2, '王五')]
整个压缩解读
这 4 个变量定义了压缩的所有规则,是整个逻辑的核心:
# 1. Token阈值:超过50000 token,触发【深度自动压缩】
THRESHOLD = 50000
# 2. 对话存档文件夹:压缩前把完整对话存到这里,防止数据丢失
TRANSCRIPT_DIR = WORKDIR / ".transcripts"
# 3. 微压缩保留数量:只保留最近3个工具结果,更早的直接压缩
KEEP_RECENT = 3
# 4. 保护工具:read_file的结果永远不压缩(读文件的内容必须保留)
PRESERVE_RESULT_TOOLS = {"read_file"}
辅助函数:estimate_tokens(计算对话长度)
作用:粗略估算当前对话的 token 数量,判断是否触发深度压缩
def estimate_tokens(messages: list) -> int:
# 规则:1个token ≈ 4个英文字符,中文/符号也按这个粗略算,其实一个token对应的是1.3-1.5个汉字,这里粗滤结算
return len(str(messages)) // 4
作用:给代码一个「判断依据」—— 对话太长(>50000)就压缩。
核心 1:Layer1 微压缩 micro_compact(每轮必执行!轻量级)
核心作用
不删除消息,只把「旧的、非读文件的工具结果」替换成简短占位符
- 保留:最近 3 个工具结果 + 所有 read_file 结果
- 压缩:更早的 bash/write/edit 工具结果 → 替换为
[Previous: used 工具名]
逐行代码拆解 + JSON 示例
第一步:收集所有「工具结果(tool_result)
tool_results = []
for msg_idx, msg in enumerate(messages):
# 只找user角色、content是列表的消息
if msg["role"] == "user" and isinstance(msg.get("content"), list):
for part_idx, part in enumerate(msg["content"]):
# 只收集 type="tool_result" 的工具结果
if isinstance(part, dict) and part.get("type") == "tool_result":
tool_results.append((消息索引, 内容索引, 工具结果字典))
【示例】收集到的工具结果(共 4 个,超过 3 个,触发压缩)
- tu_001:bash 结果
- tu_002:read_file 结果
- tu_003:write_file 结果
- tu_004:read_file 结果
第二步:建立「工具 ID → 工具名」映射
tool_name_map = {}
for msg in messages:
if msg["role"] == "assistant":
for block in content:
if block.type == "tool_use":
# 记录:工具ID → 工具名(如 tu_001 → bash)
tool_name_map[block.id] = block.name
作用:知道每个工具结果对应的工具名,方便替换占位符。
第三步:执行压缩(核心!)
# 只保留最后3个,前面的全部标记为「待压缩」
to_clear = tool_results[:-KEEP_RECENT]
for _, _, result in to_clear:
# 跳过:内容很短的结果 / 保护工具(read_file)的结果
if 内容不是字符串 或 内容长度≤100: continue
if 工具名是read_file: continue
# 替换!核心压缩逻辑
result["content"] = f"[Previous: used {tool_name}]"
微压缩 JSON 前后对比(最直观)
压缩前(原始长文本,占空间)
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "s05_agent.py\ns06_context_compact.py" // 原始bash结果
}
压缩后(替换为占位符,极省空间)
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "[Previous: used bash]" // 微压缩完成!
}
关键保护:read_file 结果完全不变(这个很重要,读的内容必须保护)
{
"type": "tool_result",
"tool_use_id": "tu_002",
"content": "import os\nimport re\n...(完整文件代码)" // 永久保留,不压缩
}
核心 2:Layer2 深度自动压缩 auto_compact(超 50000token 触发)
核心作用
对话太长时,直接把「整个对话」替换成「1 条 AI 总结」,实现无限会话
三步必执行:
- 存档:把完整对话保存到
.transcripts文件夹(防止丢数据) - 总结:让大模型把整个对话浓缩成一段文字
- 替换:把几十条消息 → 替换成1 条总结消息
逐行代码拆解 + JSON 示例(这个更直观!!看这里来了!!!)
第一步:存档完整对话
# 创建文件夹,把所有消息保存为jsonl文件
TRANSCRIPT_DIR.mkdir(exist_ok=True)
with open(transcript_时间戳.jsonl, "w") as f:
f.write(完整对话json)
作用:压缩后也能找回原始对话。
第二步:让 AI 生成总结
# 给AI发指令:总结对话的成果、当前状态、关键决策
response = client.messages.create(
prompt="总结对话,保留关键信息",
messages=对话内容
)
summary = AI返回的总结文本
第三步:替换整个对话(终极压缩)
# 把N条消息 → 直接变成1条总结消息
return [
{"role": "user", "content": f"[对话已压缩]\n\n{summary}"},
]
深度压缩 JSON 前后对比
压缩前(几十条消息,超长)
[
{"role":"user","content":"逐一阅读agents目录文件"},
{"role":"assistant","content":[{"type":"tool_use","name":"bash"}]},
{"role":"user","content":[{"type":"tool_result","content":"[Previous: used bash]"}]},
{"role":"assistant","content":[{"type":"tool_use","name":"read_file"}]},
... 省略10+条消息
]
压缩后(只剩 1 条消息,极小)
[
{
"role": "user",
"content": "[Conversation compressed. Transcript: .transcripts/transcript_123456.jsonl]\n\n总结:已完成agents目录下2个Python文件的阅读,s05是基础Agent,s06是三层压缩Agent,无错误执行完成。"
}
]
s07: Task System (任务系统)(Harness 层: 持久化任务 – 比任何一次对话都长命的目标)
问题
s03 的 TodoManager 只是内存中的扁平清单: 没有顺序、没有依赖、状态只有做完没做完。真实目标是有结构的 – 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。
没有显式的关系, Agent 分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。
解决方案
把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy)。任务图随时回答三个问题:
-
什么可以做? – 状态为
pending且blockedBy为空的任务。 -
什么被卡住? – 等待前置任务完成的任务。
-
什么做完了? – 状态为
completed的任务, 完成时自动解锁后续任务。其实就是s03的升级,之前是把所有的任务都串在一起执行,这里直接把有些任务并起来执行
.tasks/
task_1.json {"id":1, "status":"completed"}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[1], "status":"pending"}
task_4.json {"id":4, "blockedBy":[2,3], "status":"pending"}
任务图 (DAG):
+----------+
+--> | task 2 | --+
| | pending | |
+----------+ +----------+ +--> +----------+
| task 1 | | task 4 |
| completed| --> +----------+ +--> | blocked |
+----------+ | task 3 | --+ +----------+
| pending |
+----------+
顺序: task 1 必须先完成, 才能开始 2 和 3
并行: task 2 和 3 可以同时执行
依赖: task 4 要等 2 和 3 都完成
状态: pending -> in_progress -> completed
这个任务图是 s07 之后所有机制的协调骨架: 后台执行 (s08)、多 agent 团队 (s09+)、worktree 隔离 (s12) 都读写这同一个结构。
核心代码:
class TaskManager:
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self._next_id = self._max_id() + 1
def _max_id(self) -> int:
ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")]
return max(ids) if ids else 0
def _load(self, task_id: int) -> dict:
path = self.dir / f"task_{task_id}.json"
if not path.exists():
raise ValueError(f"Task {task_id} not found")
return json.loads(path.read_text())
def _save(self, task: dict):
path = self.dir / f"task_{task['id']}.json"
path.write_text(json.dumps(task, indent=2, ensure_ascii=False))
def create(self, subject: str, description: str = "") -> str:
task = {
"id": self._next_id, "subject": subject, "description": description,
"status": "pending", "blockedBy": [], "owner": "",
}
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2, ensure_ascii=False)
def get(self, task_id: int) -> str:
return json.dumps(self._load(task_id), indent=2, ensure_ascii=False)
def update(self, task_id: int, status: str = None,
add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
task = self._load(task_id)
if status:
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Invalid status: {status}")
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
if add_blocked_by:
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
if remove_blocked_by:
task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
self._save(task)
return json.dumps(task, indent=2, ensure_ascii=False)
def _clear_dependency(self, completed_id: int):
"""Remove completed_id from all other tasks' blockedBy lists."""
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
def list_all(self) -> str:
tasks = []
files = sorted(
self.dir.glob("task_*.json"),
key=lambda f: int(f.stem.split("_")[1])
)
for f in files:
tasks.append(json.loads(f.read_text()))
if not tasks:
return "No tasks."
lines = []
for t in tasks:
marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}")
return "\n".join(lines)
TASKS = TaskManager(TASKS_DIR)
我输入:
创建3个任务:“设置项目”、“编写代码”、“编写测试”。按顺序相互依赖
大模型会自动调用 5 个工具(严格匹配代码的 TOOL_HANDLERS):
task_create(设置项目)task_create(编写代码)task_create(编写测试)task_update(任务2, 添加依赖1)task_update(任务3, 添加依赖2)
一、核心方法 1:task_create(创建任务)
对应代码
def create(self, subject: str, description: str = "") -> str:
# 1. 生成任务字典(固定字段)
task = {
"id": self._next_id, "subject": subject, "description": description,
"status": "pending", "blockedBy": [], "owner": "",
}
# 2. 写入JSON文件
self._save(task)
# 3. 自增ID,下次创建用
self._next_id += 1
# 4. 返回完整JSON
return json.dumps(task, indent=2, ensure_ascii=False)
执行动作(对应我的 3 个任务)
-
创建任务 1
传入
subject=设置项目- 生成
id=1,blockedBy=[],写入task_1.json
- 生成
-
创建任务 2
传入
subject=编写代码- 生成
id=2,blockedBy=[],写入task_2.json
- 生成
-
创建任务 3
传入
subject=编写测试- 生成
id=3,blockedBy=[],写入task_3.json
- 生成
结果
生成 3 个空依赖的 JSON 文件,这是创建的底层逻辑。
生成文件:.tasks/task_1.json
{
"id": 1,
"subject": "设置项目",
"description": "",
"status": "pending",
"blockedBy": [],
"owner": ""
}
生成文件:.tasks/task_2.json
{
"id": 2,
"subject": "编写代码",
"description": "",
"status": "pending",
"blockedBy": [],
"owner": ""
}
生成文件:.tasks/task_3.json
{
"id": 3,
"subject": "编写测试",
"description": "",
"status": "pending",
"blockedBy": [],
"owner": ""
}
二、核心方法 2:task_update(修改依赖 / 状态)
对应代码
def update(self, task_id: int, status=None, add_blocked_by=None, remove_blocked_by=None):
# 1. 加载要修改的任务JSON
task = self._load(task_id)
# 2. 添加依赖:合并列表+去重
if add_blocked_by:
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
# 3. 保存修改后的JSON
self._save(task)
return json.dumps(task)
执行动作(配置顺序依赖)
-
修改任务 2
:调用
update(task_id=2, add_blocked_by=[1])- 加载
task_2.json - 把
blockedBy: []→ 合并成 [1] - 覆盖写入文件
- 加载
-
修改任务 3
:调用
update(task_id=3, add_blocked_by=[2])- 加载
task_3.json - 把
blockedBy: []→ 合并成 [2] - 覆盖写入文件
- 加载
结果
任务 2 依赖 1,任务 3 依赖 2,这是依赖配置的底层逻辑。
修改文件:.tasks/task_2.json
完整最终 JSON(blockedBy 从 [] → [1])
{
"id": 2,
"subject": "编写代码",
"description": "",
"status": "pending",
"blockedBy": [
1
],
"owner": ""
}
修改文件:.tasks/task_3.json
完整最终 JSON(blockedBy 从 [] → [2])
{
"id": 3,
"subject": "编写测试",
"description": "",
"status": "pending",
"blockedBy": [
2
],
"owner": ""
}
.tasks/task_1.json(无变化)
{
"id": 1,
"subject": "设置项目",
"description": "",
"status": "pending",
"blockedBy": [],
"owner": ""
}
三、核心方法 3:task_list(列出所有任务)
对应代码(这是输出格式的唯一来源!)
def list_all(self) -> str:
# 1. 遍历.tasks/下所有task_*.json,按ID排序
files = sorted(self.dir.glob("task_*.json"), key=lambda f: int(f.stem.split("_")[1]))
# 2. 读取所有任务
for f in files:
task = json.loads(f.read_text())
# 3. 按固定格式拼接每行输出(核心!)
marker = {"pending":"[ ]", "in_progress":"[>]", "completed":"[x]"}[task["status"]]
blocked = f" (blocked by: {task['blockedBy']})" if task["blockedBy"] else ""
lines.append(f"{marker} #{task['id']}: {task['subject']}{blocked}")
# 4. 返回拼接好的字符串
return "\n".join(lines)
执行动作

结果
格式是代码硬编码拼接的。!!!!!!!!
[ ] #1: 设置项目
[ ] #2: 编写代码 (blocked by: [1])
[ ] #3: 编写测试 (blocked by: [2])
四、核心方法 4:task_get(查询单个任务详情)
对应代码
def get(self, task_id: int) -> str:
# 直接加载对应JSON,返回完整字符串
return json.dumps(self._load(task_id), indent=2)
执行动作
调用 task_get(1) → 直接读取 task_1.json 并返回完整 JSON。
五、完整闭环:从指令 → 最终输出(一步一对应)
初始状态
.tasks/` 目录为空,`_next_id=1
第 1 步:调用 task_create (设置项目)
- 代码生成
id=1任务 → 写入task_1.json _next_id变成 2
第 2 步:调用 task_create (编写代码)
- 代码生成
id=2任务 → 写入task_2.json _next_id变成 3
第 3 步:调用 task_create (编写测试)
- 代码生成
id=3任务 → 写入task_3.json _next_id变成 4
第 4 步:调用 task_update (2, add_blocked_by=[1])
- 代码修改
task_2.json→blockedBy: [1]
第 5 步:调用 task_update (3, add_blocked_by=[2])
- 代码修改
task_3.json→blockedBy: [2]
第 6 步:调用 task_list
[ ] #1: 设置项目
[ ] #2: 编写代码 (blocked by: [1])
[ ] #3: 编写测试 (blocked by: [2])
s08: Background Tasks (后台任务)(Harness 层: 后台执行 – 模型继续思考, harness 负责等待)
问题
有些命令要跑好几分钟: npm install、pytest、docker build。阻塞式循环下模型只能干等。用户说 “装依赖, 顺便建个配置文件”, Agent 却只能一个一个来。
解决方案
Main thread Background thread
+-----------------+ +-----------------+
| agent loop | | subprocess runs |
| ... | | ... |
| [LLM call] <---+------- | enqueue(result) |
| ^drain queue | +-----------------+
+-----------------+
Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
| |
v v
[A runs] [B runs] (parallel)
| |
+-- results injected before next LLM call --+
相对 s07 的变更
| 组件 | 之前 (s07) | 之后 (s08) |
|---|---|---|
| Tools | 8 | 6 (基础 + background_run + check) |
| 执行方式 | 仅阻塞 | 阻塞 + 后台线程 |
| 通知机制 | 无 | 每轮排空的队列 |
| 并发 | 无 | 守护线程 |
关键代码:
class MessageBus:
def __init__(self, inbox_dir: Path):
self.dir = inbox_dir
self.dir.mkdir(parents=True, exist_ok=True)
def send(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None) -> str:
if msg_type not in VALID_MSG_TYPES:
return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
if extra:
msg.update(extra)
inbox_path = self.dir / f"{to}.jsonl"
with open(inbox_path, "a") as f:
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
def read_inbox(self, name: str) -> list:
inbox_path = self.dir / f"{name}.jsonl"
if not inbox_path.exists():
return []
messages = []
for line in inbox_path.read_text().strip().splitlines():
if line:
messages.append(json.loads(line))
inbox_path.write_text("")
return messages
def broadcast(self, sender: str, content: str, teammates: list) -> str:
count = 0
for name in teammates:
if name != sender:
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"
BUS = MessageBus(INBOX_DIR)
我输入指令:
在后台运行 "sleep 5 && echo done",同时创建一个 test.txt 文件,内容为 hello
history 初始 = 1 条(只有我的提问)
[
{
"role": "user",
"content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
}
]
阶段 1:AI 调用 background_run 工具(后台执行 sleep 命令)
追加助手消息 → history = 2 条
[
{
"role": "user",
"content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_001",
"name": "background_run",
"input": {
"command": "sleep 5 && echo done"
}
}
]
}
]
BackgroundManager 数据变化(核心)
# tasks 记录本:新增任务,状态=运行中
self.tasks = {
"8f2d9c7a": {
"status": "running",
"result": None,
"command": "sleep 5 && echo done"
}
}
# 消息箱:空
self._notification_queue = []
阶段 2:执行 background_run → 追加 tool_result → history = 3 条
返回后台任务启动提示,封装结果追加:
[
{
"role": "user",
"content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_001",
"name": "background_run",
"input": {
"command": "sleep 5 && echo done"
}
}
]
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "Background task 8f2d9c7a started: sleep 5 && echo done"
}
]
}
]
BackgroundManager 数据变化
无变化:任务仍在 running,消息箱为空
关键:主线程不卡顿!立刻可以执行创建文件的操作
阶段 3:AI 调用 write_file 工具(创建文件)
追加助手消息 → history = 4 条
[
// 前3条省略
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_002",
"name": "write_file",
"input": {
"path": "test.txt",
"content": "hello"
}
}
]
}
]
阶段 4:执行 write_file → 追加 tool_result → history = 5 条
[
// 前4条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_002",
"content": "Wrote 5 bytes"
}
]
}
]
此时状态
test.txt文件已创建完成- 后台任务
8f2d9c7a仍在 sleep 5 秒(running) BackgroundManager消息箱依旧为空
阶段 5:5 秒后 → 后台任务自动完成
后台线程执行完毕,BackgroundManager 内部数据突变:
# tasks 记录本:状态改为 completed,写入结果
self.tasks = {
"8f2d9c7a": {
"status": "completed",
"result": "done",
"command": "sleep 5 && echo done"
}
}
# 消息箱:放入完成通知
self._notification_queue = [
{
"task_id": "8f2d9c7a",
"status": "completed",
"command": "sleep 5 && echo done",
"result": "done"
}
]
阶段 6:AI 第二轮循环 → drain 通知 → 追加背景结果消息 → history = 6 条
AI 调用 LLM 前,会把后台任务结果作为用户消息追加到历史:
[
// 前5条省略
{
"role": "user",
"content": "<background-results>\n[bg:8f2d9c7a] completed: done\n</background-results>"
}
]
BackgroundManager 数据变化
消息箱被清空:self._notification_queue = []
阶段 7:LLM 接收通知 → 返回助手最终消息 → history = 7 条
AI 告知我后台任务完成,最终对话历史:
[
// 前6条省略
{
"role": "assistant",
"content": "后台任务已完成:sleep 5 && echo done,输出结果:done\n已成功创建文件 test.txt,内容为 hello"
}
]
最终完整 history JSON
[
{
"role": "user",
"content": "在后台运行 \"sleep 5 && echo done\",同时创建一个 test.txt 文件,内容为 hello"
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_001",
"name": "background_run",
"input": {
"command": "sleep 5 && echo done"
}
}
]
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "Background task 8f2d9c7a started: sleep 5 && echo done"
}
]
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_002",
"name": "write_file",
"input": {
"path": "test.txt",
"content": "hello"
}
}
]
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_002",
"content": "Wrote 5 bytes"
}
]
},
{
"role": "user",
"content": "<background-results>\n[bg:8f2d9c7a] completed: done\n</background-results>"
},
{
"role": "assistant",
"content": "后台任务已完成:sleep 5 && echo done,输出结果:done\n已成功创建文件 test.txt,内容为 hello"
}
]
s09: Agent Teams (Agent 团队)(Harness 层: 团队邮箱 – 多个模型, 通过文件协调)
问题
Subagent (s04) 是一次性的: 生成、干活、返回摘要、消亡。没有身份, 没有跨调用的记忆。Background Tasks (s08) 能跑 shell 命令, 但做不了 LLM 引导的决策。
真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久 Agent, (2) 身份和生命周期管理, (3) Agent 之间的通信通道。
解决方案
Teammate lifecycle:
spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN
Communication:
.team/
config.json <- team roster + statuses
inbox/
alice.jsonl <- append-only, drain-on-read
bob.jsonl
lead.jsonl
+--------+ send("alice","bob","...") +--------+
| alice | -----------------------------> | bob |
| loop | bob.jsonl << {json_line} | loop |
+--------+ +--------+
^ |
| BUS.read_inbox("alice") |
+---- alice.jsonl -> read + drain ---------+
相对 s08 的变更
| 组件 | 之前 (s08) | 之后 (s09) |
|---|---|---|
| Tools | 6 | 9 (+spawn/send/read_inbox) |
| Agent 数量 | 单一 | 领导 + N 个队友 |
| 持久化 | 无 | config.json + JSONL 收件箱 |
| 线程 | 后台命令 | 每线程完整 agent loop |
| 生命周期 | 一次性 | idle -> working -> idle |
| 通信 | 无 | message + broadcast |
输入指令
生成 Alice(编码者)和 Bob(测试者)。让 Alice 给 Bob 发送一条消息
初始状态
history = 1 条(仅我的提问)
[
{
"role": "user",
"content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
}
]
阶段 1:AI 调用 spawn_teammate 工具(创建 Alice)
追加助手消息 → history = 2 条
[
{
"role": "user",
"content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
},
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_001",
"name": "spawn_teammate",
"input": {
"name": "Alice",
"role": "编码者",
"prompt": "我是编码者Alice,后续给测试者Bob发送消息"
}
}
]
}
]
阶段 2:执行 spawn_teammate → 追加 tool_result → history = 3 条
创建 Alice 成功,返回结果,同步更新 .team/config.json
[
// 前2条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_001",
"content": "Spawned 'Alice' (role: 编码者)"
}
]
}
]
团队文件变化
.team/config.json(新增 Alice,状态 = working)
{
"team_name": "default",
"members": [
{
"name": "Alice",
"role": "编码者",
"status": "working"
}
]
}
收件箱:.team/inbox/alice.jsonl 空文件
阶段 3:AI 调用 spawn_teammate 工具(创建 Bob)
追加助手消息 → history = 4 条
[
// 前3条省略
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_002",
"name": "spawn_teammate",
"input": {
"name": "Bob",
"role": "测试者",
"prompt": "我是测试者Bob,接收Alice的消息"
}
}
]
}
]
阶段 4:执行 spawn_teammate → 追加 tool_result → history = 5 条
创建 Bob 成功,返回结果,同步更新 .team/config.json
[
// 前4条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_002",
"content": "Spawned 'Bob' (role: 测试者)"
}
]
}
]
团队文件变化
.team/config.json(Alice+Bob,均为 working)
{
"team_name": "default",
"members": [
{
"name": "Alice",
"role": "编码者",
"status": "working"
},
{
"name": "Bob",
"role": "测试者",
"status": "working"
}
]
}
收件箱:.team/inbox/bob.jsonl 空文件
阶段 5:AI 调用 send_message 工具(指挥 Alice 发消息)
追加助手消息 → history = 6 条
(Lead 给 Alice 发指令:给 Bob 发消息)
[
// 前5条省略
{
"role": "assistant",
"content": [
{
"type": "tool_use",
"id": "tu_003",
"name": "send_message",
"input": {
"to": "Alice",
"content": "给Bob发送一条消息:我是Alice,已准备好编码工作",
"msg_type": "message"
}
}
]
}
]
阶段 6:执行 send_message → 追加 tool_result → history = 7 条
消息发送成功,写入 Alice 的收件箱
[
// 前6条省略
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_003",
"content": "Sent message to Alice"
}
]
}
]
收件箱文件变化
.team/inbox/alice.jsonl(写入 Lead 发来的指令)
{"type":"message","from":"lead","content":"给Bob发送一条消息:我是Alice,已准备好编码工作","timestamp":1718888888.888}
阶段 7:Alice 线程自动读取收件箱 → 执行 send_message 发消息给 Bob
Alice 自动执行工具,消息写入 Bob 的收件箱
收件箱文件变化
.team/inbox/bob.jsonl(Alice 发给 Bob 的消息)
{"type":"message","from":"Alice","content":"我是Alice,已准备好编码工作","timestamp":1718888889.999}
配置文件变化
Alice 完成工作,状态变为 idle
.team/config.json
{
"team_name": "default",
"members": [
{
"name": "Alice",
"role": "编码者",
"status": "idle"
},
{
"name": "Bob",
"role": "测试者",
"status": "working"
}
]
}
阶段 8:AI 第二轮循环 → 读取 Lead 收件箱 → 追加消息 → history = 8 条
系统自动读取收件箱,把 Alice 的回执追加到对话历史
[
// 前7条省略
{
"role": "user",
"content": "<inbox>[{\"type\":\"message\",\"from\":\"Alice\",\"content\":\"已给Bob发送消息\",\"timestamp\":1718888890.000}]</inbox>"
}
]
阶段 9:LLM 返回最终结果 → history = 9 条
AI 给出最终答复,流程结束
[
// 前8条省略
{
"role": "assistant",
"content": "已成功生成:\n1. Alice(编码者)\n2. Bob(测试者)\nAlice 已向 Bob 发送消息:我是Alice,已准备好编码工作"
}
]
最终完整 history JSON
[
{
"role": "user",
"content": "生成Alice(编码者)和Bob(测试者)。让Alice给Bob发送一条消息"
},
{
"role": "assistant",
"content": [{"type":"tool_use","id":"tu_001","name":"spawn_teammate","input":{"name":"Alice","role":"编码者","prompt":"我是编码者Alice,后续给测试者Bob发送消息"}}]
},
{
"role": "user",
"content": [{"type":"tool_result","tool_use_id":"tu_001","content":"Spawned 'Alice' (role: 编码者)"}]
},
{
"role": "assistant",
"content": [{"type":"tool_use","id":"tu_002","name":"spawn_teammate","input":{"name":"Bob","role":"测试者","prompt":"我是测试者Bob,接收Alice的消息"}}]
},
{
"role": "user",
"content": [{"type":"tool_result","tool_use_id":"tu_002","content":"Spawned 'Bob' (role: 测试者)"}]
},
{
"role": "assistant",
"content": [{"type":"tool_use","id":"tu_003","name":"send_message","input":{"to":"Alice","content":"给Bob发送一条消息:我是Alice,已准备好编码工作","msg_type":"message"}}]
},
{
"role": "user",
"content": [{"type":"tool_result","tool_use_id":"tu_003","content":"Sent message to Alice"}]
},
{
"role": "user",
"content": "<inbox>[{\"type\":\"message\",\"from\":\"Alice\",\"content\":\"已给Bob发送消息\",\"timestamp\":1718888890.000}]</inbox>"
},
{
"role": "assistant",
"content": "已成功生成:\n1. Alice(编码者)\n2. Bob(测试者)\nAlice 已向 Bob 发送消息:我是Alice,已准备好编码工作"
}
]
核心代码:
class MessageBus:
def __init__(self, inbox_dir: Path):
self.dir = inbox_dir
self.dir.mkdir(parents=True, exist_ok=True)
def send(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None) -> str:
if msg_type not in VALID_MSG_TYPES:
return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
if extra:
msg.update(extra)
inbox_path = self.dir / f"{to}.jsonl"
with open(inbox_path, "a") as f:
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
def read_inbox(self, name: str) -> list:
inbox_path = self.dir / f"{name}.jsonl"
if not inbox_path.exists():
return []
messages = []
for line in inbox_path.read_text().strip().splitlines():
if line:
messages.append(json.loads(line))
inbox_path.write_text("")
return messages
def broadcast(self, sender: str, content: str, teammates: list) -> str:
count = 0
for name in teammates:
if name != sender:
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"
BUS = MessageBus(INBOX_DIR)
# -- TeammateManager: persistent named agents with config.json --
class TeammateManager:
def __init__(self, team_dir: Path):
self.dir = team_dir
self.dir.mkdir(exist_ok=True)
self.config_path = self.dir / "config.json"
self.config = self._load_config()
self.threads = {}
def _load_config(self) -> dict:
if self.config_path.exists():
return json.loads(self.config_path.read_text())
return {"team_name": "default", "members": []}
def _save_config(self):
self.config_path.write_text(json.dumps(self.config, indent=2))
def _find_member(self, name: str) -> dict:
for m in self.config["members"]:
if m["name"] == name:
return m
return None
def spawn(self, name: str, role: str, prompt: str) -> str:
member = self._find_member(name)
if member:
if member["status"] not in ("idle", "shutdown"):
return f"Error: '{name}' is currently {member['status']}"
member["status"] = "working"
member["role"] = role
else:
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
self._save_config()
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt),
daemon=True,
)
self.threads[name] = thread
thread.start()
return f"Spawned '{name}' (role: {role})"
def _teammate_loop(self, name: str, role: str, prompt: str):
sys_prompt = (
f"You are '{name}', role: {role}, at {WORKDIR}. "
f"Use send_message to communicate. Complete your task."
)
messages = [{"role": "user", "content": prompt}]
tools = self._teammate_tools()
for _ in range(50):
inbox = BUS.read_inbox(name)
for msg in inbox:
messages.append({"role": "user", "content": json.dumps(msg)})
try:
response = client.messages.create(
model=MODEL,
system=sys_prompt,
messages=messages,
tools=tools,
max_tokens=8000,
)
except Exception:
break
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
break
results = []
for block in response.content:
if block.type == "tool_use":
output = self._exec(name, block.name, block.input)
print(f" [{name}] {block.name}: {str(output)[:120]}")
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
})
messages.append({"role": "user", "content": results})
member = self._find_member(name)
if member and member["status"] != "shutdown":
member["status"] = "idle"
self._save_config()
def _exec(self, sender: str, tool_name: str, args: dict) -> str:
# these base tools are unchanged from s02
if tool_name == "bash":
return _run_bash(args["command"])
if tool_name == "read_file":
return _run_read(args["path"])
if tool_name == "write_file":
return _run_write(args["path"], args["content"])
if tool_name == "edit_file":
return _run_edit(args["path"], args["old_text"], args["new_text"])
if tool_name == "send_message":
return BUS.send(sender, args["to"], args["content"], args.get("msg_type", "message"))
if tool_name == "read_inbox":
return json.dumps(BUS.read_inbox(sender), indent=2)
return f"Unknown tool: {tool_name}"
def _teammate_tools(self) -> list:
# these base tools are unchanged from s02
return [
{"name": "bash", "description": "Run a shell command.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
{"name": "edit_file", "description": "Replace exact text in file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
{"name": "send_message", "description": "Send message to a teammate.",
"input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
{"name": "read_inbox", "description": "Read and drain your inbox.",
"input_schema": {"type": "object", "properties": {}}},
]
def list_all(self) -> str:
if not self.config["members"]:
return "No teammates."
lines = [f"Team: {self.config['team_name']}"]
for m in self.config["members"]:
lines.append(f" {m['name']} ({m['role']}): {m['status']}")
return "\n".join(lines)
def member_names(self) -> list:
return [m["name"] for m in self.config["members"]]
TEAM = TeammateManager(TEAM_DIR)
关键代码解读
| 类名 | 比喻 | 核心作用 | 存储介质 |
|---|---|---|---|
MessageBus |
团队邮局 | 负责 AI 之间发消息、收消息、清空消息 | .team/inbox/xxx.jsonl(每个成员一个收件箱文件) |
TeammateManager |
团队人事经理 | 负责创建 AI 成员、管理成员状态、启动独立线程 | .team/config.json(团队花名册) |
第一部分:超细解读 MessageBus 消息邮局
核心作用
专门处理AI 成员之间的消息收发,所有消息都存在文件里(持久化,重启程序也不丢),每个成员有专属的 .jsonl 收件箱文件。
1. 初始化方法 __init__
def __init__(self, inbox_dir: Path):
# 保存收件箱文件夹路径:.team/inbox
self.dir = inbox_dir
# 创建文件夹(不存在就新建,存在就跳过)
self.dir.mkdir(parents=True, exist_ok=True)
解读:
- 一启动程序,就自动创建文件夹
.team/inbox - 这个文件夹里,每个 AI 成员对应一个收件箱文件:
alice.jsonl、bob.jsonl
2. 发消息方法 send(核心!)
def send(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None) -> str:
# 校验消息类型是否合法(只能是预设的5种)
if msg_type not in VALID_MSG_TYPES:
return f"Error: Invalid type '{msg_type}'. Valid: {VALID_MSG_TYPES}"
# 组装消息JSON格式
msg = {
"type": msg_type, # 消息类型:普通消息/广播等
"from": sender, # 发送者:lead/alice/bob
"content": content, # 消息内容
"timestamp": time.time(),# 时间戳
}
# 如果有额外信息,合并到消息里
if extra:
msg.update(extra)
# 找到接收者的收件箱文件:.team/inbox/【to】.jsonl
inbox_path = self.dir / f"{to}.jsonl"
# 以【追加模式】写入消息(不会覆盖旧消息)
with open(inbox_path, "a") as f:
f.write(json.dumps(msg) + "\n")
# 返回发送成功提示
return f"Sent {msg_type} to {to}"
实际运行:Alice 给 Bob 发消息
调用代码:BUS.send("Alice", "Bob", "我是Alice,开始编码")
JSON 消息格式:
{
"type": "message",
"from": "Alice",
"content": "我是Alice,开始编码",
"timestamp": 1718999999.123
}
文件变化:
自动创建 / 追加到 .team/inbox/bob.jsonl:
{"type":"message","from":"Alice","content":"我是Alice,开始编码","timestamp":1718999999.123}
3. 读收件箱方法 read_inbox(核心!)
def read_inbox(self, name: str) -> list:
# 找到对应成员的收件箱文件
inbox_path = self.dir / f"{name}.jsonl"
# 文件不存在,返回空列表
if not inbox_path.exists():
return []
messages = []
# 读取文件所有行,逐行解析为JSON消息
for line in inbox_path.read_text().strip().splitlines():
if line:
messages.append(json.loads(line))
# 读完**清空文件**(消息只读取一次)
inbox_path.write_text("")
# 返回所有消息列表
return messages
实际运行:Bob 读取自己的收件箱
调用代码:BUS.read_inbox("Bob")
执行前:bob.jsonl 有 1 条消息
执行中:解析消息为 JSON 列表
[
{
"type": "message",
"from": "Alice",
"content": "我是Alice,开始编码",
"timestamp": 1718999999.123
}
]
执行后:bob.jsonl 被清空(内容为空)
4. 广播方法 broadcast
def broadcast(self, sender: str, content: str, teammates: list) -> str:
count = 0
# 遍历所有团队成员,给除了自己以外的所有人发消息
for name in teammates:
if name != sender:
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"
解读:群发消息,比如 Lead 给 Alice、Bob 同时发通知。
5. 全局实例
BUS = MessageBus(INBOX_DIR)
解读:创建一个全局唯一的邮局,整个程序的 AI 都用它收发消息。
第二部分:超细解读 TeammateManager 团队管理员
核心作用
管理 AI 团队的成员创建、状态、线程运行,成员信息存在 config.json(花名册),每个成员独立运行一个线程(不会互相干扰)。
1. 初始化方法 __init__
def __init__(self, team_dir: Path):
# 团队文件夹路径:.team
self.dir = team_dir
self.dir.mkdir(exist_ok=True)
# 团队配置文件:.team/config.json
self.config_path = self.dir / "config.json"
# 加载配置(花名册)
self.config = self._load_config()
# 存储成员线程:key=成员名,value=线程对象
self.threads = {}
2. 加载 / 保存配置(花名册)
# 加载花名册
def _load_config(self) -> dict:
if self.config_path.exists():
return json.loads(self.config_path.read_text())
# 默认空团队
return {"team_name": "default", "members": []}
# 保存花名册到文件
def _save_config(self):
self.config_path.write_text(json.dumps(self.config, indent=2))
默认初始 JSON(config.json)
{
"team_name": "default",
"members": []
}
3. 查找成员
def _find_member(self, name: str) -> dict:
# 遍历花名册,找到对应名字的成员
for m in self.config["members"]:
if m["name"] == name:
return m
return None
4. 创建成员 spawn(核心!)
def spawn(self, name: str, role: str, prompt: str) -> str:
# 查找成员是否已存在
member = self._find_member(name)
if member:
# 成员正在工作,不能重复创建
if member["status"] not in ("idle", "shutdown"):
return f"Error: '{name}' is currently {member['status']}"
# 更新状态为工作中
member["status"] = "working"
member["role"] = role
else:
# 新建成员:名字、角色、状态=工作中
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
# 保存到花名册文件
self._save_config()
# 创建独立线程,运行成员的循环
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt),
daemon=True,
)
self.threads[name] = thread
thread.start()
return f"Spawned '{name}' (role: {role})"
实际运行:创建 Alice(编码者)
调用代码:TEAM.spawn("Alice", "编码者", "我是编码者")
JSON 变化(config.json):
{
"team_name": "default",
"members": [
{
"name": "Alice",
"role": "编码者",
"status": "working"
}
]
}
再创建 Bob(测试者)后:
{
"team_name": "default",
"members": [
{ "name": "Alice", "role": "编码者", "status": "working" },
{ "name": "Bob", "role": "测试者", "status": "working" }
]
}
5. 成员独立循环 _teammate_loop(核心!)
def _teammate_loop(self, name: str, role: str, prompt: str):
# 成员的系统提示:我是XX,角色XX
sys_prompt = f"You are '{name}', role: {role}"
# 成员的对话历史
messages = [{"role": "user", "content": prompt}]
tools = self._teammate_tools()
# 最多执行50轮任务
for _ in range(50):
# 1. 读取自己的收件箱(自动清空)
inbox = BUS.read_inbox(name)
# 2. 把消息加入自己的对话历史
for msg in inbox:
messages.append({"role": "user", "content": json.dumps(msg)})
# 3. 调用AI思考
response = client.messages.create(...)
messages.append({"role": "assistant", "content": response.content})
# 4. 执行工具(发消息、读文件等)
if response.stop_reason == "tool_use":
results = []
for block in response.content:
if block.type == "tool_use":
output = self._exec(name, block.name, block.input)
results.append(...)
messages.append({"role": "user", "content": results})
# 5. 任务完成,状态改为空闲
member = self._find_member(name)
member["status"] = "idle"
self._save_config()
核心逻辑
每个成员独立跑一个死循环:
- 自动查收自己的收件箱
- 把消息交给 AI 思考
- 执行工具(比如给其他成员发消息)
- 完成后状态改为
idle(空闲)
JSON 变化:Alice 完成任务后
config.json 中 Alice 状态从 working → idle:
{ "name": "Alice", "role": "编码者", "status": "idle" }
6. 工具执行 _exec + 成员工具列表
# 执行成员的工具
def _exec(self, sender: str, tool_name: str, args: dict) -> str:
if tool_name == "send_message":
# 调用邮局发消息
return BUS.send(sender, args["to"], args["content"])
# 其他工具:bash、读/写文件
...
# 成员能用的所有工具
def _teammate_tools(self) -> list:
return [bash, read_file, write_file, send_message, read_inbox]
解读:成员可以自己发消息、读文件、执行命令,完全独立工作。
7. 查询团队信息
# 列出所有成员
def list_all(self) -> str:
lines = [f"Team: {self.config['team_name']}"]
for m in self.config["members"]:
lines.append(f" {m['name']} ({m['role']}): {m['status']}")
return "\n".join(lines)
# 获取所有成员名字
def member_names(self) -> list:
return [m["name"] for m in self.config["members"]]
8. 全局实例
TEAM = TeammateManager(TEAM_DIR)
解读:创建一个全局唯一的团队管理员,管理所有 AI 成员。
第三部分:两个类联动 + 完整 JSON / 文件变化
场景:创建 Alice、Bob → Alice 给 Bob 发消息
步骤 1:创建 Alice
config.json` 新增成员,状态 `working
步骤 2:创建 Bob
config.json` 新增成员,状态 `working
步骤 3:Lead 给 Alice 发指令
写入 .team/inbox/alice.jsonl
步骤 4:Alice 自动读收件箱,给 Bob 发消息
写入 .team/inbox/bob.jsonl
步骤 5:Alice 完成任务,状态改为 idle
1. 读取自己的收件箱(自动清空)
inbox = BUS.read_inbox(name)
# 2. 把消息加入自己的对话历史
for msg in inbox:
messages.append({"role": "user", "content": json.dumps(msg)})
# 3. 调用AI思考
response = client.messages.create(...)
messages.append({"role": "assistant", "content": response.content})
# 4. 执行工具(发消息、读文件等)
if response.stop_reason == "tool_use":
results = []
for block in response.content:
if block.type == "tool_use":
output = self._exec(name, block.name, block.input)
results.append(...)
messages.append({"role": "user", "content": results})
# 5. 任务完成,状态改为空闲
member = self._find_member(name)
member["status"] = "idle"
self._save_config()
#### 核心逻辑
每个成员**独立跑一个死循环**:
1. 自动查收自己的收件箱
2. 把消息交给 AI 思考
3. 执行工具(比如给其他成员发消息)
4. 完成后状态改为 `idle`(空闲)
#### JSON 变化:Alice 完成任务后
`config.json` 中 Alice 状态从 `working` → `idle`:
{ “name”: “Alice”, “role”: “编码者”, “status”: “idle” }
------
### 6. 工具执行 `_exec` + 成员工具列表
执行成员的工具
def _exec(self, sender: str, tool_name: str, args: dict) -> str:
if tool_name == “send_message”:
# 调用邮局发消息
return BUS.send(sender, args[“to”], args[“content”])
# 其他工具:bash、读/写文件
…
成员能用的所有工具
def _teammate_tools(self) -> list:
return [bash, read_file, write_file, send_message, read_inbox]
**解读**:成员可以自己发消息、读文件、执行命令,完全独立工作。
------
### 7. 查询团队信息
列出所有成员
def list_all(self) -> str:
lines = [f"Team: {self.config[‘team_name’]}“]
for m in self.config[“members”]:
lines.append(f” {m[‘name’]} ({m[‘role’]}): {m[‘status’]}")
return “\n”.join(lines)
获取所有成员名字
def member_names(self) -> list:
return [m[“name”] for m in self.config[“members”]]
------
### 8. 全局实例
TEAM = TeammateManager(TEAM_DIR)
**解读**:创建一个**全局唯一的团队管理员**,管理所有 AI 成员。
------
# 第三部分:两个类联动 + 完整 JSON / 文件变化
## 场景:创建 Alice、Bob → Alice 给 Bob 发消息
### 步骤 1:创建 Alice
config.json新增成员,状态working
### 步骤 2:创建 Bob
config.json新增成员,状态working
### 步骤 3:Lead 给 Alice 发指令
写入 `.team/inbox/alice.jsonl`
### 步骤 4:Alice 自动读收件箱,给 Bob 发消息
写入 `.team/inbox/bob.jsonl`
### 步骤 5:Alice 完成任务,状态改为 `idle`
`config.json` 更新状态
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)