第1章:Browser-use 核心概念与架构

1.1 Browser-use 是什么

Browser-use 是一个开源的 Python 库,旨在通过集成大型语言模型(LLM)实现智能浏览器自动化。它允许开发者使用自然语言描述任务,让 AI 代理自主完成复杂的网页交互操作。

核心定位

  • 不是简单的浏览器控制工具,而是AI 驱动的智能代理框架
  • 底层基于 Playwright,但屏蔽了底层复杂性
  • 通过 LLM 实现"理解-决策-执行"的闭环

GitHub 数据(截至 2025年):

  • ⭐ Stars: 60K+
  • 🍴 Forks: 6K+
  • 主要贡献者:gregpr07 及社区团队

1.2 核心架构解析

Browser-use 采用三层架构模型

外部服务

Browser 层 - 浏览器交互

Controller 层 - 动作执行

Agent 层 - 决策中心

用户层

发送上下文
接收决策

自然语言任务描述

Agent 类
大脑/指挥官

MessageManager
消息管理

SystemPrompt
系统提示词

Controller 类
动作注册与分发

Action Registry
动作注册表

Browser 类
浏览器实例

BrowserContext
上下文管理

Playwright
底层驱动

LLM API
GPT-4/Claude等

目标网页

各层职责详解

层级 核心类 职责
Agent 层 Agent 任务协调、决策循环、LLM 交互
Controller 层 Controller 动作注册、参数验证、执行分发
Browser 层 Browser, BrowserContext 浏览器生命周期、页面状态管理

1.3 与传统自动化工具的对比

Browser-use

开发者描述
任务目标

LLM 理解意图
动态决策

自适应调整
容错执行

传统工具 (Selenium/Playwright)

开发者编写
详细操作代码

按步骤执行
固定流程

遇到变化
直接失败

详细对比表

特性 Selenium Playwright Browser-use
编程方式 命令式代码 命令式代码 自然语言 + 代码
学习曲线 陡峭 中等 平缓
自适应能力 强(LLM驱动)
维护成本 高(页面变化需重写) 低(自然语言描述稳定)
复杂任务 需大量代码 需大量代码 简洁描述即可
执行可控性 完全可控 完全可控 需设计约束
适用场景 稳定页面测试 现代 Web 应用 动态/复杂交互场景

1.4 适用场景与核心优势

最佳适用场景

  1. 动态内容处理:页面结构频繁变化,传统选择器易失效
  2. 复杂多步骤任务:需要推理和决策的流程(如比价、筛选)
  3. 自然语言交互测试:验证系统对用户意图的理解
  4. 快速原型验证:无需编写大量代码即可验证自动化可行性

核心优势

Browser-use
核心优势

智能化

LLM 驱动决策

自适应页面变化

容错执行能力

易用性

自然语言描述

简洁 API 设计

快速上手

扩展性

自定义 Action

多 LLM 支持

与现有框架集成

可靠性

基于 Playwright

成熟底层技术

活跃社区支持


第2章:环境搭建与基础配置

2.1 系统要求

硬性要求

  • Python: 3.11 或更高版本(必需,因使用新类型注解特性)
  • 操作系统: Windows 10+/macOS 10.15+/Linux (Ubuntu 20.04+)
  • 内存: 建议 8GB+(LLM 推理需要)
  • 网络: 可访问 LLM API 服务

验证 Python 版本

# 检查 Python 版本
python --version
# 或
python3 --version

# 输出应为 3.11.x 或更高

2.2 安装与依赖管理

基础安装

# 创建虚拟环境(推荐)
python -m venv browser-use-env

# 激活虚拟环境
# Windows:
browser-use-env\Scripts\activate
# macOS/Linux:
source browser-use-env/bin/activate

# 安装 browser-use
pip install browser-use

# 安装 Playwright 浏览器(必需)
playwright install

# 安装特定浏览器(可选,节省空间)
playwright install chromium
# playwright install firefox
# playwright install webkit

完整依赖安装(生产环境)

# 创建 requirements.txt
cat > requirements.txt << EOF
browser-use>=0.1.40
playwright>=1.40.0
openai>=1.0.0
python-dotenv>=1.0.0
pydantic>=2.0.0
pytest>=7.0.0
pytest-asyncio>=0.21.0
aiofiles>=23.0.0
EOF

# 安装依赖
pip install -r requirements.txt

验证安装

# test_installation.py
"""
验证 browser-use 安装是否成功
"""
import asyncio
from browser_use import Agent, Browser, BrowserConfig

async def test_basic():
    """基础功能测试"""
    try:
        # 创建浏览器实例
        browser = Browser(config=BrowserConfig(headless=True))
        print("✅ Browser 实例创建成功")
        
        # 验证 Playwright 浏览器已安装
        from playwright.sync_api import sync_playwright
        with sync_playwright() as p:
            browser_check = p.chromium.launch()
            browser_check.close()
        print("✅ Playwright 浏览器检查通过")
        
        await browser.close()
        print("✅ 所有检查通过,环境配置正确!")
        return True
        
    except Exception as e:
        print(f"❌ 检查失败: {e}")
        return False

if __name__ == "__main__":
    asyncio.run(test_basic())

2.3 LLM 接入配置

Browser-use 支持多种 LLM 提供商,以下是详细配置方法:

2.3.1 OpenAI 配置
# config/openai_config.py
"""
OpenAI LLM 配置示例
支持 GPT-4、GPT-4o、GPT-3.5-turbo 等模型
"""
import os
from langchain_openai import ChatOpenAI
from browser_use import Agent

# 方式1:环境变量配置(推荐,安全)
# 在 .env 文件中设置:
# OPENAI_API_KEY=sk-your-api-key-here

# 方式2:代码中配置(仅测试使用)
os.environ["OPENAI_API_KEY"] = "sk-your-api-key-here"

# 创建 LLM 实例
llm = ChatOpenAI(
    model="gpt-4o",           # 模型名称
    temperature=0.1,          # 温度参数,控制创造性(0-1)
    max_tokens=4096,          # 最大输出token数
    timeout=60,               # API 调用超时时间(秒)
    max_retries=3,            # 失败重试次数
)

# 使用示例
async def create_agent_with_openai():
    """使用 OpenAI LLM 创建 Agent"""
    agent = Agent(
        task="访问 example.com 并获取页面标题",
        llm=llm,
    )
    return agent
2.3.2 Azure OpenAI 配置
# config/azure_config.py
"""
Azure OpenAI 配置示例
适用于企业级部署场景
"""
import os
from langchain_openai import AzureChatOpenAI

# 设置 Azure 环境变量
os.environ["AZURE_OPENAI_API_KEY"] = "your-azure-api-key"
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://your-resource.openai.azure.com/"
os.environ["AZURE_OPENAI_API_VERSION"] = "2024-02-01"

# 创建 Azure LLM 实例
azure_llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",      # Azure 部署名称
    model="gpt-4o",                  # 模型名称
    temperature=0.1,
    max_tokens=4096,
    timeout=60,
)

# 使用 Azure LLM 创建 Agent
async def create_azure_agent():
    """使用 Azure OpenAI 创建 Agent"""
    from browser_use import Agent
    
    agent = Agent(
        task="在 Azure 环境中执行自动化任务",
        llm=azure_llm,
    )
    return agent
2.3.3 Claude (Anthropic) 配置
# config/claude_config.py
"""
Anthropic Claude 配置示例
Claude 在长文本理解和复杂推理方面表现优异
"""
import os
from langchain_anthropic import ChatAnthropic

# 设置 Anthropic API Key
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-your-api-key"

# 创建 Claude LLM 实例
claude_llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",  # Claude 3.5 Sonnet
    temperature=0.1,
    max_tokens=4096,
    timeout=60,
    max_retries=3,
)

# Claude 特别适合复杂的多步骤任务
async def create_claude_agent():
    """使用 Claude 创建 Agent"""
    from browser_use import Agent
    
    agent = Agent(
        task="""
        完成以下复杂任务:
        1. 访问一个电商网站
        2. 搜索"无线耳机"
        3. 按价格从低到高排序
        4. 选择评分最高的商品
        5. 获取商品详细信息
        """,
        llm=claude_llm,
    )
    return agent
2.3.4 本地模型配置(Ollama)
# config/local_llm_config.py
"""
本地 LLM 配置示例(使用 Ollama)
适用于数据隐私要求高的场景
"""
from langchain_ollama import ChatOllama

# 创建本地 LLM 实例
local_llm = ChatOllama(
    model="llama3.2",           # 本地模型名称
    temperature=0.1,
    base_url="http://localhost:11434",  # Ollama 服务地址
)

# 使用本地模型创建 Agent
async def create_local_agent():
    """使用本地 LLM 创建 Agent"""
    from browser_use import Agent
    
    agent = Agent(
        task="执行本地自动化测试任务",
        llm=local_llm,
    )
    return agent
2.3.5 统一配置管理(推荐)
# config/llm_factory.py
"""
LLM 工厂类 - 统一管理不同提供商的配置
"""
import os
from typing import Optional
from langchain_openai import ChatOpenAI, AzureChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_ollama import ChatOllama

class LLMFactory:
    """
    LLM 工厂类
    根据配置创建对应的 LLM 实例
    """
    
    @staticmethod
    def create(
        provider: str = "openai",
        model: Optional[str] = None,
        temperature: float = 0.1,
        **kwargs
    ):
        """
        创建 LLM 实例
        
        Args:
            provider: 提供商名称 (openai/azure/anthropic/ollama)
            model: 模型名称
            temperature: 温度参数
            **kwargs: 其他参数
            
        Returns:
            BaseChatModel: LLM 实例
        """
        providers = {
            "openai": LLMFactory._create_openai,
            "azure": LLMFactory._create_azure,
            "anthropic": LLMFactory._create_anthropic,
            "ollama": LLMFactory._create_ollama,
        }
        
        if provider not in providers:
            raise ValueError(f"不支持的提供商: {provider}")
        
        return providers[provider](model, temperature, **kwargs)
    
    @staticmethod
    def _create_openai(model, temperature, **kwargs):
        """创建 OpenAI LLM"""
        return ChatOpenAI(
            model=model or "gpt-4o",
            temperature=temperature,
            max_retries=kwargs.get("max_retries", 3),
        )
    
    @staticmethod
    def _create_azure(model, temperature, **kwargs):
        """创建 Azure OpenAI LLM"""
        return AzureChatOpenAI(
            azure_deployment=model or "gpt-4o",
            temperature=temperature,
        )
    
    @staticmethod
    def _create_anthropic(model, temperature, **kwargs):
        """创建 Anthropic LLM"""
        return ChatAnthropic(
            model=model or "claude-3-5-sonnet-20241022",
            temperature=temperature,
        )
    
    @staticmethod
    def _create_ollama(model, temperature, **kwargs):
        """创建 Ollama 本地 LLM"""
        return ChatOllama(
            model=model or "llama3.2",
            temperature=temperature,
            base_url=kwargs.get("base_url", "http://localhost:11434"),
        )

# 使用示例
# llm = LLMFactory.create(provider="openai", model="gpt-4o")
# llm = LLMFactory.create(provider="anthropic", temperature=0.2)

第3章:核心类与参数详解

3.1 Agent 类深度解析

Agent 是 Browser-use 的核心类,负责协调整个自动化流程。

class Agent:
    """
    Agent 类 - 智能代理的核心实现
    
    职责:
    1. 管理任务执行生命周期
    2. 与 LLM 交互进行决策
    3. 调用 Controller 执行动作
    4. 处理执行结果和错误
    """
    
    def __init__(
        self,
        # ------------------- 必需参数 -------------------
        task: str,                           # 任务描述(自然语言)
        llm: BaseChatModel,                 # LLM 实例
        
        # ------------------- 可选参数 -------------------
        browser: Browser | None = None,      # 浏览器实例
        browser_context: BrowserContext | None = None,  # 浏览器上下文
        controller: Controller | None = None,  # 控制器实例
        
        # ------------------- 执行控制参数 -------------------
        max_steps: int = 100,                # 最大执行步数
        max_actions_per_step: int = 10,      # 每步最大动作数
        max_failures: int = 3,               # 最大失败次数
        retry_delay: int = 10,               # 重试延迟(秒)
        
        # ------------------- 系统配置 -------------------
        system_prompt: SystemMessage | None = None,  # 自定义系统提示词
        use_vision: bool = True,              # 是否启用视觉模式
        
        # ------------------- 高级参数 -------------------
        generate_gif: bool | str | None = None,  # 生成执行过程GIF
        available_file_paths: list[str] | None = None,  # 可用文件路径
        include_attributes: list[str] | None = None,    # 包含的DOM属性
    ):

参数详解表

参数 类型 默认值 说明
task str 必需 自然语言描述的任务目标
llm BaseChatModel 必需 LangChain 格式的 LLM 实例
browser Browser None 浏览器实例,None则自动创建
browser_context BrowserContext None 浏览器上下文,用于保持会话
controller Controller None 自定义控制器实例
max_steps int 100 最大执行步数,防止无限循环
max_actions_per_step int 10 每步最大动作数,防止LLM生成过多动作
max_failures int 3 连续失败次数上限,超过则终止
retry_delay int 10 失败后重试前的等待时间
system_prompt SystemMessage None 自定义系统提示词,控制Agent行为
use_vision bool True 是否发送页面截图给LLM
generate_gif bool/str None 是否生成执行过程GIF记录

3.2 Browser 与 BrowserContext 类

class Browser:
    """
    Browser 类 - 浏览器实例管理
    
    职责:
    1. 管理 Playwright 浏览器实例
    2. 提供浏览器级别的配置
    3. 创建和管理 BrowserContext
    """
    
    def __init__(
        self,
        config: BrowserConfig = BrowserConfig(),  # 浏览器配置
    ):

