主题076:并行计算与高性能仿真

目录

  1. 并行计算概述
  2. 并行计算基础理论
  3. Python多进程与多线程
  4. MPI并行编程
  5. OpenMP并行化
  6. GPU加速计算
  7. 有限元并行求解
  8. 大规模仿真案例
  9. 性能优化策略
  10. 云计算与分布式仿真

1. 并行计算概述

1.1 为什么需要并行计算

计算需求的爆炸式增长

现代工程仿真面临的挑战:

  • 模型规模:百万甚至千万自由度
  • 物理复杂性:多物理场耦合、多尺度问题
  • 时间要求:设计周期缩短,需要快速反馈
  • 精度需求:更精细的网格,更复杂的本构

性能瓶颈

单核CPU性能提升遇到物理极限:

  • 摩尔定律放缓,时钟频率难以继续提升
  • 功耗墙限制,散热成为关键问题
  • 内存墙问题,数据传输速度跟不上计算速度

并行计算的优势

指标 串行计算 并行计算
计算时间 T T T T / P T/P T/P(理想)
可处理问题规模 有限 可扩展
内存容量 单机限制 聚合内存
成本效益 高(使用普通硬件)

1.2 并行计算的基本概念

并行化类型

  1. 数据并行(Data Parallelism)

    • 相同操作应用于不同数据
    • 适合:矩阵运算、图像处理、有限元计算
    • 示例:每个处理器处理网格的一部分
  2. 任务并行(Task Parallelism)

    • 不同任务同时执行
    • 适合:流水线处理、多物理场耦合
    • 示例:同时计算结构和流体
  3. 流水线并行(Pipeline Parallelism)

    • 任务分阶段,重叠执行
    • 适合:迭代算法、时间步进

并行计算模型

┌─────────────────────────────────────────────────────────────┐
│                    并行计算模型分类                          │
├─────────────────────────────────────────────────────────────┤
│  共享内存模型          │  分布式内存模型                     │
│  ├─ OpenMP            │  ├─ MPI                            │
│  ├─ Pthreads          │  ├─ 消息传递                        │
│  └─ GPU (CUDA/OpenCL) │  └─ 网络通信                        │
├─────────────────────────────────────────────────────────────┤
│  混合模型:MPI + OpenMP, MPI + CUDA                         │
└─────────────────────────────────────────────────────────────┘

关键性能指标

  1. 加速比(Speedup)
    S ( P ) = T 1 T P S(P) = \frac{T_1}{T_P} S(P)=TPT1
    其中 T 1 T_1 T1 是串行时间, T P T_P TP P P P 个处理器的并行时间

  2. 效率(Efficiency)
    E ( P ) = S ( P ) P = T 1 P ⋅ T P E(P) = \frac{S(P)}{P} = \frac{T_1}{P \cdot T_P} E(P)=PS(P)=PTPT1

  3. 可扩展性(Scalability)

    • 强可扩展性:固定问题规模,增加处理器
    • 弱可扩展性:问题规模与处理器成比例增加

阿姆达尔定律

S ( P ) = 1 ( 1 − f ) + f P S(P) = \frac{1}{(1-f) + \frac{f}{P}} S(P)=(1f)+Pf1

其中 f f f 是可并行化部分的比例。

  • f = 0.9 f = 0.9 f=0.9(90%可并行), P = 100 P = 100 P=100 时: S ≈ 9.2 S \approx 9.2 S9.2
  • f = 0.99 f = 0.99 f=0.99(99%可并行), P = 100 P = 100 P=100 时: S ≈ 50 S \approx 50 S50

古斯塔夫森定律

阿姆达尔定律假设问题规模固定,而古斯塔夫森定律考虑问题规模随处理器增加而扩大:

S ( P ) = P − f ( P − 1 ) S(P) = P - f(P-1) S(P)=Pf(P1)

这表明并行计算的真正优势在于解决更大规模的问题。

Python实现性能测试

import time
import numpy as np

def measure_serial_performance(func, *args, n_runs=5):
    """测量串行函数性能"""
    times = []
    for _ in range(n_runs):
        start = time.time()
        result = func(*args)
        end = time.time()
        times.append(end - start)
    
    return np.mean(times), np.std(times), result

def matrix_multiply_benchmark(n=2000):
    """矩阵乘法基准测试"""
    A = np.random.randn(n, n)
    B = np.random.randn(n, n)
    
    mean_time, std_time, C = measure_serial_performance(
        lambda: A @ B, n_runs=3
    )
    
    print(f"矩阵乘法 ({n}×{n})")
    print(f"  平均时间: {mean_time:.3f}s ± {std_time:.3f}s")
    print(f"  计算性能: {2*n**3 / mean_time / 1e9:.2f} GFLOPS")
    
    return mean_time

# 基准测试
print("=" * 70)
print("串行性能基准测试")
print("=" * 70)
t_serial = matrix_multiply_benchmark(2000)

1.3 并行计算硬件架构

现代计算架构

  1. 多核CPU

    • 4-64核心,共享内存
    • 适合:OpenMP并行、任务并行
    • 内存带宽:50-200 GB/s
  2. 众核处理器(Xeon Phi)

    • 60+核心,SIMD指令
    • 适合:高度并行的科学计算
  3. GPU加速器

    • 数千核心,高内存带宽
    • 适合:数据并行、大规模矩阵运算
    • 内存带宽:500-2000 GB/s
  4. 集群系统

    • 多台服务器,分布式内存
    • 适合:MPI并行、超大规模问题

内存层次结构

┌─────────────────────────────────────────┐
│  寄存器(Register)                      │  最快,容量最小
│  └─ 每个核心:数十个                     │
├─────────────────────────────────────────┤
│  L1缓存                                 │  32-64 KB
│  └─ 每个核心私有                         │
├─────────────────────────────────────────┤
│  L2缓存                                 │  256-512 KB
│  └─ 每个核心或每几个核心共享              │
├─────────────────────────────────────────┤
│  L3缓存(LLC)                          │  8-64 MB
│  └─ 所有核心共享                         │
├─────────────────────────────────────────┤
│  主内存(DRAM)                          │  16-1024 GB
│  └─ 所有核心共享                         │
├─────────────────────────────────────────┤
│  网络/存储                              │  最慢,容量最大
└─────────────────────────────────────────┘

并行计算硬件选型指南

应用场景 推荐架构 理由
中小规模FEM 多核CPU 开发简单,内存充足
大规模CFD GPU集群 计算密集,数据并行
多物理场耦合 CPU集群 任务并行,通信灵活
实时仿真 FPGA/ASIC 低延迟,确定性

2. 并行计算基础理论

2.1 并行算法设计

分解策略

  1. 域分解(Domain Decomposition)

    • 将计算域划分为子域
    • 每个处理器负责一个子域
    • 适合:有限元、有限差分、CFD
  2. 功能分解(Functional Decomposition)

    • 按功能模块划分任务
    • 适合:多物理场耦合、前后处理分离
  3. 流水线分解(Pipeline Decomposition)

    • 任务分阶段并行
    • 适合:图像处理、信号处理

负载均衡

import numpy as np

def domain_decomposition_1d(n_elements, n_processors):
    """
    一维域分解
    
    参数:
        n_elements: 总单元数
        n_processors: 处理器数
    
    返回:
        每个处理器的起始和结束索引
    """
    base_size = n_elements // n_processors
    remainder = n_elements % n_processors
    
    ranges = []
    start = 0
    
    for i in range(n_processors):
        # 前remainder个处理器多分配一个单元
        size = base_size + (1 if i < remainder else 0)
        end = start + size
        ranges.append((start, end))
        start = end
    
    return ranges


def domain_decomposition_2d(nx, ny, px, py):
    """
    二维域分解
    
    参数:
        nx, ny: x和y方向的单元数
        px, py: x和y方向的处理器数
    
    返回:
        每个处理器的子域范围
    """
    # x方向分解
    x_ranges = domain_decomposition_1d(nx, px)
    # y方向分解
    y_ranges = domain_decomposition_1d(ny, py)
    
    subdomains = []
    for i in range(px):
        for j in range(py):
            subdomains.append({
                'proc_id': i * py + j,
                'x_range': x_ranges[i],
                'y_range': y_ranges[j]
            })
    
    return subdomains


def domain_decomposition_3d(nx, ny, nz, px, py, pz):
    """
    三维域分解
    
    参数:
        nx, ny, nz: 三个方向的单元数
        px, py, pz: 三个方向的处理器数
    
    返回:
        每个处理器的子域范围
    """
    x_ranges = domain_decomposition_1d(nx, px)
    y_ranges = domain_decomposition_1d(ny, py)
    z_ranges = domain_decomposition_1d(nz, pz)
    
    subdomains = []
    for i in range(px):
        for j in range(py):
            for k in range(pz):
                subdomains.append({
                    'proc_id': i * py * pz + j * pz + k,
                    'x_range': x_ranges[i],
                    'y_range': y_ranges[j],
                    'z_range': z_ranges[k]
                })
    
    return subdomains


# 示例:域分解
print("\n" + "=" * 70)
print("域分解示例")
print("=" * 70)

# 1D分解
ranges_1d = domain_decomposition_1d(100, 4)
print("\n1D域分解 (100单元, 4处理器):")
for i, (start, end) in enumerate(ranges_1d):
    print(f"  处理器{i}: 单元 {start}-{end-1} (共{end-start}个)")

# 2D分解
subdomains = domain_decomposition_2d(100, 80, 2, 2)
print("\n2D域分解 (100×80网格, 2×2处理器):")
for sub in subdomains:
    x_start, x_end = sub['x_range']
    y_start, y_end = sub['y_range']
    print(f"  处理器{sub['proc_id']}: x[{x_start}:{x_end}], y[{y_start}:{y_end}]")

# 3D分解
subdomains_3d = domain_decomposition_3d(60, 50, 40, 2, 2, 2)
print("\n3D域分解 (60×50×40网格, 2×2×2处理器):")
for sub in subdomains_3d[:4]:  # 只显示前4个
    x_start, x_end = sub['x_range']
    y_start, y_end = sub['y_range']
    z_start, z_end = sub['z_range']
    print(f"  处理器{sub['proc_id']}: x[{x_start}:{x_end}], y[{y_start}:{y_end}], z[{z_start}:{z_end}]")

2.2 通信与同步

通信模式

  1. 点对点通信

    • 两个处理器之间的直接通信
    • 适合:边界数据交换、不规则通信
  2. 集合通信

    • 广播(Broadcast):一个到所有
    • 归约(Reduce):所有到一个(求和、最大等)
    • 散射(Scatter):一个分散到所有
    • 收集(Gather):所有收集到一个
    • 全收集(Allgather):所有收集到所有

