范围:本文基于仓库 src/query_engine.py,并关联 src/models.pyUsageSummaryPermissionDenial)、src/transcript.pysrc/session_store.pysrc/runtime.pysrc/main.py 中的调用方式。分析对象是 Python 移植层QueryEnginePort,其 submit_message 不发起任何外部 LLM HTTP 请求;输出由本地格式化的摘要行或 JSON 构成,用于练习 会话状态、停止语义、流式事件形状与持久化


1. 为什么「不接模型」仍能练会话?

真实生产 Harness 里,大模型只是 turn 的一环:前后还有 配额、轮次上限、权限拒绝、转写是否落盘、压缩何时触发、结构化输出是否可解析 等横切逻辑。若在第一天就把这些与 openai.chat.completions 绑死,调试时会出现:

  • 分不清是 prompt 写坏了 还是 状态机越界
  • 分不清是 工具真的被拒 还是 网络超时
  • 单测无法稳定复现(依赖 API Key、速率与模型随机性)。

QueryEnginePort 刻意把 「一轮用户输入 → 结构化 TurnResult + 可选流式事件」 做成 纯本地、可单测、可 CLI 驱动,让「会话骨架」与「模型推理」解耦。接入真实模型时,应替换的是 生成 output 字符串的那一段,而不是推翻 TurnResult / stop_reason / persist_session 的契约。


2. 核心类型:配置、单轮结果、引擎本体

2.1 QueryEngineConfig:状态机的「旋钮」

# 15:21:src/query_engine.py
@dataclass(frozen=True)
class QueryEngineConfig:
    max_turns: int = 8
    max_budget_tokens: int = 2000
    compact_after_turns: int = 12
    structured_output: bool = False
    structured_retry_limit: int = 2
字段 在行为上的含义
max_turns 会话深度闸门:以 mutable_messages 条数衡量,达到上限则 拒绝再处理新 prompt(见下文提前返回)。
max_budget_tokens 累计用量闸门:注意实现里用 UsageSummary.add_turn词数近似(非真实 tokenizer)。
compact_after_turns 压缩/截断阈值:超过后对 mutable_messagesTranscriptStore.entries 只保留尾部窗口。
structured_output 输出是 多行文本 还是 JSON 字符串(便于下游解析)。
structured_retry_limit JSON 序列化失败时的重试次数(防御分支)。

这些字段共同定义了 无需模型也能跑通的有限状态行为

2.2 TurnResult:单轮对外契约(含停止原因)

# 24:32:src/query_engine.py
@dataclass(frozen=True)
class TurnResult:
    prompt: str
    output: str
    matched_commands: tuple[str, ...]
    matched_tools: tuple[str, ...]
    permission_denials: tuple[PermissionDenial, ...]
    usage: UsageSummary
    stop_reason: str

学习点stop_reason 是审计与编排的 一等公民,与 output 并列返回;调用方(CLI、PortRuntime、未来 UI)可以 分支处理(例如 max_turns_reached 时禁止继续提交)。

2.3 QueryEnginePort 持有的可变状态

# 35:43:src/query_engine.py
@dataclass
class QueryEnginePort:
    manifest: PortManifest
    config: QueryEngineConfig = field(default_factory=QueryEngineConfig)
    session_id: str = field(default_factory=lambda: uuid4().hex)
    mutable_messages: list[str] = field(default_factory=list)
    permission_denials: list[PermissionDenial] = field(default_factory=list)
    total_usage: UsageSummary = field(default_factory=UsageSummary)
    transcript_store: TranscriptStore = field(default_factory=TranscriptStore)
  • manifest:与工作区 PortManifest 绑定,render_summary() 时拼进报告(会话与「移植面」同源展示)。
  • session_id:稳定关联持久化文件(见 session_store)。
  • mutable_messages轮次计数与压缩的主载体(每成功处理一轮 prompt 追加一条)。
  • permission_denials跨轮累积的拒绝记录(审计位);render_summary() 打印 len(self.permission_denials)
  • total_usage:跨轮累计的「伪 token」计数。
  • transcript_store:与 mutable_messages 同步追加,但 flush 语义 单独维护(TranscriptStore.flushed)。

3. 状态机:submit_message 的两条路径

3.1 路径 A——max_turns 已耗尽:不修改状态,立即返回

# 61:78:src/query_engine.py
    def submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
    ) -> TurnResult:
        if len(self.mutable_messages) >= self.config.max_turns:
            output = f'Max turns reached before processing prompt: {prompt}'
            return TurnResult(
                prompt=prompt,
                output=output,
                matched_commands=matched_commands,
                matched_tools=matched_tools,
                permission_denials=denied_tools,
                usage=self.total_usage,
                stop_reason='max_turns_reached',
            )

设计解读

  • 条件用的是 >= max_turns:当已有 max_turns 条历史时,新一轮不再入队。等价于「最多容纳 max_turns 条用户消息」的会话深度。
  • 提前返回不会 append mutable_messages不会更新 total_usage不会 extend permission_denials
  • TurnResult 里仍带回本次传入的 matched_*denied_tools,便于 UI 展示「这一轮本想做什么」,但 引擎内部状态冻结在边界上

