1. RabbitMQ 简单说明

在开发C++ 高并发后端、分布式服务、实时数据推送、异步任务系统时,你是否也遇到过这些难题:C++ 服务间同步 RPC 调用耦合严重,一个模块崩溃导致整个链路不可用;高并发场景下(如物联网设备上报、高频交易、游戏服务)请求突增,C++ 服务直接被打满阻塞;需要跨语言、跨节点通信,手写 Socket、自研队列无持久化、无重试、丢数据风险极高;C++ 项目需要对接微服务集群,没有成熟可靠的消息队列客户端,二次开发成本巨大;延时任务、消息重试、死信处理等通用能力,用 C++ 从零实现复杂且稳定性差;海量消息需要有序、可靠投递,传统 C++ 并发队列无法支撑分布式场景。

尤其是在C++ 高性能服务、物联网、游戏后端、金融交易场景:C++ 核心服务需要异步处理日志、上报指标,同步 IO 严重拖慢性能;设备终端海量数据上行,需要队列削峰保护服务;分布式 C++ 节点需要解耦通信,强依赖会导致部署维护困难;金融、交易类 C++ 服务要求消息不丢失、不重复、必到达,自研方案无法保证;延时消息、定时任务(如订单超时、设备心跳)用 C++ 定时器实现难以支撑大规模场景。轻则服务卡顿、吞吐量下降,重则数据丢失、服务崩溃、线上故障。

RabbitMQ 正是为解决这些痛点而生 —— 这款支持原生 C++ 客户端的开源分布式消息队列中间件,专为 C++ 高性能场景设计,提供稳定、轻量、高性能的 AMQP 客户端库,让你用 C++ 极简代码实现服务解耦、异步通信、流量削峰、可靠消息投递。它完美适配 C++ 高并发、低延迟的需求,支持持久化、死信队列、高可用集群,无需手写底层网络通信与消息队列逻辑,是 C++ 分布式服务、高并发系统、异步任务 的首选消息中间件。

如果你以 C++ 开发 为主,RabbitMQ 提供成熟稳定的 AMQP-CPP 客户端(高性能、纯 C++、跨平台)

2. AMQP 简介

2.1 简介

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。

它规定了消息系统中三大组件 —— 消息服务器 / 代理节点 (server/broker)、生产者 / 发布者 (producer/publisher)、消费者 / 订阅者 (consumer/subscriber) 之间的通信规范,以及代理节点的设计规范等。


2.2 MQ 核心要素

高级消息队列协议(AMQP)—— 它是应用层的标准协议,不仅约定了消息系统中三大核心组件(消息服务器、生产者、消费者)的通信规范,还打破了客户端与中间件的产品限制、开发语言限制,让不同节点间的通信更灵活。除此之外,AMQP 还定义了消息队列使用中的核心模型要素,这是我们理解消息队列工作机制的关键。

AMQP 的核心模型要素中,首先是 Broker(消息代理服务器),这是整个消息通信的核心中转节点。实际通信场景中,生产者(消息发送方)不会直接与消费者(消息接收方)通信,而是先将消息发送到 Broker,消费者再从 Broker 获取消息。

这种模式的优势很明显:一是降低了生产者与消费者的耦合度,二是提升了容灾性,避免因网络中断导致消息丢失。我们使用 RabbitMQ 作为 Broker,它对 AMQP 协议支持完善,且已预装在开发环境中(版本为 4.0)。Broker 内部还包含虚拟主机(Virtual Host),用于实现环境隔离,让多个客户端或业务场景在同一 Broker 中互不干扰,各自独立运行。

在虚拟主机内部,核心组件是交换机(Exchange)和队列(Queue)交换机作为消息分发中心,接收生产者发送的消息,并根据自身类型和路由规则将消息转发到对应的队列中。交换机主要有四种类型:

我们先把核心角色对应一下:

  • 生产者:寄快递的人

  • 路由键 (Routing Key):快递单上写的“地址关键词”(比如:省份、城市)

  • 交换机 (Exchange):快递分拣中心

  • 队列 (Queue):具体的快递存放点(最终要送到的地方)

  • 绑定键 (Binding Key):快递存放点告诉分拣中心:“如果地址关键词是XXX,就把快递放我这里”

下面来看四种交换机分别怎么工作。

1. 直连交换机(Direct Exchange)根据消息的路由键(Routing Key)与队列的绑定键(Binding Key)完全匹配来转发消息;

一句话:路由键 必须完全等于 绑定键,精准送达。

生活例子:高铁/航班的“直达车”

  • 你想从北京去上海,买票时选“北京 → 上海”。

  • 分拣中心(交换机)看到路由键是“上海”,就直接把你送到绑定了“上海”的队列(上海出口)。

  • 如果你的票是“北京 → 广州”,就绝对不会错送到上海。

队列A 绑定键 = "error"
队列B 绑定键 = "info"

消息 routing_key = "error" → 只去队列A

适用场景:明确的、一对一的精准路由(如日志级别:error / info / debug)。

2. 广播交换机(Fanout Exchange)忽略路由键,将消息广播到所有绑定的队列;

一句话:忽略任何地址,每个绑定的队列都收到一份。

生活例子:小区广播通知

  • 物业在喇叭里喊:“明天停水”。

  • 不管你家在几栋几单元(相当于忽略路由键),小区里所有住户(所有绑定的队列)都听得到。

  • 你不能说“我只想让 1 号楼听到”,广播模式下不行。

队列A、B、C 都绑定到同一个 fanout 交换机
无论 routing_key 写什么(甚至空),A/B/C 都会收到相同消息

适用场景:广播、所有消费者必须同时收到通知(如配置更新、全局事件)。

3. 主题交换机(Topic Exchange)通过路由键与绑定键的通配符匹配,实现更灵活的多队列转发;

一句话:匹配时支持通配符 *(一个单词)和 #(0 或多个单词)。

生活例子:菜鸟驿站的“模糊取件码”

  • 假设绑定键是:*.杭州.*

  • 路由键 A.杭州.B ✅ 匹配

  • 路由键 杭州.A ❌ 不匹配(因为 * 要求必须有单词)

  • 路由键 X.杭州.Y.Z 且绑定键用 #.杭州 → 匹配(# 吃掉了前面的 X)

常用通配符:

  • * :刚好一个单词

  • # :0 个或多个单词

  • 单词之间用 . 分隔

队列A 绑定键 = "china.*.weather"
china.beijing.weather   ✅
china.shanghai.weather  ✅
china.beijing.temp      ❌(最后不是weather)

适用场景:多级分类、多种匹配规则(如系统监控:*.critical.**.error)。

4. 头部交换机(Headers Exchange)则根据消息的自定义头部参数进行路由,适用于复杂场景但复杂度较高。

一句话:不看路由键,只看消息头(Header)里的 key-value 是否匹配。

生活例子:VIP 会员 + 学生身份双重验证

  • 消息头里有:{"会员等级": "钻石", "身份": "学生"}

  • 队列绑定条件:x-match = all,且需要 会员等级=钻石 且 身份=学生

  • 必须两个都满足,才会进入这个队列。

x-match 参数:

  • all :所有条件都必须满足

  • any :任意一个条件满足即可

队列绑定 headers = { "format": "pdf", "type": "report", "x-match": "all" }

消息1 headers { "format": "pdf", "type": "report" } → ✅ 匹配
消息2 headers { "format": "pdf" } → ❌ 不匹配

适用场景:复杂路由、多维度匹配(如:游戏服务器按 客户端版本+渠道+地区 筛选消息),但因为性能和维护成本较高,实际用得相对少。

队列则是消息的存储容器,直到消费者接收并处理消息后,消息才会从队列中移除,且交换机与队列之间支持多对多绑定关系,进一步提升了消息转发的灵活性。

消息队列的核心作用包括解耦、异步处理、削峰填谷、负载均衡等:解耦让生产者与消费者无需直接交互,也无需同时在线;异步处理允许生产者发送消息后无需等待消费者响应,提升系统吞吐量;削峰填谷能应对短时间高并发消息,避免后端服务器过载;负载均衡则可将消息自动分配给多个消费者,提升处理效率。这些特性让消息队列在分布式系统中不可或缺。

生产者(Producer):

  • 生产者是消息的发送方。它们创建消息并将其发布到 RabbitMQ 的交换机上。生产者通常将消息发送到一个或多个队列,以便消费者可以订阅并处理这些消息。

消费者(Consumer):

  • 消费者是消息的接收方,它们订阅队列并从中获取消息。一旦消费者接收到消息,它们可以对消息进行处理,例如执行某些任务或将数据存储到数据库中。
2.3 工作流程

