目录

附: 项目代码链接: 

1. 我的开发环境:

2. 设计思想:

3. 模块设计:

 4. 模块描述

4.1 核心模块

4.2 功能性模块

 5. 模块实现

5.0 日志模块

5.1 Buffer模块

5.2 Socket模块

5.3 Channel模块

有什么:

干什么:

5.3 Poller模块

有什么:

 干什么:


附: 项目代码链接: 

high-concurrency_server: 本项目基于C++实现的 轻量级、高并发 TCP通信框架(上层搭建HTTP服务) 参考了muduo库的设计思想,采用主从Reactor架构,多线程核心逻辑无锁设计,极大提高效率……https://gitee.com/waspqj/high-concurrency_server

这篇文章我们一起从零实现一个高并发服务器项目。

做出TCP服务框架之后呢,在这基础之上我们再搭建一个HTTP的应用层,当然搭建其他服务也是一样的,全凭兴趣。

1. 我的开发环境:

4核 4G的 Ubuntu 云服务器,VSCode。

2. 设计思想:

基于Reactor模型,多线程主从Reactor的架构,One Thread One Loop 的思想……好的是不是有点懵,别担心,后面会慢慢解释这些,先对这些词混个眼熟吧。

3. 模块设计:

如果说设计思想是指导方针,那么模块设计就是计划表,说完模块设计就可以按照计划逐步实现了。

我愿意将模块分为两大类:功能性模块,核心模块。怎么区别?简单来说,核心模块面向需求核心,功能性模块为核心模块服务,或者说功能性模块就是核心模块的一部分。

那么接下来怎么展开说呢?先简单说说核心模块,如果先说功能模块,那么它们究竟有什么用,在哪里起作用是搞不清楚的。

 4. 模块描述

4.1 核心模块

TcpServer:所有模块的功能整合,管理所有的Connection和Loop,帮助上层快速搭建TCP服务框架。

EventLoop:整合事件监听和事件处理,一个Loop就是一个事件执行循环,前面提到的One Thread One Loop简单来说就是在一个线程里执行一个事件循环,也就是一个EventLoop。

Connection:TCP的连接管理,说起来就这一句话,不过这是所有模块里最难最复杂的模块了。

4.2 功能性模块

Buffer:数据管理模块,提供丰富的读写接口,方便快速解析和读写。

Socket:TCP socket编程接口封装,快速创建并管理监听套接字和普通套接字。

Channel:事件管理模块,管理要监听什么事件,以及事件触发的回调。

Poller:事件监听模块,监听事件触发,是Reactor模型的核心模块。

TimeWheel:时间轮模块,管理定时任务,自动执行定时任务。

看到这里有人要说了,这人叽里咕噜的说啥呢? 非常正常,特别是对于Reactor模型不熟悉的朋友;那么对Reactor模型有一点了解的朋友就会发现,其中很多模块,在各种使用Reactor模型的 的代码里都是常见的,甚至是通用的。不熟悉没关系,熟悉当然更好,一会我们写起代码来再慢慢体会。

 5. 模块实现

刚刚我们从上到下简单介绍了各个模块,接下来的模块实现我们从下往上写。

5.0 日志模块

前面漏掉了,这个模块对于调试和运维还是非常非常重要的,这段代码是我以前造的轮子,如果你以前也写过这样的代码,也可以拿来用。我采用了策略模式,可以选择打印到标准输出,也可以选择打印到文件里。我这个其实写的真挺好的,可以借鉴,就是和主题没多大关系,我就不贴出来了,如果想看,顺着路径去仓库链接找吧 ->(source/log.hpp)

5.1 Buffer模块

TCP协议是面向字节流的典型协议,因此,解析和缓存是我们处理数据最麻烦部分,这个模块就是来简化这个步骤的。Buffer最常被用作用户级别的数据缓冲区,收到数据就直接写到里面。具体每个口我就不介绍了,比较多,但是不难。是vector,但是具体读写呢都是当成指针来的。

写出来其实挺有用的,以后拿到别的项目用起来也挺爽的。

#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:
    std::vector<char> _buffer;
    uint64_t _write_pos;
    uint64_t _read_pos;
