AMQP-SDK 使用

1. RabbitMQ 简单说明
在开发C++ 高并发后端、分布式服务、实时数据推送、异步任务系统时,你是否也遇到过这些难题:C++ 服务间同步 RPC 调用耦合严重,一个模块崩溃导致整个链路不可用;高并发场景下(如物联网设备上报、高频交易、游戏服务)请求突增,C++ 服务直接被打满阻塞;需要跨语言、跨节点通信,手写 Socket、自研队列无持久化、无重试、丢数据风险极高;C++ 项目需要对接微服务集群,没有成熟可靠的消息队列客户端,二次开发成本巨大;延时任务、消息重试、死信处理等通用能力,用 C++ 从零实现复杂且稳定性差;海量消息需要有序、可靠投递,传统 C++ 并发队列无法支撑分布式场景。
尤其是在C++ 高性能服务、物联网、游戏后端、金融交易场景:C++ 核心服务需要异步处理日志、上报指标,同步 IO 严重拖慢性能;设备终端海量数据上行,需要队列削峰保护服务;分布式 C++ 节点需要解耦通信,强依赖会导致部署维护困难;金融、交易类 C++ 服务要求消息不丢失、不重复、必到达,自研方案无法保证;延时消息、定时任务(如订单超时、设备心跳)用 C++ 定时器实现难以支撑大规模场景。轻则服务卡顿、吞吐量下降,重则数据丢失、服务崩溃、线上故障。
而 RabbitMQ 正是为解决这些痛点而生 —— 这款支持原生 C++ 客户端的开源分布式消息队列中间件,专为 C++ 高性能场景设计,提供稳定、轻量、高性能的 AMQP 客户端库,让你用 C++ 极简代码实现服务解耦、异步通信、流量削峰、可靠消息投递。它完美适配 C++ 高并发、低延迟的需求,支持持久化、死信队列、高可用集群,无需手写底层网络通信与消息队列逻辑,是 C++ 分布式服务、高并发系统、异步任务 的首选消息中间件。
如果你以 C++ 开发 为主,RabbitMQ 提供成熟稳定的 AMQP-CPP 客户端(高性能、纯 C++、跨平台)
2. AMQP 简介
2.1 简介
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件不同产品、不同的开发语言等条件的限制。Erlang 中的实现有 RabbitMQ 等。
它规定了消息系统中三大组件 —— 消息服务器 / 代理节点 (server/broker)、生产者 / 发布者 (producer/publisher)、消费者 / 订阅者 (consumer/subscriber) 之间的通信规范,以及代理节点的设计规范等。
2.2 MQ 核心要素
高级消息队列协议(AMQP)—— 它是应用层的标准协议,不仅约定了消息系统中三大核心组件(消息服务器、生产者、消费者)的通信规范,还打破了客户端与中间件的产品限制、开发语言限制,让不同节点间的通信更灵活。除此之外,AMQP 还定义了消息队列使用中的核心模型要素,这是我们理解消息队列工作机制的关键。
AMQP 的核心模型要素中,首先是 Broker(消息代理服务器),这是整个消息通信的核心中转节点。实际通信场景中,生产者(消息发送方)不会直接与消费者(消息接收方)通信,而是先将消息发送到 Broker,消费者再从 Broker 获取消息。
这种模式的优势很明显:一是降低了生产者与消费者的耦合度,二是提升了容灾性,避免因网络中断导致消息丢失。我们使用 RabbitMQ 作为 Broker,它对 AMQP 协议支持完善,且已预装在开发环境中(版本为 4.0)。Broker 内部还包含虚拟主机(Virtual Host),用于实现环境隔离,让多个客户端或业务场景在同一 Broker 中互不干扰,各自独立运行。

在虚拟主机内部,核心组件是交换机(Exchange)和队列(Queue)。交换机作为消息分发中心,接收生产者发送的消息,并根据自身类型和路由规则将消息转发到对应的队列中。交换机主要有四种类型:
我们先把核心角色对应一下:
生产者:寄快递的人
路由键 (Routing Key):快递单上写的“地址关键词”(比如:省份、城市)
交换机 (Exchange):快递分拣中心
队列 (Queue):具体的快递存放点(最终要送到的地方)
绑定键 (Binding Key):快递存放点告诉分拣中心:“如果地址关键词是XXX,就把快递放我这里”
下面来看四种交换机分别怎么工作。
1. 直连交换机(Direct Exchange)根据消息的路由键(Routing Key)与队列的绑定键(Binding Key)完全匹配来转发消息;
一句话:路由键 必须完全等于 绑定键,精准送达。
生活例子:高铁/航班的“直达车”
-
你想从北京去上海,买票时选“北京 → 上海”。
-
分拣中心(交换机)看到路由键是“上海”,就直接把你送到绑定了“上海”的队列(上海出口)。
-
如果你的票是“北京 → 广州”,就绝对不会错送到上海。
队列A 绑定键 = "error"
队列B 绑定键 = "info"
消息 routing_key = "error" → 只去队列A
适用场景:明确的、一对一的精准路由(如日志级别:error / info / debug)。
2. 广播交换机(Fanout Exchange)忽略路由键,将消息广播到所有绑定的队列;
一句话:忽略任何地址,每个绑定的队列都收到一份。
生活例子:小区广播通知
-
物业在喇叭里喊:“明天停水”。
-
不管你家在几栋几单元(相当于忽略路由键),小区里所有住户(所有绑定的队列)都听得到。
-
你不能说“我只想让 1 号楼听到”,广播模式下不行。
队列A、B、C 都绑定到同一个 fanout 交换机
无论 routing_key 写什么(甚至空),A/B/C 都会收到相同消息
适用场景:广播、所有消费者必须同时收到通知(如配置更新、全局事件)。
3. 主题交换机(Topic Exchange)通过路由键与绑定键的通配符匹配,实现更灵活的多队列转发;
一句话:匹配时支持通配符 *(一个单词)和 #(0 或多个单词)。
生活例子:菜鸟驿站的“模糊取件码”
-
假设绑定键是:
*.杭州.* -
路由键
A.杭州.B✅ 匹配 -
路由键
杭州.A❌ 不匹配(因为*要求必须有单词) -
路由键
X.杭州.Y.Z且绑定键用#.杭州→ 匹配(#吃掉了前面的 X)
常用通配符:
-
*:刚好一个单词 -
#:0 个或多个单词 -
单词之间用
.分隔
队列A 绑定键 = "china.*.weather"
china.beijing.weather ✅
china.shanghai.weather ✅
china.beijing.temp ❌(最后不是weather)
适用场景:多级分类、多种匹配规则(如系统监控:*.critical.*、*.error)。
4. 头部交换机(Headers Exchange)则根据消息的自定义头部参数进行路由,适用于复杂场景但复杂度较高。
一句话:不看路由键,只看消息头(Header)里的 key-value 是否匹配。
生活例子:VIP 会员 + 学生身份双重验证
-
消息头里有:
{"会员等级": "钻石", "身份": "学生"} -
队列绑定条件:
x-match = all,且需要会员等级=钻石且身份=学生 -
必须两个都满足,才会进入这个队列。
x-match 参数:
-
all:所有条件都必须满足 -
any:任意一个条件满足即可
队列绑定 headers = { "format": "pdf", "type": "report", "x-match": "all" }
消息1 headers { "format": "pdf", "type": "report" } → ✅ 匹配
消息2 headers { "format": "pdf" } → ❌ 不匹配
适用场景:复杂路由、多维度匹配(如:游戏服务器按 客户端版本+渠道+地区 筛选消息),但因为性能和维护成本较高,实际用得相对少。
队列则是消息的存储容器,直到消费者接收并处理消息后,消息才会从队列中移除,且交换机与队列之间支持多对多绑定关系,进一步提升了消息转发的灵活性。
消息队列的核心作用包括解耦、异步处理、削峰填谷、负载均衡等:解耦让生产者与消费者无需直接交互,也无需同时在线;异步处理允许生产者发送消息后无需等待消费者响应,提升系统吞吐量;削峰填谷能应对短时间高并发消息,避免后端服务器过载;负载均衡则可将消息自动分配给多个消费者,提升处理效率。这些特性让消息队列在分布式系统中不可或缺。
生产者(Producer):
- 生产者是消息的发送方。它们创建消息并将其发布到 RabbitMQ 的交换机上。生产者通常将消息发送到一个或多个队列,以便消费者可以订阅并处理这些消息。
消费者(Consumer):
- 消费者是消息的接收方,它们订阅队列并从中获取消息。一旦消费者接收到消息,它们可以对消息进行处理,例如执行某些任务或将数据存储到数据库中。
2.3 工作流程

