calibration.py 68 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728
  1. """
  2. 相机校准模块
  3. 实现全景相机与球机的自动校准
  4. 建立画面坐标到PTZ角度的映射关系
  5. 核心改进:先发现视野重叠区域,再在重叠区内校准,
  6. 避免球机指向与全景画面无重叠的方向导致校准失败。
  7. """
  8. import time
  9. import math
  10. import threading
  11. import logging
  12. import numpy as np
  13. import cv2
  14. from typing import List, Tuple, Dict, Optional, Callable
  15. from dataclasses import dataclass, field
  16. from enum import Enum
  17. from ptz_camera import PTZCamera
  18. logger = logging.getLogger(__name__)
  19. # 加载PTZ配置
  20. def _get_ptz_config():
  21. try:
  22. from config import PTZ_CONFIG
  23. return PTZ_CONFIG
  24. except ImportError:
  25. return {
  26. 'mount_type': 'wall',
  27. 'tilt_flip': False,
  28. 'pan_flip': False
  29. }
  30. class CalibrationState(Enum):
  31. IDLE = 0
  32. RUNNING = 1
  33. SUCCESS = 2
  34. FAILED = 3
  35. @dataclass
  36. class CalibrationPoint:
  37. pan: float
  38. tilt: float
  39. zoom: float = 1.0
  40. x_ratio: float = 0.0
  41. y_ratio: float = 0.0
  42. detected: bool = False
  43. match_count: int = 0
  44. @dataclass
  45. class CalibrationResult:
  46. success: bool
  47. points: List[CalibrationPoint]
  48. transform_matrix: Optional[np.ndarray] = None
  49. error_message: str = ""
  50. rms_error: float = 0.0
  51. @dataclass
  52. class OverlapRange:
  53. pan_start: float
  54. pan_end: float
  55. tilt_start: float
  56. tilt_end: float
  57. match_count: int
  58. panorama_center_x: float
  59. panorama_center_y: float
  60. MIN_MATCH_THRESHOLD = 10 # 最少匹配点数
  61. LOWE_RATIO = 0.70 # Lowe's ratio test 阈值,越小越严格
  62. RANSAC_THRESHOLD = 4.0 # RANSAC 单应矩阵内点阈值(像素)
  63. MIN_INLIERS = 5 # 最少几何内点数
  64. class OverlapDiscovery:
  65. """
  66. 视野重叠发现器
  67. 扫描球机视野范围,找出与全景画面有视觉重叠的角度区间
  68. """
  69. def __init__(self, feature_type: str = 'SIFT'):
  70. try:
  71. self.feature_detector = cv2.SIFT_create()
  72. self.feature_type = 'SIFT'
  73. except AttributeError:
  74. self.feature_detector = cv2.ORB_create(nfeatures=500)
  75. self.feature_type = 'ORB'
  76. norm_type = cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
  77. self.matcher = cv2.BFMatcher(norm_type)
  78. def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
  79. ) -> Tuple[bool, int, float, float]:
  80. """
  81. 特征匹配球机画面与全景画面
  82. Returns: (是否匹配成功, 匹配点数, 全景画面中心x, 全景画面中心y)
  83. """
  84. if ptz_frame is None or panorama_frame is None:
  85. return (False, 0, 0.0, 0.0)
  86. try:
  87. ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
  88. pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
  89. # 缩小图像加速特征提取(匹配坐标按比例还原)
  90. ptz_scale = 1.0
  91. pan_scale = 1.0
  92. max_dim = 960
  93. if ptz_gray.shape[1] > max_dim:
  94. ptz_scale = max_dim / ptz_gray.shape[1]
  95. ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
  96. interpolation=cv2.INTER_AREA)
  97. if pan_gray.shape[1] > max_dim:
  98. pan_scale = max_dim / pan_gray.shape[1]
  99. pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
  100. interpolation=cv2.INTER_AREA)
  101. kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
  102. kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
  103. if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
  104. return (False, 0, 0.0, 0.0)
  105. matches = self.matcher.knnMatch(des1, des2, k=2)
  106. good_matches = []
  107. for match_pair in matches:
  108. if len(match_pair) == 2:
  109. m, n = match_pair
  110. if m.distance < LOWE_RATIO * n.distance:
  111. good_matches.append(m)
  112. if len(good_matches) < MIN_MATCH_THRESHOLD:
  113. return (False, len(good_matches), 0.0, 0.0)
  114. # 几何验证:用 RANSAC 单应矩阵剔除空间不一致的匹配
  115. ptz_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches])
  116. pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
  117. try:
  118. _, mask = cv2.findHomography(ptz_pts, pan_pts, cv2.RANSAC, RANSAC_THRESHOLD)
  119. inlier_mask = mask.ravel().astype(bool)
  120. inlier_count = int(np.sum(inlier_mask))
  121. except Exception:
  122. inlier_count = 0
  123. inlier_mask = np.zeros(len(good_matches), dtype=bool)
  124. if inlier_count < MIN_INLIERS:
  125. logger.debug(f"几何验证失败: {inlier_count}/{len(good_matches)} 个内点")
  126. return (False, inlier_count, 0.0, 0.0)
  127. # 只使用几何一致的内点计算中心
  128. inlier_pan_pts = pan_pts[inlier_mask]
  129. center_x = np.mean(inlier_pan_pts[:, 0]) / pan_scale
  130. center_y = np.mean(inlier_pan_pts[:, 1]) / pan_scale
  131. logger.debug(f"特征匹配: 原始={len(good_matches)}, 内点={inlier_count}, "
  132. f"中心=({center_x:.1f}, {center_y:.1f})")
  133. return (True, inlier_count, center_x, center_y)
  134. except Exception as e:
  135. logger.error(f"特征匹配异常: {e}")
  136. return (False, 0, 0.0, 0.0)
  137. def discover_overlap_ranges(
  138. self,
  139. ptz: PTZCamera,
  140. get_panorama_frame: Callable[[], np.ndarray],
  141. ptz_capture: Callable[[], Optional[np.ndarray]],
  142. pan_range: Tuple[float, float] = (0, 360),
  143. tilt_range: Tuple[float, float] = (-20, 40),
  144. pan_step: float = 20,
  145. tilt_step: float = 15,
  146. stabilize_time: float = 2.0,
  147. on_progress: Callable[[int, int, str], None] = None,
  148. max_ranges: int = 3,
  149. min_positions_per_range: int = 3
  150. ) -> List[OverlapRange]:
  151. """
  152. 扫描球机视野范围,发现与全景画面有重叠的角度区间
  153. 1. 先拍一张全景参考帧
  154. 2. 逐步移动球机到各个角度
  155. 3. 在每个位置抓拍球机画面,与全景做特征匹配
  156. 4. 记录有足够匹配点的角度
  157. 5. 合并相邻的有重叠的角度形成区间
  158. """
  159. logger.info(f"阶段1: 视野重叠发现, 扫描范围: pan={pan_range}, tilt={tilt_range}, 步进: pan={pan_step}°, tilt={tilt_step}°")
  160. # 1. 拍全景参考帧
  161. logger.info("获取全景参考帧...")
  162. ref_frames = []
  163. for _ in range(3):
  164. frame = get_panorama_frame()
  165. if frame is not None:
  166. ref_frames.append(frame)
  167. time.sleep(0.1)
  168. if not ref_frames:
  169. logger.error("无法获取全景参考帧!")
  170. return []
  171. panorama_ref = ref_frames[0]
  172. logger.info(f"全景参考帧: {panorama_ref.shape}")
  173. # 2. 扫描各个角度
  174. scan_results: List[Tuple[float, float, int, float, float]] = []
  175. pan_values = np.arange(pan_range[0], pan_range[1] + pan_step, pan_step)
  176. tilt_values = np.arange(tilt_range[0], tilt_range[1] + tilt_step, tilt_step)
  177. total_positions = len(pan_values) * len(tilt_values)
  178. current_idx = 0
  179. for pan in pan_values:
  180. for tilt in tilt_values:
  181. current_idx += 1
  182. pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
  183. if on_progress:
  184. on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
  185. logger.info(f"[{current_idx}/{total_positions}] {pos_desc}")
  186. # 移动球机
  187. if not ptz.goto_exact_position(float(pan), float(tilt), 1):
  188. logger.warning(f"移动球机失败, 跳过")
  189. continue
  190. time.sleep(stabilize_time)
  191. # 抓拍球机画面
  192. ptz_frame = ptz_capture() if ptz_capture else None
  193. if ptz_frame is None:
  194. logger.warning(f"球机抓拍失败, 跳过")
  195. continue
  196. # 获取当前全景帧并匹配
  197. cur_panorama = get_panorama_frame()
  198. if cur_panorama is None:
  199. continue
  200. success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
  201. if success:
  202. h, w = cur_panorama.shape[:2]
  203. x_ratio = cx / w
  204. y_ratio = cy / h
  205. logger.info(f"匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
  206. scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))
  207. else:
  208. logger.debug(f"匹配不足: {match_count}个特征点")
  209. if not scan_results:
  210. logger.warning("未发现任何视野重叠位置!")
  211. return []
  212. logger.info(f"发现 {len(scan_results)} 个有重叠的扫描位置")
  213. # 保存原始扫描结果供后续校准使用
  214. self.scan_results = scan_results
  215. # 3. 合并相邻位置为重叠区间
  216. overlap_ranges = self._merge_scan_results(
  217. scan_results,
  218. max_ranges=max_ranges,
  219. min_positions=min_positions_per_range
  220. )
  221. for i, r in enumerate(overlap_ranges):
  222. logger.info(f"重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
  223. f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
  224. f"匹配点={r.match_count}")
  225. return overlap_ranges
  226. def _merge_scan_results(
  227. self,
  228. results: List[Tuple[float, float, int, float, float]],
  229. pan_tolerance: float = 20,
  230. tilt_tolerance: float = 35,
  231. max_ranges: int = 3,
  232. min_positions: int = 2
  233. ) -> List[OverlapRange]:
  234. """
  235. 使用union-find连通分量聚类合并相邻扫描结果
  236. 只保留最大的 max_ranges 个区间
  237. """
  238. if not results:
  239. return []
  240. n = len(results)
  241. # union-find
  242. parent = list(range(n))
  243. def find(x):
  244. while parent[x] != x:
  245. parent[x] = parent[parent[x]]
  246. x = parent[x]
  247. return x
  248. def union(a, b):
  249. ra, rb = find(a), find(b)
  250. if ra != rb:
  251. parent[ra] = rb
  252. # 判断两点是否相邻
  253. for i in range(n):
  254. for j in range(i + 1, n):
  255. pi, ti = results[i][0], results[i][1]
  256. pj, tj = results[j][0], results[j][1]
  257. if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
  258. union(i, j)
  259. # 按连通分量分组
  260. groups: Dict[int, List[int]] = {}
  261. for i in range(n):
  262. root = find(i)
  263. if root not in groups:
  264. groups[root] = []
  265. groups[root].append(i)
  266. # 转换为OverlapRange,过滤太小的组
  267. ranges = []
  268. for indices in groups.values():
  269. if len(indices) < min_positions:
  270. continue
  271. group_data = [results[i] for i in indices]
  272. ranges.append(self._group_to_range(group_data))
  273. # 按match_count降序排序,只保留最大的 max_ranges 个
  274. ranges.sort(key=lambda r: r.match_count, reverse=True)
  275. ranges = ranges[:max_ranges]
  276. # 按pan_start排序输出
  277. ranges.sort(key=lambda r: r.pan_start)
  278. return ranges
  279. def _group_to_range(self, group: List[Tuple[float, float, int, float, float]]) -> OverlapRange:
  280. """将一组扫描结果转换为一个OverlapRange"""
  281. pans = [r[0] for r in group]
  282. tilts = [r[1] for r in group]
  283. match_counts = [r[2] for r in group]
  284. x_ratios = [r[3] for r in group]
  285. y_ratios = [r[4] for r in group]
  286. step = 5 # 在边缘各扩展5度
  287. return OverlapRange(
  288. pan_start=min(pans) - step,
  289. pan_end=max(pans) + step,
  290. tilt_start=min(tilts) - step,
  291. tilt_end=max(tilts) + step,
  292. match_count=max(match_counts),
  293. panorama_center_x=float(np.mean(x_ratios)),
  294. panorama_center_y=float(np.mean(y_ratios))
  295. )
  296. class VisualCalibrationDetector:
  297. """
  298. 视觉校准检测器
  299. 通过运动检测和特征匹配定位球机在全景画面中的位置
  300. """
  301. def __init__(self):
  302. try:
  303. self.feature_detector = cv2.SIFT_create()
  304. self.feature_type = 'SIFT'
  305. except AttributeError:
  306. self.feature_detector = cv2.ORB_create(nfeatures=500)
  307. self.feature_type = 'ORB'
  308. self.matcher = cv2.BFMatcher(
  309. cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
  310. )
  311. self.use_motion_detection = True
  312. self.use_feature_matching = True
  313. def detect_by_motion(self, frames_before: np.ndarray,
  314. frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
  315. """通过运动检测定位球机指向位置"""
  316. if frames_before is None or frames_after is None:
  317. return None
  318. before_gray = cv2.cvtColor(frames_before, cv2.COLOR_BGR2GRAY) \
  319. if len(frames_before.shape) == 3 else frames_before
  320. after_gray = cv2.cvtColor(frames_after, cv2.COLOR_BGR2GRAY) \
  321. if len(frames_after.shape) == 3 else frames_after
  322. diff = cv2.absdiff(before_gray, after_gray)
  323. _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
  324. kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
  325. thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
  326. thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
  327. contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  328. if not contours:
  329. return None
  330. max_contour = max(contours, key=cv2.contourArea)
  331. area = cv2.contourArea(max_contour)
  332. if area < 500:
  333. return None
  334. M = cv2.moments(max_contour)
  335. if M["m00"] == 0:
  336. return None
  337. cx = M["m10"] / M["m00"]
  338. cy = M["m01"] / M["m00"]
  339. h, w = before_gray.shape
  340. logger.debug(f"运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f})")
  341. return (cx / w, cy / h)
  342. def detect_by_feature_match(self, panorama_frame: np.ndarray,
  343. ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
  344. """通过特征匹配定位"""
  345. if panorama_frame is None or ptz_frame is None:
  346. return None
  347. try:
  348. pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) \
  349. if len(panorama_frame.shape) == 3 else panorama_frame
  350. ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) \
  351. if len(ptz_frame.shape) == 3 else ptz_frame
  352. # 缩小图像加速
  353. max_dim = 960
  354. ptz_scale = 1.0
  355. pan_scale = 1.0
  356. if ptz_gray.shape[1] > max_dim:
  357. ptz_scale = max_dim / ptz_gray.shape[1]
  358. ptz_gray = cv2.resize(ptz_gray, None, fx=ptz_scale, fy=ptz_scale,
  359. interpolation=cv2.INTER_AREA)
  360. if pan_gray.shape[1] > max_dim:
  361. pan_scale = max_dim / pan_gray.shape[1]
  362. pan_gray = cv2.resize(pan_gray, None, fx=pan_scale, fy=pan_scale,
  363. interpolation=cv2.INTER_AREA)
  364. kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
  365. kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
  366. if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
  367. return None
  368. matches = self.matcher.knnMatch(des1, des2, k=2)
  369. good_matches = []
  370. for match_pair in matches:
  371. if len(match_pair) == 2:
  372. m, n = match_pair
  373. if m.distance < LOWE_RATIO * n.distance:
  374. good_matches.append(m)
  375. if len(good_matches) < MIN_MATCH_THRESHOLD:
  376. return None
  377. # 几何验证:用 RANSAC 单应矩阵剔除空间不一致的匹配
  378. ptz_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches])
  379. pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
  380. try:
  381. _, mask = cv2.findHomography(ptz_pts, pan_pts, cv2.RANSAC, RANSAC_THRESHOLD)
  382. inlier_mask = mask.ravel().astype(bool)
  383. inlier_count = int(np.sum(inlier_mask))
  384. except Exception:
  385. return None
  386. if inlier_count < MIN_INLIERS:
  387. return None
  388. inlier_pan_pts = pan_pts[inlier_mask]
  389. center_x = np.mean(inlier_pan_pts[:, 0])
  390. center_y = np.mean(inlier_pan_pts[:, 1])
  391. h, w = pan_gray.shape
  392. logger.debug(f"特征匹配: 匹配点={len(good_matches)}, 内点={inlier_count}, "
  393. f"中心=({center_x:.1f}, {center_y:.1f})")
  394. return (center_x / w, center_y / h)
  395. except Exception as e:
  396. logger.error(f"特征匹配错误: {e}")
  397. return None
  398. def detect_position(self, panorama_frame: np.ndarray,
  399. frames_before: np.ndarray = None,
  400. frames_after: np.ndarray = None,
  401. ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
  402. """综合检测球机在全景画面中的位置"""
  403. results = []
  404. if self.use_motion_detection and frames_before is not None and frames_after is not None:
  405. motion_result = self.detect_by_motion(frames_before, frames_after)
  406. if motion_result:
  407. results.append(('motion', motion_result, 0.4))
  408. if self.use_feature_matching and ptz_frame is not None:
  409. feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
  410. if feature_result:
  411. results.append(('feature', feature_result, 0.6))
  412. if not results:
  413. return (False, 0.0, 0.0)
  414. total_weight = sum(r[2] for r in results)
  415. x_ratio = sum(r[1][0] * r[2] for r in results) / total_weight
  416. y_ratio = sum(r[1][1] * r[2] for r in results) / total_weight
  417. logger.debug(f"融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
  418. return (True, x_ratio, y_ratio)
  419. class CameraCalibrator:
  420. """
  421. 相机校准器
  422. 两阶段校准:先发现视野重叠区域,再在重叠区内校准
  423. """
  424. def __init__(self, ptz_camera: PTZCamera,
  425. get_frame_func: Callable[[], np.ndarray],
  426. detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
  427. ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
  428. self.ptz = ptz_camera
  429. self.get_frame = get_frame_func
  430. self.detect_marker = detect_marker_func
  431. self.ptz_capture = ptz_capture_func
  432. self.visual_detector = VisualCalibrationDetector()
  433. self.overlap_discovery = OverlapDiscovery()
  434. self.state = CalibrationState.IDLE
  435. self.result: Optional[CalibrationResult] = None
  436. # 变换参数 (线性模型 - 作为后备)
  437. self.pan_offset = 0.0
  438. self.pan_scale_x = 1.0
  439. self.pan_scale_y = 0.0
  440. self.tilt_offset = 0.0
  441. self.tilt_scale_x = 0.0
  442. self.tilt_scale_y = 1.0
  443. # 分段线性查找表 (主变换方法)
  444. # 存储 x_ratio → pan 和 y_ratio → tilt 的映射
  445. self.pan_lookup: List[Tuple[float, float]] = [] # [(x_ratio, pan), ...] sorted by x_ratio
  446. self.tilt_lookup: List[Tuple[float, float]] = [] # [(y_ratio, tilt), ...] sorted by y_ratio
  447. # tilt偏移补偿(度),正值=向下补偿,优先从传入的球机配置读取
  448. ptz_config = getattr(ptz_camera, 'ptz_config', None)
  449. if ptz_config is None:
  450. from config import PTZ_CONFIG
  451. ptz_config = PTZ_CONFIG
  452. self.tilt_offset_deg = ptz_config.get('tilt_offset', 0)
  453. self.pan_offset_deg = ptz_config.get('pan_offset', 0)
  454. self.pan_edge_offset = ptz_config.get('pan_edge_offset', 0)
  455. self.pan_curve_power = ptz_config.get('pan_curve_power', 1.0)
  456. # tilt线性映射(替代不稳定的查找表)
  457. self.tilt_linear_enabled = ptz_config.get('tilt_linear_enabled', False)
  458. self.tilt_y0 = ptz_config.get('tilt_y0', 0)
  459. self.tilt_y1 = ptz_config.get('tilt_y1', 45)
  460. self.tilt_curve_power = ptz_config.get('tilt_curve_power', 1.0)
  461. # 安装方向翻转(与 ptz_camera.calculate_ptz_position 保持一致)
  462. self.tilt_flip = ptz_config.get('tilt_flip', False)
  463. self.pan_flip = ptz_config.get('pan_flip', False)
  464. # 校准配置
  465. self.stabilize_time = 1.0
  466. self.use_motion_detection = True
  467. self.use_feature_matching = True
  468. # 重叠发现配置(可从 ptz_config 覆盖,避免在无效方向浪费扫描时间)
  469. self.overlap_pan_range = ptz_config.get('overlap_pan_range', (0, 360))
  470. self.overlap_tilt_range = ptz_config.get('overlap_tilt_range', (-20, 50))
  471. self.overlap_pan_step = ptz_config.get('overlap_pan_step', 20)
  472. self.overlap_tilt_step = ptz_config.get('overlap_tilt_step', 15)
  473. self.max_overlap_ranges = 3
  474. self.min_positions_per_range = 3
  475. # 回调
  476. self.on_progress: Optional[Callable[[int, int, str], None]] = None
  477. self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
  478. # 发现的重叠区间
  479. self.overlap_ranges: List[OverlapRange] = []
  480. def _angular_diff(self, a: float, b: float) -> float:
  481. """计算两个角度之间的最小差值,考虑360°环绕"""
  482. diff = a - b
  483. while diff > 180:
  484. diff -= 360
  485. while diff < -180:
  486. diff += 360
  487. return diff
  488. def _unwrap_pan_angles(self, pan_values: np.ndarray) -> np.ndarray:
  489. """
  490. 将pan角度展开为连续值,避免0°/360°边界的不连续性
  491. 使用中位数作为参考点,将所有角度调整到参考点的±180°范围内。
  492. 这样即使校准点跨越0°/360°边界,也能正确拟合线性变换。
  493. 例如: [350, 355, 5, 10] → [-10, -5, 5, 10] (ref=5)
  494. """
  495. if len(pan_values) == 0:
  496. return pan_values
  497. ref = float(np.median(pan_values))
  498. unwrapped = pan_values.astype(float).copy()
  499. for i in range(len(unwrapped)):
  500. diff = unwrapped[i] - ref
  501. while diff > 180:
  502. unwrapped[i] -= 360
  503. diff = unwrapped[i] - ref
  504. while diff < -180:
  505. unwrapped[i] += 360
  506. diff = unwrapped[i] - ref
  507. return unwrapped
  508. def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
  509. """
  510. 执行校准 - 两阶段流程
  511. 阶段1: 视野重叠发现 - 扫描球机范围,找出与全景有重叠的角度区间
  512. 阶段2: 精确校准 - 仅在重叠区间内生成校准点,逐一验证后拟合变换
  513. """
  514. self.state = CalibrationState.RUNNING
  515. # ===================== 阶段1: 视野重叠发现 =====================
  516. logger.info("阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
  517. self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
  518. ptz=self.ptz,
  519. get_panorama_frame=self.get_frame,
  520. ptz_capture=self.ptz_capture,
  521. pan_range=self.overlap_pan_range,
  522. tilt_range=self.overlap_tilt_range,
  523. pan_step=self.overlap_pan_step,
  524. tilt_step=self.overlap_tilt_step,
  525. stabilize_time=self.stabilize_time,
  526. on_progress=self.on_progress,
  527. max_ranges=self.max_overlap_ranges,
  528. min_positions_per_range=self.min_positions_per_range
  529. )
  530. if not self.overlap_ranges:
  531. self.state = CalibrationState.FAILED
  532. self.result = CalibrationResult(
  533. success=False,
  534. points=[],
  535. error_message="未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
  536. )
  537. logger.error(f"校准失败: {self.result.error_message}")
  538. if self.on_complete:
  539. self.on_complete(self.result)
  540. return self.result
  541. logger.info(f"发现 {len(self.overlap_ranges)} 个重叠区间")
  542. # 保留所有重叠区间用于校准(覆盖更广的视野范围)
  543. logger.info(f"使用全部 {len(self.overlap_ranges)} 个重叠区间进行校准(覆盖更广视野)")
  544. for i, r in enumerate(self.overlap_ranges):
  545. logger.info(f" 区间{i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
  546. f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], 匹配点={r.match_count}")
  547. # ===================== 阶段2: 使用阶段1扫描数据 + 补充校准 =====================
  548. # 阶段1已对整个视野扫描并记录了(pan, tilt) → (x_ratio, y_ratio)对应关系
  549. # 直接使用这些数据比阶段2重新在单个区间内采集更全面、更高效
  550. valid_points = []
  551. # 直接从阶段1扫描结果构建校准点
  552. scan_results = getattr(self.overlap_discovery, 'scan_results', [])
  553. if scan_results:
  554. logger.info(f"使用阶段1扫描数据: {len(scan_results)}个有效匹配位置")
  555. for pan, tilt, match_count, x_ratio, y_ratio in scan_results:
  556. valid_points.append(CalibrationPoint(
  557. pan=pan, tilt=tilt, zoom=1.0,
  558. x_ratio=x_ratio, y_ratio=y_ratio,
  559. detected=True, match_count=match_count
  560. ))
  561. else:
  562. logger.warning("阶段1无扫描数据,回退到阶段2逐点校准")
  563. # 如果扫描数据不足,补充在重叠区内采集更多点
  564. min_scan_points = 8
  565. if len(valid_points) < min_scan_points:
  566. logger.info(f"扫描数据不足({len(valid_points)}<{min_scan_points}),在重叠区间内补充采集")
  567. supplement_points = self._generate_points_in_overlaps(quick_mode)
  568. total_supplement = len(supplement_points)
  569. supplement_valid = 0
  570. for idx, point in enumerate(supplement_points):
  571. if self.on_progress:
  572. self.on_progress(idx + 1, total_supplement,
  573. f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
  574. logger.info(f"补充校准点 {idx + 1}/{total_supplement}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
  575. # 获取移动前全景帧
  576. frames_before_list = []
  577. for _ in range(3):
  578. frame = self.get_frame()
  579. if frame is not None:
  580. frames_before_list.append(frame)
  581. time.sleep(0.1)
  582. if not frames_before_list:
  583. continue
  584. frames_before = np.mean(frames_before_list, axis=0).astype(np.uint8)
  585. # 移动球机
  586. if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
  587. continue
  588. time.sleep(self.stabilize_time)
  589. # 获取移动后帧
  590. frames_after_list = []
  591. for _ in range(3):
  592. frame = self.get_frame()
  593. if frame is not None:
  594. frames_after_list.append(frame)
  595. time.sleep(0.1)
  596. if not frames_after_list:
  597. continue
  598. panorama_frame = np.mean(frames_after_list, axis=0).astype(np.uint8)
  599. # 球机抓拍
  600. ptz_frame = None
  601. if self.ptz_capture:
  602. try:
  603. ptz_frame = self.ptz_capture()
  604. except Exception:
  605. pass
  606. # 特征匹配验证
  607. if ptz_frame is not None and panorama_frame is not None:
  608. success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
  609. if success:
  610. h, w = panorama_frame.shape[:2]
  611. point.x_ratio = cx / w
  612. point.y_ratio = cy / h
  613. point.detected = True
  614. valid_points.append(point)
  615. supplement_valid += 1
  616. logger.info(f"补充点验证通过: {match_count}个匹配点, "
  617. f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
  618. continue
  619. # 运动检测备选
  620. if self.use_motion_detection and frames_before is not None and panorama_frame is not None:
  621. motion_result = self.visual_detector.detect_by_motion(frames_before, panorama_frame)
  622. if motion_result:
  623. point.x_ratio, point.y_ratio = motion_result
  624. point.detected = True
  625. valid_points.append(point)
  626. supplement_valid += 1
  627. logger.info(f"运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
  628. logger.info(f"补充采集: {supplement_valid}/{total_supplement} 个点验证通过")
  629. # ===================== 检查有效校准点 =====================
  630. min_valid = 4
  631. if len(valid_points) < min_valid:
  632. self.state = CalibrationState.FAILED
  633. self.result = CalibrationResult(
  634. success=False,
  635. points=valid_points,
  636. error_message=f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
  637. f"请检查球机与全景的视野重叠是否足够。"
  638. )
  639. logger.error(f"校准失败: {self.result.error_message}")
  640. if self.on_complete:
  641. self.on_complete(self.result)
  642. return self.result
  643. # ===================== 计算变换参数 =====================
  644. success = self._calculate_transform(valid_points)
  645. if success:
  646. # 构建分段线性查找表(主变换方法,处理pan环绕)
  647. lookup_ok = self._build_lookup_tables(valid_points)
  648. self.state = CalibrationState.SUCCESS
  649. rms_error = self._calculate_rms_error(valid_points)
  650. self.result = CalibrationResult(
  651. success=True,
  652. points=valid_points,
  653. rms_error=rms_error
  654. )
  655. logger.info(f"校准成功! 有效校准点: {len(valid_points)}, "
  656. f"重叠区间数: {len(self.overlap_ranges)}, RMS误差: {rms_error:.4f}°")
  657. # 校准验证:将球机移到全景画面中心,检查是否指向正确位置
  658. verify_ok = self._verify_calibration()
  659. if not verify_ok:
  660. logger.warning("校准验证未通过,校准结果可能不准确")
  661. # 自动保存校准结果
  662. try:
  663. from config import CALIBRATION_CONFIG
  664. if CALIBRATION_CONFIG.get('auto_save', True):
  665. filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
  666. self.save_calibration(filepath)
  667. except Exception:
  668. pass
  669. # 校准完成后,将球机复位到初始位置
  670. self._reset_ptz_position()
  671. else:
  672. self.state = CalibrationState.FAILED
  673. self.result = CalibrationResult(
  674. success=False,
  675. points=valid_points,
  676. error_message="变换参数计算失败"
  677. )
  678. logger.error(f"校准失败: {self.result.error_message}")
  679. if self.on_complete:
  680. self.on_complete(self.result)
  681. return self.result
  682. def _reset_ptz_position(self):
  683. """校准完成后将球机复位到初始位置"""
  684. if self.ptz is None:
  685. return
  686. try:
  687. # 获取默认位置配置,优先从传入的球机配置读取
  688. ptz_config = getattr(self.ptz, 'ptz_config', None)
  689. if ptz_config is None:
  690. from config import PTZ_CONFIG
  691. ptz_config = PTZ_CONFIG
  692. default_pan = ptz_config.get('default_pan', 0)
  693. default_tilt = ptz_config.get('default_tilt', 0)
  694. default_zoom = ptz_config.get('default_zoom', 1)
  695. logger.info(f"球机复位到位置: pan={default_pan}, tilt={default_tilt}, zoom={default_zoom}")
  696. self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
  697. time.sleep(0.5)
  698. except Exception as e:
  699. logger.warning(f"球机复位失败: {e}")
  700. def _verify_calibration(self) -> bool:
  701. """
  702. 校准验证:将球机移到全景画面中心对应的PTZ角度,
  703. 通过特征匹配验证球机是否指向了全景画面中心区域。
  704. 同时验证全景画面中的多个关键位置(左、中、右),
  705. 确保变换在整个视野范围内基本正确。
  706. Returns:
  707. 验证是否通过
  708. """
  709. logger.info("=" * 50)
  710. logger.info("校准验证: 将球机移到全景画面中心位置")
  711. logger.info("=" * 50)
  712. if self.ptz is None or self.get_frame is None:
  713. logger.warning("无法执行校准验证: PTZ或全景帧获取函数不可用")
  714. return False
  715. # 验证位置列表:全景画面中的关键位置
  716. verify_positions = [
  717. ("全景中心", 0.5, 0.5),
  718. ("全景左侧", 0.25, 0.5),
  719. ("全景右侧", 0.75, 0.5),
  720. ]
  721. passed = 0
  722. total = len(verify_positions)
  723. for name, x_ratio, y_ratio in verify_positions:
  724. pan, tilt = self.transform(x_ratio, y_ratio)
  725. logger.info(f"验证 {name} ({x_ratio:.2f}, {y_ratio:.2f}) → "
  726. f"PTZ角度: pan={pan:.1f}°, tilt={tilt:.1f}°")
  727. # 检查角度是否在合理范围
  728. if pan < -10 or pan > 370 or tilt < -95 or tilt > 95:
  729. logger.warning(f" 变换结果异常: pan={pan:.1f}°, tilt={tilt:.1f}° 超出合理范围")
  730. continue
  731. # 移动球机到计算出的位置
  732. if not self.ptz.goto_exact_position(pan, tilt, 1):
  733. logger.warning(f" 移动球机失败")
  734. continue
  735. time.sleep(self.stabilize_time)
  736. # 获取全景帧和球机帧
  737. panorama_frame = self.get_frame()
  738. ptz_frame = self.ptz_capture() if self.ptz_capture else None
  739. if panorama_frame is None or ptz_frame is None:
  740. logger.warning(f" 获取帧失败: 全景={'OK' if panorama_frame is not None else '失败'}, "
  741. f"球机={'OK' if ptz_frame is not None else '失败'}")
  742. continue
  743. # 特征匹配验证
  744. success, match_count, cx, cy = self.overlap_discovery.match_frames(
  745. ptz_frame, panorama_frame
  746. )
  747. h, w = panorama_frame.shape[:2]
  748. match_x_ratio = cx / w
  749. match_y_ratio = cy / h
  750. # 计算期望位置与实际匹配位置的偏差
  751. position_error = math.sqrt(
  752. (match_x_ratio - x_ratio) ** 2 + (match_y_ratio - y_ratio) ** 2
  753. )
  754. if success:
  755. logger.info(f" 匹配成功: {match_count}个特征点, "
  756. f"匹配位置=({match_x_ratio:.3f}, {match_y_ratio:.3f}), "
  757. f"期望位置=({x_ratio:.3f}, {y_ratio:.3f}), "
  758. f"位置偏差={position_error:.3f}")
  759. if position_error < 0.15:
  760. passed += 1
  761. logger.info(f" 验证通过 (偏差 < 15%)")
  762. else:
  763. logger.warning(f" 验证偏差较大 ({position_error:.1%}),校准精度可能不足")
  764. else:
  765. logger.warning(f" 特征匹配不足({match_count}点), "
  766. f"球机可能未指向全景画面中期望的位置")
  767. logger.info(f"校准验证结果: {passed}/{total} 个位置验证通过")
  768. if passed == 0:
  769. logger.error("所有验证位置均未通过,校准结果可能完全错误!请检查:")
  770. logger.error(" 1. 球机安装方向配置是否正确 (mount_type, pan_flip, tilt_flip)")
  771. logger.error(" 2. 两台摄像头的相对位置是否合理")
  772. logger.error(" 3. 球机PTZ角度范围是否配置正确")
  773. return False
  774. if passed < total:
  775. logger.warning(f"部分验证未通过,校准精度可能有限")
  776. return True
  777. return True
  778. def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
  779. """
  780. 在发现的重叠区间内生成校准点
  781. 只在球机和全景有视觉重叠的区域生成点
  782. """
  783. points = []
  784. if quick_mode:
  785. # 快速模式: 每个重叠区间内生成9个点(3x3网格)
  786. for overlap in self.overlap_ranges:
  787. # 在区间中心生成点
  788. pan_center = (overlap.pan_start + overlap.pan_end) / 2
  789. tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
  790. pan_span = overlap.pan_end - overlap.pan_start
  791. tilt_span = overlap.tilt_end - overlap.tilt_start
  792. # 3x3网格分布
  793. pan_positions = [0.25, 0.5, 0.75] if pan_span > 10 else [0.5]
  794. tilt_positions = [0.25, 0.5, 0.75] if tilt_span > 10 else [0.5]
  795. for pf in pan_positions:
  796. for tf in tilt_positions:
  797. points.append(CalibrationPoint(
  798. pan=overlap.pan_start + pan_span * pf,
  799. tilt=overlap.tilt_start + tilt_span * tf,
  800. zoom=1.0))
  801. else:
  802. # 完整模式: 在每个重叠区间内均匀分布
  803. grid_size = 5
  804. for overlap in self.overlap_ranges:
  805. for i in range(grid_size):
  806. for j in range(grid_size):
  807. pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
  808. tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
  809. points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
  810. return points
  811. def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
  812. """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
  813. try:
  814. if len(points) < 4:
  815. logger.error(f"计算变换参数错误: 有效点不足({len(points)}个)")
  816. return False
  817. pan_values = np.array([p.pan for p in points])
  818. tilt_values = np.array([p.tilt for p in points])
  819. x_ratios = np.array([p.x_ratio for p in points])
  820. y_ratios = np.array([p.y_ratio for p in points])
  821. # 记录原始校准数据便于调试
  822. logger.info("校准点原始数据:")
  823. for i, p in enumerate(points):
  824. logger.info(f" 点{i+1}: pan={p.pan:.1f}°, tilt={p.tilt:.1f}° → "
  825. f"全景位置=({p.x_ratio:.3f}, {p.y_ratio:.3f})")
  826. # 展开pan角度避免0°/360°边界不连续性
  827. pan_unwrapped = self._unwrap_pan_angles(pan_values)
  828. if not np.allclose(pan_values, pan_unwrapped, atol=0.1):
  829. logger.info(f"Pan角度展开: 原始={pan_values.tolist()} → 展开后={pan_unwrapped.tolist()}")
  830. else:
  831. logger.info("Pan角度无需展开(无0°/360°边界跨越)")
  832. # RANSAC剔除异常值 (使用展开后的pan)
  833. inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_unwrapped, tilt_values)
  834. inlier_count = np.sum(inlier_mask)
  835. if inlier_count < 4:
  836. logger.warning(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
  837. inlier_mask = np.ones(len(points), dtype=bool)
  838. inlier_count = np.sum(inlier_mask)
  839. else:
  840. logger.info(f"RANSAC: {len(points)}个点中{inlier_count}个内点,"
  841. f"剔除{len(points) - inlier_count}个异常值")
  842. # 记录内点数据
  843. logger.info("RANSAC内点数据:")
  844. for i, p in enumerate(points):
  845. if inlier_mask[i]:
  846. logger.info(f" 点{i+1}: pan={pan_unwrapped[i]:.1f}°(原始={p.pan:.1f}°), "
  847. f"tilt={p.tilt:.1f}° → ({p.x_ratio:.3f}, {p.y_ratio:.3f})")
  848. # 用内点拟合完整模型
  849. A = np.ones((inlier_count, 3))
  850. A[:, 1] = x_ratios[inlier_mask]
  851. A[:, 2] = y_ratios[inlier_mask]
  852. pan_params, _, _, _ = np.linalg.lstsq(A, pan_unwrapped[inlier_mask], rcond=None)
  853. tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
  854. self.pan_offset = pan_params[0]
  855. self.pan_scale_x = pan_params[1]
  856. self.pan_scale_y = pan_params[2]
  857. self.tilt_offset = tilt_params[0]
  858. self.tilt_scale_x = tilt_params[1]
  859. self.tilt_scale_y = tilt_params[2]
  860. # 系数合理性检查
  861. pan_coeffs_ok = (abs(self.pan_scale_x) < 500 and abs(self.pan_scale_y) < 500)
  862. tilt_coeffs_ok = (abs(self.tilt_scale_x) < 300 and abs(self.tilt_scale_y) < 300)
  863. if not (pan_coeffs_ok and tilt_coeffs_ok):
  864. logger.warning(f"完整模型系数异常: pan_scale_x={self.pan_scale_x:.1f}, "
  865. f"pan_scale_y={self.pan_scale_y:.1f}, "
  866. f"tilt_scale_x={self.tilt_scale_x:.1f}, "
  867. f"tilt_scale_y={self.tilt_scale_y:.1f}")
  868. logger.info("尝试简化模型: pan仅依赖x, tilt仅依赖y")
  869. # 简化模型: pan = offset + scale_x * x
  870. # tilt = offset + scale_y * y
  871. A_pan = np.ones((inlier_count, 2))
  872. A_pan[:, 1] = x_ratios[inlier_mask]
  873. pan_params_s, _, _, _ = np.linalg.lstsq(A_pan, pan_unwrapped[inlier_mask], rcond=None)
  874. A_tilt = np.ones((inlier_count, 2))
  875. A_tilt[:, 1] = y_ratios[inlier_mask]
  876. tilt_params_s, _, _, _ = np.linalg.lstsq(A_tilt, tilt_values[inlier_mask], rcond=None)
  877. self.pan_offset = pan_params_s[0]
  878. self.pan_scale_x = pan_params_s[1]
  879. self.pan_scale_y = 0.0
  880. self.tilt_offset = tilt_params_s[0]
  881. self.tilt_scale_x = 0.0
  882. self.tilt_scale_y = tilt_params_s[1]
  883. logger.info(f"简化模型: pan={self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x, "
  884. f"tilt={self.tilt_offset:.2f} + {self.tilt_scale_y:.2f}*y")
  885. # 验证变换对全景中心的预测是否合理
  886. center_pan, center_tilt = self.transform(0.5, 0.5)
  887. logger.info(f"全景中心(0.5,0.5)预测: pan={center_pan:.1f}°, tilt={center_tilt:.1f}°")
  888. logger.info(f"最终变换参数: pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y, "
  889. f"tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
  890. return True
  891. except Exception as e:
  892. logger.error(f"计算变换参数错误: {e}")
  893. import traceback
  894. logger.error(traceback.format_exc())
  895. return False
  896. def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
  897. pan: np.ndarray, tilt: np.ndarray,
  898. max_iterations: int = 200, threshold: float = 15.0,
  899. min_samples: int = 4) -> np.ndarray:
  900. """RANSAC剔除变换拟合中的异常值(pan应已展开为连续值)"""
  901. n = len(x)
  902. best_inliers = np.zeros(n, dtype=bool)
  903. best_inlier_count = 0
  904. rng = np.random.RandomState(42)
  905. for _ in range(max_iterations):
  906. # 随机选min_samples个点
  907. indices = rng.choice(n, min_samples, replace=False)
  908. # 用这些点拟合
  909. A = np.ones((min_samples, 3))
  910. A[:, 1] = x[indices]
  911. A[:, 2] = y[indices]
  912. try:
  913. pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
  914. tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
  915. except np.linalg.LinAlgError:
  916. continue
  917. # 计算所有点的误差
  918. pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
  919. pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
  920. # 使用角度差计算pan误差(即使已展开,仍用角度差以防边界情况)
  921. pan_errors = np.array([self._angular_diff(float(pred_pan[i]), float(pan[i]))
  922. for i in range(n)])
  923. tilt_errors = pred_tilt - tilt
  924. errors = np.sqrt(pan_errors ** 2 + tilt_errors ** 2)
  925. inliers = errors < threshold
  926. inlier_count = np.sum(inliers)
  927. if inlier_count > best_inlier_count:
  928. best_inlier_count = inlier_count
  929. best_inliers = inliers
  930. if best_inlier_count == 0:
  931. return np.ones(n, dtype=bool)
  932. return best_inliers
  933. def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
  934. """计算均方根误差(使用角度差处理pan环绕)"""
  935. total_error = 0.0
  936. for p in points:
  937. pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
  938. # 使用角度差计算pan误差,处理0°/360°环绕
  939. pan_error = self._angular_diff(pred_pan, p.pan)
  940. tilt_error = pred_tilt - p.tilt
  941. error = math.sqrt(pan_error ** 2 + tilt_error ** 2)
  942. total_error += error ** 2
  943. return math.sqrt(total_error / len(points))
  944. def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
  945. """将全景坐标转换为PTZ角度 - 梯形透视补偿"""
  946. # 优先使用分段线性查找表(pan)
  947. if self.pan_lookup:
  948. pan = self._interp_lookup(self.pan_lookup, x_ratio)
  949. else:
  950. pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
  951. # pan边缘曲线补偿:越靠近边缘补偿越大,中心不补偿
  952. # 梯形透视:底部(y大)更宽,边缘补偿更大;顶部(y小)更窄,补偿更小
  953. if self.pan_edge_offset != 0:
  954. dx = 2 * x_ratio - 1 # -1(左) ~ 0(中) ~ +1(右)
  955. y_scale = 0.3 + 0.7 * y_ratio # 顶部0.3倍,底部1.0倍
  956. pan_correction = self.pan_edge_offset * y_scale * math.copysign(abs(dx) ** self.pan_curve_power, dx)
  957. pan += pan_correction
  958. # pan 方向翻转(与 ptz_camera.calculate_ptz_position 保持一致)
  959. if self.pan_flip:
  960. pan = -pan
  961. # 将pan归一化到[0, 360),便于发送给球机
  962. pan = pan % 360
  963. # tilt:优先使用曲线映射(查找表tilt数据不稳定),后备查找表
  964. if self.tilt_linear_enabled:
  965. tilt = self.tilt_y0 + (self.tilt_y1 - self.tilt_y0) * (y_ratio ** self.tilt_curve_power)
  966. elif self.tilt_lookup:
  967. tilt = self._interp_lookup(self.tilt_lookup, y_ratio)
  968. else:
  969. tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
  970. # tilt 方向翻转(与 ptz_camera.calculate_ptz_position 保持一致)
  971. if self.tilt_flip:
  972. tilt = -tilt
  973. # 应用全局 tilt 偏移补偿(配置中正值=向下补偿)
  974. tilt += self.tilt_offset_deg
  975. return (pan, tilt)
  976. def _interp_lookup(self, lookup: List[Tuple[float, float]], ratio: float) -> float:
  977. """分段线性插值"""
  978. if not lookup:
  979. return 0.0
  980. if len(lookup) == 1:
  981. return lookup[0][1]
  982. if ratio <= lookup[0][0]:
  983. return lookup[0][1]
  984. if ratio >= lookup[-1][0]:
  985. return lookup[-1][1]
  986. # 二分查找插入位置
  987. lo, hi = 0, len(lookup) - 1
  988. while lo < hi - 1:
  989. mid = (lo + hi) // 2
  990. if lookup[mid][0] <= ratio:
  991. lo = mid
  992. else:
  993. hi = mid
  994. # 线性插值
  995. x0, v0 = lookup[lo]
  996. x1, v1 = lookup[hi]
  997. if abs(x1 - x0) < 1e-10:
  998. return v0
  999. t = (ratio - x0) / (x1 - x0)
  1000. return v0 + t * (v1 - v0)
  1001. def _build_lookup_tables(self, points: List[CalibrationPoint]) -> bool:
  1002. """
  1003. 从校准点构建分段线性查找表
  1004. 核心策略:
  1005. 1. 将所有校准点按x_ratio分桶,取匹配点数加权的pan值
  1006. 2. 用最长连续单调子序列(LCMA)过滤假阳性:x_ratio→pan应近似单调
  1007. 3. 处理pan角度环绕
  1008. """
  1009. if len(points) < 3:
  1010. return False
  1011. sorted_by_x = sorted(points, key=lambda p: p.x_ratio)
  1012. # ===== 构建 x_ratio → pan 映射 =====
  1013. grid_size = 0.05
  1014. x_buckets: Dict[float, List[Tuple[float, int]]] = {}
  1015. for p in sorted_by_x:
  1016. x_key = round(p.x_ratio / grid_size) * grid_size
  1017. if x_key not in x_buckets:
  1018. x_buckets[x_key] = []
  1019. match_count = getattr(p, 'match_count', 10)
  1020. x_buckets[x_key].append((p.pan, match_count))
  1021. # 加权中位数
  1022. raw_entries = []
  1023. for x_key in sorted(x_buckets.keys()):
  1024. entries = x_buckets[x_key]
  1025. total_weight = sum(mc for _, mc in entries)
  1026. weighted_pans = []
  1027. for pan, mc in entries:
  1028. weighted_pans.extend([pan] * max(1, mc // 5))
  1029. weighted_pan = float(np.median(weighted_pans))
  1030. raw_entries.append((x_key, weighted_pan, total_weight))
  1031. logger.info(f"Pan原始映射 ({len(raw_entries)} 个x_key):")
  1032. for x, pan, w in raw_entries:
  1033. logger.info(f" x={x:.3f} → pan={pan:.1f}° (weight={w})")
  1034. # 用LCMA过滤:找到最长的近似连续单调子序列
  1035. # pan随x_ratio应该是近似单调递减或递增的
  1036. if len(raw_entries) >= 3:
  1037. filtered = self._filter_continuous_monotonic(raw_entries)
  1038. self.pan_lookup = [(x, pan) for x, pan, w in filtered]
  1039. else:
  1040. self.pan_lookup = [(x, pan % 360) for x, pan, w in raw_entries]
  1041. # ===== 构建 y_ratio → tilt 映射 =====
  1042. # 只使用通过pan过滤的点(x_ratio对应的pan与查找表一致)
  1043. pan_valid_x = set(x for x, _ in self.pan_lookup)
  1044. pan_tolerance = grid_size * 1.5 # 允许在pan有效区域附近的点
  1045. valid_points_for_tilt = []
  1046. for p in sorted_by_x:
  1047. for vx in pan_valid_x:
  1048. if abs(p.x_ratio - vx) <= pan_tolerance:
  1049. valid_points_for_tilt.append(p)
  1050. break
  1051. logger.info(f"Tilt映射使用 {len(valid_points_for_tilt)}/{len(sorted_by_x)} 个经过pan验证的点")
  1052. y_buckets: Dict[float, List[Tuple[float, int]]] = {}
  1053. for p in valid_points_for_tilt:
  1054. y_key = round(p.y_ratio / grid_size) * grid_size
  1055. if y_key not in y_buckets:
  1056. y_buckets[y_key] = []
  1057. match_count = getattr(p, 'match_count', 10)
  1058. y_buckets[y_key].append((p.tilt, match_count))
  1059. tilt_entries = []
  1060. for y_key in sorted(y_buckets.keys()):
  1061. entries = y_buckets[y_key]
  1062. weighted_tilts = []
  1063. for tilt, mc in entries:
  1064. weighted_tilts.extend([tilt] * max(1, mc // 5))
  1065. tilt_median = float(np.median(weighted_tilts))
  1066. tilt_entries.append((y_key, tilt_median))
  1067. self.tilt_lookup = tilt_entries
  1068. # 记录查找表内容
  1069. logger.info(f"Pan查找表 ({len(self.pan_lookup)} 项):")
  1070. for x, pan in self.pan_lookup:
  1071. logger.info(f" x={x:.3f} → pan={pan:.1f}°")
  1072. logger.info(f"Tilt查找表 ({len(self.tilt_lookup)} 项):")
  1073. for y, tilt in self.tilt_lookup:
  1074. logger.info(f" y={y:.3f} → tilt={tilt:.1f}°")
  1075. return True
  1076. def _filter_continuous_monotonic(
  1077. self, entries: List[Tuple[float, float, int]],
  1078. max_step: float = 60.0
  1079. ) -> List[Tuple[float, float, int]]:
  1080. """
  1081. 过滤出最长的连续单调子序列
  1082. x_ratio→pan应该是近似单调的(递增或递减,可能环绕一次)。
  1083. 假阳性匹配会导致pan突然跳变到完全不相关的角度,
  1084. 这个方法通过寻找最长的"步长<max_step"的子序列来过滤。
  1085. Args:
  1086. entries: [(x_key, pan, weight), ...] 已按x_key排序
  1087. max_step: 相邻两个点允许的最大pan差(度)
  1088. Returns:
  1089. 过滤后的entries子集
  1090. """
  1091. n = len(entries)
  1092. if n <= 2:
  1093. return [(x, pan % 360, w) for x, pan, w in entries]
  1094. # 尝试两种方向:pan递减和pan递增
  1095. # 对于递减方向:每个点的pan应比前一个小(允许环绕)
  1096. best_result = []
  1097. for direction in ['decreasing', 'increasing']:
  1098. # 动态规划找最长连续子序列
  1099. # dp[i] = 以entries[i]结尾的最长连续子序列长度
  1100. # parent[i] = 前驱索引
  1101. dp = [1] * n
  1102. parent = [-1] * n
  1103. for i in range(1, n):
  1104. for j in range(i):
  1105. # 检查j→i的pan变化是否合理
  1106. pan_j = entries[j][1]
  1107. pan_i = entries[i][1]
  1108. # 计算角度差(考虑环绕)
  1109. diff = pan_i - pan_j
  1110. while diff > 180:
  1111. diff -= 360
  1112. while diff < -180:
  1113. diff += 360
  1114. # 检查方向
  1115. if direction == 'decreasing':
  1116. ok = diff <= 0 and abs(diff) <= max_step
  1117. else:
  1118. ok = diff >= 0 and abs(diff) <= max_step
  1119. if ok and dp[j] + 1 > dp[i]:
  1120. dp[i] = dp[j] + 1
  1121. parent[i] = j
  1122. # 找最长子序列的终点
  1123. end = max(range(n), key=lambda i: dp[i])
  1124. # 回溯构建子序列
  1125. seq = []
  1126. idx = end
  1127. while idx >= 0:
  1128. seq.append(idx)
  1129. idx = parent[idx]
  1130. seq.reverse()
  1131. # 展开pan角度
  1132. result = self._unwrap_sequence(entries, seq)
  1133. if len(result) > len(best_result):
  1134. best_result = result
  1135. logger.info(f"LCMA过滤: {n}个点 → {len(best_result)}个点 (方向: "
  1136. f"{'递减' if len(best_result) > 0 else '无'})")
  1137. # 如果过滤后太少,放宽条件重试
  1138. if len(best_result) < 3 and n >= 3:
  1139. logger.info("LCMA结果太少,放宽步长限制重试")
  1140. for wider_step in [90, 120, 180]:
  1141. for direction in ['decreasing', 'increasing']:
  1142. dp = [1] * n
  1143. parent = [-1] * n
  1144. for i in range(1, n):
  1145. for j in range(i):
  1146. diff = entries[i][1] - entries[j][1]
  1147. while diff > 180:
  1148. diff -= 360
  1149. while diff < -180:
  1150. diff += 360
  1151. if direction == 'decreasing':
  1152. ok = diff <= 0 and abs(diff) <= wider_step
  1153. else:
  1154. ok = diff >= 0 and abs(diff) <= wider_step
  1155. if ok and dp[j] + 1 > dp[i]:
  1156. dp[i] = dp[j] + 1
  1157. parent[i] = j
  1158. end = max(range(n), key=lambda i: dp[i])
  1159. seq = []
  1160. idx = end
  1161. while idx >= 0:
  1162. seq.append(idx)
  1163. idx = parent[idx]
  1164. seq.reverse()
  1165. result = self._unwrap_sequence(entries, seq)
  1166. if len(result) > len(best_result):
  1167. best_result = result
  1168. if len(best_result) >= 3:
  1169. break
  1170. if not best_result:
  1171. # 全部过滤后为空,使用原始数据
  1172. logger.warning("LCMA过滤后为空,使用原始数据")
  1173. return [(x, pan % 360, w) for x, pan, w in entries]
  1174. return best_result
  1175. def _unwrap_sequence(
  1176. self, entries: List[Tuple[float, float, int]],
  1177. indices: List[int]
  1178. ) -> List[Tuple[float, float, int]]:
  1179. """将子序列的pan角度展开为连续值(不强制归一化到[0,360),便于插值)"""
  1180. result = []
  1181. prev_unwrapped = None
  1182. for idx in indices:
  1183. x, pan, w = entries[idx]
  1184. if prev_unwrapped is None:
  1185. unwrapped = pan
  1186. else:
  1187. diff = pan - prev_unwrapped
  1188. while diff > 180:
  1189. pan -= 360
  1190. diff = pan - prev_unwrapped
  1191. while diff < -180:
  1192. pan += 360
  1193. diff = pan - prev_unwrapped
  1194. unwrapped = pan
  1195. prev_unwrapped = unwrapped
  1196. # 保持连续值,transform 返回前再归一化到 [0,360)
  1197. result.append((x, unwrapped, w))
  1198. return result
  1199. def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
  1200. """将PTZ角度转换为全景坐标"""
  1201. # 优先使用查找表的反向查找
  1202. if self.pan_lookup and self.tilt_lookup:
  1203. # 反向查找: pan → x_ratio
  1204. x_ratio = self._reverse_lookup(self.pan_lookup, pan % 360)
  1205. y_ratio = self._reverse_lookup(self.tilt_lookup, tilt)
  1206. return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
  1207. # 后备:线性模型逆变换
  1208. M = np.array([
  1209. [self.pan_scale_x, self.pan_scale_y],
  1210. [self.tilt_scale_x, self.tilt_scale_y]
  1211. ])
  1212. det = np.linalg.det(M)
  1213. if abs(det) < 1e-10:
  1214. x_ratio = (pan - self.pan_offset) / self.pan_scale_x if abs(self.pan_scale_x) > 1e-10 else 0.5
  1215. y_ratio = (tilt - self.tilt_offset) / self.tilt_scale_y if abs(self.tilt_scale_y) > 1e-10 else 0.5
  1216. else:
  1217. M_inv = np.linalg.inv(M)
  1218. offset = np.array([pan - self.pan_offset, tilt - self.tilt_offset])
  1219. result = M_inv @ offset
  1220. x_ratio, y_ratio = result[0], result[1]
  1221. return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
  1222. def _reverse_lookup(self, lookup: List[Tuple[float, float]], value: float) -> float:
  1223. """查找表反向查找:从value找ratio"""
  1224. if not lookup:
  1225. return 0.5
  1226. # 处理pan环绕:找到最接近的段
  1227. best_idx = 0
  1228. best_diff = float('inf')
  1229. for i, (ratio, v) in enumerate(lookup):
  1230. diff = self._angular_diff(value, v)
  1231. if abs(diff) < abs(best_diff):
  1232. best_diff = diff
  1233. best_idx = i
  1234. # 精确定位到最近的两个点之间
  1235. if best_idx == 0:
  1236. return lookup[0][0]
  1237. if best_idx == len(lookup) - 1:
  1238. return lookup[-1][0]
  1239. # 检查前一个和后一个点,选择更近的段
  1240. prev_v = lookup[best_idx - 1][1]
  1241. curr_v = lookup[best_idx][1]
  1242. next_v = lookup[best_idx + 1][1] if best_idx + 1 < len(lookup) else curr_v
  1243. # 在 (best_idx-1, best_idx) 和 (best_idx, best_idx+1) 之间选择
  1244. if abs(self._angular_diff(value, prev_v)) < abs(self._angular_diff(value, next_v)):
  1245. lo, hi = best_idx - 1, best_idx
  1246. else:
  1247. lo, hi = best_idx, best_idx + 1
  1248. x0, v0 = lookup[lo]
  1249. x1, v1 = lookup[hi]
  1250. # 考虑角度环绕
  1251. diff_v = self._angular_diff(v1, v0)
  1252. if abs(diff_v) < 1e-10:
  1253. return (x0 + x1) / 2
  1254. t = self._angular_diff(value, v0) / diff_v
  1255. t = max(0, min(1, t))
  1256. return x0 + t * (x1 - x0)
  1257. def is_calibrated(self) -> bool:
  1258. return self.state == CalibrationState.SUCCESS
  1259. def get_state(self) -> CalibrationState:
  1260. return self.state
  1261. def get_result(self) -> Optional[CalibrationResult]:
  1262. return self.result
  1263. def get_overlap_ranges(self) -> List[OverlapRange]:
  1264. """返回发现的重叠区间"""
  1265. return self.overlap_ranges
  1266. def save_calibration(self, filepath: str) -> bool:
  1267. """保存校准结果"""
  1268. if not self.is_calibrated():
  1269. return False
  1270. try:
  1271. import json
  1272. ptz_config = getattr(self.ptz, 'ptz_config', None) or _get_ptz_config()
  1273. data = {
  1274. 'pan_offset': self.pan_offset,
  1275. 'pan_scale_x': self.pan_scale_x,
  1276. 'pan_scale_y': self.pan_scale_y,
  1277. 'tilt_offset': self.tilt_offset,
  1278. 'tilt_scale_x': self.tilt_scale_x,
  1279. 'tilt_scale_y': self.tilt_scale_y,
  1280. 'rms_error': self.result.rms_error if self.result else 0,
  1281. 'overlap_ranges': [
  1282. {
  1283. 'pan_start': r.pan_start,
  1284. 'pan_end': r.pan_end,
  1285. 'tilt_start': r.tilt_start,
  1286. 'tilt_end': r.tilt_end,
  1287. 'match_count': r.match_count
  1288. }
  1289. for r in self.overlap_ranges
  1290. ],
  1291. # 分段线性查找表
  1292. 'pan_lookup': self.pan_lookup,
  1293. 'tilt_lookup': self.tilt_lookup,
  1294. # 保存安装方向配置
  1295. 'mount_type': ptz_config.get('mount_type', 'wall'),
  1296. 'tilt_flip': ptz_config.get('tilt_flip', False),
  1297. 'pan_flip': ptz_config.get('pan_flip', False),
  1298. }
  1299. with open(filepath, 'w') as f:
  1300. json.dump(data, f, indent=2)
  1301. logger.info(f"校准结果已保存: {filepath}")
  1302. return True
  1303. except Exception as e:
  1304. logger.error(f"保存校准结果失败: {e}")
  1305. return False
  1306. def load_calibration(self, filepath: str) -> bool:
  1307. """加载校准结果"""
  1308. try:
  1309. import json
  1310. with open(filepath, 'r') as f:
  1311. data = json.load(f)
  1312. self.pan_offset = data['pan_offset']
  1313. self.pan_scale_x = data['pan_scale_x']
  1314. self.pan_scale_y = data['pan_scale_y']
  1315. self.tilt_offset = data['tilt_offset']
  1316. self.tilt_scale_x = data['tilt_scale_x']
  1317. self.tilt_scale_y = data['tilt_scale_y']
  1318. # 加载分段线性查找表
  1319. self.pan_lookup = [tuple(p) for p in data.get('pan_lookup', [])]
  1320. self.tilt_lookup = [tuple(t) for t in data.get('tilt_lookup', [])]
  1321. # 加载重叠区间(如果有)
  1322. if 'overlap_ranges' in data:
  1323. self.overlap_ranges = [
  1324. OverlapRange(
  1325. pan_start=r['pan_start'],
  1326. pan_end=r['pan_end'],
  1327. tilt_start=r['tilt_start'],
  1328. tilt_end=r['tilt_end'],
  1329. match_count=r.get('match_count', 0),
  1330. panorama_center_x=0,
  1331. panorama_center_y=0
  1332. )
  1333. for r in data['overlap_ranges']
  1334. ]
  1335. self.state = CalibrationState.SUCCESS
  1336. self.result = CalibrationResult(
  1337. success=True,
  1338. points=[],
  1339. rms_error=data.get('rms_error', 0)
  1340. )
  1341. # 检查安装方向配置是否匹配
  1342. ptz_config = getattr(self.ptz, 'ptz_config', None) or _get_ptz_config()
  1343. current_mount = ptz_config.get('mount_type', 'wall')
  1344. saved_mount = data.get('mount_type', 'wall')
  1345. if current_mount != saved_mount:
  1346. logger.warning(f"当前安装类型({current_mount})与校准时的({saved_mount})不同,建议重新校准!")
  1347. logger.info(f"校准结果已加载: {filepath}")
  1348. return True
  1349. except FileNotFoundError:
  1350. logger.warning(f"校准文件不存在: {filepath}")
  1351. return False
  1352. except Exception as e:
  1353. logger.error(f"加载校准结果失败: {e}")
  1354. return False
  1355. class CalibrationManager:
  1356. """校准管理器"""
  1357. def __init__(self, calibrator: CameraCalibrator, calibration_file: str = None):
  1358. self.calibrator = calibrator
  1359. # 优先使用传入的路径,否则从配置读取,最后使用默认值
  1360. if calibration_file:
  1361. self.calibration_file = calibration_file
  1362. else:
  1363. try:
  1364. from config import CALIBRATION_CONFIG
  1365. self.calibration_file = CALIBRATION_CONFIG.get(
  1366. 'calibration_file', 'calibration.json'
  1367. )
  1368. except ImportError:
  1369. self.calibration_file = 'calibration.json'
  1370. def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
  1371. """
  1372. 自动校准
  1373. Args:
  1374. force: 是否强制重新校准(不加载已有数据)
  1375. fallback_on_failure: 校准失败时是否回退使用已有数据
  1376. Returns:
  1377. 校准结果
  1378. """
  1379. # 检查是否启用加载上次校准数据
  1380. load_on_startup = True # 默认启用
  1381. try:
  1382. from config import CALIBRATION_CONFIG
  1383. load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
  1384. except:
  1385. pass
  1386. # 如果不是强制校准,尝试加载已有数据
  1387. if not force and load_on_startup:
  1388. if self.calibrator.load_calibration(self.calibration_file):
  1389. logger.info("使用已有校准结果")
  1390. return self.calibrator.get_result()
  1391. # 执行新校准
  1392. if force:
  1393. logger.info("强制重新校准(不使用已有数据)...")
  1394. elif not load_on_startup:
  1395. logger.info("已禁用加载校准数据,开始新校准...")
  1396. else:
  1397. logger.info("开始自动校准...")
  1398. result = self.calibrator.calibrate(quick_mode=True)
  1399. if result.success:
  1400. self.calibrator.save_calibration(self.calibration_file)
  1401. elif fallback_on_failure:
  1402. # 校准失败,尝试回退使用已有数据
  1403. logger.warning("校准失败,尝试回退使用已有校准数据...")
  1404. if self.calibrator.load_calibration(self.calibration_file):
  1405. logger.info("已回退到已有校准数据")
  1406. result = self.calibrator.get_result()
  1407. return result
  1408. def check_calibration(self) -> Tuple[bool, str]:
  1409. """检查校准状态"""
  1410. state = self.calibrator.get_state()
  1411. if state == CalibrationState.SUCCESS:
  1412. result = self.calibrator.get_result()
  1413. overlaps = self.calibrator.get_overlap_ranges()
  1414. overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
  1415. return (True, f"校准有效, RMS误差: {result.rms_error:.4f}°{overlap_info}")
  1416. elif state == CalibrationState.FAILED:
  1417. return (False, "校准失败")
  1418. elif state == CalibrationState.RUNNING:
  1419. return (False, "校准进行中")
  1420. else:
  1421. return (False, "未校准")