本文介绍一个基于 YOLO 目标检测模型和 OpenCV 图像处理技术实现的瓶装液位检测方法。

1.前言

1.1yolo源码+环境配置

官方地址:https://github.com/ultralytics/ultralytics

环境配置:https://docs.ultralytics.com/zh/quickstart/

1.2模型训练

需要训练两个模型。

bottle.pt:负责瓶体检测与跟踪

roi_level.pt:负责液位区域定位。

两个模型均通过 Ultralytics YOLO 框架训练,并在推理阶段依次调用。

bottle.pt  数据集和标签如下图:

          

                                           数据集                                                                      标签

roi_level.pt  数据集和标签如下图:

该数据集需要把图片裁剪到瓶身大小。

               

                                           数据集                                                                标签

1.3源码

https://pan.baidu.com/s/1SIjfKDKldKVd3yWy5pqIhQ?pwd=kmd2

2核心功能

2.1总流程

视频输入 ➡ YOLO瓶体检测与跟踪 ➡ ROI检测 ➡ 液位检测算法 ➡ OK/NG判定 ➡ 统计与可视化输出

2.2瓶子检测与跟踪

通过 bottle_model.pt 模型检测图像中的瓶子并进行跟踪。bottle_detect_and_track()

class BottleTracker:

    def bottle_detect_and_track(self, frame):
        """
        Bottle detection and tracking with bottle_model
        """
        h, w = frame.shape[:2]
        detection_line_fraction = self.detection_line_fraction
        self.detection_line_x = int(w * detection_line_fraction)
        results = self.bottle_model.track(
            frame,
            conf=self.conf,
            iou=self.iou,
            tracker="bytetrack.yaml",
            persist=True,
            verbose=False,
            imgsz=640,
        )

        newly_processed = []
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    if self.has_crossed_line(bbox, track_id):
                        res = self.process_bottle(frame, bbox, track_id)
                        newly_processed.append(res)
        return results, newly_processed

2.3ROI精确定位

2.2 基础上裁剪出瓶子区域,在瓶子区域图像中使用 roi_level.pt 模型定位感兴趣区域(ROI)。process_bottle()、detect_objects_in_bottle()

class BottleTracker:

    def detect_objects_in_bottle(self, bottle):
        """
        Use roi_model to detect the region of interest in the bottle area
        """
        if bottle.size == 0:
            return None, None

        results = self.roi_model(bottle, conf=self.conf, verbose=False)
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and len(result.boxes) > 0:
                boxes = result.boxes.xyxy.cpu().numpy()
                confs = result.boxes.conf.cpu().numpy()
                best_idx = np.argmax(confs)
                roi_bbox = boxes[best_idx].astype(int)
                # ROI
                x1, y1, x2, y2 = roi_bbox
                roi_img = bottle[y1:y2, x1:x2]
                return roi_bbox, roi_img
        return None, None

    def process_bottle(self, frame, bottle_bbox, track_id):
        """
        process bottle:
        1. Cutting bottle area
        2. Use roi_model to detect ROI
        3. Detect the liquid level in ROI
        """
        x1, y1, x2, y2 = map(int, bottle_bbox)
        bottle_img = frame[y1:y2, x1:x2]

        # ROI detection in bottle area
        roi_bbox, roi_img = self.detect_objects_in_bottle(bottle_img)

        liquid_level = None
        roi_liquid_level = None
        bottle_liquid_level = None
        gradient_info = {}
        if roi_bbox is not None and roi_img is not None:
            # Detect liquid level in ROI
            gauss_kernel = self.gauss_kernel
            gradient_threshold = self.gradient_threshold
            roi_liquid_level, gradient_info = process_roi(roi_img, gauss_kernel, gradient_threshold)

            if roi_liquid_level is not None:
                bottle_liquid_level = roi_liquid_level + roi_bbox[1]
                liquid_level = bottle_liquid_level + y1

        classification = self._classify(liquid_level)
        status = classification["status"]

        result = {
            "track_id": track_id,
            "roi_bbox": roi_bbox,
            "roi_img": roi_img,
            "bottle_bbox": (x1, y1, x2, y2),
            "bottle_img": bottle_img,
            "roi_liquid_level": roi_liquid_level,
            "bottle_liquid_level": bottle_liquid_level,
            "liquid_level": liquid_level,
            "status": status,
            "gradient_info": gradient_info,
            "classification": classification,
        }

        self.bottle_results[track_id] = result
        self.processed_bottles.add(track_id)

        self.stats["total"] += 1
        if status == "OK":
            self.stats["ok"] += 1
        else:
            self.stats["ng"] += 1
        return result