public:
    Buffer() : _write_pos(0), _read_pos(0), _buffer(BUFFER_DEFAULT_SIZE) {}
    //buffer的起始位置
    char *GetStartPos() {return &*_buffer.begin();}
    //获取读位置
    char *GetReadPos() {return GetStartPos() + _read_pos;}
    //获取写位置
    char *GetWritePos() {return GetStartPos() + _write_pos;}
    //可读长度
    uint64_t ReadableLen() {return _write_pos - _read_pos;}
    //缓冲区末尾可写长度
    uint64_t TailBufferLen() {return _buffer.size() - _write_pos;}
    //缓冲区开头可写长度
    uint64_t HeadBufferLen() {return _read_pos;}

    //确保缓冲区足够写
    void EnsureLen(uint64_t len)
    {
        if (len <= TailBufferLen()) return;
        else if(len <= HeadBufferLen() + TailBufferLen()) //把数据移动到最前面
        {
            uint64_t read_len = ReadableLen();
            std::copy(GetReadPos(), GetWritePos(), GetStartPos());
            _read_pos = 0;
            _write_pos = read_len;
        }
        else //扩容
            _buffer.resize(_write_pos + len);
    }
    //移动读位置
    void MoveReadPos(uint64_t len) 
    {
        assert(len <= ReadableLen());
        _read_pos += len;
    }
    //移动写位置
    void MoveWritePos(uint64_t len) 
    {
        assert(len <= TailBufferLen());
        _write_pos += len;
    }

    //写入数据(void*)
    void Write(const void *data, uint64_t len)
    {
        EnsureLen(len);
        std::copy((char*)data, (char*)data + len, GetWritePos());
    }
    //写入数据(string)
    void WriteString(std::string &str)
    {
        Write((void*)str.c_str(), str.size());
    }
    //写入数据(buffer)
    void WriteBuffer(Buffer &buf)
    {
        Write(buf.GetReadPos(), buf.ReadableLen());
    }

    //写入并移动写位置
    void WriteAndPush(const void *data, uint64_t len)
    {
        if(len == 0) return;
        Write(data, len);
        MoveWritePos(len);
    }
    void WriteAndPush(std::string &str)
    {
        WriteString(str);
        MoveWritePos(str.size());
    }
    void WriteAndPush(Buffer &buf)
    {
        WriteBuffer(buf);
        MoveWritePos(buf.ReadableLen());
    }

    //读取数据
    void Read(void *data, uint64_t len)
    {
        assert(len <= ReadableLen());
        std::copy(GetReadPos(), GetReadPos() + len, (char*)data);
    }
    //读取数据(string)
    std::string ReadasString(uint64_t len)
    {
        assert(len <= ReadableLen());
        std::string str;
        str.resize(len);
        Read(&str[0], len);
        return str;
    }
    //读取数据(one line)
    std::string ReadOneLine()
    {
        char* pos = (char*)memchr(GetReadPos(), '\n', ReadableLen()) ;
        if(pos == NULL) return "";
        uint64_t len = (uint64_t)(pos - GetReadPos()) + 1;
        return ReadasString(len);        
    }
    //读取并移动读位置
    void ReadAndPop(void *data, uint64_t len)
    {
        Read(data, len);
        MoveReadPos(len);
    }
    std::string ReadAndPop(uint64_t len)
    {
        std::string ret = ReadasString(len);
        MoveReadPos(len);
        return ret;
    }
    std::string ReadAndPop()
    {
        std::string ret = ReadOneLine();
        MoveReadPos(ret.size());
        return ret;
    }
};

(为了描述的完整性呢,我贴出了所有的内容,但是其中一些接口如果暂时感觉用不到,那就先不着急写,用到再说)

5.2 Socket模块

一般涉及到网络的项目,都会有这个模块吧,看看我是怎么封装的:

