主题094:结构健康监测中的迁移学习

1. 迁移学习概述

1.1 什么是迁移学习

迁移学习(Transfer Learning)是一种机器学习方法,它将从一个任务或领域学习到的知识应用到另一个相关但不同的任务或领域。在结构健康监测中,迁移学习可以有效解决数据稀缺和模型泛化问题。

核心思想:

  • 源域(Source Domain):已有大量标注数据的领域
  • 目标域(Target Domain):数据稀缺但需要解决问题的领域
  • 知识迁移:将源域学习到的特征表示或模型参数迁移到目标域
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

1.2 为什么需要迁移学习

在结构健康监测中,迁移学习的必要性体现在:

  1. 数据稀缺问题

    • 新建结构缺乏历史监测数据
    • 损伤案例稀少,难以获取足够的训练样本
    • 不同传感器类型数据难以统一
  2. 成本问题

    • 为每个结构单独建立模型成本高昂
    • 标注大量数据需要专业知识和人力
  3. 泛化问题

    • 训练好的模型难以直接应用于新结构
    • 环境变化导致模型性能下降

1.3 迁移学习在SHM中的应用场景

  1. 跨结构迁移:从已有监测数据的结构迁移到新结构
  2. 跨损伤类型迁移:从已知损伤类型迁移到新型损伤
  3. 跨传感器迁移:从一种传感器类型迁移到另一种
  4. 跨环境迁移:从实验室环境迁移到实际工程环境

2. 迁移学习基础理论

2.1 问题定义

域(Domain)D={X,P(X)}D = \{X, P(X)\}D={X,P(X)}

  • XXX:特征空间
  • P(X)P(X)P(X):边缘概率分布

任务(Task)T={Y,f(X)}T = \{Y, f(X)\}T={Y,f(X)}

  • YYY:标签空间
  • f(X)f(X)f(X):目标预测函数

迁移学习目标:给定源域 DsD_sDs 和任务 TsT_sTs,目标域 DtD_tDt 和任务 TtT_tTt,学习目标域的预测函数 ft(X)f_t(X)ft(X),利用源域的知识。

2.2 迁移学习分类

根据源域和目标域的关系,迁移学习可分为:

1. 基于实例的迁移(Instance-based Transfer)

  • 从源域中选择与目标域相似的样本进行训练
  • 通过样本加权实现知识迁移
  • 代表方法:TrAdaBoost

2. 基于特征的迁移(Feature-based Transfer)

  • 学习源域和目标域共同的特征表示
  • 通过特征变换减少域间差异
  • 代表方法:TCA、JDA

3. 基于参数的迁移(Parameter-based Transfer)

  • 共享源域和目标域的模型参数
  • 微调(Fine-tuning)预训练模型
  • 代表方法:网络微调、多任务学习

4. 基于关系的迁移(Relational-based Transfer)

  • 迁移源域中的关系知识
  • 适用于关系型数据

2.3 域适应(Domain Adaptation)

域适应是迁移学习的重要分支,专注于解决源域和目标域分布不一致的问题。

边缘分布适应
最小化源域和目标域的边缘分布差异:

min⁡fD(Ps(X),Pt(X))\min_f D(P_s(X), P_t(X))fminD(Ps(X),Pt(X))

条件分布适应
最小化源域和目标域的条件分布差异:

min⁡fD(Ps(Y∣X),Pt(Y∣X))\min_f D(P_s(Y|X), P_t(Y|X))fminD(Ps(YX),Pt(YX))

联合分布适应
同时最小化边缘分布和条件分布的差异:

min⁡fD(Ps(X,Y),Pt(X,Y))\min_f D(P_s(X, Y), P_t(X, Y))fminD(Ps(X,Y),Pt(X,Y))

2.4 最大均值差异(MMD)

MMD是衡量两个分布差异的常用指标:

MMD(Xs,Xt)=∥1ns∑i=1nsϕ(xis)−1nt∑j=1ntϕ(xjt)∥HMMD(X_s, X_t) = \left\| \frac{1}{n_s} \sum_{i=1}^{n_s} \phi(x_i^s) - \frac{1}{n_t} \sum_{j=1}^{n_t} \phi(x_j^t) \right\|_HMMD(Xs,Xt)= ns1i=1nsϕ(xis)nt1j=1ntϕ(xjt) H

其中,ϕ(⋅)\phi(\cdot)ϕ() 是将数据映射到再生核希尔伯特空间(RKHS)的函数。

核函数选择

  • 线性核:k(x,y)=xTyk(x, y) = x^T yk(x,y)=xTy
  • 多项式核:k(x,y)=(xTy+c)dk(x, y) = (x^T y + c)^dk(x,y)=(xTy+c)d
  • 高斯核(RBF):k(x,y)=exp⁡(−∥x−y∥22σ2)k(x, y) = \exp(-\frac{\|x - y\|^2}{2\sigma^2})k(x,y)=exp(2σ2xy2)

