learn-claude-code笔记
源码路径:https://github.com/shareAI-lab/learn-claude-code#
s01The Agent Loop
+--------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tool |
| prompt | | | | execute |
+--------+ +---+---+ +----+----+
^ |
| tool_result |
+----------------+
(loop until stop_reason != "tool_use")
一个工具 + 一
个循环 = 一个智能体。
ReAct(Reasoning范式):让AI边思考边行动的交互模式。
用户提示词-llm-工具执行(得到工具结果循环llm-工具执行直到停止)
思考-行动-观察-再思考-…–任务完成给出答案
一个退出条件控制整个流程。循环持续运行, 直到模型不再调用工具。
原理:
- 用户 prompt 作为第一条消息。
- 将消息和工具定义一起发给 LLM。
- 追加助手响应。检查
stop_reason– 如果模型没有调用工具, 结束。 - 执行每个工具调用, 收集结果, 作为 user 消息追加。回到第 2 步。
def agent_loop(query):
# 1.用户 prompt 作为第一条消息。
messages = [{"role": "user", "content": query}]
while True:
# 2.将消息和工具定义一起发给 LLM。z
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
# 3. 追加助手响应。检查 `stop_reason` -- 如果模型没有调用工具, 结束。
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
# 4.执行每个工具调用, 收集结果, 作为 user 消息追加。回到第 2 步。
results = []
for block in response.content:
if block.type == "tool_use":
output = run_bash(block.input["command"])
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})
s02 Tool Use
+--------+ +-------+ +------------------+
| User | ---> | LLM | ---> | Tool Dispatch |
| prompt | | | | { |
+--------+ +---+---+ | bash: run_bash |
^ | read: run_read |
| | write: run_wr |
+-----------+ edit: run_edit |
tool_result | } |
+------------------+
The dispatch map is a dict: {tool_name: handler_function}.
One lookup replaces any if/elif chain.
加一个工具, 只加一个 handler"* – 循环不用动, 新工具注册进 dispatch map 就行。
工具分发 – 扩展模型能触达的边界。
原理
- 每个工具有一个处理函数。路径沙箱防止逃逸工作区。
- dispatch map 将工具名映射到处理函数。
- 循环中按名称查找处理函数。循环体本身与 s01 完全一致。
工具1.:
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path
def run_read(path: str, limit: int = None) -> str:
text = safe_path(path).read_text()
lines = text.splitlines()
if limit and limit < len(lines):
lines = lines[:limit]
return "\n".join(lines)[:50000]
dispatch map
TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"],
kw["new_text"]),
}
循环中按名称查找处理函数。循环体本身与 s01 完全一致
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
output = handler(**block.input) if handler \
else f"Unknown tool: {block.name}"
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
s03 TodoWrite
规划 – 让模型不偏航, 但不替它画航线。
“同时只能有一个 in_progress” 强制顺序聚焦。nag reminder 制造问责压力 – 你不更新计划, 系统就追着你问。
+--------+ +-------+ +---------+
| User | ---> | LLM | ---> | Tools |
| prompt | | | | + todo |
+--------+ +---+---+ +----+----+
^ |
| tool_result |
+----------------+
|
+-----------+-----------+
| TodoManager state |
| [ ] task A |
| [>] task B <- doing |
| [x] task C |
+-----------------------+
|
if rounds_since_todo >= 3:
inject <reminder> into tool_result
原理
- TodoManager 存储带状态的项目。同一时间只允许一个
in_progress。 todo工具和其他工具一样加入 dispatch map。- nag reminder: 模型连续 3 轮以上不调用
todo时注入提醒。
TodoManager 存储带状态的项目。同一时间只允许一个 in_progress。
# 定义待办事项管理类
class TodoManager:
# 更新待办列表的方法,接收列表参数,返回字符串
def update(self, items: list) -> str:
# 初始化:校验后的任务列表、进行中任务的数量
validated, in_progress_count = [], 0
# 遍历所有待办任务
for item in items:
# 获取任务状态,没有则默认为 pending
status = item.get("status", "pending")
# 如果任务是进行中,计数 +1
if status == "in_progress":
in_progress_count += 1
# 把合法的任务信息存入 validated
validated.append({
"id": item["id"],
"text": item["text"],
"status": status
})
# 检查:进行中的任务不能超过 1 个,否则报错
if in_progress_count > 1:
raise ValueError("Only one task can be in_progress")
# 将校验后的任务保存到实例属性
self.items = validated
# 调用渲染方法并返回结果
return self.render()
todo 工具和其他工具一样加入 dispatch map。
TOOL_HANDLERS = {
# ...base tools...
"todo": lambda **kw: TODO.update(kw["items"]),
}
nag reminder: 模型连续 3 轮以上不调用 todo 时注入提醒。
# 判断条件:距离上次待办事项更新已满3轮对话 且 消息列表不为空
if rounds_since_todo >= 3 and messages:
# 取出消息列表中的最后一条消息(最新的一条消息)
last = messages[-1]
# 判断:最后一条消息是用户发送的,且消息内容是列表格式
if last["role"] == "user" and isinstance(last.get("content"), list):
# 在用户消息内容的最前面插入一条提醒文本
last["content"].insert(0, {
"type": "text", # 消息类型:文本
"text": "<reminder>Update your todos.</reminder>", # 提醒内容:更新待办事项
})
s04 Subagents (子智能体)
Parent agent Subagent
+------------------+ +------------------+
| messages=[...] | | messages=[] | <-- fresh
| | dispatch | |
| tool: task | ----------> | while tool_use: |
| prompt="..." | | call tools |
| | summary | append results |
| result = "..." | <---------- | return last text |
+------------------+ +------------------+
Parent context stays clean. Subagent context is discarded.
“大任务拆小, 每个小
任务干净的上下文”* – 子智能体用独立 messages[], 不污染主对话。
上下文隔离 – 守护模型的思维清晰度。
子智能体可能跑了 30+ 次工具调用, 但整个消息历史直接丢弃。父智能体收到的只是一段摘要文本, 作为普通 tool_result 返回。
工作原理
- 父智能体有一个
task工具。子智能体拥有除task外的所有基础工具 (禁止递归生成)。 - 子智能体以
messages=[]启动, 运行自己的循环。只有最终文本返回给父智能体。
父智能体有一个 task 工具。子智能体拥有除 task 外的所有基础工具 (禁止递归生成)。
# 定义父级智能体可用的工具列表
# 父级工具 = 子级工具 + 新增的 task 工具
PARENT_TOOLS = CHILD_TOOLS + [
# 新增的 task 工具:用于创建子智能体,分配独立任务
{
# 工具名称:task
"name": "task",
# 工具描述:创建一个拥有全新上下文环境的子智能体
"description": "Spawn a subagent with fresh context.",
# 工具输入参数规范
"input_schema": {
# 输入类型:对象(固定格式)
"type": "object",
# 对象属性定义:只需要一个 prompt 参数
"properties": {
"prompt": {
"type": "string" # prompt 参数类型:字符串
}
},
# 必填参数:必须传入 prompt
"required": ["prompt"]
}
},
]
子智能体以 messages=[] 启动, 运行自己的循环。只有最终文本返回给父智能体。
# 定义子智能体执行函数,接收任务提示,返回执行结果字符串
def run_subagent(prompt: str) -> str:
# 初始化子智能体的消息列表:第一条是用户传入的任务提示
sub_messages = [{"role": "user", "content": prompt}]
# 循环执行对话,最多30轮(安全限制,防止无限循环)
for _ in range(30):
# 调用大模型接口生成响应
response = client.messages.create(
model=MODEL, # 使用的模型名称
system=SUBAGENT_SYSTEM, # 子智能体的系统提示词
messages=sub_messages, # 对话历史消息
tools=CHILD_TOOLS, # 子智能体可用的工具列表
max_tokens=8000, # 最大生成token数
)
# 将模型的响应添加到对话历史(角色:助手)
sub_messages.append({
"role": "assistant",
"content": response.content
})
# 如果停止原因不是工具调用,说明任务完成,退出循环
if response.stop_reason != "tool_use":
break
# 处理模型调用的工具,收集执行结果
results = []
# 遍历响应中的每个内容块
for block in response.content:
# 判断是否是工具调用类型
if block.type == "tool_use":
# 根据工具名称获取对应的处理函数
handler = TOOL_HANDLERS.get(block.name)
# 执行工具处理函数,传入参数
output = handler(**block.input)
# 构造工具执行结果(限制长度防止溢出)
results.append({
"type": "tool_result", # 类型:工具结果
"tool_use_id": block.id, # 关联的工具调用ID
"content": str(output)[:50000] # 结果内容,截取前50000字符
})
# 将工具执行结果作为用户消息,添加到对话历史
sub_messages.append({"role": "user", "content": results})
# 从最终响应中提取所有文本内容,拼接成字符串返回
# 如果没有文本,返回默认提示 "(no summary)"
return "".join(
b.text for b in response.content if hasattr(b, "text")
) or "(no summary)"
s05 Skills (技能加载)
System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent. |
| Skills available: |
| - git: Git workflow helpers | ~100 tokens/skill
| - test: Testing best practices |
+--------------------------------------+
When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand): |
| <skill name="git"> |
| Full git workflow instructions... | ~2000 tokens
| Step 1: ... |
| </skill> |
+--------------------------------------+
“用到什么知识, 临时加载什么知识” – 通过 tool_result 注入, 不塞 system prompt。
按需知识 – 模型开口要时才给的领域专长。
原理
- 每个技能是一个目录, 包含
SKILL.md文件和 YAML frontmatter。 - SkillLoader 递归扫描
SKILL.md文件, 用目录名作为技能标识。 - 第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。
每个技能是一个目录, 包含 SKILL.md 文件和 YAML frontmatter。
skills/
pdf/
SKILL.md # ---\n name: pdf\n description: Process PDF files\n ---\n ...
code-review/
SKILL.md # ---\n name: code-review\n description: Review code\n ---\n ...
SkillLoader 递归扫描 SKILL.md 文件, 用目录名作为技能标识。
# 技能加载器类:负责从文件系统加载、解析、管理技能文档
class SkillLoader:
# 初始化方法:传入技能文件夹路径,自动加载所有技能
def __init__(self, skills_dir: Path):
# 存储所有技能的字典:key=技能名,value=技能元数据+内容
self.skills = {}
# 递归遍历指定目录下 所有名为 SKILL.md 的文件(排序后遍历)
for f in sorted(skills_dir.rglob("SKILL.md")):
# 读取.md文件的全部文本内容
text = f.read_text()
# 解析文件:拆分为 前端元数据(meta) 和 技能正文(body)
meta, body = self._parse_frontmatter(text)
# 取技能名:优先用meta里的name,没有就用文件夹名
name = meta.get("name", f.parent.name)
# 把解析好的技能存入字典
self.skills[name] = {"meta": meta, "body": body}
# 获取所有技能的描述文本(用于展示/提示词)
def get_descriptions(self) -> str:
# 存储每行技能描述
lines = []
# 遍历所有已加载的技能
for name, skill in self.skills.items():
# 取出描述,没有则为空
desc = skill["meta"].get("description", "")
# 拼接成固定格式:- 技能名: 描述
lines.append(f" - {name}: {desc}")
# 用换行符连接所有行,返回完整字符串
return "\n".join(lines)
# 根据技能名,获取完整的技能内容(带标签包裹)
def get_content(self, name: str) -> str:
# 从技能字典中查找指定技能
skill = self.skills.get(name)
# 技能不存在:返回错误信息
if not skill:
return f"Error: Unknown skill '{name}'."
# 技能存在:返回带 <skill> 标签包裹的完整内容
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。
# 定义系统提示词(给AI智能体的身份与能力说明)
# 身份:工作目录下的编码智能体 + 列出所有可用技能描述
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""
# 工具处理器字典:key=工具名,value=处理函数
TOOL_HANDLERS = {
# 基础工具(省略)
# ...base tools...
# 工具:load_skill → 调用技能加载器,根据技能名称获取完整技能内容
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}
s06: Context Compact (上下文压缩)
“上下文总会满, 要有办法腾地方” – 三层压缩策略, 换来无限会话。
压缩 – 干净的记忆, 无限的会话。
Every turn:
+------------------+
| Tool call result |
+------------------+
|
v
[Layer 1: micro_compact] (silent, every turn)
Replace tool_result > 3 turns old
with "[Previous: used {tool_name}]"
|
v
[Check: tokens > 50000?]
| |
no yes
| |
v v
continue [Layer 2: auto_compact]
Save transcript to .transcripts/
LLM summarizes conversation.
Replace all messages with [summary].
|
v
[Layer 3: compact tool]
Model calls compact explicitly.
Same summarization as auto_compact.
原理
- 第一层 – micro_compact: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。
- 第二层 – auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。
- 第三层 – manual compact:
compact工具按需触发同样的摘要机制。 - 循环整合三层:
第一层 – micro_compact*: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。
# 定义消息精简函数:对历史消息中的工具结果进行压缩,保留最近的,减少上下文长度
def micro_compact(messages: list) -> list:
# 存储所有找到的【工具结果】消息条目,格式:(消息索引, 内容块索引, 内容块)
tool_results = []
# 遍历每一条历史消息,查找工具执行结果
for i, msg in enumerate(messages):
# 筛选:用户消息 + 内容是列表格式(符合工具结果格式)
if msg["role"] == "user" and isinstance(msg.get("content"), list):
# 遍历这条消息里的每一个内容块
for j, part in enumerate(msg["content"]):
# 判断:内容块是字典,且类型为 tool_result(工具返回结果)
if isinstance(part, dict) and part.get("type") == "tool_result":
# 把这条工具结果记录下来
tool_results.append((i, j, part))
# 如果工具结果总数 ≤ 需要保留的最近数量,直接返回原消息,不做压缩
if len(tool_results) <= KEEP_RECENT:
return messages
# 对【除了最近 N 条之外】的旧工具结果进行精简
# 只处理:前面所有旧的工具结果(去掉最后 KEEP_RECENT 条)
for _, _, part in tool_results[:-KEEP_RECENT]:
# 如果这个工具结果的内容长度 > 100,说明内容很长,需要精简
if len(part.get("content", "")) > 100:
# 把长内容替换成简短提示(原代码里 tool_name 需自行定义)
part["content"] = f"[Previous: used {tool_name}]"
# 返回精简后的消息列表
return messages
第二层 – auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。
# 定义自动压缩函数:用于精简过长的对话历史,保留上下文核心信息
def auto_compact(messages: list) -> list:
# 1. 先保存原始对话记录,用于后续恢复/调试
# 生成保存路径:用时间戳命名,确保文件名唯一
transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
# 以写入模式打开文件
with open(transcript_path, "w") as f:
# 遍历每一条消息
for msg in messages:
# 将消息转为JSON字符串并逐行写入文件
f.write(json.dumps(msg, default=str) + "\n")
# 2. 调用大模型对对话历史进行总结压缩
response = client.messages.create(
model=MODEL, # 使用指定的AI模型
messages=[{ # 给模型发送指令:总结对话
"role": "user",
"content":
# 指令:总结对话以保持连贯性
"Summarize this conversation for continuity..."
# 拼接原始对话(限制长度防止超出模型限制)
+ json.dumps(messages, default=str)[:80000]
}],
max_tokens=2000, # 限制总结内容的最大长度
)
# 3. 返回精简后的新对话历史(只保留总结,替代原长对话)
return [
# 第一条:用户消息,包含压缩后的总结
{"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
# 第二条:助手回复,表示已理解并继续
{"role": "assistant", "content": "Understood. Continuing."},
]
循环整合三层:
# 定义智能体主循环:负责持续对话、自动压缩上下文、执行工具
def agent_loop(messages: list):
# 无限循环,保持对话持续进行
while True:
# 【第一层压缩】轻量级精简:只压缩过长的工具结果,保留最近记录
micro_compact(messages)
# 估算当前对话总token数,如果超过阈值,则执行深度压缩
if estimate_tokens(messages) > THRESHOLD:
# 【第二层压缩】重量级AI总结压缩:用大模型把长对话浓缩成简短总结
# messages[:] = 表示直接替换原列表内容,而不是创建新列表
messages[:] = auto_compact(messages)
# 调用大模型接口,生成回复(具体参数省略)
response = client.messages.create(...)
# ... 中间执行工具调用逻辑(省略) ...
# 如果触发了手动压缩标志,则立即执行深度压缩
if manual_compact:
# 【第三层压缩】手动触发的AI总结压缩
messages[:] = auto_compact(messages)
s07: Task System (任务系统)
“大目标要拆成小任务, 排好序, 记在磁盘上” – 文件持久化的任务图, 为多 agent 协作打基础。
持久化任务 – 比任何一次对话都长命的目标。
真实目标是有结构的 – 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。
没有显式的关系, 智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。
把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。任务图随时回答三个问题:
- 什么可以做? – 状态为
pending且blockedBy为空的任务。 - 什么被卡住? – 等待前置任务完成的任务。
- 什么做完了? – 状态为
completed的任务, 完成时自动解锁后续任务。
.tasks/
task_1.json {"id":1, "status":"completed"}
task_2.json {"id":2, "blockedBy":[1], "status":"pending"}
task_3.json {"id":3, "blockedBy":[1], "status":"pending"}
task_4.json {"id":4, "blockedBy":[2,3], "status":"pending"}
任务图 (DAG):
+----------+
+--> | task 2 | --+
| | pending | |
+----------+ +----------+ +--> +----------+
| task 1 | | task 4 |
| completed| --> +----------+ +--> | blocked |
+----------+ | task 3 | --+ +----------+
| pending |
+----------+
顺序: task 1 必须先完成, 才能开始 2 和 3
并行: task 2 和 3 可以同时执行
依赖: task 4 要等 2 和 3 都完成
状态: pending -> in_progress -> completed
原理
- TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
- 依赖解除: 完成任务时, 自动将其 ID 从其他任务的
blockedBy中移除, 解锁后续任务。 - 状态变更 + 依赖关联:
update处理状态转换和依赖边。 - 四个任务工具加入 dispatch map。
- TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
# 任务管理类:负责创建、存储、管理任务数据
class TaskManager:
# 初始化方法:设置任务存储目录并初始化任务ID
# tasks_dir:任务文件的存储路径(Path类型)
def __init__(self, tasks_dir: Path):
# 保存任务存储目录到实例变量
self.dir = tasks_dir
# 创建任务目录(如果不存在),exist_ok=True 表示已存在则不报错
self.dir.mkdir(exist_ok=True)
# 初始化下一个任务的ID:获取当前最大ID并+1
self._next_id = self._max_id() + 1
# 创建新任务的方法
# subject:任务主题(必填)
# description:任务描述(可选,默认为空)
def create(self, subject, description=""):
# 构造任务字典,包含完整的任务字段
task = {
"id": self._next_id, # 任务唯一ID
"subject": subject, # 任务主题/标题
"status": "pending", # 任务状态:默认待处理
"blockedBy": [], # 依赖的任务ID列表(被哪些任务阻塞)
"blocks": [], # 阻塞的任务ID列表(阻塞哪些任务)
"owner": "" # 任务负责人,默认为空
}
# 保存任务到文件系统
self._save(task)
# 创建完成后,下一个任务ID自增
self._next_id += 1
# 将任务转为格式化的JSON字符串并返回
return json.dumps(task, indent=2)
- 依赖解除: 完成任务时, 自动将其 ID 从其他任务的
blockedBy中移除, 解锁后续任务。
# 清理任务依赖的私有方法
# 当某个任务完成时,将其从其他任务的"依赖列表"中移除
# completed_id:已完成的任务ID
def _clear_dependency(self, completed_id):
# 遍历任务目录下所有以 task_ 开头的json文件(所有任务文件)
for f in self.dir.glob("task_*.json"):
# 读取文件内容并解析为任务字典
task = json.loads(f.read_text())
# 检查:已完成的任务ID 是否存在于 当前任务的依赖列表中
if completed_id in task.get("blockedBy", []):
# 从当前任务的依赖列表中,移除已完成的任务ID
task["blockedBy"].remove(completed_id)
# 保存修改后的任务到文件
self._save(task)
- 状态变更 + 依赖关联:
update处理状态转换和依赖边。
# 更新任务信息的方法:支持修改状态、添加依赖/阻塞关系
# task_id: 要更新的任务ID
# status: 可选,要设置的任务状态
# add_blocked_by: 可选,要添加的【依赖任务ID】
# add_blocks: 可选,要添加的【阻塞任务ID】
def update(self, task_id, status=None,
add_blocked_by=None, add_blocks=None):
# 1. 根据任务ID加载对应的任务数据
task = self._load(task_id)
# 2. 如果传入了新状态,就更新任务状态
if status:
task["status"] = status
# 3. 如果状态被改为【已完成】,自动清理该任务的依赖关系
if status == "completed":
self._clear_dependency(task_id)
# 4. 保存修改后的任务到文件
self._save(task)
- 四个任务工具加入 dispatch map。
TOOL_HANDLERS = {
# ...base tools...
"task_create": lambda **kw: TASKS.create(kw["subject"]),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
"task_list": lambda **kw: TASKS.list_all(),
"task_get": lambda **kw: TASKS.get(kw["task_id"]),
}
s08: Background Tasks (后台任务)
“慢操作丢后台, agent 继续想下一步”* – 后台线程跑命令, 完成后注入通知。
后台执行 – 模型继续思考, harness 负责等待。
有些命令要跑好几分钟: npm install、pytest、docker build。阻塞式循环下模型只能干等。用户说 “装依赖, 顺便建个配置文件”, 智能体却只能一个一个来。
Main thread Background thread
+-----------------+ +-----------------+
| agent loop | | subprocess runs |
| ... | | ... |
| [LLM call] <---+------- | enqueue(result) |
| ^drain queue | +-----------------+
+-----------------+
Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
| |
v v
[A runs] [B runs] (parallel)
| |
+-- results injected before next LLM call --+
原理:
- BackgroundManager 用线程安全的通知队列追踪任务。
run()启动守护线程, 立即返回。- 子进程完成后, 结果进入通知队列。
- 每次 LLM 调用前排空通知队列。
- BackgroundManager 用线程安全的通知队列追踪任务。
# 后台任务管理类:用于管理多线程后台任务、通知队列和线程安全操作
class BackgroundManager:
# 初始化方法:创建后台管理器实例
def __init__(self):
# 存储后台任务的字典(key=任务ID,value=任务对象/信息)
self.tasks = {}
# 通知队列:用于存储后台任务产生的通知/消息(先进先出)
self._notification_queue = []
# 线程锁:保证多线程环境下操作数据安全,防止并发冲突
self._lock = threading.Lock()
run()启动守护线程, 立即返回。
# 启动后台任务的方法:接收命令字符串,异步执行并返回任务ID
# command: 要在后台执行的命令
# 返回: 任务启动提示信息
def run(self, command: str) -> str:
# 生成唯一的短任务ID(取UUID前8位,保证不重复)
task_id = str(uuid.uuid4())[:8]
# 将任务存入任务字典,标记状态为 running,记录执行命令
self.tasks[task_id] = {"status": "running", "command": command}
# 创建守护线程:目标执行方法 _execute,传入任务ID和命令
thread = threading.Thread(
target=self._execute, # 线程要运行的函数
args=(task_id, command), # 传给函数的参数
daemon=True # 设为守护线程,主程序退出时自动结束
)
# 启动线程,后台异步执行命令
thread.start()
# 返回任务启动成功的提示信息
return f"Background task {task_id} started"
- 子进程完成后, 结果进入通知队列。
# 【私有方法】在后台线程中执行具体的系统命令
# task_id: 后台任务唯一ID
# command: 要执行的shell命令
def _execute(self, task_id, command):
try:
# 执行系统命令(子进程)
# shell=True: 以shell环境运行命令
# cwd=WORKDIR: 在指定工作目录执行
# capture_output=True: 捕获标准输出+标准错误
# text=True: 输出以字符串形式返回
# timeout=300: 最大执行时间300秒(5分钟)
r = subprocess.run(command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=300)
# 拼接标准输出 + 错误输出,去首尾空格,限制最大长度50000字符
output = (r.stdout + r.stderr).strip()[:50000]
# 如果命令执行超时,捕获异常并返回超时提示
except subprocess.TimeoutExpired:
output = "Error: Timeout (300s)"
# 加线程锁:保证多线程下操作队列安全
with self._lock:
# 将执行结果(任务ID + 截取后的输出)加入通知队列
self._notification_queue.append({
"task_id": task_id, # 关联任务ID
"result": output[:500] # 结果只保留前500字符,避免过长
})
- 每次 LLM 调用前排空通知队列。
# 智能体主循环:持续处理对话 + 后台任务通知
def agent_loop(messages: list):
# 无限循环,保持智能体持续运行
while True:
# 1. 获取并清空后台任务管理器中的所有通知(后台执行结果)
notifs = BG.drain_notifications()
# 2. 如果有后台任务返回结果
if notifs:
# 把所有后台通知拼接成文本,格式:[bg:任务ID] 执行结果
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['result']}" for n in notifs
)
# 3. 将后台结果包装成 <background-results> 标签,加入对话历史(用户角色)
messages.append({
"role": "user",
"content": f"<background-results>\n{notif_text}\n</background-results>"
})
# 4. 让智能体回复“已记录后台结果”,保持对话逻辑连贯
messages.append({
"role": "assistant",
"content": "Noted background results."
})
# 5. 调用大模型生成下一轮回复(具体参数省略)
response = client.messages.create(...)
s09: Agent Teams (智能体团队)
“任务太大一个人干不完, 要能分给队友” – 持久化队友 + JSONL 邮箱。
团队邮箱 – 多个模型, 通过文件协调。
真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久智能体, (2) 身份和生命周期管理, (3) 智能体之间的通信通道。
Teammate lifecycle:
spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN
Communication:
.team/
config.json <- team roster + statuses
inbox/
alice.jsonl <- append-only, drain-on-read
bob.jsonl
lead.jsonl
+--------+ send("alice","bob","...") +--------+
| alice | -----------------------------> | bob |
| loop | bob.jsonl << {json_line} | loop |
+--------+ +--------+
^ |
| BUS.read_inbox("alice") |
+---- alice.jsonl -> read + drain ---------+
原理:
- TeammateManager 通过 config.json 维护团队名册。
spawn()创建队友并在线程中启动 agent loop。- MessageBus: append-only 的 JSONL 收件箱。
send()追加一行;read_inbox()读取全部并清空。 - 每个队友在每次 LLM 调用前检查收件箱, 将消息注入上下文。
- TeammateManager 通过 config.json 维护团队名册。
# 队友管理类:用于管理团队成员配置、对话线程和文件存储
class TeammateManager:
# 初始化方法:设置团队目录、加载配置、初始化线程字典
# team_dir: 团队数据的存储路径(Path类型)
def __init__(self, team_dir: Path):
# 保存团队目录路径到实例变量
self.dir = team_dir
# 创建团队目录(如果不存在),exist_ok=True 表示已存在则不报错
self.dir.mkdir(exist_ok=True)
# 定义团队配置文件路径(目录下的 config.json)
self.config_path = self.dir / "config.json"
# 加载配置文件内容到 self.config
self.config = self._load_config()
# 存储队友对话线程的字典(key=队友ID/名称,value=对话消息列表)
self.threads = {}
spawn()创建队友并在线程中启动 agent loop。
# 创建并启动一个新的团队成员(子智能体)
# name: 成员名称
# role: 成员角色/职责
# prompt: 给该成员的初始指令
def spawn(self, name: str, role: str, prompt: str) -> str:
# 构建新成员的信息字典:包含名称、角色、工作状态
member = {
"name": name, # 成员名称
"role": role, # 成员角色(如:开发者、测试者)
"status": "working" # 成员状态:工作中
}
# 将新成员添加到配置的成员列表中
self.config["members"].append(member)
# 保存更新后的团队配置到文件
self._save_config()
# 创建守护线程,运行队友的主循环逻辑
# target: 线程执行的目标函数(成员的主循环)
# args: 传给目标函数的参数(名称、角色、初始指令)
# daemon=True: 守护线程,主程序退出时自动终止
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt),
daemon=True
)
# 启动线程,让成员在后台独立运行
thread.start()
# 返回成员创建成功的提示信息
return f"Spawned teammate '{name}' (role: {role})"
- MessageBus: append-only 的 JSONL 收件箱。
send()追加一行;read_inbox()读取全部并清空。
# 消息总线类:负责团队成员之间的消息发送、存储和读取(基于文件实现简易消息队列)
class MessageBus:
# 发送消息给指定成员
# sender: 发送者名称
# to: 接收者名称
# content: 消息内容
# msg_type: 消息类型,默认为普通消息
# extra: 额外附加数据(可选)
def send(self, sender, to, content, msg_type="message", extra=None):
# 构建标准消息结构体
msg = {
"type": msg_type, # 消息类型
"from": sender, # 发送者
"content": content, # 消息内容
"timestamp": time.time() # 时间戳(记录发送时间)
}
# 如果有额外参数,合并到消息中
if extra:
msg.update(extra)
# 以追加模式打开接收者的消息文件(文件名:接收者名.jsonl)
with open(self.dir / f"{to}.jsonl", "a") as f:
# 将消息转为JSON字符串并写入文件(一行一条)
f.write(json.dumps(msg) + "\n")
# 读取并清空指定成员的收件箱(拉取所有消息)
# name: 要读取收件箱的成员名称
def read_inbox(self, name):
# 构造成员收件箱文件路径
path = self.dir / f"{name}.jsonl"
# 如果文件不存在,返回空列表的JSON字符串
if not path.exists():
return "[]"
# 读取所有行,逐行解析为JSON消息列表
msgs = [json.loads(line) for line in path.read_text().strip().splitlines() if line]
# 清空收件箱文件(drain:取出后即清空,避免重复处理)
path.write_text("")
# 将消息格式化为格式化JSON字符串返回
return json.dumps(msgs, indent=2)
- 每个队友在每次 LLM 调用前检查收件箱, 将消息注入上下文。
# 【私有方法】团队成员(子智能体)的独立运行主循环
# name: 成员名称 role: 成员角色 prompt: 初始任务指令
def _teammate_loop(self, name, role, prompt):
# 初始化成员的对话上下文,第一条消息是初始任务指令
messages = [{"role": "user", "content": prompt}]
# 最多执行50轮循环(防止无限运行,安全限制)
for _ in range(50):
# 1. 读取当前成员的收件箱消息(读完自动清空)
inbox = BUS.read_inbox(name)
# 2. 如果收件箱有新消息,将其加入对话上下文
if inbox != "[]":
# 把收件箱消息包装成标签格式,作为用户消息传入
messages.append({"role": "user",
"content": f"<inbox>{inbox}</inbox>"})
# 助手回复:已记录收件箱消息,保持上下文连贯
messages.append({"role": "assistant",
"content": "Noted inbox messages."})
# 3. 调用AI大模型生成响应(执行任务/调用工具)
response = client.messages.create(...)
# 4. 如果模型停止原因不是「工具调用」,说明任务完成,退出循环
if response.stop_reason != "tool_use":
break
# (此处省略)执行工具调用,并将结果追加到messages中
# 5. 循环结束后,将成员状态更新为「空闲 idle」
self._find_member(name)["status"] = "idle"
s10: Team Protocols (团队协议)
“队友之间要有统一的沟通规矩” – 一个 request-response 模式驱动所有协商。
协议 – 模型之间的结构化握手。
Shutdown Protocol Plan Approval Protocol
================== ======================
Lead Teammate Teammate Lead
| | | |
|--shutdown_req-->| |--plan_req------>|
| {req_id:"abc"} | | {req_id:"xyz"} |
| | | |
|<--shutdown_resp-| |<--plan_resp-----|
| {req_id:"abc", | | {req_id:"xyz", |
| approve:true} | | approve:true} |
Shared FSM:
[pending] --approve--> [approved]
[pending] --reject---> [rejected]
Trackers:
shutdown_requests = {req_id: {target, status}}
plan_requests = {req_id: {from, plan, status}}
原理
- 领导生成 request_id, 通过收件箱发起关机请求。
- 队友收到请求后, 用 approve/reject 响应。
- 计划审批遵循完全相同的模式。队友提交计划 (生成 request_id), 领导审查 (引用同一个 request_id)。
- 领导生成 request_id, 通过收件箱发起关机请求。
# 全局字典:存储所有关机请求,key=请求ID,value=请求详情
shutdown_requests = {}
# 处理向团队成员发送关机请求的函数
# teammate: 要关闭的队友/子智能体名称
def handle_shutdown_request(teammate: str) -> str:
# 生成唯一的请求ID(取UUID前8位,确保不重复)
req_id = str(uuid.uuid4())[:8]
# 将关机请求存入字典,标记目标成员和状态为待处理
shutdown_requests[req_id] = {
"target": teammate, # 目标关闭对象
"status": "pending" # 请求状态:待处理
}
# 通过消息总线发送关机指令
# 发送者:lead(管理者)
# 接收者:指定队友
# 内容:请优雅地关闭
# 消息类型:shutdown_request(关机请求)
# 附加参数:携带请求ID,用于追踪
BUS.send(
"lead", # 发送者
teammate, # 接收者
"Please shut down gracefully.", # 消息内容
"shutdown_request", # 消息类型
{"request_id": req_id} # 附加信息:请求ID
)
# 返回请求发送成功的提示信息
return f"Shutdown request {req_id} sent (status: pending)"
- 队友收到请求后, 用 approve/reject 响应。
# 处理【关机响应】工具调用
# 当子智能体收到关机请求后,通过该工具回复同意/拒绝
if tool_name == "shutdown_response":
# 从工具参数中获取:关机请求ID、是否同意关闭
req_id = args["request_id"]
approve = args["approve"]
# 更新全局请求状态:同意 → approved,拒绝 → rejected
shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
# 通过消息总线,把响应结果发送给领导者(lead)
# sender:发送响应的队友名称
# 接收者:lead(管理者)
# 消息内容:拒绝/同意的原因
# 消息类型:shutdown_response
# 附加信息:携带请求ID + 同意状态
BUS.send(
sender, # 发送者:当前队友
"lead", # 接收者:主智能体/管理者
args.get("reason", ""),# 响应原因(可选)
"shutdown_response", # 消息类型:关机响应
{
"request_id": req_id,
"approve": approve
}
)
- 计划审批遵循完全相同的模式。队友提交计划 (生成 request_id), 领导审查 (引用同一个 request_id)。
# 全局字典:存储所有【计划审核请求】,key=请求ID,value=请求详情
plan_requests = {}
# 处理计划审核结果的函数(由领导者/管理员调用)
# request_id: 待审核的计划请求ID
# approve: 是否通过审核(布尔值)
# feedback: 审核反馈/意见(可选)
def handle_plan_review(request_id, approve, feedback=""):
# 1. 根据请求ID,从字典中取出对应的计划请求
req = plan_requests[request_id]
# 2. 更新请求状态:通过/拒绝
req["status"] = "approved" if approve else "rejected"
# 3. 通过消息总线,把审核结果发送给【提交计划的成员】
BUS.send(
"lead", # 发送者:领导者/管理员
req["from"], # 接收者:提交该计划的成员
feedback, # 发送审核反馈内容
"plan_approval_response", # 消息类型:计划审核响应
{
"request_id": request_id, # 携带请求ID,用于关联
"approve": approve # 携带审核结果(通过/拒绝)
}
)
一个 FSM, 两种用途。同样的 pending -> approved | rejected 状态机可以套用到任何请求-响应协议上。
s11: Autonomous Agents (自治智能体)
“队友自己看看板, 有活就认领” – 不需要领导逐个分配, 自组织。
自治 – 模型自己找活干, 无需指派。
Teammate lifecycle with idle cycle:
+-------+
| spawn |
+---+---+
|
v
+-------+ tool_use +-------+
| WORK | <------------- | LLM |
+---+---+ +-------+
|
| stop_reason != tool_use (or idle tool called)
v
+--------+
| IDLE | poll every 5s for up to 60s
+---+----+
|
+---> check inbox --> message? ----------> WORK
|
+---> scan .tasks/ --> unclaimed? -------> claim -> WORK
|
+---> 60s timeout ----------------------> SHUTDOWN
Identity re-injection after compression:
if len(messages) <= 3:
messages.insert(0, identity_block)
原理
- 队友循环分两个阶段: WORK 和 IDLE。LLM 停止调用工具 (或调用了
idle) 时, 进入 IDLE。 - 空闲阶段循环轮询收件箱和任务看板。
- 任务看板扫描: 找 pending 状态、无 owner、未被阻塞的任务。
- 身份重注入: 上下文过短 (说明发生了压缩) 时, 在开头插入身份块。
- 队友循环分两个阶段: WORK 和 IDLE。LLM 停止调用工具 (或调用了
idle) 时, 进入 IDLE。
# 【私有方法】智能体/团队成员的完整生命周期循环
# 包含:工作阶段 → 空闲等待 → 恢复/关闭 全流程
# name: 成员名称 role: 成员角色 prompt: 初始工作指令
def _loop(self, name, role, prompt):
# 无限循环:维持成员持续运行
while True:
# ======================
# 【一、工作阶段】执行任务
# ======================
# 初始化对话上下文,使用初始指令
messages = [{"role": "user", "content": prompt}]
# 最多执行50轮工具调用/任务处理(安全限制)
for _ in range(50):
# 调用AI模型生成响应
response = client.messages.create(...)
# 如果模型停止原因不是工具调用 → 任务完成,退出工作循环
if response.stop_reason != "tool_use":
break
# (省略)执行具体工具逻辑
# 如果收到空闲请求 → 主动中断工作
if idle_requested:
break
# ======================
# 【二、空闲阶段】等待指令
# ======================
# 将成员状态设置为:空闲(idle)
self._set_status(name, "idle")
# 进入空闲轮询:等待恢复指令或关机指令
# 返回 True=继续工作 False=关闭程序
resume = self._idle_poll(name, messages)
# 判断是否需要关闭成员
if not resume:
# 设置状态为:已关闭(shutdown)
self._set_status(name, "shutdown")
# 退出循环,终止线程
return
# 收到恢复指令 → 重新设置状态为:工作中(working)
self._set_status(name, "working")
- 空闲阶段循环轮询收件箱和任务看板。
# 【私有方法】智能体空闲轮询:等待消息、自动领任务、超时判断
# 智能体在空闲时循环检查是否需要恢复工作
# name: 当前智能体名称 | messages: 对话上下文
def _idle_poll(self, name, messages):
# 计算轮询次数:总超时时间 ÷ 轮询间隔
# 例:60秒超时,每5秒查一次 → 共轮询12次
for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):
# 每次轮询先等待指定间隔时间(不占用CPU)
time.sleep(POLL_INTERVAL)
# 1. 检查收件箱:是否收到新消息/指令
inbox = BUS.read_inbox(name)
if inbox:
# 有新消息 → 加入上下文,返回True(恢复工作)
messages.append({"role": "user",
"content": f"<inbox>{inbox}</inbox>"})
return True
# 2. 检查是否有无人认领的任务
unclaimed = scan_unclaimed_tasks()
if unclaimed:
# 自动认领第一个任务
claim_task(unclaimed[0]["id"], name)
# 把认领结果加入上下文
messages.append({"role": "user",
"content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
f"{unclaimed[0]['subject']}</auto-claimed>"})
return True
# 3. 轮询全部结束仍无任务/消息 → 超时,返回False(准备关机)
return False
- 任务看板扫描: 找 pending 状态、无 owner、未被阻塞的任务。
# 扫描所有【无人认领、可执行】的任务
# 筛选条件:待处理 + 无负责人 + 无依赖阻塞
def scan_unclaimed_tasks() -> list:
# 存储符合条件的可认领任务
unclaimed = []
# 遍历任务目录下所有 task_*.json 文件,并按文件名排序
for f in sorted(TASKS_DIR.glob("task_*.json")):
# 读取并解析任务文件内容
task = json.loads(f.read_text())
# 筛选条件:
# 1. 任务状态 = pending(待处理)
# 2. 无负责人(owner 为空)
# 3. 无阻塞依赖(blockedBy 为空)
if (task.get("status") == "pending"
and not task.get("owner")
and not task.get("blockedBy")):
# 符合条件 → 加入可认领列表
unclaimed.append(task)
# 返回所有可自动认领的任务
return unclaimed
- 身份重注入: 上下文过短 (说明发生了压缩) 时, 在开头插入身份块。
# 如果当前对话消息过短(≤3条),为了保证智能体身份认知完整
# 主动在消息列表最前面插入【身份设定】,防止AI忘记自己是谁
if len(messages) <= 3:
# 插入到第0位:给AI的身份与工作指令
messages.insert(0, {
"role": "user",
"content": f"<identity>You are '{name}', role: {role}, "
f"team: {team_name}. Continue your work.</identity>"
})
# 插入到第1位:AI确认身份的回复,保持上下文连贯
messages.insert(1, {
"role": "assistant",
"content": f"I am {name}. Continuing."
})
s12: Worktree + Task Isolation (Worktree 任务隔离)
“各干各的目录, 互不干扰” – 任务管目标, worktree 管目录, 按 ID 绑定。
目录隔离 – 永不碰撞的并行执行通道。
Control plane (.tasks/) Execution plane (.worktrees/)
+------------------+ +------------------------+
| task_1.json | | auth-refactor/ |
| status: in_progress <------> branch: wt/auth-refactor
| worktree: "auth-refactor" | task_id: 1 |
+------------------+ +------------------------+
| task_2.json | | ui-login/ |
| status: pending <------> branch: wt/ui-login
| worktree: "ui-login" | task_id: 2 |
+------------------+ +------------------------+
|
index.json (worktree registry)
events.jsonl (lifecycle log)
State machines:
Task: pending -> in_progress -> completed
Worktree: absent -> active -> removed | kept
原理
- 创建任务。 先把目标持久化。
- 创建 worktree 并绑定任务。 传入
task_id自动将任务推进到in_progress。 - 在 worktree 中执行命令。
cwd指向隔离目录。 - 收尾。 两种选择:
- 事件流。 每个生命周期步骤写入
.worktrees/events.jsonl:
- 创建任务。 先把目标持久化。
TASKS.create("Implement auth refactor")
# -> .tasks/task_1.json status=pending worktree=""
- 创建 worktree 并绑定任务。 传入
task_id自动将任务推进到in_progress。
WORKTREES.create("auth-refactor", task_id=1)
# -> git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
# -> index.json gets new entry, task_1.json gets worktree="auth-refactor"
绑定同时写入两侧状态:
def bind_worktree(self, task_id, worktree):
task = self._load(task_id)
task["worktree"] = worktree
if task["status"] == "pending":
task["status"] = "in_progress"
self._save(task)
- 在 worktree 中执行命令。
cwd指向隔离目录。
subprocess.run(command, shell=True, cwd=worktree_path,
capture_output=True, text=True, timeout=300)
- 收尾。 两种选择:
worktree_keep(name)– 保留目录供后续使用。worktree_remove(name, complete_task=True)– 删除目录, 完成绑定任务, 发出事件。一个调用搞定拆除 + 完成。
def remove(self, name, force=False, complete_task=False):
self._run_git(["worktree", "remove", wt["path"]])
if complete_task and wt.get("task_id") is not None:
self.tasks.update(wt["task_id"], status="completed")
self.tasks.unbind_worktree(wt["task_id"])
self.events.emit("task.completed", ...)
- 事件流。 每个生命周期步骤写入
.worktrees/events.jsonl:
{
"event": "worktree.remove.after",
"task": {"id": 1, "status": "completed"},
"worktree": {"name": "auth-refactor", "status": "removed"},
"ts": 1730000000
}
事件类型: worktree.create.before/after/failed, worktree.remove.before/after/failed, worktree.keep, task.completed。
崩溃后从 .tasks/ + .worktrees/index.json 重建现场。会话记忆是易失的; 磁盘状态是持久的。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)