引言:从 ROS1 到 ROS2,不只是 API 的变化

从 ROS1 迁移到 ROS2,很多开发者的第一感受是"API 变了、构建系统换了"。但真正的质变发生在更深的层面——整个通信架构被重新设计。ROS1 依赖中心化的 Master 节点来协调通信,所有节点启动前都要先"报到";而 ROS2 彻底抛弃了 Master,转而以 DDS(Data Distribution Service,数据分发服务)作为通信底座,实现了真正的分布式对等架构。 这种变化带来的不仅是"去中心化"的优雅,更引入了大量新的设计选择。很多开发者习惯"一把梭"用 Topic 解决所有问题,或者在 Service 回调里写死循环——在 ROS1 时代这也许只是"不太优雅",但在 ROS2 中,这可能直接导致节点假死、消息静默丢失,甚至系统级死锁。 核心问题:为什么我们需要区分 Topic、Service 和 Action?它们在 DDS 层面到底有何不同?QoS 策略又该如何配置才不至于"翻车"? 本文将剥开 rclcpp / rclpy 的外衣,带你从 DDS 视角重新审视 ROS2 的神经系统。

Topic(话题):流动的数据血液 核心概念

核心概念

Topic 是 ROS2 通信体系中最基础也最高频的机制,采用发布/订阅(Publish/Subscribe)模型。发布者持续向某个话题推送消息,订阅者静默接收——两者完全解耦,互不知道对方存在。可以把它想象成一个24 小时广播电台:电台只管播放节目,听众只管调频收听,电台不关心有多少人在听,听众也不关心电台的排班表。

底层原理(DDS 视角)

在 DDS 中间件层面,一个 ROS2 Topic 的通信链路是这样的:

Publisher Node  ──>  DDS DataWriter  ──(RTPS协议)──>  DDS DataReader  ──>  Subscriber Node
     topic: /cmd_vel                                        topic: /cmd_vel

发布节点内部创建一个 DDS DataWriter(数据写入器),订阅节点内部创建一个 DDS DataReader(数据读取器)。两者通过 DDS 的发现协议(Discovery Protocol)自动匹配——这也是 ROS2 去掉了 Master 的根本原因。每个节点启动时都会通过组播或单播向网络"宣告"自己的存在,其他节点发现它后自动建立连接,整个过程对开发者完全透明。

消息序列化方面,.msg 文件在编译时会被自动转换为 IDL(Interface Definition Language)定义,再由 DDS 实现(如 Fast DDS、Cyclone DDS)负责序列化和网络传输。这意味着不同语言编写的节点(C++、Python、Rust)可以无缝通信,因为底层走的是同一套 IDL 标准。

QoS:ROS2 相比 ROS1 最大的升级点

如果说 Topic 是 ROS2 的"血管",那 QoS(Quality of Service,服务质量)就是控制血液流速和纯度的"阀门"。这是 ROS2 相比 ROS1 最深刻的能力升级——ROS1 的通信只有一种模式(TCP + 可靠传输),而 ROS2 将传输策略的选择权交给了开发者。

QoS 由多个策略维度组合而成,最核心的三个:

Reliability(可靠性)决定了消息是否保证送达。Reliable 模式通过 RTPS 协议的 ACK/NACK 机制实现重传,确保每条消息都被接收端确认,适合控制指令和关键状态信息;BestEffort 模式则"尽力而为",发出去就不管了,适合激光雷达、摄像头等高频传感器数据——丢一两帧点云总比阻塞整个数据流要好。

Durability(持久性)决定了晚加入的订阅者能否收到历史消息。Volatile 模式只接收订阅建立之后的新消息,适合实时传感器流;TransientLocal 模式会为新订阅者"重放"最近的历史数据,适合地图、坐标系变换等"全局状态"类信息——想象一个新节点加入网络后,它需要立刻知道当前的 TF 树,而不是等下一次发布。

