Selenium自动化测试详解3
·
第八章:测试框架集成与设计模式
8.1 Pytest/TestNG框架集成
# ========== Pytest框架集成示例 ==========
import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# ========== Fixture配置 ==========
@pytest.fixture(scope="module")
def driver():
    """Yield one Chrome WebDriver shared by every test in the module.

    The browser window is maximized and a 10-second implicit wait is
    configured before the driver is handed to the tests; the browser is
    quit after the code following ``yield`` runs.
    """
    chrome_service = Service(ChromeDriverManager().install())
    browser = webdriver.Chrome(service=chrome_service)
    browser.maximize_window()
    browser.implicitly_wait(10)
    yield browser
    browser.quit()
@pytest.fixture(scope="function")
def fresh_driver():
    """Yield a brand-new, maximized Chrome WebDriver for a single test.

    Unlike the module-scoped fixture, no implicit wait is configured here.
    The browser is quit once the test that requested it has finished.
    """
    chrome_service = Service(ChromeDriverManager().install())
    browser = webdriver.Chrome(service=chrome_service)
    browser.maximize_window()
    yield browser
    browser.quit()
# ========== 测试类示例 ==========
class TestLogin:
    """Login feature tests that run against the shared ``driver`` fixture."""

    @staticmethod
    def _submit_credentials(driver, username, password):
        """Open the login page, type both credentials and press the button."""
        driver.get("https://www.example.com/login")
        for field_id, value in (("username", username), ("password", password)):
            driver.find_element(By.ID, field_id).send_keys(value)
        driver.find_element(By.ID, "login-btn").click()

    def test_login_success(self, driver):
        """A valid user/password pair shows the welcome banner."""
        self._submit_credentials(driver, "test_user", "test_pass")
        banner_present = EC.presence_of_element_located((By.ID, "welcome"))
        welcome = WebDriverWait(driver, 10).until(banner_present)
        assert "欢迎" in welcome.text

    def test_login_failed(self, driver):
        """A wrong password shows the password-error message."""
        self._submit_credentials(driver, "test_user", "wrong_pass")
        error_present = EC.presence_of_element_located((By.ID, "error"))
        error_msg = WebDriverWait(driver, 10).until(error_present)
        assert "密码错误" in error_msg.text
# ========== 参数化测试 ==========
@pytest.mark.parametrize("username,password,expected", [
    ("user1", "pass1", "成功"),
    ("user2", "wrong", "失败"),
    ("", "pass", "用户名不能为空"),
    ("user", "", "密码不能为空"),
])
def test_login_scenarios(driver, username, password, expected):
    """Parametrized login check.

    When ``expected`` is the success marker the welcome element must
    contain the greeting; otherwise the error element must contain the
    ``expected`` text itself.
    """
    driver.get("https://www.example.com/login")
    # Each field is looked up twice (clear, then type), as in the original flow.
    for field_id, value in (("username", username), ("password", password)):
        driver.find_element(By.ID, field_id).clear()
        driver.find_element(By.ID, field_id).send_keys(value)
    driver.find_element(By.ID, "login-btn").click()

    succeeded = expected == "成功"
    result_id = "welcome" if succeeded else "error"
    wanted_text = "欢迎" if succeeded else expected

    outcome = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, result_id))
    )
    assert wanted_text in outcome.text
# ========== conftest.py配置文件示例 ==========
# conftest.py (example kept as a string literal so it is not executed here)
"""
import os

import pytest
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager


@pytest.fixture(scope="session")
def browser_type():
    return "chrome"


@pytest.fixture(scope="session")
def headless():
    return False


@pytest.fixture(scope="function")
def driver(request, browser_type, headless):
    if browser_type == "chrome":
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument("--headless=new")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=options
        )
    elif browser_type == "firefox":
        options = webdriver.FirefoxOptions()
        if headless:
            # Firefox has its own headless switch
            options.add_argument("-headless")
        driver = webdriver.Firefox(options=options)
    else:
        # Fail loudly instead of hitting an unbound ``driver`` below
        raise ValueError(f"不支持的浏览器: {browser_type}")
    driver.maximize_window()
    try:
        yield driver
        # Screenshot on failure: read the report of THIS test's call phase,
        # which the hook below stores on the test item.
        report = getattr(request.node, "rep_call", None)
        if report is not None and report.failed:
            os.makedirs("screenshots", exist_ok=True)
            driver.save_screenshot(f"screenshots/failed_{request.node.name}.png")
    finally:
        # Always release the browser, even if taking the screenshot fails
        driver.quit()


@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    outcome = yield
    report = outcome.get_result()
    # Expose each phase report on the item: rep_setup / rep_call / rep_teardown
    setattr(item, "rep_" + report.when, report)
"""
# 运行测试命令
# pytest tests/ -v --html=report.html --self-contained-html
8.2 断言机制与验证方法详解
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# Start a local Chrome session and open the page the assertions below inspect.
driver = webdriver.Chrome()
driver.get("https://www.example.com")
# ========== Basic assertions ==========
# Assert on the page title
assert "Example" in driver.title, f"标题验证失败: {driver.title}"
# Assert on the current URL
assert "example.com" in driver.current_url, f"URL验证失败: {driver.current_url}"
# Assert that the element exists
# NOTE(review): find_element is expected to raise when nothing matches rather
# than return None, so the "is not None" check likely never fails — confirm.
element = driver.find_element(By.ID, "username")
assert element is not None, "元素不存在"
# Assert that the element is displayed
assert element.is_displayed(), "元素不可见"
# Assert that the element is enabled
assert element.is_enabled(), "元素不可用"
# Assert on the element's text
assert "登录" in element.text, f"文本验证失败: {element.text}"
# Assert on an element attribute
assert element.get_attribute("type") == "text", "属性验证失败"
# ========== 软断言(收集所有错误)==========
class SoftAssert:
    """Soft assertions: record every failed check and raise once at the end.

    Each ``assert_*`` method appends a message to ``errors`` instead of
    raising. ``assert_all`` raises a single ``AssertionError`` listing all
    recorded failures and then resets the collector, so one instance can be
    reused for several verification rounds without re-reporting old failures.
    """

    def __init__(self):
        # Human-readable failure messages, in the order they were recorded.
        self.errors = []

    def assert_true(self, condition, message=""):
        """Record a failure when ``condition`` is falsy."""
        if not condition:
            self.errors.append(f"断言失败: {message}")

    def assert_false(self, condition, message=""):
        """Record a failure when ``condition`` is truthy."""
        if condition:
            self.errors.append(f"断言失败: {message}")

    def assert_equal(self, actual, expected, message=""):
        """Record a failure when ``actual != expected``."""
        if actual != expected:
            self.errors.append(
                f"断言失败: {message} - 期望: {expected}, 实际: {actual}"
            )

    def assert_contains(self, actual, expected, message=""):
        """Record a failure when ``expected`` is not contained in ``actual``.

        A value of ``actual`` that does not support ``in`` (for example
        ``None``) is recorded as a failure as well instead of raising, so a
        single bad value cannot abort the remaining soft checks.
        """
        try:
            found = expected in actual
        except TypeError:
            found = False
        if not found:
            self.errors.append(
                f"断言失败: {message} - '{expected}' 不在 '{actual}' 中"
            )

    def assert_all(self):
        """Raise ``AssertionError`` with all recorded failures, if any.

        The collector is emptied before raising so that stale failures are
        not reported again by a later call on the same instance.
        """
        if not self.errors:
            return
        collected, self.errors = self.errors, []
        error_msg = "\n".join(collected)
        raise AssertionError(f"发现 {len(collected)} 个断言错误:\n{error_msg}")
# 使用软断言示例
# Soft-assertion usage: run several checks against the open page, then report.
soft = SoftAssert()
soft.assert_true(driver.find_element(By.ID, "username").is_displayed(), "用户名输入框应可见")
soft.assert_true(driver.find_element(By.ID, "password").is_displayed(), "密码输入框应可见")
soft.assert_contains(driver.title, "Example", "标题应包含Example")
soft.assert_all() # raise all collected failures at once
# ========== 验证工具类 ==========
class VerifyHelper:
    """Verification helpers that raise ``AssertionError`` on mismatch.

    The checks raise explicitly instead of using the ``assert`` statement,
    so they keep working when Python runs with ``-O`` (which strips
    ``assert``). Locators are ``(by, value)`` tuples that are unpacked into
    the driver's ``find_element`` / ``find_elements`` calls.
    """

    def __init__(self, driver):
        self.driver = driver

    @staticmethod
    def _require(condition, message):
        """Raise ``AssertionError(message)`` unless ``condition`` is truthy."""
        if not condition:
            raise AssertionError(message)

    def verify_title(self, expected_title):
        """Check that the page title contains ``expected_title``."""
        actual = self.driver.title
        self._require(
            expected_title in actual,
            f"标题验证失败: 期望包含 '{expected_title}', 实际 '{actual}'",
        )

    def verify_url(self, expected_url):
        """Check that the current URL contains ``expected_url``."""
        actual = self.driver.current_url
        self._require(
            expected_url in actual,
            f"URL验证失败: 期望包含 '{expected_url}', 实际 '{actual}'",
        )

    def verify_element_present(self, locator):
        """Check that at least one element matches ``locator``."""
        elements = self.driver.find_elements(*locator)
        self._require(len(elements) > 0, f"元素不存在: {locator}")

    def verify_element_visible(self, locator):
        """Check that the element found by ``locator`` is displayed."""
        element = self.driver.find_element(*locator)
        self._require(element.is_displayed(), f"元素不可见: {locator}")

    def verify_element_text(self, locator, expected_text):
        """Check that the element's text contains ``expected_text``."""
        element = self.driver.find_element(*locator)
        actual = element.text
        self._require(
            expected_text in actual,
            f"文本验证失败: 期望 '{expected_text}', 实际 '{actual}'",
        )

    def verify_element_attribute(self, locator, attribute, expected_value):
        """Check that the element's ``attribute`` equals ``expected_value``."""
        element = self.driver.find_element(*locator)
        actual = element.get_attribute(attribute)
        self._require(
            actual == expected_value,
            f"属性验证失败: 期望 '{expected_value}', 实际 '{actual}'",
        )
