""" 双摄像头联动抓拍系统 - 主程序 系统功能: 1. 全景摄像头实时监控和物体检测 2. 检测到人体后,球机自动变焦定位 3. 对人体进行分割并OCR识别衣服上的编号 """ import os import sys import time import argparse import logging import threading from typing import Optional import cv2 import numpy as np # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import ( PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH, DETECTION_CONFIG, PTZ_CONFIG, OCR_CONFIG, COORDINATOR_CONFIG, CALIBRATION_CONFIG, LOG_CONFIG ) from dahua_sdk import DahuaSDK from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject from ptz_camera import PTZCamera, PTZController from ocr_recognizer import NumberDetector, PersonInfo from coordinator import Coordinator, EventDrivenCoordinator # 配置日志 - 使用LOG_CONFIG def setup_logging(): """设置日志配置""" log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO) log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') log_file = LOG_CONFIG.get('file') handlers = [logging.StreamHandler()] if log_file: from logging.handlers import RotatingFileHandler file_handler = RotatingFileHandler( log_file, maxBytes=LOG_CONFIG.get('max_bytes', 10*1024*1024), backupCount=LOG_CONFIG.get('backup_count', 5) ) file_handler.setFormatter(logging.Formatter(log_format)) handlers.append(file_handler) logging.basicConfig( level=log_level, format=log_format, handlers=handlers ) setup_logging() logger = logging.getLogger(__name__) class DualCameraSystem: """ 双摄像头联动抓拍系统 """ def __init__(self, config_override: dict = None): """ 初始化系统 Args: config_override: 配置覆盖 """ self.config = config_override or {} # SDK self.sdk = None # 摄像头 self.panorama_camera = None self.ptz_camera = None # 检测器和识别器 self.detector = None self.number_detector = None # 联动控制器 self.coordinator = None # 校准器 self.calibrator = None self.calibration_manager = None # 定时校准 self.calibration_interval = CALIBRATION_CONFIG.get('interval', 5 * 60) # 默认5分钟 self.calibration_thread = None self.calibration_running = False self.last_calibration_time = 0 # 运行标志 self.running = False def initialize(self, skip_calibration: bool = False) -> bool: """ 初始化系统组件 Args: skip_calibration: 是否跳过校准 Returns: 是否成功 """ logger.info("初始化双摄像头联动系统...") # 先初始化检测器(YOLO/PyTorch),再加载大华SDK # 大华SDK与PyTorch共享进程空间时可能导致内存冲突, # 先加载PyTorch可避免SDK的内存映射覆盖PyTorch运行时 # 初始化检测器 (YOLO11/RKNN/ONNX) try: from config import DETECTION_CONFIG self.detector = ObjectDetector( model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')), use_gpu=self.config.get('use_gpu', DETECTION_CONFIG.get('use_gpu', True)), model_size=self.config.get('model_size', 'n'), model_type=self.config.get('model_type', DETECTION_CONFIG.get('model_type', 'auto')) ) logger.info("检测器初始化成功") except Exception as e: logger.warning(f"检测器初始化失败: {e}") # 初始化编号检测器 (使用llama-server API) try: ocr_config = { 'api_host': self.config.get('ocr_host', OCR_CONFIG['api_host']), 'api_port': self.config.get('ocr_port', OCR_CONFIG['api_port']), 'model': self.config.get('ocr_model', OCR_CONFIG['model']), } self.number_detector = NumberDetector(use_api=True, ocr_config=ocr_config) logger.info("编号检测器初始化成功 (使用llama-server API)") except Exception as e: logger.warning(f"编号检测器初始化失败: {e}") # 初始化SDK(在检测器之后,避免SDK内存映射与PyTorch冲突) sdk_path = os.path.join( self.config.get('sdk_path', SDK_PATH['lib_path']), self.config.get('netsdk', SDK_PATH['netsdk']) ) try: self.sdk = DahuaSDK(sdk_path) if not self.sdk.init(): logger.error("SDK初始化失败") return False logger.info("SDK初始化成功") except Exception as e: logger.error(f"SDK加载失败: {e}") return False # 初始化摄像头 panorama_config = self.config.get('panorama_camera', PANORAMA_CAMERA) self.panorama_camera = PanoramaCamera(self.sdk, panorama_config) ptz_config = self.config.get('ptz_camera', PTZ_CAMERA) self.ptz_camera = PTZCamera(self.sdk, ptz_config) # 初始化联动控制器 self.coordinator = Coordinator( self.panorama_camera, self.ptz_camera, self.detector, self.number_detector ) # 设置回调 self._setup_callbacks() logger.info("系统初始化完成") # 执行自动校准 if not skip_calibration: if not self._auto_calibrate(): logger.error("自动校准失败!") return False return True def _auto_calibrate(self) -> bool: """ 执行自动校准 Returns: 是否成功 """ from calibration import CameraCalibrator, CalibrationManager logger.info("=" * 50) logger.info("开始自动校准...") logger.info("=" * 50) # 连接摄像头 if not self.panorama_camera.connect(): logger.error("连接全景摄像头失败,无法进行校准") return False if not self.ptz_camera.connect(): logger.error("连接球机失败,无法进行校准") self.panorama_camera.disconnect() return False # 启动视频流获取帧数据 if not self.panorama_camera.start_stream_rtsp(): logger.warning("RTSP视频流启动失败,尝试SDK方式...") if not self.panorama_camera.start_stream(): logger.error("无法启动视频流,校准可能无法获取画面") # 启动球机视频流(校准需要球机画面做特征匹配) if not self.ptz_camera.start_stream_rtsp(): logger.warning("球机RTSP视频流启动失败,校准将无法进行特征匹配") if not self.ptz_camera.start_stream(): logger.error("无法启动球机视频流,校准将仅依赖运动检测") # 创建校准器 - 支持视野重叠发现 self.calibrator = CameraCalibrator( ptz_camera=self.ptz_camera, get_frame_func=self.panorama_camera.get_frame, detect_marker_func=None, ptz_capture_func=self._capture_ptz_frame ) # 配置重叠发现参数 overlap_cfg = CALIBRATION_CONFIG.get('overlap_discovery', {}) self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 360)) self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-45, 45)) self.calibrator.overlap_pan_step = overlap_cfg.get('pan_step', 20) self.calibrator.overlap_tilt_step = overlap_cfg.get('tilt_step', 15) self.calibrator.stabilize_time = overlap_cfg.get('stabilize_time', 2.0) # 校准进度回调 def on_progress(current: int, total: int, message: str): logger.info(f"校准进度: {current}/{total} - {message}") # 校准完成回调 def on_complete(result): if result.success: logger.info(f"校准完成! RMS误差: {result.rms_error:.4f}") else: logger.error(f"校准失败: {result.error_message}") self.calibrator.on_progress = on_progress self.calibrator.on_complete = on_complete # 创建校准管理器 self.calibration_manager = CalibrationManager(self.calibrator) # 执行视觉校准 result = self.calibration_manager.auto_calibrate(force=True) if not result.success: logger.error("=" * 50) logger.error("校准失败!") logger.error(f"原因: {result.error_message}") logger.error("=" * 50) logger.error("") logger.error("请检查以下问题:") logger.error("1. 全景摄像头和球机是否正确连接") logger.error("2. 球机PTZ控制是否正常") logger.error("3. 两台摄像头的视野是否有重叠区域") logger.error("4. 场景是否有足够的纹理/特征用于匹配") logger.error("5. 球机RTSP视频流是否正常(特征匹配需要球机画面)") logger.error("") logger.error("您可以尝试:") logger.error("- 检查摄像头连接和网络") logger.error("- 手动移动球机确认PTZ控制正常") logger.error("- 确保场景有足够的纹理/特征") logger.error("- 使用 --skip-calibration 跳过校准") logger.error("=" * 50) # 释放资源 self.panorama_camera.disconnect() self.ptz_camera.stop_stream() self.ptz_camera.disconnect() return False logger.info("=" * 50) logger.info("校准成功!") logger.info(f"有效校准点: {len(result.points)}") logger.info(f"RMS误差: {result.rms_error:.4f} 度") logger.info("=" * 50) return True def _capture_ptz_frame(self) -> Optional[np.ndarray]: """ 从球机抓拍一帧图像 用于校准时特征匹配 """ if self.ptz_camera is None: return None try: return self.ptz_camera.get_frame() except Exception as e: logger.error(f"球机抓拍失败: {e}") return None def _setup_callbacks(self): """设置回调函数""" def on_person_detected(person: DetectedObject, frame: np.ndarray): """人体检测回调""" logger.info(f"检测到人体: 位置={person.center}, 置信度={person.confidence:.2f}") def on_number_recognized(person_info: PersonInfo): """编号识别回调""" logger.info( f"识别到编号: ID={person_info.person_id}, " f"编号={person_info.number_text}, " f"置信度={person_info.number_confidence:.2f}, " f"位置={person_info.number_location}" ) self.coordinator.on_person_detected = on_person_detected self.coordinator.on_number_recognized = on_number_recognized def start(self) -> bool: """ 启动系统 Returns: 是否成功 """ if self.running: logger.warning("系统已在运行") return True logger.info("启动联动系统...") if not self.coordinator.start(): logger.error("联动系统启动失败") return False self.running = True logger.info("联动系统启动成功") # 启动定时校准 self._start_periodic_calibration() return True def stop(self): """停止系统""" if not self.running: return logger.info("停止联动系统...") # 停止定时校准 self._stop_periodic_calibration() self.coordinator.stop() self.running = False logger.info("联动系统已停止") def get_results(self): """获取识别结果""" if self.coordinator: return self.coordinator.get_results() return [] def _start_periodic_calibration(self): """启动定时校准""" if self.calibration_running: return self.calibration_running = True self.calibration_thread = threading.Thread( target=self._periodic_calibration_worker, daemon=True ) self.calibration_thread.start() # 格式化显示间隔 interval_hours = self.calibration_interval // 3600 if interval_hours >= 24: interval_str = f"{interval_hours // 24}天" elif interval_hours >= 1: interval_str = f"{interval_hours}小时" else: interval_str = f"{self.calibration_interval // 60}分钟" logger.info(f"定时校准已启动 (间隔: {interval_str})") def _stop_periodic_calibration(self): """停止定时校准""" self.calibration_running = False if self.calibration_thread: self.calibration_thread.join(timeout=2) self.calibration_thread = None logger.info("定时校准已停止") def _periodic_calibration_worker(self): """定时校准工作线程""" import time while self.calibration_running: try: # 等待校准间隔 for _ in range(self.calibration_interval): if not self.calibration_running: return time.sleep(1) if not self.calibration_running: return # 执行校准 logger.info("=" * 50) logger.info("执行定时校准...") logger.info("=" * 50) result = self._auto_calibrate() if result: logger.info("定时校准成功!") else: logger.warning("定时校准失败, 将在下次间隔重试") except Exception as e: logger.error(f"定时校准错误: {e}") time.sleep(10) def manual_calibrate(self) -> bool: """ 手动触发校准 Returns: 是否成功 """ logger.info("手动触发校准...") return self._auto_calibrate() def cleanup(self): """清理资源""" self.stop() # 确保定时校准停止 self._stop_periodic_calibration() if self.sdk: self.sdk.cleanup() logger.info("系统资源已清理") def run_interactive(system: DualCameraSystem): """ 交互模式运行 Args: system: 系统实例 """ print("\n双摄像头联动系统 - 交互模式") print("=" * 50) print("命令:") print(" s - 开始/停止联动") print(" r - 获取识别结果") print(" t - 手动跟踪 (输入 x y)") print(" c - 抓拍快照") print(" b - 手动校准") print(" q - 退出") print("=" * 50) running = False while True: try: cmd = input("\n> ").strip().lower() if cmd == 'q': break elif cmd == 's': if running: system.stop() running = False print("联动已停止") else: if system.start(): running = True print("联动已启动") elif cmd == 'r': results = system.get_results() if results: print(f"获取到 {len(results)} 个识别结果:") for r in results: print(f" ID={r.person_id}, 编号={r.number_text}, 置信度={r.number_confidence:.2f}") else: print("暂无识别结果") elif cmd == 't': try: coords = input("输入坐标 (x y, 范围0-1): ").strip().split() x, y = float(coords[0]), float(coords[1]) system.coordinator.force_track_position(x, y) print(f"已移动到位置 ({x:.2f}, {y:.2f})") except Exception as e: print(f"输入错误: {e}") elif cmd == 'c': frame = system.coordinator.capture_snapshot() if frame is not None: filename = f"snapshot_{int(time.time())}.jpg" cv2.imwrite(filename, frame) print(f"快照已保存: {filename}") else: print("抓拍失败") elif cmd == 'b': print("开始手动校准...") if system.manual_calibrate(): print("校准成功!") else: print("校准失败!") else: print("未知命令") except KeyboardInterrupt: break except Exception as e: print(f"错误: {e}") print("退出交互模式") def main(): """主函数""" parser = argparse.ArgumentParser(description='双摄像头联动抓拍系统') parser.add_argument('--panorama-ip', type=str, help='全景摄像头IP') parser.add_argument('--ptz-ip', type=str, help='球机IP') parser.add_argument('--username', type=str, default='admin', help='用户名') parser.add_argument('--password', type=str, default='admin123', help='密码') parser.add_argument('--model', type=str, help='检测模型路径 (默认使用YOLO11n)') parser.add_argument('--model-size', type=str, default='n', choices=['n', 's', 'm', 'l', 'x'], help='YOLO11模型尺寸 (n/s/m/l/x)') parser.add_argument('--no-gpu', action='store_true', help='不使用GPU') parser.add_argument('--ocr-host', type=str, default='localhost', help='OCR API服务器地址') parser.add_argument('--ocr-port', type=int, default=8111, help='OCR API端口') parser.add_argument('--ocr-model', type=str, default='PaddleOCR-VL-1.5-GGUF.gguf', help='OCR模型名称') parser.add_argument('--interactive', action='store_true', help='交互模式') parser.add_argument('--demo', action='store_true', help='演示模式(不连接实际摄像头)') parser.add_argument('--skip-calibration', action='store_true', help='跳过自动校准') parser.add_argument('--force-calibration', action='store_true', help='强制重新校准') args = parser.parse_args() # 构建配置 config = {} if args.panorama_ip: config['panorama_camera'] = { **PANORAMA_CAMERA, 'ip': args.panorama_ip, 'username': args.username, 'password': args.password, } if args.ptz_ip: config['ptz_camera'] = { **PTZ_CAMERA, 'ip': args.ptz_ip, 'username': args.username, 'password': args.password, } if args.model: config['model_path'] = args.model config['model_size'] = args.model_size config['use_gpu'] = not args.no_gpu config['ocr_host'] = args.ocr_host config['ocr_port'] = args.ocr_port config['ocr_model'] = args.ocr_model # 演示模式 if args.demo: print("演示模式: 使用模拟数据") run_demo() return # 创建系统实例 system = DualCameraSystem(config) try: # 初始化 (包含自动校准) if not system.initialize(skip_calibration=args.skip_calibration): print("\n系统初始化失败!") if not args.skip_calibration: print("提示: 可使用 --skip-calibration 跳过校准") return 1 # 运行 if args.interactive: run_interactive(system) else: # 自动模式 print("启动联动系统...") if not system.start(): print("启动失败") return 1 print("系统运行中,按Ctrl+C停止") while True: time.sleep(1) results = system.get_results() if results: for r in results: if r.number_text: print(f"[识别] ID={r.person_id}, 编号={r.number_text}") except KeyboardInterrupt: print("\n接收到停止信号") finally: system.cleanup() return 0 def run_demo(): """演示模式""" print("\n演示模式 - 双摄像头联动系统") print("=" * 60) print(""" 系统架构: ┌─────────────────────────────────────────────────────────────┐ │ 全景摄像头 (Panorama) │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ 视频流 │ -> │ 人体检测 │ -> │ 位置计算 │ │ │ └─────────┘ └─────────┘ └─────────┘ │ └─────────────────────────────────────────────────────────────┘ │ ▼ 检测到人体位置 (x_ratio, y_ratio) ┌─────────────────────────────────────────────────────────────┐ │ 球机 (PTZ Camera) │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ PTZ控制 │ -> │ 精确定位 │ -> │ 变焦放大 │ │ │ └─────────┘ └─────────┘ └─────────┘ │ └─────────────────────────────────────────────────────────────┘ │ ▼ 变焦后的人体图像 ┌─────────────────────────────────────────────────────────────┐ │ 识别模块 (OCR) │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │人体分割 │ -> │ 区域检测 │ -> │ OCR识别 │ -> 编号结果 │ │ └─────────┘ └─────────┘ └─────────┘ │ └─────────────────────────────────────────────────────────────┘ 工作流程: 1. 全景摄像头实时获取视频流 2. 使用YOLO11检测画面中的人体 3. 计算人体在画面中的相对位置 4. 控制球机PTZ移动到对应位置 5. 球机变焦放大人体区域 6. 对人体进行分割,提取服装区域 7. 使用OCR识别服装上的编号 8. 输出识别结果 主要组件: - dahua_sdk.py: 大华SDK封装 - panorama_camera.py: 全景摄像头和人体检测 - ptz_camera.py: 球机PTZ控制 - ocr_recognizer.py: 人体分割和OCR识别 - coordinator.py: 联动控制逻辑 """) print("=" * 60) print("\n使用方法:") print(" python main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101") print(" python main.py --interactive # 交互模式") print(" python main.py --demo # 演示说明") if __name__ == '__main__': sys.exit(main() or 0)