class BrowserContext:
    """
    BrowserContext 类 - 浏览器上下文管理
    
    职责:
    1. 管理独立的浏览会话(Cookie、缓存、存储)
    2. 提供页面级别的操作
    3. 维护当前页面状态
    """
    
    def __init__(
        self,
        browser: Browser,                          # 父浏览器实例
        config: BrowserContextConfig | None = None,  # 上下文配置
    ):

BrowserConfig 配置详解

@dataclass
class BrowserConfig:
    """
    浏览器配置类
    
    控制浏览器实例的行为和特性
    """
    
    # ------------------- 显示模式 -------------------
    headless: bool = False           # 是否无头模式(不显示界面)
                                     # True: 后台运行,适合生产环境
                                     # False: 显示界面,适合调试
    
    # ------------------- 窗口配置 -------------------
    extra_chromium_args: list[str] = field(default_factory=list)
                                     # 额外的 Chromium 启动参数
                                     # 例如: ["--disable-gpu", "--no-sandbox"]
    
    # ------------------- 浏览器类型 -------------------
    browser_type: str = "chromium"   # 浏览器类型
                                     # 可选: "chromium", "firefox", "webkit"
    
    # ------------------- 连接配置 -------------------
    _force_keep_browser_alive: bool = False
                                     # 是否保持浏览器存活
                                     # 用于调试,防止自动关闭

BrowserContextConfig 配置详解

@dataclass
class BrowserContextConfig:
    """
    浏览器上下文配置类
    
    控制单个浏览会话的行为
    """
    
    # ------------------- 视口配置 -------------------
    viewport: dict = field(default_factory=lambda: {
        "width": 1280,
        "height": 720
    })                               # 浏览器窗口大小
    
    # ------------------- 地理位置 -------------------
    geolocation: dict | None = None  # 地理位置信息
                                     # 例如: {"latitude": 39.9042, "longitude": 116.4074}
    
    # ------------------- 区域设置 -------------------
    locale: str = "zh-CN"            # 区域设置,影响语言和格式
    timezone_id: str = "Asia/Shanghai"  # 时区设置
    
    # ------------------- 设备模拟 -------------------
    user_agent: str = ""             # 自定义 User-Agent
    device_scale_factor: float = 1.0 # 设备缩放比例
    is_mobile: bool = False          # 是否模拟移动设备
    has_touch: bool = False          # 是否支持触摸事件
    
    # ------------------- 权限配置 -------------------
    permissions: list[str] = field(default_factory=list)
                                     # 权限列表
                                     # 例如: ["geolocation", "notifications"]
    
    # ------------------- 存储配置 -------------------
    storage_state: str | None = None # 存储状态文件路径
                                     # 用于恢复登录状态等

3.3 Controller 类与自定义动作

class Controller:
    """
    Controller 类 - 动作注册与执行中心
    
    职责:
    1. 注册和管理可执行动作
    2. 验证动作参数
    3. 分发动作执行
    4. 处理执行结果
    """
    
    def __init__(
        self,
        exclude_actions: list[str] | None = None,  # 排除的动作列表
        output_model: type[BaseModel] | None = None,  # 输出数据模型
    ):

核心方法

方法 说明 使用场景
action() 装饰器,注册自定义动作 扩展 Agent 能力
register_action() 程序化注册动作 动态添加动作
execute_action() 执行指定动作 手动调用动作

3.4 完整配置示例

# examples/complete_configuration.py
"""
Browser-use 完整配置示例
展示所有核心类的配置方式
"""
from browser_use import Agent, Browser, BrowserConfig, BrowserContextConfig, Controller
from langchain_openai import ChatOpenAI
import asyncio

# ============================================================
# 1. 创建 LLM 实例
# ============================================================

llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0.1,
    max_tokens=4096,
    max_retries=3,
)

# ============================================================
# 2. 配置浏览器
# ============================================================

browser_config = BrowserConfig(
    headless=True,                           # 无头模式
    extra_chromium_args=[                    # 额外启动参数
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
    ],
    browser_type="chromium",                  # 使用 Chromium
)

browser = Browser(config=browser_config)

# ============================================================
# 3. 配置浏览器上下文
# ============================================================

context_config = BrowserContextConfig(
    viewport={"width": 1920, "height": 1080},  # 全高清分辨率
    locale="zh-CN",                           # 中文环境
    timezone_id="Asia/Shanghai",              # 上海时区
    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
)

# ============================================================
# 4. 配置控制器
# ============================================================

controller = Controller(
    exclude_actions=["open_tab"],  # 排除特定动作
)

# ============================================================
# 5. 创建完整配置的 Agent
# ============================================================

async def create_fully_configured_agent():
    """创建完整配置的 Agent"""
    
    agent = Agent(
        # ------------------- 必需参数 -------------------
        task="完成指定的浏览器自动化任务",        # str: 任务描述
        llm=llm,                                  # BaseChatModel: LLM 实例
        
        # ------------------- 浏览器配置 -------------------
        browser=browser,                          # Browser: 自定义浏览器实例
        
        # ------------------- 控制器配置 -------------------
        controller=controller,                    # Controller: 自定义控制器
        
        # ------------------- 执行控制参数 -------------------
        max_steps=100,                            # int: 最大执行步数,默认 100
        max_actions_per_step=10,                  # int: 每步最大动作数,默认 10
        max_failures=3,                           # int: 最大失败次数,默认 3
        retry_delay=10,                           # int: 重试延迟(秒),默认 10
        
        # ------------------- 系统配置 -------------------
        use_vision=True,                          # bool: 启用视觉模式,默认 True
        
        # ------------------- 高级参数 -------------------
        generate_gif=False,                       # bool/str: 生成GIF,默认 False
    )
    
    return agent

# ============================================================
# 6. 执行示例
# ============================================================

async def main():
    """主函数"""
    agent = await create_fully_configured_agent()
    
    # 修改任务
    agent.task = """
    访问 https://example.com 并完成以下操作:
    1. 获取页面标题
    2. 提取所有链接
    3. 返回结果
    """
    
    # 执行任务
    result = await agent.run()
    
    # 处理结果
    if result.is_successful():
        print(f"✅ 任务成功: {result.final_result()}")
    else:
        print(f"❌ 任务失败: {result.errors()}")
    
    # 清理资源
    await browser.close()

if __name__ == "__main__":
    asyncio.run(main())

第4章:内置动作方法与使用

4.1 核心动作方法详解

Browser-use 提供了丰富的内置动作,以下是完整列表:

内置动作

导航类

go_to_url

go_back

go_forward

refresh

交互类

click_element

input_text

scroll_down

scroll_up

scroll_to_text

表单类

select_option

check_checkbox

switch_to_frame

标签管理

switch_to_tab

open_tab

close_tab

信息获取

extract_content

get_page_html

find_element

系统类

wait

done

动作详细说明表

动作 功能 参数 返回值
go_to_url 访问指定URL url: str 页面加载结果
click_element 点击元素 index: int 点击结果
input_text 输入文本 index: int, text: str 输入结果
scroll_down 向下滚动 scroll_amount: int 滚动结果
scroll_up 向上滚动 scroll_amount: int 滚动结果
select_option 选择下拉选项 index: int, text: str 选择结果
check_checkbox 勾选/取消复选框 index: int 操作结果
switch_to_tab 切换标签页 page_id: int 切换结果
open_tab 打开新标签页 url: str 新页面
close_tab 关闭标签页 - 关闭结果
extract_content 提取页面内容 - 提取的文本
wait 等待指定时间 seconds: int -
done 标记任务完成 text: str 最终结果

4.2 动作使用示例

# examples/action_usage.py
"""
内置动作使用示例
展示各种内置动作的实际使用
"""
from browser_use import Agent, Controller, ActionResult
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 基础导航动作
# ============================================================

async def navigation_actions():
    """导航动作示例"""
    
    task = """
    导航操作演示:
    
    1. 访问 https://example.com
    2. 等待2秒
    3. 点击页面上的第一个链接
    4. 等待新页面加载
    5. 返回上一页
    6. 刷新页面
    7. 完成操作
    
    记录每个步骤的结果。
    """
    
    agent = Agent(task=task, llm=llm)
    result = await agent.run()
    return result

# ============================================================
# 2. 表单交互动作
# ============================================================

async def form_interaction():
    """表单交互示例"""
    
    task = """
    表单填写演示:
    
    1. 访问 https://forms.example.com
    2. 在姓名字段输入"张三"
    3. 在邮箱字段输入"zhangsan@example.com"
    4. 从下拉菜单选择"北京"
    5. 勾选"同意服务条款"复选框
    6. 点击提交按钮
    7. 验证提交成功
    """
    
    agent = Agent(task=task, llm=llm, max_steps=50)
    result = await agent.run()
    return result

# ============================================================
# 3. 内容提取动作
# ============================================================

async def content_extraction():
    """内容提取示例"""
    
    task = """
    内容提取演示:
    
    1. 访问 https://news.example.com
    2. 提取页面标题
    3. 提取所有文章标题和链接
    4. 滚动页面加载更多内容
    5. 继续提取新加载的文章
    6. 返回JSON格式的结果:
       {
         "page_title": "...",
         "articles": [
           {"title": "...", "link": "..."}
         ]
       }
    """
    
    agent = Agent(task=task, llm=llm, max_steps=80)
    result = await agent.run()
    return result

# ============================================================
# 4. 多标签页管理
# ============================================================

async def tab_management():
    """标签页管理示例"""
    
    task = """
    多标签页操作演示:
    
    1. 访问 https://site1.com
    2. 在新标签页打开 https://site2.com
    3. 在新标签页打开 https://site3.com
    4. 切换到第2个标签页,提取标题
    5. 切换到第3个标签页,提取标题
    6. 关闭第3个标签页
    7. 返回第1个标签页
    8. 汇总所有访问过的页面标题
    """
    
    agent = Agent(task=task, llm=llm, max_steps=60)
    result = await agent.run()
    return result

# ============================================================
# 5. 复杂交互流程
# ============================================================

async def complex_interaction():
    """复杂交互示例"""
    
    task = """
    复杂交互演示(电商购物流程):
    
    1. 访问 https://shop.example.com
    2. 在搜索框输入"无线耳机"
    3. 点击搜索按钮
    4. 等待搜索结果加载
    5. 点击第一个商品
    6. 选择颜色为"黑色"
    7. 点击"加入购物车"
    8. 验证购物车数量增加
    9. 进入购物车页面
    10. 点击"去结算"
    11. 确认订单信息
    12. 返回操作结果
    """
    
    agent = Agent(task=task, llm=llm, max_steps=100)
    result = await agent.run()
    return result

# 运行示例
if __name__ == "__main__":
    # 选择要运行的示例
    asyncio.run(navigation_actions())

4.3 动作组合策略

# examples/action_composition.py
"""
动作组合策略
展示如何组合多个动作完成复杂任务
"""
from browser_use import Agent, Controller
from langchain_openai import ChatOpenAI
from typing import List, Dict
import asyncio

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 顺序执行模式
# ============================================================

async def sequential_execution():
    """
    顺序执行模式
    按固定顺序执行一系列操作
    """
    
    steps = [
        "访问登录页面",
        "输入用户名",
        "输入密码",
        "点击登录按钮",
        "验证登录成功",
    ]
    
    task = f"""
    按顺序执行以下操作:
    {chr(10).join(f"{i+1}. {step}" for i, step in enumerate(steps))}
    
    每个步骤完成后验证结果,如果失败则停止并报告。
    """
    
    agent = Agent(task=task, llm=llm)
    return await agent.run()

# ============================================================
# 2. 条件执行模式
# ============================================================

async def conditional_execution():
    """
    条件执行模式
    根据页面状态决定执行路径
    """
    
    task = """
    条件执行演示:
    
    1. 访问 https://conditional.example.com
    2. 检查页面状态:
       - 如果显示登录表单:执行登录流程
       - 如果显示欢迎页面:说明已登录,跳过登录
       - 如果显示错误页面:报告错误并停止
    3. 根据检查结果执行相应操作
    4. 返回执行路径和结果
    """
    
    agent = Agent(task=task, llm=llm, max_steps=50)
    return await agent.run()

# ============================================================
# 3. 循环执行模式
# ============================================================

async def loop_execution():
    """
    循环执行模式
    重复执行直到满足条件
    """
    
    task = """
    循环执行演示(分页数据采集):
    
    1. 访问 https://pagination.example.com
    2. 提取当前页的所有商品信息
    3. 检查是否有"下一页"按钮:
       - 如果有:点击下一页,返回步骤2继续
       - 如果没有:结束采集
    4. 汇总所有采集的数据
    5. 返回JSON格式的完整结果
    
    注意:最多采集10页,防止无限循环。
    """
    
    agent = Agent(task=task, llm=llm, max_steps=150)
    return await agent.run()

# ============================================================
# 4. 错误恢复模式
# ============================================================

async def error_recovery():
    """
    错误恢复模式
    遇到错误时尝试恢复
    """
    
    task = """
    错误恢复演示:
    
    1. 访问 https://unstable.example.com
    2. 尝试点击"加载数据"按钮
    3. 如果加载失败(超时或无响应):
       - 刷新页面
       - 等待3秒
       - 重试点击
       - 最多重试3次
    4. 如果仍然失败,报告错误
    5. 如果成功,继续后续操作
    """
    
    agent = Agent(task=task, llm=llm, max_failures=5)
    return await agent.run()

第5章:LLM 集成与提示工程

5.1 支持的 LLM 提供商与配置方式

Browser-use 通过 LangChain 集成多种 LLM 提供商,以下是完整支持列表:

集成方式

LangChain
ChatModel 接口

支持的 LLM 提供商

OpenAI
GPT-4/GPT-4o

Azure OpenAI
企业级部署

Anthropic
Claude 3 系列

Google
Gemini

本地模型
Ollama/LM Studio

其他
通过 LangChain 扩展

各提供商特点对比

提供商 推荐模型 优势 适用场景
OpenAI GPT-4o 综合能力强,速度快 通用任务
Azure OpenAI GPT-4o 企业合规,SLA保障 企业部署
Anthropic Claude 3.5 Sonnet 推理能力强,上下文长 复杂任务
Google Gemini Pro 多模态能力强 视觉任务
Ollama Llama 3.2 本地运行,隐私保护 敏感数据场景

