物联网设备接入与数据处理 以 ESP32、树莓派等硬件设备为例,分享设备接入阿里云、AWS IoT 等平台的过程,以及设备数据采集、传输、处理的开发经验,提供硬件代码和云端配置说明。

关于物联网设备接入与数据处理的详细指南,涵盖ESP32和树莓派等设备,涉及阿里云、AWS IoT等平台,以及数据采集、传输、处理的开发经验。几个方向:阿里云和AWS IoT的设备接入文档、MQTT协议细节、ESP32和树莓派的开发入门,以及一些具体的开发案例。

涵盖了阿里云和AWS IoT的设备接入文档、MQTT协议基础,以及ESP32和树莓派的实际开发案例,特别是一个将ESP32作为数据采集节点、树莓派作为本地MQTT代理的农业物联网案例。接下来围绕这个核心架构来组织回答,先说明MQTT的机制,然后分别给出ESP32和树莓派的代码示例,再详细说明阿里云和AWS IoT的云端配置步骤,最后用对比总结不同平台和设备的适用场景。

物联网设备接入与数据处理完全指南

一、项目文件结构组织

iot_project/
├── README.md                    # 项目总体说明
├── docs/
│   ├── architecture.md          # 系统架构设计文档
│   └── platform_comparison.md   # 云平台选型对比
├── esp32/
│   ├── main/
│   │   ├── main.c               # ESP32 主程序
│   │   ├── wifi_manager.c/h     # WiFi 连接管理
│   │   ├── mqtt_client.c/h      # MQTT 客户端封装
│   │   ├── sensor_dht22.c/h     # DHT22 温湿度传感器驱动
│   │   ├── sensor_bh1750.c/h    # BH1750 光照传感器驱动
│   │   └── config.h             # 全局配置(WiFi/MQTT 凭证)
│   ├── CMakeLists.txt           # ESP-IDF 构建脚本
│   └── sdkconfig                # ESP-IDF 项目配置
├── raspberry_pi/
│   ├── broker/
│   │   ├── mosquitto.conf       # Mosquitto MQTT Broker 配置
│   │   └── acl_passwd           # ACL 访问控制
│   ├── docker-compose.yml       # Docker 部署 Node-RED + InfluxDB
│   ├── gateway.py               # MQTT → 阿里云/AWS 数据桥接
│   └── data_processor.py        # 数据预处理与清洗
├── cloud/
│   ├── aliyun/
│   │   ├── iot_config.json      # 阿里云 IoT 产品/设备配置
│   │   └── rules_engine.sql     # 规则引擎 SQL(数据流转)
│   └── aws/
│       ├── iot_core_policy.json # AWS IoT Core 权限策略
│       └── greengrass_config.json # Greengrass 边缘配置
└── dashboard/
    ├── node_red_flow.json       # Node-RED 仪表盘流程
    └── grafana_dashboard.json   # Grafana 可视化面板

二、系统架构 UML 建模

2.1 整体架构:云-边-端三层模型

应用呈现层 Application Layer

云平台层 Cloud Layer

边缘计算层 Edge Layer

设备感知层 Device Layer

MQTT
QoS 1

MQTT Bridge
数据同步

MQTT Bridge
数据同步

本地持久化

规则触发

规则引擎

IoT Rules

数据查询

数据推送

阈值判断

ESP32 采集节点
DHT22 + BH1750

树莓派 MQTT Broker
Mosquitto

树莓派 Node-RED
本地规则引擎

InfluxDB
本地时序数据库

阿里云 IoT 平台
设备管理/规则引擎

AWS IoT Core
设备影子/Jobs

云端数据库
RDS / DynamoDB

Grafana 可视化

移动端 App

钉钉/邮件 告警

2.2 MQTT 发布/订阅模型

云平台 (订阅者) Node-RED (订阅者) MQTT Broker (树莓派) ESP32 (发布者) 云平台 (订阅者) Node-RED (订阅者) MQTT Broker (树莓派) ESP32 (发布者) alt [温度超标] 采集 DHT22 温湿度数据 组包 JSON PUBLISH topic="sensor/temp" PUBACK (QoS 1) 转发消息给所有订阅者 转发消息给云平台 规则判断(温度>阈值?) 触发告警动作 规则引擎路由 → 数据库

2.3 设备全生命周期状态机

上电启动

DHCP 获取 IP

连接失败

等待 5s 重试

开始连接 Broker

CONNACK 成功

认证失败/超时

等待 5s 重试

定时器触发 (5s)

传感器读取成功

发布完成