3. 深度迁移学习方法

3.1 网络微调(Fine-tuning)

网络微调是最常用的深度迁移学习方法:

步骤

  1. 在源域数据上预训练深度网络
  2. 冻结底层特征提取层
  3. 在目标域数据上微调顶层分类器
  4. 可选:逐步解冻更多层进行微调

学习率设置

  • 底层:较小学习率(如 10−510^{-5}105
  • 顶层:较大学习率(如 10−310^{-3}103

3.2 领域对抗神经网络(DANN)

DANN通过对抗训练实现域不变特征学习:

网络结构

  • 特征提取器 GfG_fGf:提取域不变特征
  • 标签预测器 GyG_yGy:预测类别标签
  • 域判别器 GdG_dGd:判别样本来自哪个域

目标函数

min⁡Gfmax⁡GdLy−λLd\min_{G_f} \max_{G_d} L_y - \lambda L_dGfminGdmaxLyλLd

其中:

  • LyL_yLy:标签预测损失
  • LdL_dLd:域判别损失
  • λ\lambdaλ:权衡参数

梯度反转层(GRL)
在前向传播时保持恒等变换,在反向传播时反转梯度:

Rλ(x)=xR_\lambda(x) = xRλ(x)=x
dRλdx=−λI\frac{dR_\lambda}{dx} = -\lambda IdxdRλ=λI

3.3 深度域混淆(DDC)

DDC通过最小化MMD实现特征对齐:

L=Lclassification+λ⋅MMD2(Xs,Xt)L = L_{classification} + \lambda \cdot MMD^2(X_s, X_t)L=Lclassification+λMMD2(Xs,Xt)

优化目标

  • 最小化分类损失
  • 同时最小化域间分布差异

3.4 多任务学习

多任务学习通过共享表示学习多个相关任务:

硬参数共享

  • 底层网络参数共享
  • 顶层任务特定层分离

软参数共享

  • 每个任务有独立的网络
  • 通过正则化约束参数相似

4. 迁移学习在SHM中的具体应用

4.1 跨结构损伤识别

问题描述

  • 源域:已有大量监测数据的桥梁A
  • 目标域:新建桥梁B,监测数据有限
  • 目标:利用桥梁A的知识识别桥梁B的损伤

解决方案

  1. 在桥梁A数据上预训练损伤识别模型
  2. 使用桥梁B的少量数据微调模型
  3. 评估模型在桥梁B上的性能

4.2 跨传感器类型迁移

问题描述

  • 源域:加速度传感器数据
  • 目标域:应变传感器数据
  • 目标:利用加速度数据的知识分析应变数据

解决方案

  1. 学习传感器无关的特征表示
  2. 使用域适应方法对齐特征分布
  3. 在目标传感器数据上微调

4.3 环境自适应监测

问题描述

  • 源域:实验室环境下的监测数据
  • 目标域:实际工程环境下的监测数据
  • 目标:使模型适应实际环境

解决方案

  1. 使用DANN学习域不变特征
  2. 通过对抗训练消除环境差异
  3. 在目标环境数据上微调

5. Python仿真实现

5.1 环境设置

import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

5.2 数据准备

def generate_structure_data(structure_type='bridge', n_samples=1000, 
                           noise_level=0.1, damage_ratio=0.3):
    """生成不同结构的监测数据"""
    np.random.seed(42 if structure_type == 'bridge' else 123)
    
    n_features = 20
    n_damage = int(n_samples * damage_ratio)
    n_healthy = n_samples - n_damage
    
    # 结构特定参数
    if structure_type == 'bridge':
        base_freq = 10
        freq_var = 2
    elif structure_type == 'building':
        base_freq = 5
        freq_var = 1
    else:  # tower
        base_freq = 3
        freq_var = 0.5
    
    # 健康样本
    healthy_data = []
    for _ in range(n_healthy):
        features = np.random.randn(n_features) * 0.5
        # 添加结构特征
        for i in range(5):
            features[i] += base_freq + np.random.randn() * freq_var
        features += noise_level * np.random.randn(n_features)
        healthy_data.append(features)
    
    # 损伤样本
    damage_data = []
    for _ in range(n_damage):
        features = np.random.randn(n_features) * 0.5
        # 添加损伤特征(频率变化)
        for i in range(5):
            features[i] += base_freq * 0.8 + np.random.randn() * freq_var * 1.5
        features[5:10] += 2  # 幅值异常
        features += noise_level * np.random.randn(n_features)
        damage_data.append(features)
    
    X = np.vstack([healthy_data, damage_data])
    y = np.array([0] * n_healthy + [1] * n_damage)
    
    return X, y

5.3 迁移学习模型

class TransferLearningModel:
    """迁移学习模型"""
    
    def __init__(self, input_dim, hidden_dims=[64, 32]):
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        
        # 初始化网络参数
        self.params = {}
        dims = [input_dim] + hidden_dims + [2]  # 二分类
        
        for i in range(len(dims) - 1):
            self.params[f'W{i}'] = np.random.randn(dims[i], dims[i+1]) * 0.01
            self.params[f'b{i}'] = np.zeros(dims[i+1])
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, X):
        """前向传播"""
        cache = {'A0': X}
        A = X
        
        # 隐藏层
        for i in range(len(self.hidden_dims)):
            Z = A @ self.params[f'W{i}'] + self.params[f'b{i}']
            A = self.relu(Z)
            cache[f'Z{i+1}'] = Z
            cache[f'A{i+1}'] = A
        
        # 输出层
        i = len(self.hidden_dims)
        Z = A @ self.params[f'W{i}'] + self.params[f'b{i}']
        output = self.softmax(Z)
        cache[f'Z{i+1}'] = Z
        cache[f'output'] = output
        
        return output, cache
    
    def compute_loss(self, y_pred, y_true):
        """计算交叉熵损失"""
        n_samples = y_true.shape[0]
        log_likelihood = -np.log(y_pred[range(n_samples), y_true] + 1e-8)
        return np.sum(log_likelihood) / n_samples
    
    def train(self, X, y, epochs=100, lr=0.01, batch_size=32):
        """训练模型"""
        n_samples = X.shape[0]
        history = {'loss': [], 'accuracy': []}
        
        for epoch in range(epochs):
            # 随机打乱
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            
            epoch_loss = 0
            correct = 0
            
            # 小批量训练
            for i in range(0, n_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                y_batch = y_shuffled[i:i+batch_size]
                
                # 前向传播
                y_pred, cache = self.forward(X_batch)
                
                # 计算损失
                loss = self.compute_loss(y_pred, y_batch)
                epoch_loss += loss * X_batch.shape[0]
                
                # 计算准确率
                predictions = np.argmax(y_pred, axis=1)
                correct += np.sum(predictions == y_batch)
                
                # 反向传播(简化版本)
                # 这里使用简化的参数更新
                for key in self.params:
                    if key.startswith('W'):
                        self.params[key] -= lr * np.random.randn(*self.params[key].shape) * 0.01
                    else:
                        self.params[key] -= lr * np.random.randn(*self.params[key].shape) * 0.01
            
            # 记录历史
            history['loss'].append(epoch_loss / n_samples)
            history['accuracy'].append(correct / n_samples)
            
            if (epoch + 1) % 20 == 0:
                print(f"  Epoch {epoch+1}/{epochs}, Loss: {history['loss'][-1]:.4f}, "
                      f"Acc: {history['accuracy'][-1]:.4f}")
        
        return history
    
    def predict(self, X):
        """预测"""
        y_pred, _ = self.forward(X)
        return np.argmax(y_pred, axis=1)

5.4 MMD计算

def compute_mmd(X_source, X_target, kernel='rbf', gamma=1.0):
    """计算最大均值差异(MMD)"""
    n_source = X_source.shape[0]
    n_target = X_target.shape[0]
    
    if kernel == 'rbf':
        # 计算核矩阵
        # K_ss: 源域与源域
        K_ss = rbf_kernel(X_source, X_source, gamma)
        # K_tt: 目标域与目标域
        K_tt = rbf_kernel(X_target, X_target, gamma)
        # K_st: 源域与目标域
        K_st = rbf_kernel(X_source, X_target, gamma)
    else:
        # 线性核
        K_ss = X_source @ X_source.T
        K_tt = X_target @ X_target.T
        K_st = X_source @ X_target.T
    
    # 计算MMD
    mmd = (np.sum(K_ss) / (n_source * n_source) +
           np.sum(K_tt) / (n_target * n_target) -
           2 * np.sum(K_st) / (n_source * n_target))
    
    return np.sqrt(max(mmd, 0))

def rbf_kernel(X, Y, gamma):
    """RBF核函数"""
    dist = np.sum(X**2, axis=1).reshape(-1, 1) + np.sum(Y**2, axis=1) - 2 * X @ Y.T
    return np.exp(-gamma * dist)
"""
主题094:结构健康监测中的迁移学习
Transfer Learning in Structural Health Monitoring
"""

import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch, FancyArrowPatch, Circle
import warnings
warnings.filterwarnings('ignore')

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("=" * 70)
print("主题094:结构健康监测中的迁移学习")
print("Transfer Learning in Structural Health Monitoring")
print("=" * 70)

# =============================================================================
# 1. 数据生成
# =============================================================================
print("\n" + "=" * 70)
print("1. 生成源域和目标域数据")
print("=" * 70)

def generate_structure_data(structure_type='bridge', n_samples=1000, 
                           noise_level=0.1, damage_ratio=0.3):
    """生成不同结构的监测数据"""
    np.random.seed(42 if structure_type == 'bridge' else 123)
    
    n_features = 20
    n_damage = int(n_samples * damage_ratio)
    n_healthy = n_samples - n_damage
    
    # 结构特定参数
    if structure_type == 'bridge':
        base_freq = 10
        freq_var = 2
        structure_noise = 0.5
    elif structure_type == 'building':
        base_freq = 5
        freq_var = 1
        structure_noise = 0.3
    else:  # tower
        base_freq = 3
        freq_var = 0.5
        structure_noise = 0.2
    
    # 健康样本
    healthy_data = []
    for _ in range(n_healthy):
        features = np.random.randn(n_features) * 0.5
        # 添加结构特征(频率特征)
        for i in range(5):
            features[i] += base_freq + np.random.randn() * freq_var
        # 添加结构特定噪声
        features += noise_level * np.random.randn(n_features)
        features += structure_noise * np.random.randn(n_features)
        healthy_data.append(features)
    
    # 损伤样本
    damage_data = []
    for _ in range(n_damage):
        features = np.random.randn(n_features) * 0.5
        # 添加损伤特征(频率变化)
        for i in range(5):
            features[i] += base_freq * 0.8 + np.random.randn() * freq_var * 1.5
        features[5:10] += 2  # 幅值异常
        features += noise_level * np.random.randn(n_features)
        features += structure_noise * np.random.randn(n_features)
        damage_data.append(features)
    
    X = np.vstack([healthy_data, damage_data])
    y = np.array([0] * n_healthy + [1] * n_damage)
    
    return X, y

# 生成源域数据(桥梁)
X_source, y_source = generate_structure_data('bridge', n_samples=1000, 
                                             noise_level=0.1, damage_ratio=0.3)
print(f"源域(桥梁): {X_source.shape[0]} 个样本, {X_source.shape[1]} 个特征")
print(f"  健康: {np.sum(y_source == 0)} 个, 损伤: {np.sum(y_source == 1)} 个")

# 生成目标域数据(建筑物)- 少量样本
X_target, y_target = generate_structure_data('building', n_samples=100, 
                                             noise_level=0.15, damage_ratio=0.3)
print(f"\n目标域(建筑物): {X_target.shape[0]} 个样本, {X_target.shape[1]} 个特征")
print(f"  健康: {np.sum(y_target == 0)} 个, 损伤: {np.sum(y_target == 1)} 个")

# 生成测试数据
X_target_test, y_target_test = generate_structure_data('building', n_samples=200, 
                                                       noise_level=0.15, damage_ratio=0.3)
print(f"\n目标域测试集: {X_target_test.shape[0]} 个样本")

# =============================================================================
# 2. 神经网络模型
# =============================================================================
print("\n" + "=" * 70)
print("2. 实现神经网络模型")
print("=" * 70)

class NeuralNetwork:
    """神经网络分类器"""
    
    def __init__(self, input_dim, hidden_dims=[64, 32], output_dim=2):
        self.input_dim = input_dim
        self.hidden_dims = hidden_dims
        self.output_dim = output_dim
        
        # 初始化参数
        self.params = {}
        dims = [input_dim] + hidden_dims + [output_dim]
        
        for i in range(len(dims) - 1):
            # Xavier初始化
            self.params[f'W{i}'] = np.random.randn(dims[i], dims[i+1]) * np.sqrt(2.0 / dims[i])
            self.params[f'b{i}'] = np.zeros(dims[i+1])
        
        self.n_layers = len(hidden_dims) + 1
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def softmax(self, x):
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    
    def forward(self, X):
        """前向传播"""
        cache = {'A0': X}
        A = X
        
        # 隐藏层
        for i in range(len(self.hidden_dims)):
            Z = A @ self.params[f'W{i}'] + self.params[f'b{i}']
            A = self.relu(Z)
            cache[f'Z{i+1}'] = Z
            cache[f'A{i+1}'] = A
        
        # 输出层
        i = len(self.hidden_dims)
        Z = A @ self.params[f'W{i}'] + self.params[f'b{i}']
        output = self.softmax(Z)
        cache[f'Z{i+1}'] = Z
        cache['output'] = output
        
        return output, cache
    
    def compute_loss(self, y_pred, y_true):
        """计算交叉熵损失"""
        n_samples = y_true.shape[0]
        log_likelihood = -np.log(y_pred[range(n_samples), y_true] + 1e-8)
        return np.sum(log_likelihood) / n_samples
    
    def backward(self, cache, y_true, lr=0.01):
        """反向传播"""
        n_samples = y_true.shape[0]
        grads = {}
        
        # 输出层梯度
        dZ = cache['output'].copy()
        dZ[range(n_samples), y_true] -= 1
        dZ /= n_samples
        
        i = len(self.hidden_dims)
        grads[f'dW{i}'] = cache[f'A{i}'].T @ dZ
        grads[f'db{i}'] = np.sum(dZ, axis=0)
        dA = dZ @ self.params[f'W{i}'].T
        
        # 隐藏层梯度
        for i in range(len(self.hidden_dims) - 1, -1, -1):
            dZ = dA * (cache[f'Z{i+1}'] > 0).astype(float)
            grads[f'dW{i}'] = cache[f'A{i}'].T @ dZ
            grads[f'db{i}'] = np.sum(dZ, axis=0)
            if i > 0:
                dA = dZ @ self.params[f'W{i}'].T
        
        # 更新参数
        for i in range(self.n_layers):
            self.params[f'W{i}'] -= lr * grads[f'dW{i}']
            self.params[f'b{i}'] -= lr * grads[f'db{i}']
    
    def train(self, X, y, epochs=100, lr=0.01, batch_size=32, verbose=True):
        """训练模型"""
        n_samples = X.shape[0]
        history = {'loss': [], 'accuracy': []}
        
        for epoch in range(epochs):
            # 随机打乱
            indices = np.random.permutation(n_samples)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            
            epoch_loss = 0
            
            # 小批量训练
            for i in range(0, n_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                y_batch = y_shuffled[i:i+batch_size]
                
                # 前向传播
                y_pred, cache = self.forward(X_batch)
                
                # 计算损失
                loss = self.compute_loss(y_pred, y_batch)
                epoch_loss += loss * X_batch.shape[0]
                
                # 反向传播
                self.backward(cache, y_batch, lr)
            
            # 计算训练准确率
            y_pred_full, _ = self.forward(X)
            acc = np.mean(np.argmax(y_pred_full, axis=1) == y)
            
            history['loss'].append(epoch_loss / n_samples)
            history['accuracy'].append(acc)
            
            if verbose and (epoch + 1) % 20 == 0:
                print(f"  Epoch {epoch+1}/{epochs}, Loss: {history['loss'][-1]:.4f}, "
                      f"Acc: {history['accuracy'][-1]:.4f}")
        
        return history
    
    def predict(self, X):
        """预测"""
        y_pred, _ = self.forward(X)
        return np.argmax(y_pred, axis=1)
    
    def evaluate(self, X, y):
        """评估模型"""
        y_pred = self.predict(X)
        acc = np.mean(y_pred == y)
        return acc

# =============================================================================
# 3. MMD计算
# =============================================================================
def compute_mmd(X_source, X_target, kernel='rbf', gamma=1.0):
    """计算最大均值差异(MMD)"""
    n_source = X_source.shape[0]
    n_target = X_target.shape[0]
    
    def rbf_kernel(X, Y, gamma):
        """RBF核函数"""
        dist = np.sum(X**2, axis=1).reshape(-1, 1) + np.sum(Y**2, axis=1) - 2 * X @ Y.T
        return np.exp(-gamma * dist)
    
    if kernel == 'rbf':
        K_ss = rbf_kernel(X_source, X_source, gamma)
        K_tt = rbf_kernel(X_target, X_target, gamma)
        K_st = rbf_kernel(X_source, X_target, gamma)
    else:
        K_ss = X_source @ X_source.T
        K_tt = X_target @ X_target.T
        K_st = X_source @ X_target.T
    
    # 计算MMD
    mmd = (np.sum(K_ss) / (n_source * n_source) +
           np.sum(K_tt) / (n_target * n_target) -
           2 * np.sum(K_st) / (n_source * n_target))
    
    return np.sqrt(max(mmd, 0))

# =============================================================================
# 4. 迁移学习实验
# =============================================================================
print("\n" + "=" * 70)
print("3. 迁移学习实验对比")
print("=" * 70)

# 方法1:无迁移(直接在目标域上训练)
print("\n方法1:无迁移学习(直接在目标域训练)")
print("-" * 50)
model_no_transfer = NeuralNetwork(input_dim=20, hidden_dims=[64, 32])
history_no_transfer = model_no_transfer.train(X_target, y_target, epochs=100, lr=0.01, batch_size=16)
acc_no_transfer = model_no_transfer.evaluate(X_target_test, y_target_test)
print(f"测试准确率: {acc_no_transfer:.4f}")

# 方法2:预训练+微调
print("\n方法2:预训练 + 微调")
print("-" * 50)

# 在源域上预训练
print("步骤1: 在源域上预训练...")
model_pretrained = NeuralNetwork(input_dim=20, hidden_dims=[64, 32])
history_pretrain = model_pretrained.train(X_source, y_source, epochs=50, lr=0.01, batch_size=32)

# 在目标域上微调
print("\n步骤2: 在目标域上微调...")
history_finetune = model_pretrained.train(X_target, y_target, epochs=50, lr=0.005, batch_size=16)
acc_finetune = model_pretrained.evaluate(X_target_test, y_target_test)
print(f"测试准确率: {acc_finetune:.4f}")

# 方法3:特征提取器迁移
print("\n方法3:特征提取器迁移(冻结底层)")
print("-" * 50)

# 重新初始化模型
model_feature = NeuralNetwork(input_dim=20, hidden_dims=[64, 32])

# 在源域上训练
print("步骤1: 在源域上训练特征提取器...")
model_feature.train(X_source, y_source, epochs=50, lr=0.01, batch_size=32)

# 冻结前两层,只训练输出层
print("\n步骤2: 冻结底层,只微调输出层...")
# 保存底层参数
W0_backup = model_feature.params['W0'].copy()
b0_backup = model_feature.params['b0'].copy()
W1_backup = model_feature.params['W1'].copy()
b1_backup = model_feature.params['b1'].copy()

# 在目标域上训练(保持底层不变)
for epoch in range(50):
    indices = np.random.permutation(len(X_target))
    for i in range(0, len(X_target), 16):
        X_batch = X_target[indices[i:i+16]]
        y_batch = y_target[indices[i:i+16]]
        
        y_pred, cache = model_feature.forward(X_batch)
        
        # 只更新输出层
        n_samples = len(y_batch)
        dZ = y_pred.copy()
        dZ[range(n_samples), y_batch] -= 1
        dZ /= n_samples
        
        model_feature.params['W2'] -= 0.005 * cache['A2'].T @ dZ
        model_feature.params['b2'] -= 0.005 * np.sum(dZ, axis=0)
    
    # 恢复底层参数
    model_feature.params['W0'] = W0_backup
    model_feature.params['b0'] = b0_backup
    model_feature.params['W1'] = W1_backup
    model_feature.params['b1'] = b1_backup

acc_feature = model_feature.evaluate(X_target_test, y_target_test)
print(f"测试准确率: {acc_feature:.4f}")

# =============================================================================
# 5. 计算MMD
# =============================================================================
print("\n" + "=" * 70)
print("4. 域间差异分析(MMD)")
print("=" * 70)

# 计算源域和目标域的MMD
mmd_before = compute_mmd(X_source, X_target, kernel='rbf', gamma=0.1)
print(f"MMD(源域 vs 目标域): {mmd_before:.4f}")

# 使用PCA降维后计算MMD
from sklearn.decomposition import PCA
pca = PCA(n_components=5)
X_source_pca = pca.fit_transform(X_source)
X_target_pca = pca.transform(X_target)
mmd_pca = compute_mmd(X_source_pca, X_target_pca, kernel='rbf', gamma=0.1)
print(f"MMD(PCA降维后): {mmd_pca:.4f}")

# =============================================================================
# 6. 可视化结果
# =============================================================================
print("\n" + "=" * 70)
print("5. 生成可视化")
print("=" * 70)

# 6.1 训练曲线对比
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
fig.suptitle('迁移学习方法训练曲线对比', fontsize=16, fontweight='bold')

# 损失曲线
axes[0].plot(history_no_transfer['loss'], 'b-', linewidth=2, label='无迁移', alpha=0.8)
axes[0].plot(history_finetune['loss'], 'r-', linewidth=2, label='预训练+微调', alpha=0.8)
axes[0].set_xlabel('迭代次数', fontsize=12)
axes[0].set_ylabel('损失', fontsize=12)
axes[0].set_title('训练损失', fontsize=13)
axes[0].legend(fontsize=11)
axes[0].grid(True, alpha=0.3)

# 准确率曲线
axes[1].plot(history_no_transfer['accuracy'], 'b-', linewidth=2, label='无迁移', alpha=0.8)
axes[1].plot(history_finetune['accuracy'], 'r-', linewidth=2, label='预训练+微调', alpha=0.8)
axes[1].set_xlabel('迭代次数', fontsize=12)
axes[1].set_ylabel('准确率', fontsize=12)
axes[1].set_title('训练准确率', fontsize=13)
axes[1].legend(fontsize=11)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('training_comparison.png', dpi=150, bbox_inches='tight')
print("[OK] 保存: training_comparison.png")
plt.close()

# 6.2 方法性能对比
fig, ax = plt.subplots(figsize=(10, 6))

methods = ['无迁移\n(目标域)', '预训练+微调', '特征提取器\n迁移']
accuracies = [acc_no_transfer, acc_finetune, acc_feature]
colors = ['#3498DB', '#E74C3C', '#2ECC71']

bars = ax.bar(methods, accuracies, color=colors, alpha=0.8, edgecolor='black', linewidth=2)

# 添加数值标签
for bar, acc in zip(bars, accuracies):
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height + 0.01,
            f'{acc:.3f}', ha='center', va='bottom', fontsize=14, fontweight='bold')

ax.set_ylabel('测试准确率', fontsize=13)
ax.set_title('不同迁移学习方法性能对比', fontsize=15, fontweight='bold')
ax.set_ylim([0, 1])
ax.grid(True, alpha=0.3, axis='y')

# 添加提升标注
improvement = (acc_finetune - acc_no_transfer) / acc_no_transfer * 100
ax.text(1.5, 0.5, f'迁移学习提升:\n{improvement:.1f}%', 
        ha='center', va='center', fontsize=12, 
        bbox=dict(boxstyle='round', facecolor='yellow', alpha=0.5))

plt.tight_layout()
plt.savefig('method_comparison.png', dpi=150, bbox_inches='tight')
print("[OK] 保存: method_comparison.png")
plt.close()

# 6.3 特征分布可视化(PCA)
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
fig.suptitle('源域与目标域特征分布对比(PCA降维)', fontsize=16, fontweight='bold')

# 原始特征分布
pca_viz = PCA(n_components=2)
X_source_viz = pca_viz.fit_transform(X_source)
X_target_viz = pca_viz.transform(X_target)

axes[0].scatter(X_source_viz[y_source==0, 0], X_source_viz[y_source==0, 1], 
               c='blue', s=30, alpha=0.5, label='源域-健康')
axes[0].scatter(X_source_viz[y_source==1, 0], X_source_viz[y_source==1, 1], 
               c='red', s=30, alpha=0.5, label='源域-损伤')
axes[0].scatter(X_target_viz[y_target==0, 0], X_target_viz[y_target==0, 1], 
               c='cyan', s=50, marker='^', alpha=0.7, label='目标域-健康')
axes[0].scatter(X_target_viz[y_target==1, 0], X_target_viz[y_target==1, 1], 
               c='orange', s=50, marker='^', alpha=0.7, label='目标域-损伤')
axes[0].set_xlabel('主成分1', fontsize=12)
axes[0].set_ylabel('主成分2', fontsize=12)
axes[0].set_title('原始特征分布', fontsize=13)
axes[0].legend(fontsize=10)
axes[0].grid(True, alpha=0.3)

# 域分布
axes[1].scatter(X_source_viz[:, 0], X_source_viz[:, 1], 
               c='blue', s=30, alpha=0.4, label='源域(桥梁)')
axes[1].scatter(X_target_viz[:, 0], X_target_viz[:, 1], 
               c='red', s=50, marker='^', alpha=0.7, label='目标域(建筑物)')
axes[1].set_xlabel('主成分1', fontsize=12)
axes[1].set_ylabel('主成分2', fontsize=12)
axes[1].set_title('域分布对比', fontsize=13)
axes[1].legend(fontsize=10)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('feature_distribution.png', dpi=150, bbox_inches='tight')
print("[OK] 保存: feature_distribution.png")
plt.close()

# 6.4 迁移学习架构图
fig, ax = plt.subplots(figsize=(14, 10))
ax.set_xlim(0, 14)
ax.set_ylim(0, 10)
ax.axis('off')
ax.set_title('迁移学习架构示意图', fontsize=18, fontweight='bold', pad=20)

def draw_box(ax, x, y, width, height, color, label, fontsize=10):
    box = FancyBboxPatch((x, y), width, height, 
                         boxstyle="round,pad=0.1", 
                         facecolor=color, edgecolor='black', linewidth=2)
    ax.add_patch(box)
    ax.text(x + width/2, y + height/2, label, 
            ha='center', va='center', fontsize=fontsize, fontweight='bold')

# 源域
ax.text(2.5, 9, '源域(桥梁)', fontsize=14, fontweight='bold', ha='center')
draw_box(ax, 1, 7.5, 3, 1.2, '#90EE90', '源域数据\n(大量标注)', 10)
draw_box(ax, 1, 5.8, 3, 1.2, '#87CEEB', '特征提取器\n(预训练)', 10)
draw_box(ax, 1, 4.1, 3, 1.2, '#DDA0DD', '分类器\n(预训练)', 10)

# 目标域
ax.text(9.5, 9, '目标域(建筑物)', fontsize=14, fontweight='bold', ha='center')
draw_box(ax, 8, 7.5, 3, 1.2, '#FFB6C1', '目标域数据\n(少量标注)', 10)
draw_box(ax, 8, 5.8, 3, 1.2, '#87CEEB', '特征提取器\n(冻结/微调)', 10)
draw_box(ax, 8, 4.1, 3, 1.2, '#DDA0DD', '分类器\n(微调)', 10)

# 迁移箭头
ax.annotate('', xy=(8, 6.4), xytext=(4, 6.4),
           arrowprops=dict(arrowstyle='->', lw=3, color='red'))
ax.text(6, 6.8, '知识迁移', fontsize=12, color='red', 
       fontweight='bold', ha='center')

# 输出
draw_box(ax, 5, 2, 4, 1.2, '#F0E68C', '损伤识别模型\n(适配目标域)', 11)

# 连接线
ax.plot([2.5, 2.5], [7.5, 6.2], 'k-', lw=2)
ax.plot([2.5, 2.5], [5.8, 4.5], 'k-', lw=2)
ax.plot([9.5, 9.5], [7.5, 6.2], 'k-', lw=2)
ax.plot([9.5, 9.5], [5.8, 4.5], 'k-', lw=2)
ax.plot([9.5, 7], [3.5, 2.6], 'k-', lw=2)

# 说明文字
ax.text(7, 0.8, '迁移学习:利用源域知识,提升目标域模型性能', 
       ha='center', fontsize=12, style='italic')

plt.tight_layout()
plt.savefig('transfer_learning_architecture.png', dpi=150, bbox_inches='tight')
print("[OK] 保存: transfer_learning_architecture.png")
plt.close()

# 6.5 混淆矩阵对比
fig, axes = plt.subplots(1, 3, figsize=(15, 4))
fig.suptitle('混淆矩阵对比', fontsize=16, fontweight='bold')

def plot_confusion_matrix(ax, y_true, y_pred, title):
    cm = np.zeros((2, 2))
    for i in range(2):
        for j in range(2):
            cm[i, j] = np.sum((y_true == i) & (y_pred == j))
    
    im = ax.imshow(cm, cmap='Blues', aspect='auto')
    ax.set_xticks([0, 1])
    ax.set_yticks([0, 1])
    ax.set_xticklabels(['健康', '损伤'])
    ax.set_yticklabels(['健康', '损伤'])
    ax.set_xlabel('预测标签', fontsize=11)
    ax.set_ylabel('真实标签', fontsize=11)
    ax.set_title(title, fontsize=12)
    
    # 添加数值
    for i in range(2):
        for j in range(2):
            ax.text(j, i, int(cm[i, j]), ha='center', va='center', 
                   fontsize=14, fontweight='bold')

# 无迁移
y_pred_no = model_no_transfer.predict(X_target_test)
plot_confusion_matrix(axes[0], y_target_test, y_pred_no, 
                     f'无迁移\n准确率: {acc_no_transfer:.3f}')

# 预训练+微调
y_pred_fine = model_pretrained.predict(X_target_test)
plot_confusion_matrix(axes[1], y_target_test, y_pred_fine, 
                     f'预训练+微调\n准确率: {acc_finetune:.3f}')

# 特征提取器迁移
y_pred_feat = model_feature.predict(X_target_test)
plot_confusion_matrix(axes[2], y_target_test, y_pred_feat, 
                     f'特征提取器迁移\n准确率: {acc_feature:.3f}')

plt.tight_layout()
plt.savefig('confusion_matrix_comparison.png', dpi=150, bbox_inches='tight')
print("[OK] 保存: confusion_matrix_comparison.png")
plt.close()

# =============================================================================
# 7. 保存结果
# =============================================================================
with open('results_summary.txt', 'w', encoding='utf-8') as f:
    f.write("=" * 70 + "\n")
    f.write("主题094:结构健康监测中的迁移学习\n")
    f.write("=" * 70 + "\n\n")
    
    f.write("实验设置:\n")
    f.write(f"  源域(桥梁): {X_source.shape[0]} 个样本\n")
    f.write(f"  目标域(建筑物): {X_target.shape[0]} 个样本\n")
    f.write(f"  目标域测试集: {X_target_test.shape[0]} 个样本\n")
    f.write(f"  特征维度: {X_source.shape[1]}\n\n")
    
    f.write("域间差异:\n")
    f.write(f"  MMD(原始特征): {mmd_before:.4f}\n")
    f.write(f"  MMD(PCA降维后): {mmd_pca:.4f}\n\n")
    
    f.write("方法性能对比:\n")
    f.write(f"  无迁移学习: {acc_no_transfer:.4f}\n")
    f.write(f"  预训练+微调: {acc_finetune:.4f}\n")
    f.write(f"  特征提取器迁移: {acc_feature:.4f}\n\n")
    
    f.write("迁移学习效果:\n")
    improvement = (acc_finetune - acc_no_transfer) / acc_no_transfer * 100
    f.write(f"  准确率提升: {improvement:.2f}%\n")
    f.write(f"  绝对提升: {acc_finetune - acc_no_transfer:.4f}\n\n")
    
    f.write("=" * 70 + "\n")

print("\n结果已保存到 results_summary.txt")

print("\n" + "=" * 70)
print("仿真完成!")
print("=" * 70)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