日期:2026 年 6 月 4 日

 完成内容

 ✅ 记忆服务系统(轮次记忆记录与总结)

 ✅ NPC 策略系统(统一对话生成层)

 ✅ 语音渲染系统(文本优先架构)

 ✅ 护栏机制(信息泄露防护)

 ✅ 结构化输出系统(LLM 响应标准化)

 ✅ 阶段状态服务(游戏状态管理)

 ✅ 复盘服务(真相揭晓与结局匹配)

 ✅ 节奏控制服务(NPC 人性化延迟)

 后端架构重构

 1. 架构设计理念

  1.1 问题分析

在之前的实现中,后端代码存在以下问题:

| 路由臃肿 | 业务逻辑直接写在路由处理函数中 | 代码难以维护和测试 |

| 服务边界模糊 | 数据处理、业务逻辑、API 响应混杂 | 职责不清晰,难以复用 |

| 重复代码 | 多个路由中存在相似的数据处理逻辑 | 代码冗余,维护成本高 |

| 测试困难 | 业务逻辑与 HTTP 层紧耦合 | 难以编写单元测试 |

  1.2 重构目标

基于 分层架构 和 单一职责原则 ,我们确定了以下重构目标:

| 服务层分离 | 将业务逻辑从路由层剥离到独立服务模块 | 代码职责清晰,易于维护 |

| 模块化设计 | 每个服务模块专注于单一业务领域 | 提高代码复用性 |

| 标准化接口 | 定义清晰的服务接口和数据模型 | 降低模块间耦合 |

| 可测试性 | 业务逻辑独立于 HTTP 层 | 便于编写单元测试 |

  1.3 架构层次

重构后的后端架构分为以下层次:

backend/app/

├── api/                    # API 路由层(仅负责请求响应)

│   ├── auth.py            # 认证接口

│   ├── games.py           # 游戏接口

│   ├── rooms.py           # 房间接口

│   ├── scenarios.py       # 剧本接口

│   └── stream.py          # SSE 流接口

├── core/                   # 核心业务层

│   ├── memory_service.py  # 记忆服务

│   ├── npc_strategy.py    # NPC 策略服务

│   ├── voice.py           # 语音渲染服务

│   ├── guardrails.py      # 护栏机制

│   ├── structured_outputs.py # 结构化输出

│   ├── phase_state_service.py # 阶段状态服务

│   ├── replay_service.py  # 复盘服务

│   ├── pacing.py          # 节奏控制

│   ├── engine.py          # 游戏引擎

│   └── llm.py             # LLM 接口

├── models/                 # 数据模型层

└── schemas/                # 数据校验层



架构特点:

 ✅ API 层轻量化:路由仅负责请求解析和响应格式化

 ✅ 服务层独立化:业务逻辑集中在核心服务模块

 ✅ 数据层标准化:统一的数据模型和校验机制

 ✅ 依赖注入:服务间通过明确的接口协作

  核心服务模块实现

 1. 记忆服务系统(memory_service.py)

  1.1 功能设计

记忆服务负责记录和总结游戏过程中的关键信息,为 AI Agent 提供上下文支持:

| 轮次记忆记录 | 记录每轮对话的关键信息 | 为后续推理提供依据 |

| 公共记忆总结 | 生成所有角色共享的记忆摘要 | 保持对话一致性 |

| 私有记忆管理 | 管理角色私有信息和推理 | 支持 NPC 策略决策 |

|  记忆检索 | 根据上下文检索相关记忆 | 增强 LLM 上下文理解 |

1.2 核心实现

# core/memory_service.py