心跳超时/网络断开

自动重连

收到云端下发指令

执行完成并回复

收到固件升级通知

升级成功重启

初始化

WiFi连接中

WiFi已连接

WiFi重试

MQTT连接中

在线

MQTT重试

数据采集中

数据上报

离线

指令执行中

OTA升级

三、MQTT 协议核心机制解析

3.1 协议定位与网络栈

MQTT 是一种基于发布/订阅模型的轻量级应用层协议,构建于 TCP/IP 之上,能以极少的代码和有限的带宽为远程设备提供实时可靠的消息服务。它在物联网(IoT)领域已成为事实标准通信协议

              ┌─────────────────────┐
              │    MQTT (应用层)     │  发布/订阅模型、QoS、遗嘱消息
              ├─────────────────────┤
              │       TCP            │  可靠的端到端传输
              ├─────────────────────┤
              │    IP / Wi-Fi / ETH  │  网络层
              ├─────────────────────┤
              │   物理层 (PHY/MAC)   │
              └─────────────────────┘

3.2 核心角色与通信流程

MQTT 模型包含三种角色:发布者代理(Broker)订阅者,其中发布者和订阅者都是客户端,代理是服务器。

MQTT Broker ESP32 客户端 MQTT Broker ESP32 客户端 第一步:建立 TCP 连接 第二步:订阅主题 第三步:发布数据 第四步:心跳维持 loop [每 KeepAlive 周期] CONNECT 报文 (ClientID, Username, Password, KeepAlive) CONNACK 报文 (连接确认) SUBSCRIBE 报文 (topic="cmd/led", QoS=1) SUBACK 报文 (订阅确认) PUBLISH 报文 (topic="sensor/temp", payload) PUBACK 确认 (QoS=1 时) PINGREQ (心跳请求) PINGRESP (心跳响应)

关键通信要素

  • ClientID:设备唯一标识,Broker 据此绑定会话。
  • KeepAlive:心跳间隔,Broker 据此判断设备是否掉线(超时未收到 PINGREQ 则断开)。
  • 遗嘱消息(Last Will):设备预先设定,意外断连时 Broker 自动发布,通知其他设备。
  • topic:消息类别标签,如 sensor/temp,订阅者订阅该 topic 后即可收到对应消息。

3.3 QoS 质量等级详解

QoS 名称 语义 适用场景 开销
0 至多一次 消息只发一次,不等待确认,可能丢失 高频传感数据,丢失一两条无影响 最低
1 至少一次 消息保证送达,可能重复 传感器数据、设备日志 中等
2 恰好一次 消息精确送达一次,无丢失无重复 控制指令、计费数据 最高

嵌入式场景下,传感数据上报推荐 QoS 1(可靠但不重复处理);控制指令下发推荐 QoS 2(绝对不能丢)。

四、ESP32 设备端开发实战

4.1 ESP-IDF 开发环境搭建

# 1. 安装 ESP-IDF(以 v5.1 为例)
mkdir ~/esp && cd ~/esp
git clone -b v5.1 --recursive https://github.com/espressif/esp-idf.git
cd esp-idf && ./install.sh esp32s3

# 2. 激活环境变量
. $HOME/esp/esp-idf/export.sh

# 3. 创建项目
cd ~/esp
cp -r $IDF_PATH/examples/protocols/mqtt/tcp my_iot_project
cd my_iot_project
idf.py set-target esp32s3

4.2 WiFi 连接模块设计

// wifi_manager.h
#ifndef WIFI_MANAGER_H
#define WIFI_MANAGER_H

#include "esp_wifi.h"
#include "esp_event.h"

typedef void (*wifi_connected_cb_t)(void);

void wifi_manager_init(void);
void wifi_manager_connect(const char *ssid, const char *password,
                          wifi_connected_cb_t callback);

#endif
// wifi_manager.c
#include "wifi_manager.h"
#include "esp_log.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"

static const char *TAG = "WIFI";
static EventGroupHandle_t wifi_event_group;
static const int WIFI_CONNECTED_BIT = BIT0;
static wifi_connected_cb_t connected_cb = NULL;

static void wifi_event_handler(void *arg, esp_event_base_t event_base,
                                int32_t event_id, void *event_data)
{
    if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) {
        esp_wifi_connect();
        ESP_LOGI(TAG, "WiFi 启动完成,开始连接...");
    }
    else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
        ESP_LOGW(TAG, "WiFi 断连,准备重连...");
        esp_wifi_connect();
    }
    else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
        ip_event_got_ip_t *event = (ip_event_got_ip_t *)event_data;
        ESP_LOGI(TAG, "获取 IP: " IPSTR, IP2STR(&event->ip_info.ip));
        xEventGroupSetBits(wifi_event_group, WIFI_CONNECTED_BIT);
        if (connected_cb) connected_cb();
    }
}

