这是一个使用YOLO识别模型的代码,k230本地算力有限,使用wifi传递到电脑上进行识别再传递给K230实现视觉识别。

前面踩坑部分需要看的https://blog.csdn.net/weixin_55008315/article/details/161088340

1.K230代码(填入你自己的wifi账号密码2.4g的)

可以直接覆盖原版main

import time, os, sys, network, socket, struct, gc
from ybUtils.YbKey import YbKey 
from media.sensor import *
from media.display import *
from media.media import *
from ybUtils.YbBuzzer import YbBuzzer      
from ybUtils.YbRGB import YbRGB            

# ================= 核心配置区 =================
WIFI_SSID = "XXXXXXX"         # ⚠️ 换回你真实的 Wi-Fi 名字
WIFI_PASSWORD = "123456789"     # ⚠️ 换回你真实的 Wi-Fi 密码
PC_IP = "192.168.1.6"          # ⚠️ 确保你电脑当前的 IP 没变
PC_PORT = 8080                 
# ==========================================

COLOR_RED    = (255, 0, 0)
COLOR_GREEN  = (0, 255, 0)
COLOR_YELLOW = (255, 255, 0)
COLOR_OFF    = (0, 0, 0)

# ---- 网络通信辅助工具函数 ----
def send_all(sock, data):
    total_sent = 0
    chunk_size = 4096  
    view = memoryview(data) 
    while total_sent < len(data):
        try:
            sent = sock.send(view[total_sent : total_sent + chunk_size])
            if sent == 0: raise RuntimeError("Socket 写入断开")
            total_sent += sent
        except OSError as e:
            if e.args[0] == 11: 
                time.sleep(0.005)
                continue
            else: raise e

def recv_all(sock, count):
    buf = b''
    while count:
        try:
            newbuf = sock.recv(count)
            if not newbuf: return None
            buf += newbuf
            count -= len(newbuf)
        except OSError as e:
            if e.args[0] == 11:
                time.sleep(0.005)
                continue
            else: raise e
    return buf


