系列「企业级 AI Agent 实现拆解」第四篇。上一篇讲了 ReAct 循环的主体实现,这篇看中断恢复这条路径怎么走通。


为什么需要 HITL

ReAct 循环跑起来之后,一个问题立刻浮现:AI 自主决定调什么工具,人怎么管得住?

我们的场景里有一批高危工具——执行 SQL、删文件、发外部 HTTP 请求。这些工具不能让 Agent 自己决定要不要用,必须有人在中间确认。另外还有合规场景:某些关键字触发、超预算、Agent 主动请求人介入……

HITL(Human-in-the-Loop)本质上是在 ReAct 循环里打一个暂停点:循环跑到需要人工确认的地方,挂起,等人做决定,然后从原位继续。"等人"这段时间可以是秒级,也可能是小时级,进程在这期间随时可以重启,状态不能丢。


中断的四种类型

代码里用 InterruptKind 区分中断来源:

type InterruptKind string

const (
    InterruptPreToolUse      InterruptKind = "pre_tool_use"      // 工具调用前审批
    InterruptPolicyViolation InterruptKind = "policy_violation"  // 命中合规规则
    InterruptBudgetExceeded  InterruptKind = "budget_exceeded"   // 超预算需确认
    InterruptManual          InterruptKind = "manual"            // Agent 主动请求
)

最常见的是 pre_tool_use:PreToolUse hook 检测到高危操作,让循环暂停等审批。其余三种同理,区别在于谁触发、触发条件是什么。


整条路径,从触发到恢复

第一步:Eino 图级中断——“在工具节点前装一个停止按钮”

上一篇讲了 Handle() 把 ReAct 循环交给 Eino 的 react.NewAgent() 跑。Eino 内部把循环实现成一个有向图:LLM 推理是一个节点,工具执行是另一个节点,数据在节点之间流动。

Eino 提供了一个能力:在图的某个节点前面插一个"停止按钮"。代码长这样:

// agent_builder.go — 构建 Eino 图时
if cfg.HITLEnabled {
    compose.WithInterruptBeforeNodes([]string{"tools"})
}

意思是:当图的执行流即将进入 tools 节点(准备执行工具调用)时,自动暂停。不需要自己写 if 判断,不需要在业务代码里插中断逻辑——框架在图编译阶段就把这个停止点焊进去了。

打个比方:想象一条工厂流水线,LLM 推理是工位 A,工具执行是工位 B。WithInterruptBeforeNodes 就是在工位 B 前面装一个红灯——传送带到这里自动停,等人按绿灯才继续。

应用层怎么知道发生了中断?

StreamTurn 调用 runnable.Stream(),正常情况返回一个 Token 流;如果图被中断了,会返回一个特殊的错误。应用层用 ExtractInterruptInfo 从这个错误里提取中断信号:

// factory.go
sr, err := runnable.Stream(ctx, inputMsgs, opts...)
if err != nil {
    if info, ok := compose.ExtractInterruptInfo(err); ok {
        return nil, &port.AgentInterruptInfo{BeforeNodes: info.BeforeNodes}, nil
    }
    return nil, nil, err  // 真正的错误
}

应用层看到 interrupt != nil,就知道图被中断了,然后执行中断处理:

// run_turn.go — handleEinoInterrupt
func (h *RunTurnHandler) handleEinoInterrupt(ctx context.Context, sess *model.Session, info *port.AgentInterruptInfo) error {
    it := model.NewPreToolInterrupt("hitl: tool call requires approval", model.ToolCall{}, h.interruptTTL)
    if err := sess.Pause(it); err != nil {
        return h.failSession(ctx, sess, err)
    }
    if h.stream != nil {
        h.stream.Emit(ctx, sess.ID(), port.StreamEvent{
            Type: "interrupt", Payload: map[string]any{"before_nodes": info.BeforeNodes},
        })
    }
    return h.persist(ctx, sess)
}

四步:

  1. 创建 Interrupt 值对象,带 ExpiresAt(当前时间 + TTL,默认 30 分钟)
  2. sess.Pause(it) — 状态从 running 迁移到 waiting,把 Interrupt 挂到 session 上
  3. stream.Emit — 推一帧 SSE interrupt 事件给浏览器,前端弹出审批弹窗
  4. persist — 先写数据库再发事件,状态安全落地

