AI 边缘部署:多模型级联推理的工程实践,从任务拆分到流水线调度
AI 边缘部署:多模型级联推理的工程实践,从任务拆分到流水线调度

一、单模型的边缘困境:一个模型搞不定所有事
边缘设备上的 AI 推理,往往不是"部署一个模型就完事"的简单场景。以工业质检为例,一个完整的检测流程需要:先定位产品区域(目标检测),再识别缺陷类型(分类),最后判断缺陷严重程度(回归)。如果用一个"万能大模型"端到端完成,模型参数量远超边缘芯片的承载能力;如果部署三个独立模型串行执行,中间结果的内存拷贝和上下文切换会吃掉大量算力。
更深层的问题是,多模型级联推理引入了新的系统复杂度:模型间的数据格式转换、内存共享机制、流水线调度策略、单模型失败时的降级方案。这些工程问题不是"调调参数"能解决的,需要从系统架构层面设计一套适合边缘资源约束的级联推理框架。
二、多模型级联推理的架构设计
2.1 级联推理的数据流
graph LR
subgraph "输入预处理"
Camera[摄像头帧] -->|缩放/归一化| PreProc[预处理模块]
end
subgraph "级联推理流水线"
PreProc -->|640x640 RGB| Det[检测模型<br/>YOLOv5-Nano]
Det -->|裁剪ROI| Crop[ROI 裁剪]
Crop -->|96x96 Patch| Cls[分类模型<br/>MobileNetV3]
Cls -->|缺陷类型| Reg[回归模型<br/>轻量MLP]
Reg -->|严重程度| Post[后处理]
end
subgraph "内存管理"
Shared[共享内存池]
Shared -.->|零拷贝| Det
Shared -.->|零拷贝| Cls
Shared -.->|零拷贝| Reg
end
Post -->|判定结果| Output[输出]
级联推理的核心设计原则是零拷贝传递。传统方案中,检测模型输出的 ROI 区域需要拷贝到新缓冲区再送入分类模型,每次拷贝涉及 CPU 介入和内存分配。在 ARM Cortex-A 平台上,一次 640x640 图像的内存拷贝约需 2ms,三个模型级联就是 6ms 的纯拷贝开销——对于要求 30fps 的实时场景,这是不可接受的。
解决方案是预分配一块连续的共享内存池,所有模型的输入输出张量都映射到这块内存中。检测模型输出的 ROI 坐标直接作为分类模型的输入偏移量,无需数据搬移。这种设计将级联间的数据传递开销从毫秒级降到微秒级。
2.2 流水线调度策略
gantt
title 多模型级联推理时序(流水线 vs 串行)
dateFormat X
axisFormat %L
section 串行执行
预处理 :a1, 0, 5
检测模型 :a2, 5, 20
ROI裁剪 :a3, 20, 22
分类模型 :a4, 22, 35
回归模型 :a5, 35, 42
section 流水线执行
预处理 :b1, 0, 5
检测模型 :b2, 5, 20
ROI裁剪 :b3, 20, 22
分类模型 :b4, 20, 33
回归模型 :b5, 33, 40
预处理_F2 :b6, 5, 10
检测_F2 :b7, 20, 35
流水线调度的关键洞察:检测模型处理第 N 帧时,预处理模块可以同时处理第 N+1 帧。当检测模型输出 ROI 后,分类模型立即开始处理第 N 帧的 ROI,而检测模型可以开始处理第 N+1 帧。这种流水线重叠将整体吞吐量提升约 40%。
三、生产级级联推理框架实现
3.1 共享内存池与零拷贝张量传递
/*
* 共享内存池:预分配连续内存,所有模型共享
* 核心思路:避免推理过程中的动态内存分配,消除内存碎片和拷贝开销
*/
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#define MEMORY_POOL_SIZE (32 * 1024 * 1024) /* 32MB 共享内存池 */
#define MAX_TENSOR_COUNT 16
typedef struct {
void* data; /* 数据指针,指向内存池中的偏移位置 */
size_t size; /* 张量数据字节数 */
int shape[4]; /* 张量形状 [N, C, H, W] */
int ndim; /* 维度数 */
int dtype; /* 数据类型: 0=float32, 1=int8 */
int model_id; /* 所属模型ID,用于生命周期管理 */
int ref_count; /* 引用计数,为0时可回收 */
} TensorDescriptor;
typedef struct {
uint8_t* pool; /* 内存池基地址 */
size_t pool_size; /* 内存池总大小 */
size_t used_offset; /* 当前已分配偏移 */
TensorDescriptor tensors[MAX_TENSOR_COUNT];
int tensor_count;
} MemoryPool;
/* 初始化内存池 */
int mempool_init(MemoryPool* mp) {
mp->pool = (uint8_t*)malloc(MEMORY_POOL_SIZE);
if (!mp->pool) return -1;
mp->pool_size = MEMORY_POOL_SIZE;
mp->used_offset = 0;
mp->tensor_count = 0;
memset(mp->tensors, 0, sizeof(mp->tensors));
return 0;
}
/* 从内存池分配张量,返回张量描述符索引 */
int mempool_alloc_tensor(MemoryPool* mp, int model_id,
const int* shape, int ndim, int dtype) {
if (mp->tensor_count >= MAX_TENSOR_COUNT) return -1;
/* 计算张量所需字节数 */
size_t elem_count = 1;
for (int i = 0; i < ndim; i++) elem_count *= shape[i];
size_t elem_size = (dtype == 0) ? sizeof(float) : sizeof(int8_t);
size_t total_size = elem_count * elem_size;
/* 对齐到64字节,适配ARM NEON的访存对齐要求 */
size_t aligned_size = (total_size + 63) & ~63;
if (mp->used_offset + aligned_size > mp->pool_size) {
/* 内存不足,尝试回收引用计数为0的张量 */
mempool_gc(mp);
if (mp->used_offset + aligned_size > mp->pool_size) {
return -1; /* 回收后仍不足 */
}
}
int idx = mp->tensor_count++;
mp->tensors[idx].data = mp->pool + mp->used_offset;
mp->tensors[idx].size = aligned_size;
memcpy(mp->tensors[idx].shape, shape, ndim * sizeof(int));
mp->tensors[idx].ndim = ndim;
mp->tensors[idx].dtype = dtype;
mp->tensors[idx].model_id = model_id;
mp->tensors[idx].ref_count = 1;
mp->used_offset += aligned_size;
return idx;
}
/* ROI零拷贝:检测模型输出ROI坐标,分类模型直接引用偏移地址 */
int mempool_create_roi_view(MemoryPool* mp, int src_tensor_idx,
int x, int y, int w, int h,
int target_model_id) {
/* 不分配新内存,直接在源张量上创建视图 */
TensorDescriptor* src = &mp->tensors[src_tensor_idx];
int src_w = src->shape[3]; /* 源张量宽度 */
int roi_shape[4] = {1, src->shape[1], h, w}; /* ROI形状 */
int idx = mp->tensor_count++;
/* 计算ROI在源张量中的偏移地址 */
size_t elem_size = (src->dtype == 0) ? sizeof(float) : sizeof(int8_t);
size_t row_stride = src_w * src->shape[1] * elem_size;
mp->tensors[idx].data = (uint8_t*)src->data + y * row_stride + x * src->shape[1] * elem_size;
mp->tensors[idx].size = 0; /* 视图模式,不拥有内存 */
memcpy(mp->tensors[idx].shape, roi_shape, 4 * sizeof(int));
mp->tensors[idx].ndim = 4;
mp->tensors[idx].dtype = src->dtype;
mp->tensors[idx].model_id = target_model_id;
mp->tensors[idx].ref_count = 1;
/* 增加源张量的引用计数,防止被回收 */
src->ref_count++;
return idx;
}
/* 垃圾回收:释放引用计数为0的张量 */
void mempool_gc(MemoryPool* mp) {
/* 简化实现:标记-压缩,将存活张量紧凑排列 */
size_t new_offset = 0;
int new_count = 0;
for (int i = 0; i < mp->tensor_count; i++) {
if (mp->tensors[i].ref_count > 0) {
if (mp->tensors[i].data != mp->pool + new_offset) {
memmove(mp->pool + new_offset, mp->tensors[i].data,
mp->tensors[i].size);
mp->tensors[i].data = mp->pool + new_offset;
}
new_offset += mp->tensors[i].size;
mp->tensors[new_count++] = mp->tensors[i];
}
}
mp->used_offset = new_offset;
mp->tensor_count = new_count;
}
3.2 流水线调度器
/*
* 流水线调度器:管理多模型级联推理的执行时序
* 核心设计:基于状态机的非阻塞调度,避免模型间的等待
*/
#include <pthread.h>
typedef enum {
STAGE_IDLE,
STAGE_RUNNING,
STAGE_DONE,
} StageState;
typedef struct {
int stage_id;
int model_id;
StageState state;
int input_tensor_idx;
int output_tensor_idx;
int64_t start_time_us;
int64_t end_time_us;
} PipelineStage;
typedef struct {
PipelineStage stages[8];
int stage_count;
int current_frame;
pthread_mutex_t lock;
} PipelineScheduler;
/* 推进流水线:检查每个阶段的前置依赖是否完成 */
void pipeline_tick(PipelineScheduler* sched, MemoryPool* mp) {
pthread_mutex_lock(&sched->lock);
for (int i = 0; i < sched->stage_count; i++) {
PipelineStage* s = &sched->stages[i];
if (s->state != STAGE_IDLE) continue;
/* 检查前置阶段是否完成 */
if (i > 0 && sched->stages[i-1].state != STAGE_DONE) continue;
/* 前置完成,启动当前阶段 */
s->state = STAGE_RUNNING;
s->start_time_us = get_time_us();
/* 提交推理任务到NCNN/TFLite执行器 */
submit_inference(s->model_id, s->input_tensor_idx,
s->output_tensor_idx, mp);
}
pthread_mutex_unlock(&sched->lock);
}
/* 推理完成回调 */
void on_inference_done(PipelineScheduler* sched, int stage_id) {
pthread_mutex_lock(&sched->lock);
PipelineStage* s = &sched->stages[stage_id];
s->state = STAGE_DONE;
s->end_time_us = get_time_us();
/* 通知下一阶段可以启动 */
if (stage_id + 1 < sched->stage_count) {
sched->stages[stage_id + 1].input_tensor_idx = s->output_tensor_idx;
}
pthread_mutex_unlock(&sched->lock);
}
四、级联推理的 Trade-offs 分析
方案一:级联推理 vs 端到端大模型
| 维度 | 多模型级联 | 端到端大模型 |
|---|---|---|
| 模型总参数量 | 小(3个小模型合计 < 5M) | 大(单模型 > 20M) |
| 推理延迟 | 中等(级联间有调度开销) | 低(单次前向传播) |
| 精度 | 高(每个模型专注单一任务) | 中等(多任务折衷) |
| 可维护性 | 高(单模型可独立更新) | 低(改一个任务需重训整体) |
| 内存峰值 | 低(模型可逐个加载) | 高(需同时加载整个模型) |
方案二:串行执行 vs 流水线执行
流水线执行将吞吐量提升约 40%,但引入了帧间状态管理的复杂度——第 N 帧的分类结果可能与第 N+1 帧的检测结果同时产生,需要用帧序号标记每个结果。此外,流水线要求每个阶段的执行时间相近,如果某个阶段耗时远大于其他阶段,流水线加速效果会退化为串行。
关键边界条件:
- 共享内存池的大小是硬约束。32MB 的内存池在 ARM Cortex-A53 上是合理的上限,超过这个值会影响 Linux 系统的页面缓存。如果三个模型的权重 + 中间张量超过 32MB,需要采用模型逐个加载策略,用 Flash 存储暂存不活跃的模型权重
- ROI 零拷贝视图要求源张量在内存中是连续排列的。如果检测模型的输出经过后处理(如 NMS)导致 ROI 区域不连续,零拷贝视图无法直接使用,需要退回到拷贝模式
- 流水线调度在单核 CPU 上没有收益——没有并行计算资源,流水线退化为串行。至少需要 2 核 CPU 才能体现流水线优势
五、总结
多模型级联推理是边缘 AI 落地的务实选择——用多个小模型协作替代一个大模型,在资源受限的边缘芯片上实现复杂的多步骤 AI 任务。核心工程挑战是级联间的数据传递开销和调度复杂度。
关键设计决策有三点:第一,共享内存池 + 零拷贝视图消除级联间的数据拷贝开销,将传递延迟从毫秒级降到微秒级;第二,流水线调度重叠相邻帧的推理阶段,提升整体吞吐量约 40%;第三,帧序号标记解决流水线模式下的结果归因问题。
落地建议:先用串行模式验证级联逻辑的正确性,再切换到流水线模式优化吞吐。内存池大小根据实际模型参数量设定,预留 20% 的安全余量。单模型失败时,采用降级策略——跳过失败模型,用前一个模型的输出作为最终结果,保证系统可用性而非完全中断。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)