一、事件驱动 Reactor 模型:核心原理与本质

1. Reactor 模型的核心原理(分 4 步)

Reactor(反应器)是事件驱动的网络编程模型,核心是「事件注册 → 事件监听 → 事件分发 → 事件处理」,可以用 “机场调度中心” 比喻理解:

表格

模型角色 机场调度中心类比 代码中的对应实现
Reactor 核心 调度中心(监听所有航班状态) epoll 实例(epfd)+ epoll_wait
事件注册 航空公司向调度中心报备航班信息 epoll_ctl(ADD/MOD/DEL)注册 fd
事件监听 调度中心监控航班 “就绪”(起飞 / 降落) epoll_wait 阻塞等待 fd 事件就绪
事件分发 调度中心通知地勤处理就绪航班 主线程遍历就绪 events,调用回调
事件处理器 地勤人员(负责起飞 / 降落具体操作) accept_cb/recv_cb/send_cb 回调函数

具体执行逻辑

  1. 注册:将所有需要监听的 fd(监听 fd / 客户端 fd)和关注的事件(EPOLLIN/EPOLLOUT)注册到 Reactor 核心(epoll);
  2. 监听:Reactor 核心阻塞等待事件就绪,不占用 CPU 资源;
  3. 分发:一旦有 fd 事件就绪(如新连接、有数据可读),Reactor 核心通知主线程,主线程取出就绪 fd;
  4. 处理:主线程根据事件类型,调用预先绑定的回调函数处理(如新连接调 accept_cb,读事件调 recv_cb)。
2. Reactor 模型的本质

Reactor 模型的本质是:将 “主动轮询所有 fd” 改为 “被动等待事件通知”,并通过 “回调函数” 解耦事件监听与事件处理

  • 核心思想:“谁触发,谁处理” —— 只有 fd 产生事件时,才会触发对应的处理逻辑,而非无差别轮询;
  • 核心价值:将 CPU 资源聚焦在 “处理实际事件” 上,而非 “检查 fd 状态”,是高并发网络编程的基石。

3.核心角色

  • Reactor 核心:由 epoll 实现(epfd 是 epoll 实例句柄),负责监听所有文件描述符(fd)的 IO 事件(可读 / 可写);
  • 事件处理器accept_cb(处理新连接)、recv_cb(处理读事件)、send_cb(处理写事件),对应不同事件的具体逻辑;
  • 事件注册器event_register/set_event 函数,负责把 fd 和对应的事件(EPOLLIN/EPOLLOUT)注册到 epoll 中;
  • 连接管理器conn_list 数组,存储每个 fd 的上下文(缓冲区、回调函数、状态等)。

4.Reactor 核心逻辑

  • 所有 IO 操作(连接、读、写)都不是 “主动轮询”,而是 “被动等待事件触发”;
  • epoll 监听所有 fd 的事件,一旦有事件就绪(比如有新连接、有数据可读),就通知主线程;
  • 主线程根据事件类型,调用对应的回调函数处理(比如新连接调 accept_cb,读事件调 recv_cb)。

二、Reactor(epoll)vs select/poll:核心对比

先明确三者的底层定位:select/poll 是早期的 IO 多路复用技术,Reactor 模型通常基于 epoll(Linux)实现(也可基于 select/poll,但效率极低)。

1. 核心差异对比表

表格

维度 Reactor(epoll 实现) select poll
就绪事件通知方式 内核主动返回 “就绪 fd 列表”(事件驱动) 内核返回 “是否有就绪 fd”,需用户态轮询所有 fd 同 select,仅数据结构不同
fd 数量限制 无上限(仅受系统文件描述符限制) 有上限(默认 1024,由 FD_SETSIZE 定义) 无上限(用链表存储 fd)
时间复杂度 O(1)(仅遍历就绪 fd) O(n)(遍历所有注册的 fd) O(n)(同 select)
fd 存储方式 内核维护红黑树(注册 fd)+ 就绪链表 用户态位图(fd_set) 用户态链表(pollfd 数组)
触发模式 支持水平触发(LT)+ 边沿触发(ET) 仅水平触发(LT) 仅水平触发(LT)
内存拷贝开销 无(内核直接引用就绪 fd,无需拷贝) 每次调用需拷贝 fd_set 到内核态 每次调用需拷贝 pollfd 数组到内核态
重复注册问题 注册后永久有效,仅需修改事件类型 每次调用需重新设置 fd_set(内核不保存) 每次调用需重新传入 pollfd 数组
2. 优缺点深度分析
(1)Reactor(epoll)的优缺点

