核心目标:基于 pyiec61850-ng Python 库开发 IEC 61850 Server/Client,实现 MMS 数据读写、报告订阅、控制操作以及 GOOSE 收发。

前置知识:Part 2 的信息模型(LD/LN/DO/DA/DataSet)和 Part 4 的 SCL 配置概念,Python 基础。


5.1 开源方案选型与 pyiec61850-ng 简介

5.1.1 方案对比

在 Part 5 中,我们原本规划了多个开源库的对比。但真正进入代码实战前,需要认清一个现实:IEC 61850 的完整实现非常复杂,Python 生态中能真正覆盖 MMS + GOOSE + SV 三驾马车的成熟方案并不多。

语言 MMS GOOSE SV 适合场景
libIEC61850 C 嵌入式、高性能、完整功能
pyiec61850-ng Python (C 绑定) 原型验证、测试自动化、快速开发
OpenIEC61850 Java PC 端跨平台
py61850 Python (纯) 部分 学术研究

本系列选择 pyiec61850-ng 作为实战主角,原因有三:

  1. Python 生态:无需 C 环境编译,pip install 即用,适合工程验证和测试自动化
  2. 完整的 MMS + GOOSE + SV 支持:基于 libIEC61850 的 SWIG 绑定,底层是经过生产验证的 C 库
  3. 精心封装的安全 API:提供上下文管理器、类型自动转换、异常体系等现代 Python 特性,避免了底层 C 库的空指针和内存泄漏问题

5.1.2 pyiec61850-ng 的架构

C 底层库

SWIG 绑定层

pyiec61850-ng 封装层

Python 应用层

你的 Python 代码
(Server / Client / GOOSE)

pyiec61850.mms
MMSClient, ControlClient
ReportClient

pyiec61850.goose
GoosePublisher
GooseSubscriber

pyiec61850.sv
SVSubscriber

pyiec61850.server
IedServer, ServerConfig

pyiec61850.pyiec61850
原始 C 函数绑定

libiec61850 (C)
by MZ Automation

5.1.3 安装与环境

# 安装最新发布版
pip install pyiec61850-ng

# 如果需要在 ARM Linux 上使用,建议交叉编译 libiec61850
# 然后在目标平台安装对应的 wheel

# 验证安装
python -c "from pyiec61850.mms import MMSClient; print('OK')"

注意事项

  • GOOSE 和 SV 功能需要 root/管理员权限,因为它们需要在链路层创建原始套接字
  • pyiec61850-ng 在 Windows 上的 GOOSE/SV 支持有限,建议在 Linux 环境下进行 GOOSE/SV 开发
  • MMS 功能(Server/Client)在所有平台上都可以正常运行

5.2 搭建一个 IEC 61850 Server

5.2.1 准备模型配置文件

pyiec61850-ng 的 Server 加载的是 SCL 格式的模型配置文件(ICD/CID/SCD)。下面是一个最小的 ICD 文件,定义了一个具有模拟量输出和开关量输出的简单 IED:

