接上文,这是一个可运行的示例

"""
================================================================================
完整可运行的机器学习项目流程
================================================================================

修复了以下问题:
1. 数据泄露(标准化在划分之后进行)
2. 目标列名称不匹配
3. 预测服务缺少预处理
4. LabelEncoder 改为 OneHotEncoder
5. 修复 get_best_model 对回归指标的逻辑错误
6. 使用独立的随机种子管理
7. 移除 inplace 操作的 FutureWarning
8. 添加完整的异常处理

作者: AI Assistant
版本: 2.0
================================================================================
"""

# =========================
# 导入必要的库
# =========================
import json  # 用于处理 JSON 格式数据
import pandas as pd  # 数据处理和分析
import numpy as np  # 数值计算
import pickle  # 模型序列化(保存/加载)
import matplotlib
import matplotlib.pyplot as plt  # 数据可视化
from datetime import datetime, timedelta  # 时间处理
from sklearn.model_selection import train_test_split, cross_val_score  # 数据划分和交叉验证
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder  # 数据预处理
from sklearn.compose import ColumnTransformer  # 列转换器
from sklearn.pipeline import Pipeline  # 流水线
from sklearn.linear_model import LogisticRegression  # 逻辑回归模型
from sklearn.ensemble import RandomForestClassifier  # 随机森林模型
from sklearn.metrics import (  # 评估指标
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, roc_curve, auc, classification_report
)

# 设置 matplotlib 中文字体(解决中文显示问题)
matplotlib.use('TkAgg')
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']  # 使用黑体
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题


# =================================================================================
# 模块 1:数据收集器 (DataCollector)
# 功能:模拟从多个数据源(用户、行为、商品)收集原始数据
# =================================================================================
class DataCollector:
    """
    数据收集器类
    负责生成模拟数据,用于演示机器学习项目的数据收集阶段
    """

    def __init__(self, seed=42):
        """
        初始化数据收集器

        参数:
            seed: 随机种子,确保数据可复现
        """
        # 创建独立的随机数生成器(比全局 np.random 更可控)
        self.rng = np.random.RandomState(seed)
        # 存储收集到的所有数据
        self.collected_data = {}

    def collect_user_data(self, n_users=1000):
        """
        收集用户画像数据

        参数:
            n_users: 需要生成的用户数量

        返回:
            DataFrame: 包含用户信息的表格
        """
        # 生成用户数据字典
        user_data = {
            'user_id': range(1, n_users + 1),  # 用户ID,从1开始
            'age': self.rng.randint(18, 65, n_users),  # 年龄:18-65岁随机
            'gender': self.rng.choice(['男', '女'], n_users),  # 性别:随机男/女
            'city': self.rng.choice(['北京', '上海', '广州', '深圳'], n_users),  # 所在城市
            'registration_date': [  # 注册日期:过去一年内随机
                datetime.now() - timedelta(days=self.rng.randint(1, 365))
                for _ in range(n_users)
            ]
        }
        # 转换为 DataFrame 并存储
        self.collected_data['users'] = pd.DataFrame(user_data)
        print(f"✓ 收集了 {n_users} 条用户数据")
        return self.collected_data['users']

    def collect_behavior_data(self, n_behaviors=5000, n_users=1000):
        """
        收集用户行为数据(浏览、点击、加购物车、购买等)

        参数:
            n_behaviors: 行为记录数量
            n_users: 用户总数(用于生成合理的 user_id 范围)

        返回:
            DataFrame: 包含行为记录的表格
        """
        # 从用户ID范围内随机选择产生行为的用户
        user_ids = self.rng.choice(range(1, n_users + 1), n_behaviors)
        # 从商品ID范围内随机选择商品(假设有500个商品)
        product_ids = self.rng.choice(range(1, 501), n_behaviors)

        behavior_data = {
            'behavior_id': range(1, n_behaviors + 1),  # 行为ID
            'user_id': user_ids,  # 用户ID
            'product_id': product_ids,  # 商品ID
            'behavior_type': self.rng.choice(  # 行为类型(带概率权重)
                ['浏览', '点击', '加购物车', '购买'], n_behaviors,
                p=[0.4, 0.3, 0.2, 0.1]  # 40%浏览, 30%点击, 20%加购, 10%购买
            ),
            'timestamp': [  # 行为发生时间
                datetime.now() - timedelta(minutes=self.rng.randint(1, 10080))
                for _ in range(n_behaviors)
            ],
            'duration': self.rng.exponential(30, n_behaviors)  # 停留时间(指数分布)
        }
        self.collected_data['behaviors'] = pd.DataFrame(behavior_data)
        print(f"✓ 收集了 {n_behaviors} 条行为数据")
        return self.collected_data['behaviors']

    def collect_product_data(self, n_products=500):
        """
        收集商品数据(本示例中暂未使用,保留以扩展功能)

        参数:
            n_products: 商品数量
        """
        categories = ['电子产品', '服装', '食品', '家居', '图书']
        product_data = {
            'product_id': range(1, n_products + 1),
            'category': self.rng.choice(categories, n_products),  # 商品类别
            'price': self.rng.uniform(10, 1000, n_products).round(2),  # 价格
            'rating': self.rng.uniform(3.0, 5.0, n_products).round(1),  # 评分
            'stock': self.rng.randint(0, 1000, n_products)  # 库存
        }
        self.collected_data['products'] = pd.DataFrame(product_data)
        print(f"✓ 收集了 {n_products} 条商品数据")
        return self.collected_data['products']

    def get_data_summary(self):
        """打印所有收集数据的摘要信息"""
        print("\n" + "=" * 50)
        print("数据收集摘要")
        print("=" * 50)
        for name, df in self.collected_data.items():
            print(f"\n【{name}】")
            print(f"  形状: {df.shape}")
            print(f"  列名: {list(df.columns)}")
            print(f"  缺失值: {df.isnull().sum().sum()}")


