End-to-End in Practice: A Complete Technical Blueprint for a Production-Grade Sales Forecasting System
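From data processing to model deployment, this article ties every concept together through one complete project. It is not a toy project but a production-grade architecture designed to go straight into production.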
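1. System Overview
1.1 Business Background
We will build a sales forecasting system for a mid-sized e-commerce platform:
- Daily order volume: about 500,000 orders
- Catalog size: about 100,000 SKUs
- Forecast horizons: the next 7, 14 and 30 days
- Forecast granularity: product × warehouse
- Business goals: raise GMV by 5% and improve inventory turnover by 15%
1.2 Core Requirements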
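```
┌────────────────────────────────────────────────────────────────────────────┐
│  Sales Forecasting System Requirements                                     │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  📊 Forecasting requirements                                               │
│  ├── Short term (1-7 days): daily replenishment and promotion stocking     │
│  ├── Mid term (8-30 days): purchase planning and warehouse optimization    │
│  └── Long term (31-90 days): strategic planning and budgeting              │
│                                                                            │
│  🔧 Functional requirements                                                │
│  ├── Model the impact of holidays, promotions and other special events     │
│  ├── Differentiated forecasting for new and established products           │
│  ├── Automatic detection and correction of anomalous sales                 │
│  └── Explanation and visualization of forecast results                     │
│                                                                            │
│  ⚡ Performance requirements                                               │
│  ├── Prediction latency: < 100 ms per SKU                                  │
│  ├── Batch prediction: all 100k SKUs in < 10 minutes                       │
│  ├── Model updates: incremental, no full retraining needed                 │
│  └── Availability: 99.9% (monthly downtime < 44 minutes)                   │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
```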
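The performance targets above imply concrete budgets. The sketch below is a back-of-the-envelope check that turns them into throughput, downtime and per-SKU latency figures. It is plain arithmetic. The month lengths and the single-worker framing are assumptions added here, and the function names are illustrative.

```python
def required_throughput(num_skus: int, window_minutes: float) -> float:
    """SKUs per second needed to finish a full batch run inside the window."""
    return num_skus / (window_minutes * 60)


def downtime_budget_minutes(availability: float, days: int = 30) -> float:
    """Allowed downtime in minutes over `days` for a given availability target."""
    return (1 - availability) * days * 24 * 60


def max_sequential_latency_ms(num_skus: int, window_minutes: float) -> float:
    """Per-SKU latency budget if SKUs were predicted one at a time on a single worker."""
    return window_minutes * 60 * 1000 / num_skus


if __name__ == "__main__":
    print(f"{required_throughput(100_000, 10):.1f} SKU/s")
    print(f"{downtime_budget_minutes(0.999):.1f} min per 30-day month")
    print(f"{downtime_budget_minutes(0.999, 31):.1f} min per 31-day month")
    print(f"{max_sequential_latency_ms(100_000, 10):.1f} ms per SKU")
```

At about 166.7 SKUs per second, a serial loop could spend only 6 ms per SKU. The 100 ms single-SKU latency target and the 10-minute batch target are therefore compatible only if the batch run is vectorized or spread over roughly 17 or more parallel workers. The 99.9% target allows 43.2 minutes of downtime in a 30-day month and 44.6 minutes in a 31-day month.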
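1.3 Technology Stack
| Component | Choice | Rationale |
|---|---|---|
| Language | Python 3.10 | Rich ecosystem; the default choice for data science |
| Data processing | Pandas + Polars + Spark | Spark for batch jobs, Polars for real-time work |
| Feature storage | Redis + PostgreSQL | Redis holds online features, PostgreSQL holds history |
| Model training | LightGBM + XGBoost | Industrial-grade; balances accuracy and interpretability |
| Model serving | Triton Inference Server | Open-sourced by NVIDIA; high-performance inference |
| Task scheduling | Airflow | Mature, stable, active community |
| Monitoring | Prometheus + Grafana | General-purpose solution with a mature ecosystem |
| Containerization | Docker + Kubernetes | The standard for production deployment |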
2. Architecture Design
2.1 Overall Architecture
```
Sales Forecasting System Architecture

┌──────────────────────────────────────────────────────────────────────────┐
│ Data source layer                                                        │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │  Orders  │  │ Products │  │  Users   │  │ Inventory│  │ External │    │
│  │ (MySQL)  │  │ (MySQL)  │  │ (MySQL)  │  │ (MySQL)  │  │  (API)   │    │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘    │
└───────┼─────────────┼─────────────┼─────────────┼─────────────┼──────────┘
        ▼             ▼             ▼             ▼             ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ Data integration layer (Apache Kafka)                                    │
│ Real-time event streams + historical data sync                           │
└────────────────────────────────────┬─────────────────────────────────────┘
                                     │
            ┌────────────────────────┼────────────────────────┐
            ▼                        ▼                        ▼
 ┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐
 │ Real-time processing│  │ Batch processing    │  │ Feature store       │
 │ (Flink)             │  │ (Spark)             │  │ (Feast)             │
 │                     │  │                     │  │                     │
 │ • Real-time features│  │ • Offline features  │  │ • Online features   │
 │ • Anomaly detection │  │ • Model training    │  │ • Offline features  │
 │ • Event alerts      │  │ • Data backfill     │  │ • Feature registry  │
 └──────────┬──────────┘  └──────────┬──────────┘  └──────────┬──────────┘
            │                        │                        │
            └────────────────────────┼────────────────────────┘
                                     ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ Model serving layer                                                      │
│ ┌────────────────────┐   ┌────────────────────┐   ┌────────────────────┐ │
│ │ Online inference   │   │ Batch prediction   │   │ Model management   │ │
│ │ (Triton)           │   │ (Spark batch)      │   │ (MLflow)           │ │
│ │ P99 < 50 ms        │   │ Full run < 10 min  │   │ Version control    │ │
│ └────────────────────┘   └────────────────────┘   └────────────────────┘ │
└────────────────────────────────────┬─────────────────────────────────────┘
                                     ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ Business application layer                                               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐    │
│  │Purchasing│  │ Inventory│  │ Promotion│  │ Reporting│  │ Ops admin│    │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘  └──────────┘    │
└──────────────────────────────────────────────────────────────────────────┘
```
2.2 Data Flow Design
```
Data Flow Design

[Real-time data flow]
─────────────────────────────────────────────────────────────────────

User behavior ──▶ Kafka ──▶ Flink ──▶ Real-time features ──▶ Redis ──▶ Inference service
                    │         │
                    │         ▼
                    │ Anomaly detection
                    │         │
                    ▼         ▼
                Raw logs   Alerts (DingTalk/Feishu)

[Offline data flow]
─────────────────────────────────────────────────────────────────────

MySQL ──▶ DataX ──▶ Hive ──▶ Spark ──▶ Feature computation ──▶ Parquet
                                                                  │
        ┌─────────────────────────────────────────────────────────┘
        ▼
 Model training ──▶ Model evaluation ──▶ Model registration ──▶ Model repository

[Daily schedule]
─────────────────────────────────────────────────────────────────────

00:00 Data sync ──▶ 01:00 Feature computation ──▶ 02:00 Model training ──▶
04:00 Model evaluation ──▶ 05:00 Model deployment ──▶ 06:00 Full-batch prediction ──▶
08:00 Push results ──▶ 09:00 Manual review ──▶ 10:00 Business systems
```
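Both the anomaly-detection step in the real-time flow and the earlier requirement for "automatic detection and correction of anomalous sales" can be illustrated with a robust z-score. A day is flagged when its sales deviate from a trailing median by more than k scaled MADs, and flagged values are clipped back into that band. This is a minimal sketch of one common technique, not the system's actual detector. The function name, the 28-day window, the 7-day minimum history and the threshold k = 3.5 are all assumptions.

```python
import numpy as np
import pandas as pd


def detect_and_correct(sales: pd.Series, window: int = 28, k: float = 3.5) -> pd.DataFrame:
    """Flag and correct anomalous daily sales with a rolling median/MAD robust z-score.

    The baseline for each day uses only the previous `window` days (shift(1)),
    so a spike cannot inflate its own baseline.
    """
    past = sales.shift(1).rolling(window, min_periods=7)
    baseline = past.median()
    # Scaled MAD: 1.4826 * MAD estimates the standard deviation for normal data
    mad = past.apply(lambda w: np.nanmedian(np.abs(w - np.nanmedian(w))), raw=True) * 1.4826
    robust_z = (sales - baseline).abs() / mad.replace(0, np.nan)
    is_anomaly = robust_z > k  # NaN (too little history or zero MAD) compares as False
    lower = (baseline - k * mad).clip(lower=0)
    upper = baseline + k * mad
    # Correction: clip flagged days back into [lower, upper]; leave the rest untouched
    corrected = sales.where(~is_anomaly, sales.clip(lower=lower, upper=upper))
    return pd.DataFrame({
        "sales": sales,
        "robust_z": robust_z,
        "is_anomaly": is_anomaly,
        "corrected": corrected,
    })
```

The median/MAD pair is used instead of mean/standard deviation because a single large spike barely moves either statistic. Clipping the corrected value to the band, rather than replacing it with the median, keeps some of the genuine upward movement when a promotion really does lift sales.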
3. Data Layer Implementation
3.1 Data Model Design
```sql
-- ============================================================
-- Database design for the sales forecasting system (MySQL)
-- ============================================================

-- 1. Raw order table
CREATE TABLE ods_order (
    order_id      BIGINT PRIMARY KEY,
    user_id       BIGINT NOT NULL,
    product_id    BIGINT NOT NULL,
    warehouse_id  INT NOT NULL,
    city_id       INT,
    order_time    DATETIME NOT NULL,
    deliver_time  DATETIME,
    quantity      INT NOT NULL,
    unit_price    DECIMAL(10, 2) NOT NULL,
    total_amount  DECIMAL(12, 2) NOT NULL,
    order_status  TINYINT DEFAULT 1,  -- 1: pending payment, 2: paid, 3: shipped, 4: completed, 5: cancelled
    created_at    DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_product_id (product_id),
    INDEX idx_order_time (order_time),
    INDEX idx_warehouse_date (warehouse_id, order_time)
) COMMENT 'Order detail table';

-- 2. Product master table
CREATE TABLE dim_product (
    product_id    BIGINT PRIMARY KEY,
    product_name  VARCHAR(200),
    category_id   INT NOT NULL,
    category_path VARCHAR(100),       -- category hierarchy, e.g. Apparel/Womenswear/Dresses
    brand_id      INT,
    brand_name    VARCHAR(100),
    price         DECIMAL(10, 2) NOT NULL,
    cost          DECIMAL(10, 2),
    supplier_id   INT,
    weight_kg     DECIMAL(8, 3),
    is_active     BOOLEAN DEFAULT TRUE,
    launch_date   DATE,               -- date the product was listed
    create_time   DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_category (category_id),
    INDEX idx_brand (brand_id),
    INDEX idx_is_active (is_active)
) COMMENT 'Product dimension table';

-- 3. Sales forecast result table
CREATE TABLE dws_sales_forecast (
    id                BIGINT AUTO_INCREMENT PRIMARY KEY,
    product_id        BIGINT NOT NULL,
    warehouse_id      INT NOT NULL,
    forecast_date     DATE NOT NULL,
    forecast_horizon  INT NOT NULL,   -- horizon in days: 1, 7, 14, 30
    forecast_quantity DECIMAL(10, 2) NOT NULL,
    forecast_amount   DECIMAL(12, 2),
    confidence_lower  DECIMAL(10, 2),
    confidence_upper  DECIMAL(10, 2),
    model_version     VARCHAR(50),
    created_at        DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_product_warehouse_date_horizon
        (product_id, warehouse_id, forecast_date, forecast_horizon),
    INDEX idx_forecast_date (forecast_date),
    INDEX idx_model_version (model_version)
) COMMENT 'Sales forecast result table';

-- 4. Feature store table
CREATE TABLE ads_feature_store (
    feature_key       VARCHAR(200) PRIMARY KEY,  -- entity_type:entity_id:feature_name
    entity_type       VARCHAR(50) NOT NULL,
    entity_id         BIGINT NOT NULL,
    feature_name      VARCHAR(100) NOT NULL,
    feature_value     TEXT,
    feature_timestamp DATETIME NOT NULL,
    created_at        DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_entity (entity_type, entity_id),
    INDEX idx_feature_name (feature_name),
    INDEX idx_timestamp (feature_timestamp)
) COMMENT 'Feature store table';

-- 5. Model evaluation metrics table
CREATE TABLE ml_model_metrics (
    id               BIGINT AUTO_INCREMENT PRIMARY KEY,
    model_name       VARCHAR(100) NOT NULL,
    model_version    VARCHAR(50) NOT NULL,
    train_date       DATE NOT NULL,
    eval_date        DATE NOT NULL,
    metric_name      VARCHAR(100) NOT NULL,
    metric_value     DECIMAL(10, 6) NOT NULL,
    metric_threshold DECIMAL(10, 6),
    is_passed        BOOLEAN,
    created_at       DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_model (model_name, model_version),
    INDEX idx_eval_date (eval_date)
) COMMENT 'Model evaluation metrics table';
```
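The `uk_product_warehouse_date_horizon` unique key on `dws_sales_forecast` is what makes reruns of the daily batch prediction safe. Writing the same (product, warehouse, date, horizon) twice should update the existing row rather than duplicate it. MySQL expresses this as `INSERT ... ON DUPLICATE KEY UPDATE`. The runnable sketch below demonstrates the same idea with Python's built-in `sqlite3` and SQLite's `ON CONFLICT ... DO UPDATE` (SQLite 3.24+), on a trimmed-down copy of the table. The helper `upsert_forecasts` is a hypothetical name.

```python
import sqlite3
from typing import Iterable, Tuple

# (product_id, warehouse_id, forecast_date, forecast_horizon, forecast_quantity, model_version)
Row = Tuple[int, int, str, int, float, str]

DDL = """
CREATE TABLE dws_sales_forecast (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    product_id INTEGER NOT NULL,
    warehouse_id INTEGER NOT NULL,
    forecast_date TEXT NOT NULL,
    forecast_horizon INTEGER NOT NULL,
    forecast_quantity REAL NOT NULL,
    model_version TEXT,
    UNIQUE (product_id, warehouse_id, forecast_date, forecast_horizon)
)
"""

UPSERT = """
INSERT INTO dws_sales_forecast
    (product_id, warehouse_id, forecast_date, forecast_horizon, forecast_quantity, model_version)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (product_id, warehouse_id, forecast_date, forecast_horizon)
DO UPDATE SET forecast_quantity = excluded.forecast_quantity,
              model_version = excluded.model_version
"""


def upsert_forecasts(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Write forecasts idempotently: re-running a batch overwrites instead of duplicating."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT, rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    upsert_forecasts(conn, [(1001, 1, "2024-06-01", 7, 120.0, "v1")])
    # Re-running the batch with a new model version updates the same row
    upsert_forecasts(conn, [(1001, 1, "2024-06-01", 7, 118.5, "v2")])
    print(conn.execute("SELECT COUNT(*) FROM dws_sales_forecast").fetchone()[0])
```

In MySQL the statement would end with `ON DUPLICATE KEY UPDATE forecast_quantity = VALUES(forecast_quantity), ...`. MySQL 8.0.20+ deprecates `VALUES()` in that clause in favor of a row alias.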
3.2 Data Synchronization
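```python
# data_pipeline/cdc_sync.py - CDC data synchronization
"""
Real-time synchronization of order data with Debezium + Kafka.
Supports an initial full snapshot plus incremental change capture.

Assumptions: the Debezium JSON converter runs with schemas enabled, so each message
looks like {"schema": ..., "payload": {"op": ..., "before": ..., "after": ...}},
and the MySQL connector uses decimal.handling.mode=double so that DECIMAL columns
arrive as plain numbers.
"""
import json
import logging
from datetime import datetime
from typing import Any, Dict

logger = logging.getLogger(__name__)


class CDCEventHandler:
    """
    CDC event handler.
    Processes change-data-capture events produced by Debezium.
    """

    def __init__(self, kafka_consumer, redis_client, feature_calculator, kafka_producer=None):
        self.consumer = kafka_consumer      # kafka-python KafkaConsumer
        self.redis = redis_client           # redis-py client
        self.calculator = feature_calculator
        self.producer = kafka_producer      # created lazily on first publish if not injected

    def process_order_event(self, event: Dict[str, Any]):
        """
        Dispatch an order change event.
        Debezium op codes: c = create, u = update, d = delete, r = snapshot read.
        """
        payload = event.get('payload') or {}
        op = payload.get('op')
        if op == 'c':
            self._handle_order_create(payload)
        elif op == 'u':
            self._handle_order_update(payload)
        elif op == 'd':
            self._handle_order_delete(payload)
        # op == 'r' rows belong to the initial full snapshot; they are loaded by the
        # batch path and must not bump today's real-time counters

    def _handle_order_create(self, payload: Dict[str, Any]):
        """Handle a new order."""
        order_data = payload.get('after') or {}
        # 1. Write to the order detail table (omitted here)
        # 2. Update user statistics in real time
        user_id = order_data.get('user_id')
        if user_id:
            self._update_user_realtime_features(user_id, order_data)
        # 3. Update product statistics in real time
        product_id = order_data.get('product_id')
        if product_id:
            self._update_product_realtime_features(product_id, order_data)
        # 4. Publish an event (for downstream tasks such as anomaly detection)
        self._publish_order_event(order_data)

    def _handle_order_update(self, payload: Dict[str, Any]):
        """Handle an order update (e.g. a status change)."""
        before = payload.get('before') or {}
        after = payload.get('after') or {}
        old_status = before.get('order_status')
        new_status = after.get('order_status')
        # When an order becomes completed (status 4), update the final statistics
        if new_status == 4 and old_status != 4:
            self._update_final_stats(after)

    def _handle_order_delete(self, payload: Dict[str, Any]):
        """Handle an order deletion (refunds, etc.)."""
        order_data = payload.get('before') or {}
        # Roll back statistics; this must be idempotent because Kafka can redeliver events
        self._decrement_stats(order_data)

    def _update_user_realtime_features(self, user_id: int, order_data: Dict):
        """
        Update user features in real time.
        HINCRBY is atomic, so concurrent consumers do not lose updates.
        """
        today_key = datetime.now().strftime('%Y-%m-%d')
        # Orders per day, one hash field per date. The 7-day count is computed at read
        # time by summing the last 7 fields; incrementing the fields of the past 7 dates
        # on every order would credit a single order to 7 different days.
        self.redis.hincrby(f"user:{user_id}:daily", today_key, 1)
        # Time of the user's latest order (Debezium sends DATETIME as epoch millis by default)
        self.redis.set(f"user:{user_id}:last_order_time", str(order_data.get('order_time')))
        logger.debug("Updated realtime features for user %s", user_id)

    def _update_product_realtime_features(self, product_id: int, order_data: Dict):
        """Update product features in real time."""
        quantity = float(order_data.get('quantity', 1))
        amount = float(order_data.get('total_amount', 0))
        today_key = datetime.now().strftime('%Y-%m-%d')
        # Units sold today
        self.redis.hincrbyfloat(f"product:{product_id}:sales:daily", today_key, quantity)
        # Sales amount today
        self.redis.hincrbyfloat(f"product:{product_id}:amount:daily", today_key, amount)
        logger.debug("Updated realtime features for product %s", product_id)

    def _update_final_stats(self, order_data: Dict):
        """Update statistics for completed orders (implementation omitted)."""

    def _decrement_stats(self, order_data: Dict):
        """Roll back statistics for a deleted order (implementation omitted)."""

    def _publish_order_event(self, order_data: Dict):
        """
        Publish the order event to Kafka
        for downstream systems to consume.
        """
        if self.producer is None:
            from kafka import KafkaProducer
            # Create the producer once and reuse it; one producer per event is very expensive
            self.producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False, default=str).encode('utf-8')
            )
        # send() is asynchronous; call self.producer.flush() on shutdown rather than per event
        self.producer.send('order-events', value=order_data)

    def start_consuming(self):
        """Start the consume loop."""
        logger.info("Starting CDC event consumer...")
        for message in self.consumer:
            # Debezium follows each delete with a tombstone (null value); skip it
            if message.value is None:
                continue
            try:
                # kafka-python exposes the payload as the attribute `value` (bytes), not a method
                event = json.loads(message.value)
                self.process_order_event(event)
            except Exception as e:
                logger.error("Failed to process event: %s", e, exc_info=True)
                # Send an alert
                self._send_alert(str(e))

    def _send_alert(self, error_message: str):
        """Send an alert notification."""
        # Can be wired to a DingTalk / Feishu / WeCom bot
        logger.warning("Alert: %s", error_message)
```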
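Per-day counters can be stored as hash fields, one field per `YYYY-MM-DD` date, as in the `user:{user_id}:daily` hash written by `_update_user_realtime_features`. A rolling 7-day order count can then be computed at read time with a single `HMGET`. The sketch below assumes a redis-py-style client whose `hmget(name, keys)` returns a list of values, with `None` for missing fields. The function name `rolling_count` is hypothetical.

```python
from datetime import date, timedelta
from typing import Optional


def rolling_count(client, key: str, days: int = 7, today: Optional[date] = None) -> int:
    """Sum the last `days` daily fields (today included) of a per-day counter hash."""
    today = today or date.today()
    fields = [(today - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(days)]
    # Days without orders have no field, and HMGET returns None for them
    values = client.hmget(key, fields)
    # redis-py returns bytes by default; int() accepts b"3" as well as "3"
    return sum(int(v) for v in values if v is not None)
```

Old fields are never read again, so they can be pruned periodically with `HDEL`, or the whole key can be given a TTL with `EXPIRE`.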
4. Feature Engineering Module
4.1 Feature System Design
```python
# features/sales_features.py - feature system for sales forecasting
"""
Feature system design for sales forecasting.

Feature groups:
1. Time-series features (extracted directly from historical sales)
2. Basic statistical features (mean, variance, extremes, ...)
3. Trend features (upward / downward trends)
4. Periodic features (weekly and monthly patterns)
5. Promotion features (discounts, spend-threshold offers, holidays)
6. External features (weather, competitors, macroeconomy)
"""
import logging
from datetime import date, timedelta
from typing import Dict, List

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class SalesFeatureEngineering:
    """
    Feature engineering for sales forecasting.
    The complete feature-computation pipeline.
    """

    def __init__(self):
        self.feature_names: List[str] = []

    def build_features(
        self,
        df: pd.DataFrame,
        target_date: str,
        lookback_days: int = 90
    ) -> pd.DataFrame:
        """
        Build the complete feature matrix.

        Args:
            df: historical sales; must contain the columns
                - product_id: product ID
                - order_date: order date
                - quantity: units sold
                - warehouse_id: warehouse ID
            target_date: the date being forecast
            lookback_days: number of days of history to use

        Returns:
            DataFrame containing all feature columns
        """
        # Reset on every call; otherwise names accumulate (and duplicate) across calls
        self.feature_names = []
        df = df.copy()
        df['order_date'] = pd.to_datetime(df['order_date'])
        # Use only history strictly before the target date to avoid target leakage
        df = df[df['order_date'] < pd.to_datetime(target_date)].reset_index(drop=True)
        logger.info("Building features for %d products", df['product_id'].nunique())

        # 1. Basic calendar features
        df = self._add_base_info_features(df, target_date)
        # 2. Time-series features
        df = self._add_time_series_features(df, target_date, lookback_days)
        # 3. Statistical features
        df = self._add_statistical_features(df, target_date, lookback_days)
        # 4. Trend features
        df = self._add_trend_features(df, target_date, lookback_days)
        # 5. Periodic features
        df = self._add_periodic_features(df, target_date)
        # 6. Promotion features
        df = self._add_promotion_features(df, target_date)
        # 7. External features
        df = self._add_external_features(df, target_date)
        # 8. Cross features
        df = self._add_cross_features(df)

        logger.info("Generated %d features", len(self.feature_names))
        return df[self.feature_names]

    def _add_base_info_features(self, df: pd.DataFrame, target_date: str) -> pd.DataFrame:
        """Add basic calendar features of the target date."""
        target_dt = pd.to_datetime(target_date)
        df['day_of_week'] = target_dt.dayofweek
        df['day_of_month'] = target_dt.day
        df['week_of_year'] = target_dt.isocalendar()[1]  # ISO week number
        df['month'] = target_dt.month
        df['quarter'] = target_dt.quarter
        df['is_weekend'] = int(target_dt.dayofweek >= 5)
        df['is_month_start'] = int(target_dt.is_month_start)
        df['is_month_end'] = int(target_dt.is_month_end)
        # MonthEnd(0) rolls forward to the month end (or stays on it)
        df['days_to_month_end'] = (target_dt + pd.offsets.MonthEnd(0) - target_dt).days
        self.feature_names.extend([
            'day_of_week', 'day_of_month', 'week_of_year',
            'month', 'quarter', 'is_weekend',
            'is_month_start', 'is_month_end', 'days_to_month_end'
        ])
        return df

    def _add_time_series_features(
        self,
        df: pd.DataFrame,
        target_date: str,
        lookback_days: int
    ) -> pd.DataFrame:
        """
        Add time-series features.
        Core idea: per-product lag features and moving-window statistics,
        computed only from the history before target_date.
        """
        target_dt = pd.to_datetime(target_date)
        # Product × day sales matrix over the full lookback window; days without sales are 0
        window = pd.date_range(target_dt - timedelta(days=lookback_days),
                               target_dt - timedelta(days=1), freq='D')
        daily = (
            df.pivot_table(index='product_id', columns='order_date', values='quantity',
                           aggfunc='sum', fill_value=0)
            .reindex(columns=window, fill_value=0)
        )
        ts = pd.DataFrame(index=daily.index)

        # Lag features: sales exactly `lag` days before the target date
        lag_days = [1, 2, 3, 7, 14, 21, 28, 35, 42, 60, 90]
        for lag in lag_days:
            lag_dt = target_dt - timedelta(days=lag)
            col = f'quantity_lag_{lag}d'
            # Lags outside the lookback window are unknown (NaN), not zero
            ts[col] = daily[lag_dt] if lag_dt in daily.columns else np.nan
            self.feature_names.append(col)

        # Moving averages: mean daily sales over the last N days
        for w in [7, 14, 30, 60]:
            col = f'sales_ma_{w}d'
            ts[col] = daily.iloc[:, -w:].mean(axis=1)
            self.feature_names.append(col)

        # Moving standard deviations
        for w in [7, 14, 30]:
            col = f'sales_std_{w}d'
            ts[col] = daily.iloc[:, -w:].std(axis=1)
            self.feature_names.append(col)

        # `ts` has exactly one row per product, so the merge does not multiply rows
        return df.merge(ts.reset_index(), on='product_id', how='left')

    def _add_statistical_features(
        self,
        df: pd.DataFrame,
        target_date: str,
        lookback_days: int
    ) -> pd.DataFrame:
        """Add statistical features over each product's history rows."""
        grouped = df.groupby('product_id')['quantity']
        stats = {
            'sales_mean': grouped.transform('mean'),
            'sales_median': grouped.transform('median'),
            'sales_max': grouped.transform('max'),
            'sales_min': grouped.transform('min'),
            'sales_std': grouped.transform('std'),
        }
        for name, values in stats.items():
            df[name] = values.fillna(0)
            self.feature_names.append(name)

        # Quantile features
        for q in [0.25, 0.75, 0.9]:
            q_col = f'sales_q{int(q * 100)}'
            df[q_col] = grouped.transform(lambda x, q=q: x.quantile(q))
            self.feature_names.append(q_col)

        # Skewness and kurtosis; GroupBy has no 'kurt' shortcut, so call Series.kurt
        df['sales_skew'] = grouped.transform('skew')
        df['sales_kurt'] = grouped.transform(lambda x: x.kurt())
        self.feature_names.extend(['sales_skew', 'sales_kurt'])
        return df

    def _add_trend_features(
        self,
        df: pd.DataFrame,
        target_date: str,
        lookback_days: int
    ) -> pd.DataFrame:
        """
        Add trend features.
        The sales trend is the slope of a linear regression over time.
        """
        def calc_trend(series: pd.Series) -> float:
            """Slope of a least-squares line through the sales series."""
            if len(series) < 7:
                return 0.0
            y = series.to_numpy(dtype=float)
            if np.std(y) < 1e-6:
                return 0.0
            x = np.arange(len(y))
            try:
                return float(np.polyfit(x, y, 1)[0])
            except (np.linalg.LinAlgError, ValueError):
                return 0.0

        # Sort by date so the regression follows time order. transform() broadcasts each
        # product's slope back to its rows; apply() would return a product_id-indexed
        # Series that does not align with df's index.
        df['weekly_trend'] = (
            df.sort_values('order_date')
            .groupby('product_id')['quantity']
            .transform(calc_trend)
        )
        self.feature_names.append('weekly_trend')

        # Period-over-period growth rates
        for period in [7, 14, 30]:
            col_name = f'growth_rate_{period}d'
            # Formula: (current - value `period` days earlier) / value `period` days earlier
            df[col_name] = 0  # simplified placeholder
            self.feature_names.append(col_name)

        # Consecutive days of growth / decline
        df['consecutive_up_days'] = 0  # simplified placeholder
        df['consecutive_down_days'] = 0
        self.feature_names.extend(['consecutive_up_days', 'consecutive_down_days'])
        return df

    def _add_periodic_features(
        self,
        df: pd.DataFrame,
        target_date: str
    ) -> pd.DataFrame:
        """
        Add periodic features.
        Capture weekly and monthly patterns in sales.
        """
        target_dt = pd.to_datetime(target_date)
        # Daily sales per product (the averages below are over days with recorded sales)
        daily = df.groupby(['product_id', 'order_date'], as_index=False)['quantity'].sum()
        overall_avg = daily.groupby('product_id')['quantity'].mean()

        # Weekly pattern: average sales on the same weekday as the target date.
        # Column names do not depend on the date, so the schema is the same every day.
        same_dow = (daily[daily['order_date'].dt.dayofweek == target_dt.dayofweek]
                    .groupby('product_id')['quantity'].mean())
        df['avg_sales_same_weekday'] = df['product_id'].map(same_dow)

        # Monthly pattern: average sales on the same day of the month
        same_dom = (daily[daily['order_date'].dt.day == target_dt.day]
                    .groupby('product_id')['quantity'].mean())
        df['avg_sales_same_dom'] = df['product_id'].map(same_dom)

        # Same-weekday average relative to the overall daily average
        df['relative_to_weekly_avg'] = df['product_id'].map(same_dow / overall_avg.replace(0, np.nan))

        self.feature_names.extend(['avg_sales_same_weekday', 'avg_sales_same_dom', 'relative_to_weekly_avg'])
        return df

    def _add_promotion_features(
        self,
        df: pd.DataFrame,
        target_date: str
    ) -> pd.DataFrame:
        """Add promotion-related features."""
        target = pd.to_datetime(target_date).date()
        # Holiday calendar; next year's holidays are included so that
        # "days to the next holiday" is also defined late in the year
        holidays = (self._get_holiday_calendar(target.year)
                    + self._get_holiday_calendar(target.year + 1))
        df['is_holiday'] = int(target in holidays)
        df['is_near_holiday'] = int(any(abs((target - h).days) <= 3 for h in holidays))
        upcoming = [(h - target).days for h in holidays if h >= target]
        df['days_to_holiday'] = min(upcoming) if upcoming else 999
        self.feature_names.extend(['is_holiday', 'is_near_holiday', 'days_to_holiday'])

        # Promotion campaign features (require data from the promotion system)
        df['has_promotion'] = 0
        df['promotion_discount'] = 0
        df['promotion_type'] = 0
        self.feature_names.extend(['has_promotion', 'promotion_discount', 'promotion_type'])
        return df

    def _add_external_features(
        self,
        df: pd.DataFrame,
        target_date: str
    ) -> pd.DataFrame:
        """Add external features."""
        # Weather features (require a weather API)
        weather = self._get_weather_feature(target_date)
        df['temperature'] = weather.get('temperature', 20)
        df['weather_category'] = weather.get('category', 0)  # 0: sunny, 1: rain, 2: snow
        df['is_bad_weather'] = weather.get('is_bad', 0)
        self.feature_names.extend(['temperature', 'weather_category', 'is_bad_weather'])

        # Competitor features (simplified)
        df['competitor_on_sale'] = 0
        self.feature_names.append('competitor_on_sale')
        return df

    def _add_cross_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add cross (interaction) features."""
        # Product features × calendar features
        if 'product_avg_price' in df.columns and 'is_weekend' in df.columns:
            df['weekend_avg_price'] = df['product_avg_price'] * df['is_weekend']
            self.feature_names.append('weekend_avg_price')

        # Trend × weekly seasonality
        if 'weekly_trend' in df.columns and 'day_of_week' in df.columns:
            df['trend_weekday'] = df['weekly_trend'] * df['day_of_week']
            self.feature_names.append('trend_weekday')
        return df

    def _get_holiday_calendar(self, year: int) -> List[date]:
        """Return the holiday calendar for a year."""
        # Production code should use real holiday data,
        # for example the `holidays` library
        return [
            date(year, 1, 1),    # New Year's Day
            date(year, 2, 10),   # Spring Festival (simplified: the real date follows the lunar calendar)
            date(year, 5, 1),    # Labour Day
            date(year, 10, 1),   # National Day
        ]

    def _get_weather_feature(self, target_date: str) -> Dict:
        """Get weather features."""
        # Should call a weather API here
        return {
            'temperature': 20,
            'category': 0,
            'is_bad': 0
        }


# ============================================================
# Feature selection
# ============================================================
class FeatureSelector:
    """
    Feature selector.
    Removes unimportant or redundant features.
    """

    def __init__(self, importance_threshold: float = 0.001):
        self.importance_threshold = importance_threshold
        self.selected_features: List[str] = []
        self.feature_importance: Dict[str, float] = {}

    def select_by_importance(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        model: 'lightgbm.Booster'
    ) -> List[str]:
        """
        Select features by model importance.

        Args:
            X: feature matrix (columns in the order the model was trained on)
            y: target variable
            model: a trained LightGBM Booster

        Returns:
            List of selected features
        """
        # Gain importance, normalized to a share of total gain so the threshold is scale-free
        importance = model.feature_importance(importance_type='gain')
        total = importance.sum()
        if total > 0:
            importance = importance / total
        feature_names = X.columns.tolist()

        # Sort by importance
        importance_df = pd.DataFrame({
            'feature': feature_names,
            'importance': importance
        }).sort_values('importance', ascending=False)
        self.feature_importance = dict(zip(feature_names, importance))

        # Keep features whose importance exceeds the threshold
        selected = importance_df[
            importance_df['importance'] > self.importance_threshold
        ]['feature'].tolist()

        # Remove highly correlated features; because columns are ordered by importance,
        # the more important feature of each correlated pair is kept
        selected = self._remove_correlated_features(X[selected])
        self.selected_features = selected
        return selected

    def _remove_correlated_features(self, X: pd.DataFrame, threshold: float = 0.95) -> List[str]:
        """Remove highly correlated features."""
        corr_matrix = X.corr().abs()
        # Upper triangle (excluding the diagonal), so each pair is considered once
        upper = corr_matrix.where(
            np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
        )
        # Features whose correlation with an earlier feature exceeds the threshold
        to_drop = [
            column for column in upper.columns
            if (upper[column] > threshold).any()
        ]
        selected = [f for f in X.columns if f not in to_drop]
        logger.info("Removed %d correlated features: %s", len(to_drop), to_drop)
        return selected
```
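`_add_time_series_features` builds lag and moving-window values for a single target date. A training set needs the same features for every historical (product, day) row, and the usual vectorized way to produce them is `groupby(...).shift()`. Because shifting counts rows rather than days, the sketch first fills missing days with zero sales. `add_lag_features` is a hypothetical helper. It assumes one global calendar from the earliest to the latest date in the input, so products launched later get leading zero days.

```python
import pandas as pd


def add_lag_features(daily: pd.DataFrame, lags=(1, 7), windows=(7,)) -> pd.DataFrame:
    """Add per-product lag and trailing-mean features for every (product, day) row.

    `daily` needs product_id, order_date and quantity columns. Every feature uses
    only earlier days, so it is safe to use as a training input.
    """
    agg = (
        daily.assign(order_date=pd.to_datetime(daily["order_date"]))
        .groupby(["product_id", "order_date"])["quantity"]
        .sum()
    )
    # shift() counts rows, not days: build a complete calendar so missing days become 0
    dates = agg.index.get_level_values("order_date")
    calendar = pd.date_range(dates.min(), dates.max(), freq="D")
    products = agg.index.get_level_values("product_id").unique()
    full_index = pd.MultiIndex.from_product([products, calendar], names=["product_id", "order_date"])
    out = agg.reindex(full_index, fill_value=0).reset_index()

    g = out.groupby("product_id")["quantity"]
    for lag in lags:
        out[f"quantity_lag_{lag}d"] = g.shift(lag)
    for w in windows:
        # shift(1) drops the current day, so the mean never includes the row's own target
        out[f"sales_ma_{w}d"] = g.transform(lambda s, w=w: s.shift(1).rolling(w, min_periods=1).mean())
    return out
```

Grouping before shifting is what keeps one product's history from leaking into the first rows of the next product.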
4.2 Feature Computation Pipeline
```python
# features/feature_pipeline.py - feature computation pipeline
"""
Feature computation pipeline.
Integrates offline and real-time feature computation.
"""
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List

import numpy as np
import pandas as pd

from features.sales_features import SalesFeatureEngineering

logger = logging.getLogger(__name__)


class FeaturePipeline:
    """
    Feature computation pipeline.

    Supports:
    1. Full feature computation (daily batch)
    2. Incremental feature updates (real time)
    3. Feature backfill (historical data)
    """

    def __init__(self, config: Dict):
        self.config = config
        self.feature_engineering = SalesFeatureEngineering()
        self.feature_store = None  # inject a Feature Store client here

    def run_daily_pipeline(self, target_date: str):
        """
        Run the daily feature pipeline.
        Scheduled by Airflow.
        """
        logger.info("Starting daily feature pipeline for %s", target_date)
        # 1. Prepare the data
        df = self._prepare_data(target_date)
        # 2. Compute the full feature set
        features = self.feature_engineering.build_features(
            df=df,
            target_date=target_date,
            lookback_days=self.config.get('lookback_days', 90)
        )
        # 3. Feature quality checks
        quality_report = self._check_feature_quality(features)
        if quality_report.has_issues:
            logger.warning("Feature quality issues: %s", quality_report.issues)
            self._send_quality_alert(quality_report)
        # 4. Write to offline storage
        self._write_offline_features(features, target_date)
        # 5. Sync to online storage
        self._sync_to_online(features)
        # 6. Record execution metrics
        self._record_pipeline_metrics(target_date, quality_report)
        logger.info("Daily feature pipeline completed for %s", target_date)

    def run_backfill(self, start_date: str, end_date: str):
        """
        Backfill historical features.
        Used for model retraining or feature corrections.
        """
        dates = pd.date_range(pd.to_datetime(start_date), pd.to_datetime(end_date), freq='D')
        logger.info("Backfilling features for %d days", len(dates))
        failed = []
        for i, day in enumerate(dates):
            try:
                self.run_daily_pipeline(day.strftime('%Y-%m-%d'))
                if (i + 1) % 10 == 0:
                    logger.info("Progress: %d/%d", i + 1, len(dates))
            except Exception as e:
                # Keep processing the remaining dates
                logger.error("Failed to backfill %s: %s", day.date(), e)
                failed.append(day)
        logger.info("Backfill completed: %d days processed, %d failed", len(dates), len(failed))

    def get_realtime_features(
        self,
        product_id: int,
        timestamp: datetime
    ) -> Dict[str, float]:
        """
        Get real-time features.
        Used for online inference.
        """
        # Read precomputed real-time features from Redis
        features = self._get_from_redis(product_id)
        # Add features that must be computed at request time
        features['current_hour'] = timestamp.hour
        features['is_rush_hour'] = int(timestamp.hour in [9, 12, 18, 20])
        return features

    def _prepare_data(self, target_date: str) -> pd.DataFrame:
        """Prepare the data needed for feature computation."""
        # Read historical orders from the data warehouse.
        # Simplified here; production code would read from Hive/BigQuery.
        lookback_days = self.config.get('lookback_days', 90)
        start = pd.to_datetime(target_date) - timedelta(days=lookback_days)

        # Simulated data: one row per product per day, ending the day before target_date
        n_products = 1000
        data = []
        for product_id in range(1, n_products + 1):
            for i in range(lookback_days):
                day = start + timedelta(days=i)
                data.append({
                    'product_id': product_id,
                    'order_date': day.strftime('%Y-%m-%d'),
                    'quantity': max(0, int(np.random.normal(100, 30))),
                    'warehouse_id': np.random.randint(1, 10)
                })
        return pd.DataFrame(data)

    def _check_feature_quality(self, features: pd.DataFrame) -> 'QualityReport':
        """Check feature quality."""
        issues = []
        # Missing values
        null_ratio = features.isnull().mean()
        high_null = null_ratio[null_ratio > 0.1]
        if len(high_null) > 0:
            issues.append(f"High null ratio: {high_null.to_dict()}")

        # Constant features
        constant_cols = [col for col in features.columns if features[col].nunique() <= 1]
        if constant_cols:
            issues.append(f"Constant features: {constant_cols}")

        # Outliers: values beyond Tukey fences (3 × IQR). Percentile cutoffs are unsuitable
        # here because roughly 2% of rows always fall outside a 1st/99th-percentile band.
        numeric_cols = features.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            q1, q3 = features[col].quantile([0.25, 0.75])
            iqr = q3 - q1
            outliers = ((features[col] < q1 - 3 * iqr) | (features[col] > q3 + 3 * iqr)).sum()
            if outliers > len(features) * 0.05:
                issues.append(f"Too many outliers in {col}: {outliers}")

        return QualityReport(has_issues=len(issues) > 0, issues=issues)

    def _write_offline_features(self, features: pd.DataFrame, target_date: str):
        """Write to offline storage."""
        # Parquet in a Hive-style date partition (a local filesystem path is assumed)
        partition_dir = Path(self.config['offline_path']) / f"date={target_date}"
        partition_dir.mkdir(parents=True, exist_ok=True)
        output_path = partition_dir / 'part-0000.parquet'
        features.to_parquet(output_path, index=False)
        logger.info("Written features to %s", output_path)

    def _sync_to_online(self, features: pd.DataFrame):
        """Sync features to online storage."""
        # Batch write to Redis through the feature store
        if self.feature_store:
            self.feature_store.sync_feature_group(features)
            logger.info("Synced features to online store")

    def _get_from_redis(self, product_id: int) -> Dict[str, float]:
        """Read features from Redis."""
        # Simplified implementation
        return {
            'product_sales_7d': 0,
            'product_sales_30d': 0,
            'product_avg_price': 0,
        }

    def _send_quality_alert(self, report: 'QualityReport'):
        """Send a quality alert."""
        logger.warning("Feature quality alert: %s", report.issues)

    def _record_pipeline_metrics(self, target_date: str, report: 'QualityReport'):
        """Record pipeline execution metrics."""
        metrics = {
            'pipeline_name': 'feature_pipeline',
            'execution_date': target_date,
            'status': 'success' if not report.has_issues else 'warning',
            'feature_count': len(self.feature_engineering.feature_names),
            'quality_issues': len(report.issues)
        }
        # Write to the metrics store
        logger.info("Pipeline metrics: %s", metrics)


class QualityReport:
    """Feature quality report."""

    def __init__(self, has_issues: bool, issues: List[str]):
        self.has_issues = has_issues
        self.issues = issues

    def __repr__(self):
        return f"QualityReport(issues={len(self.issues)})"
```
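The backfill above processes dates strictly one after another. Each date's features depend only on that date's input window, so the loop can also be fanned out over a worker pool. The sketch below does this with `concurrent.futures`. `parallel_backfill` is a hypothetical name. `run_one` stands for any per-date callable, for example a function that builds a fresh `FeaturePipeline` and calls `run_daily_pipeline`. Each worker should use its own pipeline object, because `SalesFeatureEngineering` keeps state in `feature_names` and is not safe to share across threads.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict

import pandas as pd


def parallel_backfill(run_one: Callable[[str], None], start_date: str, end_date: str,
                      max_workers: int = 4) -> Dict[str, str]:
    """Run `run_one(date_str)` for each day in [start_date, end_date]; return failures."""
    dates = [d.strftime("%Y-%m-%d") for d in pd.date_range(start_date, end_date, freq="D")]
    failures: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_one, d): d for d in dates}
        for fut in as_completed(futures):
            day = futures[fut]
            try:
                fut.result()
            except Exception as exc:  # keep going; report every failed date at the end
                failures[day] = str(exc)
    return failures
```

Threads are enough when most of the time goes to I/O, such as reading from the warehouse or writing Parquet. For CPU-heavy pandas work, `ProcessPoolExecutor` usually scales better, provided `run_one` is a module-level function that can be pickled.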
5. Model Training and Optimization
5.1 Model Training Pipeline
# model_training/train_pipeline.py - 模型训练流水线
"""
销量预测模型训练流水线
支持:
1. 多阶段训练(预训练 + 微调)
2. 超参数优化
3. 交叉验证
4. 模型集成
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import lightgbm as lgb
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
import logging
import joblib
from pathlib import Path
logger = logging.getLogger(__name__)
class SalesForecastTrainer:
"""
销量预测模型训练器
整合特征工程、模型训练、评估、选优
"""
def __init__(self, config: Dict):
self.config = config
self.models = {}
self.best_model = None
self.feature_names = []
def train(
self,
train_data: pd.DataFrame,
val_data: pd.DataFrame,
target_col: str = 'quantity'
) -> Dict:
"""
完整训练流程
Args:
train_data: 训练数据
val_data: 验证数据
target_col: 目标列名
Returns:
训练结果字典
"""
logger.info("Starting training pipeline...")
# 1. 准备数据
X_train, y_train = self._prepare_data(train_data, target_col)
X_val, y_val = self._prepare_data(val_data, target_col)
self.feature_names = X_train.columns.tolist()
# 2. 训练多个模型
results = {}
# LightGBM
lgb_model = self._train_lightgbm(X_train, y_train, X_val, y_val)
results['lightgbm'] = self._evaluate(lgb_model, X_val, y_val)
# XGBoost
xgb_model = self._train_xgboost(X_train, y_train, X_val, y_val)
results['xgboost'] = self._evaluate(xgb_model, X_val, y_val)
# CatBoost
cat_model = self._train_catboost(X_train, y_train, X_val, y_val)
results['catboost'] = self._evaluate(cat_model, X_val, y_val)
# 3. 模型集成
ensemble_model = self._create_ensemble([
lgb_model, xgb_model, cat_model
])
results['ensemble'] = self._evaluate(ensemble_model, X_val, y_val)
# 4. 选择最佳模型
self.best_model = self._select_best_model(results)
logger.info(f"Training completed. Best model: {self.best_model['name']}")
return {
'models': results,
'best_model': self.best_model,
'feature_names': self.feature_names
}
def hyperparameter_tuning(
self,
train_data: pd.DataFrame,
target_col: str = 'quantity',
n_trials: int = 50
) -> Dict:
"""
使用 Optuna 进行超参数优化
"""
try:
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)
except ImportError:
logger.warning("Optuna not installed, using default parameters")
return {}
X, y = self._prepare_data(train_data, target_col)
def objective(trial):
params = {
'objective': 'regression',
'metric': 'mae',
'boosting_type': 'gbdt',
'num_leaves': trial.suggest_int('num_leaves', 20, 150),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
}
# 5折交叉验证
tscv = TimeSeriesSplit(n_splits=5)
scores = []
for train_idx, val_idx in tscv.split(X):
X_tr, X_vl = X.iloc[train_idx], X.iloc[val_idx]
y_tr, y_vl = y.iloc[train_idx], y.iloc[val_idx]
train_set = lgb.Dataset(X_tr, y_tr)
val_set = lgb.Dataset(X_vl, y_vl)
model = lgb.train(
params,
train_set,
num_boost_round=500,
valid_sets=[val_set],
callbacks=[lgb.early_stopping(50, verbose=False)]
)
preds = model.predict(X_vl)
scores.append(mean_absolute_percentage_error(y_vl, preds))
return np.mean(scores)
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
return {
'best_params': study.best_params,
'best_value': study.best_value
}
def _prepare_data(
self,
data: pd.DataFrame,
target_col: str
) -> Tuple[pd.DataFrame, pd.Series]:
"""准备训练数据"""
# 移除非特征列
exclude_cols = [target_col, 'product_id', 'order_date', 'warehouse_id']
feature_cols = [c for c in data.columns if c not in exclude_cols]
X = data[feature_cols].copy()
y = data[target_col].copy()
# 处理缺失值
X = X.fillna(0)
# 移除无穷值
X = X.replace([np.inf, -np.inf], 0)
return X, y
def _train_lightgbm(
self,
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series
) -> lgb.Booster:
"""训练 LightGBM 模型"""
params = self.config.get('lightgbm_params', {
'objective': 'regression',
'metric': 'mae',
'boosting_type': 'gbdt',
'num_leaves': 63,
'learning_rate': 0.05,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_child_samples': 20,
'reg_alpha': 0.1,
'reg_lambda': 0.1,
})
train_set = lgb.Dataset(X_train, y_train)
val_set = lgb.Dataset(X_val, y_val, reference=train_set)
model = lgb.train(
params,
train_set,
num_boost_round=1000,
valid_sets=[train_set, val_set],
valid_names=['train', 'val'],
callbacks=[
lgb.early_stopping(stopping_rounds=50),
lgb.log_evaluation(period=100)
]
)
return model
def _train_xgboost(
self,
X_train: pd.DataFrame,
y_train: pd.Series,
X_val: pd.DataFrame,
y_val: pd.Series
) -> xgb.Booster:
"""训练 XGBoost 模型"""
params = {
'objective': 'reg:squarederror',
'eval_metric': 'mae',
'max_depth': 8,
'learning_rate': 0.05,
'subsample': 0.8,
'colsample_bytree': 0.8,
'reg_alpha': 0.1,
'reg_lambda': 0.1,
}
dtrain = xgb.DMatrix(X_train, label=y_train)
dval = xgb.DMatrix(X_val, label=y_val)
model = xgb.train(
params,
dtrain,
num_boost_round=1000,
evals=[(dtrain, 'train'), (dval, 'val')],
early_stopping_rounds=50,
verbose_eval=100
)
return model
    def _train_catboost(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        X_val: pd.DataFrame,
        y_val: pd.Series
    ) -> Optional['CatBoostRegressor']:
        """Train a CatBoost model (optional dependency)"""
        try:
            from catboost import CatBoostRegressor
        except ImportError:
            logger.warning("CatBoost not installed, skipping")
            return None
        model = CatBoostRegressor(
            iterations=1000,
            learning_rate=0.05,
            depth=8,
            eval_metric='MAE',
            early_stopping_rounds=50,
            verbose=100
        )
        model.fit(
            X_train, y_train,
            eval_set=(X_val, y_val),
            use_best_model=True
        )
        return model
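    def _create_ensemble(
        self,
        models: List,
        weights: Optional[List[float]] = None
    ) -> 'EnsembleModel':
        """Build an ensemble model"""
        # Drop models that were not trained (e.g. CatBoost returns None when not installed)
        if weights is not None:
            pairs = [(m, w) for m, w in zip(models, weights) if m is not None]
            models = [m for m, _ in pairs]
            weights = [w for _, w in pairs]
        else:
            models = [m for m in models if m is not None]
            # Default to equal weights
            weights = [1.0 / len(models)] * len(models)
        return EnsembleModel(models, weights)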
    def _evaluate(
        self,
        model,
        X_val: pd.DataFrame,
        y_val: pd.Series
    ) -> Dict:
        """Evaluate a model on the validation set"""
        if isinstance(model, xgb.Booster):
            # Native XGBoost needs a DMatrix; use the trees up to the early-stopping best iteration
            dval = xgb.DMatrix(X_val)
            y_pred = model.predict(dval, iteration_range=(0, model.best_iteration + 1))
        else:
            # LightGBM Booster (uses best_iteration by default), CatBoostRegressor and EnsembleModel
            y_pred = model.predict(X_val)
        # Sales cannot be negative
        y_pred = np.maximum(y_pred, 0)
        metrics = {
            'mae': mean_absolute_error(y_val, y_pred),
            'rmse': np.sqrt(mean_squared_error(y_val, y_pred)),
            # sklearn's MAPE divides by max(|y|, eps), so zero-sales rows inflate it sharply
            'mape': mean_absolute_percentage_error(y_val, y_pred),
            # Positive bias means systematic over-forecasting
            'bias': np.mean(y_pred - y_val),
            'coverage_90': self._calculate_coverage(y_val, y_pred, 0.9),
            'coverage_95': self._calculate_coverage(y_val, y_pred, 0.95),
        }
        return metrics
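    def _calculate_coverage(
        self,
        y_true: pd.Series,
        y_pred: np.ndarray,
        confidence: float
    ) -> float:
        """Compute the empirical coverage of the prediction interval"""
        from statistics import NormalDist
        # Simplification: assume the error std is 10% of the prediction and scale the
        # half-width by the normal z-score, so a higher confidence gives a wider interval
        z = NormalDist().inv_cdf(0.5 + confidence / 2)
        interval_width = y_pred * 0.1 * z
        lower = y_pred - interval_width
        upper = y_pred + interval_width
        covered = ((y_true >= lower) & (y_true <= upper)).mean()
        return float(covered)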
    def _select_best_model(self, results: Dict) -> Dict:
        """Select the best model"""
        # MAE is the primary selection metric
        best_name = min(results.keys(), key=lambda k: results[k]['mae'])
        return {
            'name': best_name,
            'metrics': results[best_name]
        }
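    def save_model(self, path: str):
        """Save the best model and its feature names"""
        import os
        os.makedirs(path, exist_ok=True)
        if isinstance(self.best_model, lgb.Booster):
            # LightGBM native text format
            self.best_model.save_model(os.path.join(path, "model.txt"))
        else:
            # XGBoost, CatBoost and EnsembleModel are pickled
            joblib.dump(self.best_model, os.path.join(path, "model.pkl"))
        # Feature names are needed to order the columns at inference time
        joblib.dump(self.feature_names, os.path.join(path, "feature_names.pkl"))
        logger.info(f"Model saved to {path}")
    def load_model(self, path: str, model_type: str = 'lightgbm'):
        """Load a saved model; model_type must match how the model was saved"""
        import os
        if model_type == 'lightgbm':
            self.best_model = lgb.Booster(model_file=os.path.join(path, "model.txt"))
        else:
            self.best_model = joblib.load(os.path.join(path, "model.pkl"))
        self.feature_names = joblib.load(os.path.join(path, "feature_names.pkl"))
        logger.info(f"Model loaded from {path}")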
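class EnsembleModel:
    """Weighted-average ensemble of several regressors"""
    def __init__(self, models: List, weights: List[float]):
        self.models = models
        self.weights = weights
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Weighted average of the member predictions"""
        predictions = []
        for model in self.models:
            if isinstance(model, xgb.Booster):
                # Native XGBoost needs a DMatrix; use the trees up to the best iteration
                pred = model.predict(
                    xgb.DMatrix(X),
                    iteration_range=(0, model.best_iteration + 1)
                )
            else:
                # LightGBM Booster and CatBoostRegressor accept a DataFrame directly
                pred = model.predict(X)
            predictions.append(pred)
        # Weighted average; np.average normalizes the weights, so they need not sum to 1
        weighted_pred = np.average(predictions, axis=0, weights=self.weights)
        return weighted_pred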
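`_calculate_coverage` above assumes that the forecast error is a fixed fraction of the prediction. As a result, the coverage it reports says little about how real intervals behave. One common alternative, not used in the original trainer, is split conformal prediction. It computes absolute residuals on a held-out calibration set (the validation set, for example) and uses a finite-sample-corrected quantile of those residuals as the interval half-width. The sketch below assumes plain NumPy arrays, and both function names are hypothetical.

```python
import numpy as np


def conformal_half_width(y_cal, pred_cal, confidence: float) -> float:
    """Split-conformal half-width q so that [pred - q, pred + q] covers about
    `confidence` of new points, assuming calibration and new data are exchangeable."""
    residuals = np.sort(np.abs(np.asarray(y_cal, dtype=float) - np.asarray(pred_cal, dtype=float)))
    n = residuals.size
    # Rank (1-based) of the conformal quantile with the (n + 1) finite-sample correction;
    # capped at the largest residual when confidence is very close to 1
    k = int(np.ceil((n + 1) * confidence))
    return float(residuals[min(k, n) - 1])


def interval_coverage(y_true, y_pred, half_width: float) -> float:
    """Share of actuals inside [max(pred - q, 0), pred + q]."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    lower = np.maximum(y_pred - half_width, 0.0)  # sales cannot be negative
    upper = y_pred + half_width
    return float(((y_true >= lower) & (y_true <= upper)).mean())


# Synthetic check: Poisson demand, forecasts with Gaussian noise
rng = np.random.default_rng(0)
y = rng.poisson(20, size=4000).astype(float)
pred = y + rng.normal(0, 3, size=y.size)
q = conformal_half_width(y[:2000], pred[:2000], confidence=0.9)
print(q, interval_coverage(y[2000:], pred[2000:], q))  # coverage should land near 0.9
```

A single absolute half-width suits SKUs of similar volume. For a catalog with very different volumes, calibrating a separate half-width per sales-volume tier is a natural extension.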
5.2 Model Evaluation and Selection
# model_training/evaluation.py - model evaluation module
"""
Model evaluation and business metrics
Maps technical metrics to business value
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
import logging
logger = logging.getLogger(__name__)
class ModelEvaluator:
    """
    Model evaluator
    Evaluates business value as well as technical metrics
    """
    def __init__(self, config: Dict):
        self.config = config
    def evaluate(
        self,
        model,
        test_data: pd.DataFrame,
        target_col: str = 'quantity'
    ) -> Dict:
        """
        Full evaluation: technical metrics, business metrics and a per-tier breakdown
        """
        # Prepare the data
        exclude_cols = [target_col, 'product_id', 'order_date', 'warehouse_id']
        feature_cols = [c for c in test_data.columns if c not in exclude_cols]
        X = test_data[feature_cols].fillna(0).replace([np.inf, -np.inf], 0)
        y = test_data[target_col]
        # Predict (the model must accept a feature DataFrame, e.g. a LightGBM Booster or EnsembleModel);
        # clip at 0 because sales cannot be negative
        y_pred = model.predict(X)
        y_pred = np.maximum(y_pred, 0)
        # Technical metrics
        tech_metrics = self._calculate_technical_metrics(y, y_pred)
        # Business metrics
        business_metrics = self._calculate_business_metrics(y, y_pred, test_data)
        # Tiered analysis
        tier_analysis = self._analyze_by_tiers(y, y_pred, test_data)
        return {
            'technical': tech_metrics,
            'business': business_metrics,
            'tier_analysis': tier_analysis
        }
    def _calculate_technical_metrics(
        self,
        y_true: pd.Series,
        y_pred: np.ndarray
    ) -> Dict:
        """Compute technical metrics"""
        from sklearn.metrics import (
            mean_absolute_error,
            mean_squared_error,
            mean_absolute_percentage_error,
            r2_score
        )
        # Basic metrics
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        mape = mean_absolute_percentage_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        # Quantile metrics: gap between the q-th quantile of the actual and predicted distributions
        quantiles = [0.5, 0.9, 0.95]
        quantile_errors = {}
        for q in quantiles:
            q_true = np.percentile(y_true, q * 100)
            q_pred = np.percentile(y_pred, q * 100)
            quantile_errors[f'q{int(q*100)}_error'] = abs(q_pred - q_true)
        # Log-space metrics (more stable for skewed sales data)
        y_true_log = np.log1p(y_true)
        y_pred_log = np.log1p(y_pred)
        log_mae = mean_absolute_error(y_true_log, y_pred_log)
        # RMSLE: RMSE in log1p space, which penalizes relative rather than absolute errors
        rmsle = np.sqrt(mean_squared_error(y_true_log, y_pred_log))
        return {
            'mae': mae,
            'rmse': rmse,
            'mape': mape,
            'r2': r2,
            'log_mae': log_mae,
            'rmsle': rmsle,
            **quantile_errors
        }
    def _calculate_business_metrics(
        self,
        y_true: pd.Series,
        y_pred: np.ndarray,
        data: pd.DataFrame
    ) -> Dict:
        """
        Compute business metrics
        The business value of sales forecasting shows up mainly as:
        1. Lower inventory costs
        2. Fewer stockouts
        3. Higher GMV
        """
        # 1. Inventory cost analysis
        # Assumptions: over-forecast -> excess stock (holding cost)
        #              under-forecast -> stockout (lost sales)
        overstock_penalty = self.config.get('overstock_penalty', 0.1)  # cost per unit of excess stock
        stockout_penalty = self.config.get('stockout_penalty', 0.5)    # cost per unit of shortage
        excess = np.maximum(y_pred - y_true, 0)    # forecast > actual -> overstock
        shortage = np.maximum(y_true - y_pred, 0)  # forecast < actual -> stockout
        inventory_cost = np.sum(excess) * overstock_penalty
        stockout_cost = np.sum(shortage) * stockout_penalty
        total_cost = inventory_cost + stockout_cost
        # 2. Forecast accuracy (business definition)
        # A forecast counts as accurate if its error is within ±15%; +1 avoids division by zero
        accuracy_threshold = 0.15
        accurate_predictions = np.abs(y_pred - y_true) / (y_true + 1) <= accuracy_threshold
        accuracy = accurate_predictions.mean()
        # 3. GMV forecast accuracy
        # Forecast GMV = forecast quantity × average unit price (100 if unit_price is absent);
        # with a single average price, the price cancels out of the relative error
        avg_price = data['unit_price'].mean() if 'unit_price' in data.columns else 100
        gmv_true = np.sum(y_true) * avg_price
        gmv_pred = np.sum(y_pred) * avg_price
        gmv_error = abs(gmv_pred - gmv_true) / gmv_true if gmv_true > 0 else float('nan')
        # 4. Accuracy at a looser ±20% tolerance
        sku_accuracy = (np.abs(y_pred - y_true) / (y_true + 1) <= 0.2).mean()
        return {
            'inventory_cost': inventory_cost,
            'stockout_cost': stockout_cost,
            'total_inventory_cost': total_cost,
            'prediction_accuracy_15pct': accuracy,
            'prediction_accuracy_20pct': sku_accuracy,
            'gmv_error': gmv_error,
            'gmv_true': gmv_true,
            'gmv_pred': gmv_pred,
        }
    def _analyze_by_tiers(
        self,
        y_true: pd.Series,
        y_pred: np.ndarray,
        data: pd.DataFrame
    ) -> Dict:
        """
        Tiered analysis
        How the model performs at different sales-volume levels
        """
        tiers = [
            ('low', 0, 10),
            ('medium', 10, 100),
            ('high', 100, 1000),
            ('very_high', 1000, float('inf'))
        ]
        # Work on plain arrays so boolean masks never depend on index alignment
        y_true = np.asarray(y_true, dtype=float)
        y_pred = np.asarray(y_pred, dtype=float)
        results = {}
        for tier_name, low, high in tiers:
            mask = (y_true >= low) & (y_true < high)
            if mask.sum() == 0:
                continue
            y_t = y_true[mask]
            y_p = y_pred[mask]
            mae = np.mean(np.abs(y_p - y_t))
            # Smoothed MAPE: +1 avoids division by zero on zero-sales rows
            mape = np.mean(np.abs(y_p - y_t) / (y_t + 1))
            results[tier_name] = {
                'count': int(mask.sum()),
                'percentage': float(mask.sum() / len(y_true)),
                'mae': float(mae),
                'mape': float(mape)
            }
        return results
    def compare_models(
        self,
        evaluations: List[Dict]
    ) -> pd.DataFrame:
        """
        Compare models side by side, sorted by MAE
        Each result from evaluate() should carry a 'model_name' key added by the caller
        """
        rows = []
        for eval_result in evaluations:
            model_name = eval_result.get('model_name', 'unknown')
            row = {
                'model': model_name,
                'mae': eval_result['technical']['mae'],
                'mape': eval_result['technical']['mape'],
                'r2': eval_result['technical']['r2'],
                'accuracy_15pct': eval_result['business']['prediction_accuracy_15pct'],
                'inventory_cost': eval_result['business']['total_inventory_cost'],
            }
            rows.append(row)
        return pd.DataFrame(rows).sort_values('mae')
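The inventory-cost metric in `_calculate_business_metrics` charges 0.1 per unit of excess and 0.5 per unit of shortage by default. When costs are linear like this, the standard newsvendor result says that expected cost is minimized by stocking at the c_u / (c_u + c_o) quantile of demand, not at the mean. With the defaults here that is 0.5 / 0.6 ≈ 0.83. The sketch below reproduces the cost calculation on a toy example and computes that critical ratio. The function names are hypothetical. The sketch suggests that, under these default penalties, replenishment should be driven by an upper quantile of the demand forecast rather than by the point forecast.

```python
import numpy as np


def asymmetric_cost(y_true, y_pred, overstock_penalty=0.1, stockout_penalty=0.5) -> float:
    """Linear over/under-forecast cost, same definition as the evaluator."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    excess = np.maximum(y_pred - y_true, 0)    # over-forecast units
    shortage = np.maximum(y_true - y_pred, 0)  # under-forecast units
    return float(excess.sum() * overstock_penalty + shortage.sum() * stockout_penalty)


def critical_ratio(overstock_penalty=0.1, stockout_penalty=0.5) -> float:
    """Newsvendor critical ratio c_u / (c_u + c_o)."""
    return stockout_penalty / (stockout_penalty + overstock_penalty)


def best_order_quantity(demand_samples, overstock_penalty=0.1, stockout_penalty=0.5) -> float:
    """Empirical newsvendor solution: the critical-ratio quantile of sampled demand."""
    return float(np.quantile(demand_samples, critical_ratio(overstock_penalty, stockout_penalty)))


# Two SKUs with actual demand 10: one over-forecast by 2, one under-forecast by 3
print(asymmetric_cost([10, 10], [12, 7]))  # 2 * 0.1 + 3 * 0.5 = 1.7

# Uniform demand 0..99: ordering at the critical-ratio quantile beats ordering at the mean
demand = np.arange(100)
q = best_order_quantity(demand)
print(q, asymmetric_cost(demand, np.full(100, q)), asymmetric_cost(demand, np.full(100, demand.mean())))
```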
6. Model Serving
6.1 Online Inference Service
# serving/online_service.py - online inference service
"""
Online inference service
Supports real-time and batch prediction
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Union
import logging
from datetime import datetime
import asyncio
logger = logging.getLogger(__name__)
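class SalesForecastService:
    """
    Online sales forecasting service
    Features:
    1. Real-time prediction for a single SKU
    2. Batch prediction
    3. Feature caching
    4. Fallback (degradation) strategy
    """
    def __init__(
        self,
        model,
        feature_store: 'FeatureStore',
        config: Dict
    ):
        self.model = model
        self.feature_store = feature_store
        self.config = config
        self.cache = {}  # simple in-memory feature cache
        # Fallback model used when the main prediction path fails
        self.fallback_model = self._create_fallback_model()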
    def predict(
        self,
        product_id: int,
        warehouse_id: int,
        forecast_dates: List[str]
    ) -> List[Dict]:
        """
        Multi-day forecast for a single product
        Args:
            product_id: product ID
            warehouse_id: warehouse ID
            forecast_dates: list of dates to forecast
        Returns:
            list of forecast results, one per date
        """
        logger.info(f"Predicting for product={product_id}, warehouse={warehouse_id}")
        results = []
        for date in forecast_dates:
            try:
                # 1. Fetch features
                features = self._get_features(product_id, warehouse_id, date)
                # 2. Model prediction
                prediction = self._predict_single(features)
                # 3. Build the result
                result = {
                    'product_id': product_id,
                    'warehouse_id': warehouse_id,
                    'forecast_date': date,
                    'forecast_quantity': float(prediction['quantity']),
                    'confidence_lower': float(prediction['lower']),
                    'confidence_upper': float(prediction['upper']),
                    'model_version': self.config.get('model_version', 'v1'),
                    'predicted_at': datetime.now().isoformat()
                }
                results.append(result)
            except Exception as e:
                logger.error(f"Prediction failed: {e}")
                # Degrade to the fallback model
                results.append(self._fallback_predict(
                    product_id, warehouse_id, date
                ))
        return results
    def predict_batch(
        self,
        requests: List[Dict]
    ) -> List[Dict]:
        """
        Batch prediction
        Args:
            requests: list of requests, each containing:
                - product_id
                - warehouse_id
                - forecast_date
        Returns:
            list of forecast results, in request order
        """
        logger.info(f"Batch predicting {len(requests)} items")
        # Group requests by date so features are fetched with one call per date
        by_date: Dict[str, List[Dict]] = {}
        for req in requests:
            by_date.setdefault(req['forecast_date'], []).append(req)
        # Fetch features in batch (assumes get_features_batch returns one dict per entity value, in order)
        all_features = {}
        for date, items in by_date.items():
            product_ids = [item['product_id'] for item in items]
            features = self.feature_store.get_features_batch(
                entity_name='product_id',
                entity_values=product_ids,
                as_of_date=date
            )
            for item, feat in zip(items, features):
                if feat is None:
                    continue
                # Key on warehouse too: the same product may be requested for several warehouses
                feat = dict(feat)
                feat['warehouse_id'] = item['warehouse_id']
                all_features[(item['product_id'], item['warehouse_id'], date)] = feat
        # Predict row by row; unmatched requests fall back to the statistical model
        results = []
        for req in requests:
            key = (req['product_id'], req['warehouse_id'], req['forecast_date'])
            features = all_features.get(key)
            if features:
                pred = self._predict_single(features)
                results.append({
                    'product_id': req['product_id'],
                    'warehouse_id': req['warehouse_id'],
                    'forecast_date': req['forecast_date'],
                    'forecast_quantity': float(pred['quantity']),
                    'confidence_lower': float(pred['lower']),
                    'confidence_upper': float(pred['upper']),
                    'model_version': self.config.get('model_version', 'v1'),
                    'predicted_at': datetime.now().isoformat()
                })
            else:
                results.append(self._fallback_predict(
                    req['product_id'],
                    req['warehouse_id'],
                    req['forecast_date']
                ))
        return results
    async def predict_async(
        self,
        product_id: int,
        warehouse_id: int,
        forecast_dates: List[str]
    ) -> List[Dict]:
        """Async wrapper: runs the blocking predict() in the default thread pool"""
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            self.predict,
            product_id, warehouse_id, forecast_dates
        )
        return result
    def _get_features(
        self,
        product_id: int,
        warehouse_id: int,
        date: str
    ) -> Dict:
        """Fetch the features for one (product, warehouse, date)"""
        # Try the cache first
        cache_key = f"{product_id}_{warehouse_id}_{date}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        # Otherwise query the Feature Store; copy so the store's dict is not mutated
        features = dict(self.feature_store.get_serving_features(
            user_id=0,  # sales forecasting does not need a user ID
            product_id=product_id,
            date_key=date
        ))
        # Add the warehouse feature
        features['warehouse_id'] = warehouse_id
        # Cache, up to 10,000 entries (nothing is evicted once full)
        if len(self.cache) < 10000:
            self.cache[cache_key] = features
        return features
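    def _predict_single(self, features: Dict) -> Dict:
        """Predict for a single feature dict"""
        # Order the features as in training; missing features default to 0
        feature_names = self.feature_store.feature_names
        X = np.array([[features.get(f, 0) for f in feature_names]])
        # Predict (assumes self.model accepts a 2-D NumPy array, e.g. a LightGBM Booster)
        prediction = float(self.model.predict(X)[0])
        prediction = max(prediction, 0.0)  # sales cannot be negative
        # Confidence interval
        # Simplification: assume the error std is 15% of the prediction and use z = 1.96 (~95%)
        uncertainty = prediction * 0.15
        lower = max(0.0, prediction - uncertainty * 1.96)
        upper = prediction + uncertainty * 1.96
        return {
            'quantity': prediction,
            'lower': lower,
            'upper': upper
        }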
    def _fallback_predict(
        self,
        product_id: int,
        warehouse_id: int,
        date: str
    ) -> Dict:
        """Fallback prediction (simple statistical model)"""
        # Use the historical average as the forecast; unknown products default to 10
        avg_quantity = float(self.fallback_model.get(product_id, 10))
        return {
            'product_id': product_id,
            'warehouse_id': warehouse_id,
            'forecast_date': date,
            'forecast_quantity': avg_quantity,
            'confidence_lower': 0.0,
            'confidence_upper': avg_quantity * 2,
            'model_version': 'fallback',
            'predicted_at': datetime.now().isoformat()
        }
    def _create_fallback_model(self) -> Dict[int, float]:
        """Build the fallback model: product_id -> historical average sales"""
        # In practice this should be loaded from the database
        return {}
    def warm_up(
        self,
        product_ids: List[int],
        warehouse_id: int,
        date: Optional[str] = None
    ):
        """
        Warm up the cache
        Preload features for popular products before traffic arrives
        """
        date = date or datetime.now().strftime('%Y-%m-%d')
        logger.info(f"Warming up cache for {len(product_ids)} products")
        for product_id in product_ids:
            try:
                # Reuse _get_features so the cache key format matches the lookup path
                self._get_features(product_id, warehouse_id, date)
            except Exception as e:
                logger.warning(f"Failed to warm up product {product_id}: {e}")
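`self.cache` is a plain dict that stops accepting entries once it holds 10,000 features. Keys for past dates therefore stay forever, and new hot keys are never cached. A bounded LRU cache with a time-to-live addresses both problems. The class below is a hypothetical standard-library sketch; its name and the default TTL are assumptions. It supports `in`, `cache[key]` and `cache[key] = value`, so an instance can stand in for the dict.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional


class TTLLRUCache:
    """Bounded LRU cache whose entries expire after `ttl_seconds`.
    None values are treated as cache misses."""

    def __init__(self, max_size: int = 10_000, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        self._data: "OrderedDict[Hashable, tuple]" = OrderedDict()  # key -> (expires_at, value)

    def get(self, key: Hashable) -> Optional[Any]:
        item = self._data.get(key)
        if item is None:
            return None
        expires_at, value = item
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop it
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def __contains__(self, key: Hashable) -> bool:
        return self.get(key) is not None

    def __getitem__(self, key: Hashable) -> Any:
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return value

    def __setitem__(self, key: Hashable, value: Any) -> None:
        self._data[key] = (self._clock() + self.ttl_seconds, value)
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)
```

If `self.cache = TTLLRUCache()` is used in `__init__`, the `len(self.cache) < 10000` guard in `_get_features` should be dropped. Otherwise inserts would stop once the cache is full, and eviction already keeps the size bounded.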
6.2 Triton Deployment Configuration
# config.pbtxt - Triton model configuration
name: "sales_forecast"
platform: "onnxruntime_onnx"
max_batch_size: 1024
input [
  {
    name: "features"
    data_type: TYPE_FP32
    dims: [ 128 ]  # feature dimension
  }
]
# The exported ONNX graph must expose all three outputs below
output [
  {
    name: "quantity"
    data_type: TYPE_FP32
    dims: [ 1 ]
  },
  {
    name: "confidence_lower"
    data_type: TYPE_FP32
    dims: [ 1 ]
  },
  {
    name: "confidence_upper"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]
instance_group [
  {
    count: 4
    kind: KIND_GPU
  }
]
dynamic_batching {
  preferred_batch_size: [ 256, 512, 1024 ]
  max_queue_delay_microseconds: 1000
}
parameters {
  key: "intra_op_thread_count"
  value: { string_value: "4" }
}
parameters {
  key: "inter_op_thread_count"
  value: { string_value: "4" }
}
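Triton's HTTP endpoint speaks the KServe v2 inference protocol, so a client can call the model configured above with plain JSON. The request is a POST to `/v2/models/sales_forecast/infer`. It carries one input named `features` with shape `[batch, 128]` and datatype `FP32`, and it asks for the three declared outputs. The sketch below uses only the standard library. The server URL and the function names are assumptions, and only the payload builder can be exercised without a running server.

```python
import json
import urllib.request
from typing import Dict, List, Sequence

OUTPUT_NAMES = ["quantity", "confidence_lower", "confidence_upper"]


def build_infer_payload(rows: Sequence[Sequence[float]], n_features: int = 128) -> Dict:
    """Build a KServe v2 JSON request body for the `features` input."""
    if any(len(row) != n_features for row in rows):
        raise ValueError(f"each row must have {n_features} features")
    flat = [float(x) for row in rows for x in row]  # row-major flattening
    return {
        "inputs": [{
            "name": "features",
            "shape": [len(rows), n_features],
            "datatype": "FP32",
            "data": flat,
        }],
        "outputs": [{"name": name} for name in OUTPUT_NAMES],
    }


def infer(rows, url: str = "http://localhost:8000",
          model: str = "sales_forecast") -> Dict[str, List[float]]:
    """POST the request and return {output_name: flat list of values}."""
    body = json.dumps(build_infer_payload(rows)).encode("utf-8")
    request = urllib.request.Request(
        f"{url}/v2/models/{model}/infer",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        result = json.load(response)
    return {out["name"]: out["data"] for out in result["outputs"]}
```

Sending whole chunks of rows per request, rather than one row at a time, is what lets `dynamic_batching` form the large batches listed in `preferred_batch_size`.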
# docker-compose.yml - Triton deployment
version: '3.8'
services:
  triton:
    image: nvcr.io/nvidia/tritonserver:23.10-py3
    container_name: sales_forecast_triton
    ports:
      - "8000:8000"  # HTTP
      - "8001:8001"  # gRPC
      - "8002:8002"  # Metrics
    volumes:
      - ./models:/models
      - ./config.pbtxt:/models/sales_forecast/config.pbtxt
    command: ["tritonserver", "--model-repository=/models"]
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    environment:
      CUDA_VISIBLE_DEVICES: "0"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/v2/health/ready"]
      interval: 30s
      timeout: 10s
      retries: 3
7. Monitoring
7.1 Forecast Monitoring
# monitoring/monitor.py - forecast monitoring
"""
Sales forecast monitoring
What is monitored:
1. Model performance
2. Data quality
3. Business metrics
4. System resources
"""
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class MonitoringConfig:
    """Monitoring thresholds"""
    alert_mae_increase_threshold: float = 0.2    # alert when MAE rises by more than 20%
    alert_mape_threshold: float = 0.30           # alert when MAPE exceeds 30%
    alert_null_ratio_threshold: float = 0.05     # alert when a column's null ratio exceeds 5%
    alert_coverage_drop_threshold: float = 0.1   # alert when interval coverage drops by more than 10%
class SalesForecastMonitor:
    """
    Sales forecast monitor
    """
    def __init__(self, config: Optional[MonitoringConfig] = None):
        self.config = config or MonitoringConfig()
        self.metrics_history = []
    def check_prediction_quality(
        self,
        actual_data: pd.DataFrame,
        prediction_data: pd.DataFrame
    ) -> Dict:
        """
        Check forecast quality against actual sales
        Args:
            actual_data: actual sales data
            prediction_data: forecast results
        Returns:
            quality-check report
        """
        # Join at the forecast grain (product × warehouse × date);
        # both frames are expected to have a 'quantity' column
        merged = actual_data.merge(
            prediction_data,
            on=['product_id', 'warehouse_id', 'forecast_date'],
            suffixes=('_actual', '_pred')
        )
        if len(merged) == 0:
            return {'status': 'no_data', 'message': 'No matching data found'}
        # Compute metrics
        actual = merged['quantity_actual']
        predicted = merged['quantity_pred']
        mae = np.mean(np.abs(predicted - actual))
        # Smoothed MAPE: +1 avoids division by zero on zero-sales rows
        mape = np.mean(np.abs(predicted - actual) / (actual + 1))
        # Compare with the historical baseline
        historical_mae = self._get_historical_mae()
        mae_change = (mae - historical_mae) / (historical_mae + 1e-6)
        # Generate alerts
        alerts = []
        if mape > self.config.alert_mape_threshold:
            alerts.append({
                'level': 'critical',
                'type': 'high_mape',
                'message': f"MAPE {mape:.2%} exceeds threshold {self.config.alert_mape_threshold:.2%}"
            })
        if mae_change > self.config.alert_mae_increase_threshold:
            alerts.append({
                'level': 'warning',
                'type': 'mae_increase',
                'message': f"MAE increased by {mae_change:.2%}"
            })
        return {
            'status': 'ok' if len(alerts) == 0 else 'alert',
            'metrics': {
                'mae': mae,
                'mape': mape,
                'mae_change': mae_change,
                'sample_count': len(merged)
            },
            'alerts': alerts
        }
    def check_data_quality(self, data: pd.DataFrame) -> Dict:
        """
        Check data quality: nulls, outliers and row distribution across dates
        """
        issues = []
        # Null check
        null_ratio = data.isnull().mean()
        high_null = null_ratio[null_ratio > self.config.alert_null_ratio_threshold]
        if len(high_null) > 0:
            issues.append({
                'type': 'high_null_ratio',
                'columns': high_null.to_dict(),
                'level': 'warning'
            })
        # Outlier check with the IQR rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            q1 = data[col].quantile(0.25)
            q3 = data[col].quantile(0.75)
            iqr = q3 - q1
            outliers = ((data[col] < q1 - 1.5 * iqr) | (data[col] > q3 + 1.5 * iqr)).mean()
            if outliers > 0.05:
                issues.append({
                    'type': 'high_outlier_ratio',
                    'column': col,
                    'outlier_ratio': float(outliers),
                    'level': 'info'
                })
        # Distribution check: row counts per forecast_date should be roughly even
        # (a coefficient of variation above 0.2 is flagged)
        if 'forecast_date' in data.columns:
            date_dists = data.groupby('forecast_date').size()
            dist_std = date_dists.std() / (date_dists.mean() + 1e-6)
            if dist_std > 0.2:
                issues.append({
                    'type': 'uneven_distribution',
                    'dist_std': dist_std,
                    'level': 'warning'
                })
        return {
            'status': 'ok' if len(issues) == 0 else 'issues_found',
            'issues': issues,
            'summary': {
                'total_rows': len(data),
                'total_columns': len(data.columns),
                'null_ratio': float(data.isnull().mean().mean())
            }
        }
    def check_business_metrics(
        self,
        forecast_results: pd.DataFrame
    ) -> Dict:
        """
        Check business-facing properties of the forecasts
        """
        pred = forecast_results['quantity_pred']
        n = max(len(forecast_results), 1)
        # Over-/under-forecast shares need actuals; without a 'quantity_actual' column they are None,
        # because comparing predictions with their own 10th/90th percentiles always yields about 10%
        if 'quantity_actual' in forecast_results.columns:
            actual = forecast_results['quantity_actual']
            overstock_ratio = float((pred > actual).sum() / n)
            stockout_ratio = float((pred < actual).sum() / n)
        else:
            overstock_ratio = stockout_ratio = None
        # Distribution of the forecasts
        pred_mean = pred.mean()
        pred_std = pred.std()
        pred_cv = pred_std / (pred_mean + 1e-6)
        issues = []
        # Forecasts too concentrated: a low coefficient of variation may mean the model has degenerated
        if pred_cv < 0.3:
            issues.append({
                'type': 'low_variance',
                'cv': pred_cv,
                'message': 'Forecast variance is too low; the model may have degenerated'
            })
        # Too many zero forecasts
        zero_ratio = float((pred == 0).mean())
        if zero_ratio > 0.3:
            issues.append({
                'type': 'high_zero_ratio',
                'zero_ratio': zero_ratio,
                'message': 'Share of zero forecasts is too high'
            })
        return {
            'metrics': {
                'overstock_ratio': overstock_ratio,
                'stockout_ratio': stockout_ratio,
                'zero_ratio': zero_ratio,
                'mean_prediction': pred_mean,
                'std_prediction': pred_std,
            },
            'issues': issues
        }
    def _get_historical_mae(self) -> float:
        """Return the MAE baseline (the most recently recorded value)"""
        if len(self.metrics_history) > 0:
            return self.metrics_history[-1].get('mae', 10.0)
        return 10.0  # default when there is no history yet
    def record_metrics(self, metrics: Dict):
        """Append metrics to the history"""
        metrics['recorded_at'] = datetime.now().isoformat()
        self.metrics_history.append(metrics)
        # Keep only the last 90 days
        cutoff = datetime.now() - timedelta(days=90)
        self.metrics_history = [
            m for m in self.metrics_history
            if datetime.fromisoformat(m['recorded_at']) > cutoff
        ]
# ============================================================
# Grafana dashboard configuration
# ============================================================
DASHBOARD_JSON = {
    "title": "Sales Forecast Monitor",
    "panels": [
        {
            "title": "Prediction MAPE",
            "type": "stat",
            "targets": [
                {
                    "expr": "sales_forecast_mape",
                    "legendFormat": "MAPE"
                }
            ]
        },
        {
            "title": "Prediction Accuracy (15%)",
            "type": "gauge",
            "targets": [
                {
                    "expr": "sales_forecast_accuracy_15pct",
                    "legendFormat": "Accuracy"
                }
            ]
        },
        {
            "title": "Forecast by Category",
            "type": "timeseries",
            "targets": [
                {
                    "expr": 'sales_forecast_quantity{category="$category"}',
                    "legendFormat": "{{category}}"
                }
            ]
        }
    ]
}
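The Grafana panels query Prometheus series such as `sales_forecast_mape`, but `SalesForecastMonitor` only returns Python dicts. One way to bridge the two is to render the monitor's numeric metrics in the Prometheus text exposition format and serve that text on an HTTP endpoint that Prometheus scrapes. That serving setup is an assumption and is not part of the original design. The function names below are hypothetical, and in practice the official `prometheus_client` library would usually do this job. With the default prefix, the `'mape'` key returned by `check_prediction_quality` becomes `sales_forecast_mape`, the series the first panel queries.

```python
import numbers
from typing import Mapping, Optional


def _format_labels(labels: Optional[Mapping[str, str]]) -> str:
    """Render {k="v",...}, escaping backslashes, double quotes and newlines in values."""
    if not labels:
        return ""
    parts = []
    for key, value in sorted(labels.items()):
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
        parts.append(f'{key}="{escaped}"')
    return "{" + ",".join(parts) + "}"


def render_gauges(metrics: Mapping[str, object], prefix: str = "sales_forecast_",
                  labels: Optional[Mapping[str, str]] = None) -> str:
    """Render the numeric entries of a metrics dict as Prometheus gauges."""
    lines = []
    for name, value in sorted(metrics.items()):
        # Skip non-numeric entries such as 'recorded_at' timestamps, and booleans
        if isinstance(value, bool) or not isinstance(value, numbers.Real):
            continue
        full_name = prefix + name
        lines.append(f"# HELP {full_name} Sales forecast metric: {name}")
        lines.append(f"# TYPE {full_name} gauge")
        lines.append(f"{full_name}{_format_labels(labels)} {float(value)}")
    return "\n".join(lines) + "\n"


report = {"mae": 4.2, "mape": 0.18, "sample_count": 1200, "recorded_at": "2024-01-01T00:00:00"}
print(render_gauges(report), end="")
```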
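8. Full Code and Deployment
8.1 Project Structure
sales_forecast_system/
├── data/
│   ├── raw/                    # raw data
│   ├── processed/              # processed data
│   └── features/               # feature data
├── features/
│   ├── __init__.py
│   ├── sales_features.py       # feature definitions
│   └── feature_pipeline.py     # feature pipeline
├── model_training/
│   ├── __init__.py
│   ├── train_pipeline.py       # training pipeline
│   ├── evaluation.py           # model evaluation
│   └── configs/                # training configs
├── models/                     # saved models (mounted as the Triton model repository)
├── serving/
│   ├── __init__.py
│   ├── online_service.py       # online service
│   ├── api.py                  # ASGI app served by `main.py serve` (serving.api:app)
│   └── triton/                 # Triton config
├── monitoring/
│   ├── __init__.py
│   ├── monitor.py              # monitoring
│   ├── alerts.py               # alerting
│   ├── prometheus.yml          # Prometheus config
│   └── dashboards/             # Grafana dashboards
├── dags/
│   └── sales_forecast_dag.py
├── docker/
│   ├── Dockerfile.train
│   ├── Dockerfile.features
│   └── Dockerfile.api
├── config/
│   ├── default.yaml
│   └── prod.yaml
├── tests/
│   └── test_features.py
├── docker-compose.yml          # at the project root: its relative paths resolve from here
├── main.py                     # entry point
├── requirements.txt
└── README.md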
8.2 Main Entry Point
# main.py - system entry point
"""
Sales forecast system entry point
Supported actions:
1. Feature computation
2. Model training
3. Model evaluation
4. Batch prediction
5. Online serving
"""
import argparse
import logging
from datetime import datetime
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
    parser = argparse.ArgumentParser(description='Sales Forecast System')
    parser.add_argument('action', choices=[
        'feature_pipeline',
        'train',
        'evaluate',
        'predict',
        'serve'
    ])
    parser.add_argument('--date', type=str, help='Target date (YYYY-MM-DD); defaults to today')
    parser.add_argument('--config', type=str, default='config/default.yaml')
    parser.add_argument('--model-version', type=str, help='Model version')
    args = parser.parse_args()
    logger.info(f"Starting action: {args.action}")
    if args.action == 'feature_pipeline':
        from features.feature_pipeline import FeaturePipeline
        config = {'lookback_days': 90, 'offline_path': 'data/features'}
        pipeline = FeaturePipeline(config)
        target_date = args.date or datetime.now().strftime('%Y-%m-%d')
        pipeline.run_daily_pipeline(target_date)
    elif args.action == 'train':
        import pandas as pd
        from model_training.train_pipeline import SalesForecastTrainer
        config = {
            'lightgbm_params': {
                'num_leaves': 63,
                'learning_rate': 0.05,
            }
        }
        trainer = SalesForecastTrainer(config)
        # Load the training and validation features
        train_data = pd.read_parquet('data/features/train/')
        val_data = pd.read_parquet('data/features/val/')
        results = trainer.train(train_data, val_data)
        trainer.save_model('models/sales_forecast/')
        logger.info(f"Training completed: {results}")
    elif args.action == 'evaluate':
        from model_training.evaluation import ModelEvaluator
        evaluator = ModelEvaluator({})
        # Evaluation logic goes here
        logger.info("Evaluation completed")
    elif args.action == 'predict':
        from serving.online_service import SalesForecastService
        # Batch prediction logic goes here
        logger.info("Batch prediction completed")
    elif args.action == 'serve':
        import uvicorn
        # Start the online service; serving/api.py must expose an ASGI `app`
        logger.info("Starting online service...")
        uvicorn.run(
            "serving.api:app",
            host="0.0.0.0",
            port=8000,
            reload=False
        )
    logger.info(f"Action {args.action} completed")
if __name__ == '__main__':
    main()
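The `predict` action is only a stub. Its core is usually a loop that slices the full request list into chunks no larger than the Triton `max_batch_size` of 1024 and sends each chunk as one request. The sketch below shows that chunking. It also shows the sustained throughput implied by the requirement to forecast all 100,000 SKUs in under 10 minutes. The helper names are hypothetical, and a real run would multiply the row count by the number of forecast days.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int = 1024) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def required_rows_per_second(n_rows: int, budget_seconds: float) -> float:
    """Minimum sustained throughput needed to finish n_rows within the budget."""
    return n_rows / budget_seconds


batches = list(iter_batches(range(100_000), batch_size=1024))
print(len(batches), len(batches[-1]))                     # 98 batches, the last holds 672 rows
print(round(required_rows_per_second(100_000, 600), 1))   # about 166.7 rows per second
```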
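8.3 Deployment Manifest
# docker-compose.yml - full deployment
version: '3.8'
# Shared Airflow settings; the metadata DB must match POSTGRES_DB of the postgres service
x-airflow-common: &airflow-common
  image: apache/airflow:2.7.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/sales_forecast
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/sales_forecast
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
  volumes:
    - ./dags:/opt/airflow/dags
    - ./data:/opt/airflow/data
  depends_on:
    - postgres
    - redis
  restart: unless-stopped
services:
  # Airflow scheduling: with CeleryExecutor, the webserver alone runs no tasks,
  # so a scheduler and at least one Celery worker are also needed
  airflow-init:
    <<: *airflow-common
    container_name: sales-forecast-airflow-init
    command: db migrate
    restart: "no"
  airflow-webserver:
    <<: *airflow-common
    container_name: sales-forecast-airflow
    ports:
      - "8080:8080"
    command: webserver
  airflow-scheduler:
    <<: *airflow-common
    container_name: sales-forecast-airflow-scheduler
    command: scheduler
  airflow-worker:
    <<: *airflow-common
    container_name: sales-forecast-airflow-worker
    command: celery worker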
  # Feature Store
  feature-store:
    build:
      context: .
      dockerfile: docker/Dockerfile.features
    container_name: sales-forecast-features
    volumes:
      - ./data:/app/data
      - ./features:/app/features
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis
    restart: unless-stopped
  # Model serving
  triton:
    image: nvcr.io/nvidia/tritonserver:23.10-py3
    container_name: sales-forecast-triton
    volumes:
      - ./models:/models
    command: ["tritonserver", "--model-repository=/models", "--http-port=8000"]
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped
  # Online API service
  api-server:
    build:
      context: .
      dockerfile: docker/Dockerfile.api
    container_name: sales-forecast-api
    ports:
      - "8001:8001"
    environment:
      - TRITON_URL=http://triton:8000
      - REDIS_HOST=redis
    depends_on:
      - triton
      - redis
    restart: unless-stopped
  # Monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: sales-forecast-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
  grafana:
    image: grafana/grafana:latest
    container_name: sales-forecast-grafana
    ports:
      - "3000:3000"
    volumes:
      - ./monitoring/dashboards:/var/lib/grafana/dashboards
    depends_on:
      - prometheus
  # Infrastructure
  postgres:
    image: postgres:15
    container_name: sales-forecast-db
    environment:
      POSTGRES_DB: sales_forecast
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
  redis:
    image: redis:7-alpine
    container_name: sales-forecast-redis
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
volumes:
  postgres-data:
  redis-data:
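Summary
This article built a complete, production-grade sales forecasting system:
- Architecture: an end-to-end pipeline from data sources to business applications
- Data layer: real-time CDC synchronization plus offline batch processing
- Feature engineering: time-series, statistical, trend and seasonality features
- Model training: multi-model training, hyperparameter optimization and model ensembling
- Model serving: high-performance inference with Triton, plus a fallback strategy
- Monitoring and operations: monitoring of both technical and business metrics
The system is designed to be deployed to production directly, and its architecture draws on practices at several leading e-commerce companies.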