主题063:并行计算与MPI仿真教程

目录

  1. 引言
  2. 并行计算基础理论
  3. MPI编程模型
  4. 域分解方法
  5. Python代码实现
  6. 案例实战
  7. 性能分析与优化
  8. 总结与习题

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

引言

为什么需要并行计算?

在现代科学与工程计算中,仿真模型的规模和复杂度呈指数级增长。一个典型的计算流体力学(CFD)仿真可能需要处理数十亿个网格单元,一个有限元结构分析可能涉及数百万个自由度。传统的串行计算已经无法满足这些需求:

  1. 计算时间限制:串行计算的时间随问题规模呈超线性增长
  2. 内存限制:单台计算机的内存容量有限
  3. 物理建模需求:更精细的网格、更复杂的物理模型需要更多计算资源

并行计算通过将大问题分解为多个小问题,在多个处理器上同时求解,是解决上述挑战的关键技术。

MPI简介

**MPI(Message Passing Interface,消息传递接口)**是并行计算领域最广泛使用的标准。它定义了一套用于进程间通信的函数接口,支持:

  • 点对点通信(发送/接收)
  • 集合通信(广播、散播、收集、归约)
  • 进程组和通信域管理
  • 并行I/O操作

MPI的优势在于:

  • 可移植性:支持从笔记本电脑到超级计算机的各种平台
  • 可扩展性:支持数万甚至数十万个进程
  • 成熟稳定:经过30多年的发展和优化

本教程目标

通过本教程,你将学会:

  1. 理解并行计算的基本概念和性能指标
  2. 掌握MPI的核心通信模式
  3. 实现基于域分解的并行热传导求解器
  4. 分析并行程序的性能和可扩展性

并行计算基础理论

1. 并行计算模型

1.1 Flynn分类法

计算机体系结构根据指令流和数据流的关系可分为四类:

类型 名称 描述 示例
SISD 单指令单数据 传统串行计算机 早期PC
SIMD 单指令多数据 向量处理器、GPU GPU计算
MISD 多指令单数据 流水线处理 不太常见
MIMD 多指令多数据 多核处理器、集群 现代多核CPU

科学计算中最常用的是MIMD模型,MPI正是基于这一模型设计的。

1.2 内存模型

并行计算系统根据内存组织方式可分为两类:

共享内存系统(Shared Memory)

  • 所有处理器共享统一的地址空间
  • 通过读写共享变量进行通信
  • 编程模型:OpenMP、Pthreads
  • 优点:编程简单,数据共享方便
  • 缺点:扩展性受限,缓存一致性问题

分布式内存系统(Distributed Memory)

  • 每个处理器拥有独立的私有内存
  • 通过显式消息传递进行通信
  • 编程模型:MPI
  • 优点:可扩展性好,适合大规模并行
  • 缺点:编程复杂,需要显式管理数据分布

混合系统

  • 现代超级计算机通常采用混合架构
  • 节点内使用共享内存(OpenMP)
  • 节点间使用分布式内存(MPI)

2. 并行性能指标

2.1 加速比(Speedup)

加速比衡量并行程序相对于串行程序的性能提升:

S ( p ) = T 1 T p S(p) = \frac{T_1}{T_p} S(p)=TpT1

其中:

  • T 1 T_1 T1:串行执行时间
  • T p T_p Tp:使用 p p p个进程的执行时间
  • S ( p ) S(p) S(p):加速比

理想情况 S ( p ) = p S(p) = p S(p)=p(线性加速)

2.2 并行效率(Efficiency)

并行效率衡量处理器资源的利用率:

E ( p ) = S ( p ) p = T 1 p ⋅ T p E(p) = \frac{S(p)}{p} = \frac{T_1}{p \cdot T_p} E(p)=pS(p)=pTpT1

理想情况 E ( p ) = 1 E(p) = 1 E(p)=1(100%效率)

2.3 可扩展性

强可扩展性(Strong Scaling)

  • 固定问题规模,增加处理器数量
  • 目标:减少求解时间
  • 理想加速比: S ( p ) = p S(p) = p S(p)=p

弱可扩展性(Weak Scaling)

  • 每处理器固定问题规模,增加处理器数量
  • 目标:在相同时间内求解更大问题
  • 理想情况:执行时间保持不变

3. Amdahl定律与Gustafson定律

3.1 Amdahl定律

假设程序中可并行化的比例为 f f f,则最大加速比为:

S m a x = 1 ( 1 − f ) + f p S_{max} = \frac{1}{(1-f) + \frac{f}{p}} Smax=(1f)+pf1

p → ∞ p \to \infty p时:

S m a x → 1 1 − f S_{max} \to \frac{1}{1-f} Smax1f1

启示:即使使用无限多处理器,加速比也受限于串行部分的比例。

3.2 Gustafson定律

Amdahl定律假设问题规模固定,而Gustafson定律假设每处理器的工作量固定:

S ( p ) = p − f ( p − 1 ) S(p) = p - f(p-1) S(p)=pf(p1)

启示:如果随处理器增加而扩大问题规模,可以获得接近线性的加速比。


MPI编程模型

1. MPI基本通信模式