# =================================================================================
# 模块 2:数据准备器 (DataPreparer)
# 功能:数据清洗、特征工程、数据转换、数据集划分
# 关键修复:先划分后处理,避免数据泄露
# =================================================================================
class DataPreparer:
    """
    数据准备器类
    负责将原始数据转换为模型可用的格式

    重要改进:
    1. 先划分数据集,再分别进行预处理(避免数据泄露)
    2. 使用 OneHotEncoder 替代 LabelEncoder(避免引入顺序关系)
    3. 使用 ColumnTransformer 统一管理数值列和类别列的处理
    """

    def __init__(self, data):
        """
        初始化数据准备器

        参数:
            data: 原始 DataFrame(包含特征和目标列)
        """
        self.original_data = data.copy()  # 保存原始数据的副本
        self.preprocessor = None  # 预处理器(ColumnTransformer 对象)
        self.feature_names = None  # 预处理后的特征名称

    def _clean_data(self, df):
        """
        数据清洗(私有方法)
        处理缺失值、重复值、异常值

        注意:这个方法不包含标准化,因为标准化需要在数据划分之后进行

        参数:
            df: 待清洗的 DataFrame

        返回:
            清洗后的 DataFrame
        """
        df = df.copy()  # 避免修改原数据

        # ----- 1. 处理缺失值 -----
        # 数值列:使用中位数填充(比均值更鲁棒)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df[col].isnull().sum() > 0:
                df[col] = df[col].fillna(df[col].median())

        # 类别列:使用众数填充(出现最多的值)
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].isnull().sum() > 0:
                mode_val = df[col].mode()[0] if len(df[col].mode()) > 0 else 'unknown'
                df[col] = df[col].fillna(mode_val)

        # ----- 2. 处理重复值 -----
        df = df.drop_duplicates()

        # ----- 3. 处理异常值(使用 IQR 四分位距方法)-----
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)  # 第一四分位数(25%位置)
            Q3 = df[col].quantile(0.75)  # 第三四分位数(75%位置)
            IQR = Q3 - Q1  # 四分位距
            lower = Q1 - 1.5 * IQR  # 下界
            upper = Q3 + 1.5 * IQR  # 上界
            # 将超出边界的值裁剪到边界上(保留样本量)
            df[col] = df[col].clip(lower, upper)

        return df

    def _create_features(self, df):
        """
        特征工程(私有方法)
        创建派生特征,增强模型表达能力

        参数:
            df: 待处理的数据

        返回:
            添加了新特征后的 DataFrame
        """
        df = df.copy()

        # 特征1:性价比(价格/评分)
        if 'price' in df.columns and 'rating' in df.columns:
            # 加 0.01 避免除零错误
            df['price_per_rating'] = df['price'] / (df['rating'] + 0.01)
            print(f"  ✓ 创建特征: price_per_rating")

        # 特征2:年龄段分组(将连续年龄转换为离散分组)
        if 'age' in df.columns:
            df['age_group'] = pd.cut(
                df['age'],
                bins=[0, 25, 35, 50, 100],  # 分组边界
                labels=[0, 1, 2, 3]  # 分组标签
            )
            print(f"  ✓ 创建特征: age_group")

        return df

    def _build_preprocessor(self, X_train):
        """
        构建预处理器(关键修复:只在训练集上 fit)

        功能:
        1. 数值列:标准化(StandardScaler)
        2. 类别列:独热编码(OneHotEncoder)

        参数:
            X_train: 训练集特征(用于 fit 预处理器)

        返回:
            配置好的 ColumnTransformer
        """
        # 识别数值列和类别列
        numeric_cols = X_train.select_dtypes(include=[np.number]).columns.tolist()
        categorical_cols = X_train.select_dtypes(include=['object']).columns.tolist()

        # 排除目标列(如果误被包含)
        if 'purchased' in categorical_cols:
            categorical_cols.remove('purchased')

        print(f"  数值列: {numeric_cols}")
        print(f"  类别列: {categorical_cols}")

        # 构建 ColumnTransformer:对不同列应用不同转换
        self.preprocessor = ColumnTransformer([
            # 数值列:标准化(减去均值,除以标准差)
            ('numeric', StandardScaler(), numeric_cols),
            # 类别列:独热编码(drop='first' 避免多重共线性)
            ('categorical', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'), categorical_cols)
        ])

        # 在训练集上拟合预处理器(计算均值和标准差)
        self.preprocessor.fit(X_train)

        # 获取预处理后的特征名称(用于后续的 DataFrame)
        try:
            numeric_feature_names = numeric_cols
            categorical_feature_names = self.preprocessor.named_transformers_['categorical'].get_feature_names_out(
                categorical_cols)
            self.feature_names = numeric_feature_names + list(categorical_feature_names)
        except:
            self.feature_names = None

        return self.preprocessor

    def prepare_data(self, target_column, test_size=0.2, val_size=0.2, random_state=42):
        """
        完整的数据准备流程(关键修复:先划分,再分别处理)

        执行步骤:
        1. 划分数据集(训练/验证/测试)
        2. 对每个数据集分别进行清洗和特征工程
        3. 在训练集上 fit 预处理器
        4. 用训练好的预处理器转换所有数据集

        参数:
            target_column: 目标列名称(标签列)
            test_size: 测试集比例
            val_size: 验证集比例
            random_state: 随机种子

        返回:
            splits: 字典,包含 X_train, y_train, X_val, y_val, X_test, y_test
        """
        print("\n" + "=" * 50)
        print("数据准备流水线")
        print("=" * 50)

        # ----- 步骤 1:划分数据集(先划分!避免数据泄露)-----
        X = self.original_data.drop(columns=[target_column])  # 特征
        y = self.original_data[target_column]  # 标签

        # 第一次划分:分出测试集
        X_temp, X_test, y_temp, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state,
            stratify=y  # 分层采样:保持各类别比例一致
        )

        # 第二次划分:从剩余数据分出验证集
        val_ratio = val_size / (1 - test_size)  # 调整验证集比例
        X_train, X_val, y_train, y_val = train_test_split(
            X_temp, y_temp, test_size=val_ratio, random_state=random_state,
            stratify=y_temp
        )

        print(f"✓ 数据划分完成:")
        print(f"  训练集: {X_train.shape[0]} 条")
        print(f"  验证集: {X_val.shape[0]} 条")
        print(f"  测试集: {X_test.shape[0]} 条")

        # ----- 步骤 2:分别进行清洗和特征工程 -----
        print("\n✓ 数据清洗和特征工程:")
        X_train = self._clean_data(X_train)
        X_train = self._create_features(X_train)

        X_val = self._clean_data(X_val)
        X_val = self._create_features(X_val)

        X_test = self._clean_data(X_test)
        X_test = self._create_features(X_test)

        # ----- 步骤 3:构建预处理器(只在训练集上 fit)-----
        print("\n✓ 构建预处理器:")
        self._build_preprocessor(X_train)

        # ----- 步骤 4:转换数据 -----
        print("\n✓ 转换数据:")
        X_train_processed = self.preprocessor.transform(X_train)
        X_val_processed = self.preprocessor.transform(X_val)
        X_test_processed = self.preprocessor.transform(X_test)


        # ----- 步骤 5:转换为 DataFrame(便于查看)-----
        if self.feature_names:
            X_train_processed = pd.DataFrame(X_train_processed, columns=self.feature_names)
            X_val_processed = pd.DataFrame(X_val_processed, columns=self.feature_names)
            X_test_processed = pd.DataFrame(X_test_processed, columns=self.feature_names)

        # 打包返回
        splits = {
            'X_train': X_train_processed, 'y_train': y_train,
            'X_val': X_val_processed, 'y_val': y_val,
            'X_test': X_test_processed, 'y_test': y_test
        }

        print(f"\n✓ 数据处理完成,最终特征数: {X_train_processed.shape[1]}")

        return splits

    def transform_new_data(self, df):
        """
        转换新数据(用于生产环境预测)
        使用已拟合的预处理器进行转换

        参数:
            df: 新数据的 DataFrame

        返回:
            转换后的 DataFrame
        """
        if self.preprocessor is None:
            raise ValueError("请先调用 prepare_data 方法拟合预处理器")

        # 应用相同的清洗和特征工程
        df = self._clean_data(df)
        df = self._create_features(df)
        # 转换
        transformed = self.preprocessor.transform(df)

        if self.feature_names:
            transformed = pd.DataFrame(transformed, columns=self.feature_names)

        return transformed


