第七章:等待机制与Appium Settings API

7.1 等待机制概述

等待机制

隐式等待
Implicit Wait

显式等待
Explicit Wait

流畅等待
Fluent Wait

全局生效

简单易用

不推荐与显式等待混用

条件精确控制

推荐使用

WebDriverWait

自定义间隔

忽略异常

灵活配置

7.2 隐式等待

class ImplicitWaitManager:
    """Thin wrapper around the driver's implicit-wait configuration."""

    def __init__(self, driver):
        self.driver = driver

    def set_implicit_wait(self, seconds):
        """
        Set the implicit wait used by the driver.

        Args:
            seconds: Wait time in seconds; forwarded unchanged to
                ``driver.implicitly_wait``.

        Notes:
            - The implicit wait applies globally to ``find_element`` calls.
            - When an element is not immediately present, the lookup waits
              up to the configured time.
            - Mixing it with explicit waits is discouraged because the
              resulting wait time can be unpredictable.
        """
        driver = self.driver
        driver.implicitly_wait(seconds)

    def disable_implicit_wait(self):
        """Switch the implicit wait off by setting it to 0 seconds."""
        driver = self.driver
        driver.implicitly_wait(0)

    def get_default_implicit_wait(self):
        """Return the ``implicit_wait`` value reported by ``driver.timeouts``."""
        timeouts = self.driver.timeouts
        return timeouts.implicit_wait

# Usage example (assumes an existing Appium `driver` session)
wait_mgr = ImplicitWaitManager(driver)

# Set a 10-second implicit wait
wait_mgr.set_implicit_wait(10)

# This lookup is subject to the implicit wait configured above
element = driver.find_element(AppiumBy.ID, 'com.example.app:id/button')

# Disable the implicit wait again
wait_mgr.disable_implicit_wait()

7.3 显式等待

from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from appium.webdriver.common.appiumby import AppiumBy

class ExplicitWaitManager:
    """Convenience wrappers around ``WebDriverWait`` + ``expected_conditions``.

    Every ``wait_for_*`` method accepts an optional ``timeout`` in seconds.
    ``None`` means "use ``default_timeout``"; any other value, including an
    explicit ``0``, is passed to ``WebDriverWait`` as given.
    """

    def __init__(self, driver, default_timeout=10):
        """
        Args:
            driver: The driver instance handed to ``WebDriverWait``.
            default_timeout: Timeout in seconds used when a call passes
                ``timeout=None``.
        """
        self.driver = driver
        self.default_timeout = default_timeout

    def _resolve_timeout(self, timeout):
        """Return ``timeout``, or ``default_timeout`` when it is ``None``."""
        # An identity check is required here: ``timeout or default`` would
        # silently replace an explicit 0 with the default.
        return self.default_timeout if timeout is None else timeout

    def _until(self, condition, timeout):
        """Wait for ``condition`` and return whatever ``until`` returns."""
        wait = WebDriverWait(self.driver, self._resolve_timeout(timeout))
        return wait.until(condition)

    def wait_for_element(self, locator, timeout=None):
        """
        Wait until the element is present.

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            timeout: Timeout in seconds; ``None`` uses the default.

        Returns:
            The result of ``EC.presence_of_element_located`` (the element).
        """
        return self._until(EC.presence_of_element_located(locator), timeout)

    def wait_for_element_visible(self, locator, timeout=None):
        """Wait until the located element is visible and return it."""
        return self._until(EC.visibility_of_element_located(locator), timeout)

    def wait_for_element_clickable(self, locator, timeout=None):
        """Wait until the located element is clickable and return it."""
        return self._until(EC.element_to_be_clickable(locator), timeout)

    def wait_for_element_invisible(self, locator, timeout=None):
        """Wait until the located element is invisible."""
        return self._until(EC.invisibility_of_element_located(locator), timeout)

    def wait_for_text_present(self, locator, text, timeout=None):
        """Wait until the located element contains ``text``."""
        return self._until(
            EC.text_to_be_present_in_element(locator, text), timeout
        )

    def wait_for_element_selected(self, locator, timeout=None):
        """Wait until the located element is selected."""
        # ``EC.element_to_be_selected`` expects an element object; the
        # locator-based condition is ``element_located_to_be_selected``.
        return self._until(EC.element_located_to_be_selected(locator), timeout)

    def wait_for_title_contains(self, text, timeout=None):
        """Wait until the title contains ``text``."""
        return self._until(EC.title_contains(text), timeout)

    def wait_for_alert_present(self, timeout=None):
        """Wait until an alert is present and return it."""
        return self._until(EC.alert_is_present(), timeout)

    def wait_for_element_count(self, locator, count, timeout=None):
        """Wait until exactly ``count`` elements match ``locator``."""
        return self._until(
            lambda d: len(d.find_elements(*locator)) == count, timeout
        )

# Usage example (assumes an existing Appium `driver` session)
wait_mgr = ExplicitWaitManager(driver, default_timeout=15)

# Wait for an element to be present
element = wait_mgr.wait_for_element(
    (AppiumBy.ID, 'com.example.app:id/button')
)

# Wait for an element to be clickable, then click it
button = wait_mgr.wait_for_element_clickable(
    (AppiumBy.ID, 'com.example.app:id/submit')
)
button.click()

# Wait for an element to disappear
wait_mgr.wait_for_element_invisible(
    (AppiumBy.ID, 'com.example.app:id/loading')
)

7.4 自定义等待条件

class CustomWaitConditions:
    """Factories for custom wait conditions.

    Each static method returns a predicate that takes the driver and returns
    a truthy/falsy value, so it can be passed to ``WebDriverWait.until``.
    Except for ``page_loaded``, the predicates treat any ``Exception`` raised
    during the lookup as "condition not met". ``KeyboardInterrupt`` and
    ``SystemExit`` are deliberately not swallowed.
    """

    @staticmethod
    def _is_present(driver, locator):
        """Return True when ``driver.find_element(*locator)`` succeeds."""
        try:
            driver.find_element(*locator)
        except Exception:
            return False
        return True

    @staticmethod
    def element_attribute_contains(locator, attribute, value):
        """
        Condition: the element's attribute contains ``value``.

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            attribute: Attribute name.
            value: Value expected to be contained in the attribute.
        """
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                actual = element.get_attribute(attribute)
            except Exception:
                return False
            # A missing attribute (None) can never contain the value.
            return actual is not None and value in actual
        return _predicate

    @staticmethod
    def element_text_not_empty(locator):
        """Condition: the element's text is non-empty after stripping."""
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                return len(element.text.strip()) > 0
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_enabled(locator):
        """Condition: the element exists and ``is_enabled()`` is true."""
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                return element.is_enabled()
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_has_class(locator, class_name):
        """Condition: ``class_name`` is one of the element's CSS classes."""
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                classes = element.get_attribute('class')
            except Exception:
                return False
            # No ``class`` attribute at all means the class is absent.
            return classes is not None and class_name in classes.split()
        return _predicate

    @staticmethod
    def page_loaded():
        """Condition: ``document.readyState`` equals ``'complete'``."""
        def _predicate(driver):
            return driver.execute_script(
                'return document.readyState;'
            ) == 'complete'
        return _predicate

    @staticmethod
    def element_stale(element):
        """Condition: calling ``element.is_enabled()`` raises an exception."""
        def _predicate(driver):
            try:
                element.is_enabled()
                return False
            except Exception:
                return True
        return _predicate

    @staticmethod
    def any_element_present(*locators):
        """Condition: at least one of ``locators`` can be found."""
        def _predicate(driver):
            return any(
                CustomWaitConditions._is_present(driver, locator)
                for locator in locators
            )
        return _predicate

    @staticmethod
    def all_elements_present(*locators):
        """Condition: every one of ``locators`` can be found."""
        def _predicate(driver):
            return all(
                CustomWaitConditions._is_present(driver, locator)
                for locator in locators
            )
        return _predicate

# Example: using the custom conditions (assumes an existing `driver`)
wait = WebDriverWait(driver, 10)

# Wait until the element's attribute contains the given value
wait.until(
    CustomWaitConditions.element_attribute_contains(
        (AppiumBy.ID, 'com.example.app:id/price'),
        'text',
        '¥'
    )
)

# Wait until the element's text is non-empty
wait.until(
    CustomWaitConditions.element_text_not_empty(
        (AppiumBy.ID, 'com.example.app:id/title')
    )
)

# Wait until the page has finished loading
wait.until(CustomWaitConditions.page_loaded())

# Wait until any one of the elements is present
wait.until(
    CustomWaitConditions.any_element_present(
        (AppiumBy.ID, 'com.example.app:id/button1'),
        (AppiumBy.ID, 'com.example.app:id/button2')
    )
)

7.5 FluentWait流畅等待

# The Python Selenium bindings do not provide a ``FluentWait`` class (that
# name belongs to the Java bindings). ``WebDriverWait`` takes the polling
# interval and the ignored exceptions directly, which gives the same result.
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException

