🏢 企业级RAG系统完整指南 | 数据隔离 + RBAC权限 + 资源调度 | 支持1000+租户的架构设计


📖 什么是多租户RAG?

多租户(Multi-tenancy) = 一套系统服务多个客户

在RAG系统中,意味着:

  • 1000家企业共用同一个RAG平台
  • 每家企业的数据完全隔离
  • 每个企业的用户只能访问自己的知识库
  • 资源按需分配,互不影响

为什么需要多租户?

单租户的问题:

企业A → 部署一套RAG系统 → 成本高、维护难
企业B → 部署一套RAG系统 → 成本高、维护难
企业C → 部署一套RAG系统 → 成本高、维护难
...
总成本:N套系统 × 高成本

多租户的优势:

企业A ┐
企业B ├→ 共用一套RAG平台 → 成本低、易维护
企业C ┘
...
总成本:1套系统 ÷ N个客户 = 低成本

🏗️ 多租户RAG核心架构

┌─────────────────────────────────────────┐
│         API Gateway / Load Balancer     │
└──────────────┬──────────────────────────┘
               ↓
┌─────────────────────────────────────────┐
│      Authentication & Authorization     │
│         (JWT + RBAC 权限控制)            │
└──────────────┬──────────────────────────┘
               ↓
┌─────────────────────────────────────────┐
│         Tenant Context Manager          │
│      (识别租户ID,设置上下文)             │
└──────────────┬──────────────────────────┘
               ↓
        ┌──────┴──────┐
        ↓             ↓
┌──────────────┐ ┌──────────────┐
│  Tenant A    │ │  Tenant B    │
│  Vector DB   │ │  Vector DB   │
│  (隔离)      │ │  (隔离)      │
└──────────────┘ └──────────────┘
        ↓             ↓
┌──────────────┐ ┌──────────────┐
│  LLM Pool    │ │  LLM Pool    │
│  (共享)      │ │  (共享)      │
└──────────────┘ └──────────────┘

三大核心挑战

  1. 数据隔离 - 确保租户间数据完全不泄露
  2. 权限管理 - 细粒度的访问控制
  3. 资源调度 - 公平分配计算资源

🔐 数据隔离方案

方案对比

方案 隔离级别 复杂度 性能 适用场景
Schema隔离 中型系统(100-500租户)
Row隔离 最好 大型系统(500-2000租户)
Database隔离 最高 一般 超高安全要求

方案A:Schema隔离(推荐)

原理: 每个租户有独立的Schema(命名空间)

# PostgreSQL示例
CREATE SCHEMA tenant_001;
CREATE SCHEMA tenant_002;

# 每个Schema下有独立的表
CREATE TABLE tenant_001.documents (
    id UUID PRIMARY KEY,
    content TEXT,
    embedding VECTOR(1536),
    created_at TIMESTAMP
);

CREATE TABLE tenant_002.documents (
    id UUID PRIMARY KEY,
    content TEXT,
    embedding VECTOR(1536),
    created_at TIMESTAMP
);

优点:

  • ✅ 物理隔离,安全性高
  • ✅ 备份和恢复简单
  • ✅ 可以单独优化每个租户

缺点:

  • ❌ Schema数量多时管理复杂
  • ❌ 迁移和升级需要同步所有Schema

实现代码:

from sqlalchemy import create_engine, text
from contextlib import contextmanager

class TenantDatabaseManager:
    """租户数据库管理器"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    def _get_tenant_url(self, tenant_id: str) -> str:
        """获取租户数据库URL"""
        return f"{self.base_url}?options=-c%20search_path=tenant_{tenant_id}"
    
    @contextmanager
    def get_tenant_connection(self, tenant_id: str):
        """获取租户连接(上下文管理器)"""
        engine = create_engine(self._get_tenant_url(tenant_id))
        conn = engine.connect()
        try:
            # 设置搜索路径
            conn.execute(text(f"SET search_path TO tenant_{tenant_id}"))
            yield conn
        finally:
            conn.close()
    
    def create_tenant_schema(self, tenant_id: str):
        """创建租户Schema"""
        engine = create_engine(self.base_url)
        with engine.connect() as conn:
            # 创建Schema
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS tenant_{tenant_id}"))
            
            # 创建表
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS tenant_{tenant_id}.documents (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    content TEXT NOT NULL,
                    metadata JSONB,
                    embedding VECTOR(1536),
                    created_at TIMESTAMP DEFAULT NOW()
                )
            """))
            
            # 创建向量索引
            conn.execute(text(f"""
                CREATE INDEX IF NOT EXISTS idx_tenant_{tenant_id}_embedding 
                ON tenant_{tenant_id}.documents 
                USING ivfflat (embedding vector_cosine_ops)
            """))
            
            conn.commit()
        
        print(f"✅ 租户 {tenant_id} 的Schema已创建")
    
    def delete_tenant_schema(self, tenant_id: str):
        """删除租户Schema"""
        engine = create_engine(self.base_url)
        with engine.connect() as conn:
            conn.execute(text(f"DROP SCHEMA IF EXISTS tenant_{tenant_id} CASCADE"))
            conn.commit()
        
        print(f"✅ 租户 {tenant_id} 的Schema已删除")

