18.1-大模型智能体开发工程师:从零构建多Agent协作系统
·
八、完整实战:从零构建多Agent协作系统
8.1 实战目标
构建一个"技术方案评审"多Agent系统:
用户输入: 一个技术方案描述
系统输出: 多角度的评审报告
Agent团队:
1. 架构师Agent: 评审架构设计的合理性
2. 安全专家Agent: 评审安全风险
3. 性能专家Agent: 评审性能瓶颈
4. 汇总Agent: 整合所有评审意见,生成最终报告
架构模式: 管理者模式(汇总Agent作为管理者)
通信方式: 共享状态
8.2 完整实现
"""
完整实战: 技术方案评审多Agent系统
依赖: pip install openai
"""
from openai import OpenAI
from dataclasses import dataclass, field
from datetime import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
client = OpenAI()
# ═══════════════════════════════════════════════════════════════════
# Part 1: Agent定义
# ═══════════════════════════════════════════════════════════════════
@dataclass
class ReviewAgent:
"""评审Agent"""
name: str
role: str
system_prompt: str
review_dimensions: list[str] # 评审维度
def review(self, proposal: str) -> dict:
"""对技术方案进行评审"""
dimensions_text = "\n".join([f" {i+1}. {d}" for i, d in enumerate(self.review_dimensions)])
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": f"""请从你的专业角度评审以下技术方案:
{proposal}
请从以下维度进行评审:
{dimensions_text}
输出JSON格式:
{{
"score": 1-10的评分,
"summary": "一句话总结评审结论",
"issues": [
{{"severity": "high/medium/low", "description": "问题描述", "suggestion": "改进建议"}}
],
"highlights": ["方案的亮点1", "方案的亮点2"]
}}"""}
],
response_format={"type": "json_object"},
temperature=0.3
)
try:
result = json.loads(response.choices[0].message.content)
result["reviewer"] = self.name
result["role"] = self.role
return result
except:
return {
"reviewer": self.name,
"role": self.role,
"score": 5,
"summary": "评审过程出错",
"issues": [],
"highlights": []
}
# ═══════════════════════════════════════════════════════════════════
# Part 2: 评审团队
# ═══════════════════════════════════════════════════════════════════
class ReviewTeam:
"""评审团队: 管理多个评审Agent"""
def __init__(self):
self.reviewers = self._create_reviewers()
def _create_reviewers(self) -> list[ReviewAgent]:
"""创建评审团队"""
return [
ReviewAgent(
name="架构师",
role="系统架构评审",
system_prompt="""你是一位资深系统架构师,有15年分布式系统设计经验。
你擅长评估系统架构的合理性、可扩展性和可维护性。
你的评审风格: 严谨但建设性,指出问题的同时给出改进方案。""",
review_dimensions=[
"架构分层是否合理",
"组件职责是否清晰",
"可扩展性设计",
"技术选型是否合适",
"是否存在过度设计或设计不足"
]
),
ReviewAgent(
name="安全专家",
role="安全风险评审",
system_prompt="""你是一位信息安全专家,专注于应用安全和基础设施安全。
你擅长发现潜在的安全漏洞、数据泄露风险和权限问题。
你的评审风格: 从攻击者视角思考,找出最薄弱的环节。""",
review_dimensions=[
"认证和授权机制",
"数据安全(传输和存储)",
"输入验证和注入防护",
"敏感信息保护",
"第三方依赖的安全性"
]
),
ReviewAgent(
name="性能专家",
role="性能瓶颈评审",
system_prompt="""你是一位性能工程专家,专注于高并发系统的性能优化。
你擅长识别性能瓶颈、评估系统容量和优化方案。
你的评审风格: 用数据说话,关注P99延迟和吞吐量。""",
review_dimensions=[
"是否存在性能瓶颈",
"数据库查询效率",
"缓存策略是否合理",
"并发处理能力",
"资源使用效率"
]
),
]
def review(self, proposal: str, parallel: bool = True) -> dict:
"""执行评审(支持并行)"""
print(f"📋 开始评审,共 {len(self.reviewers)} 位评审专家")
print(f" 并行模式: {'是' if parallel else '否'}")
reviews = []
if parallel:
# 并行执行所有评审
with ThreadPoolExecutor(max_workers=len(self.reviewers)) as executor:
futures = {
executor.submit(reviewer.review, proposal): reviewer
for reviewer in self.reviewers
}
for future in as_completed(futures):
reviewer = futures[future]
result = future.result()
reviews.append(result)
print(f" ✅ {reviewer.name} 评审完成 (评分: {result.get('score', '?')}/10)")
else:
# 串行执行
for reviewer in self.reviewers:
print(f" → {reviewer.name} 正在评审...")
result = reviewer.review(proposal)
reviews.append(result)
print(f" ✅ {reviewer.name} 评审完成 (评分: {result.get('score', '?')}/10)")
# 汇总评审结果
print(f"\n📊 汇总评审结果...")
summary = self._synthesize(proposal, reviews)
return {
"individual_reviews": reviews,
"summary": summary,
"timestamp": datetime.now().isoformat()
}
def _synthesize(self, proposal: str, reviews: list[dict]) -> dict:
"""汇总所有评审意见"""
reviews_text = ""
for r in reviews:
reviews_text += f"\n--- {r['reviewer']}({r['role']}) 评分:{r.get('score','?')}/10 ---\n"
reviews_text += f"总结: {r.get('summary', '')}\n"
if r.get('issues'):
reviews_text += "问题:\n"
for issue in r['issues']:
reviews_text += f" [{issue.get('severity','')}] {issue.get('description','')}\n"
if r.get('highlights'):
reviews_text += f"亮点: {', '.join(r.get('highlights', []))}\n"
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""请汇总以下多位专家的评审意见,生成最终评审报告:
技术方案:
{proposal[:500]}
各专家评审:
{reviews_text}
输出JSON格式:
{{
"overall_score": 综合评分(1-10),
"verdict": "通过/有条件通过/需要修改/不通过",
"key_risks": ["最关键的风险1", "风险2"],
"must_fix": ["必须修改的问题1", "问题2"],
"nice_to_have": ["建议优化的点1", "点2"],
"final_comment": "给方案作者的综合建议(2-3句话)"
}}"""
}],
response_format={"type": "json_object"},
temperature=0.3
)
try:
return json.loads(response.choices[0].message.content)
except:
return {"overall_score": 5, "verdict": "需要修改", "final_comment": "汇总过程出错"}
# ═══════════════════════════════════════════════════════════════════
# Part 3: 使用示例
# ═══════════════════════════════════════════════════════════════════
if __name__ == "__main__":
team = ReviewTeam()
proposal = """
技术方案: 订单服务重构
1. 架构: 从单体拆分为微服务,使用gRPC通信
2. 数据库: 从MySQL迁移到PostgreSQL,引入读写分离
3. 缓存: 使用Redis缓存热点订单数据,TTL=5分钟
4. 消息队列: 引入Kafka处理异步任务(发送通知、更新统计)
5. API网关: 使用Kong作为API网关,统一认证和限流
6. 部署: Kubernetes部署,HPA自动扩缩容
"""
result = team.review(proposal, parallel=True)
print("\n" + "="*60)
print("📝 最终评审报告")
print("="*60)
summary = result["summary"]
print(f"\n综合评分: {summary.get('overall_score', '?')}/10")
print(f"评审结论: {summary.get('verdict', '?')}")
print(f"\n关键风险:")
for risk in summary.get('key_risks', []):
print(f" ⚠️ {risk}")
print(f"\n必须修改:")
for fix in summary.get('must_fix', []):
print(f" 🔴 {fix}")
print(f"\n建议优化:")
for nice in summary.get('nice_to_have', []):
print(f" 🟡 {nice}")
print(f"\n综合建议: {summary.get('final_comment', '')}")
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)