import sys import os import time import threading sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import numpy as np import pytest from polling_tracker import PollingTrackingCoordinator, CaptureRecord from tracker import TrackedPerson class _FakePosition: def __init__(self): self.pan = 0 self.tilt = 0 self.zoom = 1 class FakePanorama: def __init__(self): self.frame = np.zeros((480, 640, 3), dtype=np.uint8) self.connected = False self.streaming = False self.stopped = False def connect(self): self.connected = True return True def start_stream_rtsp(self): self.streaming = True return True def stop_stream_rtsp(self): self.stopped = True return True def disconnect(self): self.connected = False self.streaming = False return True def get_frame(self): return self.frame.copy() class FakePTZ: def __init__(self): self.commands = [] self.connected = False self.ptz_frame = np.zeros((100, 100, 3), dtype=np.uint8) self.current_position = _FakePosition() self.ptz_config = {"default_zoom": 8} def connect(self): self.connected = True return True def disconnect(self): self.connected = False return True def goto_exact_position(self, pan, tilt, zoom): self.commands.append((pan, tilt, zoom)) return True def get_current_position(self): return self.current_position def calculate_ptz_position(self, x, y, zoom=None): return x * 180, y * 90, zoom or 8 def get_frame(self): return self.ptz_frame.copy() class FakeTracker: def __init__(self, persons): self.persons = persons self.released = False def update(self, frame): return self.persons def release(self): self.released = True class FakeEventPusher: def __init__(self): self.uploads = [] self.pushes = [] def upload_numpy_image(self, image): url = f"url_{id(image)}" self.uploads.append(url) return url def push_tracking_capture(self, batch_time, captures): self.pushes.append({"batch_time": batch_time, "captures": captures}) def test_update_active_targets(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([ TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9), TrackedPerson(track_id=2, bbox=(50, 60, 70, 80), center=(60, 70), confidence=0.8), ]) coord = PollingTrackingCoordinator(pan, ptz, tracker, config={"max_tracking_targets": 4}) frame = pan.get_frame() coord._update_active_targets(tracker.update(frame), frame.shape) assert len(coord.active_targets) == 2 assert 1 in coord.target_order assert 2 in coord.target_order def test_advance_loop(): coord = PollingTrackingCoordinator.__new__(PollingTrackingCoordinator) coord.target_order = [1, 2, 3] coord.current_index = 0 coord._advance() assert coord.current_index == 1 coord.current_index = 2 coord._advance() assert coord.current_index == 0 def test_capture_record_creation(): record = CaptureRecord( track_id=1, timestamp=1.0, position=(0.5, 0.5), ptz_position=(90.0, 45.0, 8), ptz_image=np.zeros((100, 100, 3), dtype=np.uint8), panorama_image=None, confidence=0.9, ) assert record.track_id == 1 def test_target_lost_and_timeout_removal(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([ TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9), ]) coord = PollingTrackingCoordinator( pan, ptz, tracker, config={"tracking_timeout": 0.2, "max_tracking_targets": 4} ) frame = pan.get_frame() coord._update_active_targets(tracker.update(frame), frame.shape) assert 1 in coord.target_order assert not coord.active_targets[1].lost # 当前帧无目标:标记为丢失,但仍在 target_order 中 tracker.persons = [] coord._update_active_targets(tracker.update(frame), frame.shape) assert 1 in coord.target_order assert coord.active_targets[1].lost # 超时后应被移除 time.sleep(0.25) coord._update_active_targets(tracker.update(frame), frame.shape) assert 1 not in coord.target_order assert 1 not in coord.active_targets def test_target_lost_not_removed_before_timeout(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([ TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9), ]) coord = PollingTrackingCoordinator( pan, ptz, tracker, config={"tracking_timeout": 1.0, "max_tracking_targets": 4} ) frame = pan.get_frame() coord._update_active_targets(tracker.update(frame), frame.shape) tracker.persons = [] coord._update_active_targets(tracker.update(frame), frame.shape) assert 1 in coord.target_order assert coord.active_targets[1].lost def test_batch_upload_flush(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([ TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(320, 240), confidence=0.9), ]) coord = PollingTrackingCoordinator( pan, ptz, tracker, config={"ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0, "enable_upload": True} ) pusher = FakeEventPusher() coord.set_event_pusher(pusher) frame = pan.get_frame() coord._update_active_targets(tracker.update(frame), frame.shape) record = coord._capture_one(list(coord.active_targets.values())[0]) assert record is not None coord.batch_captures.append(record) coord._flush_batch_if_needed() assert len(coord.batch_captures) == 0 assert len(pusher.pushes) == 1 assert len(pusher.uploads) == 2 # PTZ + panorama def test_pause_resume(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([]) coord = PollingTrackingCoordinator(pan, ptz, tracker, config={}) coord.pause() assert coord._paused is True assert coord._paused_event.is_set() is False coord.resume() assert coord._paused is False assert coord._paused_event.is_set() is True def test_thread_start_stop_lifecycle(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([]) coord = PollingTrackingCoordinator( pan, ptz, tracker, config={"ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0} ) assert coord.start() is True assert coord.running is True assert pan.streaming is True assert ptz.connected is True time.sleep(0.05) coord.stop() assert coord.running is False assert pan.stopped is True assert tracker.released is True def test_ptz_worker_capture_flow(): pan = FakePanorama() ptz = FakePTZ() tracker = FakeTracker([ TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(320, 240), confidence=0.9), ]) coord = PollingTrackingCoordinator( pan, ptz, tracker, config={ "ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0, "max_capture_per_target": 1, } ) frame = pan.get_frame() coord._update_active_targets(tracker.update(frame), frame.shape) target = coord.active_targets[1] record = coord._capture_one(target) assert record is not None assert record.track_id == 1 assert len(ptz.commands) == 1 with coord._capture_counts_lock: assert coord._capture_counts[1] == 1 # 超过最大抓拍数后应返回 None record2 = coord._capture_one(target) assert record2 is None