AI Agent Harness Engineering 的性能指标体系:响应时间、准确率与吞吐量的完整测量

引言

在人工智能技术快速发展的今天,AI Agent(人工智能代理)已经从实验室走向了实际应用场景。从智能客服到自动化运维,从个性化推荐到自动驾驶,AI Agent正在改变着我们的生活和工作方式。然而,随着AI Agent应用场景的不断扩展,如何系统地评估和优化其性能成为了一个关键挑战。

AI Agent Harness Engineering(AI代理框架工程)作为构建和管理AI Agent的核心工程实践,其性能指标体系的建立至关重要。响应时间、准确率和吞吐量作为三个核心性能指标,直接影响着AI Agent系统的用户体验、业务价值和运营成本。

本文将深入探讨AI Agent Harness Engineering的性能指标体系,从理论到实践,从概念到代码,为读者提供一个完整的性能测量指南。无论你是刚接触AI Agent的初学者,还是有丰富经验的资深开发者,相信本文都能为你带来有价值的参考。

1. 核心概念

1.1 AI Agent

AI Agent是指能够感知环境、做出决策并采取行动的智能系统。一个典型的AI Agent通常包含以下几个核心组件:

  1. 感知器(Perceptor):负责从环境中获取信息
  2. 推理引擎(Reasoning Engine):基于感知到的信息进行决策
  3. 执行器(Actuator):将决策转化为实际行动
  4. 知识库(Knowledge Base):存储Agent的知识和经验

1.2 Harness Engineering

Harness Engineering在这里指的是构建和管理AI Agent的框架、工具和最佳实践的集合。它包括:

  1. Agent开发框架:简化AI Agent开发过程的工具集
  2. 监控和日志系统:收集和分析Agent运行状态的系统
  3. 测试和评估工具:验证Agent性能和行为的工具
  4. 部署和运维流程:确保Agent稳定运行的流程和方法

1.3 性能指标体系

性能指标体系是一组用于评估系统性能的量化指标。对于AI Agent Harness Engineering,我们主要关注:

  1. 响应时间(Latency):系统从接收请求到返回响应的时间
  2. 准确率(Accuracy):系统输出正确结果的比例
  3. 吞吐量(Throughput):系统在单位时间内处理的请求数量

2. 问题背景

2.1 AI Agent应用的普及

近年来,随着大语言模型(LLM)技术的突破,AI Agent应用呈现爆发式增长。根据Gartner的预测,到2025年,超过50%的企业将在其业务流程中集成AI Agent。这些应用涵盖了客户服务、内容创作、软件开发、数据分析等多个领域。

2.2 性能评估的挑战

随着AI Agent应用场景的增多,性能评估面临着诸多挑战:

  1. 多维度评估需求:传统的软件系统评估往往只关注响应时间和吞吐量,但AI Agent还需要评估准确率、可靠性等指标。
  2. 动态环境适应:AI Agent通常需要在动态变化的环境中运行,性能评估需要考虑环境变化的影响。
  3. 资源消耗与性能平衡:AI Agent往往消耗大量计算资源,如何在资源消耗和性能之间找到平衡是一个重要问题。
  4. 缺乏标准化评估方法:目前业界还没有形成统一的AI Agent性能评估标准,不同团队使用不同的评估方法,导致结果难以比较。

2.3 业务价值驱动

AI Agent的性能直接影响着业务价值:

  1. 响应时间:过长的响应时间会导致用户流失,研究表明,页面加载时间每增加1秒,转化率就会下降7%。
  2. 准确率:低准确率会导致用户信任度下降,甚至造成业务损失。
  3. 吞吐量:低吞吐量会限制系统的并发处理能力,影响业务规模的扩展。

3. 问题描述

3.1 响应时间测量的挑战

响应时间是评估AI Agent性能的重要指标,但测量响应时间面临以下挑战:

  1. 多阶段处理延迟:AI Agent处理请求通常涉及多个阶段,每个阶段都可能产生延迟,如何准确测量和分析每个阶段的延迟是一个挑战。
  2. 冷启动问题:AI Agent系统在启动时通常需要加载模型和资源,导致初始请求的响应时间较长。
  3. 可变延迟:由于AI Agent处理的复杂性,不同请求的响应时间可能存在较大差异,如何统计和分析这些可变延迟是一个问题。

3.2 准确率测量的挑战

准确率是评估AI Agent质量的关键指标,但测量准确率面临以下挑战:

  1. 定义"正确":在很多场景下,什么是"正确"的结果并不明确,需要仔细定义评估标准。
  2. 基准数据的获取:需要大量高质量的标注数据作为评估基准,但获取这些数据往往成本高昂。
  3. 多维度评估:AI Agent的输出可能需要从多个维度进行评估,如正确性、相关性、完整性等,如何综合这些维度是一个挑战。

