فهرست منبع

feat(coordinator): 实现异步联动控制器并改进视频流管理

新增 AsyncCoordinator 实现检测与PTZ控制线程分离,避免阻塞
重构视频锁模块为每摄像头独立锁,增加超时和健康监控机制
添加双路流管理器 DualStreamManager 独立管理全景和球机视频流
PTZ球机支持精确到位确认和位置验证功能
wenhongquan 3 روز پیش
والد
کامیت
20287761c8

+ 255 - 42
dual_camera_system/coordinator.py

@@ -498,86 +498,299 @@ class Coordinator:
 
 
 class EventDrivenCoordinator(Coordinator):
-    """
-    事件驱动联动控制器
-    当全景摄像头检测到事件时触发联动
-    """
+    """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
     
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        
-        # 事件类型
         self.event_types = {
-            'intruder': True,      # 入侵检测
-            'crossline': True,     # 越界检测
-            'motion': True,        # 移动侦测
+            'intruder': True,
+            'crossline': True,
+            'motion': True,
         }
-        
-        # 事件队列
         self.event_queue = queue.Queue()
     
     def on_event(self, event_type: str, event_data: dict):
-        """
-        事件回调
-        Args:
-            event_type: 事件类型
-            event_data: 事件数据
-        """
         if not self.event_types.get(event_type, False):
             return
-        
-        self.event_queue.put({
-            'type': event_type,
-            'data': event_data,
-            'time': time.time()
-        })
+        self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
     
     def _coordinator_worker(self):
-        """联动工作线程 (事件驱动版本)"""
         while self.running:
             try:
-                # 处理事件
                 try:
                     event = self.event_queue.get(timeout=0.1)
                     self._process_event(event)
                 except queue.Empty:
                     pass
                 
-                # 继续常规处理
                 frame = self.panorama.get_frame()
                 if frame is not None:
                     frame_size = (frame.shape[1], frame.shape[0])
                     detections = self._detect_persons(frame)
-                    
                     if detections:
                         tracked = self.tracker.update(detections)
                         self._update_tracking_targets(tracked, frame_size)
                         self._process_current_target(frame, frame_size)
                 
                 self._cleanup_expired_targets()
-                
             except Exception as e:
                 print(f"事件处理错误: {e}")
                 time.sleep(0.1)
     
     def _process_event(self, event: dict):
-        """处理事件"""
         event_type = event['type']
         event_data = event['data']
-        
         print(f"处理事件: {event_type}")
         
-        # 根据事件类型处理
-        if event_type == 'intruder':
-            # 入侵事件 - 获取入侵位置
-            if 'position' in event_data:
-                x_ratio, y_ratio = event_data['position']
-                self.force_track_position(x_ratio, y_ratio)
+        if event_type == 'intruder' and 'position' in event_data:
+            x_ratio, y_ratio = event_data['position']
+            self.force_track_position(x_ratio, y_ratio)
+
+
+@dataclass
+class PTZCommand:
+    """PTZ控制命令"""
+    pan: float
+    tilt: float
+    zoom: int
+    x_ratio: float = 0.0
+    y_ratio: float = 0.0
+    use_calibration: bool = True
+
+
+class AsyncCoordinator(Coordinator):
+    """
+    异步联动控制器 — 检测线程与PTZ控制线程分离
+    
+    改进:
+    1. 检测线程:持续读取全景帧 + YOLO推理
+    2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
+    3. 两线程通过 queue 通信,互不阻塞
+    4. PTZ位置确认:移动后等待球机到位并验证帧
+    """
+    
+    PTZ_CONFIRM_WAIT = 0.3          # PTZ命令后等待稳定的秒数
+    PTZ_CONFIRM_TIMEOUT = 2.0       # PTZ位置确认超时
+    PTZ_COMMAND_COOLDOWN = 0.15     # PTZ命令最小间隔秒数
+    
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        
+        # PTZ命令队列(检测→PTZ)
+        self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
+        
+        # 线程
+        self._detection_thread = None
+        self._ptz_thread = None
+        
+        # PTZ确认回调
+        self._on_ptz_confirmed: Optional[Callable] = None
+        
+        # 上次PTZ命令时间
+        self._last_ptz_time = 0.0
+    
+    def start(self) -> bool:
+        """启动联动(覆盖父类,启动双线程)"""
+        if not self.panorama.connect():
+            print("连接全景摄像头失败")
+            return False
+        
+        if self.enable_ptz_camera:
+            if not self.ptz.connect():
+                print("连接球机失败")
+                self.panorama.disconnect()
+                return False
+        else:
+            print("PTZ球机功能已禁用")
+        
+        if not self.panorama.start_stream_rtsp():
+            print("RTSP视频流启动失败,尝试SDK方式...")
+            if not self.panorama.start_stream():
+                print("启动视频流失败")
+                self.panorama.disconnect()
+                if self.enable_ptz_camera:
+                    self.ptz.disconnect()
+                return False
+        
+        self.running = True
+        
+        # 启动检测线程
+        self._detection_thread = threading.Thread(
+            target=self._detection_worker, name="detection-worker", daemon=True)
+        self._detection_thread.start()
+        
+        # 启动PTZ控制线程
+        if self.enable_ptz_camera and self.enable_ptz_tracking:
+            self._ptz_thread = threading.Thread(
+                target=self._ptz_worker, name="ptz-worker", daemon=True)
+            self._ptz_thread.start()
+        
+        print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
+        return True
+    
+    def stop(self):
+        """停止联动"""
+        self.running = False
+        
+        # 清空PTZ队列,让工作线程退出
+        while not self._ptz_queue.empty():
+            try:
+                self._ptz_queue.get_nowait()
+            except queue.Empty:
+                break
+        
+        if self._detection_thread:
+            self._detection_thread.join(timeout=3)
+        if self._ptz_thread:
+            self._ptz_thread.join(timeout=3)
+        
+        # 停止父类线程(如果有的话)
+        if self.coordinator_thread:
+            self.coordinator_thread.join(timeout=1)
         
-        elif event_type == 'crossline':
-            # 越界事件
-            pass
+        self.panorama.disconnect()
+        if self.enable_ptz_camera:
+            self.ptz.disconnect()
         
-        elif event_type == 'motion':
-            # 移动侦测事件
-            pass
+        self._print_stats()
+        print("异步联动系统已停止")
+    
+    def _detection_worker(self):
+        """检测线程:持续读帧 + 推理 + 发送PTZ命令"""
+        last_detection_time = 0
+        detection_interval = 0.1
+        
+        with self.stats_lock:
+            self.stats['start_time'] = time.time()
+        
+        while self.running:
+            try:
+                current_time = time.time()
+                frame = self.panorama.get_frame()
+                if frame is None:
+                    time.sleep(0.01)
+                    continue
+                
+                self._update_stats('frames_processed')
+                frame_size = (frame.shape[1], frame.shape[0])
+                
+                if current_time - last_detection_time >= detection_interval:
+                    last_detection_time = current_time
+                    
+                    detections = self._detect_persons(frame)
+                    if detections:
+                        self._update_stats('persons_detected', len(detections))
+                    
+                    tracked = self.tracker.update(detections)
+                    self._update_tracking_targets(tracked, frame_size)
+                    
+                    if tracked:
+                        self._process_detections(tracked, frame, frame_size)
+                    
+                    # 选择跟踪目标并发送PTZ命令
+                    target = self._select_tracking_target()
+                    if target and self.enable_ptz_tracking and self.enable_ptz_camera:
+                        self._send_ptz_command(target, frame_size)
+                
+                self._cleanup_expired_targets()
+                time.sleep(0.01)
+                
+            except Exception as e:
+                print(f"检测线程错误: {e}")
+                time.sleep(0.1)
+    
+    def _ptz_worker(self):
+        """PTZ控制线程:从队列接收命令并控制球机"""
+        while self.running:
+            try:
+                try:
+                    cmd = self._ptz_queue.get(timeout=0.1)
+                except queue.Empty:
+                    continue
+                
+                self._execute_ptz_command(cmd)
+                
+            except Exception as e:
+                print(f"PTZ控制线程错误: {e}")
+                time.sleep(0.05)
+    
+    def _select_tracking_target(self) -> Optional[TrackingTarget]:
+        """选择当前跟踪目标"""
+        with self.targets_lock:
+            if not self.tracking_targets:
+                self._set_state(TrackingState.IDLE)
+                self.current_target = None
+                return None
+            
+            if self.current_target is None or \
+               self.current_target.track_id not in self.tracking_targets:
+                target_id = list(self.tracking_targets.keys())[0]
+                self.current_target = self.tracking_targets[target_id]
+            
+            return self.current_target
+    
+    def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
+        """将跟踪目标转化为PTZ命令放入队列"""
+        x_ratio, y_ratio = target.position
+        
+        # 检查位置变化是否超过阈值
+        if self.last_ptz_position is not None:
+            last_x, last_y = self.last_ptz_position
+            if abs(x_ratio - last_x) < self.ptz_position_threshold and \
+               abs(y_ratio - last_y) < self.ptz_position_threshold:
+                return
+        
+        # 冷却检查
+        current_time = time.time()
+        if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
+            return
+        
+        cmd = PTZCommand(
+            pan=0, tilt=0, zoom=0,
+            x_ratio=x_ratio, y_ratio=y_ratio,
+            use_calibration=self.enable_calibration
+        )
+        
+        try:
+            self._ptz_queue.put_nowait(cmd)
+            self.last_ptz_position = (x_ratio, y_ratio)
+        except queue.Full:
+            pass  # 丢弃命令,下一个检测周期会重发
+    
+    def _execute_ptz_command(self, cmd: PTZCommand):
+        """执行PTZ命令(在PTZ线程中)"""
+        self._last_ptz_time = time.time()
+        
+        if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
+            pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
+            if self.ptz.ptz_config.get('pan_flip', False):
+                pan = (pan + 180) % 360
+            zoom = self.ptz.ptz_config.get('default_zoom', 8)
+        else:
+            pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
+        
+        self._set_state(TrackingState.TRACKING)
+        success = self.ptz.goto_exact_position(pan, tilt, zoom)
+        
+        if success:
+            # PTZ位置确认:等待球机稳定后读取球机帧验证
+            time.sleep(self.PTZ_CONFIRM_WAIT)
+            self._confirm_ptz_position(cmd.x_ratio, cmd.y_ratio)
+        else:
+            print(f"[AsyncCoordinator] PTZ命令执行失败: pan={pan:.1f}, tilt={tilt:.1f}")
+    
+    def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
+        """PTZ位置确认:读取球机帧验证目标是否可见"""
+        if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
+            return
+        
+        ptz_frame = self.ptz.get_frame()
+        if ptz_frame is None:
+            return
+        
+        # 未来可以在这里添加球机帧目标验证逻辑
+        # 例如:在球机帧中检测目标是否在画面中心附近
+    
+    def on_ptz_confirmed(self, callback: Callable):
+        """注册PTZ位置确认回调"""
+        self._on_ptz_confirmed = callback

+ 467 - 0
dual_camera_system/dual_stream_manager.py

@@ -0,0 +1,467 @@
+"""
+双路流管理器
+
+独立管理全景和球机两路视频流,每路使用独立线程和独立锁,
+互不阻塞。提供统一帧获取接口、健康监控、自动重连。
+
+关键改进:
+- 两路流完全独立,一路卡住不影响另一路
+- 每路有自己的锁(per-camera lock),不再用全局 FFmpeg 锁
+- 超时读帧 + 自动重连 + 健康状态监控
+- 帧新鲜度追踪:确保读到的是最近帧而非缓冲区中的旧帧
+"""
+
+import os
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
+
+import cv2
+import threading
+import time
+import queue
+import logging
+from typing import Optional, Dict, Tuple, Callable
+from dataclasses import dataclass, field
+
+from video_lock import safe_read, safe_is_opened, get_stream_health, CameraLockManager
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class StreamConfig:
+    """流配置"""
+    camera_id: str
+    rtsp_url: str
+    camera_type: str = "panorama"  # "panorama" 或 "ptz"
+    buffer_size: int = 2            # 帧缓冲大小(保持低延迟)
+    reconnect_interval: float = 5.0  # 重连间隔(秒)
+    max_consecutive_errors: int = 50 # 最大连续错误数
+    read_timeout: float = 2.0       # 读帧超时(秒)
+    fps_target: float = 25.0        # 目标帧率
+
+
+@dataclass
+class StreamStatus:
+    """流状态"""
+    is_running: bool = False
+    is_connected: bool = False
+    consecutive_errors: int = 0
+    total_frames: int = 0
+    total_errors: int = 0
+    last_frame_time: float = 0.0
+    avg_read_time_ms: float = 0.0
+    reconnect_count: int = 0
+    fps_actual: float = 0.0
+
+
+class StreamWorker:
+    """单路视频流工作线程"""
+    
+    def __init__(self, config: StreamConfig):
+        self.config = config
+        self._cap = None
+        self._current_frame = None
+        self._frame_lock = threading.Lock()
+        self._frame_queue = queue.Queue(maxsize=config.buffer_size)
+        self._running = False
+        self._thread = None
+        self._status = StreamStatus()
+        self._status_lock = threading.Lock()
+        self._last_fps_time = time.time()
+        self._fps_frame_count = 0
+    
+    @property
+    def status(self) -> StreamStatus:
+        with self._status_lock:
+            return StreamStatus(
+                is_running=self._status.is_running,
+                is_connected=self._status.is_connected,
+                consecutive_errors=self._status.consecutive_errors,
+                total_frames=self._status.total_frames,
+                total_errors=self._status.total_errors,
+                last_frame_time=self._status.last_frame_time,
+                avg_read_time_ms=self._status.avg_read_time_ms,
+                reconnect_count=self._status.reconnect_count,
+                fps_actual=self._status.fps_actual
+            )
+    
+    def start(self) -> bool:
+        """启动视频流"""
+        if self._running:
+            logger.warning(f"[{self.config.camera_id}] 流已在运行")
+            return True
+        
+        if not self._connect():
+            logger.error(f"[{self.config.camera_id}] 初始连接失败")
+            # 不返回False,启动重连线程
+            self._status.is_running = True
+            self._status.is_connected = False
+        else:
+            self._status.is_running = True
+            self._status.is_connected = True
+        
+        self._running = True
+        self._thread = threading.Thread(target=self._stream_loop, daemon=True)
+        self._thread.start()
+        logger.info(f"[{self.config.camera_id}] 流工作线程已启动")
+        return True
+    
+    def stop(self):
+        """停止视频流"""
+        self._running = False
+        if self._thread:
+            self._thread.join(timeout=3)
+            self._thread = None
+        
+        if self._cap is not None:
+            try:
+                self._cap.release()
+            except Exception:
+                pass
+            self._cap = None
+        
+        with self._status_lock:
+            self._status.is_running = False
+            self._status.is_connected = False
+        
+        logger.info(f"[{self.config.camera_id}] 流已停止")
+    
+    def get_frame(self) -> Optional[cv2.Mat]:
+        """获取最新帧(线程安全副本)"""
+        with self._frame_lock:
+            if self._current_frame is not None:
+                return self._current_frame.copy()
+            return None
+    
+    def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[cv2.Mat]:
+        """从帧队列获取帧"""
+        try:
+            return self._frame_queue.get(timeout=timeout)
+        except queue.Empty:
+            return None
+    
+    def _connect(self) -> bool:
+        """建立RTSP连接"""
+        if self._cap is not None:
+            try:
+                self._cap.release()
+            except Exception:
+                pass
+            self._cap = None
+        
+        try:
+            self._cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_FFMPEG)
+            if not self._cap.isOpened():
+                # 尝试 GStreamer 后端
+                try:
+                    self._cap = cv2.VideoCapture(self.config.rtsp_url, cv2.CAP_GSTREAMER)
+                    if self._cap.isOpened():
+                        logger.info(f"[{self.config.camera_id}] 使用 GStreamer 后端连接成功")
+                        self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+                        return True
+                except Exception:
+                    pass
+                
+                logger.error(f"[{self.config.camera_id}] 无法打开RTSP流")
+                self._cap = None
+                return False
+            
+            self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+            logger.info(f"[{self.config.camera_id}] RTSP流连接成功")
+            return True
+            
+        except Exception as e:
+            logger.error(f"[{self.config.camera_id}] 连接异常: {e}")
+            self._cap = None
+            return False
+    
+    def _stream_loop(self):
+        """流工作主循环"""
+        import signal
+        if hasattr(signal, 'pthread_sigmask'):
+            try:
+                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
+            except (AttributeError, OSError):
+                pass
+        
+        logger.info(f"[{self.config.camera_id}] 流循环开始")
+        
+        while self._running:
+            try:
+                # 如果未连接,尝试重连
+                if self._cap is None or not safe_is_opened(self._cap, self.config.camera_id):
+                    self._reconnect()
+                    continue
+                
+                # 读帧(使用独立锁+超时)
+                start_time = time.time()
+                ret, frame = safe_read(self._cap, self.config.camera_id, self.config.read_timeout)
+                
+                if not ret or frame is None:
+                    self._on_read_error()
+                    continue
+                
+                self._on_read_success(frame, start_time)
+                
+            except Exception as e:
+                err_str = str(e)
+                if 'async_lock' in err_str or 'Assertion' in err_str:
+                    logger.warning(f"[{self.config.camera_id}] FFmpeg内部错误,重建连接: {e}")
+                    self._reconnect()
+                else:
+                    logger.error(f"[{self.config.camera_id}] 流错误: {e}")
+                    time.sleep(0.5)
+    
+    def _on_read_success(self, frame, start_time: float):
+        """读帧成功处理"""
+        read_time_ms = (time.time() - start_time) * 1000
+        
+        with self._frame_lock:
+            self._current_frame = frame.copy()
+        
+        # 非阻塞写入帧队列,满则丢弃旧帧
+        try:
+            # 先清空队列确保只保留最新帧
+            while not self._frame_queue.empty():
+                try:
+                    self._frame_queue.get_nowait()
+                except queue.Empty:
+                    break
+            self._frame_queue.put(frame.copy(), block=False)
+        except queue.Full:
+            pass
+        
+        with self._status_lock:
+            self._status.consecutive_errors = 0
+            self._status.total_frames += 1
+            self._status.last_frame_time = time.time()
+            self._status.is_connected = True
+            alpha = 0.1
+            self._status.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * self._status.avg_read_time_ms
+            
+            # FPS 计算
+            self._fps_frame_count += 1
+            elapsed = time.time() - self._last_fps_time
+            if elapsed >= 1.0:
+                self._status.fps_actual = self._fps_frame_count / elapsed
+                self._fps_frame_count = 0
+                self._last_fps_time = time.time()
+    
+    def _on_read_error(self):
+        """读帧错误处理"""
+        with self._status_lock:
+            self._status.consecutive_errors += 1
+            self._status.total_errors += 1
+            
+            if self._status.consecutive_errors > self.config.max_consecutive_errors:
+                logger.warning(f"[{self.config.camera_id}] 连续{self._status.consecutive_errors}次读帧失败,触发重连")
+        
+        time.sleep(0.01)
+    
+    def _reconnect(self):
+        """重连"""
+        with self._status_lock:
+            self._status.reconnect_count += 1
+            self._status.is_connected = False
+        
+        logger.info(f"[{self.config.camera_id}] 尝试重连...")
+        
+        if self._cap is not None:
+            try:
+                self._cap.release()
+            except Exception:
+                pass
+            self._cap = None
+        
+        time.sleep(self.config.reconnect_interval)
+        
+        if self._connect():
+            logger.info(f"[{self.config.camera_id}] 重连成功")
+        else:
+            logger.warning(f"[{self.config.camera_id}] 重连失败,{self.config.reconnect_interval}秒后重试")
+    
+    @property
+    def is_healthy(self) -> bool:
+        """流是否健康"""
+        s = self.status
+        if not s.is_running:
+            return False
+        if s.consecutive_errors > 10:
+            return False
+        if s.last_frame_time > 0 and (time.time() - s.last_frame_time) > 5.0:
+            return False
+        return True
+
+
+class DualStreamManager:
+    """
+    双路流管理器
+    
+    管理全景和球机两路视频流,提供统一的帧获取接口。
+    每路流使用独立线程和独立锁,互不阻塞。
+    """
+    
+    PANORAMA_ID = "panorama"
+    PTZ_ID = "ptz"
+    
+    def __init__(self):
+        self._workers: Dict[str, StreamWorker] = {}
+        self._running = False
+    
+    def add_stream(self, config: StreamConfig) -> StreamWorker:
+        """添加一路视频流"""
+        if config.camera_id in self._workers:
+            logger.warning(f"流 {config.camera_id} 已存在,将先停止")
+            self._workers[config.camera_id].stop()
+        
+        worker = StreamWorker(config)
+        self._workers[config.camera_id] = worker
+        return worker
+    
+    def start_stream(self, camera_id: str) -> bool:
+        """启动指定流"""
+        if camera_id not in self._workers:
+            logger.error(f"流 {camera_id} 未注册")
+            return False
+        return self._workers[camera_id].start()
+    
+    def stop_stream(self, camera_id: str):
+        """停止指定流"""
+        if camera_id in self._workers:
+            self._workers[camera_id].stop()
+    
+    def start_all(self) -> bool:
+        """启动所有流"""
+        success = True
+        for camera_id, worker in self._workers.items():
+            if not worker.start():
+                logger.error(f"启动流 {camera_id} 失败")
+                success = False
+            else:
+                logger.info(f"流 {camera_id} 启动中...")
+        self._running = True
+        return success
+    
+    def stop_all(self):
+        """停止所有流"""
+        for camera_id, worker in self._workers.items():
+            worker.stop()
+        self._running = False
+    
+    def get_frame(self, camera_id: str) -> Optional[cv2.Mat]:
+        """获取指定摄像头的最新帧"""
+        if camera_id not in self._workers:
+            return None
+        return self._workers[camera_id].get_frame()
+    
+    def get_panorama_frame(self) -> Optional[cv2.Mat]:
+        """获取全景摄像头最新帧"""
+        return self.get_frame(self.PANORAMA_ID)
+    
+    def get_ptz_frame(self) -> Optional[cv2.Mat]:
+        """获取球机最新帧"""
+        return self.get_frame(self.PTZ_ID)
+    
+    def get_status(self, camera_id: str) -> Optional[StreamStatus]:
+        """获取指定流的状态"""
+        if camera_id not in self._workers:
+            return None
+        return self._workers[camera_id].status
+    
+    def get_all_status(self) -> Dict[str, StreamStatus]:
+        """获取所有流的状态"""
+        return {cid: w.status for cid, w in self._workers.items()}
+    
+    def is_healthy(self, camera_id: str) -> bool:
+        """检查指定流是否健康"""
+        if camera_id not in self._workers:
+            return False
+        return self._workers[camera_id].is_healthy
+    
+    def wait_for_frames(self, timeout: float = 15.0) -> Dict[str, bool]:
+        """
+        等待所有流就绪(获取到至少一帧)
+        
+        Returns:
+            Dict[camera_id, 是否有帧]
+        """
+        start_time = time.time()
+        ready = {cid: False for cid in self._workers}
+        
+        while time.time() - start_time < timeout:
+            all_ready = True
+            for cid in self._workers:
+                if not ready[cid]:
+                    frame = self.get_frame(cid)
+                    if frame is not None:
+                        ready[cid] = True
+                        logger.info(f"[{cid}] 帧就绪 ({frame.shape})")
+                if not ready[cid]:
+                    all_ready = False
+            
+            if all_ready:
+                break
+            
+            time.sleep(0.5)
+        
+        for cid, r in ready.items():
+            if not r:
+                logger.warning(f"[{cid}] 等待帧超时({timeout}s)")
+        
+        return ready
+    
+    def get_stream_health(self, camera_id: str):
+        """获取流健康详情(来自video_lock模块)"""
+        return get_stream_health(camera_id)
+    
+    @property
+    def panorama_worker(self) -> Optional[StreamWorker]:
+        """获取全景流工作线程"""
+        return self._workers.get(self.PANORAMA_ID)
+    
+    @property
+    def ptz_worker(self) -> Optional[StreamWorker]:
+        """获取球机流工作线程"""
+        return self._workers.get(self.PTZ_ID)
+    
+    @staticmethod
+    def build_rtsp_url(config: dict) -> str:
+        """从配置构建RTSP URL"""
+        if 'rtsp_url' in config and config['rtsp_url']:
+            return config['rtsp_url']
+        
+        channel = config.get('channel', 0)
+        return (f"rtsp://{config['username']}:{config['password']}"
+                f"@{config['ip']}:{config.get('rtsp_port', 554)}"
+                f"/cam/realmonitor?channel={channel + 1}&subtype=1")
+
+
+def create_dual_stream_manager(panorama_config: dict, ptz_config: dict) -> DualStreamManager:
+    """
+    便捷函数:根据摄像头配置创建双路流管理器
+    
+    Args:
+        panorama_config: 全景摄像头配置
+        ptz_config: 球机配置
+    
+    Returns:
+        已配置的 DualStreamManager 实例
+    """
+    manager = DualStreamManager()
+    
+    pan_rtsp = DualStreamManager.build_rtsp_url(panorama_config)
+    ptz_rtsp = DualStreamManager.build_rtsp_url(ptz_config)
+    
+    manager.add_stream(StreamConfig(
+        camera_id=DualStreamManager.PANORAMA_ID,
+        rtsp_url=pan_rtsp,
+        camera_type="panorama"
+    ))
+    
+    manager.add_stream(StreamConfig(
+        camera_id=DualStreamManager.PTZ_ID,
+        rtsp_url=ptz_rtsp,
+        camera_type="ptz"
+    ))
+    
+    return manager

+ 3 - 3
dual_camera_system/main.py

@@ -34,7 +34,7 @@ from dahua_sdk import DahuaSDK
 from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
 from ptz_camera import PTZCamera, PTZController
 from ocr_recognizer import NumberDetector, PersonInfo
-from coordinator import Coordinator, EventDrivenCoordinator
+from coordinator import Coordinator, EventDrivenCoordinator, AsyncCoordinator
 
 
 # 配置日志 - 使用LOG_CONFIG
@@ -168,8 +168,8 @@ class DualCameraSystem:
         ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
         self.ptz_camera = PTZCamera(self.sdk, ptz_config)
         
-        # 初始化联动控制器
-        self.coordinator = Coordinator(
+        # 初始化联动控制器(使用异步版本,检测与PTZ分线程)
+        self.coordinator = AsyncCoordinator(
             self.panorama_camera,
             self.ptz_camera,
             self.detector,

+ 7 - 6
dual_camera_system/panorama_camera.py

@@ -53,6 +53,7 @@ class PanoramaCamera:
         self.current_frame = None
         self.frame_lock = threading.Lock()
         self.rtsp_cap = None  # RTSP视频捕获
+        self._camera_id = 'panorama'  # 用于per-camera锁
         
         # 检测器
         self.detector = None
@@ -173,8 +174,8 @@ class PanoramaCamera:
                         pass
                 
                 # RTSP 模式获取帧 (推荐方式)
-                if self.rtsp_cap is not None and safe_is_opened(self.rtsp_cap):
-                    ret, frame = safe_read(self.rtsp_cap)
+                if self.rtsp_cap is not None and safe_is_opened(self.rtsp_cap, self._camera_id):
+                    ret, frame = safe_read(self.rtsp_cap, self._camera_id)
                     if ret and frame is not None:
                         with self.frame_lock:
                             self.current_frame = frame.copy()
@@ -196,7 +197,7 @@ class PanoramaCamera:
                             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
                             self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # 减少缓冲延迟
                         
-                        if safe_is_opened(self.rtsp_cap):
+                        if safe_is_opened(self.rtsp_cap, self._camera_id):
                             retry_count = 0
                             continue
                     except Exception as e:
@@ -245,11 +246,11 @@ class PanoramaCamera:
         
         while self.running:
             try:
-                if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap):
+                if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id):
                     time.sleep(0.1)
                     continue
                 
-                ret, frame = safe_read(self.rtsp_cap)
+                ret, frame = safe_read(self.rtsp_cap, self._camera_id)
                 if not ret or frame is None:
                     error_count += 1
                     if error_count > max_consecutive_errors:
@@ -293,7 +294,7 @@ class PanoramaCamera:
         
         try:
             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
-            if safe_is_opened(self.rtsp_cap):
+            if safe_is_opened(self.rtsp_cap, self._camera_id):
                 self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                 print("全景RTSP流重连成功")
             else:

+ 81 - 5
dual_camera_system/ptz_camera.py

@@ -57,6 +57,7 @@ class PTZCamera:
         self.frame_lock = threading.Lock()
         self.stream_thread = None
         self.running_stream = False
+        self._camera_id = 'ptz'  # 用于per-camera锁
     
     def connect(self) -> bool:
         """
