实战:基于 TDengine 构建 AI 特征工程的实时流水线
引言
在机器学习项目中,特征工程通常占据 80% 的工作量,其质量直接决定了模型的上限。对于时序数据驱动的 AI 应用,特征工程面临着独特的挑战:数据量大、实时性要求高、特征维度复杂。传统的特征工程方案依赖批处理作业,将数据从数据库导出到 Spark 或 Flink 进行计算,再将结果写回存储,整个流程冗长且低效。
TDengine 作为专为时序数据设计的 database,其内置的流计算引擎为特征工程提供了一种全新的"就地计算"范式。数据无需离开数据库,即可实时生成统计特征、频域特征、时域特征,直接供给下游的 AI 模型使用。本文将通过实际案例,展示如何基于 TDengine 构建高效的 AI 特征工程流水线。
一、时序特征工程的核心需求
1.1 统计特征:描述数据的宏观规律
统计特征是最基础也是最常用的时序特征,包括均值、方差、最大值、最小值、分位数等。这些特征能够描述数据在特定时间窗口内的分布规律,是异常检测、状态识别等任务的常用输入。
例如,在设备监控场景中,温度的均值反映了设备的平均热负荷,方差反映了温度波动的剧烈程度,而最大值则可能预示着散热系统的问题。
1.2 频域特征:揭示数据的周期性规律
许多时序数据具有周期性规律,如设备的振动信号、电网的负荷曲线等。通过快速傅里叶变换(FFT)或小波变换,可以将时域信号转换到频域,提取主频率、频谱能量、谐波分量等特征。
频域特征在旋转机械故障诊断、电力系统谐波分析等场景中尤为重要。例如,轴承外圈故障通常会在特定的特征频率处产生明显的频谱峰值。
1.3 时域特征:捕捉数据的动态变化
时域特征关注数据随时间的变化规律,包括趋势、斜率、自相关性、互相关性等。这些特征能够反映系统的动态特性,对于预测性维护、趋势预测等任务至关重要。
例如,设备温度的上升斜率可以反映散热系统的恶化程度,振动信号的自相关函数可以揭示信号的周期性成分。
二、TDengine 流计算:特征工程的利器
2.1 时间窗口聚合:统计特征的实时生成
TDengine 的流计算引擎支持多种时间窗口类型,包括翻转窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。通过这些窗口,可以实时生成各种统计特征。
-- 创建统计特征流,每 5 分钟计算一次
CREATE STREAM device_stats
INTO TABLE feature_stats
AS
SELECT
_irowts AS ts,
device_id,
AVG(temperature) AS temp_mean,
STDDEV(temperature) AS temp_std,
MAX(temperature) AS temp_max,
MIN(temperature) AS temp_min,
SPREAD(temperature) AS temp_spread,
PERCENTILE(temperature, 95) AS temp_p95
FROM sensor_data
PARTITION BY device_id
INTERVAL(5m)
FILL(PREV);
这条 SQL 创建了一个流计算任务,每 5 分钟为每台设备计算温度的均值、标准差、最大值、最小值、极差和 95 分位数。FILL(PREV) 语句确保在数据缺失时,使用前一个窗口的值进行填充,保证特征的连续性。
2.2 状态窗口:捕捉设备状态变化
在工业场景中,设备的状态变化(如开机、停机、故障)往往伴随着数据特征的显著变化。TDengine 的状态窗口(STATE_WINDOW)可以根据数据状态的变化自动划分窗口,非常适合提取状态相关的特征。
-- 按设备状态划分窗口,计算每个状态下的统计特征
CREATE STREAM state_features
INTO TABLE feature_by_state
AS
SELECT
_irowts AS ts,
device_id,
status,
AVG(temperature) AS temp_mean,
AVG(pressure) AS pressure_mean,
COUNT(*) AS sample_count
FROM sensor_data
PARTITION BY device_id
STATE_WINDOW(status);
这条 SQL 根据 status 字段的值自动划分窗口,当状态变化时,自动关闭当前窗口并开启新窗口。这样可以提取设备在不同运行状态下的特征,用于状态识别和故障诊断。
2.3 数据订阅:特征数据的实时消费
TDengine 的数据订阅(Topic)功能,允许 AI 应用以消息队列的方式实时消费特征数据。这种机制非常适合构建事件驱动的 AI 架构。
import taos
# 创建订阅
conn = taos.connect(host="localhost", user="root", password="taosdata", database="ai_db")
sub = conn.subscribe("feature_stats_topic", "feature_consumer", restart=True)
# 实时消费特征数据
while True:
rows = sub.consume()
for row in rows:
ts, device_id, temp_mean, temp_std = row
# 将特征数据送入 AI 模型进行推理
prediction = model.predict([[temp_mean, temp_std]])
# 根据预测结果触发告警或控制逻辑
if prediction[0] == 1:
send_alert(device_id, "异常状态 detected")
这种实时消费机制,使得 AI 模型可以在特征生成的第一时间获得输入,实现真正的实时推理。
三、高级特征:从时域到频域
3.1 基于 UDF 的频域特征提取
TDengine 支持用户自定义函数(UDF),可以通过 C、Python 等语言编写自定义的计算逻辑。利用 UDF,可以在数据库层面实现频域特征的提取。
# 注册 Python UDF 进行 FFT 计算
import numpy as np
def fft_features(data):
"""计算振动信号的频域特征"""
fft_result = np.fft.fft(data)
magnitude = np.abs(fft_result)
# 提取主频率
main_freq = np.argmax(magnitude[:len(magnitude)//2])
# 提取频谱能量
spectral_energy = np.sum(magnitude ** 2)
# 提取频谱熵
prob = magnitude / np.sum(magnitude)
spectral_entropy = -np.sum(prob * np.log2(prob + 1e-10))
return main_freq, spectral_energy, spectral_entropy
通过将 FFT 计算封装为 UDF,可以在 TDengine 的查询中直接调用,实现频域特征的实时提取。
3.2 时域特征的 SQL 实现
许多时域特征可以通过 SQL 的窗口函数和内置函数实现。例如,计算数据的一阶差分(趋势)和二阶差分(加速度):
-- 计算温度的一阶差分(趋势)
SELECT
ts,
device_id,
temperature,
temperature - LAG(temperature) OVER (PARTITION BY device_id ORDER BY ts) AS temp_diff,
(temperature - LAG(temperature, 2) OVER (PARTITION BY device_id ORDER BY ts)) / 2 AS temp_trend
FROM sensor_data;
通过 LAG 窗口函数,可以方便地计算数据的变化率和趋势,这些特征对于预测性维护等任务非常有价值。
四、特征存储与版本管理
4.1 特征表的设计
在 TDengine 中,可以为不同类型的特征创建专门的超级表,便于管理和查询。
-- 统计特征表
CREATE STABLE feature_stats (
ts TIMESTAMP,
temp_mean FLOAT,
temp_std FLOAT,
temp_max FLOAT,
temp_min FLOAT,
pressure_mean FLOAT,
pressure_std FLOAT
) TAGS (
device_id BINARY(32),
feature_version BINARY(16)
);
-- 频域特征表
CREATE STABLE feature_freq (
ts TIMESTAMP,
main_freq FLOAT,
spectral_energy FLOAT,
spectral_entropy FLOAT
) TAGS (
device_id BINARY(32),
feature_version BINARY(16)
);
通过 feature_version 标签,可以管理不同版本的特征,便于模型回溯和 A/B 测试。
4.2 特征与模型的关联
在 MLOps 实践中,需要建立特征与模型之间的关联。可以通过在特征表中记录模型版本信息,或者在模型元数据中记录使用的特征版本,实现可追溯性。
-- 记录模型使用的特征版本
CREATE TABLE model_feature_mapping (
model_version BINARY(32),
feature_version BINARY(16),
feature_table BINARY(32),
created_at TIMESTAMP
);
五、性能优化:让特征流水线更高效
5.1 预计算与物化视图
对于频繁使用的特征,可以通过 TDengine 的流计算进行预计算,将结果存储在物化视图中,避免重复计算。
-- 创建预计算的特征流
CREATE STREAM precomputed_features
INTO TABLE feature_materialized
AS
SELECT
_irowts AS ts,
device_id,
AVG(temperature) AS temp_mean,
STDDEV(temperature) AS temp_std,
MAX(temperature) - MIN(temperature) AS temp_range
FROM sensor_data
PARTITION BY device_id
INTERVAL(1m);
5.2 分层存储:热特征与冷特征
不同时间段的特征数据,访问频率差异很大。最近生成的特征(热数据)需要频繁访问,而历史特征(冷数据)主要用于模型训练。TDengine 支持自动分层存储,可以将热数据保留在 SSD 上,冷数据自动迁移到 HDD 或对象存储。
-- 创建数据库时配置分层存储
CREATE DATABASE ai_features
DURATION 7d
KEEP 365d
COMP 2;
六、案例:风电设备故障预测的特征工程
6.1 场景描述
某风电集团管理着 2000 多台风机,每台风机配备数百个传感器,实时采集振动、温度、转速、风速等数据。目标是构建一个故障预测模型,提前发现轴承、齿轮箱等关键部件的潜在故障。
6.2 特征工程方案
基于 TDengine,构建了以下特征工程流水线:
第一层:原始数据接入
所有传感器数据实时写入 TDengine,按设备类型创建超级表。
第二层:统计特征生成
通过流计算,每 10 分钟计算一次振动信号的统计特征:
CREATE STREAM vibration_stats
INTO TABLE feature_vibration_stats
AS
SELECT
_irowts AS ts,
device_id,
AVG(vibration_x) AS vx_mean,
STDDEV(vibration_x) AS vx_std,
MAX(vibration_x) AS vx_max,
ROOT_MEAN_SQUARE(vibration_x) AS vx_rms,
KURTOSIS(vibration_x) AS vx_kurtosis,
SKEWNESS(vibration_x) AS vx_skewness
FROM turbine_vibration
PARTITION BY device_id
INTERVAL(10m);
第三层:频域特征生成
通过 UDF,每小时对振动信号进行一次 FFT,提取频域特征:
SELECT
ts,
device_id,
fft_main_freq(vibration_x, 1024) AS main_freq,
fft_spectral_energy(vibration_x, 1024) AS spectral_energy
FROM turbine_vibration
WHERE ts >= NOW() - 1h;
第四层:特征融合与模型训练
将统计特征和频域特征按时间窗口对齐,构建训练数据集,送入 XGBoost 模型进行训练。
6.3 实施效果
通过 TDengine 的实时特征工程流水线,该风电集团实现了:
- 特征延迟降低:从小时级降至分钟级
- 计算资源节省:无需额外的 Spark/Flink 集群,特征计算在数据库层完成
- 模型精度提升:实时特征捕捉了更多动态信息,模型准确率提升 8%
- 运维成本降低:整体数据 pipeline 的运维成本降低 60%
结语
特征工程是 AI 应用的核心环节,而时序特征工程因其数据规模大、实时性要求高,对传统方案提出了严峻挑战。TDengine 通过内置的流计算引擎、UDF 扩展能力和高效的数据存储,为时序特征工程提供了一种全新的"就地计算"范式。数据无需离开数据库,即可实时生成各种统计特征、频域特征和时域特征,直接供给 AI 模型使用。这种架构不仅大幅降低了数据搬运的开销,更提升了特征工程的实时性和可靠性。随着 AI 与数据库技术的进一步融合,TDengine 有望成为 AI 特征工程领域的重要基础设施。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)