以下是针对前文《用C#实现工业现场数据的实时采集与存储》的OPC UA 集成扩展,无缝融入原有架构。内容基于 .NET 8 / .NET 9(2025–2026 年主流工业实践),推荐使用 OPC Foundation 官方 UA-.NETStandard 库(开源、跨平台、社区活跃度最高、持续更新至 2025+)。

为什么选择 OPC UA + .NET Standard Stack?

  • OPC UA 是工业 4.0 / IIoT 事实标准,支持复杂信息模型、安全加密、Pub/Sub(未来扩展)。
  • 官方库(OPCFoundation.NetStandard.Opc.Ua)纯 .NET Standard 2.0+,兼容 .NET 8/9,无需商业授权(Redistributable License)。
  • 支持异步读写、订阅(Monitored Items)、断线重连、证书管理。
  • 比商业 SDK(如 Unified Automation、Prosys)更轻量、更新快,适合大多数 PLC(如 Siemens S7-1500/1200、Beckhoff、Codesys、KEPServerEX 等)。

NuGet 安装(核心包):

<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="1.5.*" />  <!-- 2025–2026 最新稳定版 -->
<!-- 可选:客户端示例与工具 -->
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Symbols" Version="1.5.*" PrivateAssets="all" />

1. OPC UA Collector 封装(异步、带重连)

using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using System.Threading.Tasks;

public class OpcUaCollector : IAsyncDisposable
{
    private Session? _session;
    private readonly string _endpointUrl;
    private readonly ILogger _logger;
    private readonly ApplicationConfiguration _appConfig;

    public OpcUaCollector(string endpointUrl, ILogger logger)
    {
        _endpointUrl = endpointUrl;
        _logger = logger;

        // 简单配置(生产环境需证书 + 用户认证)
        _appConfig = new ApplicationConfiguration
        {
            ApplicationName = "IndustrialDataCollector",
            ApplicationType = ApplicationType.Client,
            SecurityConfiguration = new SecurityConfiguration { /* ... */ },
            TransportConfigurations = new TransportConfigurationCollection(),
            ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
            // 更多配置...
        };
        _appConfig.Validate(ApplicationType.Client).GetAwaiter().GetResult();
    }

    public async Task<bool> ConnectAsync()
    {
        try
        {
            var endpoint = new ConfiguredEndpoint(null, new EndpointDescription(_endpointUrl));
            _session = await Session.Create(
                _appConfig,
                endpoint,
                false,                  // 不更新端点
                "IndustrialCollectorSession",
                60000,
                new UserIdentity(),     // 匿名;生产用 UserNameIdentityToken 或证书
                null);

            _logger.LogInformation("OPC UA 连接成功: {Endpoint}", _endpointUrl);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "OPC UA 连接失败");
            return false;
        }
    }

    /// <summary>
    /// 批量读取多个 Node(异步)
    /// </summary>
    public async Task<Dictionary<string, object>> ReadNodesAsync(IEnumerable<string> nodeIds)
    {
        if (_session == null || !_session.Connected) throw new InvalidOperationException("Session not connected");

        var readValues = new ReadValueIdCollection();
        var nodeIdMap = new Dictionary<string, int>(); // 记住原始顺序

        int index = 0;
        foreach (var nidStr in nodeIds)
        {
            var nodeId = new NodeId(nidStr);
            readValues.Add(new ReadValueId
            {
                NodeId = nodeId,
                AttributeId = Attributes.Value
            });
            nodeIdMap[nidStr] = index++;
        }

        var response = await _session.ReadAsync(null, 0, TimestampsToReturn.Both, readValues, CancellationToken.None);

        var results = new Dictionary<string, object>();
        for (int i = 0; i < response.Results.Count; i++)
        {
            var result = response.Results[i];
            if (result.StatusCode == StatusCodes.Good)
            {
                var kv = nodeIdMap.First(kv => kv.Value == i);
                results[kv.Key] = result.Value; // Variant → object
            }
            else
            {
                _logger.LogWarning("读取失败 Node {NodeId}: {Status}", nodeIdMap.First(kv => kv.Value == i).Key, result.StatusCode);
            }
        }

        return results;
    }

    public async ValueTask DisposeAsync()
    {
        if (_session != null)
        {
            await _session.CloseAsync();
            _session.Dispose();
        }
    }
}

