结构健康监测仿真 - 主题076: 结构健康监测中的联邦学习

Federated Learning in Structural Health Monitoring


目录

  1. 引言
  2. 联邦学习基础理论
  3. 联邦学习算法
  4. 联邦学习在SHM中的应用
  5. 隐私保护技术
  6. Python联邦学习仿真实现
  7. 案例研究
  8. 挑战与展望
  9. 参考文献

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

1. 引言

1.1 背景与动机

结构健康监测(Structural Health Monitoring, SHM)系统通常部署在多个结构上,如桥梁群、风电场、建筑群等。传统的集中式机器学习方法面临以下挑战:

  • 数据孤岛:每个结构的监测数据分散在不同位置
  • 隐私安全:结构数据可能包含敏感信息
  • 通信成本:海量监测数据传输成本高昂
  • 数据异构:不同结构的数据分布差异大
  • 实时性要求:需要快速响应的本地决策能力

联邦学习(Federated Learning, FL)通过"数据不动模型动"的方式,在保护数据隐私的前提下实现协同学习,为解决上述问题提供了有效方案。

1.2 联邦学习的优势

特性 集中式学习 联邦学习
数据隐私 数据集中存储 数据本地保留
通信开销 传输原始数据 仅传输模型参数
扩展性 受限于中心服务器 支持大规模分布式
个性化 全局模型 支持个性化模型
实时性 依赖网络延迟 本地推理低延迟

1.3 本章目标

  • 掌握联邦学习的核心理论和算法
  • 理解联邦平均(FedAvg)等经典算法
  • 学会处理非独立同分布(Non-IID)数据
  • 实现隐私保护机制
  • 开发多结构协同监测的联邦学习仿真

2. 联邦学习基础理论

2.1 联邦学习框架

联邦学习是一种分布式机器学习范式,其核心思想是在数据不出本地的前提下,通过交换模型参数实现协同训练。

基本架构:

┌─────────────────────────────────────────────────────────────┐
│                      中央服务器                              │
│  ┌─────────────────┐    ┌─────────────────┐                 │
│  │   全局模型聚合    │    │   模型分发      │                 │
│  │   Global Model   │───▶│   Distribution  │                 │
│  │   Aggregation    │◀───│                 │                 │
│  └─────────────────┘    └─────────────────┘                 │
└─────────────────────────────────────────────────────────────┘
         ▲                                    │
         │                                    ▼
┌────────┴────────┐              ┌─────────────────────────┐
│   客户端 1       │              │        客户端 K         │
│  ┌───────────┐  │              │      ┌───────────┐      │
│  │ 本地数据   │  │              │      │ 本地数据   │      │
│  │ Local Data│  │              │      │ Local Data│      │
│  └─────┬─────┘  │              │      └─────┬─────┘      │
│        ▼        │              │            ▼            │
│  ┌───────────┐  │              │      ┌───────────┐      │
│  │ 本地训练   │  │              │      │ 本地训练   │      │
│  │  Local    │──┼──────────────┼─────▶│  Local    │      │
│  │ Training  │  │              │      │ Training  │      │
│  └───────────┘  │              │      └───────────┘      │
└─────────────────┘              └─────────────────────────┘

核心概念:

  1. 中央服务器(Central Server):负责全局模型的聚合与分发
  2. 客户端(Client):拥有本地数据的边缘设备或结构
  3. 全局模型(Global Model):所有客户端共享的模型
  4. 本地模型(Local Model):每个客户端训练的模型
  5. 通信轮次(Communication Round):客户端与服务器的一次完整交互

2.2 联邦学习的问题定义

优化目标:

min ⁡ w F ( w ) = ∑ k = 1 K n k n F k ( w ) \min_{w} F(w) = \sum_{k=1}^{K} \frac{n_k}{n} F_k(w) wminF(w)=k=1KnnkFk(w)

其中:

  • w w w:模型参数
  • K K K:客户端数量
  • n k n_k nk:第 k k k个客户端的数据量
  • n = ∑ k = 1 K n k n = \sum_{k=1}^{K} n_k n=k=1Knk:总数据量
  • F k ( w ) F_k(w) Fk(w):第 k k k个客户端的本地损失函数

本地损失函数:

F k ( w ) = 1 n k ∑ i ∈ D k ℓ ( w ; x i , y i ) F_k(w) = \frac{1}{n_k} \sum_{i \in D_k} \ell(w; x_i, y_i) Fk(w)=nk1iDk(w;xi,yi)

其中 D k D_k Dk是第 k k k个客户端的本地数据集。

2.3 联邦学习的分类

按数据分布分类:

联邦学习
├── 横向联邦学习 (Horizontal FL)
│   ├── 特征相同,样本不同
│   ├── 例:多座同类型桥梁
│   └── 目标:扩大样本量
├── 纵向联邦学习 (Vertical FL)
│   ├── 样本相同,特征不同
│   ├── 例:同一结构的多源传感器
│   └── 目标:丰富特征维度
└── 联邦迁移学习 (Federated Transfer Learning)
    ├── 样本和特征都不同
    ├── 例:不同类型结构间
    └── 目标:知识迁移

按参与方式分类:

  • 跨设备联邦学习(Cross-device FL):大量移动/IoT设备
  • 跨组织联邦学习(Cross-silo FL):少数组织/机构

2.4 Python实现:联邦学习基础类

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import List, Dict, Tuple, Optional
import copy

class FederatedClient:
    """
    联邦学习客户端
    
    每个客户端代表一个结构或监测站点
    """
    
    def __init__(self, client_id: int, model: nn.Module, 
                 train_data: torch.utils.data.Dataset,
                 test_data: Optional[torch.utils.data.Dataset] = None,
                 lr: float = 0.01, batch_size: int = 32,
                 epochs: int = 5):
        """
        初始化客户端
        
        参数:
            client_id: 客户端唯一标识
            model: 本地模型
            train_data: 本地训练数据
            test_data: 本地测试数据
            lr: 学习率
            batch_size: 批量大小
            epochs: 本地训练轮数
        """
        self.client_id = client_id
        self.model = model
        self.train_data = train_data
        self.test_data = test_data
        self.lr = lr
        self.batch_size = batch_size
        self.epochs = epochs
        
        self.train_loader = torch.utils.data.DataLoader(
            train_data, batch_size=batch_size, shuffle=True
        )
        if test_data is not None:
            self.test_loader = torch.utils.data.DataLoader(
                test_data, batch_size=batch_size, shuffle=False
            )
        
        self.optimizer = optim.SGD(self.model.parameters(), lr=lr)
        self.criterion = nn.CrossEntropyLoss()
        
        # 训练历史
        self.train_history = []
    
    def local_train(self, global_weights: Optional[Dict] = None) -> Tuple[Dict, float, int]:
        """
        本地训练
        
        参数:
            global_weights: 全局模型权重
        
        返回:
            local_weights: 训练后的本地权重
            loss: 平均训练损失
            n_samples: 本地样本数
        """
        # 加载全局模型
        if global_weights is not None:
            self.model.load_state_dict(global_weights)
        
        self.model.train()
        total_loss = 0
        n_batches = 0
        
        for epoch in range(self.epochs):
            epoch_loss = 0
            for batch_idx, (data, target) in enumerate(self.train_loader):
                self.optimizer.zero_grad()
                output = self.model(data)
                loss = self.criterion(output, target)
                loss.backward()
                self.optimizer.step()
                
                epoch_loss += loss.item()
                n_batches += 1
            
            avg_epoch_loss = epoch_loss / len(self.train_loader)
            self.train_history.append(avg_epoch_loss)
            total_loss += avg_epoch_loss
        
        avg_loss = total_loss / self.epochs
        n_samples = len(self.train_data)
        
        return copy.deepcopy(self.model.state_dict()), avg_loss, n_samples
    
    def local_evaluate(self) -> Tuple[float, float]:
        """
        本地评估
        
        返回:
            loss: 测试损失
            accuracy: 测试准确率
        """
        if self.test_data is None:
            return 0.0, 0.0
        
        self.model.eval()
        test_loss = 0
        correct = 0
        
        with torch.no_grad():
            for data, target in self.test_loader:
                output = self.model(data)
                test_loss += self.criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        
        test_loss /= len(self.test_loader)
        accuracy = 100. * correct / len(self.test_data)
        
        return test_loss, accuracy