审计细节:此路径下返回的 permission_denials 是参数 denied_tools,但实例字段 self.permission_denials 未追加;若你需要「累计拒绝次数包含越界轮」,需在调用层统一合并。这是移植层的小不一致,阅读时留意。

3.2 路径 B——正常处理:生成输出 → 计量 → 写入状态 → 压缩

# 80:104:src/query_engine.py
        summary_lines = [
            f'Prompt: {prompt}',
            f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
            f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
            f'Permission denials: {len(denied_tools)}',
        ]
        output = self._format_output(summary_lines)
        projected_usage = self.total_usage.add_turn(prompt, output)
        stop_reason = 'completed'
        if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
            stop_reason = 'max_budget_reached'
        self.mutable_messages.append(prompt)
        self.transcript_store.append(prompt)
        self.permission_denials.extend(denied_tools)
        self.total_usage = projected_usage
        self.compact_messages_if_needed()
        return TurnResult(
            prompt=prompt,
            output=output,
            matched_commands=matched_commands,
            matched_tools=matched_tools,
            permission_denials=denied_tools,
            usage=self.total_usage,
            stop_reason=stop_reason,
        )

设计解读

  1. 「模型输出」的替身summary_lines 把本应由上游路由/权限算好的 matched_commandsmatched_tools、拒绝数量 写进人类可读摘要;再接模型时,这一段可换成真实 completion,同时保留相同字段供审计。
  2. 停止条件 max_budget_reached:在 本 turn 已经计入 projected_usage 之后 才判定;本轮仍会落盘appendextend、更新 total_usage)。与 max_turns 路径不同——预算是「软顶」:越界轮仍进入历史,但 stop_reason 标记为预算耗尽,便于产品层提示「下一句要新开会话或清理上下文」。
  3. permission_denials:把 本轮 denied_tools 累积进实例列表,实现跨 turn 的拒绝审计(render_summary 可读到总次数)。
  4. compact_messages_if_needed:在写入后调用,避免历史无限增长。

4. 用量模型:UsageSummary 与「假 token」

# 28:36:src/models.py
@dataclass(frozen=True)
class UsageSummary:
    input_tokens: int = 0
    output_tokens: int = 0

    def add_turn(self, prompt: str, output: str) -> 'UsageSummary':
        return UsageSummary(
            input_tokens=self.input_tokens + len(prompt.split()),
            output_tokens=self.output_tokens + len(output.split()),
        )

学习点:字段名叫 input_tokens / output_tokens,实现却是 split() 词数。这在移植/教学代码里很常见:先打通「累计 → 阈值 → stop_reason」管线,再换成真实 tokenizer 或 API 返回的 usage。单测里 test_bootstrap_session_tracks_turn_state 只断言 usage.input_tokens >= 1,与这种近似一致。

预算判定:

# 88:90:src/query_engine.py
        stop_reason = 'completed'
        if projected_usage.input_tokens + projected_usage.output_tokens > self.config.max_budget_tokens:
            stop_reason = 'max_budget_reached'

输入侧伪 token + 输出侧伪 tokenmax_budget_tokens 比较。


5. 转写与压缩:TranscriptStorecompact_messages_if_needed

# 6:23:src/transcript.py
@dataclass
class TranscriptStore:
    entries: list[str] = field(default_factory=list)
    flushed: bool = False

    def append(self, entry: str) -> None:
        self.entries.append(entry)
        self.flushed = False

    def compact(self, keep_last: int = 10) -> None:
        if len(self.entries) > keep_last:
            self.entries[:] = self.entries[-keep_last:]

    def replay(self) -> tuple[str, ...]:
        return tuple(self.entries)

    def flush(self) -> None:
        self.flushed = True
# 129:132:src/query_engine.py
    def compact_messages_if_needed(self) -> None:
        if len(self.mutable_messages) > self.config.compact_after_turns:
            self.mutable_messages[:] = self.mutable_messages[-self.config.compact_after_turns :]
        self.transcript_store.compact(self.config.compact_after_turns)

学习点

  • 双缓冲mutable_messagestranscript_store.entriessubmit_message 里同步 append,压缩时也 用同一 compact_after_turns 截尾
  • flushed 语义:每次 appendFalseflush()Truepersist_session 会先 flush_transcript() 再写盘——与「是否已持久化」叙事对齐。
  • 压缩是简单截断,不是摘要模型;与 02_inventory.md 中「Compaction 接口先行」一致。

6. 审计位:权限拒绝、流式事件、摘要报告

6.1 权限拒绝:本轮元组 vs 实例累积

  • 本轮TurnResult.permission_denials == denied_tools(调用方传入的快照)。
  • 累积self.permission_denials.extend(denied_tools)(仅路径 B)。

render_summary() 暴露累积长度:

# 185:191:src/query_engine.py
            f'Session id: {self.session_id}',
            f'Conversation turns stored: {len(self.mutable_messages)}',
            f'Permission denials tracked: {len(self.permission_denials)}',
            f'Usage totals: in={self.total_usage.input_tokens} out={self.total_usage.output_tokens}',
            f'Max turns: {self.config.max_turns}',
            f'Max budget tokens: {self.config.max_budget_tokens}',
            f'Transcript flushed: {self.transcript_store.flushed}',