优点

  • 高并发支撑:无 fd 数量限制,支持十万 / 百万级连接,是高并发服务器的首选;
  • 低 CPU 开销:仅遍历就绪 fd,无需轮询所有 fd,时间复杂度 O(1);
  • 灵活的触发模式:边沿触发(ET)可减少冗余通知,进一步提升效率;
  • 低内存开销:内核仅存储注册的 fd,且就绪事件无需拷贝到用户态;
  • 持久化注册:fd 注册后无需重复设置,仅需修改事件类型(如 EPOLLIN→EPOLLOUT)。

缺点

  • 平台依赖性:仅 Linux 系统支持 epoll,不跨平台(Windows 用 IOCP,macOS 用 kqueue);
  • 编程复杂度略高:需手动管理 fd 上下文、回调函数,对新手不友好(但代码结构更清晰)。
(2)select 的优缺点

优点

  • 跨平台性:几乎所有操作系统都支持(Linux/Windows/macOS);
  • 编程简单:API 简洁,适合入门级 IO 多路复用场景。

缺点

  • fd 数量限制:默认最多监听 1024 个 fd,修改内核参数也难以支撑高并发;
  • 效率极低:每次调用需拷贝 fd_set 到内核,且需遍历所有 fd 找就绪事件;
  • 功能单一:仅支持水平触发,无扩展空间。
(3)poll 的优缺点

优点

  • 跨平台性:同 select,支持主流操作系统;
  • 无 fd 数量限制:用链表存储 fd,理论上可监听任意数量 fd。

缺点

  • 效率仍低:虽无 fd 数量限制,但仍需遍历所有注册的 fd,高并发下 CPU 占用高;
  • 内存拷贝开销:每次调用需拷贝 pollfd 数组到内核态,fd 越多开销越大;
  • 无边沿触发:仅支持水平触发,冗余通知多。

三、实战场景选择

表格

场景 推荐技术 原因
高并发服务器(如网关 / 直播) Reactor(epoll) 支撑百万级连接,低 CPU 开销
小型工具 / 跨平台程序 select/poll 编程简单,无需关注平台兼容性
嵌入式系统 select 资源占用少,API 简单

四、总结

  1. Reactor 模型的本质:事件驱动的 IO 多路复用,核心是 “被动等待事件通知 + 回调处理”,将 CPU 资源聚焦在实际事件处理上;

  2. 核心优势(epoll vs select/poll):

    • epoll 解决了 select/poll “轮询所有 fd” 的核心痛点,实现了 O(1) 时间复杂度;
    • 无 fd 数量限制、支持边沿触发、低内存拷贝开销,是高并发的核心保障;
  3. 选型原则:

    • 高并发场景(如百万连接):必选 Reactor(epoll);
    • 小型 / 跨平台场景:可选 select/poll(优先 poll,无 fd 数量限制)。

五、Reactor实现高并发服务器流程详解

//代码实现
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>

// 服务器核心结构体定义(保留原结构,仅补充注释)
#ifndef __REACTOR_H__
#define __REACTOR_H__

#define BUFFER_LENGTH     1024  // 读写缓冲区大小
typedef int (*RCALLBACK)(int fd); // 回调函数指针类型定义

/**
 * @brief 单个连接的上下文结构体
 * 存储每个fd的缓冲区、回调函数、状态等核心信息
 */
struct conn {
    int fd;                     // 连接对应的文件描述符
    char rbuffer[BUFFER_LENGTH];// 读缓冲区:存储从客户端读取的数据
    int rlength;                // 读缓冲区有效数据长度
    char wbuffer[BUFFER_LENGTH];// 写缓冲区:存储要发送给客户端的数据
    int wlength;                // 写缓冲区有效数据长度
    RCALLBACK send_callback;    // 写事件回调函数(send_cb)

