AI智能体平台开发文档
AI智能体平台开发文档
从零开始,手把手教你搭建一套生产级 AI 智能体管理平台
本文档将完整讲解框架设计、目录结构、代码逻辑、模块实现、扩展方法
目录
- 第一章 项目总览
- 第二章 架构设计
- 第三章 环境搭建
- 第四章 目录结构详解
- 第五章 配置系统
- 第六章 数据库层
- 第七章 智能体核心
- 第八章 技能系统
- 第九章 RAG 知识库
- 第十章 多智能体协作
- 第十一章 反思式推理
- 第十二章 FastAPI 路由层
- 第十三章 前端实现
- 第十四章 启动与部署
- 第十五章 扩展开发指南
第一章 项目总览
1.1 项目目标
构建一个开箱即用的 AI 智能体管理平台,提供以下能力:
- ✅ 智能体管理 - 创建/删除/对话多个独立 AI 角色
- ✅ 技能/工具系统 - 函数注册 + Function Calling
- ✅ RAG 知识库 - Milvus Lite + Embedding 实现文档问答
- ✅ 多智能体协作 - 角色分工 + 自动任务流
- ✅ 反思式推理 - 三步推理提升准确性
- ✅ 多格式文档解析 - TXT/PDF/DOCX/XLSX/ZIP/MD
1.2 核心特性
| 特性 | 说明 |
|---|---|
| 零数据库 | 内置 SQLite,开箱即用 |
| 零向量库服务 | Milvus Lite 嵌入式 |
| 零配置 | .env 一份配置文件 |
| 跨平台 | Windows / macOS / Linux |
| 异步高性能 | FastAPI async + httpx |
| 完整 REST API | OpenAPI 文档自动生成 |
1.3 技术栈
后端:
- Python 3.10+
- FastAPI(Web 框架)
- SQLAlchemy(ORM)
- Milvus Lite(向量库)
- httpx(异步 HTTP 客户端)
- loguru(日志)
前端:
- Vue 3(CDN 引入)
- Axios(HTTP 客户端)
第二章 架构设计
2.1 分层架构
┌─────────────────────────────────────────────────────────┐
│ 表现层 (Presentation) │
│ 浏览器 / Vue 3 / Axios │
└────────────────────────┬────────────────────────────────┘
│ HTTP
┌────────────────────────▼────────────────────────────────┐
│ 路由层 (Routes) │
│ FastAPI │
│ /agents /skills /rag /crewai /rerct /system │
└────────────────────────┬────────────────────────────────┘
│
┌────────────────────────▼────────────────────────────────┐
│ 业务逻辑层 (Services) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Agent │ │ Skill │ │ RAG │ │ CrewAI │ │
│ │ Manager │ │ Registry │ │ System │ │ /ReRCT │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────┬────────────────────────────────┘
│
┌────────────────────────▼────────────────────────────────┐
│ 基础设施层 (Infrastructure) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ SQLite │ │ Milvus │ │ LLM │ │ 日志 │ │
│ │ (ORM) │ │ Lite │ │ (httpx) │ │ (loguru) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────┘
2.2 核心设计模式
- 单例模式 -
RAGSystem、AgentManager、EmbeddingService全局单例 - 注册表模式 -
SkillRegistry动态注册技能 - 工厂模式 -
DocumentParserFactory多格式解析 - 策略模式 - 不同 Embedding 策略可切换
- 模板方法 -
SkillBase抽象基类,子类实现具体逻辑
2.3 数据流
智能体对话流程
用户输入
│
▼
[可选] RAG 检索 ──── 查询 Milvus Lite ──── 返回相关文档
│
▼
构造 prompt(系统提示 + 历史 + 用户消息 + RAG 上下文)
│
▼
调用 LLM(OpenAI 兼容 API via httpx)
│
▼
[如有] LLM 触发 Function Calling ──── 调用 Skill Registry ──── 执行具体工具
│
▼
工具结果返回给 LLM ──── 重新生成最终回答
│
▼
保存到对话历史 ──── 返回用户
文档上传与检索流程
用户上传文件
│
▼
DocumentParserFactory 解析 ──── TXT/PDF/DOCX/XLSX/ZIP
│
▼
TextChunker 切分(500 字/片,50 字 overlap)
│
▼
EmbeddingService 向量化(OpenAI API 或本地模型)
│
▼
存入 Milvus Lite collection
│
▼
[查询] Embedding 问题 ──── Milvus 检索 top-k ──── 返回相关文档
第三章 环境搭建
3.1 Python 版本
要求 Python 3.10+:
python --version
# Python 3.10.x 或更高
3.2 创建虚拟环境(推荐)
# Windows
python -m venv venv
venv\Scripts\activate
# macOS / Linux
python3 -m venv venv
source venv/bin/activate
3.3 安装依赖
cd backend
pip install -r requirements.txt
关键依赖说明:
| 包 | 用途 |
|---|---|
fastapi |
Web 框架 |
uvicorn |
ASGI 服务器 |
pydantic / pydantic-settings |
数据验证与配置 |
sqlalchemy |
ORM |
httpx |
异步 HTTP 客户端 |
pymilvus[milvus-lite] |
嵌入式向量库 |
loguru |
日志库 |
pypdf / pdfplumber |
PDF 解析 |
python-docx |
DOCX 解析 |
openpyxl |
XLSX 解析 |
3.4 配置文件
cp .env.example .env
编辑 .env:
# 大模型配置
OPENAI_API_KEY=sk-xxxxxxxx
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_MODEL=gpt-3.5-turbo
# 向量库(Milvus Lite 嵌入式)
MILVUS_DB_PATH=./data/milvus_lite.db
EMBEDDING_DIM=1536
EMBEDDING_MODEL=text-embedding-ada-002
USE_LOCAL_EMBEDDING=false
3.5 启动服务
python main.py
访问 http://localhost:8000/docs 查看 API 文档。
第四章 目录结构详解
ai-platform/
├── backend/ # Python 后端
│ ├── main.py # FastAPI 入口
│ ├── requirements.txt # 依赖清单
│ ├── .env.example # 配置模板
│ │
│ ├── core/ # 核心配置
│ │ └── config.py # Settings(pydantic-settings)
│ │
│ ├── db/ # 数据库层
│ │ └── models.py # SQLAlchemy 模型定义
│ │
│ ├── agents/ # 智能体核心
│ │ ├── manager.py # AgentManager 单例
│ │ ├── crewai.py # 多智能体协作
│ │ └── rerct.py # 反思式推理
│ │
│ ├── skills/ # 技能系统
│ │ └── registry.py # SkillRegistry + SkillBase
│ │
│ ├── rag/ # 知识库
│ │ ├── rag_system.py # RAGSystem + EmbeddingService
│ │ └── document_parser.py # DocumentParserFactory
│ │
│ └── scheduler/ # (已移除)
│
├── frontend/ # 前端
│ ├── index.html # Vue 3 单页应用
│ └── styles.css # 样式
│
├── data/ # 数据目录(自动创建)
│ ├── uploads/ # 上传文件
│ ├── platform.db # SQLite 数据库
│ └── milvus_lite.db # Milvus Lite 数据
│
└── docs/ # 文档
第五章 配置系统
5.1 使用 pydantic-settings
backend/core/config.py 核心代码:
from pydantic_settings import BaseSettings
from pydantic import Field
from pathlib import Path
class Settings(BaseSettings):
# ==================== 应用基础 ====================
APP_NAME: str = "AI智能体平台"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
HOST: str = "0.0.0.0"
PORT: int = 8000
# ==================== LLM ====================
OPENAI_API_KEY: str = "sk-xxx"
OPENAI_BASE_URL: str = "https://api.openai.com/v1"
OPENAI_MODEL: str = "gpt-3.5-turbo"
LLM_TIMEOUT: int = 60
# ==================== Milvus Lite ====================
MILVUS_DB_PATH: str = "./data/milvus_lite.db"
MILVUS_COLLECTION_PREFIX: str = "ai_platform_"
# ==================== Embedding ====================
EMBEDDING_DIM: int = 1536
USE_LOCAL_EMBEDDING: bool = False
EMBEDDING_MODEL: str = "text-embedding-ada-002"
# ==================== 路径 ====================
DATA_DIR: Path = Field(default_factory=lambda: Path("./data"))
class Config:
env_file = ".env" # 自动读取 .env
env_file_encoding = "utf-8"
case_sensitive = True
def init_dirs(self):
"""启动时自动创建目录"""
for d in [self.DATA_DIR, self.UPLOADS_DIR, self.LOGS_DIR]:
d.mkdir(parents=True, exist_ok=True)
# 全局单例
settings = Settings()
settings.init_dirs()
5.2 使用方式
from core.config import settings
print(settings.OPENAI_MODEL) # gpt-3.5-turbo
print(settings.DATA_DIR) # PosixPath('.../data')
设计要点:
- 使用
pydantic-settings自动从环境变量和.env读取 - 类型安全(自动类型转换)
- 集中管理所有配置
init_dirs()在启动时确保目录存在
第六章 数据库层
6.1 SQLAlchemy + SQLite
backend/db/models.py 设计了 3 张表:
6.1.1 智能体配置表(agent_configs)
class AgentConfig(Base):
__tablename__ = "agent_configs"
id = Column(Integer, primary_key=True)
agent_id = Column(String(50), unique=True) # 业务ID
name = Column(String(100))
description = Column(Text)
system_prompt = Column(Text) # 系统提示词
model = Column(String(50)) # 使用的 LLM
temperature = Column(Integer) # 0-100
skills = Column(Text) # JSON 数组
created_at = Column(DateTime)
updated_at = Column(DateTime)
6.1.2 聊天记录表(chat_histories)
class ChatHistory(Base):
__tablename__ = "chat_histories"
id = Column(Integer, primary_key=True)
session_id = Column(String(50), index=True) # 会话ID
agent_id = Column(String(50), index=True)
role = Column(String(20)) # user / assistant
content = Column(Text)
tool_calls = Column(Text) # JSON
created_at = Column(DateTime)
6.1.3 知识库表(knowledge_bases)
class KnowledgeBase(Base):
__tablename__ = "knowledge_bases"
id = Column(Integer, primary_key=True)
kb_id = Column(String(50), unique=True)
name = Column(String(100))
description = Column(Text)
document_count = Column(Integer)
chunk_count = Column(Integer)
created_at = Column(DateTime)
6.2 数据库初始化
def init_db():
"""创建所有表"""
Base.metadata.create_all(bind=engine)
def get_db():
"""FastAPI 依赖注入"""
db = SessionLocal()
try:
yield db
finally:
db.close()
6.3 在 FastAPI 中使用
from db.models import get_db
from sqlalchemy.orm import Session
@app.get("/api/agents")
async def list_agents(db: Session = Depends(get_db)):
agents = db.query(AgentConfig).all()
return {"agents": [...]}
注意:db: Session = Depends(get_db) 会在请求结束时自动关闭连接。
第七章 智能体核心
7.1 AgentManager 设计
backend/agents/manager.py 是整个系统的核心。
设计思想:
- 单例模式 - 全局唯一管理器
- 内存池 - 所有 AgentRuntime 存在
_agents字典 - 会话隔离 - 每个 agent 的对话历史按
session_id分组 - 异步执行 - 全程使用 async/await
7.2 核心类
class AgentManager:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
self._agents: Dict[str, AgentRuntime] = {}
def create_agent(self, agent_id, name, system_prompt, skills, model):
runtime = AgentRuntime(agent_id, name, system_prompt, skills, model)
self._agents[agent_id] = runtime
return runtime
def get_agent(self, agent_id):
return self._agents.get(agent_id)
def list_agents(self):
return list(self._agents.values())
async def send_message_between_agents(self, from_id, to_id, message):
"""A2A 通信"""
from_agent = self.get_agent(from_id)
to_agent = self.get_agent(to_id)
# 转发消息到对方上下文
response = await to_agent.process_internal(message, from_id)
return response
# 全局单例
agent_manager = AgentManager()
7.3 AgentRuntime 核心
class AgentRuntime:
def __init__(self, agent_id, name, system_prompt, skills, model):
self.agent_id = agent_id
self.name = name
self.system_prompt = system_prompt
self.skills = skills or []
self.model = model or settings.OPENAI_MODEL
# 按 session_id 分组的对话历史
# 结构: {session_id: [{"role": "user", "content": "..."}]}
self.sessions: Dict[str, List[Dict]] = {}
async def chat(self, message, session_id="default"):
"""用户对话"""
# 1. 初始化会话
if session_id not in self.sessions:
self.sessions[session_id] = []
# 2. 添加用户消息
self.sessions[session_id].append({"role": "user", "content": message})
# 3. 调用 LLM
messages = [{"role": "system", "content": self.system_prompt}] + \
self.sessions[session_id]
response = await self._call_llm(messages)
# 4. 保存回复
self.sessions[session_id].append({"role": "assistant", "content": response})
return response
async def _call_llm(self, messages):
"""调用 OpenAI 兼容 API"""
import httpx
url = f"{settings.OPENAI_BASE_URL}/chat/completions"
headers = {
"Authorization": f"Bearer {settings.OPENAI_API_KEY}",
"Content-Type": "application/json"
}
payload = {
"model": self.model,
"messages": messages,
"temperature": 0.7
}
# 如果绑定了技能,传入工具定义
if self.skills:
tools = registry.get_tools_for_skills(self.skills)
payload["tools"] = tools
async with httpx.AsyncClient(timeout=settings.LLM_TIMEOUT) as client:
response = await client.post(url, json=payload, headers=headers)
data = response.json()
return data["choices"][0]["message"]["content"]
7.4 Function Calling 处理
当 LLM 返回 tool_calls 时,需要循环调用工具直到拿到最终答案:
async def _call_llm_with_tools(self, messages):
"""支持 Function Calling 的 LLM 调用"""
max_iterations = 5
for _ in range(max_iterations):
response = await self._call_llm(messages)
message = response.choices[0].message
# 没有工具调用,直接返回
if not message.tool_calls:
return message.content
# 执行工具
messages.append(message)
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
# 调用工具
result = await registry.execute_tool(tool_name, arguments)
# 工具结果加入消息
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result, ensure_ascii=False)
})
return response.choices[0].message.content
7.5 智能体互通(A2A)
async def process_internal(self, message, sender_id):
"""接收来自其他 Agent 的消息"""
system_msg = f"[系统] 你收到了来自 {sender_id} 的消息"
return await self.chat(f"{system_msg}\n{message}", session_id=f"a2a_{sender_id}")
A2A 使用场景:
- 研究员 Agent 找到资料 → 转给写作 Agent
- 客服 Agent 收集需求 → 转给订单 Agent
第八章 技能系统
8.1 核心设计
技能系统基于注册表模式 + Function Calling。
backend/skills/registry.py:
class SkillBase:
"""技能基类"""
@property
def name(self) -> str:
raise NotImplementedError
@property
def description(self) -> str:
raise NotImplementedError
def get_tools(self) -> List[Dict]:
"""返回 OpenAI Function Calling 格式的工具定义"""
raise NotImplementedError
async def execute(self, tool_name: str, arguments: Dict) -> Any:
"""执行具体工具"""
raise NotImplementedError
8.2 内置技能:Calculator
class CalculatorSkill(SkillBase):
@property
def name(self): return "calculator"
@property
def description(self): return "执行数学计算,支持四则运算、函数"
def get_tools(self):
return [{
"type": "function",
"function": {
"name": "calculate",
"description": "计算数学表达式",
"parameters": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式,如 1+2*3"
}
},
"required": ["expression"]
}
}
}]
async def execute(self, tool_name, arguments):
if tool_name == "calculate":
expr = arguments.get("expression", "")
# 安全过滤:只允许数字和运算符
safe = "".join(c for c in expr if c in "0123456789.+-*/()% ")
try:
return {"result": eval(safe)}
except Exception as e:
return {"error": str(e)}
8.3 技能注册表
class SkillRegistry:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
self._skills: Dict[str, SkillBase] = {}
# 注册内置技能
self.register(CalculatorSkill())
self.register(WebSearchSkill())
self.register(DateTimeSkill())
def register(self, skill: SkillBase):
self._skills[skill.name] = skill
def list_skills(self):
return [
{"name": s.name, "description": s.description}
for s in self._skills.values()
]
def get_tools_for_skills(self, skill_names):
"""获取指定技能的工具定义"""
tools = []
for name in skill_names:
if name in self._skills:
tools.extend(self._skills[name].get_tools())
return tools
async def execute_tool(self, tool_name, arguments):
"""根据工具名找到对应技能并执行"""
for skill in self._skills.values():
for tool in skill.get_tools():
if tool["function"]["name"] == tool_name:
return await skill.execute(tool_name, arguments)
return {"error": f"工具 {tool_name} 不存在"}
# 全局注册表
registry = SkillRegistry()
8.4 如何添加自定义技能
# 在 skills/registry.py 中添加
class WeatherSkill(SkillBase):
@property
def name(self): return "weather"
@property
def description(self): return "查询天气"
def get_tools(self):
return [{
"type": "function",
"function": {
"name": "get_weather",
"description": "查询某城市天气",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名"}
},
"required": ["city"]
}
}
}]
async def execute(self, tool_name, arguments):
if tool_name == "get_weather":
city = arguments.get("city")
# 调用天气 API
return {"city": city, "weather": "晴"}
# 注册
registry.register(WeatherSkill())
扩展点:技能系统是平台最易扩展的部分,可以快速接入:
- 邮件发送
- 日历管理
- 数据库查询
- 第三方 API
- 代码执行(沙箱)
第九章 RAG 知识库
9.1 整体架构
┌──────────────┐
│ 文档上传 │
└──────┬───────┘
│
▼
┌──────────────┐
│DocumentParser│ (TXT/PDF/DOCX/XLSX/ZIP/MD)
└──────┬───────┘
│ 纯文本
▼
┌──────────────┐
│TextChunker │ (智能分片 500+50)
└──────┬───────┘
│ 文本片段列表
▼
┌──────────────┐
│Embedding │ (OpenAI/本地模型)
│Service │
└──────┬───────┘
│ 向量列表
▼
┌──────────────┐
│Milvus Lite │ (IVF_FLAT + COSINE)
│(嵌入式) │
└──────────────┘
[检索] 用户问题 → Embedding → Milvus search → top-k 文档
9.2 EmbeddingService
backend/rag/rag_system.py 核心代码:
class EmbeddingService:
def __init__(self):
from core.config import settings
self.use_local = settings.USE_LOCAL_EMBEDDING
self.dim = settings.EMBEDDING_DIM
self.model_name = settings.EMBEDDING_MODEL
if self.use_local:
from sentence_transformers import SentenceTransformer
self._local_model = SentenceTransformer(self.model_name)
actual_dim = self._local_model.get_sentence_embedding_dimension()
if actual_dim != self.dim:
self.dim = actual_dim
async def embed_texts(self, texts):
if self.use_local:
return self._embed_local(texts)
return await self._embed_openai(texts)
async def _embed_openai(self, texts):
import httpx
url = f"{settings.OPENAI_BASE_URL}/embeddings"
headers = {"Authorization": f"Bearer {settings.OPENAI_API_KEY}"}
payload = {"model": self.model_name, "input": texts}
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(url, json=payload, headers=headers)
data = response.json()
return [item["embedding"] for item in data["data"]]
双模式:
- OpenAI Embedding - 在线,准确度更高,需要 API Key
- 本地 sentence-transformers - 离线免费,首次需下载模型
9.3 RAGSystem 核心
class RAGSystem:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
from core.config import settings
from pymilvus import MilvusClient, DataType
# Milvus Lite 嵌入式
db_path = Path(settings.MILVUS_DB_PATH).resolve()
db_path.parent.mkdir(parents=True, exist_ok=True)
self.client = MilvusClient(uri=str(db_path))
self.embedding = EmbeddingService()
self.prefix = settings.MILVUS_COLLECTION_PREFIX
self.dim = self.embedding.dim
def _collection_name(self, kb_id):
return f"{self.prefix}{kb_id}".replace("-", "_")
def create_kb(self, kb_id, name, description=""):
collection_name = self._collection_name(kb_id)
if self.client.has_collection(collection_name):
return {"status": "exists"}
# 定义 schema
schema = self.client.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field("id", DataType.VARCHAR, max_length=64, is_primary=True)
schema.add_field("vector", DataType.FLOAT_VECTOR, dim=self.dim)
schema.add_field("text", DataType.VARCHAR, max_length=65535)
schema.add_field("source", DataType.VARCHAR, max_length=512)
# 创建 collection
self.client.create_collection(collection_name=collection_name, schema=schema)
# 创建向量索引
index_params = self.client.prepare_index_params()
index_params.add_index(
field_name="vector",
index_type="IVF_FLAT",
metric_type="COSINE",
params={"nlist": 64}
)
self.client.create_index(collection_name=collection_name, index_params=index_params)
async def add_documents(self, kb_id, texts, metadatas=None, ids=None):
collection_name = self._collection_name(kb_id)
if ids is None:
ids = [str(uuid.uuid4()) for _ in texts]
# 向量化
vectors = await self.embedding.embed_texts(texts)
# 构造数据
data = []
for doc_id, text, vector, meta in zip(ids, texts, vectors, metadatas or [{}]*len(texts)):
data.append({
"id": doc_id,
"vector": vector,
"text": text,
"source": meta.get("filename", "")
})
# 插入
self.client.insert(collection_name=collection_name, data=data)
self.client.load_collection(collection_name)
return ids
async def query(self, kb_id, query_text, top_k=5):
collection_name = self._collection_name(kb_id)
if not self.client.has_collection(collection_name):
return []
# 向量化查询
query_vector = await self.embedding.embed_query(query_text)
# 检索
results = self.client.search(
collection_name=collection_name,
data=[query_vector],
limit=top_k,
output_fields=["text", "source"]
)
# 格式化
formatted = []
for hit in results[0]:
distance = hit.get("distance", 0)
score = max(0, 1 - distance) # COSINE 距离转相似度
entity = hit.get("entity", {})
formatted.append({
"id": hit.get("id"),
"text": entity.get("text"),
"source": entity.get("source"),
"score": score
})
return formatted
9.4 文档解析器工厂
backend/rag/document_parser.py:
class DocumentParser:
"""解析器基类"""
@classmethod
def can_parse(cls, filename):
raise NotImplementedError
@classmethod
def parse(cls, file_path):
raise NotImplementedError
class TXTParser(DocumentParser):
@classmethod
def can_parse(cls, filename):
return filename.endswith(('.txt', '.md'))
@classmethod
def parse(cls, file_path):
return {"content": Path(file_path).read_text(encoding='utf-8')}
class PDFParser(DocumentParser):
@classmethod
def can_parse(cls, filename):
return filename.endswith('.pdf')
@classmethod
def parse(cls, file_path):
import pdfplumber
text = ""
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
text += page.extract_text() or ""
return {"content": text}
class DocumentParserFactory:
_parsers = [TXTParser, PDFParser, DOCXParser, XLSXParser, ZIPParser]
@classmethod
def parse(cls, file_path):
for parser in cls._parsers:
if parser.can_parse(file_path):
return parser.parse(file_path)
raise ValueError(f"不支持的文件类型: {file_path}")
9.5 文本分片
class TextChunker:
def __init__(self, chunk_size=500, overlap=50):
self.chunk_size = chunk_size
self.overlap = overlap
def split(self, text):
"""智能分片:按段落优先,长段落按句子切"""
text = text.strip()
if not text:
return []
paragraphs = text.split("\n\n")
chunks = []
current = ""
for para in paragraphs:
if len(current) + len(para) + 2 > self.chunk_size:
if current:
chunks.append(current.strip())
# 保留 overlap
current = current[-self.overlap:] if len(current) > self.overlap else ""
current += para + "\n\n"
else:
current += para + "\n\n"
if current.strip():
chunks.append(current.strip())
return chunks
分片策略:
- 按段落切分(保留语义完整性)
- 长段落按句子切分
- 50 字 overlap 避免关键信息丢失
- 默认 500 字/片(平衡精度和召回率)
第十章 多智能体协作
10.1 CrewAI 模式
backend/agents/crewai.py 实现一个简化版 CrewAI:
用户任务
│
▼
[信息研究员] - InformationResearcher
│ ├ 联网搜索
│ └ RAG 检索
│
▼
[文稿撰写员] - ReportWriter
│ └ 基于研究材料生成报告
│
▼
最终报告
10.2 角色定义
class InformationResearcher:
"""信息研究员"""
async def research(self, topic):
results = []
# 1. 联网搜索
try:
from duckduckgo_search import DDGS
with DDGS() as ddgs:
search_results = ddgs.text(topic, max_results=5)
for r in search_results:
results.append({
"type": "web",
"title": r.get("title"),
"content": r.get("body")
})
except Exception as e:
logger.warning(f"联网搜索失败: {e}")
# 2. RAG 检索(如有知识库)
try:
kbs = rag_system.list_kbs()
for kb_id in kbs[:2]: # 取前2个
docs = await rag_system.query(kb_id, topic, top_k=3)
for doc in docs:
results.append({
"type": "rag",
"content": doc["text"]
})
except Exception as e:
logger.warning(f"RAG 检索失败: {e}")
return results
class ReportWriter:
"""文稿撰写员"""
async def write(self, topic, research_data):
# 构造 prompt
context = "\n\n".join([
f"[{r.get('type', 'doc')}] {r.get('title', '')}\n{r.get('content', '')}"
for r in research_data
])
prompt = f"""基于以下研究材料,写一份关于 "{topic}" 的专业报告。
研究材料:
{context}
要求:
1. 结构清晰,使用标题分节
2. 数据准确,引用具体来源
3. 不少于 800 字
"""
# 调用 LLM
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
f"{settings.OPENAI_BASE_URL}/chat/completions",
headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
json={
"model": settings.OPENAI_MODEL,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5
}
)
data = response.json()
return data["choices"][0]["message"]["content"]
10.3 编排器
class CrewAIOrchestrator:
def __init__(self):
self.researcher = InformationResearcher()
self.writer = ReportWriter()
self.history = []
async def execute_task(self, task, steps=None):
# 1. 信息搜集
logger.info(f"[Crew] 信息研究员开始: {task}")
research_data = await self.researcher.research(task)
# 2. 报告撰写
logger.info(f"[Crew] 文稿撰写员开始")
final_report = await self.writer.write(task, research_data)
# 3. 记录历史
result = {
"task": task,
"research_count": len(research_data),
"final_report": final_report,
"created_at": datetime.now().isoformat()
}
self.history.append(result)
return result
# 全局实例
crew = CrewAIOrchestrator()
第十一章 反思式推理
11.1 ReRCT 思想
ReRCT = Reasoning + Reflection + Criticism + reThinking
通过三步推理提升 LLM 回答质量:
- 初步回答 - 直接给出第一反应答案
- 自我反思 - 审查答案的不足
- 优化输出 - 基于反思重新生成
11.2 实现代码
backend/agents/rerct.py:
class ReRCTReasoner:
def __init__(self):
self.model = settings.OPENAI_MODEL
async def reason(self, question, context=None):
# 步骤 1: 初步回答
initial = await self._initial_response(question, context)
# 步骤 2: 自我反思
reflection = await self._reflect(question, initial, context)
# 步骤 3: 优化输出
final = await self._refine(question, initial, reflection, context)
return {
"question": question,
"initial": initial,
"reflection": reflection,
"final": final
}
async def _initial_response(self, question, context):
prompt = f"""请回答以下问题,给出你的第一反应答案。
问题:{question}
{f"参考信息:{context}" if context else ""}
"""
return await self._call_llm(prompt)
async def _reflect(self, question, initial, context):
prompt = f"""请审查以下答案,指出其中的不足、错误或不完整之处。
问题:{question}
初步答案:{initial}
请从以下角度审查:
1. 准确性 - 是否有事实错误?
2. 完整性 - 是否遗漏关键信息?
3. 逻辑性 - 推理是否合理?
4. 相关性 - 是否切题?
"""
return await self._call_llm(prompt)
async def _refine(self, question, initial, reflection, context):
prompt = f"""基于以下信息,给出最终优化答案。
问题:{question}
初步答案:{initial}
审查反馈:{reflection}
{f"参考信息:{context}" if context else ""}
请提供一份更准确、更完整、更专业的答案。
"""
return await self._call_llm(prompt)
async def _call_llm(self, prompt):
async with httpx.AsyncClient(timeout=60) as client:
response = await client.post(
f"{settings.OPENAI_BASE_URL}/chat/completions",
headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
json={
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.5
}
)
data = response.json()
return data["choices"][0]["message"]["content"]
# 全局实例
reasoner = ReRCTReasoner()
为什么有效:
- 让 LLM 有机会"自我批评"
- 揭示隐藏的逻辑漏洞
- 提升 30-50% 的准确性
第十二章 FastAPI 路由层
12.1 路由设计
backend/main.py 定义所有 API:
| 路由前缀 | 功能 |
|---|---|
/api/agents |
智能体管理 |
/api/agents/chat |
对话接口 |
/api/agents/a2a |
智能体互通 |
/api/skills |
技能管理 |
/api/rag/kbs |
知识库管理 |
/api/rag/upload |
文档上传 |
/api/rag/query |
RAG 检索 |
/api/crewai/execute |
多智能体任务 |
/api/rerct/reason |
反思推理 |
/api/system/info |
系统信息 |
12.2 标准接口模式
from pydantic import BaseModel
class CreateAgentRequest(BaseModel):
"""请求体模型"""
name: str
description: str = ""
system_prompt: str = "你是一个有帮助的AI助手。"
skills: List[str] = []
model: Optional[str] = None
temperature: float = 0.7
@app.post("/api/agents")
async def create_agent(
request: CreateAgentRequest,
db: Session = Depends(get_db)
):
"""创建智能体"""
# 1. 业务逻辑
agent_id = f"agent_{uuid.uuid4().hex[:12]}"
# 2. 持久化
agent_config = AgentConfig(
agent_id=agent_id,
name=request.name,
...
)
db.add(agent_config)
db.commit()
# 3. 内存中创建
agent_manager.create_agent(...)
return {"agent_id": agent_id, "status": "created"}
12.3 文件上传
@app.post("/api/rag/upload")
async def upload_document(
kb_id: str,
file: UploadFile = File(...)
):
# 1. 读取文件
content_bytes = await file.read()
# 2. 保存到磁盘
file_path = settings.UPLOADS_DIR / file.filename
with open(file_path, "wb") as f:
f.write(content_bytes)
# 3. 解析
text = DocumentParserFactory.parse(str(file_path))
# 4. 分片
chunks = TextChunker().split(text["content"])
# 5. 向量化并存入向量库
await rag_system.add_documents(kb_id, chunks, metadatas=[...])
return {"filename": file.filename, "chunks": len(chunks)}
12.4 CORS 配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
为什么 *:开发期方便前后端分离调试。生产环境应指定具体域名。
12.5 启动事件
@app.on_event("startup")
async def startup_event():
logger.info(f"🚀 {settings.APP_NAME} 启动中...")
init_db()
logger.info("✅ 系统启动完成")
第十三章 前端实现
13.1 整体结构
frontend/index.html 是一个单页应用(SPA),使用 Vue 3 CDN 引入。
核心模块:
- 侧边栏导航
- 6 个视图:智能体 / RAG / 多智能体 / 反思 / 技能
- 模态框:创建智能体
13.2 Vue 3 Setup 模式
const { createApp, ref, onMounted } = Vue;
createApp({
setup() {
// 响应式数据
const agents = ref([]);
const currentView = ref('agents');
// 方法
const loadAll = async () => {
agents.value = (await callApi('get', '/api/agents'))?.agents || [];
};
// 生命周期
onMounted(() => {
loadAll();
});
// 暴露给模板
return { agents, currentView, loadAll };
}
}).mount('#app');
13.3 Axios 封装
const api = axios.create({
baseURL: window.location.origin,
});
const callApi = async (method, path, data = null) => {
try {
const response = await api({ method, url: path, data });
return response.data;
} catch (err) {
alert(err.response?.data?.detail || '请求失败');
return null;
}
};
13.4 各模块实现
智能体对话
<div v-if="selectedAgent" class="chat-area">
<div class="messages">
<div v-for="(m, i) in messages" :key="i" :class="['message', m.role]">
<div class="message-content">{{ m.content }}</div>
</div>
</div>
<div class="chat-input">
<input v-model="inputMessage" @keyup.enter="sendMessage">
<button @click="sendMessage">发送</button>
</div>
</div>
const sendMessage = async () => {
if (!inputMessage.value || !selectedAgent.value) return;
const userMsg = inputMessage.value;
messages.value.push({ role: 'user', content: userMsg });
inputMessage.value = '';
const res = await callApi('post', '/api/agents/chat', {
agent_id: selectedAgent.value.agent_id,
message: userMsg
});
if (res) {
messages.value.push({ role: 'assistant', content: res.message });
}
};
文档上传
const uploadFile = async () => {
const formData = new FormData();
formData.append('file', selectedFile.value);
const res = await api.post(`/api/rag/upload?kb_id=${selectedKb.value}`, formData, {
headers: { 'Content-Type': 'multipart/form-data' }
});
alert(`上传成功!切分为 ${res.data.chunks} 个片段`);
};
第十四章 启动与部署
14.1 本地启动
# 1. 进入项目目录
cd ai-platform
# 2. 安装依赖
cd backend
pip install -r requirements.txt
# 3. 复制配置
cp .env.example .env
# 编辑 .env 填入 OPENAI_API_KEY
# 4. 启动
python main.py
14.2 访问
- API 文档:http://localhost:8000/docs
- 前端页面:浏览器打开
frontend/index.html - 健康检查:http://localhost:8000/
14.3 生产部署
# 使用 Gunicorn + Uvicorn workers
pip install gunicorn
gunicorn main:app \
--workers 4 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--access-logfile - \
--error-logfile -
14.4 Nginx 反向代理
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
第十五章 扩展开发指南
15.1 添加新技能
# 在 backend/skills/registry.py 中追加
class MySkill(SkillBase):
@property
def name(self): return "my_skill"
@property
def description(self): return "我的技能"
def get_tools(self):
return [{
"type": "function",
"function": {
"name": "my_tool",
"description": "工具说明",
"parameters": {
"type": "object",
"properties": {
"param1": {"type": "string"}
}
}
}
}]
async def execute(self, tool_name, arguments):
return {"result": "..."}
# 注册
registry.register(MySkill())
15.2 添加新文档格式
# 在 backend/rag/document_parser.py 中追加
class MyFormatParser(DocumentParser):
@classmethod
def can_parse(cls, filename):
return filename.endswith('.myext')
@classmethod
def parse(cls, file_path):
# 解析逻辑
return {"content": "..."}
# 添加到工厂
DocumentParserFactory._parsers.append(MyFormatParser)
15.3 添加新 LLM 提供商
修改 core/config.py:
class Settings(BaseSettings):
# 选择使用的提供商
LLM_PROVIDER: str = "openai" # openai / anthropic / gemini
...
修改 agents/manager.py 的 _call_llm:
async def _call_llm(self, messages):
if settings.LLM_PROVIDER == "anthropic":
# 调用 Anthropic API
...
elif settings.LLM_PROVIDER == "openai":
# 默认 OpenAI
...
15.4 添加新数据库后端
修改 db/models.py:
# 将 SQLite 改为 PostgreSQL
engine = create_engine(
"postgresql://user:pass@localhost/dbname",
pool_size=10
)
15.5 性能优化方向
- ✅ 缓存层 - 引入 Redis 缓存 RAG 结果
- ✅ 异步批处理 - Embedding 批量提交
- ✅ 流式响应 - LLM 流式返回(SSE)
- ✅ 连接池 - httpx 连接复用
- ✅ 后台任务 - 文档解析异步化
附录:常见问题
Q1: 没有 OpenAI API Key 能跑吗?
A: 可以启动并创建知识库。LLM 调用会失败,但 RAG 检索和文档解析功能可用。
Q2: Milvus Lite 数据存在哪?
A: 默认 ./data/milvus_lite.db,删除该文件即可重置向量库。
Q3: 如何重置整个系统?
A: 删除 data/ 目录下的所有文件即可。
Q4: 为什么 Embedding 维度要对齐?
A: 写入和查询必须使用同一维度的向量模型。OpenAI 是 1536,本地 MiniLM 是 384。
Q5: 如何备份知识库?
A: 复制 data/milvus_lite.db 文件即可。
总结
本平台从零搭建了一个生产级 AI 智能体管理平台,覆盖了:
- ✅ 配置管理(pydantic-settings)
- ✅ 数据持久化(SQLAlchemy + SQLite)
- ✅ 智能体核心(Agent + A2A + Function Calling)
- ✅ 技能系统(注册表模式)
- ✅ RAG 知识库(Milvus Lite + Embedding)
- ✅ 多智能体协作(CrewAI 模式)
- ✅ 反思式推理(ReRCT)
- ✅ 多格式文档解析
- ✅ REST API(FastAPI)
- ✅ Vue 3 前端
通过模块化设计,每个组件都可独立替换和扩展。后续可在此基础上:
- 添加用户系统
- 接入更多 LLM 提供商
- 实现工作流编排
- 加入监控告警
- 部署到 Kubernetes
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)