|
|
@@ -8,7 +8,7 @@ import numpy as np
|
|
|
from typing import List, Optional, Tuple, Dict
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
-from config import OCR_CONFIG
|
|
|
+from config import OCR_CONFIG, SEGMENTATION_CONFIG
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
@@ -33,7 +33,7 @@ class PersonInfo:
|
|
|
|
|
|
class PersonSegmenter:
|
|
|
"""
|
|
|
- 人体分割器
|
|
|
+ 人体分割器 - 使用 RKNN YOLOv8 分割模型
|
|
|
将人体从背景中分割出来
|
|
|
"""
|
|
|
|
|
|
@@ -41,22 +41,127 @@ class PersonSegmenter:
|
|
|
"""
|
|
|
初始化分割器
|
|
|
Args:
|
|
|
- use_gpu: 是否使用GPU
|
|
|
+ use_gpu: 是否使用GPU (RKNN使用NPU,此参数保留用于兼容)
|
|
|
"""
|
|
|
self.use_gpu = use_gpu
|
|
|
- self.segmentor = None
|
|
|
+ self.config = SEGMENTATION_CONFIG
|
|
|
+ self.input_size = self.config.get('input_size', (640, 640))
|
|
|
+ self.conf_threshold = self.config.get('conf_threshold', 0.5)
|
|
|
+ self.rknn = None
|
|
|
self._load_model()
|
|
|
|
|
|
def _load_model(self):
    """Load the RKNN segmentation model and initialise the NPU runtime.

    The model path is read from ``self.config['model_path']`` (with a
    built-in fallback path). On success ``self.rknn`` holds a ready-to-use
    ``RKNNLite`` instance. On any failure (``rknnlite`` not importable,
    ``load_rknn`` / ``init_runtime`` returning a non-zero code, or an
    unexpected exception) a message is printed, the partially created
    runtime is released, and ``self.rknn`` is left as ``None``.
    """
    self.rknn = None
    runtime = None  # owned locally until the runtime is fully initialised
    try:
        from rknnlite.api import RKNNLite

        model_path = self.config.get('model_path', '/home/admin/dsh/testrk3588/yolov8n-seg.rknn')
        runtime = RKNNLite()

        if runtime.load_rknn(model_path) != 0:
            print(f"[错误] 加载 RKNN 分割模型失败: {model_path}")
            return

        # Initialise the runtime on NPU cores 0, 1 and 2.
        if runtime.init_runtime(core_mask=RKNNLite.NPU_CORE_0_1_2) != 0:
            print("[错误] 初始化 RKNN 运行时失败")
            return

        print(f"成功加载 RKNN 人体分割模型: {model_path}")
        # Hand ownership to the instance only once everything succeeded, so
        # the ``finally`` clause below does not release a working runtime.
        self.rknn, runtime = runtime, None
    except ImportError:
        print("未安装 rknnlite,无法使用 RKNN 分割模型")
    except Exception as e:
        print(f"加载分割模型失败: {e}")
    finally:
        if runtime is not None:
            # A half-initialised runtime would otherwise be leaked.
            try:
                runtime.release()
            except Exception:
                # Best-effort cleanup: the load has already failed and been
                # reported above; nothing more useful can be done here.
                pass
