Appium自动化测试完全指南3
·
第七章:等待机制与Appium Settings API
7.1 等待机制概述
7.2 隐式等待
class ImplicitWaitManager:
    """Thin wrapper around the driver's implicit-wait timeout."""

    def __init__(self, driver):
        # Session whose implicit timeout is managed by this object.
        self.driver = driver

    def set_implicit_wait(self, seconds):
        """Set the implicit wait applied to element lookups.

        Args:
            seconds: Wait time in seconds, forwarded unchanged to
                ``driver.implicitly_wait``.

        Note:
            The guide advises against mixing implicit and explicit waits,
            because the combined wait time becomes hard to predict.
        """
        self.driver.implicitly_wait(seconds)

    def disable_implicit_wait(self):
        """Disable implicit waiting by setting the timeout to 0."""
        self.driver.implicitly_wait(0)

    def get_default_implicit_wait(self):
        """Return the value of ``driver.timeouts.implicit_wait``."""
        timeouts = self.driver.timeouts
        return timeouts.implicit_wait
# Usage example
wait_mgr = ImplicitWaitManager(driver)
# Set a 10-second implicit wait
wait_mgr.set_implicit_wait(10)
# Element lookups now wait up to the implicit timeout
element = driver.find_element(AppiumBy.ID, 'com.example.app:id/button')
# Disable the implicit wait again
wait_mgr.disable_implicit_wait()
7.3 显式等待
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from appium.webdriver.common.appiumby import AppiumBy
class ExplicitWaitManager:
    """Convenience wrappers around ``WebDriverWait`` and expected conditions.

    Every ``wait_for_*`` method accepts an optional ``timeout`` in seconds;
    ``None`` means "use ``default_timeout``". An explicit ``0`` is honoured
    (the condition is evaluated once) instead of silently falling back to
    the default.
    """

    def __init__(self, driver, default_timeout=10):
        self.driver = driver
        self.default_timeout = default_timeout

    def _resolve_timeout(self, timeout):
        """Return ``timeout``, or ``default_timeout`` when it is None."""
        return self.default_timeout if timeout is None else timeout

    def _until(self, condition, timeout):
        """Wait for ``condition`` and return whatever it evaluates to."""
        wait = WebDriverWait(self.driver, self._resolve_timeout(timeout))
        return wait.until(condition)

    def wait_for_element(self, locator, timeout=None):
        """Wait until the element is present.

        Args:
            locator: ``(by, value)`` tuple.
            timeout: Timeout in seconds; ``None`` uses the default.

        Returns:
            The located element.
        """
        return self._until(EC.presence_of_element_located(locator), timeout)

    def wait_for_element_visible(self, locator, timeout=None):
        """Wait until the element is visible and return it."""
        return self._until(EC.visibility_of_element_located(locator), timeout)

    def wait_for_element_clickable(self, locator, timeout=None):
        """Wait until the element is clickable and return it."""
        return self._until(EC.element_to_be_clickable(locator), timeout)

    def wait_for_element_invisible(self, locator, timeout=None):
        """Wait until the element is invisible or gone."""
        return self._until(EC.invisibility_of_element_located(locator), timeout)

    def wait_for_text_present(self, locator, text, timeout=None):
        """Wait until the element's text contains ``text``."""
        return self._until(
            EC.text_to_be_present_in_element(locator, text), timeout
        )

    def wait_for_element_selected(self, locator, timeout=None):
        """Wait until the element found by ``locator`` is selected."""
        # ``element_to_be_selected`` expects an element object; the
        # locator-based condition is ``element_located_to_be_selected``.
        return self._until(EC.element_located_to_be_selected(locator), timeout)

    def wait_for_title_contains(self, text, timeout=None):
        """Wait until the page title contains ``text``."""
        return self._until(EC.title_contains(text), timeout)

    def wait_for_alert_present(self, timeout=None):
        """Wait until an alert is present and return it."""
        return self._until(EC.alert_is_present(), timeout)

    def wait_for_element_count(self, locator, count, timeout=None):
        """Wait until exactly ``count`` elements match ``locator``."""
        return self._until(
            lambda d: len(d.find_elements(*locator)) == count, timeout
        )
# Usage example
wait_mgr = ExplicitWaitManager(driver, default_timeout=15)
# Wait for the element to be present
element = wait_mgr.wait_for_element(
(AppiumBy.ID, 'com.example.app:id/button')
)
# Wait for the element to be clickable, then click it
button = wait_mgr.wait_for_element_clickable(
(AppiumBy.ID, 'com.example.app:id/submit')
)
button.click()
# Wait for the element to disappear
wait_mgr.wait_for_element_invisible(
(AppiumBy.ID, 'com.example.app:id/loading')
)
7.4 自定义等待条件
class CustomWaitConditions:
    """Factories for custom wait predicates.

    Each factory returns a callable that takes the driver and returns a
    boolean, suitable for ``WebDriverWait(...).until``. Lookup failures are
    reported as ``False`` so the wait keeps polling. Only ``Exception``
    subclasses are absorbed, so ``KeyboardInterrupt`` and ``SystemExit``
    still propagate.
    """

    @staticmethod
    def element_attribute_contains(locator, attribute, value):
        """Condition: the element's ``attribute`` contains ``value``.

        Args:
            locator: ``(by, value)`` tuple.
            attribute: Attribute name to read.
            value: Substring expected inside the attribute value.
        """
        def _predicate(driver):
            try:
                actual = driver.find_element(*locator).get_attribute(attribute)
                # A missing attribute (None) simply means "not yet".
                return actual is not None and value in actual
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_text_not_empty(locator):
        """Condition: the element's text is non-empty after stripping."""
        def _predicate(driver):
            try:
                text = driver.find_element(*locator).text
                return len(text.strip()) > 0
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_enabled(locator):
        """Condition: the element is enabled."""
        def _predicate(driver):
            try:
                return driver.find_element(*locator).is_enabled()
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_has_class(locator, class_name):
        """Condition: ``class_name`` is one of the element's CSS classes."""
        def _predicate(driver):
            try:
                classes = driver.find_element(*locator).get_attribute('class')
                return class_name in (classes or '').split()
            except Exception:
                return False
        return _predicate

    @staticmethod
    def page_loaded():
        """Condition: ``document.readyState`` equals ``'complete'``."""
        def _predicate(driver):
            state = driver.execute_script('return document.readyState;')
            return state == 'complete'
        return _predicate

    @staticmethod
    def element_stale(element):
        """Condition: interacting with ``element`` raises an exception."""
        def _predicate(driver):
            try:
                element.is_enabled()
            except Exception:
                # Any failure to query the element is treated as "stale".
                return True
            return False
        return _predicate

    @staticmethod
    def any_element_present(*locators):
        """Condition: at least one of ``locators`` can be found."""
        def _predicate(driver):
            for locator in locators:
                try:
                    driver.find_element(*locator)
                except Exception:
                    continue
                return True
            return False
        return _predicate

    @staticmethod
    def all_elements_present(*locators):
        """Condition: every one of ``locators`` can be found."""
        def _predicate(driver):
            for locator in locators:
                try:
                    driver.find_element(*locator)
                except Exception:
                    return False
            return True
        return _predicate
# Example: using the custom conditions
wait = WebDriverWait(driver, 10)
# Wait until the element's attribute contains the given value
wait.until(
CustomWaitConditions.element_attribute_contains(
(AppiumBy.ID, 'com.example.app:id/price'),
'text',
'¥'
)
)
# Wait until the element's text is non-empty
wait.until(
CustomWaitConditions.element_text_not_empty(
(AppiumBy.ID, 'com.example.app:id/title')
)
)
# Wait until the page has finished loading
wait.until(CustomWaitConditions.page_loaded())
# Wait until any one of the elements is present
wait.until(
CustomWaitConditions.any_element_present(
(AppiumBy.ID, 'com.example.app:id/button1'),
(AppiumBy.ID, 'com.example.app:id/button2')
)
)
7.5 FluentWait流畅等待
# The Python Selenium bindings have no ``FluentWait`` class (that is the
# Java API). ``WebDriverWait`` provides the same features through its
# ``poll_frequency`` and ``ignored_exceptions`` arguments.
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException


