源码路径:https://github.com/shareAI-lab/learn-claude-code#

s01The Agent Loop

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> |  Tool   |
| prompt |      |       |      | execute |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                    (loop until stop_reason != "tool_use")

一个工具 + 一
个循环 = 一个智能体。

ReAct(Reasoning范式):让AI边思考边行动的交互模式。

用户提示词-llm-工具执行(得到工具结果循环llm-工具执行直到停止)

思考-行动-观察-再思考-…–任务完成给出答案

一个退出条件控制整个流程。循环持续运行, 直到模型不再调用工具。

原理:

  • 用户 prompt 作为第一条消息。
  • 将消息和工具定义一起发给 LLM。
  • 追加助手响应。检查 stop_reason – 如果模型没有调用工具, 结束。
  • 执行每个工具调用, 收集结果, 作为 user 消息追加。回到第 2 步。
def agent_loop(query):
   # 1.用户 prompt 作为第一条消息。
    messages = [{"role": "user", "content": query}]
    while True:
       # 2.将消息和工具定义一起发给 LLM。z
        response = client.messages.create(
            model=MODEL, system=SYSTEM, messages=messages,
            tools=TOOLS, max_tokens=8000,
        )
        # 3. 追加助手响应。检查 `stop_reason` -- 如果模型没有调用工具, 结束。
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            return
        # 4.执行每个工具调用, 收集结果, 作为 user 消息追加。回到第 2 步。
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_bash(block.input["command"])
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})

s02 Tool Use

+--------+      +-------+      +------------------+
|  User  | ---> |  LLM  | ---> | Tool Dispatch    |
| prompt |      |       |      | {                |
+--------+      +---+---+      |   bash: run_bash |
                    ^           |   read: run_read |
                    |           |   write: run_wr  |
                    +-----------+   edit: run_edit |
                    tool_result | }                |
                                +------------------+

The dispatch map is a dict: {tool_name: handler_function}.
One lookup replaces any if/elif chain.

加一个工具, 只加一个 handler"* – 循环不用动, 新工具注册进 dispatch map 就行。

工具分发 – 扩展模型能触达的边界。

原理

  • 每个工具有一个处理函数。路径沙箱防止逃逸工作区。
  • dispatch map 将工具名映射到处理函数。
  • 循环中按名称查找处理函数。循环体本身与 s01 完全一致。

工具1.:

def safe_path(p: str) -> Path:
    path = (WORKDIR / p).resolve()
    if not path.is_relative_to(WORKDIR):
        raise ValueError(f"Path escapes workspace: {p}")
    return path

def run_read(path: str, limit: int = None) -> str:
    text = safe_path(path).read_text()
    lines = text.splitlines()
    if limit and limit < len(lines):
        lines = lines[:limit]
    return "\n".join(lines)[:50000]

dispatch map

TOOL_HANDLERS = {
    "bash":       lambda **kw: run_bash(kw["command"]),
    "read_file":  lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":  lambda **kw: run_edit(kw["path"], kw["old_text"],
                                        kw["new_text"]),
}

循环中按名称查找处理函数。循环体本身与 s01 完全一致

for block in response.content:
    if block.type == "tool_use":
        handler = TOOL_HANDLERS.get(block.name)
        output = handler(**block.input) if handler \
            else f"Unknown tool: {block.name}"
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })

s03 TodoWrite

规划 – 让模型不偏航, 但不替它画航线。

“同时只能有一个 in_progress” 强制顺序聚焦。nag reminder 制造问责压力 – 你不更新计划, 系统就追着你问。

+--------+      +-------+      +---------+
|  User  | ---> |  LLM  | ---> | Tools   |
| prompt |      |       |      | + todo  |
+--------+      +---+---+      +----+----+
                    ^                |
                    |   tool_result  |
                    +----------------+
                          |
              +-----------+-----------+
              | TodoManager state     |
              | [ ] task A            |
              | [>] task B  <- doing  |
              | [x] task C            |
              +-----------------------+
                          |
              if rounds_since_todo >= 3:
                inject <reminder> into tool_result

原理

  • TodoManager 存储带状态的项目。同一时间只允许一个 in_progress
  • todo 工具和其他工具一样加入 dispatch map。
  • nag reminder: 模型连续 3 轮以上不调用 todo 时注入提醒。

TodoManager 存储带状态的项目。同一时间只允许一个 in_progress

# 定义待办事项管理类
class TodoManager:
    # 更新待办列表的方法,接收列表参数,返回字符串
    def update(self, items: list) -> str:
        # 初始化:校验后的任务列表、进行中任务的数量
        validated, in_progress_count = [], 0
        
        # 遍历所有待办任务
        for item in items:
            # 获取任务状态,没有则默认为 pending
            status = item.get("status", "pending")
            
            # 如果任务是进行中,计数 +1
            if status == "in_progress":
                in_progress_count += 1
            
            # 把合法的任务信息存入 validated
            validated.append({
                "id": item["id"], 
                "text": item["text"],
                "status": status
            })
        
        # 检查:进行中的任务不能超过 1 个,否则报错
        if in_progress_count > 1:
            raise ValueError("Only one task can be in_progress")
        
        # 将校验后的任务保存到实例属性
        self.items = validated
        
        # 调用渲染方法并返回结果
        return self.render()

todo 工具和其他工具一样加入 dispatch map。

TOOL_HANDLERS = {
    # ...base tools...
    "todo": lambda **kw: TODO.update(kw["items"]),
}

nag reminder: 模型连续 3 轮以上不调用 todo 时注入提醒。

# 判断条件:距离上次待办事项更新已满3轮对话 且 消息列表不为空
if rounds_since_todo >= 3 and messages:
    # 取出消息列表中的最后一条消息(最新的一条消息)
    last = messages[-1]
    
    # 判断:最后一条消息是用户发送的,且消息内容是列表格式
    if last["role"] == "user" and isinstance(last.get("content"), list):
        # 在用户消息内容的最前面插入一条提醒文本
        last["content"].insert(0, {
            "type": "text",       # 消息类型:文本
            "text": "<reminder>Update your todos.</reminder>",  # 提醒内容:更新待办事项
        })

s04 Subagents (子智能体)

