在高通跃龙IQ-9100上实现振荡电流故障预测(3): 实时推理服务与告警系统实现
💡前言
前两篇文章分别完成了硬件接入与数据采集、LSTM模型训练与边缘部署,打通了从传感器到模型推理的完整链路。然而,在实际工业现场中,仅有模型是不够的——我们需要一个稳定、实时、可运维的推理服务,能够持续采集数据、调用模型、输出结果,并在检测到异常时及时告警。
本文将围绕以下内容展开:
- 实时数据采集与推理流水线设计
- QNN推理服务的工程化封装
- 多线程与异步处理架构
- 告警规则与日志记录
- 性能评估与稳定性优化
1. 实时推理服务整体架构
在IQ-9100上,我们采用 C++ 推理引擎 + Python 采集调度 的混合架构,兼顾开发效率与运行性能。

- 采集线程:以固定频率(如1kHz)读取MPU6050和INA219,将数据写入环形缓冲区。
- 推理线程:从缓冲区取出最新窗口数据(1024点),调用QNN模型进行推理,返回故障类别及置信度。
- 告警模块:根据推理结果和连续计数规则,触发告警并上报。
2. 环形缓冲区设计
为了避免采集与推理速度不匹配导致数据丢失,我们使用环形缓冲区(Ring Buffer)暂存原始数据。
2.1 Python实现(简易版)
# ring_buffer.py
import numpy as np
from collections import deque
class RingBuffer:
def __init__(self, maxlen=2048):
self.buffer = deque(maxlen=maxlen)
def append(self, value):
self.buffer.append(value)
def get_window(self, length):
"""获取最近 length 个数据点,若不足则返回 None"""
if len(self.buffer) < length:
return None
return np.array(list(self.buffer)[-length:])
在实际项目中,可采用共享内存 + 互斥锁实现跨语言通信,或直接使用Python的多线程队列(queue.Queue)作为简单替代。
2.2 采集线程
# collector_thread.py
import threading
import time
from sensor_mpu6050 import init_mpu, read_accel
from sensor_ina219 import init_ina219, read_current_ma
from ring_buffer import RingBuffer
class Collector(threading.Thread):
def __init__(self, sample_rate=1000, buffer_maxlen=2048):
super().__init__()
self.sample_interval = 1.0 / sample_rate
self.buffer = RingBuffer(maxlen=buffer_maxlen)
self.running = True
self.mpu_bus = init_mpu()
self.ina = init_ina219()
def run(self):
while self.running:
ax, ay, az = read_accel(self.mpu_bus)
i_ma = read_current_ma(self.ina)
# 这里只使用振动通道,若需融合电流可扩展
self.buffer.append(ax)
time.sleep(self.sample_interval)
def stop(self):
self.running = False
3. QNN推理服务封装
IQ-9100上的QNN推理通常使用 C++ API,我们将其封装为Python可调用的共享库(libqnn_inference.so),通过ctypes或pybind11调用。
3.1 C++ 推理接口示例
// qnn_inference.h
extern "C" {
void* init_model(const char* model_path);
float* infer(void* ctx, float* input, int input_len);
void cleanup(void* ctx);
}
简化版实现(伪代码):
#include "QnnInterface.h"
void* init_model(const char* model_path) {
// 加载 QNN 模型,返回上下文指针
return context;
}
float* infer(void* ctx, float* input, int input_len) {
// 执行推理,返回 logits 数组(需动态分配)
static float logits[4];
// ... 调用 QNN API
return logits;
}
编译为共享库:
g++ -shared -fPIC qnn_inference.cpp -o libqnn_inference.so -lQnn
3.2 Python 调用封装
# qnn_wrapper.py
import ctypes
import numpy as np
class QNNInference:
def __init__(self, model_path):
self.lib = ctypes.CDLL('./libqnn_inference.so')
self.lib.init_model.argtypes = [ctypes.c_char_p]
self.lib.init_model.restype = ctypes.c_void_p
self.lib.infer.argtypes = [ctypes.c_void_p, np.ctypeslib.ndpointer(dtype=np.float32), ctypes.c_int]
self.lib.infer.restype = ctypes.POINTER(ctypes.c_float)
self.ctx = self.lib.init_model(model_path.encode())
def infer(self, input_data):
"""input_data: np.ndarray shape (1024,) dtype float32"""
if input_data.shape[0] != 1024:
raise ValueError("Input must be 1024 length")
logits_ptr = self.lib.infer(self.ctx, input_data, 1024)
return np.array([logits_ptr[i] for i in range(4)])
4. 推理线程与后处理
4.1 推理线程
推理线程定期从环形缓冲区取出窗口,进行归一化、推理,并计算故障类别与置信度。
# inference_thread.py
import threading
import numpy as np
import pickle
from qnn_wrapper import QNNInference
class Inference(threading.Thread):
def __init__(self, ring_buffer, window_len=1024, scaler_path='scaler.pkl'):
super().__init__()
self.buffer = ring_buffer
self.window_len = window_len
with open(scaler_path, 'rb') as f:
self.scaler = pickle.load(f)
self.model = QNNInference('./lstm_fault.qnn')
self.running = True
self.result_callback = None
def run(self):
while self.running:
window = self.buffer.get_window(self.window_len)
if window is not None:
# 归一化
window_norm = self.scaler.transform(window.reshape(1, -1)).flatten()
logits = self.model.infer(window_norm.astype(np.float32))
probs = np.exp(logits) / np.sum(np.exp(logits)) # softmax
pred = np.argmax(probs)
confidence = probs[pred]
if self.result_callback:
self.result_callback(pred, confidence)
time.sleep(0.01) # 控制推理频率
def stop(self):
self.running = False
4.2 告警回调
# alarm.py
import time
import logging
logging.basicConfig(filename='fault.log', level=logging.WARNING)
class Alarm:
def __init__(self, threshold=0.8, consecutive=3):
self.threshold = threshold
self.consecutive = consecutive
self.count = 0
def on_result(self, pred, confidence):
if pred != 0 and confidence > self.threshold:
self.count += 1
if self.count >= self.consecutive:
logging.warning(f"Fault detected! Type={pred}, confidence={confidence:.2f}")
# 可发送 MQTT 消息或触发蜂鸣器
self.count = 0
else:
self.count = 0
5. 主程序集成
将采集、推理、告警组合在一起:
# main.py
from collector_thread import Collector
from inference_thread import Inference
from alarm import Alarm
if __name__ == "__main__":
collector = Collector(sample_rate=1000)
inference = Inference(collector.buffer, window_len=1024)
alarm = Alarm(threshold=0.8, consecutive=3)
inference.result_callback = alarm.on_result
collector.start()
inference.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
collector.stop()
inference.stop()
collector.join()
inference.join()
6. 性能评估与优化
6.1 延迟与吞吐
在IQ-9100上,我们对典型配置进行了测试:
| 指标 | 数值 |
|---|---|
| 采集频率 | 1 kHz |
| 窗口长度 | 1024 点 |
| 推理耗时(单次) | ≈ 8 ms |
| 端到端延迟(从采样到告警) | ≈ 20 ms |
| CPU 占用 | 单核约 30% |
6.2 优化建议
- 使用DMA/硬件加速:IQ-9100支持Hexagon DSP,可通过QNN SDK将模型部署到DSP上,推理延迟可降至2ms以内。
- 批处理:如果多个传感器通道,可将多个窗口打包为batch,提升吞吐。
- 数据压缩:CSV落盘可选用二进制格式(如numpy的
.npy)减少IO开销。 - 动态降频:当无异常时,降低采集频率或推理频率以节省功耗。
7. 稳定性与运维
7.1 看门狗与自恢复
为防止采集或推理线程异常退出,可增加看门狗定时器:
import signal
def watchdog_handler(signum, frame):
logging.error("Watchdog timeout, restart service")
os.system("systemctl restart fault_prediction.service")
signal.signal(signal.SIGALRM, watchdog_handler)
signal.alarm(10) # 10秒内未喂狗则重启
在采集循环中定期喂狗:
signal.alarm(10)
7.2 日志与监控
- 推理结果日志:记录每次推理的类别、置信度、时间戳,便于事后分析。
- 性能监控:定期输出CPU/内存占用、推理延迟分布。
- 远程上报:通过MQTT将告警信息发送到云端或运维平台。
8. 小结
本文完成了振荡电流故障预测系统的最后一环——实时推理服务与告警实现。至此,我们已在IQ-9100上搭建了一套完整的预测性维护系统:
- 硬件接入与数据采集 ✅
- LSTM模型训练与边缘部署 ✅
- 实时推理服务与告警系统 ✅
系统能够以1kHz的采样率实时监测设备状态,在检测到异常时快速告警,具备工业现场所需的稳定性和可维护性。
系列文章回顾
- (一)硬件接入与数据采集 ✅
- (二)LSTM模型训练与边缘部署 ✅
- (三)实时推理服务与告警系统实现 ✅
下一步展望
- 将系统集成到实际产线,开展长期稳定性测试
- 探索更轻量级的时序模型(如TinyML)进一步降低资源占用
- 引入更多传感器(温度、声音)实现多模态故障诊断
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)