YOLOv11 Object Detection Closed-Loop Project: From Environment Self-Check to Training, Validation, Prediction, and Supervision Reports

Abstract

This article walks through a reproducible YOLOv11 object detection closed-loop project covering environment initialization, demo data generation, training, validation, prediction, metric threshold checks, JSON/Markdown report output, and scheduled systemd health checks. The project uses `main.py` as its single entry point, every run artifact can be regenerated, and the source is not tied to any personal local paths, which makes it suitable for learning, further development, and pre-deployment self-checks.

1. Project Background

Many YOLO examples stop at a single training command or a snippet of prediction code. In real deployments you still run into incomplete dependencies, non-standard data directories, messy model paths, no supervision report after a run, and failures that are hard to pin down.

The goal of this project is to turn YOLOv11 into a small but complete closed-loop workflow: first check the environment and data, then train or load a model, then validate, predict, and record metrics, and finally emit structured reports. This makes it useful both for learning and for later integration with scheduled jobs, CI, or ops monitoring.

2. Features

  • Unified entry point: `main.py` in the project root provides the `demo-data`, `preflight`, `train`, `val`, `predict`, and `cycle` commands.
  • Environment self-check: verifies Python, dependencies, config files, dataset directories, the model source, and sample inputs.
  • Demo data: automatically generates a minimal set of YOLO-format images and labels for verifying the pipeline.
  • Train / validate / predict: runs training, validation, and inference through the Ultralytics YOLO interface.
  • Closed-loop supervision: compares validation metrics and inference latency against the threshold config, and returns a non-zero exit code when they fall short (see the example after this list).
  • Report output: generates `reports/*.json` and `reports/cycle_latest.md`.
  • Weight management: `yolo11n.pt` can be downloaded automatically, or fetched into `weights/` ahead of time.
  • Camera preview: ships a real-time preview script based on GStreamer and OpenCV, suited to Linux UVC cameras.
  • Scheduled health checks: provides `systemd --user` timer/service templates.
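
For example, the exit-code contract can be exercised directly in a shell: `main.py cycle` returns 0 only when every threshold check passes, and `$?` holds the previous command's exit code:

python3 main.py cycle; echo "cycle exit code: $?"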

3. Technology Choices

| Module | Choice | Notes |
| --- | --- | --- |
| Object detection | Ultralytics YOLOv11 | Unified training, validation, and prediction interface |
| Deep learning framework | PyTorch | CPU build by default; switch to GPU yourself if needed |
| Config format | YAML | Readable, easy to edit, suited to engineering parameters |
| Report format | JSON + Markdown | JSON for machine consumption, Markdown for human review |
| Image processing | OpenCV / Pillow | Demo image generation, camera display, and image processing |
| Camera | GStreamer + OpenCV | Works around some OpenCV capture compatibility issues on Linux |
| Scheduled supervision | systemd timer | Suited to periodic local checks on Linux |

4. Dependencies

  • Python version: Python 3.8 or later is recommended.
  • Python dependencies: `requirements.txt` lists Ultralytics, PyYAML, OpenCV, Pillow, and tqdm.
  • PyTorch: installed separately by `scripts/bootstrap.sh`; the CPU build is installed by default to avoid pulling a large CUDA runtime onto machines without a GPU.
  • Model weights: `yolo11n.pt` is downloaded automatically on first run; you can also run `bash scripts/fetch_weights.sh` to place it in `weights/` ahead of time.
  • Camera preview: on Linux, install `python3-gi`, `gstreamer1.0-tools`, and `gstreamer1.0-plugins-good` (install command after this list).
  • Run artifacts: `runs/`, `reports/`, and `logs/` are all regenerable directories and should not be committed to the repository.
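
On Debian/Ubuntu, the optional camera packages above can typically be installed with:

sudo apt-get install python3-gi gstreamer1.0-tools gstreamer1.0-plugins-good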

5. Design Approach

The project is designed around a "self-check -> execute -> evaluate -> report -> supervisable" flow. All paths are resolved from the project root, so no private machine paths are hard-coded. If training produces a `best.pt`, the cycle automatically passes that weight file to the validation and prediction steps, avoiding the fake closed loop where one model is trained but a different one is validated and used for prediction.

The directory layout is as follows:

yolov11/
  main.py
  requirements.txt
  configs/
  data/
  scripts/
  src/yolov11/
  monitor/
  weights/
  runs/
  reports/
  logs/

6. Full Source Code

Below is the complete text source of the project. Binary weights and run artifacts are not part of the source; they are downloaded or regenerated at runtime.

requirements.txt

ultralytics>=8.3.0,<9.0.0
PyYAML>=6.0
opencv-python>=4.8.0
Pillow>=10.0.0
tqdm>=4.66.0

.gitignore

.codex
.venv/
.ultralytics/
.matplotlib/
yolo11n.pt
*.corrupt.partial
weights/*.pt
__pycache__/
*.pyc
*.pyo
*.pyd
.pytest_cache/
runs/*
logs/*
reports/*
!runs/.gitkeep
!logs/.gitkeep
!reports/.gitkeep
*.log

Makefile

PYTHON := python3
CONFIG := configs/pipeline.yaml

.PHONY: bootstrap demo-data preflight preflight-strict train val predict cycle cycle-no-train cycle-demo

bootstrap:
	bash scripts/bootstrap.sh

demo-data:
	$(PYTHON) main.py demo-data

preflight:
	$(PYTHON) main.py preflight --config $(CONFIG)

preflight-strict:
	$(PYTHON) main.py preflight --config $(CONFIG) --strict

train:
	$(PYTHON) main.py train --config $(CONFIG)

val:
	$(PYTHON) main.py val --config $(CONFIG)

predict:
	$(PYTHON) main.py predict --config $(CONFIG)

cycle:
	$(PYTHON) main.py cycle --config $(CONFIG)

cycle-no-train:
	$(PYTHON) main.py cycle --config $(CONFIG) --skip-train

cycle-demo:
	$(PYTHON) main.py cycle --config configs/demo_pipeline.yaml

main.py

#!/usr/bin/env python3
import argparse
import json
import os
import sys
from pathlib import Path


ROOT = Path(__file__).resolve().parent
SRC_DIR = ROOT / "src"
if str(SRC_DIR) not in sys.path:
    sys.path.insert(0, str(SRC_DIR))

os.environ.setdefault("YOLO_CONFIG_DIR", str(ROOT / ".ultralytics"))
os.environ.setdefault("MPLCONFIGDIR", str(ROOT / ".matplotlib"))

from yolov11.pipeline import (  # noqa: E402
    load_config,
    preflight,
    run_cycle,
    run_predict,
    run_train,
    run_val,
)


def print_json(payload):
    print(json.dumps(payload, ensure_ascii=False, indent=2))


def generate_demo_data() -> int:
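    # Run the generator script in-process; exec avoids spawning a subprocess
    # and needing any extra sys.path setup for scripts/.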
    script = ROOT / "scripts" / "generate_demo_dataset.py"
    namespace = {"__name__": "__main__", "__file__": str(script)}
    exec(compile(script.read_text(encoding="utf-8"), str(script), "exec"), namespace)
    return 0


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="YOLOv11 closed-loop project entry")
    parser.add_argument(
        "command",
        choices=["demo-data", "preflight", "train", "val", "predict", "cycle"],
        help="Command to run",
    )
    parser.add_argument("--config", default="configs/pipeline.yaml", help="Pipeline config path")
    parser.add_argument("--strict", action="store_true", help="Fail when preflight checks fail")
    parser.add_argument("--skip-train", action="store_true", help="Skip training inside cycle")
    return parser


def main() -> int:
    args = build_parser().parse_args()

    if args.command == "demo-data":
        return generate_demo_data()

    config_path = Path(args.config)
    if not config_path.is_absolute():
        config_path = (ROOT / config_path).resolve()
    config = load_config(config_path)

    if args.command == "preflight":
        payload = preflight(config, strict=args.strict)
        print_json(payload)
        return 1 if args.strict and payload["status"] != "pass" else 0
    if args.command == "train":
        print_json(run_train(config))
        return 0
    if args.command == "val":
        print_json(run_val(config))
        return 0
    if args.command == "predict":
        print_json(run_predict(config))
        return 0
    if args.command == "cycle":
        payload = run_cycle(config, skip_train=args.skip_train)
        print_json(payload)
        return 0 if payload["status"] == "pass" else 1

    return 1


if __name__ == "__main__":
    raise SystemExit(main())

configs/pipeline.yaml

project_name: yolov11-closed-loop
run_name: baseline

# yolo11n.pt can be used directly; Ultralytics downloads it automatically on first run.
# If it has already been downloaded, this can be changed to weights/yolo11n.pt.
model: yolo11n.pt
data: data/dataset.yaml
thresholds: configs/thresholds.yaml
sample_source: data/images/test

device: cpu
imgsz: 640
epochs: 50
batch: 8
workers: 2
patience: 20
conf: 0.25
iou: 0.45

train_enabled: true
export_enabled: false
export_format: onnx

run_dir: runs
report_dir: reports
alert_command: null

configs/thresholds.yaml

min_map50: 0.50
min_precision: 0.50
min_recall: 0.50
max_inference_ms: 500.0
min_prediction_files: 1

configs/demo_pipeline.yaml

project_name: yolov11-demo-closed-loop
run_name: demo

model: yolo11n.yaml
data: data/dataset.yaml
thresholds: configs/demo_thresholds.yaml
sample_source: data/images/test

device: cpu
imgsz: 640
epochs: 1
batch: 1
workers: 0
patience: 1
conf: 0.01
iou: 0.45

train_enabled: true
export_enabled: false
export_format: onnx

run_dir: runs
report_dir: reports
alert_command: null

configs/demo_thresholds.yaml

min_map50: 0.0
min_precision: 0.0
min_recall: 0.0
max_inference_ms: 10000.0
min_prediction_files: 1

data/dataset.yaml

path: data
train: images/train
val: images/val
test: images/test

names:
  0: target

scripts/bootstrap.sh

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
export YOLO_CONFIG_DIR="$ROOT_DIR/.ultralytics"
export MPLCONFIGDIR="$ROOT_DIR/.matplotlib"
mkdir -p "$YOLO_CONFIG_DIR"
mkdir -p "$MPLCONFIGDIR"
CONFIG_PATH="${YOLOV11_CONFIG:-configs/pipeline.yaml}"

python3 -m venv .venv
source .venv/bin/activate

python -m pip install --upgrade pip setuptools wheel

TORCH_FLAVOR="${YOLOV11_TORCH_FLAVOR:-cpu}"
if [[ "$TORCH_FLAVOR" == "cpu" ]]; then
  python -m pip install --index-url https://download.pytorch.org/whl/cpu torch==2.4.1+cpu torchvision==0.19.1+cpu
elif [[ "$TORCH_FLAVOR" == "cu121" ]]; then
  python -m pip install --index-url https://download.pytorch.org/whl/cu121 torch==2.4.1 torchvision==0.19.1
else
  echo "Unsupported YOLOV11_TORCH_FLAVOR: $TORCH_FLAVOR"
  echo "Use one of: cpu, cu121"
  exit 1
fi

python -m pip install -r requirements.txt

python -m compileall main.py src scripts
python main.py preflight --config "$CONFIG_PATH"

echo "Bootstrap complete. Activate with: source $ROOT_DIR/.venv/bin/activate"

scripts/run_cycle.sh

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
export YOLO_CONFIG_DIR="$ROOT_DIR/.ultralytics"
export MPLCONFIGDIR="$ROOT_DIR/.matplotlib"
mkdir -p "$YOLO_CONFIG_DIR"
mkdir -p "$MPLCONFIGDIR"
CONFIG_PATH="${YOLOV11_CONFIG:-configs/pipeline.yaml}"

if [[ -f ".venv/bin/activate" ]]; then
  source .venv/bin/activate
fi

python3 main.py cycle --config "$CONFIG_PATH" "$@"

scripts/healthcheck.sh

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
export YOLO_CONFIG_DIR="$ROOT_DIR/.ultralytics"
export MPLCONFIGDIR="$ROOT_DIR/.matplotlib"
mkdir -p "$YOLO_CONFIG_DIR"
mkdir -p "$MPLCONFIGDIR"
CONFIG_PATH="${YOLOV11_CONFIG:-configs/pipeline.yaml}"

if [[ -f ".venv/bin/activate" ]]; then
  source .venv/bin/activate
fi

python3 main.py preflight --config "$CONFIG_PATH" --strict "$@"

scripts/fetch_weights.sh

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
export YOLO_CONFIG_DIR="$ROOT_DIR/.ultralytics"
mkdir -p "$YOLO_CONFIG_DIR" weights

if [[ -f ".venv/bin/activate" ]]; then
  source .venv/bin/activate
fi

MODEL_NAME="${1:-yolo11n.pt}"

python3 - <<'PY' "$MODEL_NAME" "$ROOT_DIR"
import os
import shutil
import sys
import tempfile
from pathlib import Path

from ultralytics import YOLO

model_name = sys.argv[1]
root_dir = Path(sys.argv[2])

weights_dir = root_dir / "weights"
weights_dir.mkdir(parents=True, exist_ok=True)

with tempfile.TemporaryDirectory(prefix="yolov11-weights-") as tmp:
    os.chdir(tmp)
    model = YOLO(model_name)
    ckpt = Path(getattr(model, "ckpt_path", ""))
    if not ckpt.exists():
        raise SystemExit(f"download failed: {model_name}")
    dest = weights_dir / Path(model_name).name
    shutil.copy2(ckpt, dest)
print(dest)
PY

scripts/generate_demo_dataset.py

#!/usr/bin/env python3
import struct
import zlib
from pathlib import Path


ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = ROOT / "data"


def png_chunk(chunk_type: bytes, data: bytes) -> bytes:
    return (
        struct.pack(">I", len(data))
        + chunk_type
        + data
        + struct.pack(">I", zlib.crc32(chunk_type + data) & 0xFFFFFFFF)
    )


def make_png(width: int, height: int) -> bytes:
    rows = []
    for y in range(height):
        row = bytearray()
        for x in range(width):
            if 40 <= x <= 216 and 60 <= y <= 196:
                row.extend((220, 40, 40))
            else:
                row.extend((240, 240, 240))
        rows.append(b"\x00" + bytes(row))

    raw = b"".join(rows)
    header = b"\x89PNG\r\n\x1a\n"
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    idat = zlib.compress(raw, level=9)
    return header + png_chunk(b"IHDR", ihdr) + png_chunk(b"IDAT", idat) + png_chunk(b"IEND", b"")


def ensure_dirs() -> None:
    for split in ("train", "val", "test"):
        (DATA_DIR / "images" / split).mkdir(parents=True, exist_ok=True)
        (DATA_DIR / "labels" / split).mkdir(parents=True, exist_ok=True)


def write_sample(split: str) -> None:
    image_path = DATA_DIR / "images" / split / f"{split}_sample.png"
    label_path = DATA_DIR / "labels" / split / f"{split}_sample.txt"

    image_path.write_bytes(make_png(256, 256))
    # class x_center y_center width height
    label_path.write_text("0 0.5 0.5 0.691406 0.535156\n", encoding="utf-8")


def main() -> None:
    ensure_dirs()
    for split in ("train", "val", "test"):
        write_sample(split)
    print("Demo dataset generated under data/")


if __name__ == "__main__":
    main()

scripts/webcam_live_view.py

#!/usr/bin/env python3
import argparse
import os
import sys
import tempfile
import time
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
os.environ.setdefault("YOLO_CONFIG_DIR", str(ROOT / ".ultralytics"))
os.environ.setdefault("MPLCONFIGDIR", str(ROOT / ".matplotlib"))
VENV_SITE = ROOT / ".venv" / "lib" / f"python{sys.version_info.major}.{sys.version_info.minor}" / "site-packages"
if VENV_SITE.exists() and str(VENV_SITE) not in sys.path:
    sys.path.insert(0, str(VENV_SITE))

import cv2
import gi
import numpy as np
gi.require_version("Gst", "1.0")
from gi.repository import Gst
from ultralytics import YOLO


class GstCamera:
    def __init__(self, device: str, width: int, height: int) -> None:
        Gst.init(None)
        pipeline_str = (
            f"v4l2src device={device} ! "
            f"image/jpeg,width={width},height={height} ! "
            "jpegdec ! videoconvert ! video/x-raw,format=BGR ! "
            "appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true"
        )
        self.pipeline = Gst.parse_launch(pipeline_str)
        self.sink = self.pipeline.get_by_name("sink")
        if self.sink is None:
            raise RuntimeError("failed to create appsink")
        self.bus = self.pipeline.get_bus()

        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            raise RuntimeError("failed to start camera pipeline")
        time.sleep(0.3)

    def read(self) -> np.ndarray:
        sample = None
        for _ in range(10):
            sample = self.sink.emit("try-pull-sample", 500_000_000)
            if sample is not None:
                break
            message = self.bus.pop_filtered(Gst.MessageType.ERROR | Gst.MessageType.EOS)
            if message and message.type == Gst.MessageType.ERROR:
                err, debug = message.parse_error()
                raise RuntimeError(f"{err}: {debug}")
            time.sleep(0.05)
        if sample is None:
            raise RuntimeError("failed to pull sample from camera")

        buffer = sample.get_buffer()
        caps = sample.get_caps()
        structure = caps.get_structure(0)
        width = structure.get_value("width")
        height = structure.get_value("height")

        ok, map_info = buffer.map(Gst.MapFlags.READ)
        if not ok:
            raise RuntimeError("failed to map camera buffer")
        try:
            frame = np.frombuffer(map_info.data, dtype=np.uint8).reshape((height, width, 3)).copy()
        finally:
            buffer.unmap(map_info)
        return frame

    def close(self) -> None:
        self.pipeline.set_state(Gst.State.NULL)


def main() -> int:
    parser = argparse.ArgumentParser(description="Live webcam object detection preview via GStreamer snapshots")
    parser.add_argument("--device", default="/dev/video0")
    parser.add_argument("--model", default=str(ROOT / "weights" / "yolo11n.pt"))
    parser.add_argument("--width", type=int, default=640)
    parser.add_argument("--height", type=int, default=480)
    parser.add_argument("--imgsz", type=int, default=320)
    parser.add_argument("--conf", type=float, default=0.35)
    parser.add_argument("--infer-every", type=int, default=2)
    parser.add_argument("--title", default="YOLOv11 Webcam Preview")
    args = parser.parse_args()

    model = YOLO(args.model)
    camera = GstCamera(args.device, args.width, args.height)
    window_name = args.title
    temp_frame = Path(tempfile.NamedTemporaryFile(prefix="yolov11-webcam-", suffix=".jpg", delete=False).name)
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    last_plotted = None
    frame_index = 0

    try:
        while True:
            started = time.time()
            frame = camera.read()

            should_infer = frame_index % max(args.infer_every, 1) == 0 or last_plotted is None
            if should_infer:
                ok = cv2.imwrite(str(temp_frame), frame)
                if not ok:
                    raise RuntimeError("failed to write temporary webcam frame")
                results = model.predict(source=str(temp_frame), imgsz=args.imgsz, conf=args.conf, verbose=False)
                plotted = results[0].plot()
                last_plotted = plotted
            else:
                plotted = frame.copy() if last_plotted is None else last_plotted.copy()

            fps = 1.0 / max(time.time() - started, 1e-6)
            cv2.putText(
                plotted,
                f"{Path(args.device).name} | imgsz={args.imgsz} | conf={args.conf:.2f} | infer_every={args.infer_every} | {fps:.2f} FPS-ish | q/ESC quit",
                (10, 28),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.55,
                (0, 255, 255),
                2,
                cv2.LINE_AA,
            )
            cv2.imshow(window_name, plotted)
            key = cv2.waitKey(1) & 0xFF
            if key in (27, ord("q")):
                break
            frame_index += 1
    finally:
        camera.close()
        temp_frame.unlink(missing_ok=True)
        cv2.destroyAllWindows()

    return 0


if __name__ == "__main__":
    raise SystemExit(main())

src/yolov11/__init__.py

__all__ = ["pipeline"]

src/yolov11/pipeline.py

import argparse
import importlib.util
import json
import os
import shlex
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml


REPO_ROOT = Path(__file__).resolve().parents[2]
os.environ.setdefault("YOLO_CONFIG_DIR", str(REPO_ROOT / ".ultralytics"))
os.environ.setdefault("MPLCONFIGDIR", str(REPO_ROOT / ".matplotlib"))


def now_iso() -> str:
    return datetime.now().astimezone().isoformat(timespec="seconds")


def load_yaml(path: Path) -> Dict[str, Any]:
    with path.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle) or {}
    if not isinstance(data, dict):
        raise ValueError(f"YAML must contain a mapping: {path}")
    return data


def resolve_path(value: str) -> Path:
    path = Path(value)
    if path.is_absolute():
        return path
    return REPO_ROOT / path


def ensure_dir(path: Path) -> Path:
    path.mkdir(parents=True, exist_ok=True)
    return path


def write_json(path: Path, payload: Dict[str, Any]) -> None:
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)


def write_markdown(path: Path, content: str) -> None:
    ensure_dir(path.parent)
    path.write_text(content, encoding="utf-8")


def module_exists(name: str) -> bool:
    return importlib.util.find_spec(name) is not None


def is_builtin_model_name(model_ref: str) -> bool:
    return "/" not in model_ref and "\\" not in model_ref


def count_files(path: Path, suffixes: Tuple[str, ...]) -> int:
    if not path.exists():
        return 0
    total = 0
    for candidate in path.rglob("*"):
        if candidate.is_file() and candidate.suffix.lower() in suffixes:
            total += 1
    return total


def list_missing_dataset_items(dataset_cfg: Dict[str, Any]) -> List[str]:
    missing: List[str] = []
    dataset_root = resolve_path(str(dataset_cfg.get("path", "data")))
    for split in ("train", "val", "test"):
        rel_path = dataset_cfg.get(split)
        if not rel_path:
            missing.append(f"missing dataset split key: {split}")
            continue
        split_dir = dataset_root / rel_path
        if not split_dir.exists():
            missing.append(f"missing path: {split_dir}")
    return missing


def record_check(name: str, ok: bool, detail: str) -> Dict[str, Any]:
    return {"name": name, "ok": ok, "detail": detail}


def load_config(config_path: Path) -> Dict[str, Any]:
    config = load_yaml(config_path)
    config["_config_path"] = str(config_path)
    return config


def load_thresholds(config: Dict[str, Any]) -> Dict[str, Any]:
    thresholds_path = resolve_path(str(config["thresholds"]))
    return load_yaml(thresholds_path)


def preflight(config: Dict[str, Any], strict: bool = False) -> Dict[str, Any]:
    dataset_path = resolve_path(str(config["data"]))
    thresholds_path = resolve_path(str(config["thresholds"]))
    sample_source = resolve_path(str(config["sample_source"]))
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    run_dir = ensure_dir(resolve_path(str(config.get("run_dir", "runs"))))
    checks: List[Dict[str, Any]] = []

    py_ok = sys.version_info >= (3, 8)
    checks.append(record_check("python_version", py_ok, sys.version.split()[0]))

    torch_ok = module_exists("torch")
    checks.append(record_check("torch_installed", torch_ok, "torch importable" if torch_ok else "torch missing"))

    ultra_ok = module_exists("ultralytics")
    checks.append(
        record_check(
            "ultralytics_installed",
            ultra_ok,
            "ultralytics importable" if ultra_ok else "ultralytics missing",
        )
    )

    dataset_ok = dataset_path.exists()
    checks.append(record_check("dataset_yaml", dataset_ok, str(dataset_path)))

    thresholds_ok = thresholds_path.exists()
    checks.append(record_check("thresholds_yaml", thresholds_ok, str(thresholds_path)))

    config_model = str(config["model"])
    if is_builtin_model_name(config_model):
        model_ok = True
        model_detail = f"builtin model reference: {config_model}"
    else:
        model_path = resolve_path(config_model)
        model_ok = model_path.exists()
        model_detail = str(model_path)
    checks.append(record_check("model_source", model_ok, model_detail))

    sample_ok = sample_source.exists()
    checks.append(record_check("sample_source", sample_ok, str(sample_source)))

    dataset_summary: Dict[str, Any] = {}
    if dataset_ok:
        dataset_cfg = load_yaml(dataset_path)
        missing_items = list_missing_dataset_items(dataset_cfg)
        dataset_dirs_ok = not missing_items
        checks.append(
            record_check(
                "dataset_paths",
                dataset_dirs_ok,
                "all dataset paths present" if dataset_dirs_ok else "; ".join(missing_items),
            )
        )

        dataset_root = resolve_path(str(dataset_cfg.get("path", "data")))
        for split in ("train", "val", "test"):
            rel_path = dataset_cfg.get(split)
            split_dir = dataset_root / rel_path if rel_path else dataset_root
            dataset_summary[split] = {
                "path": str(split_dir),
                "images": count_files(split_dir, (".jpg", ".jpeg", ".png", ".bmp", ".webp")),
                "labels_dir": str((resolve_path("data") / "labels" / split)),
            }

        val_images = dataset_summary.get("val", {}).get("images", 0)
        test_images = dataset_summary.get("test", {}).get("images", 0)
        checks.append(
            record_check(
                "validation_or_test_images",
                (val_images + test_images) > 0,
                f"val={val_images}, test={test_images}",
            )
        )

    status = "pass" if all(item["ok"] for item in checks) else "fail"
    report = {
        "type": "preflight",
        "status": status,
        "strict": strict,
        "timestamp": now_iso(),
        "config": {
            "config_path": config["_config_path"],
            "run_dir": str(run_dir),
            "report_dir": str(report_dir),
        },
        "checks": checks,
        "dataset_summary": dataset_summary,
    }
    write_json(report_dir / "preflight_latest.json", report)
    return report


def require_ultralytics() -> None:
    if not module_exists("ultralytics"):
        raise RuntimeError("ultralytics is not installed. Run: bash scripts/bootstrap.sh")


def load_builtin_model(model_ref: str):
    from ultralytics import YOLO  # type: ignore

    weights_dir = ensure_dir(resolve_path("weights"))
    candidate = weights_dir / Path(model_ref).name
    if candidate.exists():
        return YOLO(str(candidate))

    previous_cwd = Path.cwd()
    try:
        os.chdir(weights_dir)
        return YOLO(model_ref)
    finally:
        os.chdir(previous_cwd)


def load_model(model_ref: str):
    require_ultralytics()
    from ultralytics import YOLO  # type: ignore

    if is_builtin_model_name(model_ref):
        return load_builtin_model(model_ref)
    model_source = str(resolve_path(model_ref))
    return YOLO(model_source)


def flatten_metric(value: Any) -> Optional[float]:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def extract_val_metrics(metrics: Any) -> Dict[str, Optional[float]]:
    box = getattr(metrics, "box", None)
    # Prefer the scalar means (mp/mr); the per-class arrays (p/r) cannot be
    # collapsed to a single float on multi-class datasets.
    precision = flatten_metric(getattr(box, "mp", None))
    if precision is None:
        precision = flatten_metric(getattr(box, "p", None))
    recall = flatten_metric(getattr(box, "mr", None))
    if recall is None:
        recall = flatten_metric(getattr(box, "r", None))
    map50 = flatten_metric(getattr(box, "map50", None))
    map5095 = flatten_metric(getattr(box, "map", None))
    return {
        "precision": precision,
        "recall": recall,
        "map50": map50,
        "map50_95": map5095,
    }


def run_train(config: Dict[str, Any]) -> Dict[str, Any]:
    model = load_model(str(config["model"]))
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    run_name = f'{config.get("run_name", "baseline")}-train'

    results = model.train(
        data=str(resolve_path(str(config["data"]))),
        imgsz=int(config.get("imgsz", 640)),
        epochs=int(config.get("epochs", 50)),
        batch=int(config.get("batch", 8)),
        workers=int(config.get("workers", 2)),
        patience=int(config.get("patience", 20)),
        device=str(config.get("device", "cpu")),
        project=str(resolve_path(str(config.get("run_dir", "runs")))),
        name=run_name,
        exist_ok=True,
    )

    save_dir = getattr(results, "save_dir", None)
    best_weights = None
    last_weights = None
    if save_dir:
        best_candidate = Path(save_dir) / "weights" / "best.pt"
        last_candidate = Path(save_dir) / "weights" / "last.pt"
        best_weights = str(best_candidate) if best_candidate.exists() else None
        last_weights = str(last_candidate) if last_candidate.exists() else None

    payload = {
        "type": "train",
        "status": "pass",
        "timestamp": now_iso(),
        "run_name": run_name,
        "save_dir": str(save_dir) if save_dir else None,
        "best_weights": best_weights,
        "last_weights": last_weights,
    }
    write_json(report_dir / "train_latest.json", payload)
    return payload


def run_val(config: Dict[str, Any], model_ref: Optional[str] = None) -> Dict[str, Any]:
    model = load_model(model_ref or str(config["model"]))
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    run_name = f'{config.get("run_name", "baseline")}-val'

    metrics = model.val(
        data=str(resolve_path(str(config["data"]))),
        imgsz=int(config.get("imgsz", 640)),
        batch=int(config.get("batch", 8)),
        workers=int(config.get("workers", 2)),
        device=str(config.get("device", "cpu")),
        project=str(resolve_path(str(config.get("run_dir", "runs")))),
        name=run_name,
        exist_ok=True,
    )
    summary = extract_val_metrics(metrics)
    payload = {
        "type": "val",
        "status": "pass",
        "timestamp": now_iso(),
        "run_name": run_name,
        "metrics": summary,
    }
    write_json(report_dir / "val_latest.json", payload)
    return payload


def run_predict(config: Dict[str, Any], model_ref: Optional[str] = None) -> Dict[str, Any]:
    model = load_model(model_ref or str(config["model"]))
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    run_name = f'{config.get("run_name", "baseline")}-predict'
    source_path = resolve_path(str(config["sample_source"]))

    results = model.predict(
        source=str(source_path),
        imgsz=int(config.get("imgsz", 640)),
        conf=float(config.get("conf", 0.25)),
        iou=float(config.get("iou", 0.45)),
        device=str(config.get("device", "cpu")),
        project=str(resolve_path(str(config.get("run_dir", "runs")))),
        name=run_name,
        save=True,
        exist_ok=True,
    )

    prediction_dir = resolve_path(str(config.get("run_dir", "runs"))) / run_name
    prediction_files = count_files(prediction_dir, (".jpg", ".jpeg", ".png", ".bmp", ".webp"))

    speeds: List[float] = []
    for item in results:
        speed = getattr(item, "speed", {}) or {}
        total = 0.0
        for key in ("preprocess", "inference", "postprocess"):
            value = speed.get(key)
            if value is not None:
                total += float(value)
        if total > 0:
            speeds.append(total)

    avg_inference_ms = round(sum(speeds) / len(speeds), 3) if speeds else None
    payload = {
        "type": "predict",
        "status": "pass",
        "timestamp": now_iso(),
        "run_name": run_name,
        "source": str(source_path),
        "prediction_dir": str(prediction_dir),
        "prediction_files": prediction_files,
        "avg_inference_ms": avg_inference_ms,
    }
    write_json(report_dir / "predict_latest.json", payload)
    return payload


def export_model(config: Dict[str, Any], model_ref: Optional[str] = None) -> Optional[Dict[str, Any]]:
    if not bool(config.get("export_enabled", False)):
        return None

    model = load_model(model_ref or str(config["model"]))
    export_format = str(config.get("export_format", "onnx"))
    exported = model.export(format=export_format)
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    payload = {
        "type": "export",
        "status": "pass",
        "timestamp": now_iso(),
        "format": export_format,
        "output": str(exported),
    }
    write_json(report_dir / "export_latest.json", payload)
    return payload


def evaluate_thresholds(
    thresholds: Dict[str, Any],
    val_report: Dict[str, Any],
    predict_report: Dict[str, Any],
) -> Tuple[str, List[Dict[str, Any]]]:
    metrics = val_report.get("metrics", {})
    checks = [
        record_check(
            "min_map50",
            (metrics.get("map50") or 0.0) >= float(thresholds.get("min_map50", 0.0)),
            f'got={metrics.get("map50")}, expected>={thresholds.get("min_map50")}',
        ),
        record_check(
            "min_precision",
            (metrics.get("precision") or 0.0) >= float(thresholds.get("min_precision", 0.0)),
            f'got={metrics.get("precision")}, expected>={thresholds.get("min_precision")}',
        ),
        record_check(
            "min_recall",
            (metrics.get("recall") or 0.0) >= float(thresholds.get("min_recall", 0.0)),
            f'got={metrics.get("recall")}, expected>={thresholds.get("min_recall")}',
        ),
        record_check(
            "max_inference_ms",
            predict_report.get("avg_inference_ms") is not None
            and float(predict_report["avg_inference_ms"]) <= float(thresholds.get("max_inference_ms", 999999.0)),
            f'got={predict_report.get("avg_inference_ms")}, expected<={thresholds.get("max_inference_ms")}',
        ),
        record_check(
            "min_prediction_files",
            int(predict_report.get("prediction_files", 0)) >= int(thresholds.get("min_prediction_files", 1)),
            f'got={predict_report.get("prediction_files")}, expected>={thresholds.get("min_prediction_files")}',
        ),
    ]
    status = "pass" if all(item["ok"] for item in checks) else "fail"
    return status, checks


def run_alert(command: str, cycle_report_path: Path) -> Dict[str, Any]:
    try:
        result = subprocess.run(
            shlex.split(command) + [str(cycle_report_path)],
            check=False,
            capture_output=True,
            text=True,
        )
        return {
            "command": command,
            "returncode": result.returncode,
            "stdout": result.stdout.strip(),
            "stderr": result.stderr.strip(),
        }
    except Exception as exc:
        return {"command": command, "returncode": -1, "stderr": str(exc)}


def render_cycle_markdown(report: Dict[str, Any]) -> str:
    lines = [
        "# YOLOv11 Cycle Report",
        "",
        f"- Timestamp: {report['timestamp']}",
        f"- Status: {report['status']}",
        "",
        "## Threshold Checks",
    ]
    for check in report.get("threshold_checks", []):
        flag = "PASS" if check["ok"] else "FAIL"
        lines.append(f"- [{flag}] {check['name']}: {check['detail']}")

    lines.extend(
        [
            "",
            "## Validation Metrics",
            f"- mAP50: {report.get('val_report', {}).get('metrics', {}).get('map50')}",
            f"- Precision: {report.get('val_report', {}).get('metrics', {}).get('precision')}",
            f"- Recall: {report.get('val_report', {}).get('metrics', {}).get('recall')}",
            "",
            "## Prediction",
            f"- Avg inference ms: {report.get('predict_report', {}).get('avg_inference_ms')}",
            f"- Prediction files: {report.get('predict_report', {}).get('prediction_files')}",
        ]
    )
    return "\n".join(lines) + "\n"


def run_cycle(config: Dict[str, Any], skip_train: bool = False) -> Dict[str, Any]:
    report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
    preflight_report = preflight(config, strict=True)
    if preflight_report["status"] != "pass":
        cycle_report = {
            "type": "cycle",
            "status": "fail",
            "timestamp": now_iso(),
            "reason": "preflight failed",
            "preflight_report": preflight_report,
        }
        cycle_path = report_dir / "cycle_latest.json"
        write_json(cycle_path, cycle_report)
        write_markdown(report_dir / "cycle_latest.md", render_cycle_markdown({**cycle_report, "threshold_checks": []}))
        return cycle_report

    train_report = None
    active_model_ref = str(config["model"])
    if bool(config.get("train_enabled", True)) and not skip_train:
        train_report = run_train(config)
        if train_report.get("best_weights"):
            active_model_ref = str(train_report["best_weights"])

    val_report = run_val(config, model_ref=active_model_ref)
    predict_report = run_predict(config, model_ref=active_model_ref)
    export_report = export_model(config, model_ref=active_model_ref)

    thresholds = load_thresholds(config)
    status, threshold_checks = evaluate_thresholds(thresholds, val_report, predict_report)

    cycle_report = {
        "type": "cycle",
        "status": status,
        "timestamp": now_iso(),
        "preflight_report": preflight_report,
        "train_report": train_report,
        "val_report": val_report,
        "predict_report": predict_report,
        "export_report": export_report,
        "threshold_checks": threshold_checks,
    }
    cycle_path = report_dir / "cycle_latest.json"
    write_json(cycle_path, cycle_report)
    write_markdown(report_dir / "cycle_latest.md", render_cycle_markdown(cycle_report))

    alert_command = config.get("alert_command")
    if status != "pass" and alert_command:
        cycle_report["alert"] = run_alert(str(alert_command), cycle_path)
        write_json(cycle_path, cycle_report)

    return cycle_report


def print_json(payload: Dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False, indent=2))


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="YOLOv11 closed-loop pipeline")
    parser.add_argument("--config", required=True, help="Path to pipeline config YAML")
    subparsers = parser.add_subparsers(dest="command", required=True)

    preflight_parser = subparsers.add_parser("preflight", help="Run environment and file checks")
    preflight_parser.add_argument("--strict", action="store_true", help="Return non-zero when checks fail")

    subparsers.add_parser("train", help="Train model")
    subparsers.add_parser("val", help="Validate model")
    subparsers.add_parser("predict", help="Run prediction")

    cycle_parser = subparsers.add_parser("cycle", help="Run closed-loop cycle")
    cycle_parser.add_argument("--skip-train", action="store_true", help="Skip train step in cycle")
    return parser


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    config_path = resolve_path(args.config)
    config = load_config(config_path)

    try:
        if args.command == "preflight":
            payload = preflight(config, strict=args.strict)
            print_json(payload)
            return 1 if args.strict and payload["status"] != "pass" else 0
        if args.command == "train":
            payload = run_train(config)
            print_json(payload)
            return 0
        if args.command == "val":
            payload = run_val(config)
            print_json(payload)
            return 0
        if args.command == "predict":
            payload = run_predict(config)
            print_json(payload)
            return 0
        if args.command == "cycle":
            payload = run_cycle(config, skip_train=args.skip_train)
            print_json(payload)
            return 0 if payload["status"] == "pass" else 1
        parser.error(f"Unknown command: {args.command}")
    except Exception as exc:
        failure = {
            "type": args.command,
            "status": "fail",
            "timestamp": now_iso(),
            "error": str(exc),
        }
        report_dir = ensure_dir(resolve_path(str(config.get("report_dir", "reports"))))
        write_json(report_dir / f"{args.command}_error_latest.json", failure)
        print_json(failure)
        return 1

    return 1


if __name__ == "__main__":
    raise SystemExit(main())

monitor/yolov11-healthcheck.service

[Unit]
Description=YOLOv11 closed-loop health check
After=network.target

[Service]
Type=oneshot
WorkingDirectory=%h/yolov11
ExecStart=/bin/bash %h/yolov11/scripts/healthcheck.sh
StandardOutput=append:%h/yolov11/logs/healthcheck.log
StandardError=append:%h/yolov11/logs/healthcheck.log

monitor/yolov11-healthcheck.timer

[Unit]
Description=Run YOLOv11 health check every 30 minutes

[Timer]
OnBootSec=2min
OnUnitActiveSec=30min
Unit=yolov11-healthcheck.service
Persistent=true

[Install]
WantedBy=timers.target

monitor/install_systemd.sh

#!/usr/bin/env bash
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
SYSTEMD_USER_DIR="${HOME}/.config/systemd/user"

mkdir -p "$SYSTEMD_USER_DIR"
cp "$ROOT_DIR/monitor/yolov11-healthcheck.service" "$SYSTEMD_USER_DIR/"
cp "$ROOT_DIR/monitor/yolov11-healthcheck.timer" "$SYSTEMD_USER_DIR/"

systemctl --user daemon-reload
systemctl --user enable --now yolov11-healthcheck.timer

echo "Installed user timer: yolov11-healthcheck.timer"

README.md

# YOLOv11 Closed-Loop Deployment

This is a YOLOv11 project skeleton that can be deployed from scratch. The goal is not just to drop in a training script, but to cover the whole chain of deployment, detection, self-checks, supervision, and alerting in one pass.

## Project Goals

- One-command initialization of the Python environment and dependencies
- A unified entry point for training, validation, and prediction
- A pre-launch `preflight` self-check
- A closed-loop `cycle` supervision mechanism
- Structured JSON/Markdown reports for manual inspection and scheduled-job integration
- `systemd` scheduled health-check templates

## Directory Layout

- `configs/`: main config and threshold config
- `data/`: dataset directories and the dataset YAML
- `scripts/`: one-command deployment, closed-loop run, and health-check scripts
- `src/yolov11/`: main Python logic
- `monitor/`: scheduled supervision templates
- `weights/`: model weights directory
- `reports/`: closed-loop report output directory
- `logs/`: run log directory

## Quick Start

1. Drop in your dataset:
   - Images: `data/images/train|val|test`
   - Labels: `data/labels/train|val|test`
   - If you have no real data yet, run `python3 main.py demo-data` first
2. Drop in your weights:
   - `yolo11n.pt` is used by default and downloaded automatically on first run
   - You can also run `bash scripts/fetch_weights.sh` to download it into `weights/` ahead of time
3. Run the initialization:

```bash
cd yolov11
bash scripts/bootstrap.sh
```

4. Run the strict preflight check:

```bash
python3 main.py preflight --strict
```

5. Run the closed loop:

```bash
python3 main.py cycle
```

If you only want to verify that the mechanism itself closes the loop, run the demo cycle first:

```bash
python3 main.py demo-data
python3 main.py cycle --config configs/demo_pipeline.yaml
```

If you want to pin the default model weights locally, run:

```bash
bash scripts/fetch_weights.sh
```

## Closed-Loop Supervision

`cycle` executes the following steps in order:

1. Check that Python, dependencies, configs, the dataset, sample inputs, and the model source are all in place
2. Optionally run training
3. Run validation and extract `mAP50`, precision, and recall
4. Run prediction and record inference latency and the number of output files
5. Compare the results against `configs/thresholds.yaml`
6. Generate (an abridged example of the cycle report follows this list):
   - `reports/preflight_latest.json`
   - `reports/train_latest.json`
   - `reports/val_latest.json`
   - `reports/predict_latest.json`
   - `reports/cycle_latest.json`
   - `reports/cycle_latest.md`
7. If thresholds are not met, the process returns a non-zero exit code so it can be picked up by `cron`, `systemd`, CI, or an ops platform
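
For reference, an abridged `reports/cycle_latest.json` has this shape (field names follow `run_cycle` in `src/yolov11/pipeline.py`; the values below are illustrative):

```json
{
  "type": "cycle",
  "status": "pass",
  "timestamp": "2025-01-01T08:00:00+08:00",
  "val_report": {"metrics": {"map50": 0.99, "precision": 1.0, "recall": 1.0, "map50_95": 0.95}},
  "predict_report": {"avg_inference_ms": 85.2, "prediction_files": 1},
  "threshold_checks": [
    {"name": "min_map50", "ok": true, "detail": "got=0.99, expected>=0.5"}
  ]
}
```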

## Scheduled Supervision with systemd

Templates are provided:

- `monitor/yolov11-healthcheck.service`
- `monitor/yolov11-healthcheck.timer`
- `monitor/install_systemd.sh`

If the current machine supports `systemd --user`, run:

```bash
bash monitor/install_systemd.sh
```

Afterwards, check the status with:

```bash
systemctl --user status yolov11-healthcheck.timer
systemctl --user list-timers | grep yolov11
```
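
The service template appends both stdout and stderr to `logs/healthcheck.log` (see `StandardOutput=` / `StandardError=` in the unit file), so recent results can be inspected with:

```bash
tail -n 50 logs/healthcheck.log
```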

## Environment Notes

Python dependencies are managed by `requirements.txt` and `scripts/bootstrap.sh`.

- Base environment: Python 3.8+
- Core dependencies: Ultralytics, PyTorch, PyYAML, OpenCV, Pillow, tqdm
- Default policy: `scripts/bootstrap.sh` installs the CPU build of PyTorch
- Optional GPU: adjust `YOLOV11_TORCH_FLAVOR` to match your CUDA version
- Optional system dependencies for camera preview: `python3-gi`, `gstreamer1.0-tools`, `gstreamer1.0-plugins-good`

Before the first closed-loop run, you must run `scripts/bootstrap.sh` to install dependencies. The default config runs on CPU and does not require a GPU.

If you later confirm the machine has an NVIDIA GPU, rerun the bootstrap with the CUDA flavor:

```bash
YOLOV11_TORCH_FLAVOR=cu121 bash scripts/bootstrap.sh
```

The demo config `configs/demo_pipeline.yaml` uses `yolo11n.yaml` to train from the architecture definition, without depending on an external weight download; it exists purely to verify that the whole closed loop is connected.

## Unified Entry Point

The project uses `main.py` in the project root as the single entry point:

```bash
python3 main.py demo-data
python3 main.py preflight --strict
python3 main.py train
python3 main.py val
python3 main.py predict
python3 main.py cycle
```

## Demo Data

If you just want to get the loop running end to end first, run:

```bash
python3 main.py demo-data
```

This script generates a minimal set of demo images and labels to verify the directory structure and pipeline connectivity. Each label file holds one line in the standard YOLO format, `class x_center y_center width height`, with values normalized to the image size; the demo writes `0 0.5 0.5 0.691406 0.535156`, i.e. class 0 centered in the image with a box about 69% of the width and 54% of the height. This is not production data, and its training metrics do not reflect real-world performance.

7. How to Run

1. Create the environment and install dependencies

cd yolov11
bash scripts/bootstrap.sh

The CPU build of PyTorch is installed by default. If you need a CUDA build, adjust scripts/bootstrap.sh to match your CUDA version.
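
scripts/bootstrap.sh reads the YOLOV11_TORCH_FLAVOR environment variable (supported values: cpu, cu121), so the CUDA variant can be installed without editing the script:

YOLOV11_TORCH_FLAVOR=cu121 bash scripts/bootstrap.sh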

2. Generate demo data

python3 main.py demo-data

3. Run the preflight check

python3 main.py preflight --strict

4. Run the full demo closed loop

python3 main.py cycle --config configs/demo_pipeline.yaml

After it runs, training results, prediction results, and report files are generated (a quick way to inspect them follows the list):

runs/
reports/preflight_latest.json
reports/train_latest.json
reports/val_latest.json
reports/predict_latest.json
reports/cycle_latest.json
reports/cycle_latest.md
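
To pull the overall status and any failing checks out of the JSON report from a shell (assuming jq is installed):

jq '.status, (.threshold_checks[] | select(.ok == false))' reports/cycle_latest.json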

5. Run the main config

python3 main.py cycle

6. Live camera preview

On Linux, with a UVC camera plugged in, you can try:

python3 scripts/webcam_live_view.py --device /dev/video0 --model weights/yolo11n.pt --imgsz 320 --conf 0.4 --infer-every 3

If the camera is held by another program, close that program first. Real-time detection is heavy on CPU-only machines; lower --imgsz or increase --infer-every to reduce the load. A way to sanity-check the camera pipeline on its own is shown below.
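
To verify the camera and GStreamer plugins independently of YOLO, you can run roughly the same pipeline the preview script builds, with its appsink swapped for a display sink (autovideosink ships with the standard plugin packages):

gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg,width=640,height=480 ! jpegdec ! videoconvert ! autovideosink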

8. Extension Directions

  • Plug in a real dataset: replace data/images and data/labels, and update the class names in data/dataset.yaml.
  • Use stronger weights: change model in configs/pipeline.yaml to your own weights/best.pt.
  • Speed up inference: export to ONNX, NCNN, or TensorRT, or use a GPU/NPU.
  • Strengthen monitoring: feed reports/cycle_latest.json into an alerting system (see the sketch after this list).
  • Web visualization: build an image-upload detection page with FastAPI/Streamlit.
  • Data flywheel: automatically save low-confidence or misdetected images, review them manually, and feed them back into the training set.
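
As a sketch of the alerting hook: run_alert in src/yolov11/pipeline.py invokes alert_command with the path to cycle_latest.json appended as the last argument, so a hook script receives the report path as $1. Below is a hypothetical scripts/alert_hook.sh (wire it up with alert_command: bash scripts/alert_hook.sh in configs/pipeline.yaml); the log file and webhook URL are illustrative:

#!/usr/bin/env bash
set -euo pipefail

# run_alert() calls: <alert_command> <path-to-cycle_latest.json>
REPORT_PATH="$1"

# Extract the overall status field from the JSON report.
STATUS="$(python3 -c 'import json, sys; print(json.load(open(sys.argv[1]))["status"])' "$REPORT_PATH")"

# Append a one-line record locally; replace with a webhook or IM call for real alerting.
echo "$(date -Is) cycle status=${STATUS} report=${REPORT_PATH}" >> logs/alerts.log
# curl -X POST -H 'Content-Type: application/json' --data @"$REPORT_PATH" https://example.com/hooks/yolov11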

9. Summary

This project does not just demonstrate a single YOLO command; it chains together the environment checks, data preparation, training, validation, prediction, threshold checks, and report output that object detection engineering commonly needs. Its strengths are a simple structure, a single entry point, and reproducibility, which make it a good starter template for productionizing YOLOv11. For a real deployment, you still need to add real data for your business scenario, tune the thresholds, and choose CPU, GPU, or edge inference based on your hardware.
