一、Reactor 模型到底是什么?(一句话讲透)

Reactor = 事件驱动 + IO 多路复用 + 回调分发

一句话总结:一个循环等着事件来,事件来了自动分发到对应处理函数。

核心三要素:

  1. 事件循环(EventLoop):负责跑循环、等事件、调回调
  2. IO 多路复用(epoll):负责监听海量 fd
  3. 事件分发器(Channel):每个 fd 对应一个 Channel,负责事件处理

二、Reactor 框架整体架构(标准主从 Reactor)

1. Main Reactor(主反应堆)

  • 只有一个 EventLoop
  • 只做 accept 新连接
  • 把新连接分发给 Sub Reactor

2. Sub Reactor(从反应堆)

  • 多个 EventLoop,每个一个线程
  • 负责已连接套接字的 read/write/close
  • 实现多核负载均衡

3. ThreadPool(业务线程池)

  • 处理耗时业务(数据库、计算)

4. TimerQueue(定时器队列)

  • 统一管理定时任务、超时断开

三、核心组件逐行精讲

1. EventLoop —— Reactor 的心脏

一个线程 = 一个 EventLoop = 一个 epoll

头文件:

#pragma once
#include <vector>
#include <atomic>
#include <unistd.h>
#include <thread>
#include <condition_variable>
#include "timestamp.h"
#include <memory>
#include "poller.h"
#include <mutex>
#include <functional>
#include "timer.h"
#include <signal.h>

namespace net
{
    // 写已经关闭的 socket 时,系统会发 SIGPIPE 默认杀死进程
    // 服务器必须忽略!
    class IgnoreSigPipe
    {
    public:
        IgnoreSigPipe()
        {
            signal(SIGPIPE, SIG_IGN);
        }
    };
    static IgnoreSigPipe ignore_pipe;

    // epoll_wait 超时 1 秒
    const int InitEpollTimeout = 1000;
    class Channel;
    // class TimerQueue;
    //  事件循环类:事件监控+任务池+定时器
    // 一个线程 = 一个 EventLoop
    class EventLoop
    {
        // 任务回调类型
        typedef std::function<void()> Functor;

    public:
        // 初始化成成员
        EventLoop();
        ~EventLoop();
        // 循环事件监控,获取就绪channel列表,逐个执⾏handleEvent即可
        void loop();
        // 置位_quit, 如果当前不在loop线程中,就唤醒⼀下loop
        void quit();
        // 在loop线程中则直接执⾏任务,否则就添加到任务池
        void runInLoop(Functor cb);
        // 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
        void queueInLoop(Functor cb);
        // 唤醒loop,其实就是给_eventfd写⼊个数据即可(要求8字节),与handleRead对应
        void wakeup();
        // 通过poller添加监控即可
        void updateChannel(Channel *channel);
        // 通过poller移除监控即可
        void removeChannel(Channel *channel);
        // 通过poller返回管理结果即可
        bool hasChannel(Channel *channel);
        // 判断不再当前loop的线程中就抛异常就⾏
        void assertInLoopThread();
        // 判断当前是否处于loop所在线程
        bool isInLoopThread() const;
        // 提供三种不同的接口添加定时任务:定点,延迟,循环
        // 通过timer添加指定时间的定时任务
        TimerId runAt(Timestamp time, TimerCallback cb);
        // 通过timer添加延迟N秒的定时任务
        TimerId runAfter(double delay_sec, TimerCallback cb);
        // 通过timer添加循环定时任务
        TimerId runEvery(double interval_sec, TimerCallback cb);

    private:
        // 处理_eventfd的读事件,读取_eventfd中的数据
        void handleRead();
        // 针对任务池中的任务进⾏调⽤执⾏即可
        void doPendingFunctors();

    private:
        typedef std::vector<Channel *> ChannelList;
        bool _looping;                // 当前是否处于循环监控状态中
        std::atomic<bool> _quit;      // 循环退出控制标志位:事件循环的循环条件
        bool _eventHandling;          // 当前是否处理正在处理事件中
        bool _callingPendingFunctors; // 当前是否处于正在处理任务池中的任务中
        // 当前eventloop所在的线程ID,
        pid_t _threadId;                         // 线程ID,pthread_self,pid_t getid()==getpid()
        Timestamp _pollReturnTime;               // poll监控返回时的时间
        std::unique_ptr<Poller> _poller;         // poller监控对象
        std::unique_ptr<TimerQueue> _timerQueue; // 定时器对象,⼀定要在poller之后
        int _wakeupFd;                           // 事件监控唤醒描述符
        std::unique_ptr<Channel> _wakeupChannel; // 循环唤醒描述符对应的channel对象
        ChannelList _activeChannels;             // 当前活跃的所有channel列表
        Channel *_currentActiveChannel;          // 从_activeChannels获取当前正在处理的channel
        std::mutex _mutex;                       // 任务池的保护锁
        std::vector<Functor> _pendingFunctors;   // 任务池
    };

