引言

在人工智能项目的全生命周期中,模型训练是核心环节之一。而训练数据的质量、获取效率和管道设计的合理性,直接决定了模型的最终表现。对于时序数据驱动的 AI 应用,如工业预测性维护、金融量化交易、能源负荷预测等,训练数据的管理面临着独特的挑战:数据量巨大、时间维度复杂、需要多表关联。

传统的训练数据准备流程通常涉及多个系统:时序数据存储在专用数据库中,标签数据存储在关系型数据库中,模型训练在 GPU 集群上进行。数据需要在多个系统之间反复搬运,不仅效率低下,还容易引入数据不一致的问题。

TDengine 作为一款高性能的时序 database,不仅提供了卓越的数据存储和查询能力,更通过其丰富的数据访问接口和生态集成能力,为 AI 模型训练提供了一站式的数据管道解决方案。本文将深入探讨如何基于 TDengine 设计和优化 AI 模型训练数据管道。

一、AI 模型训练的数据需求分析

1.1 数据规模与访问模式

时序数据驱动的 AI 模型通常需要海量的历史数据进行训练。以工业预测性维护为例,一个包含 1000 台设备的工厂,每台设备有 10 个传感器,采样频率为 1Hz,一年的数据量就达到:

1000 设备 × 10 传感器 × 1 样本/秒 × 86400 秒/天 × 365 天 ≈ 3150 亿条记录

在训练过程中,模型通常不会一次性加载全部数据,而是采用批量(Batch)读取的方式。每个批次包含若干时间窗口的数据,用于计算损失函数和更新模型参数。这就要求数据管道能够支持高效的随机读取和时间范围查询。

1.2 数据对齐与关联

在实际的 AI 应用中,训练数据往往来自多个数据源。例如,在设备故障预测任务中,需要同时获取:

  • 传感器数据:振动、温度、压力等时序数据,存储在 TDengine 中
  • 设备标签:设备型号、安装日期、维护记录等静态信息,存储在关系型数据库中
  • 故障标签:设备故障的时间、类型、严重程度等,通常由人工标注或从工单系统导入

在构建训练样本时,需要将这些数据进行时间对齐和关联。例如,对于某个时间窗口的传感器数据,需要关联上该设备在该时间段内的运行状态和是否发生故障的标签。

1.3 数据版本与可重复性

AI 模型的训练需要保证可重复性,即使用相同的数据和相同的超参数,应该得到相同的结果。这就要求训练数据能够被版本化管理,确保每次训练使用的数据集是一致的。

此外,在模型迭代过程中,可能需要对比不同数据版本对模型性能的影响。例如,增加某个传感器的数据后,模型准确率是否有提升?使用不同时间范围的数据,模型的泛化能力如何?

二、TDengine 的数据访问接口

2.1 Python 连接器:数据科学家的首选

TDengine 提供了功能完善的 Python 连接器(taos),支持通过 pandas DataFrame 直接读取数据。这是数据科学家最常用的数据访问方式。

import taos

import pandas as pd

# 建立连接

conn = taos.connect(

    host="localhost",

    user="root",

    password="taosdata",

    database="industrial_db"

)

# 读取训练数据:获取过去 30 天的传感器数据

df = pd.read_sql("""

    SELECT ts, device_id, vibration_x, vibration_y, temperature, pressure

    FROM sensor_data

    WHERE ts >= NOW() - 30d

    ORDER BY ts

""", conn)

print(f"Loaded {len(df)} records for training")

对于大规模数据,可以采用分页查询的方式,避免一次性加载过多数据导致内存溢出:

# 分页读取数据

batch_size = 100000

offset = 0

all_data = []

while True:

    batch = pd.read_sql(f"""

        SELECT ts, device_id, vibration_x, temperature

        FROM sensor_data

        ORDER BY ts

        LIMIT {batch_size} OFFSET {offset}

    """, conn)

    

    if len(batch) == 0:

        break

    

    all_data.append(batch)

    offset += batch_size

    print(f"Loaded batch {offset // batch_size}, total records: {offset}")

