基于前文轻量化企业级RAG权限架构,本文输出生产可用核心源码(Python/FastAPI、完整部署流程、本地调试方案、权限自测逻辑。所有代码贴合跨部门隔离、部门内多级RBAC、切片级权限过滤体系,无冗余依赖,中小企业单机可直接部署上线。

一、项目整体文件结构

极简分层结构,无复杂微服务,新手可快速上手

Plain Text
rag_rbac_project/
├── main.py               # 项目入口、API路由
├── rbac_middleware.py    # 核心权限过滤中间件(三层鉴权)
├── doc_ingest.py         # 文档解析、切片、权限打标、向量入库
├── rag_service.py        # RAG检索+LLM生成服务
├── database.py           # 数据库连接、三张权限表模型
├── config.py             # 全局配置(密钥、模型、向量库)
└── requirements.txt      # 项目依赖

二、全部核心代码文件

1. 全局配置文件 config.py

Plain Text
# 全局基础配置
# 服务配置
HOST = "0.0.0.0"
PORT = 8000

# MySQL数据库配置
MYSQL_HOST = "127.0.0.1"
MYSQL_USER = "root"
MYSQL_PASSWORD = "你的数据库密码"
MYSQL_DB = "rag_rbac_db"
MYSQL_PORT = 3306

# 向量库配置(轻量化Chroma)
CHROMA_PERSIST_PATH = "./chroma_db"
EMBEDDING_MODEL = "text-embedding-3-small"

# 大模型配置
LLM_API_KEY = "你的大模型API密钥"
LLM_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
LLM_MODEL = "qwen3.5-turbo"

# 权限常量定义
ROLE_LEVEL_MAP = {
    0: "普通员工",
    1: "业务骨干",
    2: "部门负责人",
    3: "高管/管理员"
}
# 角色允许访问的最大文档密级
ROLE_SECRET_RULE = {
    0: 1,
    1: 2,
    2: 3,
    3: 3
}

2. 依赖清单 requirements.txt

Plain Text
fastapi==0.104.1
uvicorn==0.24.0
pymysql==1.1.0
sqlalchemy==2.0.23
chromadb==0.4.18
openai==1.6.0
langchain==0.1.0
python-multipart==0.0.6
pywin32==306
python-dotenv==1.0.0

3. 数据库模型文件 database.py(对应三张核心权限表)

Plain Text
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config import *
import datetime

# 数据库连接初始化
SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}?charset=utf8mb4"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 1. 用户权限表 user_rbac
class UserRbac(Base):
    __tablename__ = "user_rbac"
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String(50), unique=True, nullable=False)
    username = Column(String(50))
    dept_name = Column(String(50))
    dept_code = Column(String(50))
    role_level = Column(Integer, default=0)
    privilege_tag = Column(String(200), default="")
    status = Column(Integer, default=1)
    create_time = Column(DateTime, default=datetime.datetime.now)

# 2. 文档权限表 doc_permission
class DocPermission(Base):
    __tablename__ = "doc_permission"
    id = Column(Integer, primary_key=True, index=True)
    doc_id = Column(String(50), unique=True, nullable=False)
    vec_group_id = Column(String(50))
    dept_owner = Column(String(50))
    secret_level = Column(Integer, default=1)
    white_list_users = Column(Text, default="")
    black_list_users = Column(Text, default="")
    is_allow_summary = Column(Integer, default=1)
    is_allow_export = Column(Integer, default=1)
    uploader_id = Column(String(50))
    create_time = Column(DateTime, default=datetime.datetime.now)

# 3. 权限映射规则表 rbac_rule
class RbacRule(Base):
    __tablename__ = "rbac_rule"
    id = Column(Integer, primary_key=True, index=True)
    role_level = Column(Integer)
    allow_secret_level = Column(Integer)
    allow_dept_list = Column(String(200), default="")
    is_cross_dept_query = Column(Integer, default=0)

# 创建所有数据表
def create_tables():
    Base.metadata.create_all(bind=engine)

# 获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
 

4. 核心权限中间件 rbac_middleware.py(三层鉴权核心)

实现:跨部门拦截 + 部门内密级过滤 + 黑白名单兜底,检索前置鉴权

Plain Text
from sqlalchemy.orm import Session
from database import UserRbac, DocPermission, RbacRule
from config import ROLE_SECRET_RULE

