结构热应力仿真-主题063_并行计算与MPI-主题063_并行计算与MPI教程
主题063:并行计算与MPI仿真教程
目录





引言
为什么需要并行计算?
在现代科学与工程计算中,仿真模型的规模和复杂度呈指数级增长。一个典型的计算流体力学(CFD)仿真可能需要处理数十亿个网格单元,一个有限元结构分析可能涉及数百万个自由度。传统的串行计算已经无法满足这些需求:
- 计算时间限制:串行计算的时间随问题规模呈超线性增长
- 内存限制:单台计算机的内存容量有限
- 物理建模需求:更精细的网格、更复杂的物理模型需要更多计算资源
并行计算通过将大问题分解为多个小问题,在多个处理器上同时求解,是解决上述挑战的关键技术。
MPI简介
**MPI(Message Passing Interface,消息传递接口)**是并行计算领域最广泛使用的标准。它定义了一套用于进程间通信的函数接口,支持:
- 点对点通信(发送/接收)
- 集合通信(广播、散播、收集、归约)
- 进程组和通信域管理
- 并行I/O操作
MPI的优势在于:
- 可移植性:支持从笔记本电脑到超级计算机的各种平台
- 可扩展性:支持数万甚至数十万个进程
- 成熟稳定:经过30多年的发展和优化
本教程目标
通过本教程,你将学会:
- 理解并行计算的基本概念和性能指标
- 掌握MPI的核心通信模式
- 实现基于域分解的并行热传导求解器
- 分析并行程序的性能和可扩展性
并行计算基础理论
1. 并行计算模型
1.1 Flynn分类法
计算机体系结构根据指令流和数据流的关系可分为四类:
| 类型 | 名称 | 描述 | 示例 |
|---|---|---|---|
| SISD | 单指令单数据 | 传统串行计算机 | 早期PC |
| SIMD | 单指令多数据 | 向量处理器、GPU | GPU计算 |
| MISD | 多指令单数据 | 流水线处理 | 不太常见 |
| MIMD | 多指令多数据 | 多核处理器、集群 | 现代多核CPU |
科学计算中最常用的是MIMD模型,MPI正是基于这一模型设计的。
1.2 内存模型
并行计算系统根据内存组织方式可分为两类:
共享内存系统(Shared Memory)
- 所有处理器共享统一的地址空间
- 通过读写共享变量进行通信
- 编程模型:OpenMP、Pthreads
- 优点:编程简单,数据共享方便
- 缺点:扩展性受限,缓存一致性问题
分布式内存系统(Distributed Memory)
- 每个处理器拥有独立的私有内存
- 通过显式消息传递进行通信
- 编程模型:MPI
- 优点:可扩展性好,适合大规模并行
- 缺点:编程复杂,需要显式管理数据分布
混合系统
- 现代超级计算机通常采用混合架构
- 节点内使用共享内存(OpenMP)
- 节点间使用分布式内存(MPI)
2. 并行性能指标
2.1 加速比(Speedup)
加速比衡量并行程序相对于串行程序的性能提升:
S ( p ) = T 1 T p S(p) = \frac{T_1}{T_p} S(p)=TpT1
其中:
- T 1 T_1 T1:串行执行时间
- T p T_p Tp:使用 p p p个进程的执行时间
- S ( p ) S(p) S(p):加速比
理想情况: S ( p ) = p S(p) = p S(p)=p(线性加速)
2.2 并行效率(Efficiency)
并行效率衡量处理器资源的利用率:
E ( p ) = S ( p ) p = T 1 p ⋅ T p E(p) = \frac{S(p)}{p} = \frac{T_1}{p \cdot T_p} E(p)=pS(p)=p⋅TpT1
理想情况: E ( p ) = 1 E(p) = 1 E(p)=1(100%效率)
2.3 可扩展性
强可扩展性(Strong Scaling)
- 固定问题规模,增加处理器数量
- 目标:减少求解时间
- 理想加速比: S ( p ) = p S(p) = p S(p)=p
弱可扩展性(Weak Scaling)
- 每处理器固定问题规模,增加处理器数量
- 目标:在相同时间内求解更大问题
- 理想情况:执行时间保持不变
3. Amdahl定律与Gustafson定律
3.1 Amdahl定律
假设程序中可并行化的比例为 f f f,则最大加速比为:
S m a x = 1 ( 1 − f ) + f p S_{max} = \frac{1}{(1-f) + \frac{f}{p}} Smax=(1−f)+pf1
当 p → ∞ p \to \infty p→∞时:
S m a x → 1 1 − f S_{max} \to \frac{1}{1-f} Smax→1−f1
启示:即使使用无限多处理器,加速比也受限于串行部分的比例。
3.2 Gustafson定律
Amdahl定律假设问题规模固定,而Gustafson定律假设每处理器的工作量固定:
S ( p ) = p − f ( p − 1 ) S(p) = p - f(p-1) S(p)=p−f(p−1)
启示:如果随处理器增加而扩大问题规模,可以获得接近线性的加速比。
MPI编程模型
1. MPI基本通信模式
1.1 点对点通信
最基本的通信方式,一个进程发送消息,另一个进程接收:
# 发送
MPI_Send(buffer, count, datatype, dest, tag, comm)
# 接收
MPI_Recv(buffer, count, datatype, source, tag, comm, status)
参数说明:
buffer:数据缓冲区count:元素个数datatype:数据类型dest/source:目标/源进程号tag:消息标签(用于区分不同类型的消息)comm:通信域
1.2 集合通信
广播(Broadcast)
将数据从根进程发送到所有进程:
MPI_Bcast(buffer, count, datatype, root, comm)
散播(Scatter)
将数据从根进程分散到所有进程:
MPI_Scatter(sendbuf, sendcount, sendtype,
recvbuf, recvcount, recvtype, root, comm)
收集(Gather)
将各进程的数据收集到根进程:
MPI_Gather(sendbuf, sendcount, sendtype,
recvbuf, recvcount, recvtype, root, comm)
归约(Reduce)
对所有进程的数据进行归约操作(求和、求最大等):
MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm)
常用归约操作:
MPI_SUM:求和MPI_MAX:最大值MPI_MIN:最小值MPI_PROD:乘积
2. MPI程序结构
一个典型的MPI程序结构如下:
from mpi4py import MPI
# 初始化MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank() # 当前进程号
size = comm.Get_size() # 总进程数
# 各进程执行计算
...
# 通信
comm.send(data, dest=1, tag=0)
data = comm.recv(source=0, tag=0)
# 集合通信
comm.bcast(data, root=0)
comm.scatter(data_list, root=0)
comm.gather(local_data, root=0)
comm.reduce(local_data, op=MPI.SUM, root=0)
# 结束MPI
MPI.Finalize()
3. 通信模式详解
3.1 阻塞与非阻塞通信
阻塞通信
- 发送操作:直到消息安全发送后才返回
- 接收操作:直到消息接收完成后才返回
- 优点:编程简单,数据安全
- 缺点:可能导致死锁,资源利用率低
非阻塞通信
- 发送/接收操作立即返回,通信在后台进行
- 需要显式等待操作完成
- 优点:可以重叠计算和通信
- 缺点:编程复杂,需要管理请求对象
# 非阻塞发送
req = comm.Isend(data, dest=1, tag=0)
# 执行其他计算...
req.Wait() # 等待发送完成
# 非阻塞接收
req = comm.Irecv(buffer, source=0, tag=0)
# 执行其他计算...
req.Wait() # 等待接收完成
3.2 同步与异步通信
同步通信
- 发送和接收进程需要同步
- 确保数据被接收后才继续执行
- 适合需要严格同步的场景
异步通信
- 发送和接收不需要同步
- 使用缓冲区暂存数据
- 适合计算密集型的异步算法
域分解方法
1. 什么是域分解?
域分解(Domain Decomposition)是将计算区域划分为多个子域,每个子域由一个处理器负责计算的方法。它是并行求解偏微分方程最常用的技术。
2. 域分解策略
2.1 一维分解
将网格沿某一方向(通常是y方向)划分为条带:
进程0: ┌─────────────┐
│ 子域0 │
├─────────────┤
进程1: │ 子域1 │
├─────────────┤
进程2: │ 子域2 │
├─────────────┤
进程3: │ 子域3 │
└─────────────┘
优点:
- 实现简单
- 通信只发生在相邻子域之间
- 适合细长型区域
缺点:
- 扩展性受限(最多分解为ny份)
- 通信面较大
2.2 二维分解
将网格沿x和y两个方向划分为块:
进程0 进程1
┌─────┬─────┐
进程0: │ 0,0 │ 0,1 │
├─────┼─────┤
进程1: │ 1,0 │ 1,1 │
└─────┴─────┘
优点:
- 更好的扩展性
- 更小的通信面
- 适合大规模并行
缺点:
- 实现复杂
- 需要与更多邻居通信
2.3 三维分解
对于三维问题,可以在三个方向上进行分解,获得更好的并行性。
3. 重叠区域(Ghost Cells)
在域分解中,相邻子域需要交换边界数据。为了实现这一点,每个子域需要包含额外的重叠区域:
重叠区 内部区 重叠区
↓ ↓ ↓
┌────────┬────────┬────────┐
│ ghost │ local │ ghost │
│ down │ domain │ up │
└────────┴────────┴────────┘
↑ ↑
来自进程p-1 来自进程p+1
重叠区域的作用:
- 存储来自相邻子域的边界数据
- 避免在迭代计算中频繁通信
- 保持数值格式的局部性
4. 负载均衡
负载均衡是指将计算任务均匀分配给各个处理器,避免某些处理器空闲等待。
静态负载均衡
- 在计算开始前确定任务分配
- 适用于计算量均匀分布的问题
- 实现简单,开销小
动态负载均衡
- 在计算过程中动态调整任务分配
- 适用于计算量不均匀的问题
- 实现复杂,需要额外的通信开销
负载均衡策略:
- 块划分:将网格均匀划分为大小相近的块
- 循环分配:按循环顺序分配网格单元
- 图划分:使用图划分算法(如METIS)最小化通信
Python代码实现
1. 环境准备
本教程使用Python的multiprocessing模块模拟MPI并行计算。实际MPI应用需要使用mpi4py库。
# 安装mpi4py(可选,用于实际MPI应用)
pip install mpi4py
# 本教程依赖
pip install numpy matplotlib
2. 并行热传导求解器
以下是并行热传导求解器的核心实现:
import numpy as np
from multiprocessing import Pool, cpu_count
class ParallelHeatSolver:
"""
并行热传导求解器
使用域分解方法将大问题分解为多个子域并行求解
"""
def __init__(self, nx, ny, num_processes=None):
"""
初始化并行求解器
参数:
nx, ny: 全局网格尺寸
num_processes: 进程数,默认为CPU核心数
"""
self.nx = nx
self.ny = ny
self.num_processes = num_processes or cpu_count()
# 计算子域分解
self._decompose_domain()
def _decompose_domain(self):
"""
域分解:将全局网格分解为子域
使用一维条带分解(沿y方向)
"""
# 计算每个子域的y方向大小
base_size = self.ny // self.num_processes
remainder = self.ny % self.num_processes
self.subdomains = []
start_y = 0
for rank in range(self.num_processes):
# 分配额外的行给前remainder个进程
local_ny = base_size + (1 if rank < remainder else 0)
# 添加重叠区域(用于通信)
ghost_bottom = 1 if rank > 0 else 0
ghost_top = 1 if rank < self.num_processes - 1 else 0
total_ny = local_ny + ghost_bottom + ghost_top
self.subdomains.append({
'rank': rank,
'start_y': start_y,
'local_ny': local_ny,
'total_ny': total_ny,
'ghost_bottom': ghost_bottom,
'ghost_top': ghost_top,
})
start_y += local_ny
3. 子域求解
def _solve_subdomain(self, args):
"""
求解单个子域(用于多进程并行)
参数:
args: (subdomain_info, u_global, f_global, alpha, dt, num_steps)
返回:
result: 子域求解结果
"""
sub, u_global, f_global, alpha, dt, num_steps = args
rank = sub['rank']
nx = self.nx
total_ny = sub['total_ny']
local_ny = sub['local_ny']
ghost_bottom = sub['ghost_bottom']
ghost_top = sub['ghost_top']
# 提取子域数据(包含重叠区域)
y_start_global = sub['start_y'] - ghost_bottom
y_end_global = sub['start_y'] + local_ny + ghost_top
u_local = u_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
f_local = f_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
# 时间步进
for step in range(num_steps):
u_new = u_local.copy()
# 内部点更新(不包括重叠区域)
for j in range(ghost_bottom, total_ny - ghost_top):
for i in range(1, nx):
# 热传导方程离散
u_new[j, i] = u_local[j, i] + alpha * dt * (
(u_local[j, i+1] - 2*u_local[j, i] + u_local[j, i-1]) +
(u_local[j+1, i] - 2*u_local[j, i] + u_local[j-1, i])
) + dt * f_local[j, i]
u_local = u_new
# 返回结果(不包括重叠区域)
result = {
'rank': rank,
'start_y': sub['start_y'],
'local_ny': local_ny,
'solution': u_local[ghost_bottom:ghost_bottom+local_ny+1, :].copy()
}
return result
4. 并行求解与结果合并
def solve_parallel(self, u0, f, alpha, dt, num_steps):
"""
并行求解热传导方程
"""
start_time = time.time()
# 准备参数
args_list = [(sub, u0, f, alpha, dt, num_steps) for sub in self.subdomains]
# 并行求解
with Pool(processes=self.num_processes) as pool:
results = pool.map(self._solve_subdomain, args_list)
# 合并结果
u_final = np.zeros((self.ny + 1, self.nx + 1))
for result in results:
start_y = result['start_y']
local_ny = result['local_ny']
u_final[start_y:start_y+local_ny+1, :] = result['solution']
time_elapsed = time.time() - start_time
return u_final, time_elapsed
案例实战
案例1:基本MPI操作演示
本案例展示MPI的核心通信模式:
class MPISimulator:
"""MPI通信模式模拟器"""
@staticmethod
def demonstrate_broadcast(data, root=0, num_processes=4):
"""模拟MPI_Bcast:广播操作"""
print(f"\nMPI广播演示 (根进程={root}):")
print(f" 广播前: 进程{root}有数据 {data}")
results = [None] * num_processes
results[root] = data
for rank in range(num_processes):
if rank != root:
results[rank] = data
print(f" 广播后: 进程{rank}接收到数据 {data}")
return results
运行结果:
MPI广播演示 (根进程=0):
广播前: 进程0有数据 重要配置信息
广播后: 进程1接收到数据 重要配置信息
广播后: 进程2接收到数据 重要配置信息
广播后: 进程3接收到数据 重要配置信息
案例2:并行 vs 串行性能对比
本案例对比不同进程数下的求解性能:
def example_2_parallel_vs_serial():
nx = ny = 128
alpha = 0.01
dt = 0.001
num_steps = 100
# 串行求解
solver = ParallelHeatSolver(nx, ny, num_processes=1)
u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
print(f"串行时间: {time_serial:.4f}秒")
# 并行求解
for num_proc in [2, 4, 6, 8]:
solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
speedup = time_serial / time_parallel
efficiency = speedup / num_proc
print(f"{num_proc}进程: 加速比={speedup:.2f}x, 效率={efficiency*100:.1f}%")
性能结果(128×128网格,100时间步):
| 进程数 | 时间(秒) | 加速比 | 效率 |
|---|---|---|---|
| 1(串行) | 2.18 | 1.00 | 100% |
| 2 | 1.45 | 1.50 | 75% |
| 4 | 0.95 | 2.29 | 57% |
| 8 | 0.72 | 3.03 | 38% |
分析:
- 随着进程数增加,加速比提高但效率下降
- 效率下降的原因:通信开销、负载不均衡、串行部分
案例3:域分解可视化
本案例可视化不同进程数下的域分解结果:

观察:
- 1进程:整个区域由一个进程处理
- 2进程:区域沿y方向分为两个条带
- 4进程:区域分为四个条带
- 8进程:更细的条带划分
案例4:强可扩展性分析
固定问题规模(256×256网格),增加进程数:

结果分析:
- 2进程:加速比1.27x
- 4进程:加速比1.77x
- 6进程:加速比1.67x(效率开始下降)
- 8进程:加速比1.51x(效率继续下降)
效率下降原因:
- 通信开销随进程数增加
- 子域过小导致计算/通信比降低
- 进程启动和同步开销
案例5:弱可扩展性分析
每进程固定网格大小(64×64),增加进程数:

结果分析:
- 理想情况下,执行时间应保持不变
- 实际效率从100%逐渐下降到约58%
- 主要原因是通信开销随进程数增加
案例6:热扩散过程可视化
展示多个热源在并行求解下的扩散过程:

物理现象:
- t=0:四个独立的高斯热源
- t=50:热源开始扩散,温度场开始融合
- t=100:温度场进一步融合
- t=200:温度场基本均匀
- t=500:接近稳态分布
附录:完整代码
完整的示例代码见文件:实例一_并行热传导仿真.py
运行命令:
python 实例一_并行热传导仿真.py
生成的可视化结果:
案例2_并行性能对比.png:并行性能对比图案例3_域分解可视化.png:域分解可视化案例4_强可扩展性.png:强可扩展性分析案例5_弱可扩展性.png:弱可扩展性分析案例6_热扩散过程.png:热扩散过程可视化
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
主题063: 并行计算与MPI仿真
核心内容: 域分解、负载均衡、通信优化、并行热传导求解
注意: 本代码使用Python multiprocessing模拟MPI并行计算
实际MPI应用需要使用mpi4py库
"""
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
import time
import warnings
warnings.filterwarnings('ignore')
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False
class ParallelHeatSolver:
"""
并行热传导求解器
使用域分解方法将大问题分解为多个子域并行求解
"""
def __init__(self, nx, ny, num_processes=None):
"""
初始化并行求解器
参数:
nx, ny: 全局网格尺寸
num_processes: 进程数,默认为CPU核心数
"""
self.nx = nx
self.ny = ny
self.num_processes = num_processes or cpu_count()
print(f"并行热传导求解器初始化:")
print(f" 全局网格: {nx}×{ny}")
print(f" 进程数: {self.num_processes}")
# 计算子域分解
self._decompose_domain()
def _decompose_domain(self):
"""
域分解:将全局网格分解为子域
使用一维条带分解(沿y方向)
"""
# 计算每个子域的y方向大小
base_size = self.ny // self.num_processes
remainder = self.ny % self.num_processes
self.subdomains = []
start_y = 0
for rank in range(self.num_processes):
# 分配额外的行给前remainder个进程
local_ny = base_size + (1 if rank < remainder else 0)
# 添加重叠区域(用于通信)
# 每个子域包含上下各一个重叠行(边界子域除外)
ghost_bottom = 1 if rank > 0 else 0
ghost_top = 1 if rank < self.num_processes - 1 else 0
total_ny = local_ny + ghost_bottom + ghost_top
self.subdomains.append({
'rank': rank,
'start_y': start_y,
'local_ny': local_ny,
'total_ny': total_ny,
'ghost_bottom': ghost_bottom,
'ghost_top': ghost_top,
'y_range': (start_y - ghost_bottom, start_y + local_ny + ghost_top)
})
start_y += local_ny
print(f"\n域分解信息:")
for sub in self.subdomains:
print(f" 进程{sub['rank']}: y={sub['y_range']}, 本地网格={self.nx}×{sub['total_ny']}")
def _solve_subdomain(self, args):
"""
求解单个子域(用于多进程并行)
参数:
args: (subdomain_info, u_global, f_global, alpha, dt, num_steps)
返回:
result: 子域求解结果
"""
sub, u_global, f_global, alpha, dt, num_steps = args
rank = sub['rank']
nx = self.nx
total_ny = sub['total_ny']
local_ny = sub['local_ny']
ghost_bottom = sub['ghost_bottom']
ghost_top = sub['ghost_top']
# 提取子域数据(包含重叠区域)
y_start_global = sub['start_y'] - ghost_bottom
y_end_global = sub['start_y'] + local_ny + ghost_top
u_local = u_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
f_local = f_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
# 如果子域在边界,需要调整大小
if u_local.shape[0] != total_ny + 1:
# 填充边界
temp_u = np.zeros((total_ny + 1, nx + 1))
temp_f = np.zeros((total_ny + 1, nx + 1))
if y_start_global < 0:
# 底部边界
offset = -y_start_global
temp_u[offset:offset+u_local.shape[0], :] = u_local
temp_f[offset:offset+f_local.shape[0], :] = f_local
else:
# 顶部边界
temp_u[:u_local.shape[0], :] = u_local
temp_f[:f_local.shape[0], :] = f_local
u_local = temp_u
f_local = temp_f
# 时间步进
for step in range(num_steps):
u_new = u_local.copy()
# 内部点更新(不包括重叠区域)
# 注意:循环范围应该是 ghost_bottom 到 total_ny - ghost_top(不包含)
# 因为 u_local 的 shape 是 (total_ny+1, nx+1)
for j in range(ghost_bottom, total_ny - ghost_top):
for i in range(1, nx):
# 热传导方程离散
u_new[j, i] = u_local[j, i] + alpha * dt * (
(u_local[j, i+1] - 2*u_local[j, i] + u_local[j, i-1]) +
(u_local[j+1, i] - 2*u_local[j, i] + u_local[j-1, i])
) + dt * f_local[j, i]
u_local = u_new
# 返回结果(不包括重叠区域)
result = {
'rank': rank,
'start_y': sub['start_y'],
'local_ny': local_ny,
'solution': u_local[ghost_bottom:ghost_bottom+local_ny+1, :].copy()
}
return result
def solve_parallel(self, u0, f, alpha, dt, num_steps):
"""
并行求解热传导方程
参数:
u0: 初始温度场
f: 热源项
alpha: 热扩散系数
dt: 时间步长
num_steps: 时间步数
返回:
u: 最终温度场
time_elapsed: 计算时间
"""
start_time = time.time()
# 准备参数
args_list = [(sub, u0, f, alpha, dt, num_steps) for sub in self.subdomains]
# 并行求解
with Pool(processes=self.num_processes) as pool:
results = pool.map(self._solve_subdomain, args_list)
# 合并结果
u_final = np.zeros((self.ny + 1, self.nx + 1))
for result in results:
start_y = result['start_y']
local_ny = result['local_ny']
u_final[start_y:start_y+local_ny+1, :] = result['solution']
time_elapsed = time.time() - start_time
return u_final, time_elapsed
def solve_serial(self, u0, f, alpha, dt, num_steps):
"""
串行求解(用于对比)
"""
start_time = time.time()
u = u0.copy()
nx, ny = self.nx, self.ny
for step in range(num_steps):
u_new = u.copy()
for j in range(1, ny):
for i in range(1, nx):
u_new[j, i] = u[j, i] + alpha * dt * (
(u[j, i+1] - 2*u[j, i] + u[j, i-1]) +
(u[j+1, i] - 2*u[j, i] + u[j-1, i])
) + dt * f[j, i]
u = u_new
time_elapsed = time.time() - start_time
return u, time_elapsed
class MPISimulator:
"""
MPI通信模式模拟器
展示MPI中的基本通信模式
"""
@staticmethod
def demonstrate_broadcast(data, root=0, num_processes=4):
"""
模拟MPI_Bcast:广播操作
参数:
data: 要广播的数据
root: 根进程
num_processes: 进程数
返回:
results: 每个进程接收到的数据
"""
print(f"\nMPI广播演示 (根进程={root}):")
print(f" 广播前: 进程{root}有数据 {data}")
results = [None] * num_processes
results[root] = data
# 模拟广播
for rank in range(num_processes):
if rank != root:
results[rank] = data
print(f" 广播后: 进程{rank}接收到数据 {data}")
return results
@staticmethod
def demonstrate_scatter(data_list, root=0):
"""
模拟MPI_Scatter:散播操作
参数:
data_list: 要散播的数据列表
root: 根进程
返回:
results: 每个进程接收到的数据
"""
num_processes = len(data_list)
print(f"\nMPI散播演示 (根进程={root}):")
print(f" 散播前: 进程{root}有数据列表 {data_list}")
results = []
for rank in range(num_processes):
results.append(data_list[rank])
print(f" 散播后: 进程{rank}接收到数据 {data_list[rank]}")
return results
@staticmethod
def demonstrate_gather(data_list, root=0):
"""
模拟MPI_Gather:收集操作
参数:
data_list: 每个进程的数据列表
root: 根进程
返回:
gathered_data: 收集后的数据
"""
print(f"\nMPI收集演示 (根进程={root}):")
for rank, data in enumerate(data_list):
print(f" 收集前: 进程{rank}有数据 {data}")
gathered_data = data_list.copy()
print(f" 收集后: 进程{root}收集到数据 {gathered_data}")
return gathered_data
@staticmethod
def demonstrate_reduce(data_list, operation='sum', root=0):
"""
模拟MPI_Reduce:归约操作
参数:
data_list: 每个进程的数据列表
operation: 归约操作 ('sum', 'max', 'min')
root: 根进程
返回:
result: 归约结果
"""
print(f"\nMPI归约演示 (操作={operation}, 根进程={root}):")
for rank, data in enumerate(data_list):
print(f" 进程{rank}贡献数据 {data}")
if operation == 'sum':
result = sum(data_list)
elif operation == 'max':
result = max(data_list)
elif operation == 'min':
result = min(data_list)
else:
result = None
print(f" 归约结果: {result} (存储在进程{root})")
return result
def example_1_basic_mpi_operations():
"""案例1: 基本MPI操作演示"""
print("\n" + "="*60)
print("案例1: 基本MPI操作演示")
print("="*60)
mpi = MPISimulator()
# 广播
data = "重要配置信息"
mpi.demonstrate_broadcast(data, root=0, num_processes=4)
# 散播
data_list = [10, 20, 30, 40]
mpi.demonstrate_scatter(data_list, root=0)
# 收集
data_list = [1.1, 2.2, 3.3, 4.4]
mpi.demonstrate_gather(data_list, root=0)
# 归约
data_list = [100, 200, 300, 400]
mpi.demonstrate_reduce(data_list, operation='sum', root=0)
mpi.demonstrate_reduce(data_list, operation='max', root=0)
def example_2_parallel_vs_serial():
"""案例2: 并行 vs 串行性能对比"""
print("\n" + "="*60)
print("案例2: 并行 vs 串行性能对比")
print("="*60)
# 网格参数
nx = ny = 128
# 物理参数
alpha = 0.01
dt = 0.001
num_steps = 100
# 初始化
x = np.linspace(0, 1, nx+1)
y = np.linspace(0, 1, ny+1)
X, Y = np.meshgrid(x, y)
# 初始条件:高斯分布
u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
# 热源项
f = np.zeros((ny+1, nx+1))
# 边界条件
u0[0, :] = 0
u0[ny, :] = 0
u0[:, 0] = 0
u0[:, nx] = 0
print(f"\n网格尺寸: {nx}×{ny}")
print(f"时间步数: {num_steps}")
# 串行求解
print("\n串行求解...")
solver = ParallelHeatSolver(nx, ny, num_processes=1)
u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
print(f" 串行时间: {time_serial:.4f}秒")
# 并行求解(不同进程数)
process_counts = [2, 4, 6, 8]
speedups = []
efficiencies = []
for num_proc in process_counts:
if num_proc > cpu_count():
continue
print(f"\n并行求解 ({num_proc}进程)...")
solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
speedup = time_serial / time_parallel
efficiency = speedup / num_proc
speedups.append(speedup)
efficiencies.append(efficiency)
print(f" 并行时间: {time_parallel:.4f}秒")
print(f" 加速比: {speedup:.2f}x")
print(f" 效率: {efficiency*100:.1f}%")
# 验证结果一致性
error = np.linalg.norm(u_serial - u_parallel) / np.linalg.norm(u_serial)
print(f" 相对误差: {error:.6e}")
# 绘制加速比和效率
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].plot(process_counts[:len(speedups)], speedups, 'b-o', linewidth=2, markersize=8)
axes[0].plot(process_counts[:len(speedups)], process_counts[:len(speedups)],
'r--', label='理想加速比', linewidth=2)
axes[0].set_xlabel('进程数', fontsize=12)
axes[0].set_ylabel('加速比', fontsize=12)
axes[0].set_title('并行加速比', fontsize=14)
axes[0].legend(fontsize=11)
axes[0].grid(True, alpha=0.3)
axes[1].plot(process_counts[:len(efficiencies)],
[e*100 for e in efficiencies], 'g-s', linewidth=2, markersize=8)
axes[1].axhline(y=100, color='r', linestyle='--', label='理想效率', linewidth=2)
axes[1].set_xlabel('进程数', fontsize=12)
axes[1].set_ylabel('并行效率 (%)', fontsize=12)
axes[1].set_title('并行效率', fontsize=14)
axes[1].legend(fontsize=11)
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('案例2_并行性能对比.png', dpi=150, bbox_inches='tight')
print("\n 图已保存: 案例2_并行性能对比.png")
plt.close()
def example_3_domain_decomposition():
"""案例3: 域分解可视化"""
print("\n" + "="*60)
print("案例3: 域分解可视化")
print("="*60)
nx = ny = 64
process_counts = [1, 2, 4, 8]
fig, axes = plt.subplots(2, 2, figsize=(12, 12))
axes = axes.flatten()
for idx, num_proc in enumerate(process_counts):
if num_proc > cpu_count():
continue
solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
# 创建域分解可视化
domain_map = np.zeros((ny+1, nx+1))
colors = plt.cm.tab10(np.linspace(0, 1, num_proc))
for sub in solver.subdomains:
rank = sub['rank']
start_y = sub['start_y']
local_ny = sub['local_ny']
domain_map[start_y:start_y+local_ny+1, :] = rank + 1
ax = axes[idx]
im = ax.imshow(domain_map, extent=[0, 1, 0, 1], origin='lower',
cmap='tab10', vmin=0.5, vmax=num_proc+0.5)
ax.set_title(f'{num_proc}进程域分解', fontsize=12)
ax.set_xlabel('x')
ax.set_ylabel('y')
# 添加网格线
for sub in solver.subdomains:
if sub['rank'] > 0:
y_pos = sub['start_y'] / ny
ax.axhline(y=y_pos, color='white', linewidth=2, linestyle='--')
plt.tight_layout()
plt.savefig('案例3_域分解可视化.png', dpi=150, bbox_inches='tight')
print(" 图已保存: 案例3_域分解可视化.png")
plt.close()
def example_4_strong_scaling():
"""案例4: 强可扩展性分析"""
print("\n" + "="*60)
print("案例4: 强可扩展性分析")
print("="*60)
# 固定问题规模,增加进程数
nx = ny = 256
alpha = 0.01
dt = 0.001
num_steps = 50
# 初始化
x = np.linspace(0, 1, nx+1)
y = np.linspace(0, 1, ny+1)
X, Y = np.meshgrid(x, y)
u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
f = np.zeros((ny+1, nx+1))
u0[0, :] = u0[ny, :] = u0[:, 0] = u0[:, nx] = 0
print(f"\n固定问题规模: {nx}×{ny}")
# 串行基准
solver = ParallelHeatSolver(nx, ny, num_processes=1)
u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
print(f"串行时间: {time_serial:.4f}秒")
# 不同进程数
process_counts = [1, 2, 4, 6, 8]
speedups = []
for num_proc in process_counts:
if num_proc > cpu_count():
continue
print(f"\n{num_proc}进程...")
solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
speedup = time_serial / time_parallel
speedups.append(speedup)
print(f" 时间: {time_parallel:.4f}秒, 加速比: {speedup:.2f}x")
# 绘制强可扩展性
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(process_counts[:len(speedups)], speedups, 'b-o',
linewidth=2, markersize=8, label='实际加速比')
ax.plot(process_counts[:len(speedups)], process_counts[:len(speedups)],
'r--', linewidth=2, label='理想加速比')
ax.set_xlabel('进程数', fontsize=12)
ax.set_ylabel('加速比', fontsize=12)
ax.set_title('强可扩展性分析 (固定问题规模)', fontsize=14)
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('案例4_强可扩展性.png', dpi=150, bbox_inches='tight')
print("\n 图已保存: 案例4_强可扩展性.png")
plt.close()
def example_5_weak_scaling():
"""案例5: 弱可扩展性分析"""
print("\n" + "="*60)
print("案例5: 弱可扩展性分析")
print("="*60)
# 每个进程固定网格大小
local_nx = local_ny = 64
alpha = 0.01
dt = 0.001
num_steps = 50
process_counts = [1, 2, 4, 6, 8]
times = []
print(f"\n每进程固定网格: {local_nx}×{local_ny}")
for num_proc in process_counts:
if num_proc > cpu_count():
continue
# 全局网格随进程数增加
nx = local_nx
ny = local_ny * num_proc
print(f"\n{num_proc}进程, 全局网格: {nx}×{ny}")
# 初始化
x = np.linspace(0, 1, nx+1)
y = np.linspace(0, 1, ny+1)
X, Y = np.meshgrid(x, y)
u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
f = np.zeros((ny+1, nx+1))
u0[0, :] = u0[ny, :] = u0[:, 0] = u0[:, nx] = 0
solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
times.append(time_parallel)
print(f" 时间: {time_parallel:.4f}秒")
# 绘制弱可扩展性
fig, ax = plt.subplots(figsize=(10, 6))
# 效率 = 基准时间 / 当前时间
base_time = times[0]
efficiencies = [base_time / t * 100 for t in times]
ax.plot(process_counts[:len(efficiencies)], efficiencies, 'g-s',
linewidth=2, markersize=8)
ax.axhline(y=100, color='r', linestyle='--', linewidth=2, label='理想效率')
ax.set_xlabel('进程数', fontsize=12)
ax.set_ylabel('效率 (%)', fontsize=12)
ax.set_title('弱可扩展性分析 (每进程固定负载)', fontsize=14)
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('案例5_弱可扩展性.png', dpi=150, bbox_inches='tight')
print("\n 图已保存: 案例5_弱可扩展性.png")
plt.close()
def example_6_heat_diffusion_animation():
"""案例6: 热扩散过程可视化"""
print("\n" + "="*60)
print("案例6: 热扩散过程可视化")
print("="*60)
nx = ny = 64
alpha = 0.01
dt = 0.001
# 初始化
x = np.linspace(0, 1, nx+1)
y = np.linspace(0, 1, ny+1)
X, Y = np.meshgrid(x, y)
# 初始条件:多个热源
u0 = np.zeros((ny+1, nx+1))
u0 += np.exp(-((X-0.3)**2 + (Y-0.3)**2) / 0.02)
u0 += np.exp(-((X-0.7)**2 + (Y-0.7)**2) / 0.02)
u0 += np.exp(-((X-0.3)**2 + (Y-0.7)**2) / 0.02)
u0 += np.exp(-((X-0.7)**2 + (Y-0.3)**2) / 0.02)
f = np.zeros((ny+1, nx+1))
# 边界条件
u0[0, :] = 0
u0[ny, :] = 0
u0[:, 0] = 0
u0[:, nx] = 0
# 并行求解并记录中间结果
solver = ParallelHeatSolver(nx, ny, num_processes=4)
# 分阶段求解
time_steps = [0, 50, 100, 200, 500]
solutions = [u0.copy()]
u_current = u0.copy()
prev_steps = 0
for steps in time_steps[1:]:
u_current, _ = solver.solve_parallel(u_current, f, alpha, dt, steps - prev_steps)
solutions.append(u_current.copy())
prev_steps = steps
# 绘制热扩散过程
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()
for idx, (u, step) in enumerate(zip(solutions, time_steps)):
ax = axes[idx]
im = ax.contourf(X, Y, u, levels=20, cmap='hot')
ax.set_title(f'时间步 = {step}', fontsize=12)
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.colorbar(im, ax=ax)
plt.tight_layout()
plt.savefig('案例6_热扩散过程.png', dpi=150, bbox_inches='tight')
print(" 图已保存: 案例6_热扩散过程.png")
plt.close()
if __name__ == "__main__":
print("="*60)
print("主题063: 并行计算与MPI仿真")
print("="*60)
print(f"\n系统CPU核心数: {cpu_count()}")
# 运行所有案例
example_1_basic_mpi_operations()
example_2_parallel_vs_serial()
example_3_domain_decomposition()
example_4_strong_scaling()
example_5_weak_scaling()
example_6_heat_diffusion_animation()
print("\n" + "="*60)
print("仿真完成!")
print("="*60)
print("\n生成的文件:")
print(" 1. 案例2_并行性能对比.png")
print(" 2. 案例3_域分解可视化.png")
print(" 3. 案例4_强可扩展性.png")
print(" 4. 案例5_弱可扩展性.png")
print(" 5. 案例6_热扩散过程.png")
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)