第八章:测试框架集成与设计模式

8.1 Pytest/TestNG框架集成

# ========== Pytest框架集成示例 ==========

import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# ========== Fixture配置 ==========

@pytest.fixture(scope="module")
def driver():
    """
    模块级别的WebDriver fixture
    整个测试模块共享一个浏览器实例
    """
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    driver.maximize_window()
    driver.implicitly_wait(10)
    
    yield driver
    
    driver.quit()

@pytest.fixture(scope="function")
def fresh_driver():
    """
    函数级别的WebDriver fixture
    每个测试函数使用新的浏览器实例
    """
    driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
    driver.maximize_window()
    
    yield driver
    
    driver.quit()

# ========== 测试类示例 ==========

class TestLogin:
    """
    登录功能测试类
    """
    
    def test_login_success(self, driver):
        """测试登录成功场景"""
        driver.get("https://www.example.com/login")
        
        # 输入用户名和密码
        driver.find_element(By.ID, "username").send_keys("test_user")
        driver.find_element(By.ID, "password").send_keys("test_pass")
        driver.find_element(By.ID, "login-btn").click()
        
        # 验证登录成功
        welcome = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "welcome"))
        )
        assert "欢迎" in welcome.text
    
    def test_login_failed(self, driver):
        """测试登录失败场景"""
        driver.get("https://www.example.com/login")
        
        # 输入错误密码
        driver.find_element(By.ID, "username").send_keys("test_user")
        driver.find_element(By.ID, "password").send_keys("wrong_pass")
        driver.find_element(By.ID, "login-btn").click()
        
        # 验证错误提示
        error_msg = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "error"))
        )
        assert "密码错误" in error_msg.text

# ========== 参数化测试 ==========

@pytest.mark.parametrize("username,password,expected", [
    ("user1", "pass1", "成功"),
    ("user2", "wrong", "失败"),
    ("", "pass", "用户名不能为空"),
    ("user", "", "密码不能为空"),
])
def test_login_scenarios(driver, username, password, expected):
    """
    参数化登录测试
    """
    driver.get("https://www.example.com/login")
    
    driver.find_element(By.ID, "username").clear()
    driver.find_element(By.ID, "username").send_keys(username)
    driver.find_element(By.ID, "password").clear()
    driver.find_element(By.ID, "password").send_keys(password)
    driver.find_element(By.ID, "login-btn").click()
    
    if expected == "成功":
        welcome = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "welcome"))
        )
        assert "欢迎" in welcome.text
    else:
        error = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "error"))
        )
        assert expected in error.text

# ========== conftest.py配置文件示例 ==========

# conftest.py
"""
import pytest
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service

@pytest.fixture(scope="session")
def browser_type():
    return "chrome"

@pytest.fixture(scope="session")
def headless():
    return False

@pytest.fixture(scope="function")
def driver(browser_type, headless):
    if browser_type == "chrome":
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless=new")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=options
        )
    elif browser_type == "firefox":
        driver = webdriver.Firefox()
    
    driver.maximize_window()
    
    yield driver
    
    # 失败时截图
    import pytest
    if hasattr(pytest, "test_failed") and pytest.test_failed:
        driver.save_screenshot(f"screenshots/failed_{pytest.current_test}.png")
    
    driver.quit()

@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    outcome = yield
    report = outcome.get_result()
    
    if call.when == "call":
        pytest.test_failed = report.failed
        pytest.current_test = item.name
"""

# 运行测试命令
# pytest tests/ -v --html=report.html --self-contained-html

8.2 断言机制与验证方法详解

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get("https://www.example.com")

# ========== 基本断言 ==========

# 断言页面标题
assert "Example" in driver.title, f"标题验证失败: {driver.title}"

# 断言URL
assert "example.com" in driver.current_url, f"URL验证失败: {driver.current_url}"

# 断言元素存在
element = driver.find_element(By.ID, "username")
assert element is not None, "元素不存在"

# 断言元素可见
assert element.is_displayed(), "元素不可见"

# 断言元素可用
assert element.is_enabled(), "元素不可用"

# 断言元素文本
assert "登录" in element.text, f"文本验证失败: {element.text}"

# 断言元素属性
assert element.get_attribute("type") == "text", "属性验证失败"

# ========== 软断言(收集所有错误)==========

class SoftAssert:
    """
    软断言类
    收集所有断言错误,最后统一抛出
    """
    
    def __init__(self):
        self.errors = []
    
    def assert_true(self, condition, message=""):
        """断言条件为True"""
        if not condition:
            self.errors.append(f"断言失败: {message}")
    
    def assert_false(self, condition, message=""):
        """断言条件为False"""
        if condition:
            self.errors.append(f"断言失败: {message}")
    
    def assert_equal(self, actual, expected, message=""):
        """断言相等"""
        if actual != expected:
            self.errors.append(
                f"断言失败: {message} - 期望: {expected}, 实际: {actual}"
            )
    
    def assert_contains(self, actual, expected, message=""):
        """断言包含"""
        if expected not in actual:
            self.errors.append(
                f"断言失败: {message} - '{expected}' 不在 '{actual}' 中"
            )
    
    def assert_all(self):
        """验证所有断言"""
        if self.errors:
            error_msg = "\n".join(self.errors)
            raise AssertionError(f"发现 {len(self.errors)} 个断言错误:\n{error_msg}")

# 使用软断言示例
soft = SoftAssert()

soft.assert_true(driver.find_element(By.ID, "username").is_displayed(), "用户名输入框应可见")
soft.assert_true(driver.find_element(By.ID, "password").is_displayed(), "密码输入框应可见")
soft.assert_contains(driver.title, "Example", "标题应包含Example")

soft.assert_all()  # 统一抛出所有错误

# ========== 验证工具类 ==========

class VerifyHelper:
    """
    验证工具类
    提供常用的验证方法
    """
    
    def __init__(self, driver):
        self.driver = driver
    
    def verify_title(self, expected_title):
        """验证页面标题"""
        actual = self.driver.title
        assert expected_title in actual, f"标题验证失败: 期望包含 '{expected_title}', 实际 '{actual}'"
    
    def verify_url(self, expected_url):
        """验证URL"""
        actual = self.driver.current_url
        assert expected_url in actual, f"URL验证失败: 期望包含 '{expected_url}', 实际 '{actual}'"
    
    def verify_element_present(self, locator):
        """验证元素存在"""
        elements = self.driver.find_elements(*locator)
        assert len(elements) > 0, f"元素不存在: {locator}"
    
    def verify_element_visible(self, locator):
        """验证元素可见"""
        element = self.driver.find_element(*locator)
        assert element.is_displayed(), f"元素不可见: {locator}"
    
    def verify_element_text(self, locator, expected_text):
        """验证元素文本"""
        element = self.driver.find_element(*locator)
        actual = element.text
        assert expected_text in actual, f"文本验证失败: 期望 '{expected_text}', 实际 '{actual}'"
    
    def verify_element_attribute(self, locator, attribute, expected_value):
        """验证元素属性"""
        element = self.driver.find_element(*locator)
        actual = element.get_attribute(attribute)
        assert actual == expected_value, f"属性验证失败: 期望 '{expected_value}', 实际 '{actual}'"

driver.quit()

8.3 Page Object Model设计模式

# ========== Page Object Model设计模式 ==========

"""
Page Object Model (POM) 是一种设计模式
将页面元素定位和操作封装到独立的类中
优点:
1. 代码复用性高
2. 维护成本低
3. 可读性好
4. 测试与页面结构解耦
"""

# ========== 基类Page ==========

from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By

