前言

Reactor 模型高性能网络框架中,定时器(TimerQueue) 是不可或缺的核心组件。 它承担着连接超时断开、心跳检测、定时任务、延迟任务等关键功能,直接决定了服务的稳定性与精度。

下面的TimerQueue 代码:

基于 timerfd + epoll 实现,事件驱动、无轮询、高精度

采用 红黑树(std::set) 管理定时任务,O (logN) 增删查

完美集成 Reactor 模型,与 EventLoop 深度融合

支持单次定时、循环定时、任务取消

线程安全,跨线程添加 / 取消任务无压力


一、为什么要用 TimerQueue?

传统定时器方案(sleeptime 轮询)存在致命缺陷:

  • 阻塞线程,无法处理高并发
  • 轮询耗 CPU,性能极低
  • 精度差,难以满足毫秒 / 微秒级需求
  • 难以管理大量定时任务

TimerQueue(定时器队列) 完美解决所有问题:

  1. 事件驱动:定时器超时才触发,不浪费 CPU
  2. 高精度:基于 timerfd,微秒级定时精度
  3. 高效管理:红黑树组织任务,增删复杂度 O (logN)
  4. Reactor 融合:直接挂在 EventLoop 上,与网络事件统一调度
  5. 功能完整:支持单次、循环、取消、线程安全

二、TimerQueue 整体设计思想(一句话总结)

用 timerfd 监听超时事件 → 事件触发后 → 从红黑树中取出所有已超时任务 → 执行回调 → 循环任务重新入队

核心架构三组件:

  1. Timer:最小定时任务单元(回调、超时时间、是否循环)
  2. TimerId:定时任务唯一标识,用于安全取消
  3. TimerQueue:定时器管理器(timerfd + epoll + 红黑树)

三、核心组件深度解析

1. TimerId:任务安全唯一标识

#pragma once
#include "timestamp.h"
#include "channel.h"
#include <set>
#include <vector>
#include <cassert>
#include<functional>
#include<atomic>
namespace net
{
    class EventLoop;
    class Timer;
    class TimerQueue;
    // 定时任务唯一标识,并且能够通过这个ID找到定时任务
    class TimerId
    {
    public:
        TimerId(Timer *timer, int64_t seq)
            : _sequence(seq),
              _timer(timer) {}

    private:
        friend class TimerQueue;
        int64_t _sequence; // 定时任务唯一ID
        Timer *_timer;     // 定时任务对象指针
    };
    ...
}

作用

  • 不暴露内部 Timer 指针,保证安全
  • 用于取消定时任务
  • 序列保证唯一性,避免野指针问题

2. Timer:定时任务本体

typedef std::function<void()> TimerCallback;
    // 定时任务类:超时后要做什么,什么时候超时,是否是重复性任务
    class Timer
    {
    public:
        Timer(TimerCallback functor,Timestamp when,double interval)
        :_callback(std::move(functor)),
        _expired(when),
        _repeated(interval>0),
        _interval(interval),
        _sequence(_numcreated.fetch_add(1))
        {}
        // 执行定时任务
        void run() const { _callback(); }
        int64_t sequence(){return _sequence;}
        // 获取过期时间
        Timestamp expired() { return _expired; }
        /// 判断是否是重复性任务
        bool repeated() const { return _repeated; }
        // 获取重复性任务间隔时间
        double interval() { return _interval; }
        // 重置任务,循环任务就在当前时间的基础上加上循环间隔时间
        void restart(Timestamp now)
        {
            assert(_repeated == true);
            _expired = addTime(now, _interval);
        }

    private:
        TimerCallback _callback; // 定时任务要执行的操作
        Timestamp _expired;      // 任务的过期时间
        bool _repeated;          // 表示任务是否是重复性任务
        double _interval;        // 重复性任务间隔时间
        int64_t _sequence;
        static std::atomic<int64_t> _numcreated; // 递增序号
    };

作用:封装一个定时任务的所有信息。

关键点

  • _sequence:全局原子自增 ID,保证任务唯一
  • restart():循环任务重置下一次超时时间
  • run():超时执行用户回调

3. TimerQueue:定时器核心大脑(重点)

