| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- """
- 球机端人体检测与自动对焦模块
- 在球机移动到位后,检测人体并自动调整焦距,使人体居中并占据合适比例
- """
- import time
- import cv2
- import numpy as np
- import logging
- from typing import Optional, List, Tuple, Dict
- from dataclasses import dataclass
- from pathlib import Path
- from datetime import datetime
- logger = logging.getLogger(__name__)
- try:
- from ultralytics import YOLO
- HAS_YOLO = True
- except ImportError:
- HAS_YOLO = False
- try:
- from rknnlite.api import RKNNLite
- HAS_RKNN = True
- except ImportError:
- HAS_RKNN = False
@dataclass
class DetectedPerson:
    """One detected person, described by its bounding box and score."""

    # Box corners as (x1, y1, x2, y2).
    bbox: Tuple[int, int, int, int]
    # Box centre point as (x, y).
    center: Tuple[float, float]
    # Box width.
    width: int
    # Box height.
    height: int
    # Detection score reported by the model.
    confidence: float

    @property
    def area(self) -> int:
        """Return the box area, i.e. ``width * height``."""
        return self.width * self.height

    @property
    def size_ratio(self) -> float:
        """Return the box aspect ratio (``width / height``).

        A box whose height is not positive yields ``0.0`` rather than
        dividing by zero, so the result is always a float.
        """
        if self.height <= 0:
            return 0.0
        return self.width / self.height
@dataclass
class ZoomAdjustResult:
    """Outcome of one pan/tilt/zoom adjustment computation."""

    # True when an adjustment could be computed.
    success: bool
    # Zoom level to apply next.
    new_zoom: int
    # Pan correction, in degrees.
    pan_adjust: float
    # Tilt correction, in degrees.
    tilt_adjust: float
    # True when a person was found in the frame.
    person_detected: bool
    # True when the person is already centred in the frame.
    person_centered: bool
    # Human-readable summary of the outcome.
    message: str
