""" 施工现场安全行为检测模块 使用 YOLO11 模型检测人员、安全帽、反光衣 判断是否存在违规行为(未戴安全帽、未穿反光衣) 支持两种模型格式: - YOLO (.pt/.onnx): 使用 ultralytics 库 - RKNN (.rknn): 使用 rknnlite 库 (RK3588 平台) """ import cv2 import numpy as np from typing import Optional, List, Tuple, Dict, Any from dataclasses import dataclass from enum import Enum import os # ============================================ # RKNN 模型支持 # ============================================ @dataclass class Detection: """检测结果 (用于 RKNN 模型)""" class_id: int class_name: str confidence: float bbox: Tuple[int, int, int, int] def nms(dets, iou_threshold=0.45): """非极大值抑制""" if len(dets) == 0: return [] boxes = np.array([[d.bbox[0], d.bbox[1], d.bbox[2], d.bbox[3], d.confidence] for d in dets]) x1 = boxes[:, 0] y1 = boxes[:, 1] x2 = boxes[:, 2] y2 = boxes[:, 3] scores = boxes[:, 4] areas = (x2 - x1 + 1) * (y2 - y1 + 1) order = scores.argsort()[::-1] keep = [] while order.size > 0: i = order[0] keep.append(i) xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1 + 1) h = np.maximum(0.0, yy2 - yy1 + 1) inter = w * h ovr = inter / (areas[i] + areas[order[1:]] - inter) inds = np.where(ovr <= iou_threshold)[0] order = order[inds + 1] return [dets[i] for i in keep] class BaseDetector: """检测器基类 (用于 RKNN/ONNX 模型)""" # 类别映射: 0: 安全帽, 3: 人, 4: 反光衣 LABEL_MAP = {0: '安全帽', 4: '安全衣', 3: '人'} def __init__(self): self.input_size = (640, 640) self.num_classes = 5 def letterbox(self, image): """Letterbox 预处理,保持宽高比""" h0, w0 = image.shape[:2] ih, iw = self.input_size scale = min(iw / w0, ih / h0) new_w, new_h = int(w0 * scale), int(h0 * scale) pad_w = (iw - new_w) // 2 pad_h = (ih - new_h) // 2 resized = cv2.resize(image, (new_w, new_h)) canvas = np.full((ih, iw, 3), 114, dtype=np.uint8) canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized return canvas, scale, pad_w, pad_h, h0, w0 def postprocess(self, outputs, scale, pad_w, pad_h, h0, w0, conf_threshold_map): """后处理""" dets = [] if not outputs: return dets output = outputs[0] if len(output.shape) == 3: output = output[0] num_boxes = output.shape[1] for i in range(num_boxes): x_center = float(output[0, i]) y_center = float(output[1, i]) width = float(output[2, i]) height = float(output[3, i]) class_probs = output[4:4+self.num_classes, i] best_class = int(np.argmax(class_probs)) confidence = float(class_probs[best_class]) if best_class not in self.LABEL_MAP: continue conf_threshold = conf_threshold_map.get(best_class, 0.5) if confidence < conf_threshold: continue # 移除 padding 并缩放到原始图像尺寸 x1 = int(((x_center - width / 2) - pad_w) / scale) y1 = int(((y_center - height / 2) - pad_h) / scale) x2 = int(((x_center + width / 2) - pad_w) / scale) y2 = int(((y_center + height / 2) - pad_h) / scale) x1 = max(0, min(w0, x1)) y1 = max(0, min(h0, y1)) x2 = max(0, min(w0, x2)) y2 = max(0, min(h0, y2)) det = Detection( class_id=best_class, class_name=self.LABEL_MAP[best_class], confidence=confidence, bbox=(x1, y1, x2, y2) ) dets.append(det) dets = nms(dets, iou_threshold=0.45) return dets def detect(self, image, conf_threshold_map): raise NotImplementedError def release(self): pass class RKNNDetector(BaseDetector): """RKNN 检测器 - 使用 NHWC 输入格式 (1, H, W, C)""" def __init__(self, model_path: str): super().__init__() self.model_path = model_path self.rknn = None try: from rknnlite.api import RKNNLite self.rknn = RKNNLite() except ImportError: raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2 或参考 testrk3588/setup_rknn.sh") ret = self.rknn.load_rknn(model_path) if ret != 0: raise RuntimeError(f"加载 RKNN 模型失败: {model_path}") ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2) if ret != 0: raise RuntimeError(f"初始化 RKNN 运行时失败") print(f"RKNN 模型加载成功: {model_path}") def detect(self, image, conf_threshold_map): canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image) # RKNN 期望 NHWC (1, H, W, C), RGB, 归一化 0-1 img = canvas[..., ::-1].astype(np.float32) / 255.0 blob = img[None, ...] # (1, 640, 640, 3) outs = self.rknn.inference(inputs=[blob]) return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map) def release(self): if self.rknn: self.rknn.release() self.rknn = None class ONNXDetector(BaseDetector): """ONNX 检测器 - 使用 NCHW 输入格式 (1, C, H, W)""" def __init__(self, model_path: str): super().__init__() self.model_path = model_path try: import onnxruntime as ort self.session = ort.InferenceSession(model_path) self.input_name = self.session.get_inputs()[0].name self.output_name = self.session.get_outputs()[0].name print(f"ONNX 模型加载成功: {model_path}") except ImportError: raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime") except Exception as e: raise RuntimeError(f"加载 ONNX 模型失败: {e}") def detect(self, image, conf_threshold_map): canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image) # ONNX 期望 NCHW (1, C, H, W), RGB, 归一化 0-1 img = canvas[..., ::-1].astype(np.float32) / 255.0 img = img.transpose(2, 0, 1) blob = img[None, ...] # (1, 3, 640, 640) outs = self.session.run([self.output_name], {self.input_name: blob}) return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map) def release(self): self.session = None def create_detector(model_path: str): """ 创建检测器工厂函数 Args: model_path: 模型路径 (.rknn, .onnx, .pt) Returns: 检测器实例 """ ext = os.path.splitext(model_path)[1].lower() if ext == '.rknn': print(f"使用 RKNN 模型: {model_path}") return RKNNDetector(model_path) elif ext == '.onnx': print(f"使用 ONNX 模型: {model_path}") return ONNXDetector(model_path) elif ext == '.pt': print(f"使用 YOLO 模型: {model_path}") return None # YOLO 使用原来的 SafetyDetector else: raise ValueError(f"不支持的模型格式: {ext}") # ============================================ # 原有 YOLO 安全检测器 # ============================================ class SafetyViolationType(Enum): """安全违规类型""" NO_HELMET = "未戴安全帽" # 未戴安全帽 NO_SAFETY_VEST = "未穿反光衣" # 未穿反光衣 NO_BOTH = "反光衣和安全帽都没戴" # 都没有 @dataclass class SafetyDetection: """安全检测结果""" # 基础信息 class_id: int # 类别ID class_name: str # 类别名称 confidence: float # 置信度 bbox: Tuple[int, int, int, int] # 边界框 (x1, y1, x2, y2) center: Tuple[int, int] # 中心点坐标 track_id: Optional[int] = None # 跟踪ID @dataclass class PersonSafetyStatus: """人员安全状态""" track_id: int # 跟踪ID person_bbox: Tuple[int, int, int, int] # 人体边界框 person_conf: float # 人体置信度 has_helmet: bool = False # 是否戴安全帽 helmet_conf: float = 0.0 # 安全帽置信度 has_safety_vest: bool = False # 是否穿反光衣 vest_conf: float = 0.0 # 反光衣置信度 is_violation: bool = False # 是否违规 violation_types: List[SafetyViolationType] = None # 违规类型列表 def __post_init__(self): if self.violation_types is None: self.violation_types = [] def check_violation(self) -> bool: """检查是否违规""" self.violation_types = [] if not self.has_helmet and not self.has_safety_vest: self.violation_types.append(SafetyViolationType.NO_BOTH) elif not self.has_helmet: self.violation_types.append(SafetyViolationType.NO_HELMET) elif not self.has_safety_vest: self.violation_types.append(SafetyViolationType.NO_SAFETY_VEST) self.is_violation = len(self.violation_types) > 0 return self.is_violation def get_violation_desc(self) -> str: """获取违规描述""" if not self.is_violation: return "" if SafetyViolationType.NO_BOTH in self.violation_types: return "反光衣和安全帽都没戴" elif SafetyViolationType.NO_HELMET in self.violation_types: return "未戴安全帽" elif SafetyViolationType.NO_SAFETY_VEST in self.violation_types: return "未穿反光衣" return "" class SafetyDetector: """ 施工现场安全检测器 使用 YOLO11 检测人员、安全帽、反光衣 """ CLASS_MAP = { 0: '安全帽', 3: '人', 4: '反光衣' } CLASS_ID_MAP = { 'helmet': 0, 'person': 3, 'safety_vest': 4 } def __init__(self, model_path: str = None, use_gpu: bool = True, conf_threshold: float = 0.5, person_threshold: float = 0.8, model_type: str = 'auto'): """ 初始化安全检测器 Args: model_path: 模型路径,默认使用 yolo11m_safety.pt 或 .rknn use_gpu: 是否使用 GPU (仅 YOLO 模型有效) conf_threshold: 一般物品置信度阈值 (安全帽、反光衣) person_threshold: 人员检测置信度阈值 model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx') """ self.model = None self.rknn_detector = None self.model_type = model_type # 根据扩展名自动判断模型类型 if model_path: ext = os.path.splitext(model_path)[1].lower() if ext == '.rknn': self.model_type = 'rknn' elif ext == '.onnx': self.model_type = 'onnx' elif ext == '.pt': self.model_type = 'yolo' self.model_path = model_path self.use_gpu = use_gpu self.device = 'cuda:0' if use_gpu else 'cpu' self.conf_threshold = conf_threshold self.person_threshold = person_threshold self._load_model() def _load_model(self): """加载检测模型""" if self.model_type == 'rknn': self._load_rknn_model() elif self.model_type == 'onnx': self._load_onnx_model() else: self._load_yolo_model() def _load_rknn_model(self): """加载 RKNN 模型""" if not self.model_path: raise ValueError("RKNN 模型需要指定 model_path") try: self.rknn_detector = RKNNDetector(self.model_path) print(f"RKNN 安全检测模型加载成功: {self.model_path}") except ImportError as e: raise ImportError(f"rknnlite 未安装: {e}") except Exception as e: raise RuntimeError(f"加载 RKNN 模型失败: {e}") def _load_onnx_model(self): """加载 ONNX 模型""" if not self.model_path: raise ValueError("ONNX 模型需要指定 model_path") try: self.rknn_detector = ONNXDetector(self.model_path) print(f"ONNX 安全检测模型加载成功: {self.model_path}") except ImportError as e: raise ImportError(f"onnxruntime 未安装: {e}") except Exception as e: raise RuntimeError(f"加载 ONNX 模型失败: {e}") def _load_yolo_model(self): """加载 YOLO11 安全检测模型""" try: from ultralytics import YOLO if not self.model_path: self.model_path = '/home/wen/dsh/yolo/yolo11m_safety.pt' self.model = YOLO(self.model_path) dummy = np.zeros((640, 640, 3), dtype=np.uint8) self.model(dummy, device=self.device, verbose=False) print(f"YOLO 安全检测模型加载成功: {self.model_path} (device={self.device})") except ImportError: raise ImportError("未安装 ultralytics,请运行: pip install ultralytics") except Exception as e: raise RuntimeError(f"加载 YOLO 模型失败: {e}") def detect(self, frame: np.ndarray) -> List[SafetyDetection]: """ 检测画面中的安全相关对象 Args: frame: 输入图像 Returns: 检测结果列表 """ if frame is None: return [] if self.rknn_detector is not None: return self._detect_rknn(frame) else: return self._detect_yolo(frame) def _detect_rknn(self, frame: np.ndarray) -> List[SafetyDetection]: """使用 RKNN/ONNX 模型检测""" results = [] try: conf_threshold_map = { 3: self.person_threshold, 0: self.conf_threshold, 4: self.conf_threshold } detections = self.rknn_detector.detect(frame, conf_threshold_map) for det in detections: x1, y1, x2, y2 = det.bbox center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 safety_det = SafetyDetection( class_id=det.class_id, class_name=det.class_name, confidence=det.confidence, bbox=det.bbox, center=(center_x, center_y) ) results.append(safety_det) except Exception as e: print(f"RKNN 检测错误: {e}") return results def _detect_yolo(self, frame: np.ndarray) -> List[SafetyDetection]: """使用 YOLO 模型检测""" results = [] try: detections = self.model(frame, device=self.device, verbose=False) for det in detections: boxes = det.boxes if boxes is None: continue for i in range(len(boxes)): cls_id = int(boxes.cls[i]) if cls_id not in self.CLASS_MAP: continue cls_name = self.CLASS_MAP[cls_id] conf = float(boxes.conf[i]) threshold = self.person_threshold if cls_id == 3 else self.conf_threshold if conf < threshold: continue xyxy = boxes.xyxy[i].cpu().numpy() x1, y1, x2, y2 = map(int, xyxy) width = x2 - x1 height = y2 - y1 if width < 10 or height < 10: continue center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 detection = SafetyDetection( class_id=cls_id, class_name=cls_name, confidence=conf, bbox=(x1, y1, x2, y2), center=(center_x, center_y) ) results.append(detection) except Exception as e: print(f"YOLO 检测错误: {e}") return results def release(self): """释放模型资源""" if self.rknn_detector: self.rknn_detector.release() self.rknn_detector = None self.model = None def check_safety(self, frame: np.ndarray, detections: List[SafetyDetection] = None) -> List[PersonSafetyStatus]: """ 检查人员安全状态 Args: frame: 输入图像 detections: 检测结果,如果为 None 则自动检测 Returns: 人员安全状态列表 """ if detections is None: detections = self.detect(frame) # 分类检测结果 persons = [] helmets = [] vests = [] for det in detections: if det.class_id == 3: # 人 persons.append(det) elif det.class_id == 0: # 安全帽 helmets.append(det) elif det.class_id == 4: # 反光衣 vests.append(det) # 检查每个人员的安全状态 results = [] for person in persons: status = PersonSafetyStatus( track_id=person.track_id or 0, person_bbox=person.bbox, person_conf=person.confidence ) px1, py1, px2, py2 = person.bbox # 检查是否戴安全帽 # 安全帽应该在人体上方区域(头部附近) for helmet in helmets: hx1, hy1, hx2, hy2 = helmet.bbox # 检查安全帽是否在人体框内 helmet_center_x = (hx1 + hx2) / 2 helmet_center_y = (hy1 + hy2) / 2 # 安全帽中心在人体框内,且在人体上半部分 if (hx1 >= px1 and hx2 <= px2 and helmet_center_y >= py1 and helmet_center_y <= py1 + (py2 - py1) * 0.5): status.has_helmet = True status.helmet_conf = helmet.confidence break # 检查是否穿反光衣 # 反光衣应该与人体有重叠 for vest in vests: vx1, vy1, vx2, vy2 = vest.bbox # 计算重叠区域 overlap_x1 = max(px1, vx1) overlap_y1 = max(py1, vy1) overlap_x2 = min(px2, vx2) overlap_y2 = min(py2, vy2) # 如果有重叠 if overlap_x1 < overlap_x2 and overlap_y1 < overlap_y2: # 计算重叠面积占比 overlap_area = (overlap_x2 - overlap_x1) * (overlap_y2 - overlap_y1) vest_area = (vx2 - vx1) * (vy2 - vy1) overlap_ratio = overlap_area / vest_area if vest_area > 0 else 0 # 重叠比例超过30%认为穿了反光衣 if overlap_ratio > 0.3: status.has_safety_vest = True status.vest_conf = vest.confidence break # 检查是否违规 status.check_violation() results.append(status) return results # 轨迹追踪已禁用 - detect_with_tracking 方法已移除 def draw_safety_result(frame: np.ndarray, detections: List[SafetyDetection], status_list: List[PersonSafetyStatus]) -> np.ndarray: """ 在图像上绘制安全检测结果 Args: frame: 输入图像 detections: 检测结果 status_list: 人员安全状态 Returns: 绘制后的图像 """ result = frame.copy() # 绘制检测框 for det in detections: x1, y1, x2, y2 = det.bbox # 根据类别选择颜色 if det.class_id == 3: # 人 color = (0, 255, 0) # 绿色 elif det.class_id == 0: # 安全帽 color = (255, 165, 0) # 橙色 elif det.class_id == 4: # 反光衣 color = (0, 165, 255) # 黄色 else: color = (255, 255, 255) cv2.rectangle(result, (x1, y1), (x2, y2), color, 2) # 绘制标签 label = f"{det.class_name}: {det.conf:.2f}" cv2.putText(result, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) # 绘制安全状态 for status in status_list: x1, y1, x2, y2 = status.person_bbox if status.is_violation: # 违规 - 红色警告 color = (0, 0, 255) text = status.get_violation_desc() cv2.rectangle(result, (x1, y1), (x2, y2), color, 3) cv2.putText(result, text, (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) else: # 正常 - 显示安全标识 color = (0, 255, 0) text = "安全装备齐全" cv2.putText(result, text, (x1, y2 + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) return result class LLMSafetyDetector: """ 基于大模型的安全检测器 结合 YOLO 检测和大模型判断 """ def __init__(self, yolo_model_path: str = None, llm_config: Dict[str, Any] = None, use_gpu: bool = True, use_llm: bool = True, model_type: str = 'auto'): """ 初始化检测器 Args: yolo_model_path: 模型路径 (.pt, .rknn, .onnx) llm_config: 大模型配置 use_gpu: 是否使用 GPU (仅 YOLO 模型有效) use_llm: 是否使用大模型判断 model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx') """ # 安全检测器 (支持 YOLO/RKNN/ONNX) self.yolo_detector = SafetyDetector( model_path=yolo_model_path, use_gpu=use_gpu, model_type=model_type ) # 大模型分析器 self.use_llm = use_llm self.llm_analyzer = None if use_llm: try: from llm_service import SafetyAnalyzer, NumberRecognizer self.llm_analyzer = SafetyAnalyzer(llm_config) self.number_recognizer = NumberRecognizer(llm_config) print("大模型安全分析器初始化成功") except ImportError: print("未找到 llm_service 模块,将使用规则判断") self.use_llm = False except Exception as e: print(f"大模型初始化失败: {e},将使用规则判断") self.use_llm = False def detect(self, frame: np.ndarray) -> List[SafetyDetection]: """ YOLO 检测 Args: frame: 输入图像 Returns: 检测结果列表 """ return self.yolo_detector.detect(frame) def check_safety(self, frame: np.ndarray, detections: List[SafetyDetection] = None, use_llm: bool = None) -> List[PersonSafetyStatus]: """ 检查人员安全状态 Args: frame: 输入图像 detections: YOLO 检测结果 use_llm: 是否使用大模型(覆盖默认设置) Returns: 人员安全状态列表 """ # 先用 YOLO 检测 if detections is None: detections = self.yolo_detector.detect(frame) # 规则判断 rule_status_list = self.yolo_detector.check_safety(frame, detections) # 如果不使用大模型,直接返回规则判断结果 should_use_llm = use_llm if use_llm is not None else self.use_llm if not should_use_llm or self.llm_analyzer is None: return rule_status_list # 使用大模型对每个人员进行判断 llm_status_list = [] for status in rule_status_list: # 裁剪人员区域 x1, y1, x2, y2 = status.person_bbox margin = 10 x1 = max(0, x1 - margin) y1 = max(0, y1 - margin) x2 = min(frame.shape[1], x2 + margin) y2 = min(frame.shape[0], y2 + margin) person_image = frame[y1:y2, x1:x2] # 调用大模型分析 try: llm_result = self.llm_analyzer.check_person_safety(person_image) # 更新状态 if llm_result.get('success', False): status.has_helmet = llm_result.get('has_helmet', False) status.has_safety_vest = llm_result.get('has_vest', False) # 重新检查违规 status.check_violation() # 如果大模型判断有违规,使用大模型的描述 if status.is_violation and llm_result.get('violation_desc'): # 更新违规类型 desc = llm_result.get('violation_desc', '') if '安全帽' in desc and '反光' in desc: status.violation_types = [SafetyViolationType.NO_BOTH] elif '安全帽' in desc: status.violation_types = [SafetyViolationType.NO_HELMET] elif '反光' in desc: status.violation_types = [SafetyViolationType.NO_SAFETY_VEST] except Exception as e: print(f"大模型分析失败: {e}") llm_status_list.append(status) return llm_status_list def recognize_number(self, frame: np.ndarray, person_bbox: Tuple[int, int, int, int]) -> Dict[str, Any]: """ 识别人员编号 Args: frame: 输入图像 person_bbox: 人员边界框 Returns: 编号识别结果 """ if self.number_recognizer is None: return {'number': None, 'success': False} # 裁剪人员区域 x1, y1, x2, y2 = person_bbox person_image = frame[y1:y2, x1:x2] return self.number_recognizer.recognize_person_number(person_image) # 轨迹追踪已禁用 - detect_with_tracking 方法已移除