一、为什么需要多模型编排

1.1 单模型的局限

现实中的 AI 任务往往不是"一个模型搞定一切"。举几个典型例子:

智慧安防场景:摄像头拍到一帧画面,你需要同时完成:

  1. 人脸检测(“画面里有没有人?”)→ 检测模型
  2. 人脸识别(“这个人是谁?”)→ 识别模型
  3. 行为分析(“这个人在干什么?”)→ 行为分类模型

这三个模型各司其职,缺一不可。任何一个单独拿出来都无法完成完整的安防任务。

智慧零售场景:顾客走进商店,你需要:

  1. 商品检测(“货架上有哪些商品?”)→ 检测模型
  2. 商品识别(“具体是哪个 SKU?”)→ 分类模型
  3. 顾客画像(“这个顾客大概什么年龄段?”)→ 属性识别模型

自动驾驶场景:车辆行驶过程中,你需要:

  1. 车道线检测 → 分割模型
  2. 目标检测(车辆、行人、障碍物)→ 检测模型
  3. 语义理解(路标、信号灯)→ 分类模型
  4. 路径规划 → 规划模型

1.2 编排的核心问题

多模型编排本质上是在解决三个核心问题:

执行顺序:哪些模型必须先跑、哪些可以并行跑?人脸检测必须先于人脸识别,因为识别模型需要检测模型提供的裁剪后的人脸区域。

数据流转:模型 A 的输出如何变成模型 B 的输入?检测模型输出的是边界框坐标,识别模型需要的是裁剪后的人脸图像——中间需要一个裁剪操作。

资源分配:多个模型共享 NPU 显存,如何避免 OOM(显存溢出)?如果三个模型各占 2GB 显存,但显卡总共只有 6GB,就需要精心设计显存复用策略。


二、三种编排模式

2.1 串行编排(Pipeline)

串行编排是最简单的模式:模型 A → 模型 B → 模型 C,一个接一个执行。

适用场景

  • 后一个模型的输入依赖前一个模型的输出
  • 对延迟不敏感,但对准确率要求高
  • 模型之间有严格的数据依赖关系

原理

输入数据 → [模型A] → 中间结果 → [模型B] → 中间结果 → [模型C] → 最终输出

时间轴:
  |--模型A--|--模型B--|--模型C--|
  总延迟 = A延迟 + B延迟 + C延迟

优缺点

  • 优点:逻辑简单、易于调试、显存占用可控(一次只跑一个模型)
  • 缺点:总延迟是各模型延迟之和,无法利用并行性

2.2 并行编排

并行编排:模型 A、B、C 同时执行,最后合并结果。

适用场景

  • 多个模型之间没有数据依赖
  • 对延迟要求高,需要缩短总耗时
  • 各模型处理同一输入的不同方面

原理

          ┌─[模型A]─┐
输入数据 ─┼─[模型B]─┼─→ [结果合并] → 最终输出
          └─[模型C]─┘

时间轴:
  |--模型A--|
  |--模型B--|  → [合并]
  |--模型C--|
  总延迟 = max(A延迟, B延迟, C延迟) + 合并延迟

优缺点

  • 优点:总延迟约等于最慢的那个模型,大幅缩短
  • 缺点:需要多份显存同时容纳多个模型,显存压力大

2.3 有向无环图(DAG)编排

DAG 编排是最灵活的模式:模型之间的依赖关系构成一个有向无环图,调度器根据图结构自动决定执行顺序。

适用场景

  • 复杂业务流程,模型之间有分支和汇聚
  • 部分串行、部分并行的混合结构
  • 需要动态调整执行计划

原理

          ┌─[人脸检测]─→[人脸识别]─┐
          │                        │
输入图像 ─┼─[人体检测]─→[行为分析]─┼─→ [结果融合] → 最终输出
          │                        │
          └─[车牌检测]─→[车牌识别]─┘

这是一个 DAG:
  人脸检测 → 人脸识别
  人体检测 → 行为分析
  车牌检测 → 车牌识别
  三个分支并行,最后汇聚到结果融合

优缺点

  • 优点:最灵活,能表达任意复杂的依赖关系
  • 缺点:实现复杂,需要 DAG 调度器,调试困难

三、CANN 多模型编排实现

3.1 串行编排器

import torch.npu
import time

