""" 简化版联动控制器 核心功能:全景检测到人 → 球机逐个定位抓拍 """ import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1' import time import threading import logging from typing import Optional, List, Tuple from dataclasses import dataclass from datetime import datetime from pathlib import Path import cv2 import numpy as np logger = logging.getLogger(__name__) @dataclass class PersonTarget: """检测到的人员目标""" index: int # 序号(用于标记) position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio) bbox: Tuple[int, int, int, int] # 边界框 (x, y, w, h) area: int # 面积 confidence: float # 置信度 class SimpleCoordinator: """ 简化版联动控制器 工作流程: 1. 全景摄像头实时检测人体 2. 检测到人后,按面积从大到小排序 3. 球机逐个定位到每个人,抓拍保存 4. 所有人抓拍完成后,回到初始位置 """ def __init__(self, panorama_camera, ptz_camera, detector, calibrator=None): """ 初始化 Args: panorama_camera: 全景摄像头实例 ptz_camera: 球机实例 detector: 人体检测器 calibrator: 校准器(可选,用于坐标转换) """ self.panorama = panorama_camera self.ptz = ptz_camera self.detector = detector self.calibrator = calibrator # 配置 self.detection_interval = 1.0 # 检测间隔(秒) self.ptz_move_wait = 0.5 # PTZ移动后等待时间(秒) self.min_person_area = 2000 # 最小人体面积(像素²) self.confidence_threshold = 0.7 # 置信度阈值 self.default_zoom = 8 # 默认变倍 self.save_dir = Path('/home/admin/dsh/captures') # 抓拍保存目录 self.initial_position = (90, 0, 1) # 初始位置 (pan, tilt, zoom) # 状态 self.running = False self.capturing = False # 是否正在抓拍 self.worker_thread = None self.lock = threading.Lock() # 统计 self.stats = { 'frames_processed': 0, 'persons_detected': 0, 'captures_taken': 0, } # 创建保存目录 self.save_dir.mkdir(parents=True, exist_ok=True) def start(self) -> bool: """启动联动系统""" if self.running: return True # 连接全景摄像头 if not self.panorama.connect(): logger.error("连接全景摄像头失败") return False # 连接球机 if not self.ptz.connect(): logger.error("连接球机失败") self.panorama.disconnect() return False # 启动全景视频流 if not self.panorama.start_stream_rtsp(): logger.error("启动全景视频流失败") self.panorama.disconnect() self.ptz.disconnect() return False # 启动球机视频流(用于抓拍) if not self.ptz.start_stream_rtsp(): logger.warning("球机视频流启动失败,抓拍可能失败") # 移动球机到初始位置 self.ptz.goto_exact_position(*self.initial_position) # 启动工作线程 self.running = True self.worker_thread = threading.Thread(target=self._worker, daemon=True) self.worker_thread.start() logger.info("简化版联动系统已启动") return True def stop(self): """停止联动系统""" self.running = False if self.worker_thread: self.worker_thread.join(timeout=3) self.panorama.disconnect() self.ptz.disconnect() logger.info(f"联动系统已停止,统计: {self.stats}") def _worker(self): """工作线程""" last_detection_time = 0 while self.running: try: current_time = time.time() # 获取当前帧 frame = self.panorama.get_frame() if frame is None: time.sleep(0.01) continue self.stats['frames_processed'] += 1 frame_size = (frame.shape[1], frame.shape[0]) # 周期性检测 if current_time - last_detection_time >= self.detection_interval: last_detection_time = current_time # 检测人体 persons = self._detect_persons(frame) if persons: logger.info(f"[检测] 检测到 {len(persons)} 人") self.stats['persons_detected'] += len(persons) # 保存全景检测图(标记人员) self._save_panorama_detection(frame, persons) # 逐个抓拍 self._capture_all_persons(persons, frame_size) time.sleep(0.01) except Exception as e: logger.error(f"工作线程错误: {e}") time.sleep(0.1) def _detect_persons(self, frame: np.ndarray) -> List[PersonTarget]: """检测人体""" if self.detector is None: return [] # 使用检测器 detections = self.detector.detect_persons(frame) if not detections: return [] frame_h, frame_w = frame.shape[:2] persons = [] for i, det in enumerate(detections): # 过滤低置信度 if det.confidence < self.confidence_threshold: continue x, y, w, h = det.bbox area = w * h # 过滤小目标 if area < self.min_person_area: continue # 计算位置比例 center_x = x + w // 2 center_y = y + h // 2 x_ratio = center_x / frame_w y_ratio = center_y / frame_h persons.append(PersonTarget( index=i, position=(x_ratio, y_ratio), bbox=det.bbox, area=area, confidence=det.confidence )) # 按面积从大到小排序 persons.sort(key=lambda p: p.area, reverse=True) # 重新分配序号(按排序后的顺序) for i, p in enumerate(persons): p.index = i return persons def _save_panorama_detection(self, frame: np.ndarray, persons: List[PersonTarget]): """保存全景检测图(标记人员)""" marked_frame = frame.copy() for p in persons: x, y, w, h = p.bbox # 绘制边界框(绿色) cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2) # 绘制序号标签 label = f"person_{p.index}" cv2.putText( marked_frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2 ) # 保存 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"panorama_{timestamp}_n{len(persons)}.jpg" filepath = self.save_dir / filename cv2.imwrite(str(filepath), marked_frame) logger.info(f"[全景] 保存检测图: {filepath}") def _capture_all_persons(self, persons: List[PersonTarget], frame_size: Tuple[int, int]): """逐个抓拍所有人员""" with self.lock: if self.capturing: logger.info("正在抓拍中,跳过本次检测") return self.capturing = True try: logger.info(f"[抓拍] 开始逐个抓拍 {len(persons)} 人") # 创建本次抓拍的子目录 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") batch_dir = self.save_dir / f"batch_{timestamp}" batch_dir.mkdir(parents=True, exist_ok=True) for i, person in enumerate(persons): if not self.running: break logger.info(f"[抓拍] 正在抓拍 person_{person.index} ({i+1}/{len(persons)})") # 计算PTZ角度 pan, tilt, zoom = self._calculate_ptz_position( person.position[0], person.position[1] ) # 移动球机 logger.info(f"[PTZ] 移动到: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}") self.ptz.goto_exact_position(pan, tilt, zoom) # 等待画面稳定 time.sleep(self.ptz_move_wait) # 抓拍球机画面 ptz_frame = self._get_stable_frame() if ptz_frame is not None: # 保存抓拍图 filename = f"ptz_person_{person.index}.jpg" filepath = batch_dir / filename cv2.imwrite(str(filepath), ptz_frame) self.stats['captures_taken'] += 1 logger.info(f"[抓拍] 保存球机图: {filepath}") else: logger.warning(f"[抓拍] 获取球机画面失败: person_{person.index}") # 所有人抓拍完成,回到初始位置 logger.info(f"[抓拍] 完成 {len(persons)} 人抓拍,返回初始位置") self.ptz.goto_exact_position(*self.initial_position) finally: self.capturing = False def _calculate_ptz_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]: """计算PTZ位置""" # 如果有校准器,使用校准结果 if self.calibrator and self.calibrator.is_calibrated(): pan, tilt = self.calibrator.transform(x_ratio, y_ratio) return (pan, tilt, self.default_zoom) # 否则使用估算 # 假设全景视野对应 pan: 0-180°, tilt: -45°~45° pan = x_ratio * 180 tilt = -45 + y_ratio * 90 # y_ratio=0.5 对应 tilt=0 return (pan, tilt, self.default_zoom) def _get_stable_frame(self, max_attempts: int = 5) -> Optional[np.ndarray]: """获取稳定清晰的帧""" best_frame = None best_score = -1 for _ in range(max_attempts): frame = self.ptz.get_frame() if frame is not None: # 评估清晰度 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) score = cv2.Laplacian(gray, cv2.CV_64F).var() if score > best_score: best_score = score best_frame = frame.copy() # 清晰度足够高就直接返回 if score > 100: return frame time.sleep(0.1) return best_frame def get_stats(self) -> dict: """获取统计信息""" return self.stats.copy()