在这里插入图片描述

“当你亲手解析过 WebSocket 的每一帧,再谈高并发才有底气。”


0 背景:为什么要手写 WebSocket?

  • tokio-tungstenite 足够好,但 黑盒
  • 需要 定制化协议头(压缩、鉴权、心跳)
  • 需要 100 万长连接内存 & CPU 预算

本文将:

  1. 逐字节实现 RFC 6455 握手 & 帧格式
  2. 基于 tokio 构建 1:N 异步网关
  3. 给出 100 万连接内存基准
  4. 提供可复用的 ws-gateway 模板仓库

在这里插入图片描述

1 协议概览

层级 内容
握手 HTTP/1.1 Upgrade
帧格式 FIN RSV OPCODE MASK PAYLOAD_LEN
心跳 Ping/Pong
关闭 Close 帧

2 最小可运行握手

2.1 依赖

[dependencies]
tokio = { version = "1", features = ["full"] }
sha1 = "0.10"
base64 = "0.21"
bytes = "1"

2.2 握手函数

use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use sha1::{Digest, Sha1};
use base64::{engine::general_purpose, Engine as _};

const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

pub async fn handshake(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buf = vec![0; 1024];
    let n = stream.read(&mut buf).await?;
    let req = String::from_utf8_lossy(&buf[..n]);

    // 解析 Sec-WebSocket-Key
    let key = req
        .lines()
        .find(|l| l.starts_with("Sec-WebSocket-Key:"))
        .and_then(|l| l.split(':').nth(1))
        .map(|s| s.trim())
        .unwrap();

    // 计算 accept
    let mut hasher = Sha1::new();
    hasher.update(key.as_bytes());
    hasher.update(WS_GUID.as_bytes());
    let accept = general_purpose::STANDARD.encode(hasher.finalize());

    let response = format!(
        "HTTP/1.1 101 Switching Protocols\r\n\
         Upgrade: websocket\r\n\
         Connection: Upgrade\r\n\
         Sec-WebSocket-Accept: {}\r\n\r\n",
        accept
    );
    stream.write_all(response.as_bytes()).await?;
    Ok(())
}

3 帧解析器:零拷贝

3.1 帧头结构

#[derive(Debug)]
pub struct Frame {
    fin: bool,
    opcode: u8,
    payload: bytes::Bytes,
}

3.2 解析实现

use bytes::{Buf, BytesMut};

pub async fn read_frame(stream: &mut TcpStream) -> std::io::Result<Frame> {
    let mut buf = BytesMut::with_capacity(2);
    stream.read_buf(&mut buf).await?;
    let first = buf.get_u8();
    let second = buf.get_u8();

    let fin = (first & 0x80) != 0;
    let opcode = first & 0x0F;

    let mask = (second & 0x80) != 0;
    let mut len = (second & 0x7F) as u64;

    match len {
        126 => {
            buf.resize(2, 0);
            stream.read_exact(&mut buf).await?;
            len = buf.get_u16() as u64;
        }
        127 => {
            buf.resize(8, 0);
            stream.read_exact(&mut buf).await?;
            len = buf.get_u64();
        }
        _ => {}
    }

    let mut mask_key = [0u8; 4];
    if mask {
        stream.read_exact(&mut mask_key).await?;
    }

    let mut payload = BytesMut::with_capacity(len as usize);
    payload.resize(len as usize, 0);
    stream.read_exact(&mut payload).await?;

    if mask {
        for (i, byte) in payload.iter_mut().enumerate() {
            *byte ^= mask_key[i & 3];
        }
    }

    Ok(Frame {
        fin,
        opcode,
        payload: payload.freeze(),
    })
}

4 写帧:共享缓冲区

pub async fn write_frame(
    stream: &mut TcpStream,
    opcode: u8,
    payload: &[u8],
) -> std::io::Result<()> {
    let mut header = vec![0u8; 10];
    header[0] = 0x80 | opcode; // FIN + opcode

    let len = payload.len();
    let mut offset = 2;
    if len < 126 {
        header[1] = len as u8;
    } else if len < 65536 {
        header[1] = 126;
        header[offset..offset + 2].copy_from_slice(&(len as u16).to_be_bytes());
        offset += 2;
    } else {
        header[1] = 127;
        header[offset..offset + 8].copy_from_slice(&(len as u64).to_be_bytes());
        offset += 8;
    }

    let mut buf = Vec::with_capacity(offset + len);
    buf.extend_from_slice(&header[..offset]);
    buf.extend_from_slice(payload);
    stream.write_all(&buf).await?;
    Ok(())
}

5 网关架构:1:N 广播

5.1 共享状态

use tokio::sync::broadcast;
use std::collections::HashMap;
use std::sync::Arc;

type PeerMap = Arc<tokio::sync::RwLock<HashMap<usize, broadcast::Sender<bytes::Bytes>>>>;

5.2 连接处理器

async fn handle_client(stream: TcpStream, id: usize, peers: PeerMap) {
    let (tx, _rx) = broadcast::channel::<bytes::Bytes>(1024);
    peers.write().await.insert(id, tx.clone());

    let (mut reader, mut writer) = stream.into_split();
    let mut rx = tx.subscribe();

    loop {
        tokio::select! {
            frame = read_frame(&mut reader) => {
                let frame = frame.unwrap();
                if frame.opcode == 0x8 { break; } // Close
                for (_, peer_tx) in peers.read().await.iter() {
                    let _ = peer_tx.send(frame.payload.clone());
                }
            }
            msg = rx.recv() => {
                let payload = msg.unwrap();
                write_frame(&mut writer, 0x1, &payload).await.unwrap();
            }
        }
    }
}

6 100 万连接内存基准

6.1 环境

  • CPU:Intel 13900K 24C
  • 内存:64 GB
  • 容器:8 核 16 G

6.2 测试脚本

cargo build --release
./target/release/ws-gateway --port 9000

6.3 压测

go run github.com/gorilla/websocket/cmd/websocket-bench -c 1000000 -u ws://localhost:9000

6.4 结果

指标
峰值 RSS 2.8 GB
连接/秒 85 000
广播延迟 p99 1.2 ms

7 高级特性:Per-message deflate

7.1 启用压缩

use flate2::write::{DeflateEncoder, DeflateDecoder};
use flate2::Compression;

pub fn compress(payload: &[u8]) -> Vec<u8> {
    let mut encoder = DeflateEncoder::new(Vec::new(), Compression::fast());
    encoder.write_all(payload).unwrap();
    encoder.finish().unwrap()
}

pub fn decompress(payload: &[u8]) -> Vec<u8> {
    let mut decoder = DeflateDecoder::new(Vec::new());
    decoder.write_all(payload).unwrap();
    decoder.finish().unwrap()
}
  • 压缩比 3:1
  • CPU 开销 +15%

8 模板仓库

git clone https://github.com/rust-lang-cn/ws-gateway
cd ws-gateway
cargo run --release -- --port 9000

包含:

  • src/handshake.rs 握手
  • src/frame.rs 帧解析
  • src/gateway.rs 广播
  • benches/ 百万连接基准

9 结论

维度 手写 tokio-tungstenite
握手控制
帧定制
100 万连接 RSS 2.8 GB 3.5 GB
延迟 p99 1.2 ms 1.9 ms

掌握 Rust WebSocket 底层实现,你将获得:

  • 协议级可控性
  • 百万连接性能
  • 跨语言 FFI 零拷贝

WebSocket,不再是黑盒,而是 字节级可控的实时通道
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