【学习笔记】从零开始学习Reactor网络服务器设计模式
一、前言
Reactor 是什么?
Reactor 本质上不是一个具体函数,而是一种网络服务器设计模式。
它解决的问题是:
有很多客户端连接时,服务器不能给每个连接都一直阻塞等待,所以让一个“事件监听器”统一监听所有 fd,哪个 fd 有事件了,就分发给对应的处理函数。
Reactor = 事件监听 + 事件分发 + 事件处理
1. epoll 负责监听事件
2. epoll_wait 返回已经就绪的事件
3. 根据 fd 类型分发:
监听 socket 就 accept
客户端 socket 就 recv/send
4. 处理完后继续回到 epoll_wait 等下一批事件
普通的epoll一般在main函数里面用if else等判断来确定是哪种类型的fd,但是 Reactor 会把每个 fd 和它对应的处理函数绑定起来。
监听 socket 绑定accept_callback
客户端 socket 绑定recv_callback
写事件绑定 send_callback
这样 epoll_wait 发现某个 fd 就绪后,不再写一堆 if else,而是:
event->callback(event);
也就是:
谁有事件,就调用谁自己的处理函数。
Reactor 里面一般有哪些角色?
1. Reactor 主循环
负责 epoll_wait,等待事件发生。
2. Event 事件对象
通常保存:
fd 监听的事件类型 EPOLLIN / EPOLLOUT 回调函数 callback buffer 状态
3. Acceptor
专门处理新连接,也就是 accept。
4. Handler
专门处理普通客户端连接,比如 recv、send。
5. Dispatcher 分发器
epoll_wait 返回后,根据事件调用对应 callback。
Reactor 的运行流程:
服务器启动->创建监听 socket->创建 epoll->把监听 socket 加入 epoll->监听 socket 绑定 accept_cb->进入 while(1)->epoll_wait 等事件->
如果监听 socket 可读:调用 accept_cb + accept 新客户端 + 新客户端 fd 加入 epoll + 新客户端绑定 recv_cb->
如果客户端 fd 可读:调用 recv_cb + recv 数据 + 业务处理 + 如果要回写,切换成 EPOLLOUT->
如果客户端 fd 可写:调用 send_cb + send 数据 + 发送完成后切回 EPOLLIN
reactor和以往多线程的区别:以往多线程是:一个客户端连接来了就创建一个线程 这个线程专门 recv/send 这个客户端 reactor是一个线程监听很多 fd,哪个 fd 有事件,就处理哪个
二、代码实现
代码就是一个单 Reactor 单线程模型的雏形:主线程里只有一个 epoll_wait 事件循环,然后根据事件调用不同回调函数。代码里核心入口是 main() 的 while(1),监听到 EPOLLIN 就调用 conn_list[connfd].r_action.recv_callback(connfd),监听到 EPOLLOUT 就调用 conn_list[connfd].send_callback(connfd)。
也就是用 epoll 监听所有 fd;
用 conn_list[fd] 保存每个 fd 对应的连接信息;
事件来了以后,不在 main 里直接处理,而是调用这个 fd 绑定的回调函数。
server.h 代码
#ifndef __SERVER_H__ //头文件保护宏
#define __SERVER_H__
#define BUFFER_LENGTH 1024 //每个连接的读缓冲区、写缓冲区都是 1024 字节。
typedef int (*RCALLBACK)(int fd); //RCALLBACK 是一种函数指针类型。 这种函数接收一个 int fd,返回 int。
//int accept_cb(int fd); int recv_cb(int fd); int send_cb(int fd); 这些函数都是符合要求的函数
struct conn{ //结构体:一个连接的信息。
int fd; //主文件里面写:struct conn conn_list[CONNECTION_SIZE] = {0}; 这里的下标就是fd epoll_wait 返回 fd 后,可以 O(1) 找到这个 fd 对应的连接信息。
char rbuffer[BUFFER_LENGTH]; //读缓冲区:从客户端收到的数据放这
int rlength; //读缓冲区里实际有多少字节有效数据
char wbuffer[BUFFER_LENGTH]; //写缓冲区:要发给客户端的数据放这
int wlength; //写缓冲区里待发送的字节数
RCALLBACK send_callback; //// 写就绪时调用的回调(指向 send_cb)
union{
RCALLBACK recv_callback;
RCALLBACK accept_callback;
}r_action;
// 联合体:监听 fd 用 accept_callback,普通连接 fd 用 recv_callback
// 它们永远不会同时存在,所以可以共用内存
//可以用这种方式进行赋值:conn_list[fd].r_action.recv_callback = recv_cb; conn_list[fd].send_callback = send_cb;
int status; // 状态机字段(HTTP/WS 协议解析用)
#if 1 // websocket
char *payload; //WebSocket 真正的数据内容
char mask[4]; //WebSocket 客户端数据会带 4 字节掩码
#endif
};
int http_request(struct conn *c);
int http_response(struct conn *c);
int ws_request(struct conn *c);
int ws_response(struct conn *c);
#endif
以上server.h 做了下面几件事:
这个头文件本质上是一份"接口说明书"
告诉所有 #include "server.h" 的源文件:
我有一种类型叫 RCALLBACK
我有一个数据结构叫 struct conn
我有四个业务函数可以调用
真正的实现在别的 .c 文件里
struct conn 是整个 Reactor 模式的核心
把 fd 和它的所有相关状态打包在一起
这个 server.h 定义了一个基于 Reactor 模型的连接管理结构。
每个连接用 conn 表示,内部包含读写缓冲区、回调函数以及状态机字段。
通过函数指针实现事件驱动,将 epoll 的 IO 事件映射到具体处理逻辑。
同时在上层实现 HTTP 和 WebSocket 协议解析,实现 IO 层与协议层的解耦。
总结:server.h 通过 struct conn 给每个连接建了一份"档案",档案里既有数据(缓冲区)也有行为(回调函数指针),让上层的事件循环只需要"查 fd 找档案 → 调用档案里的回调"就能处理任何连接。
主函数代码
#include <errno.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <poll.h>
#include <sys/epoll.h>
#include <errno.h>
#include <sys/time.h>
#include "server.h"
//头文件的引入 记得引入server.h
在实际代码之前,先讲一下main函数的逻辑:
┌─────────────────────────────────────────────────────────┐
│ main 的 4 个阶段 │
├─────────────────────────────────────────────────────────┤
│ │
│ 阶段 1:创建 epoll 实例 │
│ └─ epoll_create(1) │
│ "雇一个秘书,让它帮我管 fd" │
│ │
│ 阶段 2:批量启动监听服务(开 20 个端口) │
│ └─ for 循环 × 20: │
│ ├─ init_server(port) "开张营业" │
│ ├─ 给监听 fd 建档案 "登记到 conn_list" │
│ └─ set_event(...) "把监听 fd 交给秘书" │
│ │
│ 阶段 3:开始计时(性能统计用) │
│ └─ gettimeofday(&begin, NULL) │
│ │
│ 阶段 4:进入事件循环(程序的"心脏",永远不停) │
│ └─ while(1): │
│ ├─ epoll_wait "问秘书:谁就绪了?" │
│ └─ 遍历就绪事件,调用对应的回调函数 │
│ │
└─────────────────────────────────────────────────────────┘