3.3 吞吐量测量的挑战

吞吐量是评估AI Agent系统容量的重要指标,但测量吞吐量面临以下挑战:

  1. 资源限制:AI Agent系统通常受限于计算资源(如GPU、内存),如何在资源限制下测量最大吞吐量是一个问题。
  2. 并发处理:AI Agent系统通常需要处理大量并发请求,如何模拟真实的并发场景是一个挑战。
  3. 性能瓶颈定位:当吞吐量无法满足需求时,如何快速定位性能瓶颈是一个重要问题。

4. 问题解决

4.1 响应时间测量解决方案

针对响应时间测量的挑战,我们提出以下解决方案:

  1. 分阶段测量:将AI Agent的处理流程分解为多个阶段,分别测量每个阶段的延迟,这样可以快速定位延迟瓶颈。
  2. 考虑冷启动:在测量响应时间时,区分冷启动和热启动场景,分别进行测量和分析。
  3. 统计分析:使用百分位数(如P50、P95、P99)等统计指标来描述响应时间的分布,而不仅仅使用平均值。

4.2 准确率测量解决方案

针对准确率测量的挑战,我们提出以下解决方案:

  1. 明确定义评估标准:根据具体应用场景,制定详细的评估标准,明确什么是"正确"的结果。
  2. 自动化评估与人工评估结合:对于可以自动化评估的部分,使用自动化方法;对于难以自动化评估的部分,结合人工评估。
  3. 多维度评估框架:建立多维度评估框架,从正确性、相关性、完整性等多个维度评估AI Agent的输出。

4.3 吞吐量测量解决方案

针对吞吐量测量的挑战,我们提出以下解决方案:

  1. 压力测试:使用压力测试工具模拟大量并发请求,测量系统在不同负载下的吞吐量。
  2. 资源监控:在测试过程中监控系统资源使用情况,找出资源瓶颈。
  3. 渐进式测试:从低负载开始,逐渐增加负载,观察系统吞吐量的变化,找到系统的最大吞吐量。

5. 边界与外延

5.1 指标体系的边界

我们提出的性能指标体系主要关注以下方面:

  1. 技术性能:响应时间、准确率、吞吐量等技术指标。
  2. 系统层面:从整个AI Agent系统的层面进行评估,而不是单个组件。
  3. 定量评估:主要使用定量指标进行评估,便于比较和分析。

5.2 指标体系的外延

除了核心的响应时间、准确率和吞吐量指标外,我们还可以考虑以下外延指标:

  1. 可靠性:系统在长时间运行中的稳定性和可靠性。
  2. 可扩展性:系统随着负载增加而扩展的能力。
  3. 资源效率:系统在处理请求时的资源使用效率。
  4. 用户体验:最终用户对系统的主观体验评价。

6. 概念结构与核心要素组成

6.1 AI Agent系统的核心组件

一个完整的AI Agent系统通常包含以下核心组件:

  1. 接口层:负责与用户或其他系统进行交互,接收请求并返回响应。
  2. 协调层:负责协调多个Agent或组件之间的工作。
  3. 推理层:负责处理请求,进行推理和决策。
  4. 知识层:负责存储和管理Agent的知识和数据。
  5. 基础设施层:提供计算、存储等基础资源。

6.2 性能指标的核心要素

我们提出的性能指标体系包含以下核心要素:

  1. 响应时间

    • 端到端响应时间
    • 各处理阶段的延迟
    • 冷启动时间
    • 响应时间分布(P50、P95、P99等)
  2. 准确率

    • 任务完成率
    • 答案正确率
    • 输出质量评分
    • 用户满意度
  3. 吞吐量

    • 每秒请求数(QPS)
    • 每分钟任务数(TPM)
    • 并发处理能力
    • 资源利用率

7. 概念之间的关系

7.1 性能指标之间的关系

响应时间、准确率和吞吐量三个指标之间存在密切的关系:

  1. 响应时间与准确率:通常情况下,为了提高准确率,可能需要更复杂的处理,从而导致响应时间增加。反之,为了减少响应时间,可能需要简化处理,从而导致准确率下降。
  2. 响应时间与吞吐量:在系统资源有限的情况下,响应时间和吞吐量通常是负相关的。吞吐量增加时,响应时间也会增加;反之,要减少响应时间,可能需要降低吞吐量。
  3. 准确率与吞吐量:类似地,为了提高准确率,可能需要更多的计算资源,从而导致吞吐量下降。反之,为了提高吞吐量,可能需要牺牲一些准确率。