class RBACFilter:
    def __init__(self, db: Session, user_id: str):
        self.db = db
        self.user_id = user_id
        self.user_info = self._get_user_info()

    # 获取用户身份权限信息
    def _get_user_info(self):
        user = self.db.query(UserRbac).filter(UserRbac.user_id == self.user_id).first()
        if not user:
            return None
        return user

    # 核心:过滤向量切片权限列表
    def filter_vectors(self, doc_id_list: list):
        if not self.user_info:
            return []
       
        # 用户基础身份
        user_dept = self.user_info.dept_name
        user_role = self.user_info.role_level
        allow_max_secret = ROLE_SECRET_RULE.get(user_role, 0)

        # 遍历所有召回文档,逐层鉴权
        valid_doc_ids = []
        for doc_id in doc_id_list:
            doc = self.db.query(DocPermission).filter(DocPermission.doc_id == doc_id).first()
            if not doc:
                continue

            # 1. 黑名单直接拦截
            if self.user_id in doc.black_list_users.split(","):
                continue

            # 2. 白名单优先兜底
            if doc.white_list_users:
                if self.user_id in doc.white_list_users.split(","):
                    valid_doc_ids.append(doc_id)
                continue

            # 3. 跨部门拦截
            if doc.dept_owner != user_dept and doc.secret_level != 0:
                continue

            # 4. 部门内密级等级拦截
            if doc.secret_level > allow_max_secret:
                continue

            # 所有校验通过
            valid_doc_ids.append(doc_id)
        return valid_doc_ids
 

5. 文档入库权限打标模块 doc_ingest.py

实现文档切片、向量入库、自动绑定部门/密级权限标签

Plain Text
import uuid
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader, PyPDFLoader
from sqlalchemy.orm import Session
from database import DocPermission
from config import *

# 初始化向量库
embeddings = OpenAIEmbeddings(
    openai_api_key=LLM_API_KEY,
    openai_api_base=LLM_BASE_URL,
    model=EMBEDDING_MODEL
)
vector_db = Chroma(
    persist_directory=CHROMA_PERSIST_PATH,
    embedding_function=embeddings
)

# 文档切片
def split_document(file_path: str):
    splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=100)
    if file_path.endswith(".pdf"):
        loader = PyPDFLoader(file_path)
    else:
        loader = TextLoader(file_path, encoding="utf-8")
    docs = loader.load_and_split(text_splitter=splitter)
    return docs

# 带权限打标的文档入库
def ingest_doc_with_rbac(db: Session, file_path: str, uploader_id: str, dept_owner: str, secret_level: int):
    # 1. 文档切片
    chunks = split_document(file_path)
    doc_id = str(uuid.uuid4())
    vec_group_id = f"dept_{dept_owner}"

    # 2. 向量入库,绑定权限元数据
    metadatas = []
    for _ in chunks:
        metadatas.append({
            "doc_id": doc_id,
            "vec_group_id": vec_group_id,
            "dept_owner": dept_owner,
            "secret_level": secret_level
        })
    vector_db.add_documents(chunks, metadatas=metadatas)
    vector_db.persist()

    # 3. 写入数据库权限记录
    doc_perm = DocPermission(
        doc_id=doc_id,
        vec_group_id=vec_group_id,
        dept_owner=dept_owner,
        secret_level=secret_level,
        uploader_id=uploader_id
    )
    db.add(doc_perm)
    db.commit()
    return {"code":200, "msg":"入库成功", "doc_id":doc_id}
 

6. RAG核心服务 rag_service.py

权限检索 + LLM二次权限约束生成

Plain Text
from openai import OpenAI
from sqlalchemy.orm import Session
from rbac_middleware import RBACFilter
from config import *

client = OpenAI(
    api_key=LLM_API_KEY,
    base_url=LLM_BASE_URL
)