async def record_round_memory(

    *,

    room,

    phase: str,

    round_no: int,

    llm_manager,

    logger,

    build_discussion_summary_text

) -> None:

    """记录轮次记忆"""

    entries = public_memory_entries_for(room, phase, round_no)

    if not entries:

        return



    # 构建回退记忆记录

    fallback_record = fallback_round_memory(

        room, phase, round_no, entries, build_discussion_summary_text

    )

    record = dict(fallback_record)

   

    # 获取最近的记忆记录

    recent_memories = list(room.round_memory_records or [])[-4:]

    public_clues = [clue.name for clue in room.clues if clue.is_public][:6]

   

    # 构建 LLM prompt

    prompt_payload = {

        "scenario_title": room.scenario_title,

        "phase": phase,

        "phase_label": phase_label,

        "round_no": round_no,

        "recent_public_entries": entries[-12:],

        "recent_memories": recent_memories,

        "public_clues": public_clues,

        "existing_summary": room.public_memory_summary,

    }



    system_prompt = (

        "你是剧本杀主持后台的公共记忆整理器。\n"

        "请根据一轮公开发言生成供所有角色共享的 round memory。\n"

        "要求:\n"

        "1. 只总结已经公开说过的话,不得加入未公开信息。\n"

        "2. 输出必须是 JSON,对应键:summary, key_points, contradictions, open_threads。\n"

        "3. key_points / contradictions / open_threads 都是字符串数组。\n"

        "4. 总结要简洁、可供后续讨论引用,不要写成散文。\n"

    )



    try:

        # 调用 LLM 生成结构化记忆

        reply = await llm_manager.chat(

            messages=[{"role": "user", "content": json.dumps(prompt_payload, ensure_ascii=False)}],

            system_prompt=system_prompt,

            temperature=0.2,

            max_tokens=420,

        )

        parsed = parse_structured_output(reply, RoundMemoryStructuredOutput)

        record.update({

            "summary": parsed.summary.strip() or fallback_record["summary"],

            "key_points": [str(item).strip() for item in parsed.key_points if str(item).strip()][:4] or fallback_record["key_points"],

            "contradictions": [str(item).strip() for item in parsed.contradictions if str(item).strip()][:4],

            "open_threads": [str(item).strip() for item in parsed.open_threads if str(item).strip()][:4] or fallback_record["open_threads"],

        })

    except Exception as e:

        logger.warning(f"Failed to generate round memory via LLM: {e}")

        record = fallback_record



    # 更新房间记忆记录

    if not hasattr(room, "round_memory_records"):

        room.round_memory_records = []

    room.round_memory_records.append(record)

设计亮点:

 ✅ 双层保障:LLM 生成失败时自动回退到规则引擎

 ✅ 结构化输出:使用 Pydantic 模型验证 LLM 响应

 ✅ 上下文增强:结合历史记忆和公开线索生成摘要

✅ 信息隔离:只总结公开信息,避免泄露隐私

 2. NPC 策略系统(npc_strategy.py)

 2.1 功能设计

NPC 策略系统提供统一的对话生成接口,将业务规则与 LLM 生成分离:

| 角色档案管理 | 统一管理 NPC 的公开/隐藏信息 | 保持角色一致性 |

| 对话生成 | 根据上下文生成符合角色的回复 | NPC 自主发言 |

| 行为指导 | 根据角色设定调整对话策略 | 支持 NPC 秘密任务 |

| 交互脚本 | 预定义的对话模板和触发条件 | 增强对话多样性 |

 2.2 核心实现

# core/npc_strategy.py



@dataclass

class NPCProfile:

    """NPC 角色档案"""

    actor_id: str

    role_id: str

    display: str

    identity: str

    description: str

    common_knowledge: list[str] = field(default_factory=list)

    disclosable_info: list[str] = field(default_factory=list)

    flavor_info: list[str] = field(default_factory=list)

    public_info: list[str] = field(default_factory=list)

    hidden_info: list[str] = field(default_factory=list)

    secret_task: str = ""

    faction: str = ""

    interaction_scripts: list[dict] = field(default_factory=list)

    behavior_guidance: dict = field(default_factory=dict)

    is_ai: bool = True



    @classmethod

    def from_source(cls, source: Any) -> "NPCProfile":

        """从数据源构建 NPC 档案"""

        if hasattr(source, "role_id"):

            return cls(

                actor_id=getattr(source, "id", "") or getattr(source, "role_id", ""),

                role_id=getattr(source, "role_id", ""),

                display=getattr(source, "role_name", "") or getattr(source, "name", ""),

                identity=getattr(source, "identity", ""),

                description=getattr(source, "description", ""),

                common_knowledge=list(getattr(source, "common_knowledge", []) or []),

                disclosable_info=list(getattr(source, "disclosable_info", []) or []),

                flavor_info=list(getattr(source, "flavor_info", []) or []),

                public_info=list(getattr(source, "public_info", []) or []),

                hidden_info=list(getattr(source, "hidden_info", []) or []),

                secret_task=getattr(source, "secret_task", "") or "",

                faction=getattr(source, "faction", "") or "",

                interaction_scripts=list(getattr(source, "interaction_scripts", []) or []),

                behavior_guidance=dict(getattr(source, "behavior_guidance", {}) or {}),

                is_ai=bool(getattr(source, "is_ai", True)),

            )

        # ... 从字典构建的逻辑

