欢迎来到《Python + AI Agent 实战开发完全指南》专栏!

在本讲中,我们将学习如何将复杂的多智能体协作系统封装为高性能的RESTful API服务,实现从本地脚本到云服务的架构升级。

一、工程化架构设计

1.1 分层架构思想

我们将采用经典的分层架构设计:

  • 业务逻辑层(Model):包含多智能体协作的核心逻辑
  • 接口控制层(Controller):处理HTTP请求和响应
  • 数据传输层(DTO):定义API的请求和响应模型

1.2 文件结构设计

multi-agent-api/
├── team_state_agent.py    # 多智能体业务逻辑
├── main.py              # FastAPI服务入口
├── requirements.txt     # 依赖管理
└── README.md            # 项目说明

二、核心业务逻辑实现

2.1 创建多智能体模块 (team_state_agent.py)

# team_state_agent.py
import os
from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage, SystemMessage

# 初始化LLM
llm = ChatOpenAI(
    api_key="sk-XXXXXXXXXXXXXXXXXXXXXXXXXXXX", 
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", 
    model="qwen-turbo"
)

# 定义全局共享的结构化状态
class TeamState(TypedDict):
    query: str              # 用户的原始需求
    research_data: str      # 研究员收集的客观事实
    report: str             # 作家生成的最终文章
    loop_count: int         # 循环计数器

# 研究员节点
def researcher_node(state: TeamState):
    """研究员:只关注原始需求,输出研究资料"""
    prompt = [SystemMessage(content="你是一个严谨的行业研究员。请根据用户需求搜集客观事实和数据,以要点列表形式返回。不要写文章。")]
    prompt.append(HumanMessage(content=state["query"]))
    
    response = llm.invoke(prompt)
    return {"research_data": response.content}

# 作家节点
def writer_node(state: TeamState):
    """作家:只关注研究资料,输出最终文章"""
    prompt = [SystemMessage(content="你是一个资深科技媒体主编。请仅根据提供的研究要点撰写一篇引人入胜的文章。绝对不要输出任何关于调度或流程的词汇。")]
    prompt.append(HumanMessage(content=f"研究要点如下:\n{state['research_data']}"))
    
    response = llm.invoke(prompt)
    return {"report": response.content}

# 更新循环计数节点
def update_loop_node(state: TeamState):
    return {"loop_count": state.get("loop_count", 0) + 1}

# 创建多智能体工作流
def create_team_workflow():
    """创建并返回配置好的多智能体工作流"""
    builder = StateGraph(TeamState)
    
    # 添加节点
    builder.add_node("researcher", researcher_node)
    builder.add_node("writer", writer_node)
    builder.add_node("update_loop", update_loop_node)
    
    # 配置工作流
    builder.add_edge(START, "researcher")
    
    # 研究员完成后判断下一步
    builder.add_conditional_edges(
        "researcher", 
        lambda state: "writer" if state.get("research_data") else END,
        {"writer": "writer", END: END}
    )
    
    # 作家完成后判断是否需要继续
    builder.add_conditional_edges(
        "writer", 
        lambda state: END if state.get("report") else "researcher",
        {END: END, "researcher": "researcher"}
    )
    
    # 作家处理后更新循环计数
    builder.add_edge("writer", "update_loop")
    
    # 返回编译后的工作流
    return builder.compile()

2.2 创建API服务 (main.py)

# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uuid
import asyncio
from datetime import datetime
from team_state_agent import TeamState, create_team_workflow

app = FastAPI(
    title="Multi-Agent Collaboration API",
    description="企业级多智能体协作系统API",
    version="1.0.0"
)

# 创建多智能体工作流实例
team_workflow = create_team_workflow()

# 定义请求模型
class AgentRequest(BaseModel):
    query: str
    max_rounds: Optional[int] = 3

# 定义异步请求模型
class AsyncAgentRequest(AgentRequest):
    webhook_url: Optional[str] = None

# 定义响应模型
class AgentResponse(BaseModel):
    session_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    timestamp: datetime

# 在内存中维护会话状态
sessions: Dict[str, TeamState] = {}

