capture_uploader.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """检测到人后的保存 + 上传."""
  2. import logging
  3. import os
  4. import threading
  5. import time
  6. import uuid
  7. from typing import Any, Callable, Dict, List, Optional
  8. import cv2
  9. import numpy as np
  10. from config.device import DEVICE_CONFIG
  11. from core.oss_uploader import OSSUploader
  12. logger = logging.getLogger(__name__)
  13. class CaptureUploader:
  14. def __init__(
  15. self,
  16. group_id: str,
  17. save_dir: str = "data/captures",
  18. upload_callback: Optional[Callable[[Dict], None]] = None,
  19. oss_uploader: Optional[OSSUploader] = None,
  20. dedup_seconds: float = 5.0,
  21. ):
  22. self.group_id = group_id
  23. self.save_dir = os.path.join(save_dir, group_id)
  24. os.makedirs(self.save_dir, exist_ok=True)
  25. self.upload_callback = upload_callback
  26. self.oss_uploader = oss_uploader
  27. self.dedup_seconds = dedup_seconds
  28. self._last_uploads: List[Dict[str, Any]] = []
  29. self._lock = threading.Lock()
  30. self._counter = 0
  31. def _should_upload(self, bbox: List[float]) -> bool:
  32. cx = (bbox[0] + bbox[2]) / 2
  33. cy = (bbox[1] + bbox[3]) / 2
  34. for u in self._last_uploads:
  35. dx = abs(u["cx"] - cx)
  36. dy = abs(u["cy"] - cy)
  37. if dx < 50 and dy < 50:
  38. return False
  39. return True
  40. def _validate_inputs(
  41. self,
  42. frame: np.ndarray,
  43. detections: List[Dict],
  44. ) -> None:
  45. if not isinstance(frame, np.ndarray):
  46. raise ValueError("frame must be a numpy ndarray")
  47. if frame.ndim != 3 or frame.shape[2] != 3:
  48. raise ValueError("frame must have shape (H, W, 3)")
  49. if frame.dtype != np.uint8:
  50. raise ValueError("frame must have dtype uint8")
  51. for i, det in enumerate(detections):
  52. if not isinstance(det, dict):
  53. raise ValueError(f"detection {i} must be a dict")
  54. if "bbox" not in det:
  55. raise ValueError(f"detection {i} missing bbox")
  56. bbox = det["bbox"]
  57. if not isinstance(bbox, (list, tuple)) or len(bbox) != 4:
  58. raise ValueError(f"detection {i} bbox must be a list/tuple of 4 numbers")
  59. try:
  60. [float(v) for v in bbox]
  61. except (TypeError, ValueError):
  62. raise ValueError(f"detection {i} bbox must contain numbers")
  63. if "confidence" not in det:
  64. raise ValueError(f"detection {i} missing confidence")
  65. try:
  66. float(det["confidence"])
  67. except (TypeError, ValueError):
  68. raise ValueError(f"detection {i} confidence must be a number")
  69. def handle_detection(
  70. self,
  71. camera_type: str,
  72. frame: np.ndarray,
  73. detections: List[Dict],
  74. ) -> List[Dict]:
  75. self._validate_inputs(frame, detections)
  76. if not detections:
  77. return []
  78. with self._lock:
  79. now = time.monotonic()
  80. self._last_uploads = [
  81. u for u in self._last_uploads
  82. if now - u["time"] < self.dedup_seconds
  83. ]
  84. upload_decisions = [
  85. (det, self._should_upload(det["bbox"]))
  86. for det in detections
  87. ]
  88. to_upload = [det for det, should in upload_decisions if should]
  89. if not to_upload:
  90. logger.debug("All detections deduplicated; skipping file writes")
  91. return []
  92. self._counter += 1
  93. counter = self._counter
  94. ts = int(time.time() * 1000)
  95. original_path = os.path.join(
  96. self.save_dir, f"{ts}_{counter}_original.jpg"
  97. )
  98. marked_path = os.path.join(
  99. self.save_dir, f"{ts}_{counter}_marked.jpg"
  100. )
  101. for det in to_upload:
  102. self._last_uploads.append({
  103. "cx": (det["bbox"][0] + det["bbox"][2]) / 2,
  104. "cy": (det["bbox"][1] + det["bbox"][3]) / 2,
  105. "time": time.monotonic(),
  106. })
  107. logger.info("Saving original image to %s", original_path)
  108. if not cv2.imwrite(original_path, frame):
  109. raise RuntimeError(f"Failed to write {original_path}")
  110. marked = frame.copy()
  111. for det in to_upload:
  112. x1, y1, x2, y2 = map(int, det["bbox"])
  113. cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
  114. cv2.putText(marked, f"{det['confidence']:.2f}", (x1, y1 - 5),
  115. cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
  116. logger.info("Saving marked image to %s", marked_path)
  117. if not cv2.imwrite(marked_path, marked):
  118. raise RuntimeError(f"Failed to write {marked_path}")
  119. results: List[Dict] = []
  120. for det in to_upload:
  121. x1, y1, x2, y2 = map(int, det["bbox"])
  122. payload = {
  123. "group_id": self.group_id,
  124. "camera_type": camera_type,
  125. "timestamp": ts,
  126. "original": original_path,
  127. "marked": marked_path,
  128. "bbox": [x1, y1, x2, y2],
  129. "confidence": det["confidence"],
  130. }
  131. results.append(payload)
  132. image_urls = {}
  133. if self.oss_uploader is not None and self.oss_uploader.enabled:
  134. image_urls = self.oss_uploader.upload_pair(original_path, marked_path, prefix=camera_type)
  135. logger.info("OSS URLs: %s", image_urls)
  136. if self.upload_callback and results:
  137. batch_info = {
  138. "batch_id": str(uuid.uuid4()),
  139. "device_id": DEVICE_CONFIG.get("device_id", "unknown"),
  140. "project_id": DEVICE_CONFIG.get("project_id", ""),
  141. "timestamp": ts,
  142. "camera_type": camera_type,
  143. "image_paths": [original_path, marked_path],
  144. "image_urls": image_urls,
  145. "detections": [
  146. {"bbox": det["bbox"], "confidence": det["confidence"]}
  147. for det in to_upload
  148. ],
  149. }
  150. logger.info("Uploading batch info")
  151. try:
  152. self.upload_callback(batch_info)
  153. except Exception as exc:
  154. logger.warning("Upload callback failed: %s", exc)
  155. return results