消息的传输流程:
- 生产者连接
Broker进行发布消息,并指定交换机和路由键。 - 交换机根据路由键将消息路由到一个或多个队列。
- 消费者从队列中获取消息并处理它们。
3. AMQP-CPP 简介
3.1 简介
要实现 C++ 程序与 RabbitMQ 的通信,我们选择 AMQP-CPP 这个开源客户端库 —— 它完全实现了 AMQP 协议,支持异步非阻塞通信,具有分层架构(底层网络层 + 上层 AMQP 应用层)、跨平台兼容性,且支持 C++17 特性。
AMQP-CPP 的使用有两种模式:
一是使用默认 TCP 模块【自己实现】,二是结合扩展网络库(我们选择 libev,因其集成性更好)。
核心类包括:
Connection(通信连接类,用于与 Broker 建立连接,可配置用户名、密码、主机地址、端口和虚拟主机等信息)
Channel(信道,基于单个 Connection 可创建多个信道,避免连接资源浪费,所有 RabbitMQ 操作如声明交换机、声明队列、绑定队列、发布消息、订阅消息等均通过信道接口实现)。需要注意的是,信道的接口均为异步非阻塞类型,调用后立即返回,需通过回调函数(如成功回调、错误回调、消息回调等)处理执行结果。
此外,还有 Message 类(用于封装消息内容、路由键等信息)、Deferred 系列类(用于设置各类操作的回调函数)等核心组件,后续实践将详细讲解其用法。
最后补充 libev 的关键使用要点:libev 用于处理底层网络事件,需先创建事件循环句柄(通过 EV_DEFAULT 宏),启动事件循环(ev_run)后开始监控网络通信事件,退出时需调用 ev_break。由于 ev_run 是阻塞接口,通常会在独立线程中运行;若需跨线程退出事件循环,需借助 EV_ASYNC 异步操作,通过发送回调任务通知事件循环线程执行退出逻辑,避免直接调用 ev_break 导致异常。
项目地址:
AMQP-CPP 的项目地址为:https://github.com/CopernicaMarketingSoftware/AMQP-CPP
3.2 安装
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make && sudo make install
3.3 接口
AMQP-CPP 的使用有两种模式:
- 使用默认的 TCP 模块进行网络通信
- 使用扩展的
libevent、libev、libuv、asio异步网络通信组件进行通信
3.3.1 默认 TCP 模式
- 实现一个类继承自
AMQP::TcpHandler类,它负责网络层的 TCP 连接 - 重写相关函数,其中必须重写
monitor函数 - 在
monitor函数中需要实现的是将 fd 放入eventloop(select、epoll) 中监控,当 fd 可写可读就绪之后,调用 AMQP-CPP 的connection->process(fd, flags)方法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{
/**
* @brief AMQP库在创建新连接时调用的方法,与处理程序相关联
* @param connection 附加到处理程序的连接
*/
virtual void onAttached(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现,例如初始化事物以处理连接
}
/**
* @brief 当TCP连接时由AMQP库调用的方法
* @param connection 现在可以使用的连接
*/
virtual void onConnected(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现(可能不需要)
}
/**
* @brief 在建立安全TLS连接时调用的方法
* @param connection 已被保护的连接
* @param ssl 来自openssl库的ssl结构
* @return bool 如果可以使用连接,则为True
*/
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
{
// @todo 添加您自己的实现,例如读取证书并检查它是否确实是您的
return true;
}
/**
* @brief 当登录尝试成功时由AMQP库调用的方法
* @param connection 现在可以使用的连接
*/
virtual void onReady(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现,例如通过创建一个通道实例,然后开始发布或使用
}
/**
* @brief 该方法在服务器尝试协商检测信号间隔时调用
* @param connection 发生错误的连接
* @param interval 建议的间隔(秒)
* @return uint16_t 返回我们要使用的间隔
*/
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval)
{
// 我们接受服务器的建议,但如果间隔小于一分钟,我们将使用一分钟的间隔
if (interval < 60) interval = 60;
// @todo 在事件循环中设置一个计时器,确保每隔interval秒调用connection->heartbeat()
return interval;
}
/**
* @brief 发生致命错误时由AMQP库调用的方法
* @param connection 发生错误的连接
* @param message 一条人类可读的错误消息
*/
virtual void onError(AMQP::TcpConnection *connection, const char *message) override
{
// @todo 添加您自己的实现,例如通过向程序的用户报告错误并记录错误
}
/**
* @brief 该方法在AMQP协议结束时调用的方法
* @param connection AMQP协议结束的连接
*/
virtual void onClosed(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现,例如在AMQP连接结束后立即执行某些操作
}
/**
* @brief 当TCP连接关闭或丢失时调用的方法
* @param connection 已关闭但现在无法使用的连接
*/
virtual void onLost(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现(可能没有必要)
}
/**
* @brief 调用的最终方法,表示将不再对处理程序进行有关连接的进一步调用
* @param connection 可以被破坏的连接
*/
virtual void onDetached(AMQP::TcpConnection *connection) override
{
// @todo 添加您自己的实现,如清理资源或退出应用程序
}
/**
* @brief 当AMQP-CPP库想要与主事件循环交互时调用的方法
* @param connection 想要与事件循环交互的连接
* @param fd 应该被检查的文件描述符
* @param flags 应该检查文件描述符的可读性或可写性的标志
*/
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override
{
// @todo 添加您自己的实现,例如将文件描述符添加到主应用程序和事件循环
// 当事件循环报告文件描述符变为可读或可写时,调用connection->process(fd, flags)
}
};
3.3.2 扩展模式
以libev为例,我们不必自己实现monitor函数,可以直接使用 AMQP::LibEvHandler。
3.3.2.1 头文件包含
#include <amqpcpp.h>
3.3.2.2 Connection
用于设定rabbitmq服务器地址后,创建一个通信连接对象。
/**
* @brief LibEv事件循环处理器
*
* 继承自TcpHandler,基于libev库实现异步I/O事件处理
* libev是一个高性能事件循环库,类似于epoll
*
* 职责:
* - 管理TCP套接字的可读/可写事件
* - 处理网络I/O的非阻塞读写
* - 事件循环的调度和执行
*/
class LibEvHandler : public TcpHandler;
/**
* @brief 连接处理器(抽象基类/接口)
*
* 定义连接事件回调接口,由使用者实现
*
* 典型回调事件:
* - onConnected() : 连接建立成功
* - onConnectionClosed(): 连接关闭
* - onError() : 发生错误
* - onDataReceived() : 收到数据(解析AMQP帧)
*/
class ConnectionHandler;
/**
* @brief AMQP服务器地址
*
* 解析和存储RabbitMQ服务器的连接地址
*
* URL格式:amqp://用户名:密码@主机名:端口号/虚拟主机
*
* 示例:
* - amqp://guest:guest@localhost:5672/
* - amqp://admin:123456@192.168.1.100:5672/production
*
* 组成部分:
* - scheme: amqp (协议类型)
* - user: 用户名(如 guest)
* - pass: 密码(如 guest)
* - host: 服务器地址(如 localhost、192.168.1.100)
* - port: 端口号(默认5672)
* - vhost: 虚拟主机(可选,默认"/")
*/
class Address {
public:
/**
* @brief 从URL字符串解析地址
* @param address AMQP连接字符串
*
* 解析逻辑:
* 1. 提取协议(amqp://)
* 2. 提取用户名和密码(user:pass@)
* 3. 提取主机和端口(host:port/)
* 4. 提取虚拟主机(vhost)
*/
Address(const std::string &address);
// 私有成员(推测):
// std::string _host; // 服务器地址
// int _port; // 端口号(5672)
// std::string _user; // 用户名
// std::string _password; // 密码
// std::string _vhost; // 虚拟主机
};
/**
* @brief TCP连接(底层网络连接)
*
* 负责建立和管理与RabbitMQ服务器的TCP连接
*
* 继承关系:private ConnectionHandler
* - private继承表示“实现继承”,TcpConnection内部使用ConnectionHandler的回调
* - 对外隐藏了ConnectionHandler的接口
*
* 职责:
* - 建立TCP套接字连接
* - 发送/接收原始数据(AMQP帧)
* - 处理TCP层面的连接事件
* - 将网络事件转发给TcpHandler
*/
class TcpConnection : private ConnectionHandler {
public:
/**
* @brief 建立TCP连接
* @param handler 上层协议处理器(处理AMQP协议)
* @param address 服务器地址
*
* 执行流程:
* 1. 创建socket
* 2. 连接服务器(非阻塞)
* 3. 注册到事件循环(libev)
* 4. 等待连接建立或失败回调
*/
TcpConnection(TcpHandler *handler, const Address &address);
// 私有成员(推测):
// int _sockfd; // 套接字文件描述符
// TcpHandler* _handler; // 上层协议处理器
// Address _address; // 服务器地址
// bool _connected; // 连接状态
// std::vector<char> _send_buffer; // 发送缓冲区
// std::vector<char> _recv_buffer; // 接收缓冲区
};
/**
* @brief 登录认证信息
*
* 封装RabbitMQ连接认证所需的用户名和密码
*
* RabbitMQ认证机制:
* - 默认使用PLAIN机制(明文传输,但在TLS加密下安全)
* - 也支持其他SASL机制(如AMQPLAIN、EXTERNAL等)
*/
class Login {
public:
/**
* @brief 构造登录信息
* @param user 用户名
* @param password 密码
*
* 默认用户名密码(开发环境):
* - 用户名: guest
* - 密码: guest
*
* 生产环境建议:
* - 创建独立用户,设置强密码
* - 配置权限(读、写、配置)
* - 不要使用guest(默认只允许localhost连接)
*/
Login(std::string user, std::string password);
// 私有成员(推测):
// std::string _username;
// std::string _password;
};
/**
* @brief AMQP连接(应用层连接)
*
* 在TcpConnection之上,实现AMQP 0-9-1协议的连接层
*
* 连接建立流程(AMQP握手):
* 1. TCP连接建立
* 2. 客户端发送Protocol Header("AMQP 0-9-1")
* 3. 服务器发送Connection.Start(包含认证机制)
* 4. 客户端发送Connection.StartOk(携带Login信息)
* 5. 服务器发送Connection.Tune(协商参数:心跳、帧最大大小等)
* 6. 客户端发送Connection.TuneOk
* 7. 客户端发送Connection.Open(指定vhost)
* 8. 服务器发送Connection.OpenOk → 连接就绪
*
* 连接参数:
* - 心跳间隔:保持连接活跃,检测死连接
* - 帧最大大小:每个AMQP帧的最大字节数(默认131072)
* - 通道最大数:允许创建的最大Channel数(默认2047)
*/
class Connection {
public:
/**
* @brief 建立AMQP连接
* @param handler 连接事件处理器(监听连接状态变化)
* @param login 登录认证信息
* @param vhost 虚拟主机(类似命名空间,隔离资源)
*
* vhost的作用:
* - 逻辑隔离不同的业务或租户
* - 独立的交换机、队列、绑定关系
* - 独立的权限配置
*
* 示例:
* Connection conn(&myHandler, Login("admin", "123"), "/production");
*/
Connection(
ConnectionHandler *handler,
const Login &login,
const std::string &vhost
);
/**
* @brief 关闭连接
* @return true: 关闭成功 false: 关闭失败
*
* 优雅关闭流程:
* 1. 发送Connection.Close(AMQP协议)
* 2. 等待服务器回复Connection.CloseOk
* 3. 关闭TCP连接
* 4. 清理资源(释放Channel、队列等)
*
* 注意:
* - 关闭前应该先关闭所有Channel
* - 未确认的消息可能会丢失
*/
bool close();
// 其他推测的方法(未在代码中):
// Channel* createChannel(); // 创建通道(执行具体操作)
// bool isOpen(); // 检查连接是否正常
// void setHeartbeatInterval(); // 设置心跳间隔
private:
// 私有成员(推测):
// TcpConnection _tcp_conn; // 底层TCP连接
// ConnectionHandler* _handler; // 连接事件处理器
// std::string _vhost; // 虚拟主机
// Login _login; // 认证信息
// int _channel_max; // 最大通道数
// int _frame_max; // 最大帧大小
// int _heartbeat; // 心跳间隔(秒)
// std::map<int, Channel*> _channels; // 通道映射表
};
3.3.2.3 Channel
channel是一个虚拟连接,一个连接上可以建立多个通道。并且所有的RabbitMq指令都是通过channel传输,所以连接建立后的第一步,就是建立channel。因为所有操作是异步的,所以在channel上执行指令的返回值并不能作为操作执行结果,实际上它返回的是Deferred类,可以使用它安装处理函数。
namespace AMQP {
/**
* 通用回调函数类型
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
// 队列的声明的成功回调函数
using QueueCallback = std::function<void(
const std::string &name,
uint32_t messagecount,
uint32_t consumecount)>;
// 队列删除回调函数
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
// 消息处理回调函数
using MessageCallback = std::function<void(
const Message &message, // 收到的消息
uint64_t deliveryTag, // 消息的唯一标识
bool redelivered)>; // 指示消息是否是被重新投递的
// 当启用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback
using AckCallback = std::function<void(
uint64_t deliveryTag, // 消息的唯一标识符
bool multiple)>; // 指示确认范围:false确认单个消息,true确认所有消费的该消息
// 消息被RabbitMQ成功接收和处理后调用的回调函数
using PublishAckCallback = std::function<void()>;
// 消息被RabbitMQ明确拒绝(nacked)时调用的回调函数
using PublishNackCallback = std::function<void()>;
// 消息丢失时调用的回调函数
using PublishLostCallback = std::function<void()>;
}
这些回调函数使得 AMQP-CPP 库能够在消息发布后提供反馈,帮助应用程序实现更可靠的消息传递机制。通过实现这些回调,开发者可以对消息的发布结果进行适当的处理,确保消息传递的可靠性或进行错误恢复。
extern const int durable;
extern const int autodelete;
extern const int active;
extern const int passive;
extern const int ifunused;
extern const int ifempty;
extern const int global;
extern const int nolocal;
extern const int noack;
extern const int exclusive;
extern const int nowait;
extern const int mandatory;
extern const int immediate;
extern const int redelivered;
extern const int multiple;
extern const int requeue;
extern const int readable;
extern const int writable;
extern const int internal;
namespace AMQP {
enum ExchangeType
{
fanout, // 广播交换,绑定的队列都能拿到消息
direct, // 直接交换,只将消息交给routingkey一致的队列
topic, // 主题交换,将消息交给符合bindingkey规则的队列
headers,
consistent_hash,
message_deduplication
};
}
class Channel {
Channel(Connection *connection);
Deferred &close(); // 关闭当前channel
Deferred &confirm(); // confirmSelect()
Deferred &startTransaction();
Deferred &rollbackTransaction();
/**
* 声明交换机
* 以下flags可用于交换机:
* - durable 持久化,重启后交换机依然有效
* - autodelete 删除所有连接的使用者都离开时,自动删除交换
* - passive 仅被动检查交换机是否存在
* - internal 创建内部交换
*
* 此函数返回一个延迟处理程序。可以安装回调onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(
const std::string_view &name, // 交换机名称
ExchangeType type, // 交换机类型
int flags // 标志位
);
/**
* int flags
* flags可以是以下值的组合:
* - durable 持久队列,服务器重启后仍然有效
* - autodelete 当所有连接的使用者都离开时,自动删除队列
* - exclusive 仅被动检查队列是否存在,并且在连接断开时自动删除
*
* 可安装回调onSuccess(), onError() and onFinalize()方法。
*/
Deferred &declareQueue(
const std::string &name, // 队列名称
int flags // 队列标志位
);
/**
* 将队列绑定到交换机
* 可以安装回调onSuccess()、onError()和onFinalize()方法。
*/
Deferred &bindQueue(
const std::string_view &exchange, // 交换机名称
const std::string_view &queue, // 队列名称
const std::string_view &bindingkey // 绑定Key
);
Deferred &unbindQueue(
const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &bindingkey
);
/**
* 删除队列
* flag
* - ifunused 仅在队列没有连接者时删除
* - ifempty 仅在队列为空时删除
*/
DeferredDelete &removeQueue(
const std::string_view &name,
int flags = 0
);
/**
* 发布消息
* flags参数指定如果消息无法路由由到队列时应该发生的情况。
* 默认情况下,不可更改的消息将被静默地丢弃。
* 发布之前,请确保您已经调用了recall()方法,并设置了所有适当的处理程序来处理这些返回消息。
* flags:
* - mandatory 如果设置,服务器将返回未发送到队列的消息
* - immediate 如果设置,服务器将返回无法立即转发给使用者的消息
*/
bool publish(
std::string_view &exchange, // 交换机名称
const std::string_view &routingKey, // 路由Key
const std::string &message, // 消息内容
int flags = 0 // 标志位
);
/**
* 当您将publish()方法与"immediate"或"mandatory"标志结合使用时,
* rabbitmq会发回不可路由的消息。
* DeferredRecall &recall();
*/
DeferredRecall &recall();
/**
* 订阅队列消息
* 消费者标识用于通过channel::cancel()调用取消订阅
* 支持以下flags:
* - nolocal 如果设置了,则不会向消费者在此通道上发布的消息
* - noack 如果设置了,则不必对已消费的消息进行确认
* - exclusive 独占访问,不允许其他消费者
* 可以安装回调onSuccess()、onError()和onFinalize()方法。
*/
DeferredConsumer &consume(
const std::string_view &queue, // 队列名称
const std::string_view &tag, // 消费者标识
int flags = 0
);
/**
* 取消订阅
* flags:
* - noack 消费消息不需要确认,也就是自动确认
*/
DeferredCancel &cancel(const std::string_view &tag);
/**
* 从MQ检索单条消息
*/
DeferredGet &get(const std::string_view &queue, int flags = 0);
/**
* 消费者确认接收到的消息
* 当在RabbitMQ中从队列中接收(使用noack中接收消费消息时,必须确认该消息,以便从队列中删除)
* flag: multiple 确认多条消息:之前传递的所有未确认的消息也会被确认
*/
bool ack(uint64_t deliveryTag, int flags=0);
/**
* 消费者拒绝消息
* flag: multiple 如果设置,则之前传递的所有未确认的消息也都是未确认的
* requeue: 如果已设置,则消息将放回队列中,否则将被删除
*/
bool reject(uint64_t deliveryTag, int flags=0);
/**
* 回复所有没有被确认的消息
* 如果requeue没有被设置为true,服务器将重新对消息进行排队,因此也可能最终到达不同的消费者
*/
Deferred &recover(int flags = 0);
};
3.3.2.4 Reliable
用于可靠的消息发布及错误处理。
template <typename BASE=Tagger>
class Reliable : public BASE
{
template <typename ...Args>
Reliable(Args &&...args)
DeferredPublish &publish(
const std::string_view &exchange,
const std::string_view &routingKey,
const std::string_view &message,
int flags = 0)
}
// 样例:
AMQP::TcpChannel mychannel(connection);
AMQP::Reliable reliable(mychannel);
reliable.publish("my-exchange", "my-key", "my first message")
.onAck()
.onNack()
.onLost()
.onError();
3.3.2.5 Message
用于订阅者收到消息后的处理。
class Envelope : public MetaData{
const char *body()
uint64_t bodySize()
}
class Message : public Envelope{
const std::string &exchange()
const std::string &routingkey()
}
3.3.2.6 Deferred
Deferred 类(包括其派生类,如 DeferredQueue、DeferredConsumer 等)是处理异步操作的核心机制。它用于表示一个尚未完成的操作,用于设置异步调用的回调函数,进行接口的连续调用。
class Deferred
{
Deferred &onSuccess(const SuccessCallback& callback)
Deferred &onError(const ErrorCallback& callback)
Deferred &onFinalize(const FinalizeCallback& callback)
}
示例:声明交换机
// channel.declareExchange() 接口返回的是Deferred对象
// 因此,可以连续调用onSuccess设置成功处理回调函数以及onError失败处理回调函数
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
});
3.3.2.7 DeferredQueue
Channel 中队列相关操作接口的返回对象,用于设置队列操作的相关回调函数。
class DeferredQueue : public Deferred
{
DeferredQueue &onSuccess(const QueueCallback& callback)
DeferredQueue &onSuccess(const SuccessCallback& callback)
}
using QueueCallback = std::function<void(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount)>;
示例:声明队列
// channel.declareQueue() 接口返回的是DeferredQueue对象
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终止程序
});
3.3.2.8 DeferredConsumer
Channel 中订阅相关操作接口的返回对象,用于设置订阅的相关回调函数。
class DeferredConsumer : public Deferred {
DeferredConsumer &onSuccess(const ConsumeCallback& callback)
// 收到消息的回调函数
DeferredConsumer &onReceived(const MessageCallback& callback)
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback& callback)
// 取消订阅的回调
DeferredConsumer &onCancelled(const CancelCallback& callback)
}
using MessageCallback = std::function<void(const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)>;
示例:订阅队列消息
// channel.consume() 接口返回的是DeferredConsumer对象
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, // 收到的消息
uint64_t deliveryTag, // 消息的唯一标识
bool redelivered){
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
// 收到消息进行处理后,不要忘了对消息进行确认
channel.ack(deliveryTag);
})
.onError([&](const char *message) {
std::cout << "订阅队列消息失败:" << message << std::endl;
abort(); // 终止程序
})
.onSuccess([&](const std::string_view &tag){
std::cout << "订阅队列消息成功:" << std::endl;
});
3.3.3 扩展网络库 libev
3.3.3.1 LibEvHandler
amqpcpp 库与 libev 网络库的对接类
#include <amqpcpp/libev.h>
#include <ev.h>
class LibEvHandler : public TcpHandler{
LibEvHandler(struct ev_loop *loop, int priority = 0)
};
3.3.3.2 ev_loop
libev 网络通信库的核心使用接口。
typedef struct ev_async
{
EV_WATCHER (ev_async)
EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum {
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
// 创建libev操作句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0))
#define EV_DEFAULT ev_default_loop (0)
void ev_loop_destroy (struct ev_loop *loop)
// 开始事件循环 --- 阻塞接口
int ev_run (struct ev_loop *loop);
// 退出事件循环
void ev_break (struct ev_loop *loop, int32_t break_type);
// 异步事件回调函数
// 也就是每个外部针对ev的操作都必须在evloop所在线程内执行
// 这里相当打包了一个任务,加入到evloop的任务队列中进行执行
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)
// 初始化一个异步任务
void ev_async_init(ev_async *w, callback cb);
// 启动loop的异步任务监测,这样就可以接收任务进行处理了
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 发送异步任务到loop中进行执行
void ev_async_send(struct ev_loop *loop, ev_async *w);
// 示例:异步退出
void mycb(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL);
}
ev_async ea;
ev_async_init(&ea, mycb);
ev_async_start(loop, &ea);
ev_async_send(loop, &ea);
3.4 样例
3.4.1 简单消息传输
3.4.1.1 目录结构
$ tree
.
├── makefile
├── simple_publish.cc
└── simple_subscribe.cc
3.4.1.2 样例代码
simple_publish.cc
/**
* @file rabbitmq_producer.cpp
* @brief RabbitMQ 生产者示例 - 使用 Direct Exchange 发送消息
*
* 依赖库:
* - libev: 高性能事件循环库
* - amqpcpp: RabbitMQ 的 C++ 客户端库
* - amqpcpp-libev: amqpcpp 与 libev 的适配器
*
* 编译命令:
* g++ -o producer rabbitmq_producer.cpp -lev -lamqpcpp -lpthread
*/
#include <amqpcpp.h> // AMQP 协议核心库
#include <ev.h> // libev 事件循环库
#include <amqpcpp/libev.h> // AMQP-CPP 的 libev 适配器
#include <iostream> // 标准输入输出
int main()
{
// ==================== 0. 配置参数 ====================
// RabbitMQ 服务器连接 URL(标准 AMQP 格式)
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
// URL 格式解析:
// amqp:// - 协议类型(也支持 amqps:// 表示 TLS 加密)
// admin - 用户名
// 123456 - 密码
// 192.168.65.128 - RabbitMQ 服务器 IP
// 5672 - 端口号(默认 5672,TLS 是 5671)
// / - 虚拟主机(默认根路径)
const std::string exchange = "my-exchange"; // 交换机名称
const std::string queue = "my-queue"; // 队列名称
const std::string binding_key = "my-binding-key"; // 绑定键(路由键)
// ==================== 1. 初始化网络通信层 ====================
/**
* @brief libev 事件循环
*
* EV_DEFAULT 是一个宏,返回默认的事件循环实例
* libev 会管理所有套接字的 I/O 事件(可读、可写、错误等)
* 类似于 epoll/select,但更高效且跨平台
*/
auto *ev_loop = EV_DEFAULT;
/**
* @brief AMQP-CPP 的 libev 适配器
*
* 将 AMQP-CPP 的网络事件转换为 libev 事件
* 当有网络事件发生时(如连接建立、数据到达),会通过这个 handler 通知 AMQP-CPP
*/
AMQP::LibEvHandler handler(ev_loop);
// ==================== 2. 建立 TCP 连接 ====================
/**
* @brief TCP 连接对象
*
* 负责与 RabbitMQ 服务器建立底层的 TCP 连接
* 内部处理:
* - TCP 三次握手
* - AMQP 协议头协商
* - 连接认证(用户名/密码)
* - 心跳保活
* - 断线重连(可选)
*/
AMQP::TcpConnection connection(&handler, AMQP::Address(url));
// ==================== 3. 创建通道 ====================
/**
* @brief AMQP 通道
*
* 通道是 AMQP 协议中的重要概念:
* - 一个 TCP 连接可以包含多个通道(多路复用)
* - 每个通道独立工作,互不干扰
* - 通道是执行 AMQP 命令的上下文(声明交换机、队列,发送消息等)
*
* 典型应用:
* - 通道1:处理订单消息
* - 通道2:处理用户通知
* - 通道3:处理日志消息
*
* 优势:避免为每种业务创建单独的 TCP 连接,节省资源
*/
AMQP::TcpChannel channel(&connection);
// ==================== 4. 声明交换机 ====================
/**
* @brief 声明交换机(异步操作)
*
* declareExchange() 会发送 AMQP 命令到 RabbitMQ
* 操作是异步的,需要注册回调处理结果
*
* 交换机类型(ExchangeType):
* - direct : 完全匹配路由键(精确路由)
* - fanout : 广播到所有绑定的队列(忽略路由键)
* - topic : 通配符匹配路由键(* 匹配一个词,# 匹配多个词)
* - headers : 根据消息头部匹配(不依赖路由键)
*/
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
/**
* @brief 声明成功的回调
*
* 执行顺序:在 RabbitMQ 确认交换机创建成功后调用
*/
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
// ==================== 5. 声明队列 ====================
/**
* @brief 声明队列
*
* 队列是消息的存储容器
* 特性:
* - 先进先出(FIFO)
* - 支持持久化(服务器重启后消息不丢失)
* - 支持排他(只能被当前连接使用)
* - 支持自动删除(没有消费者时自动删除)
*
* 回调参数:
* - name : 队列名称(RabbitMQ 可能自动生成)
* - messagecount : 队列中积压的消息数量
* - consumercount: 当前正在消费的消费者数量
*/
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
// ==================== 6. 绑定交换机和队列 ====================
/**
* @brief 绑定队列到交换机
*
* 绑定关系决定了消息如何从交换机路由到队列
*
* 对于 direct 交换机:
* - 消息的 routing_key 必须 == binding_key 才会被路由到该队列
*
* 对于 fanout 交换机:
* - 忽略 binding_key,所有绑定的队列都收到消息
*
* 对于 topic 交换机:
* - 支持通配符匹配(如 *.stock.*, #.error)
*
* 一个队列可以用多个 binding_key 绑定到同一个交换机
* 一个交换机也可以绑定到多个队列
*/
channel.bindQueue(exchange, queue, binding_key)
.onSuccess([&](){
std::cout << "绑定交换机和队列成功:" << std::endl;
// ==================== 7. 发送消息 ====================
/**
* @brief 发布消息到交换机
*
* @param exchange : 目标交换机名称
* @param routing_key : 路由键(决定消息去哪个队列)
* @param message : 消息内容(字节流)
* @param flags : 消息属性(可选)
*
* 消息属性(flags)示例:
* - AMQP::mandatory : 如果没有队列接收,返回消息给生产者
* - AMQP::immediate : 如果没有消费者,立即返回(已废弃)
*
* 消息可以携带自定义属性(Message Properties):
* - contentType : MIME 类型(如 application/json)
* - deliveryMode : 持久化模式(1=非持久,2=持久)
* - priority : 优先级(0-9)
* - correlationId : 关联 ID(用于 RPC 调用)
* - replyTo : 回复队列名称(用于 RPC 调用)
* - expiration : 消息过期时间(毫秒)
* - messageId : 消息唯一标识(用于去重)
* - timestamp : 时间戳
* - userId : 用户 ID(需要授权)
* - appId : 应用 ID
*/
bool ret = channel.publish(exchange,
binding_key, "Hello World");
if(ret == false) {
std::cout << "发送消息失败" << std::endl;
} else {
std::cout << "发送消息成功: Hello World" << std::endl;
}
})
/**
* @brief 绑定失败的回调
*
* @param message 错误描述信息
*/
.onError([&](const char *message) {
std::cout << "绑定交换机和队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
/**
* @brief 声明交换机失败的回调
*/
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort(); // 终止程序
});
// ==================== 8. 启动事件循环 ====================
/**
* @brief 运行 libev 事件循环
*
* 这是一个阻塞调用,会一直运行直到:
* - ev_break() 被调用
* - 程序收到终止信号
* - 发生致命错误
*
* 事件循环处理的任务:
* - 监听套接字的 I/O 事件
* - 执行定时器(如心跳检测)
* - 执行异步回调函数
*
* 注意:所有 AMQP-CPP 的回调函数都在这个事件循环中执行
* 因此不要在回调中执行耗时操作,否则会阻塞网络处理
*/
ev_run(ev_loop);
/**
* @note 代码不会执行到这里,因为 ev_run 会阻塞
* 正常退出需要在回调中调用 ev_break(ev_loop)
*/
return 0;
}
simple_subscribe.cc
#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>
int main()
{
// 0. 定义rabbitmq容器的访问url
const std::string url = "amqp://admin:123456@192.168.65.128:5672/";
const std::string exchange = "my-exchange";
const std::string queue = "my-queue";
const std::string binding_key = "my-binding-key";
// 1. 实例化libev网络通信模块句柄
auto *ev_loop = EV_DEFAULT; // 网络通信事件循环句柄
AMQP::LibEvHandler handler(ev_loop); // 与amqpcpp库结合的句柄
// 2. 实例化Connection对象//与rabbitmq服务器的连接对象
AMQP::TcpConnection connection(&handler, AMQP::Address(url));
// 3. 实例化Channel对象
AMQP::TcpChannel channel(&connection); // 与rabbitmq服务器的通信通道对象
// 4. 声明交换机: 声明一个交换机, direct类型(直接交换)
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
// 5. 声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
// 6. 绑定交换机和队列(采用直接交换)
channel.bindQueue(exchange, queue, binding_key)
.onSuccess([&](){
std::cout << "绑定交换机和队列成功:" << std::endl;
// 7. 订阅队列消息
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, // 收到的消息
uint64_t deliveryTag, // 消息的唯一标识
bool redelivered){
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
// 收到消息进行处理后,不要忘了对消息进行确认
channel.ack(deliveryTag);
})
.onError([&](const char *message) {
std::cout << "订阅队列消息失败:" << message << std::endl;
abort(); // 终止程序
})
.onSuccess([&](){
std::cout << "订阅队列消息成功:" << std::endl;
});
})
.onError([&](const char *message) {
std::cout << "绑定交换机和队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort(); // 终止程序
});
// 8. 启动事件循环
ev_run(ev_loop);
return 0;
}
3.4.1.3 项目构建
all: simple_publish simple_subscribe
simple_publish: simple_publish.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
simple_subscribe: simple_subscribe.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
clean:
rm -rf simple_publish simple_subscribe
3.4.1.4 运行演示
订阅端
dev@fc56006988fe:~/example/amqpcpp$ ./simple_subscribe
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:Hello World
发布端
dev@fc56006988fe:~/example/amqpcpp$ ./simple_publish
声明交换机成功:my-exchange
声明队列成功:my-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:
3.4.2 死信队列
3.4.2.1 要素
上面我们已经完成了 AMQP-CPP 第三方库的基础使用实践与功能测试。下面我们将重点带大家学习死信队列相关知识。首先要明确的核心概念是 “死信”,死信队列的核心包含三个关键要素,第一个便是死信消息(又称死信)。死信消息指的是当一条消息满足特定条件时,就会成为无处安放的 “死信”,具体满足的条件有三类:
第一,消息被订阅客户端拒绝处理,代理服务器将消息推送给订阅端后,订阅端明确拒绝处理该消息,这条消息就会变为死信;
第二,消息过期,若创建队列时为队列中的消息设置了过期时间,而消息在规定时间内未被消费,就会成为死信(上面的简单示例中未设置过期时间,因此消息不会过期);
第三,队列达到最大节点数量且已存满,此时再发布新消息,新消息因无处存放会变为死信。
为了处理这些 “无处安放” 的死信消息,避免其被随意丢弃(尤其是部分死信可能仍有重要价值),便引入了死信交换机和死信队列。死信交换机本质上是一个普通交换机,只是它与另一个常规交换机 / 队列建立了关联;死信队列则是绑定到死信交换机上的普通队列。当消息变为死信后,RabbitMQ 会将其重新发布到预先关联的死信交换机,再由死信交换机通过绑定的路由键,将死信消息路由到死信队列中存储,应用程序可通过消费死信队列中的消息来处理这些死信。值得一提的是,死信队列的核心应用场景之一是实现延时队列 —— 在 RabbitMQ 高版本虽提供了延时队列插件,但早期版本的延时功能正是基于死信队列的思想实现的,比如数据库与缓存数据同步等场景中,延时队列就有着广泛应用。
死信消息(Dead Letter Message): 当一个消息满足某些特定条件时(消息拒绝、消息过期、队列满了),它就会变成 “死信消息”。
死信交换机(Dead Letter Exchange - DLX): 这是一个普通的交换机。当一个消息变成死信后,RabbitMQ 会将其重新发布(publish)到这个特定的交换机。
死信队列(Dead Letter Queue - DLQ): 这是一个绑定到死信交换机上的普通队列。死信交换机将死信消息(通过绑定的 Routing Key)路由到这个队列进行存储。应用程序可以消费这个队列中的消息来处理这些 “死信”。
3.4.2.2 使用流程
死信队列的使用流程十分清晰,主要分为两步:
第一步,创建普通交换机 A 和普通队列 A 并进行绑定,这一组交换机和队列将作为死信交换机与死信队列使用;
第二步,创建普通交换机 B 和普通队列 B 并进行绑定,这一组作为常规的交换机与队列,关键是在创建队列 B 时,需要额外设置关联的死信交换机(即第一步创建的交换机 A)相关信息,完成关联配置。一旦常规队列 B 中的消息出现被拒绝、过期或队列满导致无法存放的情况,这些消息就会被转发到死信队列 A 中。
3.4.2.3 相关接口与类
/**
* @brief 字段基类
*
* 设计目的:
* 1. 作为所有字段类型的基类,实现多态
* 2. 可能提供通用的字段操作接口(如序列化、比较等)
* 3. 用于构建类似 ORM 的字段映射系统
*
* 典型应用场景:
* - 数据库查询结果的行数据封装
* - 动态配置项的键值对存储
* - RabbitMQ 的消息头/参数封装
*/
class Field {
// 可能包含的虚函数:
// virtual std::string toString() const;
// virtual json toJson() const;
// virtual void fromJson(const json& j);
};
/**
* @brief 表/键值对容器类(继承自 Field)
*
* 设计意图:
* - 模拟数据库表的行数据结构
* - 提供类似 Python 字典或 JavaScript 对象的字段访问方式
* - 支持链式调用,方便构建复杂的参数配置
*
* 使用示例:
* Table config;
* config.set("host", "localhost")
* .set("port", 5672)
* .set("auto_reconnect", true);
*
* std::string host = config["host"].as<std::string>();
* int port = config["port"].as<int>();
*/
class Table : public Field {
public:
/**
* @brief 设置字段值(布尔类型重载)
* @param name 字段名称
* @param value 字段值
* @return Table& 返回自身引用,支持链式调用
*
* 设计模式:Builder 模式 + Fluent Interface
* 优点:可以连续设置多个字段,代码更简洁
*
* 使用示例:
* table.set("enabled", true)
* .set("debug", false)
* .set("persistent", true);
*/
Table &set(const std::string &name, bool value);
// 其他基础类型的重载(未显示但应该存在):
// Table &set(const std::string &name, int value);
// Table &set(const std::string &name, double value);
// Table &set(const std::string &name, const std::string &value);
// Table &set(const std::string &name, const char* value);
// Table &set(const std::string &name, const Table& value); // 嵌套表
/**
* @brief 下标访问运算符重载
* @param name 字段名称
* @return AssociativeFieldProxy 代理对象
*
* 设计模式:Proxy 模式
* 目的:
* 1. 延迟字段类型转换(类似 std::any)
* 2. 支持动态类型(运行时才知道字段是什么类型)
* 3. 实现 operator[] 的读写统一接口
*
* 使用示例:
* // 写入
* table["name"] = "张三";
* table["age"] = 25;
* table["active"] = true;
*
* // 读取(需要调用 as<T>() 转换)
* std::string name = table["name"].as<std::string>();
* int age = table["age"].as<int>();
* bool active = table["active"].as<bool>();
*
* // 判断字段是否存在
* if (table.has("name")) { ... }
*/
AssociativeFieldProxy operator[](const std::string& name);
};
/**
* @brief RabbitMQ 队列/交换机参数配置类
*
* 注意:这里的 Table 类型就是 AMQP::Table
* 它是 AMQP-CPP 库中用于表示 AMQP 表(字段表)的类型
*
* AMQP 0-9-1 协议中的表(Table)是一种键值对数据结构:
* - 键(Key):字符串
* - 值(Value):可以是多种类型(布尔、整数、字符串、数组、嵌套表等)
*
* 死信队列(Dead Letter Queue, DLQ)配置:
* 当消息在队列中变成"死信"时,RabbitMQ 会自动将其转发到指定的死信交换机
*
* 消息变成死信的条件:
* 1. 消息被消费者拒绝(basic.reject 或 basic.nack)且 requeue=false
* 2. 消息过期(TTL 到期)
* 3. 队列长度达到上限,最早的消息被移除
*/
// 死信队列参数配置
AMQP::Table args; // 创建一个空表,用于存储队列参数
/**
* @brief 设置死信交换机
*
* x-dead-letter-exchange(DLX):Dead Letter Exchange
* - 作用:指定消息变成死信后转发到哪个交换机
* - 类型:字符串
* - 必需:是(如果不设置,死信消息会被直接丢弃)
*
* 工作原理:
* 当队列中的消息变成死信时,RabbitMQ 会:
* 1. 自动将消息发布到指定的死信交换机
* 2. 使用原消息的路由键(除非指定了 x-dead-letter-routing-key)
* 3. 死信交换机根据路由键将消息路由到死信队列
*/
args["x-dead-letter-exchange"] = "xxx";
// ↑
// 死信交换机名称(需要提前声明)
/**
* @brief 设置死信路由键
*
* x-dead-letter-routing-key:Dead Letter Routing Key
* - 作用:覆盖死信消息的原始路由键
* - 类型:字符串
* - 必需:否(如果不设置,使用原消息的路由键)
*
* 使用场景:
* 1. 统一路由:将所有死信都路由到同一个队列
* 2. 分类处理:根据死信原因使用不同的路由键
* 3. 调试:给死信消息打上特殊标记
*
* 示例:
* - 设置特定路由键:所有死信都进入同一个死信队列
* - 不设置:死信可能根据原始路由键进入不同的队列
*/
args["x-dead-letter-routing-key"] = "xxx";
// ↑
// 死信路由键(用于绑定交换机和队列)
/**
* @brief 设置消息过期时间(队列级别)
*
* x-message-ttl:Message Time To Live
* - 作用:队列中所有消息的存活时间(毫秒)
* - 类型:整数(32位)
* - 必需:否(默认不设置,消息永不过期)
* - 单位:毫秒(1秒 = 1000毫秒)
*
* 重要特性:
* 1. 队列级别 TTL:所有进入该队列的消息都有相同的过期时间
* 2. 消息级别 TTL:每条消息可以单独设置 expiration 属性
* 3. 优先级:取较小的值(如果同时设置)
*
* 使用场景:
* 1. 实现延时队列:消息过期后进入死信队列,被消费者处理
* 2. 自动清理:临时消息(如验证码)自动过期删除
* 3. 限时任务:超过时间未处理的任务自动失效
*
* 注意事项:
* - 消息过期不会立即删除,而是在即将投递给消费者时检查
* - 过期消息会进入死信队列(如果配置了 DLX)
* - 设置过短的 TTL 可能影响性能(需要频繁检查过期)
*/
args["x-message-ttl"] = 10000; // 10秒 = 10000毫秒
// ↑
// 10秒后消息过期(单位:毫秒)
// ========== 其他常见的队列参数 ==========
// args["x-max-length"] = 1000; // 队列最大消息数量
// args["x-max-size"] = 10485760; // 队列最大总大小(字节)
// args["x-max-priority"] = 10; // 消息优先级(0-10)
// args["x-expires"] = 3600000; // 队列空闲自动删除时间(毫秒)
// args["x-queue-mode"] = "lazy"; // 惰性队列(消息存磁盘)
在代码实现层面,创建队列时的关联配置需通过参数设定,涉及 AMQP-CPP 库中的 Table 类和 Field 类,可通过 Table 类的中括号语法设置队列参数,无需刻意记忆,使用时参考示例复制即可。核心参数包括:x-dead-letter-exchange(指定关联的死信交换机名称)、x-dead-letter-routing-key(指定死信交换机路由到死信队列的绑定键)、x-message-ttl(设置消息过期时间,以毫秒为单位)。
3.4.2.4 样例
接下来我们通过实际代码调整来演示死信队列的使用,实现延时消息功能。我们将创建两个代码文件:dlx_publish.cc(发布端)和 dlx_subscribe.cc(订阅端)。
发布端的实现流程为:
第一步,创建死信交换机 A、死信队列 A 并绑定;
第二步,创建常规交换机 B、常规队列 B 并绑定,创建队列 B 时传入上述死信相关参数(设置 5000 毫秒的消息过期时间);
第三步,向常规交换机 B 发布消息,消息过期后会自动转发到死信交换机 A,再路由到死信队列 A。订阅端的实现则更为简单,只需创建并绑定死信交换机 A 和死信队列 A,直接订阅死信队列 A 的消息即可,无需关注常规队列 B 的情况,这样收到的消息就是经过延时(过期转发)的消息。
在实际操作中,需注意几个关键问题:
-
一是确保各类名称(交换机名、队列名、绑定键)完全对应,避免因名称不一致导致路由失败;
-
二是创建常规队列时,务必将配置好的死信参数传入,否则无法实现死信转发;
-
三是订阅操作需放在队列与交换机绑定成功的回调函数中,避免因资源未创建完成导致订阅失败。
通过本次实践,我们成功基于死信队列实现了延时队列功能,大家后面也需动手实践,熟练掌握相关配置与代码编写,为后续缓存相关场景的应用打下基础。
$ tree
.
├── makefile
├── dlx_publish.cc
└── dlx_subscribe.cc
dlx_publish.cc
/**
* @file delayed_queue.cpp
* @brief RabbitMQ 延时队列实现(基于死信交换机 DLX)
*
* 实现原理:
* 1. 创建带 TTL(过期时间)的队列,并关联死信交换机
* 2. 消息发送到延时队列后,等待 TTL 时间过期
* 3. 消息过期后自动转发到死信队列
* 4. 消费者监听死信队列,实现延时消费
*
* 应用场景:
* - 订单超时自动取消(30分钟未支付)
* - 定时任务延迟执行
* - 重试机制(失败后延迟重试)
* - 缓存延迟刷新
*/
#include <amqpcpp.h> // AMQP 协议核心库
#include <ev.h> // libev 事件循环库
#include <amqpcpp/libev.h> // AMQP-CPP 的 libev 适配器
#include <iostream> // 标准输入输出
/**
* @brief 声明交换机、队列并绑定(辅助函数)
*
* 这是一个通用的组件声明函数,用于快速创建 RabbitMQ 的基础组件
*
* @param channel AMQP 通道
* @param exchange 交换机名称
* @param queue 队列名称
* @param binding_key 绑定键(路由键)
*
* @note 这是一个异步函数,通过回调通知结果
*/
void declaredComponent(AMQP::TcpChannel &channel,
const std::string &exchange,
const std::string &queue,
const std::string &binding_key) {
// 1. 声明交换机(direct 类型:完全匹配路由键)
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
// 2. 声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
// 3. 绑定交换机与队列(建立路由关系)
channel.bindQueue(exchange, queue, binding_key)
.onSuccess([&](){
std::cout << "绑定交换机和队列成功:" << std::endl;
})
.onError([&](const char *message) {
std::cout << "绑定交换机和队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort(); // 终止程序
});
}
int main()
{
// ==================== 0. 配置参数 ====================
// RabbitMQ 服务器地址(注意端口是 5672,不是 15672)
const std::string url = "amqp://admin:123456@192.168.65.130:5672/";
// 延时队列组件(用于接收消息,等待过期)
const std::string delayed_exchange = "delayed-exchange";
const std::string delayed_queue = "delayed-queue";
const std::string delayed_binding_key = "delayed-binding-key";
// 死信队列组件(用于接收过期后的消息)
const std::string dlx_exchange = "dlx-exchange";
const std::string dlx_queue = "dlx-queue";
const std::string dlx_binding_key = "dlx-binding-key";
// ==================== 1. 初始化网络层 ====================
auto *ev_loop = EV_DEFAULT; // libev 事件循环
AMQP::LibEvHandler handler(ev_loop); // AMQP 事件处理器
AMQP::TcpConnection connection(&handler, AMQP::Address(url)); // TCP 连接
AMQP::TcpChannel channel(&connection); // AMQP 通道
// ==================== 2. 创建死信组件 ====================
/**
* 死信交换机(DLX)和死信队列的作用:
* - 当延时队列中的消息过期后,RabbitMQ 会自动将其转发到这里
* - 消费者应该监听这个死信队列,而不是延时队列
*
* 架构图:
* ┌─────────────┐ TTL=5s ┌─────────────┐ 过期后 ┌─────────────┐
* │ 生产者 │ ──────────→ │ 延时队列 │ ──────────→ │ 死信队列 │
* │ │ 发送消息 │ (delayed) │ 自动转发 │ (DLX) │
* └─────────────┘ └─────────────┘ └─────────────┘
* │
* ↓
* ┌─────────────┐
* │ 消费者 │
* │ (延时处理) │
* └─────────────┘
*/
declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);
// ==================== 3. 创建延时队列(带死信配置) ====================
/**
* 延时队列的核心配置:
* - x-dead-letter-exchange: 指定死信转发目标交换机
* - x-dead-letter-routing-key: 指定死信消息的路由键
* - x-message-ttl: 消息存活时间(毫秒),过期后自动进入死信
*
* 注意:这个队列的消费者不是直接消费它!
* 而是消费它绑定的死信队列
*/
channel.declareExchange(delayed_exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << delayed_exchange << std::endl;
// 配置队列参数(关键!)
AMQP::Table args;
/**
* @brief x-dead-letter-exchange 参数
* 作用:消息过期后,转发到哪个交换机
*/
args["x-dead-letter-exchange"] = dlx_exchange;
/**
* @brief x-dead-letter-routing-key 参数
* 作用:转发时使用什么路由键(覆盖原消息的路由键)
*
* 如果不设置,会使用原消息的路由键
* 这里设置为死信队列的绑定键,确保消息能正确路由到死信队列
*/
args["x-dead-letter-routing-key"] = dlx_binding_key;
/**
* @brief x-message-ttl 参数
* 作用:消息在队列中的存活时间(毫秒)
*
* 这里设置为 5000ms = 5 秒
* 消息进入此队列 5 秒后会自动过期,进入死信队列
*
* 常见 TTL 配置:
* - 订单超时:30分钟 = 1800000ms
* - 短信验证码:60秒 = 60000ms
* - 重试延迟:1秒 = 1000ms
*/
args["x-message-ttl"] = 5000; // 5秒过期
// 声明带参数的队列
channel.declareQueue(delayed_queue, args)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << delayed_queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
// 绑定延时交换机和延时队列
channel.bindQueue(delayed_exchange, delayed_queue, delayed_binding_key)
.onSuccess([&](){
std::cout << "绑定交换机和队列成功:" << std::endl;
// ==================== 4. 发送测试消息 ====================
/**
* 发送消息到延时队列
*
* 流程:
* 1. 消息进入 delayed_queue
* 2. 等待 5 秒(TTL)
* 3. 消息过期
* 4. 自动转发到 dlx_exchange
* 5. 根据 dlx_binding_key 路由到 dlx_queue
* 6. 消费者从 dlx_queue 消费消息
*
* 最终效果:消息从发送到被消费,延迟了 5 秒
*/
bool ret = channel.publish(delayed_exchange, delayed_binding_key, "hello world");
if(ret == false) {
std::cout << "发布消息到延时队列失败" << std::endl;
} else {
std::cout << "消息已发送到延时队列,将在 5 秒后进入死信队列" << std::endl;
}
})
.onError([&](const char *message) {
std::cout << "绑定交换机和队列失败:" << message << std::endl;
abort();
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort();
});
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort();
});
// ==================== 5. 启动事件循环 ====================
/**
* 事件循环会一直运行,处理:
* - 网络 I/O 事件
* - 定时器(心跳)
* - 异步回调函数
*
* 由于所有操作都是异步的,程序不会立即退出
*/
ev_run(ev_loop);
return 0;
}
dlx_subscribe.cc
#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>
void declaredComponent(AMQP::TcpChannel &channel,
const std::string &exchange,
const std::string &queue,
const std::string &binding_key) {
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
// 5. 声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有 " << messagecount << " 个消息" << std::endl;
std::cout << "队列中有 " << consumercount << " 个消费者" << std::endl;
// 6. 绑定交换机和队列(采用直接交换)
channel.bindQueue(exchange, queue, binding_key)
.onSuccess([&](){
std::cout << "绑定交换机和队列成功:" << std::endl;
// 7. 订阅队列消息
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, // 收到的消息
uint64_t deliveryTag, // 消息的唯一标识
bool redelivered){
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
// 收到消息进行处理后,不要忘了对消息进行确认
channel.ack(deliveryTag);
})
.onError([&](const char *message) {
std::cout << "订阅队列消息失败:" << message << std::endl;
abort(); // 终止程序
})
.onSuccess([&](){
std::cout << "订阅队列消息成功:" << std::endl;
});
})
.onError([&](const char *message) {
std::cout << "绑定交换机和队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终止程序
});
})
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
abort(); // 终止程序
});
}
int main()
{
const std::string url = "amqp://admin:123456@192.168.65.130:5672/";
const std::string dlx_exchange = "dlx-exchange";
const std::string dlx_queue = "dlx-queue";
const std::string dlx_binding_key = "dlx-binding-key";
auto *ev_loop = EV_DEFAULT;
AMQP::LibEvHandler handler(ev_loop);
AMQP::TcpConnection connection(&handler, AMQP::Address(url));
AMQP::TcpChannel channel(&connection);
// 订阅者只需要和死信队列进行绑定,不用关心死信消息的来源队列
declaredComponent(channel, dlx_exchange, dlx_queue, dlx_binding_key);
ev_run(ev_loop);
return 0;
}
all: dlx_publish dlx_subscribe
dlx_publish:dlx_publish.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
dlx_subscribe:dlx_subscribe.cc
g++ -std=c++17 $^ -o $@ -lamqpcpp -lev -lpthread
clean:
rm -rf dlx_publish dlx_subscribe
订阅端
dev@fc56006988fe:~/example/amqpcpp$ ./dlx_subscribe
声明交换机成功:dlx-exchange
声明队列成功:dlx-queue
队列中有0个消息
队列中有0个消费者
绑定交换机和队列成功:
订阅队列消息成功:
收到消息:hello world # 该消息是延迟5秒后才收到的
发布端
dev@fc56006988fe:~/example/amqpcpp$ ./dlx_publish
声明交换机成功:dlx-exchange
声明交换机成功:delayed-exchange
声明队列成功:delayed-queue
队列中有0个消息
队列中有1个消费者
绑定交换机和队列成功:发布消息到延时队列
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)