class SequentialOrchestrator:
    """串行编排器 - 最简单直观的多模型编排方式"""
    
    def __init__(self):
        self.models = []          # 存储 (名称, 模型, 前处理, 后处理) 四元组
        self.stream = torch.npu.Stream()  # 所有模型共享一个 Stream(串行执行)
    
    def add_stage(self, name, model, preprocessor=None, postprocessor=None):
        """添加一个编排阶段
        
        参数说明:
            name: 阶段名称,用于日志和调试
            model: 已加载的 .om 模型
            preprocessor: 前处理函数,将上一阶段的输出转换为当前阶段的输入
            postprocessor: 后处理函数,将当前阶段的输出转换为下一阶段可接受的格式
        """
        self.models.append({
            'name': name,
            'model': model,
            'preprocess': preprocessor or (lambda x: x),  # 默认不做处理
            'postprocess': postprocessor or (lambda x: x)
        })
    
    def run(self, initial_input):
        """执行完整的串行流水线
        
        数据流向:
            initial_input → [阶段1预处理] → [阶段1推理] → [阶段1后处理]
                         → [阶段2预处理] → [阶段2推理] → [阶段2后处理]
                         → ... → 最终输出
        """
        current_data = initial_input
        stage_results = {}
        
        for stage in self.models:
            stage_start = time.time()
            
            # 前处理: 将上一阶段的输出转换为当前模型需要的输入格式
            model_input = stage['preprocess'](current_data)
            
            # 推理: 在 NPU 上执行模型
            with torch.npu.stream(self.stream):
                model_output = stage['model'](model_input)
            
            # 后处理: 将模型输出转换为下一阶段可接受的格式
            current_data = stage['postprocess'](model_output)
            
            stage_time = (time.time() - stage_start) * 1000
            stage_results[stage['name']] = {
                'latency_ms': stage_time,
                'input_shape': model_input.shape if hasattr(model_input, 'shape') else None,
                'output_shape': model_output.shape if hasattr(model_output, 'shape') else None
            }
        
        return current_data, stage_results

# ─── 使用示例:安防场景的人脸检测+识别流水线 ───

# 假设我们有两个模型
face_detector = torch.load("face_detector.om")      # 人脸检测模型
face_recognizer = torch.load("face_recognizer.om")  # 人脸识别模型

# 定义前处理和后处理
def detect_preprocess(image):
    """检测模型前处理: 直接输入图像"""
    return image

def detect_postprocess(detection_output):
    """检测模型后处理: 从检测结果中裁剪出人脸区域
    
    检测模型输出格式: [[x1, y1, x2, y2, confidence], ...]
    我们需要裁剪出每个人脸区域,供识别模型使用
    """
    face_boxes = detection_output['boxes']  # 边界框坐标
    face_scores = detection_output['scores']  # 置信度
    
    # 只保留高置信度的人脸
    valid_mask = face_scores > 0.5
    face_boxes = face_boxes[valid_mask]
    
    return face_boxes  # 返回裁剪后的人脸区域

def recognize_preprocess(face_boxes):
    """识别模型前处理: 将裁剪后的人脸 resize 到识别模型要求的尺寸"""
    # 实际实现中这里会做 resize、归一化等操作
    return face_boxes

def recognize_postprocess(recognition_output):
    """识别模型后处理: 将特征向量转换为身份信息"""
    return recognition_output  # 返回识别结果

# 组装流水线
orchestrator = SequentialOrchestrator()
orchestrator.add_stage("人脸检测", face_detector, detect_preprocess, detect_postprocess)
orchestrator.add_stage("人脸识别", face_recognizer, recognize_preprocess, recognize_postprocess)

# 执行
image = load_camera_frame()
result, stage_info = orchestrator.run(image)

print(f"识别结果: {result}")
for stage_name, info in stage_info.items():
    print(f"  {stage_name}: {info['latency_ms']:.2f}ms")

3.2 并行编排器

import threading

