前置知识:ROS2 节点、Python 类基础
学习主线:理解话题通信 → 创建功能包 → 编写发布者 → 运行验证


📑 目录


1. 什么是话题通信

1.1 一句话定义

话题通信 = 发布者往"话题"上扔消息,订阅者从"话题"上接消息。双方互不认识,只认话题名。

1.2 生活类比:抖音直播间 🎬

角色 ROS2 术语 现实类比
主播 发布者(Publisher) 不停地输出内容
弹幕频道 话题(Topic) 直播间聊天区,谁都能在这发/看消息
观众 订阅者(Subscriber) 进入直播间就能看到弹幕,来去自由

关键特征:

  • 主播看不到观众是谁,只管发消息 → 解耦
  • 观众随时进入/退出 → 动态订阅
  • 多个观众同时看 → 一对多
  • 主播用 C++、观众用 Python → 跨语言通信

1.3 通信模型图

┌─────────────────┐                           ┌─────────────────┐
│   节点 A (Node)  │                           │   节点 B (Node)  │
│                 │       话题: /novel         │                 │
│  发布者 Publisher │ ═══════════════════════▶ │ 订阅者 Subscriber│
│                 │     消息类型: String       │                 │
│  "第一章..."     │                           │  收到: "第一章"   │
└─────────────────┘                           └─────────────────┘

1.4 三大特点

特点 含义
异步单向 发布者发完就继续干自己的事,不等待订阅者
多对多 一个话题可以有多个发布者和多个订阅者
双一致规则 话题名 + 消息类型必须一致,缺一个都连不上

本章聚焦发布端——只讲怎么"发"。订阅端后续章节再讲。


2. 话题发布的核心逻辑

2.1 发布者的职责

一个发布者节点只做三件事:

① 创建发布者    → 声明"我要往哪个话题发什么类型的消息"
② 构造消息      → 创建一条消息,填入数据
③ 发布消息      → 把消息推到话题上

2.2 三板斧(先记住骨架)

# 第一步:创建发布者
self.pub = self.create_publisher(消息类型, 话题名, 队列大小)

# 第二步:构造消息
msg = 消息类型()        # 实例化一条空消息
msg.字段 = 数据          # 填入内容

# 第三步:发布
self.pub.publish(msg)   # 推到话题上

2.3 案例场景

本章用一个有趣的案例来演示:从网上下载一本小说,然后一行一行通过话题发布出去。

网络小说 ──下载──▶ Queue 缓冲 ──定时取出──▶ String消息 ──publish()──▶ 话题 'novel'

3. 动手前准备:创建功能包

3.1 创建工作空间

mkdir -p ~/chapt3/topic_ws/src
cd ~/chapt3/topic_ws

3.2 创建功能包

ros2 pkg create demo_python_topic \
    --build-type ament_python \
    --dependencies rclpy std_msgs

参数说明:

参数 含义
demo_python_topic 功能包名
--build-type ament_python Python 包
--dependencies rclpy std_msgs 依赖的 ROS2 库

3.3 验证依赖

打开 package.xml,确认包含:

<depend>rclpy</depend>
<depend>std_msgs</depend>

3.4 放入节点文件

demo_python_topic/
├── demo_python_topic/
│   ├── __init__.py
│   └── novel_pub_node.py     ← 发布者代码放这里
├── setup.py
└── package.xml

4. 编写发布者节点:逐行拆解

以下是完整的发布者代码 novel_pub_node.py,我们逐段讲解。

4.1 完整代码一览

import rclpy
from rclpy.node import Node
import requests
import threading
from std_msgs.msg import String
from queue import Queue

class NovelPubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
        self.novels_queue_ = Queue()
        self.novel_publisher_ = self.create_publisher(String, 'novel', 10)
        self.timer_ = self.create_timer(5, self.timer_callback)

    def download_novel(self, url):
        def _download():
            try:
                response = requests.get(url, timeout=10)
                response.encoding = 'utf-8'
                self.get_logger().info(f'下载完成:{url}')
                for line in response.text.splitlines():
                    if line.strip():
                        self.novels_queue_.put(line)
            except requests.RequestException as e:
                self.get_logger().error(f'下载失败:{e}')
        threading.Thread(target=_download, daemon=True).start()

    def timer_callback(self):
        if not self.novels_queue_.empty():
            msg = String()
            msg.data = self.novels_queue_.get()
            self.novel_publisher_.publish(msg)
            self.get_logger().info(f'发布:{msg.data[:30]}...')

def main():
    rclpy.init()
    node = NovelPubNode('novel_pub')
    node.download_novel('http://localhost:8000/novel1.txt')
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

4.2 导入区 —— 引入武器库

import rclpy                         # ROS2 客户端库,提供 init、spin、shutdown
from rclpy.node import Node          # 节点基类,所有节点都要继承它
import requests                      # HTTP 请求库,用来下载小说
import threading                     # 多线程库,让下载不阻塞主循环
from std_msgs.msg import String      # ROS2 标准消息类型:字符串消息
from queue import Queue              # Python 内置队列,线程安全的数据缓冲
导入 属于 在本案例中的作用
rclpy ROS2 核心 初始化、进入循环、关闭 ROS2
Node ROS2 核心 节点基类,提供 create_publisher 等方法
requests Python 第三方 发送 HTTP GET 请求下载小说
threading Python 标准库 开启后台线程下载,不阻塞主循环
String ROS2 标准消息 消息类型,内含一个 data 字段(字符串)
Queue Python 标准库 线程安全队列,下载和发布之间的缓冲区

4.3 节点类定义 —— 继承 Node

class NovelPubNode(Node):
    def __init__(self, node_name):
        super().__init__(node_name)
代码 含义
class NovelPubNode(Node) 定义一个新类,继承 Node
super().__init__(node_name) 调用父类 Node 的初始化,传入节点名

这就是你之前学过的 ROS2 节点固定模板——继承 Nodesuper().__init__()


4.4 🔥 核心:创建发布者

self.novel_publisher_ = self.create_publisher(String, 'novel', 10)

三个参数逐个拆解:

参数位置 参数值 含义 类比
① 消息类型 String 规定消息长什么样 选一种信封格式
② 话题名 'novel' 频道名,订阅者靠这个对接 直播间房间号
③ 队列大小 10 订阅者处理不过来时最多缓存 10 条 快递柜只有 10 个格子

如何确认 String 里面有什么?

ros2 interface show std_msgs/msg/String
# 输出:string data

这说明 String 消息里只有一个字段叫 data,类型是 string。所以后面才能写 msg.data = "..."


4.5 创建定时器 —— 控制发布节奏

self.timer_ = self.create_timer(5, self.timer_callback)
参数 含义
5 5 秒触发一次
self.timer_callback 到时间就调用这个函数(注意:不加括号,传的是函数引用)

定时器让小说一行一行慢慢发,而不是一股脑全发出去。


4.6 创建队列 —— 下载和发布的缓冲区

self.novels_queue_ = Queue()

Queue 是 Python 标准库 queue 提供的线程安全队列。这里用它连接下载(快速)和发布(慢速)。


4.7 下载方法 —— 获取小说并切行入队

def download_novel(self, url):
    def _download():
        try:
            response = requests.get(url, timeout=10)
            response.encoding = 'utf-8'
            self.get_logger().info(f'下载完成:{url}')
            for line in response.text.splitlines():
                if line.strip():
                    self.novels_queue_.put(line)
        except requests.RequestException as e:
            self.get_logger().error(f'下载失败:{e}')
    threading.Thread(target=_download, daemon=True).start()

逐层拆解:

代码行 解释
def _download(): 内部函数,真正干下载活的
requests.get(url, timeout=10) 发送 HTTP GET 请求,最多等 10 秒
response.encoding = 'utf-8' 设置编码,防止中文乱码
response.text.splitlines() 把整本小说按行切成列表
for line in ...: 逐行遍历(详见 5.1 节
if line.strip(): 跳过空行
self.novels_queue_.put(line) 把这一行放入队列(详见 5.2 节
try/except 下载失败不会崩溃,打印错误日志
threading.Thread(target=_download, daemon=True).start() 后台线程下载,不阻塞 ROS2 主循环

为什么用多线程?requests.get() 是阻塞 I/O——下载完成前代码卡住不动。开新线程下载,主循环能立刻进入 spin() 状态。


4.8 定时回调 —— 从队列取一行并发布

def timer_callback(self):
    if not self.novels_queue_.empty():
        msg = String()
        msg.data = self.novels_queue_.get()
        self.novel_publisher_.publish(msg)
        self.get_logger().info(f'发布:{msg.data[:30]}...')

逐行拆解(话题通信三板斧的核心):

代码 属于哪一步 解释
msg = String() ② 构造消息 实例化一条空消息(详见 5.3 节
msg.data = self.novels_queue_.get() ② 构造消息 从队列取一行,填入消息的 data 字段
self.novel_publisher_.publish(msg) ③ 发布 🔥 把消息推到 novel 话题上!

消息生命周期图示:

String()          →   msg.data = "第一章..."   →   publish(msg)
(空信封诞生)          (信封里塞进内容)              (扔进话题)

4.9 main 函数 —— 启动节点

def main():
    rclpy.init()                                              # ① 初始化 ROS2
    node = NovelPubNode('novel_pub')                          # ② 实例化节点
    node.download_novel('http://localhost:8000/novel1.txt')   # ③ 触发下载
    rclpy.spin(node)                                          # ④ 进入循环
    rclpy.shutdown()                                          # ⑤ 退出清理

if __name__ == '__main__':
    main()
步骤 代码 作用
rclpy.init() 初始化 ROS2 客户端库
NovelPubNode('novel_pub') 创建节点实例,名字叫 novel_pub
node.download_novel(...) 后台线程下载小说
rclpy.spin(node) 让节点持续运行,定时器才能循环触发
rclpy.shutdown() Ctrl+C 后清理资源

if __name__ == '__main__':Python 文件被直接运行时才执行 main(),被 import 时不执行。


5. 案例中用到的 Python 语法

5.1 for 循环

语法骨架:

for 变量名 in 可迭代对象:
    循环体(缩进)

关键规则:

规则 说明
变量名不需要提前定义 for 自动创建并逐轮赋值
in 后面不一定是列表 字符串、range、文件都行——只要"能挨个掏东西"
冒号不能忘 for x in y: 冒号丢了直接报错
缩进决定循环体范围 不缩进的代码只执行一次

案例中的用法:

for line in response.text.splitlines():
    self.novels_queue_.put(line)
代码 翻译
for 循环开始
line 变量名(自动创建),代表当前这一行
in 在…里面
response.text.splitlines() 按行切好的列表
self.novels_queue_.put(line) 把这一行放进队列

5.2 队列操作

Queue 来自 Python 标准库 queuefrom queue import Queue)。

核心规则:FIFO(First In First Out)——先进先出,不插队。

入队方向 →             出队方向 →
   put()                  get()
    │                      ▲
    ▼                      │
 ┌────┬────┬────┬────┬────┐
 │ 新 │    │    │    │ 旧 │
 └────┴────┴────┴────┴────┘
  队尾                  队头

常用方法:

方法 作用
Queue() 创建空队列
put(item) 入队:把 item 加到队尾
get() 出队:从队头取一个元素,取完就没了
qsize() 查看队列里还剩几个
empty() 判断是否为空(True/False

案例中的应用:

download_novel()  ──put()──▶  [Queue 缓冲]  ──get()──▶  timer_callback() → publish()
   (快速下载)                    (缓冲)                (每5秒取一条)

为什么用队列而不是列表?

队列 Queue 普通列表 list
✅ 线程安全,自带锁 ❌ 多线程同时操作会冲突
✅ 生产者和消费者解耦 ❌ 耦合在一起
get() 可阻塞等待 ❌ 空了只能写 while 轮询(浪费 CPU)

5.3 实例化

类(class)是"饼干模具",实例化就是用模具"压出一个具体的东西"。

msg = String()
#    ~~~~~~~~
#    String 是模具(类)
#    String() 是压面团的动作(实例化)
#    msg 是压出来的饼干(实例)
术语 类比 代码
类(模具) 🧇 星星模具 String
实例化(动作) 往面团上一按 String()
实例(结果) ⭐ 压出的星星饼干 msg

一个模具可以压无数个:

msg1 = String()    # 饼干1
msg2 = String()    # 饼干2
msg3 = String()    # 饼干3
# 都是同一个模具,但各自独立

5.4 splitlines() —— 按行切分

text = "第一章 初入江湖\n第二章 剑指苍穹\n第三章 华山论剑"
lines = text.splitlines()
# → ['第一章 初入江湖', '第二章 剑指苍穹', '第三章 华山论剑']
输入 输出
\n 的大字符串 列表,每个元素一行(换行符被去掉)

6. 案例中用到的库

6.1 rclpy(ROS2 Python 客户端库)

ROS2 的 Python 接口库,所有 ROS2 Python 节点的基石。

函数/类 作用
rclpy.init() 初始化 ROS2
rclpy.spin(node) 让节点进入循环,等待回调触发
rclpy.shutdown() 清理资源,退出 ROS2

6.2 rclpy.node.Node(节点基类)

所有 ROS2 Python 节点必须继承它。提供的方法:

方法 作用
create_publisher(类型, 话题, 队列) 创建发布者
create_subscription(类型, 话题, 回调, 队列) 创建订阅者
create_timer(间隔, 回调) 创建定时器
get_logger().info(...) 打印日志

6.3 std_msgs.msg.String(标准字符串消息)

ROS2 内置的消息类型。使用前先查看结构:

ros2 interface show std_msgs/msg/String
# 输出:string data

只有一个字段 data,类型是 string

6.4 requests(HTTP 请求库)

Python 第三方库,用于发送 HTTP 请求。

response = requests.get(url, timeout=10)   # GET 请求
response.text                              # 响应正文(字符串)
response.encoding = 'utf-8'               # 设置编码

6.5 queue.Queue(线程安全队列)

Python 标准库,提供 FIFO 队列,多线程环境下安全使用。

6.6 threading(多线程库)

Python 标准库,用于创建和管理线程。

threading.Thread(target=函数, daemon=True).start()
参数 含义
target=函数 线程要执行的函数(传引用,不加括号)
daemon=True 守护线程——主线程退出时自动回收

7. 配置、构建与运行

7.1 配置 setup.py

在功能包根目录的 setup.py 中添加入口点:

entry_points={
    'console_scripts': [
        'novel_pub = demo_python_topic.novel_pub_node:main',
    ],
},
字段 含义
novel_pub 终端里敲的命令名
demo_python_topic.novel_pub_node Python 文件路径(不含 .py
main 文件中的入口函数名

7.2 构建

cd ~/chapt3/topic_ws
colcon build --packages-select demo_python_topic

7.3 改变环境变量

source install/setup.bash

每次新开终端都要 source 一次。或者追加到 ~/.bashrc 一劳永逸。

7.4 启动本地 HTTP 服务器

# 先准备一个小说文件 novel1.txt,放在某个目录
cd ~/novels/
python3 -m http.server 8000 &

7.5 运行

ros2 run demo_python_topic novel_pub

输出示例:

[INFO] 下载完成:http://localhost:8000/novel1.txt
[INFO] 发布:第一章 初入江湖...
[INFO] 发布:第二章 剑指苍穹...
[INFO] 发布:第三章 华山论剑...

8. 终端验证与调试

# ① 查看所有话题
ros2 topic list
# 输出应包含 /novel

# ② 查看话题详情
ros2 topic info /novel
# 输出:Type: std_msgs/msg/String
#       Publisher count: 1

# ③ 实时偷看话题上的数据
ros2 topic echo /novel
# 每5秒刷出一行小说

# ④ 手动发布一条消息(不写代码也能发)
ros2 topic pub /novel std_msgs/msg/String "data: '手动发的测试消息'"

# ⑤ 查看 String 的结构
ros2 interface show std_msgs/msg/String
# 输出:string data

9. 发布者最小模板

剥离所有业务逻辑,发布者的最小骨架:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MyPublisher(Node):
    def __init__(self):
        super().__init__('my_publisher')
        self.pub = self.create_publisher(String, 'my_topic', 10)    # ① 创建发布者
        self.timer = self.create_timer(1.0, self.callback)

    def callback(self):
        msg = String()            # ② 构造消息
        msg.data = 'Hello ROS2'
        self.pub.publish(msg)     # ③ 发布
        self.get_logger().info(f'发布:{msg.data}')

def main():
    rclpy.init()
    rclpy.spin(MyPublisher())
    rclpy.shutdown()

if __name__ == '__main__':
    main()

三板斧对照:

步骤 代码
① 创建发布者 self.create_publisher(String, 'my_topic', 10)
② 构造消息 msg = String()msg.data = ...
③ 发布 self.pub.publish(msg)

10. 总结

本章核心收获

话题通信的本质:发布-订阅模式,双方通过"话题名+消息类型"对接。

发布者只做三件事:
    ① create_publisher  →  声明"我在哪个频道发什么类型的消息"
    ② msg = String()    →  实例化消息 + 填数据
    ③ publish(msg)      →  推到话题上

案例把这三步嵌入了"下载小说 → 队列缓冲 → 定时发布"的完整流程中。

数据流全景图

┌──────────────────────────────────────────────────────┐
│                  NovelPubNode 节点                    │
│                                                      │
│  download_novel(url)          timer_callback()        │
│     │                              ▲                  │
│     │ requests.get()               │ 每5秒触发        │
│     │ splitlines() 切行             │                  │
│     ▼                              │                  │
│  ┌──────────┐              ┌───────┴──────┐          │
│  │ Queue    │              │ msg=String()  │          │
│  │ 缓冲     │  ──get()──▶  │ msg.data=...  │          │
│  │ put(line)│              │ publish(msg)  │──────────▶
│  └──────────┘              └──────────────┘          │
└──────────────────────────────────────────────────────┘
                                                       │
                                                ┌──────▼──────┐
                                                │  话题 /novel  │
                                                │  类型 String  │
                                                └──────────────┘

下一步

学完发布,下一步就是订阅——用 create_subscription 在另一个节点接收这些小说行。然后两个终端同时跑,就能看到"主播发弹幕、观众收弹幕"的完整效果。


术语速查表

术语 含义 一句话
话题(Topic) 通信的频道名 发布者和订阅者之间的"暗号"
消息(Message) 数据的封装格式 通信的"信封",规定里面有什么字段
发布者(Publisher) 往话题发消息的节点 主播,只管输出
实例化 用类创建一个具体对象 用饼干模具压出一块饼干
FIFO First In First Out 先进先出,不许插队
可迭代对象 能挨个掏东西出来的类型 for x in ...in 后面能放的
回调函数 事件发生时自动调用的函数 定时器到期、收到消息时触发
守护线程 daemon=True 主线程退出时自动回收的后台线程
QoS 服务质量(队列大小等) create_publisher 第三个参数
splitlines() 按换行符切分字符串 把一整块文字切成一行一行的列表
colcon build ROS2 构建命令 编译所有功能包
source setup.bash 加载环境变量 让终端能识别你包里的节点
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