物联网设备接入与数据处理完全指南
物联网设备接入与数据处理 以 ESP32、树莓派等硬件设备为例,分享设备接入阿里云、AWS IoT 等平台的过程,以及设备数据采集、传输、处理的开发经验,提供硬件代码和云端配置说明。
关于物联网设备接入与数据处理的详细指南,涵盖ESP32和树莓派等设备,涉及阿里云、AWS IoT等平台,以及数据采集、传输、处理的开发经验。几个方向:阿里云和AWS IoT的设备接入文档、MQTT协议细节、ESP32和树莓派的开发入门,以及一些具体的开发案例。
涵盖了阿里云和AWS IoT的设备接入文档、MQTT协议基础,以及ESP32和树莓派的实际开发案例,特别是一个将ESP32作为数据采集节点、树莓派作为本地MQTT代理的农业物联网案例。接下来围绕这个核心架构来组织回答,先说明MQTT的机制,然后分别给出ESP32和树莓派的代码示例,再详细说明阿里云和AWS IoT的云端配置步骤,最后用对比总结不同平台和设备的适用场景。
物联网设备接入与数据处理完全指南
一、项目文件结构组织
iot_project/
├── README.md # 项目总体说明
├── docs/
│ ├── architecture.md # 系统架构设计文档
│ └── platform_comparison.md # 云平台选型对比
├── esp32/
│ ├── main/
│ │ ├── main.c # ESP32 主程序
│ │ ├── wifi_manager.c/h # WiFi 连接管理
│ │ ├── mqtt_client.c/h # MQTT 客户端封装
│ │ ├── sensor_dht22.c/h # DHT22 温湿度传感器驱动
│ │ ├── sensor_bh1750.c/h # BH1750 光照传感器驱动
│ │ └── config.h # 全局配置(WiFi/MQTT 凭证)
│ ├── CMakeLists.txt # ESP-IDF 构建脚本
│ └── sdkconfig # ESP-IDF 项目配置
├── raspberry_pi/
│ ├── broker/
│ │ ├── mosquitto.conf # Mosquitto MQTT Broker 配置
│ │ └── acl_passwd # ACL 访问控制
│ ├── docker-compose.yml # Docker 部署 Node-RED + InfluxDB
│ ├── gateway.py # MQTT → 阿里云/AWS 数据桥接
│ └── data_processor.py # 数据预处理与清洗
├── cloud/
│ ├── aliyun/
│ │ ├── iot_config.json # 阿里云 IoT 产品/设备配置
│ │ └── rules_engine.sql # 规则引擎 SQL(数据流转)
│ └── aws/
│ ├── iot_core_policy.json # AWS IoT Core 权限策略
│ └── greengrass_config.json # Greengrass 边缘配置
└── dashboard/
├── node_red_flow.json # Node-RED 仪表盘流程
└── grafana_dashboard.json # Grafana 可视化面板
二、系统架构 UML 建模
2.1 整体架构:云-边-端三层模型
2.2 MQTT 发布/订阅模型
2.3 设备全生命周期状态机
三、MQTT 协议核心机制解析
3.1 协议定位与网络栈
MQTT 是一种基于发布/订阅模型的轻量级应用层协议,构建于 TCP/IP 之上,能以极少的代码和有限的带宽为远程设备提供实时可靠的消息服务。它在物联网(IoT)领域已成为事实标准通信协议。
┌─────────────────────┐
│ MQTT (应用层) │ 发布/订阅模型、QoS、遗嘱消息
├─────────────────────┤
│ TCP │ 可靠的端到端传输
├─────────────────────┤
│ IP / Wi-Fi / ETH │ 网络层
├─────────────────────┤
│ 物理层 (PHY/MAC) │
└─────────────────────┘
3.2 核心角色与通信流程
MQTT 模型包含三种角色:发布者、代理(Broker)、订阅者,其中发布者和订阅者都是客户端,代理是服务器。
关键通信要素:
- ClientID:设备唯一标识,Broker 据此绑定会话。
- KeepAlive:心跳间隔,Broker 据此判断设备是否掉线(超时未收到 PINGREQ 则断开)。
- 遗嘱消息(Last Will):设备预先设定,意外断连时 Broker 自动发布,通知其他设备。
- topic:消息类别标签,如
sensor/temp,订阅者订阅该 topic 后即可收到对应消息。
3.3 QoS 质量等级详解
| QoS | 名称 | 语义 | 适用场景 | 开销 |
|---|---|---|---|---|
| 0 | 至多一次 | 消息只发一次,不等待确认,可能丢失 | 高频传感数据,丢失一两条无影响 | 最低 |
| 1 | 至少一次 | 消息保证送达,可能重复 | 传感器数据、设备日志 | 中等 |
| 2 | 恰好一次 | 消息精确送达一次,无丢失无重复 | 控制指令、计费数据 | 最高 |
嵌入式场景下,传感数据上报推荐 QoS 1(可靠但不重复处理);控制指令下发推荐 QoS 2(绝对不能丢)。
四、ESP32 设备端开发实战
4.1 ESP-IDF 开发环境搭建
# 1. 安装 ESP-IDF(以 v5.1 为例)
mkdir ~/esp && cd ~/esp
git clone -b v5.1 --recursive https://github.com/espressif/esp-idf.git
cd esp-idf && ./install.sh esp32s3
# 2. 激活环境变量
. $HOME/esp/esp-idf/export.sh
# 3. 创建项目
cd ~/esp
cp -r $IDF_PATH/examples/protocols/mqtt/tcp my_iot_project
cd my_iot_project
idf.py set-target esp32s3
4.2 WiFi 连接模块设计
// wifi_manager.h
#ifndef WIFI_MANAGER_H
#define WIFI_MANAGER_H
#include "esp_wifi.h"
#include "esp_event.h"
typedef void (*wifi_connected_cb_t)(void);
void wifi_manager_init(void);
void wifi_manager_connect(const char *ssid, const char *password,
wifi_connected_cb_t callback);
#endif
// wifi_manager.c
#include "wifi_manager.h"
#include "esp_log.h"
#include "freertos/FreeRTOS.h"
#include "freertos/event_groups.h"
static const char *TAG = "WIFI";
static EventGroupHandle_t wifi_event_group;
static const int WIFI_CONNECTED_BIT = BIT0;
static wifi_connected_cb_t connected_cb = NULL;
static void wifi_event_handler(void *arg, esp_event_base_t event_base,
int32_t event_id, void *event_data)
{
if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) {
esp_wifi_connect();
ESP_LOGI(TAG, "WiFi 启动完成,开始连接...");
}
else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
ESP_LOGW(TAG, "WiFi 断连,准备重连...");
esp_wifi_connect();
}
else if (event_base == IP_EVENT && event_id == IP_EVENT_STA_GOT_IP) {
ip_event_got_ip_t *event = (ip_event_got_ip_t *)event_data;
ESP_LOGI(TAG, "获取 IP: " IPSTR, IP2STR(&event->ip_info.ip));
xEventGroupSetBits(wifi_event_group, WIFI_CONNECTED_BIT);
if (connected_cb) connected_cb();
}
}
void wifi_manager_init(void) {
wifi_event_group = xEventGroupCreate();
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
esp_netif_create_default_wifi_sta();
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
ESP_ERROR_CHECK(esp_wifi_init(&cfg));
ESP_ERROR_CHECK(esp_event_handler_register(WIFI_EVENT, ESP_EVENT_ANY_ID,
&wifi_event_handler, NULL));
ESP_ERROR_CHECK(esp_event_handler_register(IP_EVENT, IP_EVENT_STA_GOT_IP,
&wifi_event_handler, NULL));
}
void wifi_manager_connect(const char *ssid, const char *password,
wifi_connected_cb_t callback) {
connected_cb = callback;
wifi_config_t wifi_config = {0};
strncpy((char *)wifi_config.sta.ssid, ssid, sizeof(wifi_config.sta.ssid) - 1);
strncpy((char *)wifi_config.sta.password, password, sizeof(wifi_config.sta.password) - 1);
ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));
ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));
ESP_ERROR_CHECK(esp_wifi_start());
}
设计要点:
WIFI_EVENT_STA_DISCONNECTED事件触发自动重连,确保设备在路由器重启后自动恢复。EventGroupWaitBits可实现阻塞等待 WiFi 连接完成,或用回调解耦后续 MQTT 连接逻辑。- 生产环境中建议使用智能配网(SmartConfig),避免硬编码 WiFi 密码。
4.3 MQTT 客户端核心实现
// mqtt_client.h
#ifndef MQTT_CLIENT_H
#define MQTT_CLIENT_H
#include "mqtt_client.h"
typedef void (*mqtt_data_cb_t)(const char *topic, const char *data, int data_len);
void mqtt_client_start(const char *broker_url, const char *client_id,
const char *username, const char *password,
mqtt_data_cb_t callback);
int mqtt_client_publish(const char *topic, const char *payload, int qos);
bool mqtt_client_is_connected(void);
#endif
// mqtt_client.c
#include "mqtt_client.h"
#include "esp_log.h"
static const char *TAG = "MQTT";
static esp_mqtt_client_handle_t mqtt_handle = NULL;
static mqtt_data_cb_t data_cb = NULL;
static void mqtt_event_handler(void *args, esp_event_base_t base,
int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
switch (event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT 已连接 Broker");
// 订阅云端下发指令的主题
esp_mqtt_client_subscribe(mqtt_handle, "cmd/device_001", 1);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGW(TAG, "MQTT 断连");
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "收到消息: topic=%.*s, data=%.*s",
event->topic_len, event->topic,
event->data_len, event->data);
if (data_cb) {
data_cb(event->topic, event->data, event->data_len);
}
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGD(TAG, "消息发布成功 msg_id=%d", event->msg_id);
break;
default:
break;
}
}
void mqtt_client_start(const char *broker_url, const char *client_id,
const char *username, const char *password,
mqtt_data_cb_t callback)
{
data_cb = callback;
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = broker_url,
.credentials = {
.client_id = client_id,
.username = username,
.authentication.password = password,
},
.session = {
.keepalive = 60,
.disable_clean_session = false,
},
};
mqtt_handle = esp_mqtt_client_init(&mqtt_cfg);
ESP_ERROR_CHECK(esp_mqtt_client_register_event(mqtt_handle, ESP_EVENT_ANY_ID,
mqtt_event_handler, NULL));
ESP_ERROR_CHECK(esp_mqtt_client_start(mqtt_handle));
}
int mqtt_client_publish(const char *topic, const char *payload, int qos) {
if (!mqtt_handle) return -1;
return esp_mqtt_client_publish(mqtt_handle, topic, payload, 0, qos, 0);
}
bool mqtt_client_is_connected(void) {
return mqtt_handle != NULL;
}
关键设计:
- 心跳保活:
keepalive=60,Broker 60秒未收到心跳则判定设备掉线。ESP-IDF MQTT 库自动发送 PINGREQ,开发者无需手动处理。 - 订阅指令:连接成功后立即订阅
cmd/device_001,确保设备随时可接收云端下发的控制指令。 - 遗嘱消息:可在
mqtt_cfg中配置session.last_will,设备异常断连时 Broker 自动发布通知。
4.4 传感器数据采集与上报
// sensor_dht22.c —— DHT22 温湿度传感器驱动
#include "sensor_dht22.h"
#include "driver/gpio.h"
#include "esp_timer.h"
#define DHT22_GPIO 4
#define DHT_TIMEOUT 1000 // 超时微秒
// 等待指定电平持续的最长时间(返回 0=超时, 非0=持续微秒数)
static int wait_pin_level(int level, int timeout_us) {
int64_t start = esp_timer_get_time();
while (gpio_get_level(DHT22_GPIO) == level) {
if (esp_timer_get_time() - start > timeout_us) return 0;
}
return (int)(esp_timer_get_time() - start);
}
int dht22_read(float *temperature, float *humidity) {
uint8_t data[5] = {0};
// 1. 主机发送起始信号:拉低 1ms 再拉高 30µs
gpio_set_direction(DHT22_GPIO, GPIO_MODE_OUTPUT);
gpio_set_level(DHT22_GPIO, 0);
esp_rom_delay_us(1000);
gpio_set_level(DHT22_GPIO, 1);
esp_rom_delay_us(30);
gpio_set_direction(DHT22_GPIO, GPIO_MODE_INPUT);
// 2. 等待 DHT22 响应
if (!wait_pin_level(1, DHT_TIMEOUT)) { return -1; } // 跳过上升沿
if (!wait_pin_level(0, DHT_TIMEOUT)) { return -2; } // 等待响应低电平
if (!wait_pin_level(1, DHT_TIMEOUT)) { return -3; } // 等待响应高电平
// 3. 读取 40 位数据
for (int i = 0; i < 40; i++) {
int low_us = wait_pin_level(0, DHT_TIMEOUT);
int high_us = wait_pin_level(1, DHT_TIMEOUT);
if (low_us == 0 || high_us == 0) return -4; // 超时
data[i / 8] <<= 1;
if (high_us > low_us) data[i / 8] |= 1; // 长高电平 = 1, 短 = 0
}
// 4. 校验
if (((data[0] + data[1] + data[2] + data[3]) & 0xFF) != data[4]) {
return -5; // 校验失败
}
*humidity = (float)((data[0] << 8) | data[1]) / 10.0f;
*temperature = (float)((data[2] << 8) | data[3]) / 10.0f;
return 0;
}
// main.c —— 主程序
#include "wifi_manager.h"
#include "mqtt_client.h"
#include "sensor_dht22.h"
#include "cJSON.h"
#include "esp_log.h"
#include "esp_timer.h"
static const char *TAG = "MAIN";
static esp_timer_handle_t report_timer;
static void report_sensor_data(void *arg)
{
float temp, humi;
if (dht22_read(&temp, &humi) != 0) {
ESP_LOGW(TAG, "传感器读取失败");
return;
}
// 组装 JSON 数据包
cJSON *root = cJSON_CreateObject();
cJSON_AddNumberToObject(root, "temperature", temp);
cJSON_AddNumberToObject(root, "humidity", humi);
cJSON_AddNumberToObject(root, "timestamp", esp_timer_get_time() / 1000000);
char *json_str = cJSON_PrintUnformatted(root);
mqtt_client_publish("sensor/dht22/data", json_str, 1);
ESP_LOGI(TAG, "数据上报: %s", json_str);
cJSON_Delete(root);
free(json_str);
}
static void on_wifi_connected(void) {
// 阿里云平台连接参数(需从 IoT 控制台获取)
mqtt_client_start(
"mqtts://xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com:1883",
"esp32_001|securemode=2,signmethod=hmacsha256|",
"esp32_001&xxxxx",
"xxxxx_HMAC_SHA256_SIGN",
NULL
);
}
void app_main(void)
{
wifi_manager_init();
wifi_manager_connect("YOUR_SSID", "YOUR_PASSWORD", on_wifi_connected);
// 创建 5 秒定时器,定期上报传感器数据
esp_timer_create_args_t timer_args = {
.callback = report_sensor_data,
.dispatch_method = ESP_TIMER_TASK,
.name = "report_timer"
};
esp_timer_create(&timer_args, &report_timer);
esp_timer_start_periodic(report_timer, 5000000); // 5秒 = 5,000,000 微秒
}
数据包 JSON 格式示例:
{
"temperature": 25.3,
"humidity": 62.1,
"timestamp": 1718001234
}
五、树莓派边缘计算层开发
5.1 MQTT Broker 搭建
**
在系统架构中,树莓派既作为本地 MQTT Broker 接收 ESP32 的上报数据,又承担将数据同步到云平台的边缘网关角色。
# 安装 Mosquitto Broker
sudo apt update
sudo apt install -y mosquitto mosquitto-clients
# 配置 Mosquitto
sudo tee /etc/mosquitto/conf.d/custom.conf <<EOF
# 监听端口
listener 1883 0.0.0.0 # 本地局域网
listener 8883 0.0.0.0 # TLS 加密端口(可选)
# 认证配置
allow_anonymous false
password_file /etc/mosquitto/passwd
# 持久化与日志
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
EOF
# 创建用户
sudo mosquitto_passwd -c /etc/mosquitto/passwd esp32_user
sudo systemctl restart mosquitto
5.2 数据桥接程序(MQTT → 阿里云 / AWS)
#!/usr/bin/env python3
# gateway.py —— 树莓派本地 Broker 到云平台的数据桥接
"""
功能:订阅本地 Mosquitto 上所有设备数据,转发到阿里云 IoT / AWS IoT Core。
"""
import json
import time
import ssl
import paho.mqtt.client as mqtt
# ========== 配置区 ==========
# 本地 Broker
LOCAL_BROKER = "localhost"
LOCAL_PORT = 1883
# 阿里云 IoT(从控制台获取)
ALIYUN_ENDPOINT = "xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com"
ALIYUN_PORT = 1883
ALIYUN_CLIENT_ID = "gateway_001|securemode=2,signmethod=hmacsha256|"
ALIYUN_USERNAME = "gateway_001&xxxxx"
ALIYUN_PASSWORD = "xxxxx_HMAC_SHA256_SIGN"
# 设备映射表:本地 topic → 云端 topic
TOPIC_MAP = {
"sensor/dht22/data": "/sys/gateway_001/dht22/thing/event/property/post",
}
# ========== 本地客户端 ==========
def on_local_message(client, userdata, msg):
"""收到本地设备数据后转发到阿里云"""
topic = msg.topic
payload = msg.payload.decode("utf-8")
print(f"[本地] 收到 {topic}: {payload}")
# 获取对应的云端 topic
cloud_topic = TOPIC_MAP.get(topic, f"forward/{topic}")
# 转发到阿里云
result = aliyun_client.publish(cloud_topic, payload, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"[转发] → 阿里云 {cloud_topic}")
else:
print(f"[错误] 转发失败: {result.rc}")
# ========== 阿里云客户端 ==========
aliyun_client = mqtt.Client(client_id=ALIYUN_CLIENT_ID)
aliyun_client.username_pw_set(ALIYUN_USERNAME, ALIYUN_PASSWORD)
aliyun_client.connect(ALIYUN_ENDPOINT, ALIYUN_PORT, keepalive=120)
aliyun_client.loop_start()
# ========== 本地客户端 ==========
local_client = mqtt.Client(client_id="gateway_local")
local_client.username_pw_set("esp32_user", "your_password")
local_client.on_message = on_local_message
local_client.connect(LOCAL_BROKER, LOCAL_PORT, keepalive=60)
local_client.subscribe("sensor/+/data", qos=1) # 订阅所有传感器的数据主题
print("[网关] 启动完成,等待设备数据...")
local_client.loop_forever()
如果不需要树莓派网关,ESP32 也可以直接连接云平台:
// ESP32 直连阿里云 IoT 的参数示例(从 IoT 控制台获取)
mqtt_client_start(
"mqtts://xxxxx.iot-as-mqtt.cn-shanghai.aliyuncs.com:1883", // ← 改为云平台域名
"esp32_dht22_001|securemode=2,signmethod=hmacsha256|",
"esp32_dht22_001&xxxxx_ProductKey",
"计算出的_HMAC_SHA256_签名",
on_command_received // ← 接收云端下发指令的回调
);
两种方式的对比:
| 方式 | 优势 | 劣势 |
|---|---|---|
| ESP32 直连云平台 | 架构简单,省去树莓派 | 证书管理复杂,设备端资源消耗大 |
| 树莓派网关转发 | 本地数据缓存,加密/证书由树莓派处理,ESP32 负担小 | 多一个硬件节点,需维护网关 |
5.3 Docker 一键部署 IoT 服务栈
# docker-compose.yml
version: '3.8'
services:
mosquitto:
image: eclipse-mosquitto:2.0
ports:
- "1883:1883"
- "8883:8883"
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./passwd:/mosquitto/config/passwd
restart: unless-stopped
node-red:
image: nodered/node-red:latest
ports:
- "1880:1880"
volumes:
- ./node_red_data:/data
environment:
- TZ=Asia/Shanghai
restart: unless-stopped
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
volumes:
- ./influxdb_data:/var/lib/influxdb2
environment:
- INFLUXDB_ADMIN_USER=admin
- INFLUXDB_ADMIN_PASSWORD=admin123
restart: unless-stopped
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- ./grafana_data:/var/lib/grafana
restart: unless-stopped
六、阿里云 IoT 平台配置详解
6.1 云端创建产品与设备
详细操作步骤:
-
创建产品:进入阿里云物联网平台控制台 → 设备管理 → 产品 → 创建产品。
- 产品名称:
温湿度采集器 - 所属品类:自定义品类
- 节点类型:直连设备(ESP32 直接联网)
- 联网方式:Wi-Fi
- 数据格式:ICA 标准数据格式(Alink JSON)
- 产品名称:
-
功能定义:在产品详情页,点击"功能定义" → “添加自定义功能”。
- 属性:
temperature(浮点型,单位℃)、humidity(浮点型,单位%RH) - 服务:
set_interval(设置上报周期,输入参数 interval 整型,单位秒)
- 属性:
-
创建设备:设备管理 → 设备 → 添加设备,选择上面创建的产品,输入 DeviceName 如
esp32_dht22_001。- 创建设备后,系统自动生成设备证书三元组:
ProductKey、DeviceName、DeviceSecret。
- 创建设备后,系统自动生成设备证书三元组:
-
获取 MQTT 连接参数:设备详情页 → 查看 DeviceSecret,点击"MQTT 连接参数"。
Broker Address:{ProductKey}.iot-as-mqtt.{region}.aliyuncs.comClientID:{DeviceName}|securemode=2,signmethod=hmacsha256|Username:{DeviceName}&{ProductKey}Password:基于DeviceSecret+ 时间戳的 HMAC-SHA256 签名
6.2 数据格式(Alink JSON)
设备上报数据到阿里云 IoT 平台须遵循 Alink 协议格式:
{
"id": "123",
"version": "1.0",
"sys": {
"ack": 0
},
"params": {
"temperature": {
"value": 25.3,
"time": 1718001234000
},
"humidity": {
"value": 62.1,
"time": 1718001234000
}
},
"method": "thing.event.property.post"
}
要点说明:
id:消息 ID,建议用毫秒时间戳。method:功能标识,属性上报使用thing.event.property.post。params:以属性标识符为 key,包含value和时间戳。
接收云端下发的控制指令时,设备通过订阅 /${productKey}/${deviceName}/thing/service/property/set topic 获得 JSON 报文。
6.3 规则引擎:数据流转到数据库
在 IoT 控制台 → 规则引擎 → 云产品流转 → 创建规则:
-- 规则 SQL:将 temperature > 35 的数据写入 RDS
SELECT
items.temperature.value as temperature,
items.humidity.value as humidity,
timestamp('yyyy-MM-dd''T''HH:mm:ss') as event_time,
deviceName() as device_id
FROM
"/sys/+/+/thing/event/property/post"
WHERE
items.temperature.value > 35
数据流转流程:
七、AWS IoT Core 平台配置详解
7.1 设备注册与证书管理
AWS IoT Core 的三层电子证件体系:
| 层级 | 文件 | 作用 |
|---|---|---|
| CA 证书 | AmazonRootCA1.pem |
验证 AWS IoT 服务器身份 |
| 设备证书 | device-certificate.pem |
设备唯一身份标识 |
| 私钥 | private-key.pem |
设备持有,建立加密信道 |
AWS IoT Core 强制使用 TLS 加密连接,需要将证书文件上传到 ESP32 的 SPIFFS 分区或直接嵌入代码。
设备身份授权通过 Policy 文件定义:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Connect",
"iot:Publish",
"iot:Subscribe",
"iot:Receive"
],
"Resource": [
"arn:aws:iot:us-east-1:123456789012:topic/sensor/*",
"arn:aws:iot:us-east-1:123456789012:topic/cmd/*"
]
}
]
}
7.2 设备影子(Device Shadow)
AWS IoT Core 的 Device Shadow 服务创建每个设备的持久虚拟状态副本,即使设备离线也可读写其期望状态。
设备端只需配置 MQTT_TOPIC_SHADOW 即可自动完成影子同步:
// ESP-IDF 中开启影子支持
esp_mqtt_client_config_t cfg = {
.broker.address.uri = "mqtts://xxxxxxxx-ats.iot.us-east-1.amazonaws.com",
.credentials.client_id = "esp32_001",
// 开启 AWS IoT 影子
.broker.verification.certificate = (const char *)aws_root_ca,
.credentials.authentication = {
.certificate = (const char *)aws_client_cert,
.key = (const char *)aws_client_key,
},
};
Shadow JSON 示例:
{
"state": {
"desired": {
"led": "on",
"report_interval": 10
},
"reported": {
"led": "off",
"temperature": 25.3,
"humidity": 62.1
}
},
"metadata": { ... },
"timestamp": 1718001234,
"version": 5
}
八、数据处理与可视化
8.1 Node-RED 数据处理流
Node-RED 是树莓派上常用的低代码 IoT 开发工具,通过拖拽方式构建数据流。
Node-RED Function 节点示例代码(温湿度阈值判断):
// 获取传感器数据
var temp = msg.payload.temperature;
var humi = msg.payload.humidity;
var ts = msg.payload.timestamp;
// 设置告警阈值
var TEMP_HIGH = 35.0;
var TEMP_LOW = 5.0;
var HUMI_HIGH = 80.0;
// 判断并生成告警
if (temp > TEMP_HIGH) {
msg.alarm = "高温告警: " + temp + "°C";
return [msg, null];
} else if (temp < TEMP_LOW) {
msg.alarm = "低温告警: " + temp + "°C";
return [msg, null];
} else {
return [null, msg]; // 正常数据流向数据库
}
8.2 InfluxDB 时序数据存储
InfluxDB 是专为 IoT 设计的时序数据库,写入数据时自动添加时间索引:
# data_processor.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# 连接 InfluxDB
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="iot_org")
write_api = client.write_api(write_options=SYNCHRONOUS)
# 构造数据点
point = Point("environment") \
.tag("device", "esp32_001") \
.tag("location", "greenhouse_a") \
.field("temperature", 25.3) \
.field("humidity", 62.1) \
.time(datetime.utcnow(), WritePrecision.NS)
write_api.write(bucket="iot_bucket", record=point)
8.3 Grafana 可视化面板
Grafana 连接 InfluxDB,可创建以下仪表盘:
- 实时温湿度折线图:X 轴为时间,Y 轴为温度/湿度,双 Y 轴显示。
- 设备状态表:按设备 ID 显示最新上报时间、在线状态。
- 告警统计:每日告警触发次数柱状图。
- 阈值线:在温度折线图上叠加红色阈值线(如 35°C),一目了然。
Grafana InfluxDB 查询语句示例:
SELECT mean("temperature") AS "平均温度"
FROM "environment"
WHERE "device" = 'esp32_001'
AND time >= now() - 1h
GROUP BY time(1m) fill(null)
九、完整实战案例:温湿度监测系统端到端实现
9.1 系统硬件清单与连接
| 组件 | 型号 | 数量 | 用途 |
|---|---|---|---|
| 主控 | ESP32-S3 DevKit | 1 | WiFi + 传感器采集 |
| 温湿度 | DHT22 | 1 | 采集温度(℃)、湿度(%RH) |
| 光照 | BH1750FVI(可选) | 1 | 测量光照强度(Lux) |
| 边缘网关 | 树莓派 4B | 1 | MQTT Broker + 边缘处理 |
| 存储 | 32GB SD 卡 | 1 | 本地数据持久化 |
9.2 设备启动与数据上报完整序列
9.3 关键调试技巧
-
WiFi 连接失败排查:
- 用串口查看 ESP32 日志,
ESP_LOGI输出 WiFi 状态。 - 检查 SSID 是否正确(2.4GHz,ESP32 不支持 5GHz WiFi)。
- 用手机热点先验证 WiFi 模块是否正常。
- 用串口查看 ESP32 日志,
-
MQTT 连接失败:
- 检查 Broker 地址和端口是否正确(阿里云非加密端口 1883,AWS 强制 TLS 8883)。
- 阿里云:检查三元组(ProductKey/DeviceName/DeviceSecret)是否正确,HMAC-SHA256 签名是否过期。
- AWS:检查证书是否匹配,Policy 是否包含
iot:Connect权限。 - 用 MQTT 调试工具(MQTTX)模拟客户端验证 Broker 可达。
-
数据上报成功但平台看不到:
- 阿里云需要检查数据格式是否符合 Alink JSON 规范,method 字段是否正确。
- AWS 检查 topic 是否在 Policy 允许的
iot:Publish资源范围内。 - 在云平台控制台的"设备日志"中查看原始报文。
-
Node-RED 数据流断连:
- 检查 Mosquitto Broker 状态:
sudo systemctl status mosquitto。 - 用
mosquitto_sub -h localhost -t sensor/# -u esp32_user -P password手动订阅验证。
- 检查 Mosquitto Broker 状态:
9.4 生产环境优化建议
| 方向 | 具体措施 |
|---|---|
| OTA 升级 | 利用阿里云/ AWS 的 OTA 固件升级能力,持续迭代设备固件 |
| 异常处理 | 传感器读取超时设置 3 次重试,重试失败后上报错误码 |
| 功耗优化 | ESP32 启用 Light Sleep 模式,在 Deep Sleep 下用 ULP 协处理器定时唤醒 |
| 安全加固 | 阿里云使用设备密钥动态签名,AWS 证书定期轮换,树莓派启用 TLS |
| 数据缓存 | 网络断开时设备本地环形缓冲存储 100 条数据,网络恢复后批量补发 |
十、云平台选型建议
| 场景 | 推荐平台 | 理由 |
|---|---|---|
| 国内项目、政企 | 阿里云 IoT | 国内节点覆盖全(6个区域),符合等保认证,本土化支持好 |
| 全球化部署 | AWS IoT Core | 全球 30+ 区域,设备影子强大,生态集成丰富(Lambda/S3/Kinesis) |
| 低成本实验 | 树莓派 Mosquitto | 完全免费,完全掌控,适合原型验证和小规模部署 |
| 灵活整合 | 阿里云/树莓派 双链路 | 本地处理 + 云端备份,兼顾实时性和可靠性 |
十一、总结
本文档以 ESP32(数据采集)+ 树莓派(边缘网关)+ 阿里云/AWS IoT(云端管理)为典型三层架构,覆盖了从 MQTT 协议机制、设备端代码实现、边缘层 Broker 搭建到云端平台配置与数据可视化的完整物联网链路。
核心开发路径可概括为:
- 设备端:ESP32 读取 DHT22 传感器 → 封装 JSON → MQTT 发布到 Broker。
- 边缘层:树莓派运行 Mosquitto Broker,网关程序将数据转发到云平台。
- 云端:阿里云/AWS IoT 接收数据,规则引擎路由到数据库。
- 可视化:Node-RED 处理告警、Grafana 呈现面板,形成完整闭环。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)