class FluentWaitManager:
    """Build "fluent" waits: custom polling interval plus ignored exceptions."""

    def __init__(self, driver):
        self.driver = driver

    def create_fluent_wait(self, timeout=30, poll_frequency=0.5, ignored_exceptions=None):
        """
        Create a configured wait object.

        Args:
            timeout: Timeout in seconds.
            poll_frequency: Polling interval in seconds.
            ignored_exceptions: Iterable of exception classes to ignore while
                polling. Defaults to ``NoSuchElementException`` and
                ``StaleElementReferenceException``.

        Returns:
            A ``WebDriverWait`` instance; call ``.until(condition)`` on it.
        """
        if ignored_exceptions is None:
            ignored_exceptions = [NoSuchElementException, StaleElementReferenceException]

        return WebDriverWait(
            self.driver,
            timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=ignored_exceptions,
        )

    def wait_with_retry(self, locator, max_attempts=3, timeout=10):
        """
        Wait for an element, retrying the whole wait on failure.

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            max_attempts: Maximum number of attempts.
            timeout: Timeout in seconds for each attempt.

        Returns:
            The located element, or ``None`` when ``max_attempts`` is less
            than 1 (no attempt is made in that case).

        Raises:
            Exception: The exception from the final attempt is re-raised.
        """
        wait = self.create_fluent_wait(timeout=timeout)

        for attempt in range(max_attempts):
            try:
                return wait.until(EC.presence_of_element_located(locator))
            except Exception:
                # Out of attempts: surface the last failure to the caller.
                if attempt == max_attempts - 1:
                    raise
                print(f"第{attempt + 1}次尝试失败,正在重试...")

        return None

    def wait_with_custom_polling(self, condition, timeout=30, poll_frequency=1):
        """
        Wait for ``condition`` using a custom polling interval.

        Args:
            condition: Callable taking the driver; polled until truthy.
            timeout: Timeout in seconds.
            poll_frequency: Polling interval in seconds.

        Returns:
            The value returned by ``WebDriverWait.until(condition)``.
        """
        wait = WebDriverWait(
            self.driver,
            timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=[NoSuchElementException],
        )

        return wait.until(condition)

# Usage example (assumes an existing Appium `driver` session)
fluent_mgr = FluentWaitManager(driver)

# Create a fluent wait and use it
wait = fluent_mgr.create_fluent_wait(timeout=20, poll_frequency=1)
element = wait.until(EC.presence_of_element_located(
    (AppiumBy.ID, 'com.example.app:id/item')
))

# Wait with retries
element = fluent_mgr.wait_with_retry(
    (AppiumBy.ID, 'com.example.app:id/unstable_element'),
    max_attempts=3
)

7.6 Appium Settings API

7.6.1 Settings API概述
class AppiumSettingsManager:
    """Read and write session settings through the driver's Settings API."""

    def __init__(self, driver):
        self.driver = driver

    def get_settings(self):
        """Return everything ``driver.get_settings()`` reports."""
        current = self.driver.get_settings()
        return current

    def update_settings(self, settings):
        """
        Apply several settings at once.

        Args:
            settings: Dict of setting names to values.
        """
        driver = self.driver
        driver.update_settings(settings)

    def get_setting(self, setting_name):
        """Return one setting's value, or ``None`` when it is not reported."""
        return self.get_settings().get(setting_name)

    def set_setting(self, setting_name, value):
        """Set a single setting by name."""
        single = {}
        single[setting_name] = value
        self.driver.update_settings(single)

# Usage example (assumes an existing Appium `driver` session)
settings_mgr = AppiumSettingsManager(driver)

# Fetch all settings
all_settings = settings_mgr.get_settings()
print(f"当前设置: {all_settings}")

# Update settings
settings_mgr.update_settings({
    'waitForIdleTimeout': 0,
    'waitForSelectorTimeout': 0
})
7.6.2 Android UiAutomator2 Settings
class AndroidSettingsManager:
    """Helpers for Android UiAutomator2 settings.

    Each method forwards one or more named settings to
    ``driver.update_settings``.
    NOTE(review): setting names are sent as written here; confirm they are
    supported by the driver version in use.
    """

    def __init__(self, driver):
        self.driver = driver

    def _set(self, key, value):
        """Send a single ``{key: value}`` setting to the driver."""
        self.driver.update_settings({key: value})

    def set_wait_for_idle_timeout(self, milliseconds):
        """
        Set the wait-for-idle timeout.

        Args:
            milliseconds: Timeout in milliseconds; 0 means do not wait.
        """
        self._set('waitForIdleTimeout', milliseconds)

    def set_wait_for_selector_timeout(self, milliseconds):
        """
        Set the selector wait timeout.

        Args:
            milliseconds: Timeout in milliseconds.
        """
        self._set('waitForSelectorTimeout', milliseconds)

    def set_element_response_attributes(self, attributes):
        """
        Choose which attributes are returned with elements.

        Args:
            attributes: List of attribute names, e.g.
                ``['text', 'content-desc', 'class']``; sent to the driver as
                one comma-joined string.
        """
        joined = ','.join(attributes)
        self._set('elementResponseAttributes', joined)

    def enable_companion_window_tracking(self, enable=True):
        """
        Enable or disable companion-window tracking.

        Args:
            enable: Whether tracking is turned on.
        """
        self._set('trackCompanionWindows', enable)

    def set_scroll_acknowledgment_timeout(self, milliseconds):
        """Set the scroll acknowledgment timeout."""
        self._set('scrollAcknowledgmentTimeout', milliseconds)

    def set_action_acknowledgment_timeout(self, milliseconds):
        """Set the action acknowledgment timeout."""
        self._set('actionAcknowledgmentTimeout', milliseconds)

    def enable_image_analysis(self, enable=True):
        """Enable or disable image analysis."""
        self._set('enableImageAnalysis', enable)

    def configure_for_fast_operation(self):
        """Apply the "fast operation" profile: zero waits, 100 ms acks."""
        profile = {
            'waitForIdleTimeout': 0,
            'waitForSelectorTimeout': 0,
            'actionAcknowledgmentTimeout': 100,
            'scrollAcknowledgmentTimeout': 100
        }
        self.driver.update_settings(profile)

    def configure_for_stable_operation(self):
        """Apply the "stable operation" profile: 10 s waits, 3 s acks."""
        profile = {
            'waitForIdleTimeout': 10000,
            'waitForSelectorTimeout': 10000,
            'actionAcknowledgmentTimeout': 3000,
            'scrollAcknowledgmentTimeout': 3000
        }
        self.driver.update_settings(profile)

# Usage example (assumes an existing Appium `driver` session)
android_settings = AndroidSettingsManager(driver)

# Fast-operation profile
android_settings.configure_for_fast_operation()

# Stable-operation profile
android_settings.configure_for_stable_operation()

# Choose which attributes are returned with elements
android_settings.set_element_response_attributes(['text', 'content-desc', 'resource-id'])
7.6.3 iOS XCUITest Settings
class IOSSettingsManager:
    """Helpers for iOS XCUITest settings.

    Every method builds a settings dict and hands it to
    ``driver.update_settings``.
    """

    def __init__(self, driver):
        self.driver = driver

    def _update(self, settings):
        """Forward ``settings`` to the driver unchanged."""
        self.driver.update_settings(settings)

    def set_max_typing_frequency(self, frequency):
        """
        Set the maximum typing frequency.

        Args:
            frequency: Characters typed per second (default is 60).
        """
        self._update({'maxTypingFrequency': frequency})

    def set_keyboard_autocorrection(self, enable):
        """Enable or disable keyboard autocorrection."""
        self._update({'keyboardAutocorrection': enable})

    def set_keyboard_prediction(self, enable):
        """Enable or disable keyboard prediction."""
        self._update({'keyboardPrediction': enable})

    def set_snapshot_timeout(self, seconds):
        """Set the snapshot timeout."""
        self._update({'snapshotTimeout': seconds})

    def set_use_first_match(self, enable):
        """Enable or disable first-match mode."""
        self._update({'useFirstMatch': enable})

    def set_wait_for_quiescence(self, enable):
        """Enable or disable waiting for quiescence."""
        self._update({'waitForQuiescence': enable})

    def set_should_use_compact_responses(self, enable):
        """Enable or disable compact responses."""
        self._update({'shouldUseCompactResponses': enable})

    def set_element_response_attributes(self, attributes):
        """Choose the element attributes returned (sent comma-joined)."""
        joined = ','.join(attributes)
        self._update({'elementResponseAttributes': joined})

    def configure_for_fast_typing(self):
        """Apply the "fast typing" profile."""
        fast_typing = {
            'maxTypingFrequency': 100,
            'keyboardAutocorrection': False,
            'keyboardPrediction': False
        }
        self._update(fast_typing)

    def configure_for_fast_search(self):
        """Apply the "fast search" profile."""
        fast_search = {
            'useFirstMatch': True,
            'waitForQuiescence': False,
            'snapshotTimeout': 5
        }
        self._update(fast_search)

# Usage example (assumes an existing Appium `driver` session)
ios_settings = IOSSettingsManager(driver)

# Apply the fast-typing profile
ios_settings.configure_for_fast_typing()

# Apply the fast-search profile
ios_settings.configure_for_fast_search()

# Set the snapshot timeout
ios_settings.set_snapshot_timeout(15)

7.7 等待最佳实践

