| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- """
- 双路流管理器
- 独立管理全景和球机两路视频流,每路使用独立线程和独立锁,
- 互不阻塞。提供统一帧获取接口、健康监控、自动重连。
- 关键改进:
- - 两路流完全独立,一路卡住不影响另一路
- - 每路有自己的锁(per-camera lock),不再用全局 FFmpeg 锁
- - 超时读帧 + 自动重连 + 健康状态监控
- - 帧新鲜度追踪:确保读到的是最近帧而非缓冲区中的旧帧
- """
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- import cv2
- import threading
- import time
- import queue
- import logging
- from typing import Optional, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from video_lock import safe_read, safe_is_opened, get_stream_health, CameraLockManager
- logger = logging.getLogger(__name__)
@dataclass
class StreamConfig:
    """Settings for one video stream."""

    # Identifier of the camera this stream belongs to.
    camera_id: str

    # RTSP address of the stream.
    rtsp_url: str

    # Either "panorama" or "ptz".
    camera_type: str = "panorama"

    # Frame buffer size (kept small for low latency).
    buffer_size: int = 2

    # Reconnect interval, in seconds.
    reconnect_interval: float = 5.0

    # Maximum number of consecutive errors.
    max_consecutive_errors: int = 50

    # Frame read timeout, in seconds.
    read_timeout: float = 2.0

    # Target frame rate.
    fps_target: float = 25.0
@dataclass
class StreamStatus:
    """Runtime counters and flags describing one video stream."""

    # True while the stream worker has been started and not yet stopped.
    is_running: bool = False

    # True while the stream is considered connected.
    is_connected: bool = False

    # Failed reads since the last successful read.
    consecutive_errors: int = 0

    # Number of frames read successfully so far.
    total_frames: int = 0

    # Number of failed reads so far.
    total_errors: int = 0

    # time.time() timestamp of the most recent frame; 0.0 until one arrives.
    last_frame_time: float = 0.0

    # Smoothed duration of a frame read, in milliseconds.
    avg_read_time_ms: float = 0.0

    # Number of reconnect attempts made.
    reconnect_count: int = 0

    # Measured frame rate.
    fps_actual: float = 0.0