循环在 return h.persist(ctx, sess) 这里退出,Goroutine 结束。Session 的状态完整保留在数据库里,进程可以安全重启。

你可能注意到了:NewPreToolInterrupt 传了一个空的 model.ToolCall{}。为什么?因为中断发生在工具执行之前——LLM 决定要调什么工具,但工具还没执行。具体的工具调用信息由 Eino 的 checkpoint 保存在图状态里,恢复时自动还原,不需要在 Interrupt 实体里重复存一份。

中断时,Eino 内部发生了什么

上面说的是"应用层怎么处理中断"。但中断信号本身是 Eino 框架产生的——框架内部做了三件事,才是中断能工作的真正原因。

第一件:图的执行流到了 tools 节点前,框架检测到命中中断列表。

Eino 把 ReAct 循环实现成一个有向图(Directed Graph)。每个节点是一个执行步骤——LLM 调用是一个节点,工具执行是另一个节点,节点之间有边连接,数据沿边流动。图跑起来之后,框架内部有一个主循环:取下一个待执行节点 → 执行 → 把输出传给下游节点 → 再取下一个。

WithInterruptBeforeNodes(["tools"]) 的效果是:在主循环里,每次准备执行下一个节点之前,先检查这个节点是不是 "tools"。如果是,不执行,而是走中断分支。

第二件:框架把图的完整状态保存成 checkpoint(检查点)。

这一步类似游戏里的"存档"。中断发生时,图可能已经跑了好几轮 LLM 推理,积累了对话历史、工具调用决策、中间变量。checkpoint 把这些全部序列化成一个二进制 blob,通过 CheckPointStore 接口写入数据库。

我们的 CheckPointStore 实现是这样的:

// checkpoint.go
type SessionCheckpointStore struct {
    repo domain.SessionRepo  // 复用已有的 SessionRepo
}

func (s *SessionCheckpointStore) Set(ctx context.Context, threadID string, cp *checkpoint.Checkpoint) error {
    blob, _ := json.Marshal(cp)
    return s.repo.SaveCheckpoint(ctx, model.SessionID(threadID), blob)
}

func (s *SessionCheckpointStore) Get(ctx context.Context, threadID string) (*checkpoint.Checkpoint, error) {
    blob, found, err := s.repo.LoadCheckpoint(ctx, model.SessionID(threadID))
    if !found { return nil, nil }
    var cp checkpoint.Checkpoint
    json.Unmarshal(blob, &cp)
    return &cp, nil
}

threadID 就是 sessionID。checkpoint blob 存在 sessions 表的 checkpoint_blob 列里——和 session 状态存在同一张表,不需要额外的存储。

第三件:框架返回一个特殊的错误信号。

checkpoint 存好之后,框架不会继续执行 tools 节点,而是返回一个 interruptError。这个错误携带了 InterruptInfo{BeforeNodes: ["tools"]},告诉调用方"我在哪个节点前面停了"。

这就是应用层看到的那个信号——ExtractInterruptInfo(err) 从错误里提取出来的就是这个 InterruptInfo

Eino 图主循环(简化)
    │
    ├── 取下一个节点 → "tools"
    ├── 检查 interruptBeforeNodes → 命中!
    │
    ├── 保存 checkpoint → json.Marshal(图状态) → 写入 sessions.checkpoint_blob
    ├── 返回 interruptError{BeforeNodes: ["tools"]}
    │
    └── [图执行结束,控制权回到应用层]

恢复时,Eino 怎么从存档点继续

上一篇讲了 ResumeInterruptHandler 把 approve/modify 决策传回 RunTurnHandler.HandleHandle 发现 UserText == "",走 resume 路径:先 sess.Resume(decision) 落库,再调 StreamTurn(IsResume=true)

关键区别在 StreamTurn 内部——新 turn 和 resume 传了不同的选项给 Eino:

// factory.go — StreamTurn 内部
if !in.IsResume {
    opts = append(opts, compose.WithForceNewRun())  // 新 turn:跳过 checkpoint,从头开始
}
// resume 时不加 WithForceNewRun,Eino 会自动加载 checkpoint

Eino 的行为完全不同:

新 turn resume
WithForceNewRun 有(true 无(false
Eino 加载 checkpoint? 不加载,从 START 节点重新开始 从数据库加载 checkpoint
输入消息 完整的 system prompt + history nil(不需要,checkpoint 里已经有了)
从哪里开始执行 LLM 推理节点(第一步) 上次中断的 tools 节点

Resume 的执行流:

StreamTurn(IsResume=true)
    │
    ├── 不加 WithForceNewRun
    ├── inputMsgs = nil
    │
    └── runnable.Stream(ctx, nil, opts...)
            │
            ├── Eino 检测到 checkPointID != nil && !forceNewRun
            ├── 从数据库加载 checkpoint_blob
            ├── json.Unmarshal → 还原图的完整状态
            │   包括:之前 LLM 的推理结果、要调的工具名和参数、中间变量
            ├── 从 checkpoint 中取出待执行的 tasks
            │   就是上次被中断的 "tools" 节点
            │
            └── 继续主循环 → 执行工具调用 → 拿到结果 → 继续 LLM 推理 → ...

用一个不太严谨但好理解的比喻:新 turn 像开新游戏,resume 像读档。 存档点精确到"LLM 刚决定调某个工具,但还没调"——恢复后直接从这个工具调用开始执行,不重新问 LLM,也不重走之前的推理步骤。

还有一个细节:Runnable 是按 AgentConfig 缓存的,缓存的 TTL 是 35 分钟,而中断的 TTL 是 30 分钟。35 > 30 是故意的——保证 resume 时 Runnable 还在缓存里,不需要重新构建图、重新拉工具 schema,减少恢复路径的故障面。

第二步:用户在 UI 做决定

浏览器收到 interrupt SSE 帧,弹出审批窗口,显示:

  • 工具名和参数(来自 Interrupt.ToolCall
  • 原因说明(来自 Interrupt.Reason
  • 三个操作:允许 / 拒绝 / 修改参数后允许

用户确认后,前端调:

POST /v1/sessions/{id}/interrupts/{intr_id}/resume
{
  "action": "approve" | "deny" | "modify",
  "modified_args": "...",   // action=modify 时
  "comment": "..."
}

第三步:ResumeInterruptHandler 路由

func (h *ResumeInterruptHandler) Handle(ctx context.Context, in ResumeInput) error {
    in.Decision.DecidedAt = time.Now()

    sess, err := h.repo.Load(ctx, in.SessionID)
    if err != nil {
        return err
    }
    if sess.State() != model.StateWaiting {
        return fmt.Errorf("resume interrupt: %w (state=%s)", ErrSessionNotWaiting, sess.State())
    }

    if in.Decision.Action == model.DecisionDeny {
        return h.runner.rejectInterrupt(ctx, sess, in.Decision)
    }

    // approve / modify
    return h.runner.Handle(ctx, RunTurnInput{
        SessionID: in.SessionID,
        Decision:  &in.Decision,
    })
}

先校验状态,再分叉:

  • denyrejectInterrupt:终止 session,同时记录完整的 HITL 决策事件
  • approve / modify → 重新进 RunTurnHandler.Handle,从中断的工具调用继续

approve 路径重新进循环时,RunTurnHandler 会拿到 Decision,把之前等待的工具调用继续执行下去,不重新问 LLM,也不重新拉工具 schema。


为什么用 RejectInterrupt 而不是 Cancel

拒绝时,有两条路径可以走:

sess.Cancel("hitl:deny")     // ← 只 emit EventSessionCancelled
sess.RejectInterrupt(decision) // ← emit EventInterruptResolved + EventSessionCancelled

我们用的是 RejectInterrupt。区别在于事件完整性:

Cancel 只告诉下游"session 结束了",审计日志里看不到 HITL 这条决策链——是谁拒绝的、什么时候、拒绝的是哪个工具调用、有没有备注。RejectInterrupt 会先 emit EventInterruptResolved(带完整的 InterruptDecision),再 emit EventSessionCancelled。审计服务能拿到这两个事件,还原出完整的决策链。

这在合规场景里很重要:操作日志里必须能查到"谁、在什么时候、为什么拒绝了这个工具调用"。


超时处理:K8s CronJob 兜底

审批有 TTL,超时未处理的 session 需要自动关掉。这是一个周期任务:

// cmd/cron/hitl-timeout · K8s CronJob */5 * * * *
func (h *ExpireWaitingInterruptsHandler) cancelOne(
    ctx context.Context, sid model.SessionID, now time.Time,
) error {
    sess, _ := h.repo.Load(ctx, sid)
    if sess.State() != model.StateWaiting {
        return nil  // 已被用户审批,跳过
    }
    decision := model.InterruptDecision{
        Action:    model.DecisionDeny,
        Comment:   fmt.Sprintf("hitl:timeout:expired_at=%s", now.UTC().Format(time.RFC3339)),
        DecidedBy: "system:hitl-timeout",
        DecidedAt: now,
    }
    _ = sess.RejectInterrupt(decision)  // 同样走 RejectInterrupt,保留事件链
    _ = h.repo.Save(ctx, sess)
    // ...publish events
}

同样走 RejectInterrupt 而不是 Cancel,原因一样:审计需要知道这次 session 结束是因为 HITL 超时,而不是用户主动取消或者 Agent 出错。decided_by=system:hitl-timeout 让报表系统能直接过滤统计。

每个 session 失败不阻断后续:cancelOne 出错只记 log、继续下一个。这是周期任务的标准做法,一个坏掉的 session 不能影响其他的。


流程示意

用户发消息
    │
    ▼
ReAct 循环(Eino 图内部)... LLM → 决定调高危工具
                                      │
                            ┌── tools 节点 ──┐
                            │  红灯亮了!     │  ← WithInterruptBeforeNodes
                            │  图执行暂停    │
                            └───────┬────────┘
                                    │
                      ExtractInterruptInfo → interrupt != nil
                                    │
                      handleEinoInterrupt:
                      sess.Pause(interrupt)    state: running → waiting
                                    │
                      stream.Emit(interrupt)   ← SSE 帧推给浏览器
                                    │
                      persist(sess)            ← 状态落库
                      [循环 Goroutine 退出]
                                    │
                                    │    (用户在 UI 看到审批弹窗)
                                    │
                          ┌─────────┴──────────┐
                        deny                approve/modify
                          │                     │
                   RejectInterrupt          重进 RunTurnHandler
                   emit × 2 events          从 Eino checkpoint 恢复
                   state → cancelled        继续执行工具调用
                                             state: waiting → running

跟 Eino ADK 的关系

上一篇已经提过,ReAct 循环的编排我们直接用了 Einoreact.NewAgent()。HITL 中断也是——WithInterruptBeforeNodesExtractInterruptInfo 都是 Eino 框架提供的 API。

但 Eino 只管"在哪里停"和"怎么恢复图的执行",不管停了之后业务上要干什么。停了之后的事是我们的领域层在处理:

  • sess.Pause() 把状态迁移到 waiting
  • InterruptKind 枚举标记中断的业务含义(工具审批 / 合规 / 预算)
  • RejectInterrupt 保留完整的审计事件链
  • decided_by 字段记录谁做的决定
  • 超时兜底走同一个 handler,decided_by=system:hitl-timeout

简单说:Eino 负责在哪里停,我们负责停了之后怎么记、怎么审、怎么超时兜底。 两者各管各的,中间只通过 AgentInterruptInfo 这个薄薄的数据结构通信。


小结

HITL 这条路径的核心是三个设计决定:

  1. 状态持久化先于一切sess.Pause + persist 之后循环才退出,重启不丢状态
  2. RejectInterrupt 而不是 Cancel:两个事件比一个事件多的信息,合规审计用得上
  3. CronJob 走同一路径:超时按 deny 处理,decided_by 标记来源,审计可区分

下一篇看 SSE 推流——token 是怎么一个个从 LLM 流到浏览器的。


下一篇:SSE 推流 —— Token 怎么一个个蹦出来

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