Reactor 模型高性能网络框架全解析:从原理到工程级代码实现
一、Reactor 模型到底是什么?(一句话讲透)
Reactor = 事件驱动 + IO 多路复用 + 回调分发
一句话总结:一个循环等着事件来,事件来了自动分发到对应处理函数。
核心三要素:
- 事件循环(EventLoop):负责跑循环、等事件、调回调
- IO 多路复用(epoll):负责监听海量 fd
- 事件分发器(Channel):每个 fd 对应一个 Channel,负责事件处理
二、Reactor 框架整体架构(标准主从 Reactor)
1. Main Reactor(主反应堆)
- 只有一个 EventLoop
- 只做 accept 新连接
- 把新连接分发给 Sub Reactor
2. Sub Reactor(从反应堆)
- 多个 EventLoop,每个一个线程
- 负责已连接套接字的 read/write/close
- 实现多核负载均衡
3. ThreadPool(业务线程池)
- 处理耗时业务(数据库、计算)
4. TimerQueue(定时器队列)
- 统一管理定时任务、超时断开
三、核心组件逐行精讲
1. EventLoop —— Reactor 的心脏
一个线程 = 一个 EventLoop = 一个 epoll
头文件:
#pragma once
#include <vector>
#include <atomic>
#include <unistd.h>
#include <thread>
#include <condition_variable>
#include "timestamp.h"
#include <memory>
#include "poller.h"
#include <mutex>
#include <functional>
#include "timer.h"
#include <signal.h>
namespace net
{
// 写已经关闭的 socket 时,系统会发 SIGPIPE 默认杀死进程
// 服务器必须忽略!
class IgnoreSigPipe
{
public:
IgnoreSigPipe()
{
signal(SIGPIPE, SIG_IGN);
}
};
static IgnoreSigPipe ignore_pipe;
// epoll_wait 超时 1 秒
const int InitEpollTimeout = 1000;
class Channel;
// class TimerQueue;
// 事件循环类:事件监控+任务池+定时器
// 一个线程 = 一个 EventLoop
class EventLoop
{
// 任务回调类型
typedef std::function<void()> Functor;
public:
// 初始化成成员
EventLoop();
~EventLoop();
// 循环事件监控,获取就绪channel列表,逐个执⾏handleEvent即可
void loop();
// 置位_quit, 如果当前不在loop线程中,就唤醒⼀下loop
void quit();
// 在loop线程中则直接执⾏任务,否则就添加到任务池
void runInLoop(Functor cb);
// 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
void queueInLoop(Functor cb);
// 唤醒loop,其实就是给_eventfd写⼊个数据即可(要求8字节),与handleRead对应
void wakeup();
// 通过poller添加监控即可
void updateChannel(Channel *channel);
// 通过poller移除监控即可
void removeChannel(Channel *channel);
// 通过poller返回管理结果即可
bool hasChannel(Channel *channel);
// 判断不再当前loop的线程中就抛异常就⾏
void assertInLoopThread();
// 判断当前是否处于loop所在线程
bool isInLoopThread() const;
// 提供三种不同的接口添加定时任务:定点,延迟,循环
// 通过timer添加指定时间的定时任务
TimerId runAt(Timestamp time, TimerCallback cb);
// 通过timer添加延迟N秒的定时任务
TimerId runAfter(double delay_sec, TimerCallback cb);
// 通过timer添加循环定时任务
TimerId runEvery(double interval_sec, TimerCallback cb);
private:
// 处理_eventfd的读事件,读取_eventfd中的数据
void handleRead();
// 针对任务池中的任务进⾏调⽤执⾏即可
void doPendingFunctors();
private:
typedef std::vector<Channel *> ChannelList;
bool _looping; // 当前是否处于循环监控状态中
std::atomic<bool> _quit; // 循环退出控制标志位:事件循环的循环条件
bool _eventHandling; // 当前是否处理正在处理事件中
bool _callingPendingFunctors; // 当前是否处于正在处理任务池中的任务中
// 当前eventloop所在的线程ID,
pid_t _threadId; // 线程ID,pthread_self,pid_t getid()==getpid()
Timestamp _pollReturnTime; // poll监控返回时的时间
std::unique_ptr<Poller> _poller; // poller监控对象
std::unique_ptr<TimerQueue> _timerQueue; // 定时器对象,⼀定要在poller之后
int _wakeupFd; // 事件监控唤醒描述符
std::unique_ptr<Channel> _wakeupChannel; // 循环唤醒描述符对应的channel对象
ChannelList _activeChannels; // 当前活跃的所有channel列表
Channel *_currentActiveChannel; // 从_activeChannels获取当前正在处理的channel
std::mutex _mutex; // 任务池的保护锁
std::vector<Functor> _pendingFunctors; // 任务池
};
// EventLoopThread:封装一个线程 + 一个 EventLoop
class EventLoopThread
{
...
};
// EventLoopThreadPool:线程池,管理 N 个 EventLoopThread
// 实现主从 Reactor 多线程模型
class EventLoopTreadPool
{
...
};
}
源程序:
#include "eventloop.h"
#include "poller.h"
#include "channel.h"
#include <cassert>
#include <sys/eventfd.h>
#include "details.h"
#include <cassert>
namespace net
{
// 创建 eventfd:用于跨线程唤醒 epoll_wait
int createEventFd()
{
// 创建eventfd对象,用于事件唤醒
// EFD_CLOEXEC:fork后在子进程中关闭描述符
// EFD_NONBLOCK:非阻塞
int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if (fd < 0)
{
LOG_FATAL("eventfd 失败: %s", strerror(errno));
}
return fd;
}
// 唤醒:往 eventfd 写 8 字节
void writeEventFd(int fd)
{
uint64_t val = 1;
ssize_t ret = ::write(fd, &val, sizeof(val));
if (ret < 0)
{
LOG_ERROR("event fd write error: %s", strerror(errno));
}
}
// 读走数据,清空 eventfd 可读状态
void readEventFd(int fd)
{
uint64_t val = 1;
ssize_t ret = ::read(fd, &val, sizeof(val));
if (ret < 0)
{
LOG_ERROR("event fd write error: %s", strerror(errno));
}
}
// 初始化成成员
EventLoop::EventLoop()
: _looping(false),
_quit(false),
_eventHandling(false),
_callingPendingFunctors(false),
_threadId(::gettid()), // 决定了eventloop对象必须在所在线程中实例化
_poller(Poller::newDefaultPoller(this)),
_timerQueue(new TimerQueue(this)),
_wakeupFd(createEventFd()),
_wakeupChannel(new Channel(this, _wakeupFd))
{
// 为_wakeupFd触发的可读事件设置回调处理函数
_wakeupChannel->setReadCallback(std::bind(&EventLoop::handleRead, this));
// 将_wakeupFd挂到Poller中进行监控
_wakeupChannel->enableReading();
}
EventLoop::~EventLoop()
{
assert(_looping == false);
if (_poller->hasChannel(_wakeupChannel.get()))
{
_wakeupChannel->disableAll();
_wakeupChannel->remove();
}
::close(_wakeupFd);
}
// 循环事件监控,获取就绪channel列表,逐个执⾏handleEvent即可
// 开始事件循环:1.开始描述符事件监控,2.处理任务池的任务
void EventLoop::loop()
{
_looping = true;
_quit = false;
while (!_quit)
{
// 获取就绪描述符channel,进行事件处理
_activeChannels.clear();
Timestamp now = _poller->poll(InitEpollTimeout, &_activeChannels);
_currentActiveChannel = NULL;
_eventHandling = true;
for (int i = 0; i < _activeChannels.size(); ++i)
{
_currentActiveChannel = _activeChannels[i]; // 拿到就绪channel
_currentActiveChannel->handleEvent(now);
}
_eventHandling = false;
// 处理任务池中的任务
doPendingFunctors();
}
_looping = false;
}
// 置位_quit, 如果当前不在loop线程中,就唤醒⼀下loop
void EventLoop::quit()
{
_quit = true;
// 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
if (!isInLoopThread())
{
wakeup(); // 唤醒事件监控
}
}
// 在loop线程中则直接执⾏任务,否则就添加到任务池
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb(); // 当前线程 → 直接执行
}
else
{
queueInLoop(cb); // 其他线程 → 放入队列
}
}
// 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(_mutex);
_pendingFunctors.push_back(cb);
}
// 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
if (_callingPendingFunctors || !isInLoopThread())
{
wakeup(); // 唤醒事件监控
}
}
// 唤醒loop,其实就是给_eventfd写⼊个数据即可(要求8字节),与handleRead对应
void EventLoop::wakeup()
{
writeEventFd(_wakeupFd);
}
// 通过poller添加监控即可
void EventLoop::updateChannel(Channel *channel)
{
assertInLoopThread();
_poller->updateChannel(channel);
}
// 通过poller移除监控即可
void EventLoop::removeChannel(Channel *channel)
{
assertInLoopThread();
_poller->removeChannel(channel);
}
// 通过poller返回管理结果即可
bool EventLoop::hasChannel(Channel *channel)
{
assertInLoopThread();
return _poller->hasChannel(channel);
}
// 判断不再当前loop的线程中就抛异常就⾏
void EventLoop::assertInLoopThread()
{
assert(_threadId == ::gettid());
}
// 判断当前是否处于loop所在线程
bool EventLoop::isInLoopThread() const
{
if (_threadId == ::gettid())
{
return true;
}
return false;
}
// 处理_eventfd的读事件,读取_eventfd中的数据
void EventLoop::handleRead()
{
readEventFd(_wakeupFd);
}
// 针对任务池中的任务进⾏调⽤执⾏即可
void EventLoop::doPendingFunctors()
{
_callingPendingFunctors = true;
// 交换,减少锁范围
std::vector<Functor> functors;
{
std::unique_lock<std::mutex> lock(_mutex);
functors.swap(_pendingFunctors);
}
// 依次执行任务
for (auto &functor : functors)
{
functor();
}
_callingPendingFunctors = false;
}
// 通过timer添加指定时间的定时任务
// 在构造函数中初始化
TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
return _timerQueue->addTimer(std::move(cb), time, 0);
}
// 通过timer添加延迟N秒的定时任务
TimerId EventLoop::runAfter(double delay_sec, TimerCallback cb)
{
// 根据当前系统时间,计算得到超时时间点
Timestamp when = net::addTime(Timestamp::now(), delay_sec);
// 添加定时任务
return _timerQueue->addTimer(std::move(cb), when, 0);
}
// 通过timer添加循环定时任务
TimerId EventLoop::runEvery(double interval_sec, TimerCallback cb)
{
Timestamp when = net::addTime(Timestamp::now(), interval_sec);
// 添加定时任务
return _timerQueue->addTimer(std::move(cb), when, interval_sec);
}
}
核心函数:loop () —— 真正的 Reactor 循环
这就是 Reactor 模型的全部灵魂!
2. Channel —— 事件分发器(fd 的管家)
一个 fd 永远只对应一个 Channel
头文件:
#pragma once
#include <functional>
#include <map>
#include <memory>
#include "poller.h"
namespace net
{
// 一个 Channel 永远只属于 1 个 EventLoop
class Channel
{
public:
// 定义回调函数类型
typedef std::function<void()> EventCallback;// 普通事件回调
// 读事件回调(带时间戳)
typedef std::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop *loop, int fd);
~Channel();
// 设置事件回调处理函数
// 启动当前channel中描述符的读事件监控events |= kReadEvent;//修改当前要监控的事件 // poller->updatechannel(this);
void setReadCallback(ReadEventCallback cb) { _readCallback = std::move(cb); }
void setWriteCallback(EventCallback cb) { _writeCallback = std::move(cb); }
void setCloseCallback(EventCallback cb) { _closeCallback = std::move(cb); }
void setErrorCallback(EventCallback cb) { _errorCallback = std::move(cb); }
// 启动当前channel中描述符的读事件监控
void enableReading()
{
_events |= kReadEvent; // 修改当前要监控的事件
update(); // poller->updatechannel(this); epoll_ctl(ADD, fd, events);
}
void disableReading()
{
_events &= ~kReadEvent; // 将kReadEvent位置0,其他位不变update();
update();
}
void enableWriting()
{
_events |= kWriteEvent;
update();
}
void disableWriting()
{
_events &= ~kWriteEvent;
update();
}
// 取消所有监听
void disableAll()
{
_events = kNoneEvent;
update();
}
// 查询当前是否在监听 读/写 事件
bool isWriting() const { return _events & kWriteEvent; }
bool isReading() const { return _events & kReadEvent; }
// 设置就绪状态
int index() { return _index; }
void set_index(int idx) { _index = idx; }
// 获取当前channel所在的事件循环
EventLoop *ownerLoop() { return _loop; }
// 设置channel的外部管理对象,通过_tie成员观察外部对象是否已经被释放
void tie(const std::shared_ptr<void> &obj)
{
_tie = obj;
_tied = true;
}
// 获取监控的描述符
int fd() const { return _fd; }
// 获取监控中的事件
int events() const { return _events; }
// 设置描述符的就绪事件,在Poller中设置
void setRevents(int revt) { _revents = revt; }
// 判断当前描述符是否还有监控的事件
bool isNoneEvent() const { return _events == kNoneEvent; }
// 根据tie观察管理⾃⼰的Connection对象是否还存在,不存在就不处理事件了
// Acceptor和TimerQueue本⾝就不设置这个,所以直接执⾏handleEventWithGuard
// 总体的事件处理:根据不同的实际就绪的事件,调用不同的回调函数进行事件处理
// EventLoop 收到就绪事件后,调用这个函数处理
void handleEvent(Timestamp receiveTime)
{
if (_tied)
{
// 设置了观察者对象,主要针对Connection
// 如果绑定了对象(Connection),先判断对象是否还活着
if (_tie.lock())
{
// 通过观察者对象观察外部对象是否被释放
handleEventWithGuard(receiveTime);
}
}
else
{
// 主要针对Aceeptor,TimerQueue
handleEventWithGuard(receiveTime);
}
}
// 通过_loop移除监控和管理即可
void remove();
private:
// 1.EPOLLHUP且不可读则直接close; 2. EPOLLERR 就直接error
// 3.EPOLLIN | EPOLLPRI | EPOLLRDHUP(半关闭)则调⽤read,
//4. EPOLLOUT则write
// 根据 _revents(实际发生的事件)调用对应的回调
void handleEventWithGuard(Timestamp receiveTime)
{
// 针对不同事件进行不同的处理
// 1.若连接挂断,且没有出发可读事件:直接handleClose
_eventHandling = true;
if (_revents & EPOLLHUP && !(_revents & EPOLLIN))
{
if (_closeCallback)
_closeCallback();
}
// 2.触发了可读事件:EPoLLIN|EPOLLPRI,调用handleRead
if (_revents & (EPOLLIN | EPOLLPRI) || _revents & EPOLLRDHUP)
{
if (_readCallback)
_readCallback(receiveTime);
}
// 3.触发了可写事件 :EPOLLOUT,调用handleWrite
if (_revents & EPOLLOUT)
{
if (_writeCallback)
_writeCallback();
}
// 4.触发了错误事件:EPOLLERR 调用handleError
if (_revents & EPOLLERR)
{
if (_errorCallback)
_errorCallback();
}
_eventHandling = false;
}
// 通过_loop更新监控事件即可
void update(); // poller->uppdateChannel(this)
private:
EventLoop *_loop; // 当前channel挂在哪个loop中进⾏事件监控
int _fd; // 当前channel对应的描述符
int _events; // 当前channel所监控的事件
int _revents; // 当前channel所触发的事件
int _index; // 当前channel的状态
// 观察者模式的另类使用(是否绑定了外部对象)
bool _tied;
// 观察者模式的⼀种另类使⽤,⽤于观察指定的对象存在性
std::weak_ptr<void> _tie;
bool _eventHandling; // 当前channel是否处于事件处理中
bool _addedToLoop; // 当前channel对象是否已经被添加到loop中
ReadEventCallback _readCallback; // 读事件的处理回调函数
EventCallback _writeCallback;
EventCallback _closeCallback;
EventCallback _errorCallback;
};
}
源程序:
#include "channel.h"
#include "eventloop.h"
#include "details.h"
#include <cassert>
namespace net
{
Channel::Channel(EventLoop *loop, int fd)
: _loop(loop),
_fd(fd),
_events(kNoneEvent),
_revents(kNoneEvent),
_index(kNew),
_tied(false),
_eventHandling(false),
_addedToLoop(false)
{
//LOG_DEBUG("NEW Channel: %lu", (uint64_t)this);
}
Channel::~Channel()
{
assert(_eventHandling == false);
assert(_addedToLoop == false);
//LOG_DEBUG("DELETE Channel:%lu", (uint64_t)this);
}
void Channel::remove()
{
_loop->removeChannel(this);
_addedToLoop = false;
}
// 通过_loop更新监控事件即可
void Channel::update() // poller->uppdateChannel(this)
{
_addedToLoop = true;
_loop->updateChannel(this);
}
}
核心:handleEvent () —— 事件处理分发
这就是 Reactor 的事件分发机制。
3. Poller / EPollPoller —— epoll 封装
封装了 Linux 最高效的 IO 多路复用:epoll
头文件:
#pragma once
// epoll 系统调用所需头文件
#include <sys/epoll.h>
// 存储活跃 Channel、epoll 事件
#include <vector>
// 管理 fd -> Channel 的映射关系
#include <map>
// 时间戳(poll 返回当前时间)
#include "timestamp.h"
namespace net
{
class Channel;
class EventLoop;
class Timestamp;
class Poller
{
public:
typedef std::vector<Channel *> ChannelList;
// 构造:绑定一个 EventLoop(一个 Poller 属于一个 EventLoop)
Poller(EventLoop *loop) : _ownerLoop(loop) {}
virtual ~Poller() = default;
// 获取就绪channel数组
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
// 新增或修改或移除channel的事件监控
virtual void updateChannel(Channel *channel) = 0;
// 移除channel的事件监控以及channel管理
virtual void removeChannel(Channel *channel) = 0;
// 判断channel是否在管理中
virtual bool hasChannel(Channel *channel) const;
// 创建默认的Poller对象
static Poller *newDefaultPoller(EventLoop *loop);
protected:
// 管理:fd -> Channel* 的映射表
// 作用:epoll_wait 返回 fd 后,快速找到对应的 Channel
typedef std::map<int, Channel *> ChannelMap;
ChannelMap _channels;
private:
//一个线程 = 一个 EventLoop = 一个 Poller
EventLoop *_ownerLoop;
};
// Channel的监控状态
namespace
{
const int kNew = -1; // channel是新建的(未加入epoll)
const int kAdded = 1; // channel已添加监控
const int kDeleted = 2; // channel被移除了监控,但还在_channels(map)中
}
namespace
{
static const int kNoneEvent = 0;
static const int kReadEvent = EPOLLIN | EPOLLPRI;
static const int kWriteEvent = EPOLLOUT;
}
// EPollPoller:Poller 的 epoll 实现(Linux 高性能核心)
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop *loop);
~EPollPoller() override;
// 获取就绪channel列表,⽐较特殊的是这⾥⽤了动态数组,如果满载就扩容
Timestamp poll(int timeoutMs, ChannelList *activeChannels) override;
// 如果channel状态是 kNew || kDeleted 就添加监控,如果是kNew 还要添加管理
// 添加完毕后,将channel状态设置为 kAdded
// 如果是 kAdded 还更新,监控事件是0就移除监控,并置位kDeleted,否则就是更新事件
void updateChannel(Channel *channel) override;
// 从管理中移除channel,并移除channel的事件监控,置位channel状态为kNew
void removeChannel(Channel *channel) override;
private:
// 将操作宏转换为字符串,⽤于⽇志输出
// const char *EPollPoller::operationToString(int op);
// 从EventList中获取就绪事件的channel,设置就绪事件,加⼊activeChannels中
void fillActiveChannels(int numEvents, ChannelList *activeChannels)
const;
// 实际的epoll操作接⼝
void update(int operation, Channel *channel);
private:
static const int kInitEventListSize = 16;
// 存储 epoll_wait 返回的事件列表
typedef std::vector<struct epoll_event> EventList;
int _epollfd; //文件描述符
EventList _events; // 存放就绪事件(动态数组,自动扩容)
};
}
源程序:
#include "poller.h"
#include "details.h"
#include <errno.h> //错误码
#include <cstring> //strerror
#include <sys/eventfd.h>
#include <cassert>
#include <unistd.h> //close
#include "timestamp.h"
#include "channel.h"
namespace net
{
// epoll_wait 默认超时时间 1000ms
const int InitEpollTimeout = 1000;
int createEpoll()
{
// 创建eventfd对象,用于事件唤醒
// EFD_CLOEXEC:fork 子进程时自动关闭 fd,防止 fd 泄漏
int fd = epoll_create1(EPOLL_CLOEXEC);
if (fd < 0)
{
LOG_FATAL("eventfd 失败: %s", strerror(errno));
}
return fd;
}
// 判断channel是否在管理中
bool Poller::hasChannel(Channel *channel) const
{
int fd = channel->fd();
if (_channels.find(fd) != _channels.end())
{
return true;
}
return false;
}
// 创建默认的Poller对象(就是 EpollPoller)
Poller *Poller::newDefaultPoller(EventLoop *loop)
{
return new EPollPoller(loop);
}
// 初始化成员
EPollPoller::EPollPoller(EventLoop *loop)
: Poller(loop),
_epollfd(createEpoll()),
_events(kInitEventListSize)
{
}
// 关闭epoll
EPollPoller::~EPollPoller()
{
::close(_epollfd);
}
// 获取就绪channel列表,⽐较特殊的是这⾥⽤了动态数组,如果满载就扩容
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
Timestamp now = Timestamp::now();
// 阻塞等待事件
// &_events[0]:数组首地址
// _events.size():最大监听数量
// InitEpollTimeout:超时时间
// 内核将就绪 fd 事件拷贝到你给的数组里,返回就绪个数。
int ret = epoll_wait(_epollfd, &_events[0], _events.size(), InitEpollTimeout);
if (ret < 0)
{
// EINTR:被信号中断,正常现象,不报错
if (errno == EINTR)
{
LOG_DEBUG("epoll 被信号中断");
return now;
}
LOG_ERROR("epoll wait 错误:%s", strerror(errno));
return now;
}
else if (ret == 0)
{
// LOG_DEBUG("epoll wait 超时");
// 超时,无事发生
return now;
}
// 有事件到来:把 epoll 事件 → 填充为 Channel 列表
fillActiveChannels(ret, activeChannels);
// 扩容:如果事件数量达到数组上限,自动扩容 2 倍
if (ret == _events.size())
{
_events.resize(_events.size() * 2);
}
return now;
}
// 如果channel状态是 kNew || kDeleted 就添加监控,如果是kNew 还要添加管理
// 添加完毕后,将channel状态设置为 kAdded
// 如果是 kAdded 还更新,监控事件是0就移除监控,并置位kDeleted,否则就是更新事件
void EPollPoller::updateChannel(Channel *channel)
{
// 针对channel的index状态进行不同处理
// kNew,kDeleted 表示channel 描述符没有被监控
// kAdded 表示监控中
if (channel->index() == kNew || channel->index() == kDeleted)
{
int fd = channel->fd();
if (channel->index() == kNew)
{
assert(_channels.find(fd) == _channels.end());
_channels[channel->fd()] = channel;
}
else
{
assert(_channels.find(fd) != _channels.end());
assert(_channels[fd] == channel);
}
update(EPOLL_CTL_ADD, channel);
channel->set_index(kAdded);
}
else
{
if (channel->events() == kNoneEvent)
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
// 从管理中移除channel,并移除channel的事件监控,置位channel状态为kNew
void EPollPoller::removeChannel(Channel *channel)
{
int fd = channel->fd();
assert(_channels.find(fd) != _channels.end());
assert(_channels[fd] == channel);
if (channel->index() == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
// 移除管理(从 map 中删除)
_channels.erase(channel->fd());
channel->set_index(kNew);
}
// 从EventList中获取就绪事件的channel,设置就绪事件,加⼊activeChannels中
void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
{
for (int i = 0; i < numEvents; ++i)
{
// 从 epoll_event 中取出 Channel*
Channel *channel = (Channel *)_events[i].data.ptr;
// 设置实际就绪的事件
// _events[i].events 是内核告诉我们的:这个 fd 到底发生了什么事件
// 比如:可读(EPOLLIN)、可写(EPOLLOUT)、错误(EPOLLERR)
// 把它设置给 Channel,让 Channel 知道自己触发了什么事件
channel->setRevents(_events[i].events);
// 加入活跃列表,返回给 EventLoop
// 最终 EventLoop.loop() 会遍历它,调用 handleEvent()
activeChannels->push_back(channel);
}
}
// 实际的epoll操作接⼝
// 真正执行 epoll_ctl 系统调用:添加/修改/删除 fd 的监听
// operation: EPOLL_CTL_ADD / EPOLL_CTL_MOD / EPOLL_CTL_DEL
void EPollPoller::update(int operation, Channel *channel)
{
int fd = channel->fd();
int events = channel->events();
// 定义一个 epoll_event 结构体,传给内核
struct epoll_event ev;
// 要监听的事件
ev.events = events;
// 附带 Channel 指针
// 把 channel 的地址存进 data.ptr
ev.data.ptr = channel;
int ret = epoll_ctl(_epollfd, operation, fd, &ev);
if (ret < 0)
{
LOG_ERROR("epoll_ctl 错误:%s", strerror(errno));
}
}
}
4. EventLoopThread —— 线程 + 事件循环
头文件:
// EventLoopThread:封装一个线程 + 一个 EventLoop
class EventLoopThread
{
public:
EventLoopThread();
~EventLoopThread();
// 启动线程,返回内部创建的 loop
EventLoop *startLoop(); // 获取事件循环对象的指针
private:
void threadFunc(); // 线程的入口函数
private:
EventLoop *_loop; // 事件循环对象的指针
std::mutex _mutex; // 互斥锁
std::condition_variable _cond; // 条件变量
std::thread _thread; // 线程
};
源程序:
EventLoopThread::EventLoopThread() : _loop(NULL),
_thread(std::thread(&EventLoopThread::threadFunc, this)) {}
EventLoopThread::~EventLoopThread()
{
// 1.退出事件循环
_loop->quit();
// 2.等待线程退出
_thread.join();
}
// 获取事件循环对象的指针
EventLoop *EventLoopThread::startLoop()
{
// 当获取loop对象指针的时候,必须保证_loop已经赋值完毕
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [this]()
{ return _loop != NULL; }); // 不为空结束
return _loop;
}
// 线程的入口函数
void EventLoopThread::threadFunc()
{
// 实例化一个局部的eventloop对象
EventLoop loop;
{
std::unique_lock<std::mutex> lock(_mutex);
_loop = &loop;
_cond.notify_all(); // 唤醒所有的阻塞等待
}
loop.loop(); // 开始事件循环
_loop = NULL;
}
每个线程独立运行一个 EventLoop。
5. EventLoopThreadPool —— 主从 Reactor 线程池
这是高并发关键!
头文件:
// EventLoopThreadPool:线程池,管理 N 个 EventLoopThread
// 实现主从 Reactor 多线程模型
class EventLoopTreadPool
{
public:
EventLoopTreadPool(EventLoop *baseloop);
~EventLoopTreadPool();
void setThreadNum(int count); // 设置线程数
void start(); // 启动所有线程
EventLoop *getNextLoop(); // 轮询获取下一个 loop(负载均衡)
private:
int _thread_num; // 线程数量
int _next_idx; // 负载均衡索引
EventLoop *_baseloop; // 使用者定义的EventLoop对象
std::vector<std::unique_ptr<EventLoopThread>> _threads;
std::vector<EventLoop *> _loops;
};
源程序:
EventLoopTreadPool::EventLoopTreadPool(EventLoop *baseloop)
: _thread_num(0),
_next_idx(0),
_baseloop(baseloop) {}
EventLoopTreadPool::~EventLoopTreadPool() {}
void EventLoopTreadPool::setThreadNum(int count)
{
_thread_num = count;
}
void EventLoopTreadPool::start()
{
_baseloop->assertInLoopThread();
// 根据数量设置,创建指定数量的Loopthread
// 根据创建的Loopthread,初始化添加loop和thread成员
for (int i = 0; i < _thread_num; i++)
{
EventLoopThread *loopThread = new EventLoopThread();
_threads.push_back(std::unique_ptr<EventLoopThread>(loopThread));
_loops.push_back(loopThread->startLoop());
}
}
// 轮询获取下一个 loop(负载均衡)
EventLoop *EventLoopTreadPool::getNextLoop()
{
// 必须在【主线程(baseloop 线程)】调用
// 因为这是 main reactor 负责分配连接
_baseloop->assertInLoopThread();
// 如果线程池数量为0(单线程模式)
// 直接返回主 loop,所有事情都在主线程处理
if (_thread_num == 0)
{
return _baseloop;
}
// RR轮转负载均衡策略
// RR 轮询算法:按顺序取下一个 EventLoop
// 第一次取 _loops[0]
// 第二次取 _loops[1]
// 第三次取 _loops[2]...
EventLoop *retval = _loops[_next_idx];
// 索引 +1,下次取下一个
_next_idx++;
// 如果索引到达末尾,重置为 0,循环往复
if (_next_idx == _loops.size())
{
_next_idx = 0;
}
// 返回选中的 EventLoop
return retval;
}
作用:
- Main Reactor 接收新连接
- 调用 getNextLoop () 分配给 Sub Reactor
- 实现多核 CPU 负载均衡
四、Reactor 框架执行流程(最清晰图解)
1. 服务器启动
- 启动 Main Reactor(baseloop)
- 启动 Sub Reactor 线程池
- 监听套接字 Channel 注册到 Main Reactor
2. 新连接到来
- epoll 通知 accept 事件
- Acceptor 接受新连接
- 分配一个 Sub Reactor
- 将客户端 fd 打包成 Channel 加入 Sub Reactor
3. 消息到达
- Sub Reactor 监听到 EPOLLIN 事件
- 调用 Channel 读回调
- 解析报文 → 执行业务 → 回复消息
4. 连接关闭
- EPOLLHUP / EPOLLERR
- 调用关闭回调
- 移除 Channel,关闭 fd
五、框架超高亮点
1. 线程隔离(One Loop Per Thread)
- 一个 EventLoop 只在一个线程运行
- 无锁、无竞争、高性能
2. 跨线程任务调用(runInLoop /queueInLoop)
// 在loop线程中则直接执⾏任务,否则就添加到任务池
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb(); // 当前线程 → 直接执行
}
else
{
queueInLoop(cb); // 其他线程 → 放入队列
}
}
// 将任务添加到任务池中,如果当前没有在loop线程中,或者当前处于处理任务池任务状态则唤醒
void EventLoop::queueInLoop(Functor cb)
{
{
std::unique_lock<std::mutex> lock(_mutex);
_pendingFunctors.push_back(cb);
}
// 如果当前没有在1oop线程中,或者当前处于处理任务池任务状态则唤醒
if (_callingPendingFunctors || !isInLoopThread())
{
wakeup(); // 唤醒事件监控
}
}
3. eventfd 唤醒机制
- 用 eventfd 唤醒 epoll_wait
- 实现高效跨线程异步调用
4. 主从 Reactor 多线程架构
- 业界标准高并发架构
- Nginx / Netty /muduo 同款
5. 高精度定时器(runAt /runAfter/runEvery)
_timerQueue->addTimer(cb, when, interval);
六、Reactor 模型面试 10 问(背会直接拿分)
-
什么是 Reactor 模型?事件驱动,epoll 监听,回调分发。
-
Reactor 三大组件?EventLoop、Channel、Poller。
-
为什么一个线程一个 EventLoop?无锁、线程安全、减少切换。
-
Channel 作用?包装 fd,管理事件与回调。
-
EPollPoller 作用?封装 epoll,等待事件返回。
-
EventLoopThreadPool 作用?主从 Reactor,多核负载均衡。
-
runInLoop 作用?跨线程安全执行任务。
-
wakeupFd 作用?唤醒 epoll,处理异步任务。
-
水平触发 vs 边缘触发?水平触发稳定,你用的是水平触发。
-
主从 Reactor 优势?高并发、多核利用、高稳定、高吞吐。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)