第13讲:异步任务队列与长任务处理——Celery + Redis实战
欢迎来到《Python + AI Agent 实战开发完全指南》专栏!
在前面的课程中,我们使用 FastAPI 将多智能体系统封装为了 RESTful API。但在真实的生产环境中,AI 推理和多智能体协作往往需要几秒甚至几分钟的时间。如果用户在发起请求后一直等待 HTTP 响应,不仅体验极差,还容易导致网关超时(Timeout)或服务器线程阻塞。
在这一讲中,我们将引入工业界标准的解决方案:异步任务队列。我们将学习如何使用 Celery 结合 Redis,将耗时的 Agent 任务从 Web 服务中彻底剥离,实现真正的非阻塞架构。
学习目标:
- 理解同步阻塞与异步任务队列的核心区别。
- 掌握 Celery + Redis 的环境搭建与配置。
- 将多智能体工作流改造为 Celery Task。
- 实现任务的提交、状态轮询与结果获取机制。
一、 为什么要引入任务队列?
1.1 传统同步调用的痛点
在之前的 FastAPI 代码中,即使使用了 run_in_executor,本质上依然是“客户端发起请求 -> 服务端执行完AI任务 -> 返回结果”。
- 用户体验差:浏览器转圈等待,容易误触刷新。
- 资源浪费:Web 服务器的 Worker 线程被长时间占用,无法处理其他用户的健康检查或简单查询。
- 单点故障风险:如果 AI 调用发生死锁,整个 Web 服务可能随之崩溃。
1.2 生产者-消费者模型
引入 Celery 后,我们的架构变为经典的分布式模型:
- Producer (FastAPI):只负责接收请求,把任务丢进 Redis 队列,立刻给用户返回一个
task_id(耗时 < 10ms)。 - Broker (Redis):作为消息中间件,安全地排队存储待执行的 Agent 任务。
- Consumer (Celery Worker):独立的后台进程,从队列中取出任务,调用 LangGraph 执行多智能体协作,并将结果写回 Redis。
- Client (前端/用户):拿着
task_id定期轮询接口,直到任务完成再获取最终报告。
二、 环境准备与基础配置
2.1 安装核心依赖
pip install celery redis fastapi uvicorn langchain-openai python-dotenv
2.2 启动本地 Redis
如果你使用的是 Docker,可以一键启动:
docker run -d -p 6379:6379 --name agent-redis redis:latest
三、 核心实战:构建 Celery Agent Worker
3.1 定义 Celery 实例与任务 (celery_app.py)
我们将之前第11讲的多智能体逻辑封装为 Celery 任务:
import os
from celery import Celery
from team_state_agent import create_team_workflow # 导入我们封装好的多智能体工作流
# 初始化 Celery,指定 Redis 作为 Broker 和 Result Backend
app = Celery(
"agent_tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1"
)
# 优化配置
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
task_track_started=True, # 追踪任务开始时间
)
@app.task(bind=True, name="run_multi_agent_collaboration")
def run_multi_agent_collaboration(self, query: str, max_rounds: int = 3):
"""
多智能体协作的异步任务
bind=True 允许我们在任务内部访问 self,用于更新状态
"""
try:
# 更新任务状态为进行中
self.update_state(state='PROGRESS', meta={'status': 'Initializing Agents...'})
workflow = create_team_workflow()
initial_state = {
"query": query,
"research_data": "",
"report": "",
"loop_count": 0
}
self.update_state(state='PROGRESS', meta={'status': 'Researcher is working...'})
result = workflow.invoke(initial_state)
return {
"status": "success",
"research_data": result.get("research_data"),
"report": result.get("report"),
"total_rounds": result.get("loop_count")
}
except Exception as e:
# 捕获异常,防止 Worker 崩溃
return {"status": "error", "message": str(e)}
3.2 启动 Worker 进程
打开一个新的终端窗口(保持 FastAPI 运行),启动消费者:
celery -A celery_app worker --loglevel=info --pool=solo
# 注意:Windows 下建议加 --pool=solo,Linux/Mac 默认即可
四、 改造 FastAPI:接入任务队列
现在,我们需要修改 FastAPI 的接口,使其只负责任务分发和状态查询。
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
from celery_app import run_multi_agent_collaboration
app = FastAPI(title="Async Multi-Agent API")
class AgentRequest(BaseModel):
query: str
max_rounds: int = 3
# 1. 提交任务接口(毫秒级响应)
@app.post("/api/v1/agents/collaborate/async")
async def submit_task(request: AgentRequest):
# .delay() 将任务推送到 Redis,立即返回 task_id
task = run_multi_agent_collaboration.delay(request.query, request.max_rounds)
return {
"task_id": task.id,
"status": "PENDING",
"message": "Task has been queued."
}
# 2. 查询任务状态接口
@app.get("/api/v1/tasks/{task_id}")
async def get_task_status(task_id: str):
result = AsyncResult(task_id, app=run_multi_agent_collaboration.app)
response = {
"task_id": task_id,
"status": result.status,
"result": None
}
if result.status == "PROGRESS":
response["progress"] = result.info.get("status")
elif result.status == "SUCCESS":
response["result"] = result.result
elif result.status == "FAILURE":
response["error"] = str(result.result)
return response
五、测试用例
import requests
import json
url = "http://localhost:8000/api/v1/agents/collaborate/async"
data = {
"query": "帮我写一份关于2024年AI Agent发展趋势的简报",
"webhook_url": "http://your-webhook-url.com/callback"
}
response = requests.post(url, json=data)
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
六、 进阶优化:生产级考量
在实际的企业落地中,仅仅跑通是不够的,还需要考虑以下工程细节:
- 结果过期策略:Redis 中的结果不能无限堆积。可以在 Celery 配置中添加
result_expires=3600,让一小时前的任务结果自动清除。 - 超时与重试机制:大模型 API 偶尔会限流或超时。可以使用 Celery 的
autoretry_for参数: - 并发控制:如果你的 LLM API 有严格的 QPS 限制,可以通过设置 Worker 的并发数(如
-c 2)或使用celery-rate-limit来平滑请求。
七、 本节小结与进阶展望
恭喜你!至此,你的 AI Agent 后端已经具备了企业级的健壮性。
- 彻底解耦:Web 服务与 AI 计算分离,互不影响。
- 高可用性:即使某个 Agent 任务卡死,也不会拖垮整个 API 网关。
- 弹性扩展:当任务量激增时,只需增加 Celery Worker 节点即可水平扩容。
课后思考:
目前前端依然需要通过定时轮询(Polling)来获取任务状态,这在一定程度上仍有延迟。你能否查阅资料,尝试在 FastAPI 中引入 WebSocket 或 Server-Sent Events (SSE),让后端在 Celery 任务完成的瞬间,主动向前端推送结果?
在下一讲(第14讲)中,我们将走出开发环境,进入项目打包与容器化部署阶段,学习如何用 Docker 将这套包含 FastAPI、Celery、Redis 的复杂架构一键部署到云服务器上!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)