我们可以用以下表格来对比这三个指标的核心属性:

指标 主要关注点 影响因素 优化方向 测量难度
响应时间 处理速度 算法复杂度、资源分配、网络延迟 减少处理延迟 中等
准确率 输出质量 模型质量、数据质量、算法设计 提高输出正确性 较高
吞吐量 处理能力 并发设计、资源利用率、系统架构 提高并发处理能力 中等

7.2 AI Agent组件与性能指标的关系

不同的AI Agent组件对不同的性能指标影响不同:

  1. 接口层:主要影响响应时间和吞吐量,特别是网络延迟和并发处理能力。
  2. 协调层:主要影响响应时间和吞吐量,特别是多Agent协调的开销。
  3. 推理层:主要影响响应时间和准确率,推理算法的复杂度和质量直接影响这两个指标。
  4. 知识层:主要影响准确率和响应时间,知识的质量和检索效率会影响这两个指标。
  5. 基础设施层:主要影响响应时间和吞吐量,硬件资源的性能和配置会影响这两个指标。

我们可以用以下ER图来表示这些概念之间的关系:

contains

contains

contains

contains

contains

includes

includes

includes

affects

affects

affects

affects

affects

affects

affects

affects

affects

affects

AI_Agent_System

Interface_Layer

Coordination_Layer

Reasoning_Layer

Knowledge_Layer

Infrastructure_Layer

Performance_Metrics

Response_Time

Accuracy

Throughput

7.3 性能指标交互关系

我们可以用以下交互关系图来表示三个性能指标之间的交互关系:

增加

降低

增加

增加

降低

降低

响应时间

准确率

吞吐量

8. 数学模型

8.1 响应时间模型

响应时间可以分解为多个阶段的延迟之和:

Ttotal=Tnetwork+Tqueue+Tprocess+Tknowledge+Tresponse T_{total} = T_{network} + T_{queue} + T_{process} + T_{knowledge} + T_{response} Ttotal=Tnetwork+Tqueue+Tprocess+Tknowledge+Tresponse

其中:

  • TtotalT_{total}Ttotal 是总响应时间
  • TnetworkT_{network}Tnetwork 是网络传输延迟
  • TqueueT_{queue}Tqueue 是排队等待延迟
  • TprocessT_{process}Tprocess 是推理处理延迟
  • TknowledgeT_{knowledge}Tknowledge 是知识检索延迟
  • TresponseT_{response}Tresponse 是响应生成延迟

对于响应时间的分布,我们通常使用对数正态分布来建模:

f(t)=1tσ2πe−(ln⁡t−μ)22σ2 f(t) = \frac{1}{t\sigma\sqrt{2\pi}} e^{-\frac{(\ln t - \mu)^2}{2\sigma^2}} f(t)=tσ2π 1e2σ2(lntμ)2

其中 μ\muμσ\sigmaσ 是对数正态分布的参数。

8.2 准确率模型

准确率可以用以下公式计算:

Accuracy=TP+TNTP+TN+FP+FN Accuracy = \frac{TP + TN}{TP + TN + FP + FN} Accuracy=TP+TN+FP+FNTP+TN

其中:

  • TPTPTP 是真正例(True Positive)
  • TNTNTN 是真负例(True Negative)
  • FPFPFP 是假正例(False Positive)
  • FNFNFN 是假负例(False Negative)

对于更复杂的场景,我们可以使用F1分数来综合考虑精确率(Precision)和召回率(Recall):

F1=2×Precision×RecallPrecision+Recall F1 = 2 \times \frac{Precision \times Recall}{Precision + Recall} F1=2×Precision+RecallPrecision×Recall

其中:
Precision=TPTP+FP Precision = \frac{TP}{TP + FP} Precision=TP+FPTP
Recall=TPTP+FN Recall = \frac{TP}{TP + FN} Recall=TP+FNTP

8.3 吞吐量模型

吞吐量可以用利特尔定律(Little’s Law)来建模:

L=λ×W L = \lambda \times W L=λ×W

其中:

  • LLL 是系统中的平均请求数
  • λ\lambdaλ 是平均到达率(即吞吐量)
  • WWW 是请求在系统中的平均等待时间

对于服务器系统,我们可以使用M/M/1排队模型来估算吞吐量和响应时间的关系:

W=1μ−λ W = \frac{1}{\mu - \lambda} W=μλ1

其中 μ\muμ 是服务率。

当系统达到稳定状态时,最大吞吐量受限于服务率:

λmax<μ \lambda_{max} < \mu λmax<μ

9. 算法流程图

9.1 响应时间测量流程

