1. 背景

需求:针对视频形式的数据输入,对每一帧图像,有多个神经网络模型需要进行推理并获得预测结果。如何让整个推理过程更加高效,尝试了几种不同的方案。

硬件:单显卡主机。

2. 方案

由于存在多个模型需要推理,但模型之间没有相互依赖关系,因此很容易想到通过并行的方式来提高运行效率。

对比了如下几种方案的结果,包括:

  1. 串行
  2. 线程
  3. 进程
  4. 协程

3. 实现

3.1 整体流程

配置了 4 个体量相近的模型。
为了屏蔽读取和解码的时间消耗对最终结果的影响,提前读取视频并准备输入。
统计每个单独模型执行推理的累积时间,以及整体的运行时间。

import asyncio
from time import time

def main():
    frames = load_video()
    weights = load_weights()

    print('串行:')
    one_by_one(weights, frames)
    print('多线程:')
    multit_thread(weights, frames)
    print('多进程:')
    multi_process(weights, frames)
    print('协程:')
    asyncio.run(coroutine(weights, frames))

3.2 串行

读取到当前帧数据后,所有模型依次运行。

def one_by_one(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    costs = [[] for _ in range(len(weights))]
    since_infer = time()
    for frame in frames:
        for session in sessions:
            since = time()
            _ = session.run('output', {'input': frame})
            cost = time() - since
            costs[idx].append(cost)
    print([sum(cost) for cost in costs])
    print("infer:", time() - since_infer)
    return

3.3 多线程

为每一个模型分配一个线程。

from threading import Thread

def multit_thread(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    threads = []
    since_infer = time()
    for session in sessions:
        thread = Thread(target=run_session_thread, args=(session, frames))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    print("infer:", time() - since_infer)
    return

def run_session_thread(session, frames):
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.4 多进程

为每一个模型分配一个进程。
由于 session 不能在进程间传递,因此需要在每个进程的内部单独初始化。如果数据较多,这部分初始化的时间消耗基本可以忽略不急。

from multiprocessing import Manager, Process

def multi_process(weights, frames):
    inputs = Manager().list(frames)
    processes = []
    since_infer = time()
    for weight in weights:
        process = Process(target=run_session_process, args=(weight, inputs))
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
    print("infer:", time() - since_infer)
    return

def run_session_process(weight, frames):
    session = init_session(weight)
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.5 协程

为每一个模型分配一个协程。

async def coroutine(weights, frames):
    sessions = [init_session(weight) for weight in weights]
    since_infer = time()
    tasks = [
        asyncio.create_task(run_session_coroutine(session, frames))
        for session in sessions
    ]
    for task in tasks:
        await task
    print("infer:", time() - since_all)
    return

async def run_session_coroutine(session, frames):
    costs = []
    for frame in frames:
        since = time()
        _ = session.run('output', {'input': frame})
        costs.append(time() - since)
    print(sum(costs))
    return

3.6 其他辅助函数

import cv2
import numpy as np
import onnxruntime as ort

def init_session(weight):
    provider = "CUDAExecutionProvider"
    session = ort.InferenceSession(weight, providers=[provider])
    return session

def load_video():
    # 为了减少读视频的时间,复制相同的图片组成batch
    vcap = cv2.VideoCapture('path_to_video')
    count = 1000
    batch_size = 4
    frames = []
    for _ in range(count):
        _, frame = vcap.read()
        frame = cv2.resize(frame, (256, 256)).transpose((2, 0, 1))
        frame = np.stack([frame] * batch_size, axis=0)
        frames.append(frame.astype(np.float32))
    return frames

def load_weights():
    return ['path_to_weights_0',
            'path_to_weights_1',
            'path_to_weights_2',
            'path_to_weights_3',]

4. 结果及分析

4.1 执行结果

batch_size=4共运行 1000 帧数据,推理结果如下:

方案串行线程进程协程
单模型累积时间/s7.9/5.3/5.2/5.213.5/13.5/15.6/15.713.5/13.8/13.7/13.66.5/5.2/5.3/5.3
总时间/s23.715.830.122.5
显存占用/MB1280141633751280
平均 GPU-Util约 60%约 85%约 70%约 55%
  • 在这个场景下,多线程是综合效率最高的方式(时间最短、显存占用合理、GPU 利用率最高);
  • 串行作为最基础的方案,总时间就是每个模型执行时间之和;
  • 多进程的方式,单模型的累积时间与多线程类似,但是总时间有明显增加,且极大增加了显存占用;
  • 用协程的方式,总结果看,与串行模式本质上是一样的。

4.2 结果分析

4.2.1 关于线程方案

为什么多线程相比串行可以提高运行效率?

  • 基本的判断是,session.run()函数运行时,既有 CPU 执行的部分,又有 GPU 执行的部分;
  • 如果是串行方案,则 CPU 运行时,GPU 会等待,反之亦然;
  • 当换用多线程方案后,当一个线程从 CPU 执行切换到 GPU 执行后,会继续执行另一个线程的 CPU 部分,并等待 GPU 返回结果。
4.2.2 关于进程方案

为什么多进程反而降低了运行效率?

  • 基本的判断是,整体执行的瓶颈并不在 CPU 的运算部分,而是在于 GPU 上模型前向推理的计算部分;
  • 因此,用多个进程并没有充分利用系统资源,多个 CPU 核心会争夺同一个 GPU 的计算资源,并增加了调度消耗。
4.2.3 关于协程方案

为什么看起来协程与串行的效果一样?

协程方案在执行过程中,从表现上来看:

  • 单个模型的累积时间是逐步print出来的,间隔大致等于每个模型的累积时间(而线程和进程方案中,几乎是同时输出 4 个模型的累积时间,说明是同时运行结束);
  • 显存占用是逐步增加的,最后达到与串行方案一致。

可能的原因:

  • CPU 和 GPU 的任务切换,可能无法触发协程的切换,导致最终的效果是,一个模型完成了所有数据的推理后,再进行下一个模型的推理。

使用协程的必要性:

  • 从线程改为协程,是为了进一步降低线程切换的消耗;
  • 在这个场景下,需要同时执行推理的模型数量一般不会太多,建立同样数量的线程,系统资源的消耗是可控的;
  • 因此,没有使用协程的必要性。
GitHub 加速计划 / on / onnxruntime
13.76 K
2.79 K
下载
microsoft/onnxruntime: 是一个用于运行各种机器学习模型的开源库。适合对机器学习和深度学习有兴趣的人,特别是在开发和部署机器学习模型时需要处理各种不同框架和算子的人。特点是支持多种机器学习框架和算子,包括 TensorFlow、PyTorch、Caffe 等,具有高性能和广泛的兼容性。
最近提交(Master分支:2 个月前 )
59280095 Adds support for einsum via WebNN matmul, transpose, reshape, reducesum, identity and element-wise binary ops. 5 天前
c73a3d18 ### Description A breakdown PR of https://github.com/microsoft/onnxruntime/pull/22651 ### Motivation and Context <!-- - Why is this change required? What problem does it solve? - If it fixes an open issue, please link to the issue here. --> 5 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