Browse Source

feat(coordinator): 增强PTZ控制逻辑并添加详细日志

添加PTZ控制相关配置参数和冷却机制
实现详细的检测和PTZ操作日志记录
优化目标跟踪状态切换逻辑
wenhongquan 3 ngày trước cách đây
mục cha
commit
00501a6b2e
2 tập tin đã thay đổi với 124 bổ sung27 xóa
  1. 20 19
      dual_camera_system/config/coordinator.py
  2. 104 8
      dual_camera_system/coordinator.py

+ 20 - 19
dual_camera_system/config/coordinator.py

@@ -2,28 +2,29 @@
 联动控制配置
 """
 
-# 联动配置
 COORDINATOR_CONFIG = {
-    'tracking_timeout': 5.0,         # 跟踪超时时间(秒)
-    'min_person_size': 50,           # 最小人体尺寸(像素)
-    'max_tracking_targets': 3,       # 最大同时跟踪目标数
+    'tracking_timeout': 5.0,
+    'min_person_size': 50,
+    'max_tracking_targets': 3,
+    'detection_interval': 1.0,
+    'ptz_command_cooldown': 0.5,
+    'ptz_position_threshold': 0.03,
 }
 
-# 校准配置
 CALIBRATION_CONFIG = {
-    'interval': 24 * 60 * 60,        # 校准间隔(秒) - 1天
-    'quick_mode': True,              # 快速校准模式
-    'auto_save': True,               # 自动保存校准结果
-    'min_valid_points': 4,            # 最少有效校准点数
-    'rms_error_threshold': 5.0,       # RMS误差阈值(度)
+    'interval': 24 * 60 * 60,
+    'quick_mode': True,
+    'auto_save': True,
+    'min_valid_points': 4,
+    'rms_error_threshold': 5.0,
     'overlap_discovery': {
-        'pan_range': (0, 360),        # 球机水平扫描范围(度) - 完整360度扫描
-        'tilt_range': (-30, 30),      # 球机垂直扫描范围(度)
-        'pan_step': 20,               # 水平扫描步进(度)
-        'tilt_step': 15,              # 垂直扫描步进(度)
-        'min_match_threshold': 8,     # 最少特征匹配数(判定重叠)
-        'stabilize_time': 2.0,        # 球机稳定等待时间(秒)
-        'max_overlap_ranges': 3,      # 最多保留的重叠区间数
-        'min_positions_per_range': 3, # 每个重叠区间最少扫描位置数
+        'pan_range': (0, 360),
+        'tilt_range': (-30, 30),
+        'pan_step': 20,
+        'tilt_step': 15,
+        'min_match_threshold': 8,
+        'stabilize_time': 2.0,
+        'max_overlap_ranges': 3,
+        'min_positions_per_range': 3,
     },
-}
+}

+ 104 - 8
dual_camera_system/coordinator.py

@@ -6,6 +6,7 @@
 import time
 import threading
 import queue
+import logging
 from typing import Optional, List, Dict, Tuple, Callable
 from dataclasses import dataclass, field
 from enum import Enum
@@ -17,6 +18,8 @@ from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, Detec
 from ptz_camera import PTZCamera, PTZController
 from ocr_recognizer import NumberDetector, PersonInfo
 
+logger = logging.getLogger(__name__)
+
 
 class TrackingState(Enum):
     """跟踪状态"""
@@ -102,7 +105,7 @@ class Coordinator:
         
         # PTZ优化 - 避免频繁发送相同位置的命令
         self.last_ptz_position = None
-        self.ptz_position_threshold = 0.02  # 位置变化阈值 (2%)
+        self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
         
         # 结果队列
         self.result_queue = queue.Queue()
@@ -225,7 +228,7 @@ class Coordinator:
     def _coordinator_worker(self):
         """联动工作线程"""
         last_detection_time = 0
-        detection_interval = 0.1  # 检测间隔
+        detection_interval = self.config.get('detection_interval', 1.0)
         
         # 初始化统计
         with self.stats_lock:
@@ -656,13 +659,20 @@ class AsyncCoordinator(Coordinator):
         print("异步联动系统已停止")
     
     def _detection_worker(self):
-        """检测线程:持续读帧 + 推理 + 发送PTZ命令"""
+        """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
         last_detection_time = 0
-        detection_interval = 0.1
+        detection_interval = self.config.get('detection_interval', 1.0)
+        ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
+        ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
+        frame_count = 0
+        last_log_time = time.time()
+        log_interval = 5.0  # 每5秒打印一次帧率统计
         
         with self.stats_lock:
             self.stats['start_time'] = time.time()
         
+        logger.info(f"[检测线程] 启动, 检测间隔={detection_interval}s, PTZ冷却={ptz_cooldown}s")
+        
         while self.running:
             try:
                 current_time = time.time()
@@ -671,32 +681,64 @@ class AsyncCoordinator(Coordinator):
                     time.sleep(0.01)
                     continue
                 
+                frame_count += 1
                 self._update_stats('frames_processed')
                 frame_size = (frame.shape[1], frame.shape[0])
                 