以下是响应时间测量的算法流程图:

开始

准备测试环境

预热系统

发送测试请求

记录时间戳

接收响应

计算响应时间

判断是否完成所有测试

分析响应时间数据

生成报告

结束

9.2 准确率测量流程

以下是准确率测量的算法流程图:

开始

准备测试数据集

定义评估标准

运行AI Agent处理测试数据

收集Agent输出

自动化评估

人工抽样评估

综合评估结果

计算准确率指标

生成报告

结束

9.3 吞吐量测量流程

以下是吞吐量测量的算法流程图:

开始

准备测试环境

设置初始并发数

启动负载测试

监控系统性能

收集吞吐量数据

判断是否达到性能极限

增加并发数

分析吞吐量数据

定位性能瓶颈

生成报告

结束

10. 算法源代码

10.1 响应时间测量工具

以下是一个简单的响应时间测量工具的Python实现:

import time
import requests
import statistics
from typing import List, Dict, Any
import json

class ResponseTimeMeasurer:
    def __init__(self, endpoint: str, num_requests: int = 100):
        self.endpoint = endpoint
        self.num_requests = num_requests
        self.response_times: List[float] = []
        self.stage_times: List[Dict[str, float]] = []
        
    def warm_up(self, num_warmup_requests: int = 10) -> None:
        """预热系统,避免冷启动影响"""
        print(f"开始预热系统,发送 {num_warmup_requests} 个请求...")
        for i in range(num_warmup_requests):
            try:
                requests.get(self.endpoint, timeout=30)
            except Exception as e:
                print(f"预热请求 {i+1} 失败: {e}")
        print("系统预热完成")
    
    def measure_single_request(self, payload: Dict[str, Any] = None) -> Dict[str, float]:
        """测量单个请求的响应时间,并分解为多个阶段"""
        stage_times = {}
        
        # 记录请求开始时间
        total_start = time.time()
        
        # 发送请求并记录各阶段时间
        try:
            # 阶段1: 准备请求
            prepare_start = time.time()
            if payload is None:
                payload = {"query": "Hello, AI Agent!"}
            prepare_end = time.time()
            stage_times["prepare"] = prepare_end - prepare_start
            
            # 阶段2: 网络传输与处理
            request_start = time.time()
            response = requests.post(self.endpoint, json=payload, timeout=30)
            request_end = time.time()
            stage_times["request"] = request_end - request_start
            
            # 阶段3: 处理响应
            process_start = time.time()
            response.raise_for_status()
            result = response.json()
            process_end = time.time()
            stage_times["process"] = process_end - process_start
            
            # 总响应时间
            total_end = time.time()
            stage_times["total"] = total_end - total_start
            
            # 记录服务器端的详细时间信息(如果API提供)
            if "server_timing" in result:
                stage_times.update(result["server_timing"])
                
        except Exception as e:
            print(f"请求失败: {e}")
            stage_times["error"] = str(e)
            stage_times["total"] = -1
            
        return stage_times
    
    def run_measurements(self, payload: Dict[str, Any] = None) -> None:
        """运行完整的响应时间测量"""
        print(f"开始测量响应时间,计划发送 {self.num_requests} 个请求...")
        
        # 预热系统
        self.warm_up()
        
        # 清空之前的测量结果
        self.response_times = []
        self.stage_times = []
        
        # 开始测量
        for i in range(self.num_requests):
            print(f"发送请求 {i+1}/{self.num_requests}...")
            stage_time = self.measure_single_request(payload)
            
            if stage_time["total"] > 0:
                self.response_times.append(stage_time["total"])
                self.stage_times.append(stage_time)
            
            # 稍微延迟一下,避免请求过于集中
            time.sleep(0.1)
        
        print("响应时间测量完成")
    
    def analyze_results(self) -> Dict[str, Any]:
        """分析测量结果"""
        if not self.response_times:
            return {"error": "没有有效的测量结果"}
        
        results = {
            "num_requests": len(self.response_times),
            "mean": statistics.mean(self.response_times),
            "median": statistics.median(self.response_times),
            "stdev": statistics.stdev(self.response_times) if len(self.response_times) > 1 else 0,
            "min": min(self.response_times),
            "max": max(self.response_times),
            "percentiles": {}
        }
        
        # 计算百分位数
        percentiles = [50, 75, 90, 95, 99]
        sorted_times = sorted(self.response_times)
        for p in percentiles:
            index = int(len(sorted_times) * p / 100)
            results["percentiles"][f"p{p}"] = sorted_times[index]
        
        # 分析各阶段时间
        if self.stage_times:
            stage_analysis = {}
            stages = self.stage_times[0].keys()
            for stage in stages:
                times = [st[stage] for st in self.stage_times if stage in st and isinstance(st[stage], (int, float))]
                if times:
                    stage_analysis[stage] = {
                        "mean": statistics.mean(times),
                        "median": statistics.median(times),
                        "min": min(times),
                        "max": max(times)
                    }
            results["stage_analysis"] = stage_analysis
        
        return results
    
    def save_results(self, filename: str = "response_time_results.json") -> None:
        """保存测量结果到文件"""
        results = self.analyze_results()
        results["raw_data"] = {
            "response_times": self.response_times,
            "stage_times": self.stage_times
        }
        
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2)
        
        print(f"结果已保存到 {filename}")