    /**
     * @brief 联合体:读事件/连接事件回调(原设计保留,补充注释)
     * 监听fd:使用accept_callback;客户端fd:使用recv_callback
     */
    union {
        RCALLBACK recv_callback;    // 读事件回调(recv_cb)
        RCALLBACK accept_callback;  // 新连接回调(accept_cb)
    } r_action;

    int status; // 连接状态(预留:0-默认 1-响应头 2-响应体)
};

#endif

// 全局配置常量
#define CONNECTION_SIZE    1048576  // 最大连接数(fd作为数组下标)
#define MAX_PORTS          20       // 监听端口数量(2000~2019)
#define LISTEN_BACKLOG     1024     // listen队列长度(原代码未定义,补充)

// 时间计算宏:计算两个timeval的毫秒差
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

// 全局变量
int epfd = 0;                       // epoll实例句柄
struct timeval begin;               // 性能统计起始时间
struct conn conn_list[CONNECTION_SIZE] = {0}; // 连接上下文数组

// 函数声明(保留原函数名)
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int set_event(int fd, int event, int flag);
int event_register(int fd, int event);
int init_server(unsigned short port);

/**
 * @brief 注册/修改epoll事件
 * @param fd      要操作的文件描述符
 * @param event   要监听的事件(EPOLLIN/EPOLLOUT)
 * @param flag    操作类型:1-添加(EPOLL_CTL_ADD) 0-修改(EPOLL_CTL_MOD)
 * @return 0-成功 -1-失败
 */
int set_event(int fd, int event, int flag) {
    struct epoll_event ev = {0};    // 初始化事件结构体(修复原代码未初始化问题)
    ev.events = event;
    ev.data.fd = fd;

    int op = flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    int ret = epoll_ctl(epfd, op, fd, &ev);
    
    // 错误处理(原代码缺失)
    if (ret == -1) {
        fprintf(stderr, "epoll_ctl error: fd=%d, op=%d, errno=%d(%s)\n", 
                fd, op, errno, strerror(errno));
        return -1;
    }
    return 0;
}

/**
 * @brief 初始化连接上下文并注册epoll事件
 * @param fd      要初始化的fd
 * @param event   要注册的事件(EPOLLIN/EPOLLOUT)
 * @return 0-成功 -1-失败
 */
int event_register(int fd, int event) {
    if (fd < 0 || fd >= CONNECTION_SIZE) { // 补充fd范围检查
        fprintf(stderr, "invalid fd: %d (max: %d)\n", fd, CONNECTION_SIZE-1);
        return -1;
    }

    // 初始化连接上下文
    struct conn *conn = &conn_list[fd]; // 指针优化,减少数组下标访问
    conn->fd = fd;
    conn->r_action.recv_callback = recv_cb; // 客户端fd:读回调
    conn->send_callback = send_cb;          // 写回调

    // 清空缓冲区(原代码重复memset,简化)
    memset(conn->rbuffer, 0, BUFFER_LENGTH);
    memset(conn->wbuffer, 0, BUFFER_LENGTH);
    conn->rlength = 0;
    conn->wlength = 0;
    conn->status = 0; // 初始化状态

    // 注册epoll事件
    return set_event(fd, event, 1);
}

/**
 * @brief 新连接回调函数(监听fd触发EPOLLIN时调用)
 * @param fd 监听文件描述符
 * @return 0-成功 -1-失败
 */
int accept_cb(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);

    // 接受新连接(设置非阻塞?原代码未做,保留原逻辑)
    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
    if (clientfd < 0) {
        // EINTR是中断错误,无需报错(修复原代码无差别打印错误)
        if (errno != EINTR) {
            fprintf(stderr, "accept error: errno=%d(%s)\n", errno, strerror(errno));
        }
        return -1;
    }

    // 初始化客户端连接并注册读事件
    event_register(clientfd, EPOLLIN);

    // 每1000个连接统计一次性能
    if ((clientfd % 1000) == 0) {
        struct timeval current;
        gettimeofday(&current, NULL);
        int time_used = TIME_SUB_MS(current, begin);
        memcpy(&begin, &current, sizeof(struct timeval));

        printf("accepted fd: %d, time used: %d ms\n", clientfd, time_used);
    }

    return 0;
}

