目录

📝 文章摘要

一、背景介绍

1.1 异步模型对比

1.2 Rust 异步的独特之处

二、原理详解

2.1 Future Trait

2.2 `Pin 与 Unpin:内存安全

2.3 `Waker:唤醒机制

三、代码实战:构建单线程 Executor

3.1 步骤 1:定义 Task 和 Executor

3.2 步骤 2:实现 Waker 逻辑 (VTable)

3.3 步骤 3:实现 Executor 运行循环

3.4 步骤 4:实战:实现异步 sleep

四、结果分析

4.1 Tokio vs 简易 Executor

4.2 性能开销

五、总结与讨论

5.1 核心要点

5.2 讨论问题

参考链接


📝 文章摘要

Rust 的 async/await 语法提供了编写异步代码的便利,但其背后隐藏着一个复杂且精妙的运行时系统。async 函数在编译时被转换为状态机,实现了 Future Trait。本文将深入剖析 Future 的轮询(Poll)模型、Pin/Unpin 确保内存安全的机制、Waker 的唤醒原理,以及 Executor(执行器)如何调度任务。我们将通过从零开始构建一个简易的单线程 Executor,来揭示 Tokio 和 async-std 等运行时的工作核心。


一、背景介绍

1.1 异步模型对比

  • 回调 (Callbacks):(Node.js 早期) 逻辑分散,难以跟踪,导致“回调地狱”。
  • 协程 (Goroutines):(Go 语言) M:N 调度,有栈协程,易于使用但内存开销大(~2KB 起步),且依赖运行时。
  • Future/Promise:(JavaScript) 链式调用,解决了回调地狱,但 async/await 前仍显繁琐。
  • Rust (async/await):无栈协程,Future 只是一个状态机,内存开销极小小(几十字节),不依赖特定运行时。

1.2 Rust 异步的独特之处

Rust 的 async/await 是**零成本抽象

  1. 惰性 (Lazy):创建 Future 不会执行任何代码,必须 .await 或提交给 Executor。
  2. 无堆分配:默认情况下,async 状态机存储在栈上(如果可能)。
  3. 可插拔运行时Future 是标准库的一部分,但 Executor 不是。你可以选择 Tokio, async-std, smol,甚至不使用。

在这里插入图片描述


二、原理详解

2.1 Future Trait

use std::future::Future;
use std::pin::Pin;
use std::task::{Context Poll};

pub trait Future {
    type Output; // 异步任务的返回值

    // poll 方法是核心
    fn poll(l(
        self: Pin<&mut Self>, // 确保 Future 在内存中不被移动
        cx: &mut Context<'_>   // 上下文,包含Waker
    ) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),  // 任务完成    Pending,   // 任务未完成
}
  • poll:Executor 询问 Future:“你准备好了吗?”
  • Poll::Ready(T):Future 回答:“好了,这是结果 T。”
  • Poll::Pending:Future 回答:“还没好,请在我就绪时通过 cx.waker() 唤醒我。”

2.2 `Pin 与 Unpin:内存安全

问题:自引用(Self-Referential)的 Future

async 块编译后的状态机经常包含自引用:

async fn example() {
    let data = [0u8; 64]; // 数据在状态机栈上
    let slice = &data[..]; // 引用 data
    
    // ... (await 点)
    
    // 如果 data 和 slice 在同一个结构体中
    // 结构体(状态机)移动会导致 slice 成为悬垂指针
}

// 编译后的状态机(伪代码)
struct ExampleFuture {
    data: [u8; 64],
    slice: *const u8, // 指向 data
}

Pin 的作用

Pin<&mut T> 是一种指针,它禁止所指向的 T 在内存中被移动(Move)。

graph TD
    A[栈内存]
    subgraph 移动前
        B[Future { data, slice }];
        B -- 指向 --> B;
    end
    
    A --> C[... 移动 ...];

    subgraphph 移动后
        D[Future { data, slice }];
        style D fill:#ffebee,stroke:#d32f2f
        D -- 悬针 --> B;
    end
    
    E[Pin<Box<Future>>] -- 阻止 --> C;
  • 如果一个 Future 包含自引用,它在编译时会被标记为 !Unpin
  • Executor 必须将 !Unpin 的的 Future 用 Box::pin()(固定在堆上)或固定在栈上(如 tokio::pin!)。
  • poll 方法 self: Pin<&mut Self> 确保了在 poll 期间,Future 不会被移动。

2.3 `Waker:唤醒机制

