# spatial_scanner.py
"""360-degree scanning and panorama generation for a PTZ dome camera."""
import logging
import math
import os
import time
from typing import Callable, Dict, List, Optional, Tuple

import cv2
import numpy as np

from core.coord_utils import compute_sample_grid

logger = logging.getLogger(__name__)


  11. class SpatialScanner:
  12. def __init__(
  13. self,
  14. group_id: str,
  15. ptz_camera,
  16. ptz_frame_source: Callable[[], Optional[np.ndarray]],
  17. data_dir: str = "data",
  18. stabilize_time: float = 3.0,
  19. ):
  20. self.group_id = group_id
  21. self.ptz = ptz_camera
  22. self.get_ptz_frame = ptz_frame_source
  23. self.data_dir = os.path.join(data_dir, group_id)
  24. os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True)
  25. os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True)
  26. self.stabilize_time = stabilize_time
  27. self.progress = {"total": 0, "current": 0, "state": "idle"}
  28. self.cancelled = False
  29. def cancel(self):
  30. self.cancelled = True
  31. def run(
  32. self,
  33. pan_range: Tuple[float, float] = (0.0, 360.0),
  34. tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0),
  35. pan_step: float = 30.0,
  36. zoom: int = 1,
  37. progress_callback: Optional[Callable[[Dict], None]] = None,
  38. ) -> Dict:
  39. config = {
  40. "pan_range": pan_range,
  41. "tilt_layers": list(tilt_layers),
  42. "pan_step": pan_step,
  43. "zoom": zoom,
  44. }
  45. if self.cancelled:
  46. self.progress = {"total": 0, "current": 0, "state": "cancelled"}
  47. return {
  48. "group_id": self.group_id,
  49. "samples": [],
  50. "panorama_path": None,
  51. "config": config,
  52. }
  53. grid = compute_sample_grid(pan_range, tilt_layers, pan_step)
  54. self.progress = {"total": len(grid), "current": 0, "state": "scanning"}
  55. if progress_callback:
  56. progress_callback(dict(self.progress))
  57. samples: List[Dict] = []
  58. prev_pan, prev_tilt, prev_zoom = 0.0, 0.0, 1
  59. for idx, (pan, tilt) in enumerate(grid):
  60. if self.cancelled:
  61. break
  62. try:
  63. frame = self._capture_at_position(
  64. pan, tilt, zoom, prev_pan, prev_tilt, prev_zoom
  65. )
  66. if frame is None:
  67. logger.warning("No frame for pan=%s tilt=%s", pan, tilt)
  68. continue
  69. filename = f"p{pan:.1f}_t{tilt:.1f}.jpg"
  70. path = os.path.join(self.data_dir, "samples", filename)
  71. cv2.imwrite(path, frame)
  72. samples.append({
  73. "id": idx + 1,
  74. "pan": pan,
  75. "tilt": tilt,
  76. "zoom": zoom,
  77. "thumbnail": path,
  78. })
  79. prev_pan, prev_tilt, prev_zoom = pan, tilt, zoom
  80. self.progress["current"] = len(samples)
  81. if progress_callback:
  82. progress_callback(dict(self.progress))
  83. except Exception as exc:
  84. logger.error("Error processing scan sample pan=%s tilt=%s: %s", pan, tilt, exc)
  85. continue
  86. panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers)
  87. self.progress["state"] = "cancelled" if self.cancelled else "done"
  88. return {
  89. "group_id": self.group_id,
  90. "samples": samples,
  91. "panorama_path": panorama_path,
  92. "config": config,
  93. }
  94. def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]:
  95. deadline = time.time() + timeout
  96. while time.time() < deadline:
  97. frame = self.get_ptz_frame()
  98. if frame is not None:
  99. return frame
  100. time.sleep(0.1)
  101. return None
  102. def _drain_frames(self, count: int = 8, interval: float = 0.1) -> Optional[np.ndarray]:
  103. """排空旧帧缓冲,返回最后一帧."""
  104. last_frame = None
  105. for _ in range(count):
  106. frame = self.get_ptz_frame()
  107. if frame is not None:
  108. last_frame = frame
  109. time.sleep(interval)
  110. return last_frame
  111. def _capture_at_position(
  112. self,
  113. pan: float,
  114. tilt: float,
  115. zoom: int,
  116. prev_pan: float,
  117. prev_tilt: float,
  118. prev_zoom: int,
  119. ) -> Optional[np.ndarray]:
  120. """移动 PTZ 到目标位置并等待稳定后抓取一帧。
  121. 根据与上一位置的距离动态计算等待时间,并排空 RTSP 缓冲。
  122. """
  123. pan_diff = abs(((pan - prev_pan + 180) % 360) - 180)
  124. tilt_diff = abs(tilt - prev_tilt)
  125. zoom_diff = abs(zoom - prev_zoom)
  126. move_distance = math.sqrt(pan_diff ** 2 + tilt_diff ** 2 + zoom_diff ** 2 * 100)
  127. # 云台典型速度:pan ~90°/s, tilt ~60°/s,按最远距离留 1.5 倍余量
  128. base_wait = 1.0
  129. per_degree_wait = 0.04
  130. min_wait = self.stabilize_time
  131. max_wait = 6.0
  132. stabilize = min(max_wait, max(min_wait, base_wait + move_distance * per_degree_wait))
  133. logger.info(
  134. "[SpatialScanner] move from (%.1f, %.1f, %d) to (%.1f, %.1f, %d), "
  135. "distance=%.1f, stabilize=%.2fs",
  136. prev_pan, prev_tilt, prev_zoom, pan, tilt, zoom, move_distance, stabilize,
  137. )
  138. t0 = time.time()
  139. if not self.ptz.goto_exact_position(pan, tilt, zoom):
  140. logger.warning("[SpatialScanner] goto_exact_position failed for (%.1f, %.1f, %d)", pan, tilt, zoom)
  141. # 等待云台物理到位
  142. elapsed = time.time() - t0
  143. if elapsed < stabilize:
  144. time.sleep(stabilize - elapsed)
  145. # 排空旧帧:根据移动距离增加排空帧数,确保拿到新位置图像
  146. drain_count = max(12, int(move_distance * 0.5))
  147. drain_count = min(40, drain_count)
  148. frame = self._drain_frames(count=drain_count, interval=0.15)
  149. # 如果 still 为空,再等待并尝试
  150. if frame is None:
  151. frame = self._wait_frame(timeout=5.0)
  152. logger.info(
  153. "[SpatialScanner] captured frame for (%.1f, %.1f, %d) after %.2fs",
  154. pan, tilt, zoom, time.time() - t0,
  155. )
  156. return frame
  157. def _paste_direct(
  158. self,
  159. canvas: np.ndarray,
  160. patch: np.ndarray,
  161. u: int,
  162. v: int,
  163. width: int,
  164. height: int,
  165. ) -> None:
  166. """直接把图块贴到画布上(无融合),支持 360° 环绕。"""
  167. ph, pw = patch.shape[:2]
  168. x0 = int(round(u - pw / 2))
  169. y0 = int(round(v - ph / 2))
  170. xs = max(0, x0)
  171. xe = min(width, x0 + pw)
  172. ys = max(0, y0)
  173. ye = min(height, y0 + ph)
  174. pxs = xs - x0
  175. pxe = pxs + (xe - xs)
  176. pys = ys - y0
  177. pye = pys + (ye - ys)
  178. if xe <= xs or ye <= ys:
  179. return
  180. canvas[ys:ye, xs:xe] = patch[pys:pye, pxs:pxe]
  181. def _build_equirectangular(
  182. self,
  183. samples: List[Dict],
  184. pan_range: Tuple[float, float],
  185. tilt_layers: Tuple[float, ...],
  186. width: int = 4096,
  187. height: int = 2048,
  188. ) -> Optional[str]:
  189. if not samples:
  190. return None
  191. panorama = np.zeros((height, width, 3), dtype=np.uint8)
  192. pan_start, pan_end = pan_range
  193. for s in samples:
  194. frame = cv2.imread(s["thumbnail"])
  195. if frame is None:
  196. continue
  197. fh, fw = frame.shape[:2]
  198. pan = s["pan"]
  199. tilt = s["tilt"]
  200. zoom = max(1, s.get("zoom", 1))
  201. # 按 zoom 估算水平/垂直视场角(zoom 越大视场角越小)
  202. hfov = 55.0 / zoom
  203. vfov = hfov * (fh / fw)
  204. patch_w = max(1, int(hfov * width / 360.0))
  205. patch_h = max(1, int(vfov * height / 180.0))
  206. patch = cv2.resize(frame, (patch_w, patch_h), interpolation=cv2.INTER_AREA)
  207. # 仅裁剪相机 OSD 时间戳/水印边缘,避免重复文字破坏连续性
  208. crop_top = int(patch_h * 0.08)
  209. crop_bottom = int(patch_h * 0.03)
  210. crop_left = int(patch_w * 0.03)
  211. crop_right = int(patch_w * 0.03)
  212. patch = patch[crop_top:patch_h - crop_bottom, crop_left:patch_w - crop_right]
  213. patch_h, patch_w = patch.shape[:2]
  214. u_center = int(((pan - pan_start) / (pan_end - pan_start)) * width)
  215. v_center = int(((90 - tilt) / 180) * height)
  216. self._paste_direct(panorama, patch, u_center, v_center, width, height)
  217. # 处理 0°/360° 接缝:左右两侧重复贴图
  218. if u_center - patch_w // 2 < 0:
  219. self._paste_direct(panorama, patch, u_center + width, v_center, width, height)
  220. if u_center + patch_w // 2 > width:
  221. self._paste_direct(panorama, patch, u_center - width, v_center, width, height)
  222. path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg")
  223. cv2.imwrite(path, panorama)
  224. return path