【ROS2】ROS 2 中 wait_for_all_acked (消息可靠投递)的简介与使用
【ROS2】ROS 2 中 wait_for_all_acked (消息可靠投递)的简介与使用
1、示例代码
#include <chrono>
#include <functional>
#include <cinttypes>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
using namespace std::chrono_literals;
/* This example shows how to use wait_for_all_acked for the publisher */
class MinimalPublisher : public rclcpp::Node
{
public:
MinimalPublisher()
: Node("minimal_publisher_with_wait_for_all_acked"),
count_(0),
wait_timeout_(5000)
{
// publisher must set reliable mode
publisher_ = this->create_publisher<std_msgs::msg::String>(
"topic",
rclcpp::QoS(10).reliable());
// call wait_for_all_acked before shutdown
using rclcpp::contexts::get_global_default_context;
get_global_default_context()->add_pre_shutdown_callback(
[this]() {
this->timer_->cancel();
this->wait_for_all_acked();
});
timer_ = this->create_wall_timer(
500ms, std::bind(&MinimalPublisher::timer_callback, this));
}
private:
void wait_for_all_acked()
{
// Confirm all subscribers receive sent messages.
// Note that if no subscription is connected, wait_for_all_acked() always return true.
if (publisher_->wait_for_all_acked(wait_timeout_)) {
RCLCPP_INFO(
this->get_logger(),
"All subscribers acknowledge messages");
} else {
RCLCPP_INFO(
this->get_logger(),
"Not all subscribers acknowledge messages during %" PRId64 " ms",
static_cast<int64_t>(wait_timeout_.count()));
}
}
void timer_callback()
{
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(count_++);
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
publisher_->publish(message);
// After sending some messages, you can call wait_for_all_acked() to confirm all subscribers
// acknowledge messages.
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
std::chrono::milliseconds wait_timeout_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
auto publisher = std::make_shared<MinimalPublisher>();
rclcpp::spin(publisher);
rclcpp::shutdown();
return 0;
}
2、代码解析
以上代码是 ROS 2 中带 wait_for_all_acked 机制的发布者示例,核心目的是演示如何确保 “所有订阅者都确认收到发布的消息”(消息可靠投递),并在节点关闭前等待订阅者的确认反馈。本文将从整体功能、核心概念、逐模块解析、运行逻辑四个维度,对每个细节和底层原理进行讲解。
2.1、整体功能总结
这个程序实现了一个 ROS 2 节点(minimal_publisher_with_wait_for_all_acked):
- 以 500ms 为周期向 topic 话题发布字符串消息(Hello, world! + 计数);
- 发布者配置为 可靠模式(Reliable)(保证消息必达,而非尽力而为);
- 注册 “预关闭回调函数”:节点关闭前先取消定时器,再调用 wait_for_all_acked 等待所有订阅者确认收到消息;
- wait_for_all_acked 会阻塞最多 5000ms,检查是否所有订阅者都确认了消息:
- 若超时前所有订阅者确认 → 打印成功日志;
- 若超时仍有订阅者未确认 → 打印超时日志;
- 核心特性:确保节点关闭前,已发布的消息被所有订阅者可靠接收(避免节点退出导致消息丢失)。
2.2、核心前置概念
在解析代码前,先理解两个关键概念(ROS 2 可靠通信的核心):
- QoS 可靠模式(Reliable):
- ROS 2 消息投递有两种模式:Reliable(可靠)和 Best Effort(尽力而为);
- Reliable 模式下,发布者会持续重发消息,直到订阅者返回 “确认(ACK)”,保证消息必达;
- Best Effort 模式下,发布者只发一次消息,不关心订阅者是否收到(适合实时性要求高于可靠性的场景,如传感器流)。
- wait_for_all_acked:
- ROS 2 发布者的核心接口,作用是 “阻塞等待,直到所有已连接的订阅者都确认收到了发布的所有消息”;
- 入参是超时时间:若超时前所有订阅者确认 → 返回 true;否则返回 false;
- 特殊情况:若没有订阅者连接 → 直接返回 true(无订阅者无需确认)。
2.3、逐模块代码解析
- 头文件引入(基础依赖)
#include <chrono> // 时间相关(500ms/5000ms 超时)
#include <functional> // 函数绑定(std::bind)、Lambda 回调
#include <cinttypes> // 格式化输出宏(PRId64,用于int64_t打印)
#include <memory> // 智能指针(SharedPtr)
#include <string> // 字符串处理
#include "rclcpp/rclcpp.hpp" // ROS 2 核心 API
#include "std_msgs/msg/string.hpp" // ROS 2 标准字符串消息
重点:cinttypes 是为了兼容不同平台的 int64_t 打印(PRId64 宏),避免直接用 %lld 等平台相关格式符。
- 时间字面量命名空间
using namespace std::chrono_literals;
启用 C++14 时间字面量(如 500ms),简化定时器周期和超时时间的写法。
- 发布者类定义(MinimalPublisher)
class MinimalPublisher : public rclcpp::Node
{
public:
// 构造函数:初始化节点、计数器、超时时间、发布者、回调、定时器
MinimalPublisher()
: Node("minimal_publisher_with_wait_for_all_acked"), // 节点名
count_(0), // 消息计数器初始化
wait_timeout_(5000) // 等待确认的超时时间:5000ms(5秒)
{
// ========== 第一步:创建可靠模式的发布者 ==========
// QoS配置:队列大小10 + 可靠模式(reliable())
publisher_ = this->create_publisher<std_msgs::msg::String>(
"topic", // 话题名
rclcpp::QoS(10).reliable() // QoS策略:可靠模式(必须!wait_for_all_acked仅对可靠模式生效)
);
// ========== 第二步:注册预关闭回调函数 ==========
// 作用:节点关闭(rclcpp::shutdown())前,先执行这个Lambda函数
using rclcpp::contexts::get_global_default_context;
get_global_default_context()->add_pre_shutdown_callback(
[this]() { // Lambda表达式,捕获当前节点的this指针
this->timer_->cancel(); // 取消定时器,停止发布新消息
this->wait_for_all_acked(); // 等待所有订阅者确认已收到的消息
});
// ========== 第三步:创建定时器(500ms周期发布消息) ==========
timer_ = this->create_wall_timer(
500ms, std::bind(&MinimalPublisher::timer_callback, this));
}
private:
// ========== 核心方法:等待所有订阅者确认消息 ==========
void wait_for_all_acked()
{
// 调用发布者的wait_for_all_acked,阻塞最多wait_timeout_(5000ms)
if (publisher_->wait_for_all_acked(wait_timeout_)) {
// 成功:所有订阅者都确认了消息
RCLCPP_INFO(
this->get_logger(),
"All subscribers acknowledge messages");
} else {
// 失败:超时,部分订阅者未确认
RCLCPP_INFO(
this->get_logger(),
"Not all subscribers acknowledge messages during %" PRId64 " ms",
static_cast<int64_t>(wait_timeout_.count())); // 打印超时时间(ms)
}
// 注:PRId64是cinttypes的宏,用于安全打印int64_t类型(wait_timeout_.count()返回milliseconds::rep,即int64_t)
}
// ========== 定时器回调函数:发布消息 ==========
void timer_callback()
{
// 1. 创建ROS字符串消息
auto message = std_msgs::msg::String();
// 2. 拼接消息内容:Hello, world! + 计数器
message.data = "Hello, world! " + std::to_string(count_++);
// 3. 打印发布日志
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
// 4. 发布消息(可靠模式下,会等待订阅者ACK,若未收到则重发)
publisher_->publish(message);
// 注释说明:也可以在发布若干消息后,主动调用wait_for_all_acked(比如发布10条后检查一次)
// After sending some messages, you can call wait_for_all_acked() to confirm all subscribers
// acknowledge messages.
}
// ========== 成员变量 ==========
rclcpp::TimerBase::SharedPtr timer_; // 定时器智能指针
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;// 发布者智能指针
size_t count_; // 消息计数器(无符号整数)
std::chrono::milliseconds wait_timeout_; // 等待确认的超时时间(5000ms)
};
- 主函数(程序入口)
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv); // 初始化ROS 2上下文
auto publisher = std::make_shared<MinimalPublisher>(); // 创建发布者节点
rclcpp::spin(publisher); // 自旋:阻塞等待回调(定时器/关闭回调)
rclcpp::shutdown(); // 关闭ROS 2上下文(触发预关闭回调)
return 0;
}
核心逻辑:rclcpp::shutdown() 会触发之前注册的 pre_shutdown_callback,先取消定时器,再等待订阅者确认消息,最后关闭节点。
2.4、核心细节与关键考点
- 为什么必须配置 reliable() 模式?
- wait_for_all_acked 仅对 可靠模式 的发布者生效;
- 若用 best_effort() 模式,wait_for_all_acked 会直接返回 true(尽力而为模式无 ACK 机制,无需等待)。
- 预关闭回调的作用(核心设计)
- 节点关闭时(如用户按 Ctrl+C),若不调用 wait_for_all_acked,已发布但未被订阅者确认的消息会丢失;
- 预关闭回调确保:节点关闭前,先停止发布新消息,再等待订阅者确认已发消息,避免消息丢失。
- wait_timeout_ 的设计思路
- 超时时间设为 5000ms(5 秒):既给订阅者足够的确认时间,又避免节点无限阻塞(比如订阅者崩溃 / 断网);
- 超时后节点仍会正常关闭,只是打印 “未全部确认” 的日志,不影响程序退出。
- PRId64 的作用
- wait_timeout_.count() 返回 std::chrono::milliseconds::rep 类型,本质是 int64_t;
- 不同平台对 int64_t 的打印格式不同(Linux 是 %lld,Windows 是 %I64d),PRId64 是跨平台的宏,确保格式化输出正确。
2.5、运行逻辑梳理(完整流程)
- 程序启动 → main 函数初始化 ROS 2,创建发布者节点;
- 节点构造函数执行:
- 创建可靠模式的发布者(topic 话题);
- 注册预关闭回调(关闭前取消定时器 + 等待 ACK);
- 创建 500ms 定时器,开始发布消息;
- 节点进入自旋,定时器每 500ms 触发一次:
- 发布 Hello, world! + 计数 消息;
- 可靠模式下,发布者会等待订阅者 ACK,未收到则重发;
- 用户按下 Ctrl+C → 触发 rclcpp::shutdown():
- 执行预关闭回调:取消定时器 → 调用 wait_for_all_acked();
- wait_for_all_acked 阻塞最多 5 秒,检查订阅者 ACK 状态并打印日志;
- 预关闭回调执行完毕 → 节点关闭,程序退出。
2.6、应用场景与价值
这个示例的核心价值是 可靠消息投递的收尾机制,适用于以下场景:
- 工业机器人 / 自动驾驶:关键指令(如 “停止运动”“执行抓取”)必须确保订阅者(执行器)收到,节点关闭前需确认指令已被接收;
- 数据采集 / 日志记录:发布的关键数据(如传感器读数、故障日志)必须确保被订阅者(数据服务器)接收,避免节点退出导致数据丢失;
- 分布式系统 shutdown:大规模 ROS 2 系统关闭时,确保各节点间的最后一批消息被可靠接收,避免系统状态不一致。
2.7、小结
- 核心功能:实现 ROS 2 可靠模式发布者,并在节点关闭前等待所有订阅者确认消息,避免消息丢失;
- 关键技术:QoS::reliable() 可靠模式、wait_for_all_acked 等待确认、pre_shutdown_callback 预关闭回调;
- 核心设计:节点关闭前 “停发新消息 → 等旧消息确认 → 正常退出” 的三步流程,保证可靠通信的完整性。
这个示例是 ROS 2 可靠通信的典型实践,尤其适合对 “消息必达” 有严格要求的工业级、安全关键场景。
3、技术背景与应用场景
3.1、推出时间
该特性随 ROS 2 Humble 长期支持版正式可用,同时兼容 Rolling 开发版ROS。
3.2、核心原理
它是 ROS 2 发布者(Publisher)的阻塞接口,需配合 可靠(Reliable)QoS 模式 使用,作用是等待所有匹配的订阅者(Subscriber)确认收到消息,超时则返回ROS。
3.3、适用场景
- 安全关键场景(自动驾驶、工业自动化、医疗设备):确保停机 / 关闭前的最后一批关键指令(如停止、急停、复位)被可靠接收,避免系统状态不一致。
- 分布式系统收尾同步:多节点协作任务关闭时,保证已发消息全部落地,再执行退出流程,确保数据完整性。
- 强一致性数据传输:传感器数据、配置下发、日志记录等场景,需确认消息无丢失,尤其适合网络不稳定或高延迟环境。
- 业务流程卡点等待:执行下一步操作前,必须确认前置消息已接收(如任务启动指令、模式切换指令),保证流程顺序正确。
3.4、关键依赖与限制
- RMW 支持:需使用 rmw_fastrtps、rmw_connextdds 或 rmw_cyclonedds 实现ROS。
- QoS 要求:仅可靠模式(RELIABLE)生效,尽力而为(BEST_EFFORT)模式下直接返回 trueROS。
- 无订阅者时:自动返回 true,无需等待ROS。
3.5、小结
这是为解决可靠通信 “收尾确认” 问题设计的特性,核心价值是保证节点关闭 / 流程切换时,消息不丢失、状态一致,特别适合工业机器人、自动驾驶等对可靠性有严格要求的系统。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)