若该文为原创文章,转载请注明原文出处。

在B站看到大神使用YOLOV26+LVM多模态视频检测,所以想偿试一下,

基于 YOLOv8 与 Qwen3.5 的多模态视频行为分析系统.

一、背景

随着智慧城市、智能安防、工业巡检等领域的快速发展,传统视频监控系统仅能实现“看得见”,却难以“看得懂”。海量监控视频依赖人工值守,存在效率低、漏报率高、响应滞后等问题。

近年来,计算机视觉与大型语言模型(LLM)技术取得突破性进展。YOLO 系列目标检测算法以其高实时性与准确率成为边缘端视觉感知的主流选择;而多模态大模型(如通义千问 Qwen3.5)具备强大的图文理解与推理能力,能够对检测到的物体、场景进行深度语义分析。将两者融合,可以构建“感知‑理解‑决策”一体化的智能视频分析系统,实现从“目标检测”到“行为理解”的跨越。

本程序正是在此背景下开发:利用 YOLOv8 对视频流进行实时目标检测与跟踪,提取物体类别、位置、轨迹等结构化信息,并调用 Qwen3.5 多模态 API 对关键帧进行行为分析,最终在视频画面上叠加检测框、跟踪 ID 及行为分析文本,输出带智能标注的高清视频。系统适用于校园安全、交通路口、工地监管、居家看护等场景,可有效降低人工监控负担,提升异常行为预警能力。

二、方案

2.1 系统架构

本系统采用 异步解耦、分层处理 的设计思想,整体架构分为三层:

  • 感知层:YOLOv8 目标检测与跟踪模块,逐帧处理视频,输出带跟踪 ID 的物体检测结果。

  • 理解层:异步任务队列 + Qwen3.5 API 调用,对关键帧的结构化检测信息进行语义分析,生成场景描述、异常判断、风险等级及处理建议。

  • 应用层:视频渲染与输出模块,将检测框、跟踪 ID 以及分析文本叠加到原始视频上,支持实时预览与保存为 1080P 高清视频。

系统工作流程如下图所示:

2.2 核心功能模块

模块 技术实现 功能描述
目标检测与跟踪 YOLOv8 + ByteTrack(内置) 检测视频中的人、车、物体等,并为每个目标分配唯一 ID,输出边界框、类别、置信度、跟踪 ID。
多模态行为分析 Qwen3.5‑omni‑flash(阿里云百炼 API) 将检测结果与关键帧图像一同发送给大模型,分析场景是否存在异常行为,返回 JSON 格式的分析结论(场景概括、异常标志、风险等级、详细说明、处理建议)。
中文文本渲染 OpenCV + PIL(Pillow) 解决 OpenCV 原生 putText 不支持中文的问题,使用 TrueType 字体(如黑体)在视频左上角绘制清晰的中文分析文本。
视频处理与输出 OpenCV + 高质量缩放 支持输入任意分辨率视频,输出 1080P(1920×1080)高清视频;使用 INTER_LANCZOS4 插值与 20 Mbps 比特率设置,保证画面清晰。
异步调用与频率控制 threading + Queue 为避免 API 调用阻塞视频处理,采用后台线程队列,每隔 FRAME_SKIP 帧(默认 30 帧)触发一次分析,并限制调用间隔(2 秒),平衡成本与实时性。

2.3 关键技术参数

  • 检测模型:YOLOv8n(可替换为 s/m/l/x 以提升精度)

  • 多模态模型:qwen3.5‑omni‑flash(成本低、速度快)

  • 分析频率:每 30 帧调用一次 API(约 1 秒一次,若视频 30fps)

  • 输出分辨率:1920×1080(保持原始宽高比)

  • 字体大小:主文本 32px,辅助文本 24px

  • 编码格式:H.264(MP4),比特率 20 Mbps

2.4 部署与使用

  1. 环境准备

    • Python 3.9+,安装依赖:pip install ultralytics opencv-python openai python-dotenv Pillow

    • 获取阿里云百炼 API Key,并在 .env 文件中配置 DASHSCOPE_API_KEY

  2. 运行方式

    • 修改 INPUT_VIDEO_PATH 为待分析视频路径

    • 执行 python video_analyzer.py

    • 程序自动下载 YOLOv8 模型,逐帧处理,实时显示预览画面(按 q 可提前终止)

    • 处理完成后在指定路径生成 output_analyzed.mp4

  3. 参数调优

    • FRAME_SKIP:增大可降低 API 调用成本,减小可提高分析密度

    • CONF_THRESHOLD:调整检测置信度阈值,过滤低质量框

    • TARGET_WIDTH:可改为 1280 等其他宽度

    • FONT_PATH:根据操作系统修改中文字体路径