# =================================================================================
# 模块 3:模型训练器 (ModelTrainer)
# 功能:注册模型、训练模型、评估模型、选择最佳模型
# =================================================================================
class ModelTrainer:
    """
    模型训练器类
    负责管理多个模型,批量训练和评估
    """

    def __init__(self):
        self.models = {}  # 存储模型配置 {'name': {'model': ..., 'problem_type': ...}}
        self.trained_models = {}  # 存储训练好的模型实例
        self.training_history = {}  # 存储训练历史(如训练时间)

    def register_model(self, name, model, problem_type):
        """
        注册模型(在训练前声明要使用的模型)

        参数:
            name: 模型名称(自定义,如 '逻辑回归')
            model: sklearn 模型实例(未训练的)
            problem_type: 问题类型,'classification' 或 'regression'

        返回:
            self: 支持链式调用
        """
        self.models[name] = {
            'model': model,
            'problem_type': problem_type
        }
        print(f"✓ 注册模型:{name}({problem_type})")
        return self

    def train_single_model(self, name, X_train, y_train):
        """
        训练单个模型

        参数:
            name: 已注册的模型名称
            X_train: 训练集特征
            y_train: 训练集标签

        返回:
            训练好的模型
        """
        if name not in self.models:
            raise ValueError(f"模型 {name} 未注册")

        model = self.models[name]['model']
        print(f"\n▶ 训练模型:{name}")

        # 记录训练开始时间
        start_time = datetime.now()
        model.fit(X_train, y_train)  # 核心:训练模型
        elapsed = (datetime.now() - start_time).total_seconds()

        # 保存训练结果
        self.trained_models[name] = model
        self.training_history[name] = {'training_time': elapsed}

        print(f"  ✓ 完成,耗时 {elapsed:.2f} 秒")
        return model

    def train_all_models(self, X_train, y_train):
        """
        训练所有已注册的模型
        单个模型训练失败不影响其他模型

        参数:
            X_train: 训练集特征
            y_train: 训练集标签

        返回:
            所有训练好的模型字典
        """
        print("\n" + "-" * 40)
        print("开始批量训练模型")
        print("-" * 40)

        for name in self.models.keys():
            try:
                self.train_single_model(name, X_train, y_train)
            except Exception as e:
                print(f"  ✗ 训练模型 {name} 时出错:{e}")

        return self.trained_models

    def evaluate_models(self, X_test, y_test):
        """
        评估所有训练好的模型

        参数:
            X_test: 测试集特征
            y_test: 测试集标签

        返回:
            results: 各模型的评估结果字典
        """
        print("\n" + "-" * 40)
        print("模型评估结果")
        print("-" * 40)

        results = {}
        for name, model in self.trained_models.items():
            problem_type = self.models[name]['problem_type']
            y_pred = model.predict(X_test)

            if problem_type == 'classification':
                accuracy = accuracy_score(y_test, y_pred)
                results[name] = {'accuracy': accuracy}
                print(f"\n【{name}】")
                print(f"  准确率: {accuracy:.4f}")
                print(f"  分类报告:")
                print(classification_report(y_test, y_pred, target_names=['未购买', '购买']))

            elif problem_type == 'regression':
                from sklearn.metrics import mean_squared_error
                mse = mean_squared_error(y_test, y_pred)
                rmse = np.sqrt(mse)
                results[name] = {'mse': mse, 'rmse': rmse}
                print(f"\n【{name}】")
                print(f"  MSE: {mse:.4f}, RMSE: {rmse:.4f}")

        return results

    def get_best_model(self, results, metric='accuracy', higher_is_better=True):
        """
        获取最佳模型(根据指定指标)

        参数:
            results: evaluate_models 返回的结果字典
            metric: 评估指标名称,如 'accuracy', 'rmse'
            higher_is_better: 指标是否越大越好(True: 选最大,False: 选最小)

        返回:
            best_name: 最佳模型名称
            best_model: 最佳模型实例
        """
        if not results:
            print("没有评估结果")
            return None, None

        # 根据指标方向选择最大值或最小值
        if higher_is_better:
            best_name = max(results.keys(), key=lambda x: results[x].get(metric, -float('inf')))
        else:
            best_name = min(results.keys(), key=lambda x: results[x].get(metric, float('inf')))

        best_score = results[best_name][metric]

        print("\n" + "=" * 50)
        print(f"🏆 最佳模型:{best_name}")
        print(f"   {metric} = {best_score:.4f}")
        print("=" * 50)

        return best_name, self.trained_models[best_name]


