多租户RAG架构:如何服务1000+企业客户
·
🏢 企业级RAG系统完整指南 | 数据隔离 + RBAC权限 + 资源调度 | 支持1000+租户的架构设计
📖 什么是多租户RAG?
多租户(Multi-tenancy) = 一套系统服务多个客户
在RAG系统中,意味着:
- 1000家企业共用同一个RAG平台
- 每家企业的数据完全隔离
- 每个企业的用户只能访问自己的知识库
- 资源按需分配,互不影响
为什么需要多租户?
单租户的问题:
企业A → 部署一套RAG系统 → 成本高、维护难
企业B → 部署一套RAG系统 → 成本高、维护难
企业C → 部署一套RAG系统 → 成本高、维护难
...
总成本:N套系统 × 高成本
多租户的优势:
企业A ┐
企业B ├→ 共用一套RAG平台 → 成本低、易维护
企业C ┘
...
总成本:1套系统 ÷ N个客户 = 低成本
🏗️ 多租户RAG核心架构
┌─────────────────────────────────────────┐
│ API Gateway / Load Balancer │
└──────────────┬──────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Authentication & Authorization │
│ (JWT + RBAC 权限控制) │
└──────────────┬──────────────────────────┘
↓
┌─────────────────────────────────────────┐
│ Tenant Context Manager │
│ (识别租户ID,设置上下文) │
└──────────────┬──────────────────────────┘
↓
┌──────┴──────┐
↓ ↓
┌──────────────┐ ┌──────────────┐
│ Tenant A │ │ Tenant B │
│ Vector DB │ │ Vector DB │
│ (隔离) │ │ (隔离) │
└──────────────┘ └──────────────┘
↓ ↓
┌──────────────┐ ┌──────────────┐
│ LLM Pool │ │ LLM Pool │
│ (共享) │ │ (共享) │
└──────────────┘ └──────────────┘
三大核心挑战
- 数据隔离 - 确保租户间数据完全不泄露
- 权限管理 - 细粒度的访问控制
- 资源调度 - 公平分配计算资源
🔐 数据隔离方案
方案对比
| 方案 | 隔离级别 | 复杂度 | 性能 | 适用场景 |
|---|---|---|---|---|
| Schema隔离 | 高 | 中 | 好 | 中型系统(100-500租户) |
| Row隔离 | 中 | 低 | 最好 | 大型系统(500-2000租户) |
| Database隔离 | 最高 | 高 | 一般 | 超高安全要求 |
方案A:Schema隔离(推荐)
原理: 每个租户有独立的Schema(命名空间)
# PostgreSQL示例
CREATE SCHEMA tenant_001;
CREATE SCHEMA tenant_002;
# 每个Schema下有独立的表
CREATE TABLE tenant_001.documents (
id UUID PRIMARY KEY,
content TEXT,
embedding VECTOR(1536),
created_at TIMESTAMP
);
CREATE TABLE tenant_002.documents (
id UUID PRIMARY KEY,
content TEXT,
embedding VECTOR(1536),
created_at TIMESTAMP
);
优点:
- ✅ 物理隔离,安全性高
- ✅ 备份和恢复简单
- ✅ 可以单独优化每个租户
缺点:
- ❌ Schema数量多时管理复杂
- ❌ 迁移和升级需要同步所有Schema
实现代码:
from sqlalchemy import create_engine, text
from contextlib import contextmanager
class TenantDatabaseManager:
"""租户数据库管理器"""
def __init__(self, base_url: str):
self.base_url = base_url
def _get_tenant_url(self, tenant_id: str) -> str:
"""获取租户数据库URL"""
return f"{self.base_url}?options=-c%20search_path=tenant_{tenant_id}"
@contextmanager
def get_tenant_connection(self, tenant_id: str):
"""获取租户连接(上下文管理器)"""
engine = create_engine(self._get_tenant_url(tenant_id))
conn = engine.connect()
try:
# 设置搜索路径
conn.execute(text(f"SET search_path TO tenant_{tenant_id}"))
yield conn
finally:
conn.close()
def create_tenant_schema(self, tenant_id: str):
"""创建租户Schema"""
engine = create_engine(self.base_url)
with engine.connect() as conn:
# 创建Schema
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS tenant_{tenant_id}"))
# 创建表
conn.execute(text(f"""
CREATE TABLE IF NOT EXISTS tenant_{tenant_id}.documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
metadata JSONB,
embedding VECTOR(1536),
created_at TIMESTAMP DEFAULT NOW()
)
"""))
# 创建向量索引
conn.execute(text(f"""
CREATE INDEX IF NOT EXISTS idx_tenant_{tenant_id}_embedding
ON tenant_{tenant_id}.documents
USING ivfflat (embedding vector_cosine_ops)
"""))
conn.commit()
print(f"✅ 租户 {tenant_id} 的Schema已创建")
def delete_tenant_schema(self, tenant_id: str):
"""删除租户Schema"""
engine = create_engine(self.base_url)
with engine.connect() as conn:
conn.execute(text(f"DROP SCHEMA IF EXISTS tenant_{tenant_id} CASCADE"))
conn.commit()
print(f"✅ 租户 {tenant_id} 的Schema已删除")
# 使用示例
db_manager = TenantDatabaseManager("postgresql://user:pass@localhost/rag_db")
# 创建新租户
db_manager.create_tenant_schema("company_a")
# 查询租户数据
with db_manager.get_tenant_connection("company_a") as conn:
result = conn.execute(text("SELECT * FROM documents LIMIT 10"))
for row in result:
print(row)
方案B:Row隔离(适合大规模)
原理: 所有租户共用表,通过tenant_id字段区分
from sqlalchemy import Column, String, Text
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Document(Base):
"""文档模型(所有租户共用)"""
__tablename__ = 'documents'
id = Column(String, primary_key=True)
tenant_id = Column(String, index=True) # 租户ID
content = Column(Text)
metadata = Column(JSON)
embedding = Column(Vector(1536))
created_at = Column(DateTime)
优点:
- ✅ 简单易实现
- ✅ 性能好(单表查询)
- ✅ 易于扩展
缺点:
- ❌ 需要严格的SQL过滤
- ❌ 误删风险(忘记加WHERE tenant_id)
实现代码:
from functools import wraps
from flask import g, abort
def tenant_required(f):
"""租户验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
tenant_id = g.get('tenant_id')
if not tenant_id:
abort(403, "未指定租户")
# 在所有查询中自动添加租户过滤
kwargs['tenant_id'] = tenant_id
return f(*args, **kwargs)
return decorated_function
class TenantAwareRepository:
"""租户感知的数据仓库"""
def __init__(self, session):
self.session = session
def add_document(self, tenant_id: str, content: str, metadata: dict = None):
"""添加文档(自动关联租户)"""
from uuid import uuid4
doc = Document(
id=str(uuid4()),
tenant_id=tenant_id,
content=content,
metadata=metadata or {}
)
self.session.add(doc)
self.session.commit()
return doc.id
def query_documents(self, tenant_id: str, limit: int = 10):
"""查询文档(自动过滤租户)"""
return (
self.session.query(Document)
.filter(Document.tenant_id == tenant_id)
.limit(limit)
.all()
)
def delete_document(self, tenant_id: str, doc_id: str):
"""删除文档(双重验证)"""
doc = (
self.session.query(Document)
.filter(
Document.id == doc_id,
Document.tenant_id == tenant_id # 必须匹配租户
)
.first()
)
if not doc:
raise ValueError("文档不存在或无权访问")
self.session.delete(doc)
self.session.commit()
# 使用示例
@tenant_required
def upload_document(tenant_id: str, content: str):
repo = TenantAwareRepository(db_session)
doc_id = repo.add_document(tenant_id, content)
return {"doc_id": doc_id}
方案C:混合隔离(最佳实践)
策略:
- 小租户(<1000文档)→ Row隔离
- 大租户(≥1000文档)→ Schema隔离
class HybridTenantManager:
"""混合租户管理器"""
def __init__(self):
self.row_isolation_threshold = 1000 # 阈值
def get_isolation_strategy(self, tenant_id: str) -> str:
"""根据租户规模选择隔离策略"""
doc_count = self.get_tenant_document_count(tenant_id)
if doc_count < self.row_isolation_threshold:
return "row"
else:
return "schema"
def query_documents(self, tenant_id: str, query: str):
"""智能查询(自动选择策略)"""
strategy = self.get_isolation_strategy(tenant_id)
if strategy == "row":
return self._query_row_isolation(tenant_id, query)
else:
return self._query_schema_isolation(tenant_id, query)
👥 RBAC权限管理系统
角色定义
from enum import Enum
from dataclasses import dataclass
from typing import List
class Role(Enum):
"""用户角色"""
ADMIN = "admin" # 租户管理员
EDITOR = "editor" # 编辑者
VIEWER = "viewer" # 查看者
API_USER = "api_user" # API调用者
class Permission(Enum):
"""权限类型"""
DOCUMENT_CREATE = "document:create"
DOCUMENT_READ = "document:read"
DOCUMENT_UPDATE = "document:update"
DOCUMENT_DELETE = "document:delete"
KNOWLEDGE_BASE_MANAGE = "kb:manage"
USER_MANAGE = "user:manage"
# 角色-权限映射
ROLE_PERMISSIONS = {
Role.ADMIN: [
Permission.DOCUMENT_CREATE,
Permission.DOCUMENT_READ,
Permission.DOCUMENT_UPDATE,
Permission.DOCUMENT_DELETE,
Permission.KNOWLEDGE_BASE_MANAGE,
Permission.USER_MANAGE,
],
Role.EDITOR: [
Permission.DOCUMENT_CREATE,
Permission.DOCUMENT_READ,
Permission.DOCUMENT_UPDATE,
],
Role.VIEWER: [
Permission.DOCUMENT_READ,
],
Role.API_USER: [
Permission.DOCUMENT_READ,
],
}
权限验证中间件
from functools import wraps
from flask import request, g, jsonify
def require_permission(permission: Permission):
"""权限验证装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user = g.current_user
# 检查用户是否有该权限
if not user.has_permission(permission):
return jsonify({
"error": "权限不足",
"required": permission.value
}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# 使用示例
@app.route('/api/documents', methods=['POST'])
@require_permission(Permission.DOCUMENT_CREATE)
def create_document():
"""创建文档(需要创建权限)"""
tenant_id = g.tenant_id
content = request.json['content']
# 业务逻辑
doc_id = document_service.create(tenant_id, content)
return jsonify({"doc_id": doc_id}), 201
@app.route('/api/documents/<doc_id>', methods=['GET'])
@require_permission(Permission.DOCUMENT_READ)
def get_document(doc_id: str):
"""获取文档(需要读取权限)"""
tenant_id = g.tenant_id
doc = document_service.get(tenant_id, doc_id)
return jsonify(doc.to_dict())
JWT Token生成与验证
import jwt
from datetime import datetime, timedelta
class AuthService:
"""认证服务"""
def __init__(self, secret_key: str):
self.secret_key = secret_key
def generate_token(
self,
user_id: str,
tenant_id: str,
role: Role
) -> str:
"""生成JWT Token"""
payload = {
'user_id': user_id,
'tenant_id': tenant_id,
'role': role.value,
'exp': datetime.utcnow() + timedelta(hours=24),
'iat': datetime.utcnow()
}
token = jwt.encode(payload, self.secret_key, algorithm='HS256')
return token
def verify_token(self, token: str) -> dict:
"""验证Token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise Exception("Token已过期")
except jwt.InvalidTokenError:
raise Exception("无效的Token")
# Flask中间件
@app.before_request
def authenticate():
"""请求前认证"""
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({"error": "缺少认证信息"}), 401
try:
token = auth_header.split(' ')[1]
payload = auth_service.verify_token(token)
# 设置全局上下文
g.current_user = User(
user_id=payload['user_id'],
tenant_id=payload['tenant_id'],
role=Role(payload['role'])
)
g.tenant_id = payload['tenant_id']
except Exception as e:
return jsonify({"error": str(e)}), 401
⚖️ 资源调度与限流
配额管理
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TenantQuota:
"""租户配额"""
tenant_id: str
max_documents: int = 10000 # 最大文档数
max_queries_per_day: int = 10000 # 每日最大查询数
max_storage_gb: float = 10.0 # 最大存储空间
max_concurrent_requests: int = 10 # 最大并发请求
used_documents: int = 0
used_queries_today: int = 0
used_storage_gb: float = 0.0
current_requests: int = 0
class QuotaManager:
"""配额管理器"""
def __init__(self):
self.quotas: dict[str, TenantQuota] = {}
def check_quota(self, tenant_id: str, resource: str) -> bool:
"""检查配额"""
quota = self.quotas.get(tenant_id)
if not quota:
return False
if resource == "documents":
return quota.used_documents < quota.max_documents
elif resource == "queries":
return quota.used_queries_today < quota.max_queries_per_day
elif resource == "storage":
return quota.used_storage_gb < quota.max_storage_gb
return False
def increment_usage(self, tenant_id: str, resource: str):
"""增加使用量"""
quota = self.quotas.get(tenant_id)
if quota:
if resource == "documents":
quota.used_documents += 1
elif resource == "queries":
quota.used_queries_today += 1
# 限流装饰器
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.route('/api/query', methods=['POST'])
@limiter.limit("100/hour") # 每小时100次
@require_permission(Permission.DOCUMENT_READ)
def query_knowledge_base():
"""查询知识库(带限流)"""
tenant_id = g.tenant_id
# 检查配额
if not quota_manager.check_quota(tenant_id, "queries"):
return jsonify({"error": "超出查询配额"}), 429
# 增加使用量
quota_manager.increment_usage(tenant_id, "queries")
# 业务逻辑
query = request.json['query']
results = rag_service.query(tenant_id, query)
return jsonify(results)
优先级队列
import heapq
from queue import PriorityQueue
class PriorityRequestQueue:
"""优先级请求队列"""
def __init__(self):
self.queue = PriorityQueue()
self.counter = 0
def add_request(self, tenant_id: str, priority: int, request_data: dict):
"""添加请求(优先级高的先处理)"""
# priority: 1=高, 2=中, 3=低
self.queue.put((priority, self.counter, tenant_id, request_data))
self.counter += 1
def get_next_request(self):
"""获取下一个请求"""
if not self.queue.empty():
priority, _, tenant_id, request_data = self.queue.get()
return {
'tenant_id': tenant_id,
'priority': priority,
'data': request_data
}
return None
# 使用示例
request_queue = PriorityRequestQueue()
# VIP租户的请求优先处理
request_queue.add_request("vip_company", 1, {"query": "重要问题"})
request_queue.add_request("normal_company", 3, {"query": "普通问题"})
# 处理请求
next_request = request_queue.get_next_request()
print(f"处理租户: {next_request['tenant_id']}, 优先级: {next_request['priority']}")
💻 完整项目代码
我已经为你准备了完整的多租户RAG平台项目:
GitHub仓库:
git clone https://github.com/Lee985-cmd/AI-30-Day-Challenge.git
cd projects/multi-tenant-rag-platform
项目特性:
- ✅ Schema + Row混合隔离
- ✅ RBAC权限管理
- ✅ JWT认证
- ✅ 配额管理和限流
- ✅ 优先级队列
- ✅ Streamlit管理后台
- ✅ 完整的API文档
快速启动:
# 安装依赖
pip install -r requirements.txt
# 启动PostgreSQL(Docker)
docker run -d \
--name postgres \
-e POSTGRES_PASSWORD=password \
-p 5432:5432 \
postgres:15
# 配置环境变量
export DATABASE_URL="postgresql://postgres:password@localhost:5432/rag_db"
export JWT_SECRET_KEY="your-secret-key"
# 初始化数据库
python init_db.py
# 启动API服务
uvicorn main:app --reload
# 启动管理后台
streamlit run admin_dashboard.py
📊 性能优化技巧
1. 连接池优化
from sqlalchemy.pool import QueuePool
engine = create_engine(
database_url,
poolclass=QueuePool,
pool_size=20, # 连接池大小
max_overflow=40, # 最大溢出连接数
pool_timeout=30, # 超时时间
pool_recycle=1800 # 连接回收时间(秒)
)
2. 缓存策略
import redis
from functools import lru_cache
class TenantCache:
"""租户缓存"""
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_tenant_config(self, tenant_id: str) -> dict:
"""获取租户配置(带缓存)"""
cache_key = f"tenant:{tenant_id}:config"
# 尝试从缓存获取
cached = self.redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 从数据库查询
config = self._load_from_db(tenant_id)
# 存入缓存(1小时过期)
self.redis_client.setex(cache_key, 3600, json.dumps(config))
return config
3. 异步处理
import asyncio
from celery import Celery
celery_app = Celery('tasks', broker='redis://localhost:6379/1')
@celery_app.task
def async_index_documents(tenant_id: str, doc_ids: list):
"""异步索引文档"""
for doc_id in doc_ids:
# 生成向量
embedding = generate_embedding(doc_id)
# 存入向量数据库
vector_db.insert(tenant_id, doc_id, embedding)
return f"Indexed {len(doc_ids)} documents for tenant {tenant_id}"
# 使用
async_index_documents.delay("tenant_001", ["doc1", "doc2", "doc3"])
🛠️ 常见问题与解决
Q1: 如何保证数据完全隔离?
答: 多层防护
# 1. 数据库层:Schema隔离
CREATE SCHEMA tenant_001;
# 2. 应用层:强制租户过滤
SELECT * FROM documents WHERE tenant_id = :tenant_id;
# 3. 代码层:装饰器验证
@tenant_required
def query_documents(tenant_id: str):
...
# 4. 审计层:记录所有访问
audit_log.log(tenant_id, user_id, action, timestamp)
Q2: 如何处理租户激增?
答: 自动化扩容
class AutoScalingManager:
"""自动扩缩容管理器"""
def check_and_scale(self):
"""检查并扩容"""
active_tenants = self.get_active_tenants_count()
if active_tenants > self.threshold:
# 添加新的数据库实例
new_instance = self.create_database_instance()
# 更新负载均衡
self.update_load_balancer(new_instance)
# 发送告警
self.send_alert(f"Auto-scaled to {new_instance}")
Q3: 如何实现灰度发布?
答: 基于租户的灰度
def should_use_new_feature(tenant_id: str) -> bool:
"""判断租户是否使用新功能"""
# 10%的租户使用新版本
if hash(tenant_id) % 100 < 10:
return True
return False
@app.route('/api/query')
def query():
tenant_id = g.tenant_id
if should_use_new_feature(tenant_id):
return new_query_handler()
else:
return old_query_handler()
📈 实际应用案例
案例1:SaaS客服平台
场景: 为500家企业提供智能客服
架构:
- 数据隔离:Schema隔离
- 权限管理:RBAC(Admin/Editor/Viewer)
- 资源配额:每企业10000文档/天
- 性能:平均响应时间<500ms
效果:
- 运维成本降低70%
- 新客户接入时间从1天缩短到5分钟
- 客户满意度提升40%
案例2:企业知识库平台
场景: 为1000+企业提供知识管理
架构:
- 数据隔离:混合隔离(小租户Row,大租户Schema)
- 权限管理:自定义角色
- 资源配额:按套餐分级
- 监控:Prometheus + Grafana
效果:
- 支持1000+企业同时在线
- 数据存储成本降低60%
- 系统可用性99.9%
🎯 总结
多租户RAG的核心要点:
- ✅ 数据隔离 - Schema隔离(安全)+ Row隔离(性能)
- ✅ 权限管理 - RBAC + JWT + 中间件验证
- ✅ 资源调度 - 配额管理 + 限流 + 优先级队列
- ✅ 性能优化 - 连接池 + 缓存 + 异步处理
最佳实践:
- 从小规模开始,逐步演进
- 多层防护,不要依赖单一机制
- 完善的监控和告警
- 定期审计和安全测试
下一步:
- 尝试完整项目代码
- 根据业务需求调整隔离策略
- 实施监控和告警
- 进行压力测试
完整代码和详细教程: 👉 GitHub仓库
上一篇: Agent性能优化:从秒级到毫秒级的提速技巧
下一篇: 用Agent自动化数据处理:从2小时到15分钟的效率革命
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)