第12讲:Agent应用的后端工程化基建——异步并发控制与Web服务化封装
·
欢迎来到《Python + AI Agent 实战开发完全指南》专栏!
在本讲中,我们将学习如何将复杂的多智能体协作系统封装为高性能的RESTful API服务,实现从本地脚本到云服务的架构升级。
一、工程化架构设计
1.1 分层架构思想
我们将采用经典的分层架构设计:
- 业务逻辑层(Model):包含多智能体协作的核心逻辑
- 接口控制层(Controller):处理HTTP请求和响应
- 数据传输层(DTO):定义API的请求和响应模型
1.2 文件结构设计
multi-agent-api/
├── team_state_agent.py # 多智能体业务逻辑
├── main.py # FastAPI服务入口
├── requirements.txt # 依赖管理
└── README.md # 项目说明
二、核心业务逻辑实现
2.1 创建多智能体模块 (team_state_agent.py)
# team_state_agent.py
import os
from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage, SystemMessage
# Initialize the LLM client.
# The API key is read from the environment instead of being hard-coded, so
# credentials never end up in source control or in published articles.
# base_url and model keep their previous values as defaults but can be
# overridden without editing code.
llm = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv(
        "DASHSCOPE_BASE_URL",
        "https://dashscope.aliyuncs.com/compatible-mode/v1",
    ),
    model=os.getenv("LLM_MODEL", "qwen-turbo"),
)
# Global, structured state shared by every node of the agent graph.
# Declared with the functional TypedDict syntax; this is equivalent to the
# class-based form (same name, same keys, all keys required).
TeamState = TypedDict(
    "TeamState",
    {
        "query": str,          # the user's original request
        "research_data": str,  # facts gathered by the researcher node
        "report": str,         # final article produced by the writer node
        "loop_count": int,     # loop counter, incremented by update_loop_node
    },
)
# Researcher node
def researcher_node(state: TeamState):
    """Researcher agent: turn the raw user query into bullet-point facts.

    Only ``state["query"]`` is read. The LLM reply text is returned as a
    partial state update under the ``research_data`` key.
    """
    messages = [
        SystemMessage(content="你是一个严谨的行业研究员。请根据用户需求搜集客观事实和数据,以要点列表形式返回。不要写文章。"),
        HumanMessage(content=state["query"]),
    ]
    reply = llm.invoke(messages)
    return {"research_data": reply.content}
# Writer node
def writer_node(state: TeamState):
    """Writer agent: draft the final article from the researcher's notes.

    Only ``state["research_data"]`` is read. The LLM reply text is returned
    as a partial state update under the ``report`` key.
    """
    notes = state['research_data']
    messages = [
        SystemMessage(content="你是一个资深科技媒体主编。请仅根据提供的研究要点撰写一篇引人入胜的文章。绝对不要输出任何关于调度或流程的词汇。"),
        HumanMessage(content=f"研究要点如下:\n{notes}"),
    ]
    reply = llm.invoke(messages)
    return {"report": reply.content}
# Loop-counter node
def update_loop_node(state: TeamState):
    """Return a state update that bumps ``loop_count`` by one (missing counts as 0)."""
    current = state.get("loop_count", 0)
    return {"loop_count": current + 1}
