Environment Declaration

  • Python: 3.10+
  • PyTorch: 2.0+
  • NumPy: 1.24+
  • Pandas: 2.0+
  • OpenCV: 4.8+
  • Tools: PyCharm / VS Code / Jupyter Notebook
  • OS: Windows / macOS / Linux (all supported)

Learning Objectives and Summary

Chapter Objectives

  1. Master systematic methods for data quality assessment and cleaning
  2. Understand core feature-engineering techniques (scaling, encoding, selection)
  3. Become proficient in image preprocessing and augmentation
  4. Master the text preprocessing pipeline (tokenization, vectorization)
  5. Learn to handle class imbalance
  6. Be able to design efficient PyTorch data pipelines

Abstract: Data is the fuel of deep learning, and high-quality preprocessing can significantly improve model performance. This chapter walks through the full data-engineering workflow, from cleaning and feature engineering to augmentation, covering image, text, and tabular data. You will learn to identify and fix data-quality problems, apply augmentation best practices, and build efficient data pipelines.


1. Data Quality Assessment and Cleaning

1.1 Data Quality Dimensions

Data quality can be assessed along the following dimensions:

Dimension    Description               Common problems
Completeness Is the data complete?     Missing values, empty records
Accuracy     Is the data correct?      Outliers, mislabeled samples
Consistency  Are formats uniform?      Inconsistent units, naming chaos
Timeliness   Is the data up to date?   Stale data, concept drift
Uniqueness   Are there duplicates?     Duplicate samples, redundant features
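
The first two dimensions, completeness and uniqueness, can be checked mechanically before any modeling. A minimal sketch (the `quality_report` helper is illustrative, not a standard pandas API):

```python
import numpy as np
import pandas as pd

def quality_report(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize per-column completeness, uniqueness, and dtype."""
    return pd.DataFrame({
        'missing_pct': df.isnull().mean() * 100,  # completeness
        'n_unique': df.nunique(dropna=True),      # uniqueness
        'dtype': df.dtypes.astype(str),           # consistency hint
    })

df = pd.DataFrame({'A': [1, 2, np.nan, 2], 'B': ['x', 'x', 'y', None]})
report = quality_report(df)
print(report)
```

Running such a report on real data quickly flags the columns that need the cleaning steps described below.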

1.2 Handling Missing Values

Identifying missing values

import pandas as pd
import numpy as np

# Build example data
data = pd.DataFrame({
    'A': [1, 2, np.nan, 4, 5],
    'B': [10, np.nan, 30, 40, 50],
    'C': ['x', 'y', 'z', np.nan, 'x'],
    'D': [1.5, 2.5, 3.5, 4.5, np.nan]
})

print("Original data:")
print(data)

# Count missing values
print("\nMissing-value counts:")
print(data.isnull().sum())
# A :.2f format spec cannot be applied to a whole Series, so round the Series
print("\nMissing-value percentages:")
print((data.isnull().sum() / len(data) * 100).round(2))

# Visualize missing values
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10, 6))
sns.heatmap(data.isnull(), cbar=False, yticklabels=False, cmap='viridis')
plt.title('Missing Values Heatmap')
plt.savefig('missing_values_heatmap.png', dpi=150)
plt.show()

Missing-value handling strategies

# Strategy 1: drop rows that contain missing values
data_dropped = data.dropna()
print(f"Rows after dropping: {len(data_dropped)}")

# Strategy 2: drop columns that contain missing values
data_dropped_cols = data.dropna(axis=1)
print(f"Columns after dropping: {data_dropped_cols.columns.tolist()}")

# Strategy 3: fill missing values (numeric columns)
# Mean imputation (assign instead of inplace=True, which is deprecated
# on chained access in pandas 2.x)
data_filled_mean = data.copy()
data_filled_mean['A'] = data['A'].fillna(data['A'].mean())

# Median imputation
data_filled_median = data.copy()
data_filled_median['A'] = data['A'].fillna(data['A'].median())

# Strategy 4: fill missing values (categorical columns)
# Mode imputation
data_filled_mode = data.copy()
data_filled_mode['C'] = data['C'].fillna(data['C'].mode()[0])

# Strategy 5: forward/backward fill
# (fillna(method=...) is deprecated in pandas 2.x; use ffill()/bfill())
data_ffill = data.ffill()  # forward fill
data_bfill = data.bfill()  # backward fill

# Strategy 6: interpolation
data_interpolated = data.copy()
data_interpolated['A'] = data['A'].interpolate(method='linear')

print("\nInterpolation result:")
print(data_interpolated)

Missing values in deep learning

import torch
import torch.nn as nn

# Approach 1: use a mask
def create_mask(tensor):
    """Build a missing-value mask; True marks valid entries."""
    return ~torch.isnan(tensor)

# Approach 2: learned imputation (Datawig-style)
class LearnableImputation(nn.Module):
    """Learnable missing-value imputation."""
    def __init__(self, num_features):
        super().__init__()
        self.imputation_values = nn.Parameter(torch.zeros(num_features))
    
    def forward(self, x):
        mask = ~torch.isnan(x)
        imputed = torch.where(mask, x, self.imputation_values)
        return imputed, mask

# Example
x = torch.tensor([[1.0, 2.0, np.nan], 
                  [4.0, np.nan, 6.0],
                  [np.nan, 8.0, 9.0]])

imputation_layer = LearnableImputation(3)
imputed_x, mask = imputation_layer(x)
print(f"Original:\n{x}")
print(f"Imputed:\n{imputed_x}")
print(f"Mask:\n{mask}")

1.3 Outlier Detection and Handling

Statistical outlier detection

# Generate example data
np.random.seed(42)
normal_data = np.random.normal(100, 15, 1000)
outliers = np.random.uniform(200, 250, 50)
data_with_outliers = np.concatenate([normal_data, outliers])

# Method 1: Z-score
from scipy import stats
z_scores = np.abs(stats.zscore(data_with_outliers))
outliers_zscore = data_with_outliers[z_scores > 3]
print(f"Outliers found by Z-score: {len(outliers_zscore)}")

# Method 2: IQR (interquartile range)
Q1 = np.percentile(data_with_outliers, 25)
Q3 = np.percentile(data_with_outliers, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers_iqr = data_with_outliers[(data_with_outliers < lower_bound) | 
                                   (data_with_outliers > upper_bound)]
print(f"Outliers found by IQR: {len(outliers_iqr)}")

# Visualize
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.hist(data_with_outliers, bins=50, alpha=0.7, edgecolor='black')
plt.axvline(lower_bound, color='r', linestyle='--', label='Lower Bound')
plt.axvline(upper_bound, color='r', linestyle='--', label='Upper Bound')
plt.title('Distribution with Outliers')
plt.legend()

plt.subplot(1, 2, 2)
plt.boxplot(data_with_outliers, vert=False)
plt.title('Box Plot')

plt.tight_layout()
plt.savefig('outlier_detection.png', dpi=150)
plt.show()

Outlier detection with machine learning

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor

# Generate 2-D data
np.random.seed(42)
X_normal = np.random.multivariate_normal([0, 0], [[1, 0.5], [0.5, 1]], 1000)
X_outliers = np.random.uniform(-4, 4, (50, 2))
X = np.vstack([X_normal, X_outliers])

# Method 1: Isolation Forest
iso_forest = IsolationForest(contamination=0.05, random_state=42)
outliers_iso = iso_forest.fit_predict(X) == -1

# Method 2: Local Outlier Factor
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
outliers_lof = lof.fit_predict(X) == -1

# Visualize
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(X[~outliers_iso, 0], X[~outliers_iso, 1], c='blue', label='Normal', alpha=0.6)
plt.scatter(X[outliers_iso, 0], X[outliers_iso, 1], c='red', label='Outliers', alpha=0.8)
plt.title('Isolation Forest')
plt.legend()

plt.subplot(1, 2, 2)
plt.scatter(X[~outliers_lof, 0], X[~outliers_lof, 1], c='blue', label='Normal', alpha=0.6)
plt.scatter(X[outliers_lof, 0], X[outliers_lof, 1], c='red', label='Outliers', alpha=0.8)
plt.title('Local Outlier Factor')
plt.legend()

plt.tight_layout()
plt.savefig('ml_outlier_detection.png', dpi=150)
plt.show()

1.4 Duplicate Detection

# Build data containing duplicates
duplicate_data = pd.DataFrame({
    'A': [1, 2, 3, 1, 2, 4],
    'B': ['x', 'y', 'z', 'x', 'y', 'w'],
    'C': [10, 20, 30, 10, 20, 40]
})

# Detect fully duplicated rows
print("Fully duplicated rows:")
print(duplicate_data[duplicate_data.duplicated(keep=False)])

# Detect duplicates on specific columns
print("\nDuplicates on columns A and B:")
print(duplicate_data[duplicate_data.duplicated(subset=['A', 'B'], keep=False)])

# Drop duplicates
data_unique = duplicate_data.drop_duplicates()
print(f"\nRows after deduplication: {len(data_unique)}")

# Keep the last occurrence of each duplicate
data_last = duplicate_data.drop_duplicates(keep='last')
print(f"Keeping last occurrence: \n{data_last}")

2. Feature Engineering Basics

2.1 Feature Scaling

Standardization

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler

# Generate example data
data = np.array([[1, 200], [2, 300], [3, 400], [4, 500], [5, 600]], dtype=float)

# Z-score standardization (zero mean, unit variance)
scaler_standard = StandardScaler()
data_standardized = scaler_standard.fit_transform(data)

print("Original data:")
print(data)
print("\nAfter standardization (Z-score):")
print(data_standardized)
print(f"Mean: {data_standardized.mean(axis=0)}")
print(f"Std: {data_standardized.std(axis=0)}")

# Min-Max normalization (scale to [0, 1])
scaler_minmax = MinMaxScaler()
data_normalized = scaler_minmax.fit_transform(data)

print("\nAfter normalization (Min-Max):")
print(data_normalized)
print(f"Min: {data_normalized.min(axis=0)}")
print(f"Max: {data_normalized.max(axis=0)}")

# Robust scaling (uses median and IQR; robust to outliers)
scaler_robust = RobustScaler()
data_robust = scaler_robust.fit_transform(data)

print("\nAfter robust scaling:")
print(data_robust)

Choosing a Scaling Method

Method   When to use                       Pros                          Cons
Z-score  Roughly normal data               Preserves distribution shape  Sensitive to outliers
Min-Max  Fixed range needed (e.g. [0,1])   Preserves range information   Strongly affected by outliers
Robust   Outliers present                  Robust to outliers            Changes distribution shape
MaxAbs   Sparse data                       Preserves sparsity            Affected by outliers
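
The table mentions MaxAbs scaling, which the code above does not show. It divides each feature by its maximum absolute value, mapping values into [-1, 1] while keeping zeros at zero, which is why it preserves sparsity. A brief sketch:

```python
import numpy as np
from sklearn.preprocessing import MaxAbsScaler

X = np.array([[1.0, -200.0],
              [2.0,  300.0],
              [0.0,    0.0]])   # zeros stay zero after scaling

X_maxabs = MaxAbsScaler().fit_transform(X)
print(X_maxabs)  # each column divided by its max absolute value
```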

Feature Scaling in PyTorch

import torch
import torch.nn as nn

# Approach 1: BatchNorm for on-the-fly standardization
batch_norm = nn.BatchNorm1d(num_features=2)

# Example data
x = torch.tensor([[1.0, 200], [2.0, 300], [3.0, 400], [4.0, 500], [5.0, 600]])
x_normalized = batch_norm(x)

print("Original data:")
print(x)
print("\nAfter BatchNorm:")
print(x_normalized)

# Approach 2: a custom standardization layer
class StandardizationLayer(nn.Module):
    def __init__(self, num_features):
        super().__init__()
        self.register_buffer('mean', torch.zeros(num_features))
        self.register_buffer('std', torch.ones(num_features))
        self.register_buffer('initialized', torch.tensor(False))
    
    def forward(self, x):
        if not self.initialized and self.training:
            self.mean = x.mean(dim=0)
            self.std = x.std(dim=0) + 1e-8
            self.initialized.fill_(True)
        
        return (x - self.mean) / self.std

std_layer = StandardizationLayer(2)
x_std = std_layer(x)
print("\nAfter custom standardization:")
print(x_std)

2.2 Feature Encoding

Encoding categorical features

from sklearn.preprocessing import LabelEncoder, OneHotEncoder, OrdinalEncoder

# Example data
categorical_data = pd.DataFrame({
    'color': ['red', 'blue', 'green', 'red', 'blue'],
    'size': ['S', 'M', 'L', 'M', 'S'],
    'category': ['A', 'B', 'A', 'C', 'B']
})

# Method 1: label encoding
label_encoders = {}
data_label_encoded = categorical_data.copy()

for col in categorical_data.columns:
    le = LabelEncoder()
    data_label_encoded[col] = le.fit_transform(categorical_data[col])
    label_encoders[col] = le

print("Label encoding:")
print(data_label_encoded)

# Method 2: one-hot encoding
onehot_encoder = OneHotEncoder(sparse_output=False)
data_onehot = onehot_encoder.fit_transform(categorical_data)

print("\nOne-hot encoding:")
print(data_onehot)
print(f"Feature names: {onehot_encoder.get_feature_names_out()}")

# Method 3: pandas get_dummies (more convenient)
data_dummies = pd.get_dummies(categorical_data, prefix=['color', 'size', 'cat'])
print("\nPandas one-hot:")
print(data_dummies)

Embedding Encoding

import torch.nn as nn

# In deep learning, high-cardinality categorical features are usually
# handled with an Embedding layer
class CategoricalEmbedding(nn.Module):
    def __init__(self, num_categories, embedding_dim):
        super().__init__()
        self.embedding = nn.Embedding(num_categories, embedding_dim)
    
    def forward(self, x):
        return self.embedding(x)

# Example: color encoding (assume 10 colors, embedding dimension 4)
color_embedding = CategoricalEmbedding(num_categories=10, embedding_dim=4)

# Inputs are color indices
color_indices = torch.tensor([0, 1, 2, 0, 1])  # red, blue, green, red, blue
embedded_colors = color_embedding(color_indices)

print(f"Color indices: {color_indices}")
print(f"Embedding shape: {embedded_colors.shape}")
print(f"Embeddings:\n{embedded_colors}")

2.3 Feature Selection

Filter Methods

from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.datasets import load_iris

# Load data
data = load_iris()
X, y = data.data, data.target

# Method 1: ANOVA F-value
selector_f = SelectKBest(score_func=f_classif, k=2)
X_selected_f = selector_f.fit_transform(X, y)

print("F-value feature selection:")
print(f"Selected feature indices: {selector_f.get_support(indices=True)}")
print(f"Feature scores: {selector_f.scores_}")

# Method 2: mutual information
selector_mi = SelectKBest(score_func=mutual_info_classif, k=2)
X_selected_mi = selector_mi.fit_transform(X, y)

print("\nMutual-information feature selection:")
print(f"Selected feature indices: {selector_mi.get_support(indices=True)}")
print(f"Feature scores: {selector_mi.scores_}")

Wrapper Methods

from sklearn.feature_selection import RFE
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier

# Recursive Feature Elimination (RFE)
model = LogisticRegression(max_iter=200)
selector_rfe = RFE(estimator=model, n_features_to_select=2, step=1)
X_selected_rfe = selector_rfe.fit_transform(X, y)

print("RFE feature selection:")
print(f"Selected features: {selector_rfe.support_}")
print(f"Feature ranking: {selector_rfe.ranking_}")

# Feature importance from a tree model
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)

print("\nRandom-forest feature importances:")
for name, importance in zip(data.feature_names, rf.feature_importances_):
    print(f"  {name}: {importance:.4f}")

Embedded Methods

from sklearn.linear_model import Lasso, Ridge
from sklearn.feature_selection import SelectFromModel

# L1 regularization (Lasso) performs feature selection automatically.
# Note: Lasso is a regression model; treating the iris class labels as a
# numeric target here is only for illustration. For classification, an
# L1-penalized LogisticRegression is the more appropriate choice.
lasso = Lasso(alpha=0.1)
lasso.fit(X, y)

print("Lasso coefficients:")
print(lasso.coef_)
print(f"Non-zero coefficients: {np.sum(lasso.coef_ != 0)}")

# Using SelectFromModel
selector_l1 = SelectFromModel(Lasso(alpha=0.1), max_features=2)
X_selected_l1 = selector_l1.fit_transform(X, y)
print(f"\nFeatures selected by L1: {selector_l1.get_support()}")

3. Image Data Preprocessing

3.1 Basic Image Operations

import cv2
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# Read an image (several ways)
# OpenCV reads in BGR order
img_cv = cv2.imread('example.jpg')
if img_cv is None:
    # Fall back to a synthetic example image
    img_cv = np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8)

# PIL uses RGB order
img_pil = Image.fromarray(cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB))

# Convert to RGB
img_rgb = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)

# Basic properties
print(f"Image shape: {img_rgb.shape}")
print(f"Dtype: {img_rgb.dtype}")
print(f"Pixel range: [{img_rgb.min()}, {img_rgb.max()}]")

# Resize
img_resized = cv2.resize(img_rgb, (128, 128))

# Crop
center_x, center_y = img_rgb.shape[1] // 2, img_rgb.shape[0] // 2
img_cropped = img_rgb[center_y-50:center_y+50, center_x-50:center_x+50]

# Visualize
fig, axes = plt.subplots(2, 2, figsize=(10, 10))
axes[0, 0].imshow(img_rgb)
axes[0, 0].set_title('Original')
axes[0, 1].imshow(img_resized)
axes[0, 1].set_title('Resized (128x128)')
axes[1, 0].imshow(img_cropped)
axes[1, 0].set_title('Cropped (100x100)')
axes[1, 1].hist(img_rgb.ravel(), bins=256, range=(0, 256))
axes[1, 1].set_title('Histogram')

plt.tight_layout()
plt.savefig('basic_image_ops.png', dpi=150)
plt.show()

3.2 Image Data Augmentation

Geometric transforms

import albumentations as A
from albumentations.pytorch import ToTensorV2

# Define the augmentation pipeline
transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.2),
    A.RandomRotate90(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.2, rotate_limit=30, p=0.5),
    # Note: recent albumentations releases replace height=/width= with
    # size=(224, 224) for RandomResizedCrop
    A.RandomResizedCrop(height=224, width=224, scale=(0.8, 1.0), p=0.5),
])

# Apply the augmentation
augmented = transform(image=img_rgb)['image']

# Visualize several augmented samples
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
axes[0, 0].imshow(img_rgb)
axes[0, 0].set_title('Original')

for i in range(5):
    aug_img = transform(image=img_rgb)['image']
    row, col = (i + 1) // 3, (i + 1) % 3
    axes[row, col].imshow(aug_img)
    axes[row, col].set_title(f'Augmented {i+1}')

plt.tight_layout()
plt.savefig('geometric_augmentation.png', dpi=150)
plt.show()

Color transforms

# Color augmentation
color_transform = A.Compose([
    A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
    A.RGBShift(r_shift_limit=20, g_shift_limit=20, b_shift_limit=20, p=0.5),
    A.RandomGamma(gamma_limit=(80, 120), p=0.3),
])

# Visualize
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
axes[0, 0].imshow(img_rgb)
axes[0, 0].set_title('Original')

for i in range(5):
    aug_img = color_transform(image=img_rgb)['image']
    row, col = (i + 1) // 3, (i + 1) % 3
    axes[row, col].imshow(aug_img)
    axes[row, col].set_title(f'Color Aug {i+1}')

plt.tight_layout()
plt.savefig('color_augmentation.png', dpi=150)
plt.show()

Advanced augmentation

# Mixup and CutMix (operate on pairs/batches of images)
class MixUp:
    """Mixup augmentation."""
    def __init__(self, alpha=0.4):
        self.alpha = alpha
    
    def __call__(self, img1, img2):
        lam = np.random.beta(self.alpha, self.alpha)
        mixed_img = lam * img1 + (1 - lam) * img2
        return mixed_img, lam

class CutMix:
    """CutMix augmentation."""
    def __init__(self, alpha=1.0):
        self.alpha = alpha
    
    def __call__(self, img1, img2):
        lam = np.random.beta(self.alpha, self.alpha)
        
        H, W = img1.shape[:2]
        cut_ratio = np.sqrt(1 - lam)
        cut_w, cut_h = int(W * cut_ratio), int(H * cut_ratio)
        
        cx, cy = np.random.randint(W), np.random.randint(H)
        x1 = np.clip(cx - cut_w // 2, 0, W)
        y1 = np.clip(cy - cut_h // 2, 0, H)
        x2 = np.clip(cx + cut_w // 2, 0, W)
        y2 = np.clip(cy + cut_h // 2, 0, H)
        
        mixed_img = img1.copy()
        mixed_img[y1:y2, x1:x2] = img2[y1:y2, x1:x2]
        
        # Adjust lambda to match the actual pixel ratio
        lam = 1 - ((x2 - x1) * (y2 - y1) / (W * H))
        
        return mixed_img, lam

# Build a second example image
img2 = np.random.randint(0, 255, img_rgb.shape, dtype=np.uint8)

# Apply Mixup and CutMix
mixup = MixUp()
cutmix = CutMix()

mixed_img, lam_mixup = mixup(img_rgb.astype(float) / 255, img2.astype(float) / 255)
cutmixed_img, lam_cutmix = cutmix(img_rgb, img2)

fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].imshow(img_rgb)
axes[0].set_title('Image 1')
axes[1].imshow(mixed_img)
axes[1].set_title(f'MixUp (λ={lam_mixup:.2f})')
axes[2].imshow(cutmixed_img)
axes[2].set_title(f'CutMix (λ={lam_cutmix:.2f})')

plt.tight_layout()
plt.savefig('advanced_augmentation.png', dpi=150)
plt.show()

3.3 Image Normalization and Standardization

# ImageNet statistics
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# PyTorch transforms
import torchvision.transforms as transforms

transform_pipeline = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),  # scales to [0,1] and reorders to (C,H,W)
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
])

# Apply the pipeline
img_tensor = transform_pipeline(img_rgb)
print(f"Tensor shape: {img_tensor.shape}")
print(f"Tensor range: [{img_tensor.min():.3f}, {img_tensor.max():.3f}]")

# Denormalize (for visualization)
def denormalize(tensor, mean, std):
    """Undo normalization in place."""
    for t, m, s in zip(tensor, mean, std):
        t.mul_(s).add_(m)
    return tensor

img_denorm = denormalize(img_tensor.clone(), IMAGENET_MEAN, IMAGENET_STD)
img_denorm = img_denorm.permute(1, 2, 0).numpy()
img_denorm = np.clip(img_denorm, 0, 1)

plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.imshow(img_rgb)
plt.title('Original')
plt.subplot(1, 2, 2)
plt.imshow(img_denorm)
plt.title('After Transform + Denormalize')
plt.tight_layout()
plt.savefig('normalization.png', dpi=150)
plt.show()

4. Text Data Preprocessing

4.1 Text Cleaning

import re
import string

# Example text
text = """
Hello!!! This is an EXAMPLE text with some issues...   
It has extra spaces, punctuation!!!, and UPPERCASE letters.
Check out https://example.com and email@domain.com
Also has numbers like 12345 and special chars @#$%
"""

def clean_text(text):
    """Text-cleaning helper."""
    # Lowercase
    text = text.lower()
    
    # Remove URLs
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    
    # Remove email addresses
    text = re.sub(r'\S+@\S+', '', text)
    
    # Remove digits
    text = re.sub(r'\d+', '', text)
    
    # Remove punctuation
    text = text.translate(str.maketrans('', '', string.punctuation))
    
    # Collapse extra whitespace
    text = ' '.join(text.split())
    
    return text

cleaned_text = clean_text(text)
print("Original text:")
print(text)
print("\nCleaned text:")
print(cleaned_text)

4.2 Tokenization

# Method 1: whitespace tokenization (simple)
def simple_tokenize(text):
    return text.split()

# Method 2: regex tokenization
def regex_tokenize(text):
    return re.findall(r'\b\w+\b', text.lower())

# Method 3: NLTK
import nltk
# nltk.download('punkt')  # required on first use
from nltk.tokenize import word_tokenize

# Method 4: spaCy (recommended)
import spacy
# nlp = spacy.load('en_core_web_sm')  # required on first use

text = "Hello world! This is a test sentence."
print(f"Whitespace tokens: {simple_tokenize(text)}")
print(f"Regex tokens: {regex_tokenize(text)}")

# Chinese word segmentation (with jieba)
import jieba
chinese_text = "深度学习是机器学习的一个分支"
chinese_tokens = jieba.lcut(chinese_text)
print(f"Chinese tokens: {chinese_tokens}")
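
After tokenization, tokens are normally mapped to integer ids through a vocabulary before they reach a model. A minimal sketch (the `build_vocab` helper and its parameters are illustrative, not a library API):

```python
from collections import Counter

def build_vocab(tokenized_texts, max_size=10, specials=('<PAD>', '<UNK>')):
    """Map the most frequent tokens to ids, reserving ids for special tokens."""
    counts = Counter(tok for text in tokenized_texts for tok in text)
    vocab = {tok: i for i, tok in enumerate(specials)}
    for tok, _ in counts.most_common(max_size - len(specials)):
        vocab[tok] = len(vocab)
    return vocab

corpus = [["hello", "world"], ["hello", "there"]]
vocab = build_vocab(corpus)
# Unknown tokens fall back to the <UNK> id
ids = [vocab.get(tok, vocab['<UNK>']) for tok in ["hello", "unseen"]]
print(vocab, ids)
```

The `max_size` cap is what controls the vocabulary-size trade-off discussed in the pitfalls section.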

4.3 Subword Tokenization (BPE, WordPiece)

from transformers import BertTokenizer, GPT2Tokenizer

# BERT's WordPiece tokenizer
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

text = "Hello, this is tokenization!"
bert_tokens = bert_tokenizer.tokenize(text)
bert_ids = bert_tokenizer.encode(text)

print(f"Original text: {text}")
print(f"BERT tokens: {bert_tokens}")
print(f"BERT IDs: {bert_ids}")

# GPT-2's BPE tokenizer
gpt2_tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
gpt2_tokens = gpt2_tokenizer.tokenize(text)
gpt2_ids = gpt2_tokenizer.encode(text)

print(f"\nGPT-2 tokens: {gpt2_tokens}")
print(f"GPT-2 IDs: {gpt2_ids}")

# Compare how different tokenizers handle a rare word
rare_word = "tokenization"
print(f"\nRare word '{rare_word}':")
print(f"  BERT: {bert_tokenizer.tokenize(rare_word)}")
print(f"  GPT-2: {gpt2_tokenizer.tokenize(rare_word)}")

4.4 Text Vectorization

from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

# Example corpus
corpus = [
    "This is the first document.",
    "This document is the second document.",
    "And this is the third one.",
    "Is this the first document?"
]

# Bag of words
count_vectorizer = CountVectorizer()
X_count = count_vectorizer.fit_transform(corpus)

print("Bag of words:")
print(f"Vocabulary: {count_vectorizer.get_feature_names_out()}")
print(f"Shape: {X_count.shape}")
print(f"Vectors:\n{X_count.toarray()}")

# TF-IDF
tfidf_vectorizer = TfidfVectorizer()
X_tfidf = tfidf_vectorizer.fit_transform(corpus)

print("\nTF-IDF:")
print(f"Shape: {X_tfidf.shape}")
print(f"TF-IDF vectors:\n{X_tfidf.toarray()}")

Word Embeddings

import math

import torch
import torch.nn as nn

# Pretrained embeddings (GloVe)
from torchtext.vocab import GloVe

# Load GloVe (downloads automatically on first use)
# glove = GloVe(name='6B', dim=100)

# Custom embedding layer
vocab_size = 10000
embedding_dim = 100

embedding_layer = nn.Embedding(vocab_size, embedding_dim)

# Example: map word ids to embedding vectors
word_ids = torch.tensor([1, 5, 100, 500])
word_embeddings = embedding_layer(word_ids)

print(f"Word ids: {word_ids}")
print(f"Embedding shape: {word_embeddings.shape}")
print(f"Embeddings:\n{word_embeddings}")

# Positional encoding (as used in Transformers)
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                            (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

pos_encoding = PositionalEncoding(embedding_dim)
seq_embeddings = word_embeddings.unsqueeze(0)  # add a batch dimension
encoded = pos_encoding(seq_embeddings)

print(f"\nShape after positional encoding: {encoded.shape}")

5. Handling Class Imbalance

5.1 Resampling Strategies

from sklearn.datasets import make_classification
from collections import Counter

# Generate imbalanced data
X, y = make_classification(n_samples=1000, n_classes=2, weights=[0.9, 0.1], 
                           n_features=20, random_state=42)

print(f"Class distribution: {Counter(y)}")

# Visualize
plt.figure(figsize=(8, 4))
plt.bar(Counter(y).keys(), Counter(y).values())
plt.xlabel('Class')
plt.ylabel('Count')
plt.title('Imbalanced Class Distribution')
plt.savefig('imbalanced_data.png', dpi=150)
plt.show()

Oversampling

from imblearn.over_sampling import RandomOverSampler, SMOTE, ADASYN

# Random oversampling
ros = RandomOverSampler(random_state=42)
X_ros, y_ros = ros.fit_resample(X, y)
print(f"After random oversampling: {Counter(y_ros)}")

# SMOTE (synthesizes minority-class samples)
smote = SMOTE(random_state=42)
X_smote, y_smote = smote.fit_resample(X, y)
print(f"After SMOTE: {Counter(y_smote)}")

# ADASYN (adaptive synthesis)
adasyn = ADASYN(random_state=42)
X_adasyn, y_adasyn = adasyn.fit_resample(X, y)
print(f"After ADASYN: {Counter(y_adasyn)}")

# Visualize the effect of SMOTE
from sklearn.decomposition import PCA

pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)
X_smote_pca = pca.transform(X_smote)

plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(X_pca[y==0, 0], X_pca[y==0, 1], label='Class 0', alpha=0.6)
plt.scatter(X_pca[y==1, 0], X_pca[y==1, 1], label='Class 1', alpha=0.6)
plt.title('Original Data')
plt.legend()

plt.subplot(1, 2, 2)
plt.scatter(X_smote_pca[y_smote==0, 0], X_smote_pca[y_smote==0, 1], 
           label='Class 0', alpha=0.6)
plt.scatter(X_smote_pca[y_smote==1, 0], X_smote_pca[y_smote==1, 1], 
           label='Class 1', alpha=0.6)
plt.title('After SMOTE')
plt.legend()

plt.tight_layout()
plt.savefig('smote_comparison.png', dpi=150)
plt.show()

Undersampling

from imblearn.under_sampling import RandomUnderSampler, TomekLinks, EditedNearestNeighbours

# Random undersampling
rus = RandomUnderSampler(random_state=42)
X_rus, y_rus = rus.fit_resample(X, y)
print(f"After random undersampling: {Counter(y_rus)}")

# Tomek Links (cleans boundary samples)
tomek = TomekLinks()
X_tomek, y_tomek = tomek.fit_resample(X, y)
print(f"After Tomek Links: {Counter(y_tomek)}")

# Combined sampling: SMOTE + Tomek
from imblearn.combine import SMOTETomek
smote_tomek = SMOTETomek(random_state=42)
X_combined, y_combined = smote_tomek.fit_resample(X, y)
print(f"After SMOTE+Tomek: {Counter(y_combined)}")

5.2 Class Weights

import torch
import torch.nn as nn
from sklearn.utils.class_weight import compute_class_weight

# Compute class weights
class_weights = compute_class_weight('balanced', classes=np.unique(y), y=y)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32)

print(f"Class weights: {class_weights}")

# Use class weights in the loss function
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)

# Example forward pass
outputs = torch.randn(4, 2)  # batch_size=4, num_classes=2
targets = torch.tensor([0, 1, 0, 1])

loss = criterion(outputs, targets)
print(f"Weighted loss: {loss.item():.4f}")

# Compare with the unweighted loss
criterion_unweighted = nn.CrossEntropyLoss()
loss_unweighted = criterion_unweighted(outputs, targets)
print(f"Unweighted loss: {loss_unweighted.item():.4f}")
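
Class weights act on the loss. An alternative is to rebalance at sampling time with PyTorch's WeightedRandomSampler, which draws minority-class samples more often. A sketch on assumed toy labels (the 90/10 split mirrors the imbalance above):

```python
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset, WeightedRandomSampler

# Assumed toy data: 90 samples of class 0, 10 of class 1
y = np.array([0] * 90 + [1] * 10)
X = np.random.randn(100, 4).astype(np.float32)

# Per-sample weight = inverse class frequency, so each class is drawn
# with roughly equal probability
class_counts = np.bincount(y)
sample_weights = 1.0 / class_counts[y]

sampler = WeightedRandomSampler(
    weights=torch.as_tensor(sample_weights, dtype=torch.double),
    num_samples=len(y),
    replacement=True,
)
loader = DataLoader(TensorDataset(torch.from_numpy(X), torch.from_numpy(y)),
                    batch_size=20, sampler=sampler)

batch_X, batch_y = next(iter(loader))
print(batch_y.float().mean())  # close to 0.5 in expectation
```

Note that `shuffle=True` and a sampler are mutually exclusive in DataLoader; the sampler already randomizes the order.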

5.3 Focal Loss

class FocalLoss(nn.Module):
    """
    Focal Loss for Dense Object Detection.
    Addresses class imbalance by making the model focus on hard examples.
    """
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
    
    def forward(self, inputs, targets):
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # predicted probability of the true class
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Use Focal Loss
focal_criterion = FocalLoss(alpha=1, gamma=2)
loss_focal = focal_criterion(outputs, targets)
print(f"Focal Loss: {loss_focal.item():.4f}")

# Compare different gamma values
for gamma in [0, 1, 2, 5]:
    focal = FocalLoss(gamma=gamma)
    loss = focal(outputs, targets)
    print(f"Gamma={gamma}: Loss={loss.item():.4f}")

6. PyTorch Data-Pipeline Design

6.1 Custom Dataset

from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    """
    Custom dataset class.
    
    Args:
        data: feature data
        labels: targets
        transform: optional transform
    """
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]
        
        if self.transform:
            sample = self.transform(sample)
        
        return sample, label

# Build an example dataset
data = np.random.randn(1000, 10).astype(np.float32)
labels = np.random.randint(0, 3, 1000)

dataset = CustomDataset(data, labels)

# Try it out
sample, label = dataset[0]
print(f"Sample shape: {sample.shape}")
print(f"Label: {label}")
print(f"Dataset size: {len(dataset)}")

6.2 Image Datasets

class ImageDataset(Dataset):
    """Image dataset class."""
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        # Load the image
        image_path = self.image_paths[idx]
        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        label = self.labels[idx]
        
        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        
        return image, label

# Define image transforms
train_transform = A.Compose([
    A.Resize(224, 224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

val_transform = A.Compose([
    A.Resize(224, 224),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2()
])

6.3 Efficient DataLoaders

# DataLoader configuration
dataloader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,  # multiprocess data loading
    pin_memory=True,  # speeds up host-to-GPU transfer
    drop_last=True,  # drop the last incomplete batch
    prefetch_factor=2  # batches to prefetch per worker
)

# Iterate over the DataLoader
for batch_idx, (data, target) in enumerate(dataloader):
    print(f"Batch {batch_idx}: data shape {data.shape}, target shape {target.shape}")
    if batch_idx >= 2:
        break
6.4 Variable-Length Sequences

from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

class SequenceDataset(Dataset):
    """Variable-length sequence dataset."""
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels
    
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.sequences[idx], self.labels[idx]

# Build variable-length sequence data
sequences = [
    torch.randn(10, 5),
    torch.randn(15, 5),
    torch.randn(8, 5),
    torch.randn(20, 5)
]
labels = torch.tensor([0, 1, 0, 1])

seq_dataset = SequenceDataset(sequences, labels)

def collate_fn(batch):
    """Custom collate function for variable-length sequences."""
    sequences, labels = zip(*batch)
    
    # Record each sequence's length
    lengths = torch.tensor([len(seq) for seq in sequences])
    
    # Pad the sequences
    padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
    
    # Pack them (for RNNs)
    packed_sequences = pack_padded_sequence(
        padded_sequences, lengths, batch_first=True, enforce_sorted=False
    )
    
    # labels is a tuple of 0-dim tensors, so stack rather than torch.tensor()
    return packed_sequences, torch.stack(labels)

# Use the custom collate function
seq_dataloader = DataLoader(seq_dataset, batch_size=2, collate_fn=collate_fn)

for packed_seq, batch_labels in seq_dataloader:
    print(f"Packed sequence: {packed_seq}")
    print(f"Labels: {batch_labels}")
    break

6.5 Data-Loading Best Practices

# A complete training data-pipeline example
import torch
import torchvision.transforms as T
from torch.utils.data import Subset

class DataPipeline:
    """Complete data-pipeline class."""
    def __init__(self, data_dir, batch_size=32, num_workers=4):
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        
        # Define transforms
        self.train_transform = T.Compose([
            T.Resize(256),
            T.RandomCrop(224),
            T.RandomHorizontalFlip(),
            T.ColorJitter(brightness=0.2, contrast=0.2),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        
        self.val_transform = T.Compose([
            T.Resize(256),
            T.CenterCrop(224),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
    
    def get_dataloaders(self):
        """Build training and validation DataLoaders."""
        from torchvision.datasets import ImageFolder
        
        # Pitfall: subsets returned by random_split share one underlying
        # dataset, so reassigning val_dataset.dataset.transform would also
        # change the training transform. Instead, build two ImageFolder
        # views with different transforms and split them with one index set.
        train_view = ImageFolder(self.data_dir, transform=self.train_transform)
        val_view = ImageFolder(self.data_dir, transform=self.val_transform)
        
        # 80/20 train/validation split over shared indices
        train_size = int(0.8 * len(train_view))
        indices = torch.randperm(len(train_view)).tolist()
        train_dataset = Subset(train_view, indices[:train_size])
        val_dataset = Subset(val_view, indices[train_size:])
        
        # Build DataLoaders
        train_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers,
            pin_memory=True,
            drop_last=True
        )
        
        val_loader = DataLoader(
            val_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
            pin_memory=True
        )
        
        return train_loader, val_loader

# Usage example
# pipeline = DataPipeline(data_dir='path/to/data', batch_size=32)
# train_loader, val_loader = pipeline.get_dataloaders()

7. Pitfalls to Avoid

7.1 Data Leakage

# Pitfall: standardizing before the train/test split
from sklearn.preprocessing import StandardScaler

# Correct: split first, then fit the scaler on the training set only
X_train, X_test = X[:800], X[800:]
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)  # transform only, no re-fit

# Serious mistake: standardizing the whole dataset before splitting
X_scaled = scaler.fit_transform(X)  # information leakage!
X_train_leaked, X_test_leaked = X_scaled[:800], X_scaled[800:]

# Rule: fit preprocessing on the training set, then transform all splits
# with the fitted parameters.
# Leakage in cross-validation:
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC

# Correct: a Pipeline re-fits the scaler inside every fold
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('svm', SVC())
])
scores = cross_val_score(pipeline, X, y, cv=5)
print(f"Cross-validation score: {scores.mean():.4f} (+/- {scores.std():.4f})")

7.2 Common Image-Preprocessing Mistakes

# Mistake 1: wrong normalization order
# Wrong: Normalize before ToTensor
wrong_transform = T.Compose([
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # wrong!
    T.ToTensor()
])

# Correct: ToTensor first, then Normalize
correct_transform = T.Compose([
    T.ToTensor(),  # converts [H,W,C] to [C,H,W] and scales to [0,1]
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Mistake 2: mixing up OpenCV and PIL color channels
# OpenCV loads BGR; PIL loads RGB
img_cv = cv2.imread('image.jpg')  # BGR
img_rgb = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)  # convert to RGB

# Mistake 3: forgetting to resize
# Images of different sizes cannot be stacked into one batch

7.3 Text-Processing Caveats

# Note 1: vocabulary size
# Too small -> many unknown tokens (UNK)
# Too large -> high memory use, harder training

# Note 2: sequence truncation and padding
# Choose the target length from the data's length distribution
seq_lengths = [len(seq) for seq in sequences]
print(f"Sequence lengths: mean={np.mean(seq_lengths):.1f}, "
      f"median={np.median(seq_lengths):.1f}, "
      f"95th percentile={np.percentile(seq_lengths, 95):.1f}")

# Note 3: special tokens
# Make sure padding, UNK, CLS, SEP, etc. have the correct ids
# vocab = {'<PAD>': 0, '<UNK>': 1, '<CLS>': 2, '<SEP>': 3, ...}
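
Note 2 above (truncate long sequences, pad short ones to a length chosen from the distribution) can be sketched with a small helper (illustrative, not a library API):

```python
import numpy as np

def pad_or_truncate(token_ids, max_len, pad_id=0):
    """Cut sequences longer than max_len; pad shorter ones with pad_id."""
    return token_ids[:max_len] + [pad_id] * max(0, max_len - len(token_ids))

# Pick max_len from the length distribution, e.g. the 95th percentile,
# so a few extreme outliers do not inflate every padded batch
lengths = [3, 5, 4, 12]
max_len = int(np.percentile(lengths, 95))

print(pad_or_truncate([1, 2, 3], 5))           # padded
print(pad_or_truncate([1, 2, 3, 4, 5, 6], 5))  # truncated
```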

7.4 Augmentation Caveats

# Note 1: never augment the validation or test set
# Training uses augmentation
train_transform = A.Compose([...])  # includes augmentation

# Validation/testing does not
val_transform = A.Compose([...])  # only the required preprocessing

# Note 2: keep augmentation strength moderate
# Too strong shifts the data distribution too far
# Too weak has little effect

# Note 3: some tasks need task-specific policies
# Medical imaging: augment cautiously to avoid altering pathological features
# OCR: avoid geometric transforms that hurt text legibility

8. Chapter Summary and Review

Key Takeaways

Data quality

  • Missing values: drop, fill (mean/median/mode/interpolation), learned imputation
  • Outlier detection: Z-score, IQR, Isolation Forest, LOF
  • Duplicates: detect and drop

Feature engineering

  • Feature scaling: Z-score, Min-Max, Robust Scaling
  • Feature encoding: Label Encoding, One-Hot, Embedding
  • Feature selection: Filter, Wrapper, Embedded methods

Image preprocessing

  • Basic operations: loading, resizing, cropping, normalization
  • Augmentation: geometric transforms, color transforms, Mixup/CutMix
  • Standardization: ImageNet statistics or custom statistics

Text preprocessing

  • Cleaning: denoising, normalization, removing special characters
  • Tokenization: whitespace, BPE, WordPiece
  • Vectorization: bag of words, TF-IDF, word embeddings

Class imbalance

  • Resampling: oversampling (SMOTE, ADASYN), undersampling
  • Class weights: reweight the loss function
  • Focal Loss: focus on hard examples

Data pipelines

  • Custom Dataset: subclass Dataset and implement __getitem__ and __len__
  • DataLoader: batching, multiprocessing, memory optimization
  • Variable-length sequences: handle with pack_padded_sequence

Tool Quick Reference

Task                Tool/Library      Key functions/classes
Missing values      Pandas            fillna(), dropna(), interpolate()
Outlier detection   Scikit-learn      IsolationForest, LocalOutlierFactor
Feature scaling     Scikit-learn      StandardScaler, MinMaxScaler
Feature encoding    Scikit-learn      LabelEncoder, OneHotEncoder
Image augmentation  Albumentations    Compose, transform classes
Tokenization        Transformers      BertTokenizer, GPT2Tokenizer
Imbalance handling  imbalanced-learn  SMOTE, RandomOverSampler
Data loading        PyTorch           Dataset, DataLoader

Study Tips

  1. Explore the data: understand its distribution and quality before modeling
  2. Visualize: use charts to build intuition about the data
  3. Iterate: adjust preprocessing based on model feedback
  4. Reproducibility: record all preprocessing parameters and random seeds
  5. Domain knowledge: choose preprocessing methods that fit the task
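
Tip 4 (reproducibility) usually starts with fixing every random seed before any data processing. A minimal sketch of such a helper:

```python
import os
import random

import numpy as np
import torch

def set_seed(seed: int = 42) -> None:
    """Seed Python, NumPy and PyTorch RNGs for reproducible preprocessing."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

# Same seed -> same "random" splits, augmentations, and samples
set_seed(42)
a = np.random.rand(3)
set_seed(42)
b = np.random.rand(3)
print(np.allclose(a, b))  # True
```

Note that DataLoader workers and CUDA kernels have additional sources of nondeterminism (e.g. `worker_init_fn`, `torch.use_deterministic_algorithms`) beyond this basic helper.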

Closing note: data engineering typically consumes around 70% of a deep-learning project's time, yet it is often neglected. High-quality preprocessing frequently improves performance more than a fancier architecture. Remember: garbage in, garbage out.


This article first appeared in the CSDN column "Deep Learning Mastery"; please credit the source when reposting.
