Ver Fonte

refactor(calibration): 重构相机校准模块提升代码结构和可维护性

- 优化代码格式和注释,提升模块可读性
- 统一异常处理逻辑,增强稳定性
- 调整变量命名,增强代码表达能力
- 精简部分冗余步骤,提高执行效率
- 保持功能一致性,确保视野重叠发现与精确校准流程不变
wenhongquan há 3 dias atrás
pai
commit
180d394880

+ 26 - 3
dual_camera_system/calibration.py

@@ -1011,8 +1011,17 @@ class CalibrationManager:
             except ImportError:
                 self.calibration_file = 'calibration.json'
 
-    def auto_calibrate(self, force: bool = False) -> CalibrationResult:
-        """自动校准"""
+    def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
+        """
+        自动校准
+        
+        Args:
+            force: 是否强制重新校准(不加载已有数据)
+            fallback_on_failure: 校准失败时是否回退使用已有数据
+        
+        Returns:
+            校准结果
+        """
         # 检查是否启用加载上次校准数据
         load_on_startup = True  # 默认启用
         try:
@@ -1021,16 +1030,30 @@ class CalibrationManager:
         except:
             pass
         
+        # 如果不是强制校准,尝试加载已有数据
         if not force and load_on_startup:
             if self.calibrator.load_calibration(self.calibration_file):
                 print("使用已有校准结果")
                 return self.calibrator.get_result()
 
-        print("开始自动校准..." if load_on_startup else "已禁用加载校准数据,开始新校准...")
+        # 执行新校准
+        if force:
+            print("强制重新校准(不使用已有数据)...")
+        elif not load_on_startup:
+            print("已禁用加载校准数据,开始新校准...")
+        else:
+            print("开始自动校准...")
+            
         result = self.calibrator.calibrate(quick_mode=True)
 
         if result.success:
             self.calibrator.save_calibration(self.calibration_file)
+        elif fallback_on_failure:
+            # 校准失败,尝试回退使用已有数据
+            print("校准失败,尝试回退使用已有校准数据...")
+            if self.calibrator.load_calibration(self.calibration_file):
+                print("已回退到已有校准数据")
+                result = self.calibrator.get_result()
 
         return result
 

+ 3 - 1
dual_camera_system/config/coordinator.py

