| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717 |
- """
- 全景摄像头模块
- 负责获取视频流和物体检测
- """
- import cv2
- import numpy as np
- import threading
- import queue
- import time
- import os
- from typing import Optional, List, Tuple, Dict, Any
- from dataclasses import dataclass
- from config import PANORAMA_CAMERA, DETECTION_CONFIG
- from dahua_sdk import DahuaSDK, PTZCommand
- @dataclass
- class DetectedObject:
- """检测到的物体"""
- class_name: str # 类别名称
- confidence: float # 置信度
- bbox: Tuple[int, int, int, int] # 边界框 (x, y, width, height)
- center: Tuple[int, int] # 中心点坐标
- track_id: Optional[int] = None # 跟踪ID
- class PanoramaCamera:
- """全景摄像头类"""
-
- def __init__(self, sdk: DahuaSDK, camera_config: Dict = None):
- """
- 初始化全景摄像头
- Args:
- sdk: 大华SDK实例
- camera_config: 摄像头配置
- """
- self.sdk = sdk
- self.config = camera_config or PANORAMA_CAMERA
-
- self.login_handle = None
- self.play_handle = None
- self.connected = False
-
- # 视频流
- self.frame_queue = queue.Queue(maxsize=10)
- self.current_frame = None
- self.frame_lock = threading.Lock()
- self.rtsp_cap = None # RTSP视频捕获
-
- # 检测器
- self.detector = None
-
- # 控制标志
- self.running = False
- self.stream_thread = None
-
- # 断线重连
- self.auto_reconnect = True
- self.reconnect_interval = 5.0 # 重连间隔(秒)
- self.max_reconnect_attempts = 3 # 最大重连次数
-
- def connect(self) -> bool:
- """
- 连接摄像头
- Returns:
- 是否成功
- """
- login_handle, error = self.sdk.login(
- self.config['ip'],
- self.config['port'],
- self.config['username'],
- self.config['password']
- )
-
- if login_handle is None:
- print(f"连接全景摄像头失败: IP={self.config['ip']}, 错误码={error}")
- return False
-
- self.login_handle = login_handle
- self.connected = True
- print(f"成功连接全景摄像头: {self.config['ip']}")
- return True
-
- def disconnect(self):
- """断开连接"""
- self.stop_stream()
- if self.login_handle:
- self.sdk.logout(self.login_handle)
- self.login_handle = None
- self.connected = False
-
- def start_stream(self) -> bool:
- """
- 开始视频流
- Returns:
- 是否成功
- """
- if not self.connected:
- return False
-
- self.play_handle = self.sdk.real_play(
- self.login_handle,
- self.config['channel']
- )
-
- if self.play_handle is None:
- print("启动视频流失败")
- return False
-
- self.running = True
- self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True)
- self.stream_thread.start()
-
- print("视频流已启动")
- return True
-
- def start_stream_rtsp(self, rtsp_url: str = None) -> bool:
- """
- 通过RTSP协议获取视频流
- Args:
- rtsp_url: RTSP地址,格式: rtsp://user:pass@ip:port/channel
- Returns:
- 是否成功
- """
- if rtsp_url is None:
- # 构建RTSP地址
- rtsp_url = f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
-
- try:
- self.rtsp_cap = cv2.VideoCapture(rtsp_url)
- if not self.rtsp_cap.isOpened():
- print(f"无法打开RTSP流: {rtsp_url}")
- return False
-
- self.running = True
- self.stream_thread = threading.Thread(target=self._rtsp_stream_worker, daemon=True)
- self.stream_thread.start()
- print(f"RTSP视频流已启动: {rtsp_url}")
- return True
- except Exception as e:
- print(f"RTSP流启动失败: {e}")
- return False
-
- def _stream_worker(self):
- """视频流工作线程 (SDK模式)"""
- retry_count = 0
- max_retries = 10
-
- while self.running:
- try:
- # 尝试从 SDK 帧缓冲区获取帧 (如果可用)
- frame_buffer = self.sdk.get_video_frame_buffer(self.config['channel'])
-
- if frame_buffer:
- frame_info = frame_buffer.get(timeout=0.1)
- if frame_info and frame_info.get('data'):
- # 解码帧数据 (如果需要)
- # 注意: SDK回调返回的是编码数据,需要解码
- # 这里暂时跳过,因为解码需要额外处理
- pass
-
- # RTSP 模式获取帧 (推荐方式)
- if self.rtsp_cap is not None and self.rtsp_cap.isOpened():
- ret, frame = self.rtsp_cap.read()
- if ret and frame is not None:
- with self.frame_lock:
- self.current_frame = frame.copy()
-
- try:
- self.frame_queue.put(frame.copy(), block=False)
- except queue.Full:
- pass
-
- retry_count = 0 # 重置重试计数
- time.sleep(0.001) # 减少CPU占用
- continue
-
- # 如果 RTSP 不可用,尝试自动连接
- if retry_count < max_retries:
- rtsp_url = self._build_rtsp_url()
- try:
- if self.rtsp_cap is None:
- self.rtsp_cap = cv2.VideoCapture(rtsp_url)
- self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 减少缓冲延迟
-
- if self.rtsp_cap.isOpened():
- retry_count = 0
- continue
- except Exception as e:
- pass
-
- retry_count += 1
- time.sleep(1.0) # 重试间隔
- else:
- # 超过最大重试次数,使用模拟帧
- frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
-
- with self.frame_lock:
- self.current_frame = frame
-
- try:
- self.frame_queue.put(frame, block=False)
- except queue.Full:
- pass
-
- time.sleep(0.1)
-
- except Exception as e:
- print(f"视频流错误: {e}")
- time.sleep(0.1)
-
- def _build_rtsp_url(self) -> str:
- """构建 RTSP URL"""
- return f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/h264/ch{self.config['channel']}/main/av_stream"
-
- def _rtsp_stream_worker(self):
- """RTSP视频流工作线程"""
- while self.running:
- try:
- if self.rtsp_cap is None or not self.rtsp_cap.isOpened():
- time.sleep(0.1)
- continue
-
- ret, frame = self.rtsp_cap.read()
- if not ret or frame is None:
- time.sleep(0.01)
- continue
-
- with self.frame_lock:
- self.current_frame = frame.copy()
-
- try:
- self.frame_queue.put(frame, block=False)
- except queue.Full:
- pass
-
- except Exception as e:
- print(f"RTSP视频流错误: {e}")
- time.sleep(0.1)
-
- def stop_stream(self):
- """停止视频流"""
- self.running = False
- if self.stream_thread:
- self.stream_thread.join(timeout=2)
- if self.play_handle:
- self.sdk.stop_real_play(self.play_handle)
- self.play_handle = None
- if self.rtsp_cap:
- self.rtsp_cap.release()
- self.rtsp_cap = None
-
- def get_frame(self) -> Optional[np.ndarray]:
- """
- 获取当前帧
- Returns:
- 当前帧图像
- """
- with self.frame_lock:
- return self.current_frame.copy() if self.current_frame is not None else None
-
- def get_frame_from_queue(self, timeout: float = 0.1) -> Optional[np.ndarray]:
- """
- 从帧队列获取帧 (用于批量处理)
- Args:
- timeout: 等待超时时间
- Returns:
- 帧图像或None
- """
- try:
- return self.frame_queue.get(timeout=timeout)
- except:
- return None
-
- def get_frame_buffer(self, count: int = 5) -> List[np.ndarray]:
- """
- 获取帧缓冲 (用于运动检测等需要多帧的场景)
- Args:
- count: 获取帧数
- Returns:
- 帧列表
- """
- frames = []
- while len(frames) < count:
- frame = self.get_frame_from_queue(timeout=0.05)
- if frame is not None:
- frames.append(frame)
- else:
- break
- return frames
-
- def set_detector(self, detector):
- """设置物体检测器"""
- self.detector = detector
-
- def detect_objects(self, frame: np.ndarray = None) -> List[DetectedObject]:
- """
- 检测物体
- Args:
- frame: 输入帧,如果为None则使用当前帧
- Returns:
- 检测到的物体列表
- """
- if frame is None:
- frame = self.get_frame()
-
- if frame is None or self.detector is None:
- return []
-
- return self.detector.detect(frame)
-
- def get_detection_position(self, obj: DetectedObject,
- frame_size: Tuple[int, int]) -> Tuple[float, float]:
- """
- 获取检测物体在画面中的相对位置
- Args:
- obj: 检测到的物体
- frame_size: 画面尺寸 (width, height)
- Returns:
- 相对位置 (x_ratio, y_ratio) 范围0-1
- """
- width, height = frame_size
- x_ratio = obj.center[0] / width
- y_ratio = obj.center[1] / height
- return (x_ratio, y_ratio)
- class ObjectDetector:
- """
- 物体检测器
- 使用YOLO11模型进行人体检测
- 支持 YOLO (.pt), RKNN (.rknn), ONNX (.onnx) 模型
- """
-
- def __init__(self, model_path: str = None, use_gpu: bool = True, model_size: str = 'n',
- model_type: str = 'auto'):
- """
- 初始化检测器
- Args:
- model_path: 模型路径 (支持 .pt, .rknn, .onnx)
- use_gpu: 是否使用GPU
- model_size: 模型尺寸 ('n', 's', 'm', 'l', 'x') - 仅 YOLO 模型有效
- model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
- """
- self.model = None
- self.rknn_detector = None
- self.model_path = model_path
- self.use_gpu = use_gpu
- self.model_size = model_size
- self.model_type = model_type
- self.config = DETECTION_CONFIG
- self.device = 'cuda:0' if use_gpu else 'cpu'
-
- # 根据扩展名自动判断模型类型
- if model_path:
- ext = os.path.splitext(model_path)[1].lower()
- if ext == '.rknn':
- self.model_type = 'rknn'
- elif ext == '.onnx':
- self.model_type = 'onnx'
- elif ext == '.pt':
- self.model_type = 'yolo'
-
- self._load_model()
-
- def _load_model(self):
- """加载检测模型"""
- if self.model_type == 'rknn':
- self._load_rknn_model()
- elif self.model_type == 'onnx':
- self._load_onnx_model()
- else:
- self._load_yolo_model()
-
- def _load_rknn_model(self):
- """加载 RKNN 模型"""
- if not self.model_path:
- raise ValueError("RKNN 模型需要指定 model_path")
-
- try:
- from rknnlite.api import RKNNLite
-
- self.rknn = RKNNLite()
- ret = self.rknn.load_rknn(self.model_path)
- if ret != 0:
- raise RuntimeError(f"加载 RKNN 模型失败: {self.model_path}")
-
- ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
- if ret != 0:
- raise RuntimeError(f"初始化 RKNN 运行时失败")
-
- print(f"RKNN 模型加载成功: {self.model_path}")
- except ImportError:
- raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2")
-
- def _load_onnx_model(self):
- """加载 ONNX 模型"""
- if not self.model_path:
- raise ValueError("ONNX 模型需要指定 model_path")
-
- try:
- import onnxruntime as ort
- self.session = ort.InferenceSession(self.model_path)
- self.input_name = self.session.get_inputs()[0].name
- self.output_name = self.session.get_outputs()[0].name
- print(f"ONNX 模型加载成功: {self.model_path}")
- except ImportError:
- raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime")
-
- def _load_yolo_model(self):
- """加载YOLO11检测模型"""
- try:
- from ultralytics import YOLO
-
- if self.model_path:
- self.model = YOLO(self.model_path)
- else:
- model_name = f'yolo11{self.model_size}.pt'
- self.model = YOLO(model_name)
-
- dummy = np.zeros((640, 640, 3), dtype=np.uint8)
- self.model(dummy, device=self.device, verbose=False)
-
- print(f"成功加载YOLO11检测模型 (device={self.device})")
- except ImportError:
- print("未安装ultralytics,请运行: pip install ultralytics")
- self._load_opencv_model()
- except Exception as e:
- print(f"加载YOLO11模型失败: {e}")
- self._load_opencv_model()
-
- def _load_opencv_model(self):
- """使用OpenCV加载模型"""
- pass
-
- def _letterbox(self, image, size=(640, 640)):
- """Letterbox 预处理"""
- h0, w0 = image.shape[:2]
- ih, iw = size
- scale = min(iw / w0, ih / h0)
- new_w, new_h = int(w0 * scale), int(h0 * scale)
- pad_w = (iw - new_w) // 2
- pad_h = (ih - new_h) // 2
- resized = cv2.resize(image, (new_w, new_h))
- canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
- canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
- return canvas, scale, pad_w, pad_h, h0, w0
-
- def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]:
- """使用 RKNN/ONNX 模型检测"""
- results = []
-
- try:
- canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame)
-
- if hasattr(self, 'rknn'):
- # RKNN
- img = canvas[..., ::-1].astype(np.float32) / 255.0
- blob = img[None, ...]
- outputs = self.rknn.inference(inputs=[blob])
- else:
- # ONNX
- img = canvas[..., ::-1].astype(np.float32) / 255.0
- img = img.transpose(2, 0, 1)
- blob = img[None, ...]
- outputs = self.session.run([self.output_name], {self.input_name: blob})
-
- output = outputs[0]
- if len(output.shape) == 3:
- output = output[0]
-
- num_boxes = output.shape[1]
- conf_threshold = self.config['confidence_threshold']
-
- for i in range(num_boxes):
- x_center = float(output[0, i])
- y_center = float(output[1, i])
- width = float(output[2, i])
- height = float(output[3, i])
-
- class_probs = output[4:, i]
- best_class = int(np.argmax(class_probs))
- confidence = float(class_probs[best_class])
-
- if confidence < conf_threshold:
- continue
-
- # 转换到原始图像坐标
- x1 = int(((x_center - width / 2) - pad_w) / scale)
- y1 = int(((y_center - height / 2) - pad_h) / scale)
- x2 = int(((x_center + width / 2) - pad_w) / scale)
- y2 = int(((y_center + height / 2) - pad_h) / scale)
-
- x1 = max(0, min(w0, x1))
- y1 = max(0, min(h0, y1))
- x2 = max(0, min(w0, x2))
- y2 = max(0, min(h0, y2))
-
- if x2 - x1 < 10 or y2 - y1 < 10:
- continue
-
- # 只检测 person (class 0 in YOLO person model, 但 RKNN safety 模型是 class 3)
- # 对于通用检测,尝试获取类别名称
- cls_name = str(best_class)
- if best_class == 0:
- cls_name = 'person'
- elif best_class == 3:
- cls_name = 'person' # 安全模型中的人
-
- obj = DetectedObject(
- class_name=cls_name,
- confidence=confidence,
- bbox=(x1, y1, x2 - x1, y2 - y1),
- center=((x1 + x2) // 2, (y1 + y2) // 2)
- )
- results.append(obj)
-
- except Exception as e:
- print(f"RKNN/ONNX 检测错误: {e}")
-
- return results
-
- def detect(self, frame: np.ndarray) -> List[DetectedObject]:
- """
- 使用YOLO11检测物体
- Args:
- frame: 输入图像
- Returns:
- 检测结果列表
- """
- if frame is None:
- return []
-
- if self.rknn_detector is not None or hasattr(self, 'rknn') or hasattr(self, 'session'):
- return self._detect_rknn(frame)
- else:
- return self._detect_yolo(frame)
-
- def _detect_yolo(self, frame: np.ndarray) -> List[DetectedObject]:
- """使用 YOLO 模型检测"""
- results = []
-
- try:
- detections = self.model(
- frame,
- device=self.device,
- verbose=False,
- conf=self.config['confidence_threshold']
- )
-
- for det in detections:
- boxes = det.boxes
- if boxes is None:
- continue
-
- for i in range(len(boxes)):
- cls_id = int(boxes.cls[i])
- cls_name = det.names[cls_id]
-
- if cls_name not in self.config['target_classes']:
- continue
-
- conf = float(boxes.conf[i])
-
- xyxy = boxes.xyxy[i].cpu().numpy()
- x1, y1, x2, y2 = map(int, xyxy)
- width = x2 - x1
- height = y2 - y1
-
- if width < 10 or height < 10:
- continue
-
- center_x = x1 + width // 2
- center_y = y1 + height // 2
-
- obj = DetectedObject(
- class_name=cls_name,
- confidence=conf,
- bbox=(x1, y1, width, height),
- center=(center_x, center_y)
- )
- results.append(obj)
-
- except Exception as e:
- print(f"YOLO11检测错误: {e}")
-
- return results
-
- def detect_with_keypoints(self, frame: np.ndarray) -> List[DetectedObject]:
- """
- 使用YOLO11-pose检测人体并返回关键点
- Args:
- frame: 输入图像
- Returns:
- 带关键点的检测结果列表
- """
- return self.detect(frame)
-
- def detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
- """
- 检测人体
- Args:
- frame: 输入图像
- Returns:
- 检测到的人体列表
- """
- results = self.detect(frame)
- return [obj for obj in results if obj.class_name == 'person']
-
- def release(self):
- """释放模型资源"""
- if hasattr(self, 'rknn') and self.rknn:
- self.rknn.release()
- self.rknn = None
- self.model = None
- self.session = None
- class PersonTracker:
- """
- 人体跟踪器
- 使用简单的质心跟踪算法
- """
-
- def __init__(self, max_disappeared: int = 30):
- """
- 初始化跟踪器
- Args:
- max_disappeared: 最大消失帧数
- """
- self.max_disappeared = max_disappeared
- self.next_id = 0
- self.objects = {} # id -> center
- self.disappeared = {} # id -> disappeared count
-
- def update(self, detections: List[DetectedObject]) -> List[DetectedObject]:
- """
- 更新跟踪状态
- Args:
- detections: 当前帧检测结果
- Returns:
- 带有跟踪ID的检测结果
- """
- # 如果没有检测结果
- if len(detections) == 0:
- # 标记所有已跟踪对象为消失
- for obj_id in list(self.disappeared.keys()):
- self.disappeared[obj_id] += 1
- if self.disappeared[obj_id] > self.max_disappeared:
- self._deregister(obj_id)
- return []
-
- # 计算当前检测中心点
- input_centers = np.array([d.center for d in detections])
-
- # 如果没有已跟踪对象
- if len(self.objects) == 0:
- for det in detections:
- self._register(det)
- else:
- # 计算距离矩阵
- object_ids = list(self.objects.keys())
- object_centers = np.array([self.objects[obj_id] for obj_id in object_ids])
-
- # 计算欧氏距离
- distances = np.linalg.norm(
- object_centers[:, np.newaxis] - input_centers,
- axis=2
- )
-
- # 匈牙利算法匹配 (简化版: 贪心匹配)
- rows = distances.min(axis=1).argsort()
- cols = distances.argmin(axis=1)[rows]
-
- used_rows = set()
- used_cols = set()
-
- for (row, col) in zip(rows, cols):
- if row in used_rows or col in used_cols:
- continue
-
- obj_id = object_ids[row]
- self.objects[obj_id] = input_centers[col]
- self.disappeared[obj_id] = 0
- detections[col].track_id = obj_id
-
- used_rows.add(row)
- used_cols.add(col)
-
- # 处理未匹配的已跟踪对象
- unused_rows = set(range(len(object_ids))) - used_rows
- for row in unused_rows:
- obj_id = object_ids[row]
- self.disappeared[obj_id] += 1
- if self.disappeared[obj_id] > self.max_disappeared:
- self._deregister(obj_id)
-
- # 处理未匹配的新检测
- unused_cols = set(range(len(input_centers))) - used_cols
- for col in unused_cols:
- self._register(detections[col])
-
- return [d for d in detections if d.track_id is not None]
-
- def _register(self, detection: DetectedObject):
- """注册新对象"""
- detection.track_id = self.next_id
- self.objects[self.next_id] = detection.center
- self.disappeared[self.next_id] = 0
- self.next_id += 1
-
- def _deregister(self, obj_id: int):
- """注销对象"""
- del self.objects[obj_id]
- del self.disappeared[obj_id]
|