1.1 点对点通信

最基本的通信方式,一个进程发送消息,另一个进程接收:

# 发送
MPI_Send(buffer, count, datatype, dest, tag, comm)

# 接收
MPI_Recv(buffer, count, datatype, source, tag, comm, status)

参数说明:

  • buffer:数据缓冲区
  • count:元素个数
  • datatype:数据类型
  • dest/source:目标/源进程号
  • tag:消息标签(用于区分不同类型的消息)
  • comm:通信域
1.2 集合通信

广播(Broadcast)

将数据从根进程发送到所有进程:

MPI_Bcast(buffer, count, datatype, root, comm)

散播(Scatter)

将数据从根进程分散到所有进程:

MPI_Scatter(sendbuf, sendcount, sendtype, 
            recvbuf, recvcount, recvtype, root, comm)

收集(Gather)

将各进程的数据收集到根进程:

MPI_Gather(sendbuf, sendcount, sendtype,
           recvbuf, recvcount, recvtype, root, comm)

归约(Reduce)

对所有进程的数据进行归约操作(求和、求最大等):

MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, comm)

常用归约操作:

  • MPI_SUM:求和
  • MPI_MAX:最大值
  • MPI_MIN:最小值
  • MPI_PROD:乘积

2. MPI程序结构

一个典型的MPI程序结构如下:

from mpi4py import MPI

# 初始化MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()  # 当前进程号
size = comm.Get_size()  # 总进程数

# 各进程执行计算
...

# 通信
comm.send(data, dest=1, tag=0)
data = comm.recv(source=0, tag=0)

# 集合通信
comm.bcast(data, root=0)
comm.scatter(data_list, root=0)
comm.gather(local_data, root=0)
comm.reduce(local_data, op=MPI.SUM, root=0)

# 结束MPI
MPI.Finalize()

3. 通信模式详解

3.1 阻塞与非阻塞通信

阻塞通信

  • 发送操作:直到消息安全发送后才返回
  • 接收操作:直到消息接收完成后才返回
  • 优点:编程简单,数据安全
  • 缺点:可能导致死锁,资源利用率低

非阻塞通信

  • 发送/接收操作立即返回,通信在后台进行
  • 需要显式等待操作完成
  • 优点:可以重叠计算和通信
  • 缺点:编程复杂,需要管理请求对象
# 非阻塞发送
req = comm.Isend(data, dest=1, tag=0)
# 执行其他计算...
req.Wait()  # 等待发送完成

# 非阻塞接收
req = comm.Irecv(buffer, source=0, tag=0)
# 执行其他计算...
req.Wait()  # 等待接收完成
3.2 同步与异步通信

同步通信

  • 发送和接收进程需要同步
  • 确保数据被接收后才继续执行
  • 适合需要严格同步的场景

异步通信

  • 发送和接收不需要同步
  • 使用缓冲区暂存数据
  • 适合计算密集型的异步算法

域分解方法

1. 什么是域分解?

域分解(Domain Decomposition)是将计算区域划分为多个子域,每个子域由一个处理器负责计算的方法。它是并行求解偏微分方程最常用的技术。

2. 域分解策略

2.1 一维分解

将网格沿某一方向(通常是y方向)划分为条带:

进程0: ┌─────────────┐
       │  子域0      │
       ├─────────────┤
进程1: │  子域1      │
       ├─────────────┤
进程2: │  子域2      │
       ├─────────────┤
进程3: │  子域3      │
       └─────────────┘

优点

  • 实现简单
  • 通信只发生在相邻子域之间
  • 适合细长型区域

缺点

  • 扩展性受限(最多分解为ny份)
  • 通信面较大
2.2 二维分解

将网格沿x和y两个方向划分为块:

       进程0      进程1
       ┌─────┬─────┐
进程0: │ 0,0 │ 0,1 │
       ├─────┼─────┤
进程1: │ 1,0 │ 1,1 │
       └─────┴─────┘

优点

  • 更好的扩展性
  • 更小的通信面
  • 适合大规模并行

缺点

  • 实现复杂
  • 需要与更多邻居通信
2.3 三维分解

对于三维问题,可以在三个方向上进行分解,获得更好的并行性。

3. 重叠区域(Ghost Cells)

在域分解中,相邻子域需要交换边界数据。为了实现这一点,每个子域需要包含额外的重叠区域:

        重叠区    内部区    重叠区
        ↓        ↓        ↓
    ┌────────┬────────┬────────┐
    │  ghost │ local  │  ghost │
    │  down  │ domain │   up   │
    └────────┴────────┴────────┘
        ↑                 ↑
     来自进程p-1      来自进程p+1

重叠区域的作用:

  1. 存储来自相邻子域的边界数据
  2. 避免在迭代计算中频繁通信
  3. 保持数值格式的局部性

4. 负载均衡

负载均衡是指将计算任务均匀分配给各个处理器,避免某些处理器空闲等待。

静态负载均衡

  • 在计算开始前确定任务分配
  • 适用于计算量均匀分布的问题
  • 实现简单,开销小