# 使用示例
if __name__ == "__main__":
    # 创建测量器实例
    measurer = ResponseTimeMeasurer(
        endpoint="http://localhost:8000/api/agent",
        num_requests=100
    )
    
    # 运行测量
    measurer.run_measurements()
    
    # 分析结果
    results = measurer.analyze_results()
    print("\n测量结果分析:")
    print(f"平均响应时间: {results['mean']:.3f}秒")
    print(f"中位数响应时间: {results['median']:.3f}秒")
    print(f"最小响应时间: {results['min']:.3f}秒")
    print(f"最大响应时间: {results['max']:.3f}秒")
    print(f"P95响应时间: {results['percentiles']['p95']:.3f}秒")
    print(f"P99响应时间: {results['percentiles']['p99']:.3f}秒")
    
    # 保存结果
    measurer.save_results()

10.2 准确率测量工具

以下是一个准确率测量工具的Python实现:

import json
import time
from typing import List, Dict, Any, Tuple
import requests
from dataclasses import dataclass
from enum import Enum

class EvaluationResult(Enum):
    CORRECT = "correct"
    PARTIALLY_CORRECT = "partially_correct"
    INCORRECT = "incorrect"
    ERROR = "error"

@dataclass
class TestCase:
    id: str
    input: Dict[str, Any]
    expected_output: Dict[str, Any]
    evaluation_criteria: Dict[str, Any]

