| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- import sys
- import os
- import time
- import threading
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import numpy as np
- import pytest
- from polling_tracker import PollingTrackingCoordinator, CaptureRecord
- from tracker import TrackedPerson
- class _FakePosition:
- def __init__(self):
- self.pan = 0
- self.tilt = 0
- self.zoom = 1
- class FakePanorama:
- def __init__(self):
- self.frame = np.zeros((480, 640, 3), dtype=np.uint8)
- self.connected = False
- self.streaming = False
- self.stopped = False
- def connect(self):
- self.connected = True
- return True
- def start_stream_rtsp(self):
- self.streaming = True
- return True
- def stop_stream_rtsp(self):
- self.stopped = True
- return True
- def disconnect(self):
- self.connected = False
- self.streaming = False
- return True
- def get_frame(self):
- return self.frame.copy()
- class FakePTZ:
- def __init__(self):
- self.commands = []
- self.connected = False
- self.ptz_frame = np.zeros((100, 100, 3), dtype=np.uint8)
- self.current_position = _FakePosition()
- self.ptz_config = {"default_zoom": 8}
- def connect(self):
- self.connected = True
- return True
- def disconnect(self):
- self.connected = False
- return True
- def goto_exact_position(self, pan, tilt, zoom):
- self.commands.append((pan, tilt, zoom))
- return True
- def get_current_position(self):
- return self.current_position
- def calculate_ptz_position(self, x, y, zoom=None):
- return x * 180, y * 90, zoom or 8
- def get_frame(self):
- return self.ptz_frame.copy()
- class FakeTracker:
- def __init__(self, persons):
- self.persons = persons
- self.released = False
- def update(self, frame):
- return self.persons
- def release(self):
- self.released = True
- class FakeCalibrator:
- """模拟已校准的标定器,返回固定的 PTZ 角度。"""
- def __init__(self, pan=180.0, tilt=45.0):
- self._pan = pan
- self._tilt = tilt
- def transform(self, x_ratio, y_ratio):
- return (self._pan, self._tilt)
- def is_calibrated(self):
- return True
- class FakeEventPusher:
- def __init__(self):
- self.uploads = []
- self.pushes = []
- def upload_numpy_image(self, image):
- url = f"url_{id(image)}"
- self.uploads.append(url)
- return url
- def push_tracking_capture(self, batch_time, captures):
- self.pushes.append({"batch_time": batch_time, "captures": captures})
- def test_update_active_targets():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9),
- TrackedPerson(track_id=2, bbox=(50, 60, 70, 80), center=(60, 70), confidence=0.8),
- ])
- coord = PollingTrackingCoordinator(pan, ptz, tracker, config={"max_tracking_targets": 4})
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- assert len(coord.active_targets) == 2
- assert 1 in coord.target_order
- assert 2 in coord.target_order
- def test_advance_loop():
- coord = PollingTrackingCoordinator.__new__(PollingTrackingCoordinator)
- coord.target_order = [1, 2, 3]
- coord.current_index = 0
- coord._advance()
- assert coord.current_index == 1
- coord.current_index = 2
- coord._advance()
- assert coord.current_index == 0
- def test_capture_record_creation():
- record = CaptureRecord(
- track_id=1,
- timestamp=1.0,
- position=(0.5, 0.5),
- ptz_position=(90.0, 45.0, 8),
- ptz_image=np.zeros((100, 100, 3), dtype=np.uint8),
- panorama_image=None,
- confidence=0.9,
- )
- assert record.track_id == 1
- def test_target_lost_and_timeout_removal():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9),
- ])
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker, config={"tracking_timeout": 0.2, "max_tracking_targets": 4}
- )
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- assert 1 in coord.target_order
- assert not coord.active_targets[1].lost
- # 当前帧无目标:标记为丢失,但仍在 target_order 中
- tracker.persons = []
- coord._update_active_targets(tracker.update(frame), frame.shape)
- assert 1 in coord.target_order
- assert coord.active_targets[1].lost
- # 超时后应被移除
- time.sleep(0.25)
- coord._update_active_targets(tracker.update(frame), frame.shape)
- assert 1 not in coord.target_order
- assert 1 not in coord.active_targets
- def test_target_lost_not_removed_before_timeout():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9),
- ])
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker, config={"tracking_timeout": 1.0, "max_tracking_targets": 4}
- )
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- tracker.persons = []
- coord._update_active_targets(tracker.update(frame), frame.shape)
- assert 1 in coord.target_order
- assert coord.active_targets[1].lost
- def test_batch_upload_flush():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(320, 240), confidence=0.9),
- ])
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker,
- config={"ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0, "enable_upload": True}
- )
- pusher = FakeEventPusher()
- coord.set_event_pusher(pusher)
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- record = coord._capture_one(list(coord.active_targets.values())[0])
- assert record is not None
- coord.batch_captures.append(record)
- coord._flush_batch_if_needed()
- assert len(coord.batch_captures) == 0
- assert len(pusher.pushes) == 1
- assert len(pusher.uploads) == 2 # PTZ + panorama
- def test_pause_resume():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([])
- coord = PollingTrackingCoordinator(pan, ptz, tracker, config={})
- coord.pause()
- assert coord._paused is True
- assert coord._paused_event.is_set() is False
- coord.resume()
- assert coord._paused is False
- assert coord._paused_event.is_set() is True
- def test_thread_start_stop_lifecycle():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([])
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker,
- config={"ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0}
- )
- assert coord.start() is True
- assert coord.running is True
- assert pan.streaming is True
- assert ptz.connected is True
- time.sleep(0.05)
- coord.stop()
- assert coord.running is False
- assert pan.stopped is True
- assert tracker.released is True
- def test_ptz_worker_capture_flow():
- pan = FakePanorama()
- ptz = FakePTZ()
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(320, 240), confidence=0.9),
- ])
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker,
- config={
- "ptz_stabilize_time": 0.01,
- "ptz_command_cooldown": 0.0,
- "max_capture_per_target": 1,
- }
- )
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- target = coord.active_targets[1]
- record = coord._capture_one(target)
- assert record is not None
- assert record.track_id == 1
- assert len(ptz.commands) == 1
- with coord._capture_counts_lock:
- assert coord._capture_counts[1] == 1
- # 超过最大抓拍数后应返回 None
- record2 = coord._capture_one(target)
- assert record2 is None
- def test_capture_with_pan_flip_does_not_double_flip():
- """
- 验证:当存在校准器且 ptz_config 启用 pan_flip 时,
- 不应在校准结果上再次应用 pan_flip。
- 场景:球机实际朝向与枪机相反(pan_flip=True)。
- 校准器通过真实扫描得到全景中心对应的球机角度为 180°,
- 该角度应直接发送给球机。若再次翻转,球机会被转到背面(0°)。
- """
- pan = FakePanorama()
- ptz = FakePTZ()
- ptz.ptz_config["pan_flip"] = True
- tracker = FakeTracker([
- TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(320, 240), confidence=0.9),
- ])
- calibrator = FakeCalibrator(pan=180.0, tilt=45.0)
- coord = PollingTrackingCoordinator(
- pan, ptz, tracker,
- config={"ptz_stabilize_time": 0.01, "ptz_command_cooldown": 0.0},
- calibrator=calibrator,
- )
- frame = pan.get_frame()
- coord._update_active_targets(tracker.update(frame), frame.shape)
- target = coord.active_targets[1]
- record = coord._capture_one(target)
- assert record is not None
- assert len(ptz.commands) == 1
- sent_pan, sent_tilt, sent_zoom = ptz.commands[0]
- # 校准器返回的 180° 应直接发送,不能因 pan_flip 而被翻为 0°
- assert sent_pan == 180.0
- assert sent_tilt == 45.0
|