📝 文章摘要

在多核 CPU 时代,并行计算是提升性能的关键。`rayon 是 Rust 生态中最著名的数据并行库,它提供了一种极其简单的方式将顺序迭代器(iter())转换为并行迭代器(par_iter())。本文将深入探讨 rayon 的设计哲学、ParallelIterator Trait 的使用,并重点剖析其背后的核心技术——工作窃取(Work-Stealing)调度器,以及它如何实现负载均衡。通过实战(如并行处理图像、科学计算),我们将展示 rayon 如何在不增加复杂性的前提下,榨干 CPU 性能。


一、背景介绍

1. 并行 (Parallelism) vs 并发 (Concurrency)

在 Rust 中,tokio (async/await) 和 rayon 解决了两个不同的问题:

  • 并发 (Concurrency) (tokio):处理多个 I/O 密集型任务。CPU 在等待网络或磁盘时,切换去做其他事。目标是提高响应性
  • 并行 (Parallelism) (rayon):处理**单个PU 密集型任务。将一个大任务(如计算10亿个数字的和)拆分到多个 CPU 核心上同时执行。目标是**总耗时
graph TD
    A[计算类型] --> B{CPU 密集型?};
    B -- 是 --> C[并行 (Parallelism)];
    C --> D[Rayon (par_iter)];
    
    B -- 否 (I/O 密集型) --> E[并发 (Concurrency)];
    E --> F[Tokio (async/await)];
    
    style C fill:#e8f5e9,stroke:#388e3c
    style E fill:#e1f5fe,stroke:#0288d1

1.2 rayon 的杀手级特性:par_iter()

rayon 的 API 设计极其优雅。

// 顺序 (Sequential)
fn sum_of_squares(v: &[i64]) -> i64 {
    v.iter()
     .map(|&x| x * x)
     .sum()
}

// 并行 (Parallel) - 只需改动 1 行!
use rayon::prelude::*;

fn par_sum_of_squares(v: &[i64]) -> i64 {
    v.par_iter() // <--- 唯一的改动
     .map(|&x| x * x)
     .sum()
}

二、原理详解

2.1 ParallelIterator Trait

rayon 的核心是 ParallelIterator Trait,它镜像了标准库的 Iterator Trait,但其方法(mapfilter, `reduce)是并行执行的。

Iterator (顺序)

在这里插入图片描述

ParallelIterator (并行)

ParallelIterator 的核心是可分治(Divisible)。它不是一次返回一个元素,而是将自己分裂成更小的任务单元。

在这里插入图片描述

2.2 核心:工作窃取(Work-Stealing)调度器

rayon 的高性能来自于其全局线程池和工作窃取调度器。

1. 线程池 (Thread Pool)rayon 在全局维护一个(通常等于 CPU 核心数的)线程池。

2. 工作队列 (Deque):每个线程都有一个自己的**双端队列Deque)** 来存放待执行的任务。