# End the browser session opened at the top of this example script.
driver.quit()
8.3 Page Object Model设计模式
# ========== Page Object Model设计模式 ==========
"""
Page Object Model (POM) 是一种设计模式
将页面元素定位和操作封装到独立的类中
优点:
1. 代码复用性高
2. 维护成本低
3. 可读性好
4. 测试与页面结构解耦
"""
# ========== 基类Page ==========
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
class BasePage:
    """Base class for page objects: a driver plus explicit-wait helpers.

    Locators are ``(by, value)`` tuples. One ``WebDriverWait`` with a
    10-second timeout is created per page object and reused by the helpers.
    """

    def __init__(self, driver):
        self.driver = driver
        self.wait = WebDriverWait(driver, 10)

    def find_element(self, locator):
        """Return the first element matching ``locator`` (no explicit wait)."""
        return self.driver.find_element(*locator)

    def find_elements(self, locator):
        """Return all elements matching ``locator`` (no explicit wait)."""
        return self.driver.find_elements(*locator)

    def click(self, locator):
        """Wait until the element is clickable, then click it."""
        element = self.wait.until(EC.element_to_be_clickable(locator))
        element.click()

    def enter_text(self, locator, text):
        """Wait until the element is visible, clear it and type ``text``."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        element.clear()
        element.send_keys(text)

    def get_text(self, locator):
        """Wait until the element is visible and return its text."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        return element.text

    def is_visible(self, locator):
        """Return True if the element becomes visible within the wait timeout.

        Any WebDriver-level failure (timeout, stale element, ...) is reported
        as ``False``. Non-WebDriver exceptions such as ``KeyboardInterrupt``
        are no longer swallowed.
        """
        # Imported locally so this block needs no extra file-level import.
        from selenium.common.exceptions import WebDriverException

        try:
            element = self.wait.until(EC.visibility_of_element_located(locator))
            return element.is_displayed()
        except WebDriverException:
            return False

    def wait_for_element(self, locator):
        """Wait until the element is present in the DOM and return it."""
        return self.wait.until(EC.presence_of_element_located(locator))

    def wait_for_element_disappear(self, locator):
        """Wait until the element is invisible or gone; return the wait result."""
        return self.wait.until(EC.invisibility_of_element_located(locator))
# ========== 登录页Page ==========
class LoginPage(BasePage):
    """Page object for the login page: locators plus user-level actions.

    The single-step actions return ``self`` so calls can be chained;
    ``login`` returns the page object of the page reached after submitting.
    """

    # Element locators
    USERNAME_INPUT = (By.ID, "username")
    PASSWORD_INPUT = (By.ID, "password")
    LOGIN_BUTTON = (By.ID, "login-btn")
    ERROR_MESSAGE = (By.ID, "error-msg")
    FORGOT_PASSWORD_LINK = (By.LINK_TEXT, "忘记密码")

    def __init__(self, driver):
        super().__init__(driver)
        self.url = "https://www.example.com/login"

    def open(self):
        """Navigate to the login URL and return this page object."""
        self.driver.get(self.url)
        return self

    def enter_username(self, username):
        """Type ``username`` into the user-name field; returns ``self``."""
        self.enter_text(self.USERNAME_INPUT, username)
        return self

    def enter_password(self, password):
        """Type ``password`` into the password field; returns ``self``."""
        self.enter_text(self.PASSWORD_INPUT, password)
        return self

    def click_login(self):
        """Press the login button; returns ``self``."""
        self.click(self.LOGIN_BUTTON)
        return self

    def get_error_message(self):
        """Return the text of the error-message element."""
        return self.get_text(self.ERROR_MESSAGE)

    def is_error_displayed(self):
        """Return whether the error-message element is visible."""
        return self.is_visible(self.ERROR_MESSAGE)

    def login(self, username, password):
        """Run the whole login flow and return a ``HomePage`` for the driver."""
        # The step methods are fluent, so the flow reads as one chain.
        self.enter_username(username).enter_password(password).click_login()
        return HomePage(self.driver)
# ========== 首页Page ==========
class HomePage(BasePage):
    """Page object for the home page shown after a successful login.

    Construction is inherited unchanged from ``BasePage``.
    """

    WELCOME_MESSAGE = (By.ID, "welcome")
    USER_MENU = (By.ID, "user-menu")
    LOGOUT_BUTTON = (By.ID, "logout")

    def get_welcome_message(self):
        """Return the text of the welcome element."""
        return self.get_text(self.WELCOME_MESSAGE)

    def is_logged_in(self):
        """Return whether the user menu is visible."""
        return self.is_visible(self.USER_MENU)

    def logout(self):
        """Open the user menu, click logout and return a ``LoginPage``."""
        for target in (self.USER_MENU, self.LOGOUT_BUTTON):
            self.click(target)
        return LoginPage(self.driver)
# ========== 测试代码 ==========
def test_login_success(driver):
    """Log in through the page objects and verify the landing page."""
    home_page = LoginPage(driver).open().login("test_user", "test_pass")
    assert home_page.is_logged_in()
    greeting = home_page.get_welcome_message()
    assert "欢迎" in greeting
8.4 数据驱动测试实现
import pytest
import json
import csv
import yaml
from selenium import webdriver
from selenium.webdriver.common.by import By
# ========== JSON数据驱动 ==========
# test_data.json
"""
[
{"username": "user1", "password": "pass1", "expected": "成功"},
{"username": "user2", "password": "wrong", "expected": "失败"},
{"username": "", "password": "pass", "expected": "用户名不能为空"}
]
"""
def load_json_data(file_path):
    """Read ``file_path`` as UTF-8 and return the decoded JSON document."""
    with open(file_path, mode='r', encoding='utf-8') as source:
        raw_text = source.read()
    return json.loads(raw_text)
@pytest.mark.parametrize("data", load_json_data("test_data.json"))
def test_login_json(driver, data):
    """Data-driven login test; one case per record of ``test_data.json``.

    The success marker expects the greeting in the page source; any other
    ``expected`` value must itself appear in the page source.
    """
    driver.get("https://www.example.com/login")
    for field_id in ("username", "password"):
        driver.find_element(By.ID, field_id).send_keys(data[field_id])
    driver.find_element(By.ID, "login-btn").click()

    wanted = "欢迎" if data["expected"] == "成功" else data["expected"]
    assert wanted in driver.page_source
# ========== CSV数据驱动 ==========
# test_data.csv
"""
username,password,expected
user1,pass1,成功
user2,wrong,失败
,pass,用户名不能为空
"""
def load_csv_data(file_path):
    """Load a UTF-8 CSV file and return its rows as a list of dicts.

    The first row supplies the dictionary keys (``csv.DictReader``).
    The file is opened with ``newline=''`` as the ``csv`` module requires;
    without it, line breaks embedded in quoted fields are altered by
    universal-newline translation.
    """
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))
@pytest.mark.parametrize("data", load_csv_data("test_data.csv"))
def test_login_csv(driver, data):
    """Data-driven login test; one case per row of ``test_data.csv``.

    The success marker expects the greeting in the page source; any other
    ``expected`` value must itself appear in the page source.
    """
    driver.get("https://www.example.com/login")
    for field_id in ("username", "password"):
        driver.find_element(By.ID, field_id).send_keys(data[field_id])
    driver.find_element(By.ID, "login-btn").click()

    wanted = "欢迎" if data["expected"] == "成功" else data["expected"]
    assert wanted in driver.page_source
# ========== YAML数据驱动 ==========
# test_data.yaml
"""
test_cases:
- username: user1
password: pass1
expected: 成功
- username: user2
password: wrong
expected: 失败
"""
def load_yaml_data(file_path):
    """Parse the UTF-8 YAML file at ``file_path`` with ``yaml.safe_load``."""
    with open(file_path, mode='r', encoding='utf-8') as stream:
        document = yaml.safe_load(stream)
    return document
# ========== 数据驱动工具类 ==========
class DataDriver:
    """Static loaders that turn test-data files into Python objects.

    All text formats are read as UTF-8.
    """

    @staticmethod
    def from_json(file_path):
        """Return the decoded content of a JSON file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def from_csv(file_path):
        """Return the rows of a CSV file as a list of dicts keyed by header.

        The file is opened with ``newline=''`` as the ``csv`` module
        requires, so line breaks inside quoted fields are preserved.
        """
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            return list(csv.DictReader(f))

    @staticmethod
    def from_yaml(file_path):
        """Return the content of a YAML file parsed with ``yaml.safe_load``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    @staticmethod
    def from_excel(file_path, sheet_name):
        """Return the rows of one Excel sheet as a list of record dicts."""
        # pandas is imported lazily so it is only needed for Excel sources.
        import pandas as pd

        df = pd.read_excel(file_path, sheet_name=sheet_name)
        return df.to_dict('records')
8.5 测试报告生成
# ========== Pytest HTML报告 ==========
# 安装: pip install pytest-html
# 运行命令生成报告
# pytest tests/ --html=report.html --self-contained-html
# ========== Allure报告 ==========
# 安装: pip install allure-pytest
import pytest
from selenium import webdriver
from selenium.webdriver.common.by import By
import allure
@allure.feature("登录功能")
class TestLogin:
    """Login tests instrumented with Allure steps and screenshots."""

    @pytest.fixture(autouse=True)
    def setup(self):
        """Start a maximized Chrome per test and quit it afterwards."""
        self.driver = webdriver.Chrome()
        self.driver.maximize_window()
        yield
        self.driver.quit()

    def _attach_screenshot(self, label):
        """Attach a PNG screenshot of the current page under ``label``."""
        png_bytes = self.driver.get_screenshot_as_png()
        allure.attach(png_bytes, label, allure.attachment_type.PNG)

    @allure.story("正常登录")
    @allure.severity(allure.severity_level.BLOCKER)
    @allure.title("测试用户正常登录")
    def test_login_success(self):
        """Valid credentials lead to a page containing the greeting."""
        with allure.step("打开登录页面"):
            self.driver.get("https://www.example.com/login")
            self._attach_screenshot("登录页面")

        with allure.step("输入用户名和密码"):
            credentials = (("username", "test_user"), ("password", "test_pass"))
            for field_id, value in credentials:
                self.driver.find_element(By.ID, field_id).send_keys(value)

        with allure.step("点击登录按钮"):
            self.driver.find_element(By.ID, "login-btn").click()

        with allure.step("验证登录成功"):
            assert "欢迎" in self.driver.page_source
            self._attach_screenshot("登录成功")