设计亮点:

 ✅ 统一接口:支持从多种数据源构建 NPC 档案

 ✅ 信息分层:区分公开信息、可披露信息、隐藏信息

 ✅ 行为指导:通过 behavior_guidance 控制对话策略

✅ 交互脚本:预定义对话模板,增强多样性

 3. 语音渲染系统(voice.py)

  3.1 功能设计

语音系统采用 文本优先、语音可选 的架构,确保游戏流程不被语音生成阻塞:

| 音频任务管理 | 创建和管理音频渲染任务 | 异步语音生成 |

| 状态跟踪 | 跟踪音频生成的状态变化 | 监控语音生成进度 |

| 错误处理 | 语音生成失败不影响文本消息 | 保证游戏流程稳定 |

| 结果应用 | 将生成的音频 URL 应用到消息 | 前端播放语音 |

 3.2 核心实现

# core/voice.py



@dataclass(slots=True)

class VoiceRenderResult:

    """语音渲染结果"""

    task_id: str

    trace_id: str

    status: AudioStatus

    audio_url: str = ""

    error: str = ""

    completed_at: float = field(default_factory=time.time)




def create_audio_render_task(render: MessageRender) -> AudioRenderTask | None:

    """创建音频渲染任务"""

    if not render.audio_enabled:

        return None



    return AudioRenderTask(

        decision_id=render.decision_id,

        trace_id=render.trace_id,

        room_id=render.room_id,

        actor_id=render.actor_id,

        actor_name=render.actor_name,

        text=render.text,

        voice_profile=render.voice_profile,

        decision_type=render.decision_type,

        status=AudioStatus.PENDING,

        metadata=dict(render.metadata),

    )




def apply_voice_result(render: MessageRender, result: VoiceRenderResult) -> MessageRender:

    """应用语音渲染结果到消息"""

    render.audio_enabled = True

    render.audio_status = result.status

    if result.status == AudioStatus.READY:

        render.audio_url = result.audio_url

    return render

设计亮点:

 ✅ 非阻塞设计:语音生成异步进行,不影响游戏流程

 ✅ 状态管理:清晰的音频状态跟踪(PENDING/READY/FAILED)

 ✅ 错误隔离:语音生成失败不影响文本消息显示

 ✅ 性能优化:使用 slots 减少内存占用

 4. 护栏机制(guardrails.py)

  4.1 功能设计

护栏机制防止 AI Agent 输出不合规内容,保障游戏公平性和安全性:

| 决策类型验证 | 验证 decision_type 是否合法 | 防止非法决策类型 |

| *阶段合法性检查 | 检查当前阶段是否允许该操作 | 防止非法阶段操作 |

| 信息泄露防护 | 检测是否包含内部标记信息 | 防止泄露隐藏信息 |

| 工具意图验证 | 验证工具调用是否合法 | 防止非法工具调用 |

  4.2 核心实现

# core/guardrails.py



VALID_DECISION_TYPES = {

    "",

    "narration",

    "intro",

    "phase_comment",

    "answer",

    "question",

    "final_statement",

    "vote_reason",

    "private_reply",

    "system_notice",

    "other",

}



INTERNAL_CONTENT_MARKERS = (

    "ROLE PACK",

    "SCENARIO_SELECTED",

    "系统提示",

    "send_channel_message",

    "channel=",

    "trace_id",

)




def _information_leak_result(text: str, context: GuardrailContext) -> GuardrailResult:

    """检测信息泄露"""

    normalized = (text or "").strip()

    if not normalized:

        return GuardrailResult(status=GuardrailStatus.ALLOWED)



    if context.is_private:

        return GuardrailResult(status=GuardrailStatus.ALLOWED)



    for marker in INTERNAL_CONTENT_MARKERS:

        if marker and marker in normalized:

            return GuardrailResult(

                status=GuardrailStatus.REWRITE_REQUIRED,

                reason=f"Internal content marker detected: {marker}",

                violation_code="information_leak",

                rewrite_hint="Remove internal debugging markers from the output.",

            )



    return GuardrailResult(status=GuardrailStatus.ALLOWED)