@@ -99,10 +100,22 @@ class PTZCamera:
                 f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1"
         
         try:
+            # 先尝试FFmpeg后端
             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
             if not self.rtsp_cap.isOpened():
-                print(f"[PTZCamera] 无法打开RTSP流: {rtsp_url}")
-                return False
+                # FFmpeg失败,尝试GStreamer后端(ARM64上更稳定)
+                print(f"[PTZCamera] FFmpeg后端无法打开RTSP流,尝试GStreamer后端...")
+                try:
+                    gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER)
+                    if gst_cap.isOpened():
+                        self.rtsp_cap = gst_cap
+                        print(f"[PTZCamera] 使用GStreamer后端打开RTSP流成功")
+                    else:
+                        print(f"[PTZCamera] 无法打开RTSP流: {rtsp_url}")
+                        return False
+                except Exception as ge:
+                    print(f"[PTZCamera] GStreamer后端也不可用: {ge}")
+                    return False
             
             self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
             
@@ -129,11 +142,11 @@ class PTZCamera:
         
         while self.running_stream:
             try:
-                if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap):
+                if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id):
                     time.sleep(0.1)
                     continue
                 
-                ret, frame = safe_read(self.rtsp_cap)
+                ret, frame = safe_read(self.rtsp_cap, self._camera_id)
                 if not ret or frame is None:
                     error_count += 1
                     if error_count > max_consecutive_errors:
@@ -174,7 +187,7 @@ class PTZCamera:
         
         try:
             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
-            if safe_is_opened(self.rtsp_cap):
+            if safe_is_opened(self.rtsp_cap, self._camera_id):
                 self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
                 print("[PTZCamera] RTSP流重连成功")
             else:
@@ -381,6 +394,69 @@ class PTZCamera:
                 tilt=self.current_position.tilt,
                 zoom=self.current_position.zoom
             )
+    
+    def goto_and_confirm(self, pan: float, tilt: float, zoom: int,
+                         confirm_timeout: float = 1.0,
+                         get_frame_func=None) -> dict:
+        """
+        PTZ精确定位并确认到位
+        
+        Args:
+            pan, tilt, zoom: 目标位置
+            confirm_timeout: 确认超时秒数
+            get_frame_func: 获取球机帧的函数,用于验证
+        
+        Returns:
+            dict: {'success': bool, 'pan': float, 'tilt': float, 'zoom': int,
+                   'frame_available': bool, 'elapsed_ms': float}
+        """
+        import time as _time
+        start = _time.time()
+        
+        success = self.goto_exact_position(pan, tilt, zoom)
+        
+        result = {
+            'success': success,
+            'pan': pan,
+            'tilt': tilt,
+            'zoom': zoom,
+            'frame_available': False,
+            'elapsed_ms': (_time.time() - start) * 1000
+        }
+        
+        if not success:
+            return result
+        
+        # 等待球机物理移动到位
+        time.sleep(0.2)
+        
+        # 如果有帧获取函数,验证球机画面
+        if get_frame_func is not None:
+            deadline = _time.time() + confirm_timeout
+            while _time.time() < deadline:
+                frame = get_frame_func()
+                if frame is not None:
+                    result['frame_available'] = True
+                    break
+                time.sleep(0.05)
+        
+        result['elapsed_ms'] = (_time.time() - start) * 1000
+        return result
+    
+    def is_position_close(self, target_pan: float, target_tilt: float,
+                          threshold: float = 1.0) -> bool:
+        """
+        检查当前位置是否接近目标位置
+        
+        Args:
+            target_pan: 目标水平角度
+            target_tilt: 目标垂直角度
+            threshold: 角度容差(度)
+        """
+        current = self.get_current_position()
+        pan_diff = abs(current.pan - target_pan)
+        tilt_diff = abs(current.tilt - target_tilt)
+        return pan_diff <= threshold and tilt_diff <= threshold
 
 
 class PTZController:

+ 89 - 29
dual_camera_system/safety_coordinator.py

@@ -50,66 +50,52 @@ class AlertRecord:
 
 
 class SafetyCoordinator:
-    """
-    安全联动控制器
-    协调摄像头、安全检测、事件推送和语音播报
-    """
+    """安全联动控制器:协调摄像头、安全检测、事件推送、语音播报、PTZ跟踪"""
     
-    def __init__(self, camera, config: Dict = None):
-        """
-        初始化安全联动控制器
-        
-        Args:
-            camera: 摄像头实例 (支持 get_frame() 方法)
-            config: 配置覆盖
-        """
+    def __init__(self, camera, config: Dict = None, ptz_camera=None, calibrator=None):
         self.camera = camera
         self.config = config or {}
+        self.ptz = ptz_camera       # PTZ球机(可选)
+        self.calibrator = calibrator  # 校准器(可选)
         
-        # 安全检测器
         self.detector = None
-        
-        # 事件推送器
         self.event_pusher = None
-        
-        # 语音播报器
         self.voice_announcer = None
-        
-        # 事件监听器
         self.event_listener = None
         
-        # 状态
         self.state = CoordinatorState.IDLE
         self.state_lock = threading.Lock()
         
-        # 运行标志
         self.running = False
         self.worker_thread = None
         