/**
 * @brief 读事件回调函数(客户端fd触发EPOLLIN时调用)
 * @param fd 客户端文件描述符
 * @return 读取的字节数 0-断开 -1-失败
 */
int recv_cb(int fd) {
    struct conn *conn = &conn_list[fd];
    memset(conn->rbuffer, 0, BUFFER_LENGTH);

    // 读取客户端数据(MSG_NONBLOCK非阻塞?原代码未做,保留)
    int count = recv(fd, conn->rbuffer, BUFFER_LENGTH - 1, 0); // 留1字节存'\0'
    if (count == 0) { // 客户端主动断开
        printf("client disconnect: fd=%d\n", fd);
        goto close_conn; // 统一关闭逻辑
    } else if (count < 0) { // 读数据失败
        // EAGAIN/EWOULDBLOCK是正常非阻塞错误,EINTR是中断,无需关闭
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
            fprintf(stderr, "recv error: fd=%d, errno=%d(%s)\n", fd, errno, strerror(errno));
            goto close_conn;
        }
        return 0;
    }

    // 正常读取数据,启用echo回显逻辑(原代码注释打开)
    conn->rlength = count;
    conn->wlength = count;
    memcpy(conn->wbuffer, conn->rbuffer, count); // 读缓冲区拷贝到写缓冲区
    printf("[fd=%d] RECV: %s (len=%d)\n", fd, conn->rbuffer, count);

    // 修改事件为可写,触发send_cb发送数据
    set_event(fd, EPOLLOUT, 0);
    return count;

close_conn: // 统一关闭连接逻辑(优化原代码重复代码)
    close(fd);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // 从epoll中移除
    memset(conn, 0, sizeof(struct conn));     // 清空上下文(防止野指针)
    return -1;
}

/**
 * @brief 写事件回调函数(客户端fd触发EPOLLOUT时调用)
 * @param fd 客户端文件描述符
 * @return 发送的字节数 0-无数据 -1-失败
 */
int send_cb(int fd) {
    struct conn *conn = &conn_list[fd];
    int count = 0;

    // 写缓冲区有数据时发送
    if (conn->wlength > 0) {
        count = send(fd, conn->wbuffer, conn->wlength, 0);
        if (count > 0) {
            printf("[fd=%d] SEND: %s (len=%d)\n", fd, conn->wbuffer, count);
            // 清空写缓冲区(原代码缺失,防止重复发送)
            memset(conn->wbuffer, 0, BUFFER_LENGTH);
            conn->wlength = 0;
        } else if (count < 0) { // 发送失败
            if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
                fprintf(stderr, "send error: fd=%d, errno=%d(%s)\n", fd, errno, strerror(errno));
                close(fd);
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                memset(conn, 0, sizeof(struct conn));
                return -1;
            }
        }
    }

    // 发送完成后,改回监听读事件(等待下一次客户端数据)
    set_event(fd, EPOLLIN, 0);
    return count;
}

/**
 * @brief 初始化TCP服务器监听套接字
 * @param port 要监听的端口号
 * @return 监听fd -1-失败
 */
int init_server(unsigned short port) {
    // 创建TCP套接字
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        fprintf(stderr, "socket error: errno=%d(%s)\n", errno, strerror(errno));
        return -1;
    }

    // // 设置端口复用(修复原代码端口占用问题)
    // int opt = 1;
    // setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));

    // 绑定地址和端口
    struct sockaddr_in servaddr = {0};
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
    servaddr.sin_port = htons(port);

    if (bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
        fprintf(stderr, "bind error: port=%d, errno=%d(%s)\n", port, errno, strerror(errno));
        close(sockfd);
        return -1;
    }

    // 开始监听
    if (listen(sockfd, LISTEN_BACKLOG) == -1) {
        fprintf(stderr, "listen error: port=%d, errno=%d(%s)\n", port, errno, strerror(errno));
        close(sockfd);
        return -1;
    }

    printf("listen success: port=%d, sockfd=%d\n", port, sockfd);
    return sockfd;
}

/**
 * @brief 主函数:Reactor服务器核心逻辑
 */