#define defaultsockfd -1
class Socket
{
private:
    int _sockfd;
public:
    Socket() : _sockfd(defaultsockfd) {}
    Socket(int fd) : _sockfd(fd) {}
    ~Socket(){LOG(DEBUG)<< "close socket "<< _sockfd; }
    int _Fd(){return _sockfd;}
    //创建套接字
    bool Create()
    {
        //创建tcp套接字
        _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if(_sockfd < 0)
        {
            LOG(ERROR) << "create socket error";
            return false;
        }
        LOG(DEBUG) << "create socket success";
        return true;    
    }
    //绑定ip和端口号
    bool Bind(const std::string &ip, uint16_t port)
    { 
        sockaddr_in addr;
        memset(&addr, 0, sizeof addr);
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
        int n = bind(_sockfd, (sockaddr*)&addr, sizeof addr);
        if(n < 0)
        {
            LOG(ERROR) << "bind error";
            return false;
        }
        LOG(DEBUG) << "bind success";
        return true;
    }
#define MAXLISTEN 128  //backlog表示了accept队列的最大长度,与后续高并发场景有关,128 是一个经验值
    //开启监听状态
    bool Listen(int backlog = MAXLISTEN)
    {
        int n = listen(_sockfd, backlog);
        if(n < 0)
        {
            LOG(ERROR) << "listen error";
            return false;
        }
        LOG(DEBUG) << "listen success";
        return true;
    }
    //向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port)
    { 
        sockaddr_in addr;
        memset(&addr, 0, sizeof addr);
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &addr.sin_addr);
        int n = connect(_sockfd, (sockaddr*)&addr, sizeof addr);
        if(n < 0)
        {
            LOG(ERROR) << "connect error";
            return false;
        }
        LOG(DEBUG) << "connect success";
        return true;
    }
    //接受连接
    int Accept()
    {
        int fd = accept(_sockfd, nullptr, nullptr);
        if(fd < 0)
        {
            LOG(ERROR) << "accept error: "<< fd;
            return -1;
        }
        LOG(DEBUG) << "accept success";

        return fd;
    }
     //接受数据
    ssize_t Recv(void* buff, size_t len, int flags = 0)
    {
        ssize_t n = recv(_sockfd, buff, len, flags);
        // recv 返回0, 表示对端关闭, 放入判错逻辑, 最终返回0    here n<0 to n<=0 
        if(n <= 0)
        {
            if(errno == EAGAIN || errno == EINTR)
                return 0;
            // LOG(ERROR) << "recv error";
            return -1;
        }
        LOG(DEBUG) << "recv success:" << n;
        return n; 
    }
    //发送数据
    ssize_t Send(const void* buff, size_t len, int flags = 0)
    {
        ssize_t n = send(_sockfd, buff, len, flags);
        if(n < 0)
        {
            if(errno == EAGAIN || errno == EINTR)
                return 0;
            LOG(ERROR) << "send error";
            return -1;
        }
        LOG(DEBUG) << "send success";
        return n;
    }
    //关闭套接字
    void Close()
    {
        if(_sockfd != defaultsockfd)
            close(_sockfd);
    }

    //创建一个tcp服务端连接
    bool BuildTcpServer(uint16_t port, bool is_block = true, const std::string &ip = "0.0.0.0", int backlog = MAXLISTEN)
    {
        if(!Create()) return false;
        if(!is_block) SetNonBlock();
        ReUseAddr();
        if(!Bind(ip, port)) return false;
        if(!Listen(backlog)) return false;
        // ReUseAddr(); 怎么能比bind更晚调用
        return true;
    }
    //创建一个tcp客户端连接
    bool BuildTcpClient(const std::string &ip, uint16_t port, bool is_block = true)
    {
        if(!Create()) return false;
        if(!is_block) SetNonBlock();
        ReUseAddr();
        if(!Connect(ip, port)) return false;
        return true;
    }

    void ReUseAddr()
    {
        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof opt);
    }
    void SetNonBlock()
    {
        int fl = fcntl(_sockfd, F_GETFL);
        if(fl < 0)
            return; 
        fcntl(_sockfd, F_SETFL, fl | O_NONBLOCK);
    }
};

要说接下来这个模块就无法避开Reactor模型了,简单介绍一下:

Reactor 反应堆模型是一种基于事件驱动的高并发设计模式,它的核心思想是:通过一个 I/O 多路复用器(如 epollselect)监听多个文件描述符,当某个描述符就绪(读写事件就绪),就由调用对应的回调函数进行处理。