2.5 应用示例

  • 校园安全:检测儿童在危险区域(如马路、水池边)玩耍,及时预警。

  • 工地监管:识别工人未戴安全帽、闯入禁区等违规行为。

  • 交通路口:分析行人闯红灯、车辆违规变道等事件。

  • 居家养老:监测老人摔倒、长时间静止等异常状况。

三、环境安装

需要安装以下 Python 依赖库。请在终端或命令提示符中运行以下命令进行安装:

pip install ultralytics opencv-python openai python-dotenv Pillow

依赖说明

用途
ultralytics 加载并运行 YOLOv8 模型,支持目标检测与跟踪
opencv-python 视频读取、图像处理、绘制检测框、保存输出视频
openai 调用阿里云百炼平台的 Qwen3.5 API(兼容 OpenAI 接口)
python-dotenv 从 .env 文件中读取 DASHSCOPE_API_KEY 环境变量
Pillow 在视频帧上绘制中文字符(OpenCV 原生不支持中文)

四、测试代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import cv2
import json
import base64
import re
import threading
from queue import Queue
from dotenv import load_dotenv
from openai import OpenAI
from ultralytics import YOLO
import numpy as np
from PIL import Image, ImageDraw, ImageFont

load_dotenv()

QWEN_API_KEY = os.getenv("DASHSCOPE_API_KEY", "your-api-key-here")
QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL_NAME = "qwen3.5-omni-flash"

INPUT_VIDEO_PATH = "input_video.mp4"
OUTPUT_VIDEO_PATH = "output_analyzed.mp4"
YOLO_MODEL_PATH = "yolov8n.pt"
FRAME_SKIP = 30
CONF_THRESHOLD = 0.5
SHOW_PREVIEW = True
SAVE_RESULT = True

# 1080P 宽度 = 1920
TARGET_WIDTH = 1920
FONT_SIZE = 32          # 主文本字体大小
FONT_SMALL_SIZE = 24    # 辅助文本字体大小

# 中文字体路径(Windows 黑体,其他系统请自行修改)
FONT_PATH = "C:/Windows/Fonts/simhei.ttf"

client = OpenAI(api_key=QWEN_API_KEY, base_url=QWEN_BASE_URL)

def load_yolo_model(model_path):
    try:
        print(f"Loading YOLOv8 model: {model_path}")
        model = YOLO(model_path)
        return model
    except Exception as e:
        print(f"模型加载失败: {e}")
        if os.path.exists(model_path):
            os.remove(model_path)
        model = YOLO(model_path)
        return model

model = load_yolo_model(YOLO_MODEL_PATH)

def frame_to_base64(frame):
    _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
    b64_str = base64.b64encode(buffer).decode('utf-8')
    return f"data:image/jpeg;base64,{b64_str}"

def extract_json_from_text(text):
    pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1)
    start = text.find('{')
    end = text.rfind('}')
    if start != -1 and end != -1 and end > start:
        return text[start:end+1]
    return text

def analyze_behavior_with_qwen(detections, frame_image_path=None):
    detections_summary = []
    for d in detections:
        detections_summary.append({
            "track_id": d['track_id'],
            "class": d['class'],
            "confidence": d['confidence'],
            "bbox": d['bbox']
        })
    
    prompt = f"""
你是一个专业的视频监控行为分析专家。请基于以下YOLOv8模型检测到的物体信息,分析当前场景是否存在异常行为。
检测到的物体列表(JSON格式):{json.dumps(detections_summary, ensure_ascii=False, indent=2)}
请按以下JSON格式输出你的分析结果,不要包含其他任何解释或标记:
{{
    "scene_summary": "一句话概括当前场景",
    "abnormal_behavior": true/false,
    "risk_level": "低/中/高",
    "analysis_details": "详细的行为分析说明",
    "suggestion": "如果有异常,请给出处理建议"
}}
"""
    print("\n" + "="*50)
    print(f"[API调用] 发送分析请求,检测到 {len(detections)} 个物体")
    print("="*50)

    user_content = [{"type": "text", "text": prompt}]
    if frame_image_path:
        user_content.insert(0, {"type": "image_url", "image_url": {"url": frame_image_path}})
    
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": user_content}],
            temperature=0.3,
            max_tokens=500,
        )
        raw_text = response.choices[0].message.content
        print(f"[API返回原始内容] {raw_text}")
        
        json_str = extract_json_from_text(raw_text)
        try:
            result_json = json.loads(json_str)
            print(f"[解析结果] 异常行为: {result_json.get('abnormal_behavior')}, 风险等级: {result_json.get('risk_level')}")
            print(f"           场景概括: {result_json.get('scene_summary')}")
            return result_json
        except json.JSONDecodeError as e:
            print(f"[JSON解析失败] {e}")
            return {"scene_summary": raw_text[:100], "abnormal_behavior": False, "risk_level": "未知"}
    except Exception as e:
        print(f"[API调用失败] {e}")
        return None

