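Bottled Water Liquid Level Detection with YOLO (ultralytics) and OpenCV Image Processing
This article describes a method for detecting the liquid level in bottles, built on a YOLO object detection model and OpenCV image processing.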
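1. Introduction
1.1 YOLO source code and environment setup
Official repository: https://github.com/ultralytics/ultralytics
Environment setup: https://docs.ultralytics.com/zh/quickstart/
1.2 Model training
Two models need to be trained:
bottle.pt: detects and tracks the bottle body.
roi_level.pt: locates the liquid-level region.
Both models are trained with the Ultralytics YOLO framework and are called one after the other at inference time.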
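The dataset and labels for bottle.pt are shown in the figure below:

[Figure: bottle.pt dataset and labels]
The dataset and labels for roi_level.pt are shown in the figure below.
For this dataset, the images must be cropped to the bottle body.

[Figure: roi_level.pt dataset and labels]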
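Cropping the training images to the bottle body comes down to clamping a detected box to the frame and slicing. Below is a minimal sketch of the box arithmetic, in plain Python so it can be checked without a model. The helper name clamp_crop_box and the optional margin parameter are illustrative assumptions, not part of the original code.

```python
def clamp_crop_box(bbox, frame_w, frame_h, margin=0):
    """Return an integer (x1, y1, x2, y2) crop box clamped to the frame.

    bbox is (x1, y1, x2, y2) in pixels, e.g. from a bottle detector.
    margin is a hypothetical padding in pixels added on every side.
    """
    x1, y1, x2, y2 = bbox
    x1 = max(0, int(x1) - margin)
    y1 = max(0, int(y1) - margin)
    x2 = min(frame_w, int(x2) + margin)
    y2 = min(frame_h, int(y2) + margin)
    if x2 <= x1 or y2 <= y1:
        return None  # nothing left to crop
    return x1, y1, x2, y2


# A box that sticks out past the right edge of a 1080x1920 frame
box = clamp_crop_box((900.4, 300.9, 1100.0, 1500.2), frame_w=1080, frame_h=1920, margin=10)
print(box)  # (890, 290, 1080, 1510)
```

With an image array frame, the crop is then frame[y1:y2, x1:x2], the same slicing that process_bottle() uses.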
1.3 Source code
https://pan.baidu.com/s/1SIjfKDKldKVd3yWy5pqIhQ?pwd=kmd2
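2. Core functionality
2.1 Overall pipeline
Video input ➡ YOLO bottle detection and tracking ➡ ROI detection ➡ liquid level detection algorithm ➡ OK/NG decision ➡ statistics and visualization output
2.2 Bottle detection and tracking
The bottle.pt model (loaded as self.bottle_model) detects the bottles in each frame and tracks them. This is done in bottle_detect_and_track(). Each bottle is processed once, when the center of its bounding box reaches the vertical detection line.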
class BottleTracker:
    def bottle_detect_and_track(self, frame):
        """
        Bottle detection and tracking with bottle_model
        """
        h, w = frame.shape[:2]
        # Vertical detection line, placed at a fraction of the frame width
        self.detection_line_x = int(w * self.detection_line_fraction)
        results = self.bottle_model.track(
            frame,
            conf=self.conf,
            iou=self.iou,
            tracker="bytetrack.yaml",
            persist=True,  # keep track IDs across frames
            verbose=False,
            imgsz=640,
        )
        newly_processed = []
        if results and len(results) > 0:
            result = results[0]
            # boxes.id is None when no track IDs have been assigned
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    # Process each bottle once, when it reaches the line
                    if self.has_crossed_line(bbox, track_id):
                        res = self.process_bottle(frame, bbox, track_id)
                        newly_processed.append(res)
        return results, newly_processed
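The once-per-bottle trigger can be illustrated without a model. This is a minimal sketch, assuming bottles move from left to right and track IDs stay stable. The class name LineGate and the sample boxes are made up for illustration. It mirrors the check in has_crossed_line() from the complete code: a track fires when its box center reaches the line and its ID has not been handled before.

```python
class LineGate:
    """Fires once per track ID when the box center reaches the line."""

    def __init__(self, frame_width, line_fraction=0.5):
        self.line_x = int(frame_width * line_fraction)
        self.processed = set()

    def should_process(self, bbox, track_id):
        x1, _, x2, _ = bbox
        center_x = (x1 + x2) / 2
        if center_x >= self.line_x and track_id not in self.processed:
            self.processed.add(track_id)
            return True
        return False


gate = LineGate(frame_width=1000, line_fraction=0.5)
# Track 7 moves right by 100 px per frame; its box is 100 px wide
fired = []
for frame_idx, x1 in enumerate([250, 350, 450, 550, 650]):
    if gate.should_process((x1, 0, x1 + 100, 300), track_id=7):
        fired.append(frame_idx)
print(fired)  # [2]
```

In the article's code the ID is added to processed_bottles inside process_bottle() rather than in the check itself; the sketch merges the two steps for brevity.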
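2.3 Precise ROI localization
Building on 2.2, the bottle region is cropped from the frame, and the roi_level.pt model locates the region of interest (ROI) inside that crop. This is done in process_bottle() and detect_objects_in_bottle().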
class BottleTracker:
    def detect_objects_in_bottle(self, bottle):
        """
        Use roi_model to detect the region of interest in the bottle area
        """
        if bottle.size == 0:
            return None, None
        results = self.roi_model(bottle, conf=self.conf, verbose=False)
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and len(result.boxes) > 0:
                boxes = result.boxes.xyxy.cpu().numpy()
                confs = result.boxes.conf.cpu().numpy()
                # Keep only the most confident ROI box
                best_idx = np.argmax(confs)
                roi_bbox = boxes[best_idx].astype(int)
                # Crop the ROI (coordinates are relative to the bottle crop)
                x1, y1, x2, y2 = roi_bbox
                roi_img = bottle[y1:y2, x1:x2]
                return roi_bbox, roi_img
        return None, None

    def process_bottle(self, frame, bottle_bbox, track_id):
        """
        process bottle:
        1. Cutting bottle area
        2. Use roi_model to detect ROI
        3. Detect the liquid level in ROI
        """
        x1, y1, x2, y2 = map(int, bottle_bbox)
        bottle_img = frame[y1:y2, x1:x2]
        # ROI detection in bottle area
        roi_bbox, roi_img = self.detect_objects_in_bottle(bottle_img)
        liquid_level = None
        roi_liquid_level = None
        bottle_liquid_level = None
        gradient_info = {}
        if roi_bbox is not None and roi_img is not None:
            # Detect liquid level in ROI
            roi_liquid_level, gradient_info = process_roi(
                roi_img, self.gauss_kernel, self.gradient_threshold
            )
            if roi_liquid_level is not None:
                # ROI row -> bottle-crop row -> full-frame row
                bottle_liquid_level = roi_liquid_level + roi_bbox[1]
                liquid_level = bottle_liquid_level + y1
        classification = self._classify(liquid_level)
        status = classification["status"]
        result = {
            "track_id": track_id,
            "roi_bbox": roi_bbox,
            "roi_img": roi_img,
            "bottle_bbox": (x1, y1, x2, y2),
            "bottle_img": bottle_img,
            "roi_liquid_level": roi_liquid_level,
            "bottle_liquid_level": bottle_liquid_level,
            "liquid_level": liquid_level,
            "status": status,
            "gradient_info": gradient_info,
            "classification": classification,
        }
        # Record the result so this track ID is not processed again
        self.bottle_results[track_id] = result
        self.processed_bottles.add(track_id)
        self.stats["total"] += 1
        if status == "OK":
            self.stats["ok"] += 1
        else:
            self.stats["ng"] += 1
        return result
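The liquid level found inside the ROI is a row index relative to the ROI crop, so two offsets are added to bring it back to frame coordinates. Here is a worked sketch with made-up numbers. The function name roi_row_to_frame_row is illustrative.

```python
def roi_row_to_frame_row(roi_row, roi_bbox, bottle_bbox):
    """Map a row index inside the ROI crop to a row index in the full frame.

    roi_bbox is (x1, y1, x2, y2) relative to the bottle crop.
    bottle_bbox is (x1, y1, x2, y2) relative to the full frame.
    """
    row_in_bottle = roi_row + roi_bbox[1]          # ROI crop -> bottle crop
    row_in_frame = row_in_bottle + bottle_bbox[1]  # bottle crop -> frame
    return row_in_bottle, row_in_frame


# The bottle starts at y=400 in the frame, the ROI starts at y=350 inside
# the bottle crop, and the level was found 42 rows below the top of the ROI.
print(roi_row_to_frame_row(42, (10, 350, 110, 450), (200, 400, 320, 1000)))
# (392, 792)
```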
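2.4 Liquid level detection algorithm
Building on 2.3, the liquid level is detected inside the ROI by process_roi() and detect_liquid_level().
Core algorithm: detect_liquid_level(), which finds the liquid level by analyzing the grayscale gradient of the image.
Principle:
The liquid surface usually shows a clear change in brightness (the boundary between liquid and air), so the grayscale gradient in the vertical direction, between adjacent rows, is largest there.
Steps:
Step 1: compute the mean gray value of each row of pixels (the vertical intensity profile).
Step 2: compute the grayscale gradient (the change in brightness between adjacent rows).
Step 3: find the position of the largest absolute gradient (where the brightness changes most sharply).
Normal case: the liquid level is taken as the row just below the position of the largest gradient (max_gradient_idx + 1). If the largest gradient is below the threshold, the first row darker than 0.7 times the mean intensity is used instead, and no level is reported if there is no such row.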
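Written as formulas, with $I(x, y)$ the gray value at column $x$ and row $y$ of an ROI of width $W$ and height $H$ (this notation is introduced here for illustration and does not appear in the code):

```latex
\begin{aligned}
P(y) &= \frac{1}{W} \sum_{x=0}^{W-1} I(x, y), & y &= 0, \dots, H-1 \\
g(y) &= P(y+1) - P(y), & y &= 0, \dots, H-2 \\
y^{*} &= \arg\max_{y} \lvert g(y) \rvert \\
\text{level} &= y^{*} + 1 \quad \text{if } \lvert g(y^{*}) \rvert \ge \text{threshold}
\end{aligned}
```

Here $P$ corresponds to intensity_profile, $g$ to gradients, and $y^{*}$ to max_gradient_idx.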
def detect_liquid_level(gray_img, min_gradient_threshold=10):
    """Find the liquid level row in a grayscale ROI from its row-intensity gradient."""
    if gray_img.size == 0 or gray_img.shape[0] < 2:
        return None, {}
    # Step 1: mean gray value of each row (vertical intensity profile)
    intensity_profile = np.mean(gray_img, axis=1)
    # Step 2: brightness change between adjacent rows
    gradients = np.diff(intensity_profile)
    abs_gradients = np.abs(gradients)
    if len(abs_gradients) == 0:
        return None, {}
    # Step 3: row with the sharpest brightness change
    max_gradient_idx = np.argmax(abs_gradients)
    max_gradient_value = abs_gradients[max_gradient_idx]
    gradient_info = {
        "intensity_profile": intensity_profile,
        "gradients": gradients,
        "max_gradient_idx": max_gradient_idx,
        "max_gradient_value": max_gradient_value,
    }
    if max_gradient_value < min_gradient_threshold:
        # Weak edge: fall back to the first row darker than 70% of the mean
        dark_threshold = np.mean(intensity_profile) * 0.7
        liquid_region = np.where(intensity_profile < dark_threshold)[0]
        if len(liquid_region) == 0:
            return None, gradient_info
        liquid_level = int(liquid_region[0])
    else:
        liquid_level = int(max_gradient_idx) + 1
    return liquid_level, gradient_info


def process_roi(roi_img, gauss_kernel=5, gradient_threshold=10):
    """Convert the ROI to grayscale, blur it, and detect the liquid level."""
    if roi_img.size == 0:
        # Two values, matching the normal return and the caller's unpacking
        return None, {}
    gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
    # GaussianBlur requires an odd kernel size
    k = gauss_kernel if gauss_kernel % 2 == 1 else gauss_kernel + 1
    blurred = cv2.GaussianBlur(gray, (k, k), 0)
    roi_liquid_level, gradient_info = detect_liquid_level(
        blurred, min_gradient_threshold=gradient_threshold
    )
    return roi_liquid_level, gradient_info
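To see both branches on concrete numbers, here is a toy trace in plain Python that operates directly on a row-intensity profile rather than an image. It is a simplified re-implementation for illustration, assuming the profile has already been computed. The function name level_from_profile and the sample profiles are invented.

```python
def level_from_profile(profile, min_gradient_threshold=10):
    """Return the liquid-level row for a list of per-row mean intensities."""
    if len(profile) < 2:
        return None
    # Gradient between adjacent rows: g[i] = profile[i + 1] - profile[i]
    abs_gradients = [abs(b - a) for a, b in zip(profile, profile[1:])]
    max_idx = max(range(len(abs_gradients)), key=abs_gradients.__getitem__)
    if abs_gradients[max_idx] >= min_gradient_threshold:
        return max_idx + 1  # first row on the far side of the sharpest edge
    # Fallback: first row darker than 70% of the mean intensity
    dark_threshold = sum(profile) / len(profile) * 0.7
    for row, value in enumerate(profile):
        if value < dark_threshold:
            return row
    return None


# Bright air (about 200) above dark liquid (about 80): sharp edge after row 2
print(level_from_profile([200, 200, 198, 80, 78, 80]))  # 3
# Gradual fade with no step of 10 or more: fallback branch
print(level_from_profile([100, 92, 84, 76, 68, 60, 52, 44]))  # 7
# Flat profile: no edge and no dark rows
print(level_from_profile([120, 120, 120, 120]))  # None
```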
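2.5 Result decision
The detected liquid level determines whether the bottle passes:
deviation = measured liquid level - target liquid level
The bottle passes if the absolute deviation is within the tolerance.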
class BottleTracker:
    def _classify(self, liquid_level):
        result = {
            "status": "NG",
            "liquid_present": 0,
            "level_ok": 0,
        }
        # No liquid level found: NG
        if liquid_level is None:
            return result
        result["liquid_present"] = 1
        deviation = liquid_level - self.target_liquid_level_y
        # Two-sided check: too high and too low are both out of tolerance
        if abs(deviation) <= self.tolerance:
            result["level_ok"] = 1
            result["status"] = "OK"
        return result
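Below is a worked example of a two-sided tolerance check, using the default values from the complete code (target 800 px, tolerance 20 px). The standalone function classify_level is an illustrative stand-in for BottleTracker._classify, not the author's code.

```python
def classify_level(liquid_level, target=800, tolerance=20):
    """Return 'OK' when the level is within +/- tolerance of the target."""
    if liquid_level is None:
        return "NG"  # no liquid level found
    deviation = liquid_level - target
    return "OK" if abs(deviation) <= tolerance else "NG"


for level in (792, 820, 821, 700, None):
    print(level, classify_level(level))
# 792 OK
# 820 OK
# 821 NG
# 700 NG
# None NG
```

Because image rows grow downward, a level of 700 lies 100 px above the target line. A one-sided comparison, deviation <= tolerance, would accept it, which is why the absolute value matters.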
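2.6 Visualization
The system displays the detection results in real time: the bottle box, ROI box, liquid level line, target liquid level line, tolerance band, and detection statistics. This is done in visualize().

[Figure: example output frames for an OK bottle and an NG bottle]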
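OpenCV's cv2.line has no dashed style, so a dashed tolerance line is drawn as a series of short segments. The following sketches the segment arithmetic alone, in plain Python. The function name dashed_segments is illustrative, and each returned pair would be passed to cv2.line as its two endpoints.

```python
def dashed_segments(width, y, dash=10, gap=10):
    """Return ((x_start, y), (x_end, y)) endpoints for a horizontal dashed line."""
    segments = []
    for x in range(0, width, dash + gap):
        # Clamp the last dash so it does not run past the frame edge
        segments.append(((x, y), (min(x + dash, width), y)))
    return segments


print(dashed_segments(45, y=780))
# [((0, 780), (10, 780)), ((20, 780), (30, 780)), ((40, 780), (45, 780))]
```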
3. Complete code
import os
import cv2
import numpy as np
from datetime import datetime
from ultralytics import YOLO
# ──────────────────────────────────────────────────────────────────────────────
# CONSTANTS / DEFAULTS
# ──────────────────────────────────────────────────────────────────────────────
DEFAULT_BOTTLE_MODEL_PATH = "bottle.pt"
DEFAULT_ROI_MODEL_PATH = "roi_level.pt"
DEFAULT_VIDEO_PATH = "video1.mp4"
DEFAULT_TARGET_LEVEL = 800
DEFAULT_CONF = 0.30
DEFAULT_IOU = 0.45
DEFAULT_TOLERANCE = 20
DEFAULT_GRADIENT_THR = 5
DEFAULT_GAUSS_KERNEL = 5
DEFAULT_DETECT_LINE = 0.50 # fraction of frame width
DEFAULT_OUTPUT_VIDEO = "output_result.mp4"  # default output video file name
# ──────────────────────────────────────────────────────────────────────────────
# LIQUID LEVEL DETECTION CORE
# ──────────────────────────────────────────────────────────────────────────────
def detect_liquid_level(gray_img, min_gradient_threshold=10):
    """Find the liquid level row in a grayscale ROI from its row-intensity gradient."""
    if gray_img.size == 0 or gray_img.shape[0] < 2:
        return None, {}
    # Mean gray value of each row (vertical intensity profile)
    intensity_profile = np.mean(gray_img, axis=1)
    # Brightness change between adjacent rows
    gradients = np.diff(intensity_profile)
    abs_gradients = np.abs(gradients)
    if len(abs_gradients) == 0:
        return None, {}
    max_gradient_idx = np.argmax(abs_gradients)
    max_gradient_value = abs_gradients[max_gradient_idx]
    gradient_info = {
        "intensity_profile": intensity_profile,
        "gradients": gradients,
        "max_gradient_idx": max_gradient_idx,
        "max_gradient_value": max_gradient_value,
    }
    if max_gradient_value < min_gradient_threshold:
        # Weak edge: fall back to the first row darker than 70% of the mean
        dark_threshold = np.mean(intensity_profile) * 0.7
        liquid_region = np.where(intensity_profile < dark_threshold)[0]
        if len(liquid_region) == 0:
            return None, gradient_info
        liquid_level = int(liquid_region[0])
    else:
        liquid_level = int(max_gradient_idx) + 1
    return liquid_level, gradient_info


def process_roi(roi_img, gauss_kernel=5, gradient_threshold=10):
    """Convert the ROI to grayscale, blur it, and detect the liquid level."""
    if roi_img.size == 0:
        # Two values, matching the normal return and the caller's unpacking
        return None, {}
    gray = cv2.cvtColor(roi_img, cv2.COLOR_BGR2GRAY)
    # GaussianBlur requires an odd kernel size
    k = gauss_kernel if gauss_kernel % 2 == 1 else gauss_kernel + 1
    blurred = cv2.GaussianBlur(gray, (k, k), 0)
    roi_liquid_level, gradient_info = detect_liquid_level(
        blurred, min_gradient_threshold=gradient_threshold
    )
    return roi_liquid_level, gradient_info


# ──────────────────────────────────────────────────────────────────────────────
# BOTTLE TRACKER
# ──────────────────────────────────────────────────────────────────────────────
class BottleTracker:
    """
    1. Use bottle_model to detect bottles and track them
    2. Use roi_model to detect ROI for each bottle area
    """

    def __init__(self, bottle_model_path, roi_model_path, detection_line_fraction,
                 target_liquid_level, tolerance, conf, iou,
                 gauss_kernel, gradient_threshold):
        self.bottle_model = YOLO(bottle_model_path)
        self.roi_model = YOLO(roi_model_path)
        self.detection_line_fraction = detection_line_fraction
        self.target_liquid_level_y = target_liquid_level
        self.tolerance = tolerance
        self.conf = conf
        self.iou = iou
        self.gauss_kernel = gauss_kernel
        self.gradient_threshold = gradient_threshold
        self.processed_bottles = set()   # track IDs already judged
        self.bottle_results = {}         # track ID -> result dict
        self.detection_line_x = 0
        self.stats = {
            "total": 0,
            "ok": 0,
            "ng": 0,
        }

    def reset(self):
        self.processed_bottles.clear()
        self.bottle_results.clear()
        self.stats = {
            "total": 0,
            "ok": 0,
            "ng": 0,
        }

    def has_crossed_line(self, bbox, track_id):
        # True once per track: box center at or past the line, not yet processed
        x1, y1, x2, y2 = bbox
        center_x = (x1 + x2) / 2
        return center_x >= self.detection_line_x and track_id not in self.processed_bottles

    def _classify(self, liquid_level):
        result = {
            "status": "NG",
            "liquid_present": 0,
            "level_ok": 0,
        }
        # No liquid level found: NG
        if liquid_level is None:
            return result
        result["liquid_present"] = 1
        deviation = liquid_level - self.target_liquid_level_y
        # Two-sided check: too high and too low are both out of tolerance
        if abs(deviation) <= self.tolerance:
            result["level_ok"] = 1
            result["status"] = "OK"
        return result

    def detect_objects_in_bottle(self, bottle):
        """
        Use roi_model to detect the region of interest in the bottle area
        """
        if bottle.size == 0:
            return None, None
        results = self.roi_model(bottle, conf=self.conf, verbose=False)
        if results and len(results) > 0:
            result = results[0]
            if result.boxes is not None and len(result.boxes) > 0:
                boxes = result.boxes.xyxy.cpu().numpy()
                confs = result.boxes.conf.cpu().numpy()
                # Keep only the most confident ROI box
                best_idx = np.argmax(confs)
                roi_bbox = boxes[best_idx].astype(int)
                # Crop the ROI (coordinates are relative to the bottle crop)
                x1, y1, x2, y2 = roi_bbox
                roi_img = bottle[y1:y2, x1:x2]
                return roi_bbox, roi_img
        return None, None

    def process_bottle(self, frame, bottle_bbox, track_id):
        """
        process bottle:
        1. Cutting bottle area
        2. Use roi_model to detect ROI
        3. Detect the liquid level in ROI
        """
        x1, y1, x2, y2 = map(int, bottle_bbox)
        bottle_img = frame[y1:y2, x1:x2]
        # ROI detection in bottle area
        roi_bbox, roi_img = self.detect_objects_in_bottle(bottle_img)
        liquid_level = None
        roi_liquid_level = None
        bottle_liquid_level = None
        gradient_info = {}
        if roi_bbox is not None and roi_img is not None:
            # Detect liquid level in ROI
            roi_liquid_level, gradient_info = process_roi(
                roi_img, self.gauss_kernel, self.gradient_threshold
            )
            if roi_liquid_level is not None:
                # ROI row -> bottle-crop row -> full-frame row
                bottle_liquid_level = roi_liquid_level + roi_bbox[1]
                liquid_level = bottle_liquid_level + y1
        classification = self._classify(liquid_level)
        status = classification["status"]
        result = {
            "track_id": track_id,
            "roi_bbox": roi_bbox,
            "roi_img": roi_img,
            "bottle_bbox": (x1, y1, x2, y2),
            "bottle_img": bottle_img,
            "roi_liquid_level": roi_liquid_level,
            "bottle_liquid_level": bottle_liquid_level,
            "liquid_level": liquid_level,
            "status": status,
            "gradient_info": gradient_info,
            "classification": classification,
        }
        # Record the result so this track ID is not processed again
        self.bottle_results[track_id] = result
        self.processed_bottles.add(track_id)
        self.stats["total"] += 1
        if status == "OK":
            self.stats["ok"] += 1
        else:
            self.stats["ng"] += 1
        return result

    def bottle_detect_and_track(self, frame):
        """
        Bottle detection and tracking with bottle_model
        """
        h, w = frame.shape[:2]
        # Vertical detection line, placed at a fraction of the frame width
        self.detection_line_x = int(w * self.detection_line_fraction)
        results = self.bottle_model.track(
            frame,
            conf=self.conf,
            iou=self.iou,
            tracker="bytetrack.yaml",
            persist=True,  # keep track IDs across frames
            verbose=False,
            imgsz=640,
        )
        newly_processed = []
        if results and len(results) > 0:
            result = results[0]
            # boxes.id is None when no track IDs have been assigned
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    # Process each bottle once, when it reaches the line
                    if self.has_crossed_line(bbox, track_id):
                        res = self.process_bottle(frame, bbox, track_id)
                        newly_processed.append(res)
        return results, newly_processed

    def visualize(self, frame, yolo_results, show_ids=True):
        vis = frame.copy()
        h, w = vis.shape[:2]
        target_y = self.target_liquid_level_y
        # Target line (green)
        cv2.line(vis, (0, target_y), (w, target_y), (0, 220, 150), 2)
        cv2.putText(vis, f"TARGET LIQUID LEVEL={target_y}", (10, target_y - 8),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 220, 150), 2)
        # Tolerance bands (dashed red): 10 px dashes with 10 px gaps
        tolerance = self.tolerance
        for tolerance_y in [target_y - tolerance, target_y + tolerance]:
            for i in range(0, w, 20):
                cv2.line(vis, (i, tolerance_y), (min(i + 10, w), tolerance_y), (60, 60, 220), 2)
        # Detection line (cyan vertical)
        cv2.line(vis, (self.detection_line_x, 0),
                 (self.detection_line_x, h), (220, 220, 0), 2)
        cv2.putText(vis, "DETECT", (self.detection_line_x + 5, 28),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (220, 220, 0), 2)
        # Detections
        if yolo_results and len(yolo_results) > 0:
            result = yolo_results[0]
            if result.boxes is not None and result.boxes.id is not None:
                boxes = result.boxes.xyxy.cpu().numpy()
                track_ids = result.boxes.id.cpu().numpy().astype(int)
                for bbox, track_id in zip(boxes, track_ids):
                    x1, y1, x2, y2 = map(int, bbox)
                    if track_id in self.bottle_results:
                        bottle_result = self.bottle_results[track_id]
                        status = bottle_result["status"]
                        # Green box for OK, red box for NG (BGR order)
                        color = ((0, 220, 80) if status == "OK"
                                 else (0, 60, 220))
                        cv2.rectangle(vis, (x1, y1), (x2, y2), color, 2)
                        roi_bbox = bottle_result.get("roi_bbox")
                        if roi_bbox is not None:
                            roi_x1, roi_y1, roi_x2, roi_y2 = map(int, roi_bbox)
                            # Convert to original frame coordinates
                            abs_roi_x1 = x1 + roi_x1
                            abs_roi_y1 = y1 + roi_y1
                            abs_roi_x2 = x1 + roi_x2
                            abs_roi_y2 = y1 + roi_y2
                            cv2.rectangle(vis, (abs_roi_x1, abs_roi_y1),
                                          (abs_roi_x2, abs_roi_y2), (255, 200, 100), 2)
                        liquid_level = bottle_result["liquid_level"]
                        if liquid_level is not None:
                            cv2.line(vis, (x1, int(liquid_level)), (x2, int(liquid_level)), color, 2)
                    else:
                        # Not yet judged: gray box
                        cv2.rectangle(vis, (x1, y1), (x2, y2), (200, 200, 200), 2)
                    if show_ids:
                        label = f"#{track_id}"
                        if track_id in self.bottle_results:
                            label += f" {self.bottle_results[track_id]['status']}"
                        cv2.putText(vis, label, (x1, y1 - 8),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.48, (200, 200, 200), 2)
        # Stats overlay
        pad = 8
        stats_x, stats_y = 8, h - 108
        cv2.rectangle(vis, (stats_x, stats_y - pad), (420, h - pad), (0, 0, 0), -1)
        stats = self.stats
        cv2.putText(vis,
                    f"Total:{stats['total']} OK:{stats['ok']} NG:{stats['ng']}",
                    (stats_x + 4, stats_y + 34), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (220, 220, 220), 2)
        ok_rate = stats["ok"] / max(stats["total"], 1) * 100
        cv2.putText(vis, f"OK Rate:{ok_rate:.1f}%",
                    (stats_x + 4, stats_y + 76), cv2.FONT_HERSHEY_SIMPLEX, 1,
                    (0, 220, 150), 2)
        return vis


# ──────────────────────────────────────────────────────────────────────────────
# VIDEO PROCESSOR (Command Line Version)
# ──────────────────────────────────────────────────────────────────────────────
class VideoProcessor:
    def __init__(self, video_path, bottle_model_path, roi_model_path,
                 target_liquid_level, conf, iou, tolerance,
                 gauss_kernel, gradient_threshold,
                 detect_line_fraction, show_ids=True,
                 output_video=None, save_video=False):
        self.video_path = video_path
        self.bottle_model_path = bottle_model_path
        self.roi_model_path = roi_model_path
        self.target_liquid_level = target_liquid_level
        self.conf = conf
        self.iou = iou
        self.tolerance = tolerance
        self.gauss_kernel = gauss_kernel
        self.gradient_threshold = gradient_threshold
        self.detect_line_fraction = detect_line_fraction
        self.show_ids = show_ids
        self.output_video = output_video
        self.save_video = save_video
        self.tracker = None
        self.video_writer = None

    def _log(self, msg):
        timestamp = datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] {msg}")

    def initialize(self):
        self._log(f"Loading bottle model: {self.bottle_model_path}")
        self._log(f"Loading ROI model: {self.roi_model_path}")
        self.tracker = BottleTracker(
            bottle_model_path=self.bottle_model_path,
            roi_model_path=self.roi_model_path,
            detection_line_fraction=self.detect_line_fraction,
            target_liquid_level=self.target_liquid_level,
            tolerance=self.tolerance,
            conf=self.conf,
            iou=self.iou,
            gauss_kernel=self.gauss_kernel,
            gradient_threshold=self.gradient_threshold
        )

    def setup_video_writer(self, width, height, fps):
        if not self.save_video:
            return
        if self.output_video is None:
            # Auto-generate a name from the input file and the current time
            base_name = os.path.splitext(os.path.basename(self.video_path))[0]
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.output_video = f"{base_name}_result_{timestamp}.mp4"
        output_dir = os.path.dirname(self.output_video)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.video_writer = cv2.VideoWriter(
            self.output_video, fourcc, fps, (width, height)
        )
        if self.video_writer.isOpened():
            self._log(f"✓ Output video will be saved to: {self.output_video}")
            self._log(f" Resolution: {width}×{height}, FPS: {fps:.2f}")
        else:
            self._log(f"⚠ Warning: Failed to create video writer for {self.output_video}")
            self.save_video = False

    def release_video_writer(self):
        if self.video_writer is not None:
            self.video_writer.release()
            self._log(f"✓ Video saved to: {self.output_video}")

    def run(self):
        if self.tracker is None:
            self.initialize()
        self._log(f"Opening video: {os.path.basename(self.video_path)}")
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {self.video_path}")
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self._log(f"Video info: {width}×{height} @ {fps:.1f} fps")
        # Set up the video writer
        self.setup_video_writer(width, height, fps)
        frame_count = 0
        bottle_detection_results = None
        window_name = "BottleVision - Liquid Level Detection"
        try:
            # Create and place the window once, not on every frame
            cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(window_name, 450, 800)
            cv2.moveWindow(window_name, 500, 100)
            while True:
                ret, frame = cap.read()
                if not ret:
                    if frame_count == 0:
                        # Avoid looping forever on an unreadable video
                        self._log("⚠ No frames could be read from the video")
                        break
                    # Loop video
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    self.tracker.processed_bottles.clear()
                    self._log("↩ Video loop restarted (processed bottle set cleared)")
                    continue
                frame_count += 1
                try:
                    bottle_detection_results, newly = self.tracker.bottle_detect_and_track(frame)
                except Exception as e:
                    self._log(f"⚠ Detection error: {e}")
                    continue
                # Log newly processed bottles
                for res in newly:
                    track_id = res["track_id"]
                    liquid_level = res["liquid_level"]
                    status = res["status"]
                    level_ok = res["classification"]["level_ok"]
                    if liquid_level is not None:
                        deviation = int(liquid_level) - self.target_liquid_level
                        level_str = f"{int(liquid_level)}"
                        dev_str = f"{deviation:+d}px"
                    else:
                        # A level of 0 is valid, so test against None explicitly
                        level_str = "N/A"
                        dev_str = "N/A"
                    icon = "✅" if status == "OK" else "❌"
                    self._log(
                        f"{icon} Bottle #{track_id}: {status}, "
                        f"liquid_level={level_str}, deviation={dev_str}, "
                        f"level_ok={level_ok}")
                # Visualize
                vis = self.tracker.visualize(frame, bottle_detection_results, self.show_ids)
                # Write the annotated frame to the output video
                if self.save_video and self.video_writer is not None:
                    self.video_writer.write(vis)
                # Show window
                cv2.imshow(window_name, vis)
                # Print stats every 100 frames
                if frame_count % 100 == 0:
                    stats = self.tracker.stats
                    ok_rate = stats["ok"] / max(stats["total"], 1) * 100
                    self._log(
                        f"📊 Stats - Total: {stats['total']}, OK: {stats['ok']}, NG: {stats['ng']}, OK Rate: {ok_rate:.1f}%")
                # Press 'q' to quit, 'r' to reset stats, 's' to save current frame
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    self._log("User requested quit")
                    break
                elif key == ord('r'):
                    self.tracker.reset()
                    self._log("Statistics reset")
                elif key == ord('s'):
                    # Save the current frame as an image
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    screenshot_path = f"screenshot_{timestamp}.jpg"
                    cv2.imwrite(screenshot_path, vis)
                    self._log(f"📸 Screenshot saved: {screenshot_path}")
        except KeyboardInterrupt:
            self._log("Interrupted by user")
        finally:
            cap.release()
            self.release_video_writer()  # release the video writer
            cv2.destroyAllWindows()
            self._log("Processing finished")
        # Final statistics
        stats = self.tracker.stats
        ok_rate = stats["ok"] / max(stats["total"], 1) * 100
        self._log("=" * 50)
        self._log("FINAL STATISTICS:")
        self._log(f" Total bottles processed: {stats['total']}")
        self._log(f" OK: {stats['ok']}")
        self._log(f" NG: {stats['ng']}")
        self._log(f" OK Rate: {ok_rate:.1f}%")
        self._log("=" * 50)
        return stats


# ──────────────────────────────────────────────────────────────────────────────
# COMMAND LINE INTERFACE
# ──────────────────────────────────────────────────────────────────────────────
def main():
    import argparse
    parser = argparse.ArgumentParser(description="BottleVision - Two-Stage Liquid Level Estimation System")
    parser.add_argument("--video", default=DEFAULT_VIDEO_PATH, help="Path to input video file")
    parser.add_argument("-b", "--bottle-model", default=DEFAULT_BOTTLE_MODEL_PATH,
                        help=f"Path to bottle detection model (default: {DEFAULT_BOTTLE_MODEL_PATH})")
    parser.add_argument("-r", "--roi-model", default=DEFAULT_ROI_MODEL_PATH,
                        help=f"Path to ROI detection model (default: {DEFAULT_ROI_MODEL_PATH})")
    parser.add_argument("-t", "--target-level", type=int, default=DEFAULT_TARGET_LEVEL,
                        help=f"Target liquid level Y coordinate (default: {DEFAULT_TARGET_LEVEL})")
    parser.add_argument("--conf", type=float, default=DEFAULT_CONF,
                        help=f"Detection confidence threshold (default: {DEFAULT_CONF})")
    parser.add_argument("--iou", type=float, default=DEFAULT_IOU,
                        help=f"IoU threshold for NMS (default: {DEFAULT_IOU})")
    parser.add_argument("--tolerance", type=int, default=DEFAULT_TOLERANCE,
                        help=f"Tolerance for liquid level deviation in pixels (default: {DEFAULT_TOLERANCE})")
    parser.add_argument("--gauss-kernel", type=int, default=DEFAULT_GAUSS_KERNEL,
                        help=f"Gaussian blur kernel size (default: {DEFAULT_GAUSS_KERNEL})")
    parser.add_argument("--gradient-thr", type=int, default=DEFAULT_GRADIENT_THR,
                        help=f"Gradient threshold for liquid level detection (default: {DEFAULT_GRADIENT_THR})")
    parser.add_argument("--detect-line", type=float, default=DEFAULT_DETECT_LINE,
                        help=f"Detection line position as fraction of frame width (default: {DEFAULT_DETECT_LINE})")
    parser.add_argument("--no-ids", action="store_true",
                        help="Hide track ID labels")
    # Boolean switch; the original string default made this option always truthy
    parser.add_argument("--save-video", action="store_true",
                        help="Save the output video with visualizations")
    parser.add_argument("--output-video", default=None,
                        help="Path to save the output video (default: auto-generated)")
    args = parser.parse_args()
    # Validate input video
    if not os.path.exists(args.video):
        print(f"Error: Video file not found: {args.video}")
        return 1
    # Validate models
    if not os.path.exists(args.bottle_model):
        print(f"Error: Bottle model not found: {args.bottle_model}")
        return 1
    if not os.path.exists(args.roi_model):
        print(f"Error: ROI model not found: {args.roi_model}")
        return 1
    # Create processor and run
    processor = VideoProcessor(
        video_path=args.video,
        bottle_model_path=args.bottle_model,
        roi_model_path=args.roi_model,
        target_liquid_level=args.target_level,
        conf=args.conf,
        iou=args.iou,
        tolerance=args.tolerance,
        gauss_kernel=args.gauss_kernel,
        gradient_threshold=args.gradient_thr,
        detect_line_fraction=args.detect_line,
        show_ids=not args.no_ids,
        output_video=args.output_video,
        save_video=args.save_video,
    )
    try:
        stats = processor.run()
        # Exit code 0 only if at least one bottle was processed
        return 0 if stats["total"] > 0 else 1
    except Exception as e:
        print(f"Error: {e}")
        return 1


if __name__ == "__main__":
    raise SystemExit(main())
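For the video-saving options, one possible flag design is a boolean switch plus an optional path. This is a sketch of that design only, trimmed to three arguments. The parser below is not the full CLI from the listing.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="Flag design sketch")
    parser.add_argument("--video", default="video1.mp4")
    # Boolean switch: present -> True, absent -> False
    parser.add_argument("--save-video", action="store_true")
    # Optional path; None means "let the program pick a name"
    parser.add_argument("--output-video", default=None)
    return parser


args = build_parser().parse_args(["--save-video", "--output-video", "out/run1.mp4"])
print(args.save_video, args.output_video)  # True out/run1.mp4
print(build_parser().parse_args([]).save_video)  # False
```

With action="store_true", the flag carries no value of its own, so the file name always comes from --output-video or from the auto-generated default.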
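4. Conclusion
Reference code: https://github.com/Shamlan321/BTLES
The code in this article uses the liquid level detection algorithm from the reference code, process_roi() and detect_liquid_level(), and makes improvements to BottleTracker.
The two-stage detection computes the liquid level height and decides whether it is acceptable. The second stage can be extended to multi-target detection, for example cap, label, and liquid level, by training a new second-stage model and modifying the result decision.
The reference code also includes a complete user interface design that can serve as a reference.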
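One way the result decision could be extended to several second-stage targets is to evaluate each check separately and require all of them to pass. The following is a hypothetical sketch: the class names "cap" and "label", the layout of the detections dictionary, and the function judge_bottle are assumptions for illustration, not part of this article's code or the reference code.

```python
def judge_bottle(detections, liquid_level, target=800, tolerance=20,
                 required=("cap", "label")):
    """Combine per-target checks into a single OK/NG decision.

    detections maps a class name to its best confidence (hypothetical layout).
    """
    # Each required class must have been detected at least once
    checks = {name: name in detections for name in required}
    # The level check is the same two-sided tolerance test as before
    checks["level"] = (
        liquid_level is not None and abs(liquid_level - target) <= tolerance
    )
    status = "OK" if all(checks.values()) else "NG"
    return status, checks


print(judge_bottle({"cap": 0.91, "label": 0.88}, liquid_level=805))
# ('OK', {'cap': True, 'label': True, 'level': True})
print(judge_bottle({"cap": 0.91}, liquid_level=805))
# ('NG', {'cap': True, 'label': False, 'level': True})
```

Returning the individual checks alongside the status keeps the reason for an NG result available for logging or display.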