经典的单 Reactor 模型包含三个部分:事件源(文件描述符)、监听者(epoll、poll、select)、事件分发器(EventLoop)。

现在可以对应上我们的模块了,Channel对应了对文件描述符的事件和事件对应的回调函数管理,Poller对应监听者,EventLoop来执行就绪事件的回调函数。

5.3 Channel模块

这个模块我们来具体拆解一下它具体需要有什么,具体要干什么。

有什么:

1. (文件描述符)_fd:是谁的事件管理、2. _events:需要监听的事件、3. _revents:被触发的事件、4. 事件回调:各种事件被触发后执行的回调函数、5. 事件执行器:真正执行回调的模块

class Channel
{
public:
    using EventCallback = std::function<void()>;
private:
    int _fd;
    EventLoop *loop; // 事件执行器

    uint32_t _events;  //需要监控的事件
    uint32_t _revents; //触发的事件
    EventCallback _write_callback; //写事件回调
    EventCallback _read_callback;  //读事件回调
    EventCallback _error_callback; //错误事件回调
    EventCallback _close_callback; //关闭事件回调
    EventCallback _event_callback; //默认事件回调
}

干什么:

1. 对于_event 上层要能简单的设置是否要监听读写事件;并且当事件被设置更新了,要在Poller中立马更新监听。

2. 对于_revent Poller监听到的触发事件要能设置进来。

3. 对于事件回调 一共五个上层都能设置,具体他们干什么Channel是不知道的,只是放在这里等调用。

4. 对于事件执行器 它是真正执行回调的地方,Channel需要给他提供一个方法把,这个文件描述符被触发的事件的回调都执行一遍。(对应HandleEvents)

class Poller;
class EventLoop;
class Channel{
public:
    Channel(EventLoop *eventloop, int fd) : loop(eventloop), _fd(fd), _events(0), _revents(0)
    {}
    ~Channel() {}
    int _Fd() {return _fd;}
    uint32_t _Events() {return _events;}
    void SetRevents(uint32_t revents) {_revents = revents;}
    void SetWriteCall(const EventCallback &cb){_write_callback = cb;}
    void SetReadCall(const EventCallback &cb){_read_callback = cb;}
    void SetErrorCall(const EventCallback &cb){_error_callback = cb;}
    void SetCloseCall(const EventCallback &cb){_close_callback = cb;}
    void SetEventCall(const EventCallback &cb){_event_callback = cb;}

    //是否监控可读
    bool IsRead()
    {return (_events & EPOLLIN);}
    //是否监控可写
    bool IsWrite()
    {return (_events & EPOLLOUT);}
    //设置监控可读
    void SetRead()
    {_events |= EPOLLIN; Update();}
    //设置监控可写
    void SetWrite()
    {_events |= EPOLLOUT; Update();}
    // 取消监控可读
    void UnsetRead()
    {_events &= ~EPOLLIN; Update();}
    // 取消监控可写
    void UnsetWrite()
    {_events &= ~EPOLLOUT; Update();}
    // 取消所有事件监控
    void UnsetAll()
    {_events = 0; Update();}
    // 更新poller监控的事件
    void Update();
    // 取消对fd的监控
    void Remove();
    //处理触发事件
    void HandleEvents()
    {   // 处理读事件       //对端主动发FIN断联,读取最后数据           
        if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            // LOG(DEBUG)<< _fd<< " :" << &_read_callback;
            if(_read_callback) _read_callback();
            // 处理默认事件
            // if(_event_callback) _event_callback();
        }
        //处理写事件
        if(_revents & EPOLLOUT)
        {
            if(_write_callback) _write_callback();
            // 处理默认事件
            // if(_event_callback) _event_callback(); // here refresh
        }
        //处理错误事件 实际上都是关闭连接,一次就好       这里的处理顺序很有问题,待修改
        if(_revents & EPOLLERR)
        {
            if(_error_callback) _error_callback();
        }
        //处理关闭事件
        else if(_revents & EPOLLHUP)
        {
            if(_close_callback) _close_callback();
        }
        // 处理完前面的任务,再刷新时间片,不然执行任务时间过长,导致误以为连接不活跃
        if(_event_callback) _event_callback(); // here refresh
    }
};