<?xml version="1.0" encoding="UTF-8"?>
<SCL xmlns="http://www.iec.ch/61850/2003/SCL"
     version="2007" revision="B">
  <Header id="SimpleServer_v1" version="1.0" revision="0"
          nameStructure="IEDName"/>

  <DataTypeTemplates>
    <LNodeType id="LLN0_Type" lnClass="LLN0" iedType="Simple">
      <DO name="Mod" type="INC_Type"/>
      <DO name="Health" type="ENS_Type"/>
      <DO name="NamPlt" type="LPL_Type"/>
    </LNodeType>

    <LNodeType id="MMXU_Type" lnClass="MMXU" iedType="Simple">
      <DO name="TotW" type="MV_Type"/>    <!-- 总有功功率 -->
      <DO name="TotVAr" type="MV_Type"/>  <!-- 总无功功率 -->
    </LNodeType>

    <LNodeType id="GGIO_Type" lnClass="GGIO" iedType="Simple">
      <DO name="Ind1" type="SPS_Type"/>   <!-- 状态输入 1 -->
      <DO name="SPCSO1" type="SPC_Type"/> <!-- 控制输出 1 -->
    </LNodeType>

    <DOType id="INC_Type" cdc="INC">
      <DA name="stVal" bType="INT8" fc="ST"/>
      <DA name="q" bType="Quality" fc="ST"/>
      <DA name="t" bType="Timestamp" fc="ST"/>
      <DA name="ctlVal" bType="INT8" fc="CO"/>
    </DOType>

    <DOType id="ENS_Type" cdc="ENS">
      <DA name="stVal" bType="Enum" type="HealthEnum" fc="ST"/>
      <DA name="q" bType="Quality" fc="ST"/>
      <DA name="t" bType="Timestamp" fc="ST"/>
    </DOType>

    <DOType id="SPS_Type" cdc="SPS">
      <DA name="stVal" bType="BOOLEAN" fc="ST"/>
      <DA name="q" bType="Quality" fc="ST"/>
      <DA name="t" bType="Timestamp" fc="ST"/>
    </DOType>

    <DOType id="SPC_Type" cdc="SPC">
      <DA name="stVal" bType="BOOLEAN" fc="ST"/>
      <DA name="q" bType="Quality" fc="ST"/>
      <DA name="t" bType="Timestamp" fc="ST"/>
      <DA name="ctlVal" bType="BOOLEAN" fc="CO"/>
    </DOType>

    <DOType id="MV_Type" cdc="MV">
      <DA name="mag" bType="Struct" type="Vector_Type" fc="MX"/>
      <DA name="q" bType="Quality" fc="MX"/>
      <DA name="t" bType="Timestamp" fc="MX"/>
    </DOType>

    <DOType id="LPL_Type" cdc="LPL">
      <DA name="vendor" bType="VisibleString" fc="DC"/>
      <DA name="swRev" bType="VisibleString" fc="DC"/>
    </DOType>

    <DAType id="Vector_Type">
      <BDA name="f" bType="FLOAT32"/>
    </DAType>

    <EnumType id="HealthEnum">
      <EnumVal ord="0">Ok</EnumVal>
      <EnumVal ord="1">Warning</EnumVal>
      <EnumVal ord="2">Alarm</EnumVal>
    </EnumType>
  </DataTypeTemplates>

  <IED name="SimpleServer" manufacturer="Example" configVersion="1.0">
    <AccessPoint name="S1">
      <Server>
        <Authentication none="true"/>
        <LDevice inst="simpleIOGenericIO">
          <LN lnType="LLN0_Type" lnClass="LLN0" inst="1"/>
          <LN lnType="MMXU_Type" lnClass="MMXU" inst="1"/>
          <LN lnType="GGIO_Type" lnClass="GGIO" inst="1"/>
        </LDevice>
      </Server>
    </AccessPoint>
  </IED>
</SCL>

将上述内容保存为 simple_server.icd。这个文件定义了一个名为 SimpleServer 的 IED,包含一个逻辑设备 simpleIOGenericIO,其下有:

  • LLN0:管理 LN
  • MMXU1:测量单元(TotW.mag.f = 总功率浮点值)
  • GGIO1:通用 I/O(Ind1.stVal = 状态值,SPCSO1.stVal = 控制输出值)

5.2.2 最小 Server 示例

#!/usr/bin/env python3
"""
最小 IEC 61850 Server:每秒更新一个模拟测量值。
"""
import math
import sys
import time

from pyiec61850.server import IedServer, ServerConfig


def main() -> None:
    if len(sys.argv) != 2:
        print(f"用法: {sys.argv[0]} <模型配置文件>")
        sys.exit(1)

    port = 102
    # 加载模型文件,创建 Server
    with IedServer(sys.argv[1], ServerConfig(port=port, max_connections=5)) as server:
        server.start(port)
        print(f"IEC 61850 Server 已启动,端口 {port}")
        print("监听地址: 0.0.0.0:102")
        print("按 Ctrl+C 停止...")

        t = 0
        try:
            while True:
                # 模拟一个按正弦波变化的功率值
                value = 230.0 + 10.0 * math.sin(t * 0.1)

                # 更新数据模型(线程安全)
                server.lock_data_model()
                try:
                    server.update_float(
                        "simpleIOGenericIO/MMXU1.TotW.mag.f", value
                    )
                finally:
                    server.unlock_data_model()

                t += 1
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n正在停止服务器...")


if __name__ == "__main__":
    main()

运行方法:

# 终端 1:启动 Server
python simple_server.py simple_server.icd

# 终端 2:用 IED Scout 或我们的 Client 连接 localhost:102

关键 API 说明:

API 作用
IedServer(model_cfg, config) 创建 IED Server 实例,加载模型配置
ServerConfig(port, max_connections) 服务器配置:端口和最大连接数
server.start(port) 启动 MMS 服务,开始监听
server.lock_data_model() / unlock_data_model() 加锁/解锁数据模型,保证线程安全
server.update_float(ref, value) 更新浮点型数据属性的值
server.update_int32(ref, value) 更新 32 位整型数据属性
server.update_boolean(ref, value) 更新布尔型数据属性
server.update_quality(ref, quality) 更新数据品质字段

注意server.update_* 方法必须在 lock_data_model() / unlock_data_model() 的范围内调用,以确保多个客户端同时访问时的数据一致性。

5.2.3 验证 Server

启动 Server 后,可以用任意 MMS 客户端连接测试。最简单的方式是使用我们的 Client 示例(见 5.3 节)或者使用专门的测试工具:

# 使用 pyiec61850-ng 自带的 Client 示例连接测试
python 01_basic_connection.py localhost
# 预期输出:Vendor: Example  Model: SimpleServer  Revision: ...

5.3 开发 MMS Client

5.3.1 基础连接与浏览模型

连接并获取服务器身份
#!/usr/bin/env python3
"""
示例 1:基础连接 —— 连接 IEC 61850 Server 并获取身份信息。
"""
import sys
from pyiec61850.mms import MMSClient


def main() -> None:
    if len(sys.argv) != 2:
        print(f"用法: {sys.argv[0]} <服务器地址>")
        sys.exit(1)

    # MMSClient 支持上下文管理器,自动管理连接生命周期
    with MMSClient(sys.argv[1]) as client:
        identity = client.get_server_identity()
        print(f"厂商 (Vendor):   {identity.vendor}")
        print(f"型号 (Model):    {identity.model}")
        print(f"版本 (Revision): {identity.revision}")


if __name__ == "__main__":
    main()

运行: python client_01_connect.py localhost

浏览数据模型
#!/usr/bin/env python3
"""
示例 2:设备发现 —— 遍历 IED 的完整数据模型树。
"""
import sys
from pyiec61850.mms import MMSClient


def main() -> None:
    if len(sys.argv) != 2:
        print(f"用法: {sys.argv[0]} <服务器地址>")
        sys.exit(1)

    with MMSClient(sys.argv[1]) as client:
        for device in client.get_logical_devices():
            print(f"LD: {device}")
            for node in client.get_logical_nodes(device):
                print(f"  LN: {node}")
                for obj in client.get_data_objects(device, node):
                    print(f"    DO: {device}/{node}.{obj}")


if __name__ == "__main__":
    main()

预期输出(对应我们前面的 Server 模型):

LD: simpleIOGenericIO
  LN: LLN0
    DO: simpleIOGenericIO/LLN0.Mod
    DO: simpleIOGenericIO/LLN0.Health
    DO: simpleIOGenericIO/LLN0.NamPlt
  LN: MMXU1
    DO: simpleIOGenericIO/MMXU1.TotW
    DO: simpleIOGenericIO/MMXU1.TotVAr
  LN: GGIO1
    DO: simpleIOGenericIO/GGIO1.Ind1
    DO: simpleIOGenericIO/GGIO1.SPCSO1

API 速查:

方法 返回值 说明
get_logical_devices() list[str] 所有逻辑设备名称
get_logical_nodes(device) list[str] 指定 LD 下的 LN 名称
get_data_objects(device, node) list[str] 指定 LN 下的 DO 名称
get_data_attributes(device, node, do) list[str] 指定 DO 下的 DA 名称

5.3.2 读取数据值

#!/usr/bin/env python3
"""
示例 3:读取数据 —— 读取一个或多个数据属性的值。
"""
import sys
from pyiec61850.mms import MMSClient, ReadError


def main() -> None:
    if len(sys.argv) < 3:
        print(f"用法: {sys.argv[0]} <服务器地址> <引用1> [引用2 ...]")
        sys.exit(1)

    host, *refs = sys.argv[1:]
    with MMSClient(host) as client:
        for ref in refs:
            try:
                value = client.read_value(ref)
                print(f"{ref} = {value!r}")
            except ReadError as e:
                print(f"{ref}: 读取失败 ({e})")


if __name__ == "__main__":
    main()

运行示例:

python client_03_read.py localhost \
  "simpleIOGenericIO/MMXU1.TotW.mag.f" \
  "simpleIOGenericIO/MMXU1.TotW.q" \
  "simpleIOGenericIO/GGIO1.Ind1.stVal"

