| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- """
- 简化版双摄像头联动抓拍系统
- 全景检测到人 → 球机逐个定位抓拍
- 使用方法:
- python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
- """
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- import sys
- import argparse
- import logging
- import time
- import signal
- # 添加项目路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from config import (
- PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH,
- DETECTION_CONFIG, PTZ_CONFIG, LOG_CONFIG
- )
- from dahua_sdk import DahuaSDK
- from panorama_camera import PanoramaCamera, ObjectDetector
- from ptz_camera import PTZCamera
- from simple_coordinator import SimpleCoordinator
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- class SimpleSystem:
- """简化版双摄像头联动系统"""
-
- def __init__(self, config_override: dict = None):
- self.config = config_override or {}
- self.sdk = None
- self.panorama_camera = None
- self.ptz_camera = None
- self.detector = None
- self.coordinator = None
- self.running = False
-
- def initialize(self) -> bool:
- """初始化系统"""
- logger.info("=" * 50)
- logger.info("初始化简化版联动系统")
- logger.info("=" * 50)
-
- # 1. 初始化检测器(必须先于SDK加载)
- try:
- self.detector = ObjectDetector(
- model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')),
- use_gpu=self.config.get('use_gpu', True),
- model_size=self.config.get('model_size', 'n'),
- model_type=self.config.get('model_type', 'auto')
- )
- logger.info("✓ 检测器初始化成功")
- except Exception as e:
- logger.error(f"✗ 检测器初始化失败: {e}")
- return False
-
- # 2. 初始化SDK
- sdk_path = os.path.join(
- self.config.get('sdk_path', SDK_PATH['lib_path']),
- self.config.get('netsdk', SDK_PATH['netsdk'])
- )
- try:
- self.sdk = DahuaSDK(sdk_path)
- if not self.sdk.init():
- logger.error("✗ SDK初始化失败")
- return False
- logger.info("✓ SDK初始化成功")
- except Exception as e:
- logger.error(f"✗ SDK加载失败: {e}")
- return False
-
- # 3. 初始化摄像头
- panorama_config = self.config.get('panorama_camera', PANORAMA_CAMERA)
- self.panorama_camera = PanoramaCamera(self.sdk, panorama_config)
-
- ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
- self.ptz_camera = PTZCamera(self.sdk, ptz_config)
-
- # 4. 初始化联动控制器
- self.coordinator = SimpleCoordinator(
- self.panorama_camera,
- self.ptz_camera,
- self.detector
- )
-
- logger.info("=" * 50)
- logger.info("系统初始化完成")
- logger.info("=" * 50)
-
- return True
-
- def start(self) -> bool:
- """启动系统"""
- if self.running:
- return True
-
- if not self.coordinator.start():
- logger.error("启动联动系统失败")
- return False
-
- self.running = True
- logger.info("系统已启动")
- return True
-
- def stop(self):
- """停止系统"""
- if not self.running:
- return
-
- self.running = False
- self.coordinator.stop()
-
- if self.sdk:
- self.sdk.cleanup()
-
- logger.info("系统已停止")
-
- def get_stats(self):
- """获取统计信息"""
- if self.coordinator:
- return self.coordinator.get_stats()
- return {}
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(
- description='简化版双摄像头联动抓拍系统',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
- python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101 --model-size m
- """
- )
-
- parser.add_argument('--panorama-ip', type=str, required=True,
- help='全景摄像头IP')
- parser.add_argument('--ptz-ip', type=str, required=True,
- help='球机IP')
- parser.add_argument('--username', type=str, default='admin',
- help='用户名 (默认: admin)')
- parser.add_argument('--password', type=str, default='admin123',
- help='密码 (默认: admin123)')
- parser.add_argument('--model', type=str,
- help='检测模型路径')
- parser.add_argument('--model-size', type=str, default='n',
- choices=['n', 's', 'm', 'l', 'x'],
- help='YOLO11模型尺寸 (默认: n)')
- parser.add_argument('--no-gpu', action='store_true',
- help='不使用GPU')
- parser.add_argument('--detection-interval', type=float, default=1.0,
- help='检测间隔秒数 (默认: 1.0)')
- parser.add_argument('--ptz-wait', type=float, default=0.5,
- help='PTZ移动后等待时间 (默认: 0.5)')
- parser.add_argument('--min-area', type=int, default=2000,
- help='最小人体面积像素 (默认: 2000)')
- parser.add_argument('--confidence', type=float, default=0.7,
- help='置信度阈值 (默认: 0.7)')
- parser.add_argument('--zoom', type=int, default=8,
- help='默认变倍 (默认: 8)')
-
- args = parser.parse_args()
-
- # 构建配置
- config = {
- 'panorama_camera': {
- **PANORAMA_CAMERA,
- 'ip': args.panorama_ip,
- 'username': args.username,
- 'password': args.password,
- },
- 'ptz_camera': {
- **PTZ_CAMERA,
- 'ip': args.ptz_ip,
- 'username': args.username,
- 'password': args.password,
- },
- 'model_path': args.model,
- 'model_size': args.model_size,
- 'use_gpu': not args.no_gpu,
- }
-
- # 创建系统
- system = SimpleSystem(config)
-
- # 设置信号处理
- def signal_handler(sig, frame):
- logger.info("\n接收到停止信号,正在退出...")
- system.stop()
- sys.exit(0)
-
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
-
- try:
- # 初始化
- if not system.initialize():
- logger.error("系统初始化失败!")
- return 1
-
- # 启动
- if not system.start():
- logger.error("系统启动失败!")
- return 1
-
- # 打印运行状态
- print("\n" + "=" * 50)
- print("系统运行中,按 Ctrl+C 停止")
- print("=" * 50)
- print(f"全景摄像头: {args.panorama_ip}")
- print(f"球机: {args.ptz_ip}")
- print(f"检测间隔: {args.detection_interval}秒")
- print(f"置信度阈值: {args.confidence}")
- print(f"最小人体面积: {args.min_area}像素²")
- print(f"默认变倍: {args.zoom}")
- print("=" * 50 + "\n")
-
- # 运行循环
- last_stats_time = time.time()
- while system.running:
- time.sleep(1)
-
- # 每30秒打印统计
- if time.time() - last_stats_time >= 30:
- stats = system.get_stats()
- logger.info(f"[统计] 帧数={stats.get('frames_processed', 0)} "
- f"检测人数={stats.get('persons_detected', 0)} "
- f"抓拍次数={stats.get('captures_taken', 0)}")
- last_stats_time = time.time()
-
- except KeyboardInterrupt:
- pass
-
- finally:
- system.stop()
-
- return 0
- if __name__ == '__main__':
- sys.exit(main() or 0)
|