// 创建定时器,获取定时器描述符
    int createTimerfd();
    // 计算 when - now 的时间差  ---可能要根据这个时间差重置定时器超时通知时间
    struct timespec howMuchTimeFromNow(Timestamp when);
    // 读取定时器事件数据(超时次数,8B) read
    void readTimerfd(int timerfd, Timestamp now);
    // 重置定时器超时通知时间
    void resetTimerfd(int timerfd, Timestamp expiration);

    // 管理所有的定时任务,包含超时事件的感知,能够快速找到超时任务
    class TimerQueue
    {
        typedef std::pair<Timestamp, Timer *> Entry;
        typedef std::set<Entry> TimerList;

    public:
        TimerQueue(EventLoop *loop);
        ~TimerQueue();
        // 1.添加定时任务
        TimerId addTimer(TimerCallback functor, Timestamp when, double interval);
        // 2.取消定时任务
        void cancelTimer(TimerId tid);

    private:
        // 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
        void addTimerInLoop(Timer *timer);
        void cancelTimerInLoop(TimerId tid);
        // 新增定时任务,返回标志:标识是否需要重新设置定时器超时时间
        bool insert(Timer *timer);
        // 过期任务的处理
        // 获取所有过期的定时任务
        std::vector<Entry> getExpired(Timestamp now);
        // 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
        void reset(std::vector<Entry> &list, Timestamp now);
        // 超时事件的处理
        void handleRead();

    private:
        EventLoop *_loop;      // 时间循环对象
        int _timerfd;           // 定时器描述符
        Channel _timerchannel; // 事件描述符对应的事件处理器
        TimerList _timers;
        std::atomic<bool> _callingExpiredTimers;//是否处于正在处理超时任务期间
        std::set<Timer *> _activeTimers; // 为了快速找到定时任务,判断定时任务是否在任务池中
        std::set<Timer *> _cancelTimers; // 取消任务池
    };

设计亮点

  • std::set 红黑树:自动按时间排序,最快拿到最早超时任务
  • _activeTimers:快速判断任务是否存在
  • _cancelTimers:处理执行中取消任务的安全问题

四、TimerQueue 工作流程(最清晰)

1. 初始化

  1. 创建 timerfd
  2. 封装 Channel 并注册到 epoll
  3. 监听读事件,超时触发 handleRead

2. 添加定时任务

  1. 创建 Timer 对象
  2. 插入红黑树 std::set
  3. 如果新任务是最早超时,重置 timerfd
  4. 返回 TimerId 用于取消

3. 超时事件触发

  1. epoll 感知 timerfd 可读
  2. 读取 timerfd 清除事件
  3. 从红黑树取出所有已超时任务
  4. 依次执行任务回调
  5. 循环任务重新计算时间,重新插入红黑树
  6. 设置下一次超时时间

4. 取消任务

  1. 通过 TimerId 找到 Timer
  2. 从红黑树删除
  3. 释放资源
  4. 处理执行中取消的边界情况

五、核心函数源码精讲

1. addTimer:添加定时任务

// 1.添加定时任务:new定时任务,将定时任务添加到任务池中,返回任务ID
    TimerId TimerQueue::addTimer(TimerCallback functor, Timestamp when, double interval)
    {
        Timer *timer = new Timer(std::move(functor), when, interval);
        // 必须在loop线程执行
        _loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
        return TimerId(timer, timer->sequence());
    }
 // 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
    void TimerQueue::addTimerInLoop(Timer *timer)
    {
        _loop->assertInLoopThread();
        // 调用insert插入定时任务,根据返回值判断是否需要重置定时器超时时间
        bool changed = insert(timer);
        if (changed)
        {
            // 获取定时任务的超时时间,重置定时器
            resetTimerfd(_timerfd, timer->expired());
        }
    }

关键点

  • 必须在 EventLoop 线程执行,保证线程安全
  • 红黑树插入,自动排序
  • 最早任务变更 → 重置 timerfd

2. handleRead:超时事件处理(核心)

// 超时事件的处理
    void TimerQueue::handleRead()
    {
        _loop->assertInLoopThread();
        Timestamp now = Timestamp::now();
        // 1.读取超时事件数据
        readTimerfd(_timerfd, now);
        // 2.获取所有的超时任务:以当前时间构造一个entry对象,从_timers查找比它小的节点
        std::vector<Entry> expireds = getExpired(now);
        // 3.执行处理所有的定时任务:回调任务中的回调函数
        _callingExpiredTimers = true;
        _cancelTimers.clear(); // 清空以前的特殊取消数据
        for (auto &entry : expireds)
        {
            entry.second->run();
        }
        _callingExpiredTimers = false;
        // 4.重置定时任务:将重复性的定时任务,重置时间后,重新添加到任务池中
        reset(expireds, now);
    }