class ParallelOrchestrator:
    """并行编排器 - 多个模型同时执行,缩短总延迟
    
    核心思想: 如果多个模型之间没有数据依赖,就可以让它们在不同的 Stream 上同时执行。
    这样总延迟约等于最慢的那个模型,而不是所有模型之和。
    """
    
    def __init__(self, num_streams=4):
        self.stages = []  # 存储并行执行的阶段
        # 每个阶段一个 Stream,实现真正的并行执行
        self.streams = [torch.npu.Stream() for _ in range(num_streams)]
    
    def add_stage(self, name, model, preprocessor=None, postprocessor=None):
        """添加一个并行阶段"""
        self.stages.append({
            'name': name,
            'model': model,
            'preprocess': preprocessor or (lambda x: x),
            'postprocess': postprocessor or (lambda x: x)
        })
    
    def run(self, input_data):
        """并行执行所有阶段
        
        执行流程:
        1. 所有模型同时开始推理(各自在自己的 Stream 上)
        2. 等待所有模型完成
        3. 按顺序执行后处理(因为结果可能有顺序要求)
        """
        results = {}
        threads = []
        
        # 为每个阶段创建一个线程,并行执行
        for i, stage in enumerate(self.stages):
            stream = self.streams[i % len(self.streams)]
            
            def run_stage(stage, stream, stage_idx):
                """单个阶段的执行函数"""
                stage_start = time.time()
                
                # 前处理
                model_input = stage['preprocess'](input_data)
                
                # 推理(在独立的 Stream 上执行,不影响其他 Stream)
                with torch.npu.stream(stream):
                    model_output = stage['model'](model_input)
                
                # 后处理
                stage_result = stage['postprocess'](model_output)
                
                results[stage['name']] = {
                    'result': stage_result,
                    'latency_ms': (time.time() - stage_start) * 1000
                }
            
            thread = threading.Thread(target=run_stage, args=(stage, stream, i))
            threads.append(thread)
            thread.start()
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        
        # 同步所有 Stream
        torch.npu.synchronize()
        
        return results

# ─── 使用示例:智慧零售场景 ───

# 三个模型并行执行: 商品检测、顾客检测、环境分析
product_detector = torch.load("product_detector.om")
customer_detector = torch.load("customer_detector.om")
environment_analyzer = torch.load("environment_analyzer.om")

parallel = ParallelOrchestrator(num_streams=3)

# 注意: 这三个模型处理同一帧图像,但关注不同方面
# 它们之间没有数据依赖,所以可以并行执行
parallel.add_stage("商品检测", product_detector)
parallel.add_stage("顾客检测", customer_detector)
parallel.add_stage("环境分析", environment_analyzer)

# 并行执行
frame = load_store_camera_frame()
results = parallel.run(frame)

# results 包含三个模型各自的输出
for stage_name, info in results.items():
    print(f"{stage_name}: {info['latency_ms']:.2f}ms")

3.3 DAG 编排器

from collections import defaultdict, deque

class DAGOrchestrator:
    """DAG 编排器 - 最灵活的多模型编排方式
    
    什么是 DAG?
    DAG(Directed Acyclic Graph)即有向无环图。
    在多模型编排中,DAG 描述了模型之间的依赖关系:
      - 节点 = 模型(或处理步骤)
      - 有向边 = 数据依赖(A 的输出是 B 的输入)
      - 无环 = 不会出现循环依赖(A→B→C→A 是不允许的)
    
    调度算法:
      使用拓扑排序(Topological Sort)确定执行顺序。
      没有依赖的节点可以并行执行。
    """
    
    def __init__(self):
        self.nodes = {}           # 节点信息 {节点名: {model, preprocess, postprocess}}
        self.edges = defaultdict(list)  # 邻接表 {父节点: [子节点列表]}
        self.in_degree = defaultdict(int)  # 入度 {节点名: 入度值}
        self.reverse_edges = defaultdict(list)  # 反向边 {子节点: [父节点列表]}
    
    def add_node(self, name, model, preprocessor=None, postprocessor=None):
        """添加节点(模型)"""
        self.nodes[name] = {
            'model': model,
            'preprocess': preprocessor or (lambda x: x),
            'postprocess': postprocessor or (lambda x: x)
        }
        # 确保节点在入度表中存在
        if name not in self.in_degree:
            self.in_degree[name] = 0
    
    def add_edge(self, from_node, to_node):
        """添加依赖边: from_node 的输出是 to_node 的输入
        
        例如: add_edge("人脸检测", "人脸识别")
        表示: 人脸识别依赖人脸检测的结果
        """
        self.edges[from_node].append(to_node)
        self.reverse_edges[to_node].append(from_node)
        self.in_degree[to_node] += 1
    
    def _topological_sort(self):
        """拓扑排序 - 确定执行顺序
        
        算法: Kahn's Algorithm
        1. 找到所有入度为 0 的节点(没有依赖的节点)
        2. 将它们加入执行队列
        3. 执行后,将它们的下游节点入度减 1
        4. 重复直到所有节点都执行完毕
        
        返回: [[可并行执行的节点组1], [可并行执行的节点组2], ...]
        """
        # 计算入度
        in_deg = dict(self.in_degree)
        
        # 入度为 0 的节点可以立即执行
        queue = deque([node for node in self.nodes if in_deg[node] == 0])
        
        execution_levels = []  # 每一层的节点可以并行执行
        
        while queue:
            # 当前层: 所有入度为 0 的节点
            current_level = list(queue)
            execution_levels.append(current_level)
            
            queue.clear()
            
            # 执行当前层的节点后,更新下游节点的入度
            for node in current_level:
                for downstream in self.edges[node]:
                    in_deg[downstream] -= 1
                    if in_deg[downstream] == 0:
                        queue.append(downstream)
        
        return execution_levels
    
    def run(self, initial_input):
        """执行 DAG 编排
        
        流程:
        1. 对 DAG 进行拓扑排序,得到执行层级
        2. 逐层执行: 同一层的节点可以并行执行
        3. 层间串行: 上一层全部完成才能执行下一层
        """
        execution_levels = self._topological_sort()
        
        # 存储每个节点的输出(供后续节点使用)
        node_outputs = {}
        all_results = {}
        total_start = time.time()
        
        for level_idx, level in enumerate(execution_levels):
            level_start = time.time()
            
            if len(level) == 1:
                # ── 单节点: 直接串行执行 ──
                node_name = level[0]
                node = self.nodes[node_name]
                
                # 收集所有父节点的输出作为输入
                parent_outputs = [node_outputs[p] for p in self.reverse_edges[node_name]]
                model_input = node['preprocess'](parent_outputs if len(parent_outputs) > 1 else parent_outputs[0] if parent_outputs else initial_input)
                
                # 推理
                model_output = node['model'](model_input)
                
                # 后处理并存储
                node_outputs[node_name] = node['postprocess'](model_output)
                
            else:
                # ── 多节点: 并行执行 ──
                threads = []
                
                for node_name in level:
                    node = self.nodes[node_name]
                    
                    def execute_node(name, node):
                        parent_outputs = [node_outputs[p] for p in self.reverse_edges[name]]
                        model_input = node['preprocess'](parent_outputs if len(parent_outputs) > 1 else parent_outputs[0] if parent_outputs else initial_input)
                        model_output = node['model'](model_input)
                        node_outputs[name] = node['postprocess'](model_output)
                    
                    thread = threading.Thread(target=execute_node, args=(node_name, node))
                    threads.append(thread)
                    thread.start()
                
                for thread in threads:
                    thread.join()
            
            level_latency = (time.time() - level_start) * 1000
            all_results[f"level_{level_idx}"] = {
                'nodes': level,
                'latency_ms': level_latency
            }
        
        total_latency = (time.time() - total_start) * 1000
        
        return node_outputs, {
            'levels': all_results,
            'total_latency_ms': total_latency
        }