+        # PTZ跟踪线程(独立于检测线程)
+        self._ptz_thread = None
+        self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
+        self._ptz_cooldown = 0.15
+        self._last_ptz_time = 0.0
+        
         # 跟踪状态
-        self.tracks = {}  # track_id -> {'center': (x,y), 'last_update': time, 'alerts': [...]}
+        self.tracks = {}
         self.next_track_id = 1
         
-        # 告警记录
         self.alert_records: List[AlertRecord] = []
-        self.alert_cooldown = {}  # track_id -> last_alert_time
+        self.alert_cooldown = {}
         
-        # 统计
         self.stats = {
             'frames_processed': 0,
             'persons_detected': 0,
             'violations_detected': 0,
             'events_pushed': 0,
             'voice_announced': 0,
+            'ptz_commands_sent': 0,
             'start_time': None
         }
         self.stats_lock = threading.Lock()
         
-        # 回调
         self.on_violation_detected: Optional[Callable] = None
         self.on_frame_processed: Optional[Callable] = None
         
-        # 初始化组件
         self._init_components()
     
     def _init_components(self):
@@ -184,7 +170,6 @@ class SafetyCoordinator:
         if self.running:
             return True
         
-        # 启动各组件
         if self.event_pusher:
             self.event_pusher.start()
         