这就是定时器的灵魂!

3. getExpired:获取所有超时任务

// 过期任务的处理
    // 获取所有过期的定时任务
    std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
    {
        assert(_activeTimers.size() == _timers.size());
        // 任务池用的是红黑树,获取过期任务,就是找到小于当前时间节点的数据节点
        std::vector<TimerQueue::Entry> retval;
        // 1.根据当前时间,组织一个entry节点,必须是最大的时间节点
        TimerQueue::Entry entry(now, (Timer *)UINTPTR_MAX);
        // 2.根据组织的entry节点从任务池中取出所有小于该节点的数据
        auto end = _timers.lower_bound(entry);
        // 获取到第一个大于等于entry的节点,小于end的节点都是过期任务节点
        // 要么end等于末尾:要么任务池是空的,要么所有任务都是过期任务,要么end这个节点的过期时间必然大于当前时间
        assert(end == _timers.end() || now < end->first);
        std::copy(_timers.begin(), end, std::back_inserter(retval));
        // 3.从任务池&&查找池中删除这些节点
        _timers.erase(_timers.begin(), end);
        for (const auto &it : retval)
        {
            int n = _activeTimers.erase(it.second);
            assert(n == 1);
        }
        assert(_activeTimers.size() == _timers.size());
        // 4.返回结果
        return retval;
    }

设计极巧妙

  • lower_bound 快速找到第一个未超时任务
  • 批量获取、批量删除,效率极高

4. reset:循环任务重新入队

// 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
    void TimerQueue::reset(std::vector<Entry> &list, Timestamp now)
    {
        // 遍历过期任务,判断是否是重复性任务
        for (auto &entry : list)
        {
            // 如果是重复性任务,且没有被取消,则重置过期时间,重新添加到任务池
            if (entry.second->repeated() && _cancelTimers.find(entry.second) == _cancelTimers.end())
            {
                entry.second->restart(now);
                _timers.insert(Entry(entry.second->expired(), entry.second));
                _activeTimers.insert(entry.second);
            }
            else
            {
                // 如果不是,则释放任务节点
                delete entry.second;
            }
        }
        // 从任务池中找到最小的节点,将这个节点的过期时间,设置为定时器的通知时间
        Timestamp t; // 这个变量是一个无效时间
        if (!_timers.empty())
        {
            t = _timers.begin()->second->expired();
        }
        if (t.valid())
        {
            resetTimerfd(_timerfd, t);
        }
    }

5. 源程序


#include "timer.h"
#include "eventloop.h"
#include "details.h"
#include "channel.h"
#include <sys/timerfd.h>
#include <cstring>
#include <cassert>
#include <atomic>
#include <errno.h>
namespace net
{
    // 全局自增ID,保证每个Timer唯一
    std::atomic<int64_t> Timer::_numcreated = 0;
    // 创建定时器,获取定时器描述符
    int createTimerfd()
    {
        // CLOCK_MONOTONIC:单调递增时钟
        // TFD_NONBLOCK:非阻塞模式
        //  TFD_CLOEXEC:fork 子进程时自动关闭 fd
        int fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
        if (fd < 0)
        {
            LOG_FATAL("timerd_create ERROR");
        }
        return fd;
    }
    // 计算 when - now 的时间差  ---可能要根据这个时间差重置定时器超时通知时间
    struct timespec howMuchTimeFromNow(Timestamp when)
    {
        Timestamp now = Timestamp::now();
        int64_t detal = when.microSecondsSinceEpoch() - now.microSecondsSinceEpoch();
        if (detal < 100)
        {
            detal = 100;
        }
        struct timespec retval;
        retval.tv_sec = detal / Timestamp::_microSecondsPerSecond;
        retval.tv_nsec = (detal % Timestamp::_microSecondsPerSecond) * 1000;
        return retval;
    }
    // 读取定时器事件数据(超时次数,8B) read
    void readTimerfd(int timerfd, Timestamp now)
    {
        // 定时器描述符读取数据,需要读取8字节数据,读取到的数据是超时的次数
        int64_t val;
        ssize_t ret = ::read(timerfd, &val, sizeof(val));
        if (ret < 0)
        {
            LOG_ERROR("readTimerfd ERROR");
        }
    }
    // 重置定时器超时通知时间:expiration是最近的定时任务的超时时间
    // 功能:设置 timerfd 什么时候【第一次超时】
    void resetTimerfd(int timerfd, Timestamp expiration)
    {
        // int timerfd settime(int fd, int flags, const struct itimerspec *new value, struct itimerspec*_NULLable old_value
        // 1. 定义定时器设置结构体
        struct itimerspec its;
        memset(&its, 0x00, sizeof(its));
        // 2. 计算:从现在到超时时间点,还剩多久
        its.it_value = howMuchTimeFromNow(expiration);
        // 3. 把超时时间设置给内核!
        int ret = timerfd_settime(
            timerfd, // 哪个定时器
            0,       // 0 = 相对时间,TFD_TIMER_ABSTIME = 绝对时间
            &its,    // 新的超时设置
            NULL     // 不需要旧设置
        );
        if (ret < 0)
        {
            LOG_ERROR("resetTimerfd ERROR: %s", strerror(errno));
        }
    }