# Build the multi-agent workflow
def create_team_workflow(max_rounds: int = 3):
    """Build and compile the researcher -> writer multi-agent workflow.

    Flow::

        START -> researcher -> writer -> update_loop -> END
                     ^                        |
                     +---- (no report yet) ---+

    Every writer pass goes through ``update_loop`` before the routing
    decision. Previously ``writer`` had both a conditional edge and a plain
    edge, so it fanned out to two branches, and ``update_loop`` was a dead
    end. The loop now stops as soon as a report exists or after
    ``max_rounds`` passes, so an empty LLM reply cannot spin until the graph
    recursion limit is hit.

    Args:
        max_rounds: maximum number of researcher/writer passes. Must be >= 1.

    Returns:
        The compiled LangGraph graph; run it with ``.invoke(initial_state)``.

    Raises:
        ValueError: if ``max_rounds`` is less than 1.
    """
    if max_rounds < 1:
        raise ValueError(f"max_rounds must be >= 1, got {max_rounds!r}")

    def route_after_researcher(state: TeamState) -> str:
        # Without research material there is nothing to write about.
        return "writer" if state.get("research_data") else "end"

    def route_after_update(state: TeamState) -> str:
        # Finish once a report exists or the round budget is used up;
        # otherwise go back for another research pass.
        if state.get("report") or state.get("loop_count", 0) >= max_rounds:
            return "end"
        return "researcher"

    builder = StateGraph(TeamState)

    # Nodes
    builder.add_node("researcher", researcher_node)
    builder.add_node("writer", writer_node)
    builder.add_node("update_loop", update_loop_node)

    # Edges
    builder.add_edge(START, "researcher")
    builder.add_conditional_edges(
        "researcher",
        route_after_researcher,
        {"writer": "writer", "end": END},
    )
    # Each writer pass is counted before deciding whether to stop.
    builder.add_edge("writer", "update_loop")
    builder.add_conditional_edges(
        "update_loop",
        route_after_update,
        {"end": END, "researcher": "researcher"},
    )

    return builder.compile()
2.2 创建API服务 (main.py)
# main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uuid
import asyncio
from datetime import datetime
from team_state_agent import TeamState, create_team_workflow
# Compile the multi-agent workflow once at import time; the endpoints below
# all reuse this single instance.
team_workflow = create_team_workflow()

# FastAPI application. The title/description/version are shown in the
# generated API docs (/docs and /redoc).
app = FastAPI(
    version="1.0.0",
    title="Multi-Agent Collaboration API",
    description="企业级多智能体协作系统API",
)
# Request body for the synchronous collaboration endpoint.
class AgentRequest(BaseModel):
    query: str  # the user's request; passed to the researcher node as-is
    # NOTE(review): max_rounds is accepted but no endpoint in this file reads
    # it, so it does not currently limit the workflow.
    max_rounds: Optional[int] = 3
# Request body for the async endpoint: AgentRequest plus an optional callback URL.
class AsyncAgentRequest(AgentRequest):
    webhook_url: Optional[str] = None  # when set, the run outcome is POSTed here as JSON
# Response model used by the synchronous and status endpoints.
class AgentResponse(BaseModel):
    session_id: str  # uuid4 string generated per request
    status: str  # values used in this file: "success", "error", "in_progress"
    result: Optional[Dict[str, Any]] = None  # payload for success / status lookups
    error: Optional[str] = None  # exception message when status is "error"
    timestamp: datetime  # request start time (collaborate) or lookup time (get_status)
# In-memory registry of sessions, keyed by session_id.
# This is a plain module-level dict: it exists only inside this process, is
# lost on restart, and is not shared between multiple worker processes.
sessions: Dict[str, TeamState] = {}
@app.post("/api/v1/agents/collaborate", response_model=AgentResponse)
async def collaborate(request: AgentRequest):
    """
    Synchronous multi-agent collaboration endpoint.

    Runs the whole researcher -> writer workflow and waits for the result.
    While it runs, the session is registered in ``sessions`` so the status
    endpoint can see it; it is always removed afterwards.

    Raises:
        HTTPException: status 500 if the workflow raises; ``detail`` holds a
            JSON-encoded AgentResponse with ``status="error"``.
    """
    from fastapi.encoders import jsonable_encoder

    session_id = str(uuid.uuid4())
    start_time = datetime.now()
    # NOTE(review): request.max_rounds is not forwarded to the workflow.
    initial_state = {
        "query": request.query,
        "research_data": "",
        "report": "",
        "loop_count": 0
    }
    sessions[session_id] = initial_state
    try:
        # invoke() is blocking (it makes LLM HTTP calls), so run it in the
        # default thread pool to keep the event loop free for other requests.
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            team_workflow.invoke,
            initial_state
        )
    except Exception as e:
        error_response = AgentResponse(
            session_id=session_id,
            status="error",
            error=str(e),
            timestamp=start_time
        )
        # FastAPI serializes HTTPException.detail with plain json.dumps, which
        # cannot handle the datetime field. jsonable_encoder converts it to an
        # ISO-8601 string and works with both pydantic v1 and v2.
        raise HTTPException(
            status_code=500,
            detail=jsonable_encoder(error_response)
        ) from e
    finally:
        # Clean up the session whether the run succeeded or failed.
        sessions.pop(session_id, None)

    return AgentResponse(
        session_id=session_id,
        status="success",
        result={
            "research_data": result.get("research_data"),
            "report": result.get("report"),
            "total_rounds": result.get("loop_count")
        },
        timestamp=start_time
    )