同步机制

class SimpleBarrier:
    """简单的屏障同步实现"""
    
    def __init__(self, n):
        self.n = n  # 参与者数量
        self.count = 0
        self.mutex = None  # 互斥锁(实际实现需要)
        self.cond = None   # 条件变量(实际实现需要)
    
    def wait(self):
        """等待所有参与者到达"""
        # 实际实现需要线程/进程同步原语
        pass


def halo_exchange_example():
    """
    光环交换(Halo Exchange)示例
    
    有限差分/有限元并行计算中的典型通信模式
    """
    print("\n光环交换示意图:")
    print("""
    处理器0          处理器1          处理器2
    ┌─────────┐     ┌─────────┐     ┌─────────┐
    │内部单元 │←──→│内部单元 │←──→│内部单元 │
    │  + 光环 │     │  + 光环 │     │  + 光环 │
    └─────────┘     └─────────┘     └─────────┘
         ↑               ↑               ↑
       发送边界        双向交换        接收边界
    
    每个处理器需要:
    1. 计算内部单元
    2. 发送边界数据到邻居
    3. 接收邻居的边界数据
    4. 更新光环区域
    """)


halo_exchange_example()

通信优化策略

  1. 通信隐藏计算

    • 非阻塞通信
    • 计算与通信重叠
  2. 减少通信次数

    • 聚合小消息
    • 批量数据传输
  3. 拓扑感知映射

    • 将相邻子域映射到物理相邻的处理器
    • 减少网络跳数

2.3 并行效率分析

性能模型

T P = T c o m p u t e P + T c o m m + T s y n c T_P = \frac{T_{compute}}{P} + T_{comm} + T_{sync} TP=PTcompute+Tcomm+Tsync

其中:

  • T c o m p u t e T_{compute} Tcompute:计算时间
  • T c o m m T_{comm} Tcomm:通信时间
  • T s y n c T_{sync} Tsync:同步开销

通信开销模型

T c o m m = T s t a r t u p + n ⋅ T t r a n s f e r T_{comm} = T_{startup} + n \cdot T_{transfer} Tcomm=Tstartup+nTtransfer

  • T s t a r t u p T_{startup} Tstartup:启动延迟(延迟)
  • T t r a n s f e r T_{transfer} Ttransfer:单位数据传输时间(带宽的倒数)
  • n n n:传输数据量

Python性能分析工具

import time
from functools import wraps

def timer_decorator(func):
    """计时装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__} 耗时: {end - start:.4f}s")
        return result
    return wrapper


class PerformanceAnalyzer:
    """性能分析器"""
    
    def __init__(self):
        self.timings = {}
        self.memory_usage = {}
    
    def record(self, name, time_value, memory_mb=None):
        """记录时间和内存"""
        if name not in self.timings:
            self.timings[name] = []
            self.memory_usage[name] = []
        self.timings[name].append(time_value)
        if memory_mb is not None:
            self.memory_usage[name].append(memory_mb)
    
    def report(self):
        """生成报告"""
        print("\n" + "=" * 70)
        print("性能分析报告")
        print("=" * 70)
        
        total = 0
        for name, times in self.timings.items():
            avg_time = np.mean(times)
            std_time = np.std(times)
            total += avg_time
            print(f"{name:30s}: {avg_time:.4f}s ± {std_time:.4f}s")
            if name in self.memory_usage and self.memory_usage[name]:
                avg_mem = np.mean(self.memory_usage[name])
                print(f"{'':30s}  内存: {avg_mem:.2f} MB")
        
        print("-" * 70)
        print(f"{'总计':30s}: {total:.4f}s")


class ParallelScalabilityAnalyzer:
    """并行可扩展性分析器"""
    
    def __init__(self):
        self.results = {}
    
    def add_result(self, n_processors, time, problem_size=None):
        """添加测试结果"""
        self.results[n_processors] = {
            'time': time,
            'problem_size': problem_size
        }
    
    def compute_speedup(self, baseline_n_proc=1):
        """计算加速比"""
        if baseline_n_proc not in self.results:
            return None
        
        baseline_time = self.results[baseline_n_proc]['time']
        speedups = {}
        
        for n_proc, data in self.results.items():
            speedups[n_proc] = baseline_time / data['time']
        
        return speedups
    
    def compute_efficiency(self, baseline_n_proc=1):
        """计算效率"""
        speedups = self.compute_speedup(baseline_n_proc)
        if speedups is None:
            return None
        
        efficiencies = {}
        for n_proc, speedup in speedups.items():
            efficiencies[n_proc] = speedup / n_proc
        
        return efficiencies
    
    def report(self):
        """生成可扩展性报告"""
        print("\n" + "=" * 70)
        print("并行可扩展性分析报告")
        print("=" * 70)
        print(f"{'处理器数':>10s} {'时间(s)':>12s} {'加速比':>10s} {'效率(%)':>10s}")
        print("-" * 50)
        
        speedups = self.compute_speedup()
        efficiencies = self.compute_efficiency()
        
        for n_proc in sorted(self.results.keys()):
            time_val = self.results[n_proc]['time']
            speedup = speedups.get(n_proc, 0)
            efficiency = efficiencies.get(n_proc, 0) * 100
            print(f"{n_proc:10d} {time_val:12.4f} {speedup:10.2f} {efficiency:10.1f}")


# 示例:分析矩阵运算性能
print("\n" + "=" * 70)
print("矩阵运算性能分析")
print("=" * 70)

analyzer = PerformanceAnalyzer()

# 矩阵创建
start = time.perf_counter()
A = np.random.randn(3000, 3000)
B = np.random.randn(3000, 3000)
analyzer.record("矩阵创建", time.perf_counter() - start)

# 矩阵乘法
start = time.perf_counter()
C = A @ B
analyzer.record("矩阵乘法", time.perf_counter() - start)

# 矩阵求逆
start = time.perf_counter()
A_inv = np.linalg.inv(A[:1000, :1000])
analyzer.record("矩阵求逆(1000)", time.perf_counter() - start)

# 特征值分解
start = time.perf_counter()
eigenvalues = np.linalg.eigvals(A[:500, :500])
analyzer.record("特征值(500)", time.perf_counter() - start)

analyzer.report()

3. Python多进程与多线程

3.1 Python的GIL与并行选择

全局解释器锁(GIL)

  • Python的CPython实现有GIL
  • 同一时刻只有一个线程执行Python字节码
  • 影响:多线程不能实现真正的CPU并行

解决方案

方法 适用场景 实现方式
多进程 CPU密集型 multiprocessing
多线程 I/O密集型 threading
异步 I/O密集型 asyncio
C扩展 CPU密集型 Cython, Numba

进程 vs 线程

┌─────────────────────────────────────────────────────────────┐
│  多进程 (Multiprocessing)       多线程 (Threading)           │
├─────────────────────────────────────────────────────────────┤
│  • 独立内存空间                 • 共享内存空间               │
│  • 无GIL限制                    • 受GIL限制                  │
│  • 高启动开销                   • 低启动开销                  │
│  • 适合CPU密集型                • 适合I/O密集型              │
│  • 需要序列化通信               • 直接共享数据                │
└─────────────────────────────────────────────────────────────┘

并行策略选择决策树

                    任务类型?
                   /         \
            CPU密集型       I/O密集型
               |               |
          数据并行?        阻塞操作?
          /       \        /       \
       是          否    是         否
       |           |     |          |
   multiprocessing  MPI  threading  asyncio
   或Numba         (大规模)         
   (单机多核)      或GPU

3.2 multiprocessing模块

基础用法

import multiprocessing as mp
from multiprocessing import Pool, Process, Queue, Manager, Value, Lock
import numpy as np
import time

def worker_function(data_chunk):
    """
    工作函数 - 处理数据块
    
    参数:
        data_chunk: 数据块
    
    返回:
        处理结果
    """
    # 模拟计算密集型任务
    result = np.sum(data_chunk ** 2)
    return result


def parallel_map_example():
    """
    使用Pool进行并行map操作
    """
    print("\n" + "=" * 70)
    print("multiprocessing.Pool示例")
    print("=" * 70)
    
    # 生成数据
    n_chunks = 8
    chunk_size = 1000000
    data = [np.random.randn(chunk_size) for _ in range(n_chunks)]
    
    # 串行执行
    print("\n串行执行:")
    start = time.time()
    serial_results = [worker_function(chunk) for chunk in data]
    serial_time = time.time() - start
    print(f"  耗时: {serial_time:.3f}s")
    print(f"  结果: {serial_results}")
    
    # 并行执行
    print("\n并行执行 (8进程):")
    start = time.time()
    with Pool(processes=8) as pool:
        parallel_results = pool.map(worker_function, data)
    parallel_time = time.time() - start
    print(f"  耗时: {parallel_time:.3f}s")
    print(f"  结果: {parallel_results}")
    
    # 加速比
    speedup = serial_time / parallel_time
    efficiency = speedup / 8
    print(f"\n加速比: {speedup:.2f}x")
    print(f"效率: {efficiency*100:.1f}%")
    
    # 验证结果
    assert np.allclose(serial_results, parallel_results), "结果不一致!"
    print("\n✓ 结果验证通过")


parallel_map_example()

进程间通信

def producer(queue, n_items):
    """生产者进程"""
    for i in range(n_items):
        data = np.random.randn(1000)
        queue.put(data)
        print(f"生产者: 生产数据块 {i+1}")
    queue.put(None)  # 结束信号

def consumer(queue, results):
    """消费者进程"""
    while True:
        data = queue.get()
        if data is None:
            break
        result = np.sum(data ** 2)
        results.append(result)
        print(f"消费者: 处理完成,结果={result:.2f}")


def queue_communication_example():
    """
    使用Queue进行进程间通信
    """
    print("\n" + "=" * 70)
    print("进程间通信示例 (Queue)")
    print("=" * 70)
    
    queue = Queue(maxsize=10)
    manager = Manager()
    results = manager.list()
    
    # 创建进程
    producer_proc = Process(target=producer, args=(queue, 5))
    consumer_proc = Process(target=consumer, args=(queue, results))
    
    # 启动进程
    start = time.time()
    producer_proc.start()
    consumer_proc.start()
    
    # 等待完成
    producer_proc.join()
    consumer_proc.join()
    
    elapsed = time.time() - start
    print(f"\n总耗时: {elapsed:.3f}s")
    print(f"处理结果数: {len(results)}")


queue_communication_example()

共享内存

from multiprocessing import shared_memory

def shared_memory_example():
    """
    使用共享内存进行高效数据共享
    """
    print("\n" + "=" * 70)
    print("共享内存示例")
    print("=" * 70)
    
    # 创建共享内存
    n = 1000000
    data = np.random.randn(n)
    
    # 创建共享内存块
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
    shared_array[:] = data[:]
    
    print(f"共享内存名称: {shm.name}")
    print(f"数据大小: {data.nbytes / 1e6:.2f} MB")
    
    def worker(shm_name, shape, dtype, start_idx, end_idx):
        """工作进程 - 处理共享内存的一部分"""
        # 连接到共享内存
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        
        # 处理数据
        local_sum = np.sum(arr[start_idx:end_idx] ** 2)
        
        existing_shm.close()
        return local_sum
    
    # 并行处理
    n_processes = 4
    chunk_size = n // n_processes
    
    with Pool(processes=n_processes) as pool:
        results = pool.starmap(
            worker,
            [(shm.name, data.shape, data.dtype, i*chunk_size, (i+1)*chunk_size) 
             for i in range(n_processes)]
        )
    
    total_sum = sum(results)
    expected_sum = np.sum(data ** 2)
    
    print(f"\n并行计算结果: {total_sum:.2f}")
    print(f"期望结果: {expected_sum:.2f}")
    print(f"误差: {abs(total_sum - expected_sum):.6f}")
    
    # 清理
    shm.close()
    shm.unlink()


shared_memory_example()

进程池高级用法

def parallel_map_async_example():
    """
    异步并行map操作
    """
    print("\n" + "=" * 70)
    print("异步并行Map示例")
    print("=" * 70)
    
    def slow_function(x):
        """模拟耗时操作"""
        time.sleep(0.1)
        return x ** 2
    
    data = list(range(20))
    
    with Pool(processes=4) as pool:
        # 异步执行
        result_async = pool.map_async(slow_function, data)
        
        # 等待完成(可以在此期间做其他事情)
        while not result_async.ready():
            print(f"进度: {result_async._number_left} 批次剩余")
            time.sleep(0.05)
        
        results = result_async.get()
    
    print(f"\n完成! 结果数: {len(results)}")
    print(f"前5个结果: {results[:5]}")


parallel_map_async_example()

3.3 并行矩阵运算

def parallel_matrix_vector(A, x, n_processes=None):
    """
    并行矩阵-向量乘法
    
    参数:
        A: 矩阵 (m×n)
        x: 向量 (n,)
        n_processes: 进程数
    
    返回:
        y = A @ x
    """
    if n_processes is None:
        n_processes = mp.cpu_count()
    
    m, n = A.shape
    chunk_size = m // n_processes
    
    def compute_chunk(args):
        A_chunk, x, start_row = args
        return start_row, A_chunk @ x
    
    # 分割矩阵
    chunks = []
    for i in range(n_processes):
        start = i * chunk_size
        end = m if i == n_processes - 1 else (i + 1) * chunk_size
        chunks.append((A[start:end], x, start))
    
    # 并行计算
    with Pool(processes=n_processes) as pool:
        results = pool.map(compute_chunk, chunks)
    
    # 合并结果
    y = np.empty(m)
    for start_row, y_chunk in results:
        y[start_row:start_row + len(y_chunk)] = y_chunk
    
    return y


def parallel_matrix_multiply(A, B, n_processes=None):
    """
    并行矩阵乘法(分块算法)
    
    参数:
        A: 矩阵 (m×k)
        B: 矩阵 (k×n)
        n_processes: 进程数
    
    返回:
        C = A @ B (m×n)
    """
    if n_processes is None:
        n_processes = mp.cpu_count()
    
    m, k = A.shape
    k2, n = B.shape
    assert k == k2, "矩阵维度不匹配"
    
    # 对A按行分块
    chunk_size = m // n_processes
    
    def compute_chunk(args):
        A_chunk, B, start_row = args
        return start_row, A_chunk @ B
    
    chunks = []
    for i in range(n_processes):
        start = i * chunk_size
        end = m if i == n_processes - 1 else (i + 1) * chunk_size
        chunks.append((A[start:end], B, start))
    
    with Pool(processes=n_processes) as pool:
        results = pool.map(compute_chunk, chunks)
    
    # 合并结果
    C = np.empty((m, n))
    for start_row, C_chunk in results:
        C[start_row:start_row + C_chunk.shape[0]] = C_chunk
    
    return C


def benchmark_parallel_matrix():
    """
    并行矩阵运算基准测试
    """
    print("\n" + "=" * 70)
    print("并行矩阵运算基准测试")
    print("=" * 70)
    
    # 生成测试数据
    n = 5000
    A = np.random.randn(n, n)
    x = np.random.randn(n)
    
    # NumPy串行计算
    print("\nNumPy串行计算:")
    start = time.time()
    y_serial = A @ x
    serial_time = time.time() - start
    print(f"  耗时: {serial_time:.3f}s")
    
    # 并行计算(不同进程数)
    for n_proc in [2, 4, 8]:
        print(f"\n并行计算 ({n_proc}进程):")
        start = time.time()
        y_parallel = parallel_matrix_vector(A, x, n_processes=n_proc)
        parallel_time = time.time() - start
        
        speedup = serial_time / parallel_time
        error = np.max(np.abs(y_serial - y_parallel))
        
        print(f"  耗时: {parallel_time:.3f}s")
        print(f"  加速比: {speedup:.2f}x")
        print(f"  最大误差: {error:.2e}")
    
    # 矩阵乘法测试
    print("\n" + "-" * 70)
    print("矩阵乘法测试 (2000×2000)")
    n = 2000
    A = np.random.randn(n, n)
    B = np.random.randn(n, n)
    
    print("\nNumPy串行计算:")
    start = time.time()
    C_serial = A @ B
    serial_time = time.time() - start
    print(f"  耗时: {serial_time:.3f}s")
    
    for n_proc in [2, 4]:
        print(f"\n并行计算 ({n_proc}进程):")
        start = time.time()
        C_parallel = parallel_matrix_multiply(A, B, n_processes=n_proc)
        parallel_time = time.time() - start
        
        speedup = serial_time / parallel_time
        error = np.max(np.abs(C_serial - C_parallel))
        
        print(f"  耗时: {parallel_time:.3f}s")
        print(f"  加速比: {speedup:.2f}x")
        print(f"  最大误差: {error:.2e}")


benchmark_parallel_matrix()

4. MPI并行编程

4.1 MPI基础

MPI(Message Passing Interface) 是分布式内存并行编程的标准。

核心概念

  • Rank:进程标识符(0到n-1)
  • Size:总进程数
  • Communicator:通信域(通常是MPI.COMM_WORLD)

Python中的MPI:使用 mpi4py

# MPI基础示例(需要在命令行运行)
"""
# 保存为 mpi_hello.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