# ========================================================
# 📦 核心业务函数:将所有循环与功能完全封装在内部
# ========================================================
def main():
    buzzer = YbBuzzer()
    rgb = YbRGB()
    sock = None
    sensor = None

    try:
        # 1. 硬件屏幕与摄像头初始化
        sensor = Sensor(id=2) 
        sensor.reset()
        sensor.set_framesize(width=640, height=480, chn=CAM_CHN_ID_0)
        sensor.set_pixformat(Sensor.RGB565, chn=CAM_CHN_ID_0)
        Display.init(Display.ST7701, width=640, height=480, to_ide=True)
        MediaManager.init()
        sensor.run()
        print("✅ 硬件初始化完成,可视监控主循环启动!")

        # 2. 初始化网络硬件
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)

        is_connected = False
        last_alarm_state = 0
        last_reconnect_time = time.ticks_ms()

        # 3. 核心功能死循环
        while True:
            os.exitpoint()
            img = sensor.snapshot(chn=CAM_CHN_ID_0)

            # --------------------------------------------------
            # 状态 A:Wi-Fi 未连接路由器
            # --------------------------------------------------
            if not wlan.isconnected():
                is_connected = False
                if sock:
                    try: sock.close()
                    except: pass
                    sock = None
                
                # 屏幕绘图提示
                text_x, text_y = 400, 10
                img.draw_rectangle(text_x - 10, text_y, 230, 40, color=(150, 0, 0, 0), fill=True)
                img.draw_string_advanced(text_x, text_y + 5, 20, "无线断开,正在连接...", color=(255, 100, 100))
                
                try: wlan.connect(WIFI_SSID, WIFI_PASSWORD)
                except: pass
                
                Display.show_image(img, x=0, y=0)
                time.sleep(0.5) 
                gc.collect()
                continue

            # --------------------------------------------------
            # 状态 B:Wi-Fi 已连,等待呼叫 PC 大脑
            # --------------------------------------------------
            if not is_connected:
                text_x, text_y = 420, 10
                img.draw_rectangle(text_x - 10, text_y, 210, 40, color=(150, 0, 0, 0), fill=True)
                img.draw_string_advanced(text_x, text_y + 5, 20, "已连无线,等待PC...", color=(255, 255, 100))
                
                if time.ticks_diff(time.ticks_ms(), last_reconnect_time) > 3000:
                    last_reconnect_time = time.ticks_ms()
                    try:
                        if sock: sock.close()
                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                        sock.settimeout(1.5)
                        sock.connect((PC_IP, PC_PORT))
                        sock.setblocking(True) 
                        is_connected = True
                        print("✅ 双向链路构建成功!")
                    except OSError:
                        if sock: sock.close()
                        sock = None
                
                Display.show_image(img, x=0, y=0)
                gc.collect()
                continue

            # --------------------------------------------------
            # 状态 C:全量流媒体无线图传中
            # --------------------------------------------------
            try:
                img_jpg = img.compress(quality=40)
                img_bytes = img_jpg.bytearray()
                send_all(sock, struct.pack('<I', len(img_bytes)))
                send_all(sock, img_bytes)
                
                reply = recv_all(sock, 4)
                if not reply: raise RuntimeError("PC 端断开链接")
                jellyfish_count = struct.unpack('<I', reply)[0]

                # 蜂鸣器与发光二极管联调
                if jellyfish_count == 0: current_state = 0
                elif jellyfish_count <= 5: current_state = 1
                else: current_state = 2

                if current_state != last_alarm_state:
                    if current_state == 1:
                        buzzer.beep()            
                        rgb.show_rgb(COLOR_RED)  
                    elif current_state == 2:
                        buzzer.off()             
                        rgb.show_rgb(COLOR_YELLOW) 
                    else:
                        buzzer.off()
                        rgb.show_rgb(COLOR_GREEN)  
                    last_alarm_state = current_state
                    
                # 动态刷新板载屏幕数据
                text_x, text_y = 440, 10
                img.draw_rectangle(text_x - 10, text_y, 190, 40, color=(150, 0, 0, 0), fill=True)
                img.draw_string_advanced(text_x, text_y + 5, 24, f"水母数量: {jellyfish_count}", color=(255, 255, 0))

                Display.show_image(img, x=0, y=0)
                del img_jpg

            except Exception as e:
                print(f"⚠️ 运行时突发网络中断: {e}")
                if sock: sock.close()
                sock = None
                is_connected = False
                last_reconnect_time = time.ticks_ms()
                buzzer.off()
                rgb.show_rgb(COLOR_OFF)
                last_alarm_state = 0
                
            gc.collect()

    except KeyboardInterrupt:
        print("\n用户手动安全终止进程")
    except Exception as e:
        print(f"\n❌ main 函数内部发生致命崩溃: {e}")
    finally:
        # 即使发生意外,函数退出时也会自我洗刷干净,绝不残留硬件资源
        rgb.show_rgb(COLOR_OFF)
        buzzer.off()
        if sock: 
            try: sock.close()
            except: pass
        if sensor and isinstance(sensor, Sensor): 
            sensor.stop()
        Display.deinit()
        MediaManager.deinit()
        print("--- main 函数生命周期结束,硬件资源安全闭环 ---")


# ========================================================
# 🛡️ 亚博官方开机安全保护与条件触发器
# ========================================================
print("系统通电,给板载硬件 1 秒钟的电压稳定防抖时间...")
time.sleep(1) 

key = YbKey()
abort_main = 0

if key.is_pressed():
    time.sleep_ms(10)
    if key.is_pressed():
        abort_main = 1
        print("⚠️ 检测到安全按键按下,已拦截 main() 函数运行,已进入全安全模式。")

# 干净、标准、优雅的触发判断
if not abort_main:
    print("🚀 未检测到拦截信号,正在切入 main() 监控中心...")
    main()

电脑端,电脑端调用模型识别

在电脑端,这里面识别效果

2.电脑端代码

import tkinter as tk
from tkinter import messagebox, ttk
from PIL import Image, ImageTk
import cv2
import socket
import threading
import struct
import numpy as np
import time
import os
import sys
import subprocess