class PTZPersonDetector:
    """Person detector for the PTZ (dome) camera.

    Two model back ends are supported: Ultralytics YOLO weights and Rockchip
    RKNN models. A ``.onnx`` path is recognised by the automatic type
    detection but there is no loader for it, so the detector stays without a
    model in that case. When no model is loaded, ``detect`` returns an empty
    list.
    """

    # Edge length of the square image fed to the RKNN model; box coordinates
    # in the model output are scaled back from this size.
    _RKNN_INPUT_SIZE = 640
    # Overlap (IoU) above which two RKNN candidates count as the same person.
    _NMS_IOU_THRESHOLD = 0.45

    def __init__(self, model_path: Optional[str] = None, model_type: str = 'auto',
                 confidence_threshold: float = 0.5, use_gpu: bool = False,
                 save_image: bool = True,
                 image_dir: str = '/home/admin/dsh/ptz_detection_images'):
        """Create the detector and, when a path is given, load the model.

        Args:
            model_path: Path to the model file; ``None`` creates a detector
                without a model.
            model_type: ``'yolo'``, ``'rknn'`` or ``'auto'`` (pick by file
                extension).
            confidence_threshold: Minimum score for a detection to be kept.
            use_gpu: Stored on the instance; not consulted by the code in
                this class.
            save_image: Whether annotated detection images are written.
            image_dir: Directory the annotated images are written to.
        """
        self.model_path = model_path
        self.model_type = model_type
        self.confidence_threshold = confidence_threshold
        self.use_gpu = use_gpu
        self.model = None
        # Class index of "person" for YOLO models; the RKNN loader overrides it.
        self.person_class_id = 0

        # Image-saving configuration.
        self._save_image_enabled = save_image
        self._image_save_dir = Path(image_dir)
        self._last_save_time = 0
        # Minimum number of seconds between two saved images.
        self._save_interval = 0.5

        if self._save_image_enabled:
            self._ensure_save_dir()

        if model_path:
            self._load_model(model_path, model_type)

    def _ensure_save_dir(self):
        """Create the image directory; disable saving if that fails."""
        try:
            self._image_save_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"[球机] 检测图片保存目录: {self._image_save_dir}")
        except Exception as e:
            logger.error(f"[球机] 创建检测图片目录失败: {e}")
            self._save_image_enabled = False

    def _load_model(self, model_path: str, model_type: str = 'auto'):
        """Resolve the model type and dispatch to the matching loader.

        ``model_path`` may be a ``str`` or a path-like object. With
        ``model_type='auto'`` the type is chosen from the file extension,
        compared case-insensitively. Unsupported types only print a message.
        """
        # Normalise so pathlib.Path arguments work with the suffix checks.
        model_path = str(model_path)

        if model_type == 'auto':
            lowered = model_path.lower()
            if lowered.endswith('.rknn'):
                model_type = 'rknn'
            elif lowered.endswith('.onnx'):
                model_type = 'onnx'
            else:
                model_type = 'yolo'

        self.model_type = model_type

        if model_type == 'rknn':
            self._load_rknn_model(model_path)
        elif model_type == 'yolo':
            self._load_yolo_model(model_path)
        else:
            print(f"[PTZPersonDetector] 不支持的模型类型: {model_type}")

    def _load_yolo_model(self, model_path: str):
        """Load Ultralytics YOLO weights; on failure the model stays unset."""
        if not HAS_YOLO:
            print("[PTZPersonDetector] ultralytics未安装,无法使用YOLO模型")
            return

        try:
            self.model = YOLO(model_path)
            print(f"[PTZPersonDetector] YOLO模型加载成功: {model_path}")
        except Exception as e:
            print(f"[PTZPersonDetector] YOLO模型加载失败: {e}")

    def _load_rknn_model(self, model_path: str):
        """Load an RKNN model and initialise its runtime.

        On any failure the partially initialised runtime is released and
        ``self.model`` is left as ``None``.
        """
        if not HAS_RKNN:
            print("[PTZPersonDetector] rknnlite未安装,无法使用RKNN模型")
            return

        rknn = None
        try:
            rknn = RKNNLite()
            ret = rknn.load_rknn(model_path)
            if ret != 0:
                raise RuntimeError(f"加载RKNN模型失败: ret={ret}")

            # target=None: per the original author's note, lets the runtime
            # choose the NPU core automatically.
            ret = rknn.init_runtime(target=None)
            if ret != 0:
                raise RuntimeError(f"初始化RKNN运行时失败: ret={ret}")

            self.model = rknn
            # Class index of "person" in the RKNN model used by this project.
            self.person_class_id = 3
            print(f"[PTZPersonDetector] RKNN模型加载成功: {model_path}")
        except Exception as e:
            print(f"[PTZPersonDetector] RKNN模型加载失败: {e}")
            self.model = None
            if rknn is not None:
                # Do not leak the runtime object created above.
                self._release_rknn(rknn)

    @staticmethod
    def _release_rknn(rknn) -> None:
        """Release an RKNN runtime object, logging instead of raising."""
        try:
            rknn.release()
        except Exception as e:
            logger.warning(f"[球机] 释放RKNN运行时失败: {e}")

    def release(self) -> None:
        """Release the RKNN runtime if one is loaded; safe to call repeatedly."""
        if self.model is not None and self.model_type == 'rknn':
            self._release_rknn(self.model)
            self.model = None

    def detect(self, frame: np.ndarray) -> List["DetectedPerson"]:
        """Detect people in a BGR frame.

        Args:
            frame: Image to analyse.

        Returns:
            The detections that pass the confidence threshold; an empty list
            when no model is loaded, the frame is ``None`` or empty, or the
            model type has no detection routine.
        """
        if self.model is None:
            return []
        if frame is None or getattr(frame, 'size', 0) == 0:
            return []

        if self.model_type == 'rknn':
            return self._detect_rknn(frame)
        if self.model_type == 'yolo':
            return self._detect_yolo(frame)
        return []

    def _detect_yolo(self, frame: np.ndarray) -> List["DetectedPerson"]:
        """Run the YOLO model and keep person boxes above the threshold.

        Errors raised during inference are printed; whatever was collected
        before the error is returned.
        """
        persons: List["DetectedPerson"] = []

        try:
            results = self.model(frame, verbose=False)

            for r in results:
                if r.boxes is None:
                    continue

                for box in r.boxes:
                    if int(box.cls[0]) != self.person_class_id:
                        continue

                    conf = float(box.conf[0])
                    if conf < self.confidence_threshold:
                        continue

                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    persons.append(DetectedPerson(
                        bbox=(x1, y1, x2, y2),
                        center=((x1 + x2) / 2, (y1 + y2) / 2),
                        width=x2 - x1,
                        height=y2 - y1,
                        confidence=conf,
                    ))
        except Exception as e:
            print(f"[PTZPersonDetector] YOLO检测错误: {e}")

        return persons

    def _detect_rknn(self, frame: np.ndarray) -> List["DetectedPerson"]:
        """Run the RKNN model on a frame and decode its first output.

        Errors raised during preprocessing, inference or decoding are printed
        and an empty list is returned.
        """
        persons: List["DetectedPerson"] = []

        try:
            # Preprocessing: resize to the model input and scale to [0, 1].
            # NOTE(review): the frame is passed in BGR channel order and as
            # float32; confirm this matches how the model was converted.
            size = self._RKNN_INPUT_SIZE
            img = cv2.resize(frame, (size, size))
            img = img.astype(np.float32) / 255.0
            img = np.expand_dims(img, 0)

            outputs = self.model.inference(inputs=[img])
            if outputs is None or len(outputs) == 0:
                return []

            frame_h, frame_w = frame.shape[:2]
            persons = self._decode_rknn_output(outputs[0], frame_w, frame_h)
        except Exception as e:
            print(f"[PTZPersonDetector] RKNN检测错误: {e}")

        return persons

    def _decode_rknn_output(self, output, frame_w: int,
                            frame_h: int) -> List["DetectedPerson"]:
        """Turn a raw detection tensor into person boxes in frame coordinates.

        The tensor is read as ``[1, 4 + num_classes, num_candidates]`` with
        each candidate column holding ``cx, cy, w, h`` followed by the class
        scores (the layout the original code indexed; assumed, not verified
        against the model). Candidates are kept when their best class is the
        person class and its score reaches the confidence threshold.
        Overlapping candidates are then merged by non-maximum suppression,
        boxes are clipped to the frame, and empty boxes are dropped.
        """
        preds = np.asarray(output, dtype=np.float64)[0]
        class_scores = preds[4:, :]
        class_ids = np.argmax(class_scores, axis=0)
        confidences = np.max(class_scores, axis=0)

        candidate_idx = np.flatnonzero(
            (confidences >= self.confidence_threshold)
            & (class_ids == self.person_class_id)
        )
        if candidate_idx.size == 0:
            return []

        cx, cy, bw, bh = preds[:4, candidate_idx]
        scale_x = frame_w / self._RKNN_INPUT_SIZE
        scale_y = frame_h / self._RKNN_INPUT_SIZE
        boxes = np.stack(
            [
                (cx - bw / 2) * scale_x,
                (cy - bh / 2) * scale_y,
                (cx + bw / 2) * scale_x,
                (cy + bh / 2) * scale_y,
            ],
            axis=1,
        )
        # Keep boxes inside the frame so centres and sizes refer to the
        # visible part of the person.
        boxes[:, [0, 2]] = np.clip(boxes[:, [0, 2]], 0, frame_w)
        boxes[:, [1, 3]] = np.clip(boxes[:, [1, 3]], 0, frame_h)
        scores = confidences[candidate_idx]

        persons: List["DetectedPerson"] = []
        # sorted(): report survivors in their original candidate order.
        for k in sorted(self._nms(boxes, scores, self._NMS_IOU_THRESHOLD)):
            x1, y1, x2, y2 = (int(v) for v in boxes[k])
            width, height = x2 - x1, y2 - y1
            if width <= 0 or height <= 0:
                continue
            persons.append(DetectedPerson(
                bbox=(x1, y1, x2, y2),
                center=((x1 + x2) / 2, (y1 + y2) / 2),
                width=width,
                height=height,
                confidence=float(scores[k]),
            ))
        return persons

    @staticmethod
    def _nms(boxes: np.ndarray, scores: np.ndarray,
             iou_threshold: float) -> List[int]:
        """Greedy non-maximum suppression.

        Args:
            boxes: Array of shape ``(n, 4)`` holding ``x1, y1, x2, y2`` rows.
            scores: Array of shape ``(n,)`` with one score per box.
            iou_threshold: Boxes overlapping an already kept box by more than
                this IoU are discarded.

        Returns:
            Indices of the kept boxes, highest score first.
        """
        boxes = np.asarray(boxes, dtype=np.float64)
        scores = np.asarray(scores, dtype=np.float64)
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
        areas = np.maximum(x2 - x1, 0) * np.maximum(y2 - y1, 0)
        order = np.argsort(-scores, kind="stable")

        kept: List[int] = []
        while order.size > 0:
            best = order[0]
            kept.append(int(best))
            rest = order[1:]
            if rest.size == 0:
                break
            inter_w = np.maximum(
                np.minimum(x2[best], x2[rest]) - np.maximum(x1[best], x1[rest]), 0
            )
            inter_h = np.maximum(
                np.minimum(y2[best], y2[rest]) - np.maximum(y1[best], y1[rest]), 0
            )
            inter = inter_w * inter_h
            union = areas[best] + areas[rest] - inter
            # Guard the division for zero-area pairs.
            iou = inter / np.maximum(union, 1e-9)
            order = rest[iou <= iou_threshold]
        return kept

    def detect_largest_person(self, frame: np.ndarray) -> Optional["DetectedPerson"]:
        """Return the detection with the largest area, or ``None``.

        When at least one person is found, an annotated image is also saved
        (subject to the saving settings).
        """
        persons = self.detect(frame)
        if not persons:
            return None
        self._save_detection_image(frame, persons)
        return max(persons, key=lambda p: p.area)

    def _save_detection_image(self, frame: np.ndarray,
                              persons: List["DetectedPerson"]):
        """Write a copy of the frame with the detections drawn on it.

        Only detections at or above the confidence threshold are drawn.
        Nothing is written when saving is disabled, when the minimum save
        interval has not elapsed, or when no detection qualifies. Failures
        are logged, never raised.

        Args:
            frame: Original image; it is copied, not modified.
            persons: Detections to draw.
        """
        if not self._save_image_enabled or not persons:
            return

        # Throttle: at most one image per save interval.
        current_time = time.time()
        if current_time - self._last_save_time < self._save_interval:
            return

        try:
            marked_frame = frame.copy()
            person_count = 0

            for person in persons:
                if person.confidence < self.confidence_threshold:
                    continue

                x1, y1, x2, y2 = person.bbox

                # Bounding box (green).
                cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                label = f"person_{person_count}"
                person_count += 1

                (label_w, label_h), _baseline = cv2.getTextSize(
                    label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
                )
                # Put the label above the box; if that would leave the frame,
                # put it just inside the top edge of the box instead.
                if y1 - label_h - 8 >= 0:
                    text_y = y1 - 4
                else:
                    text_y = y1 + label_h + 4

                # Filled label background (green).
                cv2.rectangle(
                    marked_frame,
                    (x1, text_y - label_h - 4),
                    (x1 + label_w, text_y + 4),
                    (0, 255, 0),
                    -1,
                )
                # Label text (black).
                cv2.putText(
                    marked_frame, label,
                    (x1, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                    (0, 0, 0), 2,
                )

            # Nothing qualified: do not write an image.
            if person_count == 0:
                return

            # File name: timestamp plus number of drawn persons.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"ptz_{timestamp}_n{person_count}.jpg"
            filepath = self._image_save_dir / filename

            # cv2.imwrite reports failure through its return value rather
            # than by raising, so it has to be checked explicitly.
            written = cv2.imwrite(
                str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90]
            )
            if not written:
                logger.error(f"[球机] 保存检测图片失败: cv2.imwrite 返回 False ({filepath})")
                return

            self._last_save_time = current_time
            logger.info(f"[球机] 已保存检测图片: {filepath},有效人数 {person_count} (阈值={self.confidence_threshold})")

        except Exception as e:
            logger.error(f"[球机] 保存检测图片失败: {e}")