    // EventLoopThread:封装一个线程 + 一个 EventLoop
    class EventLoopThread
    {
        ...
    };

    // EventLoopThreadPool:线程池,管理 N 个 EventLoopThread
    // 实现主从 Reactor 多线程模型
    class EventLoopTreadPool
    {
        ...
    };
}

源程序:

#include "eventloop.h"
#include "poller.h"
#include "channel.h"
#include <cassert>
#include <sys/eventfd.h>
#include "details.h"
#include <cassert>

namespace net
{
    // 创建 eventfd:用于跨线程唤醒 epoll_wait
    int createEventFd()
    {
        // 创建eventfd对象,用于事件唤醒
        // EFD_CLOEXEC:fork后在子进程中关闭描述符
        // EFD_NONBLOCK:非阻塞
        int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (fd < 0)
        {
            LOG_FATAL("eventfd 失败: %s", strerror(errno));
        }
        return fd;
    }
    // 唤醒:往 eventfd 写 8 字节
    void writeEventFd(int fd)
    {
        uint64_t val = 1;
        ssize_t ret = ::write(fd, &val, sizeof(val));
        if (ret < 0)
        {
            LOG_ERROR("event fd write error: %s", strerror(errno));
        }
    }
    // 读走数据,清空 eventfd 可读状态
    void readEventFd(int fd)
    {
        uint64_t val = 1;
        ssize_t ret = ::read(fd, &val, sizeof(val));
        if (ret < 0)
        {
            LOG_ERROR("event fd write error: %s", strerror(errno));
        }
    }

    // 初始化成成员
    EventLoop::EventLoop()
        : _looping(false),
          _quit(false),
          _eventHandling(false),
          _callingPendingFunctors(false),
          _threadId(::gettid()), // 决定了eventloop对象必须在所在线程中实例化
          _poller(Poller::newDefaultPoller(this)),
          _timerQueue(new TimerQueue(this)),
          _wakeupFd(createEventFd()),
          _wakeupChannel(new Channel(this, _wakeupFd))
    {
        // 为_wakeupFd触发的可读事件设置回调处理函数
        _wakeupChannel->setReadCallback(std::bind(&EventLoop::handleRead, this));
        // 将_wakeupFd挂到Poller中进行监控
        _wakeupChannel->enableReading();
    }
    EventLoop::~EventLoop()
    {
        assert(_looping == false);
        if (_poller->hasChannel(_wakeupChannel.get()))
        {
            _wakeupChannel->disableAll();
            _wakeupChannel->remove();
        }
        ::close(_wakeupFd);
    }
    // 循环事件监控,获取就绪channel列表,逐个执⾏handleEvent即可
    // 开始事件循环:1.开始描述符事件监控,2.处理任务池的任务
    void EventLoop::loop()
    {
        _looping = true;
        _quit = false;
        while (!_quit)
        {
            // 获取就绪描述符channel,进行事件处理
            _activeChannels.clear();
            Timestamp now = _poller->poll(InitEpollTimeout, &_activeChannels);
            _currentActiveChannel = NULL;
            _eventHandling = true;
            for (int i = 0; i < _activeChannels.size(); ++i)
            {
                _currentActiveChannel = _activeChannels[i]; // 拿到就绪channel
                _currentActiveChannel->handleEvent(now);
            }
            _eventHandling = false;
            // 处理任务池中的任务
            doPendingFunctors();
        }
        _looping = false;
    }
    // 置位_quit, 如果当前不在loop线程中,就唤醒⼀下loop
    void EventLoop::quit()
    {
        _quit = true;

        // 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
        if (!isInLoopThread())
        {
            wakeup(); // 唤醒事件监控
        }
    }
    // 在loop线程中则直接执⾏任务,否则就添加到任务池
    void EventLoop::runInLoop(Functor cb)
    {
        if (isInLoopThread())
        {
            cb(); // 当前线程 → 直接执行
        }
        else
        {
            queueInLoop(cb); // 其他线程 → 放入队列
        }
    }
    // 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
    void EventLoop::queueInLoop(Functor cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _pendingFunctors.push_back(cb);
        }