class FluentWaitManager:
    """Build "fluent" waits: configurable timeout, polling and ignored errors."""

    def __init__(self, driver):
        self.driver = driver

    def create_fluent_wait(self, timeout=30, poll_frequency=0.5, ignored_exceptions=None):
        """Create a configured wait object.

        Args:
            timeout: Overall timeout in seconds.
            poll_frequency: Seconds between two evaluations of the condition.
            ignored_exceptions: Exception classes to ignore while polling;
                defaults to ``NoSuchElementException`` and
                ``StaleElementReferenceException``.

        Returns:
            A ``WebDriverWait`` whose ``until`` accepts any condition.
        """
        if ignored_exceptions is None:
            ignored_exceptions = [NoSuchElementException, StaleElementReferenceException]
        return WebDriverWait(
            self.driver,
            timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=ignored_exceptions,
        )

    def wait_with_retry(self, locator, max_attempts=3, timeout=10):
        """Wait for an element, retrying the whole wait on failure.

        Args:
            locator: ``(by, value)`` tuple.
            max_attempts: Maximum number of waits to attempt.
            timeout: Timeout in seconds for each attempt.

        Returns:
            The located element, or ``None`` when ``max_attempts`` is not
            positive.

        Raises:
            Exception: Whatever the last attempt raised.
        """
        wait = self.create_fluent_wait(timeout=timeout)
        for attempt in range(max_attempts):
            try:
                return wait.until(EC.presence_of_element_located(locator))
            except Exception:
                if attempt == max_attempts - 1:
                    raise
                print(f"第{attempt + 1}次尝试失败,正在重试...")
        return None

    def wait_with_custom_polling(self, condition, timeout=30, poll_frequency=1):
        """Wait for ``condition`` with a caller-chosen polling interval.

        Args:
            condition: Callable taking the driver; a truthy result ends the wait.
            timeout: Overall timeout in seconds.
            poll_frequency: Seconds between two evaluations.

        Returns:
            The truthy value produced by ``condition``.
        """
        wait = self.create_fluent_wait(
            timeout=timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=[NoSuchElementException],
        )
        return wait.until(condition)
# Usage example
fluent_mgr = FluentWaitManager(driver)
# Create a fluent wait
wait = fluent_mgr.create_fluent_wait(timeout=20, poll_frequency=1)
element = wait.until(EC.presence_of_element_located(
(AppiumBy.ID, 'com.example.app:id/item')
))
# Wait with retries
element = fluent_mgr.wait_with_retry(
(AppiumBy.ID, 'com.example.app:id/unstable_element'),
max_attempts=3
)
7.6 Appium Settings API
7.6.1 Settings API概述
class AppiumSettingsManager:
    """Read and update session settings through the Appium Settings API."""

    def __init__(self, driver):
        self.driver = driver

    def get_settings(self):
        """Return all current settings as reported by the driver."""
        current = self.driver.get_settings()
        return current

    def update_settings(self, settings):
        """Send a dictionary of settings to the driver.

        Args:
            settings: Mapping of setting name to new value.
        """
        self.driver.update_settings(settings)

    def get_setting(self, setting_name):
        """Return one setting's value, or ``None`` when it is absent."""
        return self.get_settings().get(setting_name)

    def set_setting(self, setting_name, value):
        """Update a single setting."""
        single = {setting_name: value}
        self.driver.update_settings(single)
# Usage example
settings_mgr = AppiumSettingsManager(driver)
# Fetch all settings
all_settings = settings_mgr.get_settings()
print(f"当前设置: {all_settings}")
# Update settings
settings_mgr.update_settings({
'waitForIdleTimeout': 0,
'waitForSelectorTimeout': 0
})
7.6.2 Android UiAutomator2 Settings
class AndroidSettingsManager:
    """Helpers for Android UiAutomator2 driver settings.

    Every method sends one ``update_settings`` call to the driver.
    """

    def __init__(self, driver):
        self.driver = driver

    def _apply(self, settings):
        """Forward a settings dictionary to the driver."""
        self.driver.update_settings(settings)

    def set_wait_for_idle_timeout(self, milliseconds):
        """Set the wait-for-idle timeout.

        Args:
            milliseconds: Timeout in milliseconds; 0 means "do not wait".
        """
        self._apply({'waitForIdleTimeout': milliseconds})

    def set_wait_for_selector_timeout(self, milliseconds):
        """Set the selector wait timeout.

        Args:
            milliseconds: Timeout in milliseconds.
        """
        self._apply({'waitForSelectorTimeout': milliseconds})

    def set_element_response_attributes(self, attributes):
        """Choose which attributes are returned with found elements.

        Args:
            attributes: List of names, e.g. ``['text', 'content-desc',
                'class']``; sent as one comma-joined string.
        """
        joined = ','.join(attributes)
        self._apply({'elementResponseAttributes': joined})

    def enable_companion_window_tracking(self, enable=True):
        """Enable or disable companion window tracking.

        Args:
            enable: Whether to enable the feature.
        """
        # NOTE(review): confirm 'trackCompanionWindows' is a setting the
        # installed UiAutomator2 driver version actually supports.
        self._apply({'trackCompanionWindows': enable})

    def set_scroll_acknowledgment_timeout(self, milliseconds):
        """Set the scroll acknowledgment timeout."""
        self._apply({'scrollAcknowledgmentTimeout': milliseconds})

    def set_action_acknowledgment_timeout(self, milliseconds):
        """Set the action acknowledgment timeout."""
        self._apply({'actionAcknowledgmentTimeout': milliseconds})

    def enable_image_analysis(self, enable=True):
        """Enable or disable image analysis."""
        # NOTE(review): confirm 'enableImageAnalysis' is supported by the
        # driver in use.
        self._apply({'enableImageAnalysis': enable})

    def configure_for_fast_operation(self):
        """Apply the "fast" preset: no idle/selector waits, short acks."""
        fast_preset = {
            'waitForIdleTimeout': 0,
            'waitForSelectorTimeout': 0,
            'actionAcknowledgmentTimeout': 100,
            'scrollAcknowledgmentTimeout': 100,
        }
        self._apply(fast_preset)

    def configure_for_stable_operation(self):
        """Apply the "stable" preset: generous waits and acknowledgments."""
        stable_preset = {
            'waitForIdleTimeout': 10000,
            'waitForSelectorTimeout': 10000,
            'actionAcknowledgmentTimeout': 3000,
            'scrollAcknowledgmentTimeout': 3000,
        }
        self._apply(stable_preset)
# Usage example
android_settings = AndroidSettingsManager(driver)
# Fast operation mode
android_settings.configure_for_fast_operation()
# Stable operation mode
android_settings.configure_for_stable_operation()
# Choose the attributes returned with elements
android_settings.set_element_response_attributes(['text', 'content-desc', 'resource-id'])
7.6.3 iOS XCUITest Settings
class IOSSettingsManager:
    """Helpers for iOS XCUITest driver settings.

    Every method sends one ``update_settings`` call to the driver.
    """

    def __init__(self, driver):
        self.driver = driver

    def _apply(self, settings):
        """Forward a settings dictionary to the driver."""
        self.driver.update_settings(settings)

    def set_max_typing_frequency(self, frequency):
        """Set the maximum typing frequency.

        Args:
            frequency: Characters typed per second (the guide states the
                default is 60).
        """
        self._apply({'maxTypingFrequency': frequency})

    def set_keyboard_autocorrection(self, enable):
        """Enable or disable keyboard autocorrection."""
        self._apply({'keyboardAutocorrection': enable})

    def set_keyboard_prediction(self, enable):
        """Enable or disable keyboard prediction."""
        self._apply({'keyboardPrediction': enable})

    def set_snapshot_timeout(self, seconds):
        """Set the snapshot timeout."""
        self._apply({'snapshotTimeout': seconds})

    def set_use_first_match(self, enable):
        """Enable or disable first-match lookup mode."""
        self._apply({'useFirstMatch': enable})

    def set_wait_for_quiescence(self, enable):
        """Enable or disable waiting for quiescence."""
        # NOTE(review): confirm 'waitForQuiescence' is accepted as a runtime
        # setting (not only as a session capability) by the driver in use.
        self._apply({'waitForQuiescence': enable})

    def set_should_use_compact_responses(self, enable):
        """Enable or disable compact element responses."""
        self._apply({'shouldUseCompactResponses': enable})

    def set_element_response_attributes(self, attributes):
        """Choose which attributes are returned with found elements.

        Args:
            attributes: List of attribute names; sent comma-joined.
        """
        joined = ','.join(attributes)
        self._apply({'elementResponseAttributes': joined})

    def configure_for_fast_typing(self):
        """Apply the fast-typing preset."""
        typing_preset = {
            'maxTypingFrequency': 100,
            'keyboardAutocorrection': False,
            'keyboardPrediction': False,
        }
        self._apply(typing_preset)

    def configure_for_fast_search(self):
        """Apply the fast-search preset."""
        search_preset = {
            'useFirstMatch': True,
            'waitForQuiescence': False,
            'snapshotTimeout': 5,
        }
        self._apply(search_preset)
