一、定义:io_uring是什么?

定义: io_uring是Linux的一种高性能的异步I/O框架


二、诞生的背景:为什么需要 io_uring?

io_uring 出现之前,Linux 上主要有两种处理高并发 I/O 的方式:

  1. epoll + 阻塞 I/O (Reactor 模式)

    • 优点:成熟、稳定、应用广泛。
    • 缺点:本质上是 同步 I/O。当 epoll_wait 返回某个 fd 可读时,应用程序仍需调用 read() 系统调用去读取数据。这个 read() 调用在内核中可能会因为等待磁盘或网络而阻塞(虽然用户线程不阻塞,但内核线程会)。更重要的是,每次 I/O 操作都需要一次完整的系统调用,带来了上下文切换和 CPU 开销。在极高并发场景下,这些开销会成为瓶颈。
  2. POSIX AIO (aio_read, aio_write)

    • 优点:理论上是真正的异步 I/O。
    • 缺点:Linux 内核的 POSIX AIO 实现在用户空间通过线程池模拟,性能极差,且存在诸多限制(如只能用于 Direct I/O,不能用于 Buffered I/O),几乎无人在生产环境中使用。

io_uring 的诞生正是为了解决上述痛点

  • 消除不必要的系统调用:通过共享内存队列,提交和获取 I/O 结果几乎不需要系统调用。
  • 真正的内核级异步 I/O:I/O 请求完全由内核接管和执行,应用程序无需关心底层细节。
  • 支持 Buffered I/O:这是对 POSIX AIO 的巨大超越,使得绝大多数现有应用都能受益。
  • 统一接口:不仅能处理文件 I/O,还能处理网络 I/O (accept, connect, send, recv)、轮询 (poll)、定时器 (timeout) 等,成为一个通用的异步事件框架

三、工作原理:如何做到如此高效?

io_uring工作流程图

io_uring流程图
上述流程图是io_uring基本的工作框架,详细内容如下

io_uring 的高效性源于其精妙的 “共享内存 + 无锁环形缓冲区” 设计。它主要包含两个核心数据结构:

1. 提交队列 (Submission Queue, SQ)

  • 作用:用户空间向内核提交 I/O 请求的地方。
  • 实现:一个位于用户空间和内核空间共享的内存区域中的环形缓冲区。
  • 工作流程
    1. 用户程序调用 io_uring_prep_* 系列函数(如 io_uring_prep_read),在 SQ 的一个空闲槽位(slot)中准备好 I/O 请求的描述(操作类型、fd、buffer地址、offset等)。
    2. 用户程序更新 SQ 的 tail 指针(一个原子变量),表示有新的请求入队。
    3. 用户程序调用 io_uring_enter 系统调用(或依赖内核轮询线程),通知内核“SQ 里有新活了”。
    4. 关键点:准备请求和更新指针的过程完全在用户空间完成,无需任何系统调用,实现了零拷贝。

2. 完成队列 (Completion Queue, CQ)

  • 作用:内核向用户空间返回已完成 I/O 请求结果的地方。
  • 实现:同样是一个共享内存中的环形缓冲区
  • 工作流程
    1. 内核的 io_uring 驱动模块执行完一个 I/O 请求后,会将结果(成功/失败、读取的字节数等)封装成一个 io_uring_cqe (Completion Queue Entry) 结构体。
    2. 内核将这个 cqe 直接写入 CQ 的一个空闲槽位,并更新 CQ 的 tail 指针。
    3. 用户程序通过检查 CQ 的 headtail 指针(都是共享的原子变量),就能知道是否有新的完成事件,然后直接从 CQ 中读取结果。
    4. 关键点:获取结果的过程也无需系统调用(除了可能需要 io_uring_enter 来等待事件)。

3. 核心优势总结

  • 零拷贝通信:SQ/CQ 在用户和内核间共享,避免了传统系统调用中参数复制的开销。
  • 批处理:一次 io_uring_enter 可以提交多个请求,并等待多个完成事件,极大减少了系统调用次数。
  • 真正的异步:I/O 全程由内核异步处理,用户线程可以去做其他事情。
  • Proactor 模式:完美实现了 Proactor 模式——用户提交请求 -> 内核完成 I/O -> 内核通知用户。这比
  • Reactor 模式:(用户被通知可读 -> 用户自己去读)更高效。

四、如何使用:原生系统调用 vs liburing

直接使用 io_uring 的三个原始系统调用 (io_uring_setup, io_uring_register, io_uring_enter) 非常繁琐,需要手动管理内存映射、队列指针、内存对齐等底层细节。
因此,下列仅介绍官方封装库 liburing

liburing 库简介

liburingio_uring 的作者 Jens Axboe 亲自开发和维护,它将复杂的底层操作封装成简洁、易用的 API。

