""" 视频捕获锁模块 提供线程安全的 VideoCapture 操作,防止 FFmpeg 多线程解码崩溃。 改进:从全局锁改为每摄像头独立锁 + 超时机制。 - 每路摄像头使用独立的锁,不再互相阻塞 - 超时读帧:如果某路摄像头卡住,不会无限阻塞其他路 - 健康检查:记录每路流的状态(最近帧时间、连续错误数等) """ import threading import time from typing import Optional, Tuple, Dict from dataclasses import dataclass, field @dataclass class StreamHealth: """流健康状态""" camera_id: str = "" last_frame_time: float = 0.0 # 最近成功读帧时间 consecutive_errors: int = 0 # 连续错误数 total_frames: int = 0 # 总帧数 total_errors: int = 0 # 总错误数 is_healthy: bool = True # 是否健康 avg_read_time_ms: float = 0.0 # 平均读帧耗时(ms) class CameraLockManager: """ 每摄像头独立锁管理器 替代全局锁,每路摄像头使用独立的锁,互不阻塞。 FFmpeg 内部的 async_lock 问题仅在多个 VideoCapture 实例 并发调用 read() 时触发。独立锁允许同一路的读操作串行化, 而不同路之间完全并行。 """ def __init__(self): self._locks: Dict[str, threading.Lock] = {} self._health: Dict[str, StreamHealth] = {} self._global_lock = threading.Lock() # 仅用于管理锁的创建 def get_lock(self, camera_id: str) -> threading.Lock: """获取或创建指定摄像头的独立锁""" with self._global_lock: if camera_id not in self._locks: self._locks[camera_id] = threading.Lock() self._health[camera_id] = StreamHealth(camera_id=camera_id) return self._locks[camera_id] def get_health(self, camera_id: str) -> StreamHealth: """获取摄像头流健康状态""" with self._global_lock: return self._health.get(camera_id, StreamHealth(camera_id=camera_id)) def update_health(self, camera_id: str, success: bool, read_time_ms: float = 0.0): """更新流健康状态""" with self._global_lock: health = self._health.get(camera_id) if health is None: health = StreamHealth(camera_id=camera_id) self._health[camera_id] = health if success: health.last_frame_time = time.time() health.consecutive_errors = 0 health.total_frames += 1 health.is_healthy = True # 指数移动平均计算读帧耗时 alpha = 0.1 health.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * health.avg_read_time_ms else: health.consecutive_errors += 1 health.total_errors += 1 if health.consecutive_errors > 50: health.is_healthy = False # 全局锁管理器实例 _manager = CameraLockManager() def safe_read(cap, camera_id: str = "default", timeout: float = 5.0) -> Tuple[bool, Optional[object]]: """ 线程安全的 VideoCapture.read(),使用每摄像头独立锁。 Args: cap: cv2.VideoCapture 实例 camera_id: 摄像头标识,用于区分不同路的锁 timeout: 读帧超时(秒),超时返回 (False, None) Returns: (ret, frame) 与 cap.read() 相同格式 """ lock = _manager.get_lock(camera_id) acquired = lock.acquire(timeout=timeout) if not acquired: _manager.update_health(camera_id, False) return (False, None) try: start_time = time.time() ret, frame = cap.read() read_time_ms = (time.time() - start_time) * 1000 _manager.update_health(camera_id, ret, read_time_ms) if not ret or frame is None: return (False, None) return (ret, frame) except Exception: _manager.update_health(camera_id, False) return (False, None) finally: lock.release() def safe_is_opened(cap, camera_id: str = "default") -> bool: """ 线程安全的 VideoCapture.isOpened() 检查 """ lock = _manager.get_lock(camera_id) acquired = lock.acquire(timeout=2.0) if not acquired: return False try: return cap.isOpened() except Exception: return False finally: lock.release() def get_stream_health(camera_id: str) -> StreamHealth: """获取指定摄像头流的健康状态""" return _manager.get_health(camera_id) def get_all_health() -> Dict[str, StreamHealth]: """获取所有摄像头流的健康状态""" with _manager._global_lock: return dict(_manager._health) # ============================================================ # 向后兼容:保留全局锁接口,但内部使用更安全的超时机制 # ============================================================ # 全局锁(向后兼容,用于不指定 camera_id 的场景) _compatibility_lock = threading.Lock() def safe_read_global(cap) -> Tuple[bool, Optional[object]]: """ 全局锁版本的 safe_read(向后兼容) 新代码应使用 safe_read(cap, camera_id) 代替 """ acquired = _compatibility_lock.acquire(timeout=5.0) if not acquired: return (False, None) try: ret, frame = cap.read() if not ret or frame is None: return (False, None) return (ret, frame) except Exception: return (False, None) finally: _compatibility_lock.release() def safe_is_opened_global(cap) -> bool: """ 全局锁版本的 safe_is_opened(向后兼容) 新代码应使用 safe_is_opened(cap, camera_id) 代替 """ acquired = _compatibility_lock.acquire(timeout=2.0) if not acquired: return False try: return cap.isOpened() except Exception: return False finally: _compatibility_lock.release()