预期输出:

simpleIOGenericIO/MMXU1.TotW.mag.f = 238.4
simpleIOGenericIO/MMXU1.TotW.q = 0100000000000000
simpleIOGenericIO/GGIO1.Ind1.stVal = False

数据引用格式:

格式: LDName/LNName.DOName.DAName
示例: simpleIOGenericIO/MMXU1.TotW.mag.f
                    ↑     ↑     ↑     ↑
                    LD    LN    DO    DA

read_value() 自动将底层 MmsValue 转换为 Python 原生类型:

CDC 类型 DA 示例 Python 类型
BOOLEAN stVal bool
INT8/16/32 stVal int
FLOAT32/64 mag.f float
VisibleString vendor str
Timestamp t datetime (或 int)
Quality q str (十六进制)

5.3.3 批量读取数据集

很多时候我们需要一次性读取一组相关的数据点,而不是逐个读取。使用 read_dataset() 可以大幅减少网络往返次数:

#!/usr/bin/env python3
"""
示例 4:数据集批量读取 —— 一次 MMS 请求读取整个数据集。
"""
import sys
from pyiec61850.mms import MMSClient


def main() -> None:
    if len(sys.argv) != 3:
        print(f"用法: {sys.argv[0]} <服务器地址> <数据集引用>")
        sys.exit(1)

    host, ds_ref = sys.argv[1], sys.argv[2]
    with MMSClient(host) as client:
        values = client.read_dataset(ds_ref)
        print(f"数据集 {ds_ref}{len(values)} 个成员:")
        for i, v in enumerate(values):
            print(f"  [{i}] {v!r}")


if __name__ == "__main__":
    main()

运行示例(如果 Server 配置了 DataSet):

python client_04_dataset.py localhost "simpleIOGenericIO/LLN0.DSReport"

为什么批量读取重要? 在监控上百个遥测点的场景下,逐个 read_value() 需要 N 次网络往返。而 read_dataset() 只需要 1 次。当 N=100 时,网络延迟从 100ms 降为 1ms,差距是 100 倍。

5.3.4 写入数据值

#!/usr/bin/env python3
"""
示例 5:写入数据 —— 向 Server 写入数据值。
"""
import sys
from pyiec61850.mms import MMSClient, WriteError


def main() -> None:
    if len(sys.argv) != 4:
        print(f"用法: {sys.argv[0]} <服务器地址> <引用> <值>")
        sys.exit(1)

    host, ref, raw_value = sys.argv[1], sys.argv[2], sys.argv[3]

    # 自动类型推断:尝试转为 float→int→bool
    try:
        value = float(raw_value)
        if value == int(value):
            value = int(value) if not isinstance(value, bool) else value
    except ValueError:
        value = raw_value

    with MMSClient(host) as client:
        try:
            client.write_value(ref, value)
            print(f"写入成功: {ref} = {value!r}")
        except WriteError as e:
            print(f"写入失败: {e}")


if __name__ == "__main__":
    main()

运行示例:

# 写入布尔值
python client_05_write.py localhost \
  "simpleIOGenericIO/GGIO1.SPCSO1.stVal" True

# 写入浮点数
python client_05_write.py localhost \
  "simpleIOGenericIO/MMXU1.TotW.mag.f" 220.5

注意:写入操作需要 Server 端允许写入。在默认的 SCL 模型中,状态值(ST 功能约束)通常是只读的。控制输出(CO 功能约束)才允许写入。实际写入哪个 DA 取决于 IED 的访问控制配置。

5.3.5 订阅 Report

报告(Report)是 IEC 61850 MMS 中最核心的"推送"机制。当 Server 端数据变化时,它主动向订阅的 Client 发送报告,避免了 Client 频繁轮询。

#!/usr/bin/env python3
"""
示例 6:报告订阅 —— 订阅 Server 的缓冲报告控制块(BRCB)。
"""
import sys
import time

from pyiec61850.mms import MMSClient
from pyiec61850.mms.report import ReportClient


def on_report(report):
    """收到报告时的回调函数"""
    print(f"\n=== 收到报告 ===")
    print(f"  RptID: {report.rpt_id}")
    print(f"  序列号: {report.seq_num}")
    print(f"  条目数: {len(report.entries)}")
    for entry in report.entries[:5]:  # 只打印前 5 条
        print(f"    {entry.reference}: {entry.value}")


