|
|
@@ -0,0 +1,91 @@
|
|
|
+import sys
|
|
|
+import os
|
|
|
+import tempfile
|
|
|
+
|
|
|
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+from polling_tracker import PollingTrackingCoordinator
|
|
|
+from tracker import TrackedPerson, UltralyticsTracker
|
|
|
+
|
|
|
+
|
|
|
+class FakePanorama:
|
|
|
+ def __init__(self):
|
|
|
+ self.connected = False
|
|
|
+ self.streaming = False
|
|
|
+
|
|
|
+ def connect(self):
|
|
|
+ self.connected = True
|
|
|
+ return True
|
|
|
+
|
|
|
+ def start_stream_rtsp(self):
|
|
|
+ self.streaming = True
|
|
|
+ return True
|
|
|
+
|
|
|
+ def get_frame(self):
|
|
|
+ return np.zeros((480, 640, 3), dtype=np.uint8)
|
|
|
+
|
|
|
+ def disconnect(self):
|
|
|
+ self.connected = False
|
|
|
+ self.streaming = False
|
|
|
+
|
|
|
+
|
|
|
+class FakePTZ:
|
|
|
+ def __init__(self):
|
|
|
+ self.connected = False
|
|
|
+ self.commands = []
|
|
|
+ self.ptz_config = {"default_zoom": 8}
|
|
|
+
|
|
|
+ def connect(self):
|
|
|
+ self.connected = True
|
|
|
+ return True
|
|
|
+
|
|
|
+ def goto_exact_position(self, pan, tilt, zoom):
|
|
|
+ self.commands.append((pan, tilt, zoom))
|
|
|
+ return True
|
|
|
+
|
|
|
+ def get_frame(self):
|
|
|
+ return np.zeros((480, 640, 3), dtype=np.uint8)
|
|
|
+
|
|
|
+ def get_current_position(self):
|
|
|
+ return type("P", (), {"pan": 0, "tilt": 0, "zoom": 1})()
|
|
|
+
|
|
|
+ def calculate_ptz_position(self, x, y, zoom=None):
|
|
|
+ return x * 180, y * 90, zoom or 8
|
|
|
+
|
|
|
+ def disconnect(self):
|
|
|
+ self.connected = False
|
|
|
+
|
|
|
+
|
|
|
+class FakeTracker:
|
|
|
+ def __init__(self):
|
|
|
+ self.frame_count = 0
|
|
|
+
|
|
|
+ def update(self, frame):
|
|
|
+ self.frame_count += 1
|
|
|
+ return [
|
|
|
+ TrackedPerson(track_id=1, bbox=(100, 100, 150, 200), center=(125, 150), confidence=0.9),
|
|
|
+ ]
|
|
|
+
|
|
|
+
|
|
|
+def test_integration_polling_start_stop():
|
|
|
+ pan = FakePanorama()
|
|
|
+ ptz = FakePTZ()
|
|
|
+ tracker = FakeTracker()
|
|
|
+ capture_dir = tempfile.mkdtemp(prefix="tracking_captures_")
|
|
|
+ coord = PollingTrackingCoordinator(pan, ptz, tracker, config={
|
|
|
+ "max_tracking_targets": 4,
|
|
|
+ "ptz_stabilize_time": 0.05,
|
|
|
+ "tracking_timeout": 1.0,
|
|
|
+ "enable_upload": False,
|
|
|
+ "capture_dir": capture_dir,
|
|
|
+ })
|
|
|
+
|
|
|
+ assert coord.start() is True
|
|
|
+ import time
|
|
|
+ time.sleep(0.3)
|
|
|
+ coord.stop()
|
|
|
+
|
|
|
+ assert pan.connected is False
|
|
|
+ assert ptz.connected is False
|
|
|
+ assert len(ptz.commands) > 0
|