6.2 流式 API 形状(仍无网络)

stream_submit_message生成器 yield dict,模拟常见 SSE/流式协议的阶段:

# 106:127:src/query_engine.py
    def stream_submit_message(
        self,
        prompt: str,
        matched_commands: tuple[str, ...] = (),
        matched_tools: tuple[str, ...] = (),
        denied_tools: tuple[PermissionDenial, ...] = (),
    ):
        yield {'type': 'message_start', 'session_id': self.session_id, 'prompt': prompt}
        if matched_commands:
            yield {'type': 'command_match', 'commands': matched_commands}
        if matched_tools:
            yield {'type': 'tool_match', 'tools': matched_tools}
        if denied_tools:
            yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
        result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
        yield {'type': 'message_delta', 'text': result.output}
        yield {
            'type': 'message_stop',
            'usage': {'input_tokens': result.usage.input_tokens, 'output_tokens': result.usage.output_tokens},
            'stop_reason': result.stop_reason,
            'transcript_size': len(self.transcript_store.entries),
        }

学习点:前端或中间层可以 只对接事件类型,无需关心底层是否真流式生成 token;PortRuntime.bootstrap_sessiontuple(engine.stream_submit_message(...)) 收进 RuntimeSession.stream_events,便于整段 Markdown 报告调试。

6.3 render_summary:会话 + 工作区 + 清单面

render_summary 拉取 build_command_backlog() / build_tool_backlog()manifest,把 一次会话自省移植清单 打在一张表里,适合 python3 -m src.main summary 的人类阅读(测试见 test_query_engine_summary_mentions_workspace)。


7. 持久化与恢复:persist_session / from_saved_session

# 140:150:src/query_engine.py
    def persist_session(self) -> str:
        self.flush_transcript()
        path = save_session(
            StoredSession(
                session_id=self.session_id,
                messages=tuple(self.mutable_messages),
                input_tokens=self.total_usage.input_tokens,
                output_tokens=self.total_usage.output_tokens,
            )
        )
        return str(path)
# 49:59:src/query_engine.py
    @classmethod
    def from_saved_session(cls, session_id: str) -> 'QueryEnginePort':
        stored = load_session(session_id)
        transcript = TranscriptStore(entries=list(stored.messages), flushed=True)
        return cls(
            manifest=build_port_manifest(),
            session_id=stored.session_id,
            mutable_messages=list(stored.messages),
            total_usage=UsageSummary(stored.input_tokens, stored.output_tokens),
            transcript_store=transcript,
        )

学习点

  • 磁盘 JSON(默认目录 .port_sessions/)只存 session_id、messages、input/output 累计不存 permission_denials 列表。若生产需要完整审计链,应在 StoredSession 中扩展字段或另建日志。
  • from_saved_session 不恢复 self.permission_denials(新实例为空列表);这是当前移植层的缺口/简化。

8. 与 PortRuntime 的衔接:谁负责路由,谁负责 turn

PortRuntime.bootstrap_session 在外部完成 route_prompt、registry shim、_infer_permission_denials,再把 匹配结果与拒绝 喂给 QueryEnginePort

  • 路由与权限推断 → runtime
  • 会话深度、预算、转写、持久化 → QueryEnginePort

职责分离清晰:QueryEnginePort 不实现「从自然语言猜工具」,只实现 「给定结构化输入下的会话语义」——这正是「不接模型也能练」的前提:路由可用规则/关键词(route_prompt)代替模型。

run_turn_loop 用同一组 matches 重复 submit_message,用于压测 max_turns / budget / structured_output(见 test_turn_loop_cli_runs),而非模拟真实多轮意图变化。


9. CLI 入口速查

命令 作用
python3 -m src.main summary QueryEnginePort(manifest).render_summary()
python3 -m src.main turn-loop ... PortRuntime.run_turn_loop,打印每轮 outputstop_reason
python3 -m src.main flush-transcript <prompt> from_workspacesubmit_messagepersist_session,打印路径与 flushed
python3 -m src.main load-session <id> 直接读 StoredSession JSON

10. 小结:QueryEnginePort 如何把三件事摆对

  1. 状态机:用 mutable_messages 长度与 max_turns 定义 硬停止(越界不写入);用 total_usagemax_budget_tokens 定义 软停止(越界仍写入但 stop_reason 标记)。
  2. 停止条件:统一从 TurnResult.stop_reason 读出(max_turns_reached / max_budget_reached / completed),便于上层统一处理。
  3. 审计位:本轮 matched_*denied_tools 进入输出与 TurnResult;拒绝 累积permission_denials;流式事件暴露 permission_denial 与最终 message_stop;转写 flushedtranscript_size 可观测;持久化保存消息与用量近似。

后续接大模型时:保留 TurnResultstream_submit_message 的事件形状,将 summary_lines / _format_output 替换为真实生成逻辑,并把 真实 usage 写回 UsageSummary,即可在不大改外围的情况下完成演进。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