"""Dual-stream manager.

Manages the panorama and PTZ video streams independently. Each stream is
served by its own ``StreamWorker`` thread, so a stalled stream does not block
the other one. The module offers a unified frame-access interface, status
reporting, and automatic reconnection.

Key points:
- The two streams are fully independent.
- Frames are read through ``video_lock.safe_read`` / ``safe_is_opened``, which
  are called with the camera id and a read timeout (per the original design
  notes these use a per-camera lock rather than a global FFmpeg lock;
  NOTE(review): confirm against ``video_lock``).
- Timed reads, automatic reconnection, and health/status tracking.
- Frame freshness: the capture buffer size is set to 1 and the frame queue is
  drained before every put, so consumers see the most recent frame.
"""

import os

# Options handed to OpenCV's FFmpeg capture backend, written as ``key;value``
# (here: option ``threads`` set to ``1``).
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'

import cv2
import threading
import time
import queue
import logging
from typing import Optional, Dict, Tuple, Callable
from dataclasses import dataclass, field, replace

from video_lock import safe_read, safe_is_opened, get_stream_health, CameraLockManager

logger = logging.getLogger(__name__)


@dataclass
class StreamConfig:
    """Configuration of a single video stream."""

    camera_id: str
    rtsp_url: str
    camera_type: str = "panorama"       # "panorama" or "ptz"
    buffer_size: int = 2                # frame queue capacity (kept small for low latency)
    reconnect_interval: float = 5.0     # seconds to wait before a reconnect attempt
    max_consecutive_errors: int = 50    # consecutive read failures tolerated before reconnecting
    read_timeout: float = 2.0           # per-read timeout in seconds, passed to safe_read
    fps_target: float = 25.0            # target frame rate (not read anywhere in this module)


@dataclass
class StreamStatus:
    """Snapshot of a stream's runtime counters."""

    is_running: bool = False
    is_connected: bool = False
    consecutive_errors: int = 0
    total_frames: int = 0
    total_errors: int = 0
    last_frame_time: float = 0.0        # time.time() of the last successful read
    avg_read_time_ms: float = 0.0       # exponential moving average of read duration
    reconnect_count: int = 0
    fps_actual: float = 0.0             # measured over windows of at least one second


