calibration.py 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118
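"""
Camera calibration module.

Implements automatic calibration between a panoramic camera and a PTZ dome
camera, building the mapping from panorama image coordinates to PTZ angles.

Key improvement: first discover where the two fields of view overlap, then
calibrate only inside that overlap, so the PTZ camera is never pointed in a
direction that shares nothing with the panorama (which would make the
calibration fail).
"""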
  8. import time
  9. import math
  10. import threading
  11. import numpy as np
  12. import cv2
  13. from typing import List, Tuple, Dict, Optional, Callable
  14. from dataclasses import dataclass, field
  15. from enum import Enum
  16. from ptz_camera import PTZCamera
  17. # 加载PTZ配置
  18. def _get_ptz_config():
  19. try:
  20. from config import PTZ_CONFIG
  21. return PTZ_CONFIG
  22. except ImportError:
  23. return {
  24. 'mount_type': 'wall',
  25. 'tilt_flip': False,
  26. 'pan_flip': False
  27. }
  28. class CalibrationState(Enum):
  29. IDLE = 0
  30. RUNNING = 1
  31. SUCCESS = 2
  32. FAILED = 3
  33. @dataclass
  34. class CalibrationPoint:
  35. pan: float
  36. tilt: float
  37. zoom: float = 1.0
  38. x_ratio: float = 0.0
  39. y_ratio: float = 0.0
  40. detected: bool = False
  41. @dataclass
  42. class CalibrationResult:
  43. success: bool
  44. points: List[CalibrationPoint]
  45. transform_matrix: Optional[np.ndarray] = None
  46. error_message: str = ""
  47. rms_error: float = 0.0
  48. @dataclass
  49. class OverlapRange:
  50. pan_start: float
  51. pan_end: float
  52. tilt_start: float
  53. tilt_end: float
  54. match_count: int
  55. panorama_center_x: float
  56. panorama_center_y: float
  57. MIN_MATCH_THRESHOLD = 8
  58. class OverlapDiscovery:
  59. """
  60. 视野重叠发现器
  61. 扫描球机视野范围,找出与全景画面有视觉重叠的角度区间
  62. """
  63. def __init__(self, feature_type: str = 'SIFT'):
  64. try:
  65. self.feature_detector = cv2.SIFT_create()
  66. self.feature_type = 'SIFT'
  67. except AttributeError:
  68. self.feature_detector = cv2.ORB_create(nfeatures=500)
  69. self.feature_type = 'ORB'
  70. norm_type = cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
  71. self.matcher = cv2.BFMatcher(norm_type)
  72. def match_frames(self, ptz_frame: np.ndarray, panorama_frame: np.ndarray
  73. ) -> Tuple[bool, int, float, float]:
  74. """
  75. 特征匹配球机画面与全景画面
  76. Returns: (是否匹配成功, 匹配点数, 全景画面中心x, 全景画面中心y)
  77. """
  78. if ptz_frame is None or panorama_frame is None:
  79. return (False, 0, 0.0, 0.0)
  80. try:
  81. ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) if len(ptz_frame.shape) == 3 else ptz_frame
  82. pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) if len(panorama_frame.shape) == 3 else panorama_frame
  83. kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
  84. kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
  85. if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
  86. return (False, 0, 0.0, 0.0)
  87. matches = self.matcher.knnMatch(des1, des2, k=2)
  88. good_matches = []
  89. for match_pair in matches:
  90. if len(match_pair) == 2:
  91. m, n = match_pair
  92. if m.distance < 0.75 * n.distance:
  93. good_matches.append(m)
  94. if len(good_matches) < MIN_MATCH_THRESHOLD:
  95. return (False, len(good_matches), 0.0, 0.0)
  96. pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
  97. center_x = np.mean(pan_pts[:, 0])
  98. center_y = np.mean(pan_pts[:, 1])
  99. return (True, len(good_matches), center_x, center_y)
  100. except Exception as e:
  101. print(f" 特征匹配异常: {e}")
  102. return (False, 0, 0.0, 0.0)
  103. def discover_overlap_ranges(
  104. self,
  105. ptz: PTZCamera,
  106. get_panorama_frame: Callable[[], np.ndarray],
  107. ptz_capture: Callable[[], Optional[np.ndarray]],
  108. pan_range: Tuple[float, float] = (0, 360),
  109. tilt_range: Tuple[float, float] = (-30, 30),
  110. pan_step: float = 20,
  111. tilt_step: float = 15,
  112. stabilize_time: float = 2.0,
  113. on_progress: Callable[[int, int, str], None] = None,
  114. max_ranges: int = 3,
  115. min_positions_per_range: int = 3
  116. ) -> List[OverlapRange]:
  117. """
  118. 扫描球机视野范围,发现与全景画面有重叠的角度区间
  119. 1. 先拍一张全景参考帧
  120. 2. 逐步移动球机到各个角度
  121. 3. 在每个位置抓拍球机画面,与全景做特征匹配
  122. 4. 记录有足够匹配点的角度
  123. 5. 合并相邻的有重叠的角度形成区间
  124. """
  125. print(f"\n{'='*50}")
  126. print(f"阶段1: 视野重叠发现")
  127. print(f"扫描范围: pan={pan_range}, tilt={tilt_range}")
  128. print(f"步进: pan={pan_step}°, tilt={tilt_step}°")
  129. print(f"{'='*50}")
  130. # 1. 拍全景参考帧
  131. print(" 获取全景参考帧...")
  132. ref_frames = []
  133. for _ in range(3):
  134. frame = get_panorama_frame()
  135. if frame is not None:
  136. ref_frames.append(frame)
  137. time.sleep(0.1)
  138. if not ref_frames:
  139. print(" 错误: 无法获取全景参考帧!")
  140. return []
  141. panorama_ref = ref_frames[0]
  142. print(f" 全景参考帧: {panorama_ref.shape}")
  143. # 2. 扫描各个角度
  144. scan_results: List[Tuple[float, float, int, float, float]] = []
  145. pan_values = np.arange(pan_range[0], pan_range[1] + pan_step, pan_step)
  146. tilt_values = np.arange(tilt_range[0], tilt_range[1] + tilt_step, tilt_step)
  147. total_positions = len(pan_values) * len(tilt_values)
  148. current_idx = 0
  149. for pan in pan_values:
  150. for tilt in tilt_values:
  151. current_idx += 1
  152. pos_desc = f"pan={pan:.0f}°, tilt={tilt:.0f}°"
  153. if on_progress:
  154. on_progress(current_idx, total_positions, f"扫描 {pos_desc}")
  155. print(f" [{current_idx}/{total_positions}] {pos_desc}")
  156. # 移动球机
  157. if not ptz.goto_exact_position(float(pan), float(tilt), 1):
  158. print(f" 移动球机失败, 跳过")
  159. continue
  160. time.sleep(stabilize_time)
  161. # 抓拍球机画面
  162. ptz_frame = ptz_capture() if ptz_capture else None
  163. if ptz_frame is None:
  164. print(f" 球机抓拍失败, 跳过")
  165. continue
  166. # 获取当前全景帧并匹配
  167. cur_panorama = get_panorama_frame()
  168. if cur_panorama is None:
  169. continue
  170. success, match_count, cx, cy = self.match_frames(ptz_frame, cur_panorama)
  171. if success:
  172. h, w = cur_panorama.shape[:2]
  173. x_ratio = cx / w
  174. y_ratio = cy / h
  175. print(f" ✓ 匹配成功: {match_count}个特征点, 全景位置=({x_ratio:.3f}, {y_ratio:.3f})")
  176. scan_results.append((float(pan), float(tilt), match_count, x_ratio, y_ratio))
  177. else:
  178. print(f" ✗ 匹配不足: {match_count}个特征点")
  179. if not scan_results:
  180. print("\n 未发现任何视野重叠位置!")
  181. return []
  182. print(f"\n 发现 {len(scan_results)} 个有重叠的扫描位置")
  183. # 3. 合并相邻位置为重叠区间
  184. overlap_ranges = self._merge_scan_results(
  185. scan_results,
  186. max_ranges=max_ranges,
  187. min_positions=min_positions_per_range
  188. )
  189. for i, r in enumerate(overlap_ranges):
  190. print(f" 重叠区间 {i+1}: pan=[{r.pan_start:.0f}°, {r.pan_end:.0f}°], "
  191. f"tilt=[{r.tilt_start:.0f}°, {r.tilt_end:.0f}°], "
  192. f"匹配点={r.match_count}")
  193. return overlap_ranges
  194. def _merge_scan_results(
  195. self,
  196. results: List[Tuple[float, float, int, float, float]],
  197. pan_tolerance: float = 25,
  198. tilt_tolerance: float = 20,
  199. max_ranges: int = 3,
  200. min_positions: int = 3
  201. ) -> List[OverlapRange]:
  202. """
  203. 使用union-find连通分量聚类合并相邻扫描结果
  204. 只保留最大的 max_ranges 个区间
  205. """
  206. if not results:
  207. return []
  208. n = len(results)
  209. # union-find
  210. parent = list(range(n))
  211. def find(x):
  212. while parent[x] != x:
  213. parent[x] = parent[parent[x]]
  214. x = parent[x]
  215. return x
  216. def union(a, b):
  217. ra, rb = find(a), find(b)
  218. if ra != rb:
  219. parent[ra] = rb
  220. # 判断两点是否相邻
  221. for i in range(n):
  222. for j in range(i + 1, n):
  223. pi, ti = results[i][0], results[i][1]
  224. pj, tj = results[j][0], results[j][1]
  225. if abs(pi - pj) <= pan_tolerance and abs(ti - tj) <= tilt_tolerance:
  226. union(i, j)
  227. # 按连通分量分组
  228. groups: Dict[int, List[int]] = {}
  229. for i in range(n):
  230. root = find(i)
  231. if root not in groups:
  232. groups[root] = []
  233. groups[root].append(i)
  234. # 转换为OverlapRange,过滤太小的组
  235. ranges = []
  236. for indices in groups.values():
  237. if len(indices) < min_positions:
  238. continue
  239. group_data = [results[i] for i in indices]
  240. ranges.append(self._group_to_range(group_data))
  241. # 按match_count降序排序,只保留最大的 max_ranges 个
  242. ranges.sort(key=lambda r: r.match_count, reverse=True)
  243. ranges = ranges[:max_ranges]
  244. # 按pan_start排序输出
  245. ranges.sort(key=lambda r: r.pan_start)
  246. return ranges
  247. def _group_to_range(self, group: List[Tuple[float, float, int, float, float]]) -> OverlapRange:
  248. """将一组扫描结果转换为一个OverlapRange"""
  249. pans = [r[0] for r in group]
  250. tilts = [r[1] for r in group]
  251. match_counts = [r[2] for r in group]
  252. x_ratios = [r[3] for r in group]
  253. y_ratios = [r[4] for r in group]
  254. step = 5 # 在边缘各扩展5度
  255. return OverlapRange(
  256. pan_start=min(pans) - step,
  257. pan_end=max(pans) + step,
  258. tilt_start=min(tilts) - step,
  259. tilt_end=max(tilts) + step,
  260. match_count=max(match_counts),
  261. panorama_center_x=float(np.mean(x_ratios)),
  262. panorama_center_y=float(np.mean(y_ratios))
  263. )
  264. class VisualCalibrationDetector:
  265. """
  266. 视觉校准检测器
  267. 通过运动检测和特征匹配定位球机在全景画面中的位置
  268. """
  269. def __init__(self):
  270. try:
  271. self.feature_detector = cv2.SIFT_create()
  272. self.feature_type = 'SIFT'
  273. except AttributeError:
  274. self.feature_detector = cv2.ORB_create(nfeatures=500)
  275. self.feature_type = 'ORB'
  276. self.matcher = cv2.BFMatcher(
  277. cv2.NORM_L2 if self.feature_type == 'SIFT' else cv2.NORM_HAMMING
  278. )
  279. self.use_motion_detection = True
  280. self.use_feature_matching = True
  281. def detect_by_motion(self, frames_before: np.ndarray,
  282. frames_after: np.ndarray) -> Optional[Tuple[float, float]]:
  283. """通过运动检测定位球机指向位置"""
  284. if frames_before is None or frames_after is None:
  285. return None
  286. before_gray = cv2.cvtColor(frames_before, cv2.COLOR_BGR2GRAY) \
  287. if len(frames_before.shape) == 3 else frames_before
  288. after_gray = cv2.cvtColor(frames_after, cv2.COLOR_BGR2GRAY) \
  289. if len(frames_after.shape) == 3 else frames_after
  290. diff = cv2.absdiff(before_gray, after_gray)
  291. _, thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
  292. kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15, 15))
  293. thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
  294. thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
  295. contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  296. if not contours:
  297. return None
  298. max_contour = max(contours, key=cv2.contourArea)
  299. area = cv2.contourArea(max_contour)
  300. if area < 500:
  301. return None
  302. M = cv2.moments(max_contour)
  303. if M["m00"] == 0:
  304. return None
  305. cx = M["m10"] / M["m00"]
  306. cy = M["m01"] / M["m00"]
  307. h, w = before_gray.shape
  308. print(f" 运动检测: 中心=({cx:.1f}, {cy:.1f}), 面积={area:.0f})")
  309. return (cx / w, cy / h)
  310. def detect_by_feature_match(self, panorama_frame: np.ndarray,
  311. ptz_frame: np.ndarray) -> Optional[Tuple[float, float]]:
  312. """通过特征匹配定位"""
  313. if panorama_frame is None or ptz_frame is None:
  314. return None
  315. try:
  316. pan_gray = cv2.cvtColor(panorama_frame, cv2.COLOR_BGR2GRAY) \
  317. if len(panorama_frame.shape) == 3 else panorama_frame
  318. ptz_gray = cv2.cvtColor(ptz_frame, cv2.COLOR_BGR2GRAY) \
  319. if len(ptz_frame.shape) == 3 else ptz_frame
  320. kp1, des1 = self.feature_detector.detectAndCompute(ptz_gray, None)
  321. kp2, des2 = self.feature_detector.detectAndCompute(pan_gray, None)
  322. if des1 is None or des2 is None or len(kp1) < 4 or len(kp2) < 4:
  323. return None
  324. matches = self.matcher.knnMatch(des1, des2, k=2)
  325. good_matches = []
  326. for match_pair in matches:
  327. if len(match_pair) == 2:
  328. m, n = match_pair
  329. if m.distance < 0.75 * n.distance:
  330. good_matches.append(m)
  331. if len(good_matches) < 4:
  332. return None
  333. pan_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches])
  334. center_x = np.mean(pan_pts[:, 0])
  335. center_y = np.mean(pan_pts[:, 1])
  336. h, w = pan_gray.shape
  337. print(f" 特征匹配: 匹配点={len(good_matches)}, 中心=({center_x:.1f}, {center_y:.1f})")
  338. return (center_x / w, center_y / h)
  339. except Exception as e:
  340. print(f" 特征匹配错误: {e}")
  341. return None
  342. def detect_position(self, panorama_frame: np.ndarray,
  343. frames_before: np.ndarray = None,
  344. frames_after: np.ndarray = None,
  345. ptz_frame: np.ndarray = None) -> Tuple[bool, float, float]:
  346. """综合检测球机在全景画面中的位置"""
  347. results = []
  348. if self.use_motion_detection and frames_before is not None and frames_after is not None:
  349. motion_result = self.detect_by_motion(frames_before, frames_after)
  350. if motion_result:
  351. results.append(('motion', motion_result, 0.4))
  352. if self.use_feature_matching and ptz_frame is not None:
  353. feature_result = self.detect_by_feature_match(panorama_frame, ptz_frame)
  354. if feature_result:
  355. results.append(('feature', feature_result, 0.6))
  356. if not results:
  357. return (False, 0.0, 0.0)
  358. total_weight = sum(r[2] for r in results)
  359. x_ratio = sum(r[1][0] * r[2] for r in results) / total_weight
  360. y_ratio = sum(r[1][1] * r[2] for r in results) / total_weight
  361. print(f" 融合结果: ({x_ratio:.3f}, {y_ratio:.3f})")
  362. return (True, x_ratio, y_ratio)
  363. class CameraCalibrator:
  364. """
  365. 相机校准器
  366. 两阶段校准:先发现视野重叠区域,再在重叠区内校准
  367. """
  368. def __init__(self, ptz_camera: PTZCamera,
  369. get_frame_func: Callable[[], np.ndarray],
  370. detect_marker_func: Callable[[np.ndarray], Optional[Tuple[float, float]]] = None,
  371. ptz_capture_func: Callable[[], Optional[np.ndarray]] = None):
  372. self.ptz = ptz_camera
  373. self.get_frame = get_frame_func
  374. self.detect_marker = detect_marker_func
  375. self.ptz_capture = ptz_capture_func
  376. self.visual_detector = VisualCalibrationDetector()
  377. self.overlap_discovery = OverlapDiscovery()
  378. self.state = CalibrationState.IDLE
  379. self.result: Optional[CalibrationResult] = None
  380. # 变换参数
  381. self.pan_offset = 0.0
  382. self.pan_scale_x = 1.0
  383. self.pan_scale_y = 0.0
  384. self.tilt_offset = 0.0
  385. self.tilt_scale_x = 0.0
  386. self.tilt_scale_y = 1.0
  387. # 校准配置
  388. self.stabilize_time = 2.0
  389. self.use_motion_detection = True
  390. self.use_feature_matching = True
  391. # 重叠发现配置
  392. self.overlap_pan_range = (0, 360)
  393. self.overlap_tilt_range = (-30, 30)
  394. self.overlap_pan_step = 20
  395. self.overlap_tilt_step = 15
  396. self.max_overlap_ranges = 3
  397. self.min_positions_per_range = 3
  398. # 回调
  399. self.on_progress: Optional[Callable[[int, int, str], None]] = None
  400. self.on_complete: Optional[Callable[[CalibrationResult], None]] = None
  401. # 发现的重叠区间
  402. self.overlap_ranges: List[OverlapRange] = []
  403. def calibrate(self, quick_mode: bool = True) -> CalibrationResult:
  404. """
  405. 执行校准 - 两阶段流程
  406. 阶段1: 视野重叠发现 - 扫描球机范围,找出与全景有重叠的角度区间
  407. 阶段2: 精确校准 - 仅在重叠区间内生成校准点,逐一验证后拟合变换
  408. """
  409. self.state = CalibrationState.RUNNING
  410. # ===================== 阶段1: 视野重叠发现 =====================
  411. print(f"\n{'='*60}")
  412. print(f"阶段1: 视野重叠发现 - 确定球机与全景的重叠区域")
  413. print(f"{'='*60}")
  414. self.overlap_ranges = self.overlap_discovery.discover_overlap_ranges(
  415. ptz=self.ptz,
  416. get_panorama_frame=self.get_frame,
  417. ptz_capture=self.ptz_capture,
  418. pan_range=self.overlap_pan_range,
  419. tilt_range=self.overlap_tilt_range,
  420. pan_step=self.overlap_pan_step,
  421. tilt_step=self.overlap_tilt_step,
  422. stabilize_time=self.stabilize_time,
  423. on_progress=self.on_progress,
  424. max_ranges=self.max_overlap_ranges,
  425. min_positions_per_range=self.min_positions_per_range
  426. )
  427. if not self.overlap_ranges:
  428. self.state = CalibrationState.FAILED
  429. self.result = CalibrationResult(
  430. success=False,
  431. points=[],
  432. error_message="未发现球机与全景的视野重叠区域,无法校准。请检查两台摄像头的安装位置和朝向。"
  433. )
  434. print(f"\n校准失败: {self.result.error_message}")
  435. if self.on_complete:
  436. self.on_complete(self.result)
  437. return self.result
  438. print(f"\n发现 {len(self.overlap_ranges)} 个重叠区间")
  439. # 选择匹配点最多的区间用于校准
  440. best_range = max(self.overlap_ranges, key=lambda r: r.match_count)
  441. self.overlap_ranges = [best_range]
  442. print(f"选择最佳重叠区间: pan=[{best_range.pan_start:.0f}°, {best_range.pan_end:.0f}°], "
  443. f"tilt=[{best_range.tilt_start:.0f}°, {best_range.tilt_end:.0f}°], "
  444. f"匹配点={best_range.match_count}")
  445. print(f"进入阶段2校准")
  446. # ===================== 阶段2: 在重叠区内精确校准 =====================
  447. print(f"\n{'='*60}")
  448. print(f"阶段2: 精确校准 - 在重叠区间内采集校准点")
  449. print(f"{'='*60}")
  450. calib_points = self._generate_points_in_overlaps(quick_mode)
  451. valid_points = []
  452. total_points = len(calib_points)
  453. print(f"生成 {total_points} 个校准点(仅位于重叠区域内)")
  454. for idx, point in enumerate(calib_points):
  455. if self.on_progress:
  456. self.on_progress(idx + 1, total_points,
  457. f"校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
  458. print(f"\n 校准点 {idx + 1}/{total_points}: pan={point.pan:.1f}°, tilt={point.tilt:.1f}°")
  459. # 步骤1: 获取移动前全景帧(用于运动检测)
  460. frames_before_list = []
  461. for _ in range(3):
  462. frame = self.get_frame()
  463. if frame is not None:
  464. frames_before_list.append(frame)
  465. time.sleep(0.1)
  466. if not frames_before_list:
  467. print(f" 警告: 无法获取移动前的全景画面")
  468. continue
  469. frames_before = np.mean(frames_before_list, axis=0).astype(np.uint8)
  470. # 步骤2: 移动球机到目标位置
  471. print(f" [2/4] 移动球机到目标位置...")
  472. if not self.ptz.goto_exact_position(point.pan, point.tilt, 1):
  473. print(f" 警告: 移动球机失败")
  474. continue
  475. time.sleep(self.stabilize_time)
  476. # 步骤3: 获取移动后全景帧和球机抓拍
  477. print(f" [3/4] 获取移动后的帧...")
  478. frames_after_list = []
  479. for _ in range(3):
  480. frame = self.get_frame()
  481. if frame is not None:
  482. frames_after_list.append(frame)
  483. time.sleep(0.1)
  484. if not frames_after_list:
  485. print(f" 警告: 无法获取移动后的全景画面")
  486. continue
  487. frames_after = np.mean(frames_after_list, axis=0).astype(np.uint8)
  488. panorama_frame = frames_after
  489. # 球机抓拍(关键:特征匹配需要球机画面)
  490. ptz_frame = None
  491. if self.use_feature_matching and self.ptz_capture:
  492. try:
  493. ptz_frame = self.ptz_capture()
  494. if ptz_frame is not None:
  495. print(f" 球机抓拍成功: {ptz_frame.shape}")
  496. except Exception as e:
  497. print(f" 球机抓拍失败: {e}")
  498. # 步骤4: 视觉检测 + 重叠验证
  499. print(f" [4/4] 视觉检测与重叠验证...")
  500. # 优先使用特征匹配(最可靠的方法)
  501. if ptz_frame is not None and panorama_frame is not None:
  502. success, match_count, cx, cy = self.overlap_discovery.match_frames(ptz_frame, panorama_frame)
  503. if success:
  504. h, w = panorama_frame.shape[:2]
  505. point.x_ratio = cx / w
  506. point.y_ratio = cy / h
  507. point.detected = True
  508. valid_points.append(point)
  509. print(f" ✓ 特征匹配验证通过: {match_count}个匹配点, "
  510. f"全景位置=({point.x_ratio:.3f}, {point.y_ratio:.3f})")
  511. continue
  512. else:
  513. print(f" ✗ 特征匹配不足({match_count}点), 尝试运动检测...")
  514. # 备选: 运动检测
  515. if self.use_motion_detection and frames_before is not None and frames_after is not None:
  516. motion_result = self.visual_detector.detect_by_motion(frames_before, frames_after)
  517. if motion_result:
  518. point.x_ratio, point.y_ratio = motion_result
  519. point.detected = True
  520. valid_points.append(point)
  521. print(f" ✓ 运动检测定位: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
  522. continue
  523. # 自定义标记检测
  524. if self.detect_marker:
  525. marker_pos = self.detect_marker(panorama_frame)
  526. if marker_pos:
  527. point.x_ratio, point.y_ratio = marker_pos
  528. point.detected = True
  529. valid_points.append(point)
  530. print(f" ✓ 标记检测成功: ({point.x_ratio:.3f}, {point.y_ratio:.3f})")
  531. continue
  532. # 所有方法均失败 - 跳过此点(不使用估算!)
  533. print(f" ✗ 此校准点无法验证,跳过(不使用估算)")
  534. # ===================== 检查有效校准点 =====================
  535. min_valid = 4
  536. if len(valid_points) < min_valid:
  537. self.state = CalibrationState.FAILED
  538. self.result = CalibrationResult(
  539. success=False,
  540. points=valid_points,
  541. error_message=f"有效校准点不足 (需要至少{min_valid}个, 实际{len(valid_points)}个)。"
  542. f"请检查球机与全景的视野重叠是否足够。"
  543. )
  544. print(f"\n校准失败: {self.result.error_message}")
  545. if self.on_complete:
  546. self.on_complete(self.result)
  547. return self.result
  548. # ===================== 计算变换参数 =====================
  549. success = self._calculate_transform(valid_points)
  550. if success:
  551. self.state = CalibrationState.SUCCESS
  552. rms_error = self._calculate_rms_error(valid_points)
  553. self.result = CalibrationResult(
  554. success=True,
  555. points=valid_points,
  556. rms_error=rms_error
  557. )
  558. print(f"\n{'='*60}")
  559. print(f"校准成功!")
  560. print(f"有效校准点: {len(valid_points)}")
  561. print(f"重叠区间数: {len(self.overlap_ranges)}")
  562. print(f"RMS误差: {rms_error:.4f}°")
  563. # 自动保存校准结果
  564. try:
  565. from config import CALIBRATION_CONFIG
  566. if CALIBRATION_CONFIG.get('auto_save', True):
  567. filepath = CALIBRATION_CONFIG.get('calibration_file', 'calibration.json')
  568. self.save_calibration(filepath)
  569. except Exception:
  570. pass
  571. # 校准完成后,将球机复位到初始位置
  572. self._reset_ptz_position()
  573. print(f"{'='*60}")
  574. else:
  575. self.state = CalibrationState.FAILED
  576. self.result = CalibrationResult(
  577. success=False,
  578. points=valid_points,
  579. error_message="变换参数计算失败"
  580. )
  581. print(f"校准失败: {self.result.error_message}")
  582. if self.on_complete:
  583. self.on_complete(self.result)
  584. return self.result
  585. def _reset_ptz_position(self):
  586. """校准完成后将球机复位到初始位置"""
  587. if self.ptz is None:
  588. return
  589. try:
  590. # 获取默认位置配置
  591. from config import PTZ_CONFIG
  592. default_pan = PTZ_CONFIG.get('default_pan', 0)
  593. default_tilt = PTZ_CONFIG.get('default_tilt', 0)
  594. default_zoom = PTZ_CONFIG.get('default_zoom', 1)
  595. print(f"[校准] 球机复位到位置: pan={default_pan}, tilt={default_tilt}, zoom={default_zoom}")
  596. self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
  597. time.sleep(0.5) # 等待球机到位
  598. except Exception as e:
  599. print(f"[校准] 球机复位失败: {e}")
  600. def _generate_points_in_overlaps(self, quick_mode: bool = True) -> List[CalibrationPoint]:
  601. """
  602. 在发现的重叠区间内生成校准点
  603. 只在球机和全景有视觉重叠的区域生成点
  604. """
  605. points = []
  606. if quick_mode:
  607. # 快速模式: 每个重叠区间内生成3-5个点
  608. for overlap in self.overlap_ranges:
  609. # 在区间中心生成点
  610. pan_center = (overlap.pan_start + overlap.pan_end) / 2
  611. tilt_center = (overlap.tilt_start + overlap.tilt_end) / 2
  612. pan_span = overlap.pan_end - overlap.pan_start
  613. tilt_span = overlap.tilt_end - overlap.tilt_start
  614. # 中心点
  615. points.append(CalibrationPoint(pan=pan_center, tilt=tilt_center, zoom=1.0))
  616. # 四角点(如果区间足够宽)
  617. if pan_span > 10:
  618. points.append(CalibrationPoint(
  619. pan=overlap.pan_start + pan_span * 0.25,
  620. tilt=tilt_center, zoom=1.0))
  621. points.append(CalibrationPoint(
  622. pan=overlap.pan_start + pan_span * 0.75,
  623. tilt=tilt_center, zoom=1.0))
  624. if tilt_span > 10:
  625. points.append(CalibrationPoint(
  626. pan=pan_center,
  627. tilt=overlap.tilt_start + tilt_span * 0.3, zoom=1.0))
  628. points.append(CalibrationPoint(
  629. pan=pan_center,
  630. tilt=overlap.tilt_start + tilt_span * 0.7, zoom=1.0))
  631. else:
  632. # 完整模式: 在每个重叠区间内均匀分布
  633. grid_size = 5
  634. for overlap in self.overlap_ranges:
  635. for i in range(grid_size):
  636. for j in range(grid_size):
  637. pan = overlap.pan_start + (overlap.pan_end - overlap.pan_start) * i / (grid_size - 1)
  638. tilt = overlap.tilt_start + (overlap.tilt_end - overlap.tilt_start) * j / (grid_size - 1)
  639. points.append(CalibrationPoint(pan=pan, tilt=tilt, zoom=1.0))
  640. return points
  641. def _calculate_transform(self, points: List[CalibrationPoint]) -> bool:
  642. """使用RANSAC + 最小二乘法拟合变换参数,剔除异常值"""
  643. try:
  644. if len(points) < 4:
  645. print(f"计算变换参数错误: 有效点不足({len(points)}个)")
  646. return False
  647. pan_values = np.array([p.pan for p in points])
  648. tilt_values = np.array([p.tilt for p in points])
  649. x_ratios = np.array([p.x_ratio for p in points])
  650. y_ratios = np.array([p.y_ratio for p in points])
  651. # RANSAC剔除异常值
  652. inlier_mask = self._ransac_filter(x_ratios, y_ratios, pan_values, tilt_values)
  653. inlier_count = np.sum(inlier_mask)
  654. if inlier_count < 4:
  655. print(f"RANSAC后有效点不足({inlier_count}个),使用全部点")
  656. inlier_mask = np.ones(len(points), dtype=bool)
  657. inlier_count = np.sum(inlier_mask) # 更新inlier_count
  658. else:
  659. print(f"RANSAC: {len(points)}个点中{inlier_count}个内点,剔除{len(points) - inlier_count}个异常值")
  660. # 用内点拟合
  661. A = np.ones((inlier_count, 3))
  662. A[:, 1] = x_ratios[inlier_mask]
  663. A[:, 2] = y_ratios[inlier_mask]
  664. pan_params, _, _, _ = np.linalg.lstsq(A, pan_values[inlier_mask], rcond=None)
  665. tilt_params, _, _, _ = np.linalg.lstsq(A, tilt_values[inlier_mask], rcond=None)
  666. self.pan_offset = pan_params[0]
  667. self.pan_scale_x = pan_params[1]
  668. self.pan_scale_y = pan_params[2]
  669. self.tilt_offset = tilt_params[0]
  670. self.tilt_scale_x = tilt_params[1]
  671. self.tilt_scale_y = tilt_params[2]
  672. print(f"变换参数:")
  673. print(f" pan = {self.pan_offset:.2f} + {self.pan_scale_x:.2f}*x + {self.pan_scale_y:.2f}*y")
  674. print(f" tilt = {self.tilt_offset:.2f} + {self.tilt_scale_x:.2f}*x + {self.tilt_scale_y:.2f}*y")
  675. return True
  676. except Exception as e:
  677. print(f"计算变换参数错误: {e}")
  678. return False
  679. def _ransac_filter(self, x: np.ndarray, y: np.ndarray,
  680. pan: np.ndarray, tilt: np.ndarray,
  681. max_iterations: int = 200, threshold: float = 15.0,
  682. min_samples: int = 4) -> np.ndarray:
  683. """RANSAC剔除变换拟合中的异常值"""
  684. n = len(x)
  685. best_inliers = np.zeros(n, dtype=bool)
  686. best_inlier_count = 0
  687. rng = np.random.RandomState(42)
  688. for _ in range(max_iterations):
  689. # 随机选min_samples个点
  690. indices = rng.choice(n, min_samples, replace=False)
  691. # 用这些点拟合
  692. A = np.ones((min_samples, 3))
  693. A[:, 1] = x[indices]
  694. A[:, 2] = y[indices]
  695. try:
  696. pan_params, _, _, _ = np.linalg.lstsq(A, pan[indices], rcond=None)
  697. tilt_params, _, _, _ = np.linalg.lstsq(A, tilt[indices], rcond=None)
  698. except np.linalg.LinAlgError:
  699. continue
  700. # 计算所有点的误差
  701. pred_pan = pan_params[0] + pan_params[1] * x + pan_params[2] * y
  702. pred_tilt = tilt_params[0] + tilt_params[1] * x + tilt_params[2] * y
  703. errors = np.sqrt((pred_pan - pan) ** 2 + (pred_tilt - tilt) ** 2)
  704. inliers = errors < threshold
  705. inlier_count = np.sum(inliers)
  706. if inlier_count > best_inlier_count:
  707. best_inlier_count = inlier_count
  708. best_inliers = inliers
  709. if best_inlier_count == 0:
  710. return np.ones(n, dtype=bool)
  711. return best_inliers
  712. def _calculate_rms_error(self, points: List[CalibrationPoint]) -> float:
  713. """计算均方根误差"""
  714. total_error = 0.0
  715. for p in points:
  716. pred_pan, pred_tilt = self.transform(p.x_ratio, p.y_ratio)
  717. error = math.sqrt((pred_pan - p.pan) ** 2 + (pred_tilt - p.tilt) ** 2)
  718. total_error += error ** 2
  719. return math.sqrt(total_error / len(points))
  720. def transform(self, x_ratio: float, y_ratio: float) -> Tuple[float, float]:
  721. """将全景坐标转换为PTZ角度"""
  722. pan = self.pan_offset + self.pan_scale_x * x_ratio + self.pan_scale_y * y_ratio
  723. tilt = self.tilt_offset + self.tilt_scale_x * x_ratio + self.tilt_scale_y * y_ratio
  724. return (pan, tilt)
  def inverse_transform(self, pan: float, tilt: float) -> Tuple[float, float]:
      """Convert PTZ angles back to panorama coordinates.

      The forward model is ``[pan, tilt] = offset + M @ [x, y]`` where ``M``
      holds the four scale coefficients (cross terms included), so the
      inverse is ``[x, y] = M^-1 @ ([pan, tilt] - offset)``.

      Args:
          pan: Pan angle.
          tilt: Tilt angle.

      Returns:
          ``(x_ratio, y_ratio)``, each clamped to the range [0, 1].
      """
      scale_matrix = np.array([
          [self.pan_scale_x, self.pan_scale_y],
          [self.tilt_scale_x, self.tilt_scale_y]
      ])

      if abs(np.linalg.det(scale_matrix)) < 1e-10:
          # Near-singular matrix: drop the cross terms and invert each axis
          # on its own, falling back to 0.5 when that axis' scale is ~0.
          if abs(self.pan_scale_x) > 1e-10:
              x_ratio = (pan - self.pan_offset) / self.pan_scale_x
          else:
              x_ratio = 0.5
          if abs(self.tilt_scale_y) > 1e-10:
              y_ratio = (tilt - self.tilt_offset) / self.tilt_scale_y
          else:
              y_ratio = 0.5
      else:
          # Regular case: full 2x2 inverse applied to the offset-corrected angles.
          inverse = np.linalg.inv(scale_matrix)
          delta = np.array([pan - self.pan_offset, tilt - self.tilt_offset])
          x_ratio, y_ratio = inverse @ delta

      return (max(0, min(1, x_ratio)), max(0, min(1, y_ratio)))
  def is_calibrated(self) -> bool:
      """Return True when the current state is ``CalibrationState.SUCCESS``."""
      calibrated = self.state == CalibrationState.SUCCESS
      return calibrated
  def get_state(self) -> CalibrationState:
      """Return the calibrator's current state."""
      current_state = self.state
      return current_state
  def get_result(self) -> Optional[CalibrationResult]:
      """Return the stored calibration result (whatever ``self.result`` holds)."""
      stored_result = self.result
      return stored_result
  def get_overlap_ranges(self) -> List[OverlapRange]:
      """Return the discovered overlap ranges.

      The stored list itself is returned, not a copy.
      """
      ranges = self.overlap_ranges
      return ranges
  def save_calibration(self, filepath: str) -> bool:
      """Write the current calibration to ``filepath`` as JSON.

      The document holds the six affine coefficients, the RMS error, the
      discovered overlap ranges and the mounting configuration
      (``mount_type``, ``tilt_flip``, ``pan_flip``) read from
      ``_get_ptz_config()``.

      Args:
          filepath: Destination path of the JSON file.

      Returns:
          ``True`` when the file was written; ``False`` when the calibrator
          is not calibrated or when building/writing the document fails
          (the error is printed, not raised).
      """
      if not self.is_calibrated():
          return False
      try:
          import json
          ptz_config = _get_ptz_config()
          data = {
              'pan_offset': self.pan_offset,
              'pan_scale_x': self.pan_scale_x,
              'pan_scale_y': self.pan_scale_y,
              'tilt_offset': self.tilt_offset,
              'tilt_scale_x': self.tilt_scale_x,
              'tilt_scale_y': self.tilt_scale_y,
              'rms_error': self.result.rms_error if self.result else 0,
              'overlap_ranges': [
                  {
                      'pan_start': r.pan_start,
                      'pan_end': r.pan_end,
                      'tilt_start': r.tilt_start,
                      'tilt_end': r.tilt_end,
                      'match_count': r.match_count
                  }
                  for r in self.overlap_ranges
              ],
              # Store the mounting orientation used for this calibration.
              'mount_type': ptz_config.get('mount_type', 'wall'),
              'tilt_flip': ptz_config.get('tilt_flip', False),
              'pan_flip': ptz_config.get('pan_flip', False),
          }
          # Serialize BEFORE opening the file. Opening with 'w' truncates
          # it, so a value that is not JSON serializable would otherwise
          # destroy the previous calibration file.
          payload = json.dumps(data, indent=2)
          with open(filepath, 'w', encoding='utf-8') as f:
              f.write(payload)
          print(f"校准结果已保存: {filepath}")
          return True
      except Exception as e:
          print(f"保存校准结果失败: {e}")
          return False
  def load_calibration(self, filepath: str) -> bool:
      """Load a calibration from the JSON file at ``filepath``.

      The whole file is parsed and validated into local values before any
      attribute of the calibrator is assigned, so a missing key or a
      malformed entry leaves the calibrator exactly as it was.

      On success the six affine coefficients are set, ``overlap_ranges`` is
      replaced when the file contains that key (panorama centers are not
      stored and are set to 0), ``state`` becomes
      ``CalibrationState.SUCCESS`` and ``result`` is a fresh
      ``CalibrationResult`` with no points. A warning is printed when the
      saved ``mount_type`` differs from the current PTZ configuration.

      Args:
          filepath: Path of the JSON file to read.

      Returns:
          ``True`` when the calibration was loaded; ``False`` when the file
          does not exist or cannot be parsed (the reason is printed).
      """
      try:
          import json
          with open(filepath, 'r', encoding='utf-8') as f:
              data = json.load(f)

          # --- Parse phase: nothing on ``self`` is touched yet. ---
          # float() rejects corrupt values (null, lists, ...) here rather
          # than letting them surface later inside transform().
          pan_offset = float(data['pan_offset'])
          pan_scale_x = float(data['pan_scale_x'])
          pan_scale_y = float(data['pan_scale_y'])
          tilt_offset = float(data['tilt_offset'])
          tilt_scale_x = float(data['tilt_scale_x'])
          tilt_scale_y = float(data['tilt_scale_y'])

          # Overlap ranges are optional in the file.
          overlap_ranges = None
          if 'overlap_ranges' in data:
              overlap_ranges = [
                  OverlapRange(
                      pan_start=r['pan_start'],
                      pan_end=r['pan_end'],
                      tilt_start=r['tilt_start'],
                      tilt_end=r['tilt_end'],
                      match_count=r['match_count'],
                      panorama_center_x=0,
                      panorama_center_y=0
                  )
                  for r in data['overlap_ranges']
              ]

          result = CalibrationResult(
              success=True,
              points=[],
              rms_error=data.get('rms_error', 0)
          )
          success_state = CalibrationState.SUCCESS

          # Compare the mounting orientation with the one used at save time.
          ptz_config = _get_ptz_config()
          current_mount = ptz_config.get('mount_type', 'wall')
          saved_mount = data.get('mount_type', 'wall')

          # --- Commit phase: plain assignments only. ---
          self.pan_offset = pan_offset
          self.pan_scale_x = pan_scale_x
          self.pan_scale_y = pan_scale_y
          self.tilt_offset = tilt_offset
          self.tilt_scale_x = tilt_scale_x
          self.tilt_scale_y = tilt_scale_y
          if overlap_ranges is not None:
              self.overlap_ranges = overlap_ranges
          self.state = success_state
          self.result = result

          if current_mount != saved_mount:
              print(f"警告: 当前安装类型({current_mount})与校准时的({saved_mount})不同,建议重新校准!")
          print(f"校准结果已加载: {filepath}")
          return True
      except FileNotFoundError:
          print(f"校准文件不存在: {filepath}")
          return False
      except Exception as e:
          print(f"加载校准结果失败: {e}")
          return False
  class CalibrationManager:
      """Calibration manager: loads, runs and persists a calibrator's calibration."""

      def __init__(self, calibrator: CameraCalibrator, calibration_file: Optional[str] = None):
          """Bind the manager to ``calibrator`` and resolve the calibration file path.

          Args:
              calibrator: The calibrator whose calibration is managed.
              calibration_file: Explicit path of the calibration file. When
                  falsy, ``CALIBRATION_CONFIG['calibration_file']`` from the
                  ``config`` module is used, and ``'calibration.json'`` when
                  that module/name cannot be imported or the key is absent.
          """
          self.calibrator = calibrator
          # Prefer the explicit path, then the configured one, then the default.
          if calibration_file:
              self.calibration_file = calibration_file
          else:
              try:
                  from config import CALIBRATION_CONFIG
                  self.calibration_file = CALIBRATION_CONFIG.get(
                      'calibration_file', 'calibration.json'
                  )
              except ImportError:
                  self.calibration_file = 'calibration.json'

      def auto_calibrate(self, force: bool = False, fallback_on_failure: bool = True) -> CalibrationResult:
          """Load an existing calibration or run a new one.

          Args:
              force: Always run a new calibration (do not load saved data first).
              fallback_on_failure: When the new calibration fails, try to
                  fall back to the saved calibration file.

          Returns:
              The loaded result, the new calibration result, or - after a
              failed calibration with a successful fallback - the result
              loaded from the file.
          """
          # Loading the previous calibration is enabled unless the config disables it.
          load_on_startup = True
          try:
              from config import CALIBRATION_CONFIG
              load_on_startup = CALIBRATION_CONFIG.get('load_on_startup', True)
          except Exception:
              # Best effort: any problem reading the optional config keeps the
              # default. Unlike a bare ``except``, this does not swallow
              # KeyboardInterrupt / SystemExit.
              pass

          # Unless forced, try the saved calibration first.
          if not force and load_on_startup:
              if self.calibrator.load_calibration(self.calibration_file):
                  print("使用已有校准结果")
                  return self.calibrator.get_result()

          # Run a new calibration.
          if force:
              print("强制重新校准(不使用已有数据)...")
          elif not load_on_startup:
              print("已禁用加载校准数据,开始新校准...")
          else:
              print("开始自动校准...")
          result = self.calibrator.calibrate(quick_mode=True)

          if result.success:
              self.calibrator.save_calibration(self.calibration_file)
          elif fallback_on_failure:
              # Calibration failed: try to fall back to the saved data.
              print("校准失败,尝试回退使用已有校准数据...")
              if self.calibrator.load_calibration(self.calibration_file):
                  print("已回退到已有校准数据")
                  result = self.calibrator.get_result()
          return result

      def check_calibration(self) -> Tuple[bool, str]:
          """Report the calibration status.

          Returns:
              ``(ok, message)`` where ``ok`` is True only when the calibrator
              state is ``CalibrationState.SUCCESS``.
          """
          state = self.calibrator.get_state()
          if state == CalibrationState.SUCCESS:
              result = self.calibrator.get_result()
              overlaps = self.calibrator.get_overlap_ranges()
              overlap_info = f", {len(overlaps)}个重叠区间" if overlaps else ""
              # get_result() may return None; report 0 instead of raising
              # AttributeError on ``None.rms_error``.
              rms_error = result.rms_error if result else 0
              return (True, f"校准有效, RMS误差: {rms_error:.4f}°{overlap_info}")
          elif state == CalibrationState.FAILED:
              return (False, "校准失败")
          elif state == CalibrationState.RUNNING:
              return (False, "校准进行中")
          else:
              return (False, "未校准")