void wifi_manager_init(void) {
    wifi_event_group = xEventGroupCreate();
    ESP_ERROR_CHECK(esp_netif_init());
    ESP_ERROR_CHECK(esp_event_loop_create_default());
    esp_netif_create_default_wifi_sta();

    wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
    ESP_ERROR_CHECK(esp_wifi_init(&cfg));

    ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID,
                                               &wifi_event_handler, NULL));
    ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP,
                                               &wifi_event_handler, NULL));
}

void wifi_manager_connect(const char *ssid, const char *password,
                          wifi_connected_cb_t callback) {
    connected_cb = callback;

    wifi_config_t wifi_config = {0};
    strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid) - 1);
    strncpy((char *)wifi_config.sta.password, password, sizeof(wifi_config.sta.password) - 1);

    ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
    ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));
    ESP_ERROR_CHECK(esp_wifi_start());
}

设计要点

  • WIFI_EVENT_STA_DISCONNECTED 事件触发自动重连,确保设备在路由器重启后自动恢复。
  • EventGroupWaitBits 可实现阻塞等待 WiFi 连接完成,或用回调解耦后续 MQTT 连接逻辑。
  • 生产环境中建议使用智能配网(SmartConfig),避免硬编码 WiFi 密码。

4.3 MQTT 客户端核心实现

// mqtt_client.h
#ifndef MQTT_CLIENT_H
#define MQTT_CLIENT_H

#include "mqtt_client.h"

typedef void (*mqtt_data_cb_t)(const char *topic, const char *data, int data_len);

void mqtt_client_start(const char *broker_url, const char *client_id,
                       const char *username, const char *password,
                       mqtt_data_cb_t callback);
int  mqtt_client_publish(const char *topic, const char *payload, int qos);
bool mqtt_client_is_connected(void);

#endif
// mqtt_client.c
#include "mqtt_client.h"
#include "esp_log.h"

static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_handle = NULL;
static mqtt_data_cb_t data_cb = NULL;

static void mqtt_event_handler(void *args, esp_event_base_t base,
                                int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;

    switch (event_id) {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG, "MQTT 已连接 Broker");
        // 订阅云端下发指令的主题
        esp_mqtt_client_subscribe(mqtt_handle, "cmd/device_001", 1);
        break;

    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGW(TAG, "MQTT 断连");
        break;

    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG, "收到消息: topic=%.*s, data=%.*s",
                 event->topic_len, event->topic,
                 event->data_len, event->data);
        if (data_cb) {
            data_cb(event->topic, event->data, event->data_len);
        }
        break;

    case MQTT_EVENT_PUBLISHED:
        ESP_LOGD(TAG, "消息发布成功 msg_id=%d", event->msg_id);
        break;

    default:
        break;
    }
}

void mqtt_client_start(const char *broker_url, const char *client_id,
                        const char *username, const char *password,
                        mqtt_data_cb_t callback)
{
    data_cb = callback;

    esp_mqtt_client_config_t mqtt_cfg = {
        .broker.address.uri = broker_url,
        .credentials = {
            .client_id = client_id,
            .username = username,
            .authentication.password = password,
        },
        .session = {
            .keepalive = 60,
            .disable_clean_session = false,
        },
    };

    mqtt_handle = esp_mqtt_client_init(&mqtt_cfg);
    ESP_ERROR_CHECK(esp_mqtt_client_register_event(mqtt_handle, ESP_EVENT_ANY_ID,
                                                    mqtt_event_handler, NULL));
    ESP_ERROR_CHECK(esp_mqtt_client_start(mqtt_handle));
}

int mqtt_client_publish(const char *topic, const char *payload, int qos) {
    if (!mqtt_handle) return -1;
    return esp_mqtt_client_publish(mqtt_handle, topic, payload, 0, qos, 0);
}

bool mqtt_client_is_connected(void) {
    return mqtt_handle != NULL;
}

关键设计

  • 心跳保活keepalive=60,Broker 60秒未收到心跳则判定设备掉线。ESP-IDF MQTT 库自动发送 PINGREQ,开发者无需手动处理。
  • 订阅指令:连接成功后立即订阅 cmd/device_001,确保设备随时可接收云端下发的控制指令。
  • 遗嘱消息:可在 mqtt_cfg 中配置 session.last_will,设备异常断连时 Broker 自动发布通知。