2.4液位检测算法

在 2.3 基础上,在ROI内检测液位。process_roi()、detect_liquid_level()

核心算法:detect_liquid_level(),通过分析图像灰度梯度检测液位


原理:
液面处通常存在明显的明暗变化(液体与空气的分界),该位置的水平方向灰度梯度值最大。


步骤:
步骤1:计算每行像素的平均灰度值(垂直强度分布)
步骤2:计算灰度梯度(相邻行的亮度变化)
步骤3:找到梯度最大的位置(亮度变化最剧烈的地方)
正常情况:液位在梯度最大位置的下方

def detect_liquid_level(gray_img, min_gradient_threshold=10):
    if gray_img.size == 0 or gray_img.shape[0] < 2:
        return None, {}
    intensity_profile = np.mean(gray_img, axis=1)
    gradients = np.diff(intensity_profile)
    abs_gradients = np.abs(gradients)
    if len(abs_gradients) == 0:
        return None, {}
    max_gradient_idx = np.argmax(abs_gradients)
    max_gradient_value = abs_gradients[max_gradient_idx]
    if max_gradient_value < min_gradient_threshold:
        dark_threshold = np.mean(intensity_profile) * 0.7
        liquid_region = np.where(intensity_profile < dark_threshold)[0]
        if len(liquid_region) > 0:
            liquid_level = liquid_region[0]
        else:
            return None, {
                "intensity_profile": intensity_profile,
                "gradients": gradients,
                "max_gradient_idx": max_gradient_idx,
                "max_gradient_value": max_gradient_value,
            }
    else:
        liquid_level = max_gradient_idx + 1
    return liquid_level, {
        "intensity_profile": intensity_profile,
        "gradients": gradients,
        "max_gradient_idx": max_gradient_idx,
        "max_gradient_value": max_gradient_value,
    }


def process_roi(roi_img, gauss_kernel=5, gradient_threshold=10):
    if roi_img.size == 0:
        return None, None, {}
    gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
    k = gauss_kernel if gauss_kernel % 2 == 1 else gauss_kernel + 1
    blurred = cv2.GaussianBlur(gray, (k, k), 0)
    roi_liquid_level, gradient_info = detect_liquid_level(
        blurred, min_gradient_threshold=gradient_threshold
    )
    return roi_liquid_level, gradient_info

2.5结果判定

根据检测到的液位判断是否合格
偏差 = 实际液位 - 目标液位
偏差在容差范围内即为合格

class BottleTracker:

    def _classify(self, liquid_level):
        result = {
            "status": "NG",
            "liquid_present": 0,
            "level_ok": 0,
        }
        if liquid_level is None:
            return result
        result["liquid_present"] = 1

        target_liquid_level_y = self.target_liquid_level_y
        tolerance = self.tolerance
        deviation = liquid_level - target_liquid_level_y
        if deviation <= tolerance:
            result["level_ok"] = 1
            result["status"] = "OK"
        return result

2.6可视化

系统实时显示检测结果:瓶体框、ROI框、液位线、目标液位线、容差带、检测统计信息。visualize()

                  

                                OK                                                                                NG

3完整代码

import os
import cv2
import numpy as np
from datetime import datetime
from ultralytics import YOLO

# ──────────────────────────────────────────────────────────────────────────────
# CONSTANTS / DEFAULTS
# ──────────────────────────────────────────────────────────────────────────────

