"""多路 RTSP 取流管理 + MJPEG 输出.""" import logging import threading import time from typing import Dict, Optional, Callable import cv2 import numpy as np logger = logging.getLogger(__name__) class CameraStream: """单个摄像头的 RTSP 流封装.""" def __init__(self, name: str, rtsp_url: str, reconnect_delay: float = 1.0): self.name = name self.rtsp_url = rtsp_url self.reconnect_delay = reconnect_delay self.latest_frame: Optional[np.ndarray] = None self.marked_frame: Optional[np.ndarray] = None self._stop_event = threading.Event() self.thread: Optional[threading.Thread] = None self.lock = threading.Lock() self._last_error: Optional[str] = None def start(self): if self.thread and self.thread.is_alive(): return self._stop_event.clear() self.thread = threading.Thread(target=self._worker, daemon=True) self.thread.start() def stop(self): self._stop_event.set() if self.thread: self.thread.join(timeout=5.0) self.thread = None def _worker(self): # OpenCV read timeouts are best-effort and depend on the video backend; # cap.set() may return False on some backends (e.g. FFmpeg) and is ignored. reconnect_delay = self.reconnect_delay cap = None try: while not self._stop_event.is_set(): try: if cap is not None: cap.release() cap = cv2.VideoCapture(self.rtsp_url) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) if hasattr(cv2, 'CAP_PROP_OPEN_TIMEOUT_MSEC'): if not cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000): logger.warning("[%s] CAP_PROP_OPEN_TIMEOUT_MSEC not supported", self.name) if hasattr(cv2, 'CAP_PROP_READ_TIMEOUT_MSEC'): if not cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000): logger.warning("[%s] CAP_PROP_READ_TIMEOUT_MSEC not supported", self.name) if not cap.isOpened(): raise RuntimeError(f"Cannot open {self.rtsp_url}") while not self._stop_event.is_set(): ret, frame = cap.read() if not ret: raise RuntimeError("Frame read failed") with self.lock: self.latest_frame = frame reconnect_delay = self.reconnect_delay except Exception as e: with self.lock: self._last_error = str(e) time.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 2, 30.0) finally: if cap is not None: cap.release() def get_frame(self) -> Optional[np.ndarray]: with self.lock: return self.latest_frame.copy() if self.latest_frame is not None else None def set_marked_frame(self, frame: np.ndarray): with self.lock: self.marked_frame = frame.copy() def get_marked_frame(self) -> Optional[np.ndarray]: with self.lock: return self.marked_frame.copy() if self.marked_frame is not None else None @property def last_error(self) -> Optional[str]: with self.lock: return self._last_error class StreamManager: def __init__(self): self._streams: Dict[str, CameraStream] = {} self._lock = threading.Lock() def register(self, stream_id: str, rtsp_url: str, reconnect_delay: float = 1.0) -> CameraStream: with self._lock: if stream_id not in self._streams: stream = CameraStream(stream_id, rtsp_url, reconnect_delay=reconnect_delay) self._streams[stream_id] = stream stream.start() return self._streams[stream_id] def unregister(self, stream_id: str): with self._lock: stream = self._streams.pop(stream_id, None) if stream: stream.stop() def get(self, stream_id: str) -> Optional[CameraStream]: with self._lock: return self._streams.get(stream_id) def stop_all(self): with self._lock: streams = list(self._streams.values()) self._streams.clear() for s in streams: s.stop() def encode_mjpeg_frame(frame: np.ndarray, quality: int = 80) -> bytes: """把 numpy 帧编码为 JPEG bytes。""" encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality] ret, buf = cv2.imencode(".jpg", frame, encode_params) if not ret: raise RuntimeError("JPEG encode failed") return buf.tobytes() def generate_mjpeg_stream(get_frame: Callable[[], Optional[np.ndarray]], fps: int = 15): """MJPEG 流生成器,用于 FastAPI StreamingResponse。""" period = 1.0 / fps while True: frame = get_frame() if frame is None: time.sleep(period) continue jpeg = encode_mjpeg_frame(frame) yield ( b"--frame\r\n" b"Content-Type: image/jpeg\r\n" b"Content-Length: " + str(len(jpeg)).encode() + b"\r\n\r\n" + jpeg + b"\r\n" ) time.sleep(period)