消息的传输流程:

  • 生产者连接Broker进行发布消息,并指定交换机和路由键。
  • 交换机根据路由键将消息路由到一个或多个队列。
  • 消费者从队列中获取消息并处理它们。

3. AMQP-CPP 简介

3.1 简介

要实现 C++ 程序与 RabbitMQ 的通信,我们选择 AMQP-CPP 这个开源客户端库 —— 它完全实现了 AMQP 协议,支持异步非阻塞通信,具有分层架构(底层网络层 + 上层 AMQP 应用层)、跨平台兼容性,且支持 C++17 特性。

AMQP-CPP 的使用有两种模式:

一是使用默认 TCP 模块【自己实现】,二是结合扩展网络库(我们选择 libev,因其集成性更好)。

核心类包括:

Connection(通信连接类,用于与 Broker 建立连接,可配置用户名、密码、主机地址、端口和虚拟主机等信息)

Channel(信道,基于单个 Connection 可创建多个信道,避免连接资源浪费,所有 RabbitMQ 操作如声明交换机、声明队列、绑定队列、发布消息、订阅消息等均通过信道接口实现)。需要注意的是,信道的接口均为异步非阻塞类型,调用后立即返回,需通过回调函数(如成功回调、错误回调、消息回调等)处理执行结果。

此外,还有 Message 类(用于封装消息内容、路由键等信息)、Deferred 系列类(用于设置各类操作的回调函数)等核心组件,后续实践将详细讲解其用法。

最后补充 libev 的关键使用要点:libev 用于处理底层网络事件,需先创建事件循环句柄(通过 EV_DEFAULT 宏),启动事件循环(ev_run)后开始监控网络通信事件,退出时需调用 ev_break。由于 ev_run 是阻塞接口,通常会在独立线程中运行;若需跨线程退出事件循环,需借助 EV_ASYNC 异步操作,通过发送回调任务通知事件循环线程执行退出逻辑,避免直接调用 ev_break 导致异常。

项目地址:

AMQP-CPP 的项目地址为:https://github.com/CopernicaMarketingSoftware/AMQP-CPP


3.2 安装
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make && sudo make install
3.3 接口

AMQP-CPP 的使用有两种模式:

  • 使用默认的 TCP 模块进行网络通信
  • 使用扩展的libeventlibevlibuvasio异步网络通信组件进行通信

3.3.1 默认 TCP 模式
  • 实现一个类继承自AMQP::TcpHandler类,它负责网络层的 TCP 连接
  • 重写相关函数,其中必须重写monitor函数
  • monitor函数中需要实现的是将 fd 放入eventloop(select、epoll) 中监控,当 fd 可写可读就绪之后,调用 AMQP-CPP 的connection->process(fd, flags)方法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>

class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     * @brief AMQP库在创建新连接时调用的方法,与处理程序相关联
     * @param connection 附加到处理程序的连接
     */
    virtual void onAttached(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现,例如初始化事物以处理连接
    }

    /**
     * @brief 当TCP连接时由AMQP库调用的方法
     * @param connection 现在可以使用的连接
     */
    virtual void onConnected(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现(可能不需要)
    }

    /**
     * @brief 在建立安全TLS连接时调用的方法
     * @param connection 已被保护的连接
     * @param ssl 来自openssl库的ssl结构
     * @return bool 如果可以使用连接,则为True
     */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        // @todo 添加您自己的实现,例如读取证书并检查它是否确实是您的
        return true;
    }

    /**
     * @brief 当登录尝试成功时由AMQP库调用的方法
     * @param connection 现在可以使用的连接
     */
    virtual void onReady(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现,例如通过创建一个通道实例,然后开始发布或使用
    }

    /**
     * @brief 该方法在服务器尝试协商检测信号间隔时调用
     * @param connection 发生错误的连接
     * @param interval 建议的间隔(秒)
     * @return uint16_t 返回我们要使用的间隔
     */
    virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval)
    {
        // 我们接受服务器的建议,但如果间隔小于一分钟,我们将使用一分钟的间隔
        if (interval < 60) interval = 60;

        // @todo 在事件循环中设置一个计时器,确保每隔interval秒调用connection->heartbeat()
        return interval;
    }

    /**
     * @brief 发生致命错误时由AMQP库调用的方法
     * @param connection 发生错误的连接
     * @param message 一条人类可读的错误消息
     */
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override
    {
        // @todo 添加您自己的实现,例如通过向程序的用户报告错误并记录错误
    }

    /**
     * @brief 该方法在AMQP协议结束时调用的方法
     * @param connection AMQP协议结束的连接
     */
    virtual void onClosed(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现,例如在AMQP连接结束后立即执行某些操作
    }

    /**
     * @brief 当TCP连接关闭或丢失时调用的方法
     * @param connection 已关闭但现在无法使用的连接
     */
    virtual void onLost(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现(可能没有必要)
    }

    /**
     * @brief 调用的最终方法,表示将不再对处理程序进行有关连接的进一步调用
     * @param connection 可以被破坏的连接
     */
    virtual void onDetached(AMQP::TcpConnection *connection) override
    {
        // @todo 添加您自己的实现,如清理资源或退出应用程序
    }

    /**
     * @brief 当AMQP-CPP库想要与主事件循环交互时调用的方法
     * @param connection 想要与事件循环交互的连接
     * @param fd 应该被检查的文件描述符
     * @param flags 应该检查文件描述符的可读性或可写性的标志
     */
    virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
    {
        // @todo 添加您自己的实现,例如将文件描述符添加到主应用程序和事件循环
        // 当事件循环报告文件描述符变为可读或可写时,调用connection->process(fd, flags)
    }
};
3.3.2 扩展模式

libev为例,我们不必自己实现monitor函数,可以直接使用 AMQP::LibEvHandler

3.3.2.1 头文件包含
#include <amqpcpp.h>

3.3.2.2 Connection

用于设定rabbitmq服务器地址后,创建一个通信连接对象。

/**
 * @brief LibEv事件循环处理器
 * 
 * 继承自TcpHandler,基于libev库实现异步I/O事件处理
 * libev是一个高性能事件循环库,类似于epoll
 * 
 * 职责:
 * - 管理TCP套接字的可读/可写事件
 * - 处理网络I/O的非阻塞读写
 * - 事件循环的调度和执行
 */
class LibEvHandler : public TcpHandler;

/**
 * @brief 连接处理器(抽象基类/接口)
 * 
 * 定义连接事件回调接口,由使用者实现
 * 
 * 典型回调事件:
 * - onConnected()      : 连接建立成功
 * - onConnectionClosed(): 连接关闭
 * - onError()          : 发生错误
 * - onDataReceived()   : 收到数据(解析AMQP帧)
 */
class ConnectionHandler;

/**
 * @brief AMQP服务器地址
 * 
 * 解析和存储RabbitMQ服务器的连接地址
 * 
 * URL格式:amqp://用户名:密码@主机名:端口号/虚拟主机
 * 
 * 示例:
 * - amqp://guest:guest@localhost:5672/
 * - amqp://admin:123456@192.168.1.100:5672/production
 * 
 * 组成部分:
 * - scheme: amqp (协议类型)
 * - user:   用户名(如 guest)
 * - pass:   密码(如 guest)
 * - host:   服务器地址(如 localhost、192.168.1.100)
 * - port:   端口号(默认5672)
 * - vhost:  虚拟主机(可选,默认"/")
 */
class Address {
public:
    /**
     * @brief 从URL字符串解析地址
     * @param address AMQP连接字符串
     * 
     * 解析逻辑:
     * 1. 提取协议(amqp://)
     * 2. 提取用户名和密码(user:pass@)
     * 3. 提取主机和端口(host:port/)
     * 4. 提取虚拟主机(vhost)
     */
    Address(const std::string &address);
    
    // 私有成员(推测):
    // std::string _host;      // 服务器地址
    // int _port;              // 端口号(5672)
    // std::string _user;      // 用户名
    // std::string _password;  // 密码
    // std::string _vhost;     // 虚拟主机
};

/**
 * @brief TCP连接(底层网络连接)
 * 
 * 负责建立和管理与RabbitMQ服务器的TCP连接
 * 
 * 继承关系:private ConnectionHandler
 * - private继承表示“实现继承”,TcpConnection内部使用ConnectionHandler的回调
 * - 对外隐藏了ConnectionHandler的接口
 * 
 * 职责:
 * - 建立TCP套接字连接
 * - 发送/接收原始数据(AMQP帧)
 * - 处理TCP层面的连接事件
 * - 将网络事件转发给TcpHandler
 */
