自动化测试 AI Agent:Harness 测试框架构建

1. 标题 (Title)

  • 从零构建智能测试系统:Harness 框架下的 AI Agent 自动化测试实战
  • AI 驱动的软件质量革命:手把手教你用 Harness 构建自动化测试 Agent
  • 未来测试已来:Harness 测试框架与 AI Agent 的完美融合指南
  • 智能测试工程师之路:基于 Harness 的自动化测试 AI Agent 开发完全手册

2. 引言 (Introduction)

痛点引入 (Hook)

在当今快速迭代的软件开发环境中,你是否曾遇到过这样的困境:新版本发布前夜,测试团队还在熬夜手动执行回归测试;随着产品功能不断增加,测试用例库变得越来越庞大,维护成本居高不下;明明已经编写了大量的自动化测试脚本,却依然有漏网之鱼,生产环境的bug依旧层出不穷?

传统的自动化测试往往依赖于人工编写的脚本,它们虽然能执行重复性任务,但缺乏灵活性和智能性。当UI发生变化、业务逻辑调整时,我们不得不花费大量时间去修复那些"脆弱"的测试脚本。更重要的是,这些脚本无法主动思考、无法预测风险、无法从历史数据中学习并优化测试策略。

文章内容概述 (What)

本文将带你进入一个全新的测试时代——AI驱动的自动化测试时代。我们将深入探讨如何利用Harness这个现代化的持续交付平台,结合前沿的AI技术,构建一个具备自主决策、自我学习能力的测试AI Agent。

我们将从基础概念讲起,逐步深入到系统架构设计、核心组件实现、AI算法集成,最后通过一个完整的实战项目,让你掌握从0到1构建智能测试系统的全部技能。

读者收益 (Why)

读完本文,你将不仅仅是一个会编写测试脚本的测试工程师,而是一名能够设计和实现智能测试系统的架构师。你将学会:

  • 理解AI Agent在自动化测试中的核心价值和应用场景
  • 掌握Harness测试框架的高级用法和扩展机制
  • 设计并实现一个可扩展的测试AI Agent架构
  • 集成机器学习模型来优化测试用例选择和执行
  • 利用自然语言处理技术实现测试用例的自动生成
  • 构建一个能够自我学习、持续进化的智能测试系统

这不仅能显著提升你的工作效率和测试质量,更能让你在AI与软件测试融合的浪潮中占据先机。


3. 准备工作 (Prerequisites)

在开始这段精彩的旅程之前,让我们确保你已经准备好了必要的装备。

技术栈/知识要求

  1. 编程基础:熟悉Python或JavaScript(我们的示例将主要使用Python),了解面向对象编程思想。
  2. 软件测试基础:理解软件测试的基本概念、测试类型(单元测试、集成测试、端到端测试等)、测试用例设计方法。
  3. CI/CD概念:了解持续集成、持续交付的基本原理和流程。
  4. 机器学习基础(可选但推荐):对监督学习、强化学习、自然语言处理有基本了解将大大帮助你理解本文的高级部分。
  5. API开发基础:了解RESTful API的设计和使用。

