仿 muduo库 从零实现高并发TCP服务框架
目录
附: 项目代码链接:
这篇文章我们一起从零实现一个高并发服务器项目。
做出TCP服务框架之后呢,在这基础之上我们再搭建一个HTTP的应用层,当然搭建其他服务也是一样的,全凭兴趣。
1. 我的开发环境:
4核 4G的 Ubuntu 云服务器,VSCode。
2. 设计思想:
基于Reactor模型,多线程主从Reactor的架构,One Thread One Loop 的思想……好的是不是有点懵,别担心,后面会慢慢解释这些,先对这些词混个眼熟吧。
3. 模块设计:
如果说设计思想是指导方针,那么模块设计就是计划表,说完模块设计就可以按照计划逐步实现了。
我愿意将模块分为两大类:功能性模块,核心模块。怎么区别?简单来说,核心模块面向需求核心,功能性模块为核心模块服务,或者说功能性模块就是核心模块的一部分。
那么接下来怎么展开说呢?先简单说说核心模块,如果先说功能模块,那么它们究竟有什么用,在哪里起作用是搞不清楚的。

4. 模块描述
4.1 核心模块
TcpServer:所有模块的功能整合,管理所有的Connection和Loop,帮助上层快速搭建TCP服务框架。
EventLoop:整合事件监听和事件处理,一个Loop就是一个事件执行循环,前面提到的One Thread One Loop简单来说就是在一个线程里执行一个事件循环,也就是一个EventLoop。
Connection:TCP的连接管理,说起来就这一句话,不过这是所有模块里最难最复杂的模块了。
4.2 功能性模块
Buffer:数据管理模块,提供丰富的读写接口,方便快速解析和读写。
Socket:TCP socket编程接口封装,快速创建并管理监听套接字和普通套接字。
Channel:事件管理模块,管理要监听什么事件,以及事件触发的回调。
Poller:事件监听模块,监听事件触发,是Reactor模型的核心模块。
TimeWheel:时间轮模块,管理定时任务,自动执行定时任务。
看到这里有人要说了,这人叽里咕噜的说啥呢? 非常正常,特别是对于Reactor模型不熟悉的朋友;那么对Reactor模型有一点了解的朋友就会发现,其中很多模块,在各种使用Reactor模型的 的代码里都是常见的,甚至是通用的。不熟悉没关系,熟悉当然更好,一会我们写起代码来再慢慢体会。
5. 模块实现
刚刚我们从上到下简单介绍了各个模块,接下来的模块实现我们从下往上写。
5.0 日志模块
前面漏掉了,这个模块对于调试和运维还是非常非常重要的,这段代码是我以前造的轮子,如果你以前也写过这样的代码,也可以拿来用。我采用了策略模式,可以选择打印到标准输出,也可以选择打印到文件里。我这个其实写的真挺好的,可以借鉴,就是和主题没多大关系,我就不贴出来了,如果想看,顺着路径去仓库链接找吧 ->(source/log.hpp)
5.1 Buffer模块
TCP协议是面向字节流的典型协议,因此,解析和缓存是我们处理数据最麻烦部分,这个模块就是来简化这个步骤的。Buffer最常被用作用户级别的数据缓冲区,收到数据就直接写到里面。具体每个口我就不介绍了,比较多,但是不难。是vector,但是具体读写呢都是当成指针来的。
写出来其实挺有用的,以后拿到别的项目用起来也挺爽的。
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:
std::vector<char> _buffer;
uint64_t _write_pos;
uint64_t _read_pos;
public:
Buffer() : _write_pos(0), _read_pos(0), _buffer(BUFFER_DEFAULT_SIZE) {}
//buffer的起始位置
char *GetStartPos() {return &*_buffer.begin();}
//获取读位置
char *GetReadPos() {return GetStartPos() + _read_pos;}
//获取写位置
char *GetWritePos() {return GetStartPos() + _write_pos;}
//可读长度
uint64_t ReadableLen() {return _write_pos - _read_pos;}
//缓冲区末尾可写长度
uint64_t TailBufferLen() {return _buffer.size() - _write_pos;}
//缓冲区开头可写长度
uint64_t HeadBufferLen() {return _read_pos;}
//确保缓冲区足够写
void EnsureLen(uint64_t len)
{
if (len <= TailBufferLen()) return;
else if(len <= HeadBufferLen() + TailBufferLen()) //把数据移动到最前面
{
uint64_t read_len = ReadableLen();
std::copy(GetReadPos(), GetWritePos(), GetStartPos());
_read_pos = 0;
_write_pos = read_len;
}
else //扩容
_buffer.resize(_write_pos + len);
}
//移动读位置
void MoveReadPos(uint64_t len)
{
assert(len <= ReadableLen());
_read_pos += len;
}
//移动写位置
void MoveWritePos(uint64_t len)
{
assert(len <= TailBufferLen());
_write_pos += len;
}
//写入数据(void*)
void Write(const void *data, uint64_t len)
{
EnsureLen(len);
std::copy((char*)data, (char*)data + len, GetWritePos());
}
//写入数据(string)
void WriteString(std::string &str)
{
Write((void*)str.c_str(), str.size());
}
//写入数据(buffer)
void WriteBuffer(Buffer &buf)
{
Write(buf.GetReadPos(), buf.ReadableLen());
}
//写入并移动写位置
void WriteAndPush(const void *data, uint64_t len)
{
if(len == 0) return;
Write(data, len);
MoveWritePos(len);
}
void WriteAndPush(std::string &str)
{
WriteString(str);
MoveWritePos(str.size());
}
void WriteAndPush(Buffer &buf)
{
WriteBuffer(buf);
MoveWritePos(buf.ReadableLen());
}
//读取数据
void Read(void *data, uint64_t len)
{
assert(len <= ReadableLen());
std::copy(GetReadPos(), GetReadPos() + len, (char*)data);
}
//读取数据(string)
std::string ReadasString(uint64_t len)
{
assert(len <= ReadableLen());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
//读取数据(one line)
std::string ReadOneLine()
{
char* pos = (char*)memchr(GetReadPos(), '\n', ReadableLen()) ;
if(pos == NULL) return "";
uint64_t len = (uint64_t)(pos - GetReadPos()) + 1;
return ReadasString(len);
}
//读取并移动读位置
void ReadAndPop(void *data, uint64_t len)
{
Read(data, len);
MoveReadPos(len);
}
std::string ReadAndPop(uint64_t len)
{
std::string ret = ReadasString(len);
MoveReadPos(len);
return ret;
}
std::string ReadAndPop()
{
std::string ret = ReadOneLine();
MoveReadPos(ret.size());
return ret;
}
};
(为了描述的完整性呢,我贴出了所有的内容,但是其中一些接口如果暂时感觉用不到,那就先不着急写,用到再说)
5.2 Socket模块
一般涉及到网络的项目,都会有这个模块吧,看看我是怎么封装的:
#define defaultsockfd -1
class Socket
{
private:
int _sockfd;
public:
Socket() : _sockfd(defaultsockfd) {}
Socket(int fd) : _sockfd(fd) {}
~Socket(){LOG(DEBUG)<< "close socket "<< _sockfd; }
int _Fd(){return _sockfd;}
//创建套接字
bool Create()
{
//创建tcp套接字
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(_sockfd < 0)
{
LOG(ERROR) << "create socket error";
return false;
}
LOG(DEBUG) << "create socket success";
return true;
}
//绑定ip和端口号
bool Bind(const std::string &ip, uint16_t port)
{
sockaddr_in addr;
memset(&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
int n = bind(_sockfd, (sockaddr*)&addr, sizeof addr);
if(n < 0)
{
LOG(ERROR) << "bind error";
return false;
}
LOG(DEBUG) << "bind success";
return true;
}
#define MAXLISTEN 128 //backlog表示了accept队列的最大长度,与后续高并发场景有关,128 是一个经验值
//开启监听状态
bool Listen(int backlog = MAXLISTEN)
{
int n = listen(_sockfd, backlog);
if(n < 0)
{
LOG(ERROR) << "listen error";
return false;
}
LOG(DEBUG) << "listen success";
return true;
}
//向服务器发起连接
bool Connect(const std::string &ip, uint16_t port)
{
sockaddr_in addr;
memset(&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
int n = connect(_sockfd, (sockaddr*)&addr, sizeof addr);
if(n < 0)
{
LOG(ERROR) << "connect error";
return false;
}
LOG(DEBUG) << "connect success";
return true;
}
//接受连接
int Accept()
{
int fd = accept(_sockfd, nullptr, nullptr);
if(fd < 0)
{
LOG(ERROR) << "accept error: "<< fd;
return -1;
}
LOG(DEBUG) << "accept success";
return fd;
}
//接受数据
ssize_t Recv(void* buff, size_t len, int flags = 0)
{
ssize_t n = recv(_sockfd, buff, len, flags);
// recv 返回0, 表示对端关闭, 放入判错逻辑, 最终返回0 here n<0 to n<=0
if(n <= 0)
{
if(errno == EAGAIN || errno == EINTR)
return 0;
// LOG(ERROR) << "recv error";
return -1;
}
LOG(DEBUG) << "recv success:" << n;
return n;
}
//发送数据
ssize_t Send(const void* buff, size_t len, int flags = 0)
{
ssize_t n = send(_sockfd, buff, len, flags);
if(n < 0)
{
if(errno == EAGAIN || errno == EINTR)
return 0;
LOG(ERROR) << "send error";
return -1;
}
LOG(DEBUG) << "send success";
return n;
}
//关闭套接字
void Close()
{
if(_sockfd != defaultsockfd)
close(_sockfd);
}
//创建一个tcp服务端连接
bool BuildTcpServer(uint16_t port, bool is_block = true, const std::string &ip = "0.0.0.0", int backlog = MAXLISTEN)
{
if(!Create()) return false;
if(!is_block) SetNonBlock();
ReUseAddr();
if(!Bind(ip, port)) return false;
if(!Listen(backlog)) return false;
// ReUseAddr(); 怎么能比bind更晚调用
return true;
}
//创建一个tcp客户端连接
bool BuildTcpClient(const std::string &ip, uint16_t port, bool is_block = true)
{
if(!Create()) return false;
if(!is_block) SetNonBlock();
ReUseAddr();
if(!Connect(ip, port)) return false;
return true;
}
void ReUseAddr()
{
int opt = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);
}
void SetNonBlock()
{
int fl = fcntl(_sockfd, F_GETFL);
if(fl < 0)
return;
fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK);
}
};
要说接下来这个模块就无法避开Reactor模型了,简单介绍一下:
Reactor 反应堆模型是一种基于事件驱动的高并发设计模式,它的核心思想是:通过一个 I/O 多路复用器(如 epoll、select)监听多个文件描述符,当某个描述符就绪(读写事件就绪),就由调用对应的回调函数进行处理。
经典的单 Reactor 模型包含三个部分:事件源(文件描述符)、监听者(epoll、poll、select)、事件分发器(EventLoop)。
现在可以对应上我们的模块了,Channel对应了对文件描述符的事件和事件对应的回调函数管理,Poller对应监听者,EventLoop来执行就绪事件的回调函数。
5.3 Channel模块
这个模块我们来具体拆解一下它具体需要有什么,具体要干什么。
有什么:
1. (文件描述符)_fd:是谁的事件管理、2. _events:需要监听的事件、3. _revents:被触发的事件、4. 事件回调:各种事件被触发后执行的回调函数、5. 事件执行器:真正执行回调的模块
class Channel
{
public:
using EventCallback = std::function<void()>;
private:
int _fd;
EventLoop *loop; // 事件执行器
uint32_t _events; //需要监控的事件
uint32_t _revents; //触发的事件
EventCallback _write_callback; //写事件回调
EventCallback _read_callback; //读事件回调
EventCallback _error_callback; //错误事件回调
EventCallback _close_callback; //关闭事件回调
EventCallback _event_callback; //默认事件回调
}
干什么:
1. 对于_event 上层要能简单的设置是否要监听读写事件;并且当事件被设置更新了,要在Poller中立马更新监听。
2. 对于_revent Poller监听到的触发事件要能设置进来。
3. 对于事件回调 一共五个上层都能设置,具体他们干什么Channel是不知道的,只是放在这里等调用。
4. 对于事件执行器 它是真正执行回调的地方,Channel需要给他提供一个方法把,这个文件描述符被触发的事件的回调都执行一遍。(对应HandleEvents)
class Poller;
class EventLoop;
class Channel{
public:
Channel(EventLoop *eventloop, int fd) : loop(eventloop), _fd(fd), _events(0), _revents(0)
{}
~Channel() {}
int _Fd() {return _fd;}
uint32_t _Events() {return _events;}
void SetRevents(uint32_t revents) {_revents = revents;}
void SetWriteCall(const EventCallback &cb){_write_callback = cb;}
void SetReadCall(const EventCallback &cb){_read_callback = cb;}
void SetErrorCall(const EventCallback &cb){_error_callback = cb;}
void SetCloseCall(const EventCallback &cb){_close_callback = cb;}
void SetEventCall(const EventCallback &cb){_event_callback = cb;}
//是否监控可读
bool IsRead()
{return (_events & EPOLLIN);}
//是否监控可写
bool IsWrite()
{return (_events & EPOLLOUT);}
//设置监控可读
void SetRead()
{_events |= EPOLLIN; Update();}
//设置监控可写
void SetWrite()
{_events |= EPOLLOUT; Update();}
// 取消监控可读
void UnsetRead()
{_events &= ~EPOLLIN; Update();}
// 取消监控可写
void UnsetWrite()
{_events &= ~EPOLLOUT; Update();}
// 取消所有事件监控
void UnsetAll()
{_events = 0; Update();}
// 更新poller监控的事件
void Update();
// 取消对fd的监控
void Remove();
//处理触发事件
void HandleEvents()
{ // 处理读事件 //对端主动发FIN断联,读取最后数据
if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
{
// LOG(DEBUG)<< _fd<< " :" << &_read_callback;
if(_read_callback) _read_callback();
// 处理默认事件
// if(_event_callback) _event_callback();
}
//处理写事件
if(_revents & EPOLLOUT)
{
if(_write_callback) _write_callback();
// 处理默认事件
// if(_event_callback) _event_callback(); // here refresh
}
//处理错误事件 实际上都是关闭连接,一次就好 这里的处理顺序很有问题,待修改
if(_revents & EPOLLERR)
{
if(_error_callback) _error_callback();
}
//处理关闭事件
else if(_revents & EPOLLHUP)
{
if(_close_callback) _close_callback();
}
// 处理完前面的任务,再刷新时间片,不然执行任务时间过长,导致误以为连接不活跃
if(_event_callback) _event_callback(); // here refresh
}
};
看完代码,是不是还有很多疑惑,比如事件回调的处理顺序,Update Remove这两个函数怎么没实现? 没关系,再看下去,迷雾会慢慢散开……
5.4 Poller模块
事件监听模块,这个模块要用到epoll系列的接口,不熟悉的先去看看,再看代码; 和上个模块一样,我们通过解析这个模块的成员和方法来理解这个模块
有什么:
1. (事件监听文件描述符)_epfd、2.(被触发事件集合)_revents、3.(管理的Channel)_channels
class Poller
{
private:
int _epfd;
struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件
std::unordered_map<int, Channel*> _channels;// Poller监听的Channel
}
干什么:
1. 更新监听事件,有点眼熟,上面Channel的两个接口就是对应要调用这里的接口才能实现,所以分开声明和实现。
2. 监听事件,主要的本职工作嘛,调用epoll_wait把触发的事件(epoll_event)们拿出来,通过里面保存的文件描述符(epoll_event.data.fd)找到对应Channel,并更新Channel的_revents;别忘了,真正执行回调的还不是这里,而是更上层(EventLoop),所以通过参数,把触发事件的Channel都交给上层,让上层去执行回调。
3. 管理Channel 在_channels 里的添加 和 删除。
#define MAXPOLLEVENTS 1024 // 一次最多获取的活跃事件数
class Poller
{
private:
int _epfd;
struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件
std::unordered_map<int, Channel*> _channels;// Poller监听的Channel
// 对epoll接口的封装
public:
Poller()
{ // 一次监听的最大事件数已经,不受限制了这个参数没啥意义
_epfd = epoll_create(MAXPOLLEVENTS);
}
~Poller()
{
close(_epfd);
}
// 对于epoll监控的文件描述符进行更新 --
void UpdateEpoll(Channel *channel, int op)
{
auto it = _channels.find(channel->_Fd());
assert(it != _channels.end());
epoll_event event;
event.events = channel->_Events();
event.data.fd = channel->_Fd();
int ret = epoll_ctl(_epfd, op, channel->_Fd(), &event);
if(ret < 0)
{
LOG(ERROR) << "epoll_ctl error";
abort();
}
}
// 监听就绪事件
void Poll(std::vector<Channel *> *active_channels)
{
// epoll_wait(epfd, events, maxevents, timeout)
int nfds = epoll_wait(_epfd, _revents, MAXPOLLEVENTS, -1);
if(nfds < 0)
{
if(errno == EINTR)// 被信号中断
return;
LOG(ERROR) << "epoll_wait error";
abort();
}
for(int i = 0; i < nfds; i++)
{
auto it = _channels.find(_revents[i].data.fd);
assert(it != _channels.end());
it->second->SetRevents(_revents[i].events); // 设置触发事件到channel中
active_channels->push_back(it->second);
}
}
bool HasChannel(Channel *channel)
{
auto it = _channels.find(channel->_Fd());
if(it == _channels.end()) return false;
return true;
}
// 交给下层调用的接口
public:
void Update(Channel *channel)
{
if(!HasChannel(channel))
{
_channels.emplace(channel->_Fd(), channel);
UpdateEpoll(channel, EPOLL_CTL_ADD);
}
else
UpdateEpoll(channel, EPOLL_CTL_MOD);
}
void Remove(Channel *channel)
{
if(HasChannel(channel))
{
UpdateEpoll(channel, EPOLL_CTL_DEL);
_channels.erase(channel->_Fd());
}
}
};
到这里,大部分基础模块已经完成了,下一章,将会揭晓高并发的核心秘密。
未完待续……
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)