class TcpConnection : private ConnectionHandler {
public:
    /**
     * @brief 建立TCP连接
     * @param handler 上层协议处理器(处理AMQP协议)
     * @param address 服务器地址
     * 
     * 执行流程:
     * 1. 创建socket
     * 2. 连接服务器(非阻塞)
     * 3. 注册到事件循环(libev)
     * 4. 等待连接建立或失败回调
     */
    TcpConnection(TcpHandler *handler, const Address &address);
    
    // 私有成员(推测):
    // int _sockfd;                    // 套接字文件描述符
    // TcpHandler* _handler;           // 上层协议处理器
    // Address _address;               // 服务器地址
    // bool _connected;                // 连接状态
    // std::vector<char> _send_buffer; // 发送缓冲区
    // std::vector<char> _recv_buffer; // 接收缓冲区
};

/**
 * @brief 登录认证信息
 * 
 * 封装RabbitMQ连接认证所需的用户名和密码
 * 
 * RabbitMQ认证机制:
 * - 默认使用PLAIN机制(明文传输,但在TLS加密下安全)
 * - 也支持其他SASL机制(如AMQPLAIN、EXTERNAL等)
 */
class Login {
public:
    /**
     * @brief 构造登录信息
     * @param user 用户名
     * @param password 密码
     * 
     * 默认用户名密码(开发环境):
     * - 用户名: guest
     * - 密码: guest
     * 
     * 生产环境建议:
     * - 创建独立用户,设置强密码
     * - 配置权限(读、写、配置)
     * - 不要使用guest(默认只允许localhost连接)
     */
    Login(std::string user, std::string password);
    
    // 私有成员(推测):
    // std::string _username;
    // std::string _password;
};

/**
 * @brief AMQP连接(应用层连接)
 * 
 * 在TcpConnection之上,实现AMQP 0-9-1协议的连接层
 * 
 * 连接建立流程(AMQP握手):
 * 1. TCP连接建立
 * 2. 客户端发送Protocol Header("AMQP 0-9-1")
 * 3. 服务器发送Connection.Start(包含认证机制)
 * 4. 客户端发送Connection.StartOk(携带Login信息)
 * 5. 服务器发送Connection.Tune(协商参数:心跳、帧最大大小等)
 * 6. 客户端发送Connection.TuneOk
 * 7. 客户端发送Connection.Open(指定vhost)
 * 8. 服务器发送Connection.OpenOk → 连接就绪
 * 
 * 连接参数:
 * - 心跳间隔:保持连接活跃,检测死连接
 * - 帧最大大小:每个AMQP帧的最大字节数(默认131072)
 * - 通道最大数:允许创建的最大Channel数(默认2047)
 */
class Connection {
public:
    /**
     * @brief 建立AMQP连接
     * @param handler 连接事件处理器(监听连接状态变化)
     * @param login 登录认证信息
     * @param vhost 虚拟主机(类似命名空间,隔离资源)
     * 
     * vhost的作用:
     * - 逻辑隔离不同的业务或租户
     * - 独立的交换机、队列、绑定关系
     * - 独立的权限配置
     * 
     * 示例:
     * Connection conn(&myHandler, Login("admin", "123"), "/production");
     */
    Connection(
        ConnectionHandler *handler,
        const Login &login,
        const std::string &vhost
    );
    
    /**
     * @brief 关闭连接
     * @return true: 关闭成功 false: 关闭失败
     * 
     * 优雅关闭流程:
     * 1. 发送Connection.Close(AMQP协议)
     * 2. 等待服务器回复Connection.CloseOk
     * 3. 关闭TCP连接
     * 4. 清理资源(释放Channel、队列等)
     * 
     * 注意:
     * - 关闭前应该先关闭所有Channel
     * - 未确认的消息可能会丢失
     */
    bool close();
    
    // 其他推测的方法(未在代码中):
    // Channel* createChannel();     // 创建通道(执行具体操作)
    // bool isOpen();                // 检查连接是否正常
    // void setHeartbeatInterval();  // 设置心跳间隔
    
private:
    // 私有成员(推测):
    // TcpConnection _tcp_conn;        // 底层TCP连接
    // ConnectionHandler* _handler;    // 连接事件处理器
    // std::string _vhost;             // 虚拟主机
    // Login _login;                   // 认证信息
    // int _channel_max;               // 最大通道数
    // int _frame_max;                 // 最大帧大小
    // int _heartbeat;                 // 心跳间隔(秒)
    // std::map<int, Channel*> _channels; // 通道映射表
};

3.3.2.3 Channel

channel是一个虚拟连接,一个连接上可以建立多个通道。并且所有的RabbitMq指令都是通过channel传输,所以连接建立后的第一步,就是建立channel。因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果,实际上它返回的是Deferred类,可以使用它安装处理函数。

namespace AMQP {
/**
 * 通用回调函数类型
 */
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;

// 队列的声明的成功回调函数
using QueueCallback = std::function<void(
    const std::string &name,
    uint32_t messagecount,
    uint32_t consumecount)>;

// 队列删除回调函数
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;

// 消息处理回调函数
using MessageCallback = std::function<void(
    const Message &message,   // 收到的消息
    uint64_t deliveryTag,     // 消息的唯一标识
    bool redelivered)>;       // 指示消息是否是被重新投递的

// 当启用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback
using AckCallback = std::function<void(
    uint64_t deliveryTag,     // 消息的唯一标识符
    bool multiple)>;          // 指示确认范围:false确认单个消息,true确认所有消费的该消息

// 消息被RabbitMQ成功接收和处理后调用的回调函数
using PublishAckCallback = std::function<void()>;
// 消息被RabbitMQ明确拒绝(nacked)时调用的回调函数
using PublishNackCallback = std::function<void()>;
// 消息丢失时调用的回调函数
using PublishLostCallback = std::function<void()>;
}

这些回调函数使得 AMQP-CPP 库能够在消息发布后提供反馈,帮助应用程序实现更可靠的消息传递机制。通过实现这些回调,开发者可以对消息的发布结果进行适当的处理,确保消息传递的可靠性或进行错误恢复。

extern const int durable;
extern const int autodelete;
extern const int active;
extern const int passive;
extern const int ifunused;
extern const int ifempty;
extern const int global;
extern const int nolocal;
extern const int noack;
extern const int exclusive;
extern const int nowait;
extern const int mandatory;
extern const int immediate;
extern const int redelivered;
extern const int multiple;
extern const int requeue;
extern const int readable;
extern const int writable;
extern const int internal;

namespace AMQP {
enum ExchangeType
{
    fanout,   // 广播交换,绑定的队列都能拿到消息
    direct,   // 直接交换,只将消息交给routingkey一致的队列
    topic,    // 主题交换,将消息交给符合bindingkey规则的队列
    headers,
    consistent_hash,
    message_deduplication
};
}
class Channel {
    Channel(Connection *connection);
    Deferred &close(); // 关闭当前channel
    Deferred &confirm(); // confirmSelect()
    Deferred &startTransaction();
    Deferred &rollbackTransaction();

    /**
     * 声明交换机
     * 以下flags可用于交换机:
     *  - durable      持久化,重启后交换机依然有效
     *  - autodelete   删除所有连接的使用者都离开时,自动删除交换
     *  - passive      仅被动检查交换机是否存在
     *  - internal     创建内部交换
     *
     * 此函数返回一个延迟处理程序。可以安装回调onSuccess(), onError() and onFinalize() methods.
     */
    Deferred &declareExchange(
        const std::string_view &name,  // 交换机名称
        ExchangeType type,             // 交换机类型
        int flags                      // 标志位
    );

    /**
     * int flags
     * flags可以是以下值的组合:
     *  - durable      持久队列,服务器重启后仍然有效
     *  - autodelete   当所有连接的使用者都离开时,自动删除队列
     *  - exclusive    仅被动检查队列是否存在,并且在连接断开时自动删除
     *
     * 可安装回调onSuccess(), onError() and onFinalize()方法。
     */
    Deferred &declareQueue(
        const std::string &name,  // 队列名称
        int flags                 // 队列标志位
    );

    /**
     * 将队列绑定到交换机
     * 可以安装回调onSuccess()、onError()和onFinalize()方法。
     */
    Deferred &bindQueue(
        const std::string_view &exchange,  // 交换机名称
        const std::string_view &queue,     // 队列名称
        const std::string_view &bindingkey // 绑定Key
    );

    Deferred &unbindQueue(
        const std::string_view &exchange,
        const std::string_view &queue,
        const std::string_view &bindingkey
    );

    /**
     * 删除队列
     * flag
     *  - ifunused    仅在队列没有连接者时删除
     *  - ifempty     仅在队列为空时删除
     */
    DeferredDelete &removeQueue(
        const std::string_view &name,
        int flags = 0
    );