# Usage example
ios_settings = IOSSettingsManager(driver)
# Configure fast typing
ios_settings.configure_for_fast_typing()
# Configure fast search
ios_settings.configure_for_fast_search()
# Set the snapshot timeout
ios_settings.set_snapshot_timeout(15)
7.7 等待最佳实践
class WaitBestPractices:
    """Examples of recommended waiting patterns."""

    def __init__(self, driver):
        self.driver = driver
        self.wait = WebDriverWait(driver, 10)

    def smart_wait_for_element(self, locator, timeout=10, poll_frequency=0.5):
        """Wait for a visible element with custom polling.

        Lookup misses and stale references are ignored while polling.

        Args:
            locator: ``(by, value)`` tuple.
            timeout: Overall timeout in seconds.
            poll_frequency: Seconds between two evaluations.

        Returns:
            The visible element.
        """
        # The Python bindings have no ``FluentWait``; ``WebDriverWait``
        # accepts the polling interval and ignored exceptions directly.
        wait = WebDriverWait(
            self.driver,
            timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=(NoSuchElementException, StaleElementReferenceException),
        )
        return wait.until(EC.visibility_of_element_located(locator))

    def wait_with_spinner_handling(self, locator, spinner_locator=None, timeout=15,
                                   spinner_timeout=5):
        """Wait for a loading spinner to vanish, then for the target element.

        Args:
            locator: Target element locator.
            spinner_locator: Optional locator of the loading indicator.
            timeout: Seconds to wait for the target element.
            spinner_timeout: Seconds to wait for the spinner to disappear.

        Returns:
            The visible target element.
        """
        if spinner_locator:
            from selenium.common.exceptions import TimeoutException
            try:
                WebDriverWait(self.driver, spinner_timeout).until(
                    EC.invisibility_of_element_located(spinner_locator)
                )
            except TimeoutException:
                # Best effort: a spinner that never goes away must not block
                # the check for the target element below.
                pass
        return WebDriverWait(self.driver, timeout).until(
            EC.visibility_of_element_located(locator)
        )

    def wait_for_page_load_complete(self, timeout=30):
        """Wait until ``document.readyState`` is ``'complete'``."""
        WebDriverWait(self.driver, timeout).until(
            lambda d: d.execute_script('return document.readyState') == 'complete'
        )

    def wait_for_ajax_complete(self, timeout=30):
        """Wait until jQuery reports no active requests.

        Pages without jQuery are treated as already idle.
        """
        def _ajax_idle(d):
            if d.execute_script('return typeof jQuery != "undefined"'):
                return d.execute_script('return jQuery.active == 0')
            return True

        WebDriverWait(self.driver, timeout).until(_ajax_idle)

    def wait_for_element_with_retry(self, locator, max_retries=3, timeout=10):
        """Wait for an element, refreshing the page between attempts.

        Args:
            locator: ``(by, value)`` tuple.
            max_retries: Maximum number of attempts.
            timeout: Seconds to wait per attempt.

        Returns:
            The located element, or ``None`` when ``max_retries`` is not
            positive.

        Raises:
            Exception: Whatever the final attempt raised.
        """
        for attempt in range(max_retries):
            try:
                return WebDriverWait(self.driver, timeout).until(
                    EC.presence_of_element_located(locator)
                )
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Refresh the page before the next attempt.
                self.driver.refresh()
        return None
# Usage example
wait_bp = WaitBestPractices(driver)
# Smart wait
element = wait_bp.smart_wait_for_element(
(AppiumBy.ID, 'com.example.app:id/content')
)
# Handle a loading spinner
element = wait_bp.wait_with_spinner_handling(
(AppiumBy.ID, 'com.example.app:id/list'),
spinner_locator=(AppiumBy.ID, 'com.example.app:id/progress')
)
第八章:上下文切换与高级场景
8.1 WebView上下文处理
class WebViewContextManager:
    """Switch between the native context and WebView contexts."""

    def __init__(self, driver):
        self.driver = driver

    def get_all_contexts(self):
        """Return all contexts reported by the driver."""
        return self.driver.contexts

    def get_current_context(self):
        """Return the driver's current context."""
        return self.driver.current_context

    def switch_to_webview(self, webview_name=None):
        """Switch to a WebView context.

        Args:
            webview_name: Context name to switch to. When omitted, the first
                context whose name contains ``'WEBVIEW'`` is used.

        Raises:
            Exception: No WebView context exists (automatic selection only).
        """
        if webview_name:
            # The name is known, so the context list is not queried.
            self.driver.switch_to.context(webview_name)
            return
        for context in self.get_all_contexts():
            if 'WEBVIEW' in context:
                self.driver.switch_to.context(context)
                print(f"已切换到上下文: {context}")
                return
        raise Exception("未找到WebView上下文")

    def switch_to_native(self):
        """Switch back to the ``NATIVE_APP`` context."""
        self.driver.switch_to.context('NATIVE_APP')
        print("已切换到原生上下文")

    def get_webview_handles(self):
        """Return the driver's window handles."""
        return self.driver.window_handles

    def switch_to_window(self, window_handle):
        """Switch to the given window handle."""
        self.driver.switch_to.window(window_handle)
# Usage example
webview_mgr = WebViewContextManager(driver)
# List all contexts
contexts = webview_mgr.get_all_contexts()
print(f"可用上下文: {contexts}")
# Switch to the WebView
webview_mgr.switch_to_webview()
# Interact inside the WebView
# NOTE(review): `By` is not imported in any snippet visible here; presumably
# `from selenium.webdriver.common.by import By` is required - confirm.
driver.find_element(By.ID, 'username').send_keys('test')
# Switch back to native
webview_mgr.switch_to_native()
8.2 自动WebView检测
class AutoWebViewDetector:
    """Detect WebView contexts and switch to them automatically."""

    def __init__(self, driver):
        self.driver = driver

    def wait_for_webview(self, timeout=30, poll_interval=1):
        """Poll the context list until a WebView context appears.

        Args:
            timeout: Maximum time to wait, in seconds.
            poll_interval: Seconds to sleep between two polls.

        Returns:
            str: Name of the first context containing ``'WEBVIEW'``.

        Raises:
            TimeoutError: No WebView context appeared within ``timeout``.
        """
        import time

        # A monotonic clock is immune to wall-clock adjustments mid-wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            for context in self.driver.contexts:
                if 'WEBVIEW' in context:
                    return context
            time.sleep(poll_interval)
        raise TimeoutError("等待WebView超时")

    def auto_switch_to_webview(self, timeout=30):
        """Wait for a WebView context and switch to it.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            str: Name of the context that was switched to.
        """
        webview_context = self.wait_for_webview(timeout)
        self.driver.switch_to.context(webview_context)
        print(f"自动切换到: {webview_context}")
        return webview_context

    def is_in_webview(self):
        """Return True when the current context name contains ``'WEBVIEW'``."""
        # Guard against a driver that reports no current context (None).
        return 'WEBVIEW' in (self.driver.current_context or '')

    def ensure_webview_context(self):
        """Switch to a WebView unless one is already active."""
        if not self.is_in_webview():
            self.auto_switch_to_webview()

    def ensure_native_context(self):
        """Switch to ``NATIVE_APP`` when a WebView is currently active."""
        if self.is_in_webview():
            self.driver.switch_to.context('NATIVE_APP')
