上周我打通了Milk-V Duo S和机器人的串口控制的脉络,同时制作了两个动作组:下蹲、自由飞翔。

阶段 状态
环境搭建 ✅ 完成
网络配置 ✅ 完成
串口控制 ✅ 完成
动作调度 ✅ 完成
序列执行控制 ✅ 完成
时间控制 ✅ 完成
部分动作组制作 ✅ 完成
线程执行锁 ✅ 完成
云端接入 ⏳ 下一步
复杂动作组制作 ⏳ 下一步
⏳ 下一步

本周我将进行Milk-V Duo S开发板与云端服务器进行通信模块的开发,接入云端,以从云端接收数据,从而进一步实现后端执行层与AI分析层之间的连接 。

云端接入

经过我的调研,找到了两种方案如下:

第一种通过MQTT建立与云端的通信

       让机器人从“本地脚本控制”跃迁到“云端实时指挥”,核心就是在 Milk‑V Duo S 上架起一座 MQTT 消息桥。我本周开发的通信模块 mqtt_control.py 扮演了云端指令的接收者和执行调度器。我们使用 paho.mqtt.client 连接到 MQTT 服务器,订阅 robot/control 主题,一旦收到 JSON 格式的动作序列(如 {"actions": [1, 7, 6, 2, 0]}),就立刻调用之前已经调试成熟的 run_sequence 函数,驱动真实的舵机表演。

提示词
请生成一个 Python 脚本,文件保存为 mqtt_control.py,用于在 Milk-V Duo S 上通过 MQTT 接收云端指令,并控制机器人执行动作序列。要求如下:

- 导入 paho.mqtt.client、json、threading 模块。
- 从 seq_control 模块导入 RobotController、run_sequence、ACTION_DURATION。
  串口默认 "/dev/ttyS0",波特率 115200。
- 定义一个全局锁,用于保护串口通信。
- 定义 connect 回调函数
- 定义 message 回调函数
  - 将接收到的消息载荷用 json.loads 解析为字典 data。
  - 打印 "收到云端指令:" 和 data。
  - 否则,使用 threading.Thread 启动一个新线程,目标函数为 run_sequence
- 创建 MQTT 客户端对象将 connect 和 message 分别赋值给 client.on_connect 和 client.on_message。
- 设置云服务器地址:
  SERVER = "47.93.27.196"
  PORT = 1883
- 打印 "等待云端指令...",然后调用 client.loop_forever() 进入消息循环。
请输出完整的 Python 代码。
LLM产出code
import paho.mqtt.client as mqtt
import json
import threading
from seq_control import RobotController, run_sequence, ACTION_DURATION

# ---------- 机器人控制器初始化 ----------
robot = RobotController(port="/dev/ttyS0", baudrate=115200)

# ---------- 锁,保护串口通信 ----------
lock = threading.Lock()

def on_connect(client, userdata, flags, rc):
    print(f"MQTT 连接成功,状态码: {rc}")
    client.subscribe("robot/control")

def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload.decode())
        print("收到云端指令:", data)

        sequence = data.get("actions", [])
        if not sequence:
            print("指令中无动作序列,忽略")
            return

        # 在新线程中执行动作序列,避免阻塞 MQTT 消息接收
        threading.Thread(
            target=run_sequence,
            args=(robot, sequence),
            daemon=True
        ).start()

    except Exception as e:
        print("解析或执行错误:", e)

# ---------- MQTT 客户端配置 ----------
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# 你的云服务器地址(MQTT 默认端口 1883)
SERVER = "47.93.27.196"
PORT = 1883

client.connect(SERVER, PORT, 60)

print("等待云端指令...")
client.loop_forever()

        这里最关键的升级是 多线程执行。因为动作组里有长达 72 秒的舞蹈,如果直接在 on_message 回调里运行,机器人会在这段时间内完全“失聪”,无法响应任何新指令。我们通过 threading.Thread 将动作序列放到后台线程,并设置为 Daemon 模式,这样主 MQTT 循环不会被阻塞,随时可以接收新的云端命令,甚至能够被更新、中断或重新编排。配合 seq_control.py 中已有的 threading.Lock,两个线程对串口的访问也得到了保护,不会出现指令交错。至此,机器人不再只是一段预设脚本的播放器,而是一个可以实时接收、解析、执行云端动作指令的智能终端,为后续接入 NLP 情绪分析、远程控制等场景铺平了道路。

