""" 全景摄像头模块 负责获取视频流和物体检测 """ import os # 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃 # pthread_frame.c:167 async_lock assertion os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1' import cv2 import numpy as np import threading import queue import time from typing import Optional, List, Tuple, Dict, Any from dataclasses import dataclass from config import PANORAMA_CAMERA, DETECTION_CONFIG from dahua_sdk import DahuaSDK, PTZCommand from video_lock import safe_read, safe_is_opened @dataclass class DetectedObject: """检测到的物体""" class_name: str # 类别名称 confidence: float # 置信度 bbox: Tuple[int, int, int, int] # 边界框 (x, y, width, height) center: Tuple[int, int] # 中心点坐标 track_id: Optional[int] = None # 跟踪ID class PanoramaCamera: """全景摄像头类""" def __init__(self, sdk: DahuaSDK, camera_config: Dict = None): """ 初始化全景摄像头 Args: sdk: 大华SDK实例 camera_config: 摄像头配置 """ self.sdk = sdk self.config = camera_config or PANORAMA_CAMERA self.login_handle = None self.play_handle = None self.connected = False # 视频流 self.frame_queue = queue.Queue(maxsize=10) self.current_frame = None self.frame_lock = threading.Lock() self.rtsp_cap = None # RTSP视频捕获 self._camera_id = 'panorama' # 用于per-camera锁 # 检测器 self.detector = None # 控制标志 self.running = False self.stream_thread = None # 断线重连 self.auto_reconnect = True self.reconnect_interval = 5.0 # 重连间隔(秒) self.max_reconnect_attempts = 3 # 最大重连次数 def connect(self) -> bool: """ 连接摄像头 Returns: 是否成功 """ login_handle, error = self.sdk.login( self.config['ip'], self.config['port'], self.config['username'], self.config['password'] ) if login_handle is None: print(f"连接全景摄像头失败: IP={self.config['ip']}, 错误码={error}") return False self.login_handle = login_handle self.connected = True print(f"成功连接全景摄像头: {self.config['ip']}") return True def disconnect(self): """断开连接""" self.stop_stream() if self.login_handle: self.sdk.logout(self.login_handle) self.login_handle = None self.connected = False def start_stream(self) -> bool: """ 开始视频流 Returns: 是否成功 """ if not self.connected: return False self.play_handle = self.sdk.real_play( self.login_handle, self.config['channel'] ) if self.play_handle is None: print("启动视频流失败") return False self.running = True self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True) self.stream_thread.start() print("视频流已启动") return True def start_stream_rtsp(self, rtsp_url: str = None) -> bool: if rtsp_url is None: rtsp_url = self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream" try: # 先尝试FFmpeg后端 self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if not self.rtsp_cap.isOpened(): # FFmpeg失败,尝试GStreamer后端 print(f"FFmpeg后端无法打开RTSP流,尝试GStreamer后端...") try: gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER) if gst_cap.isOpened(): self.rtsp_cap = gst_cap print(f"使用GStreamer后端打开RTSP流成功") else: print(f"无法打开RTSP流: {rtsp_url}") return False except Exception as ge: print(f"GStreamer后端也不可用: {ge}") return False self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.running = True self.stream_thread = threading.Thread(target=self._rtsp_stream_worker, daemon=True) self.stream_thread.start() print(f"RTSP视频流已启动: {rtsp_url}") return True except Exception as e: print(f"RTSP流启动失败: {e}") return False def _stream_worker(self): """视频流工作线程 (SDK模式)""" retry_count = 0 max_retries = 10 while self.running: try: # 尝试从 SDK 帧缓冲区获取帧 (如果可用) frame_buffer = self.sdk.get_video_frame_buffer(self.config['channel']) if frame_buffer: frame_info = frame_buffer.get(timeout=0.1) if frame_info and frame_info.get('data'): # 解码帧数据 (如果需要) # 注意: SDK回调返回的是编码数据,需要解码 # 这里暂时跳过,因为解码需要额外处理 pass # RTSP 模式获取帧 (推荐方式) if self.rtsp_cap is not None and safe_is_opened(self.rtsp_cap, self._camera_id): ret, frame = safe_read(self.rtsp_cap, self._camera_id) if ret and frame is not None: with self.frame_lock: self.current_frame = frame.copy() try: self.frame_queue.put(frame.copy(), block=False) except queue.Full: pass retry_count = 0 # 重置重试计数 time.sleep(0.001) # 减少CPU占用 continue # 如果 RTSP 不可用,尝试自动连接 if retry_count < max_retries: rtsp_url = self._build_rtsp_url() try: if self.rtsp_cap is None: self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 减少缓冲延迟 if safe_is_opened(self.rtsp_cap, self._camera_id): retry_count = 0 continue except Exception as e: pass retry_count += 1 time.sleep(1.0) # 重试间隔 else: # 超过最大重试次数,使用模拟帧 frame = np.zeros((1080, 1920, 3), dtype=np.uint8) with self.frame_lock: self.current_frame = frame try: self.frame_queue.put(frame, block=False) except queue.Full: pass time.sleep(0.1) except Exception as e: err_str = str(e) if 'async_lock' in err_str or 'Assertion' in err_str: print(f"视频流FFmpeg内部错误,重建连接: {e}") self._reconnect_rtsp() else: print(f"视频流错误: {e}") time.sleep(0.5) def _build_rtsp_url(self) -> str: return self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream" def _rtsp_stream_worker(self): """RTSP视频流工作线程""" import signal # 屏蔽SIGINT在此线程,由主线程处理 if hasattr(signal, 'pthread_sigmask'): try: signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT}) except (AttributeError, OSError): pass max_consecutive_errors = 50 error_count = 0 while self.running: try: if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id): time.sleep(0.1) continue ret, frame = safe_read(self.rtsp_cap, self._camera_id) if not ret or frame is None: error_count += 1 if error_count > max_consecutive_errors: print(f"全景RTSP流连续{max_consecutive_errors}次读取失败,尝试重连...") self._reconnect_rtsp() error_count = 0 time.sleep(0.01) continue error_count = 0 with self.frame_lock: self.current_frame = frame.copy() try: self.frame_queue.put(frame, block=False) except queue.Full: pass except Exception as e: err_str = str(e) if 'async_lock' in err_str or 'Assertion' in err_str: print(f"全景RTSP流FFmpeg内部错误,3秒后重建连接: {e}") time.sleep(3) self._reconnect_rtsp() else: print(f"全景RTSP视频流错误: {e}") time.sleep(0.5) def _reconnect_rtsp(self): """重建RTSP连接""" rtsp_url = self._build_rtsp_url() if self.rtsp_cap is not None: try: self.rtsp_cap.release() except Exception: pass self.rtsp_cap = None time.sleep(1) try: self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if safe_is_opened(self.rtsp_cap, self._camera_id): self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) print("全景RTSP流重连成功") else: print("全景RTSP流重连失败") self.rtsp_cap = None except Exception as e: print(f"全景RTSP流重连异常: {e}") self.rtsp_cap = None def stop_stream(self): """停止视频流""" self.running = False if self.stream_thread: self.stream_thread.join(timeout=2) if self.play_handle: self.sdk.stop_real_play(self.play_handle) self.play_handle = None if self.rtsp_cap: self.rtsp_cap.release() self.rtsp_cap = None def get_frame(self) -> Optional[np.ndarray]: """ 获取当前帧 Returns: 当前帧图像 """ with self.frame_lock: return self.current_frame.copy() if self.current_frame is not None else None def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[np.ndarray]: """ 从帧队列获取帧 (用于批量处理) Args: timeout: 等待超时时间 Returns: 帧图像或None """ try: return self.frame_queue.get(timeout=timeout) except: return None def get_frame_buffer(self, count: int = 5) -> List[np.ndarray]: """ 获取帧缓冲 (用于运动检测等需要多帧的场景) Args: count: 获取帧数 Returns: 帧列表 """ frames = [] while len(frames) < count: frame = self.get_frame_from_queue(timeout=0.05) if frame is not None: frames.append(frame) else: break return frames def set_detector(self, detector): """设置物体检测器""" self.detector = detector def detect_objects(self, frame: np.ndarray = None) -> List[DetectedObject]: """ 检测物体 Args: frame: 输入帧,如果为None则使用当前帧 Returns: 检测到的物体列表 """ if frame is None: frame = self.get_frame() if frame is None or self.detector is None: return [] return self.detector.detect(frame) def get_detection_position(self, obj: DetectedObject, frame_size: Tuple[int, int]) -> Tuple[float, float]: """ 获取检测物体在画面中的相对位置 Args: obj: 检测到的物体 frame_size: 画面尺寸 (width, height) Returns: 相对位置 (x_ratio, y_ratio) 范围0-1 """ width, height = frame_size x_ratio = obj.center[0] / width y_ratio = obj.center[1] / height return (x_ratio, y_ratio) class ObjectDetector: """ 物体检测器 使用YOLO11模型进行人体检测 支持 YOLO (.pt), RKNN (.rknn), ONNX (.onnx) 模型 """ def __init__(self, model_path: str = None, use_gpu: bool = True, model_size: str = 'n', model_type: str = 'auto'): """ 初始化检测器 Args: model_path: 模型路径 (支持 .pt, .rknn, .onnx) use_gpu: 是否使用GPU model_size: 模型尺寸 ('n', 's', 'm', 'l', 'x') - 仅 YOLO 模型有效 model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx') """ self.model = None self.rknn_detector = None self.model_path = model_path self.use_gpu = use_gpu self.model_size = model_size self.model_type = model_type self.config = DETECTION_CONFIG self.device = 'cuda:0' if use_gpu else 'cpu' # 根据扩展名自动判断模型类型 if model_path: ext = os.path.splitext(model_path)[1].lower() if ext == '.rknn': self.model_type = 'rknn' elif ext == '.onnx': self.model_type = 'onnx' elif ext == '.pt': self.model_type = 'yolo' self._load_model() def _load_model(self): """加载检测模型""" if self.model_type == 'rknn': self._load_rknn_model() elif self.model_type == 'onnx': self._load_onnx_model() else: self._load_yolo_model() def _load_rknn_model(self): """加载 RKNN 模型""" if not self.model_path: raise ValueError("RKNN 模型需要指定 model_path") try: from rknnlite.api import RKNNLite self.rknn = RKNNLite() ret = self.rknn.load_rknn(self.model_path) if ret != 0: raise RuntimeError(f"加载 RKNN 模型失败: {self.model_path}") ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2) if ret != 0: raise RuntimeError(f"初始化 RKNN 运行时失败") print(f"RKNN 模型加载成功: {self.model_path}") except ImportError: raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2") def _load_onnx_model(self): """加载 ONNX 模型""" if not self.model_path: raise ValueError("ONNX 模型需要指定 model_path") try: import onnxruntime as ort self.session = ort.InferenceSession(self.model_path) self.input_name = self.session.get_inputs()[0].name self.output_name = self.session.get_outputs()[0].name print(f"ONNX 模型加载成功: {self.model_path}") except ImportError: raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime") def _load_yolo_model(self): """加载YOLO11检测模型""" try: from ultralytics import YOLO if self.model_path: self.model = YOLO(self.model_path) else: model_name = f'yolo11{self.model_size}.pt' self.model = YOLO(model_name) dummy = np.zeros((640, 640, 3), dtype=np.uint8) self.model(dummy, device=self.device, verbose=False) print(f"成功加载YOLO11检测模型 (device={self.device})") except ImportError: print("未安装ultralytics,请运行: pip install ultralytics") self._load_opencv_model() except Exception as e: print(f"加载YOLO11模型失败: {e}") self._load_opencv_model() def _load_opencv_model(self): """使用OpenCV加载模型""" pass def _letterbox(self, image, size=(640, 640)): """Letterbox 预处理""" h0, w0 = image.shape[:2] ih, iw = size scale = min(iw / w0, ih / h0) new_w, new_h = int(w0 * scale), int(h0 * scale) pad_w = (iw - new_w) // 2 pad_h = (ih - new_h) // 2 resized = cv2.resize(image, (new_w, new_h)) canvas = np.full((ih, iw, 3), 114, dtype=np.uint8) canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized return canvas, scale, pad_w, pad_h, h0, w0 def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]: """使用 RKNN/ONNX 模型检测""" results = [] try: canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame) if hasattr(self, 'rknn'): # RKNN img = canvas[..., ::-1].astype(np.float32) / 255.0 blob = img[None, ...] outputs = self.rknn.inference(inputs=[blob]) else: # ONNX img = canvas[..., ::-1].astype(np.float32) / 255.0 img = img.transpose(2, 0, 1) blob = img[None, ...] outputs = self.session.run([self.output_name], {self.input_name: blob}) output = outputs[0] if len(output.shape) == 3: output = output[0] num_boxes = output.shape[1] conf_threshold = self.config['confidence_threshold'] for i in range(num_boxes): x_center = float(output[0, i]) y_center = float(output[1, i]) width = float(output[2, i]) height = float(output[3, i]) class_probs = output[4:, i] best_class = int(np.argmax(class_probs)) confidence = float(class_probs[best_class]) if confidence < conf_threshold: continue # 转换到原始图像坐标 x1 = int(((x_center - width / 2) - pad_w) / scale) y1 = int(((y_center - height / 2) - pad_h) / scale) x2 = int(((x_center + width / 2) - pad_w) / scale) y2 = int(((y_center + height / 2) - pad_h) / scale) x1 = max(0, min(w0, x1)) y1 = max(0, min(h0, y1)) x2 = max(0, min(w0, x2)) y2 = max(0, min(h0, y2)) if x2 - x1 < 10 or y2 - y1 < 10: continue # 使用配置的类别映射获取类别名称 class_map = self.config.get('class_map', {0: 'person', 3: '人'}) cls_name = class_map.get(best_class, str(best_class)) # 检查是否为目标类别 if cls_name not in self.config['target_classes']: continue obj = DetectedObject( class_name=cls_name, confidence=confidence, bbox=(x1, y1, x2 - x1, y2 - y1), center=((x1 + x2) // 2, (y1 + y2) // 2) ) results.append(obj) except Exception as e: print(f"RKNN/ONNX 检测错误: {e}") return results def detect(self, frame: np.ndarray) -> List[DetectedObject]: """ 使用YOLO11检测物体 Args: frame: 输入图像 Returns: 检测结果列表 """ if frame is None: return [] # 优先使用 RKNN/ONNX 模型 if hasattr(self, 'rknn') and self.rknn is not None: return self._detect_rknn(frame) elif hasattr(self, 'session') and self.session is not None: return self._detect_rknn(frame) # 使用 YOLO 模型 elif self.model is not None: return self._detect_yolo(frame) else: print("[错误] 没有可用的检测模型") return [] def _detect_yolo(self, frame: np.ndarray) -> List[DetectedObject]: """使用 YOLO 模型检测""" results = [] try: detections = self.model( frame, device=self.device, verbose=False, conf=self.config['confidence_threshold'] ) for det in detections: boxes = det.boxes if boxes is None: continue for i in range(len(boxes)): cls_id = int(boxes.cls[i]) cls_name = det.names[cls_id] if cls_name not in self.config['target_classes']: continue conf = float(boxes.conf[i]) xyxy = boxes.xyxy[i].cpu().numpy() x1, y1, x2, y2 = map(int, xyxy) width = x2 - x1 height = y2 - y1 if width < 10 or height < 10: continue center_x = x1 + width // 2 center_y = y1 + height // 2 obj = DetectedObject( class_name=cls_name, confidence=conf, bbox=(x1, y1, width, height), center=(center_x, center_y) ) results.append(obj) except Exception as e: print(f"YOLO11检测错误: {e}") return results def detect_with_keypoints(self, frame: np.ndarray) -> List[DetectedObject]: """ 使用YOLO11-pose检测人体并返回关键点 Args: frame: 输入图像 Returns: 带关键点的检测结果列表 """ return self.detect(frame) def detect_persons(self, frame: np.ndarray) -> List[DetectedObject]: """ 检测人体 Args: frame: 输入图像 Returns: 检测到的人体列表 """ results = self.detect(frame) return [obj for obj in results if obj.class_name == 'person'] def release(self): """释放模型资源""" if hasattr(self, 'rknn') and self.rknn: self.rknn.release() self.rknn = None self.model = None self.session = None class PersonTracker: """ 人体跟踪器 使用简单的质心跟踪算法 """ def __init__(self, max_disappeared: int = 30): """ 初始化跟踪器 Args: max_disappeared: 最大消失帧数 """ self.max_disappeared = max_disappeared self.next_id = 0 self.objects = {} # id -> center self.disappeared = {} # id -> disappeared count def update(self, detections: List[DetectedObject]) -> List[DetectedObject]: """ 更新跟踪状态 Args: detections: 当前帧检测结果 Returns: 带有跟踪ID的检测结果 """ # 如果没有检测结果 if len(detections) == 0: # 标记所有已跟踪对象为消失 for obj_id in list(self.disappeared.keys()): self.disappeared[obj_id] += 1 if self.disappeared[obj_id] > self.max_disappeared: self._deregister(obj_id) return [] # 计算当前检测中心点 input_centers = np.array([d.center for d in detections]) # 如果没有已跟踪对象 if len(self.objects) == 0: for det in detections: self._register(det) else: # 计算距离矩阵 object_ids = list(self.objects.keys()) object_centers = np.array([self.objects[obj_id] for obj_id in object_ids]) # 计算欧氏距离 distances = np.linalg.norm( object_centers[:, np.newaxis] - input_centers, axis=2 ) # 匈牙利算法匹配 (简化版: 贪心匹配) rows = distances.min(axis=1).argsort() cols = distances.argmin(axis=1)[rows] used_rows = set() used_cols = set() for (row, col) in zip(rows, cols): if row in used_rows or col in used_cols: continue obj_id = object_ids[row] self.objects[obj_id] = input_centers[col] self.disappeared[obj_id] = 0 detections[col].track_id = obj_id used_rows.add(row) used_cols.add(col) # 处理未匹配的已跟踪对象 unused_rows = set(range(len(object_ids))) - used_rows for row in unused_rows: obj_id = object_ids[row] self.disappeared[obj_id] += 1 if self.disappeared[obj_id] > self.max_disappeared: self._deregister(obj_id) # 处理未匹配的新检测 unused_cols = set(range(len(input_centers))) - used_cols for col in unused_cols: self._register(detections[col]) return [d for d in detections if d.track_id is not None] def _register(self, detection: DetectedObject): """注册新对象""" detection.track_id = self.next_id self.objects[self.next_id] = detection.center self.disappeared[self.next_id] = 0 self.next_id += 1 def _deregister(self, obj_id: int): """注销对象""" del self.objects[obj_id] del self.disappeared[obj_id]