5.2 系统提示词(System Prompt)定制

系统提示词决定了 Agent 的行为模式,可以通过自定义 SystemPrompt 类实现。

# llm/system_prompt.py
"""
系统提示词定制
通过自定义 SystemPrompt 控制 Agent 行为
"""
from browser_use import Agent
from langchain_core.messages import SystemMessage
from langchain_openai import ChatOpenAI
import asyncio

# ============================================================
# 1. 基础系统提示词
# ============================================================

BASIC_SYSTEM_PROMPT = """你是一个浏览器自动化助手。

你的任务是帮助用户完成网页浏览和交互任务。

规则:
1. 仔细分析当前页面状态
2. 选择最合适的操作
3. 如果操作失败,尝试替代方案
4. 保持简洁,避免不必要的步骤
"""

# ============================================================
# 2. 测试专用系统提示词
# ============================================================

TESTING_SYSTEM_PROMPT = """你是一个专业的自动化测试工程师。

你的职责:
1. 严格按照测试用例执行操作
2. 验证每个步骤的预期结果
3. 详细记录测试过程和结果
4. 发现缺陷时立即停止并报告

操作规范:
- 每次操作后验证页面状态
- 使用断言验证关键元素
- 截图保存关键步骤
- 详细记录错误信息
"""

# ============================================================
# 3. 数据采集专用系统提示词
# ============================================================

SCRAPING_SYSTEM_PROMPT = """你是一个数据采集专家。

你的任务是从网页中提取结构化数据。

规则:
1. 识别数据所在的页面区域
2. 提取完整、准确的数据
3. 处理分页和动态加载
4. 返回格式化的 JSON 数据

注意事项:
- 尊重网站的 robots.txt
- 控制请求频率,避免被封禁
- 处理反爬虫机制
"""

# ============================================================
# 4. 使用自定义系统提示词
# ============================================================

async def agent_with_custom_prompt():
    """
    使用自定义系统提示词创建 Agent
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    
    # 创建自定义系统消息
    system_message = SystemMessage(content=TESTING_SYSTEM_PROMPT)
    
    agent = Agent(
        task="执行登录测试",
        llm=llm,
        system_message=system_message,  # 使用自定义系统提示词
    )
    
    result = await agent.run()
    return result

# ============================================================
# 5. 动态系统提示词
# ============================================================

def create_contextual_prompt(task_type: str, domain: str) -> str:
    """
    根据任务类型和领域创建上下文相关的系统提示词
    
    Args:
        task_type: 任务类型 (testing/scraping/form_filling/navigation)
        domain: 目标网站领域 (ecommerce/finance/social/etc)
        
    Returns:
        str: 定制的系统提示词
    """
    base_prompt = f"""你是一个专业的浏览器自动化助手,专注于 {domain} 领域的 {task_type} 任务。

领域知识:
- 熟悉 {domain} 网站的常见布局和交互模式
- 了解该领域的专业术语和业务流程
- 知晓常见的反爬虫和保护机制

任务规范:
"""
    
    if task_type == "testing":
        base_prompt += """
- 严格验证每个操作的结果
- 详细记录测试步骤和结果
- 发现异常立即停止并报告
"""
    elif task_type == "scraping":
        base_prompt += """
- 提取完整、准确的数据
- 处理分页和动态加载
- 控制请求频率
"""
    elif task_type == "form_filling":
        base_prompt += """
- 准确填写所有必填字段
- 验证输入格式和约束
- 处理表单验证错误
"""
    
    return base_prompt

# 使用动态提示词
async def agent_with_dynamic_prompt():
    """使用动态生成的系统提示词"""
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    
    # 根据任务动态生成提示词
    prompt = create_contextual_prompt(
        task_type="testing",
        domain="ecommerce"
    )
    
    agent = Agent(
        task="测试电商网站购物车功能",
        llm=llm,
        system_message=SystemMessage(content=prompt),
    )
    
    return await agent.run()

5.3 任务描述的最佳实践

任务描述(task)是 Agent 理解需求的唯一途径,编写清晰、准确的任务描述至关重要。

# llm/task_best_practices.py
"""
任务描述最佳实践
如何编写高质量的任务描述
"""
from browser_use import Agent
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 任务描述的结构化模板
# ============================================================

# ❌ 不好的任务描述
bad_task = "去那个网站找点东西"

# ✅ 好的任务描述
good_task = """
任务目标:在 Amazon 上搜索并比较无线耳机

执行步骤:
1. 访问 https://www.amazon.com
2. 在搜索框输入 "wireless headphones"
3. 点击搜索按钮
4. 筛选条件:价格范围 $50-$100,评分 4星以上
5. 选择前 3 个商品,提取以下信息:
   - 商品名称
   - 价格
   - 评分
   - 评论数量
6. 按性价比排序,返回最佳推荐

输出格式:
返回 JSON 格式的比较结果
"""

# ============================================================
# 2. 任务描述的关键要素
# ============================================================

"""
一个高质量的任务描述应包含:

1. 目标(Goal)
   - 明确要完成的任务
   - 说明成功的标准

2. 步骤(Steps)
   - 按顺序列出关键步骤
   - 每个步骤具体明确

3. 约束(Constraints)
   - 时间限制
   - 操作边界
   - 禁止事项

4. 输出(Output)
   - 期望的输出格式
   - 需要包含的字段
   - 数据类型要求
"""

# ============================================================
# 3. 不同类型任务的最佳实践
# ============================================================

# 3.1 表单填写任务
form_task = """
任务:完成用户注册表单

目标:在 example.com 上创建新账户

步骤:
1. 访问 https://example.com/register
2. 填写以下字段:
   - 用户名:testuser2024
   - 邮箱:testuser2024@example.com
   - 密码:SecurePass123!
   - 确认密码:SecurePass123!
3. 勾选"我同意服务条款"复选框
4. 点击"注册"按钮

验证:
- 确认页面跳转到欢迎页面
- 检查是否显示"注册成功"消息

约束:
- 如果用户名已存在,尝试 testuser2024_1, testuser2024_2 等变体
- 密码必须包含大小写字母、数字和特殊字符
"""

# 3.2 数据采集任务
scraping_task = """
任务:采集新闻网站文章列表

目标:从 news.example.com 采集最新 50 篇文章

步骤:
1. 访问 https://news.example.com
2. 滚动页面加载更多文章,直到采集到 50 篇
3. 对每篇文章提取:
   - 标题
   - 发布时间
   - 摘要
   - 链接
   - 分类标签

输出格式:
{
  "articles": [
    {
      "title": "文章标题",
      "publish_time": "2024-01-15 10:30",
      "summary": "文章摘要...",
      "url": "https://...",
      "tags": ["科技", "AI"]
    }
  ],
  "total_count": 50
}

约束:
- 只采集最近 7 天内发布的文章
- 跳过付费/会员专属内容
- 控制请求频率,每滚动一次等待 2 秒
"""

# 3.3 测试验证任务
testing_task = """
任务:验证电商网站结账流程

目标:测试从购物车到订单完成的完整流程

前置条件:
- 已登录测试账户
- 购物车中已有测试商品

测试步骤:
1. 访问购物车页面
2. 点击"去结算"按钮
3. 选择收货地址(使用默认地址)
4. 选择支付方式(货到付款)
5. 确认订单信息
6. 点击"提交订单"

预期结果:
- 页面跳转到订单成功页面
- 显示订单号
- 显示预计送达时间
- 收到订单确认邮件(检查邮件通知)

验证点:
- 订单金额计算正确
- 商品信息准确无误
- 收货地址正确
- 订单状态为"待发货"
"""

# ============================================================
# 4. 任务描述优化技巧
# ============================================================

def optimize_task_description(original_task: str) -> str:
    """
    优化任务描述的辅助函数
    
    Args:
        original_task: 原始任务描述
        
    Returns:
        str: 优化后的任务描述
    """
    optimized = f"""
【任务目标】
{original_task}

【执行策略】
1. 优先使用明确的选择器定位元素
2. 操作失败后尝试替代方案
3. 每个关键步骤后验证结果
4. 遇到问题及时报告,不盲目重试

【输出要求】
- 返回执行结果摘要
- 如有错误,说明错误原因
- 提供关键步骤的截图(如可能)

【注意事项】
- 遵守网站使用条款
- 控制操作频率
- 保护敏感信息
"""
    return optimized

# 使用优化后的任务
async def run_optimized_task():
    """运行优化后的任务"""
    original = "登录系统并查看账户余额"
    optimized = optimize_task_description(original)
    
    agent = Agent(
        task=optimized,
        llm=llm,
        max_steps=30,
    )
    
    return await agent.run()

5.4 多轮对话与上下文管理

Browser-use 支持多轮对话模式,可以在连续的任务中保持上下文。

# llm/multi_turn_conversation.py
"""
多轮对话与上下文管理
实现连续任务执行和状态保持
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 会话状态保持
# ============================================================

async def session_management():
    """
    使用共享浏览器实例保持会话状态
    """
    # 创建共享浏览器实例
    browser = Browser(config=BrowserConfig(headless=True))
    
    try:
        # 第一轮:登录
        login_agent = Agent(
            task="""
            登录系统:
            1. 访问 https://example.com/login
            2. 输入用户名 'testuser'
            3. 输入密码 'testpass'
            4. 点击登录按钮
            5. 验证登录成功
            """,
            llm=llm,
            browser=browser,  # 复用浏览器实例
        )
        login_result = await login_agent.run()
        print(f"登录结果: {login_result.final_result()}")
        
        # 第二轮:登录后操作(保持会话)
        action_agent = Agent(
            task="""
            在登录状态下执行操作:
            1. 访问个人中心
            2. 查看账户信息
            3. 获取账户余额
            """,
            llm=llm,
            browser=browser,  # 复用同一浏览器,保持登录状态
        )
        action_result = await action_agent.run()
        print(f"操作结果: {action_result.final_result()}")
        
        # 第三轮:修改信息
        update_agent = Agent(
            task="""
            修改个人信息:
            1. 进入个人资料页面
            2. 更新手机号码为 '13800138000'
            3. 保存更改
            4. 验证更新成功
            """,
            llm=llm,
            browser=browser,
        )
        update_result = await update_agent.run()
        print(f"更新结果: {update_result.final_result()}")
        
    finally:
        await browser.close()

# ============================================================
# 2. 上下文传递模式
# ============================================================

class ContextualAgent:
    """
    上下文感知 Agent
    支持在多轮对话中传递上下文信息
    """
    
    def __init__(self, llm, browser=None):
        self.llm = llm
        self.browser = browser or Browser(config=BrowserConfig(headless=True))
        self.conversation_history = []
        self.shared_context = {}
    
    async def execute_task(self, task: str, context: dict = None) -> dict:
        """
        执行任务并更新上下文
        
        Args:
            task: 当前任务描述
            context: 额外的上下文信息
            
        Returns:
            dict: 执行结果和更新后的上下文
        """
        # 构建包含上下文的任务描述
        contextual_task = self._build_contextual_task(task, context)
        
        # 创建 Agent 并执行
        agent = Agent(
            task=contextual_task,
            llm=self.llm,
            browser=self.browser,
        )
        
        result = await agent.run()
        
        # 更新历史记录
        self.conversation_history.append({
            "task": task,
            "result": result.final_result(),
        })
        
        # 提取并更新共享上下文
        self._update_shared_context(result)
        
        return {
            "result": result,
            "context": self.shared_context,
            "history": self.conversation_history,
        }
    
    def _build_contextual_task(self, task: str, context: dict) -> str:
        """构建包含上下文的任务描述"""
        contextual_parts = ["【当前任务】", task]
        
        if context:
            contextual_parts.extend(["", "【已知信息】"])
            for key, value in context.items():
                contextual_parts.append(f"- {key}: {value}")
        
        if self.shared_context:
            contextual_parts.extend(["", "【会话上下文】"])
            for key, value in self.shared_context.items():
                contextual_parts.append(f"- {key}: {value}")
        
        return "\n".join(contextual_parts)
    
    def _update_shared_context(self, result):
        """从执行结果中提取并更新上下文"""
        # 这里可以根据实际需求解析结果并提取关键信息
        pass
    
    async def close(self):
        """关闭浏览器"""
        await self.browser.close()

# 使用上下文感知 Agent
async def contextual_conversation():
    """多轮上下文对话示例"""
    agent = ContextualAgent(llm=llm)
    
    try:
        # 第一轮:搜索商品
        result1 = await agent.execute_task(
            task="在电商网站搜索 'iPhone 15'",
            context={"site": "amazon.com"}
        )
        print(f"搜索完成: {result1['result'].final_result()}")
        
        # 第二轮:筛选(使用第一轮的上下文)
        result2 = await agent.execute_task(
            task="筛选结果:只显示 256GB 版本,按价格从低到高排序"
        )
        print(f"筛选完成: {result2['result'].final_result()}")
        
        # 第三轮:选择商品(继承前两轮的上下文)
        result3 = await agent.execute_task(
            task="选择价格最低的商品,获取详细信息"
        )
        print(f"选择完成: {result3['result'].final_result()}")
        
    finally:
        await agent.close()

# ============================================================
# 3. 复杂多步骤工作流
# ============================================================

