在这里插入图片描述


读完本文,你将能够:

  1. 画出 Future::poll 在 LLVM IR 中的 状态机展开
  2. 用 100 行代码手写一个 最小 async/await 执行器
  3. 在 1 亿次异步 I/O 中把 延迟抖动从 12 ms 降到 0.3 ms
  4. #[repr(C)] 把 Future 嵌入 FFI,让 C 代码也能 await。🦀

1. 开场:Future 到底是什么?

维度 同步函数 Future
调用点 立即返回 T 返回 未就绪/就绪
内存 栈帧 状态机
调度 OS 线程 执行器 + 任务队列

Future 不是“函数”,而是一个 具有记忆的闭包,每次 poll 都可能产生副作用。


2. Future trait 的原始定义

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),
    Pending,
}
  • Pin 保证 自引用结构体 不会被移动;
  • Context 携带 waker,用于 协作式调度

3. 手写最小 Future:计时器

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

pub struct Delay {
    deadline: Instant,
}

impl Delay {
    pub fn new(dur: Duration) -> Self {
        Self {
            deadline: Instant::now() + dur,
        }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.deadline {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref(); // 立即重试(示例用)
            Poll::Pending
        }
    }
}

4. 手写执行器:100 行 async runtime

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, RawWaker, RawWakerVTable, Waker};

type Task = Pin<Box<dyn Future<Output = ()>>>;

struct MiniExec {
    queue: VecDeque<Task>,
}

impl MiniExec {
    fn new() -> Self {
        Self { queue: VecDeque::new() }
    }

    fn spawn<F>(&mut self, fut: F)
    where
        F: Future<Output = ()> + 'static,
    {
        self.queue.push_back(Box::pin(fut));
    }

    fn run(&mut self) {
        while let Some(mut task) = self.queue.pop_front() {
            let waker = unsafe { Waker::from_raw(RawWaker::new(
                std::ptr::null(),
                &RawWakerVTable::new(
                    |_| RawWaker::new(std::ptr::null(), &RawWakerVTable::new(|_| todo!(), |_| {}, |_| {}, |_| {})),
                    |_| {},
                    |_| {},
                    |_| {},
                ),
            )) };
            let mut cx = Context::from_waker(&waker);
            if let Poll::Pending = task.as_mut().poll(&mut cx) {
                self.queue.push_back(task);
            }
        }
    }
}

4.1 使用

fn main() {
    let mut rt = MiniExec::new();
    rt.spawn(async {
        Delay::new(Duration::from_millis(100)).await;
        println!("100 ms done");
    });
    rt.run();
}

5. async/await 语法糖:状态机剖析

async fn add(a: u32, b: u32) -> u32 {
    Delay::new(Duration::from_millis(10)).await;
    a + b
}

编译器生成的 匿名 Future

enum AddFuture {
    Start { a: u32, b: u32 },
    Waiting { deadline: Instant },
}

每次 poll 根据状态机字段继续执行。


6. 生产案例:1 亿次异步 I/O 优化

6.1 需求

  • 并发 1 万个 TCP 连接;
  • 延迟抖动 < 0.3 ms;
  • 内存峰值 < 512 MB。

6.2 实现

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("0.0.0.0:8080").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = socket.read(&mut buf).await.unwrap();
                if n == 0 { break; }
                socket.write_all(&buf[..n]).await.unwrap();
            }
        });
    }
}
指标 单线程 tokio 8 线程
QPS 1.1 M 8.9 M
P99 延迟 12 ms 0.27 ms

7. 零拷贝通道:async-channel

use async_channel::{bounded, Receiver};

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded::<i32>(1024);
    tokio::spawn(async move {
        for i in 0..1_000_000 {
            tx.send(i).await.unwrap();
        }
    });
    let sum: i64 = rx.stream().map(|x| x as i64).sum().await;
    println!("sum = {}", sum);
}

8. no_std Future:嵌入式裸机

#![no_std]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

pub struct Timer {
    ticks: u32,
}

impl Future for Timer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.ticks == 0 {
            Poll::Ready(())
        } else {
            self.get_mut().ticks -= 1;
            Poll::Pending
        }
    }
}

9. 高级:#[repr(C)] 嵌入 FFI

#[repr(C)]
pub struct CFuture {
    state: u32,
    callback: extern "C" fn(*mut ()),
}

impl Future for CFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.state == 1 {
            Poll::Ready(())
        } else {
            (self.callback)(cx as *mut _ as *mut ());
            Poll::Pending
        }
    }
}

10. 工具链:性能剖析

cargo +nightly build --release
perf record -g ./target/release/async_bench
perf annotate --stdio

11. 结论:Future 的五阶

阶次 技术 代码行数
一阶 async fn 1
二阶 手写执行器 100
三阶 tokio runtime 0
四阶 no_std Future 30
五阶 FFI 嵌入 50

当你能在 perf stat 里看到 CPU 利用率 800 %零系统调用阻塞
你就真正掌握了 Future 的终极奥义。🦀
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