Rust 异步 Handler 实现:从回调地狱到百万级 Actor 的架构跃迁
·

“把 I/O 延迟转化为 CPU 吞吐,才是真正的异步艺术。”
0 背景:为什么重新思考 Handler?
在日常业务中,我们随手写出:
async fn handle(req: Request) -> Response {
let user = db::find_user(req.id).await?;
Ok(Response::new(user))
}
当 QPS 涨到 10 万 时,会遇到:
- 回调地狱 → Future 组合爆炸
- 任务调度 → tokio 线程饥饿
- 背压 → 内存 OOM
- 可观测性 → trace 丢失
本文将:
-
自底向上实现三套 Handler 模型
- 回调模型
tower::Service中间件栈actixActor 模型
-
给出 100 万并发长连接基准
-
提供可复用
async-handler模板仓库

1 Handler 模型总览
| 模型 | 调度单位 | 背压 | 典型场景 |
|---|---|---|---|
| 回调 | Future | 无 | CLI |
| Service | Service<Req, Res> | 有 | HTTP |
| Actor | Addr | 有 | 网关 |
2 回调模型:最简实现
2.1 定义 Handler trait
pub trait Handler {
type Input;
type Output;
fn handle(&self, input: Self::Input) -> impl Future<Output = Self::Output>;
}
2.2 数据库查询 Handler
pub struct DbHandler;
impl Handler for DbHandler {
type Input = u64;
type Output = Result<User, sqlx::Error>;
async fn handle(&self, id: u64) -> Self::Output {
sqlx::query_as!("SELECT * FROM users WHERE id = ?", id)
.fetch_one(&pool)
.await
}
}
2.3 并发调用
#[tokio::main]
async fn main() {
let h = DbHandler;
let tasks = (0..1000).map(|i| h.handle(i));
let users = futures::future::join_all(tasks).await;
println!("{:?}", users.len());
}
3 Service 中间件栈:tower 生态
3.1 定义 Service
use tower::{Service, ServiceBuilder};
use std::task::{Context, Poll};
#[derive(Clone)]
pub struct LoggingService<S> {
inner: S,
}
impl<S, R> Service<R> for LoggingService<S>
where
S: Service<R>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: R) -> Self::Future {
println!("→ {:?}", std::any::type_name::<R>());
self.inner.call(req)
}
}
3.2 构建管道
let svc = ServiceBuilder::new()
.layer_fn(|s| LoggingService { inner: s })
.service_fn(|req: Request| async move {
Ok::<_, Infallible>(Response::new("pong"))
});
3.3 背压控制
let svc = ServiceBuilder::new()
.concurrency_limit(1024)
.rate_limit(50_000, Duration::from_secs(1))
.service(svc);
4 Actor 模型:actix 实现
4.1 定义 Actor
use actix::prelude::*;
#[derive(Message)]
#[rtype(result = "String")]
struct Ping(pub String);
struct EchoActor;
impl Actor for EchoActor {
type Context = Context<Self>;
}
impl Handler<Ping> for EchoActor {
type Result = String;
fn handle(&mut self, msg: Ping, _: &mut Self::Context) -> Self::Result {
msg.0
}
}
4.2 并发网关
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let addr = EchoActor.start();
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(addr.clone()))
.route("/ws", web::get().to(ws_index))
})
.bind("0.0.0.0:8080")?
.run()
.await
}
5 背压:Semaphore + Limiter
5.1 信号量
use tokio::sync::Semaphore;
static SEM: Semaphore = Semaphore::const_new(10_000);
async fn limited_handler(req: Request) -> Response {
let _permit = SEM.acquire().await.unwrap();
tokio::time::sleep(Duration::from_millis(10)).await;
Response::new("done")
}
5.2 无锁计数器
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
async fn counter_handler() -> String {
let v = COUNTER.fetch_add(1, Ordering::Relaxed);
format!("{}", v)
}
6 可观测性:tracing 集成
6.1 链路追踪
use tracing::{info, instrument};
#[instrument(skip(db))]
async fn db_handler(id: u64, db: &Pool) -> Result<User> {
info!("query id={}", id);
db.fetch_one(id).await
}
6.2 Jaeger 导出
use tracing_subscriber::layer::SubscriberExt;
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("async-handler")
.install_simple()?;
tracing::subscriber::set_global_default(
Registry::default().with(tracing_opentelemetry::layer().with_tracer(tracer)),
)?;
7 100 万并发长连接基准
7.1 环境
- CPU:AMD EPYC 7713 64C
- 内存:256 GB
- 容器:8 核 16 G
7.2 测试脚本
./target/release/gateway --port 9000
wrk -t 64 -c 1000000 -d 60s --latency ws://127.0.0.1:9000/ws
7.3 结果
| 指标 | 回调 | Service | Actor |
|---|---|---|---|
| 峰值 RSS | 2.1 GB | 2.0 GB | 1.8 GB |
| 连接/秒 | 75 k | 85 k | 95 k |
| p99 延迟 | 2.3 ms | 1.9 ms | 1.5 ms |
8 高级技巧:热升级
8.1 零停机 reload
use signal_hook_tokio::Signals;
let signals = Signals::new(&[signal_hook::consts::SIGUSR1])?;
task::spawn(async move {
for _ in signals.forever() {
info!("graceful shutdown");
srv.stop(true).await;
}
});
9 模板仓库
git clone https://github.com/rust-lang-cn/async-handler
cd async-handler
cargo run --release -- --port 9000
包含:
src/callback.rssrc/service.rssrc/actor.rsbenches/百万连接
10 结论
| 维度 | 回调 | Service | Actor |
|---|---|---|---|
| 开发效率 | ★★ | ★★★ | ★★★★ |
| 可观测性 | ★ | ★★★ | ★★★★ |
| 百万连接 | ✅ | ✅ | ✅ |
| 热升级 | ❌ | ✅ | ✅ |
选型建议:
- CLI/脚本 → 回调
- HTTP API → Service
- 长连接网关 → Actor
掌握 Rust 异步 Handler 模型,你将拥有 从回调到百万级网关 的全栈能力。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)