+                # 每隔 log_interval 打印帧率统计
+                if current_time - last_log_time >= log_interval:
+                    elapsed = current_time - last_log_time
+                    fps = frame_count / elapsed if elapsed > 0 else 0
+                    logger.info(f"[检测线程] 帧率={fps:.1f}fps, 处理帧={frame_count}")
+                    frame_count = 0
+                    last_log_time = current_time
+                
+                # 周期性检测(约1次/秒)
                 if current_time - last_detection_time >= detection_interval:
                     last_detection_time = current_time
                     
+                    # YOLO 人体检测
                     detections = self._detect_persons(frame)
+                    
                     if detections:
                         self._update_stats('persons_detected', len(detections))
                     
+                    # 更新跟踪
                     tracked = self.tracker.update(detections)
                     self._update_tracking_targets(tracked, frame_size)
                     
+                    # 打印检测日志
+                    if tracked:
+                        for t in tracked:
+                            x_ratio, y_ratio = t.position
+                            logger.info(
+                                f"[检测] 目标ID={t.track_id} "
+                                f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
+                                f"置信度={getattr(t, 'confidence', 0):.2f}"
+                            )
+                    elif detections:
+                        # 有检测但没跟踪上
+                        for d in detections:
+                            logger.debug(f"[检测] 未跟踪: {d.class_name} @ {d.center}")
+                    
                     if tracked:
                         self._process_detections(tracked, frame, frame_size)
                     
                     # 选择跟踪目标并发送PTZ命令
                     target = self._select_tracking_target()
                     if target and self.enable_ptz_tracking and self.enable_ptz_camera:
-                        self._send_ptz_command(target, frame_size)
+                        self._send_ptz_command_with_log(target, frame_size)
+                    elif not tracked and self.current_target:
+                        # 目标消失,切回IDLE
+                        self._set_state(TrackingState.IDLE)
+                        logger.info("[检测] 目标丢失,球机进入IDLE状态")
+                        self.current_target = None
                 
                 self._cleanup_expired_targets()
                 time.sleep(0.01)
                 
             except Exception as e:
-                print(f"检测线程错误: {e}")
+                logger.error(f"检测线程错误: {e}")
                 time.sleep(0.1)
     
     def _ptz_worker(self):
@@ -757,6 +799,55 @@ class AsyncCoordinator(Coordinator):
         except queue.Full:
             pass  # 丢弃命令,下一个检测周期会重发
     
+    def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
+        """发送PTZ命令并打印日志"""
+        x_ratio, y_ratio = target.position
+        
+        # 计算PTZ角度(用于日志)
+        if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
+            pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
+            zoom = self.ptz.ptz_config.get('default_zoom', 8)
+            coord_type = "校准坐标"
+        else:
+            pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
+            coord_type = "估算坐标"
+        
+        logger.info(
+            f"[PTZ] 发送命令: 目标ID={target.track_id} "
+            f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
+            f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
+        )
+        
+        # 检查位置变化是否超过阈值
+        ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
+        if self.last_ptz_position is not None:
+            last_x, last_y = self.last_ptz_position
+            dx = abs(x_ratio - last_x)
+            dy = abs(y_ratio - last_y)
+            if dx < ptz_threshold and dy < ptz_threshold:
+                logger.debug(f"[PTZ] 位置变化太小(dx={dx:.4f}, dy={dy:.4f}),跳过")
+                return
+        
+        # 冷却检查
+        current_time = time.time()
+        ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
+        if current_time - self._last_ptz_time < ptz_cooldown:
+            logger.debug(f"[PTZ] 冷却中,跳过 (间隔={current_time - self._last_ptz_time:.2f}s < {ptz_cooldown}s)")
+            return
+        
+        cmd = PTZCommand(
+            pan=0, tilt=0, zoom=0,
+            x_ratio=x_ratio, y_ratio=y_ratio,
+            use_calibration=self.enable_calibration
+        )
+        
+        try:
+            self._ptz_queue.put_nowait(cmd)
+            self.last_ptz_position = (x_ratio, y_ratio)
+            self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
+        except queue.Full:
+            logger.warning("[PTZ] 命令队列满,丢弃本次命令")
+    
     def _execute_ptz_command(self, cmd: PTZCommand):
         """执行PTZ命令(在PTZ线程中)"""
         self._last_ptz_time = time.time()
@@ -770,14 +861,19 @@ class AsyncCoordinator(Coordinator):
             pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
         
         self._set_state(TrackingState.TRACKING)
+        logger.info(
+            f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
+            f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}))"
+        )
+        
         success = self.ptz.goto_exact_position(pan, tilt, zoom)
         
         if success:
-            # PTZ位置确认:等待球机稳定后读取球机帧验证
             time.sleep(self.PTZ_CONFIRM_WAIT)
             self._confirm_ptz_position(cmd.x_ratio, cmd.y_ratio)
+            logger.info(f"[PTZ] 到位确认完成: pan={pan:.1f}° tilt={tilt:.1f}°")
         else:
-            print(f"[AsyncCoordinator] PTZ命令执行失败: pan={pan:.1f}, tilt={tilt:.1f}")
+            logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
     
     def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
         """PTZ位置确认:读取球机帧验证目标是否可见"""