MiroFish 项目完整分析文档

项目仓库: https://github.com/666ghj/MiroFish
本文档目标: 提供足够详细的技术分析,使读者仅凭本文档即可精确复现整个 MiroFish 项目


目录


第一部分:项目总览

1.1 项目定位

MiroFish 是一款基于多智能体(Multi-Agent)技术的新一代 AI 预测引擎,由盛大集团战略支持和孵化。其核心理念是:通过提取现实世界的种子信息(如突发新闻、政策草案、金融信号、小说文本等),自动构建出高保真的平行数字世界。在此数字世界中,成千上万个具备独立人格、长期记忆与行为逻辑的智能体(Agent)进行自由交互与社会演化,从而实现对未来走向的精准推演。

项目口号:“让未来在数字沙盘中预演,助决策在百战模拟后胜出”

1.2 核心能力

能力维度 说明
种子输入 支持 PDF、Markdown、TXT 格式的文档上传,自动提取文本内容
本体论生成 基于 LLM 自动分析文档,生成实体类型和关系类型定义(Ontology)
知识图谱构建 利用 Zep Cloud 的 GraphRAG 能力,从文档构建语义知识图谱
智能体人设生成 将知识图谱中的实体转化为具有独立人格的 OASIS 仿真智能体
双平台仿真 同时在 Twitter 和 Reddit 两个模拟社交平台上运行多智能体交互
动态记忆更新 仿真过程中智能体的行为实时回写到知识图谱,实现记忆演化
AI 报告生成 基于 ReACT 推理模式,自动生成详尽的预测分析报告
深度交互 支持与仿真世界中的任意智能体或报告Agent进行自然语言对话

1.3 应用场景

  • 宏观层面:政策推演、舆情预测、公关危机模拟、金融市场预测
  • 微观层面:小说结局推演、创意验证、社交网络行为模拟
  • 已展示案例:武汉大学舆情推演预测、《红楼梦》失传结局推演预测

1.4 五步工作流

MiroFish 的完整工作流程分为五个步骤:

步骤1: 图谱构建 → 步骤2: 环境搭建 → 步骤3: 开始模拟 → 步骤4: 报告生成 → 步骤5: 深度互动
  1. 图谱构建(Graph Build):上传种子文档 → LLM 生成本体论 → Zep 构建知识图谱
  2. 环境搭建(Environment Setup):读取实体 → 生成智能体人设 → LLM 生成仿真配置参数
  3. 开始模拟(Simulation):启动 OASIS 仿真引擎 → 双平台并行运行 → 实时监控
  4. 报告生成(Report):ReportAgent 使用工具集 → 深度分析仿真结果 → 生成 Markdown 报告
  5. 深度互动(Interaction):与 ReportAgent 对话 → 与仿真世界中的智能体对话 → 批量调研

1.5 项目文件结构

MiroFish/
├── .env.example                    # 环境变量模板
├── .github/workflows/
│   └── docker-image.yml            # CI/CD 流水线
├── Dockerfile                      # Docker 构建文件
├── docker-compose.yml              # Docker Compose 编排
├── package.json                    # 根 Node.js 配置(并发启动前后端)
├── README.md / README-EN.md        # 项目说明文档
├── backend/
│   ├── run.py                      # 后端入口文件
│   ├── requirements.txt            # Python 依赖
│   ├── pyproject.toml              # Python 项目配置
│   ├── app/
│   │   ├── __init__.py             # Flask 应用工厂
│   │   ├── config.py               # 配置管理
│   │   ├── api/                    # API 路由层
│   │   │   ├── __init__.py         # Blueprint 注册
│   │   │   ├── graph.py            # 图谱相关 API
│   │   │   ├── simulation.py       # 仿真相关 API
│   │   │   └── report.py           # 报告相关 API
│   │   ├── models/                 # 数据模型
│   │   │   ├── project.py          # 项目状态管理
│   │   │   └── task.py             # 异步任务管理
│   │   ├── services/               # 核心业务逻辑
│   │   │   ├── text_processor.py          # 文本处理
│   │   │   ├── ontology_generator.py      # 本体论生成
│   │   │   ├── graph_builder.py           # 图谱构建
│   │   │   ├── zep_entity_reader.py       # 实体读取
│   │   │   ├── oasis_profile_generator.py # 人设生成
│   │   │   ├── simulation_config_generator.py # 仿真配置生成
│   │   │   ├── simulation_manager.py      # 仿真管理
│   │   │   ├── simulation_runner.py       # 仿真执行
│   │   │   ├── simulation_ipc.py          # 进程间通信
│   │   │   ├── zep_graph_memory_updater.py # 图谱记忆更新
│   │   │   ├── report_agent.py            # 报告Agent
│   │   │   └── zep_tools.py               # Zep 工具服务
│   │   └── utils/                  # 工具模块
│   │       ├── file_parser.py      # 文件解析
│   │       ├── llm_client.py       # LLM 通信
│   │       ├── logger.py           # 日志系统
│   │       ├── retry.py            # 重试机制
│   │       └── zep_paging.py       # Zep 分页工具
│   └── scripts/                    # 仿真运行脚本
│       ├── run_parallel_simulation.py     # 并行仿真
│       ├── run_twitter_simulation.py      # Twitter 仿真
│       ├── run_reddit_simulation.py       # Reddit 仿真
│       ├── action_logger.py               # 行为日志
│       └── test_profile_format.py         # 格式测试
├── frontend/
│   ├── index.html                  # HTML 入口
│   ├── package.json                # 前端依赖
│   ├── vite.config.js              # Vite 构建配置
│   └── src/
│       ├── App.vue                 # 根组件
│       ├── main.js                 # Vue 入口
│       ├── router/index.js         # 路由配置
│       ├── store/pendingUpload.js  # 状态管理
│       ├── api/                    # API 调用层
│       │   ├── index.js            # Axios 基础配置
│       │   ├── graph.js            # 图谱 API
│       │   ├── simulation.js       # 仿真 API
│       │   └── report.js           # 报告 API
│       ├── views/                  # 页面视图
│       │   ├── Home.vue            # 首页
│       │   ├── MainView.vue        # 主处理视图
│       │   ├── Process.vue         # 处理流程(备用)
│       │   ├── SimulationView.vue  # 仿真视图
│       │   ├── SimulationRunView.vue # 仿真运行视图
│       │   ├── ReportView.vue      # 报告视图
│       │   └── InteractionView.vue # 交互视图
│       └── components/             # 组件
│           ├── GraphPanel.vue      # 图谱可视化
│           ├── HistoryDatabase.vue # 历史记录
│           ├── Step1GraphBuild.vue # 步骤1组件
│           ├── Step2EnvSetup.vue   # 步骤2组件
│           ├── Step3Simulation.vue # 步骤3组件
│           ├── Step4Report.vue     # 步骤4组件
│           └── Step5Interaction.vue # 步骤5组件
└── static/image/                   # 静态资源

第二部分:系统架构设计

2.1 整体架构

MiroFish 采用经典的前后端分离架构,结合外部服务和仿真引擎:

┌─────────────────────────────────────────────────────────────────────┐
│                        用户浏览器(Frontend)                         │
│  Vue 3 + Vite + D3.js + Axios                                      │
│  端口: 3000                                                         │
└──────────────────────────────┬──────────────────────────────────────┘
                               │ HTTP REST API
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                        Flask 后端(Backend)                          │
│  端口: 5001                                                         │
│  ┌────────────┐  ┌─────────────┐  ┌────────────┐                   │
│  │ Graph API  │  │Simulation API│  │ Report API │                   │
│  └─────┬──────┘  └──────┬──────┘  └─────┬──────┘                   │
│        │                │                │                           │
│  ┌─────┴────────────────┴────────────────┴──────┐                   │
│  │              Services 层(核心业务逻辑)         │                   │
│  │  OntologyGenerator | GraphBuilder | ProfileGen │                  │
│  │  SimulationManager | SimulationRunner          │                  │
│  │  ReportAgent | ZepTools                        │                  │
│  └─────┬────────────────┬────────────────┬──────┘                   │
│        │                │                │                           │
│  ┌─────┴──┐    ┌───────┴───┐    ┌───────┴───────┐                  │
│  │ Models │    │   Utils   │    │  Scripts      │                  │
│  └────────┘    └───────────┘    └───────────────┘                  │
└────────┬─────────────────┬──────────────────┬───────────────────────┘
         │                 │                  │
         ▼                 ▼                  ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────────────┐
│  Zep Cloud   │  │  LLM API     │  │  OASIS 仿真引擎       │
│ (知识图谱存储) │  │(OpenAI 格式)  │  │ (CAMEL-AI 驱动)      │
│  GraphRAG    │  │  Qwen/GPT等  │  │  Twitter + Reddit    │
└──────────────┘  └──────────────┘  └──────────────────────┘

2.2 数据流架构

整个系统的数据流遵循清晰的管道式架构:

文档输入 ──→ 文本提取 ──→ 本体论生成 ──→ 知识图谱构建
                                              │
                                              ▼
报告生成 ←── 仿真执行 ←── 仿真配置生成 ←── 实体/人设生成
    │
    ▼
深度交互

详细数据流:

1. 文档输入阶段:
   PDF/MD/TXT 文件 → FileParser 提取文本 → TextProcessor 预处理 → 存储

2. 图谱构建阶段:
   预处理文本 → OntologyGenerator(LLM) → 实体/关系类型定义
   文本分块 → GraphBuilderService → Zep API → 知识图谱

3. 仿真准备阶段:
   知识图谱 → ZepEntityReader → 实体列表
   实体列表 → OasisProfileGenerator(LLM) → 智能体人设
   实体+文档+需求 → SimulationConfigGenerator(LLM) → 仿真参数

4. 仿真执行阶段:
   仿真参数+人设 → OASIS引擎(子进程) → actions.jsonl 日志
   actions.jsonl → SimulationRunner(监控线程) → 状态更新
   Agent行为 → ZepGraphMemoryUpdater → 图谱记忆更新

5. 报告生成阶段:
   图谱+仿真数据 → ReportAgent(ReACT模式) → 工具调用+LLM推理
   搜索结果+分析 → 分节生成 → Markdown 报告

6. 交互阶段:
   用户消息 → ReportAgent.chat() → 工具辅助回答
   用户消息 → IPC → OASIS Agent → 智能体回复

2.3 进程架构

MiroFish 运行时涉及多个进程和线程:

Flask 主进程 (PID: main)
├── 请求处理线程(Flask threaded=True)
├── 图谱构建后台线程(daemon thread per task)
├── 仿真准备后台线程(daemon thread per simulation)
├── 报告生成后台线程(daemon thread per report)
├── 仿真监控线程(per running simulation)
└── 图谱记忆更新工作线程(per simulation with memory enabled)

OASIS 仿真子进程 (独立进程组)
├── run_parallel_simulation.py(主仿真进程)
├── Twitter 环境(内部线程)
└── Reddit 环境(内部线程)

Flask ←→ OASIS 通信方式: 文件系统 IPC
  ipc_commands/   → Flask 写入命令 JSON
  ipc_responses/  → OASIS 写入响应 JSON
  actions.jsonl   → OASIS 写入,Flask 监控线程读取
  env_status.json → OASIS 写入状态,Flask 查询

2.4 存储架构

所有数据采用文件系统存储,无需外部数据库:

backend/uploads/
├── projects/
│   └── proj_{uuid}/
│       ├── project.json              # 项目元数据
│       ├── extracted_text.txt        # 提取的文档文本
│       └── files/
│           ├── {safe_filename}.pdf   # 上传的原始文件
│           └── {safe_filename}.md
├── simulations/
│   └── sim_{uuid}/
│       ├── state.json                # 仿真状态
│       ├── simulation_config.json    # LLM 生成的仿真配置
│       ├── reddit_profiles.json      # Reddit 平台智能体人设
│       ├── twitter_profiles.csv      # Twitter 平台智能体人设
│       ├── run_state.json            # 执行状态
│       ├── simulation.log            # 主日志
│       ├── twitter/
│       │   └── actions.jsonl         # Twitter 行为日志
│       ├── reddit/
│       │   └── actions.jsonl         # Reddit 行为日志
│       └── ipc_commands/ & ipc_responses/  # IPC 通信文件
└── reports/
    └── report_{uuid}/
        ├── meta.json                 # 报告元数据
        ├── full_report.md            # 完整 Markdown 报告
        ├── section_01.md ~ section_N.md  # 各节内容
        ├── agent_log.jsonl           # Agent 执行日志
        └── console_log.txt           # 控制台输出

2.5 外部依赖服务

服务 用途 协议 必需性
Zep Cloud 知识图谱存储、GraphRAG 搜索、实体/关系管理 REST API 必需
LLM API 本体论生成、人设生成、配置生成、报告生成、对话 OpenAI SDK 格式 必需
OASIS (camel-oasis) 社交媒体仿真引擎,驱动智能体行为 Python 库 必需

2.6 通信协议

  • 前端 ↔ 后端: HTTP REST API(JSON 格式),前端通过 Vite 代理转发到 5001 端口
  • 后端 ↔ LLM: OpenAI SDK 兼容的 HTTP API,支持任意 OpenAI 格式的 LLM 服务
  • 后端 ↔ Zep: Zep Python SDK(封装 REST API),使用 API Key 认证
  • 后端 ↔ OASIS: 文件系统 IPC(进程间通信),通过 JSON 文件交换命令和响应
  • OASIS 内部: CAMEL-AI 框架管理智能体间的消息传递和行为决策

第三部分:技术栈与依赖

3.1 后端技术栈

技术/库 版本要求 用途
Python ≥3.11, ≤3.12 后端运行时
Flask 3.0.0+ Web 框架
Flask-CORS 6.0.0+ 跨域资源共享
OpenAI SDK 1.0.0+ LLM API 统一接口
Zep-Cloud 3.13.0 Zep 知识图谱 SDK
CAMEL-OASIS 0.2.5 OASIS 社交媒体仿真引擎
CAMEL-AI 0.2.78 CAMEL 多智能体框架
PyMuPDF (fitz) 1.24.0+ PDF 文档解析
Pydantic 2.0.0+ 数据验证和序列化
python-dotenv 1.0.0+ 环境变量管理
charset-normalizer 3.0.0+ 字符编码检测
chardet 5.0.0+ 备用编码检测
uv 最新版 Python 包管理器(替代 pip)

3.2 前端技术栈

技术/库 版本 用途
Vue 3 3.5.24 前端框架
Vue Router 4.6.3 路由管理
Vite 7.2.4 构建工具和开发服务器
Axios - HTTP 请求库
D3.js - 知识图谱力导向可视化
Node.js 18+ 前端运行时

3.3 字体资源

前端使用以下 Google Fonts:

  • Inter: 主要界面文字
  • JetBrains Mono: 代码和终端风格显示
  • Noto Sans SC: 中文字体
  • Space Grotesk: 标题和强调文字

3.4 开发与构建工具

工具 用途
concurrently 同时启动前后端开发服务器
npm Node.js 包管理
uv Python 依赖管理和虚拟环境
Docker / Docker Compose 容器化部署
GitHub Actions CI/CD 自动构建 Docker 镜像

3.5 环境变量配置

基于 .env.example 的完整配置项:

# ========== 必需配置 ==========

# LLM API 配置(支持 OpenAI SDK 格式的任意 LLM API)
LLM_API_KEY=your_api_key                                    # LLM API 密钥
LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1  # API 基础 URL
LLM_MODEL_NAME=qwen-plus                                    # 模型名称

# Zep Cloud 配置
ZEP_API_KEY=your_zep_api_key                                # Zep Cloud API 密钥

# ========== 可选配置 ==========

# Boost LLM(用于更高性能的 LLM 调用,如报告生成)
LLM_BOOST_API_KEY=...
LLM_BOOST_BASE_URL=...
LLM_BOOST_MODEL_NAME=...

# Flask 配置
FLASK_HOST=0.0.0.0                                          # 监听地址(默认 0.0.0.0)
FLASK_PORT=5001                                             # 监听端口(默认 5001)
FLASK_DEBUG=True                                            # 调试模式(默认 True)
SECRET_KEY=mirofish-secret-key                              # Flask 密钥

# OASIS 仿真配置
OASIS_DEFAULT_MAX_ROUNDS=10                                 # 默认最大轮数

# Report Agent 配置
REPORT_AGENT_MAX_TOOL_CALLS=5                               # 每节最大工具调用次数
REPORT_AGENT_MAX_REFLECTION_ROUNDS=2                        # 最大反思轮数
REPORT_AGENT_TEMPERATURE=0.5                                # LLM 温度参数

3.6 Config 类关键常量

以下是 app/config.py 中定义的重要常量:

MAX_CONTENT_LENGTH = 50 * 1024 * 1024    # 最大上传文件大小: 50MB
ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'}  # 允许的文件类型
DEFAULT_CHUNK_SIZE = 500                  # 文本分块大小: 500 字符
DEFAULT_CHUNK_OVERLAP = 50                # 分块重叠: 50 字符

