八、完整实战:从零构建多Agent协作系统

8.1 实战目标

构建一个"技术方案评审"多Agent系统:

用户输入: 一个技术方案描述
系统输出: 多角度的评审报告

Agent团队:
  1. 架构师Agent: 评审架构设计的合理性
  2. 安全专家Agent: 评审安全风险
  3. 性能专家Agent: 评审性能瓶颈
  4. 汇总Agent: 整合所有评审意见,生成最终报告

架构模式: 管理者模式(汇总Agent作为管理者)
通信方式: 共享状态

8.2 完整实现

"""
完整实战: 技术方案评审多Agent系统
依赖: pip install openai
"""

from openai import OpenAI
from dataclasses import dataclass, field
from datetime import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

client = OpenAI()


# ═══════════════════════════════════════════════════════════════════
# Part 1: Agent定义
# ═══════════════════════════════════════════════════════════════════

@dataclass
class ReviewAgent:
    """评审Agent"""
    name: str
    role: str
    system_prompt: str
    review_dimensions: list[str]  # 评审维度
    
    def review(self, proposal: str) -> dict:
        """对技术方案进行评审"""
        dimensions_text = "\n".join([f"  {i+1}. {d}" for i, d in enumerate(self.review_dimensions)])
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": f"""请从你的专业角度评审以下技术方案:

{proposal}

请从以下维度进行评审:
{dimensions_text}

输出JSON格式:
{{
  "score": 1-10的评分,
  "summary": "一句话总结评审结论",
  "issues": [
    {{"severity": "high/medium/low", "description": "问题描述", "suggestion": "改进建议"}}
  ],
  "highlights": ["方案的亮点1", "方案的亮点2"]
}}"""}
            ],
            response_format={"type": "json_object"},
            temperature=0.3
        )
        
        try:
            result = json.loads(response.choices[0].message.content)
            result["reviewer"] = self.name
            result["role"] = self.role
            return result
        except:
            return {
                "reviewer": self.name,
                "role": self.role,
                "score": 5,
                "summary": "评审过程出错",
                "issues": [],
                "highlights": []
            }


# ═══════════════════════════════════════════════════════════════════
# Part 2: 评审团队
# ═══════════════════════════════════════════════════════════════════

class ReviewTeam:
    """评审团队: 管理多个评审Agent"""
    
    def __init__(self):
        self.reviewers = self._create_reviewers()
    
    def _create_reviewers(self) -> list[ReviewAgent]:
        """创建评审团队"""
        return [
            ReviewAgent(
                name="架构师",
                role="系统架构评审",
                system_prompt="""你是一位资深系统架构师,有15年分布式系统设计经验。
你擅长评估系统架构的合理性、可扩展性和可维护性。
你的评审风格: 严谨但建设性,指出问题的同时给出改进方案。""",
                review_dimensions=[
                    "架构分层是否合理",
                    "组件职责是否清晰",
                    "可扩展性设计",
                    "技术选型是否合适",
                    "是否存在过度设计或设计不足"
                ]
            ),
            ReviewAgent(
                name="安全专家",
                role="安全风险评审",
                system_prompt="""你是一位信息安全专家,专注于应用安全和基础设施安全。
你擅长发现潜在的安全漏洞、数据泄露风险和权限问题。
你的评审风格: 从攻击者视角思考,找出最薄弱的环节。""",
                review_dimensions=[
                    "认证和授权机制",
                    "数据安全(传输和存储)",
                    "输入验证和注入防护",
                    "敏感信息保护",
                    "第三方依赖的安全性"
                ]
            ),
            ReviewAgent(
                name="性能专家",
                role="性能瓶颈评审",
                system_prompt="""你是一位性能工程专家,专注于高并发系统的性能优化。