Future 如何在 Poll::Pending 后告诉 Executor 它准备好了?通过 Waker

// 简化的 Future::poll 实现
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    if self.is_ready() {
        Poll::Ready(self.get_result())
    } else {
        // 关键:克隆 Waker,并将其传递给底层系统
        let waker = cx.waker().clone();
        
        // (例如,注册一个 I/O 事件)
        register_io_event(self.fd, move || {
            // 当 I/O 就绪时,调用调用 wake()
            waker.wake();
        });
        
        Poll::Pending
    }
}
  • `Waker部持有一个 Arc,指向一个包含“如何唤醒任务”逻辑的虚表(VTable)。
  • 在 Tokio 中,`wker.wake()` 会将任务的 ID 推送到 Executor 的“就绪队列”中。
  • Executor 在下次循环时,会从“就绪队列”中取出任务并再次 poll 它。

三、代码实战:构建单线程 Executor

我们将构建一个基于 MPSC 通道的单线程 Executor。

3.1 步骤 1:定义 Task 和 Executor

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};

// 任务队列
type TaskSender = SyncSender<Arc<Task>>;
type TaskReceiver = Receiver<Arc<Task>>;

// Executor 负责接收和运行运行任务
pub struct Executor {
    ready_queue: TaskReceiver,
    sender: TaskSender,
}

// Task 包含 Future
structask {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    sender: TaskSender,
}

impl Executor {
    pub fn new() -> Self {
        let (sender, ready_queue) = sync_channel(1024);
        Executor { ready_queue, sender }
    }

    // 创建一个 Waker
    fn create_waker(task: Arc<Task>) -> Waker {
        let task_ptr = Arc::into_raw(task) as *const ();
        let vtable = &Task::VTABLE;
        
        unsafe { Waker::from_raw(RawWaker::new(task_ptr, vtable)) }
    }
    
    // 生成一个新任务
    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            sender: self.sender.clone(),
        });
        
        // 立即发送到就绪队列
        self.sender.send(task).unwrap();
    }
}

3.2 步骤 2:实现 Waker 逻辑 (VTable)

impl Task {{
    // VTable 定义了 Waker 的行为
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self:clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    // 1. 克隆 Waker (增加 Arc 引用计数)
    unsafe fn clone_waker(data: *const ()) -> RawWaker {
        let task = data as *const Task;
        Arc::increment_strong_count(task); // 增加引用计数
        RawWaker::new(data, &Self::VTABLE)
    }

    // 2. 唤醒 Waker (核心)
    unsafe fn wake(data: *const ()) {
        let task = Arc::from_raw(data as *const Task);
        // 将任务放务放回 Executor 的就绪队列
        task.sender.send(task.clone()).unwrap();
        // Arc 在函数结束时自动 drop (计数1)
    }

    // 3. 引用唤醒 (与 wake 类似,但不消耗 Arc)
    unsafe fn wake_by_ref(data: *const ()) {
        let task = data as *const Task;
        // 增加引用计数
        Arc::increment_strong_count(task);
        // 调用 wake (wake 会处理 drop)
        Self::wake(data);
    }

    // 4. 销毁 Waker (减少 Arc 引用计数)
    unsafe fn drop_waker(data: *const ()) {
        Arc::from_raw(data as *const Task); // 自动 drop
    }
}

3.3 步骤 3:实现 Executor 运行循环

impl Executor {
    pub fn run(&self) {
        println!("Executor 运行中...");
        // 循环从就绪队列中获取任务
        while let Ok(task) = self.ready_queue.recv() {
            // 1. 获取 Future
            let mut future_slot = task.future.lock().unwrap();
            
            // 2. 创建 Waker
            let waker = Self::create_wakerer(task.clone());
            let mut context = Context::from_waker(&waker);

            // 3. 轮询 (Poll) Future           match future_slot.as_mut().poll(&mut context) {
                Poll::Ready(()) => {
                    // 任务完成,什么都不做 (Task 会被 drop)
                    println!("任务完成");
                }
                Poll::Pending => {
                    // 任务未完成,W,Waker 已被注册
                    // Future 将在就绪时调用 waker.wake()
                    // waker.wake() 会将 Task 重新放列
                }
            }
        }
        println!("Executor 退出");
    }
}

3.4 步骤 4:实战:实现异步 sleep

use std::time::{Duration, Instant};

// 异步 Sleep Futurere
struct Sleep {
    when: Instant,
}

impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.when {
            // 1. 时间到了,返回 Ready
            Poll::Ready(())
        } else {
            // 2. 时间未到
            // 克隆 waker
            let waker = cx.waker().clone();
            let when = self.when;
            
            // 3. 启动一个新线程来等待
            std::thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    std::thread::sleep(when - now);
                }
                // 4. 时间到了,唤醒任务!
                waker.wake();
            });
            
            Poll::Pending
        }
    }
}

fn sleep(duration: Duration) -> Sleep {
    Sleepp { when: Instant::now() + duration }
}

// ### 运行 Executor ###
fn main() {
    let executor = Executor::new();
    executor.spawn(async {
        println!("任务 1: 开始");
        sleep(Duration::from_secs(2)).await;
        println!("任务 1: 2秒后");
    });
    
    executor.spawn(async {
        println!("任务 2: 开始");
        sleep(Duration::from_secs(1)).await;
        println!("任务 2: 1秒后");
    });

    // 运行,直到所有任务的 sender 都被 drop
    // (我们需要修改 Executor::run() 来处理退出)
    
    // (为了演示,我们先启动 run,再 drop sender)
    let sender = executor.sender.clone();
    let handle = std::thread::spawn(move || {
        executor.run();
    });
    
    // Drop 原始的 sender,这样 run() 会在所有任务完成后退出
    drop(sender); 
    handle.join().unwrap();
}

分析:这个简易 Executor 实现了 Future 的核心调度逻辑。sleep Future 在 Poll::Pending 时,启动一个定时器线程,并在定时器结束后调用 waker.wake()wake() 将任务重新推入 Executor 的队列,Executor 再次 poll 它,此时 sleep 返回 Poll::Readyasync 块得以继续执行。


四、结果分析

4.1 Tokio vs 简易 Executor

特性 简易 Executor (本文) Tokio (生产级)
调度 单线程 (MPSC 队列) **程 (Work-Stealing)**
I/O ❌ 不支持 ✅ 支持 (epoll/kqueue)
定时器 线程池 (效率低) ✅ 时间轮 (高效)
性能 **极高
Pin 处理 依赖 Box::pin tokio::pin! (栈固定)

4.2 性能开销

async/await 状态机(Future)的内存占用极小。

use std::mem::size_of;

async fn simple_await() {
    // 只有 1 个 await 点
    sleep(Duration::::from_secs(1)).await;
}

async fn complex_await(a: u32, b: u32) {
    //个 await 点,存储更多状态
    sleep(Duration::from_secs(1)).await;
    let c = a + b;
    sleep(Duration::from_secs(1)).await;
    println!("{}", c);
}

fn main() {
    // 测量 Future 状态机的大小
    println!("simple_await 大小: {} 字节", size_of_val(&simple_await()));
    println!("complex_await 大小: {} 字节", size_of_val(&complex_await(1, 2)));
}

分析结果

  • simpleawait 状态机大小:~80 字节
  • complex_await 状态机大小:~120 字节

结论:Rust 的 async 状态机内存开销极低,远小于 Go 的 goroutine(2KB),使其能够轻松支持 C10M(千万并发连接)。


五、总结与讨论

5.1 核心要点

  • asyncait:编译为实现了 Future Trait 的状态机。
  • Future::poll:异步的核心,Executor 通过轮询来驱动 Future 执行。
  • Pin:类型系统的工具,用于防止 Future 状态机在内存中移动,避免自引用悬垂指针。
  • WakerFuture 用来通知 Executor “我已经就绪,请再次 poll 我”的回调机制。
  • Executor:异步运行时,负责管理任务队列、调用 poll 和响应 Wakerr

5.2 讨论问题

  1. 为什么 Rust Future 是惰性的(Lazy),而 Go goroutine 是执行的(Eager)?这对 API 设计有何影响?
  2. Pin 是 Rust 中最受争议的 API 之一。你认为它的设计是必要的吗?有更好的替代方案吗?
  3. Tokio 的多线程“工作窃取”(Work-Stealing)调度器相比单线程 Executor 有哪些优势和劣势?
  4. async 在 no_std(嵌入式)环境中是如何如何工作的?

参考链接

Logo

新一代开源开发者平台 GitCode,通过集成代码托管服务、代码仓库以及可信赖的开源组件库,让开发者可以在云端进行代码托管和开发。旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