|
@@ -0,0 +1,1270 @@
|
|
|
|
|
+"""
|
|
|
|
|
+全景摄像头模块
|
|
|
|
|
+负责获取视频流和物体检测
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import os
|
|
|
|
|
+# 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃
|
|
|
|
|
+# pthread_frame.c:167 async_lock assertion
|
|
|
|
|
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
|
|
|
|
|
+
|
|
|
|
|
+import cv2
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import threading
|
|
|
|
|
+import queue
|
|
|
|
|
+import time
|
|
|
|
|
+import logging
|
|
|
|
|
+from datetime import datetime
|
|
|
|
|
+from typing import Optional, List, Tuple, Dict, Any
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+
|
|
|
|
|
+from config import DETECTION_CONFIG
|
|
|
|
|
+from config.camera import parse_resolution
|
|
|
|
|
+from video_lock import safe_read, safe_is_opened
|
|
|
|
|
+from inference_backend import nms
|
|
|
|
|
+
|
|
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class DetectedObject:
|
|
|
|
|
+ """检测到的物体"""
|
|
|
|
|
+ class_name: str # 类别名称
|
|
|
|
|
+ confidence: float # 置信度
|
|
|
|
|
+ bbox: Tuple[int, int, int, int] # 边界框 (x, y, width, height)
|
|
|
|
|
+ center: Tuple[int, int] # 中心点坐标
|
|
|
|
|
+ track_id: Optional[int] = None # 跟踪ID
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class PanoramaCamera:
|
|
|
|
|
+ """全景摄像头类"""
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, camera_config: Dict = None):
|
|
|
|
|
+ """
|
|
|
|
|
+ 初始化全景摄像头
|
|
|
|
|
+ Args:
|
|
|
|
|
+ camera_config: 摄像头配置
|
|
|
|
|
+ """
|
|
|
|
|
+ self.config = camera_config or {}
|
|
|
|
|
+
|
|
|
|
|
+ # 解析期望分辨率
|
|
|
|
|
+ self.frame_width, self.frame_height = parse_resolution(self.config.get('resolution'))
|
|
|
|
|
+
|
|
|
|
|
+ # 摄像头品牌 / SDK 使用策略
|
|
|
|
|
+ # brand: 'dahua' | 'hikvision' | 'uniview' | 'auto'
|
|
|
|
|
+ # use_sdk: True 时使用大华 SDK 登录;False 时仅使用 RTSP 取流
|
|
|
|
|
+ self.brand = self.config.get('brand', 'auto').lower()
|
|
|
|
|
+ self.use_sdk = self.config.get('use_sdk', self.brand != 'hikvision')
|
|
|
|
|
+ if self.brand == 'hikvision':
|
|
|
|
|
+ self.use_sdk = False
|
|
|
|
|
+
|
|
|
|
|
+ self.login_handle = None
|
|
|
|
|
+ self.play_handle = None
|
|
|
|
|
+ self.connected = False
|
|
|
|
|
+
|
|
|
|
|
+ # 视频流
|
|
|
|
|
+ self.frame_queue = queue.Queue(maxsize=10)
|
|
|
|
|
+ self.current_frame = None
|
|
|
|
|
+ self.frame_lock = threading.Lock()
|
|
|
|
|
+ self.rtsp_cap = None # RTSP视频捕获
|
|
|
|
|
+ self._camera_id = 'panorama' # 用于per-camera锁
|
|
|
|
|
+
|
|
|
|
|
+ # 检测器
|
|
|
|
|
+ self.detector = None
|
|
|
|
|
+
|
|
|
|
|
+ # 控制标志
|
|
|
|
|
+ self.running = False
|
|
|
|
|
+ self.stream_thread = None
|
|
|
|
|
+
|
|
|
|
|
+ # 断线重连
|
|
|
|
|
+ self.auto_reconnect = True
|
|
|
|
|
+ self.reconnect_interval = 5.0 # 重连间隔(秒)
|
|
|
|
|
+ self.max_reconnect_attempts = 3 # 最大重连次数
|
|
|
|
|
+
|
|
|
|
|
+ def connect(self) -> bool:
|
|
|
|
|
+ """
|
|
|
|
|
+ 连接摄像头
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 是否成功
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.use_sdk:
|
|
|
|
|
+ print(f"[PanoramaCamera] {self.config.get('ip')} 配置为 RTSP-only 模式,跳过 SDK 登录")
|
|
|
|
|
+ self.connected = True
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ login_handle, error = self.sdk.login(
|
|
|
|
|
+ self.config['ip'],
|
|
|
|
|
+ self.config['port'],
|
|
|
|
|
+ self.config['username'],
|
|
|
|
|
+ self.config['password']
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if login_handle is None:
|
|
|
|
|
+ print(f"连接全景摄像头失败: IP={self.config['ip']}, 错误码={error}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self.login_handle = login_handle
|
|
|
|
|
+ self.connected = True
|
|
|
|
|
+ print(f"成功连接全景摄像头: {self.config['ip']}")
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ def disconnect(self):
|
|
|
|
|
+ """断开连接"""
|
|
|
|
|
+ self.stop_stream()
|
|
|
|
|
+ if self.use_sdk and self.login_handle:
|
|
|
|
|
+ self.sdk.logout(self.login_handle)
|
|
|
|
|
+ self.login_handle = None
|
|
|
|
|
+ self.connected = False
|
|
|
|
|
+
|
|
|
|
|
+ def is_connected(self) -> bool:
|
|
|
|
|
+ """是否已连接"""
|
|
|
|
|
+ return self.connected
|
|
|
|
|
+
|
|
|
|
|
+ def start_stream(self) -> bool:
|
|
|
|
|
+ """
|
|
|
|
|
+ 开始视频流 (SDK 模式,仅 Dahua 等品牌支持)
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 是否成功
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self.connected:
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ if not self.use_sdk:
|
|
|
|
|
+ print("[PanoramaCamera] 当前为 RTSP-only 模式,跳过 SDK 视频流")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self.play_handle = self.sdk.real_play(
|
|
|
|
|
+ self.login_handle,
|
|
|
|
|
+ self.config['channel']
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if self.play_handle is None:
|
|
|
|
|
+ print("启动视频流失败")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self.running = True
|
|
|
|
|
+ self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True)
|
|
|
|
|
+ self.stream_thread.start()
|
|
|
|
|
+
|
|
|
|
|
+ print("视频流已启动")
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ def start_stream_rtsp(self, rtsp_url: str = None) -> bool:
|
|
|
|
|
+ if rtsp_url is None:
|
|
|
|
|
+ rtsp_url = self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 先尝试FFmpeg后端
|
|
|
|
|
+ self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
|
|
|
|
|
+ if not self.rtsp_cap.isOpened():
|
|
|
|
|
+ # FFmpeg失败,尝试GStreamer后端
|
|
|
|
|
+ print(f"FFmpeg后端无法打开RTSP流,尝试GStreamer后端...")
|
|
|
|
|
+ try:
|
|
|
|
|
+ gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER)
|
|
|
|
|
+ if gst_cap.isOpened():
|
|
|
|
|
+ self.rtsp_cap = gst_cap
|
|
|
|
|
+ print(f"使用GStreamer后端打开RTSP流成功")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"无法打开RTSP流: {rtsp_url}")
|
|
|
|
|
+ return False
|
|
|
|
|
+ except Exception as ge:
|
|
|
|
|
+ print(f"GStreamer后端也不可用: {ge}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
|
+
|
|
|
|
|
+ self.running = True
|
|
|
|
|
+ self.stream_thread = threading.Thread(target=self._rtsp_stream_worker, daemon=True)
|
|
|
|
|
+ self.stream_thread.start()
|
|
|
|
|
+ print(f"RTSP视频流已启动: {rtsp_url} (期望分辨率 {self.frame_width}x{self.frame_height})")
|
|
|
|
|
+ return True
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"RTSP流启动失败: {e}")
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ def _stream_worker(self):
|
|
|
|
|
+ """视频流工作线程 (SDK模式)"""
|
|
|
|
|
+ retry_count = 0
|
|
|
|
|
+ max_retries = 10
|
|
|
|
|
+
|
|
|
|
|
+ while self.running:
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 尝试从 SDK 帧缓冲区获取帧 (如果可用)
|
|
|
|
|
+ frame_buffer = self.sdk.get_video_frame_buffer(self.config['channel'])
|
|
|
|
|
+
|
|
|
|
|
+ if frame_buffer:
|
|
|
|
|
+ frame_info = frame_buffer.get(timeout=0.1)
|
|
|
|
|
+ if frame_info and frame_info.get('data'):
|
|
|
|
|
+ # 解码帧数据 (如果需要)
|
|
|
|
|
+ # 注意: SDK回调返回的是编码数据,需要解码
|
|
|
|
|
+ # 这里暂时跳过,因为解码需要额外处理
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # RTSP 模式获取帧 (推荐方式)
|
|
|
|
|
+ if self.rtsp_cap is not None and safe_is_opened(self.rtsp_cap, self._camera_id):
|
|
|
|
|
+ ret, frame = safe_read(self.rtsp_cap, self._camera_id)
|
|
|
|
|
+ if ret and frame is not None:
|
|
|
|
|
+ with self.frame_lock:
|
|
|
|
|
+ self.current_frame = frame.copy()
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.frame_queue.put(frame.copy(), block=False)
|
|
|
|
|
+ except queue.Full:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ retry_count = 0 # 重置重试计数
|
|
|
|
|
+ time.sleep(0.001) # 减少CPU占用
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 如果 RTSP 不可用,尝试自动连接
|
|
|
|
|
+ if retry_count < max_retries:
|
|
|
|
|
+ rtsp_url = self._build_rtsp_url()
|
|
|
|
|
+ try:
|
|
|
|
|
+ if self.rtsp_cap is None:
|
|
|
|
|
+ self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
|
|
|
|
|
+ self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 减少缓冲延迟
|
|
|
|
|
+
|
|
|
|
|
+ if safe_is_opened(self.rtsp_cap, self._camera_id):
|
|
|
|
|
+ retry_count = 0
|
|
|
|
|
+ continue
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ retry_count += 1
|
|
|
|
|
+ time.sleep(1.0) # 重试间隔
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 超过最大重试次数,使用与配置分辨率一致的模拟帧
|
|
|
|
|
+ frame = np.zeros((self.frame_height, self.frame_width, 3), dtype=np.uint8)
|
|
|
|
|
+
|
|
|
|
|
+ with self.frame_lock:
|
|
|
|
|
+ self.current_frame = frame
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.frame_queue.put(frame, block=False)
|
|
|
|
|
+ except queue.Full:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ err_str = str(e)
|
|
|
|
|
+ if 'async_lock' in err_str or 'Assertion' in err_str:
|
|
|
|
|
+ print(f"视频流FFmpeg内部错误,重建连接: {e}")
|
|
|
|
|
+ self._reconnect_rtsp()
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"视频流错误: {e}")
|
|
|
|
|
+ time.sleep(0.5)
|
|
|
|
|
+
|
|
|
|
|
+ def _build_rtsp_url(self) -> str:
|
|
|
|
|
+ return self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
|
|
|
|
|
+
|
|
|
|
|
+ def _rtsp_stream_worker(self):
|
|
|
|
|
+ """RTSP视频流工作线程"""
|
|
|
|
|
+ import signal
|
|
|
|
|
+ # 屏蔽SIGINT在此线程,由主线程处理
|
|
|
|
|
+ if hasattr(signal, 'pthread_sigmask'):
|
|
|
|
|
+ try:
|
|
|
|
|
+ signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
|
|
|
|
|
+ except (AttributeError, OSError):
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ max_consecutive_errors = 50
|
|
|
|
|
+ error_count = 0
|
|
|
|
|
+
|
|
|
|
|
+ while self.running:
|
|
|
|
|
+ try:
|
|
|
|
|
+ if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id):
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ ret, frame = safe_read(self.rtsp_cap, self._camera_id)
|
|
|
|
|
+ if not ret or frame is None:
|
|
|
|
|
+ error_count += 1
|
|
|
|
|
+ if error_count > max_consecutive_errors:
|
|
|
|
|
+ print(f"全景RTSP流连续{max_consecutive_errors}次读取失败,尝试重连...")
|
|
|
|
|
+ self._reconnect_rtsp()
|
|
|
|
|
+ error_count = 0
|
|
|
|
|
+ time.sleep(0.01)
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ error_count = 0
|
|
|
|
|
+
|
|
|
|
|
+ # 记录实际分辨率,仅做校验与提示(不做拉伸缩放,避免丢精度)
|
|
|
|
|
+ actual_h, actual_w = frame.shape[:2]
|
|
|
|
|
+ if not getattr(self, '_resolution_logged', False):
|
|
|
|
|
+ print(f"全景摄像头实际分辨率: {actual_w}x{actual_h},期望分辨率: "
|
|
|
|
|
+ f"{self.frame_width}x{self.frame_height}")
|
|
|
|
|
+ self._resolution_logged = True
|
|
|
|
|
+ if (actual_w, actual_h) != (self.frame_width, self.frame_height):
|
|
|
|
|
+ if not getattr(self, '_resolution_warned', False):
|
|
|
|
|
+ logger.warning(
|
|
|
|
|
+ f"全景摄像头分辨率 {actual_w}x{actual_h} 与期望分辨率 "
|
|
|
|
|
+ f"{self.frame_width}x{self.frame_height} 不一致,"
|
|
|
|
|
+ f"模型推理时将使用 letterbox 灰度填充保持比例"
|
|
|
|
|
+ )
|
|
|
|
|
+ self._resolution_warned = True
|
|
|
|
|
+
|
|
|
|
|
+ with self.frame_lock:
|
|
|
|
|
+ self.current_frame = frame.copy()
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.frame_queue.put(frame, block=False)
|
|
|
|
|
+ except queue.Full:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ err_str = str(e)
|
|
|
|
|
+ if 'async_lock' in err_str or 'Assertion' in err_str:
|
|
|
|
|
+ print(f"全景RTSP流FFmpeg内部错误,3秒后重建连接: {e}")
|
|
|
|
|
+ time.sleep(3)
|
|
|
|
|
+ self._reconnect_rtsp()
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f"全景RTSP视频流错误: {e}")
|
|
|
|
|
+ time.sleep(0.5)
|
|
|
|
|
+
|
|
|
|
|
+ def _reconnect_rtsp(self):
|
|
|
|
|
+ """重建RTSP连接"""
|
|
|
|
|
+ rtsp_url = self._build_rtsp_url()
|
|
|
|
|
+ if self.rtsp_cap is not None:
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.rtsp_cap.release()
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ pass
|
|
|
|
|
+ self.rtsp_cap = None
|
|
|
|
|
+
|
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
|
|
|
|
|
+ if safe_is_opened(self.rtsp_cap, self._camera_id):
|
|
|
|
|
+ self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
|
|
+ print("全景RTSP流重连成功")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("全景RTSP流重连失败")
|
|
|
|
|
+ self.rtsp_cap = None
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"全景RTSP流重连异常: {e}")
|
|
|
|
|
+ self.rtsp_cap = None
|
|
|
|
|
+
|
|
|
|
|
+ def stop_stream(self):
|
|
|
|
|
+ """停止视频流"""
|
|
|
|
|
+ self.running = False
|
|
|
|
|
+ if self.stream_thread:
|
|
|
|
|
+ self.stream_thread.join(timeout=2)
|
|
|
|
|
+ if self.play_handle:
|
|
|
|
|
+ self.sdk.stop_real_play(self.play_handle)
|
|
|
|
|
+ self.play_handle = None
|
|
|
|
|
+ if self.rtsp_cap:
|
|
|
|
|
+ self.rtsp_cap.release()
|
|
|
|
|
+ self.rtsp_cap = None
|
|
|
|
|
+
|
|
|
|
|
+ def get_frame(self) -> Optional[np.ndarray]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 获取当前帧
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 当前帧图像
|
|
|
|
|
+ """
|
|
|
|
|
+ with self.frame_lock:
|
|
|
|
|
+ return self.current_frame.copy() if self.current_frame is not None else None
|
|
|
|
|
+
|
|
|
|
|
+ def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[np.ndarray]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 从帧队列获取帧 (用于批量处理)
|
|
|
|
|
+ Args:
|
|
|
|
|
+ timeout: 等待超时时间
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 帧图像或None
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ return self.frame_queue.get(timeout=timeout)
|
|
|
|
|
+ except:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def get_frame_buffer(self, count: int = 5) -> List[np.ndarray]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 获取帧缓冲 (用于运动检测等需要多帧的场景)
|
|
|
|
|
+ Args:
|
|
|
|
|
+ count: 获取帧数
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 帧列表
|
|
|
|
|
+ """
|
|
|
|
|
+ frames = []
|
|
|
|
|
+ while len(frames) < count:
|
|
|
|
|
+ frame = self.get_frame_from_queue(timeout=0.05)
|
|
|
|
|
+ if frame is not None:
|
|
|
|
|
+ frames.append(frame)
|
|
|
|
|
+ else:
|
|
|
|
|
+ break
|
|
|
|
|
+ return frames
|
|
|
|
|
+
|
|
|
|
|
+ def set_detector(self, detector):
|
|
|
|
|
+ """设置物体检测器"""
|
|
|
|
|
+ self.detector = detector
|
|
|
|
|
+
|
|
|
|
|
+ def detect_objects(self, frame: np.ndarray = None) -> List[DetectedObject]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 检测物体
|
|
|
|
|
+ Args:
|
|
|
|
|
+ frame: 输入帧,如果为None则使用当前帧
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 检测到的物体列表
|
|
|
|
|
+ """
|
|
|
|
|
+ if frame is None:
|
|
|
|
|
+ frame = self.get_frame()
|
|
|
|
|
+
|
|
|
|
|
+ if frame is None or self.detector is None:
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ return self.detector.detect(frame)
|
|
|
|
|
+
|
|
|
|
|
+ def get_detection_position(self, obj: DetectedObject,
|
|
|
|
|
+ frame_size: Tuple[int, int]) -> Tuple[float, float]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 获取检测物体在画面中的相对位置
|
|
|
|
|
+ Args:
|
|
|
|
|
+ obj: 检测到的物体
|
|
|
|
|
+ frame_size: 画面尺寸 (width, height)
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 相对位置 (x_ratio, y_ratio) 范围0-1
|
|
|
|
|
+ """
|
|
|
|
|
+ width, height = frame_size
|
|
|
|
|
+ x_ratio = obj.center[0] / width
|
|
|
|
|
+ y_ratio = obj.center[1] / height
|
|
|
|
|
+ return (x_ratio, y_ratio)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ObjectDetector:
|
|
|
|
|
+ """
|
|
|
|
|
+ 物体检测器
|
|
|
|
|
+ 使用YOLO11模型进行人体检测
|
|
|
|
|
+ 支持 YOLO (.pt), RKNN (.rknn), ONNX (.onnx) 模型
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, model_path: str = None, use_gpu: bool = True, model_size: str = 'n',
|
|
|
|
|
+ model_type: str = 'auto'):
|
|
|
|
|
+ """
|
|
|
|
|
+ 初始化检测器
|
|
|
|
|
+ Args:
|
|
|
|
|
+ model_path: 模型路径 (支持 .pt, .rknn, .onnx)
|
|
|
|
|
+ use_gpu: 是否使用GPU
|
|
|
|
|
+ model_size: 模型尺寸 ('n', 's', 'm', 'l', 'x') - 仅 YOLO 模型有效
|
|
|
|
|
+ model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
|
|
|
|
|
+ """
|
|
|
|
|
+ self.model = None
|
|
|
|
|
+ self.rknn_detector = None
|
|
|
|
|
+ self.model_path = model_path
|
|
|
|
|
+ self.use_gpu = use_gpu
|
|
|
|
|
+ self.model_size = model_size
|
|
|
|
|
+ self.model_type = model_type
|
|
|
|
|
+ self.is_end2end = False
|
|
|
|
|
+ self.config = DETECTION_CONFIG
|
|
|
|
|
+ self.device = 'cuda:0' if use_gpu else 'cpu'
|
|
|
|
|
+
|
|
|
|
|
+ # 检测图片保存配置
|
|
|
|
|
+ self._save_image_enabled = self.config.get('save_detection_image', False)
|
|
|
|
|
+ self._image_save_dir = Path(self.config.get('detection_image_dir', './detection_images'))
|
|
|
|
|
+ self._image_max_count = self.config.get('detection_image_max_count', 1000)
|
|
|
|
|
+ self._last_save_time = 0
|
|
|
|
|
+ # 保存间隔:优先使用配置值,否则基于检测帧率计算(检测间隔的1.5倍)
|
|
|
|
|
+ detection_fps = self.config.get('detection_fps', 2)
|
|
|
|
|
+ self._save_interval = self.config.get('save_interval', 1.5 / detection_fps)
|
|
|
|
|
+
|
|
|
|
|
+ # 创建保存目录
|
|
|
|
|
+ if self._save_image_enabled:
|
|
|
|
|
+ self._ensure_save_dir()
|
|
|
|
|
+
|
|
|
|
|
+ # 根据扩展名自动判断模型类型
|
|
|
|
|
+ if model_path:
|
|
|
|
|
+ ext = os.path.splitext(model_path)[1].lower()
|
|
|
|
|
+ if ext == '.rknn':
|
|
|
|
|
+ self.model_type = 'rknn'
|
|
|
|
|
+ elif ext == '.onnx':
|
|
|
|
|
+ self.model_type = 'onnx'
|
|
|
|
|
+ elif ext == '.pt':
|
|
|
|
|
+ self.model_type = 'yolo'
|
|
|
|
|
+ # end2end 模型(内置NMS),输出格式 (N, 6) = (x1,y1,x2,y2,conf,cls)
|
|
|
|
|
+ if 'end2end' in os.path.basename(model_path).lower():
|
|
|
|
|
+ self.is_end2end = True
|
|
|
|
|
+
|
|
|
|
|
+ self._load_model()
|
|
|
|
|
+
|
|
|
|
|
+ def _load_model(self):
|
|
|
|
|
+ """加载检测模型"""
|
|
|
|
|
+ if self.model_type == 'rknn':
|
|
|
|
|
+ self._load_rknn_model()
|
|
|
|
|
+ elif self.model_type == 'onnx':
|
|
|
|
|
+ self._load_onnx_model()
|
|
|
|
|
+ else:
|
|
|
|
|
+ self._load_yolo_model()
|
|
|
|
|
+
|
|
|
|
|
+ def _load_rknn_model(self):
|
|
|
|
|
+ """加载 RKNN 模型"""
|
|
|
|
|
+ if not self.model_path:
|
|
|
|
|
+ raise ValueError("RKNN 模型需要指定 model_path")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ from rknnlite.api import RKNNLite
|
|
|
|
|
+
|
|
|
|
|
+ self.rknn = RKNNLite()
|
|
|
|
|
+ ret = self.rknn.load_rknn(self.model_path)
|
|
|
|
|
+ if ret != 0:
|
|
|
|
|
+ raise RuntimeError(f"加载 RKNN 模型失败: {self.model_path}")
|
|
|
|
|
+
|
|
|
|
|
+ ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
|
|
|
|
|
+ if ret != 0:
|
|
|
|
|
+ raise RuntimeError(f"初始化 RKNN 运行时失败")
|
|
|
|
|
+
|
|
|
|
|
+ print(f"RKNN 模型加载成功: {self.model_path}")
|
|
|
|
|
+ except ImportError:
|
|
|
|
|
+ raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2")
|
|
|
|
|
+
|
|
|
|
|
+ def _load_onnx_model(self):
|
|
|
|
|
+ """加载 ONNX 模型(优先 ONNX Runtime,回退 Ultralytics)"""
|
|
|
|
|
+ if not self.model_path:
|
|
|
|
|
+ raise ValueError("ONNX 模型需要指定 model_path")
|
|
|
|
|
+
|
|
|
|
|
+ # 尝试直接使用 ONNX Runtime(end2end 格式)
|
|
|
|
|
+ try:
|
|
|
|
|
+ import onnxruntime as ort
|
|
|
|
|
+ available = ort.get_available_providers()
|
|
|
|
|
+ providers = [p for p in ['CPUExecutionProvider'] if p in available]
|
|
|
|
|
+ session = ort.InferenceSession(self.model_path, providers=providers)
|
|
|
|
|
+ self.session = session
|
|
|
|
|
+ self.input_name = session.get_inputs()[0].name
|
|
|
|
|
+ self.is_end2end = True # 导出的 ONNX 为 end2end 格式 (1, 300, 6)
|
|
|
|
|
+ print(f"ONNX 模型加载成功: {self.model_path} (via ONNX Runtime, providers={providers})")
|
|
|
|
|
+ return
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"ONNX Runtime 加载失败,尝试 Ultralytics: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ from ultralytics import YOLO
|
|
|
|
|
+ self.model = YOLO(self.model_path, task='detect')
|
|
|
|
|
+ self.device = 'cpu'
|
|
|
|
|
+ print(f"ONNX 模型加载成功: {self.model_path} (via Ultralytics, device=cpu)")
|
|
|
|
|
+ except ImportError:
|
|
|
|
|
+ raise ImportError("未安装 ultralytics")
|
|
|
|
|
+
|
|
|
|
|
+ def _load_yolo_model(self):
|
|
|
|
|
+ """加载YOLO11检测模型"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ from ultralytics import YOLO
|
|
|
|
|
+
|
|
|
|
|
+ if self.model_path:
|
|
|
|
|
+ self.model = YOLO(self.model_path)
|
|
|
|
|
+ else:
|
|
|
|
|
+ model_name = f'yolo11{self.model_size}.pt'
|
|
|
|
|
+ self.model = YOLO(model_name)
|
|
|
|
|
+
|
|
|
|
|
+ dummy = np.zeros((640, 640, 3), dtype=np.uint8)
|
|
|
|
|
+ self.model(dummy, device=self.device, verbose=False)
|
|
|
|
|
+
|
|
|
|
|
+ print(f"成功加载YOLO11检测模型 (device={self.device})")
|
|
|
|
|
+ except ImportError:
|
|
|
|
|
+ print("未安装ultralytics,请运行: pip install ultralytics")
|
|
|
|
|
+ self._load_opencv_model()
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"加载YOLO11模型失败: {e}")
|
|
|
|
|
+ self._load_opencv_model()
|
|
|
|
|
+
|
|
|
|
|
+ def _load_opencv_model(self):
|
|
|
|
|
+ """使用OpenCV加载模型"""
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ def _ensure_save_dir(self):
|
|
|
|
|
+ """确保保存目录存在"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ self._image_save_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
+ logger.info(f"检测图片保存目录: {self._image_save_dir}")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"创建检测图片目录失败: {e}")
|
|
|
|
|
+ self._save_image_enabled = False
|
|
|
|
|
+
|
|
|
|
|
+ def _cleanup_old_images(self):
|
|
|
|
|
+ """清理旧图片,保持目录下图片数量不超过上限"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ image_files = list(self._image_save_dir.glob("*.jpg"))
|
|
|
|
|
+ if len(image_files) > self._image_max_count:
|
|
|
|
|
+ # 按修改时间排序,删除最旧的
|
|
|
|
|
+ image_files.sort(key=lambda x: x.stat().st_mtime)
|
|
|
|
|
+ to_delete = image_files[:len(image_files) - self._image_max_count]
|
|
|
|
|
+ for f in to_delete:
|
|
|
|
|
+ f.unlink()
|
|
|
|
|
+ logger.info(f"已清理 {len(to_delete)} 张旧检测图片")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"清理旧图片失败: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ def _save_detection_image(self, frame: np.ndarray, detections: List[DetectedObject]):
|
|
|
|
|
+ """
|
|
|
|
|
+ 保存带有检测标记的图片(只标记达到置信度阈值的人)
|
|
|
|
|
+ Args:
|
|
|
|
|
+ frame: 原始图像
|
|
|
|
|
+ detections: 检测结果列表
|
|
|
|
|
+ """
|
|
|
|
|
+ if not self._save_image_enabled or not detections:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ # 检查保存间隔
|
|
|
|
|
+ current_time = time.time()
|
|
|
|
|
+ if current_time - self._last_save_time < self._save_interval:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ # 复制图像避免修改原图
|
|
|
|
|
+ marked_frame = frame.copy()
|
|
|
|
|
+
|
|
|
|
|
+ # 置信度阈值(人员检测用更高阈值)
|
|
|
|
|
+ person_threshold = self.config.get('person_threshold', 0.8)
|
|
|
|
|
+
|
|
|
|
|
+ # 只标记达到阈值的人
|
|
|
|
|
+ person_count = 0
|
|
|
|
|
+
|
|
|
|
|
+ for det in detections:
|
|
|
|
|
+ # 只处理人且达到阈值
|
|
|
|
|
+ is_person = det.class_name in ['person']
|
|
|
|
|
+ if not is_person:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 未达阈值的不标记
|
|
|
|
|
+ if det.confidence < person_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ x, y, w, h = det.bbox
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制边界框(绿色)
|
|
|
|
|
+ cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制序号标签
|
|
|
|
|
+ label = f"person_{person_count}"
|
|
|
|
|
+ person_count += 1
|
|
|
|
|
+
|
|
|
|
|
+ (label_w, label_h), baseline = cv2.getTextSize(
|
|
|
|
|
+ label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
|
|
|
|
|
+ )
|
|
|
|
|
+ cv2.rectangle(
|
|
|
|
|
+ marked_frame,
|
|
|
|
|
+ (x, y - label_h - 8),
|
|
|
|
|
+ (x + label_w, y),
|
|
|
|
|
+ (0, 255, 0),
|
|
|
|
|
+ -1
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 绘制标签文字(黑色)
|
|
|
|
|
+ cv2.putText(
|
|
|
|
|
+ marked_frame, label,
|
|
|
|
|
+ (x, y - 4),
|
|
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX, 0.8,
|
|
|
|
|
+ (0, 0, 0), 2
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ # 无有效目标则不保存
|
|
|
|
|
+ if person_count == 0:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ # 生成文件名(时间戳+有效人数)
|
|
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
|
|
|
|
+ filename = f"panorama_{timestamp}_n{person_count}.png"
|
|
|
|
|
+ filepath = self._image_save_dir / filename
|
|
|
|
|
+
|
|
|
|
|
+ # 保存图片(PNG无损格式,不压缩)
|
|
|
|
|
+ cv2.imwrite(str(filepath), marked_frame)
|
|
|
|
|
+ self._last_save_time = current_time
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"[全景] 已保存检测图片: {filepath},有效人数 {person_count} (阈值={person_threshold})")
|
|
|
|
|
+
|
|
|
|
|
+ # 定期清理旧图片
|
|
|
|
|
+ self._cleanup_old_images()
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"[全景] 保存检测图片失败: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ def _letterbox(self, image, size=(640, 640)):
|
|
|
|
|
+ """Letterbox 预处理"""
|
|
|
|
|
+ h0, w0 = image.shape[:2]
|
|
|
|
|
+ ih, iw = size
|
|
|
|
|
+ scale = min(iw / w0, ih / h0)
|
|
|
|
|
+ new_w, new_h = int(w0 * scale), int(h0 * scale)
|
|
|
|
|
+ pad_w = (iw - new_w) // 2
|
|
|
|
|
+ pad_h = (ih - new_h) // 2
|
|
|
|
|
+ resized = cv2.resize(image, (new_w, new_h))
|
|
|
|
|
+ canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
|
|
|
|
|
+ canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
|
|
|
|
|
+ return canvas, scale, pad_w, pad_h, h0, w0
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _make_grid(h: int, w: int):
|
|
|
|
|
+ """生成特征图网格坐标"""
|
|
|
|
|
+ yv, xv = np.meshgrid(np.arange(h), np.arange(w), indexing='ij')
|
|
|
|
|
+ return np.stack([xv, yv], axis=-1).reshape(-1, 2)
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _pseudo_person_confidence(bboxes: np.ndarray, orig_h: int, orig_w: int) -> np.ndarray:
|
|
|
|
|
+ """
|
|
|
|
|
+ 当 RKNN 模型 cls 输出恒定时,根据 bbox 形状生成伪置信度。
|
|
|
|
|
+ bboxes: (N, 4) [x1, y1, x2, y2] 原始图像坐标
|
|
|
|
|
+ """
|
|
|
|
|
+ x1, y1, x2, y2 = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
|
|
|
|
|
+ w = np.maximum(1.0, x2 - x1)
|
|
|
|
|
+ h = np.maximum(1.0, y2 - y1)
|
|
|
|
|
+ aspect = h / w
|
|
|
|
|
+
|
|
|
|
|
+ # 人体宽高比通常在 1.5 ~ 4.0 之间,以 2.5 为最佳
|
|
|
|
|
+ aspect_score = np.exp(-0.5 * ((aspect - 2.5) / 1.0) ** 2)
|
|
|
|
|
+
|
|
|
|
|
+ # 面积占画面比例,过大/过小都降权
|
|
|
|
|
+ area = w * h
|
|
|
|
|
+ img_area = orig_w * orig_h
|
|
|
|
|
+ area_ratio = area / img_area
|
|
|
|
|
+ size_score = np.clip(area_ratio * 80, 0.0, 1.0) # 占画面 1.25% 以上得满分
|
|
|
|
|
+
|
|
|
|
|
+ # 综合伪置信度,范围 0.55 ~ 0.95
|
|
|
|
|
+ conf = 0.55 + 0.30 * aspect_score + 0.10 * size_score
|
|
|
|
|
+ return conf
|
|
|
|
|
+
|
|
|
|
|
+ def _decode_yolo11_outputs(self, outputs: list, canvas_size: tuple,
|
|
|
|
|
+ scale: float, pad_w: int, pad_h: int,
|
|
|
|
|
+ orig_h: int, orig_w: int,
|
|
|
|
|
+ conf_threshold: float = 0.5,
|
|
|
|
|
+ iou_threshold: float = 0.45) -> list:
|
|
|
|
|
+ """
|
|
|
|
|
+ 解码 YOLO11 RKNN 多输出格式 (DFL bbox + cls + mask,每尺度 3 个输出)
|
|
|
|
|
+
|
|
|
|
|
+ outputs: [bbox_80x80, cls_80x80, mask_80x80, bbox_40x40, cls_40x40, ...]
|
|
|
|
|
+ 返回: [[x1, y1, x2, y2, score, class_id], ...] (原始图像坐标)
|
|
|
|
|
+ """
|
|
|
|
|
+ strides = [8, 16, 32]
|
|
|
|
|
+ reg_max = 16
|
|
|
|
|
+ dets = []
|
|
|
|
|
+ # 记录 cls 输出是否异常(常量),用于后续 fallback
|
|
|
|
|
+ cls_constant = True
|
|
|
|
|
+
|
|
|
|
|
+ for scale_idx, stride in enumerate(strides):
|
|
|
|
|
+ bbox_out = outputs[scale_idx * 3]
|
|
|
|
|
+ cls_out = outputs[scale_idx * 3 + 1]
|
|
|
|
|
+
|
|
|
|
|
+ # 检测 cls 是否为常量(量化/导出异常导致)
|
|
|
|
|
+ if np.ptp(cls_out) > 1e-4:
|
|
|
|
|
+ cls_constant = False
|
|
|
|
|
+
|
|
|
|
|
+ _, _, h, w = bbox_out.shape
|
|
|
|
|
+ # (64, H, W) -> (H*W, 4, 16)
|
|
|
|
|
+ bbox = bbox_out[0].transpose(1, 2, 0).reshape(h * w, 4, reg_max)
|
|
|
|
|
+ # (80, H, W) -> (H*W, 80)
|
|
|
|
|
+ cls = cls_out[0].transpose(1, 2, 0).reshape(h * w, 80)
|
|
|
|
|
+
|
|
|
|
|
+ # DFL 解码
|
|
|
|
|
+ prob = np.exp(bbox - np.max(bbox, axis=-1, keepdims=True))
|
|
|
|
|
+ prob = prob / np.sum(prob, axis=-1, keepdims=True)
|
|
|
|
|
+ bins = np.arange(reg_max).reshape(1, 1, reg_max)
|
|
|
|
|
+ decoded = np.sum(prob * bins, axis=-1) # (H*W, 4)
|
|
|
|
|
+
|
|
|
|
|
+ # 网格中心
|
|
|
|
|
+ grid = self._make_grid(h, w) + 0.5 # (H*W, 2)
|
|
|
|
|
+
|
|
|
|
|
+ l, t, r, b = decoded[:, 0], decoded[:, 1], decoded[:, 2], decoded[:, 3]
|
|
|
|
|
+ x1 = (grid[:, 0] - l) * stride
|
|
|
|
|
+ y1 = (grid[:, 1] - t) * stride
|
|
|
|
|
+ x2 = (grid[:, 0] + r) * stride
|
|
|
|
|
+ y2 = (grid[:, 1] + b) * stride
|
|
|
|
|
+
|
|
|
|
|
+ # cls sigmoid
|
|
|
|
|
+ cls = 1.0 / (1.0 + np.exp(-cls))
|
|
|
|
|
+ scores = np.max(cls, axis=1)
|
|
|
|
|
+ labels = np.argmax(cls, axis=1)
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(len(scores)):
|
|
|
|
|
+ if scores[i] < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ dets.append([x1[i], y1[i], x2[i], y2[i], scores[i], labels[i]])
|
|
|
|
|
+
|
|
|
|
|
+ if not dets:
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ dets = np.array(dets)
|
|
|
|
|
+
|
|
|
|
|
+ # 从 canvas(640x640) 坐标映射回原始图像坐标:去 padding -> 除以 scale
|
|
|
|
|
+ dets[:, [0, 2]] = (dets[:, [0, 2]] - pad_w) / scale
|
|
|
|
|
+ dets[:, [1, 3]] = (dets[:, [1, 3]] - pad_h) / scale
|
|
|
|
|
+
|
|
|
|
|
+ # clip
|
|
|
|
|
+ dets[:, [0, 2]] = np.clip(dets[:, [0, 2]], 0, orig_w)
|
|
|
|
|
+ dets[:, [1, 3]] = np.clip(dets[:, [1, 3]], 0, orig_h)
|
|
|
|
|
+
|
|
|
|
|
+ # 若 cls 输出异常常量,用伪置信度替代,帮助 NMS 和后续过滤
|
|
|
|
|
+ if cls_constant:
|
|
|
|
|
+ pseudo_scores = self._pseudo_person_confidence(dets[:, :4], orig_h, orig_w)
|
|
|
|
|
+ dets[:, 4] = pseudo_scores
|
|
|
|
|
+ dets[:, 5] = 0 # 强制为 person 类别
|
|
|
|
|
+
|
|
|
|
|
+ # NMS
|
|
|
|
|
+ x1, y1, x2, y2 = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
|
|
|
|
|
+ scores = dets[:, 4]
|
|
|
|
|
+ areas = (x2 - x1) * (y2 - y1)
|
|
|
|
|
+ order = scores.argsort()[::-1]
|
|
|
|
|
+
|
|
|
|
|
+ keep = []
|
|
|
|
|
+ while order.size > 0:
|
|
|
|
|
+ i = order[0]
|
|
|
|
|
+ keep.append(i)
|
|
|
|
|
+
|
|
|
|
|
+ xx1 = np.maximum(x1[i], x1[order[1:]])
|
|
|
|
|
+ yy1 = np.maximum(y1[i], y1[order[1:]])
|
|
|
|
|
+ xx2 = np.minimum(x2[i], x2[order[1:]])
|
|
|
|
|
+ yy2 = np.minimum(y2[i], y2[order[1:]])
|
|
|
|
|
+
|
|
|
|
|
+ w = np.maximum(0.0, xx2 - xx1)
|
|
|
|
|
+ h = np.maximum(0.0, yy2 - yy1)
|
|
|
|
|
+ inter = w * h
|
|
|
|
|
+ iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
|
|
|
|
|
+
|
|
|
|
|
+ inds = np.where(iou <= iou_threshold)[0]
|
|
|
|
|
+ order = order[inds + 1]
|
|
|
|
|
+
|
|
|
|
|
+ return dets[keep].tolist()
|
|
|
|
|
+
|
|
|
|
|
+ def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """使用 RKNN/ONNX 模型检测"""
|
|
|
|
|
+ results = []
|
|
|
|
|
+ h0, w0 = frame.shape[:2]
|
|
|
|
|
+
|
|
|
|
|
+ # 宽幅图(宽高比>1.5)使用分区域检测,避免letterbox后人太小
|
|
|
|
|
+ if w0 / h0 > 1.5:
|
|
|
|
|
+ return self._detect_rknn_tiled(frame)
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ conf_threshold = self.config['confidence_threshold']
|
|
|
|
|
+ class_map = self.config.get('class_map', {0: 'person'})
|
|
|
|
|
+
|
|
|
|
|
+ # -------------------------------------------------------
|
|
|
|
|
+ # end2end 模型(内置NMS):letterbox + NHWC 预处理
|
|
|
|
|
+ # 输出格式 (N, max_det, 6) = (x1,y1,x2,y2,conf,cls) 在 letterbox 空间
|
|
|
|
|
+ # -------------------------------------------------------
|
|
|
|
|
+ if self.is_end2end:
|
|
|
|
|
+ canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame)
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ blob = img[None, ...] # NHWC
|
|
|
|
|
+
|
|
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
|
|
+ outputs = self.rknn.inference(inputs=[blob])
|
|
|
|
|
+ else:
|
|
|
|
|
+ # ONNX 通常期望 NCHW,需转置
|
|
|
|
|
+ nchw = blob.transpose(0, 3, 1, 2)
|
|
|
|
|
+ outputs = self.session.run(None, {self.input_name: nchw})
|
|
|
|
|
+
|
|
|
|
|
+ output = outputs[0]
|
|
|
|
|
+ if len(output.shape) == 3:
|
|
|
|
|
+ output = output[0]
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(output.shape[0]):
|
|
|
|
|
+ x1_lb, y1_lb, x2_lb, y2_lb, conf, cls_id = output[i]
|
|
|
|
|
+ if not np.isfinite(conf) or conf < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if not (np.isfinite(x1_lb) and np.isfinite(y1_lb) and np.isfinite(x2_lb) and np.isfinite(y2_lb)):
|
|
|
|
|
+ continue
|
|
|
|
|
+ if x1_lb >= x2_lb or y1_lb >= y2_lb:
|
|
|
|
|
+ continue
|
|
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 从 letterbox 空间映射回原图
|
|
|
|
|
+ x1 = int((x1_lb - pad_w) / scale)
|
|
|
|
|
+ y1 = int((y1_lb - pad_h) / scale)
|
|
|
|
|
+ x2 = int((x2_lb - pad_w) / scale)
|
|
|
|
|
+ y2 = int((y2_lb - pad_h) / scale)
|
|
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
|
|
+
|
|
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ results.append(DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=float(conf),
|
|
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
|
|
+ center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
|
|
+ ))
|
|
|
|
|
+ return results
|
|
|
|
|
+
|
|
|
|
|
+ # -------------------------------------------------------
|
|
|
|
|
+ # 非 end2end 模型:letterbox + NHWC 预处理
|
|
|
|
|
+ # -------------------------------------------------------
|
|
|
|
|
+ canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame)
|
|
|
|
|
+
|
|
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
|
|
+ # RKNN
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ blob = img[None, ...]
|
|
|
|
|
+ outputs = self.rknn.inference(inputs=[blob])
|
|
|
|
|
+ else:
|
|
|
|
|
+ # ONNX
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ img = img.transpose(2, 0, 1)
|
|
|
|
|
+ blob = img[None, ...]
|
|
|
|
|
+ outputs = self.session.run(None, {self.input_name: blob})
|
|
|
|
|
+
|
|
|
|
|
+ # 根据输出数量判断格式:YOLO11 DFL 为 9 个输出(3 scales x 3 branches)
|
|
|
|
|
+ if len(outputs) == 9:
|
|
|
|
|
+ dets = self._decode_yolo11_outputs(
|
|
|
|
|
+ outputs, (640, 640), scale, pad_w, pad_h, h0, w0,
|
|
|
|
|
+ conf_threshold=conf_threshold
|
|
|
|
|
+ )
|
|
|
|
|
+ for x1, y1, x2, y2, score, cls_id in dets:
|
|
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+ obj = DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=float(score),
|
|
|
|
|
+ bbox=(int(x1), int(y1), int(x2 - x1), int(y2 - y1)),
|
|
|
|
|
+ center=(int((x1 + x2) / 2), int((y1 + y2) / 2))
|
|
|
|
|
+ )
|
|
|
|
|
+ results.append(obj)
|
|
|
|
|
+ else:
|
|
|
|
|
+ output = outputs[0]
|
|
|
|
|
+ if len(output.shape) == 3:
|
|
|
|
|
+ output = output[0]
|
|
|
|
|
+
|
|
|
|
|
+ num_boxes = output.shape[1]
|
|
|
|
|
+ candidates = []
|
|
|
|
|
+
|
|
|
|
|
+ # YOLO11 单输出格式 (9, 8400): bbox(4) + obj(1) + cls(4)
|
|
|
|
|
+ if output.shape[0] == 9:
|
|
|
|
|
+ for i in range(num_boxes):
|
|
|
|
|
+ obj = float(output[4, i])
|
|
|
|
|
+ cls_logits = output[5:9, i]
|
|
|
|
|
+ best_class = int(np.argmax(cls_logits))
|
|
|
|
|
+ confidence = 1.0 / (1.0 + np.exp(-float(cls_logits[best_class])))
|
|
|
|
|
+ if confidence < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ x_center = float(output[0, i])
|
|
|
|
|
+ y_center = float(output[1, i])
|
|
|
|
|
+ w = float(output[2, i])
|
|
|
|
|
+ h = float(output[3, i])
|
|
|
|
|
+ x1 = int(((x_center - w / 2) - pad_w) / scale)
|
|
|
|
|
+ y1 = int(((y_center - h / 2) - pad_h) / scale)
|
|
|
|
|
+ x2 = int(((x_center + w / 2) - pad_w) / scale)
|
|
|
|
|
+ y2 = int(((y_center + h / 2) - pad_h) / scale)
|
|
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+ cls_name = class_map.get(best_class, str(best_class))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+ candidates.append(DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=confidence,
|
|
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
|
|
+ center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
|
|
+ ))
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 标准 (84, 8400) 格式(ONNX 或新 RKNN)
|
|
|
|
|
+ for i in range(num_boxes):
|
|
|
|
|
+ x_center = float(output[0, i])
|
|
|
|
|
+ y_center = float(output[1, i])
|
|
|
|
|
+ width = float(output[2, i])
|
|
|
|
|
+ height = float(output[3, i])
|
|
|
|
|
+
|
|
|
|
|
+ class_probs = output[4:, i]
|
|
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
|
|
+ confidence = 1.0 / (1.0 + np.exp(-float(class_probs[best_class])))
|
|
|
|
|
+
|
|
|
|
|
+ if confidence < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ x1 = int(((x_center - width / 2) - pad_w) / scale)
|
|
|
|
|
+ y1 = int(((y_center - height / 2) - pad_h) / scale)
|
|
|
|
|
+ x2 = int(((x_center + width / 2) - pad_w) / scale)
|
|
|
|
|
+ y2 = int(((y_center + height / 2) - pad_h) / scale)
|
|
|
|
|
+
|
|
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
|
|
+
|
|
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ cls_name = class_map.get(best_class, str(best_class))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ candidates.append(DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=confidence,
|
|
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
|
|
+ center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
|
|
+ ))
|
|
|
|
|
+
|
|
|
|
|
+ results = nms(candidates, iou_threshold=0.45)
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"RKNN/ONNX 检测错误: {e}")
|
|
|
|
|
+ import traceback
|
|
|
|
|
+ logger.error(traceback.format_exc())
|
|
|
|
|
+
|
|
|
|
|
+ return results
|
|
|
|
|
+
|
|
|
|
|
+ def _detect_rknn_tiled(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """分区域检测 - 将宽幅全景图分成多个重叠区域分别检测,提高远处目标识别率"""
|
|
|
|
|
+ results = []
|
|
|
|
|
+ h0, w0 = frame.shape[:2]
|
|
|
|
|
+ need_nms = not self.is_end2end
|
|
|
|
|
+ conf_threshold = self.config['confidence_threshold']
|
|
|
|
|
+ class_map = self.config.get('class_map', {0: 'person'})
|
|
|
|
|
+
|
|
|
|
|
+ # 分3个重叠区域
|
|
|
|
|
+ overlap = int(h0 * 0.2)
|
|
|
|
|
+ regions = [
|
|
|
|
|
+ (0, w0 // 3 + overlap),
|
|
|
|
|
+ (w0 // 3 - overlap // 2, 2 * w0 // 3 + overlap // 2),
|
|
|
|
|
+ (2 * w0 // 3 - overlap, w0),
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ seen_centers = []
|
|
|
|
|
+
|
|
|
|
|
+ for x_start, x_end in regions:
|
|
|
|
|
+ crop = frame[:, x_start:x_end]
|
|
|
|
|
+ ch, cw = crop.shape[:2]
|
|
|
|
|
+
|
|
|
|
|
+ # end2end 模型:letterbox + NCHW
|
|
|
|
|
+ if self.is_end2end:
|
|
|
|
|
+ canvas, scale, pad_w, pad_h, ch, cw = self._letterbox(crop)
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
|
|
+ outputs = self.rknn.inference(inputs=[img[None, ...]])
|
|
|
|
|
+ else:
|
|
|
|
|
+ outputs = self.session.run(None, {self.input_name: img.transpose(2, 0, 1)[None, ...]})
|
|
|
|
|
+ output = outputs[0]
|
|
|
|
|
+ if len(output.shape) == 3:
|
|
|
|
|
+ output = output[0]
|
|
|
|
|
+ dets = []
|
|
|
|
|
+ for i in range(output.shape[0]):
|
|
|
|
|
+ x1_lb, y1_lb, x2_lb, y2_lb, conf, cls_id = output[i]
|
|
|
|
|
+ if conf < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+ # 从 letterbox 空间映射回 crop 坐标
|
|
|
|
|
+ _x1 = int((x1_lb - pad_w) / scale)
|
|
|
|
|
+ _y1 = int((y1_lb - pad_h) / scale)
|
|
|
|
|
+ _x2 = int((x2_lb - pad_w) / scale)
|
|
|
|
|
+ _y2 = int((y2_lb - pad_h) / scale)
|
|
|
|
|
+ dets.append([_x1, _y1, _x2, _y2, float(conf), int(cls_id)])
|
|
|
|
|
+ else:
|
|
|
|
|
+ canvas, scale, pad_w, pad_h, ch, cw = self._letterbox(crop)
|
|
|
|
|
+
|
|
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ outputs = self.rknn.inference(inputs=[img[None, ...]])
|
|
|
|
|
+ else:
|
|
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
|
|
+ img = img.transpose(2, 0, 1)
|
|
|
|
|
+ outputs = self.session.run(None, {self.input_name: img[None, ...]})
|
|
|
|
|
+
|
|
|
|
|
+ if len(outputs) == 9:
|
|
|
|
|
+ dets = self._decode_yolo11_outputs(
|
|
|
|
|
+ outputs, (640, 640), scale, pad_w, pad_h, ch, cw,
|
|
|
|
|
+ conf_threshold=conf_threshold
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ output = outputs[0]
|
|
|
|
|
+ if len(output.shape) == 3:
|
|
|
|
|
+ output = output[0]
|
|
|
|
|
+ dets = []
|
|
|
|
|
+ # YOLO11 单输出格式 (9, 8400): bbox(4) + obj(1) + cls(4)
|
|
|
|
|
+ if output.shape[0] == 9:
|
|
|
|
|
+ for i in range(output.shape[1]):
|
|
|
|
|
+ obj = float(output[4, i])
|
|
|
|
|
+ cls_logits = output[5:9, i]
|
|
|
|
|
+ best_class = int(np.argmax(cls_logits))
|
|
|
|
|
+ confidence = 1.0 / (1.0 + np.exp(-float(cls_logits[best_class])))
|
|
|
|
|
+ if confidence < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ x_center = float(output[0, i])
|
|
|
|
|
+ y_center = float(output[1, i])
|
|
|
|
|
+ w = float(output[2, i])
|
|
|
|
|
+ h = float(output[3, i])
|
|
|
|
|
+ x1 = x_center - w / 2
|
|
|
|
|
+ y1 = y_center - h / 2
|
|
|
|
|
+ x2 = x_center + w / 2
|
|
|
|
|
+ y2 = y_center + h / 2
|
|
|
|
|
+ dets.append([x1, y1, x2, y2, confidence, best_class])
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 标准 (84, 8400) 格式
|
|
|
|
|
+ for i in range(output.shape[1]):
|
|
|
|
|
+ class_probs = output[4:, i]
|
|
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
|
|
+ confidence = 1.0 / (1.0 + np.exp(-float(class_probs[best_class])))
|
|
|
|
|
+ if confidence < conf_threshold:
|
|
|
|
|
+ continue
|
|
|
|
|
+ x1 = output[0, i] - output[2, i] / 2
|
|
|
|
|
+ y1 = output[1, i] - output[3, i] / 2
|
|
|
|
|
+ x2 = output[0, i] + output[2, i] / 2
|
|
|
|
|
+ y2 = output[1, i] + output[3, i] / 2
|
|
|
|
|
+ dets.append([x1, y1, x2, y2, confidence, best_class])
|
|
|
|
|
+
|
|
|
|
|
+ # 从 canvas 坐标映射回 crop 坐标
|
|
|
|
|
+ dets = np.array(dets) if dets else np.zeros((0, 6))
|
|
|
|
|
+ dets[:, [0, 2]] = (dets[:, [0, 2]] - pad_w) / scale
|
|
|
|
|
+ dets[:, [1, 3]] = (dets[:, [1, 3]] - pad_h) / scale
|
|
|
|
|
+ dets = dets.tolist()
|
|
|
|
|
+
|
|
|
|
|
+ for x1, y1, x2, y2, confidence, best_class in dets:
|
|
|
|
|
+ cls_name = class_map.get(int(best_class), str(int(best_class)))
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 转换到原图坐标(加上区域偏移)
|
|
|
|
|
+ x1 = int(x1) + x_start
|
|
|
|
|
+ y1 = int(y1)
|
|
|
|
|
+ x2 = int(x2) + x_start
|
|
|
|
|
+ y2 = int(y2)
|
|
|
|
|
+
|
|
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
|
|
+
|
|
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 去重:同一目标可能被相邻区域重复检测
|
|
|
|
|
+ cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
|
|
|
|
|
+ if any(abs(cx - sx) < 50 and abs(cy - sy) < 50 for sx, sy in seen_centers):
|
|
|
|
|
+ continue
|
|
|
|
|
+ seen_centers.append((cx, cy))
|
|
|
|
|
+
|
|
|
|
|
+ obj = DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=float(confidence),
|
|
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
|
|
+ center=(cx, cy)
|
|
|
|
|
+ )
|
|
|
|
|
+ results.append(obj)
|
|
|
|
|
+
|
|
|
|
|
+ if need_nms and results:
|
|
|
|
|
+ results = nms(results, iou_threshold=0.45)
|
|
|
|
|
+
|
|
|
|
|
+ return results
|
|
|
|
|
+
|
|
|
|
|
+ def detect(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """检测物体返回所有类别结果"""
|
|
|
|
|
+ if frame is None:
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ if hasattr(self, 'rknn') and self.rknn is not None:
|
|
|
|
|
+ results = self._detect_rknn(frame)
|
|
|
|
|
+ if results:
|
|
|
|
|
+ self._log_detections("RKNN", results, frame)
|
|
|
|
|
+ self._save_detection_image(frame, results)
|
|
|
|
|
+ return results
|
|
|
|
|
+ elif hasattr(self, 'session') and self.session is not None:
|
|
|
|
|
+ results = self._detect_rknn(frame)
|
|
|
|
|
+ if results:
|
|
|
|
|
+ self._log_detections("ONNX", results, frame)
|
|
|
|
|
+ self._save_detection_image(frame, results)
|
|
|
|
|
+ return results
|
|
|
|
|
+ elif self.model is not None:
|
|
|
|
|
+ results = self._detect_yolo(frame)
|
|
|
|
|
+ if results:
|
|
|
|
|
+ self._log_detections("YOLO", results, frame)
|
|
|
|
|
+ self._save_detection_image(frame, results)
|
|
|
|
|
+ return results
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.error("[YOLO] 没有可用的检测模型")
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ def _log_detections(self, model_type: str, results: List[DetectedObject], frame: np.ndarray):
|
|
|
|
|
+ if not results:
|
|
|
|
|
+ return
|
|
|
|
|
+ class_counts = {}
|
|
|
|
|
+ for r in results:
|
|
|
|
|
+ class_counts[r.class_name] = class_counts.get(r.class_name, 0) + 1
|
|
|
|
|
+ h, w = frame.shape[:2]
|
|
|
|
|
+ logger.info(f"[YOLO] {model_type}: {len(results)}个目标 {class_counts} (帧尺寸={w}x{h})")
|
|
|
|
|
+
|
|
|
|
|
+ def _detect_yolo(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """使用 YOLO 模型检测"""
|
|
|
|
|
+ results = []
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ detections = self.model(
|
|
|
|
|
+ frame,
|
|
|
|
|
+ device=self.device,
|
|
|
|
|
+ verbose=False,
|
|
|
|
|
+ conf=self.config['confidence_threshold']
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ for det in detections:
|
|
|
|
|
+ boxes = det.boxes
|
|
|
|
|
+ if boxes is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(len(boxes)):
|
|
|
|
|
+ cls_id = int(boxes.cls[i])
|
|
|
|
|
+ cls_name = det.names[cls_id]
|
|
|
|
|
+
|
|
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ conf = float(boxes.conf[i])
|
|
|
|
|
+
|
|
|
|
|
+ xyxy = boxes.xyxy[i].cpu().numpy()
|
|
|
|
|
+ x1, y1, x2, y2 = map(int, xyxy)
|
|
|
|
|
+ width = x2 - x1
|
|
|
|
|
+ height = y2 - y1
|
|
|
|
|
+
|
|
|
|
|
+ if width < 10 or height < 10:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ center_x = x1 + width // 2
|
|
|
|
|
+ center_y = y1 + height // 2
|
|
|
|
|
+
|
|
|
|
|
+ obj = DetectedObject(
|
|
|
|
|
+ class_name=cls_name,
|
|
|
|
|
+ confidence=conf,
|
|
|
|
|
+ bbox=(x1, y1, width, height),
|
|
|
|
|
+ center=(center_x, center_y)
|
|
|
|
|
+ )
|
|
|
|
|
+ results.append(obj)
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"YOLO11检测错误: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ return results
|
|
|
|
|
+
|
|
|
|
|
+ def detect_with_keypoints(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 使用YOLO11-pose检测人体并返回关键点
|
|
|
|
|
+ Args:
|
|
|
|
|
+ frame: 输入图像
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 带关键点的检测结果列表
|
|
|
|
|
+ """
|
|
|
|
|
+ return self.detect(frame)
|
|
|
|
|
+
|
|
|
|
|
+ def detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
|
|
+ """检测人体(支持中英文类别名)"""
|
|
|
|
|
+ all_detections = self.detect(frame)
|
|
|
|
|
+ person_classes = {'person', '人'}
|
|
|
|
|
+ return [obj for obj in all_detections if obj.class_name in person_classes]
|
|
|
|
|
+
|
|
|
|
|
+ def release(self):
|
|
|
|
|
+ """释放模型资源"""
|
|
|
|
|
+ if hasattr(self, 'rknn') and self.rknn:
|
|
|
|
|
+ self.rknn.release()
|
|
|
|
|
+ self.rknn = None
|
|
|
|
|
+ self.model = None
|
|
|
|
|
+ self.session = None
|