df = pd.concat(all_data, ignore_index=True)

2.2 REST API:跨语言的数据访问

对于使用 Java、Go、Rust 等语言开发的 AI 应用,TDengine 提供了 REST API,支持通过 HTTP 协议进行数据访问。

# 通过 REST API 查询数据

curl -u root:taosdata \

  -d "SELECT ts, device_id, temperature FROM sensor_data LIMIT 10" \

  http://localhost:6041/rest/sql/industrial_db

REST API 的优势在于语言无关性,任何支持 HTTP 的编程语言都可以访问 TDengine。此外,REST API 还支持批量写入,方便将模型的预测结果写回数据库。

2.3 数据订阅:实时数据流

对于在线学习(Online Learning)和增量训练场景,TDengine 的数据订阅(Topic)功能提供了实时数据流的能力。

import taos

conn = taos.connect(host="localhost", user="root", password="taosdata", database="industrial_db")

# 创建数据订阅

sub = conn.subscribe("sensor_data_topic", "model_trainer", restart=True)

# 实时消费数据流

while True:

    rows = sub.consume()

    for row in rows:

        ts, device_id, vibration, temperature = row

        # 将数据送入模型进行在线学习

        model.partial_fit([[vibration, temperature]], [label])

数据订阅机制非常适合构建持续学习的 AI 系统,模型可以随着新数据的到来不断更新,保持对最新数据分布的适应性。

三、训练数据管道的设计模式

3.1 批量训练模式

批量训练是最常见的模型训练模式。在这种模式下,训练数据被划分为多个批次,每个批次包含若干样本,模型在每个批次上计算梯度并更新参数。

基于 TDengine 的批量训练数据管道可以设计如下:

import taos

import pandas as pd

import numpy as np

from torch.utils.data import Dataset, DataLoader

class TDengineDataset(Dataset):

    """基于 TDengine 的 PyTorch Dataset"""

    

    def __init__(self, conn, window_size=100, stride=50):

        self.conn = conn

        self.window_size = window_size

        self.stride = stride

        

        # 获取所有设备 ID

        self.device_ids = pd.read_sql(

            "SELECT DISTINCT device_id FROM sensor_data",

            conn

        )['device_id'].tolist()

        

        # 预计算样本数量

        self.samples = []

        for device_id in self.device_ids:

            count = pd.read_sql(f"""

                SELECT COUNT(*) as cnt FROM sensor_data

                WHERE device_id = '{device_id}'

            """, conn).iloc[0]['cnt']

            

            num_samples = (count - window_size) // stride + 1

            self.samples.extend([(device_id, i) for i in range(num_samples)])

    

    def __len__(self):

        return len(self.samples)

    

    def __getitem__(self, idx):

        device_id, sample_idx = self.samples[idx]

        offset = sample_idx * self.stride

        

        # 从 TDengine 读取时间窗口数据

        df = pd.read_sql(f"""

            SELECT vibration_x, vibration_y, temperature, pressure

            FROM sensor_data

            WHERE device_id = '{device_id}'

            ORDER BY ts

            LIMIT {self.window_size} OFFSET {offset}

        """, self.conn)

        

        # 转换为张量

        features = df.values.astype(np.float32)

        

        # 获取标签(假设标签存储在另一个表中)

        label = self.get_label(device_id, offset)

        

        return features, label

    

    def get_label(self, device_id, offset):

        # 从标签表中获取对应的标签

        result = pd.read_sql(f"""

            SELECT is_fault FROM fault_labels

            WHERE device_id = '{device_id}'

            AND sample_offset = {offset}

        """, self.conn)

        

        return result.iloc[0]['is_fault'] if len(result) > 0 else 0

# 创建 DataLoader

dataset = TDengineDataset(conn)

dataloader = DataLoader(dataset, batch_size=32, shuffle=True, num_workers=4)

# 训练循环