4.4 传感器数据采集与上报

// sensor_dht22.c —— DHT22 温湿度传感器驱动
#include "sensor_dht22.h"
#include "driver/gpio.h"
#include "esp_timer.h"

#define DHT22_GPIO      4
#define DHT_TIMEOUT     1000  // 超时微秒

// 等待指定电平持续的最长时间(返回 0=超时, 非0=持续微秒数)
static int wait_pin_level(int level, int timeout_us) {
    int64_t start = esp_timer_get_time();
    while (gpio_get_level(DHT22_GPIO) == level) {
        if (esp_timer_get_time() - start > timeout_us) return 0;
    }
    return (int)(esp_timer_get_time() - start);
}

int dht22_read(float *temperature, float *humidity) {
    uint8_t data[5] = {0};

    // 1. 主机发送起始信号:拉低 1ms 再拉高 30µs
    gpio_set_direction(DHT22_GPIO, GPIO_MODE_OUTPUT);
    gpio_set_level(DHT22_GPIO, 0);
    esp_rom_delay_us(1000);
    gpio_set_level(DHT22_GPIO, 1);
    esp_rom_delay_us(30);
    gpio_set_direction(DHT22_GPIO, GPIO_MODE_INPUT);

    // 2. 等待 DHT22 响应
    if (!wait_pin_level(1, DHT_TIMEOUT)) { return -1; }  // 跳过上升沿
    if (!wait_pin_level(0, DHT_TIMEOUT)) { return -2; }  // 等待响应低电平
    if (!wait_pin_level(1, DHT_TIMEOUT)) { return -3; }  // 等待响应高电平

    // 3. 读取 40 位数据
    for (int i = 0; i < 40; i++) {
        int low_us  = wait_pin_level(0, DHT_TIMEOUT);
        int high_us = wait_pin_level(1, DHT_TIMEOUT);
        if (low_us == 0 || high_us == 0) return -4;  // 超时

        data[i / 8] <<= 1;
        if (high_us > low_us) data[i / 8] |= 1;  // 长高电平 = 1, 短 = 0
    }

    // 4. 校验
    if (((data[0] + data[1] + data[2] + data[3]) & 0xFF) != data[4]) {
        return -5;  // 校验失败
    }

    *humidity    = (float)((data[0] << 8) | data[1]) / 10.0f;
    *temperature = (float)((data[2] << 8) | data[3]) / 10.0f;

    return 0;
}
// main.c —— 主程序
#include "wifi_manager.h"
#include "mqtt_client.h"
#include "sensor_dht22.h"
#include "cJSON.h"
#include "esp_log.h"
#include "esp_timer.h"

static const char *TAG = "MAIN";
static esp_timer_handle_t report_timer;

static void report_sensor_data(void *arg)
{
    float temp, humi;
    if (dht22_read(&temp, &humi) != 0) {
        ESP_LOGW(TAG, "传感器读取失败");
        return;
    }

    // 组装 JSON 数据包
    cJSON *root = cJSON_CreateObject();
    cJSON_AddNumberToObject(root, "temperature", temp);
    cJSON_AddNumberToObject(root, "humidity", humi);
    cJSON_AddNumberToObject(root, "timestamp", esp_timer_get_time() / 1000000);

    char *json_str = cJSON_PrintUnformatted(root);
    mqtt_client_publish("sensor/dht22/data", json_str, 1);

    ESP_LOGI(TAG, "数据上报: %s", json_str);

    cJSON_Delete(root);
    free(json_str);
}

static void on_wifi_connected(void) {
    // 阿里云平台连接参数(需从 IoT 控制台获取)
    mqtt_client_start(
        "mqtts://xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com:1883",
        "esp32_001|securemode=2,signmethod=hmacsha256|",
        "esp32_001&xxxxx",
        "xxxxx_HMAC_SHA256_SIGN",
        NULL
    );
}

void app_main(void)
{
    wifi_manager_init();
    wifi_manager_connect("YOUR_SSID", "YOUR_PASSWORD", on_wifi_connected);

    // 创建 5 秒定时器,定期上报传感器数据
    esp_timer_create_args_t timer_args = {
        .callback = report_sensor_data,
        .dispatch_method = ESP_TIMER_TASK,
        .name = "report_timer"
    };
    esp_timer_create(&timer_args, &report_timer);
    esp_timer_start_periodic(report_timer, 5000000);  // 5秒 = 5,000,000 微秒
}

数据包 JSON 格式示例