Parent agent                     Subagent
+------------------+             +------------------+
| messages=[...]   |             | messages=[]      | <-- fresh
|                  |  dispatch   |                  |
| tool: task       | ----------> | while tool_use:  |
|   prompt="..."   |             |   call tools     |
|                  |  summary    |   append results |
|   result = "..." | <---------- | return last text |
+------------------+             +------------------+

Parent context stays clean. Subagent context is discarded.

“大任务拆小, 每个小
任务干净的上下文”* – 子智能体用独立 messages[], 不污染主对话。

上下文隔离 – 守护模型的思维清晰度。

子智能体可能跑了 30+ 次工具调用, 但整个消息历史直接丢弃。父智能体收到的只是一段摘要文本, 作为普通 tool_result 返回。

工作原理

  • 父智能体有一个 task 工具。子智能体拥有除 task 外的所有基础工具 (禁止递归生成)。
  • 子智能体以 messages=[] 启动, 运行自己的循环。只有最终文本返回给父智能体。

父智能体有一个 task 工具。子智能体拥有除 task 外的所有基础工具 (禁止递归生成)。

# 定义父级智能体可用的工具列表
# 父级工具 = 子级工具 + 新增的 task 工具
PARENT_TOOLS = CHILD_TOOLS + [
    # 新增的 task 工具:用于创建子智能体,分配独立任务
    {
        # 工具名称:task
        "name": "task",
        # 工具描述:创建一个拥有全新上下文环境的子智能体
        "description": "Spawn a subagent with fresh context.",
        # 工具输入参数规范
        "input_schema": {
            # 输入类型:对象(固定格式)
            "type": "object",
            # 对象属性定义:只需要一个 prompt 参数
            "properties": {
                "prompt": {
                    "type": "string"  # prompt 参数类型:字符串
                }
            },
            # 必填参数:必须传入 prompt
            "required": ["prompt"]
        }
    },
]

子智能体以 messages=[] 启动, 运行自己的循环。只有最终文本返回给父智能体。

# 定义子智能体执行函数,接收任务提示,返回执行结果字符串
def run_subagent(prompt: str) -> str:
    # 初始化子智能体的消息列表:第一条是用户传入的任务提示
    sub_messages = [{"role": "user", "content": prompt}]
    
    # 循环执行对话,最多30轮(安全限制,防止无限循环)
    for _ in range(30):
        # 调用大模型接口生成响应
        response = client.messages.create(
            model=MODEL,                  # 使用的模型名称
            system=SUBAGENT_SYSTEM,       # 子智能体的系统提示词
            messages=sub_messages,        # 对话历史消息
            tools=CHILD_TOOLS,            # 子智能体可用的工具列表
            max_tokens=8000,              # 最大生成token数
        )
        
        # 将模型的响应添加到对话历史(角色:助手)
        sub_messages.append({
            "role": "assistant",
            "content": response.content
        })
        
        # 如果停止原因不是工具调用,说明任务完成,退出循环
        if response.stop_reason != "tool_use":
            break
        
        # 处理模型调用的工具,收集执行结果
        results = []
        # 遍历响应中的每个内容块
        for block in response.content:
            # 判断是否是工具调用类型
            if block.type == "tool_use":
                # 根据工具名称获取对应的处理函数
                handler = TOOL_HANDLERS.get(block.name)
                # 执行工具处理函数,传入参数
                output = handler(**block.input)
                # 构造工具执行结果(限制长度防止溢出)
                results.append({
                    "type": "tool_result",        # 类型:工具结果
                    "tool_use_id": block.id,       # 关联的工具调用ID
                    "content": str(output)[:50000] # 结果内容,截取前50000字符
                })
        
        # 将工具执行结果作为用户消息,添加到对话历史
        sub_messages.append({"role": "user", "content": results})
    
    # 从最终响应中提取所有文本内容,拼接成字符串返回
    # 如果没有文本,返回默认提示 "(no summary)"
    return "".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or "(no summary)"

s05 Skills (技能加载)

System prompt (Layer 1 -- always present):
+--------------------------------------+
| You are a coding agent.              |
| Skills available:                    |
|   - git: Git workflow helpers        |  ~100 tokens/skill
|   - test: Testing best practices     |
+--------------------------------------+

When model calls load_skill("git"):
+--------------------------------------+
| tool_result (Layer 2 -- on demand):  |
| <skill name="git">                   |
|   Full git workflow instructions...  |  ~2000 tokens
|   Step 1: ...                        |
| </skill>                             |
+--------------------------------------+

“用到什么知识, 临时加载什么知识” – 通过 tool_result 注入, 不塞 system prompt。

按需知识 – 模型开口要时才给的领域专长。

原理

  • 每个技能是一个目录, 包含 SKILL.md 文件和 YAML frontmatter。
  • SkillLoader 递归扫描 SKILL.md 文件, 用目录名作为技能标识。
  • 第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。

每个技能是一个目录, 包含 SKILL.md 文件和 YAML frontmatter。

skills/
  pdf/
    SKILL.md       # ---\n name: pdf\n description: Process PDF files\n ---\n ...
  code-review/
    SKILL.md       # ---\n name: code-review\n description: Review code\n ---\n ...

SkillLoader 递归扫描 SKILL.md 文件, 用目录名作为技能标识。

# 技能加载器类:负责从文件系统加载、解析、管理技能文档
class SkillLoader:
    # 初始化方法:传入技能文件夹路径,自动加载所有技能
    def __init__(self, skills_dir: Path):
        # 存储所有技能的字典:key=技能名,value=技能元数据+内容
        self.skills = {}
        
        # 递归遍历指定目录下 所有名为 SKILL.md 的文件(排序后遍历)
        for f in sorted(skills_dir.rglob("SKILL.md")):
            # 读取.md文件的全部文本内容
            text = f.read_text()
            
            # 解析文件:拆分为 前端元数据(meta) 和 技能正文(body)
            meta, body = self._parse_frontmatter(text)
            
            # 取技能名:优先用meta里的name,没有就用文件夹名
            name = meta.get("name", f.parent.name)
            
            # 把解析好的技能存入字典
            self.skills[name] = {"meta": meta, "body": body}

    # 获取所有技能的描述文本(用于展示/提示词)
    def get_descriptions(self) -> str:
        # 存储每行技能描述
        lines = []
        
        # 遍历所有已加载的技能
        for name, skill in self.skills.items():
            # 取出描述,没有则为空
            desc = skill["meta"].get("description", "")
            # 拼接成固定格式:- 技能名: 描述
            lines.append(f"  - {name}: {desc}")
        
        # 用换行符连接所有行,返回完整字符串
        return "\n".join(lines)

    # 根据技能名,获取完整的技能内容(带标签包裹)
    def get_content(self, name: str) -> str:
        # 从技能字典中查找指定技能
        skill = self.skills.get(name)
        
        # 技能不存在:返回错误信息
        if not skill:
            return f"Error: Unknown skill '{name}'."
        
        # 技能存在:返回带 <skill> 标签包裹的完整内容
        return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"