# =================================================================================
# 模块 4:模型评估器 (ModelEvaluator)
# 功能:详细评估模型性能,生成可视化图表
# =================================================================================
class ModelEvaluator:
    """
    模型评估器类
    提供更详细的评估指标和可视化功能
    """

    def __init__(self):
        self.evaluation_results = {}

    def evaluate_classification(self, y_true, y_pred, y_prob=None, model_name="Model"):
        """
        评估分类模型

        参数:
            y_true: 真实标签
            y_pred: 预测标签
            y_prob: 预测概率(用于 ROC 曲线)
            model_name: 模型名称

        返回:
            results: 评估结果字典
        """
        # 计算各项评估指标
        results = {
            'accuracy': accuracy_score(y_true, y_pred),  # 准确率
            'precision': precision_score(y_true, y_pred, average='binary', zero_division=0),  # 精确率
            'recall': recall_score(y_true, y_pred, average='binary', zero_division=0),  # 召回率
            'f1': f1_score(y_true, y_pred, average='binary', zero_division=0)  # F1分数
        }

        print(f"\n【{model_name} 分类评估】")
        print(f"  准确率 (Accuracy): {results['accuracy']:.4f}  - 预测正确的比例")
        print(f"  精确率 (Precision): {results['precision']:.4f} - 预测为正例中实际为正例的比例")
        print(f"  召回率 (Recall): {results['recall']:.4f}    - 实际正例中被正确预测的比例")
        print(f"  F1分数: {results['f1']:.4f}  - 精确率和召回率的调和平均")

        # 混淆矩阵
        cm = confusion_matrix(y_true, y_pred)
        print(f"\n  混淆矩阵:")
        print(f"    ┌─────────────┬─────────────┐")
        print(f"    │  TN={cm[0, 0]:<3}  │  FP={cm[0, 1]:<3}  │  预测为负 / 预测为正")
        print(f"    ├─────────────┼─────────────┤")
        print(f"    │  FN={cm[1, 0]:<3}  │  TP={cm[1, 1]:<3}  │")
        print(f"    └─────────────┴─────────────┘")
        print(f"     实际为负       实际为正")

        # ROC 曲线和 AUC(需要概率预测)
        if y_prob is not None:
            # 处理概率数组的格式
            if y_prob.ndim > 1:
                y_prob = y_prob[:, 1]  # 取正类的概率
            fpr, tpr, _ = roc_curve(y_true, y_prob)
            roc_auc = auc(fpr, tpr)
            results['roc_auc'] = roc_auc
            print(f"  AUC: {roc_auc:.4f} - ROC曲线下面积")

            # 绘制 ROC 曲线
            plt.figure(figsize=(8, 6))
            plt.plot(fpr, tpr, 'b-', lw=2, label=f'ROC 曲线 (AUC = {roc_auc:.3f})')
            plt.plot([0, 1], [0, 1], 'r--', lw=1, label='随机分类器')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('假正率 (False Positive Rate)')
            plt.ylabel('真正率 (True Positive Rate)')
            plt.title(f'{model_name} - ROC 曲线')
            plt.legend(loc="lower right")
            plt.grid(True, alpha=0.3)
            plt.show()

        self.evaluation_results[model_name] = results
        return results