你擅长识别性能瓶颈、评估系统容量和优化方案。
你的评审风格: 用数据说话,关注P99延迟和吞吐量。""",
                review_dimensions=[
                    "是否存在性能瓶颈",
                    "数据库查询效率",
                    "缓存策略是否合理",
                    "并发处理能力",
                    "资源使用效率"
                ]
            ),
        ]
    
    def review(self, proposal: str, parallel: bool = True) -> dict:
        """执行评审(支持并行)"""
        print(f"📋 开始评审,共 {len(self.reviewers)} 位评审专家")
        print(f"   并行模式: {'是' if parallel else '否'}")
        
        reviews = []
        
        if parallel:
            # 并行执行所有评审
            with ThreadPoolExecutor(max_workers=len(self.reviewers)) as executor:
                futures = {
                    executor.submit(reviewer.review, proposal): reviewer 
                    for reviewer in self.reviewers
                }
                for future in as_completed(futures):
                    reviewer = futures[future]
                    result = future.result()
                    reviews.append(result)
                    print(f"   ✅ {reviewer.name} 评审完成 (评分: {result.get('score', '?')}/10)")
        else:
            # 串行执行
            for reviewer in self.reviewers:
                print(f"   → {reviewer.name} 正在评审...")
                result = reviewer.review(proposal)
                reviews.append(result)
                print(f"   ✅ {reviewer.name} 评审完成 (评分: {result.get('score', '?')}/10)")
        
        # 汇总评审结果
        print(f"\n📊 汇总评审结果...")
        summary = self._synthesize(proposal, reviews)
        
        return {
            "individual_reviews": reviews,
            "summary": summary,
            "timestamp": datetime.now().isoformat()
        }
    
    def _synthesize(self, proposal: str, reviews: list[dict]) -> dict:
        """汇总所有评审意见"""
        reviews_text = ""
        for r in reviews:
            reviews_text += f"\n--- {r['reviewer']}({r['role']}) 评分:{r.get('score','?')}/10 ---\n"
            reviews_text += f"总结: {r.get('summary', '')}\n"
            if r.get('issues'):
                reviews_text += "问题:\n"
                for issue in r['issues']:
                    reviews_text += f"  [{issue.get('severity','')}] {issue.get('description','')}\n"
            if r.get('highlights'):
                reviews_text += f"亮点: {', '.join(r.get('highlights', []))}\n"
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""请汇总以下多位专家的评审意见,生成最终评审报告:

技术方案:
{proposal[:500]}

各专家评审:
{reviews_text}

输出JSON格式:
{{
  "overall_score": 综合评分(1-10),
  "verdict": "通过/有条件通过/需要修改/不通过",
  "key_risks": ["最关键的风险1", "风险2"],
  "must_fix": ["必须修改的问题1", "问题2"],
  "nice_to_have": ["建议优化的点1", "点2"],
  "final_comment": "给方案作者的综合建议(2-3句话)"
}}"""
            }],
            response_format={"type": "json_object"},
            temperature=0.3
        )
        
        try:
            return json.loads(response.choices[0].message.content)
        except:
            return {"overall_score": 5, "verdict": "需要修改", "final_comment": "汇总过程出错"}


# ═══════════════════════════════════════════════════════════════════
# Part 3: 使用示例
# ═══════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    team = ReviewTeam()
    
    proposal = """
    技术方案: 订单服务重构
    
    1. 架构: 从单体拆分为微服务,使用gRPC通信
    2. 数据库: 从MySQL迁移到PostgreSQL,引入读写分离
    3. 缓存: 使用Redis缓存热点订单数据,TTL=5分钟
    4. 消息队列: 引入Kafka处理异步任务(发送通知、更新统计)
    5. API网关: 使用Kong作为API网关,统一认证和限流
    6. 部署: Kubernetes部署,HPA自动扩缩容
    """
    
    result = team.review(proposal, parallel=True)
    
    print("\n" + "="*60)
    print("📝 最终评审报告")
    print("="*60)
    
    summary = result["summary"]
    print(f"\n综合评分: {summary.get('overall_score', '?')}/10")
    print(f"评审结论: {summary.get('verdict', '?')}")
    print(f"\n关键风险:")
    for risk in summary.get('key_risks', []):
        print(f"  ⚠️ {risk}")
    print(f"\n必须修改:")
    for fix in summary.get('must_fix', []):
        print(f"  🔴 {fix}")
    print(f"\n建议优化:")
    for nice in summary.get('nice_to_have', []):
        print(f"  🟡 {nice}")
    print(f"\n综合建议: {summary.get('final_comment', '')}")
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