class WaitBestPractices:
    """Examples of recommended waiting patterns."""

    def __init__(self, driver):
        self.driver = driver
        self.wait = WebDriverWait(driver, 10)

    def smart_wait_for_element(self, locator, timeout=10, poll_frequency=0.5):
        """
        Wait for an element to become visible, tolerating transient lookups.

        Uses ``WebDriverWait`` with a custom polling interval and ignores
        ``NoSuchElementException`` / ``StaleElementReferenceException`` while
        polling. (The Python Selenium bindings have no ``FluentWait`` class;
        these ``WebDriverWait`` arguments are the equivalent.)

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            timeout: Timeout in seconds.
            poll_frequency: Polling interval in seconds.

        Returns:
            The visible element.
        """
        wait = WebDriverWait(
            self.driver,
            timeout,
            poll_frequency=poll_frequency,
            ignored_exceptions=(NoSuchElementException, StaleElementReferenceException),
        )

        return wait.until(EC.visibility_of_element_located(locator))

    def wait_with_spinner_handling(self, locator, spinner_locator=None, timeout=15):
        """
        Wait for a target element after a loading spinner goes away.

        Args:
            locator: Locator of the target element.
            spinner_locator: Locator of the loading spinner; skipped when
                falsy.
            timeout: Timeout in seconds for the target element.

        Returns:
            The visible target element.
        """
        # First give the spinner up to 5 seconds to disappear. This step is
        # best-effort: a failure here must not prevent the real wait below.
        if spinner_locator:
            try:
                WebDriverWait(self.driver, 5).until(
                    EC.invisibility_of_element_located(spinner_locator)
                )
            except Exception:
                pass

        # Then wait for the target element itself.
        return WebDriverWait(self.driver, timeout).until(
            EC.visibility_of_element_located(locator)
        )

    def wait_for_page_load_complete(self, timeout=30):
        """Wait until ``document.readyState`` is ``'complete'``."""
        WebDriverWait(self.driver, timeout).until(
            lambda d: d.execute_script('return document.readyState') == 'complete'
        )

    def wait_for_ajax_complete(self, timeout=30):
        """Wait until jQuery reports no active requests.

        Pages where ``jQuery`` is undefined are treated as already idle.
        """
        def _ajax_idle(d):
            if not d.execute_script('return typeof jQuery != "undefined"'):
                return True
            return d.execute_script('return jQuery.active == 0')

        WebDriverWait(self.driver, timeout).until(_ajax_idle)

    def wait_for_element_with_retry(self, locator, max_retries=3):
        """
        Wait for an element, refreshing the page between failed attempts.

        Intended for unstable networks or dynamically loaded pages.

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            max_retries: Maximum number of attempts (10 seconds each).

        Returns:
            The located element, or ``None`` when ``max_retries`` is less
            than 1 (no attempt is made in that case).

        Raises:
            Exception: The exception from the final attempt is re-raised.
        """
        for attempt in range(max_retries):
            try:
                return WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located(locator)
                )
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Refresh the page before the next attempt.
                self.driver.refresh()

        return None

# Usage example (assumes an existing Appium `driver` session)
wait_bp = WaitBestPractices(driver)

# Smart wait
element = wait_bp.smart_wait_for_element(
    (AppiumBy.ID, 'com.example.app:id/content')
)

# Wait for the target while handling a loading spinner
element = wait_bp.wait_with_spinner_handling(
    (AppiumBy.ID, 'com.example.app:id/list'),
    spinner_locator=(AppiumBy.ID, 'com.example.app:id/progress')
)

第八章:上下文切换与高级场景

8.1 WebView上下文处理

class WebViewContextManager:
    """Helpers for switching between native and WebView contexts."""

    def __init__(self, driver):
        self.driver = driver

    def get_all_contexts(self):
        """Return ``driver.contexts``."""
        return self.driver.contexts

    def get_current_context(self):
        """Return ``driver.current_context``."""
        return self.driver.current_context

    def switch_to_webview(self, webview_name=None):
        """
        Switch to a WebView context.

        Args:
            webview_name: Name of the WebView context. When falsy, the first
                context whose name contains ``'WEBVIEW'`` is used.

        Raises:
            Exception: No ``webview_name`` was given and no available context
                name contains ``'WEBVIEW'``.
        """
        if webview_name:
            self.driver.switch_to.context(webview_name)
            # Return here: without it a successful explicit switch would fall
            # through to the "not found" error below.
            return

        # Auto-detect the first WEBVIEW context.
        for context in self.get_all_contexts():
            if 'WEBVIEW' in context:
                self.driver.switch_to.context(context)
                print(f"已切换到上下文: {context}")
                return

        raise Exception("未找到WebView上下文")

    def switch_to_native(self):
        """Switch to the ``'NATIVE_APP'`` context."""
        self.driver.switch_to.context('NATIVE_APP')
        print("已切换到原生上下文")

    def get_webview_handles(self):
        """Return ``driver.window_handles``."""
        return self.driver.window_handles

    def switch_to_window(self, window_handle):
        """Switch to the window identified by ``window_handle``."""
        self.driver.switch_to.window(window_handle)

# Usage example (assumes an existing Appium `driver` session)
webview_mgr = WebViewContextManager(driver)

# List all contexts
contexts = webview_mgr.get_all_contexts()
print(f"可用上下文: {contexts}")

# Switch to the WebView
webview_mgr.switch_to_webview()

# Interact inside the WebView
# NOTE(review): `By` is not imported in the visible part of this file —
# confirm it is imported elsewhere (e.g. selenium.webdriver.common.by.By).
driver.find_element(By.ID, 'username').send_keys('test')

# Switch back to the native context
webview_mgr.switch_to_native()

8.2 自动WebView检测

class AutoWebViewDetector:
    """Detect WebView contexts and switch to them automatically."""

    def __init__(self, driver):
        self.driver = driver

    def wait_for_webview(self, timeout=30, poll_interval=1):
        """
        Poll ``driver.contexts`` until a WebView context shows up.

        The contexts are always checked at least once, so a WebView that is
        already available is found even with ``timeout=0``.

        Args:
            timeout: Timeout in seconds.
            poll_interval: Seconds to sleep between polls.

        Returns:
            str: Name of the first context containing ``'WEBVIEW'``.

        Raises:
            TimeoutError: No WebView context appeared before the deadline.
        """
        import time

        # Monotonic clock: unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout

        while True:
            for context in self.driver.contexts:
                if 'WEBVIEW' in context:
                    return context

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("等待WebView超时")
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

    def auto_switch_to_webview(self, timeout=30):
        """
        Wait for a WebView context and switch to it.

        Args:
            timeout: Timeout in seconds.

        Returns:
            str: Name of the context that was switched to.
        """
        webview_context = self.wait_for_webview(timeout)
        self.driver.switch_to.context(webview_context)
        print(f"自动切换到: {webview_context}")
        return webview_context

    def is_in_webview(self):
        """Return True when the current context name contains ``'WEBVIEW'``."""
        return 'WEBVIEW' in self.driver.current_context

    def ensure_webview_context(self):
        """Switch to a WebView context unless already in one."""
        if not self.is_in_webview():
            self.auto_switch_to_webview()

    def ensure_native_context(self):
        """Switch to ``'NATIVE_APP'`` when currently in a WebView."""
        if self.is_in_webview():
            self.driver.switch_to.context('NATIVE_APP')

# Usage example (assumes an existing Appium `driver` session)
detector = AutoWebViewDetector(driver)

# Tap the control that triggers the WebView to load
driver.find_element(AppiumBy.ID, 'com.example.app:id/open_web').click()

# Wait for the WebView and switch to it
detector.auto_switch_to_webview()

# Interact inside the WebView
# NOTE(review): `By` is not imported in the visible part of this file —
# confirm it is imported elsewhere (e.g. selenium.webdriver.common.by.By).
driver.find_element(By.ID, 'search').send_keys('Appium')

# Switch back to the native context
detector.ensure_native_context()

8.3 混合应用测试

class HybridAppTester:
    """Helpers for testing hybrid (native + WebView) applications."""

    def __init__(self, driver):
        self.driver = driver
        self.webview_mgr = WebViewContextManager(driver)

    def test_hybrid_flow(self, native_steps, webview_steps):
        """
        Run native steps, then WebView steps, then return to native.

        Args:
            native_steps: Iterable of zero-argument callables run in the
                native context.
            webview_steps: Iterable of zero-argument callables run in the
                WebView context.

        Any exception from a step or from a context switch propagates; the
        switch back to the native context is still attempted.
        """
        self.webview_mgr.switch_to_native()
        try:
            # Native part of the flow.
            for step in native_steps:
                step()

            # WebView part of the flow.
            self.webview_mgr.switch_to_webview()
            for step in webview_steps:
                step()
        finally:
            # Always leave the session in the native context.
            self.webview_mgr.switch_to_native()

    def find_element_across_contexts(self, locator, timeout=10):
        """
        Look for an element in the native context and in every WebView.

        Each round tries the native context first, then each context whose
        name contains ``'WEBVIEW'`` in turn. Rounds repeat every 0.5 seconds
        until ``timeout`` seconds have passed; at least one round always
        runs. On success the driver is left in the context where the element
        was found.

        Args:
            locator: Element locator as a ``(by, value)`` tuple.
            timeout: Overall timeout in seconds.

        Returns:
            The element returned by ``driver.find_element``.

        Raises:
            Exception: The element was not found in any context in time.
        """
        import time

        deadline = time.monotonic() + timeout

        while True:
            # Native context first.
            try:
                self.webview_mgr.switch_to_native()
                return self.driver.find_element(*locator)
            except Exception:
                pass

            # Then every WebView context. The try/except sits inside the
            # loop so a miss in one WebView does not skip the others.
            try:
                contexts = self.driver.contexts
            except Exception:
                contexts = []
            for context in contexts:
                if 'WEBVIEW' not in context:
                    continue
                try:
                    self.driver.switch_to.context(context)
                    return self.driver.find_element(*locator)
                except Exception:
                    continue

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.5, remaining))

        raise Exception(f"在所有上下文中都未找到元素: {locator}")