async def complex_workflow():
    """
    复杂工作流示例:电商完整购物流程
    """
    browser = Browser(config=BrowserConfig(headless=True))
    
    workflow_steps = [
        {
            "name": "登录",
            "task": """
            登录电商网站:
            1. 访问登录页面
            2. 输入用户名 'testuser' 和密码 'testpass'
            3. 完成登录验证
            """,
        },
        {
            "name": "搜索商品",
            "task": """
            搜索并选择商品:
            1. 在搜索框输入 '无线蓝牙耳机'
            2. 点击搜索
            3. 筛选条件:价格 100-300 元,评分 4.5 以上
            4. 选择第一个符合条件的商品
            """,
        },
        {
            "name": "加入购物车",
            "task": """
            将商品加入购物车:
            1. 选择颜色:黑色
            2. 选择数量:2
            3. 点击'加入购物车'按钮
            4. 验证添加成功
            """,
        },
        {
            "name": "结算",
            "task": """
            完成结算流程:
            1. 进入购物车
            2. 确认商品信息
            3. 点击'去结算'
            4. 选择默认收货地址
            5. 选择支付方式:支付宝
            6. 提交订单(不实际支付)
            """,
        },
    ]
    
    results = []
    
    try:
        for step in workflow_steps:
            print(f"\n执行步骤: {step['name']}")
            
            agent = Agent(
                task=step["task"],
                llm=llm,
                browser=browser,
                max_steps=50,
            )
            
            result = await agent.run()
            results.append({
                "step": step["name"],
                "success": result.is_successful(),
                "result": result.final_result(),
            })
            
            if not result.is_successful():
                print(f"步骤 {step['name']} 失败,终止工作流")
                break
                
    finally:
        await browser.close()
    
    return results

5.5 工具调用(Function Calling)机制解析

Browser-use 内部使用工具调用机制让 LLM 与浏览器交互。

# llm/function_calling.py
"""
工具调用(Function Calling)机制解析
理解 Browser-use 如何让 LLM 控制浏览器
"""
from browser_use import Agent, Controller
from browser_use.browser.context import BrowserContext
from langchain_openai import ChatOpenAI
import asyncio

# ============================================================
# 1. 工具调用机制概述
# ============================================================

"""
Browser-use 的工具调用流程:

1. 页面分析
   - 获取当前页面 HTML
   - 提取可交互元素
   - 为每个元素分配索引

2. 构建提示词
   - 系统提示词(定义可用工具)
   - 页面状态(当前可执行的操作)
   - 任务目标

3. LLM 决策
   - 分析当前状态
   - 选择要执行的工具
   - 确定工具参数

4. 执行工具
   - 调用对应的浏览器操作
   - 获取执行结果
   - 更新页面状态

5. 循环迭代
   - 重复步骤 1-4
   - 直到任务完成或达到限制
"""

# ============================================================
# 2. 自定义工具示例
# ============================================================

controller = Controller()

@controller.action("获取页面性能指标")
async def get_performance_metrics(browser: BrowserContext) -> dict:
    """
    自定义工具:获取页面性能指标
    
    这个工具展示了如何创建自定义功能
    让 LLM 能够获取页面的性能数据
    """
    page = await browser.get_current_page()
    
    # 使用 Performance API 获取指标
    metrics = await page.evaluate("""
        () => {
            const navigation = performance.getEntriesByType('navigation')[0];
            const paint = performance.getEntriesByType('paint');
            
            return {
                // 加载时间指标
                dns_lookup: navigation.domainLookupEnd - navigation.domainLookupStart,
                tcp_connection: navigation.connectEnd - navigation.connectStart,
                server_response: navigation.responseEnd - navigation.requestStart,
                dom_processing: navigation.domComplete - navigation.domLoading,
                total_load: navigation.loadEventEnd - navigation.startTime,
                
                // 渲染指标
                first_paint: paint.find(p => p.name === 'first-paint')?.startTime,
                first_contentful_paint: paint.find(p => p.name === 'first-contentful-paint')?.startTime,
                
                // 资源数量
                resource_count: performance.getEntriesByType('resource').length,
            };
        }
    """)
    
    return metrics

@controller.action("执行 JavaScript")
async def execute_javascript(
    browser: BrowserContext,
    script: str,
    return_value: bool = True
) -> any:
    """
    自定义工具:在页面中执行 JavaScript
    
    Args:
        script: 要执行的 JavaScript 代码
        return_value: 是否返回执行结果
    """
    page = await browser.get_current_page()
    
    if return_value:
        result = await page.evaluate(f"() => {{ {script} }}")
        return result
    else:
        await page.evaluate(f"() => {{ {script} }}")
        return "Script executed successfully"

@controller.action("检查元素是否存在")
async def check_element_exists(
    browser: BrowserContext,
    selector: str,
    timeout: int = 5000
) -> dict:
    """
    自定义工具:检查元素是否存在
    
    返回详细的元素状态信息
    """
    page = await browser.get_current_page()
    
    try:
        element = await page.wait_for_selector(selector, timeout=timeout, state='attached')
        
        if element:
            # 获取元素详细信息
            info = await element.evaluate("""
                el => ({
                    tagName: el.tagName,
                    id: el.id,
                    className: el.className,
                    textContent: el.textContent?.substring(0, 100),
                    isVisible: el.offsetParent !== null,
                    rect: el.getBoundingClientRect(),
                })
            """)
            
            return {
                "exists": True,
                "info": info,
            }
    except:
        pass
    
    return {
        "exists": False,
        "selector": selector,
    }

# ============================================================
# 3. 工具调用调试
# ============================================================

async def debug_tool_calls():
    """
    调试工具调用过程
    """
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    
    # 创建带调试功能的 Agent
    agent = Agent(
        task="访问 example.com,获取页面性能指标",
        llm=llm,
        controller=controller,  # 使用自定义控制器
    )
    
    # 执行并观察工具调用
    result = await agent.run()
    
    # 分析执行历史
    print("\n=== 工具调用分析 ===")
    print(f"总步数: {result.total_steps()}")
    print(f"总 Token: {result.total_tokens()}")
    
    # 打印每一步的工具调用
    for i, step in enumerate(result.steps):
        print(f"\n步骤 {i+1}:")
        print(f"  模型思考: {step.model_thought[:200]}...")
        print(f"  执行动作: {step.action}")
        print(f"  执行结果: {step.result}")

# ============================================================
# 4. 工具调用最佳实践
# ============================================================

"""
工具调用最佳实践:

1. 工具设计原则
   - 单一职责:每个工具只做一件事
   - 明确参数:参数名和类型清晰
   - 错误处理:返回明确的错误信息
   - 幂等性:同一操作多次执行结果一致

2. 提示词优化
   - 清晰的工具描述
   - 明确的参数说明
   - 使用示例

3. 性能考虑
   - 避免频繁的工具调用
   - 合并相关的操作
   - 缓存重复的结果

4. 安全考虑
   - 验证所有输入参数
   - 限制操作范围
   - 避免执行危险操作
"""

# 使用示例
if __name__ == "__main__":
    asyncio.run(debug_tool_calls())

第6章:典型应用场景实战

6.1 场景一:自动化表单填写与提交

# scenarios/form_automation.py
"""
场景一:自动化表单填写与提交
适用于:用户注册、信息录入、调查问卷等场景
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 基础表单填写
# ============================================================

async def basic_form_filling():
    """
    基础表单填写示例
    演示如何自动填写简单的注册表单
    """
    agent = Agent(
        task="""
        完成用户注册表单:
        
        目标网站:https://example.com/register
        
        填写信息:
        - 用户名:auto_test_user_001
        - 电子邮箱:autotest001@example.com
        - 密码:TestPass123!
        - 确认密码:TestPass123!
        - 手机号码:13800138001
        - 性别:男
        - 出生日期:1990-01-01
        
        操作步骤:
        1. 访问注册页面
        2. 依次填写所有字段
        3. 勾选"同意服务条款"复选框
        4. 点击"注册"按钮
        5. 验证注册成功(检查页面跳转或成功消息)
        
        注意事项:
        - 如果用户名已存在,尝试 auto_test_user_002、003 等
        - 确保所有必填字段都已填写
        - 截图保存注册结果
        """,
        llm=llm,
        max_steps=30,
    )
    
    result = await agent.run()
    
    if result.is_successful():
        print("✅ 表单填写成功")
        print(f"结果: {result.final_result()}")
    else:
        print("❌ 表单填写失败")
        print(f"错误: {result.errors()}")
    
    return result

# ============================================================
# 2. 动态表单处理
# ============================================================

async def dynamic_form_handling():
    """
    动态表单处理
    处理条件显示、动态添加字段等复杂表单
    """
    agent = Agent(
        task="""
        完成动态表单填写:
        
        目标:填写一个带有条件字段的申请表
        
        基础信息:
        - 姓名:张三
        - 身份证号:110101199001011234
        - 申请类型:企业申请
        
        条件字段(根据申请类型动态显示):
        - 企业名称:测试科技有限公司
        - 统一社会信用代码:91110108MA0012345
        - 注册资本:1000万元
        - 成立日期:2020-01-01
        
        操作步骤:
        1. 访问申请页面
        2. 填写基础信息
        3. 选择"企业申请"类型
        4. 等待企业相关字段加载
        5. 填写企业信息
        6. 上传营业执照(如果有上传按钮)
        7. 提交申请
        8. 保存申请编号
        
        处理策略:
        - 如果字段未显示,先选择对应的申请类型
        - 如果上传失败,记录错误继续提交
        - 验证所有必填项已填写
        """,
        llm=llm,
        max_steps=40,
    )
    
    return await agent.run()

# ============================================================
# 3. 批量表单填写
# ============================================================

async def batch_form_filling(data_list: list):
    """
    批量表单填写
    使用相同模板填写多组数据
    
    Args:
        data_list: 包含多组表单数据的列表
    """
    browser = Browser(config=BrowserConfig(headless=True))
    results = []
    
    try:
        for i, data in enumerate(data_list):
            print(f"\n处理第 {i+1}/{len(data_list)} 条数据...")
            
            # 构建任务描述
            task = f"""
            填写数据录入表单:
            
            数据编号:{data.get('id', i+1)}
            
            填写内容:
            {json.dumps(data, ensure_ascii=False, indent=2)}
            
            操作要求:
            1. 访问 https://example.com/data-entry
            2. 清空所有字段(如有旧数据)
            3. 按顺序填写所有字段
            4. 点击"保存"按钮
            5. 确认保存成功
            6. 点击"添加下一条"继续
            
            数据验证:
            - 确保数值字段格式正确
            - 日期字段使用 YYYY-MM-DD 格式
            - 必填字段不能为空
            """
            
            agent = Agent(
                task=task,
                llm=llm,
                browser=browser,
                max_steps=25,
            )
            
            result = await agent.run()
            
            results.append({
                "index": i,
                "data": data,
                "success": result.is_successful(),
                "result": result.final_result(),
            })
            
            # 短暂等待,避免请求过快
            await asyncio.sleep(2)
            
    finally:
        await browser.close()
    
    # 输出汇总报告
    success_count = sum(1 for r in results if r["success"])
    print(f"\n批量处理完成:成功 {success_count}/{len(data_list)}")
    
    return results

# 批量数据示例
sample_data = [
    {
        "id": "EMP001",
        "name": "张三",
        "department": "技术部",
        "position": "高级工程师",
        "salary": 25000,
        "entry_date": "2024-01-15",
    },
    {
        "id": "EMP002",
        "name": "李四",
        "department": "产品部",
        "position": "产品经理",
        "salary": 28000,
        "entry_date": "2024-02-01",
    },
    {
        "id": "EMP003",
        "name": "王五",
        "department": "设计部",
        "position": "UI设计师",
        "salary": 22000,
        "entry_date": "2024-03-10",
    },
]

# ============================================================
# 4. 表单验证与错误处理
# ============================================================

async def form_with_validation():
    """
    带验证的表单填写
    处理表单验证错误,自动修正
    """
    agent = Agent(
        task="""
        填写并验证表单:
        
        目标:完成带实时验证的注册表单
        
        初始数据(可能有错误):
        - 邮箱:invalid-email-format
        - 密码:123
        - 手机号:12345678901
        
        验证规则:
        - 邮箱:必须符合邮箱格式
        - 密码:至少8位,包含大小写字母和数字
        - 手机号:11位数字,以1开头
        
        处理流程:
        1. 尝试填写初始数据
        2. 观察验证错误提示
        3. 根据错误提示修正数据:
           - 邮箱改为:valid_user@example.com
           - 密码改为:SecurePass123
           - 手机号改为:13800138001
        4. 重新提交
        5. 验证通过,完成注册
        
        成功标准:
        - 无验证错误提示
        - 页面跳转到成功页面
        - 显示"注册成功"消息
        """,
        llm=llm,
        max_steps=35,
        max_failures=5,  # 允许更多重试
    )
    
    return await agent.run()

# ============================================================
# 5. 生产环境表单自动化类
# ============================================================

class FormAutomation:
    """
    表单自动化类(生产级)
    封装表单填写的常用功能
    """
    
    def __init__(self, llm, browser=None):
        self.llm = llm
        self.browser = browser or Browser(config=BrowserConfig(headless=True))
        self.results = []
    
    async def fill_form(self, form_url: str, data: dict, validation_rules: dict = None) -> dict:
        """
        填写单个表单
        
        Args:
            form_url: 表单页面 URL
            data: 表单数据字典
            validation_rules: 字段验证规则
            
        Returns:
            dict: 执行结果
        """
        # 构建任务描述
        fields_desc = "\n".join([f"- {k}: {v}" for k, v in data.items()])
        
        task = f"""
        填写表单:
        
        页面:{form_url}
        
        填写字段:
        {fields_desc}
        
        操作步骤:
        1. 访问表单页面
        2. 填写所有字段
        3. 提交表单
        4. 验证提交成功
        """
        
        if validation_rules:
            task += f"\n\n验证规则:\n{json.dumps(validation_rules, ensure_ascii=False)}"
        
        agent = Agent(
            task=task,
            llm=self.llm,
            browser=self.browser,
            max_steps=25,
            validate_output=True,
        )
        
        result = await agent.run()
        
        return {
            "success": result.is_successful(),
            "data": data,
            "result": result.final_result(),
            "steps": result.total_steps(),
        }
    
    async def batch_fill(self, form_url: str, data_list: list, delay: int = 2) -> list:
        """
        批量填写表单
        
        Args:
            form_url: 表单页面 URL
            data_list: 数据列表
            delay: 间隔延迟(秒)
            
        Returns:
            list: 所有执行结果
        """
        results = []
        
        for i, data in enumerate(data_list):
            print(f"处理 {i+1}/{len(data_list)}...")
            
            result = await self.fill_form(form_url, data)
            results.append(result)
            
            if i < len(data_list) - 1:  # 不是最后一条
                await asyncio.sleep(delay)
        
        return results
    
    async def close(self):
        """关闭浏览器"""
        await self.browser.close()

# 使用示例
async def production_form_example():
    """生产环境表单自动化示例"""
    automation = FormAutomation(llm=llm)
    
    try:
        # 单条填写
        result = await automation.fill_form(
            form_url="https://example.com/register",
            data={
                "username": "testuser001",
                "email": "test001@example.com",
                "password": "SecurePass123!",
            }
        )
        print(f"填写结果: {result}")
        
        # 批量填写
        batch_results = await automation.batch_fill(
            form_url="https://example.com/bulk-entry",
            data_list=sample_data,
            delay=3,
        )
        print(f"批量填写完成: {len(batch_results)} 条")
        
    finally:
        await automation.close()

# 运行示例
if __name__ == "__main__":
    # 运行基础示例
    asyncio.run(basic_form_filling())

6.2 场景二:电商网站数据抓取与比价

  1. 动态内容处理:页面结构频繁变化,传统选择器易失效
  2. 复杂多步骤任务:需要推理和决策的流程(如比价、筛选)
  3. 自然语言交互测试:验证系统对用户意图的理解
  4. 快速原型验证:无需编写大量代码即可验证自动化可行性

核心优势

Browser-use
核心优势

智能化

LLM 驱动决策

自适应页面变化

容错执行能力

易用性

自然语言描述

简洁 API 设计

快速上手

扩展性

自定义 Action

多 LLM 支持

与现有框架集成

可靠性

基于 Playwright

成熟底层技术

活跃社区支持


第7章:错误处理与调试技巧

7.1 常见错误类型与排查方法

# debugging/error_handling.py
"""
错误处理与排查
Browser-use 常见错误及解决方案
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
from typing import Dict, List, Optional
import os