# =================================================================================
# 模块 5:模型部署器 (ModelDeployer)
# 功能:保存/加载模型、创建预测服务、模型监控
# =================================================================================
class ModelDeployer:
    """
    模型部署器类
    负责将训练好的模型保存到文件,并创建可用于生产的预测服务
    """

    def __init__(self):
        self.deployed_models = {}  # 已部署的模型
        self.preprocessors = {}  # 对应的预处理器
        self.deployment_logs = []  # 部署日志

    def save_model(self, model, model_name, preprocessor=None, filepath=None):
        """
        保存模型(和预处理器)到文件

        参数:
            model: 训练好的模型
            model_name: 模型名称
            preprocessor: 对应的预处理器(用于生产环境的数据预处理)
            filepath: 保存路径,默认为 f"{model_name}.pkl"

        返回:
            保存的文件路径
        """
        if filepath is None:
            filepath = f"{model_name}.pkl"

        # 将模型和预处理器打包在一起
        package = {
            'model': model,
            'preprocessor': preprocessor,
            'model_name': model_name,
            'saved_at': datetime.now().isoformat()
        }

        with open(filepath, 'wb') as f:
            pickle.dump(package, f)

        print(f"✓ 模型 {model_name} 已保存到 {filepath}")

        # 记录日志
        self.deployment_logs.append({
            'timestamp': datetime.now().isoformat(),
            'action': 'save_model',
            'model_name': model_name,
            'filepath': filepath
        })

        return filepath

    def load_model(self, filepath):
        """
        从文件加载模型和预处理器

        参数:
            filepath: 模型文件路径

        返回:
            model: 加载的模型
            preprocessor: 加载的预处理器
        """
        with open(filepath, 'rb') as f:
            package = pickle.load(f)

        model = package['model']
        model_name = package.get('model_name', 'unknown')
        preprocessor = package.get('preprocessor')

        self.deployed_models[model_name] = model
        if preprocessor:
            self.preprocessors[model_name] = preprocessor

        print(f"✓ 模型 {model_name} 已从 {filepath} 加载")

        self.deployment_logs.append({
            'timestamp': datetime.now().isoformat(),
            'action': 'load_model',
            'model_name': model_name,
            'filepath': filepath
        })

        return model, preprocessor

    def create_prediction_service(self, model_name, preprocessor=None):
        """
        创建预测服务(返回一个可调用的函数)

        参数:
            model_name: 模型名称 或 模型文件路径
            preprocessor: 预处理器(可选,如果不提供则使用已加载的)

        返回:
            predict_service: 预测函数,接收数据返回预测结果
        """
        # 如果传入的是文件路径,先加载
        if model_name not in self.deployed_models:
            import os
            if os.path.exists(model_name):
                model, preprocessor = self.load_model(model_name)
                model_name = model_name.replace('.pkl', '')
            else:
                raise ValueError(f"模型 {model_name} 未部署")

        model = self.deployed_models[model_name]
        preprocessor = preprocessor or self.preprocessors.get(model_name)

        def predict_service(input_data):
            """
            预测服务内部函数

            参数:
                input_data: 原始数据(DataFrame、dict 或 list)

            返回:
                dict: 包含预测结果、概率和状态信息
            """
            try:
                # 1. 转换为 DataFrame
                if not isinstance(input_data, pd.DataFrame):
                    input_data = pd.DataFrame(input_data)

                # 2. 数据预处理(如果提供了预处理器)
                if preprocessor is not None:
                    if hasattr(preprocessor, 'transform_new_data'):
                        input_data = preprocessor.transform_new_data(input_data)
                    elif hasattr(preprocessor, 'transform'):
                        input_data = preprocessor.transform(input_data)

                # 3. 预测
                prediction = model.predict(input_data)

                # 4. 预测概率(分类模型特有)
                probability = None
                if hasattr(model, 'predict_proba'):
                    probability = model.predict_proba(input_data)

                # 5. 返回结果
                return {
                    'prediction': prediction.tolist(),
                    'probability': probability.tolist() if probability is not None else None,
                    'status': 'success',
                    'timestamp': datetime.now().isoformat()
                }
            except Exception as e:
                return {
                    'error': str(e),
                    'status': 'error',
                    'timestamp': datetime.now().isoformat()
                }

        return predict_service