# Usage example
detector = AutoWebViewDetector(driver)
# Tap the control that triggers the WebView to load
driver.find_element(AppiumBy.ID, 'com.example.app:id/open_web').click()
# Wait for the WebView and switch to it
detector.auto_switch_to_webview()
# Perform WebView actions
# NOTE(review): `By` is not imported in any snippet visible here - confirm
# the required import exists.
driver.find_element(By.ID, 'search').send_keys('Appium')
# Switch back to native
detector.ensure_native_context()
8.3 混合应用测试
class HybridAppTester:
    """Run test flows that span native and WebView contexts."""

    def __init__(self, driver):
        self.driver = driver
        self.webview_mgr = WebViewContextManager(driver)

    def test_hybrid_flow(self, native_steps, webview_steps):
        """Run native steps, then WebView steps, then return to native.

        Args:
            native_steps: Callables executed in the native context.
            webview_steps: Callables executed in the WebView context.
        """
        self.webview_mgr.switch_to_native()
        for step in native_steps:
            step()
        try:
            self.webview_mgr.switch_to_webview()
            for step in webview_steps:
                step()
        finally:
            # Always leave the session in the native context, even when a
            # WebView step fails.
            self.webview_mgr.switch_to_native()

    def find_element_across_contexts(self, locator, timeout=10):
        """Look for an element in the native context and in every WebView.

        The session stays in whichever context the element was found in.

        Args:
            locator: ``(by, value)`` tuple.
            timeout: Maximum time to keep retrying, in seconds.

        Returns:
            The first element found.

        Raises:
            Exception: The element was not found in any context in time.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Try the native context first.
            try:
                self.webview_mgr.switch_to_native()
                return self.driver.find_element(*locator)
            except Exception:
                pass
            # Then try each WebView on its own, so that a miss in one
            # WebView does not prevent the remaining ones being searched.
            try:
                contexts = self.driver.contexts
            except Exception:
                contexts = []
            for context in contexts:
                if 'WEBVIEW' not in context:
                    continue
                try:
                    self.driver.switch_to.context(context)
                    return self.driver.find_element(*locator)
                except Exception:
                    continue
            time.sleep(0.5)
        raise Exception(f"在所有上下文中都未找到元素: {locator}")
# Usage example
hybrid_tester = HybridAppTester(driver)


# Define the steps
def native_login():
    """Fill in and submit the native login form."""
    driver.find_element(AppiumBy.ID, 'com.example.app:id/username').send_keys('user')
    driver.find_element(AppiumBy.ID, 'com.example.app:id/password').send_keys('pass')
    driver.find_element(AppiumBy.ID, 'com.example.app:id/login').click()


def webview_search():
    """Run a search inside the WebView."""
    # NOTE(review): `By` is not imported in any snippet visible here -
    # confirm the required import exists.
    driver.find_element(By.ID, 'search_box').send_keys('test')
    driver.find_element(By.ID, 'search_btn').click()


# Run the hybrid test
hybrid_tester.test_hybrid_flow(
    native_steps=[native_login],
    webview_steps=[webview_search]
)
8.4 弹窗处理
class PopupHandler:
    """Best-effort handling of alerts, permission dialogs and popups.

    The ``handle_*`` methods report success as a boolean rather than
    raising. Only ``Exception`` subclasses are absorbed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """

    def __init__(self, driver):
        self.driver = driver

    def _wait_for_alert(self, timeout):
        """Wait up to ``timeout`` seconds for an alert and return it."""
        return WebDriverWait(self.driver, timeout).until(EC.alert_is_present())

    def handle_alert_accept(self, timeout=5):
        """Accept an alert; return True on success, False otherwise."""
        try:
            self._wait_for_alert(timeout).accept()
        except Exception:
            return False
        return True

    def handle_alert_dismiss(self, timeout=5):
        """Dismiss an alert; return True on success, False otherwise."""
        try:
            self._wait_for_alert(timeout).dismiss()
        except Exception:
            return False
        return True

    def get_alert_text(self, timeout=5):
        """Return the alert's text, or None when it cannot be read."""
        try:
            return self._wait_for_alert(timeout).text
        except Exception:
            return None

    def handle_android_permission_dialog(self, action='allow'):
        """Click the allow/deny button of an Android permission dialog.

        Args:
            action: ``'allow'`` to grant; any other value denies.

        Returns:
            bool: True when one of the candidate buttons was clicked.
        """
        from appium.webdriver.common.appiumby import AppiumBy

        if action == 'allow':
            locators = [
                (AppiumBy.ID, 'com.android.packageinstaller:id/permission_allow_button'),
                (AppiumBy.ID, 'android:id/button1'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="允许"]'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="始终允许"]'),
            ]
        else:
            locators = [
                (AppiumBy.ID, 'com.android.packageinstaller:id/permission_deny_button'),
                (AppiumBy.ID, 'android:id/button2'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="拒绝"]'),
            ]
        # Each candidate gets a short 2-second wait; the first clickable wins.
        for locator in locators:
            try:
                element = WebDriverWait(self.driver, 2).until(
                    EC.element_to_be_clickable(locator)
                )
                element.click()
            except Exception:
                continue
            return True
        return False

    def handle_ios_system_alert(self):
        """Accept an iOS system alert via ``mobile: acceptAlert``."""
        try:
            self.driver.execute_script('mobile: acceptAlert', {})
        except Exception:
            return False
        return True

    def dismiss_all_popups(self, max_attempts=5):
        """Try to close popups by dismissing alerts and pressing back.

        Args:
            max_attempts: Maximum number of rounds.

        Returns:
            bool: Always True.
        """
        # NOTE(review): ``driver.back()`` runs on every round and counts as
        # "closed" whenever it does not raise, so all ``max_attempts`` rounds
        # normally execute. Confirm repeated back presses are acceptable.
        for _ in range(max_attempts):
            popup_closed = False
            if self.handle_alert_dismiss(timeout=1):
                popup_closed = True
            try:
                self.driver.back()
                popup_closed = True
            except Exception:
                pass
            if not popup_closed:
                break
        return True
# Usage example
popup_handler = PopupHandler(driver)
# Handle a permission dialog
popup_handler.handle_android_permission_dialog(action='allow')
# Handle an alert
if popup_handler.handle_alert_accept():
    print("Alert已处理")
# Close all popups
popup_handler.dismiss_all_popups()
8.5 键盘与输入处理
class KeyboardInputHandler:
    """Keyboard and text-input helpers (key-press methods are Android-only)."""

    # Android ``KeyEvent`` constants used to translate characters.
    _KEYCODE_0 = 7
    _KEYCODE_A = 29
    _KEYCODE_SPACE = 62
    _META_SHIFT_ON = 1

    def __init__(self, driver):
        self.driver = driver

    def hide_keyboard(self):
        """Hide the soft keyboard; return False when that fails."""
        try:
            self.driver.hide_keyboard()
        except Exception:
            return False
        return True

    def is_keyboard_shown(self):
        """Return whether the driver reports the keyboard as shown."""
        return self.driver.is_keyboard_shown()

    def press_key(self, keycode, metastate=None):
        """Press a key (Android).

        Args:
            keycode: Android key code.
            metastate: Optional meta state for key combinations.
        """
        if metastate:
            self.driver.press_keycode(keycode, metastate)
        else:
            self.driver.press_keycode(keycode)

    def long_press_key(self, keycode):
        """Long-press a key (Android)."""
        self.driver.long_press_keycode(keycode)

    @classmethod
    def _keycode_for_char(cls, char):
        """Map one character to an Android ``(keycode, metastate)`` pair.

        Raises:
            ValueError: The character is not an ASCII letter, digit or space.
        """
        if 'a' <= char <= 'z':
            return cls._KEYCODE_A + ord(char) - ord('a'), None
        if 'A' <= char <= 'Z':
            # Upper case is the same key pressed with SHIFT held.
            return cls._KEYCODE_A + ord(char) - ord('A'), cls._META_SHIFT_ON
        if '0' <= char <= '9':
            return cls._KEYCODE_0 + ord(char) - ord('0'), None
        if char == ' ':
            return cls._KEYCODE_SPACE, None
        raise ValueError(f"Unsupported character for key injection: {char!r}")

    def send_keys_with_keyboard(self, text):
        """Type ``text`` by injecting Android key events.

        Character ordinals are not Android key codes (``ord('A')`` is 65,
        but ``KEYCODE_A`` is 29), so each character is translated first.
        Supported characters: ASCII letters, digits and space.

        Raises:
            ValueError: ``text`` contains an unsupported character. Nothing
                is typed in that case.
        """
        # Translate everything up front so that a bad character fails
        # before any key has been sent.
        strokes = [self._keycode_for_char(char) for char in text]
        for keycode, metastate in strokes:
            self.press_key(keycode, metastate)

    def press_enter(self):
        """Press the Enter key."""
        self.driver.press_keycode(66)  # KEYCODE_ENTER

    def press_back(self):
        """Press the Back key."""
        self.driver.press_keycode(4)  # KEYCODE_BACK

    def press_home(self):
        """Press the Home key."""
        self.driver.press_keycode(3)  # KEYCODE_HOME

    def press_tab(self):
        """Press the Tab key."""
        self.driver.press_keycode(61)  # KEYCODE_TAB

    def press_delete(self):
        """Press the Delete (backspace) key."""
        self.driver.press_keycode(67)  # KEYCODE_DEL

    def clear_and_type(self, element, text):
        """Clear ``element`` and type ``text`` into it.

        Args:
            element: Target element.
            text: Text to enter.
        """
        element.clear()
        element.send_keys(text)

    def slow_type(self, element, text, interval=0.1):
        """Type ``text`` one character at a time.

        Args:
            element: Target element (cleared first).
            text: Text to enter.
            interval: Pause between characters, in seconds.
        """
        import time

        element.clear()
        for char in text:
            element.send_keys(char)
            time.sleep(interval)
# Android按键码常量
class AndroidKeyCode:
    """Named constants for a few Android key codes."""

    # Navigation keys
    KEYCODE_HOME = 3
    KEYCODE_BACK = 4
    KEYCODE_MENU = 82

    # Editing keys
    KEYCODE_ENTER = 66
    KEYCODE_TAB = 61
    KEYCODE_DEL = 67

    # Hardware keys
    KEYCODE_VOLUME_UP = 24
    KEYCODE_VOLUME_DOWN = 25
    KEYCODE_POWER = 26
    KEYCODE_CAMERA = 27
