Poll 机制与状态机转换:Rust 异步引擎的解剖与实战
·

“不要调用我,我会调用你”——好莱坞原则在 Rust 异步世界的终极体现。
0. 引言:为什么需要再谈 Poll?
在已经了解了 Future、async/await、Waker 之后,为什么还要单独聚焦 Poll?
因为 Poll 是 Rust 异步运行时唯一真正执行的动作;其余一切(状态机、Waker、Pin、调度器)都是为了让这个单一动作高效、安全且零成本。
本文将带你:
- 用显微镜观察一次
poll()调用在 CPU 上的真实路径; - 亲手实现 可组合、零分配、零拷贝 的状态机;
- 基于 Poll 打造 可视化调试器,在线追踪 Future 状态;
- 总结一套 “状态机设计模式”,可直接应用于生产代码。
1. Poll 的语义:两个单词,四个承诺
pub enum Poll<T> {
Ready(T),
Pending,
}
| 承诺 | 违反后果 |
|---|---|
| 幂等性 | 同一 &mut Context 多次 poll 结果一致 |
| 不移动 | Pin 保证内存地址不变 |
| Waker 契约 | Pending 必须注册/更新 Waker |
| Ready 即终点 | 返回 Ready 后永不再次 poll |
违反任一承诺都会导致 UB 或 死锁。
2. 从零开始:一条指令的旅行
2.1 CPU 视角的 poll 调用
#[inline(never)]
fn trace_poll() {
let fut = async {
println!("👟 enter");
yield_now().await;
println!("👟 resume");
};
tokio::pin!(fut);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(()) => break,
Poll::Pending => {
// 在真实运行时这里会 park 线程,我们简化
println!("🛑 parked");
}
}
}
}
cargo asm 后关键片段(精简):
; 第一次 poll
call <example::async_block::{{closure}}>
; 第二次 poll(resume)
call <example::async_block::{{closure}}>
结论:
- 每次
poll()对应一次 状态机函数的尾调用 - 状态机函数是一个 可重入的巨型 switch
- 所有局部变量被提升到 enum 字段 中,因此 零栈展开开销
3. 手写状态机:可组合的计时器
3.1 状态定义
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
#[derive(Debug)]
enum TimerState {
NotStarted(Duration),
Waiting { deadline: Instant },
Done,
}
pub struct Timer {
state: TimerState,
}
impl Timer {
pub fn new(dur: Duration) -> Self {
Self {
state: TimerState::NotStarted(dur),
}
}
}
3.2 状态机实现
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match &mut self.state {
TimerState::NotStarted(dur) => {
let deadline = Instant::now() + *dur;
self.state = TimerState::Waiting { deadline };
println!("⏱️ Timer armed, deadline={:?}", deadline);
}
TimerState::Waiting { deadline } => {
if Instant::now() >= *deadline {
self.state = TimerState::Done;
println!("✅ Timer ready");
return Poll::Ready(*deadline);
}
// 注册定时器 Waker(简化:实际用 tokio::time)
cx.waker().wake_by_ref();
return Poll::Pending;
}
TimerState::Done => panic!("Timer polled after completion"),
}
}
}
}
3.3 零拷贝组合:Timer + Map
#[pin_project]
struct Map<F, M> {
#[pin]
inner: F,
map: Option<M>,
}
impl<F, M, T, U> Future for Map<F, M>
where
F: Future<Output = T>,
M: FnOnce(T) -> U,
{
type Output = U;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Ready(val) => {
let f = this.map.take().unwrap();
Poll::Ready(f(val))
}
Poll::Pending => Poll::Pending,
}
}
}
#[tokio::test]
async fn timer_composition() {
let timer = Timer::new(Duration::from_millis(10));
let mapped = Map {
inner: timer,
map: Some(|t| t.elapsed()),
};
let elapsed = mapped.await;
println!("Elapsed: {:?}", elapsed);
}
4. 可视化调试器:追踪 Future 生命周期
4.1 调试 Future 接口
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Clone)]
struct TraceFuture<F> {
id: u64,
inner: F,
}
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
impl<F> TraceFuture<F> {
fn new(inner: F) -> Self {
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
println!("🆔 Future {} created", id);
Self { id, inner }
}
}
impl<F: Future> Future for TraceFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("📊 Future {} polled", self.id);
let this = unsafe { self.get_unchecked_mut() };
let res = unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx);
match res {
Poll::Ready(v) => println!("✅ Future {} ready", this.id),
Poll::Pending => println!("⏸️ Future {} pending", this.id),
}
res
}
}
4.2 宏化追踪
macro_rules! trace {
($expr:expr) => {{
TraceFuture::new($expr)
}};
}
#[tokio::main]
async fn main() {
let f1 = trace!(Timer::new(Duration::from_millis(20)));
let f2 = trace!(Timer::new(Duration::from_millis(30)));
let (r1, r2) = tokio::join!(f1, f2);
println!("Results: {:?}, {:?}", r1, r2);
}
输出示例:
🆔 Future 0 created
🆔 Future 1 created
📊 Future 0 polled
⏸️ Future 0 pending
📊 Future 1 polled
⏸️ Future 1 pending
...
✅ Future 0 ready
✅ Future 1 ready
5. 生产级模式:可取消、可合并的 Future
5.1 可取消 Future
use std::sync::Arc;
use tokio::sync::Notify;
#[pin_project]
struct Cancellable<F> {
#[pin]
inner: F,
cancel: Arc<Notify>,
}
impl<F> Future for Cancellable<F>
where
F: Future,
{
type Output = Result<F::Output, Cancelled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// 检查取消信号
if let Poll::Ready(()) = Pin::new(this.cancel).poll(cx) {
return Poll::Ready(Err(Cancelled));
}
// 轮询内部
this.inner.poll(cx).map(Ok)
}
}
#[derive(Debug)]
struct Cancelled;
5.2 批量并行 Future(零分配)
/// 固定大小的批量执行,零堆分配
#[pin_project]
struct ArrayJoin<const N: usize, F> {
#[pin]
futures: [F; N],
results: [Option<F::Output>; N],
completed: usize,
}
impl<const N: usize, F> ArrayJoin<N, F>
where
F: Future,
{
fn new(futures: [F; N]) -> Self {
Self {
futures,
results: [None; N],
completed: 0,
}
}
}
impl<const N: usize, F> Future for ArrayJoin<N, F>
where
F: Future,
{
type Output = [F::Output; N];
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
for (i, (fut, res)) in this
.futures
.iter_mut()
.zip(this.results.iter_mut())
.enumerate()
{
if res.is_none() {
if let Poll::Ready(val) = fut.poll(cx) {
*res = Some(val);
*this.completed += 1;
}
}
}
if *this.completed == N {
let results = std::mem::take(this.results);
let array = results.map(Option::unwrap);
Poll::Ready(array)
} else {
Poll::Pending
}
}
}
6. 常见陷阱 & 调试技巧
6.1 忘记注册 Waker
// ❌ 永远挂起
impl Future for Broken {
fn poll(&mut self, _: &mut Context) -> Poll<()> {
Poll::Pending // 没有注册 Waker
}
}
// ✅ 正确
impl Future for Fixed {
fn poll(&mut self, cx: &mut Context) -> Poll<()> {
if self.ready {
Poll::Ready(())
} else {
self.register(cx.waker().clone());
Poll::Pending
}
}
}
6.2 工具链:gdb + tokio-console
# 编译时打开调试
RUSTFLAGS="--cfg tokio_unstable" cargo build --release
# 运行时
tokio-console --url http://localhost:6669
7. 总结:Poll 的 7 条黄金法则
| 法则 | 示例 |
|---|---|
| 幂等 | 多次 poll 同一结果 |
| Waker 契约 | Pending ⇒ 注册 |
| 内存不变 | Pin 保证地址 |
| Ready 即终点 | Ready 后不可再 poll |
| 状态完整 | 所有变量存 enum |
| 零分配组合 | 手写组合子 |
| 可观测性 | TraceFuture |
掌握 Poll 机制,你就拥有了 在 Rust 中设计任意异步抽象的能力——这是所有高级异步框架(Tokio、async-std、smol)的共同地基。
下一篇预告:我们将用 Poll 机制实现 无锁 MPSC 通道,敬请期待!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)