| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074 |
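"""
Camera calibration module.

Automatically calibrates a panoramic camera against a PTZ (dome) camera and
builds the mapping from panorama image coordinates to PTZ angles.

Key improvement: the overlapping field of view is discovered first and the
calibration is then performed inside that overlap, so the PTZ camera is not
pointed in a direction that shares nothing with the panorama (which would
make the calibration fail).
"""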
- import time
- import math
- import threading
- import numpy as np
- import cv2
- from typing import List, Tuple, Dict, Optional, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- from ptz_camera import PTZCamera
- # 加载PTZ配置
- def _get_ptz_config():
- try:
- from config import PTZ_CONFIG
- return PTZ_CONFIG
- except ImportError:
- return {
- 'mount_type': 'wall',
- 'tilt_flip': False,
- 'pan_flip': False
- }
class CalibrationState(Enum):
    """Lifecycle states of a calibration run."""

    IDLE = 0      # no calibration attempted or loaded yet
    RUNNING = 1   # calibrate() is in progress
    SUCCESS = 2   # a transform is available (freshly fitted or loaded)
    FAILED = 3    # the last calibrate() call did not produce a transform
@dataclass
class CalibrationPoint:
    """One calibration sample: a PTZ pose and where it was seen in the panorama."""

    # PTZ pose the camera is driven to (degrees, as passed to the PTZ driver).
    pan: float
    tilt: float
    zoom: float = 1.0
    # Detected panorama position, stored as fractions of frame width / height.
    x_ratio: float = 0.0
    y_ratio: float = 0.0
    # Set to True once a detection method has located this pose.
    detected: bool = False
@dataclass
class CalibrationResult:
    """Outcome of a calibration run (or of loading a saved calibration)."""

    success: bool
    # Calibration points that were verified during the run.
    points: List[CalibrationPoint]
    transform_matrix: Optional[np.ndarray] = None
    # Human-readable failure reason; empty when there is none.
    error_message: str = ""
    # Root-mean-square residual of the fitted transform.
    rms_error: float = 0.0
@dataclass
class OverlapRange:
    """A pan/tilt box in which the PTZ view was found to overlap the panorama."""

    # Bounds of the box in PTZ angles.
    pan_start: float
    pan_end: float
    tilt_start: float
    tilt_end: float
    # Feature-match count recorded for this box.
    match_count: int
    # Panorama position associated with the box, as width / height fractions.
    panorama_center_x: float
    panorama_center_y: float
# Minimum number of ratio-test survivors needed to accept two frames as overlapping.
MIN_MATCH_THRESHOLD = 8


class OverlapDiscovery:
    """Finds the PTZ angles at which the PTZ view overlaps the panorama.

    The PTZ camera is stepped over a pan/tilt grid. At every grid position its
    snapshot is feature-matched against the current panorama frame, and
    neighbouring positions that matched are merged into ``OverlapRange`` boxes.
    """

    def __init__(self, feature_type: str = 'SIFT'):
        """Create the feature detector and a matching brute-force matcher.

        Args:
            feature_type: ``'SIFT'`` (default) or ``'ORB'``, case-insensitive.
                SIFT falls back to ORB when this OpenCV build has no
                ``SIFT_create``. Any other value is treated as SIFT.
        """
        detector = None
        if str(feature_type).upper() != 'ORB':
            try:
                detector = cv2.SIFT_create()
            except AttributeError:
                detector = None
        if detector is not None:
            self.feature_detector = detector
            self.feature_type = 'SIFT'
            norm_type = cv2.NORM_L2
        else:
            self.feature_detector = cv2.ORB_create(nfeatures=500)
            self.feature_type = 'ORB'
            # ORB descriptors are binary, so they are compared with Hamming distance.
            norm_type = cv2.NORM_HAMMING
        self.matcher = cv2.BFMatcher(norm_type)

    @staticmethod
    def _scan_axis(start: float, stop: float, step: float) -> List[float]:
        """Return grid angles ``start, start + step, ...`` that do not exceed ``stop``.

        Raises:
            ValueError: if ``step`` is not positive.
        """
        if step <= 0:
            raise ValueError(f"scan step must be positive, got {step}")
        if stop < start:
            return []
        # The small epsilon keeps an end point that lies on the grid despite
        # floating-point rounding of the division.
        count = int(math.floor((stop - start) / step + 1e-9)) + 1
        return [float(start + i * step) for i in range(count)]

    def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
                     ) -> Tuple[bool, int, float, float]:
        """Feature-match a PTZ snapshot against a panorama frame.

        Returns:
            ``(matched, good_match_count, center_x, center_y)``. The centre is
            the mean panorama pixel position of the good matches and is
            ``0.0, 0.0`` whenever ``matched`` is False. Any exception raised
            while matching is reported and turned into a failed match.
        """
        failure = (False, 0, 0.0, 0.0)
        if ptz_frame is None or panorama_frame is None:
            return failure
        try:
            ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
            pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
            kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
            kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
            if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
                return failure
            matches = self.matcher.knnMatch(des1, des2, k=2)
            # Ratio test: keep a match only when it is clearly better than the runner-up.
            good_matches = [
                pair[0] for pair in matches
                if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
            ]
            if len(good_matches) < MIN_MATCH_THRESHOLD:
                return (False, len(good_matches), 0.0, 0.0)
            pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
            # Convert to plain floats so the declared return type holds.
            center_x = float(np.mean(pan_pts[:, 0]))
            center_y = float(np.mean(pan_pts[:, 1]))
            return (True, len(good_matches), center_x, center_y)
        except Exception as e:
            print(f" 特征匹配异常: {e}")
            return failure

    def discover_overlap_ranges(
        self,
        ptz: PTZCamera,
        get_panorama_frame: Callable[[], np.ndarray],
        ptz_capture: Callable[[], Optional[np.ndarray]],
        pan_range: Tuple[float, float] = (0, 360),
        tilt_range: Tuple[float, float] = (-30, 30),
        pan_step: float = 20,
        tilt_step: float = 15,
        stabilize_time: float = 2.0,
        on_progress: Optional[Callable[[int, int, str], None]] = None,
        max_ranges: int = 3,
        min_positions_per_range: int = 3
    ) -> List[OverlapRange]:
        """Scan the PTZ range and return the angle boxes that overlap the panorama.

        Steps: grab a panorama reference frame, move the PTZ camera to every
        grid position, feature-match its snapshot against the current panorama
        frame, and merge neighbouring matched positions into ranges.

        Args:
            ptz: PTZ driver; ``goto_exact_position(pan, tilt, 1)`` is called for
                each grid position and its truthiness decides whether to go on.
            get_panorama_frame: returns the current panorama frame, or None.
            ptz_capture: returns a PTZ snapshot, or None. When it is None the
                scan is skipped and an empty list is returned.
            pan_range, tilt_range: inclusive ``(start, end)`` scan bounds.
            pan_step, tilt_step: positive grid spacing.
            stabilize_time: seconds to wait after each move.
            on_progress: optional ``(index, total, description)`` callback.
            max_ranges: maximum number of ranges to return.
            min_positions_per_range: clusters with fewer positions are dropped.

        Returns:
            Overlap ranges ordered by ``pan_start``; empty when nothing matched.

        Raises:
            ValueError: if ``pan_step`` or ``tilt_step`` is not positive.
        """
        # Grid positions stay inside the configured bounds, even when the span
        # is not a multiple of the step.
        pan_values = self._scan_axis(pan_range[0], pan_range[1], pan_step)
        tilt_values = self._scan_axis(tilt_range[0], tilt_range[1], tilt_step)

        print(f"\n{'='*50}")
        print("阶段1: 视野重叠发现")
        print(f"扫描范围: pan={pan_range}, tilt={tilt_range}")
        print(f"步进: pan={pan_step}°, tilt={tilt_step}°")
        print(f"{'='*50}")

        if ptz_capture is None:
            # Without PTZ snapshots no position can match, so do not move the camera at all.
            print(" 错误: 未提供球机抓拍函数, 无法进行重叠发现!")
            return []

        # 1. Grab a panorama reference frame (up to three attempts).
        print(" 获取全景参考帧...")
        panorama_ref = None
        for _ in range(3):
            panorama_ref = get_panorama_frame()
            time.sleep(0.1)
            if panorama_ref is not None:
                break
        if panorama_ref is None:
            print(" 错误: 无法获取全景参考帧!")
            return []
        print(f" 全景参考帧: {panorama_ref.shape}")

        # 2. Visit every grid position.
        # NOTE(review): pan angles are compared linearly, so 0° and 360° are
        # scanned and clustered as different directions — confirm against the
        # PTZ driver's pan convention.
        scan_results: List[Tuple[float, float, int, float, float]] = []
        total_positions = len(pan_values) * len(tilt_values)
        current_idx = 0
        for pan in pan_values:
            for tilt in tilt_values:
                current_idx += 1
                pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
                if on_progress:
                    on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
                print(f" [{current_idx}/{total_positions}] {pos_desc}")

                if not ptz.goto_exact_position(float(pan), float(tilt), 1):
                    print(" 移动球机失败, 跳过")
                    continue
                time.sleep(stabilize_time)

                # A failing capture callback skips this position instead of aborting the scan.
                try:
                    ptz_frame = ptz_capture()
                except Exception as e:
                    print(f" 球机抓拍失败: {e}")
                    ptz_frame = None
                if ptz_frame is None:
                    print(" 球机抓拍失败, 跳过")
                    continue

                cur_panorama = get_panorama_frame()
                if cur_panorama is None:
                    continue

                success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
                if not success:
                    print(f" ✗ 匹配不足: {match_count}个特征点")
                    continue
                h, w = cur_panorama.shape[:2]
                x_ratio = cx / w
                y_ratio = cy / h
                print(f" ✓ 匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
                scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))

        if not scan_results:
            print("\n 未发现任何视野重叠位置!")
            return []
        print(f"\n 发现 {len(scan_results)} 个有重叠的扫描位置")

        # 3. Merge neighbouring positions. The tolerances keep their defaults
        # for the default steps but grow with larger steps, so adjacent grid
        # positions can still be linked.
        overlap_ranges = self._merge_scan_results(
            scan_results,
            pan_tolerance=max(25.0, pan_step * 1.25),
            tilt_tolerance=max(20.0, tilt_step * 4.0 / 3.0),
            max_ranges=max_ranges,
            min_positions=min_positions_per_range
        )
        for i, r in enumerate(overlap_ranges):
            print(f" 重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
                  f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
                  f"匹配点={r.match_count}")
        return overlap_ranges

    def _merge_scan_results(
        self,
        results: List[Tuple[float, float, int, float, float]],
        pan_tolerance: float = 25,
        tilt_tolerance: float = 20,
        max_ranges: int = 3,
        min_positions: int = 3
    ) -> List[OverlapRange]:
        """Cluster matched scan positions into ranges with union-find.

        Two positions are linked when both their pan and tilt differences are
        within the tolerances. Clusters with fewer than ``min_positions``
        members are dropped, the ``max_ranges`` clusters with the highest
        ``match_count`` are kept, and the result is ordered by ``pan_start``.
        """
        if not results:
            return []
        n = len(results)
        parent = list(range(n))

        def find(x):
            # Path halving keeps the trees shallow.
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x

        def union(a, b):
            ra, rb = find(a), find(b)
            if ra != rb:
                parent[ra] = rb

        # Link every pair of positions that are neighbours in both pan and tilt.
        for i in range(n):
            pan_i, tilt_i = results[i][0], results[i][1]
            for j in range(i + 1, n):
                pan_j, tilt_j = results[j][0], results[j][1]
                if abs(pan_i - pan_j) <= pan_tolerance and abs(tilt_i - tilt_j) <= tilt_tolerance:
                    union(i, j)

        # Group the positions by connected component.
        groups: Dict[int, List[int]] = {}
        for i in range(n):
            groups.setdefault(find(i), []).append(i)

        ranges = [
            self._group_to_range([results[i] for i in indices])
            for indices in groups.values()
            if len(indices) >= min_positions
        ]
        # Keep the strongest clusters, then report them in pan order.
        ranges.sort(key=lambda r: r.match_count, reverse=True)
        ranges = ranges[:max_ranges]
        ranges.sort(key=lambda r: r.pan_start)
        return ranges

    def _group_to_range(self, group: List[Tuple[float, float, int, float, float]],
                        margin: float = 5.0) -> OverlapRange:
        """Convert one cluster of scan results into an ``OverlapRange``.

        The box is the pan/tilt bounding box of the cluster widened by
        ``margin`` on every side. ``match_count`` is the best count in the
        cluster and the panorama centre is the mean of the members' positions.
        """
        pans = [r[0] for r in group]
        tilts = [r[1] for r in group]
        match_counts = [r[2] for r in group]
        x_ratios = [r[3] for r in group]
        y_ratios = [r[4] for r in group]
        return OverlapRange(
            pan_start=min(pans) - margin,
            pan_end=max(pans) + margin,
            tilt_start=min(tilts) - margin,
            tilt_end=max(tilts) + margin,
            match_count=max(match_counts),
            panorama_center_x=float(np.mean(x_ratios)),
            panorama_center_y=float(np.mean(y_ratios))
        )
class VisualCalibrationDetector:
    """Locates where the PTZ camera points inside the panorama.

    Two cues are available: frame differencing between two panorama frames
    (motion) and feature matching between the PTZ snapshot and the panorama.
    """

    def __init__(self):
        """Create the feature detector (SIFT, or ORB when SIFT is unavailable)."""
        try:
            self.feature_detector = cv2.SIFT_create()
            self.feature_type = 'SIFT'
        except AttributeError:
            self.feature_detector = cv2.ORB_create(nfeatures=500)
            self.feature_type = 'ORB'
        self.matcher = cv2.BFMatcher(
            cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
        )
        self.use_motion_detection = True
        self.use_feature_matching = True

    @staticmethod
    def _to_gray(frame: np.ndarray) -> np.ndarray:
        """Return ``frame`` as a single-channel image (3-D input is treated as BGR)."""
        if len(frame.shape) == 3:
            return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return frame

    def detect_by_motion(self, frames_before: np.ndarray,
                         frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
        """Locate the largest changed region between two panorama frames.

        Returns:
            ``(x_ratio, y_ratio)`` of the region's centroid as fractions of the
            frame width and height, or None when a frame is missing, the two
            frames differ in size, or no changed region of at least 500 px²
            is found.
        """
        if frames_before is None or frames_after is None:
            return None
        before_gray = self._to_gray(frames_before)
        after_gray = self._to_gray(frames_after)
        if before_gray.shape != after_gray.shape:
            # absdiff needs equally sized inputs; report it instead of raising.
            print(f" 运动检测: 前后帧尺寸不一致 {before_gray.shape} vs {after_gray.shape}, 跳过")
            return None

        diff = cv2.absdiff(before_gray, after_gray)
        _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
        # Close small holes first, then remove isolated specks.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not contours:
            return None

        max_contour = max(contours, key=cv2.contourArea)
        area = cv2.contourArea(max_contour)
        if area < 500:
            return None
        M = cv2.moments(max_contour)
        if M["m00"] == 0:
            return None
        cx = M["m10"] / M["m00"]
        cy = M["m01"] / M["m00"]
        h, w = before_gray.shape
        print(f" 运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f}")
        return (float(cx / w), float(cy / h))

    def detect_by_feature_match(self, panorama_frame: np.ndarray,
                                ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
        """Locate the PTZ view in the panorama by feature matching.

        Returns:
            ``(x_ratio, y_ratio)`` of the mean panorama position of the good
            matches, or None when a frame is missing, there are fewer than four
            good matches, or matching raised an exception.
        """
        if panorama_frame is None or ptz_frame is None:
            return None
        try:
            pan_gray = self._to_gray(panorama_frame)
            ptz_gray = self._to_gray(ptz_frame)
            kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
            kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
            if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
                return None
            matches = self.matcher.knnMatch(des1, des2, k=2)
            # Ratio test: keep a match only when it is clearly better than the runner-up.
            good_matches = [
                pair[0] for pair in matches
                if len(pair) == 2 and pair[0].distance < 0.75 * pair[1].distance
            ]
            if len(good_matches) < 4:
                return None
            pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
            center_x = float(np.mean(pan_pts[:, 0]))
            center_y = float(np.mean(pan_pts[:, 1]))
            h, w = pan_gray.shape
            print(f" 特征匹配: 匹配点={len(good_matches)}, 中心=({center_x:.1f}, {center_y:.1f})")
            return (center_x / w, center_y / h)
        except Exception as e:
            print(f" 特征匹配错误: {e}")
            return None

    def detect_position(self, panorama_frame: np.ndarray,
                        frames_before: np.ndarray = None,
                        frames_after: np.ndarray = None,
                        ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
        """Fuse the enabled cues into one panorama position.

        Motion results carry weight 0.4 and feature-match results weight 0.6;
        the weighted mean of whichever cues produced a result is returned.

        Returns:
            ``(found, x_ratio, y_ratio)``; ``(False, 0.0, 0.0)`` when no cue
            produced a result.
        """
        # Each entry is (position, weight).
        estimates = []
        if self.use_motion_detection and frames_before is not None and frames_after is not None:
            motion_result = self.detect_by_motion(frames_before, frames_after)
            if motion_result:
                estimates.append((motion_result, 0.4))
        if self.use_feature_matching and ptz_frame is not None:
            feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
            if feature_result:
                estimates.append((feature_result, 0.6))
        if not estimates:
            return (False, 0.0, 0.0)

        total_weight = sum(weight for _, weight in estimates)
        x_ratio = sum(pos[0] * weight for pos, weight in estimates) / total_weight
        y_ratio = sum(pos[1] * weight for pos, weight in estimates) / total_weight
        print(f" 融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
        return (True, x_ratio, y_ratio)
class CameraCalibrator:
    """Calibrates the panorama-to-PTZ mapping in two stages.

    Stage 1 discovers the PTZ angle range whose view overlaps the panorama.
    Stage 2 collects calibration points inside that range and fits an affine
    mapping ``(x_ratio, y_ratio) -> (pan, tilt)``.
    """

    # Names of the six affine parameters, in the order they are persisted.
    _PARAM_KEYS = ('pan_offset', 'pan_scale_x', 'pan_scale_y',
                   'tilt_offset', 'tilt_scale_x', 'tilt_scale_y')

    def __init__(self, ptz_camera: PTZCamera,
                 get_frame_func: Callable[[], np.ndarray],
                 detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
                 ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
        """Store the collaborators and reset the transform to its defaults.

        Args:
            ptz_camera: PTZ driver used to move the camera.
            get_frame_func: returns the current panorama frame, or None.
            detect_marker_func: optional custom detector returning
                ``(x_ratio, y_ratio)`` for a panorama frame, or None.
            ptz_capture_func: optional function returning a PTZ snapshot.
        """
        self.ptz = ptz_camera
        self.get_frame = get_frame_func
        self.detect_marker = detect_marker_func
        self.ptz_capture = ptz_capture_func
        self.visual_detector = VisualCalibrationDetector()
        self.overlap_discovery = OverlapDiscovery()
        self.state = CalibrationState.IDLE
        self.result: Optional[CalibrationResult] = None
        # Transform parameters: pan = offset + scale_x * x + scale_y * y (tilt likewise).
        self.pan_offset = 0.0
        self.pan_scale_x = 1.0
        self.pan_scale_y = 0.0
        self.tilt_offset = 0.0
        self.tilt_scale_x = 0.0
        self.tilt_scale_y = 1.0
        # Calibration settings
        self.stabilize_time = 2.0
        self.use_motion_detection = True
        self.use_feature_matching = True
        # Overlap-discovery settings
        self.overlap_pan_range = (0, 360)
        self.overlap_tilt_range = (-30, 30)
        self.overlap_pan_step = 20
        self.overlap_tilt_step = 15
        self.max_overlap_ranges = 3
        self.min_positions_per_range = 3
        # Callbacks
        self.on_progress: Optional[Callable[[int, int, str], None]] = None
        self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
        # Overlap ranges found by the last discovery (or loaded from file)
        self.overlap_ranges: List[OverlapRange] = []

    def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
        """Run the two-stage calibration.

        Stage 1 scans the PTZ range for angles overlapping the panorama and
        keeps the range with the most matches. Stage 2 generates calibration
        points inside that range, verifies each one, and fits the transform.

        Args:
            quick_mode: use the small point pattern instead of the 5x5 grid.

        Returns:
            The calibration result, which is also stored on ``self.result`` and
            passed to ``on_complete`` when that callback is set.
        """
        self.state = CalibrationState.RUNNING

        # ---------------- Stage 1: overlap discovery ----------------
        print(f"\n{'='*60}")
        print("阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
        print(f"{'='*60}")
        self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
            ptz=self.ptz,
            get_panorama_frame=self.get_frame,
            ptz_capture=self.ptz_capture,
            pan_range=self.overlap_pan_range,
            tilt_range=self.overlap_tilt_range,
            pan_step=self.overlap_pan_step,
            tilt_step=self.overlap_tilt_step,
            stabilize_time=self.stabilize_time,
            on_progress=self.on_progress,
            max_ranges=self.max_overlap_ranges,
            min_positions_per_range=self.min_positions_per_range
        )
        if not self.overlap_ranges:
            return self._finish_failure(
                [],
                "未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
            )
        print(f"\n发现 {len(self.overlap_ranges)} 个重叠区间")

        # Calibrate inside the range with the most matches only.
        best_range = max(self.overlap_ranges, key=lambda r: r.match_count)
        self.overlap_ranges = [best_range]
        print(f"选择最佳重叠区间: pan=[{best_range.pan_start:.0f}°, {best_range.pan_end:.0f}°], "
              f"tilt=[{best_range.tilt_start:.0f}°, {best_range.tilt_end:.0f}°], "
              f"匹配点={best_range.match_count}")
        print("进入阶段2校准")

        # ---------------- Stage 2: calibration inside the overlap ----------------
        print(f"\n{'='*60}")
        print("阶段2: 精确校准 - 在重叠区间内采集校准点")
        print(f"{'='*60}")
        calib_points = self._generate_points_in_overlaps(quick_mode)
        total_points = len(calib_points)
        print(f"生成 {total_points} 个校准点(仅位于重叠区域内)")

        valid_points = []
        for idx, point in enumerate(calib_points):
            if self.on_progress:
                self.on_progress(idx + 1, total_points,
                                 f"校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
            print(f"\n 校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
            if self._measure_point(point):
                valid_points.append(point)

        # ---------------- Check the number of verified points ----------------
        min_valid = 4
        if len(valid_points) < min_valid:
            return self._finish_failure(
                valid_points,
                f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
                f"请检查球机与全景的视野重叠是否足够。"
            )

        # ---------------- Fit the transform ----------------
        if not self._calculate_transform(valid_points):
            return self._finish_failure(valid_points, "变换参数计算失败", prefix="")

        self.state = CalibrationState.SUCCESS
        rms_error = self._calculate_rms_error(valid_points)
        self.result = CalibrationResult(
            success=True,
            points=valid_points,
            rms_error=rms_error
        )
        print(f"\n{'='*60}")
        print("校准成功!")
        print(f"有效校准点: {len(valid_points)}")
        print(f"重叠区间数: {len(self.overlap_ranges)}")
        print(f"RMS误差: {rms_error:.4f}°")
        self._auto_save()
        print(f"{'='*60}")
        if self.on_complete:
            self.on_complete(self.result)
        return self.result

    def _finish_failure(self, points: List[CalibrationPoint], message: str,
                        prefix: str = "\n") -> CalibrationResult:
        """Record a failed run, report it, fire ``on_complete`` and return the result."""
        self.state = CalibrationState.FAILED
        self.result = CalibrationResult(
            success=False,
            points=points,
            error_message=message
        )
        print(f"{prefix}校准失败: {self.result.error_message}")
        if self.on_complete:
            self.on_complete(self.result)
        return self.result

    def _auto_save(self) -> None:
        """Save the calibration when ``config.CALIBRATION_CONFIG`` asks for it.

        A missing config module silently disables auto-save. Any other problem
        is reported but never fails the calibration.
        """
        try:
            from config import CALIBRATION_CONFIG
        except ImportError:
            return
        try:
            if CALIBRATION_CONFIG.get('auto_save', True):
                filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
                self.save_calibration(filepath)
        except Exception as e:
            print(f"自动保存校准结果失败: {e}")

    def _grab_averaged_frame(self, attempts: int = 3, interval: float = 0.1) -> Optional[np.ndarray]:
        """Average up to ``attempts`` panorama frames; return None when none arrived."""
        frames = []
        for _ in range(attempts):
            frame = self.get_frame()
            if frame is not None:
                frames.append(frame)
            time.sleep(interval)
        if not frames:
            return None
        return np.mean(frames, axis=0).astype(np.uint8)

    def _measure_point(self, point: CalibrationPoint) -> bool:
        """Drive the PTZ to ``point`` and locate it in the panorama.

        Detection is tried in this order: feature matching, motion detection,
        custom marker detector. On success ``point.x_ratio``, ``point.y_ratio``
        and ``point.detected`` are filled in. A point that cannot be verified
        is skipped rather than estimated.

        Returns:
            True when the point was located, False otherwise.
        """
        # Step 1: panorama before the move (used by motion detection).
        frames_before = self._grab_averaged_frame()
        if frames_before is None:
            print(" 警告: 无法获取移动前的全景画面")
            return False

        # Step 2: move the PTZ camera to the target pose.
        print(" [2/4] 移动球机到目标位置...")
        if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
            print(" 警告: 移动球机失败")
            return False
        time.sleep(self.stabilize_time)

        # Step 3: panorama after the move, plus the PTZ snapshot.
        print(" [3/4] 获取移动后的帧...")
        frames_after = self._grab_averaged_frame()
        if frames_after is None:
            print(" 警告: 无法获取移动后的全景画面")
            return False
        panorama_frame = frames_after

        ptz_frame = None
        if self.use_feature_matching and self.ptz_capture:
            try:
                ptz_frame = self.ptz_capture()
                if ptz_frame is not None:
                    print(f" 球机抓拍成功: {ptz_frame.shape}")
            except Exception as e:
                print(f" 球机抓拍失败: {e}")

        # Step 4: locate the pose, preferring feature matching.
        print(" [4/4] 视觉检测与重叠验证...")
        if ptz_frame is not None:
            success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
            if success:
                h, w = panorama_frame.shape[:2]
                point.x_ratio = cx / w
                point.y_ratio = cy / h
                point.detected = True
                print(f" ✓ 特征匹配验证通过: {match_count}个匹配点, "
                      f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
                return True
            print(f" ✗ 特征匹配不足({match_count}点), 尝试运动检测...")

        # Fallback: motion detection between the before/after panoramas.
        if self.use_motion_detection:
            motion_result = self.visual_detector.detect_by_motion(frames_before, frames_after)
            if motion_result:
                point.x_ratio, point.y_ratio = motion_result
                point.detected = True
                print(f" ✓ 运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
                return True

        # Fallback: caller-supplied marker detector.
        if self.detect_marker:
            marker_pos = self.detect_marker(panorama_frame)
            if marker_pos:
                point.x_ratio, point.y_ratio = marker_pos
                point.detected = True
                print(f" ✓ 标记检测成功: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
                return True

        print(" ✗ 此校准点无法验证,跳过(不使用估算)")
        return False

    def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
        """Generate calibration poses that lie inside the discovered overlap ranges.

        Quick mode yields the range centre plus two points along the pan axis
        and two along the tilt axis, each pair only when that span exceeds 10°.
        Full mode yields a 5x5 grid over each range.
        """
        points = []
        if quick_mode:
            # NOTE(review): when only one span exceeds 10° this yields 3 points,
            # fewer than the 4 verified points calibrate() requires.
            for overlap in self.overlap_ranges:
                pan_span = overlap.pan_end - overlap.pan_start
                tilt_span = overlap.tilt_end - overlap.tilt_start
                pan_center = (overlap.pan_start + overlap.pan_end) / 2
                tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
                points.append(CalibrationPoint(pan=pan_center, tilt=tilt_center, zoom=1.0))
                if pan_span > 10:
                    for fraction in (0.25, 0.75):
                        points.append(CalibrationPoint(
                            pan=overlap.pan_start + pan_span * fraction,
                            tilt=tilt_center, zoom=1.0))
                if tilt_span > 10:
                    for fraction in (0.3, 0.7):
                        points.append(CalibrationPoint(
                            pan=pan_center,
                            tilt=overlap.tilt_start + tilt_span * fraction, zoom=1.0))
        else:
            grid_size = 5
            for overlap in self.overlap_ranges:
                for i in range(grid_size):
                    for j in range(grid_size):
                        pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
                        tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
                        points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
        return points

    def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
        """Fit the affine transform by least squares on the RANSAC inliers.

        Returns:
            True when the parameters were updated; False when fewer than four
            points were given or the fit raised an exception.
        """
        try:
            if len(points) < 4:
                print(f"计算变换参数错误: 有效点不足({len(points)}个)")
                return False
            pan_values = np.array([p.pan for p in points])
            tilt_values = np.array([p.tilt for p in points])
            x_ratios = np.array([p.x_ratio for p in points])
            y_ratios = np.array([p.y_ratio for p in points])

            inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_values, tilt_values)
            inlier_count = int(np.sum(inlier_mask))
            if inlier_count < 4:
                # Too few inliers to fit on: fall back to every point.
                print(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
                inlier_mask = np.ones(len(points), dtype=bool)
                inlier_count = len(points)
            else:
                print(f"RANSAC: {len(points)}个点中{inlier_count}个内点,剔除{len(points) - inlier_count}个异常值")

            # Design-matrix columns: constant term, x_ratio, y_ratio.
            A = np.ones((inlier_count, 3))
            A[:, 1] = x_ratios[inlier_mask]
            A[:, 2] = y_ratios[inlier_mask]
            pan_params, _, _, _ = np.linalg.lstsq(A, pan_values[inlier_mask], rcond=None)
            tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)

            # Store plain floats so the parameters serialise cleanly.
            self.pan_offset, self.pan_scale_x, self.pan_scale_y = (float(v) for v in pan_params)
            self.tilt_offset, self.tilt_scale_x, self.tilt_scale_y = (float(v) for v in tilt_params)
            print("变换参数:")
            print(f" pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y")
            print(f" tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
            return True
        except Exception as e:
            print(f"计算变换参数错误: {e}")
            return False

    def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
                       pan: np.ndarray, tilt: np.ndarray,
                       max_iterations: int = 200, threshold: float = 15.0,
                       min_samples: int = 4) -> np.ndarray:
        """Return a boolean inlier mask for the affine fit, found with RANSAC.

        A fixed random seed makes the result reproducible. Every point is
        marked as an inlier when there are fewer than ``min_samples`` points
        or when no iteration finds any inlier.
        """
        n = len(x)
        if n < min_samples:
            # Not enough points to draw a sample from.
            return np.ones(n, dtype=bool)
        best_inliers = np.zeros(n, dtype=bool)
        best_inlier_count = 0
        rng = np.random.RandomState(42)
        for _ in range(max_iterations):
            indices = rng.choice(n, min_samples, replace=False)
            A = np.ones((min_samples, 3))
            A[:, 1] = x[indices]
            A[:, 2] = y[indices]
            try:
                pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
                tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
            except np.linalg.LinAlgError:
                continue
            # Score the candidate model against every point.
            pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
            pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
            errors = np.sqrt((pred_pan - pan) ** 2 + (pred_tilt - tilt) ** 2)
            inliers = errors < threshold
            inlier_count = np.sum(inliers)
            if inlier_count > best_inlier_count:
                best_inlier_count = inlier_count
                best_inliers = inliers
        if best_inlier_count == 0:
            return np.ones(n, dtype=bool)
        return best_inliers

    def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
        """Return the RMS angular residual of the transform over ``points`` (0.0 if empty)."""
        if not points:
            return 0.0
        squared_sum = 0.0
        for p in points:
            pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
            squared_sum += (pred_pan - p.pan) ** 2 + (pred_tilt - p.tilt) ** 2
        return math.sqrt(squared_sum / len(points))

    def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
        """Map a panorama position (width/height fractions) to ``(pan, tilt)``."""
        pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
        tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
        return (pan, tilt)

    def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
        """Map PTZ angles back to a panorama position, clamped to [0, 1].

        The full 2x2 linear part is inverted, cross terms included, so this is
        the inverse of ``transform`` for positions inside the frame. When the
        linear part is singular, each axis is solved independently and 0.5 is
        returned for an axis whose own scale is zero.
        """
        d_pan = pan - self.pan_offset
        d_tilt = tilt - self.tilt_offset
        det = self.pan_scale_x * self.tilt_scale_y - self.pan_scale_y * self.tilt_scale_x
        if abs(det) > 1e-12:
            x_ratio = (d_pan * self.tilt_scale_y - self.pan_scale_y * d_tilt) / det
            y_ratio = (self.pan_scale_x * d_tilt - self.tilt_scale_x * d_pan) / det
        else:
            x_ratio = d_pan / self.pan_scale_x if self.pan_scale_x != 0 else 0.5
            y_ratio = d_tilt / self.tilt_scale_y if self.tilt_scale_y != 0 else 0.5
        return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))

    def is_calibrated(self) -> bool:
        """Return True when a transform is available."""
        return self.state == CalibrationState.SUCCESS

    def get_state(self) -> CalibrationState:
        """Return the current calibration state."""
        return self.state

    def get_result(self) -> Optional[CalibrationResult]:
        """Return the most recent result, or None when there is none."""
        return self.result

    def get_overlap_ranges(self) -> List[OverlapRange]:
        """Return the overlap ranges currently held."""
        return self.overlap_ranges

    def save_calibration(self, filepath: str) -> bool:
        """Write the transform, overlap ranges and mount settings to a JSON file.

        Returns:
            True on success; False when not calibrated or when writing failed.
        """
        if not self.is_calibrated():
            return False
        try:
            import json
            ptz_config = _get_ptz_config()
            data = {key: getattr(self, key) for key in self._PARAM_KEYS}
            data['rms_error'] = self.result.rms_error if self.result else 0
            data['overlap_ranges'] = [
                {
                    'pan_start': r.pan_start,
                    'pan_end': r.pan_end,
                    'tilt_start': r.tilt_start,
                    'tilt_end': r.tilt_end,
                    'match_count': r.match_count
                }
                for r in self.overlap_ranges
            ]
            # Record the mount orientation so a later load can detect a mismatch.
            data['mount_type'] = ptz_config.get('mount_type', 'wall')
            data['tilt_flip'] = ptz_config.get('tilt_flip', False)
            data['pan_flip'] = ptz_config.get('pan_flip', False)
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            print(f"校准结果已保存: {filepath}")
            return True
        except Exception as e:
            print(f"保存校准结果失败: {e}")
            return False

    def load_calibration(self, filepath: str) -> bool:
        """Load a calibration previously written by ``save_calibration``.

        The file is parsed completely before any attribute is changed, so a
        missing or malformed file leaves the current calibration untouched.

        Returns:
            True when the calibration was loaded, False otherwise.
        """
        try:
            import json
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Parse everything first; a KeyError here must not leave a half-loaded transform.
            params = {key: data[key] for key in self._PARAM_KEYS}
            overlap_ranges = None
            if 'overlap_ranges' in data:
                overlap_ranges = [
                    OverlapRange(
                        pan_start=r['pan_start'],
                        pan_end=r['pan_end'],
                        tilt_start=r['tilt_start'],
                        tilt_end=r['tilt_end'],
                        match_count=r['match_count'],
                        panorama_center_x=0,
                        panorama_center_y=0
                    )
                    for r in data['overlap_ranges']
                ]
            result = CalibrationResult(
                success=True,
                points=[],
                rms_error=data.get('rms_error', 0)
            )

            # Commit.
            for key, value in params.items():
                setattr(self, key, value)
            if overlap_ranges is not None:
                self.overlap_ranges = overlap_ranges
            self.state = CalibrationState.SUCCESS
            self.result = result

            # Warn when the mount orientation differs from the one used at calibration time.
            ptz_config = _get_ptz_config()
            current_mount = ptz_config.get('mount_type', 'wall')
            saved_mount = data.get('mount_type', 'wall')
            if current_mount != saved_mount:
                print(f"警告: 当前安装类型({current_mount})与校准时的({saved_mount})不同,建议重新校准!")
            print(f"校准结果已加载: {filepath}")
            return True
        except FileNotFoundError:
            print(f"校准文件不存在: {filepath}")
            return False
        except Exception as e:
            print(f"加载校准结果失败: {e}")
            return False
class CalibrationManager:
    """Decides when to reuse a saved calibration and when to run a new one."""

    def __init__(self, calibrator: CameraCalibrator, calibration_file: Optional[str] = None):
        """Bind the calibrator and resolve the calibration file path.

        The path is taken from ``calibration_file`` when given, otherwise from
        ``config.CALIBRATION_CONFIG['calibration_file']``, otherwise it is
        ``'calibration.json'``.
        """
        self.calibrator = calibrator
        if calibration_file:
            self.calibration_file = calibration_file
        else:
            try:
                from config import CALIBRATION_CONFIG
                self.calibration_file = CALIBRATION_CONFIG.get(
                    'calibration_file', 'calibration.json'
                )
            except ImportError:
                self.calibration_file = 'calibration.json'

    def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
        """Load an existing calibration or run a new one.

        Args:
            force: always run a new calibration instead of loading saved data.
            fallback_on_failure: when the new calibration fails, try to fall
                back to the saved calibration.

        Returns:
            The calibration result in effect after the call.
        """
        # Loading the previous calibration is enabled unless the config disables it.
        load_on_startup = True
        try:
            from config import CALIBRATION_CONFIG
            load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
        except ImportError:
            pass
        except Exception as e:
            # A malformed config must not prevent calibration; keep the default.
            print(f"读取校准配置失败, 使用默认设置: {e}")

        if not force and load_on_startup:
            if self.calibrator.load_calibration(self.calibration_file):
                print("使用已有校准结果")
                return self.calibrator.get_result()

        if force:
            print("强制重新校准(不使用已有数据)...")
        elif not load_on_startup:
            print("已禁用加载校准数据,开始新校准...")
        else:
            print("开始自动校准...")

        result = self.calibrator.calibrate(quick_mode=True)
        if result.success:
            self.calibrator.save_calibration(self.calibration_file)
        elif fallback_on_failure:
            print("校准失败,尝试回退使用已有校准数据...")
            if self.calibrator.load_calibration(self.calibration_file):
                print("已回退到已有校准数据")
                result = self.calibrator.get_result()
        return result

    def check_calibration(self) -> Tuple[bool, str]:
        """Return ``(is_valid, human-readable status)`` for the calibrator."""
        state = self.calibrator.get_state()
        if state == CalibrationState.SUCCESS:
            result = self.calibrator.get_result()
            # Guard against a SUCCESS state whose result was never set.
            rms_error = result.rms_error if result is not None else 0.0
            overlaps = self.calibrator.get_overlap_ranges()
            overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
            return (True, f"校准有效, RMS误差: {rms_error:.4f}°{overlap_info}")
        if state == CalibrationState.FAILED:
            return (False, "校准失败")
        if state == CalibrationState.RUNNING:
            return (False, "校准进行中")
        return (False, "未校准")
|