# 运行Allure报告
# pytest tests/ --alluredir=allure-results
# allure serve allure-results
# ========== 自定义测试报告工具类 ==========
class TestReporter:
    """Collect test results in memory and render them as an HTML report.

    Every value that ends up in the HTML is escaped, so test names or
    messages containing ``<``, ``&`` or quotes cannot break or inject markup.
    """

    # The name starts with "Test", so pytest would otherwise try to collect
    # this helper as a test class.
    __test__ = False

    def __init__(self, output_dir="reports"):
        """Remember ``output_dir`` and create it if it does not exist."""
        self.output_dir = output_dir
        self.results = []
        import os
        os.makedirs(output_dir, exist_ok=True)

    def add_result(self, test_name, status, message="", screenshot=None):
        """Append one result record stamped with the current local time.

        ``status`` equal to ``"PASS"`` counts as passed in the report; any
        other value counts as failed. ``screenshot`` is stored with the
        record but not rendered by ``generate_html_report``.
        """
        import time
        result = {
            "test_name": test_name,
            "status": status,
            "message": message,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "screenshot": screenshot
        }
        self.results.append(result)

    def generate_html_report(self, filename="report.html"):
        """Write the report to ``output_dir/filename`` and return that path."""
        import os
        import time
        from html import escape

        total = len(self.results)
        passed = sum(1 for item in self.results if item["status"] == "PASS")
        failed = total - passed
        generated_at = time.strftime("%Y-%m-%d %H:%M:%S")

        parts = [
            "<!DOCTYPE html>",
            "<html>",
            "<head>",
            # The file is written as UTF-8; declare it so browsers agree.
            '<meta charset="utf-8">',
            "<title>测试报告</title>",
            "<style>",
            "body { font-family: Arial, sans-serif; margin: 20px; }",
            ".summary { background: #f5f5f5; padding: 20px; margin-bottom: 20px; }",
            ".pass { color: green; }",
            ".fail { color: red; }",
            "table { width: 100%; border-collapse: collapse; }",
            "th, td { border: 1px solid #ddd; padding: 10px; text-align: left; }",
            "th { background: #4CAF50; color: white; }",
            "</style>",
            "</head>",
            "<body>",
            "<h1>自动化测试报告</h1>",
            '<div class="summary">',
            f"<p>生成时间: {generated_at}</p>",
            f'<p>总计: {total} | <span class="pass">通过: {passed}</span>'
            f' | <span class="fail">失败: {failed}</span></p>',
            "</div>",
            "<table>",
            "<tr>",
            "<th>测试名称</th>",
            "<th>状态</th>",
            "<th>消息</th>",
            "<th>时间</th>",
            "</tr>",
        ]
        for result in self.results:
            status_class = "pass" if result["status"] == "PASS" else "fail"
            parts.extend([
                "<tr>",
                f"<td>{escape(str(result['test_name']))}</td>",
                f'<td class="{status_class}">{escape(str(result["status"]))}</td>',
                f"<td>{escape(str(result['message']))}</td>",
                f"<td>{escape(str(result['timestamp']))}</td>",
                "</tr>",
            ])
        parts.extend(["</table>", "</body>", "</html>", ""])

        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write("\n".join(parts))
        return filepath
8.6 配置管理与环境切换
import os
import json
import yaml
# ========== 配置文件示例 ==========
# config.yaml
"""
environments:
dev:
base_url: "https://dev.example.com"
username: "dev_user"
password: "dev_pass"
timeout: 10
test:
base_url: "https://test.example.com"
username: "test_user"
password: "test_pass"
timeout: 15
prod:
base_url: "https://www.example.com"
username: "prod_user"
password: "prod_pass"
timeout: 20
browser:
type: chrome
headless: false
window_size:
width: 1920
height: 1080
"""
# ========== 配置管理类 ==========
class ConfigManager:
    """Process-wide configuration holder with per-environment settings.

    ``__new__`` always hands back the same instance. The configuration is
    empty until ``load_config`` has been called.
    """

    _instance = None

    def __new__(cls):
        shared = cls._instance
        if shared is None:
            shared = super().__new__(cls)
            shared._config = None
            cls._instance = shared
        return shared

    def load_config(self, config_file="config.yaml", env="dev"):
        """Load ``config_file`` and select the settings of ``env``.

        Args:
            config_file: Path of the YAML configuration file.
            env: Key under ``environments`` whose settings are selected.

        Returns:
            The merged configuration: the environment name, that
            environment's settings and the top-level ``browser`` section.
        """
        with open(config_file, mode='r', encoding='utf-8') as stream:
            document = yaml.safe_load(stream)

        merged = {"env": env}
        merged.update(document["environments"][env])
        merged["browser"] = document["browser"]
        self._config = merged
        return merged

    def get(self, key, default=None):
        """Return the value stored under ``key`` or ``default``.

        Raises:
            RuntimeError: If no configuration has been loaded yet.
        """
        loaded = self._config
        if loaded is None:
            raise RuntimeError("配置未加载,请先调用load_config()")
        return loaded.get(key, default)

    @property
    def base_url(self):
        """Value of the ``base_url`` setting (``None`` when absent)."""
        return self.get("base_url")

    @property
    def timeout(self):
        """Value of the ``timeout`` setting, 10 when absent."""
        return self.get("timeout", 10)

    @property
    def browser_type(self):
        """``browser.type`` setting, ``"chrome"`` when absent."""
        browser_section = self.get("browser", {})
        return browser_section.get("type", "chrome")

    @property
    def headless(self):
        """``browser.headless`` setting, ``False`` when absent."""
        browser_section = self.get("browser", {})
        return browser_section.get("headless", False)
# ========== 环境切换工具类 ==========
class EnvironmentManager:
    """Select the active test environment through ``TEST_ENV``.

    The chosen name is stored in the ``TEST_ENV`` environment variable;
    ``"dev"`` is used when the variable is not set.
    """

    ENVIRONMENTS = {
        "dev": {
            "base_url": "https://dev.example.com",
            "api_url": "https://api.dev.example.com",
        },
        "test": {
            "base_url": "https://test.example.com",
            "api_url": "https://api.test.example.com",
        },
        "prod": {
            "base_url": "https://www.example.com",
            "api_url": "https://api.example.com",
        },
    }

    @classmethod
    def set_environment(cls, env_name):
        """Make ``env_name`` the active environment.

        Raises:
            ValueError: If ``env_name`` is not a key of ``ENVIRONMENTS``.
        """
        known = env_name in cls.ENVIRONMENTS
        if not known:
            raise ValueError(f"未知环境: {env_name}")
        os.environ["TEST_ENV"] = env_name

    @classmethod
    def get_current_env(cls):
        """Return the active environment name (``"dev"`` by default)."""
        return os.getenv("TEST_ENV", "dev")

    @classmethod
    def get_config(cls):
        """Return the settings dictionary of the active environment."""
        active = cls.get_current_env()
        return cls.ENVIRONMENTS[active]
# ========== 使用示例 ==========
# 加载配置
# Load the "test" environment from the default config file and show two settings.
config = ConfigManager()
config.load_config(env="test")
print(f"Base URL: {config.base_url}")
print(f"Timeout: {config.timeout}")
# Switch the active environment (stored in the TEST_ENV variable) and read it back.
EnvironmentManager.set_environment("prod")
env_config = EnvironmentManager.get_config()
print(f"当前环境配置: {env_config}")
第九章:Selenium Grid与分布式测试
9.1 Selenium Grid架构原理
# ========== Selenium Grid 4 架构组件说明 ==========
"""
Selenium Grid 4 组件架构:
1. Router(路由器)
- 接收所有外部请求
- 将请求路由到正确的组件
- 是Grid的入口点
2. Distributor(分发器)
- 管理Node注册
- 分配测试会话到合适的Node
- 负载均衡
3. Session Queue(会话队列)
- 管理等待中的会话请求
- 先进先出(FIFO)处理
- 支持会话优先级
4. Session Map(会话映射)
- 存储会话ID与Node的映射关系
- 快速定位会话所在Node
5. Node(节点)
- 实际执行测试的机器
- 可以注册多个浏览器
- 支持并发会话
"""
# ========== Grid工作流程 ==========
"""
测试执行流程:
1. 客户端发送WebDriver请求到Router
2. Router将创建会话请求放入Session Queue
3. Distributor从Session Queue获取请求
4. Distributor选择合适的Node
5. Node创建会话并返回Session ID
6. Session Map存储Session ID与Node的映射
7. 后续请求通过Router直接路由到对应Node
"""
9.2 Grid环境搭建与配置
# ========== Selenium Grid 4 安装 ==========
# 方式一:下载独立的JAR文件
# 下载地址: https://www.selenium.dev/downloads/
# selenium-server-4.x.x.jar
# 方式二:使用Docker
docker pull selenium/hub
docker pull selenium/node-chrome
docker pull selenium/node-firefox
# ========== 启动Standalone模式 ==========
# 单机模式(Hub和Node在同一进程)
java -jar selenium-server-4.x.x.jar standalone
# ========== 启动Hub-Node模式 ==========
# 启动Hub
java -jar selenium-server-4.x.x.jar hub
# 启动Node(在另一台机器或同一机器)
java -jar selenium-server-4.x.x.jar node --detect-drivers true
# ========== 使用Docker Compose ==========
# docker-compose.yml
"""
version: "3"
services:
selenium-hub:
image: selenium/hub:4.x.x
container_name: selenium-hub
ports:
- "4442:4442"
- "4443:4443"
- "4444:4444"
environment:
- GRID_MAX_SESSION=10
- GRID_BROWSER_TIMEOUT=300
- GRID_TIMEOUT=300
chrome:
image: selenium/node-chrome:4.x.x
depends_on:
- selenium-hub
environment:
- SE_EVENT_BUS_HOST=selenium-hub
- SE_EVENT_BUS_PUBLISH_PORT=4442
- SE_EVENT_BUS_SUBSCRIBE_PORT=4443
- SE_NODE_MAX_SESSIONS=3
deploy:
replicas: 2
firefox:
image: selenium/node-firefox:4.x.x
depends_on:
- selenium-hub
environment:
- SE_EVENT_BUS_HOST=selenium-hub
- SE_EVENT_BUS_PUBLISH_PORT=4442
- SE_EVENT_BUS_SUBSCRIBE_PORT=4443
- SE_NODE_MAX_SESSIONS=3
deploy:
replicas: 1
"""
# 启动Docker Compose
# docker-compose up -d
9.3 Node配置与能力定义
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.firefox.options import Options as FirefoxOptions
# ========== 连接到Selenium Grid ==========
# Basic connection: a remote session with default Chrome options.
# NOTE: every example below rebinds ``driver`` without calling quit() on the
# previous session, so earlier sessions stay open on the Grid.
driver = webdriver.Remote(
command_executor="http://localhost:4444/wd/hub",
options=Options()
)
# ========== Configure Chrome options ==========
chrome_options = Options()
chrome_options.set_capability("browserName", "chrome")
chrome_options.set_capability("platformName", "linux")
chrome_options.set_capability("browserVersion", "latest")
# Extra command-line arguments for the browser
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
# Connect to the Grid with the Chrome options
driver = webdriver.Remote(
command_executor="http://localhost:4444/wd/hub",
options=chrome_options
)
# ========== Configure Firefox options ==========
firefox_options = FirefoxOptions()
firefox_options.set_capability("browserName", "firefox")
firefox_options.set_capability("platformName", "linux")
driver = webdriver.Remote(
command_executor="http://localhost:4444/wd/hub",
options=firefox_options
)
# ========== 自定义Capabilities ==========
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
# Custom capabilities to request from the Grid
capabilities = {
    "browserName": "chrome",
    "browserVersion": "latest",
    "platformName": "linux",
    "goog:chromeOptions": {
        "args": ["--no-sandbox", "--disable-dev-shm-usage"],
        "prefs": {
            "download.default_directory": "/tmp/downloads"
        }
    },
    "se:name": "My Test Session",
    "se:timeZone": "Asia/Shanghai"
}
# Translate the dictionary into an Options object so the capabilities above
# are actually sent (previously the unrelated ``chrome_options`` was passed
# and this dictionary was never used).
custom_options = Options()
for cap_name, cap_value in capabilities.items():
    if cap_name == "goog:chromeOptions":
        # Chrome-specific settings go through the Options API.
        # NOTE(review): Options is expected to rebuild the "goog:chromeOptions"
        # entry itself, so setting it via set_capability may be overwritten —
        # confirm against the Selenium version in use.
        for argument in cap_value.get("args", []):
            custom_options.add_argument(argument)
        if "prefs" in cap_value:
            custom_options.add_experimental_option("prefs", cap_value["prefs"])
    else:
        custom_options.set_capability(cap_name, cap_value)
