【C++ 高性能网络库核心】TimerQueue 定时器队列:从原理到源码深度解析
前言
在Reactor 模型高性能网络框架中,定时器(TimerQueue) 是不可或缺的核心组件。 它承担着连接超时断开、心跳检测、定时任务、延迟任务等关键功能,直接决定了服务的稳定性与精度。
下面的TimerQueue 代码:
基于
timerfd+epoll实现,事件驱动、无轮询、高精度采用 红黑树(std::set) 管理定时任务,O (logN) 增删查
完美集成 Reactor 模型,与 EventLoop 深度融合
支持单次定时、循环定时、任务取消
线程安全,跨线程添加 / 取消任务无压力
一、为什么要用 TimerQueue?
传统定时器方案(sleep、time 轮询)存在致命缺陷:
- 阻塞线程,无法处理高并发
- 轮询耗 CPU,性能极低
- 精度差,难以满足毫秒 / 微秒级需求
- 难以管理大量定时任务
而 TimerQueue(定时器队列) 完美解决所有问题:
- 事件驱动:定时器超时才触发,不浪费 CPU
- 高精度:基于
timerfd,微秒级定时精度 - 高效管理:红黑树组织任务,增删复杂度 O (logN)
- Reactor 融合:直接挂在 EventLoop 上,与网络事件统一调度
- 功能完整:支持单次、循环、取消、线程安全
二、TimerQueue 整体设计思想(一句话总结)
用 timerfd 监听超时事件 → 事件触发后 → 从红黑树中取出所有已超时任务 → 执行回调 → 循环任务重新入队
核心架构三组件:
- Timer:最小定时任务单元(回调、超时时间、是否循环)
- TimerId:定时任务唯一标识,用于安全取消
- TimerQueue:定时器管理器(timerfd + epoll + 红黑树)
三、核心组件深度解析
1. TimerId:任务安全唯一标识
#pragma once
#include "timestamp.h"
#include "channel.h"
#include <set>
#include <vector>
#include <cassert>
#include<functional>
#include<atomic>
namespace net
{
class EventLoop;
class Timer;
class TimerQueue;
// 定时任务唯一标识,并且能够通过这个ID找到定时任务
class TimerId
{
public:
TimerId(Timer *timer, int64_t seq)
: _sequence(seq),
_timer(timer) {}
private:
friend class TimerQueue;
int64_t _sequence; // 定时任务唯一ID
Timer *_timer; // 定时任务对象指针
};
...
}
作用:
- 不暴露内部 Timer 指针,保证安全
- 用于取消定时任务
- 序列保证唯一性,避免野指针问题
2. Timer:定时任务本体
typedef std::function<void()> TimerCallback;
// 定时任务类:超时后要做什么,什么时候超时,是否是重复性任务
class Timer
{
public:
Timer(TimerCallback functor,Timestamp when,double interval)
:_callback(std::move(functor)),
_expired(when),
_repeated(interval>0),
_interval(interval),
_sequence(_numcreated.fetch_add(1))
{}
// 执行定时任务
void run() const { _callback(); }
int64_t sequence(){return _sequence;}
// 获取过期时间
Timestamp expired() { return _expired; }
/// 判断是否是重复性任务
bool repeated() const { return _repeated; }
// 获取重复性任务间隔时间
double interval() { return _interval; }
// 重置任务,循环任务就在当前时间的基础上加上循环间隔时间
void restart(Timestamp now)
{
assert(_repeated == true);
_expired = addTime(now, _interval);
}
private:
TimerCallback _callback; // 定时任务要执行的操作
Timestamp _expired; // 任务的过期时间
bool _repeated; // 表示任务是否是重复性任务
double _interval; // 重复性任务间隔时间
int64_t _sequence;
static std::atomic<int64_t> _numcreated; // 递增序号
};
作用:封装一个定时任务的所有信息。
关键点:
_sequence:全局原子自增 ID,保证任务唯一restart():循环任务重置下一次超时时间run():超时执行用户回调
3. TimerQueue:定时器核心大脑(重点)
// 创建定时器,获取定时器描述符
int createTimerfd();
// 计算 when - now 的时间差 ---可能要根据这个时间差重置定时器超时通知时间
struct timespec howMuchTimeFromNow(Timestamp when);
// 读取定时器事件数据(超时次数,8B) read
void readTimerfd(int timerfd, Timestamp now);
// 重置定时器超时通知时间
void resetTimerfd(int timerfd, Timestamp expiration);
// 管理所有的定时任务,包含超时事件的感知,能够快速找到超时任务
class TimerQueue
{
typedef std::pair<Timestamp, Timer *> Entry;
typedef std::set<Entry> TimerList;
public:
TimerQueue(EventLoop *loop);
~TimerQueue();
// 1.添加定时任务
TimerId addTimer(TimerCallback functor, Timestamp when, double interval);
// 2.取消定时任务
void cancelTimer(TimerId tid);
private:
// 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
void addTimerInLoop(Timer *timer);
void cancelTimerInLoop(TimerId tid);
// 新增定时任务,返回标志:标识是否需要重新设置定时器超时时间
bool insert(Timer *timer);
// 过期任务的处理
// 获取所有过期的定时任务
std::vector<Entry> getExpired(Timestamp now);
// 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
void reset(std::vector<Entry> &list, Timestamp now);
// 超时事件的处理
void handleRead();
private:
EventLoop *_loop; // 时间循环对象
int _timerfd; // 定时器描述符
Channel _timerchannel; // 事件描述符对应的事件处理器
TimerList _timers;
std::atomic<bool> _callingExpiredTimers;//是否处于正在处理超时任务期间
std::set<Timer *> _activeTimers; // 为了快速找到定时任务,判断定时任务是否在任务池中
std::set<Timer *> _cancelTimers; // 取消任务池
};
设计亮点:
std::set红黑树:自动按时间排序,最快拿到最早超时任务_activeTimers:快速判断任务是否存在_cancelTimers:处理执行中取消任务的安全问题
四、TimerQueue 工作流程(最清晰)
1. 初始化
- 创建
timerfd - 封装
Channel并注册到epoll - 监听读事件,超时触发
handleRead
2. 添加定时任务
- 创建 Timer 对象
- 插入红黑树
std::set - 如果新任务是最早超时,重置
timerfd - 返回 TimerId 用于取消
3. 超时事件触发
epoll感知 timerfd 可读- 读取 timerfd 清除事件
- 从红黑树取出所有已超时任务
- 依次执行任务回调
- 循环任务重新计算时间,重新插入红黑树
- 设置下一次超时时间
4. 取消任务
- 通过 TimerId 找到 Timer
- 从红黑树删除
- 释放资源
- 处理执行中取消的边界情况
五、核心函数源码精讲
1. addTimer:添加定时任务
// 1.添加定时任务:new定时任务,将定时任务添加到任务池中,返回任务ID
TimerId TimerQueue::addTimer(TimerCallback functor, Timestamp when, double interval)
{
Timer *timer = new Timer(std::move(functor), when, interval);
// 必须在loop线程执行
_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
// 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
void TimerQueue::addTimerInLoop(Timer *timer)
{
_loop->assertInLoopThread();
// 调用insert插入定时任务,根据返回值判断是否需要重置定时器超时时间
bool changed = insert(timer);
if (changed)
{
// 获取定时任务的超时时间,重置定时器
resetTimerfd(_timerfd, timer->expired());
}
}
关键点:
- 必须在 EventLoop 线程执行,保证线程安全
- 红黑树插入,自动排序
- 最早任务变更 → 重置 timerfd
2. handleRead:超时事件处理(核心)
// 超时事件的处理
void TimerQueue::handleRead()
{
_loop->assertInLoopThread();
Timestamp now = Timestamp::now();
// 1.读取超时事件数据
readTimerfd(_timerfd, now);
// 2.获取所有的超时任务:以当前时间构造一个entry对象,从_timers查找比它小的节点
std::vector<Entry> expireds = getExpired(now);
// 3.执行处理所有的定时任务:回调任务中的回调函数
_callingExpiredTimers = true;
_cancelTimers.clear(); // 清空以前的特殊取消数据
for (auto &entry : expireds)
{
entry.second->run();
}
_callingExpiredTimers = false;
// 4.重置定时任务:将重复性的定时任务,重置时间后,重新添加到任务池中
reset(expireds, now);
}
这就是定时器的灵魂!
3. getExpired:获取所有超时任务
// 过期任务的处理
// 获取所有过期的定时任务
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(_activeTimers.size() == _timers.size());
// 任务池用的是红黑树,获取过期任务,就是找到小于当前时间节点的数据节点
std::vector<TimerQueue::Entry> retval;
// 1.根据当前时间,组织一个entry节点,必须是最大的时间节点
TimerQueue::Entry entry(now, (Timer *)UINTPTR_MAX);
// 2.根据组织的entry节点从任务池中取出所有小于该节点的数据
auto end = _timers.lower_bound(entry);
// 获取到第一个大于等于entry的节点,小于end的节点都是过期任务节点
// 要么end等于末尾:要么任务池是空的,要么所有任务都是过期任务,要么end这个节点的过期时间必然大于当前时间
assert(end == _timers.end() || now < end->first);
std::copy(_timers.begin(), end, std::back_inserter(retval));
// 3.从任务池&&查找池中删除这些节点
_timers.erase(_timers.begin(), end);
for (const auto &it : retval)
{
int n = _activeTimers.erase(it.second);
assert(n == 1);
}
assert(_activeTimers.size() == _timers.size());
// 4.返回结果
return retval;
}
设计极巧妙:
lower_bound快速找到第一个未超时任务- 批量获取、批量删除,效率极高
4. reset:循环任务重新入队
// 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
void TimerQueue::reset(std::vector<Entry> &list, Timestamp now)
{
// 遍历过期任务,判断是否是重复性任务
for (auto &entry : list)
{
// 如果是重复性任务,且没有被取消,则重置过期时间,重新添加到任务池
if (entry.second->repeated() && _cancelTimers.find(entry.second) == _cancelTimers.end())
{
entry.second->restart(now);
_timers.insert(Entry(entry.second->expired(), entry.second));
_activeTimers.insert(entry.second);
}
else
{
// 如果不是,则释放任务节点
delete entry.second;
}
}
// 从任务池中找到最小的节点,将这个节点的过期时间,设置为定时器的通知时间
Timestamp t; // 这个变量是一个无效时间
if (!_timers.empty())
{
t = _timers.begin()->second->expired();
}
if (t.valid())
{
resetTimerfd(_timerfd, t);
}
}
5. 源程序
#include "timer.h"
#include "eventloop.h"
#include "details.h"
#include "channel.h"
#include <sys/timerfd.h>
#include <cstring>
#include <cassert>
#include <atomic>
#include <errno.h>
namespace net
{
// 全局自增ID,保证每个Timer唯一
std::atomic<int64_t> Timer::_numcreated = 0;
// 创建定时器,获取定时器描述符
int createTimerfd()
{
// CLOCK_MONOTONIC:单调递增时钟
// TFD_NONBLOCK:非阻塞模式
// TFD_CLOEXEC:fork 子进程时自动关闭 fd
int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (fd < 0)
{
LOG_FATAL("timerd_create ERROR");
}
return fd;
}
// 计算 when - now 的时间差 ---可能要根据这个时间差重置定时器超时通知时间
struct timespec howMuchTimeFromNow(Timestamp when)
{
Timestamp now = Timestamp::now();
int64_t detal = when.microSecondsSinceEpoch() - now.microSecondsSinceEpoch();
if (detal < 100)
{
detal = 100;
}
struct timespec retval;
retval.tv_sec = detal / Timestamp::_microSecondsPerSecond;
retval.tv_nsec = (detal % Timestamp::_microSecondsPerSecond) * 1000;
return retval;
}
// 读取定时器事件数据(超时次数,8B) read
void readTimerfd(int timerfd, Timestamp now)
{
// 定时器描述符读取数据,需要读取8字节数据,读取到的数据是超时的次数
int64_t val;
ssize_t ret = ::read(timerfd, &val, sizeof(val));
if (ret < 0)
{
LOG_ERROR("readTimerfd ERROR");
}
}
// 重置定时器超时通知时间:expiration是最近的定时任务的超时时间
// 功能:设置 timerfd 什么时候【第一次超时】
void resetTimerfd(int timerfd, Timestamp expiration)
{
// int timerfd settime(int fd, int flags, const struct itimerspec *new value, struct itimerspec*_NULLable old_value
// 1. 定义定时器设置结构体
struct itimerspec its;
memset(&its, 0x00, sizeof(its));
// 2. 计算:从现在到超时时间点,还剩多久
its.it_value = howMuchTimeFromNow(expiration);
// 3. 把超时时间设置给内核!
int ret = timerfd_settime(
timerfd, // 哪个定时器
0, // 0 = 相对时间,TFD_TIMER_ABSTIME = 绝对时间
&its, // 新的超时设置
NULL // 不需要旧设置
);
if (ret < 0)
{
LOG_ERROR("resetTimerfd ERROR: %s", strerror(errno));
}
}
// 初始化成员,将定时器添加读事件监控
TimerQueue::TimerQueue(EventLoop *loop)
: _loop(loop),
_timerfd(createTimerfd()),
_timerchannel(loop, _timerfd),
_callingExpiredTimers(false)
{
_timerchannel.setReadCallback(std::bind(&TimerQueue::handleRead, this));
_timerchannel.enableReading(); // 启动描述符的读事件监控
}
// 清理资源:解除描述符监控,移除监控,关闭描述符,释放当前还没有触发的定时任务
TimerQueue::~TimerQueue()
{
_timerchannel.disableAll();
_timerchannel.remove();
::close(_timerfd);
for (auto &entry : _timers)
{
delete entry.second;
}
}
// 1.添加定时任务:new定时任务,将定时任务添加到任务池中,返回任务ID
TimerId TimerQueue::addTimer(TimerCallback functor, Timestamp when, double interval)
{
Timer *timer = new Timer(std::move(functor), when, interval);
// 必须在loop线程执行
_loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
// 2.取消定时任务s
void TimerQueue::cancelTimer(TimerId tid)
{
_loop->runInLoop(std::bind(&TimerQueue::cancelTimerInLoop, this, tid));
}
// 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
void TimerQueue::addTimerInLoop(Timer *timer)
{
_loop->assertInLoopThread();
// 调用insert插入定时任务,根据返回值判断是否需要重置定时器超时时间
bool changed = insert(timer);
if (changed)
{
// 获取定时任务的超时时间,重置定时器
resetTimerfd(_timerfd, timer->expired());
}
}
void TimerQueue::cancelTimerInLoop(TimerId tid)
{
_loop->assertInLoopThread();
// 1.从任务池中删除定时任务
assert(_activeTimers.size() == _timers.size());
if (_activeTimers.find(tid._timer) != _activeTimers.end())
{
Entry entry(tid._timer->expired(), tid._timer);
_timers.erase(entry);
_activeTimers.erase(tid._timer);
delete tid._timer;
}
else if (_callingExpiredTimers == true)
{
// 2.取消任务的时候,任务不在任务池中,而是取消的任务就是当前正在处理的超时任务(在内部取消
// 将任务添加到取消池中
_cancelTimers.insert(tid._timer);
}
assert(_activeTimers.size() == _timers.size());
}
// 新增定时任务,返回标志:标识是否需要重新设置定时器超时时间
bool TimerQueue::insert(Timer *timer)
{
_loop->assertInLoopThread();
assert(_activeTimers.size() == _timers.size());
bool changed = false;
Timestamp t = timer->expired();
// 2.如果添加的节点就是最小节点,意味着需要重置定时器
if (_timers.empty() || t < _timers.begin()->second->expired())
{
changed = true;
}
// 1.将任务向任务池,和查找池中各自添加一份
auto ret1 = _timers.insert(Entry(t, timer));
assert(ret1.second);
auto ret2 = _activeTimers.insert(timer);
assert(ret2.second);
assert(_activeTimers.size() == _timers.size());
return changed;
}
// 超时事件的处理
void TimerQueue::handleRead()
{
_loop->assertInLoopThread();
Timestamp now = Timestamp::now();
// 1.读取超时事件数据
readTimerfd(_timerfd, now);
// 2.获取所有的超时任务:以当前时间构造一个entry对象,从_timers查找比它小的节点
std::vector<Entry> expireds = getExpired(now);
// 3.执行处理所有的定时任务:回调任务中的回调函数
_callingExpiredTimers = true;
_cancelTimers.clear(); // 清空以前的特殊取消数据
for (auto &entry : expireds)
{
entry.second->run();
}
_callingExpiredTimers = false;
// 4.重置定时任务:将重复性的定时任务,重置时间后,重新添加到任务池中
reset(expireds, now);
}
// 过期任务的处理
// 获取所有过期的定时任务
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(_activeTimers.size() == _timers.size());
// 任务池用的是红黑树,获取过期任务,就是找到小于当前时间节点的数据节点
std::vector<TimerQueue::Entry> retval;
// 1.根据当前时间,组织一个entry节点,必须是最大的时间节点
TimerQueue::Entry entry(now, (Timer *)UINTPTR_MAX);
// 2.根据组织的entry节点从任务池中取出所有小于该节点的数据
auto end = _timers.lower_bound(entry);
// 获取到第一个大于等于entry的节点,小于end的节点都是过期任务节点
// 要么end等于末尾:要么任务池是空的,要么所有任务都是过期任务,要么end这个节点的过期时间必然大于当前时间
assert(end == _timers.end() || now < end->first);
std::copy(_timers.begin(), end, std::back_inserter(retval));
// 3.从任务池&&查找池中删除这些节点
_timers.erase(_timers.begin(), end);
for (const auto &it : retval)
{
int n = _activeTimers.erase(it.second);
assert(n == 1);
}
assert(_activeTimers.size() == _timers.size());
// 4.返回结果
return retval;
}
// 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
void TimerQueue::reset(std::vector<Entry> &list, Timestamp now)
{
// 遍历过期任务,判断是否是重复性任务
for (auto &entry : list)
{
// 如果是重复性任务,且没有被取消,则重置过期时间,重新添加到任务池
if (entry.second->repeated() && _cancelTimers.find(entry.second) == _cancelTimers.end())
{
entry.second->restart(now);
_timers.insert(Entry(entry.second->expired(), entry.second));
_activeTimers.insert(entry.second);
}
else
{
// 如果不是,则释放任务节点
delete entry.second;
}
}
// 从任务池中找到最小的节点,将这个节点的过期时间,设置为定时器的通知时间
Timestamp t; // 这个变量是一个无效时间
if (!_timers.empty())
{
t = _timers.begin()->second->expired();
}
if (t.valid())
{
resetTimerfd(_timerfd, t);
}
}
}
六、超高亮工程级设计亮点
1. 线程安全
所有操作通过 runInLoop 转到 IO 线程执行,无锁、无竞争。
2. 红黑树排序
std::set 自动按时间排序,O(logN) 获取最早任务。
3. timerfd 事件驱动
不轮询、不阻塞,CPU 开销几乎为零。
4. 安全取消机制
_cancelTimers处理任务执行中被取消的边界场景- TimerId 隔离内部指针,防止野指针
- 双集合校验保证数据一致性
5. 完美集成 Reactor
- 一个 EventLoop 对应一个 TimerQueue
- 网络事件 + 定时事件统一调度
七、TimerQueue 使用方式(极简)
// 延迟 3 秒执行
loop->runAfter(3.0, [](){ cout << "延迟执行" << endl; });
// 每隔 1 秒循环执行
loop->runEvery(1.0, [](){ cout << "循环执行" << endl; });
// 指定时间点执行
loop->runAt(someTime, cb);
八、总结(最精华)
我的 TimerQueue 定时器队列
它的核心价值:
高精度、事件驱动、无浪费
红黑树管理,O (logN) 高效增删
线程安全、支持取消、循环任务
完美融合 Reactor 模型
工业级稳定,可直接上线使用
一句话总结 TimerQueue: 一个 timerfd 监听超时,一棵红黑树管理任务,一套 Reactor 统一调度,高性能、高精度、高稳定!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)