""" 全景摄像头模块 负责获取视频流和物体检测 """ import os # 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃 # pthread_frame.c:167 async_lock assertion os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1' import cv2 import numpy as np import threading import queue import time import logging from datetime import datetime from typing import Optional, List, Tuple, Dict, Any from dataclasses import dataclass from pathlib import Path from config import PANORAMA_CAMERA, DETECTION_CONFIG from dahua_sdk import DahuaSDK, PTZCommand from video_lock import safe_read, safe_is_opened logger = logging.getLogger(__name__) @dataclass class DetectedObject: """检测到的物体""" class_name: str # 类别名称 confidence: float # 置信度 bbox: Tuple[int, int, int, int] # 边界框 (x, y, width, height) center: Tuple[int, int] # 中心点坐标 track_id: Optional[int] = None # 跟踪ID class PanoramaCamera: """全景摄像头类""" def __init__(self, sdk: DahuaSDK, camera_config: Dict = None): """ 初始化全景摄像头 Args: sdk: 大华SDK实例 camera_config: 摄像头配置 """ self.sdk = sdk self.config = camera_config or PANORAMA_CAMERA self.login_handle = None self.play_handle = None self.connected = False # 视频流 self.frame_queue = queue.Queue(maxsize=10) self.current_frame = None self.frame_lock = threading.Lock() self.rtsp_cap = None # RTSP视频捕获 self._camera_id = 'panorama' # 用于per-camera锁 # 检测器 self.detector = None # 控制标志 self.running = False self.stream_thread = None # 断线重连 self.auto_reconnect = True self.reconnect_interval = 5.0 # 重连间隔(秒) self.max_reconnect_attempts = 3 # 最大重连次数 def connect(self) -> bool: """ 连接摄像头 Returns: 是否成功 """ login_handle, error = self.sdk.login( self.config['ip'], self.config['port'], self.config['username'], self.config['password'] ) if login_handle is None: print(f"连接全景摄像头失败: IP={self.config['ip']}, 错误码={error}") return False self.login_handle = login_handle self.connected = True print(f"成功连接全景摄像头: {self.config['ip']}") return True def disconnect(self): """断开连接""" self.stop_stream() if self.login_handle: self.sdk.logout(self.login_handle) self.login_handle = None self.connected = False def start_stream(self) -> bool: """ 开始视频流 Returns: 是否成功 """ if not self.connected: return False self.play_handle = self.sdk.real_play( self.login_handle, self.config['channel'] ) if self.play_handle is None: print("启动视频流失败") return False self.running = True self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True) self.stream_thread.start() print("视频流已启动") return True def start_stream_rtsp(self, rtsp_url: str = None) -> bool: if rtsp_url is None: rtsp_url = self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream" try: # 先尝试FFmpeg后端 self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if not self.rtsp_cap.isOpened(): # FFmpeg失败,尝试GStreamer后端 print(f"FFmpeg后端无法打开RTSP流,尝试GStreamer后端...") try: gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER) if gst_cap.isOpened(): self.rtsp_cap = gst_cap print(f"使用GStreamer后端打开RTSP流成功") else: print(f"无法打开RTSP流: {rtsp_url}") return False except Exception as ge: print(f"GStreamer后端也不可用: {ge}") return False self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.running = True self.stream_thread = threading.Thread(target=self._rtsp_stream_worker, daemon=True) self.stream_thread.start() print(f"RTSP视频流已启动: {rtsp_url}") return True except Exception as e: print(f"RTSP流启动失败: {e}") return False def _stream_worker(self): """视频流工作线程 (SDK模式)""" retry_count = 0 max_retries = 10 while self.running: try: # 尝试从 SDK 帧缓冲区获取帧 (如果可用) frame_buffer = self.sdk.get_video_frame_buffer(self.config['channel']) if frame_buffer: frame_info = frame_buffer.get(timeout=0.1) if frame_info and frame_info.get('data'): # 解码帧数据 (如果需要) # 注意: SDK回调返回的是编码数据,需要解码 # 这里暂时跳过,因为解码需要额外处理 pass # RTSP 模式获取帧 (推荐方式) if self.rtsp_cap is not None and safe_is_opened(self.rtsp_cap, self._camera_id): ret, frame = safe_read(self.rtsp_cap, self._camera_id) if ret and frame is not None: with self.frame_lock: self.current_frame = frame.copy() try: self.frame_queue.put(frame.copy(), block=False) except queue.Full: pass retry_count = 0 # 重置重试计数 time.sleep(0.001) # 减少CPU占用 continue # 如果 RTSP 不可用,尝试自动连接 if retry_count < max_retries: rtsp_url = self._build_rtsp_url() try: if self.rtsp_cap is None: self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 减少缓冲延迟 if safe_is_opened(self.rtsp_cap, self._camera_id): retry_count = 0 continue except Exception as e: pass retry_count += 1 time.sleep(1.0) # 重试间隔 else: # 超过最大重试次数,使用模拟帧 frame = np.zeros((1080, 1920, 3), dtype=np.uint8) with self.frame_lock: self.current_frame = frame try: self.frame_queue.put(frame, block=False) except queue.Full: pass time.sleep(0.1) except Exception as e: err_str = str(e) if 'async_lock' in err_str or 'Assertion' in err_str: print(f"视频流FFmpeg内部错误,重建连接: {e}") self._reconnect_rtsp() else: print(f"视频流错误: {e}") time.sleep(0.5) def _build_rtsp_url(self) -> str: return self.config.get('rtsp_url') or f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream" def _rtsp_stream_worker(self): """RTSP视频流工作线程""" import signal # 屏蔽SIGINT在此线程,由主线程处理 if hasattr(signal, 'pthread_sigmask'): try: signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT}) except (AttributeError, OSError): pass max_consecutive_errors = 50 error_count = 0 while self.running: try: if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id): time.sleep(0.1) continue ret, frame = safe_read(self.rtsp_cap, self._camera_id) if not ret or frame is None: error_count += 1 if error_count > max_consecutive_errors: print(f"全景RTSP流连续{max_consecutive_errors}次读取失败,尝试重连...") self._reconnect_rtsp() error_count = 0 time.sleep(0.01) continue error_count = 0 with self.frame_lock: self.current_frame = frame.copy() try: self.frame_queue.put(frame, block=False) except queue.Full: pass except Exception as e: err_str = str(e) if 'async_lock' in err_str or 'Assertion' in err_str: print(f"全景RTSP流FFmpeg内部错误,3秒后重建连接: {e}") time.sleep(3) self._reconnect_rtsp() else: print(f"全景RTSP视频流错误: {e}") time.sleep(0.5) def _reconnect_rtsp(self): """重建RTSP连接""" rtsp_url = self._build_rtsp_url() if self.rtsp_cap is not None: try: self.rtsp_cap.release() except Exception: pass self.rtsp_cap = None time.sleep(1) try: self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if safe_is_opened(self.rtsp_cap, self._camera_id): self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) print("全景RTSP流重连成功") else: print("全景RTSP流重连失败") self.rtsp_cap = None except Exception as e: print(f"全景RTSP流重连异常: {e}") self.rtsp_cap = None def stop_stream(self): """停止视频流""" self.running = False if self.stream_thread: self.stream_thread.join(timeout=2) if self.play_handle: self.sdk.stop_real_play(self.play_handle) self.play_handle = None if self.rtsp_cap: self.rtsp_cap.release() self.rtsp_cap = None def get_frame(self) -> Optional[np.ndarray]: """ 获取当前帧 Returns: 当前帧图像 """ with self.frame_lock: return self.current_frame.copy() if self.current_frame is not None else None def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[np.ndarray]: """ 从帧队列获取帧 (用于批量处理) Args: timeout: 等待超时时间 Returns: 帧图像或None """ try: return self.frame_queue.get(timeout=timeout) except: return None def get_frame_buffer(self, count: int = 5) -> List[np.ndarray]: """ 获取帧缓冲 (用于运动检测等需要多帧的场景) Args: count: 获取帧数 Returns: 帧列表 """ frames = [] while len(frames) < count: frame = self.get_frame_from_queue(timeout=0.05) if frame is not None: frames.append(frame) else: break return frames def set_detector(self, detector): """设置物体检测器""" self.detector = detector def detect_objects(self, frame: np.ndarray = None) -> List[DetectedObject]: """ 检测物体 Args: frame: 输入帧,如果为None则使用当前帧 Returns: 检测到的物体列表 """ if frame is None: frame = self.get_frame() if frame is None or self.detector is None: return [] return self.detector.detect(frame) def get_detection_position(self, obj: DetectedObject, frame_size: Tuple[int, int]) -> Tuple[float, float]: """ 获取检测物体在画面中的相对位置 Args: obj: 检测到的物体 frame_size: 画面尺寸 (width, height) Returns: 相对位置 (x_ratio, y_ratio) 范围0-1 """ width, height = frame_size x_ratio = obj.center[0] / width y_ratio = obj.center[1] / height return (x_ratio, y_ratio) class ObjectDetector: """ 物体检测器 使用YOLO11模型进行人体检测 支持 YOLO (.pt), RKNN (.rknn), ONNX (.onnx) 模型 """ def __init__(self, model_path: str = None, use_gpu: bool = True, model_size: str = 'n', model_type: str = 'auto'): """ 初始化检测器 Args: model_path: 模型路径 (支持 .pt, .rknn, .onnx) use_gpu: 是否使用GPU model_size: 模型尺寸 ('n', 's', 'm', 'l', 'x') - 仅 YOLO 模型有效 model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx') """ self.model = None self.rknn_detector = None self.model_path = model_path self.use_gpu = use_gpu self.model_size = model_size self.model_type = model_type self.config = DETECTION_CONFIG self.device = 'cuda:0' if use_gpu else 'cpu' # 检测图片保存配置 self._save_image_enabled = self.config.get('save_detection_image', False) self._image_save_dir = Path(self.config.get('detection_image_dir', './detection_images')) self._image_max_count = self.config.get('detection_image_max_count', 1000) self._last_save_time = 0 # 保存间隔:优先使用配置值,否则基于检测帧率计算(检测间隔的1.5倍) detection_fps = self.config.get('detection_fps', 2) self._save_interval = self.config.get('save_interval', 1.5 / detection_fps) # 创建保存目录 if self._save_image_enabled: self._ensure_save_dir() # 根据扩展名自动判断模型类型 if model_path: ext = os.path.splitext(model_path)[1].lower() if ext == '.rknn': self.model_type = 'rknn' elif ext == '.onnx': self.model_type = 'onnx' elif ext == '.pt': self.model_type = 'yolo' self._load_model() def _load_model(self): """加载检测模型""" if self.model_type == 'rknn': self._load_rknn_model() elif self.model_type == 'onnx': self._load_onnx_model() else: self._load_yolo_model() def _load_rknn_model(self): """加载 RKNN 模型""" if not self.model_path: raise ValueError("RKNN 模型需要指定 model_path") try: from rknnlite.api import RKNNLite self.rknn = RKNNLite() ret = self.rknn.load_rknn(self.model_path) if ret != 0: raise RuntimeError(f"加载 RKNN 模型失败: {self.model_path}") ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2) if ret != 0: raise RuntimeError(f"初始化 RKNN 运行时失败") print(f"RKNN 模型加载成功: {self.model_path}") except ImportError: raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2") def _load_onnx_model(self): """加载 ONNX 模型""" if not self.model_path: raise ValueError("ONNX 模型需要指定 model_path") try: import onnxruntime as ort self.session = ort.InferenceSession(self.model_path) self.input_name = self.session.get_inputs()[0].name self.output_name = self.session.get_outputs()[0].name print(f"ONNX 模型加载成功: {self.model_path}") except ImportError: raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime") def _load_yolo_model(self): """加载YOLO11检测模型""" try: from ultralytics import YOLO if self.model_path: self.model = YOLO(self.model_path) else: model_name = f'yolo11{self.model_size}.pt' self.model = YOLO(model_name) dummy = np.zeros((640, 640, 3), dtype=np.uint8) self.model(dummy, device=self.device, verbose=False) print(f"成功加载YOLO11检测模型 (device={self.device})") except ImportError: print("未安装ultralytics,请运行: pip install ultralytics") self._load_opencv_model() except Exception as e: print(f"加载YOLO11模型失败: {e}") self._load_opencv_model() def _load_opencv_model(self): """使用OpenCV加载模型""" pass def _ensure_save_dir(self): """确保保存目录存在""" try: self._image_save_dir.mkdir(parents=True, exist_ok=True) logger.info(f"检测图片保存目录: {self._image_save_dir}") except Exception as e: logger.error(f"创建检测图片目录失败: {e}") self._save_image_enabled = False def _cleanup_old_images(self): """清理旧图片,保持目录下图片数量不超过上限""" try: image_files = list(self._image_save_dir.glob("*.jpg")) if len(image_files) > self._image_max_count: # 按修改时间排序,删除最旧的 image_files.sort(key=lambda x: x.stat().st_mtime) to_delete = image_files[:len(image_files) - self._image_max_count] for f in to_delete: f.unlink() logger.info(f"已清理 {len(to_delete)} 张旧检测图片") except Exception as e: logger.error(f"清理旧图片失败: {e}") def _save_detection_image(self, frame: np.ndarray, detections: List[DetectedObject]): """ 保存带有检测标记的图片(只标记达到置信度阈值的人) Args: frame: 原始图像 detections: 检测结果列表 """ if not self._save_image_enabled or not detections: return # 检查保存间隔 current_time = time.time() if current_time - self._last_save_time < self._save_interval: return try: # 复制图像避免修改原图 marked_frame = frame.copy() # 置信度阈值(人员检测用更高阈值) person_threshold = self.config.get('person_threshold', 0.8) # 只标记达到阈值的人 person_count = 0 for det in detections: # 只处理人且达到阈值 is_person = det.class_name in ['person'] if not is_person: continue # 未达阈值的不标记 if det.confidence < person_threshold: continue x, y, w, h = det.bbox # 绘制边界框(绿色) cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2) # 绘制序号标签 label = f"person_{person_count}" person_count += 1 (label_w, label_h), baseline = cv2.getTextSize( label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2 ) cv2.rectangle( marked_frame, (x, y - label_h - 8), (x + label_w, y), (0, 255, 0), -1 ) # 绘制标签文字(黑色) cv2.putText( marked_frame, label, (x, y - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2 ) # 无有效目标则不保存 if person_count == 0: return # 生成文件名(时间戳+有效人数) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"panorama_{timestamp}_n{person_count}.jpg" filepath = self._image_save_dir / filename # 保存图片 cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90]) self._last_save_time = current_time logger.info(f"[全景] 已保存检测图片: {filepath},有效人数 {person_count} (阈值={person_threshold})") # 定期清理旧图片 self._cleanup_old_images() except Exception as e: logger.error(f"[全景] 保存检测图片失败: {e}") def _letterbox(self, image, size=(640, 640)): """Letterbox 预处理""" h0, w0 = image.shape[:2] ih, iw = size scale = min(iw / w0, ih / h0) new_w, new_h = int(w0 * scale), int(h0 * scale) pad_w = (iw - new_w) // 2 pad_h = (ih - new_h) // 2 resized = cv2.resize(image, (new_w, new_h)) canvas = np.full((ih, iw, 3), 114, dtype=np.uint8) canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized return canvas, scale, pad_w, pad_h, h0, w0 def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]: """使用 RKNN/ONNX 模型检测""" results = [] try: canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame) if hasattr(self, 'rknn'): # RKNN img = canvas[..., ::-1].astype(np.float32) / 255.0 blob = img[None, ...] outputs = self.rknn.inference(inputs=[blob]) else: # ONNX img = canvas[..., ::-1].astype(np.float32) / 255.0 img = img.transpose(2, 0, 1) blob = img[None, ...] outputs = self.session.run([self.output_name], {self.input_name: blob}) output = outputs[0] if len(output.shape) == 3: output = output[0] num_boxes = output.shape[1] conf_threshold = self.config['confidence_threshold'] for i in range(num_boxes): x_center = float(output[0, i]) y_center = float(output[1, i]) width = float(output[2, i]) height = float(output[3, i]) class_probs = output[4:, i] best_class = int(np.argmax(class_probs)) confidence = float(class_probs[best_class]) if confidence < conf_threshold: continue # 转换到原始图像坐标 x1 = int(((x_center - width / 2) - pad_w) / scale) y1 = int(((y_center - height / 2) - pad_h) / scale) x2 = int(((x_center + width / 2) - pad_w) / scale) y2 = int(((y_center + height / 2) - pad_h) / scale) x1 = max(0, min(w0, x1)) y1 = max(0, min(h0, y1)) x2 = max(0, min(w0, x2)) y2 = max(0, min(h0, y2)) if x2 - x1 < 10 or y2 - y1 < 10: continue # 使用配置的类别映射获取类别名称 class_map = self.config.get('class_map', {0: 'hat',3: 'person',4: 'reflective'}) cls_name = class_map.get(best_class, str(best_class)) # 检查是否为目标类别 if cls_name not in self.config['target_classes']: continue obj = DetectedObject( class_name=cls_name, confidence=confidence, bbox=(x1, y1, x2 - x1, y2 - y1), center=((x1 + x2) // 2, (y1 + y2) // 2) ) results.append(obj) except Exception as e: logger.error(f"RKNN/ONNX 检测错误: {e}") return results def detect(self, frame: np.ndarray) -> List[DetectedObject]: """检测物体返回所有类别结果""" if frame is None: return [] if hasattr(self, 'rknn') and self.rknn is not None: results = self._detect_rknn(frame) if results: self._log_detections("RKNN", results, frame) self._save_detection_image(frame, results) return results elif hasattr(self, 'session') and self.session is not None: results = self._detect_rknn(frame) if results: self._log_detections("ONNX", results, frame) self._save_detection_image(frame, results) return results elif self.model is not None: results = self._detect_yolo(frame) if results: self._log_detections("YOLO", results, frame) self._save_detection_image(frame, results) return results else: logger.error("[YOLO] 没有可用的检测模型") return [] def _log_detections(self, model_type: str, results: List[DetectedObject], frame: np.ndarray): if not results: return class_counts = {} for r in results: class_counts[r.class_name] = class_counts.get(r.class_name, 0) + 1 h, w = frame.shape[:2] logger.info(f"[YOLO] {model_type}: {len(results)}个目标 {class_counts} (帧尺寸={w}x{h})") def _detect_yolo(self, frame: np.ndarray) -> List[DetectedObject]: """使用 YOLO 模型检测""" results = [] try: detections = self.model( frame, device=self.device, verbose=False, conf=self.config['confidence_threshold'] ) for det in detections: boxes = det.boxes if boxes is None: continue for i in range(len(boxes)): cls_id = int(boxes.cls[i]) cls_name = det.names[cls_id] if cls_name not in self.config['target_classes']: continue conf = float(boxes.conf[i]) xyxy = boxes.xyxy[i].cpu().numpy() x1, y1, x2, y2 = map(int, xyxy) width = x2 - x1 height = y2 - y1 if width < 10 or height < 10: continue center_x = x1 + width // 2 center_y = y1 + height // 2 obj = DetectedObject( class_name=cls_name, confidence=conf, bbox=(x1, y1, width, height), center=(center_x, center_y) ) results.append(obj) except Exception as e: logger.error(f"YOLO11检测错误: {e}") return results def detect_with_keypoints(self, frame: np.ndarray) -> List[DetectedObject]: """ 使用YOLO11-pose检测人体并返回关键点 Args: frame: 输入图像 Returns: 带关键点的检测结果列表 """ return self.detect(frame) def detect_persons(self, frame: np.ndarray) -> List[DetectedObject]: """检测人体(支持中英文类别名)""" all_detections = self.detect(frame) person_classes = {'person', '人'} return [obj for obj in all_detections if obj.class_name in person_classes] def release(self): """释放模型资源""" if hasattr(self, 'rknn') and self.rknn: self.rknn.release() self.rknn = None self.model = None self.session = None