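AutoGen in Depth, Part 3
Based on AutoGen 0.4, this article covers how multi-agent systems are built with the framework: development principles, core APIs, best practices, and production deployment.
Chapter 1: AutoGen Framework Overview and Core Architecture
1.1 Introduction to AutoGen and Its History
AutoGen is an open-source multi-agent framework developed by Microsoft Research. It aims to simplify building agent systems on top of large language models (LLMs). Its core idea is to let multiple AI agents complete complex tasks by collaborating through conversation.
Core design principles:
- Conversation-driven: agents collaborate through natural-language dialogue rather than hard-coded workflows
- Role-playing: each Agent plays a specific role with its own system_message and capabilities
- Human-AI collaboration: Human-in-the-loop is supported, so a person can step into the decision process at any time
1.2 Core Component Architecture
AutoGen 0.4 uses a layered architecture, from high-level abstractions down to low-level primitives, to serve different scenarios:
Responsibilities of each layer: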
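| Layer | Component | Typical use | Abstraction level |
|---|---|---|---|
| Application | Studio | Rapid prototyping, no-code building | Highest |
| High-level API | AgentChat | Most application development | High |
| Low-level API | Core | Complex workflows, distributed systems | Low |
| Extension | Extensions | Integrating external services, the MCP protocol | Medium |
1.3 The API Layers in Detail
AgentChat layer (recommended for most developers)
AgentChat provides ready-to-use agent types and team collaboration patterns: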
# AgentChat层示例:最简单的智能体
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main():
# 创建模型客户端
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
api_key="your-api-key"
)
# 创建助手智能体
agent = AssistantAgent(
name="assistant",
model_client=model_client,
system_message="你是一个有帮助的AI助手。"
)
# 运行智能体
result = await agent.run(task="你好,请介绍一下你自己。")
print(result.messages[-1].content)
asyncio.run(main())
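Core layer (for advanced developers)
The Core layer provides event-driven low-level primitives for building complex, distributed agent systems:
# Core layer example: an event-driven agent
import asyncio
from dataclasses import dataclass
from autogen_core import AgentId, MessageContext, RoutedAgent, SingleThreadedAgentRuntime, message_handler

# Define the message type
@dataclass
class TaskMessage:
    content: str

# Define the agent
class WorkerAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("Handles work tasks")

    @message_handler
    async def handle_task(self, message: TaskMessage, ctx: MessageContext) -> None:
        # Task-handling logic goes here
        print(f"Task received: {message.content}")

# Register and run
async def main() -> None:
    runtime = SingleThreadedAgentRuntime()
    await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent())
    runtime.start()
    # Send a message to an instance of the registered agent type
    await runtime.send_message(TaskMessage(content="hello"), AgentId("worker", "default"))
    # Wait until all queued messages have been processed, then stop
    await runtime.stop_when_idle()

asyncio.run(main())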
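1.4 Asynchronous Programming Basics
AutoGen 0.4 is built entirely on an asynchronous architecture, so a working knowledge of asyncio is a prerequisite: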
import asyncio
# 异步函数定义
async def fetch_data():
"""异步函数使用async关键字定义"""
await asyncio.sleep(1) # 模拟IO操作
return "数据已获取"
# 并发执行多个任务
async def concurrent_tasks():
"""使用gather并发执行多个异步任务"""
results = await asyncio.gather(
fetch_data(),
fetch_data(),
fetch_data()
)
print(f"并发结果: {results}")
# 异步生成器(用于流式输出)
async def stream_results():
"""异步生成器使用async def和yield"""
for i in range(5):
await asyncio.sleep(0.1)
yield f"结果 {i}"
# 消费异步生成器
async def consume_stream():
"""使用async for消费异步生成器"""
async for result in stream_results():
print(result)
# 运行异步程序
asyncio.run(concurrent_tasks())
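Key concepts:
- async def: defines an asynchronous function (coroutine)
- await: waits for an asynchronous operation to complete
- asyncio.gather(): runs multiple coroutines concurrently
- async for: asynchronous iteration
- asyncio.run(): runs the main coroutine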
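One property of asyncio.gather() that matters when fanning out several agent or model calls is that results come back in the order the coroutines were passed in, not the order in which they finish. The following is a minimal sketch using only the standard library; the `fetch` helper and its delays are made up for illustration.

```python
import asyncio

async def fetch(label: str, delay: float) -> str:
    """Pretend to do I/O for `delay` seconds, then return the label."""
    await asyncio.sleep(delay)
    return label

async def ordered_gather() -> list:
    # "a" finishes last, yet gather still reports it first,
    # because results follow the argument order.
    return await asyncio.gather(
        fetch("a", 0.03),
        fetch("b", 0.02),
        fetch("c", 0.01),
    )

results = asyncio.run(ordered_gather())
print(results)
```

Because the ordering is stable, each result can be paired with the request that produced it by position.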
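1.5 Installation and Configuration
Installing with pip
# Install the AgentChat core package and the OpenAI extension
pip install -U "autogen-agentchat" "autogen-ext[openai]"
# Install additional extras (Docker code execution, Azure, and so on)
pip install -U "autogen-agentchat" "autogen-ext[openai,azure,docker]"
Installing with uv (recommended)
# uv is a faster Python package manager
pip install uv
# Create a project and add the dependencies
uv init my-autogen-project
cd my-autogen-project
uv add "autogen-agentchat" "autogen-ext[openai]"
Environment variables
# Manage API keys through environment variables (requires the python-dotenv package)
import os
from dotenv import load_dotenv
# Load variables from the .env file
load_dotenv()
# Read the API key
api_key = os.getenv("OPENAI_API_KEY")
Example .env file (a separate file, not Python code):
OPENAI_API_KEY=sk-your-api-key-here
AZURE_OPENAI_API_KEY=your-azure-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/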
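Chapter 2: The AgentChat Layer: Core Agent Types and Parameters
2.1 Agent Inheritance Hierarchy
AutoGen's Agent types follow a clear inheritance structure:
2.2 Core Methods of BaseChatAgent
BaseChatAgent is the abstract base class of all Agents and defines the core interface. The listing below is a simplified sketch of that interface rather than the library source: in the actual class, name and description are set through the constructor, while produced_message_types, on_messages, and on_reset are the members a subclass must implement: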
from abc import ABC, abstractmethod
from typing import Sequence, List
from autogen_agentchat.messages import BaseChatMessage
from autogen_core import CancellationToken
class BaseChatAgent(ABC):
"""Agent基类,定义核心接口"""
@property
@abstractmethod
def name(self) -> str:
"""Agent的唯一标识名称"""
pass
@property
@abstractmethod
def description(self) -> str:
"""Agent的功能描述,用于Team中选择发言者"""
pass
@abstractmethod
async def on_messages(
self,
messages: Sequence[BaseChatMessage],
cancellation_token: CancellationToken
) -> Response:
"""
处理传入消息并返回响应
参数:
messages: 接收到的消息序列
cancellation_token: 用于取消操作的令牌
返回:
Response对象,包含chat_message和inner_messages
"""
pass
@abstractmethod
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""
重置Agent到初始状态
参数:
cancellation_token: 取消令牌
"""
pass
@property
def produced_message_types(self) -> List[type]:
"""
Agent可能产生的消息类型列表
用于类型检查和消息路由
"""
return []
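The Response object in detail:
from dataclasses import dataclass
from typing import Sequence
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage

@dataclass
class Response:
    """Agent response object (mirrors autogen_agentchat.base.Response)"""
    # Main output message, sent to other Agents or to the user
    chat_message: BaseChatMessage
    # Inner events and messages, for UI display or debugging
    # (for example ToolCallRequestEvent and ToolCallExecutionEvent)
    inner_messages: Sequence[BaseAgentEvent | BaseChatMessage] | None = None

# Note: Response has no terminate field; ending a conversation is the job of a termination condition.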
2.3 AssistantAgent参数详解
AssistantAgent是最常用的智能体类型,由LLM驱动:
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 创建模型客户端
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
api_key="your-api-key"
)
# 创建AssistantAgent
assistant = AssistantAgent(
# === 必需参数 ===
name="researcher", # Agent唯一标识,用于日志和消息路由
# === 模型配置 ===
model_client=model_client, # 模型客户端实例
# === 行为配置 ===
system_message="""你是一个专业的研究助手。
你的职责是:
1. 分析用户的研究问题
2. 搜索相关信息
3. 提供结构化的研究报告
请始终保持客观、准确。
""", # 系统提示词,定义Agent角色和行为
description="专业研究助手,擅长信息搜索和分析", # 功能描述,用于Team发言者选择
# === 工具配置 ===
tools=[search_tool, analyze_tool], # 可用工具列表
# === Handoff配置 ===
handoffs=["writer", "reviewer"], # 可转交任务的Agent名称列表
# === 高级配置 ===
reflect_on_tool_use=True, # 是否在工具使用后进行反思
tool_call_summary_format="{result}", # 工具调用摘要格式
)
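Parameter reference:
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name | str | ✅ | - | Unique Agent identifier; must be a valid Python identifier, lowercase letters and underscores recommended |
| model_client | ChatCompletionClient | ✅ | - | Model client; OpenAI, Azure, and others are supported |
| system_message | str | ❌ | Default assistant prompt | Defines the Agent's role, capabilities, and behavior |
| description | str | ❌ | Generic default description | Used for speaker selection in a Team; it is not derived from system_message, so set it explicitly |
| tools | List[Tool or callable] | ❌ | None | Tools the Agent may call |
| handoffs | List[Handoff or str] | ❌ | None | Names of Agents that tasks can be handed off to |
| reflect_on_tool_use | bool | ❌ | False | Whether the LLM reflects on the tool result before replying |
| tool_call_summary_format | str | ❌ | "{result}" | Format of the tool call result summary |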
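2.4 UserProxyAgent Parameters
UserProxyAgent is the human-interaction interface and lets a person step into the conversation. In AutoGen 0.4 it takes only a name, an optional description, and an optional input_func; the v0.2 parameters human_input_mode, max_consecutive_auto_reply, and code_executor are not accepted (code execution is handled by CodeExecutorAgent):
from autogen_agentchat.agents import UserProxyAgent
user_proxy = UserProxyAgent(
    name="user",
    description="A human user",  # used by Teams when selecting a speaker
    input_func=input,  # sync or async callable that returns the user's text
)
# How the v0.2 human_input_mode values map onto 0.4:
# NEVER: leave UserProxyAgent out of the team
# ALWAYS: add UserProxyAgent to the team so the user is asked on each of its turns
# TERMINATE: run the team until it stops, then collect feedback and call run() again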
2.5 CodeExecutorAgent与OpenAIAssistantAgent
CodeExecutorAgent
专门用于执行代码的Agent:
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
# 创建代码执行器
code_executor = LocalCommandLineCodeExecutor(
work_dir="./workspace", # 工作目录
timeout=60, # 执行超时(秒)
)
# 创建代码执行Agent
code_agent = CodeExecutorAgent(
name="code_executor",
code_executor=code_executor,
)
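OpenAIAssistantAgent
An Agent backed by the OpenAI Assistants API. It takes an AsyncOpenAI client and a model name rather than a model_client:
from openai import AsyncOpenAI
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Create the OpenAI Assistant Agent
assistant = OpenAIAssistantAgent(
    name="openai_assistant",
    description="A helpful assistant",
    client=AsyncOpenAI(),  # reads OPENAI_API_KEY from the environment
    model="gpt-4o",
    instructions="You are a helpful assistant",
    tools=["code_interpreter"],  # enables the code interpreter
)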
2.6 Comparing the Three human_input_mode Values
human_input_mode (NEVER / ALWAYS / TERMINATE) is a v0.2 setting. The code below shows the NEVER and ALWAYS equivalents in 0.4; the TERMINATE equivalent is to run the team until it stops and then ask the user whether to continue:
import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

async def demonstrate_human_input():
    """Compare a fully automatic run with a human-in-the-loop run"""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    assistant = AssistantAgent(name="assistant", model_client=model_client)

    # NEVER equivalent: no UserProxyAgent, fully automatic
    print("=== Automatic ===")
    result = await assistant.run(task="What is 1+1?")
    print(f"Result: {result.messages[-1].content}")

    # ALWAYS equivalent: the user is asked for input on each of their turns
    user = UserProxyAgent(name="interactive_user", input_func=input)
    team = RoundRobinGroupChat(
        participants=[assistant, user],
        termination_condition=MaxMessageTermination(max_messages=4),
    )
    print("=== Interactive ===")
    result = await team.run(task="What is 1+1?")
    print(f"Stop reason: {result.stop_reason}")

asyncio.run(demonstrate_human_input())
2.7 自定义Agent开发
继承BaseChatAgent创建自定义Agent:
import asyncio
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from typing import Sequence, List
class CustomAnalysisAgent(BaseChatAgent):
"""
自定义分析Agent示例
功能:对输入文本进行分析并返回结构化结果
"""
def __init__(self, name: str, analysis_type: str = "sentiment"):
"""
Initialize the custom Agent
Args:
name: Agent name
analysis_type: kind of analysis (sentiment/keyword/summary)
"""
# BaseChatAgent stores name and description and exposes them as properties,
# so they are passed to the base constructor instead of being overridden
super().__init__(name, description=f"Text-analysis Agent specialized in {analysis_type} analysis")
self._analysis_type = analysis_type
self._message_count = 0
@property
def produced_message_types(self) -> List[type]:
"""声明产生的消息类型"""
return [TextMessage]
async def on_messages(
self,
messages: Sequence[TextMessage],
cancellation_token: CancellationToken
) -> Response:
"""
处理消息的核心方法
参数:
messages: 输入消息序列
cancellation_token: 取消令牌
返回:
Response对象
"""
# Check for cancellation (is_cancelled is a method and must be called)
if cancellation_token.is_cancelled():
raise asyncio.CancelledError("Operation cancelled")
# 获取最后一条消息
last_message = messages[-1] if messages else None
if not last_message:
return Response(
chat_message=TextMessage(
content="没有收到消息",
source=self.name
)
)
# 执行分析逻辑
self._message_count += 1
analysis_result = await self._analyze(last_message.content)
# 构建响应
response_message = TextMessage(
content=f"分析结果(第{self._message_count}次):\n{analysis_result}",
source=self.name
)
return Response(chat_message=response_message)
async def _analyze(self, text: str) -> str:
"""
执行文本分析(示例实现)
参数:
text: 待分析文本
返回:
分析结果字符串
"""
# 模拟分析过程
await asyncio.sleep(0.1)
if self._analysis_type == "sentiment":
return f"情感分析: 文本长度={len(text)}, 倾向=中性"
elif self._analysis_type == "keyword":
words = text.split()
return f"关键词提取: {words[:5]}"
else:
return f"摘要: {text[:100]}..."
async def on_reset(self, cancellation_token: CancellationToken) -> None:
"""重置Agent状态"""
self._message_count = 0
# 使用自定义Agent
async def use_custom_agent():
"""演示自定义Agent的使用"""
agent = CustomAnalysisAgent(
name="analyzer",
analysis_type="sentiment"
)
# 运行Agent
result = await agent.run(task="今天天气真好,心情很愉快!")
print(result.messages[-1].content)
asyncio.run(use_custom_agent())
2.8 实战场景:双Agent对话实现Human-in-the-loop
import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def human_in_the_loop_demo():
"""
双Agent对话示例:实现人机协作
场景:AI助手与用户协作完成代码编写任务
"""
# 创建模型客户端
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
api_key="your-api-key"
)
# 创建AI助手Agent
assistant = AssistantAgent(
name="coder",
model_client=model_client,
system_message="""你是一个专业的Python程序员。
当用户提出需求时,你负责:
1. 理解需求
2. 编写代码
3. 解释代码逻辑
当任务完成时,请回复'完成'。
"""
)
# Create the user proxy Agent; input_func is called each time it is the user's turn
user_proxy = UserProxyAgent(
name="user",
input_func=input,
)
# 创建终止条件
termination = TextMentionTermination("完成")
# 创建对话团队
team = RoundRobinGroupChat(
participants=[user_proxy, assistant],
termination_condition=termination
)
# 运行对话
print("=== 开始对话 ===")
print("提示:输入'完成'结束对话\n")
async for message in team.run_stream(task="请帮我写一个计算斐波那契数列的函数"):
if hasattr(message, 'content'):
print(f"[{message.source}]: {message.content}\n")
asyncio.run(human_in_the_loop_demo())
第三章:模型客户端与配置体系
3.1 OpenAIChatCompletionClient参数详解
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_core.models import ModelInfo
# 基础配置
client = OpenAIChatCompletionClient(
# === 必需参数 ===
model="gpt-4o", # 模型名称
# === 认证配置 ===
api_key="sk-your-api-key", # API密钥(也可通过环境变量设置)
# === 端点配置 ===
base_url="https://api.openai.com/v1", # API端点(自定义端点时修改)
# === 模型能力配置 ===
model_info=ModelInfo(
vision=True, # 是否支持视觉
function_calling=True, # 是否支持函数调用
json_output=True, # 是否支持JSON输出
family="gpt-4o", # 模型家族
),
# === 生成参数 ===
temperature=0.7, # 温度(0-2),控制随机性
max_tokens=4096, # 最大生成token数
top_p=1.0, # 核采样参数
frequency_penalty=0.0, # 频率惩罚(-2到2)
presence_penalty=0.0, # 存在惩罚(-2到2)
stop=["END", "STOP"], # 停止词列表
# === 超时配置 ===
timeout=60, # 请求超时(秒)
)
# 使用环境变量的推荐方式
import os
client = OpenAIChatCompletionClient(
model="gpt-4o",
api_key=os.getenv("OPENAI_API_KEY"), # 从环境变量读取
)
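Parameter reference:
| Parameter | Type | Description | Range |
|---|---|---|---|
| model | str | Model identifier | gpt-4o, gpt-4-turbo, gpt-3.5-turbo, and so on |
| api_key | str | API key | Obtained from OpenAI |
| base_url | str | API endpoint | Default OpenAI endpoint, or a custom one |
| temperature | float | Controls output randomness | 0.0 to 2.0; higher is more random |
| max_tokens | int | Maximum number of output tokens | 1 up to the model limit |
| top_p | float | Nucleus sampling | 0.0 to 1.0 |
| frequency_penalty | float | Reduces repeated wording | -2.0 to 2.0 |
| presence_penalty | float | Encourages new topics | -2.0 to 2.0 |
| timeout | float | Request timeout in seconds | 30 to 120 suggested |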
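3.2 The llm_config Configuration Style
llm_config is the model-configuration dictionary from AutoGen v0.2, where it carried the model list and cache settings. AgentChat 0.4 does not take llm_config or config_list; each Agent is given a model client instead (option 1 below). The v0.2 form is shown only for comparison when migrating: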
import os
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 方式1:直接使用OpenAIChatCompletionClient(推荐)
# 注意:生产环境请使用环境变量,不要硬编码API Key
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
api_key=os.getenv("OPENAI_API_KEY"), # 推荐使用环境变量
temperature=0.7,
)
# 方式2:使用config_list(兼容v0.2风格)
config_list = [
{
"model": "gpt-4o",
"api_key": os.getenv("OPENAI_API_KEY"), # 推荐使用环境变量
},
{
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
]
# 完整的llm_config示例
llm_config = {
"config_list": config_list,
"cache_seed": 42, # 缓存种子,相同种子+相同输入=相同输出
"timeout": 120, # 请求超时
"temperature": 0.7, # 默认温度
"seed": 42, # 随机种子,用于可复现性
}
# 缓存机制说明
# cache_seed=None: 禁用缓存
# cache_seed=42: 启用缓存,相同种子保证可复现
缓存机制工作原理:
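The idea behind a seeded cache (same seed plus same input returns the stored output; no seed disables caching) can be illustrated with a toy cache. This is a sketch of the concept under stated assumptions, not AutoGen's implementation: the `SeededCache` class, its `get_or_call` method, and `fake_model` are hypothetical names introduced here.

```python
import hashlib
import json

class SeededCache:
    """Toy response cache keyed by (seed, request payload). Hypothetical helper."""

    def __init__(self, seed):
        self.seed = seed          # None disables caching
        self._store = {}
        self.hits = 0

    def _key(self, messages):
        # Serialize deterministically so equal requests hash to the same key.
        payload = json.dumps(
            {"seed": self.seed, "messages": messages},
            sort_keys=True,
            ensure_ascii=False,
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_call(self, messages, call_model):
        if self.seed is None:
            return call_model(messages)
        key = self._key(messages)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        result = call_model(messages)
        self._store[key] = result
        return result

calls = []

def fake_model(messages):
    """Stand-in for a real model call; counts how often it is invoked."""
    calls.append(messages)
    return f"reply #{len(calls)}"

cache = SeededCache(seed=42)
request = [{"role": "user", "content": "hi"}]
first = cache.get_or_call(request, fake_model)
second = cache.get_or_call(request, fake_model)
print(first, second, cache.hits)
```

Changing the seed changes every key, which is why a different seed behaves like a fresh, empty cache.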
3.3 多模型支持
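Azure OpenAI configuration
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

# Option 1: API key ("your-deployment-name" is a placeholder for your own deployment)
azure_client = AzureOpenAIChatCompletionClient(
    azure_deployment="your-deployment-name",
    model="gpt-4o",
    api_key="your-azure-api-key",
    azure_endpoint="https://your-resource.openai.azure.com/",
    api_version="2024-02-15-preview",
)
# Option 2: Azure AD (Microsoft Entra ID) authentication
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)
azure_client = AzureOpenAIChatCompletionClient(
    azure_deployment="your-deployment-name",
    model="gpt-4o",
    azure_endpoint="https://your-resource.openai.azure.com/",
    azure_ad_token_provider=token_provider,
    api_version="2024-02-15-preview",
)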
本地模型配置(通过LiteLLM)
# 使用LiteLLM代理本地模型
local_client = OpenAIChatCompletionClient(
model="llama-3", # 或其他本地模型名称
base_url="http://localhost:8000/v1", # LiteLLM代理地址
api_key="dummy-key", # 本地模型通常不需要真实key
)
# 使用Ollama
ollama_client = OpenAIChatCompletionClient(
model="llama3",
base_url="http://localhost:11434/v1",
api_key="ollama",
)
多模型负载均衡
import asyncio
import os
from typing import List
from autogen_ext.models.openai import OpenAIChatCompletionClient
class ModelLoadBalancer:
"""
多模型负载均衡器
功能:在多个模型间分配请求,支持轮询和故障转移
"""
def __init__(self, model_configs: List[dict]):
"""
初始化负载均衡器
参数:
model_configs: 模型配置列表
"""
self.clients = [
OpenAIChatCompletionClient(**config)
for config in model_configs
]
self.current_index = 0
async def create(self, messages, **kwargs):
"""
轮询方式选择模型创建响应
参数:
messages: 消息列表
**kwargs: 其他参数
返回:
模型响应
"""
# 尝试所有模型
for _ in range(len(self.clients)):
client = self.clients[self.current_index]
try:
result = await client.create(messages, **kwargs)
self.current_index = (self.current_index + 1) % len(self.clients)
return result
except Exception as e:
print(f"模型 {self.current_index} 失败: {e}")
self.current_index = (self.current_index + 1) % len(self.clients)
raise RuntimeError("所有模型都不可用")
# 使用示例
configs = [
{"model": "gpt-4o", "api_key": os.getenv("OPENAI_API_KEY")},
{"model": "gpt-3.5-turbo", "api_key": os.getenv("OPENAI_API_KEY")},
]
balancer = ModelLoadBalancer(configs)
3.4 流式输出配置与实现
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def streaming_demo():
"""
流式输出演示
展示两种流式输出方式:
1. 使用Console组件自动显示
2. 手动处理流式输出
"""
model_client = OpenAIChatCompletionClient(
model="gpt-4o",
)
agent = AssistantAgent(
name="assistant",
model_client=model_client,
model_client_stream=True, # streaming is enabled on the Agent, not on the client constructor
)
# 方式1:使用Console组件(推荐)
print("=== 方式1:Console组件 ===")
stream = agent.run_stream(task="请写一首关于春天的短诗")
await Console(stream)
# 方式2:手动处理流式输出
print("\n=== 方式2:手动处理 ===")
async for message in agent.run_stream(task="解释什么是递归"):
# message可能是各种类型
if hasattr(message, 'content'):
# 文本消息
if isinstance(message.content, str):
print(message.content, end="", flush=True)
# 多模态内容
elif isinstance(message.content, list):
for item in message.content:
print(item, end="", flush=True)
print() # 换行
asyncio.run(streaming_demo())
Console组件详解:
from autogen_agentchat.ui import Console
# Console是一个便捷的流式输出显示工具
async def console_usage():
"""Console组件使用示例"""
# 基本用法
stream = team.run_stream(task="你的任务")
await Console(stream)
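# Console handles the different message types automatically:
# - TextMessage: prints the text content
# - ToolCallRequestEvent: prints the tool call request
# - ToolCallExecutionEvent: prints the tool execution result
# - HandoffMessage: prints the Agent handoff
# - StopMessage: prints the termination message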
3.5 RequestUsage: Tracking Token Usage
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_core.models import RequestUsage
from autogen_ext.models.openai import OpenAIChatCompletionClient
# RequestUsage is a dataclass provided by autogen_core.models with two fields:
# prompt_tokens: number of input tokens
# completion_tokens: number of output tokens
# There is no total_tokens field; compute the total as prompt_tokens + completion_tokens.
async def track_token_usage():
"""
Token使用量跟踪示例
展示如何监控和统计API调用成本
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(name="assistant", model_client=model_client)
# 运行并获取结果
result = await agent.run(task="解释量子计算的基本原理")
# 从结果中提取token使用量
total_prompt = 0
total_completion = 0
for message in result.messages:
if hasattr(message, 'models_usage') and message.models_usage:
usage = message.models_usage
total_prompt += usage.prompt_tokens
total_completion += usage.completion_tokens
print(f"消息使用: 输入={usage.prompt_tokens}, 输出={usage.completion_tokens}")
print(f"\n总计: 输入={total_prompt}, 输出={total_completion}, 总计={total_prompt + total_completion}")
# 成本估算(以GPT-4o为例)
# 输入: $2.5/1M tokens, 输出: $10/1M tokens
cost = (total_prompt * 2.5 + total_completion * 10) / 1_000_000
print(f"估算成本: ${cost:.6f}")
asyncio.run(track_token_usage())
Token计数器类:
class TokenCounter:
"""
Token计数和成本跟踪工具
功能:
- 统计总token使用量
- 估算API成本
- 生成使用报告
注意:定价仅供参考,请以OpenAI官方最新价格为准
"""
# 模型定价(美元/百万token,仅供参考)
PRICING = {
"gpt-4o": {"input": 2.5, "output": 10.0},
"gpt-4-turbo": {"input": 10.0, "output": 30.0},
"gpt-3.5-turbo": {"input": 0.5, "output": 1.5},
}
def __init__(self, model: str = "gpt-4o"):
"""
初始化计数器
参数:
model: 模型名称,用于成本计算
"""
self.model = model
self.prompt_tokens = 0
self.completion_tokens = 0
self.request_count = 0
def add_usage(self, prompt: int, completion: int):
"""
添加使用记录
参数:
prompt: 输入token数
completion: 输出token数
"""
self.prompt_tokens += prompt
self.completion_tokens += completion
self.request_count += 1
def calculate_cost(self) -> float:
"""
计算总成本
返回:
总成本(美元)
"""
pricing = self.PRICING.get(self.model, {"input": 0, "output": 0})
cost = (
self.prompt_tokens * pricing["input"] +
self.completion_tokens * pricing["output"]
) / 1_000_000
return cost
def report(self) -> str:
"""
生成使用报告
返回:
格式化的报告字符串
"""
return f"""
=== Token使用报告 ===
模型: {self.model}
请求次数: {self.request_count}
输入Token: {self.prompt_tokens:,}
输出Token: {self.completion_tokens:,}
总Token: {self.prompt_tokens + self.completion_tokens:,}
估算成本: ${self.calculate_cost():.4f}
====================
"""
# 使用示例
counter = TokenCounter("gpt-4o")
counter.add_usage(100, 50)
counter.add_usage(200, 100)
print(counter.report())
3.6 实战场景:多模型切换与成本控制
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from typing import Optional
class CostAwareAgent:
"""
成本感知Agent
功能:
- 根据任务复杂度选择合适模型
- 监控token使用量
- 在预算范围内运行
"""
def __init__(
self,
name: str,
fast_model: str = "gpt-3.5-turbo",
smart_model: str = "gpt-4o",
budget_limit: float = 1.0, # 美元
):
"""
初始化成本感知Agent
参数:
name: Agent名称
fast_model: 快速/便宜模型
smart_model: 智能/昂贵模型
budget_limit: 预算上限
"""
self.name = name
self.fast_client = OpenAIChatCompletionClient(model=fast_model)
self.smart_client = OpenAIChatCompletionClient(model=smart_model)
self.budget_limit = budget_limit
self.total_cost = 0
self.token_counter = TokenCounter(smart_model)
def _estimate_complexity(self, task: str) -> str:
"""
评估任务复杂度
参数:
task: 任务描述
返回:
'simple' 或 'complex'
"""
# 简单启发式规则
complex_keywords = ["分析", "设计", "架构", "优化", "重构", "算法"]
if any(kw in task for kw in complex_keywords):
return "complex"
if len(task) > 500:
return "complex"
return "simple"
async def run(self, task: str) -> str:
"""
运行任务,自动选择模型
参数:
task: 任务描述
返回:
结果字符串
"""
# 检查预算
if self.total_cost >= self.budget_limit:
return "错误:已超出预算限制"
# 选择模型
complexity = self._estimate_complexity(task)
client = self.smart_client if complexity == "complex" else self.fast_client
print(f"任务复杂度: {complexity}, 使用模型: {client.model}")
# 创建Agent并运行
agent = AssistantAgent(name=self.name, model_client=client)
result = await agent.run(task=task)
# 更新成本
for msg in result.messages:
if hasattr(msg, 'models_usage') and msg.models_usage:
self.token_counter.add_usage(
msg.models_usage.prompt_tokens,
msg.models_usage.completion_tokens
)
self.total_cost = self.token_counter.calculate_cost()
print(f"当前成本: ${self.total_cost:.4f} / ${self.budget_limit:.2f}")
return result.messages[-1].content
# 使用示例
async def main():
agent = CostAwareAgent(
name="smart_assistant",
budget_limit=0.5
)
# 简单任务 -> 使用快速模型
await agent.run("1+1等于多少?")
# 复杂任务 -> 使用智能模型
await agent.run("请设计一个分布式缓存系统的架构,考虑一致性、可用性和分区容错性")
asyncio.run(main())
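Chapter 4: Teams and Collaboration Patterns
4.1 Core Team Concepts
A Team is the core abstraction in AutoGen for managing collaboration among multiple Agents:
Core concepts:
- Team: a container of Agents that manages message passing between them
- Participant: an Agent taking part in the collaboration
- TerminationCondition: decides when the conversation ends
- Selector: the speaker selector, which decides which Agent speaks next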
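How these four concepts fit together can be sketched in plain Python. This is only an illustration of the control flow, not how AutoGen's teams are implemented: `run_round_robin`, the tuple-based participants, and the `should_stop` callback are hypothetical stand-ins for a Team, its Participants, and a TerminationCondition, with round-robin order playing the role of the Selector.

```python
from typing import Callable, List, Tuple

# A participant is (name, reply function); the reply function sees the history.
Participant = Tuple[str, Callable[[List[str]], str]]

def run_round_robin(
    participants: List[Participant],
    task: str,
    should_stop: Callable[[List[str]], bool],
    max_turns: int = 10,
) -> Tuple[List[str], str]:
    """Let participants speak in turn until the stop check fires or turns run out."""
    history = [task]
    for turn in range(max_turns):
        # Selector: round-robin simply picks the next participant in order.
        name, reply_fn = participants[turn % len(participants)]
        history.append(f"{name}: {reply_fn(history)}")
        # Termination condition: checked after every new message.
        if should_stop(history):
            return history, f"termination condition met after {name} spoke"
    return history, "max_turns reached"

writer = ("writer", lambda h: "draft ready")
reviewer = ("reviewer", lambda h: "APPROVE" if len(h) >= 4 else "please revise")

history, stop_reason = run_round_robin(
    [writer, reviewer],
    "task: write docs",
    should_stop=lambda h: "APPROVE" in h[-1],
)
for line in history:
    print(line)
print(stop_reason)
```

Swapping the `turn % len(participants)` line for a function that inspects the history is the essence of moving from round-robin to selector-based speaker choice.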
4.2 Team.run()方法详解
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def team_run_demo():
"""
Team.run()方法详解
参数:
task: 任务描述(字符串或消息对象)
cancellation_token: 取消令牌
output_task_messages: 是否输出任务消息
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 创建多个Agent
writer = AssistantAgent(
name="writer",
model_client=model_client,
system_message="你是一个技术文档撰写者,负责编写清晰的技术文档。"
)
reviewer = AssistantAgent(
name="reviewer",
model_client=model_client,
system_message="你是一个技术审核者,负责检查文档的准确性和完整性。"
)
# 创建Team
team = RoundRobinGroupChat(
participants=[writer, reviewer],
termination_condition=MaxMessageTermination(max_messages=6)
)
# 方式1:同步运行(等待完成)
print("=== 同步运行 ===")
result = await team.run(
task="请写一段关于Python装饰器的技术文档",
# cancellation_token=None, # 可选:取消令牌
# output_task_messages=True, # 可选:是否输出任务消息
)
# 处理结果
print(f"停止原因: {result.stop_reason}")
print(f"消息数量: {len(result.messages)}")
for msg in result.messages:
print(f"[{msg.source}]: {msg.content[:100]}...")
asyncio.run(team_run_demo())
4.3 Team.run_stream()方法详解
from autogen_agentchat.ui import Console
async def team_stream_demo():
"""
Team.run_stream()方法详解
返回异步生成器,实时输出消息
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
writer = AssistantAgent(name="writer", model_client=model_client)
reviewer = AssistantAgent(name="reviewer", model_client=model_client)
team = RoundRobinGroupChat(
participants=[writer, reviewer],
termination_condition=MaxMessageTermination(max_messages=4)
)
# 方式1:使用Console组件(推荐)
print("=== 使用Console ===")
stream = team.run_stream(task="解释什么是RESTful API")
await Console(stream)
# 方式2:手动处理流
print("\n=== 手动处理流 ===")
async for message in team.run_stream(task="解释什么是GraphQL"):
# 根据消息类型处理
message_type = type(message).__name__
print(f"[{message_type}] ", end="")
if hasattr(message, 'source'):
print(f"来自: {message.source}")
if hasattr(message, 'content'):
content = message.content
if isinstance(content, str):
print(f"内容: {content[:100]}...")
asyncio.run(team_stream_demo())
4.4 Team.reset()方法
async def team_reset_demo():
"""
Team.reset()方法演示
重置Team状态,清除对话历史
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(name="assistant", model_client=model_client)
team = RoundRobinGroupChat(participants=[agent])
# 第一次运行
result1 = await team.run(task="你好")
print(f"第一次运行消息数: {len(result1.messages)}")
# 重置Team
await team.reset()
print("Team已重置")
# 第二次运行(历史已清除)
result2 = await team.run(task="你好")
print(f"第二次运行消息数: {len(result2.messages)}")
asyncio.run(team_reset_demo())
4.5 TaskResult返回值详解
from dataclasses import dataclass
from typing import List, Optional
from autogen_agentchat.messages import BaseChatMessage
@dataclass
class TaskResult:
"""
Team.run()的返回值
属性:
messages: 完整的消息历史
stop_reason: 停止原因描述
"""
messages: List[BaseChatMessage]
stop_reason: Optional[str] = None
async def task_result_demo():
"""TaskResult详解"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(name="assistant", model_client=model_client)
team = RoundRobinGroupChat(
participants=[agent],
termination_condition=MaxMessageTermination(max_messages=2)
)
result = await team.run(task="你好")
# 访问消息
print("=== 所有消息 ===")
for i, msg in enumerate(result.messages):
print(f"{i+1}. [{msg.source}]: {msg.content}")
# 停止原因
print(f"\n停止原因: {result.stop_reason}")
# 最后一条消息(通常是最终结果)
last_message = result.messages[-1]
print(f"\n最终回复: {last_message.content}")
asyncio.run(task_result_demo())
4.6 RoundRobinGroupChat轮询协作模式
RoundRobinGroupChat按顺序轮流让Agent发言:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
async def round_robin_demo():
"""
RoundRobinGroupChat示例
场景:代码编写-审核-优化流程
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 定义角色
coder = AssistantAgent(
name="coder",
model_client=model_client,
system_message="""你是程序员,负责编写代码。
编写完成后说'请审核'。"""
)
reviewer = AssistantAgent(
name="reviewer",
model_client=model_client,
system_message="""你是代码审核者,负责检查代码质量。
如果代码没问题说'通过',否则指出问题。"""
)
optimizer = AssistantAgent(
name="optimizer",
model_client=model_client,
system_message="""你是优化专家,负责优化代码性能和可读性。
优化完成后说'完成'。"""
)
# 创建轮询团队
team = RoundRobinGroupChat(
participants=[coder, reviewer, optimizer],
termination_condition=TextMentionTermination("完成")
)
# 运行
stream = team.run_stream(
task="写一个Python函数,计算列表中所有偶数的和"
)
await Console(stream)
asyncio.run(round_robin_demo())
4.7 SelectorGroupChat智能选择模式
SelectorGroupChat使用LLM智能选择下一个发言者:
from autogen_agentchat.teams import SelectorGroupChat
async def selector_group_demo():
"""
SelectorGroupChat示例
场景:根据问题类型选择合适的专家
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 定义专家角色
math_expert = AssistantAgent(
name="math_expert",
model_client=model_client,
system_message="你是数学专家,回答数学相关问题。",
description="擅长数学计算和数学概念解释"
)
code_expert = AssistantAgent(
name="code_expert",
model_client=model_client,
system_message="你是编程专家,回答编程相关问题。",
description="擅长编程、代码调试和软件开发"
)
general_expert = AssistantAgent(
name="general_expert",
model_client=model_client,
system_message="你是通用知识专家,回答一般性问题。",
description="擅长回答一般性问题和常识"
)
# 创建选择器团队
team = SelectorGroupChat(
participants=[math_expert, code_expert, general_expert],
model_client=model_client, # 用于选择发言者的模型
termination_condition=MaxMessageTermination(max_messages=6),
)
# 运行:LLM会根据问题选择合适的专家
stream = team.run_stream(task="如何用Python实现快速排序算法?")
await Console(stream)
asyncio.run(selector_group_demo())
4.8 GroupChat参数详解
from autogen_agentchat.teams import RoundRobinGroupChat
team = RoundRobinGroupChat(
# === 必需参数 ===
participants=[agent1, agent2], # 参与者列表
# === 终止条件 ===
termination_condition=TextMentionTermination("完成"),
# === 消息限制 ===
max_turns=10, # 最大轮次(可选)
)
# 参数说明:
# participants: 参与协作的Agent列表
# termination_condition: 终止条件对象
# max_turns: 最大对话轮次(额外的终止条件)
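4.9 Speaker Selection Strategies
from typing import Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

def custom_selector(messages: Sequence[BaseAgentEvent | BaseChatMessage]) -> str | None:
    """
    Custom speaker-selection logic
    Args:
        messages: conversation history so far
    Returns:
        Name of the next speaker, or None to fall back to model-based selection
    """
    last_message = messages[-1] if messages else None
    content = getattr(last_message, "content", None)
    if isinstance(content, str) and "code" in content:
        return "agent1"  # code-related messages go to agent1
    return "agent2"  # everything else goes to agent2

def build_team() -> SelectorGroupChat:
    """Build a team that uses the custom selection strategy"""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent1 = AssistantAgent(name="agent1", model_client=model_client)
    agent2 = AssistantAgent(name="agent2", model_client=model_client)
    return SelectorGroupChat(
        participants=[agent1, agent2],
        model_client=model_client,  # required; used when selector_func returns None
        termination_condition=MaxMessageTermination(max_messages=6),
        selector_func=custom_selector,
    )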
4.10 实战场景:构建多角色协作团队
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def software_development_team():
"""
软件开发团队协作示例
角色:
- 产品经理:需求分析
- 架构师:系统设计
- 开发者:代码实现
- 测试工程师:测试验证
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 产品经理
product_manager = AssistantAgent(
name="product_manager",
model_client=model_client,
system_message="""你是产品经理,负责:
1. 分析用户需求
2. 定义产品功能
3. 编写需求文档
完成需求分析后说'需求已明确'。""",
description="产品经理,擅长需求分析和产品规划"
)
# 架构师
architect = AssistantAgent(
name="architect",
model_client=model_client,
system_message="""你是系统架构师,负责:
1. 设计系统架构
2. 选择技术栈
3. 定义模块划分
完成架构设计后说'架构已设计'。""",
description="系统架构师,擅长系统设计和技术选型"
)
# 开发者
developer = AssistantAgent(
name="developer",
model_client=model_client,
system_message="""你是高级开发者,负责:
1. 实现核心功能
2. 编写高质量代码
3. 处理技术细节
完成开发后说'开发完成'。""",
description="高级开发者,擅长代码实现"
)
# 测试工程师
tester = AssistantAgent(
name="tester",
model_client=model_client,
system_message="""你是测试工程师,负责:
1. 设计测试用例
2. 验证功能正确性
3. 报告问题
测试通过后说'测试通过,项目完成'。""",
description="测试工程师,擅长质量保证"
)
# 创建团队
team = SelectorGroupChat(
participants=[product_manager, architect, developer, tester],
model_client=model_client,
termination_condition=TextMentionTermination("项目完成") | MaxMessageTermination(20),
)
# 运行项目
print("=== 软件开发团队启动 ===\n")
stream = team.run_stream(
task="开发一个简单的待办事项(Todo)应用,支持添加、删除、标记完成功能"
)
await Console(stream)
asyncio.run(software_development_team())
第五章:工具系统与代码执行
5.1 Defining Tools and FunctionTool Parameters
AutoGen wraps callable tool functions in FunctionTool:
from typing import Annotated
from autogen_core.tools import FunctionTool

# Option 1: define a plain function, then wrap it
def get_weather(
    city: Annotated[str, "City name, e.g. 'Beijing' or 'Shanghai'"]
) -> str:
    """Return weather information for the given city"""
    # Simulated weather API call
    weather_data = {
        "Beijing": "Sunny, 15°C",
        "Shanghai": "Cloudy, 18°C",
        "Guangzhou": "Light rain, 22°C",
    }
    return weather_data.get(city, f"No weather information found for {city}")

weather_tool = FunctionTool(get_weather, description="Get the weather for a city")

# Option 2: set the name and description explicitly
def calculate_sum(a: int, b: int) -> int:
    """Return the sum of two numbers"""
    return a + b

sum_tool = FunctionTool(
    calculate_sum,
    name="calculate_sum",
    description="Add two integers",
    global_imports=[],  # global imports (used for serialization)
)

# Option 3: use Annotated to describe each parameter
def search_web(
    query: Annotated[str, "Search keywords"],
    max_results: Annotated[int, "Maximum number of results, default 5"] = 5,
    language: Annotated[str, "Language code, e.g. 'zh' or 'en'"] = "zh",
) -> str:
    """Search the web and return the results as a string"""
    # Simulated search
    return f"Searched for '{query}', found {max_results} results in {language}"

search_tool = FunctionTool(search_web, description="Search the web for information")
FunctionTool参数详解:
| 参数 | 类型 | 说明 |
|---|---|---|
| name | str | 工具名称,用于LLM调用 |
| description | str | 工具描述,LLM理解工具用途 |
| func | Callable | 实际执行的函数 |
| global_imports | List[str] | 函数依赖的全局导入 |
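To make the role of the type hints and Annotated descriptions concrete, the sketch below derives a JSON-schema-like description from a function signature using only the standard library. It illustrates the general idea of what an LLM is shown for a tool; `describe_tool` and `JSON_TYPES` are hypothetical names, and this is not FunctionTool's actual schema generation.

```python
import inspect
from typing import Annotated, get_args, get_origin, get_type_hints

# Minimal mapping from Python types to JSON schema type names (assumption).
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}

def describe_tool(func):
    """Build a schema-like dict from a function's signature and docstring."""
    hints = get_type_hints(func, include_extras=True)
    properties, required = {}, []
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name, str)
        description = ""
        if get_origin(hint) is Annotated:
            # Annotated[T, "text"] carries the base type plus a description.
            base, *meta = get_args(hint)
            description = next((m for m in meta if isinstance(m, str)), "")
        else:
            base = hint
        properties[name] = {
            "type": JSON_TYPES.get(base, "string"),
            "description": description,
        }
        # Parameters without a default value are required.
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }

def search_web(
    query: Annotated[str, "Search keywords"],
    max_results: Annotated[int, "Maximum number of results"] = 5,
) -> str:
    """Search the web for information."""
    return f"{max_results} results for {query!r}"

schema = describe_tool(search_web)
print(schema)
```

The takeaway is that precise type hints, parameter descriptions, and a clear docstring are what the model relies on when deciding how to call a tool.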
5.2 工具注册机制
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_core.tools import FunctionTool
from autogen_ext.models.openai import OpenAIChatCompletionClient
# 定义工具函数
def get_current_time() -> str:
"""获取当前时间"""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def calculate(expression: str) -> str:
"""
计算数学表达式
参数:
expression: 数学表达式,如'1+2*3'
"""
try:
result = eval(expression)
return f"计算结果: {result}"
except Exception as e:
return f"计算错误: {str(e)}"
async def tool_registration_demo():
"""工具注册示例"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 创建带工具的Agent
agent = AssistantAgent(
name="assistant",
model_client=model_client,
tools=[
get_current_time, # plain functions are wrapped in FunctionTool automatically
calculate,
],
# 工具调用后反思(可选)
reflect_on_tool_use=True,
)
# 运行:Agent会自动选择调用工具
result = await agent.run(task="现在几点了?另外帮我算一下123*456等于多少")
for msg in result.messages:
print(f"[{msg.source}]: {msg.content}")
asyncio.run(tool_registration_demo())
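The `calculate` tool above relies on `eval`, which will execute any Python expression the model sends it. One possible safer alternative, sketched here with the standard-library `ast` module, is to walk the parsed expression and allow only numeric literals and basic arithmetic. `safe_calculate` is a hypothetical helper, not part of AutoGen.

```python
import ast
import operator

# Whitelisted operators; anything else (names, calls, attributes) is rejected
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str):
    """Evaluate +, -, *, / and parentheses over numeric literals only."""

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    return _eval(ast.parse(expression, mode="eval"))


print(safe_calculate("123*456"))
```

Because function calls and names are never evaluated, an input such as `__import__('os')` raises `ValueError` instead of running.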
5.3 Tool Call Events
from autogen_agentchat.messages import ToolCallRequestEvent, ToolCallExecutionEvent
async def tool_event_demo():
    """
    Tool call events in detail.
    Event types:
    - ToolCallRequestEvent: the LLM requests one or more tool calls
    - ToolCallExecutionEvent: the tool calls have finished executing
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[FunctionTool(get_current_time, description="Get the current time")],
    )
    # Stream the run to observe the events as they occur
    async for message in agent.run_stream(task="What time is it?"):
        if isinstance(message, ToolCallRequestEvent):
            # Tool call request
            print("=== Tool call request ===")
            for call in message.content:
                print(f"Tool name: {call.name}")
                print(f"Arguments: {call.arguments}")
        elif isinstance(message, ToolCallExecutionEvent):
            # Tool execution result
            print("=== Tool execution result ===")
            for result in message.content:
                print(f"Call ID: {result.call_id}")
                print(f"Result: {result.content}")
        elif hasattr(message, "content") and isinstance(message.content, str):
            # Text messages: the task itself and the agent's reply
            # (the final TaskResult item has no content and is skipped)
            print("=== Text message ===")
            print(message.content)
asyncio.run(tool_event_demo())
5.4 The CodeExecutor Family
A CodeExecutor runs code generated by the LLM.
5.5 LocalCommandLineCodeExecutor vs DockerCommandLineCodeExecutor
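from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
# Local executor (for development and testing)
local_executor = LocalCommandLineCodeExecutor(
    work_dir="./workspace",  # working directory
    timeout=60,  # execution timeout in seconds
    functions=[],  # functions made available to the executed code
)
# Docker executor (recommended for production)
docker_executor = DockerCommandLineCodeExecutor(
    image="python:3.11-slim",  # Docker image
    work_dir="./workspace",  # host directory that is mounted into the container
    timeout=60,
)
# The Docker executor must be started before use and stopped afterwards:
# await docker_executor.start() ... await docker_executor.stop()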
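Local vs Docker comparison:
| Aspect | LocalCommandLine | DockerCommandLine |
|---|---|---|
| Security | Low (runs directly on the host) | High (isolated environment) |
| Performance | High | Medium |
| Dependencies | Uses the system environment | Uses the image environment |
| Typical use | Development and testing | Production |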
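Conceptually, a command-line code executor writes the generated code to a working directory, runs it in a separate process, and enforces a timeout. The standard-library sketch below illustrates that flow under those assumptions. It is not AutoGen's implementation, and `run_python_snippet` is a hypothetical helper; like the local executor, it offers no isolation from the host.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_python_snippet(code: str, timeout: float = 10.0) -> tuple[int, str]:
    """Run code in a child interpreter and return (exit_code, combined output)."""
    with tempfile.TemporaryDirectory() as work_dir:
        script = Path(work_dir) / "snippet.py"
        script.write_text(code, encoding="utf-8")
        try:
            completed = subprocess.run(
                [sys.executable, str(script)],
                cwd=work_dir,          # confine relative file access to the work dir
                capture_output=True,
                text=True,
                timeout=timeout,       # kill the child if it runs too long
            )
        except subprocess.TimeoutExpired:
            return 124, "Timed out"
        return completed.returncode, completed.stdout + completed.stderr


exit_code, output = run_python_snippet("print(1 + 1)")
print(exit_code, output.strip())
```

The child process still has the same file-system and network access as the parent, which is the gap a container-based executor is meant to close.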
5.6 Securing Code Execution
import asyncio
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def secure_code_execution():
    """
    Secure code execution example.
    Best practices:
    1. Isolate execution with Docker
    2. Set resource limits
    3. Restrict network access
    4. Enforce a timeout
    This example configures items 1 and 4 only.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    # async with starts the container on entry and stops it on exit
    async with DockerCommandLineCodeExecutor(
        image="python:3.11-slim",
        work_dir="./safe_workspace",  # host directory mounted into the container
        timeout=30,  # 30-second timeout
        container_name="autogen_executor",
    ) as safe_executor:
        # Agent that writes the code
        coder = AssistantAgent(
            name="coder",
            model_client=model_client,
            system_message="""You are a Python programmer.
When writing code:
1. Use only the standard library
2. Do not touch the file system
3. Do not make network requests
""",
        )
        # Agent that executes the code
        executor = CodeExecutorAgent(
            name="executor",
            code_executor=safe_executor,
        )
        # Build the team
        team = RoundRobinGroupChat(
            participants=[coder, executor],
            termination_condition=MaxMessageTermination(6),
        )
        # Run
        stream = team.run_stream(task="Write code that computes the 10th Fibonacci number, then run it")
        await Console(stream)
asyncio.run(secure_code_execution())
5.7 In Practice: Building a Code Generation and Execution Agent
import asyncio
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def code_generation_and_execution():
"""
完整的代码生成与执行流程
流程:
1. 用户提出需求
2. Coder生成代码
3. Executor执行代码
4. Reviewer检查结果
5. 如有问题,Coder修复
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# 代码执行器
executor = LocalCommandLineCodeExecutor(
work_dir="./code_workspace",
timeout=60
)
# 代码编写者
coder = AssistantAgent(
name="coder",
model_client=model_client,
system_message="""你是资深Python开发者。
任务:
1. 根据需求编写Python代码
2. 代码需要完整可执行
3. 包含必要的注释
4. 处理可能的异常
格式要求:
```python
# 你的代码
```
如果执行出错,请分析错误并修复代码。
完成后说'代码已完成'。
""",
)
# 代码执行者
code_executor = CodeExecutorAgent(
name="executor",
code_executor=executor,
)
# 代码审核者
reviewer = AssistantAgent(
name="reviewer",
model_client=model_client,
system_message="""你是代码审核专家。
检查:
1. 代码是否正确执行
2. 结果是否符合预期
3. 是否有潜在问题
如果一切正常说'审核通过'。
如果有问题,指出需要修复的地方。
""",
)
# 创建团队
team = RoundRobinGroupChat(
participants=[coder, code_executor, reviewer],
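# Terminate only on the reviewer's approval: also stopping on the coder's '代码已完成'
# would end the run before the executor has had a chance to run the code
termination_condition=TextMentionTermination("审核通过")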
)
# 执行任务
print("=== 代码生成与执行系统 ===\n")
stream = team.run_stream(
task="编写一个Python程序:读取一个数字列表,计算平均值、最大值、最小值,并输出结果。测试数据:[1, 5, 3, 9, 2, 7]"
)
await Console(stream)
asyncio.run(code_generation_and_execution())
Chapter 6: Termination Conditions and Task Control
6.1 Overview of TerminationCondition
6.2 Built-in Termination Conditions
from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    SourceMatchTermination,
    ExternalTermination,
    HandoffTermination,
    StopMessageTermination,
)
# 1. TextMentionTermination: terminate when a text is mentioned
text_termination = TextMentionTermination(
    text="完成",  # terminate when a message contains this text
)
# 2. MaxMessageTermination: cap the number of messages
max_msg_termination = MaxMessageTermination(
    max_messages=10,  # terminate once 10 messages have been produced
)
# 3. SourceMatchTermination: terminate after a specific agent speaks
source_termination = SourceMatchTermination(
    sources=["user"],  # terminate after "user" has responded
)
# 4. ExternalTermination: terminate from outside the run
external_termination = ExternalTermination()
# Call external_termination.set() elsewhere to stop the team
# 5. HandoffTermination: terminate on a handoff
handoff_termination = HandoffTermination(
    target="user",  # terminate when control is handed off to "user"
)
# 6. StopMessageTermination: terminate on a StopMessage
stop_termination = StopMessageTermination()
# Terminates when an agent produces a StopMessage
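6.3 Combining Termination Conditions
from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    SourceMatchTermination,
)
# Conditions are combined with the | and & operators, so no extra classes need to be imported
# OR: terminate as soon as any one condition is met
or_termination = (
    TextMentionTermination("完成") |
    TextMentionTermination("结束") |
    MaxMessageTermination(20)
)
# AND: terminate only once every condition has been met.
# Each condition latches independently, so they need not be met by the same message.
and_termination = (
    SourceMatchTermination(["reviewer"]) &
    TextMentionTermination("审核通过")
)
# Nested combination: a keyword has appeared AND 50 messages have been reached
complex_termination = (
    (TextMentionTermination("完成") | TextMentionTermination("成功")) &
    MaxMessageTermination(50)
)
# Usage example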
async def combined_termination_demo():
"""组合终止条件示例"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(
name="assistant",
model_client=model_client,
system_message="完成任务后说'完成',或者说出'放弃'。"
)
team = RoundRobinGroupChat(
participants=[agent],
termination_condition=(
TextMentionTermination("完成") |
TextMentionTermination("放弃") |
MaxMessageTermination(10)
)
)
result = await team.run(task="请介绍一下你自己")
print(f"停止原因: {result.stop_reason}")
asyncio.run(combined_termination_demo())
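The way `|` and `&` compose stateful conditions can be modelled in a few lines of plain Python. The sketch below is a simplified model for intuition only, not AutoGen's classes; `Condition`, `Mention`, `MaxMessages`, and `Combined` are hypothetical names. Every sub-condition sees each batch of new messages and remembers whether it has been satisfied.

```python
from typing import Callable, Iterable, List


class Condition:
    """Base class: update() receives only the messages added since the last call."""

    def update(self, new_messages: List[str]) -> bool:
        raise NotImplementedError

    def __or__(self, other: "Condition") -> "Condition":
        return Combined(any, self, other)

    def __and__(self, other: "Condition") -> "Condition":
        return Combined(all, self, other)


class Mention(Condition):
    def __init__(self, text: str) -> None:
        self.text = text
        self.hit = False

    def update(self, new_messages: List[str]) -> bool:
        # Latch: once the text has been seen, the condition stays satisfied
        self.hit = self.hit or any(self.text in m for m in new_messages)
        return self.hit


class MaxMessages(Condition):
    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.count = 0

    def update(self, new_messages: List[str]) -> bool:
        self.count += len(new_messages)
        return self.count >= self.limit


class Combined(Condition):
    def __init__(self, mode: Callable[[Iterable[bool]], bool], *parts: Condition) -> None:
        self.mode = mode
        self.parts = parts

    def update(self, new_messages: List[str]) -> bool:
        # Update every part (no short-circuit) so each keeps its own state current
        results = [part.update(new_messages) for part in self.parts]
        return self.mode(results)


either = Mention("done") | MaxMessages(3)
both = Mention("approved") & MaxMessages(2)
```

In this model the AND combination fires on the second message even though "approved" appeared in the first, which mirrors the latching behaviour described for combined conditions.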
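6.4 Writing a Custom Termination Condition
import asyncio
from typing import Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TerminatedException, TerminationCondition
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, StopMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient
class KeywordCountTermination(TerminationCondition):
    """
    Custom termination condition based on keyword occurrences.
    Terminates once the keyword has appeared a given number of times in total.
    """
    def __init__(self, keyword: str, count: int):
        """
        Args:
            keyword: the keyword to watch for
            count: number of occurrences that triggers termination
        """
        self._keyword = keyword
        self._target_count = count
        self._current_count = 0
        self._terminated = False
    @property
    def terminated(self) -> bool:
        """Whether the condition has already been reached."""
        return self._terminated
    async def __call__(
        self,
        messages: Sequence[BaseAgentEvent | BaseChatMessage],
    ) -> StopMessage | None:
        """
        Check the messages added since the last call.
        Returns a StopMessage to terminate, or None to continue.
        """
        if self._terminated:
            raise TerminatedException("Termination condition has already been reached")
        for message in messages:
            content = getattr(message, "content", None)
            if isinstance(content, str):
                self._current_count += content.count(self._keyword)
        if self._current_count >= self._target_count:
            self._terminated = True
            return StopMessage(
                content=f"Keyword '{self._keyword}' appeared {self._current_count} times",
                source="KeywordCountTermination",
            )
        return None
    async def reset(self) -> None:
        """Reset the state so the condition can be reused."""
        self._current_count = 0
        self._terminated = False
# Using the custom termination condition
async def custom_termination_demo():
    """Custom termination condition example."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)
    # Terminate once "好" has appeared 3 times (the task text itself is counted too)
    termination = KeywordCountTermination(keyword="好", count=3)
    team = RoundRobinGroupChat(
        participants=[agent],
        termination_condition=termination,
    )
    result = await team.run(task="请说三句包含'好'字的句子")
    print(f"Stop reason: {result.stop_reason}")
asyncio.run(custom_termination_demo())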
6.5 CancellationToken: Cancelling Tasks
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def cancellation_demo():
    """
    CancellationToken usage example.
    Use cases:
    1. The user cancels a task
    2. Automatic cancellation on timeout
    3. Cancellation triggered by an external event
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)
    # Option 1: cancel on timeout
    cancellation_token = CancellationToken()
    try:
        result = await asyncio.wait_for(
            agent.run(
                task="Please write a very long article",
                cancellation_token=cancellation_token,
            ),
            timeout=5.0,  # 5-second timeout
        )
    except asyncio.TimeoutError:
        cancellation_token.cancel()  # also cancel any work linked to the token
        print("Task timed out and was cancelled")
    # Option 2: cancel manually from another coroutine
    token = CancellationToken()
    async def cancel_after_delay():
        await asyncio.sleep(2)
        token.cancel()
        print("Cancellation signal sent")
    cancel_task = asyncio.create_task(cancel_after_delay())
    try:
        result = await agent.run(
            task="Carry out the task",
            cancellation_token=token,
        )
    except asyncio.CancelledError:
        print("Task was cancelled")
    finally:
        cancel_task.cancel()  # no-op if the canceller has already finished
asyncio.run(cancellation_demo())
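The two cancellation patterns can be tried without a model client. The sketch below uses plain `asyncio` with a stand-in coroutine (`long_job`, a hypothetical placeholder for an agent run) to show timeout-based and manual cancellation side by side.

```python
import asyncio


async def long_job(steps: int) -> str:
    """Stand-in for a long-running agent task; each step yields to the event loop."""
    for _ in range(steps):
        await asyncio.sleep(0.05)
    return "finished"


async def run_with_timeout(steps: int, timeout: float) -> str:
    try:
        # wait_for cancels the inner coroutine when the timeout expires
        return await asyncio.wait_for(long_job(steps), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


async def run_with_manual_cancel(steps: int, cancel_after: float) -> str:
    task = asyncio.create_task(long_job(steps))
    await asyncio.sleep(cancel_after)
    task.cancel()  # request cancellation from outside the task
    try:
        return await task
    except asyncio.CancelledError:
        return "cancelled"


print(asyncio.run(run_with_timeout(2, 1.0)))
print(asyncio.run(run_with_timeout(100, 0.1)))
print(asyncio.run(run_with_manual_cancel(100, 0.1)))
```

Cancellation is cooperative: it takes effect at the next `await` inside the task, so code that never yields cannot be interrupted this way.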
6.6 Pausing, Resuming, and Terminating a Team
import asyncio
from autogen_agentchat.conditions import ExternalTermination
async def team_control_demo():
"""
Team运行控制示例
展示如何暂停、恢复和终止Team
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(name="assistant", model_client=model_client)
# 使用ExternalTermination进行外部控制
external_termination = ExternalTermination()
team = RoundRobinGroupChat(
participants=[agent],
termination_condition=external_termination
)
# 在后台运行Team
async def run_team():
async for message in team.run_stream(task="持续对话"):
print(f"[{message.source if hasattr(message, 'source') else 'system'}]: {message}")
# 控制协程
async def control_team():
await asyncio.sleep(3) # 等待3秒
print("\n>>> 触发终止 <<<")
external_termination.set() # 终止Team
# 并发运行
await asyncio.gather(
run_team(),
control_team()
)
asyncio.run(team_control_demo())
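The example above demonstrates external termination only. As a rough illustration of how pause and resume can be layered on the same idea, the following plain-`asyncio` sketch gates each turn of a loop on an `asyncio.Event`. `RunControl` and `worker` are hypothetical names; this is not an AutoGen API.

```python
import asyncio
from typing import List


class RunControl:
    """Gate checked between turns: supports pause, resume, and stop."""

    def __init__(self) -> None:
        self._resume = asyncio.Event()
        self._resume.set()  # start in the running state
        self.stopped = False

    def pause(self) -> None:
        self._resume.clear()

    def resume(self) -> None:
        self._resume.set()

    def stop(self) -> None:
        self.stopped = True
        self._resume.set()  # wake a paused worker so it can exit

    async def checkpoint(self) -> bool:
        """Block while paused; return False once stop() has been called."""
        await self._resume.wait()
        return not self.stopped


async def worker(control: RunControl, log: List[int]) -> None:
    turn = 0
    while await control.checkpoint():
        turn += 1
        log.append(turn)           # stands in for one agent turn
        await asyncio.sleep(0.01)


async def demo():
    control = RunControl()
    log: List[int] = []
    task = asyncio.create_task(worker(control, log))
    await asyncio.sleep(0.05)
    control.pause()
    await asyncio.sleep(0.02)      # let the in-flight turn finish
    paused_at = len(log)
    await asyncio.sleep(0.05)
    still_paused_at = len(log)     # no progress while paused
    control.resume()
    await asyncio.sleep(0.05)
    control.stop()
    await task
    return paused_at, still_paused_at, len(log)


paused_at, still_paused_at, total = asyncio.run(demo())
print(paused_at, still_paused_at, total)
```

The pause only takes effect between turns, so a turn that is already in progress always runs to completion.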
6.7 In Practice: Controlling Exactly When a Conversation Ends
import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import (
TextMentionTermination,
MaxMessageTermination,
HandoffTermination,
)
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def precise_termination_demo():
"""
精确控制对话结束时机
场景:客服对话系统
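- ends when the user says "再见" (goodbye)
- or when the conversation reaches 30 messages
- or when the conversation is handed off to a human
- or when the support agent confirms the problem is solved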
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
# AI客服
ai_support = AssistantAgent(
name="ai_support",
model_client=model_client,
system_message="""你是智能客服。
职责:
1. 解答用户问题
2. 无法解决时转人工
问题解决后说'问题已解决'。
需要人工时说'转人工客服'。
""",
handoffs=["human_support"] # 可转交人工
)
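# Human support agent (simulated)
# v0.4's UserProxyAgent has no human_input_mode parameter (that is v0.2 API);
# it reads console input by default, and input_func can override this
human_support = UserProxyAgent(
name="human_support",
description="Human support agent",
)
# End user
user = UserProxyAgent(
name="user",
description="The customer",
)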
# 组合终止条件
termination = (
TextMentionTermination("再见") | # 用户说再见
TextMentionTermination("问题已解决") | # 问题解决
MaxMessageTermination(30) | # 最多30轮
HandoffTermination(target="human_support") # 转人工时终止
)
# 创建团队
team = SelectorGroupChat(
participants=[user, ai_support, human_support],
model_client=model_client,
termination_condition=termination,
)
print("=== 客服系统启动 ===")
print("输入'再见'结束对话\n")
stream = team.run_stream(task="您好,请问有什么可以帮助您的?")
await Console(stream)
asyncio.run(precise_termination_demo())
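Chapter 7: Messages and Events
7.1 The Message Hierarchy: the BaseChatMessage Base Class
Message types:
| Message type | Purpose | content type |
|---|---|---|
| TextMessage | Text message | str |
| MultiModalMessage | Multimodal message (text + images) | List[Union[str, Image]] |
| HandoffMessage | Hands a task to another agent | str (plus target and context fields) |
| ToolCallRequestEvent | Tool call request (an agent event, not a chat message) | List[FunctionCall] |
| StopMessage | Termination signal | str |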
7.2 Message Types in Detail
import asyncio
from autogen_agentchat.messages import (
    TextMessage,
    MultiModalMessage,
    HandoffMessage,
    StopMessage,
)
from autogen_core import Image
from PIL import Image as PILImage
async def message_types_demo():
    """
    Usage examples for the main message types.
    """
    # 1. TextMessage: plain text
    text_msg = TextMessage(
        content="This is a text message",
        source="user",
    )
    # 2. MultiModalMessage: text plus images
    # Create a sample image
    pil_image = PILImage.new("RGB", (100, 100), color="red")
    ag_image = Image(pil_image)
    multimodal_msg = MultiModalMessage(
        content=[
            "Please describe this image:",
            ag_image,
        ],
        source="user",
    )
    # 3. HandoffMessage: hand a task to another agent
    # content is the text of the handoff; context (optional) carries model messages
    handoff_msg = HandoffMessage(
        target="expert_agent",
        content="The user needs an expert analysis",
        source="coordinator",
    )
    # 4. StopMessage: termination signal
    stop_msg = StopMessage(
        content="Task completed",
        source="assistant",
    )
    print(f"Text message: {text_msg.content}")
    print(f"Multimodal message: {multimodal_msg.content}")
    print(f"Handoff message: handed off to {handoff_msg.target}")
    print(f"Stop message: {stop_msg.content}")
asyncio.run(message_types_demo())
7.3 Handling Multimodal Messages
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import MultiModalMessage
from autogen_core import Image
from autogen_ext.models.openai import OpenAIChatCompletionClient
from PIL import Image as PILImage
async def multimodal_demo():
"""
多模态消息处理示例
展示如何发送图片给Agent进行分析
"""
# 使用支持视觉的模型
model_client = OpenAIChatCompletionClient(
model="gpt-4o", # 支持视觉
)
agent = AssistantAgent(
name="vision_assistant",
model_client=model_client,
system_message="你是一个图像分析专家,能够识别和描述图片内容。"
)
# 加载本地图片
pil_image = PILImage.open("./example_image.png")
ag_image = Image(pil_image)
# 创建多模态消息
multimodal_msg = MultiModalMessage(
content=[
"请分析这张图片,描述其中的内容:",
ag_image
],
source="user"
)
# 运行Agent
result = await agent.run(task=multimodal_msg)
print(result.messages[-1].content)
asyncio.run(multimodal_demo())
7.4 The Response Object
import asyncio
from dataclasses import dataclass
from typing import Sequence
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken
from autogen_core.tools import FunctionTool
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Simplified sketch of autogen_agentchat.base.Response, shown for reference only;
# import the real class from autogen_agentchat.base in application code
@dataclass
class Response:
    """
    Agent response object.
    Attributes:
        chat_message: the main output message, sent to other agents or the user
        inner_messages: internal events and messages, useful for UI display or debugging
    """
    chat_message: BaseChatMessage
    inner_messages: Sequence[BaseAgentEvent | BaseChatMessage] | None = None
def echo(text: str) -> str:
    """Example tool: return the input unchanged (tools need type annotations)."""
    return text
async def response_demo():
    """Response object usage example."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[FunctionTool(echo, description="Echo the input text")],  # example tool
    )
    # Call on_messages to obtain a Response
    response = await agent.on_messages(
        messages=[TextMessage(content="Hello", source="user")],
        cancellation_token=CancellationToken(),
    )
    # The main message
    print(f"Main message: {response.chat_message.content}")
    # The internal events
    if response.inner_messages:
        print(f"Number of internal events: {len(response.inner_messages)}")
        for event in response.inner_messages:
            print(f"Event type: {type(event).__name__}")
asyncio.run(response_demo())
7.5 inner_messages and UI Integration
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import ToolCallRequestEvent, ToolCallExecutionEvent
from autogen_core.tools import FunctionTool
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def inner_messages_ui_demo():
    """
    inner_messages example.
    Shows how streamed inner messages can drive real-time UI updates.
    """
    # Define the tools
    def search(query: str) -> str:
        """Search tool."""
        return f"Search results: {query}"
    def analyze(data: str) -> str:
        """Analysis tool."""
        return f"Analysis results: {data}"
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(
        name="assistant",
        model_client=model_client,
        tools=[
            FunctionTool(search, description="Search for information"),
            FunctionTool(analyze, description="Analyze the given data"),
        ],
    )
    # Simulated UI event handling
    async def handle_ui_events():
        """Handle UI events."""
        async for message in agent.run_stream(
            task="Search for information about AutoGen and analyze it"
        ):
            # Update the UI according to the message type
            if isinstance(message, ToolCallRequestEvent):
                # Tool call started: show a loading state
                for call in message.content:
                    print(f"[UI] Calling tool: {call.name}")
                    print(f"[UI] Arguments: {call.arguments}")
            elif isinstance(message, ToolCallExecutionEvent):
                # Tool call finished: show the result
                for result in message.content:
                    print(f"[UI] Tool returned: {result.content}")
            elif hasattr(message, "content") and isinstance(message.content, str):
                # Text message: show it in the chat pane
                print(f"[UI] Message: {message.content}")
    await handle_ui_events()
asyncio.run(inner_messages_ui_demo())
7.6 Saving and Restoring Agent State
import asyncio
import json
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def state_management_demo():
"""
Agent状态保存与恢复示例
展示如何保存Agent状态以便后续恢复
"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(
name="assistant",
model_client=model_client,
)
# 第一次对话
print("=== 第一次对话 ===")
result1 = await agent.run(task="我叫张三,我喜欢编程")
print(f"回复: {result1.messages[-1].content}")
# 保存状态
state = await agent.save_state()
print(f"\n保存的状态: {json.dumps(state, indent=2, ensure_ascii=False)[:200]}...")
# 模拟Agent重启
new_agent = AssistantAgent(
name="assistant",
model_client=model_client,
)
# 恢复状态
await new_agent.load_state(state)
print("\n状态已恢复")
# 第二次对话(新Agent应该记住之前的信息)
print("\n=== 第二次对话(状态已恢复)===")
result2 = await new_agent.run(task="我叫什么名字?我喜欢什么?")
print(f"回复: {result2.messages[-1].content}")
asyncio.run(state_management_demo())
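Since the example above serializes the saved state with `json.dumps`, it can also be written to disk and read back later. The sketch below shows such a round trip using the standard library only. `sample_state` is a hypothetical stand-in for the mapping returned by `save_state()`, whose real structure is not reproduced here.

```python
import json
import tempfile
from pathlib import Path


def save_state_to_file(state: dict, path: Path) -> None:
    """Persist a JSON-serializable state mapping as UTF-8 text."""
    path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")


def load_state_from_file(path: Path) -> dict:
    """Read a state mapping previously written by save_state_to_file."""
    return json.loads(path.read_text(encoding="utf-8"))


# Hypothetical stand-in for the mapping returned by agent.save_state()
sample_state = {
    "type": "ExampleState",
    "messages": [{"source": "user", "content": "我叫张三,我喜欢编程"}],
}

with tempfile.TemporaryDirectory() as tmp:
    state_path = Path(tmp) / "agent_state.json"
    save_state_to_file(sample_state, state_path)
    restored = load_state_from_file(state_path)

print(restored == sample_state)
```

Writing with `ensure_ascii=False` and an explicit UTF-8 encoding keeps non-ASCII conversation text readable in the file and avoids platform-dependent default encodings.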
7.7 Handoffs: Passing Tasks Between Agents
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def handoff_demo():
    """
    Handoff example.
    Shows how agents pass a task to one another through handoffs.
    A Swarm team is used because it selects the next speaker from handoff messages.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    # Receptionist
    receptionist = AssistantAgent(
        name="receptionist",
        model_client=model_client,
        system_message="""You are the receptionist.
Hand the user over to the right expert:
- Math questions -> mathematician
- Programming questions -> programmer
State the user's request when handing off.
""",
        handoffs=["mathematician", "programmer"],
        description="Receptionist who routes user requests",
    )
    # Mathematician
    mathematician = AssistantAgent(
        name="mathematician",
        model_client=model_client,
        system_message="You are a math expert who answers math questions. Say 'Problem solved' when done.",
        description="Math expert",
    )
    # Programmer
    programmer = AssistantAgent(
        name="programmer",
        model_client=model_client,
        system_message="You are a programming expert who answers programming questions. Say 'Problem solved' when done.",
        description="Programming expert",
    )
    # Build the team; the first participant speaks first
    team = Swarm(
        participants=[receptionist, mathematician, programmer],
        termination_condition=(
            TextMentionTermination("Problem solved") |
            MaxMessageTermination(10)
        ),
    )
    print("=== Handoff demo ===\n")
    stream = team.run_stream(task="Please implement quicksort in Python for me")
    await Console(stream)
asyncio.run(handoff_demo())
7.8 In Practice: Long-Running Multi-Turn Tasks
import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
class LongRunningTaskManager:
"""
长时间运行任务管理器
功能:
1. 支持任务暂停和恢复
2. 保存对话历史
3. 支持断点续传
"""
def __init__(self, model_client):
self.model_client = model_client
self.agents = {}
self.team = None
self.history = []
async def initialize(self):
"""初始化团队"""
self.agents["assistant"] = AssistantAgent(
name="assistant",
model_client=self.model_client,
system_message="你是项目助手,帮助用户完成长期项目。"
)
# v0.4's UserProxyAgent has no human_input_mode parameter; it reads console input by default
self.agents["user"] = UserProxyAgent(
name="user",
)
self.team = RoundRobinGroupChat(
participants=[self.agents["user"], self.agents["assistant"]],
termination_condition=TextMentionTermination("结束会话")
)
async def run_session(self, initial_task: str = None):
"""运行一个会话"""
if initial_task:
stream = self.team.run_stream(task=initial_task)
else:
stream = self.team.run_stream()
async for message in stream:
self.history.append(message)
yield message
async def save_session(self, filepath: str):
"""保存会话状态"""
# 保存Agent状态
states = {}
for name, agent in self.agents.items():
states[name] = await agent.save_state()
# 保存到文件
import json
with open(filepath, 'w') as f:
json.dump({
"agent_states": states,
"history_count": len(self.history)
}, f)
print(f"会话已保存到 {filepath}")
async def load_session(self, filepath: str):
"""加载会话状态"""
import json
with open(filepath, 'r') as f:
data = json.load(f)
for name, state in data["agent_states"].items():
if name in self.agents:
await self.agents[name].load_state(state)
print(f"会话已从 {filepath} 恢复")
async def long_running_task_demo():
"""长时间运行任务演示"""
model_client = OpenAIChatCompletionClient(model="gpt-4o")
manager = LongRunningTaskManager(model_client)
await manager.initialize()
print("=== 长期项目助手 ===")
print("输入'结束会话'来保存并退出\n")
async for message in manager.run_session(
initial_task="我想开发一个个人博客系统,请帮我规划"
):
pass
# 保存会话
await manager.save_session("./session_backup.json")
asyncio.run(long_running_task_demo())
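Chapter 8: The Core Layer: Event-Driven Architecture and Low-Level Development
8.1 Core Concepts
The Core layer is AutoGen's low-level API and provides an event-driven agent runtime.
Core concepts:
| Concept | Description |
|---|---|
| AgentRuntime | The agent runtime; manages agent lifecycles and message delivery |
| RoutedAgent | An agent that routes incoming messages to handlers by message type |
| AgentId | Unique identifier of an agent, made up of a type and a key |
| TopicId | Topic identifier used for publish/subscribe |
| MessageContext | Message context, including information about the sender |
| @message_handler | Decorator that marks a method as a message handler |
| @type_subscription | Decorator that subscribes an agent to a topic type |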
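How these concepts fit together can be shown with a toy model: subscriptions map a topic type to subscribers, and each subscriber dispatches on the type of the message it receives. The sketch below is a deliberately simplified, synchronous illustration using the standard library; `MiniRuntime`, `Greeting`, and `Number` are hypothetical names, and none of this is AutoGen's runtime code.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Type


@dataclass
class Greeting:
    text: str


@dataclass
class Number:
    value: int


class MiniRuntime:
    """Toy runtime: topic type -> list of per-subscriber handler tables."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Dict[Type, Callable[[Any], Any]]]] = {}

    def subscribe(self, topic_type: str, handlers: Dict[Type, Callable[[Any], Any]]) -> None:
        self._subscribers.setdefault(topic_type, []).append(handlers)

    def publish(self, message: Any, topic_type: str) -> List[Any]:
        results = []
        for handlers in self._subscribers.get(topic_type, []):
            # Route by message type, as a routed agent does with its handlers
            handler = handlers.get(type(message))
            if handler is not None:
                results.append(handler(message))
        return results


runtime = MiniRuntime()
runtime.subscribe("greetings", {Greeting: lambda m: m.text.upper()})
runtime.subscribe("greetings", {Number: lambda m: m.value * 2})

print(runtime.publish(Greeting("hi"), "greetings"))
```

Only the first subscriber handles `Greeting`, so the second is skipped silently, and publishing to a topic type with no subscribers delivers to nobody.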
8.2 Message Handler Decorators
from dataclasses import dataclass
from autogen_core import RoutedAgent, message_handler, MessageContext
# Define the message types.
# Core message types should be serializable: use dataclasses or Pydantic models, not bare str
@dataclass
class TaskMessage:
    """Task message."""
    task_id: str
    content: str
@dataclass
class ResultMessage:
    """Result message."""
    task_id: str
    result: str
@dataclass
class PingMessage:
    """Ping request."""
    text: str
@dataclass
class PongMessage:
    """Ping response."""
    text: str
class WorkerAgent(RoutedAgent):
    """
    Example worker agent.
    Uses @message_handler to handle different message types.
    """
    def __init__(self, name: str):
        super().__init__(f"Worker agent: {name}")
        self.name = name
        self.processed_count = 0
    @message_handler
    async def handle_task(
        self,
        message: TaskMessage,
        ctx: MessageContext,
    ) -> ResultMessage:
        """
        Handle a task message.
        Args:
            message: a TaskMessage
            ctx: the message context, including the sender
        Returns:
            A ResultMessage
        """
        self.processed_count += 1
        print(f"[{self.name}] Received task: {message.task_id}")
        print(f"[{self.name}] Task content: {message.content}")
        print(f"[{self.name}] Sender: {ctx.sender}")
        # Process the task
        result = f"Processed: {message.content}"
        return ResultMessage(
            task_id=message.task_id,
            result=result,
        )
    @message_handler
    async def handle_ping(self, message: PingMessage, ctx: MessageContext) -> PongMessage:
        """
        Handle a ping message with a simple ping-pong reply.
        """
        if message.text == "ping":
            return PongMessage("pong")
        return PongMessage(f"Received: {message.text}")
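8.3 Subscriptions
from dataclasses import dataclass
from autogen_core import (
    RoutedAgent,
    MessageContext,
    message_handler,
    type_subscription,
    default_subscription,
    TopicId,
    AgentId,
)
# Message types are dataclasses so that the runtime can serialize them
@dataclass
class MathTask:
    expression: str
@dataclass
class TextTask:
    text: str
@dataclass
class TaskReply:
    result: str
@type_subscription(topic_type="math_tasks")
class MathAgent(RoutedAgent):
    """
    Math agent, subscribed to the math_tasks topic type.
    """
    def __init__(self):
        super().__init__("Math agent")
    @message_handler
    async def handle_math(self, message: MathTask, ctx: MessageContext) -> TaskReply:
        """Handle a math task."""
        try:
            # WARNING: eval() is unsafe for untrusted input; used here for brevity only
            result = eval(message.expression)
            return TaskReply(f"Result: {result}")
        except Exception as e:
            return TaskReply(f"Calculation error: {e}")
@type_subscription(topic_type="text_tasks")
class TextAgent(RoutedAgent):
    """
    Text agent, subscribed to the text_tasks topic type.
    """
    def __init__(self):
        super().__init__("Text agent")
    @message_handler
    async def handle_text(self, message: TextTask, ctx: MessageContext) -> TaskReply:
        """Handle a text task."""
        text = message.text
        return TaskReply(f"Length: {len(text)}, words: {len(text.split())}")
@default_subscription
class DefaultAgent(RoutedAgent):
    """
    Default agent, subscribed to the default topic
    (it does not receive messages published to other topics).
    """
    def __init__(self):
        super().__init__("Default agent")
    @message_handler
    async def handle_default(self, message: TextTask, ctx: MessageContext) -> TaskReply:
        """Handle messages published to the default topic."""
        return TaskReply(f"Default handling: {message.text}")
8.4 The AgentRuntime
import asyncio
from autogen_core import (
    SingleThreadedAgentRuntime,
    AgentId,
    TopicId,
)
async def runtime_demo():
    """
    AgentRuntime usage example.
    Shows how to create a runtime, register agents, and send messages.
    """
    # Create a single-threaded runtime (for development and testing)
    runtime = SingleThreadedAgentRuntime()
    # Register the agent types
    await MathAgent.register(
        runtime,
        "math_agent",  # agent type name
        lambda: MathAgent(),  # factory function
    )
    await TextAgent.register(
        runtime,
        "text_agent",
        lambda: TextAgent(),
    )
    # Start the runtime
    runtime.start()
    # Option 1: send a message directly to an agent and await its response
    math_agent_id = AgentId("math_agent", "default")
    result1 = await runtime.send_message(
        MathTask("2 + 3 * 4"),
        math_agent_id,
    )
    print(f"Math agent replied: {result1.result}")
    # Option 2: publish a message to a topic.
    # Publishing is one-way: it returns None and handler return values are discarded
    topic = TopicId("math_tasks", "default")
    await runtime.publish_message(
        MathTask("10 / 2"),
        topic,
    )
    # Wait for queued messages to be processed, then stop the runtime
    await runtime.stop_when_idle()
asyncio.run(runtime_demo())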
8.5 Registering and Starting Agents
import asyncio
from dataclasses import dataclass
from autogen_core import (
    AgentId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
)
# Dataclass message types for the stateful agent
@dataclass
class GetState:
    pass
@dataclass
class SetState:
    key: str
    value: str
@dataclass
class StateSnapshot:
    state: dict
class StatefulAgent(RoutedAgent):
    """Agent that keeps state between messages."""
    def __init__(self, initial_state: dict):
        super().__init__("Stateful agent")
        self.state = initial_state
    @message_handler
    async def get_state(self, message: GetState, ctx: MessageContext) -> StateSnapshot:
        return StateSnapshot(dict(self.state))
    @message_handler
    async def set_state(self, message: SetState, ctx: MessageContext) -> StateSnapshot:
        self.state[message.key] = message.value
        return StateSnapshot(dict(self.state))
async def agent_lifecycle_demo():
    """
    Agent lifecycle management example.
    """
    runtime = SingleThreadedAgentRuntime()
    # Register an agent type; instances are created lazily by the factory
    await WorkerAgent.register(
        runtime,
        "worker",
        lambda: WorkerAgent("Worker-1"),
    )
    # Register the stateful agent
    await StatefulAgent.register(
        runtime,
        "stateful",
        lambda: StatefulAgent({"count": 0, "name": "test"}),
    )
    # Start the runtime
    runtime.start()
    # Interact with the agent
    stateful_id = AgentId("stateful", "default")
    # Read the state
    snapshot = await runtime.send_message(GetState(), stateful_id)
    print(f"Current state: {snapshot.state}")
    # Update the state
    snapshot = await runtime.send_message(SetState("count", "10"), stateful_id)
    print(f"State after update: {snapshot.state}")
    await runtime.stop()
asyncio.run(agent_lifecycle_demo())
8.6 Publish/Subscribe Messaging
import asyncio
from autogen_core import (
SingleThreadedAgentRuntime,
RoutedAgent,
message_handler,
type_subscription,
TopicId,
)
from dataclasses import dataclass
@dataclass
class NewsMessage:
"""新闻消息"""
category: str
title: str
content: str
@type_subscription(topic_type="news.sports")
class SportsNewsAgent(RoutedAgent):
"""体育新闻订阅者"""
def __init__(self):
super().__init__("体育新闻Agent")
self.news_count = 0
@message_handler
async def handle_news(self, message: NewsMessage, ctx) -> None:
self.news_count += 1
print(f"[体育新闻 #{self.news_count}] {message.title}")
@type_subscription(topic_type="news.tech")
class TechNewsAgent(RoutedAgent):
"""科技新闻订阅者"""
def __init__(self):
super().__init__("科技新闻Agent")
self.news_count = 0
@message_handler
async def handle_news(self, message: NewsMessage, ctx) -> None:
self.news_count += 1
print(f"[科技新闻 #{self.news_count}] {message.title}")
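# type_subscription matches topic types exactly and has no wildcard support,
# so subscribe to each topic type explicitly (TypePrefixSubscription offers prefix matching)
@type_subscription(topic_type="news.sports")
@type_subscription(topic_type="news.tech")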
class AllNewsAgent(RoutedAgent):
"""所有新闻订阅者"""
def __init__(self):
super().__init__("新闻汇总Agent")
self.all_news = []
@message_handler
async def handle_news(self, message: NewsMessage, ctx) -> None:
self.all_news.append(message)
print(f"[新闻汇总] 收到 {message.category} 新闻: {message.title}")
async def pubsub_demo():
"""发布/订阅模式演示"""
runtime = SingleThreadedAgentRuntime()
# 注册订阅者
await SportsNewsAgent.register(runtime, "sports_news", lambda: SportsNewsAgent())
await TechNewsAgent.register(runtime, "tech_news", lambda: TechNewsAgent())
await AllNewsAgent.register(runtime, "all_news", lambda: AllNewsAgent())
runtime.start()
# 发布新闻
sports_topic = TopicId("news.sports", "default")
tech_topic = TopicId("news.tech", "default")
await runtime.publish_message(
NewsMessage("sports", "世界杯决赛", "精彩比赛..."),
sports_topic
)
await runtime.publish_message(
NewsMessage("tech", "AI新突破", "GPT-5发布..."),
tech_topic
)
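# Wait until all published messages have been processed; stop() may discard queued messages
await runtime.stop_when_idle()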
asyncio.run(pubsub_demo())
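Topic matching in a publish/subscribe system is typically either exact or prefix-based, and under exact matching a pattern such as `news.*` is just another literal string. The plain-Python sketch below contrasts the two rules. It models the matching logic only and is not AutoGen's subscription code; `matching_agents` is a hypothetical helper.

```python
from typing import Dict, List


def matching_agents(
    topic_type: str,
    exact: Dict[str, str],
    prefixes: Dict[str, str],
) -> List[str]:
    """Return the agent types whose subscription matches topic_type."""
    # Exact subscriptions match only an identical topic type
    matched = [agent for sub, agent in exact.items() if sub == topic_type]
    # Prefix subscriptions match any topic type that starts with the prefix
    matched += [agent for prefix, agent in prefixes.items() if topic_type.startswith(prefix)]
    return matched


exact_subscriptions = {"news.sports": "sports_news", "news.tech": "tech_news"}
prefix_subscriptions = {"news.": "all_news"}

print(matching_agents("news.sports", exact_subscriptions, prefix_subscriptions))
```

With these tables, a message on `news.sports` reaches both the sports subscriber and the prefix subscriber, while an exact subscription to the literal string `news.*` would match neither real topic.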
8.7 In Practice: Building a Low-Level Event-Driven Agent System
import asyncio
from autogen_core import (
SingleThreadedAgentRuntime,
RoutedAgent,
message_handler,
type_subscription,
TopicId,
AgentId,
MessageContext,
)
from dataclasses import dataclass
from typing import Optional
# 定义消息类型
@dataclass
class OrderCreated:
"""订单创建消息"""
order_id: str
customer: str
items: list
total: float
@dataclass
class PaymentProcessed:
"""支付处理消息"""
order_id: str
success: bool
transaction_id: Optional[str] = None
@dataclass
class OrderShipped:
"""订单发货消息"""
order_id: str
tracking_number: str
@type_subscription(topic_type="order.created")
class OrderProcessor(RoutedAgent):
"""订单处理器"""
def __init__(self):
super().__init__("订单处理器")
@message_handler
async def handle_order(self, message: OrderCreated, ctx: MessageContext) -> str:
print(f"[订单处理器] 处理订单: {message.order_id}")
print(f" 客户: {message.customer}")
print(f" 商品: {message.items}")
print(f" 总额: ${message.total}")
# 发送支付请求
payment_topic = TopicId("payment.request", "default")
await self.publish_message(  # publish via the agent; MessageContext has no publish_message
PaymentProcessed(
order_id=message.order_id,
success=True,
transaction_id=f"TXN-{message.order_id}"
),
payment_topic
)
return f"订单 {message.order_id} 已处理"
@type_subscription(topic_type="payment.request")
class PaymentProcessor(RoutedAgent):
"""支付处理器"""
def __init__(self):
super().__init__("支付处理器")
@message_handler
async def handle_payment(self, message: PaymentProcessed, ctx: MessageContext) -> str:
print(f"[支付处理器] 处理支付: {message.order_id}")
if message.success:
print(f" 支付成功: {message.transaction_id}")
# 发送发货请求
shipping_topic = TopicId("shipping.request", "default")
await self.publish_message(  # publish via the agent; MessageContext has no publish_message
OrderShipped(
order_id=message.order_id,
tracking_number=f"TRK-{message.order_id}"
),
shipping_topic
)
else:
print(f" 支付失败")
return f"支付处理完成: {message.order_id}"
@type_subscription(topic_type="shipping.request")
class ShippingProcessor(RoutedAgent):
"""发货处理器"""
def __init__(self):
super().__init__("发货处理器")
@message_handler
async def handle_shipping(self, message: OrderShipped, ctx: MessageContext) -> str:
print(f"[发货处理器] 处理发货: {message.order_id}")
print(f" 物流单号: {message.tracking_number}")
return f"订单 {message.order_id} 已发货"
async def ecommerce_workflow_demo():
"""电商工作流演示"""
print("=== 电商订单处理系统 ===\n")
runtime = SingleThreadedAgentRuntime()
# 注册处理器
await OrderProcessor.register(runtime, "order_processor", lambda: OrderProcessor())
await PaymentProcessor.register(runtime, "payment_processor", lambda: PaymentProcessor())
await ShippingProcessor.register(runtime, "shipping_processor", lambda: ShippingProcessor())
runtime.start()
# 创建订单
order_topic = TopicId("order.created", "default")
await runtime.publish_message(
OrderCreated(
order_id="ORD-001",
customer="张三",
items=["商品A", "商品B"],
total=299.99
),
order_topic
)
# Wait until the whole message chain has been processed, then stop
await runtime.stop_when_idle()
asyncio.run(ecommerce_workflow_demo())
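Chapter 9: Extensions and Distributed Deployment
9.1 Overview of the Extensions Mechanism
9.2 McpWorkbench: Integrating MCP Servers
import asyncio
from autogen_ext.tools.mcp import McpWorkbench, StdioServerParams
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def mcp_integration_demo():
    """
    MCP (Model Context Protocol) integration example.
    MCP is a standardized protocol introduced by Anthropic for interaction
    between AI models and external tools.
    Requires a recent autogen-ext release installed with the mcp extra.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    # Parameters for launching the MCP server over stdio
    server_params = StdioServerParams(
        command="python",
        args=["mcp_server.py"],  # MCP server script
    )
    # Create the workbench and connect to the MCP server
    async with McpWorkbench(server_params=server_params) as workbench:
        # List the available tools (each entry is a schema dictionary)
        tools = await workbench.list_tools()
        print(f"Available tools: {[t['name'] for t in tools]}")
        # Create an agent that uses the MCP tools through the workbench
        agent = AssistantAgent(
            name="mcp_assistant",
            model_client=model_client,
            workbench=workbench,
        )
        # Run the agent
        result = await agent.run(
            task="Use the MCP tools to look up the current weather"
        )
        print(result.messages[-1].content)
asyncio.run(mcp_integration_demo())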
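9.3 Using the OpenAIAssistantAgent Extension
import asyncio
from openai import AsyncOpenAI
from autogen_core import CancellationToken
from autogen_ext.agents.openai import OpenAIAssistantAgent
async def openai_assistant_demo():
    """
    OpenAI Assistant agent example.
    Uses OpenAI's Assistants API, which supports:
    - Code interpreter
    - File search
    - Function calling
    """
    # The agent talks to the Assistants API through an AsyncOpenAI client,
    # not through an AutoGen model client
    client = AsyncOpenAI()
    cancellation_token = CancellationToken()
    # Create the OpenAI Assistant agent
    assistant = OpenAIAssistantAgent(
        name="code_assistant",
        description="Assistant that writes and debugs code",
        client=client,
        model="gpt-4o",
        instructions="You are a coding expert who helps users write and debug code.",
        tools=["code_interpreter"],  # code interpreter
    )
    # Run
    result = await assistant.run(
        task="Write a Python function that computes the Fibonacci sequence and test the first 10 terms"
    )
    print(result.messages[-1].content)
    # Upload a file for the code interpreter (optional)
    # await assistant.on_upload_for_code_interpreter("./data.csv", cancellation_token)
    # Clean up the remote assistant
    await assistant.delete_assistant(cancellation_token)
asyncio.run(openai_assistant_demo())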
9.4 Distributed Deployment with GrpcWorkerAgentRuntime
import asyncio
from dataclasses import dataclass
from autogen_core import MessageContext, RoutedAgent, message_handler
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime, GrpcWorkerAgentRuntimeHost
# Messages that cross process boundaries must be serializable
@dataclass
class WorkItem:
    content: str
@dataclass
class WorkResult:
    content: str
class DistributedWorkerAgent(RoutedAgent):
    """Distributed worker agent."""
    def __init__(self, worker_id: str):
        super().__init__(f"Worker-{worker_id}")
        self.worker_id = worker_id
    @message_handler
    async def process(self, message: WorkItem, ctx: MessageContext) -> WorkResult:
        """Process a task."""
        return WorkResult(f"Worker-{self.worker_id} processed: {message.content}")
async def distributed_runtime_demo():
    """
    Distributed runtime example.
    Architecture:
    - Host node: GrpcWorkerAgentRuntimeHost
    - Worker node: GrpcWorkerAgentRuntime
    """
    # Worker node: connect to the host
    worker = GrpcWorkerAgentRuntime(
        host_address="localhost:50051",  # address of the host node
    )
    # Start the worker first so that registration can reach the host
    await worker.start()
    # Register the agent
    await DistributedWorkerAgent.register(
        worker,
        "distributed_worker",
        lambda: DistributedWorkerAgent("1"),
    )
    print("Worker node started, waiting for tasks...")
    # Keep running
    try:
        await asyncio.sleep(3600)  # run for one hour
    finally:
        await worker.stop()
# Host node code (runs in a separate process)
async def host_demo():
    """Host node example."""
    host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
    host.start()  # starts the host service in the background
    print("Host node started")
    # The host only relays messages; tasks are sent from a worker runtime
    # connected to this host
    await host.stop()
# asyncio.run(distributed_runtime_demo())
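9.5 Visual Development with AutoGen Studio
AutoGen Studio is a web UI for building agent systems without writing code:
# Install
pip install -U autogenstudio
# Launch
autogenstudio ui --port 8080 --appdir ./myapp
Studio features:
From a Studio-style configuration to code:
# Build a team in Python from a declarative configuration
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient

# A simplified, illustrative configuration. Studio's own export is a component
# JSON document with a different schema; that format is loaded with
# load_component() (for example RoundRobinGroupChat.load_component(config)).
studio_config = {
    "agents": [
        {
            "name": "assistant",
            "type": "AssistantAgent",
            "model": "gpt-4o",
            "system_message": "You are a helpful assistant",
        }
    ],
    "team": {
        "type": "RoundRobinGroupChat",
        "max_turns": 10,
    },
}


# Convert to code
def create_from_config(config):
    """Create an agent team from the simplified configuration above."""
    # All agents share one client built from the first agent's model
    model_client = OpenAIChatCompletionClient(
        model=config["agents"][0]["model"]
    )
    agents = []
    for agent_config in config["agents"]:
        agent = AssistantAgent(
            name=agent_config["name"],
            model_client=model_client,
            system_message=agent_config["system_message"],
        )
        agents.append(agent)
    team = RoundRobinGroupChat(
        participants=agents,
        termination_condition=MaxMessageTermination(
            config["team"]["max_turns"]
        ),
    )
    return team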
9.6 Migration Guide: v0.2 to v0.4
"""
Main changes from v0.2 to v0.4
"""
# ========== v0.2 code ==========
# from autogen import AssistantAgent, UserProxyAgent
# # v0.2: configure the model with an llm_config dict
# llm_config = {
#     "config_list": [{"model": "gpt-4", "api_key": "..."}]
# }
# assistant = AssistantAgent(
#     name="assistant",
#     llm_config=llm_config,
# )
# user_proxy = UserProxyAgent(
#     name="user",
#     human_input_mode="NEVER",
# )
# # v0.2: start the conversation with initiate_chat
# user_proxy.initiate_chat(
#     assistant,
#     message="Hello"
# )
# ========== v0.4 code ==========
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def v04_style():
    # v0.4: use a model client
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key="...",
    )
    assistant = AssistantAgent(
        name="assistant",
        model_client=model_client,  # pass the client directly
    )
    # v0.4's UserProxyAgent has no human_input_mode parameter (it takes an
    # input_func instead). A v0.2 proxy with human_input_mode="NEVER" never asked
    # a human, so the equivalent here is to omit it and pass the task directly.
    # v0.4: use a team and run(); max_turns keeps the team from running indefinitely
    team = RoundRobinGroupChat(participants=[assistant], max_turns=1)
    result = await team.run(task="Hello")
    print(result.messages[-1].content)


asyncio.run(v04_style())
Main migration changes:
| v0.2 | v0.4 |
|---|---|
| llm_config dict | OpenAIChatCompletionClient |
| initiate_chat() | agent.run() / team.run() / team.run_stream() |
| ConversableAgent | BaseChatAgent |
| GroupChat | RoundRobinGroupChat / SelectorGroupChat |
| Synchronous API | Asynchronous API (async/await) |
9.7 Hands-On Scenario: Building an Enterprise-Grade Agent System
import asyncio
import os
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient


class EnterpriseAgentSystem:
    """
    Enterprise agent system.
    What this sketch covers:
    1. Tenant-tagged requests
    2. A pool of named model clients
    3. Basic metrics
    4. Errors captured and reported instead of raised
    """

    def __init__(self, config: dict):
        self.config = config
        self.model_clients = {}
        self.teams = {}
        self.metrics = {
            "requests": 0,
            "errors": 0,
            "total_tokens": 0,
        }

    async def initialize(self):
        """Initialize the system."""
        # Create the pool of model clients
        for name, model_config in self.config["models"].items():
            self.model_clients[name] = OpenAIChatCompletionClient(**model_config)
        # Create the agent teams
        for team_name, team_config in self.config["teams"].items():
            self.teams[team_name] = await self._create_team(team_config)

    async def _create_team(self, config: dict):
        """Create an agent team."""
        agents = []
        for agent_config in config["agents"]:
            model_client = self.model_clients[agent_config["model"]]
            agent = AssistantAgent(
                name=agent_config["name"],
                model_client=model_client,
                system_message=agent_config["system_message"],
            )
            agents.append(agent)
        return SelectorGroupChat(
            participants=agents,
            model_client=self.model_clients["default"],  # picks the next speaker
            termination_condition=MaxMessageTermination(
                config.get("max_messages", 20)
            ),
        )

    async def process_request(
        self,
        tenant_id: str,
        team_name: str,
        task: str,
    ) -> dict:
        """
        Process a request.
        Args:
            tenant_id: tenant ID
            team_name: team name
            task: task description
        Returns:
            The result plus metadata
        """
        self.metrics["requests"] += 1
        try:
            team = self.teams[team_name]
            # Teams are stateful: clear the previous conversation so one tenant's
            # history never leaks into another's. A team also cannot serve two
            # requests at once, so serialize access or build a team per request.
            await team.reset()
            # Run the task
            result = await team.run(task=task)
            # Collect metrics
            for msg in result.messages:
                if hasattr(msg, "models_usage") and msg.models_usage:
                    self.metrics["total_tokens"] += (
                        msg.models_usage.prompt_tokens
                        + msg.models_usage.completion_tokens
                    )
            return {
                "success": True,
                "response": result.messages[-1].content,
                "stop_reason": result.stop_reason,
                "tenant_id": tenant_id,
            }
        except Exception as e:
            self.metrics["errors"] += 1
            return {
                "success": False,
                "error": str(e),
                "tenant_id": tenant_id,
            }

    def get_metrics(self) -> dict:
        """Return a copy of the system metrics."""
        return self.metrics.copy()


# Example configuration
enterprise_config = {
    "models": {
        "default": {
            "model": "gpt-4o",
            "api_key": os.getenv("OPENAI_API_KEY"),  # read from the environment
        },
        "fast": {
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
    "teams": {
        "customer_service": {
            "agents": [
                {
                    "name": "receptionist",
                    "model": "fast",
                    "system_message": "You are the customer service receptionist",
                },
                {
                    "name": "expert",
                    "model": "default",
                    "system_message": "You are a technical expert",
                },
            ],
            "max_messages": 15,
        }
    },
}


async def enterprise_demo():
    """Enterprise system demo."""
    system = EnterpriseAgentSystem(enterprise_config)
    await system.initialize()
    # Process a request
    result = await system.process_request(
        tenant_id="company-a",
        team_name="customer_service",
        task="Where is my order?",
    )
    print(f"Result: {result}")
    print(f"Metrics: {system.get_metrics()}")


asyncio.run(enterprise_demo())
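The class above keeps a single global metrics dict, so traffic from different tenants cannot be told apart even though every request carries a `tenant_id`. A minimal sketch of per-tenant bookkeeping follows. `TenantCounters` and `TenantMetrics` are hypothetical helpers introduced for illustration, not part of AutoGen, and the token numbers are made up.

```python
from collections import defaultdict
from dataclasses import asdict, dataclass


@dataclass
class TenantCounters:
    """Counters tracked separately for each tenant (hypothetical helper)."""
    requests: int = 0
    errors: int = 0
    total_tokens: int = 0


class TenantMetrics:
    """Hypothetical helper: one counter set per tenant_id."""

    def __init__(self) -> None:
        # A missing tenant gets a zeroed counter set on first access
        self._by_tenant = defaultdict(TenantCounters)

    def record(self, tenant_id: str, *, tokens: int = 0, error: bool = False) -> None:
        counters = self._by_tenant[tenant_id]
        counters.requests += 1
        counters.total_tokens += tokens
        if error:
            counters.errors += 1

    def snapshot(self) -> dict:
        # Plain dicts are easy to log or expose on a metrics endpoint
        return {tenant: asdict(c) for tenant, c in self._by_tenant.items()}


metrics = TenantMetrics()
metrics.record("company-a", tokens=120)
metrics.record("company-a", error=True)
metrics.record("company-b", tokens=40)
print(metrics.snapshot())
```

In `process_request`, one `record(...)` call in the success path and one in the `except` branch would be enough to replace the global counters.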
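Chapter 10: Production Best Practices
10.1 Architecture Design Principles
Recommended division of agent responsibilities:
# Good design: clear responsibilities
class GoodArchitecture:
    """
    Recommended agent architecture
    - ReceptionAgent: receives requests and routes them
    - ExpertAgent: handles domain-specific work
    - ValidatorAgent: validates results
    - ReporterAgent: generates reports
    """
    pass


# Poor design: tangled responsibilities
class BadArchitecture:
    """
    Agent architecture to avoid
    - SuperAgent: handles everything (too many responsibilities)
    """
    pass
Choosing between AgentChat and Core:
| Scenario | Recommended |
|---|---|
| Rapid prototyping | AgentChat |
| Standard conversational scenarios | AgentChat |
| Simple workflows | AgentChat |
| Complex event-driven systems | Core |
| Distributed deployment | Core |
| Custom message routing | Core |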
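10.2 Performance Optimization Strategies
import asyncio
import hashlib
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.cache import ChatCompletionCache
from autogen_ext.models.openai import OpenAIChatCompletionClient


class PerformanceOptimizedAgent:
    """Performance optimization examples."""

    @staticmethod
    async def optimize_concurrency():
        """
        Concurrency control.
        Strategy: cap the number of in-flight requests with a semaphore.
        """
        # The semaphore allows at most 5 concurrent runs
        semaphore = asyncio.Semaphore(5)
        model_client = OpenAIChatCompletionClient(model="gpt-4o")

        async def process_with_limit(index, task):
            async with semaphore:
                # Agents keep conversation state, so each task gets its own
                # instance; only the model client is shared
                agent = AssistantAgent(name=f"worker_{index}", model_client=model_client)
                return await agent.run(task=task)

        # Process many tasks concurrently
        tasks = [f"Task {i}" for i in range(20)]
        results = await asyncio.gather(
            *[process_with_limit(i, t) for i, t in enumerate(tasks)]
        )
        return results

    @staticmethod
    def optimize_tokens():
        """
        Token optimization strategies
        1. Keep system_message concise
        2. Keep tool names and descriptions short (they are sent with every request)
        3. Remove unnecessary context
        4. Use caching
        """
        # Poor system_message (verbose)
        bad_system = """
        You are an extremely professional, highly experienced and very knowledgeable assistant,
        with many years of work experience, skilled at all kinds of tasks...
        (lots of filler)
        """
        # Good system_message (concise)
        good_system = "You are a technical expert. Answer concisely and accurately."
        return good_system

    @staticmethod
    def optimize_caching():
        """
        Caching.
        v0.2's cache_seed is not a parameter of the v0.4 model clients;
        wrap the client in ChatCompletionCache instead.
        """
        model_client = OpenAIChatCompletionClient(model="gpt-4o")
        # Identical requests are answered from the cache. The default store is
        # in-memory; pass a persistent store as the second argument to keep
        # entries across restarts.
        cached_client = ChatCompletionCache(model_client)
        return cached_client


# Alternatively, use a custom cache
class ResponseCache:
    """A simple in-memory response cache."""

    def __init__(self):
        self.cache = {}

    def get_key(self, messages):
        # sha256 is stable across processes; the built-in hash() of a str is
        # salted per interpreter run
        return hashlib.sha256(str(messages).encode("utf-8")).hexdigest()

    async def get_or_compute(self, key, compute_fn):
        if key in self.cache:
            return self.cache[key]
        result = await compute_fn()
        self.cache[key] = result
        return result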
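The semaphore pattern above can be checked without calling a model. The sketch below replaces the agent run with `asyncio.sleep` and records how many jobs are in flight at once; `run_bounded` is a hypothetical helper written for this illustration.

```python
import asyncio


async def run_bounded(num_tasks: int, limit: int) -> int:
    """Run num_tasks dummy jobs with at most `limit` in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a model call
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(num_tasks)))
    return peak


peak = asyncio.run(run_bounded(num_tasks=20, limit=5))
print(f"peak concurrency: {peak}")
```

With 20 jobs and a limit of 5, the peak never exceeds the limit, which is the property that keeps a burst of requests from overrunning a provider's rate limit.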
10.3 Error Handling and Retry
import asyncio
from typing import Any, Callable


class RetryHandler:
    """
    Retry handler.
    Features:
    1. Automatically retries failed operations
    2. Exponential backoff
    3. Maximum retry limit
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exceptions: tuple = (Exception,),
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exceptions = exceptions

    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        **kwargs,
    ) -> Any:
        """
        Call an async function, retrying on the configured exceptions.
        """
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except self.exceptions as e:
                last_exception = e
                if attempt < self.max_retries:
                    # Backoff doubles on each attempt, capped at max_delay
                    delay = min(
                        self.base_delay * (2 ** attempt),
                        self.max_delay,
                    )
                    print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                    await asyncio.sleep(delay)
        raise last_exception


async def error_handling_demo():
    """Error handling example."""
    import openai
    from autogen_agentchat.agents import AssistantAgent
    from autogen_ext.models.openai import OpenAIChatCompletionClient

    retry_handler = RetryHandler(
        max_retries=3,
        base_delay=1.0,
        # The OpenAI SDK raises its own exception types rather than the built-in
        # ConnectionError / TimeoutError, so those are the ones to retry on
        exceptions=(
            openai.APIConnectionError,
            openai.APITimeoutError,
            openai.RateLimitError,
        ),
    )
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    agent = AssistantAgent(name="assistant", model_client=model_client)

    # Run the agent through the retry handler
    async def run_agent():
        return await agent.run(task="Hello")

    try:
        result = await retry_handler.execute_with_retry(run_agent)
        print(f"Result: {result.messages[-1].content}")
    except Exception as e:
        print(f"All retries failed: {e}")


asyncio.run(error_handling_demo())
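To make the backoff arithmetic in `RetryHandler` concrete, the sketch below computes the same capped exponential schedule as a pure function. It also shows "full jitter", a common refinement that is not in the handler above: each delay is drawn at random below the cap so that many clients do not retry in lockstep. `backoff_delay` and `jittered_delay` are hypothetical names.

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Capped exponential delay for a zero-based attempt number."""
    return min(base_delay * (2 ** attempt), max_delay)


def jittered_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Full jitter: pick uniformly between 0 and the capped exponential delay."""
    return random.uniform(0.0, backoff_delay(attempt, base_delay, max_delay))


schedule = [backoff_delay(attempt) for attempt in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap takes effect at the seventh attempt, where the uncapped value would be 64 seconds.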
10.4 Observability in Practice
import asyncio
import logging
import json
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field


@dataclass
class TraceSpan:
    """A trace span."""
    span_id: str
    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    attributes: dict = field(default_factory=dict)
    events: list = field(default_factory=list)


class ObservabilityManager:
    """
    Observability manager.
    Features:
    1. Structured logging
    2. Tracing with spans
    3. Metrics collection
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = self._setup_logger()
        self.current_trace: Optional[TraceSpan] = None
        self.metrics = {
            "requests_total": 0,
            "requests_success": 0,
            "requests_error": 0,
            "latency_sum": 0.0,
        }

    def _setup_logger(self) -> logging.Logger:
        """Configure JSON-shaped logging."""
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)
        # Avoid stacking duplicate handlers when the manager is created twice
        if not logger.handlers:
            handler = logging.StreamHandler()
            # Note: the message is interpolated as-is, so a message containing
            # quotes yields invalid JSON, and fields passed via extra= are not
            # part of the output.
            handler.setFormatter(logging.Formatter(
                json.dumps({
                    "timestamp": "%(asctime)s",
                    "level": "%(levelname)s",
                    "service": self.service_name,
                    "message": "%(message)s",
                })
            ))
            logger.addHandler(handler)
        return logger

    def start_span(self, operation: str, **attributes) -> TraceSpan:
        """Start a trace span."""
        span = TraceSpan(
            span_id=f"{operation}-{datetime.now().timestamp()}",
            operation=operation,
            start_time=datetime.now(),
            attributes=attributes,
        )
        self.current_trace = span
        # Keys in extra= must not collide with LogRecord attributes such as "message" or "name"
        self.logger.info(f"Operation started: {operation}", extra=attributes)
        return span

    def end_span(self, span: TraceSpan):
        """End a trace span."""
        span.end_time = datetime.now()
        duration = (span.end_time - span.start_time).total_seconds()
        self.metrics["latency_sum"] += duration
        self.logger.info(
            f"Operation finished: {span.operation}",
            extra={
                "duration_ms": duration * 1000,
                **span.attributes,
            },
        )

    def record_event(self, event_name: str, **attributes):
        """Record an event."""
        if self.current_trace:
            self.current_trace.events.append({
                "name": event_name,
                "timestamp": datetime.now().isoformat(),
                "attributes": attributes,
            })
        self.logger.info(f"Event: {event_name}", extra=attributes)

    def record_error(self, error: Exception, **attributes):
        """Record an error."""
        self.metrics["requests_error"] += 1
        self.logger.error(
            f"Error: {type(error).__name__}: {error}",
            extra={"error_type": type(error).__name__, **attributes},
        )


async def observability_demo():
    """Observability demo."""
    obs = ObservabilityManager("autogen-service")
    # Start tracing
    obs.metrics["requests_total"] += 1
    span = obs.start_span("process_request", user_id="user-123")
    try:
        # Simulated processing
        obs.record_event("validation_passed")
        await asyncio.sleep(0.1)
        obs.record_event("model_called")
        await asyncio.sleep(0.2)
        obs.record_event("response_generated")
        obs.metrics["requests_success"] += 1
    except Exception as e:
        obs.record_error(e)
    finally:
        obs.end_span(span)
    print(f"\nMetrics: {obs.metrics}")


asyncio.run(observability_demo())
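The logger above builds its format string with `json.dumps`, so the message is spliced into the template without escaping and anything passed through `extra=` never reaches the output. One way around both limits is a `logging.Formatter` subclass that serializes each record itself. The sketch below uses only the standard library; `JsonFormatter` is a hypothetical name and the field layout is an assumption.

```python
import io
import json
import logging

# Attribute names present on every LogRecord; anything else came from extra=
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    """Serialize each record as one JSON object per line."""

    def __init__(self, service_name: str) -> None:
        super().__init__()
        self.service_name = service_name

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "service": self.service_name,
            "message": record.getMessage(),
        }
        # Copy fields passed via extra= into the JSON object
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # json.dumps escapes quotes; default=str covers non-JSON types
        return json.dumps(payload, ensure_ascii=False, default=str)


stream = io.StringIO()  # capture output so the result can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter("autogen-service"))
logger = logging.getLogger("json_formatter_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False
logger.info('model said "hi"', extra={"user_id": "user-123"})
print(stream.getvalue().strip())
```

Swapping this formatter into `_setup_logger` would leave the rest of `ObservabilityManager` unchanged.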
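10.5 Security Best Practices
import os
import tempfile
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor


class SecurityBestPractices:
    """
    Security best practices.
    Covers:
    1. Secret management
    2. Safe code execution
    3. Input validation
    4. Access control
    """

    # ========== 1. Secret management ==========
    @staticmethod
    def manage_secrets():
        """
        Secret management best practices
        - Use environment variables
        - Use a secret management service
        - Never hard-code secrets
        """
        # Good: read from an environment variable
        api_key = os.getenv("OPENAI_API_KEY")
        # For local development: a .env file loaded with python-dotenv
        # (keep the .env file out of version control)
        from dotenv import load_dotenv
        load_dotenv()
        api_key = os.getenv("OPENAI_API_KEY")
        # Production: use a secret management service
        # Azure Key Vault, AWS Secrets Manager, etc.
        return api_key

    # ========== 2. Safe code execution ==========
    @staticmethod
    def secure_code_execution():
        """
        Configuration for safe code execution.
        """
        # Recommended for production: Docker isolation
        secure_executor = DockerCommandLineCodeExecutor(
            image="python:3.11-slim",
            # work_dir is a host directory that gets mounted into the container,
            # so use a dedicated, disposable directory rather than a shared path
            work_dir=tempfile.mkdtemp(),
            timeout=30,
        )
        # Start the executor before use (await secure_executor.start() or async with)
        return secure_executor

    # ========== 3. Input validation ==========
    @staticmethod
    def validate_input(user_input: str) -> tuple[bool, str]:
        """
        Validate input.
        Returns: (is_valid, cleaned input or error message)
        """
        # Length limit
        if len(user_input) > 10000:
            return False, "Input too long"
        # Keyword check: a crude substring match that also rejects harmless
        # questions mentioning these words
        sensitive_words = ["password", "secret", "token"]
        for word in sensitive_words:
            if word in user_input.lower():
                return False, f"Input contains a sensitive word: {word}"
        # Clean the input
        cleaned = user_input.strip()
        return True, cleaned

    # ========== 4. Access control ==========
    @staticmethod
    def check_permission(user_id: str, action: str) -> bool:
        """
        Check permission.
        Args:
            user_id: user ID
            action: action type
        Returns: whether the action is allowed
        """
        # Example permission table
        permissions = {
            "admin": ["read", "write", "delete", "execute"],
            "user": ["read", "write"],
            "guest": ["read"],
        }
        # Placeholder: look up the role for user_id in your database
        role = "user"
        return action in permissions.get(role, [])


# Security configuration example
def create_secure_agent():
    """Print and return the security checklist for an agent deployment."""
    # Security checklist
    checklist = {
        "Secret management": "environment variables",
        "Code execution": "Docker isolation",
        "Input validation": "length and content checks",
        "Access control": "role-based access control",
        "Log redaction": "no sensitive data in logs",
        "HTTPS": "encrypted transport",
    }
    print("=== Security checklist ===")
    for item, status in checklist.items():
        print(f"[✓] {item}: {status}")
    return checklist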
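The checklist names log redaction but the section stops short of showing it. A minimal sketch is below, assuming two illustrative patterns: keys that look like `sk-...` and `password=` / `secret=` / `token=` style assignments. The `redact` helper and both patterns are hypothetical and would need extending for the credentials a real system handles.

```python
import re

# Illustrative patterns only; they are assumptions, not an exhaustive list
_API_KEY = re.compile(r"sk-[A-Za-z0-9_-]{8,}")
_ASSIGNMENT = re.compile(r"(?i)(password|secret|token)(\s*[=:]\s*)\S+")


def redact(text: str) -> str:
    """Mask secrets in a string before it is written to a log."""
    text = _API_KEY.sub("[REDACTED]", text)
    # Keep the field name and separator, mask only the value
    return _ASSIGNMENT.sub(lambda m: f"{m.group(1)}{m.group(2)}[REDACTED]", text)


print(redact("login failed: password=hunter2 key sk-abcdefgh12345678"))
# prints: login failed: password=[REDACTED] key [REDACTED]
```

Calling `redact` on messages before they are logged keeps credentials out of log storage even when a prompt or an error message happens to contain one.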
10.6 Production Deployment
# docker-compose.yml example
version: '3.8'
services:
  autogen-api:
    build: .
    ports:
      - "8000"  # container port only: three replicas cannot all bind host port 8000; put a reverse proxy in front
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=INFO
    volumes:
      - ./workspace:/app/workspace
    depends_on:
      - redis
      - postgres
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '2'
          memory: 4G
    healthcheck:
      # requires curl to be installed in the image
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: autogen
      POSTGRES_USER: autogen
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
# Kubernetes deployment example (deployment.yaml)
"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: autogen-deployment
spec:
  replicas: 3
  selector:
    matchLabels:
      app: autogen
  template:
    metadata:
      labels:
        app: autogen
    spec:
      containers:
        - name: autogen
          image: autogen:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: autogen-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
"""
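# Production entry point (assumed here to be saved as main.py)
import os
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

app = FastAPI(title="AutoGen API")
# Shared model client, created at startup
model_client = None


class ChatRequest(BaseModel):
    """Request body for /chat; keeps the message out of the query string and access logs."""
    message: str


@app.on_event("startup")
async def startup():
    """Initialize on startup."""
    global model_client
    model_client = OpenAIChatCompletionClient(
        model="gpt-4o",
        api_key=os.getenv("OPENAI_API_KEY"),
    )


@app.get("/health")
async def health():
    """Liveness endpoint."""
    return {"status": "healthy"}


@app.get("/ready")
async def ready():
    """Readiness endpoint: only ready once the model client exists."""
    if model_client is None:
        raise HTTPException(status_code=503, detail="not ready")
    return {"status": "ready"}


@app.post("/chat")
async def chat(request: ChatRequest):
    """Chat endpoint."""
    # A fresh agent per request keeps one caller's history out of another's
    agent = AssistantAgent(
        name="production_assistant",
        model_client=model_client,
    )
    result = await agent.run(task=request.message)
    return {"response": result.messages[-1].content}


if __name__ == "__main__":
    uvicorn.run(
        "main:app",  # workers > 1 requires an import string, not the app object
        host="0.0.0.0",
        port=8000,
        workers=4,  # multiple worker processes
    )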
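The Compose and Kubernetes manifests above inject `OPENAI_API_KEY` and `LOG_LEVEL` as environment variables. Validating them once at startup turns a missing secret into an immediate, readable failure instead of an error on the first chat request. The sketch below is one way to do that with the standard library; `Settings` and `load_settings` are hypothetical names, and the `env` parameter exists only so the function can be exercised without touching the real environment.

```python
import os
from dataclasses import dataclass

_LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}


@dataclass(frozen=True)
class Settings:
    """Validated runtime configuration (hypothetical helper)."""
    openai_api_key: str
    log_level: str = "INFO"


def load_settings(env=None) -> Settings:
    """Read and validate settings from a mapping, defaulting to os.environ."""
    env = os.environ if env is None else env
    api_key = env.get("OPENAI_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    log_level = env.get("LOG_LEVEL", "INFO").upper()
    if log_level not in _LOG_LEVELS:
        raise RuntimeError(f"Unsupported LOG_LEVEL: {log_level}")
    return Settings(openai_api_key=api_key, log_level=log_level)


# The key below is a placeholder, not a real credential
settings = load_settings({"OPENAI_API_KEY": "sk-example", "LOG_LEVEL": "debug"})
print(settings.log_level)  # prints: DEBUG
```

Calling `load_settings()` at the top of the startup hook would make the readiness probe fail fast when the secret is absent.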
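Summary
This article covered the core concepts, API usage and production practices of AutoGen 0.4.
Key takeaways:
- Layered architecture: AgentChat (high level) vs Core (low level); choose according to the scenario
- Agent types: AssistantAgent, UserProxyAgent, CodeExecutorAgent and others
- Team collaboration: RoundRobinGroupChat, SelectorGroupChat
- Tool system: FunctionTool, CodeExecutor, MCP integration
- Termination control: combine multiple TerminationCondition types
- Messaging: understand message types and inner_messages
- Core layer: event-driven, publish/subscribe, distributed deployment
- Production practices: security, performance, observability, deployment
Suggested learning path:
AutoGen provides a powerful and flexible framework for building multi-agent systems. Hopefully this article helps you get up to speed and apply it in real projects.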