"""球机 360° 扫描与全景图生成.""" import logging import os import time from typing import Callable, Dict, List, Optional, Tuple import cv2 import numpy as np logger = logging.getLogger(__name__) from core.coord_utils import compute_sample_grid class SpatialScanner: def __init__( self, group_id: str, ptz_camera, ptz_frame_source: Callable[[], Optional[np.ndarray]], data_dir: str = "data", stabilize_time: float = 1.5, ): self.group_id = group_id self.ptz = ptz_camera self.get_ptz_frame = ptz_frame_source self.data_dir = os.path.join(data_dir, group_id) os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True) os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True) self.stabilize_time = stabilize_time self.progress = {"total": 0, "current": 0, "state": "idle"} self.cancelled = False def cancel(self): self.cancelled = True def run( self, pan_range: Tuple[float, float] = (0.0, 360.0), tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0), pan_step: float = 30.0, zoom: int = 1, progress_callback: Optional[Callable[[Dict], None]] = None, ) -> Dict: config = { "pan_range": pan_range, "tilt_layers": list(tilt_layers), "pan_step": pan_step, "zoom": zoom, } if self.cancelled: self.progress = {"total": 0, "current": 0, "state": "cancelled"} return { "group_id": self.group_id, "samples": [], "panorama_path": None, "config": config, } grid = compute_sample_grid(pan_range, tilt_layers, pan_step) self.progress = {"total": len(grid), "current": 0, "state": "scanning"} if progress_callback: progress_callback(dict(self.progress)) samples: List[Dict] = [] for idx, (pan, tilt) in enumerate(grid): if self.cancelled: break try: self.ptz.goto_exact_position(pan, tilt, zoom) time.sleep(self.stabilize_time) frame = self._wait_frame(timeout=5.0) if frame is None: logger.warning("No frame for pan=%s tilt=%s", pan, tilt) continue filename = f"p{pan:.1f}_t{tilt:.1f}.jpg" path = os.path.join(self.data_dir, "samples", filename) cv2.imwrite(path, frame) samples.append({ "id": idx + 1, "pan": pan, "tilt": tilt, "zoom": zoom, "thumbnail": path, }) self.progress["current"] = len(samples) if progress_callback: progress_callback(dict(self.progress)) except Exception as exc: logger.error("Error processing scan sample pan=%s tilt=%s: %s", pan, tilt, exc) continue panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers) self.progress["state"] = "cancelled" if self.cancelled else "done" return { "group_id": self.group_id, "samples": samples, "panorama_path": panorama_path, "config": config, } def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]: deadline = time.time() + timeout while time.time() < deadline: frame = self.get_ptz_frame() if frame is not None: return frame time.sleep(0.1) return None def _paste_direct( self, canvas: np.ndarray, patch: np.ndarray, u: int, v: int, width: int, height: int, ) -> None: """直接把图块贴到画布上(无融合),支持 360° 环绕。""" ph, pw = patch.shape[:2] x0 = int(round(u - pw / 2)) y0 = int(round(v - ph / 2)) xs = max(0, x0) xe = min(width, x0 + pw) ys = max(0, y0) ye = min(height, y0 + ph) pxs = xs - x0 pxe = pxs + (xe - xs) pys = ys - y0 pye = pys + (ye - ys) if xe <= xs or ye <= ys: return canvas[ys:ye, xs:xe] = patch[pys:pye, pxs:pxe] def _build_equirectangular( self, samples: List[Dict], pan_range: Tuple[float, float], tilt_layers: Tuple[float, ...], width: int = 4096, height: int = 2048, ) -> Optional[str]: if not samples: return None panorama = np.zeros((height, width, 3), dtype=np.uint8) pan_start, pan_end = pan_range for s in samples: frame = cv2.imread(s["thumbnail"]) if frame is None: continue fh, fw = frame.shape[:2] pan = s["pan"] tilt = s["tilt"] zoom = max(1, s.get("zoom", 1)) # 按 zoom 估算水平/垂直视场角(zoom 越大视场角越小) hfov = 55.0 / zoom vfov = hfov * (fh / fw) patch_w = max(1, int(hfov * width / 360.0)) patch_h = max(1, int(vfov * height / 180.0)) patch = cv2.resize(frame, (patch_w, patch_h), interpolation=cv2.INTER_AREA) # 仅裁剪相机 OSD 时间戳/水印边缘,避免重复文字破坏连续性 crop_top = int(patch_h * 0.08) crop_bottom = int(patch_h * 0.03) crop_left = int(patch_w * 0.03) crop_right = int(patch_w * 0.03) patch = patch[crop_top:patch_h - crop_bottom, crop_left:patch_w - crop_right] patch_h, patch_w = patch.shape[:2] u_center = int(((pan - pan_start) / (pan_end - pan_start)) * width) v_center = int(((90 - tilt) / 180) * height) self._paste_direct(panorama, patch, u_center, v_center, width, height) # 处理 0°/360° 接缝:左右两侧重复贴图 if u_center - patch_w // 2 < 0: self._paste_direct(panorama, patch, u_center + width, v_center, width, height) if u_center + patch_w // 2 > width: self._paste_direct(panorama, patch, u_center - width, v_center, width, height) path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg") cv2.imwrite(path, panorama) return path