| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714 |
- """
- 相机校准模块
- 实现全景相机与球机的自动校准
- 建立画面坐标到PTZ角度的映射关系
- 核心改进:先发现视野重叠区域,再在重叠区内校准,
- 避免球机指向与全景画面无重叠的方向导致校准失败。
- """
- import time
- import math
- import threading
- import logging
- import numpy as np
- import cv2
- from typing import List, Tuple, Dict, Optional, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- from ptz_camera import PTZCamera
- logger = logging.getLogger(__name__)
- # 加载PTZ配置
- def _get_ptz_config():
- try:
- from config import PTZ_CONFIG
- return PTZ_CONFIG
- except ImportError:
- return {
- 'mount_type': 'wall',
- 'tilt_flip': False,
- 'pan_flip': False
- }
- class CalibrationState(Enum):
- IDLE = 0
- RUNNING = 1
- SUCCESS = 2
- FAILED = 3
- @dataclass
- class CalibrationPoint:
- pan: float
- tilt: float
- zoom: float = 1.0
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- detected: bool = False
- match_count: int = 0
- @dataclass
- class CalibrationResult:
- success: bool
- points: List[CalibrationPoint]
- transform_matrix: Optional[np.ndarray] = None
- error_message: str = ""
- rms_error: float = 0.0
- @dataclass
- class OverlapRange:
- pan_start: float
- pan_end: float
- tilt_start: float
- tilt_end: float
- match_count: int
- panorama_center_x: float
- panorama_center_y: float
- MIN_MATCH_THRESHOLD = 10 # 最少匹配点数
- LOWE_RATIO = 0.70 # Lowe's ratio test 阈值,越小越严格
- RANSAC_THRESHOLD = 4.0 # RANSAC 单应矩阵内点阈值(像素)
- MIN_INLIERS = 5 # 最少几何内点数
- class OverlapDiscovery:
- """
- 视野重叠发现器
- 扫描球机视野范围,找出与全景画面有视觉重叠的角度区间
- """
- def __init__(self, feature_type: str = 'SIFT'):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- norm_type = cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- self.matcher = cv2.BFMatcher(norm_type)
- def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
- ) -> Tuple[bool, int, float, float]:
- """
- 特征匹配球机画面与全景画面
- Returns: (是否匹配成功, 匹配点数, 全景画面中心x, 全景画面中心y)
- """
- if ptz_frame is None or panorama_frame is None:
- return (False, 0, 0.0, 0.0)
- try:
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
- # 缩小图像加速特征提取(匹配坐标按比例还原)
- ptz_scale = 1.0
- pan_scale = 1.0
- max_dim = 960
- if ptz_gray.shape[1] > max_dim:
- ptz_scale = max_dim / ptz_gray.shape[1]
- ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
- interpolation=cv2.INTER_AREA)
- if pan_gray.shape[1] > max_dim:
- pan_scale = max_dim / pan_gray.shape[1]
- pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
- interpolation=cv2.INTER_AREA)
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return (False, 0, 0.0, 0.0)
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < LOWE_RATIO * n.distance:
- good_matches.append(m)
- if len(good_matches) < MIN_MATCH_THRESHOLD:
- return (False, len(good_matches), 0.0, 0.0)
- # 几何验证:用 RANSAC 单应矩阵剔除空间不一致的匹配
- ptz_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches])
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- try:
- _, mask = cv2.findHomography(ptz_pts, pan_pts, cv2.RANSAC, RANSAC_THRESHOLD)
- inlier_mask = mask.ravel().astype(bool)
- inlier_count = int(np.sum(inlier_mask))
- except Exception:
- inlier_count = 0
- inlier_mask = np.zeros(len(good_matches), dtype=bool)
- if inlier_count < MIN_INLIERS:
- logger.debug(f"几何验证失败: {inlier_count}/{len(good_matches)} 个内点")
- return (False, inlier_count, 0.0, 0.0)
- # 只使用几何一致的内点计算中心
- inlier_pan_pts = pan_pts[inlier_mask]
- center_x = np.mean(inlier_pan_pts[:, 0]) / pan_scale
- center_y = np.mean(inlier_pan_pts[:, 1]) / pan_scale
- logger.debug(f"特征匹配: 原始={len(good_matches)}, 内点={inlier_count}, "
- f"中心=({center_x:.1f}, {center_y:.1f})")
- return (True, inlier_count, center_x, center_y)
- except Exception as e:
- logger.error(f"特征匹配异常: {e}")
- return (False, 0, 0.0, 0.0)
- def discover_overlap_ranges(
- self,
- ptz: PTZCamera,
- get_panorama_frame: Callable[[], np.ndarray],
- ptz_capture: Callable[[], Optional[np.ndarray]],
- pan_range: Tuple[float, float] = (0, 360),
- tilt_range: Tuple[float, float] = (-20, 40),
- pan_step: float = 20,
- tilt_step: float = 15,
- stabilize_time: float = 2.0,
- on_progress: Callable[[int, int, str], None] = None,
- max_ranges: int = 3,
- min_positions_per_range: int = 3
- ) -> List[OverlapRange]:
- """
- 扫描球机视野范围,发现与全景画面有重叠的角度区间
- 1. 先拍一张全景参考帧
- 2. 逐步移动球机到各个角度
- 3. 在每个位置抓拍球机画面,与全景做特征匹配
- 4. 记录有足够匹配点的角度
- 5. 合并相邻的有重叠的角度形成区间
- """
- logger.info(f"阶段1: 视野重叠发现, 扫描范围: pan={pan_range}, tilt={tilt_range}, 步进: pan={pan_step}°, tilt={tilt_step}°")
- # 1. 拍全景参考帧
- logger.info("获取全景参考帧...")
- ref_frames = []
- for _ in range(3):
- frame = get_panorama_frame()
- if frame is not None:
- ref_frames.append(frame)
- time.sleep(0.1)
- if not ref_frames:
- logger.error("无法获取全景参考帧!")
- return []
- panorama_ref = ref_frames[0]
- logger.info(f"全景参考帧: {panorama_ref.shape}")
- # 2. 扫描各个角度
- scan_results: List[Tuple[float, float, int, float, float]] = []
- pan_values = np.arange(pan_range[0], pan_range[1] + pan_step, pan_step)
- tilt_values = np.arange(tilt_range[0], tilt_range[1] + tilt_step, tilt_step)
- total_positions = len(pan_values) * len(tilt_values)
- current_idx = 0
- for pan in pan_values:
- for tilt in tilt_values:
- current_idx += 1
- pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
- if on_progress:
- on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
- logger.info(f"[{current_idx}/{total_positions}] {pos_desc}")
- # 移动球机
- if not ptz.goto_exact_position(float(pan), float(tilt), 1):
- logger.warning(f"移动球机失败, 跳过")
- continue
- time.sleep(stabilize_time)
- # 抓拍球机画面
- ptz_frame = ptz_capture() if ptz_capture else None
- if ptz_frame is None:
- logger.warning(f"球机抓拍失败, 跳过")
- continue
- # 获取当前全景帧并匹配
- cur_panorama = get_panorama_frame()
- if cur_panorama is None:
- continue
- success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
- if success:
- h, w = cur_panorama.shape[:2]
- x_ratio = cx / w
- y_ratio = cy / h
- logger.info(f"匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
- scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))
- else:
- logger.debug(f"匹配不足: {match_count}个特征点")
- if not scan_results:
- logger.warning("未发现任何视野重叠位置!")
- return []
- logger.info(f"发现 {len(scan_results)} 个有重叠的扫描位置")
- # 保存原始扫描结果供后续校准使用
- self.scan_results = scan_results
- # 3. 合并相邻位置为重叠区间
- overlap_ranges = self._merge_scan_results(
- scan_results,
- max_ranges=max_ranges,
- min_positions=min_positions_per_range
- )
- for i, r in enumerate(overlap_ranges):
- logger.info(f"重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
- f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
- f"匹配点={r.match_count}")
- return overlap_ranges
- def _merge_scan_results(
- self,
- results: List[Tuple[float, float, int, float, float]],
- pan_tolerance: float = 20,
- tilt_tolerance: float = 35,
- max_ranges: int = 3,
- min_positions: int = 2
- ) -> List[OverlapRange]:
- """
- 使用union-find连通分量聚类合并相邻扫描结果
- 只保留最大的 max_ranges 个区间
- """
- if not results:
- return []
- n = len(results)
- # union-find
- parent = list(range(n))
- def find(x):
- while parent[x] != x:
- parent[x] = parent[parent[x]]
- x = parent[x]
- return x
- def union(a, b):
- ra, rb = find(a), find(b)
- if ra != rb:
- parent[ra] = rb
- # 判断两点是否相邻
- for i in range(n):
- for j in range(i + 1, n):
- pi, ti = results[i][0], results[i][1]
- pj, tj = results[j][0], results[j][1]
- if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
- union(i, j)
- # 按连通分量分组
- groups: Dict[int, List[int]] = {}
- for i in range(n):
- root = find(i)
- if root not in groups:
- groups[root] = []
- groups[root].append(i)
- # 转换为OverlapRange,过滤太小的组
- ranges = []
- for indices in groups.values():
- if len(indices) < min_positions:
- continue
- group_data = [results[i] for i in indices]
- ranges.append(self._group_to_range(group_data))
- # 按match_count降序排序,只保留最大的 max_ranges 个
- ranges.sort(key=lambda r: r.match_count, reverse=True)
- ranges = ranges[:max_ranges]
- # 按pan_start排序输出
- ranges.sort(key=lambda r: r.pan_start)
- return ranges
- def _group_to_range(self, group: List[Tuple[float, float, int, float, float]]) -> OverlapRange:
- """将一组扫描结果转换为一个OverlapRange"""
- pans = [r[0] for r in group]
- tilts = [r[1] for r in group]
- match_counts = [r[2] for r in group]
- x_ratios = [r[3] for r in group]
- y_ratios = [r[4] for r in group]
- step = 5 # 在边缘各扩展5度
- return OverlapRange(
- pan_start=min(pans) - step,
- pan_end=max(pans) + step,
- tilt_start=min(tilts) - step,
- tilt_end=max(tilts) + step,
- match_count=max(match_counts),
- panorama_center_x=float(np.mean(x_ratios)),
- panorama_center_y=float(np.mean(y_ratios))
- )
- class VisualCalibrationDetector:
- """
- 视觉校准检测器
- 通过运动检测和特征匹配定位球机在全景画面中的位置
- """
- def __init__(self):
- try:
- self.feature_detector = cv2.SIFT_create()
- self.feature_type = 'SIFT'
- except AttributeError:
- self.feature_detector = cv2.ORB_create(nfeatures=500)
- self.feature_type = 'ORB'
- self.matcher = cv2.BFMatcher(
- cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
- )
- self.use_motion_detection = True
- self.use_feature_matching = True
- def detect_by_motion(self, frames_before: np.ndarray,
- frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过运动检测定位球机指向位置"""
- if frames_before is None or frames_after is None:
- return None
- before_gray = cv2.cvtColor(frames_before, cv2.COLOR_BGR2GRAY) \
- if len(frames_before.shape) == 3 else frames_before
- after_gray = cv2.cvtColor(frames_after, cv2.COLOR_BGR2GRAY) \
- if len(frames_after.shape) == 3 else frames_after
- diff = cv2.absdiff(before_gray, after_gray)
- _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
- thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
- contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
- if not contours:
- return None
- max_contour = max(contours, key=cv2.contourArea)
- area = cv2.contourArea(max_contour)
- if area < 500:
- return None
- M = cv2.moments(max_contour)
- if M["m00"] == 0:
- return None
- cx = M["m10"] / M["m00"]
- cy = M["m01"] / M["m00"]
- h, w = before_gray.shape
- logger.debug(f"运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f})")
- return (cx / w, cy / h)
- def detect_by_feature_match(self, panorama_frame: np.ndarray,
- ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
- """通过特征匹配定位"""
- if panorama_frame is None or ptz_frame is None:
- return None
- try:
- pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) \
- if len(panorama_frame.shape) == 3 else panorama_frame
- ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) \
- if len(ptz_frame.shape) == 3 else ptz_frame
- # 缩小图像加速
- max_dim = 960
- ptz_scale = 1.0
- pan_scale = 1.0
- if ptz_gray.shape[1] > max_dim:
- ptz_scale = max_dim / ptz_gray.shape[1]
- ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
- interpolation=cv2.INTER_AREA)
- if pan_gray.shape[1] > max_dim:
- pan_scale = max_dim / pan_gray.shape[1]
- pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
- interpolation=cv2.INTER_AREA)
- kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
- kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
- if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
- return None
- matches = self.matcher.knnMatch(des1, des2, k=2)
- good_matches = []
- for match_pair in matches:
- if len(match_pair) == 2:
- m, n = match_pair
- if m.distance < LOWE_RATIO * n.distance:
- good_matches.append(m)
- if len(good_matches) < MIN_MATCH_THRESHOLD:
- return None
- # 几何验证:用 RANSAC 单应矩阵剔除空间不一致的匹配
- ptz_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches])
- pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
- try:
- _, mask = cv2.findHomography(ptz_pts, pan_pts, cv2.RANSAC, RANSAC_THRESHOLD)
- inlier_mask = mask.ravel().astype(bool)
- inlier_count = int(np.sum(inlier_mask))
- except Exception:
- return None
- if inlier_count < MIN_INLIERS:
- return None
- inlier_pan_pts = pan_pts[inlier_mask]
- center_x = np.mean(inlier_pan_pts[:, 0])
- center_y = np.mean(inlier_pan_pts[:, 1])
- h, w = pan_gray.shape
- logger.debug(f"特征匹配: 匹配点={len(good_matches)}, 内点={inlier_count}, "
- f"中心=({center_x:.1f}, {center_y:.1f})")
- return (center_x / w, center_y / h)
- except Exception as e:
- logger.error(f"特征匹配错误: {e}")
- return None
- def detect_position(self, panorama_frame: np.ndarray,
- frames_before: np.ndarray = None,
- frames_after: np.ndarray = None,
- ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
- """综合检测球机在全景画面中的位置"""
- results = []
- if self.use_motion_detection and frames_before is not None and frames_after is not None:
- motion_result = self.detect_by_motion(frames_before, frames_after)
- if motion_result:
- results.append(('motion', motion_result, 0.4))
- if self.use_feature_matching and ptz_frame is not None:
- feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
- if feature_result:
- results.append(('feature', feature_result, 0.6))
- if not results:
- return (False, 0.0, 0.0)
- total_weight = sum(r[2] for r in results)
- x_ratio = sum(r[1][0] * r[2] for r in results) / total_weight
- y_ratio = sum(r[1][1] * r[2] for r in results) / total_weight
- logger.debug(f"融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
- return (True, x_ratio, y_ratio)
- class CameraCalibrator:
- """
- 相机校准器
- 两阶段校准:先发现视野重叠区域,再在重叠区内校准
- """
- def __init__(self, ptz_camera: PTZCamera,
- get_frame_func: Callable[[], np.ndarray],
- detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
- ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
- self.ptz = ptz_camera
- self.get_frame = get_frame_func
- self.detect_marker = detect_marker_func
- self.ptz_capture = ptz_capture_func
- self.visual_detector = VisualCalibrationDetector()
- self.overlap_discovery = OverlapDiscovery()
- self.state = CalibrationState.IDLE
- self.result: Optional[CalibrationResult] = None
- # 变换参数 (线性模型 - 作为后备)
- self.pan_offset = 0.0
- self.pan_scale_x = 1.0
- self.pan_scale_y = 0.0
- self.tilt_offset = 0.0
- self.tilt_scale_x = 0.0
- self.tilt_scale_y = 1.0
- # 分段线性查找表 (主变换方法)
- # 存储 x_ratio → pan 和 y_ratio → tilt 的映射
- self.pan_lookup: List[Tuple[float, float]] = [] # [(x_ratio, pan), ...] sorted by x_ratio
- self.tilt_lookup: List[Tuple[float, float]] = [] # [(y_ratio, tilt), ...] sorted by y_ratio
- # tilt偏移补偿(度),正值=向下补偿,优先从传入的球机配置读取
- ptz_config = getattr(ptz_camera, 'ptz_config', None)
- if ptz_config is None:
- from config import PTZ_CONFIG
- ptz_config = PTZ_CONFIG
- self.tilt_offset_deg = ptz_config.get('tilt_offset', 0)
- self.pan_offset_deg = ptz_config.get('pan_offset', 0)
- self.pan_edge_offset = ptz_config.get('pan_edge_offset', 0)
- self.pan_curve_power = ptz_config.get('pan_curve_power', 1.0)
- # tilt线性映射(替代不稳定的查找表)
- self.tilt_linear_enabled = ptz_config.get('tilt_linear_enabled', False)
- self.tilt_y0 = ptz_config.get('tilt_y0', 0)
- self.tilt_y1 = ptz_config.get('tilt_y1', 45)
- self.tilt_curve_power = ptz_config.get('tilt_curve_power', 1.0)
- # 校准配置
- self.stabilize_time = 1.0
- self.use_motion_detection = True
- self.use_feature_matching = True
- # 重叠发现配置(可从 ptz_config 覆盖,避免在无效方向浪费扫描时间)
- self.overlap_pan_range = ptz_config.get('overlap_pan_range', (0, 360))
- self.overlap_tilt_range = ptz_config.get('overlap_tilt_range', (-20, 50))
- self.overlap_pan_step = ptz_config.get('overlap_pan_step', 20)
- self.overlap_tilt_step = ptz_config.get('overlap_tilt_step', 15)
- self.max_overlap_ranges = 3
- self.min_positions_per_range = 3
- # 回调
- self.on_progress: Optional[Callable[[int, int, str], None]] = None
- self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
- # 发现的重叠区间
- self.overlap_ranges: List[OverlapRange] = []
- def _angular_diff(self, a: float, b: float) -> float:
- """计算两个角度之间的最小差值,考虑360°环绕"""
- diff = a - b
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- return diff
- def _unwrap_pan_angles(self, pan_values: np.ndarray) -> np.ndarray:
- """
- 将pan角度展开为连续值,避免0°/360°边界的不连续性
- 使用中位数作为参考点,将所有角度调整到参考点的±180°范围内。
- 这样即使校准点跨越0°/360°边界,也能正确拟合线性变换。
- 例如: [350, 355, 5, 10] → [-10, -5, 5, 10] (ref=5)
- """
- if len(pan_values) == 0:
- return pan_values
- ref = float(np.median(pan_values))
- unwrapped = pan_values.astype(float).copy()
- for i in range(len(unwrapped)):
- diff = unwrapped[i] - ref
- while diff > 180:
- unwrapped[i] -= 360
- diff = unwrapped[i] - ref
- while diff < -180:
- unwrapped[i] += 360
- diff = unwrapped[i] - ref
- return unwrapped
- def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
- """
- 执行校准 - 两阶段流程
-
- 阶段1: 视野重叠发现 - 扫描球机范围,找出与全景有重叠的角度区间
- 阶段2: 精确校准 - 仅在重叠区间内生成校准点,逐一验证后拟合变换
- """
- self.state = CalibrationState.RUNNING
- # ===================== 阶段1: 视野重叠发现 =====================
- logger.info("阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
- self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
- ptz=self.ptz,
- get_panorama_frame=self.get_frame,
- ptz_capture=self.ptz_capture,
- pan_range=self.overlap_pan_range,
- tilt_range=self.overlap_tilt_range,
- pan_step=self.overlap_pan_step,
- tilt_step=self.overlap_tilt_step,
- stabilize_time=self.stabilize_time,
- on_progress=self.on_progress,
- max_ranges=self.max_overlap_ranges,
- min_positions_per_range=self.min_positions_per_range
- )
- if not self.overlap_ranges:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=[],
- error_message="未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- logger.info(f"发现 {len(self.overlap_ranges)} 个重叠区间")
- # 保留所有重叠区间用于校准(覆盖更广的视野范围)
- logger.info(f"使用全部 {len(self.overlap_ranges)} 个重叠区间进行校准(覆盖更广视野)")
- for i, r in enumerate(self.overlap_ranges):
- logger.info(f" 区间{i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
- f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], 匹配点={r.match_count}")
- # ===================== 阶段2: 使用阶段1扫描数据 + 补充校准 =====================
- # 阶段1已对整个视野扫描并记录了(pan, tilt) → (x_ratio, y_ratio)对应关系
- # 直接使用这些数据比阶段2重新在单个区间内采集更全面、更高效
- valid_points = []
- # 直接从阶段1扫描结果构建校准点
- scan_results = getattr(self.overlap_discovery, 'scan_results', [])
- if scan_results:
- logger.info(f"使用阶段1扫描数据: {len(scan_results)}个有效匹配位置")
- for pan, tilt, match_count, x_ratio, y_ratio in scan_results:
- valid_points.append(CalibrationPoint(
- pan=pan, tilt=tilt, zoom=1.0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- detected=True, match_count=match_count
- ))
- else:
- logger.warning("阶段1无扫描数据,回退到阶段2逐点校准")
- # 如果扫描数据不足,补充在重叠区内采集更多点
- min_scan_points = 8
- if len(valid_points) < min_scan_points:
- logger.info(f"扫描数据不足({len(valid_points)}<{min_scan_points}),在重叠区间内补充采集")
- supplement_points = self._generate_points_in_overlaps(quick_mode)
- total_supplement = len(supplement_points)
- supplement_valid = 0
- for idx, point in enumerate(supplement_points):
- if self.on_progress:
- self.on_progress(idx + 1, total_supplement,
- f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- logger.info(f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
- # 获取移动前全景帧
- frames_before_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_before_list.append(frame)
- time.sleep(0.1)
- if not frames_before_list:
- continue
- frames_before = np.mean(frames_before_list, axis=0).astype(np.uint8)
- # 移动球机
- if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
- continue
- time.sleep(self.stabilize_time)
- # 获取移动后帧
- frames_after_list = []
- for _ in range(3):
- frame = self.get_frame()
- if frame is not None:
- frames_after_list.append(frame)
- time.sleep(0.1)
- if not frames_after_list:
- continue
- panorama_frame = np.mean(frames_after_list, axis=0).astype(np.uint8)
- # 球机抓拍
- ptz_frame = None
- if self.ptz_capture:
- try:
- ptz_frame = self.ptz_capture()
- except Exception:
- pass
- # 特征匹配验证
- if ptz_frame is not None and panorama_frame is not None:
- success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
- if success:
- h, w = panorama_frame.shape[:2]
- point.x_ratio = cx / w
- point.y_ratio = cy / h
- point.detected = True
- valid_points.append(point)
- supplement_valid += 1
- logger.info(f"补充点验证通过: {match_count}个匹配点, "
- f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- continue
- # 运动检测备选
- if self.use_motion_detection and frames_before is not None and panorama_frame is not None:
- motion_result = self.visual_detector.detect_by_motion(frames_before, panorama_frame)
- if motion_result:
- point.x_ratio, point.y_ratio = motion_result
- point.detected = True
- valid_points.append(point)
- supplement_valid += 1
- logger.info(f"运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
- logger.info(f"补充采集: {supplement_valid}/{total_supplement} 个点验证通过")
- # ===================== 检查有效校准点 =====================
- min_valid = 4
- if len(valid_points) < min_valid:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message=f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
- f"请检查球机与全景的视野重叠是否足够。"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- # ===================== 计算变换参数 =====================
- success = self._calculate_transform(valid_points)
- if success:
- # 构建分段线性查找表(主变换方法,处理pan环绕)
- lookup_ok = self._build_lookup_tables(valid_points)
- self.state = CalibrationState.SUCCESS
- rms_error = self._calculate_rms_error(valid_points)
- self.result = CalibrationResult(
- success=True,
- points=valid_points,
- rms_error=rms_error
- )
- logger.info(f"校准成功! 有效校准点: {len(valid_points)}, "
- f"重叠区间数: {len(self.overlap_ranges)}, RMS误差: {rms_error:.4f}°")
- # 校准验证:将球机移到全景画面中心,检查是否指向正确位置
- verify_ok = self._verify_calibration()
- if not verify_ok:
- logger.warning("校准验证未通过,校准结果可能不准确")
- # 自动保存校准结果
- try:
- from config import CALIBRATION_CONFIG
- if CALIBRATION_CONFIG.get('auto_save', True):
- filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
- self.save_calibration(filepath)
- except Exception:
- pass
- # 校准完成后,将球机复位到初始位置
- self._reset_ptz_position()
- else:
- self.state = CalibrationState.FAILED
- self.result = CalibrationResult(
- success=False,
- points=valid_points,
- error_message="变换参数计算失败"
- )
- logger.error(f"校准失败: {self.result.error_message}")
- if self.on_complete:
- self.on_complete(self.result)
- return self.result
- def _reset_ptz_position(self):
- """校准完成后将球机复位到初始位置"""
- if self.ptz is None:
- return
- try:
- # 获取默认位置配置,优先从传入的球机配置读取
- ptz_config = getattr(self.ptz, 'ptz_config', None)
- if ptz_config is None:
- from config import PTZ_CONFIG
- ptz_config = PTZ_CONFIG
- default_pan = ptz_config.get('default_pan', 0)
- default_tilt = ptz_config.get('default_tilt', 0)
- default_zoom = ptz_config.get('default_zoom', 1)
- logger.info(f"球机复位到位置: pan={default_pan}, tilt={default_tilt}, zoom={default_zoom}")
- self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
- time.sleep(0.5)
- except Exception as e:
- logger.warning(f"球机复位失败: {e}")
- def _verify_calibration(self) -> bool:
- """
- 校准验证:将球机移到全景画面中心对应的PTZ角度,
- 通过特征匹配验证球机是否指向了全景画面中心区域。
- 同时验证全景画面中的多个关键位置(左、中、右),
- 确保变换在整个视野范围内基本正确。
- Returns:
- 验证是否通过
- """
- logger.info("=" * 50)
- logger.info("校准验证: 将球机移到全景画面中心位置")
- logger.info("=" * 50)
- if self.ptz is None or self.get_frame is None:
- logger.warning("无法执行校准验证: PTZ或全景帧获取函数不可用")
- return False
- # 验证位置列表:全景画面中的关键位置
- verify_positions = [
- ("全景中心", 0.5, 0.5),
- ("全景左侧", 0.25, 0.5),
- ("全景右侧", 0.75, 0.5),
- ]
- passed = 0
- total = len(verify_positions)
- for name, x_ratio, y_ratio in verify_positions:
- pan, tilt = self.transform(x_ratio, y_ratio)
- logger.info(f"验证 {name} ({x_ratio:.2f}, {y_ratio:.2f}) → "
- f"PTZ角度: pan={pan:.1f}°, tilt={tilt:.1f}°")
- # 检查角度是否在合理范围
- if pan < -10 or pan > 370 or tilt < -95 or tilt > 95:
- logger.warning(f" 变换结果异常: pan={pan:.1f}°, tilt={tilt:.1f}° 超出合理范围")
- continue
- # 移动球机到计算出的位置
- if not self.ptz.goto_exact_position(pan, tilt, 1):
- logger.warning(f" 移动球机失败")
- continue
- time.sleep(self.stabilize_time)
- # 获取全景帧和球机帧
- panorama_frame = self.get_frame()
- ptz_frame = self.ptz_capture() if self.ptz_capture else None
- if panorama_frame is None or ptz_frame is None:
- logger.warning(f" 获取帧失败: 全景={'OK' if panorama_frame is not None else '失败'}, "
- f"球机={'OK' if ptz_frame is not None else '失败'}")
- continue
- # 特征匹配验证
- success, match_count, cx, cy = self.overlap_discovery.match_frames(
- ptz_frame, panorama_frame
- )
- h, w = panorama_frame.shape[:2]
- match_x_ratio = cx / w
- match_y_ratio = cy / h
- # 计算期望位置与实际匹配位置的偏差
- position_error = math.sqrt(
- (match_x_ratio - x_ratio) ** 2 + (match_y_ratio - y_ratio) ** 2
- )
- if success:
- logger.info(f" 匹配成功: {match_count}个特征点, "
- f"匹配位置=({match_x_ratio:.3f}, {match_y_ratio:.3f}), "
- f"期望位置=({x_ratio:.3f}, {y_ratio:.3f}), "
- f"位置偏差={position_error:.3f}")
- if position_error < 0.15:
- passed += 1
- logger.info(f" 验证通过 (偏差 < 15%)")
- else:
- logger.warning(f" 验证偏差较大 ({position_error:.1%}),校准精度可能不足")
- else:
- logger.warning(f" 特征匹配不足({match_count}点), "
- f"球机可能未指向全景画面中期望的位置")
- logger.info(f"校准验证结果: {passed}/{total} 个位置验证通过")
- if passed == 0:
- logger.error("所有验证位置均未通过,校准结果可能完全错误!请检查:")
- logger.error(" 1. 球机安装方向配置是否正确 (mount_type, pan_flip, tilt_flip)")
- logger.error(" 2. 两台摄像头的相对位置是否合理")
- logger.error(" 3. 球机PTZ角度范围是否配置正确")
- return False
- if passed < total:
- logger.warning(f"部分验证未通过,校准精度可能有限")
- return True
- return True
- def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
- """
- 在发现的重叠区间内生成校准点
- 只在球机和全景有视觉重叠的区域生成点
- """
- points = []
- if quick_mode:
- # 快速模式: 每个重叠区间内生成9个点(3x3网格)
- for overlap in self.overlap_ranges:
- # 在区间中心生成点
- pan_center = (overlap.pan_start + overlap.pan_end) / 2
- tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
- pan_span = overlap.pan_end - overlap.pan_start
- tilt_span = overlap.tilt_end - overlap.tilt_start
- # 3x3网格分布
- pan_positions = [0.25, 0.5, 0.75] if pan_span > 10 else [0.5]
- tilt_positions = [0.25, 0.5, 0.75] if tilt_span > 10 else [0.5]
- for pf in pan_positions:
- for tf in tilt_positions:
- points.append(CalibrationPoint(
- pan=overlap.pan_start + pan_span * pf,
- tilt=overlap.tilt_start + tilt_span * tf,
- zoom=1.0))
- else:
- # 完整模式: 在每个重叠区间内均匀分布
- grid_size = 5
- for overlap in self.overlap_ranges:
- for i in range(grid_size):
- for j in range(grid_size):
- pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
- tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
- points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
- return points
- def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
- """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
- try:
- if len(points) < 4:
- logger.error(f"计算变换参数错误: 有效点不足({len(points)}个)")
- return False
- pan_values = np.array([p.pan for p in points])
- tilt_values = np.array([p.tilt for p in points])
- x_ratios = np.array([p.x_ratio for p in points])
- y_ratios = np.array([p.y_ratio for p in points])
- # 记录原始校准数据便于调试
- logger.info("校准点原始数据:")
- for i, p in enumerate(points):
- logger.info(f" 点{i+1}: pan={p.pan:.1f}°, tilt={p.tilt:.1f}° → "
- f"全景位置=({p.x_ratio:.3f}, {p.y_ratio:.3f})")
- # 展开pan角度避免0°/360°边界不连续性
- pan_unwrapped = self._unwrap_pan_angles(pan_values)
- if not np.allclose(pan_values, pan_unwrapped, atol=0.1):
- logger.info(f"Pan角度展开: 原始={pan_values.tolist()} → 展开后={pan_unwrapped.tolist()}")
- else:
- logger.info("Pan角度无需展开(无0°/360°边界跨越)")
- # RANSAC剔除异常值 (使用展开后的pan)
- inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_unwrapped, tilt_values)
- inlier_count = np.sum(inlier_mask)
- if inlier_count < 4:
- logger.warning(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
- inlier_mask = np.ones(len(points), dtype=bool)
- inlier_count = np.sum(inlier_mask)
- else:
- logger.info(f"RANSAC: {len(points)}个点中{inlier_count}个内点,"
- f"剔除{len(points) - inlier_count}个异常值")
- # 记录内点数据
- logger.info("RANSAC内点数据:")
- for i, p in enumerate(points):
- if inlier_mask[i]:
- logger.info(f" 点{i+1}: pan={pan_unwrapped[i]:.1f}°(原始={p.pan:.1f}°), "
- f"tilt={p.tilt:.1f}° → ({p.x_ratio:.3f}, {p.y_ratio:.3f})")
- # 用内点拟合完整模型
- A = np.ones((inlier_count, 3))
- A[:, 1] = x_ratios[inlier_mask]
- A[:, 2] = y_ratios[inlier_mask]
- pan_params, _, _, _ = np.linalg.lstsq(A, pan_unwrapped[inlier_mask], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
- self.pan_offset = pan_params[0]
- self.pan_scale_x = pan_params[1]
- self.pan_scale_y = pan_params[2]
- self.tilt_offset = tilt_params[0]
- self.tilt_scale_x = tilt_params[1]
- self.tilt_scale_y = tilt_params[2]
- # 系数合理性检查
- pan_coeffs_ok = (abs(self.pan_scale_x) < 500 and abs(self.pan_scale_y) < 500)
- tilt_coeffs_ok = (abs(self.tilt_scale_x) < 300 and abs(self.tilt_scale_y) < 300)
- if not (pan_coeffs_ok and tilt_coeffs_ok):
- logger.warning(f"完整模型系数异常: pan_scale_x={self.pan_scale_x:.1f}, "
- f"pan_scale_y={self.pan_scale_y:.1f}, "
- f"tilt_scale_x={self.tilt_scale_x:.1f}, "
- f"tilt_scale_y={self.tilt_scale_y:.1f}")
- logger.info("尝试简化模型: pan仅依赖x, tilt仅依赖y")
- # 简化模型: pan = offset + scale_x * x
- # tilt = offset + scale_y * y
- A_pan = np.ones((inlier_count, 2))
- A_pan[:, 1] = x_ratios[inlier_mask]
- pan_params_s, _, _, _ = np.linalg.lstsq(A_pan, pan_unwrapped[inlier_mask], rcond=None)
- A_tilt = np.ones((inlier_count, 2))
- A_tilt[:, 1] = y_ratios[inlier_mask]
- tilt_params_s, _, _, _ = np.linalg.lstsq(A_tilt, tilt_values[inlier_mask], rcond=None)
- self.pan_offset = pan_params_s[0]
- self.pan_scale_x = pan_params_s[1]
- self.pan_scale_y = 0.0
- self.tilt_offset = tilt_params_s[0]
- self.tilt_scale_x = 0.0
- self.tilt_scale_y = tilt_params_s[1]
- logger.info(f"简化模型: pan={self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x, "
- f"tilt={self.tilt_offset:.2f} + {self.tilt_scale_y:.2f}*y")
- # 验证变换对全景中心的预测是否合理
- center_pan, center_tilt = self.transform(0.5, 0.5)
- logger.info(f"全景中心(0.5,0.5)预测: pan={center_pan:.1f}°, tilt={center_tilt:.1f}°")
- logger.info(f"最终变换参数: pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y, "
- f"tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
- return True
- except Exception as e:
- logger.error(f"计算变换参数错误: {e}")
- import traceback
- logger.error(traceback.format_exc())
- return False
- def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
- pan: np.ndarray, tilt: np.ndarray,
- max_iterations: int = 200, threshold: float = 15.0,
- min_samples: int = 4) -> np.ndarray:
- """RANSAC剔除变换拟合中的异常值(pan应已展开为连续值)"""
- n = len(x)
- best_inliers = np.zeros(n, dtype=bool)
- best_inlier_count = 0
- rng = np.random.RandomState(42)
- for _ in range(max_iterations):
- # 随机选min_samples个点
- indices = rng.choice(n, min_samples, replace=False)
- # 用这些点拟合
- A = np.ones((min_samples, 3))
- A[:, 1] = x[indices]
- A[:, 2] = y[indices]
- try:
- pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
- tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
- except np.linalg.LinAlgError:
- continue
- # 计算所有点的误差
- pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
- pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
- # 使用角度差计算pan误差(即使已展开,仍用角度差以防边界情况)
- pan_errors = np.array([self._angular_diff(float(pred_pan[i]), float(pan[i]))
- for i in range(n)])
- tilt_errors = pred_tilt - tilt
- errors = np.sqrt(pan_errors ** 2 + tilt_errors ** 2)
- inliers = errors < threshold
- inlier_count = np.sum(inliers)
- if inlier_count > best_inlier_count:
- best_inlier_count = inlier_count
- best_inliers = inliers
- if best_inlier_count == 0:
- return np.ones(n, dtype=bool)
- return best_inliers
- def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
- """计算均方根误差(使用角度差处理pan环绕)"""
- total_error = 0.0
- for p in points:
- pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
- # 使用角度差计算pan误差,处理0°/360°环绕
- pan_error = self._angular_diff(pred_pan, p.pan)
- tilt_error = pred_tilt - p.tilt
- error = math.sqrt(pan_error ** 2 + tilt_error ** 2)
- total_error += error ** 2
- return math.sqrt(total_error / len(points))
- def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
- """将全景坐标转换为PTZ角度 - 梯形透视补偿"""
- # 优先使用分段线性查找表(pan)
- if self.pan_lookup:
- pan = self._interp_lookup(self.pan_lookup, x_ratio)
- else:
- pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
- # pan边缘曲线补偿:越靠近边缘补偿越大,中心不补偿
- # 梯形透视:底部(y大)更宽,边缘补偿更大;顶部(y小)更窄,补偿更小
- if self.pan_edge_offset != 0:
- dx = 2 * x_ratio - 1 # -1(左) ~ 0(中) ~ +1(右)
- y_scale = 0.3 + 0.7 * y_ratio # 顶部0.3倍,底部1.0倍
- pan_correction = self.pan_edge_offset * y_scale * math.copysign(abs(dx) ** self.pan_curve_power, dx)
- pan += pan_correction
- # 将pan归一化到[0, 360),便于发送给球机
- pan = pan % 360
- # tilt:优先使用曲线映射(查找表tilt数据不稳定),后备查找表
- if self.tilt_linear_enabled:
- tilt = self.tilt_y0 + (self.tilt_y1 - self.tilt_y0) * (y_ratio ** self.tilt_curve_power)
- elif self.tilt_lookup:
- tilt = self._interp_lookup(self.tilt_lookup, y_ratio)
- else:
- tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
- return (pan, tilt)
- def _interp_lookup(self, lookup: List[Tuple[float, float]], ratio: float) -> float:
- """分段线性插值"""
- if not lookup:
- return 0.0
- if len(lookup) == 1:
- return lookup[0][1]
- if ratio <= lookup[0][0]:
- return lookup[0][1]
- if ratio >= lookup[-1][0]:
- return lookup[-1][1]
- # 二分查找插入位置
- lo, hi = 0, len(lookup) - 1
- while lo < hi - 1:
- mid = (lo + hi) // 2
- if lookup[mid][0] <= ratio:
- lo = mid
- else:
- hi = mid
- # 线性插值
- x0, v0 = lookup[lo]
- x1, v1 = lookup[hi]
- if abs(x1 - x0) < 1e-10:
- return v0
- t = (ratio - x0) / (x1 - x0)
- return v0 + t * (v1 - v0)
- def _build_lookup_tables(self, points: List[CalibrationPoint]) -> bool:
- """
- 从校准点构建分段线性查找表
- 核心策略:
- 1. 将所有校准点按x_ratio分桶,取匹配点数加权的pan值
- 2. 用最长连续单调子序列(LCMA)过滤假阳性:x_ratio→pan应近似单调
- 3. 处理pan角度环绕
- """
- if len(points) < 3:
- return False
- sorted_by_x = sorted(points, key=lambda p: p.x_ratio)
- # ===== 构建 x_ratio → pan 映射 =====
- grid_size = 0.05
- x_buckets: Dict[float, List[Tuple[float, int]]] = {}
- for p in sorted_by_x:
- x_key = round(p.x_ratio / grid_size) * grid_size
- if x_key not in x_buckets:
- x_buckets[x_key] = []
- match_count = getattr(p, 'match_count', 10)
- x_buckets[x_key].append((p.pan, match_count))
- # 加权中位数
- raw_entries = []
- for x_key in sorted(x_buckets.keys()):
- entries = x_buckets[x_key]
- total_weight = sum(mc for _, mc in entries)
- weighted_pans = []
- for pan, mc in entries:
- weighted_pans.extend([pan] * max(1, mc // 5))
- weighted_pan = float(np.median(weighted_pans))
- raw_entries.append((x_key, weighted_pan, total_weight))
- logger.info(f"Pan原始映射 ({len(raw_entries)} 个x_key):")
- for x, pan, w in raw_entries:
- logger.info(f" x={x:.3f} → pan={pan:.1f}° (weight={w})")
- # 用LCMA过滤:找到最长的近似连续单调子序列
- # pan随x_ratio应该是近似单调递减或递增的
- if len(raw_entries) >= 3:
- filtered = self._filter_continuous_monotonic(raw_entries)
- self.pan_lookup = [(x, pan) for x, pan, w in filtered]
- else:
- self.pan_lookup = [(x, pan % 360) for x, pan, w in raw_entries]
- # ===== 构建 y_ratio → tilt 映射 =====
- # 只使用通过pan过滤的点(x_ratio对应的pan与查找表一致)
- pan_valid_x = set(x for x, _ in self.pan_lookup)
- pan_tolerance = grid_size * 1.5 # 允许在pan有效区域附近的点
- valid_points_for_tilt = []
- for p in sorted_by_x:
- for vx in pan_valid_x:
- if abs(p.x_ratio - vx) <= pan_tolerance:
- valid_points_for_tilt.append(p)
- break
- logger.info(f"Tilt映射使用 {len(valid_points_for_tilt)}/{len(sorted_by_x)} 个经过pan验证的点")
- y_buckets: Dict[float, List[Tuple[float, int]]] = {}
- for p in valid_points_for_tilt:
- y_key = round(p.y_ratio / grid_size) * grid_size
- if y_key not in y_buckets:
- y_buckets[y_key] = []
- match_count = getattr(p, 'match_count', 10)
- y_buckets[y_key].append((p.tilt, match_count))
- tilt_entries = []
- for y_key in sorted(y_buckets.keys()):
- entries = y_buckets[y_key]
- weighted_tilts = []
- for tilt, mc in entries:
- weighted_tilts.extend([tilt] * max(1, mc // 5))
- tilt_median = float(np.median(weighted_tilts))
- tilt_entries.append((y_key, tilt_median))
- self.tilt_lookup = tilt_entries
- # 记录查找表内容
- logger.info(f"Pan查找表 ({len(self.pan_lookup)} 项):")
- for x, pan in self.pan_lookup:
- logger.info(f" x={x:.3f} → pan={pan:.1f}°")
- logger.info(f"Tilt查找表 ({len(self.tilt_lookup)} 项):")
- for y, tilt in self.tilt_lookup:
- logger.info(f" y={y:.3f} → tilt={tilt:.1f}°")
- return True
- def _filter_continuous_monotonic(
- self, entries: List[Tuple[float, float, int]],
- max_step: float = 60.0
- ) -> List[Tuple[float, float, int]]:
- """
- 过滤出最长的连续单调子序列
- x_ratio→pan应该是近似单调的(递增或递减,可能环绕一次)。
- 假阳性匹配会导致pan突然跳变到完全不相关的角度,
- 这个方法通过寻找最长的"步长<max_step"的子序列来过滤。
- Args:
- entries: [(x_key, pan, weight), ...] 已按x_key排序
- max_step: 相邻两个点允许的最大pan差(度)
- Returns:
- 过滤后的entries子集
- """
- n = len(entries)
- if n <= 2:
- return [(x, pan % 360, w) for x, pan, w in entries]
- # 尝试两种方向:pan递减和pan递增
- # 对于递减方向:每个点的pan应比前一个小(允许环绕)
- best_result = []
- for direction in ['decreasing', 'increasing']:
- # 动态规划找最长连续子序列
- # dp[i] = 以entries[i]结尾的最长连续子序列长度
- # parent[i] = 前驱索引
- dp = [1] * n
- parent = [-1] * n
- for i in range(1, n):
- for j in range(i):
- # 检查j→i的pan变化是否合理
- pan_j = entries[j][1]
- pan_i = entries[i][1]
- # 计算角度差(考虑环绕)
- diff = pan_i - pan_j
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- # 检查方向
- if direction == 'decreasing':
- ok = diff <= 0 and abs(diff) <= max_step
- else:
- ok = diff >= 0 and abs(diff) <= max_step
- if ok and dp[j] + 1 > dp[i]:
- dp[i] = dp[j] + 1
- parent[i] = j
- # 找最长子序列的终点
- end = max(range(n), key=lambda i: dp[i])
- # 回溯构建子序列
- seq = []
- idx = end
- while idx >= 0:
- seq.append(idx)
- idx = parent[idx]
- seq.reverse()
- # 展开pan角度
- result = self._unwrap_sequence(entries, seq)
- if len(result) > len(best_result):
- best_result = result
- logger.info(f"LCMA过滤: {n}个点 → {len(best_result)}个点 (方向: "
- f"{'递减' if len(best_result) > 0 else '无'})")
- # 如果过滤后太少,放宽条件重试
- if len(best_result) < 3 and n >= 3:
- logger.info("LCMA结果太少,放宽步长限制重试")
- for wider_step in [90, 120, 180]:
- for direction in ['decreasing', 'increasing']:
- dp = [1] * n
- parent = [-1] * n
- for i in range(1, n):
- for j in range(i):
- diff = entries[i][1] - entries[j][1]
- while diff > 180:
- diff -= 360
- while diff < -180:
- diff += 360
- if direction == 'decreasing':
- ok = diff <= 0 and abs(diff) <= wider_step
- else:
- ok = diff >= 0 and abs(diff) <= wider_step
- if ok and dp[j] + 1 > dp[i]:
- dp[i] = dp[j] + 1
- parent[i] = j
- end = max(range(n), key=lambda i: dp[i])
- seq = []
- idx = end
- while idx >= 0:
- seq.append(idx)
- idx = parent[idx]
- seq.reverse()
- result = self._unwrap_sequence(entries, seq)
- if len(result) > len(best_result):
- best_result = result
- if len(best_result) >= 3:
- break
- if not best_result:
- # 全部过滤后为空,使用原始数据
- logger.warning("LCMA过滤后为空,使用原始数据")
- return [(x, pan % 360, w) for x, pan, w in entries]
- return best_result
- def _unwrap_sequence(
- self, entries: List[Tuple[float, float, int]],
- indices: List[int]
- ) -> List[Tuple[float, float, int]]:
- """将子序列的pan角度展开为连续值(不强制归一化到[0,360),便于插值)"""
- result = []
- prev_unwrapped = None
- for idx in indices:
- x, pan, w = entries[idx]
- if prev_unwrapped is None:
- unwrapped = pan
- else:
- diff = pan - prev_unwrapped
- while diff > 180:
- pan -= 360
- diff = pan - prev_unwrapped
- while diff < -180:
- pan += 360
- diff = pan - prev_unwrapped
- unwrapped = pan
- prev_unwrapped = unwrapped
- # 保持连续值,transform 返回前再归一化到 [0,360)
- result.append((x, unwrapped, w))
- return result
- def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
- """将PTZ角度转换为全景坐标"""
- # 优先使用查找表的反向查找
- if self.pan_lookup and self.tilt_lookup:
- # 反向查找: pan → x_ratio
- x_ratio = self._reverse_lookup(self.pan_lookup, pan % 360)
- y_ratio = self._reverse_lookup(self.tilt_lookup, tilt)
- return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
- # 后备:线性模型逆变换
- M = np.array([
- [self.pan_scale_x, self.pan_scale_y],
- [self.tilt_scale_x, self.tilt_scale_y]
- ])
- det = np.linalg.det(M)
- if abs(det) < 1e-10:
- x_ratio = (pan - self.pan_offset) / self.pan_scale_x if abs(self.pan_scale_x) > 1e-10 else 0.5
- y_ratio = (tilt - self.tilt_offset) / self.tilt_scale_y if abs(self.tilt_scale_y) > 1e-10 else 0.5
- else:
- M_inv = np.linalg.inv(M)
- offset = np.array([pan - self.pan_offset, tilt - self.tilt_offset])
- result = M_inv @ offset
- x_ratio, y_ratio = result[0], result[1]
- return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
- def _reverse_lookup(self, lookup: List[Tuple[float, float]], value: float) -> float:
- """查找表反向查找:从value找ratio"""
- if not lookup:
- return 0.5
- # 处理pan环绕:找到最接近的段
- best_idx = 0
- best_diff = float('inf')
- for i, (ratio, v) in enumerate(lookup):
- diff = self._angular_diff(value, v)
- if abs(diff) < abs(best_diff):
- best_diff = diff
- best_idx = i
- # 精确定位到最近的两个点之间
- if best_idx == 0:
- return lookup[0][0]
- if best_idx == len(lookup) - 1:
- return lookup[-1][0]
- # 检查前一个和后一个点,选择更近的段
- prev_v = lookup[best_idx - 1][1]
- curr_v = lookup[best_idx][1]
- next_v = lookup[best_idx + 1][1] if best_idx + 1 < len(lookup) else curr_v
- # 在 (best_idx-1, best_idx) 和 (best_idx, best_idx+1) 之间选择
- if abs(self._angular_diff(value, prev_v)) < abs(self._angular_diff(value, next_v)):
- lo, hi = best_idx - 1, best_idx
- else:
- lo, hi = best_idx, best_idx + 1
- x0, v0 = lookup[lo]
- x1, v1 = lookup[hi]
- # 考虑角度环绕
- diff_v = self._angular_diff(v1, v0)
- if abs(diff_v) < 1e-10:
- return (x0 + x1) / 2
- t = self._angular_diff(value, v0) / diff_v
- t = max(0, min(1, t))
- return x0 + t * (x1 - x0)
- def is_calibrated(self) -> bool:
- return self.state == CalibrationState.SUCCESS
- def get_state(self) -> CalibrationState:
- return self.state
- def get_result(self) -> Optional[CalibrationResult]:
- return self.result
- def get_overlap_ranges(self) -> List[OverlapRange]:
- """返回发现的重叠区间"""
- return self.overlap_ranges
- def save_calibration(self, filepath: str) -> bool:
- """保存校准结果"""
- if not self.is_calibrated():
- return False
- try:
- import json
- ptz_config = getattr(self.ptz, 'ptz_config', None) or _get_ptz_config()
- data = {
- 'pan_offset': self.pan_offset,
- 'pan_scale_x': self.pan_scale_x,
- 'pan_scale_y': self.pan_scale_y,
- 'tilt_offset': self.tilt_offset,
- 'tilt_scale_x': self.tilt_scale_x,
- 'tilt_scale_y': self.tilt_scale_y,
- 'rms_error': self.result.rms_error if self.result else 0,
- 'overlap_ranges': [
- {
- 'pan_start': r.pan_start,
- 'pan_end': r.pan_end,
- 'tilt_start': r.tilt_start,
- 'tilt_end': r.tilt_end,
- 'match_count': r.match_count
- }
- for r in self.overlap_ranges
- ],
- # 分段线性查找表
- 'pan_lookup': self.pan_lookup,
- 'tilt_lookup': self.tilt_lookup,
- # 保存安装方向配置
- 'mount_type': ptz_config.get('mount_type', 'wall'),
- 'tilt_flip': ptz_config.get('tilt_flip', False),
- 'pan_flip': ptz_config.get('pan_flip', False),
- }
- with open(filepath, 'w') as f:
- json.dump(data, f, indent=2)
- logger.info(f"校准结果已保存: {filepath}")
- return True
- except Exception as e:
- logger.error(f"保存校准结果失败: {e}")
- return False
- def load_calibration(self, filepath: str) -> bool:
- """加载校准结果"""
- try:
- import json
- with open(filepath, 'r') as f:
- data = json.load(f)
- self.pan_offset = data['pan_offset']
- self.pan_scale_x = data['pan_scale_x']
- self.pan_scale_y = data['pan_scale_y']
- self.tilt_offset = data['tilt_offset']
- self.tilt_scale_x = data['tilt_scale_x']
- self.tilt_scale_y = data['tilt_scale_y']
- # 加载分段线性查找表
- self.pan_lookup = [tuple(p) for p in data.get('pan_lookup', [])]
- self.tilt_lookup = [tuple(t) for t in data.get('tilt_lookup', [])]
- # 加载重叠区间(如果有)
- if 'overlap_ranges' in data:
- self.overlap_ranges = [
- OverlapRange(
- pan_start=r['pan_start'],
- pan_end=r['pan_end'],
- tilt_start=r['tilt_start'],
- tilt_end=r['tilt_end'],
- match_count=r['match_count'],
- panorama_center_x=0,
- panorama_center_y=0
- )
- for r in data['overlap_ranges']
- ]
- self.state = CalibrationState.SUCCESS
- self.result = CalibrationResult(
- success=True,
- points=[],
- rms_error=data.get('rms_error', 0)
- )
-
- # 检查安装方向配置是否匹配
- ptz_config = getattr(self.ptz, 'ptz_config', None) or _get_ptz_config()
- current_mount = ptz_config.get('mount_type', 'wall')
- saved_mount = data.get('mount_type', 'wall')
- if current_mount != saved_mount:
- logger.warning(f"当前安装类型({current_mount})与校准时的({saved_mount})不同,建议重新校准!")
- logger.info(f"校准结果已加载: {filepath}")
- return True
- except FileNotFoundError:
- logger.warning(f"校准文件不存在: {filepath}")
- return False
- except Exception as e:
- logger.error(f"加载校准结果失败: {e}")
- return False
- class CalibrationManager:
- """校准管理器"""
- def __init__(self, calibrator: CameraCalibrator, calibration_file: str = None):
- self.calibrator = calibrator
- # 优先使用传入的路径,否则从配置读取,最后使用默认值
- if calibration_file:
- self.calibration_file = calibration_file
- else:
- try:
- from config import CALIBRATION_CONFIG
- self.calibration_file = CALIBRATION_CONFIG.get(
- 'calibration_file', 'calibration.json'
- )
- except ImportError:
- self.calibration_file = 'calibration.json'
- def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
- """
- 自动校准
-
- Args:
- force: 是否强制重新校准(不加载已有数据)
- fallback_on_failure: 校准失败时是否回退使用已有数据
-
- Returns:
- 校准结果
- """
- # 检查是否启用加载上次校准数据
- load_on_startup = True # 默认启用
- try:
- from config import CALIBRATION_CONFIG
- load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
- except:
- pass
-
- # 如果不是强制校准,尝试加载已有数据
- if not force and load_on_startup:
- if self.calibrator.load_calibration(self.calibration_file):
- logger.info("使用已有校准结果")
- return self.calibrator.get_result()
- # 执行新校准
- if force:
- logger.info("强制重新校准(不使用已有数据)...")
- elif not load_on_startup:
- logger.info("已禁用加载校准数据,开始新校准...")
- else:
- logger.info("开始自动校准...")
-
- result = self.calibrator.calibrate(quick_mode=True)
- if result.success:
- self.calibrator.save_calibration(self.calibration_file)
- elif fallback_on_failure:
- # 校准失败,尝试回退使用已有数据
- logger.warning("校准失败,尝试回退使用已有校准数据...")
- if self.calibrator.load_calibration(self.calibration_file):
- logger.info("已回退到已有校准数据")
- result = self.calibrator.get_result()
- return result
- def check_calibration(self) -> Tuple[bool, str]:
- """检查校准状态"""
- state = self.calibrator.get_state()
- if state == CalibrationState.SUCCESS:
- result = self.calibrator.get_result()
- overlaps = self.calibrator.get_overlap_ranges()
- overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
- return (True, f"校准有效, RMS误差: {result.rms_error:.4f}°{overlap_info}")
- elif state == CalibrationState.FAILED:
- return (False, "校准失败")
- elif state == CalibrationState.RUNNING:
- return (False, "校准进行中")
- else:
- return (False, "未校准")
|