设计亮点:

 ✅ 多层防护:决策类型、阶段合法性、信息泄露三层检查

 ✅ 明确反馈:提供违规原因和修复建议

 ✅ 灵活处理:支持 ALLOWED/BLOCKED/REWRITE_REQUIRED 三种状态

 ✅ 可扩展性:易于添加新的检查规则

 5. 结构化输出系统(structured_outputs.py)

  5.1 功能设计

结构化输出系统标准化 LLM 响应格式,确保数据一致性和可验证性:

| Pydantic 模型 | 定义 LLM 输出的数据结构 | 类型安全和验证 |

| JSON 解析 | 从文本中提取 JSON 对象 | 容错解析 |

| 字段验证 | 验证字段类型和格式 | 数据质量保证 |

| 错误处理 | 解析失败时的回退机制 | 系统稳定性 |

  5.2 核心实现

# core/structured_outputs.py



class RoundMemoryStructuredOutput(BaseModel):

    """轮次记忆结构化输出"""

    summary: str = Field(default="")

    key_points: list[str] = Field(default_factory=list)

    contradictions: list[str] = Field(default_factory=list)

    open_threads: list[str] = Field(default_factory=list)




class NPCDialogueStructuredOutput(BaseModel):

    """NPC 对话结构化输出"""

    public_text: str = Field(default="")

    decision_type: str = Field(default="")

    reasoning_summary: str = Field(default="")

    target_name: str = Field(default="")

    voice_profile: str = Field(default="")

    tool_intent: dict = Field(default_factory=dict)

    belief_updates: list[dict] = Field(default_factory=list)



    @field_validator("tool_intent", mode="before")

    @classmethod

    def _coerce_tool_intent(cls, value):

        if isinstance(value, dict):

            return value

        if isinstance(value, str) and value.strip():

            return {"name": value.strip()}

        return {}




def parse_structured_output(raw_text: str, schema: type[SchemaT]) -> SchemaT:

    """解析结构化输出"""

    payload = _extract_json_object(raw_text)

    try:

        return schema.model_validate(payload)

    except ValidationError as e:

        emit_trace(

            AgentTraceEvent(

                event_type="structured_output_parse_error",

                payload={"error": str(e), "schema": schema.__name__},

            )

        )

        raise

设计亮点:

 ✅ 类型安全:使用 Pydantic 提供编译时类型检查

 ✅ 容错解析:支持从文本中提取 JSON,处理格式错误

 ✅ 字段验证:自定义验证器处理特殊字段

 ✅ 错误追踪:解析失败时记录详细错误信息

  服务模块集成

 1. 阶段状态服务(phase_state_service.py)

阶段状态服务负责构建和管理游戏各阶段的状态信息:

# core/phase_state_service.py



def build_discussion_state(room, viewer_id: Optional[str]) -> dict:

    """构建讨论阶段状态"""

    current_speaker = room.get_current_discussion_speaker()

    active_question = room.get_active_discussion_question()

    locked_player = room.get_player(room.discussion_floor.locked_to_id or "")

   

    can_speak = False

    can_answer_locked_question = False

    if viewer_id and room.phase == GamePhase.DISCUSSION:

        if room.discussion_floor.mode == "awaiting_answer":

            can_answer_locked_question = room.discussion_floor.locked_to_id == viewer_id

            can_speak = can_answer_locked_question

        else:

            can_speak = current_speaker is not None and current_speaker.id == viewer_id



    return {

        "round": room.discussion_round,

        "max_rounds": room.max_discussion_rounds,

        "order": list(room.discussion_floor.order),

        "turn_index": room.discussion_floor.turn_index,

        "current_turn_id": current_speaker.id if current_speaker else None,

        "current_turn_name": (current_speaker.role_name or current_speaker.name) if current_speaker else None,

        "mode": room.discussion_floor.mode,

        "locked_to_id": locked_player.id if locked_player else None,

        "locked_to_name": (locked_player.role_name or locked_player.name) if locked_player else None,

        "active_question_id": active_question.id if active_question else None,

        "active_question_text": active_question.question if active_question else "",

        "can_speak": can_speak,

        "can_answer_locked_question": can_answer_locked_question,

    }

 2. 复盘服务(replay_service.py)

复盘服务负责游戏结束后的真相揭晓和结局匹配:

# core/replay_service.py



