最近在做大模型 RAG(检索增强生成)相关的工程化落地时,遇到了一个非常典型的性能瓶颈:文档的解析与向量化处理。

设想一个真实的业务场景,用户上传了一份几十页的企业财报 PDF,后端需要完成三个步骤:

  1. 将文档切分为大概 1000 个文本块(Chunk)。
  2. 调用大模型厂商的 Embedding API,把这 1000 个文本块转化为向量。
  3. 把文本和向量数据批量存入 Milvus 向量数据库。

最开始是用最原始的同步循环来写的。假设每次 API 请求网络往返需要 0.5 秒,1000 个 Chunk 顺序处理完差不多要 8 分钟。这还不是最要命的,最要命的是如果处理到第 999 个的时候网络抖动了一下,抛了个异常,整个任务直接前功尽弃。

为了彻底解决高并发和高可靠性的问题,我花时间重构了这部分链路,最终敲定了一套基于 Celery + RabbitMQ + asyncio 的混合异步流水线。今天抽空把这套方案的核心思路和踩坑经验梳理出来。

核心架构思路

在这套架构里,各个组件的职责划分非常明确:

  • RabbitMQ: 充当 Broker,把用户的上传任务持久化,哪怕服务器中途重启,任务也不会丢。
  • Celery Chain: 负责分布式任务流的编排。它能把“切片 -> 向量化 -> 入库”这三个独立的步骤像流水线一样无缝串联起来。
  • asyncio: 这是单机性能的破局点。Celery 的 Worker 默认是同步阻塞的,如果我们在 Task 里老老实实等 I/O 返回,Worker 进程直接就卡死了。我利用 asyncio 在单个 Task 内部接管了网络请求,实现了单线程下的极致并发。

整体的数据流向如下:
用户触发 -> MQ -> [Task1: 文本切片] -> [Task2: asyncio并发向量化] -> [Task3: 批量写入Milvus]

关键代码实现

下面挑核心部分的代码说一下。

首先是流水线的编排入口,Celery 原生提供的 chain 功能非常适合做这种分步任务。

from celery import shared_task, chain

def start_rag_pipeline(document_id: str, raw_text: str):
    # 将三个任务串接,上一个任务的返回值会自动作为下一个任务的参数
    workflow = chain(
        chunking_task.s(document_id, raw_text),  
        embedding_task.s(),                      
        milvus_insert_task.s()                   
    )
    workflow.apply_async()

第一个任务是文本切片。这是个纯 CPU 密集的任务,耗时很短,直接同步跑就行。这里为了演示,只做逻辑上的 mock。

@shared_task(bind=True, max_retries=3)
def chunking_task(self, document_id: str, raw_text: str):
    # 模拟切分出了 1000 个 Chunk
    chunks = [{"doc_id": document_id, "chunk_id": i, "text": f"文本片段_{i}"} for i in range(1000)]
    return chunks 

接下来是整条流水线的重点:同步任务包装异步协程

这里绝对不能上来就是一个 asyncio.gather 把 1000 个并发全扔出去。瞬间产生 1000 个并发请求,要么把自己的内存撑爆,要么直接被大模型厂商的 API 网关拉黑(HTTP 429)。我们必须引入 asyncio.Semaphore 来做并发控制。

import asyncio
import aiohttp

async def fetch_embedding_async(chunk: dict, session: aiohttp.ClientSession, sem: asyncio.Semaphore):
    # 利用信号量控制最大并发度
    async with sem: 
        # 模拟调用 Embedding API
        async with session.post("http://llm-api/v1/embeddings", json={"input": chunk['text']}) as resp:
            # 模拟网络延迟和向量返回
            await asyncio.sleep(0.1) 
            chunk['vector'] = [0.1] * 768 
            return chunk

async def async_embedding_pipeline(chunks: list):
    # 关键点:每次最多允许 50 个并发在飞
    sem = asyncio.Semaphore(50) 
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_embedding_async(chunk, session, sem) for chunk in chunks]
        results = await asyncio.gather(*tasks)
        return results

@shared_task(bind=True)
def embedding_task(self, chunks: list):
    # Celery 壳子:在普通的同步线程中唤起事件循环
    processed_chunks = asyncio.run(async_embedding_pipeline(chunks))
    return processed_chunks

最后一步就是拿到底层数据,去和 Milvus 交互了。拿到批量的向量后,一次性 insert 进去即可。

@shared_task(bind=True, max_retries=3)
def milvus_insert_task(self, processed_chunks: list):
    # 提取字段构造实体列表
    doc_ids = [c['doc_id'] for c in processed_chunks]
    chunk_ids = [c['chunk_id'] for c in processed_chunks]
    texts = [c['text'] for c in processed_chunks]
    embeddings = [c['vector'] for c in processed_chunks]
    
    entities = [doc_ids, chunk_ids, texts, embeddings]
    
    # 假设 Collection 对象已初始化
    # collection.insert(entities)
    # collection.flush()
    
    return "SUCCESS"

生产环境排坑指南

这套方案跑通不难,但如果要上生产环境,有几个坑:

1. 协程冲突导致的死锁
网上搜 Celery 并发,很多帖子会教你加参数 -P gevent 或者 eventlet。在这个架构里,千万别这么干。
因为我们在业务代码里已经用到了原生的 asyncio。gevent 的底层逻辑是给 Python 标准库打猴子补丁,如果把它和 asyncio 的事件循环混在一起用,你会遇到极其诡异且难以复现的底层死锁。老老实实保持默认的多进程(prefork)或多线程(threads)模式去启动 Worker,并发的事情交给代码里的 asyncio 去做。

2. 警惕 MQ 被大消息撑爆
在 Celery Chain 里,上一个任务的返回值是会被序列化成 JSON 塞进 RabbitMQ,然后再塞给下一个任务的。你想想,1000 个带着 768 维浮点数向量的字典,那可是好几兆的数据量。把 MQ 当数据库用,迟早把集群搞瘫痪。在真正的生产环境里,更优雅的做法是“数据不动指令动”:Task1 切片完成后存入 Redis 或 MySQL,只往后传递一个 batch_id;Task2 拿到 ID 去查库、做向量化、再写回库里;Task3 同样根据 ID 去拉取数据存入 Milvus。让消息队列保持轻量。

3. API 限流的防御性编程
就算单机加了并发限制,如果你线外部署了 10 个 Worker 节点,总并发还是可能会触碰大模型接口的限流阈值。在发请求的 async 逻辑里,必须加上基于 tenacity 的指数退避重试(Exponential Backoff)机制,做好重试容灾,否则整个任务链很容易因为个别请求被拒绝而全部失败。

总结

通过 RabbitMQ 保底不丢任务,利用 Celery 编排复杂流程,再用 asyncio 榨干单机的 I/O 性能,算是一套比较实用且高性价比的解法。希望这篇踩坑记录能给正在做 AI 架构的朋友一点参考。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