# Usage example (assumes an existing Appium `driver` session)
hybrid_tester = HybridAppTester(driver)

# Define the steps
def native_login():
    """Fill in the native login form and tap the login button."""
    driver.find_element(AppiumBy.ID, 'com.example.app:id/username').send_keys('user')
    driver.find_element(AppiumBy.ID, 'com.example.app:id/password').send_keys('pass')
    driver.find_element(AppiumBy.ID, 'com.example.app:id/login').click()

def webview_search():
    """Type a query into the WebView search box and click search."""
    # NOTE(review): `By` is not imported in the visible part of this file —
    # confirm it is imported elsewhere (e.g. selenium.webdriver.common.by.By).
    driver.find_element(By.ID, 'search_box').send_keys('test')
    driver.find_element(By.ID, 'search_btn').click()

# Run the hybrid flow
hybrid_tester.test_hybrid_flow(
    native_steps=[native_login],
    webview_steps=[webview_search]
)

8.4 弹窗处理

class PopupHandler:
    """Best-effort handlers for alerts and system dialogs.

    The handlers report failure through their return value instead of
    raising. Only ``Exception`` subclasses are suppressed, so
    ``KeyboardInterrupt`` / ``SystemExit`` still propagate.
    """

    def __init__(self, driver):
        self.driver = driver

    def _wait_for_alert(self, timeout):
        """Wait up to ``timeout`` seconds for an alert and return it."""
        return WebDriverWait(self.driver, timeout).until(
            EC.alert_is_present()
        )

    def handle_alert_accept(self, timeout=5):
        """Accept an alert; return True on success, False otherwise."""
        try:
            self._wait_for_alert(timeout).accept()
            return True
        except Exception:
            return False

    def handle_alert_dismiss(self, timeout=5):
        """Dismiss an alert; return True on success, False otherwise."""
        try:
            self._wait_for_alert(timeout).dismiss()
            return True
        except Exception:
            return False

    def get_alert_text(self, timeout=5):
        """Return the alert's text, or None when it cannot be read."""
        try:
            return self._wait_for_alert(timeout).text
        except Exception:
            return None

    def handle_android_permission_dialog(self, action='allow'):
        """
        Tap a button on an Android permission dialog.

        Args:
            action: ``'allow'`` taps an allow button; any other value
                (e.g. ``'deny'``) taps a deny button.

        Returns:
            True when one of the candidate buttons was clicked, else False.
        """
        from appium.webdriver.common.appiumby import AppiumBy

        if action == 'allow':
            locators = [
                (AppiumBy.ID, 'com.android.packageinstaller:id/permission_allow_button'),
                (AppiumBy.ID, 'android:id/button1'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="允许"]'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="始终允许"]'),
            ]
        else:
            locators = [
                (AppiumBy.ID, 'com.android.packageinstaller:id/permission_deny_button'),
                (AppiumBy.ID, 'android:id/button2'),
                (AppiumBy.XPATH, '//android.widget.Button[@text="拒绝"]'),
            ]

        # Candidates are tried in order, each with its own 2-second wait.
        for locator in locators:
            try:
                element = WebDriverWait(self.driver, 2).until(
                    EC.element_to_be_clickable(locator)
                )
                element.click()
                return True
            except Exception:
                continue

        return False

    def handle_ios_system_alert(self):
        """Run ``mobile: acceptAlert``; return True on success, else False."""
        try:
            self.driver.execute_script('mobile: acceptAlert', {})
            return True
        except Exception:
            return False

    def dismiss_all_popups(self, max_attempts=5):
        """
        Repeatedly try to close popups.

        Each round dismisses an alert (1-second wait) and then calls
        ``driver.back()``. The loop stops early only when neither succeeded.
        Because ``driver.back()`` counts as success whenever it does not
        raise, this can navigate back up to ``max_attempts`` times.

        Returns:
            Always True.
        """
        for _ in range(max_attempts):
            popup_closed = False

            # Try to dismiss an alert.
            if self.handle_alert_dismiss(timeout=1):
                popup_closed = True

            # Try the back action.
            try:
                self.driver.back()
                popup_closed = True
            except Exception:
                pass

            if not popup_closed:
                break

        return True

# Usage example (assumes an existing Appium `driver` session)
popup_handler = PopupHandler(driver)

# Handle a permission dialog
popup_handler.handle_android_permission_dialog(action='allow')

# Handle an alert
if popup_handler.handle_alert_accept():
    print("Alert已处理")

# Close all popups
popup_handler.dismiss_all_popups()

8.5 键盘与输入处理

class KeyboardInputHandler:
    """Keyboard and text-input helpers (key presses are Android-only)."""

    # Android KeyEvent key codes and meta state used by this class.
    _KEYCODE_HOME = 3
    _KEYCODE_BACK = 4
    _KEYCODE_0 = 7        # KEYCODE_0..KEYCODE_9 are 7..16
    _KEYCODE_A = 29       # KEYCODE_A..KEYCODE_Z are 29..54
    _KEYCODE_TAB = 61
    _KEYCODE_ENTER = 66
    _KEYCODE_DEL = 67
    _META_SHIFT_ON = 1
    _SYMBOL_KEYCODES = {
        ' ': 62,    # KEYCODE_SPACE
        ',': 55,    # KEYCODE_COMMA
        '.': 56,    # KEYCODE_PERIOD
        '-': 69,    # KEYCODE_MINUS
        '=': 70,    # KEYCODE_EQUALS
        ';': 74,    # KEYCODE_SEMICOLON
        "'": 75,    # KEYCODE_APOSTROPHE
        '/': 76,    # KEYCODE_SLASH
        '@': 77,    # KEYCODE_AT
        '\t': 61,   # KEYCODE_TAB
        '\n': 66,   # KEYCODE_ENTER
    }

    def __init__(self, driver):
        self.driver = driver

    @classmethod
    def _key_press_for(cls, char):
        """Map one character to ``(keycode, metastate)``.

        ``metastate`` is ``None`` unless Shift is required (uppercase
        letters). Raises ``ValueError`` for characters with no mapping.
        """
        if 'a' <= char <= 'z':
            return cls._KEYCODE_A + ord(char) - ord('a'), None
        if 'A' <= char <= 'Z':
            return cls._KEYCODE_A + ord(char) - ord('A'), cls._META_SHIFT_ON
        if '0' <= char <= '9':
            return cls._KEYCODE_0 + ord(char) - ord('0'), None
        if char in cls._SYMBOL_KEYCODES:
            return cls._SYMBOL_KEYCODES[char], None
        raise ValueError(f"No Android keycode mapping for character {char!r}")

    def hide_keyboard(self):
        """Hide the keyboard; return True on success, False on failure."""
        try:
            self.driver.hide_keyboard()
            return True
        except Exception:
            return False

    def is_keyboard_shown(self):
        """Return ``driver.is_keyboard_shown()``."""
        return self.driver.is_keyboard_shown()

    def press_key(self, keycode, metastate=None):
        """
        Press a key (Android).

        Args:
            keycode: Android key code.
            metastate: Meta state for key combinations; omitted from the
                driver call when falsy.
        """
        if metastate:
            self.driver.press_keycode(keycode, metastate)
        else:
            self.driver.press_keycode(keycode)

    def long_press_key(self, keycode):
        """Long-press a key (Android)."""
        self.driver.long_press_keycode(keycode)

    def send_keys_with_keyboard(self, text):
        """
        Type ``text`` one key press at a time (Android).

        Characters are translated to Android key codes; uppercase letters are
        sent with the Shift meta state. Code points must not be sent as key
        codes, because the two numberings differ (66 is 'B' in ASCII but
        ENTER as a key code).

        Raises:
            ValueError: ``text`` contains a character with no key code
                mapping. Nothing is typed in that case.
        """
        # Translate everything first so an unsupported character cannot
        # leave a half-typed string on the device.
        presses = [self._key_press_for(char) for char in text]
        for keycode, metastate in presses:
            self.press_key(keycode, metastate)

    def press_enter(self):
        """Press the Enter key."""
        self.driver.press_keycode(self._KEYCODE_ENTER)

    def press_back(self):
        """Press the Back key."""
        self.driver.press_keycode(self._KEYCODE_BACK)

    def press_home(self):
        """Press the Home key."""
        self.driver.press_keycode(self._KEYCODE_HOME)

    def press_tab(self):
        """Press the Tab key."""
        self.driver.press_keycode(self._KEYCODE_TAB)

    def press_delete(self):
        """Press the Delete key (KEYCODE_DEL)."""
        self.driver.press_keycode(self._KEYCODE_DEL)

    def clear_and_type(self, element, text):
        """
        Clear an element and type text into it.

        Args:
            element: Target element.
            text: Text to type.
        """
        element.clear()
        element.send_keys(text)

    def slow_type(self, element, text, interval=0.1):
        """
        Type text one character at a time with a pause after each.

        Args:
            element: Target element.
            text: Text to type.
            interval: Pause after each character, in seconds.
        """
        import time
        element.clear()
        for char in text:
            element.send_keys(char)
            time.sleep(interval)