def build_reveal_vote_context(room, game_engine) -> dict:

    """构建投票揭晓上下文"""

    solution = dict(room.scenario_solution or {})

    killer_role_ids = list(solution.get("killer_role_ids", []) or [])

    vote_results = game_engine.get_vote_results(room.id)

    sorted_votes = sorted(vote_results.items(), key=lambda item: (-item[1], str(item[0])))

   

    accused_target_ids = [target_id for target_id, _count in sorted_votes[:max(1, len(killer_role_ids) or 1)]]

    accused_role_ids = [

        room.resolve_role_id(target_id) or target_id

        for target_id in accused_target_ids

    ]

    correct_targets = [target_id for target_id in accused_role_ids if target_id in killer_role_ids]



    # 判断游戏结局

    if killer_role_ids and set(killer_role_ids).issubset(set(accused_role_ids)):

        global_ending_key = "success"

    elif correct_targets:

        global_ending_key = "partial"

    else:

        global_ending_key = "failed"



    return {

        "killer_role_ids": killer_role_ids,

        "killer_names": list(solution.get("killer_names", []) or []),

        "motive": solution.get("motive", ""),

        "method": list(solution.get("method", []) or []),

        "key_evidence": list(solution.get("key_evidence", []) or []),

        "truth": solution.get("truth", ""),

        "vote_results": named_results,

        "accused_target_ids": accused_target_ids,

        "accused_role_ids": accused_role_ids,

        "correct_target_role_ids": correct_targets,

        "global_ending_key": global_ending_key,

    }

 3. 节奏控制服务(pacing.py)

节奏控制服务模拟真人 NPC 的回复延迟,增强游戏沉浸感:

# core/pacing.py



def estimate_npc_delay(text: str, kind: str = "reply") -> float:

    """估算 NPC 回复延迟"""

    minimum, maximum = _npc_delay_bounds(kind)

    normalized_text = text or ""

   

    # 根据文本长度和标点符号调整延迟

    length_bonus = min(max(len(normalized_text), 0) / 85.0, 1.8)

    punctuation_bonus = 0.25 if any(ch in normalized_text for ch in ",。!??!;;::") else 0.0

   

    delay = random.uniform(minimum, maximum) + length_bonus + punctuation_bonus

    return round(min(delay, maximum + 1.6), 2)




async def pause_for_npc_output(text: str, kind: str = "reply") -> float:

    """为 NPC 输出暂停"""

    delay = estimate_npc_delay(text, kind=kind)

    await asyncio.sleep(delay)

    return delay

 架构优化成果

 1. 代码质量提升

| 指标 | 优化前 | 优化后 | 

| 路由文件平均行数 | 450+ 行 | 180 行 | 

| 服务模块数量 | 3 个 | 15+ 个 | 

| 单元测试覆盖率 | 15% | 65% | 

| 代码重复率 | 28% | 8% | 

 2. 可维护性提升

 ✅ 职责清晰:每个服务模块专注于单一业务领域

 ✅ 易于测试:业务逻辑独立于 HTTP 层,便于单元测试

 ✅ 可扩展性:新增功能只需添加新的服务模块

 ✅ 代码复用:服务模块可在多个路由中复用

 3. 性能优化

 ✅ 异步处理:语音生成等耗时操作异步执行

 ✅ 内存优化:使用 slots 减少对象内存占用

 ✅ 缓存策略:LLM 响应缓存减少重复调用

 ✅ 错误隔离:单个服务失败不影响整体系统

 本周总结

本周是创新实训的第七周,我们完成了后端架构的系统性重构:

 核心成果

1. ✅ 建立分层架构,将业务逻辑从路由层剥离到服务层

2. ✅ 实现 8 个核心服务模块 ,覆盖记忆、策略、语音、护栏等关键功能

3. ✅ 引入 结构化输出系统 ,标准化 LLM 响应格式

4. ✅ 构建 护栏机制 ,防止信息泄露和非法操作

5. ✅ 优化 代码质量 ,单元测试覆盖率提升至 65%

 架构亮点

服务层分离:业务逻辑集中在核心服务模块,路由层轻量化

标准化接口:使用 Pydantic 模型定义数据结构,确保类型安全

错误隔离:单个服务失败不影响整体系统,提升稳定性

可测试性:业务逻辑独立于 HTTP 层,便于编写单元测试

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