    /**
     * 发布消息
     * flags参数指定如果消息无法路由由到队列时应该发生的情况。
     * 默认情况下,不可更改的消息将被静默地丢弃。
     * 发布之前,请确保您已经调用了recall()方法,并设置了所有适当的处理程序来处理这些返回消息。
     * flags:
     *  - mandatory  如果设置,服务器将返回未发送到队列的消息
     *  - immediate  如果设置,服务器将返回无法立即转发给使用者的消息
     */
    bool publish(
        std::string_view &exchange,    // 交换机名称
        const std::string_view &routingKey, // 路由Key
        const std::string &message,    // 消息内容
        int flags = 0                  // 标志位
    );

    /**
     * 当您将publish()方法与"immediate"或"mandatory"标志结合使用时,
     * rabbitmq会发回不可路由的消息。
     * DeferredRecall &recall();
     */
    DeferredRecall &recall();

    /**
     * 订阅队列消息
     * 消费者标识用于通过channel::cancel()调用取消订阅
     * 支持以下flags:
     *  - nolocal    如果设置了,则不会向消费者在此通道上发布的消息
     *  - noack      如果设置了,则不必对已消费的消息进行确认
     *  - exclusive  独占访问,不允许其他消费者
     * 可以安装回调onSuccess()、onError()和onFinalize()方法。
     */
    DeferredConsumer &consume(
        const std::string_view &queue,  // 队列名称
        const std::string_view &tag,    // 消费者标识
        int flags = 0
    );

    /**
     * 取消订阅
     * flags:
     *  - noack  消费消息不需要确认,也就是自动确认
     */
    DeferredCancel &cancel(const std::string_view &tag);

    /**
     * 从MQ检索单条消息
     */
    DeferredGet &get(const std::string_view &queue, int flags = 0);

    /**
     * 消费者确认接收到的消息
     * 当在RabbitMQ中从队列中接收(使用noack中接收消费消息时,必须确认该消息,以便从队列中删除)
     * flag: multiple 确认多条消息:之前传递的所有未确认的消息也会被确认
     */
    bool ack(uint64_t deliveryTag, int flags=0);

    /**
     * 消费者拒绝消息
     * flag: multiple 如果设置,则之前传递的所有未确认的消息也都是未确认的
     *  requeue: 如果已设置,则消息将放回队列中,否则将被删除
     */
    bool reject(uint64_t deliveryTag, int flags=0);

    /**
     * 回复所有没有被确认的消息
     * 如果requeue没有被设置为true,服务器将重新对消息进行排队,因此也可能最终到达不同的消费者
     */
    Deferred &recover(int flags = 0);
};
3.3.2.4 Reliable

用于可靠的消息发布及错误处理。

template <typename BASE=Tagger>
class Reliable : public BASE
{
    template <typename ...Args>
    Reliable(Args &&...args)
    DeferredPublish &publish(
        const std::string_view &exchange,
        const std::string_view &routingKey,
        const std::string_view &message,
        int flags = 0)
}

// 样例:
AMQP::TcpChannel mychannel(connection);
AMQP::Reliable reliable(mychannel);
reliable.publish("my-exchange", "my-key", "my first message")
    .onAck()
    .onNack()
    .onLost()
    .onError();

3.3.2.5 Message

用于订阅者收到消息后的处理。

class Envelope : public MetaData{
    const char *body()
    uint64_t bodySize()
}

class Message : public Envelope{
    const std::string &exchange()
    const std::string &routingkey()
}

3.3.2.6 Deferred

Deferred 类(包括其派生类,如 DeferredQueueDeferredConsumer 等)是处理异步操作的核心机制。它用于表示一个尚未完成的操作,用于设置异步调用的回调函数,进行接口的连续调用。

class Deferred
{
    Deferred &onSuccess(const SuccessCallback& callback)
    Deferred &onError(const ErrorCallback& callback)
    Deferred &onFinalize(const FinalizeCallback& callback)
}

示例:声明交换机

// channel.declareExchange() 接口返回的是Deferred对象
// 因此,可以连续调用onSuccess设置成功处理回调函数以及onError失败处理回调函数
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
    .onSuccess([&](){
        std::cout << "声明交换机成功:" << exchange << std::endl;
    })
    .onError([&](const char *message) {
        std::cout << "声明交换机失败:" << message << std::endl;
    });

3.3.2.7 DeferredQueue

Channel 中队列相关操作接口的返回对象,用于设置队列操作的相关回调函数。

class DeferredQueue : public Deferred
{
    DeferredQueue &onSuccess(const QueueCallback& callback)
    DeferredQueue &onSuccess(const SuccessCallback& callback)
}

using QueueCallback = std::function<void(const std::string &name,
    uint32_t messageCount,
    uint32_t consumerCount)>;

示例:声明队列

// channel.declareQueue() 接口返回的是DeferredQueue对象
channel.declareQueue(queue)
    .onSuccess([&](const std::string &name,
        uint32_t messagecount,
        uint32_t consumercount){
        std::cout << "声明队列成功:" << queue << std::endl;
        std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
        std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
    })
    .onError([&](const char *message) {
        std::cout << "声明队列失败:" << message << std::endl;
        abort(); // 终止程序
    });

3.3.2.8 DeferredConsumer

Channel 中订阅相关操作接口的返回对象,用于设置订阅的相关回调函数。

class DeferredConsumer : public Deferred {
    DeferredConsumer &onSuccess(const ConsumeCallback& callback)
    // 收到消息的回调函数
    DeferredConsumer &onReceived(const MessageCallback& callback)
    /* Alias for onReceived() */
    DeferredConsumer &onMessage(const MessageCallback& callback)
    // 取消订阅的回调
    DeferredConsumer &onCancelled(const CancelCallback& callback)
}

using MessageCallback = std::function<void(const AMQP::Message &message,
    uint64_t deliveryTag,
    bool redelivered)>;

示例:订阅队列消息

// channel.consume() 接口返回的是DeferredConsumer对象
channel.consume(queue)
    .onMessage([&](const AMQP::Message &message,  // 收到的消息
        uint64_t deliveryTag,                   // 消息的唯一标识
        bool redelivered){
        std::string body(message.body(), message.bodySize());
        std::cout << "收到消息:" << body << std::endl;
        // 收到消息进行处理后,不要忘了对消息进行确认
        channel.ack(deliveryTag);
    })
    .onError([&](const char *message) {
        std::cout << "订阅队列消息失败:" << message << std::endl;
        abort(); // 终止程序
    })
    .onSuccess([&](const std::string_view &tag){
        std::cout << "订阅队列消息成功:" << std::endl;
    });
3.3.3 扩展网络库 libev
3.3.3.1 LibEvHandler

amqpcpp 库与 libev 网络库的对接类

#include <amqpcpp/libev.h>
#include <ev.h>

class LibEvHandler : public TcpHandler{
    LibEvHandler(struct ev_loop *loop, int priority = 0)
};

3.3.3.2 ev_loop

libev 网络通信库的核心使用接口。

typedef struct ev_async
{
    EV_WATCHER (ev_async)
    EV_ATOMIC_T sent; /* private */
} ev_async;

// break type
enum {
    EVBREAK_CANCEL = 0, /* undo unloop */
    EVBREAK_ONE   = 1, /* unloop once */
    EVBREAK_ALL   = 2  /* unloop all loops */
};

// 创建libev操作句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0))
#define EV_DEFAULT  ev_default_loop (0)
void ev_loop_destroy (struct ev_loop *loop)

// 开始事件循环 --- 阻塞接口
int ev_run (struct ev_loop *loop);
// 退出事件循环
void ev_break (struct ev_loop *loop, int32_t break_type);

// 异步事件回调函数
// 也就是每个外部针对ev的操作都必须在evloop所在线程内执行
// 这里相当打包了一个任务,加入到evloop的任务队列中进行执行
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)

// 初始化一个异步任务
void ev_async_init(ev_async *w, callback cb);
// 启动loop的异步任务监测,这样就可以接收任务进行处理了
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送异步任务到loop中进行执行
void ev_async_send(struct ev_loop *loop, ev_async *w);

// 示例:异步退出
void mycb(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
    ev_break(loop, EVBREAK_ALL);
}
ev_async ea;

ev_async_init(&ea, mycb);
ev_async_start(loop, &ea);
ev_async_send(loop, &ea);
3.4 样例
3.4.1 简单消息传输
3.4.1.1 目录结构
$ tree
.
├── makefile
├── simple_publish.cc
└── simple_subscribe.cc

3.4.1.2 样例代码

simple_publish.cc