        // 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
        if (_callingPendingFunctors || !isInLoopThread())
        {
            wakeup(); // 唤醒事件监控
        }
    }
    // 唤醒loop,其实就是给_eventfd写⼊个数据即可(要求8字节),与handleRead对应
    void EventLoop::wakeup()
    {
        writeEventFd(_wakeupFd);
    }
    // 通过poller添加监控即可
    void EventLoop::updateChannel(Channel *channel)
    {
        assertInLoopThread();
        _poller->updateChannel(channel);
    }
    // 通过poller移除监控即可
    void EventLoop::removeChannel(Channel *channel)
    {
        assertInLoopThread();
        _poller->removeChannel(channel);
    }
    // 通过poller返回管理结果即可
    bool EventLoop::hasChannel(Channel *channel)
    {
        assertInLoopThread();
        return _poller->hasChannel(channel);
    }
    // 判断不再当前loop的线程中就抛异常就⾏
    void EventLoop::assertInLoopThread()
    {
        assert(_threadId == ::gettid());
    }
    // 判断当前是否处于loop所在线程
    bool EventLoop::isInLoopThread() const
    {
        if (_threadId == ::gettid())
        {
            return true;
        }
        return false;
    }

    // 处理_eventfd的读事件,读取_eventfd中的数据
    void EventLoop::handleRead()
    {
        readEventFd(_wakeupFd);
    }
    // 针对任务池中的任务进⾏调⽤执⾏即可
    void EventLoop::doPendingFunctors()
    {
        _callingPendingFunctors = true;
        // 交换,减少锁范围
        std::vector<Functor> functors;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            functors.swap(_pendingFunctors);
        }
        // 依次执行任务
        for (auto &functor : functors)
        {
            functor();
        }
        _callingPendingFunctors = false;
    }
    // 通过timer添加指定时间的定时任务
    // 在构造函数中初始化
    TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
    {
        return _timerQueue->addTimer(std::move(cb), time, 0);
    }
    // 通过timer添加延迟N秒的定时任务
    TimerId EventLoop::runAfter(double delay_sec, TimerCallback cb)
    {
        // 根据当前系统时间,计算得到超时时间点
        Timestamp when = net::addTime(Timestamp::now(), delay_sec);
        // 添加定时任务
        return _timerQueue->addTimer(std::move(cb), when, 0);
    }
    // 通过timer添加循环定时任务
    TimerId EventLoop::runEvery(double interval_sec, TimerCallback cb)
    {
        Timestamp when = net::addTime(Timestamp::now(), interval_sec);
        // 添加定时任务
        return _timerQueue->addTimer(std::move(cb), when, interval_sec);
    }
    
}

核心函数:loop () —— 真正的 Reactor 循环

这就是 Reactor 模型的全部灵魂!


2. Channel —— 事件分发器(fd 的管家)

一个 fd 永远只对应一个 Channel

头文件:

#pragma once
#include <functional>
#include <map>
#include <memory>
#include "poller.h"
namespace net
{
    // 一个 Channel 永远只属于 1 个 EventLoop
    class Channel
    {
    public:
    // 定义回调函数类型
        typedef std::function<void()> EventCallback;// 普通事件回调
        // 读事件回调(带时间戳)
        typedef std::function<void(Timestamp)> ReadEventCallback;
        Channel(EventLoop *loop, int fd);
        ~Channel();
        // 设置事件回调处理函数
        // 启动当前channel中描述符的读事件监控events |= kReadEvent;//修改当前要监控的事件 // poller->updatechannel(this);
        void setReadCallback(ReadEventCallback cb) { _readCallback = std::move(cb); }
        void setWriteCallback(EventCallback cb) { _writeCallback = std::move(cb); }
        void setCloseCallback(EventCallback cb) { _closeCallback = std::move(cb); }
        void setErrorCallback(EventCallback cb) { _errorCallback = std::move(cb); }
        // 启动当前channel中描述符的读事件监控
        void enableReading()
        {
            _events |= kReadEvent; // 修改当前要监控的事件
            update();              // poller->updatechannel(this); epoll_ctl(ADD, fd, events);
        }
        void disableReading()
        {
            _events &= ~kReadEvent; // 将kReadEvent位置0,其他位不变update();
            update();
        }
        void enableWriting()
        {
            _events |= kWriteEvent;
            update();
        }
        void disableWriting()
        {
            _events &= ~kWriteEvent;
            update();
        }
        // 取消所有监听
        void disableAll()
        {
            _events = kNoneEvent;
            update();
        }
        // 查询当前是否在监听 读/写 事件
        bool isWriting() const { return _events & kWriteEvent; }
        bool isReading() const { return _events & kReadEvent; }