# Usage example
keyboard = KeyboardInputHandler(driver)
# Hide the keyboard
if keyboard.is_keyboard_shown():
    keyboard.hide_keyboard()
# Press the Back key
keyboard.press_back()
# Slow typing
element = driver.find_element(AppiumBy.ID, 'com.example.app:id/input')
keyboard.slow_type(element, 'Hello World', interval=0.05)
8.6 文件传输
class FileTransferHandler:
    """Move files between the local machine and the device.

    Payloads are exchanged with the driver as base64 text.
    """

    def __init__(self, driver):
        self.driver = driver

    def push_file(self, device_path, local_path):
        """Upload a local file to the device.

        Args:
            device_path: Destination path on the device.
            local_path: Path of the local file to send.
        """
        import base64

        with open(local_path, 'rb') as source:
            raw = source.read()
        content = base64.b64encode(raw).decode('utf-8')
        self.driver.push_file(device_path, content)
        print(f"文件已推送: {local_path} -> {device_path}")

    def pull_file(self, device_path, local_path):
        """Download a file from the device.

        Args:
            device_path: Path of the file on the device.
            local_path: Local path to write the file to.
        """
        import base64

        encoded = self.driver.pull_file(device_path)
        with open(local_path, 'wb') as target:
            target.write(base64.b64decode(encoded))
        print(f"文件已拉取: {device_path} -> {local_path}")

    def pull_folder(self, device_path, local_path):
        """Download a folder from the device and unpack it locally.

        The driver's response is base64-decoded and opened as a zip archive.

        Args:
            device_path: Folder path on the device.
            local_path: Local directory to extract into.
        """
        import base64
        import zipfile
        import io

        encoded = self.driver.pull_folder(device_path)
        archive_bytes = io.BytesIO(base64.b64decode(encoded))
        with zipfile.ZipFile(archive_bytes, 'r') as archive:
            archive.extractall(local_path)
        print(f"文件夹已拉取: {device_path} -> {local_path}")

    def push_text_as_file(self, device_path, text):
        """Write ``text`` (UTF-8 encoded) to a file on the device.

        Args:
            device_path: Destination path on the device.
            text: Text content to store.
        """
        import base64

        raw = text.encode('utf-8')
        self.driver.push_file(device_path, base64.b64encode(raw).decode('utf-8'))
# Usage example
file_transfer = FileTransferHandler(driver)
# Push a file
file_transfer.push_file('/sdcard/test.txt', 'C:/local/test.txt')
# Pull a file
file_transfer.pull_file('/sdcard/test.txt', 'C:/local/downloaded.txt')
# Push text
file_transfer.push_text_as_file('/sdcard/config.json', '{"key": "value"}')
第九章:测试框架与工程化
9.1 Page Object模式
from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class BasePage:
    """Base class for page objects: element lookup and basic interactions."""

    def __init__(self, driver):
        self.driver = driver
        # Shared 10-second wait used by click/input_text/get_text.
        self.wait = WebDriverWait(driver, 10)

    def find_element(self, locator, timeout=None):
        """Find one element.

        Args:
            locator: ``(by, value)`` tuple.
            timeout: When truthy, wait up to this many seconds for presence;
                otherwise perform a direct lookup.
        """
        if timeout:
            return WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located(locator)
            )
        return self.driver.find_element(*locator)

    def find_elements(self, locator):
        """Find all elements matching ``locator``."""
        return self.driver.find_elements(*locator)

    def click(self, locator):
        """Wait until the element is clickable, then click it."""
        element = self.wait.until(EC.element_to_be_clickable(locator))
        element.click()

    def input_text(self, locator, text):
        """Wait until the element is visible, clear it and type ``text``."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        element.clear()
        element.send_keys(text)

    def get_text(self, locator):
        """Wait until the element is visible and return its text."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        return element.text

    def is_element_visible(self, locator, timeout=5):
        """Return True when the element becomes visible within ``timeout``.

        Failures are reported as False; ``KeyboardInterrupt`` and
        ``SystemExit`` are not swallowed.
        """
        try:
            WebDriverWait(self.driver, timeout).until(
                EC.visibility_of_element_located(locator)
            )
        except Exception:
            return False
        return True

    def wait_for_element_gone(self, locator, timeout=10):
        """Wait until the element is invisible or no longer present."""
        WebDriverWait(self.driver, timeout).until(
            EC.invisibility_of_element_located(locator)
        )
class LoginPage(BasePage):
    """Page object for the login screen."""

    # Locators for the controls on the screen.
    _USERNAME_INPUT = (AppiumBy.ID, 'com.example.app:id/username')
    _PASSWORD_INPUT = (AppiumBy.ID, 'com.example.app:id/password')
    _LOGIN_BUTTON = (AppiumBy.ID, 'com.example.app:id/login_btn')
    _ERROR_MESSAGE = (AppiumBy.ID, 'com.example.app:id/error_msg')

    def enter_username(self, username):
        """Type the user name; returns ``self`` for chaining."""
        self.input_text(self._USERNAME_INPUT, username)
        return self

    def enter_password(self, password):
        """Type the password; returns ``self`` for chaining."""
        self.input_text(self._PASSWORD_INPUT, password)
        return self

    def click_login(self):
        """Click the login button; returns ``self`` for chaining."""
        self.click(self._LOGIN_BUTTON)
        return self

    def login(self, username, password):
        """Enter both credentials and submit; returns ``self``."""
        self.enter_username(username).enter_password(password).click_login()
        return self

    def get_error_message(self):
        """Return the text of the error message element."""
        return self.get_text(self._ERROR_MESSAGE)

    def is_login_successful(self):
        """Return True when no error message becomes visible within 3 s."""
        error_shown = self.is_element_visible(self._ERROR_MESSAGE, timeout=3)
        return not error_shown
class HomePage(BasePage):
    """Page object for the home screen."""

    # Locators for the controls on the screen.
    _WELCOME_TEXT = (AppiumBy.ID, 'com.example.app:id/welcome')
    _MENU_BUTTON = (AppiumBy.ID, 'com.example.app:id/menu')
    _LOGOUT_BUTTON = (AppiumBy.ID, 'com.example.app:id/logout')

    def get_welcome_message(self):
        """Return the text of the welcome element."""
        return self.get_text(self._WELCOME_TEXT)

    def open_menu(self):
        """Click the menu button; returns ``self`` for chaining."""
        self.click(self._MENU_BUTTON)
        return self

    def logout(self):
        """Open the menu, click logout and return a ``LoginPage``."""
        self.open_menu().click(self._LOGOUT_BUTTON)
        return LoginPage(self.driver)
# Usage example
login_page = LoginPage(driver)
login_page.login('testuser', 'password123')
if login_page.is_login_successful():
    home_page = HomePage(driver)
    print(home_page.get_welcome_message())
9.2 数据驱动测试
import pytest
import yaml
import json
import csv
class DataProvider:
    """Load test data from YAML, JSON or CSV files."""

    @staticmethod
    def from_yaml(file_path):
        """Parse a UTF-8 YAML file with ``yaml.safe_load`` and return it."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    @staticmethod
    def from_json(file_path):
        """Parse a UTF-8 JSON file and return the decoded object."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def from_csv(file_path, delimiter=','):
        """Read a UTF-8 CSV file with a header row.

        Args:
            file_path: Path to the CSV file.
            delimiter: Field separator.

        Returns:
            list[dict]: One dictionary per data row, keyed by header name.
        """
        # newline='' lets the csv module handle line endings itself, which
        # keeps newlines embedded in quoted fields intact.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            return [dict(row) for row in csv.DictReader(f, delimiter=delimiter)]
# login_data.yaml 示例
"""
test_cases:
  - username: valid_user
    password: valid_pass
    expected: success
  - username: invalid_user
    password: invalid_pass
    expected: error
  - username: ""
    password: valid_pass
    expected: username_required
"""
# 使用数据驱动的测试
class TestLogin:
    """Data-driven login tests.

    The parametrize decorator loads ``login_data.yaml`` when this class is
    defined, so the file must be readable at that moment.
    """

    @pytest.fixture(autouse=True)
    def setup(self, driver):
        """Create a fresh ``LoginPage`` for every test."""
        self.login_page = LoginPage(driver)

    @pytest.mark.parametrize("data", DataProvider.from_yaml('login_data.yaml')['test_cases'])
    def test_login_scenarios(self, data):
        """Log in with one data row and check the expected outcome."""
        page = self.login_page
        page.login(data['username'], data['password'])
        if data['expected'] != 'success':
            # Any non-success expectation only requires an error message.
            assert page.get_error_message() is not None
        else:
            assert page.is_login_successful()