/**
 * @file rabbitmq_producer.cpp
 * @brief RabbitMQ 生产者示例 - 使用 Direct Exchange 发送消息
 * 
 * 依赖库:
 * - libev: 高性能事件循环库
 * - amqpcpp: RabbitMQ 的 C++ 客户端库
 * - amqpcpp-libev: amqpcpp 与 libev 的适配器
 * 
 * 编译命令:
 * g++ -o producer rabbitmq_producer.cpp -lev -lamqpcpp -lpthread
 */

#include <amqpcpp.h>      // AMQP 协议核心库
#include <ev.h>           // libev 事件循环库
#include <amqpcpp/libev.h> // AMQP-CPP 的 libev 适配器
#include <iostream>       // 标准输入输出

int main()
{
    // ==================== 0. 配置参数 ====================
    // RabbitMQ 服务器连接 URL(标准 AMQP 格式)
    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    // URL 格式解析:
    // amqp://  - 协议类型(也支持 amqps:// 表示 TLS 加密)
    // admin    - 用户名
    // 123456   - 密码
    // 192.168.65.128 - RabbitMQ 服务器 IP
    // 5672     - 端口号(默认 5672,TLS 是 5671)
    // /        - 虚拟主机(默认根路径)
    
    const std::string exchange = "my-exchange";      // 交换机名称
    const std::string queue = "my-queue";            // 队列名称
    const std::string binding_key = "my-binding-key"; // 绑定键(路由键)

    // ==================== 1. 初始化网络通信层 ====================
    /**
     * @brief libev 事件循环
     * 
     * EV_DEFAULT 是一个宏,返回默认的事件循环实例
     * libev 会管理所有套接字的 I/O 事件(可读、可写、错误等)
     * 类似于 epoll/select,但更高效且跨平台
     */
    auto *ev_loop = EV_DEFAULT;
    
    /**
     * @brief AMQP-CPP 的 libev 适配器
     * 
     * 将 AMQP-CPP 的网络事件转换为 libev 事件
     * 当有网络事件发生时(如连接建立、数据到达),会通过这个 handler 通知 AMQP-CPP
     */
    AMQP::LibEvHandler handler(ev_loop);

    // ==================== 2. 建立 TCP 连接 ====================
    /**
     * @brief TCP 连接对象
     * 
     * 负责与 RabbitMQ 服务器建立底层的 TCP 连接
     * 内部处理:
     * - TCP 三次握手
     * - AMQP 协议头协商
     * - 连接认证(用户名/密码)
     * - 心跳保活
     * - 断线重连(可选)
     */
    AMQP::TcpConnection connection(&handler, AMQP::Address(url));

    // ==================== 3. 创建通道 ====================
    /**
     * @brief AMQP 通道
     * 
     * 通道是 AMQP 协议中的重要概念:
     * - 一个 TCP 连接可以包含多个通道(多路复用)
     * - 每个通道独立工作,互不干扰
     * - 通道是执行 AMQP 命令的上下文(声明交换机、队列,发送消息等)
     * 
     * 典型应用:
     * - 通道1:处理订单消息
     * - 通道2:处理用户通知
     * - 通道3:处理日志消息
     * 
     * 优势:避免为每种业务创建单独的 TCP 连接,节省资源
     */
    AMQP::TcpChannel channel(&connection);

    // ==================== 4. 声明交换机 ====================
    /**
     * @brief 声明交换机(异步操作)
     * 
     * declareExchange() 会发送 AMQP 命令到 RabbitMQ
     * 操作是异步的,需要注册回调处理结果
     * 
     * 交换机类型(ExchangeType):
     * - direct  : 完全匹配路由键(精确路由)
     * - fanout  : 广播到所有绑定的队列(忽略路由键)
     * - topic   : 通配符匹配路由键(* 匹配一个词,# 匹配多个词)
     * - headers : 根据消息头部匹配(不依赖路由键)
     */
    channel.declareExchange(exchange, AMQP::ExchangeType::direct)
        /**
         * @brief 声明成功的回调
         * 
         * 执行顺序:在 RabbitMQ 确认交换机创建成功后调用
         */
        .onSuccess([&](){
            std::cout << "声明交换机成功:" << exchange << std::endl;
            
            // ==================== 5. 声明队列 ====================
            /**
             * @brief 声明队列
             * 
             * 队列是消息的存储容器
             * 特性:
             * - 先进先出(FIFO)
             * - 支持持久化(服务器重启后消息不丢失)
             * - 支持排他(只能被当前连接使用)
             * - 支持自动删除(没有消费者时自动删除)
             * 
             * 回调参数:
             * - name         : 队列名称(RabbitMQ 可能自动生成)
             * - messagecount : 队列中积压的消息数量
             * - consumercount: 当前正在消费的消费者数量
             */
            channel.declareQueue(queue)
                .onSuccess([&](const std::string &name,
                    uint32_t messagecount,
                    uint32_t consumercount){
                    std::cout << "声明队列成功:" << queue << std::endl;
                    std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
                    std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
                    
                    // ==================== 6. 绑定交换机和队列 ====================
                    /**
                     * @brief 绑定队列到交换机
                     * 
                     * 绑定关系决定了消息如何从交换机路由到队列
                     * 
                     * 对于 direct 交换机:
                     * - 消息的 routing_key 必须 == binding_key 才会被路由到该队列
                     * 
                     * 对于 fanout 交换机:
                     * - 忽略 binding_key,所有绑定的队列都收到消息
                     * 
                     * 对于 topic 交换机:
                     * - 支持通配符匹配(如 *.stock.*, #.error)
                     * 
                     * 一个队列可以用多个 binding_key 绑定到同一个交换机
                     * 一个交换机也可以绑定到多个队列
                     */
                    channel.bindQueue(exchange, queue, binding_key)
                        .onSuccess([&](){
                            std::cout << "绑定交换机和队列成功:" << std::endl;
                            
                            // ==================== 7. 发送消息 ====================
                            /**
                             * @brief 发布消息到交换机
                             * 
                             * @param exchange    : 目标交换机名称
                             * @param routing_key : 路由键(决定消息去哪个队列)
                             * @param message     : 消息内容(字节流)
                             * @param flags       : 消息属性(可选)
                             * 
                             * 消息属性(flags)示例:
                             * - AMQP::mandatory : 如果没有队列接收,返回消息给生产者
                             * - AMQP::immediate : 如果没有消费者,立即返回(已废弃)
                             * 
                             * 消息可以携带自定义属性(Message Properties):
                             * - contentType     : MIME 类型(如 application/json)
                             * - deliveryMode    : 持久化模式(1=非持久,2=持久)
                             * - priority        : 优先级(0-9)
                             * - correlationId   : 关联 ID(用于 RPC 调用)
                             * - replyTo         : 回复队列名称(用于 RPC 调用)
                             * - expiration      : 消息过期时间(毫秒)
                             * - messageId       : 消息唯一标识(用于去重)
                             * - timestamp       : 时间戳
                             * - userId          : 用户 ID(需要授权)
                             * - appId           : 应用 ID
                             */
                            bool ret = channel.publish(exchange,
                                binding_key, "Hello World");
                            
                            if(ret == false) {
                                std::cout << "发送消息失败" << std::endl;
                            } else {
                                std::cout << "发送消息成功: Hello World" << std::endl;
                            }
                        })
                        /**
                         * @brief 绑定失败的回调
                         * 
                         * @param message 错误描述信息
                         */
                        .onError([&](const char *message) {
                            std::cout << "绑定交换机和队列失败:" << message << std::endl;
                            abort(); // 终止程序
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort(); // 终止程序
                });
        })
        /**
         * @brief 声明交换机失败的回调
         */
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort(); // 终止程序
        });

    // ==================== 8. 启动事件循环 ====================
    /**
     * @brief 运行 libev 事件循环
     * 
     * 这是一个阻塞调用,会一直运行直到:
     * - ev_break() 被调用
     * - 程序收到终止信号
     * - 发生致命错误
     * 
     * 事件循环处理的任务:
     * - 监听套接字的 I/O 事件
     * - 执行定时器(如心跳检测)
     * - 执行异步回调函数
     * 
     * 注意:所有 AMQP-CPP 的回调函数都在这个事件循环中执行
     * 因此不要在回调中执行耗时操作,否则会阻塞网络处理
     */
    ev_run(ev_loop);
    
    /**
     * @note 代码不会执行到这里,因为 ev_run 会阻塞
     * 正常退出需要在回调中调用 ev_break(ev_loop)
     */
    return 0;
}

simple_subscribe.cc

#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>

