结构健康监测仿真-主题064-结构健康监测中的联邦学习技术
第六十四篇:结构健康监测中的联邦学习技术
摘要
随着结构健康监测系统的规模不断扩大,多源异构数据的融合与隐私保护成为关键挑战。联邦学习作为一种分布式机器学习范式,能够在保护数据隐私的前提下实现跨机构、跨地域的协同建模,为结构健康监测领域的数据共享与知识迁移提供了创新解决方案。本主题系统阐述联邦学习的理论基础、核心算法及其在结构健康监测中的应用,重点探讨横向联邦学习、纵向联邦学习与联邦迁移学习三种范式,分析联邦平均算法、差分隐私机制与安全聚合协议的技术实现。通过Python仿真实现多桥梁协同损伤识别场景,验证联邦学习在保护数据隐私的同时提升模型泛化能力的有效性,为构建分布式、隐私保护的结构健康监测智能系统提供理论指导与技术参考。
关键词
联邦学习;结构健康监测;隐私保护;分布式机器学习;横向联邦学习;纵向联邦学习;联邦迁移学习;差分隐私;安全聚合





1. 联邦学习基础理论
1.1 联邦学习的定义与起源
联邦学习(Federated Learning,FL)是一种分布式机器学习范式,由Google研究团队在2016年首次提出,旨在解决移动设备上的模型训练问题。与传统集中式机器学习不同,联邦学习允许多个参与方在本地训练模型,仅共享模型参数(而非原始数据),通过聚合服务器协调实现全局模型的协同优化。
联邦学习的核心思想可以用以下数学框架描述:
minwF(w)=∑k=1KnknFk(w)\min_{w} F(w) = \sum_{k=1}^{K} \frac{n_k}{n} F_k(w)wminF(w)=k=1∑KnnkFk(w)
其中,www表示模型参数,KKK为参与方数量,nkn_knk为第kkk个参与方的样本数,n=∑k=1Knkn = \sum_{k=1}^{K} n_kn=∑k=1Knk为总样本数,Fk(w)F_k(w)Fk(w)为第kkk个参与方的本地损失函数。
联邦学习的出现背景与结构健康监测领域的需求高度契合:
(1)数据孤岛问题:不同桥梁、不同管理单位的数据相互隔离,难以集中训练
(2)隐私保护需求:结构监测数据涉及基础设施安全信息,具有高度敏感性
(3)通信带宽限制:海量监测数据传输成本高昂,边缘网络带宽受限
(4)数据异构性:不同结构的监测数据分布差异显著,需要个性化建模
1.2 联邦学习的核心特征
联邦学习具有以下区别于传统机器学习的核心特征:
(1)数据本地化:原始数据始终保留在本地设备或机构,不上传到中央服务器。这一特征从根本上解决了数据隐私泄露风险,符合《数据安全法》《个人信息保护法》等法规要求。
(2)模型参数共享:参与方仅上传模型参数(如神经网络权重、梯度信息),而非原始数据。模型参数的信息量远低于原始数据,降低了隐私泄露风险。
(3)分布式协同优化:通过迭代式的本地训练与全局聚合,实现多方协同建模。每轮通信仅传输模型参数,大幅降低了通信开销。
(4)非独立同分布(Non-IID)数据:联邦学习场景下的数据通常呈现统计异质性,即不同参与方的数据分布Pk(x,y)P_k(x, y)Pk(x,y)存在显著差异。这一特征对算法设计提出了更高要求。
(5)系统异构性:参与方的计算能力、网络条件、可用时间存在差异,需要设计异步聚合策略与容错机制。
1.3 联邦学习与传统机器学习的对比
| 特性 | 传统集中式机器学习 | 联邦学习 |
|---|---|---|
| 数据存储 | 集中存储于数据中心 | 分布式存储于各参与方 |
| 隐私保护 | 依赖数据脱敏与访问控制 | 数据不出本地,天然隐私保护 |
| 通信开销 | 一次性传输全部数据 | 迭代传输模型参数 |
| 数据分布 | 假设独立同分布(IID) | 允许非独立同分布(Non-IID) |
| 模型训练 | 中心化训练 | 分布式协同训练 |
| 适用场景 | 数据可集中管理 | 数据分散、隐私敏感 |
在结构健康监测领域,联邦学习的优势尤为突出:不同桥梁管理单位可以在不共享原始监测数据的前提下,协同训练损伤识别模型,实现知识共享与模型性能提升。
2. 联邦学习的技术架构
2.1 系统架构组成
联邦学习系统通常由三类角色组成:
(1)参与方(Clients/Workers):拥有本地数据的实体,如桥梁监测系统的边缘节点、不同桥梁的管理单位。参与方负责本地模型训练,计算梯度或参数更新。
(2)聚合服务器(Aggregation Server):协调联邦学习过程的中央服务器,负责收集参与方的模型更新、执行聚合算法、分发全局模型。服务器不接触原始数据。
(3)可信第三方(可选):在部分安全协议中,引入可信第三方协助密钥分发、安全聚合等操作。
联邦学习的通信拓扑结构主要有三种:
中心化架构(Client-Server):所有参与方与中央服务器通信,这是最常用的架构。服务器维护全局模型,参与方定期上传本地更新。
去中心化架构(Peer-to-Peer):参与方之间直接通信,通过 gossip 协议实现模型同步。这种架构消除了单点故障风险,但通信复杂度较高。
分层架构(Hierarchical):引入边缘服务器作为中间层,形成"云-边-端"三级架构。边缘服务器聚合区域内参与方的更新,再与云端服务器通信。
2.2 联邦学习的基本流程
联邦学习的典型工作流程包含以下步骤:
步骤1:初始化
中央服务器初始化全局模型参数w0w_0w0,并广播给所有参与方。
步骤2:本地训练
每个参与方kkk基于本地数据集DkD_kDk进行多轮梯度下降:
wkt+1=wkt−η∇Fk(wkt;Dk)w_k^{t+1} = w_k^t - \eta \nabla F_k(w_k^t; D_k)wkt+1=wkt−η∇Fk(wkt;Dk)
其中,η\etaη为学习率,∇Fk\nabla F_k∇Fk为本地损失函数的梯度。
步骤3:参数上传
参与方将本地模型更新Δwk=wkt+1−wt\Delta w_k = w_k^{t+1} - w^tΔwk=wkt+1−wt上传至服务器。
步骤4:全局聚合
服务器执行聚合算法,更新全局模型:
wt+1=wt+∑k=1KnknΔwkw^{t+1} = w^t + \sum_{k=1}^{K} \frac{n_k}{n} \Delta w_kwt+1=wt+k=1∑KnnkΔwk
步骤5:模型分发
服务器将更新后的全局模型广播给参与方,进入下一轮迭代。
上述过程重复进行,直到全局模型收敛或达到预设的通信轮数。
2.3 联邦学习的分类
根据数据分布特征,联邦学习可分为三类:
(1)横向联邦学习(Horizontal Federated Learning,HFL)
适用于参与方数据特征重叠较多、样本空间不同的场景。在结构健康监测中,不同桥梁监测同类损伤模式(如裂缝、腐蚀),但监测数据来自不同结构,适合采用横向联邦学习。
数学描述:参与方kkk的数据集Dk={(xi,yi)}i=1nkD_k = \{(x_i, y_i)\}_{i=1}^{n_k}Dk={(xi,yi)}i=1nk,其中特征空间X\mathcal{X}X和标签空间Y\mathcal{Y}Y相同,但数据分布Pk(x,y)P_k(x, y)Pk(x,y)可能不同。
(2)纵向联邦学习(Vertical Federated Learning,VFL)
适用于参与方样本空间重叠较多、特征空间不同的场景。在结构健康监测中,同一座桥梁的不同监测系统(如振动监测、应变监测、视觉监测)可以协同建模,适合采用纵向联邦学习。
数学描述:参与方kkk拥有数据集DkD_kDk的部分特征,所有参与方的样本ID集合存在交集I=⋂k=1KIkI = \bigcap_{k=1}^{K} I_kI=⋂k=1KIk,通过安全对齐实现特征联合。
(3)联邦迁移学习(Federated Transfer Learning,FTL)
适用于参与方数据特征空间和样本空间均重叠较少的场景。在结构健康监测中,不同结构类型(如桥梁与建筑)之间进行知识迁移,适合采用联邦迁移学习。
数学描述:源域DSD_SDS和目标域DTD_TDT的数据分布不同,通过迁移学习技术实现知识从源域到目标域的迁移。
3. 联邦学习核心算法
3.1 联邦平均算法(FedAvg)
联邦平均算法(Federated Averaging,FedAvg)是联邦学习最基础的聚合算法,由McMahan等人在2017年提出。
算法原理:
FedAvg算法的核心思想是在本地执行多轮随机梯度下降(SGD),然后上传模型参数而非梯度,服务器执行加权平均:
wt+1=∑k=1Knknwkt+1w_{t+1} = \sum_{k=1}^{K} \frac{n_k}{n} w_k^{t+1}wt+1=k=1∑Knnkwkt+1
其中,wkt+1w_k^{t+1}wkt+1为参与方kkk经过EEE个本地epoch训练后的模型参数。
算法优势:
- 通信效率高:本地多轮训练减少了通信轮数
- 计算效率高:充分利用本地计算资源
- 实现简单:易于部署和扩展
算法局限性:
- Non-IID数据收敛慢:当数据分布差异大时,收敛速度显著下降
- 异构设备适应性差:难以处理设备掉线、计算能力差异等问题
- 公平性问题:数据量大的参与方对全局模型影响过大
3.2 联邦学习优化算法
针对FedAvg的局限性,研究者提出了多种改进算法:
(1)FedProx算法
FedProx在本地目标函数中引入近端项,限制本地模型与全局模型的偏离程度:
minwFk(w)+μ2∥w−wt∥2\min_{w} F_k(w) + \frac{\mu}{2} \|w - w^t\|^2wminFk(w)+2μ∥w−wt∥2
其中,μ\muμ为正则化系数,控制本地模型与全局模型的差异。该算法提高了Non-IID数据下的收敛稳定性。
(2)SCAFFOLD算法
SCAFFOLD(Stochastic Controlled Averaging for Federated Learning)引入控制变量(Control Variates)来校正本地更新的方向,减少客户端漂移(Client Drift)问题。
(3)FedOpt算法族
FedOpt将自适应优化器(如Adam、Adagrad)引入联邦学习,服务器端使用自适应聚合策略:
mt+1=β1mt+(1−β1)Δwtm_{t+1} = \beta_1 m_t + (1-\beta_1) \Delta w_tmt+1=β1mt+(1−β1)Δwt
vt+1=β2vt+(1−β2)(Δwt)2v_{t+1} = \beta_2 v_t + (1-\beta_2) (\Delta w_t)^2vt+1=β2vt+(1−β2)(Δwt)2
wt+1=wt−ηmt+1vt+1+ϵw_{t+1} = w_t - \eta \frac{m_{t+1}}{\sqrt{v_{t+1}} + \epsilon}wt+1=wt−ηvt+1+ϵmt+1
3.3 差分隐私机制
差分隐私(Differential Privacy,DP)是保护模型参数隐私的数学框架。
定义:随机算法MMM满足(ϵ,δ)(\epsilon, \delta)(ϵ,δ)-差分隐私,如果对于任意相邻数据集DDD和D′D'D′(仅相差一个样本),以及任意输出子集SSS:
P[M(D)∈S]≤eϵP[M(D′)∈S]+δP[M(D) \in S] \leq e^{\epsilon} P[M(D') \in S] + \deltaP[M(D)∈S]≤eϵP[M(D′)∈S]+δ
其中,ϵ\epsilonϵ为隐私预算,δ\deltaδ为失败概率。
在联邦学习中的应用:
- 本地差分隐私(LDP):参与方在本地训练时添加噪声
- 全局差分隐私(GDP):服务器在聚合后添加噪声
- 梯度裁剪与噪声注入:限制单个样本对梯度的影响
g~=gmax(1,∥g∥2C)+N(0,σ2C2I)\tilde{g} = \frac{g}{\max(1, \frac{\|g\|_2}{C})} + \mathcal{N}(0, \sigma^2 C^2 I)g~=max(1,C∥g∥2)g+N(0,σ2C2I)
其中,CCC为裁剪阈值,σ\sigmaσ为噪声尺度。
3.4 安全聚合协议
安全聚合(Secure Aggregation)协议确保服务器只能看到聚合后的模型更新,无法获取单个参与方的更新。
基于秘密共享的安全聚合:
- 每个参与方将模型更新拆分为KKK个秘密份额
- 参与方之间两两交换秘密份额
- 参与方将收到的所有份额相加,上传给服务器
- 服务器聚合所有参与方的份额和,得到全局更新
基于同态加密的安全聚合:
使用同态加密技术,服务器可以直接在密文上执行聚合操作,无需解密单个参与方的更新。
4. 联邦学习在结构健康监测中的应用
4.1 多桥梁协同损伤识别
应用场景:多个城市的路桥管理部门希望在保护各自监测数据隐私的前提下,协同训练损伤识别模型。
联邦学习方案:
- 数据分布:每个城市拥有多座桥梁的监测数据,数据特征相同(加速度、应变等),但损伤样本分布不同
- 学习范式:采用横向联邦学习
- 模型架构:卷积神经网络(CNN)或长短期记忆网络(LSTM)
- 隐私保护:结合差分隐私与安全聚合
技术优势:
- 各城市数据不出本地,保护敏感基础设施信息
- 利用多源数据提升模型泛化能力
- 适应不同桥梁类型的损伤特征
4.2 多源传感器数据融合
应用场景:同一座桥梁部署了多种传感器(振动、应变、视觉、温度),需要融合多源数据进行综合评估。
联邦学习方案:
- 数据分布:不同传感器系统拥有相同样本(时间戳对齐),但特征空间不同
- 学习范式:采用纵向联邦学习
- 对齐机制:基于安全多方计算实现样本ID对齐
- 模型架构:分裂学习(Split Learning)或联邦特征提取
技术优势:
- 各传感器系统独立维护数据
- 实现多源异构数据的深度融合
- 支持增量式传感器接入
4.3 跨结构类型知识迁移
应用场景:新建桥梁缺乏历史损伤数据,需要利用既有桥梁(如老旧桥梁、相似结构)的知识进行迁移学习。
联邦学习方案:
- 数据分布:源域(既有桥梁)与目标域(新建桥梁)的数据分布不同
- 学习范式:采用联邦迁移学习
- 迁移策略:领域自适应(Domain Adaptation)或元学习(Meta-Learning)
- 模型架构:共享特征提取器+个性化分类器
技术优势:
- 解决新建桥梁冷启动问题
- 实现跨结构类型的知识共享
- 支持模型个性化定制
4.4 边缘-云协同智能监测
应用场景:大规模传感器网络需要在边缘端进行实时推理,同时利用云端进行模型训练与更新。
联邦学习方案:
- 架构设计:分层联邦学习架构
- 边缘聚合:边缘网关聚合区域内传感器数据
- 云端聚合:云端服务器聚合各边缘网关的模型更新
- 模型下发:全局模型下发至边缘端进行推理
技术优势:
- 降低云端通信开销
- 支持边缘端实时决策
- 实现模型的持续学习与更新
5. 联邦学习性能评估指标
5.1 模型性能指标
(1)准确率(Accuracy)
Accuracy=TP+TNTP+TN+FP+FN\text{Accuracy} = \frac{TP + TN}{TP + TN + FP + FN}Accuracy=TP+TN+FP+FNTP+TN
(2)精确率(Precision)与召回率(Recall)
Precision=TPTP+FP,Recall=TPTP+FN\text{Precision} = \frac{TP}{TP + FP}, \quad \text{Recall} = \frac{TP}{TP + FN}Precision=TP+FPTP,Recall=TP+FNTP
(3)F1分数
F1=2⋅Precision⋅RecallPrecision+Recall\text{F1} = 2 \cdot \frac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}F1=2⋅Precision+RecallPrecision⋅Recall
(4)AUC-ROC:ROC曲线下面积,评估模型区分能力
5.2 通信效率指标
(1)通信轮数:达到目标精度所需的通信轮数
(2)通信开销:每轮通信传输的数据量
Communication Cost=Rounds×Model Size×Participants\text{Communication Cost} = \text{Rounds} \times \text{Model Size} \times \text{Participants}Communication Cost=Rounds×Model Size×Participants
(3)压缩率:模型压缩前后的数据量之比
5.3 隐私保护指标
(1)隐私预算ϵ\epsilonϵ:差分隐私参数,越小表示隐私保护越强
(2)成员推断攻击成功率:评估模型对成员推断攻击的抵抗能力
(3)模型反演攻击恢复率:评估从模型参数恢复原始数据的难度
5.4 公平性指标
(1)参与方公平性:不同参与方的模型性能差异
Fairness=1−std({Ak}k=1K)mean({Ak}k=1K)\text{Fairness} = 1 - \frac{\text{std}(\{A_k\}_{k=1}^K)}{\text{mean}(\{A_k\}_{k=1}^K)}Fairness=1−mean({Ak}k=1K)std({Ak}k=1K)
其中,AkA_kAk为参与方kkk的本地测试准确率。
(2)收敛公平性:各参与方收敛速度的一致性
6. Python仿真实现
6.1 仿真场景设计
本仿真实现一个多桥梁协同损伤识别的联邦学习场景:
场景设定:
- 5个桥梁管理单位(参与方),各管理2-4座桥梁
- 每座桥梁部署多个传感器采集振动数据
- 损伤类型包括:无损伤、轻微损伤、中度损伤、严重损伤
- 各参与方的数据分布存在差异(Non-IID)
仿真目标:
- 对比联邦学习与本地独立训练的性能差异
- 评估Non-IID数据对联邦学习的影响
- 验证差分隐私机制的隐私保护效果
- 可视化联邦学习训练过程
6.2 核心代码实现
# -*- coding: utf-8 -*-
"""
主题064:结构健康监测中的联邦学习技术
联邦学习多桥梁协同损伤识别仿真
"""
import matplotlib
matplotlib.use('Agg') # 非交互式后端
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch, Circle, Rectangle, FancyArrowPatch
from matplotlib.collections import LineCollection
import warnings
warnings.filterwarnings('ignore')
import os
from datetime import datetime
import json
# 创建输出目录
output_dir = r'd:\文档\500仿真领域\工程仿真\结构健康监测仿真\主题064\simulation_results'
os.makedirs(output_dir, exist_ok=True)
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
print("="*80)
print("结构健康监测联邦学习仿真")
print("="*80)
# ==============================================================================
# 步骤1:定义联邦学习基础组件
# ==============================================================================
print("\n【步骤1】初始化联邦学习组件...")
class FederatedClient:
"""联邦学习参与方(桥梁管理单位)"""
def __init__(self, client_id, num_bridges, data_distribution='iid'):
"""
初始化联邦学习参与方
参数:
client_id: 参与方ID
num_bridges: 管理的桥梁数量
data_distribution: 数据分布类型 ('iid' 或 'non-iid')
"""
self.client_id = client_id
self.num_bridges = num_bridges
self.data_distribution = data_distribution
# 本地数据集
self.X_train = None
self.y_train = None
self.X_test = None
self.y_test = None
# 本地模型参数
self.local_weights = None
self.local_bias = None
# 训练历史
self.train_history = {'loss': [], 'accuracy': []}
# 隐私预算(差分隐私)
self.epsilon = 1.0
self.noise_scale = 0.01
def generate_data(self, n_samples_per_bridge=500, n_features=10):
"""
生成模拟监测数据
参数:
n_samples_per_bridge: 每座桥梁的样本数
n_features: 特征维度
"""
np.random.seed(42 + self.client_id)
n_samples = self.num_bridges * n_samples_per_bridge
# 特征:模拟结构健康监测指标
# 包括:振动频率、振幅、应变、温度、湿度等
X = np.random.randn(n_samples, n_features)
# 根据数据分布类型生成标签
if self.data_distribution == 'iid':
# 独立同分布:各损伤类型均匀分布
y = np.random.randint(0, 4, n_samples) # 4类损伤状态
else:
# 非独立同分布:各参与方偏向特定损伤类型
dominant_class = self.client_id % 4
y = np.random.choice(4, n_samples,
p=[0.1, 0.1, 0.1, 0.7] if dominant_class == 3 else
[0.7, 0.1, 0.1, 0.1] if dominant_class == 0 else
[0.1, 0.7, 0.1, 0.1] if dominant_class == 1 else
[0.1, 0.1, 0.7, 0.1])
# 添加特征与标签的相关性
for i in range(n_samples):
X[i, 0] += y[i] * 0.5 # 特征0与损伤程度相关
X[i, 1] += y[i] * 0.3 # 特征1与损伤程度相关
X[i, 2] += np.random.randn() * (1 + y[i] * 0.2) # 噪声随损伤增加
# 划分训练集和测试集
split_idx = int(0.8 * n_samples)
self.X_train = X[:split_idx]
self.y_train = y[:split_idx]
self.X_test = X[split_idx:]
self.y_test = y[split_idx:]
# 初始化模型参数
self.local_weights = np.random.randn(n_features, 4) * 0.01
self.local_bias = np.zeros(4)
def local_train(self, global_weights, global_bias, epochs=5, lr=0.01):
"""
本地训练
参数:
global_weights: 全局模型权重
global_bias: 全局模型偏置
epochs: 本地训练轮数
lr: 学习率
"""
# 使用全局模型初始化本地模型
self.local_weights = global_weights.copy()
self.local_bias = global_bias.copy()
n_samples = len(self.X_train)
for epoch in range(epochs):
# 前向传播
logits = self.X_train @ self.local_weights + self.local_bias
# Softmax
exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
# 计算损失(交叉熵)
loss = -np.mean(np.log(probs[range(n_samples), self.y_train] + 1e-8))
# 反向传播
grad_probs = probs.copy()
grad_probs[range(n_samples), self.y_train] -= 1
grad_probs /= n_samples
grad_weights = self.X_train.T @ grad_probs
grad_bias = np.sum(grad_probs, axis=0)
# 添加差分隐私噪声
if self.epsilon < float('inf'):
noise_w = np.random.randn(*grad_weights.shape) * self.noise_scale
noise_b = np.random.randn(*grad_bias.shape) * self.noise_scale
grad_weights += noise_w
grad_bias += noise_b
# 更新参数
self.local_weights -= lr * grad_weights
self.local_bias -= lr * grad_bias
# 记录训练历史
if epoch % 2 == 0:
acc = self.evaluate()
self.train_history['loss'].append(loss)
self.train_history['accuracy'].append(acc)
return self.local_weights, self.local_bias
def evaluate(self):
"""评估本地模型性能"""
logits = self.X_test @ self.local_weights + self.local_bias
predictions = np.argmax(logits, axis=1)
accuracy = np.mean(predictions == self.y_test)
return accuracy
class FederatedServer:
"""联邦学习聚合服务器"""
def __init__(self, n_features, n_classes):
"""
初始化联邦学习服务器
参数:
n_features: 特征维度
n_classes: 类别数
"""
self.n_features = n_features
self.n_classes = n_classes
# 全局模型参数
self.global_weights = np.random.randn(n_features, n_classes) * 0.01
self.global_bias = np.zeros(n_classes)
# 训练历史
self.global_history = {'round': [], 'accuracy': []}
# 参与方信息
self.clients = []
def register_clients(self, clients):
"""注册参与方"""
self.clients = clients
def aggregate(self, client_updates):
"""
聚合参与方更新(FedAvg算法)
参数:
client_updates: 列表,每个元素为 (client_id, weights, bias, n_samples)
"""
total_samples = sum([update[3] for update in client_updates])
# 加权平均
new_weights = np.zeros_like(self.global_weights)
new_bias = np.zeros_like(self.global_bias)
for client_id, weights, bias, n_samples in client_updates:
weight = n_samples / total_samples
new_weights += weight * weights
new_bias += weight * bias
self.global_weights = new_weights
self.global_bias = new_bias
def evaluate_global_model(self, test_clients):
"""
评估全局模型性能
参数:
test_clients: 用于测试的参与方列表
"""
total_correct = 0
total_samples = 0
for client in test_clients:
logits = client.X_test @ self.global_weights + self.global_bias
predictions = np.argmax(logits, axis=1)
total_correct += np.sum(predictions == client.y_test)
total_samples += len(client.y_test)
accuracy = total_correct / total_samples
return accuracy
# ==============================================================================
# 步骤2:创建联邦学习场景
# ==============================================================================
print("\n【步骤2】创建联邦学习场景...")
# 创建5个参与方(桥梁管理单位)
n_clients = 5
clients = []
for i in range(n_clients):
num_bridges = np.random.randint(2, 5) # 每单位管理2-4座桥梁
# 前3个为IID,后2个为Non-IID
dist_type = 'iid' if i < 3 else 'non-iid'
client = FederatedClient(i, num_bridges, dist_type)
client.generate_data()
clients.append(client)
print(f" 参与方 {i}: {num_bridges}座桥梁, 样本数 {len(client.X_train)}, 分布类型: {dist_type}")
# 创建服务器
server = FederatedServer(n_features=10, n_classes=4)
server.register_clients(clients)
print(f" ✓ 已创建 {n_clients} 个参与方和 1 个聚合服务器")
# ==============================================================================
# 步骤3:运行联邦学习训练
# ==============================================================================
print("\n【步骤3】运行联邦学习训练...")
n_rounds = 20
participation_rate = 0.8 # 每轮参与的参与方比例
for round_idx in range(n_rounds):
# 选择本轮参与的参与方
n_participate = max(1, int(n_clients * participation_rate))
selected_clients = np.random.choice(clients, n_participate, replace=False)
client_updates = []
# 本地训练
for client in selected_clients:
weights, bias = client.local_train(
server.global_weights,
server.global_bias,
epochs=5,
lr=0.01
)
n_samples = len(client.X_train)
client_updates.append((client.client_id, weights, bias, n_samples))
# 服务器聚合
server.aggregate(client_updates)
# 评估全局模型
global_acc = server.evaluate_global_model(clients)
server.global_history['round'].append(round_idx + 1)
server.global_history['accuracy'].append(global_acc)
if (round_idx + 1) % 5 == 0:
print(f" 轮次 {round_idx+1}/{n_rounds}: 全局准确率 = {global_acc:.4f}")
print(f" ✓ 联邦学习训练完成,最终全局准确率: {global_acc:.4f}")
# ==============================================================================
# 步骤4:对比实验 - 本地独立训练
# ==============================================================================
print("\n【步骤4】对比实验 - 本地独立训练...")
local_histories = []
for client in clients:
# 独立训练(不使用联邦学习)
local_weights = np.random.randn(10, 4) * 0.01
local_bias = np.zeros(4)
history = {'round': [], 'accuracy': []}
for round_idx in range(n_rounds):
# 模拟多轮本地训练
for epoch in range(5):
logits = client.X_train @ local_weights + local_bias
exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
n_samples = len(client.X_train)
grad_probs = probs.copy()
grad_probs[range(n_samples), client.y_train] -= 1
grad_probs /= n_samples
grad_weights = client.X_train.T @ grad_probs
grad_bias = np.sum(grad_probs, axis=0)
local_weights -= 0.01 * grad_weights
local_bias -= 0.01 * grad_bias
# 评估
test_logits = client.X_test @ local_weights + local_bias
predictions = np.argmax(test_logits, axis=1)
acc = np.mean(predictions == client.y_test)
history['round'].append(round_idx + 1)
history['accuracy'].append(acc)
local_histories.append(history)
print(f" 参与方 {client.client_id} 本地训练准确率: {acc:.4f}")
print(" ✓ 本地独立训练对比完成")
# ==============================================================================
# 步骤5:生成可视化
# ==============================================================================
print("\n【步骤5】生成可视化...")
# 可视化1:联邦学习系统架构图
print("\n【可视化1】生成联邦学习系统架构图...")
fig, ax = plt.subplots(figsize=(14, 10))
ax.set_xlim(0, 14)
ax.set_ylim(0, 10)
ax.axis('off')
ax.set_title('Federated Learning System Architecture for SHM', fontsize=16, fontweight='bold', pad=20)
# 绘制中央服务器
server_box = FancyBboxPatch((5.5, 8), 3, 1.2, boxstyle="round,pad=0.1",
facecolor='#3498db', edgecolor='#2980b9', linewidth=2)
ax.add_patch(server_box)
ax.text(7, 8.6, 'Aggregation Server', ha='center', va='center', fontsize=12, fontweight='bold', color='white')
ax.text(7, 8.2, 'Global Model', ha='center', va='center', fontsize=9, color='white')
# 绘制参与方
client_positions = [(1.5, 4), (4.5, 4), (7.5, 4), (10.5, 4), (1.5, 1)]
client_colors = ['#e74c3c', '#e67e22', '#f39c12', '#27ae60', '#9b59b6']
client_names = ['Client 1\n(3 Bridges)', 'Client 2\n(2 Bridges)', 'Client 3\n(4 Bridges)',
'Client 4\n(2 Bridges)', 'Client 5\n(3 Bridges)']
for i, ((x, y), color, name) in enumerate(zip(client_positions, client_colors, client_names)):
# 参与方框
client_box = FancyBboxPatch((x-0.8, y-0.6), 1.6, 1.2, boxstyle="round,pad=0.05",
facecolor=color, edgecolor='black', linewidth=1.5, alpha=0.8)
ax.add_patch(client_box)
ax.text(x, y, name, ha='center', va='center', fontsize=9, fontweight='bold', color='white')
# 绘制到服务器的双向箭头
if y > 2: # 上方的参与方
arrow_up = FancyArrowPatch((x, y+0.6), (x*0.3 + 7*0.7, 8),
arrowstyle='->', mutation_scale=15,
color='#2c3e50', linewidth=1.5, linestyle='--')
arrow_down = FancyArrowPatch((x*0.3 + 7*0.7, 8), (x, y+0.6),
arrowstyle='->', mutation_scale=15,
color='#2c3e50', linewidth=1.5)
else: # 下方的参与方
arrow_up = FancyArrowPatch((x, y+0.6), (x*0.5 + 7*0.5, 8),
arrowstyle='->', mutation_scale=15,
color='#2c3e50', linewidth=1.5, linestyle='--')
arrow_down = FancyArrowPatch((x*0.5 + 7*0.5, 8), (x, y+0.6),
arrowstyle='->', mutation_scale=15,
color='#2c3e50', linewidth=1.5)
ax.add_patch(arrow_up)
ax.add_patch(arrow_down)
# 添加图例说明
legend_y = 6.5
ax.text(7, legend_y, 'Upload: Local Model Update', ha='center', va='center', fontsize=9, style='italic')
ax.text(7, legend_y-0.3, 'Download: Global Model', ha='center', va='center', fontsize=9, style='italic')
# 添加数据隐私标识
for i, (x, y) in enumerate(client_positions):
privacy_circle = Circle((x-0.5, y-0.3), 0.15, facecolor='#2ecc71', edgecolor='black', linewidth=1)
ax.add_patch(privacy_circle)
ax.text(x-0.5, y-0.3, 'P', ha='center', va='center', fontsize=7, fontweight='bold', color='white')
ax.text(0.5, 0.5, 'P: Privacy Protected (Data Local)', fontsize=8, style='italic')
plt.tight_layout()
plt.savefig(f'{output_dir}/federated_learning_architecture.png', dpi=150, bbox_inches='tight')
plt.close()
print(" ✓ 系统架构图已保存")
# 可视化2:训练过程对比
print("\n【可视化2】生成训练过程对比图...")
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# 左图:联邦学习全局模型性能
ax1 = axes[0]
ax1.plot(server.global_history['round'], server.global_history['accuracy'],
'b-', linewidth=2, marker='o', markersize=4, label='Federated Learning')
ax1.set_xlabel('Communication Round', fontsize=11)
ax1.set_ylabel('Global Accuracy', fontsize=11)
ax1.set_title('Federated Learning Performance', fontsize=12, fontweight='bold')
ax1.grid(True, alpha=0.3)
ax1.legend(fontsize=10)
ax1.set_ylim(0, 1)
# 右图:各参与方本地训练性能
ax2 = axes[1]
colors = ['#e74c3c', '#e67e22', '#f39c12', '#27ae60', '#9b59b6']
for i, (history, color) in enumerate(zip(local_histories, colors)):
label = f'Client {i+1}'
if i >= 3:
label += ' (Non-IID)'
ax2.plot(history['round'], history['accuracy'],
color=color, linewidth=1.5, alpha=0.7, label=label)
ax2.set_xlabel('Training Round', fontsize=11)
ax2.set_ylabel('Local Accuracy', fontsize=11)
ax2.set_title('Local Training Performance (Independent)', fontsize=12, fontweight='bold')
ax2.grid(True, alpha=0.3)
ax2.legend(fontsize=9, loc='lower right')
ax2.set_ylim(0, 1)
plt.tight_layout()
plt.savefig(f'{output_dir}/training_comparison.png', dpi=150, bbox_inches='tight')
plt.close()
print(" ✓ 训练过程对比图已保存")
# 可视化3:联邦学习vs本地训练性能对比
print("\n【可视化3】生成性能对比图...")
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# 计算最终性能
fl_final_acc = server.global_history['accuracy'][-1]
local_final_accs = [h['accuracy'][-1] for h in local_histories]
# 左图:准确率对比
ax1 = axes[0]
methods = ['Federated\nLearning'] + [f'Client {i+1}\nLocal' for i in range(n_clients)]
accuracies = [fl_final_acc] + local_final_accs
colors_bar = ['#3498db'] + colors
bars = ax1.bar(range(len(methods)), accuracies, color=colors_bar, edgecolor='black', alpha=0.8)
ax1.set_ylabel('Accuracy', fontsize=11)
ax1.set_title('Final Accuracy Comparison', fontsize=12, fontweight='bold')
ax1.set_xticks(range(len(methods)))
ax1.set_xticklabels(methods, fontsize=9)
ax1.set_ylim(0, 1)
ax1.grid(True, alpha=0.3, axis='y')
# 在柱状图上添加数值
for bar, acc in zip(bars, accuracies):
height = bar.get_height()
ax1.text(bar.get_x() + bar.get_width()/2., height + 0.02,
f'{acc:.3f}', ha='center', va='bottom', fontsize=8)
# 中图:数据分布可视化(t-SNE风格)
ax2 = axes[1]
# 合并所有数据用于可视化
all_X = []
all_y = []
all_client = []
for client in clients:
all_X.append(client.X_test[:100]) # 取部分样本
all_y.append(client.y_test[:100])
all_client.extend([client.client_id] * 100)
all_X = np.vstack(all_X)
all_y = np.concatenate(all_y)
# 使用PCA降维到2D
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
X_2d = pca.fit_transform(all_X)
# 按参与方着色
scatter = ax2.scatter(X_2d[:, 0], X_2d[:, 1], c=all_client, cmap='tab10',
alpha=0.6, s=30, edgecolors='black', linewidth=0.5)
ax2.set_xlabel('First Principal Component', fontsize=11)
ax2.set_ylabel('Second Principal Component', fontsize=11)
ax2.set_title('Data Distribution by Client (PCA)', fontsize=12, fontweight='bold')
cbar = plt.colorbar(scatter, ax=ax2)
cbar.set_label('Client ID', fontsize=10)
# 右图:Non-IID影响分析
ax3 = axes[2]
iid_clients = [i for i, c in enumerate(clients) if c.data_distribution == 'iid']
non_iid_clients = [i for i, c in enumerate(clients) if c.data_distribution == 'non-iid']
iid_accs = [local_final_accs[i] for i in iid_clients]
non_iid_accs = [local_final_accs[i] for i in non_iid_clients]
x_pos = np.arange(2)
mean_accs = [np.mean(iid_accs), np.mean(non_iid_accs)]
std_accs = [np.std(iid_accs), np.std(non_iid_accs)]
bars = ax3.bar(x_pos, mean_accs, yerr=std_accs, capsize=5,
color=['#3498db', '#e74c3c'], edgecolor='black', alpha=0.8)
ax3.set_ylabel('Average Local Accuracy', fontsize=11)
ax3.set_title('Non-IID Data Impact', fontsize=12, fontweight='bold')
ax3.set_xticks(x_pos)
ax3.set_xticklabels(['IID Clients', 'Non-IID Clients'], fontsize=10)
ax3.set_ylim(0, 1)
ax3.grid(True, alpha=0.3, axis='y')
# 添加数值标签
for bar, mean_val in zip(bars, mean_accs):
ax3.text(bar.get_x() + bar.get_width()/2., bar.get_height() + 0.03,
f'{mean_val:.3f}', ha='center', va='bottom', fontsize=10, fontweight='bold')
plt.tight_layout()
plt.savefig(f'{output_dir}/performance_analysis.png', dpi=150, bbox_inches='tight')
plt.close()
print(" ✓ 性能分析图已保存")
# 可视化4:联邦学习算法流程
print("\n【可视化4】生成联邦学习算法流程图...")
fig, ax = plt.subplots(figsize=(12, 10))
ax.set_xlim(0, 12)
ax.set_ylim(0, 10)
ax.axis('off')
ax.set_title('Federated Learning Algorithm Flow (FedAvg)', fontsize=16, fontweight='bold', pad=20)
# 流程框
flow_steps = [
(6, 9, 'Initialize Global Model\n$w_0$'),
(2, 7, 'Local Training\n$w_k = w^t - \\eta \\nabla F_k$'),
(6, 7, 'Local Training\n$w_k = w^t - \\eta \\nabla F_k$'),
(10, 7, 'Local Training\n$w_k = w^t - \\eta \\nabla F_k$'),
(6, 5, 'Upload Updates\n$\\Delta w_k = w_k - w^t$'),
(6, 3, 'Server Aggregation\n$w^{t+1} = \\sum_k \\frac{n_k}{n} w_k$'),
(6, 1, 'Broadcast Model\n$w^{t+1} \\rightarrow$ Clients'),
]
box_colors = ['#3498db', '#e74c3c', '#e74c3c', '#e74c3c', '#f39c12', '#9b59b6', '#27ae60']
for i, (x, y, text) in enumerate(flow_steps):
width = 2.5 if i == 0 or i >= 4 else 2.2
height = 1.2
box = FancyBboxPatch((x-width/2, y-height/2), width, height,
boxstyle="round,pad=0.1",
facecolor=box_colors[i], edgecolor='black',
linewidth=2, alpha=0.8)
ax.add_patch(box)
ax.text(x, y, text, ha='center', va='center', fontsize=9,
fontweight='bold', color='white')
# 绘制箭头
arrows = [
((6, 8.4), (6, 7.6)), # 初始化到中间本地训练
((6, 8.4), (2, 7.6)), # 初始化到左侧本地训练
((6, 8.4), (10, 7.6)), # 初始化到右侧本地训练
((2, 6.4), (6, 5.6)), # 左侧到上传
((6, 6.4), (6, 5.6)), # 中间到上传
((10, 6.4), (6, 5.6)), # 右侧到上传
((6, 4.4), (6, 3.6)), # 上传到聚合
((6, 2.4), (6, 1.6)), # 聚合到广播
((6, 0.4), (6, -0.2), 'Repeat'), # 广播回初始化
]
for arrow in arrows:
if len(arrow) == 3:
start, end, label = arrow
ax.annotate('', xy=end, xytext=start,
arrowprops=dict(arrowstyle='->', color='#2c3e50', lw=2))
ax.text((start[0]+end[0])/2 + 0.5, (start[1]+end[1])/2, label,
fontsize=9, style='italic', color='#2c3e50')
else:
start, end = arrow
ax.annotate('', xy=end, xytext=start,
arrowprops=dict(arrowstyle='->', color='#2c3e50', lw=2))
# 添加说明
ax.text(0.5, 5, 'Step 1: Broadcast', fontsize=10, rotation=90,
va='center', color='#3498db', fontweight='bold')
ax.text(3.5, 7, 'Step 2: Local Update', fontsize=10,
ha='center', color='#e74c3c', fontweight='bold')
ax.text(6, 4.5, 'Step 3: Upload', fontsize=10,
ha='center', color='#f39c12', fontweight='bold')
ax.text(8.5, 3, 'Step 4: Aggregate', fontsize=10,
ha='center', color='#9b59b6', fontweight='bold')
plt.tight_layout()
plt.savefig(f'{output_dir}/federated_algorithm_flow.png', dpi=150, bbox_inches='tight')
plt.close()
print(" ✓ 算法流程图已保存")
# 可视化5:联邦学习训练动画
print("\n【可视化5】生成联邦学习训练动画...")
from matplotlib.animation import FuncAnimation
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 准备动画数据
n_frames = min(20, n_rounds)
frame_indices = np.linspace(0, n_rounds-1, n_frames, dtype=int)
# 左图:全局模型准确率
ax1 = axes[0]
line1, = ax1.plot([], [], 'b-', linewidth=2, marker='o', markersize=6)
ax1.set_xlim(0, n_rounds)
ax1.set_ylim(0, 1)
ax1.set_xlabel('Communication Round', fontsize=11)
ax1.set_ylabel('Global Accuracy', fontsize=11)
ax1.set_title('Federated Learning Convergence', fontsize=12, fontweight='bold')
ax1.grid(True, alpha=0.3)
# 右图:各参与方本地性能
ax2 = axes[1]
lines = []
for i in range(n_clients):
line, = ax2.plot([], [], color=colors[i], linewidth=2, marker='o',
markersize=4, label=f'Client {i+1}')
lines.append(line)
ax2.set_xlim(0, n_rounds)
ax2.set_ylim(0, 1)
ax2.set_xlabel('Training Round', fontsize=11)
ax2.set_ylabel('Local Accuracy', fontsize=11)
ax2.set_title('Local Model Performance', fontsize=12, fontweight='bold')
ax2.grid(True, alpha=0.3)
ax2.legend(fontsize=9, loc='lower right')
def init():
line1.set_data([], [])
for line in lines:
line.set_data([], [])
return [line1] + lines
def update(frame):
idx = frame_indices[frame]
# 更新全局模型曲线
x_data = server.global_history['round'][:idx+1]
y_data = server.global_history['accuracy'][:idx+1]
line1.set_data(x_data, y_data)
# 更新各参与方曲线
for i, (line, history) in enumerate(zip(lines, local_histories)):
x_data = history['round'][:idx+1]
y_data = history['accuracy'][:idx+1]
line.set_data(x_data, y_data)
return [line1] + lines
anim = FuncAnimation(fig, update, frames=n_frames, init_func=init,
blit=True, interval=300, repeat=True)
anim.save(f'{output_dir}/federated_learning_animation.gif',
writer='pillow', fps=3, dpi=100)
plt.close()
print(" ✓ 训练动画已保存")
# ==============================================================================
# 步骤6:生成最终报告
# ==============================================================================
print("\n" + "="*80)
print("仿真完成!最终报告")
print("="*80)
print("\n【系统配置】")
print(f" 参与方数量: {n_clients}")
print(f" 通信轮数: {n_rounds}")
print(f" 每轮参与比例: {participation_rate*100:.0f}%")
print(f" 特征维度: 10")
print(f" 损伤类别: 4")
print("\n【性能对比】")
print(f" 联邦学习最终准确率: {fl_final_acc:.4f}")
print(f" 本地训练平均准确率: {np.mean(local_final_accs):.4f}")
print(f" 性能提升: {(fl_final_acc - np.mean(local_final_accs))*100:.2f}%")
print("\n【各参与方详情】")
for i, (client, local_acc) in enumerate(zip(clients, local_final_accs)):
dist_label = "IID" if client.data_distribution == 'iid' else "Non-IID"
print(f" 参与方 {i}: {client.num_bridges}座桥梁 | {dist_label} | "
f"样本数 {len(client.X_train)} | 本地准确率 {local_acc:.4f}")
print("\n【生成文件】")
print(f" 输出目录: {output_dir}")
print(" 1. federated_learning_architecture.png - 联邦学习系统架构")
print(" 2. training_comparison.png - 训练过程对比")
print(" 3. performance_analysis.png - 性能分析")
print(" 4. federated_algorithm_flow.png - 算法流程")
print(" 5. federated_learning_animation.gif - 训练动画")
print("\n" + "="*80)
print("联邦学习SHM仿真完成!")
print("="*80)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)