# test_polling_scheduler.py (3.7 KB)
import os
import sys
import time

# Make the project root importable when the tests run from this directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from core.polling_scheduler import PollingScheduler
  6. class FakePTZ:
  7. def __init__(self):
  8. self.positions = []
  9. def goto_exact_position(self, pan, tilt, zoom):
  10. self.positions.append((pan, tilt, zoom))
  11. class FailingPTZ:
  12. def goto_exact_position(self, pan, tilt, zoom):
  13. raise RuntimeError("PTZ failure")
  14. def test_polling_scheduler_basic():
  15. ptz = FakePTZ()
  16. points = [{"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.1}]
  17. scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
  18. scheduler.start()
  19. time.sleep(0.5)
  20. scheduler.stop()
  21. assert len(ptz.positions) >= 1
  22. def test_polling_scheduler_empty_points():
  23. ptz = FakePTZ()
  24. scheduler = PollingScheduler("g1", ptz, lambda: [], stabilize_time=0.0)
  25. scheduler.start()
  26. time.sleep(0.3)
  27. scheduler.stop()
  28. assert len(ptz.positions) == 0
  29. def test_polling_scheduler_multiple_points():
  30. ptz = FakePTZ()
  31. points = [
  32. {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
  33. {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
  34. {"pan": 180, "tilt": 20, "zoom": 3, "dwell_time": 0.05},
  35. ]
  36. scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
  37. scheduler.start()
  38. time.sleep(1.0)
  39. scheduler.stop()
  40. assert len(ptz.positions) >= 3
  41. assert ptz.positions[0] == (0, 0, 1)
  42. assert ptz.positions[1] == (90, 10, 2)
  43. assert ptz.positions[2] == (180, 20, 3)
  44. def test_polling_scheduler_malformed_point_skipped():
  45. ptz = FakePTZ()
  46. points = [
  47. {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
  48. {"tilt": 10, "zoom": 2},
  49. {"pan": 90},
  50. "not-a-dict",
  51. {"pan": 180, "tilt": 20, "zoom": 3, "dwell_time": 0.05},
  52. ]
  53. scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
  54. scheduler.start()
  55. time.sleep(0.5)
  56. scheduler.stop()
  57. valid_positions = {(0, 0, 1), (180, 20, 3)}
  58. assert set(ptz.positions) == valid_positions
  59. assert (0, 0, 1) in ptz.positions
  60. assert (180, 20, 3) in ptz.positions
  61. def test_polling_scheduler_on_arrived_invoked():
  62. ptz = FakePTZ()
  63. arrived = []
  64. points = [
  65. {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
  66. {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
  67. ]
  68. scheduler = PollingScheduler(
  69. "g1",
  70. ptz,
  71. lambda: points,
  72. on_arrived=lambda pt: arrived.append(pt),
  73. stabilize_time=0.0,
  74. )
  75. scheduler.start()
  76. time.sleep(0.6)
  77. scheduler.stop()
  78. assert len(arrived) >= 2
  79. assert arrived[0] == points[0]
  80. assert arrived[1] == points[1]
  81. def test_polling_scheduler_pause_resume():
  82. ptz = FakePTZ()
  83. points = [{"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.5}]
  84. scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
  85. scheduler.start()
  86. time.sleep(0.2)
  87. scheduler.pause()
  88. time.sleep(0.5)
  89. positions_while_paused = list(ptz.positions)
  90. scheduler.resume()
  91. time.sleep(0.7)
  92. scheduler.stop()
  93. assert len(positions_while_paused) >= 1
  94. assert len(ptz.positions) >= len(positions_while_paused)
  95. def test_polling_scheduler_ptz_error_continues():
  96. ptz = FailingPTZ()
  97. points = [
  98. {"pan": 0, "tilt": 0, "zoom": 1, "dwell_time": 0.05},
  99. {"pan": 90, "tilt": 10, "zoom": 2, "dwell_time": 0.05},
  100. ]
  101. scheduler = PollingScheduler("g1", ptz, lambda: points, stabilize_time=0.0)
  102. scheduler.start()
  103. time.sleep(0.6)
  104. scheduler.stop()
  105. # The scheduler should not crash; thread exits cleanly.
  106. assert scheduler.thread is None