系列「企业级 AI Agent 实现拆解」第二篇。上一篇:一个 while 循环而已?—— 为什么我用 DDD 来构建企业级 AI Agent讲了整体架构选型。这篇从最核心的领域对象 Session 开始,看状态管理具体怎么落地。


第一版写法,后来全推翻了

会话对象的第一版,长这样:

type Session struct {
    ID    string
    State string
    Turns int
    // ...
}

Public 字段,拿来就用。写 demo 的时候没问题。

上生产之后踩了坑。任何拿到 Session 引用的代码都能直接写 sess.State = "completed",没有任何校验。Agent 系统的调用链很长——ReAct 循环、Hook 回调、HTTP handler、定时任务都在操作同一个会话对象。有一次调试 HITL 中断,一个 waiting 状态的 session 莫名其妙变成了 running,排查了两小时,最后发现是某个错误处理路径直接赋值了 State 字段,完全没过状态机校验。

在那之后,我把所有字段改成了私有。


把状态迁移表写成代码

现在的 Session 所有字段都是私有的,状态流转只能通过方法调用,每次迁移先过一张合法性检查表:

// domain/model/session.go
var validTransitions = map[State]map[State]bool{
    StateRunning: {
        StateWaiting:   true, // 触发 HITL,等人工审批
        StateCompleted: true, // Agent 给出最终答案
        StateFailed:    true, // 出了异常
        StateCancelled: true, // 用户主动中止
    },
    StateWaiting: {
        StateRunning:   true, // 审批通过,继续跑
        StateCancelled: true, // 审批拒绝,终止
    },
    // completed / failed / cancelled 是终态,不在表里 = 不允许再转移
}

func (s *Session) transitTo(to State) error {
    allowed, ok := validTransitions[s.state]
    if !ok || !allowed[to] {
        return ErrInvalidStateTransition{From: s.state, To: to}
    }
    s.state = to
    s.updatedAt = time.Now()
    return nil
}

这张表是整个系统对"Agent 什么状态下能做什么"的唯一权威说明。要加状态、改规则,改这一个地方就行。

对外暴露的是方法,不是字段:

func (s *Session) Pause(it Interrupt) error          { ... } // running → waiting
func (s *Session) Resume(d InterruptDecision) error  { ... } // waiting → running
func (s *Session) Complete() error                   { ... } // running → completed
func (s *Session) Fail(reason string) error          { ... } // running → failed
func (s *Session) Cancel(by string) error            { ... } // running|waiting → cancelled

每个方法内部调 transitTo,非法迁移直接返回错误,调用方决定怎么处理。


新建和重建,是两件不同的事

Session 有两个构造函数,不是代码重复,是刻意区分的:

// NewSession · 新建会话,走业务逻辑
func NewSession(tenantID, userID, agentConfig string) (*Session, error) {
    if tenantID == "" || userID == "" {
        return nil, ErrMissingIdentity
    }
    s := &Session{
        id:          SessionID(uuid.NewString()),
        tenantID:    tenantID,
        userID:      userID,
        agentConfig: agentConfig,
        state:       StateRunning,
        createdAt:   time.Now(),
    }
    s.emit(EventSessionStarted{ ... }) // 产生领域事件
    return s, nil
}

// ReconstructSession · 从数据库重建,不产生事件
func ReconstructSession(id SessionID, tenantID, userID, agentConfig string,
    state State, createdAt, updatedAt time.Time) *Session {
    if tenantID == "" || userID == "" {
        panic("session: ReconstructSession with empty tenantID/userID — DB corruption?")
    }
    return &Session{ /* 直接装字段 */ }
}

区别在两点:

第一,领域事件。新建时要通知下游"有会话开始了"——触发计费记录、审计日志、监控打点。从数据库加载已有会话时,它早就存在了,不该再发一遍"新建"事件。