# 使用pytest运行
# pytest test_login.py -v --alluredir=./allure-results
9.3 测试配置管理
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class DeviceConfig:
    """Settings describing one target device.

    The first four fields are required; the rest default to ``None``.
    """

    # Required fields
    platform_name: str
    platform_version: str
    device_name: str
    automation_name: str

    # Optional fields
    udid: Optional[str] = None
    app: Optional[str] = None
    app_package: Optional[str] = None
    app_activity: Optional[str] = None
    bundle_id: Optional[str] = None
@dataclass
class AppiumConfig:
    """Appium server and timeout settings; every field has a default."""

    server_url: str = 'http://localhost:4723'
    # NOTE(review): units are not stated in this snippet; presumably
    # seconds - confirm against the code that consumes these values.
    implicit_wait: int = 10
    explicit_wait: int = 15
    command_timeout: int = 300
class ConfigManager:
    """Load a YAML configuration file and expose typed sections."""

    def __init__(self, config_file='config.yaml'):
        self.config = self._load_config(config_file)

    def _load_config(self, file_path):
        """Parse the YAML file; an empty file yields an empty dict."""
        import yaml

        with open(file_path, 'r', encoding='utf-8') as f:
            # ``safe_load`` returns None for an empty document.
            return yaml.safe_load(f) or {}

    def get_device_config(self, device_name):
        """Return the ``DeviceConfig`` stored under ``devices.<device_name>``.

        Raises:
            ValueError: The device, or the whole ``devices`` section, is
                missing or empty.
        """
        devices = self.config.get('devices') or {}
        device_data = devices.get(device_name)
        if not device_data:
            raise ValueError(f"设备配置不存在: {device_name}")
        return DeviceConfig(**device_data)

    def get_appium_config(self):
        """Return an ``AppiumConfig`` from the ``appium`` section.

        Defaults apply when the section is missing or empty.
        """
        return AppiumConfig(**(self.config.get('appium') or {}))

    def get_test_config(self):
        """Return the ``test`` section, or an empty dict when it is absent."""
        return self.config.get('test') or {}
# config.yaml 示例
"""
devices:
  android_emulator:
    platform_name: Android
    platform_version: '13'
    device_name: emulator-5554
    automation_name: UiAutomator2
    app_package: com.example.app
    app_activity: .MainActivity
  ios_simulator:
    platform_name: iOS
    platform_version: '16.0'
    device_name: iPhone 14
    automation_name: XCUITest
    bundle_id: com.example.app
appium:
  server_url: http://localhost:4723
  implicit_wait: 10
  explicit_wait: 15
  command_timeout: 300
test:
  retry_count: 2
  screenshot_on_failure: true
  video_recording: true
"""
# Usage example
config_mgr = ConfigManager()
device_config = config_mgr.get_device_config('android_emulator')
appium_config = config_mgr.get_appium_config()
9.4 测试报告
import pytest
import allure
from datetime import datetime
class AllureReporter:
    """Static helpers that forward to the ``allure`` reporting API."""

    @staticmethod
    def step(title):
        """Return ``allure.step(title)`` for use as a context manager."""
        return allure.step(title)

    @staticmethod
    def attach_screenshot(driver, name='截图'):
        """Attach the driver's current screenshot as a PNG."""
        png_bytes = driver.get_screenshot_as_png()
        allure.attach(
            png_bytes,
            name=name,
            attachment_type=allure.attachment_type.PNG,
        )

    @staticmethod
    def attach_text(text, name='日志'):
        """Attach plain text."""
        allure.attach(
            text,
            name=name,
            attachment_type=allure.attachment_type.TEXT,
        )

    @staticmethod
    def attach_html(html, name='HTML'):
        """Attach an HTML fragment."""
        allure.attach(
            html,
            name=name,
            attachment_type=allure.attachment_type.HTML,
        )

    @staticmethod
    def feature(name):
        """Return the ``allure.feature`` marker for ``name``."""
        return allure.feature(name)

    @staticmethod
    def story(name):
        """Return the ``allure.story`` marker for ``name``."""
        return allure.story(name)

    @staticmethod
    def severity(level):
        """Return the ``allure.severity`` marker for ``level``."""
        return allure.severity(level)
# 使用Allure的测试示例
@allure.feature('登录功能')
@allure.story('用户登录')
class TestLoginWithReport:
    """Login test that records steps and screenshots in the Allure report."""

    @pytest.fixture(autouse=True)
    def setup(self, driver):
        """Keep the driver and create the page object for every test."""
        self.driver = driver
        self.login_page = LoginPage(driver)

    @allure.severity(allure.severity_level.CRITICAL)
    @allure.title('正常登录测试')
    def test_login_success(self):
        """Log in with valid credentials, one reported step per action."""
        page = self.login_page

        with AllureReporter.step('输入用户名'):
            page.enter_username('testuser')
            AllureReporter.attach_screenshot(self.driver, '输入用户名后')

        with AllureReporter.step('输入密码'):
            page.enter_password('password123')

        with AllureReporter.step('点击登录按钮'):
            page.click_login()

        with AllureReporter.step('验证登录成功'):
            assert page.is_login_successful()
            AllureReporter.attach_screenshot(self.driver, '登录成功')
# pytest.ini 配置
"""
[pytest]
addopts = --alluredir=./allure-results --clean-alluredir
testpaths = ./tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
"""
9.5 CI/CD集成
# .gitlab-ci.yml 示例
# Two stages: run the tests, then build the Allure report.
stages:
  - test
  - report

# Android tests.
android_test:
  stage: test
  image: python:3.9
  before_script:
    - pip install -r requirements.txt
    # NOTE(review): assumes the `appium` CLI is available in this image - confirm.
    - appium driver install uiautomator2
  script:
    - pytest tests/android/ --alluredir=allure-results
  artifacts:
    paths:
      - allure-results/
    expire_in: 1 week

# iOS tests run on a runner tagged `macos`.
ios_test:
  stage: test
  tags:
    - macos
  before_script:
    - pip install -r requirements.txt
    - appium driver install xcuitest
  script:
    - pytest tests/ios/ --alluredir=allure-results
  artifacts:
    paths:
      - allure-results/
    expire_in: 1 week

# Build the HTML report from the collected results.
generate_report:
  stage: report
  image: frankescobar/allure-docker-service
  script:
    - allure generate allure-results -o allure-report
  artifacts:
    paths:
      - allure-report/
    expire_in: 1 month