class AccuracyMeasurer:
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.test_cases: List[TestCase] = []
        self.results: List[Dict[str, Any]] = []
        
    def load_test_cases(self, filepath: str) -> None:
        """从JSON文件加载测试用例"""
        with open(filepath, 'r') as f:
            test_data = json.load(f)
        
        self.test_cases = []
        for item in test_data:
            test_case = TestCase(
                id=item.get("id", str(len(self.test_cases) + 1)),
                input=item.get("input", {}),
                expected_output=item.get("expected_output", {}),
                evaluation_criteria=item.get("evaluation_criteria", {})
            )
            self.test_cases.append(test_case)
        
        print(f"已加载 {len(self.test_cases)} 个测试用例")
    
    def evaluate_output(self, actual_output: Dict[str, Any], 
                       expected_output: Dict[str, Any],
                       evaluation_criteria: Dict[str, Any]) -> Tuple[EvaluationResult, Dict[str, Any]]:
        """评估Agent的输出"""
        details = {}
        overall_score = 0.0
        max_score = 0.0
        
        # 检查是否有错误
        if "error" in actual_output:
            return EvaluationResult.ERROR, {"error": actual_output["error"]}
        
        # 根据评估标准进行评估
        for criterion, criteria in evaluation_criteria.items():
            if criterion not in expected_output or criterion not in actual_output:
                continue
            
            expected = expected_output[criterion]
            actual = actual_output[criterion]
            weight = criteria.get("weight", 1.0)
            max_score += weight
            
            # 根据类型进行不同的评估
            if isinstance(expected, str):
                # 字符串匹配评估
                if criteria.get("type") == "exact_match":
                    if actual == expected:
                        score = weight
                        details[criterion] = {"result": "correct", "score": score}
                    else:
                        score = 0
                        details[criterion] = {"result": "incorrect", "score": score, 
                                             "expected": expected, "actual": actual}
                elif criteria.get("type") == "contains":
                    if expected in actual:
                        score = weight
                        details[criterion] = {"result": "correct", "score": score}
                    else:
                        score = 0
                        details[criterion] = {"result": "incorrect", "score": score,
                                             "expected": expected, "actual": actual}
                else:
                    # 默认使用相似度评估
                    similarity = self._calculate_similarity(expected, actual)
                    score = weight * similarity
                    details[criterion] = {"result": "partial" if similarity > 0.5 else "incorrect", 
                                         "score": score, "similarity": similarity}
            elif isinstance(expected, (int, float)):
                # 数值评估
                tolerance = criteria.get("tolerance", 0.1)
                if abs(actual - expected) <= tolerance:
                    score = weight
                    details[criterion] = {"result": "correct", "score": score}
                else:
                    score = 0
                    details[criterion] = {"result": "incorrect", "score": score,
                                         "expected": expected, "actual": actual}
            elif isinstance(expected, list):
                # 列表评估
                if criteria.get("type") == "set_match":
                    expected_set = set(expected)
                    actual_set = set(actual)
                    intersection = expected_set.intersection(actual_set)
                    union = expected_set.union(actual_set)
                    jaccard = len(intersection) / len(union) if union else 1.0
                    score = weight * jaccard
                    details[criterion] = {"result": "partial" if jaccard > 0.5 else "incorrect", 
                                         "score": score, "jaccard": jaccard}
                else:
                    # 精确列表匹配
                    if actual == expected:
                        score = weight
                        details[criterion] = {"result": "correct", "score": score}
                    else:
                        score = 0
                        details[criterion] = {"result": "incorrect", "score": score,
                                             "expected": expected, "actual": actual}
            else:
                # 其他类型的默认评估
                if actual == expected:
                    score = weight
                    details[criterion] = {"result": "correct", "score": score}
                else:
                    score = 0
                    details[criterion] = {"result": "incorrect", "score": score,
                                         "expected": expected, "actual": actual}
            
            overall_score += score
        
        # 计算归一化分数
        normalized_score = overall_score / max_score if max_score > 0 else 0
        
        # 确定整体评估结果
        if normalized_score >= 0.9:
            result = EvaluationResult.CORRECT
        elif normalized_score >= 0.5:
            result = EvaluationResult.PARTIALLY_CORRECT
        else:
            result = EvaluationResult.INCORRECT
        
        return result, {
            "overall_score": overall_score,
            "max_score": max_score,
            "normalized_score": normalized_score,
            "details": details
        }
    
    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """计算两个字符串的相似度(简单实现,实际应用中可以使用更复杂的方法)"""
        # 转换为小写
        str1 = str1.lower()
        str2 = str2.lower()
        
        # 计算词集相似度
        set1 = set(str1.split())
        set2 = set(str2.split())
        
        if not set1 and not set2:
            return 1.0
        
        intersection = set1.intersection(set2)
        union = set1.union(set2)
        
        return len(intersection) / len(union)
    
    def run_measurements(self) -> None:
        """运行准确率测量"""
        if not self.test_cases:
            print("没有测试用例,请先加载测试用例")
            return
        
        print(f"开始测量准确率,测试 {len(self.test_cases)} 个用例...")
        
        self.results = []
        
        for i, test_case in enumerate(self.test_cases):
            print(f"处理测试用例 {i+1}/{len(self.test_cases)}: {test_case.id}")
            
            # 发送请求到Agent
            try:
                start_time = time.time()
                response = requests.post(self.endpoint, json=test_case.input, timeout=60)
                end_time = time.time()
                
                response.raise_for_status()
                actual_output = response.json()
                
                # 评估输出
                eval_result, eval_details = self.evaluate_output(
                    actual_output, 
                    test_case.expected_output,
                    test_case.evaluation_criteria
                )
                
                # 记录结果
                self.results.append({
                    "test_case_id": test_case.id,
                    "input": test_case.input,
                    "expected_output": test_case.expected_output,
                    "actual_output": actual_output,
                    "evaluation_result": eval_result.value,
                    "evaluation_details": eval_details,
                    "response_time": end_time - start_time
                })
                
            except Exception as e:
                print(f"测试用例 {test_case.id} 处理失败: {e}")
                self.results.append({
                    "test_case_id": test_case.id,
                    "input": test_case.input,
                    "expected_output": test_case.expected_output,
                    "actual_output": {"error": str(e)},
                    "evaluation_result": EvaluationResult.ERROR.value,
                    "evaluation_details": {"error": str(e)},
                    "response_time": 0
                })
        
        print("准确率测量完成")
    
    def analyze_results(self) -> Dict[str, Any]:
        """分析测量结果"""
        if not self.results:
            return {"error": "没有有效的测量结果"}
        
        # 统计各类结果的数量
        result_counts = {
            EvaluationResult.CORRECT.value: 0,
            EvaluationResult.PARTIALLY_CORRECT.value: 0,
            EvaluationResult.INCORRECT.value: 0,
            EvaluationResult.ERROR.value: 0
        }
        
        total_score = 0.0
        total_normalized_score = 0.0
        response_times = []
        
        for result in self.results:
            eval_result = result["evaluation_result"]
            result_counts[eval_result] = result_counts.get(eval_result, 0) + 1
            
            if "normalized_score" in result["evaluation_details"]:
                total_normalized_score += result["evaluation_details"]["normalized_score"]
            
            if "overall_score" in result["evaluation_details"]:
                total_score += result["evaluation_details"]["overall_score"]
            
            if result["response_time"] > 0:
                response_times.append(result["response_time"])
        
        # 计算准确率指标
        total_tests = len(self.results)
        accuracy = result_counts[EvaluationResult.CORRECT.value] / total_tests
        partial_accuracy = result_counts[EvaluationResult.PARTIALLY_CORRECT.value] / total_tests
        error_rate = result_counts[EvaluationResult.ERROR.value] / total_tests
        
        # 计算平均分数
        avg_score = total_score / total_tests if total_tests > 0 else 0
        avg_normalized_score = total_normalized_score / total_tests if total_tests > 0 else 0
        
        # 计算平均响应时间
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0
        
        return {
            "total_tests": total_tests,
            "result_counts": result_counts,
            "accuracy": accuracy,
            "partial_accuracy": partial_accuracy,
            "error_rate": error_rate,
            "avg_score": avg_score,
            "avg_normalized_score": avg_normalized_score,
            "avg_response_time": avg_response_time,
            "detailed_results": self.results
        }
    
    def save_results(self, filename: str = "accuracy_results.json") -> None:
        """保存测量结果到文件"""
        results = self.analyze_results()
        
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        
        print(f"结果已保存到 {filename}")

