epoll详解以及Reactor模式
第一部分:epoll 三大核心 API 详解
epoll 的设计哲学是 “职责分离”,它将操作拆分为三个独立的函数,每个函数只做一件事。
1. epoll_create / epoll_create1:创建一个 epoll 实例
- 作用:在内核中创建一个新的
epoll实例,并返回一个文件描述符(epfd) 来代表这个实例。 - 函数原型:
#include <sys/epoll.h> int epoll_create(int size); // size 在现代内核中仅作提示,可忽略 int epoll_create1(int flags); // 推荐使用,flags 可设为 0 或 EPOLL_CLOEXEC - 参数:
size(已过时):早期用于提示内核红黑树的大小,现在内核会动态调整,传入一个大于 0 的数即可(如 1024)。flags:epoll_create1的新特性。EPOLL_CLOEXEC标志非常有用,它能保证在执行exec系列函数时自动关闭这个epfd,避免 fd 泄漏。
- 返回值:成功返回
epfd(一个非负整数),失败返回 -1。 - 生命周期:这个
epfd会一直存在,直到你调用close(epfd)。
2. epoll_ctl:管理兴趣列表(红黑树)
-
作用:向内核的
epoll实例(由epfd指定)中添加、修改或删除需要监控的文件描述符及其事件。 -
函数原型:
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); -
参数详解:
epfd:epoll_create返回的 epoll 实例描述符。op: 操作类型。EPOLL_CTL_ADD: 添加一个新的 fd 到监控列表。EPOLL_CTL_MOD: 修改一个已存在的 fd 的监控事件。EPOLL_CTL_DEL: 删除一个 fd 的监控。此时event参数可以为NULL。
fd: 要操作的目标文件描述符(通常是 socket)。event: 指向epoll_event结构体的指针,定义了要监控的事件和关联数据。
-
struct epoll_event结构体:typedef union epoll_data { void *ptr; // 最常用!可指向自定义的连接上下文对象 int fd; // 存储 fd 本身(简单场景) uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; // 监控的事件掩码 epoll_data_t data; // 用户数据 }; -
常用事件 (
events字段):EPOLLIN: 对应的 fd 可读(有数据到达或连接断开)。EPOLLOUT: 对应的 fd 可写(发送缓冲区有空闲)。EPOLLET: 设置为边缘触发 (ET) 模式。EPOLLONESHOT: 事件触发一次后,自动从监控列表中移除,需手动重新EPOLL_CTL_MOD添加。- 组合使用:
event.events = EPOLLIN | EPOLLET;
3. epoll_wait:等待并获取就绪事件
- 作用:阻塞(或限时阻塞)等待
epoll实例中的事件发生,并将就绪的事件填充到用户提供的数组中。 - 函数原型:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); - 参数详解:
epfd: epoll 实例描述符。events: 用户分配的数组,用于接收内核返回的就绪事件。maxevents:events数组的大小(元素个数)。必须大于 0。timeout: 超时时间(毫秒)。-1: 永久阻塞,直到有事件。0: 非阻塞,立即返回。>0: 阻塞指定毫秒数。
- 返回值:
> 0: 成功,返回值是events数组中实际填充的就绪事件数量。0: 超时。-1: 出错。
第二部分:epoll 底层实现原理(深度剖析)
核心数据结构:红黑树 + 就绪双向链表
-
epoll_create:- 内核创建一个
eventpoll对象。 - 该对象内部包含一个红黑树(rb_root) 和一个就绪的双向链表(rdllist)。
- 内核创建一个
-
epoll_ctl(ADD/MOD/DEL):- 当你添加一个 fd 时,内核会创建一个
epitem节点(包含 fd、事件、回调函数等信息),并将其插入到红黑树中。 - 同时,内核会在该 fd 对应的设备驱动(如 TCP socket)的等待队列中注册一个回调函数(通常是
ep_poll_callback)。
- 当你添加一个 fd 时,内核会创建一个
-
事件发生(硬件中断):
- 假设网卡收到数据包,产生硬件中断。
- 内核的网络协议栈处理完数据包后,会将数据放入对应 socket 的接收缓冲区。
- 此时,socket 的状态变为“可读”。
- 内核会遍历该 socket 的等待队列,并调用所有注册的回调函数。
ep_poll_callback被调用,它会找到对应的epitem节点,并将其加入到eventpoll对象的就绪链表(rdllist)末尾。- 如果此时有进程正在
epoll_wait上睡眠,ep_poll_callback还会唤醒该进程。
-
epoll_wait:- 进程被唤醒后,
epoll_wait函数检查就绪链表。 - 它将链表中的
epitem节点信息(主要是events和data)拷贝到用户提供的events数组中。 - 函数返回,用户程序开始处理这些就绪事件。
- 进程被唤醒后,
关键优势:整个过程没有对所有被监控的 fd 进行任何扫描。事件的发生直接通过回调精准地通知到
epoll实例。
如下图
第三部分:Reactor 模式——epoll 的灵魂架构
epoll 提供了高效的 I/O 通知机制,但要构建一个完整的服务器,还需要一个清晰的架构来组织代码。这就是 Reactor 模式。
Reactor 模式的核心思想
- I/O 事件驱动:服务器的逻辑围绕“事件”展开,而不是围绕“连接”。
- 职责分离:
- Reactor (反应器):负责监听和分发所有 I/O 事件。
- Handler (处理器):负责处理具体的业务逻辑。
第四部分:具体案例
案例分为两部分,一部分是网络层主要是用epoll以reactor的模式实现,另一部分是简单的http业务(处理接收的数据和给定发送的数据)
下列内容是网络部分的代码,主要实现TCP的连接和数据收发
/*
本文件主要完成reactor的应用
*/
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include<sys/epoll.h>
#include<sys/errno.h>
#include<time.h>
#include"server.h"
#define CONNECTION_SIZE 1048576
#define MAX_PORT 20
#define TIME_SUB_MS(end,start) ((end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9)
int send_cb(int fd);
int recv_cb(int fd);
int epfd=0;
struct timespec begin;// 计算时间的结构体
// 下列结构体的作用:每一个fd都有对应的事件,触发某个特定的事件时
struct Conn conn_list[CONNECTION_SIZE] = {0}; // 定义fd数组
int set_event(int fd, int event, int flag)
{
if(flag) // non-zero add
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}
else // zero modify
{
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}
}
int event_register(int fd, int event)
{
if(fd < 0) return -1;
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;
// 初始化存储空间,以防有脏数据,方便后面使用
memset(conn_list[fd].rbuffer, 0,BUFFER_LENGTH);
conn_list[fd].rlength = 0;
memset(conn_list[fd].wbuffer, 0,BUFFER_LENGTH);
conn_list[fd].wlength = 0;
set_event(fd, EPOLLIN, 1);
}
// listenfd --> EPOLLIN --> accept_cb
int accept_cb(int fd)
{
struct sockaddr_in clientAddr;
socklen_t len= sizeof(clientAddr);
int clientfd = accept(fd, (struct sockaddr *)&clientAddr, &len);
// printf("accept finshed:%d \n", clientfd);
if(clientfd < 0)
{
printf("accept errno: %d --> %s\n", errno, strerror(errno));
return -1;
}
if((clientfd%1000)==0) // 每建立1000条连接打印一次
{
struct timespec current;
clock_gettime(CLOCK_MONOTONIC, ¤t);
float time_used = TIME_SUB_MS(current,begin); // 计算得到建立1000条连接所用时间
memcpy(&begin, ¤t, sizeof(struct timespec)); // 进行更新
printf("accept finshed:%d \t time_used: %.6f\n", clientfd,time_used);
}
event_register(clientfd, EPOLLIN);
return 0;
}
int recv_cb(int fd)
{
memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);
if(count == 0) // disconnect
{
printf("client disconnect: %d\n" ,fd);
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); // 待完善
return 0;
}
else if(count<0)
{
printf("count: %d, errno:%d, %s\n", count, errno, strerror(errno));
close(fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
conn_list[fd].rlength=count;
#if 0 // 回传ehco
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer, conn_list[fd].wlength);
printf("REC : %s\t size: %d\n",conn_list[fd].rbuffer,conn_list[fd].rlength);
#elif 0
/* 实现http server的数据打印 */
http_request(&conn_list[fd]);
#else
/* 实现websocket的数据打印 */
ws_request(&conn_list[fd]);
#endif
set_event(fd, EPOLLOUT, 0);
return count;
}
int send_cb(int fd)
{
int count = 0;
#if 0
/* 实现http server的数据传输*/
http_response(&conn_list[fd]);
if(conn_list[fd].status == 0)
{
count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);
conn_list[fd].wlength = 0;
memset(conn_list[fd].wbuffer, 0,BUFFER_LENGTH);
set_event(fd,EPOLLOUT, 0);
conn_list[fd].status=1;
}
else if(conn_list[fd].status == 1)
{
set_event(fd,EPOLLOUT, 0);
}
else if(conn_list[fd].status == 2)
{
set_event(fd,EPOLLIN, 0);
conn_list[fd].status=0;
}
#else
/* 实现websocket的数据传输*/
ws_response(&conn_list[fd]);
#endif
return count;
}
int init_server(unsigned short port)
{
// TCPserver的基本步骤
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in Addr;
Addr.sin_family= AF_INET;
Addr.sin_port = htons(port);
Addr.sin_addr.s_addr = INADDR_ANY;
if(bind(listenfd,(struct sockaddr *)&Addr,sizeof(Addr))==-1) // 成功返回0
{
perror("bind");
return -1;
}
if(listen(listenfd, 5))
{
perror("listen");
return -1;
}
//printf("listen finished: %d\n", listenfd);
return listenfd;
}
int main()
{
unsigned short port = 8888;
epfd = epoll_create(1);
for(int i = 0;i<MAX_PORT;++i)
{
int listenfd = init_server(port+i);
conn_list[listenfd].fd = listenfd;
conn_list[listenfd].r_action.recv_callback = accept_cb;
// 此处为什么不能是event_register()(简称ER),因为ER中的可读回调函数是recv不是accept,而且accept只需要执行一次即可
set_event(listenfd, EPOLLIN | EPOLLET, 1); // 所以此处只能是单独设置fd的事件类型
}
clock_gettime(CLOCK_MONOTONIC, &begin); // 获取时间段
while(1) // mainloop
{
struct epoll_event events[1024] ={0};
// -1(阻塞等待),等待有事件发生的IO,并将有事件发生的fd相关信息依次放到events中,方便后续操作
int nready = epoll_wait(epfd, events, 1024, 5000);
for(int i=0;i<nready;++i)
{
int connfd = events[i].data.fd;
if(events[i].events & EPOLLIN) // 仅仅是针对事件看是可读(EPOLLIN)还是可写(EPOLLIN),而不需要看连接
{
conn_list[connfd].r_action.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT)
{
conn_list[connfd].send_callback(connfd);
}
}
}
}
下列文件为webserver.c,是具体业务方面代码,主要是给定发送的数据内容和处理收到的数据
/*
本文件实现http协议,只负责业务方面内容(也就是传输到网页中的内容),具体的底层(网络IO)需要参照reactor_app
下列内容都是http默认的请求头
{
"HTTP/1.1 200 ok\r\n"
"Content-Type: text/html\r\n"
"Accept-Rangs: bytess\r\n"
"Content-Length: 83\r\n"
"Date: Tue, 2 Apr 2026 9:59:36 GMT\r\n\r\n"
}
*/
#include<stdio.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/stat.h>
#include"server.h"
# include <sys/sendfile.h>
#include <string.h>
int http_request(struct Conn *c)
{
printf("requset: %s\n",c->rbuffer);
}
int http_response(struct Conn *c)
{
#if 0 //
c->wlength =sprintf(c->wbuffer,
"HTTP/1.1 200 ok\r\n"
"Content-Type: text/html\r\n"
"Accept-Rangs: bytess\r\n"
"Content-Length: 83\r\n"
"Date: Tue, 2 Apr 2026 9:59:36 GMT\r\n\r\n"
"<html><head><title>Ning.com</title></head><body><h1>Ning, NB</h1></body></html>"
);
#elif 0
/*
此处是把数据放到一个buffer中且只发送了两次,一次是reactor_app中,一次是这里
此处代码没办法发送长段的数据,当数据超过1024时就发送不了了
*/
int filefd = open("index.html", O_RDONLY);
struct stat stat_buf;
// 此处用来获取文件的大小
fstat(filefd, &stat_buf); // 用结构体stat关联文件,以达到可以访问文件相关信息的作用
/*************** http body头部 *************/
// sprintf可以返回写入指定区域数据的大小
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 ok\r\n"
"Content-Type: text/html\r\n"
"Accept-Rangs: bytess\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 2 Apr 2026 10:22:36 GMT\r\n\r\n", stat_buf.st_size
);
/*************** http body部分 *************/
// 下面加+wlength是偏移地址,减wlength是剩余的空间
int count = read(filefd,(c->wbuffer+c->wlength), (BUFFER_LENGTH-c->wlength));
c->wlength += count;
close(filefd);
#elif 0
/*(完整版)
此处是上述代码的改进,支持长段数据的发送
使用了状态机,在Conn结构体中添加了状态
*/
int filefd = open("index.html", O_RDONLY);
struct stat stat_buf;
// 此处用来获取文件的大小
fstat(filefd, &stat_buf); // 用结构体stat关联文件,以达到可以访问文件相关信息的作用
if(c->status == 0) // 状态机为0时,发送头部数据
{
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 ok\r\n"
"Content-Type: text/html\r\n"
"Accept-Rangs: bytess\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 2 Apr 2026 10:22:36 GMT\r\n\r\n", stat_buf.st_size
);
}
else if(c->status == 1) // 状态机为1时发送 body数据(存在在文件中)
{
// 下列函数成功会返回输出的字符大小,失败返回-1
int ret = sendfile(c->fd,filefd, NULL, stat_buf.st_size); // 该函数可以直接将文件中的数据通过IO发送出去
if(ret ==-1)
{
perror("sendfile");
}
c->status = 2;
}
close(filefd);
#else // 图片传输, 依旧沿用完整版代码,只做了数据内容的更改
int filefd = open("myPic.jpg", O_RDONLY);
struct stat stat_buf;
// 此处用来获取文件的大小
fstat(filefd, &stat_buf); // 用结构体stat关联文件,以达到可以访问文件相关信息的作用
if(c->status == 0) // 状态机为0时,发送头部数据
{
c->wlength = sprintf(c->wbuffer,
"HTTP/1.1 200 ok\r\n"
"Content-Type: image/jpg\r\n"
"Accept-Rangs: bytes\r\n"
"Content-Length: %ld\r\n"
"Date: Tue, 2 Apr 2026 10:22:36 GMT\r\n\r\n", stat_buf.st_size
);
}
else if(c->status == 1) // 状态机为1时发送 body数据(存在在文件中)
{
// 下列函数成功会返回输出的字符大小,失败返回-1
int ret = sendfile(c->fd,filefd, NULL, stat_buf.st_size); // 该函数可以直接将文件中的数据通过IO发送出去
if(ret ==-1)
{
perror("sendfile");
}
c->status = 2;
}
close(filefd);
#endif
return c->wlength;
}
头文件server.h
#ifndef _SERVER_H
#define _SERVER_H
#define BUFFER_LENGTH 1024
typedef int (*RCALLBACK)(int fd); // 声明一下函数指针类型
struct Conn
{
int fd;
char rbuffer[BUFFER_LENGTH];
int rlength;
char wbuffer[BUFFER_LENGTH];
int wlength;
RCALLBACK send_callback;
union{ // union类似于struct 但union中共享一块内存,且只能有一个成员有效
RCALLBACK recv_callback;
RCALLBACK accept_callback;
}r_action;
int status;
};
// webserver
int http_request(struct Conn *c);
int http_response(struct Conn *c);
#endif
总结:上述内容中的reactor模式能做到底层网络和上层业务分隔,
核心是conn结构体,业务层将所要发送的数据放入conn中,网络层对该结构体进行处理(reacto模式)后,将收到的数据也放到conn中,再有业务层取用即可。
于网络层而言只需要完成本身应该完成的内容(即:TCP连接和数据的收发),不需要管发生的数据是什么,怎样处理接收到的数据;而业务层则只需要将发送数据放到conn中的rbuffer,再从conn中的wbuffer拿出数据即可,不需要理会底层数据是怎么发送的。
https://github.com/0voice
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)