""" 多组摄像头系统管理器 管理多组(全景+球机)摄像头的并行独立运行 """ import os import sys import time import logging import threading from typing import List, Optional, Dict, Any from concurrent.futures import ThreadPoolExecutor # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import ( CAMERA_GROUPS, get_enabled_groups, SDK_PATH, DETECTION_CONFIG, COORDINATOR_CONFIG, CALIBRATION_CONFIG, SYSTEM_CONFIG ) from dahua_sdk import DahuaSDK from panorama_camera import ObjectDetector from ocr_recognizer import NumberDetector from camera_group import CameraGroup logger = logging.getLogger(__name__) class MultiGroupSystem: """ 多组摄像头系统管理器 管理多组(全景+球机)摄像头: - 共享 SDK 实例(大华SDK只需初始化一次) - 共享检测器(YOLO模型只加载一次) - 各组并行独立运行检测和抓拍 """ def __init__(self, config_override: dict = None): """ 初始化多组系统 Args: config_override: 配置覆盖 """ self.config = config_override or {} # 共享组件 self.sdk: Optional[DahuaSDK] = None self.detector: Optional[ObjectDetector] = None self.number_detector: Optional[NumberDetector] = None # 摄像头组列表 self.groups: List[CameraGroup] = [] # 运行状态 self.running = False self.initialized = False logger.info("[MultiGroupSystem] 多组摄像头系统实例创建") def initialize(self, skip_calibration: bool = False) -> bool: """ 初始化系统(SDK + 检测器 + 所有组) Args: skip_calibration: 是否跳过校准 Returns: 是否成功 """ logger.info("=" * 60) logger.info("[MultiGroupSystem] 开始初始化多组摄像头系统...") logger.info("=" * 60) # 1. 初始化检测器(先于SDK,避免内存冲突) try: self.detector = ObjectDetector( model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')), use_gpu=self.config.get('use_gpu', DETECTION_CONFIG.get('use_gpu', True)), model_size=self.config.get('model_size', 'n'), model_type=self.config.get('model_type', DETECTION_CONFIG.get('model_type', 'auto')) ) logger.info("[MultiGroupSystem] 检测器初始化成功") except Exception as e: logger.warning(f"[MultiGroupSystem] 检测器初始化失败: {e}") # 2. 初始化编号检测器 try: from config import OCR_CONFIG ocr_config = { 'api_host': self.config.get('ocr_host', OCR_CONFIG['api_host']), 'api_port': self.config.get('ocr_port', OCR_CONFIG['api_port']), 'model': self.config.get('ocr_model', OCR_CONFIG['model']), } self.number_detector = NumberDetector(use_api=True, ocr_config=ocr_config) logger.info("[MultiGroupSystem] 编号检测器初始化成功") except Exception as e: logger.warning(f"[MultiGroupSystem] 编号检测器初始化失败: {e}") # 3. 初始化SDK sdk_path = os.path.join( self.config.get('sdk_path', SDK_PATH['lib_path']), self.config.get('netsdk', SDK_PATH['netsdk']) ) try: self.sdk = DahuaSDK(sdk_path) if not self.sdk.init(): logger.error("[MultiGroupSystem] SDK初始化失败") return False logger.info("[MultiGroupSystem] SDK初始化成功") except Exception as e: logger.error(f"[MultiGroupSystem] SDK加载失败: {e}") return False # 4. 获取启用的摄像头组配置 enabled_groups = get_enabled_groups() if not enabled_groups: logger.error("[MultiGroupSystem] 没有启用的摄像头组!请在 config/camera.py 中配置 CAMERA_GROUPS") return False logger.info(f"[MultiGroupSystem] 检测到 {len(enabled_groups)} 个启用的摄像头组") # 5. 初始化各组 shared_config = { 'coordinator_config': COORDINATOR_CONFIG, 'calibration_config': CALIBRATION_CONFIG, } success_count = 0 for group_config in enabled_groups: group_id = group_config.get('group_id', 'unknown') logger.info(f"[MultiGroupSystem] 初始化组: {group_id}") group = CameraGroup( group_config=group_config, sdk=self.sdk, detector=self.detector, number_detector=self.number_detector, shared_config=shared_config ) if group.initialize(skip_calibration=skip_calibration): self.groups.append(group) success_count += 1 logger.info(f"[MultiGroupSystem] 组 {group_id} 初始化成功") else: logger.error(f"[MultiGroupSystem] 组 {group_id} 初始化失败") if success_count == 0: logger.error("[MultiGroupSystem] 所有组初始化失败!") return False logger.info(f"[MultiGroupSystem] 成功初始化 {success_count}/{len(enabled_groups)} 个组") self.initialized = True return True def start(self) -> bool: """ 并行启动所有组 Returns: 是否成功 """ if not self.initialized: logger.error("[MultiGroupSystem] 系统未初始化,无法启动") return False logger.info(f"[MultiGroupSystem] 启动 {len(self.groups)} 个摄像头组...") success_count = 0 for group in self.groups: if group.start(): success_count += 1 else: logger.error(f"[MultiGroupSystem] 组 {group.group_id} 启动失败") if success_count == 0: logger.error("[MultiGroupSystem] 所有组启动失败!") return False self.running = True logger.info(f"[MultiGroupSystem] 成功启动 {success_count}/{len(self.groups)} 个组") # 启动定时校准 interval_hours = CALIBRATION_CONFIG.get('interval', 24 * 60 * 60) // 3600 daily_time = CALIBRATION_CONFIG.get('daily_calibration_time', '08:00') for group in self.groups: group.start_scheduled_calibration( interval_hours=interval_hours, daily_time=daily_time ) return True def stop(self): """停止所有组""" logger.info("[MultiGroupSystem] 停止所有摄像头组...") self.running = False for group in self.groups: try: group.stop() except Exception as e: logger.error(f"[MultiGroupSystem] 停止组 {group.group_id} 失败: {e}") # 清理SDK if self.sdk: try: self.sdk.cleanup() except Exception as e: logger.error(f"[MultiGroupSystem] SDK清理失败: {e}") logger.info("[MultiGroupSystem] 系统已停止") def get_status(self) -> Dict[str, Any]: """获取系统状态""" return { 'running': self.running, 'initialized': self.initialized, 'total_groups': len(self.groups), 'groups': [group.get_status() for group in self.groups], } def calibrate_all(self, force: bool = False) -> bool: """ 校准所有组 Args: force: 是否强制重新校准 Returns: 是否全部成功 """ logger.info(f"[MultiGroupSystem] 开始校准所有组...") success_count = 0 for group in self.groups: logger.info(f"[MultiGroupSystem] 校准组: {group.group_id}") if group._auto_calibrate(force=force): success_count += 1 logger.info(f"[MultiGroupSystem] 校准完成: {success_count}/{len(self.groups)} 组成功") return success_count == len(self.groups) def wait(self): """等待系统运行(阻塞)""" try: while self.running: time.sleep(1) except KeyboardInterrupt: logger.info("[MultiGroupSystem] 收到中断信号") self.stop() def main(): """多组系统主入口""" import argparse parser = argparse.ArgumentParser(description='多组摄像头联动抓拍系统') parser.add_argument('--skip-calibration', action='store_true', help='跳过自动校准') parser.add_argument('--calibrate-only', action='store_true', help='仅执行校准后退出') args = parser.parse_args() # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # 创建系统 system = MultiGroupSystem() # 初始化 if not system.initialize(skip_calibration=args.skip_calibration): print("系统初始化失败!") return 1 # 仅校准模式 if args.calibrate_only: print("校准完成,退出。") system.stop() return 0 # 启动 if not system.start(): print("系统启动失败!") return 1 print(f"\n多组摄像头系统已启动 ({len(system.groups)} 个组)") print("按 Ctrl+C 停止\n") # 等待 try: system.wait() except KeyboardInterrupt: pass return 0 if __name__ == '__main__': sys.exit(main())