# 使用示例
if __name__ == "__main__":
    # 创建测量器实例
    measurer = AccuracyMeasurer(endpoint="http://localhost:8000/api/agent")
    
    # 加载测试用例
    # 假设test_cases.json文件包含测试用例数据
    measurer.load_test_cases("test_cases.json")
    
    # 运行测量
    measurer.run_measurements()
    
    # 分析结果
    results = measurer.analyze_results()
    print("\n测量结果分析:")
    print(f"总测试用例数: {results['total_tests']}")
    print(f"完全正确: {results['result_counts']['correct']}")
    print(f"部分正确: {results['result_counts']['partially_correct']}")
    print(f"不正确: {results['result_counts']['incorrect']}")
    print(f"错误: {results['result_counts']['error']}")
    print(f"准确率: {results['accuracy']:.2%}")
    print(f"平均归一化分数: {results['avg_normalized_score']:.2f}")
    print(f"平均响应时间: {results['avg_response_time']:.3f}秒")
    
    # 保存结果
    measurer.save_results()

10.3 吞吐量测量工具

以下是一个吞吐量测量工具的Python实现:

import time
import threading
import requests
import statistics
from typing import List, Dict, Any, Callable
import json
from dataclasses import dataclass
from queue import Queue, Empty

@dataclass
class ThroughputResult:
    timestamp: float
    requests_sent: int
    requests_completed: int
    requests_failed: int
    avg_response_time: float
    p50_response_time: float
    p95_response_time: float
    p99_response_time: float