History(历史深度)决定了 DDS 在内存中缓存多少条消息。KeepLast(N) 保留最近 N 条,KeepAll 保留所有未确认消息。对于高频传感器,通常设置 KeepLast(1) 即可——你只关心最新的那一帧;对于需要可靠传输的控制指令,则需要更大的历史深度来应对网络抖动。

from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy

# 传感器数据的典型 QoS 配置:快速但可能丢包
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    durability=DurabilityPolicy.VOLATILE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1
)

# 控制指令的典型 QoS 配置:可靠且持久
control_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)

适用场景

Topic 天然适合持续性的单向数据流:传感器数据(雷达点云、相机图像、IMU 数据)、状态广播(机器人位姿、电池电量)、高频控制指令(速度命令 /cmd_vel)。它的核心优势是极低的通信延迟和极高的吞吐量

避坑提醒

丢包问题:ROS2 底层传输基于 UDP(通过 RTPS 协议),UDP 本身不保证可靠传输。当你发现传感器数据"偶尔丢一帧"时,首先检查 QoS 是否匹配——如果发布者用了 BestEffort,而你的订阅者期望 Reliable,消息会静默丢失,不会有任何报错。这就是所谓的"QoS 不兼容",是新手最常踩的坑。

频率匹配:当发布频率远高于订阅者的处理能力时,消息会在 DDS 的接收缓冲区中堆积,导致延迟增大甚至内存溢出。解决方案是合理设置 History 的 depth 值,并在订阅者侧做好流量控制——只保留最新的 N 条消息,丢弃过时的数据。

Service(服务):短平快的指令交互

核心概念

Service 采用请求/响应(Request/Response)模型。客户端发起请求,服务端处理后返回响应——像打了一通电话:你拨号、对方接听、你说完挂断。整个过程在客户端侧表现为同步阻塞(默认行为),客户端在收到响应前会一直等待。

底层原理(揭秘时刻)

这里有一个很多人不知道的冷知识:DDS 规范本身并不原生支持 RPC(远程过程调用)。ROS2 的 Service 是通过两对 DDS Topic 来"模拟"请求/响应语义的:

Client ──(publish)──> rq/{service_name}/Request  ──(subscribe)──> Server
Server ──(publish)──> rr/{service_name}/Reply    ──(subscribe)──> Client

当你调用一个 Service 时,底层发生了四件事:客户端向 rq/ 主题发布请求消息,服务端从 rq/ 主题订阅并接收请求,服务端处理完毕后向 rr/ 主题发布响应,客户端从 rr/ 主题订阅并接收响应。表面上看是"打电话",底层其实是"互发短信"。(实际的 DDS Topic 命名还包含命名空间前缀,形如 rq/{namespace}/{service_name}/Request,这里为简化示意。)

这意味着 Service 的"同步"行为并不是真正的同步——它是在客户端侧通过 Future/Promise 机制实现的。客户端发出请求后,底层会创建一个 Future 对象,然后通过 Executor 轮询等待响应到达。如果你在服务端回调中做了耗时操作(比如调用另一个 Service),就可能导致死锁——客户端在等响应,而服务端的 Executor 线程已经被占满了,新的回调永远排不上队。

为什么 Service 不适合长耗时任务

Service 的回调函数运行在节点的 Executor(执行器)中。默认情况下,ROS2 节点使用 SingleThreadedExecutor,即所有回调(包括 Timer 回调、Subscription 回调、Service 回调)都在同一个线程中串行执行。

如果你在 Service 回调中做了一个耗时 5 秒的操作,那么在这 5 秒内,该节点的所有其他回调都无法执行——订阅的消息会堆积,定时器会延迟,甚至心跳检测都会超时。这不是"性能问题",而是"功能性 Bug"。

适用场景

Service 最适合快速完成的短任务:逻辑开关触发(拍照、重置位置、切换模式)、参数配置更新(动态调参)、一次性查询(获取当前地图信息)。判断标准很简单:如果一个操作能在几百毫秒内完成,用 Service;如果需要更久,考虑 Action。