def main() -> None:
    if len(sys.argv) != 3:
        print(f"用法: {sys.argv[0]} <服务器地址> <RCB 引用>")
        print(f"示例:")
        print(f"  {sys.argv[0]} 192.168.1.100 \"SimpleServer/simpleIOGenericIO/LLN0.RP.BRCB1\"")
        sys.exit(1)

    host, rcb_ref = sys.argv[1], sys.argv[2]

    with MMSClient(host) as client:
        reports = ReportClient(client)

        # 1. 安装报告处理回调
        reports.install_report_handler(rcb_ref, "my_client", on_report)
        print(f"已安装报告处理程序: {rcb_ref}")

        # 2. 启用报告
        reports.enable_reporting(rcb_ref)
        print("报告已启用")

        # 3. 触发一次总召唤(GI),立即获取当前所有值
        reports.trigger_gi_report(rcb_ref)
        print("总召唤已触发")

        # 4. 等待报告
        print("等待报告... (按 Ctrl+C 停止)")
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n正在停止...")

        # 5. 清理
        reports.disable_reporting(rcb_ref)
        reports.uninstall_all_handlers()


if __name__ == "__main__":
    main()

运行条件:Server 端的 ICD 文件中必须配置了 ReportControl:

<LN lnType="LLN0_Type" lnClass="LLN0" inst="1">
  <ReportControl name="BRCB1" rptID="SimpleServer/LLN0.BRCB1"
                 datSet="dsReport" intgPd="5000"
                 buffered="true" bufTime="100">
    <TrgOps dchg="true" qchg="true" dupd="false" period="false"/>
  </ReportControl>
  <DataSet name="dsReport">
    <!-- 数据集成员定义 -->
    <FCDA ldInst="simpleIOGenericIO" lnClass="MMXU" lnInst="1"
          doName="TotW" fc="MX"/>
  </DataSet>
</LN>

报告工作流程:

"Server (IedServer)" Client "Server (IedServer)" Client 无报告时靠轮询 安装报告后 数据变化 (dchg/qchg) 总召唤响应 intgPd 超时 MMS Read (数据轮询) installReportHandler(RCB) OK enableReporting(RCB) OK Report (数据变化触发) Report (GI 响应) Report (周期触发)

BRCB(缓冲报告)vs URCB(非缓冲报告)的区别:

特性 BRCB(缓冲) URCB(非缓冲)
断线恢复 自动补发断线期间的数据 丢失断线期间的数据
典型场景 保护事件、需要完整记录的场合 状态监视、可以容忍丢数据的场合
bufTime 聚合多个事件打包发送 立即发送
资源占用 需要缓冲区 不需要缓冲区

5.3.6 控制操作

控制操作是 IEC 61850 中最严谨的部分,涉及"选择-执行"两阶段确认机制。

#!/usr/bin/env python3
"""
示例 7:控制操作 —— Direct Operate 与 SBO (Select-Before-Operate)。
"""
import sys

from pyiec61850.mms import MMSClient
from pyiec61850.mms.control import ControlClient, OperateError, SelectError


def main() -> None:
    if len(sys.argv) != 3:
        print(f"用法: {sys.argv[0]} <服务器地址> <控制对象引用>")
        print(f"示例:")
        print(f"  {sys.argv[0]} localhost \"simpleIOGenericIO/GGIO1.SPCSO1\"")
        sys.exit(1)

    host, ref = sys.argv[1], sys.argv[2]

    with MMSClient(host) as client:
        ctrl = ControlClient(client)

        # 1. 查询控制模型
        model = ctrl.get_control_model(ref)
        model_names = {
            0: "status-only",
            1: "direct-with-normal-security",
            2: "sbo-with-normal-security",
            3: "direct-with-enhanced-security",
            4: "sbo-with-enhanced-security",
        }
        print(f"控制模型: {model_names.get(model, f'未知({model})')}")

        # 2. 尝试 Direct Operate(直接操作)
        print("\n尝试 Direct Operate (True)...")
        try:
            ctrl.direct_operate(ref, True)
            print("  ✓ Direct Operate 成功")
        except OperateError as e:
            print(f"  ✗ Direct Operate 失败: {e}")

        # 3. 尝试 SBO(选择-后-操作)
        print("\n尝试 SBO (False)...")
        try:
            # 第一步:选择
            ctrl.select(ref)
            print("  ✓ Select 成功")
            # 第二步:执行
            ctrl.operate(ref, False)
            print("  ✓ Operate 成功")
        except (SelectError, OperateError) as e:
            print(f"  ✗ SBO 失败: {e}")

        # 4. 释放所有控制资源
        ctrl.release_all()
        print("\n所有控制资源已释放")