# Android key code constants
class AndroidKeyCode:
    """Named integer constants for Android key codes."""

    # Navigation / system keys
    KEYCODE_HOME = 3
    KEYCODE_BACK = 4
    KEYCODE_MENU = 82

    # Hardware buttons
    KEYCODE_VOLUME_UP = 24
    KEYCODE_VOLUME_DOWN = 25
    KEYCODE_POWER = 26
    KEYCODE_CAMERA = 27

    # Text-editing keys
    KEYCODE_TAB = 61
    KEYCODE_ENTER = 66
    KEYCODE_DEL = 67

# Usage example (assumes an existing Appium `driver` session)
keyboard = KeyboardInputHandler(driver)

# Hide the keyboard if it is showing
if keyboard.is_keyboard_shown():
    keyboard.hide_keyboard()

# Press the Back key
keyboard.press_back()

# Type slowly, one character at a time
element = driver.find_element(AppiumBy.ID, 'com.example.app:id/input')
keyboard.slow_type(element, 'Hello World', interval=0.05)

8.6 文件传输

class FileTransferHandler:
    """Move files between the local machine and the device via the driver."""

    def __init__(self, driver):
        self.driver = driver

    def push_file(self, device_path, local_path):
        """
        Upload a local file to the device.

        Args:
            device_path: Destination path on the device.
            local_path: Path of the local file to read.
        """
        import base64

        with open(local_path, 'rb') as source:
            raw = source.read()
        # The driver is given the file content as base64 text.
        payload = base64.b64encode(raw).decode('utf-8')

        self.driver.push_file(device_path, payload)
        print(f"文件已推送: {local_path} -> {device_path}")

    def pull_file(self, device_path, local_path):
        """
        Download a file from the device.

        Args:
            device_path: Path of the file on the device.
            local_path: Local path the decoded bytes are written to.
        """
        import base64

        encoded = self.driver.pull_file(device_path)
        data = base64.b64decode(encoded)

        with open(local_path, 'wb') as target:
            target.write(data)

        print(f"文件已拉取: {device_path} -> {local_path}")

    def pull_folder(self, device_path, local_path):
        """
        Download a folder from the device and unpack it locally.

        The driver's result is base64-decoded and opened as a zip archive.

        Args:
            device_path: Path of the folder on the device.
            local_path: Local directory the archive is extracted into.
        """
        import base64
        import zipfile
        import io

        encoded = self.driver.pull_folder(device_path)
        archive_bytes = base64.b64decode(encoded)
        buffer = io.BytesIO(archive_bytes)

        with zipfile.ZipFile(buffer, 'r') as archive:
            archive.extractall(local_path)

        print(f"文件夹已拉取: {device_path} -> {local_path}")

    def push_text_as_file(self, device_path, text):
        """
        Upload a text string as a file on the device.

        Args:
            device_path: Destination path on the device.
            text: Text content; encoded as UTF-8 before upload.
        """
        import base64
        utf8_bytes = text.encode('utf-8')
        payload = base64.b64encode(utf8_bytes).decode('utf-8')
        self.driver.push_file(device_path, payload)

# Usage example (assumes an existing Appium `driver` session)
file_transfer = FileTransferHandler(driver)

# Push a file
file_transfer.push_file('/sdcard/test.txt', 'C:/local/test.txt')

# Pull a file
file_transfer.pull_file('/sdcard/test.txt', 'C:/local/downloaded.txt')

# Push text as a file
file_transfer.push_text_as_file('/sdcard/config.json', '{"key": "value"}')

第九章:测试框架与工程化

9.1 Page Object模式

from appium.webdriver.common.appiumby import AppiumBy
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

class BasePage:
    """Base class for page objects: a driver plus explicit-wait helpers.

    Locators are ``(by, value)`` tuples, unpacked into the driver's
    ``find_element`` / ``find_elements`` calls or passed to expected
    conditions.
    """

    def __init__(self, driver):
        self.driver = driver
        # Shared explicit wait (10 s) used by click / input_text / get_text.
        self.wait = WebDriverWait(driver, 10)

    def find_element(self, locator, timeout=None):
        """Return the element for ``locator``.

        With a truthy ``timeout`` the lookup waits up to that many seconds for
        the element to be present; otherwise it is a direct driver lookup.
        """
        if timeout:
            return WebDriverWait(self.driver, timeout).until(
                EC.presence_of_element_located(locator)
            )
        return self.driver.find_element(*locator)

    def find_elements(self, locator):
        """Return all elements matching ``locator``."""
        return self.driver.find_elements(*locator)

    def click(self, locator):
        """Wait until the element is clickable, then click it."""
        element = self.wait.until(EC.element_to_be_clickable(locator))
        element.click()

    def input_text(self, locator, text):
        """Wait until the element is visible, clear it and type ``text``."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        element.clear()
        element.send_keys(text)

    def get_text(self, locator):
        """Wait until the element is visible and return its text."""
        element = self.wait.until(EC.visibility_of_element_located(locator))
        return element.text

    def is_element_visible(self, locator, timeout=5):
        """Return True if the element becomes visible within ``timeout`` seconds.

        Only a wait timeout counts as "not visible". Other failures (for
        example a broken session) propagate, so they cannot be mistaken for
        the element being absent.
        """
        # Function-scope import keeps the fix local to this block.
        from selenium.common.exceptions import TimeoutException

        try:
            WebDriverWait(self.driver, timeout).until(
                EC.visibility_of_element_located(locator)
            )
        except TimeoutException:
            return False
        return True

    def wait_for_element_gone(self, locator, timeout=10):
        """Block until the element is invisible or absent, up to ``timeout`` seconds."""
        WebDriverWait(self.driver, timeout).until(
            EC.invisibility_of_element_located(locator)
        )

class LoginPage(BasePage):
    """Page object for the login screen."""

    # Locators of the widgets this page drives.
    _USERNAME_INPUT = (AppiumBy.ID, 'com.example.app:id/username')
    _PASSWORD_INPUT = (AppiumBy.ID, 'com.example.app:id/password')
    _LOGIN_BUTTON = (AppiumBy.ID, 'com.example.app:id/login_btn')
    _ERROR_MESSAGE = (AppiumBy.ID, 'com.example.app:id/error_msg')

    def enter_username(self, username):
        """Type ``username`` into the username field; returns self for chaining."""
        self.input_text(self._USERNAME_INPUT, username)
        return self

    def enter_password(self, password):
        """Type ``password`` into the password field; returns self for chaining."""
        self.input_text(self._PASSWORD_INPUT, password)
        return self

    def click_login(self):
        """Press the login button; returns self for chaining."""
        self.click(self._LOGIN_BUTTON)
        return self

    def login(self, username, password):
        """Fill in both credentials and submit the form; returns self."""
        self.enter_username(username)
        self.enter_password(password)
        self.click_login()
        return self

    def get_error_message(self):
        """Return the text of the error message element."""
        error_locator = self._ERROR_MESSAGE
        return self.get_text(error_locator)

    def is_login_successful(self):
        """Return True when no error message becomes visible within 3 seconds."""
        error_shown = self.is_element_visible(self._ERROR_MESSAGE, timeout=3)
        return not error_shown

class HomePage(BasePage):
    """Page object for the home screen."""

    # Locators of the widgets this page drives.
    _WELCOME_TEXT = (AppiumBy.ID, 'com.example.app:id/welcome')
    _MENU_BUTTON = (AppiumBy.ID, 'com.example.app:id/menu')
    _LOGOUT_BUTTON = (AppiumBy.ID, 'com.example.app:id/logout')

    def get_welcome_message(self):
        """Return the text of the welcome element."""
        welcome_locator = self._WELCOME_TEXT
        return self.get_text(welcome_locator)

    def open_menu(self):
        """Click the menu button; returns self for chaining."""
        self.click(self._MENU_BUTTON)
        return self

    def logout(self):
        """Open the menu, click logout, and return a LoginPage on the same driver."""
        self.open_menu()
        self.click(self._LOGOUT_BUTTON)
        next_page = LoginPage(self.driver)
        return next_page

# Usage example: log in, then read the welcome text if no error appeared
login_page = LoginPage(driver)
login_page.login('testuser', 'password123')

if login_page.is_login_successful():
    home_page = HomePage(driver)
    print(home_page.get_welcome_message())

9.2 数据驱动测试

import pytest
import yaml
import json
import csv

class DataProvider:
    """Loaders that read test data from YAML, JSON or CSV files."""

    @staticmethod
    def from_yaml(file_path):
        """Return the parsed content of a UTF-8 YAML file (via ``yaml.safe_load``)."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    @staticmethod
    def from_json(file_path):
        """Return the parsed content of a UTF-8 JSON file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def from_csv(file_path, delimiter=','):
        """Return the rows of a UTF-8 CSV file as a list of dicts.

        The first row supplies the keys.

        Args:
            file_path: Path of the CSV file.
            delimiter: Field separator, ``,`` by default.
        """
        # newline='' lets the csv module handle line endings itself, so
        # newlines embedded in quoted fields are preserved unchanged.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            return [dict(row) for row in csv.DictReader(f, delimiter=delimiter)]

# login_data.yaml 示例
"""
test_cases:
  - username: valid_user
    password: valid_pass
    expected: success
  - username: invalid_user
    password: invalid_pass
    expected: error
  - username: ""
    password: valid_pass
    expected: username_required