第二,错误处理级别。新建时 tenantID 为空,是调用方传错了,返回 error 让上层处理。重建时 tenantID 为空,说明数据库里存了一条残缺记录,这是数据损坏,panic 是合理的——业务层没有办法 recover 这种情况。


一个容易忽略的安全细节

ErrMissingIdentity 这个校验背后有一个不太直觉的安全考量:

空字符串的 tenantID 在 PostgreSQL RLS(行级安全策略)里是个危险值。RLS 策略依赖 current_setting('app.tenant_id') 做隔离,如果这个值是空字符串,在某些数据库配置下会绕过 RLS,变成能读到所有租户的数据。

所以这个校验放在领域层比放在 HTTP handler 里更可靠——不管请求从哪里进来,这里都拦得住,不依赖上层"记得检查"。


领域事件:先记录,再发布

状态变更时,Session 不直接调用任何外部服务,而是把事件挂在自己身上:

func (s *Session) emit(e DomainEvent) {
    s.events = append(s.events, e)
}

Pause() 时 emit EventInterruptRaisedComplete() 时 emit EventSessionCompleted,以此类推。

真正的发布是在用例层,等数据库写成功之后:

// application/command/run_turn.go
func (h *RunTurnHandler) persist(ctx context.Context, s *model.Session) error {
    if err := h.repo.Save(ctx, s); err != nil {
        return err
    }
    if evts := s.Events(); len(evts) > 0 {
        _ = h.bus.Publish(ctx, string(s.ID()), evts)
        s.ClearEvents()
    }
    return nil
}

先落库,再发事件。避免出现"数据没保存成功,但下游已经收到事件"的不一致。

领域层只管记录、不管发布,不依赖任何消息队列的接口。这样做有个实际好处:领域模型的单元测试完全不需要 mock 外部系统,直接检查 sess.Events() 就够了。


几个小设计决定,以及为什么

IsTerminal() 为什么单独写成方法

func (s *Session) IsTerminal() bool {
    return s.state == StateCompleted ||
           s.state == StateFailed   ||
           s.state == StateCancelled
}

"是不是终态"这个判断在 ReAct 循环入口、HTTP handler、定时清理任务里都会用到。如果每处都展开写,将来加个新终态就得挨个找,很容易漏。方法封装之后,改一处。

CurrentInterrupt() 为什么返回副本

func (s *Session) CurrentInterrupt() *Interrupt {
    if s.currentInterrupt == nil { return nil }
    cp := *s.currentInterrupt   // 返回副本,不是指针
    return &cp
}

防止调用方拿到指针后直接修改 Session 内部的 Interrupt 字段,绕过状态机。多一次结构体拷贝,对这种小对象可以接受。


小结

这个 Session 的设计不复杂,核心就三条:

  1. 状态迁移集中管理:一张 validTransitions 表,所有迁移都过它,没有散落在各处的 if-else
  2. 新建和重建区分清楚:领域事件只在新建时发,从数据库重建不发
  3. 校验尽量靠近数据ErrMissingIdentity 在领域层拦,不指望上层"会注意"

Agent 系统的状态比普通 CRUD 应用复杂,因为它是有时序的:这一步的输出是下一步的输入,中间还可能有人工介入。状态管理做扎实了,后面的 ReAct 循环、HITL 中断才有可靠的地基。

顺带一提,这种"状态机驱动 + interrupt/resume"的模式在开源社区也有成熟的实践。Google adk-go(Golang 生态)用 RequestInput 暂停执行、ResumabilityConfig 做检查点持久化。CloudWeGo Eino 的 ADK(从 v0.5.0 起,设计上参考了 Google ADK)在 Go 生态实现了类似机制——Interrupt 暂停、Resume 恢复,框架层负责状态持久化和跨实例路由。它们把 interrupt/resume 封装成了通用框架能力,而我们把它做成 DDD 领域层的业务规则——validTransitions 表里每一行都是业务语义,不是技术配置。两种做法各有适用场景。


下一篇:ReAct 循环的 50 行 Go 实现,逐行拆解

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