        // 设置就绪状态
        int index() { return _index; }
        void set_index(int idx) { _index = idx; }
        // 获取当前channel所在的事件循环
        EventLoop *ownerLoop() { return _loop; }

        // 设置channel的外部管理对象,通过_tie成员观察外部对象是否已经被释放
        void tie(const std::shared_ptr<void> &obj)
        {
            _tie = obj;
            _tied = true;
        }
        // 获取监控的描述符
        int fd() const { return _fd; }
        // 获取监控中的事件
        int events() const { return _events; }
        // 设置描述符的就绪事件,在Poller中设置
        void setRevents(int revt) { _revents = revt; }
        // 判断当前描述符是否还有监控的事件
        bool isNoneEvent() const { return _events == kNoneEvent; }

        // 根据tie观察管理⾃⼰的Connection对象是否还存在,不存在就不处理事件了
        // Acceptor和TimerQueue本⾝就不设置这个,所以直接执⾏handleEventWithGuard
        // 总体的事件处理:根据不同的实际就绪的事件,调用不同的回调函数进行事件处理
        
        // EventLoop 收到就绪事件后,调用这个函数处理
        void handleEvent(Timestamp receiveTime)
        {
            if (_tied)
            {
                // 设置了观察者对象,主要针对Connection
                // 如果绑定了对象(Connection),先判断对象是否还活着
                if (_tie.lock())
                {
                    // 通过观察者对象观察外部对象是否被释放
                    handleEventWithGuard(receiveTime);
                }
            }
            else
            {
                // 主要针对Aceeptor,TimerQueue
                handleEventWithGuard(receiveTime);
            }
        }

        // 通过_loop移除监控和管理即可
        void remove();

    private:
        // 1.EPOLLHUP且不可读则直接close; 2. EPOLLERR 就直接error
        // 3.EPOLLIN | EPOLLPRI | EPOLLRDHUP(半关闭)则调⽤read,
        //4. EPOLLOUT则write
        // 根据 _revents(实际发生的事件)调用对应的回调
        void handleEventWithGuard(Timestamp receiveTime)
        {
            // 针对不同事件进行不同的处理
            // 1.若连接挂断,且没有出发可读事件:直接handleClose
            _eventHandling = true;
            if (_revents & EPOLLHUP && !(_revents & EPOLLIN))
            {
                if (_closeCallback)
                    _closeCallback();
            }

            // 2.触发了可读事件:EPoLLIN|EPOLLPRI,调用handleRead
            if (_revents & (EPOLLIN | EPOLLPRI) || _revents & EPOLLRDHUP)
            {
                if (_readCallback)
                    _readCallback(receiveTime);
            }

            // 3.触发了可写事件 :EPOLLOUT,调用handleWrite
            if (_revents & EPOLLOUT)
            {
                if (_writeCallback)
                    _writeCallback();
            }

            // 4.触发了错误事件:EPOLLERR 调用handleError
            if (_revents & EPOLLERR)
            {
                if (_errorCallback)
                    _errorCallback();
            }
            _eventHandling = false;
        }
        // 通过_loop更新监控事件即可
        void update(); // poller->uppdateChannel(this)
    

    private:
        EventLoop *_loop; // 当前channel挂在哪个loop中进⾏事件监控
        int _fd;          // 当前channel对应的描述符
        int _events;      // 当前channel所监控的事件
        int _revents;     // 当前channel所触发的事件
        int _index;       // 当前channel的状态

        // 观察者模式的另类使用(是否绑定了外部对象)
        bool _tied;
        // 观察者模式的⼀种另类使⽤,⽤于观察指定的对象存在性
        std::weak_ptr<void> _tie; 

        bool _eventHandling; // 当前channel是否处于事件处理中
        bool _addedToLoop;   // 当前channel对象是否已经被添加到loop中

        ReadEventCallback _readCallback; // 读事件的处理回调函数
        EventCallback _writeCallback;
        EventCallback _closeCallback;
        EventCallback _errorCallback;
    };
}

源程序:

#include "channel.h"
#include "eventloop.h"
#include "details.h"
#include <cassert>

