AI 边缘部署:多模型级联推理的工程实践,从任务拆分到流水线调度

cover

一、单模型的边缘困境:一个模型搞不定所有事

边缘设备上的 AI 推理,往往不是"部署一个模型就完事"的简单场景。以工业质检为例,一个完整的检测流程需要:先定位产品区域(目标检测),再识别缺陷类型(分类),最后判断缺陷严重程度(回归)。如果用一个"万能大模型"端到端完成,模型参数量远超边缘芯片的承载能力;如果部署三个独立模型串行执行,中间结果的内存拷贝和上下文切换会吃掉大量算力。

更深层的问题是,多模型级联推理引入了新的系统复杂度:模型间的数据格式转换、内存共享机制、流水线调度策略、单模型失败时的降级方案。这些工程问题不是"调调参数"能解决的,需要从系统架构层面设计一套适合边缘资源约束的级联推理框架。

二、多模型级联推理的架构设计

2.1 级联推理的数据流

graph LR
    subgraph "输入预处理"
        Camera[摄像头帧] -->|缩放/归一化| PreProc[预处理模块]
    end

    subgraph "级联推理流水线"
        PreProc -->|640x640 RGB| Det[检测模型<br/>YOLOv5-Nano]
        Det -->|裁剪ROI| Crop[ROI 裁剪]
        Crop -->|96x96 Patch| Cls[分类模型<br/>MobileNetV3]
        Cls -->|缺陷类型| Reg[回归模型<br/>轻量MLP]
        Reg -->|严重程度| Post[后处理]
    end

    subgraph "内存管理"
        Shared[共享内存池]
        Shared -.->|零拷贝| Det
        Shared -.->|零拷贝| Cls
        Shared -.->|零拷贝| Reg
    end

    Post -->|判定结果| Output[输出]

级联推理的核心设计原则是零拷贝传递。传统方案中,检测模型输出的 ROI 区域需要拷贝到新缓冲区再送入分类模型,每次拷贝涉及 CPU 介入和内存分配。在 ARM Cortex-A 平台上,一次 640x640 图像的内存拷贝约需 2ms,三个模型级联就是 6ms 的纯拷贝开销——对于要求 30fps 的实时场景,这是不可接受的。

解决方案是预分配一块连续的共享内存池,所有模型的输入输出张量都映射到这块内存中。检测模型输出的 ROI 坐标直接作为分类模型的输入偏移量,无需数据搬移。这种设计将级联间的数据传递开销从毫秒级降到微秒级。

2.2 流水线调度策略

gantt
    title 多模型级联推理时序(流水线 vs 串行)
    dateFormat X
    axisFormat %L

    section 串行执行
    预处理     :a1, 0, 5
    检测模型   :a2, 5, 20
    ROI裁剪    :a3, 20, 22
    分类模型   :a4, 22, 35
    回归模型   :a5, 35, 42

    section 流水线执行
    预处理     :b1, 0, 5
    检测模型   :b2, 5, 20
    ROI裁剪    :b3, 20, 22
    分类模型   :b4, 20, 33
    回归模型   :b5, 33, 40
    预处理_F2  :b6, 5, 10
    检测_F2    :b7, 20, 35

流水线调度的关键洞察:检测模型处理第 N 帧时,预处理模块可以同时处理第 N+1 帧。当检测模型输出 ROI 后,分类模型立即开始处理第 N 帧的 ROI,而检测模型可以开始处理第 N+1 帧。这种流水线重叠将整体吞吐量提升约 40%。

三、生产级级联推理框架实现

3.1 共享内存池与零拷贝张量传递

/*
 * 共享内存池:预分配连续内存,所有模型共享
 * 核心思路:避免推理过程中的动态内存分配,消除内存碎片和拷贝开销
 */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define MEMORY_POOL_SIZE (32 * 1024 * 1024)  /* 32MB 共享内存池 */
#define MAX_TENSOR_COUNT 16

typedef struct {
    void*   data;          /* 数据指针,指向内存池中的偏移位置 */
    size_t  size;          /* 张量数据字节数 */
    int     shape[4];      /* 张量形状 [N, C, H, W] */
    int     ndim;          /* 维度数 */
    int     dtype;         /* 数据类型: 0=float32, 1=int8 */
    int     model_id;      /* 所属模型ID,用于生命周期管理 */
    int     ref_count;     /* 引用计数,为0时可回收 */
} TensorDescriptor;