|
|
|
+
|
|
|
def _letterbox(self, image: np.ndarray) -> Tuple[np.ndarray, float, int, int, int, int]:
    """Resize ``image`` into the model input size while keeping aspect ratio.

    The image is scaled by a single factor so that it fits inside
    ``self.input_size`` (read as ``(height, width)``), then centred on a
    canvas filled with the value 114.

    Args:
        image: Source image; 2-D (grayscale) or 3-D with at least one channel.

    Returns:
        ``(canvas, scale, pad_w, pad_h, h0, w0)`` where ``canvas`` is the
        ``(ih, iw, 3)`` uint8 letterboxed image, ``scale`` the resize factor,
        ``pad_w`` / ``pad_h`` the left / top padding in pixels and
        ``h0`` / ``w0`` the original image height / width.

    Raises:
        ValueError: If ``image`` has zero height or width.
    """
    h0, w0 = image.shape[:2]
    if h0 <= 0 or w0 <= 0:
        raise ValueError(f"cannot letterbox an empty image of shape {image.shape}")

    ih, iw = self.input_size
    scale = min(iw / w0, ih / h0)
    # Clamp to at least one pixel: for extreme aspect ratios the truncated
    # size would otherwise be 0 and cv2.resize would fail.
    new_w = min(iw, max(1, int(w0 * scale)))
    new_h = min(ih, max(1, int(h0 * scale)))
    pad_w = (iw - new_w) // 2
    pad_h = (ih - new_h) // 2

    resized = cv2.resize(image, (new_w, new_h))
    if resized.ndim == 2:
        # Grayscale input: add a channel axis so it broadcasts over 3 channels.
        resized = resized[:, :, None]
    elif resized.shape[2] > 3:
        # Drop any extra channel (e.g. alpha) so it fits the 3-channel canvas.
        resized = resized[:, :, :3]

    canvas = np.full((ih, iw, 3), 114, dtype=np.uint8)
    canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized
    return canvas, scale, pad_w, pad_h, h0, w0
|
|
|
+
|
|
|
def _postprocess_segmentation(self, outputs, scale, pad_w, pad_h, w0, h0):
    """Turn raw YOLOv8-seg outputs into a binary person mask for the ROI.

    Expected layout (as produced by a standard YOLOv8-seg export):
      - ``outputs[0]``: detections, ``(1, 4 + num_classes + nm, num_anchors)``
        (e.g. ``(1, 116, 8400)``): 4 box rows, class scores, then ``nm``
        mask coefficients.
      - ``outputs[1]``: mask prototypes, ``(1, nm, mh, mw)``
        (e.g. ``(1, 32, 160, 160)``).

    The anchor with the highest person score (class row 0) above
    ``self.conf_threshold`` is selected; its coefficients are combined with
    the prototypes, the letterbox padding is removed and the mask is resized
    to the original ROI size.

    Args:
        outputs: Sequence of model output arrays.
        scale: Resize factor that was used by the letterbox step.
        pad_w: Left padding (pixels, model-input space) added by letterbox.
        pad_h: Top padding (pixels, model-input space) added by letterbox.
        w0: Original ROI width.
        h0: Original ROI height.

    Returns:
        ``(h0, w0)`` uint8 mask containing only 0 and 255, or ``None`` when
        the outputs are missing/malformed or no person passes the threshold.
    """
    if not outputs or len(outputs) < 2:
        return None

    det = np.asarray(outputs[0])
    protos = np.asarray(outputs[1])
    if det.ndim == 3:
        det = det[0]        # (rows, num_anchors)
    if protos.ndim == 4:
        protos = protos[0]  # (nm, mh, mw)
    if det.ndim != 2 or protos.ndim != 3 or det.shape[1] == 0:
        return None

    # The mask coefficients are the trailing ``nm`` rows; derive the split
    # from the prototype tensor instead of hard-coding 84/116.
    num_coeffs = protos.shape[0]
    coeff_start = det.shape[0] - num_coeffs
    if coeff_start < 5:  # need 4 box rows plus at least the person class row
        return None

    # Row 4 is class 0 (person). argmax returns the first maximum, matching
    # a first-strictly-greater linear scan.
    person_scores = det[4]
    best_idx = int(np.argmax(person_scores))
    best_conf = float(person_scores[best_idx])
    if not (best_conf > self.conf_threshold and best_conf > 0):
        return None

    coeffs = det[coeff_start:, best_idx].astype(np.float32)
    # (nm,) . (nm, mh, mw) -> (mh, mw) mask logits.
    logits = np.tensordot(coeffs, protos.astype(np.float32), axes=1)

    # Upsample the logits to model-input resolution first so the letterbox
    # padding can be removed at pixel accuracy (the padding may be asymmetric
    # and is generally not a multiple of the prototype stride).
    ih, iw = self.input_size
    logits = cv2.resize(np.ascontiguousarray(logits), (iw, ih),
                        interpolation=cv2.INTER_LINEAR)

    content_w = min(iw, max(1, int(w0 * scale)))
    content_h = min(ih, max(1, int(h0 * scale)))
    left = min(max(int(pad_w), 0), iw - 1)
    top = min(max(int(pad_h), 0), ih - 1)
    right = min(left + content_w, iw)
    bottom = min(top + content_h, ih)
    logits = np.ascontiguousarray(logits[top:bottom, left:right])

    # Resize the continuous logits, THEN threshold, so the result is strictly
    # binary. sigmoid(x) > 0.5 is equivalent to x > 0, which also avoids
    # overflow in exp() for large negative logits.
    logits = cv2.resize(logits, (w0, h0), interpolation=cv2.INTER_LINEAR)
    # NOTE(review): reference YOLOv8 post-processing additionally crops the
    # mask to the predicted box; not done here because the box encoding of
    # this RKNN export is not visible in this file - confirm before adding.
    return (logits > 0).astype(np.uint8) * 255
