| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- """球机 360° 扫描与全景图生成."""
- import logging
- import math
- import os
- import time
- from typing import Callable, Dict, List, Optional, Tuple
- import cv2
- import numpy as np
- logger = logging.getLogger(__name__)
- from core.coord_utils import compute_sample_grid
class SpatialScanner:
    """Sweep a PTZ camera over a pan/tilt grid and assemble a panorama image.

    Every grid position is captured to ``<data_dir>/<group_id>/samples`` and
    the captured frames are then pasted (without blending) onto one canvas
    that is saved under ``<data_dir>/<group_id>/panorama``.
    """

    # Horizontal field of view (degrees) assumed at zoom 1; it is divided by
    # the zoom factor when sizing panorama patches.
    _BASE_HFOV_DEG = 55.0

    def __init__(
        self,
        group_id: str,
        ptz_camera,
        ptz_frame_source: Callable[[], Optional[np.ndarray]],
        data_dir: str = "data",
        stabilize_time: float = 3.0,
    ):
        """Create the scanner and its output directories.

        Args:
            group_id: Identifier used as the sub-directory name under
                ``data_dir`` and echoed back in the result of :meth:`run`.
            ptz_camera: Object exposing
                ``goto_exact_position(pan, tilt, zoom)`` whose truthy return
                value signals success.
            ptz_frame_source: Callable returning the most recent frame, or
                ``None`` when no frame is available.
            data_dir: Root directory for samples and panoramas.
            stabilize_time: Minimum number of seconds to wait after each move
                (the effective wait is still capped at 6 seconds).
        """
        self.group_id = group_id
        self.ptz = ptz_camera
        self.get_ptz_frame = ptz_frame_source
        self.data_dir = os.path.join(data_dir, group_id)
        os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True)
        os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True)
        self.stabilize_time = stabilize_time
        self.progress = {"total": 0, "current": 0, "state": "idle"}
        self.cancelled = False

    def cancel(self):
        """Ask :meth:`run` to stop before its next grid position.

        The flag is never cleared by this class, so once cancelled every
        later call to :meth:`run` returns immediately.
        """
        self.cancelled = True

    def run(
        self,
        pan_range: Tuple[float, float] = (0.0, 360.0),
        tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0),
        pan_step: float = 30.0,
        zoom: int = 1,
        progress_callback: Optional[Callable[[Dict], None]] = None,
    ) -> Dict:
        """Scan the whole grid, save one image per position, build a panorama.

        Args:
            pan_range: ``(start, end)`` pan angles in degrees.
            tilt_layers: Tilt angles in degrees, one per scan row.
            pan_step: Pan increment in degrees handed to
                ``compute_sample_grid``.
            zoom: Zoom value used for every position.
            progress_callback: Optional callable that receives a copy of
                ``self.progress`` when the scan starts, after every stored
                sample, and once more with the terminal state.

        Returns:
            Dict with ``group_id``, ``samples`` (one dict per stored frame),
            ``panorama_path`` (``None`` when no panorama was written) and
            ``config`` (the scan parameters).
        """
        config = {
            "pan_range": pan_range,
            "tilt_layers": list(tilt_layers),
            "pan_step": pan_step,
            "zoom": zoom,
        }

        if self.cancelled:
            self.progress = {"total": 0, "current": 0, "state": "cancelled"}
            # Let observers see the terminal state even when nothing ran.
            if progress_callback:
                progress_callback(dict(self.progress))
            return {
                "group_id": self.group_id,
                "samples": [],
                "panorama_path": None,
                "config": config,
            }

        grid = compute_sample_grid(pan_range, tilt_layers, pan_step)
        self.progress = {"total": len(grid), "current": 0, "state": "scanning"}
        if progress_callback:
            progress_callback(dict(self.progress))

        samples: List[Dict] = []
        # The real starting pose is unknown; assume the home position.
        prev_pan, prev_tilt, prev_zoom = 0.0, 0.0, 1

        for idx, (pan, tilt) in enumerate(grid):
            if self.cancelled:
                break
            try:
                frame = self._capture_at_position(
                    pan, tilt, zoom, prev_pan, prev_tilt, prev_zoom
                )
                # The move was commanded whether or not a frame arrived, so
                # the next settle time must be measured from this position.
                prev_pan, prev_tilt, prev_zoom = pan, tilt, zoom

                if frame is None:
                    logger.warning("No frame for pan=%s tilt=%s", pan, tilt)
                    continue

                filename = f"p{pan:.1f}_t{tilt:.1f}.jpg"
                path = os.path.join(self.data_dir, "samples", filename)
                # imwrite reports failure through its return value; do not
                # record a sample whose image never reached the disk.
                if not cv2.imwrite(path, frame):
                    logger.warning("Failed to write sample image %s", path)
                    continue

                samples.append({
                    "id": idx + 1,
                    "pan": pan,
                    "tilt": tilt,
                    "zoom": zoom,
                    "thumbnail": path,
                })
                self.progress["current"] = len(samples)
                if progress_callback:
                    progress_callback(dict(self.progress))
            except Exception as exc:
                # Best effort: one bad position must not abort the scan.
                logger.error(
                    "Error processing scan sample pan=%s tilt=%s: %s",
                    pan, tilt, exc, exc_info=True,
                )
                continue

        panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers)
        self.progress["state"] = "cancelled" if self.cancelled else "done"
        if progress_callback:
            try:
                progress_callback(dict(self.progress))
            except Exception:
                # A failing observer must not discard a finished scan.
                logger.exception("Progress callback failed on final scan state")

        return {
            "group_id": self.group_id,
            "samples": samples,
            "panorama_path": panorama_path,
            "config": config,
        }

    def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]:
        """Poll the frame source until a frame arrives or ``timeout`` elapses.

        The source is always polled at least once, even for ``timeout <= 0``.
        Returns the frame, or ``None`` on timeout.
        """
        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout
        while True:
            frame = self.get_ptz_frame()
            if frame is not None:
                return frame
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            time.sleep(min(0.1, remaining))

    def _drain_frames(self, count: int = 8, interval: float = 0.1) -> Optional[np.ndarray]:
        """Read ``count`` frames to flush stale buffered ones; return the last.

        Sleeps ``interval`` seconds between reads (not after the final one, so
        the returned frame is as fresh as possible). Returns ``None`` when
        every read yielded ``None``.
        """
        last_frame = None
        for reads_left in range(count - 1, -1, -1):
            frame = self.get_ptz_frame()
            if frame is not None:
                last_frame = frame
            if reads_left:
                time.sleep(interval)
        return last_frame

    def _capture_at_position(
        self,
        pan: float,
        tilt: float,
        zoom: int,
        prev_pan: float,
        prev_tilt: float,
        prev_zoom: int,
    ) -> Optional[np.ndarray]:
        """Move the PTZ to the target pose, wait for it to settle, grab a frame.

        The settle time and the number of drained frames both grow with the
        distance from the previous pose, so that stale frames buffered by the
        stream are discarded. Returns the frame, or ``None`` if none arrived.
        """
        # Shortest angular distance around the circle, in [0, 180].
        pan_diff = abs(((pan - prev_pan + 180) % 360) - 180)
        tilt_diff = abs(tilt - prev_tilt)
        zoom_diff = abs(zoom - prev_zoom)
        # One zoom step is weighted like 10 degrees of motion (10**2 == 100).
        move_distance = math.sqrt(pan_diff ** 2 + tilt_diff ** 2 + zoom_diff ** 2 * 100)

        # Original author's note: typical gimbal speed is pan ~90 deg/s and
        # tilt ~60 deg/s; allow a 1.5x margin for the longest move.
        base_wait = 1.0
        per_degree_wait = 0.04
        min_wait = self.stabilize_time
        max_wait = 6.0
        stabilize = min(max_wait, max(min_wait, base_wait + move_distance * per_degree_wait))

        logger.info(
            "[SpatialScanner] move from (%.1f, %.1f, %d) to (%.1f, %.1f, %d), "
            "distance=%.1f, stabilize=%.2fs",
            prev_pan, prev_tilt, prev_zoom, pan, tilt, zoom, move_distance, stabilize,
        )

        t0 = time.monotonic()
        if not self.ptz.goto_exact_position(pan, tilt, zoom):
            logger.warning("[SpatialScanner] goto_exact_position failed for (%.1f, %.1f, %d)", pan, tilt, zoom)

        # Wait for the gimbal to physically arrive; time already spent inside
        # goto_exact_position counts toward the settle time.
        remaining = stabilize - (time.monotonic() - t0)
        if remaining > 0:
            time.sleep(remaining)

        # Drain old frames: the longer the move, the more frames are dropped
        # (between 12 and 40) so the image really shows the new pose.
        drain_count = min(40, max(12, int(move_distance * 0.5)))
        frame = self._drain_frames(count=drain_count, interval=0.15)

        # Still nothing: give the stream one more chance.
        if frame is None:
            frame = self._wait_frame(timeout=5.0)

        if frame is not None:
            logger.info(
                "[SpatialScanner] captured frame for (%.1f, %.1f, %d) after %.2fs",
                pan, tilt, zoom, time.monotonic() - t0,
            )
        return frame

    def _paste_direct(
        self,
        canvas: np.ndarray,
        patch: np.ndarray,
        u: int,
        v: int,
        width: int,
        height: int,
    ) -> None:
        """Copy ``patch`` onto ``canvas`` centred at ``(u, v)``, no blending.

        The part of the patch falling outside ``[0, width) x [0, height)`` is
        clipped. This method does not wrap around the seam itself; callers
        get wrap-around by calling it again with ``u`` shifted by ``width``.
        """
        ph, pw = patch.shape[:2]
        x0 = int(round(u - pw / 2))
        y0 = int(round(v - ph / 2))

        # Destination rectangle clipped to the canvas.
        xs, xe = max(0, x0), min(width, x0 + pw)
        ys, ye = max(0, y0), min(height, y0 + ph)
        if xe <= xs or ye <= ys:
            return  # entirely off-canvas

        # Matching source rectangle inside the patch.
        pxs, pys = xs - x0, ys - y0
        canvas[ys:ye, xs:xe] = patch[pys:pys + (ye - ys), pxs:pxs + (xe - xs)]

    def _build_equirectangular(
        self,
        samples: List[Dict],
        pan_range: Tuple[float, float],
        tilt_layers: Tuple[float, ...],
        width: int = 4096,
        height: int = 2048,
    ) -> Optional[str]:
        """Paste every stored sample onto one canvas and save it as a JPEG.

        Args:
            samples: Sample dicts produced by :meth:`run` (``thumbnail``,
                ``pan``, ``tilt`` and optionally ``zoom`` are read).
            pan_range: ``(start, end)`` pan angles mapped onto the canvas
                width.
            tilt_layers: Unused; kept for interface compatibility.
            width: Canvas width in pixels.
            height: Canvas height in pixels (spans tilt +90 to -90).

        Returns:
            Path of the written panorama, or ``None`` when there is nothing
            to paste, the pan range is empty, or the file cannot be written.
        """
        if not samples:
            return None

        pan_start, pan_end = pan_range
        pan_span = pan_end - pan_start
        if pan_span == 0:
            # The pan -> column mapping would divide by zero.
            logger.warning("Cannot build panorama for empty pan range %s", pan_range)
            return None
        # Left/right seam duplication only makes sense when the canvas width
        # represents a full turn.
        full_circle = abs(pan_span) >= 360.0 - 1e-6

        panorama = np.zeros((height, width, 3), dtype=np.uint8)
        pasted = 0

        for s in samples:
            frame = cv2.imread(s["thumbnail"])
            if frame is None:
                continue
            fh, fw = frame.shape[:2]
            zoom = max(1, s.get("zoom", 1))

            # Estimate the field of view from zoom (more zoom, narrower view).
            hfov = self._BASE_HFOV_DEG / zoom
            vfov = hfov * (fh / fw)
            patch_w = max(1, int(hfov * width / 360.0))
            patch_h = max(1, int(vfov * height / 180.0))
            patch = cv2.resize(frame, (patch_w, patch_h), interpolation=cv2.INTER_AREA)

            # Trim only the borders carrying the camera OSD timestamp or
            # watermark so repeated text does not break visual continuity.
            crop_top = int(patch_h * 0.08)
            crop_bottom = int(patch_h * 0.03)
            crop_left = int(patch_w * 0.03)
            crop_right = int(patch_w * 0.03)
            patch = patch[crop_top:patch_h - crop_bottom, crop_left:patch_w - crop_right]
            patch_h, patch_w = patch.shape[:2]

            u_center = int(((s["pan"] - pan_start) / pan_span) * width)
            v_center = int(((90 - s["tilt"]) / 180) * height)
            self._paste_direct(panorama, patch, u_center, v_center, width, height)

            # 0/360 degree seam: repeat the patch on the opposite edge.
            if full_circle:
                if u_center - patch_w // 2 < 0:
                    self._paste_direct(panorama, patch, u_center + width, v_center, width, height)
                if u_center + patch_w // 2 > width:
                    self._paste_direct(panorama, patch, u_center - width, v_center, width, height)
            pasted += 1

        if not pasted:
            logger.warning("No sample image could be read; panorama not written")
            return None

        path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg")
        if not cv2.imwrite(path, panorama):
            logger.warning("Failed to write panorama image %s", path)
            return None
        return path
|