事件驱动Reactor模型实现网络服务器高并发的原理与应用
一、事件驱动 Reactor 模型:核心原理与本质
1. Reactor 模型的核心原理(分 4 步)
Reactor(反应器)是事件驱动的网络编程模型,核心是「事件注册 → 事件监听 → 事件分发 → 事件处理」,可以用 “机场调度中心” 比喻理解:
表格
| 模型角色 | 机场调度中心类比 | 代码中的对应实现 |
|---|---|---|
| Reactor 核心 | 调度中心(监听所有航班状态) | epoll 实例(epfd)+ epoll_wait |
| 事件注册 | 航空公司向调度中心报备航班信息 | epoll_ctl(ADD/MOD/DEL)注册 fd |
| 事件监听 | 调度中心监控航班 “就绪”(起飞 / 降落) | epoll_wait 阻塞等待 fd 事件就绪 |
| 事件分发 | 调度中心通知地勤处理就绪航班 | 主线程遍历就绪 events,调用回调 |
| 事件处理器 | 地勤人员(负责起飞 / 降落具体操作) | accept_cb/recv_cb/send_cb 回调函数 |
具体执行逻辑:
- 注册:将所有需要监听的 fd(监听 fd / 客户端 fd)和关注的事件(EPOLLIN/EPOLLOUT)注册到 Reactor 核心(epoll);
- 监听:Reactor 核心阻塞等待事件就绪,不占用 CPU 资源;
- 分发:一旦有 fd 事件就绪(如新连接、有数据可读),Reactor 核心通知主线程,主线程取出就绪 fd;
- 处理:主线程根据事件类型,调用预先绑定的回调函数处理(如新连接调 accept_cb,读事件调 recv_cb)。
2. Reactor 模型的本质
Reactor 模型的本质是:将 “主动轮询所有 fd” 改为 “被动等待事件通知”,并通过 “回调函数” 解耦事件监听与事件处理。
- 核心思想:“谁触发,谁处理” —— 只有 fd 产生事件时,才会触发对应的处理逻辑,而非无差别轮询;
- 核心价值:将 CPU 资源聚焦在 “处理实际事件” 上,而非 “检查 fd 状态”,是高并发网络编程的基石。
3.核心角色:
- Reactor 核心:由
epoll实现(epfd是 epoll 实例句柄),负责监听所有文件描述符(fd)的 IO 事件(可读 / 可写); - 事件处理器:
accept_cb(处理新连接)、recv_cb(处理读事件)、send_cb(处理写事件),对应不同事件的具体逻辑; - 事件注册器:
event_register/set_event函数,负责把 fd 和对应的事件(EPOLLIN/EPOLLOUT)注册到 epoll 中; - 连接管理器:
conn_list数组,存储每个 fd 的上下文(缓冲区、回调函数、状态等)。
4.Reactor 核心逻辑:
- 所有 IO 操作(连接、读、写)都不是 “主动轮询”,而是 “被动等待事件触发”;
- epoll 监听所有 fd 的事件,一旦有事件就绪(比如有新连接、有数据可读),就通知主线程;
- 主线程根据事件类型,调用对应的回调函数处理(比如新连接调
accept_cb,读事件调recv_cb)。
二、Reactor(epoll)vs select/poll:核心对比
先明确三者的底层定位:select/poll 是早期的 IO 多路复用技术,Reactor 模型通常基于 epoll(Linux)实现(也可基于 select/poll,但效率极低)。
1. 核心差异对比表
表格
| 维度 | Reactor(epoll 实现) | select | poll |
|---|---|---|---|
| 就绪事件通知方式 | 内核主动返回 “就绪 fd 列表”(事件驱动) | 内核返回 “是否有就绪 fd”,需用户态轮询所有 fd | 同 select,仅数据结构不同 |
| fd 数量限制 | 无上限(仅受系统文件描述符限制) | 有上限(默认 1024,由 FD_SETSIZE 定义) | 无上限(用链表存储 fd) |
| 时间复杂度 | O(1)(仅遍历就绪 fd) | O(n)(遍历所有注册的 fd) | O(n)(同 select) |
| fd 存储方式 | 内核维护红黑树(注册 fd)+ 就绪链表 | 用户态位图(fd_set) | 用户态链表(pollfd 数组) |
| 触发模式 | 支持水平触发(LT)+ 边沿触发(ET) | 仅水平触发(LT) | 仅水平触发(LT) |
| 内存拷贝开销 | 无(内核直接引用就绪 fd,无需拷贝) | 每次调用需拷贝 fd_set 到内核态 | 每次调用需拷贝 pollfd 数组到内核态 |
| 重复注册问题 | 注册后永久有效,仅需修改事件类型 | 每次调用需重新设置 fd_set(内核不保存) | 每次调用需重新传入 pollfd 数组 |
2. 优缺点深度分析
(1)Reactor(epoll)的优缺点
✅ 优点:
- 高并发支撑:无 fd 数量限制,支持十万 / 百万级连接,是高并发服务器的首选;
- 低 CPU 开销:仅遍历就绪 fd,无需轮询所有 fd,时间复杂度 O(1);
- 灵活的触发模式:边沿触发(ET)可减少冗余通知,进一步提升效率;
- 低内存开销:内核仅存储注册的 fd,且就绪事件无需拷贝到用户态;
- 持久化注册:fd 注册后无需重复设置,仅需修改事件类型(如 EPOLLIN→EPOLLOUT)。
❌ 缺点:
- 平台依赖性:仅 Linux 系统支持 epoll,不跨平台(Windows 用 IOCP,macOS 用 kqueue);
- 编程复杂度略高:需手动管理 fd 上下文、回调函数,对新手不友好(但代码结构更清晰)。
(2)select 的优缺点
✅ 优点:
- 跨平台性:几乎所有操作系统都支持(Linux/Windows/macOS);
- 编程简单:API 简洁,适合入门级 IO 多路复用场景。
❌ 缺点:
- fd 数量限制:默认最多监听 1024 个 fd,修改内核参数也难以支撑高并发;
- 效率极低:每次调用需拷贝 fd_set 到内核,且需遍历所有 fd 找就绪事件;
- 功能单一:仅支持水平触发,无扩展空间。
(3)poll 的优缺点
✅ 优点:
- 跨平台性:同 select,支持主流操作系统;
- 无 fd 数量限制:用链表存储 fd,理论上可监听任意数量 fd。
❌ 缺点:
- 效率仍低:虽无 fd 数量限制,但仍需遍历所有注册的 fd,高并发下 CPU 占用高;
- 内存拷贝开销:每次调用需拷贝 pollfd 数组到内核态,fd 越多开销越大;
- 无边沿触发:仅支持水平触发,冗余通知多。
三、实战场景选择
表格
| 场景 | 推荐技术 | 原因 |
|---|---|---|
| 高并发服务器(如网关 / 直播) | Reactor(epoll) | 支撑百万级连接,低 CPU 开销 |
| 小型工具 / 跨平台程序 | select/poll | 编程简单,无需关注平台兼容性 |
| 嵌入式系统 | select | 资源占用少,API 简单 |
四、总结
-
Reactor 模型的本质:事件驱动的 IO 多路复用,核心是 “被动等待事件通知 + 回调处理”,将 CPU 资源聚焦在实际事件处理上;
-
核心优势(epoll vs select/poll):
- epoll 解决了 select/poll “轮询所有 fd” 的核心痛点,实现了 O(1) 时间复杂度;
- 无 fd 数量限制、支持边沿触发、低内存拷贝开销,是高并发的核心保障;
-
选型原则:
- 高并发场景(如百万连接):必选 Reactor(epoll);
- 小型 / 跨平台场景:可选 select/poll(优先 poll,无 fd 数量限制)。
五、Reactor实现高并发服务器流程详解
//代码实现
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
// 服务器核心结构体定义(保留原结构,仅补充注释)
#ifndef __REACTOR_H__
#define __REACTOR_H__
#define BUFFER_LENGTH 1024 // 读写缓冲区大小
typedef int (*RCALLBACK)(int fd); // 回调函数指针类型定义
/**
* @brief 单个连接的上下文结构体
* 存储每个fd的缓冲区、回调函数、状态等核心信息
*/
struct conn {
int fd; // 连接对应的文件描述符
char rbuffer[BUFFER_LENGTH];// 读缓冲区:存储从客户端读取的数据
int rlength; // 读缓冲区有效数据长度
char wbuffer[BUFFER_LENGTH];// 写缓冲区:存储要发送给客户端的数据
int wlength; // 写缓冲区有效数据长度
RCALLBACK send_callback; // 写事件回调函数(send_cb)
/**
* @brief 联合体:读事件/连接事件回调(原设计保留,补充注释)
* 监听fd:使用accept_callback;客户端fd:使用recv_callback
*/
union {
RCALLBACK recv_callback; // 读事件回调(recv_cb)
RCALLBACK accept_callback; // 新连接回调(accept_cb)
} r_action;
int status; // 连接状态(预留:0-默认 1-响应头 2-响应体)
};
#endif
// 全局配置常量
#define CONNECTION_SIZE 1048576 // 最大连接数(fd作为数组下标)
#define MAX_PORTS 20 // 监听端口数量(2000~2019)
#define LISTEN_BACKLOG 1024 // listen队列长度(原代码未定义,补充)
// 时间计算宏:计算两个timeval的毫秒差
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
// 全局变量
int epfd = 0; // epoll实例句柄
struct timeval begin; // 性能统计起始时间
struct conn conn_list[CONNECTION_SIZE] = {0}; // 连接上下文数组
// 函数声明(保留原函数名)
int accept_cb(int fd);
int recv_cb(int fd);
int send_cb(int fd);
int set_event(int fd, int event, int flag);
int event_register(int fd, int event);
int init_server(unsigned short port);
/**
* @brief 注册/修改epoll事件
* @param fd 要操作的文件描述符
* @param event 要监听的事件(EPOLLIN/EPOLLOUT)
* @param flag 操作类型:1-添加(EPOLL_CTL_ADD) 0-修改(EPOLL_CTL_MOD)
* @return 0-成功 -1-失败
*/
int set_event(int fd, int event, int flag) {
struct epoll_event ev = {0}; // 初始化事件结构体(修复原代码未初始化问题)
ev.events = event;
ev.data.fd = fd;
int op = flag ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
int ret = epoll_ctl(epfd, op, fd, &ev);
// 错误处理(原代码缺失)
if (ret == -1) {
fprintf(stderr, "epoll_ctl error: fd=%d, op=%d, errno=%d(%s)\n",
fd, op, errno, strerror(errno));
return -1;
}
return 0;
}
/**
* @brief 初始化连接上下文并注册epoll事件
* @param fd 要初始化的fd
* @param event 要注册的事件(EPOLLIN/EPOLLOUT)
* @return 0-成功 -1-失败
*/
int event_register(int fd, int event) {
if (fd < 0 || fd >= CONNECTION_SIZE) { // 补充fd范围检查
fprintf(stderr, "invalid fd: %d (max: %d)\n", fd, CONNECTION_SIZE-1);
return -1;
}
// 初始化连接上下文
struct conn *conn = &conn_list[fd]; // 指针优化,减少数组下标访问
conn->fd = fd;
conn->r_action.recv_callback = recv_cb; // 客户端fd:读回调
conn->send_callback = send_cb; // 写回调
// 清空缓冲区(原代码重复memset,简化)
memset(conn->rbuffer, 0, BUFFER_LENGTH);
memset(conn->wbuffer, 0, BUFFER_LENGTH);
conn->rlength = 0;
conn->wlength = 0;
conn->status = 0; // 初始化状态
// 注册epoll事件
return set_event(fd, event, 1);
}
/**
* @brief 新连接回调函数(监听fd触发EPOLLIN时调用)
* @param fd 监听文件描述符
* @return 0-成功 -1-失败
*/
int accept_cb(int fd) {
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
// 接受新连接(设置非阻塞?原代码未做,保留原逻辑)
int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
if (clientfd < 0) {
// EINTR是中断错误,无需报错(修复原代码无差别打印错误)
if (errno != EINTR) {
fprintf(stderr, "accept error: errno=%d(%s)\n", errno, strerror(errno));
}
return -1;
}
// 初始化客户端连接并注册读事件
event_register(clientfd, EPOLLIN);
// 每1000个连接统计一次性能
if ((clientfd % 1000) == 0) {
struct timeval current;
gettimeofday(¤t, NULL);
int time_used = TIME_SUB_MS(current, begin);
memcpy(&begin, ¤t, sizeof(struct timeval));
printf("accepted fd: %d, time used: %d ms\n", clientfd, time_used);
}
return 0;
}
/**
* @brief 读事件回调函数(客户端fd触发EPOLLIN时调用)
* @param fd 客户端文件描述符
* @return 读取的字节数 0-断开 -1-失败
*/
int recv_cb(int fd) {
struct conn *conn = &conn_list[fd];
memset(conn->rbuffer, 0, BUFFER_LENGTH);
// 读取客户端数据(MSG_NONBLOCK非阻塞?原代码未做,保留)
int count = recv(fd, conn->rbuffer, BUFFER_LENGTH - 1, 0); // 留1字节存'\0'
if (count == 0) { // 客户端主动断开
printf("client disconnect: fd=%d\n", fd);
goto close_conn; // 统一关闭逻辑
} else if (count < 0) { // 读数据失败
// EAGAIN/EWOULDBLOCK是正常非阻塞错误,EINTR是中断,无需关闭
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
fprintf(stderr, "recv error: fd=%d, errno=%d(%s)\n", fd, errno, strerror(errno));
goto close_conn;
}
return 0;
}
// 正常读取数据,启用echo回显逻辑(原代码注释打开)
conn->rlength = count;
conn->wlength = count;
memcpy(conn->wbuffer, conn->rbuffer, count); // 读缓冲区拷贝到写缓冲区
printf("[fd=%d] RECV: %s (len=%d)\n", fd, conn->rbuffer, count);
// 修改事件为可写,触发send_cb发送数据
set_event(fd, EPOLLOUT, 0);
return count;
close_conn: // 统一关闭连接逻辑(优化原代码重复代码)
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // 从epoll中移除
memset(conn, 0, sizeof(struct conn)); // 清空上下文(防止野指针)
return -1;
}
/**
* @brief 写事件回调函数(客户端fd触发EPOLLOUT时调用)
* @param fd 客户端文件描述符
* @return 发送的字节数 0-无数据 -1-失败
*/
int send_cb(int fd) {
struct conn *conn = &conn_list[fd];
int count = 0;
// 写缓冲区有数据时发送
if (conn->wlength > 0) {
count = send(fd, conn->wbuffer, conn->wlength, 0);
if (count > 0) {
printf("[fd=%d] SEND: %s (len=%d)\n", fd, conn->wbuffer, count);
// 清空写缓冲区(原代码缺失,防止重复发送)
memset(conn->wbuffer, 0, BUFFER_LENGTH);
conn->wlength = 0;
} else if (count < 0) { // 发送失败
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
fprintf(stderr, "send error: fd=%d, errno=%d(%s)\n", fd, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
memset(conn, 0, sizeof(struct conn));
return -1;
}
}
}
// 发送完成后,改回监听读事件(等待下一次客户端数据)
set_event(fd, EPOLLIN, 0);
return count;
}
/**
* @brief 初始化TCP服务器监听套接字
* @param port 要监听的端口号
* @return 监听fd -1-失败
*/
int init_server(unsigned short port) {
// 创建TCP套接字
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
fprintf(stderr, "socket error: errno=%d(%s)\n", errno, strerror(errno));
return -1;
}
// // 设置端口复用(修复原代码端口占用问题)
// int opt = 1;
// setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
// 绑定地址和端口
struct sockaddr_in servaddr = {0};
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网卡
servaddr.sin_port = htons(port);
if (bind(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
fprintf(stderr, "bind error: port=%d, errno=%d(%s)\n", port, errno, strerror(errno));
close(sockfd);
return -1;
}
// 开始监听
if (listen(sockfd, LISTEN_BACKLOG) == -1) {
fprintf(stderr, "listen error: port=%d, errno=%d(%s)\n", port, errno, strerror(errno));
close(sockfd);
return -1;
}
printf("listen success: port=%d, sockfd=%d\n", port, sockfd);
return sockfd;
}
/**
* @brief 主函数:Reactor服务器核心逻辑
*/
int main() {
// 创建epoll实例(推荐用epoll_create1,修复原代码epoll_create参数问题)
epfd = epoll_create1(0);
if (epfd < 0) {
fprintf(stderr, "epoll_create1 error: errno=%d(%s)\n", errno, strerror(errno));
return -1;
}
// 初始化多个监听端口(2000~2019)
unsigned short base_port = 2000;
for (int i = 0; i < MAX_PORTS; i++) {
int sockfd = init_server(base_port + i);
if (sockfd < 0) {
continue; // 某个端口失败不影响其他端口
}
// 初始化监听fd的上下文(绑定accept回调)
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.accept_callback = accept_cb;
// 注册监听fd的读事件(新连接触发)
set_event(sockfd, EPOLLIN, 1);
}
// 记录性能统计起始时间
gettimeofday(&begin, NULL);
printf("Reactor server start success, epfd=%d\n", epfd);
// Reactor主循环
struct epoll_event events[1024] = {0}; // 就绪事件数组
while (1) {
// 等待事件就绪(-1:永久阻塞)
int nready = epoll_wait(epfd, events, 1024, -1);
if (nready < 0) {
// EINTR是信号中断,无需退出循环
if (errno == EINTR) {
continue;
}
fprintf(stderr, "epoll_wait error: errno=%d(%s)\n", errno, strerror(errno));
break;
}
// 处理就绪事件
for (int i = 0; i < nready; i++) {
int connfd = events[i].data.fd;
struct conn *conn = &conn_list[connfd];
// 处理读事件(新连接/客户端数据)
if (events[i].events & EPOLLIN) {
conn->r_action.recv_callback(connfd);
}
// 处理写事件(发送数据)
if (events[i].events & EPOLLOUT) {
conn->send_callback(connfd);
}
}
}
// 释放资源(理论上主循环不退出,仅做兜底)
close(epfd);
return 0;
}
阶段 1:程序启动 - 全局初始化(main 函数开头)
核心目标:创建 epoll 实例、初始化监听端口、注册监听事件,为高并发打下基础。
-
创建 epoll 实例
- 调用函数:
epoll_create1(0)(替代原epoll_create,更现代) - 具体作用:向 Linux 内核申请一块内存空间(用于存储监听的 fd 和事件),返回
epfd(epoll 实例句柄),作为后续所有 epoll 操作的 “入口”。 - 高并发体现:epoll 是内核级的事件监听机制,相比 select/poll,epoll 无需轮询所有 fd,仅返回就绪事件,是高并发的核心基础。
- 调用函数:
-
循环初始化 20 个监听端口(MAX_PORTS=20)
对每个端口(2000~2019),调用
init_server(port + i),该函数的具体执行步骤:- ①
socket(AF_INET, SOCK_STREAM, 0):创建 TCP 监听套接字sockfd,用于接收客户端连接。 - ②
setsockopt(SO_REUSEADDR | SO_REUSEPORT):设置端口复用,解决程序重启时 “端口被占用” 的问题,保证服务快速重启。 - ③
bind():将sockfd绑定到指定端口(如 2000),关联 IP 和端口。 - ④
listen(sockfd, LISTEN_BACKLOG):将sockfd设为 “监听状态”,内核会为该 fd 维护一个连接队列(长度 1024),暂存未处理的客户端连接。 - ⑤ 返回
sockfd:每个端口对应一个独立的监听 fd。
- ①
-
初始化监听 fd 的上下文 + 注册 epoll 事件
-
① 赋值
conn_list[sockfd].r_action.accept_callback = accept_cb:为监听 fd 绑定 “新连接处理回调”,明确该 fd 触发可读事件时要执行的逻辑。 -
② 调用
set_event(sockfd, EPOLLIN, 1)set_event函数作用:封装epoll_ctl系统调用,flag=1表示执行EPOLL_CTL_ADD,将监听 fd 以「EPOLLIN(可读事件)」注册到 epoll 实例中。- 高并发体现:将所有监听 fd 统一交给 epoll 管理,而非主线程逐个轮询,减少 CPU 空耗。
-
-
记录起始时间:
gettimeofday(&begin, NULL),用于后续统计每 1000 个连接的创建耗时。
阶段 2:主循环 - Reactor 事件驱动核心(main 函数 while (1))
核心目标:无限等待 epoll 事件,分发并处理就绪事件,这是 Reactor 模式的核心,也是高并发的关键。
-
等待事件就绪:调用
epoll_wait(epfd, events, 1024, -1)-
具体作用:主线程阻塞,内核监控 epoll 实例中所有注册的 fd,只有当 fd 触发事件(如监听 fd 有新连接、客户端 fd 有数据)时,才唤醒主线程。
-
参数说明:
events[1024]:存储内核返回的 “就绪事件数组”,最多一次返回 1024 个就绪事件;nready:实际就绪的事件数量;-1:永久阻塞,直到有事件触发(无事件时主线程休眠,不占用 CPU)。
-
高并发体现:
- 「阻塞等待」:无事件时主线程休眠,避免空轮询,大幅降低 CPU 使用率;
- 「事件通知」:内核主动告知 “哪些 fd 就绪”,而非主线程遍历所有 fd(select/poll 缺点),支持十万级以上 fd 监听。
-
-
遍历就绪事件,分发处理
循环
nready次,对每个就绪事件执行:-
① 取出就绪 fd:
connfd = events[i].data.fd; -
② 处理读事件(EPOLLIN):
conn->r_action.recv_callback(connfd)- 监听 fd 触发 EPOLLIN:调用
accept_cb(处理新连接); - 客户端 fd 触发 EPOLLIN:调用
recv_cb(读取客户端数据);
- 监听 fd 触发 EPOLLIN:调用
-
③ 处理写事件(EPOLLOUT):
conn->send_callback(connfd),调用send_cb(向客户端发送数据)。 -
高并发体现:「事件驱动 + 回调解耦」,主线程仅做 “事件分发”,具体业务逻辑(accept/recv/send)由回调函数处理,逻辑清晰且无阻塞(回调函数无耗时操作)。
-
阶段 3:新连接处理(accept_cb 函数)
核心目标:接受客户端连接,初始化客户端 fd 上下文,注册读事件。
当客户端发起 TCP 连接,监听 fd 触发 EPOLLIN 事件,主线程调用 accept_cb(fd):
-
接受连接:
accept(fd, &clientaddr, &len)- 具体作用:内核完成 TCP 三次握手,从监听 fd 的连接队列中取出一个连接,返回
clientfd(客户端专属 fd)。 - 错误处理:过滤
EINTR(信号中断)等非致命错误,仅打印真正的连接失败日志。
- 具体作用:内核完成 TCP 三次握手,从监听 fd 的连接队列中取出一个连接,返回
-
初始化客户端 fd 上下文:调用
event_register(clientfd, EPOLLIN),该函数的具体作用:-
① 检查 fd 合法性(防止数组越界);
-
② 初始化
conn_list[clientfd]:
- 绑定读回调
recv_cb、写回调send_cb; - 清空读写缓冲区,初始化长度为 0;
- 绑定读回调
-
③ 调用
set_event(clientfd, EPOLLIN, 1):将客户端 fd 以「EPOLLIN(可读事件)」注册到 epoll 实例中,等待客户端发送数据。 -
高并发体现:每个客户端 fd 独立管理上下文,epoll 统一监听,支持海量客户端连接。
-
-
性能统计:每创建 1000 个连接,计算并打印耗时,用于监控服务器连接处理能力。
阶段 4:客户端数据读取(recv_cb 函数)
核心目标:读取客户端数据,准备回显,切换为写事件。
当客户端发送数据,客户端 fd 触发 EPOLLIN 事件,主线程调用 recv_cb(fd):
-
读取数据:
recv(fd, conn->rbuffer, BUFFER_LENGTH-1, 0)-
具体作用:从客户端 fd 读取数据到读缓冲区
rbuffer,预留 1 字节存\0保证字符串合法。 -
错误处理:
count=0:客户端主动断开,跳转到close_conn逻辑;count<0:过滤EAGAIN(非阻塞无数据)、EINTR(中断),仅处理致命错误;- 正常读取:记录数据长度
rlength。
-
-
echo 回显准备:
- 将读缓冲区数据拷贝到写缓冲区
wbuffer,设置wlength; - 打印接收日志,便于调试。
- 将读缓冲区数据拷贝到写缓冲区
-
切换事件类型:调用
set_event(fd, EPOLLOUT, 0)flag=0表示执行EPOLL_CTL_MOD,修改该 fd 在 epoll 中的监听事件为「EPOLLOUT(可写事件)」;- 高并发体现:仅修改事件类型,无需重新注册 fd,减少
epoll_ctl系统调用开销。
-
连接关闭逻辑(close_conn):
close(fd):关闭客户端 fd,释放文件资源;epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL):从 epoll 中移除该 fd,停止监听;memset(conn, 0, sizeof(struct conn)):清空上下文,防止野指针。
阶段 5:客户端数据发送(send_cb 函数)
核心目标:发送回显数据,切换回读事件,等待下一次数据。
当客户端 fd 可写(内核缓冲区空闲),触发 EPOLLOUT 事件,主线程调用 send_cb(fd):
-
发送数据:
send(fd, conn->wbuffer, conn->wlength, 0)- 具体作用:将写缓冲区数据发送给客户端,发送成功后清空写缓冲区,防止重复发送。
- 错误处理:过滤非致命错误,仅处理真正的发送失败(如客户端断开)。
-
切换回读事件:调用
set_event(fd, EPOLLIN, 0),将监听事件改回 EPOLLIN,等待客户端下一次发送数据。- 高并发体现:事件类型动态切换,仅在需要时监听可写事件,减少 epoll 通知次数。
阶段 6:程序退出(理论上不触发)
主循环仅在 epoll_wait 发生致命错误时退出,最后调用 close(epfd) 释放 epoll 实例资源。
二、Reactor 实现高并发的核心原理(结合代码体现)
Reactor 模式是高并发网络编程的基石,本代码通过 epoll 实现的 Reactor 核心优势体现在以下 4 点:
1. 「内核级事件监听」替代「用户态轮询」
- 传统 select/poll:主线程需遍历所有 fd,检查是否就绪,时间复杂度 O(n),fd 数量越多越慢;
- 本代码的 epoll:内核维护就绪事件列表,仅返回有事件的 fd,时间复杂度 O(1),支持 10 万 + fd 监听(代码中
CONNECTION_SIZE=1048576也验证了这一点)。
2. 「阻塞等待」减少 CPU 空耗
- 主线程在
epoll_wait处阻塞,无事件时进入休眠状态,不占用 CPU 资源; - 只有当有连接 / 数据事件时,内核才唤醒主线程,CPU 利用率聚焦在 “处理实际事件” 上。
3. 「事件驱动 + 回调解耦」提升处理效率
- 主线程仅做 “事件分发”,不处理具体业务逻辑,避免耗时操作阻塞主线程;
- 不同事件(新连接 / 读 / 写)对应不同回调函数(
accept_cb/recv_cb/send_cb),逻辑解耦,可独立扩展; - 本代码中回调函数均为 “短逻辑”(无磁盘 IO / 复杂计算),主线程能快速处理完所有就绪事件,支撑高并发。
4. 「高效的 fd 管理」降低系统调用开销
- 监听 fd / 客户端 fd 统一注册到 epoll 后,仅需通过
epoll_ctl的MOD操作切换事件类型(如 EPOLLIN→EPOLLOUT),无需反复ADD/DEL; - 用 fd 作为
conn_list数组下标,直接通过 fd 定位连接上下文,无需遍历查找,时间复杂度 O(1)。
总结
- 执行流程核心:初始化监听端口 → 注册 epoll 事件 → 主循环等待事件 → 新连接触发
accept_cb→ 数据可读触发recv_cb→ 数据可写触发send_cb→ 动态切换事件类型; - 高并发核心:epoll 内核级事件监听(O(1) 就绪事件返回)+ 阻塞等待(低 CPU 占用)+ 事件驱动回调(主线程不阻塞)+ 高效 fd 管理(低系统调用开销);
- 代码设计亮点:保留 Reactor 模式 “注册 - 监听 - 分发 - 回调” 的核心架构,通过 epoll 实现海量 fd 的高效管理,是典型的单线程 Reactor 高并发模型。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)