spatial_scanner.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. """球机 360° 扫描与全景图生成."""
  2. import logging
  3. import os
  4. import time
  5. from typing import Callable, Dict, List, Optional, Tuple
  6. import cv2
  7. import numpy as np
  8. logger = logging.getLogger(__name__)
  9. from core.coord_utils import compute_sample_grid
  10. class SpatialScanner:
  11. def __init__(
  12. self,
  13. group_id: str,
  14. ptz_camera,
  15. ptz_frame_source: Callable[[], Optional[np.ndarray]],
  16. data_dir: str = "data",
  17. stabilize_time: float = 1.5,
  18. ):
  19. self.group_id = group_id
  20. self.ptz = ptz_camera
  21. self.get_ptz_frame = ptz_frame_source
  22. self.data_dir = os.path.join(data_dir, group_id)
  23. os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True)
  24. os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True)
  25. self.stabilize_time = stabilize_time
  26. self.progress = {"total": 0, "current": 0, "state": "idle"}
  27. self.cancelled = False
  28. def cancel(self):
  29. self.cancelled = True
  30. def run(
  31. self,
  32. pan_range: Tuple[float, float] = (0.0, 360.0),
  33. tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0),
  34. pan_step: float = 30.0,
  35. zoom: int = 1,
  36. progress_callback: Optional[Callable[[Dict], None]] = None,
  37. ) -> Dict:
  38. config = {
  39. "pan_range": pan_range,
  40. "tilt_layers": list(tilt_layers),
  41. "pan_step": pan_step,
  42. "zoom": zoom,
  43. }
  44. if self.cancelled:
  45. self.progress = {"total": 0, "current": 0, "state": "cancelled"}
  46. return {
  47. "group_id": self.group_id,
  48. "samples": [],
  49. "panorama_path": None,
  50. "config": config,
  51. }
  52. grid = compute_sample_grid(pan_range, tilt_layers, pan_step)
  53. self.progress = {"total": len(grid), "current": 0, "state": "scanning"}
  54. samples: List[Dict] = []
  55. for idx, (pan, tilt) in enumerate(grid):
  56. if self.cancelled:
  57. break
  58. try:
  59. self.ptz.goto_exact_position(pan, tilt, zoom)
  60. time.sleep(self.stabilize_time)
  61. frame = self._wait_frame(timeout=5.0)
  62. if frame is None:
  63. continue
  64. filename = f"p{pan:.1f}_t{tilt:.1f}.jpg"
  65. path = os.path.join(self.data_dir, "samples", filename)
  66. cv2.imwrite(path, frame)
  67. samples.append({
  68. "id": idx + 1,
  69. "pan": pan,
  70. "tilt": tilt,
  71. "zoom": zoom,
  72. "thumbnail": path,
  73. })
  74. self.progress["current"] = len(samples)
  75. if progress_callback:
  76. progress_callback(dict(self.progress))
  77. except Exception as exc:
  78. logger.error("Error processing scan sample pan=%s tilt=%s: %s", pan, tilt, exc)
  79. continue
  80. panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers)
  81. self.progress["state"] = "cancelled" if self.cancelled else "done"
  82. return {
  83. "group_id": self.group_id,
  84. "samples": samples,
  85. "panorama_path": panorama_path,
  86. "config": config,
  87. }
  88. def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]:
  89. deadline = time.time() + timeout
  90. while time.time() < deadline:
  91. frame = self.get_ptz_frame()
  92. if frame is not None:
  93. return frame
  94. time.sleep(0.1)
  95. return None
  96. def _build_equirectangular(
  97. self,
  98. samples: List[Dict],
  99. pan_range: Tuple[float, float],
  100. tilt_layers: Tuple[float, ...],
  101. width: int = 4096,
  102. height: int = 2048,
  103. ) -> Optional[str]:
  104. if not samples:
  105. return None
  106. panorama = np.zeros((height, width, 3), dtype=np.uint8)
  107. pan_start, pan_end = pan_range
  108. for s in samples:
  109. frame = cv2.imread(s["thumbnail"])
  110. if frame is None:
  111. continue
  112. h, w = frame.shape[:2]
  113. pan = s["pan"]
  114. tilt = s["tilt"]
  115. u_center = int(((pan - pan_start) / (pan_end - pan_start)) * width)
  116. v_center = int(((90 - tilt) / 180) * height)
  117. y_start = max(0, v_center - h // 2)
  118. y_end = min(height, v_center + h // 2)
  119. x_start = max(0, u_center - w // 2)
  120. x_end = min(width, u_center + w // 2)
  121. paste_h = y_end - y_start
  122. paste_w = x_end - x_start
  123. if paste_h <= 0 or paste_w <= 0:
  124. continue
  125. roi = frame[(h - paste_h) // 2:(h - paste_h) // 2 + paste_h,
  126. (w - paste_w) // 2:(w - paste_w) // 2 + paste_w]
  127. panorama[y_start:y_end, x_start:x_end] = roi
  128. path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg")
  129. cv2.imwrite(path, panorama)
  130. return path