int main() {
    // 创建epoll实例(推荐用epoll_create1,修复原代码epoll_create参数问题)
    epfd = epoll_create1(0);
    if (epfd < 0) {
        fprintf(stderr, "epoll_create1 error: errno=%d(%s)\n", errno, strerror(errno));
        return -1;
    }

    // 初始化多个监听端口(2000~2019)
    unsigned short base_port = 2000;
    for (int i = 0; i < MAX_PORTS; i++) {
        int sockfd = init_server(base_port + i);
        if (sockfd < 0) {
            continue; // 某个端口失败不影响其他端口
        }

        // 初始化监听fd的上下文(绑定accept回调)
        conn_list[sockfd].fd = sockfd;
        conn_list[sockfd].r_action.accept_callback = accept_cb;
        // 注册监听fd的读事件(新连接触发)
        set_event(sockfd, EPOLLIN, 1);
    }

    // 记录性能统计起始时间
    gettimeofday(&begin, NULL);
    printf("Reactor server start success, epfd=%d\n", epfd);

    // Reactor主循环
    struct epoll_event events[1024] = {0}; // 就绪事件数组
    while (1) {
        // 等待事件就绪(-1:永久阻塞)
        int nready = epoll_wait(epfd, events, 1024, -1);
        if (nready < 0) {
            // EINTR是信号中断,无需退出循环
            if (errno == EINTR) {
                continue;
            }
            fprintf(stderr, "epoll_wait error: errno=%d(%s)\n", errno, strerror(errno));
            break;
        }

        // 处理就绪事件
        for (int i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            struct conn *conn = &conn_list[connfd];

            // 处理读事件(新连接/客户端数据)
            if (events[i].events & EPOLLIN) {
                conn->r_action.recv_callback(connfd);
            }

            // 处理写事件(发送数据)
            if (events[i].events & EPOLLOUT) {
                conn->send_callback(connfd);
            }
        }
    }

    // 释放资源(理论上主循环不退出,仅做兜底)
    close(epfd);
    return 0;
}
阶段 1:程序启动 - 全局初始化(main 函数开头)

核心目标:创建 epoll 实例、初始化监听端口、注册监听事件,为高并发打下基础。

  1. 创建 epoll 实例

    • 调用函数:epoll_create1(0)(替代原 epoll_create,更现代)
    • 具体作用:向 Linux 内核申请一块内存空间(用于存储监听的 fd 和事件),返回 epfd(epoll 实例句柄),作为后续所有 epoll 操作的 “入口”。
    • 高并发体现:epoll 是内核级的事件监听机制,相比 select/poll,epoll 无需轮询所有 fd,仅返回就绪事件,是高并发的核心基础。
  2. 循环初始化 20 个监听端口(MAX_PORTS=20)

    对每个端口(2000~2019),调用 init_server(port + i),该函数的具体执行步骤:

    • socket(AF_INET, SOCK_STREAM, 0):创建 TCP 监听套接字 sockfd,用于接收客户端连接。
    • setsockopt(SO_REUSEADDR | SO_REUSEPORT):设置端口复用,解决程序重启时 “端口被占用” 的问题,保证服务快速重启。
    • bind():将 sockfd 绑定到指定端口(如 2000),关联 IP 和端口。
    • listen(sockfd, LISTEN_BACKLOG):将 sockfd 设为 “监听状态”,内核会为该 fd 维护一个连接队列(长度 1024),暂存未处理的客户端连接。
    • ⑤ 返回 sockfd:每个端口对应一个独立的监听 fd。
  3. 初始化监听 fd 的上下文 + 注册 epoll 事件

    • ① 赋值 conn_list[sockfd].r_action.accept_callback = accept_cb:为监听 fd 绑定 “新连接处理回调”,明确该 fd 触发可读事件时要执行的逻辑。

    • ② 调用

      set_event(sockfd, EPOLLIN, 1)
      
      • set_event 函数作用:封装 epoll_ctl 系统调用,flag=1 表示执行 EPOLL_CTL_ADD,将监听 fd 以「EPOLLIN(可读事件)」注册到 epoll 实例中。
      • 高并发体现:将所有监听 fd 统一交给 epoll 管理,而非主线程逐个轮询,减少 CPU 空耗。
  4. 记录起始时间gettimeofday(&begin, NULL),用于后续统计每 1000 个连接的创建耗时。

