[On-Device Deployment Series: Segmentation] Deploying MobileSAM to an Allwinner Development Board
Note: detailed development documentation and reference guides can be found in awnpu_model_zoo\docs.
This article follows the 《NPU开发环境部署参考指南》 (NPU Development Environment Deployment Reference Guide): the PC runs Ubuntu, and the provided Docker image environment is used as the example throughout.
For a more detailed walkthrough of the deployment flow, see the 《NPU_模型部署_开发指南》 (NPU Model Deployment Development Guide).
Important: for models not included in awnpu_model_zoo, you can adapt a model of the corresponding family under awnpu_model_zoo\examples. This requires writing the model's C++ pre/post-processing code yourself and modifying the relevant configuration files (e.g. model_config.h, CMakeLists.txt). If model export or quantization fails, prune the model accordingly or try a different quantization scheme. The MobileSAM build in this article has been verified with on-board inference.
Since the NPU does not yet support dynamic inputs, we cannot combine a box with an arbitrary number of points the way the official model does. Instead we use an ROI approach: the prompt is a single drawn box.
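In the SAM ONNX decoder, a box prompt is encoded as two labeled points: the top-left corner gets label 2 and the bottom-right corner gets label 3 (the convention from the official segment-anything ONNX example). A minimal sketch of how the demo below builds these inputs:
import numpy as np

box = np.array([190, 70, 460, 280], dtype=np.float32)  # [x1, y1, x2, y2]
point_coords = box.reshape(1, 2, 2)                    # (1, 2, 2): top-left, then bottom-right
point_labels = np.array([[2, 3]], dtype=np.float32)    # 2 = box top-left, 3 = box bottom-right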
The mobilesam directory structure is as follows:
.
├── CMakeLists.txt
├── decoder_convert_model # decoder model conversion
│ ├── config_yml.py
│ └── convert_model_env.sh
├── encoder_convert_model # encoder model conversion
│ ├── config_yml.py
│ ├── convert_model_env.sh
│ └── python
│ ├── export_onnx_models.py # model export
│ └── predict.py # inference
├── main.cpp
├── mobilesam_post.cpp
├── mobilesam_pre.cpp
├── model
│ └── picture1.jpg
└── model_config.h
Environment Setup
The environment setup is identical for all models and uses a Docker container. For details, see the earlier YOLOX deployment article, [On-Device YOLO Deployment Series] Deploying YOLOX to the Allwinner T736 Development Board:
https://blog.csdn.net/troyteng/article/details/155444386?spm=1011.2124.3001.6209
Download the image file and AWNPU_Model_Zoo, then create your own container.
Model Preparation
The original model is mobile_sam.pt, found in the weights directory of the MobileSAM source repository.
Download the source code and set up the corresponding environment; conda is recommended.
Note: if you only want to run the demo, pre-simplified models are available for direct download:
# mobilesam_decoder_sim.onnx, input size 448*448
http://netstorage.allwinnertech.com:5000/sharing/rTvBQcLbX
# mobilesam_encoder.onnx, input size 448*448
http://netstorage.allwinnertech.com:5000/sharing/afYWhQxe3
# mobilesam_encoder.onnx, input size 1024*1024
http://netstorage.allwinnertech.com:5000/sharing/gMLYrEd7s
# mobilesam_decoder.onnx, input size 1024*1024
http://netstorage.allwinnertech.com:5000/sharing/8o2UshoO6
Model Export
Export the image encoder and the mask decoder from the source model mobile_sam.pt as ONNX models. The image encoder builder lives in /mobile_sam/build_sam.py and offers four ViT variants: ViT-Huge, ViT-Large, ViT-Base, and TinyViT. TinyViT is optimized for mobile devices with a smaller architecture, so we export it as the image encoder. To further improve on-board inference speed, the original input size of 1024*1024 can be reduced to 448*448: find the build_sam_vit_t function and modify the code as follows.
def build_sam_vit_t(checkpoint=None):
prompt_embed_dim = 256
    # change img_size to 448
image_size = 448
vit_patch_size = 16
image_embedding_size = image_size // vit_patch_size
mobile_sam = Sam(
        # change img_size to 448
image_encoder=TinyViT(img_size=448, in_chans=3, num_classes=1000,
embed_dims=[64, 128, 160, 320],
depths=[2, 2, 6, 2],
num_heads=[2, 4, 5, 10],
window_sizes=[7, 7, 14, 7],
...
Create a python directory under encoder_convert_model and place the scripts there. Model export script: export_onnx_models.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import torch
from mobile_sam import sam_model_registry
from mobile_sam.utils.onnx import SamOnnxModel
import argparse
import warnings
class SamImageEncoderOnnxModel(torch.nn.Module):
"""
This model should not be called directly, but is used in ONNX export.
It wraps the image encoder of Sam, with some functions modified to enable model tracing.
"""
def __init__(self, model):
super().__init__()
self.image_encoder = model.image_encoder
@torch.no_grad()
def forward(self, image):
"""Directly input preprocessed image, without preprocessing steps"""
image_embeddings = self.image_encoder(image)
return image_embeddings
try:
import onnxruntime # type: ignore
onnxruntime_exists = True
except ImportError:
onnxruntime_exists = False
parser = argparse.ArgumentParser(
description="Export the SAM prompt encoder and mask decoder to an ONNX model."
)
parser.add_argument(
"--checkpoint", type=str, required=True, help="The path to the SAM model checkpoint."
)
parser.add_argument(
"--output", type=str, required=True, help="The filename to save the ONNX model to."
)
parser.add_argument(
"--model-type",
type=str,
required=True,
help="In ['default', 'vit_h', 'vit_l', 'vit_b']. Which type of SAM model to export.",
)
parser.add_argument(
"--return-single-mask",
action="store_true",
help=(
"If true, the exported ONNX model will only return the best mask, "
"instead of returning multiple masks. For high resolution images "
"this can improve runtime when upscaling masks is expensive."
),
)
parser.add_argument(
"--opset",
type=int,
default=16,
help="The ONNX opset version to use. Must be >=11",
)
parser.add_argument(
"--quantize-out",
type=str,
default=None,
help=(
"If set, will quantize the model and save it with this name. "
"Quantization is performed with quantize_dynamic from onnxruntime.quantization.quantize."
),
)
parser.add_argument(
"--gelu-approximate",
action="store_true",
help=(
"Replace GELU operations with approximations using tanh. Useful "
"for some runtimes that have slow or unimplemented erf ops, used in GELU."
),
)
parser.add_argument(
"--use-stability-score",
action="store_true",
help=(
"Replaces the model's predicted mask quality score with the stability "
"score calculated on the low resolution masks using an offset of 1.0. "
),
)
parser.add_argument(
"--return-extra-metrics",
action="store_true",
help=(
"The model will return five results: (masks, scores, stability_scores, "
"areas, low_res_logits) instead of the usual three. This can be "
"significantly slower for high resolution outputs."
),
)
parser.add_argument(
"--export-image-encoder",
action="store_true",
default=True,
help=(
"If true, export the image encoder as a separate ONNX model."
),
)
parser.add_argument(
"--image-encoder-output",
type=str,
default=None,
help=(
"The filename to save the image encoder ONNX model to. "
"If not specified, it will be generated from the main output path."
),
)
parser.add_argument(
"--fixed-image-size",
type=str,
default=None,
help=(
"Fixed image size for the exported model (format: height,width). "
"If specified, the model will have fixed input dimensions instead of dynamic ones. "
"Example: --fixed-image-size 448,448"
),
)
def run_export(
model_type: str,
checkpoint: str,
output: str,
opset: int,
return_single_mask: bool,
gelu_approximate: bool = False,
use_stability_score: bool = False,
return_extra_metrics=False,
export_image_encoder: bool = True,
image_encoder_output: str = None,
fixed_image_size: str = None,
):
print("Loading model...")
sam = sam_model_registry[model_type](checkpoint=checkpoint)
onnx_model = SamOnnxModel(
model=sam,
return_single_mask=return_single_mask,
use_stability_score=use_stability_score,
return_extra_metrics=return_extra_metrics,
)
if gelu_approximate:
for n, m in onnx_model.named_modules():
if isinstance(m, torch.nn.GELU):
m.approximate = "tanh"
# Set dynamic axes
dynamic_axes = {
"point_coords": {1: "num_points"},
"point_labels": {1: "num_points"},
}
embed_dim = sam.prompt_encoder.embed_dim
embed_size = sam.prompt_encoder.image_embedding_size
mask_input_size = [4 * x for x in embed_size]
# Create dummy inputs (using 2 points as example)
dummy_inputs = {
"image_embeddings": torch.randn(1, embed_dim, *embed_size, dtype=torch.float),
"point_coords": torch.randint(low=0, high=448, size=(1, 2, 2), dtype=torch.float),
"point_labels": torch.randint(low=0, high=4, size=(1, 2), dtype=torch.float),
"mask_input": torch.randn(1, 1, *mask_input_size, dtype=torch.float),
"has_mask_input": torch.tensor([1], dtype=torch.float),
"orig_im_size": torch.tensor([1500, 2250], dtype=torch.float),
}
_ = onnx_model(**dummy_inputs)
output_names = ["masks", "iou_predictions", "low_res_masks"]
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=torch.jit.TracerWarning)
warnings.filterwarnings("ignore", category=UserWarning)
with open(output, "wb") as f:
print(f"Exporting onnx model to {output}...")
torch.onnx.export(
onnx_model,
tuple(dummy_inputs.values()),
f,
export_params=True,
verbose=False,
opset_version=opset,
do_constant_folding=True,
input_names=list(dummy_inputs.keys()),
output_names=output_names,
dynamic_axes=dynamic_axes,
)
if onnxruntime_exists:
ort_inputs = {k: to_numpy(v) for k, v in dummy_inputs.items()}
# set cpu provider default
providers = ["CPUExecutionProvider"]
ort_session = onnxruntime.InferenceSession(output, providers=providers)
_ = ort_session.run(None, ort_inputs)
print("Model has successfully been run with ONNXRuntime.")
# Export image encoder model
if export_image_encoder:
if image_encoder_output is None:
# Generate image encoder output path from the main output path
image_encoder_output = output.replace(".onnx", "_image_encoder.onnx")
print("Exporting image encoder model...")
image_encoder_onnx_model = SamImageEncoderOnnxModel(sam)
# Dummy input for image encoder: batch_size=1, channels=3, height=img_size, width=img_size
img_size = sam.image_encoder.img_size
# Handle fixed image size
if fixed_image_size:
            # Parse the fixed size
height, width = map(int, fixed_image_size.split(","))
print(f"Using fixed image size: {height}x{width}")
dummy_image = torch.randn(1, 3, height, width, dtype=torch.float)
            # Do not use dynamic axes
dynamic_axes = None
else:
# Use default size
dummy_image = torch.randn(1, 3, img_size, img_size, dtype=torch.float)
# Use dynamic axes
dynamic_axes = {
"image": {2: "height", 3: "width"},
}
# Export image encoder
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=torch.jit.TracerWarning)
warnings.filterwarnings("ignore", category=UserWarning)
with open(image_encoder_output, "wb") as f:
print(f"Exporting image encoder onnx model to {image_encoder_output}...")
torch.onnx.export(
image_encoder_onnx_model,
dummy_image,
f,
export_params=True,
verbose=False,
opset_version=opset,
do_constant_folding=True,
input_names=["image"],
output_names=["image_embeddings"],
dynamic_axes=dynamic_axes,
)
# Validate image encoder model
if onnxruntime_exists:
ort_inputs = {"image": to_numpy(dummy_image)}
providers = ["CPUExecutionProvider"]
ort_session = onnxruntime.InferenceSession(image_encoder_output, providers=providers)
_ = ort_session.run(None, ort_inputs)
print("Image encoder model has successfully been run with ONNXRuntime.")
def to_numpy(tensor):
return tensor.cpu().numpy()
if __name__ == "__main__":
args = parser.parse_args()
run_export(
model_type=args.model_type,
checkpoint=args.checkpoint,
output=args.output,
opset=args.opset,
return_single_mask=args.return_single_mask,
gelu_approximate=args.gelu_approximate,
use_stability_score=args.use_stability_score,
return_extra_metrics=args.return_extra_metrics,
export_image_encoder=args.export_image_encoder,
image_encoder_output=args.image_encoder_output,
fixed_image_size=args.fixed_image_size,
)
if args.quantize_out is not None:
assert onnxruntime_exists, "onnxruntime is required to quantize the model."
from onnxruntime.quantization import QuantType # type: ignore
from onnxruntime.quantization.quantize import quantize_dynamic # type: ignore
print(f"Quantizing model and writing to {args.quantize_out}...")
quantize_dynamic(
model_input=args.output,
model_output=args.quantize_out,
optimize_model=True,
per_channel=False,
reduce_range=False,
weight_type=QuantType.QUInt8,
)
print("Done!")
Export command
# Export the mask decoder (--output) and the image encoder (--image-encoder-output)
python export_onnx_models.py \
--checkpoint ./mobile_sam.pt \
--output ./../../decoder_convert_model/mobilesam_decoder.onnx \
--model-type vit_t \
--image-encoder-output ./../mobilesam_encoder.onnx \
--fixed-image-size 448,448 \
--return-single-mask
Model Modification
Use the onnx-modifier tool to remove the orig_im_size input from the mobilesam_decoder model; the link below includes detailed usage instructions. Once the orig_im_size input is deleted, the masks output is removed automatically as well.
orig_im_size is the size of the original input image: the decoder uses it internally to rescale the generated masks from the embedding-space resolution back to the original image resolution. In deployment, the input size is fixed at the preprocessing stage. If the model keeps this dynamic input, the inference engine may be unable to apply static optimizations, and every inference call must pass this extra scalar, complicating the application code. Removing the input moves this "post-processing" out of the model: the inference engine only computes the mask tensor at the embedding-space resolution, and the final rescale is handled on the CPU with OpenCV or a custom operator. This generally yields higher throughput and a simpler model graph.
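For example, the rescale that the deleted graph tail used to perform takes only a couple of OpenCV calls on the CPU; this sketch mirrors what both the Python demo and mobilesam_post.cpp below do:
import cv2

def upscale_mask(low_res_mask, orig_h, orig_w, threshold=0.0):
    # low_res_mask: (112, 112) float logits straight from the decoder
    mask = cv2.resize(low_res_mask, (orig_w, orig_h), interpolation=cv2.INTER_LINEAR)
    return mask > threshold  # binary mask at the original image resolution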
# Where to get the onnx-modifier tool
https://github.com/ZhangGe6/onnx-modifier
# Install onnx-modifier directly into the conda environment
pip install onnx-modifier
# Launch the tool
onnx-modifier
(screenshot: the onnx-modifier web UI)
Open the web page and load the exported decoder ONNX model, then delete the orig_im_size branch (bottom right of the graph) in this order: click the Cast node and delete it together with its child nodes, then delete the Shape node and its children the same way (see the second screenshot). Tick "clean up" and click Download to save. The model is written to the modified_onnx directory under the onnx-modifier installation directory.
(screenshot: deleting the orig_im_size branch in onnx-modifier)
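To confirm the edit took effect, list the remaining graph inputs and outputs with the onnx package (a quick sketch; adjust the path to wherever the modified model was saved):
import onnx

model = onnx.load("modified_onnx/mobilesam_decoder.onnx")
print("inputs: ", [i.name for i in model.graph.input])   # orig_im_size should be gone
print("outputs:", [o.name for o in model.graph.output])  # masks gone; iou_predictions and low_res_masks remain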
Fixing the Input Size
As mentioned above, the NPU does not support dynamic shapes, so fix num_points of the decoder's point_labels and point_coords inputs to 2. This represents a single box input, given as the top-left and bottom-right pixel coordinates in the original image.
# Put the modified model into the decoder_convert_model directory
# Enter that directory
cd ../../decoder_convert_model/
python3 -m onnxsim mobilesam_decoder.onnx mobilesam_decoder_sim.onnx --overwrite-input-shape point_coords:1,2,2 point_labels:1,2 --no-large-tensor
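After simplification, every input should report fully static dimensions, which can be checked quickly with the onnx package:
import onnx

model = onnx.load("mobilesam_decoder_sim.onnx")
for inp in model.graph.input:
    print(inp.name, [d.dim_value for d in inp.type.tensor_type.shape.dim])
# expected: point_coords [1, 2, 2] and point_labels [1, 2]; a 0 would indicate a remaining dynamic dim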
At this point the image encoder model (mobilesam_encoder) and the decoder model (mobilesam_decoder) are both ready.
Running the Python Demo
You can run inference with Python in the conda environment to verify that the models are correct. Place the script below (predict.py) in the encoder_convert_model/python directory.
# MobileSAM ONNX inference example (using ONNX encoder)
import numpy as np
import cv2
import os
import onnxruntime
def apply_coords(coords, original_size):
"""
Apply coordinate transformation to convert original image coordinates to model input coordinates
Args:
coords: Original coordinates, shape (N, 2)
original_size: Original image size, shape (H, W)
Returns:
Transformed coordinates, shape (N, 2)
"""
# input 448x448
target_size = 448
scale_h = target_size / original_size[0]
scale_w = target_size / original_size[1]
coords = coords * np.array([scale_w, scale_h], dtype=np.float32)
return coords
# Image preprocessing function
def preprocess_image(image):
"""
Preprocess image to adapt to ONNX encoder input
Args:
image: Original image (H, W, 3)
Returns:
Preprocessed image (1, 3, 448, 448)
"""
# Resize image to 448x448
target_size = 448
image = cv2.resize(image, (target_size, target_size))
# Normalize to [0, 1]
image = image.astype(np.float32) / 255.0
# Change channel order: HWC -> CHW
image = np.transpose(image, (2, 0, 1))
# Standardize
pixel_mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape(3, 1, 1)
pixel_std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape(3, 1, 1)
image = (image - pixel_mean) / pixel_std
# Add batch dimension
image = np.expand_dims(image, axis=0)
return image
def postprocess_mask(mask, original_size):
mask_resized = cv2.resize(mask, (original_size[1], original_size[0]), interpolation=cv2.INTER_LINEAR)
return mask_resized
def process_masks(masks, original_size, threshold=0.0):
"""
Process model output masks
Args:
masks: Model output masks (1, N, H_model, W_model)
original_size: Original image size (H_original, W_original)
threshold: Binarization threshold
Returns:
Processed mask (H_original, W_original)
"""
mask = masks[0][0]
mask_resized = postprocess_mask(mask, original_size)
mask_binary = mask_resized > threshold
return mask_binary
def main():
print("Loading ONNX encoder...")
encoder_model_path = "./../mobilesam_encoder.onnx"
encoder_session = onnxruntime.InferenceSession(encoder_model_path)
print("Loading ONNX decoder...")
decoder_model_path = "./../../decoder_convert_model/mobilesam_decoder_sim.onnx"
decoder_session = onnxruntime.InferenceSession(decoder_model_path)
image_path = "./../../model/picture1.jpg"
image = cv2.imread(image_path)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
original_size = image.shape[:2] # (H, W)
print("Getting image embedding...")
preprocessed_image = preprocess_image(image)
encoder_inputs = {"image": preprocessed_image}
image_embedding = encoder_session.run(None, encoder_inputs)[0]
print(f"Image embedding shape: {image_embedding.shape}")
input_box = np.array([190, 70, 460, 280]) # [x1, y1, x2, y2]
point_coords = np.array([[input_box[0], input_box[1]],
[input_box[2], input_box[3]]], dtype=np.float32)
point_coords = apply_coords(point_coords, original_size)
point_coords = point_coords.astype(np.float32)
point_coords = np.expand_dims(point_coords, axis=0) # (1, 2, 2)
point_labels = np.array([[2, 3]], dtype=np.float32) # (1, 2)
onnx_mask_input = np.zeros((1, 1, 112, 112), dtype=np.float32)
onnx_has_mask_input = np.zeros(1, dtype=np.float32)
ort_inputs = {
"image_embeddings": image_embedding,
"point_coords": point_coords,
"point_labels": point_labels,
"mask_input": onnx_mask_input,
"has_mask_input": onnx_has_mask_input
}
print("Running ONNX inference...")
iou_predictions, low_res_masks = decoder_session.run(None, ort_inputs)
print(f"low_res_masks shape: {low_res_masks.shape}")
print(f"iou_predictions shape: {iou_predictions.shape}")
best_mask = process_masks(low_res_masks, original_size)
result_image = image.copy()
mask_uint8 = (best_mask * 255).astype(np.uint8)
color = np.array([0, 255, 0])
colored_mask = np.zeros_like(image)
colored_mask[mask_uint8 > 0] = color
result_image = cv2.addWeighted(result_image, 0.7, colored_mask, 0.3, 0)
cv2.rectangle(result_image,
(input_box[0], input_box[1]),
(input_box[2], input_box[3]),
(0, 255, 0), 2)
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "result.jpg")
cv2.imwrite(output_path, cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR))
print(f"Result saved to {output_path}")
if __name__ == "__main__":
main()
To run inference, place the encoder and decoder models in the encoder_convert_model and decoder_convert_model directories respectively, or edit the model paths in the Python script. Use the official picture1 image, since the ROI region is hard-coded.
# Enter the directory and run inference
cd ../encoder_convert_model/python
python predict.py
# The output is saved to ./output/result.jpg
The result:
Note: this is the result with a model input size of 448*448.
(result image, 448*448 input)
Model Configuration
Encoder model configuration, config_yml.py:
#!/usr/bin/env python3
import os
import sys
from acuitylib.vsi_nn import VSInn
import numpy as np
# "database" allowed types: "TEXT, NPY, H5FS, SQLITE, LMDB, GENERATOR, ZIP"
DATASET = '../../dataset/mobliesam/encoder/dataset.txt'
DATASET_TYPE = "TEXT"
# mean, scale
MEAN = [123.675, 116.28, 103.53]
SCALE = [0.0171247, 0.017507, 0.0174292]
# reverse_channel: True bgr, False rgb
REVERSE_CHANNEL = False
# add_preproc_node, True or False
ADD_PREPROC_NODE = True
# "preproc_type" allowed types:"IMAGE_RGB, IMAGE_RGB888_PLANAR, IMAGE_RGB888_PLANAR_SEP, IMAGE_I420,
# IMAGE_NV12,IMAGE_NV21, IMAGE_YUV444, IMAGE_YUYV422, IMAGE_UYVY422, IMAGE_GRAY, IMAGE_BGRA, TENSOR"
PREPROC_TYPE = "IMAGE_RGB"
# add_postproc_node, quant output -> float32 output
ADD_POSTPROC_NODE = True
...
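The MEAN and SCALE values above are simply the ImageNet statistics used in predict.py, converted from the [0, 1] domain to the 0-255 pixel domain that the NPU preprocessing node works in. They can be reproduced as follows:
# MEAN = mean * 255, SCALE = 1 / (std * 255), from the ImageNet statistics
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
print([round(m * 255, 3) for m in mean])      # [123.675, 116.28, 103.53]
print([round(1 / (s * 255), 6) for s in std])  # [0.017125, 0.017507, 0.017429], matching SCALE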
Decoder model configuration, config_yml.py:
Since preprocessing (resizing, normalization, etc.) is already done before image encoding, the decoder does not need the preprocessing node enabled; only the postprocessing node is enabled.
#!/usr/bin/env python3
import os
import sys
from acuitylib.vsi_nn import VSInn
import numpy as np
# "database" allowed types: "TEXT, NPY, H5FS, SQLITE, LMDB, GENERATOR, ZIP"
DATASET = ['./../../dataset/mobliesam/decoder/image_embeddings.npy',
'./../../dataset/mobliesam/decoder/point_coords.npy',
'./../../dataset/mobliesam/decoder/point_labels.npy',
'./../../dataset/mobliesam/decoder/mask_input.npy',
'./../../dataset/mobliesam/decoder/has_mask_input.npy']
DATASET_TYPE = ['NPY', 'NPY', 'NPY', 'NPY', 'NPY']
# mean, scale
MEAN = [0, 0, 0]
SCALE = [1.0, 1.0, 1.0]
# reverse_channel: True bgr, False rgb
REVERSE_CHANNEL = False
# add_preproc_node, True or False
ADD_PREPROC_NODE = False
# "preproc_type" allowed types:"IMAGE_RGB, IMAGE_RGB888_PLANAR, IMAGE_RGB888_PLANAR_SEP, IMAGE_I420,
# IMAGE_NV12,IMAGE_NV21, IMAGE_YUV444, IMAGE_YUYV422, IMAGE_UYVY422, IMAGE_GRAY, IMAGE_BGRA, TENSOR"
PREPROC_TYPE = "TENSOR"
# add_postproc_node
ADD_POSTPROC_NODE = True
...
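The decoder calibration tensors listed in DATASET can be dumped from a Python demo run. A minimal sketch (shapes follow the 448*448 setup used in this article; the zero placeholders stand in for the real image_embedding and the scaled point_coords computed in predict.py, and the file names match the config above):
import numpy as np

# Placeholders: in practice reuse the tensors from an actual predict.py run
image_embedding = np.zeros((1, 256, 28, 28), dtype=np.float32)  # ONNX encoder output
point_coords = np.zeros((1, 2, 2), dtype=np.float32)            # box corners mapped to 448 space
np.save("image_embeddings.npy", image_embedding)
np.save("point_coords.npy", point_coords)
np.save("point_labels.npy", np.array([[2, 3]], dtype=np.float32))
np.save("mask_input.npy", np.zeros((1, 1, 112, 112), dtype=np.float32))
np.save("has_mask_input.npy", np.zeros(1, dtype=np.float32))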
Model Pre- and Post-Processing
Configuration file model_config.h:
/****************************************************************************
* model config header file
****************************************************************************/
#ifndef _MODEL_CONFIG_H_
#define _MODEL_CONFIG_H_
#define LETTERBOX_ROWS 448
#define LETTERBOX_COLS 448
#define MOBILESAM_MASK_SIZE 112
#define TOP_LEFT_X 190
#define TOP_LEFT_Y 70
#define BOTTOM_RIGHT_X 460
#define BOTTOM_RIGHT_Y 280
#endif
Preprocessing: mobilesam_pre.cpp
#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <math.h>
#include "model_config.h"
/* model_inputmeta.yml file param modify, eg:
preproc_node_params:
add_preproc_node: true
preproc_type: IMAGE_RGB
*/
void calculate_letterbox_params(int img_rows, int img_cols, int letterbox_rows, int letterbox_cols,
float& scale_letterbox, int& top, int& left, int& resize_rows, int& resize_cols){
if ((letterbox_rows * 1.0 / img_rows) < (letterbox_cols * 1.0 / img_cols)) {
scale_letterbox = letterbox_rows * 1.0 / img_rows;
} else {
scale_letterbox = letterbox_cols * 1.0 / img_cols;
}
resize_cols = int(round(scale_letterbox * img_cols));
resize_rows = int(round(scale_letterbox * img_rows));
float dh = (float)(letterbox_rows - resize_rows);
float dw = (float)(letterbox_cols - resize_cols);
dh /= 2.0f;
dw /= 2.0f;
top = (int)(round(dh - 0.1));
left = (int)(round(dw - 0.1));
}
void calculate_scale_and_padding(const char* image_file, float& scale_letterbox, int& top, int& left) {
cv::Mat sample = cv::imread(image_file, 1);
if (sample.empty()) {
fprintf(stderr, "cv::imread %s failed\n", image_file);
scale_letterbox = 1.0;
top = 0;
left = 0;
return;
}
int resize_rows, resize_cols;
calculate_letterbox_params(sample.rows, sample.cols, LETTERBOX_ROWS, LETTERBOX_COLS,
scale_letterbox, top, left, resize_rows, resize_cols);
}
void get_input_data(const char* image_file, unsigned char* input_data, int letterbox_rows, int letterbox_cols) {
cv::Mat sample = cv::imread(image_file, 1);
if (sample.empty()) {
fprintf(stderr, "cv::imread %s failed\n", image_file);
return;
}
cv::Mat img;
cv::cvtColor(sample, img, cv::COLOR_BGR2RGB);
/* letterbox process to support different letterbox size */
float scale_letterbox;
int top, left, resize_rows, resize_cols;
calculate_letterbox_params(img.rows, img.cols, letterbox_rows, letterbox_cols,
scale_letterbox, top, left, resize_rows, resize_cols);
cv::resize(img, img, cv::Size(resize_cols, resize_rows));
// create a mat with input_data ptr
cv::Mat img_new(letterbox_rows, letterbox_cols, CV_8UC3, input_data);
int bot = (int)(round((float)(letterbox_rows - resize_rows) / 2.0f + 0.1));
int right = (int)(round((float)(letterbox_cols - resize_cols) / 2.0f + 0.1));
// Letterbox filling
cv::copyMakeBorder(img, img_new, top, bot, left, right, cv::BORDER_CONSTANT, cv::Scalar(114, 114, 114));
}
int mobilesam_preprocess(const char* imagepath, void* buff_ptr, unsigned int buff_size) {
int img_c = 3;
// set default letterbox size
int letterbox_rows = LETTERBOX_ROWS;
int letterbox_cols = LETTERBOX_COLS;
int img_size = letterbox_rows * letterbox_cols * img_c;
unsigned int data_size = img_size * sizeof(uint8_t);
if (data_size > buff_size) {
printf("data size > buff size, please check code. data_size=%u, buff_size=%u\n", data_size, buff_size);
return -1;
}
get_input_data(imagepath, (unsigned char*)buff_ptr, letterbox_rows, letterbox_cols);
printf("mobilesam preprocess completed: %s -> %dx%d, buffer size: %u\n",
imagepath, letterbox_cols, letterbox_rows, data_size);
return 0;
}
Postprocessing: mobilesam_post.cpp
#include <opencv2/core/core.hpp>
#include <opencv2/highgui/highgui.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#include <iostream>
#include <stdio.h>
#include <vector>
#include <cmath>
#include <chrono>
#include <fstream>
#include <sstream>
#include "model_config.h"
using namespace std;
cv::Mat postprocess_mask(const cv::Mat& mask, const cv::Size& original_size)
{
cv::Mat mask_resized;
cv::resize(mask, mask_resized, cv::Size(original_size.height, original_size.width), 0, 0, cv::INTER_LINEAR);
return mask_resized;
}
cv::Mat process_masks(const float* mask_data, const float* iou_predictions, int mask_height, int mask_width, const cv::Size& original_size, float threshold = 0.0)
{
cv::Mat mask(mask_height, mask_width, CV_32FC1, (float*)mask_data);
cv::Mat mask_resized = postprocess_mask(mask, original_size);
cv::Mat mask_binary;
mask_binary = (mask_resized > threshold);
mask_binary.convertTo(mask_binary, CV_8UC1, 255);
return mask_binary;
}
int mobilesam_postprocess(const char *imagepath, float **output)
{
cv::Mat image = cv::imread(imagepath);
if (image.empty()) {
fprintf(stderr, "cv::imread %s failed\n", imagepath);
return -1;
}
std::chrono::steady_clock::time_point Tbegin, Tend;
Tbegin = std::chrono::steady_clock::now();
cv::Rect input_box;
cv::Point top_left(TOP_LEFT_X, TOP_LEFT_Y);
cv::Point bottom_right(BOTTOM_RIGHT_X, BOTTOM_RIGHT_Y);
int width = bottom_right.x - top_left.x;
int height = bottom_right.y - top_left.y;
input_box = cv::Rect(top_left.x, top_left.y, width, height);
printf("Input box: (%d, %d, %d, %d)\n", input_box.x, input_box.y, input_box.width, input_box.height);
std::vector<cv::Point> positive_points, negative_points;
    cv::Size original_size(image.rows, image.cols); // note: fields hold (rows, cols); postprocess_mask swaps them back when building the resize target
// output[0]
const float *iou_predictions_data = output[0]; // iou_predictions
// output[1]
const float *low_res_masks_data = output[1]; // low_res_masks
int mask_height = MOBILESAM_MASK_SIZE;
int mask_width = MOBILESAM_MASK_SIZE;
cv::Mat best_mask = process_masks(low_res_masks_data, iou_predictions_data, mask_height, mask_width, original_size);
cv::Mat result_image = image.clone();
cv::Mat colored_mask = cv::Mat::zeros(image.size(), CV_8UC3);
colored_mask.setTo(cv::Scalar(0, 255, 0), best_mask);
cv::addWeighted(result_image, 0.7, colored_mask, 0.3, 0, result_image);
for (const auto& point : positive_points) {
cv::circle(result_image, point, 5, cv::Scalar(0, 255, 0), -1);
}
for (const auto& point : negative_points) {
cv::circle(result_image, point, 5, cv::Scalar(0, 0, 255), -1);
}
if (input_box.width > 0 && input_box.height > 0) {
cv::rectangle(result_image, input_box, cv::Scalar(0, 255, 0), 2);
}
Tend = std::chrono::steady_clock::now();
float f = std::chrono::duration_cast<std::chrono::milliseconds>(Tend - Tbegin).count();
std::cout << "post process time : " << f << " ms" << std::endl;
cv::imwrite("out_mobilesam.jpg", result_image);
// cv::imwrite("out_mobilesam_mask.png", best_mask);
printf("mobilesam_postprocess finished. \n");
return 0;
}
Main program: main.cpp
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <opencv2/core/core.hpp>
#include "npulib.h"
#include "model_config.h"
/*-------------------------------------------
Macros and Variables
-------------------------------------------*/
extern int mobilesam_preprocess(const char* imagepath, void* buff_ptr, unsigned int buff_size);
extern int mobilesam_postprocess(const char *imagepath, float **output);
extern void calculate_scale_and_padding(const char* image_file, float& scale_letterbox, int& top, int& left);
const char *usage =
"model_demo -encoder encoder_model_path -decoder decoder_model_path -i input_path -l loop_run_count -m malloc_mbyte \n"
"-encoder encoder_model_path: the encoder NBG file path.\n"
"-decoder decoder_model_path: the decoder NBG file path.\n"
"-i input_path: the input file path.\n"
"-l loop_run_count: the number of loop run network.\n"
"-m malloc_mbyte: npu_unit init memory Mbytes.\n"
"-h : help\n"
"example: model_demo -encoder encoder.nb -decoder decoder.nb -i input.jpg -l 10 -m 20 \n";
enum time_idx_e {
NPU_INIT = 0,
NETWORK_CREATE,
NETWORK_PREPARE,
NETWORK_PREPROCESS,
NETWORK_RUN,
NETWORK_LOOP,
TIME_IDX_MAX = 9
};
// Convert a single float32 value to its IEEE-754 fp16 (binary16) bit pattern
uint16_t float_to_fp16(float value) {
uint32_t f32 = *(uint32_t*)&value;
uint32_t sign = (f32 >> 31) & 0x1;
uint32_t exponent = (f32 >> 23) & 0xff;
uint32_t mantissa = f32 & 0x7fffff;
if (exponent == 0xff) {
return (sign << 15) | 0x7c00 | (mantissa >> 13);
} else if (exponent == 0) {
if (mantissa == 0) {
return sign << 15;
} else {
int shift = __builtin_clz(mantissa) - 8;
mantissa <<= shift;
exponent = 1 - shift;
return (sign << 15) | ((exponent & 0x1f) << 10) | (mantissa >> 13);
}
} else {
int new_exponent = exponent - 127 + 15;
if (new_exponent > 0x1f) {
return (sign << 15) | 0x7c00;
} else if (new_exponent < 1) {
int shift = 1 - new_exponent;
mantissa = (0x800000 | mantissa) >> shift;
return (sign << 15) | (mantissa >> 13);
} else {
return (sign << 15) | ((new_exponent & 0x1f) << 10) | (mantissa >> 13);
}
}
}
// Convert a float32 array to fp16 (the decoder NBG expects fp16 inputs)
void float_array_to_fp16(const float* float_array, uint16_t* fp16_array, size_t size) {
for (size_t i = 0; i < size; i++) {
fp16_array[i] = float_to_fp16(float_array[i]);
}
}
int main(int argc, char** argv)
{
int status = 0;
int i = 0;
unsigned int count = 0;
long long total_infer_time = 0;
char *encoder_model_file = nullptr;
char *decoder_model_file = nullptr;
char *input_file = nullptr;
unsigned int loop_count = 1;
if (argc < 2) {
printf("%s\n", usage);
return -1;
}
for (i = 0; i< argc; i++) {
if (!strcmp(argv[i], "-encoder")) {
encoder_model_file = argv[++i];
}
else if (!strcmp(argv[i], "-decoder")) {
decoder_model_file = argv[++i];
}
else if (!strcmp(argv[i], "-i")) {
input_file = argv[++i];
}
else if (!strcmp(argv[i], "-l")) {
loop_count = atoi(argv[++i]);
}
else if (!strcmp(argv[i], "-h")) {
printf("%s\n", usage);
return 0;
}
}
printf("encoder_model_file=%s, decoder_model_file=%s, input=%s, loop_count=%d \n", encoder_model_file, decoder_model_file, input_file, loop_count);
if (encoder_model_file == nullptr || decoder_model_file == nullptr)
return -1;
/* NPU init*/
NpuUint npu_uint;
int ret = npu_uint.npu_init();
if (ret != 0) {
return -1;
}
// encoder
NetworkItem encoder;
unsigned int encoder_id = 0;
status = encoder.network_create(encoder_model_file, encoder_id);
if (status != 0) {
printf("encoder network create failed.\n");
return -1;
}
status = encoder.network_prepare();
if (status != 0) {
printf("encoder network prepare fail, status=%d\n", status);
return -1;
}
// decoder
NetworkItem decoder;
unsigned int decoder_id = 1;
status = decoder.network_create(decoder_model_file, decoder_id);
if (status != 0) {
printf("decoder network create failed.\n");
return -1;
}
status = decoder.network_prepare();
if (status != 0) {
printf("decoder network prepare fail, status=%d\n", status);
return -1;
}
TimeBegin(NETWORK_PREPROCESS);
// input jpg file, no copy way
void *input_buffer_ptr = nullptr;
unsigned int input_buffer_size = 0;
encoder.get_network_input_buff_info(0, &input_buffer_ptr, &input_buffer_size);
printf("encoder input buffer ptr: %p, buffer size: %d \n", input_buffer_ptr, input_buffer_size);
mobilesam_preprocess(input_file, input_buffer_ptr, input_buffer_size);
TimeEnd(NETWORK_PREPROCESS);
printf("feed input cost: %lu us.\n", (unsigned long)TimeGet(NETWORK_PREPROCESS));
// create encoder output buffer
int encoder_output_cnt = encoder.get_output_cnt();
float **encoder_output_data = new float*[encoder_output_cnt]();
for (int i = 0; i < encoder_output_cnt; i++)
encoder_output_data[i] = new float[encoder.m_output_data_len[i]];
// create decoder output buffer
int decoder_output_cnt = decoder.get_output_cnt();
float **decoder_output_data = new float*[decoder_output_cnt]();
for (int i = 0; i < decoder_output_cnt; i++) {
decoder_output_data[i] = new float[decoder.m_output_data_len[i]];
}
i = 0;
/* run network */
TimeBegin(NETWORK_LOOP);
while (count < loop_count) {
count++;
// run encoder
status = encoder.network_input_output_set();
if (status != 0) {
printf("set encoder input/output failed.\n");
return -1;
}
#if defined (__linux__)
TimeBegin(NETWORK_RUN);
#endif
status = encoder.network_run();
if (status != 0) {
printf("fail to run encoder, status=%d\n", status);
return -2;
}
#if defined (__linux__)
TimeEnd(NETWORK_RUN);
printf("encoder run time: %lu us.\n", (unsigned long)TimeGet(NETWORK_RUN));
#endif
total_infer_time += (unsigned long)TimeGet(NETWORK_RUN);
// get encoder output
encoder.get_output(encoder_output_data);
// Use the encoder output as the decoder input (1, 256, 28, 28).
void *decoder_input_ptr = nullptr;
unsigned int decoder_input_size = 0;
int ret = decoder.get_network_input_buff_info(0, &decoder_input_ptr, &decoder_input_size);
if (ret == 0 && decoder_input_ptr != nullptr && decoder_input_size > 0) {
// fp32 -> fp16
float_array_to_fp16(encoder_output_data[0], (uint16_t*)decoder_input_ptr, encoder.m_output_data_len[0]);
} else {
printf("Error: Failed to get decoder input 0 buffer info\n");
return -1;
}
// point_coords input (1, 2, 2)
void *point_coords_ptr = nullptr;
unsigned int point_coords_size = 0;
ret = decoder.get_network_input_buff_info(1, &point_coords_ptr, &point_coords_size);
if (ret == 0 && point_coords_ptr != nullptr && point_coords_size > 0) {
float scale_letterbox;
int top, left;
calculate_scale_and_padding(input_file, scale_letterbox, top, left);
float point_coords_float[4] = {
TOP_LEFT_X * scale_letterbox + left, // x1
TOP_LEFT_Y * scale_letterbox + top, // y1
BOTTOM_RIGHT_X * scale_letterbox + left, // x2
BOTTOM_RIGHT_Y * scale_letterbox + top // y2
};
for (int i = 0; i < 4; i++) {
if (point_coords_float[i] < 0.0f) point_coords_float[i] = 0.0f;
if (point_coords_float[i] > LETTERBOX_ROWS) point_coords_float[i] = LETTERBOX_ROWS;
}
// float32 -> fp16
uint16_t point_coords_fp16[4];
float_array_to_fp16(point_coords_float, point_coords_fp16, 4);
memcpy(point_coords_ptr, point_coords_fp16, sizeof(point_coords_fp16));
} else {
printf("Error: Failed to get decoder input 1 buffer info\n");
return -1;
}
// point_labels
void *point_labels_ptr = nullptr;
unsigned int point_labels_size = 0;
ret = decoder.get_network_input_buff_info(2, &point_labels_ptr, &point_labels_size);
if (ret == 0 && point_labels_ptr != nullptr && point_labels_size > 0) {
float point_labels_float[2] = {2.0f, 3.0f};
// float32 -> fp16
uint16_t point_labels_fp16[2];
float_array_to_fp16(point_labels_float, point_labels_fp16, 2);
memcpy(point_labels_ptr, point_labels_fp16, sizeof(point_labels_fp16));
} else {
printf("Error: Failed to get decoder input 2 buffer info\n");
return -1;
}
// mask_input (1, 1, 112, 112)
void *mask_input_ptr = nullptr;
unsigned int mask_input_size = 0;
ret = decoder.get_network_input_buff_info(3, &mask_input_ptr, &mask_input_size);
if (ret == 0 && mask_input_ptr != nullptr && mask_input_size > 0) {
memset(mask_input_ptr, 0, mask_input_size);
} else {
printf("Error: Failed to get decoder input 3 buffer info\n");
return -1;
}
// has_mask_input
void *has_mask_input_ptr = nullptr;
unsigned int has_mask_input_size = 0;
ret = decoder.get_network_input_buff_info(4, &has_mask_input_ptr, &has_mask_input_size);
if (ret == 0 && has_mask_input_ptr != nullptr && has_mask_input_size > 0) {
uint8_t has_mask_input_uint8 = 0;
memcpy(has_mask_input_ptr, &has_mask_input_uint8, sizeof(has_mask_input_uint8));
} else {
printf("Error: Failed to get decoder input 4 buffer info\n");
return -1;
}
// run decoder
status = decoder.network_input_output_set();
if (status != 0) {
printf("set decoder input/output failed.\n");
return -1;
}
#if defined (__linux__)
TimeBegin(NETWORK_RUN);
#endif
status = decoder.network_run();
if (status != 0) {
printf("fail to run decoder, status=%d\n", status);
return -2;
}
#if defined (__linux__)
TimeEnd(NETWORK_RUN);
printf("decoder run time: %lu us.\n", (unsigned long)TimeGet(NETWORK_RUN));
#endif
total_infer_time += (unsigned long)TimeGet(NETWORK_RUN);
// get decoder output
decoder.get_output(decoder_output_data);
// postprocess
mobilesam_postprocess(input_file, decoder_output_data);
}
TimeEnd(NETWORK_LOOP);
if (loop_count > 1) {
printf("this network run avg inference time=%d us, total avg cost: %d us\n",
(uint32_t)(total_infer_time / loop_count), (unsigned int)(TimeGet(NETWORK_LOOP) / loop_count));
}
// free output buffer
for (int i = 0; i < encoder_output_cnt; i++) {
delete[] encoder_output_data[i];
encoder_output_data[i] = nullptr;
}
if (encoder_output_data != nullptr)
delete[] encoder_output_data;
for (int i = 0; i < decoder_output_cnt; i++) {
delete[] decoder_output_data[i];
decoder_output_data[i] = nullptr;
}
if (decoder_output_data != nullptr)
delete[] decoder_output_data;
return ret;
}
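The hand-rolled float_to_fp16 helper above can be cross-checked against numpy's IEEE-754 half type; note that the C code truncates the mantissa instead of rounding to nearest, so a few values may differ by one ulp. A quick sketch:
import numpy as np

# Reference binary16 bit patterns for comparison with float_to_fp16(v)
for v in [0.0, 1.0, -2.5, 448.0, 65504.0]:
    print(v, hex(np.float16(v).view(np.uint16)))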
Model Conversion
Enter the Docker container you created.
Go into the encoder_convert_model and decoder_convert_model directories respectively to convert the two models.
1. Convert the encoder model. The final export step takes a while; just let it run.
# using xxx_env.sh to create softlink
./convert_model_env.sh
# Import
# pegasus_import.sh <model_name>
./pegasus_import.sh mobilesam_encoder
# Quantize
# pegasus_quantize.sh <model_name> <quantize_type> <calibration_set_size>
./pegasus_quantize.sh mobilesam_encoder int16 10
# Inference simulation (optional)
# pegasus_inference.sh <model_name> <quantize_type>
./pegasus_inference.sh mobilesam_encoder int16
# Export the NBG model
# pegasus_export_ovx_nbg.sh <model_name> <quantize_type> <platform>
./pegasus_export_ovx_nbg.sh mobilesam_encoder int16 t736
# The exported model file is stored in the ../model directory
# e.g. ../model/mobilesam_encoder_int16_t736.nb
2. Convert the decoder model.
# using xxx_env.sh to create softlink
./convert_model_env.sh
# Import
# pegasus_import.sh <model_name>
./pegasus_import.sh mobilesam_decoder_sim
# Quantize
# pegasus_quantize.sh <model_name> <quantize_type> <calibration_set_size>
./pegasus_quantize.sh mobilesam_decoder_sim uint8 10
# Inference simulation (optional)
# pegasus_inference.sh <model_name> <quantize_type>
./pegasus_inference.sh mobilesam_decoder_sim uint8
# Export the NBG model
# pegasus_export_ovx_nbg.sh <model_name> <quantize_type> <platform>
./pegasus_export_ovx_nbg.sh mobilesam_decoder_sim uint8 t736
# The exported model file is stored in the ../model directory
# e.g. ../model/mobilesam_decoder_sim_uint8_t736.nb
Cross-Compilation
For the build toolchain, refer to the other model examples; the build commands are given directly here:
cd ../examples/mobilesam/
./../build_linux.sh -t t736
An install directory is generated under ./examples/mobilesam/ with the following structure:
`-- mobilesam_demo_linux_t736
|-- mobilesam_demo_t736
`-- model
|-- mobilesam_decoder_sim_uint8_t736.nb
|-- mobilesam_encoder_int16_t736.nb
`-- picture1.jpg
Model Inference
Push the generated files to the development board (adb is used here, but any transfer method works):
adb push install\mobilesam_demo_linux_t736 /mnt/UDISK/
Run inference:
./mobilesam_demo_t736 -encoder model/mobilesam_encoder_int16_t736.nb -decoder model/mobilesam_decoder_sim_uint8_t736.nb -i model/picture1.jpg
The result on the board:
(board inference result, 448*448 input)
Note: because the original input size of 1024×1024 was reduced to 448×448, the segmentation edges lose some accuracy. If accuracy matters, use the model's original input size and update the configuration file `model_config.h` accordingly (LETTERBOX_ROWS/LETTERBOX_COLS become 1024 and MOBILESAM_MASK_SIZE becomes 256, since the low-resolution mask is one quarter of the input size).
The result with the original 1024*1024 input size:
(board inference result, 1024*1024 input)