# 使用示例
db_manager = TenantDatabaseManager("postgresql://user:pass@localhost/rag_db")

# 创建新租户
db_manager.create_tenant_schema("company_a")

# 查询租户数据
with db_manager.get_tenant_connection("company_a") as conn:
    result = conn.execute(text("SELECT * FROM documents LIMIT 10"))
    for row in result:
        print(row)

方案B:Row隔离(适合大规模)

原理: 所有租户共用表,通过tenant_id字段区分

from sqlalchemy import Column, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Document(Base):
    """文档模型(所有租户共用)"""
    __tablename__ = 'documents'
    
    id = Column(String, primary_key=True)
    tenant_id = Column(String, index=True)  # 租户ID
    content = Column(Text)
    metadata = Column(JSON)
    embedding = Column(Vector(1536))
    created_at = Column(DateTime)

优点:

  • ✅ 简单易实现
  • ✅ 性能好(单表查询)
  • ✅ 易于扩展

缺点:

  • ❌ 需要严格的SQL过滤
  • ❌ 误删风险(忘记加WHERE tenant_id)

实现代码:

from functools import wraps
from flask import g, abort

def tenant_required(f):
    """租户验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        tenant_id = g.get('tenant_id')
        
        if not tenant_id:
            abort(403, "未指定租户")
        
        # 在所有查询中自动添加租户过滤
        kwargs['tenant_id'] = tenant_id
        return f(*args, **kwargs)
    
    return decorated_function

class TenantAwareRepository:
    """租户感知的数据仓库"""
    
    def __init__(self, session):
        self.session = session
    
    def add_document(self, tenant_id: str, content: str, metadata: dict = None):
        """添加文档(自动关联租户)"""
        from uuid import uuid4
        
        doc = Document(
            id=str(uuid4()),
            tenant_id=tenant_id,
            content=content,
            metadata=metadata or {}
        )
        
        self.session.add(doc)
        self.session.commit()
        
        return doc.id
    
    def query_documents(self, tenant_id: str, limit: int = 10):
        """查询文档(自动过滤租户)"""
        return (
            self.session.query(Document)
            .filter(Document.tenant_id == tenant_id)
            .limit(limit)
            .all()
        )
    
    def delete_document(self, tenant_id: str, doc_id: str):
        """删除文档(双重验证)"""
        doc = (
            self.session.query(Document)
            .filter(
                Document.id == doc_id,
                Document.tenant_id == tenant_id  # 必须匹配租户
            )
            .first()
        )
        
        if not doc:
            raise ValueError("文档不存在或无权访问")
        
        self.session.delete(doc)
        self.session.commit()

# 使用示例
@tenant_required
def upload_document(tenant_id: str, content: str):
    repo = TenantAwareRepository(db_session)
    doc_id = repo.add_document(tenant_id, content)
    return {"doc_id": doc_id}

方案C:混合隔离(最佳实践)

策略:

  • 小租户(<1000文档)→ Row隔离
  • 大租户(≥1000文档)→ Schema隔离
class HybridTenantManager:
    """混合租户管理器"""
    
    def __init__(self):
        self.row_isolation_threshold = 1000  # 阈值
    
    def get_isolation_strategy(self, tenant_id: str) -> str:
        """根据租户规模选择隔离策略"""
        doc_count = self.get_tenant_document_count(tenant_id)
        
        if doc_count < self.row_isolation_threshold:
            return "row"
        else:
            return "schema"
    
    def query_documents(self, tenant_id: str, query: str):
        """智能查询(自动选择策略)"""
        strategy = self.get_isolation_strategy(tenant_id)
        
        if strategy == "row":
            return self._query_row_isolation(tenant_id, query)
        else:
            return self._query_schema_isolation(tenant_id, query)

👥 RBAC权限管理系统

角色定义

from enum import Enum
from dataclasses import dataclass
from typing import List

class Role(Enum):
    """用户角色"""
    ADMIN = "admin"           # 租户管理员
    EDITOR = "editor"         # 编辑者
    VIEWER = "viewer"         # 查看者
    API_USER = "api_user"     # API调用者

class Permission(Enum):
    """权限类型"""
    DOCUMENT_CREATE = "document:create"
    DOCUMENT_READ = "document:read"
    DOCUMENT_UPDATE = "document:update"
    DOCUMENT_DELETE = "document:delete"
    KNOWLEDGE_BASE_MANAGE = "kb:manage"
    USER_MANAGE = "user:manage"

# 角色-权限映射
ROLE_PERMISSIONS = {
    Role.ADMIN: [
        Permission.DOCUMENT_CREATE,
        Permission.DOCUMENT_READ,
        Permission.DOCUMENT_UPDATE,
        Permission.DOCUMENT_DELETE,
        Permission.KNOWLEDGE_BASE_MANAGE,
        Permission.USER_MANAGE,
    ],
    Role.EDITOR: [
        Permission.DOCUMENT_CREATE,
        Permission.DOCUMENT_READ,
        Permission.DOCUMENT_UPDATE,
    ],
    Role.VIEWER: [
        Permission.DOCUMENT_READ,
    ],
    Role.API_USER: [
        Permission.DOCUMENT_READ,
    ],
}

权限验证中间件

from functools import wraps
from flask import request, g, jsonify

def require_permission(permission: Permission):
    """权限验证装饰器"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            user = g.current_user
            
            # 检查用户是否有该权限
            if not user.has_permission(permission):
                return jsonify({
                    "error": "权限不足",
                    "required": permission.value
                }), 403
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# 使用示例
@app.route('/api/documents', methods=['POST'])
@require_permission(Permission.DOCUMENT_CREATE)
def create_document():
    """创建文档(需要创建权限)"""
    tenant_id = g.tenant_id
    content = request.json['content']
    
    # 业务逻辑
    doc_id = document_service.create(tenant_id, content)
    
    return jsonify({"doc_id": doc_id}), 201