if __name__ == "__main__":
    main()

五种控制模型的选择场景:

模型名称 常量 流程 安全等级 典型场景
status-only 0 只读,不可控
direct-with-normal-security 1 Operate 普通 紧急跳闸、简单开关
sbo-with-normal-security 2 Select → Operate 普通 常规遥控
direct-with-enhanced-security 3 Operate + Check 加强 需校验的直接控制
sbo-with-enhanced-security 4 Select + Check → Operate 加强 断路器等高安全对象

SBO 控制完整时序:

Server Client Server Client 锁定对象 分配操作令牌 验证令牌 执行操作 释放控制锁 Select(对象引用) Select 响应 (成功/失败) Operate(对象引用, 值) Operate 响应 (成功/失败) (可选)Cancel Cancel 响应

工程最佳实践

  1. 始终先调用 get_control_model() 确认控制模型类型
  2. 对于 SBO 模型,select()operate() 之间存在超时(sboTimeout),默认通常为 30 秒
  3. 操作完成后务必调用 release_all() 释放资源
  4. 增强型模型(enhanced)要求 Client 在操作时附带 Check 参数(同步检查、联锁检查)

5.4 GOOSE 发布与订阅

GOOSE 是 IEC 61850 中信号实时传输的核心机制。与 MMS 不同,GOOSE 不基于 TCP/IP,而是直接在以太网二层上传输组播报文。

5.4.1 GOOSE Publisher(发布者)

#!/usr/bin/env python3
"""
示例 8:GOOSE Publisher —— 在指定网络接口发布 GOOSE 消息。
需要 root/管理员权限。

用法:
    sudo python goose_publisher.py <网络接口>
    sudo python goose_publisher.py eth0
"""
import sys
import time

from pyiec61850.goose import GoosePublisher


def main() -> None:
    if len(sys.argv) < 2:
        print(f"用法: sudo {sys.argv[0]} <网络接口>")
        print(f"示例: sudo {sys.argv[0]} eth0")
        sys.exit(1)

    interface = sys.argv[1]

    with GoosePublisher(interface) as pub:
        # 配置 GOOSE 控制块参数
        pub.set_go_cb_ref("simpleIOGenericIO/LLN0$GO$gcbAnalogValues")
        pub.set_app_id(0x1000)
        pub.set_conf_rev(1)
        pub.set_time_allowed_to_live(2000)  # 2 秒

        print(f"GOOSE Publisher 启动: {interface}")
        pub.start()

        print("开始发布 GOOSE 消息 (Ctrl+C 停止)...")
        try:
            counter = 0
            while True:
                # 发布一组数据值
                values = [True, counter, 3.14 * counter, "status_ok"]
                pub.publish(values)
                print(f"  发布: stNum={counter}, 值={values}")

                # 数据变化时递增 stNum
                counter += 1
                pub.increase_st_num()
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n停止发布...")

    print("GOOSE 发布结束。")


if __name__ == "__main__":
    main()

GOOSE 发布关键参数:

参数 说明 典型值
go_cb_ref GOOSE 控制块引用 LD/LN$GO$CBName
app_id APPID,全站唯一 0x0001 ~ 0x3FFF
conf_rev 配置版本号,结构变化时递增 初始为 1
time_allowed_to_live 最大允许生存时间 (ms) 2000 (2秒)
increase_st_num() 状态编号递增,数据变化时调用 每次数据集变化
publish(values) 发布一组数据值 值类型:bool/int/float/str

5.4.2 GOOSE Subscriber(订阅者)

#!/usr/bin/env python3
"""
示例 9:GOOSE Subscriber —— 订阅并接收 GOOSE 消息。
需要 root/管理员权限。

用法:
    sudo python goose_subscriber.py <网络接口> <GOOSE 控制块引用>
    sudo python goose_subscriber.py eth0 "simpleIOGenericIO/LLN0$GO$gcbAnalogValues"
"""
import sys
import time

from pyiec61850.goose import GooseSubscriber


def on_goose_message(msg):
    """收到 GOOSE 消息时的回调函数"""
    print(f"\n=== GOOSE 消息 ===")
    print(f"  stNum (状态号): {msg.st_num}")
    print(f"  sqNum (序列号): {msg.sq_num}")
    print(f"  数据有效: {msg.is_valid}")
    print(f"  数据值: {msg.values}")
    # msg.values 是一个列表,顺序对应发布时的数据集顺序


