| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- """多路 RTSP 取流管理 + MJPEG 输出."""
- import logging
- import threading
- import time
- from typing import Dict, Optional, Callable
- import cv2
- import numpy as np
- logger = logging.getLogger(__name__)
- class CameraStream:
- """单个摄像头的 RTSP 流封装."""
- def __init__(self, name: str, rtsp_url: str, reconnect_delay: float = 1.0):
- self.name = name
- self.rtsp_url = rtsp_url
- self.reconnect_delay = reconnect_delay
- self.latest_frame: Optional[np.ndarray] = None
- self.marked_frame: Optional[np.ndarray] = None
- self._stop_event = threading.Event()
- self.thread: Optional[threading.Thread] = None
- self.lock = threading.Lock()
- self._last_error: Optional[str] = None
- def start(self):
- if self.thread and self.thread.is_alive():
- return
- self._stop_event.clear()
- self.thread = threading.Thread(target=self._worker, daemon=True)
- self.thread.start()
- def stop(self):
- self._stop_event.set()
- if self.thread:
- self.thread.join(timeout=5.0)
- self.thread = None
- def _worker(self):
- # OpenCV read timeouts are best-effort and depend on the video backend;
- # cap.set() may return False on some backends (e.g. FFmpeg) and is ignored.
- reconnect_delay = self.reconnect_delay
- cap = None
- try:
- while not self._stop_event.is_set():
- try:
- if cap is not None:
- cap.release()
- cap = cv2.VideoCapture(self.rtsp_url)
- cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- if hasattr(cv2, 'CAP_PROP_OPEN_TIMEOUT_MSEC'):
- if not cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000):
- logger.warning("[%s] CAP_PROP_OPEN_TIMEOUT_MSEC not supported", self.name)
- if hasattr(cv2, 'CAP_PROP_READ_TIMEOUT_MSEC'):
- if not cap.set(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000):
- logger.warning("[%s] CAP_PROP_READ_TIMEOUT_MSEC not supported", self.name)
- if not cap.isOpened():
- raise RuntimeError(f"Cannot open {self.rtsp_url}")
- while not self._stop_event.is_set():
- ret, frame = cap.read()
- if not ret:
- raise RuntimeError("Frame read failed")
- with self.lock:
- self.latest_frame = frame
- reconnect_delay = self.reconnect_delay
- except Exception as e:
- with self.lock:
- self._last_error = str(e)
- time.sleep(reconnect_delay)
- reconnect_delay = min(reconnect_delay * 2, 30.0)
- finally:
- if cap is not None:
- cap.release()
- def get_frame(self) -> Optional[np.ndarray]:
- with self.lock:
- return self.latest_frame.copy() if self.latest_frame is not None else None
- def set_marked_frame(self, frame: np.ndarray):
- with self.lock:
- self.marked_frame = frame.copy()
- def get_marked_frame(self) -> Optional[np.ndarray]:
- with self.lock:
- return self.marked_frame.copy() if self.marked_frame is not None else None
- @property
- def last_error(self) -> Optional[str]:
- with self.lock:
- return self._last_error
- class StreamManager:
- def __init__(self):
- self._streams: Dict[str, CameraStream] = {}
- self._lock = threading.Lock()
- def register(self, stream_id: str, rtsp_url: str, reconnect_delay: float = 1.0) -> CameraStream:
- with self._lock:
- if stream_id not in self._streams:
- stream = CameraStream(stream_id, rtsp_url, reconnect_delay=reconnect_delay)
- self._streams[stream_id] = stream
- stream.start()
- return self._streams[stream_id]
- def unregister(self, stream_id: str):
- with self._lock:
- stream = self._streams.pop(stream_id, None)
- if stream:
- stream.stop()
- def get(self, stream_id: str) -> Optional[CameraStream]:
- with self._lock:
- return self._streams.get(stream_id)
- def stop_all(self):
- with self._lock:
- streams = list(self._streams.values())
- self._streams.clear()
- for s in streams:
- s.stop()
- def encode_mjpeg_frame(frame: np.ndarray, quality: int = 80) -> bytes:
- """把 numpy 帧编码为 JPEG bytes。"""
- encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
- ret, buf = cv2.imencode(".jpg", frame, encode_params)
- if not ret:
- raise RuntimeError("JPEG encode failed")
- return buf.tobytes()
- def generate_mjpeg_stream(get_frame: Callable[[], Optional[np.ndarray]], fps: int = 15):
- """MJPEG 流生成器,用于 FastAPI StreamingResponse。"""
- period = 1.0 / fps
- while True:
- frame = get_frame()
- if frame is None:
- time.sleep(period)
- continue
- jpeg = encode_mjpeg_frame(frame)
- yield (
- b"--frame\r\n"
- b"Content-Type: image/jpeg\r\n"
- b"Content-Length: " + str(len(jpeg)).encode() + b"\r\n\r\n" + jpeg + b"\r\n"
- )
- time.sleep(period)
|