预测万物智能Agent-MiroFish源码分析
MiroFish 项目完整分析文档
项目仓库: https://github.com/666ghj/MiroFish
本文档目标: 提供足够详细的技术分析,使读者仅凭本文档即可精确复现整个 MiroFish 项目
目录
- 第一部分:项目总览
- 第二部分:系统架构设计
- 第三部分:技术栈与依赖
- 第四部分:后端核心服务详解
- 第五部分:后端API接口详解
- 第六部分:后端数据模型
- 第七部分:后端工具模块
- 第八部分:仿真脚本详解
- 第九部分:前端架构与实现
- 第十部分:前端视图组件详解
- 第十一部分:前端步骤组件详解
- 第十二部分:前端子组件详解
- 第十三部分:部署与运维
- 第十四部分:完整工作流程
- 第十五部分:关键设计模式与原理
第一部分:项目总览
1.1 项目定位
MiroFish 是一款基于多智能体(Multi-Agent)技术的新一代 AI 预测引擎,由盛大集团战略支持和孵化。其核心理念是:通过提取现实世界的种子信息(如突发新闻、政策草案、金融信号、小说文本等),自动构建出高保真的平行数字世界。在此数字世界中,成千上万个具备独立人格、长期记忆与行为逻辑的智能体(Agent)进行自由交互与社会演化,从而实现对未来走向的精准推演。
项目口号:“让未来在数字沙盘中预演,助决策在百战模拟后胜出”
1.2 核心能力
| 能力维度 | 说明 |
|---|---|
| 种子输入 | 支持 PDF、Markdown、TXT 格式的文档上传,自动提取文本内容 |
| 本体论生成 | 基于 LLM 自动分析文档,生成实体类型和关系类型定义(Ontology) |
| 知识图谱构建 | 利用 Zep Cloud 的 GraphRAG 能力,从文档构建语义知识图谱 |
| 智能体人设生成 | 将知识图谱中的实体转化为具有独立人格的 OASIS 仿真智能体 |
| 双平台仿真 | 同时在 Twitter 和 Reddit 两个模拟社交平台上运行多智能体交互 |
| 动态记忆更新 | 仿真过程中智能体的行为实时回写到知识图谱,实现记忆演化 |
| AI 报告生成 | 基于 ReACT 推理模式,自动生成详尽的预测分析报告 |
| 深度交互 | 支持与仿真世界中的任意智能体或报告Agent进行自然语言对话 |
1.3 应用场景
- 宏观层面:政策推演、舆情预测、公关危机模拟、金融市场预测
- 微观层面:小说结局推演、创意验证、社交网络行为模拟
- 已展示案例:武汉大学舆情推演预测、《红楼梦》失传结局推演预测
1.4 五步工作流
MiroFish 的完整工作流程分为五个步骤:
步骤1: 图谱构建 → 步骤2: 环境搭建 → 步骤3: 开始模拟 → 步骤4: 报告生成 → 步骤5: 深度互动
- 图谱构建(Graph Build):上传种子文档 → LLM 生成本体论 → Zep 构建知识图谱
- 环境搭建(Environment Setup):读取实体 → 生成智能体人设 → LLM 生成仿真配置参数
- 开始模拟(Simulation):启动 OASIS 仿真引擎 → 双平台并行运行 → 实时监控
- 报告生成(Report):ReportAgent 使用工具集 → 深度分析仿真结果 → 生成 Markdown 报告
- 深度互动(Interaction):与 ReportAgent 对话 → 与仿真世界中的智能体对话 → 批量调研
1.5 项目文件结构
MiroFish/
├── .env.example # 环境变量模板
├── .github/workflows/
│ └── docker-image.yml # CI/CD 流水线
├── Dockerfile # Docker 构建文件
├── docker-compose.yml # Docker Compose 编排
├── package.json # 根 Node.js 配置(并发启动前后端)
├── README.md / README-EN.md # 项目说明文档
├── backend/
│ ├── run.py # 后端入口文件
│ ├── requirements.txt # Python 依赖
│ ├── pyproject.toml # Python 项目配置
│ ├── app/
│ │ ├── __init__.py # Flask 应用工厂
│ │ ├── config.py # 配置管理
│ │ ├── api/ # API 路由层
│ │ │ ├── __init__.py # Blueprint 注册
│ │ │ ├── graph.py # 图谱相关 API
│ │ │ ├── simulation.py # 仿真相关 API
│ │ │ └── report.py # 报告相关 API
│ │ ├── models/ # 数据模型
│ │ │ ├── project.py # 项目状态管理
│ │ │ └── task.py # 异步任务管理
│ │ ├── services/ # 核心业务逻辑
│ │ │ ├── text_processor.py # 文本处理
│ │ │ ├── ontology_generator.py # 本体论生成
│ │ │ ├── graph_builder.py # 图谱构建
│ │ │ ├── zep_entity_reader.py # 实体读取
│ │ │ ├── oasis_profile_generator.py # 人设生成
│ │ │ ├── simulation_config_generator.py # 仿真配置生成
│ │ │ ├── simulation_manager.py # 仿真管理
│ │ │ ├── simulation_runner.py # 仿真执行
│ │ │ ├── simulation_ipc.py # 进程间通信
│ │ │ ├── zep_graph_memory_updater.py # 图谱记忆更新
│ │ │ ├── report_agent.py # 报告Agent
│ │ │ └── zep_tools.py # Zep 工具服务
│ │ └── utils/ # 工具模块
│ │ ├── file_parser.py # 文件解析
│ │ ├── llm_client.py # LLM 通信
│ │ ├── logger.py # 日志系统
│ │ ├── retry.py # 重试机制
│ │ └── zep_paging.py # Zep 分页工具
│ └── scripts/ # 仿真运行脚本
│ ├── run_parallel_simulation.py # 并行仿真
│ ├── run_twitter_simulation.py # Twitter 仿真
│ ├── run_reddit_simulation.py # Reddit 仿真
│ ├── action_logger.py # 行为日志
│ └── test_profile_format.py # 格式测试
├── frontend/
│ ├── index.html # HTML 入口
│ ├── package.json # 前端依赖
│ ├── vite.config.js # Vite 构建配置
│ └── src/
│ ├── App.vue # 根组件
│ ├── main.js # Vue 入口
│ ├── router/index.js # 路由配置
│ ├── store/pendingUpload.js # 状态管理
│ ├── api/ # API 调用层
│ │ ├── index.js # Axios 基础配置
│ │ ├── graph.js # 图谱 API
│ │ ├── simulation.js # 仿真 API
│ │ └── report.js # 报告 API
│ ├── views/ # 页面视图
│ │ ├── Home.vue # 首页
│ │ ├── MainView.vue # 主处理视图
│ │ ├── Process.vue # 处理流程(备用)
│ │ ├── SimulationView.vue # 仿真视图
│ │ ├── SimulationRunView.vue # 仿真运行视图
│ │ ├── ReportView.vue # 报告视图
│ │ └── InteractionView.vue # 交互视图
│ └── components/ # 组件
│ ├── GraphPanel.vue # 图谱可视化
│ ├── HistoryDatabase.vue # 历史记录
│ ├── Step1GraphBuild.vue # 步骤1组件
│ ├── Step2EnvSetup.vue # 步骤2组件
│ ├── Step3Simulation.vue # 步骤3组件
│ ├── Step4Report.vue # 步骤4组件
│ └── Step5Interaction.vue # 步骤5组件
└── static/image/ # 静态资源
第二部分:系统架构设计
2.1 整体架构
MiroFish 采用经典的前后端分离架构,结合外部服务和仿真引擎:
┌─────────────────────────────────────────────────────────────────────┐
│ 用户浏览器(Frontend) │
│ Vue 3 + Vite + D3.js + Axios │
│ 端口: 3000 │
└──────────────────────────────┬──────────────────────────────────────┘
│ HTTP REST API
▼
┌─────────────────────────────────────────────────────────────────────┐
│ Flask 后端(Backend) │
│ 端口: 5001 │
│ ┌────────────┐ ┌─────────────┐ ┌────────────┐ │
│ │ Graph API │ │Simulation API│ │ Report API │ │
│ └─────┬──────┘ └──────┬──────┘ └─────┬──────┘ │
│ │ │ │ │
│ ┌─────┴────────────────┴────────────────┴──────┐ │
│ │ Services 层(核心业务逻辑) │ │
│ │ OntologyGenerator | GraphBuilder | ProfileGen │ │
│ │ SimulationManager | SimulationRunner │ │
│ │ ReportAgent | ZepTools │ │
│ └─────┬────────────────┬────────────────┬──────┘ │
│ │ │ │ │
│ ┌─────┴──┐ ┌───────┴───┐ ┌───────┴───────┐ │
│ │ Models │ │ Utils │ │ Scripts │ │
│ └────────┘ └───────────┘ └───────────────┘ │
└────────┬─────────────────┬──────────────────┬───────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐
│ Zep Cloud │ │ LLM API │ │ OASIS 仿真引擎 │
│ (知识图谱存储) │ │(OpenAI 格式) │ │ (CAMEL-AI 驱动) │
│ GraphRAG │ │ Qwen/GPT等 │ │ Twitter + Reddit │
└──────────────┘ └──────────────┘ └──────────────────────┘
2.2 数据流架构
整个系统的数据流遵循清晰的管道式架构:
文档输入 ──→ 文本提取 ──→ 本体论生成 ──→ 知识图谱构建
│
▼
报告生成 ←── 仿真执行 ←── 仿真配置生成 ←── 实体/人设生成
│
▼
深度交互
详细数据流:
1. 文档输入阶段:
PDF/MD/TXT 文件 → FileParser 提取文本 → TextProcessor 预处理 → 存储
2. 图谱构建阶段:
预处理文本 → OntologyGenerator(LLM) → 实体/关系类型定义
文本分块 → GraphBuilderService → Zep API → 知识图谱
3. 仿真准备阶段:
知识图谱 → ZepEntityReader → 实体列表
实体列表 → OasisProfileGenerator(LLM) → 智能体人设
实体+文档+需求 → SimulationConfigGenerator(LLM) → 仿真参数
4. 仿真执行阶段:
仿真参数+人设 → OASIS引擎(子进程) → actions.jsonl 日志
actions.jsonl → SimulationRunner(监控线程) → 状态更新
Agent行为 → ZepGraphMemoryUpdater → 图谱记忆更新
5. 报告生成阶段:
图谱+仿真数据 → ReportAgent(ReACT模式) → 工具调用+LLM推理
搜索结果+分析 → 分节生成 → Markdown 报告
6. 交互阶段:
用户消息 → ReportAgent.chat() → 工具辅助回答
用户消息 → IPC → OASIS Agent → 智能体回复
2.3 进程架构
MiroFish 运行时涉及多个进程和线程:
Flask 主进程 (PID: main)
├── 请求处理线程(Flask threaded=True)
├── 图谱构建后台线程(daemon thread per task)
├── 仿真准备后台线程(daemon thread per simulation)
├── 报告生成后台线程(daemon thread per report)
├── 仿真监控线程(per running simulation)
└── 图谱记忆更新工作线程(per simulation with memory enabled)
OASIS 仿真子进程 (独立进程组)
├── run_parallel_simulation.py(主仿真进程)
├── Twitter 环境(内部线程)
└── Reddit 环境(内部线程)
Flask ←→ OASIS 通信方式: 文件系统 IPC
ipc_commands/ → Flask 写入命令 JSON
ipc_responses/ → OASIS 写入响应 JSON
actions.jsonl → OASIS 写入,Flask 监控线程读取
env_status.json → OASIS 写入状态,Flask 查询
2.4 存储架构
所有数据采用文件系统存储,无需外部数据库:
backend/uploads/
├── projects/
│ └── proj_{uuid}/
│ ├── project.json # 项目元数据
│ ├── extracted_text.txt # 提取的文档文本
│ └── files/
│ ├── {safe_filename}.pdf # 上传的原始文件
│ └── {safe_filename}.md
├── simulations/
│ └── sim_{uuid}/
│ ├── state.json # 仿真状态
│ ├── simulation_config.json # LLM 生成的仿真配置
│ ├── reddit_profiles.json # Reddit 平台智能体人设
│ ├── twitter_profiles.csv # Twitter 平台智能体人设
│ ├── run_state.json # 执行状态
│ ├── simulation.log # 主日志
│ ├── twitter/
│ │ └── actions.jsonl # Twitter 行为日志
│ ├── reddit/
│ │ └── actions.jsonl # Reddit 行为日志
│ └── ipc_commands/ & ipc_responses/ # IPC 通信文件
└── reports/
└── report_{uuid}/
├── meta.json # 报告元数据
├── full_report.md # 完整 Markdown 报告
├── section_01.md ~ section_N.md # 各节内容
├── agent_log.jsonl # Agent 执行日志
└── console_log.txt # 控制台输出
2.5 外部依赖服务
| 服务 | 用途 | 协议 | 必需性 |
|---|---|---|---|
| Zep Cloud | 知识图谱存储、GraphRAG 搜索、实体/关系管理 | REST API | 必需 |
| LLM API | 本体论生成、人设生成、配置生成、报告生成、对话 | OpenAI SDK 格式 | 必需 |
| OASIS (camel-oasis) | 社交媒体仿真引擎,驱动智能体行为 | Python 库 | 必需 |
2.6 通信协议
- 前端 ↔ 后端: HTTP REST API(JSON 格式),前端通过 Vite 代理转发到 5001 端口
- 后端 ↔ LLM: OpenAI SDK 兼容的 HTTP API,支持任意 OpenAI 格式的 LLM 服务
- 后端 ↔ Zep: Zep Python SDK(封装 REST API),使用 API Key 认证
- 后端 ↔ OASIS: 文件系统 IPC(进程间通信),通过 JSON 文件交换命令和响应
- OASIS 内部: CAMEL-AI 框架管理智能体间的消息传递和行为决策
第三部分:技术栈与依赖
3.1 后端技术栈
| 技术/库 | 版本要求 | 用途 |
|---|---|---|
| Python | ≥3.11, ≤3.12 | 后端运行时 |
| Flask | 3.0.0+ | Web 框架 |
| Flask-CORS | 6.0.0+ | 跨域资源共享 |
| OpenAI SDK | 1.0.0+ | LLM API 统一接口 |
| Zep-Cloud | 3.13.0 | Zep 知识图谱 SDK |
| CAMEL-OASIS | 0.2.5 | OASIS 社交媒体仿真引擎 |
| CAMEL-AI | 0.2.78 | CAMEL 多智能体框架 |
| PyMuPDF (fitz) | 1.24.0+ | PDF 文档解析 |
| Pydantic | 2.0.0+ | 数据验证和序列化 |
| python-dotenv | 1.0.0+ | 环境变量管理 |
| charset-normalizer | 3.0.0+ | 字符编码检测 |
| chardet | 5.0.0+ | 备用编码检测 |
| uv | 最新版 | Python 包管理器(替代 pip) |
3.2 前端技术栈
| 技术/库 | 版本 | 用途 |
|---|---|---|
| Vue 3 | 3.5.24 | 前端框架 |
| Vue Router | 4.6.3 | 路由管理 |
| Vite | 7.2.4 | 构建工具和开发服务器 |
| Axios | - | HTTP 请求库 |
| D3.js | - | 知识图谱力导向可视化 |
| Node.js | 18+ | 前端运行时 |
3.3 字体资源
前端使用以下 Google Fonts:
- Inter: 主要界面文字
- JetBrains Mono: 代码和终端风格显示
- Noto Sans SC: 中文字体
- Space Grotesk: 标题和强调文字
3.4 开发与构建工具
| 工具 | 用途 |
|---|---|
| concurrently | 同时启动前后端开发服务器 |
| npm | Node.js 包管理 |
| uv | Python 依赖管理和虚拟环境 |
| Docker / Docker Compose | 容器化部署 |
| GitHub Actions | CI/CD 自动构建 Docker 镜像 |
3.5 环境变量配置
基于 .env.example 的完整配置项:
# ========== 必需配置 ==========
# LLM API 配置(支持 OpenAI SDK 格式的任意 LLM API)
LLM_API_KEY=your_api_key # LLM API 密钥
LLM_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1 # API 基础 URL
LLM_MODEL_NAME=qwen-plus # 模型名称
# Zep Cloud 配置
ZEP_API_KEY=your_zep_api_key # Zep Cloud API 密钥
# ========== 可选配置 ==========
# Boost LLM(用于更高性能的 LLM 调用,如报告生成)
LLM_BOOST_API_KEY=...
LLM_BOOST_BASE_URL=...
LLM_BOOST_MODEL_NAME=...
# Flask 配置
FLASK_HOST=0.0.0.0 # 监听地址(默认 0.0.0.0)
FLASK_PORT=5001 # 监听端口(默认 5001)
FLASK_DEBUG=True # 调试模式(默认 True)
SECRET_KEY=mirofish-secret-key # Flask 密钥
# OASIS 仿真配置
OASIS_DEFAULT_MAX_ROUNDS=10 # 默认最大轮数
# Report Agent 配置
REPORT_AGENT_MAX_TOOL_CALLS=5 # 每节最大工具调用次数
REPORT_AGENT_MAX_REFLECTION_ROUNDS=2 # 最大反思轮数
REPORT_AGENT_TEMPERATURE=0.5 # LLM 温度参数
3.6 Config 类关键常量
以下是 app/config.py 中定义的重要常量:
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 最大上传文件大小: 50MB
ALLOWED_EXTENSIONS = {'pdf', 'md', 'txt', 'markdown'} # 允许的文件类型
DEFAULT_CHUNK_SIZE = 500 # 文本分块大小: 500 字符
DEFAULT_CHUNK_OVERLAP = 50 # 分块重叠: 50 字符
# Twitter 平台可用动作
TWITTER_ACTIONS = [
"CREATE_POST", "LIKE_POST", "REPOST",
"FOLLOW", "DO_NOTHING", "QUOTE_POST"
]
# Reddit 平台可用动作
REDDIT_ACTIONS = [
"LIKE_POST", "DISLIKE_POST", "CREATE_POST", "CREATE_COMMENT",
"LIKE_COMMENT", "DISLIKE_COMMENT", "SEARCH_POSTS", "SEARCH_USER",
"TREND", "REFRESH", "DO_NOTHING", "FOLLOW", "MUTE"
]
第四部分:后端核心服务详解
后端 services/ 目录包含 11 个核心服务模块,它们构成了 MiroFish 的全部业务逻辑。以下逐一详细分析。
4.1 TextProcessor — 文本处理服务
文件: backend/app/services/text_processor.py
职责: 文档文本的提取、预处理和分块
类定义
class TextProcessor:
"""文本处理器,封装文件解析和文本操作"""
核心方法
| 方法 | 参数 | 返回值 | 功能 |
|---|---|---|---|
extract_from_files |
file_paths: List[str] |
str |
委托 FileParser 提取多文件文本 |
split_text |
text, chunk_size=500, overlap=50 |
List[str] |
按指定大小分块文本 |
preprocess_text |
text: str |
str |
规范化换行、去除连续空行、首尾去空 |
get_text_stats |
text: str |
dict |
返回字符数、行数、词数统计 |
文本分块算法
分块逻辑是整个图谱构建的基础,其核心策略:
- 按指定
chunk_size(默认 500 字符)切分文本 - 保留
overlap(默认 50 字符)的重叠区域确保上下文连续性 - 优先在句子边界处分割(句号、叹号、问号、换行符:
。!?.\n!\n?\n\n) - 如果在句子边界处找不到合适的切割点,则按字符位置硬切
文本: |<------------- chunk 1 ------------->|
|<--- overlap --->|<----------- chunk 2 ----------->|
设计原理
- 分块目的是将长文档拆成 Zep 可处理的小段,每段作为一个 “episode” 输入知识图谱
- 重叠区域确保实体和关系在块边界处不会丢失
- 500 字符的默认大小兼顾了信息完整性和 LLM 处理效率
4.2 OntologyGenerator — 本体论生成器
文件: backend/app/services/ontology_generator.py(约 454 行)
职责: 基于 LLM 分析上传文档,自动生成用于知识图谱构建的实体类型(Entity Types)和关系类型(Edge Types)
类定义
class OntologyGenerator:
def __init__(self, llm_client: Optional[LLMClient] = None)
核心方法
generate(document_texts, simulation_requirement, additional_context) -> Dict
这是本体论生成的主入口方法。
输入参数:
document_texts: List[str]— 提取的文档文本列表simulation_requirement: str— 用户描述的仿真需求additional_context: Optional[str]— 额外上下文信息
输出格式:
{
"entity_types": [
{
"name": "PascalCase 名称",
"description": "实体类型描述",
"attributes": [
{"name": "snake_case 属性名", "type": "text", "description": "属性描述"}
],
"examples": ["示例实体1", "示例实体2"]
}
],
"edge_types": [
{
"name": "UPPER_SNAKE_CASE 名称",
"description": "关系类型描述",
"source_targets": [
{"source": "EntityType1", "target": "EntityType2"}
],
"attributes": []
}
],
"analysis_summary": "中文分析摘要"
}
执行流程:
- 调用
_build_user_message()构建提示词,合并文档文本(最大 50K 字符)和仿真需求 - 调用 LLM,使用
ONTOLOGY_SYSTEM_PROMPT作为系统提示 - 解析 LLM 返回的 JSON 结果
- 调用
_validate_and_process()验证和补全
约束规则:
- 必须恰好 10 个实体类型(8 个场景特定 + 2 个兜底类型)
- 兜底类型为
Person和Organization,永远存在 - 关系类型 6-10 个
- 属性名不能使用保留名称:
name, uuid, group_id, created_at, summary - 实体类型名使用 PascalCase,关系类型名使用 UPPER_SNAKE_CASE
generate_python_code(ontology) -> str: 将本体论定义转换为 Python 类定义代码
System Prompt 设计原理
本体论系统提示词的核心指令:
- 要求 LLM 扮演社交媒体舆论仿真领域的本体论设计专家
- 强调实体必须是"能在社交媒体上发声的现实世界行为者"
- 要求生成的实体类型具有清晰的角色差异和行为特征差异
- 确保关系类型覆盖核心的社会关系网络
4.3 GraphBuilderService — 图谱构建服务
文件: backend/app/services/graph_builder.py(约 501 行)
职责: 利用 Zep Cloud API 构建知识图谱,管理图谱的创建、本体论设置、文本注入和查询
数据类
@dataclass
class GraphInfo:
graph_id: str
node_count: int
edge_count: int
entity_types: List[str]
类定义
class GraphBuilderService:
def __init__(self, api_key: Optional[str] = None)
# 初始化 Zep 客户端和 TaskManager
核心方法详解
build_graph_async(text, ontology, graph_name, chunk_size, chunk_overlap, batch_size) -> str
异步图谱构建入口,返回 task_id 供轮询。
进度推进:
0% → 创建任务
5% → 分块文本
10% → 创建 Zep 图谱
15% → 设置本体论
20% → 开始注入文本
60% → 文本注入完成
90% → Zep 处理完成
100% → 获取图谱信息完毕
_build_graph_worker(task_id, text, ontology, ...): 后台工作线程
- 创建图谱: 调用
create_graph(name)生成唯一 ID 格式mirofish_{uuid[:16]} - 设置本体论:
- 将本体论的实体类型和关系类型动态转换为 Pydantic 模型
- 处理保留字段名转换(如
name→entity_name) - 调用
client.graph.set_ontology(graph_ids=[graph_id], entities={...}, edges={...})
- 分块注入:
- 将文本分块后按批次(默认每批 3 个 chunk)发送到 Zep
- 每批之间间隔 1 秒(限流)
- 调用
client.graph.add_batch()返回 episode UUID 列表
- 等待处理:
- 轮询每个 episode 的
processed状态 - 每 3 秒检查一次,最长等待 600 秒
- 所有 episode 处理完毕后继续
- 轮询每个 episode 的
- 获取图谱信息: 使用分页工具获取所有节点和边,统计数量和类型
set_ontology(graph_id, ontology) — 动态 Pydantic 模型生成
这是关键的技术实现。由于 Zep SDK 要求以 Pydantic 模型定义本体论,而本体论是运行时由 LLM 动态生成的,因此需要在运行时动态创建 Pydantic 类:
# 伪代码示意
for entity_type in ontology["entity_types"]:
fields = {}
for attr in entity_type["attributes"]:
field_name = attr["name"]
if field_name in RESERVED_NAMES:
field_name = f"entity_{field_name}" # 避免保留字冲突
fields[field_name] = (Optional[str], Field(default=None, description=attr["description"]))
# 动态创建 Pydantic 模型类
EntityModel = create_model(entity_type["name"], **fields)
entities[entity_type["name"]] = EntityModel
get_graph_data(graph_id) -> Dict: 获取完整图谱结构
返回格式:
{
"nodes": [
{
"uuid": "...",
"name": "实体名",
"labels": ["Student", "Entity"],
"summary": "实体摘要",
"attributes": {"key": "value"},
"created_at": "2025-01-01T00:00:00"
}
],
"edges": [
{
"uuid": "...",
"name": "STUDIES_AT",
"fact": "关系描述事实",
"source_node_uuid": "...",
"target_node_uuid": "...",
"source_node_name": "张三",
"target_node_name": "北京大学"
}
]
}
4.4 ZepEntityReader — 实体读取服务
文件: backend/app/services/zep_entity_reader.py(约 438 行)
职责: 从 Zep 知识图谱中读取和过滤实体,丰富实体上下文信息
数据类
@dataclass
class EntityNode:
uuid: str
name: str
labels: List[str] # 实体标签,如 ["Student", "Entity"]
summary: str # 实体摘要
attributes: Dict # 实体属性
related_edges: List[Dict] # 关联的边
related_nodes: List[Dict] # 关联的节点
def get_entity_type(self) -> str:
"""获取实体类型(过滤掉 'Entity' 和 'Node' 默认标签)"""
@dataclass
class FilteredEntities:
entities: List[EntityNode]
entity_types: Set[str]
核心方法
filter_defined_entities(graph_id, defined_entity_types=None, enrich_with_edges=True) -> FilteredEntities
这是仿真准备阶段的关键方法:
- 获取所有节点: 调用
fetch_all_nodes()分页获取 - 过滤逻辑:
- 保留包含自定义标签(非 “Entity”/“Node”)的节点
- 如果指定了
defined_entity_types,仅保留匹配的类型 - 跳过仅有默认标签的节点
- 上下文丰富:
- 对每个实体调用
get_node_edges()获取所有关联边 - 收集所有关联节点的信息
- 将边和节点信息附加到
EntityNode对象
- 对每个实体调用
重试机制: 所有 Zep API 调用都使用 _call_with_retry() 封装,指数退避重试(初始延迟 2 秒,最多 3 次)。
4.5 OasisProfileGenerator — 智能体人设生成器
文件: backend/app/services/oasis_profile_generator.py(约 1000+ 行)
职责: 将知识图谱中的实体转化为 OASIS 仿真引擎可识别的智能体人设(Agent Profile)
数据类
@dataclass
class OasisAgentProfile:
# 核心字段
user_id: int # OASIS 用户 ID
user_name: str # 用户名(如 zhang_san_123)
name: str # 显示名
bio: str # 简介(约 200 字符)
persona: str # 人设描述(约 2000 字符,包含行为模式)
# 可选字段
age: Optional[int]
gender: Optional[str]
mbti: Optional[str] # 16 种 MBTI 人格类型
country: Optional[str]
profession: Optional[str]
interested_topics: Optional[List[str]]
# 元数据
source_entity_uuid: str # 来源实体 UUID
source_entity_type: str # 来源实体类型
created_at: str
def to_reddit_format(self) -> Dict # Reddit JSON 格式
def to_twitter_format(self) -> Dict # Twitter CSV 格式
实体类型分类
系统将实体分为两大类,采用不同的人设生成策略:
个体类实体 (INDIVIDUAL_ENTITY_TYPES):
["student", "alumni", "professor", "person", "publicfigure",
"expert", "faculty", "official", "journalist", "activist"]
群体/机构类实体 (GROUP_ENTITY_TYPES):
["university", "governmentagency", "organization", "ngo",
"mediaoutlet", "company", "institution", "group", "community"]
人设生成流程
generate_profile_from_entity(entity, user_id, use_llm=True) -> OasisAgentProfile
- 生成用户名: 小写化 + 下划线替换空格 + 去除特殊字符 + 随机 3 位数后缀
- 构建实体上下文 (
_build_entity_context):- 添加实体属性
- 添加关联边(关系信息)
- 添加关联节点信息
- 调用 Zep 搜索获取丰富上下文
- 事实去重
- LLM 生成(
_generate_profile_with_llm):- 根据实体类型选择个体/群体提示词
- 要求 JSON 格式输出
- 包含 bio(200字符)、persona(2000字符)、age、gender、mbti、country、profession、interested_topics
- 温度递减重试:0.7 → 0.6 → 0.5(3 次)
- JSON 修复机制:闭合未关闭的括号/方括号
- 规则兜底(
_generate_profile_rule_based):- 如果 LLM 生成失败,使用基于实体类型的规则生成默认人设
- 不同类型有不同的默认值(如学生 age=20, 教授 age=45)
generate_profiles_from_entities(entities, use_llm, progress_callback, parallel_count=5, ...) -> List[OasisAgentProfile]
批量生成核心方法:
- 使用
ThreadPoolExecutor并发生成(默认 5 个工作线程) - 预分配结果列表保持顺序
- 实时输出:每完成一个就写入文件(Reddit JSON 或 Twitter CSV)
- 进度回调:报告 current/total 和当前实体名
- 失败兜底:如果某个实体的 LLM 生成失败,创建基本人设
平台输出格式差异
Reddit (JSON):
{
"realname": "张三",
"username": "zhang_san_123",
"bio": "北京大学计算机系大四学生...",
"persona": "张三是一名对技术充满热情的...",
"age": 22,
"gender": "male",
"mbti": "INTJ",
"country": "China",
"profession": "Student",
"interested_topics": ["AI", "开源", "编程"]
}
Twitter (CSV):
user_id,user_name,name,bio,friend_count,follower_count,statuses_count,created_at
0,zhang_san_123,张三,北京大学计算机系...,50,100,200,2025-01-01
4.6 SimulationConfigGenerator — 仿真配置生成器
文件: backend/app/services/simulation_config_generator.py(约 987 行)
职责: 基于 LLM 生成完整的仿真运行参数配置
配置数据类
TimeSimulationConfig — 时间配置:
@dataclass
class TimeSimulationConfig:
total_simulation_hours: int = 72 # 仿真总时长(小时)
minutes_per_round: int = 60 # 每轮模拟分钟数
agents_per_hour_min: int # 每小时最少活跃智能体数
agents_per_hour_max: int # 每小时最多活跃智能体数
# 中国时区时段划分
peak_hours: List[int] = [19, 20, 21, 22] # 高峰时段
off_peak_hours: List[int] = [0, 1, 2, 3, 4, 5] # 低谷时段
morning_hours: List[int] = [6, 7, 8] # 早间时段
work_hours: List[int] = [9, 10, ..., 18] # 工作时段
# 活跃度乘数
peak_activity_multiplier: float = 1.5 # 高峰 1.5 倍
off_peak_activity_multiplier: float = 0.05 # 低谷 0.05 倍
morning_activity_multiplier: float = 0.4 # 早间 0.4 倍
work_activity_multiplier: float = 0.7 # 工作 0.7 倍
AgentActivityConfig — 单个智能体活动配置:
@dataclass
class AgentActivityConfig:
agent_id: int
entity_uuid: str
entity_name: str
entity_type: str
activity_level: float # 0.0-1.0,活跃概率
posts_per_hour: float # 每小时发帖数
comments_per_hour: float # 每小时评论数
active_hours: List[int] # 活跃时间段
response_delay_min: int # 最小响应延迟(秒)
response_delay_max: int # 最大响应延迟(秒)
sentiment_bias: str # 情感偏向
stance: str # 立场
influence_weight: float # 影响力权重
EventConfig — 事件配置:
@dataclass
class EventConfig:
initial_posts: List[Dict] # 初始种子帖子
scheduled_events: List[Dict] # 定时事件
hot_topics: List[str] # 热门话题
narrative_direction: str # 叙事方向
PlatformConfig — 平台配置:
@dataclass
class PlatformConfig:
platform: str # "twitter" 或 "reddit"
recency_weight: float # 时效性权重
popularity_weight: float # 热度权重
relevance_weight: float # 相关性权重
viral_threshold: float # 传播阈值
echo_chamber_strength: float # 回音室效应强度
配置生成流程
generate_config(simulation_id, project_id, graph_id, simulation_requirement, document_text, entities, enable_twitter, enable_reddit, progress_callback) -> SimulationParameters
分步骤生成,每步调用 LLM:
-
构建上下文 (step 1):
- 合并仿真需求、实体摘要(每类最多 20 个,每个摘要最多 300 字符)、文档文本
- 总长度限制在
MAX_CONTEXT_LENGTH = 50000字符
-
生成时间配置 (step 2):
- LLM 根据场景推理仿真时长、每轮时间、活跃度分布
- 验证:确保 agents_per_hour 不超过实际实体数量
- 失败回退到默认配置
-
生成事件配置 (step 3):
- LLM 生成热门话题、叙事方向、初始帖子
- 初始帖子包含 poster_type 字段(匹配实体类型)
-
分批生成智能体配置 (steps 4 ~ N):
- 每批 15 个智能体(
AGENTS_PER_BATCH = 15) - LLM 为每个智能体生成 activity_level、active_hours、stance 等
- 失败回退到规则生成
- 每批 15 个智能体(
-
分配初始帖子发布者 (step N+1):
- 将 EventConfig 中的初始帖子匹配到具体智能体
- 优先按 entity_type 匹配,找不到则按 influence_weight 排名选取
-
生成平台配置 (final step):
- 为启用的平台生成推荐算法权重参数
规则兜底配置
当 LLM 生成失败时,按实体类型使用预设配置:
| 实体类型 | activity_level | posts/h | active_hours | response_delay | influence_weight |
|---|---|---|---|---|---|
| University/Gov/NGO | 0.2 | 0.1 | 9-18 | 60-240s | 3.0 |
| MediaOutlet | 0.5 | 0.8 | 7-24 | 5-30s | 2.5 |
| Professor/Expert | 0.4 | 0.3 | 8-22 | 30-120s | 2.0 |
| Student | 0.8 | 0.6 | 混合时段 | 5-30s | 0.8 |
| 默认 | 0.7 | 0.4 | 8-23 | 10-60s | 1.0 |
4.7 SimulationManager — 仿真管理器
文件: backend/app/services/simulation_manager.py(约 529 行)
职责: 编排仿真准备流水线(读取实体 → 生成人设 → 生成配置 → 准备文件)
状态机
class SimulationStatus(Enum):
CREATED = "created"
PREPARING = "preparing"
READY = "ready"
RUNNING = "running"
PAUSED = "paused"
STOPPED = "stopped"
COMPLETED = "completed"
FAILED = "failed"
状态转换:
CREATED → PREPARING → READY → RUNNING → COMPLETED
↘ ↘ ↘
FAILED PAUSED STOPPED/FAILED
仿真状态数据
@dataclass
class SimulationState:
simulation_id: str # 格式: sim_{uuid[:12]}
project_id: str
graph_id: str
status: SimulationStatus
enable_twitter: bool
enable_reddit: bool
entities_count: int
profiles_count: int
entity_types: List[str]
config_generated: bool
current_round: int
created_at: str
updated_at: str
核心方法
prepare_simulation(simulation_id, simulation_requirement, document_text, ...) -> SimulationState
这是仿真准备的核心编排方法,包含三个主要阶段:
阶段 1: 读取实体 (0-100%)
reader = ZepEntityReader(zep_api_key)
filtered = reader.filter_defined_entities(
graph_id=state.graph_id,
defined_entity_types=defined_entity_types,
enrich_with_edges=True
)
state.entities_count = len(filtered.entities)
state.entity_types = list(filtered.entity_types)
阶段 2: 生成人设 (0-100%)
generator = OasisProfileGenerator(
api_key=llm_api_key, base_url=llm_base_url,
model_name=llm_model, zep_api_key=zep_api_key,
graph_id=state.graph_id
)
profiles = generator.generate_profiles_from_entities(
entities=filtered.entities,
use_llm=use_llm_for_profiles,
parallel_count=parallel_profile_count, # 默认 3
realtime_output_path=sim_dir,
output_platform="reddit" # 或 "twitter"
)
# 保存到 reddit_profiles.json 和/或 twitter_profiles.csv
阶段 3: 生成配置 (0-100%)
config_gen = SimulationConfigGenerator(
api_key=llm_api_key, base_url=llm_base_url,
model_name=llm_model
)
config = config_gen.generate_config(
simulation_id=state.simulation_id,
simulation_requirement=simulation_requirement,
document_text=document_text,
entities=filtered.entities,
enable_twitter=state.enable_twitter,
enable_reddit=state.enable_reddit
)
# 保存到 simulation_config.json
最终将状态设为 READY。
4.8 SimulationRunner — 仿真执行器
文件: backend/app/services/simulation_runner.py(约 1000+ 行)
职责: 启动 OASIS 仿真子进程、监控运行状态、解析行为日志
状态定义
class RunnerStatus(Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
PAUSED = "paused"
STOPPING = "stopping"
STOPPED = "stopped"
COMPLETED = "completed"
FAILED = "failed"
运行时状态
@dataclass
class SimulationRunState:
simulation_id: str
runner_status: RunnerStatus
current_round: int
total_rounds: int
simulated_hours: float
total_simulation_hours: float
# 平台独立状态
twitter_current_round: int
reddit_current_round: int
twitter_running: bool
reddit_running: bool
twitter_completed: bool
reddit_completed: bool
# 行为追踪
recent_actions: List[AgentAction] # 最新 50 条
rounds: List[RoundSummary]
核心方法
start_simulation(simulation_id, platform="parallel", max_rounds=None, enable_graph_memory_update=False, graph_id=None) -> SimulationRunState
- 验证: 检查仿真是否已在运行
- 加载配置: 读取
simulation_config.json获取total_rounds - 选择脚本:
platform="parallel"→run_parallel_simulation.pyplatform="twitter"→run_twitter_simulation.pyplatform="reddit"→run_reddit_simulation.py
- 启动子进程:
cmd = ["python", script_path, "--config", config_file] if max_rounds: cmd.extend(["--max-rounds", str(max_rounds)]) process = subprocess.Popen( cmd, start_new_session=True, # 创建新进程组,便于终止 stdout=log_file, stderr=log_file, env={**os.environ, "PYTHONIOENCODING": "utf-8"} ) - 记忆更新(可选): 创建
ZepGraphMemoryUpdater实例 - 启动监控线程:
_monitor_simulation()持续解析 actions.jsonl
_monitor_simulation(simulation_id) — 监控线程
while process.poll() is None: # 进程仍在运行
# 读取 Twitter actions.jsonl
twitter_pos = _read_action_log(twitter_log, twitter_pos, state, "twitter")
# 读取 Reddit actions.jsonl
reddit_pos = _read_action_log(reddit_log, reddit_pos, state, "reddit")
_save_run_state(state)
time.sleep(1) # 每秒检查一次
_read_action_log(log_path, position, state, platform) -> int
增量读取 JSONL 日志的核心逻辑:
- 从上次读取位置
position继续 - 逐行解析 JSON 对象
- 区分事件类型:
simulation_end: 标记平台完成round_end: 更新轮次和模拟时间- 其他: 解析为
AgentAction对象,加入状态
- 如果启用图谱记忆更新,将行为发送给
ZepGraphMemoryUpdater - 返回新的文件位置
进程终止机制:
def _terminate_process(process, simulation_id, timeout=10):
if IS_WINDOWS:
# Windows: 使用 taskkill /T /F 终止进程树
subprocess.call(['taskkill', '/T', '/F', '/PID', str(process.pid)])
else:
# Unix: 使用 os.killpg 发送 SIGTERM 到整个进程组
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
# 等待 timeout 秒,若未退出则 SIGKILL 强制终止
4.9 SimulationIPC — 进程间通信
文件: backend/app/services/simulation_ipc.py(约 395 行)
职责: Flask 后端与 OASIS 仿真子进程之间的命令/响应通信
通信协议
命令类型 (CommandType):
INTERVIEW — 单个智能体访谈
BATCH_INTERVIEW — 批量访谈
CLOSE_ENV — 关闭仿真环境
命令状态 (CommandStatus):
PENDING — 已创建,等待处理
PROCESSING — 正在处理
COMPLETED — 处理完成
FAILED — 处理失败
文件系统 IPC 原理
这是 MiroFish 的核心设计之一。由于 OASIS 仿真以独立子进程运行,Flask 无法直接调用其内部方法,因此采用文件系统作为通信媒介:
ipc_commands/ ipc_responses/
├── {cmd_id}.json → ├── {cmd_id}.json
│ 命令文件 │ 响应文件
│ Flask 写入 │ OASIS 写入
│ │ Flask 轮询读取
Flask 端 — SimulationIPCClient:
def send_command(command_type, args, timeout=60.0, poll_interval=0.5):
# 1. 生成唯一 command_id
# 2. 将命令序列化为 JSON 写入 ipc_commands/{cmd_id}.json
# 3. 轮询 ipc_responses/{cmd_id}.json(每 0.5 秒检查一次)
# 4. 读取响应后删除命令和响应文件
# 5. 超时抛出 TimeoutError
OASIS 端 — SimulationIPCServer:
def poll_commands() -> Optional[IPCCommand]:
# 扫描 ipc_commands/ 目录
# 返回最早的命令(按修改时间排序)
def send_response(response):
# 将响应写入 ipc_responses/{cmd_id}.json
# 删除对应的命令文件
环境存活检测: 通过 env_status.json 文件的 status 字段判断环境是否存活。
4.10 ZepGraphMemoryUpdater — 图谱记忆更新器
文件: backend/app/services/zep_graph_memory_updater.py(约 549 行)
职责: 将仿真过程中智能体的行为实时回写到 Zep 知识图谱,实现动态记忆演化
设计原理
这是 MiroFish 的一个重要创新:仿真不仅是从知识图谱中"读取"信息来驱动智能体,还会将仿真过程中的行为"写回"知识图谱。这意味着:
- 仿真中的智能体行为会成为知识图谱的一部分
- 后续的报告分析可以搜索到仿真中发生的事件
- 深度交互时可以基于仿真后的知识图谱进行更精确的查询
行为转自然语言
每种行为类型都有对应的中文描述模板:
# AgentActivity.to_episode_text() 示例:
CREATE_POST → "{agent_name} 在{platform}上发布了一条帖子:「{content}」"
LIKE_POST → "{agent_name} 在{platform}上点赞了{post_author}的帖子:「{post_content}」"
REPOST → "{agent_name} 在{platform}上转发了{original_author}的帖子:「{content}」"
CREATE_COMMENT → "{agent_name} 在{platform}上在{post_author}的帖子下评论道:「{content}」"
FOLLOW → "{agent_name} 在{platform}上关注了用户「{target_user_name}」"
SEARCH_POSTS → "{agent_name} 在{platform}上搜索了「{query}」"
MUTE → "{agent_name} 在{platform}上屏蔽了用户「{target_user_name}」"
平台名称映射:twitter → "世界1", reddit → "世界2"
批量发送机制
BATCH_SIZE = 5 # 累积 5 条行为后发送
SEND_INTERVAL = 0.5 # 批次间隔 0.5 秒
MAX_RETRIES = 3 # 最大重试次数
RETRY_DELAY = 2 # 重试间隔(指数增长)
工作线程 (_worker_loop):
- 从活动队列中取出行为
- 按平台分组到缓冲区
- 缓冲区达到 BATCH_SIZE 时,将多条行为的自然语言描述合并
- 调用
client.graph.add(graph_id, type="text", data=combined_text)写入 Zep - 指数退避重试
ZepGraphMemoryManager — 单例管理器:
class ZepGraphMemoryManager:
"""管理多个仿真的记忆更新器实例"""
_updaters: Dict[str, ZepGraphMemoryUpdater]
@classmethod
def create_updater(cls, simulation_id, graph_id) -> ZepGraphMemoryUpdater
@classmethod
def stop_updater(cls, simulation_id)
@classmethod
def stop_all(cls)
4.11 ReportAgent — 报告生成Agent
文件: backend/app/services/report_agent.py(约 1000+ 行)
职责: 基于 ReACT(Reasoning + Acting)模式,利用工具集深度分析仿真结果,生成预测报告
设计理念
ReportAgent 模仿人类分析师的工作方式:先规划报告结构,再针对每个章节反复调研(搜索、访谈)和撰写。它使用"思考-行动-观察"循环:
对每个章节:
1. 思考: 我需要搜集哪些信息来写这个章节?
2. 行动: 调用工具(搜索图谱、访谈智能体等)
3. 观察: 分析工具返回的结果
4. 重复 1-3,直到信息充分
5. 撰写: 基于收集的信息生成章节内容
工具定义
ReportAgent 拥有 4 种工具:
| 工具名 | 功能 | 适用场景 |
|---|---|---|
| insight_forge | 深度分析:自动生成子问题,多维度搜索 | 复杂分析,需要多角度理解 |
| panorama_search | 全景搜索:包含历史/过期内容 | 全局概览,时序分析 |
| quick_search | 快速精确搜索 | 已知关键词的快速查找 |
| interview_agents | 智能体访谈:通过 IPC 与 OASIS 中的智能体对话 | 获取第一人称视角和引言 |
配置参数
MAX_TOOL_CALLS_PER_SECTION = 5 # 每个章节最多调用 5 次工具
MAX_REFLECTION_ROUNDS = 3 # 最多 3 轮反思/修改
MAX_TOOL_CALLS_PER_CHAT = 2 # 聊天模式每轮最多 2 次工具调用
报告生成流程
规划阶段:
PLAN_SYSTEM_PROMPT = """
你是一位专业的未来预测报告撰写专家。
你的任务是基于多智能体社交媒体模拟推演的结果,
撰写一份详尽的预测分析报告...
"""
- LLM 生成报告大纲(ReportOutline),包含标题、摘要、章节列表
章节生成阶段:
SECTION_SYSTEM_PROMPT_TEMPLATE = """
你现在需要撰写报告的一个章节:{section_title}
你可以使用以下工具来收集信息...
必须引用原始事件语录,避免自行编造内容...
"""
- 对每个章节进行 ReACT 循环
- 强制最少工具调用次数
- 检测未使用的工具并建议使用
- 超过最大工具调用次数后强制撰写
输出格式: Markdown 格式的完整报告
日志系统
ReportLogger: JSONL 格式的结构化日志
{
"timestamp": "2025-01-01T12:00:00",
"elapsed_seconds": 45.2,
"report_id": "report_xxx",
"action": "tool_call",
"stage": "section_generation",
"details": {
"section": "第二章:舆论传播分析",
"tool_name": "insight_forge",
"parameters": {"query": "..."}
}
}
记录的动作类型:report_start, planning_start, section_start, react_thought, tool_call, tool_result, llm_response, section_content, section_complete, report_complete, error
4.12 ZepToolsService — Zep 工具服务
文件: backend/app/services/zep_tools.py(约 1000+ 行)
职责: 为 ReportAgent 提供统一的 Zep 图谱搜索接口,封装多种搜索策略
搜索结果数据类
@dataclass
class SearchResult:
facts: List[str]
edges: List[EdgeInfo]
nodes: List[NodeInfo]
query: str
total_count: int
@dataclass
class NodeInfo:
uuid: str
name: str
labels: List[str]
summary: str
attributes: Dict
@dataclass
class EdgeInfo:
uuid: str
name: str
fact: str
source_node: Dict # {uuid, name}
target_node: Dict # {uuid, name}
# 时序字段
created_at: Optional[str]
valid_at: Optional[str]
invalid_at: Optional[str]
expired_at: Optional[str]
@property
def is_expired(self) -> bool # 判断关系是否过期
@property
def is_invalid(self) -> bool # 判断关系是否失效
核心搜索方法
search_graph(graph_id, query, limit=10, scope="edges") -> SearchResult
- 优先使用 Zep Cloud Search API
- 如果 API 不可用,回退到本地关键词匹配 (
_local_search)
_local_search(graph_id, query, limit, scope) -> SearchResult
- 回退方法:对所有边/节点进行关键词匹配
- 计算匹配分数,返回最佳结果
高级搜索工具
insight_forge(graph_id, query, simulation_requirement, report_context, max_sub_queries=5) -> InsightForgeResult
深度分析工具的实现:
- 子问题生成: 调用 LLM 将主查询分解为多个子问题
- 多维搜索: 对每个子问题执行语义搜索
- 实体洞察: 提取关键实体的详细信息
- 关系链: 追踪实体间的关系路径
- 统计汇总: 返回总事实数、实体数、关系数
panorama_search(graph_id, query, include_expired=True) -> PanoramaResult
全景搜索工具:
- 获取图谱中所有节点和边
- 分离活跃事实和历史事实(基于 expired_at/invalid_at)
- 返回完整的图谱视图
interview_agents(graph_id, interviews_config) -> InterviewResult
智能体访谈工具:
- 选择相关智能体
- 通过 IPC 发送访谈命令到 OASIS 进程
- 收集智能体回复
- 清洗引言中的格式问题
- 生成访谈摘要
第五部分:后端API接口详解
5.1 API 总览
后端通过 Flask Blueprint 组织 API,挂载在三个路径下:
| Blueprint | 挂载路径 | 功能 |
|---|---|---|
graph_bp |
/api/graph |
图谱构建相关(本体论、图谱、项目管理) |
simulation_bp |
/api/simulation |
仿真相关(创建、准备、运行、状态查询) |
report_bp |
/api/report |
报告相关(生成、查看、聊天、日志) |
5.2 图谱 API(/api/graph)
POST /api/graph/ontology/generate — 生成本体论
请求: multipart/form-data
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
files |
File[] | 是 | PDF/MD/TXT 文档(支持多文件) |
simulation_requirement |
string | 是 | 仿真需求描述(自然语言) |
project_name |
string | 否 | 项目名称 |
additional_context |
string | 否 | 额外上下文 |
处理流程:
- 创建 Project →
ProjectStatus.CREATED - 保存上传文件到
projects/{project_id}/files/ - FileParser 提取各文件文本
- TextProcessor 预处理文本
- OntologyGenerator.generate() 调用 LLM 生成本体论
- 保存到项目 →
ProjectStatus.ONTOLOGY_GENERATED
响应:
{
"success": true,
"data": {
"project_id": "proj_xxxxx",
"project_name": "项目名称",
"ontology": {
"entity_types": [...],
"edge_types": [...]
},
"analysis_summary": "基于文档的分析摘要...",
"files": [{"filename": "xxx.pdf", "path": "...", "size": 12345}],
"total_text_length": 50000
}
}
POST /api/graph/build — 构建知识图谱
请求: application/json
{
"project_id": "proj_xxxxx",
"graph_name": "图谱名称",
"chunk_size": 500,
"chunk_overlap": 50,
"force": false
}
处理流程:
- 验证项目存在且状态为 ONTOLOGY_GENERATED
- 启动后台线程执行图谱构建
- 返回 task_id 供轮询
响应:
{
"success": true,
"data": {
"project_id": "proj_xxxxx",
"task_id": "uuid-xxxx",
"message": "Graph building task started"
}
}
GET /api/graph/task/{task_id} — 查询任务状态
响应:
{
"success": true,
"data": {
"task_id": "uuid-xxxx",
"task_type": "构建图谱",
"status": "processing",
"progress": 45,
"message": "正在处理文本批次 3/10...",
"result": null,
"error": null
}
}
GET /api/graph/data/{graph_id} — 获取图谱数据
返回完整的节点和边数据,用于前端 D3 可视化。
DELETE /api/graph/delete/{graph_id} — 删除图谱
GET/DELETE /api/graph/project/{project_id} — 项目管理
POST /api/graph/project/{project_id}/reset — 重置项目(回到 ONTOLOGY_GENERATED 状态)
5.3 仿真 API(/api/simulation)
实体查询
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /entities/{graph_id} |
获取所有过滤后的实体 |
| GET | /entities/{graph_id}/{entity_uuid} |
获取单个实体详情 |
| GET | /entities/{graph_id}/by-type/{entity_type} |
按类型获取实体 |
POST /api/simulation/create — 创建仿真
请求:
{
"project_id": "proj_xxxxx",
"graph_id": "mirofish_xxxxx",
"enable_twitter": true,
"enable_reddit": true
}
响应: 返回 simulation_id
POST /api/simulation/prepare — 准备仿真环境
请求:
{
"simulation_id": "sim_xxxxx",
"entity_types": ["Student", "Professor"],
"use_llm_for_profiles": true,
"parallel_profile_count": 5,
"force_regenerate": false
}
智能检测逻辑:
- 如果已准备完毕(config_generated=true),直接返回
- 否则启动后台任务(三阶段:读取实体 → 生成人设 → 生成配置)
响应:
{
"success": true,
"data": {
"simulation_id": "sim_xxxxx",
"task_id": "task_uuid",
"status": "preparing",
"expected_entities_count": 42,
"entity_types": ["Student", "Professor"]
}
}
POST /api/simulation/prepare/status — 查询准备进度
POST /api/simulation/start — 启动仿真
请求:
{
"simulation_id": "sim_xxxxx",
"platform": "parallel",
"max_rounds": 100,
"enable_graph_memory_update": false,
"force": false
}
智能启动逻辑:
- 检查准备是否完成
- 如果已在运行且非 force,返回错误
- 如果 force,停止现有进程,清理日志
- 调用 SimulationRunner.start_simulation()
响应:
{
"success": true,
"data": {
"simulation_id": "sim_xxxxx",
"runner_status": "running",
"process_pid": 12345,
"twitter_running": true,
"reddit_running": true,
"graph_memory_update_enabled": true
}
}
GET /api/simulation/{simulation_id}/run-status — 运行状态
响应:
{
"runner_status": "running",
"current_round": 5,
"total_rounds": 144,
"progress_percent": 3.5,
"simulated_hours": 2,
"total_simulation_hours": 72,
"twitter_running": true,
"reddit_running": true,
"twitter_actions_count": 150,
"reddit_actions_count": 200,
"total_actions_count": 350
}
其他仿真 API
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /stop |
停止运行中的仿真 |
| GET | /{id}/profiles |
获取智能体人设列表 |
| GET | /{id}/profiles/realtime |
实时获取人设生成进度 |
| GET | /{id}/config |
获取仿真配置 |
| GET | /{id}/config/realtime |
实时获取配置生成进度 |
| GET | /list |
列出所有仿真 |
| GET | /history |
获取富化的历史记录 |
| GET | /{id}/actions |
获取行为日志 |
5.4 报告 API(/api/report)
POST /api/report/generate — 生成报告
请求:
{
"simulation_id": "sim_xxxxx",
"force_regenerate": false
}
处理流程:
- 检查是否已有报告(非 force_regenerate 时跳过)
- 获取仿真、项目、图谱信息
- 创建 ReportAgent 实例
- 后台线程执行
agent.generate_report() - 分节保存并更新进度
POST /api/report/generate/status — 报告生成进度
GET /api/report/{report_id} — 获取报告内容
响应:
{
"success": true,
"data": {
"report_id": "report_xxxxx",
"simulation_id": "sim_xxxxx",
"status": "completed",
"title": "报告标题",
"markdown_content": "# 完整 Markdown 报告内容...",
"outline": {
"title": "...",
"summary": "...",
"sections": [
{"title": "第一章:...", "content": "..."}
]
}
}
}
POST /api/report/chat — 与 ReportAgent 对话
请求:
{
"simulation_id": "sim_xxxxx",
"message": "请解释这个预测结论的依据",
"chat_history": [
{"role": "user", "content": "之前的问题"},
{"role": "assistant", "content": "之前的回答"}
]
}
响应: 包含回答文本、工具调用记录、信息来源
日志和状态查询
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /{id}/progress |
实时生成进度和当前章节 |
| GET | /{id}/sections |
获取已生成的所有章节元数据 |
| GET | /{id}/section/{index} |
获取单个章节内容 |
| GET | /{id}/agent-log |
分页获取 Agent 执行日志 |
| GET | /{id}/agent-log/stream |
一次性获取所有日志 |
| GET | /{id}/console-log |
分页获取控制台输出 |
| GET | /{id}/download |
下载 Markdown 文件 |
| GET | /check/{simulation_id} |
检查报告状态 |
5.5 健康检查
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /health |
返回 {"status": "ok"} |
5.6 统一错误响应格式
所有 API 在出错时返回统一格式:
{
"success": false,
"error": "人类可读的错误描述",
"traceback": "完整 Python 堆栈跟踪(开发模式下)"
}
HTTP 状态码使用:
400: 请求参数错误、验证失败404: 资源不存在500: 服务器内部错误
第六部分:后端数据模型
6.1 Project 模型
文件: backend/app/models/project.py
项目状态机
class ProjectStatus(Enum):
CREATED = "created" # 文件已上传,无本体论
ONTOLOGY_GENERATED = "ontology_generated" # 本体论已生成
GRAPH_BUILDING = "graph_building" # 正在构建图谱
GRAPH_COMPLETED = "graph_completed" # 图谱构建完成
FAILED = "failed" # 任意步骤失败
状态转换流:
CREATED → ONTOLOGY_GENERATED → GRAPH_BUILDING → GRAPH_COMPLETED
↘
FAILED
项目数据结构
@dataclass
class Project:
project_id: str # 格式: proj_{uuid[:5]}
name: str # 项目名称
status: ProjectStatus # 当前状态
created_at: str # ISO 时间戳
updated_at: str # ISO 时间戳
files: List[Dict] # [{filename, path, size}]
total_text_length: int # 提取的文本总字符数
ontology: Optional[Dict] # {entity_types, edge_types}
analysis_summary: str # LLM 分析摘要
graph_id: Optional[str] # Zep 图谱 ID
graph_build_task_id: Optional[str] # 异步构建任务 ID
simulation_requirement: str # 用户仿真需求描述
chunk_size: int # 文本分块大小
chunk_overlap: int # 分块重叠量
error: Optional[str] # 错误信息
ProjectManager(静态方法类)
| 方法 | 功能 | 存储位置 |
|---|---|---|
create_project(name) |
创建新项目目录和元数据 | uploads/projects/proj_xxx/ |
save_project(project) |
持久化项目状态 | project.json |
get_project(project_id) |
加载项目状态 | project.json |
list_projects(limit) |
列出所有项目(按创建时间降序) | 扫描目录 |
delete_project(project_id) |
删除整个项目目录 | shutil.rmtree |
save_file_to_project(id, file, name) |
保存上传文件 | files/{safe_name} |
save_extracted_text(id, text) |
保存提取的文本 | extracted_text.txt |
get_extracted_text(id) |
读取提取的文本 | extracted_text.txt |
get_project_files(id) |
获取项目所有文件路径 | 扫描 files/ |
文件安全处理
上传文件使用 werkzeug.utils.secure_filename() 处理文件名,防止路径遍历攻击。对中文文件名进行 UUID 前缀处理以确保安全性。
6.2 Task 模型
文件: backend/app/models/task.py
任务状态
class TaskStatus(Enum):
PENDING = "pending" # 已创建未开始
PROCESSING = "processing" # 执行中
COMPLETED = "completed" # 成功完成
FAILED = "failed" # 执行失败
任务数据结构
@dataclass
class Task:
task_id: str # UUID
task_type: str # 任务类型描述(如 "构建图谱"、"report_generate")
status: TaskStatus
created_at: datetime
updated_at: datetime
progress: int # 0-100 百分比
message: str # 当前状态消息
result: Optional[Dict] # 任务完成后的结果
error: Optional[str] # 错误堆栈
metadata: Dict # 额外上下文(如 project_id, graph_id)
progress_detail: Dict # 详细阶段进度
TaskManager(单例模式 + 线程安全)
class TaskManager:
_instance = None
_instance_lock = threading.Lock()
_task_lock = threading.Lock()
_tasks: Dict[str, Task] = {}
@classmethod
def get_instance(cls) -> 'TaskManager':
"""双重检查锁定的单例实现"""
if cls._instance is None:
with cls._instance_lock:
if cls._instance is None:
cls._instance = TaskManager()
return cls._instance
关键设计: TaskManager 是纯内存存储,重启后任务数据丢失。通过 cleanup_old_tasks(max_age_hours=24) 清理过期任务防止内存泄漏。
| 方法 | 线程安全 | 说明 |
|---|---|---|
create_task(task_type, metadata) |
是 | 创建任务,返回 task_id |
get_task(task_id) |
是 | 获取任务对象 |
update_task(task_id, ...) |
是 | 更新进度/状态/消息 |
complete_task(task_id, result) |
是 | 标记完成,progress=100 |
fail_task(task_id, error) |
是 | 标记失败 |
list_tasks(task_type) |
是 | 列出任务 |
cleanup_old_tasks(max_age_hours) |
是 | 清理旧任务 |
第七部分:后端工具模块
7.1 FileParser — 文件解析器
文件: backend/app/utils/file_parser.py
支持的文件格式
| 格式 | 解析方法 | 依赖库 |
|---|---|---|
_extract_from_pdf |
PyMuPDF (fitz) | |
| Markdown | _extract_from_md |
原生文本读取 |
| TXT | _extract_from_txt |
原生文本读取 |
编码检测策略
_read_text_with_fallback(file_path) 的四级回退策略:
1. UTF-8 直接解码
↓ 失败
2. charset_normalizer 自动检测
↓ 失败
3. chardet 检测
↓ 失败
4. UTF-8 + errors='replace'(用 � 替换无法解码的字节)
这种多级回退确保了对各种中文编码(UTF-8、GBK、GB2312、Big5 等)的兼容。
文本分块函数
split_text_into_chunks(text, chunk_size=500, overlap=50) -> List[str]
核心逻辑:
def split_text_into_chunks(text, chunk_size=500, overlap=50):
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
if end < len(text):
# 尝试在句子边界处分割
for sep in ['。', '!', '?', '.\n', '!\n', '?\n', '\n\n']:
boundary = text.rfind(sep, start + chunk_size // 2, end)
if boundary != -1:
end = boundary + len(sep)
break
chunk = text[start:end].strip()
if chunk:
chunks.append(chunk)
start = end - overlap # 保留重叠
return chunks
7.2 LLMClient — LLM 通信客户端
文件: backend/app/utils/llm_client.py
初始化
class LLMClient:
def __init__(self, api_key=None, base_url=None, model=None):
self.api_key = api_key or Config.LLM_API_KEY
self.base_url = base_url or Config.LLM_BASE_URL
self.model = model or Config.LLM_MODEL_NAME
self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
核心方法
chat(messages, temperature=0.7, max_tokens=4096, response_format=None) -> str
- 调用
client.chat.completions.create() - 获取响应文本
- 清除
<think>标签: 某些模型(如 DeepSeek)会输出思维过程,使用正则删除:content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL) - 返回清理后的文本
chat_json(messages, temperature=0.7, max_tokens=4096) -> Dict
- 设置
response_format={"type": "json_object"} - 调用
chat()获取文本 - 清理 Markdown 代码块标记(
json ...) - 解析为 Python Dict
- 解析失败则抛出
ValueError
7.3 Logger — 日志系统
文件: backend/app/utils/logger.py
日志配置
def setup_logger(name='mirofish', level=logging.DEBUG):
# 文件处理器:logs/YYYY-MM-DD.log
# - DEBUG 级别
# - RotatingFileHandler: 10MB 单文件,保留 5 个备份
# - 格式: [2025-01-01 12:00:00,000] DEBUG [module.function:42] 消息
# 控制台处理器:stdout
# - INFO 级别
# - 格式: [12:00:00] INFO: 消息
Windows UTF-8 处理
if sys.platform == 'win32':
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
7.4 Retry — 重试机制
文件: backend/app/utils/retry.py
装饰器用法
@retry_with_backoff(
max_retries=3, # 最大重试次数
initial_delay=1.0, # 初始延迟(秒)
max_delay=30.0, # 最大延迟上限
backoff_factor=2.0, # 退避因子
jitter=True, # 添加随机抖动
exceptions=(Exception,) # 捕获的异常类型
)
def my_api_call():
...
退避公式:
delay = min(delay × backoff_factor, max_delay) × (0.5 + random())
示例延迟序列(initial_delay=1.0, backoff_factor=2.0):
第1次重试: ~1.0s (0.5~1.5s)
第2次重试: ~2.0s (1.0~3.0s)
第3次重试: ~4.0s (2.0~6.0s)
RetryableAPIClient
class RetryableAPIClient:
def call_with_retry(self, func, *args, **kwargs):
"""执行单个函数并重试"""
def call_batch_with_retry(self, func, items, *args, **kwargs):
"""批量执行,返回 (results, failures)"""
# 每个 item 独立重试,失败不影响其他 item
7.5 ZepPaging — Zep 分页工具
文件: backend/app/utils/zep_paging.py
分页策略
_DEFAULT_PAGE_SIZE = 100 # 每页 100 条
_MAX_NODES = 2000 # 节点最大获取量
_DEFAULT_MAX_RETRIES = 3 # 每页最大重试次数
_DEFAULT_RETRY_DELAY = 2.0 # 重试初始延迟
def fetch_all_nodes(client, graph_id, page_size=100, max_items=2000):
"""
使用 UUID 游标分页获取所有节点
每页独立重试,支持指数退避
返回最多 max_items 个节点
"""
all_nodes = []
cursor = None
while True:
page = _fetch_page_with_retry(
lambda: client.graph.node.get_by_graph_id(
graph_id, page_size=page_size, cursor=cursor
)
)
all_nodes.extend(page.results)
if len(all_nodes) >= max_items or not page.results:
break
cursor = page.results[-1].uuid # 使用最后一个 UUID 作为游标
return all_nodes[:max_items]
def fetch_all_edges(client, graph_id, page_size=100):
"""类似逻辑,但不限制数量"""
重试逻辑
def _fetch_page_with_retry(func, max_retries=3, initial_delay=2.0):
"""
捕获 ConnectionError, TimeoutError, OSError, InternalServerError
指数退避重试(2s → 4s → 8s)
"""
第八部分:仿真脚本详解
8.1 ActionLogger — 行为日志系统
文件: backend/scripts/action_logger.py
职责: 为 OASIS 仿真过程提供结构化的行为日志记录
日志架构
SimulationLogManager
├── PlatformActionLogger (Twitter) → twitter/actions.jsonl
├── PlatformActionLogger (Reddit) → reddit/actions.jsonl
└── Main Logger → simulation.log(控制台 + 文件)
JSONL 日志格式
每行一个 JSON 对象:
普通行为记录:
{
"round": 1,
"timestamp": "2025-01-01T12:00:00.000000",
"agent_id": 5,
"agent_name": "zhang_san_123",
"action_type": "CREATE_POST",
"action_args": {
"content": "这个政策我觉得值得讨论..."
},
"result": "success",
"success": true
}
轮次开始/结束事件:
{
"type": "event",
"event": "round_start",
"round": 2,
"timestamp": "2025-01-01T12:30:00",
"simulated_hour": 1.0
}
仿真结束事件:
{
"type": "event",
"event": "simulation_end",
"total_rounds": 144,
"total_actions": 5280,
"timestamp": "2025-01-04T12:00:00"
}
PlatformActionLogger 关键方法
class PlatformActionLogger:
def __init__(self, log_dir: str, platform: str):
# 创建 {log_dir}/{platform}/actions.jsonl
self.log_file = open(log_path, 'a', encoding='utf-8')
def log_action(self, round_num, agent_id, agent_name,
action_type, action_args, result, success):
entry = {
"round": round_num,
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"agent_name": agent_name,
"action_type": action_type,
"action_args": action_args,
"result": str(result),
"success": success
}
self.log_file.write(json.dumps(entry, ensure_ascii=False) + '\n')
self.log_file.flush() # 立即刷新,确保监控线程能读到
UnicodeFormatter
自定义日志格式化器,确保中文字符在控制台正确显示。
8.2 run_parallel_simulation.py — 并行仿真脚本
文件: backend/scripts/run_parallel_simulation.py
职责: 同时运行 Twitter 和 Reddit 两个仿真平台
命令行参数
python run_parallel_simulation.py \
--config simulation_config.json \ # 仿真配置文件路径
[--max-rounds N] # 最大轮数覆盖
[--no-wait] # 不等待 IPC 命令(仿真结束后直接退出)
[--twitter-only] # 仅运行 Twitter
[--reddit-only] # 仅运行 Reddit
初始化流程
def main():
# 1. Windows UTF-8 兼容处理
if sys.platform == 'win32':
# 猴子补丁: 重写 builtins.open 默认编码为 utf-8
original_open = builtins.open
def patched_open(*args, **kwargs):
if 'encoding' not in kwargs and 'b' not in str(kwargs.get('mode', '')):
kwargs['encoding'] = 'utf-8'
return original_open(*args, **kwargs)
builtins.open = patched_open
# 2. 加载配置
config = json.load(open(args.config))
# 3. LLM 配置优先级: .env > config > 默认值
llm_api_key = os.getenv("LLM_API_KEY") or config.get("llm_api_key")
llm_base_url = os.getenv("LLM_BASE_URL") or config.get("llm_base_url")
llm_model = os.getenv("LLM_MODEL_NAME") or config.get("llm_model", "gpt-4o-mini")
# 4. 抑制 OASIS 详细日志
for logger_name in ['social.agent', 'social.twitter', 'social.rec',
'oasis.env', 'table']:
logging.getLogger(logger_name).setLevel(logging.CRITICAL)
# 5. 初始化日志管理器
log_manager = SimulationLogManager(simulation_dir)
仿真环境初始化
# 6. 创建 Twitter 环境
twitter_env = TwitterEnv(
agent_graph=twitter_agent_graph,
llm_config=ModelConfig(model=llm_model, api_key=llm_api_key, ...),
available_actions=[ActionType.CREATE_POST, ActionType.LIKE_POST,
ActionType.REPOST, ActionType.FOLLOW,
ActionType.DO_NOTHING, ActionType.QUOTE_POST],
db_path="twitter_simulation.db"
)
# 7. 创建 Reddit 环境
reddit_env = RedditEnv(
agent_graph=reddit_agent_graph,
llm_config=ModelConfig(...),
available_actions=[ActionType.LIKE_POST, ActionType.DISLIKE_POST,
ActionType.CREATE_POST, ActionType.CREATE_COMMENT,
...13 种动作...],
db_path="reddit_simulation.db"
)
仿真主循环
# 8. 执行初始事件(种子帖子)
for post in config.get("event_config", {}).get("initial_posts", []):
agent_id = post.get("poster_agent_id")
content = post.get("content")
# 在对应平台发布初始帖子
# 9. 主仿真循环
total_rounds = config["time_config"]["total_simulation_hours"] * 60 \
// config["time_config"]["minutes_per_round"]
for round_num in range(total_rounds):
simulated_hour = round_num * minutes_per_round / 60
# 确定本轮活跃智能体数量
hour_of_day = int(simulated_hour) % 24
base_count = random.randint(agents_per_hour_min, agents_per_hour_max)
# 应用时段乘数
if hour_of_day in peak_hours:
target_count = int(base_count * peak_multiplier)
elif hour_of_day in off_peak_hours:
target_count = int(base_count * off_peak_multiplier)
elif hour_of_day in morning_hours:
target_count = int(base_count * morning_multiplier)
else:
target_count = int(base_count * work_multiplier)
# 选择活跃智能体
candidates = []
for agent in all_agents:
if hour_of_day in agent.active_hours:
if random.random() < agent.activity_level:
candidates.append(agent)
active_agents = random.sample(candidates, min(target_count, len(candidates)))
# 在两个平台上执行行为
for platform_env in [twitter_env, reddit_env]:
actions = platform_env.step(active_agents)
for action in actions:
log_manager.log_action(platform, round_num, action)
# 10. 仿真结束后进入 IPC 等待模式
if not args.no_wait:
ipc_server = SimulationIPCServer(simulation_dir)
ipc_server.start()
while True:
command = ipc_server.poll_commands()
if command:
handle_ipc_command(command, twitter_env, reddit_env)
时间模拟机制
MiroFish 使用离散时间步进模拟,核心参数:
total_simulation_hours = 72 → 模拟现实中的 72 小时
minutes_per_round = 60 → 每轮代表 60 分钟
total_rounds = 72 * 60 / 60 = 72 轮
每轮代表的时刻 = round_num × minutes_per_round / 60 (小时)
例: 第 5 轮 = 5 × 60 / 60 = 第 5 小时 = 凌晨 5 点
8.3 run_twitter_simulation.py — Twitter 单平台仿真
文件: backend/scripts/run_twitter_simulation.py
与并行仿真类似,但仅运行 Twitter 平台。
关键差异:
- 人设格式: CSV(
twitter_profiles.csv) - 数据库:
twitter_simulation.db - 可用动作: 6 种(CREATE_POST, LIKE_POST, REPOST, FOLLOW, DO_NOTHING, QUOTE_POST)
- IPCHandler 只处理 Twitter 环境的命令
智能体选择逻辑:
def select_active_agents(all_agents, config, simulated_hour):
hour_of_day = int(simulated_hour) % 24
# 1. 计算基础范围
base_min = config["agents_per_hour_min"]
base_max = config["agents_per_hour_max"]
base_count = random.randint(base_min, base_max)
# 2. 应用时段乘数
if hour_of_day in config["peak_hours"]:
multiplier = config.get("peak_activity_multiplier", 1.5)
elif hour_of_day in config["off_peak_hours"]:
multiplier = config.get("off_peak_activity_multiplier", 0.3)
else:
multiplier = 1.0
target_count = int(base_count * multiplier)
# 3. 过滤候选智能体
candidates = []
for agent in all_agents:
agent_config = config["agent_configs"].get(str(agent.id))
if agent_config:
if hour_of_day in agent_config.get("active_hours", range(24)):
if random.random() < agent_config.get("activity_level", 0.5):
candidates.append(agent)
# 4. 随机抽样
return random.sample(candidates, min(target_count, len(candidates)))
8.4 run_reddit_simulation.py — Reddit 单平台仿真
文件: backend/scripts/run_reddit_simulation.py
与 Twitter 版本对称,但针对 Reddit 平台:
关键差异:
- 人设格式: JSON(
reddit_profiles.json),包含更丰富的字段 - 数据库:
reddit_simulation.db - 可用动作: 13 种,包括 DISLIKE_POST, CREATE_COMMENT, SEARCH_POSTS 等
- Reddit 特有的 TREND 和 REFRESH 动作
8.5 test_profile_format.py — 人设格式测试
文件: backend/scripts/test_profile_format.py
职责: 验证生成的人设文件格式是否符合 OASIS 要求
测试项目:
-
Twitter CSV 格式:
- 必需字段:
user_id, user_name, name, bio, friend_count, follower_count, statuses_count, created_at - 使用
csv.DictReader验证
- 必需字段:
-
Reddit JSON 格式:
- 必需字段:
realname, username, bio, persona - 可选字段:
age, gender, mbti, country, profession, interested_topics
- 必需字段:
第九部分:前端架构与实现
9.1 技术架构
前端使用 Vue 3 + Vite 构建,采用单页应用(SPA)架构。
项目配置
package.json:
{
"dependencies": {
"vue": "^3.5.24",
"vue-router": "^4.6.3",
"axios": "axios",
"d3": "d3"
},
"scripts": {
"dev": "vite --port 3000 --open",
"build": "vite build",
"preview": "vite preview"
}
}
vite.config.js:
export default defineConfig({
plugins: [vue()],
server: {
port: 3000,
open: true,
proxy: {
'/api': {
target: 'http://localhost:5001', // API 代理到后端
changeOrigin: true
}
}
}
})
index.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<!-- Google Fonts: Inter, JetBrains Mono, Noto Sans SC, Space Grotesk -->
<title>MiroFish - 预测万物</title>
</head>
<body>
<div id="app"></div>
<script type="module" src="/src/main.js"></script>
</body>
</html>
9.2 路由结构
文件: frontend/src/router/index.js
| 路径 | 组件 | 用途 |
|---|---|---|
/ |
Home.vue |
首页(文件上传 & 项目选择) |
/process/:projectId |
MainView.vue |
图谱构建(步骤 1) |
/simulation/:simulationId |
SimulationView.vue |
环境搭建(步骤 2) |
/simulation/:simulationId/start |
SimulationRunView.vue |
仿真运行(步骤 3) |
/report/:reportId |
ReportView.vue |
报告生成(步骤 4) |
/interaction/:reportId |
InteractionView.vue |
深度交互(步骤 5) |
路由参数通过 props: true 传递给组件。
9.3 状态管理
文件: frontend/src/store/pendingUpload.js
MiroFish 不使用 Vuex/Pinia,而是使用简单的响应式状态模块:
import { reactive } from 'vue'
const state = reactive({
files: [],
requirement: '',
pending: false
})
export function setPendingUpload(files, requirement) {
state.files = files
state.requirement = requirement
state.pending = true
}
export function getPendingUpload() {
return { ...state }
}
export function clearPendingUpload() {
state.files = []
state.requirement = ''
state.pending = false
}
设计理由: 这个状态存储解决了首页(Home)和处理页(Process)之间的数据传递问题。用户在首页选择文件和输入需求后,数据暂存在 pendingUpload 中,导航到处理页后读取并清除。
9.4 API 层
Axios 基础配置(api/index.js)
const service = axios.create({
baseURL: '/api',
timeout: 300000 // 5 分钟超时(适应 LLM 长调用)
})
// 请求拦截器:日志记录
service.interceptors.request.use(config => {
console.log(`[API] ${config.method.toUpperCase()} ${config.url}`)
return config
})
// 响应拦截器:错误处理
service.interceptors.response.use(
response => response.data,
error => {
if (error.code === 'ECONNABORTED') {
console.error('请求超时')
}
return Promise.reject(error)
}
)
// 带重试的请求封装
export function requestWithRetry(requestFn, maxRetries = 3) {
// 指数退避重试
return async function (...args) {
for (let i = 0; i <= maxRetries; i++) {
try {
return await requestFn(...args)
} catch (error) {
if (i === maxRetries) throw error
await sleep(Math.pow(2, i) * 1000) // 1s, 2s, 4s
}
}
}
}
Graph API(api/graph.js)
export const generateOntology = (formData) =>
requestWithRetry(() => service.post('/graph/ontology/generate', formData, {
headers: { 'Content-Type': 'multipart/form-data' }
}))()
export const buildGraph = (data) =>
service.post('/graph/build', data)
export const getTaskStatus = (taskId) =>
service.get(`/graph/task/${taskId}`)
export const getGraphData = (graphId) =>
service.get(`/graph/data/${graphId}`)
export const getProject = (projectId) =>
service.get(`/graph/project/${projectId}`)
Simulation API(api/simulation.js)
包含 20+ 个 API 调用函数,覆盖仿真生命周期的每个阶段:
- 创建、准备、启动、停止仿真
- 查询状态、人设、配置
- 获取行为日志和时间线
- 智能体访谈
- 历史记录
Report API(api/report.js)
export const generateReport = (data) =>
service.post('/report/generate', data)
export const getReportStatus = (reportId) =>
service.get(`/report/${reportId}/progress`)
export const getAgentLog = (reportId, fromLine = 0) =>
service.get(`/report/${reportId}/agent-log`, { params: { from_line: fromLine } })
export const getConsoleLog = (reportId, fromLine = 0) =>
service.get(`/report/${reportId}/console-log`, { params: { from_line: fromLine } })
export const getReport = (reportId) =>
service.get(`/report/${reportId}`)
export const chatWithReport = (data) =>
service.post('/report/chat', data)
9.5 全局样式
App.vue 定义的全局样式:
/* 根字体 */
body {
font-family: 'JetBrains Mono', 'Space Grotesk', 'Noto Sans SC', monospace;
}
/* 自定义暗色系滚动条 */
::-webkit-scrollbar { width: 6px; }
::-webkit-scrollbar-track { background: #f0f0f0; }
::-webkit-scrollbar-thumb { background: #ccc; border-radius: 3px; }
/* 全局按钮样式 */
button { font-family: inherit; }
色板:
- 主色:
#000000(黑色)、#FFFFFF(白色) - 强调色:
#FF4500(橙红色) - 灰色:
#666666(文字)、#E5E5E5(边框)
动画:
blink: 1 秒透明度脉冲pulse: 边框脉冲效果- 统一使用
cubic-bezier(0.4, 0, 0.2, 1)缓动曲线
9.6 轮询策略
前端采用定时轮询获取后端异步操作的状态:
| 场景 | 轮询间隔 | 停止条件 |
|---|---|---|
| 任务状态(图谱构建) | 2 秒 | status === "completed" 或 "failed" |
| 图谱数据更新 | 10 秒 | 一次性获取后停止 |
| 人设生成进度 | 3 秒 | 所有人设生成完毕 |
| 仿真配置进度 | 2 秒 | 配置生成完毕 |
| 仿真运行状态 | 2 秒 | runner_status === "completed" |
| 报告生成进度 | 2 秒 | status === "completed" |
| Agent 日志 | 3 秒 | 报告生成完毕 |
第十部分:前端视图组件详解
10.1 Home.vue — 首页
路径: frontend/src/views/Home.vue
路由: /
页面布局
双栏布局:左侧信息展示区 + 右侧控制台风格操作区
┌─────────────────────────┬──────────────────────────┐
│ │ │
│ Hero Section │ Console Section │
│ - 渐变标题 │ - 文件上传区域 │
│ - 项目描述 │ (拖拽 + 点击) │
│ - 功能特点 │ - 仿真需求输入框 │
│ │ - 已选文件列表 │
│ │ - 启动按钮 │
│ │ │
├─────────────────────────┴──────────────────────────┤
│ HistoryDatabase Component │
│ - 历史项目/仿真记录 │
└────────────────────────────────────────────────────┘
核心交互
文件上传:
// 支持拖拽和点击两种方式
addFiles(newFiles) {
for (const file of newFiles) {
const ext = file.name.split('.').pop().toLowerCase()
if (['pdf', 'md', 'txt', 'markdown'].includes(ext)) {
this.files.push(file)
} else {
alert(`不支持的文件格式: ${ext}`)
}
}
}
启动仿真流程:
startSimulation() {
if (!this.canSubmit) return
// 暂存文件和需求到全局状态
setPendingUpload(this.files, this.formData.simulationRequirement)
// 导航到处理页(使用特殊标记 'new' 表示新项目)
this.$router.push('/process/new')
}
10.2 MainView.vue — 主处理视图
路径: frontend/src/views/MainView.vue
路由: /process/:projectId
功能概述
这是步骤 1-2 的容器视图,包含左右双面板:
- 左面板: GraphPanel(知识图谱可视化)
- 右面板: Step1GraphBuild 或 Step2EnvSetup 组件
三种视图模式
viewMode: 'graph' | 'split' | 'workbench'
// graph: 左面板 100%,右面板隐藏
// split: 左右各 50%
// workbench: 左面板隐藏,右面板 100%
项目初始化逻辑
async initProject() {
if (this.projectId === 'new') {
// 新项目: 从 pendingUpload 获取文件和需求
const { files, requirement } = getPendingUpload()
clearPendingUpload()
await this.handleNewProject(files, requirement)
} else {
// 已有项目: 加载项目数据
await this.loadProject(this.projectId)
}
}
handleNewProject 流程:
- 构建 FormData(文件 + 仿真需求)
- 调用
generateOntology(formData)API - 获取返回的 project_id
- 更新路由到
/process/{project_id} - 设置 currentPhase = 0(本体论已生成)
图谱构建触发:
async startBuildGraph() {
const res = await buildGraph({
project_id: this.projectId,
graph_name: this.projectData.name,
chunk_size: 500,
chunk_overlap: 50
})
this.startPollingTask(res.data.task_id)
}
startPollingTask(taskId) {
this.pollInterval = setInterval(() => this.pollTaskStatus(taskId), 2000)
}
系统日志
systemLogs: [] // 活动日志数组
// 添加日志
addLog(message) {
this.systemLogs.push({
timestamp: new Date().toISOString(),
message: message
})
}
10.3 SimulationView.vue / SimulationRunView.vue
SimulationView.vue 路由 /simulation/:simulationId:
- 容器视图,嵌套 Step2EnvSetup 组件
- 加载仿真状态和关联的项目/图谱数据
SimulationRunView.vue 路由 /simulation/:simulationId/start:
- 容器视图,嵌套 Step3Simulation 组件
- 管理仿真运行的视图切换
10.4 ReportView.vue — 报告视图
路径: frontend/src/views/ReportView.vue
路由: /report/:reportId
布局: 左面板(GraphPanel)+ 右面板(Step4Report)
数据加载链:
async loadReportData() {
// 1. 通过 reportId 获取报告 → 得到 simulationId
// 2. 通过 simulationId 获取仿真状态 → 得到 projectId, graphId
// 3. 通过 projectId 获取项目数据
// 4. 通过 graphId 获取图谱可视化数据
}
默认以 workbench 模式显示(Step4Report 全屏),因为报告生成阶段图谱可视化相对不重要。
10.5 InteractionView.vue — 交互视图
路径: frontend/src/views/InteractionView.vue
路由: /interaction/:reportId
布局: 与 ReportView 类似,但右面板为 Step5Interaction 组件。
状态管理:
data() {
return {
viewMode: 'workbench', // 默认全屏交互面板
simulationId: null,
currentStatus: 'ready' // ready | processing | completed | error
}
}
第十一部分:前端步骤组件详解
11.1 Step1GraphBuild.vue — 图谱构建步骤
文件: frontend/src/components/Step1GraphBuild.vue
Props
| Prop | 类型 | 说明 |
|---|---|---|
currentPhase |
Number | 0=本体论, 1=构建中, 2=完成 |
projectData |
Object | 项目数据(含本体论) |
ontologyProgress |
String | 本体论生成消息 |
buildProgress |
Number | 图谱构建进度 (0-100) |
graphData |
Object | 图谱节点/边数据 |
systemLogs |
Array | 系统日志 |
显示内容
Phase 0 — 本体论展示:
- 实体类型标签(可点击展开详情)
- 关系类型标签(可点击展开详情)
- 每个类型显示名称、描述、属性列表、示例
- “开始构建图谱” 按钮
Phase 1 — 构建进度:
- 进度条 + 百分比
- 当前操作描述
Phase 2 — 构建完成:
- 统计卡片: 节点数、边数、实体类型数
- 本体论详情覆盖层
- “进入环境搭建” 按钮
进入仿真的逻辑
async goToSimulation() {
// 1. 调用 createSimulation API
const res = await createSimulation({
project_id: this.projectData.project_id,
graph_id: this.projectData.graph_id,
enable_twitter: true,
enable_reddit: true
})
// 2. 导航到仿真页
this.$router.push(`/simulation/${res.data.simulation_id}`)
}
11.2 Step2EnvSetup.vue — 环境搭建步骤
文件: frontend/src/components/Step2EnvSetup.vue(约 2000+ 行模板)
五阶段工作流
phase: 0 | 1 | 2 | 3 | 4
// Phase 0: 初始化(读取实体)
// Phase 1: 人设生成(实时显示进度)
// Phase 2: 配置生成(LLM 推理仿真参数)
// Phase 3: 环境编排(汇总配置)
// Phase 4: 完成(可以开始仿真)
实时人设展示
// 每 3 秒轮询 realtime profiles API
pollProfilesInterval = setInterval(async () => {
const res = await getSimulationProfilesRealtime(simulationId, 'reddit')
if (res.success) {
this.profiles = res.data.profiles
this.profilesProgress = `${res.data.generated}/${res.data.total}`
if (res.data.completed) {
clearInterval(this.pollProfilesInterval)
this.phase = 2 // 进入配置生成阶段
}
}
}, 3000)
配置展示
时间配置:
- 仿真总时长、每轮时长
- 高峰/工作/早间/低谷时段定义
- 各时段活跃度乘数
智能体配置展示:
- 24 小时活跃时间线可视化
- activity_level、stance、influence_weight 等参数
- 点击查看完整人设模态框
平台配置:
- Twitter/Reddit 推荐算法权重
- viral_threshold、echo_chamber_strength
自定义轮数
// 用户可以通过滑块自定义最大轮数
useCustomRounds: false,
customMaxRounds: null,
// 推荐轮数计算
get recommendedRounds() {
const timeConfig = this.simulationConfig?.time_config
if (timeConfig) {
return Math.ceil(
timeConfig.total_simulation_hours * 60 / timeConfig.minutes_per_round
)
}
return 72 // 默认
}
11.3 Step3Simulation.vue — 仿真运行步骤
文件: frontend/src/components/Step3Simulation.vue
双平台状态显示
┌─────────────────────────────────────────────────┐
│ 仿真运行面板 │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ Twitter 信息广场 │ │ Reddit 话题社区 │ │
│ │ 轮次: 5/144 │ │ 轮次: 5/144 │ │
│ │ 行为: 150 条 │ │ 行为: 200 条 │ │
│ │ 状态: 运行中 │ │ 状态: 运行中 │ │
│ └──────────────────┘ └──────────────────┘ │
│ │
│ ┌───────────────────────────────────────────┐ │
│ │ 时间线 │ │
│ │ 12:00 张三 发布帖子: "这个政策..." │ │
│ │ 12:01 李四 点赞了张三的帖子 │ │
│ │ 12:02 王五 评论: "我同意..." │ │
│ │ ... │ │
│ └───────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
行为类型显示映射
const actionTypeMap = {
'CREATE_POST': { label: '发帖', color: '#4CAF50' },
'REPOST': { label: '转发', color: '#2196F3' },
'QUOTE_POST': { label: '引用', color: '#9C27B0' },
'LIKE_POST': { label: '点赞', color: '#FF9800' },
'CREATE_COMMENT': { label: '评论', color: '#00BCD4' },
'FOLLOW': { label: '关注', color: '#E91E63' },
'SEARCH_POSTS': { label: '搜索', color: '#607D8B' },
'UPVOTE_POST': { label: '赞同', color: '#8BC34A' },
'DOWNVOTE_POST': { label: '反对', color: '#F44336' },
'DO_NOTHING': { label: '观望', color: '#9E9E9E' }
}
实时状态轮询
// 每 2 秒获取运行状态
pollRunStatus() {
this.statusInterval = setInterval(async () => {
const res = await getRunStatus(this.simulationId)
if (res.success) {
this.runStatus = res.data
// 检查完成
if (res.data.runner_status === 'completed') {
this.phase = 2
clearInterval(this.statusInterval)
}
}
}, 2000)
}
// 每 2 秒获取最新行为
pollActions() {
this.actionsInterval = setInterval(async () => {
const res = await getSimulationActions(this.simulationId, {
limit: 50, offset: 0
})
if (res.success) {
// 使用 actionIds Set 去重
for (const action of res.data) {
const id = `${action.round}_${action.agent_id}_${action.timestamp}`
if (!this.actionIds.has(id)) {
this.actionIds.add(id)
this.allActions.unshift(action) // 最新的在前面
}
}
}
}, 2000)
}
耗时计算
computed: {
elapsedTime() {
if (!this.runStatus?.started_at) return '00:00:00'
const start = new Date(this.runStatus.started_at)
const now = new Date()
const diff = Math.floor((now - start) / 1000)
const hours = Math.floor(diff / 3600)
const minutes = Math.floor((diff % 3600) / 60)
const seconds = diff % 60
return `${pad(hours)}:${pad(minutes)}:${pad(seconds)}`
}
}
11.4 Step4Report.vue — 报告生成步骤
文件: frontend/src/components/Step4Report.vue
双面板布局
┌──────────────────────┬──────────────────────┐
│ 报告内容面板 │ 工作流面板 │
│ │ │
│ [章节1] ▼ │ 度量指标 │
│ 内容... │ - 已完成章节: 3/8 │
│ │ - 耗时: 5:23 │
│ [章节2] ▼ │ - 工具调用: 15 │
│ 内容... │ │
│ │ 工作流步骤 │
│ [章节3] ▼ │ ✓ 开始生成 │
│ (生成中...) │ ✓ 规划大纲 │
│ │ → 生成第3章 │
│ │ ○ 生成第4章 │
│ │ │
│ │ Agent 日志 │
│ │ [tool_call] insight_ │
│ │ [tool_result] 找到... │
│ │ [llm_response] ... │
└──────────────────────┴──────────────────────┘
Agent 日志时间线
// 不同类型日志的显示配置
const logTypeConfig = {
'report_start': { icon: '▶', color: '#4CAF50' },
'planning_start': { icon: '📋', color: '#2196F3' },
'section_start': { icon: '📝', color: '#FF9800' },
'tool_call': { icon: '🔧', color: '#9C27B0' },
'tool_result': { icon: '📊', color: '#00BCD4' },
'llm_response': { icon: '🤖', color: '#607D8B' },
'section_complete': { icon: '✅', color: '#4CAF50' },
'report_complete': { icon: '🎉', color: '#FFD700' },
'error': { icon: '❌', color: '#F44336' }
}
工具调用展示
每种工具有独特的配色和图标:
const toolConfigs = {
'insight_forge': {
label: 'InsightForge 深度分析',
color: '#9C27B0',
description: '自动生成子问题,多维度深度搜索'
},
'panorama_search': {
label: 'PanoramaSearch 全景搜索',
color: '#2196F3',
description: '全图谱范围搜索,包含历史数据'
},
'quick_search': {
label: 'QuickSearch 快速搜索',
color: '#FF9800',
description: '精准关键词快速查找'
},
'interview_agents': {
label: 'InterviewAgents 智能体访谈',
color: '#4CAF50',
description: '与仿真中的智能体进行访谈'
}
}
实时报告渲染
// 增量获取 Agent 日志
async pollAgentLogs() {
const res = await getAgentLog(this.reportId, this.lastLogLine)
if (res.success && res.data.logs.length > 0) {
this.agentLogs.push(...res.data.logs)
this.lastLogLine = res.data.to_line
// 检测新章节内容
for (const log of res.data.logs) {
if (log.action === 'section_content') {
const idx = log.details.section_index
this.generatedSections[idx] = log.details.content
}
}
}
}
11.5 Step5Interaction.vue — 深度交互步骤
文件: frontend/src/components/Step5Interaction.vue
三种交互模式
activeTab: 'chat' | 'survey'
chatTarget: 'report_agent' | 'agent'
// 模式1: 与 ReportAgent 对话(基于报告内容的问答)
// 模式2: 与仿真世界中的智能体 1 对 1 对话
// 模式3: 批量调研(向多个智能体发送同一问题)
与 ReportAgent 对话
async sendToReportAgent(message) {
this.chatHistory.push({ role: 'user', content: message })
const res = await chatWithReport({
simulation_id: this.simulationId,
message: message,
chat_history: this.chatHistory.slice(0, -1) // 排除当前消息
})
this.chatHistory.push({
role: 'assistant',
content: res.data.response,
tools: res.data.tool_calls,
sources: res.data.sources
})
}
与仿真智能体对话
async sendToAgent(message) {
const agent = this.profiles[this.selectedAgent]
this.chatHistory.push({ role: 'user', content: message })
const res = await interviewAgents({
simulation_id: this.simulationId,
interviews: [{
agent_id: agent.user_id,
agent_name: agent.realname || agent.name,
prompt: message
}]
})
const interview = res.data.interviews[0]
this.chatHistory.push({
role: 'assistant',
content: interview.response,
agent_name: interview.agent_name
})
}
批量调研
async submitSurvey() {
const interviews = this.selectedAgents.map(idx => ({
agent_id: this.profiles[idx].user_id,
agent_name: this.profiles[idx].realname || this.profiles[idx].name,
prompt: this.surveyQuestion
}))
const res = await interviewAgents({
simulation_id: this.simulationId,
interviews: interviews
})
this.surveyResults = res.data.interviews
}
聊天历史缓存
// 在切换对话目标时保存/恢复聊天历史
chatHistoryCache: {} // key: chatTarget, value: chatHistory
switchChatTarget(target) {
// 保存当前历史
this.chatHistoryCache[this.chatTarget] = [...this.chatHistory]
// 恢复目标历史
this.chatTarget = target
this.chatHistory = this.chatHistoryCache[target] || []
}
第十二部分:前端子组件详解
12.1 GraphPanel.vue — 知识图谱可视化
文件: frontend/src/components/GraphPanel.vue
技术: D3.js 力导向图(Force-Directed Graph)
D3 力模拟配置
this.simulation = d3.forceSimulation(nodes)
.force('link', d3.forceLink(edges).id(d => d.uuid).distance(150))
.force('charge', d3.forceManyBody().strength(-300))
.force('center', d3.forceCenter(width / 2, height / 2))
.force('collision', d3.forceCollide().radius(30))
力参数说明:
forceLink: 边的弹簧力,自然长度 150pxforceManyBody: 节点间斥力,强度 -300forceCenter: 居中力forceCollide: 碰撞检测,半径 30px
多边处理(Multi-Edge)
当两个节点之间有多条边时,使用曲线路径避免重叠:
// 计算曲率
function getCurvature(edge, allEdges) {
const parallel = allEdges.filter(e =>
(e.source === edge.source && e.target === edge.target) ||
(e.source === edge.target && e.target === edge.source)
)
if (parallel.length <= 1) return 0
const idx = parallel.indexOf(edge)
const step = 0.3 // 曲率步进
return (idx - (parallel.length - 1) / 2) * step
}
// SVG 路径生成
function drawEdge(d) {
const curvature = getCurvature(d, allEdges)
if (curvature === 0) {
return `M${d.source.x},${d.source.y} L${d.target.x},${d.target.y}`
}
const midX = (d.source.x + d.target.x) / 2
const midY = (d.source.y + d.target.y) / 2
const dx = d.target.x - d.source.x
const dy = d.target.y - d.source.y
const cpX = midX - curvature * dy
const cpY = midY + curvature * dx
return `M${d.source.x},${d.source.y} Q${cpX},${cpY} ${d.target.x},${d.target.y}`
}
自环检测
// 节点指向自身的边
if (edge.source === edge.target) {
const r = 40 // 自环半径
return `M${x},${y} C${x+r},${y-r} ${x+r},${y+r} ${x},${y}`
}
交互功能
- 拖拽: D3 drag 行为,拖拽时固定节点位置
- 缩放/平移: D3 zoom 行为
- 节点选中: 点击节点显示详情面板(名称、类型、摘要、属性)
- 边选中: 点击边显示关系详情(关系名、事实描述、源/目标节点)
- 标签切换: 勾选/取消实体类型的可见性
- 边标签显示: 开关控制是否显示边上的关系名称
实体类型配色
computed: {
entityTypes() {
const types = new Set()
this.graphData.nodes.forEach(n => {
n.labels?.forEach(l => {
if (l !== 'Entity' && l !== 'Node') types.add(l)
})
})
const colors = d3.schemeCategory10
return [...types].map((type, i) => ({
name: type,
color: colors[i % colors.length],
count: this.graphData.nodes.filter(n => n.labels?.includes(type)).length
}))
}
}
渲染更新
// 每帧更新位置
simulation.on('tick', () => {
// 更新边路径
edgeElements.attr('d', drawEdge)
// 更新边标签位置
edgeLabelElements.attr('transform', d => {
const midX = (d.source.x + d.target.x) / 2
const midY = (d.source.y + d.target.y) / 2
return `translate(${midX},${midY})`
})
// 更新节点位置
nodeElements.attr('transform', d => `translate(${d.x},${d.y})`)
})
12.2 HistoryDatabase.vue — 历史记录组件
文件: frontend/src/components/HistoryDatabase.vue
用途: 在首页展示历史项目和仿真记录
核心特性
IntersectionObserver 自动展开:
mounted() {
this.observer = new IntersectionObserver(entries => {
entries.forEach(entry => {
if (entry.isIntersecting) {
this.isExpanded = true
}
})
}, { threshold: 0.1 })
this.observer.observe(this.$el)
}
卡片布局:
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 项目 1 │ │ 项目 2 │ │ 项目 3 │
│ ──────────── │ │ ──────────── │ │ ──────────── │
│ 📄 report.pdf │ │ 📄 novel.md │ │ 📄 data.txt │
│ 📊 21 实体 │ │ 📊 35 实体 │ │ 📊 12 实体 │
│ 🔄 72 轮 │ │ 🔄 144 轮 │ │ 🔄 36 轮 │
│ ● 已完成 │ │ ● 运行中 │ │ ● 未开始 │
│ │ │ │ │ │
│ [查看] [继续] │ │ [查看] [继续] │ │ [查看] [继续] │
└──────────────────┘ └──────────────────┘ └──────────────────┘
进度状态映射:
not-started: 灰色 — 已创建但未运行in-progress: 蓝色动画 — 正在运行completed: 绿色 — 已完成
文件类型颜色(莫兰迪色系):
const fileTypeColors = {
PDF: '#C5A3A3', // 柔和红
DOC: '#A3B5C5', // 柔和蓝
XLS: '#A3C5A3', // 柔和绿
PPT: '#C5B5A3', // 柔和橙
TXT: '#B5B5C5', // 柔和紫
CODE: '#A3C5C5', // 柔和青
IMG: '#C5A3C5', // 柔和粉
ZIP: '#B5C5A3' // 柔和黄绿
}
步骤导航:
navigateToStep(project, step) {
switch (step) {
case 'graph':
this.$router.push(`/process/${project.project_id}`)
break
case 'simulation':
this.$router.push(`/simulation/${project.simulation_id}`)
break
case 'report':
this.$router.push(`/report/${project.report_id}`)
break
case 'interaction':
this.$router.push(`/interaction/${project.report_id}`)
break
}
}
第十三部分:部署与运维
13.1 源码部署
前置要求
| 工具 | 版本 | 安装检查 |
|---|---|---|
| Node.js | 18+ | node -v |
| Python | ≥3.11, ≤3.12 | python --version |
| uv | 最新版 | uv --version |
安装步骤
# 1. 克隆项目
git clone https://github.com/666ghj/MiroFish.git
cd MiroFish
# 2. 配置环境变量
cp .env.example .env
# 编辑 .env,填入 LLM_API_KEY 和 ZEP_API_KEY
# 3. 一键安装所有依赖
npm run setup:all
# 等同于:
# npm install (根目录 concurrently)
# cd frontend && npm install (前端依赖)
# cd backend && uv sync (Python 虚拟环境 + 依赖)
# 4. 启动开发服务器
npm run dev
# 等同于:
# concurrently "cd backend && uv run python run.py" "cd frontend && npm run dev"
服务地址:
- 前端:
http://localhost:3000 - 后端 API:
http://localhost:5001
分步安装
# 仅安装 Node 依赖
npm run setup
# 仅安装 Python 依赖
npm run setup:backend
# 仅启动后端
npm run backend
# 仅启动前端
npm run frontend
# 构建前端生产版本
npm run build
13.2 Docker 部署
docker-compose.yml 配置
services:
mirofish:
image: ghcr.io/666ghj/mirofish:latest
# 备用镜像(国内加速):
# image: ghcr.nju.edu.cn/666ghj/mirofish:latest
ports:
- "3000:3000" # 前端
- "5001:5001" # 后端
volumes:
- ./backend/uploads:/app/backend/uploads # 持久化数据
env_file:
- .env # 读取环境变量
restart: unless-stopped
部署步骤
# 1. 配置环境变量
cp .env.example .env
# 编辑 .env
# 2. 拉取镜像并启动
docker compose up -d
# 3. 查看日志
docker compose logs -f
Dockerfile 分析
FROM python:3.11
# 安装 Node.js 18+
RUN curl -fsSL https://deb.nodesource.com/setup_18.x | bash - && \
apt-get install -y nodejs
# 安装 uv(从官方镜像复制)
COPY --from=ghcr.io/astral-sh/uv:0.9.26 /uv /uvx /bin/
# 层优化: 先复制依赖配置文件
COPY package.json package-lock.json ./
COPY frontend/package.json frontend/package-lock.json frontend/
COPY backend/pyproject.toml backend/uv.lock backend/
# 安装依赖(利用 Docker 缓存)
RUN npm install && cd frontend && npm install
RUN cd backend && uv sync
# 复制源码
COPY . .
EXPOSE 3000 5001
CMD ["npm", "run", "dev"]
13.3 CI/CD 流水线
文件: .github/workflows/docker-image.yml
on:
workflow_dispatch: # 手动触发
push:
tags: ["*"] # 推送 tag 时触发
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: docker/setup-qemu-action@v3 # 多架构支持
- uses: docker/setup-buildx-action@v3
- uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- uses: docker/metadata-action@v5 # 自动标签
id: meta
- uses: docker/build-push-action@v5
with:
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
推送目标: ghcr.io/{owner}/mirofish
标签策略: Git ref + commit SHA + latest
13.4 日志管理
后端日志
backend/logs/
├── 2025-01-01.log # 日期轮转日志
├── 2025-01-02.log
└── ...
# 每个文件最大 10MB,保留 5 个备份
# 日志级别: DEBUG(文件)、INFO(控制台)
仿真日志
backend/uploads/simulations/sim_xxx/
├── simulation.log # 仿真进程标准输出
├── twitter/actions.jsonl # Twitter 行为日志
└── reddit/actions.jsonl # Reddit 行为日志
报告日志
backend/uploads/reports/report_xxx/
├── agent_log.jsonl # 结构化 Agent 执行日志
└── console_log.txt # 纯文本控制台日志
13.5 数据持久化
所有项目数据存储在 backend/uploads/ 目录。Docker 部署时,该目录通过 volume 映射到宿主机,确保容器重启后数据不丢失。
注意事项:
- TaskManager 是纯内存存储,重启后任务进度丢失(但仿真状态文件保留)
- 仿真运行中的子进程在容器重启后终止,需要手动重新启动
- Zep Cloud 中的图谱数据由 Zep 管理,不依赖本地存储
第十四部分:完整工作流程
14.1 端到端流程详解
以下是用户从上传文档到获取预测报告的完整端到端流程,涉及的每一步技术细节。
流程 1:文档上传与本体论生成
用户操作: 在首页上传 PDF/MD/TXT 文件,输入仿真需求描述
技术流程:
[浏览器]
│
├─ 1. 用户选择文件 + 输入仿真需求
│ Home.vue → setPendingUpload(files, requirement)
│ 导航到 /process/new
│
├─ 2. MainView.vue 检测 projectId='new'
│ 从 pendingUpload 获取 files 和 requirement
│ 构建 FormData (multipart/form-data)
│
├─ 3. POST /api/graph/ontology/generate
│ │
│ ▼
[Flask 后端]
│
├─ 4. ProjectManager.create_project() → proj_xxxxx
│ 创建 uploads/projects/proj_xxxxx/ 目录
│
├─ 5. 保存文件 → projects/proj_xxxxx/files/
│
├─ 6. FileParser.extract_text() 对每个文件:
│ PDF → PyMuPDF 逐页提取文本
│ MD/TXT → UTF-8 多级回退读取
│
├─ 7. TextProcessor.preprocess_text()
│ 规范化换行、去除连续空行
│ 保存到 extracted_text.txt
│
├─ 8. OntologyGenerator.generate()
│ │
│ ├─ 构建 user_message(文档文本 + 仿真需求)
│ ├─ 调用 LLM(system=ONTOLOGY_SYSTEM_PROMPT)
│ │ │
│ │ ▼
│ │ [LLM API]
│ │ 分析文档内容和仿真需求
│ │ 返回 JSON:
│ │ entity_types: 10 种实体类型
│ │ edge_types: 6-10 种关系类型
│ │ analysis_summary: 中文分析摘要
│ │
│ ├─ _validate_and_process()
│ │ 确保 10 个实体类型
│ │ 确保 Person 和 Organization 兜底类型存在
│ │ 验证属性名不冲突
│ │
│ └─ 返回 ontology dict
│
├─ 9. 更新 Project 状态 → ONTOLOGY_GENERATED
│ 保存 ontology 到 project.json
│
└─ 10. 返回响应(project_id, ontology, analysis_summary)
│
▼
[浏览器]
│
└─ 11. MainView 更新路由 /process/{project_id}
Step1GraphBuild 展示本体论结果
显示实体类型标签、关系类型标签
流程 2:知识图谱构建
用户操作: 点击"开始构建图谱"
[浏览器]
│
├─ 1. POST /api/graph/build { project_id, chunk_size:500, chunk_overlap:50 }
│ │
│ ▼
[Flask 后端]
│
├─ 2. 验证 Project 状态 = ONTOLOGY_GENERATED
│
├─ 3. 创建 Task(task_type="构建图谱")→ task_id
│ 更新 Project 状态 → GRAPH_BUILDING
│
├─ 4. 启动后台线程(daemon=True)
│ │
│ ├─ 5. TextProcessor.split_text(extracted_text, 500, 50)
│ │ → 文本分成 N 个块,每块约 500 字符
│ │
│ ├─ 6. GraphBuilderService.create_graph("项目名")
│ │ → 调用 Zep API 创建图谱 → graph_id = mirofish_{uuid}
│ │
│ ├─ 7. GraphBuilderService.set_ontology(graph_id, ontology)
│ │ │
│ │ ├─ 对每个 entity_type 动态创建 Pydantic 模型
│ │ │ class Student(BaseModel):
│ │ │ major: Optional[str] = Field(description="专业")
│ │ │ grade: Optional[str] = Field(description="年级")
│ │ │
│ │ ├─ 对每个 edge_type 动态创建 Pydantic 模型
│ │ │ class STUDIES_AT(BaseModel):
│ │ │ ...
│ │ │
│ │ └─ 调用 client.graph.set_ontology()
│ │ │
│ │ ▼
│ │ [Zep Cloud] 注册本体论到图谱
│ │
│ ├─ 8. GraphBuilderService.add_text_batches()
│ │ 每批 3 个 chunk → client.graph.add_batch()
│ │ 批间间隔 1 秒
│ │ → 返回 episode UUID 列表
│ │ │
│ │ ▼
│ │ [Zep Cloud] 接收文本,进行:
│ │ - 实体识别(NER)
│ │ - 关系抽取
│ │ - 根据本体论分类实体
│ │ - 构建知识图谱节点和边
│ │
│ ├─ 9. _wait_for_episodes(episode_uuids, timeout=600)
│ │ 每 3 秒查询 episode.processed 状态
│ │ 直到所有 episode 处理完毕
│ │
│ ├─ 10. _get_graph_info(graph_id)
│ │ 使用分页工具获取所有节点和边
│ │ 统计: node_count, edge_count, entity_types
│ │
│ └─ 11. 更新 Project 状态 → GRAPH_COMPLETED
│ 保存 graph_id 到 project.json
│ 完成 Task(progress=100)
│
└─ 返回 { task_id }
│
▼
[浏览器]
│
└─ 每 2 秒轮询 GET /api/graph/task/{task_id}
→ 更新进度条
→ 完成后加载图谱数据
→ GraphPanel 渲染 D3 力导向图
流程 3:仿真环境准备
用户操作: 点击"进入环境搭建"
[浏览器]
│
├─ 1. POST /api/simulation/create
│ → simulation_id = sim_xxxxx
│
├─ 2. 导航到 /simulation/{simulation_id}
│
├─ 3. POST /api/simulation/prepare
│ │
│ ▼
[Flask 后端] → 后台线程
│
├─ 阶段 1: 读取实体 (进度 0-20%)
│ ZepEntityReader.filter_defined_entities()
│ → 获取所有图谱节点
│ → 过滤: 仅保留本体论定义的实体类型
│ → 丰富: 为每个实体获取关联边和节点
│ → 结果: FilteredEntities (例 42 个实体, 8 种类型)
│
├─ 阶段 2: 生成人设 (进度 20-70%)
│ OasisProfileGenerator(graph_id=xxx)
│ │
│ ├─ 对每个实体(5 线程并行):
│ │ ├─ _build_entity_context()
│ │ │ 合并属性 + 关系 + Zep 搜索结果
│ │ │
│ │ ├─ _generate_profile_with_llm()
│ │ │ │
│ │ │ ▼
│ │ │ [LLM API]
│ │ │ 输入: 实体名、类型、摘要、上下文
│ │ │ 输出 JSON:
│ │ │ bio: "北京大学计算机系大四学生..."
│ │ │ persona: "张三是一名对技术充满热情的..."
│ │ │ age: 22, gender: "male", mbti: "INTJ"
│ │ │ country: "China", profession: "Student"
│ │ │ interested_topics: ["AI", "开源"]
│ │ │
│ │ └─ 写入 reddit_profiles.json(实时)
│ │
│ └─ 42 个 OasisAgentProfile 生成完毕
│
├─ 阶段 3: 生成仿真配置 (进度 70-90%)
│ SimulationConfigGenerator
│ │
│ ├─ _generate_time_config() → [LLM] → 时间参数
│ │ total_simulation_hours: 72
│ │ minutes_per_round: 60
│ │ agents_per_hour_min: 3, max: 9
│ │ peak_hours: [19,20,21,22]
│ │
│ ├─ _generate_event_config() → [LLM] → 事件/话题
│ │ hot_topics: ["政策改革", "学生权益"]
│ │ initial_posts: [{content: "...", poster_type: "Student"}]
│ │ narrative_direction: "关注政策对学生的影响"
│ │
│ ├─ _generate_agent_configs_batch() → [LLM] × N 批
│ │ 每批 15 个智能体的活动参数
│ │
│ ├─ _assign_initial_post_agents()
│ │ 匹配初始帖子发布者
│ │
│ └─ 保存 simulation_config.json
│
└─ 更新状态 → READY
│
▼
[浏览器]
│
└─ Step2EnvSetup 实时展示:
→ 人设生成进度(每 3 秒轮询)
→ 配置参数预览
→ 时间线可视化
→ 自定义轮数滑块
流程 4:仿真执行
用户操作: 设置轮数,点击"开始模拟"
[浏览器]
│
├─ POST /api/simulation/start
│ { simulation_id, platform:"parallel", max_rounds:72 }
│ │
│ ▼
[Flask 后端]
│
├─ SimulationRunner.start_simulation()
│ │
│ ├─ 构建命令:
│ │ python run_parallel_simulation.py --config config.json --max-rounds 72
│ │
│ ├─ subprocess.Popen(cmd, start_new_session=True)
│ │ → 启动独立进程组
│ │ → stdout/stderr → simulation.log
│ │
│ └─ 启动 _monitor_simulation 线程
│ │
│ ▼
[OASIS 仿真子进程]
│
├─ 加载 simulation_config.json
├─ 加载 reddit_profiles.json / twitter_profiles.csv
├─ 初始化 Twitter 环境 (6 种动作)
├─ 初始化 Reddit 环境 (13 种动作)
│
├─ 执行初始事件(种子帖子)
│
├─ 主循环 (72 轮):
│ │
│ ├─ 计算当前模拟时间 (round × 60min / 60 = 小时)
│ ├─ 确定活跃智能体数量(时段乘数调节)
│ │ 高峰(19-22): × 1.5
│ │ 低谷(0-5): × 0.05
│ │ 早间(6-8): × 0.4
│ │ 工作(9-18): × 0.7
│ │
│ ├─ 选择活跃智能体:
│ │ 过滤: 当前时段在 active_hours 中
│ │ 概率: random() < activity_level
│ │ 抽样: random.sample(candidates, target_count)
│ │
│ ├─ Twitter 环境执行:
│ │ 每个活跃智能体:
│ │ [LLM] → 根据 persona + 当前帖子 → 决策行为
│ │ → 执行 CREATE_POST / LIKE / REPOST / ...
│ │ → 写入 twitter/actions.jsonl
│ │
│ ├─ Reddit 环境执行:
│ │ 类似,但有更多行为类型
│ │ → 写入 reddit/actions.jsonl
│ │
│ └─ 写入轮次结束事件
│
├─ 仿真完成,写入 simulation_end 事件
│
└─ 进入 IPC 等待模式(等待 interview 命令)
│
▼
[监控线程] (Flask 内)
│
├─ 每秒读取 actions.jsonl(增量读取)
├─ 解析 AgentAction,更新 RunState
├─ 检测 simulation_end 事件
├─ 可选: 将行为发送到 ZepGraphMemoryUpdater
│ → 自然语言描述 → Zep 图谱更新
└─ 保存 run_state.json
│
▼
[浏览器]
│
└─ Step3Simulation 每 2 秒轮询:
→ 更新双平台进度
→ 时间线展示最新行为
→ 完成后显示"生成报告"按钮
流程 5:报告生成
用户操作: 点击"生成报告"
[浏览器]
│
├─ POST /api/report/generate { simulation_id }
│ │
│ ▼
[Flask 后端] → 后台线程
│
├─ 创建 ReportAgent(graph_id, simulation_id, requirement)
│
├─ 规划阶段:
│ [LLM] → 生成报告大纲 (ReportOutline)
│ 标题、摘要、8-12 个章节
│
├─ 对每个章节 (ReACT 循环):
│ │
│ ├─ 思考: "我需要什么信息来写这个章节?"
│ │
│ ├─ 行动: 调用工具
│ │ ├─ insight_forge → [Zep] 多维搜索 + 子问题分解
│ │ ├─ panorama_search → [Zep] 全景搜索
│ │ ├─ quick_search → [Zep] 快速搜索
│ │ └─ interview_agents → [IPC] → [OASIS] 智能体访谈
│ │
│ ├─ 观察: 分析工具返回结果
│ │
│ ├─ 重复(最多 5 次工具调用,3 轮反思)
│ │
│ └─ 撰写: [LLM] 基于收集的信息生成章节内容
│ → 保存 section_XX.md
│ → 更新进度
│
├─ 合并所有章节 → full_report.md
│
└─ 更新 Report 状态 → COMPLETED
│
▼
[浏览器]
│
└─ Step4Report:
→ 左面板: 分节展示报告内容
→ 右面板: Agent 日志时间线
→ 完成后显示"深度交互"按钮
流程 6:深度交互
用户操作: 在交互面板中选择对话对象并发送消息
模式 A: 与 ReportAgent 对话
[浏览器]
├─ POST /api/report/chat { simulation_id, message, chat_history }
│ │
│ ▼
│ [Flask] → ReportAgent.chat()
│ │
│ ├─ [LLM] 分析问题
│ ├─ 可能调用工具 (最多 2 次/轮)
│ └─ [LLM] 生成回答
│
└─ 显示回答 + 工具调用记录
模式 B: 与仿真智能体对话
[浏览器]
├─ POST /api/simulation/interview
│ { simulation_id, interviews: [{agent_id, prompt}] }
│ │
│ ▼
│ [Flask] → SimulationIPCClient.send_interview()
│ │
│ ├─ 写入 ipc_commands/{cmd_id}.json
│ │ │
│ │ ▼
│ │ [OASIS 进程] SimulationIPCServer.poll_commands()
│ │ → 找到智能体
│ │ → [LLM] 以智能体人设回答问题
│ │ → 写入 ipc_responses/{cmd_id}.json
│ │
│ └─ 轮询响应文件 → 返回回答
│
└─ 显示智能体回复
模式 C: 批量调研
[浏览器]
├─ 选择多个智能体 + 输入调研问题
├─ POST /api/simulation/interview
│ { interviews: [{agent1}, {agent2}, ...] }
│ │
│ ▼
│ [Flask] → SimulationIPCClient.send_batch_interview()
│ → 同上,但批量处理
│
└─ 网格展示所有智能体的回答
14.2 数据流总结
文档输入:
PDF/MD/TXT → extracted_text.txt (纯文本)
知识表示:
extracted_text → Zep Cloud Graph (节点+边)
Ontology (JSON) → Pydantic Models → Zep Ontology
智能体表示:
Graph Entities → OasisAgentProfile → reddit_profiles.json / twitter_profiles.csv
仿真配置:
Document + Requirement + Entities → simulation_config.json
仿真输出:
OASIS 子进程 → actions.jsonl (JSONL 行为日志)
可选: actions → ZepGraphMemoryUpdater → Zep Graph 更新
报告输出:
Zep Graph + actions.jsonl → ReportAgent → full_report.md
交互数据:
用户消息 → LLM/IPC → 回答文本
第十五部分:关键设计模式与原理
15.1 异步任务模式
MiroFish 中多个长耗时操作(图谱构建、仿真准备、报告生成)采用统一的异步任务模式:
1. API 接收请求 → 创建 Task → 返回 task_id(立即响应)
2. 启动后台 daemon 线程执行实际工作
3. 工作线程通过 TaskManager.update_task() 更新进度
4. 前端定时轮询 GET /api/graph/task/{task_id}
5. 任务完成/失败 → 前端获取最终结果
为什么使用 daemon 线程而非异步框架:
- Flask 本身是同步框架,使用 daemon 线程是最简单的异步方案
- 避免引入 Celery 等重量级任务队列的复杂性
- daemon 线程在主进程退出时自动终止,不会留下孤儿进程
15.2 进度回调模式
所有长操作接受 progress_callback 函数参数:
def progress_callback(stage, progress, message, **kwargs):
"""
stage: 当前阶段名称
progress: 0-100 百分比
message: 人类可读的进度描述
"""
task_manager.update_task(task_id, progress=progress, message=message)
这种设计实现了业务逻辑与进度报告的解耦:
- Service 层不需要知道 Task 的存在
- 进度更新的具体行为由调用者决定
- 相同的 Service 可以在有/无进度回调的情况下工作
15.3 文件系统 IPC 设计
这是 MiroFish 最独特的设计之一。选择文件系统 IPC 而非 Socket/RPC 的原因:
- 跨平台兼容: 文件系统操作在 Windows/Linux/macOS 上行为一致
- 进程隔离: OASIS 仿真作为独立子进程运行,不依赖 Flask 进程空间
- 调试友好: JSON 文件可以直接查看,方便排查问题
- 容错性强: 即使一方崩溃,文件仍然存在,另一方可以检测到
权衡: 轮询开销(每 0.5 秒检查文件系统)是其代价,但对于低频命令(如访谈),这完全可以接受。
15.4 状态机模式
项目(Project)和仿真(Simulation)都使用显式状态机管理生命周期:
项目状态机:
CREATED → ONTOLOGY_GENERATED → GRAPH_BUILDING → GRAPH_COMPLETED
↓
FAILED
仿真状态机:
CREATED → PREPARING → READY → RUNNING → COMPLETED
↓ ↓ ↓
FAILED PAUSED STOPPED
↓
FAILED
每个状态转换都在 API 层进行验证,防止非法操作(如在 CREATED 状态直接启动仿真)。
15.5 重试与容错
系统在多个层面实现了重试机制:
| 层面 | 实现方式 | 参数 |
|---|---|---|
| LLM 调用 | 温度递减重试 | 0.7→0.6→0.5,3 次 |
| Zep API | 指数退避重试装饰器 | 初始 2s,因子 2,最多 3 次 |
| HTTP 请求(前端) | requestWithRetry |
1s→2s→4s,最多 3 次 |
| Zep 分页 | 每页独立重试 | 初始 2s,最多 3 次 |
| 图谱记忆更新 | 批次级重试 | 初始 2s,最多 3 次 |
| JSON 解析 | 修复截断 + 重试 | 闭合括号,多种策略 |
LLM 温度递减策略: 每次重试降低 temperature,使输出更确定性,减少格式错误概率。
15.6 单例模式(线程安全)
TaskManager 使用双重检查锁定(Double-Checked Locking)实现线程安全的单例:
class TaskManager:
_instance = None
_instance_lock = threading.Lock()
@classmethod
def get_instance(cls):
if cls._instance is None: # 第一次检查(无锁,快速路径)
with cls._instance_lock: # 加锁
if cls._instance is None: # 第二次检查(防止竞态条件)
cls._instance = TaskManager()
return cls._instance
所有任务操作通过 _task_lock 保护内部字典,确保并发请求不会破坏数据。
15.7 动态模型生成
OntologyGenerator 生成的本体论需要在运行时转换为 Pydantic 模型,以满足 Zep SDK 的类型要求。这是通过 Pydantic 的 create_model 函数实现的:
from pydantic import create_model, Field
# 运行时动态创建
StudentModel = create_model(
'Student',
major=(Optional[str], Field(default=None, description="专业方向")),
grade=(Optional[str], Field(default=None, description="年级"))
)
挑战: 保留字段名冲突(Zep SDK 使用 name, uuid 等字段名,而本体论可能也定义这些属性)。解决方案是添加 entity_ 前缀。
15.8 ReACT 推理模式
ReportAgent 使用 ReACT(Reasoning and Acting)模式生成报告:
循环开始
│
├─ Reasoning(思考):
│ "我需要分析政策对学生群体的影响。
│ 应该先搜索学生相关的实体和关系。"
│
├─ Acting(行动):
│ 调用 insight_forge(query="政策对学生的影响")
│
├─ Observation(观察):
│ "找到 15 条相关事实,包括学生的发帖内容和关系变化"
│
├─ Reasoning:
│ "信息比较充分了,但缺少第一人称视角"
│
├─ Acting:
│ 调用 interview_agents(agent_type="Student", question="...")
│
├─ Observation:
│ "获取了 3 位学生智能体的访谈回复"
│
└─ 判断信息是否充分 → 是 → 撰写章节内容
→ 否 → 继续循环(最多 5 次工具调用)
15.9 时间模拟机制
OASIS 仿真使用离散时间步进(Discrete Time Stepping)模拟连续时间流逝:
现实世界: 72 小时 = 3 天
模拟参数: minutes_per_round = 60
总轮数: 72 × 60 / 60 = 72 轮
每轮:
模拟时间 = round_num × 60 / 60 = round_num 小时
时段 = 模拟时间 % 24
例: 第 20 轮 = 第 20 小时 = 晚上 8 点(高峰时段)
→ 活跃智能体数 × 1.5 倍
中国时区对齐: 活动模式按北京时间设计,高峰在 19-22 点(社交媒体黄金时段),低谷在 0-5 点(深夜)。
15.10 图谱记忆演化
传统仿真是"只读"知识库 → 生成行为。MiroFish 创新地实现了"读写"闭环:
初始知识图谱
│
├─ 读取 → 生成智能体人设 → 驱动行为
│
└─ 写回 ← 行为自然语言化 ← 智能体行为
│
└─ 更新后的图谱用于:
- 报告生成时搜索仿真中的事件
- 深度交互时引用仿真中的行为
- 后续仿真的知识基础
15.11 前端双面板架构
MainView 等容器视图使用三模式双面板设计:
graph 模式: [GraphPanel 100%] [Steps 隐藏]
split 模式: [GraphPanel 50%] [Steps 50%]
workbench 模式: [GraphPanel 隐藏] [Steps 100%]
面板切换使用 CSS transform + transition 实现平滑动画:
.left-panel {
transition: width 0.4s cubic-bezier(0.4, 0, 0.2, 1);
}
15.12 安全设计
- 文件上传安全:
werkzeug.utils.secure_filename()防止路径遍历 - 文件类型限制: 仅允许 PDF/MD/TXT/Markdown
- 文件大小限制: 50MB (
MAX_CONTENT_LENGTH) - 编码安全: 所有文件操作使用 UTF-8,Windows 兼容处理
- 进程隔离: OASIS 子进程使用
start_new_session=True创建独立进程组 - 敏感信息: API Key 通过环境变量配置,不硬编码在代码中
附录 A:API 完整清单
| 方法 | 路径 | 功能 |
|---|---|---|
| GET | /health |
健康检查 |
| POST | /api/graph/ontology/generate |
生成本体论 |
| POST | /api/graph/build |
构建知识图谱 |
| GET | /api/graph/task/{id} |
查询任务状态 |
| GET | /api/graph/tasks |
列出所有任务 |
| GET | /api/graph/data/{id} |
获取图谱数据 |
| DELETE | /api/graph/delete/{id} |
删除图谱 |
| GET | /api/graph/project/{id} |
获取项目 |
| GET | /api/graph/project/list |
列出项目 |
| DELETE | /api/graph/project/{id} |
删除项目 |
| POST | /api/graph/project/{id}/reset |
重置项目 |
| GET | /api/simulation/entities/{gid} |
获取实体 |
| GET | /api/simulation/entities/{gid}/{uuid} |
单个实体 |
| GET | /api/simulation/entities/{gid}/by-type/{t} |
按类型获取 |
| POST | /api/simulation/create |
创建仿真 |
| POST | /api/simulation/prepare |
准备仿真 |
| POST | /api/simulation/prepare/status |
准备进度 |
| GET | /api/simulation/{id} |
获取仿真状态 |
| GET | /api/simulation/list |
列出仿真 |
| GET | /api/simulation/history |
历史记录 |
| GET | /api/simulation/{id}/profiles |
智能体人设 |
| GET | /api/simulation/{id}/profiles/realtime |
实时人设 |
| GET | /api/simulation/{id}/config |
仿真配置 |
| GET | /api/simulation/{id}/config/realtime |
实时配置 |
| POST | /api/simulation/start |
启动仿真 |
| POST | /api/simulation/stop |
停止仿真 |
| GET | /api/simulation/{id}/run-status |
运行状态 |
| GET | /api/simulation/{id}/actions |
行为日志 |
| POST | /api/simulation/generate-profiles |
生成人设 |
| POST | /api/report/generate |
生成报告 |
| POST | /api/report/generate/status |
生成进度 |
| GET | /api/report/{id} |
获取报告 |
| GET | /api/report/by-simulation/{id} |
按仿真获取 |
| GET | /api/report/list |
列出报告 |
| GET | /api/report/{id}/download |
下载报告 |
| POST | /api/report/chat |
报告对话 |
| GET | /api/report/{id}/progress |
报告进度 |
| GET | /api/report/{id}/sections |
报告章节 |
| GET | /api/report/{id}/section/{n} |
单个章节 |
| GET | /api/report/{id}/agent-log |
Agent 日志 |
| GET | /api/report/{id}/agent-log/stream |
日志流 |
| GET | /api/report/{id}/console-log |
控制台日志 |
| GET | /api/report/check/{sim_id} |
报告检查 |
附录 B:数据结构参考
B.1 simulation_config.json 完整结构
{
"simulation_id": "sim_xxxxx",
"project_id": "proj_xxxxx",
"graph_id": "mirofish_xxxxx",
"simulation_requirement": "用户输入的仿真需求",
"created_at": "2025-01-01T00:00:00",
"time_config": {
"total_simulation_hours": 72,
"minutes_per_round": 60,
"agents_per_hour_min": 3,
"agents_per_hour_max": 9,
"peak_hours": [19, 20, 21, 22],
"off_peak_hours": [0, 1, 2, 3, 4, 5],
"morning_hours": [6, 7, 8],
"work_hours": [9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
"peak_activity_multiplier": 1.5,
"off_peak_activity_multiplier": 0.05,
"morning_activity_multiplier": 0.4,
"work_activity_multiplier": 0.7,
"reasoning": "LLM 的推理解释..."
},
"agent_configs": [
{
"agent_id": 0,
"entity_uuid": "xxx",
"entity_name": "张三",
"entity_type": "Student",
"activity_level": 0.8,
"posts_per_hour": 0.6,
"comments_per_hour": 1.2,
"active_hours": [8, 9, 10, 12, 13, 14, 19, 20, 21, 22, 23],
"response_delay_min": 5,
"response_delay_max": 30,
"sentiment_bias": "slightly_negative",
"stance": "反对方",
"influence_weight": 0.8
}
],
"event_config": {
"hot_topics": ["政策改革", "学生权益", "校园生活"],
"narrative_direction": "围绕政策对学生群体的影响展开讨论",
"initial_posts": [
{
"content": "刚看到新政策的公告,大家怎么看?",
"poster_type": "Student",
"poster_agent_id": 0,
"platform": "both"
}
]
},
"platform_config": {
"twitter": {
"platform": "twitter",
"recency_weight": 0.4,
"popularity_weight": 0.3,
"relevance_weight": 0.3,
"viral_threshold": 0.7,
"echo_chamber_strength": 0.3
},
"reddit": {
"platform": "reddit",
"recency_weight": 0.3,
"popularity_weight": 0.4,
"relevance_weight": 0.3,
"viral_threshold": 0.6,
"echo_chamber_strength": 0.4
}
},
"llm_config": {
"api_key": "***",
"base_url": "https://...",
"model_name": "qwen-plus"
}
}
B.2 Reddit Agent Profile 完整结构
{
"user_id": 0,
"realname": "张三",
"username": "zhang_san_123",
"bio": "北京大学计算机科学与技术学院大四学生,热爱开源技术和AI研究",
"persona": "张三是一名北京大学计算机系的大四学生,来自湖北武汉。他性格内向但思维敏捷,是INTJ类型的人。在校期间参与了多个开源项目,对人工智能和机器学习有浓厚兴趣。他关注科技新闻和政策动态,倾向于通过数据和逻辑来分析问题。在社交媒体上,他通常会发表有深度的技术观点和对社会事件的理性分析。面对争议话题,他倾向于收集多方信息后再表态,但一旦形成观点则会坚定表达。",
"age": 22,
"gender": "male",
"mbti": "INTJ",
"country": "China",
"profession": "Student",
"interested_topics": ["人工智能", "开源技术", "政策分析", "校园生活"],
"source_entity_uuid": "xxxx-xxxx",
"source_entity_type": "Student"
}
附录 C:许可证
MiroFish 使用 AGPL-3.0 开源许可证。
文档完
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)