namespace net
{
    Channel::Channel(EventLoop *loop, int fd)
        : _loop(loop),
          _fd(fd),
          _events(kNoneEvent),
          _revents(kNoneEvent),
          _index(kNew),
          _tied(false),
          _eventHandling(false),
          _addedToLoop(false)
    {
        //LOG_DEBUG("NEW Channel: %lu", (uint64_t)this);
    }
    Channel::~Channel()
    {
        assert(_eventHandling == false);
        assert(_addedToLoop == false);
        //LOG_DEBUG("DELETE Channel:%lu", (uint64_t)this);
    }
    void Channel::remove()
    {
        _loop->removeChannel(this);
        _addedToLoop = false;
    }
    // 通过_loop更新监控事件即可
    void Channel::update() // poller->uppdateChannel(this)
    {
        _addedToLoop = true;
        _loop->updateChannel(this);
        
    }

}

核心:handleEvent () —— 事件处理分发

这就是 Reactor 的事件分发机制


3. Poller / EPollPoller —— epoll 封装

封装了 Linux 最高效的 IO 多路复用:epoll

头文件:

#pragma once
// epoll 系统调用所需头文件
#include <sys/epoll.h>
// 存储活跃 Channel、epoll 事件
#include <vector>
// 管理 fd -> Channel 的映射关系
#include <map>
// 时间戳(poll 返回当前时间)
#include "timestamp.h"

namespace net
{
    class Channel;
    class EventLoop;
    class Timestamp;

    class Poller
    {
    public:
        typedef std::vector<Channel *> ChannelList;
        // 构造:绑定一个 EventLoop(一个 Poller 属于一个 EventLoop)
        Poller(EventLoop *loop) : _ownerLoop(loop) {}
        virtual ~Poller() = default;
        // 获取就绪channel数组
        virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
        // 新增或修改或移除channel的事件监控
        virtual void updateChannel(Channel *channel) = 0;

        // 移除channel的事件监控以及channel管理
        virtual void removeChannel(Channel *channel) = 0;
        // 判断channel是否在管理中
        virtual bool hasChannel(Channel *channel) const;
        // 创建默认的Poller对象
        static Poller *newDefaultPoller(EventLoop *loop);

    protected:
        // 管理:fd -> Channel* 的映射表
        // 作用:epoll_wait 返回 fd 后,快速找到对应的 Channel
        typedef std::map<int, Channel *> ChannelMap;
        ChannelMap _channels;

    private:
        //一个线程 = 一个 EventLoop = 一个 Poller
        EventLoop *_ownerLoop;
    };

    // Channel的监控状态
    namespace
    {
        const int kNew = -1;    // channel是新建的(未加入epoll)
        const int kAdded = 1;   // channel已添加监控
        const int kDeleted = 2; // channel被移除了监控,但还在_channels(map)中
    }
    namespace
    {
        static const int kNoneEvent = 0;

        static const int kReadEvent = EPOLLIN | EPOLLPRI;
        static const int kWriteEvent = EPOLLOUT;
    }

    // EPollPoller:Poller 的 epoll 实现(Linux 高性能核心)
    class EPollPoller : public Poller
    {
    public:
        EPollPoller(EventLoop *loop);
        ~EPollPoller() override;
        // 获取就绪channel列表,⽐较特殊的是这⾥⽤了动态数组,如果满载就扩容
        Timestamp poll(int timeoutMs, ChannelList *activeChannels) override;
        // 如果channel状态是 kNew || kDeleted 就添加监控,如果是kNew 还要添加管理
        // 添加完毕后,将channel状态设置为 kAdded
        // 如果是 kAdded 还更新,监控事件是0就移除监控,并置位kDeleted,否则就是更新事件
        void updateChannel(Channel *channel) override;
        // 从管理中移除channel,并移除channel的事件监控,置位channel状态为kNew
        void removeChannel(Channel *channel) override;

    private:
        // 将操作宏转换为字符串,⽤于⽇志输出
        // const char *EPollPoller::operationToString(int op);
        // 从EventList中获取就绪事件的channel,设置就绪事件,加⼊activeChannels中
        void fillActiveChannels(int numEvents, ChannelList *activeChannels)
            const;
        // 实际的epoll操作接⼝
        void update(int operation, Channel *channel);

    private:
        static const int kInitEventListSize = 16;
        // 存储 epoll_wait 返回的事件列表
        typedef std::vector<struct epoll_event> EventList;
        int _epollfd;  //文件描述符
        EventList _events;  // 存放就绪事件(动态数组,自动扩容)
    };

}

