1、示例代码

#include <chrono>
#include <functional>
#include <cinttypes>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

using namespace std::chrono_literals;

/* This example shows how to use wait_for_all_acked for the publisher */
class MinimalPublisher : public rclcpp::Node
{
public:
  MinimalPublisher()
  : Node("minimal_publisher_with_wait_for_all_acked"),
    count_(0),
    wait_timeout_(5000)
  {
    // publisher must set reliable mode
    publisher_ = this->create_publisher<std_msgs::msg::String>(
      "topic",
      rclcpp::QoS(10).reliable());

    // call wait_for_all_acked before shutdown
    using rclcpp::contexts::get_global_default_context;
    get_global_default_context()->add_pre_shutdown_callback(
      [this]() {
        this->timer_->cancel();
        this->wait_for_all_acked();
      });

    timer_ = this->create_wall_timer(
      500ms, std::bind(&MinimalPublisher::timer_callback, this));
  }

private:
  void wait_for_all_acked()
  {
    // Confirm all subscribers receive sent messages.
    // Note that if no subscription is connected, wait_for_all_acked() always return true.
    if (publisher_->wait_for_all_acked(wait_timeout_)) {
      RCLCPP_INFO(
        this->get_logger(),
        "All subscribers acknowledge messages");
    } else {
      RCLCPP_INFO(
        this->get_logger(),
        "Not all subscribers acknowledge messages during %" PRId64 " ms",
        static_cast<int64_t>(wait_timeout_.count()));
    }
  }

  void timer_callback()
  {
    auto message = std_msgs::msg::String();
    message.data = "Hello, world! " + std::to_string(count_++);
    RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
    publisher_->publish(message);

    // After sending some messages, you can call wait_for_all_acked() to confirm all subscribers
    // acknowledge messages.
  }
  rclcpp::TimerBase::SharedPtr timer_;
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
  size_t count_;
  std::chrono::milliseconds wait_timeout_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);

  auto publisher = std::make_shared<MinimalPublisher>();
  rclcpp::spin(publisher);
  rclcpp::shutdown();

  return 0;
}

2、代码解析

以上代码是 ROS 2 中带 wait_for_all_acked 机制的发布者示例,核心目的是演示如何确保 “所有订阅者都确认收到发布的消息”(消息可靠投递),并在节点关闭前等待订阅者的确认反馈。本文将从整体功能、核心概念、逐模块解析、运行逻辑四个维度,对每个细节和底层原理进行讲解。

2.1、整体功能总结

这个程序实现了一个 ROS 2 节点(minimal_publisher_with_wait_for_all_acked):

  1. 以 500ms 为周期向 topic 话题发布字符串消息(Hello, world! + 计数);
  2. 发布者配置为 可靠模式(Reliable)(保证消息必达,而非尽力而为);
  3. 注册 “预关闭回调函数”:节点关闭前先取消定时器,再调用 wait_for_all_acked 等待所有订阅者确认收到消息;
  4. wait_for_all_acked 会阻塞最多 5000ms,检查是否所有订阅者都确认了消息:
  • 若超时前所有订阅者确认 → 打印成功日志;
  • 若超时仍有订阅者未确认 → 打印超时日志;
  1. 核心特性:确保节点关闭前,已发布的消息被所有订阅者可靠接收(避免节点退出导致消息丢失)。

2.2、核心前置概念

在解析代码前,先理解两个关键概念(ROS 2 可靠通信的核心):

  1. QoS 可靠模式(Reliable):
  • ROS 2 消息投递有两种模式:Reliable(可靠)和 Best Effort(尽力而为);
  • Reliable 模式下,发布者会持续重发消息,直到订阅者返回 “确认(ACK)”,保证消息必达;
  • Best Effort 模式下,发布者只发一次消息,不关心订阅者是否收到(适合实时性要求高于可靠性的场景,如传感器流)。
  1. wait_for_all_acked:
  • ROS 2 发布者的核心接口,作用是 “阻塞等待,直到所有已连接的订阅者都确认收到了发布的所有消息”;
  • 入参是超时时间:若超时前所有订阅者确认 → 返回 true;否则返回 false;
  • 特殊情况:若没有订阅者连接 → 直接返回 true(无订阅者无需确认)。

2.3、逐模块代码解析

  1. 头文件引入(基础依赖)
#include <chrono>       // 时间相关(500ms/5000ms 超时)
#include <functional>   // 函数绑定(std::bind)、Lambda 回调
#include <cinttypes>    // 格式化输出宏(PRId64,用于int64_t打印)
#include <memory>       // 智能指针(SharedPtr)
#include <string>       // 字符串处理

#include "rclcpp/rclcpp.hpp"        // ROS 2 核心 API
#include "std_msgs/msg/string.hpp"  // ROS 2 标准字符串消息