print(f"Hello from rank {rank} of {size}")

# 运行命令:
# mpiexec -n 4 python mpi_hello.py
"""

print("\n" + "=" * 70)
print("MPI基础概念")
print("=" * 70)
print("""
MPI(Message Passing Interface)是分布式内存并行编程的标准。

基本概念:
1. Rank: 进程的唯一标识符 (0, 1, 2, ..., size-1)
2. Size: 通信域中的进程总数
3. Communicator: 定义通信域,通常是 MPI.COMM_WORLD

安装mpi4py:
  pip install mpi4py

运行MPI程序:
  mpiexec -n 4 python script.py
  
或:
  mpirun -np 4 python script.py

MPI编程模式:
  1. SPMD (Single Program Multiple Data)
     - 所有进程运行相同程序
     - 根据rank执行不同代码分支
  
  2. MPMD (Multiple Program Multiple Data)
     - 不同进程运行不同程序
     - 通过MPI通信协调
""")

4.2 点对点通信

# 点对点通信示例
def mpi_point_to_point_demo():
    """
    MPI点对点通信示例
    
    需要在命令行运行:
    mpiexec -n 2 python -c "
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = np.arange(10, dtype='i')
    comm.Send(data, dest=1)
    print(f'Rank {rank}: 发送数据 {data}')
elif rank == 1:
    data = np.empty(10, dtype='i')
    comm.Recv(data, source=0)
    print(f'Rank {rank}: 接收数据 {data}')
"
    """
    pass  # 实际代码需要在MPI环境中运行


print("\n" + "=" * 70)
print("MPI点对点通信")
print("=" * 70)
print("""
点对点通信函数:

发送:
  comm.Send(data, dest, tag=0)      # 阻塞发送
  comm.Isend(data, dest, tag=0)     # 非阻塞发送
  comm.send(obj, dest, tag=0)       # Python对象发送(pickle序列化)

接收:
  comm.Recv(data, source, tag=0)    # 阻塞接收
  comm.Irecv(data, source, tag=0)   # 非阻塞接收
  obj = comm.recv(source, tag=0)    # Python对象接收

示例:
  if rank == 0:
      data = np.array([1, 2, 3])
      comm.Send(data, dest=1)
  elif rank == 1:
      data = np.empty(3)
      comm.Recv(data, source=0)

非阻塞通信:
  req = comm.Isend(data, dest=1)
  # ... 做其他事情 ...
  req.Wait()  # 等待发送完成
  
  req = comm.Irecv(data, source=0)
  # ... 做其他事情 ...
  req.Wait()  # 等待接收完成
""")

4.3 集合通信

print("\n" + "=" * 70)
print("MPI集合通信")
print("=" * 70)
print("""
集合通信操作:

广播 (Broadcast):
  comm.Bcast(data, root=0)          # 根进程广播到所有进程

散射 (Scatter):
  comm.Scatter(send_data, recv_data, root=0)  # 分散数据

收集 (Gather):
  comm.Gather(send_data, recv_data, root=0)   # 收集数据

全收集 (Allgather):
  comm.Allgather(send_data, recv_data)        # 收集到所有进程

归约 (Reduce):
  comm.Reduce(send_data, recv_data, op=MPI.SUM, root=0)  # 求和归约

全归约 (Allreduce):
  comm.Allreduce(send_data, recv_data, op=MPI.SUM)       # 归约到所有进程

全对全 (Alltoall):
  comm.Alltoall(send_data, recv_data)  # 每个进程向所有进程发送不同数据

归约操作:
  MPI.SUM   - 求和
  MPI.PROD  - 乘积
  MPI.MAX   - 最大值
  MPI.MIN   - 最小值
  MPI.MAXLOC - 最大值及位置
  MPI.MINLOC - 最小值及位置
  MPI.LAND  - 逻辑与
  MPI.LOR   - 逻辑或
  MPI.BAND  - 按位与
  MPI.BOR   - 按位或
