时序数据库+AI:物联网海量数据的存储与实时分析

IoT设备每秒产生的数据点数以亿计,传统关系型数据库早已不堪重负。时序数据库(TSDB)正是为这种"时间戳+数值"的写入模式而生,配合AI分析可实现真正的实时智能。

为什么需要时序数据库?

传统关系型数据库 (MySQL/PostgreSQL):
┌────────────────────────────────────────┐
│  表: sensor_data                        │
│  id │ device_id │ timestamp │ value     │
│  ───┼──────────┼──────────┼──────────  │
│  1  │ sensor_01│ 1700000001│ 23.5°C    │
│  2  │ sensor_01│ 1700000002│ 23.6°C    │
│  ...                                   │
│  每天: 1亿行 × 1000设备 = 1000亿行     │
│  写入: 随机IO, B+树索引, 锁竞争        │
│  查询: 全表扫描, 聚合慢                │
└────────────────────────────────────────┘

时序数据库 (InfluxDB/TDengine):
┌────────────────────────────────────────┐
│  按时间分片存储, 顺序写入              │
│  自动压缩(10-20倍)                     │
│  内置时间聚合函数                      │
│  写入: 顺序IO, 无锁, 百万点/秒         │
│  查询: 预聚合, 秒级响应                │
└────────────────────────────────────────┘

主流时序数据库对比

特性 InfluxDB TDengine TimescaleDB QuestDB
开源 部分开源 完全开源 完全开源 完全开源
语言 Go C C(扩展) Java
写入速度 50万点/秒 200万点/秒 30万点/秒 100万点/秒
压缩比 10:1 20:1 8:1 10:1
SQL支持 InfluxQL 兼容SQL 完全SQL 兼容SQL
流计算 支持 支持 支持 不支持
AI集成 一般 优秀 良好 一般
适合场景 通用IoT 大规模IoT 已有PG生态 分析为主

TDengine 实战

安装与配置

# docker-compose.yml
version: '3'
services:
  tdengine:
    image: tdengine/tdengine:latest
    container_name: tdengine
    ports:
      - "6030:6030"
      - "6041:6041"
      - "6043:6043"
      - "6044:6044"
    volumes:
      - ./data:/var/lib/taos
      - ./log:/var/log/taos
    environment:
      TAOS_FQDN: tdengine

创建数据库与表

-- 创建数据库,保留365天,每10天一个分区
CREATE DATABASE iot_data
  KEEP 365
  DURATION 10
  BUFFER 256
  COMP 2
  WAL_LEVEL 1;

USE iot_data;

-- 创建超级表(类似模板)
CREATE STABLE sensor_data (
  ts TIMESTAMP,
  temperature FLOAT,
  humidity FLOAT,
  pressure FLOAT,
  battery INT
) TAGS (
  device_id BINARY(64),
  location BINARY(128),
  device_type BINARY(32)
);

-- 创建子表(每个设备一张表)
CREATE TABLE sensor_001 USING sensor_data TAGS ('sensor_001', '工厂A-车间1', '温湿度');
CREATE TABLE sensor_002 USING sensor_data TAGS ('sensor_002', '工厂A-车间2', '温湿度');

数据写入

import taos
from datetime import datetime
import random

class IoTDataWriter:
    """IoT数据写入器"""
    
    def __init__(self, host: str = 'localhost', port: int = 6030):
        self.conn = taos.connect(host=host, port=port, user='root', password='taosdata')
        self.cursor = self.conn.cursor()
        self.cursor.execute("USE iot_data")
    
    def write_sensor_data(self, device_id: str, temperature: float, 
                          humidity: float, pressure: float, battery: int):
        """写入单条传感器数据"""
        sql = f"""
        INSERT INTO {device_id} VALUES (
            NOW, {temperature}, {humidity}, {pressure}, {battery}
        )
        """
        self.cursor.execute(sql)
    
    def batch_write(self, records: list):
        """批量写入(性能最优)"""
        # 构建批量SQL
        values = []
        for r in records:
            values.append(f"('{r['device_id']}', NOW, {r['temp']}, {r['humidity']}, {r['pressure']}, {r['battery']})")
        
        sql = f"""
        INSERT INTO {values[0][0]} VALUES {', '.join(values)}
        """
        self.cursor.execute(sql)
    
    def close(self):
        self.cursor.close()
        self.conn.close()


# 使用示例
writer = IoTDataWriter()

# 模拟100个设备,每秒写入
for i in range(100):
    device_id = f"sensor_{i:03d}"
    writer.write_sensor_data(
        device_id=device_id,
        temperature=20 + random.random() * 10,
        humidity=40 + random.random() * 30,
        pressure=1013 + random.random() * 10,
        battery=int(80 + random.random() * 20)
    )

writer.close()

实时查询与分析

import taos
import pandas as pd
from datetime import datetime, timedelta