def draw_chinese_text_opencv(img, text, position, font_path, font_size, color_bgr):
    """使用 PIL 在 OpenCV 图像上绘制中文文本"""
    try:
        img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img_pil)
        font = ImageFont.truetype(font_path, font_size)
        color_rgb = (color_bgr[2], color_bgr[1], color_bgr[0])
        draw.text(position, text, font=font, fill=color_rgb)
        img[:] = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
    except Exception as e:
        print(f"[中文绘制错误] {e}")
    return img

def draw_analysis_on_frame(frame, detections, latest_analysis):
    # 绘制 YOLO 检测框(英文用 OpenCV 原生方法)
    for det in detections:
        bbox = det['bbox']
        x1, y1, x2, y2 = map(int, bbox)
        label = f"{det['class']} {det['confidence']:.2f} ID:{det['track_id']}"
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    
    # 绘制中文分析结果
    if latest_analysis and isinstance(latest_analysis, dict):
        summary = latest_analysis.get('scene_summary', '无描述')
        abnormal = latest_analysis.get('abnormal_behavior')
        risk = latest_analysis.get('risk_level', '未知')
        status_text = f"异常: {'是' if abnormal else '否'} | 风险: {risk}"
        suggestion = latest_analysis.get('suggestion', '')
        short_sug = suggestion[:60] + ('...' if len(suggestion) > 60 else '') if suggestion else ''
        
        # 绘制三行文本,调整位置以适应 1080P
        frame = draw_chinese_text_opencv(frame, summary, (20, 50), FONT_PATH, FONT_SIZE, (0, 0, 255))
        frame = draw_chinese_text_opencv(frame, status_text, (20, 100), FONT_PATH, FONT_SMALL_SIZE, (0, 0, 255))
        if short_sug:
            frame = draw_chinese_text_opencv(frame, short_sug, (20, 140), FONT_PATH, FONT_SMALL_SIZE, (0, 0, 255))
    return frame

def resize_frame_to_width(frame, target_width):
    """高质量缩放,使用 LANCZOS 插值"""
    h, w = frame.shape[:2]
    if w == target_width:
        return frame
    ratio = target_width / w
    new_h = int(h * ratio)
    # 使用 INTER_LANCZOS4 获得更清晰的缩放结果
    return cv2.resize(frame, (target_width, new_h), interpolation=cv2.INTER_LANCZOS4)

