polling_scheduler.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. """扫描点轮询调度器."""
  2. import logging
  3. import threading
  4. import time
  5. from typing import Callable, Dict, List, Optional
  6. logger = logging.getLogger(__name__)
  7. class PollingScheduler:
  8. def __init__(
  9. self,
  10. group_id: str,
  11. ptz_camera,
  12. get_points: Callable[[], List[Dict]],
  13. on_arrived: Optional[Callable[[Dict], None]] = None,
  14. on_finished: Optional[Callable[[], None]] = None,
  15. default_dwell: float = 3.0,
  16. stabilize_time: float = 1.5,
  17. ):
  18. self.group_id = group_id
  19. self.ptz = ptz_camera
  20. self.get_points = get_points
  21. self.on_arrived = on_arrived
  22. self.on_finished = on_finished
  23. self.default_dwell = default_dwell
  24. self.stabilize_time = stabilize_time
  25. self._stop_event = threading.Event()
  26. self._pause_event = threading.Event()
  27. self._pause_event.set()
  28. self.thread: Optional[threading.Thread] = None
  29. self.current_point: Optional[Dict] = None
  30. def start(self):
  31. if self.thread is not None and self.thread.is_alive():
  32. logger.warning(
  33. "[%s] PollingScheduler start() ignored: previous worker thread is still alive",
  34. self.group_id,
  35. )
  36. return
  37. self._stop_event.clear()
  38. self._pause_event.set()
  39. self.thread = threading.Thread(target=self._loop, daemon=True)
  40. self.thread.start()
  41. def stop(self):
  42. self._stop_event.set()
  43. self._pause_event.set()
  44. if self.thread is not None:
  45. self.thread.join(timeout=5.0)
  46. if self.thread.is_alive():
  47. logger.warning(
  48. "[%s] PollingScheduler stop() timed out waiting for worker thread to exit",
  49. self.group_id,
  50. )
  51. else:
  52. self.thread = None
  53. self.current_point = None
  54. def pause(self):
  55. self._pause_event.clear()
  56. def resume(self):
  57. self._pause_event.set()
  58. def _loop(self):
  59. while not self._stop_event.is_set():
  60. try:
  61. points = self.get_points()
  62. except Exception:
  63. logger.exception("[%s] get_points() raised an exception", self.group_id)
  64. self._stop_event.wait(1.0)
  65. continue
  66. if not points:
  67. self._stop_event.wait(1.0)
  68. continue
  69. for point in points:
  70. if self._stop_event.is_set():
  71. break
  72. while not self._pause_event.is_set() and not self._stop_event.is_set():
  73. self._stop_event.wait(0.5)
  74. if self._stop_event.is_set():
  75. break
  76. if not isinstance(point, dict) or "pan" not in point or "tilt" not in point:
  77. logger.error(
  78. "[%s] Skipping malformed point: %r",
  79. self.group_id,
  80. point,
  81. )
  82. continue
  83. self.current_point = point
  84. try:
  85. self.ptz.goto_exact_position(
  86. point["pan"], point["tilt"], point.get("zoom", 1)
  87. )
  88. except Exception:
  89. logger.exception(
  90. "[%s] goto_exact_position failed for point %r",
  91. self.group_id,
  92. point,
  93. )
  94. self.current_point = None
  95. continue
  96. self._stop_event.wait(self.stabilize_time)
  97. if self._stop_event.is_set():
  98. break
  99. if self.on_arrived:
  100. try:
  101. self.on_arrived(point)
  102. except Exception:
  103. logger.exception(
  104. "[%s] on_arrived callback failed for point %r",
  105. self.group_id,
  106. point,
  107. )
  108. dwell = point.get("dwell_time", self.default_dwell)
  109. if self._stop_event.wait(dwell):
  110. break
  111. if self.on_finished:
  112. try:
  113. self.on_finished()
  114. except Exception:
  115. logger.exception("[%s] on_finished callback failed", self.group_id)
  116. self.current_point = None