|
|
@@ -2,6 +2,10 @@
|
|
|
施工现场安全行为检测模块
|
|
|
使用 YOLO11 模型检测人员、安全帽、反光衣
|
|
|
判断是否存在违规行为(未戴安全帽、未穿反光衣)
|
|
|
+
|
|
|
+支持两种模型格式:
|
|
|
+- YOLO (.pt/.onnx): 使用 ultralytics 库
|
|
|
+- RKNN (.rknn): 使用 rknnlite 库 (RK3588 平台)
|
|
|
"""
|
|
|
|
|
|
import cv2
|
|
|
@@ -9,6 +13,239 @@ import numpy as np
|
|
|
from typing import Optional, List, Tuple, Dict, Any
|
|
|
from dataclasses import dataclass
|
|
|
from enum import Enum
|
|
|
+import os
|
|
|
+
|
|
|
+
|
|
|
+# ============================================
|
|
|
+# RKNN 模型支持
|
|
|
+# ============================================
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class Detection:
|
|
|
+ """检测结果 (用于 RKNN 模型)"""
|
|
|
+ class_id: int
|
|
|
+ class_name: str
|
|
|
+ confidence: float
|
|
|
+ bbox: Tuple[int, int, int, int]
|
|
|
+
|
|
|
+
|
|
|
+def nms(dets, iou_threshold=0.45):
|
|
|
+ """非极大值抑制"""
|
|
|
+ if len(dets) == 0:
|
|
|
+ return []
|
|
|
+
|
|
|
+ boxes = np.array([[d.bbox[0], d.bbox[1], d.bbox[2], d.bbox[3], d.confidence] for d in dets])
|
|
|
+ x1 = boxes[:, 0]
|
|
|
+ y1 = boxes[:, 1]
|
|
|
+ x2 = boxes[:, 2]
|
|
|
+ y2 = boxes[:, 3]
|
|
|
+ scores = boxes[:, 4]
|
|
|
+
|
|
|
+ areas = (x2 - x1 + 1) * (y2 - y1 + 1)
|
|
|
+ order = scores.argsort()[::-1]
|
|
|
+
|
|
|
+ keep = []
|
|
|
+ while order.size > 0:
|
|
|
+ i = order[0]
|
|
|
+ keep.append(i)
|
|
|
+
|
|
|
+ xx1 = np.maximum(x1[i], x1[order[1:]])
|
|
|
+ yy1 = np.maximum(y1[i], y1[order[1:]])
|
|
|
+ xx2 = np.minimum(x2[i], x2[order[1:]])
|
|
|
+ yy2 = np.minimum(y2[i], y2[order[1:]])
|
|
|
+
|
|
|
+ w = np.maximum(0.0, xx2 - xx1 + 1)
|
|
|
+ h = np.maximum(0.0, yy2 - yy1 + 1)
|
|
|
+ inter = w * h
|
|
|
+ ovr = inter / (areas[i] + areas[order[1:]] - inter)
|
|
|
+
|
|
|
+ inds = np.where(ovr <= iou_threshold)[0]
|
|
|
+ order = order[inds + 1]
|
|
|
+
|
|
|
+ return [dets[i] for i in keep]
|
|
|
+
|
|
|
+
|
|
|
+class BaseDetector:
|
|
|
+ """检测器基类 (用于 RKNN/ONNX 模型)"""
|
|
|
+
|
|
|
+ # 类别映射: 0: 安全帽, 3: 人, 4: 反光衣
|
|
|
+ LABEL_MAP = {0: '安全帽', 4: '安全衣', 3: '人'}
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.input_size = (640, 640)
|
|
|
+ self.num_classes = 5
|
|
|
+
|
|
|
+ def letterbox(self, image):
|
|
|
+ """Letterbox 预处理,保持宽高比"""
|
|
|
+ h0, w0 = image.shape[:2]
|
|
|
+ ih, iw = self.input_size
|
|
|
+ scale = min(iw / w0, ih / h0)
|
|
|
+ new_w, new_h = int(w0 * scale), int(h0 * scale)
|
|
|
+ pad_w = (iw - new_w) // 2
|
|
|
+ pad_h = (ih - new_h) // 2
|
|
|
+ resized = cv2.resize(image, (new_w, new_h))
|
|
|
+ canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
|
|
|
+ canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
|
|
|
+ return canvas, scale, pad_w, pad_h, h0, w0
|
|
|
+
|
|
|
+ def postprocess(self, outputs, scale, pad_w, pad_h, h0, w0, conf_threshold_map):
|
|
|
+ """后处理"""
|
|
|
+ dets = []
|
|
|
+
|
|
|
+ if not outputs:
|
|
|
+ return dets
|
|
|
+
|
|
|
+ output = outputs[0]
|
|
|
+
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
+
|
|
|
+ num_boxes = output.shape[1]
|
|
|
+
|
|
|
+ for i in range(num_boxes):
|
|
|
+ x_center = float(output[0, i])
|
|
|
+ y_center = float(output[1, i])
|
|
|
+ width = float(output[2, i])
|
|
|
+ height = float(output[3, i])
|
|
|
+
|
|
|
+ class_probs = output[4:4+self.num_classes, i]
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
+ confidence = float(class_probs[best_class])
|
|
|
+
|
|
|
+ if best_class not in self.LABEL_MAP:
|
|
|
+ continue
|
|
|
+
|
|
|
+ conf_threshold = conf_threshold_map.get(best_class, 0.5)
|
|
|
+
|
|
|
+ if confidence < conf_threshold:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 移除 padding 并缩放到原始图像尺寸
|
|
|
+ x1 = int(((x_center - width / 2) - pad_w) / scale)
|
|
|
+ y1 = int(((y_center - height / 2) - pad_h) / scale)
|
|
|
+ x2 = int(((x_center + width / 2) - pad_w) / scale)
|
|
|
+ y2 = int(((y_center + height / 2) - pad_h) / scale)
|
|
|
+
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
+
|
|
|
+ det = Detection(
|
|
|
+ class_id=best_class,
|
|
|
+ class_name=self.LABEL_MAP[best_class],
|
|
|
+ confidence=confidence,
|
|
|
+ bbox=(x1, y1, x2, y2)
|
|
|
+ )
|
|
|
+ dets.append(det)
|
|
|
+
|
|
|
+ dets = nms(dets, iou_threshold=0.45)
|
|
|
+ return dets
|
|
|
+
|
|
|
+ def detect(self, image, conf_threshold_map):
|
|
|
+ raise NotImplementedError
|
|
|
+
|
|
|
+ def release(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+class RKNNDetector(BaseDetector):
|
|
|
+ """RKNN 检测器 - 使用 NHWC 输入格式 (1, H, W, C)"""
|
|
|
+
|
|
|
+ def __init__(self, model_path: str):
|
|
|
+ super().__init__()
|
|
|
+ self.model_path = model_path
|
|
|
+ self.rknn = None
|
|
|
+
|
|
|
+ try:
|
|
|
+ from rknnlite.api import RKNNLite
|
|
|
+ self.rknn = RKNNLite()
|
|
|
+ except ImportError:
|
|
|
+ raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2 或参考 testrk3588/setup_rknn.sh")
|
|
|
+
|
|
|
+ ret = self.rknn.load_rknn(model_path)
|
|
|
+ if ret != 0:
|
|
|
+ raise RuntimeError(f"加载 RKNN 模型失败: {model_path}")
|
|
|
+
|
|
|
+ ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
|
|
|
+ if ret != 0:
|
|
|
+ raise RuntimeError(f"初始化 RKNN 运行时失败")
|
|
|
+
|
|
|
+ print(f"RKNN 模型加载成功: {model_path}")
|
|
|
+
|
|
|
+ def detect(self, image, conf_threshold_map):
|
|
|
+ canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image)
|
|
|
+ # RKNN 期望 NHWC (1, H, W, C), RGB, 归一化 0-1
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ blob = img[None, ...] # (1, 640, 640, 3)
|
|
|
+ outs = self.rknn.inference(inputs=[blob])
|
|
|
+ return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map)
|
|
|
+
|
|
|
+ def release(self):
|
|
|
+ if self.rknn:
|
|
|
+ self.rknn.release()
|
|
|
+ self.rknn = None
|
|
|
+
|
|
|
+
|
|
|
+class ONNXDetector(BaseDetector):
|
|
|
+ """ONNX 检测器 - 使用 NCHW 输入格式 (1, C, H, W)"""
|
|
|
+
|
|
|
+ def __init__(self, model_path: str):
|
|
|
+ super().__init__()
|
|
|
+ self.model_path = model_path
|
|
|
+
|
|
|
+ try:
|
|
|
+ import onnxruntime as ort
|
|
|
+ self.session = ort.InferenceSession(model_path)
|
|
|
+ self.input_name = self.session.get_inputs()[0].name
|
|
|
+ self.output_name = self.session.get_outputs()[0].name
|
|
|
+ print(f"ONNX 模型加载成功: {model_path}")
|
|
|
+ except ImportError:
|
|
|
+ raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime")
|
|
|
+ except Exception as e:
|
|
|
+ raise RuntimeError(f"加载 ONNX 模型失败: {e}")
|
|
|
+
|
|
|
+ def detect(self, image, conf_threshold_map):
|
|
|
+ canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image)
|
|
|
+ # ONNX 期望 NCHW (1, C, H, W), RGB, 归一化 0-1
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ img = img.transpose(2, 0, 1)
|
|
|
+ blob = img[None, ...] # (1, 3, 640, 640)
|
|
|
+ outs = self.session.run([self.output_name], {self.input_name: blob})
|
|
|
+ return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map)
|
|
|
+
|
|
|
+ def release(self):
|
|
|
+ self.session = None
|
|
|
+
|
|
|
+
|
|
|
+def create_detector(model_path: str):
|
|
|
+ """
|
|
|
+ 创建检测器工厂函数
|
|
|
+
|
|
|
+ Args:
|
|
|
+ model_path: 模型路径 (.rknn, .onnx, .pt)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ 检测器实例
|
|
|
+ """
|
|
|
+ ext = os.path.splitext(model_path)[1].lower()
|
|
|
+
|
|
|
+ if ext == '.rknn':
|
|
|
+ print(f"使用 RKNN 模型: {model_path}")
|
|
|
+ return RKNNDetector(model_path)
|
|
|
+ elif ext == '.onnx':
|
|
|
+ print(f"使用 ONNX 模型: {model_path}")
|
|
|
+ return ONNXDetector(model_path)
|
|
|
+ elif ext == '.pt':
|
|
|
+ print(f"使用 YOLO 模型: {model_path}")
|
|
|
+ return None # YOLO 使用原来的 SafetyDetector
|
|
|
+ else:
|
|
|
+ raise ValueError(f"不支持的模型格式: {ext}")
|
|
|
+
|
|
|
+
|
|
|
+# ============================================
|
|
|
+# 原有 YOLO 安全检测器
|
|
|
+# ============================================
|
|
|
|
|
|
|
|
|
class SafetyViolationType(Enum):
|
|
|
@@ -81,15 +318,12 @@ class SafetyDetector:
|
|
|
使用 YOLO11 检测人员、安全帽、反光衣
|
|
|
"""
|
|
|
|
|
|
- # 类别映射 (根据 yolo11m_safety.pt 模型的训练标签)
|
|
|
- # 0: 安全帽, 3: 人, 4: 安全衣/反光衣
|
|
|
CLASS_MAP = {
|
|
|
0: '安全帽',
|
|
|
3: '人',
|
|
|
4: '反光衣'
|
|
|
}
|
|
|
|
|
|
- # 反向映射
|
|
|
CLASS_ID_MAP = {
|
|
|
'helmet': 0,
|
|
|
'person': 3,
|
|
|
@@ -97,44 +331,94 @@ class SafetyDetector:
|
|
|
}
|
|
|
|
|
|
def __init__(self, model_path: str = None, use_gpu: bool = True,
|
|
|
- conf_threshold: float = 0.5, person_threshold: float = 0.8):
|
|
|
+ conf_threshold: float = 0.5, person_threshold: float = 0.8,
|
|
|
+ model_type: str = 'auto'):
|
|
|
"""
|
|
|
初始化安全检测器
|
|
|
|
|
|
Args:
|
|
|
- model_path: 模型路径,默认使用 yolo11m_safety.pt
|
|
|
- use_gpu: 是否使用 GPU
|
|
|
+ model_path: 模型路径,默认使用 yolo11m_safety.pt 或 .rknn
|
|
|
+ use_gpu: 是否使用 GPU (仅 YOLO 模型有效)
|
|
|
conf_threshold: 一般物品置信度阈值 (安全帽、反光衣)
|
|
|
person_threshold: 人员检测置信度阈值
|
|
|
+ model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
|
|
|
"""
|
|
|
self.model = None
|
|
|
- self.model_path = model_path or '/home/wen/dsh/yolo/yolo11m_safety.pt'
|
|
|
+ self.rknn_detector = None
|
|
|
+ self.model_type = model_type
|
|
|
+
|
|
|
+ # 根据扩展名自动判断模型类型
|
|
|
+ if model_path:
|
|
|
+ ext = os.path.splitext(model_path)[1].lower()
|
|
|
+ if ext == '.rknn':
|
|
|
+ self.model_type = 'rknn'
|
|
|
+ elif ext == '.onnx':
|
|
|
+ self.model_type = 'onnx'
|
|
|
+ elif ext == '.pt':
|
|
|
+ self.model_type = 'yolo'
|
|
|
+
|
|
|
+ self.model_path = model_path
|
|
|
self.use_gpu = use_gpu
|
|
|
self.device = 'cuda:0' if use_gpu else 'cpu'
|
|
|
|
|
|
- # 置信度阈值
|
|
|
self.conf_threshold = conf_threshold
|
|
|
self.person_threshold = person_threshold
|
|
|
|
|
|
- # 加载模型
|
|
|
self._load_model()
|
|
|
|
|
|
def _load_model(self):
|
|
|
+ """加载检测模型"""
|
|
|
+ if self.model_type == 'rknn':
|
|
|
+ self._load_rknn_model()
|
|
|
+ elif self.model_type == 'onnx':
|
|
|
+ self._load_onnx_model()
|
|
|
+ else:
|
|
|
+ self._load_yolo_model()
|
|
|
+
|
|
|
+ def _load_rknn_model(self):
|
|
|
+ """加载 RKNN 模型"""
|
|
|
+ if not self.model_path:
|
|
|
+ raise ValueError("RKNN 模型需要指定 model_path")
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.rknn_detector = RKNNDetector(self.model_path)
|
|
|
+ print(f"RKNN 安全检测模型加载成功: {self.model_path}")
|
|
|
+ except ImportError as e:
|
|
|
+ raise ImportError(f"rknnlite 未安装: {e}")
|
|
|
+ except Exception as e:
|
|
|
+ raise RuntimeError(f"加载 RKNN 模型失败: {e}")
|
|
|
+
|
|
|
+ def _load_onnx_model(self):
|
|
|
+ """加载 ONNX 模型"""
|
|
|
+ if not self.model_path:
|
|
|
+ raise ValueError("ONNX 模型需要指定 model_path")
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.rknn_detector = ONNXDetector(self.model_path)
|
|
|
+ print(f"ONNX 安全检测模型加载成功: {self.model_path}")
|
|
|
+ except ImportError as e:
|
|
|
+ raise ImportError(f"onnxruntime 未安装: {e}")
|
|
|
+ except Exception as e:
|
|
|
+ raise RuntimeError(f"加载 ONNX 模型失败: {e}")
|
|
|
+
|
|
|
+ def _load_yolo_model(self):
|
|
|
"""加载 YOLO11 安全检测模型"""
|
|
|
try:
|
|
|
from ultralytics import YOLO
|
|
|
|
|
|
+ if not self.model_path:
|
|
|
+ self.model_path = '/home/wen/dsh/yolo/yolo11m_safety.pt'
|
|
|
+
|
|
|
self.model = YOLO(self.model_path)
|
|
|
|
|
|
- # 预热模型
|
|
|
dummy = np.zeros((640, 640, 3), dtype=np.uint8)
|
|
|
self.model(dummy, device=self.device, verbose=False)
|
|
|
|
|
|
- print(f"安全检测模型加载成功: {self.model_path} (device={self.device})")
|
|
|
+ print(f"YOLO 安全检测模型加载成功: {self.model_path} (device={self.device})")
|
|
|
except ImportError:
|
|
|
raise ImportError("未安装 ultralytics,请运行: pip install ultralytics")
|
|
|
except Exception as e:
|
|
|
- raise RuntimeError(f"加载模型失败: {e}")
|
|
|
+ raise RuntimeError(f"加载 YOLO 模型失败: {e}")
|
|
|
|
|
|
def detect(self, frame: np.ndarray) -> List[SafetyDetection]:
|
|
|
"""
|
|
|
@@ -146,9 +430,48 @@ class SafetyDetector:
|
|
|
Returns:
|
|
|
检测结果列表
|
|
|
"""
|
|
|
- if self.model is None or frame is None:
|
|
|
+ if frame is None:
|
|
|
return []
|
|
|
|
|
|
+ if self.rknn_detector is not None:
|
|
|
+ return self._detect_rknn(frame)
|
|
|
+ else:
|
|
|
+ return self._detect_yolo(frame)
|
|
|
+
|
|
|
+ def _detect_rknn(self, frame: np.ndarray) -> List[SafetyDetection]:
|
|
|
+ """使用 RKNN/ONNX 模型检测"""
|
|
|
+ results = []
|
|
|
+
|
|
|
+ try:
|
|
|
+ conf_threshold_map = {
|
|
|
+ 3: self.person_threshold,
|
|
|
+ 0: self.conf_threshold,
|
|
|
+ 4: self.conf_threshold
|
|
|
+ }
|
|
|
+
|
|
|
+ detections = self.rknn_detector.detect(frame, conf_threshold_map)
|
|
|
+
|
|
|
+ for det in detections:
|
|
|
+ x1, y1, x2, y2 = det.bbox
|
|
|
+ center_x = (x1 + x2) // 2
|
|
|
+ center_y = (y1 + y2) // 2
|
|
|
+
|
|
|
+ safety_det = SafetyDetection(
|
|
|
+ class_id=det.class_id,
|
|
|
+ class_name=det.class_name,
|
|
|
+ confidence=det.confidence,
|
|
|
+ bbox=det.bbox,
|
|
|
+ center=(center_x, center_y)
|
|
|
+ )
|
|
|
+ results.append(safety_det)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"RKNN 检测错误: {e}")
|
|
|
+
|
|
|
+ return results
|
|
|
+
|
|
|
+ def _detect_yolo(self, frame: np.ndarray) -> List[SafetyDetection]:
|
|
|
+ """使用 YOLO 模型检测"""
|
|
|
results = []
|
|
|
|
|
|
try:
|
|
|
@@ -160,32 +483,26 @@ class SafetyDetector:
|
|
|
continue
|
|
|
|
|
|
for i in range(len(boxes)):
|
|
|
- # 获取类别
|
|
|
cls_id = int(boxes.cls[i])
|
|
|
|
|
|
- # 只处理我们关心的类别
|
|
|
if cls_id not in self.CLASS_MAP:
|
|
|
continue
|
|
|
|
|
|
cls_name = self.CLASS_MAP[cls_id]
|
|
|
conf = float(boxes.conf[i])
|
|
|
|
|
|
- # 根据类别设置不同的置信度阈值
|
|
|
threshold = self.person_threshold if cls_id == 3 else self.conf_threshold
|
|
|
if conf < threshold:
|
|
|
continue
|
|
|
|
|
|
- # 获取边界框
|
|
|
xyxy = boxes.xyxy[i].cpu().numpy()
|
|
|
x1, y1, x2, y2 = map(int, xyxy)
|
|
|
|
|
|
- # 过滤过小的检测框
|
|
|
width = x2 - x1
|
|
|
height = y2 - y1
|
|
|
if width < 10 or height < 10:
|
|
|
continue
|
|
|
|
|
|
- # 计算中心点
|
|
|
center_x = (x1 + x2) // 2
|
|
|
center_y = (y1 + y2) // 2
|
|
|
|
|
|
@@ -199,10 +516,17 @@ class SafetyDetector:
|
|
|
results.append(detection)
|
|
|
|
|
|
except Exception as e:
|
|
|
- print(f"检测错误: {e}")
|
|
|
+ print(f"YOLO 检测错误: {e}")
|
|
|
|
|
|
return results
|
|
|
|
|
|
+ def release(self):
|
|
|
+ """释放模型资源"""
|
|
|
+ if self.rknn_detector:
|
|
|
+ self.rknn_detector.release()
|
|
|
+ self.rknn_detector = None
|
|
|
+ self.model = None
|
|
|
+
|
|
|
def check_safety(self, frame: np.ndarray,
|
|
|
detections: List[SafetyDetection] = None) -> List[PersonSafetyStatus]:
|
|
|
"""
|
|
|
@@ -420,20 +744,23 @@ class LLMSafetyDetector:
|
|
|
def __init__(self, yolo_model_path: str = None,
|
|
|
llm_config: Dict[str, Any] = None,
|
|
|
use_gpu: bool = True,
|
|
|
- use_llm: bool = True):
|
|
|
+ use_llm: bool = True,
|
|
|
+ model_type: str = 'auto'):
|
|
|
"""
|
|
|
初始化检测器
|
|
|
|
|
|
Args:
|
|
|
- yolo_model_path: YOLO 模型路径
|
|
|
+ yolo_model_path: 模型路径 (.pt, .rknn, .onnx)
|
|
|
llm_config: 大模型配置
|
|
|
- use_gpu: 是否使用 GPU
|
|
|
+ use_gpu: 是否使用 GPU (仅 YOLO 模型有效)
|
|
|
use_llm: 是否使用大模型判断
|
|
|
+ model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
|
|
|
"""
|
|
|
- # YOLO 检测器
|
|
|
+ # 安全检测器 (支持 YOLO/RKNN/ONNX)
|
|
|
self.yolo_detector = SafetyDetector(
|
|
|
model_path=yolo_model_path,
|
|
|
- use_gpu=use_gpu
|
|
|
+ use_gpu=use_gpu,
|
|
|
+ model_type=model_type
|
|
|
)
|
|
|
|
|
|
# 大模型分析器
|