DEFAULT_BOTTLE_MODEL_PATH = "bottle.pt"
DEFAULT_ROI_MODEL_PATH = "roi_level.pt"
DEFAULT_VIDEO_PATH = "video1.mp4"
DEFAULT_TARGET_LEVEL = 800
DEFAULT_CONF = 0.30
DEFAULT_IOU = 0.45
DEFAULT_TOLERANCE = 20
DEFAULT_GRADIENT_THR = 5
DEFAULT_GAUSS_KERNEL = 5
DEFAULT_DETECT_LINE = 0.50  # fraction of frame width
DEFAULT_OUTPUT_VIDEO = "output_result.mp4"  # 默认输出视频文件名


# ──────────────────────────────────────────────────────────────────────────────
# LIQUID LEVEL DETECTION CORE
# ──────────────────────────────────────────────────────────────────────────────

def detect_liquid_level(gray_img, min_gradient_threshold=10):
    if gray_img.size == 0 or gray_img.shape[0] < 2:
        return None, {}
    intensity_profile = np.mean(gray_img, axis=1)
    gradients = np.diff(intensity_profile)
    abs_gradients = np.abs(gradients)
    if len(abs_gradients) == 0:
        return None, {}
    max_gradient_idx = np.argmax(abs_gradients)
    max_gradient_value = abs_gradients[max_gradient_idx]
    if max_gradient_value < min_gradient_threshold:
        dark_threshold = np.mean(intensity_profile) * 0.7
        liquid_region = np.where(intensity_profile < dark_threshold)[0]
        if len(liquid_region) > 0:
            liquid_level = liquid_region[0]
        else:
            return None, {
                "intensity_profile": intensity_profile,
                "gradients": gradients,
                "max_gradient_idx": max_gradient_idx,
                "max_gradient_value": max_gradient_value,
            }
    else:
        liquid_level = max_gradient_idx + 1
    return liquid_level, {
        "intensity_profile": intensity_profile,
        "gradients": gradients,
        "max_gradient_idx": max_gradient_idx,
        "max_gradient_value": max_gradient_value,
    }


def process_roi(roi_img, gauss_kernel=5, gradient_threshold=10):
    if roi_img.size == 0:
        return None, None, {}
    gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
    k = gauss_kernel if gauss_kernel % 2 == 1 else gauss_kernel + 1
    blurred = cv2.GaussianBlur(gray, (k, k), 0)
    roi_liquid_level, gradient_info = detect_liquid_level(
        blurred, min_gradient_threshold=gradient_threshold
    )
    return roi_liquid_level, gradient_info


# ──────────────────────────────────────────────────────────────────────────────
# BOTTLE TRACKER
# ──────────────────────────────────────────────────────────────────────────────