class StreamWorker:
    """Worker that reads one video stream on a dedicated daemon thread."""

    # Smoothing factor of the exponential moving average of read time.
    _READ_TIME_EMA_ALPHA = 0.1
    # Thresholds used by ``is_healthy``.
    _UNHEALTHY_ERROR_COUNT = 10
    _STALE_FRAME_SECONDS = 5.0

    def __init__(self, config: StreamConfig):
        """Create an idle worker for ``config``; nothing is opened yet."""
        self.config = config
        self._cap = None
        self._current_frame = None
        self._frame_lock = threading.Lock()
        self._frame_queue = queue.Queue(maxsize=config.buffer_size)
        self._running = False
        # Set by stop(); lets the worker's waits return early instead of
        # sleeping through a whole reconnect interval.
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._status = StreamStatus()
        self._status_lock = threading.Lock()
        self._last_fps_time = time.time()
        self._fps_frame_count = 0

    @property
    def status(self) -> StreamStatus:
        """Return a copy of the current status, taken under the status lock."""
        with self._status_lock:
            return replace(self._status)

    def start(self) -> bool:
        """Start the stream worker thread.

        A failed initial connection is not fatal: the thread is started
        anyway and keeps trying to reconnect.

        Returns:
            Always True (also when the worker is already running).
        """
        if self._running:
            logger.warning(f"[{self.config.camera_id}] 流已在运行")
            return True

        self._stop_event.clear()
        connected = self._connect()
        if not connected:
            logger.error(f"[{self.config.camera_id}] 初始连接失败")

        with self._status_lock:
            self._status.is_running = True
            self._status.is_connected = connected

        self._running = True
        self._thread = threading.Thread(target=self._stream_loop, daemon=True)
        self._thread.start()
        logger.info(f"[{self.config.camera_id}] 流工作线程已启动")
        return True

    def stop(self) -> None:
        """Stop the worker thread (waiting up to 3 s) and release the capture."""
        self._running = False
        self._stop_event.set()

        if self._thread:
            self._thread.join(timeout=3)
            self._thread = None

        self._release_capture()

        with self._status_lock:
            self._status.is_running = False
            self._status.is_connected = False
        logger.info(f"[{self.config.camera_id}] 流已停止")

    def get_frame(self) -> Optional[cv2.Mat]:
        """Return a copy of the most recent frame, or None if none was read yet."""
        with self._frame_lock:
            if self._current_frame is None:
                return None
            return self._current_frame.copy()

    def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[cv2.Mat]:
        """Pop a frame from the frame queue.

        Args:
            timeout: Maximum time in seconds to wait for a frame.

        Returns:
            The dequeued frame, or None if the queue stayed empty.
        """
        try:
            return self._frame_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _release_capture(self) -> None:
        """Release the current capture, if any, and clear ``self._cap``."""
        cap, self._cap = self._cap, None
        if cap is None:
            return
        try:
            cap.release()
        except Exception as exc:
            # Best effort: a failing release must not break stop/reconnect.
            logger.debug("[%s] capture release failed: %s", self.config.camera_id, exc)

    @staticmethod
    def _open_capture(url: str, backend: int):
        """Open ``url`` with ``backend``; return the capture, or None if not opened.

        A capture that fails to open is released instead of being dropped.
        """
        cap = cv2.VideoCapture(url, backend)
        if cap.isOpened():
            # Keep the driver-side buffer minimal so reads return recent frames.
            cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            return cap
        try:
            cap.release()
        except Exception:
            pass
        return None

    def _connect(self) -> bool:
        """Open the RTSP stream, trying FFmpeg first and GStreamer as fallback.

        Returns:
            True if a capture was opened and stored in ``self._cap``.
        """
        self._release_capture()
        url = self.config.rtsp_url

        try:
            cap = self._open_capture(url, cv2.CAP_FFMPEG)
            if cap is not None:
                self._cap = cap
                logger.info(f"[{self.config.camera_id}] RTSP流连接成功")
                return True

            # Fall back to the GStreamer backend; errors here are ignored and
            # reported as an ordinary open failure below.
            try:
                cap = self._open_capture(url, cv2.CAP_GSTREAMER)
            except Exception:
                cap = None
            if cap is not None:
                self._cap = cap
                logger.info(f"[{self.config.camera_id}] 使用 GStreamer 后端连接成功")
                return True

            logger.error(f"[{self.config.camera_id}] 无法打开RTSP流")
            return False
        except Exception as e:
            logger.error(f"[{self.config.camera_id}] 连接异常: {e}")
            self._cap = None
            return False

    def _stream_loop(self) -> None:
        """Worker thread body: read frames until stopped, reconnecting as needed."""
        import signal

        # Block SIGINT in this thread where the platform supports it.
        if hasattr(signal, 'pthread_sigmask'):
            try:
                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
            except (AttributeError, OSError):
                pass

        logger.info(f"[{self.config.camera_id}] 流循环开始")
        camera_id = self.config.camera_id

        while self._running:
            try:
                # Snapshot the capture so a concurrent stop() cannot swap it
                # for None between the open check and the read.
                cap = self._cap
                if cap is None or not safe_is_opened(cap, camera_id):
                    self._reconnect()
                    continue

                start_time = time.time()
                ret, frame = safe_read(cap, camera_id, self.config.read_timeout)
                if not ret or frame is None:
                    self._on_read_error()
                    continue

                self._on_read_success(frame, start_time)

            except Exception as e:
                err_str = str(e)
                if 'async_lock' in err_str or 'Assertion' in err_str:
                    # These messages are treated as FFmpeg-internal failures;
                    # rebuild the connection.
                    logger.warning(f"[{self.config.camera_id}] FFmpeg内部错误,重建连接: {e}")
                    self._reconnect()
                else:
                    logger.error(f"[{self.config.camera_id}] 流错误: {e}")
                    # Short back-off that returns early once stop() is called.
                    self._stop_event.wait(0.5)

    def _on_read_success(self, frame, start_time: float) -> None:
        """Publish a freshly read frame and update the statistics.

        Args:
            frame: The frame returned by the read.
            start_time: ``time.time()`` taken just before the read started.
        """
        read_time_ms = (time.time() - start_time) * 1000

        with self._frame_lock:
            self._current_frame = frame.copy()

        # Keep only the newest frame in the queue: drain it, then put without
        # blocking (a concurrent producer filling it is simply ignored).
        try:
            while not self._frame_queue.empty():
                try:
                    self._frame_queue.get_nowait()
                except queue.Empty:
                    break
            self._frame_queue.put(frame.copy(), block=False)
        except queue.Full:
            pass

        with self._status_lock:
            status = self._status
            status.consecutive_errors = 0
            status.total_frames += 1
            status.last_frame_time = time.time()
            status.is_connected = True

            alpha = self._READ_TIME_EMA_ALPHA
            status.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * status.avg_read_time_ms

            # FPS is recomputed once at least one second has elapsed.
            self._fps_frame_count += 1
            elapsed = time.time() - self._last_fps_time
            if elapsed >= 1.0:
                status.fps_actual = self._fps_frame_count / elapsed
                self._fps_frame_count = 0
                self._last_fps_time = time.time()

    def _on_read_error(self) -> None:
        """Record a failed read and force a reconnect after too many in a row."""
        with self._status_lock:
            self._status.consecutive_errors += 1
            self._status.total_errors += 1
            failures = self._status.consecutive_errors

        if failures > self.config.max_consecutive_errors:
            logger.warning(f"[{self.config.camera_id}] 连续{failures}次读帧失败,触发重连")
            # Dropping the capture makes the stream loop take its reconnect
            # path on the next iteration.
            self._release_capture()

        # Brief pause so repeated failures do not spin the CPU.
        self._stop_event.wait(0.01)

    def _reconnect(self) -> None:
        """Release the capture, wait ``reconnect_interval``, then reconnect.

        Returns early, without opening anything, when a stop is requested
        during the wait.
        """
        with self._status_lock:
            self._status.reconnect_count += 1
            self._status.is_connected = False

        logger.info(f"[{self.config.camera_id}] 尝试重连...")
        self._release_capture()

        # Interruptible wait: True means stop() was called meanwhile.
        if self._stop_event.wait(self.config.reconnect_interval):
            return

        if self._connect():
            if self._stop_event.is_set():
                # stop() ran while the connection was being opened; do not
                # leave a capture behind that nobody will release.
                self._release_capture()
                return
            with self._status_lock:
                # Fresh connection: start counting failures from zero so the
                # reconnect threshold is not hit again on the very next error.
                self._status.consecutive_errors = 0
                self._status.is_connected = True
            logger.info(f"[{self.config.camera_id}] 重连成功")
        else:
            logger.warning(f"[{self.config.camera_id}] 重连失败,{self.config.reconnect_interval}秒后重试")

    @property
    def is_healthy(self) -> bool:
        """Whether the stream is running, not failing, and not stale.

        Unhealthy when the worker is not running, when more than 10 reads in
        a row have failed, or when a frame was received before but the last
        one is older than 5 seconds.
        """
        s = self.status
        if not s.is_running:
            return False
        if s.consecutive_errors > self._UNHEALTHY_ERROR_COUNT:
            return False
        if s.last_frame_time > 0 and (time.time() - s.last_frame_time) > self._STALE_FRAME_SECONDS:
            return False
        return True


