AI Agent Harness Engineering 的性能指标体系:响应时间、准确率与吞吐量的完整测量
AI Agent Harness Engineering 的性能指标体系:响应时间、准确率与吞吐量的完整测量
引言
在人工智能技术快速发展的今天,AI Agent(人工智能代理)已经从实验室走向了实际应用场景。从智能客服到自动化运维,从个性化推荐到自动驾驶,AI Agent正在改变着我们的生活和工作方式。然而,随着AI Agent应用场景的不断扩展,如何系统地评估和优化其性能成为了一个关键挑战。
AI Agent Harness Engineering(AI代理框架工程)作为构建和管理AI Agent的核心工程实践,其性能指标体系的建立至关重要。响应时间、准确率和吞吐量作为三个核心性能指标,直接影响着AI Agent系统的用户体验、业务价值和运营成本。
本文将深入探讨AI Agent Harness Engineering的性能指标体系,从理论到实践,从概念到代码,为读者提供一个完整的性能测量指南。无论你是刚接触AI Agent的初学者,还是有丰富经验的资深开发者,相信本文都能为你带来有价值的参考。
1. 核心概念
1.1 AI Agent
AI Agent是指能够感知环境、做出决策并采取行动的智能系统。一个典型的AI Agent通常包含以下几个核心组件:
- 感知器(Perceptor):负责从环境中获取信息
- 推理引擎(Reasoning Engine):基于感知到的信息进行决策
- 执行器(Actuator):将决策转化为实际行动
- 知识库(Knowledge Base):存储Agent的知识和经验
1.2 Harness Engineering
Harness Engineering在这里指的是构建和管理AI Agent的框架、工具和最佳实践的集合。它包括:
- Agent开发框架:简化AI Agent开发过程的工具集
- 监控和日志系统:收集和分析Agent运行状态的系统
- 测试和评估工具:验证Agent性能和行为的工具
- 部署和运维流程:确保Agent稳定运行的流程和方法
1.3 性能指标体系
性能指标体系是一组用于评估系统性能的量化指标。对于AI Agent Harness Engineering,我们主要关注:
- 响应时间(Latency):系统从接收请求到返回响应的时间
- 准确率(Accuracy):系统输出正确结果的比例
- 吞吐量(Throughput):系统在单位时间内处理的请求数量
2. 问题背景
2.1 AI Agent应用的普及
近年来,随着大语言模型(LLM)技术的突破,AI Agent应用呈现爆发式增长。根据Gartner的预测,到2025年,超过50%的企业将在其业务流程中集成AI Agent。这些应用涵盖了客户服务、内容创作、软件开发、数据分析等多个领域。
2.2 性能评估的挑战
随着AI Agent应用场景的增多,性能评估面临着诸多挑战:
- 多维度评估需求:传统的软件系统评估往往只关注响应时间和吞吐量,但AI Agent还需要评估准确率、可靠性等指标。
- 动态环境适应:AI Agent通常需要在动态变化的环境中运行,性能评估需要考虑环境变化的影响。
- 资源消耗与性能平衡:AI Agent往往消耗大量计算资源,如何在资源消耗和性能之间找到平衡是一个重要问题。
- 缺乏标准化评估方法:目前业界还没有形成统一的AI Agent性能评估标准,不同团队使用不同的评估方法,导致结果难以比较。
2.3 业务价值驱动
AI Agent的性能直接影响着业务价值:
- 响应时间:过长的响应时间会导致用户流失,研究表明,页面加载时间每增加1秒,转化率就会下降7%。
- 准确率:低准确率会导致用户信任度下降,甚至造成业务损失。
- 吞吐量:低吞吐量会限制系统的并发处理能力,影响业务规模的扩展。
3. 问题描述
3.1 响应时间测量的挑战
响应时间是评估AI Agent性能的重要指标,但测量响应时间面临以下挑战:
- 多阶段处理延迟:AI Agent处理请求通常涉及多个阶段,每个阶段都可能产生延迟,如何准确测量和分析每个阶段的延迟是一个挑战。
- 冷启动问题:AI Agent系统在启动时通常需要加载模型和资源,导致初始请求的响应时间较长。
- 可变延迟:由于AI Agent处理的复杂性,不同请求的响应时间可能存在较大差异,如何统计和分析这些可变延迟是一个问题。
3.2 准确率测量的挑战
准确率是评估AI Agent质量的关键指标,但测量准确率面临以下挑战:
- 定义"正确":在很多场景下,什么是"正确"的结果并不明确,需要仔细定义评估标准。
- 基准数据的获取:需要大量高质量的标注数据作为评估基准,但获取这些数据往往成本高昂。
- 多维度评估:AI Agent的输出可能需要从多个维度进行评估,如正确性、相关性、完整性等,如何综合这些维度是一个挑战。
3.3 吞吐量测量的挑战
吞吐量是评估AI Agent系统容量的重要指标,但测量吞吐量面临以下挑战:
- 资源限制:AI Agent系统通常受限于计算资源(如GPU、内存),如何在资源限制下测量最大吞吐量是一个问题。
- 并发处理:AI Agent系统通常需要处理大量并发请求,如何模拟真实的并发场景是一个挑战。
- 性能瓶颈定位:当吞吐量无法满足需求时,如何快速定位性能瓶颈是一个重要问题。
4. 问题解决
4.1 响应时间测量解决方案
针对响应时间测量的挑战,我们提出以下解决方案:
- 分阶段测量:将AI Agent的处理流程分解为多个阶段,分别测量每个阶段的延迟,这样可以快速定位延迟瓶颈。
- 考虑冷启动:在测量响应时间时,区分冷启动和热启动场景,分别进行测量和分析。
- 统计分析:使用百分位数(如P50、P95、P99)等统计指标来描述响应时间的分布,而不仅仅使用平均值。
4.2 准确率测量解决方案
针对准确率测量的挑战,我们提出以下解决方案:
- 明确定义评估标准:根据具体应用场景,制定详细的评估标准,明确什么是"正确"的结果。
- 自动化评估与人工评估结合:对于可以自动化评估的部分,使用自动化方法;对于难以自动化评估的部分,结合人工评估。
- 多维度评估框架:建立多维度评估框架,从正确性、相关性、完整性等多个维度评估AI Agent的输出。
4.3 吞吐量测量解决方案
针对吞吐量测量的挑战,我们提出以下解决方案:
- 压力测试:使用压力测试工具模拟大量并发请求,测量系统在不同负载下的吞吐量。
- 资源监控:在测试过程中监控系统资源使用情况,找出资源瓶颈。
- 渐进式测试:从低负载开始,逐渐增加负载,观察系统吞吐量的变化,找到系统的最大吞吐量。
5. 边界与外延
5.1 指标体系的边界
我们提出的性能指标体系主要关注以下方面:
- 技术性能:响应时间、准确率、吞吐量等技术指标。
- 系统层面:从整个AI Agent系统的层面进行评估,而不是单个组件。
- 定量评估:主要使用定量指标进行评估,便于比较和分析。
5.2 指标体系的外延
除了核心的响应时间、准确率和吞吐量指标外,我们还可以考虑以下外延指标:
- 可靠性:系统在长时间运行中的稳定性和可靠性。
- 可扩展性:系统随着负载增加而扩展的能力。
- 资源效率:系统在处理请求时的资源使用效率。
- 用户体验:最终用户对系统的主观体验评价。
6. 概念结构与核心要素组成
6.1 AI Agent系统的核心组件
一个完整的AI Agent系统通常包含以下核心组件:
- 接口层:负责与用户或其他系统进行交互,接收请求并返回响应。
- 协调层:负责协调多个Agent或组件之间的工作。
- 推理层:负责处理请求,进行推理和决策。
- 知识层:负责存储和管理Agent的知识和数据。
- 基础设施层:提供计算、存储等基础资源。
6.2 性能指标的核心要素
我们提出的性能指标体系包含以下核心要素:
-
响应时间:
- 端到端响应时间
- 各处理阶段的延迟
- 冷启动时间
- 响应时间分布(P50、P95、P99等)
-
准确率:
- 任务完成率
- 答案正确率
- 输出质量评分
- 用户满意度
-
吞吐量:
- 每秒请求数(QPS)
- 每分钟任务数(TPM)
- 并发处理能力
- 资源利用率
7. 概念之间的关系
7.1 性能指标之间的关系
响应时间、准确率和吞吐量三个指标之间存在密切的关系:
- 响应时间与准确率:通常情况下,为了提高准确率,可能需要更复杂的处理,从而导致响应时间增加。反之,为了减少响应时间,可能需要简化处理,从而导致准确率下降。
- 响应时间与吞吐量:在系统资源有限的情况下,响应时间和吞吐量通常是负相关的。吞吐量增加时,响应时间也会增加;反之,要减少响应时间,可能需要降低吞吐量。
- 准确率与吞吐量:类似地,为了提高准确率,可能需要更多的计算资源,从而导致吞吐量下降。反之,为了提高吞吐量,可能需要牺牲一些准确率。
我们可以用以下表格来对比这三个指标的核心属性:
| 指标 | 主要关注点 | 影响因素 | 优化方向 | 测量难度 |
|---|---|---|---|---|
| 响应时间 | 处理速度 | 算法复杂度、资源分配、网络延迟 | 减少处理延迟 | 中等 |
| 准确率 | 输出质量 | 模型质量、数据质量、算法设计 | 提高输出正确性 | 较高 |
| 吞吐量 | 处理能力 | 并发设计、资源利用率、系统架构 | 提高并发处理能力 | 中等 |
7.2 AI Agent组件与性能指标的关系
不同的AI Agent组件对不同的性能指标影响不同:
- 接口层:主要影响响应时间和吞吐量,特别是网络延迟和并发处理能力。
- 协调层:主要影响响应时间和吞吐量,特别是多Agent协调的开销。
- 推理层:主要影响响应时间和准确率,推理算法的复杂度和质量直接影响这两个指标。
- 知识层:主要影响准确率和响应时间,知识的质量和检索效率会影响这两个指标。
- 基础设施层:主要影响响应时间和吞吐量,硬件资源的性能和配置会影响这两个指标。
我们可以用以下ER图来表示这些概念之间的关系:
7.3 性能指标交互关系
我们可以用以下交互关系图来表示三个性能指标之间的交互关系:
8. 数学模型
8.1 响应时间模型
响应时间可以分解为多个阶段的延迟之和:
Ttotal=Tnetwork+Tqueue+Tprocess+Tknowledge+Tresponse T_{total} = T_{network} + T_{queue} + T_{process} + T_{knowledge} + T_{response} Ttotal=Tnetwork+Tqueue+Tprocess+Tknowledge+Tresponse
其中:
- TtotalT_{total}Ttotal 是总响应时间
- TnetworkT_{network}Tnetwork 是网络传输延迟
- TqueueT_{queue}Tqueue 是排队等待延迟
- TprocessT_{process}Tprocess 是推理处理延迟
- TknowledgeT_{knowledge}Tknowledge 是知识检索延迟
- TresponseT_{response}Tresponse 是响应生成延迟
对于响应时间的分布,我们通常使用对数正态分布来建模:
f(t)=1tσ2πe−(lnt−μ)22σ2 f(t) = \frac{1}{t\sigma\sqrt{2\pi}} e^{-\frac{(\ln t - \mu)^2}{2\sigma^2}} f(t)=tσ2π1e−2σ2(lnt−μ)2
其中 μ\muμ 和 σ\sigmaσ 是对数正态分布的参数。
8.2 准确率模型
准确率可以用以下公式计算:
Accuracy=TP+TNTP+TN+FP+FN Accuracy = \frac{TP + TN}{TP + TN + FP + FN} Accuracy=TP+TN+FP+FNTP+TN
其中:
- TPTPTP 是真正例(True Positive)
- TNTNTN 是真负例(True Negative)
- FPFPFP 是假正例(False Positive)
- FNFNFN 是假负例(False Negative)
对于更复杂的场景,我们可以使用F1分数来综合考虑精确率(Precision)和召回率(Recall):
F1=2×Precision×RecallPrecision+Recall F1 = 2 \times \frac{Precision \times Recall}{Precision + Recall} F1=2×Precision+RecallPrecision×Recall
其中:
Precision=TPTP+FP Precision = \frac{TP}{TP + FP} Precision=TP+FPTP
Recall=TPTP+FN Recall = \frac{TP}{TP + FN} Recall=TP+FNTP
8.3 吞吐量模型
吞吐量可以用利特尔定律(Little’s Law)来建模:
L=λ×W L = \lambda \times W L=λ×W
其中:
- LLL 是系统中的平均请求数
- λ\lambdaλ 是平均到达率(即吞吐量)
- WWW 是请求在系统中的平均等待时间
对于服务器系统,我们可以使用M/M/1排队模型来估算吞吐量和响应时间的关系:
W=1μ−λ W = \frac{1}{\mu - \lambda} W=μ−λ1
其中 μ\muμ 是服务率。
当系统达到稳定状态时,最大吞吐量受限于服务率:
λmax<μ \lambda_{max} < \mu λmax<μ
9. 算法流程图
9.1 响应时间测量流程
以下是响应时间测量的算法流程图:
9.2 准确率测量流程
以下是准确率测量的算法流程图:
9.3 吞吐量测量流程
以下是吞吐量测量的算法流程图:
10. 算法源代码
10.1 响应时间测量工具
以下是一个简单的响应时间测量工具的Python实现:
import time
import requests
import statistics
from typing import List, Dict, Any
import json
class ResponseTimeMeasurer:
def __init__(self, endpoint: str, num_requests: int = 100):
self.endpoint = endpoint
self.num_requests = num_requests
self.response_times: List[float] = []
self.stage_times: List[Dict[str, float]] = []
def warm_up(self, num_warmup_requests: int = 10) -> None:
"""预热系统,避免冷启动影响"""
print(f"开始预热系统,发送 {num_warmup_requests} 个请求...")
for i in range(num_warmup_requests):
try:
requests.get(self.endpoint, timeout=30)
except Exception as e:
print(f"预热请求 {i+1} 失败: {e}")
print("系统预热完成")
def measure_single_request(self, payload: Dict[str, Any] = None) -> Dict[str, float]:
"""测量单个请求的响应时间,并分解为多个阶段"""
stage_times = {}
# 记录请求开始时间
total_start = time.time()
# 发送请求并记录各阶段时间
try:
# 阶段1: 准备请求
prepare_start = time.time()
if payload is None:
payload = {"query": "Hello, AI Agent!"}
prepare_end = time.time()
stage_times["prepare"] = prepare_end - prepare_start
# 阶段2: 网络传输与处理
request_start = time.time()
response = requests.post(self.endpoint, json=payload, timeout=30)
request_end = time.time()
stage_times["request"] = request_end - request_start
# 阶段3: 处理响应
process_start = time.time()
response.raise_for_status()
result = response.json()
process_end = time.time()
stage_times["process"] = process_end - process_start
# 总响应时间
total_end = time.time()
stage_times["total"] = total_end - total_start
# 记录服务器端的详细时间信息(如果API提供)
if "server_timing" in result:
stage_times.update(result["server_timing"])
except Exception as e:
print(f"请求失败: {e}")
stage_times["error"] = str(e)
stage_times["total"] = -1
return stage_times
def run_measurements(self, payload: Dict[str, Any] = None) -> None:
"""运行完整的响应时间测量"""
print(f"开始测量响应时间,计划发送 {self.num_requests} 个请求...")
# 预热系统
self.warm_up()
# 清空之前的测量结果
self.response_times = []
self.stage_times = []
# 开始测量
for i in range(self.num_requests):
print(f"发送请求 {i+1}/{self.num_requests}...")
stage_time = self.measure_single_request(payload)
if stage_time["total"] > 0:
self.response_times.append(stage_time["total"])
self.stage_times.append(stage_time)
# 稍微延迟一下,避免请求过于集中
time.sleep(0.1)
print("响应时间测量完成")
def analyze_results(self) -> Dict[str, Any]:
"""分析测量结果"""
if not self.response_times:
return {"error": "没有有效的测量结果"}
results = {
"num_requests": len(self.response_times),
"mean": statistics.mean(self.response_times),
"median": statistics.median(self.response_times),
"stdev": statistics.stdev(self.response_times) if len(self.response_times) > 1 else 0,
"min": min(self.response_times),
"max": max(self.response_times),
"percentiles": {}
}
# 计算百分位数
percentiles = [50, 75, 90, 95, 99]
sorted_times = sorted(self.response_times)
for p in percentiles:
index = int(len(sorted_times) * p / 100)
results["percentiles"][f"p{p}"] = sorted_times[index]
# 分析各阶段时间
if self.stage_times:
stage_analysis = {}
stages = self.stage_times[0].keys()
for stage in stages:
times = [st[stage] for st in self.stage_times if stage in st and isinstance(st[stage], (int, float))]
if times:
stage_analysis[stage] = {
"mean": statistics.mean(times),
"median": statistics.median(times),
"min": min(times),
"max": max(times)
}
results["stage_analysis"] = stage_analysis
return results
def save_results(self, filename: str = "response_time_results.json") -> None:
"""保存测量结果到文件"""
results = self.analyze_results()
results["raw_data"] = {
"response_times": self.response_times,
"stage_times": self.stage_times
}
with open(filename, 'w') as f:
json.dump(results, f, indent=2)
print(f"结果已保存到 {filename}")
# 使用示例
if __name__ == "__main__":
# 创建测量器实例
measurer = ResponseTimeMeasurer(
endpoint="http://localhost:8000/api/agent",
num_requests=100
)
# 运行测量
measurer.run_measurements()
# 分析结果
results = measurer.analyze_results()
print("\n测量结果分析:")
print(f"平均响应时间: {results['mean']:.3f}秒")
print(f"中位数响应时间: {results['median']:.3f}秒")
print(f"最小响应时间: {results['min']:.3f}秒")
print(f"最大响应时间: {results['max']:.3f}秒")
print(f"P95响应时间: {results['percentiles']['p95']:.3f}秒")
print(f"P99响应时间: {results['percentiles']['p99']:.3f}秒")
# 保存结果
measurer.save_results()
10.2 准确率测量工具
以下是一个准确率测量工具的Python实现:
import json
import time
from typing import List, Dict, Any, Tuple
import requests
from dataclasses import dataclass
from enum import Enum
class EvaluationResult(Enum):
CORRECT = "correct"
PARTIALLY_CORRECT = "partially_correct"
INCORRECT = "incorrect"
ERROR = "error"
@dataclass
class TestCase:
id: str
input: Dict[str, Any]
expected_output: Dict[str, Any]
evaluation_criteria: Dict[str, Any]
class AccuracyMeasurer:
def __init__(self, endpoint: str):
self.endpoint = endpoint
self.test_cases: List[TestCase] = []
self.results: List[Dict[str, Any]] = []
def load_test_cases(self, filepath: str) -> None:
"""从JSON文件加载测试用例"""
with open(filepath, 'r') as f:
test_data = json.load(f)
self.test_cases = []
for item in test_data:
test_case = TestCase(
id=item.get("id", str(len(self.test_cases) + 1)),
input=item.get("input", {}),
expected_output=item.get("expected_output", {}),
evaluation_criteria=item.get("evaluation_criteria", {})
)
self.test_cases.append(test_case)
print(f"已加载 {len(self.test_cases)} 个测试用例")
def evaluate_output(self, actual_output: Dict[str, Any],
expected_output: Dict[str, Any],
evaluation_criteria: Dict[str, Any]) -> Tuple[EvaluationResult, Dict[str, Any]]:
"""评估Agent的输出"""
details = {}
overall_score = 0.0
max_score = 0.0
# 检查是否有错误
if "error" in actual_output:
return EvaluationResult.ERROR, {"error": actual_output["error"]}
# 根据评估标准进行评估
for criterion, criteria in evaluation_criteria.items():
if criterion not in expected_output or criterion not in actual_output:
continue
expected = expected_output[criterion]
actual = actual_output[criterion]
weight = criteria.get("weight", 1.0)
max_score += weight
# 根据类型进行不同的评估
if isinstance(expected, str):
# 字符串匹配评估
if criteria.get("type") == "exact_match":
if actual == expected:
score = weight
details[criterion] = {"result": "correct", "score": score}
else:
score = 0
details[criterion] = {"result": "incorrect", "score": score,
"expected": expected, "actual": actual}
elif criteria.get("type") == "contains":
if expected in actual:
score = weight
details[criterion] = {"result": "correct", "score": score}
else:
score = 0
details[criterion] = {"result": "incorrect", "score": score,
"expected": expected, "actual": actual}
else:
# 默认使用相似度评估
similarity = self._calculate_similarity(expected, actual)
score = weight * similarity
details[criterion] = {"result": "partial" if similarity > 0.5 else "incorrect",
"score": score, "similarity": similarity}
elif isinstance(expected, (int, float)):
# 数值评估
tolerance = criteria.get("tolerance", 0.1)
if abs(actual - expected) <= tolerance:
score = weight
details[criterion] = {"result": "correct", "score": score}
else:
score = 0
details[criterion] = {"result": "incorrect", "score": score,
"expected": expected, "actual": actual}
elif isinstance(expected, list):
# 列表评估
if criteria.get("type") == "set_match":
expected_set = set(expected)
actual_set = set(actual)
intersection = expected_set.intersection(actual_set)
union = expected_set.union(actual_set)
jaccard = len(intersection) / len(union) if union else 1.0
score = weight * jaccard
details[criterion] = {"result": "partial" if jaccard > 0.5 else "incorrect",
"score": score, "jaccard": jaccard}
else:
# 精确列表匹配
if actual == expected:
score = weight
details[criterion] = {"result": "correct", "score": score}
else:
score = 0
details[criterion] = {"result": "incorrect", "score": score,
"expected": expected, "actual": actual}
else:
# 其他类型的默认评估
if actual == expected:
score = weight
details[criterion] = {"result": "correct", "score": score}
else:
score = 0
details[criterion] = {"result": "incorrect", "score": score,
"expected": expected, "actual": actual}
overall_score += score
# 计算归一化分数
normalized_score = overall_score / max_score if max_score > 0 else 0
# 确定整体评估结果
if normalized_score >= 0.9:
result = EvaluationResult.CORRECT
elif normalized_score >= 0.5:
result = EvaluationResult.PARTIALLY_CORRECT
else:
result = EvaluationResult.INCORRECT
return result, {
"overall_score": overall_score,
"max_score": max_score,
"normalized_score": normalized_score,
"details": details
}
def _calculate_similarity(self, str1: str, str2: str) -> float:
"""计算两个字符串的相似度(简单实现,实际应用中可以使用更复杂的方法)"""
# 转换为小写
str1 = str1.lower()
str2 = str2.lower()
# 计算词集相似度
set1 = set(str1.split())
set2 = set(str2.split())
if not set1 and not set2:
return 1.0
intersection = set1.intersection(set2)
union = set1.union(set2)
return len(intersection) / len(union)
def run_measurements(self) -> None:
"""运行准确率测量"""
if not self.test_cases:
print("没有测试用例,请先加载测试用例")
return
print(f"开始测量准确率,测试 {len(self.test_cases)} 个用例...")
self.results = []
for i, test_case in enumerate(self.test_cases):
print(f"处理测试用例 {i+1}/{len(self.test_cases)}: {test_case.id}")
# 发送请求到Agent
try:
start_time = time.time()
response = requests.post(self.endpoint, json=test_case.input, timeout=60)
end_time = time.time()
response.raise_for_status()
actual_output = response.json()
# 评估输出
eval_result, eval_details = self.evaluate_output(
actual_output,
test_case.expected_output,
test_case.evaluation_criteria
)
# 记录结果
self.results.append({
"test_case_id": test_case.id,
"input": test_case.input,
"expected_output": test_case.expected_output,
"actual_output": actual_output,
"evaluation_result": eval_result.value,
"evaluation_details": eval_details,
"response_time": end_time - start_time
})
except Exception as e:
print(f"测试用例 {test_case.id} 处理失败: {e}")
self.results.append({
"test_case_id": test_case.id,
"input": test_case.input,
"expected_output": test_case.expected_output,
"actual_output": {"error": str(e)},
"evaluation_result": EvaluationResult.ERROR.value,
"evaluation_details": {"error": str(e)},
"response_time": 0
})
print("准确率测量完成")
def analyze_results(self) -> Dict[str, Any]:
"""分析测量结果"""
if not self.results:
return {"error": "没有有效的测量结果"}
# 统计各类结果的数量
result_counts = {
EvaluationResult.CORRECT.value: 0,
EvaluationResult.PARTIALLY_CORRECT.value: 0,
EvaluationResult.INCORRECT.value: 0,
EvaluationResult.ERROR.value: 0
}
total_score = 0.0
total_normalized_score = 0.0
response_times = []
for result in self.results:
eval_result = result["evaluation_result"]
result_counts[eval_result] = result_counts.get(eval_result, 0) + 1
if "normalized_score" in result["evaluation_details"]:
total_normalized_score += result["evaluation_details"]["normalized_score"]
if "overall_score" in result["evaluation_details"]:
total_score += result["evaluation_details"]["overall_score"]
if result["response_time"] > 0:
response_times.append(result["response_time"])
# 计算准确率指标
total_tests = len(self.results)
accuracy = result_counts[EvaluationResult.CORRECT.value] / total_tests
partial_accuracy = result_counts[EvaluationResult.PARTIALLY_CORRECT.value] / total_tests
error_rate = result_counts[EvaluationResult.ERROR.value] / total_tests
# 计算平均分数
avg_score = total_score / total_tests if total_tests > 0 else 0
avg_normalized_score = total_normalized_score / total_tests if total_tests > 0 else 0
# 计算平均响应时间
avg_response_time = sum(response_times) / len(response_times) if response_times else 0
return {
"total_tests": total_tests,
"result_counts": result_counts,
"accuracy": accuracy,
"partial_accuracy": partial_accuracy,
"error_rate": error_rate,
"avg_score": avg_score,
"avg_normalized_score": avg_normalized_score,
"avg_response_time": avg_response_time,
"detailed_results": self.results
}
def save_results(self, filename: str = "accuracy_results.json") -> None:
"""保存测量结果到文件"""
results = self.analyze_results()
with open(filename, 'w') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"结果已保存到 {filename}")
# 使用示例
if __name__ == "__main__":
# 创建测量器实例
measurer = AccuracyMeasurer(endpoint="http://localhost:8000/api/agent")
# 加载测试用例
# 假设test_cases.json文件包含测试用例数据
measurer.load_test_cases("test_cases.json")
# 运行测量
measurer.run_measurements()
# 分析结果
results = measurer.analyze_results()
print("\n测量结果分析:")
print(f"总测试用例数: {results['total_tests']}")
print(f"完全正确: {results['result_counts']['correct']}")
print(f"部分正确: {results['result_counts']['partially_correct']}")
print(f"不正确: {results['result_counts']['incorrect']}")
print(f"错误: {results['result_counts']['error']}")
print(f"准确率: {results['accuracy']:.2%}")
print(f"平均归一化分数: {results['avg_normalized_score']:.2f}")
print(f"平均响应时间: {results['avg_response_time']:.3f}秒")
# 保存结果
measurer.save_results()
10.3 吞吐量测量工具
以下是一个吞吐量测量工具的Python实现:
import time
import threading
import requests
import statistics
from typing import List, Dict, Any, Callable
import json
from dataclasses import dataclass
from queue import Queue, Empty
@dataclass
class ThroughputResult:
timestamp: float
requests_sent: int
requests_completed: int
requests_failed: int
avg_response_time: float
p50_response_time: float
p95_response_time: float
p99_response_time: float
class ThroughputMeasurer:
def __init__(self, endpoint: str):
self.endpoint = endpoint
self.results: List[ThroughputResult] = []
self.response_times: List[float] = []
self.requests_sent = 0
self.requests_completed = 0
self.requests_failed = 0
self.lock = threading.Lock()
self.stop_event = threading.Event()
def worker(self, payload: Dict[str, Any], queue: Queue) -> None:
"""工作线程函数,用于发送请求"""
while not self.stop_event.is_set():
try:
# 从队列获取任务(如果队列为空,使用默认payload)
try:
task_payload = queue.get_nowait()
except Empty:
task_payload = payload
# 发送请求
start_time = time.time()
self.requests_sent += 1
try:
response = requests.post(self.endpoint, json=task_payload, timeout=30)
response.raise_for_status()
end_time = time.time()
# 记录响应时间
response_time = end_time - start_time
with self.lock:
self.response_times.append(response_time)
self.requests_completed += 1
except Exception as e:
with self.lock:
self.requests_failed += 1
except Exception as e:
print(f"工作线程错误: {e}")
def monitor(self, interval: float = 1.0) -> None:
"""监控线程函数,用于定期记录性能数据"""
while not self.stop_event.is_set():
# 记录当前时间和状态
timestamp = time.time()
with self.lock:
# 复制当前状态
current_sent = self.requests_sent
current_completed = self.requests_completed
current_failed = self.requests_failed
current_response_times = self.response_times.copy()
# 重置计数器和响应时间列表,为下一个周期做准备
self.requests_sent = 0
self.requests_completed = 0
self.requests_failed = 0
self.response_times.clear()
# 计算统计数据
if current_response_times:
avg_response_time = statistics.mean(current_response_times)
sorted_times = sorted(current_response_times)
p50_index = int(len(sorted_times) * 0.5)
p95_index = int(len(sorted_times) * 0.95)
p99_index = int(len(sorted_times) * 0.99)
p50_response_time = sorted_times[p50_index]
p95_response_time = sorted_times[min(p95_index, len(sorted_times)-1)]
p99_response_time = sorted_times[min(p99_index, len(sorted_times)-1)]
else:
avg_response_time = 0
p50_response_time = 0
p95_response_time = 0
p99_response_time = 0
# 记录结果
result = ThroughputResult(
timestamp=timestamp,
requests_sent=current_sent,
requests_completed=current_completed,
requests_failed=current_failed,
avg_response_time=avg_response_time,
p50_response_time=p50_response_time,
p95_response_time=p95_response_time,
p99_response_time=p99_response_time
)
self.results.append(result)
# 打印当前状态
print(f"[监控] 完成: {current_completed}, 失败: {current_failed}, "
f"QPS: {current_completed/interval:.1f}, "
f"P50: {p50_response_time:.3f}s, P95: {p95_response_time:.3f}s")
# 等待下一个监控周期
time.sleep(interval)
def warm_up(self, num_requests: int = 50, payload: Dict[str, Any] = None) -> None:
"""预热系统"""
print(f"开始预热系统,发送 {num_requests} 个请求...")
if payload is None:
payload = {"query": "Hello, AI Agent!"}
for i in range(num_requests):
try:
requests.post(self.endpoint, json=payload, timeout=30)
except Exception as e:
print(f"预热请求 {i+1} 失败: {e}")
print("系统预热完成")
def run_constant_load_test(self, concurrency: int, duration: float,
payload: Dict[str, Any] = None,
monitoring_interval: float = 1.0) -> None:
"""运行恒定负载测试"""
print(f"开始恒定负载测试,并发数: {concurrency}, 持续时间: {duration}秒")
if payload is None:
payload = {"query": "Hello, AI Agent!"}
# 重置状态
self.results = []
self.response_times = []
self.requests_sent = 0
self.requests_completed = 0
self.requests_failed = 0
self.stop_event.clear()
# 预热系统
self.warm_up()
# 创建工作线程
queue = Queue()
threads = []
for i in range(concurrency):
t = threading.Thread(target=self.worker, args=(payload, queue))
t.daemon = True
t.start()
threads.append(t)
# 创建监控线程
monitor_thread = threading.Thread(target=self.monitor, args=(monitoring_interval,))
monitor_thread.daemon = True
monitor_thread.start()
# 运行指定时间
print("测试进行中...")
time.sleep(duration)
# 停止所有线程
self.stop_event.set()
monitor_thread.join(timeout=monitoring_interval + 1)
for t in threads:
t.join(timeout=2)
print("恒定负载测试完成")
def run_staircase_load_test(self, start_concurrency: int, end_concurrency: int,
step: int, step_duration: float,
payload: Dict[str, Any] = None,
monitoring_interval: float = 1.0) -> None:
"""运行阶梯式负载测试"""
print(f"开始阶梯式负载测试,起始并发: {start_concurrency}, 结束并发: {end_concurrency}, "
f"步长: {step}, 每步持续: {step_duration}秒")
if payload is None:
payload = {"query": "Hello, AI Agent!"}
# 重置状态
self.results = []
self.response_times = []
self.requests_sent = 0
self.requests_completed = 0
self.requests_failed = 0
# 预热系统
self.warm_up()
# 逐步增加并发数
for concurrency in range(start_concurrency, end_concurrency + 1, step):
print(f"\n当前并发数: {concurrency}")
# 重置当前步骤的状态
self.response_times = []
self.requests_sent = 0
self.requests_completed = 0
self.requests_failed = 0
self.stop_event.clear()
# 创建工作线程
queue = Queue()
threads = []
for i in range(concurrency):
t = threading.Thread(target=self.worker, args=(payload, queue))
t.daemon = True
t.start()
threads.append(t)
# 创建监控线程
monitor_thread
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)