[菜鸟教程] 机器学习教程第四课-机器学习项目生命周期二
·
接上文,这是一个可运行的示例
"""
================================================================================
完整可运行的机器学习项目流程
================================================================================
修复了以下问题:
1. 数据泄露(标准化在划分之后进行)
2. 目标列名称不匹配
3. 预测服务缺少预处理
4. LabelEncoder 改为 OneHotEncoder
5. 修复 get_best_model 对回归指标的逻辑错误
6. 使用独立的随机种子管理
7. 移除 inplace 操作的 FutureWarning
8. 添加完整的异常处理
作者: AI Assistant
版本: 2.0
================================================================================
"""
# =========================
# 导入必要的库
# =========================
import json # 用于处理 JSON 格式数据
import pandas as pd # 数据处理和分析
import numpy as np # 数值计算
import pickle # 模型序列化(保存/加载)
import matplotlib
import matplotlib.pyplot as plt # 数据可视化
from datetime import datetime, timedelta # 时间处理
from sklearn.model_selection import train_test_split, cross_val_score # 数据划分和交叉验证
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder # 数据预处理
from sklearn.compose import ColumnTransformer # 列转换器
from sklearn.pipeline import Pipeline # 流水线
from sklearn.linear_model import LogisticRegression # 逻辑回归模型
from sklearn.ensemble import RandomForestClassifier # 随机森林模型
from sklearn.metrics import ( # 评估指标
accuracy_score, precision_score, recall_score, f1_score,
confusion_matrix, roc_curve, auc, classification_report
)
# 设置 matplotlib 中文字体(解决中文显示问题)
matplotlib.use('TkAgg')
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans'] # 使用黑体
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# =================================================================================
# 模块 1:数据收集器 (DataCollector)
# 功能:模拟从多个数据源(用户、行为、商品)收集原始数据
# =================================================================================
class DataCollector:
"""
数据收集器类
负责生成模拟数据,用于演示机器学习项目的数据收集阶段
"""
def __init__(self, seed=42):
"""
初始化数据收集器
参数:
seed: 随机种子,确保数据可复现
"""
# 创建独立的随机数生成器(比全局 np.random 更可控)
self.rng = np.random.RandomState(seed)
# 存储收集到的所有数据
self.collected_data = {}
def collect_user_data(self, n_users=1000):
"""
收集用户画像数据
参数:
n_users: 需要生成的用户数量
返回:
DataFrame: 包含用户信息的表格
"""
# 生成用户数据字典
user_data = {
'user_id': range(1, n_users + 1), # 用户ID,从1开始
'age': self.rng.randint(18, 65, n_users), # 年龄:18-65岁随机
'gender': self.rng.choice(['男', '女'], n_users), # 性别:随机男/女
'city': self.rng.choice(['北京', '上海', '广州', '深圳'], n_users), # 所在城市
'registration_date': [ # 注册日期:过去一年内随机
datetime.now() - timedelta(days=self.rng.randint(1, 365))
for _ in range(n_users)
]
}
# 转换为 DataFrame 并存储
self.collected_data['users'] = pd.DataFrame(user_data)
print(f"✓ 收集了 {n_users} 条用户数据")
return self.collected_data['users']
def collect_behavior_data(self, n_behaviors=5000, n_users=1000):
"""
收集用户行为数据(浏览、点击、加购物车、购买等)
参数:
n_behaviors: 行为记录数量
n_users: 用户总数(用于生成合理的 user_id 范围)
返回:
DataFrame: 包含行为记录的表格
"""
# 从用户ID范围内随机选择产生行为的用户
user_ids = self.rng.choice(range(1, n_users + 1), n_behaviors)
# 从商品ID范围内随机选择商品(假设有500个商品)
product_ids = self.rng.choice(range(1, 501), n_behaviors)
behavior_data = {
'behavior_id': range(1, n_behaviors + 1), # 行为ID
'user_id': user_ids, # 用户ID
'product_id': product_ids, # 商品ID
'behavior_type': self.rng.choice( # 行为类型(带概率权重)
['浏览', '点击', '加购物车', '购买'], n_behaviors,
p=[0.4, 0.3, 0.2, 0.1] # 40%浏览, 30%点击, 20%加购, 10%购买
),
'timestamp': [ # 行为发生时间
datetime.now() - timedelta(minutes=self.rng.randint(1, 10080))
for _ in range(n_behaviors)
],
'duration': self.rng.exponential(30, n_behaviors) # 停留时间(指数分布)
}
self.collected_data['behaviors'] = pd.DataFrame(behavior_data)
print(f"✓ 收集了 {n_behaviors} 条行为数据")
return self.collected_data['behaviors']
def collect_product_data(self, n_products=500):
"""
收集商品数据(本示例中暂未使用,保留以扩展功能)
参数:
n_products: 商品数量
"""
categories = ['电子产品', '服装', '食品', '家居', '图书']
product_data = {
'product_id': range(1, n_products + 1),
'category': self.rng.choice(categories, n_products), # 商品类别
'price': self.rng.uniform(10, 1000, n_products).round(2), # 价格
'rating': self.rng.uniform(3.0, 5.0, n_products).round(1), # 评分
'stock': self.rng.randint(0, 1000, n_products) # 库存
}
self.collected_data['products'] = pd.DataFrame(product_data)
print(f"✓ 收集了 {n_products} 条商品数据")
return self.collected_data['products']
def get_data_summary(self):
"""打印所有收集数据的摘要信息"""
print("\n" + "=" * 50)
print("数据收集摘要")
print("=" * 50)
for name, df in self.collected_data.items():
print(f"\n【{name}】")
print(f" 形状: {df.shape}")
print(f" 列名: {list(df.columns)}")
print(f" 缺失值: {df.isnull().sum().sum()}")
# =================================================================================
# 模块 2:数据准备器 (DataPreparer)
# 功能:数据清洗、特征工程、数据转换、数据集划分
# 关键修复:先划分后处理,避免数据泄露
# =================================================================================
class DataPreparer:
"""
数据准备器类
负责将原始数据转换为模型可用的格式
重要改进:
1. 先划分数据集,再分别进行预处理(避免数据泄露)
2. 使用 OneHotEncoder 替代 LabelEncoder(避免引入顺序关系)
3. 使用 ColumnTransformer 统一管理数值列和类别列的处理
"""
def __init__(self, data):
"""
初始化数据准备器
参数:
data: 原始 DataFrame(包含特征和目标列)
"""
self.original_data = data.copy() # 保存原始数据的副本
self.preprocessor = None # 预处理器(ColumnTransformer 对象)
self.feature_names = None # 预处理后的特征名称
def _clean_data(self, df):
"""
数据清洗(私有方法)
处理缺失值、重复值、异常值
注意:这个方法不包含标准化,因为标准化需要在数据划分之后进行
参数:
df: 待清洗的 DataFrame
返回:
清洗后的 DataFrame
"""
df = df.copy() # 避免修改原数据
# ----- 1. 处理缺失值 -----
# 数值列:使用中位数填充(比均值更鲁棒)
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if df[col].isnull().sum() > 0:
df[col] = df[col].fillna(df[col].median())
# 类别列:使用众数填充(出现最多的值)
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
if df[col].isnull().sum() > 0:
mode_val = df[col].mode()[0] if len(df[col].mode()) > 0 else 'unknown'
df[col] = df[col].fillna(mode_val)
# ----- 2. 处理重复值 -----
df = df.drop_duplicates()
# ----- 3. 处理异常值(使用 IQR 四分位距方法)-----
for col in numeric_cols:
Q1 = df[col].quantile(0.25) # 第一四分位数(25%位置)
Q3 = df[col].quantile(0.75) # 第三四分位数(75%位置)
IQR = Q3 - Q1 # 四分位距
lower = Q1 - 1.5 * IQR # 下界
upper = Q3 + 1.5 * IQR # 上界
# 将超出边界的值裁剪到边界上(保留样本量)
df[col] = df[col].clip(lower, upper)
return df
def _create_features(self, df):
"""
特征工程(私有方法)
创建派生特征,增强模型表达能力
参数:
df: 待处理的数据
返回:
添加了新特征后的 DataFrame
"""
df = df.copy()
# 特征1:性价比(价格/评分)
if 'price' in df.columns and 'rating' in df.columns:
# 加 0.01 避免除零错误
df['price_per_rating'] = df['price'] / (df['rating'] + 0.01)
print(f" ✓ 创建特征: price_per_rating")
# 特征2:年龄段分组(将连续年龄转换为离散分组)
if 'age' in df.columns:
df['age_group'] = pd.cut(
df['age'],
bins=[0, 25, 35, 50, 100], # 分组边界
labels=[0, 1, 2, 3] # 分组标签
)
print(f" ✓ 创建特征: age_group")
return df
def _build_preprocessor(self, X_train):
"""
构建预处理器(关键修复:只在训练集上 fit)
功能:
1. 数值列:标准化(StandardScaler)
2. 类别列:独热编码(OneHotEncoder)
参数:
X_train: 训练集特征(用于 fit 预处理器)
返回:
配置好的 ColumnTransformer
"""
# 识别数值列和类别列
numeric_cols = X_train.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = X_train.select_dtypes(include=['object']).columns.tolist()
# 排除目标列(如果误被包含)
if 'purchased' in categorical_cols:
categorical_cols.remove('purchased')
print(f" 数值列: {numeric_cols}")
print(f" 类别列: {categorical_cols}")
# 构建 ColumnTransformer:对不同列应用不同转换
self.preprocessor = ColumnTransformer([
# 数值列:标准化(减去均值,除以标准差)
('numeric', StandardScaler(), numeric_cols),
# 类别列:独热编码(drop='first' 避免多重共线性)
('categorical', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'), categorical_cols)
])
# 在训练集上拟合预处理器(计算均值和标准差)
self.preprocessor.fit(X_train)
# 获取预处理后的特征名称(用于后续的 DataFrame)
try:
numeric_feature_names = numeric_cols
categorical_feature_names = self.preprocessor.named_transformers_['categorical'].get_feature_names_out(
categorical_cols)
self.feature_names = numeric_feature_names + list(categorical_feature_names)
except:
self.feature_names = None
return self.preprocessor
def prepare_data(self, target_column, test_size=0.2, val_size=0.2, random_state=42):
"""
完整的数据准备流程(关键修复:先划分,再分别处理)
执行步骤:
1. 划分数据集(训练/验证/测试)
2. 对每个数据集分别进行清洗和特征工程
3. 在训练集上 fit 预处理器
4. 用训练好的预处理器转换所有数据集
参数:
target_column: 目标列名称(标签列)
test_size: 测试集比例
val_size: 验证集比例
random_state: 随机种子
返回:
splits: 字典,包含 X_train, y_train, X_val, y_val, X_test, y_test
"""
print("\n" + "=" * 50)
print("数据准备流水线")
print("=" * 50)
# ----- 步骤 1:划分数据集(先划分!避免数据泄露)-----
X = self.original_data.drop(columns=[target_column]) # 特征
y = self.original_data[target_column] # 标签
# 第一次划分:分出测试集
X_temp, X_test, y_temp, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state,
stratify=y # 分层采样:保持各类别比例一致
)
# 第二次划分:从剩余数据分出验证集
val_ratio = val_size / (1 - test_size) # 调整验证集比例
X_train, X_val, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=val_ratio, random_state=random_state,
stratify=y_temp
)
print(f"✓ 数据划分完成:")
print(f" 训练集: {X_train.shape[0]} 条")
print(f" 验证集: {X_val.shape[0]} 条")
print(f" 测试集: {X_test.shape[0]} 条")
# ----- 步骤 2:分别进行清洗和特征工程 -----
print("\n✓ 数据清洗和特征工程:")
X_train = self._clean_data(X_train)
X_train = self._create_features(X_train)
X_val = self._clean_data(X_val)
X_val = self._create_features(X_val)
X_test = self._clean_data(X_test)
X_test = self._create_features(X_test)
# ----- 步骤 3:构建预处理器(只在训练集上 fit)-----
print("\n✓ 构建预处理器:")
self._build_preprocessor(X_train)
# ----- 步骤 4:转换数据 -----
print("\n✓ 转换数据:")
X_train_processed = self.preprocessor.transform(X_train)
X_val_processed = self.preprocessor.transform(X_val)
X_test_processed = self.preprocessor.transform(X_test)
# ----- 步骤 5:转换为 DataFrame(便于查看)-----
if self.feature_names:
X_train_processed = pd.DataFrame(X_train_processed, columns=self.feature_names)
X_val_processed = pd.DataFrame(X_val_processed, columns=self.feature_names)
X_test_processed = pd.DataFrame(X_test_processed, columns=self.feature_names)
# 打包返回
splits = {
'X_train': X_train_processed, 'y_train': y_train,
'X_val': X_val_processed, 'y_val': y_val,
'X_test': X_test_processed, 'y_test': y_test
}
print(f"\n✓ 数据处理完成,最终特征数: {X_train_processed.shape[1]}")
return splits
def transform_new_data(self, df):
"""
转换新数据(用于生产环境预测)
使用已拟合的预处理器进行转换
参数:
df: 新数据的 DataFrame
返回:
转换后的 DataFrame
"""
if self.preprocessor is None:
raise ValueError("请先调用 prepare_data 方法拟合预处理器")
# 应用相同的清洗和特征工程
df = self._clean_data(df)
df = self._create_features(df)
# 转换
transformed = self.preprocessor.transform(df)
if self.feature_names:
transformed = pd.DataFrame(transformed, columns=self.feature_names)
return transformed
# =================================================================================
# 模块 3:模型训练器 (ModelTrainer)
# 功能:注册模型、训练模型、评估模型、选择最佳模型
# =================================================================================
class ModelTrainer:
"""
模型训练器类
负责管理多个模型,批量训练和评估
"""
def __init__(self):
self.models = {} # 存储模型配置 {'name': {'model': ..., 'problem_type': ...}}
self.trained_models = {} # 存储训练好的模型实例
self.training_history = {} # 存储训练历史(如训练时间)
def register_model(self, name, model, problem_type):
"""
注册模型(在训练前声明要使用的模型)
参数:
name: 模型名称(自定义,如 '逻辑回归')
model: sklearn 模型实例(未训练的)
problem_type: 问题类型,'classification' 或 'regression'
返回:
self: 支持链式调用
"""
self.models[name] = {
'model': model,
'problem_type': problem_type
}
print(f"✓ 注册模型:{name}({problem_type})")
return self
def train_single_model(self, name, X_train, y_train):
"""
训练单个模型
参数:
name: 已注册的模型名称
X_train: 训练集特征
y_train: 训练集标签
返回:
训练好的模型
"""
if name not in self.models:
raise ValueError(f"模型 {name} 未注册")
model = self.models[name]['model']
print(f"\n▶ 训练模型:{name}")
# 记录训练开始时间
start_time = datetime.now()
model.fit(X_train, y_train) # 核心:训练模型
elapsed = (datetime.now() - start_time).total_seconds()
# 保存训练结果
self.trained_models[name] = model
self.training_history[name] = {'training_time': elapsed}
print(f" ✓ 完成,耗时 {elapsed:.2f} 秒")
return model
def train_all_models(self, X_train, y_train):
"""
训练所有已注册的模型
单个模型训练失败不影响其他模型
参数:
X_train: 训练集特征
y_train: 训练集标签
返回:
所有训练好的模型字典
"""
print("\n" + "-" * 40)
print("开始批量训练模型")
print("-" * 40)
for name in self.models.keys():
try:
self.train_single_model(name, X_train, y_train)
except Exception as e:
print(f" ✗ 训练模型 {name} 时出错:{e}")
return self.trained_models
def evaluate_models(self, X_test, y_test):
"""
评估所有训练好的模型
参数:
X_test: 测试集特征
y_test: 测试集标签
返回:
results: 各模型的评估结果字典
"""
print("\n" + "-" * 40)
print("模型评估结果")
print("-" * 40)
results = {}
for name, model in self.trained_models.items():
problem_type = self.models[name]['problem_type']
y_pred = model.predict(X_test)
if problem_type == 'classification':
accuracy = accuracy_score(y_test, y_pred)
results[name] = {'accuracy': accuracy}
print(f"\n【{name}】")
print(f" 准确率: {accuracy:.4f}")
print(f" 分类报告:")
print(classification_report(y_test, y_pred, target_names=['未购买', '购买']))
elif problem_type == 'regression':
from sklearn.metrics import mean_squared_error
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
results[name] = {'mse': mse, 'rmse': rmse}
print(f"\n【{name}】")
print(f" MSE: {mse:.4f}, RMSE: {rmse:.4f}")
return results
def get_best_model(self, results, metric='accuracy', higher_is_better=True):
"""
获取最佳模型(根据指定指标)
参数:
results: evaluate_models 返回的结果字典
metric: 评估指标名称,如 'accuracy', 'rmse'
higher_is_better: 指标是否越大越好(True: 选最大,False: 选最小)
返回:
best_name: 最佳模型名称
best_model: 最佳模型实例
"""
if not results:
print("没有评估结果")
return None, None
# 根据指标方向选择最大值或最小值
if higher_is_better:
best_name = max(results.keys(), key=lambda x: results[x].get(metric, -float('inf')))
else:
best_name = min(results.keys(), key=lambda x: results[x].get(metric, float('inf')))
best_score = results[best_name][metric]
print("\n" + "=" * 50)
print(f"🏆 最佳模型:{best_name}")
print(f" {metric} = {best_score:.4f}")
print("=" * 50)
return best_name, self.trained_models[best_name]
# =================================================================================
# 模块 4:模型评估器 (ModelEvaluator)
# 功能:详细评估模型性能,生成可视化图表
# =================================================================================
class ModelEvaluator:
"""
模型评估器类
提供更详细的评估指标和可视化功能
"""
def __init__(self):
self.evaluation_results = {}
def evaluate_classification(self, y_true, y_pred, y_prob=None, model_name="Model"):
"""
评估分类模型
参数:
y_true: 真实标签
y_pred: 预测标签
y_prob: 预测概率(用于 ROC 曲线)
model_name: 模型名称
返回:
results: 评估结果字典
"""
# 计算各项评估指标
results = {
'accuracy': accuracy_score(y_true, y_pred), # 准确率
'precision': precision_score(y_true, y_pred, average='binary', zero_division=0), # 精确率
'recall': recall_score(y_true, y_pred, average='binary', zero_division=0), # 召回率
'f1': f1_score(y_true, y_pred, average='binary', zero_division=0) # F1分数
}
print(f"\n【{model_name} 分类评估】")
print(f" 准确率 (Accuracy): {results['accuracy']:.4f} - 预测正确的比例")
print(f" 精确率 (Precision): {results['precision']:.4f} - 预测为正例中实际为正例的比例")
print(f" 召回率 (Recall): {results['recall']:.4f} - 实际正例中被正确预测的比例")
print(f" F1分数: {results['f1']:.4f} - 精确率和召回率的调和平均")
# 混淆矩阵
cm = confusion_matrix(y_true, y_pred)
print(f"\n 混淆矩阵:")
print(f" ┌─────────────┬─────────────┐")
print(f" │ TN={cm[0, 0]:<3} │ FP={cm[0, 1]:<3} │ 预测为负 / 预测为正")
print(f" ├─────────────┼─────────────┤")
print(f" │ FN={cm[1, 0]:<3} │ TP={cm[1, 1]:<3} │")
print(f" └─────────────┴─────────────┘")
print(f" 实际为负 实际为正")
# ROC 曲线和 AUC(需要概率预测)
if y_prob is not None:
# 处理概率数组的格式
if y_prob.ndim > 1:
y_prob = y_prob[:, 1] # 取正类的概率
fpr, tpr, _ = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)
results['roc_auc'] = roc_auc
print(f" AUC: {roc_auc:.4f} - ROC曲线下面积")
# 绘制 ROC 曲线
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, 'b-', lw=2, label=f'ROC 曲线 (AUC = {roc_auc:.3f})')
plt.plot([0, 1], [0, 1], 'r--', lw=1, label='随机分类器')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('假正率 (False Positive Rate)')
plt.ylabel('真正率 (True Positive Rate)')
plt.title(f'{model_name} - ROC 曲线')
plt.legend(loc="lower right")
plt.grid(True, alpha=0.3)
plt.show()
self.evaluation_results[model_name] = results
return results
# =================================================================================
# 模块 5:模型部署器 (ModelDeployer)
# 功能:保存/加载模型、创建预测服务、模型监控
# =================================================================================
class ModelDeployer:
"""
模型部署器类
负责将训练好的模型保存到文件,并创建可用于生产的预测服务
"""
def __init__(self):
self.deployed_models = {} # 已部署的模型
self.preprocessors = {} # 对应的预处理器
self.deployment_logs = [] # 部署日志
def save_model(self, model, model_name, preprocessor=None, filepath=None):
"""
保存模型(和预处理器)到文件
参数:
model: 训练好的模型
model_name: 模型名称
preprocessor: 对应的预处理器(用于生产环境的数据预处理)
filepath: 保存路径,默认为 f"{model_name}.pkl"
返回:
保存的文件路径
"""
if filepath is None:
filepath = f"{model_name}.pkl"
# 将模型和预处理器打包在一起
package = {
'model': model,
'preprocessor': preprocessor,
'model_name': model_name,
'saved_at': datetime.now().isoformat()
}
with open(filepath, 'wb') as f:
pickle.dump(package, f)
print(f"✓ 模型 {model_name} 已保存到 {filepath}")
# 记录日志
self.deployment_logs.append({
'timestamp': datetime.now().isoformat(),
'action': 'save_model',
'model_name': model_name,
'filepath': filepath
})
return filepath
def load_model(self, filepath):
"""
从文件加载模型和预处理器
参数:
filepath: 模型文件路径
返回:
model: 加载的模型
preprocessor: 加载的预处理器
"""
with open(filepath, 'rb') as f:
package = pickle.load(f)
model = package['model']
model_name = package.get('model_name', 'unknown')
preprocessor = package.get('preprocessor')
self.deployed_models[model_name] = model
if preprocessor:
self.preprocessors[model_name] = preprocessor
print(f"✓ 模型 {model_name} 已从 {filepath} 加载")
self.deployment_logs.append({
'timestamp': datetime.now().isoformat(),
'action': 'load_model',
'model_name': model_name,
'filepath': filepath
})
return model, preprocessor
def create_prediction_service(self, model_name, preprocessor=None):
"""
创建预测服务(返回一个可调用的函数)
参数:
model_name: 模型名称 或 模型文件路径
preprocessor: 预处理器(可选,如果不提供则使用已加载的)
返回:
predict_service: 预测函数,接收数据返回预测结果
"""
# 如果传入的是文件路径,先加载
if model_name not in self.deployed_models:
import os
if os.path.exists(model_name):
model, preprocessor = self.load_model(model_name)
model_name = model_name.replace('.pkl', '')
else:
raise ValueError(f"模型 {model_name} 未部署")
model = self.deployed_models[model_name]
preprocessor = preprocessor or self.preprocessors.get(model_name)
def predict_service(input_data):
"""
预测服务内部函数
参数:
input_data: 原始数据(DataFrame、dict 或 list)
返回:
dict: 包含预测结果、概率和状态信息
"""
try:
# 1. 转换为 DataFrame
if not isinstance(input_data, pd.DataFrame):
input_data = pd.DataFrame(input_data)
# 2. 数据预处理(如果提供了预处理器)
if preprocessor is not None:
if hasattr(preprocessor, 'transform_new_data'):
input_data = preprocessor.transform_new_data(input_data)
elif hasattr(preprocessor, 'transform'):
input_data = preprocessor.transform(input_data)
# 3. 预测
prediction = model.predict(input_data)
# 4. 预测概率(分类模型特有)
probability = None
if hasattr(model, 'predict_proba'):
probability = model.predict_proba(input_data)
# 5. 返回结果
return {
'prediction': prediction.tolist(),
'probability': probability.tolist() if probability is not None else None,
'status': 'success',
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {
'error': str(e),
'status': 'error',
'timestamp': datetime.now().isoformat()
}
return predict_service
# =================================================================================
# 模块 6:主流程控制器 (MLProjectPipeline)
# 功能:串联所有模块,提供一键运行的能力
# =================================================================================
class MLProjectPipeline:
"""
机器学习项目流水线主控制器
使用门面模式,将复杂的子模块操作封装为简单的一键运行接口
"""
def __init__(self):
"""初始化所有子模块"""
self.data_collector = DataCollector(seed=42) # 数据收集器
self.data_preparer = None # 数据准备器
self.model_trainer = ModelTrainer() # 模型训练器
self.model_evaluator = ModelEvaluator() # 模型评估器
self.model_deployer = ModelDeployer() # 模型部署器
def run_complete_pipeline(self, target_column='purchased', test_size=0.2, val_size=0.2):
"""
运行完整的机器学习项目流程
参数:
target_column: 目标列名称(标签列)
test_size: 测试集比例
val_size: 验证集比例
返回:
dict: 包含所有中间结果和最终模型
"""
print("\n" + "=" * 60)
print("🚀 机器学习项目完整流程")
print("=" * 60)
# ==================== 第1步:数据收集 ====================
print("\n📁 第1步:数据收集")
print("-" * 40)
# 收集用户数据(1000条)
user_data = self.data_collector.collect_user_data(n_users=1000)
# 收集行为数据(5000条)
behavior_data = self.data_collector.collect_behavior_data(n_behaviors=5000, n_users=1000)
# 合并数据:以内连接方式关联用户和行为
merged_data = pd.merge(user_data, behavior_data, on='user_id', how='inner')
# 创建目标变量:是否有购买行为(1=购买,0=未购买)
merged_data['purchased'] = (merged_data['behavior_type'] == '购买').astype(int)
print(merged_data.to_string())
print(f"\n✓ 合并后数据: {len(merged_data)} 条记录")
print(f" 购买比例: {merged_data['purchased'].mean():.2%}") # 查看正负样本平衡情况
# ==================== 第2步:数据准备 ====================
print("\n🔧 第2步:数据准备")
print("-" * 40)
# 选择用于模型训练的特征列
feature_columns = ['age', 'gender', 'city', 'duration']
data_for_ml = merged_data[feature_columns + ['purchased']].copy()
# 在准备数据之前,先保存原始特征和目标
X_full = data_for_ml[feature_columns].copy()
y_full = data_for_ml['purchased'].copy()
# 划分数据(使用相同的 random_state 保证一致性)
from sklearn.model_selection import train_test_split
X_temp, X_test_raw, y_temp, y_test_raw = train_test_split(
X_full, y_full, test_size=test_size, random_state=42, stratify=y_full
)
val_ratio = val_size / (1 - test_size)
X_train_raw, X_val_raw, y_train, y_val = train_test_split(
X_temp, y_temp, test_size=val_ratio, random_state=42, stratify=y_temp
)
# 初始化数据准备器
self.data_preparer = DataPreparer(data_for_ml)
# 执行数据准备(清洗、特征工程、转换、划分)
splits = self.data_preparer.prepare_data(
target_column=target_column,
test_size=test_size,
val_size=val_size,
random_state=42
)
# 将原始数据添加到 splits 中
splits['X_train_raw'] = X_train_raw
splits['X_val_raw'] = X_val_raw
splits['X_test_raw'] = X_test_raw
# ==================== 第3步:模型训练 ====================
print("\n🤖 第3步:模型训练")
print("-" * 40)
# 注册要使用的模型
self.model_trainer.register_model(
'逻辑回归',
LogisticRegression(random_state=42, max_iter=1000),
'classification'
)
# 随机森林既可以做分类,也可以做回归。
self.model_trainer.register_model(
'随机森林',
RandomForestClassifier(n_estimators=100, random_state=42),
'classification'
)
# 批量训练所有注册的模型
trained_models = self.model_trainer.train_all_models(
splits['X_train'], splits['y_train']
)
# ==================== 第4步:模型评估 ====================
print("\n📊 第4步:模型评估")
print("-" * 40)
# 在测试集上评估所有模型
results = self.model_trainer.evaluate_models(splits['X_test'], splits['y_test'])
# 选出最佳模型(基于准确率)
best_name, best_model = self.model_trainer.get_best_model(
results, metric='accuracy', higher_is_better=True
)
# 使用 ModelEvaluator 进行更详细的评估
if best_model:
y_pred = best_model.predict(splits['X_test'])
y_prob = best_model.predict_proba(splits['X_test']) if hasattr(best_model, 'predict_proba') else None
self.model_evaluator.evaluate_classification(
splits['y_test'], y_pred, y_prob, best_name
)
# ==================== 第5步:模型部署 ====================
print("\n📦 第5步:模型部署")
print("-" * 40)
# 保存最佳模型和预处理器
model_path = self.model_deployer.save_model(
best_model,
best_name,
preprocessor=self.data_preparer
)
# 创建预测服务(可用于生产环境)
prediction_service = self.model_deployer.create_prediction_service(model_path)
# 测试预测服务
print("\n🧪 测试预测服务...")
# test_input = splits['X_test'].head(3) # 取测试集前3条
# print("\n测试输入:")
# print(test_input)
raw_test_input = splits['X_test_raw'].head(3)
print("\n测试输入(原始数据 - 包含原始列名):")
print(raw_test_input)
# prediction_result = prediction_service(test_input)
prediction_result = prediction_service(raw_test_input)
print("\n预测结果:")
print(json.dumps(prediction_result, indent=2, ensure_ascii=False))
# ==================== 完成 ====================
print("\n" + "=" * 60)
print("✅ 机器学习项目流程完成!")
print("=" * 60)
# 返回所有结果,供后续分析使用
return {
'data': data_for_ml, # 原始数据
'splits': splits, # 数据划分结果
'best_model': best_model, # 最佳模型
'best_model_name': best_name, # 最佳模型名称
'evaluation_results': results, # 评估结果
'prediction_service': prediction_service # 预测服务函数
}
# =================================================================================
# 程序入口
# =================================================================================
if __name__ == "__main__":
"""
当脚本被直接运行时(而不是作为模块导入),执行以下代码
"""
# 创建流水线实例
pipeline = MLProjectPipeline()
# 运行完整流程(指定目标列名为 'purchased')
project_results = pipeline.run_complete_pipeline(target_column='purchased')
# # 打印最终结果摘要
# if project_results:
# print("\n" + "=" * 60)
# print("📋 最终结果摘要")
# print("=" * 60)
# print(f" 最佳模型: {project_results['best_model_name']}")
# print(f" 评估结果: {project_results['evaluation_results']}")
# print(f" 训练集大小: {project_results['splits']['X_train'].shape}")
# print(f" 测试集大小: {project_results['splits']['X_test'].shape}")
#
# # 可选:保存结果到文件
# import joblib
# joblib.dump(project_results['best_model'], 'final_model.pkl')
# print("\n✓ 最终模型已保存到 final_model.pkl")
产生下面问题的原因,我预测使用了预处理的数据
print("\n🧪 测试预测服务...")
test_input = splits['X_test'].head(3) # 取测试集前3条
print("\n测试输入:")
print(test_input)
结果报错
{
"error": "columns are missing: {'city', 'gender'}",
"status": "error",
"timestamp": "2026-05-19T11:18:05.136684"
}
为什么训练模型使用X_train ,预测时使用的X_test_raw
这是一个非常好的问题!它触及了机器学习流程中的一个核心概念:训练用的数据和预测用的数据格式必须一致。
让我详细解释为什么会出现 X_train 和 X_train_raw 这两个东西。
一、问题核心:数据格式不同
| 数据集 | 内容 | 列名示例 | 用途 |
|---|---|---|---|
X_train |
预处理后的数据 | age, gender_男, city_北京, duration |
训练模型 |
| X_test_raw | 原始未处理的数据 | age, gender, city, duration |
模拟真实世界的原始输入 |
具体例子
原始数据 (X_test_raw):
age gender city duration
0 25 男 北京 120
1 30 女 上海 300
2 35 男 广州 450
预处理后的数据 (X_train):
age gender_男 city_北京 city_上海 city_广州 duration
0 25 1 1 0 0 120
1 30 0 0 1 0 300
2 35 1 0 0 1 450
二、为什么训练和预测要用不同格式的数据?
训练阶段:使用 X_train
模型学习的是 "预处理后的特征" 与 "标签" 之间的关系。
# 模型看到的是这样的数据
X_train (预处理后) → y_train (标签)
[25, 1, 1, 0, 0, 120] → 1 (购买)
[30, 0, 0, 1, 0, 300] → 0 (未购买)
预测阶段:应该使用 X_test_raw(或类似格式)
在真实世界中,新数据到来时是原始格式:
# 新用户数据(原始格式)
new_user = {
'age': 28,
'gender': '女',
'city': '深圳',
'duration': 200
}
关键问题:模型不认识 '女' 和 '深圳',它只认识 0/1 数字!
所以预测服务的正确流程是:
原始数据 → 预处理器.transform() → 模型.predict()
三、错误原因回顾
你的错误代码:
# ❌ 错误:直接用预处理后的数据测试
test_input = splits['X_test'].head(3) # 已经是 [25,1,1,0,...] 格式
prediction_result = prediction_service(test_input)
预测服务内部会再次尝试预处理:
def predict_service(input_data):
# 尝试找 'gender' 列
input_data['gender'] # ❌ 报错!因为 X_test 中已经没有 'gender' 列了
X_test 已经是处理后的数据,没有原始列名,所以预处理步骤失败了。
四、正确的流程对比
训练阶段流程
原始数据 (X_train_raw)
│
▼
DataPreparer.transform() # 预处理(编码、标准化)
│
▼
处理后的数据 (X_train)
│
▼
model.fit(X_train, y_train) # 训练
预测阶段流程(正确)
新原始数据 (new_data_raw) ← 这就是为什么需要 X_train_raw 格式
│
▼
预处理器.transform() ← 使用训练时拟合的预处理器
│
▼
处理后的数据
│
▼
model.predict() ← 预测
五、为什么不直接用 X_train_raw 训练?
# 如果直接用原始数据训练
model.fit(X_train_raw, y_train) # ❌ 不行!
原因:
| 问题 | 说明 |
|---|---|
| 类别变量无法计算 | 模型无法处理 '男'、'女' 这样的字符串 |
| 数值范围不一致 | age(0-100) 和 duration(0-1000) 尺度不同,会影响模型 |
| 无法学习类别关系 | 模型不理解 '北京' 和 '上海' 是不同类别 |
所以必须先预处理,再训练。
六、总结:两者关系
| 对比项 | X_train |
X_test_raw |
|---|---|---|
| 数据格式 | 数值(0/1、标准化值) | 原始(字符串、数值混合) |
| 列数 | 更多(One-Hot 后列数增加) | 较少 |
| 用途 | 训练模型 | 模拟真实输入、验证预处理器 |
| 是否需要预处理 | 否(已经是预处理后) | 是(需要预处理器转换) |
| 模型能直接使用吗 | ✅ 能 | ❌ 不能 |
七、你的代码应该如何修复?
# 在 prepare_data 方法中,同时返回原始数据
splits = {
# 预处理后的数据(用于训练和评估)
'X_train': X_train_processed,
'X_val': X_val_processed,
'X_test': X_test_processed,
# ✅ 原始数据(用于模拟真实预测)
'X_train_raw': X_train_raw,
'X_val_raw': X_val_raw,
'X_test_raw': X_test_raw,
'y_train': y_train,
'y_val': y_val,
'y_test': y_test
}
# 测试预测服务时,使用原始数据
test_input = splits['X_test_raw'].head(3) # ✅ 使用原始格式
prediction_result = prediction_service(test_input)
一句话总结:
模型训练用的是"处理后的数字",但真实世界的输入是"原始数据"。预处理器的作用就是在这两者之间做转换。所以测试预测服务时,要用原始数据来验证整个流程是否正确。
是否同样原因,保存模型的时候 同时保存预处理器?
是的,完全正确! 你理解到了机器学习工程化的核心要点。
预处理器保存了什么?
| 组件 | 保存的内容 | 用途 |
|---|---|---|
OneHotEncoder |
类别和编码的映射关系 | 知道 '女' 应该变成 [0,1],'深圳' 应该变成 [0,0,0,1] |
StandardScaler |
均值和标准差 | 知道 age=28 应该减去多少、除以多少 |
| 特征列名 | 哪些列需要编码/标准化 | 知道新数据应该按什么顺序处理 |
LabelEncoder |
标签映射(如有) | 知道预测结果 1 对应什么业务含义 |
如果没有这些信息,预处理器无法正确转换新数据
完整的数据流图
┌─────────────────────────────────────────────────────────────────────────┐ │ 训练阶段 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 原始数据 ──┬──► X_train_raw ──► 预处理器.fit_transform() ──► X_train │ │ │ │ │ │ │ ▼ │ │ │ 保存预处理器参数 │ │ │ (类别映射、均值、标准差) │ │ │ │ │ └──► y_train ──────────────────────────────────────────────►│ │ │ │ ▼ │ │ model.fit(X_train, y_train) │ │ │ │ │ ▼ │ │ 保存 model + 预处理器 │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────────────────┐ │ 生产预测阶段 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ 加载 ──► model + 预处理器 │ │ │ │ 新原始数据 ──► 预处理器.transform() ──► 处理后的数据 ──► model.predict()│ │ (包含 'gender', 'city') (数值) (预测结果) │ │ │ └─────────────────────────────────────────────────────────────────────────┘
总结
| 问题 | 答案 |
|---|---|
| 为什么训练和预测数据格式不同? | 模型只认识数字,但现实世界是字符串+数字混合 |
| 谁负责格式转换? | 预处理器(Preprocessor) |
| 预处理器需要保存吗? | 必须保存,否则无法转换新数据 |
| 你代码中的设计正确吗? | ✅ 正确,ModelDeployer 同时保存了模型和预处理器 |
| 你的错误原因? | 测试时用了处理后的数据,而不是原始数据 |
一句话总结:
模型只懂"数字",真实世界是"人话"。预处理器是翻译官,必须和模型一起保存,否则翻译官丢了,模型就变成了"哑巴"。你的错误正是因为没有用"人话"(原始数据)去问模型。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)