# 从环境变量读取API密钥
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("请设置 OPENAI_API_KEY 环境变量")

llm = ChatOpenAI(model="gpt-4o", temperature=0.1, api_key=api_key)

# ============================================================
# 1. 常见错误类型分类
# ============================================================

"""
Browser-use 常见错误类型:

1. 页面相关错误
   - TimeoutError: 页面加载超时
   - ElementNotFoundError: 元素未找到
   - NavigationError: 导航失败

2. LLM 相关错误
   - RateLimitError: API 限流
   - TokenLimitError: Token 超限
   - APIError: LLM API 错误

3. 浏览器相关错误
   - BrowserCrashError: 浏览器崩溃
   - ContextError: 上下文错误
   - ScreenshotError: 截图失败

4. 任务执行错误
   - MaxStepsExceeded: 超过最大步数
   - TaskFailedError: 任务执行失败
   - ValidationError: 输出验证失败
"""

# ============================================================
# 2. 错误处理装饰器
# ============================================================

def retry_on_error(max_retries: int = 3, delay: float = 2.0):
    """
    错误重试装饰器
    
    Args:
        max_retries: 最大重试次数
        delay: 重试间隔(秒)
    """
    def decorator(func):
        async def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        print(f"⚠️  尝试 {attempt + 1}/{max_retries + 1} 失败: {e}")
                        print(f"🔄 {delay}秒后重试...")
                        await asyncio.sleep(delay)
                    else:
                        print(f"❌ 所有重试都失败")
            
            raise last_exception
        
        return wrapper
    return decorator

@retry_on_error(max_retries=3, delay=2)
async def robust_agent_execution(task: str, llm) -> Dict:
    """
    带重试机制的 Agent 执行
    
    Args:
        task: 任务描述
        llm: LLM 实例
        
    Returns:
        Dict: 执行结果
    """
    agent = Agent(
        task=task,
        llm=llm,
        max_steps=50,
        max_failures=3,
        retry_delay=5,
    )
    
    result = await agent.run()
    
    return {
        "success": result.is_successful(),
        "result": result.final_result() if result.is_successful() else None,
        "errors": result.errors() if not result.is_successful() else [],
        "steps": result.total_steps(),
    }

# ============================================================
# 3. 错误分类处理
# ============================================================

class ErrorHandler:
    """
    错误处理器
    根据错误类型采取不同的处理策略
    """
    
    @staticmethod
    def classify_error(error: Exception) -> str:
        """
        分类错误类型
        
        Args:
            error: 异常对象
            
        Returns:
            str: 错误类型
        """
        error_msg = str(error).lower()
        
        if "timeout" in error_msg:
            return "timeout"
        elif "element" in error_msg and "not found" in error_msg:
            return "element_not_found"
        elif "rate limit" in error_msg or "429" in error_msg:
            return "rate_limit"
        elif "token" in error_msg:
            return "token_limit"
        elif "navigation" in error_msg:
            return "navigation"
        elif "browser" in error_msg and "crash" in error_msg:
            return "browser_crash"
        else:
            return "unknown"
    
    @staticmethod
    def get_recovery_strategy(error_type: str) -> Dict:
        """
        获取恢复策略
        
        Args:
            error_type: 错误类型
            
        Returns:
            Dict: 恢复策略
        """
        strategies = {
            "timeout": {
                "action": "retry_with_delay",
                "delay": 10,
                "message": "页面加载超时,等待后重试",
            },
            "element_not_found": {
                "action": "refresh_and_retry",
                "message": "元素未找到,刷新页面后重试",
            },
            "rate_limit": {
                "action": "exponential_backoff",
                "initial_delay": 30,
                "message": "API 限流,使用指数退避策略",
            },
            "token_limit": {
                "action": "reduce_context",
                "message": "Token 超限,减少上下文长度",
            },
            "navigation": {
                "action": "verify_url",
                "message": "导航失败,检查 URL 是否正确",
            },
            "browser_crash": {
                "action": "restart_browser",
                "message": "浏览器崩溃,重启浏览器实例",
            },
            "unknown": {
                "action": "log_and_continue",
                "message": "未知错误,记录后继续",
            },
        }
        
        return strategies.get(error_type, strategies["unknown"])

# ============================================================
# 4. 调试信息收集
# ============================================================

class DebugCollector:
    """
    调试信息收集器
    收集执行过程中的调试信息
    """
    
    def __init__(self):
        self.logs: List[Dict] = []
        self.screenshots: List[bytes] = []
        self.network_logs: List[Dict] = []
    
    def log(self, level: str, message: str, context: Dict = None):
        """记录日志"""
        import time
        self.logs.append({
            "timestamp": time.time(),
            "level": level,
            "message": message,
            "context": context or {},
        })
    
    def add_screenshot(self, screenshot: bytes, description: str = ""):
        """添加截图"""
        import time
        self.screenshots.append({
            "data": screenshot,
            "description": description,
            "timestamp": time.time(),
        })
    
    def generate_report(self) -> str:
        """生成调试报告"""
        report = []
        report.append("=" * 50)
        report.append("调试报告")
        report.append("=" * 50)
        
        # 日志汇总
        report.append("\n【日志记录】")
        for log in self.logs:
            report.append(f"[{log['level'].upper()}] {log['message']}")
        
        # 截图信息
        report.append(f"\n【截图】共 {len(self.screenshots)} 张")
        
        return "\n".join(report)

# ============================================================
# 5. 生产级错误处理示例
# ============================================================

async def production_error_handling():
    """
    生产环境错误处理示例
    """
    debug_collector = DebugCollector()
    
    try:
        debug_collector.log("info", "开始执行任务")
        
        agent = Agent(
            task="访问 example.com 并获取标题",
            llm=llm,
            max_steps=30,
        )
        
        result = await agent.run()
        
        if result.is_successful():
            debug_collector.log("info", "任务执行成功")
            return result.final_result()
        else:
            # 收集错误信息
            errors = result.errors()
            debug_collector.log("error", f"任务执行失败: {errors}")
            
            # 分类并处理错误
            for error in errors:
                error_type = ErrorHandler.classify_error(Exception(str(error)))
                strategy = ErrorHandler.get_recovery_strategy(error_type)
                
                debug_collector.log("warning", 
                    f"错误类型: {error_type}, 策略: {strategy['message']}")
            
            # 根据错误类型决定是否重试
            raise Exception(f"任务失败: {errors}")
            
    except Exception as e:
        debug_collector.log("error", f"异常: {e}")
        
        # 生成调试报告
        report = debug_collector.generate_report()
        print(report)
        
        # 可以发送告警通知
        # await send_alert(report)
        
        raise

# ============================================================
# 6. 日志配置与级别控制
# ============================================================

import logging
import logging.handlers
import sys
from pathlib import Path

def setup_production_logging(
    log_dir: str = "logs",
    app_name: str = "browser-use",
    level: int = logging.INFO
):
    """
    生产级日志配置
    
    Args:
        log_dir: 日志目录
        app_name: 应用名称
        level: 日志级别
    """
    # 创建日志目录
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    
    # 创建 logger
    logger = logging.getLogger(app_name)
    logger.setLevel(level)
    
    # 清除现有处理器
    logger.handlers = []
    
    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )
    
    # 控制台处理器
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    
    # 文件处理器(按大小轮转)
    file_handler = logging.handlers.RotatingFileHandler(
        f"{log_dir}/{app_name}.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
    )
    file_handler.setLevel(level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    
    return logger

# 运行示例
if __name__ == "__main__":
    # 设置日志
    logger = setup_production_logging()
    logger.info("开始测试错误处理")
    
    # 运行示例
    try:
        asyncio.run(production_error_handling())
    except Exception as e:
        logger.error(f"测试失败: {e}")

第8章:性能优化与资源管理

8.1 浏览器资源管理策略

# performance/resource_management.py
"""
浏览器资源管理
优化内存使用和浏览器实例生命周期
"""
from browser_use import Agent, Browser, BrowserConfig, BrowserContextConfig
from langchain_openai import ChatOpenAI
import asyncio
import psutil
import os
from typing import Optional, List
import weakref

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 浏览器池管理
# ============================================================

class BrowserPool:
    """
    浏览器连接池
    管理多个浏览器实例,实现资源复用
    """
    
    def __init__(
        self,
        max_browsers: int = 5,
        max_contexts_per_browser: int = 3,
        config: Optional[BrowserConfig] = None
    ):
        """
        初始化浏览器池
        
        Args:
            max_browsers: 最大浏览器实例数
            max_contexts_per_browser: 每个浏览器的最大上下文数
            config: 浏览器配置
        """
        self.max_browsers = max_browsers
        self.max_contexts_per_browser = max_contexts_per_browser
        self.config = config or BrowserConfig(headless=True)
        
        # 浏览器实例管理
        self._browsers: List[Browser] = []
        self._context_counts: dict = {}
        self._available_browsers: asyncio.Queue = asyncio.Queue()
        self._lock = asyncio.Lock()
    
    async def acquire_browser(self) -> Browser:
        """
        获取可用浏览器实例
        
        Returns:
            Browser: 浏览器实例
        """
        async with self._lock:
            # 尝试获取现有可用浏览器
            try:
                browser = self._available_browsers.get_nowait()
                if self._context_counts[browser] < self.max_contexts_per_browser:
                    return browser
                else:
                    # 上下文已满,放回队列
                    await self._available_browsers.put(browser)
            except asyncio.QueueEmpty:
                pass
            
            # 创建新浏览器实例
            if len(self._browsers) < self.max_browsers:
                browser = Browser(config=self.config)
                self._browsers.append(browser)
                self._context_counts[browser] = 0
                return browser
            
            # 等待可用浏览器
            browser = await self._available_browsers.get()
            return browser
    
    async def release_browser(self, browser: Browser):
        """
        释放浏览器实例
        
        Args:
            browser: 要释放的浏览器实例
        """
        async with self._lock:
            if browser in self._context_counts:
                await self._available_browsers.put(browser)
    
    async def close_all(self):
        """关闭所有浏览器实例"""
        for browser in self._browsers:
            try:
                await browser.close()
            except Exception as e:
                print(f"关闭浏览器时出错: {e}")
        self._browsers.clear()
        self._context_counts.clear()

# ============================================================
# 2. 内存监控与优化
# ============================================================

class MemoryMonitor:
    """
    内存监控器
    实时监控内存使用并触发优化
    """
    
    def __init__(self, threshold_mb: float = 1024):
        """
        初始化内存监控器
        
        Args:
            threshold_mb: 内存使用阈值(MB)
        """
        self.threshold_mb = threshold_mb
        self.process = psutil.Process(os.getpid())
        self.peak_memory = 0
    
    def get_memory_usage(self) -> dict:
        """
        获取当前内存使用情况
        
        Returns:
            dict: 内存使用信息
        """
        memory_info = self.process.memory_info()
        current_mb = memory_info.rss / 1024 / 1024
        
        if current_mb > self.peak_memory:
            self.peak_memory = current_mb
        
        return {
            "current_mb": current_mb,
            "peak_mb": self.peak_memory,
            "threshold_mb": self.threshold_mb,
            "percent": self.process.memory_percent(),
            "status": "normal" if current_mb < self.threshold_mb else "warning"
        }
    
    def check_memory(self) -> bool:
        """
        检查内存是否超过阈值
        
        Returns:
            bool: 是否超过阈值
        """
        usage = self.get_memory_usage()
        return usage["current_mb"] > self.threshold_mb
    
    async def monitor_loop(self, interval: int = 30):
        """
        内存监控循环
        
        Args:
            interval: 检查间隔(秒)
        """
        while True:
            usage = self.get_memory_usage()
            print(f"内存使用: {usage['current_mb']:.2f}MB / {usage['threshold_mb']}MB")
            
            if usage["status"] == "warning":
                print("⚠️  内存使用超过阈值,触发优化...")
                await self.optimize_memory()
            
            await asyncio.sleep(interval)
    
    async def optimize_memory(self):
        """执行内存优化"""
        # 触发垃圾回收
        import gc
        gc.collect()
        
        # 可以在这里添加其他优化逻辑
        print("✅ 内存优化完成")

# ============================================================
# 3. 并发任务管理
# ============================================================

class ConcurrentTaskManager:
    """
    并发任务管理器
    控制并发数量,优化资源使用
    """
    
    def __init__(
        self,
        max_concurrent: int = 3,
        browser_pool: Optional[BrowserPool] = None
    ):
        """
        初始化并发任务管理器
        
        Args:
            max_concurrent: 最大并发任务数
            browser_pool: 浏览器池实例
        """
        self.max_concurrent = max_concurrent
        self.browser_pool = browser_pool
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
    
    async def execute_task(
        self,
        task: str,
        task_id: str,
        llm
    ) -> dict:
        """
        执行单个任务(带并发控制)
        
        Args:
            task: 任务描述
            task_id: 任务ID
            llm: LLM实例
            
        Returns:
            dict: 执行结果
        """
        async with self.semaphore:
            print(f"🚀 开始执行任务 {task_id}")
            
            start_time = asyncio.get_event_loop().time()
            
            try:
                # 获取浏览器
                browser = await self.browser_pool.acquire_browser()
                
                # 创建Agent并执行
                agent = Agent(
                    task=task,
                    llm=llm,
                    browser=browser,
                    max_steps=50,
                )
                
                result = await agent.run()
                
                # 释放浏览器
                await self.browser_pool.release_browser(browser)
                
                end_time = asyncio.get_event_loop().time()
                
                return {
                    "task_id": task_id,
                    "success": result.is_successful(),
                    "result": result.final_result() if result.is_successful() else None,
                    "errors": result.errors() if not result.is_successful() else [],
                    "duration": end_time - start_time,
                }
                
            except Exception as e:
                end_time = asyncio.get_event_loop().time()
                return {
                    "task_id": task_id,
                    "success": False,
                    "result": None,
                    "errors": [str(e)],
                    "duration": end_time - start_time,
                }
    
    async def execute_batch(
        self,
        tasks: List[dict],
        llm
    ) -> List[dict]:
        """
        批量执行任务
        
        Args:
            tasks: 任务列表,每个任务包含task和task_id
            llm: LLM实例
            
        Returns:
            List[dict]: 执行结果列表
        """
        # 创建所有任务
        coroutines = [
            self.execute_task(t["task"], t["task_id"], llm)
            for t in tasks
        ]
        
        # 并发执行
        results = await asyncio.gather(*coroutines, return_exceptions=True)
        
        # 处理结果
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "errors": [str(result)]
                })
            else:
                processed_results.append(result)
        
        return processed_results

