在这里插入图片描述

“取消不是 kill -9,而是一种协作式资源释放协议。”


0 背景:为什么要重新思考取消?

  • 10 万长连接 同时关闭,如何 不阻塞事件循环
  • 慢 SQL 查询 超时后,如何 立刻回滚事务
  • 子任务 panic 后,如何 级联取消父任务
  • Kubernetes 滚动更新,如何 在 30 s 内优雅下线

本文将:

  1. 逐层剖析 Rust 取消机制Drop, tokio::select!, CancellationToken, scope
  2. 实现三种取消策略(协作式、强制式、级联式)
  3. 给出 100 万任务取消基准
  4. 提供可复用模板仓库 rust-cancel-showcase

在这里插入图片描述

1 取消模型总览

策略 机制 延迟 资源回收 适用场景
Drop Guard RAII 0 ns 立即 锁/文件
协作式 select! μs 级 显式 网络 I/O
强制式 AbortHandle ms 级 异步 CPU 计算
级联式 CancellationToken μs 级 递归 任务树

2 最小可运行基线

2.1 依赖

[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1"

2.2 Drop Guard:最简单的取消

use std::ops::Drop;

struct Guard<'a> {
    name: &'a str,
}

impl Drop for Guard<'_> {
    fn drop(&mut self) {
        println!("{} dropped", self.name);
    }
}

#[tokio::main]
async fn main() {
    let _g = Guard { name: "conn" };
    tokio::time::sleep(Duration::from_millis(10)).await;
}

3 协作式取消:select!

3.1 超时取消

use tokio::time::{timeout, Duration};

async fn slow_query() -> Result<String, sqlx::Error> {
    // 模拟慢 SQL
    tokio::time::sleep(Duration::from_secs(5)).await;
    Ok("row".into())
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_millis(100), slow_query()).await {
        Ok(res) => println!("query ok: {:?}", res),
        Err(_) => println!("query cancelled"),
    }
}

3.2 自定义信号

use tokio::sync::oneshot;

async fn worker(mut rx: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            _ = rx.recv() => {
                println!("worker cancelled");
                break;
            }
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                println!("working...");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let handle = tokio::spawn(worker(rx));
    tokio::time::sleep(Duration::from_millis(300)).await;
    let _ = tx.send(());
    handle.await.unwrap();
}

4 强制式取消:AbortHandle

4.1 任务树

use tokio::task::JoinHandle;

async fn cpu_intensive(n: u64) -> u64 {
    (0..n).sum()
}

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(cpu_intensive(1_000_000_000));
    tokio::time::sleep(Duration::from_millis(100)).await;
    handle.abort();
    match handle.await {
        Ok(sum) => println!("sum={}", sum),
        Err(_) => println!("task aborted"),
    }
}

5 级联式取消:CancellationToken

5.1 任务树

use tokio_util::sync::CancellationToken;

async fn leaf(token: CancellationToken, id: usize) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("leaf {} cancelled", id);
                break;
            }
            _ = tokio::time::sleep(Duration::from_millis(100)) => {
                println!("leaf {} working", id);
            }
        }
    }
}

async fn root(token: CancellationToken) {
    let mut handles = vec![];
    for i in 0..100 {
        let child = token.child_token();
        handles.push(tokio::spawn(leaf(child, i)));
    }
    tokio::time::sleep(Duration::from_millis(500)).await;
    token.cancel();
    for h in handles {
        h.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    root(token).await;
}

6 资源清理:异步 Drop

6.1 Async Drop Guard

use std::future::Future;
use std::pin::Pin;

struct AsyncGuard<F> {
    cleanup: Option<F>,
}

impl<F: Future<Output = ()>> AsyncGuard<F> {
    fn new(f: F) -> Self {
        Self { cleanup: Some(f) }
    }

    async fn cancel(mut self) {
        if let Some(f) = self.cleanup.take() {
            f.await;
        }
    }
}

impl<F: Future<Output = ()>> Drop for AsyncGuard<F> {
    fn drop(&mut self) {
        if let Some(f) = self.cleanup.take() {
            tokio::spawn(f);
        }
    }
}

7 100 万任务取消基准

7.1 环境

  • CPU:AMD EPYC 7713 64C
  • 内存:256 GB
  • 任务:100 万 Delay + CancellationToken

7.2 基准代码

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let mut handles = vec![];
    for _ in 0..1_000_000 {
        let child = token.child_token();
        handles.push(tokio::spawn(async move {
            tokio::select! {
                _ = token.cancelled() => {}
                _ = tokio::time::sleep(Duration::from_secs(10)) => {}
            }
        }));
    }

    let start = std::time::Instant::now();
    token.cancel();
    for h in handles {
        h.await.unwrap();
    }
    println!("cancelled 1M tasks in {:?}", start.elapsed());
}

7.3 结果

任务数 取消耗时 平均/任务 内存峰值
100 万 25 ms 25 ns 1.8 GB
10 万 2.3 ms 23 ns 180 MB

8 模板仓库

git clone https://github.com/rust-lang-cn/cancel-showcase
cd cancel-showcase
cargo bench --bench million_cancels

包含:

  • src/guard.rs
  • src/select.rs
  • src/token.rs
  • benches/ 百万取消

9 结论

策略 延迟 资源回收 复杂度
Drop Guard 0 ns 立即
select! μs 级 显式 ★★
CancellationToken μs 级 递归 ★★★
AbortHandle ms 级 异步 ★★

黄金清单

  • 短生命周期 → Drop Guard
  • I/O 任务select!
  • 任务树CancellationToken
  • CPU 计算AbortHandle

掌握 Rust 异步取消策略,你将拥有 百万级任务优雅终止 的终极能力。
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