driver = webdriver.Remote(
    command_executor="http://localhost:4444/wd/hub",
    options=custom_options
)
# ========== Grid工具类 ==========
class GridHelper:
    """Convenience wrapper for creating sessions on a Selenium Grid."""

    # Seconds to wait for the Grid's HTTP status endpoint before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url

    def create_driver(self, browser="chrome", **kwargs):
        """Create a remote WebDriver session on the Grid.

        Args:
            browser: ``"chrome"`` or ``"firefox"``.
            **kwargs: Optional settings. Recognised keys are ``headless``
                (bool), ``window_size`` (``(width, height)``), ``version``
                (browserVersion capability) and ``platform`` (platformName
                capability). Other keys are ignored.

        Raises:
            ValueError: If ``browser`` is not supported.
        """
        if browser == "chrome":
            options = Options()
        elif browser == "firefox":
            options = FirefoxOptions()
        else:
            raise ValueError(f"不支持的浏览器: {browser}")
        is_firefox = browser == "firefox"

        for key, value in kwargs.items():
            if key == "headless" and value:
                # "--headless=new" is a Chrome flag; Firefox uses "-headless".
                options.add_argument("-headless" if is_firefox else "--headless=new")
            elif key == "window_size":
                if is_firefox:
                    # Firefox takes the window size as two separate switches.
                    options.add_argument(f"--width={value[0]}")
                    options.add_argument(f"--height={value[1]}")
                else:
                    options.add_argument(f"--window-size={value[0]},{value[1]}")
            elif key == "version":
                options.set_capability("browserVersion", value)
            elif key == "platform":
                options.set_capability("platformName", value)

        return webdriver.Remote(
            command_executor=f"{self.hub_url}/wd/hub",
            options=options
        )

    def get_grid_status(self):
        """Return the decoded JSON body of the Grid's ``/status`` endpoint."""
        import requests

        response = requests.get(
            f"{self.hub_url}/status", timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def get_available_nodes(self):
        """Return the node entries of the status response that are available.

        A node counts as available when its ``availability`` field is
        ``"UP"`` or absent. Returns an empty list when the status response
        carries no node information.
        """
        value = self.get_grid_status().get("value") or {}
        nodes = value.get("nodes") or []
        return [node for node in nodes if node.get("availability", "UP") == "UP"]


# 使用示例
# Usage example: query the Grid status, then open a headless Chrome session.
grid = GridHelper("http://localhost:4444")
print(f"Grid状态: {grid.get_grid_status()}")
driver = grid.create_driver("chrome", headless=True)
9.4 并行测试实现
import pytest
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from concurrent.futures import ThreadPoolExecutor
import threading
# ========== 使用pytest-xdist进行并行测试 ==========
# 安装: pip install pytest-xdist
# 运行命令: pytest tests/ -n 4 (使用4个进程并行)
# conftest.py配置
"""
import pytest
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
@pytest.fixture(scope="function")
def driver():
options = Options()
options.add_argument("--headless=new")
options.add_argument("--no-sandbox")
driver = webdriver.Remote(
command_executor="http://localhost:4444/wd/hub",
options=options
)
yield driver
driver.quit()
"""
# ========== 使用ThreadPoolExecutor并行测试 ==========
class ParallelTestRunner:
    """Run test functions concurrently against a Selenium Grid.

    Each test function receives a fresh headless remote driver. One record
    per executed test is appended to ``results`` (guarded by a lock because
    the tests run on worker threads).
    """

    def __init__(self, hub_url, max_workers=4):
        self.hub_url = hub_url
        self.max_workers = max_workers
        self.results = []
        self.lock = threading.Lock()

    @staticmethod
    def _headless_options(browser):
        """Return headless browser options matching ``browser``."""
        if browser == "chrome":
            options = Options()
            options.add_argument("--headless=new")
        elif browser == "firefox":
            from selenium.webdriver.firefox.options import Options as FirefoxOptions
            options = FirefoxOptions()
            # Firefox uses its own headless switch.
            options.add_argument("-headless")
        elif browser == "edge":
            from selenium.webdriver.edge.options import Options as EdgeOptions
            options = EdgeOptions()
            options.add_argument("--headless=new")
        else:
            raise ValueError(f"不支持的浏览器: {browser}")
        return options

    def run_test(self, test_func, browser="chrome"):
        """Run one test function with its own remote driver.

        Args:
            test_func: Callable taking the driver as its only argument.
            browser: ``"chrome"``, ``"firefox"`` or ``"edge"``.

        Returns:
            Whatever ``test_func`` returns.

        Raises:
            Exception: Any error from session creation or from ``test_func``
                is recorded as a ``FAIL`` result and then re-raised.
        """
        driver = None
        try:
            driver = webdriver.Remote(
                command_executor=f"{self.hub_url}/wd/hub",
                options=self._headless_options(browser)
            )
            result = test_func(driver)
            with self.lock:
                self.results.append({
                    "test": test_func.__name__,
                    "status": "PASS",
                    "result": result
                })
            return result
        except Exception as e:
            with self.lock:
                self.results.append({
                    "test": test_func.__name__,
                    "status": "FAIL",
                    "error": str(e)
                })
            raise
        finally:
            if driver is not None:
                driver.quit()

    def run_tests_parallel(self, test_functions):
        """Run all ``test_functions`` in a thread pool and return ``results``.

        A failing test no longer aborts the run: its failure has already been
        recorded by ``run_test``, so the exception is consumed here and the
        complete result list is returned once every test has finished.
        """
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [
                executor.submit(self.run_test, func)
                for func in test_functions
            ]
            for future in futures:
                try:
                    future.result()  # wait for this test to finish
                except Exception:
                    # Already stored as a FAIL record; keep collecting the rest.
                    pass
        return self.results
# ========== 跨浏览器并行测试 ==========
class CrossBrowserTest:
    """Run one test function on every browser in ``BROWSERS`` concurrently."""

    BROWSERS = ["chrome", "firefox", "edge"]

    def __init__(self, hub_url):
        self.hub_url = hub_url

    def run_on_all_browsers(self, test_func):
        """Run ``test_func`` once per browser, one worker thread each.

        Returns:
            A dict keyed by browser name. The value is the test's return
            value, or ``{"error": <message>}`` when that browser's run raised.
        """
        results = {}
        with ThreadPoolExecutor(max_workers=len(self.BROWSERS)) as executor:
            futures = {
                browser: executor.submit(self._run_test, test_func, browser)
                for browser in self.BROWSERS
            }
            for browser, future in futures.items():
                try:
                    results[browser] = future.result()
                except Exception as e:
                    results[browser] = {"error": str(e)}
        return results

    def _run_test(self, test_func, browser):
        """Run ``test_func`` on a headless remote session of ``browser``.

        Edge now gets real Edge options (it previously fell back to Chrome
        options and therefore ran Chrome), and Firefox gets its own headless
        switch. Any other browser name keeps the Chrome fallback.
        """
        if browser == "firefox":
            from selenium.webdriver.firefox.options import Options as FirefoxOptions
            options = FirefoxOptions()
            options.add_argument("-headless")
        elif browser == "edge":
            from selenium.webdriver.edge.options import Options as EdgeOptions
            options = EdgeOptions()
            options.add_argument("--headless=new")
        else:
            options = Options()
            options.add_argument("--headless=new")

        driver = webdriver.Remote(
            command_executor=f"{self.hub_url}/wd/hub",
            options=options
        )
        try:
            return test_func(driver)
        finally:
            driver.quit()
# 使用示例
def sample_test(driver):
    """Open the example site, check its title and return the title."""
    target_url = "https://www.example.com"
    driver.get(target_url)
    assert "Example" in driver.title
    return driver.title
# Usage example: run the sample test through the parallel runner (3 workers)
# against a local Grid and print the collected result records.
runner = ParallelTestRunner("http://localhost:4444", max_workers=3)
results = runner.run_tests_parallel([sample_test])
print(results)
9.5 Grid监控与管理
import requests
import json
import time
# ========== Grid API监控 ==========
class GridMonitor:
    """Query and control a Selenium Grid 4 through its HTTP/GraphQL APIs."""

    # Seconds to wait for any Grid HTTP call; without a timeout a hung Grid
    # would block the monitor forever.
    REQUEST_TIMEOUT = 10

    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url
        self.api_url = f"{hub_url}/graphql"

    def _graphql(self, query):
        """POST ``query`` to the GraphQL endpoint and return the JSON body."""
        response = requests.post(
            self.api_url,
            json={"query": query},
            timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def get_grid_status(self):
        """Return the decoded JSON body of the ``/status`` endpoint."""
        response = requests.get(
            f"{self.hub_url}/status", timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def get_nodes_info(self):
        """Return node details from the GraphQL ``nodesInfo`` query.

        The Grid 4 schema names the fields ``maxSession`` and ``sessions``;
        GraphQL aliases keep the response keys ``maxSessionCount`` and
        ``activeSessions`` that consumers of this method read.
        """
        query = """
        {
          nodesInfo {
            nodes {
              id
              uri
              status
              maxSessionCount: maxSession
              sessionCount
              stereotypes
              activeSessions: sessions {
                id
                capabilities
              }
            }
          }
        }
        """
        return self._graphql(query)

    def get_sessions_info(self):
        """Return queue and session details from the ``sessionsInfo`` query."""
        query = """
        {
          sessionsInfo {
            sessionQueueRequests
            sessions {
              id
              capabilities
              nodeId
              nodeUri
              startTime
            }
          }
        }
        """
        return self._graphql(query)

    def delete_session(self, session_id):
        """Delete a session; return True when the Grid answers HTTP 200."""
        response = requests.delete(
            f"{self.hub_url}/session/{session_id}",
            timeout=self.REQUEST_TIMEOUT
        )
        return response.status_code == 200

    def drain_node(self, node_id, registration_secret=""):
        """Drain a node so that it stops accepting new sessions.

        Uses the Grid's REST drain endpoint instead of a GraphQL mutation
        built by string interpolation.

        Args:
            node_id: Identifier of the node to drain.
            registration_secret: Value for the ``X-REGISTRATION-SECRET``
                header; leave empty when the Grid has no secret configured.

        Returns:
            The decoded JSON body of the Grid's answer.
        """
        from urllib.parse import quote

        response = requests.post(
            f"{self.hub_url}/se/grid/distributor/node/"
            f"{quote(str(node_id), safe='')}/drain",
            headers={"X-REGISTRATION-SECRET": registration_secret},
            timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def get_grid_health(self):
        """Summarise ``/status`` as ``ready`` flag, message and node count."""
        # "value" may be missing or null; treat both as an empty status.
        value = self.get_grid_status().get("value") or {}
        return {
            "ready": value.get("ready", False),
            "message": value.get("message", ""),
            "nodes": len(value.get("nodes") or [])
        }

    def monitor_loop(self, interval=30):
        """Print Grid health and the active session count forever.

        Args:
            interval: Seconds to sleep between two polls.

        Errors during a poll are printed and the loop keeps running.
        """
        while True:
            try:
                health = self.get_grid_health()
                print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] "
                      f"Grid状态: {'正常' if health['ready'] else '异常'}")
                sessions = self.get_sessions_info()
                sessions_info = (sessions.get("data") or {}).get("sessionsInfo") or {}
                active_count = len(sessions_info.get("sessions") or [])
                print(f"活跃会话数: {active_count}")
            except Exception as e:
                print(f"监控异常: {e}")
            time.sleep(interval)
# ========== Grid管理工具类 ==========
class GridManager:
    """Housekeeping helpers built on top of ``GridMonitor``."""

    def __init__(self, hub_url="http://localhost:4444"):
        self.hub_url = hub_url
        self.monitor = GridMonitor(hub_url)

    @staticmethod
    def _start_timestamp(raw):
        """Convert a session ``startTime`` value to epoch seconds, or None.

        Numbers (and numeric strings) are taken as epoch seconds. Other
        strings are parsed as ``dd/mm/YYYY HH:MM:SS`` local time.
        NOTE(review): that text format is what Grid 4 is believed to return
        for ``startTime`` — confirm against the deployed Grid version.
        """
        from datetime import datetime

        if isinstance(raw, bool):
            return None
        if isinstance(raw, (int, float)):
            return float(raw)
        if isinstance(raw, str):
            try:
                return float(raw)
            except ValueError:
                pass
            try:
                return datetime.strptime(raw, "%d/%m/%Y %H:%M:%S").timestamp()
            except ValueError:
                return None
        return None

    def cleanup_stale_sessions(self, max_age_minutes=30):
        """Delete sessions older than ``max_age_minutes``.

        Sessions whose start time is missing or cannot be parsed are left
        alone (previously a missing start time counted as "started at epoch
        0", so such sessions were always deleted).

        Returns:
            The number of sessions that were deleted successfully.
        """
        sessions = self.monitor.get_sessions_info()
        sessions_info = (sessions.get("data") or {}).get("sessionsInfo") or {}
        current_time = time.time()
        cleaned = 0
        for session in sessions_info.get("sessions") or []:
            started_at = self._start_timestamp(session.get("startTime"))
            if started_at is None:
                continue
            age_minutes = (current_time - started_at) / 60
            if age_minutes > max_age_minutes:
                session_id = session.get("id")
                if self.monitor.delete_session(session_id):
                    cleaned += 1
                    print(f"已清理过期会话: {session_id}")
        return cleaned

    def get_load_balance_info(self):
        """Return per-node load records.

        Each record holds ``node_id``, ``max_sessions``, ``current_sessions``
        and ``utilization`` (percent). A node that reports no capacity
        (missing, null or zero) gets a utilization of ``0.0`` instead of
        causing a division error.
        """
        nodes = self.monitor.get_nodes_info()
        nodes_info = (nodes.get("data") or {}).get("nodesInfo") or {}
        balance_info = []
        for node in nodes_info.get("nodes") or []:
            capacity = node.get("maxSessionCount")
            in_use = node.get("sessionCount")
            utilization = (in_use or 0) / capacity * 100 if capacity else 0.0
            balance_info.append({
                "node_id": node.get("id"),
                "max_sessions": capacity,
                "current_sessions": in_use,
                "utilization": utilization
            })
        return balance_info

    def print_grid_summary(self):
        """Print Grid readiness, node count and the load of every node."""
        health = self.monitor.get_grid_health()
        balance = self.get_load_balance_info()
        print("=" * 50)
        print("Selenium Grid 状态摘要")
        print("=" * 50)
        print(f"状态: {'正常' if health['ready'] else '异常'}")
        print(f"节点数: {health['nodes']}")
        print("\n节点负载:")
        for node in balance:
            print(f" Node {node['node_id']}: "
                  f"{node['current_sessions']}/{node['max_sessions']} "
                  f"({node['utilization']:.1f}%)")
        print("=" * 50)
# 使用示例
# Usage example: print a status summary of the local Grid.
manager = GridManager("http://localhost:4444")
manager.print_grid_summary()
第十章:生产环境最佳实践
10.1 测试稳定性优化
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import *
import time
import functools
# ========== 重试机制 ==========
def retry_on_failure(max_retries=3, delay=1, exceptions=(Exception,)):
    """Decorator factory that retries a function when it raises.

    Args:
        max_retries: Total number of attempts; must be at least 1.
        delay: Seconds to sleep between two attempts.
        exceptions: Exception type(s) that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful
        result or re-raises the exception of the final attempt.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1 (previously this
            surfaced later as a confusing ``TypeError`` from ``raise None``).
    """
    if max_retries < 1:
        raise ValueError(f"max_retries 必须 >= 1, 实际: {max_retries}")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries:
                        # Out of attempts: re-raise with the original traceback.
                        raise
                    print(f"第 {attempt} 次尝试失败: {e},正在重试...")
                    time.sleep(delay)
        return wrapper
    return decorator
# 使用示例
@retry_on_failure(max_retries=3, delay=2, exceptions=(TimeoutException, StaleElementReferenceException))
def click_element(driver, locator):
    """Click the element at ``locator`` once it is clickable.

    Waits up to 10 seconds per attempt; the decorator retries the whole
    wait-and-click on timeout or stale-element errors.
    """
    clickable = EC.element_to_be_clickable(locator)
    WebDriverWait(driver, 10).until(clickable).click()
# ========== 显式等待最佳实践 ==========
class SmartWait:
    """Explicit-wait helper with a configurable timeout and poll frequency."""

    def __init__(self, driver, timeout=10, poll_frequency=0.5):
        self.driver = driver
        self.timeout = timeout
        self.poll_frequency = poll_frequency

    def _new_wait(self):
        """Build a ``WebDriverWait`` that honours timeout and poll frequency."""
        return WebDriverWait(
            self.driver,
            self.timeout,
            poll_frequency=self.poll_frequency
        )

    def wait_for_element(self, locator, condition="visible"):
        """Wait until the element at ``locator`` satisfies ``condition``.

        Args:
            locator: ``(by, value)`` tuple.
            condition: One of ``"visible"``, ``"present"``, ``"clickable"``
                or ``"invisible"``.

        Returns:
            The value produced by the underlying expected condition.

        Raises:
            KeyError: If ``condition`` is not one of the names above.
        """
        conditions = {
            "visible": EC.visibility_of_element_located,
            "present": EC.presence_of_element_located,
            "clickable": EC.element_to_be_clickable,
            "invisible": EC.invisibility_of_element_located
        }
        return self._new_wait().until(conditions[condition](locator))

    def wait_for_page_load(self):
        """Wait until ``document.readyState`` is ``"complete"``.

        Uses the configured poll frequency, like ``wait_for_element``.
        """
        self._new_wait().until(
            lambda d: d.execute_script("return document.readyState") == "complete"
        )

    def wait_for_ajax_complete(self):
        """Wait until jQuery reports no active requests.

        Pages without jQuery are treated as having nothing to wait for.
        Uses the configured poll frequency, like ``wait_for_element``.
        """
        def ajax_idle(d):
            if not d.execute_script("return typeof jQuery != 'undefined'"):
                return True
            return d.execute_script("return jQuery.active == 0")

        self._new_wait().until(ajax_idle)
# ========== 元素操作封装 ==========
class StableElement:
    """Element operations hardened against common Selenium flakiness.

    Each method resolves its element through a SmartWait bound to `driver`.
    """

    def __init__(self, driver):
        self.driver = driver
        self.wait = SmartWait(driver)

    def safe_click(self, locator, max_attempts=3):
        """Click the element at `locator`, working around common failures.

        The element is scrolled to the viewport centre first. If the native
        click is intercepted, the click is issued through JavaScript instead.
        A stale element causes the whole sequence to be retried.

        Args:
            locator: Locator tuple of the element to click.
            max_attempts: How many times to retry after a stale reference.

        Returns:
            True once a click was issued; False only if `max_attempts` is
            smaller than 1, because the loop then never runs.

        Raises:
            StaleElementReferenceException: If the element is still stale on
                the final attempt.
        """
        for attempt in range(max_attempts):
            try:
                element = self.wait.wait_for_element(locator, "clickable")
                # Bring the element into view before clicking
                self.driver.execute_script(
                    "arguments[0].scrollIntoView({block: 'center'});",
                    element
                )
                time.sleep(0.3)
                # Try a regular click first
                try:
                    element.click()
                    return True
                except ElementClickInterceptedException:
                    # Fall back to a JavaScript click
                    self.driver.execute_script("arguments[0].click();", element)
                    return True
            except StaleElementReferenceException:
                if attempt < max_attempts - 1:
                    continue
                raise
        return False

    def safe_send_keys(self, locator, text, clear_first=True):
        """Type `text` into the element at `locator` and verify it arrived.

        Args:
            locator: Locator tuple of the input element.
            text: Text to enter.
            clear_first: Clear the field before typing when True.
        """
        element = self.wait.wait_for_element(locator, "visible")
        if clear_first:
            element.clear()
            time.sleep(0.1)
        element.send_keys(text)
        # Check that the field really holds the requested text
        actual_value = element.get_attribute("value")
        if actual_value != text:
            # Force the value through JavaScript. The text travels as a script
            # argument rather than being spliced into the script source, so
            # quotes, backslashes and newlines cannot break or inject script.
            self.driver.execute_script(
                "arguments[0].value = arguments[1];",
                element,
                text
            )

    def safe_get_text(self, locator):
        """Return the text of the element at `locator`, even when hidden.

        Displayed elements return `element.text`; hidden ones return their
        `textContent` read through JavaScript. A stale reference triggers one
        fresh lookup whose `element.text` is returned.
        """
        element = self.wait.wait_for_element(locator, "present")
        try:
            if element.is_displayed():
                return element.text
            # Read the text of a hidden element
            return self.driver.execute_script(
                "return arguments[0].textContent;",
                element
            )
        except StaleElementReferenceException:
            element = self.wait.wait_for_element(locator, "present")
            return element.text
# ========== 测试基类 ==========
class BaseTest:
    """Base class collecting helpers shared by test classes."""

    @staticmethod
    def handle_stale_element(func):
        """Decorator that re-runs `func` when it hits a stale element.

        The wrapped function is called up to three times, pausing 0.5 seconds
        between calls; the StaleElementReferenceException of the third call
        propagates to the caller.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts_left = 3
            while True:
                try:
                    return func(*args, **kwargs)
                except StaleElementReferenceException:
                    attempts_left -= 1
                    if attempts_left == 0:
                        raise
                    time.sleep(0.5)
        return wrapper
10.2 异常处理与日志记录
import logging
import traceback
from datetime import datetime
from functools import wraps
# ========== 日志配置 ==========
def setup_logger(name, log_file, level=logging.INFO):
    """Return a logger that writes to both `log_file` and the console.

    Calling this again for a logger that already has handlers only updates
    its level, so repeated calls do not duplicate every log line.

    Args:
        name: Name of the logger to configure.
        log_file: Path of the log file; its directory is created if missing.
        level: Logging level applied to the logger.

    Returns:
        The configured logging.Logger.
    """
    import os  # local import: this snippet's import block does not provide os

    logger = logging.getLogger(name)
    logger.setLevel(level)
    if logger.handlers:
        # Already configured; adding more handlers would duplicate output.
        return logger

    log_dir = os.path.dirname(log_file)
    if log_dir:
        # FileHandler does not create missing parent directories itself.
        os.makedirs(log_dir, exist_ok=True)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger
# Create the shared test logger: DEBUG level, written to logs/test.log
test_logger = setup_logger(
"selenium_test",
"logs/test.log",
logging.DEBUG
)
# ========== 异常处理装饰器 ==========
def log_exception(func):
    """Decorator that logs any exception raised by `func`, then re-raises it.

    The report sent to `test_logger.error` contains the function name, the
    exception type and message, and the formatted traceback.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except Exception as e:
            report = (
                f"函数 {func.__name__} 执行失败\n"
                f"异常类型: {type(e).__name__}\n"
                f"异常信息: {str(e)}\n"
                f"堆栈跟踪:\n{traceback.format_exc()}"
            )
            test_logger.error(report)
            raise
        else:
            return outcome
    return wrapper
# ========== 测试日志工具类 ==========
class TestLogger:
    """Helper that records test steps, assertions and artefacts in a logger."""

    # The name matches pytest's Test* collection pattern, but this is a helper
    # with a constructor; opting out avoids a collection warning.
    __test__ = False

    def __init__(self, driver, logger):
        self.driver = driver
        self.logger = logger

    def log_step(self, step_name):
        """Log the name of the test step being executed (INFO)."""
        self.logger.info(f"执行步骤: {step_name}")

    def log_element_info(self, locator, action):
        """Log an action performed on the element at `locator` (DEBUG)."""
        self.logger.debug(f"元素操作: {action} - {locator}")

    def log_assertion(self, condition, message):
        """Log an assertion outcome: INFO when truthy, ERROR otherwise.

        This only logs; it never raises on a failed condition.
        """
        if condition:
            self.logger.info(f"断言通过: {message}")
        else:
            self.logger.error(f"断言失败: {message}")

    def log_screenshot(self, filename):
        """Save a screenshot to `filename` and log where it went."""
        self.driver.save_screenshot(filename)
        self.logger.info(f"截图已保存: {filename}")

    def log_page_source(self, filename):
        """Write the current page source to `filename` (UTF-8) and log it."""
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(self.driver.page_source)
        self.logger.info(f"页面源码已保存: {filename}")
# ========== 异常处理工具类 ==========
class ExceptionHandler:
    """Writes diagnostic information to a logger when a WebDriver call fails."""

    def __init__(self, driver, logger):
        self.driver = driver
        self.logger = logger

    def handle_timeout(self, locator, operation):
        """Record diagnostics for a timed-out operation.

        Logs the operation, the current URL and the page title as errors,
        then saves a screenshot whose file name carries a timestamp.

        Args:
            locator: Locator of the element involved.
            operation: Name of the operation that timed out.
        """
        report = self.logger.error
        report(f"操作超时: {operation} - {locator}")
        report(f"当前URL: {self.driver.current_url}")
        report(f"页面标题: {self.driver.title}")
        # Keep a screenshot for later diagnosis
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        screenshot_name = f"error_screenshot_{stamp}.png"
        self.driver.save_screenshot(screenshot_name)

    def handle_no_such_element(self, locator):
        """Log a missing element plus the first 1000 characters of the page."""
        report = self.logger.error
        report(f"元素未找到: {locator}")
        source_head = self.driver.page_source[:1000]
        report(f"页面源码:\n{source_head}")

    def handle_stale_element(self, element_id):
        """Log a warning that the given element reference went stale."""
        self.logger.warning(f"元素已过期: {element_id}")
# ========== 使用示例 ==========
class TestWithLogging:
    """Example test object showing step logging and exception logging."""

    def __init__(self, driver):
        self.driver = driver
        self.logger = TestLogger(driver, test_logger)
        self.exception_handler = ExceptionHandler(driver, test_logger)

    @log_exception
    def test_login(self):
        """Run the login flow, logging every step and the final check.

        Any exception is logged by `log_exception` and then re-raised.
        """
        self.logger.log_step("打开登录页面")
        self.driver.get("https://www.example.com/login")
        self.logger.log_step("输入用户名")
        self.driver.find_element(By.ID, "username").send_keys("test_user")
        # The password must be entered as well; without it the login form is
        # submitted incomplete and the success check below cannot hold.
        self.logger.log_step("输入密码")
        self.driver.find_element(By.ID, "password").send_keys("test_pass")
        self.logger.log_step("点击登录")
        self.driver.find_element(By.ID, "login-btn").click()
        self.logger.log_assertion(
            "欢迎" in self.driver.page_source,
            "验证登录成功"
        )
10.3 性能优化策略
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import time
# ========== 浏览器启动优化 ==========
def create_optimized_driver():
    """Create a headless Chrome WebDriver configured for fast test runs.

    NOTE(review): some of these switches (for example --disable-images and
    --disable-javascript) may be ignored by current Chrome builds; confirm
    against the Chromium switch list before relying on them.

    Returns:
        A webdriver.Chrome instance built from the options below.
    """
    chrome_switches = (
        # Turn off features the tests do not need
        "--disable-extensions",
        "--disable-plugins",
        "--disable-images",  # intended to skip image loading
        "--disable-javascript",  # only for pages that work without JS
        "--disable-gpu",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        # Headless mode
        "--headless=new",
        # Disk cache location
        "--disk-cache-dir=/tmp/chrome_cache",
        # Ignore certificate errors
        "--ignore-certificate-errors",
        # Fixed User-Agent
        "--user-agent=Mozilla/5.0",
    )
    # Drop the automation switch and the automation extension
    experimental_options = (
        ("excludeSwitches", ["enable-automation"]),
        ("useAutomationExtension", False),
    )

    options = Options()
    for switch in chrome_switches:
        options.add_argument(switch)
    for option_name, option_value in experimental_options:
        options.add_experimental_option(option_name, option_value)
    # "eager": do not wait for every resource to finish loading
    options.page_load_strategy = "eager"
    return webdriver.Chrome(options=options)
# ========== 页面加载策略 ==========
"""
页面加载策略(page_load_strategy):
1. normal(默认): 等待整个页面加载完成
2. eager: 等待DOMContentLoaded事件,不等待图片等资源
3. none: 不等待任何加载事件
"""
# Use the "eager" page load strategy
options = Options()
options.page_load_strategy = "eager"
# ========== 元素定位优化 ==========
class OptimizedLocator:
    """Element lookup helpers that reduce the number of WebDriver calls."""

    # Selenium strategy string -> type name understood by the batch script.
    # By.CSS_SELECTOR is the string "css selector", which the script would
    # otherwise never recognise as 'css'.
    _JS_LOCATOR_TYPES = {
        "id": "id",
        "css": "css",
        "css selector": "css",
    }

    def __init__(self, driver):
        self.driver = driver
        self._cache = {}

    def find_element_cached(self, locator, ttl=60):
        """Return the element at `locator`, reusing a cached one when valid.

        A cached element is reused only while it is younger than `ttl`
        seconds and still answers `is_displayed()` without raising.

        Args:
            locator: Locator tuple, unpacked into `driver.find_element`.
            ttl: Cache lifetime in seconds.

        Returns:
            The cached or freshly located element.
        """
        cache_key = str(locator)
        cached = self._cache.get(cache_key)
        if cached is not None and time.time() - cached["time"] < ttl:
            try:
                # Probe the element; a dead reference raises here
                cached["element"].is_displayed()
            except Exception:
                # Was a bare `except:`, which also swallowed KeyboardInterrupt
                # and SystemExit. Drop the dead entry and look it up again.
                self._cache.pop(cache_key, None)
            else:
                return cached["element"]
        # Locate the element again and remember it
        element = self.driver.find_element(*locator)
        self._cache[cache_key] = {
            "element": element,
            "time": time.time()
        }
        return element

    def find_elements_batch(self, locators):
        """Check in a single script call which of several elements exist.

        Only id and CSS-selector locators are evaluated by the script; any
        other strategy is reported as False.

        Args:
            locators: Mapping of arbitrary key -> locator tuple.

        Returns:
            Whatever the script returns: an object mapping each key to a
            boolean presence flag.
        """
        js_code = """
        var results = {};
        var locators = arguments[0];
        for (var key in locators) {
            var loc = locators[key];
            var element = null;
            if (loc.type === 'id') {
                element = document.getElementById(loc.value);
            } else if (loc.type === 'css') {
                element = document.querySelector(loc.value);
            }
            results[key] = element ? true : false;
        }
        return results;
        """
        locators_dict = {}
        for key, loc in locators.items():
            raw_type = loc[0].lower().replace("by.", "")
            locators_dict[key] = {
                "type": self._JS_LOCATOR_TYPES.get(raw_type, raw_type),
                "value": loc[1],
            }
        return self.driver.execute_script(js_code, locators_dict)
# ========== 批量操作优化 ==========
class BatchOperations:
    """Batch DOM operations that run in a single script call each."""

    def __init__(self, driver):
        self.driver = driver

    @staticmethod
    def _to_css_selector(locator):
        """Translate a (strategy, value) locator into a CSS selector.

        The scripts below resolve elements with document.querySelector, so
        passing the raw locator value only works for CSS locators; an id
        locator such as ("id", "username") would search for a <username> tag.

        Raises:
            ValueError: For strategies with no CSS equivalent (e.g. xpath).
        """
        strategy, value = locator
        strategy = str(strategy).lower()
        if strategy in ("css selector", "css"):
            return value
        if strategy in ("id", "name"):
            quoted = value.replace("\\", "\\\\").replace('"', '\\"')
            return f'[{strategy}="{quoted}"]'
        if strategy == "class name":
            return f".{value}"
        if strategy == "tag name":
            return value
        raise ValueError(
            f"locator strategy {strategy!r} cannot be expressed as a CSS selector"
        )

    def fill_form_batch(self, field_values):
        """Set the value of several form fields in one script call.

        Fields whose selector matches nothing are skipped silently. Values
        are assigned directly to `element.value`.

        Args:
            field_values: Mapping of locator tuple -> value to assign.

        Raises:
            ValueError: If a locator cannot be expressed as a CSS selector.
        """
        js_code = """
        var fields = arguments[0];
        for (var selector in fields) {
            var element = document.querySelector(selector);
            if (element) {
                element.value = fields[selector];
            }
        }
        """
        selector_values = {
            self._to_css_selector(loc): value
            for loc, value in field_values.items()
        }
        self.driver.execute_script(js_code, selector_values)

    def get_multiple_texts(self, locators):
        """Fetch the textContent of several elements in one script call.

        Args:
            locators: Iterable of locator tuples.

        Returns:
            The script result: one entry per locator, in order, holding the
            element's textContent or null/None when nothing matched.

        Raises:
            ValueError: If a locator cannot be expressed as a CSS selector.
        """
        selectors = [self._to_css_selector(loc) for loc in locators]
        js_code = """
        var selectors = arguments[0];
        return selectors.map(function(s) {
            var el = document.querySelector(s);
            return el ? el.textContent : null;
        });
        """
        return self.driver.execute_script(js_code, selectors)
# ========== 连接池管理 ==========
class WebDriverPool:
    """Pool that reuses WebDriver instances instead of restarting browsers.

    `_pool` holds idle drivers and `_in_use` the ones handed out.
    """

    def __init__(self, max_size=5):
        self.max_size = max_size
        self._pool = []
        self._in_use = []

    def get_driver(self):
        """Hand out an idle driver, or create one while capacity remains.

        Returns:
            A WebDriver instance now tracked as in use.

        Raises:
            RuntimeError: If no driver is idle and `max_size` are in use.
        """
        if self._pool:
            driver = self._pool.pop()
        elif len(self._in_use) < self.max_size:
            driver = create_optimized_driver()
        else:
            raise RuntimeError("连接池已满")
        self._in_use.append(driver)
        return driver

    def return_driver(self, driver):
        """Return a driver handed out by `get_driver` to the idle pool.

        Cookies are deleted before the driver becomes available again.
        Drivers this pool did not hand out are ignored.
        """
        if driver not in self._in_use:
            return
        self._in_use.remove(driver)
        # Reset state before the next user gets this browser
        driver.delete_all_cookies()
        self._pool.append(driver)

    def close_all(self):
        """Quit every driver, idle or in use, and empty the pool.

        Every driver gets a `quit()` attempt even if an earlier one fails,
        so one broken browser cannot leak the others.

        Raises:
            Exception: The first error raised by a `quit()` call, re-raised
                after all drivers have been attempted.
        """
        drivers = self._pool + self._in_use
        self._pool.clear()
        self._in_use.clear()
        first_error = None
        for driver in drivers:
            try:
                driver.quit()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
10.4 CI/CD集成
# ========== Jenkins Pipeline configuration ==========
# Jenkinsfile
# Groovy comments start with //; a trailing "#" comment after a step is a
# syntax error. The report stage lists the result directories of both
# parallel browser stages.
"""
pipeline {
    agent any
    environment {
        SELENIUM_HUB = 'http://selenium-hub:4444'
    }
    stages {
        stage('Setup') {
            steps {
                sh 'pip install -r requirements.txt'
            }
        }
        stage('Start Grid') {
            steps {
                sh 'docker-compose up -d'
                sh 'sleep 10'  // 等待Grid启动
            }
        }
        stage('Run Tests') {
            parallel {
                stage('Chrome Tests') {
                    steps {
                        sh 'pytest tests/ --browser=chrome -n 4 --alluredir=allure-results-chrome'
                    }
                }
                stage('Firefox Tests') {
                    steps {
                        sh 'pytest tests/ --browser=firefox -n 4 --alluredir=allure-results-firefox'
                    }
                }
            }
        }
        stage('Generate Report') {
            steps {
                allure includeProperties: false, jdk: '', results: [[path: 'allure-results-chrome'], [path: 'allure-results-firefox']]
            }
        }
        stage('Cleanup') {
            steps {
                sh 'docker-compose down'
            }
        }
    }
    post {
        always {
            cleanWs()
        }
        failure {
            emailext (
                subject: "测试失败: ${env.JOB_NAME} #${env.BUILD_NUMBER}",
                body: "请查看构建详情: ${env.BUILD_URL}",
                to: 'team@example.com'
            )
        }
    }
}
"""
# ========== GitHub Actions configuration ==========
# .github/workflows/selenium-tests.yml
# YAML nesting is expressed through indentation, so it is restored here;
# without it the workflow cannot be parsed.
"""
name: Selenium Tests
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      selenium-hub:
        image: selenium/hub:4.15.0
        ports:
          - 4442:4442
          - 4443:4443
          - 4444:4444
      chrome:
        image: selenium/node-chrome:4.15.0
        env:
          SE_EVENT_BUS_HOST: selenium-hub
          SE_EVENT_BUS_PUBLISH_PORT: 4442
          SE_EVENT_BUS_SUBSCRIBE_PORT: 4443
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Wait for Selenium Grid
        run: |
          sleep 10
          curl --retry 10 --retry-delay 5 --retry-connrefused http://localhost:4444/status
      - name: Run tests
        run: |
          pytest tests/ -v --alluredir=allure-results
        env:
          SELENIUM_HUB_URL: http://localhost:4444
      - name: Generate Allure Report
        uses: simple-elf/allure-report-action@master
        if: always()
        with:
          allure_results: allure-results
          allure_report: allure-report
      - name: Publish Report
        uses: peaceiris/actions-gh-pages@v3
        if: always()
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          publish_dir: allure-report
"""
# ========== pytest configuration ==========
# pytest.ini
# Multi-line ini values (addopts, markers) must be indented continuation
# lines; unindented they would be read as separate, invalid keys.
"""
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
    -v
    --tb=short
    --strict-markers
    --alluredir=allure-results
markers =
    smoke: 冒烟测试
    regression: 回归测试
    slow: 慢速测试
"""
10.5 安全性最佳实践
import os
import base64
from cryptography.fernet import Fernet
# ========== 敏感数据处理 ==========
class SecureDataManager:
    """Encrypts and decrypts sensitive strings (passwords, API keys) with Fernet."""

    def __init__(self, key=None):
        """Set up the cipher.

        Args:
            key: Fernet key as bytes or str. When omitted, the ENCRYPTION_KEY
                environment variable is used. If that is unset too, a fresh
                key is generated and a RuntimeWarning is emitted, because
                data encrypted with such a key cannot be decrypted by any
                other instance or later run.
        """
        if key is None:
            key = os.environ.get("ENCRYPTION_KEY")
        if key is None:
            # Generate a key only when one is really needed, and make the
            # otherwise silent fallback visible.
            import warnings
            warnings.warn(
                "ENCRYPTION_KEY is not set; using a freshly generated key, so "
                "encrypted values cannot be decrypted outside this instance",
                RuntimeWarning,
                stacklevel=2,
            )
            key = Fernet.generate_key()
        self.cipher = Fernet(key if isinstance(key, bytes) else key.encode())

    def encrypt(self, data):
        """Encrypt the string `data` and return the token as a string."""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted_data):
        """Decrypt a token produced by `encrypt` and return the plain string."""
        return self.cipher.decrypt(encrypted_data.encode()).decode()

    def get_password(self, env_var):
        """Read an encrypted password from an environment variable.

        Returns:
            The decrypted password, or None when the variable is unset or
            empty.
        """
        encrypted = os.environ.get(env_var)
        if encrypted:
            return self.decrypt(encrypted)
        return None
# ========== 环境变量管理 ==========
class EnvConfig:
    """Typed accessors for environment variables, to avoid hard-coded secrets."""

    @staticmethod
    def get(key, default=None):
        """Return the variable's value, or `default` when it is unset."""
        return os.environ.get(key, default)

    @staticmethod
    def get_required(key):
        """Return the variable's value; raise ValueError when it is unset."""
        found = os.environ.get(key)
        if found is not None:
            return found
        raise ValueError(f"必需的环境变量 {key} 未设置")

    @staticmethod
    def get_int(key, default=0):
        """Return the variable as int, or `default` when unset or empty."""
        raw = os.environ.get(key)
        if not raw:
            return default
        return int(raw)

    @staticmethod
    def get_bool(key, default=False):
        """Return True for 'true', '1' or 'yes' (any case), else False.

        `default` is returned when the variable is unset or empty.
        """
        raw = os.environ.get(key)
        if not raw:
            return default
        return raw.lower() in ('true', '1', 'yes')
# Usage example
# Set the environment variable first: export TEST_PASSWORD="encrypted_password"
password = EnvConfig.get_required("TEST_PASSWORD")
# ========== 安全的浏览器配置 ==========
def create_secure_driver():
    """Create a Chrome WebDriver with credential-related features turned off.

    Returns:
        A webdriver.Chrome instance built from the options below.
    """
    # Keep Chrome from offering to save passwords
    password_prefs = {
        "credentials_enable_service": False,
        "profile.password_manager_enabled": False
    }
    hardening_switches = (
        # Turn off the AutomationControlled Blink feature
        "--disable-blink-features=AutomationControlled",
        # Switch meant to disable autofill
        "--disable-autofill",
        # No extensions
        "--disable-extensions",
    )

    options = Options()
    options.add_experimental_option("prefs", password_prefs)
    for switch in hardening_switches:
        options.add_argument(switch)
    return webdriver.Chrome(options=options)
# ========== 测试数据脱敏 ==========
class DataMasker:
    """Masks personal data so it can appear in logs and reports."""

    @staticmethod
    def mask_email(email):
        """Mask the local part of an e-mail address.

        The first and last character of the local part stay visible; local
        parts of one or two characters are masked completely. The address is
        split at the LAST '@', so nothing after an additional '@' is dropped.
        Strings without '@' are returned unchanged.
        """
        local, separator, domain = email.rpartition('@')
        if not separator:
            return email
        if len(local) <= 2:
            masked_local = '*' * len(local)
        else:
            masked_local = local[0] + '*' * (len(local) - 2) + local[-1]
        return masked_local + '@' + domain

    @staticmethod
    def mask_phone(phone):
        """Mask a phone number, keeping the first and last three characters.

        Numbers shorter than seven characters are masked completely.
        """
        if len(phone) < 7:
            return '*' * len(phone)
        return phone[:3] + '*' * (len(phone) - 6) + phone[-3:]

    @staticmethod
    def mask_id_card(id_card):
        """Mask an ID card number, keeping the first and last four characters.

        Numbers shorter than eight characters are masked completely.
        """
        if len(id_card) < 8:
            return '*' * len(id_card)
        return id_card[:4] + '*' * (len(id_card) - 8) + id_card[-4:]
# ========== 日志脱敏 ==========
class SecureLogger:
    """Logger wrapper that masks sensitive values before they are logged."""

    SENSITIVE_KEYS = ['password', 'token', 'secret', 'key', 'credential']

    def __init__(self, logger):
        self.logger = logger

    @staticmethod
    def _redact(match):
        """Build the replacement for one `key=value` match."""
        prefix, opening, value, closing = match.groups()
        # The value group may swallow the closing single quote of a quoted
        # value; put it back so "token: 'abc'" becomes "token: '****'".
        if opening and not closing and value.endswith(opening):
            closing = opening
        return f"{prefix}{opening}****{closing}"

    def _mask_sensitive(self, message):
        """Return `message` with the value after each sensitive key masked.

        A value is whatever follows a sensitive key and a run of quotes,
        whitespace, ':' or '='; it ends at the next double quote, whitespace,
        comma or closing brace. Matching is case-insensitive.
        """
        import re
        for key in self.SENSITIVE_KEYS:
            pattern = rf'({re.escape(key)}["\s:=]+)(["\']?)([^"\s,}}]+)(["\']?)'
            message = re.sub(pattern, self._redact, message, flags=re.IGNORECASE)
        return message

    def info(self, message):
        """Log `message` at INFO level after masking."""
        self.logger.info(self._mask_sensitive(message))

    def warning(self, message):
        """Log `message` at WARNING level after masking."""
        self.logger.warning(self._mask_sensitive(message))

    def error(self, message):
        """Log `message` at ERROR level after masking."""
        self.logger.error(self._mask_sensitive(message))

    def debug(self, message):
        """Log `message` at DEBUG level after masking."""
        self.logger.debug(self._mask_sensitive(message))
# ========== 会话安全 ==========
class SecureSession:
    """Session hygiene helpers for a WebDriver instance."""

    def __init__(self, driver):
        self.driver = driver

    def clear_session_data(self):
        """Delete all cookies, then clear localStorage and sessionStorage."""
        self.driver.delete_all_cookies()
        for storage_reset in (
            "window.localStorage.clear();",
            "window.sessionStorage.clear();",
        ):
            self.driver.execute_script(storage_reset)

    def logout(self, logout_url):
        """Open `logout_url`, then wipe the session data."""
        self.driver.get(logout_url)
        self.clear_session_data()

    def check_https(self):
        """Return True when the current URL starts with 'https://'."""
        current = self.driver.current_url
        return current.startswith('https://')
10.6 维护性与可扩展性
# ========== 项目结构建议 ==========
"""
selenium_project/
├── config/
│ ├── config.yaml # 配置文件
│ └── environments/ # 环境配置
├── pages/
│ ├── base_page.py # 页面基类
│ ├── login_page.py # 登录页
│ └── home_page.py # 首页
├── tests/
│ ├── conftest.py # pytest配置
│ ├── test_login.py # 登录测试
│ └── test_search.py # 搜索测试
├── utils/
│ ├── driver_factory.py # 驱动工厂
│ ├── wait_helper.py # 等待工具
│ └── logger.py # 日志工具
├── data/
│ └── test_data.json # 测试数据
├── reports/
│ └── allure-results/ # 测试报告
├── requirements.txt # 依赖
└── pytest.ini # pytest配置
"""
# ========== 驱动工厂模式 ==========
from abc import ABC, abstractmethod
class DriverFactory(ABC):
    """Abstract base for objects that know how to build a WebDriver."""

    @abstractmethod
    def create_driver(self, options):
        """Create a WebDriver from `options`; subclasses must implement this."""
class ChromeDriverFactory(DriverFactory):
    """Factory for local Chrome drivers."""

    def create_driver(self, options=None):
        """Create a Chrome WebDriver, with default Options when none are given.

        The driver binary path comes from ChromeDriverManager().install().
        """
        from selenium.webdriver.chrome.service import Service
        from webdriver_manager.chrome import ChromeDriverManager

        chrome_options = Options() if options is None else options
        driver_path = ChromeDriverManager().install()
        chrome_service = Service(driver_path)
        return webdriver.Chrome(service=chrome_service, options=chrome_options)
class FirefoxDriverFactory(DriverFactory):
    """Factory for local Firefox drivers."""

    def create_driver(self, options=None):
        """Create a Firefox WebDriver, with default options when none are given.

        The driver binary path comes from GeckoDriverManager().install().
        """
        from selenium.webdriver.firefox.options import Options as FirefoxOptions
        from selenium.webdriver.firefox.service import Service
        from webdriver_manager.firefox import GeckoDriverManager

        firefox_options = FirefoxOptions() if options is None else options
        driver_path = GeckoDriverManager().install()
        gecko_service = Service(driver_path)
        return webdriver.Firefox(service=gecko_service, options=firefox_options)
class RemoteDriverFactory(DriverFactory):
    """Factory for drivers running on a remote hub."""

    def __init__(self, hub_url):
        self.hub_url = hub_url

    def create_driver(self, options):
        """Create a Remote WebDriver session on the hub.

        Args:
            options: Browser options object describing the requested browser.

        Returns:
            A webdriver.Remote connected to `<hub_url>/wd/hub`.
        """
        # Strip trailing slashes so "http://hub:4444/" does not turn into
        # "http://hub:4444//wd/hub".
        base_url = self.hub_url.rstrip("/")
        return webdriver.Remote(
            command_executor=f"{base_url}/wd/hub",
            options=options
        )
# ========== 工厂管理器 ==========
class DriverManager:
    """Registry that picks the right factory and builds WebDriver instances."""

    FACTORIES = {
        "chrome": ChromeDriverFactory(),
        "firefox": FirefoxDriverFactory(),
    }

    @classmethod
    def register_factory(cls, name, factory):
        """Register `factory` under the browser name `name`."""
        cls.FACTORIES[name] = factory

    @staticmethod
    def _default_remote_options(browser):
        """Build default options so a remote session requests `browser`.

        Raises:
            ValueError: If no default options are known for `browser`.
        """
        if browser == "chrome":
            from selenium.webdriver.chrome.options import Options as ChromeOptions
            return ChromeOptions()
        if browser == "firefox":
            from selenium.webdriver.firefox.options import Options as FirefoxOptions
            return FirefoxOptions()
        raise ValueError(f"不支持的浏览器: {browser}")

    @classmethod
    def get_driver(cls, browser="chrome", options=None, hub_url=None):
        """Create a WebDriver, local or remote.

        Args:
            browser: Browser name; selects the local factory, or the default
                options for a remote session.
            options: Browser options. Optional for both paths.
            hub_url: Remote hub address; when given, a remote session is
                created instead of a local browser.

        Returns:
            The WebDriver produced by the selected factory.

        Raises:
            ValueError: If `browser` has no registered local factory, or no
                default options for a remote session without `options`.
        """
        if hub_url:
            if options is None:
                # The remote factory needs an options object to know which
                # browser to request; passing None through would ignore
                # `browser` entirely.
                options = cls._default_remote_options(browser)
            return RemoteDriverFactory(hub_url).create_driver(options)

        factory = cls.FACTORIES.get(browser)
        if factory is None:
            raise ValueError(f"不支持的浏览器: {browser}")
        return factory.create_driver(options)
# ========== 测试数据管理 ==========
import json
import yaml
from typing import Any, Dict
class TestDataManager:
    """Loads test data files from a directory and caches them by file name."""

    # The name matches pytest's Test* collection pattern, but this is a helper
    # with a constructor; opting out avoids a collection warning.
    __test__ = False

    def __init__(self, data_dir="data"):
        self.data_dir = data_dir
        self._cache = {}

    def load_json(self, filename):
        """Return the parsed JSON file `filename`, reading it only once."""
        if filename not in self._cache:
            filepath = os.path.join(self.data_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                self._cache[filename] = json.load(f)
        return self._cache[filename]

    def load_yaml(self, filename):
        """Return the parsed YAML file `filename`, reading it only once."""
        if filename not in self._cache:
            filepath = os.path.join(self.data_dir, filename)
            with open(filepath, 'r', encoding='utf-8') as f:
                self._cache[filename] = yaml.safe_load(f)
        return self._cache[filename]

    def get_test_data(self, test_name, filename="test_data.json"):
        """Return the data stored for `test_name` in a JSON data file.

        Args:
            test_name: Top-level key to look up.
            filename: JSON file inside `data_dir`; defaults to the original
                "test_data.json".

        Returns:
            The stored entry, or an empty dict when the key is missing.
        """
        data = self.load_json(filename)
        return data.get(test_name, {})
# ========== 元素定位器管理 ==========
class LocatorManager:
    """Central registry of element locators, grouped by page."""

    # Login page locators
    LOGIN = {
        "username": (By.ID, "username"),
        "password": (By.ID, "password"),
        "login_button": (By.ID, "login-btn"),
        "error_message": (By.ID, "error-msg"),
    }

    # Home page locators
    HOME = {
        "welcome": (By.ID, "welcome"),
        "user_menu": (By.ID, "user-menu"),
        "logout": (By.ID, "logout"),
    }

    @classmethod
    def get(cls, page, element):
        """Look up the locator registered for `element` on `page`.

        `page` is upper-cased to find the matching class attribute.

        Returns:
            The locator tuple, or None when the page or element is unknown.
        """
        group = getattr(cls, page.upper(), None)
        if group is None:
            return None
        return group.get(element)
# Usage example: fetch the "username" locator registered for the login page
username_locator = LocatorManager.get("login", "username")
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)