时序数据库+AI:物联网海量数据的存储与实时分析
·
时序数据库+AI:物联网海量数据的存储与实时分析
IoT设备每秒产生的数据点数以亿计,传统关系型数据库早已不堪重负。时序数据库(TSDB)正是为这种"时间戳+数值"的写入模式而生,配合AI分析可实现真正的实时智能。
为什么需要时序数据库?
传统关系型数据库 (MySQL/PostgreSQL):
┌────────────────────────────────────────┐
│ 表: sensor_data │
│ id │ device_id │ timestamp │ value │
│ ───┼──────────┼──────────┼────────── │
│ 1 │ sensor_01│ 1700000001│ 23.5°C │
│ 2 │ sensor_01│ 1700000002│ 23.6°C │
│ ... │
│ 每天: 1亿行 × 1000设备 = 1000亿行 │
│ 写入: 随机IO, B+树索引, 锁竞争 │
│ 查询: 全表扫描, 聚合慢 │
└────────────────────────────────────────┘
时序数据库 (InfluxDB/TDengine):
┌────────────────────────────────────────┐
│ 按时间分片存储, 顺序写入 │
│ 自动压缩(10-20倍) │
│ 内置时间聚合函数 │
│ 写入: 顺序IO, 无锁, 百万点/秒 │
│ 查询: 预聚合, 秒级响应 │
└────────────────────────────────────────┘
主流时序数据库对比
| 特性 | InfluxDB | TDengine | TimescaleDB | QuestDB |
|---|---|---|---|---|
| 开源 | 部分开源 | 完全开源 | 完全开源 | 完全开源 |
| 语言 | Go | C | C(扩展) | Java |
| 写入速度 | 50万点/秒 | 200万点/秒 | 30万点/秒 | 100万点/秒 |
| 压缩比 | 10:1 | 20:1 | 8:1 | 10:1 |
| SQL支持 | InfluxQL | 兼容SQL | 完全SQL | 兼容SQL |
| 流计算 | 支持 | 支持 | 支持 | 不支持 |
| AI集成 | 一般 | 优秀 | 良好 | 一般 |
| 适合场景 | 通用IoT | 大规模IoT | 已有PG生态 | 分析为主 |
TDengine 实战
安装与配置
# docker-compose.yml
version: '3'
services:
tdengine:
image: tdengine/tdengine:latest
container_name: tdengine
ports:
- "6030:6030"
- "6041:6041"
- "6043:6043"
- "6044:6044"
volumes:
- ./data:/var/lib/taos
- ./log:/var/log/taos
environment:
TAOS_FQDN: tdengine
创建数据库与表
-- 创建数据库,保留365天,每10天一个分区
CREATE DATABASE iot_data
KEEP 365
DURATION 10
BUFFER 256
COMP 2
WAL_LEVEL 1;
USE iot_data;
-- 创建超级表(类似模板)
CREATE STABLE sensor_data (
ts TIMESTAMP,
temperature FLOAT,
humidity FLOAT,
pressure FLOAT,
battery INT
) TAGS (
device_id BINARY(64),
location BINARY(128),
device_type BINARY(32)
);
-- 创建子表(每个设备一张表)
CREATE TABLE sensor_001 USING sensor_data TAGS ('sensor_001', '工厂A-车间1', '温湿度');
CREATE TABLE sensor_002 USING sensor_data TAGS ('sensor_002', '工厂A-车间2', '温湿度');
数据写入
import taos
from datetime import datetime
import random
class IoTDataWriter:
"""IoT数据写入器"""
def __init__(self, host: str = 'localhost', port: int = 6030):
self.conn = taos.connect(host=host, port=port, user='root', password='taosdata')
self.cursor = self.conn.cursor()
self.cursor.execute("USE iot_data")
def write_sensor_data(self, device_id: str, temperature: float,
humidity: float, pressure: float, battery: int):
"""写入单条传感器数据"""
sql = f"""
INSERT INTO {device_id} VALUES (
NOW, {temperature}, {humidity}, {pressure}, {battery}
)
"""
self.cursor.execute(sql)
def batch_write(self, records: list):
"""批量写入(性能最优)"""
# 构建批量SQL
values = []
for r in records:
values.append(f"('{r['device_id']}', NOW, {r['temp']}, {r['humidity']}, {r['pressure']}, {r['battery']})")
sql = f"""
INSERT INTO {values[0][0]} VALUES {', '.join(values)}
"""
self.cursor.execute(sql)
def close(self):
self.cursor.close()
self.conn.close()
# 使用示例
writer = IoTDataWriter()
# 模拟100个设备,每秒写入
for i in range(100):
device_id = f"sensor_{i:03d}"
writer.write_sensor_data(
device_id=device_id,
temperature=20 + random.random() * 10,
humidity=40 + random.random() * 30,
pressure=1013 + random.random() * 10,
battery=int(80 + random.random() * 20)
)
writer.close()
实时查询与分析
import taos
import pandas as pd
from datetime import datetime, timedelta
class IoTDataAnalyzer:
"""IoT数据分析器"""
def __init__(self, host: str = 'localhost'):
self.conn = taos.connect(host=host, user='root', password='taosdata')
self.cursor = self.conn.cursor()
self.cursor.execute("USE iot_data")
def query_recent(self, device_id: str, hours: int = 24) -> pd.DataFrame:
"""查询最近N小时的数据"""
sql = f"""
SELECT ts, temperature, humidity, pressure, battery
FROM {device_id}
WHERE ts >= NOW - {hours}h
ORDER BY ts DESC
"""
self.cursor.execute(sql)
data = self.cursor.fetchall()
columns = ['timestamp', 'temperature', 'humidity', 'pressure', 'battery']
return pd.DataFrame(data, columns=columns)
def aggregate_by_interval(self, device_id: str, interval: str = '1h') -> pd.DataFrame:
"""按时间间隔聚合"""
sql = f"""
SELECT _wstart as ts,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp,
AVG(humidity) as avg_humidity,
COUNT(*) as data_count
FROM {device_id}
WHERE ts >= NOW - 7d
INTERVAL({interval})
"""
self.cursor.execute(sql)
data = self.cursor.fetchall()
columns = ['timestamp', 'avg_temp', 'max_temp', 'min_temp', 'avg_humidity', 'count']
return pd.DataFrame(data, columns=columns)
def detect_anomalies(self, device_id: str, threshold: float = 3.0) -> pd.DataFrame:
"""基于统计方法检测异常"""
df = self.query_recent(device_id, hours=24)
# Z-score异常检测
mean = df['temperature'].mean()
std = df['temperature'].std()
df['z_score'] = (df['temperature'] - mean) / std
df['is_anomaly'] = abs(df['z_score']) > threshold
anomalies = df[df['is_anomaly']]
return anomalies
def forecast(self, device_id: str, steps: int = 24) -> list:
"""简单时序预测(移动平均)"""
df = self.aggregate_by_interval(device_id, '1h')
# 24小时移动平均
df['ma_24'] = df['avg_temp'].rolling(window=24).mean()
# 线性外推
recent = df['ma_24'].dropna().tail(24)
trend = (recent.iloc[-1] - recent.iloc[0]) / len(recent)
forecasts = []
last_value = recent.iloc[-1]
for i in range(steps):
forecast_value = last_value + trend * (i + 1)
forecasts.append(forecast_value)
return forecasts
def close(self):
self.cursor.close()
self.conn.close()
# 使用示例
analyzer = IoTDataAnalyzer()
# 查询最近24小时
df = analyzer.query_recent('sensor_001', hours=24)
print(f"最近24小时数据量: {len(df)} 条")
# 异常检测
anomalies = analyzer.detect_anomalies('sensor_001')
print(f"发现异常: {len(anomalies)} 条")
# 预测未来24小时温度
forecasts = analyzer.forecast('sensor_001', steps=24)
print(f"未来24小时预测温度: {forecasts[:5]}...")
analyzer.close()
与AI模型集成
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class TimeSeriesPredictor(nn.Module):
"""LSTM时序预测模型"""
def __init__(self, input_size: int = 4, hidden_size: int = 64, num_layers: int = 2):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
# x: (batch, seq_len, features)
lstm_out, _ = self.lstm(x)
# 取最后一个时间步
last_output = lstm_out[:, -1, :]
prediction = self.fc(last_output)
return prediction
def train_model_from_tdengine(host: str, device_id: str, seq_len: int = 24):
"""从TDengine读取数据训练模型"""
analyzer = IoTDataAnalyzer(host)
# 获取历史数据
df = analyzer.query_recent(device_id, hours=24*30) # 30天数据
# 准备训练数据
features = ['temperature', 'humidity', 'pressure', 'battery']
data = df[features].values
# 创建序列数据
X, y = [], []
for i in range(len(data) - seq_len):
X.append(data[i:i+seq_len])
y.append(data[i+seq_len, 0]) # 预测温度
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
# 训练模型
model = TimeSeriesPredictor(input_size=len(features))
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(50):
total_loss = 0
for batch_X, batch_y in loader:
optimizer.zero_grad()
output = model(batch_X)
loss = criterion(output, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 10 == 0:
print(f"Epoch {epoch}: loss={total_loss/len(loader):.4f}")
return model
存储优化策略
| 策略 | 效果 | 实现方式 |
|---|---|---|
| 数据降采样 | 存储减少90% | 高频原始数据保留7天,聚合数据保留1年 |
| 压缩算法 | 压缩比20:1 | Gorilla编码、Delta编码 |
| 分区策略 | 查询加速 | 按时间分区,自动过期删除 |
| 冷热分离 | 成本降低70% | 热数据SSD,冷数据HDD/S3 |
下期预告
下一篇将探讨 Matter协议:智能家居的统一语言,敬请期待!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)