dual_stream_manager.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. """
  2. 双路流管理器
  3. 独立管理全景和球机两路视频流,每路使用独立线程和独立锁,
  4. 互不阻塞。提供统一帧获取接口、健康监控、自动重连。
  5. 关键改进:
  6. - 两路流完全独立,一路卡住不影响另一路
  7. - 每路有自己的锁(per-camera lock),不再用全局 FFmpeg 锁
  8. - 超时读帧 + 自动重连 + 健康状态监控
  9. - 帧新鲜度追踪:确保读到的是最近帧而非缓冲区中的旧帧
  10. """
import os
# Configure OpenCV's FFmpeg capture options before cv2 is imported below, so
# the setting is already in place when the first capture is opened.
# NOTE(review): 'threads;1' presumably limits FFmpeg decoding to a single
# thread per capture -- confirm against the OpenCV documentation.
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
import cv2
import threading
import time
import queue
import logging
from typing import Optional, Dict, Tuple, Callable
from dataclasses import dataclass, field
from video_lock import safe_read, safe_is_opened, get_stream_health, CameraLockManager
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
  22. @dataclass
  23. class StreamConfig:
  24. """流配置"""
  25. camera_id: str
  26. rtsp_url: str
  27. camera_type: str = "panorama" # "panorama" 或 "ptz"
  28. buffer_size: int = 2 # 帧缓冲大小(保持低延迟)
  29. reconnect_interval: float = 5.0 # 重连间隔(秒)
  30. max_consecutive_errors: int = 50 # 最大连续错误数
  31. read_timeout: float = 2.0 # 读帧超时(秒)
  32. fps_target: float = 25.0 # 目标帧率
  33. @dataclass
  34. class StreamStatus:
  35. """流状态"""
  36. is_running: bool = False
  37. is_connected: bool = False
  38. consecutive_errors: int = 0
  39. total_frames: int = 0
  40. total_errors: int = 0
  41. last_frame_time: float = 0.0
  42. avg_read_time_ms: float = 0.0
  43. reconnect_count: int = 0
  44. fps_actual: float = 0.0
  45. class StreamWorker:
  46. """单路视频流工作线程"""
  47. def __init__(self, config: StreamConfig):
  48. self.config = config
  49. self._cap = None
  50. self._current_frame = None
  51. self._frame_lock = threading.Lock()
  52. self._frame_queue = queue.Queue(maxsize=config.buffer_size)
  53. self._running = False
  54. self._thread = None
  55. self._status = StreamStatus()
  56. self._status_lock = threading.Lock()
  57. self._last_fps_time = time.time()
  58. self._fps_frame_count = 0
  59. @property
  60. def status(self) -> StreamStatus:
  61. with self._status_lock:
  62. return StreamStatus(
  63. is_running=self._status.is_running,
  64. is_connected=self._status.is_connected,
  65. consecutive_errors=self._status.consecutive_errors,
  66. total_frames=self._status.total_frames,
  67. total_errors=self._status.total_errors,
  68. last_frame_time=self._status.last_frame_time,
  69. avg_read_time_ms=self._status.avg_read_time_ms,
  70. reconnect_count=self._status.reconnect_count,
  71. fps_actual=self._status.fps_actual
  72. )
  73. def start(self) -> bool:
  74. """启动视频流"""
  75. if self._running:
  76. logger.warning(f"[{self.config.camera_id}] 流已在运行")
  77. return True
  78. if not self._connect():
  79. logger.error(f"[{self.config.camera_id}] 初始连接失败")
  80. # 不返回False,启动重连线程
  81. self._status.is_running = True
  82. self._status.is_connected = False
  83. else:
  84. self._status.is_running = True
  85. self._status.is_connected = True
  86. self._running = True
  87. self._thread = threading.Thread(target=self._stream_loop, daemon=True)
  88. self._thread.start()
  89. logger.info(f"[{self.config.camera_id}] 流工作线程已启动")
  90. return True
  91. def stop(self):
  92. """停止视频流"""
  93. self._running = False
  94. if self._thread:
  95. self._thread.join(timeout=3)
  96. self._thread = None
  97. if self._cap is not None:
  98. try:
  99. self._cap.release()
  100. except Exception:
  101. pass
  102. self._cap = None
  103. with self._status_lock:
  104. self._status.is_running = False
  105. self._status.is_connected = False
  106. logger.info(f"[{self.config.camera_id}] 流已停止")
  107. def get_frame(self) -> Optional[cv2.Mat]:
  108. """获取最新帧(线程安全副本)"""
  109. with self._frame_lock:
  110. if self._current_frame is not None:
  111. return self._current_frame.copy()
  112. return None
  113. def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[cv2.Mat]:
  114. """从帧队列获取帧"""
  115. try:
  116. return self._frame_queue.get(timeout=timeout)
  117. except queue.Empty:
  118. return None
  119. def _connect(self) -> bool:
  120. """建立RTSP连接"""
  121. if self._cap is not None:
  122. try:
  123. self._cap.release()
  124. except Exception:
  125. pass
  126. self._cap = None
  127. try:
  128. self._cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_FFMPEG)
  129. if not self._cap.isOpened():
  130. # 尝试 GStreamer 后端
  131. try:
  132. self._cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_GSTREAMER)
  133. if self._cap.isOpened():
  134. logger.info(f"[{self.config.camera_id}] 使用 GStreamer 后端连接成功")
  135. self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
  136. return True
  137. except Exception:
  138. pass
  139. logger.error(f"[{self.config.camera_id}] 无法打开RTSP流")
  140. self._cap = None
  141. return False
  142. self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
  143. logger.info(f"[{self.config.camera_id}] RTSP流连接成功")
  144. return True
  145. except Exception as e:
  146. logger.error(f"[{self.config.camera_id}] 连接异常: {e}")
  147. self._cap = None
  148. return False
  149. def _stream_loop(self):
  150. """流工作主循环"""
  151. import signal
  152. if hasattr(signal, 'pthread_sigmask'):
  153. try:
  154. signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
  155. except (AttributeError, OSError):
  156. pass
  157. logger.info(f"[{self.config.camera_id}] 流循环开始")
  158. while self._running:
  159. try:
  160. # 如果未连接,尝试重连
  161. if self._cap is None or not safe_is_opened(self._cap, self.config.camera_id):
  162. self._reconnect()
  163. continue
  164. # 读帧(使用独立锁+超时)
  165. start_time = time.time()
  166. ret, frame = safe_read(self._cap, self.config.camera_id, self.config.read_timeout)
  167. if not ret or frame is None:
  168. self._on_read_error()
  169. continue
  170. self._on_read_success(frame, start_time)
  171. except Exception as e:
  172. err_str = str(e)
  173. if 'async_lock' in err_str or 'Assertion' in err_str:
  174. logger.warning(f"[{self.config.camera_id}] FFmpeg内部错误,重建连接: {e}")
  175. self._reconnect()
  176. else:
  177. logger.error(f"[{self.config.camera_id}] 流错误: {e}")
  178. time.sleep(0.5)
  179. def _on_read_success(self, frame, start_time: float):
  180. """读帧成功处理"""
  181. read_time_ms = (time.time() - start_time) * 1000
  182. with self._frame_lock:
  183. self._current_frame = frame.copy()
  184. # 非阻塞写入帧队列,满则丢弃旧帧
  185. try:
  186. # 先清空队列确保只保留最新帧
  187. while not self._frame_queue.empty():
  188. try:
  189. self._frame_queue.get_nowait()
  190. except queue.Empty:
  191. break
  192. self._frame_queue.put(frame.copy(), block=False)
  193. except queue.Full:
  194. pass
  195. with self._status_lock:
  196. self._status.consecutive_errors = 0
  197. self._status.total_frames += 1
  198. self._status.last_frame_time = time.time()
  199. self._status.is_connected = True
  200. alpha = 0.1
  201. self._status.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * self._status.avg_read_time_ms
  202. # FPS 计算
  203. self._fps_frame_count += 1
  204. elapsed = time.time() - self._last_fps_time
  205. if elapsed >= 1.0:
  206. self._status.fps_actual = self._fps_frame_count / elapsed
  207. self._fps_frame_count = 0
  208. self._last_fps_time = time.time()
  209. def _on_read_error(self):
  210. """读帧错误处理"""
  211. with self._status_lock:
  212. self._status.consecutive_errors += 1
  213. self._status.total_errors += 1
  214. if self._status.consecutive_errors > self.config.max_consecutive_errors:
  215. logger.warning(f"[{self.config.camera_id}] 连续{self._status.consecutive_errors}次读帧失败,触发重连")
  216. time.sleep(0.01)
  217. def _reconnect(self):
  218. """重连"""
  219. with self._status_lock:
  220. self._status.reconnect_count += 1
  221. self._status.is_connected = False
  222. logger.info(f"[{self.config.camera_id}] 尝试重连...")
  223. if self._cap is not None:
  224. try:
  225. self._cap.release()
  226. except Exception:
  227. pass
  228. self._cap = None
  229. time.sleep(self.config.reconnect_interval)
  230. if self._connect():
  231. logger.info(f"[{self.config.camera_id}] 重连成功")
  232. else:
  233. logger.warning(f"[{self.config.camera_id}] 重连失败,{self.config.reconnect_interval}秒后重试")
  234. @property
  235. def is_healthy(self) -> bool:
  236. """流是否健康"""
  237. s = self.status
  238. if not s.is_running:
  239. return False
  240. if s.consecutive_errors > 10:
  241. return False
  242. if s.last_frame_time > 0 and (time.time() - s.last_frame_time) > 5.0:
  243. return False
  244. return True
  245. class DualStreamManager:
  246. """
  247. 双路流管理器
  248. 管理全景和球机两路视频流,提供统一的帧获取接口。
  249. 每路流使用独立线程和独立锁,互不阻塞。
  250. """
  251. PANORAMA_ID = "panorama"
  252. PTZ_ID = "ptz"
  253. def __init__(self):
  254. self._workers: Dict[str, StreamWorker] = {}
  255. self._running = False
  256. def add_stream(self, config: StreamConfig) -> StreamWorker:
  257. """添加一路视频流"""
  258. if config.camera_id in self._workers:
  259. logger.warning(f"流 {config.camera_id} 已存在,将先停止")
  260. self._workers[config.camera_id].stop()
  261. worker = StreamWorker(config)
  262. self._workers[config.camera_id] = worker
  263. return worker
  264. def start_stream(self, camera_id: str) -> bool:
  265. """启动指定流"""
  266. if camera_id not in self._workers:
  267. logger.error(f"流 {camera_id} 未注册")
  268. return False
  269. return self._workers[camera_id].start()
  270. def stop_stream(self, camera_id: str):
  271. """停止指定流"""
  272. if camera_id in self._workers:
  273. self._workers[camera_id].stop()
  274. def start_all(self) -> bool:
  275. """启动所有流"""
  276. success = True
  277. for camera_id, worker in self._workers.items():
  278. if not worker.start():
  279. logger.error(f"启动流 {camera_id} 失败")
  280. success = False
  281. else:
  282. logger.info(f"流 {camera_id} 启动中...")
  283. self._running = True
  284. return success
  285. def stop_all(self):
  286. """停止所有流"""
  287. for camera_id, worker in self._workers.items():
  288. worker.stop()
  289. self._running = False
  290. def get_frame(self, camera_id: str) -> Optional[cv2.Mat]:
  291. """获取指定摄像头的最新帧"""
  292. if camera_id not in self._workers:
  293. return None
  294. return self._workers[camera_id].get_frame()
  295. def get_panorama_frame(self) -> Optional[cv2.Mat]:
  296. """获取全景摄像头最新帧"""
  297. return self.get_frame(self.PANORAMA_ID)
  298. def get_ptz_frame(self) -> Optional[cv2.Mat]:
  299. """获取球机最新帧"""
  300. return self.get_frame(self.PTZ_ID)
  301. def get_status(self, camera_id: str) -> Optional[StreamStatus]:
  302. """获取指定流的状态"""
  303. if camera_id not in self._workers:
  304. return None
  305. return self._workers[camera_id].status
  306. def get_all_status(self) -> Dict[str, StreamStatus]:
  307. """获取所有流的状态"""
  308. return {cid: w.status for cid, w in self._workers.items()}
  309. def is_healthy(self, camera_id: str) -> bool:
  310. """检查指定流是否健康"""
  311. if camera_id not in self._workers:
  312. return False
  313. return self._workers[camera_id].is_healthy
  314. def wait_for_frames(self, timeout: float = 15.0) -> Dict[str, bool]:
  315. """
  316. 等待所有流就绪(获取到至少一帧)
  317. Returns:
  318. Dict[camera_id, 是否有帧]
  319. """
  320. start_time = time.time()
  321. ready = {cid: False for cid in self._workers}
  322. while time.time() - start_time < timeout:
  323. all_ready = True
  324. for cid in self._workers:
  325. if not ready[cid]:
  326. frame = self.get_frame(cid)
  327. if frame is not None:
  328. ready[cid] = True
  329. logger.info(f"[{cid}] 帧就绪 ({frame.shape})")
  330. if not ready[cid]:
  331. all_ready = False
  332. if all_ready:
  333. break
  334. time.sleep(0.5)
  335. for cid, r in ready.items():
  336. if not r:
  337. logger.warning(f"[{cid}] 等待帧超时({timeout}s)")
  338. return ready
  339. def get_stream_health(self, camera_id: str):
  340. """获取流健康详情(来自video_lock模块)"""
  341. return get_stream_health(camera_id)
  342. @property
  343. def panorama_worker(self) -> Optional[StreamWorker]:
  344. """获取全景流工作线程"""
  345. return self._workers.get(self.PANORAMA_ID)
  346. @property
  347. def ptz_worker(self) -> Optional[StreamWorker]:
  348. """获取球机流工作线程"""
  349. return self._workers.get(self.PTZ_ID)
  350. @staticmethod
  351. def build_rtsp_url(config: dict) -> str:
  352. """从配置构建RTSP URL"""
  353. if 'rtsp_url' in config and config['rtsp_url']:
  354. return config['rtsp_url']
  355. channel = config.get('channel', 0)
  356. return (f"rtsp://{config['username']}:{config['password']}"
  357. f"@{config['ip']}:{config.get('rtsp_port', 554)}"
  358. f"/cam/realmonitor?channel={channel + 1}&subtype=1")
  359. def create_dual_stream_manager(panorama_config: dict, ptz_config: dict) -> DualStreamManager:
  360. """
  361. 便捷函数:根据摄像头配置创建双路流管理器
  362. Args:
  363. panorama_config: 全景摄像头配置
  364. ptz_config: 球机配置
  365. Returns:
  366. 已配置的 DualStreamManager 实例
  367. """
  368. manager = DualStreamManager()
  369. pan_rtsp = DualStreamManager.build_rtsp_url(panorama_config)
  370. ptz_rtsp = DualStreamManager.build_rtsp_url(ptz_config)
  371. manager.add_stream(StreamConfig(
  372. camera_id=DualStreamManager.PANORAMA_ID,
  373. rtsp_url=pan_rtsp,
  374. camera_type="panorama"
  375. ))
  376. manager.add_stream(StreamConfig(
  377. camera_id=DualStreamManager.PTZ_ID,
  378. rtsp_url=ptz_rtsp,
  379. camera_type="ptz"
  380. ))
  381. return manager