第一层写入系统提示。第二层不过是 dispatch map 中的又一个工具。

# 定义系统提示词(给AI智能体的身份与能力说明)
# 身份:工作目录下的编码智能体 + 列出所有可用技能描述
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""

# 工具处理器字典:key=工具名,value=处理函数
TOOL_HANDLERS = {
    # 基础工具(省略)
    # ...base tools...
    
    # 工具:load_skill → 调用技能加载器,根据技能名称获取完整技能内容
    "load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
}

s06: Context Compact (上下文压缩)

“上下文总会满, 要有办法腾地方” – 三层压缩策略, 换来无限会话。

压缩 – 干净的记忆, 无限的会话。

Every turn:
+------------------+
| Tool call result |
+------------------+
        |
        v
[Layer 1: micro_compact]        (silent, every turn)
  Replace tool_result > 3 turns old
  with "[Previous: used {tool_name}]"
        |
        v
[Check: tokens > 50000?]
   |               |
   no              yes
   |               |
   v               v
continue    [Layer 2: auto_compact]
              Save transcript to .transcripts/
              LLM summarizes conversation.
              Replace all messages with [summary].
                    |
                    v
            [Layer 3: compact tool]
              Model calls compact explicitly.
              Same summarization as auto_compact.

原理

  • 第一层 – micro_compact: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。
  • 第二层 – auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。
  • 第三层 – manual compact: compact 工具按需触发同样的摘要机制。
  • 循环整合三层:

第一层 – micro_compact*: 每次 LLM 调用前, 将旧的 tool result 替换为占位符。

# 定义消息精简函数:对历史消息中的工具结果进行压缩,保留最近的,减少上下文长度
def micro_compact(messages: list) -> list:
    # 存储所有找到的【工具结果】消息条目,格式:(消息索引, 内容块索引, 内容块)
    tool_results = []
    
    # 遍历每一条历史消息,查找工具执行结果
    for i, msg in enumerate(messages):
        # 筛选:用户消息 + 内容是列表格式(符合工具结果格式)
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            # 遍历这条消息里的每一个内容块
            for j, part in enumerate(msg["content"]):
                # 判断:内容块是字典,且类型为 tool_result(工具返回结果)
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    # 把这条工具结果记录下来
                    tool_results.append((i, j, part))

    # 如果工具结果总数 ≤ 需要保留的最近数量,直接返回原消息,不做压缩
    if len(tool_results) <= KEEP_RECENT:
        return messages

    # 对【除了最近 N 条之外】的旧工具结果进行精简
    # 只处理:前面所有旧的工具结果(去掉最后 KEEP_RECENT 条)
    for _, _, part in tool_results[:-KEEP_RECENT]:
        # 如果这个工具结果的内容长度 > 100,说明内容很长,需要精简
        if len(part.get("content", "")) > 100:
            # 把长内容替换成简短提示(原代码里 tool_name 需自行定义)
            part["content"] = f"[Previous: used {tool_name}]"

    # 返回精简后的消息列表
    return messages

第二层 – auto_compact: token 超过阈值时, 保存完整对话到磁盘, 让 LLM 做摘要。