阶段 2:主循环 - Reactor 事件驱动核心(main 函数 while (1))

核心目标:无限等待 epoll 事件,分发并处理就绪事件,这是 Reactor 模式的核心,也是高并发的关键。

  1. 等待事件就绪:调用 epoll_wait(epfd, events, 1024, -1)

    • 具体作用:主线程阻塞,内核监控 epoll 实例中所有注册的 fd,只有当 fd 触发事件(如监听 fd 有新连接、客户端 fd 有数据)时,才唤醒主线程。

    • 参数说明:

      • events[1024]:存储内核返回的 “就绪事件数组”,最多一次返回 1024 个就绪事件;
      • nready:实际就绪的事件数量;
      • -1:永久阻塞,直到有事件触发(无事件时主线程休眠,不占用 CPU)。
    • 高并发体现:

      • 「阻塞等待」:无事件时主线程休眠,避免空轮询,大幅降低 CPU 使用率;
      • 「事件通知」:内核主动告知 “哪些 fd 就绪”,而非主线程遍历所有 fd(select/poll 缺点),支持十万级以上 fd 监听。
  2. 遍历就绪事件,分发处理

    循环 nready 次,对每个就绪事件执行:

    • ① 取出就绪 fd:connfd = events[i].data.fd

    • ② 处理读事件(EPOLLIN):

      conn->r_action.recv_callback(connfd)
      
      • 监听 fd 触发 EPOLLIN:调用 accept_cb(处理新连接);
      • 客户端 fd 触发 EPOLLIN:调用 recv_cb(读取客户端数据);
    • ③ 处理写事件(EPOLLOUT):conn->send_callback(connfd),调用 send_cb(向客户端发送数据)。

    • 高并发体现:「事件驱动 + 回调解耦」,主线程仅做 “事件分发”,具体业务逻辑(accept/recv/send)由回调函数处理,逻辑清晰且无阻塞(回调函数无耗时操作)。

阶段 3:新连接处理(accept_cb 函数)

核心目标:接受客户端连接,初始化客户端 fd 上下文,注册读事件。

当客户端发起 TCP 连接,监听 fd 触发 EPOLLIN 事件,主线程调用 accept_cb(fd)

  1. 接受连接accept(fd, &clientaddr, &len)

    • 具体作用:内核完成 TCP 三次握手,从监听 fd 的连接队列中取出一个连接,返回 clientfd(客户端专属 fd)。
    • 错误处理:过滤 EINTR(信号中断)等非致命错误,仅打印真正的连接失败日志。
  2. 初始化客户端 fd 上下文:调用 event_register(clientfd, EPOLLIN),该函数的具体作用:

    • ① 检查 fd 合法性(防止数组越界);

    • ② 初始化

      conn_list[clientfd]
      

      • 绑定读回调 recv_cb、写回调 send_cb
      • 清空读写缓冲区,初始化长度为 0;
    • ③ 调用 set_event(clientfd, EPOLLIN, 1):将客户端 fd 以「EPOLLIN(可读事件)」注册到 epoll 实例中,等待客户端发送数据。

    • 高并发体现:每个客户端 fd 独立管理上下文,epoll 统一监听,支持海量客户端连接。

  3. 性能统计:每创建 1000 个连接,计算并打印耗时,用于监控服务器连接处理能力。

阶段 4:客户端数据读取(recv_cb 函数)

核心目标:读取客户端数据,准备回显,切换为写事件。

当客户端发送数据,客户端 fd 触发 EPOLLIN 事件,主线程调用 recv_cb(fd)

  1. 读取数据recv(fd, conn->rbuffer, BUFFER_LENGTH-1, 0)

    • 具体作用:从客户端 fd 读取数据到读缓冲区 rbuffer,预留 1 字节存 \0 保证字符串合法。

    • 错误处理:

      • count=0:客户端主动断开,跳转到 close_conn 逻辑;
      • count<0:过滤 EAGAIN(非阻塞无数据)、EINTR(中断),仅处理致命错误;
      • 正常读取:记录数据长度 rlength
  2. echo 回显准备

    • 将读缓冲区数据拷贝到写缓冲区 wbuffer,设置 wlength
    • 打印接收日志,便于调试。
  3. 切换事件类型:调用 set_event(fd, EPOLLOUT, 0)

    • flag=0 表示执行 EPOLL_CTL_MOD,修改该 fd 在 epoll 中的监听事件为「EPOLLOUT(可写事件)」;
    • 高并发体现:仅修改事件类型,无需重新注册 fd,减少 epoll_ctl 系统调用开销。
  4. 连接关闭逻辑(close_conn)

    • close(fd):关闭客户端 fd,释放文件资源;
    • epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL):从 epoll 中移除该 fd,停止监听;
    • memset(conn, 0, sizeof(struct conn)):清空上下文,防止野指针。