class BottleTracker:
    """
    1. Use bottle_model to detect bottles and track them
    2. Use roi_model to detect ROI for each bottle area
    """

    def __init__(self, bottle_model_path, roi_model_path, detection_line_fraction,
                 target_liquid_level, tolerance, conf, iou,
                 gauss_kernel, gradient_threshold):
        self.bottle_model = YOLO(bottle_model_path)
        self.roi_model = YOLO(roi_model_path)
        self.detection_line_fraction = detection_line_fraction
        self.target_liquid_level_y = target_liquid_level
        self.tolerance = tolerance
        self.conf = conf
        self.iou = iou
        self.gauss_kernel = gauss_kernel
        self.gradient_threshold = gradient_threshold

        self.processed_bottles = set()
        self.bottle_results = {}
        self.detection_line_x = 0
        self.stats = {
            "total": 0,
            "ok": 0,
            "ng": 0,
        }

    def reset(self):
        self.processed_bottles.clear()
        self.bottle_results.clear()
        self.stats = {
            "total": 0,
            "ok": 0,
            "ng": 0,
        }

    def has_crossed_line(self, bbox, track_id):
        x1, y1, x2, y2 = bbox
        center_x = (x1 + x2) / 2
        return center_x >= self.detection_line_x and track_id not in self.processed_bottles

    def _classify(self, liquid_level):
        result = {
            "status": "NG",
            "liquid_present": 0,
            "level_ok": 0,
        }
        if liquid_level is None:
            return result
        result["liquid_present"] = 1

        target_liquid_level_y = self.target_liquid_level_y
        tolerance = self.tolerance
        deviation = liquid_level - target_liquid_level_y
        if deviation <= tolerance:
            result["level_ok"] = 1
            result["status"] = "OK"
        return result

    def detect_objects_in_bottle(self, bottle):
        """
        Use roi_model to detect the region of interest in the bottle area
        """
        if bottle.size == 0:
            return None, None

        results = self.roi_model(bottle, conf=self.conf, verbose=False)
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and len(result.boxes) > 0:
                boxes = result.boxes.xyxy.cpu().numpy()
                confs = result.boxes.conf.cpu().numpy()
                best_idx = np.argmax(confs)
                roi_bbox = boxes[best_idx].astype(int)
                # ROI
                x1, y1, x2, y2 = roi_bbox
                roi_img = bottle[y1:y2, x1:x2]
                return roi_bbox, roi_img
        return None, None

    def process_bottle(self, frame, bottle_bbox, track_id):
        """
        process bottle:
        1. Cutting bottle area
        2. Use roi_model to detect ROI
        3. Detect the liquid level in ROI
        """
        x1, y1, x2, y2 = map(int, bottle_bbox)
        bottle_img = frame[y1:y2, x1:x2]

        # ROI detection in bottle area
        roi_bbox, roi_img = self.detect_objects_in_bottle(bottle_img)

        liquid_level = None
        roi_liquid_level = None
        bottle_liquid_level = None
        gradient_info = {}
        if roi_bbox is not None and roi_img is not None:
            # Detect liquid level in ROI
            gauss_kernel = self.gauss_kernel
            gradient_threshold = self.gradient_threshold
            roi_liquid_level, gradient_info = process_roi(roi_img, gauss_kernel, gradient_threshold)

            if roi_liquid_level is not None:
                bottle_liquid_level = roi_liquid_level + roi_bbox[1]
                liquid_level = bottle_liquid_level + y1

        classification = self._classify(liquid_level)
        status = classification["status"]

        result = {
            "track_id": track_id,
            "roi_bbox": roi_bbox,
            "roi_img": roi_img,
            "bottle_bbox": (x1, y1, x2, y2),
            "bottle_img": bottle_img,
            "roi_liquid_level": roi_liquid_level,
            "bottle_liquid_level": bottle_liquid_level,
            "liquid_level": liquid_level,
            "status": status,
            "gradient_info": gradient_info,
            "classification": classification,
        }

        self.bottle_results[track_id] = result
        self.processed_bottles.add(track_id)

        self.stats["total"] += 1
        if status == "OK":
            self.stats["ok"] += 1
        else:
            self.stats["ng"] += 1
        return result

    def bottle_detect_and_track(self, frame):
        """
        Bottle detection and tracking with bottle_model
        """
        h, w = frame.shape[:2]
        detection_line_fraction = self.detection_line_fraction
        self.detection_line_x = int(w * detection_line_fraction)
        results = self.bottle_model.track(
            frame,
            conf=self.conf,
            iou=self.iou,
            tracker="bytetrack.yaml",
            persist=True,
            verbose=False,
            imgsz=640,
        )

        newly_processed = []
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    if self.has_crossed_line(bbox, track_id):
                        res = self.process_bottle(frame, bbox, track_id)
                        newly_processed.append(res)
        return results, newly_processed

    def visualize(self, frame, yolo_results, show_ids=True):
        vis = frame.copy()
        h, w = vis.shape[:2]
        target_y = self.target_liquid_level_y

        # Target line (green)
        cv2.line(vis, (0, target_y), (w, target_y), (0, 220, 150), 2)
        cv2.putText(vis, f"TARGET LIQUID LEVEL={target_y}", (10, target_y - 8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 220, 150), 2)

        # Tolerance bands (dashed red)
        tolerance = self.tolerance
        for tolerance_y in [target_y - tolerance, target_y + tolerance]:
            for i in range(0, w, 20):
                cv2.line(vis, (i, tolerance_y), (min(i + 10, w), tolerance_y), (60, 60, 220), 2)

        # Detection line (cyan vertical)
        cv2.line(vis, (self.detection_line_x, 0),
                 (self.detection_line_x, h), (220, 220, 0), 2)
        cv2.putText(vis, "DETECT", (self.detection_line_x + 5, 28),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (220, 220, 0), 2)

        # Detections
        if yolo_results and len(yolo_results) > 0:
            result = yolo_results[0]
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    x1, y1, x2, y2 = map(int, bbox)
                    if track_id in self.bottle_results:
                        bottle_result = self.bottle_results[track_id]
                        status = bottle_result["status"]
                        color = ((0, 220, 80) if status == "OK"
                                 else (0, 60, 220))
                        cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)

                        roi_bbox = bottle_result.get("roi_bbox")
                        if roi_bbox is not None:
                            roi_x1, roi_y1, roi_x2, roi_y2 = roi_bbox
                            # 转换到原始帧坐标
                            abs_roi_x1 = x1 + roi_x1
                            abs_roi_y1 = y1 + roi_y1
                            abs_roi_x2 = x1 + roi_x2
                            abs_roi_y2 = y1 + roi_y2
                            cv2.rectangle(vis, (abs_roi_x1, abs_roi_y1),
                                          (abs_roi_x2, abs_roi_y2), (255, 200, 100), 2)

                        liquid_level = bottle_result["liquid_level"]
                        if liquid_level is not None:
                            cv2.line(vis, (x1, int(liquid_level)), (x2, int(liquid_level)), color, 2)
                    else:
                        cv2.rectangle(vis, (x1, y1), (x2, y2), (200, 200, 200), 2)

                    if show_ids:
                        label = f"#{track_id}"
                        if track_id in self.bottle_results:
                            label += f" {self.bottle_results[track_id]['status']}"
                        cv2.putText(vis, label, (x1, y1 - 8),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.48, (200, 200, 200), 2)

        # Stats overlay
        pad = 8
        Stats_x, Stats_y = 8, h - 108
        cv2.rectangle(vis, (Stats_x, Stats_y - pad), (420, h - pad), (0, 0, 0), -1)
        stats = self.stats
        cv2.putText(vis,
                    f"Total:{stats['total']}  OK:{stats['ok']}  NG:{stats['ng']}",
                    (Stats_x + 4, Stats_y + 34), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (220, 220, 220), 2)
        ok_rate = stats["ok"] / max(stats["total"], 1) * 100
        cv2.putText(vis, f"OK Rate:{ok_rate:.1f}%",
                    (Stats_x + 4, Stats_y + 76), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (0, 220, 150), 2)
        return vis