代码示例:定义 .srv 文件与 Server/Client 实现

# srv/TakePhoto.srv
# 请求字段:相机ID和分辨率
string camera_id
int32 resolution
---
# 响应字段:是否成功 + 图像路径
bool success
string image_path
# Service Server:拍照服务
import rclpy
from rclpy.node import Node
from my_interfaces.srv import TakePhoto

class TakePhotoServer(Node):
    def __init__(self):
        super().__init__('take_photo_server')
        # 创建Service,绑定回调
        self.srv = self.create_service(
            TakePhoto, 'take_photo', self.handle_request
        )

    def handle_request(self, request, response):
        """处理拍照请求——注意:此回调应快速返回!"""
        self.get_logger().info(f'收到请求:相机={request.camera_id}, '
                               f'分辨率={request.resolution}')
        # 模拟快速拍照(毫秒级操作,可以接受)
        response.success = True
        response.image_path = f'/tmp/{request.camera_id}_photo.jpg'
        return response
# Service Client:调用拍照服务
import rclpy
from rclpy.node import Node
from my_interfaces.srv import TakePhoto

class TakePhotoClient(Node):
    def __init__(self):
        super().__init__('take_photo_client')
        self.client = self.create_client(TakePhoto, 'take_photo')
        # 等待服务端就绪(带超时)
        while not self.client.wait_for_service(timeout_sec=2.0):
            self.get_logger().warn('等待 take_photo 服务...')

    def send_request(self):
        """异步发送请求,避免阻塞主线程"""
        request = TakePhoto.Request()
        request.camera_id = 'front_cam'
        request.resolution = 1080
        # 异步调用:返回Future,不阻塞当前线程
        future = self.client.call_async(request)
        future.add_done_callback(self._on_response)

    def _on_response(self, future):
        """响应回调"""
        response = future.result()
        if response.success:
            self.get_logger().info(f'拍照成功:{response.image_path}')
        else:
            self.get_logger().warn('拍照失败')

def main():
    rclpy.init()
    client = TakePhotoClient()
    client.send_request()
    rclpy.spin_once(client, timeout_sec=5.0)  # 等待响应
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Action(动作):长程任务的指挥官

核心概念

Action 专门为长时间运行的任务设计,它解决了 Service 的两个核心缺陷:无法反馈进度无法中途取消。你可以把它想象成点外卖——下单(Goal)、骑手位置实时更新(Feedback)、你可以随时取消订单(Cancel),最后收到外卖(Result)。

底层原理(三组 Topic 协同工作)

Action 的实现最为精巧,它在底层由 3 个 Service + 2 个 Topic协同工作:

组件

类型

作用

_action/send_goal

Service

客户端发送目标,服务端接受或拒绝

_action/cancel_goal

Service

客户端请求取消某个进行中的目标

_action/get_result

Service

客户端获取最终执行结果

_action/feedback

Topic

服务端周期性发布实时反馈

_action/status

Topic

服务端发布目标状态(接受/执行/完成/取消等)

Client                              Server
  │                                   │
  ├──── send_goal (Service) ─────────>│  "导航到坐标(3, 5)"
  │<─── goal_response ────────────────┤  "收到,开始执行"
  │                                   │
  │<═══ feedback (Topic) ═════════════┤  "当前位置(1, 2)..."
  │<═══ feedback (Topic) ═════════════┤  "当前位置(2, 4)..."
  │<═══ status (Topic) ═══════════════┤  "STATUS_EXECUTING"
  │                                   │
  ├──── cancel_goal (Service) ───────>│  "算了,取消"(可选)
  │                                   │
  ├──── get_result (Service) ────────>│  "给我最终结果"
  │<─── result ───────────────────────┤  "已到达 / 已取消"

Goal 状态机:Action 的灵魂

与 Topic 和 Service 的"无状态"不同,Action 内部维护了一个Goal 状态机,每个目标在其生命周期中会经历以下状态转换:

send_goal ──> goal_callback 判定
                 │
                 ├──(拒绝)──> STATUS_REJECTED(终态,不进入执行)
                 │
                 └──(接受)──> STATUS_ACCEPTED ──(开始执行)──> STATUS_EXECUTING
                                                                 │
                                                                 ├──── (成功) ────> STATUS_SUCCEEDED
                                                                 ├──── (失败) ────> STATUS_ABORTED
                                                                 └──── (取消) ────> STATUS_CANCELING
                                                                                       │
                                                                                       └──> STATUS_CANCELED

每个 Goal 通过客户端生成的 UUID 来唯一标识,这意味着一个 Action Server 可以同时处理多个并行的目标。当你发送取消请求时,正是通过这个 UUID 精准定位到要取消的那个任务。

status Topic 使用了 TRANSIENT_LOCAL 的 QoS 持久性策略——这意味着即使客户端在目标执行过程中重启,它重新订阅后仍能收到最新的状态信息,不会"失联"。这个设计看似微小,但在生产环境中极其重要:想象一个导航任务正在执行,你的 UI 节点意外重启,如果没有持久性,重启后你将完全不知道小车的导航状态。

适用场景

Action 专为需要进度反馈和取消能力的长任务设计:导航(MoveBase)、机械臂抓取(Pick & Place)、长时间的计算任务(地图构建、路径规划)、多步骤任务编排(巡检流程)。

全维度横向对比

对比维度

Topic

Service

Action

通信模型

单向(多对多)

双向(一对一)

双向(一对多)

同步/异步

纯异步

默认同步(可异步调用)

纯异步

实时反馈

天然支持(持续发布)

不支持

支持(Feedback 通道)

任务取消

不适用

不适用

支持(Cancel 机制)

底层 DDS 实现

1 对 DataWriter/DataReader

2 对 Topic(rq + rr)

3 个 Service + 2 个 Topic

状态管理

无状态

无状态

有状态(Goal 状态机)

典型延迟

微秒级

毫秒级

毫秒级(额外协议开销)

适用场景

传感器数据、控制指令

参数查询、模式切换

导航、抓取、长任务编排

编程复杂度

中-高

一句话选型口诀:持续广播用 Topic,一问一答用 Service,又长又复杂还要能取消就用 Action。 在这九个维度中,最值得关注的三个差异是:同步与异步、有无状态和底层实现复杂度。Topic 和 Service 都是无状态的"即发即忘"模式,调试起来相对直观;而 Action 内部维护了一个 Goal 状态机,这意味着你需要处理更多的状态分支和边界条件。选 Action 之前,先确认你的任务确实"够格"——如果只是毫秒级的快速查询,用 Service 远比 Action 轻量。

实战案例:用 Action 控制小车导航到指定坐标

以下以 Python 为例,展示一个小车导航到目标位置的完整 Action 流程。

1. 定义 Action 接口

# action/NavigateTo.action
# === Goal:目标坐标 ===
float64 target_x
float64 target_y
float64 max_speed
---
# === Result:导航结果 ===
bool success
float64 final_x
float64 final_y
string message
---
# === Feedback:实时反馈 ===
float64 current_x
float64 current_y
float64 distance_remaining  # 剩余距离
float64 progress            # 进度 0.0 ~ 1.0

2. 编写 Action Server(导航服务端)

import rclpy
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.node import Node
from my_interfaces.action import NavigateTo
import math