重点:cinttypes 是为了兼容不同平台的 int64_t 打印(PRId64 宏),避免直接用 %lld 等平台相关格式符。

  1. 时间字面量命名空间
using namespace std::chrono_literals;

启用 C++14 时间字面量(如 500ms),简化定时器周期和超时时间的写法。

  1. 发布者类定义(MinimalPublisher)
class MinimalPublisher : public rclcpp::Node
{
public:
  // 构造函数:初始化节点、计数器、超时时间、发布者、回调、定时器
  MinimalPublisher()
  : Node("minimal_publisher_with_wait_for_all_acked"),  // 节点名
    count_(0),                                          // 消息计数器初始化
    wait_timeout_(5000)                                 // 等待确认的超时时间:5000ms(5秒)
  {
    // ========== 第一步:创建可靠模式的发布者 ==========
    // QoS配置:队列大小10 + 可靠模式(reliable())
    publisher_ = this->create_publisher<std_msgs::msg::String>(
      "topic",                  // 话题名
      rclcpp::QoS(10).reliable() // QoS策略:可靠模式(必须!wait_for_all_acked仅对可靠模式生效)
    );

    // ========== 第二步:注册预关闭回调函数 ==========
    // 作用:节点关闭(rclcpp::shutdown())前,先执行这个Lambda函数
    using rclcpp::contexts::get_global_default_context;
    get_global_default_context()->add_pre_shutdown_callback(
      [this]() {  // Lambda表达式,捕获当前节点的this指针
        this->timer_->cancel();       // 取消定时器,停止发布新消息
        this->wait_for_all_acked();   // 等待所有订阅者确认已收到的消息
      });

    // ========== 第三步:创建定时器(500ms周期发布消息) ==========
    timer_ = this->create_wall_timer(
      500ms, std::bind(&MinimalPublisher::timer_callback, this));
  }

private:
  // ========== 核心方法:等待所有订阅者确认消息 ==========
  void wait_for_all_acked()
  {
    // 调用发布者的wait_for_all_acked,阻塞最多wait_timeout_(5000ms)
    if (publisher_->wait_for_all_acked(wait_timeout_)) {
      // 成功:所有订阅者都确认了消息
      RCLCPP_INFO(
        this->get_logger(),
        "All subscribers acknowledge messages");
    } else {
      // 失败:超时,部分订阅者未确认
      RCLCPP_INFO(
        this->get_logger(),
        "Not all subscribers acknowledge messages during %" PRId64 " ms",
        static_cast<int64_t>(wait_timeout_.count())); // 打印超时时间(ms)
    }
    // 注:PRId64是cinttypes的宏,用于安全打印int64_t类型(wait_timeout_.count()返回milliseconds::rep,即int64_t)
  }

  // ========== 定时器回调函数:发布消息 ==========
  void timer_callback()
  {
    // 1. 创建ROS字符串消息
    auto message = std_msgs::msg::String();
    // 2. 拼接消息内容:Hello, world! + 计数器
    message.data = "Hello, world! " + std::to_string(count_++);
    // 3. 打印发布日志
    RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
    // 4. 发布消息(可靠模式下,会等待订阅者ACK,若未收到则重发)
    publisher_->publish(message);

    // 注释说明:也可以在发布若干消息后,主动调用wait_for_all_acked(比如发布10条后检查一次)
    // After sending some messages, you can call wait_for_all_acked() to confirm all subscribers
    // acknowledge messages.
  }

  // ========== 成员变量 ==========
  rclcpp::TimerBase::SharedPtr timer_;                          // 定时器智能指针
  rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;// 发布者智能指针
  size_t count_;                                                // 消息计数器(无符号整数)
  std::chrono::milliseconds wait_timeout_;                      // 等待确认的超时时间(5000ms)
};
  1. 主函数(程序入口)
int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);  // 初始化ROS 2上下文

  auto publisher = std::make_shared<MinimalPublisher>(); // 创建发布者节点
  rclcpp::spin(publisher);                               // 自旋:阻塞等待回调(定时器/关闭回调)
  rclcpp::shutdown();                                    // 关闭ROS 2上下文(触发预关闭回调)

  return 0;
}

核心逻辑:rclcpp::shutdown() 会触发之前注册的 pre_shutdown_callback,先取消定时器,再等待订阅者确认消息,最后关闭节点。