阶段 5:客户端数据发送(send_cb 函数)

核心目标:发送回显数据,切换回读事件,等待下一次数据。

当客户端 fd 可写(内核缓冲区空闲),触发 EPOLLOUT 事件,主线程调用 send_cb(fd)

  1. 发送数据send(fd, conn->wbuffer, conn->wlength, 0)

    • 具体作用:将写缓冲区数据发送给客户端,发送成功后清空写缓冲区,防止重复发送。
    • 错误处理:过滤非致命错误,仅处理真正的发送失败(如客户端断开)。
  2. 切换回读事件:调用 set_event(fd, EPOLLIN, 0),将监听事件改回 EPOLLIN,等待客户端下一次发送数据。

    • 高并发体现:事件类型动态切换,仅在需要时监听可写事件,减少 epoll 通知次数。
阶段 6:程序退出(理论上不触发)

主循环仅在 epoll_wait 发生致命错误时退出,最后调用 close(epfd) 释放 epoll 实例资源。

二、Reactor 实现高并发的核心原理(结合代码体现)

Reactor 模式是高并发网络编程的基石,本代码通过 epoll 实现的 Reactor 核心优势体现在以下 4 点:

1. 「内核级事件监听」替代「用户态轮询」
  • 传统 select/poll:主线程需遍历所有 fd,检查是否就绪,时间复杂度 O(n),fd 数量越多越慢;
  • 本代码的 epoll:内核维护就绪事件列表,仅返回有事件的 fd,时间复杂度 O(1),支持 10 万 + fd 监听(代码中 CONNECTION_SIZE=1048576 也验证了这一点)。
2. 「阻塞等待」减少 CPU 空耗
  • 主线程在 epoll_wait 处阻塞,无事件时进入休眠状态,不占用 CPU 资源;
  • 只有当有连接 / 数据事件时,内核才唤醒主线程,CPU 利用率聚焦在 “处理实际事件” 上。
3. 「事件驱动 + 回调解耦」提升处理效率
  • 主线程仅做 “事件分发”,不处理具体业务逻辑,避免耗时操作阻塞主线程;
  • 不同事件(新连接 / 读 / 写)对应不同回调函数(accept_cb/recv_cb/send_cb),逻辑解耦,可独立扩展;
  • 本代码中回调函数均为 “短逻辑”(无磁盘 IO / 复杂计算),主线程能快速处理完所有就绪事件,支撑高并发。
4. 「高效的 fd 管理」降低系统调用开销
  • 监听 fd / 客户端 fd 统一注册到 epoll 后,仅需通过 epoll_ctlMOD 操作切换事件类型(如 EPOLLIN→EPOLLOUT),无需反复 ADD/DEL
  • 用 fd 作为 conn_list 数组下标,直接通过 fd 定位连接上下文,无需遍历查找,时间复杂度 O(1)。

总结

  1. 执行流程核心:初始化监听端口 → 注册 epoll 事件 → 主循环等待事件 → 新连接触发 accept_cb → 数据可读触发 recv_cb → 数据可写触发 send_cb → 动态切换事件类型;
  2. 高并发核心:epoll 内核级事件监听(O(1) 就绪事件返回)+ 阻塞等待(低 CPU 占用)+ 事件驱动回调(主线程不阻塞)+ 高效 fd 管理(低系统调用开销);
  3. 代码设计亮点:保留 Reactor 模式 “注册 - 监听 - 分发 - 回调” 的核心架构,通过 epoll 实现海量 fd 的高效管理,是典型的单线程 Reactor 高并发模型。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