| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import sys
- import os
- import time
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from core.polling_scheduler import PollingScheduler
- class FakePTZ:
- def __init__(self):
- self.positions = []
- def goto_exact_position(self, pan, tilt, zoom):
- self.positions.append((pan, tilt, zoom))
- class FailingPTZ:
- def goto_exact_position(self, pan, tilt, zoom):
- raise RuntimeError("PTZ failure")
- def test_polling_scheduler_basic():
- ptz = FakePTZ()
- points = [{"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.1}]
- scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
- scheduler.start()
- time.sleep(0.5)
- scheduler.stop()
- assert len(ptz.positions) >= 1
- def test_polling_scheduler_empty_points():
- ptz = FakePTZ()
- scheduler = PollingScheduler("g1", ptz, lambda: [], stabilize_time=0.0)
- scheduler.start()
- time.sleep(0.3)
- scheduler.stop()
- assert len(ptz.positions) == 0
- def test_polling_scheduler_multiple_points():
- ptz = FakePTZ()
- points = [
- {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
- {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
- {"pan": 180, "tilt": 20, "zoom": 3, "dwell_time": 0.05},
- ]
- scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
- scheduler.start()
- time.sleep(1.0)
- scheduler.stop()
- assert len(ptz.positions) >= 3
- assert ptz.positions[0] == (0, 0, 1)
- assert ptz.positions[1] == (90, 10, 2)
- assert ptz.positions[2] == (180, 20, 3)
- def test_polling_scheduler_malformed_point_skipped():
- ptz = FakePTZ()
- points = [
- {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
- {"tilt": 10, "zoom": 2},
- {"pan": 90},
- "not-a-dict",
- {"pan": 180, "tilt": 20, "zoom": 3, "dwell_time": 0.05},
- ]
- scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
- scheduler.start()
- time.sleep(0.5)
- scheduler.stop()
- valid_positions = {(0, 0, 1), (180, 20, 3)}
- assert set(ptz.positions) == valid_positions
- assert (0, 0, 1) in ptz.positions
- assert (180, 20, 3) in ptz.positions
- def test_polling_scheduler_on_arrived_invoked():
- ptz = FakePTZ()
- arrived = []
- points = [
- {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
- {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
- ]
- scheduler = PollingScheduler(
- "g1",
- ptz,
- lambda: points,
- on_arrived=lambda pt: arrived.append(pt),
- stabilize_time=0.0,
- )
- scheduler.start()
- time.sleep(0.6)
- scheduler.stop()
- assert len(arrived) >= 2
- assert arrived[0] == points[0]
- assert arrived[1] == points[1]
- def test_polling_scheduler_pause_resume():
- ptz = FakePTZ()
- points = [{"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.5}]
- scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
- scheduler.start()
- time.sleep(0.2)
- scheduler.pause()
- time.sleep(0.5)
- positions_while_paused = list(ptz.positions)
- scheduler.resume()
- time.sleep(0.7)
- scheduler.stop()
- assert len(positions_while_paused) >= 1
- assert len(ptz.positions) >= len(positions_while_paused)
- def test_polling_scheduler_ptz_error_continues():
- ptz = FailingPTZ()
- points = [
- {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
- {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
- ]
- scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
- scheduler.start()
- time.sleep(0.6)
- scheduler.stop()
- # The scheduler should not crash; thread exits cleanly.
- assert scheduler.thread is None
|