capture_uploader.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. """检测到人后的保存 + 上传."""
  2. import logging
  3. import os
  4. import threading
  5. import time
  6. import uuid
  7. from typing import Any, Callable, Dict, List, Optional
  8. import cv2
  9. import numpy as np
  10. from config.device import DEVICE_CONFIG
  11. logger = logging.getLogger(__name__)
  12. class CaptureUploader:
  13. def __init__(
  14. self,
  15. group_id: str,
  16. save_dir: str = "data/captures",
  17. upload_callback: Optional[Callable[[Dict], None]] = None,
  18. dedup_seconds: float = 5.0,
  19. ):
  20. self.group_id = group_id
  21. self.save_dir = os.path.join(save_dir, group_id)
  22. os.makedirs(self.save_dir, exist_ok=True)
  23. self.upload_callback = upload_callback
  24. self.dedup_seconds = dedup_seconds
  25. self._last_uploads: List[Dict[str, Any]] = []
  26. self._lock = threading.Lock()
  27. self._counter = 0
  28. def _should_upload(self, camera_type: str, bbox: List[float]) -> bool:
  29. cx = (bbox[0] + bbox[2]) / 2
  30. cy = (bbox[1] + bbox[3]) / 2
  31. for u in self._last_uploads:
  32. if u["camera_type"] != camera_type:
  33. continue
  34. dx = abs(u["cx"] - cx)
  35. dy = abs(u["cy"] - cy)
  36. if dx < 50 and dy < 50:
  37. return False
  38. return True
  39. def _validate_inputs(
  40. self,
  41. camera_type: str,
  42. frame: np.ndarray,
  43. detections: List[Dict],
  44. ) -> None:
  45. if camera_type not in ("panorama", "ptz"):
  46. raise ValueError("camera_type must be 'panorama' or 'ptz'")
  47. if not isinstance(frame, np.ndarray):
  48. raise ValueError("frame must be a numpy ndarray")
  49. if frame.ndim != 3 or frame.shape[2] != 3:
  50. raise ValueError("frame must have shape (H, W, 3)")
  51. if frame.dtype != np.uint8:
  52. raise ValueError("frame must have dtype uint8")
  53. for i, det in enumerate(detections):
  54. if not isinstance(det, dict):
  55. raise ValueError(f"detection {i} must be a dict")
  56. if "bbox" not in det:
  57. raise ValueError(f"detection {i} missing bbox")
  58. bbox = det["bbox"]
  59. if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
  60. raise ValueError(f"detection {i} bbox must be a list/tuple of 4 numbers")
  61. try:
  62. [float(v) for v in bbox]
  63. except (TypeError, ValueError):
  64. raise ValueError(f"detection {i} bbox must contain numbers")
  65. if "confidence" not in det:
  66. raise ValueError(f"detection {i} missing confidence")
  67. try:
  68. float(det["confidence"])
  69. except (TypeError, ValueError):
  70. raise ValueError(f"detection {i} confidence must be a number")
  71. def handle_detection(
  72. self,
  73. camera_type: str, # 'panorama' or 'ptz'
  74. frame: np.ndarray,
  75. detections: List[Dict],
  76. ptz_position: Optional[Dict] = None,
  77. ) -> List[Dict]:
  78. self._validate_inputs(camera_type, frame, detections)
  79. if not detections:
  80. return []
  81. with self._lock:
  82. now = time.monotonic()
  83. self._last_uploads = [
  84. u for u in self._last_uploads
  85. if now - u["time"] < self.dedup_seconds
  86. ]
  87. upload_decisions = [
  88. (det, self._should_upload(camera_type, det["bbox"]))
  89. for det in detections
  90. ]
  91. to_upload = [det for det, should in upload_decisions if should]
  92. if not to_upload:
  93. logger.debug("All detections deduplicated; skipping file writes for %s", camera_type)
  94. return []
  95. self._counter += 1
  96. counter = self._counter
  97. ts = int(time.time() * 1000)
  98. original_path = os.path.join(
  99. self.save_dir, f"{camera_type}_{ts}_{counter}_original.jpg"
  100. )
  101. marked_path = os.path.join(
  102. self.save_dir, f"{camera_type}_{ts}_{counter}_marked.jpg"
  103. )
  104. # Reserve dedup slots and generate paths under lock.
  105. for det in to_upload:
  106. self._last_uploads.append({
  107. "camera_type": camera_type,
  108. "cx": (det["bbox"][0] + det["bbox"][2]) / 2,
  109. "cy": (det["bbox"][1] + det["bbox"][3]) / 2,
  110. "time": time.monotonic(),
  111. })
  112. # File I/O and user callback run outside the lock.
  113. logger.info("Saving original image to %s", original_path)
  114. if not cv2.imwrite(original_path, frame):
  115. raise RuntimeError(f"Failed to write {original_path}")
  116. marked = frame.copy()
  117. for det in to_upload:
  118. x1, y1, x2, y2 = map(int, det["bbox"])
  119. cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
  120. cv2.putText(marked, f"{det['confidence']:.2f}", (x1, y1 - 5),
  121. cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
  122. logger.info("Saving marked image to %s", marked_path)
  123. if not cv2.imwrite(marked_path, marked):
  124. raise RuntimeError(f"Failed to write {marked_path}")
  125. results: List[Dict] = []
  126. for det in to_upload:
  127. x1, y1, x2, y2 = map(int, det["bbox"])
  128. payload = {
  129. "group_id": self.group_id,
  130. "camera_type": camera_type,
  131. "timestamp": ts,
  132. "original": original_path,
  133. "marked": marked_path,
  134. "bbox": [x1, y1, x2, y2],
  135. "confidence": det["confidence"],
  136. "ptz_position": ptz_position,
  137. }
  138. results.append(payload)
  139. if self.upload_callback and results:
  140. batch_info = {
  141. "batch_id": str(uuid.uuid4()),
  142. "device_id": DEVICE_CONFIG.get("device_id", "unknown"),
  143. "project_id": DEVICE_CONFIG.get("project_id", ""),
  144. "timestamp": ts,
  145. "image_paths": [original_path, marked_path],
  146. "detections": [
  147. {"bbox": det["bbox"], "confidence": det["confidence"], "camera_type": camera_type}
  148. for det in to_upload
  149. ],
  150. "ptz_position": ptz_position,
  151. }
  152. logger.info("Uploading batch info for %s", camera_type)
  153. try:
  154. self.upload_callback(batch_info)
  155. except Exception as exc: # noqa: BLE001
  156. logger.warning("Upload callback failed: %s", exc)
  157. return results