目录

第9章 工程实现与部署基础

9.1 数值计算与矩阵运算库

9.1.1 NumPy底层与向量化

9.1.1.1 广播机制的步长(Strides)操纵与内存视图

9.1.1.2 BLAS/LAPACK接口:矩阵乘法的Strassen算法与CPU缓存优化

9.1.1.3 稀疏矩阵格式(CSR/CSC/COO)的运算优化与图算法应用

9.1.2 自动微分框架原理

9.1.2.1 前向模式(Forward Mode)与对偶数(Dual Numbers)的JVP计算

9.1.2.2 反向模式(Reverse Mode)的磁带(Tape)与拓扑排序实现

9.1.2.3 混合模式微分(Hessian-vector products)的检查点(Checkpointing)技术

9.2 数据工程与特征处理

9.2.1 特征工程方法论

9.2.1.1 特征缩放:标准化(Z-score)vs归一化(Min-Max)的分布影响

9.2.1.2 多项式特征交互与SVD降维的维数灾难规避

9.2.1.3 目标编码(Target Encoding)与CatBoost顺序统计的防泄漏(leak-proof)实现

9.2.2.2 Isolation Forest的递归随机划分与异常分数计算

9.2.2.3 稳健统计:Huber损失与RANSAC的随机采样一致性实现

9.3 模型序列化与部署初步

9.3.1 模型持久化

9.3.1.1 Pickle协议版本与自定义对象的安全序列化(joblib优化)

9.3.1.3 模型版本控制:MLflow与DVC的实验追踪与血缘管理

9.3.2 推理优化基础

9.3.2.1 量化感知训练(QAT)与后训练量化(PTQ)的线性映射实现

9.3.2.2 知识蒸馏:软标签(Soft Targets)与温度参数的温度退火

9.3.2.3 ONNX Runtime与TensorRT的图优化与算子融合


第9章 工程实现与部署基础

9.1 数值计算与矩阵运算库

9.1.1 NumPy底层与向量化

9.1.1.1 广播机制的步长(Strides)操纵与内存视图

NumPy的广播机制并非简单的维度扩展操作,其底层依赖于步长(Strides)的元数据操纵与内存视图的零拷贝技术。Strides定义了在多维数组中沿每个维度移动时所需的字节偏移量,这种元数据层面的操纵允许不同形状的数组在算术运算中共享底层内存而无需物理复制数据。当两个数组进行广播时,NumPy会构建一个虚拟的维度映射,其中缺失维度的步长被设为0,使得在遍历过程中该维度被重复读取而指针不发生实际位移。内存视图(Memory View)作为Python缓冲协议的核心实现,提供了对同一块内存的多维索引能力,通过修改shape与strides属性即可创建具有不同逻辑结构的数组视图,这种技术在高维张量切片、滑动窗口操作及图像卷积中展现出显著的内存效率优势。以下代码展示了Strides操纵在滑动窗口与广播运算中的底层实现机制。

Python

"""
【脚本说明】
本脚本演示NumPy的广播机制、步长(strides)操纵与内存视图(Memory View)的高级用法。
内容涵盖:
1. NumPy数组内存布局与strides的内部机制
2. 通过strides实现高效的滑动窗口操作
3. 内存视图的创建与零拷贝操作
4. 广播规则的底层实现原理
5. 大规模数据的高效切片与重组

使用方式:
- 直接运行查看各项演示的文本输出
- 修改数组尺寸参数观察不同规模下的性能差异
- 适用于图像处理、时间序列分析等需要高效内存操作的场景
"""

import numpy as np
import time

class NumPyStrideExplorer:
    """探索NumPy的strides和内存布局机制"""
    
    def __init__(self):
        self.results = {}
    
    def demonstrate_memory_layout(self):
        """演示不同内存布局对性能的影响"""
        size = 5000
        
        # C-order (Row-major) vs F-order (Column-major)
        c_array = np.random.rand(size, size)
        f_array = np.asfortranarray(c_array)
        
        iterations = 10
        
        # C-order行访问 (连续内存)
        start = time.perf_counter()
        for _ in range(iterations):
            row_sum_c = np.sum(c_array[0, :])
        c_row_time = (time.perf_counter() - start) / iterations
        
        # C-order列访问 (跨步访问)
        start = time.perf_counter()
        for _ in range(iterations):
            col_sum_c = np.sum(c_array[:, 0])
        c_col_time = (time.perf_counter() - start) / iterations
        
        # F-order列访问 (连续内存)
        start = time.perf_counter()
        for _ in range(iterations):
            col_sum_f = np.sum(f_array[:, 0])
        f_col_time = (time.perf_counter() - start) / iterations
        
        return {
            'c_row': c_row_time,
            'c_col': c_col_time,
            'f_col': f_col_time,
            'strides_c': c_array.strides,
            'strides_f': f_array.strides
        }
    
    def sliding_window_strides(self, arr: np.ndarray, window_size: int) -> np.ndarray:
        """
        使用strides实现高效的滑动窗口操作,无需复制数据
        
        原理:通过操纵shape和strides,创建具有重叠内存的视图
        """
        shape = (arr.shape[0] - window_size + 1, window_size) + arr.shape[1:]
        strides = (arr.strides[0],) + arr.strides
        
        # 创建内存视图
        windowed = np.lib.stride_tricks.as_strided(
            arr, shape=shape, strides=strides, writeable=False
        )
        return windowed
    
    def broadcast_mechanism_analysis(self):
        """分析广播机制的内存访问模式"""
        a = np.array([1, 2, 3, 4, 5])
        b = np.array([[1], [2], [3]])
        
        a_broadcasted = np.broadcast_to(a, (3, 5))
        b_broadcasted = np.broadcast_to(b, (3, 5))
        result = a + b
        
        return {
            'a_shape': a.shape,
            'a_strides': a.strides,
            'b_shape': b.shape,
            'b_strides': b.strides,
            'result_shape': result.shape,
            'result_strides': result.strides,
            'broadcasted_a_strides': a_broadcasted.strides,
            'broadcasted_b_strides': b_broadcasted.strides
        }
    
    def memory_view_operations(self):
        """演示内存视图的零拷贝操作"""
        data = np.arange(24).reshape(4, 6)
        transposed_view = data.T
        sliced_view = data[::2, ::2]
        reshaped_view = data.reshape(2, 12)
        
        views_share_memory = np.shares_memory(data, transposed_view)
        slice_shares_memory = np.shares_memory(data, sliced_view)
        
        return {
            'original': data,
            'transposed': transposed_view,
            'sliced': sliced_view,
            'reshaped': reshaped_view,
            'shares_memory_transpose': views_share_memory,
            'shares_memory_slice': slice_shares_memory,
            'original_strides': data.strides,
            'transposed_strides': transposed_view.strides,
            'sliced_strides': sliced_view.strides
        }
    
    def advanced_stride_manipulation(self):
        """高级的步长操纵:图像卷积的内存高效实现"""
        image = np.random.rand(64, 64, 3).astype(np.float32)
        kernel_size = 3
        stride = 1
        
        out_h = (image.shape[0] - kernel_size) // stride + 1
        out_w = (image.shape[1] - kernel_size) // stride + 1
        
        shape = (out_h, out_w, kernel_size, kernel_size, image.shape[2])
        strides = (
            image.strides[0] * stride,
            image.strides[1] * stride,
            image.strides[0],
            image.strides[1],
            image.strides[2]
        )
        
        windows = np.lib.stride_tricks.as_strided(
            image, shape=shape, strides=strides, writeable=False
        )
        
        return {
            'original_shape': image.shape,
            'windows_shape': windows.shape,
            'windows_nbytes': windows.nbytes,
            'original_nbytes': image.nbytes,
            'memory_efficiency': (windows.size * windows.itemsize) / image.nbytes
        }

if __name__ == "__main__":
    explorer = NumPyStrideExplorer()
    
    print("[1] 内存布局与性能分析")
    layout_results = explorer.demonstrate_memory_layout()
    print(f"C-order strides: {layout_results['strides_c']}")
    print(f"F-order strides: {layout_results['strides_f']}")
    print(f"C-order行访问时间: {layout_results['c_row']*1000:.3f} ms")
    print(f"C-order列访问时间: {layout_results['c_col']*1000:.3f} ms")
    print(f"F-order列访问时间: {layout_results['f_col']*1000:.3f} ms")
    
    print("\n[2] 广播机制底层分析")
    broadcast_results = explorer.broadcast_mechanism_analysis()
    print(f"数组a shape: {broadcast_results['a_shape']}, strides: {broadcast_results['a_strides']}")
    print(f"数组b shape: {broadcast_results['b_shape']}, strides: {broadcast_results['b_strides']}")
    print(f"广播后a strides: {broadcast_results['broadcasted_a_strides']} (虚拟步长为0)")
    print(f"广播后b strides: {broadcast_results['broadcasted_b_strides']} (虚拟步长为0)")
    
    print("\n[3] 内存视图操作")
    view_results = explorer.memory_view_operations()
    print(f"转置视图与原数组共享内存: {view_results['shares_memory_transpose']}")
    print(f"切片视图与原数组共享内存: {view_results['shares_memory_slice']}")
    print(f"原数组strides: {view_results['original_strides']}")
    print(f"转置后strides: {view_results['transposed_strides']}")
    print(f"切片后strides: {view_results['sliced_strides']}")
    
    print("\n[4] 高级步长操纵:卷积滑动窗口")
    conv_results = explorer.advanced_stride_manipulation()
    print(f"原始图像shape: {conv_results['original_shape']}")
    print(f"滑动窗口shape: {conv_results['windows_shape']}")
    print(f"原始数据内存: {conv_results['original_nbytes']} bytes")
    print(f"窗口视图逻辑大小(零拷贝,无额外分配): {conv_results['windows_nbytes']} bytes")
    print(f"相比物理复制节省的内存倍数: {conv_results['memory_efficiency']:.1f}x")
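作为上述手动strides操纵的补充,较新版本的NumPy(1.20及以上)提供了numpy.lib.stride_tricks.sliding_window_view,以只读视图的方式实现同样的滑动窗口语义,避免了手写strides越界的风险。下面给出一个最小示例:

```python
import numpy as np

# sliding_window_view(NumPy >= 1.20)返回零拷贝的只读视图
arr = np.arange(10)
windows = np.lib.stride_tricks.sliding_window_view(arr, window_shape=4)

print(windows.shape)                   # (7, 4):共7个长度为4的窗口
print(np.shares_memory(arr, windows))  # True:与原数组共享内存

# 滑动窗口均值:等价于窗口长度为4的移动平均
moving_avg = windows.mean(axis=1)
print(moving_avg[:3])                  # [1.5 2.5 3.5]
```

相比as_strided,该接口会自动校验窗口形状的合法性,适合作为生产代码中的默认选择。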

9.1.1.2 BLAS/LAPACK接口:矩阵乘法的Strassen算法与CPU缓存优化

矩阵乘法的计算效率在很大程度上取决于算法对CPU缓存层次结构的利用程度。Strassen算法通过将矩阵分块并把每层递归的乘法次数从8次降至7次,实现了约O(n^2.807)、低于传统O(n³)的渐近复杂度;尽管其常数因子较大且数值稳定性略差,但在大规模矩阵运算中仍可展现优势。该算法的递归特性与缓存分块策略天然契合,通过将矩阵划分为适合L1/L2缓存的子块,减少了主存访问次数。BLAS(Basic Linear Algebra Subprograms)接口提供了标准化的矩阵运算原语,其中GEMM(General Matrix Multiply)操作经过高度优化,利用向量化指令集与缓存预取技术。现代实现通常在递归的顶层使用Strassen分块,当子块规模降至阈值以下时切换为标准BLAS调用,以平衡理论复杂度与实际常数因子。以下代码实现了Strassen算法及分块矩阵乘法的缓存优化版本。

Python

"""
【脚本说明】
本脚本实现Strassen矩阵乘法算法与CPU缓存优化技术,包含:
1. 标准三重循环乘法、NumPy dot与BLAS dgemm的性能对比
2. Strassen算法的递归实现与阈值截断
3. 分块矩阵乘法(Block Matrix Multiplication)的缓存优化
4. 缓存块大小对性能影响的基准测试

使用方式:
- 运行脚本将自动执行正确性验证与性能基准测试
- 调整矩阵大小观察不同规模下的算法表现
- 需要scipy库用于BLAS接口调用
"""

import numpy as np
import time
import scipy.linalg.blas as blas

class MatrixMultiplicationOptimizer:
    """矩阵乘法优化器:实现Strassen算法与缓存优化"""
    
    def __init__(self, threshold: int = 64):
        self.threshold = threshold
        self.performance_logs = []
    
    def standard_multiply(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
        """标准矩阵乘法 O(n^3)"""
        n = A.shape[0]
        C = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                for k in range(n):
                    C[i, j] += A[i, k] * B[k, j]
        return C
    
    def blas_multiply(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
        """调用BLAS库进行矩阵乘法"""
        return blas.dgemm(alpha=1.0, a=A, b=B)
    
    def strassen_multiply(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
        """
        Strassen矩阵乘法算法 O(n^2.807)
        通过减少乘法次数(7次而非8次)来提升渐近复杂度
        """
        n = A.shape[0]
        
        if n <= self.threshold:
            return np.dot(A, B)
        
        if n % 2 != 0:
            A_padded = np.pad(A, ((0, 1), (0, 1)), mode='constant')
            B_padded = np.pad(B, ((0, 1), (0, 1)), mode='constant')
            C_padded = self._strassen_recursive(A_padded, B_padded)
            return C_padded[:n, :n]
        
        return self._strassen_recursive(A, B)
    
    def _strassen_recursive(self, A: np.ndarray, B: np.ndarray) -> np.ndarray:
        """Strassen递归实现:遇到奇数维度时在本层补零对齐后再递归"""
        n = A.shape[0]
        
        if n <= self.threshold:
            return np.dot(A, B)
        
        if n % 2 != 0:
            A = np.pad(A, ((0, 1), (0, 1)), mode='constant')
            B = np.pad(B, ((0, 1), (0, 1)), mode='constant')
            return self._strassen_recursive(A, B)[:n, :n]
        
        mid = n // 2
        A11 = A[:mid, :mid]
        A12 = A[:mid, mid:]
        A21 = A[mid:, :mid]
        A22 = A[mid:, mid:]
        
        B11 = B[:mid, :mid]
        B12 = B[:mid, mid:]
        B21 = B[mid:, :mid]
        B22 = B[mid:, mid:]
        
        M1 = self._strassen_recursive(A11 + A22, B11 + B22)
        M2 = self._strassen_recursive(A21 + A22, B11)
        M3 = self._strassen_recursive(A11, B12 - B22)
        M4 = self._strassen_recursive(A22, B21 - B11)
        M5 = self._strassen_recursive(A11 + A12, B22)
        M6 = self._strassen_recursive(A21 - A11, B11 + B12)
        M7 = self._strassen_recursive(A12 - A22, B21 + B22)
        
        C11 = M1 + M4 - M5 + M7
        C12 = M3 + M5
        C21 = M2 + M4
        C22 = M1 - M2 + M3 + M6
        
        C = np.zeros((n, n))
        C[:mid, :mid] = C11
        C[:mid, mid:] = C12
        C[mid:, :mid] = C21
        C[mid:, mid:] = C22
        
        return C
    
    def block_matrix_multiply(self, A: np.ndarray, B: np.ndarray, block_size: int = 32) -> np.ndarray:
        """
        分块矩阵乘法优化:利用缓存层次结构
        通过将大矩阵分解为适合缓存的小块来提升性能
        """
        n = A.shape[0]
        C = np.zeros((n, n))
        
        for i0 in range(0, n, block_size):
            i1 = min(i0 + block_size, n)
            for j0 in range(0, n, block_size):
                j1 = min(j0 + block_size, n)
                for k0 in range(0, n, block_size):
                    k1 = min(k0 + block_size, n)
                    C[i0:i1, j0:j1] += np.dot(A[i0:i1, k0:k1], B[k0:k1, j0:j1])
        
        return C
    
    def benchmark_algorithms(self, sizes: list) -> dict:
        """基准测试不同矩阵乘法实现"""
        results = {
            'sizes': sizes,
            'numpy_dot': [],
            'blas_gemm': [],
            'strassen': [],
            'block_multiply': [],
            'standard': []
        }
        
        for size in sizes:
            print(f"\n测试矩阵大小: {size}x{size}")
            A = np.random.rand(size, size)
            B = np.random.rand(size, size)
            
            start = time.perf_counter()
            _ = np.dot(A, B)
            elapsed = time.perf_counter() - start
            results['numpy_dot'].append(elapsed)
            
            start = time.perf_counter()
            _ = blas.dgemm(alpha=1.0, a=A, b=B)
            elapsed = time.perf_counter() - start
            results['blas_gemm'].append(elapsed)
            
            start = time.perf_counter()
            _ = self.block_matrix_multiply(A, B, block_size=min(64, size//4))
            elapsed = time.perf_counter() - start
            results['block_multiply'].append(elapsed)
            
            if size >= 128 and size <= 2048:
                start = time.perf_counter()
                _ = self.strassen_multiply(A, B)
                elapsed = time.perf_counter() - start
                results['strassen'].append(elapsed)
            else:
                results['strassen'].append(None)
            
            # 纯Python三重循环极慢,仅在小规模矩阵上测试
            if size <= 128:
                start = time.perf_counter()
                _ = self.standard_multiply(A, B)
                elapsed = time.perf_counter() - start
                results['standard'].append(elapsed)
            else:
                results['standard'].append(None)
        
        return results

if __name__ == "__main__":
    optimizer = MatrixMultiplicationOptimizer(threshold=64)
    
    print("[1] 算法正确性验证")
    A = np.random.rand(256, 256)
    B = np.random.rand(256, 256)
    
    C_numpy = np.dot(A, B)
    C_strassen = optimizer.strassen_multiply(A, B)
    C_block = optimizer.block_matrix_multiply(A, B, block_size=64)
    
    error_strassen = np.max(np.abs(C_numpy - C_strassen))
    error_block = np.max(np.abs(C_numpy - C_block))
    
    print(f"Strassen算法最大误差: {error_strassen:.2e}")
    print(f"分块乘法最大误差: {error_block:.2e}")
    
    print("\n[2] 性能基准测试")
    test_sizes = [64, 128, 256, 512, 1024]
    results = optimizer.benchmark_algorithms(test_sizes)
    
    print("\n[3] 缓存块大小优化分析")
    matrix_size = 512
    A = np.random.rand(matrix_size, matrix_size)
    B = np.random.rand(matrix_size, matrix_size)
    
    block_sizes = [16, 32, 64, 128, 256]
    for bs in block_sizes:
        start = time.perf_counter()
        _ = optimizer.block_matrix_multiply(A, B, block_size=bs)
        elapsed = time.perf_counter() - start
        print(f"Block size {bs:3d}: {elapsed:.4f}s")
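为便于读者核对上文_strassen_recursive中M1~M7与C11~C22的组合关系,下面用一个独立的小例子在2×2矩阵(即每个"块"退化为标量)上数值验证Strassen七次乘法恒等式:

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.standard_normal((2, 2))
B = rng.standard_normal((2, 2))

# 将2x2矩阵的元素视为1x1的块
a, b, c, d = A[0, 0], A[0, 1], A[1, 0], A[1, 1]
e, f, g, h = B[0, 0], B[0, 1], B[1, 0], B[1, 1]

# Strassen的7次乘法
M1 = (a + d) * (e + h)
M2 = (c + d) * e
M3 = a * (f - h)
M4 = d * (g - e)
M5 = (a + b) * h
M6 = (c - a) * (e + f)
M7 = (b - d) * (g + h)

# 由M1~M7线性组合出结果的四个分量
C = np.array([
    [M1 + M4 - M5 + M7, M3 + M5],
    [M2 + M4,           M1 - M2 + M3 + M6],
])

# 与直接矩阵乘法对比,差异应在浮点舍入误差量级
print(np.max(np.abs(C - A @ B)))
```

同样的恒等式对任意大小的分块矩阵成立,这正是递归实现的正确性基础。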

9.1.1.3 稀疏矩阵格式(CSR/CSC/COO)的运算优化与图算法应用

稀疏矩阵存储格式的选择直接决定了线性代数运算的内存效率与计算吞吐量。CSR(Compressed Sparse Row)格式通过压缩行指针数组将非零元素的列索引与值分别存储,实现了O(1)的行访问复杂度,特别适用于稀疏矩阵-向量乘法(SpMV)中的行并行计算。CSC(Compressed Sparse Column)格式则优化了列切片操作,在最小二乘等列优先算法中表现优异。COO(Coordinate)格式以三元组形式存储非零元素,支持快速的稀疏矩阵构造与格式转换,但运算效率相对较低。在图算法应用中,稀疏矩阵的邻接表示与线性代数运算存在深度同构:PageRank迭代可表达为稀疏矩阵-向量乘法的幂方法,连通分量分析可通过稀疏矩阵的传递闭包实现,而广度优先搜索则可利用布尔掩码与SpMV的迭代应用模拟层级遍历。以下代码实现了三种格式的手动SpMV及基于稀疏矩阵的图算法。

Python

"""
【脚本说明】
本脚本实现稀疏矩阵的三种主要存储格式(CSR、CSC、COO)及其运算优化,包含:
1. CSR(Compressed Sparse Row)、CSC(Compressed Sparse Column)、COO(Coordinate)格式的内存占用对比
2. 稀疏矩阵-向量乘法(SpMV)的手动实现
3. 基于稀疏矩阵的图算法(PageRank、连通分量、Dijkstra、BFS)
4. 不同稀疏度下的压缩比分析

使用方式:
- 直接运行执行完整基准测试
- 调整稀疏度和矩阵大小观察不同场景下的性能
- 适用于图神经网络、推荐系统、网络分析等场景
"""

import numpy as np
import time
from scipy import sparse
from scipy.sparse import csr_matrix, csc_matrix, coo_matrix
from scipy.sparse.csgraph import connected_components
from typing import Tuple, List, Dict
import heapq

class SparseMatrixFormats:
    """稀疏矩阵格式实现与运算优化"""
    
    def __init__(self):
        self.format_stats = {}
    
    def create_sparse_matrix(self, n: int, density: float, format_type: str = 'csr'):
        """创建指定格式的稀疏矩阵"""
        np.random.seed(42)
        matrix = sparse.random(n, n, density=density, format=format_type, dtype=np.float64)
        if format_type == 'csr':
            matrix = matrix + matrix.T
        return matrix
    
    def spmv_csr_manual(self, data: np.ndarray, indices: np.ndarray, 
                       indptr: np.ndarray, x: np.ndarray) -> np.ndarray:
        """
        手动实现CSR格式的稀疏矩阵-向量乘法 (SpMV)
        
        CSR格式:
        - data: 非零元素值
        - indices: 非零元素的列索引
        - indptr: 行指针,indptr[i]表示第i行的起始位置
        """
        n_rows = len(indptr) - 1
        y = np.zeros(n_rows)
        
        for i in range(n_rows):
            row_start = indptr[i]
            row_end = indptr[i + 1]
            
            for j in range(row_start, row_end):
                y[i] += data[j] * x[indices[j]]
        
        return y
    
    def spmv_coo_manual(self, data: np.ndarray, row: np.ndarray, 
                       col: np.ndarray, x: np.ndarray, n_rows: int) -> np.ndarray:
        """
        手动实现COO格式的稀疏矩阵-向量乘法
        
        COO格式:
        - data: 非零元素值
        - row: 行索引
        - col: 列索引
        """
        y = np.zeros(n_rows)
        
        for i in range(len(data)):
            y[row[i]] += data[i] * x[col[i]]
        
        return y
    
    def memory_analysis(self, n: int, densities: List[float]) -> Dict:
        """分析不同稀疏度下的内存占用"""
        results = {'densities': densities, 'csr': [], 'csc': [], 'coo': [], 'dense': []}
        
        for density in densities:
            dense = np.random.rand(n, n)
            dense[dense > density] = 0
            
            csr = csr_matrix(dense)
            csc = csc_matrix(dense)
            coo = coo_matrix(dense)
            
            dense_mem = dense.nbytes
            csr_mem = csr.data.nbytes + csr.indices.nbytes + csr.indptr.nbytes
            csc_mem = csc.data.nbytes + csc.indices.nbytes + csc.indptr.nbytes
            coo_mem = coo.data.nbytes + coo.row.nbytes + coo.col.nbytes
            
            results['dense'].append(dense_mem / 1024 / 1024)
            results['csr'].append(csr_mem / 1024 / 1024)
            results['csc'].append(csc_mem / 1024 / 1024)
            results['coo'].append(coo_mem / 1024 / 1024)
        
        return results
    
    def pagerank_spmv(self, adjacency: csr_matrix, alpha: float = 0.85, 
                     max_iter: int = 100, tol: float = 1e-6) -> np.ndarray:
        """
        使用稀疏矩阵-向量乘法实现PageRank算法
        
        PageRank: PR(i) = (1-alpha)/N + alpha * sum(PR(j)/L(j) for j in inlinks(i))
        """
        n = adjacency.shape[0]
        
        # 节点j的出度为其所在行的非零元之和(行j表示j的出边,故沿axis=1求和)
        out_degrees = np.array(adjacency.sum(axis=1)).flatten()
        out_degrees[out_degrees == 0] = 1
        
        M = adjacency.T
        M = M.multiply(1.0 / out_degrees)
        M = M.tocsr()
        
        pr = np.ones(n) / n
        teleport = (1 - alpha) / n
        
        for iteration in range(max_iter):
            new_pr = alpha * M.dot(pr) + teleport
            
            delta = np.linalg.norm(new_pr - pr, ord=1)
            pr = new_pr
            
            if delta < tol:
                print(f"PageRank converged after {iteration + 1} iterations")
                break
        
        return pr
    
    def connected_components_analysis(self, adjacency: csr_matrix) -> Tuple[int, np.ndarray]:
        """
        使用稀疏矩阵进行连通分量分析
        返回连通分量数量和每个节点的标签
        """
        n_components, labels = connected_components(adjacency, directed=False, return_labels=True)
        return n_components, labels

class GraphAlgorithms:
    """基于稀疏矩阵的图算法实现"""
    
    @staticmethod
    def dijkstra_spmsv(adjacency: csr_matrix, start: int) -> np.ndarray:
        """
        使用稀疏矩阵-向量乘法风格的Dijkstra算法
        利用稀疏矩阵的行切片获取邻居
        """
        n = adjacency.shape[0]
        distances = np.full(n, np.inf)
        distances[start] = 0
        visited = np.zeros(n, dtype=bool)
        
        pq = [(0, start)]
        
        while pq:
            current_dist, u = heapq.heappop(pq)
            
            if visited[u]:
                continue
            visited[u] = True
            
            row = adjacency[u]
            neighbors = row.indices
            weights = row.data
            
            for v, weight in zip(neighbors, weights):
                if not visited[v]:
                    new_dist = current_dist + weight
                    if new_dist < distances[v]:
                        distances[v] = new_dist
                        heapq.heappush(pq, (new_dist, v))
        
        return distances
    
    @staticmethod
    def breadth_first_search_spmsv(adjacency: csr_matrix, start: int) -> np.ndarray:
        """
        使用稀疏矩阵-向量乘法的BFS实现
        利用布尔掩码和SpMV实现层级遍历
        """
        n = adjacency.shape[0]
        levels = np.full(n, -1, dtype=int)
        levels[start] = 0
        
        current_level = np.zeros(n, dtype=bool)
        current_level[start] = True
        
        level = 0
        while current_level.sum() > 0:
            level += 1
            next_level_mask = adjacency.T @ current_level.astype(float)
            next_level = (next_level_mask > 0) & (levels == -1)
            
            if not next_level.any():
                break
            
            levels[next_level] = level
            current_level = next_level
        
        return levels

if __name__ == "__main__":
    sparse_ops = SparseMatrixFormats()
    
    print("[1] 稀疏矩阵格式内存分析")
    n = 5000
    densities = [0.001, 0.01, 0.1]
    for density in densities:
        mem_results = sparse_ops.memory_analysis(n, [density])
        print(f"Density {density}:")
        print(f"  Dense: {mem_results['dense'][0]:.2f} MB")
        print(f"  CSR: {mem_results['csr'][0]:.2f} MB")
        print(f"  Compression ratio: {mem_results['dense'][0]/mem_results['csr'][0]:.1f}x")
    
    print("\n[2] PageRank算法实现 (基于SpMV)")
    n_pages = 1000
    density = 0.02
    adjacency = sparse_ops.create_sparse_matrix(n_pages, density, 'csr')
    
    start = time.perf_counter()
    pr = sparse_ops.pagerank_spmv(adjacency, alpha=0.85)
    elapsed = time.perf_counter() - start
    print(f"PageRank计算完成,耗时: {elapsed:.4f}s")
    print(f"Top 5 PageRank scores: {np.sort(pr)[-5:]}")
    
    print("\n[3] 连通分量分析")
    n_comp, labels = sparse_ops.connected_components_analysis(adjacency)
    print(f"发现 {n_comp} 个连通分量")
    unique, counts = np.unique(labels, return_counts=True)
    print(f"最大连通分量大小: {max(counts)} 个节点")
    
    print("\n[4] 图算法性能对比")
    n_small = 500
    adj_small = sparse_ops.create_sparse_matrix(n_small, 0.05, 'csr')
    adj_small.data = np.abs(adj_small.data) + 0.1
    
    graph_algo = GraphAlgorithms()
    
    start = time.perf_counter()
    distances = graph_algo.dijkstra_spmsv(adj_small, start=0)
    elapsed = time.perf_counter() - start
    print(f"Dijkstra算法 (SpMSpV风格): {elapsed:.4f}s")
    
    start = time.perf_counter()
    levels = graph_algo.breadth_first_search_spmsv(adj_small, start=0)
    elapsed = time.perf_counter() - start
    print(f"BFS层级遍历 (SpMV风格): {elapsed:.4f}s")
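为直观理解上文spmv_csr_manual所依赖的三个数组,下面用一个3×3的具体矩阵展示CSR格式中data、indices、indptr的实际取值:

```python
import numpy as np
from scipy.sparse import csr_matrix

# 目标矩阵:
# [[1 0 2]
#  [0 0 3]
#  [4 5 0]]
dense = np.array([[1, 0, 2],
                  [0, 0, 3],
                  [4, 5, 0]])
A = csr_matrix(dense)

print(A.data)     # [1 2 3 4 5]  按行顺序存储的非零值
print(A.indices)  # [0 2 2 0 1]  每个非零值对应的列索引
print(A.indptr)   # [0 2 3 5]    行i的非零元素位于data[indptr[i]:indptr[i+1]]

# O(1)定位第2行(下标从0起)的非零元素:列[0, 1],值[4, 5]
start, end = A.indptr[2], A.indptr[2 + 1]
print(A.indices[start:end], A.data[start:end])
```

可以看到行指针indptr只存n+1个整数,这正是CSR相比COO(需为每个非零元存一个行索引)更省内存、且支持O(1)行切片的原因。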

9.1.2 自动微分框架原理

9.1.2.1 前向模式(Forward Mode)与对偶数(Dual Numbers)的JVP计算

前向模式自动微分基于对偶数(Dual Numbers)代数结构,通过将每个实数扩展为包含原始值与方向导数分量的二元组,实现了计算过程中导数的同步传播。对偶数定义为x = v + εv',其中ε满足ε² = 0且ε ≠ 0,这种幂零性质使得多项式函数的泰勒展开自动截断至一阶项,从而精确计算出函数值与导数。雅可比-向量积(Jacobian-Vector Product, JVP)的计算通过将输入向量与方向向量捆绑为对偶数输入,前向传播过程中所有算术运算自动遵循对偶数代数规则,最终输出同时包含函数值与沿该方向的导数。前向模式的时间复杂度与原始函数计算同阶,但在多输入单输出场景下需多次前向传递才能获取完整梯度,因此更适用于输入维度较低或需要计算Jacobian向量积的场合。以下代码实现了基于对偶数的前向模式自动微分框架。

Python

"""
【脚本说明】
本脚本实现前向模式自动微分框架,基于对偶数(Dual Numbers)代数结构,包含:
1. 对偶数类的实现与运算符重载
2. 基本数学函数(sin, cos, exp, log)的对偶数扩展
3. 雅可比-向量积(JVP)的计算实现
4. 多层感知机(MLP)的前向模式微分
5. 与数值微分(有限差分)的精度对比

使用方式:
- 直接运行查看基本运算与神经网络梯度计算示例
- 修改网络结构或输入维度观察JVP计算性能
- 适用于需要高效计算Jacobian向量积的场景
"""

import numpy as np
from typing import Callable, List, Union

class DualNumber:
    """
    对偶数实现:x = v + εv',其中 ε² = 0
    通过运算符重载实现自动微分
    """
    
    def __init__(self, value: float, derivative: float = 0.0):
        self.value = value
        self.derivative = derivative
    
    def __repr__(self):
        return f"Dual({self.value}, {self.derivative})"
    
    def __add__(self, other):
        if isinstance(other, DualNumber):
            return DualNumber(self.value + other.value, 
                            self.derivative + other.derivative)
        else:
            return DualNumber(self.value + other, self.derivative)
    
    def __radd__(self, other):
        return self.__add__(other)
    
    def __sub__(self, other):
        if isinstance(other, DualNumber):
            return DualNumber(self.value - other.value, 
                            self.derivative - other.derivative)
        else:
            return DualNumber(self.value - other, self.derivative)
    
    def __rsub__(self, other):
        return DualNumber(other - self.value, -self.derivative)
    
    def __mul__(self, other):
        if isinstance(other, DualNumber):
            # (u + εu')(v + εv') = uv + ε(uv' + u'v)
            return DualNumber(self.value * other.value,
                            self.value * other.derivative + self.derivative * other.value)
        else:
            return DualNumber(self.value * other, self.derivative * other)
    
    def __rmul__(self, other):
        return self.__mul__(other)
    
    def __truediv__(self, other):
        if isinstance(other, DualNumber):
            # (u + εu')/(v + εv') = (u/v) + ε((u'v - uv')/v²)
            return DualNumber(self.value / other.value,
                            (self.derivative * other.value - self.value * other.derivative) / 
                            (other.value ** 2))
        else:
            return DualNumber(self.value / other, self.derivative / other)
    
    def __pow__(self, power):
        # (u + εu')^n = u^n + ε(n*u^(n-1)*u')
        return DualNumber(self.value ** power,
                        power * self.value ** (power - 1) * self.derivative)

def sin(x):
    if isinstance(x, DualNumber):
        return DualNumber(np.sin(x.value), np.cos(x.value) * x.derivative)
    return np.sin(x)

def cos(x):
    if isinstance(x, DualNumber):
        return DualNumber(np.cos(x.value), -np.sin(x.value) * x.derivative)
    return np.cos(x)

def exp(x):
    if isinstance(x, DualNumber):
        val = np.exp(x.value)
        return DualNumber(val, val * x.derivative)
    return np.exp(x)

def log(x):
    if isinstance(x, DualNumber):
        return DualNumber(np.log(x.value), x.derivative / x.value)
    return np.log(x)

class ForwardModeAD:
    """前向模式自动微分实现"""
    
    @staticmethod
    def jvp(func: Callable, x: np.ndarray, v: np.ndarray) -> tuple:
        """
        计算雅可比-向量积(Jacobian-Vector Product)
        
        Args:
            func: 目标函数,接受对偶数数组,返回对偶数或数组
            x: 输入点 (n,)
            v: 方向向量 (n,)
        
        Returns:
            (函数值, JVP结果)
        """
        # 将对偶数与方向向量捆绑
        x_dual = np.array([DualNumber(xi, vi) for xi, vi in zip(x, v)])
        
        # 前向传播
        result = func(x_dual)
        
        if isinstance(result, DualNumber):
            return result.value, result.derivative
        else:
            # 处理多维输出
            values = np.array([r.value for r in result])
            derivatives = np.array([r.derivative for r in result])
            return values, derivatives
    
    @staticmethod
    def gradient(func: Callable, x: np.ndarray) -> tuple:
        """
        计算函数值与梯度(适用于标量输出函数)
        通过对每个输入方向分别计算JVP得到完整梯度,返回(函数值, 梯度)
        """
        n = len(x)
        grad = np.zeros(n)
        f_val = None
        
        for i in range(n):
            v = np.zeros(n)
            v[i] = 1.0
            val, d = ForwardModeAD.jvp(func, x, v)
            if f_val is None:
                f_val = val
            grad[i] = d
        
        return f_val, grad

class MLPForwardAD:
    """使用对偶数实现的多层感知机前向模式微分"""
    
    def __init__(self, layer_sizes: List[int]):
        self.layer_sizes = layer_sizes
        self.weights = []
        self.biases = []
        
        # 初始化权重和偏置
        for i in range(len(layer_sizes) - 1):
            w = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.1
            b = np.zeros(layer_sizes[i+1])
            self.weights.append(w)
            self.biases.append(b)
    
    def forward(self, x: Union[np.ndarray, List[DualNumber]], 
                compute_grad: bool = False) -> Union[np.ndarray, List[DualNumber]]:
        """
        前向传播,支持对偶数输入以实现自动微分
        """
        if compute_grad:
            # 将输入转换为对偶数
            if not isinstance(x[0], DualNumber):
                x = [DualNumber(xi, 0.0) for xi in x]
        
        current = x
        for w, b in zip(self.weights, self.biases):
            # 线性变换
            if isinstance(current[0], DualNumber):
                # 对偶数矩阵乘法
                new_current = []
                for j in range(w.shape[1]):
                    val = sum(current[i] * w[i, j] for i in range(w.shape[0])) + b[j]
                    new_current.append(val)
                current = new_current
            else:
                current = np.dot(current, w) + b
            
            # ReLU激活(支持对偶数)
            if isinstance(current[0], DualNumber):
                current = [DualNumber(max(0, c.value), c.derivative if c.value > 0 else 0) 
                          for c in current]
            else:
                current = np.maximum(0, current)
        
        return current
    
    def compute_gradient(self, x: np.ndarray, target: np.ndarray) -> np.ndarray:
        """
        计算MSE损失对网络输入的梯度(使用对偶数前向模式)
        参数梯度可用同样方式对展平后的权重向量计算,此处仅演示输入梯度
        """
        # 定义损失函数(MSE)
        def loss_func(x_dual):
            pred = self.forward(x_dual, compute_grad=True)
            diff = [p - t for p, t in zip(pred, target)]
            loss = sum(d * d for d in diff) / len(diff)
            return loss
        
        # 对输入的每个方向分别计算JVP,拼出完整梯度
        _, grad_input = ForwardModeAD.gradient(loss_func, x)
        
        return grad_input

if __name__ == "__main__":
    print("[1] 基本运算自动微分测试")
    x = DualNumber(2.0, 1.0)  # 在x=2处求导
    
    # f(x) = x^2 + 3x + 1, f'(x) = 2x + 3
    f = x**2 + 3*x + 1
    print(f"f(x) = x^2 + 3x + 1 at x=2: value={f.value}, derivative={f.derivative}")
    print(f"Expected: value=11, derivative=7")
    
    # f(x) = sin(x) * exp(x)
    x = DualNumber(1.0, 1.0)
    f = sin(x) * exp(x)
    analytical = np.cos(1.0)*np.exp(1.0) + np.sin(1.0)*np.exp(1.0)
    print(f"\nf(x) = sin(x)*exp(x) at x=1:")
    print(f"  Computed: {f.derivative:.6f}")
    print(f"  Analytical: {analytical:.6f}")
    
    print("\n[2] 多元函数JVP计算")
    def func(x):
        # f1 = x0^2 + x1
        # f2 = x0 * x1
        f1 = x[0]**2 + x[1]
        f2 = x[0] * x[1]
        return [f1, f2]
    
    x = np.array([2.0, 3.0])
    v = np.array([1.0, 0.0])  # 对x0求偏导
    
    val, jvp = ForwardModeAD.jvp(func, x, v)
    print(f"Jacobian-Vector Product at x=[2,3] in direction [1,0]:")
    print(f"  Function value: {val}")
    print(f"  JVP: {jvp}")
    print(f"  Expected JVP (df1/dx0, df2/dx0): [4, 3]")
    
    print("\n[3] 神经网络前向模式微分")
    mlp = MLPForwardAD([2, 4, 1])
    x_input = np.array([1.0, 0.5])
    target = np.array([1.0])
    
    output = mlp.forward(x_input)
    print(f"MLP output: {output}")
    
    grad_input = mlp.compute_gradient(x_input, target)
    print(f"Input gradient: {grad_input}")
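对偶数还可以嵌套使用:让对偶数的两个分量本身又是对偶数,即可在一次前向传播中同时得到一阶与二阶导数(forward-over-forward)。下面是一个独立的最小示例,其中的Dual类是为演示而简化的假设实现,仅支持加法与乘法:

```python
class Dual:
    """最小对偶数实现:分量v、d可以是标量,也可以是另一个Dual,从而支持嵌套"""
    def __init__(self, v, d):
        self.v, self.d = v, d

    def __add__(self, other):
        return Dual(self.v + other.v, self.d + other.d)

    def __mul__(self, other):
        # (u + εu')(w + εw') = uw + ε(uw' + u'w),该规则对嵌套的Dual分量递归适用
        return Dual(self.v * other.v,
                    self.v * other.d + self.d * other.v)

# f(x) = x^3,在x=3处:f(3)=27, f'(3)=3*3^2=27, f''(3)=6*3=18
# 外层Dual与内层Dual各携带一份方向导数种子1.0
x = Dual(Dual(3.0, 1.0), Dual(1.0, 0.0))
y = x * x * x

print(y.v.v)  # 27.0  函数值
print(y.v.d)  # 27.0  一阶导数
print(y.d.d)  # 18.0  二阶导数
```

嵌套深度每增加一层即可多取一阶导数,这也是正文中混合模式计算Hessian-vector积的思想来源之一:在反向模式的结果上再套一层前向模式。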

9.1.2.2 反向模式(Reverse Mode)的磁带(Tape)与拓扑排序实现

反向模式自动微分通过构建计算图并在图上执行反向传播来高效计算梯度,特别适用于输入维度远高于输出维度的机器学习场景。计算图以有向无环图(DAG)形式记录所有计算操作,其中节点表示变量,边表示操作依赖关系。磁带(Tape)或Wengert列表作为计算图的线性化表示,按执行顺序记录每个操作及其输入输出引用,这种结构支持高效的反向遍历。拓扑排序确保在反向传播过程中,每个节点的梯度在其所有后继节点梯度计算完成后才被计算,符合链式法则的依赖要求。反向传播阶段从输出节点开始,将输出梯度(通常为1)沿边反向传播,通过累加各路径贡献计算每个中间节点的伴随变量(adjoint)。该方法的时间复杂度与前向传播同阶,但需要存储计算图及其中间值,空间开销较大;检查点(Checkpointing)技术通过选择性地丢弃并重计算中间值,在时间与空间之间进行权衡。以下代码实现了基于磁带的反向模式自动微分系统。

Python

"""
【脚本说明】
本脚本实现反向模式自动微分框架,基于计算图与磁带(Tape)机制,包含:
1. 计算图节点的实现(Variable类)
2. 磁带(Tape)的记录与回放机制
3. 拓扑排序确保正确的梯度传播顺序
4. 常见运算(加减乘除、幂、激活函数)的反向传播实现
5. 计算图可视化与梯度检查

使用方式:
- 直接运行查看自动微分示例与计算图结构
- 构建复杂计算图观察反向传播过程
- 适用于深度学习模型训练中的梯度计算
"""

import numpy as np
import time
from typing import List, Tuple, Optional, Callable
from collections import defaultdict

class Variable:
    """
    计算图节点,支持反向模式自动微分
    每个节点记录其值、父节点及反向传播函数
    """
    
    def __init__(self, value: float, parents: List['Variable'] = None, 
                 op: str = "input", grad_fn: Callable = None):
        self.value = float(value)
        self.parents = parents if parents else []
        self.op = op
        self.grad_fn = grad_fn
        self.grad = 0.0  # 伴随变量(adjoint)
        self.id = id(self)
    
    def __repr__(self):
        return f"Var({self.value:.4f}, grad={self.grad:.4f}, op={self.op})"
    
    def __add__(self, other):
        if not isinstance(other, Variable):
            other = Variable(other)
        
        def grad_fn(output_grad):
            self.grad += output_grad * 1.0
            other.grad += output_grad * 1.0
        
        return Variable(self.value + other.value, 
                       parents=[self, other], 
                       op="add", 
                       grad_fn=grad_fn)
    
    def __mul__(self, other):
        if not isinstance(other, Variable):
            other = Variable(other)
        
        def grad_fn(output_grad):
            self.grad += output_grad * other.value
            other.grad += output_grad * self.value
        
        return Variable(self.value * other.value, 
                       parents=[self, other], 
                       op="mul", 
                       grad_fn=grad_fn)
    
    def __pow__(self, power):
        assert isinstance(power, (int, float)), "Power must be scalar"
        
        def grad_fn(output_grad):
            self.grad += output_grad * power * (self.value ** (power - 1))
        
        return Variable(self.value ** power, 
                       parents=[self], 
                       op=f"pow{power}", 
                       grad_fn=grad_fn)
    
    def __neg__(self):
        return self * (-1)
    
    def __sub__(self, other):
        return self + (-other)
    
    def __truediv__(self, other):
        return self * (other ** (-1))

class Tape:
    """
    磁带(Tape)实现:记录计算历史并支持反向传播
    """
    
    def __init__(self):
        self.nodes = []
        self.topological_order = []
    
    def add_node(self, node: Variable):
        """添加节点到磁带"""
        self.nodes.append(node)
    
    def build_topological_order(self, output_node: Variable) -> List[Variable]:
        """
        使用DFS构建拓扑排序(后序遍历)
        确保在反向传播时,当前节点的梯度先计算,再传播到父节点
        """
        visited = set()
        order = []
        
        def dfs(node):
            if node.id in visited:
                return
            visited.add(node.id)
            for parent in node.parents:
                dfs(parent)
            order.append(node)
        
        dfs(output_node)
        return order
    
    def backward(self, output_node: Variable, retain_graph: bool = False):
        """
        反向传播计算梯度
        
        Args:
            output_node: 输出节点(标量)
            retain_graph: 是否保留计算图(用于多次反向传播)
        """
        # 拓扑排序(后序遍历,输出节点位于列表末尾)
        topo_order = self.build_topological_order(output_node)
        
        # 初始化输出梯度为1(对标量输出而言)
        output_node.grad = 1.0
        
        # 按拓扑序的逆序遍历:先处理输出节点,再逐层传播到父节点
        for node in reversed(topo_order):
            if node.grad_fn is not None:
                node.grad_fn(node.grad)
            
            if not retain_graph and node != output_node:
                node.grad_fn = None  # 释放内存
    
    def zero_grad(self):
        """清零所有梯度"""
        for node in self.nodes:
            node.grad = 0.0
    
    def get_computation_graph(self, output_node: Variable) -> dict:
        """获取计算图结构用于可视化"""
        graph = {
            'nodes': [],
            'edges': []
        }
        
        visited = set()
        def traverse(node):
            if node.id in visited:
                return
            visited.add(node.id)
            
            graph['nodes'].append({
                'id': node.id,
                'value': node.value,
                'grad': node.grad,
                'op': node.op
            })
            
            for parent in node.parents:
                graph['edges'].append((parent.id, node.id))
                traverse(parent)
        
        traverse(output_node)
        return graph

class ReverseModeAD:
    """反向模式自动微分主类"""
    
    def __init__(self):
        self.tape = Tape()
    
    def variable(self, value: float, name: str = "") -> Variable:
        """创建变量"""
        var = Variable(value)
        var.name = name
        self.tape.add_node(var)
        return var
    
    def sin(self, x: Variable) -> Variable:
        """sin函数及其反向传播"""
        def grad_fn(output_grad):
            x.grad += output_grad * np.cos(x.value)
        
        result = Variable(np.sin(x.value), 
                         parents=[x], 
                         op="sin", 
                         grad_fn=grad_fn)
        self.tape.add_node(result)
        return result
    
    def cos(self, x: Variable) -> Variable:
        """cos函数及其反向传播"""
        def grad_fn(output_grad):
            x.grad += output_grad * (-np.sin(x.value))
        
        result = Variable(np.cos(x.value), 
                         parents=[x], 
                         op="cos", 
                         grad_fn=grad_fn)
        self.tape.add_node(result)
        return result
    
    def exp(self, x: Variable) -> Variable:
        """exp函数及其反向传播"""
        val = np.exp(x.value)
        
        def grad_fn(output_grad):
            x.grad += output_grad * val
        
        result = Variable(val, 
                         parents=[x], 
                         op="exp", 
                         grad_fn=grad_fn)
        self.tape.add_node(result)
        return result
    
    def log(self, x: Variable) -> Variable:
        """log函数及其反向传播"""
        def grad_fn(output_grad):
            x.grad += output_grad / x.value
        
        result = Variable(np.log(x.value), 
                         parents=[x], 
                         op="log", 
                         grad_fn=grad_fn)
        self.tape.add_node(result)
        return result
    
    def relu(self, x: Variable) -> Variable:
        """ReLU激活函数"""
        val = max(0, x.value)
        
        def grad_fn(output_grad):
            x.grad += output_grad * (1.0 if x.value > 0 else 0.0)
        
        result = Variable(val, 
                         parents=[x], 
                         op="relu", 
                         grad_fn=grad_fn)
        self.tape.add_node(result)
        return result
    
    def backward(self, output: Variable):
        """执行反向传播"""
        self.tape.backward(output)
    
    def gradient(self, output: Variable, inputs: List[Variable]) -> List[float]:
        """计算输出对指定输入的梯度"""
        self.tape.zero_grad()
        self.backward(output)
        return [x.grad for x in inputs]

if __name__ == "__main__":
    print("[1] 基本运算反向传播测试")
    ad = ReverseModeAD()
    
    # 创建变量
    x = ad.variable(2.0, "x")
    y = ad.variable(3.0, "y")
    
    # 计算 z = x^2 * y + y
    z = (x ** 2) * y + y
    
    print(f"Computation: z = x^2 * y + y")
    print(f"x = {x.value}, y = {y.value}")
    print(f"z = {z.value}")
    
    # 反向传播
    ad.backward(z)
    
    print(f"dz/dx = {x.grad} (expected: 2*x*y = 12)")
    print(f"dz/dy = {y.grad} (expected: x^2 + 1 = 5)")
    
    print("\n[2] 复杂函数梯度计算")
    ad2 = ReverseModeAD()
    
    x = ad2.variable(np.pi / 4, "x")
    # f = sin(x) * exp(x) + x^2
    f = ad2.sin(x) * ad2.exp(x) + x ** 2
    
    ad2.backward(f)
    
    analytical = np.cos(np.pi/4)*np.exp(np.pi/4) + np.sin(np.pi/4)*np.exp(np.pi/4) + 2*(np.pi/4)
    print(f"f(x) = sin(x)*exp(x) + x^2 at x=π/4")
    print(f"Computed gradient: {x.grad:.6f}")
    print(f"Analytical gradient: {analytical:.6f}")
    
    print("\n[3] 计算图结构分析")
    graph = ad.tape.get_computation_graph(z)
    print(f"Number of nodes: {len(graph['nodes'])}")
    print(f"Number of edges: {len(graph['edges'])}")
    print("Nodes:")
    for node in graph['nodes']:
        print(f"  {node['op']}: value={node['value']:.4f}, grad={node['grad']:.4f}")
    
    print("\n[4] 神经网络层模拟")
    ad3 = ReverseModeAD()
    
    # 模拟一层神经网络: output = relu(w1*x1 + w2*x2 + b)
    x1 = ad3.variable(1.0, "x1")
    x2 = ad3.variable(0.5, "x2")
    w1 = ad3.variable(0.3, "w1")
    w2 = ad3.variable(-0.2, "w2")
    b = ad3.variable(0.1, "b")
    
    linear = w1 * x1 + w2 * x2 + b
    output = ad3.relu(linear)
    
    # 目标值为1.0,计算MSE损失
    target = 1.0
    loss = (output - target) ** 2
    
    grads = ad3.gradient(loss, [w1, w2, b, x1, x2])
    param_names = ["w1", "w2", "b", "x1", "x2"]
    
    print("Neural network gradient check:")
    for name, grad in zip(param_names, grads):
        print(f"  dLoss/d{name} = {grad:.6f}")

9.1.2.3 混合模式微分(Hessian-vector products)的检查点(Checkpointing)技术

Hessian矩阵的计算在深度学习优化中至关重要,然而显式存储完整的Hessian矩阵具有O(n²)的空间复杂度,在大型神经网络中不可行。Hessian-向量积(HVP)通过反向模式自动微分的嵌套应用,实现了在不构建完整Hessian的情况下计算Hv,其中v为任意向量。具体而言,HVP等价于对梯度向量g = ∇f进行方向导数计算,即先计算g(反向模式),再对g·v进行前向或反向模式微分。检查点技术(Checkpointing)解决了反向模式自动微分中内存消耗随计算深度线性增长的问题,通过在计算图中策略性地保存中间状态,在反向传播时重计算非检查点节点,以时间换取空间。最优检查点策略遵循动态规划原理,在内存预算约束下最小化重计算开销。以下代码实现了基于检查点的HVP计算及内存优化策略。

Python

"""
【脚本说明】
本脚本实现混合模式自动微分与检查点技术,包含:
1. Hessian-向量积(HVP)的自动微分实现
2. 检查点(Checkpointing)机制减少内存占用
3. 嵌套反向模式与前向-反向混合模式对比
4. 分段检查点策略在深层网络中的应用
5. 内存使用分析与计算开销权衡

使用方式:
- 直接运行查看HVP计算与检查点内存优化效果
- 调整网络深度与检查点频率观察性能变化
- 适用于二阶优化方法(如牛顿法、自然梯度)及Hessian分析
"""

import numpy as np
import time
from typing import List, Callable, Tuple, Optional
import copy

class CheckpointedAD:
    """基于检查点的自动微分实现"""
    
    def __init__(self, max_memory_mb: float = 100):
        self.max_memory = max_memory_mb * 1024 * 1024  # 转换为bytes
        self.checkpoints = {}
        self.computation_history = []
    
    def forward_with_checkpoints(self, 
                                 layers: List[Callable], 
                                 x: np.ndarray, 
                                 checkpoint_interval: int = 5) -> np.ndarray:
        """
        带检查点前向传播
        
        Args:
            layers: 网络层函数列表
            x: 输入
            checkpoint_interval: 每隔多少层保存检查点
        
        Returns:
            输出及检查点信息
        """
        current = x
        self.checkpoints = {0: copy.deepcopy(x)}  # 输入作为第0个检查点
        
        for i, layer in enumerate(layers):
            current = layer(current)
            self.computation_history.append((i, layer, current))
            
            # 保存检查点
            if (i + 1) % checkpoint_interval == 0:
                self.checkpoints[i + 1] = copy.deepcopy(current)
                print(f"Checkpoint saved at layer {i + 1}")
        
        return current
    
    def backward_with_checkpoints(self, 
                                  layers: List[Callable], 
                                  output_grad: np.ndarray,
                                  checkpoint_interval: int = 5) -> np.ndarray:
        """
        带检查点的反向传播,从最近的检查点重计算
        
        Args:
            layers: 网络层函数列表
            output_grad: 输出梯度
            checkpoint_interval: 检查点间隔
        
        Returns:
            输入梯度
        """
        grad = output_grad
        
        # 从后向前遍历
        for i in range(len(layers) - 1, -1, -1):
            # 找到最近的检查点
            checkpoint_idx = (i // checkpoint_interval) * checkpoint_interval
            if checkpoint_idx not in self.checkpoints:
                checkpoint_idx = 0
            
            # 如果需要,从检查点重计算到当前层
            if checkpoint_idx != i:
                current = self.checkpoints[checkpoint_idx]
                for j in range(checkpoint_idx, i):
                    current = layers[j](current)
                print(f"Recomputed layers {checkpoint_idx} to {i}")
            
            # 计算当前层梯度(简化示例,实际应使用自动微分)
            grad = self._compute_layer_gradient(layers[i], grad)
        
        return grad
    
    def _compute_layer_gradient(self, layer: Callable, output_grad: np.ndarray) -> np.ndarray:
        """计算单层梯度(简化实现)"""
        # 这里使用数值微分作为占位符
        return output_grad * 0.9  # 模拟梯度传播

class HessianVectorProduct:
    """Hessian-向量积计算实现"""
    
    def __init__(self, func: Callable, grad_func: Callable):
        """
        Args:
            func: 目标函数 f(x)
            grad_func: 梯度函数 ∇f(x)
        """
        self.func = func
        self.grad_func = grad_func
    
    def hvp_forward_backward(self, x: np.ndarray, v: np.ndarray, eps: float = 1e-5) -> np.ndarray:
        """
        使用有限差分近似HVP(前向-反向混合)
        Hv ≈ (∇f(x + εv) - ∇f(x - εv)) / (2ε)
        """
        x_plus = x + eps * v
        x_minus = x - eps * v
        
        grad_plus = self.grad_func(x_plus)
        grad_minus = self.grad_func(x_minus)
        
        hvp = (grad_plus - grad_minus) / (2 * eps)
        return hvp
    
    def hvp_reverse_forward(self, x: np.ndarray, v: np.ndarray) -> np.ndarray:
        """
        使用反向-前向混合模式计算HVP
        计算 ∇(∇f(x) · v) 对 x 的梯度
        """
        # 前向模式计算方向导数
        def directional_derivative(x):
            grad = self.grad_func(x)
            return np.dot(grad, v)
        
        # 反向模式计算该方向导数的梯度(即HVP)
        return self._compute_gradient(directional_derivative, x)
    
    def hvp_reverse_reverse(self, x: np.ndarray, v: np.ndarray) -> np.ndarray:
        """
        双重反向模式计算HVP(Pearlmutter技巧)的数值化演示
        思路: Hv = ∇(∇f(x)·v),即先构造标量函数 g(x) = ∇f(x)·v,
        再对 g 求梯度;真实框架中两步均由自动微分完成,
        此处第二步以数值微分代替
        """
        def grad_dot_v(x_: np.ndarray) -> float:
            return float(np.dot(self.grad_func(x_), v))
        
        return self._compute_gradient(grad_dot_v, x)
    
    def _compute_gradient(self, func: Callable, x: np.ndarray, eps: float = 1e-7) -> np.ndarray:
        """数值梯度计算(用于演示)"""
        grad = np.zeros_like(x)
        for i in range(len(x)):
            x_plus = x.copy()
            x_minus = x.copy()
            x_plus[i] += eps
            x_minus[i] -= eps
            
            grad[i] = (func(x_plus) - func(x_minus)) / (2 * eps)
        return grad

class MemoryOptimizedNetwork:
    """内存优化的神经网络,使用检查点技术"""
    
    def __init__(self, layer_sizes: List[int], use_checkpointing: bool = True):
        self.layer_sizes = layer_sizes
        self.use_checkpointing = use_checkpointing
        self.weights = []
        self.activations_cache = {}
        
        for i in range(len(layer_sizes) - 1):
            w = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * 0.1
            self.weights.append(w)
    
    def forward(self, x: np.ndarray, save_activations: bool = True) -> np.ndarray:
        """前向传播,可选择保存激活值"""
        current = x
        self.activations_cache = {0: x} if save_activations else {}
        
        for i, w in enumerate(self.weights):
            current = np.dot(current, w)
            current = np.maximum(0, current)  # ReLU
            
            if save_activations:
                if self.use_checkpointing and (i + 1) % 3 == 0:
                    # 只保留检查点(含第0层输入),丢弃两检查点之间的中间激活
                    self.activations_cache = {k: v for k, v in
                                              self.activations_cache.items()
                                              if k % 3 == 0}
                    self.activations_cache[i + 1] = current
                else:
                    self.activations_cache[i + 1] = current
        
        return current
    
    def backward(self, output_grad: np.ndarray) -> List[np.ndarray]:
        """
        反向传播:缺失的激活值从最近的检查点重计算
        """
        def get_activation(k: int) -> np.ndarray:
            """获取第k层激活(k=0为输入),缓存缺失时从检查点重算"""
            if k in self.activations_cache:
                return self.activations_cache[k]
            ckpt = max(idx for idx in self.activations_cache if idx <= k)
            current = self.activations_cache[ckpt]
            for j in range(ckpt, k):
                current = np.maximum(0, current @ self.weights[j])
            print(f"Recomputed activation {k} from checkpoint {ckpt}")
            return current
        
        grad = output_grad
        weight_grads = []
        
        for i in range(len(self.weights) - 1, -1, -1):
            a_in = get_activation(i)       # 层i的输入激活
            a_out = get_activation(i + 1)  # 层i的输出激活(ReLU后)
            
            # ReLU导数:仅输出为正的位置梯度可通过
            grad = grad * (a_out > 0).astype(float)
            # 权重梯度为输入激活与上游梯度的外积
            weight_grads.append(np.outer(a_in, grad))
            # 将梯度传播到层i的输入
            grad = grad @ self.weights[i].T
        
        return list(reversed(weight_grads))

if __name__ == "__main__":
    print("[1] Hessian-向量积(HVP)计算对比")
    
    # 定义测试函数:f(x) = 0.5 * x^T A x + b^T x
    A = np.array([[4, 1], [1, 3]])
    b = np.array([1, 2])
    
    def func(x):
        return 0.5 * x @ A @ x + b @ x
    
    def grad_func(x):
        return A @ x + b
    
    hvp_calculator = HessianVectorProduct(func, grad_func)
    
    x = np.array([1.0, 2.0])
    v = np.array([1.0, 0.0])  # 方向向量
    
    print(f"计算 H*v at x={x}, v={v}")
    
    # 方法1:有限差分
    hvp_fd = hvp_calculator.hvp_forward_backward(x, v)
    print(f"Finite Difference HVP: {hvp_fd}")
    
    # 方法2:双重反向模式
    hvp_rr = hvp_calculator.hvp_reverse_reverse(x, v)
    print(f"Reverse-Reverse HVP: {hvp_rr}")
    
    # 真实Hessian乘以v
    true_hvp = A @ v
    print(f"True HVP (A @ v): {true_hvp}")
    
    print(f"\n误差分析:")
    print(f"Finite Diff Error: {np.linalg.norm(hvp_fd - true_hvp):.2e}")
    print(f"Rev-Rev Error: {np.linalg.norm(hvp_rr - true_hvp):.2e}")
    
    print("\n[2] 检查点技术内存优化")
    layer_sizes = [10, 20, 15, 10, 5]
    
    # 不使用检查点
    net_no_checkpoint = MemoryOptimizedNetwork(layer_sizes, use_checkpointing=False)
    input_data = np.random.randn(10)
    output = net_no_checkpoint.forward(input_data, save_activations=True)
    grads_no_ckpt = net_no_checkpoint.backward(np.ones(5))
    memory_no_ckpt = len(net_no_checkpoint.activations_cache)
    
    # 使用检查点
    net_checkpoint = MemoryOptimizedNetwork(layer_sizes, use_checkpointing=True)
    output = net_checkpoint.forward(input_data, save_activations=True)
    grads_ckpt = net_checkpoint.backward(np.ones(5))
    memory_ckpt = len(net_checkpoint.activations_cache)
    
    print(f"\nNetwork depth: {len(layer_sizes)} layers")
    print(f"Memory without checkpointing: {memory_no_ckpt} activation tensors")
    print(f"Memory with checkpointing: {memory_ckpt} activation tensors")
    print(f"Memory reduction: {(1 - memory_ckpt/memory_no_ckpt)*100:.1f}%")
    
    print("\n[3] 大规模网络性能测试")
    large_layer_sizes = [100, 200, 200, 200, 100, 50]
    
    start = time.perf_counter()
    net_large = MemoryOptimizedNetwork(large_layer_sizes, use_checkpointing=True)
    x = np.random.randn(100)
    out = net_large.forward(x)
    grads = net_large.backward(np.ones(50))
    time_ckpt = time.perf_counter() - start
    
    print(f"Large network backward pass with checkpointing: {time_ckpt:.4f}s")
    print(f"Checkpoints maintained: {len(net_large.activations_cache)}")

9.2 数据工程与特征处理

9.2.1 特征工程方法论

9.2.1.1 特征缩放:标准化(Z-score)vs归一化(Min-Max)的分布影响

特征缩放是数据预处理中的关键步骤,直接影响基于梯度的优化算法收敛速度与基于距离的模型(如K近邻、支持向量机)性能。Z-score标准化通过减去均值并除以标准差,将特征转换为均值为零、单位方差的分布;它比Min-Max对离群点更不敏感,但均值与标准差本身仍会被极端值拉偏(若需更强的鲁棒性应使用基于中位数与四分位距的Robust缩放),且该变换会改变原始数据的极差范围。Min-Max归一化将特征线性映射至[0,1]区间,保留了原始分布的形状信息,适用于对输入范围有严格限制的算法(如神经网络Sigmoid激活),然而其对异常值极为敏感,单个极端值即可将绝大多数数据压缩至狭窄的动态范围。从分布理论角度,两种方法都是仿射变换,均不改变分布形状:标准化后的数据只是均值为0、方差为1,仅当原始数据本身服从高斯分布时结果才是标准正态,其高阶矩(偏度、峰度)保持不变;而归一化后的分布受原始极值约束,存在离群点时非离群样本的概率密度会在区间一端堆积。以下代码实现了两种缩放方法及其对数据分布影响的对比分析。

Python

"""
【脚本说明】
本脚本实现特征缩放方法的对比分析,包含:
1. Z-score标准化(Standardization)与Min-Max归一化(Normalization)的实现
2. 异常值对两种方法的影响分析
3. 分布形状保持性评估(偏度、峰度)
4. 不同缩放方法对机器学习模型(KNN、神经网络)性能的影响
5. 可视化展示缩放前后的分布变化

使用方式:
- 直接运行查看缩放效果对比与性能评估
- 修改异常值比例与强度观察鲁棒性差异
- 适用于数据预处理流程选择与参数调优
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification, make_regression
from scipy import stats
import time

class FeatureScalingAnalyzer:
    """特征缩放方法分析与对比"""
    
    def __init__(self):
        self.scalers = {
            'z_score': StandardScaler(),
            'min_max': MinMaxScaler(),
            'robust': None  # 使用手动实现
        }
    
    def z_score_standardization(self, X: np.ndarray) -> np.ndarray:
        """
        Z-score标准化: (X - μ) / σ
        结果均值为0,标准差为1
        """
        mean = np.mean(X, axis=0)
        std = np.std(X, axis=0)
        # 避免除以零
        std[std == 0] = 1.0
        return (X - mean) / std
    
    def min_max_normalization(self, X: np.ndarray, feature_range=(0, 1)) -> np.ndarray:
        """
        Min-Max归一化: (X - X_min) / (X_max - X_min) * (max - min) + min
        结果映射到指定范围(默认[0,1])
        """
        X_min = np.min(X, axis=0)
        X_max = np.max(X, axis=0)
        range_val = X_max - X_min
        # 避免除以零
        range_val[range_val == 0] = 1.0
        
        X_scaled = (X - X_min) / range_val
        min_val, max_val = feature_range
        return X_scaled * (max_val - min_val) + min_val
    
    def robust_scaling(self, X: np.ndarray) -> np.ndarray:
        """
        稳健标准化: (X - median) / IQR
        使用中位数和四分位距,对异常值鲁棒
        """
        median = np.median(X, axis=0)
        q1 = np.percentile(X, 25, axis=0)
        q3 = np.percentile(X, 75, axis=0)
        iqr = q3 - q1
        iqr[iqr == 0] = 1.0
        return (X - median) / iqr
    
    def add_outliers(self, X: np.ndarray, outlier_ratio: float = 0.05, 
                    magnitude: float = 10.0) -> np.ndarray:
        """向数据添加异常值以测试鲁棒性"""
        n_samples, n_features = X.shape
        n_outliers = int(n_samples * outlier_ratio)
        
        X_with_outliers = X.copy()
        outlier_indices = np.random.choice(n_samples, n_outliers, replace=False)
        
        # 随机选择特征添加极端值
        for idx in outlier_indices:
            feature_idx = np.random.randint(n_features)
            X_with_outliers[idx, feature_idx] += magnitude * np.std(X[:, feature_idx])
        
        return X_with_outliers
    
    def distribution_analysis(self, X_original: np.ndarray, X_scaled: np.ndarray, 
                            method_name: str) -> dict:
        """
        分析缩放后的分布特性
        """
        stats_dict = {
            'method': method_name,
            'mean': np.mean(X_scaled, axis=0),
            'std': np.std(X_scaled, axis=0),
            'min': np.min(X_scaled, axis=0),
            'max': np.max(X_scaled, axis=0),
            'skewness': stats.skew(X_scaled, axis=0),
            'kurtosis': stats.kurtosis(X_scaled, axis=0)
        }
        return stats_dict
    
    def impact_on_models(self, X: np.ndarray, y: np.ndarray, 
                        X_outliers: np.ndarray = None) -> dict:
        """
        评估不同缩放方法对模型性能的影响
        """
        results = {}
        
        datasets = {
            'clean': (X, y),
            'with_outliers': (X_outliers, y) if X_outliers is not None else None
        }
        
        for data_name, data in datasets.items():
            if data is None:
                continue
            
            X_data, y_data = data
            X_train, X_test, y_train, y_test = train_test_split(
                X_data, y_data, test_size=0.2, random_state=42
            )
            
            # 测试不同缩放方法:统计量仅从训练集估计,再应用到测试集,避免信息泄漏
            def fit_apply(name, X_tr, X_te):
                if name == 'z_score':
                    mu, sd = X_tr.mean(axis=0), X_tr.std(axis=0)
                    sd = np.where(sd == 0, 1.0, sd)
                    return (X_tr - mu) / sd, (X_te - mu) / sd
                if name == 'min_max':
                    lo, hi = X_tr.min(axis=0), X_tr.max(axis=0)
                    rng = np.where(hi == lo, 1.0, hi - lo)
                    return (X_tr - lo) / rng, (X_te - lo) / rng
                if name == 'robust':
                    med = np.median(X_tr, axis=0)
                    iqr = (np.percentile(X_tr, 75, axis=0)
                           - np.percentile(X_tr, 25, axis=0))
                    iqr = np.where(iqr == 0, 1.0, iqr)
                    return (X_tr - med) / iqr, (X_te - med) / iqr
                return X_tr, X_te  # 'none': 不缩放
            
            for scaler_name in ['none', 'z_score', 'min_max', 'robust']:
                X_train_scaled, X_test_scaled = fit_apply(scaler_name, X_train, X_test)
                
                # KNN模型
                knn = KNeighborsClassifier(n_neighbors=5)
                knn.fit(X_train_scaled, y_train)
                knn_acc = knn.score(X_test_scaled, y_test)
                
                # 神经网络模型
                mlp = MLPClassifier(hidden_layer_sizes=(50,), max_iter=1000, random_state=42)
                mlp.fit(X_train_scaled, y_train)
                mlp_acc = mlp.score(X_test_scaled, y_test)
                
                results[f"{data_name}_{scaler_name}"] = {
                    'knn_accuracy': knn_acc,
                    'mlp_accuracy': mlp_acc
                }
        
        return results

if __name__ == "__main__":
    print("[1] 生成测试数据")
    np.random.seed(42)
    
    # 生成具有不同尺度的特征
    X, y = make_classification(n_samples=1000, n_features=4, n_informative=4, 
                              n_redundant=0, n_clusters_per_class=1, random_state=42)
    
    # 人为制造不同特征尺度
    X[:, 0] *= 100  # 特征0: 大尺度
    X[:, 1] *= 0.01  # 特征1: 小尺度
    X[:, 2] = np.random.exponential(2, 1000)  # 特征2: 指数分布(右偏)
    X[:, 3] = np.random.normal(50, 10, 1000)  # 特征3: 正态分布
    
    print("原始数据统计:")
    print(f"  均值: {np.mean(X, axis=0)}")
    print(f"  标准差: {np.std(X, axis=0)}")
    print(f"  范围: [{np.min(X, axis=0)}, {np.max(X, axis=0)}]")
    
    analyzer = FeatureScalingAnalyzer()
    
    print("\n[2] 应用不同缩放方法")
    X_zscore = analyzer.z_score_standardization(X)
    X_minmax = analyzer.min_max_normalization(X)
    X_robust = analyzer.robust_scaling(X)
    
    print("\nZ-score标准化后:")
    print(f"  均值: {np.mean(X_zscore, axis=0)}")
    print(f"  标准差: {np.std(X_zscore, axis=0)}")
    
    print("\nMin-Max归一化后:")
    print(f"  范围: [{np.min(X_minmax, axis=0)}, {np.max(X_minmax, axis=0)}]")
    
    print("\nRobust缩放后:")
    print(f"  中位数: {np.median(X_robust, axis=0)}")
    print(f"  IQR: {np.percentile(X_robust, 75, axis=0) - np.percentile(X_robust, 25, axis=0)}")
    
    print("\n[3] 异常值鲁棒性测试")
    X_outliers = analyzer.add_outliers(X, outlier_ratio=0.05, magnitude=20)
    
    X_zscore_outlier = analyzer.z_score_standardization(X_outliers)
    X_minmax_outlier = analyzer.min_max_normalization(X_outliers)
    
    print("\n添加异常值后Min-Max归一化:")
    print(f"  最小值: {np.min(X_minmax_outlier, axis=0)}")
    print(f"  最大值: {np.max(X_minmax_outlier, axis=0)}")
    print(f"  非异常值范围被压缩至:")
    normal_mask = ~np.isin(np.arange(len(X)), 
                          np.where(np.abs(X_outliers - X.mean(axis=0)) > 3 * X.std(axis=0))[0])
    print(f"    [{np.min(X_minmax_outlier[normal_mask], axis=0)}, "
          f"{np.max(X_minmax_outlier[normal_mask], axis=0)}]")
    
    print("\n[4] 分布特性分析")
    stats_zscore = analyzer.distribution_analysis(X, X_zscore, "Z-score")
    stats_minmax = analyzer.distribution_analysis(X, X_minmax, "Min-Max")
    
    print(f"\n偏度比较 (特征2 - 指数分布):")
    print(f"  原始: {stats_zscore['skewness'][2]:.3f}")
    print(f"  Z-score后: {stats_zscore['skewness'][2]:.3f}")
    print(f"  Min-Max后: {stats_minmax['skewness'][2]:.3f}")
    print("  (注意: 线性变换不改变偏度)")
    
    print("\n[5] 对模型性能的影响")
    results = analyzer.impact_on_models(X, y, X_outliers)
    
    print("\n清洁数据上的性能:")
    for method in ['none', 'z_score', 'min_max']:
        key = f"clean_{method}"
        if key in results:
            print(f"  {method:10s} - KNN: {results[key]['knn_accuracy']:.3f}, "
                  f"MLP: {results[key]['mlp_accuracy']:.3f}")
    
    print("\n含异常值数据上的性能:")
    for method in ['none', 'z_score', 'min_max', 'robust']:
        key = f"with_outliers_{method}"
        if key in results:
            print(f"  {method:10s} - KNN: {results[key]['knn_accuracy']:.3f}, "
                  f"MLP: {results[key]['mlp_accuracy']:.3f}")

9.2.1.2 多项式特征交互与SVD降维的维数灾难规避

高阶多项式特征交互能够捕捉原始特征间的非线性关系,显著提升线性模型的表达能力,然而d个原始特征在阶数p下生成的特征数量按组合数C(d+p, p)增长,随阶数升高迅速膨胀,引发维数灾难(Curse of Dimensionality)。SVD(奇异值分解)通过将高维交互特征投影至低维子空间,在保留主要变异信息的同时控制模型复杂度。具体实现中,多项式特征生成后应用截断SVD(Truncated SVD),保留前k个最大奇异值对应的奇异向量,这些向量张成数据方差最大的方向。这种组合策略既保留了非线性交互的建模能力,又通过降维缓解了过拟合风险与计算开销。从正则化角度看,SVD降维等价于在多项式特征空间上施加低秩约束,抑制了高阶交互中的噪声成分。以下代码实现了多项式特征生成与SVD降维的联合优化流程。
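多项式特征的增长速度可以直接计算:d个特征、阶数不超过p的多项式特征(不含偏置项)共有C(d+p, p) − 1项,与sklearn中PolynomialFeatures(include_bias=False)的输出维度一致。下面的小脚本展示这种组合式膨胀:

```python
from math import comb

def n_poly_features(n_features: int, degree: int) -> int:
    """阶数不超过degree、不含偏置项的多项式特征总数: C(n+d, d) - 1"""
    return comb(n_features + degree, degree) - 1

# 20个原始特征时,特征数随阶数的膨胀
for d in range(1, 6):
    print(f"20个特征, degree={d}: {n_poly_features(20, d)} 维")
```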

Python

"""
【脚本说明】
本脚本实现多项式特征交互与SVD降维的联合优化,包含:
1. 多项式特征生成(Polynomial Features)的高效实现
2. 截断SVD(Truncated SVD)降维与方差保留分析
3. 交互特征选择(基于重要性或相关性)
4. 维数灾难的可视化分析(样本密度与距离分布)
5. 不同降维策略对模型泛化性能的影响

使用方式:
- 直接运行查看特征生成与降维效果
- 调整多项式度数与降维维度观察性能权衡
- 适用于非线性特征工程与维度优化
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import PolynomialFeatures
from sklearn.decomposition import TruncatedSVD, PCA
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.datasets import make_regression, load_diabetes
from sklearn.metrics import mean_squared_error
import time

class PolynomialSVDFeatureEngineer:
    """多项式特征与SVD降维联合优化"""
    
    def __init__(self, degree: int = 2, n_components: int = 50):
        self.degree = degree
        self.n_components = n_components
        self.poly = PolynomialFeatures(degree=degree, include_bias=False, interaction_only=False)
        self.svd = TruncatedSVD(n_components=n_components)
        self.mean = None
        self.std = None
    
    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        """
        拟合并转换数据:生成多项式特征后应用SVD降维
        """
        # 步骤1: 标准化(为数值稳定性)
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0) + 1e-8
        X_scaled = (X - self.mean) / self.std
        
        # 步骤2: 生成多项式特征
        X_poly = self.poly.fit_transform(X_scaled)
        print(f"Polynomial features shape: {X_poly.shape}")
        
        # 步骤3: 应用SVD降维
        if X_poly.shape[1] > self.n_components:
            X_reduced = self.svd.fit_transform(X_poly)
            print(f"Reduced dimensions: {X_reduced.shape}")
            print(f"Explained variance ratio: {self.svd.explained_variance_ratio_.sum():.3f}")
        else:
            X_reduced = X_poly
            print("Skipping SVD (dimensions already low)")
        
        return X_reduced
    
    def transform(self, X: np.ndarray) -> np.ndarray:
        """对新数据进行转换"""
        X_scaled = (X - self.mean) / self.std
        X_poly = self.poly.transform(X_scaled)
        
        if hasattr(self.svd, 'components_'):
            return self.svd.transform(X_poly)
        return X_poly
    
    def get_feature_importance(self, model_coef: np.ndarray) -> np.ndarray:
        """
        通过SVD组件重构原始多项式特征的重要性
        """
        if hasattr(self.svd, 'components_'):
            # 将降维后的系数映射回多项式特征空间
            poly_importance = np.abs(self.svd.components_.T @ model_coef)
            return poly_importance
        return np.abs(model_coef)
    
    def curse_of_dimensionality_analysis(self, X: np.ndarray, 
                                        max_degree: int = 5) -> dict:
        """
        分析维数灾难现象:样本密度与最近邻距离
        """
        n_samples, n_features = X.shape
        
        results = {
            'degrees': [],
            'dimensions': [],
            'sample_density': [],
            'avg_distance': [],
            'distance_variance': []
        }
        
        for degree in range(1, max_degree + 1):
            poly = PolynomialFeatures(degree=degree, include_bias=False)
            X_poly = poly.fit_transform(X)
            n_dim = X_poly.shape[1]
            
            # 计算样本密度(单位超立方体内的样本数)
            # 简化为:样本数 / 2^n_dim(假设特征已归一化到[-1,1])
            hypervolume = 2 ** n_dim
            density = n_samples / hypervolume
            
            # 计算平均欧氏距离
            # 采样计算(避免O(n^2))
            sample_size = min(100, n_samples)
            indices = np.random.choice(n_samples, sample_size, replace=False)
            sample = X_poly[indices]
            
            # 计算两两距离
            distances = []
            for i in range(min(20, sample_size)):
                dists = np.linalg.norm(sample[i+1:] - sample[i], axis=1)
                distances.extend(dists)
            
            avg_dist = np.mean(distances) if distances else 0
            var_dist = np.var(distances) if distances else 0
            
            results['degrees'].append(degree)
            results['dimensions'].append(n_dim)
            results['sample_density'].append(density)
            results['avg_distance'].append(avg_dist)
            results['distance_variance'].append(var_dist)
            
            print(f"Degree {degree}: {n_features} -> {n_dim} dims, "
                  f"density={density:.2e}, avg_dist={avg_dist:.2f}")
        
        return results

class InteractionSelector:
    """基于统计重要性的交互特征选择"""
    
    def __init__(self, max_interactions: int = 100):
        self.max_interactions = max_interactions
        self.selected_indices = None
        self.poly_feature_names = None
    
    def fit(self, X: np.ndarray, y: np.ndarray, degree: int = 2):
        """
        基于与目标变量的相关性选择最重要的交互特征
        """
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        X_poly = poly.fit_transform(X)
        self.poly_feature_names = poly.get_feature_names_out()
        
        # 计算每个特征与目标的相关性
        correlations = np.array([np.corrcoef(X_poly[:, i], y)[0, 1] 
                                for i in range(X_poly.shape[1])])
        correlations = np.nan_to_num(correlations, nan=0.0)
        
        # 选择绝对相关性最高的特征
        n_select = min(self.max_interactions, len(correlations))
        self.selected_indices = np.argsort(np.abs(correlations))[-n_select:][::-1]
        
        print(f"Selected top {n_select} interactions from {len(correlations)} total")
        return self
    
    def transform(self, X: np.ndarray, degree: int = 2) -> np.ndarray:
        """仅保留选定的交互特征"""
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        X_poly = poly.fit_transform(X)
        return X_poly[:, self.selected_indices]
    
    def get_selected_names(self) -> list:
        """获取选定特征的名称"""
        if self.poly_feature_names is not None:
            return [self.poly_feature_names[i] for i in self.selected_indices]
        return []

if __name__ == "__main__":
    print("[1] 生成非线性回归数据")
    np.random.seed(42)
    
    # 生成具有非线性交互的数据
    X, y = make_regression(n_samples=500, n_features=5, noise=10, random_state=42)
    # 添加非线性项
    y = y + 2 * X[:, 0] * X[:, 1] + X[:, 2]**2 - X[:, 3] * X[:, 4]
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    print(f"Original features: {X.shape[1]}")
    
    print("\n[2] 多项式特征生成与SVD降维")
    engineer = PolynomialSVDFeatureEngineer(degree=2, n_components=30)
    X_train_engineered = engineer.fit_transform(X_train)
    X_test_engineered = engineer.transform(X_test)
    
    print("\n[3] 维数灾难分析")
    curse_analysis = engineer.curse_of_dimensionality_analysis(X_train, max_degree=4)
    
    print("\n[4] 模型性能对比")
    models = {
        'Linear': LinearRegression(),
        'Ridge': Ridge(alpha=1.0)
    }
    
    # 原始特征
    print("\nOriginal features:")
    for name, model in models.items():
        model.fit(X_train, y_train)
        pred = model.predict(X_test)
        mse = mean_squared_error(y_test, pred)
        print(f"  {name}: MSE = {mse:.2f}")
    
    # 多项式+SVD特征
    print("\nPolynomial + SVD features:")
    for name, model in models.items():
        model.fit(X_train_engineered, y_train)
        pred = model.predict(X_test_engineered)
        mse = mean_squared_error(y_test, pred)
        cv_scores = cross_val_score(model, X_train_engineered, y_train, 
                                   cv=5, scoring='neg_mean_squared_error')
        print(f"  {name}: MSE = {mse:.2f}, CV MSE = {-cv_scores.mean():.2f} (+/- {cv_scores.std():.2f})")
    
    print("\n[5] 交互特征选择")
    selector = InteractionSelector(max_interactions=20)
    selector.fit(X_train, y_train, degree=2)
    X_train_selected = selector.transform(X_train, degree=2)
    X_test_selected = selector.transform(X_test, degree=2)
    
    print(f"\nSelected features shape: {X_train_selected.shape}")
    print("Top selected interactions:")
    for name in selector.get_selected_names()[:5]:
        print(f"  {name}")
    
    # 测试选定特征的模型性能
    model = Ridge(alpha=1.0)
    model.fit(X_train_selected, y_train)
    pred = model.predict(X_test_selected)
    mse = mean_squared_error(y_test, pred)
    print(f"\nRidge with selected features: MSE = {mse:.2f}")
    
    print("\n[6] 不同降维策略对比")
    strategies = {
        'No Reduction': None,
        'PCA': PCA(n_components=30),
        'Truncated SVD': TruncatedSVD(n_components=30)
    }
    
    for name, reducer in strategies.items():
        start = time.time()
        # 统一流程:所有步骤仅在训练集上拟合,再转换测试集,避免降维基不一致
        poly = PolynomialFeatures(2, include_bias=False)
        X_train_trans = poly.fit_transform(X_train)
        X_test_trans = poly.transform(X_test)
        if reducer is not None:
            X_train_trans = reducer.fit_transform(X_train_trans)
            X_test_trans = reducer.transform(X_test_trans)
        
        model = Ridge(alpha=1.0)
        model.fit(X_train_trans, y_train)
        pred = model.predict(X_test_trans)
        mse = mean_squared_error(y_test, pred)
        elapsed = time.time() - start
        print(f"{name:15s}: MSE={mse:.2f}, Time={elapsed:.3f}s, "
              f"Dims={X_train_trans.shape[1]}")
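作为上述手动fit/transform流程的补充,下面给出一个最小示意(非原脚本的一部分,所用数据为重新生成的假设数据):用sklearn的Pipeline将多项式展开、SVD降维与Ridge串联,fit时所有步骤只在训练集上拟合,测试集仅做transform,从而天然避免降维步骤的信息泄露。

```python
import numpy as np
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.decomposition import TruncatedSVD
from sklearn.linear_model import Ridge
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split

X, y = make_regression(n_samples=300, n_features=5, noise=10, random_state=0)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)

# Pipeline保证降维矩阵只从训练集学习,测试集复用同一组基
pipe = make_pipeline(
    PolynomialFeatures(degree=2, include_bias=False),
    TruncatedSVD(n_components=15),
    Ridge(alpha=1.0)
)
pipe.fit(X_tr, y_tr)
print(f"Test R^2: {pipe.score(X_te, y_te):.3f}")
```

将整条特征工程链封装进Pipeline后,交叉验证(cross_val_score)也会在每折内部独立拟合降维矩阵,避免前文对比实验中手写循环容易引入的评估偏差。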

9.2.1.3 目标编码(Target Encoding)与CatBoost的顺序统计 leak-proof 实现

目标编码(Target Encoding)将分类变量替换为目标变量在该类别上的条件均值,在类别数量庞大的高基数场景下显著优于传统的独热编码。然而,朴素的目标编码存在严重的数据泄露风险:类别均值的计算包含了当前样本自身的目标值,导致训练集上的乐观偏差与泛化性能下降。CatBoost提出的顺序统计编码(Ordered Target Encoding)通过模拟时间序列结构,仅利用当前样本之前的历史数据计算目标统计量,从根本上消除了这种信息泄露。具体实现中,数据按随机排列的顺序处理,每个样本的编码值仅基于其前驱样本的目标均值;对于测试集则使用训练集的整体统计量。这种有序统计方法在保持编码效率的同时,提供了与留一编码(Leave-One-Out)相当的抗泄露能力,且单次排列的计算复杂度仅为O(n)。以下代码实现了标准目标编码、留一编码及CatBoost顺序编码的对比。

Python


"""
【脚本说明】
本脚本实现目标编码(Target Encoding)及其防泄露变体,包含:
1. 标准目标编码(存在数据泄露)
2. 留一目标编码(Leave-One-Out Encoding)
3. CatBoost顺序统计编码(Ordered Target Encoding)
4. 平滑技术(Smoothing)处理罕见类别
5. 泄露检测与泛化性能对比

使用方式:
- 直接运行查看不同编码方法的泄露程度与性能差异
- 调整类别基数与噪声水平观察鲁棒性
- 适用于高基数分类特征处理与梯度提升模型预处理
"""

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold, train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
import hashlib

class TargetEncoder:
    """基础目标编码实现"""
    
    def __init__(self, smoothing: float = 1.0, min_samples_leaf: int = 1):
        self.smoothing = smoothing
        self.min_samples_leaf = min_samples_leaf
        self.target_mean = None
        self.category_means = {}
        self.category_counts = {}
    
    def fit(self, X: pd.Series, y: pd.Series):
        """
        计算目标编码映射
        
        Args:
            X: 分类特征(pandas Series)
            y: 目标变量
        """
        self.target_mean = y.mean()
        
        # 计算每个类别的统计量
        stats = pd.DataFrame({'category': X, 'target': y})
        agg = stats.groupby('category')['target'].agg(['mean', 'count'])
        
        # 应用平滑
        # smoothed_mean = (count * mean + smoothing * global_mean) / (count + smoothing)
        smoothed = (agg['count'] * agg['mean'] + self.smoothing * self.target_mean) / \
                   (agg['count'] + self.smoothing)
        
        self.category_means = smoothed.to_dict()
        self.category_counts = agg['count'].to_dict()
        
        return self
    
    def transform(self, X: pd.Series) -> np.ndarray:
        """应用目标编码"""
        return X.map(self.category_means).fillna(self.target_mean).values
    
    def fit_transform(self, X: pd.Series, y: pd.Series) -> np.ndarray:
        """拟合并转换(存在泄露风险)"""
        self.fit(X, y)
        return self.transform(X)

class LeaveOneOutEncoder:
    """留一目标编码(防止训练集泄露)"""
    
    def __init__(self, smoothing: float = 1.0):
        self.smoothing = smoothing
        self.target_mean = None
        self.category_stats = {}
    
    def fit_transform(self, X: pd.Series, y: pd.Series) -> np.ndarray:
        """
        留一编码:每个样本使用其他样本计算的类别均值
        
        数学上: enc_i = (sum(y_j for j in same category, j≠i) + smoothing * global_mean) / 
                        (count(category) - 1 + smoothing)
        """
        self.target_mean = y.mean()
        
        # 计算并存储各类别的平滑均值,供transform处理测试集
        agg = pd.DataFrame({'category': X, 'target': y}) \
            .groupby('category')['target'].agg(['mean', 'count'])
        smoothed = (agg['count'] * agg['mean'] + self.smoothing * self.target_mean) / \
                   (agg['count'] + self.smoothing)
        self.category_stats = smoothed.to_dict()
        
        encoded = np.zeros(len(X))
        
        # 对每个类别分别处理
        for cat in X.unique():
            mask = (X == cat)
            category_sum = y[mask].sum()
            category_count = mask.sum()
            
            # 留一均值: (total - current) / (count - 1)
            if category_count > 1:
                # 向量化计算:留一和 = 类别总和 - 当前值
                category_values = y[mask].values
                leave_one_sums = category_sum - category_values
                leave_one_counts = category_count - 1
                
                # 应用平滑
                smoothed_means = (leave_one_sums + self.smoothing * self.target_mean) / \
                                (leave_one_counts + self.smoothing)
                encoded[mask] = smoothed_means
            else:
                # 只有一个样本,使用全局均值
                encoded[mask] = self.target_mean
        
        return encoded
    
    def transform(self, X: pd.Series) -> np.ndarray:
        """测试集转换:使用fit_transform中存储的训练集平滑类别均值"""
        return X.map(self.category_stats).fillna(self.target_mean).values

class CatBoostOrderedEncoder:
    """
    CatBoost顺序统计编码
    模拟时间序列结构,仅使用历史数据计算统计量
    """
    
    def __init__(self, smoothing: float = 1.0, n_permutations: int = 1):
        self.smoothing = smoothing
        self.n_permutations = n_permutations
        self.target_mean = None
        self.category_stats = {}
    
    def fit_transform(self, X: pd.Series, y: pd.Series, 
                     random_seed: int = 42) -> np.ndarray:
        """
        顺序编码:按随机顺序处理,每个样本仅看历史
        
        实现:对每个排列,按顺序更新类别统计量
        """
        self.target_mean = y.mean()
        n_samples = len(X)
        
        # 存储所有排列的编码结果取平均
        all_encodings = np.zeros((self.n_permutations, n_samples))
        
        np.random.seed(random_seed)
        
        for perm_idx in range(self.n_permutations):
            # 随机排列
            permutation = np.random.permutation(n_samples)
            inv_permutation = np.argsort(permutation)
            
            X_perm = X.iloc[permutation].reset_index(drop=True)
            y_perm = y.iloc[permutation].reset_index(drop=True)
            
            # 顺序统计
            encoded_perm = np.zeros(n_samples)
            category_sums = {}
            category_counts = {}
            
            for i in range(n_samples):
                cat = X_perm.iloc[i]
                current_target = y_perm.iloc[i]
                
                if cat in category_counts and category_counts[cat] > 0:
                    # 使用历史统计(不包含当前样本)
                    historical_mean = category_sums[cat] / category_counts[cat]
                    count = category_counts[cat]
                    
                    # 应用平滑
                    smoothed = (count * historical_mean + self.smoothing * self.target_mean) / \
                              (count + self.smoothing)
                    encoded_perm[i] = smoothed
                else:
                    # 无历史数据,使用全局均值
                    encoded_perm[i] = self.target_mean
                
                # 更新统计(为后续样本)
                category_sums[cat] = category_sums.get(cat, 0) + current_target
                category_counts[cat] = category_counts.get(cat, 0) + 1
            
            # 恢复原始顺序
            all_encodings[perm_idx] = encoded_perm[inv_permutation]
        
        # 多排列取平均
        final_encoding = all_encodings.mean(axis=0)
        
        # 存储全量类别均值用于测试集
        # (单次排列处理完毕后,sums/counts已累计全部样本,结果与排列顺序无关)
        self.category_stats = {cat: (category_sums[cat] / category_counts[cat]) 
                              for cat in category_sums}
        
        return final_encoding
    
    def transform(self, X: pd.Series) -> np.ndarray:
        """测试集转换:使用训练集的全局统计量"""
        return X.map(self.category_stats).fillna(self.target_mean).values

class LeakageDetector:
    """检测目标编码中的数据泄露"""
    
    @staticmethod
    def calculate_leakage_score(encoded_train: np.ndarray, y_train: np.ndarray) -> float:
        """
        计算泄露分数:编码值与目标值的直接相关性
        高相关性表明存在泄露
        """
        correlation = np.corrcoef(encoded_train, y_train)[0, 1]
        return abs(correlation)
    
    @staticmethod
    def permutation_leakage_test(encoder_class, X: pd.Series, y: pd.Series, 
                                 n_trials: int = 10) -> dict:
        """
        通过置换测试评估泄露:比较真实目标与随机目标的编码差异
        """
        real_correlations = []
        random_correlations = []
        
        for _ in range(n_trials):
            # 真实数据
            encoder = encoder_class()
            encoded_real = encoder.fit_transform(X, y)
            real_corr = np.corrcoef(encoded_real, y)[0, 1]
            real_correlations.append(abs(real_corr))
            
            # 随机目标
            y_random = pd.Series(np.random.permutation(y.values), index=y.index)
            encoder_random = encoder_class()
            encoded_random = encoder_random.fit_transform(X, y_random)
            random_corr = np.corrcoef(encoded_random, y_random)[0, 1]
            random_correlations.append(abs(random_corr))
        
        return {
            'real_corr_mean': np.mean(real_correlations),
            'random_corr_mean': np.mean(random_correlations),
            'leakage_indicator': np.mean(real_correlations) - np.mean(random_correlations)
        }

if __name__ == "__main__":
    print("[1] 生成高基数分类数据")
    np.random.seed(42)
    n_samples = 1000
    n_categories = 100
    
    # 生成分类特征(高基数)
    categories = [f'cat_{i}' for i in range(n_categories)]
    X_cat = pd.Series(np.random.choice(categories, n_samples))
    
    # 生成目标变量(与类别有真实关系但含噪声)
    true_effects = {cat: np.random.normal(0, 5) for cat in categories}
    y = X_cat.map(true_effects) + np.random.normal(0, 1, n_samples)
    
    X_train, X_test, y_train, y_test = train_test_split(X_cat, y, test_size=0.2, random_state=42)
    
    print(f"Categories: {n_categories}, Samples: {n_samples}")
    print(f"Train samples per category (avg): {len(X_train) / n_categories:.1f}")
    
    print("\n[2] 不同编码方法对比")
    
    # 标准目标编码(有泄露)
    standard_encoder = TargetEncoder(smoothing=10.0)
    X_train_standard = standard_encoder.fit_transform(X_train, y_train)
    X_test_standard = standard_encoder.transform(X_test)
    
    # 留一编码
    loo_encoder = LeaveOneOutEncoder(smoothing=10.0)
    X_train_loo = loo_encoder.fit_transform(X_train, y_train)
    X_test_loo = loo_encoder.transform(X_test)
    
    # CatBoost顺序编码
    ordered_encoder = CatBoostOrderedEncoder(smoothing=10.0, n_permutations=3)
    X_train_ordered = ordered_encoder.fit_transform(X_train, y_train)
    X_test_ordered = ordered_encoder.transform(X_test)
    
    print("\n编码示例(前5个样本):")
    print(f"Original:      {X_train.iloc[:5].values}")
    print(f"Standard:      {X_train_standard[:5]}")
    print(f"Leave-One-Out: {X_train_loo[:5]}")
    print(f"Ordered:       {X_train_ordered[:5]}")
    print(f"True Target:   {y_train.iloc[:5].values}")
    
    print("\n[3] 泄露检测")
    detector = LeakageDetector()
    
    standard_leakage = detector.calculate_leakage_score(X_train_standard, y_train)
    loo_leakage = detector.calculate_leakage_score(X_train_loo, y_train)
    ordered_leakage = detector.calculate_leakage_score(X_train_ordered, y_train)
    
    print(f"\n训练集编码-目标相关性(泄露指标):")
    print(f"  Standard Encoding: {standard_leakage:.4f} (High leakage expected)")
    print(f"  Leave-One-Out:     {loo_leakage:.4f}")
    print(f"  Ordered Encoding:  {ordered_leakage:.4f}")
    
    print("\n[4] 泛化性能对比")
    # 使用Ridge回归评估编码质量
    encoders = {
        'Standard': (X_train_standard, X_test_standard),
        'Leave-One-Out': (X_train_loo, X_test_loo),
        'Ordered (CatBoost)': (X_train_ordered, X_test_ordered)
    }
    
    for name, (X_tr, X_te) in encoders.items():
        # 重塑为2D数组
        X_tr_2d = X_tr.reshape(-1, 1)
        X_te_2d = X_te.reshape(-1, 1)
        
        model = Ridge(alpha=1.0)
        model.fit(X_tr_2d, y_train)
        
        train_pred = model.predict(X_tr_2d)
        test_pred = model.predict(X_te_2d)
        
        train_mse = mean_squared_error(y_train, train_pred)
        test_mse = mean_squared_error(y_test, test_pred)
        
        overfit_ratio = train_mse / test_mse if test_mse > 0 else float('inf')
        
        print(f"\n{name}:")
        print(f"  Train MSE: {train_mse:.3f}")
        print(f"  Test MSE:  {test_mse:.3f}")
        print(f"  Overfit ratio (train/test): {overfit_ratio:.3f}")
        print(f"  {'HIGH LEAKAGE' if overfit_ratio < 0.8 else 'OK'}")
    
    print("\n[5] 罕见类别处理")
    # 创建罕见类别(只出现1-2次)
    X_rare = X_train.copy()
    rare_cats = [f'rare_{i}' for i in range(20)]
    rare_indices = np.random.choice(len(X_rare), 30, replace=False)
    X_rare.iloc[rare_indices[:20]] = rare_cats[:20]
    X_rare.iloc[rare_indices[20:]] = rare_cats[:10]  # 部分重复
    
    encoder_rare = CatBoostOrderedEncoder(smoothing=10.0)
    encoded_rare = encoder_rare.fit_transform(X_rare, y_train)
    
    print(f"\n罕见类别编码值(应接近全局均值 {y_train.mean():.2f}):")
    for cat in rare_cats[:5]:
        mask = X_rare == cat
        if mask.any():
            print(f"  {cat}: {encoded_rare[mask].mean():.3f}")
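为直观理解顺序统计"只看历史"的机制,下面给出一个不带平滑、固定处理顺序的最小数值示例(独立于前面的类实现,数据为假设数据):

```python
import numpy as np

cats = ['A', 'A', 'B', 'A']
targets = [1.0, 3.0, 2.0, 5.0]
prior = float(np.mean(targets))  # 全局均值2.75,作为无历史时的先验

sums, counts, encoded = {}, {}, []
for c, t in zip(cats, targets):
    if counts.get(c, 0) > 0:
        encoded.append(sums[c] / counts[c])  # 仅用前驱样本的类别均值
    else:
        encoded.append(prior)                # 无历史数据时退回先验
    sums[c] = sums.get(c, 0.0) + t           # 当前样本只影响后续样本
    counts[c] = counts.get(c, 0) + 1

print(encoded)  # [2.75, 1.0, 2.75, 2.0]
```

注意第2个'A'的编码值1.0只来自第1个'A'的目标值,自身的目标值3.0对它不可见——这正是顺序统计消除泄露的原因。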

9.2.2 缺失值与异常值处理

9.2.2.1 多重插补(Multiple Imputation)的链式方程(MICE)与EM算法

缺失数据机制分为完全随机缺失(MCAR)、随机缺失(MAR)与非随机缺失(MNAR),不同机制要求不同的处理策略。简单均值插补虽能恢复样本量,但会低估方差并扭曲变量间相关性。多重插补(Multiple Imputation)通过生成m个完整数据集并分别分析后合并结果,正确反映了缺失值引入的不确定性。链式方程多重插补(MICE)将多元缺失值问题分解为一系列条件分布的迭代抽样,对每个含缺失的变量建立以其他变量为预测变量的回归模型,从预测分布中抽取插补值,通过Gibbs采样收敛至后验分布。期望最大化(EM)算法则通过E步(基于当前参数估计缺失值的条件期望)与M步(基于完整数据更新参数)的迭代最大化似然函数,适用于多元正态分布假设下的参数估计。以下代码实现了MICE与EM算法的完整流程。
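下文脚本中pool_results所实现的Rubin合并规则可写为(m为插补数据集个数,$\hat{Q}_i$与$U_i$分别为第$i$个数据集上的点估计与组内方差):

```latex
\bar{Q} = \frac{1}{m}\sum_{i=1}^{m}\hat{Q}_i,\qquad
\bar{U} = \frac{1}{m}\sum_{i=1}^{m}U_i,\qquad
B = \frac{1}{m-1}\sum_{i=1}^{m}\left(\hat{Q}_i-\bar{Q}\right)^2,\qquad
T = \bar{U} + \left(1+\frac{1}{m}\right)B
```

总方差$T$同时包含组内方差$\bar{U}$与组间方差$B$,修正项$(1+1/m)B$刻画了插补过程本身引入的不确定性,这正是多重插补优于单次插补的统计学依据。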

Python


"""
【脚本说明】
本脚本实现缺失值处理的高级方法,包含:
1. 链式方程多重插补(MICE)的迭代Gibbs采样实现
2. 期望最大化(EM)算法用于缺失值估计
3. 多重插补的统计推断合并(Rubin's Rules)
4. 不同缺失机制(MCAR, MAR)下的性能评估
5. 与简单插补(均值、回归)的对比分析

使用方式:
- 直接运行查看MICE与EM算法在合成数据上的表现
- 调整缺失比例与变量相关性观察插补质量
- 适用于调查数据、医疗数据等复杂缺失场景
"""

import numpy as np
import pandas as pd
from sklearn.linear_model import BayesianRidge, LinearRegression
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_squared_error
from scipy import stats
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict

class MICEImputer:
    """
    链式方程多重插补(MICE)实现
    
    原理:对每个含缺失变量建立条件分布,迭代抽样直至收敛
    """
    
    def __init__(self, max_iter: int = 10, n_imputations: int = 5, 
                 random_state: int = 42):
        self.max_iter = max_iter
        self.n_imputations = n_imputations
        self.random_state = random_state
        self.imputed_datasets = []
        self.models = {}
    
    def _impute_column(self, data: pd.DataFrame, target_col: str, 
                      current_imputation: pd.DataFrame) -> pd.Series:
        """
        对单个列进行条件插补
        
        使用其他列作为特征,建立贝叶斯回归模型
        """
        # 分离已知和缺失
        mask_missing = current_imputation[target_col].isna()
        mask_observed = ~mask_missing
        
        if mask_observed.sum() < 2:
            # 如果已知样本太少,使用均值
            return current_imputation[target_col].fillna(
                current_imputation[target_col].mean())
        
        # 准备训练数据
        feature_cols = [col for col in data.columns if col != target_col]
        X_train = current_imputation.loc[mask_observed, feature_cols]
        y_train = current_imputation.loc[mask_observed, target_col]
        
        # 处理特征中的剩余缺失值(用临时均值填充)
        X_train = X_train.fillna(X_train.mean())
        
        # 拟合贝叶斯回归(提供预测分布)
        model = BayesianRidge()
        model.fit(X_train, y_train)
        
        # 预测缺失值
        X_missing = current_imputation.loc[mask_missing, feature_cols]
        X_missing = X_missing.fillna(X_missing.mean())
        
        # 从预测分布中抽样(而非仅使用点估计)
        y_pred_mean, y_pred_std = model.predict(X_missing, return_std=True)
        y_imputed = np.random.normal(y_pred_mean, y_pred_std)
        
        # 更新当前插补
        result = current_imputation[target_col].copy()
        result.loc[mask_missing] = y_imputed
        
        return result
    
    def fit_transform(self, data: pd.DataFrame) -> List[pd.DataFrame]:
        """
        执行MICE算法,生成多个插补数据集
        
        Returns:
            List[pd.DataFrame]: m个完整数据集
        """
        np.random.seed(self.random_state)
        
        # 初始化(均值插补)
        initial_imputation = data.copy()
        for col in data.columns:
            initial_imputation[col] = initial_imputation[col].fillna(
                initial_imputation[col].mean())
        
        self.imputed_datasets = []
        
        for m in range(self.n_imputations):
            print(f"Generating imputation {m+1}/{self.n_imputations}")
            current = initial_imputation.copy()
            
            # 添加随机扰动以打破确定性
            for col in data.columns:
                noise = np.random.normal(0, current[col].std() * 0.1, len(current))
                missing_mask = data[col].isna()
                current.loc[missing_mask, col] += noise[missing_mask]
            
            # 迭代链式方程
            for iteration in range(self.max_iter):
                for col in data.columns:
                    if data[col].isna().any():
                        current[col] = self._impute_column(data, col, current)
                
                if (iteration + 1) % 5 == 0:
                    print(f"  Iteration {iteration+1}/{self.max_iter} completed")
            
            self.imputed_datasets.append(current.copy())
        
        return self.imputed_datasets
    
    def pool_results(self, results: List[Dict]) -> Dict:
        """
        应用Rubin's Rules合并多重插补的统计推断结果
        
        Args:
            results: 每个数据集的统计量(如回归系数、均值等)
                   格式: [{'estimate': x, 'variance': y}, ...]
        """
        # 计算组间和组内方差
        estimates = np.array([r['estimate'] for r in results])
        variances = np.array([r['variance'] for r in results])
        
        # 合并估计
        pooled_estimate = np.mean(estimates)
        
        # 合并方差(组内 + 组间)
        within_var = np.mean(variances)
        between_var = np.var(estimates, ddof=1) if len(estimates) > 1 else 0
        pooled_variance = within_var + (1 + 1/len(estimates)) * between_var
        
        return {
            'estimate': pooled_estimate,
            'variance': pooled_variance,
            'std': np.sqrt(pooled_variance),
            'ci_lower': pooled_estimate - 1.96 * np.sqrt(pooled_variance),
            'ci_upper': pooled_estimate + 1.96 * np.sqrt(pooled_variance)
        }

class EMImputer:
    """
    期望最大化(EM)算法用于缺失值插补
    
    假设数据服从多元正态分布,通过迭代最大化似然估计
    """
    
    def __init__(self, max_iter: int = 100, tol: float = 1e-6):
        self.max_iter = max_iter
        self.tol = tol
        self.mean = None
        self.cov = None
        self.imputed_data = None
    
    def _e_step(self, data: np.ndarray, mask: np.ndarray) -> np.ndarray:
        """
        E步:基于当前参数估计缺失值的条件期望
        
        对于多元正态,条件期望有闭式解
        """
        data_imputed = data.copy()
        n_samples, n_features = data.shape
        
        for i in range(n_samples):
            missing_idx = np.where(mask[i])[0]
            observed_idx = np.where(~mask[i])[0]
            
            if len(missing_idx) == 0:
                continue
            
            # 分块均值与协方差
            mu_obs = self.mean[observed_idx]
            mu_mis = self.mean[missing_idx]
            
            Sigma_oo = self.cov[np.ix_(observed_idx, observed_idx)]
            Sigma_mo = self.cov[np.ix_(missing_idx, observed_idx)]
            
            # 条件期望: E[X_m | X_o] = mu_m + Sigma_mo @ Sigma_oo^-1 @ (X_o - mu_o)
            x_obs = data[i, observed_idx]
            
            try:
                Sigma_oo_inv = np.linalg.inv(Sigma_oo)
                conditional_mean = mu_mis + Sigma_mo @ Sigma_oo_inv @ (x_obs - mu_obs)
                data_imputed[i, missing_idx] = conditional_mean
            except np.linalg.LinAlgError:
                # 如果协方差矩阵奇异,使用伪逆
                Sigma_oo_pinv = np.linalg.pinv(Sigma_oo)
                conditional_mean = mu_mis + Sigma_mo @ Sigma_oo_pinv @ (x_obs - mu_obs)
                data_imputed[i, missing_idx] = conditional_mean
        
        return data_imputed
    
    def _m_step(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        M步:基于完整数据(含E步估计值)更新参数
        """
        mean = np.mean(data, axis=0)
        # 使用样本协方差(有偏估计)
        cov = np.cov(data, rowvar=False, bias=True)
        return mean, cov
    
    def fit_transform(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        执行EM算法
        
        Returns:
            pd.DataFrame: 插补后的完整数据
        """
        data_array = data.values.copy()
        mask = data.isna().values
        
        # 初始化(均值插补)
        col_means = np.nanmean(data_array, axis=0)
        for i in range(data_array.shape[1]):
            data_array[np.isnan(data_array[:, i]), i] = col_means[i]
        
        self.mean = np.mean(data_array, axis=0)
        self.cov = np.cov(data_array, rowvar=False)
        
        for iteration in range(self.max_iter):
            # E步
            data_imputed = self._e_step(data_array, mask)
            
            # M步
            new_mean, new_cov = self._m_step(data_imputed)
            
            # 检查收敛(对数似然变化)
            # 简化为参数变化
            mean_diff = np.linalg.norm(new_mean - self.mean)
            cov_diff = np.linalg.norm(new_cov - self.cov)
            
            self.mean = new_mean
            self.cov = new_cov
            data_array = data_imputed
            
            if mean_diff < self.tol and cov_diff < self.tol:
                print(f"EM converged after {iteration+1} iterations")
                break
            
            if (iteration + 1) % 20 == 0:
                print(f"EM iteration {iteration+1}, change: {mean_diff:.6f}")
        
        self.imputed_data = pd.DataFrame(data_array, columns=data.columns, 
                                        index=data.index)
        return self.imputed_data
    
    def get_parameters(self) -> Tuple[np.ndarray, np.ndarray]:
        """返回估计的均值和协方差矩阵"""
        return self.mean, self.cov

if __name__ == "__main__":
    print("[1] 生成含缺失值的合成数据")
    np.random.seed(42)
    n_samples = 500
    
    # 生成相关数据
    mean = [0, 0, 0]
    cov = [[1, 0.7, 0.5],
           [0.7, 1, 0.3],
           [0.5, 0.3, 1]]
    
    data_complete = np.random.multivariate_normal(mean, cov, n_samples)
    df_complete = pd.DataFrame(data_complete, columns=['X1', 'X2', 'X3'])
    
    # 引入MAR缺失(基于X1的值决定X2是否缺失)
    df_missing = df_complete.copy()
    missing_mask = np.random.rand(n_samples) < (1 / (1 + np.exp(-df_complete['X1'])))
    df_missing.loc[missing_mask, 'X2'] = np.nan
    
    # 随机缺失MCAR
    mcar_mask = np.random.rand(n_samples) < 0.1
    df_missing.loc[mcar_mask, 'X3'] = np.nan
    
    missing_ratio = df_missing.isna().mean()
    print(f"\n缺失比例:")
    print(missing_ratio)
    
    print("\n[2] MICE多重插补")
    mice = MICEImputer(max_iter=10, n_imputations=5)
    imputed_datasets = mice.fit_transform(df_missing)
    
    print(f"\n生成了 {len(imputed_datasets)} 个插补数据集")
    
    # 评估插补质量(以X2为例,因为我们知道真实值)
    true_values = df_complete.loc[df_missing['X2'].isna(), 'X2']
    imputed_values = [d.loc[df_missing['X2'].isna(), 'X2'].values 
                     for d in imputed_datasets]
    
    # 计算每个插补的RMSE
    rmse_list = [np.sqrt(mean_squared_error(true_values, imp)) 
                for imp in imputed_values]
    print(f"\n各插补数据集RMSE: {rmse_list}")
    print(f"平均RMSE: {np.mean(rmse_list):.4f} (+/- {np.std(rmse_list):.4f})")
    
    # 应用Rubin's Rules合并均值估计
    mean_estimates = [{'estimate': d['X2'].mean(), 'variance': d['X2'].var()/len(d)} 
                     for d in imputed_datasets]
    pooled_mean = mice.pool_results(mean_estimates)
    print(f"\nX2均值估计:")
    print(f"  真实值: {df_complete['X2'].mean():.4f}")
    print(f"  合并估计: {pooled_mean['estimate']:.4f} [{pooled_mean['ci_lower']:.4f}, {pooled_mean['ci_upper']:.4f}]")
    
    print("\n[3] EM算法插补")
    em = EMImputer(max_iter=100)
    df_em = em.fit_transform(df_missing)
    
    em_rmse = np.sqrt(mean_squared_error(
        df_complete.loc[df_missing['X2'].isna(), 'X2'],
        df_em.loc[df_missing['X2'].isna(), 'X2']
    ))
    print(f"EM算法RMSE: {em_rmse:.4f}")
    
    mean_em, cov_em = em.get_parameters()
    print(f"\nEM估计的均值: {mean_em}")
    print(f"真实均值: {mean}")
    print(f"\nEM估计的协方差矩阵:")
    print(cov_em)
    
    print("\n[4] 与简单插补对比")
    # 均值插补
    mean_imputer = SimpleImputer(strategy='mean')
    df_mean_imputed = pd.DataFrame(
        mean_imputer.fit_transform(df_missing),
        columns=df_missing.columns
    )
    mean_rmse = np.sqrt(mean_squared_error(
        df_complete.loc[df_missing['X2'].isna(), 'X2'],
        df_mean_imputed.loc[df_missing['X2'].isna(), 'X2']
    ))
    
    # 回归插补(简化版)
    df_regression = df_missing.copy()
    for col in df_missing.columns:
        if df_missing[col].isna().any():
            missing_mask = df_missing[col].isna()
            train_data = df_missing.dropna()
            if len(train_data) > 0:
                X_train = train_data.drop(columns=[col])
                y_train = train_data[col]
                model = LinearRegression()
                model.fit(X_train, y_train)
                X_missing = df_missing.loc[missing_mask].drop(columns=[col])
                X_missing = X_missing.fillna(X_missing.mean())
                df_regression.loc[missing_mask, col] = model.predict(X_missing)
    
    reg_rmse = np.sqrt(mean_squared_error(
        df_complete.loc[df_missing['X2'].isna(), 'X2'],
        df_regression.loc[df_missing['X2'].isna(), 'X2']
    ))
    
    print(f"\nRMSE对比 (X2列):")
    print(f"  Mean Imputation:    {mean_rmse:.4f}")
    print(f"  Regression Impute:  {reg_rmse:.4f}")
    print(f"  EM Algorithm:       {em_rmse:.4f}")
    print(f"  MICE (average):     {np.mean(rmse_list):.4f}")
    
    print("\n[5] 相关系数保持性")
    true_corr = df_complete.corr().values
    mice_corr = imputed_datasets[0].corr().values
    mean_corr = df_mean_imputed.corr().values
    
    corr_error_mice = np.mean(np.abs(true_corr - mice_corr))
    corr_error_mean = np.mean(np.abs(true_corr - mean_corr))
    
    print(f"\n相关系数矩阵平均绝对误差:")
    print(f"  MICE: {corr_error_mice:.4f}")
    print(f"  Mean: {corr_error_mean:.4f}")
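生产环境中通常不必手写MICE:scikit-learn提供了实验性的IterativeImputer,其sample_posterior=True模式与上面MICEImputer从预测分布抽样的做法一致。以下为一个最小用法示意(数据为重新生成的假设数据):

```python
import numpy as np
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 启用实验特性
from sklearn.impute import IterativeImputer

rng = np.random.default_rng(0)
X = rng.multivariate_normal([0, 0], [[1, 0.8], [0.8, 1]], size=200)
X_missing = X.copy()
X_missing[rng.random(200) < 0.2, 1] = np.nan  # 在第二列引入约20%缺失

# sample_posterior=True时从后验预测分布抽样;改变random_state多次运行即得多重插补
imputer = IterativeImputer(max_iter=10, sample_posterior=True, random_state=0)
X_imputed = imputer.fit_transform(X_missing)
print("剩余缺失数:", np.isnan(X_imputed).sum())  # 0
```

其默认估计器为BayesianRidge,与前文MICEImputer中的选择相同;区别在于IterativeImputer单次调用只产生一个插补数据集,Rubin合并需在外部循环中完成。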

9.2.2.2 Isolation Forest的递归随机划分与异常分数计算

Isolation Forest基于异常点比正常点更容易被隔离的假设,通过构建随机决策树森林来检测异常。算法递归地随机选择特征与分割值,将数据空间划分为越来越小的子空间,异常点通常在较浅的树深度即被隔离(路径长度短)。异常分数定义为路径长度的归一化度量:以样本在森林中的平均路径长度除以二叉搜索树中失败查找的期望深度,再做指数变换。与基于密度或距离的方法不同,Isolation Forest具有线性时间复杂度与对高维数据的可扩展性,且无需定义距离度量或估计密度。其递归随机划分机制等价于用随机的轴平行超平面切割数据空间,隔离某一点所需的切割次数期望与该点的异常程度负相关。以下代码实现了Isolation Forest的完整算法及异常分数计算。
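路径长度的归一化常数为二叉搜索树失败查找的平均深度 c(n) ≈ 2H(n−1) − 2(n−1)/n(H为调和数,可用ln加欧拉常数近似),异常分数为 s(x, n) = 2^(−E[h(x)]/c(n))。下面的小函数是独立于后文脚本的换算示意:

```python
import numpy as np

EULER_GAMMA = 0.5772156649  # 欧拉-马歇罗尼常数

def c(n: int) -> float:
    """二叉搜索树失败查找的平均路径长度,用作归一化常数"""
    if n <= 1:
        return 0.0
    harmonic = np.log(n - 1) + EULER_GAMMA  # 调和数近似 H(n-1)
    return 2.0 * harmonic - 2.0 * (n - 1) / n

def anomaly_score(avg_path_length: float, n: int) -> float:
    """s趋近1表示明显异常,s约等于0.5表示无明显异常"""
    return float(2.0 ** (-avg_path_length / c(n)))

print(round(anomaly_score(c(256), 256), 3))  # 0.5:路径长度恰为平均水平
```

当平均路径长度远小于c(n)时分数趋近1(极易被隔离,异常),远大于c(n)时趋近0(深藏于数据内部,正常)。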

Python


"""
【脚本说明】
本脚本实现Isolation Forest异常检测算法,包含:
1. 孤立树(Isolation Tree)的递归构建与随机划分
2. 路径长度计算与异常分数(Anomaly Score)推导
3. 基于二分搜索树理论的平均路径长度估计
4. 与LOF、One-Class SVM等方法的对比
5. 高维数据中的异常检测性能评估

使用方式:
- 直接运行查看孤立森林在合成数据与真实数据上的检测效果
- 调整树的数量与采样大小观察稳定性变化
- 适用于欺诈检测、系统监控、数据清洗等场景
"""

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest as SklearnIsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.metrics import roc_auc_score, precision_recall_curve, average_precision_score
from sklearn.datasets import make_moons, make_blobs
import matplotlib.pyplot as plt
from typing import Optional, Tuple, List

class IsolationTree:
    """
    孤立树节点实现
    
    通过随机选择特征与分割点递归划分数据,直至达到高度限制或单一样本
    """
    
    def __init__(self, height_limit: int, current_height: int = 0):
        self.height_limit = height_limit
        self.current_height = current_height
        self.split_feature = None
        self.split_value = None
        self.left = None
        self.right = None
        self.size = 0  # 当前节点样本数
        self.is_external = False
    
    def fit(self, X: np.ndarray, random_state: Optional[int] = None):
        """
        递归构建孤立树
        
        Args:
            X: 训练数据 (n_samples, n_features)
        """
        if random_state is not None:
            np.random.seed(random_state)
        
        self.size = len(X)
        
        # 终止条件:达到高度限制或样本数<=1
        if self.current_height >= self.height_limit or self.size <= 1:
            self.is_external = True
            return self
        
        # 随机选择特征
        n_features = X.shape[1]
        self.split_feature = np.random.randint(n_features)
        
        # 在当前特征上随机选择分割值(在min与max之间)
        min_val = X[:, self.split_feature].min()
        max_val = X[:, self.split_feature].max()
        
        if min_val == max_val:
            self.is_external = True
            return self
        
        self.split_value = np.random.uniform(min_val, max_val)
        
        # 划分数据
        left_mask = X[:, self.split_feature] < self.split_value
        right_mask = ~left_mask
        
        # 递归构建子树
        if left_mask.any():
            self.left = IsolationTree(self.height_limit, self.current_height + 1)
            self.left.fit(X[left_mask])
        
        if right_mask.any():
            self.right = IsolationTree(self.height_limit, self.current_height + 1)
            self.right.fit(X[right_mask])
        
        return self
    
    def path_length(self, x: np.ndarray) -> float:
        """
        计算样本x的路径长度(从根到叶子的边数)
        
        Args:
            x: 单个样本 (n_features,)
        
        Returns:
            路径长度(外部节点的修正高度)
        """
        if self.is_external:
            # 修正路径长度:c(size)
            return self._average_path_length(self.size)
        
        if x[self.split_feature] < self.split_value:
            if self.left is not None:
                return 1 + self.left.path_length(x)
            else:
                return self._average_path_length(0)
        else:
            if self.right is not None:
                return 1 + self.right.path_length(x)
            else:
                return self._average_path_length(0)
    
    def _average_path_length(self, n: int) -> float:
        """
        计算大小为n的数据集的平均路径长度
        
        基于调和数: c(n) = 2H(n-1) - 2(n-1)/n
        其中 H(i) = 1 + 1/2 + ... + 1/i
        """
        if n <= 1:
            return 0
        if n == 2:
            return 1
        
        # 计算调和数
        harmonic = np.log(n - 1) + 0.5772156649  # Euler-Mascheroni常数
        return 2 * harmonic - 2 * (n - 1) / n

class CustomIsolationForest:
    """
    自定义Isolation Forest实现
    """
    
    def __init__(self, n_estimators: int = 100, max_samples: int = 256, 
                 contamination: float = 0.1, random_state: int = 42):
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.contamination = contamination
        self.random_state = random_state
        self.trees = []
        self.offset_ = None
    
    def fit(self, X: np.ndarray) -> 'CustomIsolationForest':
        """
        训练孤立森林
        
        步骤:
        1. 子采样(提高检测性能与效率)
        2. 构建多棵孤立树
        """
        np.random.seed(self.random_state)
        self.trees = []
        
        # 限制树的高度: ceil(log2(max_samples))
        height_limit = int(np.ceil(np.log2(self.max_samples)))
        
        for i in range(self.n_estimators):
            # 随机子采样
            if len(X) > self.max_samples:
                indices = np.random.choice(len(X), self.max_samples, replace=False)
                X_sample = X[indices]
            else:
                X_sample = X
            
            # 构建孤立树
            tree = IsolationTree(height_limit)
            tree.fit(X_sample, random_state=self.random_state + i)
            self.trees.append(tree)
        
        return self
    
    def anomaly_score(self, X: np.ndarray) -> np.ndarray:
        """
        计算异常分数
        
        s(x, n) = 2 ^ (-E(h(x)) / c(n))
        
        其中:
        - E(h(x)): 样本x在多棵树中的平均路径长度
        - c(n): 给定样本数n时的平均路径长度
        """
        scores = np.zeros(len(X))
        
        for tree in self.trees:
            for i, x in enumerate(X):
                scores[i] += tree.path_length(x)
        
        # 平均路径长度
        avg_path_lengths = scores / self.n_estimators
        
        # 归一化:c(n)取训练子采样大小(即原论文中的ψ)
        avg_c = self._average_path_length(self.max_samples)
        
        # 计算异常分数
        anomaly_scores = 2 ** (-avg_path_lengths / avg_c)
        
        return anomaly_scores
    
    def _average_path_length(self, n: int) -> float:
        """计算平均路径长度常数"""
        if n <= 1:
            return 0
        if n == 2:
            return 1
        return 2 * (np.log(n - 1) + 0.5772156649) - 2 * (n - 1) / n
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        预测异常标签
        
        -1: 异常
         1: 正常
        """
        scores = self.anomaly_score(X)
        threshold = np.percentile(scores, (1 - self.contamination) * 100)
        return np.where(scores > threshold, -1, 1)

class AnomalyDetectionBenchmark:
    """异常检测方法对比"""
    
    def __init__(self):
        self.results = {}
    
    def generate_synthetic_data(self, n_samples: int = 500, 
                               contamination: float = 0.1) -> Tuple[np.ndarray, np.ndarray]:
        """
        生成合成异常数据
        
        构造两个密集簇与稀疏异常点
        """
        n_inliers = int(n_samples * (1 - contamination))
        n_outliers = n_samples - n_inliers
        
        # 生成密集簇
        X_inliers, _ = make_blobs(n_samples=n_inliers, centers=[[0, 0], [5, 5]], 
                                 cluster_std=0.5, random_state=42)
        
        # 生成均匀分布的异常点
        X_outliers = np.random.uniform(low=-3, high=8, size=(n_outliers, 2))
        
        X = np.vstack([X_inliers, X_outliers])
        y = np.hstack([np.ones(n_inliers), -np.ones(n_outliers)])
        
        # 打乱顺序
        shuffle_idx = np.random.permutation(len(X))
        return X[shuffle_idx], y[shuffle_idx]
    
    def evaluate_model(self, model_name: str, model, X: np.ndarray, y_true: np.ndarray):
        """评估模型性能"""
        if hasattr(model, 'fit_predict'):
            y_pred = model.fit_predict(X)
        else:
            model.fit(X[y_true == 1])  # 仅在正常数据上训练
            y_pred = model.predict(X)
        
        # 计算AUC(需要连续分数,约定分数越大越异常)
        if hasattr(model, 'anomaly_score'):
            scores = model.anomaly_score(X)
        elif hasattr(model, 'negative_outlier_factor_'):
            # LOF在novelty=False时仅暴露拟合数据上的离群因子
            scores = -model.negative_outlier_factor_
        elif hasattr(model, 'decision_function'):
            scores = -model.decision_function(X)  # 反转符号使异常为正
        elif hasattr(model, 'score_samples'):
            scores = -model.score_samples(X)
        else:
            scores = None
        
        auc = roc_auc_score(y_true == -1, scores) if scores is not None else None
        avg_precision = average_precision_score(y_true == -1, scores) if scores is not None else None
        
        self.results[model_name] = {
            'auc': auc,
            'avg_precision': avg_precision,
            'predictions': y_pred
        }
        
        return auc, avg_precision

if __name__ == "__main__":
    print("[1] 生成测试数据")
    benchmark = AnomalyDetectionBenchmark()
    X, y = benchmark.generate_synthetic_data(n_samples=500, contamination=0.1)
    
    print(f"数据形状: {X.shape}")
    print(f"异常比例: {(y == -1).mean():.2%}")
    
    print("\n[2] 训练自定义Isolation Forest")
    iso_forest = CustomIsolationForest(n_estimators=100, max_samples=256, 
                                      contamination=0.1, random_state=42)
    iso_forest.fit(X)
    
    # 计算异常分数
    scores = iso_forest.anomaly_score(X)
    print(f"\n异常分数统计:")
    print(f"  Min: {scores.min():.4f}")
    print(f"  Max: {scores.max():.4f}")
    print(f"  Mean: {scores.mean():.4f}")
    print(f"  异常样本平均分数: {scores[y == -1].mean():.4f}")
    print(f"  正常样本平均分数: {scores[y == 1].mean():.4f}")
    
    print("\n[3] 与sklearn实现对比")
    sklearn_iso = SklearnIsolationForest(n_estimators=100, contamination=0.1, 
                                        random_state=42)
    sklearn_iso.fit(X)
    sklearn_scores = -sklearn_iso.score_samples(X)  # 转换为异常分数
    
    correlation = np.corrcoef(scores, sklearn_scores)[0, 1]
    print(f"自定义实现与sklearn分数相关性: {correlation:.4f}")
    
    print("\n[4] 方法对比")
    models = {
        'Isolation Forest (Custom)': iso_forest,
        'Isolation Forest (Sklearn)': SklearnIsolationForest(contamination=0.1, random_state=42),
        'Local Outlier Factor': LocalOutlierFactor(n_neighbors=20, contamination=0.1),
        'One-Class SVM': OneClassSVM(nu=0.1, kernel='rbf', gamma=0.1)
    }
    
    for name, model in models.items():
        try:
            auc, ap = benchmark.evaluate_model(name, model, X, y)
            print(f"\n{name}:")
            print(f"  AUC: {auc:.4f}" if auc is not None else "  AUC: N/A")
            print(f"  Average Precision: {ap:.4f}" if ap is not None else "  AP: N/A")
        except Exception as e:
            print(f"\n{name}: Error - {e}")
    
    print("\n[5] 高维数据测试")
    # 生成高维数据
    n_features = 50
    X_high_dim = np.random.randn(500, n_features)
    # 添加异常(不同分布)
    X_high_dim[-50:] = np.random.randn(50, n_features) * 3 + 5
    
    iso_high = CustomIsolationForest(n_estimators=50, max_samples=200)
    iso_high.fit(X_high_dim)
    scores_high = iso_high.anomaly_score(X_high_dim)
    
    print(f"高维数据 (dim={n_features}):")
    print(f"  正常样本平均异常分数: {scores_high[:-50].mean():.4f}")
    print(f"  异常样本平均异常分数: {scores_high[-50:].mean():.4f}")
    print(f"  区分度: {scores_high[-50:].mean() - scores_high[:-50].mean():.4f}")
    
    print("\n[6] 异常分数分布分析")
    # 分析正常与异常样本的异常分数分布
    normal_mask = y == 1
    anomaly_mask = y == -1
    
    print(f"异常分数统计:")
    print(f"  正常样本 - Mean: {scores[normal_mask].mean():.4f}, Std: {scores[normal_mask].std():.4f}")
    print(f"  异常样本 - Mean: {scores[anomaly_mask].mean():.4f}, Std: {scores[anomaly_mask].std():.4f}")
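
上面的实现以对数近似 H(n−1) ≈ ln(n−1) + γ 计算平均路径长度c(n)。下面这个独立的小片段(仅作示意验证,不属于正文实现)用精确调和数对比该近似,并验证当E(h(x))恰等于c(n)时异常分数取0.5这一"与平均隔离难度相当"的基准含义:

```python
import numpy as np

EULER_GAMMA = 0.5772156649015329  # Euler-Mascheroni常数

def c_exact(n: int) -> float:
    """精确平均路径长度: c(n) = 2*H(n-1) - 2*(n-1)/n,H为调和数"""
    if n <= 1:
        return 0.0
    harmonic = float(np.sum(1.0 / np.arange(1, n)))  # H(n-1) = 1 + 1/2 + ... + 1/(n-1)
    return 2 * harmonic - 2 * (n - 1) / n

def c_approx(n: int) -> float:
    """对数近似: H(n-1) ≈ ln(n-1) + γ(正文实现所用形式)"""
    if n <= 1:
        return 0.0
    return 2 * (np.log(n - 1) + EULER_GAMMA) - 2 * (n - 1) / n

n = 256  # 默认子采样大小
print(f"c_exact({n})  = {c_exact(n):.4f}")
print(f"c_approx({n}) = {c_approx(n):.4f}")
print(f"绝对误差      = {abs(c_exact(n) - c_approx(n)):.6f}")

# 当E(h(x)) = c(n)时,s = 2^(-1) = 0.5,即样本的隔离难度与平均水平相当
print(f"基准异常分数 = {2 ** (-c_exact(n) / c_exact(n))}")
```

对n=256,两种计算方式的差异在千分位量级,对最终异常分数的影响可以忽略,因此对数近似在实践中完全够用。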
9.2.2.3 稳健统计:Huber损失与RANSAC的随机采样一致性实现

稳健统计方法旨在降低异常值对模型估计的影响。Huber损失结合平方损失与绝对损失:残差较小时保持二次增长以维持统计效率,残差较大时转为线性增长以抑制异常值影响,切换阈值ε决定了稳健区间。RANSAC(Random Sample Consensus)则采用重采样策略:随机选取最小样本子集拟合模型,统计内点(Inliers,残差小于阈值)数量,并选择内点最多的模型作为最终估计;只要迭代次数足够多,即使异常值比例超过50%,RANSAC仍有较高概率找到由内点支撑的正确模型。两者分别从损失函数与采样策略角度实现稳健性:Huber损失适用于轻度污染数据的连续优化,RANSAC适用于高污染场景下的模型初始化或离群点剔除。以下代码实现了Huber回归的迭代加权最小二乘(IRLS)求解与RANSAC的完整算法流程。

Python

"""
【脚本说明】
本脚本实现稳健统计方法,包含:
1. Huber损失的迭代加权最小二乘(IRLS)实现
2. RANSAC(随机采样一致性)算法用于稳健回归
3. 不同污染比例下Huber与RANSAC的鲁棒性对比
4. M估计(M-Estimator)的权重函数分析
5. 与最小二乘、L1回归的偏差比较

使用方式:
- 直接运行查看稳健回归在异常数据上的拟合效果
- 调整异常值比例与强度观察方法稳定性
- 适用于计算机视觉(RANSAC用于位姿估计)、金融数据(Huber用于收益率建模)等
"""

import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import HuberRegressor, RANSACRegressor, LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.datasets import make_regression
from typing import Tuple, Optional, Callable

class HuberRegressionIRLS:
    """
    Huber回归的迭代重加权最小二乘(IRLS)实现
    
    Huber损失:
    L(r) = 0.5 * r^2          if |r| <= epsilon
           epsilon * |r| - 0.5 * epsilon^2  otherwise
    """
    
    def __init__(self, epsilon: float = 1.35, max_iter: int = 100, 
                 tol: float = 1e-6):
        self.epsilon = epsilon
        self.max_iter = max_iter
        self.tol = tol
        self.coef_ = None
        self.intercept_ = None
    
    def _huber_weights(self, residuals: np.ndarray) -> np.ndarray:
        """
        计算Huber权重
        
        对于小残差: w = 1
        对于大残差: w = epsilon / |r|
        """
        abs_r = np.abs(residuals)
        weights = np.ones_like(residuals)
        mask = abs_r > self.epsilon
        weights[mask] = self.epsilon / abs_r[mask]
        return weights
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        使用IRLS拟合Huber回归
        
        迭代过程:
        1. 计算当前残差
        2. 根据残差计算权重
        3. 加权最小二乘求解
        4. 重复直至收敛
        """
        n_samples, n_features = X.shape
        
        # 初始化(普通最小二乘)
        X_with_intercept = np.column_stack([np.ones(n_samples), X])
        params = np.linalg.lstsq(X_with_intercept, y, rcond=None)[0]
        
        for iteration in range(self.max_iter):
            # 计算残差
            y_pred = X_with_intercept @ params
            residuals = y - y_pred
            
            # 计算权重
            weights = self._huber_weights(residuals)
            
            # 加权最小二乘: (X'WX)^{-1} X'Wy
            # 用逐元素乘法代替显式构造n×n对角阵W,内存从O(n^2)降为O(n)
            Xw = X_with_intercept * weights[:, np.newaxis]
            XWX = Xw.T @ X_with_intercept
            XWy = Xw.T @ y
            
            try:
                new_params = np.linalg.solve(XWX, XWy)
            except np.linalg.LinAlgError:
                new_params = np.linalg.pinv(XWX) @ XWy
            
            # 检查收敛
            if np.linalg.norm(new_params - params) < self.tol:
                print(f"Huber converged after {iteration+1} iterations")
                break
            
            params = new_params
        
        self.intercept_ = params[0]
        self.coef_ = params[1:]
        
        return self
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """预测"""
        return X @ self.coef_ + self.intercept_

class CustomRANSAC:
    """
    RANSAC(随机采样一致性)算法实现
    
    核心思想:通过随机采样寻找内点最多的模型
    """
    
    def __init__(self, min_samples: Optional[int] = None, 
                 residual_threshold: Optional[float] = None, 
                 max_trials: int = 100, 
                 stop_probability: float = 0.99):
        self.min_samples = min_samples
        self.residual_threshold = residual_threshold
        self.max_trials = max_trials
        self.stop_probability = stop_probability
        self.best_model = None
        self.best_inlier_mask = None
        self.best_score = 0
    
    def fit(self, X: np.ndarray, y: np.ndarray):
        """
        RANSAC主算法
        
        步骤:
        1. 随机选择最小样本集
        2. 拟合模型
        3. 统计内点数量
        4. 保留最佳模型
        5. 使用最佳模型的内点重新拟合
        """
        n_samples, n_features = X.shape
        
        if self.min_samples is None:
            self.min_samples = n_features + 1
        
        if self.residual_threshold is None:
            # 使用MAD(中位数绝对偏差)估计阈值
            mad = np.median(np.abs(y - np.median(y)))
            self.residual_threshold = 3 * mad
        
        best_inlier_num = 0
        best_inlier_mask = None
        
        for trial in range(self.max_trials):
            # 随机采样
            indices = np.random.choice(n_samples, self.min_samples, replace=False)
            X_subset = X[indices]
            y_subset = y[indices]
            
            # 拟合模型(最小二乘)
            try:
                model = self._fit_model(X_subset, y_subset)
            except np.linalg.LinAlgError:
                continue
            
            # 计算所有样本的残差
            y_pred = self._predict_model(model, X)
            residuals = np.abs(y - y_pred)
            
            # 统计内点
            inlier_mask = residuals < self.residual_threshold
            inlier_num = np.sum(inlier_mask)
            
            # 更新最佳模型
            if inlier_num > best_inlier_num:
                best_inlier_num = inlier_num
                best_inlier_mask = inlier_mask
                self.best_model = model
                
                # 动态调整迭代次数(基于内点比例)
                inlier_ratio = inlier_num / n_samples
                if inlier_ratio > 0:
                    # 计算达到置信度所需的迭代次数
                    required_trials = np.log(1 - self.stop_probability) / \
                                     np.log(1 - inlier_ratio ** self.min_samples)
                    if trial >= required_trials:
                        print(f"RANSAC early stop at trial {trial+1}")
                        break
        
        # 使用所有内点重新拟合
        if best_inlier_mask is not None and best_inlier_mask.sum() >= self.min_samples:
            self.best_model = self._fit_model(X[best_inlier_mask], y[best_inlier_mask])
            self.best_inlier_mask = best_inlier_mask
            self.best_score = best_inlier_num
        
        return self
    
    def _fit_model(self, X: np.ndarray, y: np.ndarray):
        """拟合线性模型"""
        X_with_intercept = np.column_stack([np.ones(len(X)), X])
        return np.linalg.lstsq(X_with_intercept, y, rcond=None)[0]
    
    def _predict_model(self, model: np.ndarray, X: np.ndarray) -> np.ndarray:
        """使用模型预测"""
        X_with_intercept = np.column_stack([np.ones(len(X)), X])
        return X_with_intercept @ model
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        """预测"""
        return self._predict_model(self.best_model, X)
    
    def score(self, X: np.ndarray, y: np.ndarray) -> float:
        """返回内点数量"""
        y_pred = self.predict(X)
        residuals = np.abs(y - y_pred)
        return np.sum(residuals < self.residual_threshold)

class RobustRegressionComparison:
    """稳健回归方法对比"""
    
    def __init__(self):
        self.results = {}
    
    def add_outliers(self, X: np.ndarray, y: np.ndarray, 
                    outlier_ratio: float = 0.1, 
                    noise_scale: float = 10.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        添加异常值
        
        Returns:
            X, y_with_outliers, is_outlier (mask)
        """
        n_samples = len(X)
        n_outliers = int(n_samples * outlier_ratio)
        
        is_outlier = np.zeros(n_samples, dtype=bool)
        outlier_indices = np.random.choice(n_samples, n_outliers, replace=False)
        is_outlier[outlier_indices] = True
        
        y_outliers = y.copy()
        # 添加大幅度噪声
        y_outliers[outlier_indices] += np.random.normal(0, noise_scale, n_outliers)
        
        return X, y_outliers, is_outlier
    
    def compare_methods(self, X: np.ndarray, y: np.ndarray, 
                       y_clean: np.ndarray, is_outlier: np.ndarray):
        """
        对比不同回归方法
        """
        methods = {
            'Least Squares': LinearRegression(),
            'Huber (Custom)': HuberRegressionIRLS(epsilon=1.35),
            'Huber (Sklearn)': HuberRegressor(epsilon=1.35),
            'RANSAC (Custom)': CustomRANSAC(max_trials=100),
            'RANSAC (Sklearn)': RANSACRegressor(min_samples=0.5, 
                                               residual_threshold=None,
                                               max_trials=100)
        }
        
        for name, model in methods.items():
            try:
                model.fit(X, y)
                y_pred = model.predict(X)
                
                # 在清洁数据上评估
                mse_clean = mean_squared_error(y_clean, y_pred)
                mae_clean = mean_absolute_error(y_clean, y_pred)
                
                # 参数偏差(假设真实模型已知为线性)
                if hasattr(model, 'coef_') and model.coef_ is not None:
                    coef = model.coef_
                    intercept = model.intercept_
                elif isinstance(model, CustomRANSAC):
                    # CustomRANSAC存储的参数向量为[intercept, coef...]
                    coef = model.best_model[1:]
                    intercept = model.best_model[0]
                else:
                    coef = model.estimator_.coef_
                    intercept = model.estimator_.intercept_
                
                self.results[name] = {
                    'mse': mse_clean,
                    'mae': mae_clean,
                    'coef': coef,
                    'intercept': intercept
                }
                
                print(f"\n{name}:")
                print(f"  MSE (on clean): {mse_clean:.4f}")
                print(f"  MAE (on clean): {mae_clean:.4f}")
                print(f"  Coef: {coef}")
                print(f"  Intercept: {intercept:.4f}")
                
            except Exception as e:
                print(f"\n{name}: Error - {e}")

if __name__ == "__main__":
    print("[1] 生成线性数据并添加异常值")
    np.random.seed(42)
    n_samples = 100
    
    # 生成真实线性关系: y = 2x + 1 + noise
    X = np.linspace(0, 10, n_samples).reshape(-1, 1)
    true_coef = 2.0
    true_intercept = 1.0
    y_clean = true_coef * X.ravel() + true_intercept + np.random.normal(0, 0.5, n_samples)
    
    # 添加异常值
    comparator = RobustRegressionComparison()
    X, y_outliers, is_outlier = comparator.add_outliers(X, y_clean, 
                                                       outlier_ratio=0.2, 
                                                       noise_scale=15.0)
    
    print(f"数据样本数: {n_samples}")
    print(f"异常值比例: {is_outlier.mean():.1%}")
    print(f"真实参数: coef={true_coef}, intercept={true_intercept}")
    
    print("\n[2] 方法对比")
    comparator.compare_methods(X, y_outliers, y_clean, is_outlier)
    
    print("\n[3] 不同污染比例下的鲁棒性")
    contamination_levels = [0.05, 0.1, 0.2, 0.3, 0.4, 0.5]
    huber_errors = []
    ransac_errors = []
    ols_errors = []
    
    for cont in contamination_levels:
        X_test = np.linspace(0, 10, 100).reshape(-1, 1)
        y_test_clean = true_coef * X_test.ravel() + true_intercept
        
        # 生成污染数据
        _, y_polluted, _ = comparator.add_outliers(X, y_clean, outlier_ratio=cont)
        
        # Huber
        huber = HuberRegressionIRLS(epsilon=1.35)
        huber.fit(X, y_polluted)
        huber_pred = huber.predict(X_test)
        huber_errors.append(mean_squared_error(y_test_clean, huber_pred))
        
        # RANSAC
        ransac = CustomRANSAC(max_trials=200)
        ransac.fit(X, y_polluted)
        ransac_pred = ransac.predict(X_test)
        ransac_errors.append(mean_squared_error(y_test_clean, ransac_pred))
        
        # OLS
        ols = LinearRegression()
        ols.fit(X, y_polluted)
        ols_pred = ols.predict(X_test)
        ols_errors.append(mean_squared_error(y_test_clean, ols_pred))
        
        print(f"Contamination {cont:.0%}: Huber={huber_errors[-1]:.4f}, "
              f"RANSAC={ransac_errors[-1]:.4f}, OLS={ols_errors[-1]:.4f}")
    
    print("\n[4] Huber损失的权重分析")
    huber_model = HuberRegressionIRLS(epsilon=1.35)
    huber_model.fit(X, y_outliers)
    
    residuals = y_outliers - huber_model.predict(X)
    weights = huber_model._huber_weights(residuals)
    
    print(f"异常值平均权重: {weights[is_outlier].mean():.4f}")
    print(f"正常值平均权重: {weights[~is_outlier].mean():.4f}")
    print(f"权重比 (正常/异常): {weights[~is_outlier].mean() / weights[is_outlier].mean():.2f}x")
    
    print("\n[5] RANSAC内点分析")
    ransac_model = CustomRANSAC(max_trials=100)
    ransac_model.fit(X, y_outliers)
    
    detected_inliers = ransac_model.best_inlier_mask
    true_inliers = ~is_outlier
    
    # 计算混淆矩阵
    tp = np.sum(detected_inliers & true_inliers)
    fp = np.sum(detected_inliers & is_outlier)
    tn = np.sum(~detected_inliers & is_outlier)
    fn = np.sum(~detected_inliers & true_inliers)
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    
    print(f"内点检测精确率: {precision:.2%}")
    print(f"内点检测召回率: {recall:.2%}")
    print(f"最终模型内点数: {ransac_model.best_score}")
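
上述CustomRANSAC的动态早停依据的是经典的采样次数公式:以置信度p至少抽中一个纯内点子集,所需采样次数满足 N ≥ log(1−p) / log(1−w^m),其中w为内点比例、m为最小样本集大小。下面的独立小片段(示意性质)直接计算该公式,便于直观感受污染程度对迭代需求的影响:

```python
import numpy as np

def ransac_trials(p: float, w: float, m: int) -> int:
    """达到置信度p所需的最少采样次数: N = ceil(log(1-p) / log(1-w^m))"""
    return int(np.ceil(np.log(1 - p) / np.log(1 - w ** m)))

# 置信度99%,最小样本集m=2(直线拟合)
for w in [0.8, 0.5, 0.3]:
    print(f"内点比例 {w:.0%} -> 需要约 {ransac_trials(0.99, w, 2)} 次采样")
```

所需采样次数随内点比例下降而急剧增长,但即便内点比例降至30%左右,几十次随机采样即可达到99%置信度,这解释了RANSAC对高污染数据的实用性;注意m越大,该数字随污染程度增长得越快。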

9.3 模型序列化与部署初步

9.3.1 模型持久化

9.3.1.1 Pickle协议版本与自定义对象的安全序列化(joblib优化)

模型持久化是机器学习工程流程中的关键环节,涉及训练状态、预处理参数及模型架构的存储与恢复。Python的Pickle模块通过协议版本控制序列化格式,高版本协议支持更高效的字节编码与更丰富的类型支持,但可能牺牲向后兼容性。对于包含大规模NumPy数组的机器学习模型,标准Pickle存在序列化效率低下与内存复制的缺陷。Joblib针对科学计算优化,通过内存映射与压缩技术处理大型数组,避免序列化过程中的数据拷贝,并支持延迟加载以缓解内存压力。自定义对象的序列化需实现__getstate__与__setstate__方法以控制状态捕获与重建逻辑;对于包含非序列化资源(如数据库连接、文件句柄)的复杂模型,需显式管理这些瞬态属性的排除与重新初始化。以下代码展示了Pickle协议选择、Joblib优化及自定义序列化策略的实现。

Python

"""
【脚本说明】
本脚本实现模型持久化的高级技术,包含:
1. 不同Pickle协议版本的对比(兼容性vs效率)
2. Joblib对大型数组的优化序列化(内存映射、压缩)
3. 自定义模型的__getstate__/__setstate__实现
4. 增量模型保存与版本控制集成
5. 序列化安全性检查(防止代码注入)

使用方式:
- 直接运行查看不同序列化方法的性能与文件大小对比
- 测试自定义对象在复杂状态(缓存、临时文件)下的恢复
- 适用于生产环境中的模型部署与版本管理
"""

import pickle
import joblib
import numpy as np
import time
import os
import tempfile
from typing import Any, Dict, Optional
import hashlib
import json

class SecureModelSerializer:
    """安全的模型序列化器"""
    
    # 允许的模块白名单(防止反序列化攻击)
    SAFE_MODULES = {
        'numpy.core.numeric',
        'numpy',
        'sklearn',
        '__builtin__',
        'builtins'
    }
    
    @classmethod
    def secure_load(cls, filepath: str) -> Any:
        """
        安全加载:检查文件哈希并限制反序列化
        
        注意:这仅为基础防护,生产环境应使用更严格的沙箱
        """
        # 验证文件完整性(假设有预先计算的哈希)
        # 实际应用中应从安全源获取预期哈希
        with open(filepath, 'rb') as f:
            content = f.read()
            file_hash = hashlib.sha256(content).hexdigest()
        
        print(f"Loading file with SHA256: {file_hash[:16]}...")
        
        # 使用受限的Unpickler(前缀匹配,放行numpy.core.multiarray等内部子模块)
        class RestrictedUnpickler(pickle.Unpickler):
            def find_class(self, module, name):
                if not any(module == safe or module.startswith(safe + '.')
                           for safe in cls.SAFE_MODULES):
                    raise ValueError(f"Attempted to load forbidden module: {module}")
                return super().find_class(module, name)
        
        with open(filepath, 'rb') as f:
            return RestrictedUnpickler(f).load()

class CustomModelWithState:
    """
    具有复杂状态的自定义模型
    
    包含:
    - 模型参数(可序列化)
    - 训练缓存(可丢弃)
    - 临时文件句柄(需重建)
    """
    
    def __init__(self, n_features: int):
        self.n_features = n_features
        self.weights = np.random.randn(n_features)
        self.training_cache = {}  # 大缓存,不需要持久化
        self.temp_file = None
        self.version = "1.0.0"
        self._create_temp_resource()
    
    def _create_temp_resource(self):
        """创建临时资源(文件句柄等)"""
        self.temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
        self.temp_file.write("Model temporary data\n")
        self.temp_file.flush()
    
    def train(self, X: np.ndarray, y: np.ndarray):
        """模拟训练,填充缓存"""
        # 模拟大缓存
        self.training_cache['gradients'] = np.random.randn(1000, 1000)
        self.training_cache['history'] = list(range(10000))
        
        # 实际训练(简化)
        self.weights = np.linalg.lstsq(X, y, rcond=None)[0]
        self.trained_ = True
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        return X @ self.weights
    
    def __getstate__(self):
        """
        自定义序列化状态
        
        排除:
        1. 大缓存(重新计算)
        2. 文件句柄(重新打开)
        """
        state = self.__dict__.copy()
        
        # 移除大缓存以节省空间
        state['training_cache'] = {}
        
        # 关闭并移除文件句柄(保存路径)
        if self.temp_file:
            state['temp_file_path'] = self.temp_file.name
            state['temp_file'] = None
        
        # 添加序列化元数据
        state['_serialization_meta'] = {
            'timestamp': time.time(),
            'version': self.version
        }
        
        print(f"Pickling: Removed cache ({len(self.training_cache)} items), "
              f"closed temp file")
        return state
    
    def __setstate__(self, state):
        """自定义反序列化状态"""
        # 恢复基本属性
        self.__dict__.update(state)
        
        # 重建临时资源
        if hasattr(self, 'temp_file_path'):
            self._create_temp_resource()
            # 可选:恢复文件内容
            print(f"Unpickling: Recreated temp file at {self.temp_file.name}")
        
        # 重新初始化缓存(如果需要)
        self.training_cache = {}
        
        print(f"Unpickled model version {self.version}")

class SerializationBenchmark:
    """序列化方法基准测试"""
    
    def __init__(self):
        self.results = {}
    
    def create_large_model(self, n_features: int = 1000, n_samples: int = 10000):
        """创建包含大数组的模型"""
        model = {
            'weights': np.random.randn(n_features, n_features),
            'bias': np.random.randn(n_features),
            'training_data': np.random.randn(n_samples, n_features),  # 大数据
            'metadata': {
                'created': time.time(),
                'description': 'Large model for serialization test'
            }
        }
        return model
    
    def benchmark_pickle_protocols(self, model: Dict, filepath: str):
        """测试不同Pickle协议"""
        protocols = [2, 3, 4, 5]  # 协议版本
        
        results = {}
        
        for protocol in protocols:
            filename = f"{filepath}_protocol_{protocol}.pkl"
            
            # 序列化
            start = time.time()
            with open(filename, 'wb') as f:
                pickle.dump(model, f, protocol=protocol)
            dump_time = time.time() - start
            
            file_size = os.path.getsize(filename) / (1024 * 1024)  # MB
            
            # 反序列化
            start = time.time()
            with open(filename, 'rb') as f:
                loaded = pickle.load(f)
            load_time = time.time() - start
            
            results[protocol] = {
                'dump_time': dump_time,
                'load_time': load_time,
                'file_size': file_size
            }
            
            print(f"Protocol {protocol}: {file_size:.2f} MB, "
                  f"Dump: {dump_time:.3f}s, Load: {load_time:.3f}s")
        
        return results
    
    def benchmark_joblib(self, model: Dict, filepath: str):
        """测试Joblib不同压缩级别"""
        compressions = [0, 1, 3, 6]  # 压缩级别
        
        results = {}
        
        for compress in compressions:
            filename = f"{filepath}_joblib_{compress}.pkl"
            
            # 序列化
            start = time.time()
            joblib.dump(model, filename, compress=compress, protocol=pickle.HIGHEST_PROTOCOL)
            dump_time = time.time() - start
            
            file_size = os.path.getsize(filename) / (1024 * 1024)
            
            # 反序列化
            start = time.time()
            loaded = joblib.load(filename)
            load_time = time.time() - start
            
            results[compress] = {
                'dump_time': dump_time,
                'load_time': load_time,
                'file_size': file_size
            }
            
            compression_ratio = results[0]['file_size'] / file_size if compress > 0 else 1.0
            print(f"Joblib (compress={compress}): {file_size:.2f} MB "
                  f"(ratio: {compression_ratio:.2f}x), "
                  f"Dump: {dump_time:.3f}s, Load: {load_time:.3f}s")
        
        return results
    
    def benchmark_memory_mapping(self, model: Dict, filepath: str):
        """
        测试Joblib的内存映射功能
        
        允许大数组按需加载,而非一次性载入内存
        """
        filename = f"{filepath}_mmap.pkl"
        
        # 保存
        joblib.dump(model, filename)
        
        # 正常加载(拷贝到内存)
        start = time.time()
        loaded_normal = joblib.load(filename)
        normal_time = time.time() - start
        
        # 内存映射加载(零拷贝)
        start = time.time()
        loaded_mmap = joblib.load(filename, mmap_mode='r')
        mmap_time = time.time() - start
        
        print(f"\nMemory Mapping:")
        print(f"  Normal load: {normal_time:.4f}s")
        print(f"  MMap load:   {mmap_time:.4f}s")
        print(f"  Speedup:     {normal_time/mmap_time:.1f}x")
        
        # 验证数据可访问
        print(f"  Sample data access: {loaded_mmap['weights'][0, 0]:.4f}")
        
        return {
            'normal_time': normal_time,
            'mmap_time': mmap_time
        }

if __name__ == "__main__":
    print("[1] 创建测试模型")
    benchmark = SerializationBenchmark()
    model = benchmark.create_large_model(n_features=500, n_samples=5000)
    print(f"Model size in memory: ~{sum(v.nbytes for v in model.values() if isinstance(v, np.ndarray)) / 1024**2:.2f} MB")
    
    print("\n[2] Pickle协议对比")
    pickle_results = benchmark.benchmark_pickle_protocols(model, "/tmp/model")
    
    print("\n[3] Joblib压缩对比")
    joblib_results = benchmark.benchmark_joblib(model, "/tmp/model")
    
    print("\n[4] 内存映射测试")
    mmap_results = benchmark.benchmark_memory_mapping(model, "/tmp/model")
    
    print("\n[5] 自定义对象序列化")
    custom_model = CustomModelWithState(n_features=10)
    
    # 模拟训练(填充缓存)
    X_train = np.random.randn(100, 10)
    y_train = np.random.randn(100)
    custom_model.train(X_train, y_train)
    
    print(f"\nBefore serialization:")
    print(f"  Cache size: {len(custom_model.training_cache)} items")
    print(f"  Temp file: {custom_model.temp_file.name}")
    
    # 序列化
    with open("/tmp/custom_model.pkl", "wb") as f:
        pickle.dump(custom_model, f)
    
    # 删除原对象前保留权重副本,用于反序列化后的校验
    original_weights = custom_model.weights.copy()
    del custom_model
    
    # 反序列化
    with open("/tmp/custom_model.pkl", "rb") as f:
        restored_model = pickle.load(f)
    
    print(f"\nAfter deserialization:")
    print(f"  Cache size: {len(restored_model.training_cache)} items (reset)")
    print(f"  Temp file: {restored_model.temp_file.name} (new)")
    print(f"  Weights preserved: {np.allclose(restored_model.weights, original_weights)}")
    
    print("\n[6] 安全性检查")
    # 计算文件哈希
    with open("/tmp/custom_model.pkl", "rb") as f:
        content = f.read()
        file_hash = hashlib.sha256(content).hexdigest()
    print(f"Model file SHA256: {file_hash}")
    print("Security note: Always verify file integrity before loading untrusted models")

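上述安全性检查仅验证文件完整性,并不能防御Pickle反序列化时的任意代码执行。更严格的做法是重载`pickle.Unpickler.find_class`,只放行白名单内的类。以下为一个最小示意(白名单条目按NumPy数组反序列化所需配置,属演示假设,实际应按业务模型类调整):

```python
import io
import pickle
import numpy as np

class RestrictedUnpickler(pickle.Unpickler):
    """仅允许白名单内的(module, name)被反序列化,阻断恶意payload中的任意可调用对象"""
    
    # 白名单为示例假设:覆盖numpy数组还原所需的全局引用(兼容numpy 1.x/2.x模块路径)
    ALLOWED = {
        ('numpy', 'ndarray'),
        ('numpy', 'dtype'),
        ('numpy.core.multiarray', '_reconstruct'),
        ('numpy._core.multiarray', '_reconstruct'),
    }
    
    def find_class(self, module, name):
        if (module, name) in self.ALLOWED:
            return super().find_class(module, name)
        raise pickle.UnpicklingError(f"Blocked global: {module}.{name}")

def restricted_loads(data: bytes):
    """白名单受限版的pickle.loads"""
    return RestrictedUnpickler(io.BytesIO(data)).load()

if __name__ == "__main__":
    # 合法数据:numpy数组可正常还原
    payload = pickle.dumps(np.arange(4))
    print(restricted_loads(payload))
    
    # 恶意数据:引用os.system的手工payload在find_class处即被拒绝
    evil = b"cos\nsystem\n(S'echo pwned'\ntR."
    try:
        restricted_loads(evil)
    except pickle.UnpicklingError as e:
        print(f"Rejected: {e}")
```

白名单方式只能缓解而非根除风险,对来源不可信的模型文件,仍应优先选用不含可执行语义的格式(如ONNX)。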
9.3.1.2 PMML与ONNX开放格式:跨平台模型交换标准

模型交换标准解决了异构平台(Python训练、Java/C++部署)间的互操作性问题。PMML(Predictive Model Markup Language)基于XML Schema定义,提供决策树、回归、神经网络等传统模型的标准化表示,适用于企业级商业智能系统的规则引擎集成,但其对深度学习与现代预处理方法的支持有限。ONNX(Open Neural Network Exchange)作为开源标准,采用Protobuf序列化与计算图表示,支持PyTorch、TensorFlow等框架的互转,通过算子集(Operator Set)版本控制实现前向兼容性。ONNX的图结构包含节点(算子)、初始值(权重)与值信息(张量形状),支持动态形状与自定义算子扩展。部署阶段,ONNX Runtime通过图优化(常量折叠、算子融合)与硬件加速(CUDA、TensorRT)实现高效推理。以下代码实现了scikit-learn模型到PMML的转换及PyTorch模型到ONNX的导出与优化。

Python

"""
【脚本说明】
本脚本实现跨平台模型交换格式(PMML与ONNX)的转换与优化,包含:
1. Scikit-learn模型到PMML的转换(使用sklearn2pmml)
2. PyTorch模型到ONNX的导出(动态轴设置)
3. ONNX模型优化(常量折叠、死代码消除)
4. 跨框架推理对比(PyTorch vs ONNX Runtime)
5. 模型验证与可视化(Netron格式)

使用方式:
- 需安装sklearn2pmml、onnx、onnxruntime库
- 直接运行查看模型转换与推理性能对比
- 适用于模型从研究环境(Python)到生产环境(Java/C++)的迁移
"""

import numpy as np
import pandas as pd
import tempfile
import os
import time
from typing import List, Dict, Any

# 传统机器学习(PMML)
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris, make_regression

# 深度学习(ONNX)
import torch
import torch.nn as nn
import onnx
import onnxruntime as ort

# PMML支持
try:
    from sklearn2pmml import sklearn2pmml
    from sklearn2pmml.pipeline import PMMLPipeline
    HAS_SKLEARN2PMML = True
except ImportError:
    HAS_SKLEARN2PMML = False
    print("Warning: sklearn2pmml not installed, PMML examples will be skipped")

class PMMLExporter:
    """PMML格式导出器"""
    
    def __init__(self):
        self.models = {}
    
    def create_pmml_pipeline(self) -> Pipeline:
        """
        创建符合PMML标准的Pipeline
        
        PMML要求使用PMMLPipeline而非标准Pipeline
        """
        if not HAS_SKLEARN2PMML:
            return None
        
        pipeline = PMMLPipeline([
            ('scaler', StandardScaler()),
            ('classifier', RandomForestClassifier(n_estimators=10, random_state=42))
        ])
        
        return pipeline
    
    def export_sklearn_to_pmml(self, pipeline, X: pd.DataFrame, y: pd.Series, 
                              output_path: str):
        """
        导出scikit-learn Pipeline到PMML
        
        Args:
            pipeline: 训练好的PMMLPipeline
            X: 特征数据(用于推断schema)
            y: 目标数据
            output_path: 输出文件路径(.pmml)
        """
        if not HAS_SKLEARN2PMML:
            print("sklearn2pmml not available")
            return
        
        # 训练
        pipeline.fit(X, y)
        
        # 导出PMML
        sklearn2pmml(pipeline, output_path)
        print(f"PMML exported to: {output_path}")
        
        # 返回文件大小
        file_size = os.path.getsize(output_path) / 1024  # KB
        print(f"PMML file size: {file_size:.2f} KB")
        
        return file_size
    
    def inspect_pmml(self, pmml_path: str) -> Dict:
        """
        解析PMML文件结构(简化版)
        
        实际应用中应使用lxml解析XML
        """
        with open(pmml_path, 'r') as f:
            content = f.read()
        
        # 简单文本分析
        info = {
            'file_size_kb': os.path.getsize(pmml_path) / 1024,
            'lines': len(content.split('\n')),
            'has_preprocessing': 'StandardScaler' in content or 'Normalization' in content,
            'model_type': None
        }
        
        if 'RandomForest' in content:
            info['model_type'] = 'RandomForest'
        elif 'GradientBoosting' in content:
            info['model_type'] = 'GradientBoosting'
        elif 'Regression' in content:
            info['model_type'] = 'Regression'
        
        return info

class ONNXExporter:
    """ONNX格式导出器与优化器"""
    
    def __init__(self):
        self.optimization_passes = [
            'eliminate_deadend',
            'eliminate_identity', 
            'extract_constant_to_initializer',
            'fuse_add_bias_into_conv',
            'fuse_bn_into_conv'
        ]
    
    def create_simple_nn(self, input_dim: int, hidden_dim: int, output_dim: int) -> nn.Module:
        """创建简单神经网络用于导出"""
        class SimpleNN(nn.Module):
            def __init__(self):
                super(SimpleNN, self).__init__()
                self.fc1 = nn.Linear(input_dim, hidden_dim)
                self.relu = nn.ReLU()
                self.dropout = nn.Dropout(0.2)
                self.fc2 = nn.Linear(hidden_dim, output_dim)
            
            def forward(self, x):
                x = self.fc1(x)
                x = self.relu(x)
                x = self.dropout(x)
                x = self.fc2(x)
                return x
        
        return SimpleNN()
    
    def export_to_onnx(self, model: nn.Module, sample_input: torch.Tensor, 
                      output_path: str, input_names: List[str] = ['input'],
                      output_names: List[str] = ['output'],
                      dynamic_axes: Dict[str, Dict[int, str]] = None):
        """
        导出PyTorch模型到ONNX
        
        Args:
            dynamic_axes: 定义动态维度(如batch size)
                示例: {'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
        """
        model.eval()
        
        torch.onnx.export(
            model,
            sample_input,
            output_path,
            export_params=True,
            opset_version=11,
            do_constant_folding=True,
            input_names=input_names,
            output_names=output_names,
            dynamic_axes=dynamic_axes
        )
        
        print(f"ONNX exported to: {output_path}")
        
        # 验证模型
        onnx_model = onnx.load(output_path)
        onnx.checker.check_model(onnx_model)
        print("ONNX model validation: PASSED")
        
        return onnx_model
    
    def optimize_onnx(self, model_path: str, output_path: str):
        """
        使用ONNX优化工具优化模型
        
        包括:
        - 常量折叠(Constant Folding)
        - 算子融合(Operator Fusion)
        - 死代码消除(Dead Code Elimination)
        """
        # onnx.optimizer在onnx>=1.9中已拆分为独立的onnxoptimizer包
        try:
            import onnxoptimizer as optimizer
        except ImportError:
            from onnx import optimizer  # 旧版onnx(<1.9)的内置优化器
        
        # 加载模型
        model = onnx.load(model_path)
        
        # 应用优化
        optimized_model = optimizer.optimize(model, self.optimization_passes)
        
        # 保存
        onnx.save(optimized_model, output_path)
        print(f"Optimized ONNX saved to: {output_path}")
        
        # 对比大小
        original_size = os.path.getsize(model_path) / 1024
        optimized_size = os.path.getsize(output_path) / 1024
        print(f"Size reduction: {original_size:.2f} KB -> {optimized_size:.2f} KB "
              f"({(1-optimized_size/original_size)*100:.1f}%)")
        
        return optimized_model
    
    def benchmark_inference(self, pytorch_model: nn.Module, onnx_path: str, 
                           test_input: np.ndarray, n_runs: int = 100):
        """
        对比PyTorch与ONNX Runtime的推理性能
        """
        # PyTorch推理
        pytorch_model.eval()
        pytorch_input = torch.from_numpy(test_input).float()
        
        # 预热
        with torch.no_grad():
            _ = pytorch_model(pytorch_input)
        
        start = time.time()
        with torch.no_grad():
            for _ in range(n_runs):
                pytorch_out = pytorch_model(pytorch_input)
        pytorch_time = (time.time() - start) / n_runs * 1000  # ms
        
        # ONNX Runtime推理
        ort_session = ort.InferenceSession(onnx_path)
        ort_inputs = {ort_session.get_inputs()[0].name: test_input}
        
        # 预热
        _ = ort_session.run(None, ort_inputs)
        
        start = time.time()
        for _ in range(n_runs):
            ort_out = ort_session.run(None, ort_inputs)
        onnx_time = (time.time() - start) / n_runs * 1000
        
        print(f"\nInference Benchmark ({n_runs} runs):")
        print(f"  PyTorch:      {pytorch_time:.4f} ms")
        print(f"  ONNX Runtime: {onnx_time:.4f} ms")
        print(f"  Speedup:      {pytorch_time/onnx_time:.2f}x")
        
        # 验证输出一致性
        ort_out_array = ort_out[0]
        pytorch_out_array = pytorch_out.numpy()
        max_diff = np.max(np.abs(ort_out_array - pytorch_out_array))
        print(f"  Max diff:     {max_diff:.2e}")
        
        return {
            'pytorch_ms': pytorch_time,
            'onnx_ms': onnx_time,
            'speedup': pytorch_time/onnx_time,
            'max_diff': max_diff
        }

if __name__ == "__main__":
    print("=" * 60)
    print("PMML与ONNX模型交换格式演示")
    print("=" * 60)
    
    # PMML示例
    if HAS_SKLEARN2PMML:
        print("\n[1] PMML导出(传统机器学习)")
        pmml_exporter = PMMLExporter()
        
        # 准备数据
        iris = load_iris()
        X_iris = pd.DataFrame(iris.data, columns=iris.feature_names)
        y_iris = pd.Series(iris.target, name='species')
        
        # 创建并导出模型
        pipeline = pmml_exporter.create_pmml_pipeline()
        pmml_path = "/tmp/iris_model.pmml"
        pmml_exporter.export_sklearn_to_pmml(pipeline, X_iris, y_iris, pmml_path)
        
        # 检查PMML结构
        info = pmml_exporter.inspect_pmml(pmml_path)
        print(f"\nPMML Structure:")
        print(f"  Model type: {info['model_type']}")
        print(f"  Preprocessing: {info['has_preprocessing']}")
        print(f"  File size: {info['file_size_kb']:.2f} KB")
    
    # ONNX示例
    print("\n[2] ONNX导出(深度学习)")
    onnx_exporter = ONNXExporter()
    
    # 创建模型
    input_dim, hidden_dim, output_dim = 10, 20, 1
    model = onnx_exporter.create_simple_nn(input_dim, hidden_dim, output_dim)
    
    # 导出ONNX
    sample_input = torch.randn(1, input_dim)
    onnx_path = "/tmp/simple_nn.onnx"
    onnx_model = onnx_exporter.export_to_onnx(
        model, sample_input, onnx_path,
        dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
    )
    
    # 优化ONNX
    optimized_path = "/tmp/simple_nn_optimized.onnx"
    onnx_exporter.optimize_onnx(onnx_path, optimized_path)
    
    # 性能基准测试
    print("\n[3] 推理性能对比")
    batch_sizes = [1, 32, 128]
    for batch in batch_sizes:
        print(f"\nBatch size: {batch}")
        test_data = np.random.randn(batch, input_dim).astype(np.float32)
        onnx_exporter.benchmark_inference(model, optimized_path, test_data, n_runs=50)
    
    print("\n[4] 跨平台部署说明")
    print("""
    PMML部署选项:
    1. JPMML (Java): https://github.com/jpmml/jpmml-evaluator
    2. Openscoring (REST API): https://github.com/openscoring/openscoring
    3. Augustus (Python/PMML consumer): https://augustus.readthedocs.io/
    
    ONNX部署选项:
    1. ONNX Runtime (Python/C/C++/Java/C#): https://onnxruntime.ai/
    2. TensorRT (NVIDIA GPU优化)
    3. OpenVINO (Intel优化)
    4. TVM (多种硬件后端)
    """)

9.3.1.3 模型版本控制:MLflow与DVC的实验追踪与血缘管理

机器学习模型的版本控制超越了传统代码版本控制的范畴,需同时追踪代码、数据、超参数、环境依赖及模型产物的完整状态。MLflow通过Tracking API提供实验参数、指标与Artifacts的日志记录,支持模型注册中心(Model Registry)实现从实验到生产阶段的状态转换(Staging、Production、Archived)。DVC(Data Version Control)则专注于大型数据文件与模型的版本管理,利用Git存储元数据(指针文件),实际数据存储于远程存储(S3、GCS、HDFS),通过内容寻址(默认MD5哈希)确保数据完整性。血缘管理(Lineage Tracking)记录模型训练的数据来源、预处理步骤及依赖关系,支持端到端的可复现性。以下代码实现了MLflow实验追踪与DVC数据版本化的集成方案。

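DVC的内容寻址思想可以先用一个极简草图来理解:为数据文件计算内容哈希,把数据移入以哈希命名的缓存目录,并生成一个可纳入Git的小指针文件。下面的示意使用SHA256与JSON指针(均为演示假设,DVC实际默认使用MD5与YAML格式的.dvc文件):

```python
import hashlib
import json
import os
import shutil
import tempfile

def hash_file(path: str, chunk: int = 1 << 20) -> str:
    """分块计算文件SHA256,避免大文件一次性载入内存"""
    h = hashlib.sha256()
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(chunk), b''):
            h.update(block)
    return h.hexdigest()

def track(data_path: str, cache_dir: str) -> str:
    """模拟dvc add:数据复制进内容寻址缓存,返回指针文件路径(由Git管理)"""
    digest = hash_file(data_path)
    cache_path = os.path.join(cache_dir, digest[:2], digest[2:])
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    shutil.copy2(data_path, cache_path)
    pointer = data_path + '.ptr.json'  # 类比.dvc指针文件
    with open(pointer, 'w') as f:
        json.dump({'sha256': digest, 'path': os.path.basename(data_path)}, f)
    return pointer

def checkout(pointer: str, cache_dir: str, dest: str):
    """模拟dvc checkout:按指针中的哈希从缓存恢复数据"""
    with open(pointer) as f:
        digest = json.load(f)['sha256']
    src = os.path.join(cache_dir, digest[:2], digest[2:])
    shutil.copy2(src, dest)

if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    data = os.path.join(workdir, 'data.csv')
    with open(data, 'w') as f:
        f.write('a,b\n1,2\n')
    ptr = track(data, os.path.join(workdir, 'cache'))
    checkout(ptr, os.path.join(workdir, 'cache'), os.path.join(workdir, 'restored.csv'))
    print(open(os.path.join(workdir, 'restored.csv')).read())
```

由于缓存键由内容决定,相同数据只存一份,不同Git提交中的指针文件即构成数据的版本历史。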
Python

"""
【脚本说明】
本脚本实现模型版本控制与实验追踪,包含:
1. MLflow实验参数、指标与模型Artifacts的日志记录
2. 模型注册中心(Model Registry)的状态管理
3. DVC数据版本控制(基于内容寻址)
4. 血缘追踪:记录数据->预处理->训练->评估的完整链路
5. 可复现性保证:环境依赖与随机种子的完整记录

使用方式:
- 需安装mlflow与dvc库
- 配置MLflow Tracking URI与DVC远程存储
- 适用于团队协作与生产环境模型管理
"""

import os
import json
import yaml
import hashlib
import time
from datetime import datetime
from typing import Dict, Any, Optional
import tempfile
import shutil

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# MLflow
import mlflow
import mlflow.sklearn

# DVC
try:
    import dvc.repo
    import dvc.main
    HAS_DVC = True
except ImportError:
    HAS_DVC = False
    print("Warning: DVC not installed, data versioning examples will be skipped")

class MLflowExperimentTracker:
    """MLflow实验追踪实现"""
    
    def __init__(self, experiment_name: str, tracking_uri: Optional[str] = None):
        """
        初始化实验追踪
        
        Args:
            experiment_name: 实验名称
            tracking_uri: MLflow追踪服务器地址(默认本地文件系统)
        """
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)
        
        # 创建或获取实验
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if experiment is None:
            self.experiment_id = mlflow.create_experiment(
                experiment_name,
                artifact_location=f"/tmp/mlflow_artifacts/{experiment_name}"
            )
        else:
            self.experiment_id = experiment.experiment_id
        
        mlflow.set_experiment(experiment_name)
        self.current_run = None
    
    def start_run(self, run_name: Optional[str] = None, 
                  nested: bool = False, tags: Optional[Dict] = None):
        """启动新的运行"""
        self.current_run = mlflow.start_run(
            run_name=run_name,
            experiment_id=self.experiment_id,
            nested=nested,
            tags=tags
        )
        return self
    
    def log_params(self, params: Dict[str, Any]):
        """记录参数"""
        for key, value in params.items():
            mlflow.log_param(key, value)
    
    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        """记录指标"""
        for key, value in metrics.items():
            mlflow.log_metric(key, value, step=step)
    
    def log_model(self, model: Any, artifact_path: str, 
                  registered_model_name: Optional[str] = None):
        """
        记录模型
        
        Args:
            registered_model_name: 如提供,将模型注册到Model Registry
        """
        mlflow.sklearn.log_model(
            model,
            artifact_path=artifact_path,
            registered_model_name=registered_model_name
        )
    
    def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
        """记录本地文件(如数据集、图表)"""
        mlflow.log_artifact(local_path, artifact_path)
    
    def set_tags(self, tags: Dict[str, str]):
        """设置运行标签"""
        for key, value in tags.items():
            mlflow.set_tag(key, value)
    
    def end_run(self):
        """结束当前运行"""
        mlflow.end_run()
        self.current_run = None
    
    def transition_model_stage(self, model_name: str, version: int, 
                              stage: str, archive_existing_versions: bool = True):
        """
        转换模型阶段(None -> Staging -> Production -> Archived)
        
        Stages: None, Staging, Production, Archived
        """
        from mlflow.tracking import MlflowClient
        client = MlflowClient()
        
        client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=stage,
            archive_existing_versions=archive_existing_versions
        )
        print(f"Model {model_name} version {version} transitioned to {stage}")

class DVCDataVersioner:
    """DVC数据版本控制实现"""
    
    def __init__(self, repo_path: str = "."):
        self.repo_path = repo_path
        self.dvc_dir = os.path.join(repo_path, ".dvc")
        
        if not os.path.exists(self.dvc_dir) and HAS_DVC:
            # 初始化DVC仓库
            os.system(f"cd {repo_path} && dvc init")
    
    def track_data(self, data_path: str, remote_name: Optional[str] = None):
        """
        追踪数据文件(添加到DVC控制)
        
        步骤:
        1. dvc add data_path(创建.dvc文件与.gitignore)
        2. git add .dvc文件
        3. dvc push(推送到远程存储)
        """
        if not HAS_DVC:
            print("DVC not available")
            return
        
        # 添加文件到DVC
        os.system(f"cd {self.repo_path} && dvc add {data_path}")
        
        # 添加.dvc文件到git(元数据)
        dvc_file = f"{data_path}.dvc"
        gitignore = os.path.join(os.path.dirname(data_path) or '.', '.gitignore')
        os.system(f"cd {self.repo_path} && git add {dvc_file} {gitignore}")
        
        print(f"Data {data_path} is now tracked by DVC")
        print(f"Run 'git commit' and 'dvc push' to save version")
    
    def get_data_hash(self, data_path: str) -> str:
        """获取数据文件的内容哈希(MD5)"""
        dvc_file = f"{data_path}.dvc"
        if os.path.exists(dvc_file):
            with open(dvc_file, 'r') as f:
                dvc_yaml = yaml.safe_load(f)
                return dvc_yaml.get('md5', 'unknown')
        
        # 直接计算文件哈希
        hash_md5 = hashlib.md5()
        with open(data_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    
    def checkout_version(self, data_path: str, revision: str):
        """
        检出特定版本的数据
        
        Args:
            revision: Git commit hash, branch, or tag
        """
        if not HAS_DVC:
            print("DVC not available")
            return
        
        # 检出特定git版本的.dvc文件
        os.system(f"cd {self.repo_path} && git checkout {revision} -- {data_path}.dvc")
        
        # 拉取对应数据
        os.system(f"cd {self.repo_path} && dvc checkout {data_path}.dvc")
        print(f"Checked out {data_path} at revision {revision}")

class LineageTracker:
    """血缘追踪器:记录数据到模型的完整链路"""
    
    def __init__(self):
        self.lineage = {
            'timestamp': datetime.now().isoformat(),
            'steps': []
        }
    
    def log_data_source(self, source_path: str, data_hash: str, 
                       description: str = ""):
        """记录数据源"""
        self.lineage['steps'].append({
            'type': 'data_source',
            'path': source_path,
            'hash': data_hash,
            'description': description,
            'timestamp': datetime.now().isoformat()
        })
    
    def log_preprocessing(self, input_hash: str, output_hash: str, 
                         steps: list, code_version: str):
        """记录预处理步骤"""
        self.lineage['steps'].append({
            'type': 'preprocessing',
            'input_hash': input_hash,
            'output_hash': output_hash,
            'steps': steps,
            'code_version': code_version,
            'timestamp': datetime.now().isoformat()
        })
    
    def log_training(self, train_data_hash: str, model_hash: str, 
                    algorithm: str, hyperparams: dict, code_version: str):
        """记录训练过程"""
        self.lineage['steps'].append({
            'type': 'training',
            'train_data_hash': train_data_hash,
            'model_hash': model_hash,
            'algorithm': algorithm,
            'hyperparameters': hyperparams,
            'code_version': code_version,
            'timestamp': datetime.now().isoformat()
        })
    
    def save(self, path: str):
        """保存血缘信息"""
        with open(path, 'w') as f:
            json.dump(self.lineage, f, indent=2)
        print(f"Lineage saved to {path}")
    
    def to_mlflow_tags(self) -> Dict[str, str]:
        """转换为MLflow标签(扁平化)"""
        tags = {
            'lineage_timestamp': self.lineage['timestamp'],
            'lineage_steps_count': str(len(self.lineage['steps']))
        }
        
        for i, step in enumerate(self.lineage['steps']):
            prefix = f"lineage_step_{i}"
            tags[f"{prefix}_type"] = step['type']
            for key, value in step.items():
                if key != 'type' and isinstance(value, (str, int, float)):
                    tags[f"{prefix}_{key}"] = str(value)
        
        return tags

class ReproducibleExperiment:
    """可复现实验:整合MLflow、DVC与血缘追踪"""
    
    def __init__(self, experiment_name: str, repo_path: str = "."):
        self.tracker = MLflowExperimentTracker(experiment_name)
        self.dvc = DVCDataVersioner(repo_path)
        self.lineage = LineageTracker()
        self.code_version = self._get_git_commit()
    
    def _get_git_commit(self) -> str:
        """获取当前Git commit hash"""
        try:
            import subprocess
            result = subprocess.check_output(['git', 'rev-parse', 'HEAD'])
            return result.decode('utf-8').strip()[:8]
        except Exception:
            return "unknown"
    
    def run_experiment(self, data_path: str, model_config: dict):
        """
        运行完整实验流程
        
        1. 数据版本化(DVC)
        2. 训练与评估
        3. 记录血缘(MLflow)
        4. 模型注册
        """
        # 记录数据源
        data_hash = self.dvc.get_data_hash(data_path)
        self.lineage.log_data_source(data_path, data_hash, "Training dataset")
        
        # 追踪数据(如果是新数据)
        self.dvc.track_data(data_path)
        
        # 加载数据
        data = pd.read_csv(data_path)
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # 记录预处理(简化示例)
        self.lineage.log_preprocessing(
            data_hash, 
            hashlib.md5(X_train.to_csv().encode()).hexdigest()[:8],
            steps=['train_test_split'],
            code_version=self.code_version
        )
        
        # 开始MLflow运行
        with self.tracker.start_run(run_name=f"run_{int(time.time())}"):
            # 记录参数
            self.tracker.log_params(model_config)
            self.tracker.log_params({
                'data_hash': data_hash,
                'code_version': self.code_version,
                'train_size': len(X_train),
                'test_size': len(X_test)
            })
            
            # 训练模型
            model = RandomForestClassifier(**model_config, random_state=42)
            model.fit(X_train, y_train)
            
            # 评估
            y_pred = model.predict(X_test)
            metrics = {
                'accuracy': accuracy_score(y_test, y_pred),
                'f1_score': f1_score(y_test, y_pred, average='weighted')
            }
            self.tracker.log_metrics(metrics)
            
            # 记录血缘
            model_hash = hashlib.md5(str(model.get_params()).encode()).hexdigest()[:8]
            self.lineage.log_training(
                data_hash,
                model_hash,
                'RandomForestClassifier',
                model_config,
                self.code_version
            )
            
            # 保存血缘到MLflow
            lineage_tags = self.lineage.to_mlflow_tags()
            self.tracker.set_tags(lineage_tags)
            
            # 记录模型
            self.tracker.log_model(
                model, 
                "model",
                registered_model_name="RandomForest_Production"
            )
            
            print(f"\nExperiment completed:")
            print(f"  Data hash: {data_hash}")
            print(f"  Model hash: {model_hash}")
            print(f"  Metrics: {metrics}")
            print(f"  Code version: {self.code_version}")

if __name__ == "__main__":
    print("=" * 60)
    print("模型版本控制与实验追踪演示")
    print("=" * 60)
    
    # 创建临时数据
    print("\n[1] 准备示例数据")
    X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, 
                              n_redundant=5, random_state=42)
    df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(20)])
    df['target'] = y
    
    temp_dir = tempfile.mkdtemp()
    data_path = os.path.join(temp_dir, 'data.csv')
    df.to_csv(data_path, index=False)
    print(f"Data saved to: {data_path}")
    
    # 运行实验
    print("\n[2] 运行可复现实验")
    experiment = ReproducibleExperiment(
        experiment_name="classification_experiment",
        repo_path=temp_dir
    )
    
    experiment.run_experiment(
        data_path=data_path,
        model_config={
            'n_estimators': 100,
            'max_depth': 5,
            'min_samples_split': 2
        }
    )
    
    # 展示血缘信息
    print("\n[3] 血缘追踪信息")
    print(json.dumps(experiment.lineage.lineage, indent=2))
    
    # 清理
    shutil.rmtree(temp_dir)
    print(f"\n[4] 清理临时目录: {temp_dir}")
    print("\n注意:实际应用中应配置MLflow Tracking Server与DVC Remote Storage")

9.3.2 推理优化基础

9.3.2.1 量化感知训练(QAT)与后训练量化(PTQ)的线性映射实现

模型量化通过降低权重与激活值的数值精度(从FP32到INT8/INT4)来减少模型大小与推理延迟,其核心在于最小化量化误差的同时保持模型精度。后训练量化(PTQ)在模型训练完成后直接统计权重与激活的动态范围,通过线性映射(Scale-Zero Point)将浮点值映射至整数区间,无需重新训练,但可能因统计分布偏移导致精度损失。量化感知训练(QAT)则在训练过程中模拟量化操作(Fake Quantization),将量化噪声作为正则项融入损失函数,使模型适应低精度表示的离散性。线性映射遵循r = S(q - Z)公式,其中S为缩放因子(scale),Z为零点(zero-point),通过最小化原始值与量化-反量化后值的L2距离确定最优参数。以下代码实现了PTQ的统计校准与QAT的前向传播模拟。
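公式r = S(q - Z)的线性映射可先用一个最小NumPy草图说明(此处采用Min-Max非对称映射到8-bit无符号区间,仅为说明性实现,与下方完整脚本的PyTorch版本相互印证):

```python
import numpy as np

def quantize(x: np.ndarray, bits: int = 8):
    """非对称线性量化:浮点 -> 整数,返回(q, scale, zero_point)"""
    qmin, qmax = 0, 2 ** bits - 1
    scale = (x.max() - x.min()) / (qmax - qmin)
    zero_point = int(round(qmin - x.min() / scale))
    zero_point = max(qmin, min(zero_point, qmax))
    q = np.clip(np.round(x / scale + zero_point), qmin, qmax).astype(np.int32)
    return q, float(scale), zero_point

def dequantize(q: np.ndarray, scale: float, zero_point: int) -> np.ndarray:
    """反量化:r = S(q - Z)"""
    return scale * (q.astype(np.float32) - zero_point)

if __name__ == "__main__":
    x = np.random.randn(1000).astype(np.float32)
    q, s, z = quantize(x, bits=8)
    err = np.abs(dequantize(q, s, z) - x)
    # 量化-反量化的逐元素误差通常不超过一个量化步长scale
    print(f"scale={s:.5f}, zero_point={z}, max_err={err.max():.5f}")
```

降低bits(如改为4)会使scale成倍增大,量化误差随之放大,这正是PTQ在低比特下精度损失的直接来源。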

Python

"""
【脚本说明】
本脚本实现神经网络量化技术,包含:
1. 后训练量化(PTQ):基于统计校准的静态量化
2. 量化感知训练(QAT):Fake Quantization前向传播
3. 对称与非对称量化方案(Per-tensor vs Per-channel)
4. 动态范围校准(Calibration)与KL散度最小化
5. 量化-反量化误差分析与可视化

使用方式:
- 直接运行查看PTQ与QAT在简单网络上的实现
- 调整比特宽度(8-bit, 4-bit)观察精度损失
- 适用于模型压缩与边缘设备部署准备
"""

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Dict, Tuple, Optional, List
import copy

class QuantizationConfig:
    """量化配置"""
    def __init__(self, bits: int = 8, symmetric: bool = True, 
                 per_channel: bool = False):
        self.bits = bits
        self.symmetric = symmetric
        self.per_channel = per_channel
        self.qmin = -(2 ** (bits - 1)) if symmetric else 0
        self.qmax = (2 ** (bits - 1)) - 1 if symmetric else (2 ** bits) - 1

class PostTrainingQuantizer:
    """
    后训练量化(PTQ)实现
    
    通过校准数据集确定激活值的动态范围
    """
    
    def __init__(self, model: nn.Module, config: QuantizationConfig):
        self.model = model
        self.config = config
        self.scales = {}
        self.zero_points = {}
        self.activation_stats = {}
    
    def calibrate(self, calibration_data: torch.Tensor, num_batches: int = 10):
        """
        校准:收集激活值的统计信息
        
        使用KL散度或简单的Min-Max统计确定量化参数
        """
        self.model.eval()
        
        # 注册钩子收集激活值
        activations = {}
        
        def get_activation(name):
            def hook(model, input, output):
                activations[name] = output.detach()
            return hook
        
        # 为所有层注册钩子
        handles = []
        for name, module in self.model.named_modules():
            if isinstance(module, (nn.Linear, nn.Conv2d)):
                handles.append(module.register_forward_hook(get_activation(name)))
        
        # 前向传播收集统计
        with torch.no_grad():
            for i, batch in enumerate(calibration_data):
                if i >= num_batches:
                    break
                _ = self.model(batch)
                
                # 更新统计
                for name, act in activations.items():
                    if name not in self.activation_stats:
                        self.activation_stats[name] = []
                    self.activation_stats[name].append(act)
        
        # 移除钩子
        for handle in handles:
            handle.remove()
        
        # 计算每层量化参数
        for name, act_list in self.activation_stats.items():
            all_activations = torch.cat([a.flatten() for a in act_list])
            self._compute_quantization_params(name, all_activations)
    
    def _compute_quantization_params(self, name: str, tensor: torch.Tensor):
        """
        计算量化的Scale和Zero Point
        
        对称量化: scale = max(|min|, |max|) / qmax, zero_point = 0
        非对称量化: scale = (max - min) / (qmax - qmin), 
                   zero_point = qmin - round(min / scale)
        """
        # 转为Python float,避免对0维张量调用内置round/abs的兼容性问题
        min_val = tensor.min().item()
        max_val = tensor.max().item()
        
        if self.config.symmetric:
            max_abs = max(abs(min_val), abs(max_val))
            scale = max_abs / self.config.qmax
            zero_point = 0
        else:
            scale = (max_val - min_val) / (self.config.qmax - self.config.qmin)
            zero_point = self.config.qmin - round(min_val / scale)
            zero_point = max(self.config.qmin, min(zero_point, self.config.qmax))
        
        self.scales[name] = scale
        self.zero_points[name] = zero_point
    
    def quantize_tensor(self, tensor: torch.Tensor, scale: float, 
                       zero_point: int) -> torch.Tensor:
        """量化:float -> int"""
        quantized = torch.round(tensor / scale + zero_point)
        return torch.clamp(quantized, self.config.qmin, self.config.qmax)
    
    def dequantize_tensor(self, tensor: torch.Tensor, scale: float, 
                         zero_point: int) -> torch.Tensor:
        """反量化:int -> float"""
        return scale * (tensor.float() - zero_point)
    
    def apply_quantization(self):
        """
        应用量化到模型权重
        
        实际部署中,权重会被永久量化为int8,这里模拟该过程
        """
        quantized_state = {}
        
        for name, param in self.model.named_parameters():
            if 'weight' in name:
                # 计算权重量化参数(简化:使用Min-Max)
                scale = (param.data.max() - param.data.min()) / \
                        (self.config.qmax - self.config.qmin)
                
                quantized = self.quantize_tensor(param.data, scale, 0)
                dequantized = self.dequantize_tensor(quantized, scale, 0)
                
                # 替换为量化后的值(模拟量化效果)
                param.data = dequantized
                quantized_state[name] = {
                    'scale': scale,
                    'zero_point': 0,
                    'int_values': quantized
                }
        
        return quantized_state

class QuantizationAwareTraining:
    """
    量化感知训练(QAT)实现
    
    在训练过程中模拟量化效应(Fake Quantization)
    """
    
    def __init__(self, model: nn.Module, config: QuantizationConfig):
        self.model = model
        self.config = config
        self.fake_quantize = FakeQuantize(config)
    
    def apply_qat(self):
        """
        为模型添加Fake Quantization层
        
        替换原有层为支持QAT的版本
        """
        self._replace_modules(self.model)
    
    def _replace_modules(self, model):
        """递归替换模块"""
        for name, module in model.named_children():
            if isinstance(module, nn.Linear):
                setattr(model, name, QuantizedLinear(module, self.fake_quantize))
            elif isinstance(module, nn.Conv2d):
                setattr(model, name, QuantizedConv2d(module, self.fake_quantize))
            else:
                self._replace_modules(module)
    
    def train(self, train_loader, epochs: int = 5, lr: float = 0.001):
        """训练(包含量化噪声)"""
        optimizer = optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        
        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch_idx, (data, target) in enumerate(train_loader):
                optimizer.zero_grad()
                output = self.model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            
            print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")

class FakeQuantize:
    """Fake Quantization操作:模拟量化噪声但保持浮点计算"""
    
    def __init__(self, config: QuantizationConfig):
        self.config = config
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        前向:量化 -> 反量化(引入量化噪声)
        反向:直通估计器(Straight-Through Estimator)
        """
        if not self.config.symmetric:
            # 非对称量化
            min_val = x.min()
            max_val = x.max()
            scale = (max_val - min_val) / (self.config.qmax - self.config.qmin)
            zero_point = self.config.qmin - torch.round(min_val / scale)
            
            x_quant = torch.round(x / scale + zero_point)
            x_quant = torch.clamp(x_quant, self.config.qmin, self.config.qmax)
            x_dequant = scale * (x_quant - zero_point)
        else:
            # 对称量化
            max_abs = torch.max(torch.abs(x))
            scale = max_abs / self.config.qmax
            x_quant = torch.round(x / scale)
            x_quant = torch.clamp(x_quant, self.config.qmin, self.config.qmax)
            x_dequant = scale * x_quant
        
        # 直通估计器:反向传播时梯度直接通过
        return x + (x_dequant - x).detach()

class QuantizedLinear(nn.Module):
    """支持QAT的线性层"""
    
    def __init__(self, linear: nn.Linear, fake_quantize: FakeQuantize):
        super().__init__()
        self.weight = linear.weight
        self.bias = linear.bias
        self.fake_quantize = fake_quantize
    
    def forward(self, x):
        # 对输入和权重进行Fake Quantize
        x_q = self.fake_quantize.forward(x)
        w_q = self.fake_quantize.forward(self.weight)
        return torch.nn.functional.linear(x_q, w_q, self.bias)

class QuantizedConv2d(nn.Module):
    """支持QAT的卷积层"""
    
    def __init__(self, conv: nn.Conv2d, fake_quantize: FakeQuantize):
        super().__init__()
        self.weight = conv.weight
        self.bias = conv.bias
        self.stride = conv.stride
        self.padding = conv.padding
        self.fake_quantize = fake_quantize
    
    def forward(self, x):
        x_q = self.fake_quantize.forward(x)
        w_q = self.fake_quantize.forward(self.weight)
        return torch.nn.functional.conv2d(x_q, w_q, self.bias, 
                                         self.stride, self.padding)

if __name__ == "__main__":
    print("[1] 创建测试模型")
    class SimpleNet(nn.Module):
        def __init__(self):
            super().__init__()
            self.fc1 = nn.Linear(10, 50)
            self.fc2 = nn.Linear(50, 10)
            self.fc3 = nn.Linear(10, 2)
        
        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = torch.relu(self.fc2(x))
            return self.fc3(x)
    
    model = SimpleNet()
    
    # 生成校准数据
    calibration_data = [torch.randn(32, 10) for _ in range(10)]
    
    print("\n[2] 后训练量化(PTQ)")
    config_8bit = QuantizationConfig(bits=8, symmetric=True)
    ptq = PostTrainingQuantizer(model, config_8bit)
    ptq.calibrate(calibration_data)
    
    print(f"量化参数统计:")
    for name in list(ptq.scales.keys())[:3]:
        print(f"  {name}: scale={ptq.scales[name]:.6f}, "
              f"zero_point={ptq.zero_points[name]}")
    
    # 应用量化并测试
    original_weights = copy.deepcopy(model.state_dict())
    quantized_state = ptq.apply_quantization()
    
    # 测试量化后模型
    test_input = torch.randn(1, 10)
    with torch.no_grad():
        output_quantized = model(test_input)
        
        # 恢复原始权重对比
        model.load_state_dict(original_weights)
        output_original = model(test_input)
        
        error = torch.abs(output_quantized - output_original).mean()
        print(f"\n量化误差(Mean Absolute Error): {error:.6f}")
    
    print("\n[3] 量化感知训练(QAT)")
    # 重新创建模型
    model_qat = SimpleNet()
    qat = QuantizationAwareTraining(model_qat, config_8bit)
    qat.apply_qat()
    
    # 模拟训练数据
    train_data = [(torch.randn(16, 10), torch.randint(0, 2, (16,))) 
                 for _ in range(20)]
    
    print("开始QAT训练...")
    qat.train(train_data, epochs=3)
    
    print("\n[4] 不同比特宽度对比")
    for bits in [8, 6, 4]:
        config = QuantizationConfig(bits=bits, symmetric=True)
        model_test = SimpleNet()
        ptq_test = PostTrainingQuantizer(model_test, config)
        ptq_test.calibrate(calibration_data)
        ptq_test.apply_quantization()
        
        with torch.no_grad():
            out = model_test(test_input)
        
        # 使用独立变量名,避免覆盖上文8-bit对称量化测试保存的error
        error_bits = torch.abs(out - output_original).mean()
        print(f"{bits}-bit量化误差: {error_bits:.6f}")
    
    print("\n[5] 对称vs非对称量化")
    # 非对称测试
    config_asym = QuantizationConfig(bits=8, symmetric=False)
    model_asym = SimpleNet()
    ptq_asym = PostTrainingQuantizer(model_asym, config_asym)
    ptq_asym.calibrate(calibration_data)
    ptq_asym.apply_quantization()
    
    with torch.no_grad():
        out_asym = model_asym(test_input)
    
    error_asym = torch.abs(out_asym - output_original).mean()
    error_sym = error  # 来自之前的8-bit测试
    
    print(f"对称量化误差: {error_sym:.6f}")
    print(f"非对称量化误差: {error_asym:.6f}")
    print(f"更优方案: {'对称' if error_sym < error_asym else '非对称'}")
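上文的对称线性映射也可以脱离PyTorch、用几行NumPy独立验证:scale的取法与前述PostTrainingQuantizer一致,数组数值为演示用随机数(示意代码,函数名为演示用假设):

```python
# 示意代码:用NumPy独立验证对称INT8量化的往返(quantize -> dequantize)误差
import numpy as np

def symmetric_quantize(x: np.ndarray, bits: int = 8):
    """对称线性量化:q = round(x / scale),scale = max|x| / qmax"""
    qmax = 2 ** (bits - 1) - 1               # int8时为127
    scale = float(np.abs(x).max()) / qmax
    q = np.clip(np.round(x / scale), -qmax - 1, qmax).astype(np.int8)
    return q, scale

def dequantize(q: np.ndarray, scale: float) -> np.ndarray:
    """反量化:x_hat = q * scale"""
    return q.astype(np.float32) * scale

rng = np.random.default_rng(0)
w = rng.normal(0, 0.1, size=(64, 64)).astype(np.float32)
q, scale = symmetric_quantize(w, bits=8)
w_hat = dequantize(q, scale)

# 舍入到最近整数,故逐元素误差上界约为 scale / 2
max_err = float(np.abs(w - w_hat).max())
assert max_err <= scale / 2 + 1e-7
print(f"scale={scale:.6f}, max round-trip error={max_err:.6f}")
```

这一上界解释了比特数越低误差越大的现象:比特数每减1,qmax约减半,scale与误差上界随之翻倍。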
9.3.2.2 知识蒸馏:软标签(Soft Targets)与温度参数的温度退火

知识蒸馏通过训练学生网络模仿教师网络的输出来实现模型压缩,其核心在于软标签(Soft Targets)携带了类别间相似性的暗知识(Dark Knowledge)。温度参数T控制Softmax输出的平滑程度:高温度使概率分布更均匀,保留更多类别间关系信息;低温度则趋近硬标签(One-Hot)。温度退火策略在训练初期使用高温以传递丰富的结构信息,随后逐步降低温度至1,使学生聚焦于精确分类。蒸馏损失通常以权重α组合软目标项(温度调节后的KL散度)与硬目标交叉熵,在知识迁移与任务精度之间取得平衡。以下代码实现了温度调节的Softmax、蒸馏损失及温度退火调度器。
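温度对分布平滑程度的影响可以先用一个极简示例直观感受:温度越高,分布的熵越大,次优类别分到的概率越多(示意代码,logits取值为演示用假设):

```python
# 示意代码:温度调节的Softmax,观察升温后分布变平滑(熵增大)
import numpy as np

def softmax_t(logits: np.ndarray, T: float) -> np.ndarray:
    """q_i = exp(z_i / T) / sum_j exp(z_j / T)"""
    z = logits / T
    z = z - z.max()                  # 减去最大值保证数值稳定
    e = np.exp(z)
    return e / e.sum()

logits = np.array([6.0, 2.0, 1.0])   # 某一类别明显占优的假设输出
for T in (1.0, 4.0, 10.0):
    p = softmax_t(logits, T)
    entropy = -(p * np.log(p)).sum()
    print(f"T={T:>4}: p={np.round(p, 3)}, entropy={entropy:.3f}")
```

T=1时概率几乎集中在第一类;T升高后次优类别的相对大小关系被保留并放大,这正是软标签传递"暗知识"的来源。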

Python

"""
【脚本说明】
本脚本实现知识蒸馏技术,包含:
1. 温度调节的Softmax(Distillation Softmax)
2. 软目标与硬目标结合的蒸馏损失
3. 温度退火策略(Cosine Annealing, Exponential Decay)
4. 注意力转移(Attention Transfer)实现
5. 教师-学生网络性能对比与压缩率分析

使用方式:
- 直接运行查看知识蒸馏在简单分类任务上的效果
- 调整温度参数与蒸馏权重观察对精度的影响
- 适用于模型压缩与移动端部署
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import copy
from typing import Optional
from sklearn.model_selection import train_test_split  # 用于下方训练/测试集划分

class DistillationSoftmax:
    """
    带温度的Softmax
    
    q_i = exp(z_i / T) / sum(exp(z_j / T))
    
    T > 1: 概率分布更平滑(软标签)
    T = 1: 标准Softmax
    T -> 0: 接近硬标签(One-Hot)
    """
    
    @staticmethod
    def forward(logits: torch.Tensor, temperature: float) -> torch.Tensor:
        """温度调节的Softmax"""
        return F.softmax(logits / temperature, dim=1)
    
    @staticmethod
    def kl_divergence(student_logits: torch.Tensor, 
                     teacher_probs: torch.Tensor, 
                     temperature: float) -> torch.Tensor:
        """
        蒸馏损失:KL散度 KL(P_teacher || P_student)
        
        两侧logits均除以温度T后再计算
        """
        student_log_probs = F.log_softmax(student_logits / temperature, dim=1)
        
        # F.kl_div(input=log q, target=p) = sum p * (log p - log q) = KL(p || q)
        kl = F.kl_div(student_log_probs, teacher_probs, reduction='batchmean')
        return kl * (temperature ** 2)  # T^2缩放,保持软目标项的梯度幅度与硬目标项一致

class TemperatureScheduler:
    """温度退火调度器"""
    
    def __init__(self, initial_temp: float = 10.0, final_temp: float = 1.0, 
                 total_epochs: int = 100, strategy: str = 'cosine'):
        self.initial_temp = initial_temp
        self.final_temp = final_temp
        self.total_epochs = total_epochs
        self.strategy = strategy
    
    def get_temperature(self, epoch: int) -> float:
        """获取当前epoch的温度"""
        if self.strategy == 'cosine':
            # 余弦退火
            progress = epoch / self.total_epochs
            cosine_decay = 0.5 * (1 + np.cos(np.pi * progress))
            return self.final_temp + (self.initial_temp - self.final_temp) * cosine_decay
        
        elif self.strategy == 'exponential':
            # 指数衰减
            decay_rate = np.log(self.final_temp / self.initial_temp) / self.total_epochs
            return self.initial_temp * np.exp(decay_rate * epoch)
        
        elif self.strategy == 'linear':
            # 线性衰减
            progress = epoch / self.total_epochs
            return self.initial_temp + (self.final_temp - self.initial_temp) * progress
        
        else:  # constant
            return self.initial_temp

class KnowledgeDistillationLoss(nn.Module):
    """
    知识蒸馏损失
    
    L = alpha * T^2 * KL(P_teacher^T || P_student^T) + (1-alpha) * CE(P_student^1, y_hard)
    """
    
    def __init__(self, alpha: float = 0.7, temperature: float = 4.0):
        super().__init__()
        self.alpha = alpha
        self.temperature = temperature
        self.ce_loss = nn.CrossEntropyLoss()
    
    def forward(self, student_logits: torch.Tensor, 
                teacher_logits: torch.Tensor, 
                labels: torch.Tensor,
                temperature: Optional[float] = None) -> tuple:
        """
        计算蒸馏损失
        
        Args:
            student_logits: 学生网络原始输出
            teacher_logits: 教师网络原始输出
            labels: 真实标签(硬标签)
            temperature: 当前温度(如为None则使用初始化温度)
        """
        T = temperature if temperature is not None else self.temperature
        
        # 软目标损失(KL散度)
        with torch.no_grad():
            teacher_probs = DistillationSoftmax.forward(teacher_logits, T)
        
        soft_loss = DistillationSoftmax.kl_divergence(student_logits, teacher_probs, T)
        
        # 硬目标损失(标准交叉熵,T=1)
        hard_loss = self.ce_loss(student_logits, labels)
        
        # 加权组合
        total_loss = self.alpha * soft_loss + (1 - self.alpha) * hard_loss
        
        return total_loss, soft_loss, hard_loss

class TeacherNet(nn.Module):
    """教师网络(大容量)"""
    
    def __init__(self, input_dim: int = 784, num_classes: int = 10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Linear(input_dim, 1200),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1200, 1200),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.classifier = nn.Linear(1200, num_classes)
    
    def forward(self, x):
        x = x.view(x.size(0), -1)
        features = self.features(x)
        logits = self.classifier(features)
        return logits, features

class StudentNet(nn.Module):
    """学生网络(小容量)"""
    
    def __init__(self, input_dim: int = 784, num_classes: int = 10):
        super().__init__()
        self.features = nn.Sequential(
            nn.Linear(input_dim, 400),
            nn.ReLU(),
            nn.Linear(400, 400),
            nn.ReLU()
        )
        self.classifier = nn.Linear(400, num_classes)
    
    def forward(self, x):
        x = x.view(x.size(0), -1)
        features = self.features(x)
        logits = self.classifier(features)
        return logits, features
    
    def count_parameters(self):
        """统计参数量"""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

class DistillationTrainer:
    """知识蒸馏训练器"""
    
    def __init__(self, teacher: nn.Module, student: nn.Module, 
                 temperature_scheduler: TemperatureScheduler,
                 alpha: float = 0.7, lr: float = 0.001):
        self.teacher = teacher
        self.student = student
        self.scheduler = temperature_scheduler
        self.criterion = KnowledgeDistillationLoss(alpha=alpha)
        self.optimizer = optim.Adam(student.parameters(), lr=lr)
        
        # 教师设为评估模式
        self.teacher.eval()
        for param in self.teacher.parameters():
            param.requires_grad = False
    
    def train_epoch(self, dataloader: DataLoader, epoch: int) -> dict:
        """训练一个epoch"""
        self.student.train()
        total_loss = 0
        total_soft_loss = 0
        total_hard_loss = 0
        correct = 0
        total = 0
        
        temperature = self.scheduler.get_temperature(epoch)
        
        for batch_idx, (data, target) in enumerate(dataloader):
            self.optimizer.zero_grad()
            
            # 教师预测(不计算梯度)
            with torch.no_grad():
                teacher_logits, _ = self.teacher(data)
            
            # 学生预测
            student_logits, _ = self.student(data)
            
            # 计算损失
            loss, soft_loss, hard_loss = self.criterion(
                student_logits, teacher_logits, target, temperature
            )
            
            loss.backward()
            self.optimizer.step()
            
            total_loss += loss.item()
            total_soft_loss += soft_loss.item()
            total_hard_loss += hard_loss.item()
            
            # 计算准确率(使用硬标签)
            pred = student_logits.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)
        
        return {
            'loss': total_loss / len(dataloader),
            'soft_loss': total_soft_loss / len(dataloader),
            'hard_loss': total_hard_loss / len(dataloader),
            'accuracy': 100. * correct / total,
            'temperature': temperature
        }
    
    def evaluate(self, dataloader: DataLoader) -> float:
        """评估准确率"""
        self.student.eval()
        correct = 0
        total = 0
        
        with torch.no_grad():
            for data, target in dataloader:
                logits, _ = self.student(data)
                pred = logits.argmax(dim=1)
                correct += pred.eq(target).sum().item()
                total += target.size(0)
        
        return 100. * correct / total

class AttentionTransfer(nn.Module):
    """
    注意力转移:匹配教师与学生的特征图注意力
    
    使用激活值的平方作为注意力图
    """
    
    def __init__(self, student_channels: int, teacher_channels: int):
        super().__init__()
        # 适配器:如果通道数不匹配,使用1x1卷积适配
        if student_channels != teacher_channels:
            self.adapter = nn.Conv2d(student_channels, teacher_channels, 1)
        else:
            self.adapter = nn.Identity()
    
    def forward(self, student_features: torch.Tensor, 
                teacher_features: torch.Tensor) -> torch.Tensor:
        """
        计算注意力转移损失
        """
        # 计算注意力图(Frobenius范数的平方)
        student_att = self.adapter(student_features).pow(2).mean(dim=1, keepdim=True)
        teacher_att = teacher_features.pow(2).mean(dim=1, keepdim=True)
        
        # 归一化
        student_att = F.normalize(student_att.view(student_att.size(0), -1), dim=1)
        teacher_att = F.normalize(teacher_att.view(teacher_att.size(0), -1), dim=1)
        
        # 欧氏距离
        loss = torch.norm(student_att - teacher_att, p=2, dim=1).mean()
        return loss

if __name__ == "__main__":
    print("[1] 准备数据(合成MNIST-like数据)")
    np.random.seed(42)
    torch.manual_seed(42)
    
    # 生成合成数据
    n_samples = 5000
    X = np.random.randn(n_samples, 784).astype(np.float32)
    y = np.random.randint(0, 10, n_samples)
    
    # 添加一些结构使任务可学习
    for i in range(10):
        mask = y == i
        X[mask] += np.random.randn(784) * 0.3 * i
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    
    train_dataset = TensorDataset(torch.FloatTensor(X_train), torch.LongTensor(y_train))
    test_dataset = TensorDataset(torch.FloatTensor(X_test), torch.LongTensor(y_test))
    
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64)
    
    print(f"Train samples: {len(train_dataset)}, Test samples: {len(test_dataset)}")
    
    print("\n[2] 训练教师网络")
    teacher = TeacherNet()
    optimizer_teacher = optim.Adam(teacher.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    # 快速训练教师
    for epoch in range(5):
        teacher.train()
        for data, target in train_loader:
            optimizer_teacher.zero_grad()
            logits, _ = teacher(data)
            loss = criterion(logits, target)
            loss.backward()
            optimizer_teacher.step()
        
        # 评估
        teacher.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in test_loader:
                logits, _ = teacher(data)
                pred = logits.argmax(dim=1)
                correct += pred.eq(target).sum().item()
                total += target.size(0)
        acc = 100. * correct / total
        print(f"Teacher Epoch {epoch+1}: Accuracy = {acc:.2f}%")
    
    teacher.eval()
    
    print("\n[3] 训练学生网络(无蒸馏)")
    student_baseline = StudentNet()
    optimizer_student = optim.Adam(student_baseline.parameters(), lr=0.001)
    
    for epoch in range(10):
        student_baseline.train()
        for data, target in train_loader:
            optimizer_student.zero_grad()
            logits, _ = student_baseline(data)
            loss = criterion(logits, target)
            loss.backward()
            optimizer_student.step()
    
    acc_baseline = DistillationTrainer(teacher, student_baseline, 
                                      TemperatureScheduler(1, 1, 10)).evaluate(test_loader)
    print(f"Baseline Student Accuracy: {acc_baseline:.2f}%")
    print(f"Student Parameters: {student_baseline.count_parameters()}")
    
    print("\n[4] 知识蒸馏训练")
    student_distilled = StudentNet()
    temp_scheduler = TemperatureScheduler(initial_temp=10.0, final_temp=1.0, 
                                         total_epochs=10, strategy='cosine')
    trainer = DistillationTrainer(teacher, student_distilled, temp_scheduler, 
                                 alpha=0.7, lr=0.001)
    
    for epoch in range(10):
        train_stats = trainer.train_epoch(train_loader, epoch)
        test_acc = trainer.evaluate(test_loader)
        print(f"Epoch {epoch+1}: Temp={train_stats['temperature']:.2f}, "
              f"Train Acc={train_stats['accuracy']:.2f}%, "
              f"Test Acc={test_acc:.2f}%, "
              f"Loss={train_stats['loss']:.4f}")
    
    print("\n[5] 不同温度策略对比")
    strategies = ['cosine', 'exponential', 'linear', 'constant']
    
    for strategy in strategies:
        student_test = StudentNet()
        scheduler = TemperatureScheduler(10.0, 1.0, 10, strategy)
        trainer_test = DistillationTrainer(teacher, student_test, scheduler, alpha=0.7)
        
        for epoch in range(10):
            trainer_test.train_epoch(train_loader, epoch)
        
        final_acc = trainer_test.evaluate(test_loader)
        print(f"{strategy.capitalize():12s} annealing: Final Accuracy = {final_acc:.2f}%")
    
    print("\n[6] 蒸馏 vs 直接训练对比")
    acc_distilled = trainer.evaluate(test_loader)
    teacher_params = sum(p.numel() for p in teacher.parameters())
    print(f"Teacher Accuracy:          {acc:.2f}%")  # 来自上方教师最后一轮评估
    print(f"Student (Direct):          {acc_baseline:.2f}%")
    print(f"Student (Distilled):       {acc_distilled:.2f}%")
    print(f"Improvement:               {acc_distilled - acc_baseline:.2f}%")
    print(f"Compression Ratio:         {teacher_params / student_distilled.count_parameters():.1f}x")
9.3.2.3 ONNX Runtime与TensorRT的图优化与算子融合

ONNX Runtime作为跨平台的推理引擎,通过图优化器(Graph Optimizer)执行常量折叠、算子融合与布局转换等图变换,将原始计算图改写为硬件友好的执行计划。算子融合(Operator Fusion)将多个连续算子(如Conv-BN-ReLU)合并为单个内核调用,减少内存搬运与内核启动开销。TensorRT作为NVIDIA的高性能推理优化器,针对GPU执行深度优化:层融合(Layer Fusion)消除中间张量的读写,精度校准(Calibration)支持INT8量化,内核自动调优(Kernel Auto-Tuning)为每层选择最优CUDA实现。两者结合形成完整的优化流水线:ONNX负责跨框架模型交换与基础图优化,TensorRT提供针对NVIDIA硬件的极致性能。以下代码实现了ONNX Runtime的图优化配置与TensorRT执行提供者的集成。
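在进入完整脚本之前,可以先用PyTorch手工验证算子融合中最经典的Conv-BN折叠:推理引擎将BatchNorm的仿射变换折叠进前一层卷积的权重与偏置,两层合并为一层而输出不变(示意代码,层的形状与统计量取值均为演示用假设):

```python
# 示意代码:手工实现Conv-BN折叠,并验证与原始两层的输出一致
import torch
import torch.nn as nn

torch.manual_seed(0)
conv = nn.Conv2d(3, 8, 3, padding=1, bias=True).eval()
bn = nn.BatchNorm2d(8).eval()           # 推理模式:使用running统计量
bn.running_mean.uniform_(-0.5, 0.5)     # 模拟训练后的统计量
bn.running_var.uniform_(0.5, 1.5)
bn.weight.data.uniform_(0.5, 1.5)       # gamma
bn.bias.data.uniform_(-0.1, 0.1)        # beta

# 折叠公式(逐输出通道):
#   W' = W * gamma / sqrt(var + eps)
#   b' = (b - mean) * gamma / sqrt(var + eps) + beta
std = torch.sqrt(bn.running_var + bn.eps)
fused = nn.Conv2d(3, 8, 3, padding=1, bias=True).eval()
with torch.no_grad():
    fused.weight.copy_(conv.weight * (bn.weight / std).reshape(-1, 1, 1, 1))
    fused.bias.copy_((conv.bias - bn.running_mean) * bn.weight / std + bn.bias)

x = torch.randn(2, 3, 16, 16)
with torch.no_grad():
    assert torch.allclose(bn(conv(x)), fused(x), atol=1e-5)
print("Conv-BN折叠后输出与原两层一致")
```

折叠后BatchNorm在推理图中完全消失,既省去一次逐元素内核调用,也减少一次中间张量的读写,这正是图优化器做此类模式匹配的动机。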

Python

"""
【脚本说明】
本脚本实现ONNX Runtime与TensorRT的推理优化,包含:
1. ONNX Runtime图优化级别配置(Basic/Extended/All)
2. 算子融合优化(Conv-BN-ReLU等模式匹配)
3. TensorRT执行提供者(Execution Provider)集成
4. 推理性能基准测试(FP32 vs FP16 vs INT8)
5. 动态形状(Dynamic Shapes)支持配置

使用方式:
- 需安装onnxruntime-gpu与tensorrt库
- 运行前确保有NVIDIA GPU与相应驱动
- 适用于生产环境的高吞吐量推理部署
"""

import numpy as np
import time
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional

# ONNX Runtime(transformers子包提供离线图优化工具)
import onnxruntime as ort
from onnxruntime.transformers import optimizer

class ONNXRuntimeOptimizer:
    """ONNX Runtime推理优化器"""
    
    def __init__(self, model_path: str):
        self.model_path = model_path
        self.session_options = ort.SessionOptions()
    
    def configure_optimizations(self, 
                               graph_optimization_level: str = 'all',
                               enable_fp16: bool = False):
        """
        配置优化选项
        
        Graph Optimization Levels:
        - 'basic':     基础常量折叠与消除
        - 'extended':  扩展优化(包括融合)
        - 'all':       所有优化(包括布局转换)
        """
        level_map = {
            'basic': ort.GraphOptimizationLevel.ORT_ENABLE_BASIC,
            'extended': ort.GraphOptimizationLevel.ORT_ENABLE_EXTENDED,
            'all': ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        }
        
        self.session_options.graph_optimization_level = level_map.get(
            graph_optimization_level, ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        )
        
        # 设置线程数
        self.session_options.intra_op_num_threads = 4
        self.session_options.inter_op_num_threads = 4
        
        # 其他优化标志
        self.session_options.enable_mem_pattern = True
        self.session_options.enable_cpu_mem_arena = True
        
        if enable_fp16:
            # 注意:ONNX Runtime并无会话级的"一键FP16"开关;
            # FP16通常在模型侧离线完成(如onnxconverter-common的float16转换),
            # 或交由TensorRT执行提供者的trt_fp16_enable选项处理
            print("enable_fp16=True: 请离线转换模型或启用TensorRT FP16")
    
    def create_inference_session(self, 
                                use_tensorrt: bool = False,
                                use_cuda: bool = True) -> ort.InferenceSession:
        """
        创建推理会话
        
        执行提供者优先级:
        1. TensorRT (NVIDIA GPU)
        2. CUDA (NVIDIA GPU)
        3. CPU
        """
        providers = ['CPUExecutionProvider']
        
        if use_cuda and not use_tensorrt:
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        
        if use_tensorrt:
            # TensorRT配置
            trt_config = {
                'device_id': 0,
                'trt_max_workspace_size': 2147483648,  # 2GB
                'trt_fp16_enable': True,  # 启用FP16
                'trt_int8_enable': False,  # 可选启用INT8
                'trt_engine_cache_enable': True,
                'trt_engine_cache_path': './trt_cache'
            }
            providers = [
                ('TensorrtExecutionProvider', trt_config),
                'CUDAExecutionProvider',
                'CPUExecutionProvider'
            ]
        
        session = ort.InferenceSession(
            self.model_path,
            self.session_options,
            providers=providers
        )
        
        # 打印实际使用的提供者
        print(f"Using providers: {session.get_providers()}")
        return session
    
    def benchmark_inference(self, 
                           session: ort.InferenceSession,
                           input_shape: Tuple[int, ...],
                           n_runs: int = 100,
                           warmup: int = 10) -> Dict:
        """
        基准测试推理性能
        """
        input_name = session.get_inputs()[0].name
        
        # 生成随机输入
        input_data = np.random.randn(*input_shape).astype(np.float32)
        
        # 预热
        for _ in range(warmup):
            _ = session.run(None, {input_name: input_data})
        
        # 同步GPU(仅当使用CUDA执行提供者且本机torch支持CUDA时)
        use_gpu_sync = ('CUDA' in str(session.get_providers())
                        and torch.cuda.is_available())
        if use_gpu_sync:
            torch.cuda.synchronize()
        
        # 测试
        start = time.time()
        for _ in range(n_runs):
            outputs = session.run(None, {input_name: input_data})
        
        if use_gpu_sync:
            torch.cuda.synchronize()
        
        elapsed = (time.time() - start) / n_runs * 1000  # ms
        
        return {
            'latency_ms': elapsed,
            'throughput': 1000 / elapsed,  # inferences/sec
            'output_shape': outputs[0].shape
        }

class GraphOptimizationAnalyzer:
    """图优化分析器"""
    
    def __init__(self, model_path: str):
        self.model_path = model_path
        self.optimized_models = {}
    
    def apply_static_optimizations(self, output_path: str):
        """
        应用静态图优化(离线优化)
        
        优化包括:
        - 常量折叠
        - 死代码消除
        - 算子融合(Conv+BN+ReLU等)
        - 布局转换(NCHW优化)
        """
        # 使用ONNX Runtime的transformers优化工具(主要面向Transformer类模型;
        # 通用CNN模型通常依赖会话级的graph_optimization_level即可)
        optimized_model = optimizer.optimize_model(
            self.model_path,
            model_type='bert',  # 可选'gpt2'等;此处仅作演示
            use_gpu=True,
            num_heads=0,  # 设为0让工具自动推断
            hidden_size=0
        )
        
        optimized_model.convert_model_float32_to_float16()  # 可选FP16转换
        optimized_model.save_model_to_file(output_path)
        
        print(f"Optimized model saved to: {output_path}")
        
        # 对比文件大小
        import os  # 标准库
        original_size = os.path.getsize(self.model_path) / 1024
        optimized_size = os.path.getsize(output_path) / 1024
        print(f"Size: {original_size:.2f} KB -> {optimized_size:.2f} KB "
              f"({(1-optimized_size/original_size)*100:.1f}% reduction)")
        
        return output_path
    
    def analyze_fused_operators(self, session: ort.InferenceSession) -> List[str]:
        """
        分析已融合的算子
        
        通过分析执行计划获取融合信息
        """
        # 获取模型元信息(使用公开API;解析完整图需另行加载ONNX文件)
        meta = session.get_modelmeta()
        
        # 这里简化处理,实际应解析ONNX图节点属性
        fused_ops = []
        
        # 检查是否有TensorRT优化
        if 'TensorrtExecutionProvider' in session.get_providers():
            fused_ops.append("TensorRT Layer Fusion")
        
        return fused_ops

class DynamicShapeConfigurator:
    """动态形状配置器"""
    
    def __init__(self, model_path: str):
        self.model_path = model_path
    
    def configure_dynamic_axes(self, 
                              input_name: str,
                              dynamic_axes: Dict[int, str]) -> ort.SessionOptions:
        """
        配置动态形状
        
        Args:
            dynamic_axes: {axis_idx: axis_name, ...}
                        例如: {0: 'batch_size', 2: 'height'}
        """
        options = ort.SessionOptions()
        
        # 添加动态形状配置
        # 注意:实际配置在模型导出时通过dynamic_axes参数设置
        # 这里主要展示如何优化动态形状的推理
        
        options.add_free_dimension_override_by_name(
            list(dynamic_axes.values())[0], 1  # 默认batch=1
        )
        
        return options

class PrecisionCalibration:
    """精度校准(用于INT8量化)"""
    
    def __init__(self, model_path: str):
        self.model_path = model_path
    
    def calibrate_for_tensorrt(self, 
                              calibration_data: List[np.ndarray],
                              cache_path: str = 'calibration.cache'):
        """
        为TensorRT INT8模式进行校准
        
        收集激活值的动态范围用于量化
        """
        # 简化示例,实际应使用ONNX Runtime的校准工具
        print(f"Calibrating with {len(calibration_data)} samples...")
        
        # 统计各层的动态范围
        activation_ranges = {}
        for data in calibration_data:
            # 模拟前向传播,统计各层激活的动态范围
            # 实际应实现TensorRT的IInt8EntropyCalibrator2等校准器接口
            pass
        
        # 保存校准缓存
        with open(cache_path, 'w') as f:
            f.write("Calibration cache for INT8 inference")
        
        print(f"Calibration cache saved to: {cache_path}")
        
        return cache_path

if __name__ == "__main__":
    print("=" * 60)
    print("ONNX Runtime与TensorRT优化演示")
    print("=" * 60)
    
    # 创建简单ONNX模型用于测试
    print("\n[1] 创建测试模型")
    class TestModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
            self.bn1 = nn.BatchNorm2d(64)
            self.relu = nn.ReLU()
            self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
            self.bn2 = nn.BatchNorm2d(128)
            self.pool = nn.AdaptiveAvgPool2d(1)
            self.fc = nn.Linear(128, 10)
        
        def forward(self, x):
            x = self.relu(self.bn1(self.conv1(x)))
            x = self.relu(self.bn2(self.conv2(x)))
            x = self.pool(x)
            x = x.view(x.size(0), -1)
            return self.fc(x)
    
    model = TestModel()
    dummy_input = torch.randn(1, 3, 224, 224)
    onnx_path = "/tmp/test_model.onnx"
    
    torch.onnx.export(model, dummy_input, onnx_path, 
                     export_params=True,
                     opset_version=11,
                     do_constant_folding=True,
                     input_names=['input'],
                     output_names=['output'],
                     dynamic_axes={'input': {0: 'batch_size'}, 
                                  'output': {0: 'batch_size'}})
    print(f"Test model exported to: {onnx_path}")
    
    print("\n[2] ONNX Runtime基础优化")
    ort_optimizer = ONNXRuntimeOptimizer(onnx_path)
    
    # 不同优化级别对比
    for level in ['basic', 'extended', 'all']:
        print(f"\nOptimization Level: {level.upper()}")
        ort_optimizer.configure_optimizations(
            graph_optimization_level=level,
            enable_fp16=False
        )
        
        session = ort_optimizer.create_inference_session(use_tensorrt=False)
        stats = ort_optimizer.benchmark_inference(
            session, (1, 3, 224, 224), n_runs=50
        )
        print(f"  Latency: {stats['latency_ms']:.2f} ms")
        print(f"  Throughput: {stats['throughput']:.1f} inf/sec")
    
    print("\n[3] TensorRT优化(如可用)")
    try:
        ort_optimizer.configure_optimizations('all', enable_fp16=True)
        trt_session = ort_optimizer.create_inference_session(use_tensorrt=True)
        trt_stats = ort_optimizer.benchmark_inference(
            trt_session, (1, 3, 224, 224), n_runs=50
        )
        print(f"TensorRT Latency: {trt_stats['latency_ms']:.2f} ms")
        print(f"TensorRT Throughput: {trt_stats['throughput']:.1f} inf/sec")
        
        # 计算相对CPU基准(上一节最后一次benchmark结果)的加速比
        speedup = stats['latency_ms'] / trt_stats['latency_ms']
        print(f"Speedup over CPU baseline: {speedup:.2f}x")
    except Exception as e:
        print(f"TensorRT not available: {e}")
    
    print("\n[4] 静态图优化(离线)")
    analyzer = GraphOptimizationAnalyzer(onnx_path)
    optimized_path = "/tmp/test_model_optimized.onnx"
    analyzer.apply_static_optimizations(optimized_path)
    
    print("\n[5] 不同Batch Size性能测试")
    batch_sizes = [1, 4, 8, 16]
    for batch in batch_sizes:
        stats = ort_optimizer.benchmark_inference(
            ort_optimizer.create_inference_session(use_tensorrt=False),
            (batch, 3, 224, 224), n_runs=20
        )
        print(f"Batch {batch:2d}: {stats['latency_ms']:.2f} ms "
              f"({stats['throughput']*batch:.1f} samples/sec)")
    
    print("\n[6] 精度对比(FP32 vs FP16)")
    # 生成测试数据
    test_input = np.random.randn(1, 3, 224, 224).astype(np.float32)
    
    # FP32推理
    session_fp32 = ort_optimizer.create_inference_session()
    output_fp32 = session_fp32.run(None, {'input': test_input})[0]
    
    # FP16推理(如支持)
    try:
        session_fp16 = ort_optimizer.create_inference_session(use_tensorrt=True)
        output_fp16 = session_fp16.run(None, {'input': test_input})[0]
        
        max_diff = np.max(np.abs(output_fp32 - output_fp16))
        print(f"FP32 vs FP16 Max Difference: {max_diff:.6f}")
        print(f"Relative Error: {max_diff / np.abs(output_fp32).mean():.6f}")
    except Exception as e:
        print(f"FP16 comparison skipped: {e}")
    
    print("\n注意:实际TensorRT优化需要NVIDIA GPU与TensorRT库")
    print("安装命令: pip install onnxruntime-gpu")