2.4、核心细节与关键考点

  1. 为什么必须配置 reliable() 模式?
  • wait_for_all_acked 仅对 可靠模式 的发布者生效;
  • 若用 best_effort() 模式,wait_for_all_acked 会直接返回 true(尽力而为模式无 ACK 机制,无需等待)。
  1. 预关闭回调的作用(核心设计)
  • 节点关闭时(如用户按 Ctrl+C),若不调用 wait_for_all_acked,已发布但未被订阅者确认的消息会丢失;
  • 预关闭回调确保:节点关闭前,先停止发布新消息,再等待订阅者确认已发消息,避免消息丢失。
  1. wait_timeout_ 的设计思路
  • 超时时间设为 5000ms(5 秒):既给订阅者足够的确认时间,又避免节点无限阻塞(比如订阅者崩溃 / 断网);
  • 超时后节点仍会正常关闭,只是打印 “未全部确认” 的日志,不影响程序退出。
  1. PRId64 的作用
  • wait_timeout_.count() 返回 std::chrono::milliseconds::rep 类型,本质是 int64_t;
  • 不同平台对 int64_t 的打印格式不同(Linux 是 %lld,Windows 是 %I64d),PRId64 是跨平台的宏,确保格式化输出正确。

2.5、运行逻辑梳理(完整流程)

  1. 程序启动 → main 函数初始化 ROS 2,创建发布者节点;
  2. 节点构造函数执行:
  • 创建可靠模式的发布者(topic 话题);
  • 注册预关闭回调(关闭前取消定时器 + 等待 ACK);
  • 创建 500ms 定时器,开始发布消息;
  1. 节点进入自旋,定时器每 500ms 触发一次:
  • 发布 Hello, world! + 计数 消息;
  • 可靠模式下,发布者会等待订阅者 ACK,未收到则重发;
  1. 用户按下 Ctrl+C → 触发 rclcpp::shutdown():
  • 执行预关闭回调:取消定时器 → 调用 wait_for_all_acked();
  • wait_for_all_acked 阻塞最多 5 秒,检查订阅者 ACK 状态并打印日志;
  1. 预关闭回调执行完毕 → 节点关闭,程序退出。

2.6、应用场景与价值

这个示例的核心价值是 可靠消息投递的收尾机制,适用于以下场景:

  1. 工业机器人 / 自动驾驶:关键指令(如 “停止运动”“执行抓取”)必须确保订阅者(执行器)收到,节点关闭前需确认指令已被接收;
  2. 数据采集 / 日志记录:发布的关键数据(如传感器读数、故障日志)必须确保被订阅者(数据服务器)接收,避免节点退出导致数据丢失;
  3. 分布式系统 shutdown:大规模 ROS 2 系统关闭时,确保各节点间的最后一批消息被可靠接收,避免系统状态不一致。

2.7、小结

  1. 核心功能:实现 ROS 2 可靠模式发布者,并在节点关闭前等待所有订阅者确认消息,避免消息丢失;
  2. 关键技术:QoS::reliable() 可靠模式、wait_for_all_acked 等待确认、pre_shutdown_callback 预关闭回调;
  3. 核心设计:节点关闭前 “停发新消息 → 等旧消息确认 → 正常退出” 的三步流程,保证可靠通信的完整性。

这个示例是 ROS 2 可靠通信的典型实践,尤其适合对 “消息必达” 有严格要求的工业级、安全关键场景。

3、技术背景与应用场景

3.1、推出时间

该特性随 ROS 2 Humble 长期支持版正式可用,同时兼容 Rolling 开发版ROS。

3.2、核心原理

它是 ROS 2 发布者(Publisher)的阻塞接口,需配合 可靠(Reliable)QoS 模式 使用,作用是等待所有匹配的订阅者(Subscriber)确认收到消息,超时则返回ROS。

3.3、适用场景

  1. 安全关键场景(自动驾驶、工业自动化、医疗设备):确保停机 / 关闭前的最后一批关键指令(如停止、急停、复位)被可靠接收,避免系统状态不一致。
  2. 分布式系统收尾同步:多节点协作任务关闭时,保证已发消息全部落地,再执行退出流程,确保数据完整性。
  3. 强一致性数据传输:传感器数据、配置下发、日志记录等场景,需确认消息无丢失,尤其适合网络不稳定或高延迟环境。
  4. 业务流程卡点等待:执行下一步操作前,必须确认前置消息已接收(如任务启动指令、模式切换指令),保证流程顺序正确。

3.4、关键依赖与限制

  • RMW 支持:需使用 rmw_fastrtps、rmw_connextdds 或 rmw_cyclonedds 实现ROS。
  • QoS 要求:仅可靠模式(RELIABLE)生效,尽力而为(BEST_EFFORT)模式下直接返回 trueROS。
  • 无订阅者时:自动返回 true,无需等待ROS。

3.5、小结

这是为解决可靠通信 “收尾确认” 问题设计的特性,核心价值是保证节点关闭 / 流程切换时,消息不丢失、状态一致,特别适合工业机器人、自动驾驶等对可靠性有严格要求的系统。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