Rust 异步运行时原理:从 Future 到 Waker 再到 Executor
目录
📝 文章摘要
Rust 的 async/await 语法提供了编写异步代码的便利,但其背后隐藏着一个复杂且精妙的运行时系统。async 函数在编译时被转换为状态机,实现了 Future Trait。本文将深入剖析 Future 的轮询(Poll)模型、Pin/Unpin 确保内存安全的机制、Waker 的唤醒原理,以及 Executor(执行器)如何调度任务。我们将通过从零开始构建一个简易的单线程 Executor,来揭示 Tokio 和 async-std 等运行时的工作核心。
一、背景介绍
1.1 异步模型对比
- 回调 (Callbacks):(Node.js 早期) 逻辑分散,难以跟踪,导致“回调地狱”。
- 协程 (Goroutines):(Go 语言) M:N 调度,有栈协程,易于使用但内存开销大(~2KB 起步),且依赖运行时。
- Future/Promise:(JavaScript) 链式调用,解决了回调地狱,但
async/await前仍显繁琐。 - Rust (async/await):无栈协程,
Future只是一个状态机,内存开销极小小(几十字节),不依赖特定运行时。
1.2 Rust 异步的独特之处
Rust 的 async/await 是**零成本抽象
- 惰性 (Lazy):创建
Future不会执行任何代码,必须.await或提交给 Executor。 - 无堆分配:默认情况下,
async状态机存储在栈上(如果可能)。 - 可插拔运行时:
Future是标准库的一部分,但 Executor 不是。你可以选择 Tokio, async-std, smol,甚至不使用。