源程序:


#include "poller.h"
#include "details.h"
#include <errno.h> //错误码
#include <cstring> //strerror
#include <sys/eventfd.h>
#include <cassert>
#include <unistd.h> //close
#include "timestamp.h"
#include "channel.h"

namespace net
{
    // epoll_wait 默认超时时间 1000ms
    const int InitEpollTimeout = 1000;
    int createEpoll()
    {
        // 创建eventfd对象,用于事件唤醒
        // EFD_CLOEXEC:fork 子进程时自动关闭 fd,防止 fd 泄漏
        int fd = epoll_create1(EPOLL_CLOEXEC);
        if (fd < 0)
        {
            LOG_FATAL("eventfd 失败: %s", strerror(errno));
        }
        return fd;
    }

    // 判断channel是否在管理中
    bool Poller::hasChannel(Channel *channel) const
    {
        int fd = channel->fd();
        if (_channels.find(fd) != _channels.end())
        {
            return true;
        }
        return false;
    }
    // 创建默认的Poller对象(就是 EpollPoller)
    Poller *Poller::newDefaultPoller(EventLoop *loop)
    {
        return new EPollPoller(loop);
    }

    // 初始化成员
    EPollPoller::EPollPoller(EventLoop *loop)
        : Poller(loop),
          _epollfd(createEpoll()),
          _events(kInitEventListSize)
    {
    }
    // 关闭epoll
    EPollPoller::~EPollPoller()
    {
        ::close(_epollfd);
    }
    // 获取就绪channel列表,⽐较特殊的是这⾥⽤了动态数组,如果满载就扩容
    Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
    {
        Timestamp now = Timestamp::now();
        // 阻塞等待事件
        // &_events[0]:数组首地址
        // _events.size():最大监听数量
        // InitEpollTimeout:超时时间
        // 内核将就绪 fd 事件拷贝到你给的数组里,返回就绪个数。
        int ret = epoll_wait(_epollfd, &_events[0], _events.size(), InitEpollTimeout);
        if (ret < 0)
        {
            // EINTR:被信号中断,正常现象,不报错
            if (errno == EINTR)
            {
                LOG_DEBUG("epoll 被信号中断");
                return now;
            }
            LOG_ERROR("epoll wait 错误:%s", strerror(errno));
            return now;
        }
        else if (ret == 0)
        {
            // LOG_DEBUG("epoll wait 超时");
            //  超时,无事发生
            return now;
        }
        // 有事件到来:把 epoll 事件 → 填充为 Channel 列表
        fillActiveChannels(ret, activeChannels);
        // 扩容:如果事件数量达到数组上限,自动扩容 2 倍
        if (ret == _events.size())
        {
            _events.resize(_events.size() * 2);
        }
        return now;
    }
    // 如果channel状态是 kNew || kDeleted 就添加监控,如果是kNew 还要添加管理
    // 添加完毕后,将channel状态设置为 kAdded
    // 如果是 kAdded 还更新,监控事件是0就移除监控,并置位kDeleted,否则就是更新事件
    void EPollPoller::updateChannel(Channel *channel)
    {
        // 针对channel的index状态进行不同处理
        // kNew,kDeleted 表示channel 描述符没有被监控
        // kAdded 表示监控中
        if (channel->index() == kNew || channel->index() == kDeleted)
        {
            int fd = channel->fd();
            if (channel->index() == kNew)
            {
                assert(_channels.find(fd) == _channels.end());
                _channels[channel->fd()] = channel;
            }
            else
            {
                assert(_channels.find(fd) != _channels.end());
                assert(_channels[fd] == channel);
            }
            update(EPOLL_CTL_ADD, channel);
            channel->set_index(kAdded);
        }
        else
        {
            if (channel->events() == kNoneEvent)
            {
                update(EPOLL_CTL_DEL, channel);
                channel->set_index(kDeleted);
            }
            else
            {
                update(EPOLL_CTL_MOD, channel);
            }
        }
    }
    // 从管理中移除channel,并移除channel的事件监控,置位channel状态为kNew
    void EPollPoller::removeChannel(Channel *channel)
    {
        int fd = channel->fd();
        assert(_channels.find(fd) != _channels.end());
        assert(_channels[fd] == channel);
        if (channel->index() == kAdded)
        {
            update(EPOLL_CTL_DEL, channel);
        }

        // 移除管理(从 map 中删除)
        _channels.erase(channel->fd());
        channel->set_index(kNew);
    }

