主题076:并行计算与高性能仿真
1. 并行计算概述
1.1 为什么需要并行计算
计算需求的爆炸式增长:
现代工程仿真面临的挑战:
- 模型规模:百万甚至千万自由度
- 物理复杂性:多物理场耦合、多尺度问题
- 时间要求:设计周期缩短,需要快速反馈
- 精度需求:更精细的网格,更复杂的本构
性能瓶颈:
单核CPU性能提升遇到物理极限:
- 摩尔定律放缓,时钟频率难以继续提升
- 功耗墙限制,散热成为关键问题
- 内存墙问题,数据传输速度跟不上计算速度
并行计算的优势:
| 指标 | 串行计算 | 并行计算 |
|---|---|---|
| 计算时间 | $T$ | $T/P$(理想) |
| 可处理问题规模 | 有限 | 可扩展 |
| 内存容量 | 单机限制 | 聚合内存 |
| 成本效益 | 低 | 高(使用普通硬件) |
1.2 并行计算的基本概念
并行化类型:
1. 数据并行(Data Parallelism)
   - 相同操作应用于不同数据
   - 适合:矩阵运算、图像处理、有限元计算
   - 示例:每个处理器处理网格的一部分
2. 任务并行(Task Parallelism)
   - 不同任务同时执行
   - 适合:流水线处理、多物理场耦合
   - 示例:同时计算结构和流体
3. 流水线并行(Pipeline Parallelism)
   - 任务分阶段,重叠执行
   - 适合:迭代算法、时间步进
并行计算模型:
┌─────────────────────────────────────────────────────────────┐
│ 并行计算模型分类 │
├─────────────────────────────────────────────────────────────┤
│ 共享内存模型 │ 分布式内存模型 │
│ ├─ OpenMP │ ├─ MPI │
│ ├─ Pthreads │ ├─ 消息传递 │
│ └─ GPU (CUDA/OpenCL) │ └─ 网络通信 │
├─────────────────────────────────────────────────────────────┤
│ 混合模型:MPI + OpenMP, MPI + CUDA │
└─────────────────────────────────────────────────────────────┘
关键性能指标:
1. 加速比(Speedup):
   $S(P) = \frac{T_1}{T_P}$
   其中 $T_1$ 是串行时间,$T_P$ 是 $P$ 个处理器的并行时间
2. 效率(Efficiency):
   $E(P) = \frac{S(P)}{P} = \frac{T_1}{P \cdot T_P}$
3. 可扩展性(Scalability):
   - 强可扩展性:固定问题规模,增加处理器
   - 弱可扩展性:问题规模与处理器成比例增加
阿姆达尔定律:
$$S(P) = \frac{1}{(1-f) + \frac{f}{P}}$$
其中 $f$ 是可并行化部分的比例。
- 当 $f = 0.9$(90%可并行)、$P = 100$ 时:$S \approx 9.2$
- 当 $f = 0.99$(99%可并行)、$P = 100$ 时:$S \approx 50$
古斯塔夫森定律:
阿姆达尔定律假设问题规模固定,而古斯塔夫森定律考虑问题规模随处理器增加而扩大:
$$S(P) = P - (1-f)(P-1)$$
其中 $f$ 仍为上文定义的可并行化部分比例(若改用串行比例记号则写作 $P - s(P-1)$,两者等价)。这表明并行计算的真正优势在于解决更大规模的问题。
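下面用一个极简的Python片段直观对比两条定律的预测(取 $f=0.95$;仅为公式演算,非实测):
def amdahl_speedup(f, P):
    """阿姆达尔定律: 固定问题规模下的加速比上限"""
    return 1.0 / ((1 - f) + f / P)
def gustafson_speedup(f, P):
    """古斯塔夫森定律: 问题规模随处理器扩大时的加速比"""
    return P - (1 - f) * (P - 1)
f = 0.95  # 可并行化比例
for P in [4, 16, 64, 256]:
    print(f"P={P:4d}: Amdahl={amdahl_speedup(f, P):7.2f}, "
          f"Gustafson={gustafson_speedup(f, P):7.2f}")
可以看到处理器越多两者差距越大:阿姆达尔定律给出强可扩展性的悲观上限,古斯塔夫森定律则对应弱可扩展性场景。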
Python实现性能测试:
import time
import numpy as np
def measure_serial_performance(func, *args, n_runs=5):
"""测量串行函数性能"""
times = []
for _ in range(n_runs):
start = time.time()
result = func(*args)
end = time.time()
times.append(end - start)
return np.mean(times), np.std(times), result
def matrix_multiply_benchmark(n=2000):
"""矩阵乘法基准测试"""
A = np.random.randn(n, n)
B = np.random.randn(n, n)
mean_time, std_time, C = measure_serial_performance(
lambda: A @ B, n_runs=3
)
print(f"矩阵乘法 ({n}×{n})")
print(f" 平均时间: {mean_time:.3f}s ± {std_time:.3f}s")
print(f" 计算性能: {2*n**3 / mean_time / 1e9:.2f} GFLOPS")
return mean_time
# 基准测试
print("=" * 70)
print("串行性能基准测试")
print("=" * 70)
t_serial = matrix_multiply_benchmark(2000)
1.3 并行计算硬件架构
现代计算架构:
1. 多核CPU
   - 4-64核心,共享内存
   - 适合:OpenMP并行、任务并行
   - 内存带宽:50-200 GB/s
2. 众核处理器(如Intel Xeon Phi)
   - 60+核心,宽SIMD指令
   - 适合:高度并行的科学计算
3. GPU加速器
   - 数千核心,高内存带宽
   - 适合:数据并行、大规模矩阵运算
   - 内存带宽:500-2000 GB/s
4. 集群系统
   - 多台服务器,分布式内存
   - 适合:MPI并行、超大规模问题
内存层次结构:
┌─────────────────────────────────────────┐
│ 寄存器(Register) │ 最快,容量最小
│ └─ 每个核心:数十个 │
├─────────────────────────────────────────┤
│ L1缓存 │ 32-64 KB
│ └─ 每个核心私有 │
├─────────────────────────────────────────┤
│ L2缓存 │ 256-512 KB
│ └─ 每个核心或每几个核心共享 │
├─────────────────────────────────────────┤
│ L3缓存(LLC) │ 8-64 MB
│ └─ 所有核心共享 │
├─────────────────────────────────────────┤
│ 主内存(DRAM) │ 16-1024 GB
│ └─ 所有核心共享 │
├─────────────────────────────────────────┤
│ 网络/存储 │ 最慢,容量最大
└─────────────────────────────────────────┘
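内存层次的影响可以用一个最小实验感受:对同样数量的元素求和,跨步访问(每个缓存行只利用一小部分)明显慢于连续访问。下面是一个示意实验(具体倍数取决于硬件缓存):
import time
import numpy as np
x = np.random.randn(32_000_000)
strided = x[::8]         # 跨步视图: 每个缓存行只用到1个元素
contig = strided.copy()  # 相同数据量,连续存储
for name, arr in [("跨步访问", strided), ("连续访问", contig)]:
    start = time.perf_counter()
    for _ in range(10):
        s = arr.sum()
    elapsed = time.perf_counter() - start
    print(f"{name}: {elapsed:.4f}s (求和结果 {s:.2f})")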
并行计算硬件选型指南:
| 应用场景 | 推荐架构 | 理由 |
|---|---|---|
| 中小规模FEM | 多核CPU | 开发简单,内存充足 |
| 大规模CFD | GPU集群 | 计算密集,数据并行 |
| 多物理场耦合 | CPU集群 | 任务并行,通信灵活 |
| 实时仿真 | FPGA/ASIC | 低延迟,确定性 |
2. 并行计算基础理论
2.1 并行算法设计
分解策略:
1. 域分解(Domain Decomposition)
   - 将计算域划分为子域
   - 每个处理器负责一个子域
   - 适合:有限元、有限差分、CFD
2. 功能分解(Functional Decomposition)
   - 按功能模块划分任务
   - 适合:多物理场耦合、前后处理分离
3. 流水线分解(Pipeline Decomposition)
   - 任务分阶段并行
   - 适合:图像处理、信号处理
负载均衡:
import numpy as np
def domain_decomposition_1d(n_elements, n_processors):
"""
一维域分解
参数:
n_elements: 总单元数
n_processors: 处理器数
返回:
每个处理器的起始和结束索引
"""
base_size = n_elements // n_processors
remainder = n_elements % n_processors
ranges = []
start = 0
for i in range(n_processors):
# 前remainder个处理器多分配一个单元
size = base_size + (1 if i < remainder else 0)
end = start + size
ranges.append((start, end))
start = end
return ranges
def domain_decomposition_2d(nx, ny, px, py):
"""
二维域分解
参数:
nx, ny: x和y方向的单元数
px, py: x和y方向的处理器数
返回:
每个处理器的子域范围
"""
# x方向分解
x_ranges = domain_decomposition_1d(nx, px)
# y方向分解
y_ranges = domain_decomposition_1d(ny, py)
subdomains = []
for i in range(px):
for j in range(py):
subdomains.append({
'proc_id': i * py + j,
'x_range': x_ranges[i],
'y_range': y_ranges[j]
})
return subdomains
def domain_decomposition_3d(nx, ny, nz, px, py, pz):
"""
三维域分解
参数:
nx, ny, nz: 三个方向的单元数
px, py, pz: 三个方向的处理器数
返回:
每个处理器的子域范围
"""
x_ranges = domain_decomposition_1d(nx, px)
y_ranges = domain_decomposition_1d(ny, py)
z_ranges = domain_decomposition_1d(nz, pz)
subdomains = []
for i in range(px):
for j in range(py):
for k in range(pz):
subdomains.append({
'proc_id': i * py * pz + j * pz + k,
'x_range': x_ranges[i],
'y_range': y_ranges[j],
'z_range': z_ranges[k]
})
return subdomains
# 示例:域分解
print("\n" + "=" * 70)
print("域分解示例")
print("=" * 70)
# 1D分解
ranges_1d = domain_decomposition_1d(100, 4)
print("\n1D域分解 (100单元, 4处理器):")
for i, (start, end) in enumerate(ranges_1d):
print(f" 处理器{i}: 单元 {start}-{end-1} (共{end-start}个)")
# 2D分解
subdomains = domain_decomposition_2d(100, 80, 2, 2)
print("\n2D域分解 (100×80网格, 2×2处理器):")
for sub in subdomains:
x_start, x_end = sub['x_range']
y_start, y_end = sub['y_range']
print(f" 处理器{sub['proc_id']}: x[{x_start}:{x_end}], y[{y_start}:{y_end}]")
# 3D分解
subdomains_3d = domain_decomposition_3d(60, 50, 40, 2, 2, 2)
print("\n3D域分解 (60×50×40网格, 2×2×2处理器):")
for sub in subdomains_3d[:4]: # 只显示前4个
x_start, x_end = sub['x_range']
y_start, y_end = sub['y_range']
z_start, z_end = sub['z_range']
print(f" 处理器{sub['proc_id']}: x[{x_start}:{x_end}], y[{y_start}:{y_end}], z[{z_start}:{z_end}]")
2.2 通信与同步
通信模式:
1. 点对点通信
   - 两个处理器之间的直接通信
   - 适合:边界数据交换、不规则通信
2. 集合通信
   - 广播(Broadcast):一个到所有
   - 归约(Reduce):所有到一个(求和、最大值等)
   - 散射(Scatter):一个分散到所有
   - 收集(Gather):所有收集到一个
   - 全收集(Allgather):所有收集到所有
同步机制:
class SimpleBarrier:
"""简单的屏障同步实现"""
def __init__(self, n):
self.n = n # 参与者数量
self.count = 0
self.mutex = None # 互斥锁(实际实现需要)
self.cond = None # 条件变量(实际实现需要)
def wait(self):
"""等待所有参与者到达"""
# 实际实现需要线程/进程同步原语
pass
def halo_exchange_example():
"""
光环交换(Halo Exchange)示例
有限差分/有限元并行计算中的典型通信模式
"""
print("\n光环交换示意图:")
print("""
处理器0 处理器1 处理器2
┌─────────┐ ┌─────────┐ ┌─────────┐
│内部单元 │←──→│内部单元 │←──→│内部单元 │
│ + 光环 │ │ + 光环 │ │ + 光环 │
└─────────┘ └─────────┘ └─────────┘
↑ ↑ ↑
发送边界 双向交换 接收边界
每个处理器需要:
1. 计算内部单元
2. 发送边界数据到邻居
3. 接收邻居的边界数据
4. 更新光环区域
""")
halo_exchange_example()
通信优化策略:
1. 通信隐藏计算
   - 非阻塞通信
   - 计算与通信重叠
2. 减少通信次数
   - 聚合小消息
   - 批量数据传输
3. 拓扑感知映射
   - 将相邻子域映射到物理相邻的处理器
   - 减少网络跳数
2.3 并行效率分析
性能模型:
$$T_P = \frac{T_{compute}}{P} + T_{comm} + T_{sync}$$
其中:
- $T_{compute}$:计算时间
- $T_{comm}$:通信时间
- $T_{sync}$:同步开销
通信开销模型:
$$T_{comm} = T_{startup} + n \cdot T_{transfer}$$
- $T_{startup}$:启动延迟(latency)
- $T_{transfer}$:单位数据传输时间(带宽的倒数)
- $n$:传输数据量
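基于该模型可以粗略估算不同消息大小的通信耗时。下面的示例使用假设的参数($T_{startup} \approx 1\,\mu s$、带宽约10 GB/s,均为示意值,实际应在目标机器上实测):
def estimate_comm_time(n_bytes, t_startup=1e-6, bandwidth=10e9):
    """按 T_comm = T_startup + n * T_transfer 估算通信时间
    t_startup(秒)与bandwidth(字节/秒)均为假设值"""
    return t_startup + n_bytes / bandwidth
for n in [1e2, 1e4, 1e6, 1e8]:
    t = estimate_comm_time(n)
    print(f"消息大小 {n:>12,.0f} B: 估计耗时 {t*1e6:12.2f} µs")
可以看到小消息的耗时几乎完全由启动延迟决定,这正是后文"聚合小消息"优化的依据。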
Python性能分析工具:
import time
from functools import wraps
def timer_decorator(func):
"""计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
end = time.perf_counter()
print(f"{func.__name__} 耗时: {end - start:.4f}s")
return result
return wrapper
class PerformanceAnalyzer:
"""性能分析器"""
def __init__(self):
self.timings = {}
self.memory_usage = {}
def record(self, name, time_value, memory_mb=None):
"""记录时间和内存"""
if name not in self.timings:
self.timings[name] = []
self.memory_usage[name] = []
self.timings[name].append(time_value)
if memory_mb is not None:
self.memory_usage[name].append(memory_mb)
def report(self):
"""生成报告"""
print("\n" + "=" * 70)
print("性能分析报告")
print("=" * 70)
total = 0
for name, times in self.timings.items():
avg_time = np.mean(times)
std_time = np.std(times)
total += avg_time
print(f"{name:30s}: {avg_time:.4f}s ± {std_time:.4f}s")
if name in self.memory_usage and self.memory_usage[name]:
avg_mem = np.mean(self.memory_usage[name])
print(f"{'':30s} 内存: {avg_mem:.2f} MB")
print("-" * 70)
print(f"{'总计':30s}: {total:.4f}s")
class ParallelScalabilityAnalyzer:
"""并行可扩展性分析器"""
def __init__(self):
self.results = {}
def add_result(self, n_processors, time, problem_size=None):
"""添加测试结果"""
self.results[n_processors] = {
'time': time,
'problem_size': problem_size
}
def compute_speedup(self, baseline_n_proc=1):
"""计算加速比"""
if baseline_n_proc not in self.results:
return None
baseline_time = self.results[baseline_n_proc]['time']
speedups = {}
for n_proc, data in self.results.items():
speedups[n_proc] = baseline_time / data['time']
return speedups
def compute_efficiency(self, baseline_n_proc=1):
"""计算效率"""
speedups = self.compute_speedup(baseline_n_proc)
if speedups is None:
return None
efficiencies = {}
for n_proc, speedup in speedups.items():
efficiencies[n_proc] = speedup / n_proc
return efficiencies
def report(self):
"""生成可扩展性报告"""
print("\n" + "=" * 70)
print("并行可扩展性分析报告")
print("=" * 70)
print(f"{'处理器数':>10s} {'时间(s)':>12s} {'加速比':>10s} {'效率(%)':>10s}")
print("-" * 50)
speedups = self.compute_speedup()
efficiencies = self.compute_efficiency()
for n_proc in sorted(self.results.keys()):
time_val = self.results[n_proc]['time']
speedup = speedups.get(n_proc, 0)
efficiency = efficiencies.get(n_proc, 0) * 100
print(f"{n_proc:10d} {time_val:12.4f} {speedup:10.2f} {efficiency:10.1f}")
# 示例:分析矩阵运算性能
print("\n" + "=" * 70)
print("矩阵运算性能分析")
print("=" * 70)
analyzer = PerformanceAnalyzer()
# 矩阵创建
start = time.perf_counter()
A = np.random.randn(3000, 3000)
B = np.random.randn(3000, 3000)
analyzer.record("矩阵创建", time.perf_counter() - start)
# 矩阵乘法
start = time.perf_counter()
C = A @ B
analyzer.record("矩阵乘法", time.perf_counter() - start)
# 矩阵求逆
start = time.perf_counter()
A_inv = np.linalg.inv(A[:1000, :1000])
analyzer.record("矩阵求逆(1000)", time.perf_counter() - start)
# 特征值分解
start = time.perf_counter()
eigenvalues = np.linalg.eigvals(A[:500, :500])
analyzer.record("特征值(500)", time.perf_counter() - start)
analyzer.report()
3. Python多进程与多线程
3.1 Python的GIL与并行选择
全局解释器锁(GIL):
- Python的CPython实现有GIL
- 同一时刻只有一个线程执行Python字节码
- 影响:多线程不能实现真正的CPU并行
解决方案:
| 方法 | 适用场景 | 实现方式 |
|---|---|---|
| 多进程 | CPU密集型 | multiprocessing |
| 多线程 | I/O密集型 | threading |
| 异步 | I/O密集型 | asyncio |
| C扩展 | CPU密集型 | Cython, Numba |
进程 vs 线程:
┌─────────────────────────────────────────────────────────────┐
│ 多进程 (Multiprocessing) 多线程 (Threading) │
├─────────────────────────────────────────────────────────────┤
│ • 独立内存空间 • 共享内存空间 │
│ • 无GIL限制 • 受GIL限制 │
│ • 高启动开销 • 低启动开销 │
│ • 适合CPU密集型 • 适合I/O密集型 │
│ • 需要序列化通信 • 直接共享数据 │
└─────────────────────────────────────────────────────────────┘
并行策略选择决策树:
- CPU密集型任务:
  - 数据并行 → multiprocessing 或 Numba(单机多核),大规模用 MPI 或 GPU
  - 否则 → MPI
- I/O密集型任务:
  - 有阻塞操作 → threading
  - 否则 → asyncio
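GIL的影响可以用一个最小实验验证:同一个纯Python的CPU密集型任务,4线程几乎没有加速,4进程则接近线性加速(数值依硬件而定;NumPy等会释放GIL的扩展不受此限):
import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
def cpu_bound(n):
    """纯Python循环,执行期间持有GIL"""
    s = 0
    for i in range(n):
        s += i * i
    return s
if __name__ == "__main__":
    tasks = [3_000_000] * 4
    for name, PoolCls in [("多线程", ThreadPool), ("多进程", Pool)]:
        start = time.time()
        with PoolCls(4) as pool:
            pool.map(cpu_bound, tasks)
        print(f"{name}(4 workers): {time.time() - start:.3f}s")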
3.2 multiprocessing模块
基础用法:
import multiprocessing as mp
from multiprocessing import Pool, Process, Queue, Manager, Value, Lock
import numpy as np
import time
def worker_function(data_chunk):
"""
工作函数 - 处理数据块
参数:
data_chunk: 数据块
返回:
处理结果
"""
# 模拟计算密集型任务
result = np.sum(data_chunk ** 2)
return result
def parallel_map_example():
"""
使用Pool进行并行map操作
"""
print("\n" + "=" * 70)
print("multiprocessing.Pool示例")
print("=" * 70)
# 生成数据
n_chunks = 8
chunk_size = 1000000
data = [np.random.randn(chunk_size) for _ in range(n_chunks)]
# 串行执行
print("\n串行执行:")
start = time.time()
serial_results = [worker_function(chunk) for chunk in data]
serial_time = time.time() - start
print(f" 耗时: {serial_time:.3f}s")
print(f" 结果: {serial_results}")
# 并行执行
print("\n并行执行 (8进程):")
start = time.time()
with Pool(processes=8) as pool:
parallel_results = pool.map(worker_function, data)
parallel_time = time.time() - start
print(f" 耗时: {parallel_time:.3f}s")
print(f" 结果: {parallel_results}")
# 加速比
speedup = serial_time / parallel_time
efficiency = speedup / 8
print(f"\n加速比: {speedup:.2f}x")
print(f"效率: {efficiency*100:.1f}%")
# 验证结果
assert np.allclose(serial_results, parallel_results), "结果不一致!"
print("\n✓ 结果验证通过")
# 注意: 使用multiprocessing的代码在Windows/macOS(spawn启动方式)下
# 必须放在 if __name__ == "__main__": 保护块内,此处假设Linux(fork)环境
parallel_map_example()
进程间通信:
def producer(queue, n_items):
"""生产者进程"""
for i in range(n_items):
data = np.random.randn(1000)
queue.put(data)
print(f"生产者: 生产数据块 {i+1}")
queue.put(None) # 结束信号
def consumer(queue, results):
"""消费者进程"""
while True:
data = queue.get()
if data is None:
break
result = np.sum(data ** 2)
results.append(result)
print(f"消费者: 处理完成,结果={result:.2f}")
def queue_communication_example():
"""
使用Queue进行进程间通信
"""
print("\n" + "=" * 70)
print("进程间通信示例 (Queue)")
print("=" * 70)
queue = Queue(maxsize=10)
manager = Manager()
results = manager.list()
# 创建进程
producer_proc = Process(target=producer, args=(queue, 5))
consumer_proc = Process(target=consumer, args=(queue, results))
# 启动进程
start = time.time()
producer_proc.start()
consumer_proc.start()
# 等待完成
producer_proc.join()
consumer_proc.join()
elapsed = time.time() - start
print(f"\n总耗时: {elapsed:.3f}s")
print(f"处理结果数: {len(results)}")
queue_communication_example()
共享内存:
from multiprocessing import shared_memory
def shared_memory_example():
"""
使用共享内存进行高效数据共享
"""
print("\n" + "=" * 70)
print("共享内存示例")
print("=" * 70)
# 创建共享内存
n = 1000000
data = np.random.randn(n)
# 创建共享内存块
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:]
print(f"共享内存名称: {shm.name}")
print(f"数据大小: {data.nbytes / 1e6:.2f} MB")
    # 注意: 嵌套函数依赖fork启动方式才能被子进程调用,
    # 跨平台项目应将worker定义在模块顶层
    def worker(shm_name, shape, dtype, start_idx, end_idx):
"""工作进程 - 处理共享内存的一部分"""
# 连接到共享内存
existing_shm = shared_memory.SharedMemory(name=shm_name)
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# 处理数据
local_sum = np.sum(arr[start_idx:end_idx] ** 2)
existing_shm.close()
return local_sum
# 并行处理
n_processes = 4
chunk_size = n // n_processes
with Pool(processes=n_processes) as pool:
results = pool.starmap(
worker,
[(shm.name, data.shape, data.dtype, i*chunk_size, (i+1)*chunk_size)
for i in range(n_processes)]
)
total_sum = sum(results)
expected_sum = np.sum(data ** 2)
print(f"\n并行计算结果: {total_sum:.2f}")
print(f"期望结果: {expected_sum:.2f}")
print(f"误差: {abs(total_sum - expected_sum):.6f}")
# 清理
shm.close()
shm.unlink()
shared_memory_example()
进程池高级用法:
def parallel_map_async_example():
"""
异步并行map操作
"""
print("\n" + "=" * 70)
print("异步并行Map示例")
print("=" * 70)
def slow_function(x):
"""模拟耗时操作"""
time.sleep(0.1)
return x ** 2
data = list(range(20))
with Pool(processes=4) as pool:
# 异步执行
result_async = pool.map_async(slow_function, data)
# 等待完成(可以在此期间做其他事情)
while not result_async.ready():
print(f"进度: {result_async._number_left} 批次剩余")
time.sleep(0.05)
results = result_async.get()
print(f"\n完成! 结果数: {len(results)}")
print(f"前5个结果: {results[:5]}")
parallel_map_async_example()
3.3 并行矩阵运算
def parallel_matrix_vector(A, x, n_processes=None):
"""
并行矩阵-向量乘法
参数:
A: 矩阵 (m×n)
x: 向量 (n,)
n_processes: 进程数
返回:
y = A @ x
"""
if n_processes is None:
n_processes = mp.cpu_count()
m, n = A.shape
chunk_size = m // n_processes
    # 注: 嵌套函数仅在fork启动方式下可被Pool序列化调用
    def compute_chunk(args):
A_chunk, x, start_row = args
return start_row, A_chunk @ x
# 分割矩阵
chunks = []
for i in range(n_processes):
start = i * chunk_size
end = m if i == n_processes - 1 else (i + 1) * chunk_size
chunks.append((A[start:end], x, start))
# 并行计算
with Pool(processes=n_processes) as pool:
results = pool.map(compute_chunk, chunks)
# 合并结果
y = np.empty(m)
for start_row, y_chunk in results:
y[start_row:start_row + len(y_chunk)] = y_chunk
return y
def parallel_matrix_multiply(A, B, n_processes=None):
"""
并行矩阵乘法(分块算法)
参数:
A: 矩阵 (m×k)
B: 矩阵 (k×n)
n_processes: 进程数
返回:
C = A @ B (m×n)
"""
if n_processes is None:
n_processes = mp.cpu_count()
m, k = A.shape
k2, n = B.shape
assert k == k2, "矩阵维度不匹配"
# 对A按行分块
chunk_size = m // n_processes
def compute_chunk(args):
A_chunk, B, start_row = args
return start_row, A_chunk @ B
chunks = []
for i in range(n_processes):
start = i * chunk_size
end = m if i == n_processes - 1 else (i + 1) * chunk_size
chunks.append((A[start:end], B, start))
with Pool(processes=n_processes) as pool:
results = pool.map(compute_chunk, chunks)
# 合并结果
C = np.empty((m, n))
for start_row, C_chunk in results:
C[start_row:start_row + C_chunk.shape[0]] = C_chunk
return C
def benchmark_parallel_matrix():
"""
并行矩阵运算基准测试
"""
print("\n" + "=" * 70)
print("并行矩阵运算基准测试")
print("=" * 70)
# 生成测试数据
n = 5000
A = np.random.randn(n, n)
x = np.random.randn(n)
# NumPy串行计算
print("\nNumPy串行计算:")
start = time.time()
y_serial = A @ x
serial_time = time.time() - start
print(f" 耗时: {serial_time:.3f}s")
# 并行计算(不同进程数)
for n_proc in [2, 4, 8]:
print(f"\n并行计算 ({n_proc}进程):")
start = time.time()
y_parallel = parallel_matrix_vector(A, x, n_processes=n_proc)
parallel_time = time.time() - start
speedup = serial_time / parallel_time
error = np.max(np.abs(y_serial - y_parallel))
print(f" 耗时: {parallel_time:.3f}s")
print(f" 加速比: {speedup:.2f}x")
print(f" 最大误差: {error:.2e}")
# 矩阵乘法测试
print("\n" + "-" * 70)
print("矩阵乘法测试 (2000×2000)")
n = 2000
A = np.random.randn(n, n)
B = np.random.randn(n, n)
print("\nNumPy串行计算:")
start = time.time()
C_serial = A @ B
serial_time = time.time() - start
print(f" 耗时: {serial_time:.3f}s")
for n_proc in [2, 4]:
print(f"\n并行计算 ({n_proc}进程):")
start = time.time()
C_parallel = parallel_matrix_multiply(A, B, n_processes=n_proc)
parallel_time = time.time() - start
speedup = serial_time / parallel_time
error = np.max(np.abs(C_serial - C_parallel))
print(f" 耗时: {parallel_time:.3f}s")
print(f" 加速比: {speedup:.2f}x")
print(f" 最大误差: {error:.2e}")
benchmark_parallel_matrix()
4. MPI并行编程
4.1 MPI基础
MPI(Message Passing Interface) 是分布式内存并行编程的标准。
核心概念:
- Rank:进程标识符(0到n-1)
- Size:总进程数
- Communicator:通信域(通常是MPI.COMM_WORLD)
Python中的MPI:使用 mpi4py 库
# MPI基础示例(需要在命令行运行)
"""
# 保存为 mpi_hello.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
print(f"Hello from rank {rank} of {size}")
# 运行命令:
# mpiexec -n 4 python mpi_hello.py
"""
print("\n" + "=" * 70)
print("MPI基础概念")
print("=" * 70)
print("""
MPI(Message Passing Interface)是分布式内存并行编程的标准。
基本概念:
1. Rank: 进程的唯一标识符 (0, 1, 2, ..., size-1)
2. Size: 通信域中的进程总数
3. Communicator: 定义通信域,通常是 MPI.COMM_WORLD
安装mpi4py:
pip install mpi4py
运行MPI程序:
mpiexec -n 4 python script.py
或:
mpirun -np 4 python script.py
MPI编程模式:
1. SPMD (Single Program Multiple Data)
- 所有进程运行相同程序
- 根据rank执行不同代码分支
2. MPMD (Multiple Program Multiple Data)
- 不同进程运行不同程序
- 通过MPI通信协调
""")
4.2 点对点通信
# 点对点通信示例
def mpi_point_to_point_demo():
"""
MPI点对点通信示例
需要在命令行运行:
mpiexec -n 2 python -c "
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = np.arange(10, dtype='i')
comm.Send(data, dest=1)
print(f'Rank {rank}: 发送数据 {data}')
elif rank == 1:
data = np.empty(10, dtype='i')
comm.Recv(data, source=0)
print(f'Rank {rank}: 接收数据 {data}')
"
"""
pass # 实际代码需要在MPI环境中运行
print("\n" + "=" * 70)
print("MPI点对点通信")
print("=" * 70)
print("""
点对点通信函数:
发送:
comm.Send(data, dest, tag=0) # 阻塞发送
comm.Isend(data, dest, tag=0) # 非阻塞发送
comm.send(obj, dest, tag=0) # Python对象发送(pickle序列化)
接收:
comm.Recv(data, source, tag=0) # 阻塞接收
comm.Irecv(data, source, tag=0) # 非阻塞接收
obj = comm.recv(source, tag=0) # Python对象接收
示例:
if rank == 0:
data = np.array([1, 2, 3])
comm.Send(data, dest=1)
elif rank == 1:
data = np.empty(3)
comm.Recv(data, source=0)
非阻塞通信:
req = comm.Isend(data, dest=1)
# ... 做其他事情 ...
req.Wait() # 等待发送完成
req = comm.Irecv(data, source=0)
# ... 做其他事情 ...
req.Wait() # 等待接收完成
""")
4.3 集合通信
print("\n" + "=" * 70)
print("MPI集合通信")
print("=" * 70)
print("""
集合通信操作:
广播 (Broadcast):
comm.Bcast(data, root=0) # 根进程广播到所有进程
散射 (Scatter):
comm.Scatter(send_data, recv_data, root=0) # 分散数据
收集 (Gather):
comm.Gather(send_data, recv_data, root=0) # 收集数据
全收集 (Allgather):
comm.Allgather(send_data, recv_data) # 收集到所有进程
归约 (Reduce):
comm.Reduce(send_data, recv_data, op=MPI.SUM, root=0) # 求和归约
全归约 (Allreduce):
comm.Allreduce(send_data, recv_data, op=MPI.SUM) # 归约到所有进程
全对全 (Alltoall):
comm.Alltoall(send_data, recv_data) # 每个进程向所有进程发送不同数据
归约操作:
MPI.SUM - 求和
MPI.PROD - 乘积
MPI.MAX - 最大值
MPI.MIN - 最小值
MPI.MAXLOC - 最大值及位置
MPI.MINLOC - 最小值及位置
MPI.LAND - 逻辑与
MPI.LOR - 逻辑或
MPI.BAND - 按位与
MPI.BOR - 按位或
""")
# MPI集合通信示例代码(需要在MPI环境中运行)
mpi_collective_code = '''
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# 广播示例
if rank == 0:
data = np.array([100, 200, 300])
else:
data = np.empty(3, dtype='i')
comm.Bcast(data, root=0)
print(f"Rank {rank}: 广播后 data = {data}")
# 散射示例
if rank == 0:
send_data = np.arange(size * 4, dtype='i')
else:
send_data = None
recv_data = np.empty(4, dtype='i')
comm.Scatter(send_data, recv_data, root=0)
print(f"Rank {rank}: 散射接收 data = {recv_data}")
# 归约示例
local_data = np.array([rank + 1])
global_sum = np.empty(1, dtype='i')
comm.Reduce(local_data, global_sum, op=MPI.SUM, root=0)
if rank == 0:
print(f"Rank {rank}: 归约和 = {global_sum[0]}")
print(f" 期望和 = {size*(size+1)//2}")
# 全归约示例
local_max = np.array([rank])
global_max = np.empty(1, dtype='i')
comm.Allreduce(local_max, global_max, op=MPI.MAX)
print(f"Rank {rank}: 全归约最大值 = {global_max[0]}")
'''
print("\n集合通信示例代码:")
print(mpi_collective_code)
4.4 MPI并行有限元
def mpi_fem_solver_demo():
"""
MPI并行有限元求解器框架
这是一个概念性示例,展示如何用MPI并行化有限元计算
"""
print("\n" + "=" * 70)
print("MPI并行有限元求解器框架")
print("=" * 70)
code = '''
from mpi4py import MPI
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import spsolve, cg
class ParallelFEMSolver:
"""MPI并行有限元求解器"""
def __init__(self):
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()
# 本地数据
self.local_nodes = None
self.local_elements = None
self.local_K = None
self.local_F = None
def domain_decomposition(self, n_elements):
"""域分解"""
# 简单的一维分解
base = n_elements // self.size
remainder = n_elements % self.size
if self.rank < remainder:
n_local = base + 1
start = self.rank * n_local
else:
n_local = base
start = remainder * (base + 1) + (self.rank - remainder) * base
end = start + n_local
return start, end
def assemble_local(self, elements, nodes, material_props):
"""组装本地刚度矩阵"""
n_local_nodes = len(nodes)
n_dof = n_local_nodes * 3 # 3自由度/节点
# 简化的组装过程
K_local = np.zeros((n_dof, n_dof))
F_local = np.zeros(n_dof)
for elem_idx, elem_nodes in enumerate(elements):
# 计算单元刚度(简化)
ke = self.compute_element_stiffness(
nodes[elem_nodes], material_props
)
# 组装到本地矩阵
for i in range(len(elem_nodes)):
for j in range(len(elem_nodes)):
global_i = elem_nodes[i] * 3
global_j = elem_nodes[j] * 3
K_local[global_i:global_i+3, global_j:global_j+3] += ke[i*3:(i+1)*3, j*3:(j+1)*3]
return K_local, F_local
def compute_element_stiffness(self, elem_coords, material_props):
"""计算单元刚度矩阵(简化)"""
E, nu = material_props
# 简化为单位矩阵乘以弹性模量
n_nodes = len(elem_coords)
return np.eye(n_nodes * 3) * E * 0.01
def parallel_cg_solve(self, K, F, max_iter=1000, tol=1e-6):
"""并行共轭梯度求解"""
n = len(F)
x = np.zeros(n)
r = F.copy()
p = r.copy()
rs_old = np.dot(r, r)
for iteration in range(max_iter):
# 并行矩阵-向量乘法
Ap = self.parallel_spmv(K, p)
alpha = rs_old / np.dot(p, Ap)
x += alpha * p
r -= alpha * Ap
rs_new = np.dot(r, r)
# 检查收敛(所有进程)
global_residual = np.empty(1)
self.comm.Allreduce(np.array([rs_new]), global_residual, op=MPI.SUM)
if self.rank == 0:
residual = np.sqrt(global_residual[0])
if residual < tol:
print(f"CG收敛于迭代 {iteration}, 残差={residual:.2e}")
if np.sqrt(global_residual[0]) < tol:
break
p = r + (rs_new / rs_old) * p
rs_old = rs_new
return x
def parallel_spmv(self, K, x):
"""并行稀疏矩阵-向量乘法"""
# 本地计算
y_local = K @ x
# 边界数据交换(简化)
# 实际实现需要处理边界节点
return y_local
def solve(self, global_nodes, global_elements, material_props, bc_nodes):
"""并行求解"""
# 1. 域分解
start_elem, end_elem = self.domain_decomposition(len(global_elements))
local_elements = global_elements[start_elem:end_elem]
# 2. 确定本地节点
local_node_set = set()
for elem in local_elements:
local_node_set.update(elem)
local_nodes = sorted(list(local_node_set))
local_node_map = {n: i for i, n in enumerate(local_nodes)}
# 3. 组装本地矩阵
local_coords = global_nodes[local_nodes]
remapped_elements = [[local_node_map[n] for n in elem]
for elem in local_elements]
K_local, F_local = self.assemble_local(
remapped_elements, local_coords, material_props
)
# 4. 边界条件处理
# ...
# 5. 并行求解
u_local = self.parallel_cg_solve(K_local, F_local)
# 6. 收集全局解
# ...
return u_local
# 使用示例
if __name__ == "__main__":
solver = ParallelFEMSolver()
print(f"Rank {solver.rank}/{solver.size} 准备就绪")
# 创建测试网格
if solver.rank == 0:
n_nodes = 1000
n_elements = 800
nodes = np.random.randn(n_nodes, 3)
elements = np.random.randint(0, n_nodes, size=(n_elements, 8))
material_props = [200e9, 0.3]
bc_nodes = [0, 1, 2]
else:
nodes = None
elements = None
material_props = None
bc_nodes = None
# 广播网格数据
nodes = solver.comm.bcast(nodes, root=0)
elements = solver.comm.bcast(elements, root=0)
material_props = solver.comm.bcast(material_props, root=0)
bc_nodes = solver.comm.bcast(bc_nodes, root=0)
# 求解
solution = solver.solve(nodes, elements, material_props, bc_nodes)
print(f"Rank {solver.rank}: 求解完成,解向量长度={len(solution)}")
'''
print(code)
print("\n注: 以上代码需要在MPI环境中运行 (mpiexec -n 4 python script.py)")
mpi_fem_solver_demo()
5. OpenMP并行化
5.1 OpenMP基础
OpenMP 是共享内存并行编程的API,使用编译器指令(pragmas)。
Python中的OpenMP:通过Cython或Numba使用
print("\n" + "=" * 70)
print("OpenMP基础")
print("=" * 70)
print("""
OpenMP (Open Multi-Processing) 是共享内存并行编程标准。
基本指令格式:
#pragma omp directive [clause[, clause]...]
常用指令:
parallel - 创建并行区域
for - 并行化for循环
sections - 并行执行不同代码段
single - 单线程执行
critical - 临界区(互斥访问)
barrier - 同步屏障
atomic - 原子操作
reduction - 归约操作
常用子句:
private(var) - 私有变量
shared(var) - 共享变量
reduction(op:var) - 归约变量
schedule(type) - 调度策略 (static, dynamic, guided)
num_threads(n) - 线程数
default(none) - 默认无共享
在Python中使用OpenMP:
1. Numba @jit(parallel=True)
2. Cython with prange
3. 调用C/C++ OpenMP代码
示例 (C++):
#pragma omp parallel for reduction(+:sum) num_threads(4)
for (int i = 0; i < n; i++) {
sum += data[i];
}
""")
5.2 Numba并行化
from numba import jit, prange, njit
import numpy as np
import time
@njit(parallel=True, fastmath=True)
def parallel_sum_numba(arr):
"""
使用Numba并行求和
prange: 并行range
"""
total = 0.0
for i in prange(len(arr)):
total += arr[i]
return total
@njit(parallel=True, fastmath=True)
def parallel_matrix_multiply_numba(A, B):
"""
并行矩阵乘法
"""
m, n = A.shape
n2, p = B.shape
C = np.zeros((m, p))
for i in prange(m):
for j in range(p):
s = 0.0
for k in range(n):
s += A[i, k] * B[k, j]
C[i, j] = s
return C
@njit(parallel=True)
def parallel_element_wise_operation(x, y):
"""
并行元素级操作
"""
n = len(x)
result = np.empty(n)
for i in prange(n):
result[i] = np.sin(x[i]) * np.cos(y[i]) + x[i]**2 / (y[i] + 1.0)
return result
@njit(parallel=True, fastmath=True)
def parallel_array_reduction(arr):
    """
    并行数组归约操作
    注意: Numba的prange只对 += 、 *= 等运算自动生成线程安全的归约,
    条件赋值式的max/min在并行循环中存在竞争条件,
    因此最大/最小值改用NumPy归约函数计算
    """
    n = len(arr)
    sum_val = 0.0
    sum_sq = 0.0
    for i in prange(n):
        val = arr[i]
        sum_val += val
        sum_sq += val * val
    max_val = np.max(arr)
    min_val = np.min(arr)
    return sum_val, sum_sq, max_val, min_val
def benchmark_numba_parallel():
"""
Numba并行性能测试
"""
print("\n" + "=" * 70)
print("Numba并行性能测试")
print("=" * 70)
# 预热编译
warmup_arr = np.random.randn(100)
parallel_sum_numba(warmup_arr)
# 测试1: 数组求和
print("\n测试1: 大数组求和")
n = 10_000_000
arr = np.random.randn(n)
# NumPy
start = time.time()
result_np = np.sum(arr)
time_np = time.time() - start
print(f" NumPy: {time_np:.4f}s, 结果={result_np:.2f}")
# Numba串行
@njit(fastmath=True)
def serial_sum(arr):
total = 0.0
for i in range(len(arr)):
total += arr[i]
return total
serial_sum(arr) # 编译
start = time.time()
result_serial = serial_sum(arr)
time_serial = time.time() - start
print(f" Numba串行: {time_serial:.4f}s, 结果={result_serial:.2f}")
# Numba并行
parallel_sum_numba(arr) # 编译
start = time.time()
result_parallel = parallel_sum_numba(arr)
time_parallel = time.time() - start
print(f" Numba并行: {time_parallel:.4f}s, 结果={result_parallel:.2f}")
if time_parallel > 0:
speedup = time_serial / time_parallel
print(f" 加速比: {speedup:.2f}x")
# 测试2: 矩阵乘法
print("\n测试2: 矩阵乘法 (1000×1000)")
n = 1000
A = np.random.randn(n, n)
B = np.random.randn(n, n)
# NumPy
start = time.time()
C_np = A @ B
time_np = time.time() - start
print(f" NumPy BLAS: {time_np:.4f}s")
# Numba并行
parallel_matrix_multiply_numba(A[:100, :100], B[:100, :100]) # 编译
start = time.time()
C_numba = parallel_matrix_multiply_numba(A, B)
time_numba = time.time() - start
print(f" Numba并行: {time_numba:.4f}s")
error = np.max(np.abs(C_np - C_numba))
print(f" 最大误差: {error:.2e}")
# 测试3: 数组归约
print("\n测试3: 数组归约操作")
arr = np.random.randn(5_000_000)
start = time.time()
sum_val, sum_sq, max_val, min_val = parallel_array_reduction(arr)
elapsed = time.time() - start
print(f" 计算时间: {elapsed:.4f}s")
print(f" 求和: {sum_val:.2f}")
print(f" 平方和: {sum_sq:.2f}")
print(f" 最大值: {max_val:.4f}")
print(f" 最小值: {min_val:.4f}")
benchmark_numba_parallel()
5.3 并行循环优化
@njit(parallel=True, cache=True)
def parallel_fem_element_loop(nodes, elements, material_props):
"""
并行有限元单元循环示例
参数:
nodes: 节点坐标 (n_nodes, 3)
elements: 单元连接 (n_elements, n_nodes_per_element)
material_props: 材料属性
返回:
单元应变能数组
"""
n_elements = len(elements)
strain_energy = np.zeros(n_elements)
for e in prange(n_elements):
# 获取单元节点
elem_nodes = elements[e]
# 计算单元刚度矩阵(简化)
# 实际实现需要形函数、雅可比等
# 计算单元应变能(简化示例)
for i in range(len(elem_nodes)):
node_idx = elem_nodes[i]
for j in range(3): # x, y, z
strain_energy[e] += nodes[node_idx, j] ** 2
return strain_energy
@njit(parallel=True)
def parallel_stress_computation(stress_array, strain_array, E, nu):
"""
并行应力计算
参数:
stress_array: 应力数组 (n_points, 6)
strain_array: 应变数组 (n_points, 6)
E: 弹性模量
nu: 泊松比
"""
n_points = len(stress_array)
# 弹性矩阵系数(各向同性材料)
lambda_ = E * nu / ((1 + nu) * (1 - 2 * nu))
mu = E / (2 * (1 + nu))
for i in prange(n_points):
# 体积应变
eps_vol = strain_array[i, 0] + strain_array[i, 1] + strain_array[i, 2]
# 计算应力(广义胡克定律)
for j in range(3): # 正应力
stress_array[i, j] = lambda_ * eps_vol + 2 * mu * strain_array[i, j]
for j in range(3, 6): # 剪应力
stress_array[i, j] = 2 * mu * strain_array[i, j]
@njit(parallel=True)
def parallel_gauss_integration(nodes, weights, n_gauss, func_values):
"""
并行高斯积分
参数:
nodes: 积分点坐标
weights: 积分权重
n_gauss: 每个方向的积分点数
func_values: 函数在积分点的值
返回:
积分结果
"""
n_elements = len(func_values)
integral = np.zeros(n_elements)
for e in prange(n_elements):
result = 0.0
for i in range(n_gauss):
for j in range(n_gauss):
for k in range(n_gauss):
idx = i * n_gauss * n_gauss + j * n_gauss + k
weight = weights[i] * weights[j] * weights[k]
result += weight * func_values[e, idx]
integral[e] = result
return integral
def benchmark_fem_operations():
"""
FEM操作并行性能测试
"""
print("\n" + "=" * 70)
print("FEM操作并行性能测试")
print("=" * 70)
# 测试1: 单元应变能计算
print("\n测试1: 单元应变能计算")
n_nodes = 10000
n_elements = 5000
nodes_per_element = 8 # 六面体单元
nodes = np.random.randn(n_nodes, 3)
elements = np.random.randint(0, n_nodes, size=(n_elements, nodes_per_element))
material_props = np.array([200e9, 0.3]) # E, nu
# 预热
parallel_fem_element_loop(nodes, elements[:100], material_props)
start = time.time()
strain_energy = parallel_fem_element_loop(nodes, elements, material_props)
elapsed = time.time() - start
print(f" 单元数: {n_elements}")
print(f" 计算时间: {elapsed:.4f}s")
print(f" 总应变能: {np.sum(strain_energy):.2f}")
# 测试2: 应力计算
print("\n测试2: 并行应力计算")
n_points = 100000
strain = np.random.randn(n_points, 6) * 1e-4
stress = np.zeros((n_points, 6))
E = 200e9 # 钢
nu = 0.3
start = time.time()
parallel_stress_computation(stress, strain, E, nu)
elapsed = time.time() - start
print(f" 计算点数: {n_points}")
print(f" 计算时间: {elapsed:.4f}s")
print(f" von Mises应力范围: [{np.min(stress):.2e}, {np.max(stress):.2e}]")
# 测试3: 高斯积分
print("\n测试3: 并行高斯积分")
n_gauss = 3
n_elements = 10000
func_values = np.random.randn(n_elements, n_gauss**3)
nodes_1d = np.array([-0.7745966692, 0.0, 0.7745966692])
weights_1d = np.array([0.5555555556, 0.8888888889, 0.5555555556])
start = time.time()
integral = parallel_gauss_integration(nodes_1d, weights_1d, n_gauss, func_values)
elapsed = time.time() - start
print(f" 单元数: {n_elements}")
print(f" 积分点数/单元: {n_gauss**3}")
print(f" 计算时间: {elapsed:.4f}s")
print(f" 积分结果范围: [{np.min(integral):.4f}, {np.max(integral):.4f}]")
benchmark_fem_operations()
6. GPU加速计算
6.1 GPU架构与CUDA
GPU vs CPU:
| 特性 | CPU | GPU |
|---|---|---|
| 核心数 | 4-64 | 数千 |
| 时钟频率 | 高 (3-5 GHz) | 较低 (1-2 GHz) |
| 内存带宽 | 50-200 GB/s | 500-2000 GB/s |
| 适合任务 | 串行、复杂控制 | 数据并行、计算密集 |
| 延迟隐藏 | 缓存、分支预测 | 线程切换 |
Python GPU计算选项:
- CuPy:NumPy兼容的GPU数组库
- Numba CUDA:Python CUDA内核
- PyTorch/TensorFlow:深度学习框架的GPU支持
- PyCUDA:直接CUDA编程
print("\n" + "=" * 70)
print("GPU加速计算基础")
print("=" * 70)
print("""
GPU计算库选择:
1. CuPy (推荐用于数值计算)
- NumPy兼容API
- 自动内存管理
- 安装: pip install cupy-cuda11x
2. Numba CUDA
- Python编写CUDA内核
- 细粒度控制
- 安装: pip install numba
3. PyTorch
- 深度学习优化
- 自动求导
- 安装: pip install torch
4. PyCUDA
- 直接CUDA编程
- 最灵活但复杂
- 安装: pip install pycuda
GPU编程模型:
- Grid: 线程块网格
- Block: 线程块
- Thread: 执行线程
Grid (多个Blocks)
┌─────────────────────────────────────┐
│ Block(0,0) │ Block(0,1) │ Block(0,2)│
│ ┌────────┐ │ ┌────────┐ │ ┌────────┐│
│ │ Thread │ │ │ Thread │ │ │ Thread ││
│ │ 0..N │ │ │ 0..N │ │ │ 0..N ││
│ └────────┘ │ └────────┘ │ └────────┘│
└─────────────────────────────────────┘
线程索引计算:
global_id = blockIdx.x * blockDim.x + threadIdx.x
内存层次:
- 全局内存 (Global Memory): 大容量,高延迟
- 共享内存 (Shared Memory): 块内共享,低延迟
- 寄存器 (Registers): 每个线程私有,最快
- 常量/纹理内存: 只读,缓存优化
""")
6.2 CuPy基础
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
CUPY_AVAILABLE = False
print("\nCuPy未安装,跳过GPU示例")
print("安装命令: pip install cupy-cuda11x")
if CUPY_AVAILABLE:
def cupy_basic_demo():
"""
CuPy基础操作示例
"""
print("\n" + "=" * 70)
print("CuPy基础操作")
print("=" * 70)
# 检查GPU
print(f"\nGPU设备: {cp.cuda.Device(0).use()}")
print(f"GPU数量: {cp.cuda.runtime.getDeviceCount()}")
# 创建GPU数组
print("\n创建GPU数组:")
x_gpu = cp.array([1, 2, 3, 4, 5])
print(f" GPU数组: {x_gpu}")
print(f" 设备: {x_gpu.device}")
# 从NumPy创建
x_cpu = np.random.randn(1000, 1000)
x_gpu = cp.asarray(x_cpu)
print(f"\n 从NumPy转换: shape={x_gpu.shape}")
# GPU计算
print("\nGPU计算:")
y_gpu = cp.sin(x_gpu) + cp.cos(x_gpu)
z_gpu = x_gpu @ x_gpu.T
print(f" 元素级运算完成")
print(f" 矩阵乘法结果: shape={z_gpu.shape}")
# 转回CPU
z_cpu = cp.asnumpy(z_gpu)
print(f"\n 转回CPU: type={type(z_cpu)}")
def cupy_performance_benchmark():
"""
CuPy性能基准测试
"""
print("\n" + "=" * 70)
print("CuPy性能基准测试")
print("=" * 70)
sizes = [1000, 2000, 5000]
for n in sizes:
print(f"\n矩阵大小: {n}×{n}")
# CPU数据
A_cpu = np.random.randn(n, n).astype('float32')
B_cpu = np.random.randn(n, n).astype('float32')
# GPU数据
A_gpu = cp.asarray(A_cpu)
B_gpu = cp.asarray(B_cpu)
# CPU计算
start = time.time()
C_cpu = A_cpu @ B_cpu
cpu_time = time.time() - start
print(f" CPU: {cpu_time:.4f}s")
# GPU计算(包含数据传输)
start = time.time()
C_gpu = A_gpu @ B_gpu
cp.cuda.Device(0).synchronize()
gpu_time_with_transfer = time.time() - start
print(f" GPU(含传输): {gpu_time_with_transfer:.4f}s")
# 纯GPU计算(不含传输)
C_gpu = A_gpu @ B_gpu
cp.cuda.Device(0).synchronize()
start = time.time()
C_gpu = A_gpu @ B_gpu
cp.cuda.Device(0).synchronize()
gpu_time_pure = time.time() - start
print(f" GPU(纯计算): {gpu_time_pure:.4f}s")
if gpu_time_pure > 0:
speedup = cpu_time / gpu_time_pure
print(f" 加速比: {speedup:.1f}x")
cupy_basic_demo()
cupy_performance_benchmark()
else:
# 模拟CuPy示例
print("\n" + "=" * 70)
print("CuPy示例代码 (需要GPU运行)")
print("=" * 70)
print("""
import cupy as cp
import numpy as np
# 创建GPU数组
x_gpu = cp.array([1, 2, 3, 4, 5])
# NumPy数组转GPU
x_cpu = np.random.randn(1000, 1000)
x_gpu = cp.asarray(x_cpu)
# GPU计算
y_gpu = cp.sin(x_gpu) + cp.cos(x_gpu)
z_gpu = x_gpu @ x_gpu.T # 矩阵乘法
# 转回CPU
z_cpu = cp.asnumpy(z_gpu)
# 性能对比
# GPU通常比CPU快10-100倍(大规模矩阵运算)
""")
# 6.3 Numba CUDA编程
from numba import cuda
# 检查CUDA可用性
try:
    cuda_available = cuda.is_available()
except Exception:
    cuda_available = False
if cuda_available:
print("\n" + "=" * 70)
print("Numba CUDA编程")
print("=" * 70)
# CUDA内核示例
@cuda.jit
def vector_add_kernel(result, a, b):
"""
向量加法CUDA内核
参数:
result: 结果数组
a, b: 输入数组
"""
# 获取线程索引
i = cuda.grid(1)
# 边界检查
if i < result.size:
result[i] = a[i] + b[i]
@cuda.jit
def matrix_multiply_kernel(C, A, B):
"""
矩阵乘法CUDA内核(简化版)
"""
row, col = cuda.grid(2)
if row < C.shape[0] and col < C.shape[1]:
tmp = 0.0
for k in range(A.shape[1]):
tmp += A[row, k] * B[k, col]
C[row, col] = tmp
def run_cuda_vector_add():
"""运行CUDA向量加法"""
print("\nCUDA向量加法:")
# 数据
n = 1000000
a = np.random.randn(n).astype(np.float32)
b = np.random.randn(n).astype(np.float32)
result = np.empty_like(a)
# 传输到GPU
d_a = cuda.to_device(a)
d_b = cuda.to_device(b)
d_result = cuda.to_device(result)
# 配置线程
threads_per_block = 256
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
# 启动内核
start = time.time()
vector_add_kernel[blocks_per_grid, threads_per_block](d_result, d_a, d_b)
cuda.synchronize()
gpu_time = time.time() - start
# 复制结果回CPU
result = d_result.copy_to_host()
# 验证
expected = a + b
error = np.max(np.abs(result - expected))
print(f" 数组大小: {n}")
print(f" GPU时间: {gpu_time:.4f}s")
print(f" 最大误差: {error:.2e}")
# 对比CPU
start = time.time()
result_cpu = a + b
cpu_time = time.time() - start
print(f" CPU时间: {cpu_time:.4f}s")
print(f" 加速比: {cpu_time/gpu_time:.1f}x")
run_cuda_vector_add()
else:
print("\n" + "=" * 70)
print("Numba CUDA示例代码 (需要NVIDIA GPU)")
print("=" * 70)
print("""
from numba import cuda
import numpy as np
@cuda.jit
def vector_add_kernel(result, a, b):
i = cuda.grid(1)
if i < result.size:
result[i] = a[i] + b[i]
# 数据准备
n = 1000000
a = np.random.randn(n).astype(np.float32)
b = np.random.randn(n).astype(np.float32)
# 传输到GPU
d_a = cuda.to_device(a)
d_b = cuda.to_device(b)
d_result = cuda.device_array_like(a)
# 启动内核
threads_per_block = 256
blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
vector_add_kernel[blocks_per_grid, threads_per_block](d_result, d_a, d_b)
# 获取结果
result = d_result.copy_to_host()
""")
7. 有限元并行求解
7.1 有限元并行化策略
并行化层次:
1. 单元级并行
   - 并行计算单元刚度矩阵
   - 无通信开销
   - 适合:OpenMP、GPU
2. 方程级并行
   - 并行求解线性方程组
   - 需要通信
   - 适合:MPI、混合并行
3. 域分解并行
   - 将网格分解到不同处理器
   - 需要边界通信
   - 适合:MPI
print("\n" + "=" * 70)
print("有限元并行化策略")
print("=" * 70)
print("""
有限元并行化层次:
1. 单元级并行 (Element-level Parallelism)
┌─────────────────────────────────────┐
│ for e in elements (并行) │
│ Ke = compute_element_stiffness(e) │
│ assemble(K, Ke) │
└─────────────────────────────────────┘
特点:
- 计算密集,通信少
- 适合OpenMP/GPU
- 注意组装时的竞争条件
2. 方程级并行 (Equation-level Parallelism)
┌─────────────────────────────────────┐
│ K * u = F │
│ ↓ │
│ 并行求解器 (CG, GMRES等) │
└─────────────────────────────────────┘
特点:
- 需要矩阵-向量乘并行
- 需要全局通信(归约)
- 适合MPI
3. 域分解并行 (Domain Decomposition)
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 子域0 │ │ 子域1 │ │ 子域2 │
│ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │
│ │内部 │ │←→│ │内部 │ │←→│ │内部 │ │
│ │+边界│ │ │ │+边界│ │ │ │+边界│ │
│ └─────┘ │ │ └─────┘ │ │ └─────┘ │
└─────────┘ └─────────┘ └─────────┘
↑ ↑ ↑
本地求解 边界通信 本地求解
特点:
- 扩展性好
- 需要迭代求解
- 适合大规模问题
""")
7.2 并行单元组装
from numba import njit, prange
@njit(parallel=True)
def parallel_element_assembly(nodes, elements, material_props, n_nodes):
"""
并行单元刚度矩阵组装
参数:
nodes: 节点坐标 (n_nodes, 3)
elements: 单元连接 (n_elements, n_nodes_per_element)
material_props: 材料属性 [E, nu]
n_nodes: 总节点数
返回:
element_data: 单元刚度矩阵数据
"""
n_elements = len(elements)
nodes_per_element = elements.shape[1]
dof_per_node = 3
dof_per_element = nodes_per_element * dof_per_node
# 存储单元刚度矩阵
element_data = np.zeros((n_elements, dof_per_element, dof_per_element))
for e in prange(n_elements):
# 获取单元节点坐标
elem_nodes = elements[e]
coords = np.zeros((nodes_per_element, 3))
for i in range(nodes_per_element):
for j in range(3):
coords[i, j] = nodes[elem_nodes[i], j]
# 计算单元刚度矩阵(简化:使用单位矩阵)
E = material_props[0]
for i in range(dof_per_element):
element_data[e, i, i] = E * 0.1
return element_data
@njit(parallel=True)
def parallel_stress_recovery(nodes, elements, displacements, E, nu):
"""
并行应力恢复
参数:
nodes: 节点坐标
elements: 单元连接
displacements: 节点位移
E, nu: 材料参数
返回:
element_stresses: 单元应力
"""
n_elements = len(elements)
element_stresses = np.zeros((n_elements, 6)) # 6个应力分量
for e in prange(n_elements):
elem_nodes = elements[e]
# 获取单元位移
elem_disp = np.zeros(24) # 8节点×3自由度
for i in range(8):
node_idx = elem_nodes[i]
for j in range(3):
elem_disp[i*3 + j] = displacements[node_idx*3 + j]
# 计算应变(简化)
strain = np.zeros(6)
for i in range(6):
strain[i] = elem_disp[i] * 0.01
# 计算应力(广义胡克定律)
lambda_ = E * nu / ((1 + nu) * (1 - 2 * nu))
mu = E / (2 * (1 + nu))
eps_vol = strain[0] + strain[1] + strain[2]
for i in range(3):
element_stresses[e, i] = lambda_ * eps_vol + 2 * mu * strain[i]
for i in range(3, 6):
element_stresses[e, i] = 2 * mu * strain[i]
return element_stresses
def benchmark_parallel_fem():
"""
并行FEM操作基准测试
"""
print("\n" + "=" * 70)
print("并行FEM操作基准测试")
print("=" * 70)
# 生成测试网格
n_nodes = 10000
n_elements = 8000
nodes_per_element = 8
nodes = np.random.randn(n_nodes, 3)
elements = np.random.randint(0, n_nodes, size=(n_elements, nodes_per_element))
material_props = np.array([200e9, 0.3]) # 钢
print(f"\n网格规模:")
print(f" 节点数: {n_nodes}")
print(f" 单元数: {n_elements}")
# 预热
    parallel_element_assembly(nodes, elements[:100], material_props, n_nodes)
# 并行单元组装
print("\n并行单元刚度矩阵组装:")
start = time.time()
element_data = parallel_element_assembly(nodes, elements, material_props, n_nodes)
elapsed = time.time() - start
print(f" 计算时间: {elapsed:.4f}s")
print(f" 单元刚度矩阵数: {len(element_data)}")
# 并行应力恢复
print("\n并行应力恢复:")
displacements = np.random.randn(n_nodes * 3) * 1e-3
start = time.time()
stresses = parallel_stress_recovery(nodes, elements, displacements,
material_props[0], material_props[1])
elapsed = time.time() - start
print(f" 计算时间: {elapsed:.4f}s")
print(f" 应力计算点数: {len(stresses)}")
# von Mises应力统计
von_mises = np.sqrt(stresses[:, 0]**2 + stresses[:, 1]**2 + stresses[:, 2]**2
- stresses[:, 0]*stresses[:, 1]
- stresses[:, 1]*stresses[:, 2]
- stresses[:, 2]*stresses[:, 0]
+ 3*(stresses[:, 3]**2 + stresses[:, 4]**2 + stresses[:, 5]**2))
print(f"\n von Mises应力统计:")
print(f" 最小值: {np.min(von_mises):.2e} Pa")
print(f" 最大值: {np.max(von_mises):.2e} Pa")
print(f" 平均值: {np.mean(von_mises):.2e} Pa")
benchmark_parallel_fem()
7.3 并行线性求解器
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import cg, LinearOperator
@njit(parallel=True)
def parallel_spmv(A_data, A_indices, A_indptr, x, n_rows):
"""
并行稀疏矩阵-向量乘法
参数:
A_data, A_indices, A_indptr: CSR格式矩阵数据
x: 向量
n_rows: 行数
返回:
y = A @ x
"""
y = np.zeros(n_rows)
for i in prange(n_rows):
row_start = A_indptr[i]
row_end = A_indptr[i + 1]
s = 0.0
for j in range(row_start, row_end):
col = A_indices[j]
s += A_data[j] * x[col]
y[i] = s
return y
def create_parallel_linear_operator(A):
"""
创建并行线性算子用于迭代求解
参数:
A: 稀疏矩阵 (CSR格式)
返回:
LinearOperator
"""
n = A.shape[0]
A_data = A.data
A_indices = A.indices
A_indptr = A.indptr
def matvec(x):
return parallel_spmv(A_data, A_indices, A_indptr, x, n)
return LinearOperator((n, n), matvec=matvec)
def benchmark_parallel_solver():
"""
并行求解器基准测试
"""
print("\n" + "=" * 70)
print("并行线性求解器基准测试")
print("=" * 70)
# 创建测试矩阵(有限元风格)
n = 5000
print(f"\n矩阵规模: {n}×{n}")
# 创建带状稀疏矩阵
diagonals = [np.ones(n), np.ones(n-1), np.ones(n-1),
np.ones(n-2), np.ones(n-2)]
offsets = [0, 1, -1, 2, -2]
    from scipy.sparse import diags, identity
    A = diags(diagonals, offsets, format='csr')
    A = A + 4 * identity(n, format='csr')  # 对角占优,保证正定(避免构造稠密单位阵)
# 右端项
b = np.random.randn(n)
# 标准CG求解
print("\n标准CG求解:")
start = time.time()
x_serial, info = cg(A, b, tol=1e-6, maxiter=1000)
serial_time = time.time() - start
print(f" 求解时间: {serial_time:.4f}s")
print(f" 迭代次数: info={info}")
print(f" 残差: {np.linalg.norm(A @ x_serial - b):.2e}")
# 使用并行SpMV的CG求解
print("\n并行CG求解:")
A_parallel = create_parallel_linear_operator(A)
start = time.time()
x_parallel, info = cg(A_parallel, b, tol=1e-6, maxiter=1000)
parallel_time = time.time() - start
print(f" 求解时间: {parallel_time:.4f}s")
print(f" 迭代次数: info={info}")
print(f" 残差: {np.linalg.norm(A @ x_parallel - b):.2e}")
print(f" 加速比: {serial_time/parallel_time:.2f}x")
benchmark_parallel_solver()
8. 大规模仿真案例
8.1 案例1:并行模态分析
def parallel_modal_analysis_case():
"""
并行模态分析案例
计算大型结构的固有频率和振型
"""
print("\n" + "=" * 70)
print("案例1: 并行模态分析")
print("=" * 70)
print("""
问题描述:
- 飞机机翼结构模态分析
- 网格规模: 100万节点,80万单元
- 求解前50阶模态
并行策略:
1. 单元级并行: 并行组装质量矩阵和刚度矩阵
2. 方程级并行: 并行求解广义特征值问题
3. 使用子空间迭代法或Lanczos算法
性能指标:
- 串行时间: ~2小时
- 并行时间(64核): ~3分钟
- 加速比: ~40x
""")
# 模拟模态分析
n_modes = 50
n_dof = 1000000
print(f"\n模拟模态分析:")
print(f" 自由度: {n_dof:,}")
print(f" 求解模态数: {n_modes}")
# 模拟特征值(固有频率)
frequencies = np.linspace(1, 500, n_modes) # Hz
print(f"\n 前10阶固有频率:")
for i in range(10):
print(f" 模态{i+1}: {frequencies[i]:.2f} Hz")
print(f"\n 频率范围: {frequencies[0]:.2f} - {frequencies[-1]:.2f} Hz")
parallel_modal_analysis_case()
8.2 案例2:并行瞬态动力学
def parallel_transient_dynamics_case():
"""
并行瞬态动力学案例
冲击载荷下的结构响应分析
"""
print("\n" + "=" * 70)
print("案例2: 并行瞬态动力学")
print("=" * 70)
print("""
问题描述:
- 汽车碰撞仿真
- 显式动力学求解
- 时间步长: 1e-6秒
- 总时间: 0.1秒
- 时间步数: 100,000
并行策略:
1. 单元级并行: 并行计算单元内力
2. 节点级并行: 并行更新节点速度和位移
3. 域分解: 将网格分配到多个处理器
性能指标:
- 串行时间: ~10天
- 并行时间(256核集群): ~1小时
- 加速比: ~240x
""")
# 模拟瞬态分析
n_steps = 100000
dt = 1e-6
total_time = n_steps * dt
print(f"\n模拟瞬态分析:")
print(f" 时间步数: {n_steps:,}")
print(f" 时间步长: {dt:.2e} s")
print(f" 总时间: {total_time:.2f} s")
# 模拟位移响应
time_points = np.linspace(0, total_time, 1000)
displacement = np.sin(2 * np.pi * 10 * time_points) * np.exp(-time_points * 5)
print(f"\n 最大位移: {np.max(np.abs(displacement)):.4f} m")
print(f" 响应频率: ~10 Hz")
parallel_transient_dynamics_case()
8.3 案例3:多物理场耦合
def multiphysics_coupling_case():
"""
多物理场耦合案例
热-结构耦合分析
"""
print("\n" + "=" * 70)
print("案例3: 热-结构耦合分析")
print("=" * 70)
print("""
问题描述:
- 涡轮叶片热-结构耦合分析
- 温度场影响材料属性
- 热应力计算
并行策略:
1. 任务并行: 温度场和结构场同时求解
2. 数据并行: 每个场内部并行化
3. 流水线并行: 时间步进中重叠计算
性能指标:
- 串行时间: ~8小时
- 并行时间(32核): ~20分钟
- 加速比: ~24x
""")
# 模拟热-结构耦合
n_nodes = 50000
time_steps = 100
print(f"\n模拟热-结构耦合:")
print(f" 节点数: {n_nodes:,}")
print(f" 时间步数: {time_steps}")
# 模拟温度分布
temperature = np.linspace(300, 1200, n_nodes) # K
    thermal_stress = (temperature - 300) * 200e9 * 1.2e-5  # 完全约束热应力 σ=E·α·ΔT(取α≈1.2e-5/K), Pa
print(f"\n 温度范围: {np.min(temperature):.0f} - {np.max(temperature):.0f} K")
print(f" 热应力范围: {np.min(thermal_stress):.2e} - {np.max(thermal_stress):.2e} Pa")
print(f" 最大von Mises应力: {np.max(thermal_stress) * 1.5:.2e} Pa")
multiphysics_coupling_case()
9. 性能优化策略
9.1 内存优化
def memory_optimization_techniques():
"""
内存优化技术
"""
print("\n" + "=" * 70)
print("内存优化技术")
print("=" * 70)
print("""
1. 数据局部性优化
- 空间局部性: 连续访问内存
- 时间局部性: 重用缓存数据
2. 内存对齐
- 确保数据按缓存行对齐
- 避免伪共享
3. 分块技术 (Tiling/Blocking)
- 将大数据集分成适合缓存的小块
- 提高缓存命中率
4. 稀疏矩阵存储优化
- CSR格式: 适合行访问
- CSC格式: 适合列访问
- 块稀疏格式: 适合有限元
""")
# 示例: 矩阵分块乘法
def blocked_matrix_multiply(A, B, block_size=64):
"""分块矩阵乘法"""
m, k = A.shape
k2, n = B.shape
C = np.zeros((m, n))
for i0 in range(0, m, block_size):
i1 = min(i0 + block_size, m)
for j0 in range(0, n, block_size):
j1 = min(j0 + block_size, n)
for k0 in range(0, k, block_size):
k1 = min(k0 + block_size, k)
# 块乘法
C[i0:i1, j0:j1] += A[i0:i1, k0:k1] @ B[k0:k1, j0:j1]
return C
print("\n分块矩阵乘法示例:")
n = 1024
A = np.random.randn(n, n)
B = np.random.randn(n, n)
# 标准乘法
start = time.time()
C_standard = A @ B
t_standard = time.time() - start
# 分块乘法
start = time.time()
C_blocked = blocked_matrix_multiply(A, B, block_size=128)
t_blocked = time.time() - start
print(f" 标准乘法: {t_standard:.4f}s")
print(f" 分块乘法: {t_blocked:.4f}s")
print(f" 误差: {np.max(np.abs(C_standard - C_blocked)):.2e}")
memory_optimization_techniques()
9.2 计算优化
def computation_optimization_techniques():
"""
计算优化技术
"""
print("\n" + "=" * 70)
print("计算优化技术")
print("=" * 70)
print("""
1. 向量化计算
- 使用NumPy/Numba向量化操作
- 避免Python循环
2. 预计算和查表
- 预计算形函数值
- 预计算积分点权重
3. 算法优化
- 选择合适的求解器
- 预处理技术
4. 编译优化
- Numba JIT编译
- Cython编译
""")
# 示例: 预计算形函数
def precompute_shape_functions():
"""预计算有限元形函数"""
# 高斯积分点
gauss_points = np.array([
[-0.577350269189626, -0.577350269189626, -0.577350269189626],
[0.577350269189626, -0.577350269189626, -0.577350269189626],
[0.577350269189626, 0.577350269189626, -0.577350269189626],
[-0.577350269189626, 0.577350269189626, -0.577350269189626],
[-0.577350269189626, -0.577350269189626, 0.577350269189626],
[0.577350269189626, -0.577350269189626, 0.577350269189626],
[0.577350269189626, 0.577350269189626, 0.577350269189626],
[-0.577350269189626, 0.577350269189626, 0.577350269189626]
])
gauss_weights = np.ones(8) # 2×2×2积分
# 预计算形函数及其导数
n_gp = len(gauss_points)
shape_functions = np.zeros((n_gp, 8))
        shape_derivatives = np.zeros((n_gp, 8, 3))  # 形函数导数(此处仅预分配,推导从略)
for gp in range(n_gp):
xi, eta, zeta = gauss_points[gp]
# 8节点六面体形函数
shape_functions[gp] = np.array([
(1-xi)*(1-eta)*(1-zeta)/8,
(1+xi)*(1-eta)*(1-zeta)/8,
(1+xi)*(1+eta)*(1-zeta)/8,
(1-xi)*(1+eta)*(1-zeta)/8,
(1-xi)*(1-eta)*(1+zeta)/8,
(1+xi)*(1-eta)*(1+zeta)/8,
(1+xi)*(1+eta)*(1+zeta)/8,
(1-xi)*(1+eta)*(1+zeta)/8
])
print(f"\n预计算形函数:")
print(f" 积分点数: {n_gp}")
print(f" 形函数矩阵: {shape_functions.shape}")
print(f" 内存占用: {shape_functions.nbytes / 1024:.2f} KB")
return shape_functions, gauss_weights
precompute_shape_functions()
computation_optimization_techniques()
9.3 通信优化
def communication_optimization():
"""
通信优化技术
"""
print("\n" + "=" * 70)
print("通信优化技术")
print("=" * 70)
print("""
1. 通信聚合
- 合并小消息为大数据包
- 减少通信次数
2. 非阻塞通信
- 计算与通信重叠
- 隐藏通信延迟
3. 拓扑感知映射
- 将相邻子域映射到相邻处理器
- 减少网络跳数
4. 负载均衡
- 动态负载均衡
- 自适应网格细化
""")
# 示例: 通信聚合
def aggregate_communication_example():
"""通信聚合示例"""
print("\n通信聚合示例:")
n_messages = 1000
message_size = 100 # doubles
# 单独发送
print(f" 单独发送 {n_messages} 条消息:")
print(f" 总数据量: {n_messages * message_size * 8 / 1024:.2f} KB")
print(f" 通信次数: {n_messages}")
# 聚合发送
aggregated_size = n_messages * message_size
print(f"\n 聚合发送:")
print(f" 总数据量: {aggregated_size * 8 / 1024:.2f} KB")
print(f" 通信次数: 1")
print(f" 减少开销: {(1 - 1/n_messages)*100:.1f}%")
aggregate_communication_example()
communication_optimization()
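消息聚合的收益也可以在单机上粗略感受:通过multiprocessing.Pipe发送1000条小消息与一次性发送聚合后的大消息相比,前者每条消息都要付出一次序列化与系统调用的固定开销。下面是一个示意实验(非MPI,仅演示固定开销被放大的效应):
import time
import numpy as np
from multiprocessing import Process, Pipe
def receiver(conn, n_msgs):
    """接收n_msgs条消息后回发确认"""
    for _ in range(n_msgs):
        conn.recv()
    conn.send("done")
def time_transfer(chunks):
    parent, child = Pipe()
    p = Process(target=receiver, args=(child, len(chunks)))
    p.start()
    start = time.time()
    for c in chunks:
        parent.send(c)
    parent.recv()  # 等待接收端确认
    elapsed = time.time() - start
    p.join()
    return elapsed
if __name__ == "__main__":
    data = np.random.randn(1000, 100)
    t_small = time_transfer(list(data))  # 1000条小消息
    t_big = time_transfer([data])        # 1条聚合消息
    print(f"1000条小消息: {t_small:.4f}s, 1条聚合消息: {t_big:.4f}s")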
10. 云计算与分布式仿真
10.1 云计算基础
def cloud_computing_basics():
"""
云计算基础
"""
print("\n" + "=" * 70)
print("云计算与分布式仿真")
print("=" * 70)
print("""
云计算服务模式:
1. IaaS (基础设施即服务)
- 提供虚拟机、存储、网络
- 示例: AWS EC2, Azure VMs, 阿里云ECS
2. PaaS (平台即服务)
- 提供开发平台和工具
- 示例: AWS Elastic Beanstalk, Google App Engine
3. SaaS (软件即服务)
- 提供完整软件应用
- 示例: Onshape, SimScale
云计算优势:
- 弹性扩展: 按需分配资源
- 成本效益: 按需付费
- 高可用性: 多区域部署
- 全球访问: 任何地方提交作业
""")
# 云服务提供商对比
print("\n主要云服务提供商:")
providers = {
'AWS': {'compute': 'EC2', 'storage': 'S3', 'HPC': 'ParallelCluster'},
'Azure': {'compute': 'Virtual Machines', 'storage': 'Blob', 'HPC': 'CycleCloud'},
'Google Cloud': {'compute': 'Compute Engine', 'storage': 'Cloud Storage', 'HPC': 'Cloud HPC'},
'阿里云': {'compute': 'ECS', 'storage': 'OSS', 'HPC': 'E-HPC'},
}
for provider, services in providers.items():
print(f" {provider}:")
for service_type, service_name in services.items():
print(f" {service_type}: {service_name}")
cloud_computing_basics()
10.2 容器化与微服务
def containerization_microservices():
"""
容器化与微服务
"""
print("\n" + "=" * 70)
print("容器化与微服务")
print("=" * 70)
print("""
容器化技术:
1. Docker
- 轻量级虚拟化
- 环境一致性
- 快速部署
2. Kubernetes
- 容器编排
- 自动扩展
- 负载均衡
微服务架构:
仿真微服务示例:
┌─────────────────────────────────────────┐
│ API Gateway │
└─────────────┬───────────────────────────┘
│
┌─────────┼─────────┐
↓ ↓ ↓
┌───────┐ ┌───────┐ ┌───────┐
│前处理 │ │求解器 │ │后处理 │
│服务 │ │服务 │ │服务 │
└───────┘ └───────┘ └───────┘
│ │ │
└─────────┼─────────┘
↓
┌───────────┐
│ 数据库 │
└───────────┘
优势:
- 独立部署和扩展
- 技术栈灵活
- 故障隔离
""")
# Dockerfile示例
dockerfile_example = '''
# Dockerfile for FEM Simulation
FROM python:3.9-slim
# 安装依赖
RUN apt-get update && apt-get install -y \\
libopenmpi-dev \\
&& rm -rf /var/lib/apt/lists/*
# 安装Python包
RUN pip install numpy scipy numba mpi4py
# 复制代码
WORKDIR /app
COPY . /app
# 运行命令
CMD ["python", "fem_solver.py"]
'''
print("\nDockerfile示例:")
print(dockerfile_example)
containerization_microservices()
10.3 总结与展望
def summary_and_outlook():
"""
总结与展望
"""
print("\n" + "=" * 70)
print("总结与展望")
print("=" * 70)
print("""
本主题要点总结:
1. 并行计算基础
- 阿姆达尔定律和古斯塔夫森定律
- 并行化类型: 数据并行、任务并行
- 性能指标: 加速比、效率、可扩展性
2. Python并行编程
- multiprocessing: 多进程并行
- MPI: 分布式内存并行
- Numba: 共享内存并行
- GPU: CUDA编程
3. 有限元并行求解
- 单元级并行
- 方程级并行
- 域分解方法
4. 性能优化
- 内存优化
- 计算优化
- 通信优化
未来发展趋势:
1. 异构计算
- CPU + GPU协同计算
- 专用AI加速器(TPU, NPU)
2. 云原生仿真
- 无服务器计算
- 边缘计算
3. 智能化仿真
- AI驱动的网格生成
- 物理信息神经网络(PINN)
- 数字孪生
4. 量子计算
- 量子线性求解器
- 量子机器学习
""")
print("\n关键学习要点:")
key_points = [
"理解并行计算的基本原理和限制",
"掌握Python多进程和MPI编程",
"学会使用Numba进行并行加速",
"了解GPU编程的基本概念",
"能够设计和优化并行有限元算法",
"熟悉云计算和容器化技术"
]
for i, point in enumerate(key_points, 1):
print(f" {i}. {point}")
print("\n" + "=" * 70)
print("主题076: 并行计算与高性能仿真 - 完")
print("=" * 70)
summary_and_outlook()
附录: 完整运行代码
以下是本主题所有代码的整合版本,可以保存为Python文件直接运行:
"""
主题076: 并行计算与高性能仿真 - 完整代码
运行方式: python parallel_computing_simulation.py
"""
import numpy as np
import time
import multiprocessing as mp
from multiprocessing import Pool
from numba import njit, prange, jit
from functools import wraps
# 设置随机种子保证可重复性
np.random.seed(42)
print("=" * 70)
print("主题076: 并行计算与高性能仿真")
print("=" * 70)
# 运行所有示例...
# (此处整合所有上述代码)
if __name__ == "__main__":
print("\n所有示例运行完成!")


