| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668 |
- """
- 相机校准模块
- 实现全景相机与球机的自动校准
- 建立画面坐标到PTZ角度的映射关系
- 核心改进:先发现视野重叠区域,再在重叠区内校准,
- 避免球机指向与全景画面无重叠的方向导致校准失败。
- """
- import time
- import math
- import threading
- import logging
- import numpy as np
- import cv2
- from typing import List, Tuple, Dict, Optional, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- from ptz_camera import PTZCamera
- logger = logging.getLogger(__name__)
- # 加载PTZ配置
- def _get_ptz_config():
- try:
- from config import PTZ_CONFIG
- return PTZ_CONFIG
- except ImportError:
- return {
- 'mount_type': 'wall',
- 'tilt_flip': False,
- 'pan_flip': False
- }
- class CalibrationState(Enum):
- IDLE = 0
- RUNNING = 1
- SUCCESS = 2
- FAILED = 3
- @dataclass
- class CalibrationPoint:
- pan: float
- tilt: float
- zoom: float = 1.0
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- detected: bool = False
- match_count: int = 0
- @dataclass
- class CalibrationResult:
- success: bool
- points: List[CalibrationPoint]
- transform_matrix: Optional[np.ndarray] = None
- error_message: str = ""
- rms_error: float = 0.0
- @dataclass
- class OverlapRange:
- pan_start: float
- pan_end: float
- tilt_start: float
- tilt_end: float
- match_count: int
- panorama_center_x: float
- panorama_center_y: float
- MIN_MATCH_THRESHOLD = 8
- class OverlapDiscovery:
- """
- 视野重叠发现器
- 扫描球机视野范围,找出与全景画面有视觉重叠的角度区间
- """
- def __init__(self, feature_type: str = 'SIFT'):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- norm_type = cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- self.matcher = cv2.BFMatcher(norm_type)
- def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
- ) -> Tuple[bool, int, float, float]:
- """
- 特征匹配球机画面与全景画面
- Returns: (是否匹配成功, 匹配点数, 全景画面中心x, 全景画面中心y)
- """
- if ptz_frame is None or panorama_frame is None:
- return (False, 0, 0.0, 0.0)
- try:
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
- # 缩小图像加速特征提取(匹配坐标按比例还原)
- ptz_scale = 1.0
- pan_scale = 1.0
- max_dim = 960
- if ptz_gray.shape[1] > max_dim:
- ptz_scale = max_dim / ptz_gray.shape[1]
- ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
- interpolation=cv2.INTER_AREA)
- if pan_gray.shape[1] > max_dim:
- pan_scale = max_dim / pan_gray.shape[1]
- pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
- interpolation=cv2.INTER_AREA)
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return (False, 0, 0.0, 0.0)
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < 0.75 * n.distance:
- good_matches.append(m)
- if len(good_matches) < MIN_MATCH_THRESHOLD:
- return (False, len(good_matches), 0.0, 0.0)
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- # 还原到原始图像坐标系
- center_x = np.mean(pan_pts[:, 0]) / pan_scale
- center_y = np.mean(pan_pts[:, 1]) / pan_scale
- return (True, len(good_matches), center_x, center_y)
- except Exception as e:
- logger.error(f"特征匹配异常: {e}")
- return (False, 0, 0.0, 0.0)
- def discover_overlap_ranges(
- self,
- ptz: PTZCamera,
- get_panorama_frame: Callable[[], np.ndarray],
- ptz_capture: Callable[[], Optional[np.ndarray]],
- pan_range: Tuple[float, float] = (0, 360),
- tilt_range: Tuple[float, float] = (-20, 40),
- pan_step: float = 20,
- tilt_step: float = 15,
- stabilize_time: float = 2.0,
- on_progress: Callable[[int, int, str], None] = None,
- max_ranges: int = 3,
- min_positions_per_range: int = 3
- ) -> List[OverlapRange]:
- """
- 扫描球机视野范围,发现与全景画面有重叠的角度区间
- 1. 先拍一张全景参考帧
- 2. 逐步移动球机到各个角度
- 3. 在每个位置抓拍球机画面,与全景做特征匹配
- 4. 记录有足够匹配点的角度
- 5. 合并相邻的有重叠的角度形成区间
- """
- logger.info(f"阶段1: 视野重叠发现, 扫描范围: pan={pan_range}, tilt={tilt_range}, 步进: pan={pan_step}°, tilt={tilt_step}°")
- # 1. 拍全景参考帧
- logger.info("获取全景参考帧...")
- ref_frames = []
- for _ in range(3):
- frame = get_panorama_frame()
- if frame is not None:
- ref_frames.append(frame)
- time.sleep(0.1)
- if not ref_frames:
- logger.error("无法获取全景参考帧!")
- return []
- panorama_ref = ref_frames[0]
- logger.info(f"全景参考帧: {panorama_ref.shape}")
- # 2. 扫描各个角度
- scan_results: List[Tuple[float, float, int, float, float]] = []
- pan_values = np.arange(pan_range[0], pan_range[1] + pan_step, pan_step)
- tilt_values = np.arange(tilt_range[0], tilt_range[1] + tilt_step, tilt_step)
- total_positions = len(pan_values) * len(tilt_values)
- current_idx = 0
- for pan in pan_values:
- for tilt in tilt_values:
- current_idx += 1
- pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
- if on_progress:
- on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
- logger.info(f"[{current_idx}/{total_positions}] {pos_desc}")
- # 移动球机
- if not ptz.goto_exact_position(float(pan), float(tilt), 1):
- logger.warning(f"移动球机失败, 跳过")
- continue
- time.sleep(stabilize_time)
- # 抓拍球机画面
- ptz_frame = ptz_capture() if ptz_capture else None
- if ptz_frame is None:
- logger.warning(f"球机抓拍失败, 跳过")
- continue
- # 获取当前全景帧并匹配
- cur_panorama = get_panorama_frame()
- if cur_panorama is None:
- continue
- success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
- if success:
- h, w = cur_panorama.shape[:2]
- x_ratio = cx / w
- y_ratio = cy / h
- logger.info(f"匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
- scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))
- else:
- logger.debug(f"匹配不足: {match_count}个特征点")
- if not scan_results:
- logger.warning("未发现任何视野重叠位置!")
- return []
- logger.info(f"发现 {len(scan_results)} 个有重叠的扫描位置")
- # 保存原始扫描结果供后续校准使用
- self.scan_results = scan_results
- # 3. 合并相邻位置为重叠区间
- overlap_ranges = self._merge_scan_results(
- scan_results,
- max_ranges=max_ranges,
- min_positions=min_positions_per_range
- )
- for i, r in enumerate(overlap_ranges):
- logger.info(f"重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
- f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
- f"匹配点={r.match_count}")
- return overlap_ranges
- def _merge_scan_results(
- self,
- results: List[Tuple[float, float, int, float, float]],
- pan_tolerance: float = 20,
- tilt_tolerance: float = 35,
- max_ranges: int = 3,
- min_positions: int = 2
- ) -> List[OverlapRange]:
- """
- 使用union-find连通分量聚类合并相邻扫描结果
- 只保留最大的 max_ranges 个区间
- """
- if not results:
- return []
- n = len(results)
- # union-find
- parent = list(range(n))
- def find(x):
- while parent[x] != x:
- parent[x] = parent[parent[x]]
- x = parent[x]
- return x
- def union(a, b):
- ra, rb = find(a), find(b)
- if ra != rb:
- parent[ra] = rb
- # 判断两点是否相邻
- for i in range(n):
- for j in range(i + 1, n):
- pi, ti = results[i][0], results[i][1]
- pj, tj = results[j][0], results[j][1]
- if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
- union(i, j)
- # 按连通分量分组
- groups: Dict[int, List[int]] = {}
- for i in range(n):
- root = find(i)
- if root not in groups:
- groups[root] = []
- groups[root].append(i)
- # 转换为OverlapRange,过滤太小的组
- ranges = []
- for indices in groups.values():
- if len(indices) < min_positions:
- continue
- group_data = [results[i] for i in indices]
- ranges.append(self._group_to_range(group_data))
- # 按match_count降序排序,只保留最大的 max_ranges 个
- ranges.sort(key=lambda r: r.match_count, reverse=True)
- ranges = ranges[:max_ranges]
- # 按pan_start排序输出
- ranges.sort(key=lambda r: r.pan_start)
- return ranges
- def _group_to_range(self, group: List[Tuple[float, float, int, float, float]]) -> OverlapRange:
- """将一组扫描结果转换为一个OverlapRange"""
- pans = [r[0] for r in group]
- tilts = [r[1] for r in group]
- match_counts = [r[2] for r in group]
- x_ratios = [r[3] for r in group]
- y_ratios = [r[4] for r in group]
- step = 5 # 在边缘各扩展5度
- return OverlapRange(
- pan_start=min(pans) - step,
- pan_end=max(pans) + step,
- tilt_start=min(tilts) - step,
- tilt_end=max(tilts) + step,
- match_count=max(match_counts),
- panorama_center_x=float(np.mean(x_ratios)),
- panorama_center_y=float(np.mean(y_ratios))
- )
- class VisualCalibrationDetector:
- """
- 视觉校准检测器
- 通过运动检测和特征匹配定位球机在全景画面中的位置
- """
- def __init__(self):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- self.matcher = cv2.BFMatcher(
- cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- )
- self.use_motion_detection = True
- self.use_feature_matching = True
- def detect_by_motion(self, frames_before: np.ndarray,
- frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过运动检测定位球机指向位置"""
- if frames_before is None or frames_after is None:
- return None
- before_gray = cv2.cvtColor(frames_before, cv2.COLOR_BGR2GRAY) \
- if len(frames_before.shape) == 3 else frames_before
- after_gray = cv2.cvtColor(frames_after, cv2.COLOR_BGR2GRAY) \
- if len(frames_after.shape) == 3 else frames_after
- diff = cv2.absdiff(before_gray, after_gray)
- _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
- contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- if not contours:
- return None
- max_contour = max(contours, key=cv2.contourArea)
- area = cv2.contourArea(max_contour)
- if area < 500:
- return None
- M = cv2.moments(max_contour)
- if M["m00"] == 0:
- return None
- cx = M["m10"] / M["m00"]
- cy = M["m01"] / M["m00"]
- h, w = before_gray.shape
- logger.debug(f"运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f})")
- return (cx / w, cy / h)
- def detect_by_feature_match(self, panorama_frame: np.ndarray,
- ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过特征匹配定位"""
- if panorama_frame is None or ptz_frame is None:
- return None
- try:
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) \
- if len(panorama_frame.shape) == 3 else panorama_frame
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) \
- if len(ptz_frame.shape) == 3 else ptz_frame
- # 缩小图像加速
- max_dim = 960
- ptz_scale = 1.0
- pan_scale = 1.0
- if ptz_gray.shape[1] > max_dim:
- ptz_scale = max_dim / ptz_gray.shape[1]
- ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
- interpolation=cv2.INTER_AREA)
- if pan_gray.shape[1] > max_dim:
- pan_scale = max_dim / pan_gray.shape[1]
- pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
- interpolation=cv2.INTER_AREA)
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return None
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < 0.75 * n.distance:
- good_matches.append(m)
- if len(good_matches) < 4:
- return None
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- center_x = np.mean(pan_pts[:, 0])
- center_y = np.mean(pan_pts[:, 1])
- h, w = pan_gray.shape
- logger.debug(f"特征匹配: 匹配点={len(good_matches)}, 中心=({center_x:.1f}, {center_y:.1f})")
- return (center_x / w, center_y / h)
- except Exception as e:
- logger.error(f"特征匹配错误: {e}")
- return None
- def detect_position(self, panorama_frame: np.ndarray,
- frames_before: np.ndarray = None,
- frames_after: np.ndarray = None,
- ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
- """综合检测球机在全景画面中的位置"""
- results = []
- if self.use_motion_detection and frames_before is not None and frames_after is not None:
- motion_result = self.detect_by_motion(frames_before, frames_after)
- if motion_result:
- results.append(('motion', motion_result, 0.4))
- if self.use_feature_matching and ptz_frame is not None:
- feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
- if feature_result:
- results.append(('feature', feature_result, 0.6))
- if not results:
- return (False, 0.0, 0.0)
- total_weight = sum(r[2] for r in results)
- x_ratio = sum(r[1][0] * r[2] for r in results) / total_weight
- y_ratio = sum(r[1][1] * r[2] for r in results) / total_weight
- logger.debug(f"融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
- return (True, x_ratio, y_ratio)
- class CameraCalibrator:
- """
- 相机校准器
- 两阶段校准:先发现视野重叠区域,再在重叠区内校准
- """
- def __init__(self, ptz_camera: PTZCamera,
- get_frame_func: Callable[[], np.ndarray],
- detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
- ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
- self.ptz = ptz_camera
- self.get_frame = get_frame_func
- self.detect_marker = detect_marker_func
- self.ptz_capture = ptz_capture_func
- self.visual_detector = VisualCalibrationDetector()
- self.overlap_discovery = OverlapDiscovery()
- self.state = CalibrationState.IDLE
- self.result: Optional[CalibrationResult] = None
- # 变换参数 (线性模型 - 作为后备)
- self.pan_offset = 0.0
- self.pan_scale_x = 1.0
- self.pan_scale_y = 0.0
- self.tilt_offset = 0.0
- self.tilt_scale_x = 0.0
- self.tilt_scale_y = 1.0
- # 分段线性查找表 (主变换方法)
- # 存储 x_ratio → pan 和 y_ratio → tilt 的映射
- self.pan_lookup: List[Tuple[float, float]] = [] # [(x_ratio, pan), ...] sorted by x_ratio
- self.tilt_lookup: List[Tuple[float, float]] = [] # [(y_ratio, tilt), ...] sorted by y_ratio
- # tilt偏移补偿(度),正值=向下补偿,从PTZ_CONFIG读取
- from config import PTZ_CONFIG
- self.tilt_offset_deg = PTZ_CONFIG.get('tilt_offset', 0)
- self.pan_offset_deg = PTZ_CONFIG.get('pan_offset', 0)
- self.pan_edge_offset = PTZ_CONFIG.get('pan_edge_offset', 0)
- self.pan_curve_power = PTZ_CONFIG.get('pan_curve_power', 1.0)
- # tilt线性映射(替代不稳定的查找表)
- self.tilt_linear_enabled = PTZ_CONFIG.get('tilt_linear_enabled', False)
- self.tilt_y0 = PTZ_CONFIG.get('tilt_y0', 0)
- self.tilt_y1 = PTZ_CONFIG.get('tilt_y1', 45)
- self.tilt_curve_power = PTZ_CONFIG.get('tilt_curve_power', 1.0)
- # 校准配置
- self.stabilize_time = 1.0
- self.use_motion_detection = True
- self.use_feature_matching = True
- # 重叠发现配置
- self.overlap_pan_range = (0, 360)
- self.overlap_tilt_range = (-20, 50)
- self.overlap_pan_step = 20
- self.overlap_tilt_step = 15
- self.max_overlap_ranges = 3
- self.min_positions_per_range = 3
- # 回调
- self.on_progress: Optional[Callable[[int, int, str], None]] = None
- self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
- # 发现的重叠区间
- self.overlap_ranges: List[OverlapRange] = []
- def _angular_diff(self, a: float, b: float) -> float:
- """计算两个角度之间的最小差值,考虑360°环绕"""
- diff = a - b
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- return diff
- def _unwrap_pan_angles(self, pan_values: np.ndarray) -> np.ndarray:
- """
- 将pan角度展开为连续值,避免0°/360°边界的不连续性
- 使用中位数作为参考点,将所有角度调整到参考点的±180°范围内。
- 这样即使校准点跨越0°/360°边界,也能正确拟合线性变换。
- 例如: [350, 355, 5, 10] → [-10, -5, 5, 10] (ref=5)
- """
- if len(pan_values) == 0:
- return pan_values
- ref = float(np.median(pan_values))
- unwrapped = pan_values.astype(float).copy()
- for i in range(len(unwrapped)):
- diff = unwrapped[i] - ref
- while diff > 180:
- unwrapped[i] -= 360
- diff = unwrapped[i] - ref
- while diff < -180:
- unwrapped[i] += 360
- diff = unwrapped[i] - ref
- return unwrapped
- def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
- """
- 执行校准 - 两阶段流程
-
- 阶段1: 视野重叠发现 - 扫描球机范围,找出与全景有重叠的角度区间
- 阶段2: 精确校准 - 仅在重叠区间内生成校准点,逐一验证后拟合变换
- """
- self.state = CalibrationState.RUNNING
- # ===================== 阶段1: 视野重叠发现 =====================
- logger.info("阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
- self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
- ptz=self.ptz,
- get_panorama_frame=self.get_frame,
- ptz_capture=self.ptz_capture,
- pan_range=self.overlap_pan_range,
- tilt_range=self.overlap_tilt_range,
- pan_step=self.overlap_pan_step,
- tilt_step=self.overlap_tilt_step,
- stabilize_time=self.stabilize_time,
- on_progress=self.on_progress,
- max_ranges=self.max_overlap_ranges,
- min_positions_per_range=self.min_positions_per_range
- )
- if not self.overlap_ranges:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=[],
- error_message="未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- logger.info(f"发现 {len(self.overlap_ranges)} 个重叠区间")
- # 保留所有重叠区间用于校准(覆盖更广的视野范围)
- logger.info(f"使用全部 {len(self.overlap_ranges)} 个重叠区间进行校准(覆盖更广视野)")
- for i, r in enumerate(self.overlap_ranges):
- logger.info(f" 区间{i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
- f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], 匹配点={r.match_count}")
- # ===================== 阶段2: 使用阶段1扫描数据 + 补充校准 =====================
- # 阶段1已对整个视野扫描并记录了(pan, tilt) → (x_ratio, y_ratio)对应关系
- # 直接使用这些数据比阶段2重新在单个区间内采集更全面、更高效
- valid_points = []
- # 直接从阶段1扫描结果构建校准点
- scan_results = getattr(self.overlap_discovery, 'scan_results', [])
- if scan_results:
- logger.info(f"使用阶段1扫描数据: {len(scan_results)}个有效匹配位置")
- for pan, tilt, match_count, x_ratio, y_ratio in scan_results:
- valid_points.append(CalibrationPoint(
- pan=pan, tilt=tilt, zoom=1.0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- detected=True, match_count=match_count
- ))
- else:
- logger.warning("阶段1无扫描数据,回退到阶段2逐点校准")
- # 如果扫描数据不足,补充在重叠区内采集更多点
- min_scan_points = 8
- if len(valid_points) < min_scan_points:
- logger.info(f"扫描数据不足({len(valid_points)}<{min_scan_points}),在重叠区间内补充采集")
- supplement_points = self._generate_points_in_overlaps(quick_mode)
- total_supplement = len(supplement_points)
- supplement_valid = 0
- for idx, point in enumerate(supplement_points):
- if self.on_progress:
- self.on_progress(idx + 1, total_supplement,
- f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- logger.info(f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- # 获取移动前全景帧
- frames_before_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_before_list.append(frame)
- time.sleep(0.1)
- if not frames_before_list:
- continue
- frames_before = np.mean(frames_before_list, axis=0).astype(np.uint8)
- # 移动球机
- if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
- continue
- time.sleep(self.stabilize_time)
- # 获取移动后帧
- frames_after_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_after_list.append(frame)
- time.sleep(0.1)
- if not frames_after_list:
- continue
- panorama_frame = np.mean(frames_after_list, axis=0).astype(np.uint8)
- # 球机抓拍
- ptz_frame = None
- if self.ptz_capture:
- try:
- ptz_frame = self.ptz_capture()
- except Exception:
- pass
- # 特征匹配验证
- if ptz_frame is not None and panorama_frame is not None:
- success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
- if success:
- h, w = panorama_frame.shape[:2]
- point.x_ratio = cx / w
- point.y_ratio = cy / h
- point.detected = True
- valid_points.append(point)
- supplement_valid += 1
- logger.info(f"补充点验证通过: {match_count}个匹配点, "
- f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- continue
- # 运动检测备选
- if self.use_motion_detection and frames_before is not None and panorama_frame is not None:
- motion_result = self.visual_detector.detect_by_motion(frames_before, panorama_frame)
- if motion_result:
- point.x_ratio, point.y_ratio = motion_result
- point.detected = True
- valid_points.append(point)
- supplement_valid += 1
- logger.info(f"运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- logger.info(f"补充采集: {supplement_valid}/{total_supplement} 个点验证通过")
- # ===================== 检查有效校准点 =====================
- min_valid = 4
- if len(valid_points) < min_valid:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message=f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
- f"请检查球机与全景的视野重叠是否足够。"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- # ===================== 计算变换参数 =====================
- success = self._calculate_transform(valid_points)
- if success:
- # 构建分段线性查找表(主变换方法,处理pan环绕)
- lookup_ok = self._build_lookup_tables(valid_points)
- self.state = CalibrationState.SUCCESS
- rms_error = self._calculate_rms_error(valid_points)
- self.result = CalibrationResult(
- success=True,
- points=valid_points,
- rms_error=rms_error
- )
- logger.info(f"校准成功! 有效校准点: {len(valid_points)}, "
- f"重叠区间数: {len(self.overlap_ranges)}, RMS误差: {rms_error:.4f}°")
- # 校准验证:将球机移到全景画面中心,检查是否指向正确位置
- verify_ok = self._verify_calibration()
- if not verify_ok:
- logger.warning("校准验证未通过,校准结果可能不准确")
- # 自动保存校准结果
- try:
- from config import CALIBRATION_CONFIG
- if CALIBRATION_CONFIG.get('auto_save', True):
- filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
- self.save_calibration(filepath)
- except Exception:
- pass
- # 校准完成后,将球机复位到初始位置
- self._reset_ptz_position()
- else:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message="变换参数计算失败"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- def _reset_ptz_position(self):
- """校准完成后将球机复位到初始位置"""
- if self.ptz is None:
- return
- try:
- # 获取默认位置配置
- from config import PTZ_CONFIG
- default_pan = PTZ_CONFIG.get('default_pan', 0)
- default_tilt = PTZ_CONFIG.get('default_tilt', 0)
- default_zoom = PTZ_CONFIG.get('default_zoom', 1)
- logger.info(f"球机复位到位置: pan={default_pan}, tilt={default_tilt}, zoom={default_zoom}")
- self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
- time.sleep(0.5)
- except Exception as e:
- logger.warning(f"球机复位失败: {e}")
- def _verify_calibration(self) -> bool:
- """
- 校准验证:将球机移到全景画面中心对应的PTZ角度,
- 通过特征匹配验证球机是否指向了全景画面中心区域。
- 同时验证全景画面中的多个关键位置(左、中、右),
- 确保变换在整个视野范围内基本正确。
- Returns:
- 验证是否通过
- """
- logger.info("=" * 50)
- logger.info("校准验证: 将球机移到全景画面中心位置")
- logger.info("=" * 50)
- if self.ptz is None or self.get_frame is None:
- logger.warning("无法执行校准验证: PTZ或全景帧获取函数不可用")
- return False
- # 验证位置列表:全景画面中的关键位置
- verify_positions = [
- ("全景中心", 0.5, 0.5),
- ("全景左侧", 0.25, 0.5),
- ("全景右侧", 0.75, 0.5),
- ]
- passed = 0
- total = len(verify_positions)
- for name, x_ratio, y_ratio in verify_positions:
- pan, tilt = self.transform(x_ratio, y_ratio)
- logger.info(f"验证 {name} ({x_ratio:.2f}, {y_ratio:.2f}) → "
- f"PTZ角度: pan={pan:.1f}°, tilt={tilt:.1f}°")
- # 检查角度是否在合理范围
- if pan < -10 or pan > 370 or tilt < -95 or tilt > 95:
- logger.warning(f" 变换结果异常: pan={pan:.1f}°, tilt={tilt:.1f}° 超出合理范围")
- continue
- # 移动球机到计算出的位置
- if not self.ptz.goto_exact_position(pan, tilt, 1):
- logger.warning(f" 移动球机失败")
- continue
- time.sleep(self.stabilize_time)
- # 获取全景帧和球机帧
- panorama_frame = self.get_frame()
- ptz_frame = self.ptz_capture() if self.ptz_capture else None
- if panorama_frame is None or ptz_frame is None:
- logger.warning(f" 获取帧失败: 全景={'OK' if panorama_frame is not None else '失败'}, "
- f"球机={'OK' if ptz_frame is not None else '失败'}")
- continue
- # 特征匹配验证
- success, match_count, cx, cy = self.overlap_discovery.match_frames(
- ptz_frame, panorama_frame
- )
- h, w = panorama_frame.shape[:2]
- match_x_ratio = cx / w
- match_y_ratio = cy / h
- # 计算期望位置与实际匹配位置的偏差
- position_error = math.sqrt(
- (match_x_ratio - x_ratio) ** 2 + (match_y_ratio - y_ratio) ** 2
- )
- if success:
- logger.info(f" 匹配成功: {match_count}个特征点, "
- f"匹配位置=({match_x_ratio:.3f}, {match_y_ratio:.3f}), "
- f"期望位置=({x_ratio:.3f}, {y_ratio:.3f}), "
- f"位置偏差={position_error:.3f}")
- if position_error < 0.15:
- passed += 1
- logger.info(f" 验证通过 (偏差 < 15%)")
- else:
- logger.warning(f" 验证偏差较大 ({position_error:.1%}),校准精度可能不足")
- else:
- logger.warning(f" 特征匹配不足({match_count}点), "
- f"球机可能未指向全景画面中期望的位置")
- logger.info(f"校准验证结果: {passed}/{total} 个位置验证通过")
- if passed == 0:
- logger.error("所有验证位置均未通过,校准结果可能完全错误!请检查:")
- logger.error(" 1. 球机安装方向配置是否正确 (mount_type, pan_flip, tilt_flip)")
- logger.error(" 2. 两台摄像头的相对位置是否合理")
- logger.error(" 3. 球机PTZ角度范围是否配置正确")
- return False
- if passed < total:
- logger.warning(f"部分验证未通过,校准精度可能有限")
- return True
- return True
- def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
- """
- 在发现的重叠区间内生成校准点
- 只在球机和全景有视觉重叠的区域生成点
- """
- points = []
- if quick_mode:
- # 快速模式: 每个重叠区间内生成9个点(3x3网格)
- for overlap in self.overlap_ranges:
- # 在区间中心生成点
- pan_center = (overlap.pan_start + overlap.pan_end) / 2
- tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
- pan_span = overlap.pan_end - overlap.pan_start
- tilt_span = overlap.tilt_end - overlap.tilt_start
- # 3x3网格分布
- pan_positions = [0.25, 0.5, 0.75] if pan_span > 10 else [0.5]
- tilt_positions = [0.25, 0.5, 0.75] if tilt_span > 10 else [0.5]
- for pf in pan_positions:
- for tf in tilt_positions:
- points.append(CalibrationPoint(
- pan=overlap.pan_start + pan_span * pf,
- tilt=overlap.tilt_start + tilt_span * tf,
- zoom=1.0))
- else:
- # 完整模式: 在每个重叠区间内均匀分布
- grid_size = 5
- for overlap in self.overlap_ranges:
- for i in range(grid_size):
- for j in range(grid_size):
- pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
- tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
- points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
- return points
- def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
- """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
- try:
- if len(points) < 4:
- logger.error(f"计算变换参数错误: 有效点不足({len(points)}个)")
- return False
- pan_values = np.array([p.pan for p in points])
- tilt_values = np.array([p.tilt for p in points])
- x_ratios = np.array([p.x_ratio for p in points])
- y_ratios = np.array([p.y_ratio for p in points])
- # 记录原始校准数据便于调试
- logger.info("校准点原始数据:")
- for i, p in enumerate(points):
- logger.info(f" 点{i+1}: pan={p.pan:.1f}°, tilt={p.tilt:.1f}° → "
- f"全景位置=({p.x_ratio:.3f}, {p.y_ratio:.3f})")
- # 展开pan角度避免0°/360°边界不连续性
- pan_unwrapped = self._unwrap_pan_angles(pan_values)
- if not np.allclose(pan_values, pan_unwrapped, atol=0.1):
- logger.info(f"Pan角度展开: 原始={pan_values.tolist()} → 展开后={pan_unwrapped.tolist()}")
- else:
- logger.info("Pan角度无需展开(无0°/360°边界跨越)")
- # RANSAC剔除异常值 (使用展开后的pan)
- inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_unwrapped, tilt_values)
- inlier_count = np.sum(inlier_mask)
- if inlier_count < 4:
- logger.warning(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
- inlier_mask = np.ones(len(points), dtype=bool)
- inlier_count = np.sum(inlier_mask)
- else:
- logger.info(f"RANSAC: {len(points)}个点中{inlier_count}个内点,"
- f"剔除{len(points) - inlier_count}个异常值")
- # 记录内点数据
- logger.info("RANSAC内点数据:")
- for i, p in enumerate(points):
- if inlier_mask[i]:
- logger.info(f" 点{i+1}: pan={pan_unwrapped[i]:.1f}°(原始={p.pan:.1f}°), "
- f"tilt={p.tilt:.1f}° → ({p.x_ratio:.3f}, {p.y_ratio:.3f})")
- # 用内点拟合完整模型
- A = np.ones((inlier_count, 3))
- A[:, 1] = x_ratios[inlier_mask]
- A[:, 2] = y_ratios[inlier_mask]
- pan_params, _, _, _ = np.linalg.lstsq(A, pan_unwrapped[inlier_mask], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
- self.pan_offset = pan_params[0]
- self.pan_scale_x = pan_params[1]
- self.pan_scale_y = pan_params[2]
- self.tilt_offset = tilt_params[0]
- self.tilt_scale_x = tilt_params[1]
- self.tilt_scale_y = tilt_params[2]
- # 系数合理性检查
- pan_coeffs_ok = (abs(self.pan_scale_x) < 500 and abs(self.pan_scale_y) < 500)
- tilt_coeffs_ok = (abs(self.tilt_scale_x) < 300 and abs(self.tilt_scale_y) < 300)
- if not (pan_coeffs_ok and tilt_coeffs_ok):
- logger.warning(f"完整模型系数异常: pan_scale_x={self.pan_scale_x:.1f}, "
- f"pan_scale_y={self.pan_scale_y:.1f}, "
- f"tilt_scale_x={self.tilt_scale_x:.1f}, "
- f"tilt_scale_y={self.tilt_scale_y:.1f}")
- logger.info("尝试简化模型: pan仅依赖x, tilt仅依赖y")
- # 简化模型: pan = offset + scale_x * x
- # tilt = offset + scale_y * y
- A_pan = np.ones((inlier_count, 2))
- A_pan[:, 1] = x_ratios[inlier_mask]
- pan_params_s, _, _, _ = np.linalg.lstsq(A_pan, pan_unwrapped[inlier_mask], rcond=None)
- A_tilt = np.ones((inlier_count, 2))
- A_tilt[:, 1] = y_ratios[inlier_mask]
- tilt_params_s, _, _, _ = np.linalg.lstsq(A_tilt, tilt_values[inlier_mask], rcond=None)
- self.pan_offset = pan_params_s[0]
- self.pan_scale_x = pan_params_s[1]
- self.pan_scale_y = 0.0
- self.tilt_offset = tilt_params_s[0]
- self.tilt_scale_x = 0.0
- self.tilt_scale_y = tilt_params_s[1]
- logger.info(f"简化模型: pan={self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x, "
- f"tilt={self.tilt_offset:.2f} + {self.tilt_scale_y:.2f}*y")
- # 验证变换对全景中心的预测是否合理
- center_pan, center_tilt = self.transform(0.5, 0.5)
- logger.info(f"全景中心(0.5,0.5)预测: pan={center_pan:.1f}°, tilt={center_tilt:.1f}°")
- logger.info(f"最终变换参数: pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y, "
- f"tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
- return True
- except Exception as e:
- logger.error(f"计算变换参数错误: {e}")
- import traceback
- logger.error(traceback.format_exc())
- return False
- def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
- pan: np.ndarray, tilt: np.ndarray,
- max_iterations: int = 200, threshold: float = 15.0,
- min_samples: int = 4) -> np.ndarray:
- """RANSAC剔除变换拟合中的异常值(pan应已展开为连续值)"""
- n = len(x)
- best_inliers = np.zeros(n, dtype=bool)
- best_inlier_count = 0
- rng = np.random.RandomState(42)
- for _ in range(max_iterations):
- # 随机选min_samples个点
- indices = rng.choice(n, min_samples, replace=False)
- # 用这些点拟合
- A = np.ones((min_samples, 3))
- A[:, 1] = x[indices]
- A[:, 2] = y[indices]
- try:
- pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
- except np.linalg.LinAlgError:
- continue
- # 计算所有点的误差
- pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
- pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
- # 使用角度差计算pan误差(即使已展开,仍用角度差以防边界情况)
- pan_errors = np.array([self._angular_diff(float(pred_pan[i]), float(pan[i]))
- for i in range(n)])
- tilt_errors = pred_tilt - tilt
- errors = np.sqrt(pan_errors ** 2 + tilt_errors ** 2)
- inliers = errors < threshold
- inlier_count = np.sum(inliers)
- if inlier_count > best_inlier_count:
- best_inlier_count = inlier_count
- best_inliers = inliers
- if best_inlier_count == 0:
- return np.ones(n, dtype=bool)
- return best_inliers
- def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
- """计算均方根误差(使用角度差处理pan环绕)"""
- total_error = 0.0
- for p in points:
- pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
- # 使用角度差计算pan误差,处理0°/360°环绕
- pan_error = self._angular_diff(pred_pan, p.pan)
- tilt_error = pred_tilt - p.tilt
- error = math.sqrt(pan_error ** 2 + tilt_error ** 2)
- total_error += error ** 2
- return math.sqrt(total_error / len(points))
- def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
- """将全景坐标转换为PTZ角度 - 梯形透视补偿"""
- # 优先使用分段线性查找表(pan)
- if self.pan_lookup:
- pan = self._interp_lookup(self.pan_lookup, x_ratio)
- else:
- pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
- # pan边缘曲线补偿:越靠近边缘补偿越大,中心不补偿
- # 梯形透视:底部(y大)更宽,边缘补偿更大;顶部(y小)更窄,补偿更小
- if self.pan_edge_offset != 0:
- dx = 2 * x_ratio - 1 # -1(左) ~ 0(中) ~ +1(右)
- y_scale = 0.3 + 0.7 * y_ratio # 顶部0.3倍,底部1.0倍
- pan_correction = self.pan_edge_offset * y_scale * math.copysign(abs(dx) ** self.pan_curve_power, dx)
- pan += pan_correction
- # tilt:优先使用曲线映射(查找表tilt数据不稳定),后备查找表
- if self.tilt_linear_enabled:
- tilt = self.tilt_y0 + (self.tilt_y1 - self.tilt_y0) * (y_ratio ** self.tilt_curve_power)
- elif self.tilt_lookup:
- tilt = self._interp_lookup(self.tilt_lookup, y_ratio)
- else:
- tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
- return (pan % 360, tilt)
- def _interp_lookup(self, lookup: List[Tuple[float, float]], ratio: float) -> float:
- """分段线性插值"""
- if not lookup:
- return 0.0
- if len(lookup) == 1:
- return lookup[0][1]
- if ratio <= lookup[0][0]:
- return lookup[0][1]
- if ratio >= lookup[-1][0]:
- return lookup[-1][1]
- # 二分查找插入位置
- lo, hi = 0, len(lookup) - 1
- while lo < hi - 1:
- mid = (lo + hi) // 2
- if lookup[mid][0] <= ratio:
- lo = mid
- else:
- hi = mid
- # 线性插值
- x0, v0 = lookup[lo]
- x1, v1 = lookup[hi]
- if abs(x1 - x0) < 1e-10:
- return v0
- t = (ratio - x0) / (x1 - x0)
- return v0 + t * (v1 - v0)
- def _build_lookup_tables(self, points: List[CalibrationPoint]) -> bool:
- """
- 从校准点构建分段线性查找表
- 核心策略:
- 1. 将所有校准点按x_ratio分桶,取匹配点数加权的pan值
- 2. 用最长连续单调子序列(LCMA)过滤假阳性:x_ratio→pan应近似单调
- 3. 处理pan角度环绕
- """
- if len(points) < 3:
- return False
- sorted_by_x = sorted(points, key=lambda p: p.x_ratio)
- # ===== 构建 x_ratio → pan 映射 =====
- grid_size = 0.05
- x_buckets: Dict[float, List[Tuple[float, int]]] = {}
- for p in sorted_by_x:
- x_key = round(p.x_ratio / grid_size) * grid_size
- if x_key not in x_buckets:
- x_buckets[x_key] = []
- match_count = getattr(p, 'match_count', 10)
- x_buckets[x_key].append((p.pan, match_count))
- # 加权中位数
- raw_entries = []
- for x_key in sorted(x_buckets.keys()):
- entries = x_buckets[x_key]
- total_weight = sum(mc for _, mc in entries)
- weighted_pans = []
- for pan, mc in entries:
- weighted_pans.extend([pan] * max(1, mc // 5))
- weighted_pan = float(np.median(weighted_pans))
- raw_entries.append((x_key, weighted_pan, total_weight))
- logger.info(f"Pan原始映射 ({len(raw_entries)} 个x_key):")
- for x, pan, w in raw_entries:
- logger.info(f" x={x:.3f} → pan={pan:.1f}° (weight={w})")
- # 用LCMA过滤:找到最长的近似连续单调子序列
- # pan随x_ratio应该是近似单调递减或递增的
- if len(raw_entries) >= 3:
- filtered = self._filter_continuous_monotonic(raw_entries)
- self.pan_lookup = [(x, pan) for x, pan, w in filtered]
- else:
- self.pan_lookup = [(x, pan % 360) for x, pan, w in raw_entries]
- # ===== 构建 y_ratio → tilt 映射 =====
- # 只使用通过pan过滤的点(x_ratio对应的pan与查找表一致)
- pan_valid_x = set(x for x, _ in self.pan_lookup)
- pan_tolerance = grid_size * 1.5 # 允许在pan有效区域附近的点
- valid_points_for_tilt = []
- for p in sorted_by_x:
- for vx in pan_valid_x:
- if abs(p.x_ratio - vx) <= pan_tolerance:
- valid_points_for_tilt.append(p)
- break
- logger.info(f"Tilt映射使用 {len(valid_points_for_tilt)}/{len(sorted_by_x)} 个经过pan验证的点")
- y_buckets: Dict[float, List[Tuple[float, int]]] = {}
- for p in valid_points_for_tilt:
- y_key = round(p.y_ratio / grid_size) * grid_size
- if y_key not in y_buckets:
- y_buckets[y_key] = []
- match_count = getattr(p, 'match_count', 10)
- y_buckets[y_key].append((p.tilt, match_count))
- tilt_entries = []
- for y_key in sorted(y_buckets.keys()):
- entries = y_buckets[y_key]
- weighted_tilts = []
- for tilt, mc in entries:
- weighted_tilts.extend([tilt] * max(1, mc // 5))
- tilt_median = float(np.median(weighted_tilts))
- tilt_entries.append((y_key, tilt_median))
- self.tilt_lookup = tilt_entries
- # 记录查找表内容
- logger.info(f"Pan查找表 ({len(self.pan_lookup)} 项):")
- for x, pan in self.pan_lookup:
- logger.info(f" x={x:.3f} → pan={pan:.1f}°")
- logger.info(f"Tilt查找表 ({len(self.tilt_lookup)} 项):")
- for y, tilt in self.tilt_lookup:
- logger.info(f" y={y:.3f} → tilt={tilt:.1f}°")
- return True
- def _filter_continuous_monotonic(
- self, entries: List[Tuple[float, float, int]],
- max_step: float = 60.0
- ) -> List[Tuple[float, float, int]]:
- """
- 过滤出最长的连续单调子序列
- x_ratio→pan应该是近似单调的(递增或递减,可能环绕一次)。
- 假阳性匹配会导致pan突然跳变到完全不相关的角度,
- 这个方法通过寻找最长的"步长<max_step"的子序列来过滤。
- Args:
- entries: [(x_key, pan, weight), ...] 已按x_key排序
- max_step: 相邻两个点允许的最大pan差(度)
- Returns:
- 过滤后的entries子集
- """
- n = len(entries)
- if n <= 2:
- return [(x, pan % 360, w) for x, pan, w in entries]
- # 尝试两种方向:pan递减和pan递增
- # 对于递减方向:每个点的pan应比前一个小(允许环绕)
- best_result = []
- for direction in ['decreasing', 'increasing']:
- # 动态规划找最长连续子序列
- # dp[i] = 以entries[i]结尾的最长连续子序列长度
- # parent[i] = 前驱索引
- dp = [1] * n
- parent = [-1] * n
- for i in range(1, n):
- for j in range(i):
- # 检查j→i的pan变化是否合理
- pan_j = entries[j][1]
- pan_i = entries[i][1]
- # 计算角度差(考虑环绕)
- diff = pan_i - pan_j
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- # 检查方向
- if direction == 'decreasing':
- ok = diff <= 0 and abs(diff) <= max_step
- else:
- ok = diff >= 0 and abs(diff) <= max_step
- if ok and dp[j] + 1 > dp[i]:
- dp[i] = dp[j] + 1
- parent[i] = j
- # 找最长子序列的终点
- end = max(range(n), key=lambda i: dp[i])
- # 回溯构建子序列
- seq = []
- idx = end
- while idx >= 0:
- seq.append(idx)
- idx = parent[idx]
- seq.reverse()
- # 展开pan角度
- result = self._unwrap_sequence(entries, seq)
- if len(result) > len(best_result):
- best_result = result
- logger.info(f"LCMA过滤: {n}个点 → {len(best_result)}个点 (方向: "
- f"{'递减' if len(best_result) > 0 else '无'})")
- # 如果过滤后太少,放宽条件重试
- if len(best_result) < 3 and n >= 3:
- logger.info("LCMA结果太少,放宽步长限制重试")
- for wider_step in [90, 120, 180]:
- for direction in ['decreasing', 'increasing']:
- dp = [1] * n
- parent = [-1] * n
- for i in range(1, n):
- for j in range(i):
- diff = entries[i][1] - entries[j][1]
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- if direction == 'decreasing':
- ok = diff <= 0 and abs(diff) <= wider_step
- else:
- ok = diff >= 0 and abs(diff) <= wider_step
- if ok and dp[j] + 1 > dp[i]:
- dp[i] = dp[j] + 1
- parent[i] = j
- end = max(range(n), key=lambda i: dp[i])
- seq = []
- idx = end
- while idx >= 0:
- seq.append(idx)
- idx = parent[idx]
- seq.reverse()
- result = self._unwrap_sequence(entries, seq)
- if len(result) > len(best_result):
- best_result = result
- if len(best_result) >= 3:
- break
- if not best_result:
- # 全部过滤后为空,使用原始数据
- logger.warning("LCMA过滤后为空,使用原始数据")
- return [(x, pan % 360, w) for x, pan, w in entries]
- return best_result
- def _unwrap_sequence(
- self, entries: List[Tuple[float, float, int]],
- indices: List[int]
- ) -> List[Tuple[float, float, int]]:
- """将子序列的pan角度展开并归一化到[0, 360)"""
- result = []
- prev_unwrapped = None
- for idx in indices:
- x, pan, w = entries[idx]
- if prev_unwrapped is None:
- unwrapped = pan
- else:
- diff = pan - prev_unwrapped
- while diff > 180:
- pan -= 360
- diff = pan - prev_unwrapped
- while diff < -180:
- pan += 360
- diff = pan - prev_unwrapped
- unwrapped = pan
- prev_unwrapped = unwrapped
- result.append((x, unwrapped % 360, w))
- return result
- def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
- """将PTZ角度转换为全景坐标"""
- # 优先使用查找表的反向查找
- if self.pan_lookup and self.tilt_lookup:
- # 反向查找: pan → x_ratio
- x_ratio = self._reverse_lookup(self.pan_lookup, pan % 360)
- y_ratio = self._reverse_lookup(self.tilt_lookup, tilt)
- return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
- # 后备:线性模型逆变换
- M = np.array([
- [self.pan_scale_x, self.pan_scale_y],
- [self.tilt_scale_x, self.tilt_scale_y]
- ])
- det = np.linalg.det(M)
- if abs(det) < 1e-10:
- x_ratio = (pan - self.pan_offset) / self.pan_scale_x if abs(self.pan_scale_x) > 1e-10 else 0.5
- y_ratio = (tilt - self.tilt_offset) / self.tilt_scale_y if abs(self.tilt_scale_y) > 1e-10 else 0.5
- else:
- M_inv = np.linalg.inv(M)
- offset = np.array([pan - self.pan_offset, tilt - self.tilt_offset])
- result = M_inv @ offset
- x_ratio, y_ratio = result[0], result[1]
- return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
- def _reverse_lookup(self, lookup: List[Tuple[float, float]], value: float) -> float:
- """查找表反向查找:从value找ratio"""
- if not lookup:
- return 0.5
- # 处理pan环绕:找到最接近的段
- best_idx = 0
- best_diff = float('inf')
- for i, (ratio, v) in enumerate(lookup):
- diff = self._angular_diff(value, v)
- if abs(diff) < abs(best_diff):
- best_diff = diff
- best_idx = i
- # 精确定位到最近的两个点之间
- if best_idx == 0:
- return lookup[0][0]
- if best_idx == len(lookup) - 1:
- return lookup[-1][0]
- # 检查前一个和后一个点,选择更近的段
- prev_v = lookup[best_idx - 1][1]
- curr_v = lookup[best_idx][1]
- next_v = lookup[best_idx + 1][1] if best_idx + 1 < len(lookup) else curr_v
- # 在 (best_idx-1, best_idx) 和 (best_idx, best_idx+1) 之间选择
- if abs(self._angular_diff(value, prev_v)) < abs(self._angular_diff(value, next_v)):
- lo, hi = best_idx - 1, best_idx
- else:
- lo, hi = best_idx, best_idx + 1
- x0, v0 = lookup[lo]
- x1, v1 = lookup[hi]
- # 考虑角度环绕
- diff_v = self._angular_diff(v1, v0)
- if abs(diff_v) < 1e-10:
- return (x0 + x1) / 2
- t = self._angular_diff(value, v0) / diff_v
- t = max(0, min(1, t))
- return x0 + t * (x1 - x0)
- def is_calibrated(self) -> bool:
- return self.state == CalibrationState.SUCCESS
- def get_state(self) -> CalibrationState:
- return self.state
- def get_result(self) -> Optional[CalibrationResult]:
- return self.result
- def get_overlap_ranges(self) -> List[OverlapRange]:
- """返回发现的重叠区间"""
- return self.overlap_ranges
- def save_calibration(self, filepath: str) -> bool:
- """保存校准结果"""
- if not self.is_calibrated():
- return False
- try:
- import json
- ptz_config = _get_ptz_config()
- data = {
- 'pan_offset': self.pan_offset,
- 'pan_scale_x': self.pan_scale_x,
- 'pan_scale_y': self.pan_scale_y,
- 'tilt_offset': self.tilt_offset,
- 'tilt_scale_x': self.tilt_scale_x,
- 'tilt_scale_y': self.tilt_scale_y,
- 'rms_error': self.result.rms_error if self.result else 0,
- 'overlap_ranges': [
- {
- 'pan_start': r.pan_start,
- 'pan_end': r.pan_end,
- 'tilt_start': r.tilt_start,
- 'tilt_end': r.tilt_end,
- 'match_count': r.match_count
- }
- for r in self.overlap_ranges
- ],
- # 分段线性查找表
- 'pan_lookup': self.pan_lookup,
- 'tilt_lookup': self.tilt_lookup,
- # 保存安装方向配置
- 'mount_type': ptz_config.get('mount_type', 'wall'),
- 'tilt_flip': ptz_config.get('tilt_flip', False),
- 'pan_flip': ptz_config.get('pan_flip', False),
- }
- with open(filepath, 'w') as f:
- json.dump(data, f, indent=2)
- logger.info(f"校准结果已保存: {filepath}")
- return True
- except Exception as e:
- logger.error(f"保存校准结果失败: {e}")
- return False
- def load_calibration(self, filepath: str) -> bool:
- """加载校准结果"""
- try:
- import json
- with open(filepath, 'r') as f:
- data = json.load(f)
- self.pan_offset = data['pan_offset']
- self.pan_scale_x = data['pan_scale_x']
- self.pan_scale_y = data['pan_scale_y']
- self.tilt_offset = data['tilt_offset']
- self.tilt_scale_x = data['tilt_scale_x']
- self.tilt_scale_y = data['tilt_scale_y']
- # 加载分段线性查找表
- self.pan_lookup = [tuple(p) for p in data.get('pan_lookup', [])]
- self.tilt_lookup = [tuple(t) for t in data.get('tilt_lookup', [])]
- # 加载重叠区间(如果有)
- if 'overlap_ranges' in data:
- self.overlap_ranges = [
- OverlapRange(
- pan_start=r['pan_start'],
- pan_end=r['pan_end'],
- tilt_start=r['tilt_start'],
- tilt_end=r['tilt_end'],
- match_count=r['match_count'],
- panorama_center_x=0,
- panorama_center_y=0
- )
- for r in data['overlap_ranges']
- ]
- self.state = CalibrationState.SUCCESS
- self.result = CalibrationResult(
- success=True,
- points=[],
- rms_error=data.get('rms_error', 0)
- )
-
- # 检查安装方向配置是否匹配
- ptz_config = _get_ptz_config()
- current_mount = ptz_config.get('mount_type', 'wall')
- saved_mount = data.get('mount_type', 'wall')
- if current_mount != saved_mount:
- logger.warning(f"当前安装类型({current_mount})与校准时的({saved_mount})不同,建议重新校准!")
- logger.info(f"校准结果已加载: {filepath}")
- return True
- except FileNotFoundError:
- logger.warning(f"校准文件不存在: {filepath}")
- return False
- except Exception as e:
- logger.error(f"加载校准结果失败: {e}")
- return False
- class CalibrationManager:
- """校准管理器"""
- def __init__(self, calibrator: CameraCalibrator, calibration_file: str = None):
- self.calibrator = calibrator
- # 优先使用传入的路径,否则从配置读取,最后使用默认值
- if calibration_file:
- self.calibration_file = calibration_file
- else:
- try:
- from config import CALIBRATION_CONFIG
- self.calibration_file = CALIBRATION_CONFIG.get(
- 'calibration_file', 'calibration.json'
- )
- except ImportError:
- self.calibration_file = 'calibration.json'
- def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
- """
- 自动校准
-
- Args:
- force: 是否强制重新校准(不加载已有数据)
- fallback_on_failure: 校准失败时是否回退使用已有数据
-
- Returns:
- 校准结果
- """
- # 检查是否启用加载上次校准数据
- load_on_startup = True # 默认启用
- try:
- from config import CALIBRATION_CONFIG
- load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
- except:
- pass
-
- # 如果不是强制校准,尝试加载已有数据
- if not force and load_on_startup:
- if self.calibrator.load_calibration(self.calibration_file):
- logger.info("使用已有校准结果")
- return self.calibrator.get_result()
- # 执行新校准
- if force:
- logger.info("强制重新校准(不使用已有数据)...")
- elif not load_on_startup:
- logger.info("已禁用加载校准数据,开始新校准...")
- else:
- logger.info("开始自动校准...")
-
- result = self.calibrator.calibrate(quick_mode=True)
- if result.success:
- self.calibrator.save_calibration(self.calibration_file)
- elif fallback_on_failure:
- # 校准失败,尝试回退使用已有数据
- logger.warning("校准失败,尝试回退使用已有校准数据...")
- if self.calibrator.load_calibration(self.calibration_file):
- logger.info("已回退到已有校准数据")
- result = self.calibrator.get_result()
- return result
- def check_calibration(self) -> Tuple[bool, str]:
- """检查校准状态"""
- state = self.calibrator.get_state()
- if state == CalibrationState.SUCCESS:
- result = self.calibrator.get_result()
- overlaps = self.calibrator.get_overlap_ranges()
- overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
- return (True, f"校准有效, RMS误差: {result.rms_error:.4f}°{overlap_info}")
- elif state == CalibrationState.FAILED:
- return (False, "校准失败")
- elif state == CalibrationState.RUNNING:
- return (False, "校准进行中")
- else:
- return (False, "未校准")
|