|
|
@@ -640,12 +640,17 @@ class ObjectDetector:
|
|
|
return canvas, scale, pad_w, pad_h, h0, w0
|
|
|
|
|
|
def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
- """使用 RKNN/ONNX 模型检测"""
|
|
|
+ """使用 RKNN/ONNX 模型检测 - 宽幅全景图分区域检测以提高远处目标识别率"""
|
|
|
results = []
|
|
|
-
|
|
|
+ h0, w0 = frame.shape[:2]
|
|
|
+
|
|
|
+ # 宽幅图(宽高比>2.5)使用分区域检测,避免letterbox后人太小
|
|
|
+ if w0 / h0 > 2.5:
|
|
|
+ return self._detect_rknn_tiled(frame)
|
|
|
+
|
|
|
try:
|
|
|
canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame)
|
|
|
-
|
|
|
+
|
|
|
if hasattr(self, 'rknn'):
|
|
|
# RKNN
|
|
|
img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
@@ -657,49 +662,49 @@ class ObjectDetector:
|
|
|
img = img.transpose(2, 0, 1)
|
|
|
blob = img[None, ...]
|
|
|
outputs = self.session.run([self.output_name], {self.input_name: blob})
|
|
|
-
|
|
|
+
|
|
|
output = outputs[0]
|
|
|
if len(output.shape) == 3:
|
|
|
output = output[0]
|
|
|
-
|
|
|
+
|
|
|
num_boxes = output.shape[1]
|
|
|
conf_threshold = self.config['confidence_threshold']
|
|
|
-
|
|
|
+
|
|
|
for i in range(num_boxes):
|
|
|
x_center = float(output[0, i])
|
|
|
y_center = float(output[1, i])
|
|
|
width = float(output[2, i])
|
|
|
height = float(output[3, i])
|
|
|
-
|
|
|
+
|
|
|
class_probs = output[4:, i]
|
|
|
best_class = int(np.argmax(class_probs))
|
|
|
confidence = float(class_probs[best_class])
|
|
|
-
|
|
|
+
|
|
|
if confidence < conf_threshold:
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
# 转换到原始图像坐标
|
|
|
x1 = int(((x_center - width / 2) - pad_w) / scale)
|
|
|
y1 = int(((y_center - height / 2) - pad_h) / scale)
|
|
|
x2 = int(((x_center + width / 2) - pad_w) / scale)
|
|
|
y2 = int(((y_center + height / 2) - pad_h) / scale)
|
|
|
-
|
|
|
+
|
|
|
x1 = max(0, min(w0, x1))
|
|
|
y1 = max(0, min(h0, y1))
|
|
|
x2 = max(0, min(w0, x2))
|
|
|
y2 = max(0, min(h0, y2))
|
|
|
-
|
|
|
+
|
|
|
if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
# 使用配置的类别映射获取类别名称
|
|
|
class_map = self.config.get('class_map', {0: 'hat',3: 'person',4: 'reflective'})
|
|
|
cls_name = class_map.get(best_class, str(best_class))
|
|
|
-
|
|
|
+
|
|
|
# 检查是否为目标类别
|
|
|
if cls_name not in self.config['target_classes']:
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
obj = DetectedObject(
|
|
|
class_name=cls_name,
|
|
|
confidence=confidence,
|
|
|
@@ -707,10 +712,90 @@ class ObjectDetector:
|
|
|
center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
)
|
|
|
results.append(obj)
|
|
|
-
|
|
|
+
|
|
|
except Exception as e:
|
|
|
logger.error(f"RKNN/ONNX 检测错误: {e}")
|
|
|
-
|
|
|
+
|
|
|
+ return results
|
|
|
+
|
|
|
+ def _detect_rknn_tiled(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
+ """分区域检测 - 将宽幅全景图分成多个重叠区域分别检测,提高远处目标识别率"""
|
|
|
+ results = []
|
|
|
+ h0, w0 = frame.shape[:2]
|
|
|
+ conf_threshold = self.config['confidence_threshold']
|
|
|
+ class_map = self.config.get('class_map', {0: 'hat', 3: 'person', 4: 'reflective'})
|
|
|
+
|
|
|
+ # 分3个重叠区域
|
|
|
+ overlap = int(h0 * 0.2)
|
|
|
+ regions = [
|
|
|
+ (0, w0 // 3 + overlap),
|
|
|
+ (w0 // 3 - overlap // 2, 2 * w0 // 3 + overlap // 2),
|
|
|
+ (2 * w0 // 3 - overlap, w0),
|
|
|
+ ]
|
|
|
+
|
|
|
+ seen_centers = []
|
|
|
+
|
|
|
+ for x_start, x_end in regions:
|
|
|
+ crop = frame[:, x_start:x_end]
|
|
|
+ canvas, scale, pad_w, pad_h, ch, cw = self._letterbox(crop)
|
|
|
+
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ outputs = self.rknn.inference(inputs=[img[None, ...]])
|
|
|
+ else:
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ img = img.transpose(2, 0, 1)
|
|
|
+ outputs = self.session.run([self.output_name], {self.input_name: img[None, ...]})
|
|
|
+
|
|
|
+ output = outputs[0]
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
+
|
|
|
+ for i in range(output.shape[1]):
|
|
|
+ class_probs = output[4:, i]
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
+ confidence = float(class_probs[best_class])
|
|
|
+
|
|
|
+ if confidence < conf_threshold:
|
|
|
+ continue
|
|
|
+
|
|
|
+ cls_name = class_map.get(best_class, str(best_class))
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
+ continue
|
|
|
+
|
|
|
+ xc = float(output[0, i])
|
|
|
+ yc = float(output[1, i])
|
|
|
+ bw = float(output[2, i])
|
|
|
+ bh = float(output[3, i])
|
|
|
+
|
|
|
+ # 转换到原图坐标(加上区域偏移)
|
|
|
+ x1 = int(((xc - bw / 2) - pad_w) / scale) + x_start
|
|
|
+ y1 = int(((yc - bh / 2) - pad_h) / scale)
|
|
|
+ x2 = int(((xc + bw / 2) - pad_w) / scale) + x_start
|
|
|
+ y2 = int(((yc + bh / 2) - pad_h) / scale)
|
|
|
+
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
+
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 去重:同一目标可能被相邻区域重复检测
|
|
|
+ cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
|
|
|
+ if any(abs(cx - sx) < 50 and abs(cy - sy) < 50 for sx, sy in seen_centers):
|
|
|
+ continue
|
|
|
+ seen_centers.append((cx, cy))
|
|
|
+
|
|
|
+ obj = DetectedObject(
|
|
|
+ class_name=cls_name,
|
|
|
+ confidence=confidence,
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
+ center=(cx, cy)
|
|
|
+ )
|
|
|
+ results.append(obj)
|
|
|
+
|
|
|
return results
|
|
|
|
|
|
def detect(self, frame: np.ndarray) -> List[DetectedObject]:
|