@app.route('/api/documents/<doc_id>', methods=['GET'])
@require_permission(Permission.DOCUMENT_READ)
def get_document(doc_id: str):
    """获取文档(需要读取权限)"""
    tenant_id = g.tenant_id
    
    doc = document_service.get(tenant_id, doc_id)
    
    return jsonify(doc.to_dict())

JWT Token生成与验证

import jwt
from datetime import datetime, timedelta

class AuthService:
    """认证服务"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
    
    def generate_token(
        self, 
        user_id: str, 
        tenant_id: str, 
        role: Role
    ) -> str:
        """生成JWT Token"""
        payload = {
            'user_id': user_id,
            'tenant_id': tenant_id,
            'role': role.value,
            'exp': datetime.utcnow() + timedelta(hours=24),
            'iat': datetime.utcnow()
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        return token
    
    def verify_token(self, token: str) -> dict:
        """验证Token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            raise Exception("Token已过期")
        except jwt.InvalidTokenError:
            raise Exception("无效的Token")

# Flask中间件
@app.before_request
def authenticate():
    """请求前认证"""
    auth_header = request.headers.get('Authorization')
    
    if not auth_header:
        return jsonify({"error": "缺少认证信息"}), 401
    
    try:
        token = auth_header.split(' ')[1]
        payload = auth_service.verify_token(token)
        
        # 设置全局上下文
        g.current_user = User(
            user_id=payload['user_id'],
            tenant_id=payload['tenant_id'],
            role=Role(payload['role'])
        )
        g.tenant_id = payload['tenant_id']
    
    except Exception as e:
        return jsonify({"error": str(e)}), 401

⚖️ 资源调度与限流

配额管理

from dataclasses import dataclass
from datetime import datetime