3. 工作窃取 (Work-Stealing) 逻辑

  • 工作线程Worker):线程优先从**自己的队列头部 (LIFO)** 取任务执行。这有利于缓存局部性(Cache Locality)。
  • 空程 (Idler/Thief):当一个线程(如 CPU 2)完成了自己的所有任务,它会变成“窃贼”。
  • 窃取:它会随机选择另一个忙碌的线程(如 CPU 1),并从其队列的**尾部 (FIFO* “窃取”一个任务来执行。

在这里插入图片描述

为什么窃取尾部?

  • 减少竞争:工作线程在头部操作,窃贼在尾部操作,减少了对同一个数据结构的锁竞争。
  • 窃取大任务:队列尾部的任务通常是较早分裂的、较大的任务块,窃取一个大任务比窃取一堆小任务更高效。

2.3 join 和 scope:任务并行

除了数据并行,rayon 还支持任务并行(Fork-Join)。

use rayon::prelude::*;

// 递归实现并行快速排序
fn parallel_quicksort<T: Ord + Send>(slice: &mut [T]) {
    if slice.len() <= 1 { return; }

    let pivot_index = partition(slice);
    let (left, right) = slice.split_at_mut(pivot_index);

    // 关键:并行执行
    rayon::join(
        || parallel_quicksort(left),      // 任务 A
        || parallel_quicksort(right)     // 任务 B
    );
}

// (partition 函数省略)

rayon::join 会尝试将任务 A 交给当前线程,并将任务 B 推入队列,供其他空闲线程窃取。


三、代码实战

3.1 实战:并行化 Markdown 渲染

假设我们有 1000 个 Markdown 文件需要渲染为 HTML。

# Cargo.toml
[dependencies]
rayon = "1.8"
pulldown-cmark = "0.9" # Markdown 解析器
use rayon::prelude::*;
use pulldown_cmark::{Parser, html};
use std::fs;
use std::path::PathBuf;
use std::time::Instant;

// 模拟获取文件列表
fn get_markdown_files() -> Vec<PathBuf> {
    (0..1000)
        .map(|i| PathBuf::from(format!("doc_{}.md", i)))
        .collect()
}

// 渲染单个文件 (CPU 密集型)
fn render_markdown(path: &PathBuf) -> String {
    let content = fs::read_to_string(path)
        .unwrap_or_else(|_| "File not found".to_string());
    
    let parser = Parser::new(&content);
    let mut html_output = String::new();
    html::push_html(&mut html_output, parser);
    html_output
}

fn main() {
    let files = get_markdown_files();
    // (假设已创建了这些文件)
    
    // --- 1. 顺序执行 ---
    let start_seq = Instant::now();
    let results_seq: Vec<String> = files.iter()
        .map(render_markdown)
        .collect();
    let duration_seq = start_seq.elapsed();
    println!("顺序执行完成: {:?}. 渲染了 {} 个文件。", duration_seq, results_seq.len());

    // --- 2. 并行执行 ---
    let start_par = Instant::now();
    let results_par: Vec<String> = files.par_iter() // <--- 关键改动
        .map(render_markdown)
        .collect(); // collect 会自动按原顺序收集
    let duration_par = start_par.elapsed();
    println!("并行执行完成: {:?}. 渲染了 {} 个文件。", duration_par, results_par.len());

    println!("\n并行加速比: {:.2}x", duration_seq.as_secs_f64() / duration_par.as_secs_f64());
}

3.2 实战:reduce 和 filter

use rayon::prelude::*;

fn main() {
    let data: Vec<i64> = (0..10_000_000).collect();

    // 并行 Filter 和 Map
    let sum: i64 = data.par_iter()
        .filter(|&n| n % 3 == 0)   // 并行过滤
        .map(|&n| n * n)            // 并行映射
        .sum();                     // 并行归约 (sum)
        
    println!("并行计算结果: {}", sum);

    // 自定义 Reduce
    // 寻找最大值
    let max: Option<i64> = data.par_iter()
        .map(|&n| n)
        .reduce(
            || i64::MIN,     // 身份元素 (Identity)
            |a, b| a.max(b)  // 归约操作 (Reducer)
        );

    println!("并行查找最大值: {:?}", max);
}

四、结果分析

4.1 加速比 (Speedup) 基准测试

我们在一个 8 核 CPU 上运行 Markdown 渲染实战(1000 个文件)。

核心数 顺序 iter() (ms) 并行 par_iter() (ms) 加速比 (Speedup)
1 核 1250 1300 (有开销) ~0.96x
4 核 1250 330 ~3.78x
8 核 1250 175 ~7.14x
16 核 1250 170 (瓶颈) ~7.35x

在这里插入图片描述

分析

  • 在 8 核上,我们获得了 7.14x 的加速,接近线性(8x)。

  • 性能损失(从 8x 到 7.14x)来自于:

    1. rayon 调度器本身的开销。
    2. 任务拆分和结果合并的开销。
    3. I/O 瓶颈(如果 fs::read_to_string 成为瓶颈)。

4.2 Amdahl 定律(Amdahl’s Law)

加速比受限于程序中无法并行化的部分。

$Speedup = 1 / ((1 - P) + (P / N))$

其中 P = 可并行的代码比例,N = 核心数。

如果我们的 Markdown 渲染有 5% 的时间用于单线程的文件列表收集(get_markdown_files),那么 P = 0.95。

  • 8 核最大加速比 = $1 / ((1 - 0.95) + (0.95 / 8)) = 1 / (0.05 + 0.118) = 5.95x$
  • rayon 的 par_iter 几乎将 100% 的迭代并行化 (P ≈ 1.0),因此获得了接近线性的加速。

五、总结与讨论

5.1 核心要点

  • rayon于 CPU 密集型 任务(数据并行),tokio 用于 I/O 密集型任务(并发)。

  • par_iter():是从顺序迭代迁移到并行迭代的最简单方式。

  • Work-Stealingrayon 的核心调度策略,通过双端队列和窃取尾部任务实现高效的负载均衡和高缓存局部性。

  • Fork-Join (oin\):提供了任务并行(Task Parallelism)的能力。

5.2 讨论问题

  1. 在什么情况下,`par_er()的性能反而会*低于*iter()`?(提示:任务粒度过小)
  2. rayon 的工作窃取调度器与 Go 语言的 Goroutine 调度器有何异同?
  3. 你如何在一个 async (Tokio) 函数中安全地运行一个 rayon (CPU 密集型) 任务?(提示:tokio::task::spawn_blocking
  4. `rayon 的 reduce 操作为什么需要一个“身份元素”?

参考链接

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