模型监控进阶:数据漂移检测与自动重训练——让ML模型自己“感知“性能下降
本文是《模型服务的水平扩展:从单机到K8s》的进阶篇,聚焦生产环境中模型性能监控的最后一环——数据漂移检测与自动重训练。当模型开始"老化"时,如何让它自动"感知"并触发自我修复?包含完整的Evidently AI集成、MLflow模型管理、以及GitHub Actions自动重训练流水线。
一、为什么数据漂移比模型Bug更可怕?
什么是数据漂移?
数据漂移(Data Drift)是指生产环境中的数据分布与训练数据分布发生显著变化的现象。它会导致模型性能逐渐下降,但这个过程往往是渐进的,不易被察觉。
典型的漂移场景
| 场景 | 训练数据 | 生产数据变化 | 影响 |
|---|---|---|---|
| 季节性变化 | 去年冬季数据 | 今年春节、618大促 | 销量预测失效 |
| 市场变化 | 疫情前数据 | 疫情后消费习惯改变 | 推荐系统失效 |
| 外部冲击 | 常规价格 | 价格战、突发事件 | 价格预测偏差 |
| 特征变化 | 用户正常行为 | 新功能上线 | 流失预测失效 |
| 竞争对手 | 竞争对手少 | 新玩家入局 | 市场占有率预测不准 |
一个真实的"静默失败"案例
某电商平台的销量预测模型,上线时MAPE为2.1%,运行3个月后悄然上升到8.5%——业务团队以为是系统问题,排查了服务器、数据库、日志,折腾了两周才发现是模型"老了":训练数据是去年Q4的,但用户购买习惯已经发生了结构性变化。
这就是数据漂移的可怕之处:它不会报错,只会慢慢"变蠢"。
二、解决方案架构概览
本文将搭建以下完整监控-检测-行动闭环:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ API服务 │───▶│ Prometheus │───▶│ Grafana │───▶│ 告警通知 │
│ /metrics │ │ 数据采集 │ │ 可视化 │ │ Slack/钉钉 │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ 漂移检测服务 │
│ (Evidently) │
└─────────────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ 数据快照 │───▶│ 漂移报告 │
│ 日益存储 │ │ (HTML/JSON) │
└─────────────┘ └─────────────┘
│
(超过阈值)
│
▼
┌─────────────┐ ┌─────────────┐
│ 重训练触发 │───▶│ MLflow │
│ (GitHub │ │ 模型注册 │
│ Actions) │ │ + 版本管理 │
└─────────────┘ └─────────────┘
三、第一步:集成Evidently AI进行数据漂移检测
什么是Evidently?
Evidently是一个开源的ML监控框架,专门用于检测数据漂移和模型性能退化。它支持多种漂移检测算法,输出美观的HTML报告,与Prometheus/Grafana完美集成。
1. 安装依赖
pip install evidently pandas numpy prometheus-flask-exporter mlflow scikit-learn
2. 创建漂移检测模块
# drift_detector.py
import pandas as pd
import numpy as np
from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab, NumTargetDriftTab
from evidently.pipeline import ColumnMapping
from evidently.metric_preset import DataDriftPreset, ClassificationPreset
import json
from datetime import datetime
import os
class DriftDetector:
"""数据漂移检测器"""
def __init__(self, reference_data: pd.DataFrame,
target_column: str = None,
numerical_columns: list = None,
categorical_columns: list = None):
self.reference_data = reference_data
self.target_column = target_column
# 定义列映射
self.column_mapping = ColumnMapping()
self.column_mapping.numerical_features = numerical_columns
self.column_mapping.categorical_features = categorical_columns
if target_column:
self.column_mapping.target = target_column
# 漂移阈值
self.drift_threshold = 0.5 # 超过50%的特征漂移则告警
self.dataset_drift_threshold = 0.3 # 数据集整体漂移阈值
def calculate_drift(self, current_data: pd.DataFrame) -> dict:
"""计算数据漂移"""
# 使用DataDriftPreset进行漂移检测
drift_detector = DataDriftPreset()
drift_detector.calculate(
reference_data=self.reference_data,
current_data=current_data,
column_mapping=self.column_mapping
)
# 获取漂移结果
drift_result = drift_detector.get_result()
# 提取关键指标
results = {
'timestamp': datetime.now().isoformat(),
'dataset_drift': drift_result.metrics.dataset_drift,
'drift_share': drift_result.metrics.drift_share,
'number_of_drifted_features': drift_result.metrics.number_of_drifted_features,
'total_features': drift_result.metrics.total_features,
'feature_drift': {}
}
# 记录每个特征的漂移情况
for feature_name in drift_result.metrics.features['numerical']:
feature_metrics = drift_result.metrics.features['numerical'][feature_name]
results['feature_drift'][feature_name] = {
'drift_score': feature_metrics.drift_score,
'drifted': feature_metrics.drift_detected,
'expected_range': feature_metrics.expected_range,
'actual_range': feature_metrics.actual_range
}
return results
def should_alert(self, drift_results: dict) -> bool:
"""判断是否需要告警"""
return (
drift_results['dataset_drift'] or
drift_results['drift_share'] > self.drift_threshold
)
def generate_report(self, current_data: pd.DataFrame,
drift_results: dict,
output_path: str = './drift_reports') -> str:
"""生成HTML漂移报告"""
os.makedirs(output_path, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_path = os.path.join(output_path, f'drift_report_{timestamp}.html')
# 创建Dashboard
dashboard = Dashboard(tabs=[DataDriftTab(), NumTargetDriftTab()])
dashboard.calculate(
reference_data=self.reference_data,
current_data=current_data,
column_mapping=self.column_mapping
)
# 保存报告
dashboard.save(report_path)
return report_path
3. 创建漂移监控服务
# drift_monitor.py
from flask import Flask, Response, jsonify
from prometheus_flask_exporter import PrometheusMetrics
import pandas as pd
import schedule
import time
import threading
import logging
from drift_detector import DriftDetector
import boto3
import os
app = Flask(__name__)
metrics = PrometheusMetrics(app)
# 自定义Prometheus指标
DATA_DRIFT_GAUGE = metrics.registerGauge(
'data_drift_score',
'Data drift score for each feature',
labels=['feature_name']
)
DATASET_DRIFT_GAUGE = metrics.registerGauge(
'dataset_drift_detected',
'Whether dataset drift is detected (1=yes, 0=no)'
)
DRIFT_ALERT_COUNTER = metrics.registerCounter(
'drift_alerts_total',
'Total number of drift alerts'
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DriftMonitor:
"""漂移监控服务"""
def __init__(self):
self.detector = None
self.last_alert_time = None
self.alert_cooldown = 3600 # 告警冷却时间(秒)
def load_reference_data(self):
"""加载参考数据集(可以是训练数据或最近的稳定数据)"""
# 方式1:从本地文件加载
# reference_data = pd.read_csv('./data/reference_data.csv')
# 方式2:从数据库加载
# reference_data = self.load_from_database()
# 方式3:从S3/MinIO加载
s3_client = boto3.client('s3',
endpoint_url=os.environ.get('S3_ENDPOINT'),
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'))
s3_client.download_file('models-bucket',
'reference_data.parquet',
'/tmp/reference_data.parquet')
reference_data = pd.read_parquet('/tmp/reference_data.parquet')
# 定义特征列
numerical_cols = ['store', 'family_encoded', 'day_of_week',
'month', 'is_holiday', 'oil_price',
'day_of_year', 'week_of_year']
categorical_cols = ['family', 'city', 'state']
self.detector = DriftDetector(
reference_data=reference_data,
target_column='sales',
numerical_columns=numerical_cols,
categorical_columns=categorical_cols
)
logger.info(f"加载参考数据集: {len(reference_data)} 行")
return reference_data
def check_drift(self):
"""定时检查漂移"""
try:
logger.info("开始数据漂移检测...")
# 加载当前数据(过去24小时)
current_data = self.load_current_data()
if current_data is None or len(current_data) < 100:
logger.warning("当前数据不足,跳过检测")
return
# 计算漂移
drift_results = self.detector.calculate_drift(current_data)
# 更新Prometheus指标
for feature, metrics_data in drift_results['feature_drift'].items():
DATA_DRIFT_GAUGE.labels(feature_name=feature).set(
metrics_data['drift_score']
)
DATASET_DRIFT_GAUGE.set(1 if drift_results['dataset_drift'] else 0)
# 生成报告
if self.detector.should_alert(drift_results):
self.handle_alert(drift_results, current_data)
else:
logger.info(f"数据正常,漂移比例: {drift_results['drift_share']:.2%}")
# 保存漂移指标到时序数据库(可选)
self.save_drift_metrics(drift_results)
except Exception as e:
logger.error(f"漂移检测失败: {str(e)}")
def handle_alert(self, drift_results: dict, current_data: pd.DataFrame):
"""处理漂移告警"""
# 冷却检查
if self.last_alert_time:
elapsed = time.time() - self.last_alert_time
if elapsed < self.alert_cooldown:
logger.info(f"告警冷却中,剩余 {self.alert_cooldown - elapsed:.0f} 秒")
return
DRIFT_ALERT_COUNTER.inc()
self.last_alert_time = time.time()
# 生成详细报告
report_path = self.detector.generate_report(
current_data,
drift_results,
output_path='/app/drift_reports'
)
# 发送告警(Slack/钉钉/企业微信)
self.send_alert(
title="⚠️ 数据漂移检测告警",
message=f"""
检测到数据漂移!
📊 漂移统计:
- 数据集漂移: {'是' if drift_results['dataset_drift'] else '否'}
- 漂移特征比例: {drift_results['drift_share']:.2%}
- 漂移特征数: {drift_results['number_of_drifted_features']}/{drift_results['total_features']}
🔍 主要漂移特征:
{self.format_top_drifted_features(drift_results['feature_drift'])}
📄 详细报告: {report_path}
🤖 建议: 触发模型重训练流程
""")
# 触发自动重训练(可选,自动触发风险较高,建议先告警人工确认)
# self.trigger_retraining(current_data)
def send_alert(self, title: str, message: str):
"""发送告警通知"""
# Slack
import requests
slack_webhook = os.environ.get('SLACK_WEBHOOK')
if slack_webhook:
requests.post(slack_webhook, json={
'text': f"{title}\n{message}"
})
# 钉钉
dingtalk_webhook = os.environ.get('DINGTALK_WEBHOOK')
if dingtalk_webhook:
requests.post(dingtalk_webhook, json={
'msgtype': 'markdown',
'markdown': {
'title': title,
'text': message
}
})
def load_current_data(self) -> pd.DataFrame:
"""加载当前数据(从数据库或数据仓库)"""
# 这里实现从实际数据源加载的逻辑
pass
def format_top_drifted_features(self, feature_drift: dict, top_n: int = 5) -> str:
"""格式化漂移最严重的特征"""
sorted_features = sorted(
feature_drift.items(),
key=lambda x: x[1]['drift_score'],
reverse=True
)[:top_n]
return '\n'.join([
f"- **{name}**: {metrics['drift_score']:.3f} {'⚠️' if metrics['drifted'] else '✅'}"
for name, metrics in sorted_features
])
def save_drift_metrics(self, drift_results: dict):
"""保存漂移指标到时序数据库"""
# 可选实现:保存到InfluxDB/TimescaleDB用于长期趋势分析
pass
def trigger_retraining(self, current_data: pd.DataFrame):
"""触发模型重训练(危险操作,建议人工确认)"""
import requests
github_token = os.environ.get('GITHUB_TOKEN')
repo_owner = os.environ.get('REPO_OWNER')
repo_name = os.environ.get('REPO_NAME')
workflow_id = os.environ.get('RETRAIN_WORKFLOW_ID')
# 保存当前数据快照
snapshot_path = f'/tmp/current_data_{int(time.time())}.parquet'
current_data.to_parquet(snapshot_path)
# 触发GitHub Actions工作流
url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/actions/workflows/{workflow_id}/dispatches"
headers = {
'Authorization': f'token {github_token}',
'Accept': 'application/vnd.github.v3+json'
}
response = requests.post(url, headers=headers, json={
'ref': 'main',
'inputs': {
'triggered_by': 'drift_detection',
'snapshot_path': snapshot_path
}
})
if response.status_code == 204:
logger.info("成功触发重训练工作流")
else:
logger.error(f"触发重训练失败: {response.status_code}")
# Flask端点
@app.route('/health')
def health():
return jsonify({'status': 'ok'})
@app.route('/drift-check', methods=['POST'])
def manual_drift_check():
"""手动触发漂移检测"""
monitor.check_drift()
return jsonify({'status': 'drift check triggered'})
if __name__ == '__main__':
monitor = DriftMonitor()
monitor.load_reference_data()
# 定时任务(每小时检测一次)
schedule.every().hour.do(monitor.check_drift)
def run_schedule():
while True:
schedule.run_pending()
time.sleep(60)
# 后台运行定时任务
schedule_thread = threading.Thread(target=run_schedule, daemon=True)
schedule_thread.start()
app.run(host='0.0.0.0', port=5001)
四、第二步:配置Prometheus告警规则
prometheus-alerts.yml
groups:
- name: ml-monitoring
rules:
# 数据漂移告警
- alert: DataDriftDetected
expr: dataset_drift_detected == 1
for: 5m
labels:
severity: warning
annotations:
summary: "检测到数据漂移"
description: "数据集与参考数据存在显著漂移,建议检查并考虑重训练"
# 高漂移比例告警
- alert: HighFeatureDriftShare
expr: drift_share > 0.3
for: 5m
labels:
severity: warning
annotations:
summary: "大量特征发生漂移"
description: "漂移特征比例超过30%,当前值: {{ $value }}"
# 预测延迟告警
- alert: HighPredictionLatency
expr: histogram_quantile(0.99, rate(model_prediction_duration_seconds_bucket[5m])) > 2
for: 10m
labels:
severity: critical
annotations:
summary: "预测延迟过高"
description: "P99延迟超过2秒,可能需要扩容或优化"
# 模型精度下降(如果有ground truth标签)
- alert: ModelAccuracyDegradation
expr: |
(model_weekly_accuracy - model_monthly_accuracy) / model_monthly_accuracy < -0.1
for: 1h
labels:
severity: warning
annotations:
summary: "模型精度显著下降"
description: "周准确率相比月准确率下降超过10%"
五、第三步:使用MLflow管理模型版本
MLflow服务器配置
# docker-compose.mlflow.yml
version: '3.8'
services:
mlflow:
image: ghcr.io/mlflow/mlflow:latest
container_name: mlflow
ports:
- "5000:5000"
environment:
- MLFLOW_TRACKING_URI=postgresql://mlflow:password@postgres:5432/mlflow
- MLFLOW_ARTIFACT_ROOT=s3://mlflow-artifacts/
volumes:
- ./mlflow:/mlflow
depends_on:
- postgres
command: mlflow server --backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow --default-artifact-root s3://mlflow-artifacts/ -h 0.0.0.0 -p 5000
postgres:
image: postgres:14
environment:
POSTGRES_DB: mlflow
POSTGRES_USER: mlflow
POSTGRES_PASSWORD: password
volumes:
- postgres-data:/var/lib/postgresql/data
minio:
image: minio/minio:latest
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio-data:/data
volumes:
postgres-data:
minio-data:
模型注册与部署脚本
# mlflow_manager.py
import mlflow
from mlflow.tracking import MlflowClient
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_percentage_error
import joblib
import lightgbm as lgb
class MLflowModelManager:
"""MLflow模型管理器"""
def __init__(self, tracking_uri: str):
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment("retail-forecast")
self.client = MlflowClient()
def train_and_log_model(self, train_data: pd.DataFrame,
test_data: pd.DataFrame,
params: dict = None):
"""训练模型并记录到MLflow"""
with mlflow.start_run(run_name="drift-triggered-retraining"):
# 记录参数
if params:
mlflow.log_params(params)
# 准备特征
feature_cols = [c for c in train_data.columns if c not in ['sales', 'date']]
X_train = train_data[feature_cols]
y_train = train_data['sales']
X_test = test_data[feature_cols]
y_test = test_data['sales']
# 训练模型
model = lgb.LGBMRegressor(**params)
model.fit(X_train, y_train)
# 评估
predictions = model.predict(X_test)
mape = mean_absolute_percentage_error(y_test, predictions) * 100
# 记录指标
mlflow.log_metrics({
'test_mape': mape,
'test_rmse': np.sqrt(np.mean((y_test - predictions) ** 2)),
'train_size': len(X_train),
'test_size': len(X_test)
})
# 记录特征重要性
importance_df = pd.DataFrame({
'feature': feature_cols,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
importance_df.to_csv('feature_importance.csv', index=False)
mlflow.log_artifact('feature_importance.csv')
# 注册模型
model_name = "retail-forecast"
mlflow.sklearn.log_model(
model,
artifact_path="model",
registered_model_name=model_name
)
# 保存模型
joblib.dump(model, 'model.pkl')
return model, mape
def promote_model(self, model_name: str, version: int, stage: str = "Staging"):
"""将模型推送到指定阶段"""
self.client.transition_model_version_stage(
name=model_name,
version=version,
stage=stage
)
def compare_models(self, model_name: str):
"""对比所有版本的模型性能"""
versions = self.client.search_model_versions(f"name = '{model_name}'")
comparison = []
for v in versions:
run = self.client.get_run(v.run_id)
metrics = run.data.metrics
comparison.append({
'version': v.version,
'stage': v.current_stage,
'mape': metrics.get('test_mape'),
'rmse': metrics.get('test_rmse'),
'created': v.creation_timestamp
})
return pd.DataFrame(comparison)
def get_production_model(self, model_name: str):
"""获取生产环境模型"""
try:
model = self.client.get_latest_versions(model_name, stages=['Production'])[0]
return model
except:
return None
def load_production_model(self, model_name: str):
"""加载生产环境模型"""
production_model = self.get_production_model(model_name)
if production_model:
return mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/Production"
)
return None
六、第四步:GitHub Actions自动重训练流水线
retrain.yml
name: Model Retraining Pipeline
on:
workflow_dispatch:
inputs:
triggered_by:
description: '触发原因'
required: false
default: 'manual'
snapshot_date:
description: '数据快照日期 (YYYY-MM-DD)'
required: false
schedule:
# 每周日凌晨2点自动训练(基于上周数据)
- cron: '0 2 * * 0'
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
retrain:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install mlflow boto3 psycopg2-binary
- name: Download training data
run: |
# 从数据仓库下载数据
python scripts/download_training_data.py \
--start-date "${{ github.event.inputs.snapshot_date || github.event.inputs.snapshot_date }}" \
--end-date "$(date +%Y-%m-%d)"
- name: Train model
run: |
python scripts/train_model.py \
--train-data ./data/train.parquet \
--test-data ./data/test.parquet \
--params-file ./configs/model_params.json
- name: Register to MLflow
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: |
python scripts/register_model.py
- name: Compare with current production
run: |
python scripts/compare_models.py
- name: Deploy to staging
if: github.event.inputs.manual_approve != 'false'
run: |
python scripts/deploy_model.py --stage staging
- name: Notify on Slack
if: always()
uses: slackapi/slack-github-action@v1
with:
channel-id: 'ml-alerts'
payload: |
{
"text": "模型重训练完成",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*模型重训练完成*"
}
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*触发原因:*"},
{"type": "plain_text", "text": "${{ github.event.inputs.triggered_by || 'scheduled' }}"},
{"type": "mrkdwn", "text": "*结果:*"},
{"type": "plain_text", "text": "${{ job.status }}"}
]
}
]
}
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
七、Grafana仪表盘配置
漂移监控仪表盘JSON(关键部分)
{
"dashboard": {
"title": "ML模型监控中心",
"panels": [
{
"title": "数据漂移热力图",
"type": "heatmap",
"targets": [
{
"expr": "data_drift_score",
"legendFormat": "{{feature_name}}"
}
]
},
{
"title": "漂移告警统计",
"type": "stat",
"targets": [
{
"expr": "increase(drift_alerts_total[24h])"
}
]
},
{
"title": "模型MAPE趋势",
"type": "timeseries",
"targets": [
{
"expr": "model_test_mape",
"legendFormat": "测试集MAPE"
}
]
},
{
"title": "预测延迟分布",
"type": "histogram",
"targets": [
{
"expr": "rate(model_prediction_duration_seconds_bucket[5m])"
}
]
}
]
}
}
八、常见问题与避坑指南
Q1:参考数据多久更新一次?
建议每月更新一次参考数据集,选择最近3个月的稳定数据作为基准。避免使用刚发生重大变化的数据作为参考。
Q2:误报太多怎么办?
调整漂移阈值,evidently默认使用0.5(50%),可以根据业务容错度调整到0.6-0.7。同时加入告警冷却机制,避免频繁告警。
Q3:自动重训练安全吗?
强烈建议不要开启完全自动重训练+自动部署。正确做法:检测到漂移 → 告警通知 → 人工确认 → 触发训练 → 部署到Staging → 人工验证 → 手动上线Production。
Q4:如何验证重训练后的模型?
- 在Staging环境做A/B测试
- 黄金数据集验证(Golden Dataset)
- 对比新旧模型的指标差异
- 逐步切换流量(10% → 50% → 100%)
九、完整监控体系总结
┌────────────────────────────────────────────────────────────────────┐
│ 生产ML监控体系 │
├────────────────────────────────────────────────────────────────────┤
│ 基础设施层 │
│ ├── Kubernetes: 服务编排、高可用、自动伸缩 │
│ ├── Prometheus: 指标采集、时序存储 │
│ └── Grafana: 可视化仪表盘 │
├────────────────────────────────────────────────────────────────────┤
│ 应用层 │
│ ├── API服务: /metrics、/health、/predict │
│ ├── 漂移检测: Evidently + 定时任务 │
│ └── 日志采集: ELK/Loki │
├────────────────────────────────────────────────────────────────────┤
│ ML层 │
│ ├── MLflow: 模型版本管理、实验追踪 │
│ ├── 漂移告警: Slack/钉钉/邮件 │
│ └── 自动重训练: GitHub Actions │
├────────────────────────────────────────────────────────────────────┤
│ 核心指标 │
│ ├── 业务指标: MAPE、RMSE、点击率、转化率 │
│ ├── 系统指标: QPS、延迟、错误率、CPU/内存 │
│ └── 模型指标: 漂移分数、特征分布、预测分布 │
└────────────────────────────────────────────────────────────────────┘
十、GitHub仓库
仓库地址:https://github.com/yourusername/ml-monitoring-system
包含内容:
- Evidently漂移检测模块
- MLflow模型管理脚本
- Prometheus告警规则
- Grafana监控仪表盘
- GitHub Actions自动重训练流水线
- 完整的Docker Compose配置
写在最后
模型监控不是一次性工作,而是持续迭代的过程。建议从本文的最小可行监控开始,逐步完善,让你的ML系统真正具备"自愈"能力。
如果觉得有帮助,欢迎一键三连!有问题欢迎评论区交流~
「往期文章推荐 + 关注我」
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)