静磁场仿真-主题079_磁场测量与数据同化-磁场测量与数据同化
主题079:磁场测量与数据同化
目录





引言
1.1 背景与意义
磁场测量与数据同化是现代电磁场工程中的核心技术,在地球物理勘探、生物医学成像、工业无损检测、导航系统等领域具有广泛应用。传统的磁场仿真主要关注正向问题——即已知磁源分布计算磁场分布,而实际工程问题往往需要解决逆向问题——从有限的磁场测量数据中重构磁源信息或估计场分布。
数据同化(Data Assimilation)是一种将观测数据与数值模型相结合的先进技术,通过融合多源信息来提高对物理系统状态的估计精度。这一概念最初源于气象学和海洋学,现已成功应用于电磁场反演、磁异常探测、磁导航定位等领域。
1.2 学习目标
通过本教程的学习,读者将掌握:
- 磁场传感器原理:理解霍尔效应、磁阻效应、磁通门等不同类型传感器的工作原理和特性参数
- 传感器阵列设计:学习均匀布局、圆形布局、随机布局和优化布局的设计方法
- 磁场反演算法:掌握从测量数据重构磁源位置的最小二乘优化方法
- 卡尔曼滤波:实现移动磁源的实时跟踪和状态估计
- 数据同化技术:理解最优插值、集合卡尔曼滤波等数据融合方法
1.3 应用场景
磁场测量与数据同化的典型应用包括:
- 磁异常探测:地下矿产资源勘探、未爆弹药探测、考古调查
- 生物医学成像:磁脑图(MEG)、心磁图(MCG)、磁粒子成像(MPI)
- 工业检测:管道缺陷检测、焊缝质量评估、应力监测
- 导航定位:地磁导航、室内定位、水下导航
- 空间科学:卫星磁测、空间天气监测、行星探测
磁场测量基础
2.1 磁场的基本概念
磁场是描述磁力作用的空间分布的物理量。在静磁学中,磁场由两个互补的矢量场描述:
磁场强度 H:描述磁场的源特性,单位为安培/米(A/m)
磁感应强度 B:描述磁场对运动电荷的作用力,单位为特斯拉(T)
两者之间的关系由本构方程给出:
B = μ 0 ( H + M ) = μ 0 μ r H \mathbf{B} = \mu_0 (\mathbf{H} + \mathbf{M}) = \mu_0 \mu_r \mathbf{H} B=μ0(H+M)=μ0μrH
其中, μ 0 = 4 π × 10 − 7 \mu_0 = 4\pi \times 10^{-7} μ0=4π×10−7 H/m 是真空磁导率, μ r \mu_r μr 是相对磁导率, M \mathbf{M} M 是磁化强度。
2.2 磁偶极子场
点磁偶极子产生的磁场是磁场测量的基础模型。对于位于 r ′ \mathbf{r}' r′ 处、磁矩为 m \mathbf{m} m 的磁偶极子,在观测点 r \mathbf{r} r 处产生的磁感应强度为:
B ( r ) = μ 0 4 π r 3 [ 3 ( m ⋅ r ) r r 2 − m ] \mathbf{B}(\mathbf{r}) = \frac{\mu_0}{4\pi r^3} \left[ \frac{3(\mathbf{m} \cdot \mathbf{r})\mathbf{r}}{r^2} - \mathbf{m} \right] B(r)=4πr3μ0[r23(m⋅r)r−m]
其中, r = r − r ′ \mathbf{r} = \mathbf{r} - \mathbf{r}' r=r−r′ 是从磁源指向观测点的矢量, r = ∣ r ∣ r = |\mathbf{r}| r=∣r∣ 是距离。
在二维平面问题中,磁偶极子场可以简化为:
B x = μ 0 4 π r 3 [ 3 ( m x x + m y y ) x r 2 − m x ] B_x = \frac{\mu_0}{4\pi r^3} \left[ \frac{3(m_x x + m_y y)x}{r^2} - m_x \right] Bx=4πr3μ0[r23(mxx+myy)x−mx]
B y = μ 0 4 π r 3 [ 3 ( m x x + m y y ) y r 2 − m y ] B_y = \frac{\mu_0}{4\pi r^3} \left[ \frac{3(m_x x + m_y y)y}{r^2} - m_y \right] By=4πr3μ0[r23(mxx+myy)y−my]
其中, x x x 和 y y y 是相对于磁源位置的坐标, r = x 2 + y 2 r = \sqrt{x^2 + y^2} r=x2+y2。
2.3 测量坐标系
磁场测量需要明确定义坐标系。常用的坐标系包括:
直角坐标系:使用 B x B_x Bx, B y B_y By, B z B_z Bz 三个分量描述磁场
球坐标系:使用总场强度 B B B、磁偏角 D D D 和磁倾角 I I I 描述
B = B x 2 + B y 2 + B z 2 B = \sqrt{B_x^2 + B_y^2 + B_z^2} B=Bx2+By2+Bz2
D = arctan ( B y B x ) D = \arctan\left(\frac{B_y}{B_x}\right) D=arctan(BxBy)
I = arctan ( B z B x 2 + B y 2 ) I = \arctan\left(\frac{B_z}{\sqrt{B_x^2 + B_y^2}}\right) I=arctan Bx2+By2Bz
在二维问题中,通常只测量磁场在平面内的分量或总场幅值。
传感器模型与特性
3.1 霍尔效应传感器
工作原理:霍尔效应传感器基于霍尔效应,当电流垂直于外磁场通过半导体时,载流子受到洛伦兹力作用而在垂直于电流和磁场的方向上产生电势差(霍尔电压)。
V H = R H I B t V_H = \frac{R_H I B}{t} VH=tRHIB
其中, R H R_H RH 是霍尔系数, I I I 是工作电流, B B B 是磁感应强度, t t t 是半导体厚度。
特性参数:
- 灵敏度:50-500 mV/T
- 线性范围:±0.1 至 ±2 T
- 分辨率:0.1-1 mT
- 频率响应:DC 至 100 kHz
- 工作温度:-40°C 至 +150°C
优缺点:
- 优点:成本低、响应快、体积小、非接触测量
- 缺点:温度敏感、需要温度补偿、分辨率有限
3.2 磁阻传感器
工作原理:磁阻传感器利用某些材料的电阻随磁场变化的特性。各向异性磁阻(AMR)传感器的电阻变化与磁场方向和电流方向的夹角有关;巨磁阻(GMR)和隧道磁阻(TMR)传感器则利用多层薄膜结构的自旋相关输运特性。
AMR效应:
R ( θ ) = R m i n + ( R m a x − R m i n ) cos 2 θ R(\theta) = R_{min} + (R_{max} - R_{min}) \cos^2\theta R(θ)=Rmin+(Rmax−Rmin)cos2θ
其中, θ \theta θ 是磁场方向与电流方向的夹角。
特性参数:
- 灵敏度:1-50 mV/V/T
- 线性范围:±0.01 至 ±0.5 mT
- 分辨率:1-100 nT
- 频率响应:DC 至 10 MHz
优缺点:
- 优点:高灵敏度、高分辨率、低功耗
- 缺点:存在磁滞、需要置位/复位脉冲、温度敏感
3.3 磁通门传感器
工作原理:磁通门传感器利用高磁导率材料的饱和特性。激励线圈产生交变磁场使磁芯周期性饱和,被测磁场使饱和点偏移,通过检测输出线圈的电压变化测量外磁场。
特性参数:
- 灵敏度:1-100 μV/nT
- 线性范围:±10 至 ±1000 μT
- 分辨率:0.1-10 pT
- 频率响应:DC 至 10 kHz
优缺点:
- 优点:极高分辨率、低噪声、稳定性好
- 缺点:体积较大、功耗较高、成本较高
3.4 传感器噪声模型
实际传感器测量不可避免地包含噪声。常见的噪声来源包括:
热噪声(约翰逊噪声):
V n = 4 k B T R Δ f V_n = \sqrt{4k_B T R \Delta f} Vn=4kBTRΔf
其中, k B k_B kB 是玻尔兹曼常数, T T T 是温度, R R R 是电阻, Δ f \Delta f Δf 是带宽。
散粒噪声:
I n = 2 q I Δ f I_n = \sqrt{2qI\Delta f} In=2qIΔf
其中, q q q 是电子电荷, I I I 是电流。
1/f 噪声:低频时占主导,与频率成反比。
量化噪声:
σ q = Δ 12 \sigma_q = \frac{\Delta}{\sqrt{12}} σq=12Δ
其中, Δ \Delta Δ 是量化步长。
在仿真中,通常使用高斯白噪声模型:
B m e a s u r e d = B t r u e + ϵ , ϵ ∼ N ( 0 , σ 2 ) B_{measured} = B_{true} + \epsilon, \quad \epsilon \sim \mathcal{N}(0, \sigma^2) Bmeasured=Btrue+ϵ,ϵ∼N(0,σ2)
传感器阵列设计
4.1 阵列布局类型
传感器阵列的布局直接影响磁场测量的空间分辨率和反演精度。常见的布局包括:
均匀网格布局:
传感器均匀分布在矩形网格上,适用于大面积区域测量。
x i = x m i n + i − 1 n x − 1 ( x m a x − x m i n ) x_i = x_{min} + \frac{i-1}{n_x-1}(x_{max} - x_{min}) xi=xmin+nx−1i−1(xmax−xmin)
y j = y m i n + j − 1 n y − 1 ( y m a x − y m i n ) y_j = y_{min} + \frac{j-1}{n_y-1}(y_{max} - y_{min}) yj=ymin+ny−1j−1(ymax−ymin)
圆形布局:
传感器分布在圆周上,适用于点源定位问题。
x k = x c + r cos ( 2 π k n ) x_k = x_c + r \cos\left(\frac{2\pi k}{n}\right) xk=xc+rcos(n2πk)
y k = y c + r sin ( 2 π k n ) y_k = y_c + r \sin\left(\frac{2\pi k}{n}\right) yk=yc+rsin(n2πk)
随机布局:
传感器随机分布,可打破周期性伪影,但可能导致某些区域采样不足。
自适应布局:
根据先验信息或实时测量结果动态调整传感器位置。
4.2 最优设计准则
传感器布局优化通常基于以下准则:
D-最优准则:最大化Fisher信息矩阵的行列式
max det ( F ) \max \det(\mathbf{F}) maxdet(F)
其中,Fisher信息矩阵:
F = H T R − 1 H \mathbf{F} = \mathbf{H}^T \mathbf{R}^{-1} \mathbf{H} F=HTR−1H
H \mathbf{H} H 是灵敏度矩阵, R \mathbf{R} R 是测量噪声协方差矩阵。
A-最优准则:最小化参数估计协方差矩阵的迹
min tr ( F − 1 ) \min \text{tr}(\mathbf{F}^{-1}) mintr(F−1)
E-最优准则:最大化Fisher信息矩阵的最小特征值
max λ m i n ( F ) \max \lambda_{min}(\mathbf{F}) maxλmin(F)
4.3 灵敏度矩阵计算
灵敏度矩阵 H \mathbf{H} H 描述了测量值对模型参数的敏感程度:
H i j = ∂ B i ∂ p j H_{ij} = \frac{\partial B_i}{\partial p_j} Hij=∂pj∂Bi
其中, B i B_i Bi 是第 i i i 个传感器的测量值, p j p_j pj 是第 j j j 个模型参数。
对于磁源位置反演问题,灵敏度可以通过数值微分计算:
H i j ≈ B i ( p j + Δ p j ) − B i ( p j ) Δ p j H_{ij} \approx \frac{B_i(p_j + \Delta p_j) - B_i(p_j)}{\Delta p_j} Hij≈ΔpjBi(pj+Δpj)−Bi(pj)
4.4 空间采样定理
根据奈奎斯特采样定理,传感器间距应满足:
Δ x ≤ λ m i n 2 \Delta x \leq \frac{\lambda_{min}}{2} Δx≤2λmin
其中, λ m i n \lambda_{min} λmin 是磁场空间变化的最小波长。对于偶极子场,有效波长与磁源距离有关:
λ e f f ≈ 2 π d 3 \lambda_{eff} \approx \frac{2\pi d}{3} λeff≈32πd
其中, d d d 是传感器到磁源的距离。
磁场反演方法
5.1 反问题概述
磁场反演是从测量数据重构磁源参数或场分布的数学过程。与正问题相比,反问题通常具有以下特点:
不适定性:解可能不唯一、不稳定或不存在
非线性:磁场与磁源参数之间通常是非线性关系
欠定性:测量数据通常少于未知参数
5.2 最小二乘法
最小二乘法是最常用的反演方法,通过最小化预测值与测量值的残差平方和来估计参数:
min p S ( p ) = ∑ i = 1 n [ B i o b s − B i p r e d ( p ) ] 2 \min_{\mathbf{p}} S(\mathbf{p}) = \sum_{i=1}^{n} \left[ B_i^{obs} - B_i^{pred}(\mathbf{p}) \right]^2 pminS(p)=i=1∑n[Biobs−Bipred(p)]2
对于线性问题,解析解为:
p = ( H T H ) − 1 H T B o b s \mathbf{p} = (\mathbf{H}^T \mathbf{H})^{-1} \mathbf{H}^T \mathbf{B}^{obs} p=(HTH)−1HTBobs
对于非线性问题,需要使用迭代优化算法,如Levenberg-Marquardt算法、拟牛顿法等。
5.3 正则化方法
由于反问题的不适定性,直接求解可能导致噪声放大。正则化方法通过引入先验约束来稳定解:
Tikhonov正则化(岭回归):
min p { ∥ B o b s − B p r e d ( p ) ∥ 2 + λ ∥ p ∥ 2 } \min_{\mathbf{p}} \left\{ \|\mathbf{B}^{obs} - \mathbf{B}^{pred}(\mathbf{p})\|^2 + \lambda \|\mathbf{p}\|^2 \right\} pmin{∥Bobs−Bpred(p)∥2+λ∥p∥2}
其中, λ \lambda λ 是正则化参数,控制拟合精度与解的平滑度之间的权衡。
LASSO正则化:
min p { ∥ B o b s − B p r e d ( p ) ∥ 2 + λ ∥ p ∥ 1 } \min_{\mathbf{p}} \left\{ \|\mathbf{B}^{obs} - \mathbf{B}^{pred}(\mathbf{p})\|^2 + \lambda \|\mathbf{p}\|_1 \right\} pmin{∥Bobs−Bpred(p)∥2+λ∥p∥1}
LASSO可以产生稀疏解,适用于磁源数量未知的情况。
总变差正则化:
min p { ∥ B o b s − B p r e d ( p ) ∥ 2 + λ T V ( p ) } \min_{\mathbf{p}} \left\{ \|\mathbf{B}^{obs} - \mathbf{B}^{pred}(\mathbf{p})\|^2 + \lambda TV(\mathbf{p}) \right\} pmin{∥Bobs−Bpred(p)∥2+λTV(p)}
TV正则化可以保留边缘信息,适用于图像重建。
5.4 贝叶斯反演
贝叶斯方法将反演问题转化为概率推断问题:
p ( p ∣ B o b s ) = p ( B o b s ∣ p ) p ( p ) p ( B o b s ) p(\mathbf{p} | \mathbf{B}^{obs}) = \frac{p(\mathbf{B}^{obs} | \mathbf{p}) p(\mathbf{p})}{p(\mathbf{B}^{obs})} p(p∣Bobs)=p(Bobs)p(Bobs∣p)p(p)
其中:
- p ( p ∣ B o b s ) p(\mathbf{p} | \mathbf{B}^{obs}) p(p∣Bobs) 是后验分布
- p ( B o b s ∣ p ) p(\mathbf{B}^{obs} | \mathbf{p}) p(Bobs∣p) 是似然函数
- p ( p ) p(\mathbf{p}) p(p) 是先验分布
- p ( B o b s ) p(\mathbf{B}^{obs}) p(Bobs) 是证据(归一化常数)
最大后验估计(MAP):
p M A P = arg max p [ ln p ( B o b s ∣ p ) + ln p ( p ) ] \mathbf{p}_{MAP} = \arg\max_{\mathbf{p}} \left[ \ln p(\mathbf{B}^{obs} | \mathbf{p}) + \ln p(\mathbf{p}) \right] pMAP=argpmax[lnp(Bobs∣p)+lnp(p)]
5.5 反演不确定性量化
反演结果的不确定性可以通过以下方法量化:
协方差矩阵:
C p = ( H T C B − 1 H ) − 1 \mathbf{C}_p = (\mathbf{H}^T \mathbf{C}_B^{-1} \mathbf{H})^{-1} Cp=(HTCB−1H)−1
其中, C B \mathbf{C}_B CB 是测量误差协方差矩阵。
置信区间:参数 p j p_j pj 的95%置信区间为:
p j ± 1.96 C p , j j p_j \pm 1.96 \sqrt{C_{p,jj}} pj±1.96Cp,jj
后验采样:使用马尔可夫链蒙特卡洛(MCMC)方法从后验分布中采样,获得参数的不确定性分布。
卡尔曼滤波
6.1 状态空间模型
卡尔曼滤波基于离散时间状态空间模型:
状态方程:
x k + 1 = F k x k + G k u k + w k \mathbf{x}_{k+1} = \mathbf{F}_k \mathbf{x}_k + \mathbf{G}_k \mathbf{u}_k + \mathbf{w}_k xk+1=Fkxk+Gkuk+wk
观测方程:
z k = H k x k + v k \mathbf{z}_k = \mathbf{H}_k \mathbf{x}_k + \mathbf{v}_k zk=Hkxk+vk
其中:
- x k \mathbf{x}_k xk 是状态向量
- z k \mathbf{z}_k zk 是观测向量
- F k \mathbf{F}_k Fk 是状态转移矩阵
- H k \mathbf{H}_k Hk 是观测矩阵
- w k ∼ N ( 0 , Q k ) \mathbf{w}_k \sim \mathcal{N}(0, \mathbf{Q}_k) wk∼N(0,Qk) 是过程噪声
- v k ∼ N ( 0 , R k ) \mathbf{v}_k \sim \mathcal{N}(0, \mathbf{R}_k) vk∼N(0,Rk) 是观测噪声
6.2 标准卡尔曼滤波算法
卡尔曼滤波包含两个步骤:预测和更新。
预测步骤:
x ^ k ∣ k − 1 = F k x ^ k − 1 ∣ k − 1 \hat{\mathbf{x}}_{k|k-1} = \mathbf{F}_k \hat{\mathbf{x}}_{k-1|k-1} x^k∣k−1=Fkx^k−1∣k−1
P k ∣ k − 1 = F k P k − 1 ∣ k − 1 F k T + Q k \mathbf{P}_{k|k-1} = \mathbf{F}_k \mathbf{P}_{k-1|k-1} \mathbf{F}_k^T + \mathbf{Q}_k Pk∣k−1=FkPk−1∣k−1FkT+Qk
更新步骤:
创新:
y ~ k = z k − H k x ^ k ∣ k − 1 \tilde{\mathbf{y}}_k = \mathbf{z}_k - \mathbf{H}_k \hat{\mathbf{x}}_{k|k-1} y~k=zk−Hkx^k∣k−1
创新协方差:
S k = H k P k ∣ k − 1 H k T + R k \mathbf{S}_k = \mathbf{H}_k \mathbf{P}_{k|k-1} \mathbf{H}_k^T + \mathbf{R}_k Sk=HkPk∣k−1HkT+Rk
卡尔曼增益:
K k = P k ∣ k − 1 H k T S k − 1 \mathbf{K}_k = \mathbf{P}_{k|k-1} \mathbf{H}_k^T \mathbf{S}_k^{-1} Kk=Pk∣k−1HkTSk−1
状态更新:
x ^ k ∣ k = x ^ k ∣ k − 1 + K k y ~ k \hat{\mathbf{x}}_{k|k} = \hat{\mathbf{x}}_{k|k-1} + \mathbf{K}_k \tilde{\mathbf{y}}_k x^k∣k=x^k∣k−1+Kky~k
协方差更新:
P k ∣ k = ( I − K k H k ) P k ∣ k − 1 \mathbf{P}_{k|k} = (\mathbf{I} - \mathbf{K}_k \mathbf{H}_k) \mathbf{P}_{k|k-1} Pk∣k=(I−KkHk)Pk∣k−1
或使用Joseph形式提高数值稳定性:
P k ∣ k = ( I − K k H k ) P k ∣ k − 1 ( I − K k H k ) T + K k R k K k T \mathbf{P}_{k|k} = (\mathbf{I} - \mathbf{K}_k \mathbf{H}_k) \mathbf{P}_{k|k-1} (\mathbf{I} - \mathbf{K}_k \mathbf{H}_k)^T + \mathbf{K}_k \mathbf{R}_k \mathbf{K}_k^T Pk∣k=(I−KkHk)Pk∣k−1(I−KkHk)T+KkRkKkT
6.3 扩展卡尔曼滤波(EKF)
对于非线性系统:
x k + 1 = f ( x k , u k ) + w k \mathbf{x}_{k+1} = \mathbf{f}(\mathbf{x}_k, \mathbf{u}_k) + \mathbf{w}_k xk+1=f(xk,uk)+wk
z k = h ( x k ) + v k \mathbf{z}_k = \mathbf{h}(\mathbf{x}_k) + \mathbf{v}_k zk=h(xk)+vk
EKF通过线性化处理非线性函数:
雅可比矩阵:
F k = ∂ f ∂ x ∣ x ^ k − 1 ∣ k − 1 \mathbf{F}_k = \left. \frac{\partial \mathbf{f}}{\partial \mathbf{x}} \right|_{\hat{\mathbf{x}}_{k-1|k-1}} Fk=∂x∂f x^k−1∣k−1
H k = ∂ h ∂ x ∣ x ^ k ∣ k − 1 \mathbf{H}_k = \left. \frac{\partial \mathbf{h}}{\partial \mathbf{x}} \right|_{\hat{\mathbf{x}}_{k|k-1}} Hk=∂x∂h x^k∣k−1
然后应用标准卡尔曼滤波公式。
6.4 无迹卡尔曼滤波(UKF)
UKF使用无迹变换(Unscented Transform)来近似非线性变换后的均值和协方差,避免了计算雅可比矩阵。
Sigma点选择:
χ 0 = x ^ \chi_0 = \hat{\mathbf{x}} χ0=x^
χ i = x ^ + ( ( n + λ ) P ) i , i = 1 , … , n \chi_i = \hat{\mathbf{x}} + (\sqrt{(n+\lambda)\mathbf{P}})_i, \quad i=1,\ldots,n χi=x^+((n+λ)P)i,i=1,…,n
χ i = x ^ − ( ( n + λ ) P ) i − n , i = n + 1 , … , 2 n \chi_i = \hat{\mathbf{x}} - (\sqrt{(n+\lambda)\mathbf{P}})_{i-n}, \quad i=n+1,\ldots,2n χi=x^−((n+λ)P)i−n,i=n+1,…,2n
其中, λ = α 2 ( n + κ ) − n \lambda = \alpha^2(n+\kappa) - n λ=α2(n+κ)−n 是缩放参数。
权重:
W 0 ( m ) = λ n + λ W_0^{(m)} = \frac{\lambda}{n+\lambda} W0(m)=n+λλ
W 0 ( c ) = λ n + λ + ( 1 − α 2 + β ) W_0^{(c)} = \frac{\lambda}{n+\lambda} + (1-\alpha^2+\beta) W0(c)=n+λλ+(1−α2+β)
W i ( m ) = W i ( c ) = 1 2 ( n + λ ) , i = 1 , … , 2 n W_i^{(m)} = W_i^{(c)} = \frac{1}{2(n+\lambda)}, \quad i=1,\ldots,2n Wi(m)=Wi(c)=2(n+λ)1,i=1,…,2n
6.5 粒子滤波
粒子滤波使用蒙特卡洛方法近似后验分布,适用于高度非线性、非高斯系统。
算法步骤:
-
初始化:从先验分布采样 N N N 个粒子 x 0 ( i ) \mathbf{x}_0^{(i)} x0(i)
-
预测:根据状态方程传播粒子
x k ( i ) = f ( x k − 1 ( i ) , u k − 1 ) + w k − 1 ( i ) \mathbf{x}_k^{(i)} = \mathbf{f}(\mathbf{x}_{k-1}^{(i)}, \mathbf{u}_{k-1}) + \mathbf{w}_{k-1}^{(i)} xk(i)=f(xk−1(i),uk−1)+wk−1(i)
- 更新:根据观测值计算权重
w k ( i ) = p ( z k ∣ x k ( i ) ) w_k^{(i)} = p(\mathbf{z}_k | \mathbf{x}_k^{(i)}) wk(i)=p(zk∣xk(i))
- 归一化:
w ~ k ( i ) = w k ( i ) ∑ j = 1 N w k ( j ) \tilde{w}_k^{(i)} = \frac{w_k^{(i)}}{\sum_{j=1}^{N} w_k^{(j)}} w~k(i)=∑j=1Nwk(j)wk(i)
-
重采样:根据权重重采样粒子
-
估计:
x ^ k = ∑ i = 1 N w ~ k ( i ) x k ( i ) \hat{\mathbf{x}}_k = \sum_{i=1}^{N} \tilde{w}_k^{(i)} \mathbf{x}_k^{(i)} x^k=i=1∑Nw~k(i)xk(i)
数据同化框架
7.1 数据同化概述
数据同化是将观测数据与数值模型预测相结合,以获得对系统状态的最佳估计。主要方法包括:
变分方法:三维变分(3D-Var)、四维变分(4D-Var)
统计方法:最优插值(OI)、卡尔曼滤波(KF)、集合卡尔曼滤波(EnKF)
混合方法:集合变分方法(EnVar)
7.2 最优插值(OI)
最优插值是一种基于统计理论的数据同化方法,假设背景场和观测误差都是高斯分布。
分析场:
x a = x b + K ( y o − H x b ) \mathbf{x}_a = \mathbf{x}_b + \mathbf{K}(\mathbf{y}_o - \mathbf{H}\mathbf{x}_b) xa=xb+K(yo−Hxb)
其中, x b \mathbf{x}_b xb 是背景场, y o \mathbf{y}_o yo 是观测值, H \mathbf{H} H 是观测算子。
增益矩阵:
K = P b H T ( H P b H T + R ) − 1 \mathbf{K} = \mathbf{P}_b \mathbf{H}^T (\mathbf{H}\mathbf{P}_b\mathbf{H}^T + \mathbf{R})^{-1} K=PbHT(HPbHT+R)−1
分析误差协方差:
P a = ( I − K H ) P b \mathbf{P}_a = (\mathbf{I} - \mathbf{K}\mathbf{H})\mathbf{P}_b Pa=(I−KH)Pb
或:
P a = P b − P b H T ( H P b H T + R ) − 1 H P b \mathbf{P}_a = \mathbf{P}_b - \mathbf{P}_b\mathbf{H}^T(\mathbf{H}\mathbf{P}_b\mathbf{H}^T + \mathbf{R})^{-1}\mathbf{H}\mathbf{P}_b Pa=Pb−PbHT(HPbHT+R)−1HPb
7.3 集合卡尔曼滤波(EnKF)
EnKF使用蒙特卡洛集合来表示误差统计,避免了计算和存储庞大的协方差矩阵。
集合表示:
X = [ x ( 1 ) , x ( 2 ) , … , x ( N ) ] \mathbf{X} = [\mathbf{x}^{(1)}, \mathbf{x}^{(2)}, \ldots, \mathbf{x}^{(N)}] X=[x(1),x(2),…,x(N)]
集合均值:
x ˉ = 1 N ∑ i = 1 N x ( i ) \bar{\mathbf{x}} = \frac{1}{N} \sum_{i=1}^{N} \mathbf{x}^{(i)} xˉ=N1i=1∑Nx(i)
集合协方差:
P ≈ 1 N − 1 X ′ X ′ T \mathbf{P} \approx \frac{1}{N-1} \mathbf{X}' \mathbf{X}'^T P≈N−11X′X′T
其中, X ′ = X − x ˉ 1 T \mathbf{X}' = \mathbf{X} - \bar{\mathbf{x}}\mathbf{1}^T X′=X−xˉ1T 是扰动矩阵。
分析步骤:
-
对每个集合成员进行预测
-
计算集合均值和协方差
-
计算卡尔曼增益
-
更新集合成员:
x a ( i ) = x b ( i ) + K ( y o ( i ) − H x b ( i ) ) \mathbf{x}_a^{(i)} = \mathbf{x}_b^{(i)} + \mathbf{K}(\mathbf{y}_o^{(i)} - \mathbf{H}\mathbf{x}_b^{(i)}) xa(i)=xb(i)+K(yo(i)−Hxb(i))
其中, y o ( i ) \mathbf{y}_o^{(i)} yo(i) 是扰动观测。
7.4 协方差局地化
由于集合规模有限,估计的协方差矩阵可能存在虚假相关。局地化通过Schur乘积抑制远距离相关:
P l o c = C ∘ P \mathbf{P}_{loc} = \mathbf{C} \circ \mathbf{P} Ploc=C∘P
其中, C \mathbf{C} C 是局地化相关矩阵,通常使用Gaspari-Cohn函数:
c ( r ) = { 1 − 5 3 ( r L ) 2 + 5 8 ( r L ) 3 + 1 2 ( r L ) 4 − 1 4 ( r L ) 5 0 ≤ r ≤ L 4 − 5 r L + 5 3 ( r L ) 2 + 5 8 ( r L ) 3 − 1 2 ( r L ) 4 + 1 12 ( r L ) 5 L < r ≤ 2 L 0 r > 2 L c(r) = \begin{cases} 1 - \frac{5}{3}\left(\frac{r}{L}\right)^2 + \frac{5}{8}\left(\frac{r}{L}\right)^3 + \frac{1}{2}\left(\frac{r}{L}\right)^4 - \frac{1}{4}\left(\frac{r}{L}\right)^5 & 0 \leq r \leq L \\ 4 - 5\frac{r}{L} + \frac{5}{3}\left(\frac{r}{L}\right)^2 + \frac{5}{8}\left(\frac{r}{L}\right)^3 - \frac{1}{2}\left(\frac{r}{L}\right)^4 + \frac{1}{12}\left(\frac{r}{L}\right)^5 & L < r \leq 2L \\ 0 & r > 2L \end{cases} c(r)=⎩ ⎨ ⎧1−35(Lr)2+85(Lr)3+21(Lr)4−41(Lr)54−5Lr+35(Lr)2+85(Lr)3−21(Lr)4+121(Lr)500≤r≤LL<r≤2Lr>2L
7.5 膨胀技术
由于采样误差和模型误差,集合往往会低估真实的不确定性。膨胀技术通过放大集合扰动来解决这个问题:
乘法膨胀:
x ′ ( i ) ← α x ′ ( i ) \mathbf{x}'^{(i)} \leftarrow \alpha \mathbf{x}'^{(i)} x′(i)←αx′(i)
其中, α > 1 \alpha > 1 α>1 是膨胀系数。
加法膨胀:
x ( i ) ← x ( i ) + η ( i ) \mathbf{x}^{(i)} \leftarrow \mathbf{x}^{(i)} + \boldsymbol{\eta}^{(i)} x(i)←x(i)+η(i)
其中, η ( i ) ∼ N ( 0 , Q ) \boldsymbol{\eta}^{(i)} \sim \mathcal{N}(0, \mathbf{Q}) η(i)∼N(0,Q) 是随机扰动。
Python实现详解
8.1 代码结构概览
本教程的Python代码实现了完整的磁场测量与数据同化流程,主要包含以下模块:
# 配置参数
class MeasurementConfig:
"""磁场测量配置参数"""
nx: int = 50 # x方向网格数
ny: int = 50 # y方向网格数
n_sensors: int = 16 # 传感器数量
sensor_noise_std: float = 0.01 # 传感器噪声标准差
...
# 磁场正演模型
class MagneticFieldModel:
"""磁场正演模型"""
def compute_field_dipole(...)
def compute_field(...)
...
# 传感器模型
class MagneticSensor:
"""磁场传感器模型"""
def measure(...)
...
# 传感器阵列
class SensorArray:
"""传感器阵列"""
def uniform_layout(...)
def circular_layout(...)
def optimized_layout(...)
...
# 卡尔曼滤波
class KalmanFilter:
"""卡尔曼滤波器"""
def predict(...)
def update(...)
...
# 磁场反演
class MagneticInversion:
"""磁场反演"""
def invert(...)
...
# 数据同化
class DataAssimilation:
"""数据同化框架"""
def ensemble_kalman_filter(...)
def optimal_interpolation(...)
...
8.2 磁场正演模型实现
磁场正演模型是仿真的基础,用于计算给定磁源分布产生的磁场:
class MagneticFieldModel:
def compute_field_dipole(self, x, y, source_pos, moment):
"""
计算单个偶极子在观测点产生的磁场
参数:
x, y: 观测点坐标
source_pos: 磁源位置 [x_s, y_s]
moment: 磁矩向量 [m_x, m_y]
返回:
Bx, By: 磁场分量
"""
# 计算相对位置
dx = x - source_pos[0]
dy = y - source_pos[1]
r = np.sqrt(dx**2 + dy**2)
# 避免除零
if r < 1e-10:
return 0.0, 0.0
# 磁偶极子场公式
r_vec = np.array([dx, dy])
m_dot_r = np.dot(moment, r_vec)
# 磁场系数
coeff = self.config.mu0 / (4 * np.pi * r**3)
# 计算磁场向量
B = coeff * (3 * m_dot_r * r_vec / r**2 - moment)
return B[0], B[1]
代码解析:
- 第1-2行:计算观测点相对于磁源的位移
- 第4行:计算距离,用于后续场强计算
- 第6-7行:防止距离过近导致的数值不稳定
- 第9-10行:计算磁矩与位置矢量的点积
- 第12行:应用磁偶极子场公式中的系数
- 第14行:计算完整的磁场向量
- 第16行:返回磁场分量
8.3 传感器模型实现
传感器模型模拟真实传感器的测量过程,包括噪声和非线性效应:
class MagneticSensor:
def measure(self, B_true, position=None):
"""
模拟传感器测量
参数:
B_true: 真实磁场值
position: 传感器位置(可选)
返回:
B_measured: 测量值(含噪声)
"""
# 添加高斯噪声
noise = np.random.normal(
0,
self.config.sensor_noise_std,
size=B_true.shape
)
# 添加量化误差
quantization = self.resolution * np.random.uniform(
-0.5, 0.5,
size=B_true.shape
)
# 添加非线性(饱和效应)
B_saturated = np.clip(
B_true,
-self.linear_range,
self.linear_range
)
# 合成测量值
B_measured = B_saturated + noise + quantization
return B_measured
代码解析:
- 第9-13行:生成高斯白噪声,模拟热噪声等随机干扰
- 第15-19行:添加量化噪声,模拟ADC转换过程
- 第21-25行:应用饱和非线性,模拟传感器的线性范围限制
- 第27行:将各项误差叠加到真实值上
8.4 传感器布局优化实现
基于D-最优准则的传感器布局优化:
def optimized_layout(self, field_model):
"""优化传感器布局(基于D-最优准则)"""
n = self.config.n_sensors
L = self.config.domain_size
# 初始布局
x0 = self.uniform_layout().flatten()
# 目标函数:最大化Fisher信息矩阵的行列式
def objective(x):
positions = x.reshape(n, 2)
# 边界检查
if np.any(positions < 0) or np.any(positions > L):
return 1e10
# 计算灵敏度矩阵
H = self.compute_sensitivity_matrix(positions, field_model)
# Fisher信息矩阵
I = H.T @ H
# 最小化负对数行列式
try:
sign, logdet = np.linalg.slogdet(I)
if sign <= 0:
return 1e10
return -logdet
except:
return 1e10
# 执行优化
bounds = [(0, L) for _ in range(2*n)]
result = minimize(objective, x0, method='L-BFGS-B', bounds=bounds)
self.positions = result.x.reshape(n, 2)
return self.positions
代码解析:
- 第6行:使用均匀布局作为初始猜测
- 第9-25行:定义目标函数,计算负对数行列式
- 第13-14行:检查约束条件,惩罚越界解
- 第16-19行:计算Fisher信息矩阵
- 第21-25行:使用对数行列式避免数值溢出
- 第28-29行:设置边界约束并调用优化器
8.5 卡尔曼滤波实现
标准卡尔曼滤波的预测和更新步骤:
class KalmanFilter:
def predict(self, F, Q):
"""
预测步骤
参数:
F: 状态转移矩阵
Q: 过程噪声协方差
"""
# 状态预测
self.x = F @ self.x
# 协方差预测
self.P = F @ self.P @ F.T + Q
def update(self, z, H, R):
"""
更新步骤
参数:
z: 测量值
H: 观测矩阵
R: 测量噪声协方差
"""
# 创新
y = z - H @ self.x
# 创新协方差
S = H @ self.P @ H.T + R
# 卡尔曼增益
K = self.P @ H.T @ inv(S)
# 状态更新
self.x = self.x + K @ y
# 协方差更新 (Joseph形式)
I_KH = np.eye(self.n_state) - K @ H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T
代码解析:
-
predict方法:- 第11行:状态外推
- 第14行:协方差外推,包含过程噪声
-
update方法:- 第24行:计算新息(测量残差)
- 第27行:计算新息协方差
- 第30行:计算卡尔曼增益,权衡预测与测量
- 第33行:更新状态估计
- 第36-37行:使用Joseph形式更新协方差,保证正定性
8.6 磁场反演实现
基于最小二乘的磁场反演:
class MagneticInversion:
def invert(self, measurements, sensor_positions, field_model):
"""
执行磁场反演
参数:
measurements: 测量值
sensor_positions: 传感器位置
field_model: 磁场模型
返回:
result: 反演结果字典
"""
n_sources = self.config.n_sources
# 初始猜测
L = self.config.domain_size
initial_guess = np.random.uniform(
0.2*L, 0.8*L,
size=4*n_sources
)
# 设置边界约束
bounds = []
for i in range(n_sources):
bounds.extend([(0, L), (0, L)]) # 位置边界
bounds.extend([(-2, 2), (-2, 2)]) # 磁矩边界
# 执行优化
result = minimize(
self.objective_function,
initial_guess,
args=(measurements, sensor_positions, field_model),
method='L-BFGS-B',
bounds=bounds
)
# 提取结果
estimated_params = result.x
estimated_positions = np.array([
estimated_params[4*i:4*i+2]
for i in range(n_sources)
])
estimated_moments = np.array([
estimated_params[4*i+2:4*i+4]
for i in range(n_sources)
])
return {
'positions': estimated_positions,
'moments': estimated_moments,
'residual': result.fun,
'success': result.success
}
代码解析:
- 第14-17行:生成随机初始猜测,避免陷入局部最优
- 第20-23行:设置位置和磁矩的边界约束
- 第26-32行:调用L-BFGS-B优化器求解非线性最小二乘问题
- 第35-44行:从优化结果中提取磁源位置和磁矩
8.7 数据同化实现
最优插值算法实现:
class DataAssimilation:
def optimal_interpolation(self, x_b, P_b, y_o, H, R):
"""
最优插值
参数:
x_b: 背景场
P_b: 背景误差协方差
y_o: 观测值
H: 观测算子
R: 观测误差协方差
返回:
x_a: 分析场
P_a: 分析误差协方差
"""
# 创新
d = y_o - H @ x_b
# 创新协方差
S = H @ P_b @ H.T + R
# 增益矩阵
K = P_b @ H.T @ inv(S)
# 分析场
x_a = x_b + K @ d
# 分析误差协方差
P_a = P_b - K @ H @ P_b
return x_a, P_a
代码解析:
- 第16行:计算创新(观测与背景场的差异)
- 第19行:计算创新协方差,包含背景和观测误差
- 第22行:计算增益矩阵,确定背景场和观测的权重
- 第25行:更新分析场,融合背景场和观测信息
- 第28行:更新分析误差协方差
附录:完整代码
完整代码已包含在 run_magnetic_measurement.py 文件中,包含以下功能模块:
MeasurementConfig:配置参数类MagneticFieldModel:磁场正演模型MagneticSensor:传感器模型SensorArray:传感器阵列管理KalmanFilter:卡尔曼滤波器MagneticInversion:磁场反演DataAssimilation:数据同化框架- 可视化函数:绘制各种结果图表
运行代码将生成以下输出文件:
field_and_sensors.png:磁场分布和传感器布局sensor_layout_comparison.png:不同布局对比inversion_result.png:磁场反演结果kalman_filter_trajectory.png:卡尔曼滤波跟踪data_assimilation_result.png:数据同化结果
"""
主题079:磁场测量与数据同化
=====================================
本代码实现磁场测量、传感器阵列布局优化和数据同化,包括:
1. 磁场传感器模型(霍尔效应、磁阻效应)
2. 传感器阵列设计与优化
3. 卡尔曼滤波与扩展卡尔曼滤波
4. 磁场反演与源重构
5. 数据同化框架
作者:仿真教学专家
日期:2026年3月
"""
import numpy as np
import matplotlib
matplotlib.use('Agg') # 使用无头模式
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Tuple, List, Dict, Optional, Callable
from scipy.linalg import solve, inv, cholesky, sqrtm
from scipy.optimize import minimize, least_squares
from scipy.interpolate import griddata
import warnings
warnings.filterwarnings('ignore')
# ==================== 配置参数 ====================
@dataclass
class MeasurementConfig:
"""磁场测量配置参数"""
# 网格参数
nx: int = 50 # x方向网格数
ny: int = 50 # y方向网格数
domain_size: float = 1.0 # 计算域尺寸
# 传感器参数
n_sensors: int = 16 # 传感器数量
sensor_noise_std: float = 0.01 # 传感器噪声标准差
sensor_type: str = 'hall' # 传感器类型: 'hall', 'mr', 'fluxgate'
# 磁源参数
n_sources: int = 3 # 磁源数量
source_strength: float = 1.0 # 磁源强度
# 卡尔曼滤波参数
process_noise: float = 0.001 # 过程噪声
measurement_noise: float = 0.01 # 测量噪声
# 物理参数
mu0: float = 4 * np.pi * 1e-7 # 真空磁导率
# ==================== 磁场正演模型 ====================
class MagneticFieldModel:
"""
磁场正演模型
模拟磁源产生的磁场分布
"""
def __init__(self, config: MeasurementConfig):
self.config = config
self.setup_grid()
self.setup_sources()
def setup_grid(self):
"""设置计算网格"""
nx, ny = self.config.nx, self.config.ny
L = self.config.domain_size
self.x = np.linspace(0, L, nx)
self.y = np.linspace(0, L, ny)
self.X, self.Y = np.meshgrid(self.x, self.y)
# 网格间距
self.dx = L / (nx - 1)
self.dy = L / (ny - 1)
def setup_sources(self):
"""设置磁源位置"""
np.random.seed(42)
L = self.config.domain_size
# 随机放置磁源(避免边界)
margin = 0.2 * L
self.source_positions = np.random.uniform(
margin, L - margin,
size=(self.config.n_sources, 2)
)
# 磁源强度(随机方向)
self.source_moments = np.random.randn(self.config.n_sources, 2)
self.source_moments *= self.config.source_strength / \
np.linalg.norm(self.source_moments, axis=1, keepdims=True)
print(f" 磁源位置:")
for i, pos in enumerate(self.source_positions):
print(f" Source {i+1}: ({pos[0]:.3f}, {pos[1]:.3f}), "
f"moment=({self.source_moments[i,0]:.3f}, {self.source_moments[i,1]:.3f})")
def compute_field_dipole(self, x: float, y: float,
source_pos: np.ndarray,
moment: np.ndarray) -> Tuple[float, float]:
"""
计算单个偶极子在观测点产生的磁场
参数:
x, y: 观测点坐标
source_pos: 磁源位置
moment: 磁矩向量
返回:
Bx, By: 磁场分量
"""
dx = x - source_pos[0]
dy = y - source_pos[1]
r = np.sqrt(dx**2 + dy**2)
if r < 1e-10:
return 0.0, 0.0
# 磁偶极子场公式 (2D简化)
# B = (mu0 / 4*pi*r^3) * [3(m·r)r/r^2 - m]
r_vec = np.array([dx, dy])
m_dot_r = np.dot(moment, r_vec)
coeff = self.config.mu0 / (4 * np.pi * r**3)
B = coeff * (3 * m_dot_r * r_vec / r**2 - moment)
return B[0], B[1]
def compute_field(self, x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
计算总磁场
参数:
x, y: 观测点坐标数组
返回:
Bx, By: 磁场分量数组
"""
Bx = np.zeros_like(x)
By = np.zeros_like(y)
for i in range(len(x)):
for j in range(self.config.n_sources):
bx, by = self.compute_field_dipole(
x[i], y[i],
self.source_positions[j],
self.source_moments[j]
)
Bx[i] += bx
By[i] += by
return Bx, By
def compute_field_grid(self) -> Tuple[np.ndarray, np.ndarray]:
"""计算网格上的磁场"""
Bx = np.zeros_like(self.X)
By = np.zeros_like(self.Y)
for i in range(self.config.nx):
for j in range(self.config.ny):
bx, by = self.compute_field(
np.array([self.X[j, i]]),
np.array([self.Y[j, i]])
)
Bx[j, i] = bx[0]
By[j, i] = by[0]
return Bx, By
def get_field_magnitude(self, Bx: np.ndarray, By: np.ndarray) -> np.ndarray:
"""计算磁场幅值"""
return np.sqrt(Bx**2 + By**2)
# ==================== 传感器模型 ====================
class MagneticSensor:
"""
磁场传感器模型
支持多种传感器类型
"""
def __init__(self, config: MeasurementConfig):
self.config = config
self.setup_sensor_characteristics()
def setup_sensor_characteristics(self):
"""设置传感器特性"""
sensor_type = self.config.sensor_type
if sensor_type == 'hall':
# 霍尔效应传感器
self.sensitivity = 100 # mV/T
self.offset = 0.0 # mV
self.linear_range = 1.0 # T
self.resolution = 0.1e-3 # T
elif sensor_type == 'mr':
# 磁阻传感器
self.sensitivity = 500 # mV/T
self.offset = 2.5 # mV (半桥)
self.linear_range = 0.1 # T
self.resolution = 0.01e-3 # T
elif sensor_type == 'fluxgate':
# 磁通门传感器
self.sensitivity = 1000 # mV/T
self.offset = 0.0 # mV
self.linear_range = 0.1 # T
self.resolution = 1e-6 # T
else:
raise ValueError(f"Unknown sensor type: {sensor_type}")
print(f" 传感器类型: {sensor_type}")
print(f" 灵敏度: {self.sensitivity} mV/T")
print(f" 线性范围: ±{self.linear_range} T")
print(f" 分辨率: {self.resolution*1e6:.1f} μT")
def measure(self, B_true: np.ndarray,
position: Optional[np.ndarray] = None) -> np.ndarray:
"""
模拟传感器测量
参数:
B_true: 真实磁场值
position: 传感器位置(用于添加位置相关误差)
返回:
B_measured: 测量值(含噪声)
"""
# 添加高斯噪声
noise = np.random.normal(0, self.config.sensor_noise_std, size=B_true.shape)
# 添加量化误差
quantization = self.resolution * np.random.uniform(-0.5, 0.5, size=B_true.shape)
# 添加非线性(饱和效应)
B_saturated = np.clip(B_true, -self.linear_range, self.linear_range)
B_measured = B_saturated + noise + quantization
return B_measured
def get_noise_covariance(self, n_measurements: int) -> np.ndarray:
"""获取噪声协方差矩阵"""
return np.eye(n_measurements) * self.config.sensor_noise_std**2
# ==================== 传感器阵列 ====================
class SensorArray:
"""
传感器阵列
管理多个传感器的布局和测量
"""
def __init__(self, config: MeasurementConfig):
self.config = config
self.positions = None
def uniform_layout(self):
"""均匀布局"""
n = self.config.n_sensors
L = self.config.domain_size
# 网格布局
n_side = int(np.sqrt(n))
x_pos = np.linspace(0.1*L, 0.9*L, n_side)
y_pos = np.linspace(0.1*L, 0.9*L, n_side)
positions = []
for x in x_pos:
for y in y_pos:
positions.append([x, y])
self.positions = np.array(positions[:n])
return self.positions
def circular_layout(self):
"""圆形布局"""
n = self.config.n_sensors
L = self.config.domain_size
center = L / 2
radius = 0.35 * L
angles = np.linspace(0, 2*np.pi, n, endpoint=False)
self.positions = np.array([
[center + radius * np.cos(a), center + radius * np.sin(a)]
for a in angles
])
return self.positions
def random_layout(self):
"""随机布局"""
np.random.seed(42)
n = self.config.n_sensors
L = self.config.domain_size
margin = 0.1 * L
self.positions = np.random.uniform(
margin, L - margin, size=(n, 2)
)
return self.positions
def optimized_layout(self, field_model: MagneticFieldModel):
"""
优化传感器布局(基于D-最优准则)
参数:
field_model: 磁场模型
返回:
positions: 优化后的位置
"""
print(" 优化传感器布局...")
n = self.config.n_sensors
L = self.config.domain_size
# 初始布局
x0 = self.uniform_layout().flatten()
# 定义目标函数(最大化Fisher信息矩阵的行列式)
def objective(x):
positions = x.reshape(n, 2)
# 确保在域内
if np.any(positions < 0) or np.any(positions > L):
return 1e10
# 计算灵敏度矩阵
H = self.compute_sensitivity_matrix(positions, field_model)
# Fisher信息矩阵
I = H.T @ H
# 最小化负对数行列式(等价于最大化行列式)
try:
sign, logdet = np.linalg.slogdet(I)
if sign <= 0:
return 1e10
return -logdet
except:
return 1e10
# 优化
bounds = [(0, L) for _ in range(2*n)]
result = minimize(objective, x0, method='L-BFGS-B', bounds=bounds)
self.positions = result.x.reshape(n, 2)
# 计算优化后的指标
H = self.compute_sensitivity_matrix(self.positions, field_model)
I = H.T @ H
det_I = np.linalg.det(I)
print(f" Fisher信息矩阵行列式: {det_I:.6e}")
return self.positions
def compute_sensitivity_matrix(self, positions: np.ndarray,
field_model: MagneticFieldModel) -> np.ndarray:
"""
计算灵敏度矩阵
参数:
positions: 传感器位置
field_model: 磁场模型
返回:
H: 灵敏度矩阵
"""
n_sensors = len(positions)
n_params = 2 * self.config.n_sources # 每个源有x,y位置
H = np.zeros((n_sensors, n_params))
# 数值微分计算灵敏度
eps = 1e-6
for i, pos in enumerate(positions):
# 基准测量
B0, _ = field_model.compute_field(np.array([pos[0]]), np.array([pos[1]]))
for j in range(self.config.n_sources):
# 对源位置x的灵敏度
field_model.source_positions[j, 0] += eps
Bx, _ = field_model.compute_field(np.array([pos[0]]), np.array([pos[1]]))
H[i, 2*j] = (Bx[0] - B0[0]) / eps
field_model.source_positions[j, 0] -= eps
# 对源位置y的灵敏度
field_model.source_positions[j, 1] += eps
By, _ = field_model.compute_field(np.array([pos[0]]), np.array([pos[1]]))
H[i, 2*j+1] = (By[0] - B0[0]) / eps
field_model.source_positions[j, 1] -= eps
return H
def measure_field(self, field_model: MagneticFieldModel,
sensor: MagneticSensor) -> np.ndarray:
"""
使用传感器阵列测量磁场
参数:
field_model: 磁场模型
sensor: 传感器
返回:
measurements: 测量值
"""
if self.positions is None:
self.uniform_layout()
# 计算真实磁场
Bx_true, By_true = field_model.compute_field(
self.positions[:, 0], self.positions[:, 1]
)
# 测量幅值
B_mag = np.sqrt(Bx_true**2 + By_true**2)
# 添加测量噪声
measurements = sensor.measure(B_mag, self.positions)
return measurements
# ==================== 卡尔曼滤波 ====================
class KalmanFilter:
"""
卡尔曼滤波器
用于磁场状态估计
"""
def __init__(self, config: MeasurementConfig):
self.config = config
def initialize(self, x0: np.ndarray, P0: np.ndarray):
"""
初始化滤波器
参数:
x0: 初始状态估计
P0: 初始协方差矩阵
"""
self.x = x0.copy()
self.P = P0.copy()
self.n_state = len(x0)
def predict(self, F: np.ndarray, Q: np.ndarray):
"""
预测步骤
参数:
F: 状态转移矩阵
Q: 过程噪声协方差
"""
# 状态预测
self.x = F @ self.x
# 协方差预测
self.P = F @ self.P @ F.T + Q
def update(self, z: np.ndarray, H: np.ndarray, R: np.ndarray):
"""
更新步骤
参数:
z: 测量值
H: 观测矩阵
R: 测量噪声协方差
"""
# 创新
y = z - H @ self.x
# 创新协方差
S = H @ self.P @ H.T + R
# 卡尔曼增益
K = self.P @ H.T @ inv(S)
# 状态更新
self.x = self.x + K @ y
# 协方差更新 (Joseph形式,数值稳定)
I_KH = np.eye(self.n_state) - K @ H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T
def get_state(self) -> Tuple[np.ndarray, np.ndarray]:
"""获取当前状态估计和协方差"""
return self.x.copy(), self.P.copy()
class ExtendedKalmanFilter(KalmanFilter):
"""
扩展卡尔曼滤波器
用于非线性磁场反演
"""
def __init__(self, config: MeasurementConfig):
super().__init__(config)
def update_nonlinear(self, z: np.ndarray, h_func: Callable,
H_jacobian: np.ndarray, R: np.ndarray):
"""
非线性更新步骤
参数:
z: 测量值
h_func: 观测函数
H_jacobian: 观测函数雅可比矩阵
R: 测量噪声协方差
"""
# 预测测量
z_pred = h_func(self.x)
# 创新
y = z - z_pred
# 创新协方差
S = H_jacobian @ self.P @ H_jacobian.T + R
# 卡尔曼增益
K = self.P @ H_jacobian.T @ inv(S)
# 状态更新
self.x = self.x + K @ y
# 协方差更新
I_KH = np.eye(self.n_state) - K @ H_jacobian
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T
# ==================== 磁场反演 ====================
class MagneticInversion:
"""
磁场反演
从测量数据重构磁源参数
"""
def __init__(self, config: MeasurementConfig):
self.config = config
def objective_function(self, params: np.ndarray,
measurements: np.ndarray,
sensor_positions: np.ndarray,
field_model: MagneticFieldModel) -> float:
"""
反演目标函数(最小二乘)
参数:
params: 待求参数 [x1, y1, mx1, my1, x2, y2, mx2, my2, ...]
measurements: 测量值
sensor_positions: 传感器位置
field_model: 磁场模型
返回:
residual: 残差平方和
"""
n_sources = self.config.n_sources
# 更新模型参数
for i in range(n_sources):
field_model.source_positions[i] = params[4*i:4*i+2]
field_model.source_moments[i] = params[4*i+2:4*i+4]
# 计算预测场
Bx_pred, By_pred = field_model.compute_field(
sensor_positions[:, 0], sensor_positions[:, 1]
)
B_pred = np.sqrt(Bx_pred**2 + By_pred**2)
# 残差
residual = np.sum((measurements - B_pred)**2)
return residual
def invert(self, measurements: np.ndarray,
sensor_positions: np.ndarray,
field_model: MagneticFieldModel,
initial_guess: Optional[np.ndarray] = None) -> Dict:
"""
执行反演
参数:
measurements: 测量值
sensor_positions: 传感器位置
field_model: 磁场模型
initial_guess: 初始猜测
返回:
result: 反演结果
"""
print(" 执行磁场反演...")
n_sources = self.config.n_sources
# 初始猜测
if initial_guess is None:
L = self.config.domain_size
initial_guess = np.random.uniform(0.2*L, 0.8*L, size=4*n_sources)
# 优化
bounds = []
L = self.config.domain_size
for i in range(n_sources):
# 位置边界
bounds.extend([(0, L), (0, L)])
# 磁矩边界
bounds.extend([(-2, 2), (-2, 2)])
result = minimize(
self.objective_function,
initial_guess,
args=(measurements, sensor_positions, field_model),
method='L-BFGS-B',
bounds=bounds
)
# 提取结果
estimated_params = result.x
estimated_positions = np.array([
estimated_params[4*i:4*i+2] for i in range(n_sources)
])
estimated_moments = np.array([
estimated_params[4*i+2:4*i+4] for i in range(n_sources)
])
print(f" 优化收敛: {result.success}")
print(f" 残差: {result.fun:.6e}")
return {
'positions': estimated_positions,
'moments': estimated_moments,
'residual': result.fun,
'success': result.success
}
# ==================== 数据同化 ====================
class DataAssimilation:
"""
数据同化框架
融合模型预测和观测数据
"""
def __init__(self, config: MeasurementConfig):
self.config = config
def ensemble_kalman_filter(self, ensemble: np.ndarray,
measurements: np.ndarray,
H: np.ndarray, R: np.ndarray) -> np.ndarray:
"""
集合卡尔曼滤波(EnKF)
参数:
ensemble: 状态集合 (n_state x n_ensemble)
measurements: 观测值
H: 观测矩阵
R: 观测误差协方差
返回:
updated_ensemble: 更新后的集合
"""
n_state, n_ensemble = ensemble.shape
n_obs = len(measurements)
# 集合均值
x_mean = np.mean(ensemble, axis=1)
# 扰动观测
Z = np.random.multivariate_normal(measurements, R, n_ensemble).T
# 预测观测
Y = H @ ensemble
# 观测集合均值
y_mean = np.mean(Y, axis=1)
# 协方差
P_xy = (ensemble - x_mean[:, np.newaxis]) @ (Y - y_mean[:, np.newaxis]).T / (n_ensemble - 1)
P_yy = (Y - y_mean[:, np.newaxis]) @ (Y - y_mean[:, np.newaxis]).T / (n_ensemble - 1)
# 卡尔曼增益
K = P_xy @ inv(P_yy + R)
# 更新集合
updated_ensemble = ensemble + K @ (Z - Y)
return updated_ensemble
def optimal_interpolation(self, x_b: np.ndarray, P_b: np.ndarray,
y_o: np.ndarray, H: np.ndarray,
R: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
最优插值(OI)
参数:
x_b: 背景场
P_b: 背景误差协方差
y_o: 观测值
H: 观测算子
R: 观测误差协方差
返回:
x_a: 分析场
P_a: 分析误差协方差
"""
# 创新
d = y_o - H @ x_b
# 创新协方差
S = H @ P_b @ H.T + R
# 增益矩阵
K = P_b @ H.T @ inv(S)
# 分析场
x_a = x_b + K @ d
# 分析误差协方差
P_a = P_b - K @ H @ P_b
return x_a, P_a
# ==================== 可视化函数 ====================
def plot_field_and_sensors(field_model: MagneticFieldModel,
sensor_array: SensorArray,
measurements: np.ndarray,
filename: str):
"""绘制磁场和传感器布局"""
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 计算网格场
Bx, By = field_model.compute_field_grid()
B_mag = np.sqrt(Bx**2 + By**2)
# 绘制磁场幅值
im1 = axes[0].contourf(field_model.X, field_model.Y, B_mag,
levels=20, cmap='viridis')
axes[0].set_xlabel('x (m)', fontsize=12)
axes[0].set_ylabel('y (m)', fontsize=12)
axes[0].set_title('Magnetic Field Magnitude', fontsize=14)
plt.colorbar(im1, ax=axes[0], label='|B| (T)')
# 绘制传感器位置
axes[0].scatter(sensor_array.positions[:, 0],
sensor_array.positions[:, 1],
c='red', s=100, marker='o',
edgecolors='white', linewidths=2,
label='Sensors')
# 绘制磁源位置
axes[0].scatter(field_model.source_positions[:, 0],
field_model.source_positions[:, 1],
c='yellow', s=200, marker='*',
edgecolors='black', linewidths=1,
label='Sources')
axes[0].legend(loc='upper right')
# 绘制测量值
axes[1].bar(range(len(measurements)), measurements,
color='steelblue', edgecolor='navy')
axes[1].set_xlabel('Sensor Index', fontsize=12)
axes[1].set_ylabel('Measured |B| (T)', fontsize=12)
axes[1].set_title('Sensor Measurements', fontsize=14)
axes[1].grid(True, alpha=0.3, axis='y')
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.close()
print(f" 磁场和传感器布局已保存: {filename}")
def plot_inversion_result(field_model: MagneticFieldModel,
inversion_result: Dict,
sensor_array: SensorArray,
measurements: np.ndarray,
filename: str):
"""绘制反演结果"""
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 真实源位置
axes[0].scatter(field_model.source_positions[:, 0],
field_model.source_positions[:, 1],
c='green', s=300, marker='*',
edgecolors='darkgreen', linewidths=2,
label='True Sources', zorder=5)
# 估计源位置
axes[0].scatter(inversion_result['positions'][:, 0],
inversion_result['positions'][:, 1],
c='red', s=200, marker='x',
linewidths=3,
label='Estimated Sources', zorder=5)
# 传感器位置
axes[0].scatter(sensor_array.positions[:, 0],
sensor_array.positions[:, 1],
c='blue', s=100, marker='o',
edgecolors='white', linewidths=2,
label='Sensors', zorder=4)
# 绘制连接线(真实vs估计)
for i in range(len(field_model.source_positions)):
axes[0].plot([field_model.source_positions[i, 0],
inversion_result['positions'][i, 0]],
[field_model.source_positions[i, 1],
inversion_result['positions'][i, 1]],
'k--', alpha=0.5, zorder=3)
axes[0].set_xlabel('x (m)', fontsize=12)
axes[0].set_ylabel('y (m)', fontsize=12)
axes[0].set_title('Source Localization', fontsize=14)
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].axis('equal')
# 计算位置误差
errors = np.linalg.norm(
field_model.source_positions - inversion_result['positions'],
axis=1
)
# 误差条形图
axes[1].bar(range(len(errors)), errors,
color='coral', edgecolor='darkred')
axes[1].set_xlabel('Source Index', fontsize=12)
axes[1].set_ylabel('Position Error (m)', fontsize=12)
axes[1].set_title('Localization Error', fontsize=14)
axes[1].grid(True, alpha=0.3, axis='y')
# 添加平均误差文本
avg_error = np.mean(errors)
axes[1].axhline(y=avg_error, color='blue', linestyle='--',
label=f'Mean: {avg_error:.4f} m')
axes[1].legend()
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.close()
print(f" 反演结果已保存: {filename}")
def plot_kalman_filter_trajectory(true_trajectory: np.ndarray,
estimated_trajectory: np.ndarray,
covariance_history: List[np.ndarray],
measurements: np.ndarray,
filename: str):
"""绘制卡尔曼滤波轨迹"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
n_steps = len(true_trajectory)
time = np.arange(n_steps)
# 轨迹对比
axes[0, 0].plot(true_trajectory[:, 0], true_trajectory[:, 1],
'g-', linewidth=2, label='True Trajectory')
axes[0, 0].plot(estimated_trajectory[:, 0], estimated_trajectory[:, 1],
'r--', linewidth=1.5, label='Estimated Trajectory')
axes[0, 0].scatter(measurements[:, 0], measurements[:, 1],
c='blue', s=20, alpha=0.5, label='Measurements')
axes[0, 0].set_xlabel('x (m)', fontsize=11)
axes[0, 0].set_ylabel('y (m)', fontsize=11)
axes[0, 0].set_title('Trajectory Tracking', fontsize=12)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 0].axis('equal')
# x位置误差
x_error = estimated_trajectory[:, 0] - true_trajectory[:, 0]
x_std = np.sqrt([P[0, 0] for P in covariance_history])
axes[0, 1].plot(time, x_error, 'b-', linewidth=1.5, label='Error')
axes[0, 1].fill_between(time, -2*x_std, 2*x_std,
alpha=0.3, color='red', label='±2σ')
axes[0, 1].set_xlabel('Time Step', fontsize=11)
axes[0, 1].set_ylabel('x Error (m)', fontsize=11)
axes[0, 1].set_title('Position Error (x)', fontsize=12)
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# y位置误差
y_error = estimated_trajectory[:, 1] - true_trajectory[:, 1]
y_std = np.sqrt([P[1, 1] for P in covariance_history])
axes[1, 0].plot(time, y_error, 'b-', linewidth=1.5, label='Error')
axes[1, 0].fill_between(time, -2*y_std, 2*y_std,
alpha=0.3, color='red', label='±2σ')
axes[1, 0].set_xlabel('Time Step', fontsize=11)
axes[1, 0].set_ylabel('y Error (m)', fontsize=11)
axes[1, 0].set_title('Position Error (y)', fontsize=12)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# 总误差
total_error = np.sqrt(x_error**2 + y_error**2)
axes[1, 1].semilogy(time, total_error, 'k-', linewidth=1.5)
axes[1, 1].set_xlabel('Time Step', fontsize=11)
axes[1, 1].set_ylabel('Total Error (m)', fontsize=11)
axes[1, 1].set_title('Total Position Error (log)', fontsize=12)
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.close()
print(f" 卡尔曼滤波轨迹已保存: {filename}")
def plot_sensor_layout_comparison(field_model: MagneticFieldModel,
layouts: Dict[str, np.ndarray],
filename: str):
"""比较不同传感器布局"""
n_layouts = len(layouts)
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
axes = axes.flatten()
# 计算网格场
Bx, By = field_model.compute_field_grid()
B_mag = np.sqrt(Bx**2 + By**2)
for idx, (name, positions) in enumerate(layouts.items()):
if idx >= 4:
break
ax = axes[idx]
# 绘制磁场
im = ax.contourf(field_model.X, field_model.Y, B_mag,
levels=15, cmap='viridis', alpha=0.7)
# 绘制传感器
ax.scatter(positions[:, 0], positions[:, 1],
c='red', s=150, marker='o',
edgecolors='white', linewidths=2,
label='Sensors')
# 绘制磁源
ax.scatter(field_model.source_positions[:, 0],
field_model.source_positions[:, 1],
c='yellow', s=300, marker='*',
edgecolors='black', linewidths=1.5,
label='Sources')
ax.set_xlabel('x (m)', fontsize=11)
ax.set_ylabel('y (m)', fontsize=11)
ax.set_title(f'{name} Layout', fontsize=12)
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.close()
print(f" 传感器布局对比已保存: {filename}")
def plot_data_assimilation_comparison(true_field: np.ndarray,
background: np.ndarray,
analysis: np.ndarray,
sensor_positions: np.ndarray,
filename: str):
"""绘制数据同化结果对比"""
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
vmax = np.max(true_field)
vmin = np.min(true_field)
# 真实场
im1 = axes[0, 0].imshow(true_field, origin='lower', cmap='RdBu_r',
vmin=vmin, vmax=vmax)
axes[0, 0].set_title('True Field', fontsize=12)
plt.colorbar(im1, ax=axes[0, 0])
# 背景场
im2 = axes[0, 1].imshow(background, origin='lower', cmap='RdBu_r',
vmin=vmin, vmax=vmax)
axes[0, 1].set_title('Background', fontsize=12)
plt.colorbar(im2, ax=axes[0, 1])
# 分析场
im3 = axes[1, 0].imshow(analysis, origin='lower', cmap='RdBu_r',
vmin=vmin, vmax=vmax)
axes[1, 0].scatter(sensor_positions[:, 0], sensor_positions[:, 1],
c='green', s=50, marker='x')
axes[1, 0].set_title('Analysis (with DA)', fontsize=12)
plt.colorbar(im3, ax=axes[1, 0])
# 误差对比
bg_error = np.abs(background - true_field)
an_error = np.abs(analysis - true_field)
x_pos = np.arange(2)
errors = [np.mean(bg_error), np.mean(an_error)]
axes[1, 1].bar(x_pos, errors, color=['coral', 'steelblue'],
edgecolor='black')
axes[1, 1].set_xticks(x_pos)
axes[1, 1].set_xticklabels(['Background', 'Analysis'])
axes[1, 1].set_ylabel('Mean Absolute Error', fontsize=12)
axes[1, 1].set_title('Error Reduction', fontsize=12)
axes[1, 1].grid(True, alpha=0.3, axis='y')
# 添加改进百分比
improvement = (errors[0] - errors[1]) / errors[0] * 100
axes[1, 1].text(0.5, max(errors)*0.9, f'Improvement: {improvement:.1f}%',
ha='center', fontsize=11, fontweight='bold')
plt.tight_layout()
plt.savefig(filename, dpi=150)
plt.close()
print(f" 数据同化结果已保存: {filename}")
# ==================== 主程序 ====================
def main():
"""主程序"""
print("=" * 70)
print("主题079:磁场测量与数据同化")
print("=" * 70)
# 配置参数
config = MeasurementConfig(
nx=50, ny=50,
n_sensors=16,
n_sources=3,
sensor_noise_std=0.01
)
# ============================================================
# 1. 初始化磁场模型
# ============================================================
print("\n" + "=" * 70)
print("1. 初始化磁场模型")
print("=" * 70)
field_model = MagneticFieldModel(config)
# ============================================================
# 2. 传感器模型与阵列
# ============================================================
print("\n" + "=" * 70)
print("2. 传感器模型与阵列")
print("=" * 70)
sensor = MagneticSensor(config)
sensor_array = SensorArray(config)
# 比较不同布局
layouts = {
'Uniform': sensor_array.uniform_layout(),
'Circular': sensor_array.circular_layout(),
'Random': sensor_array.random_layout()
}
# 使用均匀布局进行测量
sensor_array.uniform_layout()
measurements = sensor_array.measure_field(field_model, sensor)
print(f" 传感器数量: {config.n_sensors}")
print(f" 测量值范围: [{np.min(measurements):.6f}, {np.max(measurements):.6f}] T")
# 绘制磁场和传感器布局
plot_field_and_sensors(field_model, sensor_array, measurements,
'field_and_sensors.png')
# 绘制布局对比
plot_sensor_layout_comparison(field_model, layouts,
'sensor_layout_comparison.png')
# ============================================================
# 3. 磁场反演
# ============================================================
print("\n" + "=" * 70)
print("3. 磁场反演")
print("=" * 70)
inversion = MagneticInversion(config)
# 保存真实参数
true_positions = field_model.source_positions.copy()
true_moments = field_model.source_moments.copy()
# 执行反演
inversion_result = inversion.invert(
measurements,
sensor_array.positions,
field_model
)
# 恢复真实参数
field_model.source_positions = true_positions
field_model.source_moments = true_moments
# 绘制反演结果
plot_inversion_result(field_model, inversion_result,
sensor_array, measurements,
'inversion_result.png')
# 计算反演误差
position_errors = np.linalg.norm(
true_positions - inversion_result['positions'],
axis=1
)
print(f"\n 反演误差:")
print(f" 平均位置误差: {np.mean(position_errors):.4f} m")
print(f" 最大位置误差: {np.max(position_errors):.4f} m")
# ============================================================
# 4. 卡尔曼滤波演示
# ============================================================
print("\n" + "=" * 70)
print("4. 卡尔曼滤波演示")
print("=" * 70)
print(" 模拟移动磁源的跟踪...")
# 模拟参数
n_steps = 100
dt = 0.1
# 生成真实轨迹(圆周运动)
center = config.domain_size / 2
radius = 0.3 * config.domain_size
omega = 0.5
true_trajectory = np.zeros((n_steps, 2))
for k in range(n_steps):
angle = omega * k * dt
true_trajectory[k] = [
center + radius * np.cos(angle),
center + radius * np.sin(angle)
]
# 生成测量(带噪声)
measurement_noise = 0.02
measurements_kf = true_trajectory + np.random.normal(
0, measurement_noise, size=true_trajectory.shape
)
# 初始化卡尔曼滤波
kf = KalmanFilter(config)
# 初始状态估计
x0 = measurements_kf[0]
P0 = np.eye(2) * 0.1
kf.initialize(x0, P0)
# 状态转移矩阵(恒定速度模型)
F = np.eye(2)
Q = np.eye(2) * config.process_noise
H = np.eye(2)
R = np.eye(2) * measurement_noise**2
# 存储结果
estimated_trajectory = np.zeros((n_steps, 2))
covariance_history = []
estimated_trajectory[0] = x0
covariance_history.append(P0)
# 滤波循环
for k in range(1, n_steps):
# 预测
kf.predict(F, Q)
# 更新
kf.update(measurements_kf[k], H, R)
# 存储
x_est, P_est = kf.get_state()
estimated_trajectory[k] = x_est
covariance_history.append(P_est)
# 绘制卡尔曼滤波结果
plot_kalman_filter_trajectory(
true_trajectory, estimated_trajectory,
covariance_history, measurements_kf,
'kalman_filter_trajectory.png'
)
# 计算跟踪误差
final_error = np.linalg.norm(estimated_trajectory[-1] - true_trajectory[-1])
rmse = np.sqrt(np.mean(np.sum((estimated_trajectory - true_trajectory)**2, axis=1)))
print(f"\n 跟踪性能:")
print(f" 最终位置误差: {final_error:.4f} m")
print(f" RMSE: {rmse:.4f} m")
# ============================================================
# 5. 数据同化演示
# ============================================================
print("\n" + "=" * 70)
print("5. 数据同化演示")
print("=" * 70)
print(" 使用最优插值融合模型和观测...")
da = DataAssimilation(config)
# 创建网格场
nx, ny = 20, 20
x_grid = np.linspace(0, config.domain_size, nx)
y_grid = np.linspace(0, config.domain_size, ny)
X_grid, Y_grid = np.meshgrid(x_grid, y_grid)
# 真实场
Bx_true, By_true = field_model.compute_field(
X_grid.flatten(), Y_grid.flatten()
)
true_field = np.sqrt(Bx_true**2 + By_true**2).reshape(ny, nx)
# 背景场(带偏差的模型预测)
# 模拟模型误差:偏移磁源位置
bias = 0.05
field_model.source_positions += bias
Bx_bg, By_bg = field_model.compute_field(
X_grid.flatten(), Y_grid.flatten()
)
background = np.sqrt(Bx_bg**2 + By_bg**2).reshape(ny, nx)
field_model.source_positions -= bias # 恢复
# 观测(在传感器位置)
obs_indices = []
for pos in sensor_array.positions:
ix = np.argmin(np.abs(x_grid - pos[0]))
iy = np.argmin(np.abs(y_grid - pos[1]))
obs_indices.append([iy, ix])
obs_indices = np.array(obs_indices)
# 构建观测算子
n_grid = nx * ny
n_obs = len(sensor_array.positions)
H_da = np.zeros((n_obs, n_grid))
for i, (iy, ix) in enumerate(obs_indices):
idx = iy * nx + ix
H_da[i, idx] = 1.0
# 观测值
y_o = measurements[:n_obs]
# 误差协方差
P_b = np.eye(n_grid) * 0.01 # 背景误差
R_da = np.eye(n_obs) * config.measurement_noise**2 # 观测误差
# 执行最优插值
x_a, P_a = da.optimal_interpolation(
background.flatten(), P_b,
y_o, H_da, R_da
)
analysis = x_a.reshape(ny, nx)
# 绘制数据同化结果
plot_data_assimilation_comparison(
true_field, background, analysis,
obs_indices, 'data_assimilation_result.png'
)
# 计算改进
bg_error = np.mean(np.abs(background - true_field))
an_error = np.mean(np.abs(analysis - true_field))
improvement = (bg_error - an_error) / bg_error * 100
print(f"\n 数据同化效果:")
print(f" 背景场误差: {bg_error:.6f}")
print(f" 分析场误差: {an_error:.6f}")
print(f" 改进: {improvement:.1f}%")
# ============================================================
# 6. 结果总结
# ============================================================
print("\n" + "=" * 70)
print("6. 结果总结")
print("=" * 70)
print("\n磁场测量与数据同化演示完成!")
print("\n主要功能:")
print(" 1. 磁场传感器建模(霍尔、磁阻、磁通门)")
print(" 2. 传感器阵列布局(均匀、圆形、随机、优化)")
print(" 3. 磁场反演(从测量重构磁源)")
print(" 4. 卡尔曼滤波(移动源跟踪)")
print(" 5. 数据同化(最优插值、EnKF)")
print("\n生成的文件:")
print(" - field_and_sensors.png: 磁场分布和传感器布局")
print(" - sensor_layout_comparison.png: 不同布局对比")
print(" - inversion_result.png: 磁场反演结果")
print(" - kalman_filter_trajectory.png: 卡尔曼滤波跟踪")
print(" - data_assimilation_result.png: 数据同化结果")
print("\n应用建议:")
print(" - 传感器布局优化可提高反演精度")
print(" - 卡尔曼滤波适用于实时跟踪应用")
print(" - 数据同化可融合多源信息提高估计精度")
print(" - 实际应用中需校准传感器模型参数")
if __name__ == "__main__":
main()
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)