《用C#实现工业现场数据的实时采集与存储》的OPC UA 集成扩展,无缝融入原有架构
以下是针对前文《用C#实现工业现场数据的实时采集与存储》的OPC UA 集成扩展,无缝融入原有架构。内容基于 .NET 8 / .NET 9(2025–2026 年主流工业实践),推荐使用 OPC Foundation 官方 UA-.NETStandard 库(开源、跨平台、社区活跃度最高、持续更新至 2025+)。
为什么选择 OPC UA + .NET Standard Stack?
- OPC UA 是工业 4.0 / IIoT 事实标准,支持复杂信息模型、安全加密、Pub/Sub(未来扩展)。
- 官方库(OPCFoundation.NetStandard.Opc.Ua)纯 .NET Standard 2.0+,兼容 .NET 8/9,无需商业授权(Redistributable License)。
- 支持异步读写、订阅(Monitored Items)、断线重连、证书管理。
- 比商业 SDK(如 Unified Automation、Prosys)更轻量、更新快,适合大多数 PLC(如 Siemens S7-1500/1200、Beckhoff、Codesys、KEPServerEX 等)。
NuGet 安装(核心包):
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="1.5.*" /> <!-- 2025–2026 最新稳定版 -->
<!-- 可选:客户端示例与工具 -->
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Symbols" Version="1.5.*" PrivateAssets="all" />
1. OPC UA Collector 封装(异步、带重连)
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using System.Threading.Tasks;
public class OpcUaCollector : IAsyncDisposable
{
private Session? _session;
private readonly string _endpointUrl;
private readonly ILogger _logger;
private readonly ApplicationConfiguration _appConfig;
public OpcUaCollector(string endpointUrl, ILogger logger)
{
_endpointUrl = endpointUrl;
_logger = logger;
// 简单配置(生产环境需证书 + 用户认证)
_appConfig = new ApplicationConfiguration
{
ApplicationName = "IndustrialDataCollector",
ApplicationType = ApplicationType.Client,
SecurityConfiguration = new SecurityConfiguration { /* ... */ },
TransportConfigurations = new TransportConfigurationCollection(),
ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
// 更多配置...
};
_appConfig.Validate(ApplicationType.Client).GetAwaiter().GetResult();
}
public async Task<bool> ConnectAsync()
{
try
{
var endpoint = new ConfiguredEndpoint(null, new EndpointDescription(_endpointUrl));
_session = await Session.Create(
_appConfig,
endpoint,
false, // 不更新端点
"IndustrialCollectorSession",
60000,
new UserIdentity(), // 匿名;生产用 UserNameIdentityToken 或证书
null);
_logger.LogInformation("OPC UA 连接成功: {Endpoint}", _endpointUrl);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "OPC UA 连接失败");
return false;
}
}
/// <summary>
/// 批量读取多个 Node(异步)
/// </summary>
public async Task<Dictionary<string, object>> ReadNodesAsync(IEnumerable<string> nodeIds)
{
if (_session == null || !_session.Connected) throw new InvalidOperationException("Session not connected");
var readValues = new ReadValueIdCollection();
var nodeIdMap = new Dictionary<string, int>(); // 记住原始顺序
int index = 0;
foreach (var nidStr in nodeIds)
{
var nodeId = new NodeId(nidStr);
readValues.Add(new ReadValueId
{
NodeId = nodeId,
AttributeId = Attributes.Value
});
nodeIdMap[nidStr] = index++;
}
var response = await _session.ReadAsync(null, 0, TimestampsToReturn.Both, readValues, CancellationToken.None);
var results = new Dictionary<string, object>();
for (int i = 0; i < response.Results.Count; i++)
{
var result = response.Results[i];
if (result.StatusCode == StatusCodes.Good)
{
var kv = nodeIdMap.First(kv => kv.Value == i);
results[kv.Key] = result.Value; // Variant → object
}
else
{
_logger.LogWarning("读取失败 Node {NodeId}: {Status}", nodeIdMap.First(kv => kv.Value == i).Key, result.StatusCode);
}
}
return results;
}
public async ValueTask DisposeAsync()
{
if (_session != null)
{
await _session.CloseAsync();
_session.Dispose();
}
}
}
2. 集成到原有采集架构(修改 IndustrialDataCollector)
// 在 IndustrialDataCollector 中添加 OPC UA 支持
public class IndustrialDataCollector : BackgroundService
{
// ... 原有 Modbus 等
private readonly OpcUaCollector? _opcUa; // 可选注入
public IndustrialDataCollector(
ModbusTcpCollector modbus,
OpcUaCollector? opcUa = null, // 通过 DI 注入
// ...
)
{
_modbus = modbus;
_opcUa = opcUa;
// ...
}
private async Task<CollectedDataPoint> CollectDataAsync()
{
var point = new CollectedDataPoint { Timestamp = DateTimeOffset.UtcNow };
// Modbus 采集(原有)
if (_modbus != null)
{
try
{
var regs = await _modbus.ReadHoldingRegistersAsync(1, 100, 20);
point.Tags["Modbus_Temp"] = ModbusUtility.ConvertToFloat(regs.Span.Slice(0, 2));
}
catch { /* 容错 */ }
}
// OPC UA 采集
if (_opcUa != null)
{
try
{
var nodes = new[]
{
"ns=2;s=PLC1.Tag.Temperature",
"ns=2;s=PLC1.Tag.Pressure"
};
var values = await _opcUa.ReadNodesAsync(nodes);
if (values.TryGetValue("ns=2;s=PLC1.Tag.Temperature", out var temp))
point.Tags["OpcUa_Temp"] = temp;
// ...
}
catch (Exception ex)
{
_logger.LogWarning("OPC UA 读取异常: {Message}", ex.Message);
}
}
return point;
}
// 重连逻辑扩展
private async Task<bool> TryEnsureConnected(CancellationToken ct)
{
bool ok = true;
if (_modbus != null) ok &= await _modbus.ConnectAsync();
if (_opcUa != null) ok &= await _opcUa.ConnectAsync();
return ok;
}
}
3. 订阅模式(Monitored Items) – 更高效的实时采集
OPC UA 的订阅远优于轮询(事件驱动、低延迟)。
简单订阅示例(添加到 OpcUaCollector)
public async Task SubscribeAsync(IEnumerable<string> nodeIds, Action<string, object> valueChangedCallback)
{
if (_session == null) return;
var items = new MonitoredItemCollection();
int clientHandle = 1;
foreach (var nidStr in nodeIds)
{
var item = new MonitoredItem
{
StartNodeId = new NodeId(nidStr),
AttributeId = Attributes.Value,
SamplingInterval = 500, // 采样间隔 ms
QueueSize = 10,
DiscardOldest = true,
ClientHandle = clientHandle++
};
items.Add(item);
}
var subscription = new Subscription
{
PublishingInterval = 1000,
KeepAliveCount = 30,
LifetimeCount = 60,
Priority = 0
};
subscription.AddItems(items);
await _session.AddSubscriptionAsync(subscription);
await subscription.CreateAsync();
subscription.Publish += (sender, e) =>
{
foreach (var notification in e.NotificationMessage.NotificationData)
{
if (notification is MonitoredItemNotification min)
{
var item = subscription.MonitoredItems.First(i => i.ClientHandle == min.ClientHandle);
valueChangedCallback(item.StartNodeId.ToString(), min.Value.Value);
}
}
};
}
使用方式(在 BackgroundService 中):
await _opcUa.SubscribeAsync(nodes, (nodeId, value) =>
{
// 直接推入 Channel
_channel.Writer.TryWrite(new CollectedDataPoint
{
Timestamp = DateTimeOffset.UtcNow,
Tags = { [nodeId] = value }
});
});
4. DI 注册(Program.cs)
builder.Services.AddSingleton<OpcUaCollector>(sp =>
new OpcUaCollector("opc.tcp://192.168.1.50:4840", sp.GetRequiredService<ILogger<OpcUaCollector>>()));
// 可选:多 Collector 共存(Modbus + OPC UA + MQTT 等)
5. 工业级注意事项(2025–2026 实践)
- 证书与安全:生产环境强制 Sign & Encrypt(X509 证书),用
_appConfig.SecurityConfiguration配置。 - 重连策略:Session.ReconnectPeriod = 5000; + KeepAlive 检测。
- 批量订阅:单个 Subscription 限 1000–5000 items,根据服务器能力分多个。
- 性能:订阅优于轮询(延迟 <100ms,网络负载低)。
- 诊断:用 UaExpert / Prosys OPC UA Browser 验证节点 & 订阅。
- 异常处理:BadSessionIdInvalid / BadTimeout → 自动重连。
如果需要更详细的证书配置、订阅 + 批量读取混合、UA Pub/Sub 扩展(MQTT over OPC UA)、多服务器聚合,或与 WPF/MAUI 实时曲线联动示例,告诉我你的具体 OPC UA 服务器类型(Siemens、KEPServer、Codesys 等)或节点结构,我可以进一步细化代码。
以下是针对前文 OpcUaCollector 类中 OPC UA 客户端的证书配置完整示例(生产级、安全推荐方式)。基于 OPC Foundation 官方 UA-.NETStandard 栈(2025–2026 年最新实践),包括:
- 自签名证书自动创建(开发/测试常用)
- 加载已有证书(生产推荐)
- CA 签发证书 + 信任列表管理
- 加密模式(Sign + Encrypt)
- 证书存储路径自定义(跨平台兼容)
1. 推荐配置原则(工业场景)
- 开发/测试:允许栈自动创建自签名证书(最简单)。
- 生产环境:
- 使用 CA 签发的 X.509 证书(推荐 RSA 2048/3072 或 ECDSA)。
- 强制 SecurityMode.SignAndEncrypt + SecurityPolicy.Basic256Sha256(或更高)。
- 证书存储用目录(DirectoryStore),便于容器/边缘部署。
- 信任服务器证书(TrustedPeer) + 信任 CA(TrustedIssuer)。
- 拒绝列表(RejectedStore)自动管理。
2. 完整 ApplicationConfiguration 示例(XML 或代码)
方式一:代码中硬编码(推荐用于容器/无配置文件场景)
private async Task<ApplicationConfiguration> CreateApplicationConfigurationAsync()
{
var config = new ApplicationConfiguration
{
ApplicationName = "IndustrialDataCollector",
ApplicationUri = Utils.Format("urn:{0}:IndustrialDataCollector", System.Net.Dns.GetHostName()),
ApplicationType = ApplicationType.Client,
// 证书相关核心配置
SecurityConfiguration = new SecurityConfiguration
{
// 应用实例证书(own)
ApplicationCertificate = new CertificateIdentifier
{
StoreType = CertificateStoreType.Directory,
StorePath = "./pki/own", // 相对路径或绝对路径 %LocalAppData%/OPC Foundation/pki/own
SubjectName = "CN=IndustrialDataCollector, DC=" + System.Net.Dns.GetHostName()
},
// 信任的发行者证书(CA 根证书)
TrustedIssuerCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = "./pki/issuer"
},
// 信任的 peer 证书(服务器证书)
TrustedPeerCertificates = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = "./pki/trusted"
},
// 被拒绝的证书(自动填充)
RejectedCertificateStore = new CertificateTrustList
{
StoreType = CertificateStoreType.Directory,
StorePath = "./pki/rejected"
},
// 支持的安全策略(生产只留强策略)
SupportedSecurityPolicies = new StringCollection
{
SecurityPolicies.Basic256Sha256,
SecurityPolicies.Basic256,
SecurityPolicies.Basic128Rsa15 // 可选,视服务器支持
},
// 自动接受未签名证书(测试用,生产关闭)
AutoAcceptUntrustedCertificates = false,
// 最小密钥长度(推荐 2048+)
MinimumCertificateKeySize = 2048
},
TransportConfigurations = new TransportConfigurationCollection(),
ClientConfiguration = new ClientConfiguration
{
DefaultSessionTimeout = 60000,
MinPublishRequestCount = 5,
MaxPublishRequestCount = 20
},
// 其他(日志、Trace 等)
TraceConfiguration = new TraceConfiguration { OutputFilePath = "./opcua.log" }
};
// 验证并创建缺失的证书存储目录
await config.Validate(ApplicationType.Client);
// 自动创建或检查应用证书(最重要一步!)
bool haveCert = await config.CheckApplicationInstanceCertificate(
false, // 不强制创建新证书
CertificateFactory.DefaultKeySize, // 2048
CertificateFactory.DefaultLifeTime // 120 个月
);
if (!haveCert)
{
_logger.LogError("应用证书无效或创建失败");
throw new Exception("Application instance certificate invalid!");
}
return config;
}
使用方式(在 OpcUaCollector 构造函数中调用):
public OpcUaCollector(string endpointUrl, ILogger logger)
{
_endpointUrl = endpointUrl;
_logger = logger;
_appConfig = CreateApplicationConfigurationAsync().GetAwaiter().GetResult();
}
3. 加载已有 CA 签发证书(生产推荐)
如果已有 .pfx(含私钥)或 .der + .key 文件:
// 在 SecurityConfiguration.ApplicationCertificate 设置后,手动加载
var certIdentifier = config.SecurityConfiguration.ApplicationCertificate;
// 从 PFX 文件加载(含私钥)
byte[] pfxData = File.ReadAllBytes("C:/certs/myapp.pfx");
var cert = new X509Certificate2(pfxData, "pfx密码", X509KeyStorageFlags.Exportable);
await certIdentifier.SetCertificateAsync(cert);
// 或从 DER + PEM 私钥加载(更安全)
var publicCert = new X509Certificate2("myapp.der");
var privateKey = await CertificateFactory.LoadPrivateKey("myapp.key", "key密码");
await certIdentifier.SetCertificateAsync(publicCert, privateKey);
4. 信任服务器证书(手动或自动)
// 手动信任服务器证书(生产前必须执行一次)
public async Task TrustServerCertificateAsync(byte[] serverCertDer)
{
var cert = new X509Certificate2(serverCertDer);
config.SecurityConfiguration.AddTrustedPeer(serverCertDer);
// 或通过代码信任所有未签名(仅测试!)
config.SecurityConfiguration.AutoAcceptUntrustedCertificates = true; // 危险,勿生产用
}
5. 连接时指定 SecurityMode & Policy
var endpoint = new ConfiguredEndpoint(null, new EndpointDescription(_endpointUrl)
{
SecurityMode = MessageSecurityMode.SignAndEncrypt,
SecurityPolicyUri = SecurityPolicies.Basic256Sha256
});
6. 常见目录结构(推荐)
./pki/
├── own/ # 本应用证书 + 私钥(自动创建或手动放置)
│ ├── cert.der
│ └── privatekey.pem
├── trusted/ # 信任的服务器证书(.der)
├── issuer/ # 信任的 CA 根/中间证书
└── rejected/ # 被拒绝的证书(自动)
7. 生产落地 Checklist
- 使用
./pki/相对路径或容器卷挂载(Kubernetes / Docker 友好) - 关闭
AutoAcceptUntrustedCertificates = false - 最小密钥 2048 bit,优先 Basic256Sha256
- 证书过期监控(定期检查 NotAfter)
- 备份 pki/own 私钥(丢失无法恢复会话)
- 用 UaExpert 测试连接 & 证书信任
如果需要完整带证书的 Session 创建代码、证书自动续期、用户身份证书(User Certificate Token) 示例,或与现有 Modbus 混合采集的完整 BackgroundService,告诉我你的具体安全要求(匿名 / 用户名密码 / 证书用户 / CA 链),我可以继续扩展。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)