💡前言

前两篇文章分别完成了硬件接入与数据采集、LSTM模型训练与边缘部署,打通了从传感器到模型推理的完整链路。然而,在实际工业现场中,仅有模型是不够的——我们需要一个稳定、实时、可运维的推理服务,能够持续采集数据、调用模型、输出结果,并在检测到异常时及时告警。

本文将围绕以下内容展开:

  • 实时数据采集与推理流水线设计
  • QNN推理服务的工程化封装
  • 多线程与异步处理架构
  • 告警规则与日志记录
  • 性能评估与稳定性优化

1. 实时推理服务整体架构

在IQ-9100上,我们采用 C++ 推理引擎 + Python 采集调度 的混合架构,兼顾开发效率与运行性能。

在这里插入图片描述

  • 采集线程:以固定频率(如1kHz)读取MPU6050和INA219,将数据写入环形缓冲区。
  • 推理线程:从缓冲区取出最新窗口数据(1024点),调用QNN模型进行推理,返回故障类别及置信度。
  • 告警模块:根据推理结果和连续计数规则,触发告警并上报。

2. 环形缓冲区设计

为了避免采集与推理速度不匹配导致数据丢失,我们使用环形缓冲区(Ring Buffer)暂存原始数据。

2.1 Python实现(简易版)

# ring_buffer.py
import numpy as np
from collections import deque

class RingBuffer:
    def __init__(self, maxlen=2048):
        self.buffer = deque(maxlen=maxlen)

    def append(self, value):
        self.buffer.append(value)

    def get_window(self, length):
        """获取最近 length 个数据点,若不足则返回 None"""
        if len(self.buffer) < length:
            return None
        return np.array(list(self.buffer)[-length:])

在实际项目中,可采用共享内存 + 互斥锁实现跨语言通信,或直接使用Python的多线程队列(queue.Queue)作为简单替代。

2.2 采集线程

# collector_thread.py
import threading
import time
from sensor_mpu6050 import init_mpu, read_accel
from sensor_ina219 import init_ina219, read_current_ma
from ring_buffer import RingBuffer

class Collector(threading.Thread):
    def __init__(self, sample_rate=1000, buffer_maxlen=2048):
        super().__init__()
        self.sample_interval = 1.0 / sample_rate
        self.buffer = RingBuffer(maxlen=buffer_maxlen)
        self.running = True
        self.mpu_bus = init_mpu()
        self.ina = init_ina219()

    def run(self):
        while self.running:
            ax, ay, az = read_accel(self.mpu_bus)
            i_ma = read_current_ma(self.ina)
            # 这里只使用振动通道,若需融合电流可扩展
            self.buffer.append(ax)  
            time.sleep(self.sample_interval)

    def stop(self):
        self.running = False

3. QNN推理服务封装

IQ-9100上的QNN推理通常使用 C++ API,我们将其封装为Python可调用的共享库(libqnn_inference.so),通过ctypespybind11调用。

3.1 C++ 推理接口示例

// qnn_inference.h
extern "C" {
    void* init_model(const char* model_path);
    float* infer(void* ctx, float* input, int input_len);
    void cleanup(void* ctx);
}

简化版实现(伪代码):

#include "QnnInterface.h"

void* init_model(const char* model_path) {
    // 加载 QNN 模型,返回上下文指针
    return context;
}

float* infer(void* ctx, float* input, int input_len) {
    // 执行推理,返回 logits 数组(需动态分配)
    static float logits[4];
    // ... 调用 QNN API
    return logits;
}

编译为共享库:

g++ -shared -fPIC qnn_inference.cpp -o libqnn_inference.so -lQnn

3.2 Python 调用封装

# qnn_wrapper.py
import ctypes
import numpy as np

class QNNInference:
    def __init__(self, model_path):
        self.lib = ctypes.CDLL('./libqnn_inference.so')
        self.lib.init_model.argtypes = [ctypes.c_char_p]
        self.lib.init_model.restype = ctypes.c_void_p
        self.lib.infer.argtypes = [ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float32), ctypes.c_int]
        self.lib.infer.restype = ctypes.POINTER(ctypes.c_float)
        self.ctx = self.lib.init_model(model_path.encode())

    def infer(self, input_data):
        """input_data: np.ndarray shape (1024,) dtype float32"""
        if input_data.shape[0] != 1024:
            raise ValueError("Input must be 1024 length")
        logits_ptr = self.lib.infer(self.ctx, input_data, 1024)
        return np.array([logits_ptr[i] for i in range(4)])

4. 推理线程与后处理

4.1 推理线程

推理线程定期从环形缓冲区取出窗口,进行归一化、推理,并计算故障类别与置信度。

