欢迎来到《Python + AI Agent 实战开发完全指南》专栏!

在前面的课程中,我们使用 FastAPI 将多智能体系统封装为了 RESTful API。但在真实的生产环境中,AI 推理和多智能体协作往往需要几秒甚至几分钟的时间。如果用户在发起请求后一直等待 HTTP 响应,不仅体验极差,还容易导致网关超时(Timeout)或服务器线程阻塞。

在这一讲中,我们将引入工业界标准的解决方案:异步任务队列。我们将学习如何使用 Celery 结合 Redis,将耗时的 Agent 任务从 Web 服务中彻底剥离,实现真正的非阻塞架构。

学习目标:

  • 理解同步阻塞与异步任务队列的核心区别。
  • 掌握 Celery + Redis 的环境搭建与配置。
  • 将多智能体工作流改造为 Celery Task。
  • 实现任务的提交、状态轮询与结果获取机制。

一、 为什么要引入任务队列?

1.1 传统同步调用的痛点

在之前的 FastAPI 代码中,即使使用了 run_in_executor,本质上依然是“客户端发起请求 -> 服务端执行完AI任务 -> 返回结果”。

  • 用户体验差:浏览器转圈等待,容易误触刷新。
  • 资源浪费:Web 服务器的 Worker 线程被长时间占用,无法处理其他用户的健康检查或简单查询。
  • 单点故障风险:如果 AI 调用发生死锁,整个 Web 服务可能随之崩溃。

1.2 生产者-消费者模型

引入 Celery 后,我们的架构变为经典的分布式模型:

  1. Producer (FastAPI):只负责接收请求,把任务丢进 Redis 队列,立刻给用户返回一个 task_id(耗时 < 10ms)。
  2. Broker (Redis):作为消息中间件,安全地排队存储待执行的 Agent 任务。
  3. Consumer (Celery Worker):独立的后台进程,从队列中取出任务,调用 LangGraph 执行多智能体协作,并将结果写回 Redis。
  4. Client (前端/用户):拿着 task_id 定期轮询接口,直到任务完成再获取最终报告。

二、 环境准备与基础配置

2.1 安装核心依赖

pip install celery redis fastapi uvicorn langchain-openai python-dotenv

2.2 启动本地 Redis

如果你使用的是 Docker,可以一键启动:

docker run -d -p 6379:6379 --name agent-redis redis:latest

三、 核心实战:构建 Celery Agent Worker

3.1 定义 Celery 实例与任务 (celery_app.py)

我们将之前第11讲的多智能体逻辑封装为 Celery 任务:

import os
from celery import Celery
from team_state_agent import create_team_workflow # 导入我们封装好的多智能体工作流

# 初始化 Celery,指定 Redis 作为 Broker 和 Result Backend
app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1"
)

# 优化配置
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True, # 追踪任务开始时间
)

@app.task(bind=True, name="run_multi_agent_collaboration")
def run_multi_agent_collaboration(self, query: str, max_rounds: int = 3):
    """
    多智能体协作的异步任务
    bind=True 允许我们在任务内部访问 self,用于更新状态
    """
    try:
        # 更新任务状态为进行中
        self.update_state(state='PROGRESS', meta={'status': 'Initializing Agents...'})
        
        workflow = create_team_workflow()
        initial_state = {
            "query": query,
            "research_data": "",
            "report": "",
            "loop_count": 0
        }
        
        self.update_state(state='PROGRESS', meta={'status': 'Researcher is working...'})
        result = workflow.invoke(initial_state)
        
        return {
            "status": "success",
            "research_data": result.get("research_data"),
            "report": result.get("report"),
            "total_rounds": result.get("loop_count")
        }
        
    except Exception as e:
        # 捕获异常,防止 Worker 崩溃
        return {"status": "error", "message": str(e)}

3.2 启动 Worker 进程

打开一个新的终端窗口(保持 FastAPI 运行),启动消费者:

celery -A celery_app worker --loglevel=info --pool=solo
# 注意:Windows 下建议加 --pool=solo,Linux/Mac 默认即可

四、 改造 FastAPI:接入任务队列

现在,我们需要修改 FastAPI 的接口,使其只负责任务分发和状态查询。

# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
from celery_app import run_multi_agent_collaboration

app = FastAPI(title="Async Multi-Agent API")

class AgentRequest(BaseModel):
    query: str
    max_rounds: int = 3

# 1. 提交任务接口(毫秒级响应)
@app.post("/api/v1/agents/collaborate/async")
async def submit_task(request: AgentRequest):
    # .delay() 将任务推送到 Redis,立即返回 task_id
    task = run_multi_agent_collaboration.delay(request.query, request.max_rounds)
    return {
        "task_id": task.id,
        "status": "PENDING",
        "message": "Task has been queued."
    }

# 2. 查询任务状态接口
@app.get("/api/v1/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=run_multi_agent_collaboration.app)
    
    response = {
        "task_id": task_id,
        "status": result.status,
        "result": None
    }
    
    if result.status == "PROGRESS":
        response["progress"] = result.info.get("status")
    elif result.status == "SUCCESS":
        response["result"] = result.result
    elif result.status == "FAILURE":
        response["error"] = str(result.result)
        
    return response

五、测试用例

import requests
import json

url = "http://localhost:8000/api/v1/agents/collaborate/async"
data = {
    "query": "帮我写一份关于2024年AI Agent发展趋势的简报",
    "webhook_url": "http://your-webhook-url.com/callback"
}

response = requests.post(url, json=data)
print(json.dumps(response.json(), indent=2, ensure_ascii=False))

六、 进阶优化:生产级考量

在实际的企业落地中,仅仅跑通是不够的,还需要考虑以下工程细节:

  1. 结果过期策略:Redis 中的结果不能无限堆积。可以在 Celery 配置中添加 result_expires=3600,让一小时前的任务结果自动清除。
  2. 超时与重试机制:大模型 API 偶尔会限流或超时。可以使用 Celery 的 autoretry_for 参数:
  3. 并发控制:如果你的 LLM API 有严格的 QPS 限制,可以通过设置 Worker 的并发数(如 -c 2)或使用 celery-rate-limit 来平滑请求。

七、 本节小结与进阶展望

恭喜你!至此,你的 AI Agent 后端已经具备了企业级的健壮性。

  1. 彻底解耦:Web 服务与 AI 计算分离,互不影响。
  2. 高可用性:即使某个 Agent 任务卡死,也不会拖垮整个 API 网关。
  3. 弹性扩展:当任务量激增时,只需增加 Celery Worker 节点即可水平扩容。

课后思考:
目前前端依然需要通过定时轮询(Polling)来获取任务状态,这在一定程度上仍有延迟。你能否查阅资料,尝试在 FastAPI 中引入 WebSocketServer-Sent Events (SSE),让后端在 Celery 任务完成的瞬间,主动向前端推送结果?

在下一讲(第14讲)中,我们将走出开发环境,进入项目打包与容器化部署阶段,学习如何用 Docker 将这套包含 FastAPI、Celery、Redis 的复杂架构一键部署到云服务器上!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