CANN 多模型编排:复杂业务场景下的模型组合实战
一、为什么需要多模型编排
1.1 单模型的局限
现实中的 AI 任务往往不是"一个模型搞定一切"。举几个典型例子:
智慧安防场景:摄像头拍到一帧画面,你需要同时完成:
- 人脸检测(“画面里有没有人?”)→ 检测模型
- 人脸识别(“这个人是谁?”)→ 识别模型
- 行为分析(“这个人在干什么?”)→ 行为分类模型
这三个模型各司其职,缺一不可。任何一个单独拿出来都无法完成完整的安防任务。
智慧零售场景:顾客走进商店,你需要:
- 商品检测(“货架上有哪些商品?”)→ 检测模型
- 商品识别(“具体是哪个 SKU?”)→ 分类模型
- 顾客画像(“这个顾客大概什么年龄段?”)→ 属性识别模型
自动驾驶场景:车辆行驶过程中,你需要:
- 车道线检测 → 分割模型
- 目标检测(车辆、行人、障碍物)→ 检测模型
- 语义理解(路标、信号灯)→ 分类模型
- 路径规划 → 规划模型
1.2 编排的核心问题
多模型编排本质上是在解决三个核心问题:
执行顺序:哪些模型必须先跑、哪些可以并行跑?人脸检测必须先于人脸识别,因为识别模型需要检测模型提供的裁剪后的人脸区域。
数据流转:模型 A 的输出如何变成模型 B 的输入?检测模型输出的是边界框坐标,识别模型需要的是裁剪后的人脸图像——中间需要一个裁剪操作。
资源分配:多个模型共享 NPU 显存,如何避免 OOM(显存溢出)?如果三个模型各占 2GB 显存,但显卡总共只有 6GB,就需要精心设计显存复用策略。
二、三种编排模式
2.1 串行编排(Pipeline)
串行编排是最简单的模式:模型 A → 模型 B → 模型 C,一个接一个执行。
适用场景:
- 后一个模型的输入依赖前一个模型的输出
- 对延迟不敏感,但对准确率要求高
- 模型之间有严格的数据依赖关系
原理:
输入数据 → [模型A] → 中间结果 → [模型B] → 中间结果 → [模型C] → 最终输出
时间轴:
|--模型A--|--模型B--|--模型C--|
总延迟 = A延迟 + B延迟 + C延迟
优缺点:
- 优点:逻辑简单、易于调试、显存占用可控(一次只跑一个模型)
- 缺点:总延迟是各模型延迟之和,无法利用并行性
2.2 并行编排
并行编排:模型 A、B、C 同时执行,最后合并结果。
适用场景:
- 多个模型之间没有数据依赖
- 对延迟要求高,需要缩短总耗时
- 各模型处理同一输入的不同方面
原理:
┌─[模型A]─┐
输入数据 ─┼─[模型B]─┼─→ [结果合并] → 最终输出
└─[模型C]─┘
时间轴:
|--模型A--|
|--模型B--| → [合并]
|--模型C--|
总延迟 = max(A延迟, B延迟, C延迟) + 合并延迟
优缺点:
- 优点:总延迟约等于最慢的那个模型,大幅缩短
- 缺点:需要多份显存同时容纳多个模型,显存压力大
2.3 有向无环图(DAG)编排
DAG 编排是最灵活的模式:模型之间的依赖关系构成一个有向无环图,调度器根据图结构自动决定执行顺序。
适用场景:
- 复杂业务流程,模型之间有分支和汇聚
- 部分串行、部分并行的混合结构
- 需要动态调整执行计划
原理:
┌─[人脸检测]─→[人脸识别]─┐
│ │
输入图像 ─┼─[人体检测]─→[行为分析]─┼─→ [结果融合] → 最终输出
│ │
└─[车牌检测]─→[车牌识别]─┘
这是一个 DAG:
人脸检测 → 人脸识别
人体检测 → 行为分析
车牌检测 → 车牌识别
三个分支并行,最后汇聚到结果融合
优缺点:
- 优点:最灵活,能表达任意复杂的依赖关系
- 缺点:实现复杂,需要 DAG 调度器,调试困难
三、CANN 多模型编排实现
3.1 串行编排器
import torch.npu
import time
class SequentialOrchestrator:
"""串行编排器 - 最简单直观的多模型编排方式"""
def __init__(self):
self.models = [] # 存储 (名称, 模型, 前处理, 后处理) 四元组
self.stream = torch.npu.Stream() # 所有模型共享一个 Stream(串行执行)
def add_stage(self, name, model, preprocessor=None, postprocessor=None):
"""添加一个编排阶段
参数说明:
name: 阶段名称,用于日志和调试
model: 已加载的 .om 模型
preprocessor: 前处理函数,将上一阶段的输出转换为当前阶段的输入
postprocessor: 后处理函数,将当前阶段的输出转换为下一阶段可接受的格式
"""
self.models.append({
'name': name,
'model': model,
'preprocess': preprocessor or (lambda x: x), # 默认不做处理
'postprocess': postprocessor or (lambda x: x)
})
def run(self, initial_input):
"""执行完整的串行流水线
数据流向:
initial_input → [阶段1预处理] → [阶段1推理] → [阶段1后处理]
→ [阶段2预处理] → [阶段2推理] → [阶段2后处理]
→ ... → 最终输出
"""
current_data = initial_input
stage_results = {}
for stage in self.models:
stage_start = time.time()
# 前处理: 将上一阶段的输出转换为当前模型需要的输入格式
model_input = stage['preprocess'](current_data)
# 推理: 在 NPU 上执行模型
with torch.npu.stream(self.stream):
model_output = stage['model'](model_input)
# 后处理: 将模型输出转换为下一阶段可接受的格式
current_data = stage['postprocess'](model_output)
stage_time = (time.time() - stage_start) * 1000
stage_results[stage['name']] = {
'latency_ms': stage_time,
'input_shape': model_input.shape if hasattr(model_input, 'shape') else None,
'output_shape': model_output.shape if hasattr(model_output, 'shape') else None
}
return current_data, stage_results
# ─── 使用示例:安防场景的人脸检测+识别流水线 ───
# 假设我们有两个模型
face_detector = torch.load("face_detector.om") # 人脸检测模型
face_recognizer = torch.load("face_recognizer.om") # 人脸识别模型
# 定义前处理和后处理
def detect_preprocess(image):
"""检测模型前处理: 直接输入图像"""
return image
def detect_postprocess(detection_output):
"""检测模型后处理: 从检测结果中裁剪出人脸区域
检测模型输出格式: [[x1, y1, x2, y2, confidence], ...]
我们需要裁剪出每个人脸区域,供识别模型使用
"""
face_boxes = detection_output['boxes'] # 边界框坐标
face_scores = detection_output['scores'] # 置信度
# 只保留高置信度的人脸
valid_mask = face_scores > 0.5
face_boxes = face_boxes[valid_mask]
return face_boxes # 返回裁剪后的人脸区域
def recognize_preprocess(face_boxes):
"""识别模型前处理: 将裁剪后的人脸 resize 到识别模型要求的尺寸"""
# 实际实现中这里会做 resize、归一化等操作
return face_boxes
def recognize_postprocess(recognition_output):
"""识别模型后处理: 将特征向量转换为身份信息"""
return recognition_output # 返回识别结果
# 组装流水线
orchestrator = SequentialOrchestrator()
orchestrator.add_stage("人脸检测", face_detector, detect_preprocess, detect_postprocess)
orchestrator.add_stage("人脸识别", face_recognizer, recognize_preprocess, recognize_postprocess)
# 执行
image = load_camera_frame()
result, stage_info = orchestrator.run(image)
print(f"识别结果: {result}")
for stage_name, info in stage_info.items():
print(f" {stage_name}: {info['latency_ms']:.2f}ms")
3.2 并行编排器
import threading
class ParallelOrchestrator:
"""并行编排器 - 多个模型同时执行,缩短总延迟
核心思想: 如果多个模型之间没有数据依赖,就可以让它们在不同的 Stream 上同时执行。
这样总延迟约等于最慢的那个模型,而不是所有模型之和。
"""
def __init__(self, num_streams=4):
self.stages = [] # 存储并行执行的阶段
# 每个阶段一个 Stream,实现真正的并行执行
self.streams = [torch.npu.Stream() for _ in range(num_streams)]
def add_stage(self, name, model, preprocessor=None, postprocessor=None):
"""添加一个并行阶段"""
self.stages.append({
'name': name,
'model': model,
'preprocess': preprocessor or (lambda x: x),
'postprocess': postprocessor or (lambda x: x)
})
def run(self, input_data):
"""并行执行所有阶段
执行流程:
1. 所有模型同时开始推理(各自在自己的 Stream 上)
2. 等待所有模型完成
3. 按顺序执行后处理(因为结果可能有顺序要求)
"""
results = {}
threads = []
# 为每个阶段创建一个线程,并行执行
for i, stage in enumerate(self.stages):
stream = self.streams[i % len(self.streams)]
def run_stage(stage, stream, stage_idx):
"""单个阶段的执行函数"""
stage_start = time.time()
# 前处理
model_input = stage['preprocess'](input_data)
# 推理(在独立的 Stream 上执行,不影响其他 Stream)
with torch.npu.stream(stream):
model_output = stage['model'](model_input)
# 后处理
stage_result = stage['postprocess'](model_output)
results[stage['name']] = {
'result': stage_result,
'latency_ms': (time.time() - stage_start) * 1000
}
thread = threading.Thread(target=run_stage, args=(stage, stream, i))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 同步所有 Stream
torch.npu.synchronize()
return results
# ─── 使用示例:智慧零售场景 ───
# 三个模型并行执行: 商品检测、顾客检测、环境分析
product_detector = torch.load("product_detector.om")
customer_detector = torch.load("customer_detector.om")
environment_analyzer = torch.load("environment_analyzer.om")
parallel = ParallelOrchestrator(num_streams=3)
# 注意: 这三个模型处理同一帧图像,但关注不同方面
# 它们之间没有数据依赖,所以可以并行执行
parallel.add_stage("商品检测", product_detector)
parallel.add_stage("顾客检测", customer_detector)
parallel.add_stage("环境分析", environment_analyzer)
# 并行执行
frame = load_store_camera_frame()
results = parallel.run(frame)
# results 包含三个模型各自的输出
for stage_name, info in results.items():
print(f"{stage_name}: {info['latency_ms']:.2f}ms")
3.3 DAG 编排器
from collections import defaultdict, deque
class DAGOrchestrator:
"""DAG 编排器 - 最灵活的多模型编排方式
什么是 DAG?
DAG(Directed Acyclic Graph)即有向无环图。
在多模型编排中,DAG 描述了模型之间的依赖关系:
- 节点 = 模型(或处理步骤)
- 有向边 = 数据依赖(A 的输出是 B 的输入)
- 无环 = 不会出现循环依赖(A→B→C→A 是不允许的)
调度算法:
使用拓扑排序(Topological Sort)确定执行顺序。
没有依赖的节点可以并行执行。
"""
def __init__(self):
self.nodes = {} # 节点信息 {节点名: {model, preprocess, postprocess}}
self.edges = defaultdict(list) # 邻接表 {父节点: [子节点列表]}
self.in_degree = defaultdict(int) # 入度 {节点名: 入度值}
self.reverse_edges = defaultdict(list) # 反向边 {子节点: [父节点列表]}
def add_node(self, name, model, preprocessor=None, postprocessor=None):
"""添加节点(模型)"""
self.nodes[name] = {
'model': model,
'preprocess': preprocessor or (lambda x: x),
'postprocess': postprocessor or (lambda x: x)
}
# 确保节点在入度表中存在
if name not in self.in_degree:
self.in_degree[name] = 0
def add_edge(self, from_node, to_node):
"""添加依赖边: from_node 的输出是 to_node 的输入
例如: add_edge("人脸检测", "人脸识别")
表示: 人脸识别依赖人脸检测的结果
"""
self.edges[from_node].append(to_node)
self.reverse_edges[to_node].append(from_node)
self.in_degree[to_node] += 1
def _topological_sort(self):
"""拓扑排序 - 确定执行顺序
算法: Kahn's Algorithm
1. 找到所有入度为 0 的节点(没有依赖的节点)
2. 将它们加入执行队列
3. 执行后,将它们的下游节点入度减 1
4. 重复直到所有节点都执行完毕
返回: [[可并行执行的节点组1], [可并行执行的节点组2], ...]
"""
# 计算入度
in_deg = dict(self.in_degree)
# 入度为 0 的节点可以立即执行
queue = deque([node for node in self.nodes if in_deg[node] == 0])
execution_levels = [] # 每一层的节点可以并行执行
while queue:
# 当前层: 所有入度为 0 的节点
current_level = list(queue)
execution_levels.append(current_level)
queue.clear()
# 执行当前层的节点后,更新下游节点的入度
for node in current_level:
for downstream in self.edges[node]:
in_deg[downstream] -= 1
if in_deg[downstream] == 0:
queue.append(downstream)
return execution_levels
def run(self, initial_input):
"""执行 DAG 编排
流程:
1. 对 DAG 进行拓扑排序,得到执行层级
2. 逐层执行: 同一层的节点可以并行执行
3. 层间串行: 上一层全部完成才能执行下一层
"""
execution_levels = self._topological_sort()
# 存储每个节点的输出(供后续节点使用)
node_outputs = {}
all_results = {}
total_start = time.time()
for level_idx, level in enumerate(execution_levels):
level_start = time.time()
if len(level) == 1:
# ── 单节点: 直接串行执行 ──
node_name = level[0]
node = self.nodes[node_name]
# 收集所有父节点的输出作为输入
parent_outputs = [node_outputs[p] for p in self.reverse_edges[node_name]]
model_input = node['preprocess'](parent_outputs if len(parent_outputs) > 1 else parent_outputs[0] if parent_outputs else initial_input)
# 推理
model_output = node['model'](model_input)
# 后处理并存储
node_outputs[node_name] = node['postprocess'](model_output)
else:
# ── 多节点: 并行执行 ──
threads = []
for node_name in level:
node = self.nodes[node_name]
def execute_node(name, node):
parent_outputs = [node_outputs[p] for p in self.reverse_edges[name]]
model_input = node['preprocess'](parent_outputs if len(parent_outputs) > 1 else parent_outputs[0] if parent_outputs else initial_input)
model_output = node['model'](model_input)
node_outputs[name] = node['postprocess'](model_output)
thread = threading.Thread(target=execute_node, args=(node_name, node))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
level_latency = (time.time() - level_start) * 1000
all_results[f"level_{level_idx}"] = {
'nodes': level,
'latency_ms': level_latency
}
total_latency = (time.time() - total_start) * 1000
return node_outputs, {
'levels': all_results,
'total_latency_ms': total_latency
}
# ─── 使用示例:安防场景的 DAG 编排 ───
"""
DAG 结构:
┌─[人脸检测]─→[人脸识别]─┐
│ │
[摄像头] ──┼─[人体检测]─→[行为分析]─┼─→ [结果融合]
│ │
└─[车牌检测]─→[车牌识别]─┘
三个分支并行,最后汇聚到结果融合节点。
"""
# 创建 DAG
dag = DAGOrchestrator()
# 添加节点
dag.add_node("人脸检测", torch.load("face_detector.om"))
dag.add_node("人体检测", torch.load("body_detector.om"))
dag.add_node("车牌检测", torch.load("plate_detector.om"))
dag.add_node("人脸识别", torch.load("face_recognizer.om"))
dag.add_node("行为分析", torch.load("behavior_analyzer.om"))
dag.add_node("车牌识别", torch.load("plate_recognizer.om"))
dag.add_node("结果融合", None) # 融合节点可以是一个简单的合并函数
# 添加依赖边
dag.add_edge("人脸检测", "人脸识别")
dag.add_edge("人体检测", "行为分析")
dag.add_edge("车牌检测", "车牌识别")
dag.add_edge("人脸识别", "结果融合")
dag.add_edge("行为分析", "结果融合")
dag.add_edge("车牌识别", "结果融合")
# 执行
results, timing = dag.run(camera_frame)
print(f"总耗时: {timing['total_latency_ms']:.2f}ms")
for level, info in timing['levels'].items():
print(f" {level}: {info['nodes']} → {info['latency_ms']:.2f}ms")
四、显存共享策略
4.1 为什么需要关注显存
NPU 显存是有限资源。以 Atlas 500 为例,它只有有限的片上存储。如果多个模型同时驻留显存,很容易出现显存不足的问题。
核心思路: 不是所有模型都需要同时在显存中。串行编排时,模型 A 执行完就可以把显存释放给模型 B 使用。
4.2 显存复用实现
class MemoryEfficientOrchestrator:
"""显存高效的编排器
核心策略:
1. 串行模型共享显存: A 释放后 B 才加载
2. 并行模型分配不同显存区域
3. 中间结果及时释放
"""
def __init__(self, total_memory_gb=4.0):
self.total_memory = total_memory_gb
self.allocated = 0
def load_model_with_memory_check(self, model_path, required_memory_gb):
"""带显存检查的模型加载
如果显存不够,先卸载已加载的模型,再加载新模型。
这种策略在串行编排中非常有效。
"""
if self.allocated + required_memory_gb > self.total_memory:
print(f"显存不足: 需要 {required_memory_gb}GB, 可用 {self.total_memory - self.allocated}GB")
print("卸载已有模型释放显存...")
self._unload_all_models()
model = torch.load(model_path)
self.allocated += required_memory_gb
print(f"加载模型: {model_path}, 使用显存: {required_memory_gb}GB, 总占用: {self.allocated}GB")
return model
def _unload_all_models(self):
"""卸载所有模型"""
# 实际实现中会调用具体的卸载逻辑
self.allocated = 0
print("已卸载所有模型")
def sequential_memory_reuse(self, model_configs, input_data):
"""串行编排 + 显存复用
模型按顺序执行,前一个模型的显存会在后一个模型加载前释放。
这样显存峰值只需要容纳最大的那个模型。
"""
result = input_data
for config in model_configs:
# 加载模型(可能需要先卸载之前的)
model = self.load_model_with_memory_check(
config['path'],
config['memory_gb']
)
# 执行推理
with torch.npu.stream(torch.npu.Stream()):
result = model(result)
# 推理完成后立即卸载,释放显存
self._unload_model(model)
print(f"完成阶段: {config['name']}, 已卸载释放显存")
return result
def _unload_model(self, model):
"""卸载单个模型"""
del model
# 实际实现中还需要调用 torch.npu.empty_cache() 等
五、完整业务场景示例
5.1 智能安防编排方案
def create_security_orchestrator():
"""智能安防场景的完整编排方案
业务需求:
1. 实时检测画面中的人脸、人体、车辆
2. 对检测到的人脸进行身份识别
3. 对检测到的人体进行行为分析(是否有异常行为)
4. 对检测到的车辆进行车牌识别
5. 所有结果融合后输出告警
编排策略:
- 第一层: 三个检测模型并行(人脸检测、人体检测、车牌检测)
- 第二层: 三个识别/分析模型并行(人脸识别、行为分析、车牌识别)
- 第三层: 结果融合
"""
dag = DAGOrchestrator()
# 第一层: 并行检测
dag.add_node("人脸检测", torch.load("face_detector.om"))
dag.add_node("人体检测", torch.load("body_detector.om"))
dag.add_node("车牌检测", torch.load("plate_detector.om"))
# 第二层: 并行识别(依赖对应的检测结果)
dag.add_node("人脸识别", torch.load("face_recognizer.om"))
dag.add_node("行为分析", torch.load("behavior_analyzer.om"))
dag.add_node("车牌识别", torch.load("plate_recognizer.om"))
# 第三层: 结果融合
dag.add_node("告警判断", torch.load("alert_judger.om"))
# 添加依赖
dag.add_edge("人脸检测", "人脸识别")
dag.add_edge("人体检测", "行为分析")
dag.add_edge("车牌检测", "车牌识别")
dag.add_edge("人脸识别", "告警判断")
dag.add_edge("行为分析", "告警判断")
dag.add_edge("车牌识别", "告警判断")
return dag
# 使用
security_dag = create_security_orchestrator()
results, timing = security_dag.run(camera_frame)
# 判断是否需要告警
alert_result = results['告警判断']
if alert_result.get('alert'):
send_alert(f"检测到异常: {alert_result['description']}")
六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 并行模型 OOM | 多个模型同时占用显存 | 降低并行度或使用更轻量的模型 |
| 编排延迟高 | 串行模型太多 | 识别可并行的阶段并行化 |
| 数据格式不匹配 | 前后处理缺失 | 补充 preprocessor/postprocessor |
| DAG 死循环 | 依赖关系配置错误 | 检查 DAG 是否有环 |
| 模型加载慢 | 模型太大 | 使用模型缓存、预加载 |
相关仓库
- ascend-cl - 推理接口 https://gitee.com/ascend/ascend-cl
- torch_npu - Stream 管理 https://gitee.com/ascend/torch_npu
- ascend-op-builder - 算子构建 https://gitee.com/ascend/ascend-op-builder
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)