动态负载均衡

  • 在计算过程中动态调整任务分配
  • 适用于计算量不均匀的问题
  • 实现复杂,需要额外的通信开销

负载均衡策略

  1. 块划分:将网格均匀划分为大小相近的块
  2. 循环分配:按循环顺序分配网格单元
  3. 图划分:使用图划分算法(如METIS)最小化通信

Python代码实现

1. 环境准备

本教程使用Python的multiprocessing模块模拟MPI并行计算。实际MPI应用需要使用mpi4py库。

# 安装mpi4py(可选,用于实际MPI应用)
pip install mpi4py

# 本教程依赖
pip install numpy matplotlib

2. 并行热传导求解器

以下是并行热传导求解器的核心实现:

import numpy as np
from multiprocessing import Pool, cpu_count

class ParallelHeatSolver:
    """
    并行热传导求解器
    使用域分解方法将大问题分解为多个子域并行求解
    """
    
    def __init__(self, nx, ny, num_processes=None):
        """
        初始化并行求解器
        
        参数:
        nx, ny: 全局网格尺寸
        num_processes: 进程数,默认为CPU核心数
        """
        self.nx = nx
        self.ny = ny
        self.num_processes = num_processes or cpu_count()
        
        # 计算子域分解
        self._decompose_domain()
    
    def _decompose_domain(self):
        """
        域分解:将全局网格分解为子域
        使用一维条带分解(沿y方向)
        """
        # 计算每个子域的y方向大小
        base_size = self.ny // self.num_processes
        remainder = self.ny % self.num_processes
        
        self.subdomains = []
        start_y = 0
        
        for rank in range(self.num_processes):
            # 分配额外的行给前remainder个进程
            local_ny = base_size + (1 if rank < remainder else 0)
            
            # 添加重叠区域(用于通信)
            ghost_bottom = 1 if rank > 0 else 0
            ghost_top = 1 if rank < self.num_processes - 1 else 0
            
            total_ny = local_ny + ghost_bottom + ghost_top
            
            self.subdomains.append({
                'rank': rank,
                'start_y': start_y,
                'local_ny': local_ny,
                'total_ny': total_ny,
                'ghost_bottom': ghost_bottom,
                'ghost_top': ghost_top,
            })
            
            start_y += local_ny

3. 子域求解

    def _solve_subdomain(self, args):
        """
        求解单个子域(用于多进程并行)
        
        参数:
        args: (subdomain_info, u_global, f_global, alpha, dt, num_steps)
        
        返回:
        result: 子域求解结果
        """
        sub, u_global, f_global, alpha, dt, num_steps = args
        
        rank = sub['rank']
        nx = self.nx
        total_ny = sub['total_ny']
        local_ny = sub['local_ny']
        ghost_bottom = sub['ghost_bottom']
        ghost_top = sub['ghost_top']
        
        # 提取子域数据(包含重叠区域)
        y_start_global = sub['start_y'] - ghost_bottom
        y_end_global = sub['start_y'] + local_ny + ghost_top
        
        u_local = u_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
        f_local = f_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
        
        # 时间步进
        for step in range(num_steps):
            u_new = u_local.copy()
            
            # 内部点更新(不包括重叠区域)
            for j in range(ghost_bottom, total_ny - ghost_top):
                for i in range(1, nx):
                    # 热传导方程离散
                    u_new[j, i] = u_local[j, i] + alpha * dt * (
                        (u_local[j, i+1] - 2*u_local[j, i] + u_local[j, i-1]) +
                        (u_local[j+1, i] - 2*u_local[j, i] + u_local[j-1, i])
                    ) + dt * f_local[j, i]
            
            u_local = u_new
        
        # 返回结果(不包括重叠区域)
        result = {
            'rank': rank,
            'start_y': sub['start_y'],
            'local_ny': local_ny,
            'solution': u_local[ghost_bottom:ghost_bottom+local_ny+1, :].copy()
        }
        
        return result

4. 并行求解与结果合并

    def solve_parallel(self, u0, f, alpha, dt, num_steps):
        """
        并行求解热传导方程
        """
        start_time = time.time()
        
        # 准备参数
        args_list = [(sub, u0, f, alpha, dt, num_steps) for sub in self.subdomains]
        
        # 并行求解
        with Pool(processes=self.num_processes) as pool:
            results = pool.map(self._solve_subdomain, args_list)
        
        # 合并结果
        u_final = np.zeros((self.ny + 1, self.nx + 1))
        for result in results:
            start_y = result['start_y']
            local_ny = result['local_ny']
            u_final[start_y:start_y+local_ny+1, :] = result['solution']
        
        time_elapsed = time.time() - start_time
        
        return u_final, time_elapsed

案例实战

案例1:基本MPI操作演示

本案例展示MPI的核心通信模式:

class MPISimulator:
    """MPI通信模式模拟器"""
    
    @staticmethod
    def demonstrate_broadcast(data, root=0, num_processes=4):
        """模拟MPI_Bcast:广播操作"""
        print(f"\nMPI广播演示 (根进程={root}):")
        print(f"  广播前: 进程{root}有数据 {data}")
        
        results = [None] * num_processes
        results[root] = data
        
        for rank in range(num_processes):
            if rank != root:
                results[rank] = data
                print(f"  广播后: 进程{rank}接收到数据 {data}")
        
        return results

