AI开发~OpenAI专家之路:构建企业级AI应用(第三部分·下)
第七部分(续):LLM应用测试与评估
7.4 黄金数据集(Golden Dataset)
打个比方: 黄金数据集就像"标准答案库",里面存着精心挑选的问题和高质量的标准答案。每次测试时,把AI的回答和标准答案对比,就能知道AI表现如何。
之所以叫"黄金",是因为这些答案都是人工审核过的、质量有保证的"真金"。
深入理解: 黄金数据集的作用:
- 基准测试:作为评估AI性能的基准
- 回归测试:确保模型更新后性能不下降
- 微调数据:高质量数据可用于模型微调
- 问题发现:识别AI的薄弱环节
7.4.1 黄金数据集设计
from dataclasses import dataclass
from typing import List, Dict, Optional
from openai import OpenAI
import numpy as np
import json
from pathlib import Path
@dataclass
class GoldenExample:
    """A single reviewed question/answer pair stored in the golden dataset."""

    # Unique identifier of the example.
    id: str
    # Prompt text sent to the model under test.
    input: str
    # Reference answer that model output is compared against.
    reference_output: str
    # Score attached to the reference answer.
    quality_score: float
    # Identifier of whoever annotated the example.
    annotator: str
    # Grouping labels used for filtering and per-group statistics.
    category: str
    difficulty: str
    # Optional free-text remarks.
    notes: Optional[str] = None
    # Optional labels; stays None when no tags are supplied.
    tags: Optional[List[str]] = None
class GoldenDatasetManager:
    """Load, query and persist a JSON-backed list of golden examples.

    The dataset file is a JSON array with one object per example. The file
    is read once on construction and rewritten after every ``add_example``.
    """

    def __init__(self, dataset_path: str):
        """Remember ``dataset_path`` and load it if the file already exists."""
        self.dataset_path = Path(dataset_path)
        self.examples: List["GoldenExample"] = []
        self._load()

    def _load(self):
        """Populate ``self.examples`` from the dataset file, if present."""
        if not self.dataset_path.exists():
            return
        with open(self.dataset_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.examples = [
            GoldenExample(
                id=item["id"],
                input=item["input"],
                reference_output=item["reference_output"],
                quality_score=item["quality_score"],
                annotator=item["annotator"],
                category=item.get("category", "general"),
                difficulty=item.get("difficulty", "medium"),
                notes=item.get("notes"),
                # A JSON null would otherwise load as None and break tag
                # filtering later on.
                tags=item.get("tags") or [],
            )
            for item in data
        ]

    def save(self):
        """Write every example to the dataset file as pretty-printed JSON."""
        data = [
            {
                "id": ex.id,
                "input": ex.input,
                "reference_output": ex.reference_output,
                "quality_score": ex.quality_score,
                "annotator": ex.annotator,
                "category": ex.category,
                "difficulty": ex.difficulty,
                "notes": ex.notes,
                "tags": ex.tags,
            }
            for ex in self.examples
        ]
        # Writing fails if the target directory does not exist yet.
        self.dataset_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.dataset_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    def add_example(
        self,
        input_text: str,
        reference_output: str,
        quality_score: float,
        annotator: str,
        category: str = "general",
        difficulty: str = "medium",
        notes: Optional[str] = None,
        tags: Optional[List[str]] = None
    ) -> "GoldenExample":
        """Append a new example, persist the dataset and return the example.

        The id has the form ``GE0001``; the sequence number starts at
        ``len(self.examples) + 1`` and is bumped until it is unused, so ids
        loaded from a file with gaps are never duplicated.
        """
        taken = {ex.id for ex in self.examples}
        sequence = len(self.examples) + 1
        while f"GE{sequence:04d}" in taken:
            sequence += 1
        example = GoldenExample(
            id=f"GE{sequence:04d}",
            input=input_text,
            reference_output=reference_output,
            quality_score=quality_score,
            annotator=annotator,
            category=category,
            difficulty=difficulty,
            notes=notes,
            tags=tags or [],
        )
        self.examples.append(example)
        self.save()
        return example

    def get_by_category(self, category: str) -> List["GoldenExample"]:
        """Return the examples whose category equals ``category``."""
        return [ex for ex in self.examples if ex.category == category]

    def get_by_difficulty(self, difficulty: str) -> List["GoldenExample"]:
        """Return the examples whose difficulty equals ``difficulty``."""
        return [ex for ex in self.examples if ex.difficulty == difficulty]

    def get_high_quality(self, min_score: float = 0.8) -> List["GoldenExample"]:
        """Return the examples with ``quality_score >= min_score``."""
        return [ex for ex in self.examples if ex.quality_score >= min_score]

    def get_by_tags(self, tags: List[str]) -> List["GoldenExample"]:
        """Return the examples carrying at least one of ``tags``.

        Examples whose ``tags`` is None are treated as untagged.
        """
        return [
            ex for ex in self.examples
            if any(tag in (ex.tags or ()) for tag in tags)
        ]

    def get_statistics(self) -> Dict:
        """Summarise the dataset.

        Returns ``{"total": 0}`` for an empty dataset; otherwise the total,
        per-category / per-difficulty / per-annotator counts and the mean,
        min, max and population standard deviation of the quality scores
        as plain Python floats.
        """
        if not self.examples:
            return {"total": 0}
        by_category: Dict[str, int] = {}
        by_difficulty: Dict[str, int] = {}
        by_annotator: Dict[str, int] = {}
        for ex in self.examples:
            by_category[ex.category] = by_category.get(ex.category, 0) + 1
            by_difficulty[ex.difficulty] = by_difficulty.get(ex.difficulty, 0) + 1
            by_annotator[ex.annotator] = by_annotator.get(ex.annotator, 0) + 1
        quality_scores = [ex.quality_score for ex in self.examples]
        return {
            "total": len(self.examples),
            "by_category": by_category,
            "by_difficulty": by_difficulty,
            "by_annotator": by_annotator,
            # float() keeps numpy scalars out of the result so it can be
            # serialised with json.dumps.
            "quality": {
                "mean": float(np.mean(quality_scores)),
                "min": float(np.min(quality_scores)),
                "max": float(np.max(quality_scores)),
                "std": float(np.std(quality_scores)),
            },
        }
# Create (or reopen) the dataset file and seed it with three reviewed examples.
golden_manager = GoldenDatasetManager("golden_dataset.json")

_SEED_EXAMPLES = [
    {
        "input_text": "什么是机器学习?",
        "reference_output": "机器学习是人工智能的一个分支,它使计算机系统能够从数据中学习和改进,而无需显式编程。机器学习算法通过分析大量数据来识别模式,并利用这些模式做出决策或预测。",
        "quality_score": 0.95,
        "annotator": "expert_001",
        "category": "concept",
        "difficulty": "easy",
        "notes": "标准定义,包含核心概念",
        "tags": ["AI", "基础概念"],
    },
    {
        "input_text": "解释一下什么是神经网络",
        "reference_output": "神经网络是一种模仿人脑结构的计算系统。它由大量相互连接的节点(神经元)组成,分为输入层、隐藏层和输出层。每个连接都有权重,通过训练调整这些权重,神经网络可以学习识别复杂的模式和关系。",
        "quality_score": 0.90,
        "annotator": "expert_001",
        "category": "concept",
        "difficulty": "medium",
        "notes": "清晰的结构解释",
        "tags": ["AI", "神经网络", "深度学习"],
    },
    {
        "input_text": "如何处理不平衡数据集?",
        "reference_output": "处理不平衡数据集的常用方法包括:1. 重采样技术:过采样少数类(如SMOTE)或欠采样多数类;2. 调整类别权重:在损失函数中给予少数类更高权重;3. 使用适合不平衡数据的评估指标:如F1-score、AUC-ROC而非准确率;4. 数据增强:为少数类生成更多样本;5. 集成方法:如EasyEnsemble、BalanceCascade。",
        "quality_score": 0.92,
        "annotator": "expert_002",
        "category": "technical",
        "difficulty": "hard",
        "notes": "实用的技术方案",
        "tags": ["机器学习", "数据处理", "最佳实践"],
    },
]

# Each call appends one example and rewrites the dataset file.
for _seed in _SEED_EXAMPLES:
    golden_manager.add_example(**_seed)

stats = golden_manager.get_statistics()
print("黄金数据集统计:")
print(f" 总样本数: {stats['total']}")
print(f" 按类别: {stats['by_category']}")
print(f" 按难度: {stats['by_difficulty']}")
print(f" 平均质量分: {stats['quality']['mean']:.2f}")
7.4.2 对照黄金数据集评估
class GoldenDatasetEvaluator:
    """Score a chat model against the golden dataset via embedding similarity."""

    # Embedding model used to compare reference and actual answers.
    EMBEDDING_MODEL = "text-embedding-3-small"

    def __init__(
        self,
        golden_manager: "GoldenDatasetManager",
        client: "OpenAI",
        model: str = "gpt-4-turbo"
    ):
        """Store the dataset manager, the API client and the chat model name."""
        self.golden_manager = golden_manager
        self.client = client
        self.model = model

    def evaluate(
        self,
        examples: Optional[List["GoldenExample"]] = None,
        similarity_threshold: float = 0.7
    ) -> Dict:
        """Run the model on each example and compare with the reference.

        Args:
            examples: Examples to evaluate. ``None`` means the whole
                dataset; an explicitly empty list evaluates nothing.
            similarity_threshold: Minimum cosine similarity for a pass.

        Returns:
            The aggregated report produced by ``_aggregate_results``.
        """
        # ``examples or ...`` would turn an empty selection into "evaluate
        # everything" and spend API calls the caller never asked for.
        if examples is None:
            examples = self.golden_manager.examples
        results = []
        print(f"\n开始评估,共 {len(examples)} 个样本...")
        for i, example in enumerate(examples, 1):
            print(f"[{i}/{len(examples)}] 评估: {example.id}")
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": example.input}]
            )
            # The message content can be None; treat it as an empty answer.
            actual_output = response.choices[0].message.content or ""
            if actual_output.strip():
                similarity = self._calculate_similarity(
                    example.reference_output,
                    actual_output
                )
            else:
                # Nothing to embed: score the empty answer as dissimilar.
                similarity = 0.0
            results.append({
                "id": example.id,
                "input": example.input,
                "reference": example.reference_output,
                "actual": actual_output,
                "similarity": similarity,
                "passed": bool(similarity >= similarity_threshold),
                "category": example.category,
                "difficulty": example.difficulty
            })
        return self._aggregate_results(results)

    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Return the cosine similarity of the two texts' embeddings."""
        response = self.client.embeddings.create(
            input=[text1, text2],
            model=self.EMBEDDING_MODEL
        )
        emb1 = np.array(response.data[0].embedding)
        emb2 = np.array(response.data[1].embedding)
        denominator = np.linalg.norm(emb1) * np.linalg.norm(emb2)
        if denominator == 0:
            # A zero vector has no direction; avoid dividing by zero.
            return 0.0
        return float(np.dot(emb1, emb2) / denominator)

    @staticmethod
    def _group_by(results: List[Dict], key: str) -> Dict:
        """Bucket results by ``key`` into passed/total/similarities records."""
        groups: Dict = {}
        for row in results:
            bucket = groups.setdefault(
                row[key], {"passed": 0, "total": 0, "similarities": []}
            )
            bucket["passed"] += 1 if row["passed"] else 0
            bucket["total"] += 1
            bucket["similarities"].append(row["similarity"])
        return groups

    def _aggregate_results(self, results: List[Dict]) -> Dict:
        """Build the summary, per-category and per-difficulty breakdowns.

        An empty ``results`` list yields a zeroed summary rather than NaN.
        """
        total = len(results)
        passed = sum(1 for r in results if r["passed"])
        if results:
            avg_similarity = float(np.mean([r["similarity"] for r in results]))
        else:
            avg_similarity = 0.0
        return {
            "summary": {
                "total": total,
                "passed": passed,
                "pass_rate": passed / total * 100 if total > 0 else 0,
                "avg_similarity": avg_similarity
            },
            "by_category": self._group_by(results, "category"),
            "by_difficulty": self._group_by(results, "difficulty"),
            "details": results
        }

    def print_report(self, report: Dict):
        """Print the report returned by ``evaluate`` to stdout.

        Only the first five failing examples are listed.
        """
        print(f"\n{'='*60}")
        print("黄金数据集评估报告")
        print(f"{'='*60}")
        summary = report["summary"]
        print(f"\n总体表现:")
        print(f" 通过率: {summary['pass_rate']:.1f}% ({summary['passed']}/{summary['total']})")
        print(f" 平均相似度: {summary['avg_similarity']:.4f}")
        for heading, section in (("\n按类别:", "by_category"), ("\n按难度:", "by_difficulty")):
            print(heading)
            for label, stats in report[section].items():
                rate = stats["passed"] / stats["total"] * 100
                avg_sim = np.mean(stats["similarities"])
                print(f" {label}: {rate:.1f}% 通过, 相似度 {avg_sim:.4f}")
        print(f"\n失败样本:")
        failures = [r for r in report["details"] if not r["passed"]]
        for failure in failures[:5]:
            print(f" - {failure['id']}: 相似度 {failure['similarity']:.4f}")