# RAG问答核心逻辑
def rag_chat(db: Session, user_id: str, query: str):
    # 1. 初始化权限过滤器
    rbac = RBACFilter(db, user_id)
    if not rbac.user_info:
        return {"code":403, "msg":"用户无权限"}

    # 2. 全局粗检索
    search_res = vector_db.similarity_search(query, k=10)
    doc_id_list = list(set([item.metadata["doc_id"] for item in search_res]))

    # 3. 权限精准过滤(核心)
    valid_doc_ids = rbac.filter_vectors(doc_id_list)
    valid_chunks = [x for x in search_res if x.metadata["doc_id"] in valid_doc_ids]

    if not valid_chunks:
        return {"code":200, "data":"暂无权限查看相关内容"}

    # 4. 拼接上下文
    context = "\n".join([page.page_content for page in valid_chunks])

    # 5. LLM权限约束Prompt
    prompt = f"""
    你是企业内部智能办公助手,严格遵守数据权限规则:
    1. 仅基于给定上下文回答,禁止编造、推演、猜测涉密数据
    2. 禁止输出跨部门涉密信息、底价、成本、薪资等敏感内容
    3. 无明确信息时直接回复「暂无相关资料」

    上下文:{context}
    用户问题:{query}
    """

    # 6. 生成答案
    res = client.chat.completions.create(
        model=LLM_MODEL,
        messages=[{"role":"user", "content":prompt}],
        temperature=0.1
    )
    return {"code":200, "data":res.choices[0].message.content}
 

7. 项目入口 main.pyAPI路由)

Plain Text
from fastapi import FastAPI, Depends, UploadFile, File
from sqlalchemy.orm import Session
from database import get_db, create_tables
from doc_ingest import ingest_doc_with_rbac
from rag_service import rag_chat
from config import HOST, PORT

# 初始化数据表
create_tables()
app = FastAPI(title="RBAC-RAG企业权限知识库")

# 1. 文档上传入库接口
@app.post("/doc/ingest")
async def doc_ingest(
    file: UploadFile = File(...),
    user_id: str = "",
    dept_owner: str = "",
    secret_level: int = 1,
    db: Session = Depends(get_db)
):
    # 保存临时文件
    file_path = f"./temp_{file.filename}"
    with open(file_path, "wb") as f:
        f.write(await file.read())
    res = ingest_doc_with_rbac(db, file_path, user_id, dept_owner, secret_level)
    return res

# 2. 智能问答接口
@app.post("/rag/chat")
async def chat(
    user_id: str,
    query: str,
    db: Session = Depends(get_db)
):
    res = rag_chat(db, user_id, query)
    return res

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host=HOST, port=PORT)
 

三、完整部署流程(服务器一键部署)

1. 环境准备

  • 服务器:4核8G 轻量云服务器(CentOS7+/Ubuntu20+)
  • 安装Python3.9、MySQL8.0
  • 创建数据库:create database rag_rbac_db default charset utf8mb4;

2. 项目部署步骤

  1. 新建项目文件夹mkdir rag_rbac_project && cd rag_rbac_project
  1. 将以上7个代码文件全部上传至目录
  1. 安装依赖:pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
  1. 修改 config.py 中:数据库密码、大模型API密钥
  1. 初始化数据:手动在 user_rbac 表插入测试用户、rbac_rule 插入权限规则
  1. 启动服务:python main.py
  1. 后台常驻运行:nohup python main.py > run.log 2>&1

四、本地调试方法(全程可自测)

1. 调试前置准备

  • 本地安装MySQL8.0,创建对应数据库
  • 修改config.py为本地数据库配置
  • 在数据库录入测试数据:
               
    - 普通员工账号(role_level=0,部门=销售部)
               
    - 销售负责人账号(role_level=2,部门=销售部)

    - 研发部测试账号
           

2. 接口调试(自动生成接口文档)

服务启动后访问:http://127.0.0.1:8000/docs

  • 调用 /doc/ingest 上传测试文档,设置不同部门、不同密级
  • 调用 /rag/chat 传入不同用户ID,测试权限拦截效果

五、核心权限调试验证标准(必过测试用例)

  1. 跨部门拦截:销售账号查询研发、财务文档 → 返回无权限
  1. 部门内层级拦截:销售普通员工无法查看销售高密级底价文档,销售负责人可正常查看
  1. 白名单兜底:绝密文档仅白名单用户可访问,部门负责人也无法查看
  1. LLM生成约束:无权限内容不会被推演、不会泄露敏感数据

六、常见问题排查

  • 权限不生效:检查向量切片是否绑定secret_level、dept_owner元数据
  • 检索为空:核对用户部门编码、角色等级配置是否正确
  • 模型报错:检查API密钥、模型地址是否有效,切换轻量模型
  • 数据残留:更新文档密级后,清空本地chroma_db缓存重新入库

|(注:文档部分内容可能由 AI 生成)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