| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392 |
- """
- 摄像头组封装类
- 封装一组全景+球机摄像头的所有组件
- """
- import os
- import time
- import logging
- import threading
- from typing import Optional, Dict, Any, Callable
- from dataclasses import dataclass
- import numpy as np
- from panorama_camera import PanoramaCamera, ObjectDetector, DetectedObject
- from ptz_camera import PTZCamera
- from ocr_recognizer import NumberDetector, PersonInfo
- from coordinator import SequentialCoordinator
- from calibration import CameraCalibrator, CalibrationManager
- from paired_image_saver import PairedImageSaver
- logger = logging.getLogger(__name__)
- @dataclass
- class GroupConfig:
- """摄像头组配置"""
- group_id: str
- name: str
- panorama_config: Dict[str, Any]
- ptz_config: Dict[str, Any]
- calibration_file: str
- paired_image_dir: str
- class CameraGroup:
- """
- 摄像头组封装类
-
- 封装一组全景+球机摄像头的所有组件,包括:
- - 全景摄像头实例
- - 球机实例
- - 校准器
- - 联动控制器
- - 配对图片保存器
- """
-
- def __init__(self,
- group_config: Dict[str, Any],
- sdk,
- detector: ObjectDetector,
- number_detector: Optional[NumberDetector] = None,
- shared_config: Optional[Dict[str, Any]] = None):
- """
- 初始化摄像头组
-
- Args:
- group_config: 组配置字典
- sdk: 大华SDK实例(共享)
- detector: 检测器实例(共享)
- number_detector: 编号检测器实例(可选,共享)
- shared_config: 共享配置(校准配置、联动配置等)
- """
- self.group_id = group_config.get('group_id', 'unknown')
- self.name = group_config.get('name', self.group_id)
- self.config = group_config
- self.sdk = sdk
- self.detector = detector
- self.number_detector = number_detector
- self.shared_config = shared_config or {}
-
- # 组件实例
- self.panorama_camera: Optional[PanoramaCamera] = None
- self.ptz_camera: Optional[PTZCamera] = None
- self.calibrator: Optional[CameraCalibrator] = None
- self.calibration_manager: Optional[CalibrationManager] = None
- self.coordinator: Optional[SequentialCoordinator] = None
- self.paired_saver: Optional[PairedImageSaver] = None
-
- # 校准数据文件路径
- self.calibration_file = group_config.get(
- 'calibration_file',
- f'/home/admin/dsh/calibration_{self.group_id}.json'
- )
-
- # 配对图片目录
- self.paired_image_dir = group_config.get(
- 'paired_image_dir',
- f'/home/admin/dsh/paired_images_{self.group_id}'
- )
-
- # 运行状态
- self.running = False
- self.initialized = False
-
- # 定时校准
- self.calibration_thread = None
- self.calibration_running = False
-
- logger.info(f"[{self.group_id}] 摄像头组实例创建: {self.name}")
-
- def initialize(self, skip_calibration: bool = False) -> bool:
- """
- 初始化组内所有组件
-
- Args:
- skip_calibration: 是否跳过校准
-
- Returns:
- 是否成功
- """
- logger.info(f"[{self.group_id}] 开始初始化摄像头组...")
-
- # 1. 初始化全景摄像头
- panorama_config = self.config.get('panorama', {})
- panorama_config['group_id'] = self.group_id
- self.panorama_camera = PanoramaCamera(self.sdk, panorama_config)
-
- # 2. 初始化球机
- ptz_config = self.config.get('ptz', {})
- ptz_config['group_id'] = self.group_id
- self.ptz_camera = PTZCamera(self.sdk, ptz_config)
-
- # 3. 初始化配对图片保存器
- try:
- self.paired_saver = PairedImageSaver(
- base_dir=self.paired_image_dir,
- time_window=5.0
- )
- logger.info(f"[{self.group_id}] 配对图片保存器初始化成功: {self.paired_image_dir}")
- except Exception as e:
- logger.warning(f"[{self.group_id}] 配对图片保存器初始化失败: {e}")
-
- # 4. 初始化联动控制器
- self.coordinator = SequentialCoordinator(
- self.panorama_camera,
- self.ptz_camera,
- self.detector,
- self.number_detector
- )
-
- # 应用顺序模式配置
- from config import COORDINATOR_CONFIG
- seq_config = COORDINATOR_CONFIG.get('sequential_mode', {})
- self.coordinator.set_capture_config(
- ptz_stabilize_time=seq_config.get('ptz_stabilize_time', 2.5),
- capture_wait_time=seq_config.get('capture_wait_time', 0.5),
- return_to_panorama=seq_config.get('return_to_panorama', True),
- default_pan=seq_config.get('default_pan', 0.0),
- default_tilt=seq_config.get('default_tilt', 0.0),
- default_zoom=seq_config.get('default_zoom', 1)
- )
-
- # 设置配对保存器
- if self.paired_saver:
- self.coordinator._paired_saver = self.paired_saver
- self.coordinator._enable_paired_saving = True
-
- # 5. 设置回调
- self._setup_callbacks()
-
- logger.info(f"[{self.group_id}] 组件初始化完成")
-
- # 6. 执行校准
- from config import SYSTEM_CONFIG
- should_calibrate = SYSTEM_CONFIG.get('enable_calibration', True)
- if not should_calibrate:
- logger.info(f"[{self.group_id}] 自动校准已禁用")
- elif skip_calibration:
- logger.info(f"[{self.group_id}] 自动校准已跳过")
- else:
- if not self._auto_calibrate():
- logger.error(f"[{self.group_id}] 自动校准失败!")
- return False
-
- self.initialized = True
- return True
-
- def _auto_calibrate(self, force: bool = False) -> bool:
- """
- 执行自动校准
-
- Args:
- force: 是否强制重新校准
-
- Returns:
- 是否成功
- """
- logger.info(f"[{self.group_id}] 开始自动校准...")
-
- # 连接摄像头
- if not self.panorama_camera.connect():
- logger.error(f"[{self.group_id}] 连接全景摄像头失败")
- return False
-
- if not self.ptz_camera.connect():
- logger.error(f"[{self.group_id}] 连接球机失败")
- self.panorama_camera.disconnect()
- return False
-
- # 启动视频流
- if not self.panorama_camera.start_stream_rtsp():
- logger.warning(f"[{self.group_id}] 全景RTSP启动失败,尝试SDK方式...")
- self.panorama_camera.start_stream()
-
- if not self.ptz_camera.start_stream_rtsp():
- logger.warning(f"[{self.group_id}] 球机RTSP启动失败")
-
- # 等待视频流稳定
- logger.info(f"[{self.group_id}] 等待视频流稳定...")
- time.sleep(3)
-
- # 创建校准器
- self.calibrator = CameraCalibrator(
- ptz_camera=self.ptz_camera,
- get_frame_func=self.panorama_camera.get_frame,
- detect_marker_func=None,
- ptz_capture_func=self._capture_ptz_frame,
- calibration_file=self.calibration_file
- )
-
- # 配置校准参数
- from config import CALIBRATION_CONFIG
- overlap_cfg = CALIBRATION_CONFIG.get('overlap_discovery', {})
- self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 360))
- self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-30, 30))
- self.calibrator.overlap_pan_step = overlap_cfg.get('pan_step', 20)
- self.calibrator.overlap_tilt_step = overlap_cfg.get('tilt_step', 15)
- self.calibrator.stabilize_time = overlap_cfg.get('stabilize_time', 2.0)
-
- # 创建校准管理器
- self.calibration_manager = CalibrationManager(self.calibrator)
-
- # 执行校准
- result = self.calibration_manager.auto_calibrate(
- force=force,
- fallback_on_failure=True
- )
-
- if not result.success:
- logger.error(f"[{self.group_id}] 校准失败: {result.error_message}")
- return False
-
- logger.info(f"[{self.group_id}] 校准成功! RMS误差: {result.rms_error:.4f}")
-
- # 设置校准器到联动控制器
- if self.coordinator and self.calibrator.is_calibrated():
- self.coordinator.set_calibrator(self.calibrator)
-
- return True
-
- def _capture_ptz_frame(self) -> Optional[np.ndarray]:
- """从球机抓拍一帧"""
- if self.ptz_camera is None:
- return None
- try:
- return self.ptz_camera.get_frame()
- except Exception as e:
- logger.error(f"[{self.group_id}] 球机抓拍失败: {e}")
- return None
-
- def _setup_callbacks(self):
- """设置回调函数"""
- def on_person_detected(person: DetectedObject, frame: np.ndarray):
- logger.info(f"[{self.group_id}] 检测到人体: 位置={person.center}, 置信度={person.confidence:.2f}")
-
- def on_number_recognized(person_info: PersonInfo):
- logger.info(f"[{self.group_id}] 识别到编号: {person_info.number_text}")
-
- if self.coordinator:
- self.coordinator.on_person_detected = on_person_detected
- self.coordinator.on_number_recognized = on_number_recognized
-
- def start(self) -> bool:
- """
- 启动组内联动
-
- Returns:
- 是否成功
- """
- if not self.initialized:
- logger.error(f"[{self.group_id}] 组件未初始化,无法启动")
- return False
-
- logger.info(f"[{self.group_id}] 启动摄像头组...")
-
- # 启动联动控制器
- if self.coordinator:
- if not self.coordinator.start():
- logger.error(f"[{self.group_id}] 联动控制器启动失败")
- return False
-
- self.running = True
- logger.info(f"[{self.group_id}] 摄像头组启动成功")
- return True
-
- def stop(self):
- """停止组内所有组件"""
- logger.info(f"[{self.group_id}] 停止摄像头组...")
-
- self.running = False
- self.calibration_running = False
-
- # 停止联动控制器
- if self.coordinator:
- self.coordinator.stop()
-
- # 停止视频流
- if self.panorama_camera:
- self.panorama_camera.stop_stream()
- self.panorama_camera.disconnect()
-
- if self.ptz_camera:
- self.ptz_camera.stop_stream()
- self.ptz_camera.disconnect()
-
- # 关闭配对保存器
- if self.paired_saver:
- self.paired_saver.close()
-
- logger.info(f"[{self.group_id}] 摄像头组已停止")
-
- def get_status(self) -> Dict[str, Any]:
- """获取组状态"""
- return {
- 'group_id': self.group_id,
- 'name': self.name,
- 'running': self.running,
- 'initialized': self.initialized,
- 'calibrated': self.calibrator.is_calibrated() if self.calibrator else False,
- 'panorama_connected': self.panorama_camera.is_connected() if self.panorama_camera else False,
- 'ptz_connected': self.ptz_camera.is_connected() if self.ptz_camera else False,
- }
-
- def start_scheduled_calibration(self, interval_hours: int = 24, daily_time: str = '08:00'):
- """
- 启动定时校准
-
- Args:
- interval_hours: 校准间隔(小时)
- daily_time: 每日校准时间 (HH:MM)
- """
- if self.calibration_thread and self.calibration_thread.is_alive():
- logger.warning(f"[{self.group_id}] 定时校准已在运行")
- return
-
- self.calibration_running = True
- self.calibration_thread = threading.Thread(
- target=self._calibration_scheduler,
- args=(interval_hours, daily_time),
- name=f"calibration-{self.group_id}",
- daemon=True
- )
- self.calibration_thread.start()
- logger.info(f"[{self.group_id}] 定时校准已启动,每日 {daily_time} 执行")
-
- def _calibration_scheduler(self, interval_hours: int, daily_time: str):
- """定时校准调度器"""
- from datetime import datetime, timedelta
- import time
-
- while self.calibration_running and self.running:
- try:
- # 计算下次校准时间
- now = datetime.now()
- target_time = datetime.strptime(daily_time, '%H:%M')
- next_calibration = now.replace(
- hour=target_time.hour,
- minute=target_time.minute,
- second=0,
- microsecond=0
- )
-
- if next_calibration <= now:
- next_calibration += timedelta(days=1)
-
- wait_seconds = (next_calibration - now).total_seconds()
- logger.info(f"[{self.group_id}] 下次校准时间: {next_calibration} (等待 {wait_seconds/3600:.1f} 小时)")
-
- # 等待
- time.sleep(min(wait_seconds, 60)) # 每分钟检查一次
-
- if not self.calibration_running or not self.running:
- break
-
- # 检查是否到达校准时间
- if datetime.now() >= next_calibration:
- logger.info(f"[{self.group_id}] 开始定时校准...")
- self._auto_calibrate(force=True)
-
- except Exception as e:
- logger.error(f"[{self.group_id}] 定时校准错误: {e}")
- time.sleep(60)
|