| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- """球机 360° 扫描与全景图生成."""
- import logging
- import os
- import time
- from typing import Callable, Dict, List, Optional, Tuple
- import cv2
- import numpy as np
- logger = logging.getLogger(__name__)
- from core.coord_utils import compute_sample_grid
- class SpatialScanner:
- def __init__(
- self,
- group_id: str,
- ptz_camera,
- ptz_frame_source: Callable[[], Optional[np.ndarray]],
- data_dir: str = "data",
- stabilize_time: float = 1.5,
- ):
- self.group_id = group_id
- self.ptz = ptz_camera
- self.get_ptz_frame = ptz_frame_source
- self.data_dir = os.path.join(data_dir, group_id)
- os.makedirs(os.path.join(self.data_dir, "samples"), exist_ok=True)
- os.makedirs(os.path.join(self.data_dir, "panorama"), exist_ok=True)
- self.stabilize_time = stabilize_time
- self.progress = {"total": 0, "current": 0, "state": "idle"}
- self.cancelled = False
- def cancel(self):
- self.cancelled = True
- def run(
- self,
- pan_range: Tuple[float, float] = (0.0, 360.0),
- tilt_layers: Tuple[float, ...] = (-20.0, 0.0, 20.0),
- pan_step: float = 30.0,
- zoom: int = 1,
- progress_callback: Optional[Callable[[Dict], None]] = None,
- ) -> Dict:
- config = {
- "pan_range": pan_range,
- "tilt_layers": list(tilt_layers),
- "pan_step": pan_step,
- "zoom": zoom,
- }
- if self.cancelled:
- self.progress = {"total": 0, "current": 0, "state": "cancelled"}
- return {
- "group_id": self.group_id,
- "samples": [],
- "panorama_path": None,
- "config": config,
- }
- grid = compute_sample_grid(pan_range, tilt_layers, pan_step)
- self.progress = {"total": len(grid), "current": 0, "state": "scanning"}
- if progress_callback:
- progress_callback(dict(self.progress))
- samples: List[Dict] = []
- for idx, (pan, tilt) in enumerate(grid):
- if self.cancelled:
- break
- try:
- self.ptz.goto_exact_position(pan, tilt, zoom)
- time.sleep(self.stabilize_time)
- frame = self._wait_frame(timeout=5.0)
- if frame is None:
- logger.warning("No frame for pan=%s tilt=%s", pan, tilt)
- continue
- filename = f"p{pan:.1f}_t{tilt:.1f}.jpg"
- path = os.path.join(self.data_dir, "samples", filename)
- cv2.imwrite(path, frame)
- samples.append({
- "id": idx + 1,
- "pan": pan,
- "tilt": tilt,
- "zoom": zoom,
- "thumbnail": path,
- })
- self.progress["current"] = len(samples)
- if progress_callback:
- progress_callback(dict(self.progress))
- except Exception as exc:
- logger.error("Error processing scan sample pan=%s tilt=%s: %s", pan, tilt, exc)
- continue
- panorama_path = self._build_equirectangular(samples, pan_range, tilt_layers)
- self.progress["state"] = "cancelled" if self.cancelled else "done"
- return {
- "group_id": self.group_id,
- "samples": samples,
- "panorama_path": panorama_path,
- "config": config,
- }
- def _wait_frame(self, timeout: float = 5.0) -> Optional[np.ndarray]:
- deadline = time.time() + timeout
- while time.time() < deadline:
- frame = self.get_ptz_frame()
- if frame is not None:
- return frame
- time.sleep(0.1)
- return None
- def _paste_direct(
- self,
- canvas: np.ndarray,
- patch: np.ndarray,
- u: int,
- v: int,
- width: int,
- height: int,
- ) -> None:
- """直接把图块贴到画布上(无融合),支持 360° 环绕。"""
- ph, pw = patch.shape[:2]
- x0 = int(round(u - pw / 2))
- y0 = int(round(v - ph / 2))
- xs = max(0, x0)
- xe = min(width, x0 + pw)
- ys = max(0, y0)
- ye = min(height, y0 + ph)
- pxs = xs - x0
- pxe = pxs + (xe - xs)
- pys = ys - y0
- pye = pys + (ye - ys)
- if xe <= xs or ye <= ys:
- return
- canvas[ys:ye, xs:xe] = patch[pys:pye, pxs:pxe]
- def _build_equirectangular(
- self,
- samples: List[Dict],
- pan_range: Tuple[float, float],
- tilt_layers: Tuple[float, ...],
- width: int = 4096,
- height: int = 2048,
- ) -> Optional[str]:
- if not samples:
- return None
- panorama = np.zeros((height, width, 3), dtype=np.uint8)
- pan_start, pan_end = pan_range
- for s in samples:
- frame = cv2.imread(s["thumbnail"])
- if frame is None:
- continue
- fh, fw = frame.shape[:2]
- pan = s["pan"]
- tilt = s["tilt"]
- zoom = max(1, s.get("zoom", 1))
- # 按 zoom 估算水平/垂直视场角(zoom 越大视场角越小)
- hfov = 55.0 / zoom
- vfov = hfov * (fh / fw)
- patch_w = max(1, int(hfov * width / 360.0))
- patch_h = max(1, int(vfov * height / 180.0))
- patch = cv2.resize(frame, (patch_w, patch_h), interpolation=cv2.INTER_AREA)
- # 仅裁剪相机 OSD 时间戳/水印边缘,避免重复文字破坏连续性
- crop_top = int(patch_h * 0.08)
- crop_bottom = int(patch_h * 0.03)
- crop_left = int(patch_w * 0.03)
- crop_right = int(patch_w * 0.03)
- patch = patch[crop_top:patch_h - crop_bottom, crop_left:patch_w - crop_right]
- patch_h, patch_w = patch.shape[:2]
- u_center = int(((pan - pan_start) / (pan_end - pan_start)) * width)
- v_center = int(((90 - tilt) / 180) * height)
- self._paste_direct(panorama, patch, u_center, v_center, width, height)
- # 处理 0°/360° 接缝:左右两侧重复贴图
- if u_center - patch_w // 2 < 0:
- self._paste_direct(panorama, patch, u_center + width, v_center, width, height)
- if u_center + patch_w // 2 > width:
- self._paste_direct(panorama, patch, u_center - width, v_center, width, height)
- path = os.path.join(self.data_dir, "panorama", f"scan_{int(time.time())}.jpg")
- cv2.imwrite(path, panorama)
- return path
|