def main() -> None:
    if len(sys.argv) != 3:
        print(f"用法: sudo {sys.argv[0]} <接口> <GOOSE 控制块引用>")
        print(f"示例: sudo {sys.argv[0]} eth0 "
              "\"simpleIOGenericIO/LLN0$GO$gcbAnalogValues\"")
        sys.exit(1)

    interface, go_cb_ref = sys.argv[1], sys.argv[2]

    with GooseSubscriber(interface, go_cb_ref) as sub:
        sub.set_listener(on_goose_message)
        sub.start()
        print(f"GOOSE 订阅者已启动: {interface}")
        print(f"监听 GOOSE 控制块: {go_cb_ref}")
        print("等待 GOOSE 消息... (Ctrl+C 停止)")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n停止订阅...")


if __name__ == "__main__":
    main()

5.4.3 GOOSE 的状态机与重传机制

GOOSE Subscriber GOOSE Publisher GOOSE Subscriber GOOSE Publisher 数据变化事件发生 第一帧:立即发送 重传:MinTime (例如 2ms) 重传:MinTime × 2 (4ms) 重传:MinTime × 4 (8ms) 指数退避... 稳定状态:MaxTime 间隔 如果 TimeAllowedToLive 超时未收到 则判定 GOOSE 连接断开 下一次数据变化 stNum 递增,sqNum 从0重新开始 Event (stNum=N, sqNum=0) Retransmission 1 (stNum=N, sqNum=1) Retransmission 2 (stNum=N, sqNum=2) Retransmission 3 (stNum=N, sqNum=3) Heartbeat (stNum=N, sqNum=...) — 进入心跳阶段 Event (stNum=N+1, sqNum=0)

GOOSE 订阅者的关键检查逻辑:

检查项 说明 异常处理
stNum 递增 状态号必须单调递增 若 stNum 回退,表明配置变更或异常
sqNum 循环 序列号从 0 到 TAL 内循环 sqNum 不递增可能表示发布者卡死
timeAllowedToLive 超时检测 超时未收到任何帧,判定连接断开
confRev 变更 数据集结构版本号 版本变更通常需要重新配置订阅
组播 MAC + APPID 帧过滤 帧格式错误丢弃

5.4.4 用 Wireshark 抓包验证

启动 Publisher 后,可以用 Wireshark 抓包分析 GOOSE 报文:

# 过滤条件
eth.type == 0x88b8

抓到的 GOOSE 报文特征:

Ethernet II, Src: xx:xx, Dst: 01-0c-cd-01-xx-xx
  Type: IEC 61850/GOOSE (0x88B8)
GOOSE
  gocbRef: simpleIOGenericIO/LLN0$GO$gcbAnalogValues
  timeAllowedtoLive: 2000
  datSet: simpleIOGenericIO/LLN0$gcbAnalogValues
  goID: simpleIOGenericIO/LLN0$GO$gcbAnalogValues
  stNum: 1
  sqNum: 0
  simulation: False
  confRev: 1
  ndsCom: False
  numDatSetEntries: 4
  allData: 4 items
    boolean: True
    integer: 0
    float: 0.0
    VisibleString: status_ok

5.5 SV 采样值订阅

SV(Sampled Values)用于传输连续的采样值数据,典型场景是将合并单元(MU)的电流/电压采样值传输给保护装置。

🚧 当前限制:pyiec61850-ng 的 SVSubscriber 目前仅支持订阅(接收),暂不支持发布。完整的 SV 发布通常需要底层硬件支持(专用 FPGA 或高性能网卡)。

SV 订阅示例

#!/usr/bin/env python3
"""
示例 10:SV 采样值订阅 —— 订阅 IEC 61850-9-2 LE 采样值。
需要 root/管理员权限。

用法:
    sudo python sv_subscriber.py <网络接口>
    sudo python sv_subscriber.py eth0
"""
import sys
import time

from pyiec61850.sv import SVSubscriber


