io_uring原理与实践
io_uring原理与实践
1.引言
在本篇博客中,我们将从基础原理出发,带你了解io_uring的结构和工作流程,并最终通过搭建一个 Echo TCP 服务器,把理论与实践结合,让你在动手中真正理解异步 I/O 的威力。
2.五种I/O模型
所有的I/O,都会有以下两个阶段:
- 数据准备阶段。
- 数据拷贝阶段。
因为在这两个阶段上,有不同的行为,所以就出现了多种I/O模型。
Linux中,有5中I/O模型:
- 阻塞I/O: 调用read函数之后,线程就卡主了,一直等到数据到来。数据到来后,还需要等内核把数据拷贝到用户进程缓冲区。在等过过程中,线程什么也干不了。
- 非阻塞I/O: 调用read函数之后,如果没有数据,就会立即返回。因为不知道数据什么时候准备好,所以一种方案是:线程不断询问内核数据是否准备就绪。这么做的不好之处在于,消耗CPU。如果数据准备就绪了,内核会把数据拷贝到用户态的进程空间,但是,在拷贝的过程中,依然会阻塞线程。另一种告诉线程数据就绪的方案就是事件驱动的方式。
- I/O多路复用: 允许一个线程监听成千上万个fd,比如epoll。
- 信号驱动I/O: 基本不用。
- 异步I/O: 调用read函数之后,立即返回。线程可以去做其他的事情,内核将数据拷贝到用户空间后,会发信号通知该线程,全程都不会阻塞线程。

上图是一个阻塞的I/O调用。在等待数据就绪阶段和等待内核将数据拷贝到用户空间阶段,线程一直被挂起等待。
下面,我们对比一下非阻塞I/O。

通过以上两幅图,我们可以看出阻塞I/O和非阻塞I/O的共同点与区别:
- 区别:数据未准备就绪时,阻塞I/O挂起等待,而非阻塞I/O立即返回。
- 共同点:数据准备就绪后,都需要
等待内核将数据拷贝到用户空间。
总而言之,阻塞与非阻塞,描述的是调用会不会立即返回;同步与异步,描述的是是否需要等待内核将数据拷贝到用户空间。
所以,上述的5中I/O模型中,前四种都是同步I/O。下面要介绍的io_uring,是一种异步I/O。
3.io_uring的基本原理