    // 初始化成员,将定时器添加读事件监控
    TimerQueue::TimerQueue(EventLoop *loop)
        : _loop(loop),
          _timerfd(createTimerfd()),
          _timerchannel(loop, _timerfd),
          _callingExpiredTimers(false)
    {
        _timerchannel.setReadCallback(std::bind(&TimerQueue::handleRead, this));
        _timerchannel.enableReading(); // 启动描述符的读事件监控
    }
    // 清理资源:解除描述符监控,移除监控,关闭描述符,释放当前还没有触发的定时任务
    TimerQueue::~TimerQueue()
    {
        _timerchannel.disableAll();
        _timerchannel.remove();
        ::close(_timerfd);
        for (auto &entry : _timers)
        {
            delete entry.second;
        }
    }
    // 1.添加定时任务:new定时任务,将定时任务添加到任务池中,返回任务ID
    TimerId TimerQueue::addTimer(TimerCallback functor, Timestamp when, double interval)
    {
        Timer *timer = new Timer(std::move(functor), when, interval);
        // 必须在loop线程执行
        _loop->runInLoop(std::bind(&TimerQueue::addTimerInLoop, this, timer));
        return TimerId(timer, timer->sequence());
    }
    // 2.取消定时任务s
    void TimerQueue::cancelTimer(TimerId tid)
    {
        _loop->runInLoop(std::bind(&TimerQueue::cancelTimerInLoop, this, tid));
    }