{
    "temperature": 25.3,
    "humidity": 62.1,
    "timestamp": 1718001234
}

五、树莓派边缘计算层开发

5.1 MQTT Broker 搭建

**

在系统架构中,树莓派既作为本地 MQTT Broker 接收 ESP32 的上报数据,又承担将数据同步到云平台的边缘网关角色。

# 安装 Mosquitto Broker
sudo apt update
sudo apt install -y mosquitto mosquitto-clients

# 配置 Mosquitto
sudo tee /etc/mosquitto/conf.d/custom.conf <<EOF
# 监听端口
listener 1883 0.0.0.0    # 本地局域网
listener 8883 0.0.0.0    # TLS 加密端口(可选)

# 认证配置
allow_anonymous false
password_file /etc/mosquitto/passwd

# 持久化与日志
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
EOF

# 创建用户
sudo mosquitto_passwd -c /etc/mosquitto/passwd esp32_user
sudo systemctl restart mosquitto

5.2 数据桥接程序(MQTT → 阿里云 / AWS)

#!/usr/bin/env python3
# gateway.py —— 树莓派本地 Broker 到云平台的数据桥接
"""
功能:订阅本地 Mosquitto 上所有设备数据,转发到阿里云 IoT / AWS IoT Core。
"""

import json
import time
import ssl
import paho.mqtt.client as mqtt

# ========== 配置区 ==========
# 本地 Broker
LOCAL_BROKER = "localhost"
LOCAL_PORT   = 1883

# 阿里云 IoT(从控制台获取)
ALIYUN_ENDPOINT    = "xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com"
ALIYUN_PORT        = 1883
ALIYUN_CLIENT_ID   = "gateway_001|securemode=2,signmethod=hmacsha256|"
ALIYUN_USERNAME    = "gateway_001&xxxxx"
ALIYUN_PASSWORD    = "xxxxx_HMAC_SHA256_SIGN"

# 设备映射表:本地 topic → 云端 topic
TOPIC_MAP = {
    "sensor/dht22/data": "/sys/gateway_001/dht22/thing/event/property/post",
}

# ========== 本地客户端 ==========
def on_local_message(client, userdata, msg):
    """收到本地设备数据后转发到阿里云"""
    topic = msg.topic
    payload = msg.payload.decode("utf-8")
    print(f"[本地] 收到 {topic}: {payload}")

    # 获取对应的云端 topic
    cloud_topic = TOPIC_MAP.get(topic, f"forward/{topic}")
    # 转发到阿里云
    result = aliyun_client.publish(cloud_topic, payload, qos=1)
    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        print(f"[转发] → 阿里云 {cloud_topic}")
    else:
        print(f"[错误] 转发失败: {result.rc}")

# ========== 阿里云客户端 ==========
aliyun_client = mqtt.Client(client_id=ALIYUN_CLIENT_ID)
aliyun_client.username_pw_set(ALIYUN_USERNAME, ALIYUN_PASSWORD)
aliyun_client.connect(ALIYUN_ENDPOINT, ALIYUN_PORT, keepalive=120)
aliyun_client.loop_start()

# ========== 本地客户端 ==========
local_client = mqtt.Client(client_id="gateway_local")
local_client.username_pw_set("esp32_user", "your_password")
local_client.on_message = on_local_message
local_client.connect(LOCAL_BROKER, LOCAL_PORT, keepalive=60)
local_client.subscribe("sensor/+/data", qos=1)  # 订阅所有传感器的数据主题

print("[网关] 启动完成,等待设备数据...")
local_client.loop_forever()

如果不需要树莓派网关,ESP32 也可以直接连接云平台

// ESP32 直连阿里云 IoT 的参数示例(从 IoT 控制台获取)
mqtt_client_start(
    "mqtts://xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com:1883",    // ← 改为云平台域名
    "esp32_dht22_001|securemode=2,signmethod=hmacsha256|",
    "esp32_dht22_001&xxxxx_ProductKey",
    "计算出的_HMAC_SHA256_签名",
    on_command_received   // ← 接收云端下发指令的回调
);

两种方式的对比:

方式 优势 劣势
ESP32 直连云平台 架构简单,省去树莓派 证书管理复杂,设备端资源消耗大
树莓派网关转发 本地数据缓存,加密/证书由树莓派处理,ESP32 负担小 多一个硬件节点,需维护网关

5.3 Docker 一键部署 IoT 服务栈

