HITL:让人类随时叫停 AI,并且能优雅地继续
系列「企业级 AI Agent 实现拆解」第四篇。上一篇讲了 ReAct 循环的主体实现,这篇看中断恢复这条路径怎么走通。
为什么需要 HITL
ReAct 循环跑起来之后,一个问题立刻浮现:AI 自主决定调什么工具,人怎么管得住?
我们的场景里有一批高危工具——执行 SQL、删文件、发外部 HTTP 请求。这些工具不能让 Agent 自己决定要不要用,必须有人在中间确认。另外还有合规场景:某些关键字触发、超预算、Agent 主动请求人介入……
HITL(Human-in-the-Loop)本质上是在 ReAct 循环里打一个暂停点:循环跑到需要人工确认的地方,挂起,等人做决定,然后从原位继续。"等人"这段时间可以是秒级,也可能是小时级,进程在这期间随时可以重启,状态不能丢。
中断的四种类型
代码里用 InterruptKind 区分中断来源:
type InterruptKind string
const (
InterruptPreToolUse InterruptKind = "pre_tool_use" // 工具调用前审批
InterruptPolicyViolation InterruptKind = "policy_violation" // 命中合规规则
InterruptBudgetExceeded InterruptKind = "budget_exceeded" // 超预算需确认
InterruptManual InterruptKind = "manual" // Agent 主动请求
)
最常见的是 pre_tool_use:PreToolUse hook 检测到高危操作,让循环暂停等审批。其余三种同理,区别在于谁触发、触发条件是什么。
整条路径,从触发到恢复
第一步:Eino 图级中断——“在工具节点前装一个停止按钮”
上一篇讲了 Handle() 把 ReAct 循环交给 Eino 的 react.NewAgent() 跑。Eino 内部把循环实现成一个有向图:LLM 推理是一个节点,工具执行是另一个节点,数据在节点之间流动。
Eino 提供了一个能力:在图的某个节点前面插一个"停止按钮"。代码长这样:
// agent_builder.go — 构建 Eino 图时
if cfg.HITLEnabled {
compose.WithInterruptBeforeNodes([]string{"tools"})
}
意思是:当图的执行流即将进入 tools 节点(准备执行工具调用)时,自动暂停。不需要自己写 if 判断,不需要在业务代码里插中断逻辑——框架在图编译阶段就把这个停止点焊进去了。
打个比方:想象一条工厂流水线,LLM 推理是工位 A,工具执行是工位 B。WithInterruptBeforeNodes 就是在工位 B 前面装一个红灯——传送带到这里自动停,等人按绿灯才继续。
应用层怎么知道发生了中断?
StreamTurn 调用 runnable.Stream(),正常情况返回一个 Token 流;如果图被中断了,会返回一个特殊的错误。应用层用 ExtractInterruptInfo 从这个错误里提取中断信号:
// factory.go
sr, err := runnable.Stream(ctx, inputMsgs, opts...)
if err != nil {
if info, ok := compose.ExtractInterruptInfo(err); ok {
return nil, &port.AgentInterruptInfo{BeforeNodes: info.BeforeNodes}, nil
}
return nil, nil, err // 真正的错误
}
应用层看到 interrupt != nil,就知道图被中断了,然后执行中断处理:
// run_turn.go — handleEinoInterrupt
func (h *RunTurnHandler) handleEinoInterrupt(ctx context.Context, sess *model.Session, info *port.AgentInterruptInfo) error {
it := model.NewPreToolInterrupt("hitl: tool call requires approval", model.ToolCall{}, h.interruptTTL)
if err := sess.Pause(it); err != nil {
return h.failSession(ctx, sess, err)
}
if h.stream != nil {
h.stream.Emit(ctx, sess.ID(), port.StreamEvent{
Type: "interrupt", Payload: map[string]any{"before_nodes": info.BeforeNodes},
})
}
return h.persist(ctx, sess)
}
四步:
- 创建
Interrupt值对象,带ExpiresAt(当前时间 + TTL,默认 30 分钟) sess.Pause(it)— 状态从running迁移到waiting,把 Interrupt 挂到 session 上stream.Emit— 推一帧 SSEinterrupt事件给浏览器,前端弹出审批弹窗persist— 先写数据库再发事件,状态安全落地
循环在 return h.persist(ctx, sess) 这里退出,Goroutine 结束。Session 的状态完整保留在数据库里,进程可以安全重启。
你可能注意到了:NewPreToolInterrupt 传了一个空的 model.ToolCall{}。为什么?因为中断发生在工具执行之前——LLM 决定要调什么工具,但工具还没执行。具体的工具调用信息由 Eino 的 checkpoint 保存在图状态里,恢复时自动还原,不需要在 Interrupt 实体里重复存一份。
中断时,Eino 内部发生了什么
上面说的是"应用层怎么处理中断"。但中断信号本身是 Eino 框架产生的——框架内部做了三件事,才是中断能工作的真正原因。
第一件:图的执行流到了 tools 节点前,框架检测到命中中断列表。
Eino 把 ReAct 循环实现成一个有向图(Directed Graph)。每个节点是一个执行步骤——LLM 调用是一个节点,工具执行是另一个节点,节点之间有边连接,数据沿边流动。图跑起来之后,框架内部有一个主循环:取下一个待执行节点 → 执行 → 把输出传给下游节点 → 再取下一个。
WithInterruptBeforeNodes(["tools"]) 的效果是:在主循环里,每次准备执行下一个节点之前,先检查这个节点是不是 "tools"。如果是,不执行,而是走中断分支。
第二件:框架把图的完整状态保存成 checkpoint(检查点)。
这一步类似游戏里的"存档"。中断发生时,图可能已经跑了好几轮 LLM 推理,积累了对话历史、工具调用决策、中间变量。checkpoint 把这些全部序列化成一个二进制 blob,通过 CheckPointStore 接口写入数据库。
我们的 CheckPointStore 实现是这样的:
// checkpoint.go
type SessionCheckpointStore struct {
repo domain.SessionRepo // 复用已有的 SessionRepo
}
func (s *SessionCheckpointStore) Set(ctx context.Context, threadID string, cp *checkpoint.Checkpoint) error {
blob, _ := json.Marshal(cp)
return s.repo.SaveCheckpoint(ctx, model.SessionID(threadID), blob)
}
func (s *SessionCheckpointStore) Get(ctx context.Context, threadID string) (*checkpoint.Checkpoint, error) {
blob, found, err := s.repo.LoadCheckpoint(ctx, model.SessionID(threadID))
if !found { return nil, nil }
var cp checkpoint.Checkpoint
json.Unmarshal(blob, &cp)
return &cp, nil
}
threadID 就是 sessionID。checkpoint blob 存在 sessions 表的 checkpoint_blob 列里——和 session 状态存在同一张表,不需要额外的存储。
第三件:框架返回一个特殊的错误信号。
checkpoint 存好之后,框架不会继续执行 tools 节点,而是返回一个 interruptError。这个错误携带了 InterruptInfo{BeforeNodes: ["tools"]},告诉调用方"我在哪个节点前面停了"。
这就是应用层看到的那个信号——ExtractInterruptInfo(err) 从错误里提取出来的就是这个 InterruptInfo。
Eino 图主循环(简化)
│
├── 取下一个节点 → "tools"
├── 检查 interruptBeforeNodes → 命中!
│
├── 保存 checkpoint → json.Marshal(图状态) → 写入 sessions.checkpoint_blob
├── 返回 interruptError{BeforeNodes: ["tools"]}
│
└── [图执行结束,控制权回到应用层]
恢复时,Eino 怎么从存档点继续
上一篇讲了 ResumeInterruptHandler 把 approve/modify 决策传回 RunTurnHandler.Handle。Handle 发现 UserText == "",走 resume 路径:先 sess.Resume(decision) 落库,再调 StreamTurn(IsResume=true)。
关键区别在 StreamTurn 内部——新 turn 和 resume 传了不同的选项给 Eino:
// factory.go — StreamTurn 内部
if !in.IsResume {
opts = append(opts, compose.WithForceNewRun()) // 新 turn:跳过 checkpoint,从头开始
}
// resume 时不加 WithForceNewRun,Eino 会自动加载 checkpoint
Eino 的行为完全不同:
| 新 turn | resume | |
|---|---|---|
WithForceNewRun |
有(true) |
无(false) |
| Eino 加载 checkpoint? | 不加载,从 START 节点重新开始 | 从数据库加载 checkpoint |
| 输入消息 | 完整的 system prompt + history | nil(不需要,checkpoint 里已经有了) |
| 从哪里开始执行 | LLM 推理节点(第一步) | 上次中断的 tools 节点 |
Resume 的执行流:
StreamTurn(IsResume=true)
│
├── 不加 WithForceNewRun
├── inputMsgs = nil
│
└── runnable.Stream(ctx, nil, opts...)
│
├── Eino 检测到 checkPointID != nil && !forceNewRun
├── 从数据库加载 checkpoint_blob
├── json.Unmarshal → 还原图的完整状态
│ 包括:之前 LLM 的推理结果、要调的工具名和参数、中间变量
├── 从 checkpoint 中取出待执行的 tasks
│ 就是上次被中断的 "tools" 节点
│
└── 继续主循环 → 执行工具调用 → 拿到结果 → 继续 LLM 推理 → ...
用一个不太严谨但好理解的比喻:新 turn 像开新游戏,resume 像读档。 存档点精确到"LLM 刚决定调某个工具,但还没调"——恢复后直接从这个工具调用开始执行,不重新问 LLM,也不重走之前的推理步骤。
还有一个细节:Runnable 是按 AgentConfig 缓存的,缓存的 TTL 是 35 分钟,而中断的 TTL 是 30 分钟。35 > 30 是故意的——保证 resume 时 Runnable 还在缓存里,不需要重新构建图、重新拉工具 schema,减少恢复路径的故障面。
第二步:用户在 UI 做决定
浏览器收到 interrupt SSE 帧,弹出审批窗口,显示:
- 工具名和参数(来自
Interrupt.ToolCall) - 原因说明(来自
Interrupt.Reason) - 三个操作:允许 / 拒绝 / 修改参数后允许
用户确认后,前端调:
POST /v1/sessions/{id}/interrupts/{intr_id}/resume
{
"action": "approve" | "deny" | "modify",
"modified_args": "...", // action=modify 时
"comment": "..."
}
第三步:ResumeInterruptHandler 路由
func (h *ResumeInterruptHandler) Handle(ctx context.Context, in ResumeInput) error {
in.Decision.DecidedAt = time.Now()
sess, err := h.repo.Load(ctx, in.SessionID)
if err != nil {
return err
}
if sess.State() != model.StateWaiting {
return fmt.Errorf("resume interrupt: %w (state=%s)", ErrSessionNotWaiting, sess.State())
}
if in.Decision.Action == model.DecisionDeny {
return h.runner.rejectInterrupt(ctx, sess, in.Decision)
}
// approve / modify
return h.runner.Handle(ctx, RunTurnInput{
SessionID: in.SessionID,
Decision: &in.Decision,
})
}
先校验状态,再分叉:
deny→rejectInterrupt:终止 session,同时记录完整的 HITL 决策事件approve/modify→ 重新进RunTurnHandler.Handle,从中断的工具调用继续
approve 路径重新进循环时,RunTurnHandler 会拿到 Decision,把之前等待的工具调用继续执行下去,不重新问 LLM,也不重新拉工具 schema。
为什么用 RejectInterrupt 而不是 Cancel
拒绝时,有两条路径可以走:
sess.Cancel("hitl:deny") // ← 只 emit EventSessionCancelled
sess.RejectInterrupt(decision) // ← emit EventInterruptResolved + EventSessionCancelled
我们用的是 RejectInterrupt。区别在于事件完整性:
Cancel 只告诉下游"session 结束了",审计日志里看不到 HITL 这条决策链——是谁拒绝的、什么时候、拒绝的是哪个工具调用、有没有备注。RejectInterrupt 会先 emit EventInterruptResolved(带完整的 InterruptDecision),再 emit EventSessionCancelled。审计服务能拿到这两个事件,还原出完整的决策链。
这在合规场景里很重要:操作日志里必须能查到"谁、在什么时候、为什么拒绝了这个工具调用"。
超时处理:K8s CronJob 兜底
审批有 TTL,超时未处理的 session 需要自动关掉。这是一个周期任务:
// cmd/cron/hitl-timeout · K8s CronJob */5 * * * *
func (h *ExpireWaitingInterruptsHandler) cancelOne(
ctx context.Context, sid model.SessionID, now time.Time,
) error {
sess, _ := h.repo.Load(ctx, sid)
if sess.State() != model.StateWaiting {
return nil // 已被用户审批,跳过
}
decision := model.InterruptDecision{
Action: model.DecisionDeny,
Comment: fmt.Sprintf("hitl:timeout:expired_at=%s", now.UTC().Format(time.RFC3339)),
DecidedBy: "system:hitl-timeout",
DecidedAt: now,
}
_ = sess.RejectInterrupt(decision) // 同样走 RejectInterrupt,保留事件链
_ = h.repo.Save(ctx, sess)
// ...publish events
}
同样走 RejectInterrupt 而不是 Cancel,原因一样:审计需要知道这次 session 结束是因为 HITL 超时,而不是用户主动取消或者 Agent 出错。decided_by=system:hitl-timeout 让报表系统能直接过滤统计。
每个 session 失败不阻断后续:cancelOne 出错只记 log、继续下一个。这是周期任务的标准做法,一个坏掉的 session 不能影响其他的。
流程示意
用户发消息
│
▼
ReAct 循环(Eino 图内部)... LLM → 决定调高危工具
│
┌── tools 节点 ──┐
│ 红灯亮了! │ ← WithInterruptBeforeNodes
│ 图执行暂停 │
└───────┬────────┘
│
ExtractInterruptInfo → interrupt != nil
│
handleEinoInterrupt:
sess.Pause(interrupt) state: running → waiting
│
stream.Emit(interrupt) ← SSE 帧推给浏览器
│
persist(sess) ← 状态落库
[循环 Goroutine 退出]
│
│ (用户在 UI 看到审批弹窗)
│
┌─────────┴──────────┐
deny approve/modify
│ │
RejectInterrupt 重进 RunTurnHandler
emit × 2 events 从 Eino checkpoint 恢复
state → cancelled 继续执行工具调用
state: waiting → running
跟 Eino ADK 的关系
上一篇已经提过,ReAct 循环的编排我们直接用了 Eino 的 react.NewAgent()。HITL 中断也是——WithInterruptBeforeNodes 和 ExtractInterruptInfo 都是 Eino 框架提供的 API。
但 Eino 只管"在哪里停"和"怎么恢复图的执行",不管停了之后业务上要干什么。停了之后的事是我们的领域层在处理:
sess.Pause()把状态迁移到waitingInterruptKind枚举标记中断的业务含义(工具审批 / 合规 / 预算)RejectInterrupt保留完整的审计事件链decided_by字段记录谁做的决定- 超时兜底走同一个 handler,
decided_by=system:hitl-timeout
简单说:Eino 负责在哪里停,我们负责停了之后怎么记、怎么审、怎么超时兜底。 两者各管各的,中间只通过 AgentInterruptInfo 这个薄薄的数据结构通信。
小结
HITL 这条路径的核心是三个设计决定:
- 状态持久化先于一切:
sess.Pause+persist之后循环才退出,重启不丢状态 - RejectInterrupt 而不是 Cancel:两个事件比一个事件多的信息,合规审计用得上
- CronJob 走同一路径:超时按 deny 处理,
decided_by标记来源,审计可区分
下一篇看 SSE 推流——token 是怎么一个个从 LLM 流到浏览器的。
下一篇:SSE 推流 —— Token 怎么一个个蹦出来
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)