# ─── 使用示例:安防场景的 DAG 编排 ───
"""
DAG 结构:
           ┌─[人脸检测]─→[人脸识别]─┐
           │                        │
[摄像头] ──┼─[人体检测]─→[行为分析]─┼─→ [结果融合]
           │                        │
           └─[车牌检测]─→[车牌识别]─┘

三个分支并行,最后汇聚到结果融合节点。
"""

# 创建 DAG
dag = DAGOrchestrator()

# 添加节点
dag.add_node("人脸检测", torch.load("face_detector.om"))
dag.add_node("人体检测", torch.load("body_detector.om"))
dag.add_node("车牌检测", torch.load("plate_detector.om"))
dag.add_node("人脸识别", torch.load("face_recognizer.om"))
dag.add_node("行为分析", torch.load("behavior_analyzer.om"))
dag.add_node("车牌识别", torch.load("plate_recognizer.om"))
dag.add_node("结果融合", None)  # 融合节点可以是一个简单的合并函数

# 添加依赖边
dag.add_edge("人脸检测", "人脸识别")
dag.add_edge("人体检测", "行为分析")
dag.add_edge("车牌检测", "车牌识别")
dag.add_edge("人脸识别", "结果融合")
dag.add_edge("行为分析", "结果融合")
dag.add_edge("车牌识别", "结果融合")

# 执行
results, timing = dag.run(camera_frame)

print(f"总耗时: {timing['total_latency_ms']:.2f}ms")
for level, info in timing['levels'].items():
    print(f"  {level}: {info['nodes']}{info['latency_ms']:.2f}ms")

四、显存共享策略

4.1 为什么需要关注显存

NPU 显存是有限资源。以 Atlas 500 为例,它只有有限的片上存储。如果多个模型同时驻留显存,很容易出现显存不足的问题。

核心思路: 不是所有模型都需要同时在显存中。串行编排时,模型 A 执行完就可以把显存释放给模型 B 使用。

4.2 显存复用实现