class NavigateToServer(Node):
    def __init__(self):
        super().__init__('navigate_to_server')
        # 当前位置(简化模拟)
        self._current_x = 0.0
        self._current_y = 0.0

        self._action_server = ActionServer(
            self,
            NavigateTo,
            'navigate_to',
            execute_callback=self.execute_callback,
            # 目标接收时的回调(决定是否接受)
            goal_callback=self.goal_callback,
            # 取消请求时的回调(决定是否允许取消)
            cancel_callback=self.cancel_callback,
        )

    def goal_callback(self, goal_request):
        """服务端决定是否接受这个目标"""
        if goal_request.max_speed <= 0:
            self.get_logger().warn('无效的速度参数,拒绝目标')
            return GoalResponse.REJECT
        return GoalResponse.ACCEPT

    def cancel_callback(self, goal_handle):
        """服务端决定是否允许取消"""
        self.get_logger().info('收到取消请求,允许取消')
        return CancelResponse.ACCEPT

    def execute_callback(self, goal_handle):
        """核心执行逻辑:模拟小车逐步移动"""
        self.get_logger().info(f'开始导航到 ({goal_handle.request.target_x}, '
                               f'{goal_handle.request.target_y})')

        target_x = goal_handle.request.target_x
        target_y = goal_handle.request.target_y
        total_distance = math.sqrt(
            (target_x - self._current_x)**2 +
            (target_y - self._current_y)**2
        )

        feedback_msg = NavigateTo.Feedback()
        # 使用rate控制循环频率,非阻塞,允许响应取消请求
        rate = self.create_rate(10.0)  # 10Hz 更新频率
        step = 0.1  # 每步移动比例

        progress = 0.0
        while progress < 1.0:
            # 每步检查取消请求
            if goal_handle.is_cancel_requested:
                result = NavigateTo.Result()
                result.success = False
                result.message = '导航已被用户取消'
                goal_handle.canceled()  # 先构建结果,再标记状态
                return result

            # 更新位置(简化模拟)
            progress = min(progress + step, 1.0)
            feedback_msg.current_x = self._current_x + (target_x - self._current_x) * progress
            feedback_msg.current_y = self._current_y + (target_y - self._current_y) * progress
            feedback_msg.distance_remaining = total_distance * (1.0 - progress)
            feedback_msg.progress = progress
            goal_handle.publish_feedback(feedback_msg)

            rate.sleep()

        # 导航成功
        goal_handle.succeed()
        result = NavigateTo.Result()
        result.success = True
        result.final_x = target_x
        result.final_y = target_y
        result.message = '已到达目标位置'
        self.get_logger().info(f'导航完成!到达 ({target_x}, {target_y})')
        return result

def main():
    rclpy.init()
    node = NavigateToServer()
    rclpy.spin(node)

if __name__ == '__main__':
    main()

3. 编写 Action Client(导航客户端)

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from my_interfaces.action import NavigateTo

class NavigateToClient(Node):
    def __init__(self):
        super().__init__('navigate_to_client')
        self._action_client = ActionClient(self, NavigateTo, 'navigate_to')
        self._goal_handle = None

    def send_goal(self, x: float, y: float, speed: float = 1.0):
        """发送导航目标"""
        goal_msg = NavigateTo.Goal()
        goal_msg.target_x = x
        goal_msg.target_y = y
        goal_msg.max_speed = speed

        self.get_logger().info(f'正在发送导航目标: ({x}, {y})')
        self._action_client.wait_for_server()

        # 异步发送,注册反馈回调
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self._on_feedback
        )
        self._send_goal_future.add_done_callback(self._on_goal_response)

    def _on_feedback(self, feedback_msg):
        """实时接收导航进度"""
        feedback = feedback_msg.feedback
        self.get_logger().info(
            f'进度: {feedback.progress*100:.0f}% | '
            f'位置: ({feedback.current_x:.2f}, {feedback.current_y:.2f}) | '
            f'剩余: {feedback.distance_remaining:.2f}m'
        )

    def _on_goal_response(self, future):
        """服务端回应(接受/拒绝)"""
        self._goal_handle = future.result()
        if not self._goal_handle.accepted:
            self.get_logger().warn('导航目标被服务端拒绝!')
            self._done = True
            return
        self.get_logger().info('导航目标已被接受,开始执行...')
        # 注册结果回调
        self._result_future = self._goal_handle.get_result_async()
        self._result_future.add_done_callback(self._on_result)

    def _on_result(self, future):
        """接收最终导航结果"""
        result = future.result().result
        if result.success:
            self.get_logger().info(f'{result.message} '
                                   f'最终位置: ({result.final_x:.2f}, {result.final_y:.2f})')
        else:
            self.get_logger().warn(f'导航失败: {result.message}')
        self._done = True

    def cancel_goal(self):
        """主动取消当前导航"""
        if self._goal_handle:
            self._goal_handle.cancel_goal_async()
            self.get_logger().info('已发送取消请求')