class StreamWorker:
    """Worker that reads one video stream on its own daemon thread.

    The thread reads frames through ``safe_read`` (from ``video_lock``),
    publishes the most recent frame for ``get_frame`` and
    ``get_frame_from_queue``, keeps a ``StreamStatus`` up to date, and
    reconnects when the capture is missing/closed or when more than
    ``config.max_consecutive_errors`` reads in a row have failed.
    """

    def __init__(self, config: StreamConfig):
        """Prepare the worker; nothing is opened until ``start`` is called."""
        self.config = config
        self._cap = None
        self._current_frame = None
        self._frame_lock = threading.Lock()
        self._frame_queue = queue.Queue(maxsize=config.buffer_size)
        self._running = False
        # Set by stop() so the worker's waits (reconnect back-off, error
        # pauses) end immediately instead of outlasting the join timeout.
        self._stop_event = threading.Event()
        self._thread = None
        self._status = StreamStatus()
        self._status_lock = threading.Lock()
        self._last_fps_time = time.time()
        self._fps_frame_count = 0
        # Failed reads since the last successful read or reconnect attempt.
        # Only touched by the worker thread, so it needs no lock.
        self._errors_since_reconnect = 0

    @property
    def status(self) -> StreamStatus:
        """Return a copy of the current status, taken under the status lock."""
        with self._status_lock:
            return StreamStatus(**vars(self._status))

    def start(self) -> bool:
        """Start the stream worker thread.

        An initial connection is attempted in the calling thread. A failed
        attempt is logged but is not fatal: the worker thread is started
        anyway and keeps retrying.

        Returns:
            Always True (also when the worker was already running).
        """
        if self._running:
            logger.warning(f"[{self.config.camera_id}] 流已在运行")
            return True

        connected = self._connect()
        if not connected:
            logger.error(f"[{self.config.camera_id}] 初始连接失败")

        with self._status_lock:
            self._status.is_running = True
            self._status.is_connected = connected

        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._stream_loop, daemon=True)
        self._thread.start()
        logger.info(f"[{self.config.camera_id}] 流工作线程已启动")
        return True

    def stop(self):
        """Stop the worker thread and release the capture.

        Waits up to 3 seconds for the thread. If it has not exited by then
        (for example it is blocked opening the stream), the capture is left
        alone so it is not released underneath a running thread; the thread
        releases it itself when its loop exits.
        """
        self._running = False
        self._stop_event.set()

        worker = self._thread
        self._thread = None
        still_alive = False
        if worker is not None:
            worker.join(timeout=3)
            still_alive = worker.is_alive()

        if still_alive:
            logger.warning(
                f"[{self.config.camera_id}] 工作线程未能在3秒内退出,连接将在线程退出时释放"
            )
        else:
            self._release_capture()

        with self._status_lock:
            self._status.is_running = False
            self._status.is_connected = False

        logger.info(f"[{self.config.camera_id}] 流已停止")

    def get_frame(self) -> Optional[cv2.Mat]:
        """Return a copy of the most recent frame, or None if none was read yet."""
        with self._frame_lock:
            if self._current_frame is not None:
                return self._current_frame.copy()
        return None

    def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[cv2.Mat]:
        """Take a frame from the frame queue.

        Args:
            timeout: Seconds to wait for a frame to become available.

        Returns:
            The queued frame, or None if the queue stayed empty for ``timeout``.
        """
        try:
            return self._frame_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def _close_capture(self, cap):
        """Release ``cap``; a failure is logged at debug level and ignored."""
        try:
            cap.release()
        except Exception as exc:
            logger.debug(f"[{self.config.camera_id}] 释放连接时出错: {exc}")

    def _release_capture(self):
        """Detach the current capture from the worker and release it, if any."""
        cap, self._cap = self._cap, None
        if cap is not None:
            self._close_capture(cap)

    def _connect(self) -> bool:
        """Open the RTSP stream, trying the FFmpeg backend then GStreamer.

        Any previously held capture is released first. A capture that fails
        to open is released explicitly instead of being dropped.

        Returns:
            True if a capture was opened and stored, False otherwise.
        """
        self._release_capture()
        camera_id = self.config.camera_id

        try:
            cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_FFMPEG)
            if cap.isOpened():
                cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                self._cap = cap
                logger.info(f"[{camera_id}] RTSP流连接成功")
                return True
            self._close_capture(cap)

            # Fall back to the GStreamer backend. Errors here are not fatal;
            # they end in the same "cannot open" result below.
            fallback = None
            try:
                fallback = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_GSTREAMER)
                if fallback.isOpened():
                    logger.info(f"[{camera_id}] 使用 GStreamer 后端连接成功")
                    fallback.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                    self._cap = fallback
                    return True
            except Exception as exc:
                logger.debug(f"[{camera_id}] GStreamer 后端连接失败: {exc}")
            if fallback is not None:
                self._close_capture(fallback)

            logger.error(f"[{camera_id}] 无法打开RTSP流")
            return False

        except Exception as e:
            logger.error(f"[{camera_id}] 连接异常: {e}")
            self._cap = None
            return False

    def _stream_loop(self):
        """Worker thread body: read frames until the worker is stopped."""
        import signal
        # Block SIGINT in this thread where the platform offers pthread_sigmask.
        if hasattr(signal, 'pthread_sigmask'):
            try:
                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
            except (AttributeError, OSError):
                pass

        logger.info(f"[{self.config.camera_id}] 流循环开始")

        try:
            while self._running:
                try:
                    # No usable capture: reconnect, then re-check the run flag.
                    if self._cap is None or not safe_is_opened(self._cap, self.config.camera_id):
                        self._reconnect()
                        continue

                    start_time = time.time()
                    ret, frame = safe_read(self._cap, self.config.camera_id, self.config.read_timeout)

                    if not ret or frame is None:
                        self._on_read_error()
                        continue

                    self._on_read_success(frame, start_time)

                except Exception as e:
                    err_str = str(e)
                    if 'async_lock' in err_str or 'Assertion' in err_str:
                        logger.warning(f"[{self.config.camera_id}] FFmpeg内部错误,重建连接: {e}")
                        self._reconnect()
                    else:
                        logger.error(f"[{self.config.camera_id}] 流错误: {e}")
                        # Short pause that stop() can cut short.
                        self._stop_event.wait(0.5)
        finally:
            # The worker thread releases its own capture on exit, which also
            # covers the case where stop() gave up waiting for this thread.
            self._release_capture()

    def _on_read_success(self, frame, start_time: float):
        """Publish a freshly read frame and update the statistics.

        Args:
            frame: The frame returned by the read.
            start_time: ``time.time()`` value taken just before the read.
        """
        read_time_ms = (time.time() - start_time) * 1000
        self._errors_since_reconnect = 0

        with self._frame_lock:
            self._current_frame = frame.copy()

        # Drain the queue so it only ever holds the newest frame, then add
        # this one without blocking.
        try:
            while True:
                self._frame_queue.get_nowait()
        except queue.Empty:
            pass
        try:
            self._frame_queue.put_nowait(frame.copy())
        except queue.Full:
            pass

        with self._status_lock:
            self._status.consecutive_errors = 0
            self._status.total_frames += 1
            self._status.last_frame_time = time.time()
            self._status.is_connected = True
            # Exponential moving average of the read duration.
            alpha = 0.1
            self._status.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * self._status.avg_read_time_ms

            # Recompute the measured FPS roughly once per second.
            self._fps_frame_count += 1
            elapsed = time.time() - self._last_fps_time
            if elapsed >= 1.0:
                self._status.fps_actual = self._fps_frame_count / elapsed
                self._fps_frame_count = 0
                self._last_fps_time = time.time()

    def _on_read_error(self):
        """Record a failed read and reconnect once the error limit is exceeded."""
        with self._status_lock:
            self._status.consecutive_errors += 1
            self._status.total_errors += 1
            errors = self._status.consecutive_errors
        self._errors_since_reconnect += 1

        if self._errors_since_reconnect > self.config.max_consecutive_errors:
            logger.warning(f"[{self.config.camera_id}] 连续{errors}次读帧失败,触发重连")
            # Must run outside the status lock: _reconnect acquires it too
            # and threading.Lock is not re-entrant.
            self._reconnect()
            return

        self._stop_event.wait(0.01)

    def _reconnect(self):
        """Release the capture, wait ``reconnect_interval`` and connect again.

        The wait ends early when ``stop`` is called; in that case no new
        connection is opened.
        """
        with self._status_lock:
            self._status.reconnect_count += 1
            self._status.is_connected = False
        # Give the next connection a fresh error budget.
        self._errors_since_reconnect = 0

        logger.info(f"[{self.config.camera_id}] 尝试重连...")

        self._release_capture()

        # Event.wait returns True when the event was set, i.e. stop() ran.
        if self._stop_event.wait(self.config.reconnect_interval):
            return

        if self._connect():
            logger.info(f"[{self.config.camera_id}] 重连成功")
        else:
            logger.warning(f"[{self.config.camera_id}] 重连失败,{self.config.reconnect_interval}秒后重试")

    @property
    def is_healthy(self) -> bool:
        """Whether the stream currently looks healthy.

        Unhealthy when the worker is not running, when more than 10 reads in
        a row have failed, or when a frame has been received before but the
        latest one is older than 5 seconds.
        """
        # NOTE(review): a worker that is running but has never connected nor
        # attempted a read still reports healthy here - confirm this is intended.
        s = self.status
        if not s.is_running:
            return False
        if s.consecutive_errors > 10:
            return False
        if s.last_frame_time > 0 and (time.time() - s.last_frame_time) > 5.0:
            return False
        return True
