| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- """
- 视频捕获锁模块
- 提供线程安全的 VideoCapture 操作,防止 FFmpeg 多线程解码崩溃。
- 改进:从全局锁改为每摄像头独立锁 + 超时机制。
- - 每路摄像头使用独立的锁,不再互相阻塞
- - 超时读帧:如果某路摄像头卡住,不会无限阻塞其他路
- - 健康检查:记录每路流的状态(最近帧时间、连续错误数等)
- """
- import threading
- import time
- from typing import Optional, Tuple, Dict
- from dataclasses import dataclass, field
- @dataclass
- class StreamHealth:
- """流健康状态"""
- camera_id: str = ""
- last_frame_time: float = 0.0 # 最近成功读帧时间
- consecutive_errors: int = 0 # 连续错误数
- total_frames: int = 0 # 总帧数
- total_errors: int = 0 # 总错误数
- is_healthy: bool = True # 是否健康
- avg_read_time_ms: float = 0.0 # 平均读帧耗时(ms)
- class CameraLockManager:
- """
- 每摄像头独立锁管理器
-
- 替代全局锁,每路摄像头使用独立的锁,互不阻塞。
- FFmpeg 内部的 async_lock 问题仅在多个 VideoCapture 实例
- 并发调用 read() 时触发。独立锁允许同一路的读操作串行化,
- 而不同路之间完全并行。
- """
-
- def __init__(self):
- self._locks: Dict[str, threading.Lock] = {}
- self._health: Dict[str, StreamHealth] = {}
- self._global_lock = threading.Lock() # 仅用于管理锁的创建
-
- def get_lock(self, camera_id: str) -> threading.Lock:
- """获取或创建指定摄像头的独立锁"""
- with self._global_lock:
- if camera_id not in self._locks:
- self._locks[camera_id] = threading.Lock()
- self._health[camera_id] = StreamHealth(camera_id=camera_id)
- return self._locks[camera_id]
-
- def get_health(self, camera_id: str) -> StreamHealth:
- """获取摄像头流健康状态"""
- with self._global_lock:
- return self._health.get(camera_id, StreamHealth(camera_id=camera_id))
-
- def update_health(self, camera_id: str, success: bool, read_time_ms: float = 0.0):
- """更新流健康状态"""
- with self._global_lock:
- health = self._health.get(camera_id)
- if health is None:
- health = StreamHealth(camera_id=camera_id)
- self._health[camera_id] = health
-
- if success:
- health.last_frame_time = time.time()
- health.consecutive_errors = 0
- health.total_frames += 1
- health.is_healthy = True
- # 指数移动平均计算读帧耗时
- alpha = 0.1
- health.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * health.avg_read_time_ms
- else:
- health.consecutive_errors += 1
- health.total_errors += 1
- if health.consecutive_errors > 50:
- health.is_healthy = False
- # 全局锁管理器实例
- _manager = CameraLockManager()
- def safe_read(cap, camera_id: str = "default", timeout: float = 5.0) -> Tuple[bool, Optional[object]]:
- """
- 线程安全的 VideoCapture.read(),使用每摄像头独立锁。
-
- Args:
- cap: cv2.VideoCapture 实例
- camera_id: 摄像头标识,用于区分不同路的锁
- timeout: 读帧超时(秒),超时返回 (False, None)
-
- Returns:
- (ret, frame) 与 cap.read() 相同格式
- """
- lock = _manager.get_lock(camera_id)
- acquired = lock.acquire(timeout=timeout)
- if not acquired:
- _manager.update_health(camera_id, False)
- return (False, None)
-
- try:
- start_time = time.time()
- ret, frame = cap.read()
- read_time_ms = (time.time() - start_time) * 1000
- _manager.update_health(camera_id, ret, read_time_ms)
-
- if not ret or frame is None:
- return (False, None)
- return (ret, frame)
- except Exception:
- _manager.update_health(camera_id, False)
- return (False, None)
- finally:
- lock.release()
- def safe_is_opened(cap, camera_id: str = "default") -> bool:
- """
- 线程安全的 VideoCapture.isOpened() 检查
- """
- lock = _manager.get_lock(camera_id)
- acquired = lock.acquire(timeout=2.0)
- if not acquired:
- return False
- try:
- return cap.isOpened()
- except Exception:
- return False
- finally:
- lock.release()
- def get_stream_health(camera_id: str) -> StreamHealth:
- """获取指定摄像头流的健康状态"""
- return _manager.get_health(camera_id)
- def get_all_health() -> Dict[str, StreamHealth]:
- """获取所有摄像头流的健康状态"""
- with _manager._global_lock:
- return dict(_manager._health)
- # ============================================================
- # 向后兼容:保留全局锁接口,但内部使用更安全的超时机制
- # ============================================================
- # 全局锁(向后兼容,用于不指定 camera_id 的场景)
- _compatibility_lock = threading.Lock()
- def safe_read_global(cap) -> Tuple[bool, Optional[object]]:
- """
- 全局锁版本的 safe_read(向后兼容)
- 新代码应使用 safe_read(cap, camera_id) 代替
- """
- acquired = _compatibility_lock.acquire(timeout=5.0)
- if not acquired:
- return (False, None)
- try:
- ret, frame = cap.read()
- if not ret or frame is None:
- return (False, None)
- return (ret, frame)
- except Exception:
- return (False, None)
- finally:
- _compatibility_lock.release()
- def safe_is_opened_global(cap) -> bool:
- """
- 全局锁版本的 safe_is_opened(向后兼容)
- 新代码应使用 safe_is_opened(cap, camera_id) 代替
- """
- acquired = _compatibility_lock.acquire(timeout=2.0)
- if not acquired:
- return False
- try:
- return cap.isOpened()
- except Exception:
- return False
- finally:
- _compatibility_lock.release()
|