class MemoryEfficientOrchestrator:
    """显存高效的编排器
    
    核心策略:
    1. 串行模型共享显存: A 释放后 B 才加载
    2. 并行模型分配不同显存区域
    3. 中间结果及时释放
    """
    
    def __init__(self, total_memory_gb=4.0):
        self.total_memory = total_memory_gb
        self.allocated = 0
    
    def load_model_with_memory_check(self, model_path, required_memory_gb):
        """带显存检查的模型加载
        
        如果显存不够,先卸载已加载的模型,再加载新模型。
        这种策略在串行编排中非常有效。
        """
        if self.allocated + required_memory_gb > self.total_memory:
            print(f"显存不足: 需要 {required_memory_gb}GB, 可用 {self.total_memory - self.allocated}GB")
            print("卸载已有模型释放显存...")
            self._unload_all_models()
        
        model = torch.load(model_path)
        self.allocated += required_memory_gb
        
        print(f"加载模型: {model_path}, 使用显存: {required_memory_gb}GB, 总占用: {self.allocated}GB")
        return model
    
    def _unload_all_models(self):
        """卸载所有模型"""
        # 实际实现中会调用具体的卸载逻辑
        self.allocated = 0
        print("已卸载所有模型")
    
    def sequential_memory_reuse(self, model_configs, input_data):
        """串行编排 + 显存复用
        
        模型按顺序执行,前一个模型的显存会在后一个模型加载前释放。
        这样显存峰值只需要容纳最大的那个模型。
        """
        result = input_data
        
        for config in model_configs:
            # 加载模型(可能需要先卸载之前的)
            model = self.load_model_with_memory_check(
                config['path'],
                config['memory_gb']
            )
            
            # 执行推理
            with torch.npu.stream(torch.npu.Stream()):
                result = model(result)
            
            # 推理完成后立即卸载,释放显存
            self._unload_model(model)
            print(f"完成阶段: {config['name']}, 已卸载释放显存")
        
        return result
    
    def _unload_model(self, model):
        """卸载单个模型"""
        del model
        # 实际实现中还需要调用 torch.npu.empty_cache() 等

五、完整业务场景示例

5.1 智能安防编排方案

def create_security_orchestrator():
    """智能安防场景的完整编排方案
    
    业务需求:
    1. 实时检测画面中的人脸、人体、车辆
    2. 对检测到的人脸进行身份识别
    3. 对检测到的人体进行行为分析(是否有异常行为)
    4. 对检测到的车辆进行车牌识别
    5. 所有结果融合后输出告警
    
    编排策略:
    - 第一层: 三个检测模型并行(人脸检测、人体检测、车牌检测)
    - 第二层: 三个识别/分析模型并行(人脸识别、行为分析、车牌识别)
    - 第三层: 结果融合
    """
    
    dag = DAGOrchestrator()
    
    # 第一层: 并行检测
    dag.add_node("人脸检测", torch.load("face_detector.om"))
    dag.add_node("人体检测", torch.load("body_detector.om"))
    dag.add_node("车牌检测", torch.load("plate_detector.om"))
    
    # 第二层: 并行识别(依赖对应的检测结果)
    dag.add_node("人脸识别", torch.load("face_recognizer.om"))
    dag.add_node("行为分析", torch.load("behavior_analyzer.om"))
    dag.add_node("车牌识别", torch.load("plate_recognizer.om"))
    
    # 第三层: 结果融合
    dag.add_node("告警判断", torch.load("alert_judger.om"))
    
    # 添加依赖
    dag.add_edge("人脸检测", "人脸识别")
    dag.add_edge("人体检测", "行为分析")
    dag.add_edge("车牌检测", "车牌识别")
    dag.add_edge("人脸识别", "告警判断")
    dag.add_edge("行为分析", "告警判断")
    dag.add_edge("车牌识别", "告警判断")
    
    return dag

# 使用
security_dag = create_security_orchestrator()
results, timing = security_dag.run(camera_frame)

# 判断是否需要告警
alert_result = results['告警判断']
if alert_result.get('alert'):
    send_alert(f"检测到异常: {alert_result['description']}")

六、常见问题

问题 原因 解决方案
并行模型 OOM 多个模型同时占用显存 降低并行度或使用更轻量的模型
编排延迟高 串行模型太多 识别可并行的阶段并行化
数据格式不匹配 前后处理缺失 补充 preprocessor/postprocessor
DAG 死循环 依赖关系配置错误 检查 DAG 是否有环
模型加载慢 模型太大 使用模型缓存、预加载

相关仓库

  • ascend-cl - 推理接口 https://gitee.com/ascend/ascend-cl
  • torch_npu - Stream 管理 https://gitee.com/ascend/torch_npu
  • ascend-op-builder - 算子构建 https://gitee.com/ascend/ascend-op-builder
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