class PTZAutoZoomController:
    """Automatic pan/tilt/zoom controller for the PTZ camera.

    Uses the position and size of the largest detected person to work out
    how the camera angle and zoom should change so that the person is
    centred and fills a target share of the frame.
    """

    def __init__(self, ptz_camera, detector: "Optional[PTZPersonDetector]" = None,
                 config: Optional[dict] = None):
        """Store the collaborators and read tuning values from ``config``.

        Args:
            ptz_camera: Camera object; this class calls its
                ``get_current_position()`` and ``goto_exact_position()``.
            detector: Person detector; without one no person is ever found.
            config: Optional overrides for the defaults assigned below.
        """
        self.ptz = ptz_camera
        self.detector = detector
        self.config = config or {}

        # Share of the frame (larger of width/height share) the person
        # should occupy.
        self.target_size_ratio = self.config.get('target_size_ratio', 0.4)
        # Bounds applied to every computed zoom value.
        self.min_zoom = self.config.get('min_zoom', 3)
        self.max_zoom = self.config.get('max_zoom', 20)
        # Read from the config but not consulted by the code in this class.
        self.zoom_step = self.config.get('zoom_step', 2)
        # Offsets (as a fraction of the frame) below which the person counts
        # as centred.
        self.center_threshold = self.config.get('center_threshold', 0.1)
        # Default number of rounds for auto_focus_loop.
        self.max_adjust_attempts = self.config.get('max_adjust_attempts', 3)

    @staticmethod
    def _compute_angle_adjust(offset_x: float, offset_y: float,
                              current_zoom: float) -> Tuple[float, float]:
        """Convert normalised offsets into (pan, tilt) corrections in degrees.

        The field of view is approximated as 60/zoom degrees horizontally and
        45/zoom degrees vertically. A zoom below 1 is treated as 1 so that an
        unknown or zero zoom cannot cause a division by zero.
        NOTE(review): the negative sign assumes the camera's pan/tilt axes
        run opposite to image x/y — confirm against the camera in use.
        """
        zoom = max(current_zoom, 1)
        pan_adjust = -offset_x * 60.0 / zoom
        tilt_adjust = -offset_y * 45.0 / zoom
        return pan_adjust, tilt_adjust

    def _compute_target_zoom(self, person_size_ratio: float, current_zoom: int) -> int:
        """Return the zoom that brings the person to the target frame share.

        The zoom is left unchanged while the share lies within ±20% of the
        target, or when the share is not positive (degenerate box). Otherwise
        it is scaled by ``target / share``, truncated to an int and clamped
        to ``[min_zoom, max_zoom]``.
        """
        if person_size_ratio <= 0:
            return current_zoom

        lower = self.target_size_ratio * 0.8
        upper = self.target_size_ratio * 1.2
        if lower <= person_size_ratio <= upper:
            return current_zoom

        zoom_factor = self.target_size_ratio / person_size_ratio
        desired = int(current_zoom * zoom_factor)
        return max(self.min_zoom, min(desired, self.max_zoom))

    def adjust_to_person(self, frame: np.ndarray, current_pan: float,
                         current_tilt: float, current_zoom: int) -> "ZoomAdjustResult":
        """Compute the correction that centres and sizes the detected person.

        Nothing is sent to the camera here; the caller applies the result.

        Args:
            frame: Current PTZ camera image.
            current_pan: Current pan angle (not used in the computation).
            current_tilt: Current tilt angle (not used in the computation).
            current_zoom: Current zoom level.

        Returns:
            A ``ZoomAdjustResult``; ``success`` is False when there is no
            usable frame or no person was detected.
        """
        if frame is None or getattr(frame, 'size', 1) == 0:
            return ZoomAdjustResult(
                success=False, new_zoom=current_zoom,
                pan_adjust=0, tilt_adjust=0,
                person_detected=False, person_centered=False,
                message="无法获取球机画面"
            )

        person = self.detector.detect_largest_person(frame) if self.detector else None
        if person is None:
            return ZoomAdjustResult(
                success=False, new_zoom=current_zoom,
                pan_adjust=0, tilt_adjust=0,
                person_detected=False, person_centered=False,
                message="球机画面中未检测到人体"
            )

        h, w = frame.shape[:2]

        # Offset of the person from the frame centre, as a fraction of the
        # frame size (roughly -0.5 .. 0.5).
        offset_x = (person.center[0] - w / 2) / w
        offset_y = (person.center[1] - h / 2) / h

        # Share of the frame taken by the person (larger of the two axes).
        person_size_ratio = max(person.width / w, person.height / h)

        # Log the frame share that drives the zoom decision (the box aspect
        # ratio was printed here before, under the same label).
        print(f"[AutoZoom] 检测到人体: 中心=({person.center[0]:.0f}, {person.center[1]:.0f}), "
              f"尺寸={person.width}x{person.height}, 占比={person_size_ratio:.2f}")
        print(f"[AutoZoom] 偏移: x={offset_x:.3f}, y={offset_y:.3f}")

        is_centered = (abs(offset_x) < self.center_threshold
                       and abs(offset_y) < self.center_threshold)

        if is_centered:
            # Already centred: no angle change needed.
            pan_adjust, tilt_adjust = 0, 0
        else:
            pan_adjust, tilt_adjust = self._compute_angle_adjust(
                offset_x, offset_y, current_zoom
            )

        new_zoom = self._compute_target_zoom(person_size_ratio, current_zoom)

        return ZoomAdjustResult(
            success=True,
            new_zoom=new_zoom,
            pan_adjust=pan_adjust,
            tilt_adjust=tilt_adjust,
            person_detected=True,
            person_centered=is_centered,
            message=f"人体居中={is_centered}, zoom调整={current_zoom}→{new_zoom}"
        )

    def auto_focus_loop(self, get_frame_func, max_attempts: int = None) -> Tuple[bool, int]:
        """Repeatedly adjust the camera until the person is centred and sized.

        Each round waits briefly, grabs a frame, computes a correction and
        sends it to the camera.

        Args:
            get_frame_func: Callable returning the current PTZ camera frame.
            max_attempts: Number of rounds; a falsy value selects
                ``max_adjust_attempts``.

        Returns:
            ``(ok, zoom)``. ``ok`` is False when no person was detected
            (including when no frame was available). It is True when the
            person is centred at the right size, and also when the round
            limit is reached — in that case the last correction has been
            sent but not re-checked.
        """
        max_attempts = max_attempts or self.max_adjust_attempts

        position = self.ptz.get_current_position()
        current_pan = position.pan
        current_tilt = position.tilt
        current_zoom = position.zoom

        for attempt in range(max_attempts):
            print(f"[AutoZoom] 调整轮次 {attempt + 1}/{max_attempts}")

            # Give the picture time to settle after the previous move.
            time.sleep(0.3)

            frame = get_frame_func()
            result = self.adjust_to_person(frame, current_pan, current_tilt, current_zoom)

            if not result.person_detected:
                print("[AutoZoom] 未检测到人体,停止调整")
                return False, current_zoom

            if result.person_centered and result.new_zoom == current_zoom:
                print("[AutoZoom] 人体已居中且大小合适,调整完成")
                return True, current_zoom

            if result.pan_adjust != 0 or result.tilt_adjust != 0:
                # Angle change needed; the new zoom is applied in the same move.
                new_pan = current_pan + result.pan_adjust
                new_tilt = current_tilt + result.tilt_adjust
                print(f"[AutoZoom] 调整角度: pan {current_pan:.1f}→{new_pan:.1f}, "
                      f"tilt {current_tilt:.1f}→{new_tilt:.1f}")
                self.ptz.goto_exact_position(new_pan, new_tilt, result.new_zoom)
                current_pan = new_pan
                current_tilt = new_tilt
            elif result.new_zoom != current_zoom:
                print(f"[AutoZoom] 调整变焦: {current_zoom}→{result.new_zoom}")
                self.ptz.goto_exact_position(current_pan, current_tilt, result.new_zoom)

            current_zoom = result.new_zoom

        print(f"[AutoZoom] 达到最大调整次数,当前zoom={current_zoom}")
        return True, current_zoom
|