话题使用第一章:如何发布话题
前置知识:ROS2 节点、Python 类基础
学习主线:理解话题通信 → 创建功能包 → 编写发布者 → 运行验证
📑 目录
- 1. 什么是话题通信
- 2. 话题发布的核心逻辑
- 3. 动手前准备:创建功能包
- 4. 编写发布者节点:逐行拆解
- 5. 案例中用到的 Python 语法
- 6. 案例中用到的库
- 7. 配置、构建与运行
- 8. 终端验证与调试
- 9. 发布者最小模板
- 10. 总结
- 术语速查表
1. 什么是话题通信
1.1 一句话定义
话题通信 = 发布者往"话题"上扔消息,订阅者从"话题"上接消息。双方互不认识,只认话题名。
1.2 生活类比:抖音直播间 🎬
| 角色 | ROS2 术语 | 现实类比 |
|---|---|---|
| 主播 | 发布者(Publisher) | 不停地输出内容 |
| 弹幕频道 | 话题(Topic) | 直播间聊天区,谁都能在这发/看消息 |
| 观众 | 订阅者(Subscriber) | 进入直播间就能看到弹幕,来去自由 |
关键特征:
- 主播看不到观众是谁,只管发消息 → 解耦
- 观众随时进入/退出 → 动态订阅
- 多个观众同时看 → 一对多
- 主播用 C++、观众用 Python → 跨语言通信
1.3 通信模型图
┌─────────────────┐ ┌─────────────────┐
│ 节点 A (Node) │ │ 节点 B (Node) │
│ │ 话题: /novel │ │
│ 发布者 Publisher │ ═══════════════════════▶ │ 订阅者 Subscriber│
│ │ 消息类型: String │ │
│ "第一章..." │ │ 收到: "第一章" │
└─────────────────┘ └─────────────────┘
1.4 三大特点
| 特点 | 含义 |
|---|---|
| 异步单向 | 发布者发完就继续干自己的事,不等待订阅者 |
| 多对多 | 一个话题可以有多个发布者和多个订阅者 |
| 双一致规则 | 话题名 + 消息类型必须一致,缺一个都连不上 |
本章聚焦发布端——只讲怎么"发"。订阅端后续章节再讲。
2. 话题发布的核心逻辑
2.1 发布者的职责
一个发布者节点只做三件事:
① 创建发布者 → 声明"我要往哪个话题发什么类型的消息"
② 构造消息 → 创建一条消息,填入数据
③ 发布消息 → 把消息推到话题上
2.2 三板斧(先记住骨架)
# 第一步:创建发布者
self.pub = self.create_publisher(消息类型, 话题名, 队列大小)
# 第二步:构造消息
msg = 消息类型() # 实例化一条空消息
msg.字段 = 数据 # 填入内容
# 第三步:发布
self.pub.publish(msg) # 推到话题上
2.3 案例场景
本章用一个有趣的案例来演示:从网上下载一本小说,然后一行一行通过话题发布出去。
网络小说 ──下载──▶ Queue 缓冲 ──定时取出──▶ String消息 ──publish()──▶ 话题 'novel'
3. 动手前准备:创建功能包
3.1 创建工作空间
mkdir -p ~/chapt3/topic_ws/src
cd ~/chapt3/topic_ws
3.2 创建功能包
ros2 pkg create demo_python_topic \
--build-type ament_python \
--dependencies rclpy std_msgs
参数说明:
| 参数 | 含义 |
|---|---|
demo_python_topic |
功能包名 |
--build-type ament_python |
Python 包 |
--dependencies rclpy std_msgs |
依赖的 ROS2 库 |
3.3 验证依赖
打开 package.xml,确认包含:
<depend>rclpy</depend>
<depend>std_msgs</depend>
3.4 放入节点文件
demo_python_topic/
├── demo_python_topic/
│ ├── __init__.py
│ └── novel_pub_node.py ← 发布者代码放这里
├── setup.py
└── package.xml
4. 编写发布者节点:逐行拆解
以下是完整的发布者代码 novel_pub_node.py,我们逐段讲解。
4.1 完整代码一览
import rclpy
from rclpy.node import Node
import requests
import threading
from std_msgs.msg import String
from queue import Queue
class NovelPubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
self.novels_queue_ = Queue()
self.novel_publisher_ = self.create_publisher(String, 'novel', 10)
self.timer_ = self.create_timer(5, self.timer_callback)
def download_novel(self, url):
def _download():
try:
response = requests.get(url, timeout=10)
response.encoding = 'utf-8'
self.get_logger().info(f'下载完成:{url}')
for line in response.text.splitlines():
if line.strip():
self.novels_queue_.put(line)
except requests.RequestException as e:
self.get_logger().error(f'下载失败:{e}')
threading.Thread(target=_download, daemon=True).start()
def timer_callback(self):
if not self.novels_queue_.empty():
msg = String()
msg.data = self.novels_queue_.get()
self.novel_publisher_.publish(msg)
self.get_logger().info(f'发布:{msg.data[:30]}...')
def main():
rclpy.init()
node = NovelPubNode('novel_pub')
node.download_novel('http://localhost:8000/novel1.txt')
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
4.2 导入区 —— 引入武器库
import rclpy # ROS2 客户端库,提供 init、spin、shutdown
from rclpy.node import Node # 节点基类,所有节点都要继承它
import requests # HTTP 请求库,用来下载小说
import threading # 多线程库,让下载不阻塞主循环
from std_msgs.msg import String # ROS2 标准消息类型:字符串消息
from queue import Queue # Python 内置队列,线程安全的数据缓冲
| 导入 | 属于 | 在本案例中的作用 |
|---|---|---|
rclpy |
ROS2 核心 | 初始化、进入循环、关闭 ROS2 |
Node |
ROS2 核心 | 节点基类,提供 create_publisher 等方法 |
requests |
Python 第三方 | 发送 HTTP GET 请求下载小说 |
threading |
Python 标准库 | 开启后台线程下载,不阻塞主循环 |
String |
ROS2 标准消息 | 消息类型,内含一个 data 字段(字符串) |
Queue |
Python 标准库 | 线程安全队列,下载和发布之间的缓冲区 |
4.3 节点类定义 —— 继承 Node
class NovelPubNode(Node):
def __init__(self, node_name):
super().__init__(node_name)
| 代码 | 含义 |
|---|---|
class NovelPubNode(Node) |
定义一个新类,继承 Node |
super().__init__(node_name) |
调用父类 Node 的初始化,传入节点名 |
这就是你之前学过的 ROS2 节点固定模板——继承
Node,super().__init__()。
4.4 🔥 核心:创建发布者
self.novel_publisher_ = self.create_publisher(String, 'novel', 10)
三个参数逐个拆解:
| 参数位置 | 参数值 | 含义 | 类比 |
|---|---|---|---|
| ① 消息类型 | String |
规定消息长什么样 | 选一种信封格式 |
| ② 话题名 | 'novel' |
频道名,订阅者靠这个对接 | 直播间房间号 |
| ③ 队列大小 | 10 |
订阅者处理不过来时最多缓存 10 条 | 快递柜只有 10 个格子 |
如何确认 String 里面有什么?
ros2 interface show std_msgs/msg/String
# 输出:string data
这说明 String 消息里只有一个字段叫 data,类型是 string。所以后面才能写 msg.data = "..."。
4.5 创建定时器 —— 控制发布节奏
self.timer_ = self.create_timer(5, self.timer_callback)
| 参数 | 含义 |
|---|---|
5 |
每 5 秒触发一次 |
self.timer_callback |
到时间就调用这个函数(注意:不加括号,传的是函数引用) |
定时器让小说一行一行慢慢发,而不是一股脑全发出去。
4.6 创建队列 —— 下载和发布的缓冲区
self.novels_queue_ = Queue()
Queue 是 Python 标准库 queue 提供的线程安全队列。这里用它连接下载(快速)和发布(慢速)。
4.7 下载方法 —— 获取小说并切行入队
def download_novel(self, url):
def _download():
try:
response = requests.get(url, timeout=10)
response.encoding = 'utf-8'
self.get_logger().info(f'下载完成:{url}')
for line in response.text.splitlines():
if line.strip():
self.novels_queue_.put(line)
except requests.RequestException as e:
self.get_logger().error(f'下载失败:{e}')
threading.Thread(target=_download, daemon=True).start()
逐层拆解:
| 代码行 | 解释 |
|---|---|
def _download(): |
内部函数,真正干下载活的 |
requests.get(url, timeout=10) |
发送 HTTP GET 请求,最多等 10 秒 |
response.encoding = 'utf-8' |
设置编码,防止中文乱码 |
response.text.splitlines() |
把整本小说按行切成列表 |
for line in ...: |
逐行遍历(详见 5.1 节) |
if line.strip(): |
跳过空行 |
self.novels_queue_.put(line) |
把这一行放入队列(详见 5.2 节) |
try/except |
下载失败不会崩溃,打印错误日志 |
threading.Thread(target=_download, daemon=True).start() |
在后台线程下载,不阻塞 ROS2 主循环 |
为什么用多线程?
requests.get()是阻塞 I/O——下载完成前代码卡住不动。开新线程下载,主循环能立刻进入spin()状态。
4.8 定时回调 —— 从队列取一行并发布
def timer_callback(self):
if not self.novels_queue_.empty():
msg = String()
msg.data = self.novels_queue_.get()
self.novel_publisher_.publish(msg)
self.get_logger().info(f'发布:{msg.data[:30]}...')
逐行拆解(话题通信三板斧的核心):
| 代码 | 属于哪一步 | 解释 |
|---|---|---|
msg = String() |
② 构造消息 | 实例化一条空消息(详见 5.3 节) |
msg.data = self.novels_queue_.get() |
② 构造消息 | 从队列取一行,填入消息的 data 字段 |
self.novel_publisher_.publish(msg) |
③ 发布 | 🔥 把消息推到 novel 话题上! |
消息生命周期图示:
String() → msg.data = "第一章..." → publish(msg)
(空信封诞生) (信封里塞进内容) (扔进话题)
4.9 main 函数 —— 启动节点
def main():
rclpy.init() # ① 初始化 ROS2
node = NovelPubNode('novel_pub') # ② 实例化节点
node.download_novel('http://localhost:8000/novel1.txt') # ③ 触发下载
rclpy.spin(node) # ④ 进入循环
rclpy.shutdown() # ⑤ 退出清理
if __name__ == '__main__':
main()
| 步骤 | 代码 | 作用 |
|---|---|---|
| ① | rclpy.init() |
初始化 ROS2 客户端库 |
| ② | NovelPubNode('novel_pub') |
创建节点实例,名字叫 novel_pub |
| ③ | node.download_novel(...) |
后台线程下载小说 |
| ④ | rclpy.spin(node) |
让节点持续运行,定时器才能循环触发 |
| ⑤ | rclpy.shutdown() |
Ctrl+C 后清理资源 |
if __name__ == '__main__':Python 文件被直接运行时才执行main(),被import时不执行。
5. 案例中用到的 Python 语法
5.1 for 循环
语法骨架:
for 变量名 in 可迭代对象:
循环体(缩进)
关键规则:
| 规则 | 说明 |
|---|---|
| 变量名不需要提前定义 | for 自动创建并逐轮赋值 |
in 后面不一定是列表 |
字符串、range、文件都行——只要"能挨个掏东西" |
| 冒号不能忘 | for x in y: 冒号丢了直接报错 |
| 缩进决定循环体范围 | 不缩进的代码只执行一次 |
案例中的用法:
for line in response.text.splitlines():
self.novels_queue_.put(line)
| 代码 | 翻译 |
|---|---|
for |
循环开始 |
line |
变量名(自动创建),代表当前这一行 |
in |
在…里面 |
response.text.splitlines() |
按行切好的列表 |
self.novels_queue_.put(line) |
把这一行放进队列 |
5.2 队列操作
Queue 来自 Python 标准库 queue(from queue import Queue)。
核心规则:FIFO(First In First Out)——先进先出,不插队。
入队方向 → 出队方向 →
put() get()
│ ▲
▼ │
┌────┬────┬────┬────┬────┐
│ 新 │ │ │ │ 旧 │
└────┴────┴────┴────┴────┘
队尾 队头
常用方法:
| 方法 | 作用 |
|---|---|
Queue() |
创建空队列 |
put(item) |
入队:把 item 加到队尾 |
get() |
出队:从队头取一个元素,取完就没了 |
qsize() |
查看队列里还剩几个 |
empty() |
判断是否为空(True/False) |
案例中的应用:
download_novel() ──put()──▶ [Queue 缓冲] ──get()──▶ timer_callback() → publish()
(快速下载) (缓冲) (每5秒取一条)
为什么用队列而不是列表?
队列 Queue |
普通列表 list |
|---|---|
| ✅ 线程安全,自带锁 | ❌ 多线程同时操作会冲突 |
| ✅ 生产者和消费者解耦 | ❌ 耦合在一起 |
✅ get() 可阻塞等待 |
❌ 空了只能写 while 轮询(浪费 CPU) |
5.3 实例化
类(class)是"饼干模具",实例化就是用模具"压出一个具体的东西"。
msg = String()
# ~~~~~~~~
# String 是模具(类)
# String() 是压面团的动作(实例化)
# msg 是压出来的饼干(实例)
| 术语 | 类比 | 代码 |
|---|---|---|
| 类(模具) | 🧇 星星模具 | String |
| 实例化(动作) | 往面团上一按 | String() |
| 实例(结果) | ⭐ 压出的星星饼干 | msg |
一个模具可以压无数个:
msg1 = String() # 饼干1
msg2 = String() # 饼干2
msg3 = String() # 饼干3
# 都是同一个模具,但各自独立
5.4 splitlines() —— 按行切分
text = "第一章 初入江湖\n第二章 剑指苍穹\n第三章 华山论剑"
lines = text.splitlines()
# → ['第一章 初入江湖', '第二章 剑指苍穹', '第三章 华山论剑']
| 输入 | 输出 |
|---|---|
带 \n 的大字符串 |
列表,每个元素一行(换行符被去掉) |
6. 案例中用到的库
6.1 rclpy(ROS2 Python 客户端库)
ROS2 的 Python 接口库,所有 ROS2 Python 节点的基石。
| 函数/类 | 作用 |
|---|---|
rclpy.init() |
初始化 ROS2 |
rclpy.spin(node) |
让节点进入循环,等待回调触发 |
rclpy.shutdown() |
清理资源,退出 ROS2 |
6.2 rclpy.node.Node(节点基类)
所有 ROS2 Python 节点必须继承它。提供的方法:
| 方法 | 作用 |
|---|---|
create_publisher(类型, 话题, 队列) |
创建发布者 |
create_subscription(类型, 话题, 回调, 队列) |
创建订阅者 |
create_timer(间隔, 回调) |
创建定时器 |
get_logger().info(...) |
打印日志 |
6.3 std_msgs.msg.String(标准字符串消息)
ROS2 内置的消息类型。使用前先查看结构:
ros2 interface show std_msgs/msg/String
# 输出:string data
只有一个字段 data,类型是 string。
6.4 requests(HTTP 请求库)
Python 第三方库,用于发送 HTTP 请求。
response = requests.get(url, timeout=10) # GET 请求
response.text # 响应正文(字符串)
response.encoding = 'utf-8' # 设置编码
6.5 queue.Queue(线程安全队列)
Python 标准库,提供 FIFO 队列,多线程环境下安全使用。
6.6 threading(多线程库)
Python 标准库,用于创建和管理线程。
threading.Thread(target=函数, daemon=True).start()
| 参数 | 含义 |
|---|---|
target=函数 |
线程要执行的函数(传引用,不加括号) |
daemon=True |
守护线程——主线程退出时自动回收 |
7. 配置、构建与运行
7.1 配置 setup.py
在功能包根目录的 setup.py 中添加入口点:
entry_points={
'console_scripts': [
'novel_pub = demo_python_topic.novel_pub_node:main',
],
},
| 字段 | 含义 |
|---|---|
novel_pub |
终端里敲的命令名 |
demo_python_topic.novel_pub_node |
Python 文件路径(不含 .py) |
main |
文件中的入口函数名 |
7.2 构建
cd ~/chapt3/topic_ws
colcon build --packages-select demo_python_topic
7.3 改变环境变量
source install/setup.bash
每次新开终端都要 source 一次。或者追加到
~/.bashrc一劳永逸。
7.4 启动本地 HTTP 服务器
# 先准备一个小说文件 novel1.txt,放在某个目录
cd ~/novels/
python3 -m http.server 8000 &
7.5 运行
ros2 run demo_python_topic novel_pub
输出示例:
[INFO] 下载完成:http://localhost:8000/novel1.txt
[INFO] 发布:第一章 初入江湖...
[INFO] 发布:第二章 剑指苍穹...
[INFO] 发布:第三章 华山论剑...
8. 终端验证与调试
# ① 查看所有话题
ros2 topic list
# 输出应包含 /novel
# ② 查看话题详情
ros2 topic info /novel
# 输出:Type: std_msgs/msg/String
# Publisher count: 1
# ③ 实时偷看话题上的数据
ros2 topic echo /novel
# 每5秒刷出一行小说
# ④ 手动发布一条消息(不写代码也能发)
ros2 topic pub /novel std_msgs/msg/String "data: '手动发的测试消息'"
# ⑤ 查看 String 的结构
ros2 interface show std_msgs/msg/String
# 输出:string data
9. 发布者最小模板
剥离所有业务逻辑,发布者的最小骨架:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MyPublisher(Node):
def __init__(self):
super().__init__('my_publisher')
self.pub = self.create_publisher(String, 'my_topic', 10) # ① 创建发布者
self.timer = self.create_timer(1.0, self.callback)
def callback(self):
msg = String() # ② 构造消息
msg.data = 'Hello ROS2'
self.pub.publish(msg) # ③ 发布
self.get_logger().info(f'发布:{msg.data}')
def main():
rclpy.init()
rclpy.spin(MyPublisher())
rclpy.shutdown()
if __name__ == '__main__':
main()
三板斧对照:
| 步骤 | 代码 |
|---|---|
| ① 创建发布者 | self.create_publisher(String, 'my_topic', 10) |
| ② 构造消息 | msg = String() → msg.data = ... |
| ③ 发布 | self.pub.publish(msg) |
10. 总结
本章核心收获
话题通信的本质:发布-订阅模式,双方通过"话题名+消息类型"对接。
发布者只做三件事:
① create_publisher → 声明"我在哪个频道发什么类型的消息"
② msg = String() → 实例化消息 + 填数据
③ publish(msg) → 推到话题上
案例把这三步嵌入了"下载小说 → 队列缓冲 → 定时发布"的完整流程中。
数据流全景图
┌──────────────────────────────────────────────────────┐
│ NovelPubNode 节点 │
│ │
│ download_novel(url) timer_callback() │
│ │ ▲ │
│ │ requests.get() │ 每5秒触发 │
│ │ splitlines() 切行 │ │
│ ▼ │ │
│ ┌──────────┐ ┌───────┴──────┐ │
│ │ Queue │ │ msg=String() │ │
│ │ 缓冲 │ ──get()──▶ │ msg.data=... │ │
│ │ put(line)│ │ publish(msg) │──────────▶
│ └──────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────┘
│
┌──────▼──────┐
│ 话题 /novel │
│ 类型 String │
└──────────────┘
下一步
学完发布,下一步就是订阅——用 create_subscription 在另一个节点接收这些小说行。然后两个终端同时跑,就能看到"主播发弹幕、观众收弹幕"的完整效果。
术语速查表
| 术语 | 含义 | 一句话 |
|---|---|---|
| 话题(Topic) | 通信的频道名 | 发布者和订阅者之间的"暗号" |
| 消息(Message) | 数据的封装格式 | 通信的"信封",规定里面有什么字段 |
| 发布者(Publisher) | 往话题发消息的节点 | 主播,只管输出 |
| 实例化 | 用类创建一个具体对象 | 用饼干模具压出一块饼干 |
| FIFO | First In First Out | 先进先出,不许插队 |
| 可迭代对象 | 能挨个掏东西出来的类型 | for x in ... 中 in 后面能放的 |
| 回调函数 | 事件发生时自动调用的函数 | 定时器到期、收到消息时触发 |
| 守护线程 | daemon=True |
主线程退出时自动回收的后台线程 |
| QoS | 服务质量(队列大小等) | create_publisher 第三个参数 |
splitlines() |
按换行符切分字符串 | 把一整块文字切成一行一行的列表 |
colcon build |
ROS2 构建命令 | 编译所有功能包 |
source setup.bash |
加载环境变量 | 让终端能识别你包里的节点 |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)