typedef struct {
    uint8_t*        pool;              /* 内存池基地址 */
    size_t          pool_size;         /* 内存池总大小 */
    size_t          used_offset;       /* 当前已分配偏移 */
    TensorDescriptor tensors[MAX_TENSOR_COUNT];
    int             tensor_count;
} MemoryPool;

/* 初始化内存池 */
int mempool_init(MemoryPool* mp) {
    mp->pool = (uint8_t*)malloc(MEMORY_POOL_SIZE);
    if (!mp->pool) return -1;

    mp->pool_size = MEMORY_POOL_SIZE;
    mp->used_offset = 0;
    mp->tensor_count = 0;
    memset(mp->tensors, 0, sizeof(mp->tensors));
    return 0;
}

/* 从内存池分配张量,返回张量描述符索引 */
int mempool_alloc_tensor(MemoryPool* mp, int model_id,
                         const int* shape, int ndim, int dtype) {
    if (mp->tensor_count >= MAX_TENSOR_COUNT) return -1;

    /* 计算张量所需字节数 */
    size_t elem_count = 1;
    for (int i = 0; i < ndim; i++) elem_count *= shape[i];
    size_t elem_size = (dtype == 0) ? sizeof(float) : sizeof(int8_t);
    size_t total_size = elem_count * elem_size;

    /* 对齐到64字节,适配ARM NEON的访存对齐要求 */
    size_t aligned_size = (total_size + 63) & ~63;

    if (mp->used_offset + aligned_size > mp->pool_size) {
        /* 内存不足,尝试回收引用计数为0的张量 */
        mempool_gc(mp);
        if (mp->used_offset + aligned_size > mp->pool_size) {
            return -1;  /* 回收后仍不足 */
        }
    }

    int idx = mp->tensor_count++;
    mp->tensors[idx].data = mp->pool + mp->used_offset;
    mp->tensors[idx].size = aligned_size;
    memcpy(mp->tensors[idx].shape, shape, ndim * sizeof(int));
    mp->tensors[idx].ndim = ndim;
    mp->tensors[idx].dtype = dtype;
    mp->tensors[idx].model_id = model_id;
    mp->tensors[idx].ref_count = 1;

    mp->used_offset += aligned_size;
    return idx;
}

/* ROI零拷贝:检测模型输出ROI坐标,分类模型直接引用偏移地址 */
int mempool_create_roi_view(MemoryPool* mp, int src_tensor_idx,
                            int x, int y, int w, int h,
                            int target_model_id) {
    /* 不分配新内存,直接在源张量上创建视图 */
    TensorDescriptor* src = &mp->tensors[src_tensor_idx];
    int src_w = src->shape[3];  /* 源张量宽度 */

    int roi_shape[4] = {1, src->shape[1], h, w};  /* ROI形状 */
    int idx = mp->tensor_count++;

    /* 计算ROI在源张量中的偏移地址 */
    size_t elem_size = (src->dtype == 0) ? sizeof(float) : sizeof(int8_t);
    size_t row_stride = src_w * src->shape[1] * elem_size;
    mp->tensors[idx].data = (uint8_t*)src->data + y * row_stride + x * src->shape[1] * elem_size;
    mp->tensors[idx].size = 0;  /* 视图模式,不拥有内存 */
    memcpy(mp->tensors[idx].shape, roi_shape, 4 * sizeof(int));
    mp->tensors[idx].ndim = 4;
    mp->tensors[idx].dtype = src->dtype;
    mp->tensors[idx].model_id = target_model_id;
    mp->tensors[idx].ref_count = 1;

    /* 增加源张量的引用计数,防止被回收 */
    src->ref_count++;

    return idx;
}

/* 垃圾回收:释放引用计数为0的张量 */
void mempool_gc(MemoryPool* mp) {
    /* 简化实现:标记-压缩,将存活张量紧凑排列 */
    size_t new_offset = 0;
    int new_count = 0;

    for (int i = 0; i < mp->tensor_count; i++) {
        if (mp->tensors[i].ref_count > 0) {
            if (mp->tensors[i].data != mp->pool + new_offset) {
                memmove(mp->pool + new_offset, mp->tensors[i].data,
                        mp->tensors[i].size);
                mp->tensors[i].data = mp->pool + new_offset;
            }
            new_offset += mp->tensors[i].size;
            mp->tensors[new_count++] = mp->tensors[i];
        }
    }
    mp->used_offset = new_offset;
    mp->tensor_count = new_count;
}

3.2 流水线调度器

/*
 * 流水线调度器:管理多模型级联推理的执行时序
 * 核心设计:基于状态机的非阻塞调度,避免模型间的等待
 */
