video_lock.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. """
  2. 视频捕获锁模块
  3. 提供线程安全的 VideoCapture 操作,防止 FFmpeg 多线程解码崩溃。
  4. 改进:从全局锁改为每摄像头独立锁 + 超时机制。
  5. - 每路摄像头使用独立的锁,不再互相阻塞
  6. - 超时读帧:如果某路摄像头卡住,不会无限阻塞其他路
  7. - 健康检查:记录每路流的状态(最近帧时间、连续错误数等)
  8. """
  9. import threading
  10. import time
  11. from typing import Optional, Tuple, Dict
  12. from dataclasses import dataclass, field
  13. @dataclass
  14. class StreamHealth:
  15. """流健康状态"""
  16. camera_id: str = ""
  17. last_frame_time: float = 0.0 # 最近成功读帧时间
  18. consecutive_errors: int = 0 # 连续错误数
  19. total_frames: int = 0 # 总帧数
  20. total_errors: int = 0 # 总错误数
  21. is_healthy: bool = True # 是否健康
  22. avg_read_time_ms: float = 0.0 # 平均读帧耗时(ms)
  23. class CameraLockManager:
  24. """
  25. 每摄像头独立锁管理器
  26. 替代全局锁,每路摄像头使用独立的锁,互不阻塞。
  27. FFmpeg 内部的 async_lock 问题仅在多个 VideoCapture 实例
  28. 并发调用 read() 时触发。独立锁允许同一路的读操作串行化,
  29. 而不同路之间完全并行。
  30. """
  31. def __init__(self):
  32. self._locks: Dict[str, threading.Lock] = {}
  33. self._health: Dict[str, StreamHealth] = {}
  34. self._global_lock = threading.Lock() # 仅用于管理锁的创建
  35. def get_lock(self, camera_id: str) -> threading.Lock:
  36. """获取或创建指定摄像头的独立锁"""
  37. with self._global_lock:
  38. if camera_id not in self._locks:
  39. self._locks[camera_id] = threading.Lock()
  40. self._health[camera_id] = StreamHealth(camera_id=camera_id)
  41. return self._locks[camera_id]
  42. def get_health(self, camera_id: str) -> StreamHealth:
  43. """获取摄像头流健康状态"""
  44. with self._global_lock:
  45. return self._health.get(camera_id, StreamHealth(camera_id=camera_id))
  46. def update_health(self, camera_id: str, success: bool, read_time_ms: float = 0.0):
  47. """更新流健康状态"""
  48. with self._global_lock:
  49. health = self._health.get(camera_id)
  50. if health is None:
  51. health = StreamHealth(camera_id=camera_id)
  52. self._health[camera_id] = health
  53. if success:
  54. health.last_frame_time = time.time()
  55. health.consecutive_errors = 0
  56. health.total_frames += 1
  57. health.is_healthy = True
  58. # 指数移动平均计算读帧耗时
  59. alpha = 0.1
  60. health.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * health.avg_read_time_ms
  61. else:
  62. health.consecutive_errors += 1
  63. health.total_errors += 1
  64. if health.consecutive_errors > 50:
  65. health.is_healthy = False
  66. # 全局锁管理器实例
  67. _manager = CameraLockManager()
  68. def safe_read(cap, camera_id: str = "default", timeout: float = 5.0) -> Tuple[bool, Optional[object]]:
  69. """
  70. 线程安全的 VideoCapture.read(),使用每摄像头独立锁。
  71. Args:
  72. cap: cv2.VideoCapture 实例
  73. camera_id: 摄像头标识,用于区分不同路的锁
  74. timeout: 读帧超时(秒),超时返回 (False, None)
  75. Returns:
  76. (ret, frame) 与 cap.read() 相同格式
  77. """
  78. lock = _manager.get_lock(camera_id)
  79. acquired = lock.acquire(timeout=timeout)
  80. if not acquired:
  81. _manager.update_health(camera_id, False)
  82. return (False, None)
  83. try:
  84. start_time = time.time()
  85. ret, frame = cap.read()
  86. read_time_ms = (time.time() - start_time) * 1000
  87. _manager.update_health(camera_id, ret, read_time_ms)
  88. if not ret or frame is None:
  89. return (False, None)
  90. return (ret, frame)
  91. except Exception:
  92. _manager.update_health(camera_id, False)
  93. return (False, None)
  94. finally:
  95. lock.release()
  96. def safe_is_opened(cap, camera_id: str = "default") -> bool:
  97. """
  98. 线程安全的 VideoCapture.isOpened() 检查
  99. """
  100. lock = _manager.get_lock(camera_id)
  101. acquired = lock.acquire(timeout=2.0)
  102. if not acquired:
  103. return False
  104. try:
  105. return cap.isOpened()
  106. except Exception:
  107. return False
  108. finally:
  109. lock.release()
  110. def get_stream_health(camera_id: str) -> StreamHealth:
  111. """获取指定摄像头流的健康状态"""
  112. return _manager.get_health(camera_id)
  113. def get_all_health() -> Dict[str, StreamHealth]:
  114. """获取所有摄像头流的健康状态"""
  115. with _manager._global_lock:
  116. return dict(_manager._health)
  117. # ============================================================
  118. # 向后兼容:保留全局锁接口,但内部使用更安全的超时机制
  119. # ============================================================
  120. # 全局锁(向后兼容,用于不指定 camera_id 的场景)
  121. _compatibility_lock = threading.Lock()
  122. def safe_read_global(cap) -> Tuple[bool, Optional[object]]:
  123. """
  124. 全局锁版本的 safe_read(向后兼容)
  125. 新代码应使用 safe_read(cap, camera_id) 代替
  126. """
  127. acquired = _compatibility_lock.acquire(timeout=5.0)
  128. if not acquired:
  129. return (False, None)
  130. try:
  131. ret, frame = cap.read()
  132. if not ret or frame is None:
  133. return (False, None)
  134. return (ret, frame)
  135. except Exception:
  136. return (False, None)
  137. finally:
  138. _compatibility_lock.release()
  139. def safe_is_opened_global(cap) -> bool:
  140. """
  141. 全局锁版本的 safe_is_opened(向后兼容)
  142. 新代码应使用 safe_is_opened(cap, camera_id) 代替
  143. """
  144. acquired = _compatibility_lock.acquire(timeout=2.0)
  145. if not acquired:
  146. return False
  147. try:
  148. return cap.isOpened()
  149. except Exception:
  150. return False
  151. finally:
  152. _compatibility_lock.release()