#define CONNECTION_SIZE 1048576 //1024 * 1024 最多准备管理 1048576 个连接对象。 后续要创建一个很大的连接数组,最多能放 1048576 个 conn。
#define MAX_PORTS 20 //最多监听 20 个端口。
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000) //计算两个时间点相差多少毫秒。
int accept_cb(int fd); //监听 socket 有新连接时调用
int recv_cb(int fd); //客户端 socket 有数据可读时调用
int send_cb(int fd); //客户端 socket 可以写数据时调用
//三个回调函数声明。 提前声明是因为后面的 event_register() 里面会用到
int epfd = 0; //这个是整个服务器的 epoll 实例 fd。后面会有:epfd = epoll_create(1); 创建成功后,epfd 就代表一个 epoll 对象。
struct timeval begin; //记录时间
struct conn conn_list[CONNECTION_SIZE] = {0}; //conn_list[fd]保存了某个 fd 对应的信息。
以上是前置准备,下面是函数部分
set_event函数
int set_event(int fd,int event,int flag){ //set_event 是对 epoll_ctl 的简单封装,作用是"告诉 epoll:请帮我盯着这个 fd 的某种事件"。
/*参数: fd:表示你要操作哪个 socket 可能是监听socket也可能是客户端socket 参数 event:表示你关心什么事件。 可读/可写 参数 flag:第一次注册用 ADD
后续切换事件用MOD flag == 1就是第一次添加 flag == 0就是后续切换*/
/*EPOLL_CTL_ADD 添加:第一次让 epoll 盯着这个 fd
EPOLL_CTL_MOD 修改:已经盯着了,但我想换一种盯法(比如从盯读改成盯写)
EPOLL_CTL_DEL 删除:不要再盯了(fd 关闭时常用)*/
if(flag){ //flag == 1第一次添加
struct epoll_event ev; //创建 epoll_event ev是告诉epoll:我要监听哪个 fd 的什么事件。
ev.events = event; //设置监听事件 比如如果是EPOLLIN 就代表监听可读事件
ev.data.fd = fd; //绑定 fd
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev); //把 fd 添加到 epfd 这个 epoll 实例中。
//四个参数分别是:epfd:哪个epoll实例 EPOLL_CTL_ADD:添加操作 fd:要添加哪个 fd &ev:这个 fd 要监听什么事件
}else{ //flag == 0修改已经有的
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epfd,EPOLL_CTL_MOD,fd,&ev); //其他和上面一样 只有这里变成MOD 表示这个fd已经在epoll里面了,现在只是修改它监听的事件。
}
}
/*使用时机:时机 1:服务器启动时,把监听 socket 交给 epoll 盯着 位置:main 函数里 set_event(sockfd, EPOLLIN, 1);
时机 2:新客户端连接进来,把这个新连接也交给 epoll 盯着 位置:event_register 函数里(accept_cb 调用它) set_event(fd, event, 1);
时机 3:收到客户端数据后,告诉 epoll "我现在要写数据了,请盯写事件" 位置:recv_cb 函数末尾 set_event(fd, EPOLLOUT, 0);
时机 4:数据发完后,告诉 epoll "改回去,继续盯读事件" 位置:send_cb 函数末尾 set_event(fd, EPOLLIN, 0);
每当你需要"让 epoll 开始盯一个 fd"或"修改正在盯的方式",就调用 set_event。*/
event_register函数
int event_register(int fd,int event){ //函数的作用:初始化一个客户端连接,并把它注册到 epoll 中。
if(fd < 0) return -1; //判断fd合法
conn_list[fd].fd = fd; //保存fd
conn_list[fd].r_action.recv_callback = recv_cb; //绑定读回调recv_cb
conn_list[fd].send_callback = send_cb; //绑定写回调 send_cb
memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);
conn_list[fd].rlength = 0; //初始化这个连接的读缓冲区。
memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);
conn_list[fd].wlength = 0; //初始化这个连接的写缓冲区。
set_event(fd,event,1); //把这个 fd 加入 epoll。
}
/*在注册阶段,也就是事件发生之前,先把fd准备好。event_register 负责初始化新客户端连接,并绑定 recv_cb / send_cb。*/
accept_cb函数
int accept_cb(int fd){ //监听 fd 发生 EPOLLIN 事件时,调用 accept_cb。这里的fd是监听fd,不是客户端fd
struct sockaddr_in clientaddr; //声明 sockaddr_in 结构体 下面调用 accept 时,操作系统会把客户端的 IP 和端口写到这个变量里——这样我们就能知道"是谁连进来了"。
socklen_t len = sizeof(clientaddr); //声明长度变量 下面accept要用
int clientfd = accept(fd,(struct sockaddr*)&clientaddr,&len); //调用函数 accept,从监听 fd 的"等待队列"里取出一个客户端连接。
//3个参数:sockfd:监听 socket的fd addr:指向一块内存,accept会把客户端地址写进去(IP+端口) addrlen:指向socklen_t的指针
//这里强转为通用接口struct sockaddr*
if(clientfd < 0){
printf("accept errno: %d ---> %s\n",errno,strerror(errno)); //clientfd非法 strerror(errno)是把errno解耦成人类能看懂的语言
return -1;
}
event_register(clientfd,EPOLLIN); // 注册新客户端 EPOLLIN | EPOLLET 是边缘触发
//LT(水平触发):只要 fd 上有数据没读完,每次 epoll_wait 都会通知你。容错强,但效率略低。
//ET(边缘触发):只在状态变化的瞬间通知一次,没读完是你的事。高效,但写错容易丢数据。
//新连接的整个建立流程到这里就完成了。 下面是性能统计部分
if(clientfd % 1000 == 0){ //每当 clientfd 是 1000 的倍数时,打印一次统计信息。
struct timeval current; //声明一个 timeval 变量,然后调用 gettimeofday 把"当前时间"写进去。 获取到当前时间
gettimeofday(¤t,NULL);
int time_used = TIME_SUB_MS(current,begin); //计算从上次记录的时间点 begin 到当前 current 经过了多少毫秒。
memcpy(&begin,¤t,sizeof(struct timeval)); //把 current 的内容复制到 begin,作为下一次计算的"起点"。
printf("accept finished: %d,time_used: %d\n",clientfd,time_used); //打印性能日志
}
return 0;
}
//accept_cb 不负责和客户端通信,它只负责 accept 新连接,然后把新的 clientfd 注册到 epoll 和 conn_list 里,让这个 clientfd 以后由 recv_cb / send_cb 处理。
recv_cb函数
int recv_cb(int fd){ //当某个客户端 fd 上有数据可读时,调用 recv 把数据收进来,交给协议层处理,然后切换关注写事件。
//这里的fd是客户端fd,不是监听fd
memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH); //清空读缓冲区 防止本次接收残留上次接收的数据
int count = recv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0); //真正从客户端读数据。
//第1个参数 fd:从哪个客户端socket读 第2个参数 conn_list[fd].rbuffer:读到的数据放哪里
//第3个参数 BUFFER_LENGTH:最多读多少字节,这里是1024 第 4 个参数 0:默认读取方式 返回值 count 表示:实际读到了多少字节
if(count == 0){ //返回0:对端关闭了连接(client 调用了 close) 应该关闭我方 fd,从 epoll 移除
printf("client disconnect : %d\n",fd); //打印日志
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL); //把 fd 从 epoll 监听集合里移除。
close(fd); //关闭fd
return 0;
}else if(count < 0){ //count < 0 读取出错
printf("count: %d,errno: %d,%s\n",count,errno,strerror(errno)); //打印日志
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL); //把fd从epoll家你听集合中删除
close(fd); //关闭fd
return 0;
}
conn_list[fd].rlength = count; //记录数据长度
//三选一的协议处理
#if 0 // echo回显 最简单的回显服务器:把刚收到的数据原样复制到写缓冲区,等会发给客户端就是。
conn_list[fd].wlength = conn_list[fd].rlength;
memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);
printf("[%d]RECV : %s\n",conn_list[fd].rlength,conn_list[fd].rbuffer);
#elif 0 //HTTP 协议 后续在server.c中补充
http_request(&conn_list[fd]);
#else //WebSocket 协议(当前激活) 调用 ws_request 也在server.c中补充
ws_request(&conn_list[fd]);
#endif
set_event(fd,EPOLLOUT,0);//切换关注事件 把 fd 在 epoll 中的关注事件从 EPOLLIN 改成 EPOLLOUT。
//发送缓冲区绝大多数时候都是空的(因为内核很快把数据发出去了)。如果一直关注 EPOLLOUT,epoll 会一直通知你"可以写了",但没东西要写所以读完切写
return count; //返回收到的字节数
}

