stream_manager.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. """多路 RTSP 取流管理 + MJPEG 输出."""
  2. import logging
  3. import threading
  4. import time
  5. from typing import Dict, Optional, Callable
  6. import cv2
  7. import numpy as np
  8. logger = logging.getLogger(__name__)
  9. class CameraStream:
  10. """单个摄像头的 RTSP 流封装."""
  11. def __init__(self, name: str, rtsp_url: str, reconnect_delay: float = 1.0):
  12. self.name = name
  13. self.rtsp_url = rtsp_url
  14. self.reconnect_delay = reconnect_delay
  15. self.latest_frame: Optional[np.ndarray] = None
  16. self.marked_frame: Optional[np.ndarray] = None
  17. self._stop_event = threading.Event()
  18. self.thread: Optional[threading.Thread] = None
  19. self.lock = threading.Lock()
  20. self._last_error: Optional[str] = None
  21. def start(self):
  22. if self.thread and self.thread.is_alive():
  23. return
  24. self._stop_event.clear()
  25. self.thread = threading.Thread(target=self._worker, daemon=True)
  26. self.thread.start()
  27. def stop(self):
  28. self._stop_event.set()
  29. if self.thread:
  30. self.thread.join(timeout=5.0)
  31. self.thread = None
  32. def _worker(self):
  33. # OpenCV read timeouts are best-effort and depend on the video backend;
  34. # cap.set() may return False on some backends (e.g. FFmpeg) and is ignored.
  35. reconnect_delay = self.reconnect_delay
  36. cap = None
  37. try:
  38. while not self._stop_event.is_set():
  39. try:
  40. if cap is not None:
  41. cap.release()
  42. cap = cv2.VideoCapture(self.rtsp_url)
  43. cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
  44. if hasattr(cv2, 'CAP_PROP_OPEN_TIMEOUT_MSEC'):
  45. if not cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000):
  46. logger.warning("[%s] CAP_PROP_OPEN_TIMEOUT_MSEC not supported", self.name)
  47. if hasattr(cv2, 'CAP_PROP_READ_TIMEOUT_MSEC'):
  48. if not cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000):
  49. logger.warning("[%s] CAP_PROP_READ_TIMEOUT_MSEC not supported", self.name)
  50. if not cap.isOpened():
  51. raise RuntimeError(f"Cannot open {self.rtsp_url}")
  52. while not self._stop_event.is_set():
  53. ret, frame = cap.read()
  54. if not ret:
  55. raise RuntimeError("Frame read failed")
  56. with self.lock:
  57. self.latest_frame = frame
  58. reconnect_delay = self.reconnect_delay
  59. except Exception as e:
  60. with self.lock:
  61. self._last_error = str(e)
  62. time.sleep(reconnect_delay)
  63. reconnect_delay = min(reconnect_delay * 2, 30.0)
  64. finally:
  65. if cap is not None:
  66. cap.release()
  67. def get_frame(self) -> Optional[np.ndarray]:
  68. with self.lock:
  69. return self.latest_frame.copy() if self.latest_frame is not None else None
  70. def set_marked_frame(self, frame: np.ndarray):
  71. with self.lock:
  72. self.marked_frame = frame.copy()
  73. def get_marked_frame(self) -> Optional[np.ndarray]:
  74. with self.lock:
  75. return self.marked_frame.copy() if self.marked_frame is not None else None
  76. @property
  77. def last_error(self) -> Optional[str]:
  78. with self.lock:
  79. return self._last_error
  80. class StreamManager:
  81. def __init__(self):
  82. self._streams: Dict[str, CameraStream] = {}
  83. self._lock = threading.Lock()
  84. def register(self, stream_id: str, rtsp_url: str, reconnect_delay: float = 1.0) -> CameraStream:
  85. with self._lock:
  86. if stream_id not in self._streams:
  87. stream = CameraStream(stream_id, rtsp_url, reconnect_delay=reconnect_delay)
  88. self._streams[stream_id] = stream
  89. stream.start()
  90. return self._streams[stream_id]
  91. def unregister(self, stream_id: str):
  92. with self._lock:
  93. stream = self._streams.pop(stream_id, None)
  94. if stream:
  95. stream.stop()
  96. def get(self, stream_id: str) -> Optional[CameraStream]:
  97. with self._lock:
  98. return self._streams.get(stream_id)
  99. def stop_all(self):
  100. with self._lock:
  101. streams = list(self._streams.values())
  102. self._streams.clear()
  103. for s in streams:
  104. s.stop()
  105. def encode_mjpeg_frame(frame: np.ndarray, quality: int = 80) -> bytes:
  106. """把 numpy 帧编码为 JPEG bytes。"""
  107. encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
  108. ret, buf = cv2.imencode(".jpg", frame, encode_params)
  109. if not ret:
  110. raise RuntimeError("JPEG encode failed")
  111. return buf.tobytes()
  112. def generate_mjpeg_stream(get_frame: Callable[[], Optional[np.ndarray]], fps: int = 15):
  113. """MJPEG 流生成器,用于 FastAPI StreamingResponse。"""
  114. period = 1.0 / fps
  115. while True:
  116. frame = get_frame()
  117. if frame is None:
  118. time.sleep(period)
  119. continue
  120. jpeg = encode_mjpeg_frame(frame)
  121. yield (
  122. b"--frame\r\n"
  123. b"Content-Type: image/jpeg\r\n"
  124. b"Content-Length: " + str(len(jpeg)).encode() + b"\r\n\r\n" + jpeg + b"\r\n"
  125. )
  126. time.sleep(period)