"""检测到人后的保存 + 上传.""" import logging import os import threading import time import uuid from typing import Any, Callable, Dict, List, Optional import cv2 import numpy as np from config.device import DEVICE_CONFIG from core.oss_uploader import OSSUploader logger = logging.getLogger(__name__) class CaptureUploader: def __init__( self, group_id: str, save_dir: str = "data/captures", upload_callback: Optional[Callable[[Dict], None]] = None, oss_uploader: Optional[OSSUploader] = None, dedup_seconds: float = 5.0, ): self.group_id = group_id self.save_dir = os.path.join(save_dir, group_id) os.makedirs(self.save_dir, exist_ok=True) self.upload_callback = upload_callback self.oss_uploader = oss_uploader self.dedup_seconds = dedup_seconds self._last_uploads: List[Dict[str, Any]] = [] self._lock = threading.Lock() self._counter = 0 def _should_upload(self, bbox: List[float]) -> bool: cx = (bbox[0] + bbox[2]) / 2 cy = (bbox[1] + bbox[3]) / 2 for u in self._last_uploads: dx = abs(u["cx"] - cx) dy = abs(u["cy"] - cy) if dx < 50 and dy < 50: return False return True def _validate_inputs( self, frame: np.ndarray, detections: List[Dict], ) -> None: if not isinstance(frame, np.ndarray): raise ValueError("frame must be a numpy ndarray") if frame.ndim != 3 or frame.shape[2] != 3: raise ValueError("frame must have shape (H, W, 3)") if frame.dtype != np.uint8: raise ValueError("frame must have dtype uint8") for i, det in enumerate(detections): if not isinstance(det, dict): raise ValueError(f"detection {i} must be a dict") if "bbox" not in det: raise ValueError(f"detection {i} missing bbox") bbox = det["bbox"] if not isinstance(bbox, (list, tuple)) or len(bbox) != 4: raise ValueError(f"detection {i} bbox must be a list/tuple of 4 numbers") try: [float(v) for v in bbox] except (TypeError, ValueError): raise ValueError(f"detection {i} bbox must contain numbers") if "confidence" not in det: raise ValueError(f"detection {i} missing confidence") try: float(det["confidence"]) except (TypeError, ValueError): raise ValueError(f"detection {i} confidence must be a number") def handle_detection( self, camera_type: str, frame: np.ndarray, detections: List[Dict], ) -> List[Dict]: self._validate_inputs(frame, detections) if not detections: return [] with self._lock: now = time.monotonic() self._last_uploads = [ u for u in self._last_uploads if now - u["time"] < self.dedup_seconds ] upload_decisions = [ (det, self._should_upload(det["bbox"])) for det in detections ] to_upload = [det for det, should in upload_decisions if should] if not to_upload: logger.debug("All detections deduplicated; skipping file writes") return [] self._counter += 1 counter = self._counter ts = int(time.time() * 1000) original_path = os.path.join( self.save_dir, f"{ts}_{counter}_original.jpg" ) marked_path = os.path.join( self.save_dir, f"{ts}_{counter}_marked.jpg" ) for det in to_upload: self._last_uploads.append({ "cx": (det["bbox"][0] + det["bbox"][2]) / 2, "cy": (det["bbox"][1] + det["bbox"][3]) / 2, "time": time.monotonic(), }) logger.info("Saving original image to %s", original_path) if not cv2.imwrite(original_path, frame): raise RuntimeError(f"Failed to write {original_path}") marked = frame.copy() for det in to_upload: x1, y1, x2, y2 = map(int, det["bbox"]) cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(marked, f"{det['confidence']:.2f}", (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) logger.info("Saving marked image to %s", marked_path) if not cv2.imwrite(marked_path, marked): raise RuntimeError(f"Failed to write {marked_path}") results: List[Dict] = [] for det in to_upload: x1, y1, x2, y2 = map(int, det["bbox"]) payload = { "group_id": self.group_id, "camera_type": camera_type, "timestamp": ts, "original": original_path, "marked": marked_path, "bbox": [x1, y1, x2, y2], "confidence": det["confidence"], } results.append(payload) image_urls = {} if self.oss_uploader is not None and self.oss_uploader.enabled: image_urls = self.oss_uploader.upload_pair(original_path, marked_path, prefix=camera_type) logger.info("OSS URLs: %s", image_urls) if self.upload_callback and results: batch_info = { "batch_id": str(uuid.uuid4()), "device_id": DEVICE_CONFIG.get("device_id", "unknown"), "project_id": DEVICE_CONFIG.get("project_id", ""), "timestamp": ts, "camera_type": camera_type, "image_paths": [original_path, marked_path], "image_urls": image_urls, "detections": [ {"bbox": det["bbox"], "confidence": det["confidence"]} for det in to_upload ], } logger.info("Uploading batch info") try: self.upload_callback(batch_info) except Exception as exc: logger.warning("Upload callback failed: %s", exc) return results