class ThroughputMeasurer:
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.results: List[ThroughputResult] = []
        self.response_times: List[float] = []
        self.requests_sent = 0
        self.requests_completed = 0
        self.requests_failed = 0
        self.lock = threading.Lock()
        self.stop_event = threading.Event()
        
    def worker(self, payload: Dict[str, Any], queue: Queue) -> None:
        """工作线程函数,用于发送请求"""
        while not self.stop_event.is_set():
            try:
                # 从队列获取任务(如果队列为空,使用默认payload)
                try:
                    task_payload = queue.get_nowait()
                except Empty:
                    task_payload = payload
                
                # 发送请求
                start_time = time.time()
                self.requests_sent += 1
                
                try:
                    response = requests.post(self.endpoint, json=task_payload, timeout=30)
                    response.raise_for_status()
                    end_time = time.time()
                    
                    # 记录响应时间
                    response_time = end_time - start_time
                    with self.lock:
                        self.response_times.append(response_time)
                        self.requests_completed += 1
                        
                except Exception as e:
                    with self.lock:
                        self.requests_failed += 1
                    
            except Exception as e:
                print(f"工作线程错误: {e}")
    
    def monitor(self, interval: float = 1.0) -> None:
        """监控线程函数,用于定期记录性能数据"""
        while not self.stop_event.is_set():
            # 记录当前时间和状态
            timestamp = time.time()
            
            with self.lock:
                # 复制当前状态
                current_sent = self.requests_sent
                current_completed = self.requests_completed
                current_failed = self.requests_failed
                current_response_times = self.response_times.copy()
                
                # 重置计数器和响应时间列表,为下一个周期做准备
                self.requests_sent = 0
                self.requests_completed = 0
                self.requests_failed = 0
                self.response_times.clear()
            
            # 计算统计数据
            if current_response_times:
                avg_response_time = statistics.mean(current_response_times)
                sorted_times = sorted(current_response_times)
                p50_index = int(len(sorted_times) * 0.5)
                p95_index = int(len(sorted_times) * 0.95)
                p99_index = int(len(sorted_times) * 0.99)
                
                p50_response_time = sorted_times[p50_index]
                p95_response_time = sorted_times[min(p95_index, len(sorted_times)-1)]
                p99_response_time = sorted_times[min(p99_index, len(sorted_times)-1)]
            else:
                avg_response_time = 0
                p50_response_time = 0
                p95_response_time = 0
                p99_response_time = 0
            
            # 记录结果
            result = ThroughputResult(
                timestamp=timestamp,
                requests_sent=current_sent,
                requests_completed=current_completed,
                requests_failed=current_failed,
                avg_response_time=avg_response_time,
                p50_response_time=p50_response_time,
                p95_response_time=p95_response_time,
                p99_response_time=p99_response_time
            )
            self.results.append(result)
            
            # 打印当前状态
            print(f"[监控] 完成: {current_completed}, 失败: {current_failed}, "
                  f"QPS: {current_completed/interval:.1f}, "
                  f"P50: {p50_response_time:.3f}s, P95: {p95_response_time:.3f}s")
            
            # 等待下一个监控周期
            time.sleep(interval)
    
    def warm_up(self, num_requests: int = 50, payload: Dict[str, Any] = None) -> None:
        """预热系统"""
        print(f"开始预热系统,发送 {num_requests} 个请求...")
        
        if payload is None:
            payload = {"query": "Hello, AI Agent!"}
        
        for i in range(num_requests):
            try:
                requests.post(self.endpoint, json=payload, timeout=30)
            except Exception as e:
                print(f"预热请求 {i+1} 失败: {e}")
        
        print("系统预热完成")
    
    def run_constant_load_test(self, concurrency: int, duration: float, 
                               payload: Dict[str, Any] = None,
                               monitoring_interval: float = 1.0) -> None:
        """运行恒定负载测试"""
        print(f"开始恒定负载测试,并发数: {concurrency}, 持续时间: {duration}秒")
        
        if payload is None:
            payload = {"query": "Hello, AI Agent!"}
        
        # 重置状态
        self.results = []
        self.response_times = []
        self.requests_sent = 0
        self.requests_completed = 0
        self.requests_failed = 0
        self.stop_event.clear()
        
        # 预热系统
        self.warm_up()
        
        # 创建工作线程
        queue = Queue()
        threads = []
        for i in range(concurrency):
            t = threading.Thread(target=self.worker, args=(payload, queue))
            t.daemon = True
            t.start()
            threads.append(t)
        
        # 创建监控线程
        monitor_thread = threading.Thread(target=self.monitor, args=(monitoring_interval,))
        monitor_thread.daemon = True
        monitor_thread.start()
        
        # 运行指定时间
        print("测试进行中...")
        time.sleep(duration)
        
        # 停止所有线程
        self.stop_event.set()
        monitor_thread.join(timeout=monitoring_interval + 1)
        for t in threads:
            t.join(timeout=2)
        
        print("恒定负载测试完成")
    
    def run_staircase_load_test(self, start_concurrency: int, end_concurrency: int, 
                                step: int, step_duration: float,
                                payload: Dict[str, Any] = None,
                                monitoring_interval: float = 1.0) -> None:
        """运行阶梯式负载测试"""
        print(f"开始阶梯式负载测试,起始并发: {start_concurrency}, 结束并发: {end_concurrency}, "
              f"步长: {step}, 每步持续: {step_duration}秒")
        
        if payload is None:
            payload = {"query": "Hello, AI Agent!"}
        
        # 重置状态
        self.results = []
        self.response_times = []
        self.requests_sent = 0
        self.requests_completed = 0
        self.requests_failed = 0
        
        # 预热系统
        self.warm_up()
        
        # 逐步增加并发数
        for concurrency in range(start_concurrency, end_concurrency + 1, step):
            print(f"\n当前并发数: {concurrency}")
            
            # 重置当前步骤的状态
            self.response_times = []
            self.requests_sent = 0
            self.requests_completed = 0
            self.requests_failed = 0
            self.stop_event.clear()
            
            # 创建工作线程
            queue = Queue()
            threads = []
            for i in range(concurrency):
                t = threading.Thread(target=self.worker, args=(payload, queue))
                t.daemon = True
                t.start()
                threads.append(t)
            
            # 创建监控线程
            monitor_thread
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