class BasePage:
    """
    页面基类
    所有页面类的父类
    """
    
    def __init__(self, driver):
        self.driver = driver
        self.wait = WebDriverWait(driver, 10)
    
    def find_element(self, locator):
        """查找元素"""
        return self.driver.find_element(*locator)
    
    def find_elements(self, locator):
        """查找多个元素"""
        return self.driver.find_elements(*locator)
    
    def click(self, locator):
        """点击元素"""
        element = self.wait.until(EC.element_to_be_clickable(locator))
        element.click()
    
    def enter_text(self, locator, text):
        """输入文本"""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        element.clear()
        element.send_keys(text)
    
    def get_text(self, locator):
        """获取文本"""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        return element.text
    
    def is_visible(self, locator):
        """检查元素是否可见"""
        try:
            element = self.wait.until(EC.visibility_of_element_located(locator))
            return element.is_displayed()
        except:
            return False
    
    def wait_for_element(self, locator):
        """等待元素出现"""
        return self.wait.until(EC.presence_of_element_located(locator))
    
    def wait_for_element_disappear(self, locator):
        """等待元素消失"""
        return self.wait.until(EC.invisibility_of_element_located(locator))

# ========== 登录页Page ==========

class LoginPage(BasePage):
    """
    登录页面类
    封装登录页面的元素和操作
    """
    
    # 页面元素定位器
    USERNAME_INPUT = (By.ID, "username")
    PASSWORD_INPUT = (By.ID, "password")
    LOGIN_BUTTON = (By.ID, "login-btn")
    ERROR_MESSAGE = (By.ID, "error-msg")
    FORGOT_PASSWORD_LINK = (By.LINK_TEXT, "忘记密码")
    
    def __init__(self, driver):
        super().__init__(driver)
        self.url = "https://www.example.com/login"
    
    def open(self):
        """打开登录页面"""
        self.driver.get(self.url)
        return self
    
    def enter_username(self, username):
        """输入用户名"""
        self.enter_text(self.USERNAME_INPUT, username)
        return self
    
    def enter_password(self, password):
        """输入密码"""
        self.enter_text(self.PASSWORD_INPUT, password)
        return self
    
    def click_login(self):
        """点击登录按钮"""
        self.click(self.LOGIN_BUTTON)
        return self
    
    def get_error_message(self):
        """获取错误消息"""
        return self.get_text(self.ERROR_MESSAGE)
    
    def is_error_displayed(self):
        """检查错误消息是否显示"""
        return self.is_visible(self.ERROR_MESSAGE)
    
    def login(self, username, password):
        """
        执行登录操作
        封装完整的登录流程
        """
        self.enter_username(username)
        self.enter_password(password)
        self.click_login()
        return HomePage(self.driver)

# ========== 首页Page ==========

class HomePage(BasePage):
    """
    首页类
    """
    
    WELCOME_MESSAGE = (By.ID, "welcome")
    USER_MENU = (By.ID, "user-menu")
    LOGOUT_BUTTON = (By.ID, "logout")
    
    def __init__(self, driver):
        super().__init__(driver)
    
    def get_welcome_message(self):
        """获取欢迎消息"""
        return self.get_text(self.WELCOME_MESSAGE)
    
    def is_logged_in(self):
        """检查是否已登录"""
        return self.is_visible(self.USER_MENU)
    
    def logout(self):
        """退出登录"""
        self.click(self.USER_MENU)
        self.click(self.LOGOUT_BUTTON)
        return LoginPage(self.driver)

# ========== 测试代码 ==========

def test_login_success(driver):
    """使用Page Object的测试"""
    login_page = LoginPage(driver)
    login_page.open()
    
    home_page = login_page.login("test_user", "test_pass")
    
    assert home_page.is_logged_in()
    assert "欢迎" in home_page.get_welcome_message()

8.4 数据驱动测试实现

import pytest
import json
import csv
import yaml
from selenium import webdriver
from selenium.webdriver.common.by import By

# ========== JSON数据驱动 ==========

# test_data.json
"""
[
    {"username": "user1", "password": "pass1", "expected": "成功"},
    {"username": "user2", "password": "wrong", "expected": "失败"},
    {"username": "", "password": "pass", "expected": "用户名不能为空"}
]
"""

def load_json_data(file_path):
    """加载JSON测试数据"""
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

@pytest.mark.parametrize("data", load_json_data("test_data.json"))
def test_login_json(driver, data):
    """JSON数据驱动测试"""
    driver.get("https://www.example.com/login")
    
    driver.find_element(By.ID, "username").send_keys(data["username"])
    driver.find_element(By.ID, "password").send_keys(data["password"])
    driver.find_element(By.ID, "login-btn").click()
    
    if data["expected"] == "成功":
        assert "欢迎" in driver.page_source
    else:
        assert data["expected"] in driver.page_source

# ========== CSV数据驱动 ==========

# test_data.csv
"""
username,password,expected
user1,pass1,成功
user2,wrong,失败
,pass,用户名不能为空
"""

def load_csv_data(file_path):
    """加载CSV测试数据"""
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append(row)
    return data

@pytest.mark.parametrize("data", load_csv_data("test_data.csv"))
def test_login_csv(driver, data):
    """CSV数据驱动测试"""
    driver.get("https://www.example.com/login")
    
    driver.find_element(By.ID, "username").send_keys(data["username"])
    driver.find_element(By.ID, "password").send_keys(data["password"])
    driver.find_element(By.ID, "login-btn").click()
    
    if data["expected"] == "成功":
        assert "欢迎" in driver.page_source
    else:
        assert data["expected"] in driver.page_source

# ========== YAML数据驱动 ==========

# test_data.yaml
"""
test_cases:
  - username: user1
    password: pass1
    expected: 成功
  - username: user2
    password: wrong
    expected: 失败
"""