class FederatedServer:
    """
    联邦学习服务器
    
    负责全局模型的聚合与分发
    """
    
    def __init__(self, global_model: nn.Module, aggregation_method: str = 'fedavg'):
        """
        初始化服务器
        
        参数:
            global_model: 全局模型
            aggregation_method: 聚合方法 ('fedavg', 'fedprox', 'scaffold')
        """
        self.global_model = global_model
        self.aggregation_method = aggregation_method
        self.global_weights = copy.deepcopy(global_model.state_dict())
        
        # 聚合历史
        self.aggregation_history = []
    
    def aggregate(self, client_weights: List[Dict], client_samples: List[int]) -> Dict:
        """
        聚合客户端模型
        
        参数:
            client_weights: 客户端模型权重列表
            client_samples: 客户端样本数列表
        
        返回:
            global_weights: 更新后的全局权重
        """
        if self.aggregation_method == 'fedavg':
            return self._fedavg(client_weights, client_samples)
        elif self.aggregation_method == 'fedprox':
            return self._fedprox(client_weights, client_samples)
        else:
            raise ValueError(f"Unknown aggregation method: {self.aggregation_method}")
    
    def _fedavg(self, client_weights: List[Dict], client_samples: List[int]) -> Dict:
        """
        FedAvg聚合算法
        
        按样本数加权平均
        """
        total_samples = sum(client_samples)
        global_weights = copy.deepcopy(client_weights[0])
        
        for key in global_weights.keys():
            global_weights[key] = torch.zeros_like(global_weights[key])
        
        for weights, n_samples in zip(client_weights, client_samples):
            weight = n_samples / total_samples
            for key in global_weights.keys():
                global_weights[key] += weight * weights[key]
        
        return global_weights
    
    def _fedprox(self, client_weights: List[Dict], client_samples: List[int],
                 mu: float = 0.01) -> Dict:
        """
        FedProx聚合算法
        
        添加近端项防止本地模型偏离全局模型
        """
        # FedProx的聚合与FedAvg相同,区别在于本地训练
        return self._fedavg(client_weights, client_samples)
    
    def distribute(self) -> Dict:
        """
        分发全局模型
        
        返回:
            global_weights: 当前全局模型权重
        """
        return copy.deepcopy(self.global_weights)
    
    def update_global_model(self, global_weights: Dict):
        """
        更新全局模型
        """
        self.global_weights = global_weights
        self.global_model.load_state_dict(global_weights)
    
    def global_evaluate(self, test_loader: torch.utils.data.DataLoader) -> Tuple[float, float]:
        """
        全局评估
        
        参数:
            test_loader: 测试数据加载器
        
        返回:
            loss: 测试损失
            accuracy: 测试准确率
        """
        self.global_model.eval()
        test_loss = 0
        correct = 0
        criterion = nn.CrossEntropyLoss()
        
        with torch.no_grad():
            for data, target in test_loader:
                output = self.global_model(data)
                test_loss += criterion(output, target).item()
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        
        test_loss /= len(test_loader)
        accuracy = 100. * correct / len(test_loader.dataset)
        
        return test_loss, accuracy


# 使用示例
if __name__ == "__main__":
    # 创建简单的神经网络模型
    class SimpleNN(nn.Module):
        def __init__(self, input_dim: int = 10, num_classes: int = 2):
            super(SimpleNN, self).__init__()
            self.fc1 = nn.Linear(input_dim, 64)
            self.fc2 = nn.Linear(64, 32)
            self.fc3 = nn.Linear(32, num_classes)
            self.relu = nn.ReLU()
        
        def forward(self, x):
            x = self.relu(self.fc1(x))
            x = self.relu(self.fc2(x))
            x = self.fc3(x)
            return x
    
    # 创建全局模型
    global_model = SimpleNN(input_dim=10, num_classes=2)
    
    # 创建服务器
    server = FederatedServer(global_model, aggregation_method='fedavg')
    
    print("联邦学习服务器初始化完成")
    print(f"聚合方法: {server.aggregation_method}")

3. 联邦学习算法

3.1 FedAvg(联邦平均)

FedAvg是最基础的联邦学习算法,由McMahan等人于2017年提出。

算法流程:

算法: FedAvg
─────────────────────────────────────────────────────────────
输入: K个客户端,初始全局模型w₀,通信轮数T,本地训练轮数E
输出: 最终全局模型w_T

1. 服务器初始化全局模型w₀
2. for t = 1, 2, ..., T do:
3.     服务器选择客户端子集S_t
4.     for each client k ∈ S_t in parallel do:
5.         w_k^t = ClientUpdate(k, w^{t-1})
6.     end for
7.     w^t = Σ_{k∈S_t} (n_k/n) * w_k^t  // 加权平均
8. end for
9. return w_T

ClientUpdate(k, w):
1. 加载全局模型w
2. for i = 1, 2, ..., E do:
3.     在本地数据D_k上训练模型
4. end for
5. return 更新后的本地模型

Python实现:

def run_fedavg(server: FederatedServer, clients: List[FederatedClient],
               n_rounds: int = 100, clients_per_round: Optional[int] = None,
               verbose: bool = True) -> Dict:
    """
    运行FedAvg算法
    
    参数:
        server: 联邦学习服务器
        clients: 客户端列表
        n_rounds: 通信轮数
        clients_per_round: 每轮选择的客户端数(None表示全部)
        verbose: 是否打印训练信息
    
    返回:
        history: 训练历史
    """
    if clients_per_round is None:
        clients_per_round = len(clients)
    
    history = {
        'train_loss': [],
        'train_acc': [],
        'global_loss': [],
        'global_acc': []
    }
    
    for round_idx in range(n_rounds):
        # 选择参与本轮训练的客户端
        selected_clients = np.random.choice(
            clients, 
            size=min(clients_per_round, len(clients)),
            replace=False
        )
        
        # 分发全局模型
        global_weights = server.distribute()
        
        # 客户端本地训练
        client_weights = []
        client_samples = []
        total_loss = 0
        
        for client in selected_clients:
            weights, loss, n_samples = client.local_train(global_weights)
            client_weights.append(weights)
            client_samples.append(n_samples)
            total_loss += loss
        
        # 服务器聚合
        new_global_weights = server.aggregate(client_weights, client_samples)
        server.update_global_model(new_global_weights)
        
        # 记录历史
        avg_loss = total_loss / len(selected_clients)
        history['train_loss'].append(avg_loss)
        
        if verbose and (round_idx + 1) % 10 == 0:
            print(f"Round {round_idx+1}/{n_rounds}, 平均训练损失: {avg_loss:.4f}")
    
    return history

