| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import sys
- import os
- import threading
- import logging
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import numpy as np
- import pytest
- from core.capture_uploader import CaptureUploader
- def test_handle_detection_saves_and_uploads(tmp_path):
- uploads = []
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- results = uploader.handle_detection("panorama", frame, dets)
- assert len(results) == 1
- assert len(uploads) == 1
- batch_info = uploads[0]
- assert "batch_id" in batch_info
- assert "device_id" in batch_info
- assert batch_info["image_paths"] == [results[0]["original"], results[0]["marked"]]
- assert len(batch_info["detections"]) == 1
- assert os.path.exists(results[0]["original"])
- assert os.path.exists(results[0]["marked"])
- def test_dedup(tmp_path):
- uploads = []
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- uploader.handle_detection("panorama", frame, dets)
- uploader.handle_detection("panorama", frame, dets)
- assert len(uploads) == 1
- assert len(uploads[0]["detections"]) == 1
- def test_thread_safety(tmp_path):
- uploads = []
- errors = []
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- def worker():
- try:
- uploader.handle_detection("panorama", frame, dets)
- except Exception as exc:
- errors.append(exc)
- threads = [threading.Thread(target=worker) for _ in range(10)]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- assert not errors
- assert len(uploads) == 1
- assert len(uploads[0]["detections"]) == 1
- @pytest.mark.parametrize("frame", [
- None,
- np.zeros((100, 100), dtype=np.uint8),
- np.zeros((100, 100, 4), dtype=np.uint8),
- np.zeros((100, 100, 3), dtype=np.float32),
- ])
- def test_invalid_frame_raises(tmp_path, frame):
- uploader = CaptureUploader("g1", str(tmp_path), dedup_seconds=5.0)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- with pytest.raises(ValueError):
- uploader.handle_detection("panorama", frame, dets)
- @pytest.mark.parametrize("dets", [
- [{"confidence": 0.9}],
- [{"bbox": [10, 10, 30], "confidence": 0.9}],
- [{"bbox": [10, 10, 30, "x"], "confidence": 0.9}],
- [{"bbox": [10, 10, 30, 30]}],
- [{"bbox": [10, 10, 30, 30], "confidence": "high"}],
- ])
- def test_invalid_detections_raise(tmp_path, dets):
- uploader = CaptureUploader("g1", str(tmp_path), dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- with pytest.raises(ValueError):
- uploader.handle_detection("panorama", frame, dets)
- def test_all_dedup_writes_no_files(tmp_path):
- uploads = []
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- uploader.handle_detection("panorama", frame, dets)
- before = set(os.listdir(uploader.save_dir))
- results = uploader.handle_detection("panorama", frame, dets)
- after = set(os.listdir(uploader.save_dir))
- assert results == []
- assert before == after
- def test_upload_callback_exception_logged_not_propagated(tmp_path, caplog):
- def failing_callback(payload):
- raise RuntimeError("upload failed")
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=failing_callback, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10, 10, 30, 30], "confidence": 0.9}]
- with caplog.at_level(logging.WARNING):
- results = uploader.handle_detection("panorama", frame, dets)
- assert len(results) == 1
- assert "upload failed" in caplog.text
- assert len(uploader._last_uploads) == 1
- def test_float_bbox_cast_to_int(tmp_path):
- """Float bboxes should be cast to ints for cv2 drawing and stored as ints in payload."""
- uploads = []
- uploader = CaptureUploader("g1", str(tmp_path), upload_callback=uploads.append, dedup_seconds=5.0)
- frame = np.zeros((100, 100, 3), dtype=np.uint8)
- dets = [{"bbox": [10.7, 15.2, 30.9, 35.1], "confidence": 0.85}]
- results = uploader.handle_detection("panorama", frame, dets)
- assert len(results) == 1
- assert results[0]["bbox"] == [10, 15, 30, 35]
- assert len(uploads) == 1
- assert uploads[0]["detections"][0]["bbox"] == [10.7, 15.2, 30.9, 35.1]
- assert os.path.exists(results[0]["original"])
- assert os.path.exists(results[0]["marked"])
|