def main():
    if not os.path.exists(INPUT_VIDEO_PATH):
        print(f"错误:视频文件不存在 - {INPUT_VIDEO_PATH}")
        return

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    original_fps = int(cap.get(cv2.CAP_PROP_FPS))
    original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"原始视频: {original_width}x{original_height}, {original_fps} fps")
    
    output_width = TARGET_WIDTH
    # 读取一帧确定缩放后的高度
    ret, sample_frame = cap.read()
    if not ret:
        print("无法读取视频帧")
        return
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    sample_resized = resize_frame_to_width(sample_frame, output_width)
    output_height = sample_resized.shape[0]
    print(f"输出视频: {output_width}x{output_height}, {original_fps} fps")

    out = None
    if SAVE_RESULT:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, original_fps, (output_width, output_height))
        # 尝试设置更高的比特率以减少模糊(部分后端支持)
        if out.isOpened():
            try:
                out.set(cv2.CAP_PROP_BITRATE, 20000000)  # 20 Mbps
                print("已设置输出比特率为 20 Mbps")
            except:
                print("当前后端不支持设置比特率,使用默认编码质量")

    analysis_queue = Queue()
    latest_analysis = None
    analysis_lock = threading.Lock()

    def worker():
        nonlocal latest_analysis
        while True:
            item = analysis_queue.get()
            if item is None:
                break
            frame_id, frame_b64, detections = item
            result = analyze_behavior_with_qwen(detections, frame_b64)
            if result:
                with analysis_lock:
                    latest_analysis = result
            analysis_queue.task_done()

    worker_thread = threading.Thread(target=worker, daemon=True)
    worker_thread.start()

    frame_idx = 0
    last_analysis_frame = -FRAME_SKIP

    for result in model.track(source=INPUT_VIDEO_PATH, stream=True, persist=True, conf=CONF_THRESHOLD):
        frame_idx += 1
        annotated_frame = result.plot()
        
        detections = []
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()
            track_ids = result.boxes.id.cpu().numpy() if result.boxes.id is not None else [-1]*len(boxes)
            classes = result.boxes.cls.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            for box, tid, cls, conf in zip(boxes, track_ids, classes, confs):
                detections.append({
                    'track_id': int(tid),
                    'class': model.names[int(cls)],
                    'confidence': float(conf),
                    'bbox': box.tolist()
                })

        if (frame_idx - last_analysis_frame) >= FRAME_SKIP:
            last_analysis_frame = frame_idx
            frame_b64 = frame_to_base64(annotated_frame)
            analysis_queue.put((frame_idx, frame_b64, detections))

        with analysis_lock:
            current_analysis = latest_analysis

        output_frame = draw_analysis_on_frame(annotated_frame, detections, current_analysis)
        output_frame = resize_frame_to_width(output_frame, TARGET_WIDTH)

        if SHOW_PREVIEW:
            cv2.imshow("YOLOv8 + Qwen3.5 Analysis", output_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        if out:
            out.write(output_frame)

        if frame_idx % 100 == 0:
            print(f"已处理 {frame_idx} 帧")

    analysis_queue.put(None)
    analysis_queue.join()
    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()
    print(f"处理完成!输出视频: {OUTPUT_VIDEO_PATH} (1080P)")

if __name__ == "__main__":
    main()

输出结果

注意QWen3.5模型免费用量是1,000,000,不要超过哈,超过是要费用的。

后面想在RK3568上处理。

五、新增加功能

1. 电子围栏

  • 配置文件 zones.json 定义禁区多边形(支持多个区域)。

  • 坐标基于原始视频分辨率,程序自动根据输出尺寸缩放。

  • 每帧检测每个目标的底部中心点是否落入禁区,若入侵则:

    • 在画面中绘制红色半透明填充区域和入侵标记。

    • 触发独立报警(不依赖Qwen周期)。

    • 防抖机制:同一目标同一区域在 1 秒内只报警一次。

2. 报警日志

  • 报警源:

    • Qwen分析返回 abnormal_behavior = true 且 risk_level 为“中”或“高”时。

    • 电子围栏入侵检测时。

  • 每次报警保存截图到 alerts/ 目录(文件名:alert_时间戳_帧号_类型.jpg)。

  • 追加写入 alerts/alerts.log(JSON Lines 格式),包含时间、帧号、类型、风险等级、详情、截图路径。

  • 终端打印红色报警信息。

3. 统计报告

  • 程序结束时生成 report.json 和 report.csv 保存在当前目录。

  • 报告内容:

    • 视频基本信息(总帧数、时长、分辨率、处理帧率)。

    • 物体检测统计(每类物体出现次数、总目标数)。

    • Qwen 异常统计(总分析次数、异常次数、风险等级分布)。

    • 电子围栏入侵事件统计(总入侵次数、各区域入侵次数)。

    • 生成时间戳。

4、新增文件zones.json

#zones.json 示例(定义两个禁区)
json
{
    "zones": [
        {
            "name": "变电站大门",
            "points": [[500, 200], [700, 200], [700, 400], [500, 400]],
            "risk_level": "高"
        },
        {
            "name": "铁路轨道",
            "points": [[100, 800], [1200, 800], [1300, 900], [0, 900]],
            "risk_level": "中"
        }
    ]
}

5、代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import cv2
import json
import base64
import re
import threading
import time
import csv
from datetime import datetime
from queue import Queue
from dotenv import load_dotenv
from openai import OpenAI
from ultralytics import YOLO
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from collections import defaultdict

load_dotenv()

# ==================== 配置参数 ====================
QWEN_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL_NAME = "qwen3.5-omni-flash"

INPUT_VIDEO_PATH = "input_video.mp4"
OUTPUT_VIDEO_PATH = "output_analyzed.mp4"
YOLO_MODEL_PATH = "yolov8n.pt"
FRAME_SKIP = 30          # 每隔多少帧调用一次Qwen
CONF_THRESHOLD = 0.5
SHOW_PREVIEW = True
SAVE_RESULT = True

TARGET_WIDTH = 1920
FONT_SIZE = 32
FONT_SMALL_SIZE = 28
FONT_PATH = "simhei.ttf"      # 确保此文件存在,或改为其他可用中文字体

# 报警与统计相关配置
ALERTS_DIR = "alerts"          # 报警截图保存目录
ZONES_CONFIG = "zones.json"    # 电子围栏配置文件
COOLDOWN_SEC = 1.0             # 同一目标同一区域报警冷却时间(秒)

# ==================== 初始化客户端和模型 ====================
client = OpenAI(api_key=QWEN_API_KEY, base_url=QWEN_BASE_URL)

def load_yolo_model(model_path):
    try:
        print(f"Loading YOLOv8 model: {model_path}")
        model = YOLO(model_path)
        return model
    except Exception as e:
        print(f"模型加载失败: {e}")
        if os.path.exists(model_path):
            os.remove(model_path)
        model = YOLO(model_path)
        return model

model = load_yolo_model(YOLO_MODEL_PATH)

# ==================== 电子围栏相关 ====================
class Zone:
    def __init__(self, name, points, risk_level, original_size, target_size):
        """
        points: 绝对坐标列表 [(x1,y1),...],基于原始视频分辨率
        original_size: (width, height)
        target_size: (width, height)
        """
        self.name = name
        self.risk_level = risk_level
        # 将坐标缩放到目标尺寸
        scale_x = target_size[0] / original_size[0]
        scale_y = target_size[1] / original_size[1]
        self.scaled_points = [(int(x * scale_x), int(y * scale_y)) for (x, y) in points]
        self.polygon = np.array(self.scaled_points, dtype=np.int32)

    def contains_point(self, point):
        """point: (x,y) 缩放后的坐标"""
        return cv2.pointPolygonTest(self.polygon, point, False) >= 0

def load_zones(original_size, target_size):
    if not os.path.exists(ZONES_CONFIG):
        print("未找到电子围栏配置文件,跳过禁区检测。")
        return []
    with open(ZONES_CONFIG, 'r', encoding='utf-8') as f:
        data = json.load(f)
    zones = []
    for zone_data in data.get("zones", []):
        name = zone_data["name"]
        points = zone_data["points"]
        risk = zone_data.get("risk_level", "中")
        zone = Zone(name, points, risk, original_size, target_size)
        zones.append(zone)
    print(f"加载 {len(zones)} 个电子围栏区域")
    return zones

def draw_zones(frame, zones):
    """在帧上绘制半透明填充的禁区"""
    overlay = frame.copy()
    for zone in zones:
        cv2.fillPoly(overlay, [zone.polygon], (0, 0, 255))  # 红色填充
    # 透明度混合
    cv2.addWeighted(overlay, 0.3, frame, 0.7, 0, frame)
    for zone in zones:
        cv2.polylines(frame, [zone.polygon], True, (0, 0, 255), 2)
        # 在区域左上角标注名称
        x, y = zone.scaled_points[0]
        cv2.putText(frame, zone.name, (x, y-5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)
    return frame

# ==================== 报警与统计模块 ====================
class AlertLogger:
    def __init__(self, alerts_dir="alerts"):
        self.alerts_dir = alerts_dir
        self.log_file = os.path.join(alerts_dir, "alerts.log")
        self.last_alert_time = {}  # 防抖: (track_id, zone_name) -> last_alert_timestamp
        if not os.path.exists(alerts_dir):
            os.makedirs(alerts_dir)
        # 清空旧日志(可选)
        if os.path.exists(self.log_file):
            open(self.log_file, 'w').close()

    def save_screenshot(self, frame, frame_idx, alert_type, zone_name=""):
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"alert_{timestamp}_frame{frame_idx}_{alert_type}"
        if zone_name:
            filename += f"_{zone_name}"
        filename += ".jpg"
        filepath = os.path.join(self.alerts_dir, filename)
        cv2.imwrite(filepath, frame)
        return filepath

    def log_alert(self, frame, frame_idx, alert_type, risk_level, details, zone_name=""):
        # 防抖检查:由调用者负责
        snapshot_path = self.save_screenshot(frame, frame_idx, alert_type, zone_name)
        alert_record = {
            "timestamp": datetime.now().isoformat(),
            "frame_idx": frame_idx,
            "type": alert_type,
            "risk_level": risk_level,
            "zone": zone_name if zone_name else None,
            "details": details,
            "snapshot": snapshot_path
        }
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(alert_record, ensure_ascii=False) + "\n")
        print(f"\033[91m[ALERT] {alert_type} | 风险:{risk_level} | 帧:{frame_idx} | {details}\033[0m")
        return snapshot_path

class StatisticsCollector:
    def __init__(self):
        self.total_frames = 0
        self.processed_frames = 0
        self.video_fps = 0
        self.video_duration = 0
        self.object_counts = defaultdict(int)    # 类别 -> 计数
        self.qwen_calls = 0
        self.qwen_abnormal_count = 0
        self.qwen_risk_dist = {"低":0, "中":0, "高":0}
        self.intrusion_events = 0
        self.intrusion_by_zone = defaultdict(int)

    def update_object_count(self, class_name):
        self.object_counts[class_name] += 1

    def update_qwen_result(self, result):
        self.qwen_calls += 1
        if result.get("abnormal_behavior"):
            self.qwen_abnormal_count += 1
        risk = result.get("risk_level", "未知")
        if risk in self.qwen_risk_dist:
            self.qwen_risk_dist[risk] += 1

    def update_intrusion(self, zone_name):
        self.intrusion_events += 1
        self.intrusion_by_zone[zone_name] += 1

    def generate_report(self, video_path):
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0
        cap.release()
        self.total_frames = total_frames
        self.video_fps = fps
        self.video_duration = duration

        report = {
            "generated_at": datetime.now().isoformat(),
            "video": {
                "path": video_path,
                "total_frames": self.total_frames,
                "duration_sec": round(self.video_duration, 2),
                "fps": self.video_fps
            },
            "detection": {
                "total_objects": sum(self.object_counts.values()),
                "class_counts": dict(self.object_counts)
            },
            "qwen_analysis": {
                "calls": self.qwen_calls,
                "abnormal_behaviors": self.qwen_abnormal_count,
                "risk_distribution": self.qwen_risk_dist
            },
            "intrusion_alerts": {
                "total_intrusions": self.intrusion_events,
                "by_zone": dict(self.intrusion_by_zone)
            }
        }
        # 保存 JSON
        with open("report.json", "w", encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        # 保存 CSV
        with open("report.csv", "w", newline='', encoding='utf-8-sig') as f:
            writer = csv.writer(f)
            writer.writerow(["指标", "数值"])
            writer.writerow(["总帧数", self.total_frames])
            writer.writerow(["视频时长(秒)", round(self.video_duration,2)])
            writer.writerow(["总检测物体数", sum(self.object_counts.values())])
            writer.writerow(["Qwen调用次数", self.qwen_calls])
            writer.writerow(["异常行为次数", self.qwen_abnormal_count])
            writer.writerow(["电子围栏入侵次数", self.intrusion_events])
            writer.writerow(["高风险报警次数", self.qwen_risk_dist["高"] + self.intrusion_events])
        print(f"\n统计报告已生成: report.json, report.csv")

# ==================== 辅助函数 ====================
def frame_to_base64(frame):
    _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
    b64_str = base64.b64encode(buffer).decode('utf-8')
    return f"data:image/jpeg;base64,{b64_str}"

def extract_json_from_text(text):
    pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
    match = re.search(pattern, text, re.DOTALL)
    if match:
        return match.group(1)
    start = text.find('{')
    end = text.rfind('}')
    if start != -1 and end != -1 and end > start:
        return text[start:end+1]
    return text

def analyze_behavior_with_qwen(detections, frame_image_path=None):
    detections_summary = []
    for d in detections:
        detections_summary.append({
            "track_id": d['track_id'],
            "class": d['class'],
            "confidence": d['confidence'],
            "bbox": d['bbox']
        })
    prompt = f"""
你是一个专业的视频监控行为分析专家。请基于以下YOLOv8模型检测到的物体信息,分析当前场景是否存在异常行为。
检测到的物体列表(JSON格式):{json.dumps(detections_summary, ensure_ascii=False, indent=2)}
请按以下JSON格式输出你的分析结果,不要包含其他任何解释或标记:
{{
    "scene_summary": "一句话概括当前场景",
    "abnormal_behavior": true/false,
    "risk_level": "低/中/高",
    "analysis_details": "详细的行为分析说明",
    "suggestion": "如果有异常,请给出处理建议"
}}
"""
    user_content = [{"type": "text", "text": prompt}]
    if frame_image_path:
        user_content.insert(0, {"type": "image_url", "image_url": {"url": frame_image_path}})
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": user_content}],
            temperature=0.3,
            max_tokens=500,
        )
        raw_text = response.choices[0].message.content
        json_str = extract_json_from_text(raw_text)
        result_json = json.loads(json_str)
        return result_json
    except Exception as e:
        print(f"[API调用失败] {e}")
        return None

def draw_chinese_text_opencv(img, text, position, font_path, font_size, color_bgr):
    try:
        img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        draw = ImageDraw.Draw(img_pil)
        font = ImageFont.truetype(font_path, font_size)
        color_rgb = (color_bgr[2], color_bgr[1], color_bgr[0])
        draw.text(position, text, font=font, fill=color_rgb)
        img[:] = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
    except Exception as e:
        print(f"[中文绘制错误] {e}")
    return img

def resize_frame_to_width(frame, target_width):
    h, w = frame.shape[:2]
    if w == target_width:
        return frame
    ratio = target_width / w
    new_h = int(h * ratio)
    return cv2.resize(frame, (target_width, new_h), interpolation=cv2.INTER_LANCZOS4)

def draw_analysis_on_frame(frame, detections, latest_analysis, alert_flag=False):
    # 绘制检测框
    for det in detections:
        bbox = det['bbox']
        x1, y1, x2, y2 = map(int, bbox)
        label = f"{det['class']} {det['confidence']:.2f} ID:{det['track_id']}"
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    # 绘制Qwen分析结果
    if latest_analysis and isinstance(latest_analysis, dict):
        summary = latest_analysis.get('scene_summary', '无描述')
        abnormal = latest_analysis.get('abnormal_behavior')
        risk = latest_analysis.get('risk_level', '未知')
        status_text = f"异常: {'是' if abnormal else '否'} | 风险: {risk}"
        suggestion = latest_analysis.get('suggestion', '')
        short_sug = suggestion[:60] + ('...' if len(suggestion) > 60 else '') if suggestion else ''
        frame = draw_chinese_text_opencv(frame, summary, (20, 50), FONT_PATH, FONT_SIZE, (0, 0, 255))
        frame = draw_chinese_text_opencv(frame, status_text, (20, 100), FONT_PATH, FONT_SMALL_SIZE, (0, 0, 255))
        if short_sug:
            frame = draw_chinese_text_opencv(frame, short_sug, (20, 140), FONT_PATH, FONT_SMALL_SIZE, (0, 0, 255))
    # 如果有电子围栏入侵等实时报警,额外绘制红色警示
    if alert_flag:
        cv2.putText(frame, "!! INTRUSION ALERT !!", (frame.shape[1]//2-150, 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
    return frame

# ==================== 主函数 ====================
def main():
    if not os.path.exists(INPUT_VIDEO_PATH):
        print(f"错误:视频文件不存在 - {INPUT_VIDEO_PATH}")
        return

    cap = cv2.VideoCapture(INPUT_VIDEO_PATH)
    original_fps = int(cap.get(cv2.CAP_PROP_FPS))
    original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    print(f"原始视频: {original_width}x{original_height}, {original_fps} fps")

    output_width = TARGET_WIDTH
    ret, sample_frame = cap.read()
    if not ret:
        print("无法读取视频帧")
        return
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    sample_resized = resize_frame_to_width(sample_frame, output_width)
    output_height = sample_resized.shape[0]
    print(f"输出视频: {output_width}x{output_height}, {original_fps} fps")

    # 加载电子围栏(基于原始尺寸和输出尺寸)
    zones = load_zones((original_width, original_height), (output_width, output_height))

    # 初始化报警器和统计器
    alert_logger = AlertLogger(ALERTS_DIR)
    stats = StatisticsCollector()

    # 视频写入器
    out = None
    if SAVE_RESULT:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, original_fps, (output_width, output_height))

    # Qwen 分析线程
    analysis_queue = Queue()
    latest_analysis = None
    analysis_lock = threading.Lock()

    def worker():
        nonlocal latest_analysis
        while True:
            item = analysis_queue.get()
            if item is None:
                break
            frame_id, frame_b64, detections = item
            result = analyze_behavior_with_qwen(detections, frame_b64)
            if result:
                with analysis_lock:
                    latest_analysis = result
                # 更新统计数据
                stats.update_qwen_result(result)
                # 若异常且风险为中/高,单独报警(不依赖围栏)
                if result.get("abnormal_behavior") and result.get("risk_level") in ["中", "高"]:
                    # 注意:此处截图使用的是原始 frame_b64 对应的帧,但我们需要实际的frame对象,
                    # 为了方便,我们可以传递帧对象,但 worker 线程没有当前帧。解决方案:在放入队列时同时传递帧。
                    # 为简化,我们可以将报警触发移到主线程中(利用 latest_analysis 的更新来触发)。
                    # 这里先不实现,后面在主线程中检测 latest_analysis 变化并报警。
                    pass
            analysis_queue.task_done()

    worker_thread = threading.Thread(target=worker, daemon=True)
    worker_thread.start()

    frame_idx = 0
    last_analysis_frame = -FRAME_SKIP
    last_alert_time = {}   # 防抖字典 (track_id, zone_name) -> last_alert_timestamp

    for result in model.track(source=INPUT_VIDEO_PATH, stream=True, persist=True, conf=CONF_THRESHOLD):
        frame_idx += 1
        annotated_frame = result.plot()   # YOLO 标注后的帧(原始尺寸)
        # 检测结果提取
        detections = []
        if result.boxes is not None:
            boxes = result.boxes.xyxy.cpu().numpy()
            track_ids = result.boxes.id.cpu().numpy() if result.boxes.id is not None else [-1]*len(boxes)
            classes = result.boxes.cls.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            for box, tid, cls, conf in zip(boxes, track_ids, classes, confs):
                class_name = model.names[int(cls)]
                detections.append({
                    'track_id': int(tid),
                    'class': class_name,
                    'confidence': float(conf),
                    'bbox': box.tolist()
                })
                stats.update_object_count(class_name)
        # 缩放当前帧到输出尺寸(用于后续绘制和报警)
        frame_resized = resize_frame_to_width(annotated_frame, output_width)
        # 绘制电子围栏
        if zones:
            frame_resized = draw_zones(frame_resized, zones)
        # 电子围栏入侵检测(基于缩放后的帧坐标)
        intrusion_alert_flag = False
        current_time = time.time()
        for det in detections:
            bbox = det['bbox']
            # 底部中心点(目标接触地面的点)
            center_x = (bbox[0] + bbox[2]) / 2
            bottom_y = bbox[3]   # y2
            # 缩放坐标
            scale_x = output_width / original_width
            scale_y = output_height / original_height
            pt_scaled = (int(center_x * scale_x), int(bottom_y * scale_y))
            for zone in zones:
                if zone.contains_point(pt_scaled):
                    key = (det['track_id'], zone.name)
                    last_alert = last_alert_time.get(key, 0)
                    if current_time - last_alert >= COOLDOWN_SEC:
                        last_alert_time[key] = current_time
                        # 报警
                        details = f"目标 {det['class']} ID:{det['track_id']} 进入禁区 {zone.name}"
                        alert_logger.log_alert(frame_resized, frame_idx, "intrusion",
                                               zone.risk_level, details, zone.name)
                        stats.update_intrusion(zone.name)
                        intrusion_alert_flag = True
                    break  # 一个目标只触发第一个入侵区域
        # 周期性调用 Qwen
        if (frame_idx - last_analysis_frame) >= FRAME_SKIP:
            last_analysis_frame = frame_idx
            frame_b64 = frame_to_base64(frame_resized)
            analysis_queue.put((frame_idx, frame_b64, detections))
        # 检查最新的 Qwen 分析结果是否触发报警
        with analysis_lock:
            current_analysis = latest_analysis
        qwen_alert_flag = False
        if current_analysis and current_analysis.get("abnormal_behavior") and current_analysis.get("risk_level") in ["中", "高"]:
            # 防止重复报警:添加一个标志记录本次分析是否已报警
            if not hasattr(main, "_last_qwen_alarm_frame") or frame_idx - main._last_qwen_alarm_frame > FRAME_SKIP:
                main._last_qwen_alarm_frame = frame_idx
                details = f"Qwen异常分析: {current_analysis.get('analysis_details', '')}"
                alert_logger.log_alert(frame_resized, frame_idx, "qwen_abnormal",
                                       current_analysis.get("risk_level"), details)
                qwen_alert_flag = True
        # 绘制文本和报警标记
        output_frame = draw_analysis_on_frame(frame_resized, detections, current_analysis,
                                              alert_flag=intrusion_alert_flag or qwen_alert_flag)
        # 显示/保存
        if SHOW_PREVIEW:
            cv2.imshow("YOLOv8 + Qwen3.5 Analysis", output_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        if out:
            out.write(output_frame)
        if frame_idx % 100 == 0:
            print(f"已处理 {frame_idx} 帧")

    # 清理资源
    analysis_queue.put(None)
    analysis_queue.join()
    cap.release()
    if out:
        out.release()
    cv2.destroyAllWindows()

    # 生成统计报告
    stats.generate_report(INPUT_VIDEO_PATH)
    print(f"处理完成!输出视频: {OUTPUT_VIDEO_PATH} (1080P)")
    print(f"报警日志保存在: {ALERTS_DIR}/alerts.log")
    print(f"报警截图保存在: {ALERTS_DIR}/")

if __name__ == "__main__":
    main()

如有侵权,或需要完整代码,请及时联系博主。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