|
|
@@ -1,5 +1,6 @@
|
|
|
"""球机 360° 扫描与全景图生成."""
|
|
|
import logging
|
|
|
+import math
|
|
|
import os
|
|
|
import time
|
|
|
from typing import Callable, Dict, List, Optional, Tuple
|
|
|
@@ -18,7 +19,7 @@ class SpatialScanner:
|
|
|
ptz_camera,
|
|
|
ptz_frame_source: Callable[[], Optional[np.ndarray]],
|
|
|
data_dir: str = "data",
|
|
|
- stabilize_time: float = 1.5,
|
|
|
+ stabilize_time: float = 3.0,
|
|
|
):
|
|
|
self.group_id = group_id
|
|
|
self.ptz = ptz_camera
|
|
|
@@ -62,14 +63,15 @@ class SpatialScanner:
|
|
|
if progress_callback:
|
|
|
progress_callback(dict(self.progress))
|
|
|
samples: List[Dict] = []
|
|
|
+ prev_pan, prev_tilt, prev_zoom = 0.0, 0.0, 1
|
|
|
|
|
|
for idx, (pan, tilt) in enumerate(grid):
|
|
|
if self.cancelled:
|
|
|
break
|
|
|
try:
|
|
|
- self.ptz.goto_exact_position(pan, tilt, zoom)
|
|
|
- time.sleep(self.stabilize_time)
|
|
|
- frame = self._wait_frame(timeout=5.0)
|
|
|
+ frame = self._capture_at_position(
|
|
|
+ pan, tilt, zoom, prev_pan, prev_tilt, prev_zoom
|
|
|
+ )
|
|
|
if frame is None:
|
|
|
logger.warning("No frame for pan=%s tilt=%s", pan, tilt)
|
|
|
continue
|
|
|
@@ -83,6 +85,7 @@ class SpatialScanner:
|
|
|
"zoom": zoom,
|
|
|
"thumbnail": path,
|
|
|
})
|
|
|
+ prev_pan, prev_tilt, prev_zoom = pan, tilt, zoom
|
|
|
self.progress["current"] = len(samples)
|
|
|
if progress_callback:
|
|
|
progress_callback(dict(self.progress))
|
|
|
@@ -108,6 +111,71 @@ class SpatialScanner:
|
|
|
time.sleep(0.1)
|
|
|
return None
|
|
|
|
|
|
+ def _drain_frames(self, count: int = 8, interval: float = 0.1) -> Optional[np.ndarray]:
|
|
|
+ """排空旧帧缓冲,返回最后一帧."""
|
|
|
+ last_frame = None
|
|
|
+ for _ in range(count):
|
|
|
+ frame = self.get_ptz_frame()
|
|
|
+ if frame is not None:
|
|
|
+ last_frame = frame
|
|
|
+ time.sleep(interval)
|
|
|
+ return last_frame
|
|
|
+
|
|
|
+ def _capture_at_position(
|
|
|
+ self,
|
|
|
+ pan: float,
|
|
|
+ tilt: float,
|
|
|
+ zoom: int,
|
|
|
+ prev_pan: float,
|
|
|
+ prev_tilt: float,
|
|
|
+ prev_zoom: int,
|
|
|
+ ) -> Optional[np.ndarray]:
|
|
|
+ """移动 PTZ 到目标位置并等待稳定后抓取一帧。
|
|
|
+
|
|
|
+ 根据与上一位置的距离动态计算等待时间,并排空 RTSP 缓冲。
|
|
|
+ """
|
|
|
+ pan_diff = abs(((pan - prev_pan + 180) % 360) - 180)
|
|
|
+ tilt_diff = abs(tilt - prev_tilt)
|
|
|
+ zoom_diff = abs(zoom - prev_zoom)
|
|
|
+ move_distance = math.sqrt(pan_diff ** 2 + tilt_diff ** 2 + zoom_diff ** 2 * 100)
|
|
|
+
|
|
|
+ # 云台典型速度:pan ~90°/s, tilt ~60°/s,按最远距离留 1.5 倍余量
|
|
|
+ base_wait = 1.0
|
|
|
+ per_degree_wait = 0.04
|
|
|
+ min_wait = self.stabilize_time
|
|
|
+ max_wait = 6.0
|
|
|
+ stabilize = min(max_wait, max(min_wait, base_wait + move_distance * per_degree_wait))
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ "[SpatialScanner] move from (%.1f, %.1f, %d) to (%.1f, %.1f, %d), "
|
|
|
+ "distance=%.1f, stabilize=%.2fs",
|
|
|
+ prev_pan, prev_tilt, prev_zoom, pan, tilt, zoom, move_distance, stabilize,
|
|
|
+ )
|
|
|
+
|
|
|
+ t0 = time.time()
|
|
|
+ if not self.ptz.goto_exact_position(pan, tilt, zoom):
|
|
|
+ logger.warning("[SpatialScanner] goto_exact_position failed for (%.1f, %.1f, %d)", pan, tilt, zoom)
|
|
|
+
|
|
|
+ # 等待云台物理到位
|
|
|
+ elapsed = time.time() - t0
|
|
|
+ if elapsed < stabilize:
|
|
|
+ time.sleep(stabilize - elapsed)
|
|
|
+
|
|
|
+ # 排空旧帧:根据移动距离增加排空帧数,确保拿到新位置图像
|
|
|
+ drain_count = max(12, int(move_distance * 0.5))
|
|
|
+ drain_count = min(40, drain_count)
|
|
|
+ frame = self._drain_frames(count=drain_count, interval=0.15)
|
|
|
+
|
|
|
+ # 如果 still 为空,再等待并尝试
|
|
|
+ if frame is None:
|
|
|
+ frame = self._wait_frame(timeout=5.0)
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ "[SpatialScanner] captured frame for (%.1f, %.1f, %d) after %.2fs",
|
|
|
+ pan, tilt, zoom, time.time() - t0,
|
|
|
+ )
|
|
|
+ return frame
|
|
|
+
|
|
|
def _paste_direct(
|
|
|
self,
|
|
|
canvas: np.ndarray,
|