运行结果

MPI广播演示 (根进程=0):
  广播前: 进程0有数据 重要配置信息
  广播后: 进程1接收到数据 重要配置信息
  广播后: 进程2接收到数据 重要配置信息
  广播后: 进程3接收到数据 重要配置信息

案例2:并行 vs 串行性能对比

本案例对比不同进程数下的求解性能:

def example_2_parallel_vs_serial():
    nx = ny = 128
    alpha = 0.01
    dt = 0.001
    num_steps = 100
    
    # 串行求解
    solver = ParallelHeatSolver(nx, ny, num_processes=1)
    u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
    print(f"串行时间: {time_serial:.4f}秒")
    
    # 并行求解
    for num_proc in [2, 4, 6, 8]:
        solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
        u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
        
        speedup = time_serial / time_parallel
        efficiency = speedup / num_proc
        
        print(f"{num_proc}进程: 加速比={speedup:.2f}x, 效率={efficiency*100:.1f}%")

性能结果(128×128网格,100时间步):

进程数 时间(秒) 加速比 效率
1(串行) 2.18 1.00 100%
2 1.45 1.50 75%
4 0.95 2.29 57%
8 0.72 3.03 38%

分析

  • 随着进程数增加,加速比提高但效率下降
  • 效率下降的原因:通信开销、负载不均衡、串行部分

案例3:域分解可视化

本案例可视化不同进程数下的域分解结果:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

观察

  • 1进程:整个区域由一个进程处理
  • 2进程:区域沿y方向分为两个条带
  • 4进程:区域分为四个条带
  • 8进程:更细的条带划分

案例4:强可扩展性分析

固定问题规模(256×256网格),增加进程数:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

结果分析

  • 2进程:加速比1.27x
  • 4进程:加速比1.77x
  • 6进程:加速比1.67x(效率开始下降)
  • 8进程:加速比1.51x(效率继续下降)

效率下降原因

  1. 通信开销随进程数增加
  2. 子域过小导致计算/通信比降低
  3. 进程启动和同步开销

案例5:弱可扩展性分析

每进程固定网格大小(64×64),增加进程数:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

结果分析

  • 理想情况下,执行时间应保持不变
  • 实际效率从100%逐渐下降到约58%
  • 主要原因是通信开销随进程数增加

案例6:热扩散过程可视化

展示多个热源在并行求解下的扩散过程:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

物理现象

  • t=0:四个独立的高斯热源
  • t=50:热源开始扩散,温度场开始融合
  • t=100:温度场进一步融合
  • t=200:温度场基本均匀
  • t=500:接近稳态分布

附录:完整代码

完整的示例代码见文件:实例一_并行热传导仿真.py

运行命令:

python 实例一_并行热传导仿真.py

生成的可视化结果:

  • 案例2_并行性能对比.png:并行性能对比图
  • 案例3_域分解可视化.png:域分解可视化
  • 案例4_强可扩展性.png:强可扩展性分析
  • 案例5_弱可扩展性.png:弱可扩展性分析
  • 案例6_热扩散过程.png:热扩散过程可视化

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
主题063: 并行计算与MPI仿真
核心内容: 域分解、负载均衡、通信优化、并行热传导求解