# ============================================================
# 4. 性能监控与报告
# ============================================================

class PerformanceMonitor:
    """
    性能监控器
    收集和分析性能指标
    """
    
    def __init__(self):
        self.metrics = []
    
    def record_metric(
        self,
        task_id: str,
        metric_type: str,
        value: float,
        metadata: dict = None
    ):
        """
        记录性能指标
        
        Args:
            task_id: 任务ID
            metric_type: 指标类型
            value: 指标值
            metadata: 附加元数据
        """
        self.metrics.append({
            "task_id": task_id,
            "type": metric_type,
            "value": value,
            "timestamp": asyncio.get_event_loop().time(),
            "metadata": metadata or {}
        })
    
    def generate_report(self) -> dict:
        """
        生成性能报告
        
        Returns:
            dict: 性能报告
        """
        if not self.metrics:
            return {"message": "暂无性能数据"}
        
        # 按类型分组统计
        type_stats = {}
        for metric in self.metrics:
            mtype = metric["type"]
            if mtype not in type_stats:
                type_stats[mtype] = []
            type_stats[mtype].append(metric["value"])
        
        # 计算统计信息
        report = {}
        for mtype, values in type_stats.items():
            report[mtype] = {
                "count": len(values),
                "avg": sum(values) / len(values),
                "min": min(values),
                "max": max(values),
            }
        
        return report

# ============================================================
# 5. 使用示例
# ============================================================

async def resource_management_demo():
    """
    资源管理综合示例
    """
    # 创建浏览器池
    browser_pool = BrowserPool(
        max_browsers=3,
        max_contexts_per_browser=2
    )
    
    # 创建内存监控器
    memory_monitor = MemoryMonitor(threshold_mb=2048)
    
    # 创建并发任务管理器
    task_manager = ConcurrentTaskManager(
        max_concurrent=3,
        browser_pool=browser_pool
    )
    
    # 创建性能监控器
    perf_monitor = PerformanceMonitor()
    
    try:
        # 准备批量任务
        tasks = [
            {
                "task_id": f"task_{i}",
                "task": f"访问 https://example.com/page{i} 并获取标题"
            }
            for i in range(10)
        ]
        
        # 检查初始内存
        print(f"初始内存: {memory_monitor.get_memory_usage()}")
        
        # 执行批量任务
        results = await task_manager.execute_batch(tasks, llm)
        
        # 检查结果
        success_count = sum(1 for r in results if r.get("success"))
        print(f"任务完成: {success_count}/{len(tasks)} 成功")
        
        # 检查最终内存
        print(f"最终内存: {memory_monitor.get_memory_usage()}")
        
        return results
        
    finally:
        # 清理资源
        await browser_pool.close_all()

# 运行示例
if __name__ == "__main__":
    asyncio.run(resource_management_demo())

8.2 执行速度优化技巧

# performance/speed_optimization.py
"""
执行速度优化
提升Agent任务执行效率
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
from typing import List

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 任务并行化
# ============================================================

async def parallel_tasks():
    """
    并行执行多个独立任务
    """
    tasks = [
        Agent(
            task="访问 https://site1.com 获取标题",
            llm=llm,
            max_steps=20,
        ).run(),
        Agent(
            task="访问 https://site2.com 获取标题",
            llm=llm,
            max_steps=20,
        ).run(),
        Agent(
            task="访问 https://site3.com 获取标题",
            llm=llm,
            max_steps=20,
        ).run(),
    ]
    
    # 并行执行所有任务
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# ============================================================
# 2. 预热与缓存
# ============================================================

class WarmupManager:
    """
    预热管理器
    预先初始化资源,减少首次执行延迟
    """
    
    def __init__(self):
        self.warmed_up = False
        self._browser: Browser = None
    
    async def warmup(self):
        """执行预热"""
        if self.warmed_up:
            return
        
        print("🔥 开始预热...")
        
        # 预热浏览器
        self._browser = Browser(config=BrowserConfig(headless=True))
        
        # 执行一个简单的任务来预热LLM连接
        warmup_agent = Agent(
            task="访问 https://example.com",
            llm=llm,
            browser=self._browser,
            max_steps=5,
        )
        await warmup_agent.run()
        
        self.warmed_up = True
        print("✅ 预热完成")
    
    def get_browser(self) -> Browser:
        """获取预热的浏览器实例"""
        return self._browser
    
    async def cleanup(self):
        """清理资源"""
        if self._browser:
            await self._browser.close()

# ============================================================
# 3. 智能重试策略
# ============================================================

async def smart_retry(
    task: str,
    llm,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    """
    智能重试机制
    使用指数退避策略
    
    Args:
        task: 任务描述
        llm: LLM实例
        max_retries: 最大重试次数
        base_delay: 基础延迟
        
    Returns:
        dict: 执行结果
    """
    for attempt in range(max_retries + 1):
        try:
            agent = Agent(
                task=task,
                llm=llm,
                max_steps=50,
            )
            
            result = await agent.run()
            
            if result.is_successful():
                return {
                    "success": True,
                    "result": result.final_result(),
                    "attempts": attempt + 1
                }
            
            # 任务失败但无异常,可能是业务逻辑失败
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                print(f"任务失败,{delay}秒后重试...")
                await asyncio.sleep(delay)
                
        except Exception as e:
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                print(f"异常: {e}{delay}秒后重试...")
                await asyncio.sleep(delay)
            else:
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt + 1
                }
    
    return {
        "success": False,
        "error": "达到最大重试次数",
        "attempts": max_retries + 1
    }

# ============================================================
# 4. 批处理优化
# ============================================================

class BatchOptimizer:
    """
    批处理优化器
    合并小任务,减少开销
    """
    
    def __init__(self, batch_size: int = 5):
        self.batch_size = batch_size
        self.pending_tasks = []
    
    def add_task(self, task: str) -> str:
        """
        添加任务到批次
        
        Args:
            task: 任务描述
            
        Returns:
            str: 任务ID
        """
        task_id = f"task_{len(self.pending_tasks)}"
        self.pending_tasks.append({
            "id": task_id,
            "task": task
        })
        return task_id
    
    async def execute_batch(self, llm) -> List[dict]:
        """
        执行批次任务
        
        Args:
            llm: LLM实例
            
        Returns:
            List[dict]: 执行结果
        """
        if not self.pending_tasks:
            return []
        
        # 合并任务描述
        combined_task = self._combine_tasks(self.pending_tasks)
        
        # 执行合并后的任务
        agent = Agent(
            task=combined_task,
            llm=llm,
            max_steps=100,
        )
        
        result = await agent.run()
        
        # 解析结果并分配给各个任务
        # 这里需要根据实际返回格式解析
        results = []
        for task in self.pending_tasks:
            results.append({
                "task_id": task["id"],
                "success": result.is_successful(),
                "result": result.final_result() if result.is_successful() else None
            })
        
        # 清空待处理列表
        self.pending_tasks = []
        
        return results
    
    def _combine_tasks(self, tasks: List[dict]) -> str:
        """合并多个任务描述"""
        combined = ["执行以下多个任务:"]
        for i, task in enumerate(tasks, 1):
            combined.append(f"\n任务{i}: {task['task']}")
        combined.append("\n请按顺序完成以上所有任务,并分别返回结果。")
        return "\n".join(combined)

# 运行示例
if __name__ == "__main__":
    # 测试并行任务
    results = asyncio.run(parallel_tasks())
    print(f"并行任务完成: {len(results)} 个")

第9章:安全与合规

9.1 敏感数据处理

# security/sensitive_data.py
"""
敏感数据处理
安全处理密码、密钥等敏感信息
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import os
from typing import Optional
from cryptography.fernet import Fernet
import hashlib
import base64

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 环境变量管理
# ============================================================

class SecureConfig:
    """
    安全配置管理
    从环境变量安全读取敏感信息
    """
    
    @staticmethod
    def get_api_key(provider: str) -> str:
        """
        获取API密钥
        
        Args:
            provider: 提供商名称 (openai/azure/anthropic等)
            
        Returns:
            str: API密钥
        """
        env_vars = {
            "openai": "OPENAI_API_KEY",
            "azure": "AZURE_OPENAI_API_KEY",
            "anthropic": "ANTHROPIC_API_KEY",
            "google": "GOOGLE_API_KEY",
        }
        
        env_var = env_vars.get(provider)
        if not env_var:
            raise ValueError(f"未知的提供商: {provider}")
        
        api_key = os.getenv(env_var)
        if not api_key:
            raise ValueError(f"环境变量 {env_var} 未设置")
        
        return api_key
    
    @staticmethod
    def get_credentials(service: str) -> dict:
        """
        获取服务凭证
        
        Args:
            service: 服务名称
            
        Returns:
            dict: 凭证信息
        """
        prefix = service.upper()
        
        credentials = {
            "username": os.getenv(f"{prefix}_USERNAME"),
            "password": os.getenv(f"{prefix}_PASSWORD"),
        }
        
        if not all(credentials.values()):
            raise ValueError(f"服务 {service} 的凭证未完整配置")
        
        return credentials

# ============================================================
# 2. 数据加密
# ============================================================

class DataEncryption:
    """
    数据加密工具
    加密敏感数据
    """
    
    def __init__(self, key: Optional[bytes] = None):
        """
        初始化加密器
        
        Args:
            key: 加密密钥,None则自动生成
        """
        if key is None:
            key = Fernet.generate_key()
        
        self.cipher = Fernet(key)
        self.key = key
    
    def encrypt(self, data: str) -> str:
        """
        加密数据
        
        Args:
            data: 要加密的数据
            
        Returns:
            str: 加密后的数据
        """
        encoded = data.encode('utf-8')
        encrypted = self.cipher.encrypt(encoded)
        return base64.urlsafe_b64encode(encrypted).decode('utf-8')
    
    def decrypt(self, encrypted_data: str) -> str:
        """
        解密数据
        
        Args:
            encrypted_data: 加密的数据
            
        Returns:
            str: 解密后的数据
        """
        decoded = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
        decrypted = self.cipher.decrypt(decoded)
        return decrypted.decode('utf-8')
    
    @staticmethod
    def hash_data(data: str) -> str:
        """
        计算数据哈希
        
        Args:
            data: 要哈希的数据
            
        Returns:
            str: SHA-256哈希值
        """
        return hashlib.sha256(data.encode('utf-8')).hexdigest()

# ============================================================
# 3. 安全任务执行
# ============================================================

class SecureAgent:
    """
    安全Agent
    在任务执行中保护敏感信息
    """
    
    def __init__(self, llm):
        self.llm = llm
        self.sensitive_patterns = [
            r'password[:\s]+\S+',
            r'api[_-]?key[:\s]+\S+',
            r'secret[:\s]+\S+',
            r'token[:\s]+\S+',
        ]
    
    async def execute_with_credentials(
        self,
        task_template: str,
        credentials: dict,
        url: str
    ):
        """
        使用凭证安全执行任务
        
        Args:
            task_template: 任务模板,使用占位符如 {username}
            credentials: 凭证字典
            url: 目标URL
            
        Returns:
            执行结果
        """
        # 替换模板中的占位符
        task = task_template.format(**credentials)
        
        # 添加安全提示
        secure_task = f"""
{task}

安全要求:
1. 不要在日志中记录密码或其他敏感信息
2. 确保使用HTTPS连接
3. 任务完成后清除表单中的敏感数据
4. 不要截图包含密码的页面
"""
        
        agent = Agent(
            task=secure_task,
            llm=self.llm,
            max_steps=30,
        )
        
        return await agent.run()
    
    def sanitize_log(self, message: str) -> str:
        """
        清理日志中的敏感信息
        
        Args:
            message: 原始日志消息
            
        Returns:
            str: 清理后的消息
        """
        import re
        
        sanitized = message
        for pattern in self.sensitive_patterns:
            sanitized = re.sub(
                pattern,
                lambda m: m.group(0).split(':')[0] + ': ***',
                sanitized,
                flags=re.IGNORECASE
            )
        
        return sanitized