class DualStreamManager:
    """Registry of stream workers keyed by camera id.

    Holds one ``StreamWorker`` per camera id (the panorama and PTZ cameras
    use the ids below) and forwards start/stop, frame and status requests
    to the matching worker.
    """

    PANORAMA_ID = "panorama"
    PTZ_ID = "ptz"

    def __init__(self):
        """Create a manager with no registered streams."""
        self._workers: Dict[str, StreamWorker] = {}
        self._running = False

    def add_stream(self, config: StreamConfig) -> StreamWorker:
        """Register a stream and return its (not yet started) worker.

        A worker already registered under the same camera id is stopped and
        replaced.
        """
        previous = self._workers.get(config.camera_id)
        if previous is not None:
            logger.warning(f"流 {config.camera_id} 已存在,将先停止")
            previous.stop()

        created = StreamWorker(config)
        self._workers[config.camera_id] = created
        return created

    def start_stream(self, camera_id: str) -> bool:
        """Start one registered stream; returns False for an unknown id."""
        worker = self._workers.get(camera_id)
        if worker is None:
            logger.error(f"流 {camera_id} 未注册")
            return False
        return worker.start()

    def stop_stream(self, camera_id: str):
        """Stop one stream; unknown ids are ignored."""
        worker = self._workers.get(camera_id)
        if worker is not None:
            worker.stop()

    def start_all(self) -> bool:
        """Start every registered stream.

        Returns:
            True only if every worker's ``start`` returned a truthy value.
        """
        all_started = True
        for camera_id, worker in self._workers.items():
            if worker.start():
                logger.info(f"流 {camera_id} 启动中...")
            else:
                logger.error(f"启动流 {camera_id} 失败")
                all_started = False
        self._running = True
        return all_started

    def stop_all(self):
        """Stop every registered stream."""
        for worker in self._workers.values():
            worker.stop()
        self._running = False

    def get_frame(self, camera_id: str) -> Optional[cv2.Mat]:
        """Return the latest frame of a camera, or None for an unknown id."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return None
        return worker.get_frame()

    def get_panorama_frame(self) -> Optional[cv2.Mat]:
        """Return the latest frame of the panorama camera."""
        return self.get_frame(self.PANORAMA_ID)

    def get_ptz_frame(self) -> Optional[cv2.Mat]:
        """Return the latest frame of the PTZ camera."""
        return self.get_frame(self.PTZ_ID)

    def get_status(self, camera_id: str) -> Optional[StreamStatus]:
        """Return the status of one stream, or None for an unknown id."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return None
        return worker.status

    def get_all_status(self) -> Dict[str, StreamStatus]:
        """Return the status of every registered stream, keyed by camera id."""
        statuses = {}
        for camera_id, worker in self._workers.items():
            statuses[camera_id] = worker.status
        return statuses

    def is_healthy(self, camera_id: str) -> bool:
        """Report whether a stream is healthy; unknown ids are unhealthy."""
        worker = self._workers.get(camera_id)
        if worker is None:
            return False
        return worker.is_healthy

    def wait_for_frames(self, timeout: float = 15.0) -> Dict[str, bool]:
        """Wait until every stream has produced at least one frame.

        Streams are polled every 0.5 seconds until all have a frame or
        ``timeout`` seconds have passed.

        Args:
            timeout: Maximum time to wait, in seconds.

        Returns:
            Mapping of camera id to whether a frame was obtained.
        """
        started_at = time.time()
        ready = dict.fromkeys(self._workers, False)

        while time.time() - started_at < timeout:
            for camera_id in self._workers:
                if ready[camera_id]:
                    continue
                frame = self.get_frame(camera_id)
                if frame is not None:
                    ready[camera_id] = True
                    logger.info(f"[{camera_id}] 帧就绪 ({frame.shape})")

            if all(ready.values()):
                break

            time.sleep(0.5)

        for camera_id, has_frame in ready.items():
            if not has_frame:
                logger.warning(f"[{camera_id}] 等待帧超时({timeout}s)")

        return ready

    def get_stream_health(self, camera_id: str):
        """Return health details for a stream as reported by ``video_lock``."""
        # Resolves to the module-level function imported from video_lock,
        # not to this method.
        return get_stream_health(camera_id)

    @property
    def panorama_worker(self) -> Optional[StreamWorker]:
        """The panorama stream worker, or None if not registered."""
        return self._workers.get(self.PANORAMA_ID)

    @property
    def ptz_worker(self) -> Optional[StreamWorker]:
        """The PTZ stream worker, or None if not registered."""
        return self._workers.get(self.PTZ_ID)

    @staticmethod
    def build_rtsp_url(config: dict) -> str:
        """Build an RTSP URL from a camera configuration dict.

        A non-empty ``rtsp_url`` entry is returned as-is. Otherwise the URL
        is assembled from ``username``, ``password``, ``ip``, the optional
        ``rtsp_port`` (default 554) and the optional ``channel`` (default 0,
        emitted as ``channel + 1``).
        """
        explicit_url = config.get('rtsp_url')
        if explicit_url:
            return explicit_url

        channel = config.get('channel', 0)
        # NOTE(review): credentials are interpolated verbatim; reserved
        # characters such as '@' or '/' in them are not percent-encoded here.
        credentials = f"{config['username']}:{config['password']}"
        endpoint = f"{config['ip']}:{config.get('rtsp_port', 554)}"
        return f"rtsp://{credentials}@{endpoint}/cam/realmonitor?channel={channel + 1}&subtype=1"
def create_dual_stream_manager(panorama_config: dict, ptz_config: dict) -> DualStreamManager:
    """Create a DualStreamManager with the panorama and PTZ streams registered.

    The streams are added but not started.

    Args:
        panorama_config: Configuration dict of the panorama camera.
        ptz_config: Configuration dict of the PTZ camera.

    Returns:
        The configured DualStreamManager instance.
    """
    manager = DualStreamManager()

    # Resolve both URLs before registering anything, panorama first.
    streams = (
        (DualStreamManager.PANORAMA_ID, "panorama", DualStreamManager.build_rtsp_url(panorama_config)),
        (DualStreamManager.PTZ_ID, "ptz", DualStreamManager.build_rtsp_url(ptz_config)),
    )

    for camera_id, camera_type, rtsp_url in streams:
        stream_config = StreamConfig(
            camera_id=camera_id,
            rtsp_url=rtsp_url,
            camera_type=camera_type,
        )
        manager.add_stream(stream_config)

    return manager
|