第二种通过HTTP轮询建立与云端的通信

提示词
生成一个 HTTP 轮询机器人控制脚本 poll_control.py,
从 seq_control 导入 RobotController 和 run_sequence。
服务器地址 http://47.93.27.196:8765,客户端 ID 为 milk_duos_001,
每 2 秒轮询一次 /poll/<id> 获取动作序列,
有新的 command_id 时执行 run_sequence,
并向 /ack/<id> 发送确认。
包含异常处理和 Ctrl+C 停止。
输出完整Python代码.
LLM产出code
import requests
import time
from robot_control import RobotController, run_sequence

# ========= 配置 =========
SERVER = "http://47.93.27.196:8765"
CLIENT_ID = "milk_duos_001"
POLL_INTERVAL = 2

robot = RobotController()


def poll_action():
    url = f"{SERVER}/poll/{CLIENT_ID}"
    try:
        r = requests.get(url, timeout=5)
        return r.json()
    except Exception as e:
        print("轮询失败:", e)
        return None


def ack_action(command_id):
    url = f"{SERVER}/ack/{CLIENT_ID}"
    try:
        requests.post(url, json={"command_id": command_id})
        print("已确认:", command_id)
    except Exception as e:
        print("确认失败:", e)


def main():
    print("🚀 Duo S 已连接云端,开始轮询...")

    last_id = None

    while True:
        result = poll_action()

        if result:
            # 有动作
            if result.get("action_sequence"):
                cmd_id = result.get("command_id")

                # 防止重复执行
                if cmd_id != last_id:
                    sequence = result["action_sequence"]

                    print("收到动作:", sequence)

                    # ⭐ 核心:执行真实机器人动作
                    run_sequence(robot, sequence)

                    # ⭐ 执行完告诉云端
                    ack_action(cmd_id)

                    last_id = cmd_id

            elif result.get("status") == "no_action":
                pass
            else:
                print("未知响应:", result)

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        robot.stop()
        robot.close()

       采用 HTTP 轮询 的方式让机器人从云端获取动作指令。Doo S 周期性地向服务器发起 GET 请求,查询当前是否有待执行的动作序列;如果返回了新的 command_id,就调用已有的 run_sequence 执行真实机器人动作,执行完毕再通过 POST 请求向服务器确认已完成。这种方式实现简单,无需额外的中间件,仅靠 HTTP 就能在有限资源下跑通。但它本质上是一种“拉”模式:机器人每隔 2 秒主动查一次,动作的响应会有最多 2 秒的延迟,不适合需要即时反应的交互场景。不过在 Duo S 这种嵌入式板卡上,该方案不依赖持久的连接,对网络波动容忍度更高,且容易调试和扩展,很适合初期原型验证。

对比 MQTT HTTP 轮询
消息模式 “推” 模式,云端有新指令立刻推送给机器人 “拉” 模式,由机器人定时主动查询云端指令
实时性 实时性高,指令几乎无延迟 存在固定查询间隔(如 2 秒),响应不够及时
连接依赖 需要维持长连接,对网络稳定性要求高,断网需重连 每次请求独立、无状态,网络适应性强
资源开销 长连接省流量,但客户端需维护心跳 频繁发起请求,即便无新指令也会持续消耗带宽和 CPU
适用场景 交互频繁、要求低延迟的云控场景 指令稀疏、开发早期快速验证场景

根据测试情况外加与组长和组员的交流,我们在测试过程中发现,HTTP轮询的查询间隔(2s)延迟并不明显,于是决定先使用HTTP轮询的方式快速进行项目原型验证。

云端&Duo S&机器人端联调

1.确认云服务器状态正常

curl http://47.93.27.196:8765/health

{"status":"ok"}表明云服务器状态正常,且Duo S 已经成功连上云服务器

2.Duo S在终端运行同时云端发送情绪

curl -X POST http://47.93.27.196:8765/emotion \
-H "Content-Type: application/json" \
-d "{\"content\":\"我人工智能导论只考了95分差一分就能拿到A+了,现在没有达成目标感觉好难过\"}"

刚开始我们是失败状态的,

后来发现犯了一个低级错误,Duo S wifi配置自动执行文件的默认账号密码之前在借给队友时候修改过了,然后现在还没修改为我的热点。

到目录/etc/wpa_supplicant.conf文件 修改完成之后,才实现HTTP正常连接,

同时云服务器再次发送emotion

演示视频2

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