int main()
{
    // 0. 定义rabbitmq容器的访问url
    const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
    const std::string exchange = "my-exchange";
    const std::string queue = "my-queue";
    const std::string binding_key = "my-binding-key";

    // 1. 实例化libev网络通信模块句柄
    auto *ev_loop = EV_DEFAULT; // 网络通信事件循环句柄
    AMQP::LibEvHandler handler(ev_loop); // 与amqpcpp库结合的句柄

    // 2. 实例化Connection对象//与rabbitmq服务器的连接对象
    AMQP::TcpConnection connection(&handler, AMQP::Address(url));

    // 3. 实例化Channel对象
    AMQP::TcpChannel channel(&connection); // 与rabbitmq服务器的通信通道对象

    // 4. 声明交换机: 声明一个交换机, direct类型(直接交换)
    channel.declareExchange(exchange, AMQP::ExchangeType::direct)
        .onSuccess([&](){
            std::cout << "声明交换机成功:" << exchange << std::endl;
            // 5. 声明队列
            channel.declareQueue(queue)
                .onSuccess([&](const std::string &name,
                    uint32_t messagecount,
                    uint32_t consumercount){
                    std::cout << "声明队列成功:" << queue << std::endl;
                    std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
                    std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
                    // 6. 绑定交换机和队列(采用直接交换)
                    channel.bindQueue(exchange, queue, binding_key)
                        .onSuccess([&](){
                            std::cout << "绑定交换机和队列成功:" << std::endl;
                            // 7. 订阅队列消息
                            channel.consume(queue)
                                .onMessage([&](const AMQP::Message &message,  // 收到的消息
                                    uint64_t deliveryTag,                   // 消息的唯一标识
                                    bool redelivered){
                                    std::string body(message.body(), message.bodySize());
                                    std::cout << "收到消息:" << body << std::endl;
                                    // 收到消息进行处理后,不要忘了对消息进行确认
                                    channel.ack(deliveryTag);
                                })
                                .onError([&](const char *message) {
                                    std::cout << "订阅队列消息失败:" << message << std::endl;
                                    abort(); // 终止程序
                                })
                                .onSuccess([&](){
                                    std::cout << "订阅队列消息成功:" << std::endl;
                                });
                        })
                        .onError([&](const char *message) {
                            std::cout << "绑定交换机和队列失败:" << message << std::endl;
                            abort(); // 终止程序
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort(); // 终止程序
                });
        })
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort(); // 终止程序
        });

    // 8. 启动事件循环
    ev_run(ev_loop);
    return 0;
}

3.4.1.3 项目构建
all: simple_publish simple_subscribe

simple_publish: simple_publish.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

simple_subscribe: simple_subscribe.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

clean:
	rm -rf simple_publish simple_subscribe

3.4.1.4 运行演示

订阅端

dev@fc56006988fe:~/example/amqpcpp$ ./simple_subscribe
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:Hello World

发布端

dev@fc56006988fe:~/example/amqpcpp$ ./simple_publish
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:
3.4.2 死信队列
3.4.2.1 要素

上面我们已经完成了 AMQP-CPP 第三方库的基础使用实践与功能测试。下面我们将重点带大家学习死信队列相关知识。首先要明确的核心概念是 “死信”,死信队列的核心包含三个关键要素,第一个便是死信消息(又称死信)。死信消息指的是当一条消息满足特定条件时,就会成为无处安放的 “死信”,具体满足的条件有三类:

第一,消息被订阅客户端拒绝处理,代理服务器将消息推送给订阅端后,订阅端明确拒绝处理该消息,这条消息就会变为死信;

第二,消息过期,若创建队列时为队列中的消息设置了过期时间,而消息在规定时间内未被消费,就会成为死信(上面的简单示例中未设置过期时间,因此消息不会过期);

第三队列达到最大节点数量且已存满,此时再发布新消息,新消息因无处存放会变为死信。

为了处理这些 “无处安放” 的死信消息,避免其被随意丢弃(尤其是部分死信可能仍有重要价值),便引入了死信交换机和死信队列。死信交换机本质上是一个普通交换机,只是它与另一个常规交换机 / 队列建立了关联;死信队列则是绑定到死信交换机上的普通队列。当消息变为死信后,RabbitMQ 会将其重新发布到预先关联的死信交换机,再由死信交换机通过绑定的路由键,将死信消息路由到死信队列中存储,应用程序可通过消费死信队列中的消息来处理这些死信。值得一提的是,死信队列的核心应用场景之一是实现延时队列 —— 在 RabbitMQ 高版本虽提供了延时队列插件,但早期版本的延时功能正是基于死信队列的思想实现的,比如数据库与缓存数据同步等场景中,延时队列就有着广泛应用。

死信消息(Dead Letter Message): 当一个消息满足某些特定条件时(消息拒绝、消息过期、队列满了),它就会变成 “死信消息”。

死信交换机(Dead Letter Exchange - DLX): 这是一个普通的交换机。当一个消息变成死信后,RabbitMQ 会将其重新发布(publish)到这个特定的交换机。

死信队列(Dead Letter Queue - DLQ): 这是一个绑定到死信交换机上的普通队列。死信交换机将死信消息(通过绑定的 Routing Key)路由到这个队列进行存储。应用程序可以消费这个队列中的消息来处理这些 “死信”。


3.4.2.2 使用流程

死信队列的使用流程十分清晰,主要分为两步:

第一步,创建普通交换机 A 和普通队列 A 并进行绑定,这一组交换机和队列将作为死信交换机与死信队列使用;

第二步,创建普通交换机 B 和普通队列 B 并进行绑定,这一组作为常规的交换机与队列,关键是在创建队列 B 时,需要额外设置关联的死信交换机(即第一步创建的交换机 A)相关信息,完成关联配置。一旦常规队列 B 中的消息出现被拒绝、过期或队列满导致无法存放的情况,这些消息就会被转发到死信队列 A 中。


3.4.2.3 相关接口与类
/**
 * @brief 字段基类
 * 
 * 设计目的:
 * 1. 作为所有字段类型的基类,实现多态
 * 2. 可能提供通用的字段操作接口(如序列化、比较等)
 * 3. 用于构建类似 ORM 的字段映射系统
 * 
 * 典型应用场景:
 * - 数据库查询结果的行数据封装
 * - 动态配置项的键值对存储
 * - RabbitMQ 的消息头/参数封装
 */
class Field {
    // 可能包含的虚函数:
    // virtual std::string toString() const;
    // virtual json toJson() const;
    // virtual void fromJson(const json& j);
};

/**
 * @brief 表/键值对容器类(继承自 Field)
 * 
 * 设计意图:
 * - 模拟数据库表的行数据结构
 * - 提供类似 Python 字典或 JavaScript 对象的字段访问方式
 * - 支持链式调用,方便构建复杂的参数配置
 * 
 * 使用示例:
 * Table config;
 * config.set("host", "localhost")
 *       .set("port", 5672)
 *       .set("auto_reconnect", true);
 * 
 * std::string host = config["host"].as<std::string>();
 * int port = config["port"].as<int>();
 */
class Table : public Field {
public:
    /**
     * @brief 设置字段值(布尔类型重载)
     * @param name 字段名称
     * @param value 字段值
     * @return Table& 返回自身引用,支持链式调用
     * 
     * 设计模式:Builder 模式 + Fluent Interface
     * 优点:可以连续设置多个字段,代码更简洁
     * 
     * 使用示例:
     * table.set("enabled", true)
     *      .set("debug", false)
     *      .set("persistent", true);
     */
    Table &set(const std::string &name, bool value);
    
    // 其他基础类型的重载(未显示但应该存在):
    // Table &set(const std::string &name, int value);
    // Table &set(const std::string &name, double value);
    // Table &set(const std::string &name, const std::string &value);
    // Table &set(const std::string &name, const char* value);
    // Table &set(const std::string &name, const Table& value); // 嵌套表
    
    /**
     * @brief 下标访问运算符重载
     * @param name 字段名称
     * @return AssociativeFieldProxy 代理对象
     * 
     * 设计模式:Proxy 模式
     * 目的:
     * 1. 延迟字段类型转换(类似 std::any)
     * 2. 支持动态类型(运行时才知道字段是什么类型)
     * 3. 实现 operator[] 的读写统一接口
     * 
     * 使用示例:
     * // 写入
     * table["name"] = "张三";
     * table["age"] = 25;
     * table["active"] = true;
     * 
     * // 读取(需要调用 as<T>() 转换)
     * std::string name = table["name"].as<std::string>();
     * int age = table["age"].as<int>();
     * bool active = table["active"].as<bool>();
     * 
     * // 判断字段是否存在
     * if (table.has("name")) { ... }
     */
    AssociativeFieldProxy operator[](const std::string& name);
};