# 定义自动压缩函数:用于精简过长的对话历史,保留上下文核心信息
def auto_compact(messages: list) -> list:
    # 1. 先保存原始对话记录,用于后续恢复/调试
    # 生成保存路径:用时间戳命名,确保文件名唯一
    transcript_path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
    
    # 以写入模式打开文件
    with open(transcript_path, "w") as f:
        # 遍历每一条消息
        for msg in messages:
            # 将消息转为JSON字符串并逐行写入文件
            f.write(json.dumps(msg, default=str) + "\n")

    # 2. 调用大模型对对话历史进行总结压缩
    response = client.messages.create(
        model=MODEL,  # 使用指定的AI模型
        messages=[{   # 给模型发送指令:总结对话
            "role": "user", 
            "content": 
                # 指令:总结对话以保持连贯性
                "Summarize this conversation for continuity..."
                # 拼接原始对话(限制长度防止超出模型限制)
                + json.dumps(messages, default=str)[:80000]
        }],
        max_tokens=2000,  # 限制总结内容的最大长度
    )

    # 3. 返回精简后的新对话历史(只保留总结,替代原长对话)
    return [
        # 第一条:用户消息,包含压缩后的总结
        {"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
        # 第二条:助手回复,表示已理解并继续
        {"role": "assistant", "content": "Understood. Continuing."},
    ]

循环整合三层:

# 定义智能体主循环:负责持续对话、自动压缩上下文、执行工具
def agent_loop(messages: list):
    # 无限循环,保持对话持续进行
    while True:
        # 【第一层压缩】轻量级精简:只压缩过长的工具结果,保留最近记录
        micro_compact(messages)
        
        # 估算当前对话总token数,如果超过阈值,则执行深度压缩
        if estimate_tokens(messages) > THRESHOLD:
            # 【第二层压缩】重量级AI总结压缩:用大模型把长对话浓缩成简短总结
            # messages[:] = 表示直接替换原列表内容,而不是创建新列表
            messages[:] = auto_compact(messages)
        
        # 调用大模型接口,生成回复(具体参数省略)
        response = client.messages.create(...)
        
        # ... 中间执行工具调用逻辑(省略) ...
        
        # 如果触发了手动压缩标志,则立即执行深度压缩
        if manual_compact:
            # 【第三层压缩】手动触发的AI总结压缩
            messages[:] = auto_compact(messages)

s07: Task System (任务系统)

“大目标要拆成小任务, 排好序, 记在磁盘上” – 文件持久化的任务图, 为多 agent 协作打基础。

持久化任务 – 比任何一次对话都长命的目标。

真实目标是有结构的 – 任务 B 依赖任务 A, 任务 C 和 D 可以并行, 任务 E 要等 C 和 D 都完成。

没有显式的关系, 智能体分不清什么能做、什么被卡住、什么能同时跑。而且清单只活在内存里, 上下文压缩 (s06) 一跑就没了。

把扁平清单升级为持久化到磁盘的任务图。每个任务是一个 JSON 文件, 有状态、前置依赖 (blockedBy) 和后置依赖 (blocks)。任务图随时回答三个问题:

  • 什么可以做? – 状态为 pendingblockedBy 为空的任务。
  • 什么被卡住? – 等待前置任务完成的任务。
  • 什么做完了? – 状态为 completed 的任务, 完成时自动解锁后续任务。
.tasks/
  task_1.json  {"id":1, "status":"completed"}
  task_2.json  {"id":2, "blockedBy":[1], "status":"pending"}
  task_3.json  {"id":3, "blockedBy":[1], "status":"pending"}
  task_4.json  {"id":4, "blockedBy":[2,3], "status":"pending"}

任务图 (DAG):
                 +----------+
            +--> | task 2   | --+
            |    | pending  |   |
+----------+     +----------+    +--> +----------+
| task 1   |                          | task 4   |
| completed| --> +----------+    +--> | blocked  |
+----------+     | task 3   | --+     +----------+
                 | pending  |
                 +----------+

顺序:   task 1 必须先完成, 才能开始 2 和 3
并行:   task 2 和 3 可以同时执行
依赖:   task 4 要等 2 和 3 都完成
状态:   pending -> in_progress -> completed

原理

  • TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
  • 依赖解除: 完成任务时, 自动将其 ID 从其他任务的 blockedBy 中移除, 解锁后续任务。
  • 状态变更 + 依赖关联: update 处理状态转换和依赖边。
  • 四个任务工具加入 dispatch map。
  1. TaskManager: 每个任务一个 JSON 文件, CRUD + 依赖图。
# 任务管理类:负责创建、存储、管理任务数据
class TaskManager:
    # 初始化方法:设置任务存储目录并初始化任务ID
    # tasks_dir:任务文件的存储路径(Path类型)
    def __init__(self, tasks_dir: Path):
        # 保存任务存储目录到实例变量
        self.dir = tasks_dir
        # 创建任务目录(如果不存在),exist_ok=True 表示已存在则不报错
        self.dir.mkdir(exist_ok=True)
        # 初始化下一个任务的ID:获取当前最大ID并+1
        self._next_id = self._max_id() + 1

    # 创建新任务的方法
    # subject:任务主题(必填)
    # description:任务描述(可选,默认为空)
    def create(self, subject, description=""):
        # 构造任务字典,包含完整的任务字段
        task = {
            "id": self._next_id,    # 任务唯一ID
            "subject": subject,     # 任务主题/标题
            "status": "pending",    # 任务状态:默认待处理
            "blockedBy": [],        # 依赖的任务ID列表(被哪些任务阻塞)
            "blocks": [],           # 阻塞的任务ID列表(阻塞哪些任务)
            "owner": ""             # 任务负责人,默认为空
        }
        # 保存任务到文件系统
        self._save(task)
        # 创建完成后,下一个任务ID自增
        self._next_id += 1
        # 将任务转为格式化的JSON字符串并返回
        return json.dumps(task, indent=2)
  1. 依赖解除: 完成任务时, 自动将其 ID 从其他任务的 blockedBy 中移除, 解锁后续任务。
# 清理任务依赖的私有方法
# 当某个任务完成时,将其从其他任务的"依赖列表"中移除
# completed_id:已完成的任务ID
def _clear_dependency(self, completed_id):
    # 遍历任务目录下所有以 task_ 开头的json文件(所有任务文件)
    for f in self.dir.glob("task_*.json"):
        # 读取文件内容并解析为任务字典
        task = json.loads(f.read_text())
        
        # 检查:已完成的任务ID 是否存在于 当前任务的依赖列表中
        if completed_id in task.get("blockedBy", []):
            # 从当前任务的依赖列表中,移除已完成的任务ID
            task["blockedBy"].remove(completed_id)
            # 保存修改后的任务到文件
            self._save(task)
  1. 状态变更 + 依赖关联: update 处理状态转换和依赖边。
# 更新任务信息的方法:支持修改状态、添加依赖/阻塞关系
# task_id: 要更新的任务ID
# status: 可选,要设置的任务状态
# add_blocked_by: 可选,要添加的【依赖任务ID】
# add_blocks: 可选,要添加的【阻塞任务ID】
def update(self, task_id, status=None,
           add_blocked_by=None, add_blocks=None):
    # 1. 根据任务ID加载对应的任务数据
    task = self._load(task_id)
    
    # 2. 如果传入了新状态,就更新任务状态
    if status:
        task["status"] = status
        
        # 3. 如果状态被改为【已完成】,自动清理该任务的依赖关系
        if status == "completed":
            self._clear_dependency(task_id)
    
    # 4. 保存修改后的任务到文件
    self._save(task)
  1. 四个任务工具加入 dispatch map。
TOOL_HANDLERS = {
    # ...base tools...
    "task_create": lambda **kw: TASKS.create(kw["subject"]),
    "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
    "task_list":   lambda **kw: TASKS.list_all(),
    "task_get":    lambda **kw: TASKS.get(kw["task_id"]),
}

s08: Background Tasks (后台任务)

“慢操作丢后台, agent 继续想下一步”* – 后台线程跑命令, 完成后注入通知。

后台执行 – 模型继续思考, harness 负责等待。

有些命令要跑好几分钟: npm installpytestdocker build。阻塞式循环下模型只能干等。用户说 “装依赖, 顺便建个配置文件”, 智能体却只能一个一个来。

Main thread                Background thread
+-----------------+        +-----------------+
| agent loop      |        | subprocess runs |
| ...             |        | ...             |
| [LLM call] <---+------- | enqueue(result) |
|  ^drain queue   |        +-----------------+
+-----------------+

Timeline:
Agent --[spawn A]--[spawn B]--[other work]----
             |          |
             v          v
          [A runs]   [B runs]      (parallel)
             |          |
             +-- results injected before next LLM call --+

原理:

  • BackgroundManager 用线程安全的通知队列追踪任务。
  • run() 启动守护线程, 立即返回。
  • 子进程完成后, 结果进入通知队列。
  • 每次 LLM 调用前排空通知队列。
  1. BackgroundManager 用线程安全的通知队列追踪任务。
# 后台任务管理类:用于管理多线程后台任务、通知队列和线程安全操作
class BackgroundManager:
    # 初始化方法:创建后台管理器实例
    def __init__(self):
        # 存储后台任务的字典(key=任务ID,value=任务对象/信息)
        self.tasks = {}
        
        # 通知队列:用于存储后台任务产生的通知/消息(先进先出)
        self._notification_queue = []
        
        # 线程锁:保证多线程环境下操作数据安全,防止并发冲突
        self._lock = threading.Lock()
  1. run() 启动守护线程, 立即返回。
# 启动后台任务的方法:接收命令字符串,异步执行并返回任务ID
# command: 要在后台执行的命令
# 返回: 任务启动提示信息
def run(self, command: str) -> str:
    # 生成唯一的短任务ID(取UUID前8位,保证不重复)
    task_id = str(uuid.uuid4())[:8]
    
    # 将任务存入任务字典,标记状态为 running,记录执行命令
    self.tasks[task_id] = {"status": "running", "command": command}
    
    # 创建守护线程:目标执行方法 _execute,传入任务ID和命令
    thread = threading.Thread(
        target=self._execute,    # 线程要运行的函数
        args=(task_id, command), # 传给函数的参数
        daemon=True              # 设为守护线程,主程序退出时自动结束
    )
    
    # 启动线程,后台异步执行命令
    thread.start()
    
    # 返回任务启动成功的提示信息
    return f"Background task {task_id} started"
  1. 子进程完成后, 结果进入通知队列。
# 【私有方法】在后台线程中执行具体的系统命令
# task_id: 后台任务唯一ID
# command: 要执行的shell命令
def _execute(self, task_id, command):
    try:
        # 执行系统命令(子进程)
        # shell=True: 以shell环境运行命令
        # cwd=WORKDIR: 在指定工作目录执行
        # capture_output=True: 捕获标准输出+标准错误
        # text=True: 输出以字符串形式返回
        # timeout=300: 最大执行时间300秒(5分钟)
        r = subprocess.run(command, shell=True, cwd=WORKDIR,
            capture_output=True, text=True, timeout=300)
        
        # 拼接标准输出 + 错误输出,去首尾空格,限制最大长度50000字符
        output = (r.stdout + r.stderr).strip()[:50000]

    # 如果命令执行超时,捕获异常并返回超时提示
    except subprocess.TimeoutExpired:
        output = "Error: Timeout (300s)"

    # 加线程锁:保证多线程下操作队列安全
    with self._lock:
        # 将执行结果(任务ID + 截取后的输出)加入通知队列
        self._notification_queue.append({
            "task_id": task_id,       # 关联任务ID
            "result": output[:500]    # 结果只保留前500字符,避免过长
        })
  1. 每次 LLM 调用前排空通知队列。
# 智能体主循环:持续处理对话 + 后台任务通知
def agent_loop(messages: list):
    # 无限循环,保持智能体持续运行
    while True:
        # 1. 获取并清空后台任务管理器中的所有通知(后台执行结果)
        notifs = BG.drain_notifications()

        # 2. 如果有后台任务返回结果
        if notifs:
            # 把所有后台通知拼接成文本,格式:[bg:任务ID] 执行结果
            notif_text = "\n".join(
                f"[bg:{n['task_id']}] {n['result']}" for n in notifs
            )

            # 3. 将后台结果包装成 <background-results> 标签,加入对话历史(用户角色)
            messages.append({
                "role": "user",
                "content": f"<background-results>\n{notif_text}\n</background-results>"
            })

            # 4. 让智能体回复“已记录后台结果”,保持对话逻辑连贯
            messages.append({
                "role": "assistant",
                "content": "Noted background results."
            })

        # 5. 调用大模型生成下一轮回复(具体参数省略)
        response = client.messages.create(...)

s09: Agent Teams (智能体团队)

“任务太大一个人干不完, 要能分给队友” – 持久化队友 + JSONL 邮箱。

团队邮箱 – 多个模型, 通过文件协调。

真正的团队协作需要三样东西: (1) 能跨多轮对话存活的持久智能体, (2) 身份和生命周期管理, (3) 智能体之间的通信通道。

Teammate lifecycle:
  spawn -> WORKING -> IDLE -> WORKING -> ... -> SHUTDOWN

Communication:
  .team/
    config.json           <- team roster + statuses
    inbox/
      alice.jsonl         <- append-only, drain-on-read
      bob.jsonl
      lead.jsonl

              +--------+    send("alice","bob","...")    +--------+
              | alice  | -----------------------------> |  bob   |
              | loop   |    bob.jsonl << {json_line}    |  loop  |
              +--------+                                +--------+
                   ^                                         |
                   |        BUS.read_inbox("alice")          |
                   +---- alice.jsonl -> read + drain ---------+

原理:

  • TeammateManager 通过 config.json 维护团队名册。
  • spawn() 创建队友并在线程中启动 agent loop。
  • MessageBus: append-only 的 JSONL 收件箱。send() 追加一行; read_inbox() 读取全部并清空。
  • 每个队友在每次 LLM 调用前检查收件箱, 将消息注入上下文。
  1. TeammateManager 通过 config.json 维护团队名册。
# 队友管理类:用于管理团队成员配置、对话线程和文件存储
class TeammateManager:
    # 初始化方法:设置团队目录、加载配置、初始化线程字典
    # team_dir: 团队数据的存储路径(Path类型)
    def __init__(self, team_dir: Path):
        # 保存团队目录路径到实例变量
        self.dir = team_dir
        
        # 创建团队目录(如果不存在),exist_ok=True 表示已存在则不报错
        self.dir.mkdir(exist_ok=True)
        
        # 定义团队配置文件路径(目录下的 config.json)
        self.config_path = self.dir / "config.json"
        
        # 加载配置文件内容到 self.config
        self.config = self._load_config()
        
        # 存储队友对话线程的字典(key=队友ID/名称,value=对话消息列表)
        self.threads = {}
  1. spawn() 创建队友并在线程中启动 agent loop。
# 创建并启动一个新的团队成员(子智能体)
# name: 成员名称
# role: 成员角色/职责
# prompt: 给该成员的初始指令
def spawn(self, name: str, role: str, prompt: str) -> str:
    # 构建新成员的信息字典:包含名称、角色、工作状态
    member = {
        "name": name,       # 成员名称
        "role": role,       # 成员角色(如:开发者、测试者)
        "status": "working" # 成员状态:工作中
    }
    
    # 将新成员添加到配置的成员列表中
    self.config["members"].append(member)
    
    # 保存更新后的团队配置到文件
    self._save_config()
    
    # 创建守护线程,运行队友的主循环逻辑
    # target: 线程执行的目标函数(成员的主循环)
    # args: 传给目标函数的参数(名称、角色、初始指令)
    # daemon=True: 守护线程,主程序退出时自动终止
    thread = threading.Thread(
        target=self._teammate_loop,
        args=(name, role, prompt),
        daemon=True
    )
    
    # 启动线程,让成员在后台独立运行
    thread.start()
    
    # 返回成员创建成功的提示信息
    return f"Spawned teammate '{name}' (role: {role})"
  1. MessageBus: append-only 的 JSONL 收件箱。send() 追加一行; read_inbox() 读取全部并清空。
# 消息总线类:负责团队成员之间的消息发送、存储和读取(基于文件实现简易消息队列)
class MessageBus:

    # 发送消息给指定成员
    # sender: 发送者名称
    # to: 接收者名称
    # content: 消息内容
    # msg_type: 消息类型,默认为普通消息
    # extra: 额外附加数据(可选)
    def send(self, sender, to, content, msg_type="message", extra=None):
        # 构建标准消息结构体
        msg = {
            "type": msg_type,       # 消息类型
            "from": sender,         # 发送者
            "content": content,     # 消息内容
            "timestamp": time.time() # 时间戳(记录发送时间)
        }
        
        # 如果有额外参数,合并到消息中
        if extra:
            msg.update(extra)
        
        # 以追加模式打开接收者的消息文件(文件名:接收者名.jsonl)
        with open(self.dir / f"{to}.jsonl", "a") as f:
            # 将消息转为JSON字符串并写入文件(一行一条)
            f.write(json.dumps(msg) + "\n")

    # 读取并清空指定成员的收件箱(拉取所有消息)
    # name: 要读取收件箱的成员名称
    def read_inbox(self, name):
        # 构造成员收件箱文件路径
        path = self.dir / f"{name}.jsonl"
        
        # 如果文件不存在,返回空列表的JSON字符串
        if not path.exists():
            return "[]"
        
        # 读取所有行,逐行解析为JSON消息列表
        msgs = [json.loads(line) for line in path.read_text().strip().splitlines() if line]
        
        # 清空收件箱文件(drain:取出后即清空,避免重复处理)
        path.write_text("")
        
        # 将消息格式化为格式化JSON字符串返回
        return json.dumps(msgs, indent=2)
  1. 每个队友在每次 LLM 调用前检查收件箱, 将消息注入上下文。
# 【私有方法】团队成员(子智能体)的独立运行主循环
# name: 成员名称   role: 成员角色   prompt: 初始任务指令
def _teammate_loop(self, name, role, prompt):
    # 初始化成员的对话上下文,第一条消息是初始任务指令
    messages = [{"role": "user", "content": prompt}]

    # 最多执行50轮循环(防止无限运行,安全限制)
    for _ in range(50):
        # 1. 读取当前成员的收件箱消息(读完自动清空)
        inbox = BUS.read_inbox(name)
        
        # 2. 如果收件箱有新消息,将其加入对话上下文
        if inbox != "[]":
            # 把收件箱消息包装成标签格式,作为用户消息传入
            messages.append({"role": "user",
                "content": f"<inbox>{inbox}</inbox>"})
            # 助手回复:已记录收件箱消息,保持上下文连贯
            messages.append({"role": "assistant",
                "content": "Noted inbox messages."})

        # 3. 调用AI大模型生成响应(执行任务/调用工具)
        response = client.messages.create(...)

        # 4. 如果模型停止原因不是「工具调用」,说明任务完成,退出循环
        if response.stop_reason != "tool_use":
            break

        # (此处省略)执行工具调用,并将结果追加到messages中

    # 5. 循环结束后,将成员状态更新为「空闲 idle」
    self._find_member(name)["status"] = "idle"

s10: Team Protocols (团队协议)

“队友之间要有统一的沟通规矩” – 一个 request-response 模式驱动所有协商。

协议 – 模型之间的结构化握手。

Shutdown Protocol            Plan Approval Protocol
==================           ======================

Lead             Teammate    Teammate           Lead
  |                 |           |                 |
  |--shutdown_req-->|           |--plan_req------>|
  | {req_id:"abc"}  |           | {req_id:"xyz"}  |
  |                 |           |                 |
  |<--shutdown_resp-|           |<--plan_resp-----|
  | {req_id:"abc",  |           | {req_id:"xyz",  |
  |  approve:true}  |           |  approve:true}  |

Shared FSM:
  [pending] --approve--> [approved]
  [pending] --reject---> [rejected]

Trackers:
  shutdown_requests = {req_id: {target, status}}
  plan_requests     = {req_id: {from, plan, status}}

原理

  • 领导生成 request_id, 通过收件箱发起关机请求。
  • 队友收到请求后, 用 approve/reject 响应。
  • 计划审批遵循完全相同的模式。队友提交计划 (生成 request_id), 领导审查 (引用同一个 request_id)。
  1. 领导生成 request_id, 通过收件箱发起关机请求。
# 全局字典:存储所有关机请求,key=请求ID,value=请求详情
shutdown_requests = {}

# 处理向团队成员发送关机请求的函数
# teammate: 要关闭的队友/子智能体名称
def handle_shutdown_request(teammate: str) -> str:
    # 生成唯一的请求ID(取UUID前8位,确保不重复)
    req_id = str(uuid.uuid4())[:8]
    
    # 将关机请求存入字典,标记目标成员和状态为待处理
    shutdown_requests[req_id] = {
        "target": teammate,    # 目标关闭对象
        "status": "pending"    # 请求状态:待处理
    }
    
    # 通过消息总线发送关机指令
    # 发送者:lead(管理者)
    # 接收者:指定队友
    # 内容:请优雅地关闭
    # 消息类型:shutdown_request(关机请求)
    # 附加参数:携带请求ID,用于追踪
    BUS.send(
        "lead",                # 发送者
        teammate,              # 接收者
        "Please shut down gracefully.",  # 消息内容
        "shutdown_request",    # 消息类型
        {"request_id": req_id} # 附加信息:请求ID
    )
    
    # 返回请求发送成功的提示信息
    return f"Shutdown request {req_id} sent (status: pending)"
  1. 队友收到请求后, 用 approve/reject 响应。
# 处理【关机响应】工具调用
# 当子智能体收到关机请求后,通过该工具回复同意/拒绝
if tool_name == "shutdown_response":
    # 从工具参数中获取:关机请求ID、是否同意关闭
    req_id = args["request_id"]
    approve = args["approve"]
    
    # 更新全局请求状态:同意 → approved,拒绝 → rejected
    shutdown_requests[req_id]["status"] = "approved" if approve else "rejected"
    
    # 通过消息总线,把响应结果发送给领导者(lead)
    # sender:发送响应的队友名称
    # 接收者:lead(管理者)
    # 消息内容:拒绝/同意的原因
    # 消息类型:shutdown_response
    # 附加信息:携带请求ID + 同意状态
    BUS.send(
        sender,                # 发送者:当前队友
        "lead",                # 接收者:主智能体/管理者
        args.get("reason", ""),# 响应原因(可选)
        "shutdown_response",   # 消息类型:关机响应
        {
            "request_id": req_id,
            "approve": approve
        }
    )
  1. 计划审批遵循完全相同的模式。队友提交计划 (生成 request_id), 领导审查 (引用同一个 request_id)。
# 全局字典:存储所有【计划审核请求】,key=请求ID,value=请求详情
plan_requests = {}

# 处理计划审核结果的函数(由领导者/管理员调用)
# request_id: 待审核的计划请求ID
# approve: 是否通过审核(布尔值)
# feedback: 审核反馈/意见(可选)
def handle_plan_review(request_id, approve, feedback=""):
    # 1. 根据请求ID,从字典中取出对应的计划请求
    req = plan_requests[request_id]
    
    # 2. 更新请求状态:通过/拒绝
    req["status"] = "approved" if approve else "rejected"
    
    # 3. 通过消息总线,把审核结果发送给【提交计划的成员】
    BUS.send(
        "lead",                # 发送者:领导者/管理员
        req["from"],           # 接收者:提交该计划的成员
        feedback,              # 发送审核反馈内容
        "plan_approval_response",  # 消息类型:计划审核响应
        {
            "request_id": request_id,  # 携带请求ID,用于关联
            "approve": approve         # 携带审核结果(通过/拒绝)
        }
    )

一个 FSM, 两种用途。同样的 pending -> approved | rejected 状态机可以套用到任何请求-响应协议上。

s11: Autonomous Agents (自治智能体)

“队友自己看看板, 有活就认领” – 不需要领导逐个分配, 自组织。

自治 – 模型自己找活干, 无需指派。

Teammate lifecycle with idle cycle:

+-------+
| spawn |
+---+---+
    |
    v
+-------+   tool_use     +-------+
| WORK  | <------------- |  LLM  |
+---+---+                +-------+
    |
    | stop_reason != tool_use (or idle tool called)
    v
+--------+
|  IDLE  |  poll every 5s for up to 60s
+---+----+
    |
    +---> check inbox --> message? ----------> WORK
    |
    +---> scan .tasks/ --> unclaimed? -------> claim -> WORK
    |
    +---> 60s timeout ----------------------> SHUTDOWN

Identity re-injection after compression:
  if len(messages) <= 3:
    messages.insert(0, identity_block)

原理

  • 队友循环分两个阶段: WORK 和 IDLE。LLM 停止调用工具 (或调用了 idle) 时, 进入 IDLE。
  • 空闲阶段循环轮询收件箱和任务看板。
  • 任务看板扫描: 找 pending 状态、无 owner、未被阻塞的任务。
  • 身份重注入: 上下文过短 (说明发生了压缩) 时, 在开头插入身份块。
  1. 队友循环分两个阶段: WORK 和 IDLE。LLM 停止调用工具 (或调用了 idle) 时, 进入 IDLE。
# 【私有方法】智能体/团队成员的完整生命周期循环
# 包含:工作阶段 → 空闲等待 → 恢复/关闭 全流程
# name: 成员名称   role: 成员角色   prompt: 初始工作指令
def _loop(self, name, role, prompt):
    # 无限循环:维持成员持续运行
    while True:
        # ======================
        # 【一、工作阶段】执行任务
        # ======================
        # 初始化对话上下文,使用初始指令
        messages = [{"role": "user", "content": prompt}]
        
        # 最多执行50轮工具调用/任务处理(安全限制)
        for _ in range(50):
            # 调用AI模型生成响应
            response = client.messages.create(...)
            
            # 如果模型停止原因不是工具调用 → 任务完成,退出工作循环
            if response.stop_reason != "tool_use":
                break
            
            # (省略)执行具体工具逻辑
            
            # 如果收到空闲请求 → 主动中断工作
            if idle_requested:
                break

        # ======================
        # 【二、空闲阶段】等待指令
        # ======================
        # 将成员状态设置为:空闲(idle)
        self._set_status(name, "idle")
        
        # 进入空闲轮询:等待恢复指令或关机指令
        # 返回 True=继续工作  False=关闭程序
        resume = self._idle_poll(name, messages)
        
        # 判断是否需要关闭成员
        if not resume:
            # 设置状态为:已关闭(shutdown)
            self._set_status(name, "shutdown")
            # 退出循环,终止线程
            return
        
        # 收到恢复指令 → 重新设置状态为:工作中(working)
        self._set_status(name, "working")
  1. 空闲阶段循环轮询收件箱和任务看板。
# 【私有方法】智能体空闲轮询:等待消息、自动领任务、超时判断
# 智能体在空闲时循环检查是否需要恢复工作
# name: 当前智能体名称 | messages: 对话上下文
def _idle_poll(self, name, messages):
    # 计算轮询次数:总超时时间 ÷ 轮询间隔
    # 例:60秒超时,每5秒查一次 → 共轮询12次
    for _ in range(IDLE_TIMEOUT // POLL_INTERVAL):
        
        # 每次轮询先等待指定间隔时间(不占用CPU)
        time.sleep(POLL_INTERVAL)
        
        # 1. 检查收件箱:是否收到新消息/指令
        inbox = BUS.read_inbox(name)
        if inbox:
            # 有新消息 → 加入上下文,返回True(恢复工作)
            messages.append({"role": "user",
                "content": f"<inbox>{inbox}</inbox>"})
            return True
        
        # 2. 检查是否有无人认领的任务
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            # 自动认领第一个任务
            claim_task(unclaimed[0]["id"], name)
            # 把认领结果加入上下文
            messages.append({"role": "user",
                "content": f"<auto-claimed>Task #{unclaimed[0]['id']}: "
                           f"{unclaimed[0]['subject']}</auto-claimed>"})
            return True

    # 3. 轮询全部结束仍无任务/消息 → 超时,返回False(准备关机)
    return False
  1. 任务看板扫描: 找 pending 状态、无 owner、未被阻塞的任务。
# 扫描所有【无人认领、可执行】的任务
# 筛选条件:待处理 + 无负责人 + 无依赖阻塞
def scan_unclaimed_tasks() -> list:
    # 存储符合条件的可认领任务
    unclaimed = []
    
    # 遍历任务目录下所有 task_*.json 文件,并按文件名排序
    for f in sorted(TASKS_DIR.glob("task_*.json")):
        # 读取并解析任务文件内容
        task = json.loads(f.read_text())
        
        # 筛选条件:
        # 1. 任务状态 = pending(待处理)
        # 2. 无负责人(owner 为空)
        # 3. 无阻塞依赖(blockedBy 为空)
        if (task.get("status") == "pending"
                and not task.get("owner")
                and not task.get("blockedBy")):
            # 符合条件 → 加入可认领列表
            unclaimed.append(task)
    
    # 返回所有可自动认领的任务
    return unclaimed
  1. 身份重注入: 上下文过短 (说明发生了压缩) 时, 在开头插入身份块。
# 如果当前对话消息过短(≤3条),为了保证智能体身份认知完整
# 主动在消息列表最前面插入【身份设定】,防止AI忘记自己是谁
if len(messages) <= 3:
    # 插入到第0位:给AI的身份与工作指令
    messages.insert(0, {
        "role": "user",
        "content": f"<identity>You are '{name}', role: {role}, "
                   f"team: {team_name}. Continue your work.</identity>"
    })
    # 插入到第1位:AI确认身份的回复,保持上下文连贯
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing."
    })