环境/工具准备

  1. 开发环境

    • 安装Python 3.8+(推荐使用3.9或3.10版本)
    • 安装Node.js 16+(如需使用Harness的某些JavaScript组件)
    • 一个好用的IDE或代码编辑器(PyCharm、VS Code等)
  2. Harness平台

    • 注册一个Harness账号(https://harness.io/,提供免费试用)
    • 熟悉Harness的基本界面和操作流程
  3. 其他工具

    • Git(用于版本控制)
    • Docker(可选,用于环境隔离和部署)
    • Postman或类似工具(用于API测试)

准备好了吗?让我们开始这段智能测试的探索之旅!


4. 核心内容:手把手实战 (Step-by-Step Tutorial)

核心概念

在开始动手之前,让我们先明确几个核心概念,这将帮助我们更好地理解后续的内容。

什么是AI Agent?

AI Agent(智能代理)是一种能够感知环境、做出决策并执行行动的智能系统。它具有以下几个关键特征:

  1. 自主性:能够在没有人类干预的情况下运行
  2. 反应性:能够感知环境变化并做出及时响应
  3. 主动性:能够主动设定目标并采取行动实现目标
  4. 社交能力:能够与其他Agent或人类进行交互

在测试领域,一个理想的AI Agent应该能够:

  • 自动理解应用的功能和结构
  • 智能生成测试用例
  • 自主执行测试并分析结果
  • 从错误中学习并优化测试策略
  • 预测潜在的问题区域
Harness测试框架简介

Harness是一个现代化的软件交付平台,它提供了从代码提交到生产部署的完整CI/CD解决方案。在测试方面,Harness提供了以下核心能力:

  1. 测试智能:利用机器学习分析测试结果,识别不稳定的测试和高风险变更
  2. 测试编排:灵活编排各种类型的测试执行
  3. 服务依赖映射:理解系统各部分之间的依赖关系
  4. 可观测性集成:与主流的监控和日志系统集成

Harness的架构非常灵活,允许我们通过插件和API进行扩展,这使得它成为构建测试AI Agent的理想平台。

自动化测试的演进历史

让我们通过一个表格来看看自动化测试是如何一步步发展到今天的AI驱动测试的:

阶段 主要特征 核心技术 优点 局限性
手动测试 完全依赖人工执行 人类智慧 灵活性高,能发现复杂问题 效率低,容易出错,成本高
记录回放 记录用户操作并回放 自动化工具(如QTP) 入门门槛低,快速实现自动化 维护成本高,脚本脆弱
脚本化测试 人工编写测试脚本 编程语言+测试框架 灵活性高,可维护性较好 需要编程技能,编写成本高
数据驱动/关键字驱动 分离测试逻辑和数据 高级测试框架 提高复用性,降低维护成本 仍需大量人工维护
AI驱动测试(当前) AI辅助测试生成、执行和分析 机器学习、计算机视觉、NLP 减少人工干预,提高覆盖率,自我优化 技术复杂度高,需要数据积累
完全自主测试(未来) AI完全自主完成测试全流程 强AI、自主学习系统 理论上可以完全替代人工测试 仍在研究阶段,伦理和责任问题

问题背景

传统自动化测试面临的挑战

在当今的软件开发环境中,传统自动化测试面临着前所未有的挑战:

  1. 维护成本爆炸:随着应用的迭代,UI元素和业务逻辑频繁变化,导致测试脚本大量失效,维护成本甚至超过了开发成本。研究表明,在某些项目中,自动化测试的维护成本可能占总测试成本的70%以上。

  2. 测试覆盖困境:如何在有限的时间内选择最有效的测试用例?传统的方法往往依赖测试工程师的经验,容易遗漏重要的测试场景,或者执行大量冗余的测试。

  3. 结果分析瓶颈:每次测试执行后产生大量的数据,人工分析这些数据不仅耗时,而且容易忽略重要的模式和趋势。

  4. 人才短缺:优秀的自动化测试工程师需要同时具备测试思维和编程能力,这样的人才在市场上非常稀缺。

这些挑战导致了一个尴尬的局面:虽然我们投入了大量资源在自动化测试上,但软件质量并没有得到预期的提升,发布速度也受到了限制。

AI技术为测试领域带来的机遇

幸运的是,近年来AI技术的快速发展为解决这些问题提供了新的思路:

  1. 计算机视觉:可以用于识别UI元素,即使UI发生变化也能稳健地定位元素,大大提高测试脚本的稳定性。

  2. 自然语言处理(NLP):可以分析需求文档、用户故事,自动生成测试用例;还可以分析测试失败日志,自动分类问题并提供修复建议。

  3. 机器学习:可以从历史测试数据中学习,预测哪些变更最可能引入bug,从而优化测试用例的选择和执行顺序。

  4. 强化学习:可以让Agent通过与应用的交互自动探索应用功能,发现潜在的问题。

当这些AI技术与一个强大的测试平台(如Harness)结合时,我们就可以构建出一个真正智能的测试系统,一个能够自我学习、持续进化的测试AI Agent。


问题描述

现在,让我们来明确一下我们要解决的具体问题:

我们的目标是构建一个基于Harness的测试AI Agent系统,该系统应能够:

  1. 智能测试生成:能够分析应用的结构、需求文档和历史数据,自动生成有价值的测试用例。
  2. 自适应测试执行:能够根据代码变更、历史失败数据和风险评估,智能选择和优先执行最相关的测试用例。
  3. 稳健的UI交互:利用计算机视觉技术,即使在UI发生变化的情况下也能稳定地与应用交互。
  4. 智能结果分析:自动分析测试结果,识别失败模式,分类问题类型,并提供修复建议。
  5. 持续学习优化:从每次测试执行中学习,不断优化自己的策略和模型。
  6. 与Harness无缝集成:作为Harness生态系统的一部分,能够利用Harness的现有能力,并为Harness用户提供AI增强的测试体验。

这是一个相当宏大的目标,但我们将其分解为可管理的步骤,逐步实现。


系统架构设计

在开始编码之前,让我们先设计一个清晰、可扩展的系统架构。一个好的架构将帮助我们更好地组织代码,便于维护和扩展。

整体架构图

首先,让我们通过一个Mermaid架构图来看看系统的整体结构:

数据层

AI模型层

核心服务层

API层

用户交互层

调用

调用

集成

协调

决策请求

执行测试

分析结果

反馈学习

使用

使用

使用

使用

使用

更新

更新

更新

更新

更新

读写

读写

读写

读写

Harness UI

自定义Agent控制台

Harness REST API

Agent REST API

测试编排服务

AI决策引擎

测试执行引擎

结果分析服务

学习优化服务

测试用例生成模型

风险评估模型

UI元素识别模型

失败分类模型

测试优先级模型

测试用例库

执行历史库

应用元数据

模型知识库

核心组件说明
  1. 测试编排服务:这是系统的大脑,负责协调整个测试流程。它与Harness API交互,接收测试任务,调用其他服务完成测试,并将结果返回给Harness。

  2. AI决策引擎:负责做出所有智能决策,如选择哪些测试用例执行、如何优先排序、生成新的测试用例等。

  3. 测试执行引擎:负责实际执行测试,包括UI测试、API测试等。它集成了计算机视觉技术,能够稳健地与应用交互。

  4. 结果分析服务:分析测试执行结果,识别失败原因,分类问题类型,生成有洞察力的报告。

  5. 学习优化服务:从每次测试执行中学习,更新和优化AI模型,使系统变得越来越智能。

现在我们对整体架构有了清晰的认识,接下来让我们开始逐步实现这个系统。


步骤一:环境搭建与Harness集成

首先,让我们搭建基本的开发环境,并实现与Harness平台的集成。这是我们构建测试AI Agent的基础。

环境安装

让我们创建一个Python项目,并安装必要的依赖:

# 创建项目目录
mkdir harness-test-ai-agent
cd harness-test-ai-agent

# 创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 安装必要的依赖
pip install fastapi uvicorn requests python-dotenv pandas scikit-learn numpy pillow pytesseract transformers torch

现在,让我们创建项目的基本结构:

harness-test-ai-agent/
├── .env
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── api/
│   │   ├── __init__.py
│   │   └── harness_integration.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── config.py
│   │   └── orchestrator.py
│   ├── ai/
│   │   ├── __init__.py
│   │   └── decision_engine.py
│   ├── execution/
│   │   ├── __init__.py
│   │   └── test_executor.py
│   └── analysis/
│       ├── __init__.py
│       └── result_analyzer.py
└── tests/
    └── __init__.py
Harness API集成

首先,让我们创建一个配置文件来管理我们的设置,包括Harness的API密钥:

# src/core/config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    # Harness配置
    HARNESS_API_KEY = os.getenv("HARNESS_API_KEY")
    HARNESS_ACCOUNT_ID = os.getenv("HARNESS_ACCOUNT_ID")
    HARNESS_ORG_ID = os.getenv("HARNESS_ORG_ID", "default")
    HARNESS_PROJECT_ID = os.getenv("HARNESS_PROJECT_ID")
    HARNESS_BASE_URL = os.getenv("HARNESS_BASE_URL", "https://app.harness.io/gateway")
    
    # Agent配置
    AGENT_NAME = os.getenv("AGENT_NAME", "HarnessTestAIAgent")
    AGENT_VERSION = os.getenv("AGENT_VERSION", "1.0.0")
    
    # 数据库配置(简化示例使用SQLite)
    DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./test_agent.db")
    
    # AI模型配置
    MODEL_CACHE_DIR = os.getenv("MODEL_CACHE_DIR", "./models")

然后创建.env文件:

# .env
HARNESS_API_KEY=your_harness_api_key_here
HARNESS_ACCOUNT_ID=your_account_id_here
HARNESS_PROJECT_ID=your_project_id_here

现在,让我们创建与Harness API交互的模块:

# src/api/harness_integration.py
import requests
from typing import Dict, List, Any, Optional
from src.core.config import Config

class HarnessIntegration:
    def __init__(self):
        self.base_url = Config.HARNESS_BASE_URL
        self.api_key = Config.HARNESS_API_KEY
        self.account_id = Config.HARNESS_ACCOUNT_ID
        self.org_id = Config.HARNESS_ORG_ID
        self.project_id = Config.HARNESS_PROJECT_ID
        
        self.headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json"
        }
    
    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """向Harness API发送请求"""
        url = f"{self.base_url}{endpoint}"
        params = {
            "accountIdentifier": self.account_id,
            "orgIdentifier": self.org_id,
            "projectIdentifier": self.project_id
        }
        
        try:
            if method.upper() == "GET":
                response = requests.get(url, headers=self.headers, params=params)
            elif method.upper() == "POST":
                response = requests.post(url, headers=self.headers, params=params, json=data)
            elif method.upper() == "PUT":
                response = requests.put(url, headers=self.headers, params=params, json=data)
            elif method.upper() == "DELETE":
                response = requests.delete(url, headers=self.headers, params=params)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")
            
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Error making request to Harness API: {e}")
            raise
    
    def get_pipelines(self) -> List[Dict]:
        """获取项目中的所有流水线"""
        endpoint = "/pipeline/api/pipelines"
        response = self._make_request("GET", endpoint)
        return response.get("data", {}).get("content", [])
    
    def get_pipeline_executions(self, pipeline_identifier: str, limit: int = 10) -> List[Dict]:
        """获取特定流水线的执行历史"""
        endpoint = f"/pipeline/api/pipelines/{pipeline_identifier}/executions"
        params = {"limit": limit}
        response = self._make_request("GET", endpoint)
        return response.get("data", {}).get("content", [])
    
    def get_test_results(self, execution_identifier: str) -> Dict:
        """获取特定执行的测试结果"""
        # 注意:这里的端点可能需要根据Harness的实际API进行调整
        endpoint = f"/ti/api/v1/reports"
        params = {"pipelineExecutionId": execution_identifier}
        response = self._make_request("GET", endpoint)
        return response.get("data", {})
    
    def create_webhook(self, webhook_url: str, events: List[str]) -> Dict:
        """创建一个Webhook,用于接收Harness的事件通知"""
        endpoint = "/ng/api/webhook"
        data = {
            "name": "AI Test Agent Webhook",
            "url": webhook_url,
            "events": events,
            "enabled": True
        }
        return self._make_request("POST", endpoint, data)
    
    def upload_test_report(self, report_data: Dict) -> Dict:
        """上传测试报告到Harness"""
        # 注意:这里的端点可能需要根据Harness的实际API进行调整
        endpoint = "/ti/api/v1/reports"
        return self._make_request("POST", endpoint, report_data)