for epoch in range(num_epochs):

    for batch_features, batch_labels in dataloader:

        # 前向传播、计算损失、反向传播

        optimizer.zero_grad()

        outputs = model(batch_features)

        loss = criterion(outputs, batch_labels)

        loss.backward()

        optimizer.step()

这种设计模式的优势在于:

  • 按需加载:数据在需要时才从 TDengine 读取,避免了一次性加载全部数据导致的内存问题
  • 随机访问:通过 LIMIT 和 OFFSET 实现随机采样,支持 DataLoader 的 shuffle 操作
  • 多进程支持:num_workers 参数允许多个子进程并行读取数据,加速数据加载

3.2 时间序列分割策略

在时序数据的模型训练中,数据的分割策略至关重要。由于时序数据具有时间相关性,随机分割会导致数据泄露(Data Leakage),即训练集包含了未来信息。正确的方法是按时间顺序分割:

# 按时间顺序分割数据集

def split_time_series_data(conn, train_ratio=0.7, val_ratio=0.15):

    # 获取数据的时间范围

    time_range = pd.read_sql("""

        SELECT MIN(ts) as min_ts, MAX(ts) as max_ts FROM sensor_data

    """, conn).iloc[0]

    

    min_ts, max_ts = time_range['min_ts'], time_range['max_ts']

    total_duration = max_ts - min_ts

    

    # 计算分割点

    train_end = min_ts + total_duration * train_ratio

    val_end = min_ts + total_duration * (train_ratio + val_ratio)

    

    # 创建训练集、验证集、测试集

    train_data = pd.read_sql(f"""

        SELECT * FROM sensor_data WHERE ts < '{train_end}'

    """, conn)

    

    val_data = pd.read_sql(f"""

        SELECT * FROM sensor_data WHERE ts >= '{train_end}' AND ts < '{val_end}'

    """, conn)

    

    test_data = pd.read_sql(f"""

        SELECT * FROM sensor_data WHERE ts >= '{val_end}'

    """, conn)

    

    return train_data, val_data, test_data

3.3 增量训练模式

对于数据持续产生的场景,增量训练(Incremental Learning)是一种高效的模型更新策略。TDengine 的数据订阅功能为增量训练提供了天然的支持。

import taos

from sklearn.linear_model import SGDClassifier

# 初始化模型

model = SGDClassifier()

# 创建数据订阅

conn = taos.connect(host="localhost", user="root", password="taosdata", database="industrial_db")

sub = conn.subscribe("new_data_topic", "incremental_trainer", restart=True)

# 增量训练循环

while True:

    rows = sub.consume()

    

    if len(rows) > 0:

        # 将新数据转换为特征和标签

        X = []

        y = []

        for row in rows:

            ts, device_id, vibration, temperature, label = row

            X.append([vibration, temperature])

            y.append(label)

        

        # 部分拟合(增量更新)

        model.partial_fit(X, y, classes=[0, 1])

        

        print(f"Model updated with {len(rows)} new samples")

四、数据预处理与特征工程

4.1 数据清洗

原始时序数据往往包含噪声和异常值,需要进行清洗。TDengine 的 SQL 接口支持在查询时进行数据清洗:

-- 去除异常值:使用 3-sigma 原则

SELECT ts, device_id, temperature

FROM sensor_data

WHERE ABS(temperature - AVG(temperature) OVER (PARTITION BY device_id))

      < 3 * STDDEV(temperature) OVER (PARTITION BY device_id);

-- 缺失值填充:使用线性插值

SELECT ts, device_id,

       INTERP(temperature) AS temperature_filled

FROM sensor_data

WHERE ts >= NOW() - 7d;

4.2 数据归一化

在将数据送入神经网络之前,通常需要进行归一化处理。可以在 Python 中实现,也可以利用 TDengine 的 UDF 功能:

# 在 Python 中进行归一化

from sklearn.preprocessing import StandardScaler

# 从 TDengine 读取数据

