摘要:随着国产化替代和数字化转型的推进,越来越多的工业企业考虑将历史数据从 OSIsoft PI 迁移到 TDengine IDMP。本文提供从数据模型映射、历史数据迁移到应用层适配的完整实践指南,帮助企业平稳完成 historian 平台的升级。

一、迁移背景与动机

某大型炼化企业使用 PI 已有 15 年,积累了超过 500 万测点、10 年的历史数据。随着业务扩展,企业面临以下挑战:

  • 性能瓶颈:全厂级查询响应时间超过 30 秒,影响实时监控
  • 成本压力:PI 按测点收费,年 License 费用超过 500 万元
  • 国产化要求:信创战略要求核心系统实现自主可控
  • 云化需求:集团要求数据上云,PI 的封闭架构难以适配

经过评估,企业决定将 historian 平台从 PI 迁移到 TDengine IDMP。

二、数据模型映射

2.1 PI 的标签模型分析

PI 的数据模型以标签(Tag)为核心,每个标签包含:

  • Tag Name:唯一标识符,如 UNIT1.CDU.TI-101
  • Point Attributes:描述信息、工程单位、量程等
  • Time-Series Data:(timestamp, value, quality) 三元组

2.2 TDengine IDMP 的层级模型设计

将 PI 的扁平标签模型映射到 TDengine IDMP 的层级模型:

-- 创建 PI 标签对应的超级表

CREATE STABLE pi_tag_data (

    ts TIMESTAMP,

    value DOUBLE,

    quality INT

) TAGS (

    tag_name BINARY(64),

    unit BINARY(32),

    area BINARY(32),

    equipment BINARY(32),

    descriptor BINARY(128),

    eng_units BINARY(16)

);

-- 标签命名空间映射示例

-- PI: UNIT1.CDU.FURNACE.TI-101

-- TDengine: tag_name='TI-101', unit='UNIT1', area='CDU', equipment='FURNACE'

2.3 批量创建子表

# Python 迁移脚本:从 PI AF 读取标签结构,批量创建 TDengine 子表

import taos

from PIconnect import PI AF

# 连接 PI AF

af_server = PI AF.PISystem('PI-AF-SERVER')

af_database = af_server.Databases['Plant Database']

# 连接 TDengine

td_conn = taos.connect(host="tdengine-idmp", database="plant_data")

cursor = td_conn.cursor()

# 遍历 PI Element 层级,创建对应子表

for element in af_database.Elements:

    unit = element.Name

    for child in element.Elements:

        area = child.Name

        for equip in child.Elements:

            equipment = equip.Name

            for attr in equip.Attributes:

                tag_name = attr.Name

                # 创建子表

                cursor.execute(f"""

                    CREATE TABLE IF NOT EXISTS {tag_name}

                    USING pi_tag_data

                    TAGS ('{tag_name}', '{unit}', '{area}', '{equipment}',

                          '{attr.Description}', '{attr.DefaultUOM}')

                """)

三、历史数据迁移

3.1 PI 数据导出

PI 提供多种数据导出方式:

// 使用 PI SDK 批量导出历史数据

PIServer server = new PIServer("PI-SERVER");

PIPoint point = server.PIPoints["TI-101"];

// 按时间范围导出

AFTimeRange timeRange = new AFTimeRange("*-10y", "*");

PIValues values = point.RecordedValues(timeRange, AFBoundaryType.Inside, null, false, 1000000);

// 导出为 CSV

foreach (AFValue value in values)

{

    Console.WriteLine($"{value.Timestamp},{value.Value},{value.IsGood}");

}

3.2 TDengine IDMP 数据导入

# Python 批量导入脚本

import taos

import csv

from datetime import datetime

def import_pi_data(csv_file, batch_size=10000):

    conn = taos.connect(host="tdengine-idmp", database="plant_data")

    cursor = conn.cursor()

    

    with open(csv_file, 'r') as f:

        reader = csv.reader(f)

        batch = []

        

        for row in reader:

            timestamp = datetime.strptime(row[0], "%Y-%m-%d %H:%M:%S")

            value = float(row[1])

            quality = 0 if row[2] == 'True' else 1

            

            batch.append(f"('{timestamp}', {value}, {quality})")

            

            if len(batch) >= batch_size:

                sql = f"INSERT INTO TI_101 VALUES {','.join(batch)}"

                cursor.execute(sql)

                batch = []

        

        # 导入剩余数据

        if batch:

            sql = f"INSERT INTO TI_101 VALUES {','.join(batch)}"

            cursor.execute(sql)

