Jelajahi Sumber

feat(calibration): 优化重叠区域发现算法并增加RANSAC异常值过滤

重构重叠区域发现算法,使用union-find合并相邻扫描点
增加max_overlap_ranges和min_positions_per_range配置参数
在变换计算中引入RANSAC算法过滤异常值
调整默认扫描范围为(0,180)和(-30,30)以匹配全景相机特性
wenhongquan 4 hari lalu
induk
melakukan
5eb3bec480

+ 132 - 38
dual_camera_system/calibration.py

@@ -122,12 +122,14 @@ class OverlapDiscovery:
         ptz: PTZCamera,
         get_panorama_frame: Callable[[], np.ndarray],
         ptz_capture: Callable[[], Optional[np.ndarray]],
-        pan_range: Tuple[float, float] = (0, 360),
-        tilt_range: Tuple[float, float] = (-45, 45),
+        pan_range: Tuple[float, float] = (0, 180),
+        tilt_range: Tuple[float, float] = (-30, 30),
         pan_step: float = 20,
         tilt_step: float = 15,
         stabilize_time: float = 2.0,
-        on_progress: Callable[[int, int, str], None] = None
+        on_progress: Callable[[int, int, str], None] = None,
+        max_ranges: int = 3,
+        min_positions_per_range: int = 3
     ) -> List[OverlapRange]:
         """
         扫描球机视野范围,发现与全景画面有重叠的角度区间
@@ -215,7 +217,11 @@ class OverlapDiscovery:
         print(f"\n  发现 {len(scan_results)} 个有重叠的扫描位置")
 
         # 3. 合并相邻位置为重叠区间
-        overlap_ranges = self._merge_scan_results(scan_results)
+        overlap_ranges = self._merge_scan_results(
+            scan_results,
+            max_ranges=max_ranges,
+            min_positions=min_positions_per_range
+        )
 
         for i, r in enumerate(overlap_ranges):
             print(f"  重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
@@ -228,38 +234,63 @@ class OverlapDiscovery:
         self,
         results: List[Tuple[float, float, int, float, float]],
         pan_tolerance: float = 25,
-        tilt_tolerance: float = 20
+        tilt_tolerance: float = 20,
+        max_ranges: int = 3,
+        min_positions: int = 3
     ) -> List[OverlapRange]:
         """
-        将相邻的扫描结果合并为重叠区间
+        使用union-find连通分量聚类合并相邻扫描结果
+        只保留最大的 max_ranges 个区间
         """
         if not results:
             return []
 
-        # 按pan排序
-        sorted_results = sorted(results, key=lambda r: (r[0], r[1]))
-
+        n = len(results)
+
+        # union-find
+        parent = list(range(n))
+
+        def find(x):
+            while parent[x] != x:
+                parent[x] = parent[parent[x]]
+                x = parent[x]
+            return x
+
+        def union(a, b):
+            ra, rb = find(a), find(b)
+            if ra != rb:
+                parent[ra] = rb
+
+        # 判断两点是否相邻
+        for i in range(n):
+            for j in range(i + 1, n):
+                pi, ti = results[i][0], results[i][1]
+                pj, tj = results[j][0], results[j][1]
+                if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
+                    union(i, j)
+
+        # 按连通分量分组
+        groups: Dict[int, List[int]] = {}
+        for i in range(n):
+            root = find(i)
+            if root not in groups:
+                groups[root] = []
+            groups[root].append(i)
+
+        # 转换为OverlapRange,过滤太小的组
         ranges = []
-        current_group = [sorted_results[0]]
-
-        for i in range(1, len(sorted_results)):
-            prev = sorted_results[i - 1]
-            curr = sorted_results[i]
-
-            # 判断是否相邻(pan差在步进内,tilt差在步进内)
-            pan_close = abs(curr[0] - prev[0]) <= pan_tolerance
-            tilt_close = abs(curr[1] - prev[1]) <= tilt_tolerance
+        for indices in groups.values():
+            if len(indices) < min_positions:
+                continue
+            group_data = [results[i] for i in indices]
+            ranges.append(self._group_to_range(group_data))
 
-            if pan_close and tilt_close:
-                current_group.append(curr)
-            else:
-                # 输出当前组
-                ranges.append(self._group_to_range(current_group))
-                current_group = [curr]
+        # 按match_count降序排序,只保留最大的 max_ranges 个
+        ranges.sort(key=lambda r: r.match_count, reverse=True)
+        ranges = ranges[:max_ranges]
 
-        # 最后一组
-        if current_group:
-            ranges.append(self._group_to_range(current_group))
+        # 按pan_start排序输出
+        ranges.sort(key=lambda r: r.pan_start)
 
         return ranges
 
@@ -448,10 +479,12 @@ class CameraCalibrator:
         self.use_feature_matching = True
 
         # 重叠发现配置
-        self.overlap_pan_range = (0, 360)
-        self.overlap_tilt_range = (-45, 45)
+        self.overlap_pan_range = (0, 180)
+        self.overlap_tilt_range = (-30, 30)
         self.overlap_pan_step = 20
         self.overlap_tilt_step = 15
+        self.max_overlap_ranges = 3
+        self.min_positions_per_range = 3
 
         # 回调
         self.on_progress: Optional[Callable[[int, int, str], None]] = None
@@ -483,7 +516,9 @@ class CameraCalibrator:
             pan_step=self.overlap_pan_step,
             tilt_step=self.overlap_tilt_step,
             stabilize_time=self.stabilize_time,
-            on_progress=self.on_progress
+            on_progress=self.on_progress,
+            max_ranges=self.max_overlap_ranges,
+            min_positions_per_range=self.min_positions_per_range
         )
 
         if not self.overlap_ranges:
@@ -703,19 +738,34 @@ class CameraCalibrator:
         return points
 
     def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
-        """最小二乘法拟合变换参数"""
+        """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
         try:
-            n = len(points)
-
-            A = np.ones((n, 3))
-            A[:, 1] = [p.x_ratio for p in points]
-            A[:, 2] = [p.y_ratio for p in points]
+            if len(points) < 4:
+                print(f"计算变换参数错误: 有效点不足({len(points)}个)")
+                return False
 
             pan_values = np.array([p.pan for p in points])
             tilt_values = np.array([p.tilt for p in points])
+            x_ratios = np.array([p.x_ratio for p in points])
+            y_ratios = np.array([p.y_ratio for p in points])
+
+            # RANSAC剔除异常值
+            inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_values, tilt_values)
+
+            inlier_count = np.sum(inlier_mask)
+            if inlier_count < 4:
+                print(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
+                inlier_mask = np.ones(len(points), dtype=bool)
+            else:
+                print(f"RANSAC: {len(points)}个点中{inlier_count}个内点,剔除{len(points) - inlier_count}个异常值")
 
-            pan_params, _, _, _ = np.linalg.lstsq(A, pan_values, rcond=None)
-            tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values, rcond=None)
+            # 用内点拟合
+            A = np.ones((inlier_count, 3))
+            A[:, 1] = x_ratios[inlier_mask]
+            A[:, 2] = y_ratios[inlier_mask]
+
+            pan_params, _, _, _ = np.linalg.lstsq(A, pan_values[inlier_mask], rcond=None)
+            tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
 
             self.pan_offset = pan_params[0]
             self.pan_scale_x = pan_params[1]
@@ -735,6 +785,50 @@ class CameraCalibrator:
             print(f"计算变换参数错误: {e}")
             return False
 
+    def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
+                       pan: np.ndarray, tilt: np.ndarray,
+                       max_iterations: int = 200, threshold: float = 15.0,
+                       min_samples: int = 4) -> np.ndarray:
+        """RANSAC剔除变换拟合中的异常值"""
+        n = len(x)
+        best_inliers = np.zeros(n, dtype=bool)
+        best_inlier_count = 0
+
+        rng = np.random.RandomState(42)
+
+        for _ in range(max_iterations):
+            # 随机选min_samples个点
+            indices = rng.choice(n, min_samples, replace=False)
+
+            # 用这些点拟合
+            A = np.ones((min_samples, 3))
+            A[:, 1] = x[indices]
+            A[:, 2] = y[indices]
+
+            try:
+                pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
+                tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
+            except np.linalg.LinAlgError:
+                continue
+
+            # 计算所有点的误差
+            pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
+            pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
+
+            errors = np.sqrt((pred_pan - pan) ** 2 + (pred_tilt - tilt) ** 2)
+
+            inliers = errors < threshold
+            inlier_count = np.sum(inliers)
+
+            if inlier_count > best_inlier_count:
+                best_inlier_count = inlier_count
+                best_inliers = inliers
+
+        if best_inlier_count == 0:
+            return np.ones(n, dtype=bool)
+
+        return best_inliers
+
     def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
         """计算均方根误差"""
         total_error = 0.0

+ 4 - 2
dual_camera_system/config/coordinator.py

@@ -17,11 +17,13 @@ CALIBRATION_CONFIG = {
     'min_valid_points': 4,            # 最少有效校准点数
     'rms_error_threshold': 5.0,       # RMS误差阈值(度)
     'overlap_discovery': {
-        'pan_range': (0, 360),        # 球机水平扫描范围(度)
-        'tilt_range': (-45, 45),      # 球机垂直扫描范围(度)
+        'pan_range': (0, 180),        # 球机水平扫描范围(度) - 改为180,全景相机通常覆盖180度
+        'tilt_range': (-30, 30),      # 球机垂直扫描范围(度)
         'pan_step': 20,               # 水平扫描步进(度)
         'tilt_step': 15,              # 垂直扫描步进(度)
         'min_match_threshold': 8,     # 最少特征匹配数(判定重叠)
         'stabilize_time': 2.0,        # 球机稳定等待时间(秒)
+        'max_overlap_ranges': 3,      # 最多保留的重叠区间数
+        'min_positions_per_range': 3, # 每个重叠区间最少扫描位置数
     },
 }

+ 4 - 2
dual_camera_system/main.py

@@ -229,11 +229,13 @@ class DualCameraSystem:
         
         # 配置重叠发现参数
         overlap_cfg = CALIBRATION_CONFIG.get('overlap_discovery', {})
-        self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 360))
-        self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-45, 45))
+        self.calibrator.overlap_pan_range = overlap_cfg.get('pan_range', (0, 180))
+        self.calibrator.overlap_tilt_range = overlap_cfg.get('tilt_range', (-30, 30))
         self.calibrator.overlap_pan_step = overlap_cfg.get('pan_step', 20)
         self.calibrator.overlap_tilt_step = overlap_cfg.get('tilt_step', 15)
         self.calibrator.stabilize_time = overlap_cfg.get('stabilize_time', 2.0)
+        self.calibrator.max_overlap_ranges = overlap_cfg.get('max_overlap_ranges', 3)
+        self.calibrator.min_positions_per_range = overlap_cfg.get('min_positions_per_range', 3)
         
         # 校准进度回调
         def on_progress(current: int, total: int, message: str):