# =================================================================================
# 模块 6:主流程控制器 (MLProjectPipeline)
# 功能:串联所有模块,提供一键运行的能力
# =================================================================================
class MLProjectPipeline:
    """
    机器学习项目流水线主控制器
    使用门面模式,将复杂的子模块操作封装为简单的一键运行接口
    """

    def __init__(self):
        """初始化所有子模块"""
        self.data_collector = DataCollector(seed=42)  # 数据收集器
        self.data_preparer = None  # 数据准备器
        self.model_trainer = ModelTrainer()  # 模型训练器
        self.model_evaluator = ModelEvaluator()  # 模型评估器
        self.model_deployer = ModelDeployer()  # 模型部署器

    def run_complete_pipeline(self, target_column='purchased', test_size=0.2, val_size=0.2):
        """
        运行完整的机器学习项目流程

        参数:
            target_column: 目标列名称(标签列)
            test_size: 测试集比例
            val_size: 验证集比例

        返回:
            dict: 包含所有中间结果和最终模型
        """
        print("\n" + "=" * 60)
        print("🚀 机器学习项目完整流程")
        print("=" * 60)

        # ==================== 第1步:数据收集 ====================
        print("\n📁 第1步:数据收集")
        print("-" * 40)

        # 收集用户数据(1000条)
        user_data = self.data_collector.collect_user_data(n_users=1000)
        # 收集行为数据(5000条)
        behavior_data = self.data_collector.collect_behavior_data(n_behaviors=5000, n_users=1000)

        # 合并数据:以内连接方式关联用户和行为
        merged_data = pd.merge(user_data, behavior_data, on='user_id', how='inner')


        # 创建目标变量:是否有购买行为(1=购买,0=未购买)
        merged_data['purchased'] = (merged_data['behavior_type'] == '购买').astype(int)
        print(merged_data.to_string())

        print(f"\n✓ 合并后数据: {len(merged_data)} 条记录")
        print(f"  购买比例: {merged_data['purchased'].mean():.2%}")  # 查看正负样本平衡情况
        # ==================== 第2步:数据准备 ====================
        print("\n🔧 第2步:数据准备")
        print("-" * 40)

        # 选择用于模型训练的特征列
        feature_columns = ['age', 'gender', 'city', 'duration']
        data_for_ml = merged_data[feature_columns + ['purchased']].copy()
        # 在准备数据之前,先保存原始特征和目标
        X_full = data_for_ml[feature_columns].copy()
        y_full = data_for_ml['purchased'].copy()

        # 划分数据(使用相同的 random_state 保证一致性)
        from sklearn.model_selection import train_test_split
        X_temp, X_test_raw, y_temp, y_test_raw = train_test_split(
            X_full, y_full, test_size=test_size, random_state=42, stratify=y_full
        )
        val_ratio = val_size / (1 - test_size)
        X_train_raw, X_val_raw, y_train, y_val = train_test_split(
            X_temp, y_temp, test_size=val_ratio, random_state=42, stratify=y_temp
        )
        # 初始化数据准备器
        self.data_preparer = DataPreparer(data_for_ml)
        # 执行数据准备(清洗、特征工程、转换、划分)
        splits = self.data_preparer.prepare_data(
            target_column=target_column,
            test_size=test_size,
            val_size=val_size,
            random_state=42
        )

        # 将原始数据添加到 splits 中
        splits['X_train_raw'] = X_train_raw
        splits['X_val_raw'] = X_val_raw
        splits['X_test_raw'] = X_test_raw


        # ==================== 第3步:模型训练 ====================
        print("\n🤖 第3步:模型训练")
        print("-" * 40)

        # 注册要使用的模型
        self.model_trainer.register_model(
            '逻辑回归',
            LogisticRegression(random_state=42, max_iter=1000),
            'classification'
        )
        # 随机森林既可以做分类,也可以做回归。
        self.model_trainer.register_model(
            '随机森林',
            RandomForestClassifier(n_estimators=100, random_state=42),
            'classification'
        )

        # 批量训练所有注册的模型
        trained_models = self.model_trainer.train_all_models(
            splits['X_train'], splits['y_train']
        )

        # ==================== 第4步:模型评估 ====================
        print("\n📊 第4步:模型评估")
        print("-" * 40)

        # 在测试集上评估所有模型
        results = self.model_trainer.evaluate_models(splits['X_test'], splits['y_test'])

        # 选出最佳模型(基于准确率)
        best_name, best_model = self.model_trainer.get_best_model(
            results, metric='accuracy', higher_is_better=True
        )

        # 使用 ModelEvaluator 进行更详细的评估
        if best_model:
            y_pred = best_model.predict(splits['X_test'])
            y_prob = best_model.predict_proba(splits['X_test']) if hasattr(best_model, 'predict_proba') else None
            self.model_evaluator.evaluate_classification(
                splits['y_test'], y_pred, y_prob, best_name
            )

        # ==================== 第5步:模型部署 ====================
        print("\n📦 第5步:模型部署")
        print("-" * 40)

        # 保存最佳模型和预处理器
        model_path = self.model_deployer.save_model(
            best_model,
            best_name,
            preprocessor=self.data_preparer
        )

        # 创建预测服务(可用于生产环境)
        prediction_service = self.model_deployer.create_prediction_service(model_path)

        # 测试预测服务
        print("\n🧪 测试预测服务...")
        # test_input = splits['X_test'].head(3)  # 取测试集前3条
        # print("\n测试输入:")
        # print(test_input)
        raw_test_input = splits['X_test_raw'].head(3)
        print("\n测试输入(原始数据 - 包含原始列名):")
        print(raw_test_input)

        # prediction_result = prediction_service(test_input)
        prediction_result = prediction_service(raw_test_input)
        print("\n预测结果:")
        print(json.dumps(prediction_result, indent=2, ensure_ascii=False))

        # ==================== 完成 ====================
        print("\n" + "=" * 60)
        print("✅ 机器学习项目流程完成!")
        print("=" * 60)

        # 返回所有结果,供后续分析使用
        return {
            'data': data_for_ml,  # 原始数据
            'splits': splits,  # 数据划分结果
            'best_model': best_model,  # 最佳模型
            'best_model_name': best_name,  # 最佳模型名称
            'evaluation_results': results,  # 评估结果
            'prediction_service': prediction_service  # 预测服务函数
        }