# 导入 10 年历史数据

import_pi_data("TI-101_history.csv")

3.3 迁移性能优化

优化策略

效果

批量导入(1万条/批次)

导入速度提升 10 倍

多线程并行导入

导入速度提升 5 倍

预创建子表

避免写入时建表开销

调整 TDengine 缓存参数

内存写入比例提升

四、应用层适配

4.1 PI SDK 代码迁移

PI SDK 原始代码:

// PI SDK 读取最新值

PIServer server = new PIServer("PI-SERVER");

PIPoint point = server.PIPoints["TI-101"];

AFValue currentValue = point.CurrentValue();

Console.WriteLine($"Value: {currentValue.Value}, Time: {currentValue.Timestamp}");

TDengine IDMP 等效代码:

# TDengine Python 连接器读取最新值

import taos

conn = taos.connect(host="tdengine-idmp", database="plant_data")

cursor = conn.cursor()

cursor.execute("SELECT LAST(*) FROM TI_101")

result = cursor.fetchone()

print(f"Value: {result[1]}, Time: {result[0]}")

4.2 PI AF 结构迁移

PI AF 的层级结构可以映射到 TDengine IDMP 的标签体系:

PI AF 概念

TDengine IDMP 映射

AF Database

Database

AF Element

超级表 TAGS 层级

AF Attribute

子表(测点)

AF Template

超级表定义

AF Analysis

流计算 / 订阅

4.3 可视化工具替换

PI 工具

TDengine IDMP 替代方案

PI ProcessBook

Grafana + TDengine 数据源

PI Vision

自定义 Web 应用 / 帆软 BI

PI DataLink

Excel + ODBC 连接器

PI WebParts

嵌入式 Grafana 面板

五、数据一致性验证

5.1 数据完整性校验

-- 校验 PI 与 TDengine 的数据条数是否一致

-- PI 侧(通过 PI OLEDB)

SELECT COUNT(*) FROM piarchive..picomp2 WHERE tag = 'TI-101';

-- TDengine 侧

SELECT COUNT(*) FROM TI_101;

5.2 数据准确性校验

# Python 数据对比脚本

import taos

from PIconnect import PIServer

# 读取 PI 数据

pi_server = PIServer('PI-SERVER')

pi_point = pi_server.search('TI-101')[0]

pi_values = pi_point.interpolated_values('2024-01-01', '2024-01-02', '1h')

# 读取 TDengine 数据

conn = taos.connect(host="tdengine-idmp", database="plant_data")

cursor = conn.cursor()

cursor.execute("SELECT ts, value FROM TI_101 WHERE ts >= '2024-01-01' AND ts < '2024-01-02'")

td_values = cursor.fetchall()

# 对比数据

for pi_val, td_val in zip(pi_values, td_values):

    assert abs(pi_val.value - td_val[1]) < 0.001, f"数据不一致: PI={pi_val.value}, TD={td_val[1]}"

print("数据一致性校验通过")

六、迁移风险与应对

风险点

影响

应对措施

历史数据丢失

双写验证 + 增量校验

应用兼容性问题

灰度发布 + 回滚方案

性能不达预期

预生产环境压测

用户培训成本

分批培训 + 操作手册

七、迁移后的收益

该企业完成迁移后,获得了以下收益:

  • 性能提升:全厂级查询从 30 秒降低到 280 毫秒
  • 成本降低:年 License 费用从 500 万元降低到 80 万元
  • 扩展性增强:支持千万级测点,可水平扩展
  • 云化就绪:原生支持 Kubernetes 部署

八、总结

从 PI 迁移到 TDengine IDMP 是一项系统工程,涉及数据模型映射、历史数据迁移、应用层适配等多个环节。通过合理的迁移规划和充分的测试验证,企业可以平稳完成 historian 平台的升级。

对于正在考虑 historian 国产化替代的企业,TDengine IDMP 提供了经过验证的迁移路径和技术支撑。在数字化转型的道路上,选择一款与业务发展匹配的时序 database,是企业构建数据竞争力的重要一步。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