try:
    from ultralytics import YOLO

    HAS_YOLO = True
except ImportError:
    HAS_YOLO = False
    print("警告: 未检测到 ultralytics 库,请先安装: pip install ultralytics")


class JellyfishApp:
    def __init__(self, root):
        self.root = root
        self.root.title("水母检测中心 - 智能监控台 (Light)")
        self.root.geometry("1150x700")  # 稍微拉高一点以容纳新按钮

        # --- 清爽主题配色 ---
        self.bg_main = "#F0F4F8"  # 整体浅灰蓝背景
        self.bg_panel = "#FFFFFF"  # 纯白卡片面板
        self.fg_title = "#1E293B"  # 深灰蓝标题文字
        self.fg_text = "#334155"  # 正文文字
        self.color_primary = "#0078D4"  # 微软主题蓝
        self.color_success = "#107C10"  # 绿色 (启动按钮)
        self.color_warning = "#D83B01"  # 橘色 (断开按钮)
        self.color_danger = "#A4262C"  # 红色 (退出按钮)
        self.color_border = "#CBD5E1"  # 边框浅灰

        self.root.configure(bg=self.bg_main)

        # --- 核心变量 ---
        self.tk_img = None
        self.is_running = False
        self.server_socket = None
        self.client_conn = None

        self.bind_ip = tk.StringVar(value="0.0.0.0")
        self.bind_port = tk.IntVar(value=8080)
        self.conf_thresh = tk.DoubleVar(value=0.50)

        self.count_var = tk.StringVar(value="0")
        self.status_var = tk.StringVar(value="就绪,请点击【▶ 启动监听】")

        # 【新增变量】:用于保存截图的最新 BGR 原图/带框图
        self.latest_bgr_frame = None
        # 【新增变量】:保存截图的文件夹名称
        self.screenshot_dir = "screenshots"
        if not os.path.exists(self.screenshot_dir):
            os.makedirs(self.screenshot_dir)

        # --- 加载模型 ---
        if HAS_YOLO:
            try:
                print("正在加载 PyTorch 原生模型 best.pt ...")
                self.model = YOLO("best.pt", task='detect')
                print("模型加载成功!")
            except Exception as e:
                messagebox.showerror("模型错误", f"无法加载 best.pt:\n{e}")
                self.model = None
        else:
            self.model = None

        # ==========================================
        # 整体布局:左侧控制台,右侧显示屏
        # ==========================================
        self.left_panel = tk.Frame(root, bg=self.bg_panel, width=320, highlightthickness=1,
                                   highlightbackground=self.color_border)
        self.left_panel.pack(side="left", fill="y", padx=15, pady=15)
        self.left_panel.pack_propagate(False)  # 固定宽度

        self.right_panel = tk.Frame(root, bg=self.bg_main)
        self.right_panel.pack(side="right", expand=True, fill="both", padx=(0, 15), pady=15)

        self._build_left_panel()
        self._build_right_panel()

    # ==========================================
    # 构建左侧控制面板
    # ==========================================
    def _build_left_panel(self):
        # 标题
        tk.Label(self.left_panel, text="控制面板", font=("微软雅黑", 18, "bold"), fg=self.fg_title,
                 bg=self.bg_panel).pack(pady=(15, 10))

        # ---- 网络设置区 ----
        net_frame = tk.LabelFrame(self.left_panel, text=" 网络通信配置 ", font=("微软雅黑", 10, "bold"),
                                  fg=self.fg_text, bg=self.bg_panel, bd=1, highlightthickness=0)
        net_frame.pack(fill="x", padx=15, pady=5)

        btn_get_ip = tk.Button(net_frame, text="🔍 获取本机 IPv4", command=self.show_local_ip, font=("微软雅黑", 10),
                               bg=self.bg_panel, fg=self.color_primary, relief="groove", cursor="hand2")
        btn_get_ip.pack(fill="x", padx=10, pady=(5, 5))

        # IP 填写
        ip_frame = tk.Frame(net_frame, bg=self.bg_panel)
        ip_frame.pack(fill="x", padx=10, pady=3)
        tk.Label(ip_frame, text="监听 IP:", fg=self.fg_text, bg=self.bg_panel, font=("微软雅黑", 10)).pack(side="left")
        tk.Entry(ip_frame, textvariable=self.bind_ip, width=15, bg="#F8FAFC", fg=self.fg_text, highlightthickness=1,
                 highlightbackground=self.color_border, relief="flat").pack(side="right", ipady=2)

        # 端口 填写
        port_frame = tk.Frame(net_frame, bg=self.bg_panel)
        port_frame.pack(fill="x", padx=10, pady=3)
        tk.Label(port_frame, text="监听端口:", fg=self.fg_text, bg=self.bg_panel, font=("微软雅黑", 10)).pack(
            side="left")
        tk.Entry(port_frame, textvariable=self.bind_port, width=15, bg="#F8FAFC", fg=self.fg_text, highlightthickness=1,
                 highlightbackground=self.color_border, relief="flat").pack(side="right", ipady=2)

        # ---- 操作按钮区 ----
        btn_frame = tk.Frame(self.left_panel, bg=self.bg_panel)
        btn_frame.pack(fill="x", padx=15, pady=5)

        btn_start = tk.Button(btn_frame, text="▶ 启动监听", command=self.start_server, font=("微软雅黑", 11, "bold"),
                              bg=self.color_success, fg="white", relief="flat", cursor="hand2")
        btn_start.pack(fill="x", pady=3)

        btn_stop = tk.Button(btn_frame, text="⏹ 断开连接", command=self.stop_server, font=("微软雅黑", 11),
                             bg=self.color_warning, fg="white", relief="flat", cursor="hand2")
        btn_stop.pack(fill="x", pady=3)

        # ---- 模型参数区 ----
        model_frame = tk.LabelFrame(self.left_panel, text=" AI 模型参数 ", font=("微软雅黑", 10, "bold"),
                                    fg=self.fg_text, bg=self.bg_panel, bd=1)
        model_frame.pack(fill="x", padx=15, pady=5)

        tk.Label(model_frame, text="置信度阈值 (Confidence)", fg=self.fg_text, bg=self.bg_panel,
                 font=("微软雅黑", 10)).pack(anchor="w", padx=10, pady=(5, 0))
        conf_slider = tk.Scale(model_frame, from_=0.1, to=1.0, resolution=0.05, orient="horizontal",
                               variable=self.conf_thresh, bg=self.bg_panel, fg=self.color_primary, highlightthickness=0,
                               troughcolor="#E2E8F0", activebackground=self.color_primary)
        conf_slider.pack(fill="x", padx=10, pady=3)

        # ---- 【核心新增】:图像快照与快览区 ----
        snap_frame = tk.LabelFrame(self.left_panel, text=" 📸 图像捕获快照 ", font=("微软雅黑", 10, "bold"),
                                   fg=self.fg_text, bg=self.bg_panel, bd=1)
        snap_frame.pack(fill="x", padx=15, pady=5)

        btn_snap = tk.Button(snap_frame, text="📸 立即截图保存", command=self.take_screenshot,
                             font=("微软雅黑", 10, "bold"),
                             bg=self.color_primary, fg="white", relief="flat", cursor="hand2")
        btn_snap.pack(fill="x", padx=10, pady=5)

        btn_open_folder = tk.Button(snap_frame, text="📂 打开截图文件夹", command=self.open_screenshot_folder,
                                    font=("微软雅黑", 10),
                                    bg=self.bg_panel, fg=self.fg_text, relief="groove", cursor="hand2")
        btn_open_folder.pack(fill="x", padx=10, pady=(0, 5))

        # ---- 底部退出 ----
        tk.Button(self.left_panel, text="❌ 彻底退出系统", command=self.on_close, font=("微软雅黑", 11),
                  bg=self.color_danger, fg="white", relief="flat", cursor="hand2").pack(side="bottom", fill="x",
                                                                                        padx=15, pady=15)

    # ==========================================
    # 构建右侧监控面板
    # ==========================================
    def _build_right_panel(self):
        top_bar = tk.Frame(self.right_panel, bg=self.bg_main, height=40)
        top_bar.pack(fill="x", pady=(0, 5))
        top_bar.pack_propagate(False)
        tk.Label(top_bar, text="📹 实时监控终端", font=("微软雅黑", 14, "bold"), fg=self.fg_title, bg=self.bg_main).pack(
            side="left")

        video_frame = tk.Frame(self.right_panel, bg=self.bg_panel, highlightthickness=1,
                               highlightbackground=self.color_border)
        video_frame.pack(expand=True, fill="both")

        self.img_label = tk.Label(video_frame, bg="#E2E8F0", text="服务未启动\n点击左侧【▶ 启动监听】等待 K230 接入",
                                  font=("微软雅黑", 14), fg="#64748B")
        self.img_label.pack(expand=True, fill="both", padx=2, pady=2)

        bottom_bar = tk.Frame(self.right_panel, bg=self.bg_panel, highlightthickness=1,
                              highlightbackground=self.color_border)
        bottom_bar.pack(fill="x", pady=(15, 0))

        self.status_label = tk.Label(bottom_bar, textvariable=self.status_var, font=("微软雅黑", 11), fg=self.fg_text,
                                     bg=self.bg_panel, anchor="w", padx=10)
        self.status_label.pack(side="left", pady=10)

        dashboard = tk.Frame(bottom_bar, bg="#F8FAFC", highlightthickness=1, highlightbackground=self.color_border)
        dashboard.pack(side="right", padx=10, pady=5)
        tk.Label(dashboard, text="实时水母检出数: ", font=("微软雅黑", 12), fg=self.fg_text, bg="#F8FAFC").pack(
            side="left", padx=(10, 0), pady=5)
        tk.Label(dashboard, textvariable=self.count_var, font=("Arial", 16, "bold"), fg="#E81123", bg="#F8FAFC").pack(
            side="left", padx=(0, 10), pady=5)

    # ==========================================
    # 新增逻辑 1:保存带画框的截图
    # ==========================================
    def take_screenshot(self):
        if self.latest_bgr_frame is None:
            messagebox.showwarning("提示", "当前没有实时视频流,无法截图!")
            return
        try:
            # 采用精确到秒的时间戳作为文件名
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"水母捕获_{timestamp}.jpg"
            filepath = os.path.join(self.screenshot_dir, filename)

            # 写入本地存储 (直接写入带 YOLO 画好框的 BGR 原图)
            cv2.imwrite(filepath, self.latest_bgr_frame)
            self.status_var.set(f"📸 截图成功!已保存至: {filename}")
        except Exception as e:
            messagebox.showerror("截图失败", f"保存图片发生异常:\n{e}")

    # ==========================================
    # 新增逻辑 2:一键唤醒本地文件管理器
    # ==========================================
    def open_screenshot_folder(self):
        abs_path = os.path.abspath(self.screenshot_dir)
        try:
            if sys.platform == 'win32':
                os.startfile(abs_path)
            elif sys.platform == 'darwin':  # macOS
                subprocess.Popen(['open', abs_path])
            else:  # Linux 系统
                subprocess.Popen(['xdg-open', abs_path])
        except Exception as e:
            messagebox.showerror("打开失败", f"无法唤醒文件夹:\n{e}")

    def show_local_ip(self):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            ip = s.getsockname()[0]
            s.close()
            self.bind_ip.set(ip)
            messagebox.showinfo("获取成功",
                                f"你的局域网 IP 是: {ip}\n\n已自动填入左侧输入框!\n请确保 K230 代码中的 PC_IP 也修改为该地址。")
        except Exception as e:
            messagebox.showerror("错误", "无法获取局域网 IP,请检查电脑网络连接。")

    def start_server(self):
        if self.is_running:
            messagebox.showinfo("提示", "服务已经在运行中,如需重启请先点击【断开连接】。")
            return
        self.is_running = True
        self.server_thread = threading.Thread(target=self.receive_video_stream)
        self.server_thread.daemon = True
        self.server_thread.start()

    def stop_server(self):
        self.is_running = False
        if self.client_conn:
            try:
                self.client_conn.close()
            except:
                pass
            self.client_conn = None
        if self.server_socket:
            try:
                self.server_socket.close()
            except:
                pass
            self.server_socket = None
        self.status_var.set("⛔ 服务已停止。随时可以重新启动。")
        self.img_label.configure(image="", text="监控已断开\n点击左侧【▶ 启动监听】重新开始")
        self.count_var.set("0")
        self.latest_bgr_frame = None

    def receive_video_stream(self):
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            host = self.bind_ip.get().strip()
            port = self.bind_port.get()
            self.server_socket.bind((host, port))
            self.server_socket.listen(1)
            self.root.after(0, lambda: self.status_var.set(f"✅ 监听就绪 ({host}:{port}),等待 K230 接入..."))
        except Exception as e:
            self.root.after(0, lambda: messagebox.showerror("端口错误", f"启动服务失败:\n{e}"))
            self.root.after(0, lambda: self.status_var.set("❌ 服务启动失败"))
            self.is_running = False
            return

        while self.is_running:
            try:
                self.server_socket.settimeout(1.0)
                self.client_conn, addr = self.server_socket.accept()
                self.root.after(0, lambda: self.status_var.set(f"🔗 正在接收 K230 [{addr[0]}] 视频流..."))

                while self.is_running:
                    length_data = self.recvall(self.client_conn, 4)
                    if not length_data: break
                    msg_len = struct.unpack('<I', length_data)[0]

                    img_data = self.recvall(self.client_conn, msg_len)
                    if not img_data: break

                    np_arr = np.frombuffer(img_data, np.uint8)
                    frame = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

                    if frame is not None:
                        jelly_count = self.process_frame(frame)
                        try:
                            self.client_conn.sendall(struct.pack('<I', jelly_count))
                        except Exception:
                            break
            except socket.timeout:
                continue
            except Exception:
                pass
            finally:
                if self.client_conn:
                    try:
                        self.client_conn.close()
                    except:
                        pass
                if self.is_running:
                    self.root.after(0, lambda: self.status_var.set("⚠️ 设备连接断开,继续等待接入..."))
                    self.root.after(0, lambda: self.img_label.configure(image="", text="K230 连接已断开,等待重连..."))
                    self.latest_bgr_frame = None

    def recvall(self, sock, count):
        buf = b''
        while count:
            newbuf = sock.recv(count)
            if not newbuf: return None
            buf += newbuf
            count -= len(newbuf)
        return buf

    def process_frame(self, frame):
        jelly_count = 0

        if self.model:
            current_conf = self.conf_thresh.get()
            results = self.model(frame, imgsz=640, conf=current_conf, iou=0.45, verbose=False)
            res = results[0]

            if res.boxes is not None:
                jelly_count = int((res.boxes.cls.cpu().numpy() == 2).sum())

            # 【重要更新】:获取画好识别框的 BGR 原生图像
            self.latest_bgr_frame = res.plot()
            img_rgb = cv2.cvtColor(self.latest_bgr_frame, cv2.COLOR_BGR2RGB)
        else:
            self.latest_bgr_frame = frame.copy()
            img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        self.root.after(0, self.update_ui, img_rgb, jelly_count)
        return jelly_count

    def update_ui(self, img_rgb, jelly_count):
        w = self.img_label.winfo_width()
        h = self.img_label.winfo_height()
        if w < 100 or h < 100:
            w, h = 800, 480

        img_pil = Image.fromarray(img_rgb)
        img_pil.thumbnail((w, h), Image.LANCZOS)

        self.tk_img = ImageTk.PhotoImage(img_pil)
        self.img_label.configure(image=self.tk_img, text="")
        self.count_var.set(str(jelly_count))

    def on_close(self):
        self.stop_server()
        self.root.destroy()


if __name__ == "__main__":
    root = tk.Tk()
    app = JellyfishApp(root)
    root.protocol("WM_DELETE_WINDOW", app.on_close)
    root.mainloop()

在K230里需要填写你的WIFI,如果不支持wifi的无法使用这个代码

最终效果

最终的成品在这里
链接:https://pan.quark.cn/s/f8a9a662b31c

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