【ROS2】ROS 2 中 WaitSet(等待集)的简介与使用
·
【ROS2】ROS 2 中 WaitSet(等待集)的简介与使用
1、官方示例代码
#include <iostream>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
/* This example creates a subclass of Node and uses a wait-set based loop to wait on
* a subscription to have messages available and then handles them manually without an executor */
class WaitSetSubscriber : public rclcpp::Node
{
public:
explicit WaitSetSubscriber(rclcpp::NodeOptions options)
: Node("wait_set_subscriber", options)
{
rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
auto subscription_options = rclcpp::SubscriptionOptions();
subscription_options.callback_group = cb_group_waitset;
auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
};
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic",
10,
subscription_callback,
subscription_options);
wait_set_.add_subscription(subscription_);
thread_ = std::thread([this]() -> void {spin_wait_set();});
}
~WaitSetSubscriber()
{
if (thread_.joinable()) {
thread_.join();
}
}
void spin_wait_set()
{
while (rclcpp::ok()) {
// Wait for the subscriber event to trigger. Set a 1 ms margin to trigger a timeout.
const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));
switch (wait_result.kind()) {
case rclcpp::WaitResultKind::Ready:
{
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
if (subscription_->take(msg, msg_info)) {
std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
subscription_->handle_message(type_erased_msg, msg_info);
}
break;
}
case rclcpp::WaitResultKind::Timeout:
if (rclcpp::ok()) {
RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
}
break;
default:
RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
}
}
}
private:
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
rclcpp::WaitSet wait_set_;
std::thread thread_;
};
#include "rclcpp_components/register_node_macro.hpp"
RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)
2、代码解析
以上代码是 ROS 2 中基于 WaitSet(等待集)的手动消息处理示例,核心特点是不依赖 ROS 2 默认的 Executor(执行器),而是通过 WaitSet 主动等待订阅者的消息事件,手动接收并处理消息。这种方式能让开发者完全掌控消息处理的时机和线程模型,是 ROS 2 进阶编程中 “手动控制事件循环” 的典型实现。
2.1、整体功能总结
这个程序实现了一个 ROS 2 订阅者节点(wait_set_subscriber),核心逻辑:
- 创建独立的回调组和订阅者,将订阅者加入 WaitSet(等待集);
- 启动一个独立线程,在该线程中运行基于 WaitSet 的事件循环;
- 循环等待订阅者有消息可用(超时时间 501ms):
- 有消息时:手动 take(取出)消息,调用订阅者的回调函数处理;
- 超时无消息时:打印警告日志;
- 等待失败时:打印错误日志;
- 节点销毁时,等待线程退出,保证资源安全释放;
- 核心特性:脱离 ROS 2 内置的 spin()/spin_some() 执行器,完全手动控制消息的等待、接收和处理流程。
2.2、核心前置概念
在解析代码前,先理解 3 个关键概念(ROS 2 事件驱动核心):
- WaitSet(等待集):
- 本质:ROS 2 封装的事件等待机制,底层对接 DDS 的 WaitSet,可监听订阅者、定时器、服务、客户端等事件;
- 作用:主动阻塞等待 “目标事件触发”(如订阅者有消息),替代 Executor 的自动事件分发;
- 核心方法:add_subscription()(添加监听的订阅者)、wait()(阻塞等待事件,可设超时)。
- Executor(执行器):
- ROS 2 默认的事件处理机制(spin()/spin_some() 底层就是 Executor),自动监听事件、分发回调;
- 本示例完全绕过 Executor,用 WaitSet + 手动线程实现事件循环,灵活性更高但需手动处理所有逻辑。
- CallbackGroup(回调组):
- 用于管理回调函数的线程模型,示例中创建 MutuallyExclusive(互斥)回调组,保证同一时间只有一个回调执行;
- 这里的回调组主要是为订阅者绑定上下文,实际消息处理由手动线程完成。
2.3、逐模块代码解析
- 头文件引入(基础依赖)
#include <iostream> // 基础输入输出(示例中未直接使用,预留)
#include <memory> // 智能指针(SharedPtr/UniquePtr)
#include <thread> // 线程创建与管理(std::thread)
#include "rclcpp/rclcpp.hpp" // ROS 2 核心 API
#include "std_msgs/msg/string.hpp" // ROS 2 标准字符串消息
#include "rclcpp_components/register_node_macro.hpp" // 组件化注册宏(节点可编译为组件)
重点: 用于创建独立的事件循环线程;rclcpp_components/register_node_macro.hpp 支持将节点注册为 ROS 2 组件(可选特性)。
- 节点类定义(核心逻辑)
class WaitSetSubscriber : public rclcpp::Node
{
public:
// 构造函数:初始化节点、回调组、订阅者、WaitSet、事件线程
explicit WaitSetSubscriber(rclcpp::NodeOptions options)
: Node("wait_set_subscriber", options) // 节点名 + 自定义节点选项
{
// ========== 第一步:创建独立的回调组 ==========
// 类型:MutuallyExclusive(互斥),第二个参数 false 表示不自动添加到节点的默认执行器
rclcpp::CallbackGroup::SharedPtr cb_group_waitset = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
// ========== 第二步:配置订阅者选项(绑定回调组) ==========
auto subscription_options = rclcpp::SubscriptionOptions();
subscription_options.callback_group = cb_group_waitset; // 订阅者绑定到自定义回调组
// ========== 第三步:定义订阅者回调函数(Lambda) ==========
auto subscription_callback = [this](std_msgs::msg::String::UniquePtr msg) {
// 回调逻辑:打印收到的消息
RCLCPP_INFO(this->get_logger(), "I heard: '%s'", msg->data.c_str());
};
// ========== 第四步:创建订阅者 ==========
subscription_ = this->create_subscription<std_msgs::msg::String>(
"topic", // 订阅的话题名
10, // 队列大小
subscription_callback, // 消息回调函数
subscription_options // 自定义选项(绑定回调组)
);
// ========== 第五步:将订阅者加入 WaitSet(监听其消息事件) ==========
wait_set_.add_subscription(subscription_);
// ========== 第六步:启动独立线程运行 WaitSet 事件循环 ==========
// 线程执行 spin_wait_set() 方法,脱离主线程的 spin()
thread_ = std::thread([this]() -> void {spin_wait_set();});
}
// ========== 析构函数:安全释放线程资源 ==========
~WaitSetSubscriber()
{
// 检查线程是否可连接(避免重复 join),确保线程退出后再销毁节点
if (thread_.joinable()) {
thread_.join();
}
}
// ========== 核心方法:WaitSet 事件循环 ==========
void spin_wait_set()
{
// 循环条件:ROS 2 上下文正常(未调用 shutdown)
while (rclcpp::ok()) {
// 1. 阻塞等待订阅者事件,超时时间 501ms
// 若订阅者有消息,返回 Ready;超时返回 Timeout;失败返回 Error
const auto wait_result = wait_set_.wait(std::chrono::milliseconds(501));
// 2. 根据等待结果处理
switch (wait_result.kind()) {
// 情况 1:有消息可用(Ready)
case rclcpp::WaitResultKind::Ready:
{
// 定义存储消息的变量和消息信息(时间戳、发布者等)
std_msgs::msg::String msg;
rclcpp::MessageInfo msg_info;
// 手动 take(取出)消息:从订阅者队列中取出一条消息
// take 返回 bool:true 表示成功取出,false 表示队列空(理论上不会发生,因 WaitSet 已通知 Ready)
if (subscription_->take(msg, msg_info)) {
// 将消息封装为类型擦除的共享指针(适配 handle_message 的参数要求)
std::shared_ptr<void> type_erased_msg = std::make_shared<std_msgs::msg::String>(msg);
// 手动调用订阅者的回调函数处理消息
subscription_->handle_message(type_erased_msg, msg_info);
}
break;
}
// 情况 2:超时(Timeout)
case rclcpp::WaitResultKind::Timeout:
// 若 ROS 2 上下文仍正常,打印超时警告
if (rclcpp::ok()) {
RCLCPP_WARN(this->get_logger(), "Timeout. No message received after given wait-time");
}
break;
// 情况 3:等待失败(Error)
default:
RCLCPP_ERROR(this->get_logger(), "Error. Wait-set failed.");
}
}
}
private:
// ========== 成员变量 ==========
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_; // 订阅者智能指针
rclcpp::WaitSet wait_set_; // 等待集(监听订阅者事件)
std::thread thread_; // 事件循环线程
};
// ========== 组件化注册:将节点注册为 ROS 2 组件(可选) ==========
RCLCPP_COMPONENTS_REGISTER_NODE(WaitSetSubscriber)
- 核心逻辑补充说明
- take() vs callback:
- 普通订阅者由 Executor 自动调用 take() + handle_message();
- 本示例中,WaitSet 通知 “有消息” 后,手动调用 take() 取出消息,再调用 handle_message() 触发回调,完全复刻 Executor 的核心逻辑,但由开发者掌控时机。
- 线程安全:
- 事件循环运行在独立线程 thread_ 中,与主线程隔离;
- 析构函数中 join() 线程,避免节点销毁时线程仍在运行导致的资源泄漏。
- 超时设置 501ms:
- 预留 1ms 余量,避免与 500ms 等常见周期冲突,确保超时逻辑能稳定触发。
2.4、运行逻辑梳理(完整流程)
- 程序启动 → 初始化 ROS 2 上下文,创建 WaitSetSubscriber 节点;
- 节点构造函数执行:
- 创建回调组 → 配置订阅者 → 绑定到回调组 → 将订阅者加入 WaitSet;
- 启动独立线程,执行 spin_wait_set() 方法;
- 线程进入 spin_wait_set() 循环:
- 调用 wait_set_.wait(501ms) 阻塞等待订阅者消息;
- 若发布者向 topic 发布消息:
- WaitSet 检测到事件,返回 Ready;
- 手动 take() 取出消息,调用 handle_message() 触发回调,打印消息;
- 若 501ms 内无消息:
- WaitSet 返回 Timeout,打印超时警告;
- 若等待失败(如订阅者销毁):
- 返回 Error,打印错误日志;
- 用户按下 Ctrl+C → rclcpp::ok() 变为 false,循环退出;
- 节点析构 → join() 线程,释放资源,程序退出。
2.5、应用场景与价值
这个示例的核心价值是 脱离 Executor 手动控制事件循环,适用于以下场景:
- 自定义事件循环:
- 需要将 ROS 2 事件(消息、定时器)集成到外部事件循环(如 Qt 事件循环、工业控制循环);
- 需精确控制消息处理的时机(如每 100ms 批量处理一次消息,而非收到即处理)。
- 高性能 / 低延迟场景:
- 绕过 Executor 的封装,直接调用底层 take()/handle_message(),减少中间层开销;
- 对消息处理线程做精细化调度(如绑定到特定 CPU 核心)。
- 多事件协同等待:
- WaitSet 可同时监听多个订阅者、定时器、服务端事件,实现 “任意一个事件触发就处理” 的逻辑(如同时等待传感器消息和控制指令)。
- 嵌入式 / 资源受限系统:
- 替代笨重的 Executor,仅保留必要的 WaitSet 逻辑,减少内存 / CPU 占用。
- 调试 / 故障排查:
- 手动控制消息接收流程,便于断点调试(如在 take() 后暂停,检查消息内容)。
2.6、关键对比(WaitSet vs Executor)
| 维度 | WaitSet(手动) | Executor(自动,如 spin()) |
|---|---|---|
| 控制粒度 | 极细(完全掌控等待 / 接收 / 处理) | 较粗(自动分发,开发者仅写回调) |
| 灵活性 | 极高(可集成到任意事件循环) | 较低(依赖 ROS 2 内置逻辑) |
| 代码复杂度 | 高(需手动处理等待 / 错误 / 线程) | 低(一行 spin() 搞定) |
| 性能 | 略高(减少 Executor 封装开销) | 略低(额外封装层) |
| 适用场景 | 进阶 / 定制化需求 | 普通 / 快速开发需求 |
2.7、小结
- 核心功能:实现不依赖 Executor 的 ROS 2 订阅者,通过 WaitSet + 独立线程手动控制消息的等待、接收和处理;
- 关键技术:rclcpp::WaitSet 事件监听、take() 手动取消息、handle_message() 手动触发回调、独立线程管理;
- 核心价值:完全掌控消息处理流程,适用于自定义事件循环、高性能 / 低延迟、嵌入式系统等进阶场景;
- 学习重点:理解 ROS 2 事件驱动的底层逻辑(Executor 本质是 WaitSet + 自动循环),是 ROS 2 进阶开发的核心知识点。
这个示例是 ROS 2 从 “使用封装” 到 “理解底层” 的关键过渡,掌握它能深入理解 ROS 2 事件处理的本质。
3、技术背景与应用场景
ROS 2 WaitSet(等待集):推出时间与适用场景
3.1、推出时间与版本演进
WaitSet 是 ROS 2 从底层对接 DDS 标准的核心特性,其演进历程如下:
- 底层基础能力:随 ROS 2 首个正式版本 Ardent Apalone(2017 年 12 月) 引入,作为 rcl 层(ROS Client Library)的基础接口,直接封装 DDS 的 WaitSet 机制;
- rclcpp 层标准化:在 Foxy Fitzroy(2020 年 6 月) 版本完成 rclcpp::WaitSet 接口定型,提供易用的 C++ 封装(如 add_subscription()、wait() 等方法);
- 功能完善与稳定:
- Humble Hawksbill(2022 年 5 月,LTS):补充超时处理、多事件监听、错误码标准化,成为生产级可用特性;
- Iron Irwini(2023 年):优化性能,支持更多事件类型(如定时器、服务、客户端、GuardCondition);
- 兼容性:所有 ROS 2 版本(Ardent 及以后)均支持 WaitSet,无版本兼容限制,是 ROS 2 事件驱动的底层核心。
3.2、核心原理
WaitSet 是 ROS 2 最底层的事件等待机制,本质是:
- 开发者将需要监听的事件源(订阅者、定时器、服务端、客户端等)加入 WaitSet;
- 调用 wait() 方法阻塞当前线程,直到任意一个事件源触发(如订阅者有消息、定时器到期)或超时;
- 开发者手动处理触发的事件(如取出消息、执行定时器逻辑),完全掌控事件处理的时机和流程。
它是 ROS 2 内置 Executor(spin()/spin_some())的底层实现基础——Executor 本质就是 “WaitSet + 自动事件分发 + 线程管理” 的封装。
3.3、核心适用场景(按优先级)
- 自定义事件循环(最核心场景)
- 集成外部事件循环:将 ROS 2 事件(消息、定时器)融入非 ROS 原生的事件循环(如 Qt/QML 事件循环、工业控制周期循环、Unity/Unreal 游戏引擎循环);
- 批量事件处理:不 “收到消息即处理”,而是按固定周期(如 100ms)批量取出所有待处理消息 / 事件,适配高实时性的控制周期;
- 单线程多事件协同:在一个线程中同时监听 “传感器消息 + 控制指令 + 定时器”,任意一个事件触发即处理,避免多线程同步开销。
- 高性能 / 低延迟场景
- 嵌入式 / 边缘设备:绕过 Executor 的封装开销(如自动回调分发、线程池管理),直接调用底层 take() 取消息,减少 CPU / 内存占用;
- 硬实时系统:手动控制 WaitSet 的等待超时、事件处理时机,可绑定到特定 CPU 核心,满足微秒级延迟要求(如工业机器人运动控制);
- 高频消息处理:对激光雷达、图像等高频消息,通过 WaitSet 精准控制 “取消息→处理→反馈” 的全流程,避免 Executor 调度的不确定性。
- 精细化事件控制
- 选择性处理事件:监听多个订阅者,但仅处理 “优先级高” 的事件(如先处理急停指令,再处理普通传感器消息);
- 自定义超时逻辑:为不同事件设置差异化超时(如传感器消息超时 100ms 报警,控制指令超时 10ms 触发应急逻辑);
- 事件依赖处理:等待 “多个事件都触发” 后再处理(如同时收到 “传感器数据 + 校准指令” 才执行校准,通过多次 WaitSet 等待实现)。
- 多事件源统一监听
- 一个 WaitSet 可同时监听多种类型的事件源:订阅者(消息到达)、定时器(周期触发)、服务端(客户端请求)、客户端(服务响应)、GuardCondition(手动触发的条件);
- 典型场景:机器人 “待机状态” 下,同时监听 “唤醒指令(订阅者)、定时唤醒(定时器)、手动触发(GuardCondition)”,任意一个事件触发即退出待机。
- 调试 / 底层开发
- 深入理解 ROS 2 事件机制:通过 WaitSet 手动实现 Executor 的核心逻辑,理解 spin() 的底层原理;
- 精准调试事件流程:在 wait() 后断点,检查哪些事件触发、触发时机,定位 “消息丢失”“定时器延迟” 等底层问题;
- 自定义 Executor:基于 WaitSet 开发适配特定场景的 Executor(如单线程高优先级 Executor、批量处理 Executor)。
- 资源极度受限的场景
- 单片机、裸机移植的 ROS 2 Micro(RMW 为 MicroXRCE-DDS):仅保留 WaitSet 核心逻辑,无需加载完整 Executor,大幅降低资源占用。
3.4、不适用场景
- 普通快速开发:仅需简单订阅 / 发布消息,使用 spin()/spin_some() 更高效,无需手动编写 WaitSet 循环;
- 多线程并发处理:需要多个回调并行执行时,Executor 的线程池机制比手动管理 WaitSet + 多线程更简单;
- 对代码复杂度敏感的场景:WaitSet 需手动处理事件检测、消息取出、错误处理,代码量远大于 Executor。
3.5、与 Executor 的核心对比(补充)
| 维度 | WaitSet(手动) | Executor(自动,如 spin()) |
|---|---|---|
| 控制粒度 | 极细(底层事件级) | 较粗(回调级) |
| 灵活性 | 完全自定义(适配任意场景) | 固定逻辑(仅可配置线程数) |
| 代码复杂度 | 高(需手动处理等待 / 取消息 / 错误) | 低(仅需编写回调函数) |
| 性能 | 无封装开销(更高) | 有调度 / 分发开销(略低) |
| 学习成本 | 高(需理解 ROS 2 底层事件模型) | 低(开箱即用) |
3.6、小结
- 推出核心节点:2017 年随 ROS 2 首个版本引入底层能力,2020 年 Foxy 版本完成 C++ 接口标准化,2022 年 Humble 成为稳定的生产级特性;
- 核心定位:ROS 2 事件驱动的底层基石,Executor 是其 “封装后的易用版”;
- 最佳场景:自定义事件循环、高性能 / 低延迟系统、嵌入式 / 硬实时场景、多事件源统一监听,是 ROS 2 进阶开发(从 “使用” 到 “定制”)的核心工具。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)