看完代码,是不是还有很多疑惑,比如事件回调的处理顺序,Update Remove这两个函数怎么没实现? 没关系,再看下去,迷雾会慢慢散开……

5.4 Poller模块

事件监听模块,这个模块要用到epoll系列的接口,不熟悉的先去看看,再看代码; 和上个模块一样,我们通过解析这个模块的成员和方法来理解这个模块

有什么:

1. (事件监听文件描述符)_epfd、2.(被触发事件集合)_revents、3.(管理的Channel)_channels

class Poller
{
private:
    int _epfd;
    struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件
    std::unordered_map<int, Channel*> _channels;// Poller监听的Channel
}

 干什么:

1. 更新监听事件,有点眼熟,上面Channel的两个接口就是对应要调用这里的接口才能实现,所以分开声明和实现。

2. 监听事件,主要的本职工作嘛,调用epoll_wait把触发的事件(epoll_event)们拿出来,通过里面保存的文件描述符(epoll_event.data.fd)找到对应Channel,并更新Channel的_revents;别忘了,真正执行回调的还不是这里,而是更上层(EventLoop),所以通过参数,把触发事件的Channel都交给上层,让上层去执行回调。

3. 管理Channel 在_channels 里的添加 和 删除。

#define MAXPOLLEVENTS 1024 // 一次最多获取的活跃事件数
class Poller
{
private:
    int _epfd;
    struct epoll_event _revents[MAXPOLLEVENTS]; // 保存活跃事件
    std::unordered_map<int, Channel*> _channels;// Poller监听的Channel

// 对epoll接口的封装
public:
    Poller()
    {   // 一次监听的最大事件数已经,不受限制了这个参数没啥意义
        _epfd = epoll_create(MAXPOLLEVENTS);
    }
    ~Poller()
    {
        close(_epfd);
    }
    // 对于epoll监控的文件描述符进行更新 -- 
    void UpdateEpoll(Channel *channel, int op)
    {
        auto it = _channels.find(channel->_Fd());
        assert(it != _channels.end());
        epoll_event event;
        event.events = channel->_Events();
        event.data.fd = channel->_Fd();
        int ret = epoll_ctl(_epfd, op, channel->_Fd(), &event);
        if(ret < 0)
        {
            LOG(ERROR) << "epoll_ctl error";
            abort();
        }
    }
    // 监听就绪事件
    void Poll(std::vector<Channel *> *active_channels)
    {
        // epoll_wait(epfd, events, maxevents, timeout)
        int nfds = epoll_wait(_epfd, _revents, MAXPOLLEVENTS, -1);
        if(nfds < 0)
        {   
            if(errno == EINTR)// 被信号中断
                return;
            LOG(ERROR) << "epoll_wait error";
            abort();
        }
        for(int i = 0; i < nfds; i++)
        {
            auto it = _channels.find(_revents[i].data.fd);
            assert(it != _channels.end());
            it->second->SetRevents(_revents[i].events); // 设置触发事件到channel中
            active_channels->push_back(it->second);
        }
    }

    bool HasChannel(Channel *channel)
    {
        auto it = _channels.find(channel->_Fd());
        if(it == _channels.end()) return false;
        return true;
    }
// 交给下层调用的接口
public:
    void Update(Channel *channel)
    {
        if(!HasChannel(channel))
        {
            _channels.emplace(channel->_Fd(), channel);
            UpdateEpoll(channel, EPOLL_CTL_ADD);
        }
        else
            UpdateEpoll(channel, EPOLL_CTL_MOD);
    }
    void Remove(Channel *channel)
    {
        if(HasChannel(channel))
        {
            UpdateEpoll(channel, EPOLL_CTL_DEL);
            _channels.erase(channel->_Fd());
        }
    }
};

到这里,大部分基础模块已经完成了,下一章,将会揭晓高并发的核心秘密。

未完待续……

仿 muduo库 从零实现高并发TCP服务框架 (二)-CSDN博客https://blog.csdn.net/2502_91433987/article/details/161198186?sharetype=blogdetail&sharerId=161198186&sharerefer=PC&sharesource=2502_91433987&spm=1011.2480.3001.8118

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