Rust 异步编程:从 Future Trait 到手写一个简易 Runtime

cover

一、异步编程的"黑盒"困境:用 Tokio 但不懂为什么

Rust 的 async/await 语法看起来和 JavaScript、Python 一样简单——加个 async,写个 .await,但底层机制完全不同。JavaScript 是单线程事件循环,Python 有 GIL,而 Rust 的 Future 是惰性的:声明 async 函数不会执行任何代码,必须有人调用 .await 推进状态机。这个"有人"就是 Runtime。Tokio 是最常用的 Runtime,但如果不懂 Future 的底层机制,遇到异步代码不执行、任务卡死、性能抖动等问题就只能靠猜。理解 Future Trait 和 Runtime 的工作原理,是从"会用 Tokio"到"能调异步问题"的关键一步。

graph TB
    A[async fn] --> B[编译器生成 Future 状态机]
    B --> C[.await 挂起当前任务]
    C --> D[Runtime 调度其他任务]
    D --> E[IO 就绪后唤醒 Waker]
    E --> F[Runtime 重新 Poll 该 Future]
    F --> G{Poll 结果}
    G -->|Pending| C
    G -->|Ready| H[返回结果]

    style B fill:#fff3e0
    style E fill:#e8f5e9

二、Future Trait 与 Runtime 的底层机制

2.1 Future Trait 的核心定义

Future 只有一个方法 poll:Runtime 调用 poll 推进状态机,返回 Pending(需要等待)或 Ready(value)(已完成)。Pending 时必须注册 Waker,让 Runtime 在条件满足时重新 poll。

stateDiagram-v2
    [*] --> Poll0: 首次 poll
    Poll0 --> Pending0: IO 未就绪<br/>注册 Waker
    Pending0 --> Poll1: Waker 唤醒
    Poll1 --> Pending1: IO 仍未就绪
    Pending1 --> Poll2: Waker 再次唤醒
    Poll2 --> Ready: IO 就绪<br/>返回值
    Ready --> [*]

2.2 Waker 的唤醒机制

Waker 是 Future 与 Runtime 之间的桥梁。当 IO 操作(如 epoll_wait)检测到文件描述符就绪时,操作系统通知 Runtime,Runtime 调用 Waker 唤醒对应的 Future,重新 poll。Waker 内部包含一个虚表(vtable)指针和数据指针,允许不同 Runtime 实现不同的唤醒策略。

2.3 Runtime 的调度循环

Runtime 的核心是一个循环:不断从就绪队列取任务执行 poll,将返回 Pending 的任务挂起等待 Waker 唤醒,唤醒后重新放入就绪队列。

三、手写简易 Runtime 与 Future 实现

3.1 Future Trait 的简化实现

use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// 简易 Future Trait(与标准库一致)
pub trait SimpleFuture {
    type Output;
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

/// 一个定时器 Future:在指定时间后完成
pub struct TimerFuture {
    shared_state: Arc<Mutex<TimerState>>,
}

struct TimerState {
    completed: bool,
    waker: Option<Waker>,
}

impl TimerFuture {
    pub fn new(duration: std::time::Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(TimerState {
            completed: false,
            waker: None,
        }));

        // 启动后台线程模拟异步等待
        let state = Arc::clone(&shared_state);
        std::thread::spawn(move || {
            std::thread::sleep(duration);
            let mut state = state.lock().unwrap();
            state.completed = true;
            // 通知 Runtime 重新 poll
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });

        TimerFuture { shared_state }
    }
}

impl SimpleFuture for TimerFuture {
    type Output = ();

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // 注册 Waker,以便完成时能被唤醒
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

3.2 简易 Runtime 实现

/// 简易异步 Runtime
pub struct MiniRuntime {
    task_queue: Arc<Mutex<VecDeque<Task>>>,
}

/// 任务封装:Box 化的 Future + Waker
type Task = std::pin::Pin<Box<dyn SimpleFuture<Output = ()> + Send>>;

impl MiniRuntime {
    pub fn new() -> Self {
        Self {
            task_queue: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    /// 生成一个 Waker,唤醒时将任务放回就绪队列
    fn create_waker(&self, task_id: usize) -> Waker {
        let queue = Arc::clone(&self.task_queue);
        // 使用 RawWaker 实现自定义唤醒逻辑
        let data = Box::into_raw(Box::new((task_id, queue))) as *const ();
        let vtable = &RawWakerVTable::new(
            Self::clone_raw,
            Self::wake_raw,
            Self::wake_by_ref_raw,
            Self::drop_raw,
        );
        unsafe { Waker::from_raw(RawWaker::new(data, vtable)) }
    }

    unsafe fn clone_raw(data: *const ()) -> RawWaker {
        let (task_id, queue) = &*(data as *const (usize, Arc<Mutex<VecDeque<Task>>>));
        let cloned = Box::new((*task_id, Arc::clone(queue)));
        let data = Box::into_raw(cloned) as *const ();
        RawWaker::new(data, &RawWakerVTable::new(
            Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw
        ))
    }

    unsafe fn wake_raw(data: *const ()) {
        let (task_id, queue) = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>));
        // 唤醒时将任务重新放入队列(此处简化,实际需要保存 Future)
        let _ = (task_id, queue);
    }