3.2 FedProx(联邦近端)

FedProx在FedAvg基础上添加了近端项,解决数据异构性问题。

本地目标函数:

h k ( w ; w t ) = F k ( w ) + μ 2 ∥ w − w t ∥ 2 h_k(w; w^t) = F_k(w) + \frac{\mu}{2} \|w - w^t\|^2 hk(w;wt)=Fk(w)+2μwwt2

其中:

  • F k ( w ) F_k(w) Fk(w):原始本地损失
  • μ \mu μ:近端系数
  • w t w^t wt:第 t t t轮的全局模型

近端项的作用:

  1. 限制本地更新:防止本地模型过度偏离全局模型
  2. 处理异构数据:对Non-IID数据更加鲁棒
  3. 提高收敛性:改善收敛速度和稳定性

Python实现:

class FedProxClient(FederatedClient):
    """
    FedProx客户端
    
    在本地训练中添加近端项
    """
    
    def __init__(self, client_id: int, model: nn.Module, 
                 train_data: torch.utils.data.Dataset,
                 test_data: Optional[torch.utils.data.Dataset] = None,
                 lr: float = 0.01, batch_size: int = 32,
                 epochs: int = 5, mu: float = 0.01):
        super().__init__(client_id, model, train_data, test_data,
                        lr, batch_size, epochs)
        self.mu = mu
    
    def local_train(self, global_weights: Optional[Dict] = None) -> Tuple[Dict, float, int]:
        """
        本地训练(带近端项)
        """
        if global_weights is not None:
            self.model.load_state_dict(global_weights)
            # 保存全局模型用于计算近端项
            global_model = copy.deepcopy(self.model)
        
        self.model.train()
        total_loss = 0
        
        for epoch in range(self.epochs):
            epoch_loss = 0
            for batch_idx, (data, target) in enumerate(self.train_loader):
                self.optimizer.zero_grad()
                
                # 前向传播
                output = self.model(data)
                loss = self.criterion(output, target)
                
                # 添加近端项
                if global_weights is not None:
                    proximal_term = 0
                    for param, global_param in zip(self.model.parameters(), 
                                                   global_model.parameters()):
                        proximal_term += torch.norm(param - global_param) ** 2
                    loss += (self.mu / 2) * proximal_term
                
                loss.backward()
                self.optimizer.step()
                
                epoch_loss += loss.item()
            
            avg_epoch_loss = epoch_loss / len(self.train_loader)
            self.train_history.append(avg_epoch_loss)
            total_loss += avg_epoch_loss
        
        avg_loss = total_loss / self.epochs
        n_samples = len(self.train_data)
        
        return copy.deepcopy(self.model.state_dict()), avg_loss, n_samples

3.3 SCAFFOLD(随机控制平均框架)

SCAFFOLD使用控制变量来纠正客户端漂移,加速收敛。

核心思想:

  1. 服务器维护全局控制变量 c c c
  2. 每个客户端维护本地控制变量 c k c_k ck
  3. 本地更新时考虑控制变量差异

本地更新规则:

w k t + 1 = w t − η ( g k ( w t ) − c k + c ) w_k^{t+1} = w^t - \eta (g_k(w^t) - c_k + c) wkt+1=wtη(gk(wt)ck+c)

控制变量更新:

c k n e w = c k − c + w t − w k t + 1 K η c_k^{new} = c_k - c + \frac{w^t - w_k^{t+1}}{K\eta} cknew=ckc+Kηwtwkt+1

Python实现:

class SCAFFOLDClient(FederatedClient):
    """
    SCAFFOLD客户端
    
    使用控制变量纠正客户端漂移
    """
    
    def __init__(self, client_id: int, model: nn.Module, 
                 train_data: torch.utils.data.Dataset,
                 test_data: Optional[torch.utils.data.Dataset] = None,
                 lr: float = 0.01, batch_size: int = 32,
                 epochs: int = 5):
        super().__init__(client_id, model, train_data, test_data,
                        lr, batch_size, epochs)
        
        # 初始化本地控制变量
        self.local_control = {name: torch.zeros_like(param) 
                             for name, param in model.named_parameters()}
        self.global_control = None
    
    def set_global_control(self, global_control: Dict):
        """设置全局控制变量"""
        self.global_control = global_control
    
    def local_train(self, global_weights: Optional[Dict] = None) -> Tuple[Dict, Dict, float, int]:
        """
        本地训练(带控制变量)
        
        返回:
            local_weights: 本地权重
            control_delta: 控制变量变化
            loss: 平均损失
            n_samples: 样本数
        """
        if global_weights is not None:
            self.model.load_state_dict(global_weights)
        
        # 保存初始权重
        initial_weights = {name: param.clone() 
                          for name, param in self.model.named_parameters()}
        
        self.model.train()
        total_loss = 0
        
        for epoch in range(self.epochs):
            epoch_loss = 0
            for batch_idx, (data, target) in enumerate(self.train_loader):
                self.optimizer.zero_grad()
                
                output = self.model(data)
                loss = self.criterion(output, target)
                loss.backward()
                
                # 应用控制变量修正
                if self.global_control is not None:
                    for name, param in self.model.named_parameters():
                        if param.grad is not None:
                            correction = self.global_control[name] - self.local_control[name]
                            param.grad -= correction
                
                self.optimizer.step()
                epoch_loss += loss.item()
            
            avg_epoch_loss = epoch_loss / len(self.train_loader)
            self.train_history.append(avg_epoch_loss)
            total_loss += avg_epoch_loss
        
        # 计算控制变量变化
        control_delta = {}
        K = self.epochs * len(self.train_loader)
        for name, param in self.model.named_parameters():
            control_delta[name] = (initial_weights[name] - param) / (K * self.lr) \
                                 - self.local_control[name] + self.global_control[name]
            self.local_control[name] += control_delta[name]
        
        avg_loss = total_loss / self.epochs
        n_samples = len(self.train_data)
        
        return copy.deepcopy(self.model.state_dict()), control_delta, avg_loss, n_samples


class SCAFFOLDServer(FederatedServer):
    """
    SCAFFOLD服务器
    """
    
    def __init__(self, global_model: nn.Module):
        super().__init__(global_model, aggregation_method='scaffold')
        
        # 初始化全局控制变量
        self.global_control = {name: torch.zeros_like(param) 
                              for name, param in global_model.named_parameters()}
    
    def aggregate_with_control(self, client_weights: List[Dict], 
                               control_deltas: List[Dict],
                               client_samples: List[int]) -> Dict:
        """
        聚合模型并更新控制变量
        """
        # 聚合模型权重
        global_weights = self._fedavg(client_weights, client_samples)
        
        # 更新全局控制变量
        total_samples = sum(client_samples)
        for name in self.global_control.keys():
            avg_delta = sum(delta[name] * n / total_samples 
                          for delta, n in zip(control_deltas, client_samples))
            self.global_control[name] += avg_delta
        
        return global_weights

3.4 个性化联邦学习

针对SHM中不同结构特性差异大的问题,个性化联邦学习为每个客户端学习个性化模型。

主要方法:

  1. FedPer:分层个性化,底层共享,顶层个性化
  2. Per-FedAvg:元学习方法
  3. pFedMe:Moreau包络个性化

FedPer实现:

class FedPerClient(FederatedClient):
    """
    FedPer客户端
    
    分层个性化:特征提取层共享,分类层个性化
    """
    
    def __init__(self, client_id: int, model: nn.Module, 
                 train_data: torch.utils.data.Dataset,
                 test_data: Optional[torch.utils.data.Dataset] = None,
                 lr: float = 0.01, batch_size: int = 32,
                 epochs: int = 5, personalization_layers: List[str] = None):
        super().__init__(client_id, model, train_data, test_data,
                        lr, batch_size, epochs)
        
        # 个性化层(不共享)
        self.personalization_layers = personalization_layers or ['fc3']
        
        # 保存个性化参数
        self.personalized_params = {}
        for name in self.personalization_layers:
            if name in dict(model.named_parameters()):
                self.personalized_params[name] = dict(model.named_parameters())[name].clone()
    
    def get_shared_params(self) -> Dict:
        """获取共享参数"""
        shared_params = {}
        for name, param in self.model.named_parameters():
            if name not in self.personalization_layers:
                shared_params[name] = param.clone()
        return shared_params
    
    def set_shared_params(self, shared_params: Dict):
        """设置共享参数"""
        for name, param in shared_params.items():
            if name in dict(self.model.named_parameters()):
                dict(self.model.named_parameters())[name].data.copy_(param)

4. 联邦学习在SHM中的应用

4.1 多桥梁协同损伤识别

问题描述:

多座桥梁各自拥有监测系统,需要协同训练损伤识别模型,但数据不能共享。

联邦学习方案:

class BridgeFederatedSystem:
    """
    多桥梁联邦学习系统
    
    多座桥梁协同训练损伤识别模型
    """
    
    def __init__(self, n_bridges: int = 5):
        self.n_bridges = n_bridges
        self.bridges = []
        
        # 创建每座桥梁的客户端
        for i in range(n_bridges):
            bridge_client = self._create_bridge_client(i)
            self.bridges.append(bridge_client)
    
    def _create_bridge_client(self, bridge_id: int) -> FederatedClient:
        """
        创建桥梁客户端
        
        每座桥梁有不同的结构特性和损伤模式
        """
        # 创建模型
        model = DamageDetectionCNN()
        
        # 生成本地数据(模拟不同桥梁的数据分布)
        train_data = self._generate_bridge_data(bridge_id, train=True)
        test_data = self._generate_bridge_data(bridge_id, train=False)
        
        client = FederatedClient(
            client_id=bridge_id,
            model=model,
            train_data=train_data,
            test_data=test_data,
            lr=0.001,
            epochs=3
        )
        
        return client
    
    def _generate_bridge_data(self, bridge_id: int, train: bool = True):
        """
        生成桥梁监测数据
        
        模拟不同桥梁的数据异构性
        """
        # 每座桥梁有不同的损伤特征分布
        np.random.seed(bridge_id)
        
        n_samples = 1000 if train else 200
        
        # 生成振动信号数据
        # 不同桥梁有不同的基频和模态
        base_freq = 2.0 + bridge_id * 0.5  # 基频差异
        
        data = []
        labels = []
        
        for _ in range(n_samples):
            # 生成信号
            t = np.linspace(0, 1, 1024)
            
            # 健康状态
            if np.random.random() > 0.3:
                signal = np.sin(2 * np.pi * base_freq * t)
                label = 0  # 健康
            else:
                # 损伤状态(添加谐波成分)
                signal = np.sin(2 * np.pi * base_freq * t) + \
                        0.3 * np.sin(2 * np.pi * base_freq * 2 * t)
                label = 1  # 损伤
            
            # 添加噪声
            signal += np.random.normal(0, 0.1, len(t))
            
            data.append(signal)
            labels.append(label)
        
        data = np.array(data, dtype=np.float32)
        labels = np.array(labels, dtype=np.int64)
        
        return torch.utils.data.TensorDataset(
            torch.from_numpy(data).unsqueeze(1),  # 添加通道维度
            torch.from_numpy(labels)
        )

4.2 传感器网络分布式异常检测

问题描述:

大型结构(如大坝、高层建筑)部署大量传感器,需要分布式异常检测。

联邦学习方案:

class SensorNetworkFL:
    """
    传感器网络联邦学习
    
    分布式传感器协同训练异常检测模型
    """
    
    def __init__(self, n_sensors: int = 20, sensor_groups: int = 4):
        self.n_sensors = n_sensors
        self.sensor_groups = sensor_groups
        
        # 将传感器分组(如按楼层或区域)
        self.groups = self._group_sensors()
    
    def _group_sensors(self) -> Dict[int, List[int]]:
        """传感器分组"""
        groups = {}
        sensors_per_group = self.n_sensors // self.sensor_groups
        
        for g in range(self.sensor_groups):
            start = g * sensors_per_group
            end = start + sensors_per_group
            groups[g] = list(range(start, end))
        
        return groups
    
    def hierarchical_federated_learning(self):
        """
        分层联邦学习
        
        组内聚合 -> 全局聚合
        """
        # 第一层:组内联邦学习
        group_models = {}
        for group_id, sensor_ids in self.groups.items():
            # 组内客户端
            group_clients = [self._create_sensor_client(sid) for sid in sensor_ids]
            
            # 组内服务器
            group_server = FederatedServer(
                global_model=AnomalyDetectionAutoencoder(),
                aggregation_method='fedavg'
            )
            
            # 组内训练
            group_history = run_fedavg(
                server=group_server,
                clients=group_clients,
                n_rounds=50
            )
            
            group_models[group_id] = group_server.global_model
        
        # 第二层:全局聚合
        global_server = FederatedServer(
            global_model=AnomalyDetectionAutoencoder(),
            aggregation_method='fedavg'
        )
        
        # 聚合各组模型
        group_weights = [model.state_dict() for model in group_models.values()]
        group_samples = [len(self.groups[g]) for g in self.groups]
        
        global_weights = global_server.aggregate(group_weights, group_samples)
        global_server.update_global_model(global_weights)
        
        return global_server, group_models

4.3 跨组织知识共享

问题描述:

不同机构(如桥梁管理局、大学、咨询公司)希望共享知识但保护数据隐私。

联邦学习方案:

class CrossOrganizationFL:
    """
    跨组织联邦学习
    
    不同机构间安全共享知识
    """
    
    def __init__(self, organizations: List[str]):
        self.organizations = organizations
        self.n_orgs = len(organizations)
        
        # 为每个组织创建客户端
        self.org_clients = {}
        for org in organizations:
            self.org_clients[org] = self._create_org_client(org)
    
    def _create_org_client(self, org_name: str) -> FederatedClient:
        """创建组织客户端"""
        # 不同组织有不同的数据特点
        if org_name == 'bridge_admin':
            # 桥梁管理局:大量运营数据
            data = self._load_admin_data()
        elif org_name == 'university':
            # 大学:实验数据和理论研究
            data = self._load_university_data()
        elif org_name == 'consultant':
            # 咨询公司:多项目经验数据
            data = self._load_consultant_data()
        else:
            data = self._load_default_data()
        
        model = StructuralHealthModel()
        
        return FederatedClient(
            client_id=org_name,
            model=model,
            train_data=data['train'],
            test_data=data['test'],
            lr=0.001,
            epochs=5
        )
    
    def secure_aggregation(self):
        """
        安全聚合
        
        使用差分隐私或安全多方计算
        """
        # 实现安全聚合逻辑
        pass

5. 隐私保护技术

5.1 差分隐私(Differential Privacy)

差分隐私通过添加噪声保护个体数据隐私。

定义:

一个随机算法 M M M满足 ( ϵ , δ ) (\epsilon, \delta) (ϵ,δ)-差分隐私,如果对于所有相邻数据集 D D D D ′ D' D,以及所有输出 S S S