df = pd.read_sql("SELECT temperature, pressure FROM sensor_data", conn)

# 计算统计量并归一化

scaler = StandardScaler()

df_normalized = scaler.fit_transform(df)

# 保存 scaler 参数,用于推理时的归一化

import joblib

joblib.dump(scaler, 'scaler.pkl')

4.3 特征工程

如前文所述,TDengine 的流计算引擎可以实时生成各种统计特征、频域特征和时域特征。这些特征可以直接用于模型训练,无需额外的计算框架。

五、性能优化策略

5.1 查询优化

在训练数据管道中,数据查询是主要的性能瓶颈。以下是一些优化策略:

使用索引:TDengine 对标签列自动建立索引,确保按标签过滤的查询高效执行。

限制返回列:只查询模型需要的列,避免读取无关数据。

-- 优化:只查询需要的列

SELECT vibration_x, temperature FROM sensor_data WHERE device_id = 'D001';

-- 避免:查询所有列

SELECT * FROM sensor_data WHERE device_id = 'D001';

批量读取:使用 LIMIT 和 OFFSET 进行分页查询,避免一次性读取大量数据。

5.2 缓存策略

对于频繁访问的数据,可以在应用层引入缓存机制:

from functools import lru_cache

@lru_cache(maxsize=1000)

def get_device_data(device_id, start_time, end_time):

    """缓存设备数据查询结果"""

    return pd.read_sql(f"""

        SELECT * FROM sensor_data

        WHERE device_id = '{device_id}'

        AND ts >= '{start_time}' AND ts < '{end_time}'

    """, conn)

5.3 并行读取

利用多线程或多进程并行读取数据,可以显著提升数据加载速度:

from concurrent.futures import ThreadPoolExecutor

def load_device_data(device_id):

    return pd.read_sql(f"SELECT * FROM sensor_data WHERE device_id = '{device_id}'", conn)

# 并行加载多个设备的数据

device_ids = ['D001', 'D002', 'D003', 'D004', 'D005']

with ThreadPoolExecutor(max_workers=5) as executor:

    results = list(executor.map(load_device_data, device_ids))

# 合并数据

all_data = pd.concat(results, ignore_index=True)

六、与 MLOps 平台的集成

6.1 数据版本管理

在 MLOps 实践中,需要追踪用于训练每个模型版本的数据集。可以通过在 TDengine 中记录数据版本信息来实现:

-- 创建数据版本表

CREATE TABLE data_versions (

    version_id BINARY(32),

    start_time TIMESTAMP,

    end_time TIMESTAMP,

    description BINARY(256),

    created_at TIMESTAMP

);

-- 记录训练数据版本

INSERT INTO data_versions VALUES (

    'v1.0.0', '2023-01-01 00:00:00', '2023-06-30 23:59:59',

    'Initial training data', NOW()

);

6.2 实验追踪

与 MLflow 等实验追踪平台集成,记录每次实验使用的数据配置:

import mlflow

with mlflow.start_run():

    # 记录数据参数

    mlflow.log_param("data_version", "v1.0.0")

    mlflow.log_param("time_range", "2023-01-01 to 2023-06-30")

    mlflow.log_param("num_devices", 1000)

    

    # 训练模型

    model.fit(train_data, train_labels)

    

    # 记录模型指标

    mlflow.log_metric("accuracy", accuracy)

    mlflow.log_metric("f1_score", f1)

结语

AI 模型训练数据管道的设计,是连接数据基础设施与 AI 应用的关键桥梁。TDengine 作为一款专为时序数据设计的 database,通过其高性能的查询引擎、丰富的数据访问接口和原生的数据处理能力,为 AI 模型训练提供了一站式的数据管道解决方案。从批量训练到增量学习,从数据清洗到特征工程,从单机训练到分布式 MLOps,TDengine 都能够提供有力的支持。随着 AI 技术的不断发展,时序数据库在 AI 数据 pipeline 中的角色将愈发重要,TDengine 有望在这一领域发挥更大的价值。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