    unsafe fn wake_by_ref_raw(data: *const ()) {
        Self::wake_raw(data);
    }

    unsafe fn drop_raw(data: *const ()) {
        let _ = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>));
    }

    /// 运行所有任务直到完成
    pub fn block_on<F>(&self, future: F)
    where
        F: SimpleFuture<Output = ()> + Send + 'static,
    {
        let mut task = Box::pin(future);
        let waker = self.create_waker(0);
        let mut cx = Context::from_waker(&waker);

        loop {
            match task.as_mut().poll(&mut cx) {
                Poll::Ready(()) => break,
                Poll::Pending => {
                    // 简化处理:让出 CPU 等待唤醒
                    std::thread::yield_now();
                }
            }
        }
    }
}

3.3 组合多个 Future:Join 与 Select

/// Join: 等待所有 Future 完成
pub struct JoinAll<F> {
    futures: Vec<Option<F>>,
    pending_count: usize,
}

impl<F: SimpleFuture + Unpin> JoinAll<F> {
    pub fn new(futures: Vec<F>) -> Self {
        let pending_count = futures.len();
        Self {
            futures: futures.into_iter().map(Some).collect(),
            pending_count,
        }
    }
}

impl<F: SimpleFuture<Output = ()> + Unpin> SimpleFuture for JoinAll<F> {
    type Output = ();

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        for future_slot in &mut self.futures {
            if let Some(future) = future_slot {
                match future.poll(cx) {
                    Poll::Ready(()) => {
                        *future_slot = None; // 已完成,移除
                        self.pending_count -= 1;
                    }
                    Poll::Pending => {}
                }
            }
        }

        if self.pending_count == 0 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

/// Select: 返回最先完成的 Future 的结果
pub struct Select<F1, F2> {
    future1: Option<F1>,
    future2: Option<F2>,
}

impl<F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin> Select<F1, F2> {
    pub fn new(f1: F1, f2: F2) -> Self {
        Self { future1: Some(f1), future2: Some(f2) }
    }
}

impl<F1, F2> SimpleFuture for Select<F1, F2>
where
    F1: SimpleFuture + Unpin,
    F2: SimpleFuture + Unpin,
{
    type Output = ();

    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll 第一个 Future
        if let Some(ref mut f1) = self.future1 {
            if let Poll::Ready(()) = f1.poll(cx) {
                return Poll::Ready(());
            }
        }
        // Poll 第二个 Future
        if let Some(ref mut f2) = self.future2 {
            if let Poll::Ready(()) = f2.poll(cx) {
                return Poll::Ready(());
            }
        }
        Poll::Pending
    }
}

四、异步 Runtime 的架构权衡

4.1 单线程 vs 多线程 Runtime

维度 单线程 Runtime 多线程 Runtime (Tokio)
任务窃取 Work-Stealing 调度
CPU 利用 单核 多核
锁竞争 有(全局就绪队列)
适用场景 嵌入式/轻量服务 高并发服务端

4.2 协作式调度 vs 抢占式调度

Rust 的 Future 是协作式调度:任务必须主动 .await 让出控制权。如果某个 Future 在 poll 中执行长时间计算而不 yield,会阻塞整个线程。Tokio 提供了 tokio::task::yield_now() 强制让出,以及 coop 机制限制单次 poll 的预算。

4.3 适用边界与禁用场景

异步适用:

  • IO 密集型任务(网络请求、文件读写)
  • 高并发连接(HTTP 服务器、WebSocket)
  • 需要同时等待多个 IO 事件

异步禁用:

  • CPU 密集型计算(用 spawn_blocking 委托给线程池)
  • 简单的顺序逻辑(同步代码更清晰)
  • 不需要并发的场景(过度设计)

五、总结

Rust 的异步模型是"零成本抽象"的典范:async 函数编译为状态机,没有隐式的堆分配和运行时开销。但代价是必须显式管理 Runtime——Future 是惰性的,不会自动执行。理解 pollWakerContext 三者的协作关系,是诊断异步问题的前提。手写 Runtime 虽然不适合生产使用,但能揭示 Tokio 的核心机制:就绪队列、Waker 唤醒、任务调度。从"会用 async/await"到"理解底层状态机",这个跨越决定了面对异步 Bug 时的排查效率。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