| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824 |
- """
- 施工现场安全行为检测模块
- 使用 YOLO11 模型检测人员、安全帽、反光衣
- 判断是否存在违规行为(未戴安全帽、未穿反光衣)
- 支持两种模型格式:
- - YOLO (.pt/.onnx): 使用 ultralytics 库
- - RKNN (.rknn): 使用 rknnlite 库 (RK3588 平台)
- """
- import cv2
- import numpy as np
- from typing import Optional, List, Tuple, Dict, Any
- from dataclasses import dataclass
- from enum import Enum
- import os
- # ============================================
- # RKNN 模型支持
- # ============================================
- @dataclass
- class Detection:
- """检测结果 (用于 RKNN 模型)"""
- class_id: int
- class_name: str
- confidence: float
- bbox: Tuple[int, int, int, int]
- def nms(dets, iou_threshold=0.45):
- """非极大值抑制"""
- if len(dets) == 0:
- return []
-
- boxes = np.array([[d.bbox[0], d.bbox[1], d.bbox[2], d.bbox[3], d.confidence] for d in dets])
- x1 = boxes[:, 0]
- y1 = boxes[:, 1]
- x2 = boxes[:, 2]
- y2 = boxes[:, 3]
- scores = boxes[:, 4]
-
- areas = (x2 - x1 + 1) * (y2 - y1 + 1)
- order = scores.argsort()[::-1]
-
- keep = []
- while order.size > 0:
- i = order[0]
- keep.append(i)
-
- xx1 = np.maximum(x1[i], x1[order[1:]])
- yy1 = np.maximum(y1[i], y1[order[1:]])
- xx2 = np.minimum(x2[i], x2[order[1:]])
- yy2 = np.minimum(y2[i], y2[order[1:]])
-
- w = np.maximum(0.0, xx2 - xx1 + 1)
- h = np.maximum(0.0, yy2 - yy1 + 1)
- inter = w * h
- ovr = inter / (areas[i] + areas[order[1:]] - inter)
-
- inds = np.where(ovr <= iou_threshold)[0]
- order = order[inds + 1]
-
- return [dets[i] for i in keep]
- class BaseDetector:
- """检测器基类 (用于 RKNN/ONNX 模型)"""
-
- # 类别映射: 0: 安全帽, 3: 人, 4: 反光衣
- LABEL_MAP = {0: '安全帽', 4: '安全衣', 3: '人'}
-
- def __init__(self):
- self.input_size = (640, 640)
- self.num_classes = 5
-
- def letterbox(self, image):
- """Letterbox 预处理,保持宽高比"""
- h0, w0 = image.shape[:2]
- ih, iw = self.input_size
- scale = min(iw / w0, ih / h0)
- new_w, new_h = int(w0 * scale), int(h0 * scale)
- pad_w = (iw - new_w) // 2
- pad_h = (ih - new_h) // 2
- resized = cv2.resize(image, (new_w, new_h))
- canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
- canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
- return canvas, scale, pad_w, pad_h, h0, w0
-
- def postprocess(self, outputs, scale, pad_w, pad_h, h0, w0, conf_threshold_map):
- """后处理"""
- dets = []
-
- if not outputs:
- return dets
-
- output = outputs[0]
-
- if len(output.shape) == 3:
- output = output[0]
-
- num_boxes = output.shape[1]
-
- for i in range(num_boxes):
- x_center = float(output[0, i])
- y_center = float(output[1, i])
- width = float(output[2, i])
- height = float(output[3, i])
-
- class_probs = output[4:4+self.num_classes, i]
- best_class = int(np.argmax(class_probs))
- confidence = float(class_probs[best_class])
-
- if best_class not in self.LABEL_MAP:
- continue
-
- conf_threshold = conf_threshold_map.get(best_class, 0.5)
-
- if confidence < conf_threshold:
- continue
-
- # 移除 padding 并缩放到原始图像尺寸
- x1 = int(((x_center - width / 2) - pad_w) / scale)
- y1 = int(((y_center - height / 2) - pad_h) / scale)
- x2 = int(((x_center + width / 2) - pad_w) / scale)
- y2 = int(((y_center + height / 2) - pad_h) / scale)
-
- x1 = max(0, min(w0, x1))
- y1 = max(0, min(h0, y1))
- x2 = max(0, min(w0, x2))
- y2 = max(0, min(h0, y2))
-
- det = Detection(
- class_id=best_class,
- class_name=self.LABEL_MAP[best_class],
- confidence=confidence,
- bbox=(x1, y1, x2, y2)
- )
- dets.append(det)
-
- dets = nms(dets, iou_threshold=0.45)
- return dets
-
- def detect(self, image, conf_threshold_map):
- raise NotImplementedError
-
- def release(self):
- pass
- class RKNNDetector(BaseDetector):
- """RKNN 检测器 - 使用 NHWC 输入格式 (1, H, W, C)"""
-
- def __init__(self, model_path: str):
- super().__init__()
- self.model_path = model_path
- self.rknn = None
-
- try:
- from rknnlite.api import RKNNLite
- self.rknn = RKNNLite()
- except ImportError:
- raise ImportError("未安装 rknnlite,请运行: pip install rknnlite2 或参考 testrk3588/setup_rknn.sh")
-
- ret = self.rknn.load_rknn(model_path)
- if ret != 0:
- raise RuntimeError(f"加载 RKNN 模型失败: {model_path}")
-
- ret = self.rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2)
- if ret != 0:
- raise RuntimeError(f"初始化 RKNN 运行时失败")
-
- print(f"RKNN 模型加载成功: {model_path}")
-
- def detect(self, image, conf_threshold_map):
- canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image)
- # RKNN 期望 NHWC (1, H, W, C), RGB, 归一化 0-1
- img = canvas[..., ::-1].astype(np.float32) / 255.0
- blob = img[None, ...] # (1, 640, 640, 3)
- outs = self.rknn.inference(inputs=[blob])
- return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map)
-
- def release(self):
- if self.rknn:
- self.rknn.release()
- self.rknn = None
- class ONNXDetector(BaseDetector):
- """ONNX 检测器 - 使用 NCHW 输入格式 (1, C, H, W)"""
-
- def __init__(self, model_path: str):
- super().__init__()
- self.model_path = model_path
-
- try:
- import onnxruntime as ort
- self.session = ort.InferenceSession(model_path)
- self.input_name = self.session.get_inputs()[0].name
- self.output_name = self.session.get_outputs()[0].name
- print(f"ONNX 模型加载成功: {model_path}")
- except ImportError:
- raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime")
- except Exception as e:
- raise RuntimeError(f"加载 ONNX 模型失败: {e}")
-
- def detect(self, image, conf_threshold_map):
- canvas, scale, pad_w, pad_h, h0, w0 = self.letterbox(image)
- # ONNX 期望 NCHW (1, C, H, W), RGB, 归一化 0-1
- img = canvas[..., ::-1].astype(np.float32) / 255.0
- img = img.transpose(2, 0, 1)
- blob = img[None, ...] # (1, 3, 640, 640)
- outs = self.session.run([self.output_name], {self.input_name: blob})
- return self.postprocess(outs, scale, pad_w, pad_h, h0, w0, conf_threshold_map)
-
- def release(self):
- self.session = None
- def create_detector(model_path: str):
- """
- 创建检测器工厂函数
-
- Args:
- model_path: 模型路径 (.rknn, .onnx, .pt)
-
- Returns:
- 检测器实例
- """
- ext = os.path.splitext(model_path)[1].lower()
-
- if ext == '.rknn':
- print(f"使用 RKNN 模型: {model_path}")
- return RKNNDetector(model_path)
- elif ext == '.onnx':
- print(f"使用 ONNX 模型: {model_path}")
- return ONNXDetector(model_path)
- elif ext == '.pt':
- print(f"使用 YOLO 模型: {model_path}")
- return None # YOLO 使用原来的 SafetyDetector
- else:
- raise ValueError(f"不支持的模型格式: {ext}")
- # ============================================
- # 原有 YOLO 安全检测器
- # ============================================
- class SafetyViolationType(Enum):
- """安全违规类型"""
- NO_HELMET = "未戴安全帽" # 未戴安全帽
- NO_SAFETY_VEST = "未穿反光衣" # 未穿反光衣
- NO_BOTH = "反光衣和安全帽都没戴" # 都没有
- @dataclass
- class SafetyDetection:
- """安全检测结果"""
- # 基础信息
- class_id: int # 类别ID
- class_name: str # 类别名称
- confidence: float # 置信度
- bbox: Tuple[int, int, int, int] # 边界框 (x1, y1, x2, y2)
- center: Tuple[int, int] # 中心点坐标
- track_id: Optional[int] = None # 跟踪ID
- @dataclass
- class PersonSafetyStatus:
- """人员安全状态"""
- track_id: int # 跟踪ID
- person_bbox: Tuple[int, int, int, int] # 人体边界框
- person_conf: float # 人体置信度
- has_helmet: bool = False # 是否戴安全帽
- helmet_conf: float = 0.0 # 安全帽置信度
- has_safety_vest: bool = False # 是否穿反光衣
- vest_conf: float = 0.0 # 反光衣置信度
- is_violation: bool = False # 是否违规
- violation_types: List[SafetyViolationType] = None # 违规类型列表
-
- def __post_init__(self):
- if self.violation_types is None:
- self.violation_types = []
-
- def check_violation(self) -> bool:
- """检查是否违规"""
- self.violation_types = []
-
- if not self.has_helmet and not self.has_safety_vest:
- self.violation_types.append(SafetyViolationType.NO_BOTH)
- elif not self.has_helmet:
- self.violation_types.append(SafetyViolationType.NO_HELMET)
- elif not self.has_safety_vest:
- self.violation_types.append(SafetyViolationType.NO_SAFETY_VEST)
-
- self.is_violation = len(self.violation_types) > 0
- return self.is_violation
-
- def get_violation_desc(self) -> str:
- """获取违规描述"""
- if not self.is_violation:
- return ""
-
- if SafetyViolationType.NO_BOTH in self.violation_types:
- return "反光衣和安全帽都没戴"
- elif SafetyViolationType.NO_HELMET in self.violation_types:
- return "未戴安全帽"
- elif SafetyViolationType.NO_SAFETY_VEST in self.violation_types:
- return "未穿反光衣"
- return ""
- class SafetyDetector:
- """
- 施工现场安全检测器
- 使用 YOLO11 检测人员、安全帽、反光衣
- """
-
- CLASS_MAP = {
- 0: '安全帽',
- 3: '人',
- 4: '反光衣'
- }
-
- CLASS_ID_MAP = {
- 'helmet': 0,
- 'person': 3,
- 'safety_vest': 4
- }
-
- def __init__(self, model_path: str = None, use_gpu: bool = True,
- conf_threshold: float = 0.5, person_threshold: float = 0.8,
- model_type: str = 'auto'):
- """
- 初始化安全检测器
-
- Args:
- model_path: 模型路径,默认使用 yolo11m_safety.pt 或 .rknn
- use_gpu: 是否使用 GPU (仅 YOLO 模型有效)
- conf_threshold: 一般物品置信度阈值 (安全帽、反光衣)
- person_threshold: 人员检测置信度阈值
- model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
- """
- self.model = None
- self.rknn_detector = None
- self.model_type = model_type
-
- # 根据扩展名自动判断模型类型
- if model_path:
- ext = os.path.splitext(model_path)[1].lower()
- if ext == '.rknn':
- self.model_type = 'rknn'
- elif ext == '.onnx':
- self.model_type = 'onnx'
- elif ext == '.pt':
- self.model_type = 'yolo'
-
- self.model_path = model_path
- self.use_gpu = use_gpu
- self.device = 'cuda:0' if use_gpu else 'cpu'
-
- self.conf_threshold = conf_threshold
- self.person_threshold = person_threshold
-
- self._load_model()
-
- def _load_model(self):
- """加载检测模型"""
- if self.model_type == 'rknn':
- self._load_rknn_model()
- elif self.model_type == 'onnx':
- self._load_onnx_model()
- else:
- self._load_yolo_model()
-
- def _load_rknn_model(self):
- """加载 RKNN 模型"""
- if not self.model_path:
- raise ValueError("RKNN 模型需要指定 model_path")
-
- try:
- self.rknn_detector = RKNNDetector(self.model_path)
- print(f"RKNN 安全检测模型加载成功: {self.model_path}")
- except ImportError as e:
- raise ImportError(f"rknnlite 未安装: {e}")
- except Exception as e:
- raise RuntimeError(f"加载 RKNN 模型失败: {e}")
-
- def _load_onnx_model(self):
- """加载 ONNX 模型"""
- if not self.model_path:
- raise ValueError("ONNX 模型需要指定 model_path")
-
- try:
- self.rknn_detector = ONNXDetector(self.model_path)
- print(f"ONNX 安全检测模型加载成功: {self.model_path}")
- except ImportError as e:
- raise ImportError(f"onnxruntime 未安装: {e}")
- except Exception as e:
- raise RuntimeError(f"加载 ONNX 模型失败: {e}")
-
- def _load_yolo_model(self):
- """加载 YOLO11 安全检测模型"""
- try:
- from ultralytics import YOLO
-
- if not self.model_path:
- self.model_path = '/home/wen/dsh/yolo/yolo11m_safety.pt'
-
- self.model = YOLO(self.model_path)
-
- dummy = np.zeros((640, 640, 3), dtype=np.uint8)
- self.model(dummy, device=self.device, verbose=False)
-
- print(f"YOLO 安全检测模型加载成功: {self.model_path} (device={self.device})")
- except ImportError:
- raise ImportError("未安装 ultralytics,请运行: pip install ultralytics")
- except Exception as e:
- raise RuntimeError(f"加载 YOLO 模型失败: {e}")
-
- def detect(self, frame: np.ndarray) -> List[SafetyDetection]:
- """
- 检测画面中的安全相关对象
-
- Args:
- frame: 输入图像
-
- Returns:
- 检测结果列表
- """
- if frame is None:
- return []
-
- if self.rknn_detector is not None:
- return self._detect_rknn(frame)
- else:
- return self._detect_yolo(frame)
-
- def _detect_rknn(self, frame: np.ndarray) -> List[SafetyDetection]:
- """使用 RKNN/ONNX 模型检测"""
- results = []
-
- try:
- conf_threshold_map = {
- 3: self.person_threshold,
- 0: self.conf_threshold,
- 4: self.conf_threshold
- }
-
- detections = self.rknn_detector.detect(frame, conf_threshold_map)
-
- for det in detections:
- x1, y1, x2, y2 = det.bbox
- center_x = (x1 + x2) // 2
- center_y = (y1 + y2) // 2
-
- safety_det = SafetyDetection(
- class_id=det.class_id,
- class_name=det.class_name,
- confidence=det.confidence,
- bbox=det.bbox,
- center=(center_x, center_y)
- )
- results.append(safety_det)
-
- except Exception as e:
- print(f"RKNN 检测错误: {e}")
-
- return results
-
- def _detect_yolo(self, frame: np.ndarray) -> List[SafetyDetection]:
- """使用 YOLO 模型检测"""
- results = []
-
- try:
- detections = self.model(frame, device=self.device, verbose=False)
-
- for det in detections:
- boxes = det.boxes
- if boxes is None:
- continue
-
- for i in range(len(boxes)):
- cls_id = int(boxes.cls[i])
-
- if cls_id not in self.CLASS_MAP:
- continue
-
- cls_name = self.CLASS_MAP[cls_id]
- conf = float(boxes.conf[i])
-
- threshold = self.person_threshold if cls_id == 3 else self.conf_threshold
- if conf < threshold:
- continue
-
- xyxy = boxes.xyxy[i].cpu().numpy()
- x1, y1, x2, y2 = map(int, xyxy)
-
- width = x2 - x1
- height = y2 - y1
- if width < 10 or height < 10:
- continue
-
- center_x = (x1 + x2) // 2
- center_y = (y1 + y2) // 2
-
- detection = SafetyDetection(
- class_id=cls_id,
- class_name=cls_name,
- confidence=conf,
- bbox=(x1, y1, x2, y2),
- center=(center_x, center_y)
- )
- results.append(detection)
-
- except Exception as e:
- print(f"YOLO 检测错误: {e}")
-
- return results
-
- def release(self):
- """释放模型资源"""
- if self.rknn_detector:
- self.rknn_detector.release()
- self.rknn_detector = None
- self.model = None
-
- def check_safety(self, frame: np.ndarray,
- detections: List[SafetyDetection] = None) -> List[PersonSafetyStatus]:
- """
- 检查人员安全状态
-
- Args:
- frame: 输入图像
- detections: 检测结果,如果为 None 则自动检测
-
- Returns:
- 人员安全状态列表
- """
- if detections is None:
- detections = self.detect(frame)
-
- # 分类检测结果
- persons = []
- helmets = []
- vests = []
-
- for det in detections:
- if det.class_id == 3: # 人
- persons.append(det)
- elif det.class_id == 0: # 安全帽
- helmets.append(det)
- elif det.class_id == 4: # 反光衣
- vests.append(det)
-
- # 检查每个人员的安全状态
- results = []
-
- for person in persons:
- status = PersonSafetyStatus(
- track_id=person.track_id or 0,
- person_bbox=person.bbox,
- person_conf=person.confidence
- )
-
- px1, py1, px2, py2 = person.bbox
-
- # 检查是否戴安全帽
- # 安全帽应该在人体上方区域(头部附近)
- for helmet in helmets:
- hx1, hy1, hx2, hy2 = helmet.bbox
- # 检查安全帽是否在人体框内
- helmet_center_x = (hx1 + hx2) / 2
- helmet_center_y = (hy1 + hy2) / 2
-
- # 安全帽中心在人体框内,且在人体上半部分
- if (hx1 >= px1 and hx2 <= px2 and
- helmet_center_y >= py1 and
- helmet_center_y <= py1 + (py2 - py1) * 0.5):
- status.has_helmet = True
- status.helmet_conf = helmet.confidence
- break
-
- # 检查是否穿反光衣
- # 反光衣应该与人体有重叠
- for vest in vests:
- vx1, vy1, vx2, vy2 = vest.bbox
-
- # 计算重叠区域
- overlap_x1 = max(px1, vx1)
- overlap_y1 = max(py1, vy1)
- overlap_x2 = min(px2, vx2)
- overlap_y2 = min(py2, vy2)
-
- # 如果有重叠
- if overlap_x1 < overlap_x2 and overlap_y1 < overlap_y2:
- # 计算重叠面积占比
- overlap_area = (overlap_x2 - overlap_x1) * (overlap_y2 - overlap_y1)
- vest_area = (vx2 - vx1) * (vy2 - vy1)
- overlap_ratio = overlap_area / vest_area if vest_area > 0 else 0
-
- # 重叠比例超过30%认为穿了反光衣
- if overlap_ratio > 0.3:
- status.has_safety_vest = True
- status.vest_conf = vest.confidence
- break
-
- # 检查是否违规
- status.check_violation()
- results.append(status)
-
- return results
-
- # 轨迹追踪已禁用 - detect_with_tracking 方法已移除
- def draw_safety_result(frame: np.ndarray,
- detections: List[SafetyDetection],
- status_list: List[PersonSafetyStatus]) -> np.ndarray:
- """
- 在图像上绘制安全检测结果
-
- Args:
- frame: 输入图像
- detections: 检测结果
- status_list: 人员安全状态
-
- Returns:
- 绘制后的图像
- """
- result = frame.copy()
-
- # 绘制检测框
- for det in detections:
- x1, y1, x2, y2 = det.bbox
-
- # 根据类别选择颜色
- if det.class_id == 3: # 人
- color = (0, 255, 0) # 绿色
- elif det.class_id == 0: # 安全帽
- color = (255, 165, 0) # 橙色
- elif det.class_id == 4: # 反光衣
- color = (0, 165, 255) # 黄色
- else:
- color = (255, 255, 255)
-
- cv2.rectangle(result, (x1, y1), (x2, y2), color, 2)
-
- # 绘制标签
- label = f"{det.class_name}: {det.conf:.2f}"
- cv2.putText(result, label, (x1, y1 - 5),
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
-
- # 绘制安全状态
- for status in status_list:
- x1, y1, x2, y2 = status.person_bbox
-
- if status.is_violation:
- # 违规 - 红色警告
- color = (0, 0, 255)
- text = status.get_violation_desc()
- cv2.rectangle(result, (x1, y1), (x2, y2), color, 3)
- cv2.putText(result, text, (x1, y2 + 20),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- else:
- # 正常 - 显示安全标识
- color = (0, 255, 0)
- text = "安全装备齐全"
- cv2.putText(result, text, (x1, y2 + 20),
- cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
-
- return result
- class LLMSafetyDetector:
- """
- 基于大模型的安全检测器
- 结合 YOLO 检测和大模型判断
- """
-
- def __init__(self, yolo_model_path: str = None,
- llm_config: Dict[str, Any] = None,
- use_gpu: bool = True,
- use_llm: bool = True,
- model_type: str = 'auto'):
- """
- 初始化检测器
-
- Args:
- yolo_model_path: 模型路径 (.pt, .rknn, .onnx)
- llm_config: 大模型配置
- use_gpu: 是否使用 GPU (仅 YOLO 模型有效)
- use_llm: 是否使用大模型判断
- model_type: 模型类型 ('auto', 'yolo', 'rknn', 'onnx')
- """
- # 安全检测器 (支持 YOLO/RKNN/ONNX)
- self.yolo_detector = SafetyDetector(
- model_path=yolo_model_path,
- use_gpu=use_gpu,
- model_type=model_type
- )
-
- # 大模型分析器
- self.use_llm = use_llm
- self.llm_analyzer = None
-
- if use_llm:
- try:
- from llm_service import SafetyAnalyzer, NumberRecognizer
- self.llm_analyzer = SafetyAnalyzer(llm_config)
- self.number_recognizer = NumberRecognizer(llm_config)
- print("大模型安全分析器初始化成功")
- except ImportError:
- print("未找到 llm_service 模块,将使用规则判断")
- self.use_llm = False
- except Exception as e:
- print(f"大模型初始化失败: {e},将使用规则判断")
- self.use_llm = False
-
- def detect(self, frame: np.ndarray) -> List[SafetyDetection]:
- """
- YOLO 检测
-
- Args:
- frame: 输入图像
-
- Returns:
- 检测结果列表
- """
- return self.yolo_detector.detect(frame)
-
- def check_safety(self, frame: np.ndarray,
- detections: List[SafetyDetection] = None,
- use_llm: bool = None) -> List[PersonSafetyStatus]:
- """
- 检查人员安全状态
-
- Args:
- frame: 输入图像
- detections: YOLO 检测结果
- use_llm: 是否使用大模型(覆盖默认设置)
-
- Returns:
- 人员安全状态列表
- """
- # 先用 YOLO 检测
- if detections is None:
- detections = self.yolo_detector.detect(frame)
-
- # 规则判断
- rule_status_list = self.yolo_detector.check_safety(frame, detections)
-
- # 如果不使用大模型,直接返回规则判断结果
- should_use_llm = use_llm if use_llm is not None else self.use_llm
- if not should_use_llm or self.llm_analyzer is None:
- return rule_status_list
-
- # 使用大模型对每个人员进行判断
- llm_status_list = []
-
- for status in rule_status_list:
- # 裁剪人员区域
- x1, y1, x2, y2 = status.person_bbox
- margin = 10
- x1 = max(0, x1 - margin)
- y1 = max(0, y1 - margin)
- x2 = min(frame.shape[1], x2 + margin)
- y2 = min(frame.shape[0], y2 + margin)
-
- person_image = frame[y1:y2, x1:x2]
-
- # 调用大模型分析
- try:
- llm_result = self.llm_analyzer.check_person_safety(person_image)
-
- # 更新状态
- if llm_result.get('success', False):
- status.has_helmet = llm_result.get('has_helmet', False)
- status.has_safety_vest = llm_result.get('has_vest', False)
-
- # 重新检查违规
- status.check_violation()
-
- # 如果大模型判断有违规,使用大模型的描述
- if status.is_violation and llm_result.get('violation_desc'):
- # 更新违规类型
- desc = llm_result.get('violation_desc', '')
- if '安全帽' in desc and '反光' in desc:
- status.violation_types = [SafetyViolationType.NO_BOTH]
- elif '安全帽' in desc:
- status.violation_types = [SafetyViolationType.NO_HELMET]
- elif '反光' in desc:
- status.violation_types = [SafetyViolationType.NO_SAFETY_VEST]
- except Exception as e:
- print(f"大模型分析失败: {e}")
-
- llm_status_list.append(status)
-
- return llm_status_list
-
- def recognize_number(self, frame: np.ndarray,
- person_bbox: Tuple[int, int, int, int]) -> Dict[str, Any]:
- """
- 识别人员编号
-
- Args:
- frame: 输入图像
- person_bbox: 人员边界框
-
- Returns:
- 编号识别结果
- """
- if self.number_recognizer is None:
- return {'number': None, 'success': False}
-
- # 裁剪人员区域
- x1, y1, x2, y2 = person_bbox
- person_image = frame[y1:y2, x1:x2]
-
- return self.number_recognizer.recognize_person_number(person_image)
-
- # 轨迹追踪已禁用 - detect_with_tracking 方法已移除
|