@dataclass
class TenantQuota:
    """租户配额"""
    tenant_id: str
    max_documents: int = 10000          # 最大文档数
    max_queries_per_day: int = 10000    # 每日最大查询数
    max_storage_gb: float = 10.0        # 最大存储空间
    max_concurrent_requests: int = 10   # 最大并发请求
    
    used_documents: int = 0
    used_queries_today: int = 0
    used_storage_gb: float = 0.0
    current_requests: int = 0

class QuotaManager:
    """配额管理器"""
    
    def __init__(self):
        self.quotas: dict[str, TenantQuota] = {}
    
    def check_quota(self, tenant_id: str, resource: str) -> bool:
        """检查配额"""
        quota = self.quotas.get(tenant_id)
        
        if not quota:
            return False
        
        if resource == "documents":
            return quota.used_documents < quota.max_documents
        elif resource == "queries":
            return quota.used_queries_today < quota.max_queries_per_day
        elif resource == "storage":
            return quota.used_storage_gb < quota.max_storage_gb
        
        return False
    
    def increment_usage(self, tenant_id: str, resource: str):
        """增加使用量"""
        quota = self.quotas.get(tenant_id)
        
        if quota:
            if resource == "documents":
                quota.used_documents += 1
            elif resource == "queries":
                quota.used_queries_today += 1

# 限流装饰器
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.route('/api/query', methods=['POST'])
@limiter.limit("100/hour")  # 每小时100次
@require_permission(Permission.DOCUMENT_READ)
def query_knowledge_base():
    """查询知识库(带限流)"""
    tenant_id = g.tenant_id
    
    # 检查配额
    if not quota_manager.check_quota(tenant_id, "queries"):
        return jsonify({"error": "超出查询配额"}), 429
    
    # 增加使用量
    quota_manager.increment_usage(tenant_id, "queries")
    
    # 业务逻辑
    query = request.json['query']
    results = rag_service.query(tenant_id, query)
    
    return jsonify(results)

优先级队列

import heapq
from queue import PriorityQueue

class PriorityRequestQueue:
    """优先级请求队列"""
    
    def __init__(self):
        self.queue = PriorityQueue()
        self.counter = 0
    
    def add_request(self, tenant_id: str, priority: int, request_data: dict):
        """添加请求(优先级高的先处理)"""
        # priority: 1=高, 2=中, 3=低
        self.queue.put((priority, self.counter, tenant_id, request_data))
        self.counter += 1
    
    def get_next_request(self):
        """获取下一个请求"""
        if not self.queue.empty():
            priority, _, tenant_id, request_data = self.queue.get()
            return {
                'tenant_id': tenant_id,
                'priority': priority,
                'data': request_data
            }
        return None

# 使用示例
request_queue = PriorityRequestQueue()

# VIP租户的请求优先处理
request_queue.add_request("vip_company", 1, {"query": "重要问题"})
request_queue.add_request("normal_company", 3, {"query": "普通问题"})

# 处理请求
next_request = request_queue.get_next_request()
print(f"处理租户: {next_request['tenant_id']}, 优先级: {next_request['priority']}")

💻 完整项目代码

我已经为你准备了完整的多租户RAG平台项目:

GitHub仓库:

git clone https://github.com/Lee985-cmd/AI-30-Day-Challenge.git
cd projects/multi-tenant-rag-platform

项目特性:

  • ✅ Schema + Row混合隔离
  • ✅ RBAC权限管理
  • ✅ JWT认证
  • ✅ 配额管理和限流
  • ✅ 优先级队列
  • ✅ Streamlit管理后台
  • ✅ 完整的API文档

快速启动:

# 安装依赖
pip install -r requirements.txt

# 启动PostgreSQL(Docker)
docker run -d \
  --name postgres \
  -e POSTGRES_PASSWORD=password \
  -p 5432:5432 \
  postgres:15

# 配置环境变量
export DATABASE_URL="postgresql://postgres:password@localhost:5432/rag_db"
export JWT_SECRET_KEY="your-secret-key"

# 初始化数据库
python init_db.py

# 启动API服务
uvicorn main:app --reload

# 启动管理后台
streamlit run admin_dashboard.py

📊 性能优化技巧

1. 连接池优化

from sqlalchemy.pool import QueuePool

engine = create_engine(
    database_url,
    poolclass=QueuePool,
    pool_size=20,           # 连接池大小
    max_overflow=40,        # 最大溢出连接数
    pool_timeout=30,        # 超时时间
    pool_recycle=1800       # 连接回收时间(秒)
)