# =================================================================================
# 程序入口
# =================================================================================
if __name__ == "__main__":
    """
    当脚本被直接运行时(而不是作为模块导入),执行以下代码
    """
    # 创建流水线实例
    pipeline = MLProjectPipeline()

    # 运行完整流程(指定目标列名为 'purchased')
    project_results = pipeline.run_complete_pipeline(target_column='purchased')

    # # 打印最终结果摘要
    # if project_results:
    #     print("\n" + "=" * 60)
    #     print("📋 最终结果摘要")
    #     print("=" * 60)
    #     print(f"  最佳模型: {project_results['best_model_name']}")
    #     print(f"  评估结果: {project_results['evaluation_results']}")
    #     print(f"  训练集大小: {project_results['splits']['X_train'].shape}")
    #     print(f"  测试集大小: {project_results['splits']['X_test'].shape}")
    #
    #     # 可选:保存结果到文件
    #     import joblib
    #     joblib.dump(project_results['best_model'], 'final_model.pkl')
    #     print("\n✓ 最终模型已保存到 final_model.pkl")

产生下面问题的原因,我预测使用了预处理的数据

print("\n🧪 测试预测服务...")
test_input = splits['X_test'].head(3)  # 取测试集前3条
print("\n测试输入:")
print(test_input)

结果报错

{
  "error": "columns are missing: {'city', 'gender'}",
  "status": "error",
  "timestamp": "2026-05-19T11:18:05.136684"
}

为什么训练模型使用X_train ,预测时使用的X_test_raw

这是一个非常好的问题!它触及了机器学习流程中的一个核心概念:训练用的数据和预测用的数据格式必须一致

让我详细解释为什么会出现 X_train 和 X_train_raw 这两个东西。


一、问题核心:数据格式不同

数据集 内容 列名示例 用途
X_train 预处理后的数据 agegender_男city_北京duration 训练模型
X_test_raw 原始未处理的数据 agegendercityduration 模拟真实世界的原始输入

具体例子

原始数据 (X_test_raw)

   age gender city  duration
0   25     男   北京       120
1   30     女   上海       300
2   35     男   广州       450

预处理后的数据 (X_train)

   age  gender_男  city_北京  city_上海  city_广州  duration
0   25         1         1         0         0       120
1   30         0         0         1         0       300
2   35         1         0         0         1       450

二、为什么训练和预测要用不同格式的数据?

训练阶段:使用 X_train

模型学习的是 "预处理后的特征" 与 "标签" 之间的关系。

# 模型看到的是这样的数据
X_train (预处理后) → y_train (标签)
[25, 1, 1, 0, 0, 120] → 1 (购买)
[30, 0, 0, 1, 0, 300] → 0 (未购买)

预测阶段:应该使用 X_test_raw(或类似格式)

在真实世界中,新数据到来时是原始格式

# 新用户数据(原始格式)
new_user = {
    'age': 28,
    'gender': '女',
    'city': '深圳',
    'duration': 200
}

关键问题:模型不认识 '女' 和 '深圳',它只认识 0/1 数字!

所以预测服务的正确流程是:

原始数据 → 预处理器.transform() → 模型.predict()

