"""
Simplified dual-camera linked capture system.

The panorama camera detects people; the PTZ camera then positions on and
captures each of them in turn.

Usage:
    python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
"""

import os

# Must be set before the project modules below are imported.
# NOTE(review): presumably this limits OpenCV's FFmpeg capture backend to a
# single decoding thread -- confirm against the OpenCV version in use.
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'

import sys
import argparse
import logging
import time
import signal
from typing import Optional

# Make the project directory importable.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import (
    PANORAMA_CAMERA,
    PTZ_CAMERA,
    SDK_PATH,
    DETECTION_CONFIG,
    PTZ_CONFIG,
    LOG_CONFIG
)
from dahua_sdk import DahuaSDK
from panorama_camera import PanoramaCamera, ObjectDetector
from ptz_camera import PTZCamera
from simple_coordinator import SimpleCoordinator

# Logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class SimpleSystem:
    """Simplified dual-camera linked system.

    Owns the detector, the SDK handle, both camera wrappers and the
    coordinator, and exposes ``initialize`` / ``start`` / ``stop``.
    """

    def __init__(self, config_override: Optional[dict] = None):
        """Store the configuration overrides; no resources are created here.

        Args:
            config_override: Optional dict whose keys take precedence over
                the defaults imported from ``config``. ``None`` or an empty
                dict means "use the defaults".
        """
        self.config = config_override or {}

        self.sdk = None
        self.panorama_camera = None
        self.ptz_camera = None
        self.detector = None
        self.coordinator = None

        self.running = False
        # True only between a successful ``sdk.init()`` and ``sdk.cleanup()``,
        # so ``stop()`` can release the SDK even if the system never started.
        self._sdk_initialized = False

    def initialize(self) -> bool:
        """Create the detector, SDK, cameras and coordinator.

        Returns:
            True when every component was created; False when the detector
            or the SDK failed to initialize (the failure is logged).
            Callers should still call ``stop()`` afterwards so that an
            already-initialized SDK is released.
        """
        logger.info("=" * 50)
        logger.info("初始化简化版联动系统")
        logger.info("=" * 50)

        # 1. Detector (must be initialized before the SDK is loaded).
        try:
            self.detector = ObjectDetector(
                model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')),
                use_gpu=self.config.get('use_gpu', True),
                model_size=self.config.get('model_size', 'n'),
                model_type=self.config.get('model_type', 'auto')
            )
            logger.info("✓ 检测器初始化成功")
        except Exception as e:
            logger.error(f"✗ 检测器初始化失败: {e}")
            return False

        # 2. SDK.
        sdk_path = os.path.join(
            self.config.get('sdk_path', SDK_PATH['lib_path']),
            self.config.get('netsdk', SDK_PATH['netsdk'])
        )

        try:
            self.sdk = DahuaSDK(sdk_path)
            if not self.sdk.init():
                logger.error("✗ SDK初始化失败")
                return False
            self._sdk_initialized = True
            logger.info("✓ SDK初始化成功")
        except Exception as e:
            logger.error(f"✗ SDK加载失败: {e}")
            return False

        # 3. Cameras.
        panorama_config = self.config.get('panorama_camera', PANORAMA_CAMERA)
        self.panorama_camera = PanoramaCamera(self.sdk, panorama_config)

        ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
        self.ptz_camera = PTZCamera(self.sdk, ptz_config)

        # 4. Linkage coordinator.
        self.coordinator = SimpleCoordinator(
            self.panorama_camera,
            self.ptz_camera,
            self.detector
        )

        logger.info("=" * 50)
        logger.info("系统初始化完成")
        logger.info("=" * 50)

        return True

    def start(self) -> bool:
        """Start the coordinator.

        Returns:
            True if the system is running after the call (including the
            case where it was already running); False if the coordinator
            failed to start.
        """
        if self.running:
            return True

        if not self.coordinator.start():
            logger.error("启动联动系统失败")
            return False

        self.running = True
        logger.info("系统已启动")
        return True

    def stop(self) -> None:
        """Stop the coordinator (if running) and release the SDK.

        Safe to call multiple times and safe to call when ``start()`` was
        never reached: the SDK is cleaned up whenever it was successfully
        initialized, not only when the system was running. This matters
        because ``main`` relies on ``stop()`` in its ``finally`` clause to
        release the SDK after a failed ``initialize()`` / ``start()``.
        """
        was_running = self.running
        self.running = False
        released_sdk = False

        try:
            if was_running and self.coordinator:
                self.coordinator.stop()
        finally:
            # Release the SDK even if stopping the coordinator raised.
            if self.sdk and self._sdk_initialized:
                # Clear the flag first so a repeated call never cleans up twice.
                self._sdk_initialized = False
                self.sdk.cleanup()
                released_sdk = True

        if was_running or released_sdk:
            logger.info("系统已停止")

    def get_stats(self) -> dict:
        """Return the coordinator's statistics, or ``{}`` without a coordinator."""
        if self.coordinator:
            return self.coordinator.get_stats()
        return {}


