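This article is based on AutoGen 0.4. It covers how multi-agent systems are built: development principles, core APIs, best practices, and production deployment.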


Chapter 1: AutoGen Framework Overview and Core Architecture

1.1 What AutoGen Is and How It Evolved

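AutoGen is an open-source multi-agent framework developed by Microsoft Research to simplify building agent systems powered by large language models (LLMs). Its core idea is that multiple AI agents complete complex tasks by collaborating through conversation.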

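AutoGen development timeline:

  • Late 2023: AutoGen 0.2 is released and becomes a reference framework for multi-agent systems
  • Mid 2024: an architectural redesign begins, introducing an asynchronous, event-driven model
  • Early 2025: AutoGen 0.4 is officially released, a complete rewrite with a layered architecture
  • Late 2025: AutoGen is merged with Semantic Kernel into the unified Microsoft Agent Framework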

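Core design principles

  • Conversation-driven: agents collaborate through natural-language dialogue rather than hard-coded workflows
  • Role-playing: each agent plays a specific role with its own system_message and capabilities
  • Human-AI collaboration: human-in-the-loop is supported, so a person can step into decisions at any time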

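1.2 Core Component Architecture

AutoGen 0.4 uses a layered design that runs from high-level abstractions down to low-level primitives, so each use case can work at the level it needs: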

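Architecture diagram (layers from top to bottom, with representative components):

  • Application layer: AutoGen Studio (visual development)
  • High-level API, AgentChat: AssistantAgent, UserProxyAgent, Team/GroupChat, TerminationCondition
  • Low-level API, Core: RoutedAgent, AgentRuntime, MessageContext, Subscription
  • Extensions layer: McpWorkbench, OpenAIAssistantAgent, GrpcWorkerAgentRuntime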

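Responsibilities of each layer

Layer | Component | Typical use cases | Abstraction level
Application | Studio | Rapid prototyping, no-code building | Highest
High-level API | AgentChat | Most application development | High
Low-level API | Core | Complex workflows, distributed systems | Low
Extensions | Extensions | Integrating external services, the MCP protocol |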

1.3 The Three API Layers in Detail

AgentChat layer (recommended for most developers)

AgentChat provides ready-to-use agent types and team collaboration patterns:

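# AgentChat example: the simplest possible agent
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def main():
    # Create the model client (prefer the OPENAI_API_KEY environment variable over a literal key)
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key="your-api-key"
    )
    
    # Create an assistant agent
    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="You are a helpful AI assistant."
    )
    
    # Run the agent on a single task
    result = await agent.run(task="Hello, please introduce yourself.")
    print(result.messages[-1].content)

    # Release the client's network resources when done
    await model_client.close()

asyncio.run(main())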

Core layer (for advanced developers)

The Core layer provides event-driven, low-level primitives for building complex, distributed agent systems:

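# Core example: an event-driven agent
import asyncio
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

# Define a message type
@dataclass
class TaskMessage:
    content: str

# Define the agent; RoutedAgent dispatches each message to the handler
# whose `message` type annotation matches the message's type
class WorkerAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Handles work tasks")  # the agent's description
    
    @message_handler
    async def handle_task(self, message: TaskMessage, ctx: MessageContext) -> None:
        # Task-handling logic
        print(f"Received task: {message.content}")

# Register and run
async def main():
    runtime = SingleThreadedAgentRuntime()
    # Register a factory under the agent type "worker"
    await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent())
    runtime.start()
    # Send a direct message to the "worker" agent with key "default"
    await runtime.send_message(TaskMessage("Process the report"), AgentId("worker", "default"))
    # Wait until every queued message has been processed, then stop
    await runtime.stop_when_idle()

asyncio.run(main())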
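To make the routing idea concrete, here is a stdlib-only sketch of dispatch by message type, loosely modeled on what RoutedAgent and @message_handler do. The names MiniRoutedAgent, handler, Worker, and PingMessage are hypothetical; autogen_core's real implementation differs (it also passes a MessageContext, supports match predicates, and delivers messages through the runtime), so read this as an illustration of the principle only.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, get_type_hints

# Hypothetical mini-router: NOT autogen_core's implementation, just the idea behind it.
def handler(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Mark a coroutine method as the handler for the type of its `message` parameter."""
    func._handles = get_type_hints(func)["message"]
    return func

class MiniRoutedAgent:
    def __init__(self) -> None:
        # Build a {message type -> bound handler} table from the decorated methods
        self._routes: Dict[type, Callable[..., Awaitable[Any]]] = {}
        for name in dir(type(self)):
            attr = getattr(self, name)
            msg_type = getattr(attr, "_handles", None)
            if msg_type is not None:
                self._routes[msg_type] = attr

    async def on_message(self, message: Any) -> Any:
        route = self._routes.get(type(message))
        if route is None:
            raise LookupError(f"no handler for {type(message).__name__}")
        return await route(message)

@dataclass
class TaskMessage:
    content: str

@dataclass
class PingMessage:
    pass

class Worker(MiniRoutedAgent):
    @handler
    async def handle_task(self, message: TaskMessage) -> str:
        return f"done: {message.content}"

    @handler
    async def handle_ping(self, message: PingMessage) -> str:
        return "pong"
```

Keying the table on the annotated type is what lets one agent class handle several message kinds without an if/elif chain.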

1.4 Async Programming Basics

AutoGen 0.4 is built entirely on an asynchronous architecture, so a working understanding of asyncio is a prerequisite:

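import asyncio

# Define an async function
async def fetch_data():
    """Async functions are defined with the async keyword"""
    await asyncio.sleep(1)  # simulate an I/O operation
    return "data fetched"

# Run several tasks concurrently
async def concurrent_tasks():
    """Use gather to run several coroutines concurrently"""
    results = await asyncio.gather(
        fetch_data(),
        fetch_data(),
        fetch_data()
    )
    print(f"Concurrent results: {results}")

# Async generator (used for streaming output)
async def stream_results():
    """An async generator combines async def with yield"""
    for i in range(5):
        await asyncio.sleep(0.1)
        yield f"result {i}"

# Consume the async generator
async def consume_stream():
    """Use async for to consume an async generator"""
    async for result in stream_results():
        print(result)

# Run the async programs
asyncio.run(concurrent_tasks())
asyncio.run(consume_stream())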

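Key concepts

  • async def: defines an asynchronous function (a coroutine)
  • await: waits for an asynchronous operation to finish
  • asyncio.gather(): runs several coroutines concurrently
  • async for: asynchronous iteration
  • asyncio.run(): runs the top-level coroutine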
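The practical payoff of asyncio.gather() is that waiting overlaps. This small stdlib sketch (fake_io, sequential, concurrent, and timed are illustrative names) compares three awaits in a row with the same three coroutines gathered: the sequential version takes roughly the sum of the delays, the gathered one roughly the longest delay, and gather still returns results in argument order.

```python
import asyncio
import time

async def fake_io(label: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a network call such as an LLM request
    return label

async def sequential() -> list:
    # Each await finishes before the next one starts
    return [await fake_io("a", 0.2), await fake_io("b", 0.2), await fake_io("c", 0.2)]

async def concurrent() -> list:
    # gather schedules all three at once; results keep the argument order
    return list(await asyncio.gather(fake_io("a", 0.2), fake_io("b", 0.2), fake_io("c", 0.2)))

def timed(coro_fn):
    """Run a coroutine function to completion and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start
```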

1.5 Installation and Environment Setup

Installing with pip
# Install the AgentChat package plus the OpenAI extension
pip install -U "autogen-agentchat" "autogen-ext[openai]"

# Install additional features (Docker code execution, Azure, etc.)
pip install -U "autogen-agentchat" "autogen-ext[openai,azure,docker]"

Installing with uv (recommended)
# uv is a faster Python package manager
pip install uv

# Create a project and add the dependencies
uv init my-autogen-project
cd my-autogen-project
uv add "autogen-agentchat" "autogen-ext[openai]"

Configuring environment variables
# Manage API keys through environment variables (requires: pip install python-dotenv)
import os
from dotenv import load_dotenv

# Load variables from the .env file
load_dotenv()

# Read the API key
api_key = os.getenv("OPENAI_API_KEY")

Example .env file:
OPENAI_API_KEY=sk-your-api-key-here
AZURE_OPENAI_API_KEY=your-azure-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
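os.getenv() silently returns None when a key is missing, so the failure only surfaces later as an authentication error deep inside a model call. A small fail-fast check at startup avoids that. require_env is a hypothetical helper, not part of AutoGen or python-dotenv:

```python
import os

def require_env(name: str) -> str:
    """Return an environment variable, or fail fast with an actionable message.

    Hypothetical helper: catches a missing key at startup instead of on the first API call.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file or shell environment")
    return value
```

Typical use after load_dotenv(): api_key = require_env("OPENAI_API_KEY").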

Chapter 2: The AgentChat Layer: Core Agent Types and Parameters

2.1 The Agent Class Hierarchy

AutoGen's agents follow a clear inheritance structure:

Class diagram:

  • BaseChatAgent «abstract»: name: str, description: str, produced_message_types, on_messages() → Response, on_reset() → None
  • AssistantAgent (extends BaseChatAgent): model_client: ChatCompletionClient, system_message: str, tools: list, handoffs: list
  • UserProxyAgent (extends BaseChatAgent): input_func (the v0.2 attributes human_input_mode and max_consecutive_auto_reply do not exist in 0.4)
  • CodeExecutorAgent (extends BaseChatAgent): code_executor: CodeExecutor
  • OpenAIAssistantAgent (extends BaseChatAgent, shipped in autogen_ext): assistant_id: str

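2.2 BaseChatAgent Core Methods

BaseChatAgent is the abstract base class of every agent and defines the core interface. A simplified view of it (subclasses must implement the abstract members):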

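from abc import ABC, abstractmethod
from typing import Sequence
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage
from autogen_core import CancellationToken

class BaseChatAgent(ABC):
    """Simplified sketch of the agent base class and its core interface"""

    def __init__(self, name: str, description: str) -> None:
        self._name = name                  # must be a valid Python identifier
        self._description = description

    @property
    def name(self) -> str:
        """Unique identifying name of the agent (concrete, set by the constructor)"""
        return self._name

    @property
    def description(self) -> str:
        """What the agent does; teams use it when choosing the next speaker"""
        return self._description

    @property
    @abstractmethod
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """
        Message types the agent may produce in Response.chat_message

        Used for type checking and message routing
        """
        ...

    @abstractmethod
    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """
        Handle incoming messages and return a response

        Args:
            messages: the new messages received since the previous call
            cancellation_token: token used to cancel the operation

        Returns:
            A Response object containing chat_message and inner_messages
        """
        ...

    @abstractmethod
    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """
        Reset the agent to its initial state

        Args:
            cancellation_token: cancellation token
        """
        ...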

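The Response object

from dataclasses import dataclass
from typing import Optional, Sequence
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage

# Simplified view of autogen_agentchat.base.Response
@dataclass
class Response:
    """Agent response object"""

    # Main output message, delivered to other agents or to the user
    chat_message: BaseChatMessage

    # Inner events for UI display or debugging,
    # e.g. ToolCallRequestEvent and ToolCallExecutionEvent
    inner_messages: Optional[Sequence[BaseAgentEvent | BaseChatMessage]] = None

    # Note: there is no "terminate" field; ending a conversation is the job of
    # termination conditions (or a StopMessage as the chat_message)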
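To show how run() relates to on_messages(), here is a stdlib-only sketch of the contract. Msg, Resp, Result, and EchoAgent are hypothetical stand-ins for TextMessage, Response, TaskResult, and a real agent. It assumes run() roughly wraps the task string in a user message, calls on_messages, and returns the task message followed by the response's inner messages and chat message; the real BaseChatAgent.run also handles cancellation and typed messages.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional, Sequence

# Hypothetical stand-ins for TextMessage, Response and TaskResult (stdlib only)
@dataclass
class Msg:
    content: str
    source: str

@dataclass
class Resp:
    chat_message: Msg
    inner_messages: Optional[List[Msg]] = None

@dataclass
class Result:
    messages: List[Msg] = field(default_factory=list)

class EchoAgent:
    def __init__(self, name: str) -> None:
        self.name = name
        self.history: List[Msg] = []  # agents are stateful between calls

    async def on_messages(self, messages: Sequence[Msg]) -> Resp:
        self.history.extend(messages)  # only the new messages arrive here
        return Resp(chat_message=Msg(f"echo: {messages[-1].content}", self.name))

    async def on_reset(self) -> None:
        self.history.clear()

    async def run(self, task: str) -> Result:
        # Roughly what run() does: wrap the task, call on_messages, collect everything
        task_msg = Msg(task, "user")
        response = await self.on_messages([task_msg])
        return Result(messages=[task_msg, *(response.inner_messages or []), response.chat_message])
```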

2.3 AssistantAgent Parameters

AssistantAgent is the most commonly used agent type and is driven by an LLM:

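from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Hypothetical example tools: plain (async) functions are wrapped as tools automatically;
# the docstring and type hints become the tool's description and argument schema
async def search_tool(query: str) -> str:
    """Search for information related to the query."""
    return f"Search results for: {query}"

async def analyze_tool(text: str) -> str:
    """Analyze a piece of text and return key findings."""
    return f"Analysis of {len(text)} characters"

# Create the model client
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key="your-api-key"
)

# Create the AssistantAgent
assistant = AssistantAgent(
    # === Required parameters ===
    name="researcher",                    # unique agent name, used in logs and message routing

    # === Model configuration ===
    model_client=model_client,            # model client instance

    # === Behavior configuration ===
    system_message="""You are a professional research assistant.
    Your responsibilities are:
    1. Analyze the user's research question
    2. Search for relevant information
    3. Deliver a structured research report

    Always stay objective and accurate.
    """,                                  # system prompt defining the agent's role and behavior

    description="Professional research assistant, skilled at information search and analysis",  # used by teams to select speakers

    # === Tool configuration ===
    tools=[search_tool, analyze_tool],    # tools the agent may call

    # === Handoff configuration ===
    handoffs=["writer", "reviewer"],      # agents this one can hand the task to (mainly used with the Swarm team)

    # === Advanced configuration ===
    reflect_on_tool_use=True,             # make one more model call to reflect on tool results
    tool_call_summary_format="{result}",  # summary format used when not reflecting
)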

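Parameter reference

Parameter | Type | Required | Default | Description
name | str | Yes | - | Unique agent name; must be a valid Python identifier (lowercase with underscores is recommended)
model_client | ChatCompletionClient | Yes | - | Model client; OpenAI, Azure OpenAI, and others are supported
system_message | str | No | Built-in helpful-assistant prompt | Defines the agent's role, capabilities, and rules of behavior
description | str | No | "An agent that provides assistance with ability to use tools." | Description used when a team selects the next speaker
tools | List[Tool or callable] | No | None | Tools the agent may call
handoffs | List[Handoff or str] | No | None | Agents the task can be handed off to
reflect_on_tool_use | bool | No | False | Whether the LLM reflects on the tool results after a tool call
tool_call_summary_format | str | No | "{result}" | Format of the tool-call result summary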

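2.4 UserProxyAgent Parameters

UserProxyAgent is the human-interaction interface: whenever it is asked to speak, it collects a reply from a person. In AutoGen 0.4 its constructor is much smaller than in v0.2:

from autogen_agentchat.agents import UserProxyAgent

user_proxy = UserProxyAgent(
    name="user",
    description="A human user",  # the default; teams use it when selecting speakers
    input_func=input,            # called to obtain the human's reply (default: console input)
)

# The v0.2 parameters human_input_mode, max_consecutive_auto_reply, and code_executor
# do not exist in 0.4 (code execution moved to CodeExecutorAgent, see 2.5).
# The v0.2 human_input_mode values map onto 0.4 as follows:
# NEVER: leave UserProxyAgent out of the team; the agents run fully automatically
# ALWAYS: include UserProxyAgent in the team; it asks for input every time it is selected
# TERMINATE: run the team until a termination condition fires, then collect feedback
#            from the human and pass it as the task of the next run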

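human_input_mode comparison (the three v0.2 modes as flow diagrams):

  • NEVER: the agent runs automatically with no human intervention; suited to automation
  • ALWAYS: the agent runs, requests human input, and the human replies, on every turn
  • TERMINATE: the agent runs; when a termination condition is reached, it requests human confirmation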
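The three flows boil down to one decision: is a human consulted on this turn? This stdlib sketch restates the v0.2 semantics as a single rule. HumanInputMode, should_ask_human, and turns_needing_human are illustrative names, not AutoGen 0.4 APIs.

```python
from enum import Enum

class HumanInputMode(Enum):
    NEVER = "NEVER"
    ALWAYS = "ALWAYS"
    TERMINATE = "TERMINATE"

def should_ask_human(mode: HumanInputMode, about_to_terminate: bool) -> bool:
    """Hypothetical decision rule mirroring the three flows above."""
    if mode is HumanInputMode.ALWAYS:
        return True                  # a human speaks on every turn
    if mode is HumanInputMode.TERMINATE:
        return about_to_terminate    # only confirm when the run would stop
    return False                     # NEVER: fully automatic

def turns_needing_human(mode: HumanInputMode, turns: int) -> int:
    # Count how often a human is consulted over a run whose last turn terminates
    return sum(should_ask_human(mode, about_to_terminate=(t == turns - 1)) for t in range(turns))
```

Over a five-turn run, NEVER consults the human zero times, ALWAYS five times, and TERMINATE once.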

2.5 CodeExecutorAgent and OpenAIAssistantAgent

CodeExecutorAgent

An agent dedicated to executing code:

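from autogen_agentchat.agents import CodeExecutorAgent
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor

# Create the code executor (runs code directly on the host; prefer a Docker executor for untrusted code)
code_executor = LocalCommandLineCodeExecutor(
    work_dir="./workspace",  # working directory
    timeout=60,              # execution timeout in seconds
)

# Create the code-executing agent; it runs the code blocks found in the messages it receives
code_agent = CodeExecutorAgent(
    name="code_executor",
    code_executor=code_executor,
)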

OpenAIAssistantAgent

An agent backed by the OpenAI Assistants API:

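from openai import AsyncOpenAI
from autogen_ext.agents.openai import OpenAIAssistantAgent

# This agent takes an openai SDK client plus a model name, not a ChatCompletionClient
openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

# Create the OpenAI Assistant agent
assistant = OpenAIAssistantAgent(
    name="openai_assistant",
    description="General-purpose assistant with a code interpreter",
    client=openai_client,
    model="gpt-4o",
    instructions="You are a helpful assistant.",
    tools=["code_interpreter"],  # built-in code interpreter tool
)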

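2.6 Comparing the Three human_input_mode Modes

human_input_mode no longer exists in 0.4, so this example reproduces each v0.2 mode with 0.4 building blocks: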

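import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def demonstrate_human_input_modes():
    """Show how the three v0.2 human_input_mode behaviors map onto AutoGen 0.4"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Create the assistant
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,
    )

    # Mode 1 (v0.2 NEVER): no human at all, fully automatic execution
    print("=== NEVER ===")
    result = await assistant.run(task="What is 1 + 1?")
    print(f"Result: {result.messages[-1].content}")

    # Mode 2 (v0.2 ALWAYS): a UserProxyAgent in the team is asked on every round
    print("=== ALWAYS ===")
    user_proxy = UserProxyAgent(name="user", input_func=input)
    team = RoundRobinGroupChat(
        participants=[assistant, user_proxy],
        termination_condition=TextMentionTermination("APPROVE"),  # the human types APPROVE to stop
    )
    await Console(team.run_stream(task="Write a haiku about autumn."))

    # Mode 3 (v0.2 TERMINATE): run without a human until the run stops,
    # then ask for feedback once and continue with it as the next task
    print("=== TERMINATE ===")
    solo_team = RoundRobinGroupChat(participants=[assistant], max_turns=1)
    await Console(solo_team.run_stream(task="Write a haiku about winter."))
    feedback = input("Feedback (leave empty to finish): ")
    if feedback:
        await Console(solo_team.run_stream(task=feedback))

    await model_client.close()

asyncio.run(demonstrate_human_input_modes())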

2.7 Developing a Custom Agent

Create a custom agent by subclassing BaseChatAgent:

import asyncio
from typing import Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage
from autogen_core import CancellationToken

class CustomAnalysisAgent(BaseChatAgent):
    """
    Example custom analysis agent

    Purpose: analyze input text and return a structured result
    """

    def __init__(self, name: str, analysis_type: str = "sentiment"):
        """
        Initialize the custom agent

        Args:
            name: agent name
            analysis_type: kind of analysis (sentiment/keyword/summary)
        """
        # The base class stores name and description and exposes them as properties
        super().__init__(
            name=name,
            description=f"Text analysis agent specialized in {analysis_type} analysis",
        )
        self._analysis_type = analysis_type
        self._message_count = 0

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Declare the message types this agent produces"""
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """
        Core message-handling method

        Args:
            messages: sequence of incoming messages
            cancellation_token: cancellation token

        Returns:
            A Response object
        """
        # Check for cancellation (is_cancelled is a method, not a property)
        if cancellation_token.is_cancelled():
            raise asyncio.CancelledError("operation cancelled")

        # Take the most recent message
        last_message = messages[-1] if messages else None
        if last_message is None:
            return Response(
                chat_message=TextMessage(
                    content="No message received",
                    source=self.name
                )
            )

        # Run the analysis logic
        self._message_count += 1
        analysis_result = await self._analyze(last_message.content)

        # Build the response
        response_message = TextMessage(
            content=f"Analysis result (call #{self._message_count}):\n{analysis_result}",
            source=self.name
        )

        return Response(chat_message=response_message)

    async def _analyze(self, text: str) -> str:
        """
        Perform the text analysis (placeholder implementation)

        Args:
            text: text to analyze

        Returns:
            The analysis result as a string
        """
        # Simulate the analysis
        await asyncio.sleep(0.1)

        if self._analysis_type == "sentiment":
            return f"Sentiment analysis: text length={len(text)}, polarity=neutral"
        elif self._analysis_type == "keyword":
            words = text.split()
            return f"Keyword extraction: {words[:5]}"
        else:
            return f"Summary: {text[:100]}..."

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the agent's state"""
        self._message_count = 0


# Use the custom agent
async def use_custom_agent():
    """Demonstrate the custom agent"""

    agent = CustomAnalysisAgent(
        name="analyzer",
        analysis_type="sentiment"
    )

    # run() is inherited from BaseChatAgent and calls on_messages
    result = await agent.run(task="The weather is lovely today and I'm in a great mood!")
    print(result.messages[-1].content)

asyncio.run(use_custom_agent())

2.8 Hands-On: Human-in-the-Loop with a Two-Agent Conversation

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def human_in_the_loop_demo():
    """
    Two-agent conversation: human-AI collaboration

    Scenario: an AI assistant and a user write code together
    """

    # Create the model client
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key="your-api-key"
    )

    # Create the AI assistant
    assistant = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="""You are a professional Python programmer.
        When the user makes a request, you:
        1. Understand the requirement
        2. Write the code
        3. Explain the code's logic

        When the task is complete, reply 'DONE'.
        """
    )

    # Create the user proxy; in 0.4 it asks for console input every time it is its turn
    user_proxy = UserProxyAgent(
        name="user",
        input_func=input,
    )

    # Stop as soon as any message (the assistant's or the user's) contains "DONE"
    termination = TextMentionTermination("DONE")

    # Create the team; the assistant speaks first so it answers the task before the user reviews
    team = RoundRobinGroupChat(
        participants=[assistant, user_proxy],
        termination_condition=termination
    )

    # Run the conversation
    print("=== Conversation started ===")
    print("Tip: type 'DONE' to end the conversation\n")

    async for message in team.run_stream(task="Please write a function that computes the Fibonacci sequence"):
        if hasattr(message, 'content'):
            print(f"[{message.source}]: {message.content}\n")

    await model_client.close()

asyncio.run(human_in_the_loop_demo())
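Because input_func accepts any function shaped like the built-in input (a prompt string in, a reply string out), a human-in-the-loop flow can be rehearsed without a person at the keyboard. This stdlib sketch builds such a scripted replacement; make_scripted_input is a hypothetical helper. With AutoGen installed it would be passed as UserProxyAgent(name="user", input_func=make_scripted_input([...])).

```python
from typing import Callable, Iterable, List, Optional

def make_scripted_input(replies: Iterable[str], log: Optional[List[str]] = None) -> Callable[[str], str]:
    """Return an input()-compatible function that answers from a fixed script."""
    queue = list(replies)
    prompts = log if log is not None else []

    def scripted_input(prompt: str) -> str:
        prompts.append(prompt)  # record what the agent asked
        if not queue:
            raise RuntimeError("script exhausted: the conversation asked for more input than expected")
        return queue.pop(0)

    return scripted_input
```

Raising once the script runs out makes an unexpectedly long conversation fail loudly instead of hanging on a prompt.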

Chapter 3: Model Clients and Configuration

3.1 OpenAIChatCompletionClient Parameters

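import os
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ModelInfo

# Basic configuration
client = OpenAIChatCompletionClient(
    # === Required parameters ===
    model="gpt-4o",                    # model name

    # === Authentication ===
    api_key="sk-your-api-key",         # API key (can also come from the OPENAI_API_KEY environment variable)

    # === Endpoint ===
    base_url="https://api.openai.com/v1",  # API endpoint (change it for custom or compatible endpoints)

    # === Model capabilities ===
    # Only required for models the client does not already know (e.g. custom or local models)
    model_info=ModelInfo(
        vision=True,                   # supports image input
        function_calling=True,         # supports function/tool calling
        json_output=True,              # supports JSON output mode
        family="gpt-4o",               # model family
        structured_output=True,        # supports schema-constrained structured output
    ),

    # === Generation parameters ===
    temperature=0.7,                   # temperature (0-2), controls randomness
    max_tokens=4096,                   # maximum number of generated tokens
    top_p=1.0,                         # nucleus sampling parameter
    frequency_penalty=0.0,             # frequency penalty (-2 to 2)
    presence_penalty=0.0,              # presence penalty (-2 to 2)
    stop=["END", "STOP"],              # list of stop sequences

    # === Timeout ===
    timeout=60,                        # request timeout (seconds)
)

# Recommended: read the key from the environment
client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.getenv("OPENAI_API_KEY"),  # read from the environment variable
)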

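Parameter reference

Parameter | Type | Description | Valid values
model | str | Model identifier | gpt-4o, gpt-4-turbo, gpt-3.5-turbo, etc.
api_key | str | API key | Obtained from OpenAI
base_url | str | API endpoint | Default OpenAI endpoint or a custom one
temperature | float | Controls output randomness | 0.0-2.0; higher is more random
max_tokens | int | Maximum number of output tokens | 1 up to the model's limit
top_p | float | Nucleus sampling | 0.0-1.0
frequency_penalty | float | Penalizes tokens in proportion to how often they have appeared, reducing repetition | -2.0 to 2.0
presence_penalty | float | Penalizes tokens that have already appeared, encouraging new topics | -2.0 to 2.0
timeout | float | Request timeout in seconds | 30-120 recommended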
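To see what "higher temperature means more randomness" refers to, here is a toy sketch of the usual formulation: logits are divided by the temperature T before the softmax. The logits are made-up numbers, and this illustrates the general technique rather than any provider's internal sampling code.

```python
import math
from typing import Dict

def softmax_with_temperature(logits: Dict[str, float], temperature: float) -> Dict[str, float]:
    """Turn raw scores into probabilities after dividing them by the temperature."""
    if temperature <= 0:
        raise ValueError("temperature must be positive in this sketch")
    scaled = {tok: score / temperature for tok, score in logits.items()}
    peak = max(scaled.values())  # subtract the max for numerical stability
    exps = {tok: math.exp(v - peak) for tok, v in scaled.items()}
    total = sum(exps.values())
    return {tok: e / total for tok, e in exps.items()}

# Hypothetical next-token scores
logits = {"cat": 2.0, "dog": 1.0, "fish": 0.1}
cold = softmax_with_temperature(logits, 0.5)   # sharper: the top token dominates
warm = softmax_with_temperature(logits, 1.5)   # flatter: sampling is more diverse
```

At T = 0.5 most of the probability mass goes to "cat"; at T = 1.5 the unlikely tokens gain noticeably more mass, which is why higher temperatures produce more varied outputs.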

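3.2 The llm_config Configuration System

llm_config is AutoGen's model configuration dictionary, with support for multiple model entries and caching. It belongs to the v0.2 API: AutoGen 0.4 agents take a model_client object and do not accept llm_config, so the dictionary form below is mainly useful when migrating v0.2 code: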

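import os
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Option 1: use OpenAIChatCompletionClient directly (recommended; the 0.4 way)
# Note: in production, read the API key from an environment variable instead of hard-coding it
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.getenv("OPENAI_API_KEY"),  # environment variable recommended
    temperature=0.7,
)

# Option 2: a config_list (v0.2 style, for reference when migrating old code)
config_list = [
    {
        "model": "gpt-4o",
        "api_key": os.getenv("OPENAI_API_KEY"),  # environment variable recommended
    },
    {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
]

# A complete v0.2 llm_config example
llm_config = {
    "config_list": config_list,
    "cache_seed": 42,           # cache namespace: same seed + same request = the cached response
    "timeout": 120,             # request timeout
    "temperature": 0.7,         # default temperature
    "seed": 42,                 # random seed for (best-effort) reproducibility
}

# How caching works in v0.2
# cache_seed=None: caching disabled
# cache_seed=42: caching enabled; reusing the seed replays cached responses
# In 0.4, caching is done by wrapping a model client in ChatCompletionCache (autogen_ext.models.cache)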

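How the cache works (flow diagram): a cache key is built from cache_seed plus a hash of the request content. On a cache hit, the cached result is returned; on a miss, the LLM is called, the result is stored in the cache, and then it is returned.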
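The hit/miss flow can be sketched in a few lines of stdlib Python. ResponseCache is a hypothetical in-memory class keyed by (cache_seed, SHA-256 of the request), not AutoGen's implementation (v0.2 used an on-disk cache; 0.4 wraps clients in ChatCompletionCache backed by a cache store). It shows why identical requests skip the LLM call and why a different seed gives a separate namespace.

```python
import hashlib
import json
from typing import Callable, Dict, List

class ResponseCache:
    """Hypothetical in-memory cache keyed by (cache_seed, hash of the request)."""

    def __init__(self, cache_seed: int) -> None:
        self.cache_seed = cache_seed
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def _key(self, request: dict) -> str:
        # sort_keys makes logically equal requests hash identically
        payload = json.dumps(request, sort_keys=True, ensure_ascii=False)
        return f"{self.cache_seed}:{hashlib.sha256(payload.encode()).hexdigest()}"

    def get_or_call(self, request: dict, call_llm: Callable[[dict], str]) -> str:
        key = self._key(request)
        if key in self._store:          # hit: skip the LLM call entirely
            self.hits += 1
            return self._store[key]
        self.misses += 1                # miss: call, store, then return
        result = call_llm(request)
        self._store[key] = result
        return result

# A fake LLM that records every call it receives
calls: List[dict] = []
def fake_llm(request: dict) -> str:
    calls.append(request)
    return f"answer #{len(calls)}"
```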

3.3 Multi-Model Support

Azure OpenAI configuration
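from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# Azure uses the dedicated AzureOpenAIChatCompletionClient (install autogen-ext[openai,azure])

# Option 1: API key
azure_client = AzureOpenAIChatCompletionClient(
    azure_deployment="your-deployment-name",  # deployment name in your Azure resource
    model="gpt-4o",
    api_key="your-azure-api-key",
    azure_endpoint="https://your-resource.openai.azure.com/",
    api_version="2024-02-15-preview",
)

# Option 2: Azure AD (Entra ID) authentication via a bearer-token provider
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
azure_client = AzureOpenAIChatCompletionClient(
    azure_deployment="your-deployment-name",
    model="gpt-4o",
    azure_endpoint="https://your-resource.openai.azure.com/",
    azure_ad_token_provider=token_provider,
    api_version="2024-02-15-preview",
)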
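
Local model configuration (via LiteLLM)
from autogen_core.models import ModelInfo
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Non-OpenAI model names are unknown to the client, so model_info must describe their capabilities
local_model_info = ModelInfo(
    vision=False, function_calling=False, json_output=False,
    family="unknown", structured_output=False,
)  # set function_calling=True if your model/server supports tool calling

# Use a LiteLLM proxy in front of a local model
local_client = OpenAIChatCompletionClient(
    model="llama-3",  # or another local model name
    base_url="http://localhost:8000/v1",  # LiteLLM proxy address
    api_key="dummy-key",  # local models usually don't need a real key
    model_info=local_model_info,
)

# Use Ollama through its OpenAI-compatible endpoint
ollama_client = OpenAIChatCompletionClient(
    model="llama3",
    base_url="http://localhost:11434/v1",
    api_key="ollama",
    model_info=local_model_info,
)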

Multi-model load balancing
import asyncio
import os
from typing import List
from autogen_ext.models.openai import OpenAIChatCompletionClient

class ModelLoadBalancer:
    """
    Multi-model load balancer

    Distributes requests across several models with round-robin rotation and failover.
    Note: it is not a ChatCompletionClient, so it cannot be passed to AssistantAgent directly.
    """

    def __init__(self, model_configs: List[dict]):
        """
        Initialize the load balancer

        Args:
            model_configs: list of model configurations
        """
        self.clients = [
            OpenAIChatCompletionClient(**config)
            for config in model_configs
        ]
        self.current_index = 0

    async def create(self, messages, **kwargs):
        """
        Create a response, choosing the model in round-robin order

        Args:
            messages: list of messages
            **kwargs: other arguments passed to client.create

        Returns:
            The model response
        """
        # Try every model at most once
        for _ in range(len(self.clients)):
            client = self.clients[self.current_index]
            try:
                result = await client.create(messages, **kwargs)
                self.current_index = (self.current_index + 1) % len(self.clients)
                return result
            except Exception as e:
                print(f"Model {self.current_index} failed: {e}")
                self.current_index = (self.current_index + 1) % len(self.clients)

        raise RuntimeError("All models are unavailable")

# Usage example
configs = [
    {"model": "gpt-4o", "api_key": os.getenv("OPENAI_API_KEY")},
    {"model": "gpt-3.5-turbo", "api_key": os.getenv("OPENAI_API_KEY")},
]
balancer = ModelLoadBalancer(configs)
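The rotation-and-failover logic is easy to check without network access. This stdlib sketch keeps the same loop as ModelLoadBalancer but swaps the OpenAI clients for a hypothetical FakeClient that can be marked unhealthy, so you can see which client serves each call.

```python
import asyncio
from typing import List

class FakeClient:
    """Hypothetical stand-in for a model client; fails when healthy=False."""
    def __init__(self, name: str, healthy: bool = True) -> None:
        self.name = name
        self.healthy = healthy

    async def create(self, messages: List[str]) -> str:
        if not self.healthy:
            raise ConnectionError(f"{self.name} unavailable")
        return f"{self.name}: {messages[-1]}"

class RoundRobinFailover:
    """Same rotation/failover idea as ModelLoadBalancer, minus the OpenAI clients."""
    def __init__(self, clients: List[FakeClient]) -> None:
        self.clients = clients
        self.current_index = 0

    async def create(self, messages: List[str]) -> str:
        for _ in range(len(self.clients)):
            client = self.clients[self.current_index]
            # Advance first, so the next call starts at the following client either way
            self.current_index = (self.current_index + 1) % len(self.clients)
            try:
                return await client.create(messages)
            except ConnectionError:
                continue
        raise RuntimeError("all models are unavailable")
```

With clients a, b (down), and c, three calls are served by a, c, a: the failing client is skipped without breaking the rotation.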

3.4 Streaming Output: Configuration and Implementation

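import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.messages import ModelClientStreamingChunkEvent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def streaming_demo():
    """
    Streaming output demo

    Two ways to consume a stream:
    1. Let the Console component display it
    2. Handle the stream manually
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        model_client_stream=True,  # token-level streaming is enabled on the agent, not the client
    )

    # Option 1: the Console component (recommended)
    print("=== Option 1: Console ===")
    stream = agent.run_stream(task="Write a short poem about spring")
    await Console(stream)

    # Option 2: handle the stream manually
    print("\n=== Option 2: manual handling ===")
    async for message in agent.run_stream(task="Explain what recursion is"):
        if isinstance(message, ModelClientStreamingChunkEvent):
            # A chunk of the reply as it is being generated
            print(message.content, end="", flush=True)
        elif isinstance(message, TaskResult):
            # The last item of the stream is the TaskResult
            print(f"\n[stop reason] {message.stop_reason}")
        # The complete TextMessage also arrives; it is skipped here to avoid printing the text twice

    await model_client.close()

asyncio.run(streaming_demo())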
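The manual loop above sorts three kinds of items: incremental chunks, the complete message that repeats them, and a final result. This stdlib sketch simulates such a stream so the consumption pattern can be run without a model. Chunk, FullMessage, Result, fake_stream, and consume are hypothetical stand-ins for ModelClientStreamingChunkEvent, TextMessage, TaskResult, and the real stream.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Tuple, Union

# Hypothetical stand-ins for the three kinds of items a streaming run yields
@dataclass
class Chunk:          # like ModelClientStreamingChunkEvent
    content: str

@dataclass
class FullMessage:    # like the complete TextMessage
    content: str

@dataclass
class Result:         # like TaskResult, always last
    stop_reason: Optional[str]

async def fake_stream() -> AsyncIterator[Union[Chunk, FullMessage, Result]]:
    for piece in ["Recursion ", "is a function ", "calling itself."]:
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield Chunk(piece)
    yield FullMessage("Recursion is a function calling itself.")
    yield Result(stop_reason=None)

async def consume(stream: AsyncIterator) -> Tuple[str, Optional[Result]]:
    text, final = "", None
    async for item in stream:
        if isinstance(item, Chunk):
            text += item.content        # incremental tokens
        elif isinstance(item, Result):
            final = item                # end of the run
        # FullMessage repeats the chunks, so it is ignored here
    return text, final
```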

The Console component

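from autogen_agentchat.base import Team
from autogen_agentchat.ui import Console

# Console is a convenience helper that renders a message stream
async def console_usage(team: Team):
    """Console usage example"""

    # Basic usage
    stream = team.run_stream(task="your task")
    await Console(stream)

    # Console handles the different message types automatically:
    # - TextMessage: shows the text content
    # - ToolCallRequestEvent: shows the tool calls
    # - ToolCallExecutionEvent: shows the tool execution results
    # - HandoffMessage: shows the handoff between agents
    # - StopMessage: shows the termination message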

3.5 RequestUsage: Tracking Token Usage

import asyncio
from typing import Optional
from autogen_agentchat.agents import AssistantAgent
from autogen_core.models import RequestUsage
from autogen_ext.models.openai import OpenAIChatCompletionClient

# RequestUsage (autogen_core.models) records the tokens of one model call:
#   prompt_tokens: input tokens; completion_tokens: output tokens
# It has no total field; add the two yourself.

async def track_token_usage():
    """
    Token usage tracking

    Shows how to monitor token counts and estimate API cost
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)

    # Run and collect the result
    result = await agent.run(task="Explain the basic principles of quantum computing")

    # Sum the token usage attached to the messages
    total_prompt = 0
    total_completion = 0

    for message in result.messages:
        usage: Optional[RequestUsage] = getattr(message, "models_usage", None)
        if usage:
            total_prompt += usage.prompt_tokens
            total_completion += usage.completion_tokens
            print(f"Message usage: input={usage.prompt_tokens}, output={usage.completion_tokens}")

    print(f"\nTotals: input={total_prompt}, output={total_completion}, total={total_prompt + total_completion}")

    # Cost estimate (GPT-4o as an example)
    # Input: $2.5 per 1M tokens, output: $10 per 1M tokens
    cost = (total_prompt * 2.5 + total_completion * 10) / 1_000_000
    print(f"Estimated cost: ${cost:.6f}")

    await model_client.close()

asyncio.run(track_token_usage())
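The cost estimate is a linear function of the two token counts. With P prompt tokens, C completion tokens, and per-million-token prices p_in and p_out (the GPT-4o reference prices from the comment above, which may change), a worked example using the totals from the TokenCounter usage example below (300 input, 150 output tokens):

```latex
\text{cost} = \frac{P \cdot p_{\text{in}} + C \cdot p_{\text{out}}}{10^{6}},
\qquad
P = 300,\; C = 150:\quad
\frac{300 \times 2.5 + 150 \times 10}{10^{6}} = \frac{750 + 1500}{10^{6}} = 0.00225\ \text{USD}
```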

Token counter class

class TokenCounter:
    """
    Token counting and cost tracking utility

    Features:
    - Tracks total token usage
    - Estimates API cost
    - Produces a usage report

    Note: prices are for reference only; check OpenAI's current pricing
    """

    # Model prices (USD per million tokens, reference only)
    PRICING = {
        "gpt-4o": {"input": 2.5, "output": 10.0},
        "gpt-4-turbo": {"input": 10.0, "output": 30.0},
        "gpt-3.5-turbo": {"input": 0.5, "output": 1.5},
    }

    def __init__(self, model: str = "gpt-4o"):
        """
        Initialize the counter

        Args:
            model: model name, used for cost calculation
        """
        self.model = model
        self.prompt_tokens = 0
        self.completion_tokens = 0
        self.request_count = 0

    def add_usage(self, prompt: int, completion: int):
        """
        Record the usage of one request

        Args:
            prompt: input tokens
            completion: output tokens
        """
        self.prompt_tokens += prompt
        self.completion_tokens += completion
        self.request_count += 1

    def calculate_cost(self) -> float:
        """
        Compute the total cost

        Returns:
            Total cost in USD (0 for models missing from PRICING)
        """
        pricing = self.PRICING.get(self.model, {"input": 0, "output": 0})
        cost = (
            self.prompt_tokens * pricing["input"] +
            self.completion_tokens * pricing["output"]
        ) / 1_000_000
        return cost

    def report(self) -> str:
        """
        Build a usage report

        Returns:
            The formatted report string
        """
        return f"""
=== Token Usage Report ===
Model: {self.model}
Requests: {self.request_count}
Input tokens: {self.prompt_tokens:,}
Output tokens: {self.completion_tokens:,}
Total tokens: {self.prompt_tokens + self.completion_tokens:,}
Estimated cost: ${self.calculate_cost():.4f}
==========================
"""

# Usage example
counter = TokenCounter("gpt-4o")
counter.add_usage(100, 50)
counter.add_usage(200, 100)
print(counter.report())

3.6 Hands-On: Switching Between Models to Control Cost

import asyncio
from typing import Dict
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Requires the TokenCounter class from section 3.5

class CostAwareAgent:
    """
    Cost-aware agent

    Features:
    - Picks a model based on task complexity
    - Tracks token usage per model
    - Runs within a budget
    """

    def __init__(
        self,
        name: str,
        fast_model: str = "gpt-3.5-turbo",
        smart_model: str = "gpt-4o",
        budget_limit: float = 1.0,  # USD
    ):
        """
        Initialize the cost-aware agent

        Args:
            name: agent name
            fast_model: fast/cheap model
            smart_model: capable/expensive model
            budget_limit: budget cap in USD
        """
        self.name = name
        self.models = {"simple": fast_model, "complex": smart_model}
        self.clients = {
            model: OpenAIChatCompletionClient(model=model) for model in (fast_model, smart_model)
        }
        # One counter per model, so each model's usage is priced at its own rate
        self.counters: Dict[str, TokenCounter] = {
            model: TokenCounter(model) for model in (fast_model, smart_model)
        }
        self.budget_limit = budget_limit
        self.total_cost = 0.0

    def _estimate_complexity(self, task: str) -> str:
        """
        Estimate task complexity

        Args:
            task: task description

        Returns:
            'simple' or 'complex'
        """
        # Simple keyword heuristic
        complex_keywords = ["analyze", "design", "architecture", "optimize", "refactor", "algorithm"]
        if any(kw in task.lower() for kw in complex_keywords):
            return "complex"
        if len(task) > 500:
            return "complex"
        return "simple"

    async def run(self, task: str) -> str:
        """
        Run a task, choosing the model automatically

        Args:
            task: task description

        Returns:
            The result string
        """
        # Check the budget (a single call can still overshoot it)
        if self.total_cost >= self.budget_limit:
            return "Error: budget limit exceeded"

        # Pick the model
        complexity = self._estimate_complexity(task)
        model = self.models[complexity]
        print(f"Task complexity: {complexity}, model: {model}")

        # Create an agent and run it
        agent = AssistantAgent(name=self.name, model_client=self.clients[model])
        result = await agent.run(task=task)

        # Update the cost
        for msg in result.messages:
            if getattr(msg, "models_usage", None):
                self.counters[model].add_usage(
                    msg.models_usage.prompt_tokens,
                    msg.models_usage.completion_tokens
                )

        self.total_cost = sum(c.calculate_cost() for c in self.counters.values())
        print(f"Current cost: ${self.total_cost:.4f} / ${self.budget_limit:.2f}")

        return result.messages[-1].content

# Usage example
async def main():
    agent = CostAwareAgent(
        name="smart_assistant",
        budget_limit=0.5
    )

    # Simple task -> fast model
    await agent.run("What is 1 + 1?")

    # Complex task -> capable model ("design" and "architecture" are complex keywords)
    await agent.run("Please design the architecture of a distributed cache system, considering consistency, availability, and partition tolerance")

asyncio.run(main())

Chapter 4: Teams and Collaboration Patterns

4.1 Core Team Concepts

A Team is AutoGen's core abstraction for managing collaboration among multiple agents:

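Diagram: a Team contains Agent 1, Agent 2, and Agent 3 plus a TerminationCondition. As messages flow (message 1, message 2, message 3), the condition is checked after each one: if it is satisfied the run terminates, otherwise the conversation continues.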

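Core concepts

  • Team: the container for agents; manages message passing between them
  • Participant: an agent taking part in the collaboration
  • TerminationCondition: decides when the conversation ends
  • Selector: the speaker selector; decides which agent speaks next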
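In AgentChat, termination conditions can be combined with the | and & operators, for example MaxMessageTermination(10) | TextMentionTermination("TERMINATE") stops at whichever fires first. The stdlib sketch below models only the "either one" case with hypothetical classes (Condition, MaxMessages, TextMention, AnyOf). AutoGen's real conditions are stateful async objects with their own reset logic, so treat this as an illustration of the semantics, not the library code.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Message:
    source: str
    content: str

class Condition:
    """Hypothetical termination condition: returns a stop reason, or None to continue."""
    def check(self, history: List[Message]) -> Optional[str]:
        raise NotImplementedError

    def __or__(self, other: "Condition") -> "Condition":
        return AnyOf(self, other)

class MaxMessages(Condition):
    def __init__(self, limit: int) -> None:
        self.limit = limit

    def check(self, history: List[Message]) -> Optional[str]:
        return f"reached {self.limit} messages" if len(history) >= self.limit else None

class TextMention(Condition):
    def __init__(self, text: str) -> None:
        self.text = text

    def check(self, history: List[Message]) -> Optional[str]:
        # Only the newest message is inspected, once per turn
        return f"'{self.text}' mentioned" if history and self.text in history[-1].content else None

class AnyOf(Condition):
    def __init__(self, *conditions: Condition) -> None:
        self.conditions = conditions

    def check(self, history: List[Message]) -> Optional[str]:
        for cond in self.conditions:
            reason = cond.check(history)
            if reason:
                return reason
        return None
```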

4.2 Team.run() in Detail

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def team_run_demo():
    """
    Team.run() in detail

    Parameters:
        task: the task (a string or a message object)
        cancellation_token: cancellation token
        output_task_messages: whether the task messages are included in the output
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Create several agents
    writer = AssistantAgent(
        name="writer",
        model_client=model_client,
        system_message="You are a technical writer responsible for clear technical documentation."
    )

    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="You are a technical reviewer responsible for checking documents for accuracy and completeness."
    )

    # Create the team
    team = RoundRobinGroupChat(
        participants=[writer, reviewer],
        termination_condition=MaxMessageTermination(max_messages=6)
    )

    # Option 1: run to completion (await the final result)
    print("=== Run to completion ===")
    result = await team.run(
        task="Write a short technical document about Python decorators",
        # cancellation_token=None,    # optional: cancellation token
        # output_task_messages=True,  # optional, version-dependent: include the task messages
    )

    # Process the result
    print(f"Stop reason: {result.stop_reason}")
    print(f"Message count: {len(result.messages)}")
    for msg in result.messages:
        print(f"[{msg.source}]: {str(msg.content)[:100]}...")

    await model_client.close()

asyncio.run(team_run_demo())

4.3 Team.run_stream() in Detail

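import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def team_stream_demo():
    """
    Team.run_stream() in detail

    Returns an async generator that yields messages as they are produced,
    followed by the final TaskResult
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    writer = AssistantAgent(name="writer", model_client=model_client)
    reviewer = AssistantAgent(name="reviewer", model_client=model_client)

    team = RoundRobinGroupChat(
        participants=[writer, reviewer],
        termination_condition=MaxMessageTermination(max_messages=4)
    )

    # Option 1: the Console component (recommended)
    print("=== Using Console ===")
    stream = team.run_stream(task="Explain what a RESTful API is")
    await Console(stream)

    # Start the second task from a clean history
    await team.reset()

    # Option 2: handle the stream manually
    print("\n=== Handling the stream manually ===")
    async for message in team.run_stream(task="Explain what GraphQL is"):
        # Handle each item according to its type
        message_type = type(message).__name__
        print(f"[{message_type}] ", end="")

        if hasattr(message, 'source'):
            print(f"from: {message.source}")
        if hasattr(message, 'content'):
            content = message.content
            if isinstance(content, str):
                print(f"content: {content[:100]}...")

    await model_client.close()

asyncio.run(team_stream_demo())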

4.4 Team.reset()

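import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def team_reset_demo():
    """
    Team.reset() demo

    Resets the team and its participants, clearing the conversation history
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)
    # Without a termination condition or max_turns, this team would never stop
    team = RoundRobinGroupChat(participants=[agent], max_turns=1)

    # First run
    result1 = await team.run(task="Hello")
    print(f"Messages in the first run: {len(result1.messages)}")

    # Reset the team
    await team.reset()
    print("Team reset")

    # Second run (the history has been cleared)
    result2 = await team.run(task="Hello")
    print(f"Messages in the second run: {len(result2.messages)}")

    await model_client.close()

asyncio.run(team_reset_demo())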

4.5 TaskResult in Detail

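import asyncio
from dataclasses import dataclass
from typing import Optional, Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Simplified view of autogen_agentchat.base.TaskResult
@dataclass
class TaskResult:
    """
    Return value of Team.run()

    Attributes:
        messages: the messages produced during the run (events included)
        stop_reason: description of why the run stopped
    """
    messages: Sequence[BaseAgentEvent | BaseChatMessage]
    stop_reason: Optional[str] = None

async def task_result_demo():
    """TaskResult in detail"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)
    team = RoundRobinGroupChat(
        participants=[agent],
        termination_condition=MaxMessageTermination(max_messages=2)
    )

    result = await team.run(task="Hello")

    # Access the messages
    print("=== All messages ===")
    for i, msg in enumerate(result.messages):
        print(f"{i+1}. [{msg.source}]: {msg.content}")

    # Stop reason
    print(f"\nStop reason: {result.stop_reason}")

    # The last message (usually the final answer)
    last_message = result.messages[-1]
    print(f"\nFinal reply: {last_message.content}")

    await model_client.close()

asyncio.run(task_result_demo())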

4.6 RoundRobinGroupChat: Round-Robin Collaboration

RoundRobinGroupChat lets the agents speak in a fixed order, taking turns:

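Sequence diagram: the user sends the task; Agent1, Agent2, and Agent3 each post a message in turn, and the cycle repeats (Agent1 again, and so on) until the termination condition is met, at which point the final result is returned to the user.
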
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination

async def round_robin_demo():
    """
    RoundRobinGroupChat示例
    
    场景:代码编写-审核-优化流程
    """
    
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    
    # 定义角色
    coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="""你是程序员,负责编写代码。
        编写完成后说'请审核'。"""
    )
    
    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="""你是代码审核者,负责检查代码质量。
        如果代码没问题说'通过',否则指出问题。"""
    )
    
    optimizer = AssistantAgent(
        name="optimizer",
        model_client=model_client,
        system_message="""你是优化专家,负责优化代码性能和可读性。
        优化完成后说'完成'。"""
    )
    
    # 创建轮询团队
    team = RoundRobinGroupChat(
        participants=[coder, reviewer, optimizer],
        termination_condition=TextMentionTermination("完成")
    )
    
    # 运行
    stream = team.run_stream(
        task="写一个Python函数,计算列表中所有偶数的和"
    )
    await Console(stream)

asyncio.run(round_robin_demo())
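The turn order that RoundRobinGroupChat follows can be modeled in a few lines of plain Python. This is a simplified sketch, not AutoGen's implementation. The agents are hypothetical callables that receive the history and return a reply, and the stop check is a plain substring test in the spirit of TextMentionTermination.

```python
from itertools import cycle
from typing import Callable, List, Tuple

# Hypothetical agent: (name, function that maps the history to a reply)
Agent = Tuple[str, Callable[[List[str]], str]]

def round_robin(task: str, agents: List[Agent], stop_text: str, max_messages: int) -> List[str]:
    """Agents speak in a fixed cyclic order until a reply contains stop_text
    or the history reaches max_messages (the task counts as the first message)."""
    history = [f"user: {task}"]
    for name, respond in cycle(agents):
        if len(history) >= max_messages:
            break
        reply = respond(history)
        history.append(f"{name}: {reply}")
        if stop_text in reply:
            break
    return history

# Scripted agents standing in for LLM calls
agents = [
    ("coder", lambda h: "def f(xs): ... PLEASE REVIEW"),
    ("reviewer", lambda h: "APPROVED"),
    ("optimizer", lambda h: "OPTIMIZATION DONE"),
]
log = round_robin("sum the even numbers", agents, "OPTIMIZATION DONE", max_messages=12)
speakers = [line.split(":")[0] for line in log]
print(speakers)  # ['user', 'coder', 'reviewer', 'optimizer']
```

Because the order is fixed, a reviewer that rejects the code cannot send it straight back to the coder. The optimizer always speaks next. When routing has to depend on content, a selector-based team is the better fit.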

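4.7 SelectorGroupChat: Model-Based Speaker Selection

SelectorGroupChat uses an LLM to choose the next speaker. The choice is based on the conversation so far and on each participant's name and description, so write the description fields carefully:

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def selector_group_demo():
    """
    SelectorGroupChat example

    Scenario: route each question to the right expert
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Define the expert roles; `description` is what the selector model reads
    math_expert = AssistantAgent(
        name="math_expert",
        model_client=model_client,
        system_message="You are a math expert. Answer math questions.",
        description="Good at mathematical calculations and explaining math concepts"
    )

    code_expert = AssistantAgent(
        name="code_expert",
        model_client=model_client,
        system_message="You are a programming expert. Answer programming questions.",
        description="Good at programming, debugging and software development"
    )

    general_expert = AssistantAgent(
        name="general_expert",
        model_client=model_client,
        system_message="You are a general-knowledge expert. Answer general questions.",
        description="Good at general questions and common knowledge"
    )

    # Create the selector team
    team = SelectorGroupChat(
        participants=[math_expert, code_expert, general_expert],
        model_client=model_client,  # model used to pick the next speaker
        termination_condition=MaxMessageTermination(max_messages=6),
    )

    # Run: the model picks the expert that best fits the question
    stream = team.run_stream(task="How do I implement quicksort in Python?")
    await Console(stream)

asyncio.run(selector_group_demo())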
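SelectorGroupChat asks its model_client to choose the next speaker from a prompt that lists each participant's name and description together with the conversation. The sketch below imitates that loop. Several pieces are assumptions for illustration, not AutoGen's template: the prompt wording, the fallback rule, and fake_model, which is a keyword router standing in for a real LLM call.

```python
from typing import Callable, Dict, List

def build_selector_prompt(roles: Dict[str, str], history: List[str]) -> str:
    """Assemble a selection prompt from participant names, descriptions and the history."""
    role_lines = "\n".join(f"{name}: {desc}" for name, desc in roles.items())
    names = ", ".join(roles)
    return (
        f"The following roles are available:\n{role_lines}\n\n"
        "Conversation:\n" + "\n".join(history) + "\n\n"
        f"Select the next role from [{names}]. Only return the role."
    )

def select_speaker(roles: Dict[str, str], history: List[str], model: Callable[[str], str]) -> str:
    """Ask the model for the next speaker and accept only a known participant name."""
    reply = model(build_selector_prompt(roles, history)).strip()
    if reply in roles:
        return reply
    # Fallback for an invalid answer (an assumption of this sketch): first participant
    return next(iter(roles))

def fake_model(prompt: str) -> str:
    """Hypothetical stand-in for the LLM: routes on a keyword in the conversation."""
    conversation = prompt.split("Conversation:")[1]
    return "code_expert" if "Python" in conversation else "general_expert"

roles = {
    "math_expert": "Good at mathematical calculations and concepts",
    "code_expert": "Good at programming and debugging",
    "general_expert": "Good at general questions and common knowledge",
}
print(select_speaker(roles, ["user: How do I implement quicksort in Python?"], fake_model))  # code_expert
```

Validating the reply against the known names matters in practice, because a model can answer with extra words or an unknown role.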

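4.8 GroupChat Parameters

from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat

# agent1 and agent2 are assumed to be agents created earlier
team = RoundRobinGroupChat(
    # === Required ===
    participants=[agent1, agent2],  # list of participating agents

    # === Termination condition ===
    termination_condition=TextMentionTermination("DONE"),

    # === Turn limit ===
    max_turns=10,  # maximum number of turns (optional)
)

# Parameters:
# participants: the agents that take part in the conversation
# termination_condition: a TerminationCondition checked after each agent's turn
# max_turns: maximum number of agent turns; an additional stop criterion,
#            so the run ends as soon as either limit is reached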

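4.9 Speaker Selection Strategies

To override model-based selection, pass a selector_func to SelectorGroupChat. It receives the message history and returns the name of the next speaker, or None to fall back to the model-based selector:

import asyncio
from typing import Optional, Sequence, Union

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def custom_selector_demo():
    """
    Custom speaker-selection strategy
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent(name="agent1", model_client=model_client)
    agent2 = AssistantAgent(name="agent2", model_client=model_client)

    # Custom selection function
    def custom_selector(
        messages: Sequence[Union[BaseAgentEvent, BaseChatMessage]],
    ) -> Optional[str]:
        """
        Custom speaker-selection logic

        Args:
            messages: the conversation history so far

        Returns:
            The name of the next speaker, or None to let the model decide
        """
        last_message = messages[-1] if messages else None
        content = getattr(last_message, "content", None)

        # Some events carry non-string content (e.g. lists of tool calls)
        if isinstance(content, str) and "code" in content.lower():
            return "agent1"  # code-related messages go to agent1
        return "agent2"      # everything else goes to agent2

    team = SelectorGroupChat(
        participants=[agent1, agent2],
        model_client=model_client,  # still required; used whenever selector_func returns None
        selector_func=custom_selector,
        termination_condition=MaxMessageTermination(max_messages=5),
    )
    await Console(team.run_stream(task="Please write code that reverses a string"))

asyncio.run(custom_selector_demo())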
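SelectorGroupChat's selector_func parameter takes a plain function of the message history, so its routing rules can be unit-tested without any model. The sketch below makes two assumptions. First, it uses a hypothetical Msg dataclass instead of AutoGen's message classes. Second, it adds one more routing pattern: a planner agent regains control after every other speaker, and the model decides otherwise.

```python
from dataclasses import dataclass
from typing import Optional, Sequence

@dataclass
class Msg:
    """Hypothetical stand-in for a chat message; only .source and .content are read."""
    source: str
    content: object

def planner_selector(messages: Sequence[Msg]) -> Optional[str]:
    """Hand control back to the planner after any other agent speaks;
    return None (let the model choose) right after the planner has spoken."""
    if messages and messages[-1].source != "planner":
        return "planner"
    return None

def keyword_selector(messages: Sequence[Msg]) -> Optional[str]:
    """Route code-related text to agent1 and everything else to agent2."""
    if not messages:
        return None
    content = messages[-1].content
    if isinstance(content, str) and "code" in content.lower():
        return "agent1"
    return "agent2"

history = [Msg("user", "Please review this Code"), Msg("planner", "Step 1: review")]
print(planner_selector(history[:1]), planner_selector(history))  # planner None
print(keyword_selector(history[:1]))                              # agent1
```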

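4.10 Hands-On: Building a Multi-Role Team

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def software_development_team():
    """
    Software development team example

    Roles:
    - Product manager: requirements analysis
    - Architect: system design
    - Developer: implementation
    - Test engineer: testing and verification
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Product manager
    product_manager = AssistantAgent(
        name="product_manager",
        model_client=model_client,
        system_message="""You are the product manager, responsible for:
        1. Analyzing user requirements
        2. Defining product features
        3. Writing the requirements document

        When the requirements analysis is finished, say 'REQUIREMENTS CLEAR'.""",
        description="Product manager, skilled at requirements analysis and product planning"
    )

    # Architect
    architect = AssistantAgent(
        name="architect",
        model_client=model_client,
        system_message="""You are the system architect, responsible for:
        1. Designing the system architecture
        2. Choosing the technology stack
        3. Defining the module breakdown

        When the architecture design is finished, say 'ARCHITECTURE DESIGNED'.""",
        description="System architect, skilled at system design and technology selection"
    )

    # Developer
    developer = AssistantAgent(
        name="developer",
        model_client=model_client,
        system_message="""You are a senior developer, responsible for:
        1. Implementing the core features
        2. Writing high-quality code
        3. Handling technical details

        When development is finished, say 'DEVELOPMENT DONE'.""",
        description="Senior developer, skilled at implementation"
    )

    # Test engineer
    tester = AssistantAgent(
        name="tester",
        model_client=model_client,
        system_message="""You are the test engineer, responsible for:
        1. Designing test cases
        2. Verifying functional correctness
        3. Reporting problems

        When the tests pass, say 'TESTS PASSED, PROJECT COMPLETE'.""",
        description="Test engineer, skilled at quality assurance"
    )

    # Create the team
    team = SelectorGroupChat(
        participants=[product_manager, architect, developer, tester],
        model_client=model_client,
        termination_condition=TextMentionTermination("PROJECT COMPLETE") | MaxMessageTermination(20),
    )

    # Run the project
    print("=== Software development team started ===\n")
    stream = team.run_stream(
        task="Build a simple to-do (Todo) app that supports adding, deleting, and marking items as done"
    )
    await Console(stream)

asyncio.run(software_development_team())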

Chapter 5: Tools and Code Execution

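5.1 Defining Tools: FunctionTool Parameters

AutoGen wraps callable tool functions in FunctionTool. The JSON schema the model sees is generated from the function's type hints and docstring, so annotate every parameter. Note that FunctionTool has no decorator or from_function helper. Either pass the function to the constructor, or hand a plain function to AssistantAgent.

from typing import Annotated

from autogen_core.tools import FunctionTool

# Option 1: pass a plain function to AssistantAgent(tools=[...]); the agent wraps it
# in a FunctionTool automatically and uses the docstring as the tool description
def get_weather(
    city: Annotated[str, "City name, e.g. 'Beijing' or 'Shanghai'"]
) -> str:
    """
    Get the weather for the given city.

    Args:
        city: city name

    Returns:
        A weather description string
    """
    # Simulated weather API call
    weather_data = {
        "Beijing": "Sunny, 15°C",
        "Shanghai": "Cloudy, 18°C",
        "Guangzhou": "Light rain, 22°C",
    }
    return weather_data.get(city, f"No weather data found for {city}")

# Option 2: create a FunctionTool explicitly
def calculate_sum(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

sum_tool = FunctionTool(
    calculate_sum,                    # the function that actually runs
    description="Add two integers",   # tells the LLM what the tool does
    name="calculate_sum",             # optional; defaults to the function name
    global_imports=[],                # imports the function needs (used for serialization)
)

# Option 3: describe each parameter with Annotated
def search_web(
    query: Annotated[str, "Search keywords"],
    max_results: Annotated[int, "Maximum number of results, default 5"] = 5,
    language: Annotated[str, "Language code, e.g. 'zh' or 'en'"] = "zh"
) -> str:
    """
    Search the web for information.

    Args:
        query: search keywords
        max_results: maximum number of results
        language: search language

    Returns:
        Search results as a string
    """
    # Simulated search
    return f"Searched '{query}' (language: {language}), returning up to {max_results} results"

search_tool = FunctionTool(search_web, description="Search the web for information")

FunctionTool parameters

| Parameter | Type | Description |
|---|---|---|
| func | Callable | The function that is actually executed |
| description | str | Tool description the LLM uses to understand the tool's purpose |
| name | str (optional) | Tool name used in LLM calls; defaults to the function name |
| global_imports | Sequence[Import] | Global imports the function depends on (used for serialization) |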
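FunctionTool derives the JSON schema the model sees from the function's signature (internally via Pydantic), and Annotated metadata becomes the parameter descriptions. A stdlib-only sketch of that idea follows. The exact schema layout AutoGen produces differs, so treat the output format as illustrative.

```python
import inspect
from typing import Annotated, Any, Dict, List, get_args, get_origin, get_type_hints

# Python type -> JSON Schema type (simplified; unknown types default to "string")
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def tool_schema(func) -> Dict[str, Any]:
    """Build a simplified JSON-schema tool definition from a function signature,
    using Annotated metadata as the parameter descriptions."""
    hints = get_type_hints(func, include_extras=True)
    properties: Dict[str, Any] = {}
    required: List[str] = []
    for name, param in inspect.signature(func).parameters.items():
        hint, description = hints.get(name, str), ""
        if get_origin(hint) is Annotated:
            hint, description = get_args(hint)[0], str(get_args(hint)[1])
        prop = {"type": _JSON_TYPES.get(hint, "string"), "description": description}
        if param.default is inspect.Parameter.empty:
            required.append(name)   # no default -> the model must supply it
        else:
            prop["default"] = param.default
        properties[name] = prop
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }

def search_web(
    query: Annotated[str, "Search keywords"],
    max_results: Annotated[int, "Maximum number of results"] = 5,
) -> str:
    """Search the web."""
    return f"{query}: {max_results} results"

schema = tool_schema(search_web)
print(schema["parameters"]["required"])  # ['query']
```

This is why untyped parameters or missing docstrings lead to vague tool descriptions: the model only ever sees what the signature and docstring provide.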

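5.2 Registering Tools

import ast
import asyncio
import operator
from datetime import datetime

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Tool functions
def get_current_time() -> str:
    """Get the current local time."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Arithmetic operators the calculator is allowed to use
_OPS = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
    ast.USub: operator.neg,
}

def _eval_arithmetic(node: ast.AST) -> float:
    """Evaluate a parsed expression containing only numbers and + - * /."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval_arithmetic(node.left), _eval_arithmetic(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval_arithmetic(node.operand))
    raise ValueError("only numbers and + - * / are allowed")

def calculate(expression: str) -> str:
    """
    Evaluate an arithmetic expression.

    Args:
        expression: an arithmetic expression such as '1+2*3'
    """
    try:
        # Never eval() model-generated text: it can run arbitrary code.
        # Parse it instead and evaluate only whitelisted arithmetic.
        result = _eval_arithmetic(ast.parse(expression, mode="eval").body)
        return f"Result: {result}"
    except Exception as e:
        return f"Calculation error: {e}"

async def tool_registration_demo():
    """Tool registration example"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Create an agent with tools; plain functions are wrapped in FunctionTool automatically
    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[get_current_time, calculate],
        # Have the model reflect on tool results and answer in natural language (optional)
        reflect_on_tool_use=True,
    )

    # Run: the agent decides which tools to call
    result = await agent.run(task="What time is it now? Also, what is 123*456?")

    for msg in result.messages:
        print(f"[{msg.source}]: {msg.content}")

asyncio.run(tool_registration_demo())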
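Behind tool use, the agent runs a simple loop. For each tool call the model returns, it looks up the tool by name, decodes the JSON arguments, runs the tool, and sends the results back to the model. A simplified dispatch step is sketched below. ToolCall and ToolResult are hypothetical stand-ins loosely modeled on autogen_core's FunctionCall and FunctionExecutionResult. The error handling shown is one reasonable choice, not necessarily AutoGen's.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class ToolCall:
    """Hypothetical stand-in for a model-issued tool call."""
    id: str
    name: str
    arguments: str  # JSON-encoded arguments, as produced by the model

@dataclass
class ToolResult:
    """Hypothetical stand-in for a tool execution result."""
    call_id: str
    content: str
    is_error: bool = False

def execute_tool_calls(calls: List[ToolCall], tools: Dict[str, Callable[..., object]]) -> List[ToolResult]:
    """Run every requested tool; failures become error results that the model
    can read, instead of exceptions that would abort the agent."""
    results = []
    for call in calls:
        try:
            output = tools[call.name](**json.loads(call.arguments))
            results.append(ToolResult(call.id, str(output)))
        except Exception as exc:  # unknown tool, bad JSON, wrong arguments, tool error
            results.append(ToolResult(call.id, f"Error: {exc!r}", is_error=True))
    return results

tools = {"multiply": lambda a, b: a * b}
calls = [ToolCall("1", "multiply", '{"a": 123, "b": 456}'), ToolCall("2", "missing_tool", "{}")]
results = execute_tool_calls(calls, tools)
for r in results:
    print(r)
```

Each result keeps the call_id of its request so the model can match answers to questions when it issued several calls at once.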

5.3 Tool-Call Events

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ToolCallExecutionEvent, ToolCallRequestEvent
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def tool_event_demo():
    """
    Tool-call events

    Event types:
    - ToolCallRequestEvent: the model asked for one or more tool calls
    - ToolCallExecutionEvent: the tools finished and returned results
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[get_current_time],  # defined in 5.2
    )

    # Stream the run to observe each event as it happens
    async for message in agent.run_stream(task="What time is it now?"):
        if isinstance(message, ToolCallRequestEvent):
            # Tool-call request
            print("=== Tool-call request ===")
            for call in message.content:
                print(f"Tool: {call.name}")
                print(f"Arguments: {call.arguments}")

        elif isinstance(message, ToolCallExecutionEvent):
            # Tool execution result
            print("=== Tool result ===")
            for result in message.content:
                print(f"Call ID: {result.call_id}")
                print(f"Result: {result.content}")

        elif isinstance(getattr(message, "content", None), str) and message.source != "user":
            # Final reply (skips the echoed task; the trailing TaskResult has no content)
            print("=== Final reply ===")
            print(message.content)

asyncio.run(tool_event_demo())

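5.4 The CodeExecutor Hierarchy

A CodeExecutor runs code generated by the LLM.

Execution flow: the LLM generates code, a CodeExecutor receives it, and the code runs in an execution environment. That environment is either local (LocalCommandLineCodeExecutor) or a container (DockerCommandLineCodeExecutor). The result is then returned.

Class hierarchy: CodeExecutor (abstract base class in autogen_core.code_executor), implemented by LocalCommandLineCodeExecutor and DockerCommandLineCodeExecutor.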
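CodeExecutorAgent does not ask the model to call a tool. Instead, it scans incoming messages for fenced Markdown code blocks and hands them to its CodeExecutor. The extraction step can be sketched with a regular expression. The pattern below is a simplified assumption, not AutoGen's exact parser.

```python
import re
from dataclasses import dataclass
from typing import List

FENCE = "`" * 3  # a Markdown code fence, built here to avoid a literal fence in this listing

@dataclass
class CodeBlock:
    """Stand-in mirroring autogen_core.code_executor.CodeBlock (code + language)."""
    code: str
    language: str

# Opening fence, optional language tag, body (non-greedy), closing fence
_PATTERN = re.compile(FENCE + r"[ \t]*(\w*)[ \t]*\n(.*?)" + FENCE, re.DOTALL)

def extract_code_blocks(markdown: str) -> List[CodeBlock]:
    """Return every fenced code block in a message, in order of appearance."""
    return [CodeBlock(code=body, language=lang or "text") for lang, body in _PATTERN.findall(markdown)]

message = f"Here is the code:\n{FENCE}python\nprint(1 + 1)\n{FENCE}\nand a shell step:\n{FENCE}sh\necho hi\n{FENCE}\n"
blocks = extract_code_blocks(message)
print(blocks)
```

This is also why coding agents are usually told to put their code in fenced blocks: code written as plain prose is never found and therefore never executed.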

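5.5 LocalCommandLineCodeExecutor vs DockerCommandLineCodeExecutor

from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor

# Local executor (development and testing)
local_executor = LocalCommandLineCodeExecutor(
    work_dir="./workspace",      # working directory where code files are written and run
    timeout=60,                   # execution timeout (seconds)
    functions=[],                 # functions made available to the executed code
)

# Docker executor (recommended for production)
docker_executor = DockerCommandLineCodeExecutor(
    image="python:3.11-slim",    # Docker image
    work_dir="./workspace",       # host directory, mounted into the container at /workspace
    timeout=60,
    # bind_dir is only needed when this process itself runs inside a container:
    # it is the host path to mount (defaults to work_dir)
)
# The Docker executor must be started before use and stopped afterwards
# (await docker_executor.start() / await docker_executor.stop()), or used as
# `async with DockerCommandLineCodeExecutor(...) as executor:`

Local vs Docker

| Feature | LocalCommandLine | DockerCommandLine |
|---|---|---|
| Security | Low (runs directly on the host) | High (isolated container) |
| Performance | Higher (no container overhead) | Medium (container start-up overhead) |
| Dependencies | Host Python environment | Image environment |
| Use case | Development and testing | Production |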
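Conceptually, a local command-line executor saves the code to a file in its working directory and runs it in a separate process with a timeout. The stdlib sketch below shows that idea; it is not AutoGen's implementation. Like the real local executor, it provides no isolation at all: the code runs with your user's permissions, which is why Docker is the production choice.

```python
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path

@dataclass
class ExecResult:
    exit_code: int
    output: str

def run_python_locally(code: str, work_dir: str, timeout: float = 60) -> ExecResult:
    """Write code to a file in work_dir and run it in a separate interpreter process."""
    directory = Path(work_dir)
    directory.mkdir(parents=True, exist_ok=True)
    script = directory / "snippet.py"
    script.write_text(code, encoding="utf-8")
    try:
        proc = subprocess.run(
            [sys.executable, script.name],  # relative to cwd below
            cwd=directory, capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # 124 follows the convention of the Unix `timeout` command
        return ExecResult(exit_code=124, output=f"Timed out after {timeout}s")
    return ExecResult(proc.returncode, proc.stdout + proc.stderr)

with tempfile.TemporaryDirectory() as tmp:
    ok = run_python_locally("print(sum(x for x in [1, 2, 3, 4] if x % 2 == 0))", tmp)
    slow = run_python_locally("import time; time.sleep(5)", tmp, timeout=0.5)
print(ok)              # ExecResult(exit_code=0, output='6\n')
print(slow.exit_code)  # 124
```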

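5.6 Secure Code Execution

import asyncio

from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def secure_code_execution():
    """
    Secure code execution example

    Best practices:
    1. Isolate execution in Docker
    2. Limit resources (CPU, memory)
    3. Restrict network access
    4. Enforce a timeout

    This example configures items 1 and 4 only. Resource and network limits
    are not set by the arguments below and must be enforced at the Docker level.
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Sandboxed executor; `async with` starts the container and stops it on exit
    async with DockerCommandLineCodeExecutor(
        image="python:3.11-slim",
        work_dir="./safe_workspace",    # host directory mounted into the container
        timeout=30,                     # 30-second timeout per execution
        container_name="autogen_executor",
    ) as safe_executor:

        # Agent that writes code
        coder = AssistantAgent(
            name="coder",
            model_client=model_client,
            system_message="""You are a Python programmer.
            When writing code:
            1. Use only the standard library
            2. Do not touch the file system
            3. Do not make network requests
            Put the code in a Markdown python code block so it can be executed.
            """,
        )

        # Agent that executes code
        executor = CodeExecutorAgent(
            name="executor",
            code_executor=safe_executor,
        )

        # Create the team
        team = RoundRobinGroupChat(
            participants=[coder, executor],
            termination_condition=MaxMessageTermination(6)
        )

        # Run
        stream = team.run_stream(task="Write and execute code that computes the 10th Fibonacci number")
        await Console(stream)

asyncio.run(secure_code_execution())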

5.7 Hands-On: A Code Generation and Execution Agent

import asyncio
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def code_generation_and_execution():
    """
    End-to-end code generation and execution

    Flow:
    1. The user states a requirement
    2. The coder writes code
    3. The executor runs it
    4. The reviewer checks the result
    5. If there are problems, the coder fixes them
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Code executor
    executor = LocalCommandLineCodeExecutor(
        work_dir="./code_workspace",
        timeout=60
    )

    # Code writer
    coder = AssistantAgent(
        name="coder",
        model_client=model_client,
        system_message="""You are a senior Python developer.

        Tasks:
        1. Write Python code that meets the requirement
        2. The code must be complete and runnable
        3. Include the necessary comments
        4. Handle possible exceptions

        Format:
        ```python
        # your code
        ```

        If execution fails, analyze the error and fix the code.
        """,
    )

    # Code runner
    code_executor = CodeExecutorAgent(
        name="executor",
        code_executor=executor,
    )

    # Code reviewer
    reviewer = AssistantAgent(
        name="reviewer",
        model_client=model_client,
        system_message="""You are a code review expert.

        Check:
        1. Whether the code ran correctly
        2. Whether the result matches expectations
        3. Whether there are potential problems

        If everything is fine, say 'REVIEW PASSED'.
        If there are problems, point out what needs to be fixed.
        """,
    )

    # Create the team. Stop only on the reviewer's approval: a coder-side
    # "done" phrase would end the run before the code had even been executed.
    team = RoundRobinGroupChat(
        participants=[coder, code_executor, reviewer],
        termination_condition=TextMentionTermination("REVIEW PASSED") | MaxMessageTermination(15)
    )

    # Run the task
    print("=== Code generation and execution system ===\n")
    stream = team.run_stream(
        task="Write a Python program that takes a list of numbers and prints its mean, maximum and minimum. Test data: [1, 5, 3, 9, 2, 7]"
    )
    await Console(stream)

asyncio.run(code_generation_and_execution())

Chapter 6: Termination Conditions and Task Control

6.1 Overview of the TerminationCondition Hierarchy

TerminationCondition (abstract base class, autogen_agentchat.base)
  + terminated: bool
  + reset(): None (async)
Built-in subclasses (autogen_agentchat.conditions):
  - TextMentionTermination (+text: str)
  - MaxMessageTermination (+max_messages: int)
  - SourceMatchTermination (+sources: List[str])
  - ExternalTermination (+set(): None)
  - HandoffTermination (+target: str)
  - StopMessageTermination

6.2 Built-in Termination Conditions

from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    SourceMatchTermination,
    ExternalTermination,
    HandoffTermination,
    StopMessageTermination,
)

# 1. TextMentionTermination: stop when a text appears
text_termination = TextMentionTermination(
    text="DONE",  # stop when a message contains this text
)

# 2. MaxMessageTermination: message-count limit
max_msg_termination = MaxMessageTermination(
    max_messages=10,  # stop once 10 messages have been produced
)

# 3. SourceMatchTermination: stop when a specific agent speaks
source_termination = SourceMatchTermination(
    sources=["user"],  # stop after "user" has responded
)

# 4. ExternalTermination: controlled from outside the run
external_termination = ExternalTermination()
# Call external_termination.set() from other code to stop the run

# 5. HandoffTermination: stop on a handoff
handoff_termination = HandoffTermination(
    target="user",  # stop when an agent hands off to "user"
)

# 6. StopMessageTermination: stop on a StopMessage
stop_termination = StopMessageTermination()
# Stops when an agent returns a StopMessage

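6.3 Combining Termination Conditions

Conditions are combined with the | (OR) and & (AND) operators, so no extra classes need to be imported:

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import (
    MaxMessageTermination,
    SourceMatchTermination,
    TextMentionTermination,
)
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

# OR: stop as soon as any condition is met
or_termination = (
    TextMentionTermination("DONE") |
    TextMentionTermination("FINISHED") |
    MaxMessageTermination(20)
)

# AND: stop only after every condition has been met. Each condition is
# tracked separately, so they may be satisfied by different messages.
and_termination = (
    SourceMatchTermination(["reviewer"]) &
    TextMentionTermination("REVIEW PASSED")
)

# Nested combination: stops only after "DONE" or "SUCCESS" has been mentioned
# AND 50 messages have been produced. To use 50 messages as a safety cap
# instead, combine it with | rather than &.
complex_termination = (
    (TextMentionTermination("DONE") | TextMentionTermination("SUCCESS")) &
    MaxMessageTermination(50)
)

# Usage example
async def combined_termination_demo():
    """Combined termination condition example"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        system_message="When you finish the task, say 'DONE'; if you cannot finish it, say 'GIVE UP'."
    )

    team = RoundRobinGroupChat(
        participants=[agent],
        termination_condition=(
            TextMentionTermination("DONE") |
            TextMentionTermination("GIVE UP") |
            MaxMessageTermination(10)
        )
    )

    result = await team.run(task="Please introduce yourself")
    print(f"Stop reason: {result.stop_reason}")

asyncio.run(combined_termination_demo())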
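The difference between | and & is easiest to see in a toy model. The sketch below uses hypothetical classes, not AutoGen's, to model these semantics. Every condition latches once it is met. OR fires when any part has fired, and AND fires only once all parts have fired, even if each part was satisfied by a different message.

```python
from typing import Callable, List, Sequence

class Condition:
    """Toy termination condition: called with each new batch of messages; latches once met."""
    def __init__(self) -> None:
        self.terminated = False

    def check(self, messages: Sequence[str]) -> bool:
        raise NotImplementedError

    def __call__(self, messages: Sequence[str]) -> bool:
        if not self.terminated and self.check(messages):
            self.terminated = True
        return self.terminated

    def __or__(self, other: "Condition") -> "Condition":
        return Combined([self, other], any)

    def __and__(self, other: "Condition") -> "Condition":
        return Combined([self, other], all)

class Combined(Condition):
    def __init__(self, parts: List[Condition], combine: Callable[[List[bool]], bool]) -> None:
        super().__init__()
        self.parts, self.combine = parts, combine

    def check(self, messages: Sequence[str]) -> bool:
        # Feed every part the same batch so each keeps its own state current
        return self.combine([part(messages) for part in self.parts])

class TextMention(Condition):
    def __init__(self, text: str) -> None:
        super().__init__()
        self.text = text

    def check(self, messages: Sequence[str]) -> bool:
        return any(self.text in m for m in messages)

class MaxMessages(Condition):
    def __init__(self, limit: int) -> None:
        super().__init__()
        self.limit, self.count = limit, 0

    def check(self, messages: Sequence[str]) -> bool:
        self.count += len(messages)
        return self.count >= self.limit

both = TextMention("DONE") & MaxMessages(3)
print([both([m]) for m in ["DONE", "working", "still working"]])  # [False, False, True]

either = TextMention("DONE") | MaxMessages(10)
print(either(["hello"]), either(["DONE"]))  # False True
```

Note how the AND condition fires on the third message even though "DONE" appeared only in the first one.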

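6.4 Writing a Custom Termination Condition

A custom condition subclasses TerminationCondition from autogen_agentchat.base. The team calls it with the messages produced since the previous call. The condition returns a StopMessage to stop the run, or None to continue.

import asyncio
from typing import Optional, Sequence, Union

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TerminatedException, TerminationCondition
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, StopMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

class KeywordCountTermination(TerminationCondition):
    """
    Custom termination condition: keyword occurrence count

    Stops once a keyword has appeared a given number of times in total.
    """

    def __init__(self, keyword: str, count: int):
        """
        Args:
            keyword: the keyword to watch for
            count: number of occurrences that triggers termination
        """
        self._keyword = keyword
        self._target_count = count
        self._current_count = 0
        self._terminated = False

    @property
    def terminated(self) -> bool:
        """Whether the condition has been reached"""
        return self._terminated

    async def __call__(
        self,
        messages: Sequence[Union[BaseAgentEvent, BaseChatMessage]],
    ) -> Optional[StopMessage]:
        """
        Check whether the run should stop.

        Args:
            messages: the messages produced since the previous call

        Returns:
            A StopMessage to stop the run, otherwise None
        """
        if self._terminated:
            raise TerminatedException("Termination condition has already been reached")

        for message in messages:
            content = getattr(message, "content", None)
            if isinstance(content, str):
                self._current_count += content.count(self._keyword)

        if self._current_count >= self._target_count:
            self._terminated = True
            return StopMessage(
                content=f"'{self._keyword}' appeared {self._current_count} times",
                source="KeywordCountTermination",
            )
        return None

    async def reset(self) -> None:
        """Reset the state"""
        self._current_count = 0
        self._terminated = False

# Using the custom termination condition
async def custom_termination_demo():
    """Custom termination condition example"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)

    # Stop once "good" has appeared 3 times
    termination = KeywordCountTermination(keyword="good", count=3)

    team = RoundRobinGroupChat(
        participants=[agent],
        termination_condition=termination
    )

    result = await team.run(task="Please write three sentences that each contain the word 'good'")
    print(f"Stop reason: {result.stop_reason}")

asyncio.run(custom_termination_demo())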

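6.5 CancellationToken: Cancelling Tasks

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def cancellation_demo():
    """
    CancellationToken usage

    Use cases:
    1. The user cancels a task
    2. Automatic cancellation on timeout
    3. Cancellation triggered by an external event
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)

    # Option 1: cancel on timeout
    cancellation_token = CancellationToken()
    try:
        result = await asyncio.wait_for(
            agent.run(
                task="Please write a very long article",
                cancellation_token=cancellation_token
            ),
            timeout=5.0  # 5-second timeout
        )
        print(result.messages[-1].content)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the run; cancelling the token as well
        # signals any other work that shares this token
        cancellation_token.cancel()
        print("Task timed out and was cancelled")

    # Option 2: cancel manually from another coroutine
    async def run_with_manual_cancel():
        """A task that can be cancelled manually"""
        token = CancellationToken()

        async def cancel_after_delay():
            await asyncio.sleep(2)
            token.cancel()
            print("Cancellation signal sent")

        # Run the canceller concurrently; keep a reference so it is not garbage-collected
        cancel_task = asyncio.create_task(cancel_after_delay())

        try:
            result = await agent.run(
                task="Please write a very long article",
                cancellation_token=token
            )
            print(result.messages[-1].content)
        except asyncio.CancelledError:
            print("Task was cancelled")
        finally:
            cancel_task.cancel()  # no-op if it already fired

    await run_with_manual_cancel()

asyncio.run(cancellation_demo())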
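autogen_core's CancellationToken works by linking pending futures, such as an in-flight model request, to the token. Calling cancel() then cancels those futures, and the awaiting code sees asyncio.CancelledError. The toy SimpleCancellationToken below is a hypothetical class, not the library one, that reproduces this mechanism in pure asyncio.

```python
import asyncio
from typing import Callable, List

class SimpleCancellationToken:
    """Toy token: cancel() cancels every future linked to it."""
    def __init__(self) -> None:
        self._cancelled = False
        self._callbacks: List[Callable[[], object]] = []

    def cancel(self) -> None:
        if not self._cancelled:
            self._cancelled = True
            for callback in self._callbacks:
                callback()

    def is_cancelled(self) -> bool:
        return self._cancelled

    def link_future(self, future: "asyncio.Future") -> "asyncio.Future":
        """Cancel `future` when the token is cancelled (immediately if it already is)."""
        if self._cancelled:
            future.cancel()
        else:
            self._callbacks.append(future.cancel)
        return future

async def slow_model_call() -> str:
    await asyncio.sleep(10)  # stands in for a long LLM request
    return "finished"

async def main() -> str:
    token = SimpleCancellationToken()
    call = token.link_future(asyncio.ensure_future(slow_model_call()))
    asyncio.get_running_loop().call_later(0.1, token.cancel)  # "user" cancels after 100 ms
    try:
        return await call
    except asyncio.CancelledError:
        return "cancelled"

print(asyncio.run(main()))  # cancelled
```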

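6.6 Stopping and Resuming a Team

ExternalTermination stops a team from outside, but only after the agent that is currently speaking has finished its turn. To abort immediately, use a CancellationToken as in 6.5. A stopped team keeps its state, so calling run() or run_stream() again without a task resumes the same conversation. Call reset() first to start over.

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import ExternalTermination, MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def team_control_demo():
    """
    Team run control

    Shows how to stop a running team from outside and then resume it
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)

    # ExternalTermination for outside control, plus a message cap as a safety net
    external_termination = ExternalTermination()

    team = RoundRobinGroupChat(
        participants=[agent],
        termination_condition=external_termination | MaxMessageTermination(10)
    )

    # Run the team in the background
    async def run_team():
        async for message in team.run_stream(task="Keep the conversation going"):
            print(f"[{getattr(message, 'source', 'system')}]: {message}")

    # Controller coroutine
    async def control_team():
        await asyncio.sleep(3)  # wait 3 seconds
        print("\n>>> Requesting stop <<<")
        external_termination.set()  # stop after the current turn

    # Run both concurrently
    await asyncio.gather(
        run_team(),
        control_team()
    )

    # Resume: no new task, so the team continues the previous conversation
    print("\n>>> Resuming <<<")
    await Console(team.run_stream())

asyncio.run(team_control_demo())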
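In AutoGen, ExternalTermination takes effect between turns: the current speaker finishes first. A team that is not reset also keeps its state, so a later run() continues the same conversation. The toy asyncio model below illustrates those two properties. ExternalStop and ToyTeam are hypothetical classes written for this sketch, not AutoGen APIs.

```python
import asyncio
from typing import List

class ExternalStop:
    """Toy external stop flag; the team checks it only between turns."""
    def __init__(self) -> None:
        self._requested = False

    def set(self) -> None:
        self._requested = True

    def consume(self) -> bool:
        """Report whether a stop was requested and clear the flag for the next run."""
        requested, self._requested = self._requested, False
        return requested

class ToyTeam:
    """Toy team whose history survives between runs, like a team that is not reset."""
    def __init__(self, stop: ExternalStop) -> None:
        self.stop = stop
        self.history: List[str] = []

    async def run(self, max_turns: int = 100) -> List[str]:
        produced: List[str] = []
        for _ in range(max_turns):
            await asyncio.sleep(0.01)  # one agent turn, never interrupted midway
            message = f"turn {len(self.history) + 1}"
            self.history.append(message)
            produced.append(message)
            if self.stop.consume():  # honored only after the turn completes
                break
        return produced

async def demo():
    stop = ExternalStop()
    team = ToyTeam(stop)
    asyncio.get_running_loop().call_later(0.035, stop.set)
    first = await team.run()              # stopped from outside
    second = await team.run(max_turns=2)  # "resume": numbering continues from history
    return first, second

first, second = asyncio.run(demo())
print(len(first), second)
```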

6.7 Hands-On: Controlling Exactly When a Conversation Ends

import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    HandoffTermination
)
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def precise_termination_demo():
    """
    Controlling exactly when the conversation ends

    Scenario: a customer-service chat that ends when
    - the user says "GOODBYE"
    - or 30 messages have been exchanged
    - or the conversation is escalated to a human agent
    - or the support agent confirms the issue is resolved
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # AI support agent
    ai_support = AssistantAgent(
        name="ai_support",
        model_client=model_client,
        system_message="""You are an AI customer-support agent.
        Responsibilities:
        1. Answer the user's questions
        2. Escalate to a human when you cannot resolve the issue

        When the issue is resolved, say 'ISSUE RESOLVED'.
        When a human is needed, hand off to human_support.
        """,
        handoffs=["human_support"]  # can hand off to the human agent
    )

    # Human support agent (replies typed on the console)
    human_support = UserProxyAgent(
        name="human_support",
        input_func=input,
    )

    # The user (replies typed on the console)
    user = UserProxyAgent(
        name="user",
        input_func=input,
    )

    # Combined termination condition
    termination = (
        TextMentionTermination("GOODBYE") |        # the user says goodbye
        TextMentionTermination("ISSUE RESOLVED") | # the issue is resolved
        MaxMessageTermination(30) |                # at most 30 messages
        HandoffTermination(target="human_support") # stop on escalation; the app then routes to a human
    )

    # Create the team
    team = SelectorGroupChat(
        participants=[user, ai_support, human_support],
        model_client=model_client,
        termination_condition=termination,
    )

    print("=== Customer service system started ===")
    print("Type 'GOODBYE' to end the conversation\n")

    stream = team.run_stream(task="Hello, how can I help you?")
    await Console(stream)

asyncio.run(precise_termination_demo())

Chapter 7: Messages and Events

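7.1 The Message Hierarchy: BaseChatMessage

BaseChatMessage (abstract)
  + source: str
  + models_usage: RequestUsage | None
Subclasses:
  - TextMessage (+content: str)
  - MultiModalMessage (+content: List[str | Image])
  - HandoffMessage (+target: str, +content: str)
  - StopMessage (+content: str)
Tool-call requests are not chat messages. They are emitted as the agent event ToolCallRequestEvent (+content: List[FunctionCall]).

Message types

| Message type | Purpose | content type |
|---|---|---|
| TextMessage | Plain-text message | str |
| MultiModalMessage | Multimodal message (text plus images) | list of str and Image items |
| HandoffMessage | Hand a task over to another agent | str (the target agent is in target) |
| ToolCallRequestEvent | Tool-call request (an event, not a chat message) | List[FunctionCall] |
| StopMessage | Stop signal | str |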

7.2 Creating Messages of Each Type

import asyncio

from autogen_agentchat.messages import (
    HandoffMessage,
    MultiModalMessage,
    StopMessage,
    TextMessage,
)
from autogen_core import Image
from PIL import Image as PILImage

async def message_types_demo():
    """
    Examples of each message type
    """

    # 1. TextMessage: plain text
    text_msg = TextMessage(
        content="This is a text message",
        source="user"
    )

    # 2. MultiModalMessage: text plus images
    # Create a sample image (a 100x100 red square)
    pil_image = PILImage.new('RGB', (100, 100), color='red')
    ag_image = Image(pil_image)

    multimodal_msg = MultiModalMessage(
        content=[
            "Please describe this image:",
            ag_image
        ],
        source="user"
    )

    # 3. HandoffMessage: hand the task over to another agent
    handoff_msg = HandoffMessage(
        target="expert_agent",
        content="The user needs expert analysis",
        source="coordinator"
    )

    # 4. StopMessage: stop signal
    stop_msg = StopMessage(
        content="Task completed",
        source="assistant"
    )

    print(f"Text message: {text_msg.content}")
    print(f"Multimodal message: {multimodal_msg.content}")
    print(f"Handoff message: handed off to {handoff_msg.target}")
    print(f"Stop message: {stop_msg.content}")

asyncio.run(message_types_demo())

7.3 Handling Multimodal Messages

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import MultiModalMessage
from autogen_core import Image
from autogen_ext.models.openai import OpenAIChatCompletionClient
from PIL import Image as PILImage

async def multimodal_demo():
    """
    Multimodal message handling

    Shows how to send an image to an agent for analysis
    """

    # Use a vision-capable model
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",  # supports image input
    )

    agent = AssistantAgent(
        name="vision_assistant",
        model_client=model_client,
        system_message="You are an image-analysis expert who can recognize and describe image content."
    )

    # Load a local image (replace the path with an image of your own)
    pil_image = PILImage.open("./example_image.png")
    ag_image = Image(pil_image)

    # Build the multimodal message
    multimodal_msg = MultiModalMessage(
        content=[
            "Please analyze this image and describe what it shows:",
            ag_image
        ],
        source="user"
    )

    # Run the agent; a chat message can be passed directly as the task
    result = await agent.run(task=multimodal_msg)
    print(result.messages[-1].content)

asyncio.run(multimodal_demo())

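7.4 The Response Object

on_messages() returns a Response. The real class is autogen_agentchat.base.Response, and the dataclass below is a simplified view of its fields. Response has no separate "terminate" flag. An agent that wants the run to stop returns a StopMessage as its chat_message, which StopMessageTermination detects.

import asyncio
from dataclasses import dataclass
from typing import Optional, Sequence, Union

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, StopMessage, TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient

@dataclass
class Response:
    """
    Agent response (simplified)

    Attributes:
        chat_message: the main output, sent to other agents or the user
        inner_messages: internal events (tool calls etc.) for UI display or debugging
    """
    chat_message: BaseChatMessage
    inner_messages: Optional[Sequence[Union[BaseAgentEvent, BaseChatMessage]]] = None

def echo(text: str) -> str:
    """Return the input text unchanged (example tool)."""
    return text

async def response_demo():
    """Using the Response object"""

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[echo],  # example tool
    )

    # on_messages() returns a Response
    response = await agent.on_messages(
        messages=[TextMessage(content="Hello", source="user")],
        cancellation_token=CancellationToken()
    )

    # The main message
    print(f"Main message: {response.chat_message.content}")

    # Internal events
    if response.inner_messages:
        print(f"Number of inner events: {len(response.inner_messages)}")
        for event in response.inner_messages:
            print(f"Event type: {type(event).__name__}")

    # Did the agent ask to stop?
    print(f"Stop requested: {isinstance(response.chat_message, StopMessage)}")

asyncio.run(response_demo())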

7.5 inner_messages and UI Integration

With run_stream(), each inner event is yielded as soon as it happens. These are the same events an agent collects in Response.inner_messages, so a UI can use them to update in real time:

import asyncio

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ToolCallExecutionEvent, ToolCallRequestEvent
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def inner_messages_ui_demo():
    """
    inner_messages example

    Shows how streamed inner events can drive real-time UI updates
    """

    # Define tools
    def search(query: str) -> str:
        """Search tool"""
        return f"Search results: {query}"

    def analyze(data: str) -> str:
        """Analysis tool"""
        return f"Analysis result: {data}"

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[search, analyze],  # wrapped in FunctionTool automatically
    )

    # Simulated UI event handling
    async def handle_ui_events():
        """Handle UI events"""
        async for message in agent.run_stream(
            task="Search for information about AutoGen and analyze it"
        ):
            # Update the UI according to the message type
            if isinstance(message, ToolCallRequestEvent):
                # Tool call started: show a loading state
                for call in message.content:
                    print(f"[UI] Calling tool: {call.name}")
                    print(f"[UI] Arguments: {call.arguments}")

            elif isinstance(message, ToolCallExecutionEvent):
                # Tool call finished: show the result
                for result in message.content:
                    print(f"[UI] Tool returned: {result.content}")

            elif isinstance(getattr(message, "content", None), str) and message.source != "user":
                # Final reply: show it in the chat window
                print(f"[UI] Agent reply: {message.content}")

    await handle_ui_events()

asyncio.run(inner_messages_ui_demo())
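As the number of event types grows, a chain of isinstance checks gets hard to maintain. One alternative is to dispatch on the event class with functools.singledispatch, as sketched below. The three dataclasses are hypothetical stand-ins for ToolCallRequestEvent, ToolCallExecutionEvent and the final TextMessage, so the example runs without AutoGen installed.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import List

@dataclass
class ToolRequest:  # stand-in for ToolCallRequestEvent
    tool_names: List[str]

@dataclass
class ToolResult:   # stand-in for ToolCallExecutionEvent
    outputs: List[str]

@dataclass
class Reply:        # stand-in for the final TextMessage
    source: str
    content: str

@singledispatch
def render(event: object) -> str:
    """Fallback for event types the UI does not handle."""
    return f"[UI] (skipped {type(event).__name__})"

@render.register
def _(event: ToolRequest) -> str:
    return "[UI] Calling tool(s): " + ", ".join(event.tool_names)

@render.register
def _(event: ToolResult) -> str:
    return "[UI] Tool returned: " + " | ".join(event.outputs)

@render.register
def _(event: Reply) -> str:
    return f"[UI] {event.source}: {event.content}"

stream = [ToolRequest(["search"]), ToolResult(["3 results"]), Reply("assistant", "Summary ready"), 42]
lines = [render(event) for event in stream]
print("\n".join(lines))
```

With AutoGen installed, the same registrations can target the real message classes, and unknown event types fall through to the default handler instead of being silently mis-rendered.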

7.6 Saving and Restoring Agent State

import asyncio
import json
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def state_management_demo():
    """
    Saving and restoring agent state

    Shows how to save an agent's state so it can be restored later
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
    )

    # First conversation
    print("=== First conversation ===")
    result1 = await agent.run(task="My name is Zhang San and I like programming")
    print(f"Reply: {result1.messages[-1].content}")

    # Save the state (a JSON-serializable dict)
    state = await agent.save_state()
    print(f"\nSaved state: {json.dumps(state, indent=2, ensure_ascii=False)[:200]}...")

    # Simulate an agent restart
    new_agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
    )

    # Restore the state
    await new_agent.load_state(state)
    print("\nState restored")

    # Second conversation (the new agent should remember the earlier information)
    print("\n=== Second conversation (state restored) ===")
    result2 = await new_agent.run(task="What is my name, and what do I like?")
    print(f"Reply: {result2.messages[-1].content}")

asyncio.run(state_management_demo())
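save_state() returns a JSON-serializable mapping, so persisting it is ordinary file I/O. For long-running services it is worth writing the file atomically, so that a crash in the middle of a write cannot leave a truncated state file behind. A minimal stdlib sketch follows. The example state dict is made up and does not reflect AutoGen's actual state layout.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Mapping

def save_state_file(state: Mapping[str, Any], path: str) -> None:
    """Write state as UTF-8 JSON atomically: write a temp file, then rename it over the target."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        os.replace(tmp_name, target)  # atomic when both paths are on the same filesystem
    except BaseException:
        os.unlink(tmp_name)
        raise

def load_state_file(path: str) -> dict:
    """Read a state dict previously written by save_state_file."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)

# Made-up example state; a real one would come from `await agent.save_state()`
example_state = {"messages": ["My name is Zhang San", "I like programming"]}
with tempfile.TemporaryDirectory() as tmp:
    state_path = os.path.join(tmp, "agent_state.json")
    save_state_file(example_state, state_path)
    restored = load_state_file(state_path)
    leftover_files = sorted(os.listdir(tmp))
print(restored == example_state, leftover_files)  # True ['agent_state.json']
```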

7.7 Handoffs: Passing Tasks Between Agents

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def handoff_demo():
    """
    Handoff example

    Shows how agents pass a task along via handoffs.
    Note: SelectorGroupChat does not route on handoffs by itself; the selector
    model sees the handoff in the history and usually picks the target. For
    routing driven strictly by handoffs, AutoGen provides the Swarm team.
    """

    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    # Receptionist
    receptionist = AssistantAgent(
        name="receptionist",
        model_client=model_client,
        system_message="""You are the receptionist.
        Route the user's request to the right expert:
        - Math questions -> mathematician
        - Programming questions -> programmer

        Explain the user's request when you hand it off.
        """,
        handoffs=["mathematician", "programmer"],
        description="Receptionist who routes user requests"
    )

    # Mathematician
    mathematician = AssistantAgent(
        name="mathematician",
        model_client=model_client,
        system_message="You are a math expert. Answer math questions. When you are done, say 'PROBLEM SOLVED'.",
        description="Math expert"
    )

    # Programmer
    programmer = AssistantAgent(
        name="programmer",
        model_client=model_client,
        system_message="You are a programming expert. Answer programming questions. When you are done, say 'PROBLEM SOLVED'.",
        description="Programming expert"
    )

    # Create the team
    team = SelectorGroupChat(
        participants=[receptionist, mathematician, programmer],
        model_client=model_client,
        termination_condition=(
            TextMentionTermination("PROBLEM SOLVED") |
            MaxMessageTermination(10)
        ),
    )

    print("=== Handoff demo ===\n")
    stream = team.run_stream(task="Please implement quicksort in Python for me")
    await Console(stream)

asyncio.run(handoff_demo())
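In AutoGen's Swarm team, the current speaker keeps the floor until it issues a handoff, and the handoff target speaks next. That routing rule is sketched below in plain Python. Turn, run_with_handoffs and the scripted agents are hypothetical. The receptionist routes on a keyword, where a real agent would let the LLM decide.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

@dataclass
class Turn:
    """What an agent produces in one turn: text, plus an optional handoff target."""
    text: str
    handoff_to: Optional[str] = None

def run_with_handoffs(task: str, agents: Dict[str, Callable[[List[str]], Turn]],
                      start: str, stop_text: str, max_turns: int = 10) -> List[str]:
    """The current speaker keeps the floor until it hands off; the target speaks next."""
    history = [f"user: {task}"]
    speaker = start
    for _ in range(max_turns):
        turn = agents[speaker](history)
        history.append(f"{speaker}: {turn.text}")
        if stop_text in turn.text:
            break
        if turn.handoff_to is not None:
            if turn.handoff_to not in agents:
                raise ValueError(f"unknown handoff target: {turn.handoff_to}")
            speaker = turn.handoff_to
    return history

def receptionist(history: List[str]) -> Turn:
    # Route on a keyword in the user's request (a real agent would ask the LLM)
    target = "programmer" if "Python" in history[0] else "mathematician"
    return Turn(f"Transferring to {target}", handoff_to=target)

agents = {
    "receptionist": receptionist,
    "programmer": lambda h: Turn("def quicksort(xs): ... PROBLEM SOLVED"),
    "mathematician": lambda h: Turn("The answer is 42. PROBLEM SOLVED"),
}
log = run_with_handoffs("Implement quicksort in Python", agents, "receptionist", "PROBLEM SOLVED")
speakers = [line.split(":")[0] for line in log]
print(speakers)  # ['user', 'receptionist', 'programmer']
```

Unlike model-based selection, this rule is deterministic: who speaks next depends only on the last handoff, which makes the flow easy to test and reason about.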

7.8 Hands-On: Long-Running Multi-Round Tasks

import asyncio
import json
from typing import Optional

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

class LongRunningTaskManager:
    """
    Manager for long-running tasks

    Features:
    1. Pause and resume sessions
    2. Keep the conversation history
    3. Resume from a saved checkpoint
    """

    def __init__(self, model_client):
        self.model_client = model_client
        self.agents = {}
        self.team = None
        self.history = []

    async def initialize(self):
        """Initialize the team"""
        self.agents["assistant"] = AssistantAgent(
            name="assistant",
            model_client=self.model_client,
            system_message="You are a project assistant who helps the user with long-term projects."
        )

        self.agents["user"] = UserProxyAgent(
            name="user",
            input_func=input,  # the user types replies on the console
        )

        self.team = RoundRobinGroupChat(
            participants=[self.agents["user"], self.agents["assistant"]],
            termination_condition=TextMentionTermination("END SESSION")
        )

    async def run_session(self, initial_task: Optional[str] = None):
        """Run a session; without a task, the previous conversation is resumed"""
        if initial_task:
            stream = self.team.run_stream(task=initial_task)
        else:
            stream = self.team.run_stream()

        async for message in stream:
            self.history.append(message)
            yield message

    async def save_session(self, filepath: str):
        """Save the session state"""
        # The team state includes every participant's state and the group chat's own state
        team_state = await self.team.save_state()

        # Write to a file
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({
                "team_state": team_state,
                "history_count": len(self.history)
            }, f, ensure_ascii=False)

        print(f"Session saved to {filepath}")

    async def load_session(self, filepath: str):
        """Restore the session state"""
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)

        await self.team.load_state(data["team_state"])

        print(f"Session restored from {filepath}")

async def long_running_task_demo():
    """Long-running task demo"""
    
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    manager = LongRunningTaskManager(model_client)
    
    await manager.initialize()
    
    print("=== 长期项目助手 ===")
    print("输入'结束会话'来保存并退出\n")
    
    async for message in manager.run_session(
        initial_task="我想开发一个个人博客系统,请帮我规划"
    ):
        pass
    
    # 保存会话
    await manager.save_session("./session_backup.json")

asyncio.run(long_running_task_demo())
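save_session writes the checkpoint file in place. If the process dies during the write, the only copy of the session can be left truncated. A common remedy is to write to a temporary file in the same directory and then swap it in with os.replace. The following is a minimal stdlib sketch of that pattern. save_checkpoint and load_checkpoint are hypothetical helper names, and the state dict stands in for whatever the manager collects.

```python
import json
import os
import tempfile

def save_checkpoint(state: dict, filepath: str) -> None:
    """Write JSON to a temp file next to filepath, then replace the target in one step."""
    directory = os.path.dirname(os.path.abspath(filepath))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before the swap
        # Atomic on POSIX when both paths are on the same filesystem
        os.replace(tmp_path, filepath)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # never leave a half-written temp file behind
        raise

def load_checkpoint(filepath: str) -> dict:
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)

with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "session_backup.json")
    save_checkpoint({"team_state": {"turn": 3}, "history_count": 7}, path)
    print(load_checkpoint(path))
```

A reader of the file sees either the previous complete checkpoint or the new complete one. It never sees a partial write.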

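Chapter 8: The Core Layer: Event-Driven Architecture and Low-Level Development

8.1 Core Layer Concepts

The Core layer is AutoGen's low-level API. It provides an event-driven agent runtime: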

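[Figure: Core layer architecture. Core types: AgentRuntime (runtime), RoutedAgent (routed agent), @message_handler (message handler), @type_subscription (type subscription), AgentId (agent identifier), TopicId (topic identifier), MessageContext (message context). Message flow: message -> handling result.]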

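Core concepts

| Concept | Description |
|---|---|
| AgentRuntime | The agent runtime; manages agent lifecycles and message delivery |
| RoutedAgent | Base agent class that dispatches each incoming message to a handler method based on the message's type |
| AgentId | Unique agent identifier, made of a type and a key |
| TopicId | Topic identifier for publish/subscribe, made of a type and a source |
| MessageContext | Message context: sender, topic_id, is_rpc, cancellation_token, and so on |
| @message_handler | Decorator that marks a handler method; the type hint of its message parameter decides which messages it receives |
| @type_subscription | Class decorator that subscribes the agent type to a topic type (not to a message type) |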
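The relationship between TopicId, subscriptions, and AgentId is easier to see in a small standalone model. In autogen_core, a TypeSubscription maps a published TopicId(type, source) to AgentId(agent_type, source). The topic source becomes the agent key, so each distinct source gets its own agent instance. The classes below are hypothetical stand-ins written for illustration. They are not the autogen_core types.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class TopicId:
    type: str
    source: str

@dataclass(frozen=True)
class AgentId:
    type: str
    key: str

@dataclass(frozen=True)
class TypeSubscription:
    """Toy stand-in: exact match on the topic type; the topic source is reused as the agent key."""
    topic_type: str
    agent_type: str

    def map_to_agent(self, topic: TopicId) -> Optional[AgentId]:
        if topic.type != self.topic_type:
            return None
        return AgentId(self.agent_type, topic.source)

def recipients(subscriptions, topic: TopicId) -> list[AgentId]:
    """Collect every agent that should receive a message published to `topic`."""
    result = []
    for sub in subscriptions:
        agent_id = sub.map_to_agent(topic)
        if agent_id is not None:
            result.append(agent_id)
    return result

subs = [
    TypeSubscription("math_tasks", "math_agent"),
    TypeSubscription("text_tasks", "text_agent"),
]
print(recipients(subs, TopicId("math_tasks", "session-42")))
```

Because the key comes from the topic source, publishing to TopicId("math_tasks", "session-42") and TopicId("math_tasks", "session-43") reaches two separate math_agent instances.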

8.2 Message Handler Decorators

from autogen_core import RoutedAgent, message_handler, MessageContext
from dataclasses import dataclass

# Define message types
@dataclass
class TaskMessage:
    """Task message"""
    task_id: str
    content: str

@dataclass
class ResultMessage:
    """Result message"""
    task_id: str
    result: str

class WorkerAgent(RoutedAgent):
    """
    Worker agent example
    
    Uses @message_handler to handle different message types;
    RoutedAgent dispatches on the type hint of each handler's message parameter
    """
    
    def __init__(self, name: str):
        super().__init__(f"Worker agent: {name}")
        self.name = name
        self.processed_count = 0
    
    @message_handler
    async def handle_task(
        self, 
        message: TaskMessage, 
        ctx: MessageContext
    ) -> ResultMessage:
        """
        Handle a task message
        
        Args:
            message: a TaskMessage
            ctx: message context; ctx.sender is the sender's AgentId (None when sent from outside any agent)
        
        Returns:
            a ResultMessage, delivered back to the caller of send_message
        """
        self.processed_count += 1
        
        print(f"[{self.name}] Received task: {message.task_id}")
        print(f"[{self.name}] Task content: {message.content}")
        print(f"[{self.name}] Sender: {ctx.sender}")
        
        # Process the task
        result = f"Processed: {message.content}"
        
        return ResultMessage(
            task_id=message.task_id,
            result=result
        )
    
    @message_handler
    async def handle_ping(self, message: str, ctx: MessageContext) -> str:
        """
        Handle a plain string message
        
        A simple ping-pong reply
        """
        if message == "ping":
            return "pong"
        return f"Received: {message}"
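A class with several @message_handler methods still needs to pick the right one for each message. RoutedAgent makes that choice from the type hint on each handler's message parameter. The sketch below reproduces the idea with the standard library only and assumes one exact type per handler. ToyRoutedAgent and handles are hypothetical names. This is not autogen_core's implementation, which also handles unions, match predicates, and the split between RPC and published messages.

```python
import asyncio
import typing
from dataclasses import dataclass

def handles(func):
    """Hypothetical decorator: mark a coroutine method as a message handler."""
    func._is_handler = True
    return func

class ToyRoutedAgent:
    """Build a {message type: bound handler} table from type hints, then dispatch on type(message)."""

    def __init__(self):
        self._handlers = {}
        for name in dir(type(self)):
            attr = getattr(type(self), name)
            if getattr(attr, "_is_handler", False):
                hints = typing.get_type_hints(attr)
                self._handlers[hints["message"]] = getattr(self, name)

    async def on_message(self, message):
        handler = self._handlers.get(type(message))
        if handler is None:
            raise LookupError(f"no handler for {type(message).__name__}")
        return await handler(message)

@dataclass
class TaskMessage:
    task_id: str
    content: str

class Worker(ToyRoutedAgent):
    @handles
    async def handle_task(self, message: TaskMessage) -> str:
        return f"Processed: {message.content}"

    @handles
    async def handle_ping(self, message: str) -> str:
        return "pong" if message == "ping" else f"Received: {message}"

async def main():
    worker = Worker()
    print(await worker.on_message(TaskMessage("t1", "summarize")))
    print(await worker.on_message("ping"))

asyncio.run(main())
```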

8.3 Subscriptions

from autogen_core import (
    RoutedAgent, 
    message_handler, 
    type_subscription,
    default_subscription,
    MessageContext,
)

@type_subscription(topic_type="math_tasks")
class MathAgent(RoutedAgent):
    """
    Math agent: subscribed to the math_tasks topic type
    """
    
    def __init__(self):
        super().__init__("Math calculation agent")
    
    @message_handler
    async def handle_math(self, message: str, ctx: MessageContext) -> str:
        """Handle a math task"""
        try:
            # Demo only: eval executes arbitrary Python; never pass it untrusted input
            result = eval(message)
            return f"Result: {result}"
        except Exception as e:
            return f"Calculation error: {e}"

@type_subscription(topic_type="text_tasks")
class TextAgent(RoutedAgent):
    """
    Text agent: subscribed to the text_tasks topic type
    """
    
    def __init__(self):
        super().__init__("Text processing agent")
    
    @message_handler
    async def handle_text(self, message: str, ctx: MessageContext) -> str:
        """Handle a text task"""
        return f"Length: {len(message)}, words: {len(message.split())}"

@default_subscription
class DefaultAgent(RoutedAgent):
    """
    Default agent: subscribed to the default topic type ("default"),
    i.e. messages published to DefaultTopicId(). It does NOT receive every message.
    """
    
    def __init__(self):
        super().__init__("Default handling agent")
    
    @message_handler
    async def handle_default(self, message: str, ctx: MessageContext) -> str:
        """Handle messages published to the default topic"""
        return f"Default handling: {message}"
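MathAgent above calls eval on the message text, so anything a sender publishes runs as Python. For arithmetic-only input, a safer pattern is to parse the text with the standard-library ast module and evaluate only whitelisted numeric nodes. The following is a minimal sketch under that assumption. safe_eval is a hypothetical helper that supports + - * / // % ** and unary signs on numbers, and it could stand in for eval(message) inside handle_math. Malformed input raises SyntaxError, and anything outside the whitelist raises ValueError. The existing except Exception clause catches both. Note that ** can still produce enormous numbers, so you may want to drop ast.Pow or cap exponents.

```python
import ast
import operator
from typing import Union

Number = Union[int, float]

# Whitelisted operators; any other node type in the expression is rejected
_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def safe_eval(expression: str) -> Number:
    """Evaluate a purely arithmetic expression; names, calls, attributes, etc. raise ValueError."""
    tree = ast.parse(expression, mode="eval")

    def _eval(node: ast.AST) -> Number:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"unsupported element: {type(node).__name__}")

    return _eval(tree)

print(safe_eval("2 + 3 * 4"))  # 14
print(safe_eval("10 / 2"))     # 5.0
```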

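8.4 The AgentRuntime

import asyncio
from autogen_core import (
    SingleThreadedAgentRuntime,
    AgentId,
    TopicId,
)

async def runtime_demo():
    """
    AgentRuntime usage example
    
    Shows how to create a runtime, register agents, and send messages
    (MathAgent and TextAgent are defined in 8.3)
    """
    
    # Single-threaded runtime (for development and testing)
    runtime = SingleThreadedAgentRuntime()
    
    # Register agent types; the class-level @type_subscription is added at the same time
    await MathAgent.register(
        runtime,
        "math_agent",           # agent type name
        lambda: MathAgent()     # factory function
    )
    
    await TextAgent.register(
        runtime,
        "text_agent",
        lambda: TextAgent()
    )
    
    # Start processing messages in the background
    runtime.start()
    
    # Option 1: send a message directly to an agent (RPC; waits for the reply)
    math_agent_id = AgentId("math_agent", "default")
    result1 = await runtime.send_message(
        "2 + 3 * 4",
        math_agent_id
    )
    print(f"Math agent reply: {result1}")
    
    # Option 2: publish to a topic (fire-and-forget: returns None,
    # and the handler's return value is discarded)
    topic = TopicId("math_tasks", "default")
    await runtime.publish_message(
        "10 / 2",
        topic
    )
    
    # Wait until every queued message has been processed, then stop
    await runtime.stop_when_idle()

asyncio.run(runtime_demo())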
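Calling stop() right after publishing differs from calling stop_when_idle() in one respect: whether queued messages are drained first. The toy below models this with an asyncio.Queue. publish only enqueues and returns None, and waiting on queue.join() plays the role of "wait until idle." ToyRuntime is a hypothetical class built as a conceptual model. It is not how SingleThreadedAgentRuntime is implemented internally.

```python
import asyncio

class ToyRuntime:
    """Hypothetical runtime: one background loop that drains a message queue."""

    def __init__(self, handler):
        self._queue: asyncio.Queue = asyncio.Queue()
        self._handler = handler
        self._worker = None

    def start(self):
        self._worker = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            message = await self._queue.get()
            try:
                await self._handler(message)
            finally:
                self._queue.task_done()

    async def publish(self, message) -> None:
        # Fire-and-forget: enqueue and return immediately, no reply
        await self._queue.put(message)

    async def stop_when_idle(self):
        # Block until every queued message has been handled, then stop the loop
        await self._queue.join()
        self._worker.cancel()
        await asyncio.gather(self._worker, return_exceptions=True)

async def main():
    handled = []

    async def handler(message):
        await asyncio.sleep(0.01)  # simulate work
        handled.append(message)

    runtime = ToyRuntime(handler)
    runtime.start()
    for i in range(3):
        await runtime.publish(f"msg-{i}")
    print("handled right after publishing:", len(handled))
    await runtime.stop_when_idle()
    print("handled after stop_when_idle:", len(handled))
    return handled

handled = asyncio.run(main())
```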

8.5 Registering and Starting Agents

import asyncio
from autogen_core import (
    SingleThreadedAgentRuntime,
    AgentId,
    RoutedAgent,
    MessageContext,
    message_handler,
)

async def agent_lifecycle_demo():
    """
    Agent lifecycle management example
    """
    
    runtime = SingleThreadedAgentRuntime()
    
    # Register an agent type with a factory (WorkerAgent is defined in 8.2).
    # No instance exists yet: the runtime calls the factory lazily, once per
    # distinct AgentId key, the first time that id receives a message.
    await WorkerAgent.register(
        runtime,
        "worker",
        lambda: WorkerAgent("Worker-1")
    )
    
    # Register a stateful agent
    class StatefulAgent(RoutedAgent):
        def __init__(self, initial_state: dict):
            super().__init__("Stateful agent")
            self.state = initial_state
        
        @message_handler
        async def get_state(self, message: str, ctx: MessageContext) -> dict | str:
            if message == "get_state":
                return self.state
            elif message.startswith("set_state:"):
                # Format "set_state:key=value"; split on the first "=" only
                key, value = message[len("set_state:"):].split("=", 1)
                self.state[key] = value
                return f"State updated: {key}={value}"
            return self.state
    
    await StatefulAgent.register(
        runtime,
        "stateful",
        lambda: StatefulAgent({"count": 0, "name": "test"})
    )
    
    # Start the runtime
    runtime.start()
    
    # Interact with the agent
    stateful_id = AgentId("stateful", "default")
    
    # Read the state
    state = await runtime.send_message("get_state", stateful_id)
    print(f"Current state: {state}")
    
    # Update the state (values are stored as strings)
    result = await runtime.send_message("set_state:count=10", stateful_id)
    print(f"Update result: {result}")
    
    # Read the state again
    state = await runtime.send_message("get_state", stateful_id)
    print(f"State after update: {state}")
    
    await runtime.stop()

asyncio.run(agent_lifecycle_demo())

8.6 Publish/Subscribe Messaging

import asyncio
from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    MessageContext,
    message_handler,
    type_subscription,
    TopicId,
    TypePrefixSubscription,
)
from dataclasses import dataclass

@dataclass
class NewsMessage:
    """News message"""
    category: str
    title: str
    content: str

@type_subscription(topic_type="news.sports")
class SportsNewsAgent(RoutedAgent):
    """Sports news subscriber"""
    
    def __init__(self):
        super().__init__("Sports news agent")
        self.news_count = 0
    
    @message_handler
    async def handle_news(self, message: NewsMessage, ctx: MessageContext) -> None:
        self.news_count += 1
        print(f"[Sports news #{self.news_count}] {message.title}")

@type_subscription(topic_type="news.tech")
class TechNewsAgent(RoutedAgent):
    """Tech news subscriber"""
    
    def __init__(self):
        super().__init__("Tech news agent")
        self.news_count = 0
    
    @message_handler
    async def handle_news(self, message: NewsMessage, ctx: MessageContext) -> None:
        self.news_count += 1
        print(f"[Tech news #{self.news_count}] {message.title}")

# type_subscription matches the topic type exactly: "news.*" would be taken literally,
# not as a wildcard. This agent gets a prefix subscription when registered in pubsub_demo.
class AllNewsAgent(RoutedAgent):
    """Subscriber for all news"""
    
    def __init__(self):
        super().__init__("News digest agent")
        self.all_news = []
    
    @message_handler
    async def handle_news(self, message: NewsMessage, ctx: MessageContext) -> None:
        self.all_news.append(message)
        print(f"[News digest] Received {message.category} news: {message.title}")

async def pubsub_demo():
    """Publish/subscribe demo"""
    
    runtime = SingleThreadedAgentRuntime()
    
    # Register the subscribers
    await SportsNewsAgent.register(runtime, "sports_news", lambda: SportsNewsAgent())
    await TechNewsAgent.register(runtime, "tech_news", lambda: TechNewsAgent())
    await AllNewsAgent.register(runtime, "all_news", lambda: AllNewsAgent())
    # Deliver every topic whose type starts with "news." to all_news
    await runtime.add_subscription(
        TypePrefixSubscription(topic_type_prefix="news.", agent_type="all_news")
    )
    
    runtime.start()
    
    # Publish news
    sports_topic = TopicId("news.sports", "default")
    tech_topic = TopicId("news.tech", "default")
    
    await runtime.publish_message(
        NewsMessage("sports", "World Cup final", "A thrilling match..."),
        sports_topic
    )
    
    await runtime.publish_message(
        NewsMessage("tech", "AI breakthrough", "GPT-5 released..."),
        tech_topic
    )
    
    # Let every subscriber finish before stopping
    await runtime.stop_when_idle()

asyncio.run(pubsub_demo())
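A few lines of code can check the contrast between exact topic-type matching and prefix matching. The stdlib sketch below assumes two matching rules. TypeSubscription tests the topic type for equality, and TypePrefixSubscription tests it with str.startswith. The sketch lists the subscriber types that a given topic fans out to. The subscription table and the function name are hypothetical.

```python
# (kind, pattern, agent_type) triples: a hypothetical, flattened subscription table
SUBSCRIPTIONS = [
    ("exact", "news.sports", "sports_news"),
    ("exact", "news.tech", "tech_news"),
    ("exact", "news.*", "wildcard_news"),  # literal "news.*": only an identical topic type matches
    ("prefix", "news.", "all_news"),
]

def matching_agent_types(topic_type: str) -> list[str]:
    """Return the agent types that a message published to topic_type is delivered to."""
    matched = []
    for kind, pattern, agent_type in SUBSCRIPTIONS:
        if kind == "exact" and topic_type == pattern:
            matched.append(agent_type)
        elif kind == "prefix" and topic_type.startswith(pattern):
            matched.append(agent_type)
    return matched

for topic in ["news.sports", "news.tech", "news.weather", "sports"]:
    print(topic, "->", matching_agent_types(topic))
```

A new topic such as news.weather reaches only the prefix subscriber. No change to the existing exact subscriptions is needed.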

8.7 Practical Scenario: Building a Low-Level Event-Driven Agent System

import asyncio
from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    message_handler,
    type_subscription,
    TopicId,
    MessageContext,
)
from dataclasses import dataclass
from typing import Optional

# Define message types
@dataclass
class OrderCreated:
    """Order-created event"""
    order_id: str
    customer: str
    items: list
    total: float

@dataclass
class PaymentProcessed:
    """Payment event"""
    order_id: str
    success: bool
    transaction_id: Optional[str] = None

@dataclass
class OrderShipped:
    """Order-shipped event"""
    order_id: str
    tracking_number: str

@type_subscription(topic_type="order.created")
class OrderProcessor(RoutedAgent):
    """Order processor"""
    
    def __init__(self):
        super().__init__("Order processor")
    
    @message_handler
    async def handle_order(self, message: OrderCreated, ctx: MessageContext) -> None:
        print(f"[Order processor] Processing order: {message.order_id}")
        print(f"  Customer: {message.customer}")
        print(f"  Items: {message.items}")
        print(f"  Total: ${message.total}")
        
        # Send the payment request (simulated: the payment outcome is published directly).
        # Agents publish through self.publish_message; MessageContext has no publish method.
        payment_topic = TopicId("payment.request", "default")
        await self.publish_message(
            PaymentProcessed(
                order_id=message.order_id,
                success=True,
                transaction_id=f"TXN-{message.order_id}"
            ),
            payment_topic
        )

@type_subscription(topic_type="payment.request")
class PaymentProcessor(RoutedAgent):
    """Payment processor"""
    
    def __init__(self):
        super().__init__("Payment processor")
    
    @message_handler
    async def handle_payment(self, message: PaymentProcessed, ctx: MessageContext) -> None:
        print(f"[Payment processor] Processing payment: {message.order_id}")
        
        if message.success:
            print(f"  Payment succeeded: {message.transaction_id}")
            
            # Send the shipping request
            shipping_topic = TopicId("shipping.request", "default")
            await self.publish_message(
                OrderShipped(
                    order_id=message.order_id,
                    tracking_number=f"TRK-{message.order_id}"
                ),
                shipping_topic
            )
        else:
            print("  Payment failed")

@type_subscription(topic_type="shipping.request")
class ShippingProcessor(RoutedAgent):
    """Shipping processor"""
    
    def __init__(self):
        super().__init__("Shipping processor")
    
    @message_handler
    async def handle_shipping(self, message: OrderShipped, ctx: MessageContext) -> None:
        print(f"[Shipping processor] Shipping order: {message.order_id}")
        print(f"  Tracking number: {message.tracking_number}")

async def ecommerce_workflow_demo():
    """E-commerce workflow demo"""
    
    print("=== E-commerce order processing system ===\n")
    
    runtime = SingleThreadedAgentRuntime()
    
    # Register the processors
    await OrderProcessor.register(runtime, "order_processor", lambda: OrderProcessor())
    await PaymentProcessor.register(runtime, "payment_processor", lambda: PaymentProcessor())
    await ShippingProcessor.register(runtime, "shipping_processor", lambda: ShippingProcessor())
    
    runtime.start()
    
    # Create an order
    order_topic = TopicId("order.created", "default")
    await runtime.publish_message(
        OrderCreated(
            order_id="ORD-001",
            customer="Zhang San",
            items=["Item A", "Item B"],
            total=299.99
        ),
        order_topic
    )
    
    # Messages published by handlers are queued too; wait until the whole chain finishes
    await runtime.stop_when_idle()

asyncio.run(ecommerce_workflow_demo())

Chapter 9: Extensions and Distributed Deployment

9.1 Overview of the Extensions Mechanism

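[Figure: the Extensions layer and its core functions: McpWorkbench (MCP protocol), external tool access; OpenAIAssistantAgent (OpenAI Assistant), model service integration; GrpcWorkerAgentRuntime (distributed runtime), distributed deployment; Azure Extensions (Azure integration), cloud service integration.]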

9.2 McpWorkbench in Detail: Integrating an MCP Server

import asyncio
from autogen_ext.tools.mcp import McpWorkbench, StdioServerParams
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def mcp_integration_demo():
    """
    MCP (Model Context Protocol) integration example
    
    MCP is a standardized protocol introduced by Anthropic for interaction between AI models and external tools
    """
    
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    
    # Launch an MCP server over stdio (mcp_server.py is your own server script)
    server_params = StdioServerParams(
        command="python",
        args=["mcp_server.py"],
    )
    
    async with McpWorkbench(server_params=server_params) as workbench:
        
        # list_tools() returns tool schemas (dicts), so read the name by key
        tools = await workbench.list_tools()
        print(f"Available tools: {[t['name'] for t in tools]}")
        
        # Give the agent the workbench itself; schemas alone are not callable tools
        agent = AssistantAgent(
            name="mcp_assistant",
            model_client=model_client,
            workbench=workbench,
        )
        
        # Run the agent
        result = await agent.run(
            task="Use the MCP tools to look up the current weather"
        )
        print(result.messages[-1].content)
    
    await model_client.close()

asyncio.run(mcp_integration_demo())

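9.3 Using the OpenAIAssistantAgent Extension

import asyncio
from openai import AsyncOpenAI
from autogen_core import CancellationToken
from autogen_ext.agents.openai import OpenAIAssistantAgent

async def openai_assistant_demo():
    """
    OpenAI Assistant agent example
    
    Backed by OpenAI's Assistants API, which supports:
    - Code interpreter
    - File search
    - Function calling
    """
    
    # This agent talks to the Assistants API through the openai SDK client,
    # not through an AutoGen model client
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    
    # Create the OpenAI Assistant agent
    assistant = OpenAIAssistantAgent(
        name="code_assistant",
        description="Coding expert backed by the OpenAI Assistants API",
        client=client,
        model="gpt-4o",
        instructions="You are a coding expert who helps users write and debug code.",
        tools=["code_interpreter"],  # code interpreter
    )
    
    cancellation_token = CancellationToken()
    
    # Run
    result = await assistant.run(
        task="Write a Python function that computes the Fibonacci sequence, and test the first 10 terms"
    )
    print(result.messages[-1].content)
    
    # Upload a file (optional)
    # await assistant.on_upload_for_code_interpreter("./data.csv", cancellation_token)
    
    # Clean up: delete the assistant created on the OpenAI side
    await assistant.delete_assistant(cancellation_token)

asyncio.run(openai_assistant_demo())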

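9.4 Distributed Deployment with GrpcWorkerAgentRuntime

import asyncio
from dataclasses import dataclass
from autogen_core import (
    MessageContext,
    RoutedAgent,
    message_handler,
    try_get_known_serializers_for_type,
)
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime, GrpcWorkerAgentRuntimeHost

# Messages that cross process boundaries must be serializable, so use a dataclass
# rather than a bare str and register its serializer on the worker
@dataclass
class WorkTask:
    content: str

class DistributedWorkerAgent(RoutedAgent):
    """Distributed worker agent"""
    
    def __init__(self, worker_id: str):
        super().__init__(f"Worker-{worker_id}")
        self.worker_id = worker_id
    
    @message_handler
    async def process(self, message: WorkTask, ctx: MessageContext) -> WorkTask:
        """Handle a task"""
        return WorkTask(content=f"Worker-{self.worker_id} processed: {message.content}")

async def distributed_runtime_demo():
    """
    Distributed runtime example (worker process)
    
    Architecture:
    - Host: GrpcWorkerAgentRuntimeHost, relays messages between workers
    - Workers: GrpcWorkerAgentRuntime, run the agents
    """
    
    # Worker configuration: the host's address
    worker = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    worker.add_message_serializer(try_get_known_serializers_for_type(WorkTask))
    
    # Start (connect) the worker before registering agents
    await worker.start()
    
    # Register the agent
    await DistributedWorkerAgent.register(
        worker,
        "distributed_worker",
        lambda: DistributedWorkerAgent("1")
    )
    
    print("Worker started, waiting for tasks...")
    
    # Keep running
    try:
        await asyncio.sleep(3600)  # run for one hour
    finally:
        await worker.stop()

# Host code (run in a separate process)
async def host_demo():
    """Host example"""
    host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
    host.start()  # starts the gRPC service in the background
    
    print("Host started")
    
    # The host only relays messages and has no send_message. Tasks are sent
    # from another worker runtime connected to the same host, e.g.:
    # result = await client_worker.send_message(
    #     WorkTask("hello"), AgentId("distributed_worker", "default"))
    
    await host.stop_when_signal()  # serve until Ctrl+C / SIGTERM

# asyncio.run(distributed_runtime_demo())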
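Between processes, a message travels as bytes. Its type therefore has to survive a round trip through some wire format, and the receiver has to know which class to rebuild. The stdlib sketch below illustrates this with JSON and a type-name registry. REGISTRY, to_wire, and from_wire are hypothetical helpers. AutoGen's gRPC runtime uses its own serializer registry and content types instead.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass

@dataclass
class WorkTask:
    content: str

# Hypothetical registry: type name -> class, so the receiver can rebuild the object
REGISTRY = {WorkTask.__name__: WorkTask}

def to_wire(message) -> bytes:
    """Wrap a dataclass message in a {type, data} JSON envelope."""
    if not is_dataclass(message):
        raise TypeError(f"{type(message).__name__} has no known serializer")
    envelope = {"type": type(message).__name__, "data": asdict(message)}
    return json.dumps(envelope, ensure_ascii=False).encode("utf-8")

def from_wire(payload: bytes):
    """Look up the class by its type name and rebuild the message."""
    envelope = json.loads(payload.decode("utf-8"))
    cls = REGISTRY[envelope["type"]]
    return cls(**envelope["data"])

wire = to_wire(WorkTask("hello"))
print(wire)
print(from_wire(wire))
```

A bare str has no type tag of its own, so the receiving side cannot tell which handler it is meant for. This is one reason the example above wraps the payload in a dataclass.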

9.5 Visual Development with AutoGen Studio

AutoGen Studio is a web UI for building agent systems without writing code:

# Install
pip install -U autogenstudio

# Launch
autogenstudio ui --port 8080 --appdir ./myapp

[Figure: AutoGen Studio features: agent configuration, team orchestration, tool management, test runs, code export.]

From Studio to Code

# Build the agent system in Python from an exported configuration.
# studio_config below is a simplified, illustrative structure; it is not
# Studio's actual export format.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Simplified configuration
studio_config = {
    "agents": [
        {
            "name": "assistant",
            "type": "AssistantAgent",
            "model": "gpt-4o",
            "system_message": "You are a helpful assistant"
        }
    ],
    "team": {
        "type": "RoundRobinGroupChat",
        "max_turns": 10
    }
}

# Convert to code
def create_from_config(config):
    """Create the agent system from the configuration"""
    # One model client per distinct model name, shared by the agents that use it
    model_clients = {}
    
    agents = []
    for agent_config in config["agents"]:
        model = agent_config["model"]
        if model not in model_clients:
            model_clients[model] = OpenAIChatCompletionClient(model=model)
        agent = AssistantAgent(
            name=agent_config["name"],
            model_client=model_clients[model],
            system_message=agent_config["system_message"]
        )
        agents.append(agent)
    
    # max_turns is a turn limit, so it maps to the team's max_turns parameter
    team = RoundRobinGroupChat(
        participants=agents,
        max_turns=config["team"]["max_turns"]
    )
    
    return team

9.6 Migration Guide: v0.2 to v0.4

"""
Main changes from v0.2 to v0.4
"""

# ========== v0.2 code ==========
# from autogen import AssistantAgent, UserProxyAgent

# # v0.2: an llm_config dictionary
# llm_config = {
#     "config_list": [{"model": "gpt-4", "api_key": "..."}]
# }

# assistant = AssistantAgent(
#     name="assistant",
#     llm_config=llm_config,
# )

# user_proxy = UserProxyAgent(
#     name="user",
#     human_input_mode="NEVER",
# )

# # v0.2: initiate_chat
# user_proxy.initiate_chat(
#     assistant,
#     message="Hello"
# )

# ========== v0.4 code ==========
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def v04_style():
    # v0.4: a model client object
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key="..."
    )
    
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,  # pass the client directly
    )
    
    # v0.4's UserProxyAgent has no human_input_mode and always asks a human
    # (via input_func). The equivalent of human_input_mode="NEVER" is to leave
    # it out of the team and bound the run with a turn limit or termination condition.
    
    # v0.4: a Team plus run()
    team = RoundRobinGroupChat(participants=[assistant], max_turns=1)
    result = await team.run(task="Hello")
    print(result.messages[-1].content)
    await model_client.close()

asyncio.run(v04_style())

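Key migration changes

| v0.2 | v0.4 |
|---|---|
| llm_config dictionary | OpenAIChatCompletionClient |
| initiate_chat() | team.run() / team.run_stream() |
| ConversableAgent | BaseChatAgent |
| GroupChat | RoundRobinGroupChat / SelectorGroupChat |
| UserProxyAgent(human_input_mode=...) | UserProxyAgent(input_func=...), or omit it when no human input is needed |
| Synchronous API | Asynchronous API (async/await) |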

9.7 Practical Scenario: Building an Enterprise-Grade Distributed Agent System

import asyncio
import os
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

class EnterpriseAgentSystem:
    """
    Enterprise agent system
    
    This example demonstrates:
    1. A tenant ID carried through each request and result
    2. A pool of model clients shared by name
    3. Basic metrics (requests, errors, tokens)
    4. Per-request error capture
    Load balancing and automatic recovery require additional infrastructure.
    """
    
    def __init__(self, config: dict):
        self.config = config
        self.model_clients = {}
        self.metrics = {
            "requests": 0,
            "errors": 0,
            "total_tokens": 0,
        }
    
    async def initialize(self):
        """Initialize the system"""
        # Create the model client pool
        for name, model_config in self.config["models"].items():
            self.model_clients[name] = OpenAIChatCompletionClient(**model_config)
    
    async def _create_team(self, config: dict):
        """Create an agent team"""
        agents = []
        
        for agent_config in config["agents"]:
            model_client = self.model_clients[agent_config["model"]]
            
            agent = AssistantAgent(
                name=agent_config["name"],
                model_client=model_client,
                system_message=agent_config["system_message"],
            )
            agents.append(agent)
        
        return SelectorGroupChat(
            participants=agents,
            model_client=self.model_clients["default"],
            termination_condition=MaxMessageTermination(
                config.get("max_messages", 20)
            )
        )
    
    async def process_request(
        self, 
        tenant_id: str, 
        team_name: str, 
        task: str
    ) -> dict:
        """
        Handle a request
        
        Args:
            tenant_id: tenant ID
            team_name: team name
            task: task description
        
        Returns:
            The result plus metadata
        """
        self.metrics["requests"] += 1
        
        try:
            # A fresh team per request: a team keeps its conversation between runs
            # and cannot run concurrently, so a shared team would mix tenants' histories
            team = await self._create_team(self.config["teams"][team_name])
            
            # Run the task
            result = await team.run(task=task)
            
            # Collect metrics
            for msg in result.messages:
                if hasattr(msg, 'models_usage') and msg.models_usage:
                    self.metrics["total_tokens"] += (
                        msg.models_usage.prompt_tokens +
                        msg.models_usage.completion_tokens
                    )
            
            return {
                "success": True,
                "response": result.messages[-1].content,
                "stop_reason": result.stop_reason,
                "tenant_id": tenant_id,
            }
        
        except Exception as e:
            self.metrics["errors"] += 1
            return {
                "success": False,
                "error": str(e),
                "tenant_id": tenant_id,
            }
    
    def get_metrics(self) -> dict:
        """Return the system metrics"""
        return self.metrics.copy()
    
    async def close(self):
        """Release all model clients"""
        for client in self.model_clients.values():
            await client.close()

# Example configuration
enterprise_config = {
    "models": {
        "default": {
            "model": "gpt-4o",
            "api_key": os.getenv("OPENAI_API_KEY")  # read from an environment variable
        },
        "fast": {
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY")
        }
    },
    "teams": {
        "customer_service": {
            "agents": [
                {
                    "name": "receptionist",
                    "model": "fast",
                    "system_message": "You are a customer service receptionist"
                },
                {
                    "name": "expert",
                    "model": "default",
                    "system_message": "You are a technical expert"
                }
            ],
            "max_messages": 15
        }
    }
}

async def enterprise_demo():
    """Enterprise system demo"""
    
    system = EnterpriseAgentSystem(enterprise_config)
    await system.initialize()
    
    # Handle a request
    result = await system.process_request(
        tenant_id="company-a",
        team_name="customer_service",
        task="Where is my order?"
    )
    
    print(f"Result: {result}")
    print(f"Metrics: {system.get_metrics()}")
    await system.close()

asyncio.run(enterprise_demo())
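The class above counts requests globally. When tenants share the system, a common next step is to cap how many requests each tenant can run at once. This keeps one busy tenant from starving the others. The following is a minimal stdlib sketch that assumes a fixed per-tenant limit. TenantLimiter is a hypothetical helper, and fake_request stands in for a coroutine such as EnterpriseAgentSystem.process_request.

```python
import asyncio
from collections import defaultdict

class TenantLimiter:
    """Hypothetical helper: at most `limit` concurrent calls per tenant."""

    def __init__(self, limit: int):
        # One semaphore per tenant, created on first use
        self._semaphores = defaultdict(lambda: asyncio.Semaphore(limit))
        self._active = defaultdict(int)
        self.peak = defaultdict(int)  # highest observed concurrency per tenant

    async def run(self, tenant_id: str, coro_fn, *args):
        async with self._semaphores[tenant_id]:
            self._active[tenant_id] += 1
            self.peak[tenant_id] = max(self.peak[tenant_id], self._active[tenant_id])
            try:
                return await coro_fn(*args)
            finally:
                self._active[tenant_id] -= 1

async def fake_request(task: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for a model call
    return f"done: {task}"

async def main():
    limiter = TenantLimiter(limit=2)
    jobs = [limiter.run("company-a", fake_request, f"a{i}") for i in range(5)]
    jobs += [limiter.run("company-b", fake_request, f"b{i}") for i in range(3)]
    results = await asyncio.gather(*jobs)
    print(results)
    print(dict(limiter.peak))
    return limiter, results

limiter, results = asyncio.run(main())
```

Each tenant has its own semaphore. As a result, company-a's backlog of five requests does not delay company-b's requests.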

Chapter 10: Production Best Practices

10.1 Architecture Design Principles

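Architecture design principles:

  • Single responsibility: each agent focuses on one domain
  • Clear boundaries: define each agent's capabilities precisely
  • Loose coupling: communicate through messages and avoid direct dependencies
  • Observability: thorough logging and monitoring
  • Fault tolerance: error handling and retry mechanisms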

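Suggested division of agent responsibilities

# Good design: clear responsibilities
class GoodArchitecture:
    """
    Recommended agent architecture
    
    - ReceptionAgent: receives requests and routes them
    - ExpertAgent: handles a specialized domain
    - ValidatorAgent: validates results
    - ReporterAgent: produces reports
    """
    pass

# Poor design: tangled responsibilities
class BadArchitecture:
    """
    Agent architecture to avoid
    
    - SuperAgent: handles everything (too many responsibilities)
    """
    pass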

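Choosing between AgentChat and Core

| Scenario | Recommended layer |
|---|---|
| Rapid prototyping | AgentChat |
| Standard conversational scenarios | AgentChat |
| Simple workflows | AgentChat |
| Complex event-driven systems | Core |
| Distributed deployment | Core |
| Custom message routing | Core |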

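10.2 Performance Optimization Strategies

import asyncio
import hashlib
import json
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

class PerformanceOptimizedAgent:
    """Performance optimization examples"""
    
    @staticmethod
    async def optimize_concurrency():
        """
        Concurrency control
        
        Strategy: cap the number of concurrent runs with a semaphore
        """
        
        # Semaphore limiting concurrency
        semaphore = asyncio.Semaphore(5)  # at most 5 concurrent runs
        
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        
        async def process_with_limit(task):
            async with semaphore:
                # One agent per task: an agent keeps its own conversation context,
                # so concurrent runs on a shared agent would interleave histories.
                # The model client itself can be shared.
                agent = AssistantAgent(name="worker", model_client=model_client)
                return await agent.run(task=task)
        
        # Process many tasks concurrently
        tasks = [f"Task {i}" for i in range(20)]
        results = await asyncio.gather(
            *[process_with_limit(t) for t in tasks]
        )
        
        await model_client.close()
        return results
    
    @staticmethod
    def optimize_tokens():
        """
        Token optimization strategies
        
        1. Keep system_message concise
        2. Use shorter variable names
        3. Remove unnecessary context
        4. Use caching
        """
        
        # Poor system_message (verbose)
        bad_system = """
        You are an extremely professional, highly experienced, deeply knowledgeable assistant,
        with many years of work experience, skilled at all kinds of tasks...
        (lots of filler)
        """
        
        # Good system_message (concise)
        good_system = "You are a technical expert. Answer concisely and accurately."
        
        return good_system
    
    @staticmethod
    async def optimize_caching():
        """
        Caching
        
        v0.4 has no cache_seed (that was a v0.2 llm_config option); responses
        are cached by wrapping the model client in ChatCompletionCache
        """
        from autogen_ext.models.cache import ChatCompletionCache, CHAT_CACHE_VALUE_TYPE
        from autogen_ext.cache_store.diskcache import DiskCacheStore
        from diskcache import Cache  # requires the diskcache package
        
        # Identical input messages are answered from the cache
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        cache_store = DiskCacheStore[CHAT_CACHE_VALUE_TYPE](Cache("./.autogen_cache"))
        cached_client = ChatCompletionCache(model_client, cache_store)
        
        # Or use a custom cache
        class ResponseCache:
            """Simple response cache"""
            
            def __init__(self):
                self.cache = {}
            
            def get_key(self, messages):
                # hash() of a str is salted per process; use a stable digest instead
                serialized = json.dumps(messages, sort_keys=True, ensure_ascii=False, default=str)
                return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
            
            async def get_or_compute(self, key, compute_fn):
                if key in self.cache:
                    return self.cache[key]
                result = await compute_fn()
                self.cache[key] = result
                return result
        
        return cached_client, ResponseCache()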
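ResponseCache above keeps entries in a plain dict. The dict grows without bound, and entries never expire. A common refinement is to evict the least recently used entry once a size limit is reached and to drop entries older than a TTL. The following is a minimal stdlib sketch under those assumptions. BoundedTTLCache is a hypothetical helper with an injectable clock for testing. A stored value of None cannot be told apart from a miss.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional

class BoundedTTLCache:
    """Hypothetical helper: LRU eviction beyond max_size; entries expire after ttl seconds."""

    def __init__(self, max_size: int = 1000, ttl: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl = ttl
        self._clock = clock
        self._data: "OrderedDict[str, tuple[float, Any]]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        stored_at, value = item
        if self._clock() - stored_at > self.ttl:
            del self._data[key]        # expired
            return None
        self._data.move_to_end(key)    # mark as most recently used
        return value

    def put(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock(), value)
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)

now = [0.0]
cache = BoundedTTLCache(max_size=2, ttl=10.0, clock=lambda: now[0])
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # touch "a", so "b" becomes least recently used
cache.put("c", 3)  # evicts "b"
print(sorted(cache._data))
```

The keys can be the SHA-256 digests that get_key produces. This keeps the eviction policy independent of how messages are serialized.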

10.3 Error Handling and Retries

import asyncio
from typing import Callable, Any

import openai

class RetryHandler:
    """
    Retry handler
    
    Features:
    1. Automatically retries failed operations
    2. Exponential backoff
    3. A cap on the number of retries
    """
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exceptions: tuple = (Exception,)
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exceptions = exceptions
    
    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        **kwargs
    ) -> Any:
        """
        Run func, retrying on the configured exceptions
        """
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            
            except self.exceptions as e:
                last_exception = e
                
                if attempt < self.max_retries:
                    # Compute the backoff delay
                    delay = min(
                        self.base_delay * (2 ** attempt),
                        self.max_delay
                    )
                    
                    print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                    await asyncio.sleep(delay)
        
        raise last_exception

async def error_handling_demo():
    """Error handling example"""
    
    from autogen_agentchat.agents import AssistantAgent
    from autogen_ext.models.openai import OpenAIChatCompletionClient
    
    # The OpenAI SDK raises its own exception types, which do not inherit
    # from the built-in ConnectionError / TimeoutError
    retry_handler = RetryHandler(
        max_retries=3,
        base_delay=1.0,
        exceptions=(openai.APIConnectionError, openai.RateLimitError)
    )
    
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    
    # Run the agent with retries; a fresh agent per attempt, so a failed attempt
    # cannot leave a duplicate task message in the agent's context
    async def run_agent():
        agent = AssistantAgent(name="assistant", model_client=model_client)
        return await agent.run(task="Hello")
    
    try:
        result = await retry_handler.execute_with_retry(run_agent)
        print(f"Result: {result.messages[-1].content}")
    
    except Exception as e:
        print(f"All retries failed: {e}")
    
    finally:
        await model_client.close()

asyncio.run(error_handling_demo())
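RetryHandler above uses plain exponential backoff. Clients that fail at the same moment therefore also retry at the same moment, which can hammer a recovering service in waves. A widely used refinement is "full jitter": each retry sleeps for a random duration between 0 and the capped exponential delay. The following is a minimal stdlib sketch that keeps the same meaning for base_delay and max_delay as above. backoff_delays is a hypothetical helper, and rng can be injected to make the output reproducible.

```python
import random
from typing import Optional

def backoff_delays(max_retries: int, base_delay: float = 1.0, max_delay: float = 60.0,
                   jitter: bool = True, rng: Optional[random.Random] = None) -> list[float]:
    """Delays before retries 1..max_retries: min(base * 2**attempt, max), optionally full-jittered."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        capped = min(base_delay * (2 ** attempt), max_delay)
        # Full jitter: pick uniformly in [0, capped]
        delays.append(rng.uniform(0, capped) if jitter else capped)
    return delays

print(backoff_delays(5, jitter=False))          # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(5, rng=random.Random(0)))  # reproducible jittered delays
```

In RetryHandler.execute_with_retry, the `delay = min(...)` line could be replaced by a `random.uniform(0, ...)` draw of the same capped value to get this behavior.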

10.4 可观测性实践

import asyncio
import logging
import json
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field

@dataclass
class TraceSpan:
    """追踪跨度"""
    span_id: str
    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)

class ObservabilityManager:
    """
    Observability manager
    
    Features:
    1. Structured logging
    2. Tracing (simplified, single-process spans)
    3. Metrics collection
    """
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = self._setup_logger()
        self.current_trace: Optional[TraceSpan] = None
        self.metrics = {
            "requests_total": 0,
            "requests_success": 0,
            "requests_error": 0,
            "latency_sum": 0.0,
        }
    
    def _setup_logger(self) -> logging.Logger:
        """Configure JSON-style log output"""
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)
        
        # Attach the handler only once, even if several managers share a service name
        if not logger.handlers:
            handler = logging.StreamHandler()
            # A text template shaped like JSON: messages are not JSON-escaped and
            # fields passed via extra= are not printed
            handler.setFormatter(logging.Formatter(
                json.dumps({
                    "timestamp": "%(asctime)s",
                    "level": "%(levelname)s",
                    "service": self.service_name,
                    "message": "%(message)s"
                })
            ))
            logger.addHandler(handler)
        
        # Don't also forward records to the root logger (avoids duplicate lines)
        logger.propagate = False
        return logger
    
    def start_span(self, operation: str, **attributes) -> TraceSpan:
        """Start a trace span"""
        span = TraceSpan(
            span_id=f"{operation}-{datetime.now().timestamp()}",
            operation=operation,
            start_time=datetime.now(),
            attributes=attributes
        )
        self.current_trace = span
        
        self.logger.info(f"Started operation: {operation}", extra=attributes)
        return span
    
    def end_span(self, span: TraceSpan):
        """End a trace span and record its latency"""
        span.end_time = datetime.now()
        duration = (span.end_time - span.start_time).total_seconds()
        
        self.metrics["latency_sum"] += duration
        # Clear the current span so later events are not attached to a finished span
        if self.current_trace is span:
            self.current_trace = None
        
        self.logger.info(
            f"Finished operation: {span.operation}",
            extra={
                "duration_ms": duration * 1000,
                **span.attributes
            }
        )
    
    def record_event(self, event_name: str, **attributes):
        """Record an event on the current span"""
        if self.current_trace:
            self.current_trace.events.append({
                "name": event_name,
                "timestamp": datetime.now().isoformat(),
                "attributes": attributes
            })
        
        self.logger.info(f"Event: {event_name}", extra=attributes)
    
    def record_error(self, error: Exception, **attributes):
        """Record an error"""
        self.metrics["requests_error"] += 1
        
        self.logger.error(
            f"Error: {type(error).__name__}: {error}",
            extra={"error_type": type(error).__name__, **attributes}
        )

async def observability_demo():
    """Observability demo"""
    
    obs = ObservabilityManager("autogen-service")
    
    # Count the request and start tracing
    obs.metrics["requests_total"] += 1
    span = obs.start_span("process_request", user_id="user-123")
    
    try:
        # Simulated processing
        obs.record_event("validation_passed")
        await asyncio.sleep(0.1)
        obs.record_event("model_called")
        await asyncio.sleep(0.2)
        obs.record_event("response_generated")
        
        obs.metrics["requests_success"] += 1
    
    except Exception as e:
        obs.record_error(e)
    
    finally:
        obs.end_span(span)
    
    print(f"\nMetrics: {obs.metrics}")

asyncio.run(observability_demo())
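The formatter in `_setup_logger` is a text template shaped like JSON. It does not JSON-escape messages, and the fields passed through `extra=`, such as `duration_ms`, never reach the output. Below is a minimal standard-library sketch of a formatter that emits one valid JSON object per record with the extra fields included. `JsonFormatter` is our own illustrative name, not an AutoGen or `logging` API.

```python
import io
import json
import logging
from datetime import datetime, timezone

# Attribute names every LogRecord carries; anything else was supplied via extra=.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", None, None))) | {
    "message",
    "asctime",
}


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def __init__(self, service_name: str):
        super().__init__()
        self.service_name = service_name

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "service": self.service_name,
            "message": record.getMessage(),
        }
        # Copy the user-supplied extra= fields (e.g. duration_ms, user_id).
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # json.dumps escapes quotes and newlines; default=str covers non-JSON values.
        return json.dumps(payload, ensure_ascii=False, default=str)


# Usage: log into an in-memory stream so the JSON line can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter("autogen-service"))
logger = logging.getLogger("json-logging-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info('finished "process_request"', extra={"duration_ms": 12.5, "user_id": "user-123"})
line = stream.getvalue().strip()
print(line)
```

Because the message goes through `json.dumps`, the embedded quotes in the example stay valid JSON. Swapping this class into `_setup_logger` in place of the template `logging.Formatter` would be a one-line change.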

10.5 Security Best Practices

import os
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor

class SecurityBestPractices:
    """
    Security best practices
    
    Covers:
    1. Secret management
    2. Safe code execution
    3. Input validation
    4. Permission control
    """
    
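    # ========== 1. Secret management ==========
    
    @staticmethod
    def manage_secrets():
        """
        Secret-management best practices
        
        - Read secrets from environment variables
        - Use a secret-management service
        - Never hard-code secrets
        """
        
        # Good: read from an environment variable
        api_key = os.getenv("OPENAI_API_KEY")
        
        # Better for local development: a .env file + python-dotenv
        # (keep .env out of version control)
        from dotenv import load_dotenv
        load_dotenv()
        api_key = os.getenv("OPENAI_API_KEY")
        
        # Production: use a secret-management service
        # Azure Key Vault, AWS Secrets Manager, etc.
        
        # Fail fast instead of sending requests with a missing key
        if not api_key:
            raise RuntimeError("OPENAI_API_KEY is not set")
        
        return api_key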
    
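    # ========== 2. Safe code execution ==========
    
    @staticmethod
    def secure_code_execution():
        """
        Sandboxed code-execution configuration
        """
        
        # Recommended for production: isolate execution in a Docker container
        secure_executor = DockerCommandLineCodeExecutor(
            image="python:3.11-slim",
            # work_dir is a directory on the HOST that is mounted into the container;
            # use a dedicated, empty directory rather than a project folder
            work_dir="coding",
            timeout=30,  # seconds allowed per code block
        )
        # The container starts on `await secure_executor.start()` (or when the executor
        # is used with `async with`) and is stopped with `await secure_executor.stop()`
        
        return secure_executor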
    
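    # ========== 3. Input validation ==========
    
    @staticmethod
    def validate_input(user_input: str) -> tuple[bool, str]:
        """
        Input validation
        
        Returns: (is_valid, cleaned input or error message)
        """
        
        # Length limit (in characters)
        if len(user_input) > 10000:
            return False, "Input too long"
        
        # Sensitive-keyword check (plain substring match, so it also rejects
        # benign text such as "reset my password")
        sensitive_words = ["password", "secret", "token"]
        for word in sensitive_words:
            if word in user_input.lower():
                return False, f"Input contains a sensitive keyword: {word}"
        
        # Normalize the input
        cleaned = user_input.strip()
        
        return True, cleaned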
    
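    # ========== 4. Permission control ==========
    
    @staticmethod
    def check_permission(user_id: str, action: str) -> bool:
        """
        Permission check
        
        Args:
            user_id: user ID
            action: action type
        
        Returns: whether the user may perform the action
        """
        
        # Example role-to-permission mapping
        permissions = {
            "admin": ["read", "write", "delete", "execute"],
            "user": ["read", "write"],
            "guest": ["read"]
        }
        
        # Look up the user's role (placeholder: in practice, query it by user_id from a database)
        role = "user"
        
        return action in permissions.get(role, [])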

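# Security configuration example
def create_secure_agent():
    """Print the security checklist for an agent deployment (does not build an agent itself)"""
    
    # Security checklist
    checklist = {
        "Secret management": "use environment variables",
        "Code execution": "Docker isolation",
        "Input validation": "length and content checks",
        "Permission control": "role-based access control",
        "Log redaction": "never log sensitive information",
        "HTTPS": "use encrypted transport",
    }
    
    print("=== Security configuration checklist ===")
    for item, status in checklist.items():
        print(f"[✓] {item}: {status}")
    
    return checklist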
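The checklist above includes log redaction (never log sensitive information), but the section shows no code for it. Below is a minimal standard-library sketch that assumes secrets look like OpenAI-style `sk-...` keys or `password=...`-style pairs. `redact` and `RedactingFilter` are illustrative names. The regular expressions are assumptions to adapt and do not form a complete secret detector.

```python
import io
import logging
import re

# Illustrative patterns (assumptions, extend for your own secrets):
# OpenAI-style "sk-..." keys and "password=..." / "token: ..." pairs.
_API_KEY = re.compile(r"sk-[A-Za-z0-9_-]{16,}")
_KEY_VALUE = re.compile(r"(?i)\b(password|secret|token|api_key)(\s*[:=]\s*)\S+")


def redact(text: str) -> str:
    """Mask secret-looking substrings in a log message."""
    text = _API_KEY.sub("sk-***", text)
    return _KEY_VALUE.sub(lambda m: f"{m.group(1)}{m.group(2)}***", text)


class RedactingFilter(logging.Filter):
    """Redact every record that passes through the handler it is attached to."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Merge msg % args first so secrets passed as arguments are masked too,
        # then clear args so the message is not formatted a second time.
        record.msg = redact(record.getMessage())
        record.args = ()
        return True  # never drop the record, only rewrite it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
# Attached to the handler, so it also covers records propagated from child loggers.
handler.addFilter(RedactingFilter())
logger = logging.getLogger("redaction-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("user %s set password=%s", "alice", "hunter2")
logger.info("calling model with key sk-abcdefghijklmnop1234")
lines = stream.getvalue().splitlines()
print(lines)
```

Attaching the filter to the handler is a deliberate choice. A logger-level filter only sees records logged directly on that logger, not records propagated from child loggers. Note that the filter rewrites the shared record, so any other handler that processes it afterwards also sees the redacted text.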

10.6 Production Deployment

# docker-compose.yml example
# The top-level `version` key is obsolete in Docker Compose v2 (ignored with a warning)
version: '3.8'

services:
  autogen-api:
    build: .
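    ports:
      # One host port per replica: a single fixed "8000:8000" mapping
      # cannot be shared by the 3 replicas declared below
      - "8000-8002:8000"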
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=INFO
    volumes:
      - ./workspace:/app/workspace
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    # No published host port: the API reaches Redis over the internal Compose network

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: autogen
      POSTGRES_USER: autogen
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
# Kubernetes deployment example (deployment.yaml)
"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogen-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: autogen
  template:
    metadata:
      labels:
        app: autogen
    spec:
      containers:
      - name: autogen
        image: autogen:latest
        ports:
        - containerPort: 8000
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: autogen-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
"""

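# Production startup script (assumed to be saved as main.py; see uvicorn.run below)
import os
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Global model client, shared by all requests in one worker process
model_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize on startup, release resources on shutdown"""
    global model_client
    
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key=os.getenv("OPENAI_API_KEY")
    )
    yield
    await model_client.close()

app = FastAPI(title="AutoGen API", lifespan=lifespan)

class ChatRequest(BaseModel):
    """Request body for /chat"""
    message: str

@app.get("/health")
async def health():
    """Liveness endpoint"""
    return {"status": "healthy"}

@app.get("/ready")
async def ready():
    """Readiness endpoint"""
    return {"status": "ready"}

@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat endpoint"""
    # Create an agent per request: AssistantAgent keeps its conversation history
    # across run() calls, so one shared agent would mix different users' sessions
    agent = AssistantAgent(
        name="production_assistant",
        model_client=model_client,
    )
    result = await agent.run(task=request.message)
    return {"response": result.messages[-1].content}

if __name__ == "__main__":
    # uvicorn requires an import string ("module:app") when workers > 1
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=4  # multiple worker processes
    )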
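The `/chat` endpoint forwards every request straight to the model, with no deadline and no limit on parallel calls. Below is a minimal sketch of bounding both with the standard library's `asyncio.Semaphore` and `asyncio.wait_for`. `BoundedRunner` and `fake_agent_call` are hypothetical names, and `fake_agent_call` only stands in for `agent.run(task=...)`. The limits (2 concurrent calls, 0.5 s) are placeholders chosen so the demo runs quickly, not recommended values.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class BoundedRunner:
    """Caps concurrent calls per worker and gives each call a deadline."""

    def __init__(self, max_concurrency: int, timeout_s: float):
        self._sem = asyncio.Semaphore(max_concurrency)
        self._timeout_s = timeout_s

    async def run(self, make_call: Callable[[], Awaitable[T]]) -> T:
        # Waits here while max_concurrency calls are already in flight.
        async with self._sem:
            # On expiry, wait_for cancels the inner call and raises asyncio.TimeoutError.
            return await asyncio.wait_for(make_call(), timeout=self._timeout_s)


async def fake_agent_call(delay: float) -> str:
    # Hypothetical stand-in for `agent.run(task=...)`: it only sleeps.
    await asyncio.sleep(delay)
    return f"done after {delay}s"


async def main() -> tuple[list[str], int]:
    # Create the runner inside the running event loop.
    runner = BoundedRunner(max_concurrency=2, timeout_s=0.5)

    # 1) Deadline: the slow call is cancelled instead of holding the worker.
    results = []
    for delay in (0.1, 1.0):
        try:
            results.append(await runner.run(lambda: fake_agent_call(delay)))
        except asyncio.TimeoutError:
            results.append("timeout")

    # 2) Concurrency cap: launch 5 calls at once and record the peak in flight.
    active = 0
    peak = 0

    async def tracked() -> str:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.05)
        active -= 1
        return "ok"

    await asyncio.gather(*(runner.run(tracked) for _ in range(5)))
    return results, peak


results, peak = asyncio.run(main())
print(results, peak)
```

In an endpoint, the wrapped call would look like `await runner.run(lambda: agent.run(task=...))`, with `asyncio.TimeoutError` mapped to an HTTP 504 response. The semaphore is per process, so the effective ceiling is roughly replicas × workers × `max_concurrency`.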

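Summary

This article covered AutoGen 0.4's core concepts, API usage, and production practices:

Key takeaways

  1. Layered architecture: AgentChat (high-level) vs Core (low-level); choose according to the scenario
  2. Agent types: AssistantAgent, UserProxyAgent, CodeExecutorAgent, etc.
  3. Team collaboration: RoundRobinGroupChat, SelectorGroupChat
  4. Tool system: FunctionTool, CodeExecutor, MCP integration
  5. Termination control: combining multiple TerminationConditions
  6. Message mechanism: understanding message types and inner_messages
  7. Core layer: event-driven design, publish/subscribe, distributed deployment
  8. Production practice: security, performance, observability, deployment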

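Suggested learning path

Getting started → AgentChat API → Team collaboration → Tool integration → Core-layer development → Distributed deployment → Production practice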

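AutoGen provides a powerful and flexible framework for building multi-agent systems. We hope this article helps you master it quickly and apply it in real projects!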