class DualStreamManager:
    """Manager for the panorama and PTZ video streams.

    Provides a unified frame-access interface on top of one ``StreamWorker``
    per camera id; each worker runs its own thread.
    """

    PANORAMA_ID = "panorama"
    PTZ_ID = "ptz"

    def __init__(self):
        """Create a manager with no registered streams."""
        self._workers: Dict[str, StreamWorker] = {}
        self._running = False

    def add_stream(self, config: StreamConfig) -> StreamWorker:
        """Register a stream, stopping and replacing any worker with the same id.

        Returns:
            The newly created (not yet started) worker.
        """
        existing = self._workers.get(config.camera_id)
        if existing is not None:
            logger.warning(f"流 {config.camera_id} 已存在,将先停止")
            existing.stop()

        worker = StreamWorker(config)
        self._workers[config.camera_id] = worker
        return worker

    def start_stream(self, camera_id: str) -> bool:
        """Start the given stream; return False if the id is not registered."""
        worker = self._workers.get(camera_id)
        if worker is None:
            logger.error(f"流 {camera_id} 未注册")
            return False
        return worker.start()

    def stop_stream(self, camera_id: str) -> None:
        """Stop the given stream; unknown ids are ignored."""
        worker = self._workers.get(camera_id)
        if worker is not None:
            worker.stop()

    def start_all(self) -> bool:
        """Start every registered stream.

        Returns:
            True if every worker's ``start()`` returned True.
        """
        success = True
        for camera_id, worker in self._workers.items():
            if worker.start():
                logger.info(f"流 {camera_id} 启动中...")
            else:
                logger.error(f"启动流 {camera_id} 失败")
                success = False
        self._running = True
        return success

    def stop_all(self) -> None:
        """Stop every registered stream."""
        for worker in self._workers.values():
            worker.stop()
        self._running = False

    def get_frame(self, camera_id: str) -> Optional[cv2.Mat]:
        """Return the latest frame of the given camera, or None if unavailable."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return None
        return worker.get_frame()

    def get_panorama_frame(self) -> Optional[cv2.Mat]:
        """Return the latest panorama camera frame."""
        return self.get_frame(self.PANORAMA_ID)

    def get_ptz_frame(self) -> Optional[cv2.Mat]:
        """Return the latest PTZ camera frame."""
        return self.get_frame(self.PTZ_ID)

    def get_status(self, camera_id: str) -> Optional[StreamStatus]:
        """Return a status snapshot of the given stream, or None if unknown."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return None
        return worker.status

    def get_all_status(self) -> Dict[str, StreamStatus]:
        """Return a status snapshot for every registered stream."""
        return {cid: w.status for cid, w in self._workers.items()}

    def is_healthy(self, camera_id: str) -> bool:
        """Return the worker's health flag; False for unknown ids."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return False
        return worker.is_healthy

    def wait_for_frames(self, timeout: float = 15.0) -> Dict[str, bool]:
        """Wait until every stream has delivered at least one frame.

        Polls every 0.5 s until all streams are ready or ``timeout`` elapses.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            Mapping of camera id to whether a frame was obtained.
        """
        deadline = time.monotonic() + timeout
        ready = {cid: False for cid in self._workers}

        while time.monotonic() < deadline:
            for cid, is_ready in ready.items():
                if is_ready:
                    continue
                frame = self.get_frame(cid)
                if frame is not None:
                    ready[cid] = True
                    logger.info(f"[{cid}] 帧就绪 ({frame.shape})")

            if all(ready.values()):
                break
            time.sleep(0.5)

        for cid, is_ready in ready.items():
            if not is_ready:
                logger.warning(f"[{cid}] 等待帧超时({timeout}s)")

        return ready

    def get_stream_health(self, camera_id: str):
        """Return stream health details as reported by the video_lock module."""
        return get_stream_health(camera_id)

    @property
    def panorama_worker(self) -> Optional[StreamWorker]:
        """The panorama stream worker, or None if not registered."""
        return self._workers.get(self.PANORAMA_ID)

    @property
    def ptz_worker(self) -> Optional[StreamWorker]:
        """The PTZ stream worker, or None if not registered."""
        return self._workers.get(self.PTZ_ID)

    @staticmethod
    def build_rtsp_url(config: dict) -> str:
        """Build an RTSP URL from a camera configuration dict.

        A non-empty ``rtsp_url`` entry is returned unchanged. Otherwise the
        URL is assembled from ``username``, ``password``, ``ip``, the optional
        ``rtsp_port`` (default 554) and the optional ``channel`` (default 0,
        emitted as ``channel + 1``).

        NOTE: credentials are inserted verbatim; callers must percent-encode
        usernames/passwords that contain URL-reserved characters.

        Raises:
            KeyError: If ``username``, ``password`` or ``ip`` is missing and no
                ``rtsp_url`` is given.
        """
        explicit_url = config.get('rtsp_url')
        if explicit_url:
            return explicit_url

        channel = config.get('channel', 0)
        return (f"rtsp://{config['username']}:{config['password']}"
                f"@{config['ip']}:{config.get('rtsp_port', 554)}"
                f"/cam/realmonitor?channel={channel + 1}&subtype=1")


def create_dual_stream_manager(panorama_config: dict,
                               ptz_config: dict) -> DualStreamManager:
    """Create a dual-stream manager from two camera configurations.

    Args:
        panorama_config: Panorama camera configuration.
        ptz_config: PTZ camera configuration.

    Returns:
        A ``DualStreamManager`` with both streams registered (not started).
    """
    manager = DualStreamManager()

    pan_rtsp = DualStreamManager.build_rtsp_url(panorama_config)
    ptz_rtsp = DualStreamManager.build_rtsp_url(ptz_config)

    manager.add_stream(StreamConfig(
        camera_id=DualStreamManager.PANORAMA_ID,
        rtsp_url=pan_rtsp,
        camera_type="panorama"
    ))

    manager.add_stream(StreamConfig(
        camera_id=DualStreamManager.PTZ_ID,
        rtsp_url=ptz_rtsp,
        camera_type="ptz"
    ))

    return manager