2. 集成到原有采集架构(修改 IndustrialDataCollector)

// 在 IndustrialDataCollector 中添加 OPC UA 支持
public class IndustrialDataCollector : BackgroundService
{
    // ... 原有 Modbus 等
    private readonly OpcUaCollector? _opcUa; // 可选注入

    public IndustrialDataCollector(
        ModbusTcpCollector modbus,
        OpcUaCollector? opcUa = null, // 通过 DI 注入
        // ...
        )
    {
        _modbus = modbus;
        _opcUa = opcUa;
        // ...
    }

    private async Task<CollectedDataPoint> CollectDataAsync()
    {
        var point = new CollectedDataPoint { Timestamp = DateTimeOffset.UtcNow };

        // Modbus 采集(原有)
        if (_modbus != null)
        {
            try
            {
                var regs = await _modbus.ReadHoldingRegistersAsync(1, 100, 20);
                point.Tags["Modbus_Temp"] = ModbusUtility.ConvertToFloat(regs.Span.Slice(0, 2));
            }
            catch { /* 容错 */ }
        }

        // OPC UA 采集
        if (_opcUa != null)
        {
            try
            {
                var nodes = new[] 
                { 
                    "ns=2;s=PLC1.Tag.Temperature", 
                    "ns=2;s=PLC1.Tag.Pressure" 
                };
                var values = await _opcUa.ReadNodesAsync(nodes);

                if (values.TryGetValue("ns=2;s=PLC1.Tag.Temperature", out var temp))
                    point.Tags["OpcUa_Temp"] = temp;
                // ...
            }
            catch (Exception ex)
            {
                _logger.LogWarning("OPC UA 读取异常: {Message}", ex.Message);
            }
        }

        return point;
    }

    // 重连逻辑扩展
    private async Task<bool> TryEnsureConnected(CancellationToken ct)
    {
        bool ok = true;

        if (_modbus != null) ok &= await _modbus.ConnectAsync();
        if (_opcUa != null) ok &= await _opcUa.ConnectAsync();

        return ok;
    }
}

3. 订阅模式(Monitored Items) – 更高效的实时采集

OPC UA 的订阅远优于轮询(事件驱动、低延迟)。

简单订阅示例(添加到 OpcUaCollector)

public async Task SubscribeAsync(IEnumerable<string> nodeIds, Action<string, object> valueChangedCallback)
{
    if (_session == null) return;

    var items = new MonitoredItemCollection();
    int clientHandle = 1;

    foreach (var nidStr in nodeIds)
    {
        var item = new MonitoredItem
        {
            StartNodeId = new NodeId(nidStr),
            AttributeId = Attributes.Value,
            SamplingInterval = 500,           // 采样间隔 ms
            QueueSize = 10,
            DiscardOldest = true,
            ClientHandle = clientHandle++
        };
        items.Add(item);
    }

    var subscription = new Subscription
    {
        PublishingInterval = 1000,
        KeepAliveCount = 30,
        LifetimeCount = 60,
        Priority = 0
    };

    subscription.AddItems(items);
    await _session.AddSubscriptionAsync(subscription);
    await subscription.CreateAsync();

    subscription.Publish += (sender, e) =>
    {
        foreach (var notification in e.NotificationMessage.NotificationData)
        {
            if (notification is MonitoredItemNotification min)
            {
                var item = subscription.MonitoredItems.First(i => i.ClientHandle == min.ClientHandle);
                valueChangedCallback(item.StartNodeId.ToString(), min.Value.Value);
            }
        }
    };
}

使用方式(在 BackgroundService 中):

await _opcUa.SubscribeAsync(nodes, (nodeId, value) =>
{
    // 直接推入 Channel
    _channel.Writer.TryWrite(new CollectedDataPoint
    {
        Timestamp = DateTimeOffset.UtcNow,
        Tags = { [nodeId] = value }
    });
});

4. DI 注册(Program.cs)

