io_uring详解
·
一、定义:io_uring是什么?
定义: io_uring是Linux的一种高性能的异步I/O框架
二、诞生的背景:为什么需要 io_uring?
在 io_uring 出现之前,Linux 上主要有两种处理高并发 I/O 的方式:
-
epoll+ 阻塞 I/O (Reactor 模式):- 优点:成熟、稳定、应用广泛。
- 缺点:本质上是 同步 I/O。当
epoll_wait返回某个 fd 可读时,应用程序仍需调用read()系统调用去读取数据。这个read()调用在内核中可能会因为等待磁盘或网络而阻塞(虽然用户线程不阻塞,但内核线程会)。更重要的是,每次 I/O 操作都需要一次完整的系统调用,带来了上下文切换和 CPU 开销。在极高并发场景下,这些开销会成为瓶颈。
-
POSIX AIO (
aio_read,aio_write):- 优点:理论上是真正的异步 I/O。
- 缺点:Linux 内核的 POSIX AIO 实现在用户空间通过线程池模拟,性能极差,且存在诸多限制(如只能用于 Direct I/O,不能用于 Buffered I/O),几乎无人在生产环境中使用。
io_uring 的诞生正是为了解决上述痛点:
- 消除不必要的系统调用:通过共享内存队列,提交和获取 I/O 结果几乎不需要系统调用。
- 真正的内核级异步 I/O:I/O 请求完全由内核接管和执行,应用程序无需关心底层细节。
- 支持 Buffered I/O:这是对 POSIX AIO 的巨大超越,使得绝大多数现有应用都能受益。
- 统一接口:不仅能处理文件 I/O,还能处理网络 I/O (
accept,connect,send,recv)、轮询 (poll)、定时器 (timeout) 等,成为一个通用的异步事件框架。
三、工作原理:如何做到如此高效?
io_uring工作流程图

