فهرست منبع

fix(rtsp): 修复FFmpeg多线程解码崩溃问题并增强RTSP流稳定性

在视频流工作线程中设置单线程模式防止FFmpeg内部断言错误
添加RTSP流重连机制和错误计数处理
屏蔽工作线程的SIGINT信号由主线程处理
wenhongquan 4 روز پیش
والد
کامیت
39eb58c74c
2فایلهای تغییر یافته به همراه116 افزوده شده و 16 حذف شده
  1. 62 12
      dual_camera_system/panorama_camera.py
  2. 54 4
      dual_camera_system/ptz_camera.py

+ 62 - 12
dual_camera_system/panorama_camera.py

@@ -3,12 +3,16 @@
 负责获取视频流和物体检测
 """
 
+import os
+# 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃
+# pthread_frame.c:167 async_lock assertion
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
+
 import cv2
 import numpy as np
 import threading
 import queue
 import time
-import os
 from typing import Optional, List, Tuple, Dict, Any
 from dataclasses import dataclass
 
@@ -121,18 +125,11 @@ class PanoramaCamera:
             rtsp_url = self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
         
         try:
-            # 使用 FFmpeg 单线程模式避免线程安全崩溃
-            # 设置环境变量禁用多线程解码
-            import os
-            os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
-            
-            # 使用 CAP_FFMPEG 后端并设置单线程
             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
             if not self.rtsp_cap.isOpened():
                 print(f"无法打开RTSP流: {rtsp_url}")
                 return False
             
-            # 设置缓冲区大小为1,减少延迟
             self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
             
             self.running = True
@@ -209,14 +206,30 @@ class PanoramaCamera:
                     time.sleep(0.1)
                 
             except Exception as e:
-                print(f"视频流错误: {e}")
-                time.sleep(0.1)
+                err_str = str(e)
+                if 'async_lock' in err_str or 'Assertion' in err_str:
+                    print(f"视频流FFmpeg内部错误,重建连接: {e}")
+                    self._reconnect_rtsp()
+                else:
+                    print(f"视频流错误: {e}")
+                time.sleep(0.5)
     
     def _build_rtsp_url(self) -> str:
         return self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
     
     def _rtsp_stream_worker(self):
         """RTSP视频流工作线程"""
+        import signal
+        # 屏蔽SIGINT在此线程,由主线程处理
+        if hasattr(signal, 'pthread_sigmask'):
+            try:
+                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
+            except (AttributeError, OSError):
+                pass
+        
+        max_consecutive_errors = 50
+        error_count = 0
+        
         while self.running:
             try:
                 if self.rtsp_cap is None or not self.rtsp_cap.isOpened():
@@ -225,9 +238,16 @@ class PanoramaCamera:
                 
                 ret, frame = self.rtsp_cap.read()
                 if not ret or frame is None:
+                    error_count += 1
+                    if error_count > max_consecutive_errors:
+                        print(f"全景RTSP流连续{max_consecutive_errors}次读取失败,尝试重连...")
+                        self._reconnect_rtsp()
+                        error_count = 0
                     time.sleep(0.01)
                     continue
                 
+                error_count = 0
+                
                 with self.frame_lock:
                     self.current_frame = frame.copy()
                 
@@ -237,8 +257,38 @@ class PanoramaCamera:
                     pass
                     
             except Exception as e:
-                print(f"RTSP视频流错误: {e}")
-                time.sleep(0.1)
+                err_str = str(e)
+                if 'async_lock' in err_str or 'Assertion' in err_str:
+                    print(f"全景RTSP流FFmpeg内部错误,3秒后重建连接: {e}")
+                    time.sleep(3)
+                    self._reconnect_rtsp()
+                else:
+                    print(f"全景RTSP视频流错误: {e}")
+                    time.sleep(0.5)
+    
+    def _reconnect_rtsp(self):
+        """重建RTSP连接"""
+        rtsp_url = self._build_rtsp_url()
+        if self.rtsp_cap is not None:
+            try:
+                self.rtsp_cap.release()
+            except Exception:
+                pass
+            self.rtsp_cap = None
+        
+        time.sleep(1)
+        
+        try:
+            self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
+            if self.rtsp_cap.isOpened():
+                self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+                print("全景RTSP流重连成功")
+            else:
+                print("全景RTSP流重连失败")
+                self.rtsp_cap = None
+        except Exception as e:
+            print(f"全景RTSP流重连异常: {e}")
+            self.rtsp_cap = None
     
     def stop_stream(self):
         """停止视频流"""

+ 54 - 4
dual_camera_system/ptz_camera.py

@@ -3,11 +3,14 @@
 负责PTZ控制、精确定位和视频流获取
 """
 
+import os
+# 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
+
 import math
 import time
 import threading
 import queue
-import os
 from typing import Optional, Tuple, Dict
 from dataclasses import dataclass
 
@@ -95,7 +98,6 @@ class PTZCamera:
                 f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1"
         
         try:
-            os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
             self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
             if not self.rtsp_cap.isOpened():
                 print(f"[PTZCamera] 无法打开RTSP流: {rtsp_url}")
@@ -114,6 +116,16 @@ class PTZCamera:
     
     def _stream_worker(self):
         """视频流工作线程"""
+        import signal
+        if hasattr(signal, 'pthread_sigmask'):
+            try:
+                signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
+            except (AttributeError, OSError):
+                pass
+        
+        max_consecutive_errors = 50
+        error_count = 0
+        
         while self.running_stream:
             try:
                 if self.rtsp_cap is None or not self.rtsp_cap.isOpened():
@@ -122,16 +134,54 @@ class PTZCamera:
                 
                 ret, frame = self.rtsp_cap.read()
                 if not ret or frame is None:
+                    error_count += 1
+                    if error_count > max_consecutive_errors:
+                        print("[PTZCamera] RTSP流连续读取失败,尝试重连...")
+                        self._reconnect_rtsp()
+                        error_count = 0
                     time.sleep(0.01)
                     continue
                 
+                error_count = 0
+                
                 with self.frame_lock:
                     self.current_frame = frame.copy()
                 
                 time.sleep(0.001)
             except Exception as e:
-                print(f"[PTZCamera] 视频流错误: {e}")
-                time.sleep(0.1)
+                err_str = str(e)
+                if 'async_lock' in err_str or 'Assertion' in err_str:
+                    print(f"[PTZCamera] FFmpeg内部错误,3秒后重建连接: {e}")
+                    time.sleep(3)
+                    self._reconnect_rtsp()
+                else:
+                    print(f"[PTZCamera] 视频流错误: {e}")
+                    time.sleep(0.5)
+    
+    def _reconnect_rtsp(self):
+        rtsp_url = self.config.get('rtsp_url') or \
+            f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1"
+        
+        if self.rtsp_cap is not None:
+            try:
+                self.rtsp_cap.release()
+            except Exception:
+                pass
+            self.rtsp_cap = None
+        
+        time.sleep(1)
+        
+        try:
+            self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
+            if self.rtsp_cap.isOpened():
+                self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
+                print("[PTZCamera] RTSP流重连成功")
+            else:
+                print("[PTZCamera] RTSP流重连失败")
+                self.rtsp_cap = None
+        except Exception as e:
+            print(f"[PTZCamera] RTSP流重连异常: {e}")
+            self.rtsp_cap = None
     
     def get_frame(self) -> Optional[np.ndarray]:
         """获取球机当前帧"""