# test_capture_uploader.py -- tests for core.capture_uploader.CaptureUploader

  1. import sys
  2. import os
  3. import threading
  4. import logging
  5. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  6. import numpy as np
  7. import pytest
  8. from core.capture_uploader import CaptureUploader
  9. def test_handle_detection_saves_and_uploads(tmp_path):
  10. uploads = []
  11. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
  12. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  13. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  14. results = uploader.handle_detection("panorama", frame, dets)
  15. assert len(results) == 1
  16. assert len(uploads) == 1
  17. batch_info = uploads[0]
  18. assert "batch_id" in batch_info
  19. assert "device_id" in batch_info
  20. assert batch_info["image_paths"] == [results[0]["original"], results[0]["marked"]]
  21. assert len(batch_info["detections"]) == 1
  22. assert os.path.exists(results[0]["original"])
  23. assert os.path.exists(results[0]["marked"])
  24. def test_dedup(tmp_path):
  25. uploads = []
  26. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
  27. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  28. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  29. uploader.handle_detection("panorama", frame, dets)
  30. uploader.handle_detection("panorama", frame, dets)
  31. assert len(uploads) == 1
  32. assert len(uploads[0]["detections"]) == 1
  33. def test_thread_safety(tmp_path):
  34. uploads = []
  35. errors = []
  36. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
  37. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  38. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  39. def worker():
  40. try:
  41. uploader.handle_detection("panorama", frame, dets)
  42. except Exception as exc:
  43. errors.append(exc)
  44. threads = [threading.Thread(target=worker) for _ in range(10)]
  45. for t in threads:
  46. t.start()
  47. for t in threads:
  48. t.join()
  49. assert not errors
  50. assert len(uploads) == 1
  51. assert len(uploads[0]["detections"]) == 1
  52. @pytest.mark.parametrize("frame", [
  53. None,
  54. np.zeros((100, 100), dtype=np.uint8),
  55. np.zeros((100, 100, 4), dtype=np.uint8),
  56. np.zeros((100, 100, 3), dtype=np.float32),
  57. ])
  58. def test_invalid_frame_raises(tmp_path, frame):
  59. uploader = CaptureUploader("g1", str(tmp_path), dedup_seconds=5.0)
  60. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  61. with pytest.raises(ValueError):
  62. uploader.handle_detection("panorama", frame, dets)
  63. @pytest.mark.parametrize("dets", [
  64. [{"confidence": 0.9}],
  65. [{"bbox": [10, 10, 30], "confidence": 0.9}],
  66. [{"bbox": [10, 10, 30, "x"], "confidence": 0.9}],
  67. [{"bbox": [10, 10, 30, 30]}],
  68. [{"bbox": [10, 10, 30, 30], "confidence": "high"}],
  69. ])
  70. def test_invalid_detections_raise(tmp_path, dets):
  71. uploader = CaptureUploader("g1", str(tmp_path), dedup_seconds=5.0)
  72. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  73. with pytest.raises(ValueError):
  74. uploader.handle_detection("panorama", frame, dets)
  75. def test_all_dedup_writes_no_files(tmp_path):
  76. uploads = []
  77. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
  78. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  79. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  80. uploader.handle_detection("panorama", frame, dets)
  81. before = set(os.listdir(uploader.save_dir))
  82. results = uploader.handle_detection("panorama", frame, dets)
  83. after = set(os.listdir(uploader.save_dir))
  84. assert results == []
  85. assert before == after
  86. def test_upload_callback_exception_logged_not_propagated(tmp_path, caplog):
  87. def failing_callback(payload):
  88. raise RuntimeError("upload failed")
  89. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=failing_callback, dedup_seconds=5.0)
  90. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  91. dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
  92. with caplog.at_level(logging.WARNING):
  93. results = uploader.handle_detection("panorama", frame, dets)
  94. assert len(results) == 1
  95. assert "upload failed" in caplog.text
  96. assert len(uploader._last_uploads) == 1
  97. def test_float_bbox_cast_to_int(tmp_path):
  98. """Float bboxes should be cast to ints for cv2 drawing and stored as ints in payload."""
  99. uploads = []
  100. uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
  101. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  102. dets = [{"bbox": [10.7, 15.2, 30.9, 35.1], "confidence": 0.85}]
  103. results = uploader.handle_detection("panorama", frame, dets)
  104. assert len(results) == 1
  105. assert results[0]["bbox"] == [10, 15, 30, 35]
  106. assert len(uploads) == 1
  107. assert uploads[0]["detections"][0]["bbox"] == [10.7, 15.2, 30.9, 35.1]
  108. assert os.path.exists(results[0]["original"])
  109. assert os.path.exists(results[0]["marked"])