send_cb函数
int send_cb(int fd){ //当某个 fd 的发送缓冲区有空间时,把 wbuffer 里准备好的响应发出去,然后切回关注 EPOLLIN。
// 客户端连接 fd 触发 EPOLLOUT 事件时调用
#if 0
http_response(&conn_list[fd]); //http协议 后会实现
#else
ws_response(&conn_list[fd]); //也是未实现部分 WebSocket服务器
#endif
int count = 0;
#if 0 //"协议状态机"的设计 支持分多次发送
if(conn_list[fd].status == 1){ //状态 1:发数据,但发完之后还要继续发(保持 EPOLLOUT)
count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
set_event(fd,EPOLLOUT,0);
}else if(conn_list[fd].status == 2){ // 状态 2:现在不发,但保持关注 EPOLLOUT
set_event(fd,EPOLLOUT,0);
}else if(conn_list[fd].status == 0){ // 状态 0:发完了,切回 EPOLLIN
if(conn_list[fd].wlength != 0){
count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
}
set_event(fd,EPOLLIN,0);
}
#else
if(conn_list[fd].wlength != 0){ //如果写缓冲区里有数据,就发送。
count = send(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);
}
set_event(fd,EPOLLIN,0); //发送完切回 EPOLLIN 避免一直告诉你可以写的空转
#endif
return count;
}