@app.post("/api/v1/agents/collaborate/async")
async def async_collaborate(
    request: AsyncAgentRequest,
    background_tasks: BackgroundTasks
):
    """
    Asynchronous multi-agent collaboration endpoint with an optional webhook.

    The workflow is queued as a background task and this endpoint returns
    immediately. The session stays in ``sessions`` until the run finishes,
    so the status endpoint reports it as in progress in the meantime. If
    ``webhook_url`` is set, the outcome is POSTed to it as JSON: "completed"
    with the final state, or "failed" with the error message.
    """
    session_id = str(uuid.uuid4())
    initial_state = {
        "query": request.query,
        "research_data": "",
        "report": "",
        "loop_count": 0
    }
    # Register before queueing, so a status lookup made right after this
    # response finds the session instead of returning 404.
    sessions[session_id] = initial_state

    def send_webhook(payload):
        """POST payload to the client's webhook, if one was supplied.

        Delivery errors are printed rather than raised. A broken webhook
        therefore cannot turn a successful run into a "failed" report, and
        cannot crash the background task.
        """
        if not request.webhook_url:
            return
        import requests
        # NOTE(review): webhook_url is client-supplied, so this server will
        # POST to any host it is given (SSRF risk); consider an allow-list.
        try:
            # Without a timeout, an unresponsive webhook would block this
            # background worker indefinitely.
            requests.post(request.webhook_url, json=payload, timeout=10)
        except requests.RequestException as exc:
            print(f"Webhook delivery failed for session {session_id}: {exc}")

    def run_collaboration():
        try:
            result = team_workflow.invoke(initial_state)
        except Exception as e:
            outcome = {
                "session_id": session_id,
                "status": "failed",
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
        else:
            outcome = {
                "session_id": session_id,
                "status": "completed",
                "result": result,
                "timestamp": datetime.now().isoformat()
            }
        finally:
            # The run is over; it is no longer "in progress".
            sessions.pop(session_id, None)
        send_webhook(outcome)

    # Sync function: BackgroundTasks runs it in a thread pool after the response is sent.
    background_tasks.add_task(run_collaboration)
    return {
        "session_id": session_id,
        "status": "processing",
        "message": "Task has been queued for processing",
        "timestamp": datetime.now().isoformat()
    }
@app.get("/api/v1/agents/status/{session_id}", response_model=AgentResponse)
async def get_status(session_id: str):
    """
    Return the live state of a session that is still registered.

    Unknown or already-finished sessions yield a 404.
    """
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    state = sessions[session_id]
    progress = f"Loop {state.get('loop_count', 0)}"
    return AgentResponse(
        session_id=session_id,
        status="in_progress",
        result={"current_state": state, "progress": progress},
        timestamp=datetime.now(),
    )
@app.get("/health")
async def health_check():
    """
    Health-check endpoint: always reports "healthy" plus the server's local time.
    """
    now = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": now}
# CORS middleware.
# FastAPI/Starlette document that allow_origins=["*"] cannot be combined with
# allow_credentials=True. In that combination Starlette echoes back any
# Origin on requests that carry cookies, which effectively grants every site
# credentialed access. Origins are therefore configurable through the
# CORS_ALLOW_ORIGINS environment variable (comma-separated, default "*"), and
# credentials are enabled only when an explicit origin list is configured.
import os

from fastapi.middleware.cors import CORSMiddleware

_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Global exception handler.
# Starlette requires exception handlers to return a Response object. A plain
# dict fails while the error response is being sent, so the payload is
# wrapped in a JSONResponse with an explicit 500 status.
from fastapi.responses import JSONResponse


@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
    """Convert any unhandled exception into a JSON 500 response.

    NOTE(review): ``str(exc)`` is sent to the client and may leak internal
    details; consider logging it server-side and returning a generic message.
    """
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": str(exc),
            "timestamp": datetime.now().isoformat()
        },
    )