核心 API 概览
  1. 初始化与清理:

    • io_uring_queue_init(entries, &ring, flags): 初始化一个 io_uring 实例。
    • io_uring_queue_exit(&ring): 清理资源。
  2. 请求生命周期管理:

    • io_uring_get_sqe(&ring): 从 SQ 中获取一个空闲的提交队列项 (SQE)。
    • io_uring_submit(&ring): 提交所有已准备好的 SQE 到内核。
    • io_uring_wait_cqe(&ring, &cqe): 阻塞等待一个完成事件。
    • io_uring_peek_cqe(&ring, &cqe): 非阻塞地检查是否有完成事件。
    • io_uring_cqe_seen(&ring, cqe): 告诉内核这个 CQE 已被处理,可以回收。
  3. 操作准备函数:

    • io_uring_prep_read(sqe, fd, buf, nbytes, offset)
    • io_uring_prep_write(sqe, fd, buf, nbytes, offset)
    • io_uring_prep_accept(sqe, sockfd, addr, addrlen, flags)
    • io_uring_prep_connect(sqe, sockfd, addr, addrlen)
    • … (支持几乎所有常见的 I/O 操作)
      注:上述操作的使用方法同常见的io操作一样,只是参数中多了一个提交队列的参数

五、具体的使用案例

/*
    本文件完成的是用io_uring完成TCP server的编程
*/

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <liburing.h>

#define ENTRIES_LENGTH  1024
#define BUFFER_LENGTH   1024
#define EVENT_ACCEPT    0
#define EVENT_READ      1
#define EVENT_WRITE     2

struct conn_info{
    int fd;
    int event;
};

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info ={
        .fd = sockfd ,
        .event = EVENT_READ
    };
    

    io_uring_prep_recv(sqe, sockfd, buf, len, flags);   // 向提交队列中增加一个recv任务(也可以理解为节点)
     memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int set_event_send(struct io_uring *ring, int sockfd, const void *buf, size_t len, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

    struct conn_info accept_info ={
        .fd = sockfd ,
        .event = EVENT_WRITE
    };
    

    io_uring_prep_send(sqe, sockfd, buf, len, flags);   // 向提交队列中增加一个recv任务(也可以理解为节点)
     memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr* clientaddr, socklen_t *addrLen, int flags)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

   struct conn_info accept_info ={
        .fd = sockfd ,
        .event = EVENT_ACCEPT
    };
    

    io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)&clientaddr, addrLen, 0);   // 向提交队列中增加一个accept任务(也可以理解为节点)
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}



int init_server(unsigned short port)
{
    // TCPserver的基本步骤
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in Addr;
    Addr.sin_family= AF_INET;
    Addr.sin_port = htons(port);
    Addr.sin_addr.s_addr = INADDR_ANY;
    
    if(bind(listenfd,(struct sockaddr *)&Addr,sizeof(Addr))==-1)    // 成功返回0
    {
        perror("bind");
        return -1;
    }
    if(listen(listenfd, 10))
    {
        perror("listen");
        return -1;
    }
    //printf("listen finished: %d\n", listenfd);
    return listenfd;
}

int main(int arg ,char *argv[])
{
    unsigned short port = 2048;
    int listenfd = init_server(port);

    struct io_uring_params params;
    memset(&params, 0,sizeof(params));

    struct io_uring ring;
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);     // 创建io_uring的工作流程(提交队列(SQ)和完成队列(CQ))

    // 从提交队列 (SQ) 中获取一个空闲的 sqe 条目,来填写新的 I/O 请求
    //struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);   // 获取SQ队列的头指针
    
#if 0
    // 按照常规操作应该按下列流程写
    struct sockaddr_in clientaddr;
    socklen_t len = sizof(clientaddr);
    accept(listenfd, (struct sockaddr*)&clientaddr, &len);
#else
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);
    set_event_accept(&ring, listenfd, (struct sockaddr* )&clientaddr, &len, 0);
#endif
    char buffer[BUFFER_LENGTH] = {0};
    while(1)
    {
        
        io_uring_submit(&ring); // 一次性提交SQ中全部的任务

        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe); //阻塞等待事件完成,直到至少有一个完成事件可用

        struct io_uring_cqe *cqes[128];
        int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);  // 从完成队列中拿取多少个已完成的节点,将cqes中的指针指向CQ中的任务节点
        
        for(int i=0; i< nready; ++i)
        {
            
            //printf("io_uring_peek_batch_cqe\n");
            struct io_uring_cqe *entries = cqes[i];
            struct conn_info result;
             memcpy(&result, &entries->user_data, sizeof(struct conn_info));

             if(result.event == EVENT_ACCEPT)
             {
                set_event_accept(&ring, listenfd, (struct sockaddr *)&clientaddr, &len, 0);

                int connfd = entries->res; 
                set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH,0);     // 此处为什么加一个recv?
             }
             else if(result.event == EVENT_READ)
             {
                int ret = entries->res;     // 返回数据的大小
                //printf("set_event_recv: %s recvSize: %d\n",buffer, ret);
                if(ret == 0)
                {
                    close(result.fd);
                }
                else if(ret>0){
                    set_event_send(&ring, result.fd, buffer, ret,0);
                    
                }
             }
             else if(result.event == EVENT_WRITE)
             {
                 int ret = entries->res;
                //printf("set_event_send: %s recvSize: %d\n",buffer, ret);
                set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH,0);
             }
            
        }

        // io_uring_cqe_seen(&ring, cqes[0]);         // 清空CQ中的节点,只能清理一个
        io_uring_cq_advance(&ring, nready);     // 可以一次性清理多个
    }
}
与reactor模式服务器的性能对比

在这里插入图片描述

综上就是本文的全部内容。

https://github.com/0voice

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