init_server函数
int init_server(unsigned short port){ //创建一个监听某个端口的 TCP socket,并返回这个 socket 的 fd。
//只在程序启动时被调用,被 main 函数循环调用 20 次
int sockfd = socket(AF_INET,SOCK_STREAM,0); //调用 socket() 系统调用,创建一个新的 socket,得到一个 fd。
//三个参数:协议族 socket 类型(流式还是数据报) 具体协议(一般传 0 让系统选)
struct sockaddr_in servaddr; //声明地址结构体
servaddr.sin_family = AF_INET; //把 sin_family 字段填成 AF_INET,告诉系统"这是 IPv4 地址"。
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); //填 IP 地址 0.0.0.0 绑定到本机的所有网卡 IP 地址 注意要转化成网络字节序
servaddr.sin_port = htons(port); //把端口号转成网络字节序后填进去。 0-1023需要特权
if(-1 == bind(sockfd,(struct sockaddr*)&servaddr,sizeof(struct sockaddr_in))){ //调用 bind() 系统调用,把 socket 和地址绑定
printf("bind failed: %s\n",strerror(errno));
close(sockfd);
return -1;
}
listen(sockfd,10); //调用 listen() 系统调用,把 socket 变成监听状态。
return sockfd; //// 返回监听 fd,main 会拿去注册到 epoll
}
main函数
int main(){
unsigned short port = 2000;
epfd = epoll_create(1);
int i = 0;
for(i = 0;i < MAX_PORTS;++i){ //启动 20 个监听端口 循环 20 次,每次:创建一个监听 socket(端口 = 2000 + i)
//给监听 fd 在 conn_list 里建档 注册到 epoll,关注 EPOLLIN
int sockfd = init_server(port + i);
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd,EPOLLIN,1); //注册到 epoll 关注 EPOLLIN(读事件)。
}
gettimeofday(&begin,NULL); //获取当前时间,写到全局变量 begin
while(1){
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd,events,1024,-1); //调用 epoll_wait() 系统调用,等待至少一个事件就绪,然后返回就绪事件的数量。
int i = 0;
for(i = 0;i < nready;++i){ //处理就绪事件
int connfd = events[i].data.fd;
#if 0 //只处理一个事件。
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}else if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
#else //用两个独立 if,同一 fd 的多个事件都处理。
if(events[i].events & EPOLLIN){
conn_list[connfd].r_action.recv_callback(connfd);
}
if(events[i].events & EPOLLOUT){
conn_list[connfd].send_callback(connfd);
}
#endif
}
}
}
/*main 函数是 Reactor 的事件循环中心。
启动时,它创建 epoll 和监听 socket;
然后把监听 socket 绑定 accept_cb 并注册到 epoll;
运行时,它通过 epoll_wait 等待所有 fd 的事件;
事件来了以后,它不直接 accept/recv/send,
而是根据 conn_list[fd] 里提前绑定好的 callback 来调用对应函数。*/
对于http和webserver的实现,在下一篇文章中,这里if 0只做参考
下面是测试函数代码
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/time.h>
#include <unistd.h>
#define MAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT 20
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
int isContinue = 0;
static int ntySetNonblock(int fd) {
int flags;
flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}
static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}
int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\n", argv[0]);
exit(0);
}
const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;
struct epoll_event events[MAX_EPOLLSIZE];
int epoll_fd = epoll_create(MAX_EPOLLSIZE);
strcpy(buffer, " Data From MulClient\n");
struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);
int sockfd = 0;
while (1) {
if (++index >= MAX_PORT) index = 0;
struct epoll_event ev;
if (connections < 340000 && !isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
goto err;
}
//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);
if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);
sprintf(buffer, "Hello Server: client --> %d\n", connections);
send(sockfd, buffer, strlen(buffer), 0);
ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
connections ++;
}
//connections ++;
if (connections % 1000 == 999 || connections >= 340000) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
gettimeofday(&tv_begin, NULL);
int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);
int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;
if (events[i].events & EPOLLOUT) {
// sprintf(buffer, "data from %d\n", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
// printf(" RecvBuffer:%s\n", rBuffer);
if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}
} else if (length == 0) {
printf(" Disconnect clientfd:%d\n", clientfd);
connections --;
close(clientfd);
} else {
if (errno == EINTR || errno == EAGAIN || errno == ENOTSOCK) continue;
printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
}
}
usleep(500);
}
return 0;
err:
printf("error : %s\n", strerror(errno));
return 0;
}
本文基于 Linux 下的 epoll 机制,从零实现了一个单线程 Reactor 模型网络服务器。通过将“事件监听”与“业务处理”解耦,实现了高效、可扩展的 IO 多路复用架构。
整个系统以 epoll 为核心,采用“事件驱动 + 回调函数”的设计思想,将不同类型的 socket(监听 socket 与客户端 socket)统一抽象为 conn 结构体,通过函数指针实现事件分发机制,使主循环仅负责事件调度,而具体逻辑交由 accept_cb、recv_cb、send_cb 等回调函数处理。
这种设计不仅避免了传统多线程模型中的线程开销问题,同时也为后续扩展(如线程池、主从 Reactor、多协议支持)提供了良好的基础。
零声社区资源链接:https://github.com/0voice
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)