在这里插入图片描述

“不要调用我,我会调用你”——好莱坞原则在 Rust 异步世界的终极体现。


0. 引言:为什么需要再谈 Poll?

在已经了解了 Futureasync/awaitWaker 之后,为什么还要单独聚焦 Poll
因为 Poll 是 Rust 异步运行时唯一真正执行的动作;其余一切(状态机、Waker、Pin、调度器)都是为了让这个单一动作高效、安全且零成本。
本文将带你:

  1. 显微镜观察一次 poll() 调用在 CPU 上的真实路径;
  2. 亲手实现 可组合、零分配、零拷贝 的状态机;
  3. 基于 Poll 打造 可视化调试器,在线追踪 Future 状态;
  4. 总结一套 “状态机设计模式”,可直接应用于生产代码。

1. Poll 的语义:两个单词,四个承诺

pub enum Poll<T> {
    Ready(T),
    Pending,
}
承诺 违反后果
幂等性 同一 &mut Context 多次 poll 结果一致
不移动 Pin 保证内存地址不变
Waker 契约 Pending 必须注册/更新 Waker
Ready 即终点 返回 Ready 后永不再次 poll

违反任一承诺都会导致 UB死锁


2. 从零开始:一条指令的旅行

2.1 CPU 视角的 poll 调用

#[inline(never)]
fn trace_poll() {
    let fut = async {
        println!("👟 enter");
        yield_now().await;
        println!("👟 resume");
    };
    tokio::pin!(fut);

    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(()) => break,
            Poll::Pending => {
                // 在真实运行时这里会 park 线程,我们简化
                println!("🛑 parked");
            }
        }
    }
}

cargo asm 后关键片段(精简):

; 第一次 poll
call <example::async_block::{{closure}}>
; 第二次 poll(resume)
call <example::async_block::{{closure}}>

结论

  • 每次 poll() 对应一次 状态机函数的尾调用
  • 状态机函数是一个 可重入的巨型 switch
  • 所有局部变量被提升到 enum 字段 中,因此 零栈展开开销

3. 手写状态机:可组合的计时器

3.1 状态定义

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::{Duration, Instant},
};

#[derive(Debug)]
enum TimerState {
    NotStarted(Duration),
    Waiting { deadline: Instant },
    Done,
}

pub struct Timer {
    state: TimerState,
}

impl Timer {
    pub fn new(dur: Duration) -> Self {
        Self {
            state: TimerState::NotStarted(dur),
        }
    }
}

3.2 状态机实现

impl Future for Timer {
    type Output = Instant;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match &mut self.state {
                TimerState::NotStarted(dur) => {
                    let deadline = Instant::now() + *dur;
                    self.state = TimerState::Waiting { deadline };
                    println!("⏱️  Timer armed, deadline={:?}", deadline);
                }
                TimerState::Waiting { deadline } => {
                    if Instant::now() >= *deadline {
                        self.state = TimerState::Done;
                        println!("✅ Timer ready");
                        return Poll::Ready(*deadline);
                    }
                    // 注册定时器 Waker(简化:实际用 tokio::time)
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                TimerState::Done => panic!("Timer polled after completion"),
            }
        }
    }
}

3.3 零拷贝组合:Timer + Map

#[pin_project]
struct Map<F, M> {
    #[pin]
    inner: F,
    map: Option<M>,
}

impl<F, M, T, U> Future for Map<F, M>
where
    F: Future<Output = T>,
    M: FnOnce(T) -> U,
{
    type Output = U;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        match this.inner.poll(cx) {
            Poll::Ready(val) => {
                let f = this.map.take().unwrap();
                Poll::Ready(f(val))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

#[tokio::test]
async fn timer_composition() {
    let timer = Timer::new(Duration::from_millis(10));
    let mapped = Map {
        inner: timer,
        map: Some(|t| t.elapsed()),
    };
    let elapsed = mapped.await;
    println!("Elapsed: {:?}", elapsed);
}

4. 可视化调试器:追踪 Future 生命周期

4.1 调试 Future 接口

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Clone)]
struct TraceFuture<F> {
    id: u64,
    inner: F,
}

static NEXT_ID: AtomicU64 = AtomicU64::new(0);

impl<F> TraceFuture<F> {
    fn new(inner: F) -> Self {
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        println!("🆔 Future {} created", id);
        Self { id, inner }
    }
}

impl<F: Future> Future for TraceFuture<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("📊 Future {} polled", self.id);
        let this = unsafe { self.get_unchecked_mut() };
        let res = unsafe { Pin::new_unchecked(&mut this.inner) }.poll(cx);
        match res {
            Poll::Ready(v) => println!("✅ Future {} ready", this.id),
            Poll::Pending => println!("⏸️  Future {} pending", this.id),
        }
        res
    }
}