注意: 本代码使用Python multiprocessing模拟MPI并行计算
实际MPI应用需要使用mpi4py库
"""

import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
import time
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False


class ParallelHeatSolver:
    """
    并行热传导求解器
    使用域分解方法将大问题分解为多个子域并行求解
    """
    
    def __init__(self, nx, ny, num_processes=None):
        """
        初始化并行求解器
        
        参数:
        nx, ny: 全局网格尺寸
        num_processes: 进程数,默认为CPU核心数
        """
        self.nx = nx
        self.ny = ny
        self.num_processes = num_processes or cpu_count()
        
        print(f"并行热传导求解器初始化:")
        print(f"  全局网格: {nx}×{ny}")
        print(f"  进程数: {self.num_processes}")
        
        # 计算子域分解
        self._decompose_domain()
    
    def _decompose_domain(self):
        """
        域分解:将全局网格分解为子域
        使用一维条带分解(沿y方向)
        """
        # 计算每个子域的y方向大小
        base_size = self.ny // self.num_processes
        remainder = self.ny % self.num_processes
        
        self.subdomains = []
        start_y = 0
        
        for rank in range(self.num_processes):
            # 分配额外的行给前remainder个进程
            local_ny = base_size + (1 if rank < remainder else 0)
            
            # 添加重叠区域(用于通信)
            # 每个子域包含上下各一个重叠行(边界子域除外)
            ghost_bottom = 1 if rank > 0 else 0
            ghost_top = 1 if rank < self.num_processes - 1 else 0
            
            total_ny = local_ny + ghost_bottom + ghost_top
            
            self.subdomains.append({
                'rank': rank,
                'start_y': start_y,
                'local_ny': local_ny,
                'total_ny': total_ny,
                'ghost_bottom': ghost_bottom,
                'ghost_top': ghost_top,
                'y_range': (start_y - ghost_bottom, start_y + local_ny + ghost_top)
            })
            
            start_y += local_ny
        
        print(f"\n域分解信息:")
        for sub in self.subdomains:
            print(f"  进程{sub['rank']}: y={sub['y_range']}, 本地网格={self.nx}×{sub['total_ny']}")
    
    def _solve_subdomain(self, args):
        """
        求解单个子域(用于多进程并行)
        
        参数:
        args: (subdomain_info, u_global, f_global, alpha, dt, num_steps)
        
        返回:
        result: 子域求解结果
        """
        sub, u_global, f_global, alpha, dt, num_steps = args
        
        rank = sub['rank']
        nx = self.nx
        total_ny = sub['total_ny']
        local_ny = sub['local_ny']
        ghost_bottom = sub['ghost_bottom']
        ghost_top = sub['ghost_top']
        
        # 提取子域数据(包含重叠区域)
        y_start_global = sub['start_y'] - ghost_bottom
        y_end_global = sub['start_y'] + local_ny + ghost_top
        
        u_local = u_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
        f_local = f_global[max(0, y_start_global):min(self.ny+1, y_end_global+1), :].copy()
        
        # 如果子域在边界,需要调整大小
        if u_local.shape[0] != total_ny + 1:
            # 填充边界
            temp_u = np.zeros((total_ny + 1, nx + 1))
            temp_f = np.zeros((total_ny + 1, nx + 1))
            
            if y_start_global < 0:
                # 底部边界
                offset = -y_start_global
                temp_u[offset:offset+u_local.shape[0], :] = u_local
                temp_f[offset:offset+f_local.shape[0], :] = f_local
            else:
                # 顶部边界
                temp_u[:u_local.shape[0], :] = u_local
                temp_f[:f_local.shape[0], :] = f_local
            
            u_local = temp_u
            f_local = temp_f
        
        # 时间步进
        for step in range(num_steps):
            u_new = u_local.copy()
            
            # 内部点更新(不包括重叠区域)
            # 注意:循环范围应该是 ghost_bottom 到 total_ny - ghost_top(不包含)
            # 因为 u_local 的 shape 是 (total_ny+1, nx+1)
            for j in range(ghost_bottom, total_ny - ghost_top):
                for i in range(1, nx):
                    # 热传导方程离散
                    u_new[j, i] = u_local[j, i] + alpha * dt * (
                        (u_local[j, i+1] - 2*u_local[j, i] + u_local[j, i-1]) +
                        (u_local[j+1, i] - 2*u_local[j, i] + u_local[j-1, i])
                    ) + dt * f_local[j, i]
            
            u_local = u_new
        
        # 返回结果(不包括重叠区域)
        result = {
            'rank': rank,
            'start_y': sub['start_y'],
            'local_ny': local_ny,
            'solution': u_local[ghost_bottom:ghost_bottom+local_ny+1, :].copy()
        }
        
        return result
    
    def solve_parallel(self, u0, f, alpha, dt, num_steps):
        """
        并行求解热传导方程
        
        参数:
        u0: 初始温度场
        f: 热源项
        alpha: 热扩散系数
        dt: 时间步长
        num_steps: 时间步数
        
        返回:
        u: 最终温度场
        time_elapsed: 计算时间
        """
        start_time = time.time()
        
        # 准备参数
        args_list = [(sub, u0, f, alpha, dt, num_steps) for sub in self.subdomains]
        
        # 并行求解
        with Pool(processes=self.num_processes) as pool:
            results = pool.map(self._solve_subdomain, args_list)
        
        # 合并结果
        u_final = np.zeros((self.ny + 1, self.nx + 1))
        for result in results:
            start_y = result['start_y']
            local_ny = result['local_ny']
            u_final[start_y:start_y+local_ny+1, :] = result['solution']
        
        time_elapsed = time.time() - start_time
        
        return u_final, time_elapsed
    
    def solve_serial(self, u0, f, alpha, dt, num_steps):
        """
        串行求解(用于对比)
        """
        start_time = time.time()
        
        u = u0.copy()
        nx, ny = self.nx, self.ny
        
        for step in range(num_steps):
            u_new = u.copy()
            
            for j in range(1, ny):
                for i in range(1, nx):
                    u_new[j, i] = u[j, i] + alpha * dt * (
                        (u[j, i+1] - 2*u[j, i] + u[j, i-1]) +
                        (u[j+1, i] - 2*u[j, i] + u[j-1, i])
                    ) + dt * f[j, i]
            
            u = u_new
        
        time_elapsed = time.time() - start_time
        
        return u, time_elapsed


class MPISimulator:
    """
    MPI通信模式模拟器
    展示MPI中的基本通信模式
    """
    
    @staticmethod
    def demonstrate_broadcast(data, root=0, num_processes=4):
        """
        模拟MPI_Bcast:广播操作
        
        参数:
        data: 要广播的数据
        root: 根进程
        num_processes: 进程数
        
        返回:
        results: 每个进程接收到的数据
        """
        print(f"\nMPI广播演示 (根进程={root}):")
        print(f"  广播前: 进程{root}有数据 {data}")
        
        results = [None] * num_processes
        results[root] = data
        
        # 模拟广播
        for rank in range(num_processes):
            if rank != root:
                results[rank] = data
                print(f"  广播后: 进程{rank}接收到数据 {data}")
        
        return results
    
    @staticmethod
    def demonstrate_scatter(data_list, root=0):
        """
        模拟MPI_Scatter:散播操作
        
        参数:
        data_list: 要散播的数据列表
        root: 根进程
        
        返回:
        results: 每个进程接收到的数据
        """
        num_processes = len(data_list)
        print(f"\nMPI散播演示 (根进程={root}):")
        print(f"  散播前: 进程{root}有数据列表 {data_list}")
        
        results = []
        for rank in range(num_processes):
            results.append(data_list[rank])
            print(f"  散播后: 进程{rank}接收到数据 {data_list[rank]}")
        
        return results
    
    @staticmethod
    def demonstrate_gather(data_list, root=0):
        """
        模拟MPI_Gather:收集操作
        
        参数:
        data_list: 每个进程的数据列表
        root: 根进程
        
        返回:
        gathered_data: 收集后的数据
        """
        print(f"\nMPI收集演示 (根进程={root}):")
        for rank, data in enumerate(data_list):
            print(f"  收集前: 进程{rank}有数据 {data}")
        
        gathered_data = data_list.copy()
        print(f"  收集后: 进程{root}收集到数据 {gathered_data}")
        
        return gathered_data
    
    @staticmethod
    def demonstrate_reduce(data_list, operation='sum', root=0):
        """
        模拟MPI_Reduce:归约操作
        
        参数:
        data_list: 每个进程的数据列表
        operation: 归约操作 ('sum', 'max', 'min')
        root: 根进程
        
        返回:
        result: 归约结果
        """
        print(f"\nMPI归约演示 (操作={operation}, 根进程={root}):")
        for rank, data in enumerate(data_list):
            print(f"  进程{rank}贡献数据 {data}")
        
        if operation == 'sum':
            result = sum(data_list)
        elif operation == 'max':
            result = max(data_list)
        elif operation == 'min':
            result = min(data_list)
        else:
            result = None
        
        print(f"  归约结果: {result} (存储在进程{root})")
        
        return result


def example_1_basic_mpi_operations():
    """案例1: 基本MPI操作演示"""
    print("\n" + "="*60)
    print("案例1: 基本MPI操作演示")
    print("="*60)
    
    mpi = MPISimulator()
    
    # 广播
    data = "重要配置信息"
    mpi.demonstrate_broadcast(data, root=0, num_processes=4)
    
    # 散播
    data_list = [10, 20, 30, 40]
    mpi.demonstrate_scatter(data_list, root=0)
    
    # 收集
    data_list = [1.1, 2.2, 3.3, 4.4]
    mpi.demonstrate_gather(data_list, root=0)
    
    # 归约
    data_list = [100, 200, 300, 400]
    mpi.demonstrate_reduce(data_list, operation='sum', root=0)
    mpi.demonstrate_reduce(data_list, operation='max', root=0)


def example_2_parallel_vs_serial():
    """案例2: 并行 vs 串行性能对比"""
    print("\n" + "="*60)
    print("案例2: 并行 vs 串行性能对比")
    print("="*60)
    
    # 网格参数
    nx = ny = 128
    
    # 物理参数
    alpha = 0.01
    dt = 0.001
    num_steps = 100
    
    # 初始化
    x = np.linspace(0, 1, nx+1)
    y = np.linspace(0, 1, ny+1)
    X, Y = np.meshgrid(x, y)
    
    # 初始条件:高斯分布
    u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
    
    # 热源项
    f = np.zeros((ny+1, nx+1))
    
    # 边界条件
    u0[0, :] = 0
    u0[ny, :] = 0
    u0[:, 0] = 0
    u0[:, nx] = 0
    
    print(f"\n网格尺寸: {nx}×{ny}")
    print(f"时间步数: {num_steps}")
    
    # 串行求解
    print("\n串行求解...")
    solver = ParallelHeatSolver(nx, ny, num_processes=1)
    u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
    print(f"  串行时间: {time_serial:.4f}秒")
    
    # 并行求解(不同进程数)
    process_counts = [2, 4, 6, 8]
    speedups = []
    efficiencies = []
    
    for num_proc in process_counts:
        if num_proc > cpu_count():
            continue
        
        print(f"\n并行求解 ({num_proc}进程)...")
        solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
        u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
        
        speedup = time_serial / time_parallel
        efficiency = speedup / num_proc
        
        speedups.append(speedup)
        efficiencies.append(efficiency)
        
        print(f"  并行时间: {time_parallel:.4f}秒")
        print(f"  加速比: {speedup:.2f}x")
        print(f"  效率: {efficiency*100:.1f}%")
        
        # 验证结果一致性
        error = np.linalg.norm(u_serial - u_parallel) / np.linalg.norm(u_serial)
        print(f"  相对误差: {error:.6e}")
    
    # 绘制加速比和效率
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    axes[0].plot(process_counts[:len(speedups)], speedups, 'b-o', linewidth=2, markersize=8)
    axes[0].plot(process_counts[:len(speedups)], process_counts[:len(speedups)], 
                 'r--', label='理想加速比', linewidth=2)
    axes[0].set_xlabel('进程数', fontsize=12)
    axes[0].set_ylabel('加速比', fontsize=12)
    axes[0].set_title('并行加速比', fontsize=14)
    axes[0].legend(fontsize=11)
    axes[0].grid(True, alpha=0.3)
    
    axes[1].plot(process_counts[:len(efficiencies)], 
                 [e*100 for e in efficiencies], 'g-s', linewidth=2, markersize=8)
    axes[1].axhline(y=100, color='r', linestyle='--', label='理想效率', linewidth=2)
    axes[1].set_xlabel('进程数', fontsize=12)
    axes[1].set_ylabel('并行效率 (%)', fontsize=12)
    axes[1].set_title('并行效率', fontsize=14)
    axes[1].legend(fontsize=11)
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('案例2_并行性能对比.png', dpi=150, bbox_inches='tight')
    print("\n  图已保存: 案例2_并行性能对比.png")
    plt.close()


def example_3_domain_decomposition():
    """案例3: 域分解可视化"""
    print("\n" + "="*60)
    print("案例3: 域分解可视化")
    print("="*60)
    
    nx = ny = 64
    process_counts = [1, 2, 4, 8]
    
    fig, axes = plt.subplots(2, 2, figsize=(12, 12))
    axes = axes.flatten()
    
    for idx, num_proc in enumerate(process_counts):
        if num_proc > cpu_count():
            continue
        
        solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
        
        # 创建域分解可视化
        domain_map = np.zeros((ny+1, nx+1))
        colors = plt.cm.tab10(np.linspace(0, 1, num_proc))
        
        for sub in solver.subdomains:
            rank = sub['rank']
            start_y = sub['start_y']
            local_ny = sub['local_ny']
            
            domain_map[start_y:start_y+local_ny+1, :] = rank + 1
        
        ax = axes[idx]
        im = ax.imshow(domain_map, extent=[0, 1, 0, 1], origin='lower',
                       cmap='tab10', vmin=0.5, vmax=num_proc+0.5)
        ax.set_title(f'{num_proc}进程域分解', fontsize=12)
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        
        # 添加网格线
        for sub in solver.subdomains:
            if sub['rank'] > 0:
                y_pos = sub['start_y'] / ny
                ax.axhline(y=y_pos, color='white', linewidth=2, linestyle='--')
    
    plt.tight_layout()
    plt.savefig('案例3_域分解可视化.png', dpi=150, bbox_inches='tight')
    print("  图已保存: 案例3_域分解可视化.png")
    plt.close()


def example_4_strong_scaling():
    """案例4: 强可扩展性分析"""
    print("\n" + "="*60)
    print("案例4: 强可扩展性分析")
    print("="*60)
    
    # 固定问题规模,增加进程数
    nx = ny = 256
    alpha = 0.01
    dt = 0.001
    num_steps = 50
    
    # 初始化
    x = np.linspace(0, 1, nx+1)
    y = np.linspace(0, 1, ny+1)
    X, Y = np.meshgrid(x, y)
    u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
    f = np.zeros((ny+1, nx+1))
    u0[0, :] = u0[ny, :] = u0[:, 0] = u0[:, nx] = 0
    
    print(f"\n固定问题规模: {nx}×{ny}")
    
    # 串行基准
    solver = ParallelHeatSolver(nx, ny, num_processes=1)
    u_serial, time_serial = solver.solve_serial(u0, f, alpha, dt, num_steps)
    print(f"串行时间: {time_serial:.4f}秒")
    
    # 不同进程数
    process_counts = [1, 2, 4, 6, 8]
    speedups = []
    
    for num_proc in process_counts:
        if num_proc > cpu_count():
            continue
        
        print(f"\n{num_proc}进程...")
        solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
        u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
        
        speedup = time_serial / time_parallel
        speedups.append(speedup)
        
        print(f"  时间: {time_parallel:.4f}秒, 加速比: {speedup:.2f}x")
    
    # 绘制强可扩展性
    fig, ax = plt.subplots(figsize=(10, 6))
    
    ax.plot(process_counts[:len(speedups)], speedups, 'b-o', 
            linewidth=2, markersize=8, label='实际加速比')
    ax.plot(process_counts[:len(speedups)], process_counts[:len(speedups)], 
            'r--', linewidth=2, label='理想加速比')
    ax.set_xlabel('进程数', fontsize=12)
    ax.set_ylabel('加速比', fontsize=12)
    ax.set_title('强可扩展性分析 (固定问题规模)', fontsize=14)
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('案例4_强可扩展性.png', dpi=150, bbox_inches='tight')
    print("\n  图已保存: 案例4_强可扩展性.png")
    plt.close()


def example_5_weak_scaling():
    """案例5: 弱可扩展性分析"""
    print("\n" + "="*60)
    print("案例5: 弱可扩展性分析")
    print("="*60)
    
    # 每个进程固定网格大小
    local_nx = local_ny = 64
    alpha = 0.01
    dt = 0.001
    num_steps = 50
    
    process_counts = [1, 2, 4, 6, 8]
    times = []
    
    print(f"\n每进程固定网格: {local_nx}×{local_ny}")
    
    for num_proc in process_counts:
        if num_proc > cpu_count():
            continue
        
        # 全局网格随进程数增加
        nx = local_nx
        ny = local_ny * num_proc
        
        print(f"\n{num_proc}进程, 全局网格: {nx}×{ny}")
        
        # 初始化
        x = np.linspace(0, 1, nx+1)
        y = np.linspace(0, 1, ny+1)
        X, Y = np.meshgrid(x, y)
        u0 = np.exp(-((X-0.5)**2 + (Y-0.5)**2) / 0.1)
        f = np.zeros((ny+1, nx+1))
        u0[0, :] = u0[ny, :] = u0[:, 0] = u0[:, nx] = 0
        
        solver = ParallelHeatSolver(nx, ny, num_processes=num_proc)
        u_parallel, time_parallel = solver.solve_parallel(u0, f, alpha, dt, num_steps)
        
        times.append(time_parallel)
        print(f"  时间: {time_parallel:.4f}秒")
    
    # 绘制弱可扩展性
    fig, ax = plt.subplots(figsize=(10, 6))
    
    # 效率 = 基准时间 / 当前时间
    base_time = times[0]
    efficiencies = [base_time / t * 100 for t in times]
    
    ax.plot(process_counts[:len(efficiencies)], efficiencies, 'g-s', 
            linewidth=2, markersize=8)
    ax.axhline(y=100, color='r', linestyle='--', linewidth=2, label='理想效率')
    ax.set_xlabel('进程数', fontsize=12)
    ax.set_ylabel('效率 (%)', fontsize=12)
    ax.set_title('弱可扩展性分析 (每进程固定负载)', fontsize=14)
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('案例5_弱可扩展性.png', dpi=150, bbox_inches='tight')
    print("\n  图已保存: 案例5_弱可扩展性.png")
    plt.close()


def example_6_heat_diffusion_animation():
    """案例6: 热扩散过程可视化"""
    print("\n" + "="*60)
    print("案例6: 热扩散过程可视化")
    print("="*60)
    
    nx = ny = 64
    alpha = 0.01
    dt = 0.001
    
    # 初始化
    x = np.linspace(0, 1, nx+1)
    y = np.linspace(0, 1, ny+1)
    X, Y = np.meshgrid(x, y)
    
    # 初始条件:多个热源
    u0 = np.zeros((ny+1, nx+1))
    u0 += np.exp(-((X-0.3)**2 + (Y-0.3)**2) / 0.02)
    u0 += np.exp(-((X-0.7)**2 + (Y-0.7)**2) / 0.02)
    u0 += np.exp(-((X-0.3)**2 + (Y-0.7)**2) / 0.02)
    u0 += np.exp(-((X-0.7)**2 + (Y-0.3)**2) / 0.02)
    
    f = np.zeros((ny+1, nx+1))
    
    # 边界条件
    u0[0, :] = 0
    u0[ny, :] = 0
    u0[:, 0] = 0
    u0[:, nx] = 0
    
    # 并行求解并记录中间结果
    solver = ParallelHeatSolver(nx, ny, num_processes=4)
    
    # 分阶段求解
    time_steps = [0, 50, 100, 200, 500]
    solutions = [u0.copy()]
    
    u_current = u0.copy()
    prev_steps = 0
    for steps in time_steps[1:]:
        u_current, _ = solver.solve_parallel(u_current, f, alpha, dt, steps - prev_steps)
        solutions.append(u_current.copy())
        prev_steps = steps
    
    # 绘制热扩散过程
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    axes = axes.flatten()
    
    for idx, (u, step) in enumerate(zip(solutions, time_steps)):
        ax = axes[idx]
        im = ax.contourf(X, Y, u, levels=20, cmap='hot')
        ax.set_title(f'时间步 = {step}', fontsize=12)
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        plt.colorbar(im, ax=ax)
    
    plt.tight_layout()
    plt.savefig('案例6_热扩散过程.png', dpi=150, bbox_inches='tight')
    print("  图已保存: 案例6_热扩散过程.png")
    plt.close()


if __name__ == "__main__":
    print("="*60)
    print("主题063: 并行计算与MPI仿真")
    print("="*60)
    print(f"\n系统CPU核心数: {cpu_count()}")
    
    # 运行所有案例
    example_1_basic_mpi_operations()
    example_2_parallel_vs_serial()
    example_3_domain_decomposition()
    example_4_strong_scaling()
    example_5_weak_scaling()
    example_6_heat_diffusion_animation()
    
    print("\n" + "="*60)
    print("仿真完成!")
    print("="*60)
    print("\n生成的文件:")
    print("  1. 案例2_并行性能对比.png")
    print("  2. 案例3_域分解可视化.png")
    print("  3. 案例4_强可扩展性.png")
    print("  4. 案例5_弱可扩展性.png")
    print("  5. 案例6_热扩散过程.png")

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