""")


# MPI集合通信示例代码(需要在MPI环境中运行)
mpi_collective_code = '''
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# 广播示例
if rank == 0:
    data = np.array([100, 200, 300])
else:
    data = np.empty(3, dtype='i')

comm.Bcast(data, root=0)
print(f"Rank {rank}: 广播后 data = {data}")

# 散射示例
if rank == 0:
    send_data = np.arange(size * 4, dtype='i')
else:
    send_data = None

recv_data = np.empty(4, dtype='i')
comm.Scatter(send_data, recv_data, root=0)
print(f"Rank {rank}: 散射接收 data = {recv_data}")

# 归约示例
local_data = np.array([rank + 1])
global_sum = np.empty(1, dtype='i')
comm.Reduce(local_data, global_sum, op=MPI.SUM, root=0)

if rank == 0:
    print(f"Rank {rank}: 归约和 = {global_sum[0]}")
    print(f"      期望和 = {size*(size+1)//2}")

# 全归约示例
local_max = np.array([rank])
global_max = np.empty(1, dtype='i')
comm.Allreduce(local_max, global_max, op=MPI.MAX)
print(f"Rank {rank}: 全归约最大值 = {global_max[0]}")
'''

print("\n集合通信示例代码:")
print(mpi_collective_code)

4.4 MPI并行有限元

def mpi_fem_solver_demo():
    """
    MPI并行有限元求解器框架
    
    这是一个概念性示例,展示如何用MPI并行化有限元计算
    """
    print("\n" + "=" * 70)
    print("MPI并行有限元求解器框架")
    print("=" * 70)
    
    code = '''
from mpi4py import MPI
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve, cg

class ParallelFEMSolver:
    """MPI并行有限元求解器"""
    
    def __init__(self):
        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size()
        
        # 本地数据
        self.local_nodes = None
        self.local_elements = None
        self.local_K = None
        self.local_F = None
    
    def domain_decomposition(self, n_elements):
        """域分解"""
        # 简单的一维分解
        base = n_elements // self.size
        remainder = n_elements % self.size
        
        if self.rank < remainder:
            n_local = base + 1
            start = self.rank * n_local
        else:
            n_local = base
            start = remainder * (base + 1) + (self.rank - remainder) * base
        
        end = start + n_local
        return start, end
    
    def assemble_local(self, elements, nodes, material_props):
        """组装本地刚度矩阵"""
        n_local_nodes = len(nodes)
        n_dof = n_local_nodes * 3  # 3自由度/节点
        
        # 简化的组装过程
        K_local = np.zeros((n_dof, n_dof))
        F_local = np.zeros(n_dof)
        
        for elem_idx, elem_nodes in enumerate(elements):
            # 计算单元刚度(简化)
            ke = self.compute_element_stiffness(
                nodes[elem_nodes], material_props
            )
            
            # 组装到本地矩阵
            for i in range(len(elem_nodes)):
                for j in range(len(elem_nodes)):
                    global_i = elem_nodes[i] * 3
                    global_j = elem_nodes[j] * 3
                    K_local[global_i:global_i+3, global_j:global_j+3] += ke[i*3:(i+1)*3, j*3:(j+1)*3]
        
        return K_local, F_local
    
    def compute_element_stiffness(self, elem_coords, material_props):
        """计算单元刚度矩阵(简化)"""
        E, nu = material_props
        # 简化为单位矩阵乘以弹性模量
        n_nodes = len(elem_coords)
        return np.eye(n_nodes * 3) * E * 0.01
    
    def parallel_cg_solve(self, K, F, max_iter=1000, tol=1e-6):
        """并行共轭梯度求解"""
        n = len(F)
        x = np.zeros(n)
        r = F.copy()
        p = r.copy()
        rs_old = np.dot(r, r)
        
        for iteration in range(max_iter):
            # 并行矩阵-向量乘法
            Ap = self.parallel_spmv(K, p)
            
            alpha = rs_old / np.dot(p, Ap)
            x += alpha * p
            r -= alpha * Ap
            
            rs_new = np.dot(r, r)
            
            # 检查收敛(所有进程)
            global_residual = np.empty(1)
            self.comm.Allreduce(np.array([rs_new]), global_residual, op=MPI.SUM)
            
            if self.rank == 0:
                residual = np.sqrt(global_residual[0])
                if residual < tol:
                    print(f"CG收敛于迭代 {iteration}, 残差={residual:.2e}")
            
            if np.sqrt(global_residual[0]) < tol:
                break
            
            p = r + (rs_new / rs_old) * p
            rs_old = rs_new
        
        return x
    
    def parallel_spmv(self, K, x):
        """并行稀疏矩阵-向量乘法"""
        # 本地计算
        y_local = K @ x
        
        # 边界数据交换(简化)
        # 实际实现需要处理边界节点
        
        return y_local
    
    def solve(self, global_nodes, global_elements, material_props, bc_nodes):
        """并行求解"""
        # 1. 域分解
        start_elem, end_elem = self.domain_decomposition(len(global_elements))
        local_elements = global_elements[start_elem:end_elem]
        
        # 2. 确定本地节点
        local_node_set = set()
        for elem in local_elements:
            local_node_set.update(elem)
        local_nodes = sorted(list(local_node_set))
        local_node_map = {n: i for i, n in enumerate(local_nodes)}
        
        # 3. 组装本地矩阵
        local_coords = global_nodes[local_nodes]
        remapped_elements = [[local_node_map[n] for n in elem] 
                            for elem in local_elements]
        
        K_local, F_local = self.assemble_local(
            remapped_elements, local_coords, material_props
        )
        
        # 4. 边界条件处理
        # ...
        
        # 5. 并行求解
        u_local = self.parallel_cg_solve(K_local, F_local)
        
        # 6. 收集全局解
        # ...
        
        return u_local

# 使用示例
if __name__ == "__main__":
    solver = ParallelFEMSolver()
    print(f"Rank {solver.rank}/{solver.size} 准备就绪")
    
    # 创建测试网格
    if solver.rank == 0:
        n_nodes = 1000
        n_elements = 800
        nodes = np.random.randn(n_nodes, 3)
        elements = np.random.randint(0, n_nodes, size=(n_elements, 8))
        material_props = [200e9, 0.3]
        bc_nodes = [0, 1, 2]
    else:
        nodes = None
        elements = None
        material_props = None
        bc_nodes = None
    
    # 广播网格数据
    nodes = solver.comm.bcast(nodes, root=0)
    elements = solver.comm.bcast(elements, root=0)
    material_props = solver.comm.bcast(material_props, root=0)
    bc_nodes = solver.comm.bcast(bc_nodes, root=0)
    
    # 求解
    solution = solver.solve(nodes, elements, material_props, bc_nodes)
    print(f"Rank {solver.rank}: 求解完成,解向量长度={len(solution)}")
'''
    
    print(code)
    print("\n注: 以上代码需要在MPI环境中运行 (mpiexec -n 4 python script.py)")


mpi_fem_solver_demo()

5. OpenMP并行化

5.1 OpenMP基础

OpenMP 是共享内存并行编程的API,使用编译器指令(pragmas)。

Python中的OpenMP:通过Cython或Numba使用

print("\n" + "=" * 70)
print("OpenMP基础")
print("=" * 70)
print("""
OpenMP (Open Multi-Processing) 是共享内存并行编程标准。

基本指令格式:
  #pragma omp directive [clause[, clause]...]

常用指令:
  parallel     - 创建并行区域
  for          - 并行化for循环
  sections     - 并行执行不同代码段
  single       - 单线程执行
  critical     - 临界区(互斥访问)
  barrier      - 同步屏障
  atomic       - 原子操作
  reduction    - 归约操作

常用子句:
  private(var)      - 私有变量
  shared(var)       - 共享变量
  reduction(op:var) - 归约变量
  schedule(type)    - 调度策略 (static, dynamic, guided)
  num_threads(n)    - 线程数
  default(none)     - 默认无共享

在Python中使用OpenMP:
  1. Numba @jit(parallel=True)
  2. Cython with prange
  3. 调用C/C++ OpenMP代码

示例 (C++):
  #pragma omp parallel for reduction(+:sum) num_threads(4)
  for (int i = 0; i < n; i++) {
      sum += data[i];
  }
