""" 摄像头组封装类 封装一组全景+球机摄像头的所有组件 """ import os import time import logging import threading from typing import Optional, Dict, Any, Callable from dataclasses import dataclass import numpy as np from panorama_camera import PanoramaCamera, ObjectDetector, DetectedObject from ptz_camera import PTZCamera from ocr_recognizer import NumberDetector, PersonInfo from coordinator import SequentialCoordinator from calibration import CameraCalibrator, CalibrationManager from paired_image_saver import PairedImageSaver logger = logging.getLogger(__name__) @dataclass class GroupConfig: """摄像头组配置""" group_id: str name: str panorama_config: Dict[str, Any] ptz_config: Dict[str, Any] calibration_file: str paired_image_dir: str class CameraGroup: """ 摄像头组封装类 封装一组全景+球机摄像头的所有组件,包括: - 全景摄像头实例 - 球机实例 - 校准器 - 联动控制器 - 配对图片保存器 """ def __init__(self, group_config: Dict[str, Any], sdk, detector: ObjectDetector, number_detector: Optional[NumberDetector] = None, shared_config: Optional[Dict[str, Any]] = None): """ 初始化摄像头组 Args: group_config: 组配置字典 sdk: 大华SDK实例(共享) detector: 检测器实例(共享) number_detector: 编号检测器实例(可选,共享) shared_config: 共享配置(校准配置、联动配置等) """ self.group_id = group_config.get('group_id', 'unknown') self.name = group_config.get('name', self.group_id) self.config = group_config self.sdk = sdk self.detector = detector self.number_detector = number_detector self.shared_config = shared_config or {} # 组件实例 self.panorama_camera: Optional[PanoramaCamera] = None self.ptz_camera: Optional[PTZCamera] = None self.calibrator: Optional[CameraCalibrator] = None self.calibration_manager: Optional[CalibrationManager] = None self.coordinator: Optional[SequentialCoordinator] = None self.paired_saver: Optional[PairedImageSaver] = None # 校准数据文件路径 self.calibration_file = group_config.get( 'calibration_file', f'/home/admin/dsh/calibration_{self.group_id}.json' ) # 配对图片目录 self.paired_image_dir = group_config.get( 'paired_image_dir', f'/home/admin/dsh/paired_images_{self.group_id}' ) # 运行状态 self.running = False self.initialized = False # 定时校准 self.calibration_thread = None self.calibration_running = False logger.info(f"[{self.group_id}] 摄像头组实例创建: {self.name}") def initialize(self, skip_calibration: bool = False) -> bool: """ 初始化组内所有组件 Args: skip_calibration: 是否跳过校准 Returns: 是否成功 """ logger.info(f"[{self.group_id}] 开始初始化摄像头组...") # 1. 初始化全景摄像头 panorama_config = self.config.get('panorama', {}) panorama_config['group_id'] = self.group_id self.panorama_camera = PanoramaCamera(self.sdk, panorama_config) # 2. 初始化球机 ptz_config = self.config.get('ptz', {}) ptz_config['group_id'] = self.group_id self.ptz_camera = PTZCamera(self.sdk, ptz_config) # 3. 初始化配对图片保存器 try: self.paired_saver = PairedImageSaver( base_dir=self.paired_image_dir, time_window=5.0 ) logger.info(f"[{self.group_id}] 配对图片保存器初始化成功: {self.paired_image_dir}") except Exception as e: logger.warning(f"[{self.group_id}] 配对图片保存器初始化失败: {e}") # 4. 初始化联动控制器 self.coordinator = SequentialCoordinator( self.panorama_camera, self.ptz_camera, self.detector, self.number_detector ) # 应用顺序模式配置 from config import COORDINATOR_CONFIG seq_config = COORDINATOR_CONFIG.get('sequential_mode', {}) self.coordinator.set_capture_config( ptz_stabilize_time=seq_config.get('ptz_stabilize_time', 2.5), capture_wait_time=seq_config.get('capture_wait_time', 0.5), return_to_panorama=seq_config.get('return_to_panorama', True), default_pan=seq_config.get('default_pan', 0.0), default_tilt=seq_config.get('default_tilt', 0.0), default_zoom=seq_config.get('default_zoom', 1) ) # 设置配对保存器 if self.paired_saver: self.coordinator._paired_saver = self.paired_saver self.coordinator._enable_paired_saving = True # 5. 设置回调 self._setup_callbacks() logger.info(f"[{self.group_id}] 组件初始化完成") # 6. 执行校准 from config import SYSTEM_CONFIG should_calibrate = SYSTEM_CONFIG.get('enable_calibration', True) if not should_calibrate: logger.info(f"[{self.group_id}] 自动校准已禁用") elif skip_calibration: logger.info(f"[{self.group_id}] 自动校准已跳过") else: if not self._auto_calibrate(): logger.error(f"[{self.group_id}] 自动校准失败!") return False self.initialized = True return True def _auto_calibrate(self, force: bool = False) -> bool: """ 执行自动校准 Args: force: 是否强制重新校准 Returns: 是否成功 """ logger.info(f"[{self.group_id}] 开始自动校准...") # 连接摄像头 if not self.panorama_camera.connect(): logger.error(f"[{self.group_id}] 连接全景摄像头失败") return False if not self.ptz_camera.connect(): logger.error(f"[{self.group_id}] 连接球机失败") self.panorama_camera.disconnect() return False # 启动视频流 if not self.panorama_camera.start_stream_rtsp(): logger.warning(f"[{self.group_id}] 全景RTSP启动失败,尝试SDK方式...") self.panorama_camera.start_stream() if not self.ptz_camera.start_stream_rtsp(): logger.warning(f"[{self.group_id}] 球机RTSP启动失败") # 等待视频流稳定 logger.info(f"[{self.group_id}] 等待视频流稳定...") time.sleep(3) # 创建校准器 self.calibrator = CameraCalibrator( ptz_camera=self.ptz_camera, get_frame_func=self.panorama_camera.get_frame, detect_marker_func=None, ptz_capture_func=self._capture_ptz_frame, calibration_file=self.calibration_file ) # 配置校准参数 from config import CALIBRATION_CONFIG overlap_cfg = CALIBRATION_CONFIG.get('overlap_discovery', {}) self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 360)) self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-30, 30)) self.calibrator.overlap_pan_step = overlap_cfg.get('pan_step', 20) self.calibrator.overlap_tilt_step = overlap_cfg.get('tilt_step', 15) self.calibrator.stabilize_time = overlap_cfg.get('stabilize_time', 2.0) # 创建校准管理器 self.calibration_manager = CalibrationManager(self.calibrator) # 执行校准 result = self.calibration_manager.auto_calibrate( force=force, fallback_on_failure=True ) if not result.success: logger.error(f"[{self.group_id}] 校准失败: {result.error_message}") return False logger.info(f"[{self.group_id}] 校准成功! RMS误差: {result.rms_error:.4f}") # 设置校准器到联动控制器 if self.coordinator and self.calibrator.is_calibrated(): self.coordinator.set_calibrator(self.calibrator) return True def _capture_ptz_frame(self) -> Optional[np.ndarray]: """从球机抓拍一帧""" if self.ptz_camera is None: return None try: return self.ptz_camera.get_frame() except Exception as e: logger.error(f"[{self.group_id}] 球机抓拍失败: {e}") return None def _setup_callbacks(self): """设置回调函数""" def on_person_detected(person: DetectedObject, frame: np.ndarray): logger.info(f"[{self.group_id}] 检测到人体: 位置={person.center}, 置信度={person.confidence:.2f}") def on_number_recognized(person_info: PersonInfo): logger.info(f"[{self.group_id}] 识别到编号: {person_info.number_text}") if self.coordinator: self.coordinator.on_person_detected = on_person_detected self.coordinator.on_number_recognized = on_number_recognized def start(self) -> bool: """ 启动组内联动 Returns: 是否成功 """ if not self.initialized: logger.error(f"[{self.group_id}] 组件未初始化,无法启动") return False logger.info(f"[{self.group_id}] 启动摄像头组...") # 启动联动控制器 if self.coordinator: if not self.coordinator.start(): logger.error(f"[{self.group_id}] 联动控制器启动失败") return False self.running = True logger.info(f"[{self.group_id}] 摄像头组启动成功") return True def stop(self): """停止组内所有组件""" logger.info(f"[{self.group_id}] 停止摄像头组...") self.running = False self.calibration_running = False # 停止联动控制器 if self.coordinator: self.coordinator.stop() # 停止视频流 if self.panorama_camera: self.panorama_camera.stop_stream() self.panorama_camera.disconnect() if self.ptz_camera: self.ptz_camera.stop_stream() self.ptz_camera.disconnect() # 关闭配对保存器 if self.paired_saver: self.paired_saver.close() logger.info(f"[{self.group_id}] 摄像头组已停止") def get_status(self) -> Dict[str, Any]: """获取组状态""" return { 'group_id': self.group_id, 'name': self.name, 'running': self.running, 'initialized': self.initialized, 'calibrated': self.calibrator.is_calibrated() if self.calibrator else False, 'panorama_connected': self.panorama_camera.is_connected() if self.panorama_camera else False, 'ptz_connected': self.ptz_camera.is_connected() if self.ptz_camera else False, } def start_scheduled_calibration(self, interval_hours: int = 24, daily_time: str = '08:00'): """ 启动定时校准 Args: interval_hours: 校准间隔(小时) daily_time: 每日校准时间 (HH:MM) """ if self.calibration_thread and self.calibration_thread.is_alive(): logger.warning(f"[{self.group_id}] 定时校准已在运行") return self.calibration_running = True self.calibration_thread = threading.Thread( target=self._calibration_scheduler, args=(interval_hours, daily_time), name=f"calibration-{self.group_id}", daemon=True ) self.calibration_thread.start() logger.info(f"[{self.group_id}] 定时校准已启动,每日 {daily_time} 执行") def _calibration_scheduler(self, interval_hours: int, daily_time: str): """定时校准调度器""" from datetime import datetime, timedelta import time while self.calibration_running and self.running: try: # 计算下次校准时间 now = datetime.now() target_time = datetime.strptime(daily_time, '%H:%M') next_calibration = now.replace( hour=target_time.hour, minute=target_time.minute, second=0, microsecond=0 ) if next_calibration <= now: next_calibration += timedelta(days=1) wait_seconds = (next_calibration - now).total_seconds() logger.info(f"[{self.group_id}] 下次校准时间: {next_calibration} (等待 {wait_seconds/3600:.1f} 小时)") # 等待 time.sleep(min(wait_seconds, 60)) # 每分钟检查一次 if not self.calibration_running or not self.running: break # 检查是否到达校准时间 if datetime.now() >= next_calibration: logger.info(f"[{self.group_id}] 开始定时校准...") self._auto_calibrate(force=True) except Exception as e: logger.error(f"[{self.group_id}] 定时校准错误: {e}") time.sleep(60)