@app.post("/api/v1/agents/collaborate", response_model=AgentResponse)
async def collaborate(request: AgentRequest):
    """
    同步多智能体协作API端点
    """
    session_id = str(uuid.uuid4())
    start_time = datetime.now()
    
    try:
        # 初始化会话状态
        initial_state = {
            "query": request.query,
            "research_data": "",
            "report": "",
            "loop_count": 0
        }
        sessions[session_id] = initial_state
        
        # 异步执行多智能体协作
        result = await asyncio.get_event_loop().run_in_executor(
            None, 
            team_workflow.invoke, 
            initial_state
        )
        
        # 构建成功响应
        response = AgentResponse(
            session_id=session_id,
            status="success",
            result={
                "research_data": result.get("research_data"),
                "report": result.get("report"),
                "total_rounds": result.get("loop_count")
            },
            timestamp=start_time
        )
        
        return response
        
    except Exception as e:
        # 构建错误响应
        error_response = AgentResponse(
            session_id=session_id,
            status="error",
            error=str(e),
            timestamp=start_time
        )
        raise HTTPException(status_code=500, detail=error_response.dict())
    
    finally:
        # 清理会话
        if session_id in sessions:
            del sessions[session_id]

@app.post("/api/v1/agents/collaborate/async")
async def async_collaborate(
    request: AsyncAgentRequest, 
    background_tasks: BackgroundTasks
):
    """
    异步多智能体协作API,支持Webhook回调
    """
    session_id = str(uuid.uuid4())
    
    def run_collaboration():
        try:
            # 执行多智能体协作
            initial_state = {
                "query": request.query,
                "research_data": "",
                "report": "",
                "loop_count": 0
            }
            result = team_workflow.invoke(initial_state)
            
            # 如果配置了Webhook,发送结果
            if request.webhook_url:
                import requests
                requests.post(request.webhook_url, json={
                    "session_id": session_id,
                    "status": "completed",
                    "result": result,
                    "timestamp": datetime.now().isoformat()
                })
                
        except Exception as e:
            if request.webhook_url:
                import requests
                requests.post(request.webhook_url, json={
                    "session_id": session_id,
                    "status": "failed",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat()
                })
    
    # 添加到后台任务
    background_tasks.add_task(run_collaboration)
    
    return {
        "session_id": session_id,
        "status": "processing",
        "message": "Task has been queued for processing",
        "timestamp": datetime.now().isoformat()
    }

@app.get("/api/v1/agents/status/{session_id}", response_model=AgentResponse)
async def get_status(session_id: str):
    """
    查询会话状态的API端点
    """
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    
    session = sessions[session_id]
    return AgentResponse(
        session_id=session_id,
        status="in_progress",
        result={
            "current_state": session,
            "progress": f"Loop {session.get('loop_count', 0)}"
        },
        timestamp=datetime.now()
    )

@app.get("/health")
async def health_check():
    """
    健康检查端点
    """
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

# 添加中间件
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 全局异常处理器
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    return {
        "error": "Internal server error",
        "message": str(exc),
        "timestamp": datetime.now().isoformat()
    }

# 请求日志中间件
@app.middleware("http")
async def log_requests(request, call_next):
    start_time = datetime.now()
    response = await call_next(request)
    process_time = (datetime.now() - start_time).total_seconds()
    
    print(f"Request: {request.method} {request.url} | "
          f"Status: {response.status_code} | "
          f"Time: {process_time:.2f}s")
    
    return response

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

三、依赖管理

3.1 创建依赖文件 (requirements.txt)

fastapi>=0.68.0
uvicorn>=0.15.0
pydantic>=1.8.2
langchain>=0.1.0
langchain-openai>=0.1.0
langgraph>=0.1.0
requests>=2.26.0

3.2 安装依赖

pip install -r requirements.txt

四、部署与运行

4.1 项目启动

# 方式1:直接运行
python main.py

# 方式2:使用uvicorn
uvicorn main:app --reload --host 0.0.0.0 --port 8000

4.2 API文档访问

服务启动后,访问以下地址查看自动生成的API文档:

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

五、测试用例

import requests
import json

url = "http://localhost:8000/api/v1/agents/collaborate"
data = {
    "query": "帮我写一份关于2024年AI Agent发展趋势的简报"
}

response = requests.post(url, json=data)
print(json.dumps(response.json(), indent=2, ensure_ascii=False))

六、本节小结

在本讲中,我们完成了多智能体系统的工程化部署:

  1. 模块化设计:将业务逻辑与API接口分离,提高了代码的可维护性
  2. 异步处理:使用FastAPI的异步支持,提升了服务的并发处理能力
  3. 状态管理:实现了会话状态的维护和清理机制
  4. 错误处理:建立了统一的错误处理和响应格式
  5. 扩展功能:支持异步任务处理和Webhook回调

通过这种工程化架构,我们的多智能体系统已经具备了企业级应用的完整能力,可以轻松集成到各种业务场景中。

课后思考

  1. 如何使用Redis替代内存存储来实现分布式会话管理?
  2. 如何为API添加身份验证和限流功能?
  3. 如何监控API的性能指标并设置告警?

在下一讲(第13讲)中,我们将学习如何构建前端界面,实现完整的全栈AI应用!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