def load_yaml_data(file_path):
    """加载YAML测试数据"""
    with open(file_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

# ========== 数据驱动工具类 ==========

class DataDriver:
    """
    数据驱动工具类
    支持多种数据源
    """
    
    @staticmethod
    def from_json(file_path):
        """从JSON文件加载"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    @staticmethod
    def from_csv(file_path):
        """从CSV文件加载"""
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                data.append(row)
        return data
    
    @staticmethod
    def from_yaml(file_path):
        """从YAML文件加载"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    
    @staticmethod
    def from_excel(file_path, sheet_name):
        """从Excel文件加载"""
        import pandas as pd
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        return df.to_dict('records')

8.5 测试报告生成

# ========== Pytest HTML报告 ==========

# 安装: pip install pytest-html

# 运行命令生成报告
# pytest tests/ --html=report.html --self-contained-html

# ========== Allure报告 ==========

# 安装: pip install allure-pytest

import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
import allure

@allure.feature("登录功能")
class TestLogin:
    """
    登录功能测试
    """
    
    @pytest.fixture(autouse=True)
    def setup(self):
        self.driver = webdriver.Chrome()
        self.driver.maximize_window()
        yield
        self.driver.quit()
    
    @allure.story("正常登录")
    @allure.severity(allure.severity_level.BLOCKER)
    @allure.title("测试用户正常登录")
    def test_login_success(self):
        """测试正常登录"""
        with allure.step("打开登录页面"):
            self.driver.get("https://www.example.com/login")
            allure.attach(
                self.driver.get_screenshot_as_png(),
                "登录页面",
                allure.attachment_type.PNG
            )
        
        with allure.step("输入用户名和密码"):
            self.driver.find_element(By.ID, "username").send_keys("test_user")
            self.driver.find_element(By.ID, "password").send_keys("test_pass")
        
        with allure.step("点击登录按钮"):
            self.driver.find_element(By.ID, "login-btn").click()
        
        with allure.step("验证登录成功"):
            assert "欢迎" in self.driver.page_source
            allure.attach(
                self.driver.get_screenshot_as_png(),
                "登录成功",
                allure.attachment_type.PNG
            )

# 运行Allure报告
# pytest tests/ --alluredir=allure-results
# allure serve allure-results

# ========== 自定义测试报告工具类 ==========

class TestReporter:
    """
    测试报告工具类
    """
    
    def __init__(self, output_dir="reports"):
        self.output_dir = output_dir
        self.results = []
        import os
        os.makedirs(output_dir, exist_ok=True)
    
    def add_result(self, test_name, status, message="", screenshot=None):
        """添加测试结果"""
        import time
        result = {
            "test_name": test_name,
            "status": status,
            "message": message,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "screenshot": screenshot
        }
        self.results.append(result)
    
    def generate_html_report(self, filename="report.html"):
        """生成HTML报告"""
        import time
        total = len(self.results)
        passed = sum(1 for r in self.results if r["status"] == "PASS")
        failed = total - passed
        
        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>测试报告</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .summary {{ background: #f5f5f5; padding: 20px; margin-bottom: 20px; }}
                .pass {{ color: green; }}
                .fail {{ color: red; }}
                table {{ width: 100%; border-collapse: collapse; }}
                th, td {{ border: 1px solid #ddd; padding: 10px; text-align: left; }}
                th {{ background: #4CAF50; color: white; }}
            </style>
        </head>
        <body>
            <h1>自动化测试报告</h1>
            <div class="summary">
                <p>生成时间: {time.strftime("%Y-%m-%d %H:%M:%S")}</p>
                <p>总计: {total} | <span class="pass">通过: {passed}</span> | <span class="fail">失败: {failed}</span></p>
            </div>
            <table>
                <tr>
                    <th>测试名称</th>
                    <th>状态</th>
                    <th>消息</th>
                    <th>时间</th>
                </tr>
        """
        
        for result in self.results:
            status_class = "pass" if result["status"] == "PASS" else "fail"
            html += f"""
                <tr>
                    <td>{result['test_name']}</td>
                    <td class="{status_class}">{result['status']}</td>
                    <td>{result['message']}</td>
                    <td>{result['timestamp']}</td>
                </tr>
            """
        
        html += """
            </table>
        </body>
        </html>
        """
        
        import os
        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(html)
        
        return filepath

8.6 配置管理与环境切换

import os
import json
import yaml

# ========== 配置文件示例 ==========

# config.yaml
"""
environments:
  dev:
    base_url: "https://dev.example.com"
    username: "dev_user"
    password: "dev_pass"
    timeout: 10
  test:
    base_url: "https://test.example.com"
    username: "test_user"
    password: "test_pass"
    timeout: 15
  prod:
    base_url: "https://www.example.com"
    username: "prod_user"
    password: "prod_pass"
    timeout: 20

browser:
  type: chrome
  headless: false
  window_size:
    width: 1920
    height: 1080
"""

# ========== 配置管理类 ==========

class ConfigManager:
    """
    配置管理类
    支持多环境配置切换
    """
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._config = None
        return cls._instance
    
    def load_config(self, config_file="config.yaml", env="dev"):
        """
        加载配置文件
        
        参数:
            config_file: 配置文件路径
            env: 环境名称
        """
        with open(config_file, 'r', encoding='utf-8') as f:
            all_config = yaml.safe_load(f)
        
        self._config = {
            "env": env,
            **all_config["environments"][env],
            "browser": all_config["browser"]
        }
        
        return self._config
    
    def get(self, key, default=None):
        """获取配置项"""
        if self._config is None:
            raise RuntimeError("配置未加载,请先调用load_config()")
        return self._config.get(key, default)
    
    @property
    def base_url(self):
        return self.get("base_url")
    
    @property
    def timeout(self):
        return self.get("timeout", 10)
    
    @property
    def browser_type(self):
        return self.get("browser", {}).get("type", "chrome")
    
    @property
    def headless(self):
        return self.get("browser", {}).get("headless", False)

# ========== 环境切换工具类 ==========

class EnvironmentManager:
    """
    环境管理类
    """
    
    ENVIRONMENTS = {
        "dev": {
            "base_url": "https://dev.example.com",
            "api_url": "https://api.dev.example.com"
        },
        "test": {
            "base_url": "https://test.example.com",
            "api_url": "https://api.test.example.com"
        },
        "prod": {
            "base_url": "https://www.example.com",
            "api_url": "https://api.example.com"
        }
    }
    
    @classmethod
    def set_environment(cls, env_name):
        """设置当前环境"""
        if env_name not in cls.ENVIRONMENTS:
            raise ValueError(f"未知环境: {env_name}")
        
        os.environ["TEST_ENV"] = env_name
    
    @classmethod
    def get_current_env(cls):
        """获取当前环境"""
        return os.environ.get("TEST_ENV", "dev")
    
    @classmethod
    def get_config(cls):
        """获取当前环境配置"""
        env = cls.get_current_env()
        return cls.ENVIRONMENTS[env]

# ========== 使用示例 ==========

# 加载配置
config = ConfigManager()
config.load_config(env="test")

print(f"Base URL: {config.base_url}")
print(f"Timeout: {config.timeout}")

# 切换环境
EnvironmentManager.set_environment("prod")
env_config = EnvironmentManager.get_config()
print(f"当前环境配置: {env_config}")

第九章:Selenium Grid与分布式测试

9.1 Selenium Grid架构原理

Selenium Grid 4 架构

测试客户端

Router路由器

Distributor分发器

Session Queue会话队列

Node 1
Chrome

Node 2
Firefox

Node 3
Edge

Session Map会话映射

# ========== Selenium Grid 4 架构组件说明 ==========

"""
Selenium Grid 4 组件架构:

1. Router(路由器)
   - 接收所有外部请求
   - 将请求路由到正确的组件
   - 是Grid的入口点

2. Distributor(分发器)
   - 管理Node注册
   - 分配测试会话到合适的Node
   - 负载均衡

3. Session Queue(会话队列)
   - 管理等待中的会话请求
   - 先进先出(FIFO)处理
   - 支持会话优先级

4. Session Map(会话映射)
   - 存储会话ID与Node的映射关系
   - 快速定位会话所在Node

5. Node(节点)
   - 实际执行测试的机器
   - 可以注册多个浏览器
   - 支持并发会话
"""

# ========== Grid工作流程 ==========

"""
测试执行流程:
1. 客户端发送WebDriver请求到Router
2. Router将创建会话请求放入Session Queue
3. Distributor从Session Queue获取请求
4. Distributor选择合适的Node
5. Node创建会话并返回Session ID
6. Session Map存储Session ID与Node的映射
7. 后续请求通过Router直接路由到对应Node
"""

9.2 Grid环境搭建与配置

# ========== Selenium Grid 4 安装 ==========

# 方式一:下载独立的JAR文件
# 下载地址: https://www.selenium.dev/downloads/
# selenium-server-4.x.x.jar

# 方式二:使用Docker
docker pull selenium/hub
docker pull selenium/node-chrome
docker pull selenium/node-firefox

# ========== 启动Standalone模式 ==========

# 单机模式(Hub和Node在同一进程)
java -jar selenium-server-4.x.x.jar standalone

# ========== 启动Hub-Node模式 ==========

# 启动Hub
java -jar selenium-server-4.x.x.jar hub

# 启动Node(在另一台机器或同一机器)
java -jar selenium-server-4.x.x.jar node --detect-drivers true

# ========== 使用Docker Compose ==========

# docker-compose.yml
"""
version: "3"
services:
  selenium-hub:
    image: selenium/hub:4.x.x
    container_name: selenium-hub
    ports:
      - "4442:4442"
      - "4443:4443"
      - "4444:4444"
    environment:
      - GRID_MAX_SESSION=10
      - GRID_BROWSER_TIMEOUT=300
      - GRID_TIMEOUT=300

  chrome:
    image: selenium/node-chrome:4.x.x
    depends_on:
      - selenium-hub
    environment:
      - SE_EVENT_BUS_HOST=selenium-hub
      - SE_EVENT_BUS_PUBLISH_PORT=4442
      - SE_EVENT_BUS_SUBSCRIBE_PORT=4443
      - SE_NODE_MAX_SESSIONS=3
    deploy:
      replicas: 2

  firefox:
    image: selenium/node-firefox:4.x.x
    depends_on:
      - selenium-hub
    environment:
      - SE_EVENT_BUS_HOST=selenium-hub
      - SE_EVENT_BUS_PUBLISH_PORT=4442
      - SE_EVENT_BUS_SUBSCRIBE_PORT=4443
      - SE_NODE_MAX_SESSIONS=3
    deploy:
      replicas: 1
"""

# 启动Docker Compose
# docker-compose up -d

9.3 Node配置与能力定义

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.firefox.options import Options as FirefoxOptions

# ========== 连接到Selenium Grid ==========

# 基本连接
driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=Options()
)

# ========== 配置Chrome选项 ==========

chrome_options = Options()
chrome_options.set_capability("browserName", "chrome")
chrome_options.set_capability("platformName", "linux")
chrome_options.set_capability("browserVersion", "latest")

# 设置额外的选项
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")

# 连接Grid
driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=chrome_options
)

# ========== 配置Firefox选项 ==========

firefox_options = FirefoxOptions()
firefox_options.set_capability("browserName", "firefox")
firefox_options.set_capability("platformName", "linux")

driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=firefox_options
)

# ========== 自定义Capabilities ==========

from selenium.webdriver.common.desired_capabilities import DesiredCapabilities

# 定义自定义能力
capabilities = {
    "browserName": "chrome",
    "browserVersion": "latest",
    "platformName": "linux",
    "goog:chromeOptions": {
        "args": ["--no-sandbox", "--disable-dev-shm-usage"],
        "prefs": {
            "download.default_directory": "/tmp/downloads"
        }
    },
    "se:name": "My Test Session",
    "se:timeZone": "Asia/Shanghai"
}

driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=chrome_options
)

# ========== Grid工具类 ==========

class GridHelper:
    """
    Selenium Grid工具类
    """
    
    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url
    
    def create_driver(self, browser="chrome", **kwargs):
        """
        创建远程WebDriver
        
        参数:
            browser: 浏览器类型
            **kwargs: 额外配置
        """
        if browser == "chrome":
            options = Options()
        elif browser == "firefox":
            options = FirefoxOptions()
        else:
            raise ValueError(f"不支持的浏览器: {browser}")
        
        # 设置通用选项
        for key, value in kwargs.items():
            if key == "headless" and value:
                options.add_argument("--headless=new")
            elif key == "window_size":
                options.add_argument(f"--window-size={value[0]},{value[1]}")
            elif key == "version":
                options.set_capability("browserVersion", value)
            elif key == "platform":
                options.set_capability("platformName", value)
        
        return webdriver.Remote(
            command_executor=f"{self.hub_url}/wd/hub",
            options=options
        )
    
    def get_grid_status(self):
        """获取Grid状态"""
        import requests
        response = requests.get(f"{self.hub_url}/status")
        return response.json()
    
    def get_available_nodes(self):
        """获取可用节点"""
        status = self.get_grid_status()
        return status.get("value", {}).get("ready", False)

# 使用示例
grid = GridHelper("http://localhost:4444")
print(f"Grid状态: {grid.get_grid_status()}")
driver = grid.create_driver("chrome", headless=True)

9.4 并行测试实现

import pytest
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from concurrent.futures import ThreadPoolExecutor
import threading

# ========== 使用pytest-xdist进行并行测试 ==========

# 安装: pip install pytest-xdist

# 运行命令: pytest tests/ -n 4  (使用4个进程并行)

# conftest.py配置
"""
import pytest
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

@pytest.fixture(scope="function")
def driver():
    options = Options()
    options.add_argument("--headless=new")
    options.add_argument("--no-sandbox")
    
    driver = webdriver.Remote(
        command_executor="http://localhost:4444/wd/hub",
        options=options
    )
    
    yield driver
    driver.quit()
"""

# ========== 使用ThreadPoolExecutor并行测试 ==========

class ParallelTestRunner:
    """
    并行测试运行器
    """
    
    def __init__(self, hub_url, max_workers=4):
        self.hub_url = hub_url
        self.max_workers = max_workers
        self.results = []
        self.lock = threading.Lock()
    
    def run_test(self, test_func, browser="chrome"):
        """
        运行单个测试
        
        参数:
            test_func: 测试函数
            browser: 浏览器类型
        """
        driver = None
        try:
            # 创建远程驱动
            options = Options()
            options.add_argument("--headless=new")
            driver = webdriver.Remote(
                command_executor=f"{self.hub_url}/wd/hub",
                options=options
            )
            
            # 执行测试
            result = test_func(driver)
            
            with self.lock:
                self.results.append({
                    "test": test_func.__name__,
                    "status": "PASS",
                    "result": result
                })
            
            return result
        
        except Exception as e:
            with self.lock:
                self.results.append({
                    "test": test_func.__name__,
                    "status": "FAIL",
                    "error": str(e)
                })
            raise
        
        finally:
            if driver:
                driver.quit()
    
    def run_tests_parallel(self, test_functions):
        """
        并行运行多个测试
        
        参数:
            test_functions: 测试函数列表
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.run_test, func)
                for func in test_functions
            ]
            
            for future in futures:
                future.result()  # 等待所有测试完成
        
        return self.results

# ========== 跨浏览器并行测试 ==========

class CrossBrowserTest:
    """
    跨浏览器测试类
    """
    
    BROWSERS = ["chrome", "firefox", "edge"]
    
    def __init__(self, hub_url):
        self.hub_url = hub_url
    
    def run_on_all_browsers(self, test_func):
        """在所有浏览器上运行测试"""
        results = {}
        
        with ThreadPoolExecutor(max_workers=len(self.BROWSERS)) as executor:
            futures = {
                browser: executor.submit(self._run_test, test_func, browser)
                for browser in self.BROWSERS
            }
            
            for browser, future in futures.items():
                try:
                    results[browser] = future.result()
                except Exception as e:
                    results[browser] = {"error": str(e)}
        
        return results
    
    def _run_test(self, test_func, browser):
        """在指定浏览器上运行测试"""
        from selenium.webdriver.firefox.options import Options as FirefoxOptions
        
        if browser == "chrome":
            options = Options()
        elif browser == "firefox":
            options = FirefoxOptions()
        else:
            options = Options()
        
        options.add_argument("--headless=new")
        
        driver = webdriver.Remote(
            command_executor=f"{self.hub_url}/wd/hub",
            options=options
        )
        
        try:
            return test_func(driver)
        finally:
            driver.quit()

# 使用示例
def sample_test(driver):
    """示例测试函数"""
    driver.get("https://www.example.com")
    assert "Example" in driver.title
    return driver.title

runner = ParallelTestRunner("http://localhost:4444", max_workers=3)
results = runner.run_tests_parallel([sample_test])
print(results)

9.5 Grid监控与管理

import requests
import json
import time

# ========== Grid API监控 ==========

class GridMonitor:
    """
    Selenium Grid监控类
    """
    
    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url
        self.api_url = f"{hub_url}/graphql"
    
    def get_grid_status(self):
        """获取Grid整体状态"""
        response = requests.get(f"{self.hub_url}/status")
        return response.json()
    
    def get_nodes_info(self):
        """获取所有节点信息"""
        query = """
        {
            nodesInfo {
                nodes {
                    id
                    uri
                    status
                    maxSessionCount
                    sessionCount
                    stereotypes
                    activeSessions {
                        id
                        capabilities
                    }
                }
            }
        }
        """
        
        response = requests.post(
            self.api_url,
            json={"query": query}
        )
        return response.json()
    
    def get_sessions_info(self):
        """获取所有会话信息"""
        query = """
        {
            sessionsInfo {
                sessionQueueRequests
                sessions {
                    id
                    capabilities
                    nodeId
                    nodeUri
                    startTime
                }
            }
        }
        """
        
        response = requests.post(
            self.api_url,
            json={"query": query}
        )
        return response.json()
    
    def delete_session(self, session_id):
        """删除指定会话"""
        response = requests.delete(
            f"{self.hub_url}/session/{session_id}"
        )
        return response.status_code == 200
    
    def drain_node(self, node_id):
        """排空节点(停止接收新会话)"""
        query = """
        mutation {
            drainNode(id: "%s") {
                id
                status
            }
        }
        """ % node_id
        
        response = requests.post(
            self.api_url,
            json={"query": query}
        )
        return response.json()
    
    def get_grid_health(self):
        """获取Grid健康状态"""
        status = self.get_grid_status()
        
        return {
            "ready": status.get("value", {}).get("ready", False),
            "message": status.get("value", {}).get("message", ""),
            "nodes": len(status.get("value", {}).get("nodes", []))
        }
    
    def monitor_loop(self, interval=30):
        """
        持续监控循环
        
        参数:
            interval: 监控间隔(秒)
        """
        while True:
            try:
                health = self.get_grid_health()
                print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "
                      f"Grid状态: {'正常' if health['ready'] else '异常'}")
                
                sessions = self.get_sessions_info()
                active_count = len(sessions.get("data", {})
                                       .get("sessionsInfo", {})
                                       .get("sessions", []))
                print(f"活跃会话数: {active_count}")
                
            except Exception as e:
                print(f"监控异常: {e}")
            
            time.sleep(interval)

# ========== Grid管理工具类 ==========

class GridManager:
    """
    Selenium Grid管理工具类
    """
    
    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url
        self.monitor = GridMonitor(hub_url)
    
    def cleanup_stale_sessions(self, max_age_minutes=30):
        """
        清理过期会话
        
        参数:
            max_age_minutes: 最大会话年龄(分钟)
        """
        sessions = self.monitor.get_sessions_info()
        current_time = time.time()
        
        cleaned = 0
        for session in sessions.get("data", {}).get("sessionsInfo", {}).get("sessions", []):
            start_time = session.get("startTime", 0)
            age_minutes = (current_time - start_time) / 60
            
            if age_minutes > max_age_minutes:
                session_id = session.get("id")
                if self.monitor.delete_session(session_id):
                    cleaned += 1
                    print(f"已清理过期会话: {session_id}")
        
        return cleaned
    
    def get_load_balance_info(self):
        """获取负载均衡信息"""
        nodes = self.monitor.get_nodes_info()
        
        balance_info = []
        for node in nodes.get("data", {}).get("nodesInfo", {}).get("nodes", []):
            balance_info.append({
                "node_id": node.get("id"),
                "max_sessions": node.get("maxSessionCount"),
                "current_sessions": node.get("sessionCount"),
                "utilization": node.get("sessionCount") / node.get("maxSessionCount", 1) * 100
            })
        
        return balance_info
    
    def print_grid_summary(self):
        """打印Grid摘要信息"""
        health = self.monitor.get_grid_health()
        balance = self.get_load_balance_info()
        
        print("=" * 50)
        print("Selenium Grid 状态摘要")
        print("=" * 50)
        print(f"状态: {'正常' if health['ready'] else '异常'}")
        print(f"节点数: {health['nodes']}")
        print("\n节点负载:")
        
        for node in balance:
            print(f"  Node {node['node_id']}: "
                  f"{node['current_sessions']}/{node['max_sessions']} "
                  f"({node['utilization']:.1f}%)")
        
        print("=" * 50)

# 使用示例
manager = GridManager("http://localhost:4444")
manager.print_grid_summary()

第十章:生产环境最佳实践

10.1 测试稳定性优化

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import *
import time
import functools

# ========== 重试机制 ==========

def retry_on_failure(max_retries=3, delay=1, exceptions=(Exception,)):
    """
    失败重试装饰器
    
    参数:
        max_retries: 最大重试次数
        delay: 重试间隔(秒)
        exceptions: 需要重试的异常类型
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        print(f"第 {attempt + 1} 次尝试失败: {e},正在重试...")
                        time.sleep(delay)
            
            raise last_exception
        return wrapper
    return decorator

# 使用示例
@retry_on_failure(max_retries=3, delay=2, exceptions=(TimeoutException, StaleElementReferenceException))
def click_element(driver, locator):
    """带重试的点击操作"""
    element = WebDriverWait(driver, 10).until(
        EC.element_to_be_clickable(locator)
    )
    element.click()

# ========== 显式等待最佳实践 ==========

class SmartWait:
    """
    智能等待类
    """
    
    def __init__(self, driver, timeout=10, poll_frequency=0.5):
        self.driver = driver
        self.timeout = timeout
        self.poll_frequency = poll_frequency
    
    def wait_for_element(self, locator, condition="visible"):
        """
        等待元素满足条件
        
        参数:
            locator: 元素定位器
            condition: 等待条件 (visible/present/clickable)
        """
        wait = WebDriverWait(
            self.driver, 
            self.timeout, 
            poll_frequency=self.poll_frequency
        )
        
        conditions = {
            "visible": EC.visibility_of_element_located,
            "present": EC.presence_of_element_located,
            "clickable": EC.element_to_be_clickable,
            "invisible": EC.invisibility_of_element_located
        }
        
        return wait.until(conditions[condition](locator))
    
    def wait_for_page_load(self):
        """等待页面加载完成"""
        wait = WebDriverWait(self.driver, self.timeout)
        wait.until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )
    
    def wait_for_ajax_complete(self):
        """等待AJAX请求完成"""
        wait = WebDriverWait(self.driver, self.timeout)
        wait.until(
            lambda d: d.execute_script("return jQuery.active == 0") if 
            d.execute_script("return typeof jQuery != 'undefined'") else True
        )

# ========== 元素操作封装 ==========

class StableElement:
    """
    稳定的元素操作类
    处理常见的元素操作问题
    """
    
    def __init__(self, driver):
        self.driver = driver
        self.wait = SmartWait(driver)
    
    def safe_click(self, locator, max_attempts=3):
        """
        安全点击操作
        处理元素被覆盖、不可点击等问题
        """
        for attempt in range(max_attempts):
            try:
                element = self.wait.wait_for_element(locator, "clickable")
                
                # 滚动到元素可见
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block: 'center'});",
                    element
                )
                time.sleep(0.3)
                
                # 尝试普通点击
                try:
                    element.click()
                    return True
                except ElementClickInterceptedException:
                    # 使用JavaScript点击
                    self.driver.execute_script("arguments[0].click();", element)
                    return True
                    
            except StaleElementReferenceException:
                if attempt < max_attempts - 1:
                    continue
                raise
        
        return False
    
    def safe_send_keys(self, locator, text, clear_first=True):
        """
        安全输入操作
        """
        element = self.wait.wait_for_element(locator, "visible")
        
        if clear_first:
            element.clear()
            time.sleep(0.1)
        
        element.send_keys(text)
        
        # 验证输入是否成功
        actual_value = element.get_attribute("value")
        if actual_value != text:
            # 使用JavaScript强制设置值
            self.driver.execute_script(
                f"arguments[0].value = '{text}';",
                element
            )
    
    def safe_get_text(self, locator):
        """
        安全获取文本
        处理隐藏元素的文本获取
        """
        element = self.wait.wait_for_element(locator, "present")
        
        try:
            if element.is_displayed():
                return element.text
            else:
                # 获取隐藏元素的文本
                return self.driver.execute_script(
                    "return arguments[0].textContent;",
                    element
                )
        except StaleElementReferenceException:
            element = self.wait.wait_for_element(locator, "present")
            return element.text

# ========== 测试基类 ==========

class BaseTest:
    """
    测试基类
    提供通用的测试方法
    """
    
    @staticmethod
    def handle_stale_element(func):
        """处理StaleElementReferenceException的装饰器"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except StaleElementReferenceException:
                    if attempt < max_attempts - 1:
                        time.sleep(0.5)
                        continue
                    raise
        return wrapper

10.2 异常处理与日志记录

import logging
import traceback
from datetime import datetime
from functools import wraps

# ========== 日志配置 ==========

def setup_logger(name, log_file, level=logging.INFO):
    """
    配置日志记录器
    
    参数:
        name: 日志记录器名称
        log_file: 日志文件路径
        level: 日志级别
    """
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    handler = logging.FileHandler(log_file, encoding='utf-8')
    handler.setFormatter(formatter)
    
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)
    logger.addHandler(console_handler)
    
    return logger

# 创建测试日志记录器
test_logger = setup_logger(
    "selenium_test", 
    "logs/test.log",
    logging.DEBUG
)

# ========== 异常处理装饰器 ==========

def log_exception(func):
    """
    异常日志记录装饰器
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            test_logger.error(
                f"函数 {func.__name__} 执行失败\n"
                f"异常类型: {type(e).__name__}\n"
                f"异常信息: {str(e)}\n"
                f"堆栈跟踪:\n{traceback.format_exc()}"
            )
            raise
    return wrapper

# ========== 测试日志工具类 ==========

class TestLogger:
    """
    测试日志工具类
    """
    
    def __init__(self, driver, logger):
        self.driver = driver
        self.logger = logger
    
    def log_step(self, step_name):
        """记录测试步骤"""
        self.logger.info(f"执行步骤: {step_name}")
    
    def log_element_info(self, locator, action):
        """记录元素操作"""
        self.logger.debug(f"元素操作: {action} - {locator}")
    
    def log_assertion(self, condition, message):
        """记录断言结果"""
        if condition:
            self.logger.info(f"断言通过: {message}")
        else:
            self.logger.error(f"断言失败: {message}")
    
    def log_screenshot(self, filename):
        """记录截图"""
        self.driver.save_screenshot(filename)
        self.logger.info(f"截图已保存: {filename}")
    
    def log_page_source(self, filename):
        """记录页面源码"""
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(self.driver.page_source)
        self.logger.info(f"页面源码已保存: {filename}")

# ========== 异常处理工具类 ==========

class ExceptionHandler:
    """
    异常处理工具类
    """
    
    def __init__(self, driver, logger):
        self.driver = driver
        self.logger = logger
    
    def handle_timeout(self, locator, operation):
        """
        处理超时异常
        
        参数:
            locator: 元素定位器
            operation: 操作名称
        """
        self.logger.error(f"操作超时: {operation} - {locator}")
        self.logger.error(f"当前URL: {self.driver.current_url}")
        self.logger.error(f"页面标题: {self.driver.title}")
        
        # 保存诊断信息
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.driver.save_screenshot(f"error_screenshot_{timestamp}.png")
    
    def handle_no_such_element(self, locator):
        """处理元素未找到异常"""
        self.logger.error(f"元素未找到: {locator}")
        self.logger.error(f"页面源码:\n{self.driver.page_source[:1000]}")
    
    def handle_stale_element(self, element_id):
        """处理元素过期异常"""
        self.logger.warning(f"元素已过期: {element_id}")

# ========== 使用示例 ==========

class TestWithLogging:
    """带日志的测试示例"""
    
    def __init__(self, driver):
        self.driver = driver
        self.logger = TestLogger(driver, test_logger)
        self.exception_handler = ExceptionHandler(driver, test_logger)
    
    @log_exception
    def test_login(self):
        """登录测试"""
        self.logger.log_step("打开登录页面")
        self.driver.get("https://www.example.com/login")
        
        self.logger.log_step("输入用户名")
        self.driver.find_element(By.ID, "username").send_keys("test_user")
        
        self.logger.log_step("点击登录")
        self.driver.find_element(By.ID, "login-btn").click()
        
        self.logger.log_assertion(
            "欢迎" in self.driver.page_source,
            "验证登录成功"
        )

10.3 性能优化策略

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time

# ========== 浏览器启动优化 ==========

def create_optimized_driver():
    """
    创建优化的WebDriver
    """
    options = Options()
    
    # 禁用不必要的功能
    options.add_argument("--disable-extensions")
    options.add_argument("--disable-plugins")
    options.add_argument("--disable-images")  # 禁用图片加载
    options.add_argument("--disable-javascript")  # 如不需要JS可禁用
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    
    # 无头模式
    options.add_argument("--headless=new")
    
    # 设置缓存
    options.add_argument("--disk-cache-dir=/tmp/chrome_cache")
    
    # 忽略证书错误
    options.add_argument("--ignore-certificate-errors")
    
    # 设置User-Agent
    options.add_argument("--user-agent=Mozilla/5.0")
    
    # 禁用自动化标志
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    
    # 页面加载策略
    options.page_load_strategy = "eager"  # 不等待资源完全加载
    
    return webdriver.Chrome(options=options)

# ========== 页面加载策略 ==========

"""
页面加载策略(page_load_strategy):
1. normal(默认): 等待整个页面加载完成
2. eager: 等待DOMContentLoaded事件,不等待图片等资源
3. none: 不等待任何加载事件
"""

# 使用eager策略
options = Options()
options.page_load_strategy = "eager"

# ========== 元素定位优化 ==========

class OptimizedLocator:
    """
    优化的元素定位类
    """
    
    def __init__(self, driver):
        self.driver = driver
        self._cache = {}
    
    def find_element_cached(self, locator, ttl=60):
        """
        缓存元素定位结果
        
        参数:
            locator: 元素定位器
            ttl: 缓存有效期(秒)
        """
        cache_key = str(locator)
        
        if cache_key in self._cache:
            cached = self._cache[cache_key]
            if time.time() - cached["time"] < ttl:
                try:
                    # 验证元素是否仍然有效
                    cached["element"].is_displayed()
                    return cached["element"]
                except:
                    pass
        
        # 重新查找元素
        element = self.driver.find_element(*locator)
        self._cache[cache_key] = {
            "element": element,
            "time": time.time()
        }
        return element
    
    def find_elements_batch(self, locators):
        """
        批量查找元素
        使用JavaScript一次性查找多个元素
        """
        js_code = """
        var results = {};
        var locators = arguments[0];
        
        for (var key in locators) {
            var loc = locators[key];
            var element = null;
            
            if (loc.type === 'id') {
                element = document.getElementById(loc.value);
            } else if (loc.type === 'css') {
                element = document.querySelector(loc.value);
            }
            
            results[key] = element ? true : false;
        }
        
        return results;
        """
        
        locators_dict = {
            key: {"type": loc[0].lower().replace("by.", ""), "value": loc[1]}
            for key, loc in locators.items()
        }
        
        return self.driver.execute_script(js_code, locators_dict)

# ========== 批量操作优化 ==========

class BatchOperations:
    """
    批量操作类
    减少WebDriver调用次数
    """
    
    def __init__(self, driver):
        self.driver = driver
    
    def fill_form_batch(self, field_values):
        """
        批量填充表单
        
        参数:
            field_values: {定位器: 值} 字典
        """
        js_code = """
        var fields = arguments[0];
        for (var selector in fields) {
            var element = document.querySelector(selector);
            if (element) {
                element.value = fields[selector];
            }
        }
        """
        
        selector_values = {loc[1]: value for loc, value in field_values.items()}
        self.driver.execute_script(js_code, selector_values)
    
    def get_multiple_texts(self, locators):
        """
        批量获取元素文本
        """
        selectors = [loc[1] for loc in locators]
        
        js_code = """
        var selectors = arguments[0];
        return selectors.map(function(s) {
            var el = document.querySelector(s);
            return el ? el.textContent : null;
        });
        """
        
        return self.driver.execute_script(js_code, selectors)

# ========== 连接池管理 ==========

class WebDriverPool:
    """
    WebDriver连接池
    复用浏览器实例
    """
    
    def __init__(self, max_size=5):
        self.max_size = max_size
        self._pool = []
        self._in_use = []
    
    def get_driver(self):
        """获取WebDriver实例"""
        if self._pool:
            driver = self._pool.pop()
            self._in_use.append(driver)
            return driver
        
        if len(self._in_use) < self.max_size:
            driver = create_optimized_driver()
            self._in_use.append(driver)
            return driver
        
        raise RuntimeError("连接池已满")
    
    def return_driver(self, driver):
        """归还WebDriver实例"""
        if driver in self._in_use:
            self._in_use.remove(driver)
            
            # 清理状态
            driver.delete_all_cookies()
            
            self._pool.append(driver)
    
    def close_all(self):
        """关闭所有实例"""
        for driver in self._pool + self._in_use:
            driver.quit()
        
        self._pool.clear()
        self._in_use.clear()

10.4 CI/CD集成

# ========== Jenkins Pipeline配置 ==========

# Jenkinsfile
"""
pipeline {
    agent any
    
    environment {
        SELENIUM_HUB = 'http://selenium-hub:4444'
    }
    
    stages {
        stage('Setup') {
            steps {
                sh 'pip install -r requirements.txt'
            }
        }
        
        stage('Start Grid') {
            steps {
                sh 'docker-compose up -d'
                sh 'sleep 10'  # 等待Grid启动
            }
        }
        
        stage('Run Tests') {
            parallel {
                stage('Chrome Tests') {
                    steps {
                        sh 'pytest tests/ --browser=chrome -n 4 --alluredir=allure-results-chrome'
                    }
                }
                stage('Firefox Tests') {
                    steps {
                        sh 'pytest tests/ --browser=firefox -n 4 --alluredir=allure-results-firefox'
                    }
                }
            }
        }
        
        stage('Generate Report') {
            steps {
                allure includeProperties: false, jdk: '', results: [[path: 'allure-results-chrome']]
            }
        }
        
        stage('Cleanup') {
            steps {
                sh 'docker-compose down'
            }
        }
    }
    
    post {
        always {
            cleanWs()
        }
        failure {
            emailext (
                subject: "测试失败: ${env.JOB_NAME} #${env.BUILD_NUMBER}",
                body: "请查看构建详情: ${env.BUILD_URL}",
                to: 'team@example.com'
            )
        }
    }
}
"""

# ========== GitHub Actions配置 ==========

# .github/workflows/selenium-tests.yml
"""
name: Selenium Tests

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    
    services:
      selenium-hub:
        image: selenium/hub:4.15.0
        ports:
          - 4442:4442
          - 4443:4443
          - 4444:4444
      
      chrome:
        image: selenium/node-chrome:4.15.0
        env:
          SE_EVENT_BUS_HOST: selenium-hub
          SE_EVENT_BUS_PUBLISH_PORT: 4442
          SE_EVENT_BUS_SUBSCRIBE_PORT: 4443
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    
    - name: Wait for Selenium Grid
      run: |
        sleep 10
        curl --retry 10 --retry-delay 5 --retry-connrefused http://localhost:4444/status
    
    - name: Run tests
      run: |
        pytest tests/ -v --alluredir=allure-results
      env:
        SELENIUM_HUB_URL: http://localhost:4444
    
    - name: Generate Allure Report
      uses: simple-elf/allure-report-action@master
      if: always()
      with:
        allure_results: allure-results
        allure_report: allure-report
    
    - name: Publish Report
      uses: peaceiris/actions-gh-pages@v3
      if: always()
      with:
        github_token: ${{ secrets.GITHUB_TOKEN }}
        publish_dir: allure-report
"""

# ========== pytest配置 ==========

# pytest.ini
"""
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = 
    -v
    --tb=short
    --strict-markers
    --alluredir=allure-results
markers =
    smoke: 冒烟测试
    regression: 回归测试
    slow: 慢速测试
"""

10.5 安全性最佳实践

import os
import base64
from cryptography.fernet import Fernet

# ========== 敏感数据处理 ==========

class SecureDataManager:
    """
    安全数据管理类
    处理敏感信息如密码、API密钥等
    """
    
    def __init__(self, key=None):
        if key is None:
            key = os.environ.get("ENCRYPTION_KEY", Fernet.generate_key())
        self.cipher = Fernet(key if isinstance(key, bytes) else key.encode())
    
    def encrypt(self, data):
        """加密数据"""
        return self.cipher.encrypt(data.encode()).decode()
    
    def decrypt(self, encrypted_data):
        """解密数据"""
        return self.cipher.decrypt(encrypted_data.encode()).decode()
    
    def get_password(self, env_var):
        """
        从环境变量获取加密密码并解密
        """
        encrypted = os.environ.get(env_var)
        if encrypted:
            return self.decrypt(encrypted)
        return None

# ========== 环境变量管理 ==========

class EnvConfig:
    """
    环境变量配置类
    避免硬编码敏感信息
    """
    
    @staticmethod
    def get(key, default=None):
        """获取环境变量"""
        return os.environ.get(key, default)
    
    @staticmethod
    def get_required(key):
        """获取必需的环境变量"""
        value = os.environ.get(key)
        if value is None:
            raise ValueError(f"必需的环境变量 {key} 未设置")
        return value
    
    @staticmethod
    def get_int(key, default=0):
        """获取整数类型环境变量"""
        value = os.environ.get(key)
        return int(value) if value else default
    
    @staticmethod
    def get_bool(key, default=False):
        """获取布尔类型环境变量"""
        value = os.environ.get(key)
        if value:
            return value.lower() in ('true', '1', 'yes')
        return default

# 使用示例
# 设置环境变量: export TEST_PASSWORD="encrypted_password"
password = EnvConfig.get_required("TEST_PASSWORD")

# ========== 安全的浏览器配置 ==========

def create_secure_driver():
    """
    创建安全的WebDriver配置
    """
    options = Options()
    
    # 禁用密码保存
    options.add_experimental_option("prefs", {
        "credentials_enable_service": False,
        "profile.password_manager_enabled": False
    })
    
    # 禁用浏览器指纹
    options.add_argument("--disable-blink-features=AutomationControlled")
    
    # 清除自动填充数据
    options.add_argument("--disable-autofill")
    
    # 禁用扩展
    options.add_argument("--disable-extensions")
    
    return webdriver.Chrome(options=options)

# ========== 测试数据脱敏 ==========

class DataMasker:
    """
    数据脱敏类
    """
    
    @staticmethod
    def mask_email(email):
        """脱敏邮箱"""
        if '@' not in email:
            return email
        parts = email.split('@')
        username = parts[0]
        if len(username) <= 2:
            return '*' * len(username) + '@' + parts[1]
        return username[0] + '*' * (len(username) - 2) + username[-1] + '@' + parts[1]
    
    @staticmethod
    def mask_phone(phone):
        """脱敏手机号"""
        if len(phone) < 7:
            return '*' * len(phone)
        return phone[:3] + '*' * (len(phone) - 6) + phone[-3:]
    
    @staticmethod
    def mask_id_card(id_card):
        """脱敏身份证号"""
        if len(id_card) < 8:
            return '*' * len(id_card)
        return id_card[:4] + '*' * (len(id_card) - 8) + id_card[-4:]

# ========== 日志脱敏 ==========

class SecureLogger:
    """
    安全日志记录类
    自动脱敏敏感信息
    """
    
    SENSITIVE_KEYS = ['password', 'token', 'secret', 'key', 'credential']
    
    def __init__(self, logger):
        self.logger = logger
    
    def _mask_sensitive(self, message):
        """脱敏敏感信息"""
        import re
        for key in self.SENSITIVE_KEYS:
            pattern = rf'({key}["\s:=]+)(["\']?)([^"\s,}}]+)(["\']?)'
            message = re.sub(pattern, r'\1\2****\4', message, flags=re.IGNORECASE)
        return message
    
    def info(self, message):
        self.logger.info(self._mask_sensitive(message))
    
    def error(self, message):
        self.logger.error(self._mask_sensitive(message))
    
    def debug(self, message):
        self.logger.debug(self._mask_sensitive(message))

# ========== 会话安全 ==========

class SecureSession:
    """
    安全会话管理类
    """
    
    def __init__(self, driver):
        self.driver = driver
    
    def clear_session_data(self):
        """清除会话数据"""
        self.driver.delete_all_cookies()
        self.driver.execute_script("window.localStorage.clear();")
        self.driver.execute_script("window.sessionStorage.clear();")
    
    def logout(self, logout_url):
        """安全退出"""
        self.driver.get(logout_url)
        self.clear_session_data()
    
    def check_https(self):
        """检查是否使用HTTPS"""
        return self.driver.current_url.startswith('https://')

10.6 维护性与可扩展性

# ========== 项目结构建议 ==========

"""
selenium_project/
├── config/
│   ├── config.yaml          # 配置文件
│   └── environments/        # 环境配置
├── pages/
│   ├── base_page.py         # 页面基类
│   ├── login_page.py        # 登录页
│   └── home_page.py         # 首页
├── tests/
│   ├── conftest.py          # pytest配置
│   ├── test_login.py        # 登录测试
│   └── test_search.py       # 搜索测试
├── utils/
│   ├── driver_factory.py    # 驱动工厂
│   ├── wait_helper.py       # 等待工具
│   └── logger.py            # 日志工具
├── data/
│   └── test_data.json       # 测试数据
├── reports/
│   └── allure-results/      # 测试报告
├── requirements.txt         # 依赖
└── pytest.ini              # pytest配置
"""

# ========== 驱动工厂模式 ==========

from abc import ABC, abstractmethod

class DriverFactory(ABC):
    """
    驱动工厂抽象类
    """
    
    @abstractmethod
    def create_driver(self, options):
        pass

class ChromeDriverFactory(DriverFactory):
    """Chrome驱动工厂"""
    
    def create_driver(self, options=None):
        from selenium.webdriver.chrome.service import Service
        from webdriver_manager.chrome import ChromeDriverManager
        
        if options is None:
            options = Options()
        
        return webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=options
        )

class FirefoxDriverFactory(DriverFactory):
    """Firefox驱动工厂"""
    
    def create_driver(self, options=None):
        from selenium.webdriver.firefox.options import Options as FirefoxOptions
        from selenium.webdriver.firefox.service import Service
        from webdriver_manager.firefox import GeckoDriverManager
        
        if options is None:
            options = FirefoxOptions()
        
        return webdriver.Firefox(
            service=Service(GeckoDriverManager().install()),
            options=options
        )

class RemoteDriverFactory(DriverFactory):
    """远程驱动工厂"""
    
    def __init__(self, hub_url):
        self.hub_url = hub_url
    
    def create_driver(self, options):
        return webdriver.Remote(
            command_executor=f"{self.hub_url}/wd/hub",
            options=options
        )

# ========== 工厂管理器 ==========

class DriverManager:
    """
    驱动管理器
    """
    
    FACTORIES = {
        "chrome": ChromeDriverFactory(),
        "firefox": FirefoxDriverFactory(),
    }
    
    @classmethod
    def register_factory(cls, name, factory):
        """注册工厂"""
        cls.FACTORIES[name] = factory
    
    @classmethod
    def get_driver(cls, browser="chrome", options=None, hub_url=None):
        """
        获取WebDriver实例
        
        参数:
            browser: 浏览器类型
            options: 浏览器选项
            hub_url: 远程Hub地址(可选)
        """
        if hub_url:
            factory = RemoteDriverFactory(hub_url)
        else:
            factory = cls.FACTORIES.get(browser)
            if factory is None:
                raise ValueError(f"不支持的浏览器: {browser}")
        
        return factory.create_driver(options)

# ========== 测试数据管理 ==========

import json
import yaml
from typing import Any, Dict

class TestDataManager:
    """
    测试数据管理类
    """
    
    def __init__(self, data_dir="data"):
        self.data_dir = data_dir
        self._cache = {}
    
    def load_json(self, filename):
        """加载JSON数据"""
        if filename not in self._cache:
            filepath = os.path.join(self.data_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                self._cache[filename] = json.load(f)
        return self._cache[filename]
    
    def load_yaml(self, filename):
        """加载YAML数据"""
        if filename not in self._cache:
            filepath = os.path.join(self.data_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                self._cache[filename] = yaml.safe_load(f)
        return self._cache[filename]
    
    def get_test_data(self, test_name):
        """获取特定测试的数据"""
        data = self.load_json("test_data.json")
        return data.get(test_name, {})

# ========== 元素定位器管理 ==========

class LocatorManager:
    """
    元素定位器管理类
    集中管理所有元素定位器
    """
    
    # 登录页定位器
    LOGIN = {
        "username": (By.ID, "username"),
        "password": (By.ID, "password"),
        "login_button": (By.ID, "login-btn"),
        "error_message": (By.ID, "error-msg"),
    }
    
    # 首页定位器
    HOME = {
        "welcome": (By.ID, "welcome"),
        "user_menu": (By.ID, "user-menu"),
        "logout": (By.ID, "logout"),
    }
    
    @classmethod
    def get(cls, page, element):
        """获取定位器"""
        page_locators = getattr(cls, page.upper(), {})
        return page_locators.get(element)

# 使用示例
username_locator = LocatorManager.get("login", "username")

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