# ──────────────────────────────────────────────────────────────────────────────
# VIDEO PROCESSOR (Command Line Version)
# ──────────────────────────────────────────────────────────────────────────────

class VideoProcessor:
    def __init__(self, video_path, bottle_model_path, roi_model_path,
                 target_liquid_level, conf, iou, tolerance,
                 gauss_kernel, gradient_threshold,
                 detect_line_fraction, show_ids=True,
                 output_video=None, save_video=False):
        self.video_path = video_path
        self.bottle_model_path = bottle_model_path
        self.roi_model_path = roi_model_path
        self.target_liquid_level = target_liquid_level
        self.conf = conf
        self.iou = iou
        self.tolerance = tolerance
        self.gauss_kernel = gauss_kernel
        self.gradient_threshold = gradient_threshold
        self.detect_line_fraction = detect_line_fraction
        self.show_ids = show_ids
        self.output_video = output_video
        self.save_video = save_video

        self.tracker = None
        self.video_writer = None

    def _log(self, msg):
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] {msg}")

    def initialize(self):
        self._log(f"Loading bottle model: {self.bottle_model_path}")
        self._log(f"Loading ROI model: {self.roi_model_path}")

        self.tracker = BottleTracker(
            bottle_model_path=self.bottle_model_path,
            roi_model_path=self.roi_model_path,
            detection_line_fraction=self.detect_line_fraction,
            target_liquid_level=self.target_liquid_level,
            tolerance=self.tolerance,
            conf=self.conf,
            iou=self.iou,
            gauss_kernel=self.gauss_kernel,
            gradient_threshold=self.gradient_threshold
        )

    def setup_video_writer(self, width, height, fps):
        if not self.save_video:
            return

        if self.output_video is None:
            base_name = os.path.splitext(os.path.basename(self.video_path))[0]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_video = f"{base_name}_result_{timestamp}.mp4"

        output_dir = os.path.dirname(self.output_video)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.video_writer = cv2.VideoWriter(
            self.output_video, fourcc, fps, (width, height)
        )

        if self.video_writer.isOpened():
            self._log(f"✓ Output video will be saved to: {self.output_video}")
            self._log(f"  Resolution: {width}×{height}, FPS: {fps:.2f}")
        else:
            self._log(f"⚠ Warning: Failed to create video writer for {self.output_video}")
            self.save_video = False

    def release_video_writer(self):
        if self.video_writer is not None:
            self.video_writer.release()
            self._log(f"✓ Video saved to: {self.output_video}")

    def run(self):
        if self.tracker is None:
            self.initialize()

        self._log(f"Opening video: {os.path.basename(self.video_path)}")
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {self.video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self._log(f"Video info: {width}×{height} @ {fps:.1f} fps")

        # 设置视频写入器
        self.setup_video_writer(width, height, fps)

        frame_count = 0
        bottle_detection_results = None

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    # Loop video
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    self.tracker.processed_bottles.clear()
                    self._log("↩ Video loop restarted (tracking IDs reset)")
                    continue

                frame_count += 1

                try:
                    bottle_detection_results, newly = self.tracker.bottle_detect_and_track(frame)
                except Exception as e:
                    self._log(f"⚠ Detection error: {e}")
                    continue

                # Log newly processed bottles
                for res in newly:
                    track_id = res["track_id"]
                    liquid_level = res["liquid_level"]
                    status = res["status"]
                    level_ok = res["classification"]["level_ok"]
                    deviation = (liquid_level - self.target_liquid_level) if liquid_level is not None else None
                    dev_str = f" deviation={deviation:+d}px" if deviation is not None else ""
                    icon = "✅" if status == "OK" else "❌"
                    self._log(
                        f"{icon} Bottle #{track_id}: {status}\n"
                        f"liquid_level={liquid_level if liquid_level else 'N/A'}\n"
                        f"{dev_str}\n"
                        f"level_ok={level_ok}\n")

                # Visualize
                vis = self.tracker.visualize(frame, bottle_detection_results, self.show_ids)

                # 保存视频帧
                if self.save_video and self.video_writer is not None:
                    self.video_writer.write(vis)

                # Show window
                cv2.namedWindow('BottleVision - Liquid Level Detection', cv2.WINDOW_NORMAL)
                cv2.resizeWindow('BottleVision - Liquid Level Detection', 450, 800)
                cv2.moveWindow('BottleVision - Liquid Level Detection', 500, 100)
                cv2.imshow("BottleVision - Liquid Level Detection", vis)

                # Print stats every 100 frames
                if frame_count % 100 == 0:
                    stats = self.tracker.stats
                    ok_rate = stats["ok"] / max(stats["total"], 1) * 100
                    self._log(
                        f"📊 Stats - Total: {stats['total']}, OK: {stats['ok']}, NG: {stats['ng']}, OK Rate: {ok_rate:.1f}%")

                # Press 'q' to quit, 'r' to reset stats, 's' to save current frame
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    self._log("User requested quit")
                    break
                elif key == ord('r'):
                    self.tracker.reset()
                    self._log("Statistics reset")
                elif key == ord('s'):
                    # 保存当前帧为图片
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    screenshot_path = f"screenshot_{timestamp}.jpg"
                    cv2.imwrite(screenshot_path, vis)
                    self._log(f"📸 Screenshot saved: {screenshot_path}")

        except KeyboardInterrupt:
            self._log("Interrupted by user")
        finally:
            cap.release()
            self.release_video_writer()  # 释放视频写入器
            cv2.destroyAllWindows()
            self._log("Processing finished")

            # Final statistics
            stats = self.tracker.stats
            ok_rate = stats["ok"] / max(stats["total"], 1) * 100
            self._log("=" * 50)
            self._log(f"FINAL STATISTICS:")
            self._log(f"  Total bottles processed: {stats['total']}")
            self._log(f"  OK: {stats['ok']}")
            self._log(f"  NG: {stats['ng']}")
            self._log(f"  OK Rate: {ok_rate:.1f}%")
            self._log("=" * 50)

            return stats


# ──────────────────────────────────────────────────────────────────────────────
# COMMAND LINE INTERFACE
# ──────────────────────────────────────────────────────────────────────────────

def main():
    import argparse

    parser = argparse.ArgumentParser(description="BottleVision - Two-Stage Liquid Level Estimation System")
    parser.add_argument("--video", default=DEFAULT_VIDEO_PATH, help="Path to input video file")
    parser.add_argument("-b", "--bottle-model", default=DEFAULT_BOTTLE_MODEL_PATH,
                        help=f"Path to bottle detection model (default: {DEFAULT_BOTTLE_MODEL_PATH})")
    parser.add_argument("-r", "--roi-model", default=DEFAULT_ROI_MODEL_PATH,
                        help=f"Path to ROI detection model (default: {DEFAULT_ROI_MODEL_PATH})")
    parser.add_argument("-t", "--target-level", type=int, default=DEFAULT_TARGET_LEVEL,
                        help=f"Target liquid level Y coordinate (default: {DEFAULT_TARGET_LEVEL})")
    parser.add_argument("--conf", type=float, default=DEFAULT_CONF,
                        help=f"Detection confidence threshold (default: {DEFAULT_CONF})")
    parser.add_argument("--iou", type=float, default=DEFAULT_IOU,
                        help=f"IoU threshold for NMS (default: {DEFAULT_IOU})")
    parser.add_argument("--tolerance", type=int, default=DEFAULT_TOLERANCE,
                        help=f"Tolerance for liquid level deviation in pixels (default: {DEFAULT_TOLERANCE})")
    parser.add_argument("--gauss-kernel", type=int, default=DEFAULT_GAUSS_KERNEL,
                        help=f"Gaussian blur kernel size (default: {DEFAULT_GAUSS_KERNEL})")
    parser.add_argument("--gradient-thr", type=int, default=DEFAULT_GRADIENT_THR,
                        help=f"Gradient threshold for liquid level detection (default: {DEFAULT_GRADIENT_THR})")
    parser.add_argument("--detect-line", type=float, default=DEFAULT_DETECT_LINE,
                        help=f"Detection line position as fraction of frame width (default: {DEFAULT_DETECT_LINE})")
    parser.add_argument("--no-ids", action="store_true",
                        help="Hide track IDs and ROI annotations")
    parser.add_argument("--save-video", default=DEFAULT_OUTPUT_VIDEO,
                        help="Save the output video with visualizations")
    parser.add_argument("--output-video", default=None,
                        help="Path to save the output video (default: auto-generated)")

    args = parser.parse_args()

    # Validate input video
    if not os.path.exists(args.video):
        print(f"Error: Video file not found: {args.video}")
        return 1

    # Validate models
    if not os.path.exists(args.bottle_model):
        print(f"Error: Bottle model not found: {args.bottle_model}")
        return 1
    if not os.path.exists(args.roi_model):
        print(f"Error: ROI model not found: {args.roi_model}")
        return 1

    # Create processor and run
    processor = VideoProcessor(
        video_path=args.video,
        bottle_model_path=args.bottle_model,
        roi_model_path=args.roi_model,
        target_liquid_level=args.target_level,
        conf=args.conf,
        iou=args.iou,
        tolerance=args.tolerance,
        gauss_kernel=args.gauss_kernel,
        gradient_threshold=args.gradient_thr,
        detect_line_fraction=args.detect_line,
        show_ids=not args.no_ids,
        output_video=args.output_video,
        save_video=args.save_video,
    )

    try:
        stats = processor.run()
        return 0 if stats["total"] > 0 else 1
    except Exception as e:
        print(f"Error: {e}")
        return 1


if __name__ == "__main__":
    exit(main())

4结语

参考代码:https://github.com/Shamlan321/BTLES

本文代码使用了参考代码中的液位检测算法process_roi()、detect_liquid_level(),以及对BottleTracker做了改进。

通过两阶段检测计算出液位高度,判断液位是否合格。第二阶段检测可以扩展为瓶盖、标签、液位等多目标检测。通过训练新的第二阶段模型以及修改结果判定可以实现。

参考代码中有完整的界面设计可以作为参考。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