企业级AI智能客服系统:从零到一的架构设计实战
——尘一不染
"架构不是关于技术,而是关于如何用技术解决业务问题。"
序章:从一个真实问题开始
场景描述:企业客服的真实困境
让我们先看一个真实的业务场景:
某中型电商平台,拥有500万注册用户,日均咨询量约30,000次。客服团队30人,三班倒轮班。
现状问题:
- 人力成本高:每月客服人力成本超过50万元,且每年以15%的速度增长
- 响应速度慢:高峰期平均等待时间超过15分钟,用户满意度仅68%
- 知识分散:产品知识分布在10多个系统中,新员工培训周期长达2周
- 服务时间受限:只能在工作时间段提供服务,夜间和节假日的用户流失严重
老板的灵魂拷问:
"能不能做一个AI客服,24小时在线,成本低一点,能回答80%的常见问题?"
核心需求拆解
将业务需求转化为技术需求:
表格
| 业务需求 | 技术需求 | 优先级 |
|---|---|---|
| 24/7服务 | 高可用架构、容错机制 | P0 |
| 多轮对话 | 状态管理、上下文理解 | P0 |
| 精准回复 | RAG检索、大模型推理 | P0 |
| 持续学习 | 知识更新机制、反馈闭环 | P1 |
| 对接CRM | API集成、数据同步 | P1 |
| 安全合规 | 数据脱敏、审计日志 | P1 |
| 成本控制 | 高效推理、资源优化 | P2 |
业务约束:
- 预算有限:初期投入不超过30万,后续月度运营成本不超过5万
- 现有系统:已部署CRM系统(Salesforce)、工单系统(Zendesk)
- 合规要求:用户数据需符合GDPR,部分对话需加密存储
第一章:架构设计方法论
1.1 架构设计的核心原则
在动手写代码之前,我们需要理解为什么要这样设计,而不是那样设计。
plaintext
架构设计的三个维度:
性能 ◄────────────► 成本
▲ ▲
│ │
│ [权衡区间] │
│ │
▼ ▼
复杂度 ──────────── 可扩展性
我的设计哲学:
- 先正确,再优化:先保证系统正确运行,再考虑性能优化
- 简单优于完美:过度设计的系统是维护的噩梦
- 演进优于一步到位:架构需要随着业务发展而演进
1.2 如何拆解系统:分层思维
对于AI智能客服系统,我采用五层架构:
plaintext
┌─────────────────────────────────────────────────────────┐
│ 用户界面层 (Presentation) │
│ Web / App / 企微 / 钉钉 / API SDK │
├─────────────────────────────────────────────────────────┤
│ 网关层 (Gateway) │
│ 认证 / 限流 / 路由 / 监控 │
├─────────────────────────────────────────────────────────┤
│ 业务层 (Business) │
│ 多Agent协作 / 对话管理 / 业务逻辑 │
├─────────────────────────────────────────────────────────┤
│ 知识层 (Knowledge) │
│ RAG检索 / 知识图谱 / 向量库 │
├─────────────────────────────────────────────────────────┤
│ 基础设施层 (Infrastructure) │
│ 存储 / 缓存 / 消息队列 / 监控 / 安全 │
└─────────────────────────────────────────────────────────┘
分层的好处:
- 关注点分离:每层只关心自己的职责
- 独立演进:可以单独升级某层而不影响其他层
- 团队分工:不同团队负责不同层次
- 故障隔离:一层出问题不会导致整个系统崩溃
1.3 技术选型的权衡框架
plaintext
技术选型决策矩阵:
┌──────────────────────────────────────┐
│ 技术选型三问 │
├──────────────────────────────────────┤
│ 1. 这个技术解决什么问题? │
│ 2. 引入它会带来什么复杂度? │
│ 3. 团队有能力驾驭它吗? │
└──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ 评估维度 │
├──────────────────────────────────────┤
│ • 成熟度:生产环境验证了吗? │
│ • 社区活跃度:有问题能找到答案吗? │
│ • 学习曲线:团队能快速上手吗? │
│ • 运维成本:需要多少人力维护? │
│ • 成本:采购/云服务/人力成本? │
└──────────────────────────────────────┘
第二章:前端架构设计与实现
2.1 技术选型
选择 Next.js + React 的理由:
表格
| 维度 | 评估 | 得分 |
|---|---|---|
| SSR支持 | SEO友好,首屏加载快 | ⭐⭐⭐⭐⭐ |
| 流式输出 | 原生支持SSE,方便AI响应流 | ⭐⭐⭐⭐⭐ |
| 生态丰富 | React社区组件丰富 | ⭐⭐⭐⭐⭐ |
| 团队熟悉度 | 主流技术,团队有经验 | ⭐⭐⭐⭐⭐ |
| 部署便捷 | Vercel一键部署 | ⭐⭐⭐⭐⭐ |
2.2 聊天组件核心实现
tsx
// components/ChatInterface.tsx
"use client";
import React, { useState, useRef, useEffect, useCallback } from "react";
import { Message, ChatSession } from "@/types/chat";
import { ChatMessage } from "./ChatMessage";
import { ChatInput } from "./ChatInput";
import { streamChat } from "@/lib/chat-stream";
interface ChatInterfaceProps {
sessionId: string;
userId: string;
onSessionUpdate?: (session: ChatSession) => void;
onError?: (error: Error) => void;
}
export function ChatInterface({
sessionId,
userId,
onSessionUpdate,
onError,
}: ChatInterfaceProps) {
const [messages, setMessages] = useState<Message[]>([]);
const [isLoading, setIsLoading] = useState(false);
const [streamingContent, setStreamingContent] = useState("");
const messagesEndRef = useRef<HTMLDivElement>(null);
const abortControllerRef = useRef<AbortController | null>(null);
// 自动滚动到底部
const scrollToBottom = useCallback(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, []);
useEffect(() => {
scrollToBottom();
}, [messages, streamingContent, scrollToBottom]);
// 发送消息
const handleSendMessage = async (content: string, attachments?: File[]) => {
if (isLoading) return;
// 创建用户消息
const userMessage: Message = {
id: crypto.randomUUID(),
role: "user",
content,
timestamp: new Date(),
attachments: attachments?.map((f) => ({
name: f.name,
url: URL.createObjectURL(f),
type: f.type,
})),
};
// 添加用户消息
setMessages((prev) => [...prev, userMessage]);
setIsLoading(true);
setStreamingContent("");
// 创建助手消息占位
const assistantMessageId = crypto.randomUUID();
let assistantMessage: Message = {
id: assistantMessageId,
role: "assistant",
content: "",
timestamp: new Date(),
};
try {
// 创建AbortController用于取消请求
abortControllerRef.current = new AbortController();
// 调用流式聊天API
await streamChat({
sessionId,
userId,
messages: [...messages, userMessage],
onChunk: (chunk: string) => {
setStreamingContent((prev) => prev + chunk);
},
onComplete: (fullContent: string, metadata?: Record<string, any>) => {
// 更新助手消息
assistantMessage = {
...assistantMessage,
content: fullContent,
metadata,
};
setMessages((prev) => [...prev, assistantMessage]);
setIsLoading(false);
setStreamingContent("");
// 通知会话更新
onSessionUpdate?.({
id: sessionId,
messages: [...messages, userMessage, assistantMessage],
lastActivity: new Date(),
});
},
onError: (error: Error) => {
setIsLoading(false);
setStreamingContent("");
onError?.(error);
},
signal: abortControllerRef.current.signal,
});
} catch (error) {
setIsLoading(false);
setStreamingContent("");
onError?.(error as Error);
}
};
// 停止生成
const handleStopGeneration = () => {
abortControllerRef.current?.abort();
if (streamingContent) {
const assistantMessage: Message = {
id: crypto.randomUUID(),
role: "assistant",
content: streamingContent,
timestamp: new Date(),
metadata: { stopped: true },
};
setMessages((prev) => [...prev, assistantMessage]);
}
setIsLoading(false);
setStreamingContent("");
};
// 重新生成
const handleRegenerate = async (messageId: string) => {
const messageIndex = messages.findIndex((m) => m.id === messageId);
if (messageIndex === -1) return;
// 删除该消息及其后的所有消息
const updatedMessages = messages.slice(0, messageIndex);
setMessages(updatedMessages);
// 重新发送最后一条用户消息
const lastUserMessage = updatedMessages.filter((m) => m.role === "user").pop();
if (lastUserMessage) {
await handleSendMessage(lastUserMessage.content);
}
};
return (
<div className="flex flex-col h-full bg-gray-50">
{/* 消息列表 */}
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{messages.length === 0 && (
<div className="flex flex-col items-center justify-center h-full text-gray-500">
<div className="text-6xl mb-4">🤖</div>
<h2 className="text-xl font-semibold mb-2">欢迎使用智能客服</h2>
<p className="text-sm">我可以帮您解答产品问题、处理订单查询等</p>
</div>
)}
{messages.map((message) => (
<ChatMessage
key={message.id}
message={message}
onRegenerate={message.role === "assistant" ? handleRegenerate : undefined}
/>
))}
{/* 流式输出中的消息 */}
{streamingContent && (
<ChatMessage
message={{
id: "streaming",
role: "assistant",
content: streamingContent,
timestamp: new Date(),
}}
isStreaming
/>
)}
<div ref={messagesEndRef} />
</div>
{/* 输入区域 */}
<ChatInput
onSend={handleSendMessage}
onStop={handleStopGeneration}
isLoading={isLoading}
disabled={!sessionId}
/>
</div>
);
}
2.3 流式聊天API客户端
typescript
// lib/chat-stream.ts
interface StreamChatParams {
sessionId: string;
userId: string;
messages: Message[];
onChunk: (chunk: string) => void;
onComplete: (content: string, metadata?: Record<string, any>) => void;
onError: (error: Error) => void;
signal?: AbortSignal;
}
export async function streamChat({
sessionId,
userId,
messages,
onChunk,
onComplete,
onError,
signal,
}: StreamChatParams): Promise<void> {
try {
const response = await fetch("/api/chat/stream", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
sessionId,
userId,
messages: messages.map((m) => ({
role: m.role,
content: m.content,
})),
}),
signal,
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
if (!response.body) {
throw new Error("Response body is null");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let fullContent = "";
// 解析SSE数据
const processLine = (line: string) => {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") {
return true; // 完成
}
try {
const parsed = JSON.parse(data);
if (parsed.type === "content") {
fullContent += parsed.content;
onChunk(parsed.content);
} else if (parsed.type === "metadata") {
// 处理元数据
}
} catch {
// 忽略解析错误
}
}
return false;
};
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (processLine(line)) {
onComplete(fullContent);
return;
}
}
}
// 处理剩余的buffer
if (buffer.trim()) {
processLine(buffer);
}
onComplete(fullContent);
} catch (error) {
if ((error as Error).name === "AbortError") {
onComplete(fullContent); // 取消时返回已生成的内容
} else {
onError(error as Error);
}
}
}
2.4 WebSocket vs SSE 对比分析
plaintext
┌────────────────────────────────────────────────────────────────┐
│ 实时通信技术对比 │
├──────────────┬─────────────────────┬────────────────────────────┤
│ 特性 │ WebSocket │ SSE │
├──────────────┼─────────────────────┼────────────────────────────┤
│ 协议方向 │ 双向通信 │ 单向通信(服务端→客户端) │
│ 标准 │ RFC 6455 │ HTML5 标准 │
│ 连接开销 │ 较低(长连接) │ 低(HTTP/1.1) │
│ 兼容性 │ 需要升级协议 │ 自动降级 │
│ 重连机制 │ 需手动实现 │ 自动重连 │
│ 负载均衡 │ 需要 sticky session │ 无状态,易于负载均衡 │
│ 防火墙 │ 可能被阻止 │ 通过HTTP,防火墙友好 │
│ 适用场景 │ 游戏、实时交易 │ AI流式输出、通知推送 │
└──────────────┴─────────────────────┴────────────────────────────┘
💡 结论:对于AI客服的流式响应场景,SSE更加合适:
1. 单向通信足够(服务端推送AI响应)
2. 实现简单,HTTP兼容性好
3. 天然支持断线重连
4. 易于负载均衡和水平扩展
第三章:API Gateway 设计与实现
3.1 为什么需要API Gateway?
plaintext
没有API Gateway时的问题:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Client │───▶│ Auth │───▶│ Rate │───▶ Service A
│ │ │ Logic │ │ Limit │
└─────────┘ └─────────┘ └─────────┘
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Client │───▶│ Auth │───▶│ Rate │───▶ Service B
│ │ │ Logic │ │ Limit │
└─────────┘ └─────────┘ └─────────┘
❌ 每个服务都要重复实现认证、限流
❌ 难以统一监控和日志
❌ 跨服务调用复杂
有API Gateway后:
┌──────────────────┐
│ API Gateway │
│ ┌────────────┐ │
│ │ Auth │ │
│ │ Rate Limit│ │
│ │ Logging │ │
│ │ Routing │ │
│ │ Circuit │ │
│ └────────────┘ │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Service A│ │Service B│ │Service C│
└─────────┘ └─────────┘ └─────────┘
✅ 统一横切关注点
✅ 集中式监控
✅ 易于扩展
3.2 Gateway 核心实现
python
# gateway/main.py
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import StreamingResponse, JSONResponse
from contextlib import asynccontextmanager
import time
import logging
from typing import Callable, Dict, Any
from dataclasses import dataclass, field
from .auth import AuthHandler, TokenData
from .rate_limiter import RateLimiter, TokenBucketLimiter
from .circuit_breaker import CircuitBreaker, CircuitState
from .router import ServiceRouter
from .logger import RequestLogger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GatewayConfig:
"""网关配置"""
services: Dict[str, Dict[str, Any]] = field(default_factory=dict)
rate_limit_requests: int = 100 # 每分钟请求数
rate_limit_window: int = 60 # 时间窗口(秒)
circuit_breaker_threshold: int = 5 # 失败次数阈值
circuit_breaker_timeout: int = 60 # 熔断恢复时间(秒)
class APIGateway:
def __init__(self, config: GatewayConfig):
self.config = config
self.auth_handler = AuthHandler()
self.rate_limiter = TokenBucketLimiter(
requests_per_window=config.rate_limit_requests,
window_seconds=config.rate_limit_window
)
self.circuit_breakers: Dict[str, CircuitBreaker] = {}
self.service_router = ServiceRouter()
self.request_logger = RequestLogger()
# 初始化服务路由
self._init_services()
def _init_services(self):
"""初始化服务路由"""
for service_name, service_config in self.config.services.items():
self.service_router.register(
service_name,
service_config["base_url"],
service_config["routes"]
)
self.circuit_breakers[service_name] = CircuitBreaker(
service_name=service_name,
failure_threshold=self.config.circuit_breaker_threshold,
recovery_timeout=self.config.circuit_breaker_timeout
)
async def handle_request(
self,
request: Request,
service_name: str,
path: str
) -> Response:
"""处理请求的核心逻辑"""
start_time = time.time()
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
try:
# 1. 身份认证
auth_result = await self._authenticate(request)
if not auth_result.success:
raise HTTPException(
status_code=401,
detail=f"Authentication failed: {auth_result.error}"
)
# 2. 限流检查
if not await self.rate_limiter.check(auth_result.user_id):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded"
)
# 3. 熔断器检查
circuit_breaker = self.circuit_breakers.get(service_name)
if circuit_breaker and circuit_breaker.state == CircuitState.OPEN:
raise HTTPException(
status_code=503,
detail="Service temporarily unavailable"
)
# 4. 路由和转发
response = await self._proxy_request(
request,
service_name,
path,
auth_result.user_id
)
# 5. 记录成功
if circuit_breaker:
circuit_breaker.record_success()
return response
except HTTPException:
raise
except Exception as e:
logger.error(f"Gateway error: {e}")
if circuit_breaker:
circuit_breaker.record_failure()
raise HTTPException(status_code=500, detail="Internal server error")
finally:
# 6. 记录日志
duration = time.time() - start_time
await self.request_logger.log({
"request_id": request_id,
"method": request.method,
"path": str(request.url.path),
"user_id": auth_result.user_id if 'auth_result' in dir() else None,
"duration": duration,
"timestamp": datetime.utcnow().isoformat()
})
async def _authenticate(self, request: Request) -> AuthResult:
"""身份认证"""
# 从Authorization header提取token
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return AuthResult(success=False, error="Missing or invalid authorization header")
token = auth_header[7:] # 去掉"Bearer "前缀
return await self.auth_handler.verify_token(token)
async def _proxy_request(
self,
request: Request,
service_name: str,
path: str,
user_id: str
) -> Response:
"""代理请求到后端服务"""
# 构建目标URL
target_url = self.service_router.build_url(service_name, path)
# 读取请求体
body = await request.body()
# 构建转发头
headers = dict(request.headers)
headers["X-User-ID"] = user_id
headers["X-Forwarded-For"] = request.client.host if request.client else ""
# 发起请求
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.request(
method=request.method,
url=str(target_url),
headers=headers,
content=body,
params=request.query_params
)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
# FastAPI 应用
app = FastAPI(title="AI Customer Service Gateway")
gateway = APIGateway(config=GatewayConfig(
services={
"chat-service": {
"base_url": "http://chat-service:8001",
"routes": ["/chat/stream", "/chat/history"]
},
"agent-service": {
"base_url": "http://agent-service:8002",
"routes": ["/agent/execute", "/agent/state"]
},
"knowledge-service": {
"base_url": "http://knowledge-service:8003",
"routes": ["/knowledge/search", "/knowledge/index"]
}
}
))
@app.api_route("/api/{service_name}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_handler(
request: Request,
service_name: str,
path: str
):
return await gateway.handle_request(request, service_name, path)
3.3 限流算法实现
python
# gateway/rate_limiter.py
import time
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
from dataclasses import dataclass
class RateLimiter(ABC):
"""限流器抽象基类"""
@abstractmethod
async def check(self, key: str) -> bool:
"""检查是否允许请求"""
pass
@abstractmethod
async def get_remaining(self, key: str) -> int:
"""获取剩余请求数"""
pass
@dataclass
class TokenBucketState:
"""令牌桶状态"""
tokens: float
last_update: float
class TokenBucketLimiter(RateLimiter):
"""
令牌桶限流算法
算法原理:
- 桶的容量为 capacity
- 以固定速率 rate 向桶中添加令牌
- 每个请求消耗1个令牌
- 桶满时,令牌溢出(不累积)
优点:
- 允许一定程度的突发流量
- 流量平滑
"""
def __init__(self, requests_per_window: int, window_seconds: int):
self.capacity = requests_per_window
self.refill_rate = requests_per_window / window_seconds # 每秒补充的令牌数
self.buckets: dict[str, TokenBucketState] = {}
self._lock = asyncio.Lock()
async def check(self, key: str) -> bool:
async with self._lock:
self._refill(key)
if self.buckets[key].tokens >= 1:
self.buckets[key].tokens -= 1
return True
return False
async def get_remaining(self, key: str) -> int:
async with self._lock:
self._refill(key)
return int(self.buckets[key].tokens)
def _refill(self, key: str):
"""补充令牌"""
now = time.time()
if key not in self.buckets:
self.buckets[key] = TokenBucketState(
tokens=self.capacity,
last_update=now
)
return
state = self.buckets[key]
elapsed = now - state.last_update
new_tokens = state.tokens + elapsed * self.refill_rate
state.tokens = min(self.capacity, new_tokens)
state.last_update = now
class LeakyBucketLimiter(RateLimiter):
"""
漏桶限流算法
算法原理:
- 请求以任意速率进入桶中
- 桶以固定速率漏水
- 桶满时,新请求被拒绝
优点:
- 流量完全平滑
- 适合固定速率输出的场景
缺点:
- 不允许突发流量
"""
def __init__(self, requests_per_second: float, bucket_size: int):
self.rate = requests_per_second
self.capacity = bucket_size
self.buckets: dict[str, float] = {} # 当前水量
self._lock = asyncio.Lock()
async def check(self, key: str) -> bool:
async with self._lock:
now = time.time()
if key not in self.buckets:
self.buckets[key] = 0
# 漏水
self.buckets[key] = max(0, self.buckets[key] - (now - self.buckets.get(f"{key}_last", now)) * self.rate)
self.buckets[f"{key}_last"] = now
if self.buckets[key] < self.capacity:
self.buckets[key] += 1
return True
return False
async def get_remaining(self, key: str) -> int:
async with self._lock:
return max(0, self.capacity - self.buckets.get(key, 0))
class SlidingWindowLimiter(RateLimiter):
"""
滑动窗口限流算法
算法原理:
- 将时间划分为固定大小的窗口
- 统计当前窗口和前N个窗口的请求数
- 根据比例计算当前有效的请求数
优点:
- 比固定窗口更精确
- 允许一定程度的突发
"""
def __init__(self, max_requests: int, window_size: int):
self.max_requests = max_requests
self.window_size = window_size
self.requests: dict[str, list[float]] = {} # 请求时间戳列表
self._lock = asyncio.Lock()
async def check(self, key: str) -> bool:
async with self._lock:
now = time.time()
window_start = now - self.window_size
# 清理过期的请求
if key in self.requests:
self.requests[key] = [
ts for ts in self.requests[key] if ts > window_start
]
else:
self.requests[key] = []
if len(self.requests[key]) < self.max_requests:
self.requests[key].append(now)
return True
return False
async def get_remaining(self, key: str) -> int:
async with self._lock:
now = time.time()
window_start = now - self.window_size
if key not in self.requests:
return self.max_requests
current_count = len([
ts for ts in self.requests[key] if ts > window_start
])
return max(0, self.max_requests - current_count)
plaintext
┌────────────────────────────────────────────────────────────────┐
│ 限流算法对比 │
├────────────────┬─────────────────┬─────────────────────────────┤
│ 算法 │ 特点 │ 适用场景 │
├────────────────┼─────────────────┼─────────────────────────────┤
│ 令牌桶 │ 允许突发流量 │ 需要容忍突发的API服务 │
│ │ 流量平滑 │ │
├────────────────┼─────────────────┼─────────────────────────────┤
│ 漏桶 │ 流量完全平滑 │ 固定速率输出的场景 │
│ │ 不允许突发 │ 消息队列限流 │
├────────────────┼─────────────────┼─────────────────────────────┤
│ 滑动窗口 │ 精确度高 │ 需要精确控制的场景 │
│ │ 实现较复杂 │ │
├────────────────┼─────────────────┼─────────────────────────────┤
│ 固定窗口 │ 实现简单 │ 粗粒度控制 │
│ │ 边界有突发风险 │ │
└────────────────┴─────────────────┴─────────────────────────────┘
💡 本系统选择令牌桶算法:
1. 允许用户有一定的突发体验(短时间内多问几句)
2. 整体流量可控
3. 实现相对简单
3.4 熔断器模式实现
python
# gateway/circuit_breaker.py
import time
import asyncio
from enum import Enum
from dataclasses import dataclass
from typing import Callable, TypeVar, Any
from functools import wraps
class CircuitState(Enum):
"""熔断器状态"""
CLOSED = "closed" # 关闭状态,正常请求
OPEN = "open" # 打开状态,拒绝请求
HALF_OPEN = "half_open" # 半开状态,允许部分请求
@dataclass
class CircuitBreakerConfig:
"""熔断器配置"""
failure_threshold: int = 5 # 失败次数阈值,触发熔断
success_threshold: int = 3 # 成功次数阈值,恢复正常
timeout: float = 60.0 # 熔断恢复超时(秒)
half_open_max_calls: int = 3 # 半开状态最大并发调用数
class CircuitBreaker:
"""
熔断器实现
状态转换图:
失败次数>=阈值
┌───────────────────────┐
│ ▼
┌─────────┐ ┌─────────┐ 超时时间到
│ CLOSED │───▶│ OPEN │──────────────┐
│ (正常) │ │ (熔断) │ │
└────┬────┘ └─────────┘ │
▲ │ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ HALF_OPEN│◀────────────┘
│ │ (半开) │ 超时时间到
│ └───────────┘
│ │
│ │ 成功次数>=阈值
│ ▼
└─────────────────────▶ CLOSED
"""
def __init__(
self,
service_name: str,
failure_threshold: int = 5,
success_threshold: int = 3,
timeout: float = 60.0,
half_open_max_calls: int = 3
):
self.service_name = service_name
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout = timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time: float | None = None
self.half_open_calls = 0
self._lock = asyncio.Lock()
@property
def state(self) -> CircuitState:
return self._state
@state.setter
def state(self, value: CircuitState):
old_state = self._state
self._state = value
if old_state != value:
print(f"Circuit breaker for {self.service_name}: {old_state} -> {value}")
async def can_execute(self) -> bool:
"""检查是否可以执行请求"""
async with self._lock:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# 检查是否超时,可以进入半开状态
if time.time() - self.last_failure_time >= self.timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
return True
return False
if self.state == CircuitState.HALF_OPEN:
# 半开状态限制并发数
if self.half_open_calls < self.half_open_max_calls:
self.half_open_calls += 1
return True
return False
return False
async def record_success(self):
"""记录成功调用"""
async with self._lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
# 恢复正常
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
else:
# 关闭状态下重置失败计数
self.failure_count = 0
async def record_failure(self):
"""记录失败调用"""
async with self._lock:
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# 半开状态下失败,立即打开
self.state = CircuitState.OPEN
else:
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
async def execute(self, func: Callable, *args, **kwargs) -> Any:
"""执行函数,带熔断保护"""
if not await self.can_execute():
raise CircuitBreakerOpenError(
f"Circuit breaker is OPEN for {self.service_name}"
)
try:
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
await self.record_success()
return result
except Exception as e:
await self.record_failure()
raise
class CircuitBreakerOpenError(Exception):
"""熔断器打开异常"""
pass
第四章:Multi-Agent 系统设计与实现
4.1 为什么需要 Multi-Agent?
plaintext
单Agent的局限性:
┌─────────────────────────────────────────────────┐
│ Single Agent │
│ │
│ 输入 → [意图识别] → [知识检索] → [回复生成] → 输出 │
│ │
│ 问题: │
│ 1. 什么都做,什么都不精 │
│ 2. 上下文太长,推理成本高 │
│ 3. 难以处理复杂业务流程 │
│ 4. 单一失败点 │
└─────────────────────────────────────────────────┘
Multi-Agent的优势:
┌─────────────────────────────────────────────────┐
│ Multi-Agent Orchestra │
│ │
│ ┌──────────────────┐ │
│ │ Router Agent │ ← 意图识别 │
│ │ (总指挥) │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Query │ │ Order │ │Complaint│ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │(问答专家)│ │(订单专家)│ │(投诉专家)│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────┴───────────┘ │
│ │ │
│ ┌────────▼─────────┐ │
│ │ Response Agent │ ← 回复生成 │
│ │ (整合优化) │ │
│ └──────────────────┘ │
│ │
│ 优势: │
│ 1. 专业分工,能力聚焦 │
│ 2. 可独立优化每个Agent │
│ 3. 易于扩展新的业务场景 │
│ 4. 容错性高,单个Agent失败不影响整体 │
└─────────────────────────────────────────────────┘
4.2 使用 LangGraph 实现 Multi-Agent
python
# agents/orchestrator.py
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.tools import tool
from typing import TypedDict, Annotated, Sequence, Literal
from dataclasses import dataclass, field
import operator
from enum import Enum
# ==================== 定义状态 ====================
class Agent(str, Enum):
"""Agent类型枚举"""
ROUTER = "router"
QUERY = "query"
ORDER = "order"
COMPLAINT = "complaint"
RESPONSE = "response"
@dataclass
class ConversationContext:
"""对话上下文"""
user_id: str
session_id: str
user_profile: dict = field(default_factory=dict)
entities: dict = field(default_factory=dict) # 提取的实体
class AgentState(TypedDict):
"""Multi-Agent共享状态"""
# 对话历史
messages: Annotated[Sequence[HumanMessage | AIMessage], operator.add]
# 当前状态
current_agent: Agent | None
intent: str | None
entities: dict
# 上下文
context: ConversationContext
# 检索结果
retrieved_docs: list[dict] | None
# 最终响应
final_response: str | None
# 路由决策
routing_decision: dict | None
# ==================== 定义工具 ====================
@tool
def search_knowledge_base(query: str, category: str | None = None) -> list[dict]:
"""
搜索知识库获取相关信息
Args:
query: 搜索查询
category: 可选的分类筛选
Returns:
相关文档列表
"""
# 实现知识库检索
# 这里简化实现,实际应该调用RAG系统
return [
{
"content": "根据您的描述,这是我们产品的常见问题...",
"source": "faq",
"relevance": 0.95
}
]
@tool
def query_order_system(order_id: str, user_id: str) -> dict:
"""
查询订单系统获取订单信息
Args:
order_id: 订单ID
user_id: 用户ID
Returns:
订单详情
"""
# 实现订单查询
return {
"order_id": order_id,
"status": "shipped",
"tracking_number": "SF1234567890",
"estimated_delivery": "2024-01-20"
}
@tool
def create_support_ticket(title: str, description: str, user_id: str, priority: str) -> dict:
"""
创建支持工单
Args:
title: 工单标题
description: 工单描述
user_id: 用户ID
priority: 优先级 (low/medium/high)
Returns:
工单ID
"""
# 实现工单创建
return {
"ticket_id": "TKT-2024-001",
"status": "created",
"estimated_response": "2小时内"
}
@tool
def check_order_status(order_id: str) -> dict:
"""检查订单状态"""
return {
"order_id": order_id,
"status": "in_transit",
"last_update": "已到达目的城市"
}
tools = [search_knowledge_base, query_order_system, create_support_ticket, check_order_status]
# ==================== 定义Agent节点 ====================
def create_router_agent(llm):
"""创建路由Agent - 负责意图识别和任务分发"""
system_prompt = """你是一个智能客服路由系统。你的职责是:
1. 分析用户的输入,准确识别用户的意图
2. 根据意图将请求路由到最合适的处理Agent
支持的意图类型:
- query: 产品咨询、使用问题、FAQ等
- order: 订单查询、修改、取消等
- complaint: 投诉、问题反馈等
输出格式:
{
"intent": "意图类型",
"entities": {"提取的实体信息"},
"reasoning": "推理过程"
}
"""
def router_node(state: AgentState) -> AgentState:
messages = state["messages"]
last_message = messages[-1].content if messages else ""
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=f"用户输入: {last_message}")
])
import json
try:
decision = json.loads(response.content)
state["routing_decision"] = decision
state["intent"] = decision.get("intent")
state["entities"] = decision.get("entities", {})
except:
# 默认路由到查询Agent
state["routing_decision"] = {"intent": "query", "entities": {}}
state["intent"] = "query"
state["entities"] = {}
return state
return router_node
def create_domain_agent(agent_type: Agent, llm):
"""创建领域Agent - 负责特定领域的业务处理"""
prompts = {
Agent.QUERY: """你是一个产品咨询专家。根据用户的问题,
从知识库检索相关信息,给出专业、准确的回答。
如果知识库没有相关信息,说明暂时无法解答。""",
Agent.ORDER: """你是一个订单管理专家。根据用户提供的订单信息,
查询订单状态,或帮助用户处理订单相关问题。
如果需要查询订单,请使用工具获取订单信息。""",
Agent.COMPLAINT: """你是一个客服投诉处理专家。
认真倾听用户的问题和不满,
表示理解和歉意,
根据情况创建工单或提供解决方案。"""
}
def domain_node(state: AgentState) -> AgentState:
# 根据路由决策确定当前Agent
state["current_agent"] = agent_type
# 如果有检索结果,直接使用
if state.get("retrieved_docs"):
docs = state["retrieved_docs"]
docs_context = "\n\n".join([
f"[来源: {d.get('source', 'unknown')}]\n{d.get('content', '')}"
for d in docs
])
else:
docs_context = ""
# 调用领域处理
messages = state["messages"]
context = state.get("context", {})
entities = state.get("entities", {})
response = llm.invoke([
SystemMessage(content=prompts[agent_type]),
HumanMessage(content=f"用户输入: {messages[-1].content}\n\n提取的实体: {entities}\n\n参考文档:\n{docs_context}")
])
# 将领域Agent的响应添加到消息中
state["messages"] = state["messages"] + [AIMessage(content=response.content)]
return state
return domain_node
def create_response_agent(llm):
"""创建响应Agent - 负责整合和优化最终回复"""
system_prompt = """你是一个客服响应优化专家。你的职责是:
1. 整合前面各Agent的处理结果
2. 生成最终的用户友好的回复
3. 确保回复准确、有帮助、语气友好
注意:
- 不要透露内部处理细节
- 回复要简洁明了
- 如果无法解决问题,要诚实告知并提供替代方案
"""
def response_node(state: AgentState) -> AgentState:
messages = state["messages"]
# 获取所有AI消息
ai_messages = [m for m in messages if isinstance(m, AIMessage)]
if ai_messages:
# 使用响应优化Agent处理
context = "\n\n".join([m.content for m in ai_messages])
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=f"中间处理结果:\n{context}\n\n请生成最终的用户回复。")
])
state["final_response"] = response.content
state["messages"] = state["messages"] + [AIMessage(content=response.content)]
return state
return response_node
# ==================== 定义路由逻辑 ====================
def route_to_agent(state: AgentState) -> str:
"""根据意图路由到不同的Agent"""
intent = state.get("intent")
routes = {
"query": "query_agent",
"order": "order_agent",
"complaint": "complaint_agent",
}
return routes.get(intent, "query_agent")
def should_continue_to_response(state: AgentState) -> str:
"""判断是否继续到响应Agent"""
# 领域Agent处理完成后,都进入响应Agent
return "response_agent"
# ==================== 构建图 ====================
def build_agent_graph():
"""构建Multi-Agent协作图"""
# 初始化LLM
llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.7)
llm_with_tools = llm.bind_tools(tools)
# 创建图
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("router", create_router_agent(llm))
graph.add_node("query_agent", create_domain_agent(Agent.QUERY, llm))
graph.add_node("order_agent", create_domain_agent(Agent.ORDER, llm))
graph.add_node("complaint_agent", create_domain_agent(Agent.COMPLAINT, llm))
graph.add_node("response_agent", create_response_agent(llm))
# 添加边
graph.set_entry_point("router")
# 路由到领域Agent
graph.add_conditional_edges(
"router",
route_to_agent,
{
"query_agent": "query_agent",
"order_agent": "order_agent",
"complaint_agent": "complaint_agent",
}
)
# 领域Agent到响应Agent
graph.add_edge("query_agent", "response_agent")
graph.add_edge("order_agent", "response_agent")
graph.add_edge("complaint_agent", "response_agent")
# 结束
graph.add_edge("response_agent", END)
return graph.compile()
# ==================== 执行示例 ====================
async def main():
"""示例执行"""
graph = build_agent_graph()
# 初始化状态
initial_state: AgentState = {
"messages": [HumanMessage(content="我的订单什么时候能到?订单号是123456")],
"current_agent": None,
"intent": None,
"entities": {},
"context": ConversationContext(
user_id="user_001",
session_id="session_001"
),
"retrieved_docs": None,
"final_response": None,
"routing_decision": None,
}
# 执行图
result = await graph.ainvoke(initial_state)
print("=" * 50)
print("最终响应:")
print(result["final_response"])
print("=" * 50)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
4.3 Agent 间通信机制
plaintext
Multi-Agent Memory 架构设计:
┌─────────────────────────────────────────────────────────────────┐
│ Shared Memory System │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Short-term Memory │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Turn 1 │ │ Turn 2 │ │ Turn 3 │ │ Turn 4 │ │ │
│ │ │ 消息 │ │ 消息 │ │ 消息 │ │ 消息 │ │ │
│ │ │ 状态 │ │ 状态 │ │ 状态 │ │ 状态 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ │ │ │
│ │ TTL: 当前会话周期 │ │
│ │ 大小: 限制最近N轮对话 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Long-term Memory │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ User Profile: │ │ │
│ │ │ - 历史偏好 │ │ │
│ │ │ - 购买记录 │ │ │
│ │ │ - 常用地址 │ │ │
│ │ │ - 偏好设置 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Session Summary: │ │ │
│ │ │ - 会话摘要(用于跨会话上下文) │ │ │
│ │ │ - 未解决的问题 │ │ │
│ │ │ - 用户反馈 │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 存储: Redis / PostgreSQL / 图数据库 │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Agent间消息传递模式:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Router │────────▶│ Query │────────▶│ Response │
│ Agent │ │ Agent │ │ Agent │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ │ │
└────────────────────────┴────────────────────────┘
│
▼
┌─────────────────┐
│ Shared State │
│ (广播模式) │
└─────────────────┘
MCP协议(Model Context Protocol)集成:
┌─────────────────────────────────────────────────────────────────┐
│ MCP Protocol Stack │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ MCP Client (Agent) │ │
│ ├────────────────────────────────────────────────────────────┤ │
│ │ { │ │
│ │ "jsonrpc": "2.0", │ │
│ │ "method": "tools/call", │ │
│ │ "params": { │ │
│ │ "name": "query_order_system", │ │
│ │ "arguments": {"order_id": "123"} │ │
│ │ } │ │
│ │ } │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ MCP Transport Layer │ │
│ │ (HTTP/WebSocket/stdio) │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ MCP Server (Tools) │ │
│ │ Order System / Knowledge Base / CRM / etc │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
第五章:RAG 知识库系统设计与实现
5.1 RAG 架构全景
plaintext
企业级RAG系统架构:
┌─────────────────────────────────────────────────────────────────┐
│ RAG Pipeline │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Ingestion Pipeline │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 文档 │──▶│ 解析 │──▶│ 分块 │──▶│ 向量化 │ │ │
│ │ │ 上传 │ │ 提取 │ │ 切分 │ │ 嵌入 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └────┬────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────┐ │ │
│ │ │ 向量存储 │ │ │
│ │ │ Database │ │ │
│ │ └───────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Retrieval Pipeline │ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 用户 │──▶│ 重写 │──▶│ 混合 │──▶│ 重排 │ │ │
│ │ │ 查询 │ │ 查询 │ │ 检索 │ │ Rerank │ │ │
│ │ └─────────┘ └─────────┘ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌───────────┐ ┌───────────┐ │ │
│ │ │ 向量存储 │ │ BM25索引 │ │ │
│ │ └───────────┘ └───────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
5.2 完整 RAG 实现
python
# rag_system/main.py
import os
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Any
from enum import Enum
import asyncio
from abc import ABC, abstractmethod
# LangChain 相关
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
# 向量存储
from langchain_community.vectorstores import Chroma, FAISS
from langchain_community.vectorstores.milvus import Milvus
# 数据处理
import PyPDF2
from docx import Document as DocxDocument
import markdown
# ==================== 配置 ====================
@dataclass
class RAGConfig:
"""RAG系统配置"""
# 嵌入模型配置
embedding_model: str = "text-embedding-3-small"
embedding_dimension: int = 1536
# 分块配置
chunk_size: int = 500
chunk_overlap: int = 50
# 检索配置
top_k: int = 5
similarity_threshold: float = 0.7
# 混合检索权重
vector_weight: float = 0.7
bm25_weight: float = 0.3
# 重排配置
enable_rerank: bool = True
rerank_top_n: int = 3
# ==================== 文档解析器 ====================
class DocumentParser(ABC):
"""文档解析器抽象基类"""
@abstractmethod
def parse(self, file_path: str) -> List[Document]:
"""解析文档为Document列表"""
pass
class PDFParser(DocumentParser):
"""PDF文档解析器"""
def parse(self, file_path: str) -> List[Document]:
documents = []
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
for page_num, page in enumerate(reader.pages):
text = page.extract_text()
if text.strip():
documents.append(Document(
page_content=text,
metadata={
"source": file_path,
"page": page_num + 1,
"total_pages": len(reader.pages),
"doc_type": "pdf"
}
))
return documents
class DocxParser(DocumentParser):
"""Word文档解析器"""
def parse(self, file_path: str) -> List[Document]:
documents = []
doc = DocxDocument(file_path)
# 按段落提取
full_text = []
for para in doc.paragraphs:
if para.text.strip():
full_text.append(para.text)
text = "\n".join(full_text)
if text.strip():
documents.append(Document(
page_content=text,
metadata={
"source": file_path,
"doc_type": "docx"
}
))
return documents
class MarkdownParser(DocumentParser):
"""Markdown文档解析器"""
def parse(self, file_path: str) -> List[Document]:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 转换为HTML再提取文本
html = markdown.markdown(content)
# 简单处理:按标题分块
sections = content.split('\n## ')
documents = []
for i, section in enumerate(sections):
if section.strip():
documents.append(Document(
page_content=section.strip(),
metadata={
"source": file_path,
"section": i,
"doc_type": "markdown"
}
))
return documents
class TextParser(DocumentParser):
"""纯文本解析器"""
def parse(self, file_path: str) -> List[Document]:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return [Document(
page_content=content,
metadata={
"source": file_path,
"doc_type": "text"
}
)]
class DocumentParserFactory:
"""文档解析器工厂"""
_parsers = {
'.pdf': PDFParser,
'.docx': DocxParser,
'.doc': DocxParser,
'.md': MarkdownParser,
'.txt': TextParser,
}
@classmethod
def get_parser(cls, file_path: str) -> DocumentParser:
ext = os.path.splitext(file_path)[1].lower()
parser_class = cls._parsers.get(ext, TextParser)
return parser_class()
# ==================== 文本分块器 ====================
class SemanticChunker:
"""
语义分块器
策略:
1. 按句子边界切分
2. 保持语义完整性
3. 支持重叠以保持上下文
"""
def __init__(
self,
chunk_size: int = 500,
chunk_overlap: int = 50,
min_chunk_size: int = 100
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
# 使用LangChain的文本分割器
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", "。", "!", "?", " ", ""],
is_separator_regex=False,
)
def chunk_documents(
self,
documents: List[Document]
) -> List[Document]:
"""将文档列表分块"""
chunked_docs = []
for doc in documents:
chunks = self.text_splitter.split_documents([doc])
chunked_docs.extend(chunks)
return chunked_docs
def chunk_text(
self,
text: str,
metadata: dict = None
) -> List[Document]:
"""将文本分块"""
chunks = self.text_splitter.split_text(text)
return [
Document(
page_content=chunk,
metadata=metadata or {}
)
for chunk in chunks
]
# ==================== 向量化存储 ====================
class VectorStoreManager:
"""向量存储管理器"""
def __init__(
self,
embeddings: Embeddings,
persist_directory: str = "./vectorstore"
):
self.embeddings = embeddings
self.persist_directory = persist_directory
self.vectorstore: Optional[Chroma] = None
async def create_vectorstore(
self,
documents: List[Document],
collection_name: str = "knowledge_base"
) -> Chroma:
"""创建向量存储"""
# 确保目录存在
os.makedirs(self.persist_directory, exist_ok=True)
# 创建Chroma向量存储
self.vectorstore = await asyncio.to_thread(
Chroma.from_documents,
documents=documents,
embedding=self.embeddings,
persist_directory=self.persist_directory,
collection_name=collection_name
)
return self.vectorstore
def add_documents(
self,
documents: List[Document],
ids: List[str] = None
) -> List[str]:
"""添加文档到向量存储"""
if not self.vectorstore:
raise ValueError("Vectorstore not initialized")
return self.vectorstore.add_documents(
documents=documents,
ids=ids
)
def delete(self, ids: List[str] = None):
"""删除文档"""
if not self.vectorstore:
raise ValueError("Vectorstore not initialized")
if ids:
self.vectorstore.delete(ids=ids)
else:
self.vectorstore.delete()
def as_retriever(self, **kwargs):
"""转换为检索器"""
if not self.vectorstore:
raise ValueError("Vectorstore not initialized")
return self.vectorstore.as_retriever(**kwargs)
# ==================== 混合检索器 ====================
class HybridRetriever:
"""
混合检索器
结合向量检索和关键词检索,提高召回率
"""
def __init__(
self,
vectorstore: Chroma,
embeddings: Embeddings,
vector_weight: float = 0.7,
bm25_weight: float = 0.3
):
self.vectorstore = vectorstore
self.embeddings = embeddings
self.vector_weight = vector_weight
self.bm25_weight = bm25_weight
# BM25相关初始化
self._init_bm25()
def _init_bm25(self):
"""初始化BM25"""
try:
from rank_bm25 import BM25Okapi
self.bm25_available = True
except ImportError:
self.bm25_available = False
async def retrieve(
self,
query: str,
top_k: int = 5,
filter_criteria: dict = None
) -> List[Tuple[Document, float]]:
"""
混合检索
Returns:
(文档, 融合分数) 列表
"""
results = []
# 1. 向量检索
vector_results = await self._vector_search(
query,
top_k * 2, # 多检索一些,后面会融合
filter_criteria
)
# 2. BM25检索(如果可用)
if self.bm25_available:
bm25_results = await self._bm25_search(query, top_k * 2)
results = self._reciprocal_rank_fusion(
vector_results,
bm25_results
)
else:
results = vector_results
return results[:top_k]
async def _vector_search(
self,
query: str,
k: int,
filter_criteria: dict = None
) -> List[Tuple[Document, float]]:
"""向量相似度搜索"""
def _search():
return self.vectorstore.similarity_search_with_score(
query,
k=k,
filter=filter_criteria
)
return await asyncio.to_thread(_search)
async def _bm25_search(
self,
query: str,
k: int
) -> List[Tuple[Document, float]]:
"""BM25关键词搜索"""
if not self.bm25_available:
return []
# 获取所有文档
all_docs = self.vectorstore.get()
documents = all_docs.get("documents", [])
metadatas = all_docs.get("metadatas", [])
if not documents:
return []
# Tokenize
tokenized_corpus = [doc.split() for doc in documents]
bm25 = BM25Okapi(tokenized_corpus)
# 搜索
tokenized_query = query.split()
scores = bm25.get_scores(tokenized_query)
# 获取top-k
doc_scores = list(enumerate(scores))
doc_scores.sort(key=lambda x: x[1], reverse=True)
results = []
for idx, score in doc_scores[:k]:
if score > 0: # BM25分数大于0
doc = Document(
page_content=documents[idx],
metadata=metadatas[idx] if metadatas else {}
)
results.append((doc, float(score)))
return results
def _reciprocal_rank_fusion(
self,
results_a: List[Tuple[Document, float]],
results_b: List[Tuple[Document, float]],
k: float = 60
) -> List[Tuple[Document, float]]:
"""
倒数排序融合(RRF)
融合多个检索结果
"""
doc_scores = {}
# 处理结果A(向量检索)
for rank, (doc, score) in enumerate(results_a):
doc_id = self._get_doc_id(doc)
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + self.vector_weight * (1 / (k + rank + 1))
doc_scores[f"{doc_id}_data"] = doc
# 处理结果B(BM25)
max_b_score = max(score for _, score in results_b) if results_b else 1
for rank, (doc, score) in enumerate(results_b):
doc_id = self._get_doc_id(doc)
normalized_score = score / max_b_score if max_b_score > 0 else 0
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + self.bm25_weight * (1 / (k + rank + 1))
doc_scores[f"{doc_id}_data"] = doc
# 排序
fused_results = []
for doc_id, score in doc_scores.items():
if not doc_id.endswith("_data"):
fused_results.append((
doc_scores.get(f"{doc_id}_data"),
score
))
fused_results.sort(key=lambda x: x[1], reverse=True)
return fused_results
def _get_doc_id(self, doc: Document) -> str:
"""获取文档唯一ID"""
source = doc.metadata.get("source", "")
content = doc.page_content[:100]
return hashlib.md5(f"{source}:{content}".encode()).hexdigest()
# ==================== 重排序 ====================
class Reranker:
"""
语义重排序
使用更强大的模型对初步检索结果进行重排序
提高相关性
"""
def __init__(self, model_name: str = "cohere-rerank-multilingual-v2.0"):
self.model_name = model_name
self._client = None
def _get_client(self):
"""懒加载客户端"""
if self._client is None:
try:
from cohere import Client
api_key = os.environ.get("COHERE_API_KEY")
self._client = Client(api_key=api_key)
except ImportError:
raise ImportError("cohere package not installed")
return self._client
async def rerank(
self,
query: str,
documents: List[Document],
top_n: int = 3
) -> List[Document]:
"""重排序文档"""
if not documents:
return []
try:
client = self._get_client()
docs_text = [doc.page_content for doc in documents]
response = await asyncio.to_thread(
client.rerank,
model=self.model_name,
query=query,
documents=docs_text,
top_n=top_n
)
reranked_docs = []
for result in response.results:
reranked_docs.append(documents[result.index])
return reranked_docs
except Exception as e:
# 如果重排序失败,返回原始顺序
return documents[:top_n]
# ==================== 查询处理 ====================
class QueryProcessor:
"""
查询处理器
负责查询的改写、扩展等预处理
"""
def __init__(self, llm):
self.llm = llm
async def expand_query(self, query: str) -> List[str]:
"""
查询扩展
使用LLM生成多个查询变体,提高召回率
"""
prompt = f"""给定用户查询,生成3个不同的查询变体,
这些变体应该从不同角度表达相同的语义。
原始查询: {query}
要求:
1. 保持原意
2. 使用不同的表述方式
3. 可以添加同义词或相关概念
输出格式(JSON数组):
["查询1", "查询2", "查询3"]
"""
response = await asyncio.to_thread(
self.llm.invoke,
prompt
)
import json
try:
queries = json.loads(response.content)
return queries if isinstance(queries, list) else [query]
except:
return [query]
async def rewrite_query(self, query: str, conversation_history: list = None) -> str:
"""
查询改写
结合对话历史,改写当前查询
"""
if not conversation_history:
return query
history_text = "\n".join([
f"用户: {h.get('user', '')}\n助手: {h.get('assistant', '')}"
for h in conversation_history[-3:]
])
prompt = f"""给定对话历史和当前查询,改写当前查询使其更加完整清晰。
对话历史:
{history_text}
当前查询:{query}
要求:
1. 引用历史对话中的必要上下文
2. 保持简洁
3. 明确表达当前意图
输出:改写后的查询
"""
response = await asyncio.to_thread(
self.llm.invoke,
prompt
)
return response.content.strip()
# ==================== RAG系统整合 ====================
class RAGSystem:
"""完整的RAG系统"""
def __init__(self, config: RAGConfig):
self.config = config
# 初始化组件
self.embeddings = OpenAIEmbeddings(
model=config.embedding_model,
dimensions=config.embedding_dimension
)
self.chunker = SemanticChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.vector_manager = VectorStoreManager(
embeddings=self.embeddings,
persist_directory="./vectorstore"
)
async def ingest_document(self, file_path: str) -> dict:
"""
文档摄入流程
1. 解析文档
2. 分块
3. 向量化
4. 存储
"""
# 1. 解析
parser = DocumentParserFactory.get_parser(file_path)
documents = parser.parse(file_path)
if not documents:
return {"status": "error", "message": "Failed to parse document"}
# 2. 分块
chunked_docs = self.chunker.chunk_documents(documents)
# 生成文档ID
for i, doc in enumerate(chunked_docs):
doc_id = hashlib.md5(
f"{doc.page_content}:{i}".encode()
).hexdigest()
doc.metadata["chunk_id"] = doc_id
# 3. 向量存储
vectorstore = await self.vector_manager.create_vectorstore(
documents=chunked_docs,
collection_name="knowledge_base"
)
return {
"status": "success",
"document_count": len(documents),
"chunk_count": len(chunked_docs),
"file_path": file_path
}
async def retrieve(
self,
query: str,
filter_criteria: dict = None
) -> List[Document]:
"""
检索流程
1. 查询预处理
2. 混合检索
3. 重排序
"""
# 确保向量存储已初始化
if not self.vector_manager.vectorstore:
# 加载已存在的向量存储
self.vector_manager.vectorstore = Chroma(
persist_directory=self.vector_manager.persist_directory,
embedding_function=self.embeddings
)
# 创建混合检索器
hybrid_retriever = HybridRetriever(
vectorstore=self.vector_manager.vectorstore,
embeddings=self.embeddings,
vector_weight=self.config.vector_weight,
bm25_weight=self.config.bm25_weight
)
# 混合检索
results = await hybrid_retriever.retrieve(
query=query,
top_k=self.config.top_k,
filter_criteria=filter_criteria
)
# 重排序
if self.config.enable_rerank:
reranker = Reranker()
documents = [doc for doc, _ in results]
reranked = await reranker.rerank(
query=query,
documents=documents,
top_n=self.config.rerank_top_n
)
return reranked
return [doc for doc, _ in results[:self.config.top_k]]
async def chat(
self,
query: str,
conversation_history: list = None
) -> dict:
"""
RAG增强的对话
1. 检索相关文档
2. 构建提示
3. 生成回答
"""
# 1. 检索
docs = await self.retrieve(query)
# 2. 构建上下文
context = "\n\n".join([
f"[来源: {doc.metadata.get('source', 'unknown')}]\n{doc.page_content}"
for doc in docs
])
# 3. 生成回答
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
prompt = f"""基于以下参考信息,回答用户的问题。
参考信息:
{context}
用户问题:{query}
要求:
1. 基于参考信息回答
2. 如果信息不足,说明不知道
3. 注明信息来源
"""
response = await asyncio.to_thread(
llm.invoke,
prompt
)
return {
"answer": response.content,
"sources": [
{
"content": doc.page_content[:200] + "...",
"source": doc.metadata.get("source", "unknown")
}
for doc in docs
]
}
# ==================== 使用示例 ====================
async def main():
"""使用示例"""
# 初始化RAG系统
config = RAGConfig(
chunk_size=500,
chunk_overlap=50,
top_k=5,
vector_weight=0.7,
bm25_weight=0.3,
enable_rerank=True
)
rag = RAGSystem(config)
# 1. 摄入文档
print("摄入文档...")
result = await rag.ingest_document("./docs/faq.md")
print(result)
# 2. 检索
print("\n检索...")
docs = await rag.retrieve("如何申请退货?")
for doc in docs:
print(f"- {doc.page_content[:100]}...")
# 3. RAG对话
print("\nRAG对话...")
response = await rag.chat("我的订单什么时候能到?")
print(response["answer"])
print("\n来源:")
for source in response["sources"]:
print(f"- {source['source']}")
if __name__ == "__main__":
asyncio.run(main())
5.3 语义分块策略对比
plaintext
┌────────────────────────────────────────────────────────────────┐
│ 语义分块策略对比 │
├────────────────┬───────────────────────────────────────────────┤
│ 策略 │ 说明 │
├────────────────┼───────────────────────────────────────────────┤
│ 固定大小分块 │ 简单直接,但可能切断语义单元 │
│ │ chunk_size=500, overlap=50 │
├────────────────┼───────────────────────────────────────────────┤
│ 句子分块 │ 按句子边界切分,保持句子完整性 │
│ │ 适合FAQ、简短问答 │
├────────────────┼───────────────────────────────────────────────┤
│ 段落分块 │ 按段落切分,保持段落语义 │
│ │ 适合长文档、产品说明 │
├────────────────┼───────────────────────────────────────────────┤
│ 递归分块 │ 按层级切分,优先大单元再小单元 │
│ │ \n\n → \n → 句子 │
│ │ ✅ LangChain默认推荐 │
├────────────────┼───────────────────────────────────────────────┤
│ 语义分块 │ 使用Embedding识别语义边界 │
│ │ 质量最高,计算成本也最高 │
├────────────────┼───────────────────────────────────────────────┤
│ Agent分块 │ 使用Agent决定分块位置 │
│ │ 最智能,适合复杂文档结构 │
└────────────────┴───────────────────────────────────────────────┘
💡 本系统选择:递归分块 + 语义重排
1. 递归分块作为基础,平衡质量和性能
2. 结合文档结构元数据(标题层级)
3. 对关键段落使用较小的chunk_size
第六章:基础设施层
6.1 监控与可观测性架构
plaintext
可观测性三大支柱:
┌─────────────────────────────────────────────────────────────────┐
│ Observability Stack │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Metrics │ │ Logs │ │ Traces │ │
│ │ 指标 │ │ 日志 │ │ 链路追踪 │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Prometheus │ │ ELK │ │ Jaeger │ │
│ │ Grafana │ │ Loki │ │ Zipkin │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 统一视图 │ │
│ │ Dashboard │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
AI系统特有指标:
┌─────────────────────────────────────────────────────────────────┐
│ AI System Metrics │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 延迟指标 │ │ 质量指标 │ │ 业务指标 │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ TTFT: 首token │ │ 意图识别准确率 │ │ 客服解决率 │ │
│ │ 响应时间 │ │ 答案相关性分数 │ │ 用户满意度 │ │
│ │ Token生成速度 │ │ 检索召回率 │ │ 转人工率 │ │
│ │ P99/P95延迟 │ │ 拒答率 │ │ 会话完成率 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 成本指标 │ │ 健康指标 │ │ 安全指标 │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ Token消耗量 │ │ 模型可用性 │ │ Prompt注入次数 │ │
│ │ 每会话成本 │ │ 错误率 │ │ 敏感信息暴露 │ │
│ │ GPU利用率 │ │ 熔断触发次数 │ │ 审计覆盖率 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
6.2 监控指标采集实现
python
# infrastructure/metrics.py
from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
from prometheus_client.exposition import generate_latest
import time
from functools import wraps
from typing import Callable, Any
import asyncio
# 创建注册表
registry = CollectorRegistry()
# ==================== 定义指标 ====================
# 请求计数器
http_requests_total = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
registry=registry
)
# 请求延迟直方图
http_request_duration_seconds = Histogram(
"http_request_duration_seconds",
"HTTP request latency in seconds",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
registry=registry
)
# AI相关指标
ai_request_duration_seconds = Histogram(
"ai_request_duration_seconds",
"AI request latency in seconds",
["agent_type", "operation"],
buckets=[0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
registry=registry
)
ai_tokens_consumed = Counter(
"ai_tokens_consumed_total",
"Total tokens consumed",
["model", "token_type"], # token_type: prompt/completion
registry=registry
)
ai_requests_total = Counter(
"ai_requests_total",
"Total AI requests",
["agent_type", "status"], # status: success/error
registry=registry
)
# RAG指标
rag_retrieval_latency_seconds = Histogram(
"rag_retrieval_latency_seconds",
"RAG retrieval latency in seconds",
["retrieval_type"], # retrieval_type: vector/bm25/hybrid
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0],
registry=registry
)
rag_retrieval_results = Histogram(
"rag_retrieval_results_count",
"Number of retrieval results",
["retrieval_type"],
buckets=[1, 3, 5, 10, 20],
registry=registry
)
# 业务指标
conversation_total = Counter(
"conversation_total",
"Total conversations",
["status"], # status: completed/transferred/timeout
registry=registry
)
user_satisfaction_score = Histogram(
"user_satisfaction_score",
"User satisfaction score",
buckets=[1, 2, 3, 4, 5],
registry=registry
)
# 系统资源指标
active_connections = Gauge(
"active_connections",
"Number of active connections",
["connection_type"], # connection_type: websocket/sse
registry=registry
)
queue_size = Gauge(
"queue_size",
"Message queue size",
["queue_name"],
registry=registry
)
# ==================== 装饰器 ====================
def track_request_metrics(method: str, endpoint: str):
"""请求指标装饰器"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
status = "200"
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
status = "500"
raise
finally:
duration = time.time() - start_time
http_requests_total.labels(
method=method,
endpoint=endpoint,
status=status
).inc()
http_request_duration_seconds.labels(
method=method,
endpoint=endpoint
).observe(duration)
return wrapper
return decorator
def track_ai_metrics(agent_type: str, operation: str):
"""AI指标装饰器"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Any:
start_time = time.time()
status = "success"
try:
result = await func(*args, **kwargs)
# 如果返回结果包含token使用量
if isinstance(result, dict) and "usage" in result:
usage = result["usage"]
model = result.get("model", "unknown")
if "prompt_tokens" in usage:
ai_tokens_consumed.labels(
model=model,
token_type="prompt"
).inc(usage["prompt_tokens"])
if "completion_tokens" in usage:
ai_tokens_consumed.labels(
model=model,
token_type="completion"
).inc(usage["completion_tokens"])
return result
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start_time
ai_request_duration_seconds.labels(
agent_type=agent_type,
operation=operation
).observe(duration)
ai_requests_total.labels(
agent_type=agent_type,
status=status
).inc()
return wrapper
return decorator
# ==================== 指标端点 ====================
async def metrics_endpoint():
"""Prometheus指标端点"""
return generate_latest(registry)
6.3 缓存策略设计
plaintext
多级缓存架构:
┌─────────────────────────────────────────────────────────────────┐
│ Multi-Level Cache │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Level 1: 进程内缓存 (In-Memory) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││
│ │ │ LRU Cache│ │ TTLCache │ │ LFUCache│ │ TinyLFU │ ││
│ │ │ 最近最少│ │ 时间过期│ │ 频率最│ │ 频率限制│ ││
│ │ │ 使用 │ │ 策略 │ │ 低使用│ │ + LRU │ ││
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ ││
│ │ ││
│ │ 容量: 1000-10000 items ││
│ │ 延迟: < 1ms ││
│ │ 淘汰: LRU / LFU / TTL ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ Level 2: 分布式缓存 (Redis) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ 热知识: 热门FAQ答案 ││
│ │ 用户画像: 近期对话摘要 ││
│ │ 会话状态: 多轮对话上下文 ││
│ │ ││
│ │ 容量: 百万级 items ││
│ │ 延迟: 1-5ms ││
│ │ 持久化: 可选 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ Level 3: 持久化存储 (PostgreSQL/MongoDB) │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ ││
│ │ 历史对话: 完整会话记录 ││
│ │ 知识库: 文档向量 ││
│ │ 用户数据: 长期偏好 ││
│ │ ││
│ │ 容量: 十亿级 items ││
│ │ 延迟: 10-100ms ││
│ │ 事务: 支持 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────┘
缓存键设计模式:
┌─────────────────────────────────────────────────────────────────┐
│ Cache Key Design │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 命名规范: {namespace}:{entity}:{identifier}:{version} │
│ │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ 命名空间 │ 实体类型 │ 示例 │ │
│ ├────────────────────────────────────────────────────────────┤ │
│ │ chat │ session │ chat:session:xxx:v1 │ │
│ │ chat │ context │ chat:context:xxx:v1 │ │
│ │ rag │ embedding │ rag:emb:xxx:v1 │ │
│ │ user │ profile │ user:profile:xxx:v1 │ │
│ │ knowledge │ doc │ knowledge:doc:xxx:v1 │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ TTL策略: │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ 数据类型 │ TTL │ 说明 │ │
│ ├────────────────────────────────────────────────────────────┤ │
│ │ 热门FAQ答案 │ 1小时-24小时 │ 长期稳定 │ │
│ │ 用户会话上下文 │ 30分钟 │ 对话窗口内 │ │
│ │ 检索结果 │ 5-15分钟 │ 短期缓存 │ │
│ │ 嵌入向量 │ 24小时-7天 │ 文档更新频率低 │ │
│ │ 临时计算结果 │ 1-5分钟 │ 避免重复计算 │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
6.4 消息队列架构
plaintext
事件驱动架构:
┌─────────────────────────────────────────────────────────────────┐
│ Event-Driven Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ API │──────▶│ Kafka │──────▶│ Consumer│ │
│ │ Gateway │ │ Cluster │ │ Pool │ │
│ └─────────┘ └─────────┘ └────┬────┘ │
│ │ │
│ ┌─────────────────────┼─────────────────────┐│
│ ▼ ▼ ▼│
│ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ │ Analytics│ │ Notification│ │ Archive │
│ │ Pipeline │ │ Service │ │ Service │
│ └───────────┘ └───────────┘ └───────────┘
│ │
│ Topic设计: │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Topic │ Partitions │ Retention │ │
│ ├────────────────────────────────────────────────────────────┤ │
│ │ chat.events │ 6 │ 7 days │ │
│ │ chat.sessions │ 3 │ 30 days │ │
│ │ ai.requests │ 12 │ 3 days │ │
│ │ rag.indexing │ 3 │ 1 day │ │
│ │ user.feedback │ 3 │ 90 days │ │
│ │ system.alerts │ 1 │ 30 days │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
第七章:安全与合规
7.1 安全架构
plaintext
多层安全防护体系:
┌─────────────────────────────────────────────────────────────────┐
│ Security Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 边界安全 (Perimeter) ││
│ │ ││
│ │ WAF ──── DDoS防护 ──── API Gateway认证 ──── VPN/专线 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 应用安全 (Application) ││
│ │ ││
│ │ 输入验证 ──── 身份认证 ──── 授权控制 ──── 会话管理 ││
│ │ 敏感词过滤 ──── Prompt注入检测 ──── 输出脱敏 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 数据安全 (Data) ││
│ │ ││
│ │ 传输加密(TLS) ──── 存储加密(AES) ──── 数据脱敏 ──── 备份 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 审计合规 (Audit) ││
│ │ ││
│ │ 操作日志 ──── 审计追踪 ──── 合规报告 ──── 威胁检测 ││
│ └─────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────┘
7.2 Prompt注入防护实现
python
# security/prompt_injection.py
import re
from typing import List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
class ThreatLevel(Enum):
"""威胁级别"""
SAFE = "safe"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class InjectionCheckResult:
"""注入检查结果"""
is_safe: bool
threat_level: ThreatLevel
detected_patterns: List[str]
sanitized_input: str
reason: str
class PromptInjectionDetector:
"""
Prompt注入检测器
检测常见注入模式:
1. 指令覆盖 (system prompt override)
2. 角色扮演攻击
3. 上下文逃逸
4. 编码绕过
"""
# 注入模式库
INJECTION_PATTERNS = {
# 指令覆盖
r"(?i)(ignore\s+(previous|all|above|prior)\s+(instructions?|rules?|guidance))",
r"(?i)(forget\s+(everything|all|what)\s+(you|me)\s+(said|told|know))",
r"(?i)(new\s+(system|instruction|rule|priority)\s*[:=])",
r"(?i)(override\s+(system|your|my)\s+(instruction|rule|setting))",
# 角色扮演/越狱
r"(?i)(pretend\s+you\s+(are|were)\s+not\s+(an?\s+)?)",
r"(?i)(you\s+are\s+now\s+(in?\s+)?(developer|DAN|evil|jailbreak))",
r"(?i)(roleplay\s+(as|that)\s+(you\s+)?(can|cannot|must|should)",
r"(?i)(disregard\s+(your|all)\s+(safety|ethical|constraint))",
# 上下文逃逸
r'(?i)(extract.*?(password|key|token|secret|credential))',
r"(?i)(reveal.*?(system\s+prompt|instruction|configuration))",
r"(?i)(tell\s+(me|us)\s+(how|what).*?(bypass|ignore|disable))",
# 编码绕过
r"(?i)(base64|hex|encode|decode|encrypt|decrypt)\s*[:=]",
r"(?i)(pyth.*?(exec|eval|compile))",
r"(?:\\x[0-9a-f]{2}|\\u[0-9a-f]{4})", # 十六进制/Unicode转义
}
# 敏感关键词
SENSITIVE_KEYWORDS = [
"password", "secret", "api_key", "apikey", "token",
"credential", "private_key", "access_key", "auth",
"confidential", "proprietary", "内部资料", "机密",
"密码", "密钥", "token", "凭证"
]
def __init__(self, strict_mode: bool = False):
self.strict_mode = strict_mode
self.patterns = [
re.compile(pattern) for pattern in self.INJECTION_PATTERNS
]
def check(self, input_text: str) -> InjectionCheckResult:
"""检查输入是否包含注入攻击"""
detected_patterns = []
threat_level = ThreatLevel.SAFE
# 1. 模式匹配检测
for i, pattern in enumerate(self.patterns):
matches = pattern.findall(input_text)
if matches:
pattern_name = list(self.INJECTION_PATTERNS.keys())[i]
detected_patterns.append(pattern_name)
# 2. 敏感词检测
for keyword in self.SENSITIVE_KEYWORDS:
if keyword.lower() in input_text.lower():
detected_patterns.append(f"sensitive_keyword:{keyword}")
# 3. 特殊字符检测(编码绕过)
special_chars = self._detect_encoding_attempts(input_text)
if special_chars:
detected_patterns.extend(special_chars)
# 4. 确定威胁级别
if detected_patterns:
if any("ignore" in p or "override" in p or "forget" in p
for p in detected_patterns):
threat_level = ThreatLevel.CRITICAL
elif any("password" in p or "secret" in p or "credential" in p
for p in detected_patterns):
threat_level = ThreatLevel.HIGH
elif any("pretend" in p or "roleplay" in p or "jailbreak" in p
for p in detected_patterns):
threat_level = ThreatLevel.HIGH
elif any("encode" in p or "exec" in p for p in detected_patterns):
threat_level = ThreatLevel.MEDIUM
else:
threat_level = ThreatLevel.LOW
# 5. 决定是否安全
is_safe = threat_level in (ThreatLevel.SAFE, ThreatLevel.LOW)
# 严格模式下,中等问题也拒绝
if self.strict_mode and threat_level == ThreatLevel.LOW:
is_safe = False
# 6. 消毒处理
sanitized = self._sanitize(input_text, detected_patterns)
return InjectionCheckResult(
is_safe=is_safe,
threat_level=threat_level,
detected_patterns=detected_patterns,
sanitized_input=sanitized,
reason=self._generate_reason(threat_level, detected_patterns)
)
def _detect_encoding_attempts(self, text: str) -> List[str]:
"""检测编码绕过尝试"""
attempts = []
# 零宽字符
if '\u200b' in text or '\u200c' in text or '\u200d' in text:
attempts.append("zero_width_characters")
# 重复的空字符
if re.search(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', text):
attempts.append("control_characters")
# 大量Unicode变体
unicode_count = len(re.findall(r'[\u0080-\uffff]', text))
if unicode_count > len(text) * 0.3:
attempts.append("high_unicode_ratio")
return attempts
def _sanitize(self, text: str, patterns: List[str]) -> str:
"""消毒处理"""
sanitized = text
# 移除零宽字符
sanitized = sanitized.replace('\u200b', '')
sanitized = sanitized.replace('\u200c', '')
sanitized = sanitized.replace('\u200d', '')
# 移除控制字符
sanitized = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', sanitized)
return sanitized.strip()
def _generate_reason(self, level: ThreatLevel, patterns: List[str]) -> str:
"""生成原因描述"""
if level == ThreatLevel.SAFE:
return "未检测到注入攻击"
if not patterns:
return f"威胁级别: {level.value}"
return f"检测到威胁: {', '.join(patterns[:3])}"
# ==================== 输入验证 ====================
class InputValidator:
"""输入验证器"""
MAX_LENGTH = 10000
MAX_TOKENS = 2500 # 估算,中文约2字符/token
@classmethod
def validate(cls, text: str) -> Tuple[bool, Optional[str]]:
"""
验证输入
Returns:
(is_valid, error_message)
"""
if not text or not text.strip():
return False, "输入不能为空"
if len(text) > cls.MAX_LENGTH:
return False, f"输入长度不能超过{cls.MAX_LENGTH}字符"
# 检查不可见字符
if re.search(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', text):
return False, "输入包含无效字符"
return True, None
# ==================== 输出脱敏 ====================
class OutputSanitizer:
"""输出脱敏器"""
# 敏感信息模式
SENSITIVE_PATTERNS = [
(r'\b\d{11,}\b', '[手机号]'), # 手机号
(r'\b\d{15,18}\b', '[身份证号]'), # 身份证号
(r'\b\d{16,19}\b', '[银行卡号]'), # 银行卡号
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '[邮箱]'), # 邮箱
(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', '[IP地址]'), # IP地址
]
@classmethod
def sanitize(cls, text: str) -> str:
"""脱敏处理"""
sanitized = text
for pattern, replacement in cls.SENSITIVE_PATTERNS:
sanitized = re.sub(pattern, replacement, sanitized)
return sanitized
7.3 数据脱敏实现
python
# security/data_masking.py
from typing import Any, Callable, Optional
from enum import Enum
import hashlib
import json
class MaskLevel(Enum):
"""脱敏级别"""
NONE = "none" # 不脱敏
PARTIAL = "partial" # 部分脱敏
FULL = "full" # 完全脱敏
HASH = "hash" # 哈希
class DataMasker:
"""
数据脱敏工具
支持多种脱敏策略:
1. 手机号: 138****5678
2. 邮箱: a***@example.com
3. 姓名: 张*三
4. 地址: 北京市 ***
5. 身份证: 110** ***** ****1234
"""
# 脱敏规则
MASK_RULES = {
"phone": {
"pattern": r'^(\d{3})(\d{4})(\d{4})$',
"replacement": r'\1** **\3',
"level": MaskLevel.PARTIAL
},
"email": {
"pattern": r'^([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})$',
"replacement": lambda m: f"{m.group(1)[0]}** *@{m.group(2)}",
"level": MaskLevel.PARTIAL
},
"name": {
"pattern": r'^([\u4e00-\u9fa5])([\u4e00-\u9fa5]+)([\u4e00-\u9fa5])$',
"replacement": r'\1*\3',
"level": MaskLevel.PARTIAL
},
"id_card": {
"pattern": r'^(\d{3})\d{11}(\d{4})$',
"replacement": r'\1 ***** ***** *\2',
"level": MaskLevel.FULL
},
"bank_card": {
"pattern": r'^(\d{4})\d+(\d{4})$',
"replacement": r'\1 ****\2',
"level": MaskLevel.FULL
},
"address": {
"pattern": r'^(.+?)(市|区|县)(.+)$',
"replacement": r'\1\2***',
"level": MaskLevel.PARTIAL
}
}
@classmethod
def mask(cls, value: str, data_type: str, level: MaskLevel = None) -> str:
"""根据类型脱敏"""
if data_type not in cls.MASK_RULES:
return cls._partial_mask(value)
rule = cls.MASK_RULES[data_type]
# 检查脱敏级别
if level == MaskLevel.NONE:
return value
rule_level = level or rule["level"]
if rule_level == MaskLevel.FULL:
return cls._full_mask(value)
if rule_level == MaskLevel.HASH:
return cls._hash_mask(value)
# PARTIAL - 使用规则脱敏
replacement = rule["replacement"]
if callable(replacement):
import re
return re.sub(rule["pattern"], replacement, value)
else:
import re
return re.sub(rule["pattern"], replacement, value)
@classmethod
def _partial_mask(cls, value: str, visible_start: int = 3, visible_end: int = 4) -> str:
"""部分脱敏: 显示前后几位"""
if len(value) <= visible_start + visible_end:
return '*' * len(value)
return f"{value[:visible_start]}{'*' * (len(value) - visible_start - visible_end)}{value[-visible_end:]}"
@classmethod
def _full_mask(cls, value: str, mask_char: str = '*') -> str:
"""完全脱敏"""
return mask_char * len(value)
@classmethod
def _hash_mask(cls, value: str) -> str:
"""哈希脱敏"""
return hashlib.sha256(value.encode()).hexdigest()[:16]
# ==================== 对话记录脱敏 ====================
class ConversationMasker:
"""对话记录脱敏"""
@classmethod
def mask_conversation(cls, messages: list) -> list:
"""脱敏对话记录"""
masked_messages = []
for msg in messages:
masked_msg = {
"role": msg.get("role"),
"content": cls._mask_content(msg.get("content", "")),
"timestamp": msg.get("timestamp"),
"masked": True
}
# 如果有metadata,也脱敏
if "metadata" in msg:
masked_msg["metadata"] = cls.mask_dict(msg["metadata"])
masked_messages.append(masked_msg)
return masked_messages
@classmethod
def _mask_content(cls, content: str) -> str:
"""脱敏对话内容"""
masked = content
# 脱敏手机号
masked = DataMasker.mask(masked, "phone")
# 脱敏邮箱
masked = DataMasker.mask(masked, "email")
# 脱敏姓名(中文名)
masked = DataMasker.mask(masked, "name")
return masked
@classmethod
def mask_dict(cls, data: dict, sensitive_keys: list = None) -> dict:
"""脱敏字典中的敏感字段"""
if sensitive_keys is None:
sensitive_keys = [
"phone", "email", "name", "address",
"id_card", "bank_card", "password", "secret"
]
masked = {}
for key, value in data.items():
if any(sensitive in key.lower() for sensitive in sensitive_keys):
if isinstance(value, str):
masked[key] = DataMasker.mask(value, key, MaskLevel.FULL)
else:
masked[key] = " ***"
elif isinstance(value, dict):
masked[key] = cls.mask_dict(value, sensitive_keys)
elif isinstance(value, list):
masked[key] = [
cls.mask_dict(item, sensitive_keys) if isinstance(item, dict) else item
for item in value
]
else:
masked[key] = value
return masked
第八章:架构思维的本质
8.1 从业务到技术的映射
plaintext
架构设计思维导图:
┌─────────────────┐
│ 业务需求 │
│ 24/7服务 │
│ 精准回复 │
│ 成本控制 │
└────────┬────────┘
│
▼
┌─────────────────────────────────────┐
│ 需求分析 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌───────┐ │
│ │ 功能需求 │ │ 非功能 │ │ 约束 │ │
│ │ │ │ 需求 │ │ │ │
│ └────┬────┘ └────┬────┘ └───┬───┘ │
└───────┼───────────┼───────────┼─────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────┐
│ 技术选型 │
│ │
│ 1. 功能 → 组件映射 │
│ 意图识别 → Router Agent │
│ 知识检索 → RAG System │
│ │
│ 2. 非功能 → 架构模式 │
│ 高可用 → 冗余 + 熔断 │
│ 低延迟 → 缓存 + 流式 │
│ │
│ 3. 约束 → 边界条件 │
│ 预算 → 开源方案优先 │
│ 合规 → 数据脱敏 + 审计 │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 架构设计 │
│ │
│ 分层架构 │
│ ┌───────┬───────┬───────┬───────┐ │
│ │ Front │Gateway│ Agent │Knowledge│ │
│ │ │ │ │ │ │
│ │ UI │ 路由 │ 业务 │ RAG │ │
│ │ 组件 │ 限流 │ 编排 │ 检索 │ │
│ └───────┴───────┴───────┴───────┘ │
│ │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 迭代演进 │
│ │
│ MVP → 扩展功能 → 性能优化 → 规模化 │
│ │
└─────────────────────────────────────┘
8.2 权衡取舍的艺术
plaintext
架构权衡矩阵:
┌────────────────────────────────────────────────────────────────────────────┐
│ 常见架构权衡 │
├────────────────────┬─────────────────┬─────────────────┬───────────────────┤
│ 维度 │ 方案A │ 方案B │ 选择建议 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 复杂度 vs 可维护 │ 单体应用 │ 微服务架构 │ 团队小选单体 │
│ │ 简单但难扩展 │ 复杂但易扩展 │ 团队大选微服务 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 性能 vs 成本 │ 自建向量数据库 │ 使用云服务 │ 初期用云服务 │
│ │ 灵活但贵 │ 便宜但有限制 │ 有能力后自建 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 一致性 vs 可用性 │ 强一致 │ 最终一致 │ 核心数据强一致 │
│ │ 响应慢 │ 响应快 │ 非核心最终一致 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 实时性 vs 可靠性 │ 同步处理 │ 异步处理 │ 需要确认用同步 │
│ │ 响应快但脆弱 │ 慢但可靠 │ 允许延迟用异步 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 标准化 vs 灵活性 │ 标准协议 │ 自定义格式 │ 外部接口用标准 │
│ │ 互操作性好 │ 定制能力强 │ 内部可用自定义 │
├────────────────────┼─────────────────┼─────────────────┼───────────────────┤
│ 安全性 vs 易用性 │ 严格验证 │ 简化流程 │ 核心流程安全优先 │
│ │ 安全但麻烦 │ 方便但风险 │ 非核心可简化 │
└────────────────────┴─────────────────┴─────────────────┴───────────────────┘
💡 核心原则:
1. 没有最好的架构,只有最适合的架构
2. 权衡时明确知道自己在trade-off什么
3. 记录决策过程和理由,便于以后回顾
4. 架构要服务于业务,而不是相反
8.3 持续演进的架构
plaintext
架构演进路线图:
┌─────────────────────────────────────────────────────────────────┐
│ Architecture Evolution │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Phase 1: MVP (1-2个月) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ 用户 ──▶ Web ──▶ 单体服务 ──▶ 知识库 │ │
│ │ (所有逻辑) │ │
│ │ │ │
│ │ 目标:验证核心价值,最快速度上线 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Phase 2: 能力扩展 (3-4个月) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────┐ │ │
│ │ │Router│ │ │
│ │ └──┬──┘ │ │
│ │ ┌──┴──┐ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ Query Order Complaint │ │
│ │ Agent Agent Agent │ │
│ │ │ │
│ │ 目标:支持多场景,Multi-Agent协作 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Phase 3: 性能优化 (2-3个月) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌────┐ ┌────┐ ┌────┐ │ │
│ │ │Cache│──│Queue│──│Worker│ │ │
│ │ └────┘ └────┘ └────┘ │ │
│ │ │ │
│ │ 目标:降低延迟,提升吞吐量 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Phase 4: 规模化 (持续) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────┐ │ │
│ │ │ Kubernetes Cluster │ │ │
│ │ │ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │ │ │
│ │ │ │Pod │ │Pod │ │Pod │ │Pod │ │Pod │ ... │ │ │
│ │ │ └────┘ └────┘ └────┘ └────┘ └────┘ │ │ │
│ │ └────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ 目标:支持高可用、弹性伸缩 │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
技术债务管理:
┌─────────────────────────────────────────────────────────────────┐
│ Technical Debt │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ 识别债务 │ │ 评估影响 │ │
│ │ • 代码重复 │ │ • 维护成本 │ │
│ │ • 硬编码配置 │ │ • 开发效率 │ │
│ │ • 缺少测试 │ │ • 系统稳定性 │ │
│ │ • 过时依赖 │ │ • 安全性 │ │
│ └──────────┬──────────┘ └──────────┬──────────┘ │
│ │ │ │
│ └──────────┬───────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ 优先级排序 │ │
│ │ │ │
│ │ 🔴 高影响+易修复 │ ← 立即处理 │
│ │ 🟡 高影响+难修复 │ ← 计划处理 │
│ │ 🟢 低影响+易修复 │ ← 有空就做 │
│ │ ⚪ 低影响+难修复 │ ← 暂时忽略 │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
结语
核心要点回顾
-
架构是关于决策的 :每个架构决策都应该有明确的理由,服务于业务目标。
-
分层是组织复杂性的手段 :不是为了分层而分层,而是为了让系统更易理解、维护和演进。
-
技术选型要权衡 :
- 性能 vs 成本 vs 复杂度
- 标准化 vs 灵活性
- 当前需求 vs 未来扩展
-
Multi-Agent是复杂AI系统的必然选择 :
- 专业分工
- 独立演进
- 容错隔离
-
RAG是企业知识应用的标配 :
- 解决LLM知识过时问题
- 保证回答的准确性
- 支持知识持续更新
-
安全要内嵌,不是外挂 :
- 从设计阶段就考虑安全
- 多层防护
- 持续监控
-
架构需要演进 :
- 从MVP开始
- 根据业务反馈迭代
- 管理技术债务
未来展望
plaintext
新兴技术趋势:
┌─────────────────────────────────────────────────────────────────┐
│ Future Technologies │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ MCP协议 │ │ Multi-Agent │ │ 边缘AI │ │
│ │ Model Context │ │ Memory │ │ Edge AI │ │
│ │ Protocol │ │ │ │ │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ Agent标准化 │ │ 长期记忆共享 │ │ 本地推理 │ │
│ │ 工具互操作 │ │ 经验积累 │ │ 隐私保护 │ │
│ │ 安全可信 │ │ 个性化服务 │ │ 低延迟 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ 知识图谱+RAG │ │ 主动式AI │ │ 多模态融合 │ │
│ │ Knowledge Graph│ │ Proactive AI │ │ Multimodal │ │
│ │ + RAG │ │ │ │ │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ 结构化知识 │ │ 预判用户需求 │ │ 图文音视频 │ │
│ │ 关系推理 │ │ 主动服务 │ │ 统一理解 │ │
│ │ 可解释性强 │ │ 预防性客服 │ │ 更自然交互 │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
参考资料
官方文档
-
Anthropic - Building Effective AI Agents
- https://docs.anthropic.com/en/docs/build-anthropic-ai-agent
- 理解AI Agent的设计原则和最佳实践
-
LangGraph Documentation
- https://langchain-ai.github.io/langgraph/
- Multi-Agent编排的核心框架
-
Retrieval-Augmented Generation (RAG)
架构设计模式
-
API Gateway Pattern
-
Circuit Breaker Pattern
-
Rate Limiting Algorithms
安全最佳实践
-
OWASP Top 10 for LLM Applications
-
Prompt Injection Attack Taxonomy
工程实践
-
LangChain Best Practices
-
Building Reliable Distributed Systems
- 《Designing Data-Intensive Applications》- Martin Kleppmann
"The architecture of a system is the decisions you make that allow it to evolve over time."
— Lean Architecture, James Coplien
本文档版本:v1.0
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)