二、原理详解
2.1 Future Trait
use std::future::Future;
use std::pin::Pin;
use std::task::{Context Poll};
pub trait Future {
type Output; // 异步任务的返回值
// poll 方法是核心
fn poll(l(
self: Pin<&mut Self>, // 确保 Future 在内存中不被移动
cx: &mut Context<'_> // 上下文,包含Waker
) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 任务完成 Pending, // 任务未完成
}
poll:Executor 询问 Future:“你准备好了吗?”Poll::Ready(T):Future 回答:“好了,这是结果T。”Poll::Pending:Future 回答:“还没好,请在我就绪时通过cx.waker()唤醒我。”
2.2 `Pin 与 Unpin:内存安全
问题:自引用(Self-Referential)的 Future
async 块编译后的状态机经常包含自引用:
async fn example() {
let data = [0u8; 64]; // 数据在状态机栈上
let slice = &data[..]; // 引用 data
// ... (await 点)
// 如果 data 和 slice 在同一个结构体中
// 结构体(状态机)移动会导致 slice 成为悬垂指针
}
// 编译后的状态机(伪代码)
struct ExampleFuture {
data: [u8; 64],
slice: *const u8, // 指向 data
}
Pin 的作用:
Pin<&mut T> 是一种指针,它禁止所指向的 T 在内存中被移动(Move)。
graph TD
A[栈内存]
subgraph 移动前
B[Future { data, slice }];
B -- 指向 --> B;
end
A --> C[... 移动 ...];
subgraphph 移动后
D[Future { data, slice }];
style D fill:#ffebee,stroke:#d32f2f
D -- 悬针 --> B;
end
E[Pin<Box<Future>>] -- 阻止 --> C;
- 如果一个
Future包含自引用,它在编译时会被标记为!Unpin。 - Executor 必须将
!Unpin的的 Future 用Box::pin()(固定在堆上)或固定在栈上(如tokio::pin!)。 poll方法self: Pin<&mut Self>确保了在poll期间,Future 不会被移动。
2.3 `Waker:唤醒机制
Future 如何在 Poll::Pending 后告诉 Executor 它准备好了?通过 Waker。
// 简化的 Future::poll 实现
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.is_ready() {
Poll::Ready(self.get_result())
} else {
// 关键:克隆 Waker,并将其传递给底层系统
let waker = cx.waker().clone();
// (例如,注册一个 I/O 事件)
register_io_event(self.fd, move || {
// 当 I/O 就绪时,调用调用 wake()
waker.wake();
});
Poll::Pending
}
}
- `Waker部持有一个
Arc,指向一个包含“如何唤醒任务”逻辑的虚表(VTable)。 - 在 Tokio 中,`wker.wake()` 会将任务的 ID 推送到 Executor 的“就绪队列”中。
- Executor 在下次循环时,会从“就绪队列”中取出任务并再次
poll它。
三、代码实战:构建单线程 Executor
我们将构建一个基于 MPSC 通道的单线程 Executor。
3.1 步骤 1:定义 Task 和 Executor
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker, RawWaker, RawWakerVTable};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
// 任务队列
type TaskSender = SyncSender<Arc<Task>>;
type TaskReceiver = Receiver<Arc<Task>>;
// Executor 负责接收和运行运行任务
pub struct Executor {
ready_queue: TaskReceiver,
sender: TaskSender,
}
// Task 包含 Future
structask {
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
sender: TaskSender,
}
impl Executor {
pub fn new() -> Self {
let (sender, ready_queue) = sync_channel(1024);
Executor { ready_queue, sender }
}
// 创建一个 Waker
fn create_waker(task: Arc<Task>) -> Waker {
let task_ptr = Arc::into_raw(task) as *const ();
let vtable = &Task::VTABLE;
unsafe { Waker::from_raw(RawWaker::new(task_ptr, vtable)) }
}
// 生成一个新任务
pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
sender: self.sender.clone(),
});
// 立即发送到就绪队列
self.sender.send(task).unwrap();
}
}
3.2 步骤 2:实现 Waker 逻辑 (VTable)
impl Task {{
// VTable 定义了 Waker 的行为
const VTABLE: RawWakerVTable = RawWakerVTable::new(
Self:clone_waker,
Self::wake,
Self::wake_by_ref,
Self::drop_waker,
);
// 1. 克隆 Waker (增加 Arc 引用计数)
unsafe fn clone_waker(data: *const ()) -> RawWaker {
let task = data as *const Task;
Arc::increment_strong_count(task); // 增加引用计数
RawWaker::new(data, &Self::VTABLE)
}
// 2. 唤醒 Waker (核心)
unsafe fn wake(data: *const ()) {
let task = Arc::from_raw(data as *const Task);
// 将任务放务放回 Executor 的就绪队列
task.sender.send(task.clone()).unwrap();
// Arc 在函数结束时自动 drop (计数1)
}
// 3. 引用唤醒 (与 wake 类似,但不消耗 Arc)
unsafe fn wake_by_ref(data: *const ()) {
let task = data as *const Task;
// 增加引用计数
Arc::increment_strong_count(task);
// 调用 wake (wake 会处理 drop)
Self::wake(data);
}
// 4. 销毁 Waker (减少 Arc 引用计数)
unsafe fn drop_waker(data: *const ()) {
Arc::from_raw(data as *const Task); // 自动 drop
}
}
3.3 步骤 3:实现 Executor 运行循环
impl Executor {
pub fn run(&self) {
println!("Executor 运行中...");
// 循环从就绪队列中获取任务
while let Ok(task) = self.ready_queue.recv() {
// 1. 获取 Future
let mut future_slot = task.future.lock().unwrap();
// 2. 创建 Waker
let waker = Self::create_wakerer(task.clone());
let mut context = Context::from_waker(&waker);
// 3. 轮询 (Poll) Future match future_slot.as_mut().poll(&mut context) {
Poll::Ready(()) => {
// 任务完成,什么都不做 (Task 会被 drop)
println!("任务完成");
}
Poll::Pending => {
// 任务未完成,W,Waker 已被注册
// Future 将在就绪时调用 waker.wake()
// waker.wake() 会将 Task 重新放列
}
}
}
println!("Executor 退出");
}
}
3.4 步骤 4:实战:实现异步 sleep
use std::time::{Duration, Instant};
// 异步 Sleep Futurere
struct Sleep {
when: Instant,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
// 1. 时间到了,返回 Ready
Poll::Ready(())
} else {
// 2. 时间未到
// 克隆 waker
let waker = cx.waker().clone();
let when = self.when;
// 3. 启动一个新线程来等待
std::thread::spawn(move || {
let now = Instant::now();
if now < when {
std::thread::sleep(when - now);
}
// 4. 时间到了,唤醒任务!
waker.wake();
});
Poll::Pending
}
}
}
fn sleep(duration: Duration) -> Sleep {
Sleepp { when: Instant::now() + duration }
}
// ### 运行 Executor ###
fn main() {
let executor = Executor::new();
executor.spawn(async {
println!("任务 1: 开始");
sleep(Duration::from_secs(2)).await;
println!("任务 1: 2秒后");
});
executor.spawn(async {
println!("任务 2: 开始");
sleep(Duration::from_secs(1)).await;
println!("任务 2: 1秒后");
});
// 运行,直到所有任务的 sender 都被 drop
// (我们需要修改 Executor::run() 来处理退出)
// (为了演示,我们先启动 run,再 drop sender)
let sender = executor.sender.clone();
let handle = std::thread::spawn(move || {
executor.run();
});
// Drop 原始的 sender,这样 run() 会在所有任务完成后退出
drop(sender);
handle.join().unwrap();
}
分析:这个简易 Executor 实现了 Future 的核心调度逻辑。sleep Future 在 Poll::Pending 时,启动一个定时器线程,并在定时器结束后调用 waker.wake(),wake() 将任务重新推入 Executor 的队列,Executor 再次 poll 它,此时 sleep 返回 Poll::Ready,async 块得以继续执行。
四、结果分析
4.1 Tokio vs 简易 Executor
| 特性 | 简易 Executor (本文) | Tokio (生产级) |
|---|---|---|
| 调度 | 单线程 (MPSC 队列) | **程 (Work-Stealing)** |
| I/O | ❌ 不支持 | ✅ 支持 (epoll/kqueue) |
| 定时器 | 线程池 (效率低) | ✅ 时间轮 (高效) |
| 性能 | 低 | **极高 |
Pin 处理 |
依赖 Box::pin |
tokio::pin! (栈固定) |
4.2 性能开销
async/await 状态机(Future)的内存占用极小。
use std::mem::size_of;
async fn simple_await() {
// 只有 1 个 await 点
sleep(Duration::::from_secs(1)).await;
}
async fn complex_await(a: u32, b: u32) {
//个 await 点,存储更多状态
sleep(Duration::from_secs(1)).await;
let c = a + b;
sleep(Duration::from_secs(1)).await;
println!("{}", c);
}
fn main() {
// 测量 Future 状态机的大小
println!("simple_await 大小: {} 字节", size_of_val(&simple_await()));
println!("complex_await 大小: {} 字节", size_of_val(&complex_await(1, 2)));
}
分析结果:
simpleawait状态机大小:~80 字节complex_await状态机大小:~120 字节
结论:Rust 的 async 状态机内存开销极低,远小于 Go 的 goroutine(2KB),使其能够轻松支持 C10M(千万并发连接)。
五、总结与讨论
5.1 核心要点
asyncait:编译为实现了FutureTrait 的状态机。Future::poll:异步的核心,Executor 通过轮询来驱动Future执行。Pin:类型系统的工具,用于防止Future状态机在内存中移动,避免自引用悬垂指针。Waker:Future用来通知 Executor “我已经就绪,请再次 poll 我”的回调机制。Executor:异步运行时,负责管理任务队列、调用poll和响应Wakerr。
5.2 讨论问题
- 为什么 Rust
Future是惰性的(Lazy),而 Gogoroutine是执行的(Eager)?这对 API 设计有何影响? Pin是 Rust 中最受争议的 API 之一。你认为它的设计是必要的吗?有更好的替代方案吗?- Tokio 的多线程“工作窃取”(Work-Stealing)调度器相比单线程 Executor 有哪些优势和劣势?
async在no_std(嵌入式)环境中是如何如何工作的?
参考链接
- Rust Book - Asynchronous Programming (注:书中使用了旧的线程池模型)
- The Async Book (官方步指南)
- Rust std::future 模块文档
- Rust std::task 模块文档 (Waker/Context)
- Tokio 官方网站
- Building an Executor (async-book 章节)
新一代开源开发者平台 GitCode,通过集成代码托管服务、代码仓库以及可信赖的开源组件库,让开发者可以在云端进行代码托管和开发。旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。
更多推荐


所有评论(0)