| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- """检测到人后的保存 + 上传."""
- import logging
- import os
- import threading
- import time
- import uuid
- from typing import Any, Callable, Dict, List, Optional
- import cv2
- import numpy as np
- from config.device import DEVICE_CONFIG
- logger = logging.getLogger(__name__)
- class CaptureUploader:
- def __init__(
- self,
- group_id: str,
- save_dir: str = "data/captures",
- upload_callback: Optional[Callable[[Dict], None]] = None,
- dedup_seconds: float = 5.0,
- ):
- self.group_id = group_id
- self.save_dir = os.path.join(save_dir, group_id)
- os.makedirs(self.save_dir, exist_ok=True)
- self.upload_callback = upload_callback
- self.dedup_seconds = dedup_seconds
- self._last_uploads: List[Dict[str, Any]] = []
- self._lock = threading.Lock()
- self._counter = 0
- def _should_upload(self, camera_type: str, bbox: List[float]) -> bool:
- cx = (bbox[0] + bbox[2]) / 2
- cy = (bbox[1] + bbox[3]) / 2
- for u in self._last_uploads:
- if u["camera_type"] != camera_type:
- continue
- dx = abs(u["cx"] - cx)
- dy = abs(u["cy"] - cy)
- if dx < 50 and dy < 50:
- return False
- return True
- def _validate_inputs(
- self,
- camera_type: str,
- frame: np.ndarray,
- detections: List[Dict],
- ) -> None:
- if camera_type not in ("panorama", "ptz"):
- raise ValueError("camera_type must be 'panorama' or 'ptz'")
- if not isinstance(frame, np.ndarray):
- raise ValueError("frame must be a numpy ndarray")
- if frame.ndim != 3 or frame.shape[2] != 3:
- raise ValueError("frame must have shape (H, W, 3)")
- if frame.dtype != np.uint8:
- raise ValueError("frame must have dtype uint8")
- for i, det in enumerate(detections):
- if not isinstance(det, dict):
- raise ValueError(f"detection {i} must be a dict")
- if "bbox" not in det:
- raise ValueError(f"detection {i} missing bbox")
- bbox = det["bbox"]
- if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
- raise ValueError(f"detection {i} bbox must be a list/tuple of 4 numbers")
- try:
- [float(v) for v in bbox]
- except (TypeError, ValueError):
- raise ValueError(f"detection {i} bbox must contain numbers")
- if "confidence" not in det:
- raise ValueError(f"detection {i} missing confidence")
- try:
- float(det["confidence"])
- except (TypeError, ValueError):
- raise ValueError(f"detection {i} confidence must be a number")
- def handle_detection(
- self,
- camera_type: str, # 'panorama' or 'ptz'
- frame: np.ndarray,
- detections: List[Dict],
- ptz_position: Optional[Dict] = None,
- ) -> List[Dict]:
- self._validate_inputs(camera_type, frame, detections)
- if not detections:
- return []
- with self._lock:
- now = time.monotonic()
- self._last_uploads = [
- u for u in self._last_uploads
- if now - u["time"] < self.dedup_seconds
- ]
- upload_decisions = [
- (det, self._should_upload(camera_type, det["bbox"]))
- for det in detections
- ]
- to_upload = [det for det, should in upload_decisions if should]
- if not to_upload:
- logger.debug("All detections deduplicated; skipping file writes for %s", camera_type)
- return []
- self._counter += 1
- counter = self._counter
- ts = int(time.time() * 1000)
- original_path = os.path.join(
- self.save_dir, f"{camera_type}_{ts}_{counter}_original.jpg"
- )
- marked_path = os.path.join(
- self.save_dir, f"{camera_type}_{ts}_{counter}_marked.jpg"
- )
- # Reserve dedup slots and generate paths under lock.
- for det in to_upload:
- self._last_uploads.append({
- "camera_type": camera_type,
- "cx": (det["bbox"][0] + det["bbox"][2]) / 2,
- "cy": (det["bbox"][1] + det["bbox"][3]) / 2,
- "time": time.monotonic(),
- })
- # File I/O and user callback run outside the lock.
- logger.info("Saving original image to %s", original_path)
- if not cv2.imwrite(original_path, frame):
- raise RuntimeError(f"Failed to write {original_path}")
- marked = frame.copy()
- for det in to_upload:
- x1, y1, x2, y2 = map(int, det["bbox"])
- cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
- cv2.putText(marked, f"{det['confidence']:.2f}", (x1, y1 - 5),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
- logger.info("Saving marked image to %s", marked_path)
- if not cv2.imwrite(marked_path, marked):
- raise RuntimeError(f"Failed to write {marked_path}")
- results: List[Dict] = []
- for det in to_upload:
- x1, y1, x2, y2 = map(int, det["bbox"])
- payload = {
- "group_id": self.group_id,
- "camera_type": camera_type,
- "timestamp": ts,
- "original": original_path,
- "marked": marked_path,
- "bbox": [x1, y1, x2, y2],
- "confidence": det["confidence"],
- "ptz_position": ptz_position,
- }
- results.append(payload)
- if self.upload_callback and results:
- batch_info = {
- "batch_id": str(uuid.uuid4()),
- "device_id": DEVICE_CONFIG.get("device_id", "unknown"),
- "project_id": DEVICE_CONFIG.get("project_id", ""),
- "timestamp": ts,
- "image_paths": [original_path, marked_path],
- "detections": [
- {"bbox": det["bbox"], "confidence": det["confidence"], "camera_type": camera_type}
- for det in to_upload
- ],
- "ptz_position": ptz_position,
- }
- logger.info("Uploading batch info for %s", camera_type)
- try:
- self.upload_callback(batch_info)
- except Exception as exc: # noqa: BLE001
- logger.warning("Upload callback failed: %s", exc)
- return results
|