"""

# 使用数据驱动的测试
class TestLogin:
    """Data-driven login tests fed from ``login_data.yaml``."""

    @pytest.fixture(autouse=True)
    def setup(self, driver):
        """Create a fresh LoginPage for every test."""
        self.login_page = LoginPage(driver)

    @pytest.mark.parametrize("data", DataProvider.from_yaml('login_data.yaml')['test_cases'])
    def test_login_scenarios(self, data):
        """Log in with one data row and check the outcome it expects."""
        page = self.login_page
        page.login(data['username'], data['password'])

        # Any expectation other than 'success' only requires an error message.
        if data['expected'] != 'success':
            assert page.get_error_message() is not None
            return
        assert page.is_login_successful()

# 使用pytest运行
# pytest test_login.py -v --alluredir=./allure-results

9.3 测试配置管理

import os
from dataclasses import dataclass
from typing import Optional

@dataclass
class DeviceConfig:
    """Settings describing one target device.

    ConfigManager.get_device_config builds instances by unpacking a
    per-device mapping from the configuration, so the field names must match
    the keys used there.
    """
    # Required for every device.
    platform_name: str
    platform_version: str
    device_name: str
    automation_name: str
    # Optional; left as None when the configuration omits them.
    udid: Optional[str] = None
    app: Optional[str] = None
    app_package: Optional[str] = None
    app_activity: Optional[str] = None
    bundle_id: Optional[str] = None

@dataclass
class AppiumConfig:
    """Appium server address and timeout settings, each with a default."""
    server_url: str = 'http://localhost:4723'
    # NOTE(review): the three values below are presumably in seconds — confirm.
    implicit_wait: int = 10
    explicit_wait: int = 15
    command_timeout: int = 300

class ConfigManager:
    """Load a YAML configuration file and expose its sections.

    The file is expected to hold ``devices``, ``appium`` and ``test``
    sections. A missing or empty section is treated as empty.
    """

    def __init__(self, config_file='config.yaml'):
        self.config = self._load_config(config_file)

    def _load_config(self, file_path):
        """Parse the YAML file at ``file_path`` and return its mapping."""
        import yaml
        with open(file_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty document; normalise to {}
            # so the accessors below can always call .get().
            return yaml.safe_load(f) or {}

    def get_device_config(self, device_name):
        """Return the DeviceConfig registered under ``device_name``.

        Raises:
            ValueError: If the device, or the whole ``devices`` section, is
                missing from the configuration.
        """
        devices = self.config.get('devices') or {}
        device_data = devices.get(device_name)
        if not device_data:
            raise ValueError(f"设备配置不存在: {device_name}")
        return DeviceConfig(**device_data)

    def get_appium_config(self):
        """Return an AppiumConfig built from the ``appium`` section."""
        return AppiumConfig(**(self.config.get('appium') or {}))

    def get_test_config(self):
        """Return the ``test`` section as a dict (empty when absent)."""
        return self.config.get('test') or {}

# config.yaml 示例
"""
devices:
  android_emulator:
    platform_name: Android
    platform_version: '13'
    device_name: emulator-5554
    automation_name: UiAutomator2
    app_package: com.example.app
    app_activity: .MainActivity
  
  ios_simulator:
    platform_name: iOS
    platform_version: '16.0'
    device_name: iPhone 14
    automation_name: XCUITest
    bundle_id: com.example.app

appium:
  server_url: http://localhost:4723
  implicit_wait: 10
  explicit_wait: 15
  command_timeout: 300

test:
  retry_count: 2
  screenshot_on_failure: true
  video_recording: true
"""

# Usage example: load config.yaml and read the device and Appium sections
config_mgr = ConfigManager()
device_config = config_mgr.get_device_config('android_emulator')
appium_config = config_mgr.get_appium_config()

9.4 测试报告

import pytest
import allure
from datetime import datetime

class AllureReporter:
    """Thin static helpers around the ``allure`` reporting API."""

    @staticmethod
    def step(title):
        """Return ``allure.step(title)``."""
        return allure.step(title)

    @staticmethod
    def attach_screenshot(driver, name='截图'):
        """Attach the driver's current screenshot to the report as a PNG."""
        png_bytes = driver.get_screenshot_as_png()
        allure.attach(png_bytes, name=name, attachment_type=allure.attachment_type.PNG)

    @staticmethod
    def attach_text(text, name='日志'):
        """Attach ``text`` to the report as plain text."""
        kind = allure.attachment_type.TEXT
        allure.attach(text, name=name, attachment_type=kind)

    @staticmethod
    def attach_html(html, name='HTML'):
        """Attach ``html`` to the report as an HTML document."""
        kind = allure.attachment_type.HTML
        allure.attach(html, name=name, attachment_type=kind)

    @staticmethod
    def feature(name):
        """Return ``allure.feature(name)``."""
        return allure.feature(name)

    @staticmethod
    def story(name):
        """Return ``allure.story(name)``."""
        return allure.story(name)

    @staticmethod
    def severity(level):
        """Return ``allure.severity(level)``."""
        return allure.severity(level)

# 使用Allure的测试示例
@allure.feature('登录功能')
@allure.story('用户登录')
class TestLoginWithReport:
    """Login test that records each action as an Allure step."""
    
    @pytest.fixture(autouse=True)
    def setup(self, driver):
        """Keep the driver (for screenshots) and build a LoginPage per test."""
        self.driver = driver
        self.login_page = LoginPage(driver)
    
    @allure.severity(allure.severity_level.CRITICAL)
    @allure.title('正常登录测试')
    def test_login_success(self):
        """Log in with valid credentials, one report step per action."""
        with AllureReporter.step('输入用户名'):
            self.login_page.enter_username('testuser')
            # Screenshot is attached inside the step block.
            AllureReporter.attach_screenshot(self.driver, '输入用户名后')
        
        with AllureReporter.step('输入密码'):
            self.login_page.enter_password('password123')
        
        with AllureReporter.step('点击登录按钮'):
            self.login_page.click_login()
        
        with AllureReporter.step('验证登录成功'):
            assert self.login_page.is_login_successful()
            # Only reached when the assertion above passes.
            AllureReporter.attach_screenshot(self.driver, '登录成功')

# pytest.ini 配置
"""
[pytest]
addopts = --alluredir=./allure-results --clean-alluredir
testpaths = ./tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
"""

9.5 CI/CD集成

# Example .gitlab-ci.yml
# Two stages: run the per-platform test jobs, then build the Allure report.
stages:
  - test
  - report

# Android suite; raw Allure results are kept as an artifact for one week.
# NOTE(review): the python:3.9 image presumably ships no Appium CLI, so the
# `appium driver install` step may need a different image — confirm.
android_test:
  stage: test
  image: python:3.9
  before_script:
    - pip install -r requirements.txt
    - appium driver install uiautomator2
  script:
    - pytest tests/android/ --alluredir=allure-results
  artifacts:
    paths:
      - allure-results/
    expire_in: 1 week

# iOS suite; runs only on runners tagged `macos`.
ios_test:
  stage: test
  tags:
    - macos
  before_script:
    - pip install -r requirements.txt
    - appium driver install xcuitest
  script:
    - pytest tests/ios/ --alluredir=allure-results
  artifacts:
    paths:
      - allure-results/
    expire_in: 1 week

# Turns allure-results into allure-report, kept as an artifact for one month.
generate_report:
  stage: report
  image: frankescobar/allure-docker-service
  script:
    - allure generate allure-results -o allure-report
  artifacts:
    paths:
      - allure-report/
    expire_in: 1 month
# Jenkinsfile 示例
// Declarative pipeline: install dependencies, run the Android and iOS
// suites in parallel, publish the Allure report, then clean the workspace.
pipeline {
    agent any
    
    stages {
        // Install Python dependencies and the Android Appium driver.
        stage('Setup') {
            steps {
                sh 'pip install -r requirements.txt'
                sh 'appium driver install uiautomator2'
            }
        }
        
        stage('Test') {
            parallel {
                stage('Android Tests') {
                    steps {
                        sh 'pytest tests/android/ --alluredir=allure-results'
                    }
                }
                // NOTE(review): this stage runs on a separate 'macos' agent;
                // confirm its allure-results reach the Report stage and that
                // the xcuitest driver is installed on that agent.
                stage('iOS Tests') {
                    agent { label 'macos' }
                    steps {
                        sh 'pytest tests/ios/ --alluredir=allure-results'
                    }
                }
            }
        }
        
        // Publish the Allure report from the allure-results directory.
        stage('Report') {
            steps {
                allure includeProperties: false, jdk: '', results: [[path: 'allure-results']]
            }
        }
    }
    
    post {
        // Runs whatever the build result is.
        always {
            cleanWs()
        }
    }
}

第十章:生产环境最佳实践

10.1 Driver初始化优化

from appium import webdriver
from appium.options.android import UiAutomator2Options
from appium.options.ios import XCUITestOptions
import logging
import time