# ============================================================
# 4. 使用示例
# ============================================================

async def secure_login_example():
    """
    安全登录示例
    """
    # 从环境变量获取凭证
    credentials = SecureConfig.get_credentials("TEST_SITE")
    
    # 创建安全Agent
    secure_agent = SecureAgent(llm)
    
    # 定义任务模板
    task_template = """
    登录测试网站:
    1. 访问 https://secure.example.com/login
    2. 在用户名框输入:{username}
    3. 在密码框输入:{password}
    4. 点击登录按钮
    5. 验证登录成功
    """
    
    # 安全执行任务
    result = await secure_agent.execute_with_credentials(
        task_template=task_template,
        credentials=credentials,
        url="https://secure.example.com"
    )
    
    return result

if __name__ == "__main__":
    import asyncio
    asyncio.run(secure_login_example())

9.2 访问控制与审计

# security/access_control.py
"""
访问控制与审计
实现任务执行的权限控制和审计日志
"""
from browser_use import Agent
from langchain_openai import ChatOpenAI
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
import json

llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

# ============================================================
# 1. 权限定义
# ============================================================

class Permission(Enum):
    """权限枚举"""
    READ = "read"                    # 只读访问
    WRITE = "write"                  # 写入权限
    DELETE = "delete"                # 删除权限
    ADMIN = "admin"                  # 管理权限
    SCRAPE = "scrape"                # 数据抓取
    FORM_FILL = "form_fill"          # 表单填写
    NAVIGATE = "navigate"            # 页面导航

# ============================================================
# 2. 用户角色
# ============================================================

class Role:
    """用户角色"""
    
    def __init__(self, name: str, permissions: List[Permission]):
        self.name = name
        self.permissions = set(permissions)
    
    def has_permission(self, permission: Permission) -> bool:
        """检查是否有指定权限"""
        return permission in self.permissions

# 预定义角色
ROLES = {
    "viewer": Role("viewer", [Permission.READ, Permission.NAVIGATE]),
    "operator": Role("operator", [
        Permission.READ, Permission.WRITE,
        Permission.NAVIGATE, Permission.FORM_FILL
    ]),
    "analyst": Role("analyst", [
        Permission.READ, Permission.SCRAPE,
        Permission.NAVIGATE
    ]),
    "admin": Role("admin", list(Permission)),
}

# ============================================================
# 3. 审计日志
# ============================================================

class AuditLogger:
    """
    审计日志记录器
    记录所有关键操作
    """
    
    def __init__(self, log_file: str = "audit.log"):
        self.log_file = log_file
        self.entries: List[Dict] = []
    
    def log(
        self,
        user_id: str,
        action: str,
        resource: str,
        status: str,
        details: Dict = None
    ):
        """
        记录审计日志
        
        Args:
            user_id: 用户ID
            action: 执行的动作
            resource: 操作的资源
            status: 执行状态
            details: 详细信息
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "status": status,
            "details": details or {}
        }
        
        self.entries.append(entry)
        
        # 写入文件
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry, ensure_ascii=False) + '\n')
    
    def query(
        self,
        user_id: Optional[str] = None,
        action: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> List[Dict]:
        """
        查询审计日志
        
        Args:
            user_id: 用户ID过滤
            action: 动作过滤
            start_time: 开始时间
            end_time: 结束时间
            
        Returns:
            List[Dict]: 匹配的日志条目
        """
        results = self.entries
        
        if user_id:
            results = [e for e in results if e["user_id"] == user_id]
        
        if action:
            results = [e for e in results if e["action"] == action]
        
        if start_time:
            results = [
                e for e in results
                if datetime.fromisoformat(e["timestamp"]) >= start_time
            ]
        
        if end_time:
            results = [
                e for e in results
                if datetime.fromisoformat(e["timestamp"]) <= end_time
            ]
        
        return results
    
    def generate_report(self, period: str = "daily") -> Dict:
        """
        生成审计报告
        
        Args:
            period: 报告周期 (daily/weekly/monthly)
            
        Returns:
            Dict: 审计报告
        """
        stats = {
            "total_actions": len(self.entries),
            "actions_by_type": {},
            "actions_by_user": {},
            "success_rate": 0,
        }
        
        success_count = 0
        
        for entry in self.entries:
            # 按类型统计
            action = entry["action"]
            stats["actions_by_type"][action] = \
                stats["actions_by_type"].get(action, 0) + 1
            
            # 按用户统计
            user = entry["user_id"]
            stats["actions_by_user"][user] = \
                stats["actions_by_user"].get(user, 0) + 1
            
            # 成功率
            if entry["status"] == "success":
                success_count += 1
        
        if stats["total_actions"] > 0:
            stats["success_rate"] = success_count / stats["total_actions"]
        
        return stats

# ============================================================
# 4. 访问控制管理器
# ============================================================

class AccessControlManager:
    """
    访问控制管理器
    管理用户权限和访问控制
    """
    
    def __init__(self):
        self.users: Dict[str, Role] = {}
        self.audit_logger = AuditLogger()
    
    def assign_role(self, user_id: str, role_name: str):
        """
        分配角色给用户
        
        Args:
            user_id: 用户ID
            role_name: 角色名称
        """
        if role_name not in ROLES:
            raise ValueError(f"未知角色: {role_name}")
        
        self.users[user_id] = ROLES[role_name]
        
        self.audit_logger.log(
            user_id="system",
            action="assign_role",
            resource=f"user:{user_id}",
            status="success",
            details={"role": role_name}
        )
    
    def check_permission(
        self,
        user_id: str,
        permission: Permission
    ) -> bool:
        """
        检查用户权限
        
        Args:
            user_id: 用户ID
            permission: 要检查的权限
            
        Returns:
            bool: 是否有权限
        """
        role = self.users.get(user_id)
        if not role:
            return False
        
        return role.has_permission(permission)
    
    async def execute_with_auth(
        self,
        user_id: str,
        task: str,
        required_permission: Permission,
        resource: str
    ):
        """
        带权限检查的任务执行
        
        Args:
            user_id: 用户ID
            task: 任务描述
            required_permission: 所需权限
            resource: 操作的资源
            
        Returns:
            执行结果
        """
        # 检查权限
        if not self.check_permission(user_id, required_permission):
            self.audit_logger.log(
                user_id=user_id,
                action="execute_task",
                resource=resource,
                status="denied",
                details={"reason": "insufficient_permissions"}
            )
            raise PermissionError(f"用户 {user_id} 没有 {required_permission.value} 权限")
        
        # 记录开始
        self.audit_logger.log(
            user_id=user_id,
            action="execute_task",
            resource=resource,
            status="started",
            details={"task": task[:100]}  # 只记录前100字符
        )
        
        try:
            # 执行任务
            agent = Agent(
                task=task,
                llm=llm,
                max_steps=50,
            )
            
            result = await agent.run()
            
            # 记录成功
            self.audit_logger.log(
                user_id=user_id,
                action="execute_task",
                resource=resource,
                status="success",
                details={
                    "steps": result.total_steps(),
                    "success": result.is_successful()
                }
            )
            
            return result
            
        except Exception as e:
            # 记录失败
            self.audit_logger.log(
                user_id=user_id,
                action="execute_task",
                resource=resource,
                status="failed",
                details={"error": str(e)}
            )
            raise

# ============================================================
# 5. 使用示例
# ============================================================

async def access_control_demo():
    """
    访问控制演示
    """
    # 创建访问控制管理器
    acm = AccessControlManager()
    
    # 分配角色
    acm.assign_role("user1", "viewer")
    acm.assign_role("user2", "operator")
    acm.assign_role("admin1", "admin")
    
    # 测试权限检查
    print("权限检查:")
    print(f"user1 是否有 SCRAPE 权限: {acm.check_permission('user1', Permission.SCRAPE)}")
    print(f"user2 是否有 FORM_FILL 权限: {acm.check_permission('user2', Permission.FORM_FILL)}")
    print(f"admin1 是否有 ADMIN 权限: {acm.check_permission('admin1', Permission.ADMIN)}")
    
    # 生成审计报告
    report = acm.audit_logger.generate_report()
    print(f"\n审计报告: {report}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(access_control_demo())

第10章:实战案例与项目模板

10.1 完整项目:自动化测试平台

# projects/automation_testing_platform/
"""
自动化测试平台
基于 browser-use 的完整企业级测试解决方案
"""
from browser_use import Agent, Browser, BrowserConfig, Controller
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List, Dict, Optional, Callable
import asyncio
import json
from datetime import datetime
from pathlib import Path
import yaml

# ============================================================
# 1. 测试用例定义
# ============================================================

class TestCase(BaseModel):
    """测试用例模型"""
    id: str = Field(..., description="测试用例ID")
    name: str = Field(..., description="测试用例名称")
    description: str = Field(..., description="测试描述")
    preconditions: List[str] = Field(default=[], description="前置条件")
    steps: List[str] = Field(..., description="测试步骤")
    expected_results: List[str] = Field(..., description="预期结果")
    priority: str = Field(default="medium", description="优先级")
    tags: List[str] = Field(default=[], description="标签")

class TestSuite(BaseModel):
    """测试套件模型"""
    name: str = Field(..., description="套件名称")
    description: str = Field(..., description="套件描述")
    test_cases: List[TestCase] = Field(..., description="测试用例列表")
    setup: Optional[str] = Field(None, description="套件前置操作")
    teardown: Optional[str] = Field(None, description="套件后置操作")

# ============================================================
# 2. 测试执行引擎
# ============================================================

class TestExecutionEngine:
    """
    测试执行引擎
    执行测试套件并收集结果
    """
    
    def __init__(
        self,
        llm,
        browser_config: Optional[BrowserConfig] = None
    ):
        self.llm = llm
        self.browser_config = browser_config or BrowserConfig(headless=True)
        self.results = []
    
    async def execute_test_case(
        self,
        test_case: TestCase,
        browser: Browser
    ) -> Dict:
        """
        执行单个测试用例
        
        Args:
            test_case: 测试用例
            browser: 浏览器实例
            
        Returns:
            Dict: 执行结果
        """
        print(f"\n{'='*60}")
        print(f"执行测试: {test_case.name} ({test_case.id})")
        print(f"{'='*60}")
        
        start_time = datetime.now()
        
        # 构建任务描述
        task = self._build_task_description(test_case)
        
        try:
            # 创建Agent并执行
            agent = Agent(
                task=task,
                llm=self.llm,
                browser=browser,
                max_steps=100,
            )
            
            result = await agent.run()
            
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            # 分析结果
            test_result = {
                "test_id": test_case.id,
                "test_name": test_case.name,
                "status": "passed" if result.is_successful() else "failed",
                "duration": duration,
                "steps_executed": result.total_steps(),
                "output": result.final_result() if result.is_successful() else None,
                "errors": result.errors() if not result.is_successful() else [],
                "timestamp": datetime.now().isoformat(),
            }
            
            status_icon = "✅" if test_result["status"] == "passed" else "❌"
            print(f"{status_icon} 测试结果: {test_result['status'].upper()}")
            print(f"⏱️  执行时间: {duration:.2f}秒")
            
            return test_result
            
        except Exception as e:
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            
            return {
                "test_id": test_case.id,
                "test_name": test_case.name,
                "status": "error",
                "duration": duration,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            }
    
    async def execute_test_suite(
        self,
        test_suite: TestSuite
    ) -> Dict:
        """
        执行测试套件
        
        Args:
            test_suite: 测试套件
            
        Returns:
            Dict: 套件执行结果
        """
        print(f"\n{'#'*70}")
        print(f"# 测试套件: {test_suite.name}")
        print(f"# 描述: {test_suite.description}")
        print(f"# 用例数: {len(test_suite.test_cases)}")
        print(f"{'#'*70}")
        
        # 创建浏览器实例
        browser = Browser(config=self.browser_config)
        
        try:
            results = []
            
            # 执行套件前置操作
            if test_suite.setup:
                print("\n🔧 执行套件前置操作...")
                setup_agent = Agent(
                    task=test_suite.setup,
                    llm=self.llm,
                    browser=browser,
                )
                await setup_agent.run()
            
            # 执行所有测试用例
            for test_case in test_suite.test_cases:
                result = await self.execute_test_case(test_case, browser)
                results.append(result)
            
            # 执行套件后置操作
            if test_suite.teardown:
                print("\n🧹 执行套件后置操作...")
                teardown_agent = Agent(
                    task=test_suite.teardown,
                    llm=self.llm,
                    browser=browser,
                )
                await teardown_agent.run()
            
            # 生成套件报告
            suite_report = self._generate_suite_report(test_suite, results)
            
            return suite_report
            
        finally:
            await browser.close()
    
    def _build_task_description(self, test_case: TestCase) -> str:
        """构建任务描述"""
        parts = [
            f"执行测试: {test_case.name}",
            "",
            "前置条件:",
        ]
        
        for precond in test_case.preconditions:
            parts.append(f"  - {precond}")
        
        parts.extend(["", "测试步骤:"])
        for i, step in enumerate(test_case.steps, 1):
            parts.append(f"  {i}. {step}")
        
        parts.extend(["", "预期结果:"])
        for result in test_case.expected_results:
            parts.append(f"  - {result}")
        
        parts.extend([
            "",
            "执行要求:",
            "  - 严格按照步骤执行",
            "  - 验证每个预期结果",
            "  - 发现问题立即停止并报告",
            "  - 返回详细的执行报告",
        ])
        
        return "\n".join(parts)
    
    def _generate_suite_report(
        self,
        suite: TestSuite,
        results: List[Dict]
    ) -> Dict:
        """生成套件报告"""
        total = len(results)
        passed = sum(1 for r in results if r["status"] == "passed")
        failed = sum(1 for r in results if r["status"] == "failed")
        errors = sum(1 for r in results if r["status"] == "error")
        
        total_duration = sum(r["duration"] for r in results)
        
        return {
            "suite_name": suite.name,
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total": total,
                "passed": passed,
                "failed": failed,
                "errors": errors,
                "pass_rate": passed / total if total > 0 else 0,
                "total_duration": total_duration,
            },
            "results": results,
        }

# ============================================================
# 3. 报告生成器
# ============================================================

class ReportGenerator:
    """
    报告生成器
    生成多种格式的测试报告
    """
    
    @staticmethod
    def generate_html_report(report: Dict, output_path: str):
        """
        生成HTML报告
        
        Args:
            report: 测试报告数据
            output_path: 输出文件路径
        """
        summary = report["summary"]
        
        html = f"""