这个模块提供了与Harness平台交互的基本功能。接下来,让我们创建一个简单的FastAPI应用,作为我们Agent的API入口点:

# src/api/main.py
from fastapi import FastAPI, Request, HTTPException
from src.api.harness_integration import HarnessIntegration
from src.core.config import Config

app = FastAPI(title="Harness Test AI Agent", version=Config.AGENT_VERSION)

# 初始化Harness集成
harness = HarnessIntegration()

@app.get("/")
async def root():
    return {
        "name": Config.AGENT_NAME,
        "version": Config.AGENT_VERSION,
        "status": "running"
    }

@app.get("/harness/pipelines")
async def get_pipelines():
    """获取Harness项目中的所有流水线"""
    try:
        pipelines = harness.get_pipelines()
        return {"pipelines": pipelines}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/webhook/harness")
async def handle_harness_webhook(request: Request):
    """处理来自Harness的Webhook事件"""
    try:
        payload = await request.json()
        event_type = payload.get("event")
        
        # 这里我们可以根据不同的事件类型执行不同的操作
        # 例如:当流水线开始时,准备测试环境;当测试完成时,分析结果
        
        print(f"Received Harness webhook event: {event_type}")
        
        # 暂时只是记录事件,后续我们会添加更多逻辑
        return {"status": "received", "event": event_type}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