@@ -194,11 +179,16 @@ class SafetyCoordinator:
         if self.event_listener:
             self.event_listener.start()
         
-        # 启动工作线程
         self.running = True
         self.worker_thread = threading.Thread(target=self._worker, daemon=True)
         self.worker_thread.start()
         
+        # 启动 PTZ 跟踪线程(如果 PTZ 可用)
+        if self.ptz and SYSTEM_CONFIG.get('enable_ptz_tracking', True):
+            self._ptz_thread = threading.Thread(target=self._ptz_worker, daemon=True)
+            self._ptz_thread.start()
+            print("[SafetyCoordinator] PTZ跟踪线程已启动")
+        
         with self.stats_lock:
             self.stats['start_time'] = time.time()
         
@@ -212,6 +202,11 @@ class SafetyCoordinator:
         if self.worker_thread:
             self.worker_thread.join(timeout=3)
         
+        # 停止 PTZ 跟踪线程
+        if self._ptz_thread:
+            self._ptz_thread.join(timeout=2)
+            self._ptz_thread = None
+        
         if self.event_pusher:
             self.event_pusher.stop()
         
@@ -373,6 +368,10 @@ class SafetyCoordinator:
         self.alert_records.append(record)
         self._update_stats('violations_detected')
         
+        # PTZ 跟踪违规人员(如果 PTZ 可用且启用)
+        if self.ptz and SYSTEM_CONFIG.get('enable_ptz_tracking', True):
+            self._track_violator_ptz(status, frame)
+        
         # 回调
         if self.on_violation_detected:
             self.on_violation_detected(status, frame)
@@ -394,6 +393,67 @@ class SafetyCoordinator:
         
         print(f"[告警] {description}, 跟踪ID: {track_id}")
     