# Request logging middleware.
import time


@app.middleware("http")
async def log_requests(request, call_next):
    """Print method, URL, status code and handling time for every HTTP request.

    Durations are measured with time.perf_counter(), a monotonic clock.
    Differences of datetime.now() can be skewed, or even go negative, when
    the system wall clock is adjusted.
    """
    started = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - started
    print(f"Request: {request.method} {request.url} | "
          f"Status: {response.status_code} | "
          f"Time: {process_time:.2f}s")
    return response
if __name__ == "__main__":
    # Development entry point; equivalent to
    # `uvicorn main:app --reload --host 0.0.0.0 --port 8000`.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)
三、依赖管理
3.1 创建依赖文件 (requirements.txt)
fastapi>=0.68.0
uvicorn>=0.15.0
pydantic>=1.8.2
langchain>=0.1.0
langchain-openai>=0.1.0
langgraph>=0.1.0
requests>=2.26.0
3.2 安装依赖
pip install -r requirements.txt
四、部署与运行
4.1 项目启动
# 方式1:直接运行
python main.py
# 方式2:使用uvicorn
uvicorn main:app --reload --host 0.0.0.0 --port 8000
4.2 API文档访问
服务启动后,访问以下地址查看自动生成的API文档:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
五、测试用例
import requests
import json

url = "http://localhost:8000/api/v1/agents/collaborate"
data = {
    "query": "帮我写一份关于2024年AI Agent发展趋势的简报"
}
# The synchronous endpoint blocks until both agents have finished, so use a
# generous timeout. Without one, requests waits forever on a stuck server.
response = requests.post(url, json=data, timeout=300)
# Show the status first, so a 500 error body is not mistaken for a result.
print(f"HTTP {response.status_code}")
print(json.dumps(response.json(), indent=2, ensure_ascii=False))
六、本节小结
在本讲中,我们完成了多智能体系统的工程化部署:
- 模块化设计:将业务逻辑与API接口分离,提高了代码的可维护性
- 异步处理:借助FastAPI的异步支持,并通过 run_in_executor 将阻塞的工作流调用放入线程池执行,避免阻塞事件循环,从而提升了服务的并发处理能力
- 状态管理:实现了会话状态的维护和清理机制(当前基于进程内内存,仅适用于单进程部署)
- 错误处理:建立了统一的错误处理和响应格式
- 扩展功能:支持异步任务处理和Webhook回调
通过这种工程化架构,我们的多智能体系统已经具备了企业级应用的基础能力;在投入生产环境之前,还需要补充课后思考中提到的持久化会话存储、身份验证、限流与监控等能力,才能稳定地集成到各种业务场景中。
课后思考:
- 如何使用Redis替代内存存储来实现分布式会话管理?
- 如何为API添加身份验证和限流功能?
- 如何监控API的性能指标并设置告警?
在下一讲(第13讲)中,我们将学习如何构建前端界面,实现完整的全栈AI应用!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)