| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027 |
- """
- 相机校准模块
- 实现全景相机与球机的自动校准
- 建立画面坐标到PTZ角度的映射关系
- 核心改进:先发现视野重叠区域,再在重叠区内校准,
- 避免球机指向与全景画面无重叠的方向导致校准失败。
- """
- import time
- import math
- import threading
- import numpy as np
- import cv2
- from typing import List, Tuple, Dict, Optional, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- from ptz_camera import PTZCamera
- class CalibrationState(Enum):
- IDLE = 0
- RUNNING = 1
- SUCCESS = 2
- FAILED = 3
- @dataclass
- class CalibrationPoint:
- pan: float
- tilt: float
- zoom: float = 1.0
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- detected: bool = False
- @dataclass
- class CalibrationResult:
- success: bool
- points: List[CalibrationPoint]
- transform_matrix: Optional[np.ndarray] = None
- error_message: str = ""
- rms_error: float = 0.0
- @dataclass
- class OverlapRange:
- pan_start: float
- pan_end: float
- tilt_start: float
- tilt_end: float
- match_count: int
- panorama_center_x: float
- panorama_center_y: float
- MIN_MATCH_THRESHOLD = 8
- class OverlapDiscovery:
- """
- 视野重叠发现器
- 扫描球机视野范围,找出与全景画面有视觉重叠的角度区间
- """
- def __init__(self, feature_type: str = 'SIFT'):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- norm_type = cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- self.matcher = cv2.BFMatcher(norm_type)
- def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
- ) -> Tuple[bool, int, float, float]:
- """
- 特征匹配球机画面与全景画面
- Returns: (是否匹配成功, 匹配点数, 全景画面中心x, 全景画面中心y)
- """
- if ptz_frame is None or panorama_frame is None:
- return (False, 0, 0.0, 0.0)
- try:
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return (False, 0, 0.0, 0.0)
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < 0.75 * n.distance:
- good_matches.append(m)
- if len(good_matches) < MIN_MATCH_THRESHOLD:
- return (False, len(good_matches), 0.0, 0.0)
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- center_x = np.mean(pan_pts[:, 0])
- center_y = np.mean(pan_pts[:, 1])
- return (True, len(good_matches), center_x, center_y)
- except Exception as e:
- print(f" 特征匹配异常: {e}")
- return (False, 0, 0.0, 0.0)
- def discover_overlap_ranges(
- self,
- ptz: PTZCamera,
- get_panorama_frame: Callable[[], np.ndarray],
- ptz_capture: Callable[[], Optional[np.ndarray]],
- pan_range: Tuple[float, float] = (0, 360),
- tilt_range: Tuple[float, float] = (-30, 30),
- pan_step: float = 20,
- tilt_step: float = 15,
- stabilize_time: float = 2.0,
- on_progress: Callable[[int, int, str], None] = None,
- max_ranges: int = 3,
- min_positions_per_range: int = 3
- ) -> List[OverlapRange]:
- """
- 扫描球机视野范围,发现与全景画面有重叠的角度区间
- 1. 先拍一张全景参考帧
- 2. 逐步移动球机到各个角度
- 3. 在每个位置抓拍球机画面,与全景做特征匹配
- 4. 记录有足够匹配点的角度
- 5. 合并相邻的有重叠的角度形成区间
- """
- print(f"\n{'='*50}")
- print(f"阶段1: 视野重叠发现")
- print(f"扫描范围: pan={pan_range}, tilt={tilt_range}")
- print(f"步进: pan={pan_step}°, tilt={tilt_step}°")
- print(f"{'='*50}")
- # 1. 拍全景参考帧
- print(" 获取全景参考帧...")
- ref_frames = []
- for _ in range(3):
- frame = get_panorama_frame()
- if frame is not None:
- ref_frames.append(frame)
- time.sleep(0.1)
- if not ref_frames:
- print(" 错误: 无法获取全景参考帧!")
- return []
- panorama_ref = ref_frames[0]
- print(f" 全景参考帧: {panorama_ref.shape}")
- # 2. 扫描各个角度
- scan_results: List[Tuple[float, float, int, float, float]] = []
- pan_values = np.arange(pan_range[0], pan_range[1] + pan_step, pan_step)
- tilt_values = np.arange(tilt_range[0], tilt_range[1] + tilt_step, tilt_step)
- total_positions = len(pan_values) * len(tilt_values)
- current_idx = 0
- for pan in pan_values:
- for tilt in tilt_values:
- current_idx += 1
- pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
- if on_progress:
- on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
- print(f" [{current_idx}/{total_positions}] {pos_desc}")
- # 移动球机
- if not ptz.goto_exact_position(float(pan), float(tilt), 1):
- print(f" 移动球机失败, 跳过")
- continue
- time.sleep(stabilize_time)
- # 抓拍球机画面
- ptz_frame = ptz_capture() if ptz_capture else None
- if ptz_frame is None:
- print(f" 球机抓拍失败, 跳过")
- continue
- # 获取当前全景帧并匹配
- cur_panorama = get_panorama_frame()
- if cur_panorama is None:
- continue
- success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
- if success:
- h, w = cur_panorama.shape[:2]
- x_ratio = cx / w
- y_ratio = cy / h
- print(f" ✓ 匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
- scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))
- else:
- print(f" ✗ 匹配不足: {match_count}个特征点")
- if not scan_results:
- print("\n 未发现任何视野重叠位置!")
- return []
- print(f"\n 发现 {len(scan_results)} 个有重叠的扫描位置")
- # 3. 合并相邻位置为重叠区间
- overlap_ranges = self._merge_scan_results(
- scan_results,
- max_ranges=max_ranges,
- min_positions=min_positions_per_range
- )
- for i, r in enumerate(overlap_ranges):
- print(f" 重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
- f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
- f"匹配点={r.match_count}")
- return overlap_ranges
- def _merge_scan_results(
- self,
- results: List[Tuple[float, float, int, float, float]],
- pan_tolerance: float = 25,
- tilt_tolerance: float = 20,
- max_ranges: int = 3,
- min_positions: int = 3
- ) -> List[OverlapRange]:
- """
- 使用union-find连通分量聚类合并相邻扫描结果
- 只保留最大的 max_ranges 个区间
- """
- if not results:
- return []
- n = len(results)
- # union-find
- parent = list(range(n))
- def find(x):
- while parent[x] != x:
- parent[x] = parent[parent[x]]
- x = parent[x]
- return x
- def union(a, b):
- ra, rb = find(a), find(b)
- if ra != rb:
- parent[ra] = rb
- # 判断两点是否相邻
- for i in range(n):
- for j in range(i + 1, n):
- pi, ti = results[i][0], results[i][1]
- pj, tj = results[j][0], results[j][1]
- if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
- union(i, j)
- # 按连通分量分组
- groups: Dict[int, List[int]] = {}
- for i in range(n):
- root = find(i)
- if root not in groups:
- groups[root] = []
- groups[root].append(i)
- # 转换为OverlapRange,过滤太小的组
- ranges = []
- for indices in groups.values():
- if len(indices) < min_positions:
- continue
- group_data = [results[i] for i in indices]
- ranges.append(self._group_to_range(group_data))
- # 按match_count降序排序,只保留最大的 max_ranges 个
- ranges.sort(key=lambda r: r.match_count, reverse=True)
- ranges = ranges[:max_ranges]
- # 按pan_start排序输出
- ranges.sort(key=lambda r: r.pan_start)
- return ranges
- def _group_to_range(self, group: List[Tuple[float, float, int, float, float]]) -> OverlapRange:
- """将一组扫描结果转换为一个OverlapRange"""
- pans = [r[0] for r in group]
- tilts = [r[1] for r in group]
- match_counts = [r[2] for r in group]
- x_ratios = [r[3] for r in group]
- y_ratios = [r[4] for r in group]
- step = 5 # 在边缘各扩展5度
- return OverlapRange(
- pan_start=min(pans) - step,
- pan_end=max(pans) + step,
- tilt_start=min(tilts) - step,
- tilt_end=max(tilts) + step,
- match_count=max(match_counts),
- panorama_center_x=float(np.mean(x_ratios)),
- panorama_center_y=float(np.mean(y_ratios))
- )
- class VisualCalibrationDetector:
- """
- 视觉校准检测器
- 通过运动检测和特征匹配定位球机在全景画面中的位置
- """
- def __init__(self):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- self.matcher = cv2.BFMatcher(
- cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- )
- self.use_motion_detection = True
- self.use_feature_matching = True
- def detect_by_motion(self, frames_before: np.ndarray,
- frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过运动检测定位球机指向位置"""
- if frames_before is None or frames_after is None:
- return None
- before_gray = cv2.cvtColor(frames_before, cv2.COLOR_BGR2GRAY) \
- if len(frames_before.shape) == 3 else frames_before
- after_gray = cv2.cvtColor(frames_after, cv2.COLOR_BGR2GRAY) \
- if len(frames_after.shape) == 3 else frames_after
- diff = cv2.absdiff(before_gray, after_gray)
- _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
- contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- if not contours:
- return None
- max_contour = max(contours, key=cv2.contourArea)
- area = cv2.contourArea(max_contour)
- if area < 500:
- return None
- M = cv2.moments(max_contour)
- if M["m00"] == 0:
- return None
- cx = M["m10"] / M["m00"]
- cy = M["m01"] / M["m00"]
- h, w = before_gray.shape
- print(f" 运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f})")
- return (cx / w, cy / h)
- def detect_by_feature_match(self, panorama_frame: np.ndarray,
- ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过特征匹配定位"""
- if panorama_frame is None or ptz_frame is None:
- return None
- try:
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) \
- if len(panorama_frame.shape) == 3 else panorama_frame
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) \
- if len(ptz_frame.shape) == 3 else ptz_frame
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return None
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < 0.75 * n.distance:
- good_matches.append(m)
- if len(good_matches) < 4:
- return None
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- center_x = np.mean(pan_pts[:, 0])
- center_y = np.mean(pan_pts[:, 1])
- h, w = pan_gray.shape
- print(f" 特征匹配: 匹配点={len(good_matches)}, 中心=({center_x:.1f}, {center_y:.1f})")
- return (center_x / w, center_y / h)
- except Exception as e:
- print(f" 特征匹配错误: {e}")
- return None
- def detect_position(self, panorama_frame: np.ndarray,
- frames_before: np.ndarray = None,
- frames_after: np.ndarray = None,
- ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
- """综合检测球机在全景画面中的位置"""
- results = []
- if self.use_motion_detection and frames_before is not None and frames_after is not None:
- motion_result = self.detect_by_motion(frames_before, frames_after)
- if motion_result:
- results.append(('motion', motion_result, 0.4))
- if self.use_feature_matching and ptz_frame is not None:
- feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
- if feature_result:
- results.append(('feature', feature_result, 0.6))
- if not results:
- return (False, 0.0, 0.0)
- total_weight = sum(r[2] for r in results)
- x_ratio = sum(r[1][0] * r[2] for r in results) / total_weight
- y_ratio = sum(r[1][1] * r[2] for r in results) / total_weight
- print(f" 融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
- return (True, x_ratio, y_ratio)
- class CameraCalibrator:
- """
- 相机校准器
- 两阶段校准:先发现视野重叠区域,再在重叠区内校准
- """
- def __init__(self, ptz_camera: PTZCamera,
- get_frame_func: Callable[[], np.ndarray],
- detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
- ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
- self.ptz = ptz_camera
- self.get_frame = get_frame_func
- self.detect_marker = detect_marker_func
- self.ptz_capture = ptz_capture_func
- self.visual_detector = VisualCalibrationDetector()
- self.overlap_discovery = OverlapDiscovery()
- self.state = CalibrationState.IDLE
- self.result: Optional[CalibrationResult] = None
- # 变换参数
- self.pan_offset = 0.0
- self.pan_scale_x = 1.0
- self.pan_scale_y = 0.0
- self.tilt_offset = 0.0
- self.tilt_scale_x = 0.0
- self.tilt_scale_y = 1.0
- # 校准配置
- self.stabilize_time = 2.0
- self.use_motion_detection = True
- self.use_feature_matching = True
- # 重叠发现配置
- self.overlap_pan_range = (0, 360)
- self.overlap_tilt_range = (-30, 30)
- self.overlap_pan_step = 20
- self.overlap_tilt_step = 15
- self.max_overlap_ranges = 3
- self.min_positions_per_range = 3
- # 回调
- self.on_progress: Optional[Callable[[int, int, str], None]] = None
- self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
- # 发现的重叠区间
- self.overlap_ranges: List[OverlapRange] = []
- def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
- """
- 执行校准 - 两阶段流程
-
- 阶段1: 视野重叠发现 - 扫描球机范围,找出与全景有重叠的角度区间
- 阶段2: 精确校准 - 仅在重叠区间内生成校准点,逐一验证后拟合变换
- """
- self.state = CalibrationState.RUNNING
- # ===================== 阶段1: 视野重叠发现 =====================
- print(f"\n{'='*60}")
- print(f"阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
- print(f"{'='*60}")
- self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
- ptz=self.ptz,
- get_panorama_frame=self.get_frame,
- ptz_capture=self.ptz_capture,
- pan_range=self.overlap_pan_range,
- tilt_range=self.overlap_tilt_range,
- pan_step=self.overlap_pan_step,
- tilt_step=self.overlap_tilt_step,
- stabilize_time=self.stabilize_time,
- on_progress=self.on_progress,
- max_ranges=self.max_overlap_ranges,
- min_positions_per_range=self.min_positions_per_range
- )
- if not self.overlap_ranges:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=[],
- error_message="未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
- )
- print(f"\n校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- print(f"\n发现 {len(self.overlap_ranges)} 个重叠区间")
- # 选择匹配点最多的区间用于校准
- best_range = max(self.overlap_ranges, key=lambda r: r.match_count)
- self.overlap_ranges = [best_range]
- print(f"选择最佳重叠区间: pan=[{best_range.pan_start:.0f}°, {best_range.pan_end:.0f}°], "
- f"tilt=[{best_range.tilt_start:.0f}°, {best_range.tilt_end:.0f}°], "
- f"匹配点={best_range.match_count}")
- print(f"进入阶段2校准")
- # ===================== 阶段2: 在重叠区内精确校准 =====================
- print(f"\n{'='*60}")
- print(f"阶段2: 精确校准 - 在重叠区间内采集校准点")
- print(f"{'='*60}")
- calib_points = self._generate_points_in_overlaps(quick_mode)
- valid_points = []
- total_points = len(calib_points)
- print(f"生成 {total_points} 个校准点(仅位于重叠区域内)")
- for idx, point in enumerate(calib_points):
- if self.on_progress:
- self.on_progress(idx + 1, total_points,
- f"校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- print(f"\n 校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- # 步骤1: 获取移动前全景帧(用于运动检测)
- frames_before_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_before_list.append(frame)
- time.sleep(0.1)
- if not frames_before_list:
- print(f" 警告: 无法获取移动前的全景画面")
- continue
- frames_before = np.mean(frames_before_list, axis=0).astype(np.uint8)
- # 步骤2: 移动球机到目标位置
- print(f" [2/4] 移动球机到目标位置...")
- if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
- print(f" 警告: 移动球机失败")
- continue
- time.sleep(self.stabilize_time)
- # 步骤3: 获取移动后全景帧和球机抓拍
- print(f" [3/4] 获取移动后的帧...")
- frames_after_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_after_list.append(frame)
- time.sleep(0.1)
- if not frames_after_list:
- print(f" 警告: 无法获取移动后的全景画面")
- continue
- frames_after = np.mean(frames_after_list, axis=0).astype(np.uint8)
- panorama_frame = frames_after
- # 球机抓拍(关键:特征匹配需要球机画面)
- ptz_frame = None
- if self.use_feature_matching and self.ptz_capture:
- try:
- ptz_frame = self.ptz_capture()
- if ptz_frame is not None:
- print(f" 球机抓拍成功: {ptz_frame.shape}")
- except Exception as e:
- print(f" 球机抓拍失败: {e}")
- # 步骤4: 视觉检测 + 重叠验证
- print(f" [4/4] 视觉检测与重叠验证...")
- # 优先使用特征匹配(最可靠的方法)
- if ptz_frame is not None and panorama_frame is not None:
- success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
- if success:
- h, w = panorama_frame.shape[:2]
- point.x_ratio = cx / w
- point.y_ratio = cy / h
- point.detected = True
- valid_points.append(point)
- print(f" ✓ 特征匹配验证通过: {match_count}个匹配点, "
- f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- continue
- else:
- print(f" ✗ 特征匹配不足({match_count}点), 尝试运动检测...")
- # 备选: 运动检测
- if self.use_motion_detection and frames_before is not None and frames_after is not None:
- motion_result = self.visual_detector.detect_by_motion(frames_before, frames_after)
- if motion_result:
- point.x_ratio, point.y_ratio = motion_result
- point.detected = True
- valid_points.append(point)
- print(f" ✓ 运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- continue
- # 自定义标记检测
- if self.detect_marker:
- marker_pos = self.detect_marker(panorama_frame)
- if marker_pos:
- point.x_ratio, point.y_ratio = marker_pos
- point.detected = True
- valid_points.append(point)
- print(f" ✓ 标记检测成功: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- continue
- # 所有方法均失败 - 跳过此点(不使用估算!)
- print(f" ✗ 此校准点无法验证,跳过(不使用估算)")
- # ===================== 检查有效校准点 =====================
- min_valid = 4
- if len(valid_points) < min_valid:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message=f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
- f"请检查球机与全景的视野重叠是否足够。"
- )
- print(f"\n校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- # ===================== 计算变换参数 =====================
- success = self._calculate_transform(valid_points)
- if success:
- self.state = CalibrationState.SUCCESS
- rms_error = self._calculate_rms_error(valid_points)
- self.result = CalibrationResult(
- success=True,
- points=valid_points,
- rms_error=rms_error
- )
- print(f"\n{'='*60}")
- print(f"校准成功!")
- print(f"有效校准点: {len(valid_points)}")
- print(f"重叠区间数: {len(self.overlap_ranges)}")
- print(f"RMS误差: {rms_error:.4f}°")
-
- # 自动保存校准结果
- try:
- from config import CALIBRATION_CONFIG
- if CALIBRATION_CONFIG.get('auto_save', True):
- filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
- self.save_calibration(filepath)
- except Exception:
- pass
-
- print(f"{'='*60}")
- else:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message="变换参数计算失败"
- )
- print(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
- """
- 在发现的重叠区间内生成校准点
- 只在球机和全景有视觉重叠的区域生成点
- """
- points = []
- if quick_mode:
- # 快速模式: 每个重叠区间内生成3-5个点
- for overlap in self.overlap_ranges:
- # 在区间中心生成点
- pan_center = (overlap.pan_start + overlap.pan_end) / 2
- tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
- pan_span = overlap.pan_end - overlap.pan_start
- tilt_span = overlap.tilt_end - overlap.tilt_start
- # 中心点
- points.append(CalibrationPoint(pan=pan_center, tilt=tilt_center, zoom=1.0))
- # 四角点(如果区间足够宽)
- if pan_span > 10:
- points.append(CalibrationPoint(
- pan=overlap.pan_start + pan_span * 0.25,
- tilt=tilt_center, zoom=1.0))
- points.append(CalibrationPoint(
- pan=overlap.pan_start + pan_span * 0.75,
- tilt=tilt_center, zoom=1.0))
- if tilt_span > 10:
- points.append(CalibrationPoint(
- pan=pan_center,
- tilt=overlap.tilt_start + tilt_span * 0.3, zoom=1.0))
- points.append(CalibrationPoint(
- pan=pan_center,
- tilt=overlap.tilt_start + tilt_span * 0.7, zoom=1.0))
- else:
- # 完整模式: 在每个重叠区间内均匀分布
- grid_size = 5
- for overlap in self.overlap_ranges:
- for i in range(grid_size):
- for j in range(grid_size):
- pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
- tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
- points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
- return points
- def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
- """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
- try:
- if len(points) < 4:
- print(f"计算变换参数错误: 有效点不足({len(points)}个)")
- return False
- pan_values = np.array([p.pan for p in points])
- tilt_values = np.array([p.tilt for p in points])
- x_ratios = np.array([p.x_ratio for p in points])
- y_ratios = np.array([p.y_ratio for p in points])
- # RANSAC剔除异常值
- inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_values, tilt_values)
- inlier_count = np.sum(inlier_mask)
- if inlier_count < 4:
- print(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
- inlier_mask = np.ones(len(points), dtype=bool)
- inlier_count = np.sum(inlier_mask) # 更新inlier_count
- else:
- print(f"RANSAC: {len(points)}个点中{inlier_count}个内点,剔除{len(points) - inlier_count}个异常值")
- # 用内点拟合
- A = np.ones((inlier_count, 3))
- A[:, 1] = x_ratios[inlier_mask]
- A[:, 2] = y_ratios[inlier_mask]
- pan_params, _, _, _ = np.linalg.lstsq(A, pan_values[inlier_mask], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
- self.pan_offset = pan_params[0]
- self.pan_scale_x = pan_params[1]
- self.pan_scale_y = pan_params[2]
- self.tilt_offset = tilt_params[0]
- self.tilt_scale_x = tilt_params[1]
- self.tilt_scale_y = tilt_params[2]
- print(f"变换参数:")
- print(f" pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y")
- print(f" tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
- return True
- except Exception as e:
- print(f"计算变换参数错误: {e}")
- return False
- def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
- pan: np.ndarray, tilt: np.ndarray,
- max_iterations: int = 200, threshold: float = 15.0,
- min_samples: int = 4) -> np.ndarray:
- """RANSAC剔除变换拟合中的异常值"""
- n = len(x)
- best_inliers = np.zeros(n, dtype=bool)
- best_inlier_count = 0
- rng = np.random.RandomState(42)
- for _ in range(max_iterations):
- # 随机选min_samples个点
- indices = rng.choice(n, min_samples, replace=False)
- # 用这些点拟合
- A = np.ones((min_samples, 3))
- A[:, 1] = x[indices]
- A[:, 2] = y[indices]
- try:
- pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
- except np.linalg.LinAlgError:
- continue
- # 计算所有点的误差
- pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
- pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
- errors = np.sqrt((pred_pan - pan) ** 2 + (pred_tilt - tilt) ** 2)
- inliers = errors < threshold
- inlier_count = np.sum(inliers)
- if inlier_count > best_inlier_count:
- best_inlier_count = inlier_count
- best_inliers = inliers
- if best_inlier_count == 0:
- return np.ones(n, dtype=bool)
- return best_inliers
- def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
- """计算均方根误差"""
- total_error = 0.0
- for p in points:
- pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
- error = math.sqrt((pred_pan - p.pan) ** 2 + (pred_tilt - p.tilt) ** 2)
- total_error += error ** 2
- return math.sqrt(total_error / len(points))
- def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
- """将全景坐标转换为PTZ角度"""
- pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
- tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
- return (pan, tilt)
- def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
- """将PTZ角度转换为全景坐标(近似)"""
- x_ratio = (pan - self.pan_offset) / self.pan_scale_x if self.pan_scale_x != 0 else 0.5
- y_ratio = (tilt - self.tilt_offset) / self.tilt_scale_y if self.tilt_scale_y != 0 else 0.5
- return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
- def is_calibrated(self) -> bool:
- return self.state == CalibrationState.SUCCESS
- def get_state(self) -> CalibrationState:
- return self.state
- def get_result(self) -> Optional[CalibrationResult]:
- return self.result
- def get_overlap_ranges(self) -> List[OverlapRange]:
- """返回发现的重叠区间"""
- return self.overlap_ranges
- def save_calibration(self, filepath: str) -> bool:
- """保存校准结果"""
- if not self.is_calibrated():
- return False
- try:
- import json
- data = {
- 'pan_offset': self.pan_offset,
- 'pan_scale_x': self.pan_scale_x,
- 'pan_scale_y': self.pan_scale_y,
- 'tilt_offset': self.tilt_offset,
- 'tilt_scale_x': self.tilt_scale_x,
- 'tilt_scale_y': self.tilt_scale_y,
- 'rms_error': self.result.rms_error if self.result else 0,
- 'overlap_ranges': [
- {
- 'pan_start': r.pan_start,
- 'pan_end': r.pan_end,
- 'tilt_start': r.tilt_start,
- 'tilt_end': r.tilt_end,
- 'match_count': r.match_count
- }
- for r in self.overlap_ranges
- ]
- }
- with open(filepath, 'w') as f:
- json.dump(data, f, indent=2)
- print(f"校准结果已保存: {filepath}")
- return True
- except Exception as e:
- print(f"保存校准结果失败: {e}")
- return False
- def load_calibration(self, filepath: str) -> bool:
- """加载校准结果"""
- try:
- import json
- with open(filepath, 'r') as f:
- data = json.load(f)
- self.pan_offset = data['pan_offset']
- self.pan_scale_x = data['pan_scale_x']
- self.pan_scale_y = data['pan_scale_y']
- self.tilt_offset = data['tilt_offset']
- self.tilt_scale_x = data['tilt_scale_x']
- self.tilt_scale_y = data['tilt_scale_y']
- # 加载重叠区间(如果有)
- if 'overlap_ranges' in data:
- self.overlap_ranges = [
- OverlapRange(
- pan_start=r['pan_start'],
- pan_end=r['pan_end'],
- tilt_start=r['tilt_start'],
- tilt_end=r['tilt_end'],
- match_count=r['match_count'],
- panorama_center_x=0,
- panorama_center_y=0
- )
- for r in data['overlap_ranges']
- ]
- self.state = CalibrationState.SUCCESS
- self.result = CalibrationResult(
- success=True,
- points=[],
- rms_error=data.get('rms_error', 0)
- )
- print(f"校准结果已加载: {filepath}")
- return True
- except FileNotFoundError:
- print(f"校准文件不存在: {filepath}")
- return False
- except Exception as e:
- print(f"加载校准结果失败: {e}")
- return False
- class CalibrationManager:
- """校准管理器"""
- def __init__(self, calibrator: CameraCalibrator, calibration_file: str = None):
- self.calibrator = calibrator
- # 优先使用传入的路径,否则从配置读取,最后使用默认值
- if calibration_file:
- self.calibration_file = calibration_file
- else:
- try:
- from config import CALIBRATION_CONFIG
- self.calibration_file = CALIBRATION_CONFIG.get(
- 'calibration_file', 'calibration.json'
- )
- except ImportError:
- self.calibration_file = 'calibration.json'
- def auto_calibrate(self, force: bool = False) -> CalibrationResult:
- """自动校准"""
- # 检查是否启用加载上次校准数据
- load_on_startup = True # 默认启用
- try:
- from config import CALIBRATION_CONFIG
- load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
- except:
- pass
-
- if not force and load_on_startup:
- if self.calibrator.load_calibration(self.calibration_file):
- print("使用已有校准结果")
- return self.calibrator.get_result()
- print("开始自动校准..." if load_on_startup else "已禁用加载校准数据,开始新校准...")
- result = self.calibrator.calibrate(quick_mode=True)
- if result.success:
- self.calibrator.save_calibration(self.calibration_file)
- return result
- def check_calibration(self) -> Tuple[bool, str]:
- """检查校准状态"""
- state = self.calibrator.get_state()
- if state == CalibrationState.SUCCESS:
- result = self.calibrator.get_result()
- overlaps = self.calibrator.get_overlap_ranges()
- overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
- return (True, f"校准有效, RMS误差: {result.rms_error:.4f}°{overlap_info}")
- elif state == CalibrationState.FAILED:
- return (False, "校准失败")
- elif state == CalibrationState.RUNNING:
- return (False, "校准进行中")
- else:
- return (False, "未校准")
|