上述流程图是io_uring基本的工作框架,详细内容如下
io_uring 的高效性源于其精妙的 “共享内存 + 无锁环形缓冲区” 设计。它主要包含两个核心数据结构:
1. 提交队列 (Submission Queue, SQ)
- 作用:用户空间向内核提交 I/O 请求的地方。
- 实现:一个位于用户空间和内核空间共享的内存区域中的环形缓冲区。
- 工作流程:
- 用户程序调用
io_uring_prep_*系列函数(如io_uring_prep_read),在 SQ 的一个空闲槽位(slot)中准备好 I/O 请求的描述(操作类型、fd、buffer地址、offset等)。 - 用户程序更新 SQ 的
tail指针(一个原子变量),表示有新的请求入队。 - 用户程序调用
io_uring_enter系统调用(或依赖内核轮询线程),通知内核“SQ 里有新活了”。 - 关键点:准备请求和更新指针的过程完全在用户空间完成,无需任何系统调用,实现了零拷贝。
- 用户程序调用
2. 完成队列 (Completion Queue, CQ)
- 作用:内核向用户空间返回已完成 I/O 请求结果的地方。
- 实现:同样是一个共享内存中的环形缓冲区。
- 工作流程:
- 内核的
io_uring驱动模块执行完一个 I/O 请求后,会将结果(成功/失败、读取的字节数等)封装成一个io_uring_cqe(Completion Queue Entry) 结构体。 - 内核将这个
cqe直接写入 CQ 的一个空闲槽位,并更新 CQ 的tail指针。 - 用户程序通过检查 CQ 的
head和tail指针(都是共享的原子变量),就能知道是否有新的完成事件,然后直接从 CQ 中读取结果。 - 关键点:获取结果的过程也无需系统调用(除了可能需要
io_uring_enter来等待事件)。
- 内核的
3. 核心优势总结
- 零拷贝通信:SQ/CQ 在用户和内核间共享,避免了传统系统调用中参数复制的开销。
- 批处理:一次
io_uring_enter可以提交多个请求,并等待多个完成事件,极大减少了系统调用次数。 - 真正的异步:I/O 全程由内核异步处理,用户线程可以去做其他事情。
- Proactor 模式:完美实现了 Proactor 模式——用户提交请求 -> 内核完成 I/O -> 内核通知用户。这比
- Reactor 模式:(用户被通知可读 -> 用户自己去读)更高效。
四、如何使用:原生系统调用 vs liburing
直接使用
io_uring的三个原始系统调用 (io_uring_setup,io_uring_register,io_uring_enter) 非常繁琐,需要手动管理内存映射、队列指针、内存对齐等底层细节。
因此,下列仅介绍官方封装库liburing。
liburing 库简介
liburing由io_uring的作者 Jens Axboe 亲自开发和维护,它将复杂的底层操作封装成简洁、易用的 API。
核心 API 概览
-
初始化与清理:
io_uring_queue_init(entries, &ring, flags): 初始化一个io_uring实例。io_uring_queue_exit(&ring): 清理资源。
-
请求生命周期管理:
io_uring_get_sqe(&ring): 从 SQ 中获取一个空闲的提交队列项 (SQE)。io_uring_submit(&ring): 提交所有已准备好的 SQE 到内核。io_uring_wait_cqe(&ring, &cqe): 阻塞等待一个完成事件。io_uring_peek_cqe(&ring, &cqe): 非阻塞地检查是否有完成事件。io_uring_cqe_seen(&ring, cqe): 告诉内核这个 CQE 已被处理,可以回收。
-
操作准备函数:
io_uring_prep_read(sqe, fd, buf, nbytes, offset)io_uring_prep_write(sqe, fd, buf, nbytes, offset)io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags)io_uring_prep_connect(sqe, sockfd, addr, addrlen)- … (支持几乎所有常见的 I/O 操作)
注:上述操作的使用方法同常见的io操作一样,只是参数中多了一个提交队列的参数
五、具体的使用案例
/*
本文件完成的是用io_uring完成TCP server的编程
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <liburing.h>
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
struct conn_info{
int fd;
int event;
};
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info ={
.fd = sockfd ,
.event = EVENT_READ
};
io_uring_prep_recv(sqe, sockfd, buf, len, flags); // 向提交队列中增加一个recv任务(也可以理解为节点)
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info ={
.fd = sockfd ,
.event = EVENT_WRITE
};
io_uring_prep_send(sqe, sockfd, buf, len, flags); // 向提交队列中增加一个recv任务(也可以理解为节点)
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr* clientaddr, socklen_t *addrLen, int flags)
{
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info accept_info ={
.fd = sockfd ,
.event = EVENT_ACCEPT
};
io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)&clientaddr, addrLen, 0); // 向提交队列中增加一个accept任务(也可以理解为节点)
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
int init_server(unsigned short port)
{
// TCPserver的基本步骤
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in Addr;
Addr.sin_family= AF_INET;
Addr.sin_port = htons(port);
Addr.sin_addr.s_addr = INADDR_ANY;
if(bind(listenfd,(struct sockaddr *)&Addr,sizeof(Addr))==-1) // 成功返回0
{
perror("bind");
return -1;
}
if(listen(listenfd, 10))
{
perror("listen");
return -1;
}
//printf("listen finished: %d\n", listenfd);
return listenfd;
}
int main(int arg ,char *argv[])
{
unsigned short port = 2048;
int listenfd = init_server(port);
struct io_uring_params params;
memset(¶ms, 0,sizeof(params));
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms); // 创建io_uring的工作流程(提交队列(SQ)和完成队列(CQ))
// 从提交队列 (SQ) 中获取一个空闲的 sqe 条目,来填写新的 I/O 请求
//struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); // 获取SQ队列的头指针
#if 0
// 按照常规操作应该按下列流程写
struct sockaddr_in clientaddr;
socklen_t len = sizof(clientaddr);
accept(listenfd, (struct sockaddr*)&clientaddr, &len);
#else
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, listenfd, (struct sockaddr* )&clientaddr, &len, 0);
#endif
char buffer[BUFFER_LENGTH] = {0};
while(1)
{
io_uring_submit(&ring); // 一次性提交SQ中全部的任务
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe); //阻塞等待事件完成,直到至少有一个完成事件可用
struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // 从完成队列中拿取多少个已完成的节点,将cqes中的指针指向CQ中的任务节点
for(int i=0; i< nready; ++i)
{
//printf("io_uring_peek_batch_cqe\n");
struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));
if(result.event == EVENT_ACCEPT)
{
set_event_accept(&ring, listenfd, (struct sockaddr *)&clientaddr, &len, 0);
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH,0); // 此处为什么加一个recv?
}
else if(result.event == EVENT_READ)
{
int ret = entries->res; // 返回数据的大小
//printf("set_event_recv: %s recvSize: %d\n",buffer, ret);
if(ret == 0)
{
close(result.fd);
}
else if(ret>0){
set_event_send(&ring, result.fd, buffer, ret,0);
}
}
else if(result.event == EVENT_WRITE)
{
int ret = entries->res;
//printf("set_event_send: %s recvSize: %d\n",buffer, ret);
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH,0);
}
}
// io_uring_cqe_seen(&ring, cqes[0]); // 清空CQ中的节点,只能清理一个
io_uring_cq_advance(&ring, nready); // 可以一次性清理多个
}
}
与reactor模式服务器的性能对比

综上就是本文的全部内容。
https://github.com/0voice
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)