# Twitter 平台可用动作
TWITTER_ACTIONS = [
    "CREATE_POST", "LIKE_POST", "REPOST",
    "FOLLOW", "DO_NOTHING", "QUOTE_POST"
]

# Reddit 平台可用动作
REDDIT_ACTIONS = [
    "LIKE_POST", "DISLIKE_POST", "CREATE_POST", "CREATE_COMMENT",
    "LIKE_COMMENT", "DISLIKE_COMMENT", "SEARCH_POSTS", "SEARCH_USER",
    "TREND", "REFRESH", "DO_NOTHING", "FOLLOW", "MUTE"
]

第四部分:后端核心服务详解

后端 services/ 目录包含 11 个核心服务模块,它们构成了 MiroFish 的全部业务逻辑。以下逐一详细分析。

4.1 TextProcessor — 文本处理服务

文件: backend/app/services/text_processor.py
职责: 文档文本的提取、预处理和分块

类定义
class TextProcessor:
    """文本处理器,封装文件解析和文本操作"""
核心方法
方法 参数 返回值 功能
extract_from_files file_paths: List[str] str 委托 FileParser 提取多文件文本
split_text text, chunk_size=500, overlap=50 List[str] 按指定大小分块文本
preprocess_text text: str str 规范化换行、去除连续空行、首尾去空
get_text_stats text: str dict 返回字符数、行数、词数统计
文本分块算法

分块逻辑是整个图谱构建的基础,其核心策略:

  1. 按指定 chunk_size(默认 500 字符)切分文本
  2. 保留 overlap(默认 50 字符)的重叠区域确保上下文连续性
  3. 优先在句子边界处分割(句号、叹号、问号、换行符:。!?.\n!\n?\n\n
  4. 如果在句子边界处找不到合适的切割点,则按字符位置硬切
文本: |<------------- chunk 1 ------------->|
                              |<--- overlap --->|<----------- chunk 2 ----------->|
设计原理
  • 分块目的是将长文档拆成 Zep 可处理的小段,每段作为一个 “episode” 输入知识图谱
  • 重叠区域确保实体和关系在块边界处不会丢失
  • 500 字符的默认大小兼顾了信息完整性和 LLM 处理效率

4.2 OntologyGenerator — 本体论生成器

文件: backend/app/services/ontology_generator.py(约 454 行)
职责: 基于 LLM 分析上传文档,自动生成用于知识图谱构建的实体类型(Entity Types)和关系类型(Edge Types)

类定义
class OntologyGenerator:
    def __init__(self, llm_client: Optional[LLMClient] = None)
核心方法

generate(document_texts, simulation_requirement, additional_context) -> Dict

这是本体论生成的主入口方法。

输入参数:

  • document_texts: List[str] — 提取的文档文本列表
  • simulation_requirement: str — 用户描述的仿真需求
  • additional_context: Optional[str] — 额外上下文信息

输出格式:

{
    "entity_types": [
        {
            "name": "PascalCase 名称",
            "description": "实体类型描述",
            "attributes": [
                {"name": "snake_case 属性名", "type": "text", "description": "属性描述"}
            ],
            "examples": ["示例实体1", "示例实体2"]
        }
    ],
    "edge_types": [
        {
            "name": "UPPER_SNAKE_CASE 名称",
            "description": "关系类型描述",
            "source_targets": [
                {"source": "EntityType1", "target": "EntityType2"}
            ],
            "attributes": []
        }
    ],
    "analysis_summary": "中文分析摘要"
}

执行流程:

  1. 调用 _build_user_message() 构建提示词,合并文档文本(最大 50K 字符)和仿真需求
  2. 调用 LLM,使用 ONTOLOGY_SYSTEM_PROMPT 作为系统提示
  3. 解析 LLM 返回的 JSON 结果
  4. 调用 _validate_and_process() 验证和补全

约束规则:

  • 必须恰好 10 个实体类型(8 个场景特定 + 2 个兜底类型)
  • 兜底类型为 PersonOrganization,永远存在
  • 关系类型 6-10 个
  • 属性名不能使用保留名称:name, uuid, group_id, created_at, summary
  • 实体类型名使用 PascalCase,关系类型名使用 UPPER_SNAKE_CASE

generate_python_code(ontology) -> str: 将本体论定义转换为 Python 类定义代码

System Prompt 设计原理

本体论系统提示词的核心指令:

  • 要求 LLM 扮演社交媒体舆论仿真领域的本体论设计专家
  • 强调实体必须是"能在社交媒体上发声的现实世界行为者"
  • 要求生成的实体类型具有清晰的角色差异和行为特征差异
  • 确保关系类型覆盖核心的社会关系网络

4.3 GraphBuilderService — 图谱构建服务

文件: backend/app/services/graph_builder.py(约 501 行)
职责: 利用 Zep Cloud API 构建知识图谱,管理图谱的创建、本体论设置、文本注入和查询

数据类
@dataclass
class GraphInfo:
    graph_id: str
    node_count: int
    edge_count: int
    entity_types: List[str]
类定义
class GraphBuilderService:
    def __init__(self, api_key: Optional[str] = None)
    # 初始化 Zep 客户端和 TaskManager
核心方法详解

build_graph_async(text, ontology, graph_name, chunk_size, chunk_overlap, batch_size) -> str

异步图谱构建入口,返回 task_id 供轮询。

进度推进:

0%  → 创建任务
5%  → 分块文本
10% → 创建 Zep 图谱
15% → 设置本体论
20% → 开始注入文本
60% → 文本注入完成
90% → Zep 处理完成
100% → 获取图谱信息完毕

_build_graph_worker(task_id, text, ontology, ...): 后台工作线程

  1. 创建图谱: 调用 create_graph(name) 生成唯一 ID 格式 mirofish_{uuid[:16]}
  2. 设置本体论:
    • 将本体论的实体类型和关系类型动态转换为 Pydantic 模型
    • 处理保留字段名转换(如 nameentity_name
    • 调用 client.graph.set_ontology(graph_ids=[graph_id], entities={...}, edges={...})
  3. 分块注入:
    • 将文本分块后按批次(默认每批 3 个 chunk)发送到 Zep
    • 每批之间间隔 1 秒(限流)
    • 调用 client.graph.add_batch() 返回 episode UUID 列表
  4. 等待处理:
    • 轮询每个 episode 的 processed 状态
    • 每 3 秒检查一次,最长等待 600 秒
    • 所有 episode 处理完毕后继续
  5. 获取图谱信息: 使用分页工具获取所有节点和边,统计数量和类型

set_ontology(graph_id, ontology) — 动态 Pydantic 模型生成

这是关键的技术实现。由于 Zep SDK 要求以 Pydantic 模型定义本体论,而本体论是运行时由 LLM 动态生成的,因此需要在运行时动态创建 Pydantic 类:

# 伪代码示意
for entity_type in ontology["entity_types"]:
    fields = {}
    for attr in entity_type["attributes"]:
        field_name = attr["name"]
        if field_name in RESERVED_NAMES:
            field_name = f"entity_{field_name}"  # 避免保留字冲突
        fields[field_name] = (Optional[str], Field(default=None, description=attr["description"]))

    # 动态创建 Pydantic 模型类
    EntityModel = create_model(entity_type["name"], **fields)
    entities[entity_type["name"]] = EntityModel

get_graph_data(graph_id) -> Dict: 获取完整图谱结构

返回格式:

{
    "nodes": [
        {
            "uuid": "...",
            "name": "实体名",
            "labels": ["Student", "Entity"],
            "summary": "实体摘要",
            "attributes": {"key": "value"},
            "created_at": "2025-01-01T00:00:00"
        }
    ],
    "edges": [
        {
            "uuid": "...",
            "name": "STUDIES_AT",
            "fact": "关系描述事实",
            "source_node_uuid": "...",
            "target_node_uuid": "...",
            "source_node_name": "张三",
            "target_node_name": "北京大学"
        }
    ]
}

4.4 ZepEntityReader — 实体读取服务

文件: backend/app/services/zep_entity_reader.py(约 438 行)
职责: 从 Zep 知识图谱中读取和过滤实体,丰富实体上下文信息

数据类
@dataclass
class EntityNode:
    uuid: str
    name: str
    labels: List[str]        # 实体标签,如 ["Student", "Entity"]
    summary: str             # 实体摘要
    attributes: Dict         # 实体属性
    related_edges: List[Dict]  # 关联的边
    related_nodes: List[Dict]  # 关联的节点

    def get_entity_type(self) -> str:
        """获取实体类型(过滤掉 'Entity' 和 'Node' 默认标签)"""

@dataclass
class FilteredEntities:
    entities: List[EntityNode]
    entity_types: Set[str]
核心方法

filter_defined_entities(graph_id, defined_entity_types=None, enrich_with_edges=True) -> FilteredEntities

这是仿真准备阶段的关键方法:

  1. 获取所有节点: 调用 fetch_all_nodes() 分页获取
  2. 过滤逻辑:
    • 保留包含自定义标签(非 “Entity”/“Node”)的节点
    • 如果指定了 defined_entity_types,仅保留匹配的类型
    • 跳过仅有默认标签的节点
  3. 上下文丰富:
    • 对每个实体调用 get_node_edges() 获取所有关联边
    • 收集所有关联节点的信息
    • 将边和节点信息附加到 EntityNode 对象

重试机制: 所有 Zep API 调用都使用 _call_with_retry() 封装,指数退避重试(初始延迟 2 秒,最多 3 次)。


4.5 OasisProfileGenerator — 智能体人设生成器

文件: backend/app/services/oasis_profile_generator.py(约 1000+ 行)
职责: 将知识图谱中的实体转化为 OASIS 仿真引擎可识别的智能体人设(Agent Profile)

数据类
@dataclass
class OasisAgentProfile:
    # 核心字段
    user_id: int             # OASIS 用户 ID
    user_name: str           # 用户名(如 zhang_san_123)
    name: str                # 显示名
    bio: str                 # 简介(约 200 字符)
    persona: str             # 人设描述(约 2000 字符,包含行为模式)

    # 可选字段
    age: Optional[int]
    gender: Optional[str]
    mbti: Optional[str]      # 16 种 MBTI 人格类型
    country: Optional[str]
    profession: Optional[str]
    interested_topics: Optional[List[str]]

    # 元数据
    source_entity_uuid: str   # 来源实体 UUID
    source_entity_type: str   # 来源实体类型
    created_at: str

    def to_reddit_format(self) -> Dict    # Reddit JSON 格式
    def to_twitter_format(self) -> Dict   # Twitter CSV 格式
实体类型分类

系统将实体分为两大类,采用不同的人设生成策略:

个体类实体 (INDIVIDUAL_ENTITY_TYPES):

["student", "alumni", "professor", "person", "publicfigure",
 "expert", "faculty", "official", "journalist", "activist"]

群体/机构类实体 (GROUP_ENTITY_TYPES):

["university", "governmentagency", "organization", "ngo",
 "mediaoutlet", "company", "institution", "group", "community"]
人设生成流程

generate_profile_from_entity(entity, user_id, use_llm=True) -> OasisAgentProfile

  1. 生成用户名: 小写化 + 下划线替换空格 + 去除特殊字符 + 随机 3 位数后缀
  2. 构建实体上下文 (_build_entity_context):
    • 添加实体属性
    • 添加关联边(关系信息)
    • 添加关联节点信息
    • 调用 Zep 搜索获取丰富上下文
    • 事实去重
  3. LLM 生成_generate_profile_with_llm):
    • 根据实体类型选择个体/群体提示词
    • 要求 JSON 格式输出
    • 包含 bio(200字符)、persona(2000字符)、age、gender、mbti、country、profession、interested_topics
    • 温度递减重试:0.7 → 0.6 → 0.5(3 次)
    • JSON 修复机制:闭合未关闭的括号/方括号
  4. 规则兜底_generate_profile_rule_based):
    • 如果 LLM 生成失败,使用基于实体类型的规则生成默认人设
    • 不同类型有不同的默认值(如学生 age=20, 教授 age=45)

generate_profiles_from_entities(entities, use_llm, progress_callback, parallel_count=5, ...) -> List[OasisAgentProfile]

批量生成核心方法:

  • 使用 ThreadPoolExecutor 并发生成(默认 5 个工作线程)
  • 预分配结果列表保持顺序
  • 实时输出:每完成一个就写入文件(Reddit JSON 或 Twitter CSV)
  • 进度回调:报告 current/total 和当前实体名
  • 失败兜底:如果某个实体的 LLM 生成失败,创建基本人设
平台输出格式差异

Reddit (JSON):

{
    "realname": "张三",
    "username": "zhang_san_123",
    "bio": "北京大学计算机系大四学生...",
    "persona": "张三是一名对技术充满热情的...",
    "age": 22,
    "gender": "male",
    "mbti": "INTJ",
    "country": "China",
    "profession": "Student",
    "interested_topics": ["AI", "开源", "编程"]
}

Twitter (CSV):

user_id,user_name,name,bio,friend_count,follower_count,statuses_count,created_at
0,zhang_san_123,张三,北京大学计算机系...,50,100,200,2025-01-01

4.6 SimulationConfigGenerator — 仿真配置生成器

文件: backend/app/services/simulation_config_generator.py(约 987 行)
职责: 基于 LLM 生成完整的仿真运行参数配置

配置数据类

TimeSimulationConfig — 时间配置:

@dataclass
class TimeSimulationConfig:
    total_simulation_hours: int = 72    # 仿真总时长(小时)
    minutes_per_round: int = 60         # 每轮模拟分钟数
    agents_per_hour_min: int            # 每小时最少活跃智能体数
    agents_per_hour_max: int            # 每小时最多活跃智能体数

    # 中国时区时段划分
    peak_hours: List[int] = [19, 20, 21, 22]        # 高峰时段
    off_peak_hours: List[int] = [0, 1, 2, 3, 4, 5]  # 低谷时段
    morning_hours: List[int] = [6, 7, 8]              # 早间时段
    work_hours: List[int] = [9, 10, ..., 18]          # 工作时段

    # 活跃度乘数
    peak_activity_multiplier: float = 1.5       # 高峰 1.5 倍
    off_peak_activity_multiplier: float = 0.05  # 低谷 0.05 倍
    morning_activity_multiplier: float = 0.4    # 早间 0.4 倍
    work_activity_multiplier: float = 0.7       # 工作 0.7 倍

AgentActivityConfig — 单个智能体活动配置:

@dataclass
class AgentActivityConfig:
    agent_id: int
    entity_uuid: str
    entity_name: str
    entity_type: str
    activity_level: float          # 0.0-1.0,活跃概率
    posts_per_hour: float          # 每小时发帖数
    comments_per_hour: float       # 每小时评论数
    active_hours: List[int]        # 活跃时间段
    response_delay_min: int        # 最小响应延迟(秒)
    response_delay_max: int        # 最大响应延迟(秒)
    sentiment_bias: str            # 情感偏向
    stance: str                    # 立场
    influence_weight: float        # 影响力权重

EventConfig — 事件配置:

@dataclass
class EventConfig:
    initial_posts: List[Dict]      # 初始种子帖子
    scheduled_events: List[Dict]   # 定时事件
    hot_topics: List[str]          # 热门话题
    narrative_direction: str       # 叙事方向

PlatformConfig — 平台配置:

@dataclass
class PlatformConfig:
    platform: str                  # "twitter" 或 "reddit"
    recency_weight: float          # 时效性权重
    popularity_weight: float       # 热度权重
    relevance_weight: float        # 相关性权重
    viral_threshold: float         # 传播阈值
    echo_chamber_strength: float   # 回音室效应强度
配置生成流程

generate_config(simulation_id, project_id, graph_id, simulation_requirement, document_text, entities, enable_twitter, enable_reddit, progress_callback) -> SimulationParameters

