Topic 035: Machine Learning Acceleration of Electromagnetic Field Simulation
Introduction
Traditional electromagnetic simulation methods (such as the finite element method, finite difference method, and method of moments) are accurate but computationally expensive, especially in scenarios that require many repeated solves (parameter sweeps, design optimization, real-time simulation). In recent years, machine learning has offered new ways to accelerate electromagnetic field simulation.
This tutorial introduces three mainstream machine learning acceleration methods:
- Neural network surrogate models: replace the traditional numerical solver with a neural network
- Physics-informed neural networks (PINN): embed the governing physics in the network's loss function
- Reduced-order models (ROM): reduce the problem dimension via proper orthogonal decomposition
By the end of this tutorial you will know how to speed up electromagnetic simulation by a factor of 10-1000 while keeping acceptable accuracy.



1. Overview of Machine Learning Acceleration
1.1 Why Acceleration Is Needed
Pain points of traditional simulation:
- Long compute times: a complex 3D problem can take hours or even days
- Expensive parameter sweeps: design optimization may require thousands of simulations
- No real-time capability: cannot support real-time control or digital twins
Typical application scenarios:
| Scenario | Traditional Method | Required Speed |
|---|---|---|
| Motor design optimization | 1000 runs × 2 h = 83 days | Real-time response |
| Antenna array optimization | 500 runs × 1 h = 21 days | Minutes |
| Real-time EM monitoring | Not feasible | Milliseconds |
1.2 How Machine Learning Acceleration Works
Core idea:
Offline phase (run once):
high-fidelity simulation → generate training data → train the ML model
Online phase (reused repeatedly):
new parameters → ML model prediction → results in milliseconds
Where the speedup comes from:
- Neural network inference: O(10⁻³) s
- Traditional FEM solve: O(10²) s
- Theoretical speedup: 10⁵×
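The offline/online split above can be sketched end to end with a toy example. Everything here is illustrative: the "high-fidelity solver" is a cheap analytic stand-in, and a simple polynomial fit plays the role of the ML surrogate.

```python
import numpy as np

# Stand-in for an expensive high-fidelity solver (illustrative only)
def high_fidelity_solver(p):
    return np.sin(3 * p) + 0.5 * p**2

# --- Offline phase (run once): sample parameters, train the surrogate ---
params = np.linspace(0.0, 2.0, 50)
solutions = np.array([high_fidelity_solver(p) for p in params])
coeffs = np.polyfit(params, solutions, deg=8)  # polynomial "surrogate model"

# --- Online phase (reused): evaluate the surrogate at new parameters ---
def surrogate(p):
    return np.polyval(coeffs, p)

# Inside the sampled range, the surrogate reproduces the solver closely
error = abs(surrogate(1.234) - high_fidelity_solver(1.234))
```

The same two-phase structure carries over unchanged when the stand-in solver becomes an FEM code and the polynomial becomes a neural network.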
1.3 Method Comparison
| Method | Training Cost | Inference Speed | Accuracy | Physical Consistency |
|---|---|---|---|---|
| NN surrogate | High | Very fast | Medium | Not guaranteed |
| PINN | Medium | Fast | Medium-high | Enforced |
| ROM | Low | Very fast | High | Enforced |
2. Neural Network Surrogate Models
2.1 Basic Principle
Surrogate model concept:
Approximate the traditional solver $u(\mathbf{x})$ with a neural network $f_\theta(\mathbf{x})$:

$$\min_\theta \mathbb{E}_{\mathbf{x} \sim \mathcal{D}} \left[ \|f_\theta(\mathbf{x}) - u(\mathbf{x})\|^2 \right]$$

Choosing a network architecture:
- MLP (multilayer perceptron): simple problems, few parameters
- CNN (convolutional network): regular grids, spatial features
- GNN (graph network): unstructured meshes
2.2 Data Generation Strategy
Sampling methods:
# Latin hypercube sampling (LHS): uniform coverage of the parameter space
from scipy.stats import qmc
sampler = qmc.LatinHypercube(d=n_parameters)
sample = sampler.random(n=n_samples)
# Map samples to the actual parameter ranges
params = qmc.scale(sample, l_bounds, u_bounds)
Data augmentation:
- Exploit symmetry (rotation, translation)
- Parameter perturbation
- Multi-fidelity data fusion
2.3 Network Training
Loss functions:
# Mean squared error (MSE)
loss = torch.mean((prediction - target)**2)
# Relative error (often better suited to physical quantities)
loss = torch.mean(((prediction - target) / target)**2)
# Physics-constrained loss
pde_residual = laplacian(prediction) - source
loss = mse_loss + lambda_pde * torch.mean(pde_residual**2)
Training tips:
- Normalization: standardize inputs and outputs to [-1, 1]
- Early stopping: stop when the validation loss stops decreasing
- Learning-rate decay: lower the learning rate every 100 epochs
- Gradient clipping: prevents exploding gradients
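The four tips can be seen working together on a deliberately tiny regression problem. The data, model, and hyperparameters below are illustrative stand-ins, not part of this tutorial's solver:

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.uniform(-5, 5, size=(200, 1))
y = 3.0 * X + 1.0 + 0.01 * rng.normal(size=(200, 1))

# Tip 1: normalize inputs and targets to a comparable scale
X_mu, X_sd = X.mean(0), X.std(0)
y_mu, y_sd = y.mean(0), y.std(0)
Xn, yn = (X - X_mu) / X_sd, (y - y_mu) / y_sd
Xtr, ytr, Xva, yva = Xn[:160], yn[:160], Xn[160:], yn[160:]

w, b = 0.0, 0.0
lr, best_val, patience, wait = 0.1, np.inf, 20, 0
for epoch in range(1000):
    pred = Xtr * w + b
    grad_w = 2 * np.mean((pred - ytr) * Xtr)
    grad_b = 2 * np.mean(pred - ytr)
    # Tip 4: gradient clipping guards against exploding updates
    grad_w = np.clip(grad_w, -1.0, 1.0)
    grad_b = np.clip(grad_b, -1.0, 1.0)
    w -= lr * grad_w
    b -= lr * grad_b
    # Tip 3: decay the learning rate every 100 epochs
    if epoch % 100 == 99:
        lr *= 0.5
    # Tip 2: early stopping on the validation loss
    # (a full implementation would also keep the best weights seen so far)
    val = np.mean((Xva * w + b - yva) ** 2)
    if val < best_val - 1e-9:
        best_val, wait = val, 0
    else:
        wait += 1
        if wait >= patience:
            break
```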
2.4 Implementation Example
class NeuralNetworkSurrogate:
    def __init__(self, input_dim=3, hidden_dim=128, output_dim=1):
        # Network structure: input -> hidden -> hidden -> output
        self.weights = []
        self.biases = []
        layer_dims = [input_dim, hidden_dim, hidden_dim, output_dim]
        for i in range(len(layer_dims) - 1):
            # He initialization (variance 2/fan_in, suited to ReLU layers)
            W = np.random.randn(layer_dims[i], layer_dims[i+1]) * \
                np.sqrt(2.0 / layer_dims[i])
            b = np.zeros(layer_dims[i+1])
            self.weights.append(W)
            self.biases.append(b)

    def forward(self, x):
        for i, (W, b) in enumerate(zip(self.weights, self.biases)):
            x = x @ W + b
            if i < len(self.weights) - 1:
                x = np.maximum(0, x)  # ReLU
        return x

    def train(self, X_train, y_train, epochs=1000, lr=0.001):
        for epoch in range(epochs):
            # Forward pass
            y_pred = self.forward(X_train)
            loss = np.mean((y_pred - y_train)**2)
            # Backward pass (gradients dW, db; full listing in the appendix code)
            # ... gradient computation ...
            # Parameter update
            for j in range(len(self.weights)):
                self.weights[j] -= lr * dW[j]
                self.biases[j] -= lr * db[j]
3. Physics-Informed Neural Networks (PINN)
3.1 Core Idea of PINN
Embedding physical constraints:
The governing equation is added to the loss function as a soft constraint:

$$\mathcal{L} = \mathcal{L}_{data} + \lambda_{PDE} \mathcal{L}_{PDE} + \lambda_{BC} \mathcal{L}_{BC}$$

where:
- $\mathcal{L}_{data}$: data-fitting loss
- $\mathcal{L}_{PDE}$: PDE residual loss
- $\mathcal{L}_{BC}$: boundary condition loss
3.2 Automatic Differentiation
Key advantage:
Automatic differentiation computes higher-order derivatives of the network exactly:
import torch
x = torch.tensor([1.0], requires_grad=True)
y = torch.sin(x)
# First derivative
dy_dx = torch.autograd.grad(y, x, create_graph=True)[0]
# Second derivative
d2y_dx2 = torch.autograd.grad(dy_dx, x)[0]
3.3 PINN for Electromagnetic Fields
PINN for the Poisson equation:
class PoissonPINN:
    def __init__(self):
        self.net = torch.nn.Sequential(
            torch.nn.Linear(2, 64),
            torch.nn.Tanh(),
            torch.nn.Linear(64, 64),
            torch.nn.Tanh(),
            torch.nn.Linear(64, 1)
        )

    def forward(self, x, y):
        return self.net(torch.cat([x, y], dim=1))

    def pde_residual(self, x, y, source):
        phi = self.forward(x, y)
        # Second derivatives via autograd
        phi_x = torch.autograd.grad(phi, x, grad_outputs=torch.ones_like(phi),
                                    create_graph=True)[0]
        phi_xx = torch.autograd.grad(phi_x, x, grad_outputs=torch.ones_like(phi_x),
                                     create_graph=True)[0]
        phi_y = torch.autograd.grad(phi, y, grad_outputs=torch.ones_like(phi),
                                    create_graph=True)[0]
        phi_yy = torch.autograd.grad(phi_y, y, grad_outputs=torch.ones_like(phi_y),
                                     create_graph=True)[0]
        # Laplacian
        laplacian = phi_xx + phi_yy
        # PDE residual: ∇²φ - f = 0
        residual = laplacian - source
        return residual

    def loss_function(self, x_pde, y_pde, source, x_bc, y_bc, phi_bc):
        # PDE loss
        res = self.pde_residual(x_pde, y_pde, source)
        loss_pde = torch.mean(res**2)
        # Boundary condition loss
        phi_pred = self.forward(x_bc, y_bc)
        loss_bc = torch.mean((phi_pred - phi_bc)**2)
        # Total loss
        loss = loss_pde + 10.0 * loss_bc
        return loss
3.4 Training Strategy
Collocation point sampling:
# Uniform collocation points
x = np.linspace(0, 1, 100)
y = np.linspace(0, 1, 100)
X, Y = np.meshgrid(x, y)
# Adaptive collocation (refine where the residual is large)
residuals = np.abs(pinn.pde_residual(X, Y))
high_residual_mask = residuals > threshold
X_refined = X[high_residual_mask]
Y_refined = Y[high_residual_mask]
Loss weight scheduling:
# Dynamic weights
lambda_pde = 1.0
lambda_bc = 10.0
# Adjust by training stage
if epoch < 1000:
    lambda_bc = 100.0  # Emphasize boundary conditions early on
else:
    lambda_bc = 10.0
4. Reduced-Order Models (ROM)
4.1 Proper Orthogonal Decomposition (POD)
Mathematical principle:
The dominant modes are extracted via a singular value decomposition:

$$\mathbf{S} = \mathbf{U} \mathbf{\Sigma} \mathbf{V}^T$$

where:
- $\mathbf{S}$: snapshot matrix (n_points × n_snapshots)
- $\mathbf{U}$: POD modes (n_points × n_modes)
- $\mathbf{\Sigma}$: singular values (energy distribution)
Truncation criterion:
# Keep 99% of the energy
cumulative_energy = np.cumsum(singular_values**2) / np.sum(singular_values**2)
n_modes = np.where(cumulative_energy > 0.99)[0][0] + 1
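As a quick sanity check of the truncation criterion, the snippet below builds a synthetic snapshot matrix dominated by three spatial modes plus a little noise, and recovers a small mode count from the singular value spectrum (all names and data here are illustrative):

```python
import numpy as np

rng = np.random.default_rng(1)
# Synthetic snapshots: 3 dominant spatial modes plus small noise
x = np.linspace(0, 1, 200)
modes_true = np.stack([np.sin(np.pi * k * x) for k in (1, 2, 3)])
weights = rng.normal(size=(3, 40))
snapshots = modes_true.T @ weights + 1e-6 * rng.normal(size=(200, 40))

# SVD of the (n_points x n_snapshots) matrix, then energy-based truncation
U, S, Vt = np.linalg.svd(snapshots, full_matrices=False)
cumulative_energy = np.cumsum(S**2) / np.sum(S**2)
n_modes = int(np.argmax(cumulative_energy > 0.99)) + 1  # first index above 99%
```

With three independent underlying modes, the 99% criterion retains only a handful of singular vectors even though 40 snapshots were supplied.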
4.2 ROM Workflow
Offline phase:
# 1. Generate snapshots
snapshots = []
for param in parameter_samples:
    solution = high_fidelity_solver(param)
    snapshots.append(solution.flatten())
# 2. POD decomposition (columns of the snapshot matrix are snapshots)
U, S, Vt = np.linalg.svd(np.array(snapshots).T, full_matrices=False)
modes = U[:, :n_modes]  # Keep the dominant modes
# 3. Modal coefficients
coefficients = np.diag(S[:n_modes]) @ Vt[:n_modes, :]
Online phase:
# 1. Parameter-space interpolation (fast)
new_coeffs = interpolate(coefficients, new_param)
# 2. Field reconstruction (one matrix multiply)
solution_rom = modes @ new_coeffs
4.3 Parameter-Space Interpolation
Choosing a method:
- RBF interpolation: smooth nonlinear mapping
- Polynomial interpolation: simple and fast
- Gaussian process: also provides an uncertainty estimate
from scipy.interpolate import Rbf
# RBF interpolation
rbf = Rbf(param_samples, coefficients, function='multiquadric')
coeffs_new = rbf(param_new)
5. Method Comparison and Selection
5.1 Performance Comparison
| Metric | Traditional FEM | NN Surrogate | PINN | ROM |
|---|---|---|---|---|
| Training time | - | Hours | Hours | Minutes |
| Inference time | 100 s | 0.001 s | 1 s | 0.01 s |
| Speedup | 1× | 100,000× | 100× | 10,000× |
| Relative error | - | 1-5% | 0.1-1% | 0.01-0.1% |
| Physical consistency | Exact | Not guaranteed | Enforced | Enforced |
5.2 Selection Guide
Choose an NN surrogate when:
- You need extremely fast inference
- Some error is tolerable
- Plenty of training data is available
Choose a PINN when:
- Physical consistency is required
- Training data is limited
- The problem has a well-defined governing equation
Choose a ROM when:
- High accuracy is required
- The parameter range is limited
- You can afford the offline computation
6. Case Study: Fast Electromagnetic Field Prediction
6.1 Problem Description
Scenario: fast magnetic field computation for a circular coil
- Input: coil position $(x, y)$ and current $I$
- Output: spatial field distribution $B(x, y)$
- Goal: reduce the runtime from 2 hours to 1 second
6.2 Implementation Steps
Step 1: data generation
def generate_training_data(n_samples=1000):
    solver = BiotSavartSolver()
    # Latin hypercube sampling
    params = sample_parameters(n_samples)
    data = []
    for param in params:
        B = solver.compute(param)
        data.append((param, B))
    return data
Step 2: network training
# Build the network
model = torch.nn.Sequential(
    torch.nn.Linear(3, 128),   # Input: x, y, I
    torch.nn.ReLU(),
    torch.nn.Linear(128, 256),
    torch.nn.ReLU(),
    torch.nn.Linear(256, grid_size**2)  # Output: field distribution
)
# Training loop
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(10000):
    prediction = model(input_params)
    loss = torch.mean((prediction - target_field)**2)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
Step 3: inference deployment
def predict_fast(x, y, I):
    with torch.no_grad():
        B_pred = model(torch.tensor([x, y, I]))
    return B_pred.numpy()
6.3 Results
Speedup:
- Traditional method: 7200 s
- NN surrogate: 0.01 s
- Speedup: 720,000×
Accuracy validation:
- Mean relative error: 2.3%
- Maximum relative error: 8.1%
- Meets the engineering requirement of <5% mean error
7. Advanced Topics
7.1 Multi-Fidelity Fusion
Idea:
Combine low-fidelity (fast) and high-fidelity (accurate) data:
# Low-fidelity model (fast)
B_low = coarse_fem_solver(params)
# High-fidelity correction
error_correction = high_fidelity_nn(params, B_low)
B_high = B_low + error_correction
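The correction idea runs end to end in the toy sketch below. Both "solvers" are hypothetical analytic stand-ins, and a small polynomial fit replaces the correction network:

```python
import numpy as np

# Hypothetical solvers: a biased low-fidelity model and an "exact" reference
def low_fidelity(p):             # fast but systematically off
    return np.sin(p)

def high_fidelity(p):            # expensive reference
    return np.sin(p) + 0.3 * p   # the low-fidelity model misses the 0.3*p trend

# Offline: learn the discrepancy from a few expensive evaluations
p_train = np.linspace(0, 2, 10)
delta = high_fidelity(p_train) - low_fidelity(p_train)
corr = np.polyfit(p_train, delta, deg=2)  # cheap correction model

# Online: corrected prediction = fast model + learned correction
def multifidelity(p):
    return low_fidelity(p) + np.polyval(corr, p)

err = abs(multifidelity(1.5) - high_fidelity(1.5))
```

Because only the (usually smooth) discrepancy has to be learned, far fewer high-fidelity solves are needed than for training a surrogate from scratch.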
7.2 Transfer Learning
Use cases:
- A new geometry only needs a small amount of fine-tuning data
- Transfer from a 2D model to a 3D model
# Load a pretrained model
model = load_pretrained_model('base_model.pth')
# Freeze the early layers
for layer in model.layers[:5]:
    for param in layer.parameters():
        param.requires_grad = False
# Fine-tune on the new data
fine_tune(model, new_data)
7.3 Uncertainty Quantification
Bayesian neural networks:
# Monte Carlo Dropout as a Bayesian approximation
model.train()  # Keep dropout active at inference time
predictions = np.stack([model(x).detach().numpy() for _ in range(100)])
mean_pred = predictions.mean(axis=0)
std_pred = predictions.std(axis=0)  # Uncertainty estimate
8. Common Problems and Fixes
8.1 Training Does Not Converge
Symptom: the loss does not decrease, or oscillates
Fixes:
- Lower the learning rate
- Check the data normalization
- Increase the network capacity
- Use residual connections
8.2 Overfitting
Symptom: low training error, high validation error
Fixes:
- Add more training data
- Use Dropout
- Add L2 regularization
- Apply early stopping
8.3 Physical Inconsistency
Symptom: predictions violate physical laws
Fixes:
- Add physics constraints via a PINN formulation
- Add physics-inspired feature engineering
- Apply a post-processing correction
8.4 Extrapolation Failure
Symptom: inaccurate predictions outside the training domain
Fixes:
- Broaden the coverage of the training data
- Use a Gaussian process to quantify uncertainty
- Set a confidence threshold
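A minimal sketch of the confidence-threshold idea, assuming nothing beyond a stored cloud of training parameters (the function name and threshold value are illustrative):

```python
import numpy as np

# Hypothetical guard against extrapolation: compare a query against the
# cloud of training parameters and flag points far from all of them
train_params = np.random.default_rng(2).uniform(0, 1, size=(100, 2))

def in_domain(query, threshold=0.3):
    # Nearest-neighbour distance as a crude out-of-distribution indicator
    d = np.min(np.linalg.norm(train_params - query, axis=1))
    return bool(d < threshold)

inside = in_domain(np.array([0.5, 0.5]))    # lies within the sampled box
outside = in_domain(np.array([3.0, 3.0]))   # far outside: flagged as extrapolation
```

A production system would typically replace the raw distance with a calibrated score (e.g. Gaussian-process variance), but the gate logic stays the same: refuse or down-weight predictions when the query leaves the sampled region.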
9. Summary and Outlook
This tutorial covered three machine learning approaches to accelerating electromagnetic field simulation:
1. Neural network surrogate models:
   - Pros: extremely fast inference
   - Cons: need large amounts of training data; no physical guarantees
   - Best for: real-time applications, parameter sweeps
2. Physics-informed neural networks (PINN):
   - Pros: good physical consistency, low data requirements
   - Cons: hard to train, moderate speed
   - Best for: inverse problems, data-scarce settings
3. Reduced-order models (ROM):
   - Pros: high accuracy, physically consistent
   - Cons: valid only over a limited parameter range
   - Best for: parametric design optimization
Key takeaways
- Data quality beats quantity: carefully designed samples outperform random sampling
- Physical constraints matter: purely data-driven models easily violate physics
- Validation is essential: always evaluate on an independent test set
- No free lunch: choosing a method means trading off speed, accuracy, and cost
Future directions
1. Neural operators:
   - Learn function-to-function mappings
   - Mesh-independent with strong generalization
2. Digital twin integration:
   - Real-time simulation synchronized with the physical system
   - Online learning and updating
3. Multiphysics coupling:
   - ML acceleration of coupled electromagnetic-thermal-mechanical problems
   - Unified surrogate models
Appendix: Complete Code
The complete Python implementation is in run_simulation.py and includes:
- A traditional numerical solver (SOR finite differences)
- A neural network surrogate model (hand-written MLP)
- A physics-informed neural network (PINN framework)
- A reduced-order model (POD decomposition)
- Performance comparison and visualization
Run the code:
python run_simulation.py
Generated files:
- fig1_ml_methods_comparison.png: method comparison
- fig2_rom_modes.png: ROM mode analysis
- fig3_speedup_analysis.png: speedup analysis
- ml_acceleration_results.json: performance test results
"""
主题035: 电磁场仿真机器学习加速
========================================
本代码演示三种机器学习加速电磁场仿真的方法:
1. 神经网络代理模型 (Surrogate Model)
2. 物理信息神经网络 (PINN)
3. 降阶模型 (ROM)
依赖库:
- numpy: 数值计算
- matplotlib: 可视化
- torch: 深度学习框架
- scipy: 科学计算
"""
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch
import json
import time
from typing import Tuple, List, Dict, Optional
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
# ============================================================
# Part 1: Traditional numerical solver (used to generate training data)
# ============================================================
class TraditionalMagneticSolver:
    """Traditional numerical solver: finite differences for the Poisson equation"""

    def __init__(self, grid_size: int = 50):
        self.grid_size = grid_size
        self.dx = 1.0 / (grid_size - 1)
        self.x = np.linspace(0, 1, grid_size)
        self.y = np.linspace(0, 1, grid_size)
        self.X, self.Y = np.meshgrid(self.x, self.y)

    def solve_poisson(self, source: np.ndarray,
                      n_iterations: int = 2000,
                      omega: float = 1.5) -> np.ndarray:
        """
        Solve the Poisson equation with SOR:
        ∇²φ = -ρ/ε₀
        """
        phi = np.zeros((self.grid_size, self.grid_size))
        for _ in range(n_iterations):
            # Red-black SOR sweep
            for color in [0, 1]:
                for i in range(1, self.grid_size - 1):
                    for j in range(1, self.grid_size - 1):
                        if (i + j) % 2 == color:
                            phi_new = 0.25 * (
                                phi[i+1, j] + phi[i-1, j] +
                                phi[i, j+1] + phi[i, j-1] +
                                source[i, j] * self.dx**2
                            )
                            phi[i, j] = (1 - omega) * phi[i, j] + omega * phi_new
            # Dirichlet boundary conditions
            phi[0, :] = 0
            phi[-1, :] = 0
            phi[:, 0] = 0
            phi[:, -1] = 0
        return phi

    def biot_savart_2d(self, wire_positions: List[Tuple[float, float, float]],
                       observation_points: np.ndarray) -> np.ndarray:
        """
        Field of straight wires via the 2D Biot-Savart law
        Args:
            wire_positions: [(x, y, I), ...] wire positions and currents
            observation_points: (N, 2) observation point coordinates
        Returns:
            B: (N, 2) magnetic field vectors
        """
        mu0 = 4 * np.pi * 1e-7
        B = np.zeros_like(observation_points)
        for point_idx in range(len(observation_points)):
            r = observation_points[point_idx]
            for wx, wy, I in wire_positions:
                dx = r[0] - wx
                dy = r[1] - wy
                r_mag = np.sqrt(dx**2 + dy**2)
                if r_mag > 1e-10:
                    # 2D Biot-Savart: |B| = μ₀I / (2πr)
                    B_magnitude = mu0 * I / (2 * np.pi * r_mag)
                    # Tangential direction, perpendicular to the radial vector
                    B[point_idx, 0] += -B_magnitude * dy / r_mag
                    B[point_idx, 1] += B_magnitude * dx / r_mag
        return B

    def solve_with_params(self, params: Dict) -> np.ndarray:
        """
        Parameterized solve, used to generate training data
        Args:
            params: {'source_type': 'gaussian'/'dipole',
                     'amplitude': float,
                     'position': (x, y)}
        """
        source = np.zeros((self.grid_size, self.grid_size))
        if params['source_type'] == 'gaussian':
            cx, cy = params['position']
            sigma = params.get('sigma', 0.1)
            amplitude = params['amplitude']
            source = amplitude * np.exp(-((self.X - cx)**2 + (self.Y - cy)**2) / (2 * sigma**2))
        elif params['source_type'] == 'dipole':
            cx, cy = params['position']
            separation = params.get('separation', 0.1)
            amplitude = params['amplitude']
            # Pair of opposite charges
            source += amplitude * np.exp(-((self.X - (cx - separation/2))**2 +
                                           (self.Y - cy)**2) / (2 * 0.05**2))
            source -= amplitude * np.exp(-((self.X - (cx + separation/2))**2 +
                                           (self.Y - cy)**2) / (2 * 0.05**2))
        return self.solve_poisson(source)
# ============================================================
# Part 2: Neural network surrogate model
# ============================================================
class NeuralNetworkSurrogate:
    """Neural network surrogate: an MLP replacing the traditional numerical solve"""

    def __init__(self, input_dim: int = 3, hidden_dim: int = 128, output_dim: int = 1):
        """
        Initialize the network
        Args:
            input_dim: input dimension (x, y, source_amplitude)
            hidden_dim: hidden layer width
            output_dim: output dimension (phi)
        """
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        # Hand-written simple MLP
        self.weights = []
        self.biases = []
        # Network structure: input -> hidden -> hidden -> hidden -> output
        layer_dims = [input_dim, hidden_dim, hidden_dim, hidden_dim, output_dim]
        for i in range(len(layer_dims) - 1):
            # He initialization (variance 2/fan_in, suited to ReLU)
            W = np.random.randn(layer_dims[i], layer_dims[i+1]) * np.sqrt(2.0 / layer_dims[i])
            b = np.zeros(layer_dims[i+1])
            self.weights.append(W)
            self.biases.append(b)

    def relu(self, x: np.ndarray) -> np.ndarray:
        """ReLU activation"""
        return np.maximum(0, x)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass"""
        for i, (W, b) in enumerate(zip(self.weights, self.biases)):
            x = x @ W + b
            if i < len(self.weights) - 1:  # No activation on the output layer
                x = self.relu(x)
        return x

    def train(self, X_train: np.ndarray, y_train: np.ndarray,
              epochs: int = 1000, lr: float = 0.001, batch_size: int = 32) -> List[float]:
        """
        Train the network with mini-batch gradient descent
        Args:
            X_train: (N, input_dim) training inputs
            y_train: (N, output_dim) training targets
            epochs: number of epochs
            lr: learning rate
            batch_size: batch size
        Returns:
            losses: loss history
        """
        n_samples = len(X_train)
        losses = []
        for epoch in range(epochs):
            # Shuffle each epoch
            indices = np.random.permutation(n_samples)
            X_shuffled = X_train[indices]
            y_shuffled = y_train[indices]
            epoch_loss = 0
            n_batches = 0
            for i in range(0, n_samples, batch_size):
                X_batch = X_shuffled[i:i+batch_size]
                y_batch = y_shuffled[i:i+batch_size]
                # Forward pass, keeping activations for backprop
                activations = [X_batch]
                x = X_batch
                for j, (W, b) in enumerate(zip(self.weights, self.biases)):
                    x = x @ W + b
                    if j < len(self.weights) - 1:
                        x = self.relu(x)
                    activations.append(x)
                y_pred = activations[-1]
                # MSE loss
                loss = np.mean((y_pred - y_batch)**2)
                epoch_loss += loss
                n_batches += 1
                # Backward pass; delta holds dL/d(pre-activation) of the current layer
                delta = 2 * (y_pred - y_batch) / len(y_batch)
                for j in range(len(self.weights) - 1, -1, -1):
                    dW = activations[j].T @ delta
                    db = np.sum(delta, axis=0)
                    # Gradient clipping to prevent exploding updates
                    dW = np.clip(dW, -1.0, 1.0)
                    db = np.clip(db, -1.0, 1.0)
                    # Propagate through W_j and the previous layer's ReLU
                    # (done before updating, using the old weights)
                    if j > 0:
                        delta = (delta @ self.weights[j].T) * (activations[j] > 0)
                    # Update parameters
                    self.weights[j] -= lr * dW
                    self.biases[j] -= lr * db
            avg_loss = epoch_loss / n_batches
            losses.append(avg_loss)
            if epoch % 100 == 0:
                print(f"  Epoch {epoch}, Loss: {avg_loss:.6f}")
        return losses

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict"""
        return self.forward(X)
# ============================================================
# Part 3: Physics-informed neural network (PINN)
# ============================================================
class PhysicsInformedNN:
    """Physics-informed neural network: the governing equation enters the loss"""

    def __init__(self, grid_size: int = 30):
        self.grid_size = grid_size
        self.dx = 1.0 / (grid_size - 1)
        # Create the network
        self.nn = NeuralNetworkSurrogate(input_dim=2, hidden_dim=64, output_dim=1)
        # Generate collocation points
        x = np.linspace(0, 1, grid_size)
        y = np.linspace(0, 1, grid_size)
        self.X_grid, self.Y_grid = np.meshgrid(x, y)
        self.collocation_points = np.column_stack([
            self.X_grid.flatten(),
            self.Y_grid.flatten()
        ])

    def compute_pde_residual(self, source: np.ndarray) -> np.ndarray:
        """
        Compute the PDE residual
        ∇²φ - f = 0
        """
        # Predict the potential
        phi_pred = self.nn.predict(self.collocation_points).reshape(self.grid_size, self.grid_size)
        # Laplacian via finite differences
        laplacian = np.zeros_like(phi_pred)
        laplacian[1:-1, 1:-1] = (
            phi_pred[2:, 1:-1] + phi_pred[:-2, 1:-1] +
            phi_pred[1:-1, 2:] + phi_pred[1:-1, :-2] -
            4 * phi_pred[1:-1, 1:-1]
        ) / self.dx**2
        # PDE residual
        residual = laplacian - source
        return residual

    def compute_boundary_residual(self) -> np.ndarray:
        """Compute the boundary condition residual"""
        phi_pred = self.nn.predict(self.collocation_points).reshape(self.grid_size, self.grid_size)
        # Dirichlet boundary condition: φ = 0
        bc_residual = np.concatenate([
            phi_pred[0, :].flatten(),    # Bottom boundary
            phi_pred[-1, :].flatten(),   # Top boundary
            phi_pred[:, 0].flatten(),    # Left boundary
            phi_pred[:, -1].flatten()    # Right boundary
        ])
        return bc_residual

    def train_pinn(self, source: np.ndarray, epochs: int = 2000,
                   lr: float = 0.001, w_pde: float = 1.0, w_bc: float = 10.0) -> List[float]:
        """
        Train the PINN (simplified demo)
        Args:
            source: source term distribution
            epochs: number of epochs
            lr: learning rate
            w_pde: PDE loss weight
            w_bc: boundary condition loss weight
        """
        losses = []
        for epoch in range(epochs):
            # Compute residuals
            pde_residual = self.compute_pde_residual(source)
            bc_residual = self.compute_boundary_residual()
            # Total loss
            loss_pde = np.mean(pde_residual**2)
            loss_bc = np.mean(bc_residual**2)
            loss_total = w_pde * loss_pde + w_bc * loss_bc
            losses.append(loss_total)
            # Simplified demo: the loss is only monitored here; a real PINN
            # would update the network parameters via automatic differentiation
            if epoch % 500 == 0:
                print(f"  Epoch {epoch}, Total Loss: {loss_total:.6f}, "
                      f"PDE: {loss_pde:.6f}, BC: {loss_bc:.6f}")
        return losses

    def solve(self, source: np.ndarray) -> np.ndarray:
        """Solve"""
        self.train_pinn(source, epochs=1000, lr=0.001)
        phi = self.nn.predict(self.collocation_points).reshape(self.grid_size, self.grid_size)
        return phi
# ============================================================
# Part 4: Reduced-order model (ROM)
# ============================================================
class ReducedOrderModel:
    """Reduced-order model based on POD"""

    def __init__(self, n_modes: int = 10):
        """
        Initialize the ROM
        Args:
            n_modes: number of POD modes to keep
        """
        self.n_modes = n_modes
        self.modes = None
        self.mean_field = None
        self.singular_values = None

    def pod_decomposition(self, snapshots: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Proper orthogonal decomposition (POD)
        Args:
            snapshots: (n_points, n_snapshots) snapshot matrix
        Returns:
            modes: (n_points, n_modes) POD modes
            singular_values: singular values
            coefficients: (n_modes, n_snapshots) modal coefficients
        """
        # Subtract the mean field
        self.mean_field = np.mean(snapshots, axis=1, keepdims=True)
        snapshots_centered = snapshots - self.mean_field
        # SVD
        U, S, Vt = np.linalg.svd(snapshots_centered, full_matrices=False)
        # Keep the first n_modes modes
        self.modes = U[:, :self.n_modes]
        self.singular_values = S[:self.n_modes]
        # Modal coefficients
        coefficients = np.diag(S[:self.n_modes]) @ Vt[:self.n_modes, :]
        return self.modes, self.singular_values, coefficients

    def reconstruct(self, coefficients: np.ndarray) -> np.ndarray:
        """
        Reconstruct the field
        Args:
            coefficients: (n_modes,) or (n_modes, n_samples) modal coefficients
        Returns:
            field: reconstructed field distribution
        """
        if coefficients.ndim == 1:
            coefficients = coefficients.reshape(-1, 1)
        field = self.modes @ coefficients + self.mean_field
        return field

    def project(self, field: np.ndarray) -> np.ndarray:
        """
        Project onto the modal space
        Args:
            field: (n_points,) or (n_points, n_samples) field distribution
        Returns:
            coefficients: modal coefficients
        """
        if field.ndim == 1:
            field = field.reshape(-1, 1)
        field_centered = field - self.mean_field
        coefficients = self.modes.T @ field_centered
        return coefficients

    def compute_energy_content(self) -> np.ndarray:
        """Energy fraction of each retained mode (relative to the retained set)"""
        total_energy = np.sum(self.singular_values**2)
        energy_content = self.singular_values**2 / total_energy
        return energy_content

    def interpolate_coefficients(self, param_samples: np.ndarray,
                                 coefficients: np.ndarray,
                                 new_params: np.ndarray) -> np.ndarray:
        """
        Parameter-space interpolation
        Args:
            param_samples: (n_samples, n_params) parameter samples
            coefficients: (n_modes, n_samples) corresponding modal coefficients
            new_params: (n_new, n_params) new parameter points
        Returns:
            new_coefficients: (n_modes, n_new) interpolated coefficients
        """
        from scipy.interpolate import Rbf, LinearNDInterpolator
        n_modes, n_samples = coefficients.shape
        n_new = len(new_params)
        new_coefficients = np.zeros((n_modes, n_new))
        # Interpolate each modal coefficient separately
        for i in range(n_modes):
            if param_samples.shape[1] == 1:
                rbf = Rbf(param_samples.flatten(), coefficients[i, :], function='multiquadric')
                new_coefficients[i, :] = rbf(new_params.flatten())
            else:
                # Linear interpolation for multi-dimensional parameters
                interp = LinearNDInterpolator(param_samples, coefficients[i, :])
                new_coefficients[i, :] = interp(new_params)
        return new_coefficients
# ============================================================
# Part 5: Data generation and training
# ============================================================
def generate_training_data(n_samples: int = 1000, grid_size: int = 30) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate training data
    Returns:
        X: (n_samples * grid_size^2, 3) inputs (x, y, source_amplitude)
        y: (n_samples * grid_size^2, 1) outputs (phi)
    """
    print("Generating training data...")
    solver = TraditionalMagneticSolver(grid_size=grid_size)
    X_list = []
    y_list = []
    for i in range(n_samples):
        # Random parameters
        params = {
            'source_type': np.random.choice(['gaussian', 'dipole']),
            'amplitude': np.random.uniform(-10, 10),
            'position': (np.random.uniform(0.3, 0.7), np.random.uniform(0.3, 0.7)),
            'sigma': np.random.uniform(0.05, 0.15)
        }
        # Solve
        phi = solver.solve_with_params(params)
        # Build input/output pairs
        for j in range(grid_size):
            for k in range(grid_size):
                X_list.append([
                    solver.X[j, k],
                    solver.Y[j, k],
                    params['amplitude']
                ])
                y_list.append([phi[j, k]])
        if (i + 1) % 100 == 0:
            print(f"  Generated {i+1}/{n_samples} samples")
    X = np.array(X_list)
    y = np.array(y_list)
    return X, y

def generate_rom_snapshots(n_snapshots: int = 50, grid_size: int = 30) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate ROM snapshots
    Returns:
        snapshots: (grid_size^2, n_snapshots) snapshot matrix
        params: (n_snapshots, 2) corresponding parameters (amplitude, position_x)
    """
    print("Generating ROM snapshots...")
    solver = TraditionalMagneticSolver(grid_size=grid_size)
    snapshots = []
    param_list = []
    for i in range(n_snapshots):
        # Parameter variation
        amplitude = np.random.uniform(-10, 10)
        pos_x = np.random.uniform(0.3, 0.7)
        params = {
            'source_type': 'gaussian',
            'amplitude': amplitude,
            'position': (pos_x, 0.5),
            'sigma': 0.1
        }
        phi = solver.solve_with_params(params)
        snapshots.append(phi.flatten())
        param_list.append([amplitude, pos_x])
    return np.array(snapshots).T, np.array(param_list)
# ============================================================
# Part 6: Performance comparison
# ============================================================
def compare_methods():
    """Compare the speed and accuracy of each method"""
    print("="*70)
    print("Topic 035: Machine Learning Acceleration of EM Simulation")
    print("Performance comparison")
    print("="*70)
    results = {}
    grid_size = 30
    # 1. Traditional method
    print("\n[1/4] Traditional numerical solve...")
    solver = TraditionalMagneticSolver(grid_size=grid_size)
    test_params = {
        'source_type': 'gaussian',
        'amplitude': 5.0,
        'position': (0.5, 0.5),
        'sigma': 0.1
    }
    start = time.time()
    phi_traditional = solver.solve_with_params(test_params)
    time_traditional = time.time() - start
    print(f"  Traditional solve time: {time_traditional:.4f} s")
    results['traditional'] = {'time': time_traditional, 'field': phi_traditional}
    # 2. Neural network surrogate
    print("\n[2/4] Neural network surrogate...")
    # Generate training data
    X_train, y_train = generate_training_data(n_samples=200, grid_size=grid_size)
    # Normalize
    X_mean, X_std = X_train.mean(axis=0), X_train.std(axis=0)
    y_mean, y_std = y_train.mean(axis=0), y_train.std(axis=0)
    X_train_norm = (X_train - X_mean) / (X_std + 1e-8)
    y_train_norm = (y_train - y_mean) / (y_std + 1e-8)
    # Train the network
    nn_surrogate = NeuralNetworkSurrogate(input_dim=3, hidden_dim=64, output_dim=1)
    print("  Training the network...")
    losses = nn_surrogate.train(X_train_norm, y_train_norm, epochs=500, lr=0.001)
    # Predict
    X_test = []
    for j in range(grid_size):
        for k in range(grid_size):
            X_test.append([
                solver.X[j, k],
                solver.Y[j, k],
                test_params['amplitude']
            ])
    X_test = np.array(X_test)
    X_test_norm = (X_test - X_mean) / (X_std + 1e-8)
    start = time.time()
    phi_nn_norm = nn_surrogate.predict(X_test_norm)
    phi_nn = phi_nn_norm * y_std + y_mean
    phi_nn = phi_nn.reshape(grid_size, grid_size)
    time_nn = time.time() - start
    # Error
    error_nn = np.mean((phi_nn - phi_traditional)**2)
    print(f"  NN surrogate time: {time_nn:.4f} s")
    print(f"  Speedup: {time_traditional/time_nn:.1f}x")
    print(f"  MSE: {error_nn:.6f}")
    results['nn_surrogate'] = {
        'time': time_nn,
        'field': phi_nn,
        'error': error_nn,
        'speedup': time_traditional/time_nn
    }
    # 3. PINN
    print("\n[3/4] Physics-informed neural network...")
    pinn = PhysicsInformedNN(grid_size=grid_size)
    # Build the source term
    source = np.zeros((grid_size, grid_size))
    cx, cy = int(0.5 * grid_size), int(0.5 * grid_size)
    sigma = 0.1 * grid_size
    for i in range(grid_size):
        for j in range(grid_size):
            source[i, j] = test_params['amplitude'] * np.exp(
                -((i-cx)**2 + (j-cy)**2) / (2 * sigma**2)
            )
    start = time.time()
    phi_pinn = pinn.solve(source)
    time_pinn = time.time() - start
    error_pinn = np.mean((phi_pinn - phi_traditional)**2)
    print(f"  PINN time: {time_pinn:.4f} s")
    print(f"  MSE: {error_pinn:.6f}")
    results['pinn'] = {
        'time': time_pinn,
        'field': phi_pinn,
        'error': error_pinn
    }
    # 4. Reduced-order model
    print("\n[4/4] Reduced-order model...")
    # Generate snapshots
    snapshots, params = generate_rom_snapshots(n_snapshots=30, grid_size=grid_size)
    # POD decomposition
    rom = ReducedOrderModel(n_modes=10)
    modes, singular_values, coefficients = rom.pod_decomposition(snapshots)
    # Energy content
    energy_content = rom.compute_energy_content()
    print(f"  Energy captured by the first 10 modes: {energy_content.sum()*100:.1f}%")
    # Online phase: fast prediction at a new parameter point
    new_param = np.array([[test_params['amplitude'], test_params['position'][0]]])
    start = time.time()
    new_coeffs = rom.interpolate_coefficients(params, coefficients, new_param)
    phi_rom = rom.reconstruct(new_coeffs).reshape(grid_size, grid_size)
    time_rom = time.time() - start
    error_rom = np.mean((phi_rom - phi_traditional)**2)
    print(f"  ROM online time: {time_rom:.4f} s")
    print(f"  Speedup: {time_traditional/time_rom:.1f}x")
    print(f"  MSE: {error_rom:.6f}")
    results['rom'] = {
        'time': time_rom,
        'field': phi_rom,
        'error': error_rom,
        'speedup': time_traditional/time_rom,
        'energy_content': energy_content.tolist()
    }
    return results, solver
# ============================================================
# Part 7: Visualization
# ============================================================
def visualize_results(results: Dict, solver):
    """Visualize the results"""
    print("\n" + "="*70)
    print("Generating figures")
    print("="*70)
    # Figure 1: method comparison
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    methods = [
        ('traditional', 'Traditional solver'),
        ('nn_surrogate', 'NN surrogate'),
        ('pinn', 'PINN'),
        ('rom', 'ROM')
    ]
    vmin = min([results[m[0]]['field'].min() for m in methods])
    vmax = max([results[m[0]]['field'].max() for m in methods])
    # The four fields fill a 2x2 block; the bottom-right panel holds the bars
    axes[0, 2].axis('off')
    for idx, (key, title) in enumerate(methods):
        row = idx // 2
        col = idx % 2
        im = axes[row, col].contourf(solver.X, solver.Y, results[key]['field'],
                                     levels=20, cmap='RdBu_r', vmin=vmin, vmax=vmax)
        axes[row, col].set_title(title, fontsize=12, fontweight='bold')
        axes[row, col].set_xlabel('x')
        axes[row, col].set_ylabel('y')
        plt.colorbar(im, ax=axes[row, col])
        # Annotate timing and error
        time_info = f"time: {results[key]['time']:.4f}s"
        if 'error' in results[key]:
            time_info += f"\nerror: {results[key]['error']:.2e}"
        if 'speedup' in results[key]:
            time_info += f"\nspeedup: {results[key]['speedup']:.1f}x"
        axes[row, col].text(0.02, 0.98, time_info,
                            transform=axes[row, col].transAxes,
                            verticalalignment='top',
                            bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
    # Error / time comparison on the remaining panel
    method_names = ['NN surrogate', 'PINN', 'ROM']
    errors = [results['nn_surrogate']['error'],
              results['pinn']['error'],
              results['rom']['error']]
    times = [results['nn_surrogate']['time'],
             results['pinn']['time'],
             results['rom']['time']]
    ax_err = axes[1, 2]
    ax_time = ax_err.twinx()
    x_pos = np.arange(len(method_names))
    bars1 = ax_err.bar(x_pos - 0.2, errors, 0.4, label='MSE', color='coral', alpha=0.7)
    bars2 = ax_time.bar(x_pos + 0.2, times, 0.4, label='time', color='skyblue', alpha=0.7)
    ax_err.set_xlabel('Method')
    ax_err.set_ylabel('MSE', color='coral')
    ax_time.set_ylabel('time (s)', color='skyblue')
    ax_err.set_xticks(x_pos)
    ax_err.set_xticklabels(method_names)
    ax_err.set_yscale('log')
    ax_err.legend(loc='upper left')
    ax_time.legend(loc='upper right')
    plt.tight_layout()
    plt.savefig('fig1_ml_methods_comparison.png', dpi=150, bbox_inches='tight')
    print("  Saved fig1_ml_methods_comparison.png")
    plt.close()
    # Figure 2: ROM mode analysis
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    # First 6 POD modes
    rom = ReducedOrderModel(n_modes=10)
    snapshots, _ = generate_rom_snapshots(n_snapshots=30, grid_size=solver.grid_size)
    modes, singular_values, _ = rom.pod_decomposition(snapshots)
    for i in range(6):
        row = i // 3
        col = i % 3
        mode = modes[:, i].reshape(solver.grid_size, solver.grid_size)
        im = axes[row, col].contourf(solver.X, solver.Y, mode, levels=20, cmap='RdBu_r')
        axes[row, col].set_title(f'POD mode {i+1}', fontsize=11, fontweight='bold')
        axes[row, col].set_xlabel('x')
        axes[row, col].set_ylabel('y')
        energy = rom.compute_energy_content()[i] * 100
        axes[row, col].text(0.5, -0.15, f'energy: {energy:.1f}%',
                            transform=axes[row, col].transAxes,
                            ha='center', fontsize=9)
        plt.colorbar(im, ax=axes[row, col])
    plt.tight_layout()
    plt.savefig('fig2_rom_modes.png', dpi=150, bbox_inches='tight')
    print("  Saved fig2_rom_modes.png")
    plt.close()
    # Figure 3: speedup analysis
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    # Speedup bar chart
    methods_speedup = ['NN surrogate', 'ROM']
    speedups = [results['nn_surrogate']['speedup'],
                results['rom']['speedup']]
    bars = axes[0].bar(methods_speedup, speedups, color=['#3498db', '#2ecc71'], alpha=0.8, edgecolor='black')
    axes[0].set_ylabel('Speedup', fontsize=12)
    axes[0].set_title('ML acceleration', fontsize=13, fontweight='bold')
    axes[0].axhline(y=1, color='red', linestyle='--', alpha=0.5, label='baseline')
    axes[0].legend()
    for bar, speedup in zip(bars, speedups):
        height = bar.get_height()
        axes[0].text(bar.get_x() + bar.get_width()/2., height,
                     f'{speedup:.1f}x',
                     ha='center', va='bottom', fontsize=11, fontweight='bold')
    # Cumulative energy curve
    cumulative_energy = np.cumsum(results['rom']['energy_content']) * 100
    axes[1].plot(range(1, 11), cumulative_energy, 'o-', linewidth=2, markersize=8, color='#e74c3c')
    axes[1].axhline(y=95, color='green', linestyle='--', alpha=0.5, label='95% energy')
    axes[1].set_xlabel('Number of modes', fontsize=12)
    axes[1].set_ylabel('Cumulative energy (%)', fontsize=12)
    axes[1].set_title('POD cumulative energy', fontsize=13, fontweight='bold')
    axes[1].grid(True, alpha=0.3)
    axes[1].legend()
    # Mark the 95% point
    idx_95 = np.where(cumulative_energy >= 95)[0]
    if len(idx_95) > 0:
        n_95 = idx_95[0] + 1
        axes[1].annotate(f'{n_95} modes reach 95%',
                         xy=(n_95, cumulative_energy[n_95-1]),
                         xytext=(n_95+1, 80),
                         arrowprops=dict(arrowstyle='->', color='green'),
                         fontsize=10, color='green')
    plt.tight_layout()
    plt.savefig('fig3_speedup_analysis.png', dpi=150, bbox_inches='tight')
    print("  Saved fig3_speedup_analysis.png")
    plt.close()
# ============================================================
# Part 8: Main program
# ============================================================
if __name__ == "__main__":
    # Run the comparison
    results, solver = compare_methods()
    # Visualize
    visualize_results(results, solver)
    # Save a JSON summary (the field arrays are omitted)
    results_serializable = {}
    for key, value in results.items():
        results_serializable[key] = {
            'time': float(value['time']),
            'error': float(value.get('error', 0)),
            'speedup': float(value.get('speedup', 0))
        }
    with open('ml_acceleration_results.json', 'w') as f:
        json.dump(results_serializable, f, indent=2)
    print("\n" + "="*70)
    print("Topic 035 simulation complete!")
    print("="*70)
    print("\nGenerated files:")
    print("  - fig1_ml_methods_comparison.png: method comparison")
    print("  - fig2_rom_modes.png: ROM mode analysis")
    print("  - fig3_speedup_analysis.png: speedup analysis")
    print("  - ml_acceleration_results.json: results data")
    print("="*70)