    // 在loop线程中进行实际的添加/取消操作,保证成员操作的线程安全
    void TimerQueue::addTimerInLoop(Timer *timer)
    {
        _loop->assertInLoopThread();
        // 调用insert插入定时任务,根据返回值判断是否需要重置定时器超时时间
        bool changed = insert(timer);
        if (changed)
        {
            // 获取定时任务的超时时间,重置定时器
            resetTimerfd(_timerfd, timer->expired());
        }
    }
    void TimerQueue::cancelTimerInLoop(TimerId tid)
    {
        _loop->assertInLoopThread();
        // 1.从任务池中删除定时任务
        assert(_activeTimers.size() == _timers.size());
        if (_activeTimers.find(tid._timer) != _activeTimers.end())
        {
            Entry entry(tid._timer->expired(), tid._timer);
            _timers.erase(entry);
            _activeTimers.erase(tid._timer);
            delete tid._timer;
        }
        else if (_callingExpiredTimers == true)
        {
            // 2.取消任务的时候,任务不在任务池中,而是取消的任务就是当前正在处理的超时任务(在内部取消
            //  将任务添加到取消池中
            _cancelTimers.insert(tid._timer);
        }
        assert(_activeTimers.size() == _timers.size());
    }
    // 新增定时任务,返回标志:标识是否需要重新设置定时器超时时间
    bool TimerQueue::insert(Timer *timer)
    {
        _loop->assertInLoopThread();
        assert(_activeTimers.size() == _timers.size());
        bool changed = false;
        Timestamp t = timer->expired();
        // 2.如果添加的节点就是最小节点,意味着需要重置定时器
        if (_timers.empty() || t < _timers.begin()->second->expired())
        {
            changed = true;
        }
        // 1.将任务向任务池,和查找池中各自添加一份
        auto ret1 = _timers.insert(Entry(t, timer));
        assert(ret1.second);
        auto ret2 = _activeTimers.insert(timer);
        assert(ret2.second);
        assert(_activeTimers.size() == _timers.size());
        return changed;
    }
    // 超时事件的处理
    void TimerQueue::handleRead()
    {
        _loop->assertInLoopThread();
        Timestamp now = Timestamp::now();
        // 1.读取超时事件数据
        readTimerfd(_timerfd, now);
        // 2.获取所有的超时任务:以当前时间构造一个entry对象,从_timers查找比它小的节点
        std::vector<Entry> expireds = getExpired(now);
        // 3.执行处理所有的定时任务:回调任务中的回调函数
        _callingExpiredTimers = true;
        _cancelTimers.clear(); // 清空以前的特殊取消数据
        for (auto &entry : expireds)
        {
            entry.second->run();
        }
        _callingExpiredTimers = false;
        // 4.重置定时任务:将重复性的定时任务,重置时间后,重新添加到任务池中
        reset(expireds, now);
    }
    // 过期任务的处理
    // 获取所有过期的定时任务
    std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
    {
        assert(_activeTimers.size() == _timers.size());
        // 任务池用的是红黑树,获取过期任务,就是找到小于当前时间节点的数据节点
        std::vector<TimerQueue::Entry> retval;
        // 1.根据当前时间,组织一个entry节点,必须是最大的时间节点
        TimerQueue::Entry entry(now, (Timer *)UINTPTR_MAX);
        // 2.根据组织的entry节点从任务池中取出所有小于该节点的数据
        auto end = _timers.lower_bound(entry);
        // 获取到第一个大于等于entry的节点,小于end的节点都是过期任务节点
        // 要么end等于末尾:要么任务池是空的,要么所有任务都是过期任务,要么end这个节点的过期时间必然大于当前时间
        assert(end == _timers.end() || now < end->first);
        std::copy(_timers.begin(), end, std::back_inserter(retval));
        // 3.从任务池&&查找池中删除这些节点
        _timers.erase(_timers.begin(), end);
        for (const auto &it : retval)
        {
            int n = _activeTimers.erase(it.second);
            assert(n == 1);
        }
        assert(_activeTimers.size() == _timers.size());
        // 4.返回结果
        return retval;
    }
    // 重置过期的定时任务:针对当前过期的任务,找出重复性任务,重新添加到任务池中
    void TimerQueue::reset(std::vector<Entry> &list, Timestamp now)
    {
        // 遍历过期任务,判断是否是重复性任务
        for (auto &entry : list)
        {
            // 如果是重复性任务,且没有被取消,则重置过期时间,重新添加到任务池
            if (entry.second->repeated() && _cancelTimers.find(entry.second) == _cancelTimers.end())
            {
                entry.second->restart(now);
                _timers.insert(Entry(entry.second->expired(), entry.second));
                _activeTimers.insert(entry.second);
            }
            else
            {
                // 如果不是,则释放任务节点
                delete entry.second;
            }
        }
        // 从任务池中找到最小的节点,将这个节点的过期时间,设置为定时器的通知时间
        Timestamp t; // 这个变量是一个无效时间
        if (!_timers.empty())
        {
            t = _timers.begin()->second->expired();
        }
        if (t.valid())
        {
            resetTimerfd(_timerfd, t);
        }
    }

}

六、超高亮工程级设计亮点

1. 线程安全

所有操作通过 runInLoop 转到 IO 线程执行,无锁、无竞争

2. 红黑树排序

std::set 自动按时间排序,O(logN) 获取最早任务。

3. timerfd 事件驱动

不轮询、不阻塞,CPU 开销几乎为零

4. 安全取消机制

  • _cancelTimers 处理任务执行中被取消的边界场景
  • TimerId 隔离内部指针,防止野指针
  • 双集合校验保证数据一致性

5. 完美集成 Reactor

  • 一个 EventLoop 对应一个 TimerQueue
  • 网络事件 + 定时事件统一调度

七、TimerQueue 使用方式(极简)

// 延迟 3 秒执行
loop->runAfter(3.0, [](){ cout << "延迟执行" << endl; });

// 每隔 1 秒循环执行
loop->runEvery(1.0, [](){ cout << "循环执行" << endl; });

// 指定时间点执行
loop->runAt(someTime, cb);

八、总结(最精华)

我的 TimerQueue 定时器队列

它的核心价值:

高精度、事件驱动、无浪费

红黑树管理,O (logN) 高效增删

线程安全、支持取消、循环任务

完美融合 Reactor 模型

工业级稳定,可直接上线使用

一句话总结 TimerQueue: 一个 timerfd 监听超时,一棵红黑树管理任务,一套 Reactor 统一调度,高性能、高精度、高稳定!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