/**
 * @brief RabbitMQ 队列/交换机参数配置类
 * 
 * 注意:这里的 Table 类型就是 AMQP::Table
 * 它是 AMQP-CPP 库中用于表示 AMQP 表(字段表)的类型
 * 
 * AMQP 0-9-1 协议中的表(Table)是一种键值对数据结构:
 * - 键(Key):字符串
 * - 值(Value):可以是多种类型(布尔、整数、字符串、数组、嵌套表等)
 * 
 * 死信队列(Dead Letter Queue, DLQ)配置:
 * 当消息在队列中变成"死信"时,RabbitMQ 会自动将其转发到指定的死信交换机
 * 
 * 消息变成死信的条件:
 * 1. 消息被消费者拒绝(basic.reject 或 basic.nack)且 requeue=false
 * 2. 消息过期(TTL 到期)
 * 3. 队列长度达到上限,最早的消息被移除
 */
// 死信队列参数配置
AMQP::Table args;  // 创建一个空表,用于存储队列参数

/**
 * @brief 设置死信交换机
 * 
 * x-dead-letter-exchange(DLX):Dead Letter Exchange
 * - 作用:指定消息变成死信后转发到哪个交换机
 * - 类型:字符串
 * - 必需:是(如果不设置,死信消息会被直接丢弃)
 * 
 * 工作原理:
 * 当队列中的消息变成死信时,RabbitMQ 会:
 * 1. 自动将消息发布到指定的死信交换机
 * 2. 使用原消息的路由键(除非指定了 x-dead-letter-routing-key)
 * 3. 死信交换机根据路由键将消息路由到死信队列
 */
args["x-dead-letter-exchange"] = "xxx";
//                                 ↑
//                                 死信交换机名称(需要提前声明)

/**
 * @brief 设置死信路由键
 * 
 * x-dead-letter-routing-key:Dead Letter Routing Key
 * - 作用:覆盖死信消息的原始路由键
 * - 类型:字符串
 * - 必需:否(如果不设置,使用原消息的路由键)
 * 
 * 使用场景:
 * 1. 统一路由:将所有死信都路由到同一个队列
 * 2. 分类处理:根据死信原因使用不同的路由键
 * 3. 调试:给死信消息打上特殊标记
 * 
 * 示例:
 * - 设置特定路由键:所有死信都进入同一个死信队列
 * - 不设置:死信可能根据原始路由键进入不同的队列
 */
args["x-dead-letter-routing-key"] = "xxx";
//                                  ↑
//                                  死信路由键(用于绑定交换机和队列)

/**
 * @brief 设置消息过期时间(队列级别)
 * 
 * x-message-ttl:Message Time To Live
 * - 作用:队列中所有消息的存活时间(毫秒)
 * - 类型:整数(32位)
 * - 必需:否(默认不设置,消息永不过期)
 * - 单位:毫秒(1秒 = 1000毫秒)
 * 
 * 重要特性:
 * 1. 队列级别 TTL:所有进入该队列的消息都有相同的过期时间
 * 2. 消息级别 TTL:每条消息可以单独设置 expiration 属性
 * 3. 优先级:取较小的值(如果同时设置)
 * 
 * 使用场景:
 * 1. 实现延时队列:消息过期后进入死信队列,被消费者处理
 * 2. 自动清理:临时消息(如验证码)自动过期删除
 * 3. 限时任务:超过时间未处理的任务自动失效
 * 
 * 注意事项:
 * - 消息过期不会立即删除,而是在即将投递给消费者时检查
 * - 过期消息会进入死信队列(如果配置了 DLX)
 * - 设置过短的 TTL 可能影响性能(需要频繁检查过期)
 */
args["x-message-ttl"] = 10000; // 10秒 = 10000毫秒
//                      ↑
//                      10秒后消息过期(单位:毫秒)

// ========== 其他常见的队列参数 ==========
// args["x-max-length"] = 1000;        // 队列最大消息数量
// args["x-max-size"] = 10485760;      // 队列最大总大小(字节)
// args["x-max-priority"] = 10;        // 消息优先级(0-10)
// args["x-expires"] = 3600000;        // 队列空闲自动删除时间(毫秒)
// args["x-queue-mode"] = "lazy";      // 惰性队列(消息存磁盘)

在代码实现层面,创建队列时的关联配置需通过参数设定,涉及 AMQP-CPP 库中的 Table 类和 Field 类,可通过 Table 类的中括号语法设置队列参数,无需刻意记忆,使用时参考示例复制即可。核心参数包括:x-dead-letter-exchange(指定关联的死信交换机名称)、x-dead-letter-routing-key(指定死信交换机路由到死信队列的绑定键)、x-message-ttl(设置消息过期时间,以毫秒为单位)。


3.4.2.4 样例

接下来我们通过实际代码调整来演示死信队列的使用,实现延时消息功能。我们将创建两个代码文件:dlx_publish.cc(发布端)和 dlx_subscribe.cc(订阅端)。

发布端的实现流程为:

第一步,创建死信交换机 A、死信队列 A 并绑定;

第二步,创建常规交换机 B、常规队列 B 并绑定,创建队列 B 时传入上述死信相关参数(设置 5000 毫秒的消息过期时间);

第三步,向常规交换机 B 发布消息,消息过期后会自动转发到死信交换机 A,再路由到死信队列 A。订阅端的实现则更为简单,只需创建并绑定死信交换机 A 和死信队列 A,直接订阅死信队列 A 的消息即可,无需关注常规队列 B 的情况,这样收到的消息就是经过延时(过期转发)的消息。

在实际操作中,需注意几个关键问题:

  • 一是确保各类名称(交换机名、队列名、绑定键)完全对应,避免因名称不一致导致路由失败;

  • 二是创建常规队列时,务必将配置好的死信参数传入,否则无法实现死信转发;

  • 三是订阅操作需放在队列与交换机绑定成功的回调函数中,避免因资源未创建完成导致订阅失败。

通过本次实践,我们成功基于死信队列实现了延时队列功能,大家后面也需动手实践,熟练掌握相关配置与代码编写,为后续缓存相关场景的应用打下基础。

$ tree
.
├── makefile
├── dlx_publish.cc
└── dlx_subscribe.cc

dlx_publish.cc

/**
 * @file delayed_queue.cpp
 * @brief RabbitMQ 延时队列实现(基于死信交换机 DLX)
 * 
 * 实现原理:
 * 1. 创建带 TTL(过期时间)的队列,并关联死信交换机
 * 2. 消息发送到延时队列后,等待 TTL 时间过期
 * 3. 消息过期后自动转发到死信队列
 * 4. 消费者监听死信队列,实现延时消费
 * 
 * 应用场景:
 * - 订单超时自动取消(30分钟未支付)
 * - 定时任务延迟执行
 * - 重试机制(失败后延迟重试)
 * - 缓存延迟刷新
 */

#include <amqpcpp.h>      // AMQP 协议核心库
#include <ev.h>           // libev 事件循环库
#include <amqpcpp/libev.h> // AMQP-CPP 的 libev 适配器
#include <iostream>       // 标准输入输出

/**
 * @brief 声明交换机、队列并绑定(辅助函数)
 * 
 * 这是一个通用的组件声明函数,用于快速创建 RabbitMQ 的基础组件
 * 
 * @param channel      AMQP 通道
 * @param exchange     交换机名称
 * @param queue        队列名称
 * @param binding_key  绑定键(路由键)
 * 
 * @note 这是一个异步函数,通过回调通知结果
 */
void declaredComponent(AMQP::TcpChannel &channel,
    const std::string &exchange,
    const std::string &queue,
    const std::string &binding_key) {
    
    // 1. 声明交换机(direct 类型:完全匹配路由键)
    channel.declareExchange(exchange, AMQP::ExchangeType::direct)
        .onSuccess([&](){
            std::cout << "声明交换机成功:" << exchange << std::endl;
            
            // 2. 声明队列
            channel.declareQueue(queue)
                .onSuccess([&](const std::string &name,
                    uint32_t messagecount,
                    uint32_t consumercount){
                    std::cout << "声明队列成功:" << queue << std::endl;
                    std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
                    std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
                    
                    // 3. 绑定交换机与队列(建立路由关系)
                    channel.bindQueue(exchange, queue, binding_key)
                        .onSuccess([&](){
                            std::cout << "绑定交换机和队列成功:" << std::endl;
                        })
                        .onError([&](const char *message) {
                            std::cout << "绑定交换机和队列失败:" << message << std::endl;
                            abort(); // 终止程序
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort(); // 终止程序
                });
        })
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort(); // 终止程序
        });
}