#include <pthread.h>

typedef enum {
    STAGE_IDLE,
    STAGE_RUNNING,
    STAGE_DONE,
} StageState;

typedef struct {
    int             stage_id;
    int             model_id;
    StageState      state;
    int             input_tensor_idx;
    int             output_tensor_idx;
    int64_t         start_time_us;
    int64_t         end_time_us;
} PipelineStage;

typedef struct {
    PipelineStage   stages[8];
    int             stage_count;
    int             current_frame;
    pthread_mutex_t lock;
} PipelineScheduler;

/* 推进流水线:检查每个阶段的前置依赖是否完成 */
void pipeline_tick(PipelineScheduler* sched, MemoryPool* mp) {
    pthread_mutex_lock(&sched->lock);

    for (int i = 0; i < sched->stage_count; i++) {
        PipelineStage* s = &sched->stages[i];

        if (s->state != STAGE_IDLE) continue;

        /* 检查前置阶段是否完成 */
        if (i > 0 && sched->stages[i-1].state != STAGE_DONE) continue;

        /* 前置完成,启动当前阶段 */
        s->state = STAGE_RUNNING;
        s->start_time_us = get_time_us();

        /* 提交推理任务到NCNN/TFLite执行器 */
        submit_inference(s->model_id, s->input_tensor_idx,
                         s->output_tensor_idx, mp);
    }

    pthread_mutex_unlock(&sched->lock);
}

/* 推理完成回调 */
void on_inference_done(PipelineScheduler* sched, int stage_id) {
    pthread_mutex_lock(&sched->lock);

    PipelineStage* s = &sched->stages[stage_id];
    s->state = STAGE_DONE;
    s->end_time_us = get_time_us();

    /* 通知下一阶段可以启动 */
    if (stage_id + 1 < sched->stage_count) {
        sched->stages[stage_id + 1].input_tensor_idx = s->output_tensor_idx;
    }

    pthread_mutex_unlock(&sched->lock);
}

四、级联推理的 Trade-offs 分析

方案一:级联推理 vs 端到端大模型

维度 多模型级联 端到端大模型
模型总参数量 小(3个小模型合计 < 5M) 大(单模型 > 20M)
推理延迟 中等(级联间有调度开销) 低(单次前向传播)
精度 高(每个模型专注单一任务) 中等(多任务折衷)
可维护性 高(单模型可独立更新) 低(改一个任务需重训整体)
内存峰值 低(模型可逐个加载) 高(需同时加载整个模型)

方案二:串行执行 vs 流水线执行

流水线执行将吞吐量提升约 40%,但引入了帧间状态管理的复杂度——第 N 帧的分类结果可能与第 N+1 帧的检测结果同时产生,需要用帧序号标记每个结果。此外,流水线要求每个阶段的执行时间相近,如果某个阶段耗时远大于其他阶段,流水线加速效果会退化为串行。

关键边界条件

  • 共享内存池的大小是硬约束。32MB 的内存池在 ARM Cortex-A53 上是合理的上限,超过这个值会影响 Linux 系统的页面缓存。如果三个模型的权重 + 中间张量超过 32MB,需要采用模型逐个加载策略,用 Flash 存储暂存不活跃的模型权重
  • ROI 零拷贝视图要求源张量在内存中是连续排列的。如果检测模型的输出经过后处理(如 NMS)导致 ROI 区域不连续,零拷贝视图无法直接使用,需要退回到拷贝模式
  • 流水线调度在单核 CPU 上没有收益——没有并行计算资源,流水线退化为串行。至少需要 2 核 CPU 才能体现流水线优势

五、总结

多模型级联推理是边缘 AI 落地的务实选择——用多个小模型协作替代一个大模型,在资源受限的边缘芯片上实现复杂的多步骤 AI 任务。核心工程挑战是级联间的数据传递开销和调度复杂度。

关键设计决策有三点:第一,共享内存池 + 零拷贝视图消除级联间的数据拷贝开销,将传递延迟从毫秒级降到微秒级;第二,流水线调度重叠相邻帧的推理阶段,提升整体吞吐量约 40%;第三,帧序号标记解决流水线模式下的结果归因问题。

落地建议:先用串行模式验证级联逻辑的正确性,再切换到流水线模式优化吞吐。内存池大小根据实际模型参数量设定,预留 20% 的安全余量。单模型失败时,采用降级策略——跳过失败模型,用前一个模型的输出作为最终结果,保证系统可用性而非完全中断。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