builder.Services.AddSingleton<OpcUaCollector>(sp => 
    new OpcUaCollector("opc.tcp://192.168.1.50:4840", sp.GetRequiredService<ILogger<OpcUaCollector>>()));

// 可选:多 Collector 共存(Modbus + OPC UA + MQTT 等)

5. 工业级注意事项(2025–2026 实践)

  • 证书与安全:生产环境强制 Sign & Encrypt(X509 证书),用 _appConfig.SecurityConfiguration 配置。
  • 重连策略:Session.ReconnectPeriod = 5000; + KeepAlive 检测。
  • 批量订阅:单个 Subscription 限 1000–5000 items,根据服务器能力分多个。
  • 性能:订阅优于轮询(延迟 <100ms,网络负载低)。
  • 诊断:用 UaExpert / Prosys OPC UA Browser 验证节点 & 订阅。
  • 异常处理:BadSessionIdInvalid / BadTimeout → 自动重连。

如果需要更详细的证书配置订阅 + 批量读取混合UA Pub/Sub 扩展(MQTT over OPC UA)、多服务器聚合,或与 WPF/MAUI 实时曲线联动示例,告诉我你的具体 OPC UA 服务器类型(Siemens、KEPServer、Codesys 等)或节点结构,我可以进一步细化代码。

以下是针对前文 OpcUaCollector 类中 OPC UA 客户端的证书配置完整示例(生产级、安全推荐方式)。基于 OPC Foundation 官方 UA-.NETStandard 栈(2025–2026 年最新实践),包括:

  • 自签名证书自动创建(开发/测试常用)
  • 加载已有证书(生产推荐)
  • CA 签发证书 + 信任列表管理
  • 加密模式(Sign + Encrypt)
  • 证书存储路径自定义(跨平台兼容)

1. 推荐配置原则(工业场景)

  • 开发/测试:允许栈自动创建自签名证书(最简单)。
  • 生产环境
    • 使用 CA 签发的 X.509 证书(推荐 RSA 2048/3072 或 ECDSA)。
    • 强制 SecurityMode.SignAndEncrypt + SecurityPolicy.Basic256Sha256(或更高)。
    • 证书存储用目录(DirectoryStore),便于容器/边缘部署。
    • 信任服务器证书(TrustedPeer) + 信任 CA(TrustedIssuer)。
    • 拒绝列表(RejectedStore)自动管理。

2. 完整 ApplicationConfiguration 示例(XML 或代码)

方式一:代码中硬编码(推荐用于容器/无配置文件场景)
private async Task<ApplicationConfiguration> CreateApplicationConfigurationAsync()
{
    var config = new ApplicationConfiguration
    {
        ApplicationName = "IndustrialDataCollector",
        ApplicationUri = Utils.Format("urn:{0}:IndustrialDataCollector", System.Net.Dns.GetHostName()),
        ApplicationType = ApplicationType.Client,

        // 证书相关核心配置
        SecurityConfiguration = new SecurityConfiguration
        {
            // 应用实例证书(own)
            ApplicationCertificate = new CertificateIdentifier
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = "./pki/own",                        // 相对路径或绝对路径 %LocalAppData%/OPC Foundation/pki/own
                SubjectName = "CN=IndustrialDataCollector, DC=" + System.Net.Dns.GetHostName()
            },

            // 信任的发行者证书(CA 根证书)
            TrustedIssuerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = "./pki/issuer"
            },

            // 信任的 peer 证书(服务器证书)
            TrustedPeerCertificates = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = "./pki/trusted"
            },

            // 被拒绝的证书(自动填充)
            RejectedCertificateStore = new CertificateTrustList
            {
                StoreType = CertificateStoreType.Directory,
                StorePath = "./pki/rejected"
            },

            // 支持的安全策略(生产只留强策略)
            SupportedSecurityPolicies = new StringCollection
            {
                SecurityPolicies.Basic256Sha256,
                SecurityPolicies.Basic256,
                SecurityPolicies.Basic128Rsa15   // 可选,视服务器支持
            },

            // 自动接受未签名证书(测试用,生产关闭)
            AutoAcceptUntrustedCertificates = false,

            // 最小密钥长度(推荐 2048+)
            MinimumCertificateKeySize = 2048
        },

        TransportConfigurations = new TransportConfigurationCollection(),
        ClientConfiguration = new ClientConfiguration
        {
            DefaultSessionTimeout = 60000,
            MinPublishRequestCount = 5,
            MaxPublishRequestCount = 20
        },

        // 其他(日志、Trace 等)
        TraceConfiguration = new TraceConfiguration { OutputFilePath = "./opcua.log" }
    };

    // 验证并创建缺失的证书存储目录
    await config.Validate(ApplicationType.Client);

    // 自动创建或检查应用证书(最重要一步!)
    bool haveCert = await config.CheckApplicationInstanceCertificate(
        false,                              // 不强制创建新证书
        CertificateFactory.DefaultKeySize,  // 2048
        CertificateFactory.DefaultLifeTime  // 120 个月
    );

    if (!haveCert)
    {
        _logger.LogError("应用证书无效或创建失败");
        throw new Exception("Application instance certificate invalid!");
    }

    return config;
}