def main() -> int:
    """Parse the command line, run the system until stopped, and return an exit code.

    Returns:
        0 on a normal shutdown, 1 when initialization or start-up failed.
    """
    parser = argparse.ArgumentParser(
        description='简化版双摄像头联动抓拍系统',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
  python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101 --model-size m
        """
    )

    parser.add_argument('--panorama-ip', type=str, required=True,
                        help='全景摄像头IP')
    parser.add_argument('--ptz-ip', type=str, required=True,
                        help='球机IP')
    parser.add_argument('--username', type=str, default='admin',
                        help='用户名 (默认: admin)')
    parser.add_argument('--password', type=str, default='admin123',
                        help='密码 (默认: admin123)')
    parser.add_argument('--model', type=str,
                        help='检测模型路径')
    parser.add_argument('--model-size', type=str, default='n',
                        choices=['n', 's', 'm', 'l', 'x'],
                        help='YOLO11模型尺寸 (默认: n)')
    parser.add_argument('--no-gpu', action='store_true',
                        help='不使用GPU')
    parser.add_argument('--detection-interval', type=float, default=1.0,
                        help='检测间隔秒数 (默认: 1.0)')
    parser.add_argument('--ptz-wait', type=float, default=0.5,
                        help='PTZ移动后等待时间 (默认: 0.5)')
    parser.add_argument('--min-area', type=int, default=2000,
                        help='最小人体面积像素 (默认: 2000)')
    parser.add_argument('--confidence', type=float, default=0.7,
                        help='置信度阈值 (默认: 0.7)')
    parser.add_argument('--zoom', type=int, default=8,
                        help='默认变倍 (默认: 8)')

    args = parser.parse_args()

    # Build the configuration overrides from the command line.
    # NOTE(review): --detection-interval, --ptz-wait, --min-area, --confidence
    # and --zoom are parsed and printed below but are not passed to the
    # system anywhere in this file -- confirm how they should be wired in.
    # NOTE(review): args.model is None when --model is omitted, so the
    # 'model_path' key is present with value None and the DETECTION_CONFIG
    # fallback in SimpleSystem.initialize is not used -- confirm intended.
    config = {
        'panorama_camera': {
            **PANORAMA_CAMERA,
            'ip': args.panorama_ip,
            'username': args.username,
            'password': args.password,
        },
        'ptz_camera': {
            **PTZ_CAMERA,
            'ip': args.ptz_ip,
            'username': args.username,
            'password': args.password,
        },
        'model_path': args.model,
        'model_size': args.model_size,
        'use_gpu': not args.no_gpu,
    }

    system = SimpleSystem(config)

    def signal_handler(sig, frame):
        """Stop the system and exit on SIGINT / SIGTERM."""
        logger.info("\n接收到停止信号,正在退出...")
        system.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        if not system.initialize():
            logger.error("系统初始化失败!")
            return 1

        if not system.start():
            logger.error("系统启动失败!")
            return 1

        # Print the run-time banner.
        print("\n" + "=" * 50)
        print("系统运行中,按 Ctrl+C 停止")
        print("=" * 50)
        print(f"全景摄像头: {args.panorama_ip}")
        print(f"球机: {args.ptz_ip}")
        print(f"检测间隔: {args.detection_interval}秒")
        print(f"置信度阈值: {args.confidence}")
        print(f"最小人体面积: {args.min_area}像素²")
        print(f"默认变倍: {args.zoom}")
        print("=" * 50 + "\n")

        # Main loop: idle and log statistics every 30 seconds.
        last_stats_time = time.time()
        while system.running:
            time.sleep(1)

            if time.time() - last_stats_time >= 30:
                stats = system.get_stats()
                logger.info(f"[统计] 帧数={stats.get('frames_processed', 0)} "
                            f"检测人数={stats.get('persons_detected', 0)} "
                            f"抓拍次数={stats.get('captures_taken', 0)}")
                last_stats_time = time.time()

    except KeyboardInterrupt:
        pass
    finally:
        # Also runs on the early ``return 1`` paths above, so an SDK that
        # was initialized before the failure is still released.
        system.stop()

    return 0


if __name__ == '__main__':
    sys.exit(main() or 0)