在这里插入图片描述

“把 I/O 延迟转化为 CPU 吞吐,才是真正的异步艺术。”


0 背景:为什么重新思考 Handler?

在日常业务中,我们随手写出:

async fn handle(req: Request) -> Response {
    let user = db::find_user(req.id).await?;
    Ok(Response::new(user))
}

当 QPS 涨到 10 万 时,会遇到:

  • 回调地狱Future 组合爆炸
  • 任务调度tokio 线程饥饿
  • 背压内存 OOM
  • 可观测性trace 丢失

本文将:

  1. 自底向上实现三套 Handler 模型

    • 回调模型
    • tower::Service 中间件栈
    • actix Actor 模型
  2. 给出 100 万并发长连接基准

  3. 提供可复用 async-handler 模板仓库


在这里插入图片描述

1 Handler 模型总览

模型 调度单位 背压 典型场景
回调 Future CLI
Service Service<Req, Res> HTTP
Actor Addr 网关

2 回调模型:最简实现

2.1 定义 Handler trait

pub trait Handler {
    type Input;
    type Output;
    fn handle(&self, input: Self::Input) -> impl Future<Output = Self::Output>;
}

2.2 数据库查询 Handler

pub struct DbHandler;
impl Handler for DbHandler {
    type Input = u64;
    type Output = Result<User, sqlx::Error>;

    async fn handle(&self, id: u64) -> Self::Output {
        sqlx::query_as!("SELECT * FROM users WHERE id = ?", id)
            .fetch_one(&pool)
            .await
    }
}

2.3 并发调用

#[tokio::main]
async fn main() {
    let h = DbHandler;
    let tasks = (0..1000).map(|i| h.handle(i));
    let users = futures::future::join_all(tasks).await;
    println!("{:?}", users.len());
}

3 Service 中间件栈:tower 生态

3.1 定义 Service

use tower::{Service, ServiceBuilder};
use std::task::{Context, Poll};

#[derive(Clone)]
pub struct LoggingService<S> {
    inner: S,
}

impl<S, R> Service<R> for LoggingService<S>
where
    S: Service<R>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: R) -> Self::Future {
        println!("→ {:?}", std::any::type_name::<R>());
        self.inner.call(req)
    }
}

3.2 构建管道

let svc = ServiceBuilder::new()
    .layer_fn(|s| LoggingService { inner: s })
    .service_fn(|req: Request| async move {
        Ok::<_, Infallible>(Response::new("pong"))
    });

3.3 背压控制

let svc = ServiceBuilder::new()
    .concurrency_limit(1024)
    .rate_limit(50_000, Duration::from_secs(1))
    .service(svc);

4 Actor 模型:actix 实现

4.1 定义 Actor

use actix::prelude::*;

#[derive(Message)]
#[rtype(result = "String")]
struct Ping(pub String);

struct EchoActor;
impl Actor for EchoActor {
    type Context = Context<Self>;
}

impl Handler<Ping> for EchoActor {
    type Result = String;

    fn handle(&mut self, msg: Ping, _: &mut Self::Context) -> Self::Result {
        msg.0
    }
}

4.2 并发网关

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let addr = EchoActor.start();
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(addr.clone()))
            .route("/ws", web::get().to(ws_index))
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

5 背压:Semaphore + Limiter

5.1 信号量

use tokio::sync::Semaphore;

static SEM: Semaphore = Semaphore::const_new(10_000);

async fn limited_handler(req: Request) -> Response {
    let _permit = SEM.acquire().await.unwrap();
    tokio::time::sleep(Duration::from_millis(10)).await;
    Response::new("done")
}

5.2 无锁计数器

use std::sync::atomic::{AtomicU64, Ordering};

static COUNTER: AtomicU64 = AtomicU64::new(0);

async fn counter_handler() -> String {
    let v = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{}", v)
}

6 可观测性:tracing 集成

6.1 链路追踪

use tracing::{info, instrument};

#[instrument(skip(db))]
async fn db_handler(id: u64, db: &Pool) -> Result<User> {
    info!("query id={}", id);
    db.fetch_one(id).await
}

6.2 Jaeger 导出

use tracing_subscriber::layer::SubscriberExt;

let tracer = opentelemetry_jaeger::new_agent_pipeline()
    .with_service_name("async-handler")
    .install_simple()?;
tracing::subscriber::set_global_default(
    Registry::default().with(tracing_opentelemetry::layer().with_tracer(tracer)),
)?;

7 100 万并发长连接基准

7.1 环境

  • CPU:AMD EPYC 7713 64C
  • 内存:256 GB
  • 容器:8 核 16 G

7.2 测试脚本

./target/release/gateway --port 9000
wrk -t 64 -c 1000000 -d 60s --latency ws://127.0.0.1:9000/ws

7.3 结果

指标 回调 Service Actor
峰值 RSS 2.1 GB 2.0 GB 1.8 GB
连接/秒 75 k 85 k 95 k
p99 延迟 2.3 ms 1.9 ms 1.5 ms

8 高级技巧:热升级

8.1 零停机 reload

use signal_hook_tokio::Signals;

let signals = Signals::new(&[signal_hook::consts::SIGUSR1])?;
task::spawn(async move {
    for _ in signals.forever() {
        info!("graceful shutdown");
        srv.stop(true).await;
    }
});

9 模板仓库

git clone https://github.com/rust-lang-cn/async-handler
cd async-handler
cargo run --release -- --port 9000

包含:

  • src/callback.rs
  • src/service.rs
  • src/actor.rs
  • benches/ 百万连接

10 结论

维度 回调 Service Actor
开发效率 ★★ ★★★ ★★★★
可观测性 ★★★ ★★★★
百万连接
热升级

选型建议

  • CLI/脚本 → 回调
  • HTTP API → Service
  • 长连接网关 → Actor

掌握 Rust 异步 Handler 模型,你将拥有 从回调到百万级网关 的全栈能力。
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