s12: Worktree + Task Isolation (Worktree 任务隔离)

“各干各的目录, 互不干扰” – 任务管目标, worktree 管目录, 按 ID 绑定。

目录隔离 – 永不碰撞的并行执行通道。

Control plane (.tasks/)             Execution plane (.worktrees/)
+------------------+                +------------------------+
| task_1.json      |                | auth-refactor/         |
|   status: in_progress  <------>   branch: wt/auth-refactor
|   worktree: "auth-refactor"   |   task_id: 1             |
+------------------+                +------------------------+
| task_2.json      |                | ui-login/              |
|   status: pending    <------>     branch: wt/ui-login
|   worktree: "ui-login"       |   task_id: 2             |
+------------------+                +------------------------+
                                    |
                          index.json (worktree registry)
                          events.jsonl (lifecycle log)

State machines:
  Task:     pending -> in_progress -> completed
  Worktree: absent  -> active      -> removed | kept

原理

  • 创建任务。 先把目标持久化。
  • 创建 worktree 并绑定任务。 传入 task_id 自动将任务推进到 in_progress
  • 在 worktree 中执行命令。 cwd 指向隔离目录。
  • 收尾。 两种选择:
  • 事件流。 每个生命周期步骤写入 .worktrees/events.jsonl:
  1. 创建任务。 先把目标持久化。