def on_sample(msg):
    """收到 SV 报文时的回调函数"""
    print(f"\n=== SV 采样 ===")
    print(f"  smpCnt (采样计数): {msg.smp_cnt}")
    print(f"  confRev (配置版本): {msg.conf_rev}")
    print(f"  通道数: {len(msg.values)}")
    # 前 4 个值通常是: Ia, Ib, Ic, In (A 相/B 相/C 相/零序)
    channels = msg.values[:4]
    channel_names = ["Ia", "Ib", "Ic", "In"]
    for i, (name, value) in enumerate(zip(channel_names, channels)):
        print(f"    {name} = {value}")


def main() -> None:
    if len(sys.argv) != 2:
        print(f"用法: sudo {sys.argv[0]} <网络接口>")
        print(f"示例: sudo {sys.argv[0]} eth0")
        sys.exit(1)

    interface = sys.argv[1]

    with SVSubscriber(interface) as sub:
        sub.set_listener(on_sample)
        sub.start()
        print(f"SV 订阅者已启动: {interface}")
        print("等待 SV 报文... (Ctrl+C 停止)")

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\n停止 SV 订阅...")


if __name__ == "__main__":
    main()

SV 的关键参数:

参数 说明 9-2 LE 典型值
以太网类型 SV 专用 0x88BA
组播 MAC SV 组播地址池 01-0C-CD-04-xx-xx
采样率 点/周波 80 (50Hz时=4000Hz)
smpCnt 采样计数器 每个采样递增,同步脉冲复位
confRev 配置版本号 ASDU 通道结构变化时递增

SV 的时间同步要求:SV 采样值的时间戳要求高精度同步,通常依赖 IEEE 1588 PTP(精确时间协议)实现。没有准确的 PTP 同步,SV 数据无法用于保护算法。


5.6 运行测试:完整的端到端验证

上面我们分别实现了 Server、Client、GOOSE 和 SV 功能。下面是一个完整的端到端测试流程:

终端4 (sudo)

终端3 (sudo)

终端2

终端1

MMS TCP:102

MMS TCP:102

MMS TCP:102

GOOSE 组播

IEC 61850 Server
python 14_server.py
端口 102

Client: 身份查询
01_basic_connection.py

Client: 读取数据
03_read_data_values.py

Client: 报告订阅
12_reporting.py

GOOSE Publisher
10_goose_publisher.py

GOOSE Subscriber
09_goose_subscriber.py

测试步骤:

# 1. 启动 Server(终端 1)
python 14_server.py simple_server.icd

# 2. 浏览数据模型(终端 2)
python 02_device_discovery.py localhost

# 3. 读取测量值(终端 2)
python 03_read_data_values.py localhost \
  "simpleIOGenericIO/MMXU1.TotW.mag.f"

# 4. GOOSE 发布(终端 3,需 root)
sudo python 10_goose_publisher.py eth0

# 5. GOOSE 订阅(终端 4,需 root)
sudo python 09_goose_subscriber.py eth0 \
  "simpleIOGenericIO/LLN0$GO$gcbAnalogValues"

本篇小结

技能 API/模块 掌握要求
搭建 IEC 61850 Server IedServer, ServerConfig 理解
浏览数据模型 get_logical_devices(), get_logical_nodes() 熟练
读写数据值 read_value(), write_value() 熟练
批量读取数据集 read_dataset() 掌握
订阅 Report ReportClient, install_report_handler() 掌握
控制操作 ControlClient, select(), operate() 掌握
GOOSE 发布 GoosePublisher, publish() 理解
GOOSE 订阅 GooseSubscriber, set_listener() 理解
SV 订阅 SVSubscriber 了解

接下来 → Part 6:工程实战,我们将把所有这些能力部署到实际工程场景中,处理系统集成、网络架构、故障排查等现实问题。


附录:完整示例文件清单

以下所有示例代码均可直接在安装了 pyiec61850-ng 的环境中运行:

文件 功能 需求
simple_server.icd 最小 IED 模型配置
simple_server.py IEC 61850 Server pyiec61850-ng
client_connect.py 基础连接与身份查询 pyiec61850-ng
client_discovery.py 设备模型浏览 pyiec61850-ng
client_read.py 读取数据值 pyiec61850-ng
client_dataset.py 数据集批量读取 pyiec61850-ng
client_write.py 写入数据值 pyiec61850-ng
client_report.py 报告订阅 pyiec61850-ng
client_control.py 控制操作 (SBO/Direct) pyiec61850-ng
goose_publisher.py GOOSE 发布 pyiec61850-ng + root
goose_subscriber.py GOOSE 订阅 pyiec61850-ng + root
sv_subscriber.py SV 采样值订阅 pyiec61850-ng + root
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