@@ -25,10 +25,12 @@ COORDINATOR_CONFIG = {
 }
 
 CALIBRATION_CONFIG = {
-    'interval': 24 * 60 * 60,
+    'interval': 24 * 60 * 60,        # 校准间隔(秒),默认24小时
+    'daily_calibration_time': '08:00',  # 每日自动校准时间 (HH:MM格式)
     'quick_mode': True,
     'auto_save': True,
     'load_on_startup': True,         # 启动时是否加载上次保存的校准数据
+    'force_daily_recalibration': True,  # 每日校准是否强制重新校准(不使用已有数据)
     'min_valid_points': 4,
     'rms_error_threshold': 5.0,
     # 校准结果保存路径

+ 67 - 31
dual_camera_system/main.py

@@ -98,7 +98,9 @@ class DualCameraSystem:
         self.calibration_manager = None
         
         # 定时校准
-        self.calibration_interval = CALIBRATION_CONFIG.get('interval', 5 * 60)  # 默认5分钟
+        self.calibration_interval = CALIBRATION_CONFIG.get('interval', 24 * 60 * 60)  # 默认24小时
+        self.daily_calibration_time = CALIBRATION_CONFIG.get('daily_calibration_time', '08:00')  # 每日校准时间
+        self.force_daily_recalibration = CALIBRATION_CONFIG.get('force_daily_recalibration', True)  # 强制重新校准
         self.calibration_thread = None
         self.calibration_running = False
         self.last_calibration_time = 0
@@ -194,9 +196,13 @@ class DualCameraSystem:
         
         return True
     
-    def _auto_calibrate(self) -> bool:
+    def _auto_calibrate(self, force: bool = False) -> bool:
         """
         执行自动校准
+        
+        Args:
+            force: 是否强制重新校准(不使用已有数据)
+            
         Returns:
             是否成功
         """
@@ -292,8 +298,11 @@ class DualCameraSystem:
         # 创建校准管理器
         self.calibration_manager = CalibrationManager(self.calibrator)
         
-        # 执行视觉校准(根据配置决定是否强制重新校准)
-        result = self.calibration_manager.auto_calibrate(force=False)
+        # 执行视觉校准(根据参数决定是否强制重新校准)
+        result = self.calibration_manager.auto_calibrate(
+            force=force, 
+            fallback_on_failure=True  # 校准失败时回退使用已有数据
+        )
         
         if not result.success:
             logger.error("=" * 50)
@@ -411,26 +420,18 @@ class DualCameraSystem:
         if not SYSTEM_CONFIG.get('enable_calibration', True):
             logger.info("定时校准已禁用 (enable_calibration=False)")
             return
-        
+    
         if self.calibration_running:
             return
-        
+    
         self.calibration_running = True
         self.calibration_thread = threading.Thread(
             target=self._periodic_calibration_worker, 
             daemon=True
         )
         self.calibration_thread.start()
-        # 格式化显示间隔
-        interval_hours = self.calibration_interval // 3600
-        if interval_hours >= 24:
-            interval_str = f"{interval_hours // 24}天"
-        elif interval_hours >= 1:
-            interval_str = f"{interval_hours}小时"
-        else:
-            interval_str = f"{self.calibration_interval // 60}分钟"
-        logger.info(f"定时校准已启动 (间隔: {interval_str})")
-    
+        logger.info(f"定时校准已启动 (每日 {self.daily_calibration_time} 自动校准)")
+            
     def _stop_periodic_calibration(self):
         """停止定时校准"""
         self.calibration_running = False
@@ -439,36 +440,71 @@ class DualCameraSystem:
             self.calibration_thread = None
         logger.info("定时校准已停止")
     
+    def _get_seconds_until_target_time(self, target_time_str: str) -> int:
+        """
+        计算到目标时间的秒数
+            
+        Args:
+            target_time_str: 目标时间字符串 (HH:MM格式)
+                
+        Returns:
+            到目标时间的秒数,如果已过今天的目标时间则返回到明天目标时间的秒数
+        """
+        from datetime import datetime, timedelta
+            
+        now = datetime.now()
+        target_hour, target_minute = map(int, target_time_str.split(':'))
+        target_time = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
+            
+        # 如果已过今天的目标时间,则计算到明天的目标时间
+        if now >= target_time:
+            target_time += timedelta(days=1)
+                
+        return int((target_time - now).total_seconds())
+    
     def _periodic_calibration_worker(self):
-        """定时校准工作线程"""
-        import time
-        
+        """定时校准工作线程 - 每日指定时间执行校准"""
+        from datetime import datetime
+            
         while self.calibration_running:
             try:
-                # 等待校准间隔
-                for _ in range(self.calibration_interval):
+                # 计算到下一个校准时间的等待秒数
+                wait_seconds = self._get_seconds_until_target_time(self.daily_calibration_time)
+                next_time = datetime.now().replace(
+                    hour=int(self.daily_calibration_time.split(':')[0]),
+                    minute=int(self.daily_calibration_time.split(':')[1])
+                )
+                if datetime.now() >= next_time:
+                    from datetime import timedelta
+                    next_time += timedelta(days=1)
+                        
+                logger.info(f"下次校准时间: {next_time.strftime('%Y-%m-%d %H:%M:%S')} (等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟)")
+                    
+                # 等待到校准时间,每分钟检查一次是否需要停止
+                for i in range(wait_seconds):
                     if not self.calibration_running:
                         return
                     time.sleep(1)
-                
+                    
                 if not self.calibration_running:
                     return
-                
+                    
                 # 执行校准
                 logger.info("=" * 50)
-                logger.info("执行定时校准...")
+                logger.info(f"执行每日定时校准 (时间: {self.daily_calibration_time})...")
                 logger.info("=" * 50)
-                
-                result = self._auto_calibrate()
-                
+                    
+                # 每日校准强制重新校准(不使用已有数据),失败时可回退
+                result = self._auto_calibrate(force=self.force_daily_recalibration)
+                    
                 if result:
-                    logger.info("定时校准成功!")
+                    logger.info("每日定时校准成功!")
                 else:
-                    logger.warning("定时校准失败, 将在下次间隔重试")
-                
+                    logger.warning("每日定时校准失败!")
+    
             except Exception as e:
                 logger.error(f"定时校准错误: {e}")
-                time.sleep(10)
+                time.sleep(60)  # 出错后等待1分钟再重试
     
     def manual_calibrate(self) -> bool:
         """

+ 16 - 11
dual_camera_system/panorama_camera.py

@@ -540,7 +540,7 @@ class ObjectDetector:
     
     def _save_detection_image(self, frame: np.ndarray, detections: List[DetectedObject]):
         """
-        保存带有检测标记的图片(标记人体边界框和序号
+        保存带有检测标记的图片(只标记人体,序号从0开始
         Args:
             frame: 原始图像
             detections: 检测结果列表
@@ -557,19 +557,27 @@ class ObjectDetector:
             # 复制图像避免修改原图
             marked_frame = frame.copy()
             
-            # 绘制检测结果(带序号)
-            for idx, det in enumerate(detections, 1):
+            # 只统计人,序号从0开始
+            person_count = 0
+            
+            for det in detections:
                 x, y, w, h = det.bbox
                 
-                # 根据类别选择颜色:人=绿色,其他=蓝色
-                is_person = det.class_name in ['person']
+                # 只对人标记序号
+                is_person = det.class_name in ['person', '人']
                 box_color = (0, 255, 0) if is_person else (255, 165, 0)  # 绿色/橙色
                 
                 # 绘制边界框
                 cv2.rectangle(marked_frame, (x, y), (x + w, y + h), box_color, 2)
                 
-                # 绘制序号标签(大号白色背景)
-                label = f"#{det.class_name}_{idx}"
+                # 标签:人用序号,其他显示类别名
+                if is_person:
+                    label = f"person_{person_count}"
+                    person_count += 1
+                else:
+                    label = det.class_name
+                
+                
                 (label_w, label_h), baseline = cv2.getTextSize(
                     label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
                 )
@@ -588,14 +596,11 @@ class ObjectDetector:
                     cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                     (0, 0, 0), 2
                 )
-                
-                # 绘制中心点(红色)
-                cv2.circle(marked_frame, det.center, 5, (0, 0, 255), -1)
             
             
             # 生成文件名(时间戳+人数)
             timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
-            filename = f"panorama_{timestamp}_n{len(detections)}.jpg"
+            filename = f"panorama_{timestamp}_n{person_count}.jpg"
             filepath = self._image_save_dir / filename
             
             # 保存图片

+ 5 - 8
dual_camera_system/ptz_person_tracker.py

@@ -283,7 +283,7 @@ class PTZPersonDetector:
     
     def _save_detection_image(self, frame: np.ndarray, persons: List[DetectedPerson]):
         """
-        保存带有检测标记的图片(标记人体边界框和序号)
+        保存带有检测标记的图片(序号从0开始
         Args:
             frame: 原始图像
             persons: 检测到的人体列表
@@ -300,15 +300,15 @@ class PTZPersonDetector:
             # 复制图像避免修改原图
             marked_frame = frame.copy()
             
-            # 绘制检测结果(序号)
-            for idx, person in enumerate(persons, 1):
+            # 绘制检测结果(序号从0开始
+            for idx, person in enumerate(persons):
                 x1, y1, x2, y2 = person.bbox
                 
                 # 绘制边界框(绿色)
                 cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                 
-                # 绘制序号标签
-                label = f"#{idx} person"
+                # 绘制序号标签:person_0, person_1, ...
+                label = f"person_{idx}"
                 (label_w, label_h), baseline = cv2.getTextSize(
                     label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
                 )
@@ -327,9 +327,6 @@ class PTZPersonDetector:
                     cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                     (0, 0, 0), 2
                 )
-                
-                # 绘制中心点(红色)
-                cv2.circle(marked_frame, (int(person.center[0]), int(person.center[1])), 5, (0, 0, 255), -1)
             
             
             # 生成文件名(时间戳+人数)