# Jenkinsfile 示例
pipeline {
    agent any

    stages {
        // Install Python dependencies and the Android driver.
        stage('Setup') {
            steps {
                sh 'pip install -r requirements.txt'
                sh 'appium driver install uiautomator2'
            }
        }

        // Run both platform suites in parallel.
        stage('Test') {
            parallel {
                stage('Android Tests') {
                    steps {
                        sh 'pytest tests/android/ --alluredir=allure-results'
                    }
                }
                stage('iOS Tests') {
                    // Runs on an agent labelled 'macos'.
                    agent { label 'macos' }
                    steps {
                        sh 'pytest tests/ios/ --alluredir=allure-results'
                    }
                }
            }
        }

        // Publish the Allure report from the collected results.
        stage('Report') {
            steps {
                allure includeProperties: false, jdk: '', results: [[path: 'allure-results']]
            }
        }
    }

    post {
        // Clean the workspace whatever the build result.
        always {
            cleanWs()
        }
    }
}
第十章:生产环境最佳实践
10.1 Driver初始化优化
from appium import webdriver
from appium.options.android import UiAutomator2Options
from appium.options.ios import XCUITestOptions
import logging
import time
class DriverFactory:
    """Factory for Appium driver sessions with retrying session creation.

    ``DriverFactory()`` always returns the same shared object. Every factory
    method is static, so it can be called on the class or on an instance.
    A device config may expose its settings either as attributes
    (``config.device_name``) or as dict keys (``config['device_name']``).
    """

    _instance = None
    _driver = None  # not referenced by any method of this class

    def __new__(cls):
        """Return the shared factory instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _config_value(config, key):
        """Read ``key`` from a dict-style or attribute-style device config."""
        if isinstance(config, dict):
            return config[key]
        return getattr(config, key)

    @staticmethod
    def _require_attempts(retries):
        """Reject attempt counts that would silently produce no driver."""
        if retries < 1:
            raise ValueError(f"retries must be >= 1, got {retries}")

    @staticmethod
    def _connect_with_retry(options, server_url, retries, platform_label, device_name):
        """Open a session, retrying up to ``retries`` times with a 5 s pause."""
        for attempt in range(retries):
            try:
                driver = webdriver.Remote(server_url, options=options)
                driver.implicitly_wait(10)
                logging.info(f"{platform_label} Driver创建成功: {device_name}")
                return driver
            except Exception as e:
                logging.warning(f"第{attempt + 1}次创建Driver失败: {e}")
                # The last failure is re-raised unchanged; earlier ones wait
                # before the next attempt.
                if attempt == retries - 1:
                    raise
                time.sleep(5)

    @staticmethod
    def create_android_driver(config, retries=3, server_url='http://localhost:4723'):
        """Create an Android (UiAutomator2) driver.

        Args:
            config: Device config providing platform_name, platform_version,
                device_name, automation_name, app_package and app_activity.
            retries: Number of connection attempts; must be at least 1.
            server_url: Appium server URL.

        Returns:
            The connected driver, with a 10 second implicit wait applied.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: Whatever the final failed connection attempt raised.
        """
        DriverFactory._require_attempts(retries)
        get = DriverFactory._config_value

        options = UiAutomator2Options()
        options.platform_name = get(config, 'platform_name')
        options.platform_version = get(config, 'platform_version')
        options.device_name = get(config, 'device_name')
        options.automation_name = get(config, 'automation_name')
        options.app_package = get(config, 'app_package')
        options.app_activity = get(config, 'app_activity')
        options.no_reset = False
        options.new_command_timeout = 300
        # NOTE(review): the three timeouts below are presumably in
        # milliseconds -- confirm against the driver documentation.
        options.uiautomator2_server_launch_timeout = 60000
        options.uiautomator2_server_install_timeout = 60000
        options.adb_exec_timeout = 60000
        options.skip_server_installation = False
        options.disable_window_animation = True
        options.ignore_unimportant_views = True
        options.enable_performance_logging = True

        return DriverFactory._connect_with_retry(
            options, server_url, retries, 'Android', get(config, 'device_name')
        )

    @staticmethod
    def create_ios_driver(config, retries=3, server_url='http://localhost:4723'):
        """Create an iOS (XCUITest) driver.

        Args:
            config: Device config providing platform_name, platform_version,
                device_name, automation_name and bundle_id.
            retries: Number of connection attempts; must be at least 1.
            server_url: Appium server URL.

        Returns:
            The connected driver, with a 10 second implicit wait applied.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: Whatever the final failed connection attempt raised.
        """
        DriverFactory._require_attempts(retries)
        get = DriverFactory._config_value

        options = XCUITestOptions()
        options.platform_name = get(config, 'platform_name')
        options.platform_version = get(config, 'platform_version')
        options.device_name = get(config, 'device_name')
        options.automation_name = get(config, 'automation_name')
        options.bundle_id = get(config, 'bundle_id')
        options.no_reset = False
        options.new_command_timeout = 300
        # NOTE(review): WDA timeouts/intervals are presumably in
        # milliseconds -- confirm against the driver documentation.
        options.wda_launch_timeout = 120000
        options.wda_connection_timeout = 240000
        options.wda_startup_retries = 4
        options.wda_startup_retry_interval = 20000
        options.use_new_wda = False
        options.use_prebuilt_wda = True
        options.skip_wda_installation = False
        options.show_ios_log = False
        options.auto_accept_alerts = True

        return DriverFactory._connect_with_retry(
            options, server_url, retries, 'iOS', get(config, 'device_name')
        )

    @staticmethod
    def quit_driver(driver):
        """Quit ``driver`` if it is truthy; log a warning instead of raising."""
        if driver:
            try:
                driver.quit()
                logging.info("Driver已安全退出")
            except Exception as e:
                # Deliberately best-effort: teardown must not fail the run.
                logging.warning(f"退出Driver时出错: {e}")
# Usage example
factory = DriverFactory()
# NOTE(review): device_config is not defined in this snippet -- presumably
# supplied by the surrounding test setup.
driver = factory.create_android_driver(device_config)
10.2 错误处理与重试
import functools
import logging
import time
from typing import Callable, Type, Tuple
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    on_retry: Callable = None
):
    """Build a decorator that re-runs a function when it raises.

    Args:
        max_attempts: Total number of calls to make; must be at least 1.
        delay: Seconds to sleep between two attempts.
        exceptions: Exception types that trigger another attempt; anything
            else propagates immediately.
        on_retry: Optional callback invoked as ``on_retry(attempt, exc)``
            after every failed attempt (including the last), where
            ``attempt`` is zero-based.

    Returns:
        A decorator; the decorated function returns the first successful
        result.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1. Without this check
            the wrapped function would never be called and there would be no
            exception to re-raise.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    logging.warning(
                        f"函数 {func.__name__} 第{attempt + 1}次执行失败: {e}"
                    )
                    if on_retry:
                        on_retry(attempt, e)
                    # Out of attempts: surface the last failure unchanged.
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator
class RobustElementOperations:
    """Element interactions that wait for the element and retry on failure."""

    def __init__(self, driver):
        self.driver = driver

    @retry(max_attempts=3, delay=1.0)
    def find_and_click(self, locator):
        """Wait up to 10 s for a clickable element, click it and return it.

        The whole wait-and-click sequence is retried up to three times.
        """
        waiter = WebDriverWait(self.driver, 10)
        target = waiter.until(EC.element_to_be_clickable(locator))
        target.click()
        return target

    @retry(max_attempts=3, delay=0.5)
    def find_and_input(self, locator, text):
        """Wait up to 10 s for a visible element, replace its text, return it.

        The whole sequence is retried up to three times.
        """
        waiter = WebDriverWait(self.driver, 10)
        field = waiter.until(EC.visibility_of_element_located(locator))
        field.clear()
        field.send_keys(text)
        return field

    def safe_click(self, locator, ignore_errors=True):
        """Click via ``find_and_click``; on failure log a warning.

        Returns the clicked element, or ``None`` when the click failed and
        ``ignore_errors`` is true. With ``ignore_errors`` false the failure
        is re-raised after logging.
        """
        try:
            clicked = self.find_and_click(locator)
        except Exception as e:
            logging.warning(f"点击失败: {e}")
            if ignore_errors:
                return None
            raise
        return clicked
# Usage example
robust_ops = RobustElementOperations(driver)
# Click with retries
robust_ops.find_and_click((AppiumBy.ID, 'com.example.app:id/button'))
# Safe click: failures are logged and ignored by default
robust_ops.safe_click((AppiumBy.ID, 'com.example.app:id/optional_button'))
10.3 性能监控
import time
import logging
from functools import wraps
from contextlib import contextmanager
class PerformanceMonitor:
    """Collect operation timings and device statistics through a driver.

    ``metrics`` maps an operation name to the duration (seconds) of its most
    recent measurement.
    """

    def __init__(self, driver):
        self.driver = driver
        self.metrics = {}

    @contextmanager
    def measure_time(self, operation_name):
        """Time the enclosed block and store the result in ``metrics``.

        The duration is recorded and logged even when the block raises, and
        is taken from a monotonic clock so system clock changes cannot skew it.
        """
        started = time.perf_counter()
        try:
            yield
        finally:
            elapsed_time = time.perf_counter() - started
            self.metrics[operation_name] = elapsed_time
            logging.info(f"{operation_name} 耗时: {elapsed_time:.2f}秒")

    def _shell(self, command):
        """Run ``command`` through the driver's ``mobile: shell`` script."""
        return self.driver.execute_script('mobile: shell', {'command': command})

    def get_memory_info(self, package='com.example.app'):
        """Return the ``dumpsys meminfo`` output for ``package`` (Android)."""
        return self._shell(f'dumpsys meminfo {package}')

    def get_cpu_usage(self, package='com.example.app'):
        """Return the ``dumpsys cpuinfo`` output for ``package`` (Android)."""
        return self._shell(f'dumpsys cpuinfo {package}')

    def get_battery_info(self):
        """Return the ``dumpsys battery`` output (Android)."""
        return self._shell('dumpsys battery')

    def get_network_stats(self):
        """Return the contents of ``/proc/net/dev`` (Android)."""
        return self._shell('cat /proc/net/dev')

    def start_performance_recording(self):
        """Start a screen recording on the driver."""
        self.driver.start_recording_screen()

    def stop_performance_recording(self):
        """Stop the screen recording and return what the driver returns."""
        return self.driver.stop_recording_screen()

    def get_page_source_size(self):
        """Return the length, in characters, of the current page source."""
        return len(self.driver.page_source)

    def get_metrics_summary(self):
        """Return all timings plus their total and average (0 when empty)."""
        total = sum(self.metrics.values())
        count = len(self.metrics)
        return {
            'operations': self.metrics,
            'total_time': total,
            'avg_time': total / count if count else 0
        }