2. 缓存策略

import redis
from functools import lru_cache

class TenantCache:
    """租户缓存"""
    
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    def get_tenant_config(self, tenant_id: str) -> dict:
        """获取租户配置(带缓存)"""
        cache_key = f"tenant:{tenant_id}:config"
        
        # 尝试从缓存获取
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # 从数据库查询
        config = self._load_from_db(tenant_id)
        
        # 存入缓存(1小时过期)
        self.redis_client.setex(cache_key, 3600, json.dumps(config))
        
        return config

3. 异步处理

import asyncio
from celery import Celery

celery_app = Celery('tasks', broker='redis://localhost:6379/1')

@celery_app.task
def async_index_documents(tenant_id: str, doc_ids: list):
    """异步索引文档"""
    for doc_id in doc_ids:
        # 生成向量
        embedding = generate_embedding(doc_id)
        
        # 存入向量数据库
        vector_db.insert(tenant_id, doc_id, embedding)
    
    return f"Indexed {len(doc_ids)} documents for tenant {tenant_id}"

# 使用
async_index_documents.delay("tenant_001", ["doc1", "doc2", "doc3"])

🛠️ 常见问题与解决

Q1: 如何保证数据完全隔离?

答: 多层防护

# 1. 数据库层:Schema隔离
CREATE SCHEMA tenant_001;

# 2. 应用层:强制租户过滤
SELECT * FROM documents WHERE tenant_id = :tenant_id;

# 3. 代码层:装饰器验证
@tenant_required
def query_documents(tenant_id: str):
    ...

# 4. 审计层:记录所有访问
audit_log.log(tenant_id, user_id, action, timestamp)

Q2: 如何处理租户激增?

答: 自动化扩容

class AutoScalingManager:
    """自动扩缩容管理器"""
    
    def check_and_scale(self):
        """检查并扩容"""
        active_tenants = self.get_active_tenants_count()
        
        if active_tenants > self.threshold:
            # 添加新的数据库实例
            new_instance = self.create_database_instance()
            
            # 更新负载均衡
            self.update_load_balancer(new_instance)
            
            # 发送告警
            self.send_alert(f"Auto-scaled to {new_instance}")

Q3: 如何实现灰度发布?

答: 基于租户的灰度

def should_use_new_feature(tenant_id: str) -> bool:
    """判断租户是否使用新功能"""
    # 10%的租户使用新版本
    if hash(tenant_id) % 100 < 10:
        return True
    return False

@app.route('/api/query')
def query():
    tenant_id = g.tenant_id
    
    if should_use_new_feature(tenant_id):
        return new_query_handler()
    else:
        return old_query_handler()

📈 实际应用案例

案例1:SaaS客服平台

场景: 为500家企业提供智能客服

架构:

  • 数据隔离:Schema隔离
  • 权限管理:RBAC(Admin/Editor/Viewer)
  • 资源配额:每企业10000文档/天
  • 性能:平均响应时间<500ms

效果:

  • 运维成本降低70%
  • 新客户接入时间从1天缩短到5分钟
  • 客户满意度提升40%

案例2:企业知识库平台

场景: 为1000+企业提供知识管理

架构:

  • 数据隔离:混合隔离(小租户Row,大租户Schema)
  • 权限管理:自定义角色
  • 资源配额:按套餐分级
  • 监控:Prometheus + Grafana

效果:

  • 支持1000+企业同时在线
  • 数据存储成本降低60%
  • 系统可用性99.9%

🎯 总结

多租户RAG的核心要点:

  1. ✅ 数据隔离 - Schema隔离(安全)+ Row隔离(性能)
  2. ✅ 权限管理 - RBAC + JWT + 中间件验证
  3. ✅ 资源调度 - 配额管理 + 限流 + 优先级队列
  4. ✅ 性能优化 - 连接池 + 缓存 + 异步处理

最佳实践:

  • 从小规模开始,逐步演进
  • 多层防护,不要依赖单一机制
  • 完善的监控和告警
  • 定期审计和安全测试

下一步:

  • 尝试完整项目代码
  • 根据业务需求调整隔离策略
  • 实施监控和告警
  • 进行压力测试

完整代码和详细教程: 👉 GitHub仓库

上一篇: Agent性能优化:从秒级到毫秒级的提速技巧
下一篇: 用Agent自动化数据处理:从2小时到15分钟的效率革命

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