最后,让我们创建一个简单的启动脚本:

# run.py
import uvicorn
from src.core.config import Config

if __name__ == "__main__":
    uvicorn.run(
        "src.api.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )

现在,我们已经搭建了基本的环境,并实现了与Harness平台的初步集成。你可以通过运行python run.py来启动这个应用,然后访问http://localhost:8000/docs来查看API文档并测试端点。


步骤二:构建AI决策引擎核心

接下来,让我们构建AI决策引擎的核心部分。这是我们测试Agent的"大脑",负责做出各种智能决策。

风险评估模型

首先,让我们实现一个简单的风险评估模型,用于评估代码变更的风险,从而帮助我们决定哪些测试需要优先执行。

# src/ai/risk_assessment.py
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder
import joblib
import os
from src.core.config import Config

class RiskAssessmentModel:
    def __init__(self):
        self.model = None
        self.encoder = None
        self.model_path = os.path.join(Config.MODEL_CACHE_DIR, "risk_assessment_model.pkl")
        self.encoder_path = os.path.join(Config.MODEL_CACHE_DIR, "risk_assessment_encoder.pkl")
        
        # 确保模型缓存目录存在
        os.makedirs(Config.MODEL_CACHE_DIR, exist_ok=True)
        
        # 尝试加载已训练的模型
        self._load_model()
    
    def _load_model(self):
        """加载已训练的模型"""
        if os.path.exists(self.model_path) and os.path.exists(self.encoder_path):
            try:
                self.model = joblib.load(self.model_path)
                self.encoder = joblib.load(self.encoder_path)
                print("Risk assessment model loaded successfully.")
            except Exception as e:
                print(f"Error loading model: {e}")
                self.model = None
                self.encoder = None
    
    def _save_model(self):
        """保存训练好的模型"""
        if self.model and self.encoder:
            joblib.dump(self.model, self.model_path)
            joblib.dump(self.encoder, self.encoder_path)
            print("Risk assessment model saved successfully.")
    
    def _extract_features(self, change_data: Dict[str, Any]) -> pd.DataFrame:
        """从变更数据中提取特征"""
        # 这里我们假设change_data包含以下字段
        # 在实际应用中,你可能需要从Harness API或其他来源获取更详细的数据
        
        features = {
            "files_changed": change_data.get("files_changed", 0),
            "lines_added": change_data.get("lines_added", 0),
            "lines_deleted": change_data.get("lines_deleted", 0),
            "commit_count": change_data.get("commit_count", 1),
            "author_experience": change_data.get("author_experience", 1.0),  # 标准化的经验值
            "component": change_data.get("component", "unknown"),
            "time_since_last_release": change_data.get("time_since_last_release", 7),  # 天
            "is_critical_component": change_data.get("is_critical_component", False)
        }
        
        return pd.DataFrame([features])
    
    def _preprocess_features(self, features_df: pd.DataFrame) -> np.ndarray:
        """预处理特征"""
        # 分离数值和分类特征
        numerical_features = ["files_changed", "lines_added", "lines_deleted", 
                             "commit_count", "author_experience", "time_since_last_release"]
        categorical_features = ["component", "is_critical_component"]
        
        # 处理数值特征(简单的标准化)
        for feature in numerical_features:
            if feature in features_df.columns:
                # 这里我们使用简单的标准化,实际应用中可能需要保存训练时的均值和标准差
                features_df[feature] = (features_df[feature] - features_df[feature].mean()) / (features_df[feature].std() + 1e-8)
        
        # 处理分类特征
        if self.encoder:
            # 如果有训练好的编码器,使用它
            encoded_cats = self.encoder.transform(features_df[categorical_features])
            encoded_df = pd.DataFrame(encoded_cats.toarray(), 
                                     columns=self.encoder.get_feature_names_out(categorical_features))
        else:
            # 如果没有编码器,只是简单地丢弃分类特征(实际应用中不推荐)
            encoded_df = pd.DataFrame()
        
        # 合并处理后的特征
        final_features = pd.concat([features_df[numerical_features].reset_index(drop=True), 
                                    encoded_df.reset_index(drop=True)], axis=1)
        
        return final_features.values
    
    def train(self, training_data: List[Dict[str, Any]]):
        """训练风险评估模型"""
        # 准备训练数据
        features_list = []
        labels = []
        
        for data_point in training_data:
            features_list.append(self._extract_features(data_point))
            labels.append(data_point.get("introduced_bug", False))
        
        features_df = pd.concat(features_list, ignore_index=True)
        
        # 初始化并拟合编码器
        self.encoder = OneHotEncoder(sparse_output=False, handle_unknown="ignore")
        categorical_features = ["component", "is_critical_component"]
        self.encoder.fit(features_df[categorical_features])
        
        # 预处理特征
        X = self._preprocess_features(features_df)
        y = np.array(labels)
        
        # 训练随机森林分类器
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.model.fit(X, y)
        
        # 保存模型
        self._save_model()
        
        print("Risk assessment model trained successfully.")
    
    def predict_risk(self, change_data: Dict[str, Any]) -> Dict[str, Any]:
        """预测变更的风险"""
        if not self.model or not self.encoder:
            # 如果没有训练好的模型,返回一个默认的风险评估
            print("No trained model available, returning default risk assessment.")
            return {
                "risk_score": 0.5,  # 中等风险
                "risk_level": "medium",
                "confidence": 0.5,
                "recommendation": "Run all critical tests"
            }
        
        # 提取和预处理特征
        features_df = self._extract_features(change_data)
        X = self._preprocess_features(features_df)
        
        # 预测风险
        risk_probability = self.model.predict_proba(X)[0][1]  # 获取引入bug的概率
        
        # 确定风险等级
        if risk_probability < 0.3:
            risk_level = "low"
            recommendation = "Run minimal test suite"
        elif risk_probability < 0.7:
            risk_level = "medium"
            recommendation = "Run critical and relevant tests"
        else:
            risk_level = "high"
            recommendation = "Run full test suite and perform additional manual checks"
        
        return {
            "risk_score": float(risk_probability),
            "risk_level": risk_level,
            "confidence": 0.8,  # 这里可以根据模型的实际性能进行调整
            "recommendation": recommendation
        }
测试优先级模型

接下来,让我们实现一个测试优先级模型,用于根据风险评估结果和其他因素来确定测试的执行顺序。

# src/ai/test_prioritization.py
from typing import Dict, List, Any
import pandas as pd
import numpy as np
from src.ai.risk_assessment import RiskAssessmentModel

class TestPrioritizer:
    def __init__(self, risk_assessment_model: RiskAssessmentModel):
        self.risk_model = risk_assessment_model
    
    def _calculate_test_failure_rate(self, test_history: List[Dict[str, Any]]) -> float:
        """计算测试的历史失败率"""
        if not test_history:
            return 0.5  # 默认中等失败率
        
        total_runs = len(test_history)
        failures = sum(1 for run in test_history if run.get("status") == "failed")
        
        return failures / total_runs
    
    def _calculate_test_execution_time(self, test_history: List[Dict[str, Any]]) -> float:
        """计算测试的平均执行时间"""
        if not test_history:
            return 60.0  # 默认60秒
        
        execution_times = [run.get("execution_time", 60.0) for run in test_history]
        return np.mean(execution_times)
    
    def _calculate_test_importance(self, test_data: Dict[str, Any]) -> float:
        """计算测试的重要性"""
        # 可以基于测试覆盖的功能、代码覆盖率等因素来计算
        # 这里我们使用一个简化的方法
        
        # 默认重要性
        importance = 0.5
        
        # 根据测试类型调整
        test_type = test_data.get("type", "unknown")
        if test_type == "smoke":
            importance = 0.9
        elif test_type == "regression":
            importance = 0.7
        elif test_type == "integration":
            importance = 0.6
        elif test_type == "unit":
            importance = 0.4
        
        # 根据是否是关键路径调整
        if test_data.get("is_critical_path", False):
            importance += 0.2
        
        # 确保重要性在0-1之间
        return min(max(importance, 0.0), 1.0)
    
    def _calculate_relevance_to_change(self, test_data: Dict[str, Any], 
                                       change_data: Dict[str, Any]) -> float:
        """计算测试与变更的相关性"""
        # 这里我们使用一个简化的方法,实际应用中可能需要更复杂的算法
        # 比如基于代码覆盖率、变更文件与测试文件的关系等
        
        relevance = 0.3  # 默认低相关性
        
        # 检查组件是否匹配
        if test_data.get("component") == change_data.get("component"):
            relevance += 0.4
        
        # 检查标签是否有重叠
        test_tags = set(test_data.get("tags", []))
        change_tags = set(change_data.get("tags", []))
        if test_tags & change_tags:
            relevance += 0.2
        
        # 检查测试是否覆盖了变更的文件
        changed_files = set(change_data.get("changed_files", []))
        test_covered_files = set(test_data.get("covered_files", []))
        if changed_files & test_covered_files:
            relevance += 0.3
        
        # 确保相关性在0-1之间
        return min(max(relevance, 0.0), 1.0)
    
    def prioritize_tests(self, tests: List[Dict[str, Any]], 
                        change_data: Dict[str, Any],
                        risk_assessment: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        对测试进行优先级排序
        
        参数:
            tests: 测试列表,每个测试是一个字典,包含测试信息和历史数据
            change_data: 代码变更数据
            risk_assessment: 风险评估结果
            
        返回:
            排序后的测试列表,每个测试包含优先级分数
        """
        prioritized_tests = []
        
        # 获取风险分数
        risk_score = risk_assessment.get("risk_score", 0.5)
        
        for test in tests:
            # 计算各种因素的分数
            failure_rate = self._calculate_test_failure_rate(test.get("history", []))
            execution_time = self._calculate_test_execution_time(test.get("history", []))
            importance = self._calculate_test_importance(test)
            relevance = self._calculate_relevance_to_change(test, change_data)
            
            # 计算优先级分数 - 这是一个简化的公式,你可以根据实际情况调整权重
            # 我们希望高失败率、高重要性、高相关性、短执行时间的测试优先执行
            priority_score = (
                0.3 * failure_rate +
                0.3 * importance +
                0.3 * relevance +
                0.1 * (1.0 - min(execution_time / 300.0, 1.0))  # 执行时间因素,最多考虑5分钟
            )
            
            # 根据整体风险调整优先级
            if risk_score > 0.7:  # 高风险
                # 更加重视重要性和相关性
                priority_score = (
                    0.2 * failure_rate +
                    0.4 * importance +
                    0.3 * relevance +
                    0.1 * (1.0 - min(execution_time / 300.0, 1.0))
                )
            
            # 将分数和测试信息添加到结果中
            test_with_priority = test.copy()
            test_with_priority["priority_score"] = priority_score
            test_with_priority["factors"] = {
                "failure_rate": failure_rate,
                "importance": importance,
                "relevance": relevance,
                "execution_time_factor": 1.0 - min(execution_time / 300.0, 1.0)
            }
            
            prioritized_tests.append(test_with_priority)
        
        # 按优先级分数降序排序
        prioritized_tests.sort(key=lambda x: x["priority_score"], reverse=True)
        
        return prioritized_tests
    
    def select_optimal_test_subset(self, prioritized_tests: List[Dict[str, Any]], 
                                   risk_assessment: Dict[str, Any],
                                   max_execution_time: float = 3600.0) -> List[Dict[str, Any]]:
        """
        选择最优的测试子集,在给定的时间限制内最大化测试效果
        
        参数:
            prioritized_tests: 已排序的测试列表
            risk_assessment: 风险评估结果
            max_execution_time: 最大执行时间(秒),默认1小时
            
        返回:
            选择的测试子集
        """
        selected_tests = []
        total_time = 0.0
        
        # 根据风险确定最小覆盖率
        risk_level = risk_assessment.get("risk_level", "medium")
        if risk_level == "high":
            # 高风险,选择更多测试
            min_coverage = 0.9
        elif risk_level == "medium":
            min_coverage = 0.7
        else:
            min_coverage = 0.5
        
        # 这里我们使用一个简单的贪心算法,总是选择优先级最高且不超过时间限制的测试
        # 实际应用中可能需要更复杂的算法,如整数规划等
        
        for test in prioritized_tests:
            test_time = self._calculate_test_execution_time(test.get("history", []))
            
            if total_time + test_time <= max_execution_time:
                selected_tests.append(test)
                total_time += test_time
            else:
                # 如果添加这个测试会超过时间限制,尝试添加更短的测试
                # 这里简化处理,直接跳过
                continue
        
        # 计算当前选择的覆盖率(简化处理)
        # 实际应用中,你可能需要基于代码覆盖率或功能覆盖率来计算
        coverage_estimate = min(len(selected_tests) / max(len(prioritized_tests), 1), 1.0)
        
        # 如果覆盖率不够,即使超过一点时间限制也继续添加测试
        if coverage_estimate < min_coverage:
            remaining_tests = [t for t in prioritized_tests if t not in selected_tests]
            for test in remaining_tests:
                if coverage_estimate >= min_coverage:
                    break
                selected_tests.append(test)
                coverage_estimate = min(len(selected_tests) / max(len(prioritized_tests), 1), 1.0)
        
        return selected_tests

现在,让我们将这些组件整合到AI决策引擎中:

# src/ai/decision_engine.py
from typing import Dict, List, Any
from src.ai.risk_assessment import RiskAssessmentModel
from src.ai.test_prioritization import TestPrioritizer

class AIDecisionEngine:
    def __init__(self):
        self.risk_assessment_model = RiskAssessmentModel()
        self.test_prioritizer = TestPrioritizer(self.risk_assessment_model)
    
    def analyze_changes_and_plan_tests(self, change_data: Dict[str, Any], 
                                       available_tests: List[Dict[str, Any]],
                                       max_execution_time: float = 3600.0) -> Dict[str, Any]:
        """
        分析代码变更并规划测试执行
        
        参数:
            change_data: 代码变更数据
            available_tests: 可用的测试列表
            max_execution_time: 最大执行时间(秒)
            
        返回:
            包含风险评估、优先级测试列表和执行计划的字典
        """
        # 1. 评估变更风险
        risk_assessment = self.risk_assessment_model.predict_risk(change_data)
        
        # 2. 对测试进行优先级排序
        prioritized_tests = self.test_prioritizer.prioritize_tests(
            available_tests, change_data, risk_assessment
        )
        
        # 3. 选择最优测试子集
        selected_tests = self.test_prioritizer.select_optimal_test_subset(
            prioritized_tests, risk_assessment, max_execution_time
        )
        
        # 4. 生成执行计划
        execution_plan = {
            "risk_assessment": risk_assessment,
            "total_tests_available": len(available_tests),
            "total_tests_selected": len(selected_tests),
            "estimated_execution_time": sum(
                self.test_prioritizer._calculate_test_execution_time(test.get("history", []))
                for test in selected_tests
            ),
            "selected_tests": selected_tests,
            "all_prioritized_tests": prioritized_tests,
            "recommendation": risk_assessment["recommendation"]
        }
        
        return execution_plan
    
    def update_model_with_new_data(self, execution_data: List[Dict[str, Any]]):
        """
        使用新的执行数据更新模型
        
        参数:
            execution_data: 执行数据列表,每个数据点应包含变更信息和是否引入bug的标签
        """
        # 更新风险评估模型
        self.risk_assessment_model.train(execution_data)
        
        print("AI models updated with new execution data.")

步骤三:构建测试编排器

现在,让我们构建测试编排器,这是连接各个组件的核心,负责协调整个测试流程。

# src/core/orchestrator.py
from typing import Dict, List, Any, Optional
from src.api.harness_integration import HarnessIntegration
from src.ai.decision_engine import AIDecisionEngine
from datetime import datetime
import json

class TestOrchestrator:
    def __init__(self):
        self.harness = HarnessIntegration()
        self.decision_engine = AIDecisionEngine()
        self.execution_history = []  # 在实际应用中,这应该是一个数据库
    
    def handle_pipeline_event(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        处理来自Harness的流水线事件
        
        参数:
            event_data: 事件数据
            
        返回:
            处理结果
        """
        event_type = event_data.get("event")
        pipeline_id = event_data.get("pipelineIdentifier")
        execution_id = event_data.get("executionId")
        
        print(f"Handling {event_type} event for pipeline {pipeline_id}, execution {execution_id}")
        
        if event_type == "PIPELINE_STARTED":
            return self._handle_pipeline_started(event_data)
        elif event_type == "PIPELINE_COMPLETED":
            return self._handle_pipeline_completed(event_data)
        elif event_type == "STAGE_COMPLETED":
            return self._handle_stage_completed(event_data)
        else:
            return {
                "status": "ignored",
                "message": f"Event type {event_type} not handled"
            }
    
    def _handle_pipeline_started(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
        """处理流水线开始事件"""
        pipeline_id = event_data.get("pipelineIdentifier")
        execution_id = event_data.get("executionId")
        
        # 1. 获取变更数据
        change_data = self._extract_change_data(event_data)
        
        # 2. 获取可用的测试列表
        available_tests = self._get_available_tests(pipeline_id)
        
        # 3. 使用AI决策引擎规划测试
        test_plan = self.decision_engine.analyze_changes_and_plan_tests(
            change_data, available_tests
        )
        
        # 4. 保存执行计划
        execution_record = {
            "execution_id": execution_id,
            "pipeline_id": pipeline_id,
            "start_time": datetime.now().isoformat(),
            "change_data": change_data,
            "test_plan": test_plan,
            "status": "planning_completed"
        }
        self.execution_history.append(execution_record)
        
        print(f"Test planning completed for execution {execution_id}")
        
        return {
            "status": "success",
            "message": "Test planning completed",
            "test_plan": test_plan
        }
    
    def _handle_stage_completed(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
        """处理阶段完成事件"""
        stage_name = event_data.get("stageName")
        
        # 我们只对测试阶段感兴趣
        if "test" in stage_name.lower() or "qa" in stage_name.lower():
            execution_id = event_data.get("executionId")
            
            # 1. 获取测试结果
            test_results = self.harness.get_test_results(execution_id)
            
            # 2. 分析测试结果
            analysis = self._analyze_test_results(test_results)
            
            # 3. 更新执行记录
            for record in self.execution_history:
                if record["execution_id"] == execution_id:
                    record["test_results"] = test_results
                    record["analysis"] = analysis
                    record["status"] = "test_stage_completed"
                    break
            
            print(f"Test stage completed and analyzed for execution {execution_id}")
            
            return {
                "status": "success",
                "message": "Test results analyzed",
                "analysis": analysis
            }
        
        return {
            "status": "ignored",
            "message": f"Stage {stage_name} not a test stage"
        }
    
    def _handle_pipeline_completed(self, event_data: Dict[str, Any]) -> Dict[str, Any]:
        """处理流水线完成事件"""
        execution_id = event
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