+    def _track_violator_ptz(self, status: PersonSafetyStatus, frame: np.ndarray):
+        """违规人员PTZ跟踪:将违规人员在全景画面中的位置发送给PTZ线程"""
+        if self.ptz is None:
+            return
+        
+        frame_h, frame_w = frame.shape[:2]
+        x1, y1, x2, y2 = status.person_bbox
+        
+        # 计算违规人员在全景画面中的相对位置
+        center_x = (x1 + x2) / 2
+        center_y = (y1 + y2) / 2
+        x_ratio = center_x / frame_w
+        y_ratio = center_y / frame_h
+        
+        # 冷却检查
+        current_time = time.time()
+        if current_time - self._last_ptz_time < self._ptz_cooldown:
+            return
+        
+        # 发送PTZ命令
+        try:
+            self._ptz_queue.put_nowait({
+                'x_ratio': x_ratio,
+                'y_ratio': y_ratio,
+                'track_id': status.track_id,
+                'violation_type': status.violation_types[0].value if status.violation_types else 'unknown'
+            })
+            self._last_ptz_time = current_time
+            self._update_stats('ptz_commands_sent')
+        except queue.Full:
+            pass  # 队列满则丢弃,下一个检测周期会重发
+    
+    def _ptz_worker(self):
+        """PTZ控制工作线程:独立处理所有PTZ命令"""
+        while self.running:
+            try:
+                try:
+                    cmd = self._ptz_queue.get(timeout=0.1)
+                except queue.Empty:
+                    continue
+                
+                if self.ptz is None:
+                    continue
+                
+                x_ratio = cmd['x_ratio']
+                y_ratio = cmd['y_ratio']
+                
+                # 使用校准器转换坐标,或使用估算
+                if self.calibrator and self.calibrator.is_calibrated():
+                    pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
+                    zoom = self.ptz.ptz_config.get('default_zoom', 8)
+                    if self.ptz.ptz_config.get('pan_flip', False):
+                        pan = (pan + 180) % 360
+                    self.ptz.goto_exact_position(pan, tilt, zoom)
+                else:
+                    self.ptz.track_target(x_ratio, y_ratio)
+                
+            except Exception as e:
+                print(f"[SafetyCoordinator] PTZ跟踪错误: {e}")
+                time.sleep(0.05)
+    
     def _set_state(self, state: CoordinatorState):
         """设置状态"""
         with self.state_lock:

+ 48 - 1
dual_camera_system/safety_main.py