三、错误原因回顾

你的错误代码:

# ❌ 错误:直接用预处理后的数据测试
test_input = splits['X_test'].head(3)  # 已经是 [25,1,1,0,...] 格式
prediction_result = prediction_service(test_input)

预测服务内部会再次尝试预处理:

def predict_service(input_data):
    # 尝试找 'gender' 列
    input_data['gender']  # ❌ 报错!因为 X_test 中已经没有 'gender' 列了

X_test 已经是处理后的数据,没有原始列名,所以预处理步骤失败了。


四、正确的流程对比

训练阶段流程

原始数据 (X_train_raw)
    │
    ▼
DataPreparer.transform()  # 预处理(编码、标准化)
    │
    ▼
处理后的数据 (X_train)
    │
    ▼
model.fit(X_train, y_train)  # 训练

预测阶段流程(正确)

新原始数据 (new_data_raw)   ← 这就是为什么需要 X_train_raw 格式
    │
    ▼
预处理器.transform()        ← 使用训练时拟合的预处理器
    │
    ▼
处理后的数据
    │
    ▼
model.predict()             ← 预测

五、为什么不直接用 X_train_raw 训练?

# 如果直接用原始数据训练
model.fit(X_train_raw, y_train)  # ❌ 不行!

原因

问题 说明
类别变量无法计算 模型无法处理 '男''女' 这样的字符串
数值范围不一致 age(0-100) 和 duration(0-1000) 尺度不同,会影响模型
无法学习类别关系 模型不理解 '北京' 和 '上海' 是不同类别

所以必须先预处理,再训练。


六、总结:两者关系

对比项 X_train X_test_raw
数据格式 数值(0/1、标准化值) 原始(字符串、数值混合)
列数 更多(One-Hot 后列数增加) 较少
用途 训练模型 模拟真实输入、验证预处理器
是否需要预处理 否(已经是预处理后) 是(需要预处理器转换)
模型能直接使用吗 ✅ 能 ❌ 不能

七、你的代码应该如何修复?

# 在 prepare_data 方法中,同时返回原始数据
splits = {
    # 预处理后的数据(用于训练和评估)
    'X_train': X_train_processed,
    'X_val': X_val_processed,
    'X_test': X_test_processed,
    
    # ✅ 原始数据(用于模拟真实预测)
    'X_train_raw': X_train_raw,
    'X_val_raw': X_val_raw,
    'X_test_raw': X_test_raw,
    
    'y_train': y_train,
    'y_val': y_val,
    'y_test': y_test
}



# 测试预测服务时,使用原始数据
test_input = splits['X_test_raw'].head(3)  # ✅ 使用原始格式
prediction_result = prediction_service(test_input)

一句话总结

模型训练用的是"处理后的数字",但真实世界的输入是"原始数据"。预处理器的作用就是在这两者之间做转换。所以测试预测服务时,要用原始数据来验证整个流程是否正确。

是否同样原因,保存模型的时候 同时保存预处理器?

是的,完全正确! 你理解到了机器学习工程化的核心要点。

预处理器保存了什么?

组件 保存的内容 用途
OneHotEncoder 类别和编码的映射关系 知道 '女' 应该变成 [0,1]'深圳' 应该变成 [0,0,0,1]
StandardScaler 均值和标准差 知道 age=28 应该减去多少、除以多少
特征列名 哪些列需要编码/标准化 知道新数据应该按什么顺序处理
LabelEncoder 标签映射(如有) 知道预测结果 1 对应什么业务含义

如果没有这些信息,预处理器无法正确转换新数据

完整的数据流图

┌─────────────────────────────────────────────────────────────────────────┐
│                           训练阶段                                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  原始数据 ──┬──► X_train_raw ──► 预处理器.fit_transform() ──► X_train   │
│            │                              │                            │
│            │                              ▼                            │
│            │                    保存预处理器参数                        │
│            │                    (类别映射、均值、标准差)                 │
│            │                                                            │
│            └──► y_train ──────────────────────────────────────────────►│
│                                                                         │
│                                           ▼                             │
│                                    model.fit(X_train, y_train)          │
│                                           │                             │
│                                           ▼                             │
│                                  保存 model + 预处理器                   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────────┐
│                          生产预测阶段                                    │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  加载 ──► model + 预处理器                                              │
│                                                                         │
│  新原始数据 ──► 预处理器.transform() ──► 处理后的数据 ──► model.predict()│
│  (包含 'gender', 'city')          (数值)                    (预测结果)   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

总结

问题 答案
为什么训练和预测数据格式不同? 模型只认识数字,但现实世界是字符串+数字混合
谁负责格式转换? 预处理器(Preprocessor)
预处理器需要保存吗? 必须保存,否则无法转换新数据
你代码中的设计正确吗? ✅ 正确,ModelDeployer 同时保存了模型和预处理器
你的错误原因? 测试时用了处理后的数据,而不是原始数据

一句话总结

模型只懂"数字",真实世界是"人话"。预处理器是翻译官,必须和模型一起保存,否则翻译官丢了,模型就变成了"哑巴"。你的错误正是因为没有用"人话"(原始数据)去问模型。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