io_uring 主要组件有提交队列SQ、SQE数组和完成队列CQ。这三大组件都是在内存映射段中创建,通过内存映射技术,避免用户态和内核态之间的数据拷贝。
- 提交队列 (Submission Queue, SQ) :是一个
环形队列,它不直接存储SQE任务本身,而是存SQE的索引。 - SQE 提交队列项数组(SQE Array):和SQ配套使用,真正存储任务的地方。
- 完成队列(CQ / Completion Queue):CQ也是一个
环形队列,它直接存储CQE结果本身。
SQE的结构:
struct io_uring_sqe {
__u8 opcode; // 操作类型:IORING_OP_ACCEPT / READ / WRITE
__u8 flags;
__u16 ioprio;
__s32 fd; // 要操作的文件描述符(socket/文件)
__u64 off; // 文件偏移(socket用不到)
__u64 addr; // 数据缓冲区地址(读/写时用)
__u32 len; // 数据长度
__u32 rw_flags;
__u64 user_data; // 你自己的标记,内核原封不动还给你
};
我们把要做的操作(读/写/ACCEPT)填进SQE,然后把它的索引放到SQ队列里,内核通过SQ队列里的所以就可以找到它,然后执行。
CQE的结构:
struct io_uring_cqe {
__u64 user_data; // 从 SQE 里原封不动带回来的标记
__s32 res; // 执行结果:字节数 / 新 fd / 错误码
__u32 flags;
};
为什么需要内存映射:
传统的系统调用每次读写都要进行用户态到内核态的来回切换,而且还会在用户空间和内核空间之间来回拷贝数据。这样频繁的切换和拷贝开销大,通过内存映射技术,用户和内核共享同一段内存,双方都可以直接访问,不需要每次都调用系统调用。
4.liburing常用接口介绍
liburing 是 Linux 官方提供的用户态库,封装了内核的 io_uring 接口,简化了提交、完成、管理队列的操作。
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags)
- 功能:初始化一个io_uring实例,并在内存映射段建立
提交队列(SQ)、完成队列(CQ)和SQE数组。 - entries:指定提交队列的大小,决定了一次能处理多少个请求。
- ring:指向 struct io_uring 结构体的指针,初始化成功后,该结构体将保存队列的元数据和共享内存映射信息。
- flags:标志位,通常置为0。
- 返回值:成功返回0,失败返回错误码。
void io_uring_queue_exit(struct io_uring *ring)
- 功能:清理并销毁io_uring实例,释放资源。
- ring:要被清理的io_uring实例。
struct io_uring_sqe io_uring_get_sqe(struct io_uring *ring)
- 功能:获取一个空的或者说空闲的SQE(任务单),但是并不填表。
- ring:说明要从哪一个io_uring中获取SQE。
- 返回值:成功则返回一个指向
io_uring_sqe的结构体指针,失败(队列满了)则返回空。
void io_uring_prep_accept(struct io_uring_sqe *sqe, int sockfd, struct sockaddr *addr, socklen_t addrlen, int flags)
- 功能:填写
任务单SQE,具体是填写一个用来处理网络连接的任务单,后续通过其他函数交给内核。 - sqe:任务单指针。
- sockfd:监听描述符listen_fd。
- addr:用于存放客户端地址信息的结构体指针。
- flags:标志位,通常置为0。
void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf, unsigned nbytes, off_t offset)
- 功能:填写读事件的任务单,后续由其他函数来提交给内核。
- seq:任务单指针。
- fd:要读取的文件描述符。
- buf:存放读取数据的用户空间缓冲区指针。
- nbytes:期望读取的字节数。
- offset:文件内的读取偏移量。
void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf, unsigned nbytes, off_t offset)
- 功能:填写写事件的任务单,后续由其他函数来提交给内核。
- sqe:任务单指针。
- fd:要写入的文件描述符。
- buf:写入的位置。
- nbytes:写入的字节数。
- offset:文件内部的写入偏移量。
void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data)
- 功能:将用户自定义的指针绑定到SQE上,内核会原封不动的把这份数据写入到CQE,用户拿到之后就可以知道它关联的数据。
- seq:关联的SQE。
- data:任意类型的用户数据指针。
int io_uring_submit(struct io_uring *ring)
- 功能:将已经写好的SQE任务单批量提交给内核,也就是放入到提交队列SQ。
- ring:指明哪个io_uring实例。
- 返回值:成功返回实际提交给内核的SQE数量,失败返回负的错误码。
int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
- 功能:从完成队列中取出一个CQE,如果完成队列为空,就会阻塞等待。
- ring:指明是哪个io_uring实例。
- cqe_ptr:二级指针,用来接收内核返回的CQE地址。
- 返回值:成功返回0,失败返回负的错误码。
void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe)
- 功能:告诉内核,这个CQE已经被处理完毕,可以清理或者复用。
- ring:io_uring的实例。
- cqe:告诉哪一个CQE已经被处理完毕。
void *io_uring_cqe_get_data(const struct io_uring_cqe *cqe)
- 功能:拿回之前通过io_uring_set_data设置进去的用户数据指针。
- cqe:指明哪一个CQE,如果事件完成后,内核会原封不动的把我们设置进去的数据指针保存到CQE中,这样我们就可以通过这个CQE拿到。
- 返回值:通过io_uring_set_data设置进去的用户数据指针。
以上的函数,结合上面的数据流向图理解,就很清晰了。
5.io_uring实现TCP服务器
实现的是一个单线程的Echo服务器。
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <unistd.h>
#include <liburing.h>
#define PORT 8080
#define BACKLOG 128
#define MAXENTRIES 128
#define BUF_SIZE 1024
enum event_type {
EVENT_ACCEPT,
EVENT_READ,
EVENT_WRITE
};
struct conn_data {
enum event_type type;
int fd;
char buf[BUF_SIZE];
};
int create_listen_socket() {
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (listen_fd < 0) {
printf("[%s:%d]create socket failed!\n", __func__, __LINE__);
return -1;
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(PORT);
if (bind(listen_fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
printf("[%s:%d]bind failed!\n", __func__, __LINE__);
close(listen_fd);
return -1;
}
if (listen(listen_fd, BACKLOG) == -1) {
printf("[%s:%d]listen failed!\n", __func__, __LINE__);
close(listen_fd);
return -1;
}
// printf("[%s:%d]begin listen...\n", __func__, __LINE__);
return listen_fd;
}
void submit_accept(struct io_uring *ring, int listen_fd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) return;
struct conn_data *accept_data = (struct conn_data *)malloc(sizeof(struct conn_data));
accept_data->fd = listen_fd;
accept_data->type = EVENT_ACCEPT;
io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
io_uring_sqe_set_data(sqe, accept_data);
io_uring_submit(ring);
}
void submit_read(struct io_uring *ring, int client_fd) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) return;
struct conn_data *data = (struct conn_data *)malloc(sizeof(struct conn_data));
data->type = EVENT_READ;
data->fd = client_fd;
io_uring_prep_read(sqe, client_fd, data->buf, BUF_SIZE, 0);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(ring);
}
void submit_write(struct io_uring *ring, struct conn_data *data, int nbytes) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
if (!sqe) return;
data->type = EVENT_WRITE;
io_uring_prep_write(sqe, data->fd, data->buf, nbytes, 0);
io_uring_sqe_set_data(sqe, data);
io_uring_submit(ring);
}
int main() {
int listen_fd = create_listen_socket();
if (listen_fd == -1) return -1;
struct io_uring ring;
int ret = io_uring_queue_init(MAXENTRIES, &ring, 0);
if (ret != 0) {
close(listen_fd);
printf("[%s:%d]io_uring_queue_init faild!\n", __func__, __LINE__);
return -1;
}
submit_accept(&ring, listen_fd);
while (1) {
struct io_uring_cqe *cqe;
ret = io_uring_wait_cqe(&ring, &cqe);
if (ret != 0) {
printf("[%s:%d]wait faild!\n", __func__, __LINE__);
break;
}
int res = cqe->res;
struct conn_data *data = (struct conn_data *)io_uring_cqe_get_data(cqe);
if (data->type == EVENT_ACCEPT) {
if (res >= 0) {
// printf("[%s:%d]New client fd = %d\n", __func__, __LINE__, res);
submit_read(&ring, res);
}
submit_accept(&ring, listen_fd);
}else if (data->type == EVENT_READ) {
if (res > 0) {
// printf("[%s:%d]recv[fd=%d]: %s\n", __func__, __LINE__, data->fd, data->buf);
submit_write(&ring, data, res);
}else {
close(data->fd);
// printf("[%s:%d][fd=%d]connect close!\n", __func__, __LINE__, data->fd);
free(data);
}
}else if (data->type == EVENT_WRITE) {
if (res > 0) submit_read(&ring, data->fd);
else close(data->fd);
free(data);
}
io_uring_cqe_seen(&ring, cqe);
}
close(listen_fd);
io_uring_queue_exit(&ring);
return 0;
}
6.性能测试与对比
下面是我自己测的一组io_uring搭建的TCP服务器和epoll搭建的TCP服务器,都是单线程的,后面附上epoll代码和测试代码。
测试请求数量固定为50万。
| 并发数 | 模型 | 耗时 | 性能指标 |
|---|---|---|---|
| 1000 | epoll | 155.40 us | 6435.2 |
| 1000 | io_uring | 312.87 us | 3196.22 |
| 3000 | epoll | 475.84 us | 2101.55 |
| 3000 | io_uring | 1040.15 us | 961.4 |
| 5000 | epoll | 936.32 us | 1068.01 |
| 5000 | io_uring | 1956.29 us | 511.17 |
通过测试结果我们可以看到,epoll的性能在这些场景下都是要更优的。原因在于这些场景不涉及深度的IO。io_uring的优势在于高并发、大数据量。
epoll搭建的TCP服务器代码:
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <errno.h>
#include <asm-generic/socket.h>
#include <signal.h>
#define SERVER_PORT 8080
#define BUFFER_SIZE 128
#define BACKLOG 128
#define MAX_EVENTS 128
int set_noblocking(int fd) {
int flag = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flag | O_NONBLOCK);
return 0;
}
int main() {
signal(SIGPIPE, SIG_IGN);
int listen_fd;
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1) {
perror("socket创建失败");
return -1;
}
int opt = 1;
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(SERVER_PORT);
if (bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定失败");
close(listen_fd);
return -1;
}
if (listen(listen_fd, BACKLOG) == -1) {
perror("监听失败");
close(listen_fd);
return -1;
}
set_noblocking(listen_fd);
int epfd = epoll_create(1);
// printf("epfd: %d\n", epfd);
if (epfd == -1) {
perror("epoll create失败");
close(listen_fd);
return -1;
}
struct epoll_event event;
event.data.fd = listen_fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event);
struct epoll_event events[MAX_EVENTS];
while (1) {
int nready = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (nready == -1) {
perror("epoll wait错误");
break;
}
for (int i = 0; i < nready; i++) {
int curr_fd = events[i].data.fd;
if (curr_fd == listen_fd) {
while (1) {
struct sockaddr_in conn_addr;
socklen_t sock_len = sizeof(conn_addr);
int new_fd = accept(listen_fd, (struct sockaddr*)&conn_addr, &sock_len);
if (new_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("accept 错误");
break;
}
set_noblocking(new_fd);
struct epoll_event event;
event.data.fd = new_fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &event);
// printf("新连接到来\n");
}
}else if (events[i].events & EPOLLIN){
char buffer[BUFFER_SIZE] = { 0 };
ssize_t read_len = 0;
while (1) {
read_len = read(curr_fd, buffer, BUFFER_SIZE - 1);
if (read_len == 0) break;
if (read_len == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("连接断开");
close(curr_fd);
break;
}
buffer[read_len] = '\0';
// printf("收到%d客户端数据:%s\n", curr_fd, buffer);
ssize_t sent = send(curr_fd, buffer, sizeof(buffer), 0);
if (sent == -1) {
// perror("发送数据失败");
close(curr_fd);
}
}
}
}
}
close(epfd);
close(listen_fd);
return 0;
}
测试代码:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#define CONN_COUNT 5000 // 并发连接数,可改 1000/3000/5000/10000
#define TOTAL 500000 // 总请求数
#define IP "192.168.121.128"
#define PORT 8080
#define MSG "ping\n"
int socks[CONN_COUNT];
int cur = 0;
// 创建一个 TCP 连接
int create_conn() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &addr.sin_addr);
if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
perror("connect");
exit(1);
}
return fd;
}
// 初始化所有连接
void init_conns() {
for (int i = 0; i < CONN_COUNT; i++) {
socks[i] = create_conn();
}
}
// 轮询用一个连接发送
void send_one() {
int fd = socks[cur++ % CONN_COUNT];
send(fd, MSG, strlen(MSG), 0);
}
// 从任意连接读取回复
void recv_one() {
char buf[64];
for (int i = 0; i < CONN_COUNT; i++) {
int fd = socks[i];
ssize_t n = recv(fd, buf, 64, MSG_DONTWAIT);
if (n > 0) return;
}
}
int main() {
printf("创建连接...\n");
init_conns();
printf("开始压测...\n");
long long start = clock();
for (int i = 0; i < TOTAL; i++) {
send_one();
recv_one();
// 实时打印进度,不会看起来卡住
if (i % 10000 == 0) {
printf("进度: %d/%d\n", i, TOTAL);
}
}
long long end = clock();
double use = (end - start) * 1000.0 / CLOCKS_PER_SEC;
double qps = TOTAL / (use / 1000.0);
double rtt = use * 1000.0 / TOTAL;
printf("\n==== 结果 ====\n");
printf("并发: %d\n", CONN_COUNT);
printf("总请求: %d\n", TOTAL);
printf("耗时: %.2f ms\n", use);
printf("RTT: %.2f us\n", rtt);
printf("QPS: %.2f\n", qps);
for (int i = 0; i < CONN_COUNT; i++) close(socks[i]);
return 0;
}
7.结语
欢迎批评指正!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)