@@ -25,7 +25,7 @@ import numpy as np
 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
 
 from config import (
-    LOG_CONFIG, PANORAMA_CAMERA, SDK_PATH,
+    LOG_CONFIG, PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH,
     SAFETY_DETECTION_CONFIG, EVENT_PUSHER_CONFIG,
     VOICE_ANNOUNCER_CONFIG, SYSTEM_CONFIG,
     LLM_CONFIG, LLM_SAFETY_CONFIG
@@ -87,6 +87,14 @@ class SafetyMonitorSystem:
         self.camera = None
         self.camera_source = None
         
+        # PTZ球机(安全模式可选联动)
+        self.ptz_camera = None
+        self.calibrator = None
+        self.sdk = None
+        
+        # 双路流管理器(用于PTZ模式下的双流并行)
+        self.stream_manager = None
+        
         # 组件
         self.detector = None          # 安全检测器 (支持 LLM)
         self.llm_analyzer = None      # 大模型安全分析器
@@ -156,6 +164,32 @@ class SafetyMonitorSystem:
             self.camera = None
             logger.info("摄像头功能已禁用")
         
+        # 初始化 PTZ 球机(安全模式可选联动)
+        if self.enable_ptz_camera and self.enable_ptz_tracking:
+            try:
+                from dahua_sdk import DahuaSDK
+                sdk_path = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk'])
+                self.sdk = DahuaSDK(sdk_path)
+                if self.sdk.init():
+                    from ptz_camera import PTZCamera
+                    ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
+                    self.ptz_camera = PTZCamera(self.sdk, ptz_config)
+                    if self.ptz_camera.connect():
+                        logger.info(f"PTZ球机连接成功: {ptz_config['ip']}")
+                        if self.ptz_camera.start_stream_rtsp():
+                            logger.info("PTZ球机RTSP流启动成功")
+                        else:
+                            logger.warning("PTZ球机RTSP流启动失败,PTZ跟踪将无法进行帧验证")
+                    else:
+                        logger.warning(f"PTZ球机连接失败: {ptz_config['ip']}")
+                        self.ptz_camera = None
+                else:
+                    logger.warning("SDK初始化失败,PTZ功能不可用")
+                    self.sdk = None
+            except Exception as e:
+                logger.warning(f"PTZ球机初始化失败: {e},PTZ跟踪将不可用")
+                self.ptz_camera = None
+        
         # 初始化 LLM 大模型服务
         if self.enable_llm:
             try:
@@ -276,6 +310,19 @@ class SafetyMonitorSystem:
         if self.camera:
             self.camera.disconnect()
         
+        if self.ptz_camera:
+            self.ptz_camera.stop_stream()
+            self.ptz_camera.disconnect()
+        
+        if self.stream_manager:
+            self.stream_manager.stop_all()
+        
+        if self.sdk:
+            try:
+                self.sdk.cleanup()
+            except Exception:
+                pass
+        
         self._print_stats()
         logger.info("安全监控系统已停止")
     

+ 171 - 22
dual_camera_system/video_lock.py

@@ -1,37 +1,186 @@
 """
-全局视频捕获锁
-防止多个VideoCapture并发read()导致FFmpeg async_lock崩溃
+视频捕获锁模块
 
-FFmpeg内部使用共享线程池解码,多个VideoCapture.read()并发调用
-会触发 pthread_frame.c:167 的 async_lock 断言失败导致进程崩溃。
-此模块提供全局锁,序列化所有read()调用。
+提供线程安全的 VideoCapture 操作,防止 FFmpeg 多线程解码崩溃。
+
+改进:从全局锁改为每摄像头独立锁 + 超时机制。
+- 每路摄像头使用独立的锁,不再互相阻塞
+- 超时读帧:如果某路摄像头卡住,不会无限阻塞其他路
+- 健康检查:记录每路流的状态(最近帧时间、连续错误数等)
 """
 
 import threading
+import time
+from typing import Optional, Tuple, Dict
+from dataclasses import dataclass, field
+
+
+@dataclass
+class StreamHealth:
+    """流健康状态"""
+    camera_id: str = ""
+    last_frame_time: float = 0.0       # 最近成功读帧时间
+    consecutive_errors: int = 0         # 连续错误数
+    total_frames: int = 0               # 总帧数
+    total_errors: int = 0               # 总错误数
+    is_healthy: bool = True             # 是否健康
+    avg_read_time_ms: float = 0.0      # 平均读帧耗时(ms)
+
+
+class CameraLockManager:
+    """
+    每摄像头独立锁管理器
+    
+    替代全局锁,每路摄像头使用独立的锁,互不阻塞。
+    FFmpeg 内部的 async_lock 问题仅在多个 VideoCapture 实例
+    并发调用 read() 时触发。独立锁允许同一路的读操作串行化,
+    而不同路之间完全并行。
+    """
+    
+    def __init__(self):
+        self._locks: Dict[str, threading.Lock] = {}
+        self._health: Dict[str, StreamHealth] = {}
+        self._global_lock = threading.Lock()  # 仅用于管理锁的创建
+    
+    def get_lock(self, camera_id: str) -> threading.Lock:
+        """获取或创建指定摄像头的独立锁"""
+        with self._global_lock:
+            if camera_id not in self._locks:
+                self._locks[camera_id] = threading.Lock()
+                self._health[camera_id] = StreamHealth(camera_id=camera_id)
+            return self._locks[camera_id]
+    
+    def get_health(self, camera_id: str) -> StreamHealth:
+        """获取摄像头流健康状态"""
+        with self._global_lock:
+            return self._health.get(camera_id, StreamHealth(camera_id=camera_id))
+    
+    def update_health(self, camera_id: str, success: bool, read_time_ms: float = 0.0):
+        """更新流健康状态"""
+        with self._global_lock:
+            health = self._health.get(camera_id)
+            if health is None:
+                health = StreamHealth(camera_id=camera_id)
+                self._health[camera_id] = health
+            
+            if success:
+                health.last_frame_time = time.time()
+                health.consecutive_errors = 0
+                health.total_frames += 1
+                health.is_healthy = True
+                # 指数移动平均计算读帧耗时
+                alpha = 0.1
+                health.avg_read_time_ms = alpha * read_time_ms + (1 - alpha) * health.avg_read_time_ms
+            else:
+                health.consecutive_errors += 1
+                health.total_errors += 1
+                if health.consecutive_errors > 50:
+                    health.is_healthy = False
+
+
+# 全局锁管理器实例
+_manager = CameraLockManager()
+
+
+def safe_read(cap, camera_id: str = "default", timeout: float = 5.0) -> Tuple[bool, Optional[object]]:
+    """
+    线程安全的 VideoCapture.read(),使用每摄像头独立锁。
+    
+    Args:
+        cap: cv2.VideoCapture 实例
+        camera_id: 摄像头标识,用于区分不同路的锁
+        timeout: 读帧超时(秒),超时返回 (False, None)
+    
+    Returns:
+        (ret, frame) 与 cap.read() 相同格式
+    """
+    lock = _manager.get_lock(camera_id)
+    acquired = lock.acquire(timeout=timeout)
+    if not acquired:
+        _manager.update_health(camera_id, False)
+        return (False, None)
+    
+    try:
+        start_time = time.time()
+        ret, frame = cap.read()
+        read_time_ms = (time.time() - start_time) * 1000
+        _manager.update_health(camera_id, ret, read_time_ms)
+        
+        if not ret or frame is None:
+            return (False, None)
+        return (ret, frame)
+    except Exception:
+        _manager.update_health(camera_id, False)
+        return (False, None)
+    finally:
+        lock.release()
+
+
+def safe_is_opened(cap, camera_id: str = "default") -> bool:
+    """
+    线程安全的 VideoCapture.isOpened() 检查
+    """
+    lock = _manager.get_lock(camera_id)
+    acquired = lock.acquire(timeout=2.0)
+    if not acquired:
+        return False
+    try:
+        return cap.isOpened()
+    except Exception:
+        return False
+    finally:
+        lock.release()
+
+
+def get_stream_health(camera_id: str) -> StreamHealth:
+    """获取指定摄像头流的健康状态"""
+    return _manager.get_health(camera_id)
+
+
+def get_all_health() -> Dict[str, StreamHealth]:
+    """获取所有摄像头流的健康状态"""
+    with _manager._global_lock:
+        return dict(_manager._health)
+
+
+# ============================================================
+# 向后兼容:保留全局锁接口,但内部使用更安全的超时机制
+# ============================================================
 
-# 全局锁:所有VideoCapture.read()必须持有此锁
-_ffmpeg_lock = threading.Lock()
+# 全局锁(向后兼容,用于不指定 camera_id 的场景)
+_compatibility_lock = threading.Lock()
 
 
-def safe_read(cap) -> tuple:
+def safe_read_global(cap) -> Tuple[bool, Optional[object]]:
     """
-    线程安全的VideoCapture.read()
-    持有全局锁调用cap.read(),防止FFmpeg并发崩溃。
-    如果发生异常,返回(False, None)而非让异常传播导致线程退出。
+    全局锁版本的 safe_read(向后兼容)
+    新代码应使用 safe_read(cap, camera_id) 代替
     """
-    with _ffmpeg_lock:
-        try:
-            return cap.read()
-        except Exception:
+    acquired = _compatibility_lock.acquire(timeout=5.0)
+    if not acquired:
+        return (False, None)
+    try:
+        ret, frame = cap.read()
+        if not ret or frame is None:
             return (False, None)
+        return (ret, frame)
+    except Exception:
+        return (False, None)
+    finally:
+        _compatibility_lock.release()
 
 
-def safe_is_opened(cap) -> bool:
+def safe_is_opened_global(cap) -> bool:
     """
-    线程安全的VideoCapture.isOpened()检查
+    全局锁版本的 safe_is_opened(向后兼容)
+    新代码应使用 safe_is_opened(cap, camera_id) 代替
     """
-    with _ffmpeg_lock:
-        try:
-            return cap.isOpened()
-        except Exception:
-            return False
+    acquired = _compatibility_lock.acquire(timeout=2.0)
+    if not acquired:
+        return False
+    try:
+        return cap.isOpened()
+    except Exception:
+        return False
+    finally:
+        _compatibility_lock.release()