spatial_scanner.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. """球机 360° 扫描与全景图生成."""
  2. import logging
  3. import os
  4. import time
  5. from typing import Callable, Dict, List, Optional, Tuple
  6. import cv2
  7. import numpy as np
  8. logger = logging.getLogger(__name__)
  9. from core.coord_utils import compute_sample_grid
  10. class SpatialScanner:
  11. def __init__(
  12. self,
  13. group_id: str,
  14. ptz_camera,
  15. ptz_frame_source: Callable[[], Optional[np.ndarray]],
  16. data_dir: str = "data",
  17. stabilize_time: float = 1.5,
  18. ):
  19. self.group_id = group_id
  20. self.ptz = ptz_camera
  21. self.get_ptz_frame = ptz_frame_source
  22. self.data_dir = os.path.join(data_dir, group_id)
  23. os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True)
  24. os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True)
  25. self.stabilize_time = stabilize_time
  26. self.progress = {"total": 0, "current": 0, "state": "idle"}
  27. self.cancelled = False
  28. def cancel(self):
  29. self.cancelled = True
  30. def run(
  31. self,
  32. pan_range: Tuple[float, float] = (0.0, 360.0),
  33. tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0),
  34. pan_step: float = 30.0,
  35. zoom: int = 1,
  36. progress_callback: Optional[Callable[[Dict], None]] = None,
  37. ) -> Dict:
  38. config = {
  39. "pan_range": pan_range,
  40. "tilt_layers": list(tilt_layers),
  41. "pan_step": pan_step,
  42. "zoom": zoom,
  43. }
  44. if self.cancelled:
  45. self.progress = {"total": 0, "current": 0, "state": "cancelled"}
  46. return {
  47. "group_id": self.group_id,
  48. "samples": [],
  49. "panorama_path": None,
  50. "config": config,
  51. }
  52. grid = compute_sample_grid(pan_range, tilt_layers, pan_step)
  53. self.progress = {"total": len(grid), "current": 0, "state": "scanning"}
  54. if progress_callback:
  55. progress_callback(dict(self.progress))
  56. samples: List[Dict] = []
  57. for idx, (pan, tilt) in enumerate(grid):
  58. if self.cancelled:
  59. break
  60. try:
  61. self.ptz.goto_exact_position(pan, tilt, zoom)
  62. time.sleep(self.stabilize_time)
  63. frame = self._wait_frame(timeout=5.0)
  64. if frame is None:
  65. logger.warning("No frame for pan=%s tilt=%s", pan, tilt)
  66. continue
  67. filename = f"p{pan:.1f}_t{tilt:.1f}.jpg"
  68. path = os.path.join(self.data_dir, "samples", filename)
  69. cv2.imwrite(path, frame)
  70. samples.append({
  71. "id": idx + 1,
  72. "pan": pan,
  73. "tilt": tilt,
  74. "zoom": zoom,
  75. "thumbnail": path,
  76. })
  77. self.progress["current"] = len(samples)
  78. if progress_callback:
  79. progress_callback(dict(self.progress))
  80. except Exception as exc:
  81. logger.error("Error processing scan sample pan=%s tilt=%s: %s", pan, tilt, exc)
  82. continue
  83. panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers)
  84. self.progress["state"] = "cancelled" if self.cancelled else "done"
  85. return {
  86. "group_id": self.group_id,
  87. "samples": samples,
  88. "panorama_path": panorama_path,
  89. "config": config,
  90. }
  91. def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]:
  92. deadline = time.time() + timeout
  93. while time.time() < deadline:
  94. frame = self.get_ptz_frame()
  95. if frame is not None:
  96. return frame
  97. time.sleep(0.1)
  98. return None
  99. def _paste_direct(
  100. self,
  101. canvas: np.ndarray,
  102. patch: np.ndarray,
  103. u: int,
  104. v: int,
  105. width: int,
  106. height: int,
  107. ) -> None:
  108. """直接把图块贴到画布上(无融合),支持 360° 环绕。"""
  109. ph, pw = patch.shape[:2]
  110. x0 = int(round(u - pw / 2))
  111. y0 = int(round(v - ph / 2))
  112. xs = max(0, x0)
  113. xe = min(width, x0 + pw)
  114. ys = max(0, y0)
  115. ye = min(height, y0 + ph)
  116. pxs = xs - x0
  117. pxe = pxs + (xe - xs)
  118. pys = ys - y0
  119. pye = pys + (ye - ys)
  120. if xe <= xs or ye <= ys:
  121. return
  122. canvas[ys:ye, xs:xe] = patch[pys:pye, pxs:pxe]
  123. def _build_equirectangular(
  124. self,
  125. samples: List[Dict],
  126. pan_range: Tuple[float, float],
  127. tilt_layers: Tuple[float, ...],
  128. width: int = 4096,
  129. height: int = 2048,
  130. ) -> Optional[str]:
  131. if not samples:
  132. return None
  133. panorama = np.zeros((height, width, 3), dtype=np.uint8)
  134. pan_start, pan_end = pan_range
  135. for s in samples:
  136. frame = cv2.imread(s["thumbnail"])
  137. if frame is None:
  138. continue
  139. fh, fw = frame.shape[:2]
  140. pan = s["pan"]
  141. tilt = s["tilt"]
  142. zoom = max(1, s.get("zoom", 1))
  143. # 按 zoom 估算水平/垂直视场角(zoom 越大视场角越小)
  144. hfov = 55.0 / zoom
  145. vfov = hfov * (fh / fw)
  146. patch_w = max(1, int(hfov * width / 360.0))
  147. patch_h = max(1, int(vfov * height / 180.0))
  148. patch = cv2.resize(frame, (patch_w, patch_h), interpolation=cv2.INTER_AREA)
  149. # 仅裁剪相机 OSD 时间戳/水印边缘,避免重复文字破坏连续性
  150. crop_top = int(patch_h * 0.08)
  151. crop_bottom = int(patch_h * 0.03)
  152. crop_left = int(patch_w * 0.03)
  153. crop_right = int(patch_w * 0.03)
  154. patch = patch[crop_top:patch_h - crop_bottom, crop_left:patch_w - crop_right]
  155. patch_h, patch_w = patch.shape[:2]
  156. u_center = int(((pan - pan_start) / (pan_end - pan_start)) * width)
  157. v_center = int(((90 - tilt) / 180) * height)
  158. self._paste_direct(panorama, patch, u_center, v_center, width, height)
  159. # 处理 0°/360° 接缝:左右两侧重复贴图
  160. if u_center - patch_w // 2 < 0:
  161. self._paste_direct(panorama, patch, u_center + width, v_center, width, height)
  162. if u_center + patch_w // 2 > width:
  163. self._paste_direct(panorama, patch, u_center - width, v_center, width, height)
  164. path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg")
  165. cv2.imwrite(path, panorama)
  166. return path