Rust 异步编程:从 Future Trait 到手写一个简易 Runtime
Rust 异步编程:从 Future Trait 到手写一个简易 Runtime

一、异步编程的"黑盒"困境:用 Tokio 但不懂为什么
Rust 的 async/await 语法看起来和 JavaScript、Python 一样简单——加个 async,写个 .await,但底层机制完全不同。JavaScript 是单线程事件循环,Python 有 GIL,而 Rust 的 Future 是惰性的:声明 async 函数不会执行任何代码,必须有人调用 .await 推进状态机。这个"有人"就是 Runtime。Tokio 是最常用的 Runtime,但如果不懂 Future 的底层机制,遇到异步代码不执行、任务卡死、性能抖动等问题就只能靠猜。理解 Future Trait 和 Runtime 的工作原理,是从"会用 Tokio"到"能调异步问题"的关键一步。
graph TB
A[async fn] --> B[编译器生成 Future 状态机]
B --> C[.await 挂起当前任务]
C --> D[Runtime 调度其他任务]
D --> E[IO 就绪后唤醒 Waker]
E --> F[Runtime 重新 Poll 该 Future]
F --> G{Poll 结果}
G -->|Pending| C
G -->|Ready| H[返回结果]
style B fill:#fff3e0
style E fill:#e8f5e9
二、Future Trait 与 Runtime 的底层机制
2.1 Future Trait 的核心定义
Future 只有一个方法 poll:Runtime 调用 poll 推进状态机,返回 Pending(需要等待)或 Ready(value)(已完成)。Pending 时必须注册 Waker,让 Runtime 在条件满足时重新 poll。
stateDiagram-v2
[*] --> Poll0: 首次 poll
Poll0 --> Pending0: IO 未就绪<br/>注册 Waker
Pending0 --> Poll1: Waker 唤醒
Poll1 --> Pending1: IO 仍未就绪
Pending1 --> Poll2: Waker 再次唤醒
Poll2 --> Ready: IO 就绪<br/>返回值
Ready --> [*]
2.2 Waker 的唤醒机制
Waker 是 Future 与 Runtime 之间的桥梁。当 IO 操作(如 epoll_wait)检测到文件描述符就绪时,操作系统通知 Runtime,Runtime 调用 Waker 唤醒对应的 Future,重新 poll。Waker 内部包含一个虚表(vtable)指针和数据指针,允许不同 Runtime 实现不同的唤醒策略。
2.3 Runtime 的调度循环
Runtime 的核心是一个循环:不断从就绪队列取任务执行 poll,将返回 Pending 的任务挂起等待 Waker 唤醒,唤醒后重新放入就绪队列。
三、手写简易 Runtime 与 Future 实现
3.1 Future Trait 的简化实现
use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
/// 简易 Future Trait(与标准库一致)
pub trait SimpleFuture {
type Output;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
/// 一个定时器 Future:在指定时间后完成
pub struct TimerFuture {
shared_state: Arc<Mutex<TimerState>>,
}
struct TimerState {
completed: bool,
waker: Option<Waker>,
}
impl TimerFuture {
pub fn new(duration: std::time::Duration) -> Self {
let shared_state = Arc::new(Mutex::new(TimerState {
completed: false,
waker: None,
}));
// 启动后台线程模拟异步等待
let state = Arc::clone(&shared_state);
std::thread::spawn(move || {
std::thread::sleep(duration);
let mut state = state.lock().unwrap();
state.completed = true;
// 通知 Runtime 重新 poll
if let Some(waker) = state.waker.take() {
waker.wake();
}
});
TimerFuture { shared_state }
}
}
impl SimpleFuture for TimerFuture {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// 注册 Waker,以便完成时能被唤醒
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
3.2 简易 Runtime 实现
/// 简易异步 Runtime
pub struct MiniRuntime {
task_queue: Arc<Mutex<VecDeque<Task>>>,
}
/// 任务封装:Box 化的 Future + Waker
type Task = std::pin::Pin<Box<dyn SimpleFuture<Output = ()> + Send>>;
impl MiniRuntime {
pub fn new() -> Self {
Self {
task_queue: Arc::new(Mutex::new(VecDeque::new())),
}
}
/// 生成一个 Waker,唤醒时将任务放回就绪队列
fn create_waker(&self, task_id: usize) -> Waker {
let queue = Arc::clone(&self.task_queue);
// 使用 RawWaker 实现自定义唤醒逻辑
let data = Box::into_raw(Box::new((task_id, queue))) as *const ();
let vtable = &RawWakerVTable::new(
Self::clone_raw,
Self::wake_raw,
Self::wake_by_ref_raw,
Self::drop_raw,
);
unsafe { Waker::from_raw(RawWaker::new(data, vtable)) }
}
unsafe fn clone_raw(data: *const ()) -> RawWaker {
let (task_id, queue) = &*(data as *const (usize, Arc<Mutex<VecDeque<Task>>>));
let cloned = Box::new((*task_id, Arc::clone(queue)));
let data = Box::into_raw(cloned) as *const ();
RawWaker::new(data, &RawWakerVTable::new(
Self::clone_raw, Self::wake_raw, Self::wake_by_ref_raw, Self::drop_raw
))
}
unsafe fn wake_raw(data: *const ()) {
let (task_id, queue) = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>));
// 唤醒时将任务重新放入队列(此处简化,实际需要保存 Future)
let _ = (task_id, queue);
}
unsafe fn wake_by_ref_raw(data: *const ()) {
Self::wake_raw(data);
}
unsafe fn drop_raw(data: *const ()) {
let _ = Box::from_raw(data as *mut (usize, Arc<Mutex<VecDeque<Task>>>));
}
/// 运行所有任务直到完成
pub fn block_on<F>(&self, future: F)
where
F: SimpleFuture<Output = ()> + Send + 'static,
{
let mut task = Box::pin(future);
let waker = self.create_waker(0);
let mut cx = Context::from_waker(&waker);
loop {
match task.as_mut().poll(&mut cx) {
Poll::Ready(()) => break,
Poll::Pending => {
// 简化处理:让出 CPU 等待唤醒
std::thread::yield_now();
}
}
}
}
}
3.3 组合多个 Future:Join 与 Select
/// Join: 等待所有 Future 完成
pub struct JoinAll<F> {
futures: Vec<Option<F>>,
pending_count: usize,
}
impl<F: SimpleFuture + Unpin> JoinAll<F> {
pub fn new(futures: Vec<F>) -> Self {
let pending_count = futures.len();
Self {
futures: futures.into_iter().map(Some).collect(),
pending_count,
}
}
}
impl<F: SimpleFuture<Output = ()> + Unpin> SimpleFuture for JoinAll<F> {
type Output = ();
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for future_slot in &mut self.futures {
if let Some(future) = future_slot {
match future.poll(cx) {
Poll::Ready(()) => {
*future_slot = None; // 已完成,移除
self.pending_count -= 1;
}
Poll::Pending => {}
}
}
}
if self.pending_count == 0 {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
/// Select: 返回最先完成的 Future 的结果
pub struct Select<F1, F2> {
future1: Option<F1>,
future2: Option<F2>,
}
impl<F1: SimpleFuture + Unpin, F2: SimpleFuture + Unpin> Select<F1, F2> {
pub fn new(f1: F1, f2: F2) -> Self {
Self { future1: Some(f1), future2: Some(f2) }
}
}
impl<F1, F2> SimpleFuture for Select<F1, F2>
where
F1: SimpleFuture + Unpin,
F2: SimpleFuture + Unpin,
{
type Output = ();
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Poll 第一个 Future
if let Some(ref mut f1) = self.future1 {
if let Poll::Ready(()) = f1.poll(cx) {
return Poll::Ready(());
}
}
// Poll 第二个 Future
if let Some(ref mut f2) = self.future2 {
if let Poll::Ready(()) = f2.poll(cx) {
return Poll::Ready(());
}
}
Poll::Pending
}
}
四、异步 Runtime 的架构权衡
4.1 单线程 vs 多线程 Runtime
| 维度 | 单线程 Runtime | 多线程 Runtime (Tokio) |
|---|---|---|
| 任务窃取 | 无 | Work-Stealing 调度 |
| CPU 利用 | 单核 | 多核 |
| 锁竞争 | 无 | 有(全局就绪队列) |
| 适用场景 | 嵌入式/轻量服务 | 高并发服务端 |
4.2 协作式调度 vs 抢占式调度
Rust 的 Future 是协作式调度:任务必须主动 .await 让出控制权。如果某个 Future 在 poll 中执行长时间计算而不 yield,会阻塞整个线程。Tokio 提供了 tokio::task::yield_now() 强制让出,以及 coop 机制限制单次 poll 的预算。
4.3 适用边界与禁用场景
异步适用:
- IO 密集型任务(网络请求、文件读写)
- 高并发连接(HTTP 服务器、WebSocket)
- 需要同时等待多个 IO 事件
异步禁用:
- CPU 密集型计算(用
spawn_blocking委托给线程池) - 简单的顺序逻辑(同步代码更清晰)
- 不需要并发的场景(过度设计)
五、总结
Rust 的异步模型是"零成本抽象"的典范:async 函数编译为状态机,没有隐式的堆分配和运行时开销。但代价是必须显式管理 Runtime——Future 是惰性的,不会自动执行。理解 poll、Waker、Context 三者的协作关系,是诊断异步问题的前提。手写 Runtime 虽然不适合生产使用,但能揭示 Tokio 的核心机制:就绪队列、Waker 唤醒、任务调度。从"会用 async/await"到"理解底层状态机",这个跨越决定了面对异步 Bug 时的排查效率。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)