""")

5.2 Numba并行化

from numba import jit, prange, njit
import numpy as np
import time

@njit(parallel=True, fastmath=True)
def parallel_sum_numba(arr):
    """
    使用Numba并行求和
    
    prange: 并行range
    """
    total = 0.0
    for i in prange(len(arr)):
        total += arr[i]
    return total


@njit(parallel=True, fastmath=True)
def parallel_matrix_multiply_numba(A, B):
    """
    并行矩阵乘法
    """
    m, n = A.shape
    n2, p = B.shape
    C = np.zeros((m, p))
    
    for i in prange(m):
        for j in range(p):
            s = 0.0
            for k in range(n):
                s += A[i, k] * B[k, j]
            C[i, j] = s
    
    return C


@njit(parallel=True)
def parallel_element_wise_operation(x, y):
    """
    并行元素级操作
    """
    n = len(x)
    result = np.empty(n)
    for i in prange(n):
        result[i] = np.sin(x[i]) * np.cos(y[i]) + x[i]**2 / (y[i] + 1.0)
    return result


@njit(parallel=True, fastmath=True)
def parallel_array_reduction(arr):
    """
    并行数组归约操作
    """
    n = len(arr)
    sum_val = 0.0
    sum_sq = 0.0
    max_val = arr[0]
    min_val = arr[0]
    
    for i in prange(n):
        val = arr[i]
        sum_val += val
        sum_sq += val * val
        if val > max_val:
            max_val = val
        if val < min_val:
            min_val = val
    
    return sum_val, sum_sq, max_val, min_val


def benchmark_numba_parallel():
    """
    Numba并行性能测试
    """
    print("\n" + "=" * 70)
    print("Numba并行性能测试")
    print("=" * 70)
    
    # 预热编译
    warmup_arr = np.random.randn(100)
    parallel_sum_numba(warmup_arr)
    
    # 测试1: 数组求和
    print("\n测试1: 大数组求和")
    n = 10_000_000
    arr = np.random.randn(n)
    
    # NumPy
    start = time.time()
    result_np = np.sum(arr)
    time_np = time.time() - start
    print(f"  NumPy: {time_np:.4f}s, 结果={result_np:.2f}")
    
    # Numba串行
    @njit(fastmath=True)
    def serial_sum(arr):
        total = 0.0
        for i in range(len(arr)):
            total += arr[i]
        return total
    
    serial_sum(arr)  # 编译
    start = time.time()
    result_serial = serial_sum(arr)
    time_serial = time.time() - start
    print(f"  Numba串行: {time_serial:.4f}s, 结果={result_serial:.2f}")
    
    # Numba并行
    parallel_sum_numba(arr)  # 编译
    start = time.time()
    result_parallel = parallel_sum_numba(arr)
    time_parallel = time.time() - start
    print(f"  Numba并行: {time_parallel:.4f}s, 结果={result_parallel:.2f}")
    
    if time_parallel > 0:
        speedup = time_serial / time_parallel
        print(f"  加速比: {speedup:.2f}x")
    
    # 测试2: 矩阵乘法
    print("\n测试2: 矩阵乘法 (1000×1000)")
    n = 1000
    A = np.random.randn(n, n)
    B = np.random.randn(n, n)
    
    # NumPy
    start = time.time()
    C_np = A @ B
    time_np = time.time() - start
    print(f"  NumPy BLAS: {time_np:.4f}s")
    
    # Numba并行
    parallel_matrix_multiply_numba(A[:100, :100], B[:100, :100])  # 编译
    start = time.time()
    C_numba = parallel_matrix_multiply_numba(A, B)
    time_numba = time.time() - start
    print(f"  Numba并行: {time_numba:.4f}s")
    
    error = np.max(np.abs(C_np - C_numba))
    print(f"  最大误差: {error:.2e}")
    
    # 测试3: 数组归约
    print("\n测试3: 数组归约操作")
    arr = np.random.randn(5_000_000)
    
    start = time.time()
    sum_val, sum_sq, max_val, min_val = parallel_array_reduction(arr)
    elapsed = time.time() - start
    
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  求和: {sum_val:.2f}")
    print(f"  平方和: {sum_sq:.2f}")
    print(f"  最大值: {max_val:.4f}")
    print(f"  最小值: {min_val:.4f}")


benchmark_numba_parallel()

5.3 并行循环优化

@njit(parallel=True, cache=True)
def parallel_fem_element_loop(nodes, elements, material_props):
    """
    并行有限元单元循环示例
    
    参数:
        nodes: 节点坐标 (n_nodes, 3)
        elements: 单元连接 (n_elements, n_nodes_per_element)
        material_props: 材料属性
    
    返回:
        单元应变能数组
    """
    n_elements = len(elements)
    strain_energy = np.zeros(n_elements)
    
    for e in prange(n_elements):
        # 获取单元节点
        elem_nodes = elements[e]
        
        # 计算单元刚度矩阵(简化)
        # 实际实现需要形函数、雅可比等
        
        # 计算单元应变能(简化示例)
        for i in range(len(elem_nodes)):
            node_idx = elem_nodes[i]
            for j in range(3):  # x, y, z
                strain_energy[e] += nodes[node_idx, j] ** 2
    
    return strain_energy


@njit(parallel=True)
def parallel_stress_computation(stress_array, strain_array, E, nu):
    """
    并行应力计算
    
    参数:
        stress_array: 应力数组 (n_points, 6)
        strain_array: 应变数组 (n_points, 6)
        E: 弹性模量
        nu: 泊松比
    """
    n_points = len(stress_array)
    
    # 弹性矩阵系数(各向同性材料)
    lambda_ = E * nu / ((1 + nu) * (1 - 2 * nu))
    mu = E / (2 * (1 + nu))
    
    for i in prange(n_points):
        # 体积应变
        eps_vol = strain_array[i, 0] + strain_array[i, 1] + strain_array[i, 2]
        
        # 计算应力(广义胡克定律)
        for j in range(3):  # 正应力
            stress_array[i, j] = lambda_ * eps_vol + 2 * mu * strain_array[i, j]
        
        for j in range(3, 6):  # 剪应力
            stress_array[i, j] = 2 * mu * strain_array[i, j]


@njit(parallel=True)
def parallel_gauss_integration(nodes, weights, n_gauss, func_values):
    """
    并行高斯积分
    
    参数:
        nodes: 积分点坐标
        weights: 积分权重
        n_gauss: 每个方向的积分点数
        func_values: 函数在积分点的值
    
    返回:
        积分结果
    """
    n_elements = len(func_values)
    integral = np.zeros(n_elements)
    
    for e in prange(n_elements):
        result = 0.0
        for i in range(n_gauss):
            for j in range(n_gauss):
                for k in range(n_gauss):
                    idx = i * n_gauss * n_gauss + j * n_gauss + k
                    weight = weights[i] * weights[j] * weights[k]
                    result += weight * func_values[e, idx]
        integral[e] = result
    
    return integral


def benchmark_fem_operations():
    """
    FEM操作并行性能测试
    """
    print("\n" + "=" * 70)
    print("FEM操作并行性能测试")
    print("=" * 70)
    
    # 测试1: 单元应变能计算
    print("\n测试1: 单元应变能计算")
    n_nodes = 10000
    n_elements = 5000
    nodes_per_element = 8  # 六面体单元
    
    nodes = np.random.randn(n_nodes, 3)
    elements = np.random.randint(0, n_nodes, size=(n_elements, nodes_per_element))
    material_props = np.array([200e9, 0.3])  # E, nu
    
    # 预热
    parallel_fem_element_loop(nodes, elements[:100], material_props)
    
    start = time.time()
    strain_energy = parallel_fem_element_loop(nodes, elements, material_props)
    elapsed = time.time() - start
    print(f"  单元数: {n_elements}")
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  总应变能: {np.sum(strain_energy):.2f}")
    
    # 测试2: 应力计算
    print("\n测试2: 并行应力计算")
    n_points = 100000
    strain = np.random.randn(n_points, 6) * 1e-4
    stress = np.zeros((n_points, 6))
    
    E = 200e9  # 钢
    nu = 0.3
    
    start = time.time()
    parallel_stress_computation(stress, strain, E, nu)
    elapsed = time.time() - start
    print(f"  计算点数: {n_points}")
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  von Mises应力范围: [{np.min(stress):.2e}, {np.max(stress):.2e}]")
    
    # 测试3: 高斯积分
    print("\n测试3: 并行高斯积分")
    n_gauss = 3
    n_elements = 10000
    func_values = np.random.randn(n_elements, n_gauss**3)
    nodes_1d = np.array([-0.7745966692, 0.0, 0.7745966692])
    weights_1d = np.array([0.5555555556, 0.8888888889, 0.5555555556])
    
    start = time.time()
    integral = parallel_gauss_integration(nodes_1d, weights_1d, n_gauss, func_values)
    elapsed = time.time() - start
    print(f"  单元数: {n_elements}")
    print(f"  积分点数/单元: {n_gauss**3}")
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  积分结果范围: [{np.min(integral):.4f}, {np.max(integral):.4f}]")


benchmark_fem_operations()

6. GPU加速计算

6.1 GPU架构与CUDA

GPU vs CPU

特性 CPU GPU
核心数 4-64 数千
时钟频率 高 (3-5 GHz) 较低 (1-2 GHz)
内存带宽 50-200 GB/s 500-2000 GB/s
适合任务 串行、复杂控制 数据并行、计算密集
延迟隐藏 缓存、分支预测 线程切换

Python GPU计算选项

  1. CuPy:NumPy兼容的GPU数组库
  2. Numba CUDA:Python CUDA内核
  3. PyTorch/TensorFlow:深度学习框架的GPU支持
  4. PyCUDA:直接CUDA编程
print("\n" + "=" * 70)
print("GPU加速计算基础")
print("=" * 70)
print("""
GPU计算库选择:

1. CuPy (推荐用于数值计算)
   - NumPy兼容API
   - 自动内存管理
   - 安装: pip install cupy-cuda11x

2. Numba CUDA
   - Python编写CUDA内核
   - 细粒度控制
   - 安装: pip install numba

3. PyTorch
   - 深度学习优化
   - 自动求导
   - 安装: pip install torch

4. PyCUDA
   - 直接CUDA编程
   - 最灵活但复杂
   - 安装: pip install pycuda

GPU编程模型:
  - Grid: 线程块网格
  - Block: 线程块
  - Thread: 执行线程
  
  Grid (多个Blocks)
  ┌─────────────────────────────────────┐
  │ Block(0,0) │ Block(0,1) │ Block(0,2)│
  │ ┌────────┐ │ ┌────────┐ │ ┌────────┐│
  │ │ Thread │ │ │ Thread │ │ │ Thread ││
  │ │  0..N  │ │ │  0..N  │ │ │  0..N  ││
  │ └────────┘ │ └────────┘ │ └────────┘│
  └─────────────────────────────────────┘
  
  线程索引计算:
    global_id = blockIdx.x * blockDim.x + threadIdx.x
    
  内存层次:
    - 全局内存 (Global Memory): 大容量,高延迟
    - 共享内存 (Shared Memory): 块内共享,低延迟
    - 寄存器 (Registers): 每个线程私有,最快
    - 常量/纹理内存: 只读,缓存优化