<!DOCTYPE html>
<html>
<head>
    <title>测试报告 - {report['suite_name']}</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .header {{ background: #333; color: white; padding: 20px; }}
        .summary {{ display: flex; gap: 20px; margin: 20px 0; }}
        .metric {{ background: #f0f0f0; padding: 15px; border-radius: 5px; }}
        .passed {{ color: green; }}
        .failed {{ color: red; }}
        .error {{ color: orange; }}
        table {{ width: 100%; border-collapse: collapse; margin-top: 20px; }}
        th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
        th {{ background: #4CAF50; color: white; }}
        tr:nth-child(even) {{ background: #f2f2f2; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>测试报告: {report['suite_name']}</h1>
        <p>生成时间: {report['timestamp']}</p>
    </div>
    
    <div class="summary">
        <div class="metric">
            <h3>总用例数</h3>
            <p>{summary['total']}</p>
        </div>
        <div class="metric">
            <h3>通过</h3>
            <p class="passed">{summary['passed']} ✅</p>
        </div>
        <div class="metric">
            <h3>失败</h3>
            <p class="failed">{summary['failed']} ❌</p>
        </div>
        <div class="metric">
            <h3>错误</h3>
            <p class="error">{summary['errors']} ⚠️</p>
        </div>
        <div class="metric">
            <h3>通过率</h3>
            <p>{summary['pass_rate']*100:.1f}%</p>
        </div>
        <div class="metric">
            <h3>总耗时</h3>
            <p>{summary['total_duration']:.2f}s</p>
        </div>
    </div>
    
    <table>
        <tr>
            <th>测试ID</th>
            <th>测试名称</th>
            <th>状态</th>
            <th>耗时</th>
            <th>详情</th>
        </tr>
"""
        
        for result in report["results"]:
            status_class = result["status"]
            html += f"""
        <tr>
            <td>{result['test_id']}</td>
            <td>{result['test_name']}</td>
            <td class="{status_class}">{result['status'].upper()}</td>
            <td>{result['duration']:.2f}s</td>
            <td>{result.get('errors', ['-'])[0] if result.get('errors') else '-'}</td>
        </tr>
"""
        
        html += """
    </table>
</body>
</html>
"""
        
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(html)
        
        print(f"\n📊 HTML报告已生成: {output_path}")
    
    @staticmethod
    def generate_json_report(report: Dict, output_path: str):
        """生成JSON报告"""
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        print(f"📄 JSON报告已生成: {output_path}")

# ============================================================
# 4. 配置加载器
# ============================================================

class ConfigLoader:
    """
    配置加载器
    从YAML文件加载测试配置
    """
    
    @staticmethod
    def load_test_suite(config_path: str) -> TestSuite:
        """
        加载测试套件配置
        
        Args:
            config_path: 配置文件路径
            
        Returns:
            TestSuite: 测试套件
        """
        with open(config_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
        
        # 解析测试用例
        test_cases = []
        for tc_config in config.get("test_cases", []):
            test_cases.append(TestCase(**tc_config))
        
        return TestSuite(
            name=config["name"],
            description=config.get("description", ""),
            test_cases=test_cases,
            setup=config.get("setup"),
            teardown=config.get("teardown"),
        )

# ============================================================
# 5. 使用示例
# ============================================================

async def run_test_platform():
    """
    运行测试平台示例
    """
    # 初始化LLM
    llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
    
    # 创建测试套件(代码方式)
    test_suite = TestSuite(
        name="电商网站功能测试",
        description="测试电商网站的核心功能",
        setup="访问 https://ecommerce-demo.com 并确保页面加载完成",
        test_cases=[
            TestCase(
                id="TC001",
                name="用户登录功能测试",
                description="验证用户能够正常登录系统",
                preconditions=["用户已注册","测试环境可用"],
                steps=[
                    "访问登录页面",
                    "输入有效的用户名",
                    "输入正确的密码",
                    "点击登录按钮",
                    "验证登录成功",
                ],
                expected_results=[
                    "页面跳转到用户中心",
                    "显示欢迎消息",
                    "用户信息正确显示",
                ],
                priority="high",
                tags=["login", "authentication"],
            ),
            TestCase(
                id="TC002",
                name="商品搜索功能测试",
                description="验证商品搜索功能正常",
                preconditions=["用户已登录"],
                steps=[
                    "在搜索框输入关键词'手机'",
                    "点击搜索按钮",
                    "等待搜索结果加载",
                ],
                expected_results=[
                    "显示搜索结果列表",
                    "结果中包含手机相关商品",
                    "分页功能正常",
                ],
                priority="high",
                tags=["search", "product"],
            ),
        ],
        teardown="登出用户并关闭浏览器",
    )
    
    # 创建执行引擎
    engine = TestExecutionEngine(llm)
    
    # 执行测试套件
    report = await engine.execute_test_suite(test_suite)
    
    # 生成报告
    output_dir = Path("test_reports")
    output_dir.mkdir(exist_ok=True)
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    ReportGenerator.generate_html_report(
        report,
        str(output_dir / f"report_{timestamp}.html")
    )
    
    ReportGenerator.generate_json_report(
        report,
        str(output_dir / f"report_{timestamp}.json")
    )
    
    # 打印摘要
    print(f"\n{'='*70}")
    print("测试执行完成!")
    print(f"总用例: {report['summary']['total']}")
    print(f"通过: {report['summary']['passed']}")
    print(f"失败: {report['summary']['failed']}")
    print(f"错误: {report['summary']['errors']}")
    print(f"通过率: {report['summary']['pass_rate']*100:.1f}%")
    print(f"{'='*70}")

if __name__ == "__main__":
    asyncio.run(run_test_platform())

10.2 项目模板与脚手架

# projects/project-template/config/test_suite.yaml
# 测试套件配置文件示例

name: "网站功能测试套件"
description: "完整的网站功能回归测试"

setup: |
  访问测试网站 https://demo.example.com
  确保页面完全加载
  清除浏览器缓存和Cookie

teardown: |
  清理测试数据
  登出所有用户
  关闭浏览器

test_cases:
  - id: "TC001"
    name: "首页加载测试"
    description: "验证首页能够正常加载"
    preconditions:
      - 网络连接正常
      - 测试环境可用
    steps:
      - 访问首页
      - 等待页面加载完成
      - 检查关键元素是否存在
    expected_results:
      - 页面状态码为200
      - 加载时间小于3秒
      - Logo正确显示
    priority: "high"
    tags: ["homepage", "performance"]

  - id: "TC002"
    name: "用户注册流程测试"
    description: "验证新用户能够成功注册"
    preconditions:
      - 使用未注册的邮箱
    steps:
      - 点击注册按钮
      - 填写注册表单
      - 提交注册
      - 验证邮箱
    expected_results:
      - 注册成功提示
      - 收到验证邮件
      - 能够使用新账户登录
    priority: "high"
    tags: ["registration", "user"]

  - id: "TC003"
    name: "购物车功能测试"
    description: "验证购物车功能正常"
    preconditions:
      - 用户已登录
    steps:
      - 浏览商品列表
      - 添加商品到购物车
      - 查看购物车
      - 修改商品数量
    expected_results:
      - 商品正确添加到购物车
      - 价格计算正确
      - 数量修改生效
    priority: "medium"
    tags: ["cart", "ecommerce"]
# projects/project-template/main.py
"""
Browser-use 项目模板
快速启动新的自动化测试项目
"""
from browser_use import Agent, Browser, BrowserConfig
from langchain_openai import ChatOpenAI
import asyncio
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# ============================================================
# 配置区域
# ============================================================

class Config:
    """项目配置"""
    
    # LLM配置
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    LLM_MODEL = "gpt-4o"
    LLM_TEMPERATURE = 0.1
    
    # 浏览器配置
    HEADLESS = True
    SLOW_MO = 0
    
    # 执行配置
    MAX_STEPS = 100
    MAX_FAILURES = 3

# ============================================================
# 初始化
# ============================================================

def init_llm():
    """初始化LLM"""
    return ChatOpenAI(
        model=Config.LLM_MODEL,
        temperature=Config.LLM_TEMPERATURE,
        api_key=Config.OPENAI_API_KEY,
    )

def init_browser():
    """初始化浏览器"""
    return Browser(config=BrowserConfig(
        headless=Config.HEADLESS,
        slow_mo=Config.SLOW_MO,
    ))

# ============================================================
# 任务定义
# ============================================================

SAMPLE_TASK = """
任务:访问示例网站并获取信息

步骤:
1. 访问 https://example.com
2. 获取页面标题
3. 提取页面主要内容的摘要
4. 返回结果

输出格式:
返回包含标题和摘要的JSON对象
"""

# ============================================================
# 主函数
# ============================================================

async def main():
    """主函数"""
    print("🚀 启动 Browser-use 自动化任务")
    
    # 初始化
    llm = init_llm()
    browser = init_browser()
    
    try:
        # 创建Agent
        agent = Agent(
            task=SAMPLE_TASK,
            llm=llm,
            browser=browser,
            max_steps=Config.MAX_STEPS,
            max_failures=Config.MAX_FAILURES,
        )
        
        # 执行任务
        print("⏳ 正在执行任务...")
        result = await agent.run()
        
        # 处理结果
        if result.is_successful():
            print("\n✅ 任务执行成功!")
            print(f"结果: {result.final_result()}")
        else:
            print("\n❌ 任务执行失败")
            print(f"错误: {result.errors()}")
        
        return result
        
    finally:
        # 清理资源
        await browser.close()
        print("\n👋 任务结束,资源已清理")

# ============================================================
# 入口点
# ============================================================

if __name__ == "__main__":
    asyncio.run(main())
# projects/project-template/requirements.txt
# 项目依赖

browser-use>=0.1.40
playwright>=1.40.0
langchain-openai>=0.1.0
python-dotenv>=1.0.0
pydantic>=2.0.0
pyyaml>=6.0.0
# projects/project-template/.env.example
# 环境变量模板
# 复制此文件为 .env 并填入实际值

# OpenAI API配置
OPENAI_API_KEY=your-openai-api-key-here

# Azure OpenAI配置(可选)
AZURE_OPENAI_API_KEY=your-azure-key
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_VERSION=2024-02-01

# Anthropic配置(可选)
ANTHROPIC_API_KEY=your-anthropic-key

# 测试站点凭证(可选)
TEST_SITE_USERNAME=testuser
TEST_SITE_PASSWORD=testpass
# projects/project-template/README.md
# Browser-use 项目模板

## 快速开始

### 1. 安装依赖

```bash
pip install -r requirements.txt
playwright install

2. 配置环境变量

cp .env.example .env
# 编辑 .env 文件,填入你的API密钥

3. 运行示例

python main.py

项目结构

.
├── config/                 # 配置文件
│   └── test_suite.yaml    # 测试套件配置
├── main.py                # 主入口
├── requirements.txt       # 依赖
├── .env.example          # 环境变量模板
└── README.md             # 本文件

自定义任务

编辑 main.py 中的 SAMPLE_TASK 变量,定义你的自动化任务。

高级用法

参考 config/test_suite.yaml 了解如何配置完整的测试套件。


---

## 附录:常见问题与解决方案

### Q1: 如何处理页面加载超时?

```python
# 增加超时时间
agent = Agent(
    task=task,
    llm=llm,
    max_steps=100,  # 增加最大步数
)

# 在任务描述中明确等待要求
task = """
访问页面后,等待以下元素加载完成:
- 主内容区域
- 导航菜单
- 页脚信息

如果元素未立即出现,等待最多10秒。
"""

Q2: 如何处理动态加载的内容?

task = """
抓取动态加载的商品列表:

1. 访问商品页面
2. 滚动页面触发懒加载
3. 等待新内容加载(观察加载指示器消失)
4. 重复滚动直到没有新内容
5. 抓取所有已加载的商品信息
"""

Q3: 如何降低API调用成本?

# 1. 使用较小的模型
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)

# 2. 限制任务复杂度
task = """
简洁的任务描述,避免冗余信息
明确指定步骤和预期结果
"""

# 3. 减少max_steps
agent = Agent(
    task=task,
    llm=llm,
    max_steps=30,  # 限制步数
)

Q4: 如何提高执行稳定性?

# 1. 使用重试机制
async def robust_execute(task, llm, max_retries=3):
    for i in range(max_retries):
        try:
            agent = Agent(task=task, llm=llm)
            result = await agent.run()
            if result.is_successful():
                return result
        except Exception as e:
            if i == max_retries - 1:
                raise
            await asyncio.sleep(2 ** i)  # 指数退避

# 2. 使用自定义控制器添加稳定性检查
controller = Controller()

@controller.action("等待页面稳定")
async def wait_for_stable_page(browser):
    # 实现等待逻辑
    await asyncio.sleep(2)
    return ActionResult(extracted_content="页面已稳定")
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