# docker-compose.yml
version: '3.8'
services:
  mosquitto:
    image: eclipse-mosquitto:2.0
    ports:
      - "1883:1883"
      - "8883:8883"
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./passwd:/mosquitto/config/passwd
    restart: unless-stopped

  node-red:
    image: nodered/node-red:latest
    ports:
      - "1880:1880"
    volumes:
      - ./node_red_data:/data
    environment:
      - TZ=Asia/Shanghai
    restart: unless-stopped

  influxdb:
    image: influxdb:2.7
    ports:
      - "8086:8086"
    volumes:
      - ./influxdb_data:/var/lib/influxdb2
    environment:
      - INFLUXDB_ADMIN_USER=admin
      - INFLUXDB_ADMIN_PASSWORD=admin123
    restart: unless-stopped

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - ./grafana_data:/var/lib/grafana
    restart: unless-stopped

六、阿里云 IoT 平台配置详解

6.1 云端创建产品与设备

登录阿里云 IoT 控制台

创建产品
所属品类: 自定义品类
节点类型: 直连设备
联网方式: Wi-Fi
数据格式: ICA标准

功能定义
添加属性: temperature/humidity
添加服务: set_report_interval

生成设备证书
ProductKey
DeviceName
DeviceSecret

设备开发
获取 MQTT 连接参数
Broker Address / ClientID / Username / Password

设备上线
验证 MQTT 连接与数据上报

详细操作步骤

  1. 创建产品:进入阿里云物联网平台控制台 → 设备管理 → 产品 → 创建产品。

    • 产品名称:温湿度采集器
    • 所属品类:自定义品类
    • 节点类型:直连设备(ESP32 直接联网)
    • 联网方式:Wi-Fi
    • 数据格式:ICA 标准数据格式(Alink JSON)
  2. 功能定义:在产品详情页,点击"功能定义" → “添加自定义功能”。

    • 属性:temperature(浮点型,单位℃)、humidity(浮点型,单位%RH)
    • 服务:set_interval(设置上报周期,输入参数 interval 整型,单位秒)
  3. 创建设备:设备管理 → 设备 → 添加设备,选择上面创建的产品,输入 DeviceName 如 esp32_dht22_001

    • 创建设备后,系统自动生成设备证书三元组:ProductKeyDeviceNameDeviceSecret
  4. 获取 MQTT 连接参数:设备详情页 → 查看 DeviceSecret,点击"MQTT 连接参数"。

    • Broker Address{ProductKey}.iot-as-mqtt.{region}.aliyuncs.com
    • ClientID{DeviceName}|securemode=2,signmethod=hmacsha256|
    • Username{DeviceName}&{ProductKey}
    • Password:基于 DeviceSecret + 时间戳的 HMAC-SHA256 签名

6.2 数据格式(Alink JSON)

设备上报数据到阿里云 IoT 平台须遵循 Alink 协议格式:

{
    "id": "123",
    "version": "1.0",
    "sys": {
        "ack": 0
    },
    "params": {
        "temperature": {
            "value": 25.3,
            "time": 1718001234000
        },
        "humidity": {
            "value": 62.1,
            "time": 1718001234000
        }
    },
    "method": "thing.event.property.post"
}

要点说明

  • id:消息 ID,建议用毫秒时间戳。
  • method:功能标识,属性上报使用 thing.event.property.post
  • params:以属性标识符为 key,包含 value 和时间戳。

接收云端下发的控制指令时,设备通过订阅 /${productKey}/${deviceName}/thing/service/property/set topic 获得 JSON 报文。

6.3 规则引擎:数据流转到数据库

在 IoT 控制台 → 规则引擎 → 云产品流转 → 创建规则:

-- 规则 SQL:将 temperature > 35 的数据写入 RDS
SELECT
    items.temperature.value as temperature,
    items.humidity.value as humidity,
    timestamp('yyyy-MM-dd''T''HH:mm:ss') as event_time,
    deviceName() as device_id
FROM
    "/sys/+/+/thing/event/property/post"
WHERE
    items.temperature.value > 35

数据流转流程

ESP32 设备上报

IoT 平台 MQTT Broker

规则引擎
SQL 过滤/转换

云数据库 RDS
结构化存储

时序数据库
高频数据

函数计算
触发告警

七、AWS IoT Core 平台配置详解

7.1 设备注册与证书管理

登录 AWS Console

IoT Core → Manage → Things → Create Thing

Single Thing → 命名 DeviceID

Auto-generate Certificate

创建并附加 Policy

下载证书文件
device-certificate.pem
private-key.pem
AmazonRootCA1.pem

ESP32 端配置
使用证书建立 TLS 连接

AWS IoT Core 的三层电子证件体系:

层级 文件 作用
CA 证书 AmazonRootCA1.pem 验证 AWS IoT 服务器身份
设备证书 device-certificate.pem 设备唯一身份标识
私钥 private-key.pem 设备持有,建立加密信道

AWS IoT Core 强制使用 TLS 加密连接,需要将证书文件上传到 ESP32 的 SPIFFS 分区或直接嵌入代码。

设备身份授权通过 Policy 文件定义:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iot:Connect",
                "iot:Publish",
                "iot:Subscribe",
                "iot:Receive"
            ],
            "Resource": [
                "arn:aws:iot:us-east-1:123456789012:topic/sensor/*",
                "arn:aws:iot:us-east-1:123456789012:topic/cmd/*"
            ]
        }
    ]
}

7.2 设备影子(Device Shadow)

AWS IoT Core 的 Device Shadow 服务创建每个设备的持久虚拟状态副本,即使设备离线也可读写其期望状态。

移动端 App AWS IoT Shadow ESP32 设备 移动端 App AWS IoT Shadow ESP32 设备 设备在线时 设备离线时 上报 reported: {led: "off", temp: 25} 更新 desired: {led: "on"} 下发 delta: {led: "on"} 执行开灯操作 上报 reported: {led: "on"} 更新 desired: {led: "off"} 等待设备重新连接 设备重新连接后恢复 同步 desired: {led: "off"} 执行关灯操作

设备端只需配置 MQTT_TOPIC_SHADOW 即可自动完成影子同步:

// ESP-IDF 中开启影子支持
esp_mqtt_client_config_t cfg = {
    .broker.address.uri = "mqtts://xxxxxxxx-ats.iot.us-east-1.amazonaws.com",
    .credentials.client_id = "esp32_001",
    // 开启 AWS IoT 影子
    .broker.verification.certificate = (const char *)aws_root_ca,
    .credentials.authentication = {
        .certificate = (const char *)aws_client_cert,
        .key = (const char *)aws_client_key,
    },
};

Shadow JSON 示例:

{
    "state": {
        "desired": {
            "led": "on",
            "report_interval": 10
        },
        "reported": {
            "led": "off",
            "temperature": 25.3,
            "humidity": 62.1
        }
    },
    "metadata": { ... },
    "timestamp": 1718001234,
    "version": 5
}

八、数据处理与可视化

8.1 Node-RED 数据处理流

Node-RED 是树莓派上常用的低代码 IoT 开发工具,通过拖拽方式构建数据流。

MQTT In
topic: sensor/+/data

json
解析JSON

function
温度 > 35?

发送告警通知

邮件/钉钉
告警推送

influxdb out
写入时序数据库

Grafana
可视化面板

file
本地CSV备份

Node-RED Function 节点示例代码(温湿度阈值判断):

// 获取传感器数据
var temp = msg.payload.temperature;
var humi = msg.payload.humidity;
var ts   = msg.payload.timestamp;

// 设置告警阈值
var TEMP_HIGH = 35.0;
var TEMP_LOW  = 5.0;
var HUMI_HIGH = 80.0;

// 判断并生成告警
if (temp > TEMP_HIGH) {
    msg.alarm = "高温告警: " + temp + "°C";
    return [msg, null];
} else if (temp < TEMP_LOW) {
    msg.alarm = "低温告警: " + temp + "°C";
    return [msg, null];
} else {
    return [null, msg];  // 正常数据流向数据库
}

8.2 InfluxDB 时序数据存储

InfluxDB 是专为 IoT 设计的时序数据库,写入数据时自动添加时间索引:

# data_processor.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# 连接 InfluxDB
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="iot_org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# 构造数据点
point = Point("environment") \
    .tag("device", "esp32_001") \
    .tag("location", "greenhouse_a") \
    .field("temperature", 25.3) \
    .field("humidity", 62.1) \
    .time(datetime.utcnow(), WritePrecision.NS)

write_api.write(bucket="iot_bucket", record=point)

8.3 Grafana 可视化面板

Grafana 连接 InfluxDB,可创建以下仪表盘:

  • 实时温湿度折线图:X 轴为时间,Y 轴为温度/湿度,双 Y 轴显示。
  • 设备状态表:按设备 ID 显示最新上报时间、在线状态。
  • 告警统计:每日告警触发次数柱状图。
  • 阈值线:在温度折线图上叠加红色阈值线(如 35°C),一目了然。

Grafana InfluxDB 查询语句示例

SELECT mean("temperature") AS "平均温度"
FROM "environment"
WHERE "device" = 'esp32_001'
  AND time >= now() - 1h