# inference_thread.py
import threading
import numpy as np
import pickle
from qnn_wrapper import QNNInference

class Inference(threading.Thread):
    def __init__(self, ring_buffer, window_len=1024, scaler_path='scaler.pkl'):
        super().__init__()
        self.buffer = ring_buffer
        self.window_len = window_len
        with open(scaler_path, 'rb') as f:
            self.scaler = pickle.load(f)
        self.model = QNNInference('./lstm_fault.qnn')
        self.running = True
        self.result_callback = None

    def run(self):
        while self.running:
            window = self.buffer.get_window(self.window_len)
            if window is not None:
                # 归一化
                window_norm = self.scaler.transform(window.reshape(1, -1)).flatten()
                logits = self.model.infer(window_norm.astype(np.float32))
                probs = np.exp(logits) / np.sum(np.exp(logits))  # softmax
                pred = np.argmax(probs)
                confidence = probs[pred]
                if self.result_callback:
                    self.result_callback(pred, confidence)
            time.sleep(0.01)  # 控制推理频率

    def stop(self):
        self.running = False

4.2 告警回调

# alarm.py
import time
import logging

logging.basicConfig(filename='fault.log', level=logging.WARNING)

class Alarm:
    def __init__(self, threshold=0.8, consecutive=3):
        self.threshold = threshold
        self.consecutive = consecutive
        self.count = 0

    def on_result(self, pred, confidence):
        if pred != 0 and confidence > self.threshold:
            self.count += 1
            if self.count >= self.consecutive:
                logging.warning(f"Fault detected! Type={pred}, confidence={confidence:.2f}")
                # 可发送 MQTT 消息或触发蜂鸣器
                self.count = 0
        else:
            self.count = 0

5. 主程序集成

将采集、推理、告警组合在一起:

# main.py
from collector_thread import Collector
from inference_thread import Inference
from alarm import Alarm

if __name__ == "__main__":
    collector = Collector(sample_rate=1000)
    inference = Inference(collector.buffer, window_len=1024)
    alarm = Alarm(threshold=0.8, consecutive=3)

    inference.result_callback = alarm.on_result

    collector.start()
    inference.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        collector.stop()
        inference.stop()
        collector.join()
        inference.join()

6. 性能评估与优化

6.1 延迟与吞吐

在IQ-9100上,我们对典型配置进行了测试:

指标 数值
采集频率 1 kHz
窗口长度 1024 点
推理耗时(单次) ≈ 8 ms
端到端延迟(从采样到告警) ≈ 20 ms
CPU 占用 单核约 30%

6.2 优化建议

  • 使用DMA/硬件加速:IQ-9100支持Hexagon DSP,可通过QNN SDK将模型部署到DSP上,推理延迟可降至2ms以内。
  • 批处理:如果多个传感器通道,可将多个窗口打包为batch,提升吞吐。
  • 数据压缩:CSV落盘可选用二进制格式(如numpy的.npy)减少IO开销。
  • 动态降频:当无异常时,降低采集频率或推理频率以节省功耗。

7. 稳定性与运维

7.1 看门狗与自恢复

为防止采集或推理线程异常退出,可增加看门狗定时器:

import signal

def watchdog_handler(signum, frame):
    logging.error("Watchdog timeout, restart service")
    os.system("systemctl restart fault_prediction.service")

signal.signal(signal.SIGALRM, watchdog_handler)
signal.alarm(10)  # 10秒内未喂狗则重启

在采集循环中定期喂狗:

signal.alarm(10)

7.2 日志与监控

  • 推理结果日志:记录每次推理的类别、置信度、时间戳,便于事后分析。
  • 性能监控:定期输出CPU/内存占用、推理延迟分布。
  • 远程上报:通过MQTT将告警信息发送到云端或运维平台。

8. 小结

本文完成了振荡电流故障预测系统的最后一环——实时推理服务与告警实现。至此,我们已在IQ-9100上搭建了一套完整的预测性维护系统:

  1. 硬件接入与数据采集 ✅
  2. LSTM模型训练与边缘部署 ✅
  3. 实时推理服务与告警系统 ✅

系统能够以1kHz的采样率实时监测设备状态,在检测到异常时快速告警,具备工业现场所需的稳定性和可维护性。

系列文章回顾

  • (一)硬件接入与数据采集 ✅
  • (二)LSTM模型训练与边缘部署 ✅
  • (三)实时推理服务与告警系统实现 ✅

下一步展望

  • 将系统集成到实际产线,开展长期稳定性测试
  • 探索更轻量级的时序模型(如TinyML)进一步降低资源占用
  • 引入更多传感器(温度、声音)实现多模态故障诊断
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