    // 从EventList中获取就绪事件的channel,设置就绪事件,加⼊activeChannels中
    void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
    {
        for (int i = 0; i < numEvents; ++i)
        {
            // 从 epoll_event 中取出 Channel*
            Channel *channel = (Channel *)_events[i].data.ptr;
            // 设置实际就绪的事件
            // _events[i].events 是内核告诉我们的:这个 fd 到底发生了什么事件
            // 比如:可读(EPOLLIN)、可写(EPOLLOUT)、错误(EPOLLERR)
            // 把它设置给 Channel,让 Channel 知道自己触发了什么事件
            channel->setRevents(_events[i].events);
            // 加入活跃列表,返回给 EventLoop
            // 最终 EventLoop.loop() 会遍历它,调用 handleEvent()
            activeChannels->push_back(channel);
        }
    }
    // 实际的epoll操作接⼝
    // 真正执行 epoll_ctl 系统调用:添加/修改/删除 fd 的监听
    // operation: EPOLL_CTL_ADD / EPOLL_CTL_MOD / EPOLL_CTL_DEL
    void EPollPoller::update(int operation, Channel *channel)
    {
        int fd = channel->fd();
        int events = channel->events();
        // 定义一个 epoll_event 结构体,传给内核
        struct epoll_event ev;
        // 要监听的事件
        ev.events = events;
        // 附带 Channel 指针
        // 把 channel 的地址存进 data.ptr
        ev.data.ptr = channel;
        int ret = epoll_ctl(_epollfd, operation, fd, &ev);
        if (ret < 0)
        {
            LOG_ERROR("epoll_ctl 错误:%s", strerror(errno));
        }
    }
}

4. EventLoopThread —— 线程 + 事件循环

头文件:

// EventLoopThread:封装一个线程 + 一个 EventLoop
    class EventLoopThread
    {
    public:
        EventLoopThread();
        ~EventLoopThread();
        // 启动线程,返回内部创建的 loop
        EventLoop *startLoop(); // 获取事件循环对象的指针
    private:
        void threadFunc(); // 线程的入口函数
    private:
        EventLoop *_loop;              // 事件循环对象的指针
        std::mutex _mutex;             // 互斥锁
        std::condition_variable _cond; // 条件变量
        std::thread _thread;           // 线程
    };

源程序:

EventLoopThread::EventLoopThread() : _loop(NULL),
                                         _thread(std::thread(&EventLoopThread::threadFunc, this)) {}
    EventLoopThread::~EventLoopThread()
    {
        // 1.退出事件循环
        _loop->quit();
        // 2.等待线程退出
        _thread.join();
    }
    // 获取事件循环对象的指针
    EventLoop *EventLoopThread::startLoop()
    {
        // 当获取loop对象指针的时候,必须保证_loop已经赋值完毕
        std::unique_lock<std::mutex> lock(_mutex);
        _cond.wait(lock, [this]()
                   { return _loop != NULL; }); // 不为空结束
        return _loop;
    }
    // 线程的入口函数
    void EventLoopThread::threadFunc()
    {
        // 实例化一个局部的eventloop对象
        EventLoop loop;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _loop = &loop;
            _cond.notify_all(); // 唤醒所有的阻塞等待
        }
        loop.loop(); // 开始事件循环
        _loop = NULL;
    }

每个线程独立运行一个 EventLoop。


5. EventLoopThreadPool —— 主从 Reactor 线程池

这是高并发关键!

头文件:

// EventLoopThreadPool:线程池,管理 N 个 EventLoopThread
    // 实现主从 Reactor 多线程模型
    class EventLoopTreadPool
    {
    public:
        EventLoopTreadPool(EventLoop *baseloop);
        ~EventLoopTreadPool();
        void setThreadNum(int count); // 设置线程数
        void start();                 // 启动所有线程
        EventLoop *getNextLoop();     // 轮询获取下一个 loop(负载均衡)

    private:
        int _thread_num;      // 线程数量
        int _next_idx;        // 负载均衡索引
        EventLoop *_baseloop; // 使用者定义的EventLoop对象
        std::vector<std::unique_ptr<EventLoopThread>> _threads;
        std::vector<EventLoop *> _loops;
    };

源程序:

EventLoopTreadPool::EventLoopTreadPool(EventLoop *baseloop)
        : _thread_num(0),
          _next_idx(0),
          _baseloop(baseloop) {}
    EventLoopTreadPool::~EventLoopTreadPool() {}
    void EventLoopTreadPool::setThreadNum(int count)
    {
        _thread_num = count;
    }
    void EventLoopTreadPool::start()
    {
        _baseloop->assertInLoopThread();
        // 根据数量设置,创建指定数量的Loopthread
        // 根据创建的Loopthread,初始化添加loop和thread成员
        for (int i = 0; i < _thread_num; i++)
        {
            EventLoopThread *loopThread = new EventLoopThread();
            _threads.push_back(std::unique_ptr<EventLoopThread>(loopThread));
            _loops.push_back(loopThread->startLoop());
        }
    }
    // 轮询获取下一个 loop(负载均衡)
    EventLoop *EventLoopTreadPool::getNextLoop()
    {
        // 必须在【主线程(baseloop 线程)】调用
        // 因为这是 main reactor 负责分配连接
        _baseloop->assertInLoopThread();

        // 如果线程池数量为0(单线程模式)
        // 直接返回主 loop,所有事情都在主线程处理
        if (_thread_num == 0)
        {
            return _baseloop;
        }
        // RR轮转负载均衡策略
        //  RR 轮询算法:按顺序取下一个 EventLoop
        // 第一次取 _loops[0]
        // 第二次取 _loops[1]
        // 第三次取 _loops[2]...
        EventLoop *retval = _loops[_next_idx];
        // 索引 +1,下次取下一个
        _next_idx++;
        // 如果索引到达末尾,重置为 0,循环往复
        if (_next_idx == _loops.size())
        {
            _next_idx = 0;
        }
        // 返回选中的 EventLoop
        return retval;
    }

作用:

  • Main Reactor 接收新连接
  • 调用 getNextLoop () 分配给 Sub Reactor
  • 实现多核 CPU 负载均衡

四、Reactor 框架执行流程(最清晰图解)

1. 服务器启动

  • 启动 Main Reactor(baseloop)
  • 启动 Sub Reactor 线程池
  • 监听套接字 Channel 注册到 Main Reactor

2. 新连接到来

  1. epoll 通知 accept 事件
  2. Acceptor 接受新连接
  3. 分配一个 Sub Reactor
  4. 将客户端 fd 打包成 Channel 加入 Sub Reactor

3. 消息到达

  1. Sub Reactor 监听到 EPOLLIN 事件
  2. 调用 Channel 读回调
  3. 解析报文 → 执行业务 → 回复消息

4. 连接关闭

  1. EPOLLHUP / EPOLLERR
  2. 调用关闭回调
  3. 移除 Channel,关闭 fd

五、框架超高亮点

1. 线程隔离(One Loop Per Thread)

  • 一个 EventLoop 只在一个线程运行
  • 无锁、无竞争、高性能

2. 跨线程任务调用(runInLoop /queueInLoop)

// 在loop线程中则直接执⾏任务,否则就添加到任务池
    void EventLoop::runInLoop(Functor cb)
    {
        if (isInLoopThread())
        {
            cb(); // 当前线程 → 直接执行
        }
        else
        {
            queueInLoop(cb); // 其他线程 → 放入队列
        }
    }
    // 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
    void EventLoop::queueInLoop(Functor cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _pendingFunctors.push_back(cb);
        }

        // 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
        if (_callingPendingFunctors || !isInLoopThread())
        {
            wakeup(); // 唤醒事件监控
        }
    }

3. eventfd 唤醒机制

  • 用 eventfd 唤醒 epoll_wait
  • 实现高效跨线程异步调用

4. 主从 Reactor 多线程架构

  • 业界标准高并发架构
  • Nginx / Netty /muduo 同款

5. 高精度定时器(runAt /runAfter/runEvery)

_timerQueue->addTimer(cb, when, interval);

六、Reactor 模型面试 10 问(背会直接拿分)

  1. 什么是 Reactor 模型?事件驱动,epoll 监听,回调分发。

  2. Reactor 三大组件?EventLoop、Channel、Poller。

  3. 为什么一个线程一个 EventLoop?无锁、线程安全、减少切换。

  4. Channel 作用?包装 fd,管理事件与回调。

  5. EPollPoller 作用?封装 epoll,等待事件返回。

  6. EventLoopThreadPool 作用?主从 Reactor,多核负载均衡。

  7. runInLoop 作用?跨线程安全执行任务。

  8. wakeupFd 作用?唤醒 epoll,处理异步任务。

  9. 水平触发 vs 边缘触发?水平触发稳定,你用的是水平触发。

  10. 主从 Reactor 优势?高并发、多核利用、高稳定、高吞吐。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