GROUP BY time(1m) fill(null)

九、完整实战案例:温湿度监测系统端到端实现

9.1 系统硬件清单与连接

组件 型号 数量 用途
主控 ESP32-S3 DevKit 1 WiFi + 传感器采集
温湿度 DHT22 1 采集温度(℃)、湿度(%RH)
光照 BH1750FVI(可选) 1 测量光照强度(Lux)
边缘网关 树莓派 4B 1 MQTT Broker + 边缘处理
存储 32GB SD 卡 1 本地数据持久化

9.2 设备启动与数据上报完整序列

云数据库 阿里云 IoT 树莓派 Broker ESP32 云数据库 阿里云 IoT 树莓派 Broker ESP32 loop [每 5 秒] 上电启动 连接 WiFi MQTT CONNECT (client_id=esp32_001, keepalive=60) CONNACK (成功) SUBSCRIBE cmd/esp32_001 (订阅控制指令) SUBACK (订阅成功) 读取 DHT22 PUBLISH sensor/dht22/data (QoS=1) PUBACK MQTT Bridge 转发 规则引擎路由 → 数据库 下发指令 set_interval=10 PUBLISH cmd/esp32_001 PUBLISH cmd_resp (确认执行) 调整上报周期为 10 秒

9.3 关键调试技巧

  1. WiFi 连接失败排查

    • 用串口查看 ESP32 日志,ESP_LOGI 输出 WiFi 状态。
    • 检查 SSID 是否正确(2.4GHz,ESP32 不支持 5GHz WiFi)。
    • 用手机热点先验证 WiFi 模块是否正常。
  2. MQTT 连接失败

    • 检查 Broker 地址和端口是否正确(阿里云非加密端口 1883,AWS 强制 TLS 8883)。
    • 阿里云:检查三元组(ProductKey/DeviceName/DeviceSecret)是否正确,HMAC-SHA256 签名是否过期。
    • AWS:检查证书是否匹配,Policy 是否包含 iot:Connect 权限。
    • 用 MQTT 调试工具(MQTTX)模拟客户端验证 Broker 可达。
  3. 数据上报成功但平台看不到

    • 阿里云需要检查数据格式是否符合 Alink JSON 规范,method 字段是否正确。
    • AWS 检查 topic 是否在 Policy 允许的 iot:Publish 资源范围内。
    • 在云平台控制台的"设备日志"中查看原始报文。
  4. Node-RED 数据流断连

    • 检查 Mosquitto Broker 状态:sudo systemctl status mosquitto
    • mosquitto_sub -h localhost -t sensor/# -u esp32_user -P password 手动订阅验证。

9.4 生产环境优化建议

方向 具体措施
OTA 升级 利用阿里云/ AWS 的 OTA 固件升级能力,持续迭代设备固件
异常处理 传感器读取超时设置 3 次重试,重试失败后上报错误码
功耗优化 ESP32 启用 Light Sleep 模式,在 Deep Sleep 下用 ULP 协处理器定时唤醒
安全加固 阿里云使用设备密钥动态签名,AWS 证书定期轮换,树莓派启用 TLS
数据缓存 网络断开时设备本地环形缓冲存储 100 条数据,网络恢复后批量补发

十、云平台选型建议

场景 推荐平台 理由
国内项目、政企 阿里云 IoT 国内节点覆盖全(6个区域),符合等保认证,本土化支持好
全球化部署 AWS IoT Core 全球 30+ 区域,设备影子强大,生态集成丰富(Lambda/S3/Kinesis)
低成本实验 树莓派 Mosquitto 完全免费,完全掌控,适合原型验证和小规模部署
灵活整合 阿里云/树莓派 双链路 本地处理 + 云端备份,兼顾实时性和可靠性

十一、总结

本文档以 ESP32(数据采集)+ 树莓派(边缘网关)+ 阿里云/AWS IoT(云端管理)为典型三层架构,覆盖了从 MQTT 协议机制、设备端代码实现、边缘层 Broker 搭建到云端平台配置与数据可视化的完整物联网链路。

核心开发路径可概括为:

  1. 设备端:ESP32 读取 DHT22 传感器 → 封装 JSON → MQTT 发布到 Broker。
  2. 边缘层:树莓派运行 Mosquitto Broker,网关程序将数据转发到云平台。
  3. 云端:阿里云/AWS IoT 接收数据,规则引擎路由到数据库。
  4. 可视化:Node-RED 处理告警、Grafana 呈现面板,形成完整闭环。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