def main():
    rclpy.init()
    client = NavigateToClient()
    # 发送导航目标:移动到 (3.0, 5.0)
    client.send_goal(x=3.0, y=5.0, speed=1.5)
    while rclpy.ok() and not getattr(client, '_done', False):
        rclpy.spin_once(client, timeout_sec=0.5)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

注意一:QoS 不匹配是"沉默杀手"

这是新手最常踩的坑。如果发布者和订阅者的 QoS 策略不兼容,消息会静默丢失,不会有任何报错。最典型的案例:发布者用 BestEffort,订阅者用 Reliable,结果订阅者永远收不到数据,而控制台上一切正常——没有任何错误日志。

最佳实践:对于传感器数据(激光雷达、IMU、摄像头),统一使用 rclpy.qos.qos_profile_sensor_data(BestEffort + Volatile);对于控制指令和关键状态,统一使用 Reliable + TransientLocal。在团队协作中,建议在接口文档中明确标注每个 Topic 的 QoS 策略。ROS2 从 Humble 版本开始提供了 QoS 兼容性检测 API,善用它能在联调阶段省下大量排查时间。

注意二:Service 回调中禁止阻塞操作

Service 的回调函数运行在 Executor 的线程池中。如果你在回调里调用另一个 Service(同步等待),或者做了耗时的 I/O 操作,会直接阻塞 Executor。轻则后续回调排队延迟,重则整个节点"假死"。

最佳实践:如果服务端逻辑涉及耗时操作,使用 ReentrantCallbackGroup 或 MultiThreadedExecutor,让回调能在不同线程中并行执行。更好的方案是——当一个操作需要超过几百毫秒时,直接改用 Action。

from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

# 使用可重入回调组,避免Service回调阻塞
cb_group = ReentrantCallbackGroup()
self.srv = self.create_service(
    MySrv, 'my_service', self.callback,
    callback_group=cb_group  # 关键:指定回调组
)

# main() 中使用多线程执行器
executor = MultiThreadedExecutor()
executor.add_node(my_node)
executor.spin()

注意三:Action 的 Feedback 频率需要克制

很多开发者在 Action 的 Feedback 回调中发布了过于频繁的状态更新(比如每毫秒一次),导致网络带宽被大量小消息占满,反而拖慢了整体性能。在 DDS 层面,每条 Feedback 消息都要经历序列化、网络传输、反序列化的完整链路,高频发布会显著增加 CPU 和网络开销。

最佳实践:Feedback 的发布频率应根据实际需求设定,通常 10Hz~20Hz 足够覆盖绝大多数场景(包括导航和机械臂控制)。如果是高频控制回路(如力控、阻抗控制),那部分逻辑应该放在底层控制器中通过 Topic 通信,而不是通过 Action 的 Feedback 通道。记住:Feedback 是给人看的"进度条",不是给控制回路用的"反馈信号"。

应用结论

Topic、Service 和 Action 并非彼此替代的关系,而是各守其位的"三剑客"。Topic 负责数据流的自由流通,Service 负责简洁的同步交互,Action 负责复杂的异步任务编排。理解它们在 DDS 层面的实现差异,能帮你在遇到通信故障时快速定位问题——是 QoS 不兼容?是 Executor 阻塞?还是 Goal 状态机异常?

好的通信架构,不是选最"高级"的机制,而是为每个场景选最合适的工具。下次当你纠结"该用 Service 还是 Action"时,不妨问自己三个问题:这个操作需要多久?我需要中间进度吗?我需要能取消吗?三个问题答完,答案自然就出来了。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