class DriverFactory:
    """Factory for Appium driver sessions with connection retries.

    Instantiating the class always returns the same object, but every method
    is static, so the class can also be used directly.
    """

    _instance = None
    _driver = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _connect(options, platform_label, device_name, retries, server_url,
                 implicit_wait):
        """Open a session for ``options``, retrying up to ``retries`` times.

        A session that was created but failed during setup is quit before the
        next attempt, so retries do not leak sessions on the server.
        """
        for attempt in range(1, retries + 1):
            driver = None
            try:
                driver = webdriver.Remote(server_url, options=options)
                driver.implicitly_wait(implicit_wait)
                logging.info(f"{platform_label} Driver创建成功: {device_name}")
                return driver
            except Exception as e:
                logging.warning(f"第{attempt}次创建Driver失败: {e}")
                if driver is not None:
                    DriverFactory.quit_driver(driver)
                if attempt == retries:
                    raise
                # Fixed pause before the next attempt.
                time.sleep(5)

    @staticmethod
    def create_android_driver(config, retries=3,
                              server_url='http://localhost:4723',
                              implicit_wait=10):
        """
        Create an Android (UiAutomator2) driver.

        Args:
            config: Device configuration; read via attributes
                (``platform_name``, ``device_name``, ``app_package``, ...).
            retries: Number of connection attempts; must be at least 1.
            server_url: Appium server URL.
            implicit_wait: Implicit wait applied to the new session, in seconds.

        Returns:
            The connected driver.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: The error of the last attempt when all attempts fail.
        """
        if retries < 1:
            raise ValueError(f"retries must be >= 1, got {retries}")

        options = UiAutomator2Options()
        options.platform_name = config.platform_name
        options.platform_version = config.platform_version
        options.device_name = config.device_name
        options.automation_name = config.automation_name
        options.app_package = config.app_package
        options.app_activity = config.app_activity
        options.no_reset = False
        options.new_command_timeout = 300
        options.uiautomator2_server_launch_timeout = 60000
        options.uiautomator2_server_install_timeout = 60000
        options.adb_exec_timeout = 60000
        options.skip_server_installation = False
        options.disable_window_animation = True
        options.ignore_unimportant_views = True
        options.enable_performance_logging = True

        return DriverFactory._connect(
            options, 'Android', config.device_name,
            retries, server_url, implicit_wait,
        )

    @staticmethod
    def create_ios_driver(config, retries=3,
                          server_url='http://localhost:4723',
                          implicit_wait=10):
        """
        Create an iOS (XCUITest) driver.

        Args:
            config: Device configuration; read via attributes
                (``platform_name``, ``device_name``, ``bundle_id``, ...).
            retries: Number of connection attempts; must be at least 1.
            server_url: Appium server URL.
            implicit_wait: Implicit wait applied to the new session, in seconds.

        Returns:
            The connected driver.

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: The error of the last attempt when all attempts fail.
        """
        if retries < 1:
            raise ValueError(f"retries must be >= 1, got {retries}")

        options = XCUITestOptions()
        options.platform_name = config.platform_name
        options.platform_version = config.platform_version
        options.device_name = config.device_name
        options.automation_name = config.automation_name
        options.bundle_id = config.bundle_id
        options.no_reset = False
        options.new_command_timeout = 300
        options.wda_launch_timeout = 120000
        options.wda_connection_timeout = 240000
        options.wda_startup_retries = 4
        options.wda_startup_retry_interval = 20000
        options.use_new_wda = False
        options.use_prebuilt_wda = True
        options.skip_wda_installation = False
        options.show_ios_log = False
        options.auto_accept_alerts = True

        return DriverFactory._connect(
            options, 'iOS', config.device_name,
            retries, server_url, implicit_wait,
        )

    @staticmethod
    def quit_driver(driver):
        """Quit ``driver`` if one is given; errors are logged, never raised."""
        if driver:
            try:
                driver.quit()
                logging.info("Driver已安全退出")
            except Exception as e:
                logging.warning(f"退出Driver时出错: {e}")

# Usage example: build an Android driver from a device configuration
factory = DriverFactory()
driver = factory.create_android_driver(device_config)

10.2 错误处理与重试

import functools
import logging
import time
from typing import Callable, Type, Tuple

def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    on_retry: Callable = None
):
    """
    Decorator that re-runs a function when it raises.

    Args:
        max_attempts: Total number of calls to make; must be at least 1.
        delay: Seconds to sleep between attempts.
        exceptions: Exception types that trigger another attempt. Anything
            else propagates immediately.
        on_retry: Optional callback invoked as ``on_retry(attempt, error)``
            after every failed attempt, including the last one. ``attempt``
            is zero-based.

    Returns:
        A decorator. The wrapped function returns the first successful result
        or re-raises the error of the final attempt.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
    """
    # Reject a non-positive count up front: the loop below would never call
    # the function and would have nothing to return or raise.
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    logging.warning(
                        f"函数 {func.__name__} 第{attempt + 1}次执行失败: {e}"
                    )

                    if on_retry:
                        on_retry(attempt, e)

                    # Out of attempts: surface the final error unchanged.
                    if attempt == max_attempts - 1:
                        raise
                    time.sleep(delay)

        return wrapper
    return decorator

class RobustElementOperations:
    """Element operations that combine explicit waits with retries."""

    def __init__(self, driver):
        self.driver = driver

    @retry(max_attempts=3, delay=1.0)
    def find_and_click(self, locator):
        """Wait up to 10 s for a clickable element, click it and return it."""
        waiter = WebDriverWait(self.driver, 10)
        target = waiter.until(EC.element_to_be_clickable(locator))
        target.click()
        return target

    @retry(max_attempts=3, delay=0.5)
    def find_and_input(self, locator, text):
        """Wait up to 10 s for a visible element, replace its content with ``text``."""
        waiter = WebDriverWait(self.driver, 10)
        field = waiter.until(EC.visibility_of_element_located(locator))
        field.clear()
        field.send_keys(text)
        return field

    def safe_click(self, locator, ignore_errors=True):
        """Click via ``find_and_click``; on failure log and return None.

        With ``ignore_errors=False`` the failure is logged and re-raised.
        """
        try:
            return self.find_and_click(locator)
        except Exception as e:
            logging.warning(f"点击失败: {e}")
            if ignore_errors:
                return None
            raise

# Usage example
robust_ops = RobustElementOperations(driver)

# Click with retries
robust_ops.find_and_click((AppiumBy.ID, 'com.example.app:id/button'))

# Best-effort click: failures are logged and ignored
robust_ops.safe_click((AppiumBy.ID, 'com.example.app:id/optional_button'))

10.3 性能监控

import time
import logging
from functools import wraps
from contextlib import contextmanager

class PerformanceMonitor:
    """Collects operation timings and device statistics through the driver.

    Timings are stored in ``metrics`` as ``{operation_name: seconds}``. The
    device queries run shell commands through the driver's ``mobile: shell``
    script.
    """

    def __init__(self, driver):
        self.driver = driver
        self.metrics = {}

    @contextmanager
    def measure_time(self, operation_name):
        """Time the ``with`` body and store the duration under ``operation_name``.

        The duration is recorded and logged even when the body raises.
        """
        # perf_counter is monotonic, so clock adjustments cannot skew results.
        started = time.perf_counter()
        try:
            yield
        finally:
            elapsed_time = time.perf_counter() - started
            self.metrics[operation_name] = elapsed_time
            logging.info(f"{operation_name} 耗时: {elapsed_time:.2f}秒")

    def _shell(self, command):
        """Run ``command`` through the driver's ``mobile: shell`` script."""
        return self.driver.execute_script('mobile: shell', {
            'command': command
        })

    def get_memory_info(self, package='com.example.app'):
        """Return ``dumpsys meminfo`` output for ``package`` (Android)."""
        return self._shell(f'dumpsys meminfo {package}')

    def get_cpu_usage(self, package='com.example.app'):
        """Return ``dumpsys cpuinfo`` output for ``package`` (Android)."""
        return self._shell(f'dumpsys cpuinfo {package}')

    def get_battery_info(self):
        """Return ``dumpsys battery`` output (Android)."""
        return self._shell('dumpsys battery')

    def get_network_stats(self):
        """Return the content of ``/proc/net/dev`` (Android)."""
        return self._shell('cat /proc/net/dev')

    def start_performance_recording(self):
        """Start a screen recording."""
        self.driver.start_recording_screen()

    def stop_performance_recording(self):
        """Stop the screen recording and return what the driver reports."""
        return self.driver.stop_recording_screen()

    def get_page_source_size(self):
        """Return the length of the current page source string."""
        return len(self.driver.page_source)

    def get_metrics_summary(self):
        """Return the recorded timings plus their total and average.

        The average is 0 when nothing has been recorded.
        """
        total = sum(self.metrics.values())
        count = len(self.metrics)
        return {
            'operations': self.metrics,
            'total_time': total,
            'avg_time': total / count if count else 0
        }

