IEC 61850 系列(五):开发实现——基于 pyiec61850-ng 的代码实战
核心目标:基于 pyiec61850-ng Python 库开发 IEC 61850 Server/Client,实现 MMS 数据读写、报告订阅、控制操作以及 GOOSE 收发。
前置知识:Part 2 的信息模型(LD/LN/DO/DA/DataSet)和 Part 4 的 SCL 配置概念,Python 基础。
5.1 开源方案选型与 pyiec61850-ng 简介
5.1.1 方案对比
在 Part 5 中,我们原本规划了多个开源库的对比。但真正进入代码实战前,需要认清一个现实:IEC 61850 的完整实现非常复杂,Python 生态中能真正覆盖 MMS + GOOSE + SV 三驾马车的成熟方案并不多。
| 库 | 语言 | MMS | GOOSE | SV | 适合场景 |
|---|---|---|---|---|---|
| libIEC61850 | C | ✓ | ✓ | ✓ | 嵌入式、高性能、完整功能 |
| pyiec61850-ng | Python (C 绑定) | ✓ | ✓ | ✓ | 原型验证、测试自动化、快速开发 |
| OpenIEC61850 | Java | ✓ | ✓ | ✓ | PC 端跨平台 |
| py61850 | Python (纯) | 部分 | — | — | 学术研究 |
本系列选择 pyiec61850-ng 作为实战主角,原因有三:
- Python 生态:无需 C 环境编译,
pip install即用,适合工程验证和测试自动化 - 完整的 MMS + GOOSE + SV 支持:基于 libIEC61850 的 SWIG 绑定,底层是经过生产验证的 C 库
- 精心封装的安全 API:提供上下文管理器、类型自动转换、异常体系等现代 Python 特性,避免了底层 C 库的空指针和内存泄漏问题
5.1.2 pyiec61850-ng 的架构
5.1.3 安装与环境
# 安装最新发布版
pip install pyiec61850-ng
# 如果需要在 ARM Linux 上使用,建议交叉编译 libiec61850
# 然后在目标平台安装对应的 wheel
# 验证安装
python -c "from pyiec61850.mms import MMSClient; print('OK')"
注意事项:
- GOOSE 和 SV 功能需要 root/管理员权限,因为它们需要在链路层创建原始套接字
- pyiec61850-ng 在 Windows 上的 GOOSE/SV 支持有限,建议在 Linux 环境下进行 GOOSE/SV 开发
- MMS 功能(Server/Client)在所有平台上都可以正常运行
5.2 搭建一个 IEC 61850 Server
5.2.1 准备模型配置文件
pyiec61850-ng 的 Server 加载的是 SCL 格式的模型配置文件(ICD/CID/SCD)。下面是一个最小的 ICD 文件,定义了一个具有模拟量输出和开关量输出的简单 IED:
<?xml version="1.0" encoding="UTF-8"?>
<SCL xmlns="http://www.iec.ch/61850/2003/SCL"
version="2007" revision="B">
<Header id="SimpleServer_v1" version="1.0" revision="0"
nameStructure="IEDName"/>
<DataTypeTemplates>
<LNodeType id="LLN0_Type" lnClass="LLN0" iedType="Simple">
<DO name="Mod" type="INC_Type"/>
<DO name="Health" type="ENS_Type"/>
<DO name="NamPlt" type="LPL_Type"/>
</LNodeType>
<LNodeType id="MMXU_Type" lnClass="MMXU" iedType="Simple">
<DO name="TotW" type="MV_Type"/> <!-- 总有功功率 -->
<DO name="TotVAr" type="MV_Type"/> <!-- 总无功功率 -->
</LNodeType>
<LNodeType id="GGIO_Type" lnClass="GGIO" iedType="Simple">
<DO name="Ind1" type="SPS_Type"/> <!-- 状态输入 1 -->
<DO name="SPCSO1" type="SPC_Type"/> <!-- 控制输出 1 -->
</LNodeType>
<DOType id="INC_Type" cdc="INC">
<DA name="stVal" bType="INT8" fc="ST"/>
<DA name="q" bType="Quality" fc="ST"/>
<DA name="t" bType="Timestamp" fc="ST"/>
<DA name="ctlVal" bType="INT8" fc="CO"/>
</DOType>
<DOType id="ENS_Type" cdc="ENS">
<DA name="stVal" bType="Enum" type="HealthEnum" fc="ST"/>
<DA name="q" bType="Quality" fc="ST"/>
<DA name="t" bType="Timestamp" fc="ST"/>
</DOType>
<DOType id="SPS_Type" cdc="SPS">
<DA name="stVal" bType="BOOLEAN" fc="ST"/>
<DA name="q" bType="Quality" fc="ST"/>
<DA name="t" bType="Timestamp" fc="ST"/>
</DOType>
<DOType id="SPC_Type" cdc="SPC">
<DA name="stVal" bType="BOOLEAN" fc="ST"/>
<DA name="q" bType="Quality" fc="ST"/>
<DA name="t" bType="Timestamp" fc="ST"/>
<DA name="ctlVal" bType="BOOLEAN" fc="CO"/>
</DOType>
<DOType id="MV_Type" cdc="MV">
<DA name="mag" bType="Struct" type="Vector_Type" fc="MX"/>
<DA name="q" bType="Quality" fc="MX"/>
<DA name="t" bType="Timestamp" fc="MX"/>
</DOType>
<DOType id="LPL_Type" cdc="LPL">
<DA name="vendor" bType="VisibleString" fc="DC"/>
<DA name="swRev" bType="VisibleString" fc="DC"/>
</DOType>
<DAType id="Vector_Type">
<BDA name="f" bType="FLOAT32"/>
</DAType>
<EnumType id="HealthEnum">
<EnumVal ord="0">Ok</EnumVal>
<EnumVal ord="1">Warning</EnumVal>
<EnumVal ord="2">Alarm</EnumVal>
</EnumType>
</DataTypeTemplates>
<IED name="SimpleServer" manufacturer="Example" configVersion="1.0">
<AccessPoint name="S1">
<Server>
<Authentication none="true"/>
<LDevice inst="simpleIOGenericIO">
<LN lnType="LLN0_Type" lnClass="LLN0" inst="1"/>
<LN lnType="MMXU_Type" lnClass="MMXU" inst="1"/>
<LN lnType="GGIO_Type" lnClass="GGIO" inst="1"/>
</LDevice>
</Server>
</AccessPoint>
</IED>
</SCL>
将上述内容保存为 simple_server.icd。这个文件定义了一个名为 SimpleServer 的 IED,包含一个逻辑设备 simpleIOGenericIO,其下有:
LLN0:管理 LNMMXU1:测量单元(TotW.mag.f= 总功率浮点值)GGIO1:通用 I/O(Ind1.stVal= 状态值,SPCSO1.stVal= 控制输出值)
5.2.2 最小 Server 示例
#!/usr/bin/env python3
"""
最小 IEC 61850 Server:每秒更新一个模拟测量值。
"""
import math
import sys
import time
from pyiec61850.server import IedServer, ServerConfig
def main() -> None:
if len(sys.argv) != 2:
print(f"用法: {sys.argv[0]} <模型配置文件>")
sys.exit(1)
port = 102
# 加载模型文件,创建 Server
with IedServer(sys.argv[1], ServerConfig(port=port, max_connections=5)) as server:
server.start(port)
print(f"IEC 61850 Server 已启动,端口 {port}")
print("监听地址: 0.0.0.0:102")
print("按 Ctrl+C 停止...")
t = 0
try:
while True:
# 模拟一个按正弦波变化的功率值
value = 230.0 + 10.0 * math.sin(t * 0.1)
# 更新数据模型(线程安全)
server.lock_data_model()
try:
server.update_float(
"simpleIOGenericIO/MMXU1.TotW.mag.f", value
)
finally:
server.unlock_data_model()
t += 1
time.sleep(1)
except KeyboardInterrupt:
print("\n正在停止服务器...")
if __name__ == "__main__":
main()
运行方法:
# 终端 1:启动 Server
python simple_server.py simple_server.icd
# 终端 2:用 IED Scout 或我们的 Client 连接 localhost:102
关键 API 说明:
| API | 作用 |
|---|---|
IedServer(model_cfg, config) |
创建 IED Server 实例,加载模型配置 |
ServerConfig(port, max_connections) |
服务器配置:端口和最大连接数 |
server.start(port) |
启动 MMS 服务,开始监听 |
server.lock_data_model() / unlock_data_model() |
加锁/解锁数据模型,保证线程安全 |
server.update_float(ref, value) |
更新浮点型数据属性的值 |
server.update_int32(ref, value) |
更新 32 位整型数据属性 |
server.update_boolean(ref, value) |
更新布尔型数据属性 |
server.update_quality(ref, quality) |
更新数据品质字段 |
注意:
server.update_*方法必须在lock_data_model()/unlock_data_model()的范围内调用,以确保多个客户端同时访问时的数据一致性。
5.2.3 验证 Server
启动 Server 后,可以用任意 MMS 客户端连接测试。最简单的方式是使用我们的 Client 示例(见 5.3 节)或者使用专门的测试工具:
# 使用 pyiec61850-ng 自带的 Client 示例连接测试
python 01_basic_connection.py localhost
# 预期输出:Vendor: Example Model: SimpleServer Revision: ...
5.3 开发 MMS Client
5.3.1 基础连接与浏览模型
连接并获取服务器身份
#!/usr/bin/env python3
"""
示例 1:基础连接 —— 连接 IEC 61850 Server 并获取身份信息。
"""
import sys
from pyiec61850.mms import MMSClient
def main() -> None:
if len(sys.argv) != 2:
print(f"用法: {sys.argv[0]} <服务器地址>")
sys.exit(1)
# MMSClient 支持上下文管理器,自动管理连接生命周期
with MMSClient(sys.argv[1]) as client:
identity = client.get_server_identity()
print(f"厂商 (Vendor): {identity.vendor}")
print(f"型号 (Model): {identity.model}")
print(f"版本 (Revision): {identity.revision}")
if __name__ == "__main__":
main()
运行: python client_01_connect.py localhost
浏览数据模型
#!/usr/bin/env python3
"""
示例 2:设备发现 —— 遍历 IED 的完整数据模型树。
"""
import sys
from pyiec61850.mms import MMSClient
def main() -> None:
if len(sys.argv) != 2:
print(f"用法: {sys.argv[0]} <服务器地址>")
sys.exit(1)
with MMSClient(sys.argv[1]) as client:
for device in client.get_logical_devices():
print(f"LD: {device}")
for node in client.get_logical_nodes(device):
print(f" LN: {node}")
for obj in client.get_data_objects(device, node):
print(f" DO: {device}/{node}.{obj}")
if __name__ == "__main__":
main()
预期输出(对应我们前面的 Server 模型):
LD: simpleIOGenericIO
LN: LLN0
DO: simpleIOGenericIO/LLN0.Mod
DO: simpleIOGenericIO/LLN0.Health
DO: simpleIOGenericIO/LLN0.NamPlt
LN: MMXU1
DO: simpleIOGenericIO/MMXU1.TotW
DO: simpleIOGenericIO/MMXU1.TotVAr
LN: GGIO1
DO: simpleIOGenericIO/GGIO1.Ind1
DO: simpleIOGenericIO/GGIO1.SPCSO1
API 速查:
| 方法 | 返回值 | 说明 |
|---|---|---|
get_logical_devices() |
list[str] |
所有逻辑设备名称 |
get_logical_nodes(device) |
list[str] |
指定 LD 下的 LN 名称 |
get_data_objects(device, node) |
list[str] |
指定 LN 下的 DO 名称 |
get_data_attributes(device, node, do) |
list[str] |
指定 DO 下的 DA 名称 |
5.3.2 读取数据值
#!/usr/bin/env python3
"""
示例 3:读取数据 —— 读取一个或多个数据属性的值。
"""
import sys
from pyiec61850.mms import MMSClient, ReadError
def main() -> None:
if len(sys.argv) < 3:
print(f"用法: {sys.argv[0]} <服务器地址> <引用1> [引用2 ...]")
sys.exit(1)
host, *refs = sys.argv[1:]
with MMSClient(host) as client:
for ref in refs:
try:
value = client.read_value(ref)
print(f"{ref} = {value!r}")
except ReadError as e:
print(f"{ref}: 读取失败 ({e})")
if __name__ == "__main__":
main()
运行示例:
python client_03_read.py localhost \
"simpleIOGenericIO/MMXU1.TotW.mag.f" \
"simpleIOGenericIO/MMXU1.TotW.q" \
"simpleIOGenericIO/GGIO1.Ind1.stVal"
预期输出:
simpleIOGenericIO/MMXU1.TotW.mag.f = 238.4
simpleIOGenericIO/MMXU1.TotW.q = 0100000000000000
simpleIOGenericIO/GGIO1.Ind1.stVal = False
数据引用格式:
格式: LDName/LNName.DOName.DAName
示例: simpleIOGenericIO/MMXU1.TotW.mag.f
↑ ↑ ↑ ↑
LD LN DO DA
read_value() 自动将底层 MmsValue 转换为 Python 原生类型:
| CDC 类型 | DA 示例 | Python 类型 |
|---|---|---|
| BOOLEAN | stVal |
bool |
| INT8/16/32 | stVal |
int |
| FLOAT32/64 | mag.f |
float |
| VisibleString | vendor |
str |
| Timestamp | t |
datetime (或 int) |
| Quality | q |
str (十六进制) |
5.3.3 批量读取数据集
很多时候我们需要一次性读取一组相关的数据点,而不是逐个读取。使用 read_dataset() 可以大幅减少网络往返次数:
#!/usr/bin/env python3
"""
示例 4:数据集批量读取 —— 一次 MMS 请求读取整个数据集。
"""
import sys
from pyiec61850.mms import MMSClient
def main() -> None:
if len(sys.argv) != 3:
print(f"用法: {sys.argv[0]} <服务器地址> <数据集引用>")
sys.exit(1)
host, ds_ref = sys.argv[1], sys.argv[2]
with MMSClient(host) as client:
values = client.read_dataset(ds_ref)
print(f"数据集 {ds_ref} 共 {len(values)} 个成员:")
for i, v in enumerate(values):
print(f" [{i}] {v!r}")
if __name__ == "__main__":
main()
运行示例(如果 Server 配置了 DataSet):
python client_04_dataset.py localhost "simpleIOGenericIO/LLN0.DSReport"
为什么批量读取重要? 在监控上百个遥测点的场景下,逐个
read_value()需要 N 次网络往返。而read_dataset()只需要 1 次。当 N=100 时,网络延迟从 100ms 降为 1ms,差距是 100 倍。
5.3.4 写入数据值
#!/usr/bin/env python3
"""
示例 5:写入数据 —— 向 Server 写入数据值。
"""
import sys
from pyiec61850.mms import MMSClient, WriteError
def main() -> None:
if len(sys.argv) != 4:
print(f"用法: {sys.argv[0]} <服务器地址> <引用> <值>")
sys.exit(1)
host, ref, raw_value = sys.argv[1], sys.argv[2], sys.argv[3]
# 自动类型推断:尝试转为 float→int→bool
try:
value = float(raw_value)
if value == int(value):
value = int(value) if not isinstance(value, bool) else value
except ValueError:
value = raw_value
with MMSClient(host) as client:
try:
client.write_value(ref, value)
print(f"写入成功: {ref} = {value!r}")
except WriteError as e:
print(f"写入失败: {e}")
if __name__ == "__main__":
main()
运行示例:
# 写入布尔值
python client_05_write.py localhost \
"simpleIOGenericIO/GGIO1.SPCSO1.stVal" True
# 写入浮点数
python client_05_write.py localhost \
"simpleIOGenericIO/MMXU1.TotW.mag.f" 220.5
注意:写入操作需要 Server 端允许写入。在默认的 SCL 模型中,状态值(ST 功能约束)通常是只读的。控制输出(CO 功能约束)才允许写入。实际写入哪个 DA 取决于 IED 的访问控制配置。
5.3.5 订阅 Report
报告(Report)是 IEC 61850 MMS 中最核心的"推送"机制。当 Server 端数据变化时,它主动向订阅的 Client 发送报告,避免了 Client 频繁轮询。
#!/usr/bin/env python3
"""
示例 6:报告订阅 —— 订阅 Server 的缓冲报告控制块(BRCB)。
"""
import sys
import time
from pyiec61850.mms import MMSClient
from pyiec61850.mms.report import ReportClient
def on_report(report):
"""收到报告时的回调函数"""
print(f"\n=== 收到报告 ===")
print(f" RptID: {report.rpt_id}")
print(f" 序列号: {report.seq_num}")
print(f" 条目数: {len(report.entries)}")
for entry in report.entries[:5]: # 只打印前 5 条
print(f" {entry.reference}: {entry.value}")
def main() -> None:
if len(sys.argv) != 3:
print(f"用法: {sys.argv[0]} <服务器地址> <RCB 引用>")
print(f"示例:")
print(f" {sys.argv[0]} 192.168.1.100 \"SimpleServer/simpleIOGenericIO/LLN0.RP.BRCB1\"")
sys.exit(1)
host, rcb_ref = sys.argv[1], sys.argv[2]
with MMSClient(host) as client:
reports = ReportClient(client)
# 1. 安装报告处理回调
reports.install_report_handler(rcb_ref, "my_client", on_report)
print(f"已安装报告处理程序: {rcb_ref}")
# 2. 启用报告
reports.enable_reporting(rcb_ref)
print("报告已启用")
# 3. 触发一次总召唤(GI),立即获取当前所有值
reports.trigger_gi_report(rcb_ref)
print("总召唤已触发")
# 4. 等待报告
print("等待报告... (按 Ctrl+C 停止)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n正在停止...")
# 5. 清理
reports.disable_reporting(rcb_ref)
reports.uninstall_all_handlers()
if __name__ == "__main__":
main()
运行条件:Server 端的 ICD 文件中必须配置了 ReportControl:
<LN lnType="LLN0_Type" lnClass="LLN0" inst="1">
<ReportControl name="BRCB1" rptID="SimpleServer/LLN0.BRCB1"
datSet="dsReport" intgPd="5000"
buffered="true" bufTime="100">
<TrgOps dchg="true" qchg="true" dupd="false" period="false"/>
</ReportControl>
<DataSet name="dsReport">
<!-- 数据集成员定义 -->
<FCDA ldInst="simpleIOGenericIO" lnClass="MMXU" lnInst="1"
doName="TotW" fc="MX"/>
</DataSet>
</LN>
报告工作流程:
BRCB(缓冲报告)vs URCB(非缓冲报告)的区别:
| 特性 | BRCB(缓冲) | URCB(非缓冲) |
|---|---|---|
| 断线恢复 | 自动补发断线期间的数据 | 丢失断线期间的数据 |
| 典型场景 | 保护事件、需要完整记录的场合 | 状态监视、可以容忍丢数据的场合 |
| bufTime | 聚合多个事件打包发送 | 立即发送 |
| 资源占用 | 需要缓冲区 | 不需要缓冲区 |
5.3.6 控制操作
控制操作是 IEC 61850 中最严谨的部分,涉及"选择-执行"两阶段确认机制。
#!/usr/bin/env python3
"""
示例 7:控制操作 —— Direct Operate 与 SBO (Select-Before-Operate)。
"""
import sys
from pyiec61850.mms import MMSClient
from pyiec61850.mms.control import ControlClient, OperateError, SelectError
def main() -> None:
if len(sys.argv) != 3:
print(f"用法: {sys.argv[0]} <服务器地址> <控制对象引用>")
print(f"示例:")
print(f" {sys.argv[0]} localhost \"simpleIOGenericIO/GGIO1.SPCSO1\"")
sys.exit(1)
host, ref = sys.argv[1], sys.argv[2]
with MMSClient(host) as client:
ctrl = ControlClient(client)
# 1. 查询控制模型
model = ctrl.get_control_model(ref)
model_names = {
0: "status-only",
1: "direct-with-normal-security",
2: "sbo-with-normal-security",
3: "direct-with-enhanced-security",
4: "sbo-with-enhanced-security",
}
print(f"控制模型: {model_names.get(model, f'未知({model})')}")
# 2. 尝试 Direct Operate(直接操作)
print("\n尝试 Direct Operate (True)...")
try:
ctrl.direct_operate(ref, True)
print(" ✓ Direct Operate 成功")
except OperateError as e:
print(f" ✗ Direct Operate 失败: {e}")
# 3. 尝试 SBO(选择-后-操作)
print("\n尝试 SBO (False)...")
try:
# 第一步:选择
ctrl.select(ref)
print(" ✓ Select 成功")
# 第二步:执行
ctrl.operate(ref, False)
print(" ✓ Operate 成功")
except (SelectError, OperateError) as e:
print(f" ✗ SBO 失败: {e}")
# 4. 释放所有控制资源
ctrl.release_all()
print("\n所有控制资源已释放")
if __name__ == "__main__":
main()
五种控制模型的选择场景:
| 模型名称 | 常量 | 流程 | 安全等级 | 典型场景 |
|---|---|---|---|---|
| status-only | 0 | — | — | 只读,不可控 |
| direct-with-normal-security | 1 | Operate | 普通 | 紧急跳闸、简单开关 |
| sbo-with-normal-security | 2 | Select → Operate | 普通 | 常规遥控 |
| direct-with-enhanced-security | 3 | Operate + Check | 加强 | 需校验的直接控制 |
| sbo-with-enhanced-security | 4 | Select + Check → Operate | 加强 | 断路器等高安全对象 |
SBO 控制完整时序:
工程最佳实践:
- 始终先调用
get_control_model()确认控制模型类型- 对于 SBO 模型,
select()和operate()之间存在超时(sboTimeout),默认通常为 30 秒- 操作完成后务必调用
release_all()释放资源- 增强型模型(enhanced)要求 Client 在操作时附带 Check 参数(同步检查、联锁检查)
5.4 GOOSE 发布与订阅
GOOSE 是 IEC 61850 中信号实时传输的核心机制。与 MMS 不同,GOOSE 不基于 TCP/IP,而是直接在以太网二层上传输组播报文。
5.4.1 GOOSE Publisher(发布者)
#!/usr/bin/env python3
"""
示例 8:GOOSE Publisher —— 在指定网络接口发布 GOOSE 消息。
需要 root/管理员权限。
用法:
sudo python goose_publisher.py <网络接口>
sudo python goose_publisher.py eth0
"""
import sys
import time
from pyiec61850.goose import GoosePublisher
def main() -> None:
if len(sys.argv) < 2:
print(f"用法: sudo {sys.argv[0]} <网络接口>")
print(f"示例: sudo {sys.argv[0]} eth0")
sys.exit(1)
interface = sys.argv[1]
with GoosePublisher(interface) as pub:
# 配置 GOOSE 控制块参数
pub.set_go_cb_ref("simpleIOGenericIO/LLN0$GO$gcbAnalogValues")
pub.set_app_id(0x1000)
pub.set_conf_rev(1)
pub.set_time_allowed_to_live(2000) # 2 秒
print(f"GOOSE Publisher 启动: {interface}")
pub.start()
print("开始发布 GOOSE 消息 (Ctrl+C 停止)...")
try:
counter = 0
while True:
# 发布一组数据值
values = [True, counter, 3.14 * counter, "status_ok"]
pub.publish(values)
print(f" 发布: stNum={counter}, 值={values}")
# 数据变化时递增 stNum
counter += 1
pub.increase_st_num()
time.sleep(1)
except KeyboardInterrupt:
print("\n停止发布...")
print("GOOSE 发布结束。")
if __name__ == "__main__":
main()
GOOSE 发布关键参数:
| 参数 | 说明 | 典型值 |
|---|---|---|
go_cb_ref |
GOOSE 控制块引用 | LD/LN$GO$CBName |
app_id |
APPID,全站唯一 | 0x0001 ~ 0x3FFF |
conf_rev |
配置版本号,结构变化时递增 | 初始为 1 |
time_allowed_to_live |
最大允许生存时间 (ms) | 2000 (2秒) |
increase_st_num() |
状态编号递增,数据变化时调用 | 每次数据集变化 |
publish(values) |
发布一组数据值 | 值类型:bool/int/float/str |
5.4.2 GOOSE Subscriber(订阅者)
#!/usr/bin/env python3
"""
示例 9:GOOSE Subscriber —— 订阅并接收 GOOSE 消息。
需要 root/管理员权限。
用法:
sudo python goose_subscriber.py <网络接口> <GOOSE 控制块引用>
sudo python goose_subscriber.py eth0 "simpleIOGenericIO/LLN0$GO$gcbAnalogValues"
"""
import sys
import time
from pyiec61850.goose import GooseSubscriber
def on_goose_message(msg):
"""收到 GOOSE 消息时的回调函数"""
print(f"\n=== GOOSE 消息 ===")
print(f" stNum (状态号): {msg.st_num}")
print(f" sqNum (序列号): {msg.sq_num}")
print(f" 数据有效: {msg.is_valid}")
print(f" 数据值: {msg.values}")
# msg.values 是一个列表,顺序对应发布时的数据集顺序
def main() -> None:
if len(sys.argv) != 3:
print(f"用法: sudo {sys.argv[0]} <接口> <GOOSE 控制块引用>")
print(f"示例: sudo {sys.argv[0]} eth0 "
"\"simpleIOGenericIO/LLN0$GO$gcbAnalogValues\"")
sys.exit(1)
interface, go_cb_ref = sys.argv[1], sys.argv[2]
with GooseSubscriber(interface, go_cb_ref) as sub:
sub.set_listener(on_goose_message)
sub.start()
print(f"GOOSE 订阅者已启动: {interface}")
print(f"监听 GOOSE 控制块: {go_cb_ref}")
print("等待 GOOSE 消息... (Ctrl+C 停止)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n停止订阅...")
if __name__ == "__main__":
main()
5.4.3 GOOSE 的状态机与重传机制
GOOSE 订阅者的关键检查逻辑:
| 检查项 | 说明 | 异常处理 |
|---|---|---|
stNum 递增 |
状态号必须单调递增 | 若 stNum 回退,表明配置变更或异常 |
sqNum 循环 |
序列号从 0 到 TAL 内循环 | sqNum 不递增可能表示发布者卡死 |
timeAllowedToLive |
超时检测 | 超时未收到任何帧,判定连接断开 |
confRev 变更 |
数据集结构版本号 | 版本变更通常需要重新配置订阅 |
| 组播 MAC + APPID | 帧过滤 | 帧格式错误丢弃 |
5.4.4 用 Wireshark 抓包验证
启动 Publisher 后,可以用 Wireshark 抓包分析 GOOSE 报文:
# 过滤条件
eth.type == 0x88b8
抓到的 GOOSE 报文特征:
Ethernet II, Src: xx:xx, Dst: 01-0c-cd-01-xx-xx
Type: IEC 61850/GOOSE (0x88B8)
GOOSE
gocbRef: simpleIOGenericIO/LLN0$GO$gcbAnalogValues
timeAllowedtoLive: 2000
datSet: simpleIOGenericIO/LLN0$gcbAnalogValues
goID: simpleIOGenericIO/LLN0$GO$gcbAnalogValues
stNum: 1
sqNum: 0
simulation: False
confRev: 1
ndsCom: False
numDatSetEntries: 4
allData: 4 items
boolean: True
integer: 0
float: 0.0
VisibleString: status_ok
5.5 SV 采样值订阅
SV(Sampled Values)用于传输连续的采样值数据,典型场景是将合并单元(MU)的电流/电压采样值传输给保护装置。
🚧 当前限制:pyiec61850-ng 的
SVSubscriber目前仅支持订阅(接收),暂不支持发布。完整的 SV 发布通常需要底层硬件支持(专用 FPGA 或高性能网卡)。
SV 订阅示例
#!/usr/bin/env python3
"""
示例 10:SV 采样值订阅 —— 订阅 IEC 61850-9-2 LE 采样值。
需要 root/管理员权限。
用法:
sudo python sv_subscriber.py <网络接口>
sudo python sv_subscriber.py eth0
"""
import sys
import time
from pyiec61850.sv import SVSubscriber
def on_sample(msg):
"""收到 SV 报文时的回调函数"""
print(f"\n=== SV 采样 ===")
print(f" smpCnt (采样计数): {msg.smp_cnt}")
print(f" confRev (配置版本): {msg.conf_rev}")
print(f" 通道数: {len(msg.values)}")
# 前 4 个值通常是: Ia, Ib, Ic, In (A 相/B 相/C 相/零序)
channels = msg.values[:4]
channel_names = ["Ia", "Ib", "Ic", "In"]
for i, (name, value) in enumerate(zip(channel_names, channels)):
print(f" {name} = {value}")
def main() -> None:
if len(sys.argv) != 2:
print(f"用法: sudo {sys.argv[0]} <网络接口>")
print(f"示例: sudo {sys.argv[0]} eth0")
sys.exit(1)
interface = sys.argv[1]
with SVSubscriber(interface) as sub:
sub.set_listener(on_sample)
sub.start()
print(f"SV 订阅者已启动: {interface}")
print("等待 SV 报文... (Ctrl+C 停止)")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n停止 SV 订阅...")
if __name__ == "__main__":
main()
SV 的关键参数:
| 参数 | 说明 | 9-2 LE 典型值 |
|---|---|---|
| 以太网类型 | SV 专用 | 0x88BA |
| 组播 MAC | SV 组播地址池 | 01-0C-CD-04-xx-xx |
| 采样率 | 点/周波 | 80 (50Hz时=4000Hz) |
| smpCnt | 采样计数器 | 每个采样递增,同步脉冲复位 |
| confRev | 配置版本号 | ASDU 通道结构变化时递增 |
SV 的时间同步要求:SV 采样值的时间戳要求高精度同步,通常依赖 IEEE 1588 PTP(精确时间协议)实现。没有准确的 PTP 同步,SV 数据无法用于保护算法。
5.6 运行测试:完整的端到端验证
上面我们分别实现了 Server、Client、GOOSE 和 SV 功能。下面是一个完整的端到端测试流程:
测试步骤:
# 1. 启动 Server(终端 1)
python 14_server.py simple_server.icd
# 2. 浏览数据模型(终端 2)
python 02_device_discovery.py localhost
# 3. 读取测量值(终端 2)
python 03_read_data_values.py localhost \
"simpleIOGenericIO/MMXU1.TotW.mag.f"
# 4. GOOSE 发布(终端 3,需 root)
sudo python 10_goose_publisher.py eth0
# 5. GOOSE 订阅(终端 4,需 root)
sudo python 09_goose_subscriber.py eth0 \
"simpleIOGenericIO/LLN0$GO$gcbAnalogValues"
本篇小结
| 技能 | API/模块 | 掌握要求 |
|---|---|---|
| 搭建 IEC 61850 Server | IedServer, ServerConfig |
理解 |
| 浏览数据模型 | get_logical_devices(), get_logical_nodes() |
熟练 |
| 读写数据值 | read_value(), write_value() |
熟练 |
| 批量读取数据集 | read_dataset() |
掌握 |
| 订阅 Report | ReportClient, install_report_handler() |
掌握 |
| 控制操作 | ControlClient, select(), operate() |
掌握 |
| GOOSE 发布 | GoosePublisher, publish() |
理解 |
| GOOSE 订阅 | GooseSubscriber, set_listener() |
理解 |
| SV 订阅 | SVSubscriber |
了解 |
接下来 → Part 6:工程实战,我们将把所有这些能力部署到实际工程场景中,处理系统集成、网络架构、故障排查等现实问题。
附录:完整示例文件清单
以下所有示例代码均可直接在安装了 pyiec61850-ng 的环境中运行:
| 文件 | 功能 | 需求 |
|---|---|---|
simple_server.icd |
最小 IED 模型配置 | — |
simple_server.py |
IEC 61850 Server | pyiec61850-ng |
client_connect.py |
基础连接与身份查询 | pyiec61850-ng |
client_discovery.py |
设备模型浏览 | pyiec61850-ng |
client_read.py |
读取数据值 | pyiec61850-ng |
client_dataset.py |
数据集批量读取 | pyiec61850-ng |
client_write.py |
写入数据值 | pyiec61850-ng |
client_report.py |
报告订阅 | pyiec61850-ng |
client_control.py |
控制操作 (SBO/Direct) | pyiec61850-ng |
goose_publisher.py |
GOOSE 发布 | pyiec61850-ng + root |
goose_subscriber.py |
GOOSE 订阅 | pyiec61850-ng + root |
sv_subscriber.py |
SV 采样值订阅 | pyiec61850-ng + root |
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)