| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- """扫描点轮询调度器."""
- import logging
- import threading
- import time
- from typing import Callable, Dict, List, Optional
- logger = logging.getLogger(__name__)
- class PollingScheduler:
- def __init__(
- self,
- group_id: str,
- ptz_camera,
- get_points: Callable[[], List[Dict]],
- on_arrived: Optional[Callable[[Dict], None]] = None,
- on_finished: Optional[Callable[[], None]] = None,
- default_dwell: float = 3.0,
- stabilize_time: float = 1.5,
- ):
- self.group_id = group_id
- self.ptz = ptz_camera
- self.get_points = get_points
- self.on_arrived = on_arrived
- self.on_finished = on_finished
- self.default_dwell = default_dwell
- self.stabilize_time = stabilize_time
- self._stop_event = threading.Event()
- self._pause_event = threading.Event()
- self._pause_event.set()
- self.thread: Optional[threading.Thread] = None
- self.current_point: Optional[Dict] = None
- def start(self):
- if self.thread is not None and self.thread.is_alive():
- logger.warning(
- "[%s] PollingScheduler start() ignored: previous worker thread is still alive",
- self.group_id,
- )
- return
- self._stop_event.clear()
- self._pause_event.set()
- self.thread = threading.Thread(target=self._loop, daemon=True)
- self.thread.start()
- def stop(self):
- self._stop_event.set()
- self._pause_event.set()
- if self.thread is not None:
- self.thread.join(timeout=5.0)
- if self.thread.is_alive():
- logger.warning(
- "[%s] PollingScheduler stop() timed out waiting for worker thread to exit",
- self.group_id,
- )
- else:
- self.thread = None
- self.current_point = None
- def pause(self):
- self._pause_event.clear()
- def resume(self):
- self._pause_event.set()
- def _loop(self):
- while not self._stop_event.is_set():
- try:
- points = self.get_points()
- except Exception:
- logger.exception("[%s] get_points() raised an exception", self.group_id)
- self._stop_event.wait(1.0)
- continue
- if not points:
- self._stop_event.wait(1.0)
- continue
- for point in points:
- if self._stop_event.is_set():
- break
- while not self._pause_event.is_set() and not self._stop_event.is_set():
- self._stop_event.wait(0.5)
- if self._stop_event.is_set():
- break
- if not isinstance(point, dict) or "pan" not in point or "tilt" not in point:
- logger.error(
- "[%s] Skipping malformed point: %r",
- self.group_id,
- point,
- )
- continue
- self.current_point = point
- try:
- self.ptz.goto_exact_position(
- point["pan"], point["tilt"], point.get("zoom", 1)
- )
- except Exception:
- logger.exception(
- "[%s] goto_exact_position failed for point %r",
- self.group_id,
- point,
- )
- self.current_point = None
- continue
- self._stop_event.wait(self.stabilize_time)
- if self._stop_event.is_set():
- break
- if self.on_arrived:
- try:
- self.on_arrived(point)
- except Exception:
- logger.exception(
- "[%s] on_arrived callback failed for point %r",
- self.group_id,
- point,
- )
- dwell = point.get("dwell_time", self.default_dwell)
- if self._stop_event.wait(dwell):
- break
- if self.on_finished:
- try:
- self.on_finished()
- except Exception:
- logger.exception("[%s] on_finished callback failed", self.group_id)
- self.current_point = None
|