4.2 宏化追踪

macro_rules! trace {
    ($expr:expr) => {{
        TraceFuture::new($expr)
    }};
}

#[tokio::main]
async fn main() {
    let f1 = trace!(Timer::new(Duration::from_millis(20)));
    let f2 = trace!(Timer::new(Duration::from_millis(30)));
    
    let (r1, r2) = tokio::join!(f1, f2);
    println!("Results: {:?}, {:?}", r1, r2);
}

输出示例:

🆔 Future 0 created
🆔 Future 1 created
📊 Future 0 polled
⏸️  Future 0 pending
📊 Future 1 polled
⏸️  Future 1 pending
...
✅ Future 0 ready
✅ Future 1 ready

5. 生产级模式:可取消、可合并的 Future

5.1 可取消 Future

use std::sync::Arc;
use tokio::sync::Notify;

#[pin_project]
struct Cancellable<F> {
    #[pin]
    inner: F,
    cancel: Arc<Notify>,
}

impl<F> Future for Cancellable<F>
where
    F: Future,
{
    type Output = Result<F::Output, Cancelled>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        
        // 检查取消信号
        if let Poll::Ready(()) = Pin::new(this.cancel).poll(cx) {
            return Poll::Ready(Err(Cancelled));
        }
        
        // 轮询内部
        this.inner.poll(cx).map(Ok)
    }
}

#[derive(Debug)]
struct Cancelled;

5.2 批量并行 Future(零分配)

/// 固定大小的批量执行,零堆分配
#[pin_project]
struct ArrayJoin<const N: usize, F> {
    #[pin]
    futures: [F; N],
    results: [Option<F::Output>; N],
    completed: usize,
}

impl<const N: usize, F> ArrayJoin<N, F>
where
    F: Future,
{
    fn new(futures: [F; N]) -> Self {
        Self {
            futures,
            results: [None; N],
            completed: 0,
        }
    }
}

impl<const N: usize, F> Future for ArrayJoin<N, F>
where
    F: Future,
{
    type Output = [F::Output; N];

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        
        for (i, (fut, res)) in this
            .futures
            .iter_mut()
            .zip(this.results.iter_mut())
            .enumerate()
        {
            if res.is_none() {
                if let Poll::Ready(val) = fut.poll(cx) {
                    *res = Some(val);
                    *this.completed += 1;
                }
            }
        }
        
        if *this.completed == N {
            let results = std::mem::take(this.results);
            let array = results.map(Option::unwrap);
            Poll::Ready(array)
        } else {
            Poll::Pending
        }
    }
}

6. 常见陷阱 & 调试技巧

6.1 忘记注册 Waker

// ❌ 永远挂起
impl Future for Broken {
    fn poll(&mut self, _: &mut Context) -> Poll<()> {
        Poll::Pending // 没有注册 Waker
    }
}

// ✅ 正确
impl Future for Fixed {
    fn poll(&mut self, cx: &mut Context) -> Poll<()> {
        if self.ready {
            Poll::Ready(())
        } else {
            self.register(cx.waker().clone());
            Poll::Pending
        }
    }
}

6.2 工具链:gdb + tokio-console

# 编译时打开调试
RUSTFLAGS="--cfg tokio_unstable" cargo build --release

# 运行时
tokio-console --url http://localhost:6669

7. 总结:Poll 的 7 条黄金法则

法则 示例
幂等 多次 poll 同一结果
Waker 契约 Pending ⇒ 注册
内存不变 Pin 保证地址
Ready 即终点 Ready 后不可再 poll
状态完整 所有变量存 enum
零分配组合 手写组合子
可观测性 TraceFuture

掌握 Poll 机制,你就拥有了 在 Rust 中设计任意异步抽象的能力——这是所有高级异步框架(Tokio、async-std、smol)的共同地基。

下一篇预告:我们将用 Poll 机制实现 无锁 MPSC 通道,敬请期待!
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