分步骤生成,每步调用 LLM:

  1. 构建上下文 (step 1):

    • 合并仿真需求、实体摘要(每类最多 20 个,每个摘要最多 300 字符)、文档文本
    • 总长度限制在 MAX_CONTEXT_LENGTH = 50000 字符
  2. 生成时间配置 (step 2):

    • LLM 根据场景推理仿真时长、每轮时间、活跃度分布
    • 验证:确保 agents_per_hour 不超过实际实体数量
    • 失败回退到默认配置
  3. 生成事件配置 (step 3):

    • LLM 生成热门话题、叙事方向、初始帖子
    • 初始帖子包含 poster_type 字段(匹配实体类型)
  4. 分批生成智能体配置 (steps 4 ~ N):

    • 每批 15 个智能体(AGENTS_PER_BATCH = 15
    • LLM 为每个智能体生成 activity_level、active_hours、stance 等
    • 失败回退到规则生成
  5. 分配初始帖子发布者 (step N+1):

    • 将 EventConfig 中的初始帖子匹配到具体智能体
    • 优先按 entity_type 匹配,找不到则按 influence_weight 排名选取
  6. 生成平台配置 (final step):

    • 为启用的平台生成推荐算法权重参数
规则兜底配置

当 LLM 生成失败时,按实体类型使用预设配置:

实体类型 activity_level posts/h active_hours response_delay influence_weight
University/Gov/NGO 0.2 0.1 9-18 60-240s 3.0
MediaOutlet 0.5 0.8 7-24 5-30s 2.5
Professor/Expert 0.4 0.3 8-22 30-120s 2.0
Student 0.8 0.6 混合时段 5-30s 0.8
默认 0.7 0.4 8-23 10-60s 1.0

4.7 SimulationManager — 仿真管理器

文件: backend/app/services/simulation_manager.py(约 529 行)
职责: 编排仿真准备流水线(读取实体 → 生成人设 → 生成配置 → 准备文件)

状态机
class SimulationStatus(Enum):
    CREATED = "created"
    PREPARING = "preparing"
    READY = "ready"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPED = "stopped"
    COMPLETED = "completed"
    FAILED = "failed"

状态转换:

CREATED → PREPARING → READY → RUNNING → COMPLETED
                  ↘         ↘         ↘
                  FAILED    PAUSED    STOPPED/FAILED
仿真状态数据
@dataclass
class SimulationState:
    simulation_id: str        # 格式: sim_{uuid[:12]}
    project_id: str
    graph_id: str
    status: SimulationStatus
    enable_twitter: bool
    enable_reddit: bool
    entities_count: int
    profiles_count: int
    entity_types: List[str]
    config_generated: bool
    current_round: int
    created_at: str
    updated_at: str
核心方法

prepare_simulation(simulation_id, simulation_requirement, document_text, ...) -> SimulationState

这是仿真准备的核心编排方法,包含三个主要阶段:

阶段 1: 读取实体 (0-100%)

reader = ZepEntityReader(zep_api_key)
filtered = reader.filter_defined_entities(
    graph_id=state.graph_id,
    defined_entity_types=defined_entity_types,
    enrich_with_edges=True
)
state.entities_count = len(filtered.entities)
state.entity_types = list(filtered.entity_types)

阶段 2: 生成人设 (0-100%)

generator = OasisProfileGenerator(
    api_key=llm_api_key, base_url=llm_base_url,
    model_name=llm_model, zep_api_key=zep_api_key,
    graph_id=state.graph_id
)
profiles = generator.generate_profiles_from_entities(
    entities=filtered.entities,
    use_llm=use_llm_for_profiles,
    parallel_count=parallel_profile_count,  # 默认 3
    realtime_output_path=sim_dir,
    output_platform="reddit"  # 或 "twitter"
)
# 保存到 reddit_profiles.json 和/或 twitter_profiles.csv

阶段 3: 生成配置 (0-100%)

config_gen = SimulationConfigGenerator(
    api_key=llm_api_key, base_url=llm_base_url,
    model_name=llm_model
)
config = config_gen.generate_config(
    simulation_id=state.simulation_id,
    simulation_requirement=simulation_requirement,
    document_text=document_text,
    entities=filtered.entities,
    enable_twitter=state.enable_twitter,
    enable_reddit=state.enable_reddit
)
# 保存到 simulation_config.json

最终将状态设为 READY


4.8 SimulationRunner — 仿真执行器

文件: backend/app/services/simulation_runner.py(约 1000+ 行)
职责: 启动 OASIS 仿真子进程、监控运行状态、解析行为日志

状态定义
class RunnerStatus(Enum):
    IDLE = "idle"
    STARTING = "starting"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPING = "stopping"
    STOPPED = "stopped"
    COMPLETED = "completed"
    FAILED = "failed"
运行时状态
@dataclass
class SimulationRunState:
    simulation_id: str
    runner_status: RunnerStatus
    current_round: int
    total_rounds: int
    simulated_hours: float
    total_simulation_hours: float

    # 平台独立状态
    twitter_current_round: int
    reddit_current_round: int
    twitter_running: bool
    reddit_running: bool
    twitter_completed: bool
    reddit_completed: bool

    # 行为追踪
    recent_actions: List[AgentAction]  # 最新 50 条
    rounds: List[RoundSummary]
核心方法

start_simulation(simulation_id, platform="parallel", max_rounds=None, enable_graph_memory_update=False, graph_id=None) -> SimulationRunState

  1. 验证: 检查仿真是否已在运行
  2. 加载配置: 读取 simulation_config.json 获取 total_rounds
  3. 选择脚本:
    • platform="parallel"run_parallel_simulation.py
    • platform="twitter"run_twitter_simulation.py
    • platform="reddit"run_reddit_simulation.py
  4. 启动子进程:
    cmd = ["python", script_path, "--config", config_file]
    if max_rounds:
        cmd.extend(["--max-rounds", str(max_rounds)])
    
    process = subprocess.Popen(
        cmd,
        start_new_session=True,  # 创建新进程组,便于终止
        stdout=log_file,
        stderr=log_file,
        env={**os.environ, "PYTHONIOENCODING": "utf-8"}
    )
    
  5. 记忆更新(可选): 创建 ZepGraphMemoryUpdater 实例
  6. 启动监控线程: _monitor_simulation() 持续解析 actions.jsonl

_monitor_simulation(simulation_id) — 监控线程

while process.poll() is None:  # 进程仍在运行
    # 读取 Twitter actions.jsonl
    twitter_pos = _read_action_log(twitter_log, twitter_pos, state, "twitter")
    # 读取 Reddit actions.jsonl
    reddit_pos = _read_action_log(reddit_log, reddit_pos, state, "reddit")

    _save_run_state(state)
    time.sleep(1)  # 每秒检查一次

_read_action_log(log_path, position, state, platform) -> int

增量读取 JSONL 日志的核心逻辑:

  1. 从上次读取位置 position 继续
  2. 逐行解析 JSON 对象
  3. 区分事件类型:
    • simulation_end: 标记平台完成
    • round_end: 更新轮次和模拟时间
    • 其他: 解析为 AgentAction 对象,加入状态
  4. 如果启用图谱记忆更新,将行为发送给 ZepGraphMemoryUpdater
  5. 返回新的文件位置

进程终止机制:

def _terminate_process(process, simulation_id, timeout=10):
    if IS_WINDOWS:
        # Windows: 使用 taskkill /T /F 终止进程树
        subprocess.call(['taskkill', '/T', '/F', '/PID', str(process.pid)])
    else:
        # Unix: 使用 os.killpg 发送 SIGTERM 到整个进程组
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        # 等待 timeout 秒,若未退出则 SIGKILL 强制终止

4.9 SimulationIPC — 进程间通信

文件: backend/app/services/simulation_ipc.py(约 395 行)
职责: Flask 后端与 OASIS 仿真子进程之间的命令/响应通信

通信协议
命令类型 (CommandType):
  INTERVIEW        — 单个智能体访谈
  BATCH_INTERVIEW  — 批量访谈
  CLOSE_ENV        — 关闭仿真环境

命令状态 (CommandStatus):
  PENDING     — 已创建,等待处理
  PROCESSING  — 正在处理
  COMPLETED   — 处理完成
  FAILED      — 处理失败
文件系统 IPC 原理

这是 MiroFish 的核心设计之一。由于 OASIS 仿真以独立子进程运行,Flask 无法直接调用其内部方法,因此采用文件系统作为通信媒介:

ipc_commands/           ipc_responses/
├── {cmd_id}.json  →   ├── {cmd_id}.json
│   命令文件              │   响应文件
│   Flask 写入            │   OASIS 写入
│                         │   Flask 轮询读取

Flask 端 — SimulationIPCClient:

def send_command(command_type, args, timeout=60.0, poll_interval=0.5):
    # 1. 生成唯一 command_id
    # 2. 将命令序列化为 JSON 写入 ipc_commands/{cmd_id}.json
    # 3. 轮询 ipc_responses/{cmd_id}.json(每 0.5 秒检查一次)
    # 4. 读取响应后删除命令和响应文件
    # 5. 超时抛出 TimeoutError

OASIS 端 — SimulationIPCServer:

def poll_commands() -> Optional[IPCCommand]:
    # 扫描 ipc_commands/ 目录
    # 返回最早的命令(按修改时间排序)

def send_response(response):
    # 将响应写入 ipc_responses/{cmd_id}.json
    # 删除对应的命令文件

环境存活检测: 通过 env_status.json 文件的 status 字段判断环境是否存活。


4.10 ZepGraphMemoryUpdater — 图谱记忆更新器

文件: backend/app/services/zep_graph_memory_updater.py(约 549 行)
职责: 将仿真过程中智能体的行为实时回写到 Zep 知识图谱,实现动态记忆演化

设计原理

这是 MiroFish 的一个重要创新:仿真不仅是从知识图谱中"读取"信息来驱动智能体,还会将仿真过程中的行为"写回"知识图谱。这意味着:

  1. 仿真中的智能体行为会成为知识图谱的一部分
  2. 后续的报告分析可以搜索到仿真中发生的事件
  3. 深度交互时可以基于仿真后的知识图谱进行更精确的查询
行为转自然语言

每种行为类型都有对应的中文描述模板:

# AgentActivity.to_episode_text() 示例:
CREATE_POST    → "{agent_name} 在{platform}上发布了一条帖子:「{content}」"
LIKE_POST      → "{agent_name} 在{platform}上点赞了{post_author}的帖子:「{post_content}」"
REPOST         → "{agent_name} 在{platform}上转发了{original_author}的帖子:「{content}」"
CREATE_COMMENT → "{agent_name} 在{platform}上在{post_author}的帖子下评论道:「{content}」"
FOLLOW         → "{agent_name} 在{platform}上关注了用户「{target_user_name}」"
SEARCH_POSTS   → "{agent_name} 在{platform}上搜索了「{query}」"
MUTE           → "{agent_name} 在{platform}上屏蔽了用户「{target_user_name}」"

平台名称映射:twitter → "世界1", reddit → "世界2"

批量发送机制
BATCH_SIZE = 5           # 累积 5 条行为后发送
SEND_INTERVAL = 0.5      # 批次间隔 0.5 秒
MAX_RETRIES = 3           # 最大重试次数
RETRY_DELAY = 2           # 重试间隔(指数增长)

工作线程 (_worker_loop):

  1. 从活动队列中取出行为
  2. 按平台分组到缓冲区
  3. 缓冲区达到 BATCH_SIZE 时,将多条行为的自然语言描述合并
  4. 调用 client.graph.add(graph_id, type="text", data=combined_text) 写入 Zep
  5. 指数退避重试

ZepGraphMemoryManager — 单例管理器:

class ZepGraphMemoryManager:
    """管理多个仿真的记忆更新器实例"""
    _updaters: Dict[str, ZepGraphMemoryUpdater]

    @classmethod
    def create_updater(cls, simulation_id, graph_id) -> ZepGraphMemoryUpdater

    @classmethod
    def stop_updater(cls, simulation_id)

    @classmethod
    def stop_all(cls)

4.11 ReportAgent — 报告生成Agent

文件: backend/app/services/report_agent.py(约 1000+ 行)
职责: 基于 ReACT(Reasoning + Acting)模式,利用工具集深度分析仿真结果,生成预测报告

设计理念

ReportAgent 模仿人类分析师的工作方式:先规划报告结构,再针对每个章节反复调研(搜索、访谈)和撰写。它使用"思考-行动-观察"循环:

对每个章节:
  1. 思考: 我需要搜集哪些信息来写这个章节?
  2. 行动: 调用工具(搜索图谱、访谈智能体等)
  3. 观察: 分析工具返回的结果
  4. 重复 1-3,直到信息充分
  5. 撰写: 基于收集的信息生成章节内容
工具定义

ReportAgent 拥有 4 种工具:

工具名 功能 适用场景
insight_forge 深度分析:自动生成子问题,多维度搜索 复杂分析,需要多角度理解
panorama_search 全景搜索:包含历史/过期内容 全局概览,时序分析
quick_search 快速精确搜索 已知关键词的快速查找
interview_agents 智能体访谈:通过 IPC 与 OASIS 中的智能体对话 获取第一人称视角和引言
配置参数
MAX_TOOL_CALLS_PER_SECTION = 5    # 每个章节最多调用 5 次工具
MAX_REFLECTION_ROUNDS = 3         # 最多 3 轮反思/修改
MAX_TOOL_CALLS_PER_CHAT = 2       # 聊天模式每轮最多 2 次工具调用
报告生成流程

规划阶段:

PLAN_SYSTEM_PROMPT = """
你是一位专业的未来预测报告撰写专家。
你的任务是基于多智能体社交媒体模拟推演的结果,
撰写一份详尽的预测分析报告...
"""
  • LLM 生成报告大纲(ReportOutline),包含标题、摘要、章节列表

章节生成阶段:

SECTION_SYSTEM_PROMPT_TEMPLATE = """
你现在需要撰写报告的一个章节:{section_title}
你可以使用以下工具来收集信息...
必须引用原始事件语录,避免自行编造内容...
"""
  • 对每个章节进行 ReACT 循环
  • 强制最少工具调用次数
  • 检测未使用的工具并建议使用
  • 超过最大工具调用次数后强制撰写

输出格式: Markdown 格式的完整报告

日志系统

ReportLogger: JSONL 格式的结构化日志

{
    "timestamp": "2025-01-01T12:00:00",
    "elapsed_seconds": 45.2,
    "report_id": "report_xxx",
    "action": "tool_call",
    "stage": "section_generation",
    "details": {
        "section": "第二章:舆论传播分析",
        "tool_name": "insight_forge",
        "parameters": {"query": "..."}
    }
}

记录的动作类型:report_start, planning_start, section_start, react_thought, tool_call, tool_result, llm_response, section_content, section_complete, report_complete, error


4.12 ZepToolsService — Zep 工具服务

文件: backend/app/services/zep_tools.py(约 1000+ 行)
职责: 为 ReportAgent 提供统一的 Zep 图谱搜索接口,封装多种搜索策略

搜索结果数据类
@dataclass
class SearchResult:
    facts: List[str]
    edges: List[EdgeInfo]
    nodes: List[NodeInfo]
    query: str
    total_count: int

@dataclass
class NodeInfo:
    uuid: str
    name: str
    labels: List[str]
    summary: str
    attributes: Dict

@dataclass
class EdgeInfo:
    uuid: str
    name: str
    fact: str
    source_node: Dict        # {uuid, name}
    target_node: Dict        # {uuid, name}
    # 时序字段
    created_at: Optional[str]
    valid_at: Optional[str]
    invalid_at: Optional[str]
    expired_at: Optional[str]

    @property
    def is_expired(self) -> bool   # 判断关系是否过期
    @property
    def is_invalid(self) -> bool   # 判断关系是否失效
核心搜索方法

search_graph(graph_id, query, limit=10, scope="edges") -> SearchResult

  • 优先使用 Zep Cloud Search API
  • 如果 API 不可用,回退到本地关键词匹配 (_local_search)

_local_search(graph_id, query, limit, scope) -> SearchResult

  • 回退方法:对所有边/节点进行关键词匹配
  • 计算匹配分数,返回最佳结果
高级搜索工具

insight_forge(graph_id, query, simulation_requirement, report_context, max_sub_queries=5) -> InsightForgeResult

深度分析工具的实现:

  1. 子问题生成: 调用 LLM 将主查询分解为多个子问题
  2. 多维搜索: 对每个子问题执行语义搜索
  3. 实体洞察: 提取关键实体的详细信息
  4. 关系链: 追踪实体间的关系路径
  5. 统计汇总: 返回总事实数、实体数、关系数

panorama_search(graph_id, query, include_expired=True) -> PanoramaResult

全景搜索工具:

  1. 获取图谱中所有节点和边
  2. 分离活跃事实和历史事实(基于 expired_at/invalid_at)
  3. 返回完整的图谱视图

interview_agents(graph_id, interviews_config) -> InterviewResult

智能体访谈工具:

  1. 选择相关智能体
  2. 通过 IPC 发送访谈命令到 OASIS 进程
  3. 收集智能体回复
  4. 清洗引言中的格式问题
  5. 生成访谈摘要

第五部分:后端API接口详解

5.1 API 总览

后端通过 Flask Blueprint 组织 API,挂载在三个路径下:

Blueprint 挂载路径 功能
graph_bp /api/graph 图谱构建相关(本体论、图谱、项目管理)
simulation_bp /api/simulation 仿真相关(创建、准备、运行、状态查询)
report_bp /api/report 报告相关(生成、查看、聊天、日志)

5.2 图谱 API(/api/graph

POST /api/graph/ontology/generate — 生成本体论

请求: multipart/form-data

字段 类型 必填 说明
files File[] PDF/MD/TXT 文档(支持多文件)
simulation_requirement string 仿真需求描述(自然语言)
project_name string 项目名称
additional_context string 额外上下文

处理流程:

  1. 创建 Project → ProjectStatus.CREATED
  2. 保存上传文件到 projects/{project_id}/files/
  3. FileParser 提取各文件文本
  4. TextProcessor 预处理文本
  5. OntologyGenerator.generate() 调用 LLM 生成本体论
  6. 保存到项目 → ProjectStatus.ONTOLOGY_GENERATED

响应:

{
    "success": true,
    "data": {
        "project_id": "proj_xxxxx",
        "project_name": "项目名称",
        "ontology": {
            "entity_types": [...],
            "edge_types": [...]
        },
        "analysis_summary": "基于文档的分析摘要...",
        "files": [{"filename": "xxx.pdf", "path": "...", "size": 12345}],
        "total_text_length": 50000
    }
}
POST /api/graph/build — 构建知识图谱

请求: application/json

{
    "project_id": "proj_xxxxx",
    "graph_name": "图谱名称",
    "chunk_size": 500,
    "chunk_overlap": 50,
    "force": false
}

处理流程:

  1. 验证项目存在且状态为 ONTOLOGY_GENERATED
  2. 启动后台线程执行图谱构建
  3. 返回 task_id 供轮询

响应:

{
    "success": true,
    "data": {
        "project_id": "proj_xxxxx",
        "task_id": "uuid-xxxx",
        "message": "Graph building task started"
    }
}
GET /api/graph/task/{task_id} — 查询任务状态

响应:

{
    "success": true,
    "data": {
        "task_id": "uuid-xxxx",
        "task_type": "构建图谱",
        "status": "processing",
        "progress": 45,
        "message": "正在处理文本批次 3/10...",
        "result": null,
        "error": null
    }
}
GET /api/graph/data/{graph_id} — 获取图谱数据

返回完整的节点和边数据,用于前端 D3 可视化。

DELETE /api/graph/delete/{graph_id} — 删除图谱
GET/DELETE /api/graph/project/{project_id} — 项目管理
POST /api/graph/project/{project_id}/reset — 重置项目(回到 ONTOLOGY_GENERATED 状态)

5.3 仿真 API(/api/simulation

实体查询
方法 路径 说明
GET /entities/{graph_id} 获取所有过滤后的实体
GET /entities/{graph_id}/{entity_uuid} 获取单个实体详情
GET /entities/{graph_id}/by-type/{entity_type} 按类型获取实体
POST /api/simulation/create — 创建仿真

请求:

{
    "project_id": "proj_xxxxx",
    "graph_id": "mirofish_xxxxx",
    "enable_twitter": true,
    "enable_reddit": true
}

响应: 返回 simulation_id

POST /api/simulation/prepare — 准备仿真环境

请求:

{
    "simulation_id": "sim_xxxxx",
    "entity_types": ["Student", "Professor"],
    "use_llm_for_profiles": true,
    "parallel_profile_count": 5,
    "force_regenerate": false
}

智能检测逻辑:

  • 如果已准备完毕(config_generated=true),直接返回
  • 否则启动后台任务(三阶段:读取实体 → 生成人设 → 生成配置)

响应:

{
    "success": true,
    "data": {
        "simulation_id": "sim_xxxxx",
        "task_id": "task_uuid",
        "status": "preparing",
        "expected_entities_count": 42,
        "entity_types": ["Student", "Professor"]
    }
}
POST /api/simulation/prepare/status — 查询准备进度
POST /api/simulation/start — 启动仿真

请求:

{
    "simulation_id": "sim_xxxxx",
    "platform": "parallel",
    "max_rounds": 100,
    "enable_graph_memory_update": false,
    "force": false
}

智能启动逻辑:

  1. 检查准备是否完成
  2. 如果已在运行且非 force,返回错误
  3. 如果 force,停止现有进程,清理日志
  4. 调用 SimulationRunner.start_simulation()

响应:

{
    "success": true,
    "data": {
        "simulation_id": "sim_xxxxx",
        "runner_status": "running",
        "process_pid": 12345,
        "twitter_running": true,
        "reddit_running": true,
        "graph_memory_update_enabled": true
    }
}
GET /api/simulation/{simulation_id}/run-status — 运行状态

响应:

{
    "runner_status": "running",
    "current_round": 5,
    "total_rounds": 144,
    "progress_percent": 3.5,
    "simulated_hours": 2,
    "total_simulation_hours": 72,
    "twitter_running": true,
    "reddit_running": true,
    "twitter_actions_count": 150,
    "reddit_actions_count": 200,
    "total_actions_count": 350
}
其他仿真 API
方法 路径 说明
POST /stop 停止运行中的仿真
GET /{id}/profiles 获取智能体人设列表
GET /{id}/profiles/realtime 实时获取人设生成进度
GET /{id}/config 获取仿真配置
GET /{id}/config/realtime 实时获取配置生成进度
GET /list 列出所有仿真
GET /history 获取富化的历史记录
GET /{id}/actions 获取行为日志

5.4 报告 API(/api/report

POST /api/report/generate — 生成报告

请求:

{
    "simulation_id": "sim_xxxxx",
    "force_regenerate": false
}

处理流程:

  1. 检查是否已有报告(非 force_regenerate 时跳过)
  2. 获取仿真、项目、图谱信息
  3. 创建 ReportAgent 实例
  4. 后台线程执行 agent.generate_report()
  5. 分节保存并更新进度
POST /api/report/generate/status — 报告生成进度
GET /api/report/{report_id} — 获取报告内容

响应:

{
    "success": true,
    "data": {
        "report_id": "report_xxxxx",
        "simulation_id": "sim_xxxxx",
        "status": "completed",
        "title": "报告标题",
        "markdown_content": "# 完整 Markdown 报告内容...",
        "outline": {
            "title": "...",
            "summary": "...",
            "sections": [
                {"title": "第一章:...", "content": "..."}
            ]
        }
    }
}
POST /api/report/chat — 与 ReportAgent 对话

请求:

{
    "simulation_id": "sim_xxxxx",
    "message": "请解释这个预测结论的依据",
    "chat_history": [
        {"role": "user", "content": "之前的问题"},
        {"role": "assistant", "content": "之前的回答"}
    ]
}

响应: 包含回答文本、工具调用记录、信息来源

日志和状态查询
方法 路径 说明
GET /{id}/progress 实时生成进度和当前章节
GET /{id}/sections 获取已生成的所有章节元数据
GET /{id}/section/{index} 获取单个章节内容
GET /{id}/agent-log 分页获取 Agent 执行日志
GET /{id}/agent-log/stream 一次性获取所有日志
GET /{id}/console-log 分页获取控制台输出
GET /{id}/download 下载 Markdown 文件
GET /check/{simulation_id} 检查报告状态

5.5 健康检查

方法 路径 说明
GET /health 返回 {"status": "ok"}

5.6 统一错误响应格式

所有 API 在出错时返回统一格式:

{
    "success": false,
    "error": "人类可读的错误描述",
    "traceback": "完整 Python 堆栈跟踪(开发模式下)"
}

HTTP 状态码使用:

  • 400: 请求参数错误、验证失败
  • 404: 资源不存在
  • 500: 服务器内部错误

第六部分:后端数据模型

6.1 Project 模型

文件: backend/app/models/project.py

项目状态机
class ProjectStatus(Enum):
    CREATED = "created"                    # 文件已上传,无本体论
    ONTOLOGY_GENERATED = "ontology_generated"  # 本体论已生成
    GRAPH_BUILDING = "graph_building"      # 正在构建图谱
    GRAPH_COMPLETED = "graph_completed"    # 图谱构建完成
    FAILED = "failed"                      # 任意步骤失败

状态转换流:

CREATED → ONTOLOGY_GENERATED → GRAPH_BUILDING → GRAPH_COMPLETED
                                     ↘
                                    FAILED
项目数据结构
@dataclass
class Project:
    project_id: str              # 格式: proj_{uuid[:5]}
    name: str                    # 项目名称
    status: ProjectStatus        # 当前状态
    created_at: str              # ISO 时间戳
    updated_at: str              # ISO 时间戳
    files: List[Dict]            # [{filename, path, size}]
    total_text_length: int       # 提取的文本总字符数
    ontology: Optional[Dict]     # {entity_types, edge_types}
    analysis_summary: str        # LLM 分析摘要
    graph_id: Optional[str]      # Zep 图谱 ID
    graph_build_task_id: Optional[str]  # 异步构建任务 ID
    simulation_requirement: str  # 用户仿真需求描述
    chunk_size: int              # 文本分块大小
    chunk_overlap: int           # 分块重叠量
    error: Optional[str]         # 错误信息
ProjectManager(静态方法类)
方法 功能 存储位置
create_project(name) 创建新项目目录和元数据 uploads/projects/proj_xxx/
save_project(project) 持久化项目状态 project.json
get_project(project_id) 加载项目状态 project.json
list_projects(limit) 列出所有项目(按创建时间降序) 扫描目录
delete_project(project_id) 删除整个项目目录 shutil.rmtree
save_file_to_project(id, file, name) 保存上传文件 files/{safe_name}
save_extracted_text(id, text) 保存提取的文本 extracted_text.txt
get_extracted_text(id) 读取提取的文本 extracted_text.txt
get_project_files(id) 获取项目所有文件路径 扫描 files/
文件安全处理

上传文件使用 werkzeug.utils.secure_filename() 处理文件名,防止路径遍历攻击。对中文文件名进行 UUID 前缀处理以确保安全性。


6.2 Task 模型

文件: backend/app/models/task.py

任务状态
class TaskStatus(Enum):
    PENDING = "pending"        # 已创建未开始
    PROCESSING = "processing"  # 执行中
    COMPLETED = "completed"    # 成功完成
    FAILED = "failed"          # 执行失败
任务数据结构
@dataclass
class Task:
    task_id: str               # UUID
    task_type: str             # 任务类型描述(如 "构建图谱"、"report_generate")
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    progress: int              # 0-100 百分比
    message: str               # 当前状态消息
    result: Optional[Dict]     # 任务完成后的结果
    error: Optional[str]       # 错误堆栈
    metadata: Dict             # 额外上下文(如 project_id, graph_id)
    progress_detail: Dict      # 详细阶段进度
TaskManager(单例模式 + 线程安全)
class TaskManager:
    _instance = None
    _instance_lock = threading.Lock()
    _task_lock = threading.Lock()
    _tasks: Dict[str, Task] = {}

    @classmethod
    def get_instance(cls) -> 'TaskManager':
        """双重检查锁定的单例实现"""
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    cls._instance = TaskManager()
        return cls._instance

关键设计: TaskManager 是纯内存存储,重启后任务数据丢失。通过 cleanup_old_tasks(max_age_hours=24) 清理过期任务防止内存泄漏。

方法 线程安全 说明
create_task(task_type, metadata) 创建任务,返回 task_id
get_task(task_id) 获取任务对象
update_task(task_id, ...) 更新进度/状态/消息
complete_task(task_id, result) 标记完成,progress=100
fail_task(task_id, error) 标记失败
list_tasks(task_type) 列出任务
cleanup_old_tasks(max_age_hours) 清理旧任务

第七部分:后端工具模块

7.1 FileParser — 文件解析器

文件: backend/app/utils/file_parser.py

支持的文件格式
格式 解析方法 依赖库
PDF _extract_from_pdf PyMuPDF (fitz)
Markdown _extract_from_md 原生文本读取
TXT _extract_from_txt 原生文本读取
编码检测策略

_read_text_with_fallback(file_path) 的四级回退策略:

1. UTF-8 直接解码
      ↓ 失败
2. charset_normalizer 自动检测
      ↓ 失败
3. chardet 检测
      ↓ 失败
4. UTF-8 + errors='replace'(用 � 替换无法解码的字节)

这种多级回退确保了对各种中文编码(UTF-8、GBK、GB2312、Big5 等)的兼容。

文本分块函数

split_text_into_chunks(text, chunk_size=500, overlap=50) -> List[str]

核心逻辑:

def split_text_into_chunks(text, chunk_size=500, overlap=50):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end < len(text):
            # 尝试在句子边界处分割
            for sep in ['。', '!', '?', '.\n', '!\n', '?\n', '\n\n']:
                boundary = text.rfind(sep, start + chunk_size // 2, end)
                if boundary != -1:
                    end = boundary + len(sep)
                    break
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = end - overlap  # 保留重叠
    return chunks

7.2 LLMClient — LLM 通信客户端

文件: backend/app/utils/llm_client.py

初始化
class LLMClient:
    def __init__(self, api_key=None, base_url=None, model=None):
        self.api_key = api_key or Config.LLM_API_KEY
        self.base_url = base_url or Config.LLM_BASE_URL
        self.model = model or Config.LLM_MODEL_NAME
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
核心方法

chat(messages, temperature=0.7, max_tokens=4096, response_format=None) -> str

  1. 调用 client.chat.completions.create()
  2. 获取响应文本
  3. 清除 <think> 标签: 某些模型(如 DeepSeek)会输出思维过程,使用正则删除:
    content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
    
  4. 返回清理后的文本

chat_json(messages, temperature=0.7, max_tokens=4096) -> Dict

  1. 设置 response_format={"type": "json_object"}
  2. 调用 chat() 获取文本
  3. 清理 Markdown 代码块标记(json ...
  4. 解析为 Python Dict
  5. 解析失败则抛出 ValueError

7.3 Logger — 日志系统

文件: backend/app/utils/logger.py

日志配置
def setup_logger(name='mirofish', level=logging.DEBUG):
    # 文件处理器:logs/YYYY-MM-DD.log
    # - DEBUG 级别
    # - RotatingFileHandler: 10MB 单文件,保留 5 个备份
    # - 格式: [2025-01-01 12:00:00,000] DEBUG [module.function:42] 消息

    # 控制台处理器:stdout
    # - INFO 级别
    # - 格式: [12:00:00] INFO: 消息
Windows UTF-8 处理
if sys.platform == 'win32':
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
    sys.stderr.reconfigure(encoding='utf-8', errors='replace')

7.4 Retry — 重试机制

文件: backend/app/utils/retry.py

装饰器用法
@retry_with_backoff(
    max_retries=3,           # 最大重试次数
    initial_delay=1.0,       # 初始延迟(秒)
    max_delay=30.0,          # 最大延迟上限
    backoff_factor=2.0,      # 退避因子
    jitter=True,             # 添加随机抖动
    exceptions=(Exception,)  # 捕获的异常类型
)
def my_api_call():
    ...

退避公式:

delay = min(delay × backoff_factor, max_delay) × (0.5 + random())

示例延迟序列(initial_delay=1.0, backoff_factor=2.0):

第1次重试: ~1.0s (0.5~1.5s)
第2次重试: ~2.0s (1.0~3.0s)
第3次重试: ~4.0s (2.0~6.0s)
RetryableAPIClient
class RetryableAPIClient:
    def call_with_retry(self, func, *args, **kwargs):
        """执行单个函数并重试"""

    def call_batch_with_retry(self, func, items, *args, **kwargs):
        """批量执行,返回 (results, failures)"""
        # 每个 item 独立重试,失败不影响其他 item

7.5 ZepPaging — Zep 分页工具

文件: backend/app/utils/zep_paging.py

分页策略
_DEFAULT_PAGE_SIZE = 100   # 每页 100 条
_MAX_NODES = 2000          # 节点最大获取量
_DEFAULT_MAX_RETRIES = 3   # 每页最大重试次数
_DEFAULT_RETRY_DELAY = 2.0 # 重试初始延迟

def fetch_all_nodes(client, graph_id, page_size=100, max_items=2000):
    """
    使用 UUID 游标分页获取所有节点
    每页独立重试,支持指数退避
    返回最多 max_items 个节点
    """
    all_nodes = []
    cursor = None
    while True:
        page = _fetch_page_with_retry(
            lambda: client.graph.node.get_by_graph_id(
                graph_id, page_size=page_size, cursor=cursor
            )
        )
        all_nodes.extend(page.results)
        if len(all_nodes) >= max_items or not page.results:
            break
        cursor = page.results[-1].uuid  # 使用最后一个 UUID 作为游标
    return all_nodes[:max_items]

def fetch_all_edges(client, graph_id, page_size=100):
    """类似逻辑,但不限制数量"""
重试逻辑
def _fetch_page_with_retry(func, max_retries=3, initial_delay=2.0):
    """
    捕获 ConnectionError, TimeoutError, OSError, InternalServerError
    指数退避重试(2s → 4s → 8s)
    """

第八部分:仿真脚本详解

8.1 ActionLogger — 行为日志系统

文件: backend/scripts/action_logger.py
职责: 为 OASIS 仿真过程提供结构化的行为日志记录

日志架构
SimulationLogManager
├── PlatformActionLogger (Twitter) → twitter/actions.jsonl
├── PlatformActionLogger (Reddit) → reddit/actions.jsonl
└── Main Logger → simulation.log(控制台 + 文件)
JSONL 日志格式

每行一个 JSON 对象:

普通行为记录:

{
    "round": 1,
    "timestamp": "2025-01-01T12:00:00.000000",
    "agent_id": 5,
    "agent_name": "zhang_san_123",
    "action_type": "CREATE_POST",
    "action_args": {
        "content": "这个政策我觉得值得讨论..."
    },
    "result": "success",
    "success": true
}

轮次开始/结束事件:

{
    "type": "event",
    "event": "round_start",
    "round": 2,
    "timestamp": "2025-01-01T12:30:00",
    "simulated_hour": 1.0
}

仿真结束事件:

{
    "type": "event",
    "event": "simulation_end",
    "total_rounds": 144,
    "total_actions": 5280,
    "timestamp": "2025-01-04T12:00:00"
}
PlatformActionLogger 关键方法
class PlatformActionLogger:
    def __init__(self, log_dir: str, platform: str):
        # 创建 {log_dir}/{platform}/actions.jsonl
        self.log_file = open(log_path, 'a', encoding='utf-8')

    def log_action(self, round_num, agent_id, agent_name,
                   action_type, action_args, result, success):
        entry = {
            "round": round_num,
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "agent_name": agent_name,
            "action_type": action_type,
            "action_args": action_args,
            "result": str(result),
            "success": success
        }
        self.log_file.write(json.dumps(entry, ensure_ascii=False) + '\n')
        self.log_file.flush()  # 立即刷新,确保监控线程能读到
UnicodeFormatter

自定义日志格式化器,确保中文字符在控制台正确显示。


8.2 run_parallel_simulation.py — 并行仿真脚本

文件: backend/scripts/run_parallel_simulation.py
职责: 同时运行 Twitter 和 Reddit 两个仿真平台

命令行参数
python run_parallel_simulation.py \
    --config simulation_config.json \  # 仿真配置文件路径
    [--max-rounds N]                   # 最大轮数覆盖
    [--no-wait]                        # 不等待 IPC 命令(仿真结束后直接退出)
    [--twitter-only]                   # 仅运行 Twitter
    [--reddit-only]                    # 仅运行 Reddit
初始化流程
def main():
    # 1. Windows UTF-8 兼容处理
    if sys.platform == 'win32':
        # 猴子补丁: 重写 builtins.open 默认编码为 utf-8
        original_open = builtins.open
        def patched_open(*args, **kwargs):
            if 'encoding' not in kwargs and 'b' not in str(kwargs.get('mode', '')):
                kwargs['encoding'] = 'utf-8'
            return original_open(*args, **kwargs)
        builtins.open = patched_open

    # 2. 加载配置
    config = json.load(open(args.config))

    # 3. LLM 配置优先级: .env > config > 默认值
    llm_api_key = os.getenv("LLM_API_KEY") or config.get("llm_api_key")
    llm_base_url = os.getenv("LLM_BASE_URL") or config.get("llm_base_url")
    llm_model = os.getenv("LLM_MODEL_NAME") or config.get("llm_model", "gpt-4o-mini")

    # 4. 抑制 OASIS 详细日志
    for logger_name in ['social.agent', 'social.twitter', 'social.rec',
                        'oasis.env', 'table']:
        logging.getLogger(logger_name).setLevel(logging.CRITICAL)

    # 5. 初始化日志管理器
    log_manager = SimulationLogManager(simulation_dir)
仿真环境初始化
    # 6. 创建 Twitter 环境
    twitter_env = TwitterEnv(
        agent_graph=twitter_agent_graph,
        llm_config=ModelConfig(model=llm_model, api_key=llm_api_key, ...),
        available_actions=[ActionType.CREATE_POST, ActionType.LIKE_POST,
                          ActionType.REPOST, ActionType.FOLLOW,
                          ActionType.DO_NOTHING, ActionType.QUOTE_POST],
        db_path="twitter_simulation.db"
    )

    # 7. 创建 Reddit 环境
    reddit_env = RedditEnv(
        agent_graph=reddit_agent_graph,
        llm_config=ModelConfig(...),
        available_actions=[ActionType.LIKE_POST, ActionType.DISLIKE_POST,
                          ActionType.CREATE_POST, ActionType.CREATE_COMMENT,
                          ...13 种动作...],
        db_path="reddit_simulation.db"
    )
仿真主循环
    # 8. 执行初始事件(种子帖子)
    for post in config.get("event_config", {}).get("initial_posts", []):
        agent_id = post.get("poster_agent_id")
        content = post.get("content")
        # 在对应平台发布初始帖子

    # 9. 主仿真循环
    total_rounds = config["time_config"]["total_simulation_hours"] * 60 \
                   // config["time_config"]["minutes_per_round"]

    for round_num in range(total_rounds):
        simulated_hour = round_num * minutes_per_round / 60

        # 确定本轮活跃智能体数量
        hour_of_day = int(simulated_hour) % 24
        base_count = random.randint(agents_per_hour_min, agents_per_hour_max)

        # 应用时段乘数
        if hour_of_day in peak_hours:
            target_count = int(base_count * peak_multiplier)
        elif hour_of_day in off_peak_hours:
            target_count = int(base_count * off_peak_multiplier)
        elif hour_of_day in morning_hours:
            target_count = int(base_count * morning_multiplier)
        else:
            target_count = int(base_count * work_multiplier)

        # 选择活跃智能体
        candidates = []
        for agent in all_agents:
            if hour_of_day in agent.active_hours:
                if random.random() < agent.activity_level:
                    candidates.append(agent)
        active_agents = random.sample(candidates, min(target_count, len(candidates)))

        # 在两个平台上执行行为
        for platform_env in [twitter_env, reddit_env]:
            actions = platform_env.step(active_agents)
            for action in actions:
                log_manager.log_action(platform, round_num, action)

    # 10. 仿真结束后进入 IPC 等待模式
    if not args.no_wait:
        ipc_server = SimulationIPCServer(simulation_dir)
        ipc_server.start()
        while True:
            command = ipc_server.poll_commands()
            if command:
                handle_ipc_command(command, twitter_env, reddit_env)
时间模拟机制

MiroFish 使用离散时间步进模拟,核心参数:

total_simulation_hours = 72    → 模拟现实中的 72 小时
minutes_per_round = 60         → 每轮代表 60 分钟
total_rounds = 72 * 60 / 60 = 72 轮

每轮代表的时刻 = round_num × minutes_per_round / 60 (小时)
例: 第 5 轮 = 5 × 60 / 60 = 第 5 小时 = 凌晨 5 点

8.3 run_twitter_simulation.py — Twitter 单平台仿真

文件: backend/scripts/run_twitter_simulation.py

与并行仿真类似,但仅运行 Twitter 平台。

关键差异:

  • 人设格式: CSV(twitter_profiles.csv
  • 数据库: twitter_simulation.db
  • 可用动作: 6 种(CREATE_POST, LIKE_POST, REPOST, FOLLOW, DO_NOTHING, QUOTE_POST)
  • IPCHandler 只处理 Twitter 环境的命令

智能体选择逻辑:

def select_active_agents(all_agents, config, simulated_hour):
    hour_of_day = int(simulated_hour) % 24

    # 1. 计算基础范围
    base_min = config["agents_per_hour_min"]
    base_max = config["agents_per_hour_max"]
    base_count = random.randint(base_min, base_max)

    # 2. 应用时段乘数
    if hour_of_day in config["peak_hours"]:
        multiplier = config.get("peak_activity_multiplier", 1.5)
    elif hour_of_day in config["off_peak_hours"]:
        multiplier = config.get("off_peak_activity_multiplier", 0.3)
    else:
        multiplier = 1.0
    target_count = int(base_count * multiplier)

    # 3. 过滤候选智能体
    candidates = []
    for agent in all_agents:
        agent_config = config["agent_configs"].get(str(agent.id))
        if agent_config:
            if hour_of_day in agent_config.get("active_hours", range(24)):
                if random.random() < agent_config.get("activity_level", 0.5):
                    candidates.append(agent)

    # 4. 随机抽样
    return random.sample(candidates, min(target_count, len(candidates)))

8.4 run_reddit_simulation.py — Reddit 单平台仿真

文件: backend/scripts/run_reddit_simulation.py

与 Twitter 版本对称,但针对 Reddit 平台:

关键差异:

  • 人设格式: JSON(reddit_profiles.json),包含更丰富的字段
  • 数据库: reddit_simulation.db
  • 可用动作: 13 种,包括 DISLIKE_POST, CREATE_COMMENT, SEARCH_POSTS 等
  • Reddit 特有的 TREND 和 REFRESH 动作

8.5 test_profile_format.py — 人设格式测试

文件: backend/scripts/test_profile_format.py
职责: 验证生成的人设文件格式是否符合 OASIS 要求

测试项目:

  1. Twitter CSV 格式:

    • 必需字段: user_id, user_name, name, bio, friend_count, follower_count, statuses_count, created_at
    • 使用 csv.DictReader 验证
  2. Reddit JSON 格式:

    • 必需字段: realname, username, bio, persona
    • 可选字段: age, gender, mbti, country, profession, interested_topics

第九部分:前端架构与实现

9.1 技术架构

前端使用 Vue 3 + Vite 构建,采用单页应用(SPA)架构。

项目配置

package.json:

{
    "dependencies": {
        "vue": "^3.5.24",
        "vue-router": "^4.6.3",
        "axios": "axios",
        "d3": "d3"
    },
    "scripts": {
        "dev": "vite --port 3000 --open",
        "build": "vite build",
        "preview": "vite preview"
    }
}

vite.config.js:

export default defineConfig({
    plugins: [vue()],
    server: {
        port: 3000,
        open: true,
        proxy: {
            '/api': {
                target: 'http://localhost:5001',  // API 代理到后端
                changeOrigin: true
            }
        }
    }
})
index.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <!-- Google Fonts: Inter, JetBrains Mono, Noto Sans SC, Space Grotesk -->
    <title>MiroFish - 预测万物</title>
</head>
<body>
    <div id="app"></div>
    <script type="module" src="/src/main.js"></script>
</body>
</html>

9.2 路由结构

文件: frontend/src/router/index.js

路径 组件 用途
/ Home.vue 首页(文件上传 & 项目选择)
/process/:projectId MainView.vue 图谱构建(步骤 1)
/simulation/:simulationId SimulationView.vue 环境搭建(步骤 2)
/simulation/:simulationId/start SimulationRunView.vue 仿真运行(步骤 3)
/report/:reportId ReportView.vue 报告生成(步骤 4)
/interaction/:reportId InteractionView.vue 深度交互(步骤 5)

路由参数通过 props: true 传递给组件。


9.3 状态管理

文件: frontend/src/store/pendingUpload.js

MiroFish 不使用 Vuex/Pinia,而是使用简单的响应式状态模块:

import { reactive } from 'vue'

const state = reactive({
    files: [],
    requirement: '',
    pending: false
})

export function setPendingUpload(files, requirement) {
    state.files = files
    state.requirement = requirement
    state.pending = true
}

export function getPendingUpload() {
    return { ...state }
}

export function clearPendingUpload() {
    state.files = []
    state.requirement = ''
    state.pending = false
}

设计理由: 这个状态存储解决了首页(Home)和处理页(Process)之间的数据传递问题。用户在首页选择文件和输入需求后,数据暂存在 pendingUpload 中,导航到处理页后读取并清除。


9.4 API 层

Axios 基础配置(api/index.js
const service = axios.create({
    baseURL: '/api',
    timeout: 300000  // 5 分钟超时(适应 LLM 长调用)
})

// 请求拦截器:日志记录
service.interceptors.request.use(config => {
    console.log(`[API] ${config.method.toUpperCase()} ${config.url}`)
    return config
})

// 响应拦截器:错误处理
service.interceptors.response.use(
    response => response.data,
    error => {
        if (error.code === 'ECONNABORTED') {
            console.error('请求超时')
        }
        return Promise.reject(error)
    }
)

// 带重试的请求封装
export function requestWithRetry(requestFn, maxRetries = 3) {
    // 指数退避重试
    return async function (...args) {
        for (let i = 0; i <= maxRetries; i++) {
            try {
                return await requestFn(...args)
            } catch (error) {
                if (i === maxRetries) throw error
                await sleep(Math.pow(2, i) * 1000)  // 1s, 2s, 4s
            }
        }
    }
}
Graph API(api/graph.js
export const generateOntology = (formData) =>
    requestWithRetry(() => service.post('/graph/ontology/generate', formData, {
        headers: { 'Content-Type': 'multipart/form-data' }
    }))()

export const buildGraph = (data) =>
    service.post('/graph/build', data)

export const getTaskStatus = (taskId) =>
    service.get(`/graph/task/${taskId}`)

export const getGraphData = (graphId) =>
    service.get(`/graph/data/${graphId}`)

export const getProject = (projectId) =>
    service.get(`/graph/project/${projectId}`)
Simulation API(api/simulation.js

包含 20+ 个 API 调用函数,覆盖仿真生命周期的每个阶段:

  • 创建、准备、启动、停止仿真
  • 查询状态、人设、配置
  • 获取行为日志和时间线
  • 智能体访谈
  • 历史记录
Report API(api/report.js
export const generateReport = (data) =>
    service.post('/report/generate', data)

export const getReportStatus = (reportId) =>
    service.get(`/report/${reportId}/progress`)

export const getAgentLog = (reportId, fromLine = 0) =>
    service.get(`/report/${reportId}/agent-log`, { params: { from_line: fromLine } })

export const getConsoleLog = (reportId, fromLine = 0) =>
    service.get(`/report/${reportId}/console-log`, { params: { from_line: fromLine } })

export const getReport = (reportId) =>
    service.get(`/report/${reportId}`)

export const chatWithReport = (data) =>
    service.post('/report/chat', data)

9.5 全局样式

App.vue 定义的全局样式:

/* 根字体 */
body {
    font-family: 'JetBrains Mono', 'Space Grotesk', 'Noto Sans SC', monospace;
}

/* 自定义暗色系滚动条 */
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: #f0f0f0; }
::-webkit-scrollbar-thumb { background: #ccc; border-radius: 3px; }

/* 全局按钮样式 */
button { font-family: inherit; }

色板:

  • 主色: #000000(黑色)、#FFFFFF(白色)
  • 强调色: #FF4500(橙红色)
  • 灰色: #666666(文字)、#E5E5E5(边框)

动画:

  • blink: 1 秒透明度脉冲
  • pulse: 边框脉冲效果
  • 统一使用 cubic-bezier(0.4, 0, 0.2, 1) 缓动曲线

9.6 轮询策略

前端采用定时轮询获取后端异步操作的状态:

场景 轮询间隔 停止条件
任务状态(图谱构建) 2 秒 status === "completed""failed"
图谱数据更新 10 秒 一次性获取后停止
人设生成进度 3 秒 所有人设生成完毕
仿真配置进度 2 秒 配置生成完毕
仿真运行状态 2 秒 runner_status === "completed"
报告生成进度 2 秒 status === "completed"
Agent 日志 3 秒 报告生成完毕

第十部分:前端视图组件详解

10.1 Home.vue — 首页

路径: frontend/src/views/Home.vue
路由: /

页面布局

双栏布局:左侧信息展示区 + 右侧控制台风格操作区

┌─────────────────────────┬──────────────────────────┐
│                         │                          │
│  Hero Section           │  Console Section         │
│  - 渐变标题             │  - 文件上传区域          │
│  - 项目描述             │    (拖拽 + 点击)          │
│  - 功能特点             │  - 仿真需求输入框        │
│                         │  - 已选文件列表          │
│                         │  - 启动按钮              │
│                         │                          │
├─────────────────────────┴──────────────────────────┤
│  HistoryDatabase Component                         │
│  - 历史项目/仿真记录                                │
└────────────────────────────────────────────────────┘
核心交互

文件上传:

// 支持拖拽和点击两种方式
addFiles(newFiles) {
    for (const file of newFiles) {
        const ext = file.name.split('.').pop().toLowerCase()
        if (['pdf', 'md', 'txt', 'markdown'].includes(ext)) {
            this.files.push(file)
        } else {
            alert(`不支持的文件格式: ${ext}`)
        }
    }
}

启动仿真流程:

startSimulation() {
    if (!this.canSubmit) return
    // 暂存文件和需求到全局状态
    setPendingUpload(this.files, this.formData.simulationRequirement)
    // 导航到处理页(使用特殊标记 'new' 表示新项目)
    this.$router.push('/process/new')
}

10.2 MainView.vue — 主处理视图

路径: frontend/src/views/MainView.vue
路由: /process/:projectId

功能概述

这是步骤 1-2 的容器视图,包含左右双面板:

  • 左面板: GraphPanel(知识图谱可视化)
  • 右面板: Step1GraphBuild 或 Step2EnvSetup 组件
三种视图模式
viewMode: 'graph' | 'split' | 'workbench'

// graph: 左面板 100%,右面板隐藏
// split: 左右各 50%
// workbench: 左面板隐藏,右面板 100%
项目初始化逻辑
async initProject() {
    if (this.projectId === 'new') {
        // 新项目: 从 pendingUpload 获取文件和需求
        const { files, requirement } = getPendingUpload()
        clearPendingUpload()
        await this.handleNewProject(files, requirement)
    } else {
        // 已有项目: 加载项目数据
        await this.loadProject(this.projectId)
    }
}

handleNewProject 流程:

  1. 构建 FormData(文件 + 仿真需求)
  2. 调用 generateOntology(formData) API
  3. 获取返回的 project_id
  4. 更新路由到 /process/{project_id}
  5. 设置 currentPhase = 0(本体论已生成)

图谱构建触发:

async startBuildGraph() {
    const res = await buildGraph({
        project_id: this.projectId,
        graph_name: this.projectData.name,
        chunk_size: 500,
        chunk_overlap: 50
    })
    this.startPollingTask(res.data.task_id)
}

startPollingTask(taskId) {
    this.pollInterval = setInterval(() => this.pollTaskStatus(taskId), 2000)
}
系统日志
systemLogs: []  // 活动日志数组

// 添加日志
addLog(message) {
    this.systemLogs.push({
        timestamp: new Date().toISOString(),
        message: message
    })
}

10.3 SimulationView.vue / SimulationRunView.vue

SimulationView.vue 路由 /simulation/:simulationId:

  • 容器视图,嵌套 Step2EnvSetup 组件
  • 加载仿真状态和关联的项目/图谱数据

SimulationRunView.vue 路由 /simulation/:simulationId/start:

  • 容器视图,嵌套 Step3Simulation 组件
  • 管理仿真运行的视图切换

10.4 ReportView.vue — 报告视图

路径: frontend/src/views/ReportView.vue
路由: /report/:reportId

布局: 左面板(GraphPanel)+ 右面板(Step4Report)

数据加载链:

async loadReportData() {
    // 1. 通过 reportId 获取报告 → 得到 simulationId
    // 2. 通过 simulationId 获取仿真状态 → 得到 projectId, graphId
    // 3. 通过 projectId 获取项目数据
    // 4. 通过 graphId 获取图谱可视化数据
}

默认以 workbench 模式显示(Step4Report 全屏),因为报告生成阶段图谱可视化相对不重要。


10.5 InteractionView.vue — 交互视图

路径: frontend/src/views/InteractionView.vue
路由: /interaction/:reportId

布局: 与 ReportView 类似,但右面板为 Step5Interaction 组件。

状态管理:

data() {
    return {
        viewMode: 'workbench',  // 默认全屏交互面板
        simulationId: null,
        currentStatus: 'ready'  // ready | processing | completed | error
    }
}

第十一部分:前端步骤组件详解

11.1 Step1GraphBuild.vue — 图谱构建步骤

文件: frontend/src/components/Step1GraphBuild.vue

Props
Prop 类型 说明
currentPhase Number 0=本体论, 1=构建中, 2=完成
projectData Object 项目数据(含本体论)
ontologyProgress String 本体论生成消息
buildProgress Number 图谱构建进度 (0-100)
graphData Object 图谱节点/边数据
systemLogs Array 系统日志
显示内容

Phase 0 — 本体论展示:

  • 实体类型标签(可点击展开详情)
  • 关系类型标签(可点击展开详情)
  • 每个类型显示名称、描述、属性列表、示例
  • “开始构建图谱” 按钮

Phase 1 — 构建进度:

  • 进度条 + 百分比
  • 当前操作描述

Phase 2 — 构建完成:

  • 统计卡片: 节点数、边数、实体类型数
  • 本体论详情覆盖层
  • “进入环境搭建” 按钮
进入仿真的逻辑
async goToSimulation() {
    // 1. 调用 createSimulation API
    const res = await createSimulation({
        project_id: this.projectData.project_id,
        graph_id: this.projectData.graph_id,
        enable_twitter: true,
        enable_reddit: true
    })
    // 2. 导航到仿真页
    this.$router.push(`/simulation/${res.data.simulation_id}`)
}

11.2 Step2EnvSetup.vue — 环境搭建步骤

文件: frontend/src/components/Step2EnvSetup.vue(约 2000+ 行模板)

五阶段工作流
phase: 0 | 1 | 2 | 3 | 4

// Phase 0: 初始化(读取实体)
// Phase 1: 人设生成(实时显示进度)
// Phase 2: 配置生成(LLM 推理仿真参数)
// Phase 3: 环境编排(汇总配置)
// Phase 4: 完成(可以开始仿真)
实时人设展示
// 每 3 秒轮询 realtime profiles API
pollProfilesInterval = setInterval(async () => {
    const res = await getSimulationProfilesRealtime(simulationId, 'reddit')
    if (res.success) {
        this.profiles = res.data.profiles
        this.profilesProgress = `${res.data.generated}/${res.data.total}`
        if (res.data.completed) {
            clearInterval(this.pollProfilesInterval)
            this.phase = 2  // 进入配置生成阶段
        }
    }
}, 3000)
配置展示

时间配置:

  • 仿真总时长、每轮时长
  • 高峰/工作/早间/低谷时段定义
  • 各时段活跃度乘数

智能体配置展示:

  • 24 小时活跃时间线可视化
  • activity_level、stance、influence_weight 等参数
  • 点击查看完整人设模态框

平台配置:

  • Twitter/Reddit 推荐算法权重
  • viral_threshold、echo_chamber_strength
自定义轮数
// 用户可以通过滑块自定义最大轮数
useCustomRounds: false,
customMaxRounds: null,

// 推荐轮数计算
get recommendedRounds() {
    const timeConfig = this.simulationConfig?.time_config
    if (timeConfig) {
        return Math.ceil(
            timeConfig.total_simulation_hours * 60 / timeConfig.minutes_per_round
        )
    }
    return 72  // 默认
}

11.3 Step3Simulation.vue — 仿真运行步骤

文件: frontend/src/components/Step3Simulation.vue

双平台状态显示
┌─────────────────────────────────────────────────┐
│                  仿真运行面板                      │
│                                                   │
│  ┌──────────────────┐  ┌──────────────────┐      │
│  │ Twitter 信息广场  │  │ Reddit 话题社区   │      │
│  │ 轮次: 5/144      │  │ 轮次: 5/144      │      │
│  │ 行为: 150 条     │  │ 行为: 200 条     │      │
│  │ 状态: 运行中     │  │ 状态: 运行中      │      │
│  └──────────────────┘  └──────────────────┘      │
│                                                   │
│  ┌───────────────────────────────────────────┐   │
│  │ 时间线                                     │   │
│  │ 12:00 张三 发布帖子: "这个政策..."         │   │
│  │ 12:01 李四 点赞了张三的帖子               │   │
│  │ 12:02 王五 评论: "我同意..."              │   │
│  │ ...                                        │   │
│  └───────────────────────────────────────────┘   │
└─────────────────────────────────────────────────┘
行为类型显示映射
const actionTypeMap = {
    'CREATE_POST': { label: '发帖', color: '#4CAF50' },
    'REPOST': { label: '转发', color: '#2196F3' },
    'QUOTE_POST': { label: '引用', color: '#9C27B0' },
    'LIKE_POST': { label: '点赞', color: '#FF9800' },
    'CREATE_COMMENT': { label: '评论', color: '#00BCD4' },
    'FOLLOW': { label: '关注', color: '#E91E63' },
    'SEARCH_POSTS': { label: '搜索', color: '#607D8B' },
    'UPVOTE_POST': { label: '赞同', color: '#8BC34A' },
    'DOWNVOTE_POST': { label: '反对', color: '#F44336' },
    'DO_NOTHING': { label: '观望', color: '#9E9E9E' }
}
实时状态轮询
// 每 2 秒获取运行状态
pollRunStatus() {
    this.statusInterval = setInterval(async () => {
        const res = await getRunStatus(this.simulationId)
        if (res.success) {
            this.runStatus = res.data
            // 检查完成
            if (res.data.runner_status === 'completed') {
                this.phase = 2
                clearInterval(this.statusInterval)
            }
        }
    }, 2000)
}

// 每 2 秒获取最新行为
pollActions() {
    this.actionsInterval = setInterval(async () => {
        const res = await getSimulationActions(this.simulationId, {
            limit: 50, offset: 0
        })
        if (res.success) {
            // 使用 actionIds Set 去重
            for (const action of res.data) {
                const id = `${action.round}_${action.agent_id}_${action.timestamp}`
                if (!this.actionIds.has(id)) {
                    this.actionIds.add(id)
                    this.allActions.unshift(action)  // 最新的在前面
                }
            }
        }
    }, 2000)
}
耗时计算
computed: {
    elapsedTime() {
        if (!this.runStatus?.started_at) return '00:00:00'
        const start = new Date(this.runStatus.started_at)
        const now = new Date()
        const diff = Math.floor((now - start) / 1000)
        const hours = Math.floor(diff / 3600)
        const minutes = Math.floor((diff % 3600) / 60)
        const seconds = diff % 60
        return `${pad(hours)}:${pad(minutes)}:${pad(seconds)}`
    }
}

11.4 Step4Report.vue — 报告生成步骤

文件: frontend/src/components/Step4Report.vue

双面板布局
┌──────────────────────┬──────────────────────┐
│  报告内容面板         │  工作流面板            │
│                      │                      │
│  [章节1] ▼           │  度量指标             │
│  内容...             │  - 已完成章节: 3/8    │
│                      │  - 耗时: 5:23         │
│  [章节2] ▼           │  - 工具调用: 15       │
│  内容...             │                      │
│                      │  工作流步骤           │
│  [章节3] ▼           │  ✓ 开始生成           │
│  (生成中...)         │  ✓ 规划大纲           │
│                      │  → 生成第3章          │
│                      │  ○ 生成第4章          │
│                      │                      │
│                      │  Agent 日志           │
│                      │  [tool_call] insight_ │
│                      │  [tool_result] 找到... │
│                      │  [llm_response] ...   │
└──────────────────────┴──────────────────────┘
Agent 日志时间线
// 不同类型日志的显示配置
const logTypeConfig = {
    'report_start': { icon: '▶', color: '#4CAF50' },
    'planning_start': { icon: '📋', color: '#2196F3' },
    'section_start': { icon: '📝', color: '#FF9800' },
    'tool_call': { icon: '🔧', color: '#9C27B0' },
    'tool_result': { icon: '📊', color: '#00BCD4' },
    'llm_response': { icon: '🤖', color: '#607D8B' },
    'section_complete': { icon: '✅', color: '#4CAF50' },
    'report_complete': { icon: '🎉', color: '#FFD700' },
    'error': { icon: '❌', color: '#F44336' }
}
工具调用展示

每种工具有独特的配色和图标:

const toolConfigs = {
    'insight_forge': {
        label: 'InsightForge 深度分析',
        color: '#9C27B0',
        description: '自动生成子问题,多维度深度搜索'
    },
    'panorama_search': {
        label: 'PanoramaSearch 全景搜索',
        color: '#2196F3',
        description: '全图谱范围搜索,包含历史数据'
    },
    'quick_search': {
        label: 'QuickSearch 快速搜索',
        color: '#FF9800',
        description: '精准关键词快速查找'
    },
    'interview_agents': {
        label: 'InterviewAgents 智能体访谈',
        color: '#4CAF50',
        description: '与仿真中的智能体进行访谈'
    }
}
实时报告渲染
// 增量获取 Agent 日志
async pollAgentLogs() {
    const res = await getAgentLog(this.reportId, this.lastLogLine)
    if (res.success && res.data.logs.length > 0) {
        this.agentLogs.push(...res.data.logs)
        this.lastLogLine = res.data.to_line

        // 检测新章节内容
        for (const log of res.data.logs) {
            if (log.action === 'section_content') {
                const idx = log.details.section_index
                this.generatedSections[idx] = log.details.content
            }
        }
    }
}

11.5 Step5Interaction.vue — 深度交互步骤

文件: frontend/src/components/Step5Interaction.vue

三种交互模式
activeTab: 'chat' | 'survey'
chatTarget: 'report_agent' | 'agent'

// 模式1: 与 ReportAgent 对话(基于报告内容的问答)
// 模式2: 与仿真世界中的智能体 1 对 1 对话
// 模式3: 批量调研(向多个智能体发送同一问题)
与 ReportAgent 对话
async sendToReportAgent(message) {
    this.chatHistory.push({ role: 'user', content: message })

    const res = await chatWithReport({
        simulation_id: this.simulationId,
        message: message,
        chat_history: this.chatHistory.slice(0, -1)  // 排除当前消息
    })

    this.chatHistory.push({
        role: 'assistant',
        content: res.data.response,
        tools: res.data.tool_calls,
        sources: res.data.sources
    })
}
与仿真智能体对话
async sendToAgent(message) {
    const agent = this.profiles[this.selectedAgent]
    this.chatHistory.push({ role: 'user', content: message })

    const res = await interviewAgents({
        simulation_id: this.simulationId,
        interviews: [{
            agent_id: agent.user_id,
            agent_name: agent.realname || agent.name,
            prompt: message
        }]
    })

    const interview = res.data.interviews[0]
    this.chatHistory.push({
        role: 'assistant',
        content: interview.response,
        agent_name: interview.agent_name
    })
}
批量调研
async submitSurvey() {
    const interviews = this.selectedAgents.map(idx => ({
        agent_id: this.profiles[idx].user_id,
        agent_name: this.profiles[idx].realname || this.profiles[idx].name,
        prompt: this.surveyQuestion
    }))

    const res = await interviewAgents({
        simulation_id: this.simulationId,
        interviews: interviews
    })

    this.surveyResults = res.data.interviews
}
聊天历史缓存
// 在切换对话目标时保存/恢复聊天历史
chatHistoryCache: {}  // key: chatTarget, value: chatHistory

switchChatTarget(target) {
    // 保存当前历史
    this.chatHistoryCache[this.chatTarget] = [...this.chatHistory]
    // 恢复目标历史
    this.chatTarget = target
    this.chatHistory = this.chatHistoryCache[target] || []
}

第十二部分:前端子组件详解

12.1 GraphPanel.vue — 知识图谱可视化

文件: frontend/src/components/GraphPanel.vue
技术: D3.js 力导向图(Force-Directed Graph)

D3 力模拟配置
this.simulation = d3.forceSimulation(nodes)
    .force('link', d3.forceLink(edges).id(d => d.uuid).distance(150))
    .force('charge', d3.forceManyBody().strength(-300))
    .force('center', d3.forceCenter(width / 2, height / 2))
    .force('collision', d3.forceCollide().radius(30))

力参数说明:

  • forceLink: 边的弹簧力,自然长度 150px
  • forceManyBody: 节点间斥力,强度 -300
  • forceCenter: 居中力
  • forceCollide: 碰撞检测,半径 30px
多边处理(Multi-Edge)

当两个节点之间有多条边时,使用曲线路径避免重叠:

// 计算曲率
function getCurvature(edge, allEdges) {
    const parallel = allEdges.filter(e =>
        (e.source === edge.source && e.target === edge.target) ||
        (e.source === edge.target && e.target === edge.source)
    )
    if (parallel.length <= 1) return 0
    const idx = parallel.indexOf(edge)
    const step = 0.3  // 曲率步进
    return (idx - (parallel.length - 1) / 2) * step
}

// SVG 路径生成
function drawEdge(d) {
    const curvature = getCurvature(d, allEdges)
    if (curvature === 0) {
        return `M${d.source.x},${d.source.y} L${d.target.x},${d.target.y}`
    }
    const midX = (d.source.x + d.target.x) / 2
    const midY = (d.source.y + d.target.y) / 2
    const dx = d.target.x - d.source.x
    const dy = d.target.y - d.source.y
    const cpX = midX - curvature * dy
    const cpY = midY + curvature * dx
    return `M${d.source.x},${d.source.y} Q${cpX},${cpY} ${d.target.x},${d.target.y}`
}
自环检测
// 节点指向自身的边
if (edge.source === edge.target) {
    const r = 40  // 自环半径
    return `M${x},${y} C${x+r},${y-r} ${x+r},${y+r} ${x},${y}`
}
交互功能
  • 拖拽: D3 drag 行为,拖拽时固定节点位置
  • 缩放/平移: D3 zoom 行为
  • 节点选中: 点击节点显示详情面板(名称、类型、摘要、属性)
  • 边选中: 点击边显示关系详情(关系名、事实描述、源/目标节点)
  • 标签切换: 勾选/取消实体类型的可见性
  • 边标签显示: 开关控制是否显示边上的关系名称
实体类型配色
computed: {
    entityTypes() {
        const types = new Set()
        this.graphData.nodes.forEach(n => {
            n.labels?.forEach(l => {
                if (l !== 'Entity' && l !== 'Node') types.add(l)
            })
        })
        const colors = d3.schemeCategory10
        return [...types].map((type, i) => ({
            name: type,
            color: colors[i % colors.length],
            count: this.graphData.nodes.filter(n => n.labels?.includes(type)).length
        }))
    }
}
渲染更新
// 每帧更新位置
simulation.on('tick', () => {
    // 更新边路径
    edgeElements.attr('d', drawEdge)
    // 更新边标签位置
    edgeLabelElements.attr('transform', d => {
        const midX = (d.source.x + d.target.x) / 2
        const midY = (d.source.y + d.target.y) / 2
        return `translate(${midX},${midY})`
    })
    // 更新节点位置
    nodeElements.attr('transform', d => `translate(${d.x},${d.y})`)
})

12.2 HistoryDatabase.vue — 历史记录组件

文件: frontend/src/components/HistoryDatabase.vue
用途: 在首页展示历史项目和仿真记录

核心特性

IntersectionObserver 自动展开:

mounted() {
    this.observer = new IntersectionObserver(entries => {
        entries.forEach(entry => {
            if (entry.isIntersecting) {
                this.isExpanded = true
            }
        })
    }, { threshold: 0.1 })
    this.observer.observe(this.$el)
}

卡片布局:

┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│ 项目 1            │  │ 项目 2            │  │ 项目 3            │
│ ────────────      │  │ ────────────      │  │ ────────────      │
│ 📄 report.pdf    │  │ 📄 novel.md      │  │ 📄 data.txt      │
│ 📊 21 实体       │  │ 📊 35 实体       │  │ 📊 12 实体       │
│ 🔄 72 轮         │  │ 🔄 144 轮        │  │ 🔄 36 轮         │
│ ● 已完成         │  │ ● 运行中         │  │ ● 未开始          │
│                   │  │                   │  │                   │
│ [查看] [继续]    │  │ [查看] [继续]    │  │ [查看] [继续]    │
└──────────────────┘  └──────────────────┘  └──────────────────┘

进度状态映射:

  • not-started: 灰色 — 已创建但未运行
  • in-progress: 蓝色动画 — 正在运行
  • completed: 绿色 — 已完成

文件类型颜色(莫兰迪色系):

const fileTypeColors = {
    PDF: '#C5A3A3',   // 柔和红
    DOC: '#A3B5C5',   // 柔和蓝
    XLS: '#A3C5A3',   // 柔和绿
    PPT: '#C5B5A3',   // 柔和橙
    TXT: '#B5B5C5',   // 柔和紫
    CODE: '#A3C5C5',  // 柔和青
    IMG: '#C5A3C5',   // 柔和粉
    ZIP: '#B5C5A3'    // 柔和黄绿
}

步骤导航:

navigateToStep(project, step) {
    switch (step) {
        case 'graph':
            this.$router.push(`/process/${project.project_id}`)
            break
        case 'simulation':
            this.$router.push(`/simulation/${project.simulation_id}`)
            break
        case 'report':
            this.$router.push(`/report/${project.report_id}`)
            break
        case 'interaction':
            this.$router.push(`/interaction/${project.report_id}`)
            break
    }
}

第十三部分:部署与运维

13.1 源码部署

前置要求
工具 版本 安装检查
Node.js 18+ node -v
Python ≥3.11, ≤3.12 python --version
uv 最新版 uv --version
安装步骤
# 1. 克隆项目
git clone https://github.com/666ghj/MiroFish.git
cd MiroFish

# 2. 配置环境变量
cp .env.example .env
# 编辑 .env,填入 LLM_API_KEY 和 ZEP_API_KEY

# 3. 一键安装所有依赖
npm run setup:all
# 等同于:
#   npm install          (根目录 concurrently)
#   cd frontend && npm install  (前端依赖)
#   cd backend && uv sync       (Python 虚拟环境 + 依赖)

# 4. 启动开发服务器
npm run dev
# 等同于:
#   concurrently "cd backend && uv run python run.py" "cd frontend && npm run dev"

服务地址:

  • 前端: http://localhost:3000
  • 后端 API: http://localhost:5001
分步安装
# 仅安装 Node 依赖
npm run setup

# 仅安装 Python 依赖
npm run setup:backend

# 仅启动后端
npm run backend

# 仅启动前端
npm run frontend

# 构建前端生产版本
npm run build

13.2 Docker 部署

docker-compose.yml 配置
services:
  mirofish:
    image: ghcr.io/666ghj/mirofish:latest
    # 备用镜像(国内加速):
    # image: ghcr.nju.edu.cn/666ghj/mirofish:latest
    ports:
      - "3000:3000"    # 前端
      - "5001:5001"    # 后端
    volumes:
      - ./backend/uploads:/app/backend/uploads  # 持久化数据
    env_file:
      - .env           # 读取环境变量
    restart: unless-stopped
部署步骤
# 1. 配置环境变量
cp .env.example .env
# 编辑 .env

# 2. 拉取镜像并启动
docker compose up -d

# 3. 查看日志
docker compose logs -f
Dockerfile 分析
FROM python:3.11

# 安装 Node.js 18+
RUN curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
    apt-get install -y nodejs

# 安装 uv(从官方镜像复制)
COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/

# 层优化: 先复制依赖配置文件
COPY package.json package-lock.json ./
COPY frontend/package.json frontend/package-lock.json frontend/
COPY backend/pyproject.toml backend/uv.lock backend/

# 安装依赖(利用 Docker 缓存)
RUN npm install && cd frontend && npm install
RUN cd backend && uv sync

# 复制源码
COPY . .

EXPOSE 3000 5001
CMD ["npm", "run", "dev"]

13.3 CI/CD 流水线

文件: .github/workflows/docker-image.yml

on:
  workflow_dispatch:     # 手动触发
  push:
    tags: ["*"]          # 推送 tag 时触发

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-qemu-action@v3      # 多架构支持
      - uses: docker/setup-buildx-action@v3
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/metadata-action@v5         # 自动标签
        id: meta
      - uses: docker/build-push-action@v5
        with:
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}

推送目标: ghcr.io/{owner}/mirofish
标签策略: Git ref + commit SHA + latest


13.4 日志管理

后端日志
backend/logs/
├── 2025-01-01.log      # 日期轮转日志
├── 2025-01-02.log
└── ...

# 每个文件最大 10MB,保留 5 个备份
# 日志级别: DEBUG(文件)、INFO(控制台)
仿真日志
backend/uploads/simulations/sim_xxx/
├── simulation.log          # 仿真进程标准输出
├── twitter/actions.jsonl   # Twitter 行为日志
└── reddit/actions.jsonl    # Reddit 行为日志
报告日志
backend/uploads/reports/report_xxx/
├── agent_log.jsonl    # 结构化 Agent 执行日志
└── console_log.txt    # 纯文本控制台日志

13.5 数据持久化

所有项目数据存储在 backend/uploads/ 目录。Docker 部署时,该目录通过 volume 映射到宿主机,确保容器重启后数据不丢失。

注意事项:

  • TaskManager 是纯内存存储,重启后任务进度丢失(但仿真状态文件保留)
  • 仿真运行中的子进程在容器重启后终止,需要手动重新启动
  • Zep Cloud 中的图谱数据由 Zep 管理,不依赖本地存储

第十四部分:完整工作流程

14.1 端到端流程详解

以下是用户从上传文档到获取预测报告的完整端到端流程,涉及的每一步技术细节。

流程 1:文档上传与本体论生成
用户操作: 在首页上传 PDF/MD/TXT 文件,输入仿真需求描述
技术流程:

[浏览器]
  │
  ├─ 1. 用户选择文件 + 输入仿真需求
  │     Home.vue → setPendingUpload(files, requirement)
  │     导航到 /process/new
  │
  ├─ 2. MainView.vue 检测 projectId='new'
  │     从 pendingUpload 获取 files 和 requirement
  │     构建 FormData (multipart/form-data)
  │
  ├─ 3. POST /api/graph/ontology/generate
  │     │
  │     ▼
[Flask 后端]
  │
  ├─ 4. ProjectManager.create_project() → proj_xxxxx
  │     创建 uploads/projects/proj_xxxxx/ 目录
  │
  ├─ 5. 保存文件 → projects/proj_xxxxx/files/
  │
  ├─ 6. FileParser.extract_text() 对每个文件:
  │     PDF → PyMuPDF 逐页提取文本
  │     MD/TXT → UTF-8 多级回退读取
  │
  ├─ 7. TextProcessor.preprocess_text()
  │     规范化换行、去除连续空行
  │     保存到 extracted_text.txt
  │
  ├─ 8. OntologyGenerator.generate()
  │     │
  │     ├─ 构建 user_message(文档文本 + 仿真需求)
  │     ├─ 调用 LLM(system=ONTOLOGY_SYSTEM_PROMPT)
  │     │     │
  │     │     ▼
  │     │  [LLM API]
  │     │  分析文档内容和仿真需求
  │     │  返回 JSON:
  │     │    entity_types: 10 种实体类型
  │     │    edge_types: 6-10 种关系类型
  │     │    analysis_summary: 中文分析摘要
  │     │
  │     ├─ _validate_and_process()
  │     │     确保 10 个实体类型
  │     │     确保 Person 和 Organization 兜底类型存在
  │     │     验证属性名不冲突
  │     │
  │     └─ 返回 ontology dict
  │
  ├─ 9. 更新 Project 状态 → ONTOLOGY_GENERATED
  │     保存 ontology 到 project.json
  │
  └─ 10. 返回响应(project_id, ontology, analysis_summary)
        │
        ▼
[浏览器]
  │
  └─ 11. MainView 更新路由 /process/{project_id}
         Step1GraphBuild 展示本体论结果
         显示实体类型标签、关系类型标签
流程 2:知识图谱构建
用户操作: 点击"开始构建图谱"

[浏览器]
  │
  ├─ 1. POST /api/graph/build { project_id, chunk_size:500, chunk_overlap:50 }
  │     │
  │     ▼
[Flask 后端]
  │
  ├─ 2. 验证 Project 状态 = ONTOLOGY_GENERATED
  │
  ├─ 3. 创建 Task(task_type="构建图谱")→ task_id
  │     更新 Project 状态 → GRAPH_BUILDING
  │
  ├─ 4. 启动后台线程(daemon=True)
  │     │
  │     ├─ 5. TextProcessor.split_text(extracted_text, 500, 50)
  │     │     → 文本分成 N 个块,每块约 500 字符
  │     │
  │     ├─ 6. GraphBuilderService.create_graph("项目名")
  │     │     → 调用 Zep API 创建图谱 → graph_id = mirofish_{uuid}
  │     │
  │     ├─ 7. GraphBuilderService.set_ontology(graph_id, ontology)
  │     │     │
  │     │     ├─ 对每个 entity_type 动态创建 Pydantic 模型
  │     │     │     class Student(BaseModel):
  │     │     │         major: Optional[str] = Field(description="专业")
  │     │     │         grade: Optional[str] = Field(description="年级")
  │     │     │
  │     │     ├─ 对每个 edge_type 动态创建 Pydantic 模型
  │     │     │     class STUDIES_AT(BaseModel):
  │     │     │         ...
  │     │     │
  │     │     └─ 调用 client.graph.set_ontology()
  │     │         │
  │     │         ▼
  │     │      [Zep Cloud] 注册本体论到图谱
  │     │
  │     ├─ 8. GraphBuilderService.add_text_batches()
  │     │     每批 3 个 chunk → client.graph.add_batch()
  │     │     批间间隔 1 秒
  │     │     → 返回 episode UUID 列表
  │     │     │
  │     │     ▼
  │     │  [Zep Cloud] 接收文本,进行:
  │     │    - 实体识别(NER)
  │     │    - 关系抽取
  │     │    - 根据本体论分类实体
  │     │    - 构建知识图谱节点和边
  │     │
  │     ├─ 9. _wait_for_episodes(episode_uuids, timeout=600)
  │     │     每 3 秒查询 episode.processed 状态
  │     │     直到所有 episode 处理完毕
  │     │
  │     ├─ 10. _get_graph_info(graph_id)
  │     │     使用分页工具获取所有节点和边
  │     │     统计: node_count, edge_count, entity_types
  │     │
  │     └─ 11. 更新 Project 状态 → GRAPH_COMPLETED
  │           保存 graph_id 到 project.json
  │           完成 Task(progress=100)
  │
  └─ 返回 { task_id }
        │
        ▼
[浏览器]
  │
  └─ 每 2 秒轮询 GET /api/graph/task/{task_id}
     → 更新进度条
     → 完成后加载图谱数据
     → GraphPanel 渲染 D3 力导向图
流程 3:仿真环境准备
用户操作: 点击"进入环境搭建"

[浏览器]
  │
  ├─ 1. POST /api/simulation/create
  │     → simulation_id = sim_xxxxx
  │
  ├─ 2. 导航到 /simulation/{simulation_id}
  │
  ├─ 3. POST /api/simulation/prepare
  │     │
  │     ▼
[Flask 后端] → 后台线程
  │
  ├─ 阶段 1: 读取实体 (进度 0-20%)
  │     ZepEntityReader.filter_defined_entities()
  │     → 获取所有图谱节点
  │     → 过滤: 仅保留本体论定义的实体类型
  │     → 丰富: 为每个实体获取关联边和节点
  │     → 结果: FilteredEntities (例 42 个实体, 8 种类型)
  │
  ├─ 阶段 2: 生成人设 (进度 20-70%)
  │     OasisProfileGenerator(graph_id=xxx)
  │     │
  │     ├─ 对每个实体(5 线程并行):
  │     │     ├─ _build_entity_context()
  │     │     │     合并属性 + 关系 + Zep 搜索结果
  │     │     │
  │     │     ├─ _generate_profile_with_llm()
  │     │     │     │
  │     │     │     ▼
  │     │     │  [LLM API]
  │     │     │  输入: 实体名、类型、摘要、上下文
  │     │     │  输出 JSON:
  │     │     │    bio: "北京大学计算机系大四学生..."
  │     │     │    persona: "张三是一名对技术充满热情的..."
  │     │     │    age: 22, gender: "male", mbti: "INTJ"
  │     │     │    country: "China", profession: "Student"
  │     │     │    interested_topics: ["AI", "开源"]
  │     │     │
  │     │     └─ 写入 reddit_profiles.json(实时)
  │     │
  │     └─ 42 个 OasisAgentProfile 生成完毕
  │
  ├─ 阶段 3: 生成仿真配置 (进度 70-90%)
  │     SimulationConfigGenerator
  │     │
  │     ├─ _generate_time_config() → [LLM] → 时间参数
  │     │     total_simulation_hours: 72
  │     │     minutes_per_round: 60
  │     │     agents_per_hour_min: 3, max: 9
  │     │     peak_hours: [19,20,21,22]
  │     │
  │     ├─ _generate_event_config() → [LLM] → 事件/话题
  │     │     hot_topics: ["政策改革", "学生权益"]
  │     │     initial_posts: [{content: "...", poster_type: "Student"}]
  │     │     narrative_direction: "关注政策对学生的影响"
  │     │
  │     ├─ _generate_agent_configs_batch() → [LLM] × N 批
  │     │     每批 15 个智能体的活动参数
  │     │
  │     ├─ _assign_initial_post_agents()
  │     │     匹配初始帖子发布者
  │     │
  │     └─ 保存 simulation_config.json
  │
  └─ 更新状态 → READY
        │
        ▼
[浏览器]
  │
  └─ Step2EnvSetup 实时展示:
     → 人设生成进度(每 3 秒轮询)
     → 配置参数预览
     → 时间线可视化
     → 自定义轮数滑块
流程 4:仿真执行
用户操作: 设置轮数,点击"开始模拟"

[浏览器]
  │
  ├─ POST /api/simulation/start
  │   { simulation_id, platform:"parallel", max_rounds:72 }
  │     │
  │     ▼
[Flask 后端]
  │
  ├─ SimulationRunner.start_simulation()
  │     │
  │     ├─ 构建命令:
  │     │   python run_parallel_simulation.py --config config.json --max-rounds 72
  │     │
  │     ├─ subprocess.Popen(cmd, start_new_session=True)
  │     │     → 启动独立进程组
  │     │     → stdout/stderr → simulation.log
  │     │
  │     └─ 启动 _monitor_simulation 线程
  │           │
  │           ▼
[OASIS 仿真子进程]
  │
  ├─ 加载 simulation_config.json
  ├─ 加载 reddit_profiles.json / twitter_profiles.csv
  ├─ 初始化 Twitter 环境 (6 种动作)
  ├─ 初始化 Reddit 环境 (13 种动作)
  │
  ├─ 执行初始事件(种子帖子)
  │
  ├─ 主循环 (72 轮):
  │     │
  │     ├─ 计算当前模拟时间 (round × 60min / 60 = 小时)
  │     ├─ 确定活跃智能体数量(时段乘数调节)
  │     │     高峰(19-22): × 1.5
  │     │     低谷(0-5):  × 0.05
  │     │     早间(6-8):  × 0.4
  │     │     工作(9-18): × 0.7
  │     │
  │     ├─ 选择活跃智能体:
  │     │     过滤: 当前时段在 active_hours 中
  │     │     概率: random() < activity_level
  │     │     抽样: random.sample(candidates, target_count)
  │     │
  │     ├─ Twitter 环境执行:
  │     │     每个活跃智能体:
  │     │       [LLM] → 根据 persona + 当前帖子 → 决策行为
  │     │       → 执行 CREATE_POST / LIKE / REPOST / ...
  │     │       → 写入 twitter/actions.jsonl
  │     │
  │     ├─ Reddit 环境执行:
  │     │     类似,但有更多行为类型
  │     │     → 写入 reddit/actions.jsonl
  │     │
  │     └─ 写入轮次结束事件
  │
  ├─ 仿真完成,写入 simulation_end 事件
  │
  └─ 进入 IPC 等待模式(等待 interview 命令)
        │
        ▼
[监控线程] (Flask 内)
  │
  ├─ 每秒读取 actions.jsonl(增量读取)
  ├─ 解析 AgentAction,更新 RunState
  ├─ 检测 simulation_end 事件
  ├─ 可选: 将行为发送到 ZepGraphMemoryUpdater
  │     → 自然语言描述 → Zep 图谱更新
  └─ 保存 run_state.json
        │
        ▼
[浏览器]
  │
  └─ Step3Simulation 每 2 秒轮询:
     → 更新双平台进度
     → 时间线展示最新行为
     → 完成后显示"生成报告"按钮
流程 5:报告生成
用户操作: 点击"生成报告"

[浏览器]
  │
  ├─ POST /api/report/generate { simulation_id }
  │     │
  │     ▼
[Flask 后端] → 后台线程
  │
  ├─ 创建 ReportAgent(graph_id, simulation_id, requirement)
  │
  ├─ 规划阶段:
  │     [LLM] → 生成报告大纲 (ReportOutline)
  │     标题、摘要、8-12 个章节
  │
  ├─ 对每个章节 (ReACT 循环):
  │     │
  │     ├─ 思考: "我需要什么信息来写这个章节?"
  │     │
  │     ├─ 行动: 调用工具
  │     │     ├─ insight_forge → [Zep] 多维搜索 + 子问题分解
  │     │     ├─ panorama_search → [Zep] 全景搜索
  │     │     ├─ quick_search → [Zep] 快速搜索
  │     │     └─ interview_agents → [IPC] → [OASIS] 智能体访谈
  │     │
  │     ├─ 观察: 分析工具返回结果
  │     │
  │     ├─ 重复(最多 5 次工具调用,3 轮反思)
  │     │
  │     └─ 撰写: [LLM] 基于收集的信息生成章节内容
  │           → 保存 section_XX.md
  │           → 更新进度
  │
  ├─ 合并所有章节 → full_report.md
  │
  └─ 更新 Report 状态 → COMPLETED
        │
        ▼
[浏览器]
  │
  └─ Step4Report:
     → 左面板: 分节展示报告内容
     → 右面板: Agent 日志时间线
     → 完成后显示"深度交互"按钮
流程 6:深度交互
用户操作: 在交互面板中选择对话对象并发送消息

模式 A: 与 ReportAgent 对话
[浏览器]
  ├─ POST /api/report/chat { simulation_id, message, chat_history }
  │     │
  │     ▼
  │  [Flask] → ReportAgent.chat()
  │     │
  │     ├─ [LLM] 分析问题
  │     ├─ 可能调用工具 (最多 2 次/轮)
  │     └─ [LLM] 生成回答
  │
  └─ 显示回答 + 工具调用记录

模式 B: 与仿真智能体对话
[浏览器]
  ├─ POST /api/simulation/interview
  │   { simulation_id, interviews: [{agent_id, prompt}] }
  │     │
  │     ▼
  │  [Flask] → SimulationIPCClient.send_interview()
  │     │
  │     ├─ 写入 ipc_commands/{cmd_id}.json
  │     │     │
  │     │     ▼
  │     │  [OASIS 进程] SimulationIPCServer.poll_commands()
  │     │     → 找到智能体
  │     │     → [LLM] 以智能体人设回答问题
  │     │     → 写入 ipc_responses/{cmd_id}.json
  │     │
  │     └─ 轮询响应文件 → 返回回答
  │
  └─ 显示智能体回复

模式 C: 批量调研
[浏览器]
  ├─ 选择多个智能体 + 输入调研问题
  ├─ POST /api/simulation/interview
  │   { interviews: [{agent1}, {agent2}, ...] }
  │     │
  │     ▼
  │  [Flask] → SimulationIPCClient.send_batch_interview()
  │     → 同上,但批量处理
  │
  └─ 网格展示所有智能体的回答

14.2 数据流总结

文档输入:
  PDF/MD/TXT → extracted_text.txt (纯文本)

知识表示:
  extracted_text → Zep Cloud Graph (节点+边)
  Ontology (JSON) → Pydantic Models → Zep Ontology

智能体表示:
  Graph Entities → OasisAgentProfile → reddit_profiles.json / twitter_profiles.csv

仿真配置:
  Document + Requirement + Entities → simulation_config.json

仿真输出:
  OASIS 子进程 → actions.jsonl (JSONL 行为日志)
  可选: actions → ZepGraphMemoryUpdater → Zep Graph 更新

报告输出:
  Zep Graph + actions.jsonl → ReportAgent → full_report.md

交互数据:
  用户消息 → LLM/IPC → 回答文本

第十五部分:关键设计模式与原理

15.1 异步任务模式

MiroFish 中多个长耗时操作(图谱构建、仿真准备、报告生成)采用统一的异步任务模式:

1. API 接收请求 → 创建 Task → 返回 task_id(立即响应)
2. 启动后台 daemon 线程执行实际工作
3. 工作线程通过 TaskManager.update_task() 更新进度
4. 前端定时轮询 GET /api/graph/task/{task_id}
5. 任务完成/失败 → 前端获取最终结果

为什么使用 daemon 线程而非异步框架:

  • Flask 本身是同步框架,使用 daemon 线程是最简单的异步方案
  • 避免引入 Celery 等重量级任务队列的复杂性
  • daemon 线程在主进程退出时自动终止,不会留下孤儿进程

15.2 进度回调模式

所有长操作接受 progress_callback 函数参数:

def progress_callback(stage, progress, message, **kwargs):
    """
    stage: 当前阶段名称
    progress: 0-100 百分比
    message: 人类可读的进度描述
    """
    task_manager.update_task(task_id, progress=progress, message=message)

这种设计实现了业务逻辑与进度报告的解耦:

  • Service 层不需要知道 Task 的存在
  • 进度更新的具体行为由调用者决定
  • 相同的 Service 可以在有/无进度回调的情况下工作

15.3 文件系统 IPC 设计

这是 MiroFish 最独特的设计之一。选择文件系统 IPC 而非 Socket/RPC 的原因:

  1. 跨平台兼容: 文件系统操作在 Windows/Linux/macOS 上行为一致
  2. 进程隔离: OASIS 仿真作为独立子进程运行,不依赖 Flask 进程空间
  3. 调试友好: JSON 文件可以直接查看,方便排查问题
  4. 容错性强: 即使一方崩溃,文件仍然存在,另一方可以检测到

权衡: 轮询开销(每 0.5 秒检查文件系统)是其代价,但对于低频命令(如访谈),这完全可以接受。

15.4 状态机模式

项目(Project)和仿真(Simulation)都使用显式状态机管理生命周期:

项目状态机:

CREATED → ONTOLOGY_GENERATED → GRAPH_BUILDING → GRAPH_COMPLETED
                                      ↓
                                    FAILED

仿真状态机:

CREATED → PREPARING → READY → RUNNING → COMPLETED
               ↓                  ↓        ↓
             FAILED             PAUSED   STOPPED
                                  ↓
                                FAILED

每个状态转换都在 API 层进行验证,防止非法操作(如在 CREATED 状态直接启动仿真)。

15.5 重试与容错

系统在多个层面实现了重试机制:

层面 实现方式 参数
LLM 调用 温度递减重试 0.7→0.6→0.5,3 次
Zep API 指数退避重试装饰器 初始 2s,因子 2,最多 3 次
HTTP 请求(前端) requestWithRetry 1s→2s→4s,最多 3 次
Zep 分页 每页独立重试 初始 2s,最多 3 次
图谱记忆更新 批次级重试 初始 2s,最多 3 次
JSON 解析 修复截断 + 重试 闭合括号,多种策略

LLM 温度递减策略: 每次重试降低 temperature,使输出更确定性,减少格式错误概率。

15.6 单例模式(线程安全)

TaskManager 使用双重检查锁定(Double-Checked Locking)实现线程安全的单例:

class TaskManager:
    _instance = None
    _instance_lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:             # 第一次检查(无锁,快速路径)
            with cls._instance_lock:           # 加锁
                if cls._instance is None:      # 第二次检查(防止竞态条件)
                    cls._instance = TaskManager()
        return cls._instance

所有任务操作通过 _task_lock 保护内部字典,确保并发请求不会破坏数据。

15.7 动态模型生成

OntologyGenerator 生成的本体论需要在运行时转换为 Pydantic 模型,以满足 Zep SDK 的类型要求。这是通过 Pydantic 的 create_model 函数实现的:

from pydantic import create_model, Field

# 运行时动态创建
StudentModel = create_model(
    'Student',
    major=(Optional[str], Field(default=None, description="专业方向")),
    grade=(Optional[str], Field(default=None, description="年级"))
)

挑战: 保留字段名冲突(Zep SDK 使用 name, uuid 等字段名,而本体论可能也定义这些属性)。解决方案是添加 entity_ 前缀。

15.8 ReACT 推理模式

ReportAgent 使用 ReACT(Reasoning and Acting)模式生成报告:

循环开始
  │
  ├─ Reasoning(思考):
  │   "我需要分析政策对学生群体的影响。
  │    应该先搜索学生相关的实体和关系。"
  │
  ├─ Acting(行动):
  │   调用 insight_forge(query="政策对学生的影响")
  │
  ├─ Observation(观察):
  │   "找到 15 条相关事实,包括学生的发帖内容和关系变化"
  │
  ├─ Reasoning:
  │   "信息比较充分了,但缺少第一人称视角"
  │
  ├─ Acting:
  │   调用 interview_agents(agent_type="Student", question="...")
  │
  ├─ Observation:
  │   "获取了 3 位学生智能体的访谈回复"
  │
  └─ 判断信息是否充分 → 是 → 撰写章节内容
                       → 否 → 继续循环(最多 5 次工具调用)

15.9 时间模拟机制

OASIS 仿真使用离散时间步进(Discrete Time Stepping)模拟连续时间流逝:

现实世界: 72 小时 = 3 天
模拟参数: minutes_per_round = 60
总轮数: 72 × 60 / 60 = 72 轮

每轮:
  模拟时间 = round_num × 60 / 60 = round_num 小时
  时段 = 模拟时间 % 24

  例: 第 20 轮 = 第 20 小时 = 晚上 8 点(高峰时段)
      → 活跃智能体数 × 1.5 倍

中国时区对齐: 活动模式按北京时间设计,高峰在 19-22 点(社交媒体黄金时段),低谷在 0-5 点(深夜)。

15.10 图谱记忆演化

传统仿真是"只读"知识库 → 生成行为。MiroFish 创新地实现了"读写"闭环:

初始知识图谱
     │
     ├─ 读取 → 生成智能体人设 → 驱动行为
     │
     └─ 写回 ← 行为自然语言化 ← 智能体行为
           │
           └─ 更新后的图谱用于:
              - 报告生成时搜索仿真中的事件
              - 深度交互时引用仿真中的行为
              - 后续仿真的知识基础

15.11 前端双面板架构

MainView 等容器视图使用三模式双面板设计:

graph 模式:     [GraphPanel 100%] [Steps 隐藏]
split 模式:     [GraphPanel 50%]  [Steps 50%]
workbench 模式: [GraphPanel 隐藏] [Steps 100%]

面板切换使用 CSS transform + transition 实现平滑动画:

.left-panel {
    transition: width 0.4s cubic-bezier(0.4, 0, 0.2, 1);
}

15.12 安全设计

  1. 文件上传安全: werkzeug.utils.secure_filename() 防止路径遍历
  2. 文件类型限制: 仅允许 PDF/MD/TXT/Markdown
  3. 文件大小限制: 50MB (MAX_CONTENT_LENGTH)
  4. 编码安全: 所有文件操作使用 UTF-8,Windows 兼容处理
  5. 进程隔离: OASIS 子进程使用 start_new_session=True 创建独立进程组
  6. 敏感信息: API Key 通过环境变量配置,不硬编码在代码中

附录 A:API 完整清单

方法 路径 功能
GET /health 健康检查
POST /api/graph/ontology/generate 生成本体论
POST /api/graph/build 构建知识图谱
GET /api/graph/task/{id} 查询任务状态
GET /api/graph/tasks 列出所有任务
GET /api/graph/data/{id} 获取图谱数据
DELETE /api/graph/delete/{id} 删除图谱
GET /api/graph/project/{id} 获取项目
GET /api/graph/project/list 列出项目
DELETE /api/graph/project/{id} 删除项目
POST /api/graph/project/{id}/reset 重置项目
GET /api/simulation/entities/{gid} 获取实体
GET /api/simulation/entities/{gid}/{uuid} 单个实体
GET /api/simulation/entities/{gid}/by-type/{t} 按类型获取
POST /api/simulation/create 创建仿真
POST /api/simulation/prepare 准备仿真
POST /api/simulation/prepare/status 准备进度
GET /api/simulation/{id} 获取仿真状态
GET /api/simulation/list 列出仿真
GET /api/simulation/history 历史记录
GET /api/simulation/{id}/profiles 智能体人设
GET /api/simulation/{id}/profiles/realtime 实时人设
GET /api/simulation/{id}/config 仿真配置
GET /api/simulation/{id}/config/realtime 实时配置
POST /api/simulation/start 启动仿真
POST /api/simulation/stop 停止仿真
GET /api/simulation/{id}/run-status 运行状态
GET /api/simulation/{id}/actions 行为日志
POST /api/simulation/generate-profiles 生成人设
POST /api/report/generate 生成报告
POST /api/report/generate/status 生成进度
GET /api/report/{id} 获取报告
GET /api/report/by-simulation/{id} 按仿真获取
GET /api/report/list 列出报告
GET /api/report/{id}/download 下载报告
POST /api/report/chat 报告对话
GET /api/report/{id}/progress 报告进度
GET /api/report/{id}/sections 报告章节
GET /api/report/{id}/section/{n} 单个章节
GET /api/report/{id}/agent-log Agent 日志
GET /api/report/{id}/agent-log/stream 日志流
GET /api/report/{id}/console-log 控制台日志
GET /api/report/check/{sim_id} 报告检查

附录 B:数据结构参考

B.1 simulation_config.json 完整结构

{
    "simulation_id": "sim_xxxxx",
    "project_id": "proj_xxxxx",
    "graph_id": "mirofish_xxxxx",
    "simulation_requirement": "用户输入的仿真需求",
    "created_at": "2025-01-01T00:00:00",

    "time_config": {
        "total_simulation_hours": 72,
        "minutes_per_round": 60,
        "agents_per_hour_min": 3,
        "agents_per_hour_max": 9,
        "peak_hours": [19, 20, 21, 22],
        "off_peak_hours": [0, 1, 2, 3, 4, 5],
        "morning_hours": [6, 7, 8],
        "work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
        "peak_activity_multiplier": 1.5,
        "off_peak_activity_multiplier": 0.05,
        "morning_activity_multiplier": 0.4,
        "work_activity_multiplier": 0.7,
        "reasoning": "LLM 的推理解释..."
    },

    "agent_configs": [
        {
            "agent_id": 0,
            "entity_uuid": "xxx",
            "entity_name": "张三",
            "entity_type": "Student",
            "activity_level": 0.8,
            "posts_per_hour": 0.6,
            "comments_per_hour": 1.2,
            "active_hours": [8, 9, 10, 12, 13, 14, 19, 20, 21, 22, 23],
            "response_delay_min": 5,
            "response_delay_max": 30,
            "sentiment_bias": "slightly_negative",
            "stance": "反对方",
            "influence_weight": 0.8
        }
    ],

    "event_config": {
        "hot_topics": ["政策改革", "学生权益", "校园生活"],
        "narrative_direction": "围绕政策对学生群体的影响展开讨论",
        "initial_posts": [
            {
                "content": "刚看到新政策的公告,大家怎么看?",
                "poster_type": "Student",
                "poster_agent_id": 0,
                "platform": "both"
            }
        ]
    },

    "platform_config": {
        "twitter": {
            "platform": "twitter",
            "recency_weight": 0.4,
            "popularity_weight": 0.3,
            "relevance_weight": 0.3,
            "viral_threshold": 0.7,
            "echo_chamber_strength": 0.3
        },
        "reddit": {
            "platform": "reddit",
            "recency_weight": 0.3,
            "popularity_weight": 0.4,
            "relevance_weight": 0.3,
            "viral_threshold": 0.6,
            "echo_chamber_strength": 0.4
        }
    },

    "llm_config": {
        "api_key": "***",
        "base_url": "https://...",
        "model_name": "qwen-plus"
    }
}

B.2 Reddit Agent Profile 完整结构

{
    "user_id": 0,
    "realname": "张三",
    "username": "zhang_san_123",
    "bio": "北京大学计算机科学与技术学院大四学生,热爱开源技术和AI研究",
    "persona": "张三是一名北京大学计算机系的大四学生,来自湖北武汉。他性格内向但思维敏捷,是INTJ类型的人。在校期间参与了多个开源项目,对人工智能和机器学习有浓厚兴趣。他关注科技新闻和政策动态,倾向于通过数据和逻辑来分析问题。在社交媒体上,他通常会发表有深度的技术观点和对社会事件的理性分析。面对争议话题,他倾向于收集多方信息后再表态,但一旦形成观点则会坚定表达。",
    "age": 22,
    "gender": "male",
    "mbti": "INTJ",
    "country": "China",
    "profession": "Student",
    "interested_topics": ["人工智能", "开源技术", "政策分析", "校园生活"],
    "source_entity_uuid": "xxxx-xxxx",
    "source_entity_type": "Student"
}

附录 C:许可证

MiroFish 使用 AGPL-3.0 开源许可证。


文档完

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