""")

6.2 CuPy基础

try:
    import cupy as cp
    CUPY_AVAILABLE = True
except ImportError:
    CUPY_AVAILABLE = False
    print("\nCuPy未安装,跳过GPU示例")
    print("安装命令: pip install cupy-cuda11x")


if CUPY_AVAILABLE:
    def cupy_basic_demo():
        """
        CuPy基础操作示例
        """
        print("\n" + "=" * 70)
        print("CuPy基础操作")
        print("=" * 70)
        
        # 检查GPU
        print(f"\nGPU设备: {cp.cuda.Device(0).use()}")
        print(f"GPU数量: {cp.cuda.runtime.getDeviceCount()}")
        
        # 创建GPU数组
        print("\n创建GPU数组:")
        x_gpu = cp.array([1, 2, 3, 4, 5])
        print(f"  GPU数组: {x_gpu}")
        print(f"  设备: {x_gpu.device}")
        
        # 从NumPy创建
        x_cpu = np.random.randn(1000, 1000)
        x_gpu = cp.asarray(x_cpu)
        print(f"\n  从NumPy转换: shape={x_gpu.shape}")
        
        # GPU计算
        print("\nGPU计算:")
        y_gpu = cp.sin(x_gpu) + cp.cos(x_gpu)
        z_gpu = x_gpu @ x_gpu.T
        print(f"  元素级运算完成")
        print(f"  矩阵乘法结果: shape={z_gpu.shape}")
        
        # 转回CPU
        z_cpu = cp.asnumpy(z_gpu)
        print(f"\n  转回CPU: type={type(z_cpu)}")
    
    
    def cupy_performance_benchmark():
        """
        CuPy性能基准测试
        """
        print("\n" + "=" * 70)
        print("CuPy性能基准测试")
        print("=" * 70)
        
        sizes = [1000, 2000, 5000]
        
        for n in sizes:
            print(f"\n矩阵大小: {n}×{n}")
            
            # CPU数据
            A_cpu = np.random.randn(n, n).astype('float32')
            B_cpu = np.random.randn(n, n).astype('float32')
            
            # GPU数据
            A_gpu = cp.asarray(A_cpu)
            B_gpu = cp.asarray(B_cpu)
            
            # CPU计算
            start = time.time()
            C_cpu = A_cpu @ B_cpu
            cpu_time = time.time() - start
            print(f"  CPU: {cpu_time:.4f}s")
            
            # GPU计算(包含数据传输)
            start = time.time()
            C_gpu = A_gpu @ B_gpu
            cp.cuda.Device(0).synchronize()
            gpu_time_with_transfer = time.time() - start
            print(f"  GPU(含传输): {gpu_time_with_transfer:.4f}s")
            
            # 纯GPU计算(不含传输)
            C_gpu = A_gpu @ B_gpu
            cp.cuda.Device(0).synchronize()
            
            start = time.time()
            C_gpu = A_gpu @ B_gpu
            cp.cuda.Device(0).synchronize()
            gpu_time_pure = time.time() - start
            print(f"  GPU(纯计算): {gpu_time_pure:.4f}s")
            
            if gpu_time_pure > 0:
                speedup = cpu_time / gpu_time_pure
                print(f"  加速比: {speedup:.1f}x")
    
    
    cupy_basic_demo()
    cupy_performance_benchmark()

else:
    # 模拟CuPy示例
    print("\n" + "=" * 70)
    print("CuPy示例代码 (需要GPU运行)")
    print("=" * 70)
    print("""
import cupy as cp
import numpy as np

# 创建GPU数组
x_gpu = cp.array([1, 2, 3, 4, 5])

# NumPy数组转GPU
x_cpu = np.random.randn(1000, 1000)
x_gpu = cp.asarray(x_cpu)

# GPU计算
y_gpu = cp.sin(x_gpu) + cp.cos(x_gpu)
z_gpu = x_gpu @ x_gpu.T  # 矩阵乘法

# 转回CPU
z_cpu = cp.asnumpy(z_gpu)

# 性能对比
# GPU通常比CPU快10-100倍(大规模矩阵运算)
""")


# 6.3 Numba CUDA编程
from numba import cuda

# 检查CUDA可用性
try:
    cuda_available = cuda.is_available()
except:
    cuda_available = False

if cuda_available:
    print("\n" + "=" * 70)
    print("Numba CUDA编程")
    print("=" * 70)
    
    # CUDA内核示例
    @cuda.jit
    def vector_add_kernel(result, a, b):
        """
        向量加法CUDA内核
        
        参数:
            result: 结果数组
            a, b: 输入数组
        """
        # 获取线程索引
        i = cuda.grid(1)
        
        # 边界检查
        if i < result.size:
            result[i] = a[i] + b[i]
    
    
    @cuda.jit
    def matrix_multiply_kernel(C, A, B):
        """
        矩阵乘法CUDA内核(简化版)
        """
        row, col = cuda.grid(2)
        
        if row < C.shape[0] and col < C.shape[1]:
            tmp = 0.0
            for k in range(A.shape[1]):
                tmp += A[row, k] * B[k, col]
            C[row, col] = tmp
    
    
    def run_cuda_vector_add():
        """运行CUDA向量加法"""
        print("\nCUDA向量加法:")
        
        # 数据
        n = 1000000
        a = np.random.randn(n).astype(np.float32)
        b = np.random.randn(n).astype(np.float32)
        result = np.empty_like(a)
        
        # 传输到GPU
        d_a = cuda.to_device(a)
        d_b = cuda.to_device(b)
        d_result = cuda.to_device(result)
        
        # 配置线程
        threads_per_block = 256
        blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
        
        # 启动内核
        start = time.time()
        vector_add_kernel[blocks_per_grid, threads_per_block](d_result, d_a, d_b)
        cuda.synchronize()
        gpu_time = time.time() - start
        
        # 复制结果回CPU
        result = d_result.copy_to_host()
        
        # 验证
        expected = a + b
        error = np.max(np.abs(result - expected))
        
        print(f"  数组大小: {n}")
        print(f"  GPU时间: {gpu_time:.4f}s")
        print(f"  最大误差: {error:.2e}")
        
        # 对比CPU
        start = time.time()
        result_cpu = a + b
        cpu_time = time.time() - start
        print(f"  CPU时间: {cpu_time:.4f}s")
        print(f"  加速比: {cpu_time/gpu_time:.1f}x")
    
    
    run_cuda_vector_add()

else:
    print("\n" + "=" * 70)
    print("Numba CUDA示例代码 (需要NVIDIA GPU)")
    print("=" * 70)
    print("""
from numba import cuda
import numpy as np

@cuda.jit
def vector_add_kernel(result, a, b):
    i = cuda.grid(1)
    if i < result.size:
        result[i] = a[i] + b[i]

# 数据准备
n = 1000000
a = np.random.randn(n).astype(np.float32)
b = np.random.randn(n).astype(np.float32)

# 传输到GPU
d_a = cuda.to_device(a)
d_b = cuda.to_device(b)
d_result = cuda.device_array_like(a)

# 启动内核
threads_per_block = 256
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
vector_add_kernel[blocks_per_grid, threads_per_block](d_result, d_a, d_b)

# 获取结果
result = d_result.copy_to_host()
""")


---

## 7. 有限元并行求解

### 7.1 有限元并行化策略

**并行化层次**1. **单元级并行**
   - 并行计算单元刚度矩阵
   - 无通信开销
   - 适合:OpenMP、GPU

2. **方程级并行**
   - 并行求解线性方程组
   - 需要通信
   - 适合:MPI、混合并行

3. **域分解并行**
   - 将网格分解到不同处理器
   - 需要边界通信
   - 适合:MPI

```python
print("\n" + "=" * 70)
print("有限元并行化策略")
print("=" * 70)
print("""
有限元并行化层次:

1. 单元级并行 (Element-level Parallelism)
   ┌─────────────────────────────────────┐
   │  for e in elements (并行)            │
   │    Ke = compute_element_stiffness(e) │
   │    assemble(K, Ke)                   │
   └─────────────────────────────────────┘
   
   特点:
   - 计算密集,通信少
   - 适合OpenMP/GPU
   - 注意组装时的竞争条件

2. 方程级并行 (Equation-level Parallelism)
   ┌─────────────────────────────────────┐
   │  K * u = F                          │
   │  ↓                                  │
   │  并行求解器 (CG, GMRES等)            │
   └─────────────────────────────────────┘
   
   特点:
   - 需要矩阵-向量乘并行
   - 需要全局通信(归约)
   - 适合MPI

3. 域分解并行 (Domain Decomposition)
   ┌─────────┐  ┌─────────┐  ┌─────────┐
   │ 子域0   │  │ 子域1   │  │ 子域2   │
   │ ┌─────┐ │  │ ┌─────┐ │  │ ┌─────┐ │
   │ │内部 │ │←→│ │内部 │ │←→│ │内部 │ │
   │ │+边界│ │  │ │+边界│ │  │ │+边界│ │
   │ └─────┘ │  │ └─────┘ │  │ └─────┘ │
   └─────────┘  └─────────┘  └─────────┘
      ↑              ↑              ↑
   本地求解      边界通信        本地求解
   
   特点:
   - 扩展性好
   - 需要迭代求解
   - 适合大规模问题