def performance_decorator(func):
    """Log how long each call to ``func`` takes.

    The duration is measured with a monotonic clock and logged at INFO level
    whether the call returns or raises; the result or exception of ``func``
    is passed through unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_time = time.perf_counter() - started
            logging.info(f"函数 {func.__name__} 执行耗时: {elapsed_time:.2f}秒")
    return wrapper
# Usage example
perf_monitor = PerformanceMonitor(driver)
# The timed statement must be indented under the `with`; otherwise the
# snippet is a syntax error and nothing is measured.
with perf_monitor.measure_time('登录操作'):
    login_page.login('user', 'pass')
print(perf_monitor.get_metrics_summary())
10.4 日志与调试
import logging
import os
from datetime import datetime
class TestLogger:
    """Process-wide logger writing to a timestamped file and to the console.

    Only the first construction configures logging; later calls return the
    same object and their ``log_dir`` argument is ignored.
    """

    # The class name matches pytest's ``Test*`` pattern; opt out of collection.
    __test__ = False

    _instance = None

    def __new__(cls, log_dir='logs'):
        """Return the shared instance, initializing it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialize(log_dir)
        return cls._instance

    def _initialize(self, log_dir):
        """Create ``log_dir`` and attach file and console handlers."""
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(log_dir, exist_ok=True)
        log_file = os.path.join(
            log_dir,
            f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        )
        self.logger = logging.getLogger('AppiumTest')
        self.logger.setLevel(logging.DEBUG)

        # File handler: records everything from DEBUG upwards.
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)

        # Console handler: INFO and above only.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Both handlers share one format.
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.logger.info(message)

    def warning(self, message):
        """Log ``message`` at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log ``message`` at DEBUG level."""
        self.logger.debug(message)

    def step(self, step_name):
        """Log a test-step marker line at INFO level."""
        self.info(f"=== 步骤: {step_name} ===")
class DebugHelper:
    """Capture screenshots, page source and element details for debugging."""

    def __init__(self, driver, screenshot_dir='screenshots'):
        self.driver = driver
        self.screenshot_dir = screenshot_dir
        self.logger = TestLogger()
        # exist_ok avoids the race between an existence check and creation.
        os.makedirs(screenshot_dir, exist_ok=True)

    def _artifact_path(self, name, extension):
        """Build ``<screenshot_dir>/<name>.<extension>``, defaulting name to a timestamp."""
        if name is None:
            name = datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(self.screenshot_dir, f"{name}.{extension}")

    def take_screenshot(self, name=None):
        """Save a PNG screenshot and return its path.

        ``name`` defaults to the current time, formatted to the second.
        """
        screenshot_path = self._artifact_path(name, 'png')
        self.driver.save_screenshot(screenshot_path)
        self.logger.info(f"截图已保存: {screenshot_path}")
        return screenshot_path

    def save_page_source(self, name=None):
        """Write the current page source to an XML file and return its path.

        ``name`` defaults to the current time, formatted to the second.
        """
        source_path = self._artifact_path(name, 'xml')
        with open(source_path, 'w', encoding='utf-8') as f:
            f.write(self.driver.page_source)
        self.logger.info(f"页面源码已保存: {source_path}")
        return source_path

    def debug_element(self, locator):
        """Log the text, geometry and state of the element at ``locator``.

        Returns the element, or ``None`` (after logging an error) when the
        lookup or any property read fails.
        """
        try:
            element = self.driver.find_element(*locator)
            self.logger.info(f"元素找到: {locator}")
            self.logger.info(f" - 文本: {element.text}")
            self.logger.info(f" - 位置: {element.location}")
            self.logger.info(f" - 大小: {element.size}")
            self.logger.info(f" - 可见: {element.is_displayed()}")
            self.logger.info(f" - 启用: {element.is_enabled()}")
            self.logger.info(f" - 选中: {element.is_selected()}")
            return element
        except Exception as e:
            # Best-effort diagnostics: report and carry on.
            self.logger.error(f"元素未找到: {locator}, 错误: {e}")
            return None

    def highlight_and_screenshot(self, locator, name=None):
        """Outline the element in red, take a screenshot, remove the outline.

        The outline is removed even when taking the screenshot fails.
        NOTE(review): styling via script presumably only works in a
        web/WebView context -- confirm before using on native screens.
        """
        element = self.driver.find_element(*locator)
        self.driver.execute_script(
            'arguments[0].style.border = "3px solid red"',
            element
        )
        try:
            return self.take_screenshot(name)
        finally:
            self.driver.execute_script(
                'arguments[0].style.border = ""',
                element
            )
# Usage example
logger = TestLogger()
debug_helper = DebugHelper(driver)
# Mark the start of a test step, then log an action and capture the screen.
logger.step("开始登录测试")
logger.info("输入用户名")
debug_helper.take_screenshot('before_login')
10.5 并行测试
import pytest
from concurrent.futures import ThreadPoolExecutor
from appium import webdriver
class ParallelTestExecutor:
    """Run one test function against several devices concurrently.

    ``devices_config`` maps a device name to that device's config, which is
    indexed here with ``config['platform_name']``.
    """

    def __init__(self, devices_config):
        self.devices_config = devices_config

    def create_driver_for_device(self, device_name):
        """Create a driver for ``device_name``.

        A platform name of ``'Android'`` selects the Android driver; any
        other value selects the iOS driver.

        Raises:
            ValueError: If ``device_name`` has no entry in ``devices_config``.
        """
        config = self.devices_config.get(device_name)
        if config is None:
            # Fail with a clear message instead of "'NoneType' is not subscriptable".
            raise ValueError(f"Unknown device: {device_name}")
        if config['platform_name'] == 'Android':
            return DriverFactory.create_android_driver(config)
        return DriverFactory.create_ios_driver(config)

    def run_test_on_device(self, device_name, test_func):
        """Run ``test_func(driver)`` on one device and report the outcome.

        Returns a dict with ``device`` and ``status`` (``'passed'`` or
        ``'failed'``), plus ``result`` on success or ``error`` (the exception
        text) on failure. The driver, if one was created, is always quit.
        """
        driver = None
        try:
            driver = self.create_driver_for_device(device_name)
            result = test_func(driver)
            return {'device': device_name, 'result': result, 'status': 'passed'}
        except Exception as e:
            return {'device': device_name, 'error': str(e), 'status': 'failed'}
        finally:
            if driver:
                DriverFactory.quit_driver(driver)

    def run_tests_parallel(self, test_func, max_workers=4):
        """Run ``test_func`` on every configured device using a thread pool.

        Returns the per-device outcome dicts in ``devices_config`` order.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.run_test_on_device, device_name, test_func)
                for device_name in self.devices_config
            ]
            return [future.result() for future in futures]
# pytest-xdist 并行测试配置
# pytest.ini
"""
[pytest]
addopts = -n auto --dist=loadscope
"""
# conftest.py
import pytest
@pytest.fixture(scope="session")
def driver(request):
    """Session-scoped Android driver for the device named by ``--device``.

    The driver is handed to the tests and quit once the session is over.
    """
    selected_device = request.config.getoption("--device")
    session_driver = DriverFactory.create_android_driver(
        load_device_config(selected_device)
    )
    yield session_driver
    DriverFactory.quit_driver(session_driver)
def pytest_addoption(parser):
    """Register the ``--device`` option, defaulting to ``android_emulator``."""
    option_settings = {"action": "store", "default": "android_emulator"}
    parser.addoption("--device", **option_settings)
# 运行命令
# pytest tests/ -n 4 --device=android_emulator
10.6 测试数据管理
import json
import os
from typing import Any, Dict
class TestDataManager:
    """Load JSON test data from a directory (with caching) and save results."""

    # The class name matches pytest's ``Test*`` pattern; opt out of collection.
    __test__ = False

    def __init__(self, data_dir='test_data'):
        self.data_dir = data_dir
        self.cache = {}

    def load_json(self, file_name):
        """Return the parsed content of ``data_dir/file_name``.

        The parsed object is cached per file name; repeated calls return the
        same object without re-reading the file.
        """
        if file_name in self.cache:
            return self.cache[file_name]
        file_path = os.path.join(self.data_dir, file_name)
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.cache[file_name] = data
        return data

    def get_test_data(self, file_name, key=None):
        """Return the whole data set, or ``data.get(key)`` when a key is given.

        Only ``None`` means "no key", so falsy keys such as ``''`` are looked
        up instead of silently returning the whole data set.
        """
        data = self.load_json(file_name)
        if key is not None:
            return data.get(key)
        return data

    def save_test_result(self, file_name, result):
        """Write ``result`` as indented UTF-8 JSON to ``data_dir/results/file_name``.

        The target directory is created when missing.
        """
        file_path = os.path.join(self.data_dir, 'results', file_name)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
# test_data/users.json 示例
"""
{
"valid_users": [
{"username": "user1", "password": "pass1"},
{"username": "user2", "password": "pass2"}
],
"invalid_users": [
{"username": "invalid", "password": "wrong"}
]
}
"""
# Usage example
data_mgr = TestDataManager()
# Reads test_data/users.json and returns the value under "valid_users".
valid_users = data_mgr.get_test_data('users.json', 'valid_users')
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)