| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033 |
- """
- 双摄像头联动抓拍系统 - 主程序
- 系统功能:
- 1. 全景摄像头实时监控和物体检测
- 2. 检测到人体后,球机自动变焦定位
- """
- # 必须在import cv2之前设置,否则FFmpeg多线程解码会导致
- # "Assertion fctx->async_lock failed at pthread_frame.c:167" 崩溃
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
- import sys
- import time
- import glob
- import argparse
- import logging
- import threading
- import signal
- from typing import Optional
- import cv2
- import numpy as np
- # 添加项目路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from config import (
- PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH,
- DETECTION_CONFIG, PTZ_CONFIG, COORDINATOR_CONFIG,
- CALIBRATION_CONFIG, LOG_CONFIG, SYSTEM_CONFIG,
- CAMERA_GROUPS, get_enabled_groups
- )
- from dahua_sdk import DahuaSDK
- from panorama_camera import PanoramaCamera, ObjectDetector, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from coordinator import Coordinator, EventDrivenCoordinator, AsyncCoordinator, SequentialCoordinator
- # 配置日志 - 使用LOG_CONFIG
- def _cleanup_old_logs(log_file: str, retention_days: int):
- """清理超过保留天数的日志文件"""
- import glob
- if not log_file:
- return
- log_dir = os.path.dirname(log_file) or '.'
- log_basename = os.path.basename(log_file)
- # 匹配所有轮转的日志文件:app.log, app.log.1, app.log.2, ...
- patterns = [
- log_basename,
- f"{log_basename}.*",
- f"{os.path.splitext(log_basename)[0]}.*", # 处理没有扩展名的情况
- ]
- now = time.time()
- cutoff = now - (retention_days * 86400)
- for pattern in patterns:
- full_pattern = os.path.join(log_dir, pattern)
- for log_path in glob.glob(full_pattern):
- try:
- if os.path.isfile(log_path):
- mtime = os.path.getmtime(log_path)
- if mtime < cutoff:
- os.remove(log_path)
- print(f"[日志清理] 已删除过期日志: {log_path}")
- except Exception as e:
- pass # 忽略删除失败的日志文件
- def _log_cleanup_worker(retention_days: int, interval_hours: int = 6):
- """日志清理后台线程"""
- log_file = LOG_CONFIG.get('file')
- if not log_file:
- return
- while True:
- _cleanup_old_logs(log_file, retention_days)
- time.sleep(interval_hours * 3600)
- def setup_logging():
- """设置日志配置"""
- log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO)
- log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- log_file = LOG_CONFIG.get('file')
- retention_days = LOG_CONFIG.get('retention_days', 7)
- handlers = [logging.StreamHandler()]
- if log_file:
- # 确保日志目录存在
- log_dir = os.path.dirname(log_file)
- if log_dir:
- os.makedirs(log_dir, exist_ok=True)
- from logging.handlers import RotatingFileHandler
- file_handler = RotatingFileHandler(
- log_file,
- maxBytes=LOG_CONFIG.get('max_bytes', 10*1024*1024),
- backupCount=LOG_CONFIG.get('backup_count', 5)
- )
- file_handler.setFormatter(logging.Formatter(log_format))
- handlers.append(file_handler)
- # 启动日志清理后台线程
- cleanup_thread = threading.Thread(
- target=_log_cleanup_worker,
- args=(retention_days, 6),
- daemon=True
- )
- cleanup_thread.start()
- logging.basicConfig(
- level=log_level,
- format=log_format,
- handlers=handlers
- )
- setup_logging()
- logger = logging.getLogger(__name__)
- # 全局停止标志(用于信号处理)
- _shutdown_requested = False
- def _signal_handler(signum, frame):
- """信号处理函数"""
- global _shutdown_requested
- sig_name = signal.Signals(signum).name
- logger.info(f"接收到信号 {sig_name},准备优雅退出...")
- print(f"\n[信号] 接收到 {sig_name},准备停止...")
- _shutdown_requested = True
- class DualCameraSystem:
- """
- 双摄像头联动抓拍系统
- """
-
- def __init__(self, config_override: dict = None):
- """
- 初始化系统
- Args:
- config_override: 配置覆盖
- """
- self.config = config_override or {}
-
- # SDK
- self.sdk = None
-
- # 摄像头
- self.panorama_camera = None
- self.ptz_camera = None
-
- # 检测器
-
- # 联动控制器
- self.coordinator = None
-
- # 校准器
- self.calibrator = None
- self.calibration_manager = None
-
- # 定时校准
- self.calibration_interval = CALIBRATION_CONFIG.get('interval', 24 * 60 * 60) # 默认24小时
- self.daily_calibration_time = CALIBRATION_CONFIG.get('daily_calibration_time', '08:00') # 每日校准时间
- self.force_daily_recalibration = CALIBRATION_CONFIG.get('force_daily_recalibration', True) # 强制重新校准
- self.calibration_thread = None
- self.calibration_running = False
- self.last_calibration_time = 0
-
- # 运行标志
- self.running = False
- # 检测暂停状态(校准时暂停检测)
- self._detection_was_running = False
- # Captures 目录清理配置
- self.captures_cleanup_enabled = False # 已禁用 captures,不再需要定时清理
- self.captures_dir = '/home/admin/dsh/captures'
- self.captures_cleanup_hour = 3 # 每天凌晨3点清理
-
- def initialize(self, skip_calibration: bool = False) -> bool:
- """
- 初始化系统组件
- Args:
- skip_calibration: 是否跳过校准
- Returns:
- 是否成功
- """
- logger.info("初始化双摄像头联动系统...")
-
- # 先初始化检测器(YOLO/PyTorch),再加载大华SDK
- # 大华SDK与PyTorch共享进程空间时可能导致内存冲突,
- # 先加载PyTorch可避免SDK的内存映射覆盖PyTorch运行时
-
- # 初始化检测器 (YOLO11/RKNN/ONNX)
- try:
- from config import DETECTION_CONFIG
- self.detector = ObjectDetector(
- model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')),
- use_gpu=self.config.get('use_gpu', DETECTION_CONFIG.get('use_gpu', True)),
- model_size=self.config.get('model_size', 'n'),
- model_type=self.config.get('model_type', DETECTION_CONFIG.get('model_type', 'auto'))
- )
- logger.info("检测器初始化成功")
- except Exception as e:
- logger.warning(f"检测器初始化失败: {e}")
-
- # 初始化SDK(在检测器之后,避免SDK内存映射与PyTorch冲突)
- sdk_path = os.path.join(
- self.config.get('sdk_path', SDK_PATH['lib_path']),
- self.config.get('netsdk', SDK_PATH['netsdk'])
- )
-
- try:
- self.sdk = DahuaSDK(sdk_path)
- if not self.sdk.init():
- logger.error("SDK初始化失败")
- return False
- logger.info("SDK初始化成功")
- except Exception as e:
- logger.error(f"SDK加载失败: {e}")
- return False
-
- # 初始化摄像头
- panorama_config = self.config.get('panorama_camera', PANORAMA_CAMERA)
- self.panorama_camera = PanoramaCamera(self.sdk, panorama_config)
-
- ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
- self.ptz_camera = PTZCamera(self.sdk, ptz_config)
-
- # 初始化联动控制器
- # 根据配置选择顺序模式或异步模式
- sequential_mode = COORDINATOR_CONFIG.get('sequential_mode', {}).get('enabled', False)
-
- if sequential_mode:
- logger.info("使用顺序联动控制器 (SequentialCoordinator)")
- self.coordinator = SequentialCoordinator(
- self.panorama_camera,
- self.ptz_camera,
- self.detector
- )
- # 应用顺序模式配置
- seq_config = COORDINATOR_CONFIG.get('sequential_mode', {})
- self.coordinator.set_capture_config(
- ptz_stabilize_time=seq_config.get('ptz_stabilize_time', 1.0),
- capture_wait_time=seq_config.get('capture_wait_time', 0.5),
- return_to_panorama=seq_config.get('return_to_panorama', True),
- default_pan=seq_config.get('default_pan', 0.0),
- default_tilt=seq_config.get('default_tilt', 0.0),
- default_zoom=seq_config.get('default_zoom', 1)
- )
- else:
- logger.info("使用异步联动控制器 (AsyncCoordinator)")
- self.coordinator = AsyncCoordinator(
- self.panorama_camera,
- self.ptz_camera,
- self.detector
- )
-
- # 设置回调
- self._setup_callbacks()
-
- logger.info("系统初始化完成")
-
- # 执行自动校准(受配置开关和参数双重控制)
- should_calibrate = SYSTEM_CONFIG.get('enable_calibration', True)
- if not should_calibrate:
- logger.info("自动校准已禁用 (enable_calibration=False)")
- self._load_existing_calibration()
- elif skip_calibration:
- logger.info("自动校准已跳过 (--skip-calibration)")
- self._load_existing_calibration()
- else:
- if not self._auto_calibrate():
- logger.error("自动校准失败!")
- return False
-
- return True
- def _load_existing_calibration(self):
- """加载已有的校准数据并传递给联动控制器(跳过校准时使用)"""
- try:
- from calibration import CameraCalibrator, CalibrationManager
- from config import CALIBRATION_CONFIG
- calibrator = CameraCalibrator(
- ptz_camera=self.ptz_camera,
- get_frame_func=self.panorama_camera.get_frame,
- ptz_capture_func=self._capture_ptz_frame
- )
- calibration_file = CALIBRATION_CONFIG.get('calibration_file', '/home/admin/dsh/calibration.json')
- if calibrator.load_calibration(calibration_file):
- logger.info(f"已加载已有校准数据: {calibration_file}")
- if self.coordinator and calibrator.is_calibrated():
- self.coordinator.set_calibrator(calibrator)
- logger.info("校准器已传递给联动控制器")
- self.calibrator = calibrator
- else:
- logger.warning("无已有校准数据,跟踪将使用默认线性映射")
- except Exception as e:
- logger.warning(f"加载校准数据失败: {e},跟踪将使用默认线性映射")
- def _pause_detection(self):
- """暂停检测线程(校准时使用,避免检测与校准争抢PTZ控制权)"""
- if self.coordinator is None:
- return
- if hasattr(self.coordinator, 'pause_detection') and self.coordinator.running:
- self._detection_was_running = True
- self.coordinator.pause_detection()
- logger.info("已暂停检测,校准期间不执行检测")
- else:
- self._detection_was_running = False
- def _resume_detection(self):
- """恢复检测线程(校准完成后恢复)"""
- if self.coordinator is None:
- return
- if self._detection_was_running and hasattr(self.coordinator, 'resume_detection'):
- self.coordinator.resume_detection()
- logger.info("已恢复检测")
- def _auto_calibrate(self, force: bool = False) -> bool:
- """
- 执行自动校准
- 校准期间会暂停检测线程,避免检测与校准争抢PTZ控制权。
- 校准完成(或失败)后恢复检测。
- Args:
- force: 是否强制重新校准(不使用已有数据)
- Returns:
- 是否成功
- """
- # 校准前暂停检测,避免检测线程与校准争抢PTZ
- self._pause_detection()
- try:
- return self._do_auto_calibrate(force)
- finally:
- # 校准完成(无论成功失败)后恢复检测
- self._resume_detection()
- def _do_auto_calibrate(self, force: bool = False) -> bool:
- """自动校准的实际执行逻辑"""
- from calibration import CameraCalibrator, CalibrationManager
- logger.info("=" * 50)
- logger.info("开始自动校准...")
- logger.info("=" * 50)
-
- # 连接摄像头
- if not self.panorama_camera.connect():
- logger.error("连接全景摄像头失败,无法进行校准")
- return False
-
- if not self.ptz_camera.connect():
- logger.error("连接球机失败,无法进行校准")
- self.panorama_camera.disconnect()
- return False
-
- # 启动视频流获取帧数据
- if not self.panorama_camera.start_stream_rtsp():
- logger.warning("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama_camera.start_stream():
- logger.error("无法启动视频流,校准可能无法获取画面")
-
- # 启动球机视频流(校准需要球机画面做特征匹配)
- if not self.ptz_camera.start_stream_rtsp():
- logger.warning("球机RTSP视频流启动失败,校准将无法进行特征匹配")
-
- # 等待视频流稳定,确保帧数据可用
- logger.info("等待视频流稳定...")
- max_wait = 15
- for i in range(max_wait):
- pan_frame = self.panorama_camera.get_frame()
- ptz_frame = self.ptz_camera.get_frame() if self.ptz_camera else None
-
- if pan_frame is not None:
- logger.info(f"全景帧就绪 ({pan_frame.shape}), 等待中...")
- break
- time.sleep(1)
- if (i + 1) % 3 == 0:
- logger.info(f"等待全景帧... ({i + 1}/{max_wait}秒)")
-
- # 再等几秒让流完全稳定
- ptz_ready = False
- for i in range(10):
- ptz_frame = self.ptz_camera.get_frame() if self.ptz_camera else None
- if ptz_frame is not None:
- ptz_ready = True
- logger.info(f"球机帧就绪 ({ptz_frame.shape})")
- break
- time.sleep(1)
-
- if not ptz_ready:
- logger.warning("球机帧未就绪,校准可能仅依赖运动检测")
-
- final_frame = self.panorama_camera.get_frame()
- if final_frame is None:
- logger.error("5秒后仍无法获取全景帧,校准可能失败")
-
- # 创建校准器 - 支持视野重叠发现
- self.calibrator = CameraCalibrator(
- ptz_camera=self.ptz_camera,
- get_frame_func=self.panorama_camera.get_frame,
- detect_marker_func=None,
- ptz_capture_func=self._capture_ptz_frame
- )
-
- # 配置重叠发现参数
- overlap_cfg = CALIBRATION_CONFIG.get('overlap_discovery', {})
- self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 360))
- self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-20, 50))
- self.calibrator.overlap_pan_step = overlap_cfg.get('pan_step', 20)
- self.calibrator.overlap_tilt_step = overlap_cfg.get('tilt_step', 15)
- self.calibrator.stabilize_time = overlap_cfg.get('stabilize_time', 2.0)
- self.calibrator.max_overlap_ranges = overlap_cfg.get('max_overlap_ranges', 3)
- self.calibrator.min_positions_per_range = overlap_cfg.get('min_positions_per_range', 3)
-
- # 校准进度回调
- def on_progress(current: int, total: int, message: str):
- logger.info(f"校准进度: {current}/{total} - {message}")
-
- # 校准完成回调
- def on_complete(result):
- if result.success:
- logger.info(f"校准完成! RMS误差: {result.rms_error:.4f}")
- else:
- logger.error(f"校准失败: {result.error_message}")
-
- self.calibrator.on_progress = on_progress
- self.calibrator.on_complete = on_complete
-
- # 创建校准管理器
- self.calibration_manager = CalibrationManager(self.calibrator)
-
- # 执行视觉校准(根据参数决定是否强制重新校准)
- result = self.calibration_manager.auto_calibrate(
- force=force,
- fallback_on_failure=True # 校准失败时回退使用已有数据
- )
-
- if not result.success:
- logger.error("=" * 50)
- logger.error("校准失败!")
- logger.error(f"原因: {result.error_message}")
- logger.error("=" * 50)
- logger.error("")
- logger.error("请检查以下问题:")
- logger.error("1. 全景摄像头和球机是否正确连接")
- logger.error("2. 球机PTZ控制是否正常")
- logger.error("3. 两台摄像头的视野是否有重叠区域")
- logger.error("4. 场景是否有足够的纹理/特征用于匹配")
- logger.error("5. 球机RTSP视频流是否正常(特征匹配需要球机画面)")
- logger.error("")
- logger.error("您可以尝试:")
- logger.error("- 检查摄像头连接和网络")
- logger.error("- 手动移动球机确认PTZ控制正常")
- logger.error("- 确保场景有足够的纹理/特征")
- logger.error("- 使用 --skip-calibration 跳过校准")
- logger.error("=" * 50)
-
- # 释放资源
- self.panorama_camera.disconnect()
- self.ptz_camera.stop_stream()
- self.ptz_camera.disconnect()
-
- return False
-
- logger.info("=" * 50)
- logger.info("校准成功!")
- logger.info(f"有效校准点: {len(result.points)}")
- logger.info(f"RMS误差: {result.rms_error:.4f} 度")
- logger.info("=" * 50)
- # 将校准器传递给联动控制器,使其使用校准查找表而非线性映射
- if self.coordinator and self.calibrator and self.calibrator.is_calibrated():
- self.coordinator.set_calibrator(self.calibrator)
- logger.info("校准器已传递给联动控制器,跟踪将使用校准查找表")
- return True
-
- def _capture_ptz_frame(self) -> Optional[np.ndarray]:
- """
- 从球机抓拍一帧图像
- 用于校准时特征匹配
- """
- if self.ptz_camera is None:
- return None
- try:
- return self.ptz_camera.get_frame()
- except Exception as e:
- logger.error(f"球机抓拍失败: {e}")
- return None
-
- def _setup_callbacks(self):
- """设置回调函数"""
- def on_person_detected(person: DetectedObject, frame: np.ndarray):
- """人体检测回调"""
- logger.info(f"检测到人体: 位置={person.center}, 置信度={person.confidence:.2f}")
- self.coordinator.on_person_detected = on_person_detected
-
- def start(self) -> bool:
- """
- 启动系统
- Returns:
- 是否成功
- """
- if self.running:
- logger.warning("系统已在运行")
- return True
-
- logger.info("启动联动系统...")
-
- if not self.coordinator.start():
- logger.error("联动系统启动失败")
- return False
-
- self.running = True
- logger.info("联动系统启动成功")
- # 启动定时校准
- self._start_periodic_calibration()
- # 启动 captures 目录定时清理
- self._start_captures_cleanup()
- return True
-
- def stop(self):
- """停止系统"""
- if not self.running:
- return
-
- logger.info("停止联动系统...")
-
- # 停止定时校准
- self._stop_periodic_calibration()
-
- self.coordinator.stop()
- self.running = False
- logger.info("联动系统已停止")
-
- def get_results(self):
- """获取识别结果"""
- if self.coordinator:
- return self.coordinator.get_results()
- return []
-
- def _start_periodic_calibration(self):
- """启动定时校准"""
- if not SYSTEM_CONFIG.get('enable_calibration', True):
- logger.info("定时校准已禁用 (enable_calibration=False)")
- return
-
- if self.calibration_running:
- return
-
- self.calibration_running = True
- self.calibration_thread = threading.Thread(
- target=self._periodic_calibration_worker,
- daemon=True
- )
- self.calibration_thread.start()
- logger.info(f"定时校准已启动 (每日 {self.daily_calibration_time} 自动校准)")
-
- def _stop_periodic_calibration(self):
- """停止定时校准"""
- self.calibration_running = False
- if self.calibration_thread:
- self.calibration_thread.join(timeout=2)
- self.calibration_thread = None
- logger.info("定时校准已停止")
- def _start_captures_cleanup(self):
- """启动 captures 目录定时清理"""
- if not self.captures_cleanup_enabled:
- return
- def cleanup_worker():
- while self.running:
- try:
- from datetime import datetime
- now = datetime.now()
- # 计算到凌晨清理时间的秒数
- target_hour = self.captures_cleanup_hour
- target_time = now.replace(hour=target_hour, minute=0, second=0, microsecond=0)
- if now.hour >= target_hour:
- target_time = target_time.replace(day=now.day + 1)
- wait_seconds = int((target_time - now).total_seconds())
- logger.info(f"[Captures清理] 下次清理时间: {target_time.strftime('%Y-%m-%d %H:%M')} (等待 {wait_seconds // 3600} 小时)")
- # 每小时检查一次是否到达清理时间
- for _ in range(3600):
- if not self.running:
- return
- time.sleep(1)
- # 检查是否到达清理时间(每小时的0分)
- current = datetime.now()
- if current.hour == target_hour and current.minute == 0:
- self._cleanup_captures()
- break
- except Exception as e:
- logger.error(f"[Captures清理] 清理任务异常: {e}")
- time.sleep(60)
- cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
- cleanup_thread.start()
- logger.info(f"[Captures清理] 定时清理已启��� (每日 {self.captures_cleanup_hour}:00)")
- def _cleanup_captures(self):
- """清理 captures 目录"""
- try:
- import os
- if not os.path.exists(self.captures_dir):
- return
- files = [f for f in os.listdir(self.captures_dir) if f.startswith('capture_')]
- if not files:
- logger.info("[Captures清理] 目录为空,无需清理")
- return
- deleted = 0
- for filename in files:
- filepath = os.path.join(self.captures_dir, filename)
- try:
- os.remove(filepath)
- deleted += 1
- except Exception as e:
- logger.warning(f"[Captures清理] 删除失败: {filepath}, {e}")
- logger.info(f"[Captures清理] 已清理 {deleted} 个文件")
- except Exception as e:
- logger.error(f"[Captures清理] 清理失败: {e}")
-
- def _get_seconds_until_target_time(self, target_time_str: str) -> int:
- """
- 计算到目标时间的秒数
-
- Args:
- target_time_str: 目标时间字符串 (HH:MM格式)
-
- Returns:
- 到目标时间的秒数,如果已过今天的目标时间则返回到明天目标时间的秒数
- """
- from datetime import datetime, timedelta
-
- now = datetime.now()
- target_hour, target_minute = map(int, target_time_str.split(':'))
- target_time = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
-
- # 如果已过今天的目标时间,则计算到明天的目标时间
- if now >= target_time:
- target_time += timedelta(days=1)
-
- return int((target_time - now).total_seconds())
-
- def _periodic_calibration_worker(self):
- """定时校准工作线程 - 每日指定时间执行校准"""
- from datetime import datetime
-
- while self.calibration_running:
- try:
- # 计算到下一个校准时间的等待秒数
- wait_seconds = self._get_seconds_until_target_time(self.daily_calibration_time)
- next_time = datetime.now().replace(
- hour=int(self.daily_calibration_time.split(':')[0]),
- minute=int(self.daily_calibration_time.split(':')[1])
- )
- if datetime.now() >= next_time:
- from datetime import timedelta
- next_time += timedelta(days=1)
-
- logger.info(f"下次校准时间: {next_time.strftime('%Y-%m-%d %H:%M:%S')} (等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟)")
-
- # 等待到校准时间,每分钟检查一次是否需要停止
- for i in range(wait_seconds):
- if not self.calibration_running:
- return
- time.sleep(1)
-
- if not self.calibration_running:
- return
-
- # 执行校准
- logger.info("=" * 50)
- logger.info(f"执行每日定时校准 (时间: {self.daily_calibration_time})...")
- logger.info("=" * 50)
-
- # 每日校准强制重新校准(不使用已有数据),失败时可回退
- result = self._auto_calibrate(force=self.force_daily_recalibration)
-
- if result:
- logger.info("每日定时校准成功!")
- else:
- logger.warning("每日定时校准失败!")
-
- except Exception as e:
- logger.error(f"定时校准错误: {e}")
- time.sleep(60) # 出错后等待1分钟再重试
-
- def manual_calibrate(self) -> bool:
- """
- 手动触发校准
- Returns:
- 是否成功
- """
- logger.info("手动触发校准...")
- return self._auto_calibrate()
-
- def cleanup(self):
- """清理资源"""
- self.stop()
-
- # 确保定时校准停止
- self._stop_periodic_calibration()
-
- if self.sdk:
- self.sdk.cleanup()
-
- logger.info("系统资源已清理")
- def run_interactive(system: DualCameraSystem):
- """
- 交互模式运行
- Args:
- system: 系统实例
- """
- print("\n双摄像头联动系统 - 交互模式")
- print("=" * 50)
- print("命令:")
- print(" s - 开始/停止联动")
- print(" r - 获取识别结果")
- print(" t - 手动跟踪 (输入 x y)")
- print(" c - 抓拍快照")
- print(" b - 手动校准")
- print(" q - 退出")
- print("=" * 50)
-
- running = False
-
- while True:
- try:
- cmd = input("\n> ").strip().lower()
-
- if cmd == 'q':
- break
-
- elif cmd == 's':
- if running:
- system.stop()
- running = False
- print("联动已停止")
- else:
- if system.start():
- running = True
- print("联动已启动")
-
- elif cmd == 'r':
- results = system.get_results()
- if results:
- print(f"获取到 {len(results)} 个识别结果")
- else:
- print("暂无识别结果")
-
- elif cmd == 't':
- try:
- coords = input("输入坐标 (x y, 范围0-1): ").strip().split()
- x, y = float(coords[0]), float(coords[1])
- system.coordinator.force_track_position(x, y)
- print(f"已移动到位置 ({x:.2f}, {y:.2f})")
- except Exception as e:
- print(f"输入错误: {e}")
-
- elif cmd == 'c':
- frame = system.coordinator.capture_snapshot()
- if frame is not None:
- filename = f"snapshot_{int(time.time())}.jpg"
- cv2.imwrite(filename, frame)
- print(f"快照已保存: {filename}")
- else:
- print("抓拍失败")
-
- elif cmd == 'b':
- print("开始手动校准...")
- if system.manual_calibrate():
- print("校准成功!")
- else:
- print("校准失败!")
-
- else:
- print("未知命令")
-
- except KeyboardInterrupt:
- break
- except Exception as e:
- print(f"错误: {e}")
-
- print("退出交互模式")
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description='双摄像头联动抓拍系统')
-
- # 多组模式参数
- parser.add_argument('--multi-group', action='store_true', help='启用多组摄像头模式')
-
- # 单组模式参数(兼容旧参数)
- parser.add_argument('--panorama-ip', type=str, help='全景摄像头IP')
- parser.add_argument('--ptz-ip', type=str, help='球机IP')
- parser.add_argument('--username', type=str, default='admin', help='用户名')
- parser.add_argument('--password', type=str, default='admin123', help='密码')
- parser.add_argument('--model', type=str, help='检测模型路径 (默认使用YOLO11n)')
- parser.add_argument('--model-size', type=str, default='n',
- choices=['n', 's', 'm', 'l', 'x'],
- help='YOLO11模型尺寸 (n/s/m/l/x)')
- parser.add_argument('--no-gpu', action='store_true', help='不使用GPU')
- parser.add_argument('--interactive', action='store_true', help='交互模式')
- parser.add_argument('--demo', action='store_true', help='演示模式(不连接实际摄像头)')
- parser.add_argument('--skip-calibration', action='store_true', help='跳过自动校准')
- parser.add_argument('--force-calibration', action='store_true', help='强制重新校准')
-
- args = parser.parse_args()
-
- # 演示模式
- if args.demo:
- print("演示模式: 使用模拟数据")
- run_demo()
- return
-
- # 检查是否启用多组模式
- enabled_groups = get_enabled_groups()
- use_multi_group = args.multi_group or len(enabled_groups) > 1
-
- if use_multi_group:
- # 多组模式
- return run_multi_group_mode(args)
- else:
- # 单组模式(保持向后兼容)
- return run_single_group_mode(args)
- def run_multi_group_mode(args):
- """多组摄像头模式"""
- global _shutdown_requested
- from multi_group_system import MultiGroupSystem
-
- # 注册信号处理
- signal.signal(signal.SIGINT, _signal_handler)
- signal.signal(signal.SIGTERM, _signal_handler)
- _shutdown_requested = False
-
- print("\n" + "=" * 60)
- print("多组摄像头联动抓拍系统")
- print("=" * 60)
-
- enabled_groups = get_enabled_groups()
- print(f"启用的摄像头组: {len(enabled_groups)} 个")
- for g in enabled_groups:
- print(f" - {g.get('name', g.get('group_id'))}")
- print()
-
- # 构建配置
- config = {
- 'model_size': args.model_size,
- 'use_gpu': not args.no_gpu,
- }
-
- if args.model:
- config['model_path'] = args.model
-
- # 创建多组系统
- system = MultiGroupSystem(config)
-
- try:
- # 初始化
- if not system.initialize(skip_calibration=args.skip_calibration):
- print("\n系统初始化失败!")
- return 1
-
- # 启动
- print("\n启动多组联动系统...")
- if not system.start():
- print("启动失败")
- return 1
-
- print(f"\n多组摄像头系统运行中 ({len(system.groups)} 个组)")
- print("按 Ctrl+C 停止\n")
-
- # 等待(检查停止标志)
- while system.running and not _shutdown_requested:
- time.sleep(1)
-
- except KeyboardInterrupt:
- print("\n接收到停止信号")
- finally:
- print("正在停止系统...")
- system.stop()
-
- return 0
- def run_single_group_mode(args):
- """单组摄像头模式(保持向后兼容)"""
- global _shutdown_requested
-
- # 注册信号处理
- signal.signal(signal.SIGINT, _signal_handler)
- signal.signal(signal.SIGTERM, _signal_handler)
- _shutdown_requested = False
-
- # 构建配置
- config = {}
-
- if args.panorama_ip:
- config['panorama_camera'] = {
- **PANORAMA_CAMERA,
- 'ip': args.panorama_ip,
- 'username': args.username,
- 'password': args.password,
- }
-
- if args.ptz_ip:
- config['ptz_camera'] = {
- **PTZ_CAMERA,
- 'ip': args.ptz_ip,
- 'username': args.username,
- 'password': args.password,
- }
-
- if args.model:
- config['model_path'] = args.model
-
- config['model_size'] = args.model_size
- config['use_gpu'] = not args.no_gpu
-
- # 创建系统实例
- system = DualCameraSystem(config)
-
- try:
- # 初始化 (包含自动校准)
- if not system.initialize(skip_calibration=args.skip_calibration):
- print("\n系统初始化失败!")
- if not args.skip_calibration:
- print("提示: 可使用 --skip-calibration 跳过校准")
- return 1
-
- # 运行
- if args.interactive:
- run_interactive(system)
- else:
- # 自动模式
- print("启动联动系统...")
- if not system.start():
- print("启动失败")
- return 1
-
- print("系统运行中,按Ctrl+C停止")
- while not _shutdown_requested:
- time.sleep(1)
- results = system.get_results()
- if results:
- print(f"[识别] 获取到 {len(results)} 个结果")
-
- except KeyboardInterrupt:
- print("\n接收到停止信号")
-
- finally:
- print("正在停止系统...")
- system.cleanup()
-
- return 0
- def run_demo():
- """演示模式"""
- print("\n演示模式 - 双摄像头联动系统")
- print("=" * 60)
- print("""
- 系统架构:
- ┌─────────────────────────────────────────────────────────────┐
- │ 全景摄像头 (Panorama) │
- │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
- │ │ 视频流 │ -> │ 人体检测 │ -> │ 位置计算 │ │
- │ └─────────┘ └─────────┘ └─────────┘ │
- └─────────────────────────────────────────────────────────────┘
- │
- ▼ 检测到人体位置 (x_ratio, y_ratio)
- ┌─────────────────────────────────────────────────────────────┐
- │ 球机 (PTZ Camera) │
- │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
- │ │ PTZ控制 │ -> │ 精确定位 │ -> │ 变焦放大 │ │
- │ └─────────┘ └─────────┘ └─────────┘ │
- └─────────────────────────────────────────────────────────────┘
- 工作流程:
- 1. 全景摄像头实时获取视频流
- 2. 使用YOLO11检测画面中的人体
- 3. 计算人体在画面中的相对位置
- 4. 控制球机PTZ移动到对应位置
- 5. 球机变焦放大人体区域
- 主要组件:
- - dahua_sdk.py: 大华SDK封装
- - panorama_camera.py: 全景摄像头和人体检测
- - ptz_camera.py: 球机PTZ控制
- - coordinator.py: 联动控制逻辑
- """)
-
- print("=" * 60)
- print("\n使用方法:")
- print(" python main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101")
- print(" python main.py --interactive # 交互模式")
- print(" python main.py --demo # 演示说明")
- if __name__ == '__main__':
- sys.exit(main() or 0)
|