P [ M ( D ) ∈ S ] ≤ e ϵ P [ M ( D ′ ) ∈ S ] + δ P[M(D) \in S] \leq e^\epsilon P[M(D') \in S] + \delta P[M(D)S]eϵP[M(D)S]+δ

Python实现:

class DPFederatedClient(FederatedClient):
    """
    差分隐私联邦学习客户端
    """
    
    def __init__(self, client_id: int, model: nn.Module, 
                 train_data: torch.utils.data.Dataset,
                 epsilon: float = 1.0, delta: float = 1e-5,
                 max_grad_norm: float = 1.0,
                 **kwargs):
        super().__init__(client_id, model, train_data, **kwargs)
        
        self.epsilon = epsilon
        self.delta = delta
        self.max_grad_norm = max_grad_norm
        
        # 计算噪声乘数
        self.noise_multiplier = self._compute_noise_multiplier()
    
    def _compute_noise_multiplier(self) -> float:
        """计算噪声乘数"""
        # 使用高斯机制
        # sigma = sqrt(2 * ln(1.25/delta)) / epsilon
        sigma = np.sqrt(2 * np.log(1.25 / self.delta)) / self.epsilon
        return sigma
    
    def local_train(self, global_weights: Optional[Dict] = None) -> Tuple[Dict, float, int]:
        """
        本地训练(带差分隐私)
        """
        if global_weights is not None:
            self.model.load_state_dict(global_weights)
        
        self.model.train()
        total_loss = 0
        
        for epoch in range(self.epochs):
            epoch_loss = 0
            for batch_idx, (data, target) in enumerate(self.train_loader):
                self.optimizer.zero_grad()
                
                output = self.model(data)
                loss = self.criterion(output, target)
                loss.backward()
                
                # 梯度裁剪
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(), 
                    self.max_grad_norm
                )
                
                # 添加高斯噪声
                with torch.no_grad():
                    for param in self.model.parameters():
                        if param.grad is not None:
                            noise = torch.normal(
                                mean=0, 
                                std=self.noise_multiplier * self.max_grad_norm,
                                size=param.grad.shape
                            )
                            param.grad += noise
                
                self.optimizer.step()
                epoch_loss += loss.item()
            
            avg_epoch_loss = epoch_loss / len(self.train_loader)
            self.train_history.append(avg_epoch_loss)
            total_loss += avg_epoch_loss
        
        avg_loss = total_loss / self.epochs
        n_samples = len(self.train_data)
        
        return copy.deepcopy(self.model.state_dict()), avg_loss, n_samples

5.2 安全多方计算(Secure Multi-Party Computation)

安全多方计算允许多方在不泄露输入的情况下共同计算函数。

核心思想:

  1. 秘密分享:将数据分割成多个份额
  2. 安全计算:在密文上执行计算
  3. 结果重构:组合计算结果

简单实现:

class SecureAggregation:
    """
    安全聚合
    
    使用秘密分享保护模型参数
    """
    
    def __init__(self, n_clients: int, threshold: int):
        self.n_clients = n_clients
        self.threshold = threshold  # 重构阈值
    
    def share_secret(self, value: float) -> List[float]:
        """
        秘密分享
        
        使用Shamir秘密分享
        """
        # 生成随机多项式系数
        coefficients = [value] + [np.random.random() for _ in range(self.threshold - 1)]
        
        # 生成份额
        shares = []
        for i in range(1, self.n_clients + 1):
            share = sum(coef * (i ** j) for j, coef in enumerate(coefficients))
            shares.append(share)
        
        return shares
    
    def reconstruct_secret(self, shares: List[float], indices: List[int]) -> float:
        """
        重构秘密
        
        使用拉格朗日插值
        """
        secret = 0
        for i, share in zip(indices, shares):
            # 计算拉格朗日基多项式
            li = 1
            for j in indices:
                if i != j:
                    li *= -j / (i - j)
            secret += share * li
        
        return secret

5.3 同态加密(Homomorphic Encryption)

同态加密允许在密文上直接进行计算。

class HomomorphicEncryption:
    """
    同态加密简化实现
    
    基于Paillier密码系统的简化版本
    """
    
    def __init__(self, key_size: int = 1024):
        self.key_size = key_size
        self._generate_keys()
    
    def _generate_keys(self):
        """生成密钥对"""
        # 选择两个大素数
        p = self._generate_prime()
        q = self._generate_prime()
        
        self.n = p * q
        self.n_squared = self.n ** 2
        
        # 公钥: (n, g)
        self.g = self.n + 1
        
        # 私钥: (lambda, mu)
        self.lambda_val = (p - 1) * (q - 1)
        self.mu = self._mod_inverse(self.lambda_val, self.n)
    
    def encrypt(self, plaintext: int) -> int:
        """加密"""
        # 选择随机数r
        r = np.random.randint(1, self.n)
        
        # c = g^m * r^n mod n^2
        ciphertext = (pow(self.g, plaintext, self.n_squared) * 
                     pow(r, self.n, self.n_squared)) % self.n_squared
        
        return ciphertext
    
    def decrypt(self, ciphertext: int) -> int:
        """解密"""
        # m = L(c^lambda mod n^2) * mu mod n
        # 其中 L(u) = (u - 1) / n
        u = pow(ciphertext, self.lambda_val, self.n_squared)
        l = (u - 1) // self.n
        plaintext = (l * self.mu) % self.n
        
        return plaintext
    
    def add_encrypted(self, c1: int, c2: int) -> int:
        """
        同态加法
        
        E(m1) * E(m2) = E(m1 + m2)
        """
        return (c1 * c2) % self.n_squared
    
    def multiply_constant(self, ciphertext: int, constant: int) -> int:
        """
        密文与常数相乘
        
        E(m)^k = E(m * k)
        """
        return pow(ciphertext, constant, self.n_squared)
    
    def _generate_prime(self) -> int:
        """生成素数(简化版)"""
        while True:
            num = np.random.randint(2**(self.key_size//2 - 1), 2**(self.key_size//2))
            if self._is_prime(num):
                return num
    
    def _is_prime(self, n: int, k: int = 10) -> bool:
        """Miller-Rabin素性测试"""
        if n < 2:
            return False
        if n == 2 or n == 3:
            return True
        if n % 2 == 0:
            return False
        
        r, d = 0, n - 1
        while d % 2 == 0:
            r += 1
            d //= 2
        
        for _ in range(k):
            a = np.random.randint(2, n - 1)
            x = pow(a, d, n)
            if x == 1 or x == n - 1:
                continue
            for _ in range(r - 1):
                x = pow(x, 2, n)
                if x == n - 1:
                    break
            else:
                return False
        return True
    
    def _mod_inverse(self, a: int, m: int) -> int:
        """计算模逆元"""
        def extended_gcd(a, b):
            if a == 0:
                return b, 0, 1
            gcd, x1, y1 = extended_gcd(b % a, a)
            x = y1 - (b // a) * x1
            y = x1
            return gcd, x, y
        
        _, x, _ = extended_gcd(a % m, m)
        return (x % m + m) % m

6. Python联邦学习仿真实现

6.1 仿真场景设计

本仿真实现以下场景:

场景1:多桥梁损伤识别

  • 5座桥梁,每座桥梁有不同的结构特性
  • 每座桥梁1000个训练样本
  • 目标:协同训练损伤识别模型

场景2:数据异构性分析

  • 对比IID和Non-IID数据分布
  • 分析不同聚合算法的性能

场景3:隐私保护效果评估

  • 差分隐私噪声对模型性能的影响
  • 隐私预算与模型精度的权衡

6.2 环境实现

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.patches import Rectangle, Circle, FancyBboxPatch, FancyArrowPatch
from matplotlib.colors import LinearSegmentedColormap
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict
import copy
import warnings
from typing import List, Dict, Tuple, Optional
import os

# 设置matplotlib后端
plt.switch_backend('Agg')
warnings.filterwarnings('ignore')

# 设置随机种子
np.random.seed(42)
torch.manual_seed(42)

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

print("=" * 80)
print("结构健康监测仿真 - 主题076: 结构健康监测中的联邦学习")
print("Federated Learning in Structural Health Monitoring")
print("=" * 80)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