int main()
{
    // ==================== 0. 配置参数 ====================
    // RabbitMQ 服务器地址(注意端口是 5672,不是 15672)
    const std::string url = "amqp://admin:123456@192.168.65.130:5672/";
    
    // 延时队列组件(用于接收消息,等待过期)
    const std::string delayed_exchange = "delayed-exchange";
    const std::string delayed_queue = "delayed-queue";
    const std::string delayed_binding_key = "delayed-binding-key";
    
    // 死信队列组件(用于接收过期后的消息)
    const std::string dlx_exchange = "dlx-exchange";
    const std::string dlx_queue = "dlx-queue";
    const std::string dlx_binding_key = "dlx-binding-key";

    // ==================== 1. 初始化网络层 ====================
    auto *ev_loop = EV_DEFAULT;                    // libev 事件循环
    AMQP::LibEvHandler handler(ev_loop);           // AMQP 事件处理器
    AMQP::TcpConnection connection(&handler, AMQP::Address(url)); // TCP 连接
    AMQP::TcpChannel channel(&connection);         // AMQP 通道

    // ==================== 2. 创建死信组件 ====================
    /**
     * 死信交换机(DLX)和死信队列的作用:
     * - 当延时队列中的消息过期后,RabbitMQ 会自动将其转发到这里
     * - 消费者应该监听这个死信队列,而不是延时队列
     * 
     * 架构图:
     * ┌─────────────┐     TTL=5s    ┌─────────────┐     过期后    ┌─────────────┐
     * │  生产者     │ ──────────→  │  延时队列   │ ──────────→  │  死信队列   │
     * │             │   发送消息    │ (delayed)  │   自动转发    │   (DLX)     │
     * └─────────────┘              └─────────────┘               └─────────────┘
     *                                                                   │
     *                                                                   ↓
     *                                                              ┌─────────────┐
     *                                                              │  消费者     │
     *                                                              │ (延时处理)  │
     *                                                              └─────────────┘
     */
    declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);
    
    // ==================== 3. 创建延时队列(带死信配置) ====================
    /**
     * 延时队列的核心配置:
     * - x-dead-letter-exchange: 指定死信转发目标交换机
     * - x-dead-letter-routing-key: 指定死信消息的路由键
     * - x-message-ttl: 消息存活时间(毫秒),过期后自动进入死信
     * 
     * 注意:这个队列的消费者不是直接消费它!
     * 而是消费它绑定的死信队列
     */
    channel.declareExchange(delayed_exchange, AMQP::ExchangeType::direct)
        .onSuccess([&](){
            std::cout << "声明交换机成功:" << delayed_exchange << std::endl;
            
            // 配置队列参数(关键!)
            AMQP::Table args;
            /**
             * @brief x-dead-letter-exchange 参数
             * 作用:消息过期后,转发到哪个交换机
             */
            args["x-dead-letter-exchange"] = dlx_exchange;
            
            /**
             * @brief x-dead-letter-routing-key 参数
             * 作用:转发时使用什么路由键(覆盖原消息的路由键)
             * 
             * 如果不设置,会使用原消息的路由键
             * 这里设置为死信队列的绑定键,确保消息能正确路由到死信队列
             */
            args["x-dead-letter-routing-key"] = dlx_binding_key;
            
            /**
             * @brief x-message-ttl 参数
             * 作用:消息在队列中的存活时间(毫秒)
             * 
             * 这里设置为 5000ms = 5 秒
             * 消息进入此队列 5 秒后会自动过期,进入死信队列
             * 
             * 常见 TTL 配置:
             * - 订单超时:30分钟 = 1800000ms
             * - 短信验证码:60秒 = 60000ms
             * - 重试延迟:1秒 = 1000ms
             */
            args["x-message-ttl"] = 5000; // 5秒过期
            
            // 声明带参数的队列
            channel.declareQueue(delayed_queue, args)
                .onSuccess([&](const std::string &name,
                    uint32_t messagecount,
                    uint32_t consumercount){
                    std::cout << "声明队列成功:" << delayed_queue << std::endl;
                    std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
                    std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
                    
                    // 绑定延时交换机和延时队列
                    channel.bindQueue(delayed_exchange, delayed_queue, delayed_binding_key)
                        .onSuccess([&](){
                            std::cout << "绑定交换机和队列成功:" << std::endl;
                            
                            // ==================== 4. 发送测试消息 ====================
                            /**
                             * 发送消息到延时队列
                             * 
                             * 流程:
                             * 1. 消息进入 delayed_queue
                             * 2. 等待 5 秒(TTL)
                             * 3. 消息过期
                             * 4. 自动转发到 dlx_exchange
                             * 5. 根据 dlx_binding_key 路由到 dlx_queue
                             * 6. 消费者从 dlx_queue 消费消息
                             * 
                             * 最终效果:消息从发送到被消费,延迟了 5 秒
                             */
                            bool ret = channel.publish(delayed_exchange, delayed_binding_key, "hello world");
                            if(ret == false) {
                                std::cout << "发布消息到延时队列失败" << std::endl;
                            } else {
                                std::cout << "消息已发送到延时队列,将在 5 秒后进入死信队列" << std::endl;
                            }
                        })
                        .onError([&](const char *message) {
                            std::cout << "绑定交换机和队列失败:" << message << std::endl;
                            abort();
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort();
                });
        })
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort();
        });

    // ==================== 5. 启动事件循环 ====================
    /**
     * 事件循环会一直运行,处理:
     * - 网络 I/O 事件
     * - 定时器(心跳)
     * - 异步回调函数
     * 
     * 由于所有操作都是异步的,程序不会立即退出
     */
    ev_run(ev_loop);
    
    return 0;
}

dlx_subscribe.cc

#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>

void declaredComponent(AMQP::TcpChannel &channel,
    const std::string &exchange,
    const std::string &queue,
    const std::string &binding_key) {
    channel.declareExchange(exchange, AMQP::ExchangeType::direct)
        .onSuccess([&](){
            std::cout << "声明交换机成功:" << exchange << std::endl;
            // 5. 声明队列
            channel.declareQueue(queue)
                .onSuccess([&](const std::string &name,
                    uint32_t messagecount,
                    uint32_t consumercount){
                    std::cout << "声明队列成功:" << queue << std::endl;
                    std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
                    std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
                    // 6. 绑定交换机和队列(采用直接交换)
                    channel.bindQueue(exchange, queue, binding_key)
                        .onSuccess([&](){
                            std::cout << "绑定交换机和队列成功:" << std::endl;
                            // 7. 订阅队列消息
                            channel.consume(queue)
                                .onMessage([&](const AMQP::Message &message,  // 收到的消息
                                    uint64_t deliveryTag,                   // 消息的唯一标识
                                    bool redelivered){
                                    std::string body(message.body(), message.bodySize());
                                    std::cout << "收到消息:" << body << std::endl;
                                    // 收到消息进行处理后,不要忘了对消息进行确认
                                    channel.ack(deliveryTag);
                                })
                                .onError([&](const char *message) {
                                    std::cout << "订阅队列消息失败:" << message << std::endl;
                                    abort(); // 终止程序
                                })
                                .onSuccess([&](){
                                    std::cout << "订阅队列消息成功:" << std::endl;
                                });
                        })
                        .onError([&](const char *message) {
                            std::cout << "绑定交换机和队列失败:" << message << std::endl;
                            abort(); // 终止程序
                        });
                })
                .onError([&](const char *message) {
                    std::cout << "声明队列失败:" << message << std::endl;
                    abort(); // 终止程序
                });
        })
        .onError([&](const char *message) {
            std::cout << "声明交换机失败:" << message << std::endl;
            abort(); // 终止程序
        });
}

int main()
{
    const std::string url = "amqp://admin:123456@192.168.65.130:5672/";
    const std::string dlx_exchange = "dlx-exchange";
    const std::string dlx_queue = "dlx-queue";
    const std::string dlx_binding_key = "dlx-binding-key";

    auto *ev_loop = EV_DEFAULT;
    AMQP::LibEvHandler handler(ev_loop);
    AMQP::TcpConnection connection(&handler, AMQP::Address(url));
    AMQP::TcpChannel channel(&connection);

    // 订阅者只需要和死信队列进行绑定,不用关心死信消息的来源队列
    declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);

    ev_run(ev_loop);
    return 0;
}
all: dlx_publish dlx_subscribe

dlx_publish:dlx_publish.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

dlx_subscribe:dlx_subscribe.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread

clean:
	rm -rf dlx_publish dlx_subscribe

订阅端

dev@fc56006988fe:~/example/amqpcpp$ ./dlx_subscribe
声明交换机成功:dlx-exchange
声明队列成功:dlx-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:hello world    # 该消息是延迟5秒后才收到的

发布端

dev@fc56006988fe:~/example/amqpcpp$ ./dlx_publish
声明交换机成功:dlx-exchange
声明交换机成功:delayed-exchange
声明队列成功:delayed-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:发布消息到延时队列

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