class IoTDataAnalyzer:
    """IoT数据分析器"""
    
    def __init__(self, host: str = 'localhost'):
        self.conn = taos.connect(host=host, user='root', password='taosdata')
        self.cursor = self.conn.cursor()
        self.cursor.execute("USE iot_data")
    
    def query_recent(self, device_id: str, hours: int = 24) -> pd.DataFrame:
        """查询最近N小时的数据"""
        sql = f"""
        SELECT ts, temperature, humidity, pressure, battery
        FROM {device_id}
        WHERE ts >= NOW - {hours}h
        ORDER BY ts DESC
        """
        self.cursor.execute(sql)
        data = self.cursor.fetchall()
        columns = ['timestamp', 'temperature', 'humidity', 'pressure', 'battery']
        return pd.DataFrame(data, columns=columns)
    
    def aggregate_by_interval(self, device_id: str, interval: str = '1h') -> pd.DataFrame:
        """按时间间隔聚合"""
        sql = f"""
        SELECT _wstart as ts, 
               AVG(temperature) as avg_temp,
               MAX(temperature) as max_temp,
               MIN(temperature) as min_temp,
               AVG(humidity) as avg_humidity,
               COUNT(*) as data_count
        FROM {device_id}
        WHERE ts >= NOW - 7d
        INTERVAL({interval})
        """
        self.cursor.execute(sql)
        data = self.cursor.fetchall()
        columns = ['timestamp', 'avg_temp', 'max_temp', 'min_temp', 'avg_humidity', 'count']
        return pd.DataFrame(data, columns=columns)
    
    def detect_anomalies(self, device_id: str, threshold: float = 3.0) -> pd.DataFrame:
        """基于统计方法检测异常"""
        df = self.query_recent(device_id, hours=24)
        
        # Z-score异常检测
        mean = df['temperature'].mean()
        std = df['temperature'].std()
        df['z_score'] = (df['temperature'] - mean) / std
        df['is_anomaly'] = abs(df['z_score']) > threshold
        
        anomalies = df[df['is_anomaly']]
        return anomalies
    
    def forecast(self, device_id: str, steps: int = 24) -> list:
        """简单时序预测(移动平均)"""
        df = self.aggregate_by_interval(device_id, '1h')
        
        # 24小时移动平均
        df['ma_24'] = df['avg_temp'].rolling(window=24).mean()
        
        # 线性外推
        recent = df['ma_24'].dropna().tail(24)
        trend = (recent.iloc[-1] - recent.iloc[0]) / len(recent)
        
        forecasts = []
        last_value = recent.iloc[-1]
        for i in range(steps):
            forecast_value = last_value + trend * (i + 1)
            forecasts.append(forecast_value)
        
        return forecasts
    
    def close(self):
        self.cursor.close()
        self.conn.close()


# 使用示例
analyzer = IoTDataAnalyzer()

# 查询最近24小时
df = analyzer.query_recent('sensor_001', hours=24)
print(f"最近24小时数据量: {len(df)} 条")

# 异常检测
anomalies = analyzer.detect_anomalies('sensor_001')
print(f"发现异常: {len(anomalies)} 条")

# 预测未来24小时温度
forecasts = analyzer.forecast('sensor_001', steps=24)
print(f"未来24小时预测温度: {forecasts[:5]}...")

analyzer.close()

与AI模型集成

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

class TimeSeriesPredictor(nn.Module):
    """LSTM时序预测模型"""
    
    def __init__(self, input_size: int = 4, hidden_size: int = 64, num_layers: int = 2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)
    
    def forward(self, x):
        # x: (batch, seq_len, features)
        lstm_out, _ = self.lstm(x)
        # 取最后一个时间步
        last_output = lstm_out[:, -1, :]
        prediction = self.fc(last_output)
        return prediction


def train_model_from_tdengine(host: str, device_id: str, seq_len: int = 24):
    """从TDengine读取数据训练模型"""
    analyzer = IoTDataAnalyzer(host)
    
    # 获取历史数据
    df = analyzer.query_recent(device_id, hours=24*30)  # 30天数据
    
    # 准备训练数据
    features = ['temperature', 'humidity', 'pressure', 'battery']
    data = df[features].values
    
    # 创建序列数据
    X, y = [], []
    for i in range(len(data) - seq_len):
        X.append(data[i:i+seq_len])
        y.append(data[i+seq_len, 0])  # 预测温度
    
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
    
    dataset = TensorDataset(X, y)
    loader = DataLoader(dataset, batch_size=32, shuffle=True)
    
    # 训练模型
    model = TimeSeriesPredictor(input_size=len(features))
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    
    for epoch in range(50):
        total_loss = 0
        for batch_X, batch_y in loader:
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: loss={total_loss/len(loader):.4f}")
    
    return model

存储优化策略

策略 效果 实现方式
数据降采样 存储减少90% 高频原始数据保留7天,聚合数据保留1年
压缩算法 压缩比20:1 Gorilla编码、Delta编码
分区策略 查询加速 按时间分区,自动过期删除
冷热分离 成本降低70% 热数据SSD,冷数据HDD/S3

下期预告

下一篇将探讨 Matter协议:智能家居的统一语言,敬请期待!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