|
|
@@ -6,8 +6,13 @@
|
|
|
import time
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
+import logging
|
|
|
from typing import Optional, List, Tuple, Dict
|
|
|
from dataclasses import dataclass
|
|
|
+from pathlib import Path
|
|
|
+from datetime import datetime
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
|
try:
|
|
|
from ultralytics import YOLO
|
|
|
@@ -60,7 +65,8 @@ class PTZPersonDetector:
|
|
|
"""
|
|
|
|
|
|
def __init__(self, model_path: str = None, model_type: str = 'auto',
|
|
|
- confidence_threshold: float = 0.5, use_gpu: bool = False):
|
|
|
+ confidence_threshold: float = 0.5, use_gpu: bool = False,
|
|
|
+ save_image: bool = True, image_dir: str = '/home/admin/dsh/ptz_detection_images'):
|
|
|
"""
|
|
|
初始化检测器
|
|
|
Args:
|
|
|
@@ -68,6 +74,8 @@ class PTZPersonDetector:
|
|
|
model_type: 模型类型 ('yolo', 'rknn', 'auto')
|
|
|
confidence_threshold: 置信度阈值
|
|
|
use_gpu: 是否使用GPU
|
|
|
+ save_image: 是否保存检测图片
|
|
|
+ image_dir: 图片保存目录
|
|
|
"""
|
|
|
self.model_path = model_path
|
|
|
self.model_type = model_type
|
|
|
@@ -76,9 +84,27 @@ class PTZPersonDetector:
|
|
|
self.model = None
|
|
|
self.person_class_id = 0 # YOLO默认person类别ID
|
|
|
|
|
|
+ # 图片保存配置
|
|
|
+ self._save_image_enabled = save_image
|
|
|
+ self._image_save_dir = Path(image_dir)
|
|
|
+ self._last_save_time = 0
|
|
|
+ self._save_interval = 0.5 # 最小保存间隔(秒)
|
|
|
+
|
|
|
+ if self._save_image_enabled:
|
|
|
+ self._ensure_save_dir()
|
|
|
+
|
|
|
if model_path:
|
|
|
self._load_model(model_path, model_type)
|
|
|
|
|
|
+ def _ensure_save_dir(self):
|
|
|
+ """确保保存目录存在"""
|
|
|
+ try:
|
|
|
+ self._image_save_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ logger.info(f"[球机] 检测图片保存目录: {self._image_save_dir}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[球机] 创建检测图片目录失败: {e}")
|
|
|
+ self._save_image_enabled = False
|
|
|
+
|
|
|
def _load_model(self, model_path: str, model_type: str = 'auto'):
|
|
|
"""加载模型"""
|
|
|
if model_type == 'auto':
|
|
|
@@ -248,11 +274,77 @@ class PTZPersonDetector:
|
|
|
return persons
|
|
|
|
|
|
def detect_largest_person(self, frame: np.ndarray) -> Optional[DetectedPerson]:
|
|
|
- """检测最大的人体"""
|
|
|
+ """检测最大的人体并保存图片"""
|
|
|
persons = self.detect(frame)
|
|
|
- if not persons:
|
|
|
- return None
|
|
|
- return max(persons, key=lambda p: p.area)
|
|
|
+ if persons:
|
|
|
+ self._save_detection_image(frame, persons)
|
|
|
+ return max(persons, key=lambda p: p.area)
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _save_detection_image(self, frame: np.ndarray, persons: List[DetectedPerson]):
|
|
|
+ """
|
|
|
+ 保存带有检测标记的图片(标记人体边界框和序号)
|
|
|
+ Args:
|
|
|
+ frame: 原始图像
|
|
|
+ persons: 检测到的人体列表
|
|
|
+ """
|
|
|
+ if not self._save_image_enabled or not persons:
|
|
|
+ return
|
|
|
+
|
|
|
+ # 检查保存间隔
|
|
|
+ current_time = time.time()
|
|
|
+ if current_time - self._last_save_time < self._save_interval:
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 复制图像避免修改原图
|
|
|
+ marked_frame = frame.copy()
|
|
|
+
|
|
|
+ # 绘制检测结果(带序号)
|
|
|
+ for idx, person in enumerate(persons, 1):
|
|
|
+ x1, y1, x2, y2 = person.bbox
|
|
|
+
|
|
|
+ # 绘制边界框(绿色)
|
|
|
+ cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
|
+
|
|
|
+ # 绘制序号标签
|
|
|
+ label = f"#{idx} person"
|
|
|
+ (label_w, label_h), baseline = cv2.getTextSize(
|
|
|
+ label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
|
|
|
+ )
|
|
|
+ cv2.rectangle(
|
|
|
+ marked_frame,
|
|
|
+ (x1, y1 - label_h - 8),
|
|
|
+ (x1 + label_w, y1),
|
|
|
+ (0, 255, 0),
|
|
|
+ -1
|
|
|
+ )
|
|
|
+
|
|
|
+ # 绘制标签文字(黑色)
|
|
|
+ cv2.putText(
|
|
|
+ marked_frame, label,
|
|
|
+ (x1, y1 - 4),
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX, 0.8,
|
|
|
+ (0, 0, 0), 2
|
|
|
+ )
|
|
|
+
|
|
|
+ # 绘制中心点(红色)
|
|
|
+ cv2.circle(marked_frame, (int(person.center[0]), int(person.center[1])), 5, (0, 0, 255), -1)
|
|
|
+
|
|
|
+
|
|
|
+ # 生成文件名(时间戳+人数)
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
|
|
+ filename = f"ptz_{timestamp}_n{len(persons)}.jpg"
|
|
|
+ filepath = self._image_save_dir / filename
|
|
|
+
|
|
|
+ # 保存图片
|
|
|
+ cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
|
|
|
+ self._last_save_time = current_time
|
|
|
+
|
|
|
+ logger.info(f"[球机] 已保存检测图片: {filepath},检测到 {len(persons)} 人")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[球机] 保存检测图片失败: {e}")
|
|
|
|
|
|
|
|
|
class PTZAutoZoomController:
|