|
|
|
|
|
|
def segment_person(self, frame: np.ndarray,
                   person_bbox: Tuple[int, int, int, int]) -> Optional[np.ndarray]:
    """Segment the person inside ``person_bbox`` and return a binary mask.

    Args:
        frame: Input image.
        person_bbox: Person bounding box as ``(x, y, w, h)``.

    Returns:
        The person mask for the part of the bounding box that lies inside
        ``frame`` (same height/width as that clipped region), or ``None``
        when the model is not loaded, the box does not overlap the frame,
        or inference/post-processing fails.
    """
    if self.rknn is None:
        return None
    if frame is None or frame.ndim < 2:
        return None

    x, y, w, h = (int(v) for v in person_bbox)
    frame_h, frame_w = frame.shape[:2]

    # Clamp the box to the frame. Without this, a negative x or y would be
    # interpreted by NumPy as an index from the end and select the wrong
    # region (or an empty one).
    x1, y1 = max(x, 0), max(y, 0)
    x2, y2 = min(x + w, frame_w), min(y + h, frame_h)
    if x2 <= x1 or y2 <= y1:
        return None

    # Crop the person region.
    person_roi = frame[y1:y2, x1:x2]

    try:
        # Pre-process: aspect-preserving resize onto the model input canvas.
        canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(person_roi)

        # Model input: NHWC (1, H, W, C), channel order reversed, float32
        # scaled to 0-1.
        # NOTE(review): assumes the .rknn model was converted to accept
        # normalised float RGB input - confirm against the conversion config.
        img = canvas[..., ::-1].astype(np.float32) / 255.0
        blob = img[None, ...]

        # Inference.
        outputs = self.rknn.inference(inputs=[blob])

        # Post-process into a mask at ROI resolution.
        return self._postprocess_segmentation(outputs, scale, pad_w, pad_h, w0, h0)
    except Exception as e:
        print(f"分割错误: {e}")

    return None
|
|
|
|
|
|
def release(self):
    """Release the RKNN runtime held by this segmenter, if there is one.

    After a successful call ``self.rknn`` is ``None``; calling the method
    again is a no-op.
    """
    runtime = self.rknn
    if runtime is None:
        return
    runtime.release()
    self.rknn = None
|
|
|
+
|
|
|
def extract_person_region(self, frame: np.ndarray,
|
|
|
person_bbox: Tuple[int, int, int, int],
|
|
|
padding: float = 0.1) -> Tuple[np.ndarray, Tuple[int, int]]:
|
|
|
@@ -495,6 +611,11 @@ class NumberDetector:
|
|
|
person_info.person_id = i
|
|
|
results.append(person_info)
|
|
|
return results
|
|
|
+
|
|
|
def release(self):
    """Release resources by delegating to the segmenter's ``release``.

    Does nothing when ``self.segmenter`` exposes no ``release`` attribute.
    """
    segmenter = self.segmenter
    try:
        release_fn = segmenter.release
    except AttributeError:
        # Segmenter has nothing to release.
        return
    release_fn()
|
|
|
|
|
|
|
|
|
def preprocess_for_ocr(image: np.ndarray) -> np.ndarray:
|