CANN 模型测试与基准测试:从精度验证到性能基准的完整方案
·
一、为什么需要系统化的模型测试
1.1 模型上线前的风险
一个模型从训练完成到上线部署,中间有太多环节可能出问题:
模型上线前可能出问题的环节:
1. 模型转换 (ATC)
- 算子不支持 → 静默失败,输出全零
- 数据格式不匹配 → 精度严重下降
- 维度不兼容 → 运行时崩溃
2. 推理环境
- CANN 版本不兼容 → 算子行为异常
- NPU 驱动版本冲突 → 无法启动
- 显存不足 → OOM 崩溃
3. 数据预处理
- 归一化参数不一致 → 精度下降
- Resize 算法不同 → 结果偏差
- 通道顺序错误 (RGB vs BGR) → 完全错误
4. 性能退化
- 新版本算子优化不足 → 延迟升高
- 算子融合策略变化 → 吞吐量下降
- 缓存策略不当 → 性能波动
真实案例:
某团队将 PyTorch 模型转换为 ONNX 再转为 OM 格式。
测试时精度正常(0.76),上线后发现精度只有 0.52。
原因: 测试用的预处理是 ImageNet 标准归一化,
而线上服务用的是自定义归一化参数。
如果有完整的端到端测试,这个问题在上线前就能发现。
1.2 测试金字塔
模型测试的层次结构(从底层到顶层):
┌──────────┐
│ 端到端测试 │ ← 最少,最慢,最全面
├──────────┤
│ 集成测试 │ ← 验证模块间协作
├────────────┤
│ 精度测试 │ ← 验证模型输出质量
├──────────────┤
│ 性能测试 │ ← 验证推理效率
├────────────────┤
│ 单元测试 │ ← 最多,最快,验证基础功能
└──────────────────┘
每一层的职责:
单元测试: 验证单个算子、单个函数的正确性
性能测试: 验证延迟、吞吐量、显存占用
精度测试: 验证模型输出的精度指标
集成测试: 验证数据流水线、模型服务化等模块协作
端到端测试: 模拟真实用户请求,验证完整链路
二、单元测试
2.1 算子正确性测试
import pytest
import torch
import numpy as np
class TestOperatorCorrectness:
"""算子正确性测试
每个算子在转换为 OM 格式后,都需要验证其正确性。
测试方法:
1. 用 PyTorch 计算参考结果 (CPU/GPU)
2. 用 CANN OM 模型计算 NPU 结果
3. 比较两者的结果,计算误差
为什么要对比 PyTorch 和 OM?
因为 PyTorch 是"标准答案"(训练框架),
OM 是"转换后的结果"(推理格式)。
如果两者结果差异大,说明转换过程出了问题。
"""
def test_conv2d_basic(self):
"""测试 Conv2d 算子基本功能"""
# 创建 PyTorch 参考模型
torch_conv = torch.nn.Conv2d(3, 16, kernel_size=3, padding=1)
torch_conv.eval()
# 创建输入
x = torch.randn(1, 3, 224, 224)
# PyTorch 参考结果
with torch.no_grad():
ref_output = torch_conv(x)
# 加载 OM 模型
om_model = load_om_model("conv2d_test.om")
om_output = om_model(x.npu())
# 比较结果
diff = torch.abs(ref_output - om_output.cpu()).max().item()
assert diff < 1e-5, f"Conv2d 算子误差过大: {diff}"
def test_batch_norm(self):
"""测试 BatchNorm 算子"""
torch_bn = torch.nn.BatchNorm2d(64)
torch_bn.eval()
x = torch.randn(4, 64, 56, 56)
with torch.no_grad():
ref_output = torch_bn(x)
om_model = load_om_model("batchnorm_test.om")
om_output = om_model(x.npu())
# BatchNorm 的误差通常比 Conv 大(因为涉及统计量计算)
mean_diff = torch.abs(ref_output - om_output.cpu()).mean().item()
assert mean_diff < 1e-4, f"BatchNorm 均值误差: {mean_diff}"
def test_attention_mechanism(self):
"""测试 Self-Attention 算子
Attention 的数值稳定性是一个常见问题。
softmax 对输入非常敏感,微小的差异可能导致完全不同的输出。
因此 Attention 的精度要求比普通算子更高。
"""
# 创建一个简化的 Self-Attention 模块
d_model = 64
torch_qkv = torch.nn.Linear(d_model, 3 * d_model)
x = torch.randn(1, 10, d_model) # [batch, seq_len, dim]
# PyTorch 参考
qkv = torch_qkv(x)
q, k, v = qkv.chunk(3, dim=-1)
scale = d_model ** 0.5
attn = torch.softmax(torch.bmm(q, k.transpose(1, 2)) / scale, dim=-1)
ref_output = torch.bmm(attn, v)
# OM 模型
om_model = load_om_model("self_attention_test.om")
om_output = om_model(x.npu())
# Attention 的误差容限较宽松(softmax 的数值特性)
cosine_sim = torch.nn.functional.cosine_similarity(
ref_output.flatten().unsqueeze(0),
om_output.cpu().flatten().unsqueeze(0)
).item()
assert cosine_sim > 0.99, f"Attention 余弦相似度过低: {cosine_sim}"
2.2 数据类型兼容性测试
class TestDataTypes:
"""数据类型兼容性测试
不同的数据类型 (dtype) 会影响:
1. 计算精度
2. 推理速度
3. 显存占用
这个测试确保模型在所有支持的 dtype 下都能正常工作。
"""
@pytest.mark.parametrize("dtype", [
torch.float16,
torch.bfloat16,
])
def test_model_dtype_support(self, dtype):
"""测试模型对不同数据类型的支持"""
model = load_om_model("resnet50.om")
x = torch.randn(1, 3, 224, 224).to(dtype)
try:
output = model(x.npu())
assert output is not None
assert not torch.isnan(output.cpu()).any(), "输出包含 NaN"
assert not torch.isinf(output.cpu()).any(), "输出包含 Inf"
except RuntimeError as e:
if "不支持的数据类型" in str(e):
pytest.skip(f"当前设备不支持 {dtype}")
else:
raise
def test_input_output_consistency(self):
"""测试输入输出一致性
多次输入相同数据,输出应该完全一致。
如果输出不一致,说明模型存在非确定性行为,
这在推理场景中是不可接受的。
"""
model = load_om_model("resnet50.om")
x = torch.randn(1, 3, 224, 224)
outputs = []
for _ in range(10):
output = model(x.npu()).cpu()
outputs.append(output)
# 所有输出应该完全一致
for i in range(1, len(outputs)):
diff = torch.abs(outputs[0] - outputs[i]).max().item()
assert diff == 0, f"第 {i} 次输出与第 0 次不一致: diff={diff}"
三、精度验证
3.1 精度测试框架
class AccuracyValidator:
"""精度验证器
用于验证 OM 模型的推理精度是否达标。
工作流程:
1. 加载 OM 模型和对应的 PyTorch 参考模型
2. 用相同的数据集分别推理
3. 计算精度指标(Top-1、Top-5、mAP 等)
4. 与阈值对比,判断是否通过
关键指标:
- Top-1 准确率: 最大概率类别是否正确
- Top-5 准确率: 前 5 个概率最大的类别中是否包含正确类别
- 余弦相似度: 输出向量的方向是否一致
- 最大绝对误差: 单个元素的最大偏差
"""
def __init__(self, om_model_path, torch_model, class_names=None):
self.om_model = load_om_model(om_model_path)
self.torch_model = torch_model
self.class_names = class_names or []
def validate(self, test_loader, thresholds=None):
"""执行精度验证
参数:
test_loader: 测试数据加载器
thresholds: 精度阈值字典
返回:
验证结果字典,包含各项指标和是否通过
"""
if thresholds is None:
thresholds = {
'top1_accuracy': 0.70,
'top5_accuracy': 0.90,
'max_abs_error': 0.01,
'cosine_similarity': 0.99
}
# 收集所有预测结果
om_predictions = []
torch_predictions = []
om_features = []
torch_features = []
all_labels = []
self.om_model.eval()
self.torch_model.eval()
with torch.no_grad():
for images, labels in test_loader:
# OM 模型推理
om_output = self.om_model(images.npu())
om_predictions.append(om_output.cpu())
# PyTorch 参考
torch_output = self.torch_model(images)
torch_predictions.append(torch_output)
all_labels.append(labels)
# 拼接结果
om_preds = torch.cat(om_predictions)
torch_preds = torch.cat(torch_predictions)
labels = torch.cat(all_labels)
# 计算各项指标
results = {}
# Top-1 准确率
om_top1 = om_preds.argmax(dim=1)
torch_top1 = torch_preds.argmax(dim=1)
om_accuracy = (om_top1 == labels).float().mean().item()
torch_accuracy = (torch_top1 == labels).float().mean().item()
results['om_top1_accuracy'] = om_accuracy
results['torch_top1_accuracy'] = torch_accuracy
results['accuracy_drop'] = torch_accuracy - om_accuracy
# Top-5 准确率
om_top5 = om_preds.topk(5, dim=1).indices
torch_top5 = torch_preds.topk(5, dim=1).indices
om_top5_acc = (om_top5 == labels.unsqueeze(1)).any(dim=1).float().mean().item()
results['om_top5_accuracy'] = om_top5_acc
# 最大绝对误差
max_abs_error = torch.abs(om_preds - torch_preds).max().item()
results['max_abs_error'] = max_abs_error
# 余弦相似度
cosine_sim = torch.nn.functional.cosine_similarity(
om_preds.flatten().unsqueeze(0),
torch_preds.flatten().unsqueeze(0)
).item()
results['cosine_similarity'] = cosine_sim
# 判断是否通过
results['passed'] = all([
results['om_top1_accuracy'] >= thresholds['top1_accuracy'],
results['om_top5_accuracy'] >= thresholds['top5_accuracy'],
results['max_abs_error'] <= thresholds['max_abs_error'],
results['cosine_similarity'] >= thresholds['cosine_similarity']
])
return results
def print_report(self, results):
"""打印验证报告"""
print("
" + "=" * 60)
print("精度验证报告")
print("=" * 60)
print(f"
{'指标':<25} {'OM 模型':<15} {'PyTorch':<15} {'状态':<10}")
print("-" * 60)
items = [
("Top-1 准确率", f"{results['om_top1_accuracy']:.4f}",
f"{results['torch_top1_accuracy']:.4f}"),
("Top-5 准确率", f"{results['om_top5_accuracy']:.4f}", ""),
("最大绝对误差", f"{results['max_abs_error']:.6f}", ""),
("余弦相似度", f"{results['cosine_similarity']:.6f}", ""),
("精度下降", f"{results['accuracy_drop']:.4f}", ""),
]
for name, om_val, torch_val in items:
print(f"{name:<25} {om_val:<15} {torch_val:<15}")
print("-" * 60)
status = "✓ 通过" if results['passed'] else "✗ 未通过"
print(f"总体结果: {status}")
print("=" * 60)
# 使用示例
# validator = AccuracyValidator("resnet50.om", torch_resnet50)
# results = validator.validate(test_loader)
# validator.print_report(results)
四、性能基准测试
4.1 延迟测试
import time
class LatencyBenchmark:
"""延迟基准测试
测量推理延迟的多个维度:
1. 首次延迟 (Cold Start):
模型第一次推理的延迟。
包含模型加载、内存分配、编译等开销。
通常比后续推理慢 10-100 倍。
2. 平均延迟 (Average Latency):
多次推理的平均延迟。
反映稳态性能。
3. 百分位延迟 (Percentile Latency):
P50: 50% 的请求在此延迟内完成
P95: 95% 的请求在此延迟内完成
P99: 99% 的请求在此延迟内完成
P99 比平均值更重要!
因为平均值可能被大量快速请求拉低,
而少数慢请求(P99)才是用户体验的瓶颈。
"""
def __init__(self, model, warmup=10, iterations=1000):
"""
参数:
model: 要测试的模型
warmup: 预热次数(不计入统计)
为什么需要预热?
1. GPU/NPU 首次推理需要编译 kernel
2. 缓存需要填充
3. 内存分配器需要预热
iterations: 正式测试次数
越多越准确,但耗时越长
建议: 至少 100 次,推荐 1000 次
"""
self.model = model
self.warmup = warmup
self.iterations = iterations
def benchmark(self, input_tensor):
"""执行延迟基准测试"""
latencies = []
# 预热
for _ in range(self.warmup):
_ = self.model(input_tensor.npu())
# 确保预热完成
torch.npu.synchronize()
# 正式测试
for _ in range(self.iterations):
start = time.time()
_ = self.model(input_tensor.npu())
# 等待 NPU 完成计算
torch.npu.synchronize()
latency = (time.time() - start) * 1000 # 转换为毫秒
latencies.append(latency)
# 统计
latencies.sort()
results = {
'mean_ms': np.mean(latencies),
'std_ms': np.std(latencies),
'min_ms': np.min(latencies),
'max_ms': np.max(latencies),
'p50_ms': np.percentile(latencies, 50),
'p95_ms': np.percentile(latencies, 95),
'p99_ms': np.percentile(latencies, 99),
'iterations': self.iterations
}
return results
def print_report(self, results):
"""打印基准报告"""
print("
" + "=" * 50)
print("延迟基准测试报告")
print("=" * 50)
print(f" 测试次数: {results['iterations']}")
print(f" 平均延迟: {results['mean_ms']:.2f} ms")
print(f" 标准差: {results['std_ms']:.2f} ms")
print(f" 最小延迟: {results['min_ms']:.2f} ms")
print(f" 最大延迟: {results['max_ms']:.2f} ms")
print(f" P50 延迟: {results['p50_ms']:.2f} ms")
print(f" P95 延迟: {results['p95_ms']:.2f} ms")
print(f" P99 延迟: {results['p99_ms']:.2f} ms")
print("=" * 50)
# 使用示例
# benchmark = LatencyBenchmark(model, warmup=10, iterations=1000)
# results = benchmark.benchmark(test_input)
# benchmark.print_report(results)
4.2 吞吐量测试
class ThroughputBenchmark:
"""吞吐量基准测试
吞吐量 (Throughput) 衡量的是单位时间内能处理多少请求。
为什么吞吐量和延迟同样重要?
一个系统可能单个请求只要 10ms(延迟低),
但同时只能处理 1 个请求(吞吐量低)。
在高并发场景下,这样的系统会崩溃。
吞吐量的两种度量方式:
1. 单 Stream 吞吐量: 一个 Stream 上连续推理的吞吐量
2. 多 Stream 吞吐量: 多个 Stream 并发推理的吞吐量
通常多 Stream 吞吐量远高于单 Stream,
因为可以隐藏内存拷贝的延迟。
"""
def __init__(self, model, duration_seconds=10):
self.model = model
self.duration_seconds = duration_seconds
def benchmark_single_stream(self, input_tensor):
"""单 Stream 吞吐量测试"""
count = 0
start_time = time.time()
while time.time() - start_time < self.duration_seconds:
_ = self.model(input_tensor.npu())
torch.npu.synchronize()
count += 1
elapsed = time.time() - start_time
throughput = count / elapsed
return {
'mode': 'single_stream',
'throughput_qps': throughput,
'total_requests': count,
'duration_seconds': elapsed
}
def benchmark_multi_stream(self, input_tensor, num_streams=4):
"""多 Stream 吞吐量测试
原理:
使用多个 Stream 交替提交推理任务。
当 Stream 0 在等待 NPU 执行时,
Stream 1 可以提交下一个任务。
这样 NPU 几乎一直在忙,不会空闲。
时间轴:
Stream 0: |--提交1--| |--提交3--| |--提交5--|
Stream 1: |--提交2--| |--提交4--| |
NPU: |--执行1--|--执行2--|--执行3--|--执行4--|--执行5--|
"""
streams = [torch.npu.Stream() for _ in range(num_streams)]
count = 0
start_time = time.time()
while time.time() - start_time < self.duration_seconds:
stream = streams[count % num_streams]
with torch.npu.stream(stream):
_ = self.model(input_tensor.npu())
count += 1
# 每 100 个请求同步一次,避免提交过快
if count % 100 == 0:
torch.npu.synchronize()
torch.npu.synchronize()
elapsed = time.time() - start_time
throughput = count / elapsed
return {
'mode': f'multi_stream_{num_streams}',
'throughput_qps': throughput,
'total_requests': count,
'duration_seconds': elapsed,
'num_streams': num_streams
}
def print_report(self, single_result, multi_result):
"""打印对比报告"""
print("
" + "=" * 60)
print("吞吐量基准测试报告")
print("=" * 60)
print(f"
{'模式':<25} {'吞吐量 (QPS)':<20} {'总请求数':<15}")
print("-" * 60)
print(f"{'单 Stream':<25} {single_result['throughput_qps']:<20.2f} "
f"{single_result['total_requests']:<15}")
print(f"{'多 Stream (' + str(multi_result['num_streams']) + ')':<25} "
f"{multi_result['throughput_qps']:<20.2f} "
f"{multi_result['total_requests']:<15}")
speedup = multi_result['throughput_qps'] / single_result['throughput_qps']
print(f"
多 Stream 加速比: {speedup:.2f}x")
print("=" * 60)
五、回归测试
5.1 版本对比测试
class RegressionTest:
"""回归测试
回归测试的目的是: 确保新版本的模型/软件没有引入退化。
典型触发时机:
1. CANN 版本升级后
2. 模型重新转换后
3. 算子库更新后
4. 驱动升级后
测试内容:
1. 精度是否退化(与基线版本对比)
2. 性能是否退化(延迟是否增加)
3. 功能是否正常(是否新增不支持的算子)
"""
def __init__(self, baseline_results=None):
self.baseline = baseline_results or {}
self.tolerance = {
'accuracy_drop': 0.01, # 精度下降容忍度: 1%
'latency_increase': 0.10, # 延迟增加容忍度: 10%
'throughput_drop': 0.10 # 吞吐量下降容忍度: 10%
}
def set_baseline(self, accuracy, latency_ms, throughput_qps):
"""设置基线指标"""
self.baseline = {
'accuracy': accuracy,
'latency_ms': latency_ms,
'throughput_qps': throughput_qps
}
def compare(self, current_accuracy, current_latency_ms, current_throughput_qps):
"""与基线对比"""
if not self.baseline:
return {'passed': True, 'reason': '无基线,跳过对比'}
issues = []
# 精度对比
accuracy_drop = self.baseline['accuracy'] - current_accuracy
if accuracy_drop > self.tolerance['accuracy_drop']:
issues.append(
f"精度退化: {self.baseline['accuracy']:.4f} → {current_accuracy:.4f} "
f"(下降 {accuracy_drop:.4f}, 容忍度 {self.tolerance['accuracy_drop']})"
)
# 延迟对比
latency_increase = (current_latency_ms - self.baseline['latency_ms']) / self.baseline['latency_ms']
if latency_increase > self.tolerance['latency_increase']:
issues.append(
f"延迟退化: {self.baseline['latency_ms']:.2f}ms → {current_latency_ms:.2f}ms "
f"(增加 {latency_increase:.1%}, 容忍度 {self.tolerance['latency_increase']:.0%})"
)
# 吞吐量对比
throughput_drop = (self.baseline['throughput_qps'] - current_throughput_qps) / self.baseline['throughput_qps']
if throughput_drop > self.tolerance['throughput_drop']:
issues.append(
f"吞吐量退化: {self.baseline['throughput_qps']:.2f} → {current_throughput_qps:.2f} "
f"(下降 {throughput_drop:.1%}, 容忍度 {self.tolerance['throughput_drop']:.0%})"
)
return {
'passed': len(issues) == 0,
'issues': issues,
'accuracy_drop': accuracy_drop,
'latency_change': latency_increase,
'throughput_change': -throughput_drop
}
# 使用示例
# regression = RegressionTest()
# regression.set_baseline(accuracy=0.76, latency_ms=8.5, throughput_qps=120)
#
# # 新版本测试
# result = regression.compare(
# current_accuracy=0.758,
# current_latency_ms=8.3,
# current_throughput_qps=125
# )
# print(f"回归测试: {'通过' if result['passed'] else '未通过'}")
# if not result['passed']:
# for issue in result['issues']:
# print(f" - {issue}")
六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| OM 模型输出全零 | 算子不支持或转换失败 | 检查 ATC 转换日志、逐层对比 |
| 精度大幅下降 | 预处理不一致 | 确保训练和推理的预处理完全相同 |
| 性能测试结果波动大 | 系统负载不稳定 | 多次测试取中位数、关闭无关进程 |
| 回归测试误报 | 阈值设置过严 | 调整容忍度、增加测试样本数 |
| 首次推理特别慢 | 模型编译和缓存预热 | 单独测量预热延迟、使用模型缓存 |
相关仓库
- torch_npu - NPU 推理后端 https://atomgit.com/cann/ops-nn
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)