使用方式(在 OpcUaCollector 构造函数中调用):

public OpcUaCollector(string endpointUrl, ILogger logger)
{
    _endpointUrl = endpointUrl;
    _logger = logger;
    _appConfig = CreateApplicationConfigurationAsync().GetAwaiter().GetResult();
}

3. 加载已有 CA 签发证书(生产推荐)

如果已有 .pfx(含私钥)或 .der + .key 文件:

// 在 SecurityConfiguration.ApplicationCertificate 设置后,手动加载
var certIdentifier = config.SecurityConfiguration.ApplicationCertificate;

// 从 PFX 文件加载(含私钥)
byte[] pfxData = File.ReadAllBytes("C:/certs/myapp.pfx");
var cert = new X509Certificate2(pfxData, "pfx密码", X509KeyStorageFlags.Exportable);

await certIdentifier.SetCertificateAsync(cert);

// 或从 DER + PEM 私钥加载(更安全)
var publicCert = new X509Certificate2("myapp.der");
var privateKey = await CertificateFactory.LoadPrivateKey("myapp.key", "key密码");

await certIdentifier.SetCertificateAsync(publicCert, privateKey);

4. 信任服务器证书(手动或自动)

// 手动信任服务器证书(生产前必须执行一次)
public async Task TrustServerCertificateAsync(byte[] serverCertDer)
{
    var cert = new X509Certificate2(serverCertDer);
    config.SecurityConfiguration.AddTrustedPeer(serverCertDer);

    // 或通过代码信任所有未签名(仅测试!)
    config.SecurityConfiguration.AutoAcceptUntrustedCertificates = true; // 危险,勿生产用
}

5. 连接时指定 SecurityMode & Policy

var endpoint = new ConfiguredEndpoint(null, new EndpointDescription(_endpointUrl)
{
    SecurityMode = MessageSecurityMode.SignAndEncrypt,
    SecurityPolicyUri = SecurityPolicies.Basic256Sha256
});

6. 常见目录结构(推荐)

./pki/
├── own/               # 本应用证书 + 私钥(自动创建或手动放置)
│   ├── cert.der
│   └── privatekey.pem
├── trusted/           # 信任的服务器证书(.der)
├── issuer/            # 信任的 CA 根/中间证书
└── rejected/          # 被拒绝的证书(自动)

7. 生产落地 Checklist

  • 使用 ./pki/ 相对路径或容器卷挂载(Kubernetes / Docker 友好)
  • 关闭 AutoAcceptUntrustedCertificates = false
  • 最小密钥 2048 bit,优先 Basic256Sha256
  • 证书过期监控(定期检查 NotAfter)
  • 备份 pki/own 私钥(丢失无法恢复会话)
  • 用 UaExpert 测试连接 & 证书信任

如果需要完整带证书的 Session 创建代码证书自动续期用户身份证书(User Certificate Token) 示例,或与现有 Modbus 混合采集的完整 BackgroundService,告诉我你的具体安全要求(匿名 / 用户名密码 / 证书用户 / CA 链),我可以继续扩展。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