TASKS.create("Implement auth refactor")
# -> .tasks/task_1.json  status=pending  worktree=""
  1. 创建 worktree 并绑定任务。 传入 task_id 自动将任务推进到 in_progress
WORKTREES.create("auth-refactor", task_id=1)
# -> git worktree add -b wt/auth-refactor .worktrees/auth-refactor HEAD
# -> index.json gets new entry, task_1.json gets worktree="auth-refactor"

绑定同时写入两侧状态:

def bind_worktree(self, task_id, worktree):
    task = self._load(task_id)
    task["worktree"] = worktree
    if task["status"] == "pending":
        task["status"] = "in_progress"
    self._save(task)
  1. 在 worktree 中执行命令。 cwd 指向隔离目录。
subprocess.run(command, shell=True, cwd=worktree_path,
               capture_output=True, text=True, timeout=300)
  1. 收尾。 两种选择:
    • worktree_keep(name) – 保留目录供后续使用。
    • worktree_remove(name, complete_task=True) – 删除目录, 完成绑定任务, 发出事件。一个调用搞定拆除 + 完成。
def remove(self, name, force=False, complete_task=False):
    self._run_git(["worktree", "remove", wt["path"]])
    if complete_task and wt.get("task_id") is not None:
        self.tasks.update(wt["task_id"], status="completed")
        self.tasks.unbind_worktree(wt["task_id"])
        self.events.emit("task.completed", ...)
  1. 事件流。 每个生命周期步骤写入 .worktrees/events.jsonl:
{
  "event": "worktree.remove.after",
  "task": {"id": 1, "status": "completed"},
  "worktree": {"name": "auth-refactor", "status": "removed"},
  "ts": 1730000000
}

事件类型: worktree.create.before/after/failed, worktree.remove.before/after/failed, worktree.keep, task.completed

崩溃后从 .tasks/ + .worktrees/index.json 重建现场。会话记忆是易失的; 磁盘状态是持久的。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