Browser-use
第1章:Browser-use 核心概念与架构
1.1 Browser-use 是什么
Browser-use 是一个开源的 Python 库,旨在通过集成大型语言模型(LLM)实现智能浏览器自动化。它允许开发者使用自然语言描述任务,让 AI 代理自主完成复杂的网页交互操作。
核心定位:
- 不是简单的浏览器控制工具,而是AI 驱动的智能代理框架
- 底层基于 Playwright,但屏蔽了底层复杂性
- 通过 LLM 实现"理解-决策-执行"的闭环
GitHub 数据(截至 2025年):
- ⭐ Stars: 60K+
- 🍴 Forks: 6K+
- 主要贡献者:gregpr07 及社区团队
1.2 核心架构解析
Browser-use 采用三层架构模型:
各层职责详解:
| 层级 | 核心类 | 职责 |
|---|---|---|
| Agent 层 | Agent |
任务协调、决策循环、LLM 交互 |
| Controller 层 | Controller |
动作注册、参数验证、执行分发 |
| Browser 层 | Browser, BrowserContext |
浏览器生命周期、页面状态管理 |
1.3 与传统自动化工具的对比
详细对比表:
| 特性 | Selenium | Playwright | Browser-use |
|---|---|---|---|
| 编程方式 | 命令式代码 | 命令式代码 | 自然语言 + 代码 |
| 学习曲线 | 陡峭 | 中等 | 平缓 |
| 自适应能力 | 无 | 无 | 强(LLM驱动) |
| 维护成本 | 高(页面变化需重写) | 中 | 低(自然语言描述稳定) |
| 复杂任务 | 需大量代码 | 需大量代码 | 简洁描述即可 |
| 执行可控性 | 完全可控 | 完全可控 | 需设计约束 |
| 适用场景 | 稳定页面测试 | 现代 Web 应用 | 动态/复杂交互场景 |
1.4 适用场景与核心优势
最佳适用场景:
- 动态内容处理:页面结构频繁变化,传统选择器易失效
- 复杂多步骤任务:需要推理和决策的流程(如比价、筛选)
- 自然语言交互测试:验证系统对用户意图的理解
- 快速原型验证:无需编写大量代码即可验证自动化可行性
核心优势:
第2章:环境搭建与基础配置
2.1 系统要求
硬性要求:
- Python: 3.11 或更高版本(必需,因使用新类型注解特性)
- 操作系统: Windows 10+/macOS 10.15+/Linux (Ubuntu 20.04+)
- 内存: 建议 8GB+(LLM 推理需要)
- 网络: 可访问 LLM API 服务
验证 Python 版本:
# 检查 Python 版本
python --version
# 或
python3 --version
# 输出应为 3.11.x 或更高
2.2 安装与依赖管理
基础安装:
# 创建虚拟环境(推荐)
python -m venv browser-use-env
# 激活虚拟环境
# Windows:
browser-use-env\Scripts\activate
# macOS/Linux:
source browser-use-env/bin/activate
# 安装 browser-use
pip install browser-use
# 安装 Playwright 浏览器(必需)
playwright install
# 安装特定浏览器(可选,节省空间)
playwright install chromium
# playwright install firefox
# playwright install webkit
完整依赖安装(生产环境):
# 创建 requirements.txt
cat > requirements.txt << EOF
browser-use>=0.1.40
playwright>=1.40.0
openai>=1.0.0
python-dotenv>=1.0.0
pydantic>=2.0.0
pytest>=7.0.0
pytest-asyncio>=0.21.0
aiofiles>=23.0.0
EOF
# 安装依赖
pip install -r requirements.txt
验证安装:
# test_installation.py
"""
验证 browser-use 安装是否成功
"""
import asyncio
from browser_use import Agent, Browser, BrowserConfig
async def test_basic():
"""基础功能测试"""
try:
# 创建浏览器实例
browser = Browser(config=BrowserConfig(headless=True))
print("✅ Browser 实例创建成功")
# 验证 Playwright 浏览器已安装
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser_check = p.chromium.launch()
browser_check.close()
print("✅ Playwright 浏览器检查通过")
await browser.close()
print("✅ 所有检查通过,环境配置正确!")
return True
except Exception as e:
print(f"❌ 检查失败: {e}")
return False
if __name__ == "__main__":
asyncio.run(test_basic())
2.3 LLM 接入配置
Browser-use 支持多种 LLM 提供商,以下是详细配置方法:
2.3.1 OpenAI 配置
# config/openai_config.py
"""
OpenAI LLM 配置示例
支持 GPT-4、GPT-4o、GPT-3.5-turbo 等模型
"""
import os
from langchain_openai import ChatOpenAI
from browser_use import Agent
# 方式1:环境变量配置(推荐,安全)
# 在 .env 文件中设置:
# OPENAI_API_KEY=sk-your-api-key-here
# 方式2:代码中配置(仅测试使用)
os.environ["OPENAI_API_KEY"] = "sk-your-api-key-here"
# 创建 LLM 实例
llm = ChatOpenAI(
model="gpt-4o", # 模型名称
temperature=0.1, # 温度参数,控制创造性(0-1)
max_tokens=4096, # 最大输出token数
timeout=60, # API 调用超时时间(秒)
max_retries=3, # 失败重试次数
)
# 使用示例
async def create_agent_with_openai():
"""使用 OpenAI LLM 创建 Agent"""
agent = Agent(
task="访问 example.com 并获取页面标题",
llm=llm,
)
return agent
2.3.2 Azure OpenAI 配置
# config/azure_config.py
"""
Azure OpenAI 配置示例
适用于企业级部署场景
"""
import os
from langchain_openai import AzureChatOpenAI
# 设置 Azure 环境变量
os.environ["AZURE_OPENAI_API_KEY"] = "your-azure-api-key"
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://your-resource.openai.azure.com/"
os.environ["AZURE_OPENAI_API_VERSION"] = "2024-02-01"
# 创建 Azure LLM 实例
azure_llm = AzureChatOpenAI(
azure_deployment="gpt-4o", # Azure 部署名称
model="gpt-4o", # 模型名称
temperature=0.1,
max_tokens=4096,
timeout=60,
)
# 使用 Azure LLM 创建 Agent
async def create_azure_agent():
"""使用 Azure OpenAI 创建 Agent"""
from browser_use import Agent
agent = Agent(
task="在 Azure 环境中执行自动化任务",
llm=azure_llm,
)
return agent
2.3.3 Claude (Anthropic) 配置
# config/claude_config.py
"""
Anthropic Claude 配置示例
Claude 在长文本理解和复杂推理方面表现优异
"""
import os
from langchain_anthropic import ChatAnthropic
# 设置 Anthropic API Key
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-your-api-key"
# 创建 Claude LLM 实例
claude_llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022", # Claude 3.5 Sonnet
temperature=0.1,
max_tokens=4096,
timeout=60,
max_retries=3,
)
# Claude 特别适合复杂的多步骤任务
async def create_claude_agent():
"""使用 Claude 创建 Agent"""
from browser_use import Agent
agent = Agent(
task="""
完成以下复杂任务:
1. 访问一个电商网站
2. 搜索"无线耳机"
3. 按价格从低到高排序
4. 选择评分最高的商品
5. 获取商品详细信息
""",
llm=claude_llm,
)
return agent
2.3.4 本地模型配置(Ollama)
# config/local_llm_config.py
"""
本地 LLM 配置示例(使用 Ollama)
适用于数据隐私要求高的场景
"""
from langchain_ollama import ChatOllama
# 创建本地 LLM 实例
local_llm = ChatOllama(
model="llama3.2", # 本地模型名称
temperature=0.1,
base_url="http://localhost:11434", # Ollama 服务地址
)
# 使用本地模型创建 Agent
async def create_local_agent():
"""使用本地 LLM 创建 Agent"""
from browser_use import Agent
agent = Agent(
task="执行本地自动化测试任务",
llm=local_llm,
)
return agent
2.3.5 统一配置管理(推荐)
# config/llm_factory.py
"""
LLM 工厂类 - 统一管理不同提供商的配置
"""
import os
from typing import Optional
from langchain_openai import ChatOpenAI, AzureChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama
class LLMFactory:
"""
LLM 工厂类
根据配置创建对应的 LLM 实例
"""
@staticmethod
def create(
provider: str = "openai",
model: Optional[str] = None,
temperature: float = 0.1,
**kwargs
):
"""
创建 LLM 实例
Args:
provider: 提供商名称 (openai/azure/anthropic/ollama)
model: 模型名称
temperature: 温度参数
**kwargs: 其他参数
Returns:
BaseChatModel: LLM 实例
"""
providers = {
"openai": LLMFactory._create_openai,
"azure": LLMFactory._create_azure,
"anthropic": LLMFactory._create_anthropic,
"ollama": LLMFactory._create_ollama,
}
if provider not in providers:
raise ValueError(f"不支持的提供商: {provider}")
return providers[provider](model, temperature, **kwargs)
@staticmethod
def _create_openai(model, temperature, **kwargs):
"""创建 OpenAI LLM"""
return ChatOpenAI(
model=model or "gpt-4o",
temperature=temperature,
max_retries=kwargs.get("max_retries", 3),
)
@staticmethod
def _create_azure(model, temperature, **kwargs):
"""创建 Azure OpenAI LLM"""
return AzureChatOpenAI(
azure_deployment=model or "gpt-4o",
temperature=temperature,
)
@staticmethod
def _create_anthropic(model, temperature, **kwargs):
"""创建 Anthropic LLM"""
return ChatAnthropic(
model=model or "claude-3-5-sonnet-20241022",
temperature=temperature,
)
@staticmethod
def _create_ollama(model, temperature, **kwargs):
"""创建 Ollama 本地 LLM"""
return ChatOllama(
model=model or "llama3.2",
temperature=temperature,
base_url=kwargs.get("base_url", "http://localhost:11434"),
)
# 使用示例
# llm = LLMFactory.create(provider="openai", model="gpt-4o")
# llm = LLMFactory.create(provider="anthropic", temperature=0.2)
第3章:核心类与参数详解
3.1 Agent 类深度解析
Agent 是 Browser-use 的核心类,负责协调整个自动化流程。
class Agent:
"""
Agent 类 - 智能代理的核心实现
职责:
1. 管理任务执行生命周期
2. 与 LLM 交互进行决策
3. 调用 Controller 执行动作
4. 处理执行结果和错误
"""
def __init__(
self,
# ------------------- 必需参数 -------------------
task: str, # 任务描述(自然语言)
llm: BaseChatModel, # LLM 实例
# ------------------- 可选参数 -------------------
browser: Browser | None = None, # 浏览器实例
browser_context: BrowserContext | None = None, # 浏览器上下文
controller: Controller | None = None, # 控制器实例
# ------------------- 执行控制参数 -------------------
max_steps: int = 100, # 最大执行步数
max_actions_per_step: int = 10, # 每步最大动作数
max_failures: int = 3, # 最大失败次数
retry_delay: int = 10, # 重试延迟(秒)
# ------------------- 系统配置 -------------------
system_prompt: SystemMessage | None = None, # 自定义系统提示词
use_vision: bool = True, # 是否启用视觉模式
# ------------------- 高级参数 -------------------
generate_gif: bool | str | None = None, # 生成执行过程GIF
available_file_paths: list[str] | None = None, # 可用文件路径
include_attributes: list[str] | None = None, # 包含的DOM属性
):
参数详解表:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
task |
str | 必需 | 自然语言描述的任务目标 |
llm |
BaseChatModel | 必需 | LangChain 格式的 LLM 实例 |
browser |
Browser | None | 浏览器实例,None则自动创建 |
browser_context |
BrowserContext | None | 浏览器上下文,用于保持会话 |
controller |
Controller | None | 自定义控制器实例 |
max_steps |
int | 100 | 最大执行步数,防止无限循环 |
max_actions_per_step |
int | 10 | 每步最大动作数,防止LLM生成过多动作 |
max_failures |
int | 3 | 连续失败次数上限,超过则终止 |
retry_delay |
int | 10 | 失败后重试前的等待时间 |
system_prompt |
SystemMessage | None | 自定义系统提示词,控制Agent行为 |
use_vision |
bool | True | 是否发送页面截图给LLM |
generate_gif |
bool/str | None | 是否生成执行过程GIF记录 |
3.2 Browser 与 BrowserContext 类
class Browser:
"""
Browser 类 - 浏览器实例管理
职责:
1. 管理 Playwright 浏览器实例
2. 提供浏览器级别的配置
3. 创建和管理 BrowserContext
"""
def __init__(
self,
config: BrowserConfig = BrowserConfig(), # 浏览器配置
):
class BrowserContext:
"""
BrowserContext 类 - 浏览器上下文管理
职责:
1. 管理独立的浏览会话(Cookie、缓存、存储)
2. 提供页面级别的操作
3. 维护当前页面状态
"""
def __init__(
self,
browser: Browser, # 父浏览器实例
config: BrowserContextConfig | None = None, # 上下文配置
):
BrowserConfig 配置详解:
@dataclass
class BrowserConfig:
"""
浏览器配置类
控制浏览器实例的行为和特性
"""
# ------------------- 显示模式 -------------------
headless: bool = False # 是否无头模式(不显示界面)
# True: 后台运行,适合生产环境
# False: 显示界面,适合调试
# ------------------- 窗口配置 -------------------
extra_chromium_args: list[str] = field(default_factory=list)
# 额外的 Chromium 启动参数
# 例如: ["--disable-gpu", "--no-sandbox"]
# ------------------- 浏览器类型 -------------------
browser_type: str = "chromium" # 浏览器类型
# 可选: "chromium", "firefox", "webkit"
# ------------------- 连接配置 -------------------
_force_keep_browser_alive: bool = False
# 是否保持浏览器存活
# 用于调试,防止自动关闭
BrowserContextConfig 配置详解:
@dataclass
class BrowserContextConfig:
"""
浏览器上下文配置类
控制单个浏览会话的行为
"""
# ------------------- 视口配置 -------------------
viewport: dict = field(default_factory=lambda: {
"width": 1280,
"height": 720
}) # 浏览器窗口大小
# ------------------- 地理位置 -------------------
geolocation: dict | None = None # 地理位置信息
# 例如: {"latitude": 39.9042, "longitude": 116.4074}
# ------------------- 区域设置 -------------------
locale: str = "zh-CN" # 区域设置,影响语言和格式
timezone_id: str = "Asia/Shanghai" # 时区设置
# ------------------- 设备模拟 -------------------
user_agent: str = "" # 自定义 User-Agent
device_scale_factor: float = 1.0 # 设备缩放比例
is_mobile: bool = False # 是否模拟移动设备
has_touch: bool = False # 是否支持触摸事件
# ------------------- 权限配置 -------------------
permissions: list[str] = field(default_factory=list)
# 权限列表
# 例如: ["geolocation", "notifications"]
# ------------------- 存储配置 -------------------
storage_state: str | None = None # 存储状态文件路径
# 用于恢复登录状态等
3.3 Controller 类与自定义动作
class Controller:
"""
Controller 类 - 动作注册与执行中心
职责:
1. 注册和管理可执行动作
2. 验证动作参数
3. 分发动作执行
4. 处理执行结果
"""
def __init__(
self,
exclude_actions: list[str] | None = None, # 排除的动作列表
output_model: type[BaseModel] | None = None, # 输出数据模型
):
核心方法:
| 方法 | 说明 | 使用场景 |
|---|---|---|
action() |
装饰器,注册自定义动作 | 扩展 Agent 能力 |
register_action() |
程序化注册动作 | 动态添加动作 |
execute_action() |
执行指定动作 | 手动调用动作 |
3.4 完整配置示例
# examples/complete_configuration.py
"""
Browser-use 完整配置示例
展示所有核心类的配置方式
"""
from browser_use import Agent, Browser, BrowserConfig, BrowserContextConfig, Controller
from langchain_openai import ChatOpenAI
import asyncio
# ============================================================
# 1. 创建 LLM 实例
# ============================================================
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.1,
max_tokens=4096,
max_retries=3,
)
# ============================================================
# 2. 配置浏览器
# ============================================================
browser_config = BrowserConfig(
headless=True, # 无头模式
extra_chromium_args=[ # 额外启动参数
"--disable-gpu",
"--no-sandbox",
"--disable-dev-shm-usage",
],
browser_type="chromium", # 使用 Chromium
)
browser = Browser(config=browser_config)
# ============================================================
# 3. 配置浏览器上下文
# ============================================================
context_config = BrowserContextConfig(
viewport={"width": 1920, "height": 1080}, # 全高清分辨率
locale="zh-CN", # 中文环境
timezone_id="Asia/Shanghai", # 上海时区
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
)
# ============================================================
# 4. 配置控制器
# ============================================================
controller = Controller(
exclude_actions=["open_tab"], # 排除特定动作
)
# ============================================================
# 5. 创建完整配置的 Agent
# ============================================================
async def create_fully_configured_agent():
"""创建完整配置的 Agent"""
agent = Agent(
# ------------------- 必需参数 -------------------
task="完成指定的浏览器自动化任务", # str: 任务描述
llm=llm, # BaseChatModel: LLM 实例
# ------------------- 浏览器配置 -------------------
browser=browser, # Browser: 自定义浏览器实例
# ------------------- 控制器配置 -------------------
controller=controller, # Controller: 自定义控制器
# ------------------- 执行控制参数 -------------------
max_steps=100, # int: 最大执行步数,默认 100
max_actions_per_step=10, # int: 每步最大动作数,默认 10
max_failures=3, # int: 最大失败次数,默认 3
retry_delay=10, # int: 重试延迟(秒),默认 10
# ------------------- 系统配置 -------------------
use_vision=True, # bool: 启用视觉模式,默认 True
# ------------------- 高级参数 -------------------
generate_gif=False, # bool/str: 生成GIF,默认 False
)
return agent
# ============================================================
# 6. 执行示例
# ============================================================
async def main():
"""主函数"""
agent = await create_fully_configured_agent()
# 修改任务
agent.task = """
访问 https://example.com 并完成以下操作:
1. 获取页面标题
2. 提取所有链接
3. 返回结果
"""
# 执行任务
result = await agent.run()
# 处理结果
if result.is_successful():
print(f"✅ 任务成功: {result.final_result()}")
else:
print(f"❌ 任务失败: {result.errors()}")
# 清理资源
await browser.close()
if __name__ == "__main__":
asyncio.run(main())
第4章:内置动作方法与使用
4.1 核心动作方法详解
Browser-use 提供了丰富的内置动作,以下是完整列表:
动作详细说明表:
| 动作 | 功能 | 参数 | 返回值 |
|---|---|---|---|
go_to_url |
访问指定URL | url: str |
页面加载结果 |
click_element |
点击元素 | index: int |
点击结果 |
input_text |
输入文本 | index: int, text: str |
输入结果 |
scroll_down |
向下滚动 | scroll_amount: int |
滚动结果 |
scroll_up |
向上滚动 | scroll_amount: int |
滚动结果 |
select_option |
选择下拉选项 | index: int, text: str |
选择结果 |
check_checkbox |
勾选/取消复选框 | index: int |
操作结果 |
switch_to_tab |
切换标签页 | page_id: int |
切换结果 |
open_tab |
打开新标签页 | url: str |
新页面 |
close_tab |
关闭标签页 | - | 关闭结果 |
extract_content |
提取页面内容 | - | 提取的文本 |
wait |
等待指定时间 | seconds: int |
- |
done |
标记任务完成 | text: str |
最终结果 |
4.2 动作使用示例
# examples/action_usage.py
"""
内置动作使用示例
展示各种内置动作的实际使用
"""
from browser_use import Agent, Controller, ActionResult
from langchain_openai import ChatOpenAI
import asyncio
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 基础导航动作
# ============================================================
async def navigation_actions():
"""导航动作示例"""
task = """
导航操作演示:
1. 访问 https://example.com
2. 等待2秒
3. 点击页面上的第一个链接
4. 等待新页面加载
5. 返回上一页
6. 刷新页面
7. 完成操作
记录每个步骤的结果。
"""
agent = Agent(task=task, llm=llm)
result = await agent.run()
return result
# ============================================================
# 2. 表单交互动作
# ============================================================
async def form_interaction():
"""表单交互示例"""
task = """
表单填写演示:
1. 访问 https://forms.example.com
2. 在姓名字段输入"张三"
3. 在邮箱字段输入"zhangsan@example.com"
4. 从下拉菜单选择"北京"
5. 勾选"同意服务条款"复选框
6. 点击提交按钮
7. 验证提交成功
"""
agent = Agent(task=task, llm=llm, max_steps=50)
result = await agent.run()
return result
# ============================================================
# 3. 内容提取动作
# ============================================================
async def content_extraction():
"""内容提取示例"""
task = """
内容提取演示:
1. 访问 https://news.example.com
2. 提取页面标题
3. 提取所有文章标题和链接
4. 滚动页面加载更多内容
5. 继续提取新加载的文章
6. 返回JSON格式的结果:
{
"page_title": "...",
"articles": [
{"title": "...", "link": "..."}
]
}
"""
agent = Agent(task=task, llm=llm, max_steps=80)
result = await agent.run()
return result
# ============================================================
# 4. 多标签页管理
# ============================================================
async def tab_management():
"""标签页管理示例"""
task = """
多标签页操作演示:
1. 访问 https://site1.com
2. 在新标签页打开 https://site2.com
3. 在新标签页打开 https://site3.com
4. 切换到第2个标签页,提取标题
5. 切换到第3个标签页,提取标题
6. 关闭第3个标签页
7. 返回第1个标签页
8. 汇总所有访问过的页面标题
"""
agent = Agent(task=task, llm=llm, max_steps=60)
result = await agent.run()
return result
# ============================================================
# 5. 复杂交互流程
# ============================================================
async def complex_interaction():
"""复杂交互示例"""
task = """
复杂交互演示(电商购物流程):
1. 访问 https://shop.example.com
2. 在搜索框输入"无线耳机"
3. 点击搜索按钮
4. 等待搜索结果加载
5. 点击第一个商品
6. 选择颜色为"黑色"
7. 点击"加入购物车"
8. 验证购物车数量增加
9. 进入购物车页面
10. 点击"去结算"
11. 确认订单信息
12. 返回操作结果
"""
agent = Agent(task=task, llm=llm, max_steps=100)
result = await agent.run()
return result
# 运行示例
if __name__ == "__main__":
# 选择要运行的示例
asyncio.run(navigation_actions())
4.3 动作组合策略
# examples/action_composition.py
"""
动作组合策略
展示如何组合多个动作完成复杂任务
"""
from browser_use import Agent, Controller
from langchain_openai import ChatOpenAI
from typing import List, Dict
import asyncio
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 顺序执行模式
# ============================================================
async def sequential_execution():
"""
顺序执行模式
按固定顺序执行一系列操作
"""
steps = [
"访问登录页面",
"输入用户名",
"输入密码",
"点击登录按钮",
"验证登录成功",
]
task = f"""
按顺序执行以下操作:
{chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}
每个步骤完成后验证结果,如果失败则停止并报告。
"""
agent = Agent(task=task, llm=llm)
return await agent.run()
# ============================================================
# 2. 条件执行模式
# ============================================================
async def conditional_execution():
"""
条件执行模式
根据页面状态决定执行路径
"""
task = """
条件执行演示:
1. 访问 https://conditional.example.com
2. 检查页面状态:
- 如果显示登录表单:执行登录流程
- 如果显示欢迎页面:说明已登录,跳过登录
- 如果显示错误页面:报告错误并停止
3. 根据检查结果执行相应操作
4. 返回执行路径和结果
"""
agent = Agent(task=task, llm=llm, max_steps=50)
return await agent.run()
# ============================================================
# 3. 循环执行模式
# ============================================================
async def loop_execution():
"""
循环执行模式
重复执行直到满足条件
"""
task = """
循环执行演示(分页数据采集):
1. 访问 https://pagination.example.com
2. 提取当前页的所有商品信息
3. 检查是否有"下一页"按钮:
- 如果有:点击下一页,返回步骤2继续
- 如果没有:结束采集
4. 汇总所有采集的数据
5. 返回JSON格式的完整结果
注意:最多采集10页,防止无限循环。
"""
agent = Agent(task=task, llm=llm, max_steps=150)
return await agent.run()
# ============================================================
# 4. 错误恢复模式
# ============================================================
async def error_recovery():
"""
错误恢复模式
遇到错误时尝试恢复
"""
task = """
错误恢复演示:
1. 访问 https://unstable.example.com
2. 尝试点击"加载数据"按钮
3. 如果加载失败(超时或无响应):
- 刷新页面
- 等待3秒
- 重试点击
- 最多重试3次
4. 如果仍然失败,报告错误
5. 如果成功,继续后续操作
"""
agent = Agent(task=task, llm=llm, max_failures=5)
return await agent.run()
第5章:LLM 集成与提示工程
5.1 支持的 LLM 提供商与配置方式
Browser-use 通过 LangChain 集成多种 LLM 提供商,以下是完整支持列表:
各提供商特点对比:
| 提供商 | 推荐模型 | 优势 | 适用场景 |
|---|---|---|---|
| OpenAI | GPT-4o | 综合能力强,速度快 | 通用任务 |
| Azure OpenAI | GPT-4o | 企业合规,SLA保障 | 企业部署 |
| Anthropic | Claude 3.5 Sonnet | 推理能力强,上下文长 | 复杂任务 |
| Gemini Pro | 多模态能力强 | 视觉任务 | |
| Ollama | Llama 3.2 | 本地运行,隐私保护 | 敏感数据场景 |
5.2 系统提示词(System Prompt)定制
系统提示词决定了 Agent 的行为模式,可以通过自定义 SystemPrompt 类实现。
# llm/system_prompt.py
"""
系统提示词定制
通过自定义 SystemPrompt 控制 Agent 行为
"""
from browser_use import Agent
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
import asyncio
# ============================================================
# 1. 基础系统提示词
# ============================================================
BASIC_SYSTEM_PROMPT = """你是一个浏览器自动化助手。
你的任务是帮助用户完成网页浏览和交互任务。
规则:
1. 仔细分析当前页面状态
2. 选择最合适的操作
3. 如果操作失败,尝试替代方案
4. 保持简洁,避免不必要的步骤
"""
# ============================================================
# 2. 测试专用系统提示词
# ============================================================
TESTING_SYSTEM_PROMPT = """你是一个专业的自动化测试工程师。
你的职责:
1. 严格按照测试用例执行操作
2. 验证每个步骤的预期结果
3. 详细记录测试过程和结果
4. 发现缺陷时立即停止并报告
操作规范:
- 每次操作后验证页面状态
- 使用断言验证关键元素
- 截图保存关键步骤
- 详细记录错误信息
"""
# ============================================================
# 3. 数据采集专用系统提示词
# ============================================================
SCRAPING_SYSTEM_PROMPT = """你是一个数据采集专家。
你的任务是从网页中提取结构化数据。
规则:
1. 识别数据所在的页面区域
2. 提取完整、准确的数据
3. 处理分页和动态加载
4. 返回格式化的 JSON 数据
注意事项:
- 尊重网站的 robots.txt
- 控制请求频率,避免被封禁
- 处理反爬虫机制
"""
# ============================================================
# 4. 使用自定义系统提示词
# ============================================================
async def agent_with_custom_prompt():
"""
使用自定义系统提示词创建 Agent
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# 创建自定义系统消息
system_message = SystemMessage(content=TESTING_SYSTEM_PROMPT)
agent = Agent(
task="执行登录测试",
llm=llm,
system_message=system_message, # 使用自定义系统提示词
)
result = await agent.run()
return result
# ============================================================
# 5. 动态系统提示词
# ============================================================
def create_contextual_prompt(task_type: str, domain: str) -> str:
"""
根据任务类型和领域创建上下文相关的系统提示词
Args:
task_type: 任务类型 (testing/scraping/form_filling/navigation)
domain: 目标网站领域 (ecommerce/finance/social/etc)
Returns:
str: 定制的系统提示词
"""
base_prompt = f"""你是一个专业的浏览器自动化助手,专注于 {domain} 领域的 {task_type} 任务。
领域知识:
- 熟悉 {domain} 网站的常见布局和交互模式
- 了解该领域的专业术语和业务流程
- 知晓常见的反爬虫和保护机制
任务规范:
"""
if task_type == "testing":
base_prompt += """
- 严格验证每个操作的结果
- 详细记录测试步骤和结果
- 发现异常立即停止并报告
"""
elif task_type == "scraping":
base_prompt += """
- 提取完整、准确的数据
- 处理分页和动态加载
- 控制请求频率
"""
elif task_type == "form_filling":
base_prompt += """
- 准确填写所有必填字段
- 验证输入格式和约束
- 处理表单验证错误
"""
return base_prompt
# 使用动态提示词
async def agent_with_dynamic_prompt():
"""使用动态生成的系统提示词"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# 根据任务动态生成提示词
prompt = create_contextual_prompt(
task_type="testing",
domain="ecommerce"
)
agent = Agent(
task="测试电商网站购物车功能",
llm=llm,
system_message=SystemMessage(content=prompt),
)
return await agent.run()
5.3 任务描述的最佳实践
任务描述(task)是 Agent 理解需求的唯一途径,编写清晰、准确的任务描述至关重要。
# llm/task_best_practices.py
"""
任务描述最佳实践
如何编写高质量的任务描述
"""
from browser_use import Agent
from langchain_openai import ChatOpenAI
import asyncio
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 任务描述的结构化模板
# ============================================================
# ❌ 不好的任务描述
bad_task = "去那个网站找点东西"
# ✅ 好的任务描述
good_task = """
任务目标:在 Amazon 上搜索并比较无线耳机
执行步骤:
1. 访问 https://www.amazon.com
2. 在搜索框输入 "wireless headphones"
3. 点击搜索按钮
4. 筛选条件:价格范围 $50-$100,评分 4星以上
5. 选择前 3 个商品,提取以下信息:
- 商品名称
- 价格
- 评分
- 评论数量
6. 按性价比排序,返回最佳推荐
输出格式:
返回 JSON 格式的比较结果
"""
# ============================================================
# 2. 任务描述的关键要素
# ============================================================
"""
一个高质量的任务描述应包含:
1. 目标(Goal)
- 明确要完成的任务
- 说明成功的标准
2. 步骤(Steps)
- 按顺序列出关键步骤
- 每个步骤具体明确
3. 约束(Constraints)
- 时间限制
- 操作边界
- 禁止事项
4. 输出(Output)
- 期望的输出格式
- 需要包含的字段
- 数据类型要求
"""
# ============================================================
# 3. 不同类型任务的最佳实践
# ============================================================
# 3.1 表单填写任务
form_task = """
任务:完成用户注册表单
目标:在 example.com 上创建新账户
步骤:
1. 访问 https://example.com/register
2. 填写以下字段:
- 用户名:testuser2024
- 邮箱:testuser2024@example.com
- 密码:SecurePass123!
- 确认密码:SecurePass123!
3. 勾选"我同意服务条款"复选框
4. 点击"注册"按钮
验证:
- 确认页面跳转到欢迎页面
- 检查是否显示"注册成功"消息
约束:
- 如果用户名已存在,尝试 testuser2024_1, testuser2024_2 等变体
- 密码必须包含大小写字母、数字和特殊字符
"""
# 3.2 数据采集任务
scraping_task = """
任务:采集新闻网站文章列表
目标:从 news.example.com 采集最新 50 篇文章
步骤:
1. 访问 https://news.example.com
2. 滚动页面加载更多文章,直到采集到 50 篇
3. 对每篇文章提取:
- 标题
- 发布时间
- 摘要
- 链接
- 分类标签
输出格式:
{
"articles": [
{
"title": "文章标题",
"publish_time": "2024-01-15 10:30",
"summary": "文章摘要...",
"url": "https://...",
"tags": ["科技", "AI"]
}
],
"total_count": 50
}
约束:
- 只采集最近 7 天内发布的文章
- 跳过付费/会员专属内容
- 控制请求频率,每滚动一次等待 2 秒
"""
# 3.3 测试验证任务
testing_task = """
任务:验证电商网站结账流程
目标:测试从购物车到订单完成的完整流程
前置条件:
- 已登录测试账户
- 购物车中已有测试商品
测试步骤:
1. 访问购物车页面
2. 点击"去结算"按钮
3. 选择收货地址(使用默认地址)
4. 选择支付方式(货到付款)
5. 确认订单信息
6. 点击"提交订单"
预期结果:
- 页面跳转到订单成功页面
- 显示订单号
- 显示预计送达时间
- 收到订单确认邮件(检查邮件通知)
验证点:
- 订单金额计算正确
- 商品信息准确无误
- 收货地址正确
- 订单状态为"待发货"
"""
# ============================================================
# 4. 任务描述优化技巧
# ============================================================
def optimize_task_description(original_task: str) -> str:
"""
优化任务描述的辅助函数
Args:
original_task: 原始任务描述
Returns:
str: 优化后的任务描述
"""
optimized = f"""
【任务目标】
{original_task}
【执行策略】
1. 优先使用明确的选择器定位元素
2. 操作失败后尝试替代方案
3. 每个关键步骤后验证结果
4. 遇到问题及时报告,不盲目重试
【输出要求】
- 返回执行结果摘要
- 如有错误,说明错误原因
- 提供关键步骤的截图(如可能)
【注意事项】
- 遵守网站使用条款
- 控制操作频率
- 保护敏感信息
"""
return optimized
# 使用优化后的任务
async def run_optimized_task():
"""运行优化后的任务"""
original = "登录系统并查看账户余额"
optimized = optimize_task_description(original)
agent = Agent(
task=optimized,
llm=llm,
max_steps=30,
)
return await agent.run()
5.4 多轮对话与上下文管理
Browser-use 支持多轮对话模式,可以在连续的任务中保持上下文。
# llm/multi_turn_conversation.py
"""
多轮对话与上下文管理
实现连续任务执行和状态保持
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 会话状态保持
# ============================================================
async def session_management():
"""
使用共享浏览器实例保持会话状态
"""
# 创建共享浏览器实例
browser = Browser(config=BrowserConfig(headless=True))
try:
# 第一轮:登录
login_agent = Agent(
task="""
登录系统:
1. 访问 https://example.com/login
2. 输入用户名 'testuser'
3. 输入密码 'testpass'
4. 点击登录按钮
5. 验证登录成功
""",
llm=llm,
browser=browser, # 复用浏览器实例
)
login_result = await login_agent.run()
print(f"登录结果: {login_result.final_result()}")
# 第二轮:登录后操作(保持会话)
action_agent = Agent(
task="""
在登录状态下执行操作:
1. 访问个人中心
2. 查看账户信息
3. 获取账户余额
""",
llm=llm,
browser=browser, # 复用同一浏览器,保持登录状态
)
action_result = await action_agent.run()
print(f"操作结果: {action_result.final_result()}")
# 第三轮:修改信息
update_agent = Agent(
task="""
修改个人信息:
1. 进入个人资料页面
2. 更新手机号码为 '13800138000'
3. 保存更改
4. 验证更新成功
""",
llm=llm,
browser=browser,
)
update_result = await update_agent.run()
print(f"更新结果: {update_result.final_result()}")
finally:
await browser.close()
# ============================================================
# 2. 上下文传递模式
# ============================================================
class ContextualAgent:
"""
上下文感知 Agent
支持在多轮对话中传递上下文信息
"""
def __init__(self, llm, browser=None):
self.llm = llm
self.browser = browser or Browser(config=BrowserConfig(headless=True))
self.conversation_history = []
self.shared_context = {}
async def execute_task(self, task: str, context: dict = None) -> dict:
"""
执行任务并更新上下文
Args:
task: 当前任务描述
context: 额外的上下文信息
Returns:
dict: 执行结果和更新后的上下文
"""
# 构建包含上下文的任务描述
contextual_task = self._build_contextual_task(task, context)
# 创建 Agent 并执行
agent = Agent(
task=contextual_task,
llm=self.llm,
browser=self.browser,
)
result = await agent.run()
# 更新历史记录
self.conversation_history.append({
"task": task,
"result": result.final_result(),
})
# 提取并更新共享上下文
self._update_shared_context(result)
return {
"result": result,
"context": self.shared_context,
"history": self.conversation_history,
}
def _build_contextual_task(self, task: str, context: dict) -> str:
"""构建包含上下文的任务描述"""
contextual_parts = ["【当前任务】", task]
if context:
contextual_parts.extend(["", "【已知信息】"])
for key, value in context.items():
contextual_parts.append(f"- {key}: {value}")
if self.shared_context:
contextual_parts.extend(["", "【会话上下文】"])
for key, value in self.shared_context.items():
contextual_parts.append(f"- {key}: {value}")
return "\n".join(contextual_parts)
def _update_shared_context(self, result):
"""从执行结果中提取并更新上下文"""
# 这里可以根据实际需求解析结果并提取关键信息
pass
async def close(self):
"""关闭浏览器"""
await self.browser.close()
# 使用上下文感知 Agent
async def contextual_conversation():
"""多轮上下文对话示例"""
agent = ContextualAgent(llm=llm)
try:
# 第一轮:搜索商品
result1 = await agent.execute_task(
task="在电商网站搜索 'iPhone 15'",
context={"site": "amazon.com"}
)
print(f"搜索完成: {result1['result'].final_result()}")
# 第二轮:筛选(使用第一轮的上下文)
result2 = await agent.execute_task(
task="筛选结果:只显示 256GB 版本,按价格从低到高排序"
)
print(f"筛选完成: {result2['result'].final_result()}")
# 第三轮:选择商品(继承前两轮的上下文)
result3 = await agent.execute_task(
task="选择价格最低的商品,获取详细信息"
)
print(f"选择完成: {result3['result'].final_result()}")
finally:
await agent.close()
# ============================================================
# 3. 复杂多步骤工作流
# ============================================================
async def complex_workflow():
"""
复杂工作流示例:电商完整购物流程
"""
browser = Browser(config=BrowserConfig(headless=True))
workflow_steps = [
{
"name": "登录",
"task": """
登录电商网站:
1. 访问登录页面
2. 输入用户名 'testuser' 和密码 'testpass'
3. 完成登录验证
""",
},
{
"name": "搜索商品",
"task": """
搜索并选择商品:
1. 在搜索框输入 '无线蓝牙耳机'
2. 点击搜索
3. 筛选条件:价格 100-300 元,评分 4.5 以上
4. 选择第一个符合条件的商品
""",
},
{
"name": "加入购物车",
"task": """
将商品加入购物车:
1. 选择颜色:黑色
2. 选择数量:2
3. 点击'加入购物车'按钮
4. 验证添加成功
""",
},
{
"name": "结算",
"task": """
完成结算流程:
1. 进入购物车
2. 确认商品信息
3. 点击'去结算'
4. 选择默认收货地址
5. 选择支付方式:支付宝
6. 提交订单(不实际支付)
""",
},
]
results = []
try:
for step in workflow_steps:
print(f"\n执行步骤: {step['name']}")
agent = Agent(
task=step["task"],
llm=llm,
browser=browser,
max_steps=50,
)
result = await agent.run()
results.append({
"step": step["name"],
"success": result.is_successful(),
"result": result.final_result(),
})
if not result.is_successful():
print(f"步骤 {step['name']} 失败,终止工作流")
break
finally:
await browser.close()
return results
5.5 工具调用(Function Calling)机制解析
Browser-use 内部使用工具调用机制让 LLM 与浏览器交互。
# llm/function_calling.py
"""
工具调用(Function Calling)机制解析
理解 Browser-use 如何让 LLM 控制浏览器
"""
from browser_use import Agent, Controller
from browser_use.browser.context import BrowserContext
from langchain_openai import ChatOpenAI
import asyncio
# ============================================================
# 1. 工具调用机制概述
# ============================================================
"""
Browser-use 的工具调用流程:
1. 页面分析
- 获取当前页面 HTML
- 提取可交互元素
- 为每个元素分配索引
2. 构建提示词
- 系统提示词(定义可用工具)
- 页面状态(当前可执行的操作)
- 任务目标
3. LLM 决策
- 分析当前状态
- 选择要执行的工具
- 确定工具参数
4. 执行工具
- 调用对应的浏览器操作
- 获取执行结果
- 更新页面状态
5. 循环迭代
- 重复步骤 1-4
- 直到任务完成或达到限制
"""
# ============================================================
# 2. 自定义工具示例
# ============================================================
controller = Controller()
@controller.action("获取页面性能指标")
async def get_performance_metrics(browser: BrowserContext) -> dict:
"""
自定义工具:获取页面性能指标
这个工具展示了如何创建自定义功能
让 LLM 能够获取页面的性能数据
"""
page = await browser.get_current_page()
# 使用 Performance API 获取指标
metrics = await page.evaluate("""
() => {
const navigation = performance.getEntriesByType('navigation')[0];
const paint = performance.getEntriesByType('paint');
return {
// 加载时间指标
dns_lookup: navigation.domainLookupEnd - navigation.domainLookupStart,
tcp_connection: navigation.connectEnd - navigation.connectStart,
server_response: navigation.responseEnd - navigation.requestStart,
dom_processing: navigation.domComplete - navigation.domLoading,
total_load: navigation.loadEventEnd - navigation.startTime,
// 渲染指标
first_paint: paint.find(p => p.name === 'first-paint')?.startTime,
first_contentful_paint: paint.find(p => p.name === 'first-contentful-paint')?.startTime,
// 资源数量
resource_count: performance.getEntriesByType('resource').length,
};
}
""")
return metrics
@controller.action("执行 JavaScript")
async def execute_javascript(
browser: BrowserContext,
script: str,
return_value: bool = True
) -> any:
"""
自定义工具:在页面中执行 JavaScript
Args:
script: 要执行的 JavaScript 代码
return_value: 是否返回执行结果
"""
page = await browser.get_current_page()
if return_value:
result = await page.evaluate(f"() => {{ {script} }}")
return result
else:
await page.evaluate(f"() => {{ {script} }}")
return "Script executed successfully"
@controller.action("检查元素是否存在")
async def check_element_exists(
browser: BrowserContext,
selector: str,
timeout: int = 5000
) -> dict:
"""
自定义工具:检查元素是否存在
返回详细的元素状态信息
"""
page = await browser.get_current_page()
try:
element = await page.wait_for_selector(selector, timeout=timeout, state='attached')
if element:
# 获取元素详细信息
info = await element.evaluate("""
el => ({
tagName: el.tagName,
id: el.id,
className: el.className,
textContent: el.textContent?.substring(0, 100),
isVisible: el.offsetParent !== null,
rect: el.getBoundingClientRect(),
})
""")
return {
"exists": True,
"info": info,
}
except:
pass
return {
"exists": False,
"selector": selector,
}
# ============================================================
# 3. 工具调用调试
# ============================================================
async def debug_tool_calls():
"""
调试工具调用过程
"""
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# 创建带调试功能的 Agent
agent = Agent(
task="访问 example.com,获取页面性能指标",
llm=llm,
controller=controller, # 使用自定义控制器
)
# 执行并观察工具调用
result = await agent.run()
# 分析执行历史
print("\n=== 工具调用分析 ===")
print(f"总步数: {result.total_steps()}")
print(f"总 Token: {result.total_tokens()}")
# 打印每一步的工具调用
for i, step in enumerate(result.steps):
print(f"\n步骤 {i+1}:")
print(f" 模型思考: {step.model_thought[:200]}...")
print(f" 执行动作: {step.action}")
print(f" 执行结果: {step.result}")
# ============================================================
# 4. 工具调用最佳实践
# ============================================================
"""
工具调用最佳实践:
1. 工具设计原则
- 单一职责:每个工具只做一件事
- 明确参数:参数名和类型清晰
- 错误处理:返回明确的错误信息
- 幂等性:同一操作多次执行结果一致
2. 提示词优化
- 清晰的工具描述
- 明确的参数说明
- 使用示例
3. 性能考虑
- 避免频繁的工具调用
- 合并相关的操作
- 缓存重复的结果
4. 安全考虑
- 验证所有输入参数
- 限制操作范围
- 避免执行危险操作
"""
# 使用示例
if __name__ == "__main__":
asyncio.run(debug_tool_calls())
第6章:典型应用场景实战
6.1 场景一:自动化表单填写与提交
# scenarios/form_automation.py
"""
场景一:自动化表单填写与提交
适用于:用户注册、信息录入、调查问卷等场景
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
import json
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 基础表单填写
# ============================================================
async def basic_form_filling():
"""
基础表单填写示例
演示如何自动填写简单的注册表单
"""
agent = Agent(
task="""
完成用户注册表单:
目标网站:https://example.com/register
填写信息:
- 用户名:auto_test_user_001
- 电子邮箱:autotest001@example.com
- 密码:TestPass123!
- 确认密码:TestPass123!
- 手机号码:13800138001
- 性别:男
- 出生日期:1990-01-01
操作步骤:
1. 访问注册页面
2. 依次填写所有字段
3. 勾选"同意服务条款"复选框
4. 点击"注册"按钮
5. 验证注册成功(检查页面跳转或成功消息)
注意事项:
- 如果用户名已存在,尝试 auto_test_user_002、003 等
- 确保所有必填字段都已填写
- 截图保存注册结果
""",
llm=llm,
max_steps=30,
)
result = await agent.run()
if result.is_successful():
print("✅ 表单填写成功")
print(f"结果: {result.final_result()}")
else:
print("❌ 表单填写失败")
print(f"错误: {result.errors()}")
return result
# ============================================================
# 2. 动态表单处理
# ============================================================
async def dynamic_form_handling():
"""
动态表单处理
处理条件显示、动态添加字段等复杂表单
"""
agent = Agent(
task="""
完成动态表单填写:
目标:填写一个带有条件字段的申请表
基础信息:
- 姓名:张三
- 身份证号:110101199001011234
- 申请类型:企业申请
条件字段(根据申请类型动态显示):
- 企业名称:测试科技有限公司
- 统一社会信用代码:91110108MA0012345
- 注册资本:1000万元
- 成立日期:2020-01-01
操作步骤:
1. 访问申请页面
2. 填写基础信息
3. 选择"企业申请"类型
4. 等待企业相关字段加载
5. 填写企业信息
6. 上传营业执照(如果有上传按钮)
7. 提交申请
8. 保存申请编号
处理策略:
- 如果字段未显示,先选择对应的申请类型
- 如果上传失败,记录错误继续提交
- 验证所有必填项已填写
""",
llm=llm,
max_steps=40,
)
return await agent.run()
# ============================================================
# 3. 批量表单填写
# ============================================================
async def batch_form_filling(data_list: list):
"""
批量表单填写
使用相同模板填写多组数据
Args:
data_list: 包含多组表单数据的列表
"""
browser = Browser(config=BrowserConfig(headless=True))
results = []
try:
for i, data in enumerate(data_list):
print(f"\n处理第 {i+1}/{len(data_list)} 条数据...")
# 构建任务描述
task = f"""
填写数据录入表单:
数据编号:{data.get('id', i+1)}
填写内容:
{json.dumps(data, ensure_ascii=False, indent=2)}
操作要求:
1. 访问 https://example.com/data-entry
2. 清空所有字段(如有旧数据)
3. 按顺序填写所有字段
4. 点击"保存"按钮
5. 确认保存成功
6. 点击"添加下一条"继续
数据验证:
- 确保数值字段格式正确
- 日期字段使用 YYYY-MM-DD 格式
- 必填字段不能为空
"""
agent = Agent(
task=task,
llm=llm,
browser=browser,
max_steps=25,
)
result = await agent.run()
results.append({
"index": i,
"data": data,
"success": result.is_successful(),
"result": result.final_result(),
})
# 短暂等待,避免请求过快
await asyncio.sleep(2)
finally:
await browser.close()
# 输出汇总报告
success_count = sum(1 for r in results if r["success"])
print(f"\n批量处理完成:成功 {success_count}/{len(data_list)}")
return results
# 批量数据示例
sample_data = [
{
"id": "EMP001",
"name": "张三",
"department": "技术部",
"position": "高级工程师",
"salary": 25000,
"entry_date": "2024-01-15",
},
{
"id": "EMP002",
"name": "李四",
"department": "产品部",
"position": "产品经理",
"salary": 28000,
"entry_date": "2024-02-01",
},
{
"id": "EMP003",
"name": "王五",
"department": "设计部",
"position": "UI设计师",
"salary": 22000,
"entry_date": "2024-03-10",
},
]
# ============================================================
# 4. 表单验证与错误处理
# ============================================================
async def form_with_validation():
"""
带验证的表单填写
处理表单验证错误,自动修正
"""
agent = Agent(
task="""
填写并验证表单:
目标:完成带实时验证的注册表单
初始数据(可能有错误):
- 邮箱:invalid-email-format
- 密码:123
- 手机号:12345678901
验证规则:
- 邮箱:必须符合邮箱格式
- 密码:至少8位,包含大小写字母和数字
- 手机号:11位数字,以1开头
处理流程:
1. 尝试填写初始数据
2. 观察验证错误提示
3. 根据错误提示修正数据:
- 邮箱改为:valid_user@example.com
- 密码改为:SecurePass123
- 手机号改为:13800138001
4. 重新提交
5. 验证通过,完成注册
成功标准:
- 无验证错误提示
- 页面跳转到成功页面
- 显示"注册成功"消息
""",
llm=llm,
max_steps=35,
max_failures=5, # 允许更多重试
)
return await agent.run()
# ============================================================
# 5. 生产环境表单自动化类
# ============================================================
class FormAutomation:
"""
表单自动化类(生产级)
封装表单填写的常用功能
"""
def __init__(self, llm, browser=None):
self.llm = llm
self.browser = browser or Browser(config=BrowserConfig(headless=True))
self.results = []
async def fill_form(self, form_url: str, data: dict, validation_rules: dict = None) -> dict:
"""
填写单个表单
Args:
form_url: 表单页面 URL
data: 表单数据字典
validation_rules: 字段验证规则
Returns:
dict: 执行结果
"""
# 构建任务描述
fields_desc = "\n".join([f"- {k}: {v}" for k, v in data.items()])
task = f"""
填写表单:
页面:{form_url}
填写字段:
{fields_desc}
操作步骤:
1. 访问表单页面
2. 填写所有字段
3. 提交表单
4. 验证提交成功
"""
if validation_rules:
task += f"\n\n验证规则:\n{json.dumps(validation_rules, ensure_ascii=False)}"
agent = Agent(
task=task,
llm=self.llm,
browser=self.browser,
max_steps=25,
validate_output=True,
)
result = await agent.run()
return {
"success": result.is_successful(),
"data": data,
"result": result.final_result(),
"steps": result.total_steps(),
}
async def batch_fill(self, form_url: str, data_list: list, delay: int = 2) -> list:
"""
批量填写表单
Args:
form_url: 表单页面 URL
data_list: 数据列表
delay: 间隔延迟(秒)
Returns:
list: 所有执行结果
"""
results = []
for i, data in enumerate(data_list):
print(f"处理 {i+1}/{len(data_list)}...")
result = await self.fill_form(form_url, data)
results.append(result)
if i < len(data_list) - 1: # 不是最后一条
await asyncio.sleep(delay)
return results
async def close(self):
"""关闭浏览器"""
await self.browser.close()
# 使用示例
async def production_form_example():
"""生产环境表单自动化示例"""
automation = FormAutomation(llm=llm)
try:
# 单条填写
result = await automation.fill_form(
form_url="https://example.com/register",
data={
"username": "testuser001",
"email": "test001@example.com",
"password": "SecurePass123!",
}
)
print(f"填写结果: {result}")
# 批量填写
batch_results = await automation.batch_fill(
form_url="https://example.com/bulk-entry",
data_list=sample_data,
delay=3,
)
print(f"批量填写完成: {len(batch_results)} 条")
finally:
await automation.close()
# 运行示例
if __name__ == "__main__":
# 运行基础示例
asyncio.run(basic_form_filling())
6.2 场景二:电商网站数据抓取与比价
- 动态内容处理:页面结构频繁变化,传统选择器易失效
- 复杂多步骤任务:需要推理和决策的流程(如比价、筛选)
- 自然语言交互测试:验证系统对用户意图的理解
- 快速原型验证:无需编写大量代码即可验证自动化可行性
核心优势:
第7章:错误处理与调试技巧
7.1 常见错误类型与排查方法
# debugging/error_handling.py
"""
错误处理与排查
Browser-use 常见错误及解决方案
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
from typing import Dict, List, Optional
import os
# 从环境变量读取API密钥
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
llm = ChatOpenAI(model="gpt-4o", temperature=0.1, api_key=api_key)
# ============================================================
# 1. 常见错误类型分类
# ============================================================
"""
Browser-use 常见错误类型:
1. 页面相关错误
- TimeoutError: 页面加载超时
- ElementNotFoundError: 元素未找到
- NavigationError: 导航失败
2. LLM 相关错误
- RateLimitError: API 限流
- TokenLimitError: Token 超限
- APIError: LLM API 错误
3. 浏览器相关错误
- BrowserCrashError: 浏览器崩溃
- ContextError: 上下文错误
- ScreenshotError: 截图失败
4. 任务执行错误
- MaxStepsExceeded: 超过最大步数
- TaskFailedError: 任务执行失败
- ValidationError: 输出验证失败
"""
# ============================================================
# 2. 错误处理装饰器
# ============================================================
def retry_on_error(max_retries: int = 3, delay: float = 2.0):
"""
错误重试装饰器
Args:
max_retries: 最大重试次数
delay: 重试间隔(秒)
"""
def decorator(func):
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < max_retries:
print(f"⚠️ 尝试 {attempt + 1}/{max_retries + 1} 失败: {e}")
print(f"🔄 {delay}秒后重试...")
await asyncio.sleep(delay)
else:
print(f"❌ 所有重试都失败")
raise last_exception
return wrapper
return decorator
@retry_on_error(max_retries=3, delay=2)
async def robust_agent_execution(task: str, llm) -> Dict:
"""
带重试机制的 Agent 执行
Args:
task: 任务描述
llm: LLM 实例
Returns:
Dict: 执行结果
"""
agent = Agent(
task=task,
llm=llm,
max_steps=50,
max_failures=3,
retry_delay=5,
)
result = await agent.run()
return {
"success": result.is_successful(),
"result": result.final_result() if result.is_successful() else None,
"errors": result.errors() if not result.is_successful() else [],
"steps": result.total_steps(),
}
# ============================================================
# 3. 错误分类处理
# ============================================================
class ErrorHandler:
"""
错误处理器
根据错误类型采取不同的处理策略
"""
@staticmethod
def classify_error(error: Exception) -> str:
"""
分类错误类型
Args:
error: 异常对象
Returns:
str: 错误类型
"""
error_msg = str(error).lower()
if "timeout" in error_msg:
return "timeout"
elif "element" in error_msg and "not found" in error_msg:
return "element_not_found"
elif "rate limit" in error_msg or "429" in error_msg:
return "rate_limit"
elif "token" in error_msg:
return "token_limit"
elif "navigation" in error_msg:
return "navigation"
elif "browser" in error_msg and "crash" in error_msg:
return "browser_crash"
else:
return "unknown"
@staticmethod
def get_recovery_strategy(error_type: str) -> Dict:
"""
获取恢复策略
Args:
error_type: 错误类型
Returns:
Dict: 恢复策略
"""
strategies = {
"timeout": {
"action": "retry_with_delay",
"delay": 10,
"message": "页面加载超时,等待后重试",
},
"element_not_found": {
"action": "refresh_and_retry",
"message": "元素未找到,刷新页面后重试",
},
"rate_limit": {
"action": "exponential_backoff",
"initial_delay": 30,
"message": "API 限流,使用指数退避策略",
},
"token_limit": {
"action": "reduce_context",
"message": "Token 超限,减少上下文长度",
},
"navigation": {
"action": "verify_url",
"message": "导航失败,检查 URL 是否正确",
},
"browser_crash": {
"action": "restart_browser",
"message": "浏览器崩溃,重启浏览器实例",
},
"unknown": {
"action": "log_and_continue",
"message": "未知错误,记录后继续",
},
}
return strategies.get(error_type, strategies["unknown"])
# ============================================================
# 4. 调试信息收集
# ============================================================
class DebugCollector:
"""
调试信息收集器
收集执行过程中的调试信息
"""
def __init__(self):
self.logs: List[Dict] = []
self.screenshots: List[bytes] = []
self.network_logs: List[Dict] = []
def log(self, level: str, message: str, context: Dict = None):
"""记录日志"""
import time
self.logs.append({
"timestamp": time.time(),
"level": level,
"message": message,
"context": context or {},
})
def add_screenshot(self, screenshot: bytes, description: str = ""):
"""添加截图"""
import time
self.screenshots.append({
"data": screenshot,
"description": description,
"timestamp": time.time(),
})
def generate_report(self) -> str:
"""生成调试报告"""
report = []
report.append("=" * 50)
report.append("调试报告")
report.append("=" * 50)
# 日志汇总
report.append("\n【日志记录】")
for log in self.logs:
report.append(f"[{log['level'].upper()}] {log['message']}")
# 截图信息
report.append(f"\n【截图】共 {len(self.screenshots)} 张")
return "\n".join(report)
# ============================================================
# 5. 生产级错误处理示例
# ============================================================
async def production_error_handling():
"""
生产环境错误处理示例
"""
debug_collector = DebugCollector()
try:
debug_collector.log("info", "开始执行任务")
agent = Agent(
task="访问 example.com 并获取标题",
llm=llm,
max_steps=30,
)
result = await agent.run()
if result.is_successful():
debug_collector.log("info", "任务执行成功")
return result.final_result()
else:
# 收集错误信息
errors = result.errors()
debug_collector.log("error", f"任务执行失败: {errors}")
# 分类并处理错误
for error in errors:
error_type = ErrorHandler.classify_error(Exception(str(error)))
strategy = ErrorHandler.get_recovery_strategy(error_type)
debug_collector.log("warning",
f"错误类型: {error_type}, 策略: {strategy['message']}")
# 根据错误类型决定是否重试
raise Exception(f"任务失败: {errors}")
except Exception as e:
debug_collector.log("error", f"异常: {e}")
# 生成调试报告
report = debug_collector.generate_report()
print(report)
# 可以发送告警通知
# await send_alert(report)
raise
# ============================================================
# 6. 日志配置与级别控制
# ============================================================
import logging
import logging.handlers
import sys
from pathlib import Path
def setup_production_logging(
log_dir: str = "logs",
app_name: str = "browser-use",
level: int = logging.INFO
):
"""
生产级日志配置
Args:
log_dir: 日志目录
app_name: 应用名称
level: 日志级别
"""
# 创建日志目录
Path(log_dir).mkdir(parents=True, exist_ok=True)
# 创建 logger
logger = logging.getLogger(app_name)
logger.setLevel(level)
# 清除现有处理器
logger.handlers = []
# 格式化器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 文件处理器(按大小轮转)
file_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/{app_name}.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5,
)
file_handler.setLevel(level)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
# 运行示例
if __name__ == "__main__":
# 设置日志
logger = setup_production_logging()
logger.info("开始测试错误处理")
# 运行示例
try:
asyncio.run(production_error_handling())
except Exception as e:
logger.error(f"测试失败: {e}")
第8章:性能优化与资源管理
8.1 浏览器资源管理策略
# performance/resource_management.py
"""
浏览器资源管理
优化内存使用和浏览器实例生命周期
"""
from browser_use import Agent, Browser, BrowserConfig, BrowserContextConfig
from langchain_openai import ChatOpenAI
import asyncio
import psutil
import os
from typing import Optional, List
import weakref
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 浏览器池管理
# ============================================================
class BrowserPool:
"""
浏览器连接池
管理多个浏览器实例,实现资源复用
"""
def __init__(
self,
max_browsers: int = 5,
max_contexts_per_browser: int = 3,
config: Optional[BrowserConfig] = None
):
"""
初始化浏览器池
Args:
max_browsers: 最大浏览器实例数
max_contexts_per_browser: 每个浏览器的最大上下文数
config: 浏览器配置
"""
self.max_browsers = max_browsers
self.max_contexts_per_browser = max_contexts_per_browser
self.config = config or BrowserConfig(headless=True)
# 浏览器实例管理
self._browsers: List[Browser] = []
self._context_counts: dict = {}
self._available_browsers: asyncio.Queue = asyncio.Queue()
self._lock = asyncio.Lock()
async def acquire_browser(self) -> Browser:
"""
获取可用浏览器实例
Returns:
Browser: 浏览器实例
"""
async with self._lock:
# 尝试获取现有可用浏览器
try:
browser = self._available_browsers.get_nowait()
if self._context_counts[browser] < self.max_contexts_per_browser:
return browser
else:
# 上下文已满,放回队列
await self._available_browsers.put(browser)
except asyncio.QueueEmpty:
pass
# 创建新浏览器实例
if len(self._browsers) < self.max_browsers:
browser = Browser(config=self.config)
self._browsers.append(browser)
self._context_counts[browser] = 0
return browser
# 等待可用浏览器
browser = await self._available_browsers.get()
return browser
async def release_browser(self, browser: Browser):
"""
释放浏览器实例
Args:
browser: 要释放的浏览器实例
"""
async with self._lock:
if browser in self._context_counts:
await self._available_browsers.put(browser)
async def close_all(self):
"""关闭所有浏览器实例"""
for browser in self._browsers:
try:
await browser.close()
except Exception as e:
print(f"关闭浏览器时出错: {e}")
self._browsers.clear()
self._context_counts.clear()
# ============================================================
# 2. 内存监控与优化
# ============================================================
class MemoryMonitor:
"""
内存监控器
实时监控内存使用并触发优化
"""
def __init__(self, threshold_mb: float = 1024):
"""
初始化内存监控器
Args:
threshold_mb: 内存使用阈值(MB)
"""
self.threshold_mb = threshold_mb
self.process = psutil.Process(os.getpid())
self.peak_memory = 0
def get_memory_usage(self) -> dict:
"""
获取当前内存使用情况
Returns:
dict: 内存使用信息
"""
memory_info = self.process.memory_info()
current_mb = memory_info.rss / 1024 / 1024
if current_mb > self.peak_memory:
self.peak_memory = current_mb
return {
"current_mb": current_mb,
"peak_mb": self.peak_memory,
"threshold_mb": self.threshold_mb,
"percent": self.process.memory_percent(),
"status": "normal" if current_mb < self.threshold_mb else "warning"
}
def check_memory(self) -> bool:
"""
检查内存是否超过阈值
Returns:
bool: 是否超过阈值
"""
usage = self.get_memory_usage()
return usage["current_mb"] > self.threshold_mb
async def monitor_loop(self, interval: int = 30):
"""
内存监控循环
Args:
interval: 检查间隔(秒)
"""
while True:
usage = self.get_memory_usage()
print(f"内存使用: {usage['current_mb']:.2f}MB / {usage['threshold_mb']}MB")
if usage["status"] == "warning":
print("⚠️ 内存使用超过阈值,触发优化...")
await self.optimize_memory()
await asyncio.sleep(interval)
async def optimize_memory(self):
"""执行内存优化"""
# 触发垃圾回收
import gc
gc.collect()
# 可以在这里添加其他优化逻辑
print("✅ 内存优化完成")
# ============================================================
# 3. 并发任务管理
# ============================================================
class ConcurrentTaskManager:
"""
并发任务管理器
控制并发数量,优化资源使用
"""
def __init__(
self,
max_concurrent: int = 3,
browser_pool: Optional[BrowserPool] = None
):
"""
初始化并发任务管理器
Args:
max_concurrent: 最大并发任务数
browser_pool: 浏览器池实例
"""
self.max_concurrent = max_concurrent
self.browser_pool = browser_pool
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def execute_task(
self,
task: str,
task_id: str,
llm
) -> dict:
"""
执行单个任务(带并发控制)
Args:
task: 任务描述
task_id: 任务ID
llm: LLM实例
Returns:
dict: 执行结果
"""
async with self.semaphore:
print(f"🚀 开始执行任务 {task_id}")
start_time = asyncio.get_event_loop().time()
try:
# 获取浏览器
browser = await self.browser_pool.acquire_browser()
# 创建Agent并执行
agent = Agent(
task=task,
llm=llm,
browser=browser,
max_steps=50,
)
result = await agent.run()
# 释放浏览器
await self.browser_pool.release_browser(browser)
end_time = asyncio.get_event_loop().time()
return {
"task_id": task_id,
"success": result.is_successful(),
"result": result.final_result() if result.is_successful() else None,
"errors": result.errors() if not result.is_successful() else [],
"duration": end_time - start_time,
}
except Exception as e:
end_time = asyncio.get_event_loop().time()
return {
"task_id": task_id,
"success": False,
"result": None,
"errors": [str(e)],
"duration": end_time - start_time,
}
async def execute_batch(
self,
tasks: List[dict],
llm
) -> List[dict]:
"""
批量执行任务
Args:
tasks: 任务列表,每个任务包含task和task_id
llm: LLM实例
Returns:
List[dict]: 执行结果列表
"""
# 创建所有任务
coroutines = [
self.execute_task(t["task"], t["task_id"], llm)
for t in tasks
]
# 并发执行
results = await asyncio.gather(*coroutines, return_exceptions=True)
# 处理结果
processed_results = []
for result in results:
if isinstance(result, Exception):
processed_results.append({
"success": False,
"errors": [str(result)]
})
else:
processed_results.append(result)
return processed_results
# ============================================================
# 4. 性能监控与报告
# ============================================================
class PerformanceMonitor:
"""
性能监控器
收集和分析性能指标
"""
def __init__(self):
self.metrics = []
def record_metric(
self,
task_id: str,
metric_type: str,
value: float,
metadata: dict = None
):
"""
记录性能指标
Args:
task_id: 任务ID
metric_type: 指标类型
value: 指标值
metadata: 附加元数据
"""
self.metrics.append({
"task_id": task_id,
"type": metric_type,
"value": value,
"timestamp": asyncio.get_event_loop().time(),
"metadata": metadata or {}
})
def generate_report(self) -> dict:
"""
生成性能报告
Returns:
dict: 性能报告
"""
if not self.metrics:
return {"message": "暂无性能数据"}
# 按类型分组统计
type_stats = {}
for metric in self.metrics:
mtype = metric["type"]
if mtype not in type_stats:
type_stats[mtype] = []
type_stats[mtype].append(metric["value"])
# 计算统计信息
report = {}
for mtype, values in type_stats.items():
report[mtype] = {
"count": len(values),
"avg": sum(values) / len(values),
"min": min(values),
"max": max(values),
}
return report
# ============================================================
# 5. 使用示例
# ============================================================
async def resource_management_demo():
"""
资源管理综合示例
"""
# 创建浏览器池
browser_pool = BrowserPool(
max_browsers=3,
max_contexts_per_browser=2
)
# 创建内存监控器
memory_monitor = MemoryMonitor(threshold_mb=2048)
# 创建并发任务管理器
task_manager = ConcurrentTaskManager(
max_concurrent=3,
browser_pool=browser_pool
)
# 创建性能监控器
perf_monitor = PerformanceMonitor()
try:
# 准备批量任务
tasks = [
{
"task_id": f"task_{i}",
"task": f"访问 https://example.com/page{i} 并获取标题"
}
for i in range(10)
]
# 检查初始内存
print(f"初始内存: {memory_monitor.get_memory_usage()}")
# 执行批量任务
results = await task_manager.execute_batch(tasks, llm)
# 检查结果
success_count = sum(1 for r in results if r.get("success"))
print(f"任务完成: {success_count}/{len(tasks)} 成功")
# 检查最终内存
print(f"最终内存: {memory_monitor.get_memory_usage()}")
return results
finally:
# 清理资源
await browser_pool.close_all()
# 运行示例
if __name__ == "__main__":
asyncio.run(resource_management_demo())
8.2 执行速度优化技巧
# performance/speed_optimization.py
"""
执行速度优化
提升Agent任务执行效率
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
from typing import List
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 任务并行化
# ============================================================
async def parallel_tasks():
"""
并行执行多个独立任务
"""
tasks = [
Agent(
task="访问 https://site1.com 获取标题",
llm=llm,
max_steps=20,
).run(),
Agent(
task="访问 https://site2.com 获取标题",
llm=llm,
max_steps=20,
).run(),
Agent(
task="访问 https://site3.com 获取标题",
llm=llm,
max_steps=20,
).run(),
]
# 并行执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# ============================================================
# 2. 预热与缓存
# ============================================================
class WarmupManager:
"""
预热管理器
预先初始化资源,减少首次执行延迟
"""
def __init__(self):
self.warmed_up = False
self._browser: Browser = None
async def warmup(self):
"""执行预热"""
if self.warmed_up:
return
print("🔥 开始预热...")
# 预热浏览器
self._browser = Browser(config=BrowserConfig(headless=True))
# 执行一个简单的任务来预热LLM连接
warmup_agent = Agent(
task="访问 https://example.com",
llm=llm,
browser=self._browser,
max_steps=5,
)
await warmup_agent.run()
self.warmed_up = True
print("✅ 预热完成")
def get_browser(self) -> Browser:
"""获取预热的浏览器实例"""
return self._browser
async def cleanup(self):
"""清理资源"""
if self._browser:
await self._browser.close()
# ============================================================
# 3. 智能重试策略
# ============================================================
async def smart_retry(
task: str,
llm,
max_retries: int = 3,
base_delay: float = 1.0
) -> dict:
"""
智能重试机制
使用指数退避策略
Args:
task: 任务描述
llm: LLM实例
max_retries: 最大重试次数
base_delay: 基础延迟
Returns:
dict: 执行结果
"""
for attempt in range(max_retries + 1):
try:
agent = Agent(
task=task,
llm=llm,
max_steps=50,
)
result = await agent.run()
if result.is_successful():
return {
"success": True,
"result": result.final_result(),
"attempts": attempt + 1
}
# 任务失败但无异常,可能是业务逻辑失败
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
print(f"任务失败,{delay}秒后重试...")
await asyncio.sleep(delay)
except Exception as e:
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
print(f"异常: {e},{delay}秒后重试...")
await asyncio.sleep(delay)
else:
return {
"success": False,
"error": str(e),
"attempts": attempt + 1
}
return {
"success": False,
"error": "达到最大重试次数",
"attempts": max_retries + 1
}
# ============================================================
# 4. 批处理优化
# ============================================================
class BatchOptimizer:
"""
批处理优化器
合并小任务,减少开销
"""
def __init__(self, batch_size: int = 5):
self.batch_size = batch_size
self.pending_tasks = []
def add_task(self, task: str) -> str:
"""
添加任务到批次
Args:
task: 任务描述
Returns:
str: 任务ID
"""
task_id = f"task_{len(self.pending_tasks)}"
self.pending_tasks.append({
"id": task_id,
"task": task
})
return task_id
async def execute_batch(self, llm) -> List[dict]:
"""
执行批次任务
Args:
llm: LLM实例
Returns:
List[dict]: 执行结果
"""
if not self.pending_tasks:
return []
# 合并任务描述
combined_task = self._combine_tasks(self.pending_tasks)
# 执行合并后的任务
agent = Agent(
task=combined_task,
llm=llm,
max_steps=100,
)
result = await agent.run()
# 解析结果并分配给各个任务
# 这里需要根据实际返回格式解析
results = []
for task in self.pending_tasks:
results.append({
"task_id": task["id"],
"success": result.is_successful(),
"result": result.final_result() if result.is_successful() else None
})
# 清空待处理列表
self.pending_tasks = []
return results
def _combine_tasks(self, tasks: List[dict]) -> str:
"""合并多个任务描述"""
combined = ["执行以下多个任务:"]
for i, task in enumerate(tasks, 1):
combined.append(f"\n任务{i}: {task['task']}")
combined.append("\n请按顺序完成以上所有任务,并分别返回结果。")
return "\n".join(combined)
# 运行示例
if __name__ == "__main__":
# 测试并行任务
results = asyncio.run(parallel_tasks())
print(f"并行任务完成: {len(results)} 个")
第9章:安全与合规
9.1 敏感数据处理
# security/sensitive_data.py
"""
敏感数据处理
安全处理密码、密钥等敏感信息
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import os
from typing import Optional
from cryptography.fernet import Fernet
import hashlib
import base64
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 环境变量管理
# ============================================================
class SecureConfig:
"""
安全配置管理
从环境变量安全读取敏感信息
"""
@staticmethod
def get_api_key(provider: str) -> str:
"""
获取API密钥
Args:
provider: 提供商名称 (openai/azure/anthropic等)
Returns:
str: API密钥
"""
env_vars = {
"openai": "OPENAI_API_KEY",
"azure": "AZURE_OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"google": "GOOGLE_API_KEY",
}
env_var = env_vars.get(provider)
if not env_var:
raise ValueError(f"未知的提供商: {provider}")
api_key = os.getenv(env_var)
if not api_key:
raise ValueError(f"环境变量 {env_var} 未设置")
return api_key
@staticmethod
def get_credentials(service: str) -> dict:
"""
获取服务凭证
Args:
service: 服务名称
Returns:
dict: 凭证信息
"""
prefix = service.upper()
credentials = {
"username": os.getenv(f"{prefix}_USERNAME"),
"password": os.getenv(f"{prefix}_PASSWORD"),
}
if not all(credentials.values()):
raise ValueError(f"服务 {service} 的凭证未完整配置")
return credentials
# ============================================================
# 2. 数据加密
# ============================================================
class DataEncryption:
"""
数据加密工具
加密敏感数据
"""
def __init__(self, key: Optional[bytes] = None):
"""
初始化加密器
Args:
key: 加密密钥,None则自动生成
"""
if key is None:
key = Fernet.generate_key()
self.cipher = Fernet(key)
self.key = key
def encrypt(self, data: str) -> str:
"""
加密数据
Args:
data: 要加密的数据
Returns:
str: 加密后的数据
"""
encoded = data.encode('utf-8')
encrypted = self.cipher.encrypt(encoded)
return base64.urlsafe_b64encode(encrypted).decode('utf-8')
def decrypt(self, encrypted_data: str) -> str:
"""
解密数据
Args:
encrypted_data: 加密的数据
Returns:
str: 解密后的数据
"""
decoded = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
decrypted = self.cipher.decrypt(decoded)
return decrypted.decode('utf-8')
@staticmethod
def hash_data(data: str) -> str:
"""
计算数据哈希
Args:
data: 要哈希的数据
Returns:
str: SHA-256哈希值
"""
return hashlib.sha256(data.encode('utf-8')).hexdigest()
# ============================================================
# 3. 安全任务执行
# ============================================================
class SecureAgent:
"""
安全Agent
在任务执行中保护敏感信息
"""
def __init__(self, llm):
self.llm = llm
self.sensitive_patterns = [
r'password[:\s]+\S+',
r'api[_-]?key[:\s]+\S+',
r'secret[:\s]+\S+',
r'token[:\s]+\S+',
]
async def execute_with_credentials(
self,
task_template: str,
credentials: dict,
url: str
):
"""
使用凭证安全执行任务
Args:
task_template: 任务模板,使用占位符如 {username}
credentials: 凭证字典
url: 目标URL
Returns:
执行结果
"""
# 替换模板中的占位符
task = task_template.format(**credentials)
# 添加安全提示
secure_task = f"""
{task}
安全要求:
1. 不要在日志中记录密码或其他敏感信息
2. 确保使用HTTPS连接
3. 任务完成后清除表单中的敏感数据
4. 不要截图包含密码的页面
"""
agent = Agent(
task=secure_task,
llm=self.llm,
max_steps=30,
)
return await agent.run()
def sanitize_log(self, message: str) -> str:
"""
清理日志中的敏感信息
Args:
message: 原始日志消息
Returns:
str: 清理后的消息
"""
import re
sanitized = message
for pattern in self.sensitive_patterns:
sanitized = re.sub(
pattern,
lambda m: m.group(0).split(':')[0] + ': ***',
sanitized,
flags=re.IGNORECASE
)
return sanitized
# ============================================================
# 4. 使用示例
# ============================================================
async def secure_login_example():
"""
安全登录示例
"""
# 从环境变量获取凭证
credentials = SecureConfig.get_credentials("TEST_SITE")
# 创建安全Agent
secure_agent = SecureAgent(llm)
# 定义任务模板
task_template = """
登录测试网站:
1. 访问 https://secure.example.com/login
2. 在用户名框输入:{username}
3. 在密码框输入:{password}
4. 点击登录按钮
5. 验证登录成功
"""
# 安全执行任务
result = await secure_agent.execute_with_credentials(
task_template=task_template,
credentials=credentials,
url="https://secure.example.com"
)
return result
if __name__ == "__main__":
import asyncio
asyncio.run(secure_login_example())
9.2 访问控制与审计
# security/access_control.py
"""
访问控制与审计
实现任务执行的权限控制和审计日志
"""
from browser_use import Agent
from langchain_openai import ChatOpenAI
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
import json
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# ============================================================
# 1. 权限定义
# ============================================================
class Permission(Enum):
"""权限枚举"""
READ = "read" # 只读访问
WRITE = "write" # 写入权限
DELETE = "delete" # 删除权限
ADMIN = "admin" # 管理权限
SCRAPE = "scrape" # 数据抓取
FORM_FILL = "form_fill" # 表单填写
NAVIGATE = "navigate" # 页面导航
# ============================================================
# 2. 用户角色
# ============================================================
class Role:
"""用户角色"""
def __init__(self, name: str, permissions: List[Permission]):
self.name = name
self.permissions = set(permissions)
def has_permission(self, permission: Permission) -> bool:
"""检查是否有指定权限"""
return permission in self.permissions
# 预定义角色
ROLES = {
"viewer": Role("viewer", [Permission.READ, Permission.NAVIGATE]),
"operator": Role("operator", [
Permission.READ, Permission.WRITE,
Permission.NAVIGATE, Permission.FORM_FILL
]),
"analyst": Role("analyst", [
Permission.READ, Permission.SCRAPE,
Permission.NAVIGATE
]),
"admin": Role("admin", list(Permission)),
}
# ============================================================
# 3. 审计日志
# ============================================================
class AuditLogger:
"""
审计日志记录器
记录所有关键操作
"""
def __init__(self, log_file: str = "audit.log"):
self.log_file = log_file
self.entries: List[Dict] = []
def log(
self,
user_id: str,
action: str,
resource: str,
status: str,
details: Dict = None
):
"""
记录审计日志
Args:
user_id: 用户ID
action: 执行的动作
resource: 操作的资源
status: 执行状态
details: 详细信息
"""
entry = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"action": action,
"resource": resource,
"status": status,
"details": details or {}
}
self.entries.append(entry)
# 写入文件
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
def query(
self,
user_id: Optional[str] = None,
action: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None
) -> List[Dict]:
"""
查询审计日志
Args:
user_id: 用户ID过滤
action: 动作过滤
start_time: 开始时间
end_time: 结束时间
Returns:
List[Dict]: 匹配的日志条目
"""
results = self.entries
if user_id:
results = [e for e in results if e["user_id"] == user_id]
if action:
results = [e for e in results if e["action"] == action]
if start_time:
results = [
e for e in results
if datetime.fromisoformat(e["timestamp"]) >= start_time
]
if end_time:
results = [
e for e in results
if datetime.fromisoformat(e["timestamp"]) <= end_time
]
return results
def generate_report(self, period: str = "daily") -> Dict:
"""
生成审计报告
Args:
period: 报告周期 (daily/weekly/monthly)
Returns:
Dict: 审计报告
"""
stats = {
"total_actions": len(self.entries),
"actions_by_type": {},
"actions_by_user": {},
"success_rate": 0,
}
success_count = 0
for entry in self.entries:
# 按类型统计
action = entry["action"]
stats["actions_by_type"][action] = \
stats["actions_by_type"].get(action, 0) + 1
# 按用户统计
user = entry["user_id"]
stats["actions_by_user"][user] = \
stats["actions_by_user"].get(user, 0) + 1
# 成功率
if entry["status"] == "success":
success_count += 1
if stats["total_actions"] > 0:
stats["success_rate"] = success_count / stats["total_actions"]
return stats
# ============================================================
# 4. 访问控制管理器
# ============================================================
class AccessControlManager:
"""
访问控制管理器
管理用户权限和访问控制
"""
def __init__(self):
self.users: Dict[str, Role] = {}
self.audit_logger = AuditLogger()
def assign_role(self, user_id: str, role_name: str):
"""
分配角色给用户
Args:
user_id: 用户ID
role_name: 角色名称
"""
if role_name not in ROLES:
raise ValueError(f"未知角色: {role_name}")
self.users[user_id] = ROLES[role_name]
self.audit_logger.log(
user_id="system",
action="assign_role",
resource=f"user:{user_id}",
status="success",
details={"role": role_name}
)
def check_permission(
self,
user_id: str,
permission: Permission
) -> bool:
"""
检查用户权限
Args:
user_id: 用户ID
permission: 要检查的权限
Returns:
bool: 是否有权限
"""
role = self.users.get(user_id)
if not role:
return False
return role.has_permission(permission)
async def execute_with_auth(
self,
user_id: str,
task: str,
required_permission: Permission,
resource: str
):
"""
带权限检查的任务执行
Args:
user_id: 用户ID
task: 任务描述
required_permission: 所需权限
resource: 操作的资源
Returns:
执行结果
"""
# 检查权限
if not self.check_permission(user_id, required_permission):
self.audit_logger.log(
user_id=user_id,
action="execute_task",
resource=resource,
status="denied",
details={"reason": "insufficient_permissions"}
)
raise PermissionError(f"用户 {user_id} 没有 {required_permission.value} 权限")
# 记录开始
self.audit_logger.log(
user_id=user_id,
action="execute_task",
resource=resource,
status="started",
details={"task": task[:100]} # 只记录前100字符
)
try:
# 执行任务
agent = Agent(
task=task,
llm=llm,
max_steps=50,
)
result = await agent.run()
# 记录成功
self.audit_logger.log(
user_id=user_id,
action="execute_task",
resource=resource,
status="success",
details={
"steps": result.total_steps(),
"success": result.is_successful()
}
)
return result
except Exception as e:
# 记录失败
self.audit_logger.log(
user_id=user_id,
action="execute_task",
resource=resource,
status="failed",
details={"error": str(e)}
)
raise
# ============================================================
# 5. 使用示例
# ============================================================
async def access_control_demo():
"""
访问控制演示
"""
# 创建访问控制管理器
acm = AccessControlManager()
# 分配角色
acm.assign_role("user1", "viewer")
acm.assign_role("user2", "operator")
acm.assign_role("admin1", "admin")
# 测试权限检查
print("权限检查:")
print(f"user1 是否有 SCRAPE 权限: {acm.check_permission('user1', Permission.SCRAPE)}")
print(f"user2 是否有 FORM_FILL 权限: {acm.check_permission('user2', Permission.FORM_FILL)}")
print(f"admin1 是否有 ADMIN 权限: {acm.check_permission('admin1', Permission.ADMIN)}")
# 生成审计报告
report = acm.audit_logger.generate_report()
print(f"\n审计报告: {report}")
if __name__ == "__main__":
import asyncio
asyncio.run(access_control_demo())
第10章:实战案例与项目模板
10.1 完整项目:自动化测试平台
# projects/automation_testing_platform/
"""
自动化测试平台
基于 browser-use 的完整企业级测试解决方案
"""
from browser_use import Agent, Browser, BrowserConfig, Controller
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Optional, Callable
import asyncio
import json
from datetime import datetime
from pathlib import Path
import yaml
# ============================================================
# 1. 测试用例定义
# ============================================================
class TestCase(BaseModel):
"""测试用例模型"""
id: str = Field(..., description="测试用例ID")
name: str = Field(..., description="测试用例名称")
description: str = Field(..., description="测试描述")
preconditions: List[str] = Field(default=[], description="前置条件")
steps: List[str] = Field(..., description="测试步骤")
expected_results: List[str] = Field(..., description="预期结果")
priority: str = Field(default="medium", description="优先级")
tags: List[str] = Field(default=[], description="标签")
class TestSuite(BaseModel):
"""测试套件模型"""
name: str = Field(..., description="套件名称")
description: str = Field(..., description="套件描述")
test_cases: List[TestCase] = Field(..., description="测试用例列表")
setup: Optional[str] = Field(None, description="套件前置操作")
teardown: Optional[str] = Field(None, description="套件后置操作")
# ============================================================
# 2. 测试执行引擎
# ============================================================
class TestExecutionEngine:
"""
测试执行引擎
执行测试套件并收集结果
"""
def __init__(
self,
llm,
browser_config: Optional[BrowserConfig] = None
):
self.llm = llm
self.browser_config = browser_config or BrowserConfig(headless=True)
self.results = []
async def execute_test_case(
self,
test_case: TestCase,
browser: Browser
) -> Dict:
"""
执行单个测试用例
Args:
test_case: 测试用例
browser: 浏览器实例
Returns:
Dict: 执行结果
"""
print(f"\n{'='*60}")
print(f"执行测试: {test_case.name} ({test_case.id})")
print(f"{'='*60}")
start_time = datetime.now()
# 构建任务描述
task = self._build_task_description(test_case)
try:
# 创建Agent并执行
agent = Agent(
task=task,
llm=self.llm,
browser=browser,
max_steps=100,
)
result = await agent.run()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 分析结果
test_result = {
"test_id": test_case.id,
"test_name": test_case.name,
"status": "passed" if result.is_successful() else "failed",
"duration": duration,
"steps_executed": result.total_steps(),
"output": result.final_result() if result.is_successful() else None,
"errors": result.errors() if not result.is_successful() else [],
"timestamp": datetime.now().isoformat(),
}
status_icon = "✅" if test_result["status"] == "passed" else "❌"
print(f"{status_icon} 测试结果: {test_result['status'].upper()}")
print(f"⏱️ 执行时间: {duration:.2f}秒")
return test_result
except Exception as e:
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
return {
"test_id": test_case.id,
"test_name": test_case.name,
"status": "error",
"duration": duration,
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
async def execute_test_suite(
self,
test_suite: TestSuite
) -> Dict:
"""
执行测试套件
Args:
test_suite: 测试套件
Returns:
Dict: 套件执行结果
"""
print(f"\n{'#'*70}")
print(f"# 测试套件: {test_suite.name}")
print(f"# 描述: {test_suite.description}")
print(f"# 用例数: {len(test_suite.test_cases)}")
print(f"{'#'*70}")
# 创建浏览器实例
browser = Browser(config=self.browser_config)
try:
results = []
# 执行套件前置操作
if test_suite.setup:
print("\n🔧 执行套件前置操作...")
setup_agent = Agent(
task=test_suite.setup,
llm=self.llm,
browser=browser,
)
await setup_agent.run()
# 执行所有测试用例
for test_case in test_suite.test_cases:
result = await self.execute_test_case(test_case, browser)
results.append(result)
# 执行套件后置操作
if test_suite.teardown:
print("\n🧹 执行套件后置操作...")
teardown_agent = Agent(
task=test_suite.teardown,
llm=self.llm,
browser=browser,
)
await teardown_agent.run()
# 生成套件报告
suite_report = self._generate_suite_report(test_suite, results)
return suite_report
finally:
await browser.close()
def _build_task_description(self, test_case: TestCase) -> str:
"""构建任务描述"""
parts = [
f"执行测试: {test_case.name}",
"",
"前置条件:",
]
for precond in test_case.preconditions:
parts.append(f" - {precond}")
parts.extend(["", "测试步骤:"])
for i, step in enumerate(test_case.steps, 1):
parts.append(f" {i}. {step}")
parts.extend(["", "预期结果:"])
for result in test_case.expected_results:
parts.append(f" - {result}")
parts.extend([
"",
"执行要求:",
" - 严格按照步骤执行",
" - 验证每个预期结果",
" - 发现问题立即停止并报告",
" - 返回详细的执行报告",
])
return "\n".join(parts)
def _generate_suite_report(
self,
suite: TestSuite,
results: List[Dict]
) -> Dict:
"""生成套件报告"""
total = len(results)
passed = sum(1 for r in results if r["status"] == "passed")
failed = sum(1 for r in results if r["status"] == "failed")
errors = sum(1 for r in results if r["status"] == "error")
total_duration = sum(r["duration"] for r in results)
return {
"suite_name": suite.name,
"timestamp": datetime.now().isoformat(),
"summary": {
"total": total,
"passed": passed,
"failed": failed,
"errors": errors,
"pass_rate": passed / total if total > 0 else 0,
"total_duration": total_duration,
},
"results": results,
}
# ============================================================
# 3. 报告生成器
# ============================================================
class ReportGenerator:
"""
报告生成器
生成多种格式的测试报告
"""
@staticmethod
def generate_html_report(report: Dict, output_path: str):
"""
生成HTML报告
Args:
report: 测试报告数据
output_path: 输出文件路径
"""
summary = report["summary"]
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>测试报告 - {report['suite_name']}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: #333; color: white; padding: 20px; }}
.summary {{ display: flex; gap: 20px; margin: 20px 0; }}
.metric {{ background: #f0f0f0; padding: 15px; border-radius: 5px; }}
.passed {{ color: green; }}
.failed {{ color: red; }}
.error {{ color: orange; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 20px; }}
th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
th {{ background: #4CAF50; color: white; }}
tr:nth-child(even) {{ background: #f2f2f2; }}
</style>
</head>
<body>
<div class="header">
<h1>测试报告: {report['suite_name']}</h1>
<p>生成时间: {report['timestamp']}</p>
</div>
<div class="summary">
<div class="metric">
<h3>总用例数</h3>
<p>{summary['total']}</p>
</div>
<div class="metric">
<h3>通过</h3>
<p class="passed">{summary['passed']} ✅</p>
</div>
<div class="metric">
<h3>失败</h3>
<p class="failed">{summary['failed']} ❌</p>
</div>
<div class="metric">
<h3>错误</h3>
<p class="error">{summary['errors']} ⚠️</p>
</div>
<div class="metric">
<h3>通过率</h3>
<p>{summary['pass_rate']*100:.1f}%</p>
</div>
<div class="metric">
<h3>总耗时</h3>
<p>{summary['total_duration']:.2f}s</p>
</div>
</div>
<table>
<tr>
<th>测试ID</th>
<th>测试名称</th>
<th>状态</th>
<th>耗时</th>
<th>详情</th>
</tr>
"""
for result in report["results"]:
status_class = result["status"]
html += f"""
<tr>
<td>{result['test_id']}</td>
<td>{result['test_name']}</td>
<td class="{status_class}">{result['status'].upper()}</td>
<td>{result['duration']:.2f}s</td>
<td>{result.get('errors', ['-'])[0] if result.get('errors') else '-'}</td>
</tr>
"""
html += """
</table>
</body>
</html>
"""
with open(output_path, 'w', encoding='utf-8') as f:
f.write(html)
print(f"\n📊 HTML报告已生成: {output_path}")
@staticmethod
def generate_json_report(report: Dict, output_path: str):
"""生成JSON报告"""
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"📄 JSON报告已生成: {output_path}")
# ============================================================
# 4. 配置加载器
# ============================================================
class ConfigLoader:
"""
配置加载器
从YAML文件加载测试配置
"""
@staticmethod
def load_test_suite(config_path: str) -> TestSuite:
"""
加载测试套件配置
Args:
config_path: 配置文件路径
Returns:
TestSuite: 测试套件
"""
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 解析测试用例
test_cases = []
for tc_config in config.get("test_cases", []):
test_cases.append(TestCase(**tc_config))
return TestSuite(
name=config["name"],
description=config.get("description", ""),
test_cases=test_cases,
setup=config.get("setup"),
teardown=config.get("teardown"),
)
# ============================================================
# 5. 使用示例
# ============================================================
async def run_test_platform():
"""
运行测试平台示例
"""
# 初始化LLM
llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
# 创建测试套件(代码方式)
test_suite = TestSuite(
name="电商网站功能测试",
description="测试电商网站的核心功能",
setup="访问 https://ecommerce-demo.com 并确保页面加载完成",
test_cases=[
TestCase(
id="TC001",
name="用户登录功能测试",
description="验证用户能够正常登录系统",
preconditions=["用户已注册","测试环境可用"],
steps=[
"访问登录页面",
"输入有效的用户名",
"输入正确的密码",
"点击登录按钮",
"验证登录成功",
],
expected_results=[
"页面跳转到用户中心",
"显示欢迎消息",
"用户信息正确显示",
],
priority="high",
tags=["login", "authentication"],
),
TestCase(
id="TC002",
name="商品搜索功能测试",
description="验证商品搜索功能正常",
preconditions=["用户已登录"],
steps=[
"在搜索框输入关键词'手机'",
"点击搜索按钮",
"等待搜索结果加载",
],
expected_results=[
"显示搜索结果列表",
"结果中包含手机相关商品",
"分页功能正常",
],
priority="high",
tags=["search", "product"],
),
],
teardown="登出用户并关闭浏览器",
)
# 创建执行引擎
engine = TestExecutionEngine(llm)
# 执行测试套件
report = await engine.execute_test_suite(test_suite)
# 生成报告
output_dir = Path("test_reports")
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
ReportGenerator.generate_html_report(
report,
str(output_dir / f"report_{timestamp}.html")
)
ReportGenerator.generate_json_report(
report,
str(output_dir / f"report_{timestamp}.json")
)
# 打印摘要
print(f"\n{'='*70}")
print("测试执行完成!")
print(f"总用例: {report['summary']['total']}")
print(f"通过: {report['summary']['passed']}")
print(f"失败: {report['summary']['failed']}")
print(f"错误: {report['summary']['errors']}")
print(f"通过率: {report['summary']['pass_rate']*100:.1f}%")
print(f"{'='*70}")
if __name__ == "__main__":
asyncio.run(run_test_platform())
10.2 项目模板与脚手架
# projects/project-template/config/test_suite.yaml
# 测试套件配置文件示例
name: "网站功能测试套件"
description: "完整的网站功能回归测试"
setup: |
访问测试网站 https://demo.example.com
确保页面完全加载
清除浏览器缓存和Cookie
teardown: |
清理测试数据
登出所有用户
关闭浏览器
test_cases:
- id: "TC001"
name: "首页加载测试"
description: "验证首页能够正常加载"
preconditions:
- 网络连接正常
- 测试环境可用
steps:
- 访问首页
- 等待页面加载完成
- 检查关键元素是否存在
expected_results:
- 页面状态码为200
- 加载时间小于3秒
- Logo正确显示
priority: "high"
tags: ["homepage", "performance"]
- id: "TC002"
name: "用户注册流程测试"
description: "验证新用户能够成功注册"
preconditions:
- 使用未注册的邮箱
steps:
- 点击注册按钮
- 填写注册表单
- 提交注册
- 验证邮箱
expected_results:
- 注册成功提示
- 收到验证邮件
- 能够使用新账户登录
priority: "high"
tags: ["registration", "user"]
- id: "TC003"
name: "购物车功能测试"
description: "验证购物车功能正常"
preconditions:
- 用户已登录
steps:
- 浏览商品列表
- 添加商品到购物车
- 查看购物车
- 修改商品数量
expected_results:
- 商品正确添加到购物车
- 价格计算正确
- 数量修改生效
priority: "medium"
tags: ["cart", "ecommerce"]
# projects/project-template/main.py
"""
Browser-use 项目模板
快速启动新的自动化测试项目
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
import os
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# ============================================================
# 配置区域
# ============================================================
class Config:
"""项目配置"""
# LLM配置
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LLM_MODEL = "gpt-4o"
LLM_TEMPERATURE = 0.1
# 浏览器配置
HEADLESS = True
SLOW_MO = 0
# 执行配置
MAX_STEPS = 100
MAX_FAILURES = 3
# ============================================================
# 初始化
# ============================================================
def init_llm():
"""初始化LLM"""
return ChatOpenAI(
model=Config.LLM_MODEL,
temperature=Config.LLM_TEMPERATURE,
api_key=Config.OPENAI_API_KEY,
)
def init_browser():
"""初始化浏览器"""
return Browser(config=BrowserConfig(
headless=Config.HEADLESS,
slow_mo=Config.SLOW_MO,
))
# ============================================================
# 任务定义
# ============================================================
SAMPLE_TASK = """
任务:访问示例网站并获取信息
步骤:
1. 访问 https://example.com
2. 获取页面标题
3. 提取页面主要内容的摘要
4. 返回结果
输出格式:
返回包含标题和摘要的JSON对象
"""
# ============================================================
# 主函数
# ============================================================
async def main():
"""主函数"""
print("🚀 启动 Browser-use 自动化任务")
# 初始化
llm = init_llm()
browser = init_browser()
try:
# 创建Agent
agent = Agent(
task=SAMPLE_TASK,
llm=llm,
browser=browser,
max_steps=Config.MAX_STEPS,
max_failures=Config.MAX_FAILURES,
)
# 执行任务
print("⏳ 正在执行任务...")
result = await agent.run()
# 处理结果
if result.is_successful():
print("\n✅ 任务执行成功!")
print(f"结果: {result.final_result()}")
else:
print("\n❌ 任务执行失败")
print(f"错误: {result.errors()}")
return result
finally:
# 清理资源
await browser.close()
print("\n👋 任务结束,资源已清理")
# ============================================================
# 入口点
# ============================================================
if __name__ == "__main__":
asyncio.run(main())
# projects/project-template/requirements.txt
# 项目依赖
browser-use>=0.1.40
playwright>=1.40.0
langchain-openai>=0.1.0
python-dotenv>=1.0.0
pydantic>=2.0.0
pyyaml>=6.0.0
# projects/project-template/.env.example
# 环境变量模板
# 复制此文件为 .env 并填入实际值
# OpenAI API配置
OPENAI_API_KEY=your-openai-api-key-here
# Azure OpenAI配置(可选)
AZURE_OPENAI_API_KEY=your-azure-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-02-01
# Anthropic配置(可选)
ANTHROPIC_API_KEY=your-anthropic-key
# 测试站点凭证(可选)
TEST_SITE_USERNAME=testuser
TEST_SITE_PASSWORD=testpass
# projects/project-template/README.md
# Browser-use 项目模板
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
playwright install
2. 配置环境变量
cp .env.example .env
# 编辑 .env 文件,填入你的API密钥
3. 运行示例
python main.py
项目结构
.
├── config/ # 配置文件
│ └── test_suite.yaml # 测试套件配置
├── main.py # 主入口
├── requirements.txt # 依赖
├── .env.example # 环境变量模板
└── README.md # 本文件
自定义任务
编辑 main.py 中的 SAMPLE_TASK 变量,定义你的自动化任务。
高级用法
参考 config/test_suite.yaml 了解如何配置完整的测试套件。
---
## 附录:常见问题与解决方案
### Q1: 如何处理页面加载超时?
```python
# 增加超时时间
agent = Agent(
task=task,
llm=llm,
max_steps=100, # 增加最大步数
)
# 在任务描述中明确等待要求
task = """
访问页面后,等待以下元素加载完成:
- 主内容区域
- 导航菜单
- 页脚信息
如果元素未立即出现,等待最多10秒。
"""
Q2: 如何处理动态加载的内容?
task = """
抓取动态加载的商品列表:
1. 访问商品页面
2. 滚动页面触发懒加载
3. 等待新内容加载(观察加载指示器消失)
4. 重复滚动直到没有新内容
5. 抓取所有已加载的商品信息
"""
Q3: 如何降低API调用成本?
# 1. 使用较小的模型
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)
# 2. 限制任务复杂度
task = """
简洁的任务描述,避免冗余信息
明确指定步骤和预期结果
"""
# 3. 减少max_steps
agent = Agent(
task=task,
llm=llm,
max_steps=30, # 限制步数
)
Q4: 如何提高执行稳定性?
# 1. 使用重试机制
async def robust_execute(task, llm, max_retries=3):
for i in range(max_retries):
try:
agent = Agent(task=task, llm=llm)
result = await agent.run()
if result.is_successful():
return result
except Exception as e:
if i == max_retries - 1:
raise
await asyncio.sleep(2 ** i) # 指数退避
# 2. 使用自定义控制器添加稳定性检查
controller = Controller()
@controller.action("等待页面稳定")
async def wait_for_stable_page(browser):
# 实现等待逻辑
await asyncio.sleep(2)
return ActionResult(extracted_content="页面已稳定")
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)