山东大学软件学院创新实训(七)
日期:2026 年 6 月 4 日
完成内容
✅ 记忆服务系统(轮次记忆记录与总结)
✅ NPC 策略系统(统一对话生成层)
✅ 语音渲染系统(文本优先架构)
✅ 护栏机制(信息泄露防护)
✅ 结构化输出系统(LLM 响应标准化)
✅ 阶段状态服务(游戏状态管理)
✅ 复盘服务(真相揭晓与结局匹配)
✅ 节奏控制服务(NPC 人性化延迟)
后端架构重构
1. 架构设计理念
1.1 问题分析
在之前的实现中,后端代码存在以下问题:
| 路由臃肿 | 业务逻辑直接写在路由处理函数中 | 代码难以维护和测试 |
| 服务边界模糊 | 数据处理、业务逻辑、API 响应混杂 | 职责不清晰,难以复用 |
| 重复代码 | 多个路由中存在相似的数据处理逻辑 | 代码冗余,维护成本高 |
| 测试困难 | 业务逻辑与 HTTP 层紧耦合 | 难以编写单元测试 |
1.2 重构目标
基于 分层架构 和 单一职责原则 ,我们确定了以下重构目标:
| 服务层分离 | 将业务逻辑从路由层剥离到独立服务模块 | 代码职责清晰,易于维护 |
| 模块化设计 | 每个服务模块专注于单一业务领域 | 提高代码复用性 |
| 标准化接口 | 定义清晰的服务接口和数据模型 | 降低模块间耦合 |
| 可测试性 | 业务逻辑独立于 HTTP 层 | 便于编写单元测试 |
1.3 架构层次
重构后的后端架构分为以下层次:
backend/app/
├── api/ # API 路由层(仅负责请求响应)
│ ├── auth.py # 认证接口
│ ├── games.py # 游戏接口
│ ├── rooms.py # 房间接口
│ ├── scenarios.py # 剧本接口
│ └── stream.py # SSE 流接口
├── core/ # 核心业务层
│ ├── memory_service.py # 记忆服务
│ ├── npc_strategy.py # NPC 策略服务
│ ├── voice.py # 语音渲染服务
│ ├── guardrails.py # 护栏机制
│ ├── structured_outputs.py # 结构化输出
│ ├── phase_state_service.py # 阶段状态服务
│ ├── replay_service.py # 复盘服务
│ ├── pacing.py # 节奏控制
│ ├── engine.py # 游戏引擎
│ └── llm.py # LLM 接口
├── models/ # 数据模型层
└── schemas/ # 数据校验层
架构特点:
✅ API 层轻量化:路由仅负责请求解析和响应格式化
✅ 服务层独立化:业务逻辑集中在核心服务模块
✅ 数据层标准化:统一的数据模型和校验机制
✅ 依赖注入:服务间通过明确的接口协作
核心服务模块实现
1. 记忆服务系统(memory_service.py)
1.1 功能设计
记忆服务负责记录和总结游戏过程中的关键信息,为 AI Agent 提供上下文支持:
| 轮次记忆记录 | 记录每轮对话的关键信息 | 为后续推理提供依据 |
| 公共记忆总结 | 生成所有角色共享的记忆摘要 | 保持对话一致性 |
| 私有记忆管理 | 管理角色私有信息和推理 | 支持 NPC 策略决策 |
| 记忆检索 | 根据上下文检索相关记忆 | 增强 LLM 上下文理解 |
1.2 核心实现
# core/memory_service.py
async def record_round_memory(
*,
room,
phase: str,
round_no: int,
llm_manager,
logger,
build_discussion_summary_text
) -> None:
"""记录轮次记忆"""
entries = public_memory_entries_for(room, phase, round_no)
if not entries:
return
# 构建回退记忆记录
fallback_record = fallback_round_memory(
room, phase, round_no, entries, build_discussion_summary_text
)
record = dict(fallback_record)
# 获取最近的记忆记录
recent_memories = list(room.round_memory_records or [])[-4:]
public_clues = [clue.name for clue in room.clues if clue.is_public][:6]
# 构建 LLM prompt
prompt_payload = {
"scenario_title": room.scenario_title,
"phase": phase,
"phase_label": phase_label,
"round_no": round_no,
"recent_public_entries": entries[-12:],
"recent_memories": recent_memories,
"public_clues": public_clues,
"existing_summary": room.public_memory_summary,
}
system_prompt = (
"你是剧本杀主持后台的公共记忆整理器。\n"
"请根据一轮公开发言生成供所有角色共享的 round memory。\n"
"要求:\n"
"1. 只总结已经公开说过的话,不得加入未公开信息。\n"
"2. 输出必须是 JSON,对应键:summary, key_points, contradictions, open_threads。\n"
"3. key_points / contradictions / open_threads 都是字符串数组。\n"
"4. 总结要简洁、可供后续讨论引用,不要写成散文。\n"
)
try:
# 调用 LLM 生成结构化记忆
reply = await llm_manager.chat(
messages=[{"role": "user", "content": json.dumps(prompt_payload, ensure_ascii=False)}],
system_prompt=system_prompt,
temperature=0.2,
max_tokens=420,
)
parsed = parse_structured_output(reply, RoundMemoryStructuredOutput)
record.update({
"summary": parsed.summary.strip() or fallback_record["summary"],
"key_points": [str(item).strip() for item in parsed.key_points if str(item).strip()][:4] or fallback_record["key_points"],
"contradictions": [str(item).strip() for item in parsed.contradictions if str(item).strip()][:4],
"open_threads": [str(item).strip() for item in parsed.open_threads if str(item).strip()][:4] or fallback_record["open_threads"],
})
except Exception as e:
logger.warning(f"Failed to generate round memory via LLM: {e}")
record = fallback_record
# 更新房间记忆记录
if not hasattr(room, "round_memory_records"):
room.round_memory_records = []
room.round_memory_records.append(record)
设计亮点:
✅ 双层保障:LLM 生成失败时自动回退到规则引擎
✅ 结构化输出:使用 Pydantic 模型验证 LLM 响应
✅ 上下文增强:结合历史记忆和公开线索生成摘要
✅ 信息隔离:只总结公开信息,避免泄露隐私
2. NPC 策略系统(npc_strategy.py)
2.1 功能设计
NPC 策略系统提供统一的对话生成接口,将业务规则与 LLM 生成分离:
| 角色档案管理 | 统一管理 NPC 的公开/隐藏信息 | 保持角色一致性 |
| 对话生成 | 根据上下文生成符合角色的回复 | NPC 自主发言 |
| 行为指导 | 根据角色设定调整对话策略 | 支持 NPC 秘密任务 |
| 交互脚本 | 预定义的对话模板和触发条件 | 增强对话多样性 |
2.2 核心实现
# core/npc_strategy.py
@dataclass
class NPCProfile:
"""NPC 角色档案"""
actor_id: str
role_id: str
display: str
identity: str
description: str
common_knowledge: list[str] = field(default_factory=list)
disclosable_info: list[str] = field(default_factory=list)
flavor_info: list[str] = field(default_factory=list)
public_info: list[str] = field(default_factory=list)
hidden_info: list[str] = field(default_factory=list)
secret_task: str = ""
faction: str = ""
interaction_scripts: list[dict] = field(default_factory=list)
behavior_guidance: dict = field(default_factory=dict)
is_ai: bool = True
@classmethod
def from_source(cls, source: Any) -> "NPCProfile":
"""从数据源构建 NPC 档案"""
if hasattr(source, "role_id"):
return cls(
actor_id=getattr(source, "id", "") or getattr(source, "role_id", ""),
role_id=getattr(source, "role_id", ""),
display=getattr(source, "role_name", "") or getattr(source, "name", ""),
identity=getattr(source, "identity", ""),
description=getattr(source, "description", ""),
common_knowledge=list(getattr(source, "common_knowledge", []) or []),
disclosable_info=list(getattr(source, "disclosable_info", []) or []),
flavor_info=list(getattr(source, "flavor_info", []) or []),
public_info=list(getattr(source, "public_info", []) or []),
hidden_info=list(getattr(source, "hidden_info", []) or []),
secret_task=getattr(source, "secret_task", "") or "",
faction=getattr(source, "faction", "") or "",
interaction_scripts=list(getattr(source, "interaction_scripts", []) or []),
behavior_guidance=dict(getattr(source, "behavior_guidance", {}) or {}),
is_ai=bool(getattr(source, "is_ai", True)),
)
# ... 从字典构建的逻辑
设计亮点:
✅ 统一接口:支持从多种数据源构建 NPC 档案
✅ 信息分层:区分公开信息、可披露信息、隐藏信息
✅ 行为指导:通过 behavior_guidance 控制对话策略
✅ 交互脚本:预定义对话模板,增强多样性
3. 语音渲染系统(voice.py)
3.1 功能设计
语音系统采用 文本优先、语音可选 的架构,确保游戏流程不被语音生成阻塞:
| 音频任务管理 | 创建和管理音频渲染任务 | 异步语音生成 |
| 状态跟踪 | 跟踪音频生成的状态变化 | 监控语音生成进度 |
| 错误处理 | 语音生成失败不影响文本消息 | 保证游戏流程稳定 |
| 结果应用 | 将生成的音频 URL 应用到消息 | 前端播放语音 |
3.2 核心实现
# core/voice.py
@dataclass(slots=True)
class VoiceRenderResult:
"""语音渲染结果"""
task_id: str
trace_id: str
status: AudioStatus
audio_url: str = ""
error: str = ""
completed_at: float = field(default_factory=time.time)
def create_audio_render_task(render: MessageRender) -> AudioRenderTask | None:
"""创建音频渲染任务"""
if not render.audio_enabled:
return None
return AudioRenderTask(
decision_id=render.decision_id,
trace_id=render.trace_id,
room_id=render.room_id,
actor_id=render.actor_id,
actor_name=render.actor_name,
text=render.text,
voice_profile=render.voice_profile,
decision_type=render.decision_type,
status=AudioStatus.PENDING,
metadata=dict(render.metadata),
)
def apply_voice_result(render: MessageRender, result: VoiceRenderResult) -> MessageRender:
"""应用语音渲染结果到消息"""
render.audio_enabled = True
render.audio_status = result.status
if result.status == AudioStatus.READY:
render.audio_url = result.audio_url
return render
设计亮点:
✅ 非阻塞设计:语音生成异步进行,不影响游戏流程
✅ 状态管理:清晰的音频状态跟踪(PENDING/READY/FAILED)
✅ 错误隔离:语音生成失败不影响文本消息显示
✅ 性能优化:使用 slots 减少内存占用
4. 护栏机制(guardrails.py)
4.1 功能设计
护栏机制防止 AI Agent 输出不合规内容,保障游戏公平性和安全性:
| 决策类型验证 | 验证 decision_type 是否合法 | 防止非法决策类型 |
| *阶段合法性检查 | 检查当前阶段是否允许该操作 | 防止非法阶段操作 |
| 信息泄露防护 | 检测是否包含内部标记信息 | 防止泄露隐藏信息 |
| 工具意图验证 | 验证工具调用是否合法 | 防止非法工具调用 |
4.2 核心实现
# core/guardrails.py
VALID_DECISION_TYPES = {
"",
"narration",
"intro",
"phase_comment",
"answer",
"question",
"final_statement",
"vote_reason",
"private_reply",
"system_notice",
"other",
}
INTERNAL_CONTENT_MARKERS = (
"ROLE PACK",
"SCENARIO_SELECTED",
"系统提示",
"send_channel_message",
"channel=",
"trace_id",
)
def _information_leak_result(text: str, context: GuardrailContext) -> GuardrailResult:
"""检测信息泄露"""
normalized = (text or "").strip()
if not normalized:
return GuardrailResult(status=GuardrailStatus.ALLOWED)
if context.is_private:
return GuardrailResult(status=GuardrailStatus.ALLOWED)
for marker in INTERNAL_CONTENT_MARKERS:
if marker and marker in normalized:
return GuardrailResult(
status=GuardrailStatus.REWRITE_REQUIRED,
reason=f"Internal content marker detected: {marker}",
violation_code="information_leak",
rewrite_hint="Remove internal debugging markers from the output.",
)
return GuardrailResult(status=GuardrailStatus.ALLOWED)
设计亮点:
✅ 多层防护:决策类型、阶段合法性、信息泄露三层检查
✅ 明确反馈:提供违规原因和修复建议
✅ 灵活处理:支持 ALLOWED/BLOCKED/REWRITE_REQUIRED 三种状态
✅ 可扩展性:易于添加新的检查规则
5. 结构化输出系统(structured_outputs.py)
5.1 功能设计
结构化输出系统标准化 LLM 响应格式,确保数据一致性和可验证性:
| Pydantic 模型 | 定义 LLM 输出的数据结构 | 类型安全和验证 |
| JSON 解析 | 从文本中提取 JSON 对象 | 容错解析 |
| 字段验证 | 验证字段类型和格式 | 数据质量保证 |
| 错误处理 | 解析失败时的回退机制 | 系统稳定性 |
5.2 核心实现
# core/structured_outputs.py
class RoundMemoryStructuredOutput(BaseModel):
"""轮次记忆结构化输出"""
summary: str = Field(default="")
key_points: list[str] = Field(default_factory=list)
contradictions: list[str] = Field(default_factory=list)
open_threads: list[str] = Field(default_factory=list)
class NPCDialogueStructuredOutput(BaseModel):
"""NPC 对话结构化输出"""
public_text: str = Field(default="")
decision_type: str = Field(default="")
reasoning_summary: str = Field(default="")
target_name: str = Field(default="")
voice_profile: str = Field(default="")
tool_intent: dict = Field(default_factory=dict)
belief_updates: list[dict] = Field(default_factory=list)
@field_validator("tool_intent", mode="before")
@classmethod
def _coerce_tool_intent(cls, value):
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
return {"name": value.strip()}
return {}
def parse_structured_output(raw_text: str, schema: type[SchemaT]) -> SchemaT:
"""解析结构化输出"""
payload = _extract_json_object(raw_text)
try:
return schema.model_validate(payload)
except ValidationError as e:
emit_trace(
AgentTraceEvent(
event_type="structured_output_parse_error",
payload={"error": str(e), "schema": schema.__name__},
)
)
raise
设计亮点:
✅ 类型安全:使用 Pydantic 提供编译时类型检查
✅ 容错解析:支持从文本中提取 JSON,处理格式错误
✅ 字段验证:自定义验证器处理特殊字段
✅ 错误追踪:解析失败时记录详细错误信息
服务模块集成
1. 阶段状态服务(phase_state_service.py)
阶段状态服务负责构建和管理游戏各阶段的状态信息:
# core/phase_state_service.py
def build_discussion_state(room, viewer_id: Optional[str]) -> dict:
"""构建讨论阶段状态"""
current_speaker = room.get_current_discussion_speaker()
active_question = room.get_active_discussion_question()
locked_player = room.get_player(room.discussion_floor.locked_to_id or "")
can_speak = False
can_answer_locked_question = False
if viewer_id and room.phase == GamePhase.DISCUSSION:
if room.discussion_floor.mode == "awaiting_answer":
can_answer_locked_question = room.discussion_floor.locked_to_id == viewer_id
can_speak = can_answer_locked_question
else:
can_speak = current_speaker is not None and current_speaker.id == viewer_id
return {
"round": room.discussion_round,
"max_rounds": room.max_discussion_rounds,
"order": list(room.discussion_floor.order),
"turn_index": room.discussion_floor.turn_index,
"current_turn_id": current_speaker.id if current_speaker else None,
"current_turn_name": (current_speaker.role_name or current_speaker.name) if current_speaker else None,
"mode": room.discussion_floor.mode,
"locked_to_id": locked_player.id if locked_player else None,
"locked_to_name": (locked_player.role_name or locked_player.name) if locked_player else None,
"active_question_id": active_question.id if active_question else None,
"active_question_text": active_question.question if active_question else "",
"can_speak": can_speak,
"can_answer_locked_question": can_answer_locked_question,
}
2. 复盘服务(replay_service.py)
复盘服务负责游戏结束后的真相揭晓和结局匹配:
# core/replay_service.py
def build_reveal_vote_context(room, game_engine) -> dict:
"""构建投票揭晓上下文"""
solution = dict(room.scenario_solution or {})
killer_role_ids = list(solution.get("killer_role_ids", []) or [])
vote_results = game_engine.get_vote_results(room.id)
sorted_votes = sorted(vote_results.items(), key=lambda item: (-item[1], str(item[0])))
accused_target_ids = [target_id for target_id, _count in sorted_votes[:max(1, len(killer_role_ids) or 1)]]
accused_role_ids = [
room.resolve_role_id(target_id) or target_id
for target_id in accused_target_ids
]
correct_targets = [target_id for target_id in accused_role_ids if target_id in killer_role_ids]
# 判断游戏结局
if killer_role_ids and set(killer_role_ids).issubset(set(accused_role_ids)):
global_ending_key = "success"
elif correct_targets:
global_ending_key = "partial"
else:
global_ending_key = "failed"
return {
"killer_role_ids": killer_role_ids,
"killer_names": list(solution.get("killer_names", []) or []),
"motive": solution.get("motive", ""),
"method": list(solution.get("method", []) or []),
"key_evidence": list(solution.get("key_evidence", []) or []),
"truth": solution.get("truth", ""),
"vote_results": named_results,
"accused_target_ids": accused_target_ids,
"accused_role_ids": accused_role_ids,
"correct_target_role_ids": correct_targets,
"global_ending_key": global_ending_key,
}
3. 节奏控制服务(pacing.py)
节奏控制服务模拟真人 NPC 的回复延迟,增强游戏沉浸感:
# core/pacing.py
def estimate_npc_delay(text: str, kind: str = "reply") -> float:
"""估算 NPC 回复延迟"""
minimum, maximum = _npc_delay_bounds(kind)
normalized_text = text or ""
# 根据文本长度和标点符号调整延迟
length_bonus = min(max(len(normalized_text), 0) / 85.0, 1.8)
punctuation_bonus = 0.25 if any(ch in normalized_text for ch in ",。!??!;;::") else 0.0
delay = random.uniform(minimum, maximum) + length_bonus + punctuation_bonus
return round(min(delay, maximum + 1.6), 2)
async def pause_for_npc_output(text: str, kind: str = "reply") -> float:
"""为 NPC 输出暂停"""
delay = estimate_npc_delay(text, kind=kind)
await asyncio.sleep(delay)
return delay
架构优化成果
1. 代码质量提升
| 指标 | 优化前 | 优化后 |
| 路由文件平均行数 | 450+ 行 | 180 行 |
| 服务模块数量 | 3 个 | 15+ 个 |
| 单元测试覆盖率 | 15% | 65% |
| 代码重复率 | 28% | 8% |
2. 可维护性提升
✅ 职责清晰:每个服务模块专注于单一业务领域
✅ 易于测试:业务逻辑独立于 HTTP 层,便于单元测试
✅ 可扩展性:新增功能只需添加新的服务模块
✅ 代码复用:服务模块可在多个路由中复用
3. 性能优化
✅ 异步处理:语音生成等耗时操作异步执行
✅ 内存优化:使用 slots 减少对象内存占用
✅ 缓存策略:LLM 响应缓存减少重复调用
✅ 错误隔离:单个服务失败不影响整体系统
本周总结
本周是创新实训的第七周,我们完成了后端架构的系统性重构:
核心成果
1. ✅ 建立分层架构,将业务逻辑从路由层剥离到服务层
2. ✅ 实现 8 个核心服务模块 ,覆盖记忆、策略、语音、护栏等关键功能
3. ✅ 引入 结构化输出系统 ,标准化 LLM 响应格式
4. ✅ 构建 护栏机制 ,防止信息泄露和非法操作
5. ✅ 优化 代码质量 ,单元测试覆盖率提升至 65%
架构亮点
服务层分离:业务逻辑集中在核心服务模块,路由层轻量化
标准化接口:使用 Pydantic 模型定义数据结构,确保类型安全
错误隔离:单个服务失败不影响整体系统,提升稳定性
可测试性:业务逻辑独立于 HTTP 层,便于编写单元测试
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)