def performance_decorator(func):
    """Decorator that logs how long each call to ``func`` takes.

    The duration is logged at INFO level whether the call returns or raises.
    The return value or exception of ``func`` passes through unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Monotonic clock: unaffected by system time adjustments.
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_time = time.perf_counter() - started
            logging.info(f"函数 {func.__name__} 执行耗时: {elapsed_time:.2f}秒")
    return wrapper

# Usage example: time the login flow, then print the collected metrics
perf_monitor = PerformanceMonitor(driver)

with perf_monitor.measure_time('登录操作'):
    login_page.login('user', 'pass')

print(perf_monitor.get_metrics_summary())

10.4 日志与调试

import logging
import os
from datetime import datetime

class TestLogger:
    """Process-wide logger that writes to a timestamped file and the console.

    The first instantiation configures the ``AppiumTest`` logger. Later
    instantiations return the same object and ignore ``log_dir``.
    """

    # The name starts with "Test"; tell pytest this is a helper, not a test
    # class, so importing it into a test module is not treated as a test.
    __test__ = False

    _instance = None

    def __new__(cls, log_dir='logs'):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialize(log_dir)
            # Publish only after successful setup, so a failed setup never
            # leaves a half-initialised singleton behind.
            cls._instance = instance
        return cls._instance

    def _initialize(self, log_dir):
        """Create ``log_dir`` if needed and attach file and console handlers."""
        os.makedirs(log_dir, exist_ok=True)

        log_file = os.path.join(
            log_dir,
            f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        )

        self.logger = logging.getLogger('AppiumTest')
        self.logger.setLevel(logging.DEBUG)

        # File handler keeps everything from DEBUG up.
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)

        # Console handler shows INFO and above.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def info(self, message):
        """Log ``message`` at INFO level."""
        self.logger.info(message)

    def warning(self, message):
        """Log ``message`` at WARNING level."""
        self.logger.warning(message)

    def error(self, message):
        """Log ``message`` at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log ``message`` at DEBUG level."""
        self.logger.debug(message)

    def step(self, step_name):
        """Log a test-step banner at INFO level."""
        self.info(f"=== 步骤: {step_name} ===")

class DebugHelper:
    """Debugging aids: screenshots, page-source dumps and element inspection.

    Artifacts are written into ``screenshot_dir``. Messages go through the
    shared TestLogger.
    """

    def __init__(self, driver, screenshot_dir='screenshots'):
        self.driver = driver
        self.screenshot_dir = screenshot_dir
        self.logger = TestLogger()

        os.makedirs(screenshot_dir, exist_ok=True)

    def _artifact_path(self, name, extension):
        """Build ``<screenshot_dir>/<name>.<extension>``; name defaults to a timestamp."""
        if name is None:
            name = datetime.now().strftime('%Y%m%d_%H%M%S')
        return os.path.join(self.screenshot_dir, f"{name}.{extension}")

    def take_screenshot(self, name=None):
        """Save a PNG screenshot and return its path."""
        screenshot_path = self._artifact_path(name, 'png')
        self.driver.save_screenshot(screenshot_path)
        self.logger.info(f"截图已保存: {screenshot_path}")
        return screenshot_path

    def save_page_source(self, name=None):
        """Write the current page source to an XML file and return its path."""
        source_path = self._artifact_path(name, 'xml')

        with open(source_path, 'w', encoding='utf-8') as f:
            f.write(self.driver.page_source)

        self.logger.info(f"页面源码已保存: {source_path}")
        return source_path

    def debug_element(self, locator):
        """Log the element's main properties and return it.

        Returns None, after logging the error, when the lookup or any
        property read fails.
        """
        try:
            element = self.driver.find_element(*locator)
            self.logger.info(f"元素找到: {locator}")
            self.logger.info(f"  - 文本: {element.text}")
            self.logger.info(f"  - 位置: {element.location}")
            self.logger.info(f"  - 大小: {element.size}")
            self.logger.info(f"  - 可见: {element.is_displayed()}")
            self.logger.info(f"  - 启用: {element.is_enabled()}")
            self.logger.info(f"  - 选中: {element.is_selected()}")
            return element
        except Exception as e:
            self.logger.error(f"元素未找到: {locator}, 错误: {e}")
            return None

    def highlight_and_screenshot(self, locator, name=None):
        """Outline the element in red, take a screenshot, then remove the outline.

        The outline is removed even when the screenshot fails.
        """
        # NOTE(review): the highlight sets a CSS border through JavaScript,
        # which presumably only works in a web/WebView context — confirm.
        element = self.driver.find_element(*locator)
        self.driver.execute_script(
            'arguments[0].style.border = "3px solid red"',
            element
        )
        try:
            return self.take_screenshot(name)
        finally:
            self.driver.execute_script(
                'arguments[0].style.border = ""',
                element
            )

# Usage example: log a step and capture a screenshot before logging in
logger = TestLogger()
debug_helper = DebugHelper(driver)

logger.step("开始登录测试")
logger.info("输入用户名")
debug_helper.take_screenshot('before_login')

10.5 并行测试

import pytest
from concurrent.futures import ThreadPoolExecutor
from appium import webdriver

class ParallelTestExecutor:
    """Run one test function against several devices in parallel threads.

    ``devices_config`` maps a device name to its configuration, given either
    as a DeviceConfig or as a plain mapping with the same keys.
    """

    def __init__(self, devices_config):
        self.devices_config = devices_config

    def create_driver_for_device(self, device_name):
        """Create a driver for ``device_name``.

        Raises:
            ValueError: If no configuration exists for ``device_name``.
        """
        raw = self.devices_config.get(device_name)
        if raw is None:
            raise ValueError(f"设备配置不存在: {device_name}")

        # DriverFactory reads attributes, so plain mappings (for example
        # parsed from YAML) are converted to DeviceConfig first.
        config = DeviceConfig(**raw) if isinstance(raw, dict) else raw

        if config.platform_name == 'Android':
            return DriverFactory.create_android_driver(config)
        return DriverFactory.create_ios_driver(config)

    def run_test_on_device(self, device_name, test_func):
        """Run ``test_func(driver)`` on one device and return a result dict.

        The dict always has ``device`` and ``status`` ('passed' or 'failed'),
        plus ``result`` on success or ``error`` (message text) on failure.
        The driver is always quit.
        """
        driver = None
        try:
            driver = self.create_driver_for_device(device_name)
            result = test_func(driver)
            return {'device': device_name, 'result': result, 'status': 'passed'}
        except Exception as e:
            return {'device': device_name, 'error': str(e), 'status': 'failed'}
        finally:
            if driver:
                DriverFactory.quit_driver(driver)

    def run_tests_parallel(self, test_func, max_workers=4):
        """Run ``test_func`` on every configured device.

        Returns the result dicts in the iteration order of ``devices_config``.
        """
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.run_test_on_device, device_name, test_func)
                for device_name in self.devices_config
            ]
            return [future.result() for future in futures]

# pytest-xdist 并行测试配置
# pytest.ini
"""
[pytest]
addopts = -n auto --dist=loadscope
"""

# conftest.py
import pytest

@pytest.fixture(scope="session")
def driver(request):
    """Session-scoped fixture that yields a driver and quits it at teardown.

    The device configuration name is read from the ``--device`` command-line
    option.
    """
    device_name = request.config.getoption("--device")
    # NOTE(review): load_device_config is not defined in the visible part of
    # this file — confirm where it comes from.
    config = load_device_config(device_name)
    # NOTE(review): always builds an Android driver, whatever platform the
    # configuration names — confirm this is intended.
    driver = DriverFactory.create_android_driver(config)
    
    yield driver
    
    # Teardown: runs after the last test that used the fixture.
    DriverFactory.quit_driver(driver)

def pytest_addoption(parser):
    """Register the ``--device`` command-line option.

    The value selects the device configuration to run against and defaults
    to ``android_emulator``.
    """
    parser.addoption(
        "--device",
        action="store",
        default="android_emulator",
        help="Name of the device configuration to run against "
             "(default: android_emulator)",
    )

# 运行命令
# pytest tests/ -n 4 --device=android_emulator

10.6 测试数据管理

import json
import os
from typing import Any, Dict

class TestDataManager:
    """Load JSON test data from a directory, with caching, and save results.

    Parsed files are cached per file name, so repeated reads return the same
    object.
    """

    # The name starts with "Test"; tell pytest this is a helper, not a test
    # class, so importing it into a test module causes no collection warning.
    __test__ = False

    def __init__(self, data_dir='test_data'):
        self.data_dir = data_dir
        self.cache = {}

    def load_json(self, file_name):
        """Return the parsed JSON of ``data_dir/file_name``, reading it once."""
        if file_name not in self.cache:
            file_path = os.path.join(self.data_dir, file_name)
            with open(file_path, 'r', encoding='utf-8') as f:
                self.cache[file_name] = json.load(f)
        return self.cache[file_name]

    def get_test_data(self, file_name, key=None):
        """Return the whole document, or ``document.get(key)`` for a truthy key."""
        data = self.load_json(file_name)
        if key:
            return data.get(key)
        return data

    def save_test_result(self, file_name, result):
        """Write ``result`` as indented JSON to ``data_dir/results/file_name``.

        The target directory is created when missing.
        """
        file_path = os.path.join(self.data_dir, 'results', file_name)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

# test_data/users.json 示例
"""
{
    "valid_users": [
        {"username": "user1", "password": "pass1"},
        {"username": "user2", "password": "pass2"}
    ],
    "invalid_users": [
        {"username": "invalid", "password": "wrong"}
    ]
}
"""

# Usage example: read the "valid_users" list from test_data/users.json
data_mgr = TestDataManager()
valid_users = data_mgr.get_test_data('users.json', 'valid_users')
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