import os

# Take the API key from the OPENAI_API_KEY environment variable so a real key
# never has to be pasted into the source; the placeholder is only a fallback.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "your-api-key"))
golden_evaluator = GoldenDatasetEvaluator(golden_manager, client)
# Runs the model on every golden example, then prints the report.
report = golden_evaluator.evaluate()
golden_evaluator.print_report(report)
7.4.3 数据集增强与维护
class GoldenDatasetEnhancer:
    """Generate paraphrased questions and sanity-check golden examples."""

    # Embedding model used for the question/answer relevance check.
    EMBEDDING_MODEL = "text-embedding-3-small"

    def __init__(
        self,
        golden_manager: "GoldenDatasetManager",
        client: "OpenAI",
        model: str = "gpt-4-turbo"
    ):
        """Store the dataset manager, the API client and the chat model.

        Args:
            golden_manager: Dataset whose examples are enhanced/validated.
            client: OpenAI client used for chat and embedding requests.
            model: Chat model used to generate question variations.
        """
        self.golden_manager = golden_manager
        self.client = client
        self.model = model

    def generate_variations(
        self,
        example: "GoldenExample",
        num_variations: int = 3
    ) -> List[Dict]:
        """Ask the chat model for paraphrases of ``example.input``.

        Returns at most ``num_variations`` dicts with the keys ``input``,
        ``reference_output`` (copied from the example) and ``original_id``.
        Non-string or blank entries in the model's reply are dropped.
        """
        # Keep the JSON template in step with the requested count.
        placeholders = ",\n".join(
            f'    "变体{i}"' for i in range(1, num_variations + 1)
        )
        prompt = (
            f"请为以下问题生成 {num_variations} 个语义相同但表述不同的问题变体。\n"
            f"原问题:{example.input}\n"
            "要求:\n"
            "1. 保持相同的语义和意图\n"
            "2. 使用不同的表述方式\n"
            "3. 难度相当\n"
            "以JSON格式输出:\n"
            "{\n"
            '  "variations": [\n'
            f"{placeholders}\n"
            "  ]\n"
            "}"
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        # The message content can be None; treat that as "no variations".
        result = json.loads(response.choices[0].message.content or "{}")
        candidates = result.get("variations", []) if isinstance(result, dict) else []
        if not isinstance(candidates, list):
            candidates = []
        usable = [
            text for text in candidates
            if isinstance(text, str) and text.strip()
        ]
        # The model may return more entries than were requested.
        return [
            {
                "input": text,
                "reference_output": example.reference_output,
                "original_id": example.id
            }
            for text in usable[:max(num_variations, 0)]
        ]

    def validate_example(self, example: "GoldenExample") -> Dict:
        """Run length, score and relevance checks on one example.

        Returns a dict with ``id``, ``is_valid``, the list of ``issues``
        and ``input_answer_relevance`` (cosine similarity between the
        question and answer embeddings, 0.0 when either text is blank).
        """
        issues = []
        if len(example.input) < 10:
            issues.append("问题过短")
        if len(example.reference_output) < 20:
            issues.append("答案过短")
        if example.quality_score < 0.7:
            issues.append("质量分数过低")
        if example.input.strip() and example.reference_output.strip():
            response = self.client.embeddings.create(
                input=[example.input, example.reference_output],
                model=self.EMBEDDING_MODEL
            )
            emb1 = np.array(response.data[0].embedding)
            emb2 = np.array(response.data[1].embedding)
            denominator = np.linalg.norm(emb1) * np.linalg.norm(emb2)
            relevance = float(np.dot(emb1, emb2) / denominator) if denominator else 0.0
        else:
            # Blank text cannot be embedded; skip the API call.
            relevance = 0.0
        if relevance < 0.5:
            issues.append(f"问答相关性低 ({relevance:.2f})")
        return {
            "id": example.id,
            "is_valid": len(issues) == 0,
            "issues": issues,
            "input_answer_relevance": relevance
        }

    def clean_dataset(self) -> Dict:
        """Validate every example and report which ones are invalid.

        The dataset itself is not modified; callers decide what to do with
        the returned ``invalid_ids``.
        """
        validation_results = [
            self.validate_example(example)
            for example in self.golden_manager.examples
        ]
        invalid_ids = [
            result["id"] for result in validation_results
            if not result["is_valid"]
        ]
        total = len(self.golden_manager.examples)
        return {
            "total": total,
            "valid": total - len(invalid_ids),
            "invalid": len(invalid_ids),
            "invalid_ids": invalid_ids,
            "details": validation_results
        }
enhancer = GoldenDatasetEnhancer(golden_manager, client)

# Paraphrase the first golden example and list the generated questions.
example = golden_manager.examples[0]
variations = enhancer.generate_variations(example)
print(f"生成 {len(variations)} 个问题变体:")
for i, var in enumerate(variations, start=1):
    print(f" {i}. {var['input']}")

# Run the quality checks on the same example.
validation = enhancer.validate_example(example)
print(f"\n样本验证: {validation}")
7.5 人工评估
大白话解释: 机器评估虽然快,但有些东西机器判断不了。比如"这个回答够不够友好"、"这个解释够不够通俗易懂"。这时候就需要人来打分。
人工评估就像请了一群"品鉴师",他们按照统一的标准给AI的回答打分。为了避免个人偏见,通常会请多个人评估同一个答案,然后取平均分。
深入理解: 人工评估的关键要素:
- 评估标准:明确、可操作的评分标准
- 评估者培训:确保评估者理解标准
- 多人评估:减少个人偏见
- 一致性检查:评估者之间的意见是否一致
7.5.1 评估标准设计
from enum import Enum
from typing import List, Dict, Optional, Callable
from datetime import datetime
import statistics
class RatingScale(Enum):
    """Rating scales an evaluation criterion can be scored on."""

    FIVE_POINT = "five_point"
    SEVEN_POINT = "seven_point"
    TEN_POINT = "ten_point"
@dataclass
class EvaluationCriteria:
    """One dimension on which a human evaluator scores an answer."""

    # Name shown to evaluators and used to match submitted ratings.
    name: str
    # What the criterion measures.
    description: str
    # Scale the score is given on.
    scale: RatingScale
    # Textual guidance for evaluators.
    guidelines: List[str]
    # Relative weight of this criterion.
    weight: float = 1.0

    def get_scale_range(self) -> tuple:
        """Return ``(lowest, highest)`` score for this criterion's scale.

        Any scale other than five- or seven-point is treated as ten-point.
        """
        highest = 10
        if self.scale == RatingScale.FIVE_POINT:
            highest = 5
        elif self.scale == RatingScale.SEVEN_POINT:
            highest = 7
        return (1, highest)
# (name, description, weight, guidelines from 5 points down to 1 point)
_DEFAULT_CRITERIA_SPECS = [
    (
        "准确性",
        "回答的事实是否正确",
        1.5,
        [
            "5分:完全正确,无事实错误",
            "4分:基本正确,有小瑕疵",
            "3分:部分正确,有明显错误",
            "2分:大部分错误",
            "1分:完全错误",
        ],
    ),
    (
        "有用性",
        "回答是否解决了用户的问题",
        1.2,
        [
            "5分:完全解决问题",
            "4分:基本解决,有小遗漏",
            "3分:部分解决",
            "2分:帮助有限",
            "1分:没有帮助",
        ],
    ),
    (
        "清晰度",
        "回答是否清晰易懂",
        1.0,
        [
            "5分:非常清晰,结构完整",
            "4分:比较清晰",
            "3分:一般,有些地方模糊",
            "2分:不太清晰",
            "1分:混乱难懂",
        ],
    ),
    (
        "友好度",
        "回答的语气是否友好",
        0.8,
        [
            "5分:非常友好,语气温暖",
            "4分:比较友好",
            "3分:中性",
            "2分:有些冷淡",
            "1分:不友好或冒犯",
        ],
    ),
]

# Default rubric: four five-point criteria.
DEFAULT_CRITERIA = [
    EvaluationCriteria(
        name=_name,
        description=_description,
        scale=RatingScale.FIVE_POINT,
        guidelines=_guidelines,
        weight=_weight,
    )
    for _name, _description, _weight, _guidelines in _DEFAULT_CRITERIA_SPECS
]
def print_criteria(criteria: EvaluationCriteria):
    """Print one criterion: name, weight, description, range and guidelines."""
    lines = [
        f"\n{criteria.name}(权重: {criteria.weight})",
        f"描述: {criteria.description}",
        f"评分范围: {criteria.get_scale_range()}",
        "评分指南:",
    ]
    lines.extend(f" - {guideline}" for guideline in criteria.guidelines)
    # One print of the joined lines emits the same text as printing each line.
    print("\n".join(lines))
# Show every criterion of the default rubric.
print("人工评估标准:")
for c in DEFAULT_CRITERIA:
    print_criteria(c)
7.5.2 人工评估系统
@dataclass
class HumanRating:
    """A score given by one evaluator for one criterion."""

    evaluator_id: str
    criteria_name: str
    score: float
    # Optional free-text justification.
    comment: Optional[str] = None
    # ISO-8601 timestamp; filled in on creation when not supplied.
    timestamp: str = None

    def __post_init__(self):
        """Stamp the rating with the current local time if none was given."""
        self.timestamp = (
            self.timestamp
            if self.timestamp is not None
            else datetime.now().isoformat()
        )
@dataclass
class EvaluationTask:
    """A question/answer pair waiting for, or holding, human ratings."""

    task_id: str
    # The pair being judged.
    question: str
    answer: str
    # Criteria every evaluator is expected to score.
    criteria: List[EvaluationCriteria]
    # Ratings collected so far.
    ratings: List[HumanRating]
    # "pending" until enough ratings arrive, then "completed".
    status: str = "pending"
    # Number of evaluators needed before the task counts as completed.
    required_evaluators: int = 3
class HumanEvaluationSystem:
    """In-memory registry of evaluators, evaluation tasks and their ratings."""

    def __init__(self, criteria: Optional[List["EvaluationCriteria"]] = None):
        """Use ``criteria`` for new tasks, or ``DEFAULT_CRITERIA`` if omitted."""
        self.criteria = criteria or DEFAULT_CRITERIA
        self.tasks: Dict[str, "EvaluationTask"] = {}
        self.evaluators: Dict[str, Dict] = {}

    def register_evaluator(
        self,
        evaluator_id: str,
        name: str,
        expertise: List[str]
    ):
        """Register an evaluator; an existing id is replaced with a fresh record."""
        self.evaluators[evaluator_id] = {
            "name": name,
            "expertise": expertise,
            "total_evaluations": 0,
            "registered_at": datetime.now().isoformat()
        }
        print(f"已注册评估者: {name} ({evaluator_id})")

    def create_task(
        self,
        question: str,
        answer: str,
        required_evaluators: int = 3
    ) -> "EvaluationTask":
        """Create, store and return a pending task for the given Q/A pair."""
        task_id = f"HT{len(self.tasks) + 1:04d}"
        task = EvaluationTask(
            task_id=task_id,
            question=question,
            answer=answer,
            criteria=self.criteria,
            ratings=[],
            status="pending",
            required_evaluators=required_evaluators
        )
        self.tasks[task_id] = task
        return task

    def submit_rating(
        self,
        task_id: str,
        evaluator_id: str,
        criteria_name: str,
        score: float,
        comment: str = None
    ):
        """Record one evaluator's score for one criterion of a task.

        A second submission by the same evaluator for the same criterion
        replaces the earlier rating instead of being counted twice.

        Raises:
            ValueError: unknown task, unregistered evaluator, unknown
                criterion, or a score outside the criterion's scale.
        """
        if task_id not in self.tasks:
            raise ValueError(f"任务 {task_id} 不存在")
        if evaluator_id not in self.evaluators:
            raise ValueError(f"评估者 {evaluator_id} 未注册")
        task = self.tasks[task_id]
        criterion = next(
            (c for c in task.criteria if c.name == criteria_name), None
        )
        if criterion is None:
            raise ValueError(f"无效的评估标准: {criteria_name}")
        low, high = criterion.get_scale_range()
        if not low <= score <= high:
            raise ValueError(f"评分 {score} 超出范围 [{low}, {high}]")
        rating = HumanRating(
            evaluator_id=evaluator_id,
            criteria_name=criteria_name,
            score=score,
            comment=comment
        )
        for index, existing in enumerate(task.ratings):
            if (existing.evaluator_id == evaluator_id
                    and existing.criteria_name == criteria_name):
                # Resubmission: overwrite so means are not skewed.
                task.ratings[index] = rating
                break
        else:
            task.ratings.append(rating)
            self.evaluators[evaluator_id]["total_evaluations"] += 1
        self._check_task_completion(task)

    def _check_task_completion(self, task: "EvaluationTask"):
        """Mark the task completed once every criterion has enough raters.

        "Enough" means ``task.required_evaluators`` distinct evaluators per
        criterion; repeated ratings by one evaluator count once.
        """
        if not task.criteria:
            return
        for criterion in task.criteria:
            raters = {
                r.evaluator_id for r in task.ratings
                if r.criteria_name == criterion.name
            }
            if len(raters) < task.required_evaluators:
                return
        task.status = "completed"

    def get_task_summary(self, task_id: str) -> Dict:
        """Summarise a task's ratings.

        For a task that is not completed only id, status and a message are
        returned. Otherwise the result holds per-criterion statistics and
        the weight-averaged ``overall_score``.

        Raises:
            KeyError: ``task_id`` is unknown.
        """
        task = self.tasks[task_id]
        if task.status != "completed":
            return {
                "task_id": task_id,
                "status": task.status,
                "message": "任务尚未完成"
            }
        criteria_scores = {}
        for criterion in task.criteria:
            scores = [
                r.score for r in task.ratings
                if r.criteria_name == criterion.name
            ]
            criteria_scores[criterion.name] = {
                "mean": statistics.mean(scores),
                "median": statistics.median(scores),
                "std": statistics.stdev(scores) if len(scores) > 1 else 0,
                "min": min(scores),
                "max": max(scores),
                "count": len(scores),
                "weight": criterion.weight
            }
        weighted_sum = sum(
            stats["mean"] * stats["weight"]
            for stats in criteria_scores.values()
        )
        total_weight = sum(c.weight for c in task.criteria)
        # All-zero weights would otherwise divide by zero.
        overall_score = weighted_sum / total_weight if total_weight else 0
        return {
            "task_id": task_id,
            "status": task.status,
            "question": task.question,
            "answer": task.answer,
            "criteria_scores": criteria_scores,
            "overall_score": overall_score,
            "evaluator_count": len(set(r.evaluator_id for r in task.ratings))
        }

    def calculate_inter_rater_reliability(self, task_id: str) -> float:
        """Return the mean pairwise agreement between evaluators.

        Each evaluator is reduced to the mean of their scores on the task.
        A pair's agreement is ``1 - |mean_a - mean_b| / max_diff``, where
        ``max_diff`` is the widest score range among the task's criteria
        (4 for a five-point scale). Fewer than two evaluators gives 1.0.
        """
        task = self.tasks[task_id]
        evaluator_scores: Dict[str, List[float]] = {}
        for rating in task.ratings:
            evaluator_scores.setdefault(rating.evaluator_id, []).append(rating.score)
        means = [statistics.mean(scores) for scores in evaluator_scores.values()]
        if len(means) < 2:
            return 1.0
        spans = [
            high - low
            for low, high in (c.get_scale_range() for c in task.criteria)
        ]
        # Fall back to the five-point span if no usable range is available.
        max_diff = max(spans, default=4) or 4
        agreements = [
            1 - abs(first - second) / max_diff
            for position, first in enumerate(means)
            for second in means[position + 1:]
        ]
        return statistics.mean(agreements) if agreements else 0
human_eval_system = HumanEvaluationSystem()

# Register three evaluators: (id, display name, areas of expertise).
for _evaluator_id, _name, _expertise in (
    ("E001", "张三", ["技术", "产品"]),
    ("E002", "李四", ["用户体验", "内容"]),
    ("E003", "王五", ["技术", "教育"]),
):
    human_eval_system.register_evaluator(
        evaluator_id=_evaluator_id,
        name=_name,
        expertise=_expertise,
    )

# Create one task that uses the default number of evaluators.
task = human_eval_system.create_task(
    question="如何学习Python编程?",
    answer="学习Python编程建议从以下几个方面入手:1. 选择合适的学习资源,如官方教程或在线课程;2. 从基础语法开始,逐步掌握变量、函数、类等概念;3. 多做练习项目,在实践中学习;4. 加入社区,与其他学习者交流。",
)
print(f"\n创建评估任务: {task.task_id}")
print(f"需要 {task.required_evaluators} 位评估者")
7.5.3 批量评估与报告
class BatchEvaluationManager:
    """Create evaluation tasks in bulk and report on the completed ones."""

    def __init__(self, eval_system: "HumanEvaluationSystem"):
        """Wrap ``eval_system``; all tasks and ratings live in that system."""
        self.eval_system = eval_system
        self.batch_results: List[Dict] = []

    def create_batch_tasks(
        self,
        qa_pairs: List[Dict],
        required_evaluators: int = 3
    ) -> "List[EvaluationTask]":
        """Create one task per ``{"question": ..., "answer": ...}`` dict."""
        tasks = [
            self.eval_system.create_task(
                question=qa["question"],
                answer=qa["answer"],
                required_evaluators=required_evaluators
            )
            for qa in qa_pairs
        ]
        print(f"已创建 {len(tasks)} 个评估任务")
        return tasks

    def generate_evaluation_report(self) -> Dict:
        """Aggregate the summaries of all completed tasks.

        Returns ``{"message": ...}`` when no task is completed; otherwise
        task counts, statistics over the tasks' overall scores, per-criterion
        statistics over the tasks' mean scores, and each evaluator's number
        of evaluations.
        """
        completed_tasks = [
            t for t in self.eval_system.tasks.values()
            if t.status == "completed"
        ]
        if not completed_tasks:
            return {"message": "没有已完成的评估任务"}
        all_scores = []
        criteria_aggregates: Dict[str, List[float]] = {}
        for task in completed_tasks:
            summary = self.eval_system.get_task_summary(task.task_id)
            all_scores.append(summary["overall_score"])
            for criteria_name, stats in summary["criteria_scores"].items():
                criteria_aggregates.setdefault(criteria_name, []).append(stats["mean"])
        return {
            "total_tasks": len(self.eval_system.tasks),
            "completed_tasks": len(completed_tasks),
            "overall_statistics": {
                "mean": statistics.mean(all_scores),
                "median": statistics.median(all_scores),
                "std": statistics.stdev(all_scores) if len(all_scores) > 1 else 0,
                "min": min(all_scores),
                "max": max(all_scores)
            },
            "criteria_statistics": {
                name: {
                    "mean": statistics.mean(scores),
                    "std": statistics.stdev(scores) if len(scores) > 1 else 0
                }
                for name, scores in criteria_aggregates.items()
            },
            "evaluator_statistics": {
                eid: info["total_evaluations"]
                for eid, info in self.eval_system.evaluators.items()
            }
        }

    def print_report(self, report: Dict):
        """Print a report produced by ``generate_evaluation_report``.

        The message-only report returned when nothing is completed is
        printed as a single line instead of raising ``KeyError``.
        """
        print(f"\n{'='*60}")
        print("人工评估报告")
        print(f"{'='*60}")
        if "overall_statistics" not in report:
            # No statistics to show; print the message the report carries.
            print(f"\n{report.get('message', '')}")
            return
        print(f"\n任务统计:")
        print(f" 总任务数: {report['total_tasks']}")
        print(f" 已完成: {report['completed_tasks']}")
        overall = report["overall_statistics"]
        print(f"\n综合得分:")
        print(f" 平均: {overall['mean']:.2f}")
        print(f" 中位数: {overall['median']:.2f}")
        print(f" 标准差: {overall['std']:.2f}")
        print(f" 范围: [{overall['min']:.2f}, {overall['max']:.2f}]")
        print(f"\n各项指标:")
        for name, stats in report["criteria_statistics"].items():
            print(f" {name}: {stats['mean']:.2f} ± {stats['std']:.2f}")
        print(f"\n评估者贡献:")
        for eid, count in report["evaluator_statistics"].items():
            name = self.eval_system.evaluators[eid]["name"]
            print(f" {name}: {count} 次评估")
# Imported once up front rather than on every pass of the innermost loop.
import random

batch_manager = BatchEvaluationManager(human_eval_system)
qa_pairs = [
    {
        "question": "什么是API?",
        "answer": "API(Application Programming Interface)是应用程序编程接口,它定义了软件组件之间如何交互。通过API,开发者可以调用其他程序的功能,而无需了解其内部实现细节。"
    },
    {
        "question": "如何提高代码质量?",
        "answer": "提高代码质量的方法包括:编写清晰的注释和文档、遵循编码规范、进行代码审查、编写单元测试、使用静态分析工具、重构冗余代码等。"
    }
]
batch_tasks = batch_manager.create_batch_tasks(qa_pairs)

# Simulate ratings: every evaluator scores every criterion of every task
# with a random value between 3.5 and 5.0, rounded to one decimal.
for task in batch_tasks:
    for evaluator_id in ["E001", "E002", "E003"]:
        for criteria in human_eval_system.criteria:
            score = random.uniform(3.5, 5.0)
            human_eval_system.submit_rating(
                task_id=task.task_id,
                evaluator_id=evaluator_id,
                criteria_name=criteria.name,
                score=round(score, 1)
            )

report = batch_manager.generate_evaluation_report()
batch_manager.print_report(report)
7.6 A/B测试
大白话解释: A/B测试就像做科学实验:把用户随机分成两组,A组用旧版本的AI回答,B组用新版本的AI回答,然后比较两组的反馈。这样可以科学地判断哪个版本更好,而不是凭感觉决定。
比如你想测试一个新的提示词是否更好,不能只看几个例子,而是要做A/B测试,让大量用户实际使用,用数据说话。
深入理解: A/B测试的核心要素:
- 实验设计:明确假设、控制变量
- 流量分配:随机、均匀分配用户
- 指标定义:选择合适的评估指标
- 统计显著性:判断差异是否真实存在
- 实验时长:确保样本量足够
7.6.1 实验设计
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
import random
import hashlib
from datetime import datetime, timedelta
import statistics
import math
class ExperimentStatus(Enum):
    """Lifecycle states of an A/B experiment."""

    DRAFT = "draft"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
@dataclass
class ExperimentVariant:
    """One arm of an experiment together with its configuration."""

    name: str
    description: str
    # Arbitrary settings applied to users assigned to this variant.
    config: Dict[str, Any]
    # Relative share of traffic.
    weight: float = 1.0

    def __post_init__(self):
        """Flag the variant named "control" as the control arm."""
        self.is_control = self.name == "control"
@dataclass
class ExperimentMetric:
    """A measurement collected per user during an experiment."""

    name: str
    description: str
    # True when larger values are the better outcome.
    higher_is_better: bool = True
    # Display unit appended in reports.
    unit: str = ""
@dataclass
class ExperimentConfig:
    """Static definition of an experiment: its arms, metrics and settings."""

    experiment_id: str
    name: str
    description: str
    # Arms that users are split across.
    variants: List[ExperimentVariant]
    # Metrics that may be recorded for this experiment.
    metrics: List[ExperimentMetric]
    # Unset until a start/end time is assigned.
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    min_sample_size: int = 100
    significance_level: float = 0.05
def design_prompt_experiment() -> ExperimentConfig:
    """Build the three-arm prompt-style experiment used in this section.

    The arms (control, professional, friendly) are equally weighted and
    differ in system prompt and temperature; four metrics are tracked.
    """
    variants = [
        ExperimentVariant(
            name="control",
            description="原始提示词",
            config={"system_prompt": "你是一个AI助手。", "temperature": 0.7},
            weight=1.0,
        ),
        ExperimentVariant(
            name="professional",
            description="专业风格提示词",
            config={
                "system_prompt": "你是一个专业的AI助手。请用清晰、准确、有条理的方式回答问题。确保信息准确,必要时说明信息来源。",
                "temperature": 0.5,
            },
            weight=1.0,
        ),
        ExperimentVariant(
            name="friendly",
            description="友好风格提示词",
            config={
                "system_prompt": "你是一个友好的AI助手。请用温暖、亲切的语气回答问题,让用户感到被关心和理解。",
                "temperature": 0.8,
            },
            weight=1.0,
        ),
    ]
    metrics = [
        ExperimentMetric(
            name="user_satisfaction", description="用户满意度(1-5分)",
            higher_is_better=True, unit="分",
        ),
        ExperimentMetric(
            name="response_helpfulness", description="回答有用性(1-5分)",
            higher_is_better=True, unit="分",
        ),
        ExperimentMetric(
            name="response_time", description="响应时间",
            higher_is_better=False, unit="秒",
        ),
        ExperimentMetric(
            name="completion_rate", description="对话完成率",
            higher_is_better=True, unit="%",
        ),
    ]
    return ExperimentConfig(
        experiment_id="EXP001",
        name="提示词优化实验",
        description="测试不同提示词风格对回答质量的影响",
        variants=variants,
        metrics=metrics,
        min_sample_size=500,
        significance_level=0.05,
    )
experiment_config = design_prompt_experiment()

# Print the experiment header, then its variants and metrics.
print(f"实验名称: {experiment_config.name}")
print(f"实验描述: {experiment_config.description}")

print(f"\n变体:")
for v in experiment_config.variants:
    print(f" - {v.name}: {v.description}")

print(f"\n指标:")
for m in experiment_config.metrics:
    print(f" - {m.name}: {m.description}")
7.6.2 流量分配
class TrafficAllocator:
    """Deterministically split users across variants in proportion to weight."""

    # Number of hash buckets used when mapping a user onto the weight range.
    _BUCKETS = 10000

    def __init__(self, variants: "List[ExperimentVariant]"):
        """Store the variants and their combined weight.

        Raises:
            ValueError: no variants, a negative weight, or a total weight
                that is not positive.
        """
        if not variants:
            raise ValueError("至少需要一个实验变体")
        if any(v.weight < 0 for v in variants):
            raise ValueError("变体权重不能为负数")
        total_weight = sum(v.weight for v in variants)
        if total_weight <= 0:
            raise ValueError("变体权重之和必须大于0")
        self.variants = variants
        self.total_weight = total_weight

    def assign(self, user_id: str, experiment_id: str) -> str:
        """Return the variant name for ``user_id`` in ``experiment_id``.

        The MD5 digest of ``"{experiment_id}_{user_id}"`` picks one of
        ``_BUCKETS`` buckets, so the same user always gets the same variant
        for a given experiment. MD5 is used only for bucketing here.
        """
        digest = hashlib.md5(f"{experiment_id}_{user_id}".encode()).hexdigest()
        # threshold lies in [0, total_weight).
        threshold = (int(digest, 16) % self._BUCKETS) / self._BUCKETS * self.total_weight
        cumulative = 0
        for variant in self.variants:
            cumulative += variant.weight
            # Strict comparison: each variant owns [start, end). With <=,
            # bucket 0 would land in a zero-weight first variant and
            # boundary buckets would skew the split.
            if threshold < cumulative:
                return variant.name
        # Only reachable through floating-point rounding of the running sum.
        return self.variants[-1].name

    def get_allocation_distribution(self) -> Dict[str, float]:
        """Return each variant's configured share of traffic (sums to 1)."""
        return {
            v.name: v.weight / self.total_weight
            for v in self.variants
        }
allocator = TrafficAllocator(experiment_config.variants)

# Configured split.
print("流量分配比例:")
for name, ratio in allocator.get_allocation_distribution().items():
    print(f" {name}: {ratio*100:.1f}%")

# Observed split for twenty synthetic users.
print("\n模拟用户分配:")
test_users = [f"user_{i}" for i in range(20)]
assignments = {}
for user_id in test_users:
    variant = allocator.assign(user_id, experiment_config.experiment_id)
    assignments.setdefault(variant, 0)
    assignments[variant] += 1

print("实际分配结果:")
for variant, count in assignments.items():
    print(f" {variant}: {count} 用户")
7.6.3 实验执行与数据收集
@dataclass
class MetricDataPoint:
    """A single metric observation for one user."""

    # When the observation was recorded.
    timestamp: datetime
    user_id: str
    # Name of the variant the user was assigned to.
    variant: str
    metric_name: str
    value: float
    # Optional extra context supplied by the caller.
    metadata: Optional[Dict] = None
class ExperimentCollector:
    """Record user-to-variant assignments and per-user metric observations."""

    def __init__(self, config: "ExperimentConfig"):
        """Start with no assignments and no data points for ``config``."""
        self.config = config
        self.data_points: List["MetricDataPoint"] = []
        self.user_variants: Dict[str, str] = {}

    def record(
        self,
        user_id: str,
        metric_name: str,
        value: float,
        metadata: Dict = None
    ):
        """Append one observation, timestamped with the current local time.

        Raises:
            ValueError: the user has no variant yet, or ``metric_name`` is
                not one of the experiment's metrics.
        """
        if user_id not in self.user_variants:
            raise ValueError(f"用户 {user_id} 未分配变体")
        if not any(m.name == metric_name for m in self.config.metrics):
            raise ValueError(f"无效指标: {metric_name}")
        self.data_points.append(
            MetricDataPoint(
                timestamp=datetime.now(),
                user_id=user_id,
                variant=self.user_variants[user_id],
                metric_name=metric_name,
                value=value,
                metadata=metadata
            )
        )

    def assign_user(self, user_id: str, variant: str):
        """Remember which variant ``user_id`` belongs to.

        Raises:
            ValueError: ``variant`` is not one of the experiment's
                variants. Data recorded under an unknown name would never
                show up in any per-variant analysis.
        """
        if not any(v.name == variant for v in self.config.variants):
            raise ValueError(f"无效变体: {variant}")
        self.user_variants[user_id] = variant

    def get_variant_data(
        self,
        variant_name: str,
        metric_name: str
    ) -> List[float]:
        """Return the values recorded for ``metric_name`` under ``variant_name``."""
        return [
            dp.value for dp in self.data_points
            if dp.variant == variant_name and dp.metric_name == metric_name
        ]

    def get_summary(self) -> Dict:
        """Count users and data points overall, per variant and per metric."""
        users_per_variant = {v.name: 0 for v in self.config.variants}
        points_per_variant = {v.name: 0 for v in self.config.variants}
        points_per_metric = {m.name: 0 for m in self.config.metrics}
        for assigned in self.user_variants.values():
            if assigned in users_per_variant:
                users_per_variant[assigned] += 1
        # One pass over the data instead of one scan per variant and metric.
        for dp in self.data_points:
            if dp.variant in points_per_variant:
                points_per_variant[dp.variant] += 1
            if dp.metric_name in points_per_metric:
                points_per_metric[dp.metric_name] += 1
        return {
            "total_users": len(self.user_variants),
            "total_data_points": len(self.data_points),
            "by_variant": {
                name: {"users": users_per_variant[name], "data_points": points_per_variant[name]}
                for name in users_per_variant
            },
            "by_metric": {
                name: {"total": count}
                for name, count in points_per_metric.items()
            }
        }
import random

collector = ExperimentCollector(experiment_config)

# Assign one hundred synthetic users to variants.
for i in range(100):
    user_id = f"user_{i}"
    variant = allocator.assign(user_id, experiment_config.experiment_id)
    collector.assign_user(user_id, variant)

# (mean, stddev) of the simulated satisfaction and helpfulness scores.
_SIMULATED_SCORES = {
    "control": ((3.5, 1.0), (3.6, 0.9)),
    "professional": ((4.0, 0.8), (4.2, 0.7)),
}
_OTHER_VARIANT_SCORES = ((3.8, 0.9), (3.9, 0.8))

for user_id in collector.user_variants:
    variant = collector.user_variants[user_id]
    _satisfaction_params, _helpfulness_params = _SIMULATED_SCORES.get(
        variant, _OTHER_VARIANT_SCORES
    )
    satisfaction = random.gauss(*_satisfaction_params)
    helpfulness = random.gauss(*_helpfulness_params)
    # Clamp both scores to the 1-5 scale.
    satisfaction = max(1, min(5, satisfaction))
    helpfulness = max(1, min(5, helpfulness))
    collector.record(user_id, "user_satisfaction", round(satisfaction, 2))
    collector.record(user_id, "response_helpfulness", round(helpfulness, 2))
    collector.record(user_id, "response_time", random.gauss(2.5, 0.5))
    collector.record(user_id, "completion_rate", random.choice([0, 1]))

summary = collector.get_summary()
print("数据收集摘要:")
print(f" 总用户数: {summary['total_users']}")
print(f" 总数据点: {summary['total_data_points']}")
print(f"\n按变体:")
for variant, stats in summary["by_variant"].items():
    print(f" {variant}: {stats['users']} 用户, {stats['data_points']} 数据点")
7.6.4 统计分析与结果解读
class ExperimentAnalyzer:
    """Compute per-variant statistics and significance against the control."""

    def __init__(
        self,
        config: "ExperimentConfig",
        collector: "ExperimentCollector"
    ):
        """Analyse the data held by ``collector`` for the experiment ``config``."""
        self.config = config
        self.collector = collector

    def analyze_metric(self, metric_name: str) -> Dict:
        """Analyse one metric across all variants that have data.

        Every variant gets n, mean, std, standard error, median, min and
        max. When the variant named "control" has data, each other variant
        also gets ``lift`` (percent change of the mean relative to control),
        ``p_value`` and ``significant`` (tested at the experiment's
        ``significance_level``). ``best_variant`` is None when no variant
        has data.

        Raises:
            ValueError: ``metric_name`` is not defined for the experiment.
        """
        metric = next(
            (m for m in self.config.metrics if m.name == metric_name), None
        )
        if metric is None:
            # Avoid leaking StopIteration out of a plain method call.
            raise ValueError(f"无效指标: {metric_name}")
        samples = {}
        variant_stats = {}
        for variant in self.config.variants:
            values = self.collector.get_variant_data(variant.name, metric_name)
            if not values:
                continue
            samples[variant.name] = values
            std = statistics.stdev(values) if len(values) > 1 else 0
            variant_stats[variant.name] = {
                "n": len(values),
                "mean": statistics.mean(values),
                "std": std,
                "se": std / math.sqrt(len(values)),
                "median": statistics.median(values),
                "min": min(values),
                "max": max(values)
            }
        control_name = "control"
        if control_name in variant_stats:
            control_mean = variant_stats[control_name]["mean"]
            for variant_name, stats in variant_stats.items():
                if variant_name == control_name:
                    continue
                if control_mean != 0:
                    lift = (stats["mean"] - control_mean) / abs(control_mean) * 100
                else:
                    # Relative change is undefined for a zero baseline.
                    lift = 0
                is_significant, p_value = self._significance_test(
                    samples[control_name],
                    samples[variant_name],
                    alpha=self.config.significance_level
                )
                stats["lift"] = lift
                stats["significant"] = is_significant
                stats["p_value"] = p_value
        best_variant = None
        if variant_stats:
            direction = 1 if metric.higher_is_better else -1
            best_variant = max(
                variant_stats,
                key=lambda name: direction * variant_stats[name]["mean"]
            )
        return {
            "metric_name": metric_name,
            "higher_is_better": metric.higher_is_better,
            "unit": metric.unit,
            "variant_stats": variant_stats,
            "best_variant": best_variant
        }

    def _significance_test(
        self,
        control_values: List[float],
        treatment_values: List[float],
        alpha: float = 0.05
    ) -> tuple:
        """Two-sided Welch t-test of treatment against control.

        Returns:
            ``(is_significant, p_value)``; ``(False, 1.0)`` when either
            sample has fewer than two values or both have zero variance.
        """
        if len(control_values) < 2 or len(treatment_values) < 2:
            return False, 1.0
        n1, n2 = len(control_values), len(treatment_values)
        mean1 = statistics.mean(control_values)
        mean2 = statistics.mean(treatment_values)
        # Per-sample variance of the mean.
        vm1 = statistics.variance(control_values) / n1
        vm2 = statistics.variance(treatment_values) / n2
        standard_error = math.sqrt(vm1 + vm2)
        if standard_error == 0:
            return False, 1.0
        t_stat = (mean2 - mean1) / standard_error
        # Welch-Satterthwaite degrees of freedom, which matches the
        # unpooled standard error used above.
        df = (vm1 + vm2) ** 2 / (vm1 ** 2 / (n1 - 1) + vm2 ** 2 / (n2 - 1))
        p_value = self._t_distribution_pvalue(abs(t_stat), df)
        return p_value < alpha, p_value

    def _t_distribution_pvalue(self, t: float, df: int) -> float:
        """Return the two-sided p-value of ``t`` for Student's t with ``df``.

        Uses P(|T| > t) = I_x(df/2, 1/2) with x = df / (df + t^2), where I
        is the regularized incomplete beta function. ``df`` may be
        fractional. Non-positive ``df`` returns 1.0.
        """
        t = abs(t)
        if df <= 0:
            return 1.0
        if df > 1_000_000:
            # The normal tail is used here to bound the iteration count.
            return math.erfc(t / math.sqrt(2))
        x = df / (df + t * t)
        p = ExperimentAnalyzer._regularized_incomplete_beta(df / 2.0, 0.5, x)
        return min(1.0, max(0.0, p))

    @staticmethod
    def _regularized_incomplete_beta(a: float, b: float, x: float) -> float:
        """Return I_x(a, b) for a, b > 0 and 0 <= x <= 1."""
        if x <= 0.0:
            return 0.0
        if x >= 1.0:
            return 1.0
        log_front = (
            math.lgamma(a + b) - math.lgamma(a) - math.lgamma(b)
            + a * math.log(x) + b * math.log(1.0 - x)
        )
        front = math.exp(log_front)
        # The continued fraction converges fastest below this switch point;
        # above it, use the symmetry I_x(a, b) = 1 - I_{1-x}(b, a).
        if x < (a + 1.0) / (a + b + 2.0):
            return front * ExperimentAnalyzer._beta_continued_fraction(a, b, x) / a
        return 1.0 - front * ExperimentAnalyzer._beta_continued_fraction(b, a, 1.0 - x) / b

    @staticmethod
    def _beta_continued_fraction(
        a: float, b: float, x: float,
        max_iter: int = 5000, eps: float = 3e-14
    ) -> float:
        """Evaluate the incomplete-beta continued fraction (modified Lentz)."""
        tiny = 1e-300
        qab, qap, qam = a + b, a + 1.0, a - 1.0
        c = 1.0
        d = 1.0 - qab * x / qap
        if abs(d) < tiny:
            d = tiny
        d = 1.0 / d
        h = d
        for m in range(1, max_iter + 1):
            m2 = 2 * m
            # Even step of the recurrence.
            aa = m * (b - m) * x / ((qam + m2) * (a + m2))
            d = 1.0 + aa * d
            if abs(d) < tiny:
                d = tiny
            c = 1.0 + aa / c
            if abs(c) < tiny:
                c = tiny
            d = 1.0 / d
            h *= d * c
            # Odd step of the recurrence.
            aa = -(a + m) * (qab + m) * x / ((a + m2) * (qap + m2))
            d = 1.0 + aa * d
            if abs(d) < tiny:
                d = tiny
            c = 1.0 + aa / c
            if abs(c) < tiny:
                c = tiny
            d = 1.0 / d
            delta = d * c
            h *= delta
            if abs(delta - 1.0) < eps:
                break
        return h

    def analyze_all(self) -> Dict:
        """Run ``analyze_metric`` for every metric, keyed by metric name."""
        return {
            metric.name: self.analyze_metric(metric.name)
            for metric in self.config.metrics
        }

    def generate_report(self, results: Dict) -> str:
        """Render the output of ``analyze_all`` as a multi-line text report."""
        report_lines = []
        report_lines.append(f"\n{'='*60}")
        report_lines.append(f"实验分析报告: {self.config.name}")
        report_lines.append(f"{'='*60}")
        for metric_name, analysis in results.items():
            report_lines.append(f"\n--- {metric_name} ({analysis['unit']}) ---")
            report_lines.append(f"优化方向: {'越高越好' if analysis['higher_is_better'] else '越低越好'}")
            for variant_name, stats in analysis["variant_stats"].items():
                report_lines.append(f"\n {variant_name}:")
                report_lines.append(f" 样本量: {stats['n']}")
                report_lines.append(f" 平均值: {stats['mean']:.4f}")
                report_lines.append(f" 标准差: {stats['std']:.4f}")
                report_lines.append(f" 标准误: {stats['se']:.4f}")
                # Comparison fields exist only for non-control variants.
                if "lift" in stats:
                    lift = stats["lift"]
                    sig = "✓ 显著" if stats["significant"] else "✗ 不显著"
                    report_lines.append(f" 相对提升: {lift:+.2f}% {sig}")
                    report_lines.append(f" p值: {stats['p_value']:.4f}")
            report_lines.append(f"\n 最佳变体: {analysis['best_variant']}")
        return "\n".join(report_lines)
# Analyse every metric of the simulated experiment and print the text report.
analyzer = ExperimentAnalyzer(experiment_config, collector)
results = analyzer.analyze_all()
report = analyzer.generate_report(results)
print(report)
7.6.5 实验管理
class ABTestingSystem:
    """In-memory registry that drives A/B experiments through their lifecycle.

    For every experiment id it keeps the configuration, a metrics collector,
    a traffic allocator and the current status.
    """

    def __init__(self):
        self.experiments: Dict[str, ExperimentConfig] = {}
        self.collectors: Dict[str, ExperimentCollector] = {}
        self.allocators: Dict[str, TrafficAllocator] = {}
        self.status: Dict[str, ExperimentStatus] = {}

    def _ensure_exists(self, experiment_id: str) -> None:
        """Raise ``ValueError`` unless the experiment has been created."""
        if experiment_id not in self.experiments:
            raise ValueError(f"实验 {experiment_id} 不存在")

    def _ensure_running(self, experiment_id: str) -> None:
        """Raise ``ValueError`` unless the experiment status is RUNNING."""
        if self.status.get(experiment_id) != ExperimentStatus.RUNNING:
            raise ValueError(f"实验 {experiment_id} 未运行")

    def create_experiment(self, config: ExperimentConfig) -> str:
        """Register a new experiment in DRAFT status and return its id."""
        exp_id = config.experiment_id
        self.experiments[exp_id] = config
        self.collectors[exp_id] = ExperimentCollector(config)
        self.allocators[exp_id] = TrafficAllocator(config.variants)
        self.status[exp_id] = ExperimentStatus.DRAFT
        print(f"已创建实验: {config.name} ({exp_id})")
        return exp_id

    def start_experiment(self, experiment_id: str):
        """Mark the experiment RUNNING and stamp its start time.

        Raises:
            ValueError: If the experiment id is unknown.
        """
        self._ensure_exists(experiment_id)
        self.status[experiment_id] = ExperimentStatus.RUNNING
        self.experiments[experiment_id].start_time = datetime.now()
        print(f"实验 {experiment_id} 已启动")

    def stop_experiment(self, experiment_id: str):
        """Mark the experiment COMPLETED and stamp its end time.

        Raises:
            ValueError: If the experiment id is unknown.
        """
        self._ensure_exists(experiment_id)
        self.status[experiment_id] = ExperimentStatus.COMPLETED
        self.experiments[experiment_id].end_time = datetime.now()
        print(f"实验 {experiment_id} 已停止")

    def assign_variant(self, experiment_id: str, user_id: str) -> str:
        """Pick a variant for the user and record the assignment.

        Raises:
            ValueError: If the experiment is not currently running.
        """
        self._ensure_running(experiment_id)
        chosen = self.allocators[experiment_id].assign(user_id, experiment_id)
        self.collectors[experiment_id].assign_user(user_id, chosen)
        return chosen

    def record_metric(
        self,
        experiment_id: str,
        user_id: str,
        metric_name: str,
        value: float,
        metadata: Dict = None
    ):
        """Forward one metric observation to the experiment's collector.

        Raises:
            ValueError: If the experiment is not currently running.
        """
        self._ensure_running(experiment_id)
        self.collectors[experiment_id].record(
            user_id, metric_name, value, metadata
        )

    def get_results(self, experiment_id: str) -> Dict:
        """Analyze the collected data and return a result summary.

        Returns:
            A dict with the experiment id, status value, ISO-formatted start
            and end times (``None`` when unset), the per-metric analysis and
            the collector summary.

        Raises:
            ValueError: If the experiment id is unknown.
        """
        self._ensure_exists(experiment_id)
        config = self.experiments[experiment_id]
        collector = self.collectors[experiment_id]
        analysis = ExperimentAnalyzer(config, collector).analyze_all()
        return {
            "experiment_id": experiment_id,
            "status": self.status[experiment_id].value,
            "start_time": config.start_time.isoformat() if config.start_time else None,
            "end_time": config.end_time.isoformat() if config.end_time else None,
            "results": analysis,
            "summary": collector.get_summary()
        }
# Demo: run a simulated experiment end to end through ABTestingSystem.
ab_system = ABTestingSystem()
exp_id = ab_system.create_experiment(experiment_config)
ab_system.start_experiment(exp_id)

print("\n模拟实验运行...")
for i in range(200):
    user_id = f"test_user_{i}"
    variant = ab_system.assign_variant(exp_id, user_id)

    # Each variant draws its two scores from its own normal distributions.
    if variant == "control":
        sat = random.gauss(3.5, 1.0)
        help_val = random.gauss(3.6, 0.9)
    elif variant == "professional":
        sat = random.gauss(4.0, 0.8)
        help_val = random.gauss(4.2, 0.7)
    else:
        sat = random.gauss(3.8, 0.9)
        help_val = random.gauss(3.9, 0.8)

    # Scores are clamped to the 1..5 range before being recorded.
    ab_system.record_metric(exp_id, user_id, "user_satisfaction", max(1, min(5, sat)))
    ab_system.record_metric(exp_id, user_id, "response_helpfulness", max(1, min(5, help_val)))
    ab_system.record_metric(exp_id, user_id, "response_time", random.gauss(2.5, 0.5))
    ab_system.record_metric(exp_id, user_id, "completion_rate", random.choice([0, 1]))

ab_system.stop_experiment(exp_id)

results = ab_system.get_results(exp_id)
print(f"\n实验状态: {results['status']}")
print(f"总用户数: {results['summary']['total_users']}")
7.7 多变量测试(Multivariate Testing)
大白话解释: A/B测试一次通常只针对单个因素比较少数几个版本,但有时候你想同时测试多个因素。比如,你想知道:
- 提示词风格(专业 vs 友好)
- 温度参数(0.5 vs 0.7 vs 0.9)
- 最大Token数(500 vs 1000)
这些因素组合起来有很多种可能,多变量测试就是用来处理这种情况的。
深入理解:
@dataclass
class Factor:
    """One experimental factor together with the levels it can take."""

    # Identifier of the factor, e.g. "temperature".
    name: str
    # One dict of settings per level.
    levels: List[Dict[str, Any]]

    def __post_init__(self):
        # Levels are labelled by position: "0", "1", ...
        self.level_names = list(map(str, range(len(self.levels))))
@dataclass
class MultivariateConfig:
    """Configuration of a multivariate (multi-factor) experiment."""

    # Unique id; also used to seed the user-to-combination hash.
    experiment_id: str
    # Human-readable experiment name.
    name: str
    # Factors whose levels are combined.
    factors: List[Factor]
    # Metrics tracked for the experiment.
    metrics: List[ExperimentMetric]
def design_multivariate_experiment() -> MultivariateConfig:
    """Build the sample three-factor prompt-optimisation experiment.

    The factors are prompt style (3 levels), temperature (3 levels) and
    max tokens (2 levels); two metrics are tracked.
    """
    system_prompts = (
        "你是一个AI助手。",
        "你是一个专业的AI助手。",
        "你是一个友好的AI助手。",
    )
    prompt_style = Factor(
        name="prompt_style",
        levels=[{"system_prompt": text} for text in system_prompts],
    )
    temperature = Factor(
        name="temperature",
        levels=[{"temperature": t} for t in (0.5, 0.7, 0.9)],
    )
    max_tokens = Factor(
        name="max_tokens",
        levels=[{"max_tokens": limit} for limit in (500, 1000)],
    )
    tracked_metrics = [
        ExperimentMetric("quality_score", "回答质量", True),
        ExperimentMetric("user_rating", "用户评分", True),
    ]
    return MultivariateConfig(
        experiment_id="MV001",
        name="提示词多因素优化实验",
        factors=[prompt_style, temperature, max_tokens],
        metrics=tracked_metrics,
    )
# Demo: build the sample design and report its size.
mv_config = design_multivariate_experiment()

# The number of combinations is the product of the per-factor level counts.
total_combinations = 1
for factor in mv_config.factors:
    total_combinations *= len(factor.levels)

print(f"多变量实验: {mv_config.name}")
print(f"因素数量: {len(mv_config.factors)}")
print(f"总组合数: {total_combinations}")

print("\n因素详情:")
for factor in mv_config.factors:
    print(f" {factor.name}: {len(factor.levels)} 个水平")
class MultivariateAllocator:
    """Assigns users to one combination of factor levels."""

    def __init__(self, config: MultivariateConfig):
        self.config = config
        self.combinations = self._generate_combinations()

    def _generate_combinations(self) -> List[Dict]:
        """Enumerate every combination of factor levels.

        Returns:
            One dict per combination mapping each factor name to the chosen
            level, plus an ``"_indices"`` entry holding the tuple of level
            indices that produced it.
        """
        from itertools import product

        factors = self.config.factors
        combos = []
        for picked in product(*(range(len(f.levels)) for f in factors)):
            combo = {f.name: f.levels[i] for f, i in zip(factors, picked)}
            combo["_indices"] = picked
            combos.append(combo)
        return combos

    def assign(self, user_id: str) -> Dict:
        """Deterministically map a user to one combination.

        The MD5 hex digest of ``"<experiment_id>_<user_id>"`` is read as an
        integer and reduced modulo the number of combinations, so the same
        inputs always select the same combination.
        """
        seed = f"{self.config.experiment_id}_{user_id}".encode()
        bucket = int(hashlib.md5(seed).hexdigest(), 16) % len(self.combinations)
        return self.combinations[bucket]
# Demo: enumerate the combinations and show the one a sample user lands in.
mv_allocator = MultivariateAllocator(mv_config)
print(f"\n生成的组合数: {len(mv_allocator.combinations)}")

test_user = "test_user_001"
assigned = mv_allocator.assign(test_user)
print(f"\n用户 {test_user} 分配的组合:")
for key, value in assigned.items():
    # "_indices" is bookkeeping rather than a factor setting; do not print it.
    if key == "_indices":
        continue
    print(f" {key}: {value}")
7.8 持续监控
大白话解释: 持续监控就像给AI装了一个"健康监测仪",24小时盯着它的表现。一旦发现异常——比如回答质量突然下降、响应时间变长、用户投诉增加——就立即报警,让你能及时处理。
这就像医院的监护仪,病人的心率、血压、血氧都在实时监控,一旦出现异常就报警。AI应用也需要这样的监护。
深入理解: 持续监控的核心组件:
- 指标收集:实时收集各项性能指标
- 异常检测:识别异常模式和趋势
- 告警机制:及时通知相关人员
- 可视化面板:直观展示系统状态
- 日志分析:深入分析问题原因
7.8.1 指标收集系统
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
from datetime import datetime, timedelta
from enum import Enum
import statistics
import time
import json
class MetricType(Enum):
    """Kinds of metric a collector can be configured with."""

    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
@dataclass
class MetricConfig:
    """Static description of one monitored metric."""

    # Key under which the metric is registered and recorded.
    name: str
    # Human-readable label.
    description: str
    metric_type: MetricType
    # Unit label shown next to values, e.g. "seconds".
    unit: str
    # Three threshold levels; they are stored here and surfaced as-is.
    warning_threshold: float
    error_threshold: float
    critical_threshold: float
    # Whether larger values are the desirable direction.
    higher_is_better: bool = True
    # Maximum number of recent data points the collector retains.
    window_size: int = 100
@dataclass
class MetricDataPoint:
    """A single timestamped observation of a metric."""

    # When the value was recorded.
    timestamp: datetime
    # The observed value.
    value: float
    # Optional free-form context attached by the caller.
    metadata: Optional[Dict] = None
class MetricsCollector:
    """Keeps a bounded, in-memory series of data points per registered metric."""

    def __init__(self):
        self.metrics: Dict[str, List[MetricDataPoint]] = {}
        self.configs: Dict[str, MetricConfig] = {}

    def register_metric(self, config: MetricConfig):
        """Register a metric and start it with an empty series.

        Registering the same name again replaces its config and clears
        whatever had been recorded for it.
        """
        key = config.name
        self.configs[key] = config
        self.metrics[key] = []
        print(f"已注册指标: {config.name} ({config.description})")

    def record(
        self,
        metric_name: str,
        value: float,
        metadata: Dict = None
    ):
        """Append one observation, timestamped with the current time.

        Raises:
            ValueError: If the metric was never registered.
        """
        if metric_name not in self.metrics:
            raise ValueError(f"未注册的指标: {metric_name}")

        series = self.metrics[metric_name]
        series.append(
            MetricDataPoint(timestamp=datetime.now(), value=value, metadata=metadata)
        )

        limit = self.configs[metric_name].window_size
        if len(series) > limit:
            # Retain only the newest `limit` points.
            self.metrics[metric_name] = series[-limit:]

    def get_statistics(self, metric_name: str) -> Dict:
        """Summarize the retained values of a metric.

        Returns:
            An empty dict when the metric is unknown or has no data;
            otherwise count, mean, median, sample standard deviation
            (0 with a single value), min, max, latest value and sum.
        """
        values = [dp.value for dp in self.metrics.get(metric_name, [])]
        if not values:
            return {}

        spread = statistics.stdev(values) if len(values) > 1 else 0
        return {
            "count": len(values),
            "mean": statistics.mean(values),
            "median": statistics.median(values),
            "std": spread,
            "min": min(values),
            "max": max(values),
            "latest": values[-1],
            "sum": sum(values)
        }

    def get_time_series(
        self,
        metric_name: str,
        start_time: datetime = None,
        end_time: datetime = None
    ) -> List[Dict]:
        """Return the retained points as dicts, optionally bounded in time.

        Both bounds are inclusive; a bound that is not given is not applied.
        An unknown metric yields an empty list.
        """
        if metric_name not in self.metrics:
            return []

        def within_bounds(dp) -> bool:
            after_start = not start_time or dp.timestamp >= start_time
            before_end = not end_time or dp.timestamp <= end_time
            return after_start and before_end

        return [
            {
                "timestamp": dp.timestamp.isoformat(),
                "value": dp.value,
                "metadata": dp.metadata
            }
            for dp in self.metrics[metric_name]
            if within_bounds(dp)
        ]
# Demo: register four metrics, feed them synthetic samples, print summaries.
collector = MetricsCollector()

collector.register_metric(MetricConfig(
    name="response_time",
    description="响应时间",
    metric_type=MetricType.HISTOGRAM,
    unit="seconds",
    warning_threshold=3.0,
    error_threshold=5.0,
    critical_threshold=10.0,
    higher_is_better=False
))

collector.register_metric(MetricConfig(
    name="token_usage",
    description="Token使用量",
    metric_type=MetricType.COUNTER,
    unit="tokens",
    warning_threshold=2000,
    error_threshold=3000,
    critical_threshold=4000,
    higher_is_better=False
))

collector.register_metric(MetricConfig(
    name="error_rate",
    description="错误率",
    metric_type=MetricType.GAUGE,
    unit="%",
    warning_threshold=5.0,
    error_threshold=10.0,
    critical_threshold=20.0,
    higher_is_better=False
))

# For this metric higher is better, so the thresholds descend.
collector.register_metric(MetricConfig(
    name="user_satisfaction",
    description="用户满意度",
    metric_type=MetricType.GAUGE,
    unit="score",
    warning_threshold=3.5,
    error_threshold=3.0,
    critical_threshold=2.5,
    higher_is_better=True
))

import random

# 50 synthetic observations per metric.
for i in range(50):
    collector.record("response_time", random.gauss(2.0, 0.5))
    collector.record("token_usage", random.randint(500, 1500))
    collector.record("error_rate", random.gauss(3.0, 1.0))
    collector.record("user_satisfaction", random.gauss(4.0, 0.5))

print("\n指标统计:")
for metric_name in collector.configs:
    stats = collector.get_statistics(metric_name)
    print(f"\n{metric_name}:")
    print(f" 平均: {stats['mean']:.2f}")
    print(f" 最新: {stats['latest']:.2f}")
    print(f" 范围: [{stats['min']:.2f}, {stats['max']:.2f}]")
7.8.2 异常检测
class AnomalyDetector:
    """Z-score based anomaly and trend detection over plain lists of numbers.

    Args:
        sensitivity: Z-score above which a value counts as anomalous.
        min_samples: Minimum number of reference samples required before a
            z-score is computed. At least two samples are always required,
            because a sample standard deviation needs two data points.
    """

    def __init__(
        self,
        sensitivity: float = 2.0,
        min_samples: int = 10
    ):
        self.sensitivity = sensitivity
        self.min_samples = min_samples

    def _required_samples(self) -> int:
        """Return the effective minimum reference size (never below 2)."""
        # statistics.stdev raises StatisticsError on fewer than two values,
        # so a min_samples of 0 or 1 must not be taken literally.
        return max(self.min_samples, 2)

    def detect_point_anomaly(
        self,
        values: List[float],
        new_value: float
    ) -> Dict:
        """Decide whether ``new_value`` is an outlier relative to ``values``.

        Args:
            values: Reference samples.
            new_value: The value to score.

        Returns:
            A dict that always contains ``is_anomaly``, ``z_score`` and
            ``threshold``. When the reference is too small or has zero
            spread, ``is_anomaly`` is False and ``reason`` explains why;
            otherwise ``deviation``, ``mean`` and ``std`` are included.
        """
        if len(values) < self._required_samples():
            return {
                "is_anomaly": False,
                "reason": "样本量不足",
                "z_score": 0,
                "threshold": self.sensitivity
            }

        mean = statistics.mean(values)
        std = statistics.stdev(values)
        if std == 0:
            # A constant reference gives no scale to measure deviation against.
            return {
                "is_anomaly": False,
                "reason": "标准差为零",
                "z_score": 0,
                "threshold": self.sensitivity
            }

        z_score = abs(new_value - mean) / std
        return {
            "is_anomaly": z_score > self.sensitivity,
            "z_score": z_score,
            "threshold": self.sensitivity,
            "deviation": new_value - mean,
            "mean": mean,
            "std": std
        }

    def detect_trend(
        self,
        values: List[float],
        window: int = 10
    ) -> Dict:
        """Compare the mean of the last ``window`` values with the window before.

        Returns:
            ``trend`` is ``"increasing"`` / ``"decreasing"`` when the relative
            change exceeds +10% / -10%, ``"stable"`` otherwise, and
            ``"insufficient_data"`` when ``window`` is not positive or fewer
            than ``2 * window`` values are available. ``change_rate`` is the
            relative change in percent.
        """
        # window < 1 would leave the "previous" slice empty and crash mean().
        if window < 1 or len(values) < window * 2:
            return {
                "trend": "insufficient_data",
                "change_rate": 0
            }

        recent_mean = statistics.mean(values[-window:])
        previous_mean = statistics.mean(values[-window * 2:-window])

        if previous_mean != 0:
            change_rate = (recent_mean - previous_mean) / abs(previous_mean) * 100
        else:
            # Relative change is undefined against a zero baseline; report 0.
            change_rate = 0

        if change_rate > 10:
            trend = "increasing"
        elif change_rate < -10:
            trend = "decreasing"
        else:
            trend = "stable"

        return {
            "trend": trend,
            "change_rate": change_rate,
            "recent_mean": recent_mean,
            "previous_mean": previous_mean
        }

    def detect_collective_anomaly(
        self,
        values: List[float],
        window: int = 20
    ) -> Dict:
        """Detect a run of anomalous points in the last ``window`` values.

        The last ``window`` values are scored against the mean and standard
        deviation of everything before them.

        Returns:
            ``has_collective_anomaly`` is True when more than half of the
            recent values have a z-score above ``sensitivity``. When there is
            not enough data, or the history has zero spread, it is False and
            ``reason`` explains why.
        """
        # A non-positive window would produce an empty or misaligned slice.
        if window < 1 or len(values) < window:
            return {
                "has_collective_anomaly": False,
                "reason": "样本量不足"
            }

        recent_values = values[-window:]
        historical_values = values[:-window]

        if len(historical_values) < self._required_samples():
            return {
                "has_collective_anomaly": False,
                "reason": "历史数据不足"
            }

        hist_mean = statistics.mean(historical_values)
        hist_std = statistics.stdev(historical_values)
        if hist_std == 0:
            return {
                "has_collective_anomaly": False,
                "reason": "历史标准差为零"
            }

        anomaly_count = sum(
            1 for v in recent_values
            if abs(v - hist_mean) / hist_std > self.sensitivity
        )
        anomaly_ratio = anomaly_count / len(recent_values)

        return {
            "has_collective_anomaly": anomaly_ratio > 0.5,
            "anomaly_ratio": anomaly_ratio,
            "anomaly_count": anomaly_count,
            "window_size": window
        }
# Demo: exercise the detector on synthetic data.
anomaly_detector = AnomalyDetector(sensitivity=2.5)
# 30 samples with mean 10 and standard deviation 2, then one deliberate outlier.
test_values = [random.gauss(10, 2) for _ in range(30)]
test_values.append(25)
# Score the last value against all the values before it.
result = anomaly_detector.detect_point_anomaly(test_values[:-1], test_values[-1])
print(f"\n点异常检测:")
print(f" 是否异常: {result['is_anomaly']}")
print(f" Z-score: {result['z_score']:.2f}")
print(f" 阈值: {result['threshold']}")
# A steadily rising series (10..29) to exercise trend detection.
trend_values = list(range(10, 30))
trend_result = anomaly_detector.detect_trend(trend_values)
print(f"\n趋势检测:")
print(f" 趋势: {trend_result['trend']}")
print(f" 变化率: {trend_result['change_rate']:.1f}%")
7.8.3 告警系统
class AlertLevel(Enum):
    """Severity levels an alert can carry."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class Alert:
    """One alert raised for a metric."""

    # Sequential identifier assigned by the alert manager.
    alert_id: str
    # When the alert was created.
    timestamp: datetime
    # Name of the metric the alert refers to.
    metric_name: str
    level: AlertLevel
    # Human-readable explanation.
    message: str
    # The offending value and the threshold it is reported against.
    current_value: float
    threshold: float
    # Optional free-form context.
    metadata: Optional[Dict] = None
class AlertManager:
    """Records alerts, fans them out to handlers and rate-limits repeats."""

    def __init__(self):
        self.alerts: List[Alert] = []
        self.handlers: List[Callable] = []
        # Time of the last alert fired per "<metric>_<level>" key.
        self.cooldown: Dict[str, datetime] = {}
        self.cooldown_period = timedelta(minutes=5)
        self.alert_counter = 0

    def register_handler(self, handler: Callable):
        """Add a callable that will receive every alert that is fired."""
        self.handlers.append(handler)

    def trigger(
        self,
        metric_name: str,
        level: AlertLevel,
        message: str,
        current_value: float,
        threshold: float,
        metadata: Dict = None
    ):
        """Fire an alert unless the same metric/level pair is cooling down.

        A repeat for the same metric and level within ``cooldown_period`` of
        the previous one is dropped silently. Otherwise the alert is stored,
        the cooldown restarts, and every handler is invoked; a handler that
        raises is reported on stdout and does not stop the remaining ones.
        """
        alert_key = f"{metric_name}_{level.value}"
        last_fired = self.cooldown.get(alert_key)
        if last_fired is not None and datetime.now() - last_fired < self.cooldown_period:
            return

        self.alert_counter += 1
        alert = Alert(
            alert_id=f"ALT{self.alert_counter:06d}",
            timestamp=datetime.now(),
            metric_name=metric_name,
            level=level,
            message=message,
            current_value=current_value,
            threshold=threshold,
            metadata=metadata
        )
        self.alerts.append(alert)
        self.cooldown[alert_key] = datetime.now()

        for notify in self.handlers:
            try:
                notify(alert)
            except Exception as e:
                print(f"告警处理器执行失败: {e}")

    def get_recent_alerts(self, hours: int = 24) -> List[Alert]:
        """Return the alerts raised strictly within the last ``hours`` hours."""
        oldest_allowed = datetime.now() - timedelta(hours=hours)
        return [alert for alert in self.alerts if alert.timestamp > oldest_allowed]

    def get_alert_statistics(self) -> Dict:
        """Count the alerts of the last 24 hours by level and by metric."""
        recent = self.get_recent_alerts(24)
        by_level = {}
        by_metric = {}
        for alert in recent:
            by_level.setdefault(alert.level.value, 0)
            by_level[alert.level.value] += 1
            by_metric.setdefault(alert.metric_name, 0)
            by_metric[alert.metric_name] += 1
        return {
            "total_24h": len(recent),
            "by_level": by_level,
            "by_metric": by_metric
        }
# Module-level alert manager used by the demo code below.
alert_manager = AlertManager()
def console_alert_handler(alert: Alert):
    """Print an alert to standard output, prefixed with a per-level emoji."""
    icons = {
        AlertLevel.INFO: "ℹ️",
        AlertLevel.WARNING: "⚠️",
        AlertLevel.ERROR: "❌",
        AlertLevel.CRITICAL: "🚨"
    }
    report = (
        f"\n{icons[alert.level]} [{alert.level.value.upper()}] 告警",
        f" 时间: {alert.timestamp}",
        f" 指标: {alert.metric_name}",
        f" 消息: {alert.message}",
        f" 当前值: {alert.current_value:.2f}, 阈值: {alert.threshold:.2f}",
    )
    for line in report:
        print(line)
# Demo: route alerts to the console handler and fire two sample alerts.
alert_manager.register_handler(console_alert_handler)
# The two alerts differ in metric and level, so the cooldown suppresses neither.
alert_manager.trigger(
    metric_name="response_time",
    level=AlertLevel.WARNING,
    message="响应时间超过警告阈值",
    current_value=4.5,
    threshold=3.0
)
alert_manager.trigger(
    metric_name="error_rate",
    level=AlertLevel.ERROR,
    message="错误率超过错误阈值",
    current_value=12.5,
    threshold=10.0
)
# Summarize what was fired in the last 24 hours.
stats = alert_manager.get_alert_statistics()
print(f"\n告警统计:")
print(f" 24小时内总告警: {stats['total_24h']}")
print(f" 按级别: {stats['by_level']}")
7.8.4 监控面板
class MonitoringDashboard:
    """Combines metrics, anomaly detection and alerts into one status view."""

    def __init__(
        self,
        collector: MetricsCollector,
        anomaly_detector: AnomalyDetector,
        alert_manager: AlertManager
    ):
        self.collector = collector
        self.anomaly_detector = anomaly_detector
        self.alert_manager = alert_manager

    def get_dashboard_data(self) -> Dict:
        """Collect a snapshot of metrics and alerts.

        Returns:
            A dict with ``timestamp``, ``metrics`` (one entry per registered
            metric that has data), ``alerts`` (24-hour statistics plus up to
            ten of the most recent alerts) and ``anomalies``, which this
            method leaves empty.
        """
        dashboard = {
            "timestamp": datetime.now().isoformat(),
            "metrics": {},
            "alerts": {},
            "anomalies": {}
        }

        for metric_name, config in self.collector.configs.items():
            stats = self.collector.get_statistics(metric_name)
            if not stats:
                continue

            series = [dp.value for dp in self.collector.metrics[metric_name]]
            trend = self.anomaly_detector.detect_trend(series)
            # The newest value is scored against everything recorded before it.
            anomaly = (
                self.anomaly_detector.detect_point_anomaly(series[:-1], series[-1])
                if len(series) > 1 else None
            )

            dashboard["metrics"][metric_name] = {
                "description": config.description,
                "unit": config.unit,
                "type": config.metric_type.value,
                "statistics": stats,
                "trend": trend,
                "anomaly": anomaly,
                "thresholds": {
                    "warning": config.warning_threshold,
                    "error": config.error_threshold,
                    "critical": config.critical_threshold
                },
                "higher_is_better": config.higher_is_better
            }

        alert_stats = self.alert_manager.get_alert_statistics()
        latest = self.alert_manager.get_recent_alerts(24)[-10:]
        dashboard["alerts"] = {
            "statistics": alert_stats,
            "recent": [
                {
                    "id": a.alert_id,
                    "timestamp": a.timestamp.isoformat(),
                    "level": a.level.value,
                    "metric": a.metric_name,
                    "message": a.message
                }
                for a in latest
            ]
        }
        return dashboard

    def render_text_dashboard(self) -> str:
        """Render the dashboard snapshot as a multi-line text block."""
        data = self.get_dashboard_data()
        trend_emoji = {
            "increasing": "📈",
            "decreasing": "📉",
            "stable": "➡️",
            "insufficient_data": "❓"
        }

        lines = [
            f"\n{'='*60}",
            "LLM应用监控面板",
            f"{'='*60}",
            f"更新时间: {data['timestamp']}",
            f"\n{'─'*60}",
            "指标概览",
            f"{'─'*60}",
        ]

        for metric_name, metric_data in data["metrics"].items():
            stats = metric_data["statistics"]
            trend = metric_data["trend"]
            lines.append(f"\n{metric_name} ({metric_data['unit']})")
            lines.append(f" 当前: {stats['latest']:.2f}")
            lines.append(f" 平均: {stats['mean']:.2f} ± {stats['std']:.2f}")
            lines.append(f" 趋势: {trend_emoji.get(trend['trend'], '❓')} {trend['trend']}")
            # The anomaly line only appears when the newest point was flagged.
            if metric_data.get("anomaly") and metric_data["anomaly"]["is_anomaly"]:
                lines.append(f" ⚠️ 异常检测: Z-score = {metric_data['anomaly']['z_score']:.2f}")

        lines += [f"\n{'─'*60}", "告警概览", f"{'─'*60}"]
        alert_stats = data["alerts"]["statistics"]
        lines.append(f"24小时内: {alert_stats['total_24h']} 条告警")
        lines.extend(
            f" {level}: {count}"
            for level, count in alert_stats["by_level"].items()
        )
        return "\n".join(lines)
# Demo: build a dashboard over the module-level collector, detector and alert manager, then print it.
dashboard = MonitoringDashboard(collector, anomaly_detector, alert_manager)
print(dashboard.render_text_dashboard())
7.9 用户反馈收集
大白话解释: 机器评估和自动化监控虽然重要,但用户的真实感受才是最终标准。用户反馈收集就是建立一个渠道,让用户能够方便地表达对AI回答的满意程度、提出建议、甚至提供正确答案。
这些反馈不仅能发现问题,还能作为微调模型的训练数据。
深入理解:
7.9.1 反馈类型设计
from enum import Enum
class FeedbackType(Enum):
    """Categories of feedback a user can leave on an answer."""

    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"
    CORRECTION = "correction"
    SUGGESTION = "suggestion"
@dataclass
class UserFeedback:
    """A single piece of user feedback on one question/answer exchange."""

    feedback_id: str
    user_id: str
    session_id: str
    # The exchange the feedback refers to.
    question: str
    answer: str
    feedback_type: FeedbackType
    # Optional numeric rating.
    rating: Optional[int] = None
    # Optional free-text remark.
    comment: Optional[str] = None
    # Replacement answer supplied by the user, if any.
    corrected_answer: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    timestamp: datetime = None

    def __post_init__(self):
        # Stamp the record with the current time when none was supplied.
        self.timestamp = datetime.now() if self.timestamp is None else self.timestamp
7.9.2 反馈收集系统
class FeedbackCollector:
    """Stores user feedback in memory and notifies registered handlers.

    ``FeedbackType`` and ``UserFeedback`` appear in the signatures as
    forward-reference strings, so defining this class does not require
    those names to be bound yet.
    """

    def __init__(self):
        self.feedbacks: List["UserFeedback"] = []
        self.handlers: List[Callable] = []
        self.feedback_counter = 0

    def register_handler(self, handler: Callable):
        """Add a callable that receives every collected feedback record."""
        self.handlers.append(handler)

    def collect(
        self,
        user_id: str,
        session_id: str,
        question: str,
        answer: str,
        feedback_type: "FeedbackType",
        rating: Optional[int] = None,
        comment: Optional[str] = None,
        corrected_answer: Optional[str] = None,
        tags: Optional[List[str]] = None
    ) -> "UserFeedback":
        """Create, store and broadcast one feedback record.

        The record gets a sequential id of the form ``FB000001``. Handlers
        run in registration order; a handler that raises is reported on
        stdout and does not prevent the record from being stored or the
        remaining handlers from running.

        Returns:
            The stored ``UserFeedback`` instance.
        """
        self.feedback_counter += 1
        feedback = UserFeedback(
            feedback_id=f"FB{self.feedback_counter:06d}",
            user_id=user_id,
            session_id=session_id,
            question=question,
            answer=answer,
            feedback_type=feedback_type,
            rating=rating,
            comment=comment,
            corrected_answer=corrected_answer,
            tags=tags or []
        )
        self.feedbacks.append(feedback)

        for handler in self.handlers:
            try:
                handler(feedback)
            except Exception as e:
                # Best effort: one failing handler must not lose the feedback.
                print(f"反馈处理器执行失败: {e}")
        return feedback

    def get_by_type(self, feedback_type: "FeedbackType") -> List["UserFeedback"]:
        """Return all feedback of the given type, in collection order."""
        return [f for f in self.feedbacks if f.feedback_type == feedback_type]

    def get_by_user(self, user_id: str) -> List["UserFeedback"]:
        """Return all feedback left by the given user, in collection order."""
        return [f for f in self.feedbacks if f.user_id == user_id]

    def get_statistics(self) -> Dict:
        """Summarize the collected feedback.

        Returns:
            A dict with ``total``, ``by_type`` (count per type value),
            ``rating_stats`` (mean, median and distribution of the ratings
            that were given, or an empty dict when none were),
            ``satisfaction_rate`` (positive / (positive + negative) in
            percent, 0 when there is neither) and ``corrections_count``.
            Every key is present even when nothing has been collected, so
            callers can index the result unconditionally.
        """
        by_type: Dict[str, int] = {}
        by_rating: Dict[str, int] = {}
        ratings: List[int] = []

        for feedback in self.feedbacks:
            type_name = feedback.feedback_type.value
            by_type[type_name] = by_type.get(type_name, 0) + 1
            if feedback.rating is not None:
                ratings.append(feedback.rating)
                rating_key = str(feedback.rating)
                by_rating[rating_key] = by_rating.get(rating_key, 0) + 1

        rating_stats = {}
        if ratings:
            rating_stats = {
                "mean": statistics.mean(ratings),
                "median": statistics.median(ratings),
                "distribution": by_rating
            }

        # Only explicit positive/negative votes count towards satisfaction.
        positive_count = by_type.get("positive", 0)
        negative_count = by_type.get("negative", 0)
        voted = positive_count + negative_count
        satisfaction_rate = positive_count / voted * 100 if voted > 0 else 0

        return {
            "total": len(self.feedbacks),
            "by_type": by_type,
            "rating_stats": rating_stats,
            "satisfaction_rate": satisfaction_rate,
            "corrections_count": by_type.get("correction", 0)
        }
# Module-level feedback collector used by the demo code below.
feedback_collector = FeedbackCollector()
def log_feedback_handler(feedback: UserFeedback):
    """Print a one-line log entry for a feedback record.

    A fixed placeholder is printed when the record has no comment.
    """
    kind = feedback.feedback_type.value
    remark = feedback.comment or '无评论'
    print(f"[反馈] {kind}: {remark}")
# Demo: log every feedback record, then collect one record of three different types.
feedback_collector.register_handler(log_feedback_handler)
# A positive rating.
feedback_collector.collect(
    user_id="user_001",
    session_id="session_001",
    question="什么是Python?",
    answer="Python是一种编程语言。",
    feedback_type=FeedbackType.POSITIVE,
    rating=4,
    comment="回答简洁准确"
)
# A negative rating with an explanatory comment.
feedback_collector.collect(
    user_id="user_002",
    session_id="session_002",
    question="如何学习机器学习?",
    answer="你可以从基础开始学习。",
    feedback_type=FeedbackType.NEGATIVE,
    rating=2,
    comment="回答太笼统,没有具体建议"
)
# A correction: no rating, but a replacement answer.
feedback_collector.collect(
    user_id="user_003",
    session_id="session_003",
    question="中国的首都是哪里?",
    answer="中国的首都是上海。",
    feedback_type=FeedbackType.CORRECTION,
    corrected_answer="中国的首都是北京。",
    comment="答案错误"
)
# Print the aggregate statistics for the three records.
stats = feedback_collector.get_statistics()
print(f"\n反馈统计:")
print(f" 总数: {stats['total']}")
print(f" 按类型: {stats['by_type']}")
print(f" 满意率: {stats['satisfaction_rate']:.1f}%")
print(f" 纠正数: {stats['corrections_count']}")
7.9.3 反馈分析与改进建议
class FeedbackAnalyzer:
    """Derives issue patterns, suggestions and training data from feedback."""

    def __init__(self, collector: FeedbackCollector):
        self.collector = collector

    def identify_patterns(self) -> Dict:
        """Bucket negative feedback comments into issue categories.

        Each negative record that has a comment is assigned to the first
        category whose keywords occur in the lower-cased comment, or to the
        catch-all category when none match. Records without a comment are
        not counted in any category.

        Returns:
            A dict with ``total_negative``, ``issue_categories`` (count per
            category) and ``top_issues`` (the three largest categories as
            ``(name, count)`` pairs, largest first).
        """
        keyword_rules = (
            ("不准确", ("错误", "不准", "不对")),
            ("不完整", ("不完整", "缺少", "遗漏")),
            ("不清晰", ("不清", "模糊", "难懂")),
            ("不友好", ("不友好", "冷淡", "生硬")),
        )
        issue_categories = {label: 0 for label, _ in keyword_rules}
        issue_categories["其他"] = 0

        negative_feedbacks = self.collector.get_by_type(FeedbackType.NEGATIVE)
        for feedback in negative_feedbacks:
            if not feedback.comment:
                continue
            text = feedback.comment.lower()
            label = next(
                (name for name, keywords in keyword_rules
                 if any(kw in text for kw in keywords)),
                "其他",
            )
            issue_categories[label] += 1

        ranked = sorted(
            issue_categories.items(),
            key=lambda pair: pair[1],
            reverse=True
        )
        return {
            "total_negative": len(negative_feedbacks),
            "issue_categories": issue_categories,
            "top_issues": ranked[:3]
        }

    def generate_improvement_suggestions(self) -> List[Dict]:
        """Turn the top issue categories and user corrections into suggestions.

        Returns:
            A list of dicts with ``priority``, ``issue``, ``count`` and
            ``suggestion``. Categories with a zero count and the catch-all
            category produce no entry; a final entry is appended when any
            correction feedback exists.
        """
        playbook = {
            "不准确": (
                "high",
                "回答准确性问题",
                "建议:检查知识库数据质量,优化提示词强调准确性,考虑添加事实核查机制",
            ),
            "不完整": (
                "medium",
                "回答完整性问题",
                "建议:优化提示词要求全面回答,检查RAG检索是否遗漏相关信息",
            ),
            "不清晰": (
                "medium",
                "回答清晰度问题",
                "建议:优化回答结构,使用分点列举,添加示例说明",
            ),
            "不友好": (
                "low",
                "回答语气问题",
                "建议:调整提示词增加友好语气要求,使用更温暖的表达方式",
            ),
        }

        suggestions = []
        for issue, count in self.identify_patterns()["top_issues"]:
            if count <= 0 or issue not in playbook:
                continue
            priority, title, advice = playbook[issue]
            suggestions.append({
                "priority": priority,
                "issue": title,
                "count": count,
                "suggestion": advice
            })

        corrections = self.collector.get_by_type(FeedbackType.CORRECTION)
        if corrections:
            suggestions.append({
                "priority": "high",
                "issue": "用户纠正",
                "count": len(corrections),
                "suggestion": f"建议:有 {len(corrections)} 条用户纠正,可用于更新知识库或微调模型"
            })
        return suggestions

    def export_training_data(self) -> List[Dict]:
        """Export chat-format examples built from the collected feedback.

        Corrections that carry a corrected answer come first, pairing the
        question with that corrected answer. They are followed by positive
        feedback rated 4 or higher, pairing the question with the original
        answer.
        """
        def as_example(question, reply, source, feedback_id):
            return {
                "messages": [
                    {"role": "user", "content": question},
                    {"role": "assistant", "content": reply}
                ],
                "source": source,
                "feedback_id": feedback_id
            }

        training_data = [
            as_example(fb.question, fb.corrected_answer, "user_correction", fb.feedback_id)
            for fb in self.collector.get_by_type(FeedbackType.CORRECTION)
            if fb.corrected_answer
        ]
        training_data.extend(
            as_example(fb.question, fb.answer, "positive_feedback", fb.feedback_id)
            for fb in self.collector.get_by_type(FeedbackType.POSITIVE)
            if fb.rating and fb.rating >= 4
        )
        return training_data
# Demo: analyze the feedback collected above.
analyzer = FeedbackAnalyzer(feedback_collector)

patterns = analyzer.identify_patterns()
print(f"\n问题模式分析:")
print(f" 负面反馈总数: {patterns['total_negative']}")
print(f" 问题分类: {patterns['issue_categories']}")

suggestions = analyzer.generate_improvement_suggestions()
print(f"\n改进建议:")
for s in suggestions:
    print(f" [{s['priority']}] {s['issue']} ({s['count']}次)")
    print(f" {s['suggestion']}")

training_data = analyzer.export_training_data()
print(f"\n可导出训练数据: {len(training_data)} 条")
7.9.4 反馈闭环
class FeedbackLoop:
    """Turns individual feedback records into follow-up actions and reports."""

    def __init__(
        self,
        collector: FeedbackCollector,
        analyzer: FeedbackAnalyzer
    ):
        self.collector = collector
        self.analyzer = analyzer
        self.actions: List[Dict] = []

    def process_feedback(self, feedback: UserFeedback) -> Dict:
        """Decide which follow-up actions a feedback record triggers.

        Corrections are queued for human review. Negative feedback rated 2
        or lower alerts the team, and negative feedback whose comment is
        longer than 50 characters is flagged for comment analysis.

        Returns:
            The action record (also appended to ``self.actions``), with the
            feedback id, an ISO timestamp and the list of actions taken.
        """
        taken = []
        action = {
            "feedback_id": feedback.feedback_id,
            "timestamp": datetime.now().isoformat(),
            "actions_taken": taken
        }

        if feedback.feedback_type == FeedbackType.CORRECTION:
            taken.append({
                "type": "add_to_review_queue",
                "description": "添加到人工审核队列",
                "data": {
                    "question": feedback.question,
                    "original_answer": feedback.answer,
                    "corrected_answer": feedback.corrected_answer
                }
            })

        if feedback.feedback_type == FeedbackType.NEGATIVE:
            if feedback.rating and feedback.rating <= 2:
                taken.append({
                    "type": "alert_team",
                    "description": "通知团队关注",
                    "priority": "high"
                })
            # NOTE(review): the source arrived without indentation; this check
            # is assumed to apply to negative feedback only -- confirm.
            if feedback.comment and len(feedback.comment) > 50:
                taken.append({
                    "type": "analyze_comment",
                    "description": "深度分析用户评论"
                })

        self.actions.append(action)
        return action

    def get_pending_actions(self) -> List[Dict]:
        """List every queued review action, each marked as pending."""
        return [
            {
                "feedback_id": action["feedback_id"],
                "action": step,
                "status": "pending"
            }
            for action in self.actions
            for step in action["actions_taken"]
            if step["type"] == "add_to_review_queue"
        ]

    def generate_weekly_report(self) -> Dict:
        """Assemble a summary report from the collector and the analyzer.

        Returns:
            A dict with the period label, generation time, headline numbers,
            issue category counts, improvement suggestions and the number of
            exportable training examples.
        """
        stats = self.collector.get_statistics()
        suggestions = self.analyzer.generate_improvement_suggestions()
        patterns = self.analyzer.identify_patterns()

        summary = {
            "total_feedback": stats["total"],
            "satisfaction_rate": stats["satisfaction_rate"],
            "corrections": stats["corrections_count"]
        }
        return {
            "period": "weekly",
            "generated_at": datetime.now().isoformat(),
            "summary": summary,
            "issues": patterns["issue_categories"],
            "suggestions": suggestions,
            "training_data_available": len(self.analyzer.export_training_data())
        }
# Demo: run every collected feedback record through the loop, then print a report.
feedback_loop = FeedbackLoop(feedback_collector, analyzer)

for feedback in feedback_collector.feedbacks:
    action = feedback_loop.process_feedback(feedback)
    # Only records that triggered at least one action are printed.
    if action["actions_taken"]:
        print(f"\n反馈 {feedback.feedback_id} 触发动作:")
        for a in action["actions_taken"]:
            print(f" - {a['description']}")

weekly_report = feedback_loop.generate_weekly_report()
print(f"\n{'='*60}")
print("周报摘要")
print(f"{'='*60}")
print(f"总反馈数: {weekly_report['summary']['total_feedback']}")
print(f"满意率: {weekly_report['summary']['satisfaction_rate']:.1f}%")
print(f"纠正数: {weekly_report['summary']['corrections']}")
print(f"可导出训练数据: {weekly_report['training_data_available']} 条")
第七部分总结
核心要点回顾
| 主题 | 核心内容 |
|---|---|
| 评估指标体系 | 准确性、相关性、连贯性、安全性、性能多维度评估 |
| 自动化测试 | 测试用例设计、执行、断言、报告生成 |
| 黄金数据集 | 高质量标准答案库,用于基准测试和微调 |
| 人工评估 | 多人评估、评估标准设计、一致性检查 |
| A/B测试 | 实验设计、流量分配、统计分析 |
| 持续监控 | 指标收集、异常检测、告警机制 |
| 用户反馈 | 反馈收集、分析、改进建议、训练数据导出 |
最佳实践建议
1. 建立完整的测试体系
   - 自动化测试覆盖核心场景
   - 黄金数据集定期更新
   - 人工评估作为补充
2. 持续监控不可少
   - 实时监控关键指标
   - 设置合理的告警阈值
   - 建立快速响应机制
3. 重视用户反馈
   - 降低反馈门槛
   - 及时处理负面反馈
   - 将反馈转化为改进动力
4. 数据驱动决策
   - A/B测试验证改进效果
   - 用数据说话,避免主观判断
   - 建立长期追踪机制
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)