""")

7.2 并行单元组装

from numba import njit, prange

@njit(parallel=True)
def parallel_element_assembly(nodes, elements, material_props, n_nodes):
    """
    并行单元刚度矩阵组装
    
    参数:
        nodes: 节点坐标 (n_nodes, 3)
        elements: 单元连接 (n_elements, n_nodes_per_element)
        material_props: 材料属性 [E, nu]
        n_nodes: 总节点数
    
    返回:
        element_data: 单元刚度矩阵数据
    """
    n_elements = len(elements)
    nodes_per_element = elements.shape[1]
    dof_per_node = 3
    dof_per_element = nodes_per_element * dof_per_node
    
    # 存储单元刚度矩阵
    element_data = np.zeros((n_elements, dof_per_element, dof_per_element))
    
    for e in prange(n_elements):
        # 获取单元节点坐标
        elem_nodes = elements[e]
        coords = np.zeros((nodes_per_element, 3))
        for i in range(nodes_per_element):
            for j in range(3):
                coords[i, j] = nodes[elem_nodes[i], j]
        
        # 计算单元刚度矩阵(简化:使用单位矩阵)
        E = material_props[0]
        for i in range(dof_per_element):
            element_data[e, i, i] = E * 0.1
    
    return element_data


@njit(parallel=True)
def parallel_stress_recovery(nodes, elements, displacements, E, nu):
    """
    并行应力恢复
    
    参数:
        nodes: 节点坐标
        elements: 单元连接
        displacements: 节点位移
        E, nu: 材料参数
    
    返回:
        element_stresses: 单元应力
    """
    n_elements = len(elements)
    element_stresses = np.zeros((n_elements, 6))  # 6个应力分量
    
    for e in prange(n_elements):
        elem_nodes = elements[e]
        
        # 获取单元位移
        elem_disp = np.zeros(24)  # 8节点×3自由度
        for i in range(8):
            node_idx = elem_nodes[i]
            for j in range(3):
                elem_disp[i*3 + j] = displacements[node_idx*3 + j]
        
        # 计算应变(简化)
        strain = np.zeros(6)
        for i in range(6):
            strain[i] = elem_disp[i] * 0.01
        
        # 计算应力(广义胡克定律)
        lambda_ = E * nu / ((1 + nu) * (1 - 2 * nu))
        mu = E / (2 * (1 + nu))
        
        eps_vol = strain[0] + strain[1] + strain[2]
        
        for i in range(3):
            element_stresses[e, i] = lambda_ * eps_vol + 2 * mu * strain[i]
        for i in range(3, 6):
            element_stresses[e, i] = 2 * mu * strain[i]
    
    return element_stresses


def benchmark_parallel_fem():
    """
    并行FEM操作基准测试
    """
    print("\n" + "=" * 70)
    print("并行FEM操作基准测试")
    print("=" * 70)
    
    # 生成测试网格
    n_nodes = 10000
    n_elements = 8000
    nodes_per_element = 8
    
    nodes = np.random.randn(n_nodes, 3)
    elements = np.random.randint(0, n_nodes, size=(n_elements, nodes_per_element))
    material_props = np.array([200e9, 0.3])  # 钢
    
    print(f"\n网格规模:")
    print(f"  节点数: {n_nodes}")
    print(f"  单元数: {n_elements}")
    
    # 预热
    parallel_element_assembly(nodes[:100], elements[:100], material_props, n_nodes)
    
    # 并行单元组装
    print("\n并行单元刚度矩阵组装:")
    start = time.time()
    element_data = parallel_element_assembly(nodes, elements, material_props, n_nodes)
    elapsed = time.time() - start
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  单元刚度矩阵数: {len(element_data)}")
    
    # 并行应力恢复
    print("\n并行应力恢复:")
    displacements = np.random.randn(n_nodes * 3) * 1e-3
    
    start = time.time()
    stresses = parallel_stress_recovery(nodes, elements, displacements, 
                                        material_props[0], material_props[1])
    elapsed = time.time() - start
    print(f"  计算时间: {elapsed:.4f}s")
    print(f"  应力计算点数: {len(stresses)}")
    
    # von Mises应力统计
    von_mises = np.sqrt(stresses[:, 0]**2 + stresses[:, 1]**2 + stresses[:, 2]**2
                        - stresses[:, 0]*stresses[:, 1] 
                        - stresses[:, 1]*stresses[:, 2] 
                        - stresses[:, 2]*stresses[:, 0]
                        + 3*(stresses[:, 3]**2 + stresses[:, 4]**2 + stresses[:, 5]**2))
    print(f"\n  von Mises应力统计:")
    print(f"    最小值: {np.min(von_mises):.2e} Pa")
    print(f"    最大值: {np.max(von_mises):.2e} Pa")
    print(f"    平均值: {np.mean(von_mises):.2e} Pa")


benchmark_parallel_fem()

7.3 并行线性求解器

from scipy.sparse import csr_matrix
from scipy.sparse.linalg import cg, LinearOperator

@njit(parallel=True)
def parallel_spmv(A_data, A_indices, A_indptr, x, n_rows):
    """
    并行稀疏矩阵-向量乘法
    
    参数:
        A_data, A_indices, A_indptr: CSR格式矩阵数据
        x: 向量
        n_rows: 行数
    
    返回:
        y = A @ x
    """
    y = np.zeros(n_rows)
    
    for i in prange(n_rows):
        row_start = A_indptr[i]
        row_end = A_indptr[i + 1]
        
        s = 0.0
        for j in range(row_start, row_end):
            col = A_indices[j]
            s += A_data[j] * x[col]
        
        y[i] = s
    
    return y


def create_parallel_linear_operator(A):
    """
    创建并行线性算子用于迭代求解
    
    参数:
        A: 稀疏矩阵 (CSR格式)
    
    返回:
        LinearOperator
    """
    n = A.shape[0]
    A_data = A.data
    A_indices = A.indices
    A_indptr = A.indptr
    
    def matvec(x):
        return parallel_spmv(A_data, A_indices, A_indptr, x, n)
    
    return LinearOperator((n, n), matvec=matvec)


def benchmark_parallel_solver():
    """
    并行求解器基准测试
    """
    print("\n" + "=" * 70)
    print("并行线性求解器基准测试")
    print("=" * 70)
    
    # 创建测试矩阵(有限元风格)
    n = 5000
    print(f"\n矩阵规模: {n}×{n}")
    
    # 创建带状稀疏矩阵
    diagonals = [np.ones(n), np.ones(n-1), np.ones(n-1), 
                 np.ones(n-2), np.ones(n-2)]
    offsets = [0, 1, -1, 2, -2]
    
    from scipy.sparse import diags
    A = diags(diagonals, offsets, format='csr')
    A = A + 4 * csr_matrix(np.eye(n))  # 使矩阵正定
    
    # 右端项
    b = np.random.randn(n)
    
    # 标准CG求解
    print("\n标准CG求解:")
    start = time.time()
    x_serial, info = cg(A, b, tol=1e-6, maxiter=1000)
    serial_time = time.time() - start
    print(f"  求解时间: {serial_time:.4f}s")
    print(f"  迭代次数: info={info}")
    print(f"  残差: {np.linalg.norm(A @ x_serial - b):.2e}")
    
    # 使用并行SpMV的CG求解
    print("\n并行CG求解:")
    A_parallel = create_parallel_linear_operator(A)
    
    start = time.time()
    x_parallel, info = cg(A_parallel, b, tol=1e-6, maxiter=1000)
    parallel_time = time.time() - start
    print(f"  求解时间: {parallel_time:.4f}s")
    print(f"  迭代次数: info={info}")
    print(f"  残差: {np.linalg.norm(A @ x_parallel - b):.2e}")
    print(f"  加速比: {serial_time/parallel_time:.2f}x")


benchmark_parallel_solver()

8. 大规模仿真案例

8.1 案例1:并行模态分析

def parallel_modal_analysis_case():
    """
    并行模态分析案例
    
    计算大型结构的固有频率和振型
    """
    print("\n" + "=" * 70)
    print("案例1: 并行模态分析")
    print("=" * 70)
    
    print("""
    问题描述:
    - 飞机机翼结构模态分析
    - 网格规模: 100万节点,80万单元
    - 求解前50阶模态
    
    并行策略:
    1. 单元级并行: 并行组装质量矩阵和刚度矩阵
    2. 方程级并行: 并行求解广义特征值问题
    3. 使用子空间迭代法或Lanczos算法
    
    性能指标:
    - 串行时间: ~2小时
    - 并行时间(64核): ~3分钟
    - 加速比: ~40x
    """)
    
    # 模拟模态分析
    n_modes = 50
    n_dof = 1000000
    
    print(f"\n模拟模态分析:")
    print(f"  自由度: {n_dof:,}")
    print(f"  求解模态数: {n_modes}")
    
    # 模拟特征值(固有频率)
    frequencies = np.linspace(1, 500, n_modes)  # Hz
    
    print(f"\n  前10阶固有频率:")
    for i in range(10):
        print(f"    模态{i+1}: {frequencies[i]:.2f} Hz")
    
    print(f"\n  频率范围: {frequencies[0]:.2f} - {frequencies[-1]:.2f} Hz")


parallel_modal_analysis_case()

8.2 案例2:并行瞬态动力学

def parallel_transient_dynamics_case():
    """
    并行瞬态动力学案例
    
    冲击载荷下的结构响应分析
    """
    print("\n" + "=" * 70)
    print("案例2: 并行瞬态动力学")
    print("=" * 70)
    
    print("""
    问题描述:
    - 汽车碰撞仿真
    - 显式动力学求解
    - 时间步长: 1e-6秒
    - 总时间: 0.1秒
    - 时间步数: 100,000
    
    并行策略:
    1. 单元级并行: 并行计算单元内力
    2. 节点级并行: 并行更新节点速度和位移
    3. 域分解: 将网格分配到多个处理器
    
    性能指标:
    - 串行时间: ~10天
    - 并行时间(256核集群): ~1小时
    - 加速比: ~240x
    """)
    
    # 模拟瞬态分析
    n_steps = 100000
    dt = 1e-6
    total_time = n_steps * dt
    
    print(f"\n模拟瞬态分析:")
    print(f"  时间步数: {n_steps:,}")
    print(f"  时间步长: {dt:.2e} s")
    print(f"  总时间: {total_time:.2f} s")
    
    # 模拟位移响应
    time_points = np.linspace(0, total_time, 1000)
    displacement = np.sin(2 * np.pi * 10 * time_points) * np.exp(-time_points * 5)
    
    print(f"\n  最大位移: {np.max(np.abs(displacement)):.4f} m")
    print(f"  响应频率: ~10 Hz")


parallel_transient_dynamics_case()

8.3 案例3:多物理场耦合

def multiphysics_coupling_case():
    """
    多物理场耦合案例
    
    热-结构耦合分析
    """
    print("\n" + "=" * 70)
    print("案例3: 热-结构耦合分析")
    print("=" * 70)
    
    print("""
    问题描述:
    - 涡轮叶片热-结构耦合分析
    - 温度场影响材料属性
    - 热应力计算
    
    并行策略:
    1. 任务并行: 温度场和结构场同时求解
    2. 数据并行: 每个场内部并行化
    3. 流水线并行: 时间步进中重叠计算
    
    性能指标:
    - 串行时间: ~8小时
    - 并行时间(32核): ~20分钟
    - 加速比: ~24x
    """)
    
    # 模拟热-结构耦合
    n_nodes = 50000
    time_steps = 100
    
    print(f"\n模拟热-结构耦合:")
    print(f"  节点数: {n_nodes:,}")
    print(f"  时间步数: {time_steps}")
    
    # 模拟温度分布
    temperature = np.linspace(300, 1200, n_nodes)  # K
    thermal_stress = (temperature - 300) * 2e5 * 1e-6  # Pa
    
    print(f"\n  温度范围: {np.min(temperature):.0f} - {np.max(temperature):.0f} K")
    print(f"  热应力范围: {np.min(thermal_stress):.2e} - {np.max(thermal_stress):.2e} Pa")
    print(f"  最大von Mises应力: {np.max(thermal_stress) * 1.5:.2e} Pa")


multiphysics_coupling_case()

9. 性能优化策略

9.1 内存优化

def memory_optimization_techniques():
    """
    内存优化技术
    """
    print("\n" + "=" * 70)
    print("内存优化技术")
    print("=" * 70)
    
    print("""
    1. 数据局部性优化
       - 空间局部性: 连续访问内存
       - 时间局部性: 重用缓存数据
       
    2. 内存对齐
       - 确保数据按缓存行对齐
       - 避免伪共享
       
    3. 分块技术 (Tiling/Blocking)
       - 将大数据集分成适合缓存的小块
       - 提高缓存命中率
       
    4. 稀疏矩阵存储优化
       - CSR格式: 适合行访问
       - CSC格式: 适合列访问
       - 块稀疏格式: 适合有限元
    """)
    
    # 示例: 矩阵分块乘法
    def blocked_matrix_multiply(A, B, block_size=64):
        """分块矩阵乘法"""
        m, k = A.shape
        k2, n = B.shape
        C = np.zeros((m, n))
        
        for i0 in range(0, m, block_size):
            i1 = min(i0 + block_size, m)
            for j0 in range(0, n, block_size):
                j1 = min(j0 + block_size, n)
                    for k0 in range(0, k, block_size):
                        k1 = min(k0 + block_size, k)
                        # 块乘法
                        C[i0:i1, j0:j1] += A[i0:i1, k0:k1] @ B[k0:k1, j0:j1]
        
        return C
    
    print("\n分块矩阵乘法示例:")
    n = 1024
    A = np.random.randn(n, n)
    B = np.random.randn(n, n)
    
    # 标准乘法
    start = time.time()
    C_standard = A @ B
    t_standard = time.time() - start
    
    # 分块乘法
    start = time.time()
    C_blocked = blocked_matrix_multiply(A, B, block_size=128)
    t_blocked = time.time() - start
    
    print(f"  标准乘法: {t_standard:.4f}s")
    print(f"  分块乘法: {t_blocked:.4f}s")
    print(f"  误差: {np.max(np.abs(C_standard - C_blocked)):.2e}")


memory_optimization_techniques()

9.2 计算优化

def computation_optimization_techniques():
    """
    计算优化技术
    """
    print("\n" + "=" * 70)
    print("计算优化技术")
    print("=" * 70)
    
    print("""
    1. 向量化计算
       - 使用NumPy/Numba向量化操作
       - 避免Python循环
       
    2. 预计算和查表
       - 预计算形函数值
       - 预计算积分点权重
       
    3. 算法优化
       - 选择合适的求解器
       - 预处理技术
       
    4. 编译优化
       - Numba JIT编译
       - Cython编译
    """)
    
    # 示例: 预计算形函数
    def precompute_shape_functions():
        """预计算有限元形函数"""
        # 高斯积分点
        gauss_points = np.array([
            [-0.577350269189626, -0.577350269189626, -0.577350269189626],
            [0.577350269189626, -0.577350269189626, -0.577350269189626],
            [0.577350269189626, 0.577350269189626, -0.577350269189626],
            [-0.577350269189626, 0.577350269189626, -0.577350269189626],
            [-0.577350269189626, -0.577350269189626, 0.577350269189626],
            [0.577350269189626, -0.577350269189626, 0.577350269189626],
            [0.577350269189626, 0.577350269189626, 0.577350269189626],
            [-0.577350269189626, 0.577350269189626, 0.577350269189626]
        ])
        
        gauss_weights = np.ones(8)  # 2×2×2积分
        
        # 预计算形函数及其导数
        n_gp = len(gauss_points)
        shape_functions = np.zeros((n_gp, 8))
        shape_derivatives = np.zeros((n_gp, 8, 3))
        
        for gp in range(n_gp):
            xi, eta, zeta = gauss_points[gp]
            
            # 8节点六面体形函数
            shape_functions[gp] = np.array([
                (1-xi)*(1-eta)*(1-zeta)/8,
                (1+xi)*(1-eta)*(1-zeta)/8,
                (1+xi)*(1+eta)*(1-zeta)/8,
                (1-xi)*(1+eta)*(1-zeta)/8,
                (1-xi)*(1-eta)*(1+zeta)/8,
                (1+xi)*(1-eta)*(1+zeta)/8,
                (1+xi)*(1+eta)*(1+zeta)/8,
                (1-xi)*(1+eta)*(1+zeta)/8
            ])
        
        print(f"\n预计算形函数:")
        print(f"  积分点数: {n_gp}")
        print(f"  形函数矩阵: {shape_functions.shape}")
        print(f"  内存占用: {shape_functions.nbytes / 1024:.2f} KB")
        
        return shape_functions, gauss_weights
    
    precompute_shape_functions()


computation_optimization_techniques()

9.3 通信优化

def communication_optimization():
    """
    通信优化技术
    """
    print("\n" + "=" * 70)
    print("通信优化技术")
    print("=" * 70)
    
    print("""
    1. 通信聚合
       - 合并小消息为大数据包
       - 减少通信次数
       
    2. 非阻塞通信
       - 计算与通信重叠
       - 隐藏通信延迟
       
    3. 拓扑感知映射
       - 将相邻子域映射到相邻处理器
       - 减少网络跳数
       
    4. 负载均衡
       - 动态负载均衡
       - 自适应网格细化
    """)
    
    # 示例: 通信聚合
    def aggregate_communication_example():
        """通信聚合示例"""
        print("\n通信聚合示例:")
        
        n_messages = 1000
        message_size = 100  # doubles
        
        # 单独发送
        print(f"  单独发送 {n_messages} 条消息:")
        print(f"    总数据量: {n_messages * message_size * 8 / 1024:.2f} KB")
        print(f"    通信次数: {n_messages}")
        
        # 聚合发送
        aggregated_size = n_messages * message_size
        print(f"\n  聚合发送:")
        print(f"    总数据量: {aggregated_size * 8 / 1024:.2f} KB")
        print(f"    通信次数: 1")
        print(f"    减少开销: {(1 - 1/n_messages)*100:.1f}%")
    
    aggregate_communication_example()


communication_optimization()

10. 云计算与分布式仿真

10.1 云计算基础

def cloud_computing_basics():
    """
    云计算基础
    """
    print("\n" + "=" * 70)
    print("云计算与分布式仿真")
    print("=" * 70)
    
    print("""
    云计算服务模式:
    
    1. IaaS (基础设施即服务)
       - 提供虚拟机、存储、网络
       - 示例: AWS EC2, Azure VMs, 阿里云ECS
       
    2. PaaS (平台即服务)
       - 提供开发平台和工具
       - 示例: AWS Elastic Beanstalk, Google App Engine
       
    3. SaaS (软件即服务)
       - 提供完整软件应用
       - 示例: Onshape, SimScale
    
    云计算优势:
    - 弹性扩展: 按需分配资源
    - 成本效益: 按需付费
    - 高可用性: 多区域部署
    - 全球访问: 任何地方提交作业
    """)
    
    # 云服务提供商对比
    print("\n主要云服务提供商:")
    providers = {
        'AWS': {'compute': 'EC2', 'storage': 'S3', 'HPC': 'ParallelCluster'},
        'Azure': {'compute': 'Virtual Machines', 'storage': 'Blob', 'HPC': 'CycleCloud'},
        'Google Cloud': {'compute': 'Compute Engine', 'storage': 'Cloud Storage', 'HPC': 'Cloud HPC'},
        '阿里云': {'compute': 'ECS', 'storage': 'OSS', 'HPC': 'E-HPC'},
    }
    
    for provider, services in providers.items():
        print(f"  {provider}:")
        for service_type, service_name in services.items():
            print(f"    {service_type}: {service_name}")


cloud_computing_basics()

10.2 容器化与微服务

def containerization_microservices():
    """
    容器化与微服务
    """
    print("\n" + "=" * 70)
    print("容器化与微服务")
    print("=" * 70)
    
    print("""
    容器化技术:
    
    1. Docker
       - 轻量级虚拟化
       - 环境一致性
       - 快速部署
       
    2. Kubernetes
       - 容器编排
       - 自动扩展
       - 负载均衡
    
    微服务架构:
    
    仿真微服务示例:
    ┌─────────────────────────────────────────┐
    │           API Gateway                    │
    └─────────────┬───────────────────────────┘
                  │
        ┌─────────┼─────────┐
        ↓         ↓         ↓
    ┌───────┐ ┌───────┐ ┌───────┐
    │前处理  │ │求解器  │ │后处理  │
    │服务    │ │服务    │ │服务    │
    └───────┘ └───────┘ └───────┘
        │         │         │
        └─────────┼─────────┘
                  ↓
           ┌───────────┐
           │  数据库    │
           └───────────┘
    
    优势:
    - 独立部署和扩展
    - 技术栈灵活
    - 故障隔离
    """)
    
    # Dockerfile示例
    dockerfile_example = '''
# Dockerfile for FEM Simulation
FROM python:3.9-slim

# 安装依赖
RUN apt-get update && apt-get install -y \\
    libopenmpi-dev \\
    && rm -rf /var/lib/apt/lists/*

# 安装Python包
RUN pip install numpy scipy numba mpi4py

# 复制代码
WORKDIR /app
COPY . /app

# 运行命令
CMD ["python", "fem_solver.py"]
'''
    
    print("\nDockerfile示例:")
    print(dockerfile_example)


containerization_microservices()

10.3 总结与展望

def summary_and_outlook():
    """
    总结与展望
    """
    print("\n" + "=" * 70)
    print("总结与展望")
    print("=" * 70)
    
    print("""
    本主题要点总结:
    
    1. 并行计算基础
       - 阿姆达尔定律和古斯塔夫森定律
       - 并行化类型: 数据并行、任务并行
       - 性能指标: 加速比、效率、可扩展性
       
    2. Python并行编程
       - multiprocessing: 多进程并行
       - MPI: 分布式内存并行
       - Numba: 共享内存并行
       - GPU: CUDA编程
       
    3. 有限元并行求解
       - 单元级并行
       - 方程级并行
       - 域分解方法
       
    4. 性能优化
       - 内存优化
       - 计算优化
       - 通信优化
    
    未来发展趋势:
    
    1. 异构计算
       - CPU + GPU协同计算
       - 专用AI加速器(TPU, NPU)
       
    2. 云原生仿真
       - 无服务器计算
       - 边缘计算
       
    3. 智能化仿真
       - AI驱动的网格生成
       - 物理信息神经网络(PINN)
       - 数字孪生
       
    4. 量子计算
       - 量子线性求解器
       - 量子机器学习
    """)
    
    print("\n关键学习要点:")
    key_points = [
        "理解并行计算的基本原理和限制",
        "掌握Python多进程和MPI编程",
        "学会使用Numba进行并行加速",
        "了解GPU编程的基本概念",
        "能够设计和优化并行有限元算法",
        "熟悉云计算和容器化技术"
    ]
    
    for i, point in enumerate(key_points, 1):
        print(f"  {i}. {point}")
    
    print("\n" + "=" * 70)
    print("主题076: 并行计算与高性能仿真 - 完")
    print("=" * 70)


summary_and_outlook()

附录: 完整运行代码

以下是本主题所有代码的整合版本,可以保存为Python文件直接运行:

"""
主题076: 并行计算与高性能仿真 - 完整代码
运行方式: python parallel_computing_simulation.py
"""

import numpy as np
import time
import multiprocessing as mp
from multiprocessing import Pool
from numba import njit, prange, jit
from functools import wraps

# 设置随机种子保证可重复性
np.random.seed(42)

print("=" * 70)
print("主题076: 并行计算与高性能仿真")
print("=" * 70)

# 运行所有示例...
# (此处整合所有上述代码)

if __name__ == "__main__":
    print("\n所有示例运行完成!")

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