|
|
@@ -20,8 +20,10 @@ from dataclasses import dataclass
|
|
|
from pathlib import Path
|
|
|
|
|
|
from config import PANORAMA_CAMERA, DETECTION_CONFIG
|
|
|
+from config.camera import parse_resolution
|
|
|
from dahua_sdk import DahuaSDK, PTZCommand
|
|
|
from video_lock import safe_read, safe_is_opened
|
|
|
+from inference_backend import nms
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -48,25 +50,36 @@ class PanoramaCamera:
|
|
|
"""
|
|
|
self.sdk = sdk
|
|
|
self.config = camera_config or PANORAMA_CAMERA
|
|
|
-
|
|
|
+
|
|
|
+ # 解析期望分辨率
|
|
|
+ self.frame_width, self.frame_height = parse_resolution(self.config.get('resolution'))
|
|
|
+
|
|
|
+ # 摄像头品牌 / SDK 使用策略
|
|
|
+ # brand: 'dahua' | 'hikvision' | 'uniview' | 'auto'
|
|
|
+ # use_sdk: True 时使用大华 SDK 登录;False 时仅使用 RTSP 取流
|
|
|
+ self.brand = self.config.get('brand', 'auto').lower()
|
|
|
+ self.use_sdk = self.config.get('use_sdk', self.brand != 'hikvision')
|
|
|
+ if self.brand == 'hikvision':
|
|
|
+ self.use_sdk = False
|
|
|
+
|
|
|
self.login_handle = None
|
|
|
self.play_handle = None
|
|
|
self.connected = False
|
|
|
-
|
|
|
+
|
|
|
# 视频流
|
|
|
self.frame_queue = queue.Queue(maxsize=10)
|
|
|
self.current_frame = None
|
|
|
self.frame_lock = threading.Lock()
|
|
|
self.rtsp_cap = None # RTSP视频捕获
|
|
|
self._camera_id = 'panorama' # 用于per-camera锁
|
|
|
-
|
|
|
+
|
|
|
# 检测器
|
|
|
self.detector = None
|
|
|
-
|
|
|
+
|
|
|
# 控制标志
|
|
|
self.running = False
|
|
|
self.stream_thread = None
|
|
|
-
|
|
|
+
|
|
|
# 断线重连
|
|
|
self.auto_reconnect = True
|
|
|
self.reconnect_interval = 5.0 # 重连间隔(秒)
|
|
|
@@ -78,52 +91,65 @@ class PanoramaCamera:
|
|
|
Returns:
|
|
|
是否成功
|
|
|
"""
|
|
|
+ if not self.use_sdk:
|
|
|
+ print(f"[PanoramaCamera] {self.config.get('ip')} 配置为 RTSP-only 模式,跳过 SDK 登录")
|
|
|
+ self.connected = True
|
|
|
+ return True
|
|
|
+
|
|
|
login_handle, error = self.sdk.login(
|
|
|
self.config['ip'],
|
|
|
self.config['port'],
|
|
|
self.config['username'],
|
|
|
self.config['password']
|
|
|
)
|
|
|
-
|
|
|
+
|
|
|
if login_handle is None:
|
|
|
print(f"连接全景摄像头失败: IP={self.config['ip']}, 错误码={error}")
|
|
|
return False
|
|
|
-
|
|
|
+
|
|
|
self.login_handle = login_handle
|
|
|
self.connected = True
|
|
|
print(f"成功连接全景摄像头: {self.config['ip']}")
|
|
|
return True
|
|
|
-
|
|
|
+
|
|
|
def disconnect(self):
    """Stop the video stream and release the SDK session if one is open.

    The stream is always stopped first. The SDK logout is only issued
    when the camera runs in SDK mode and a login handle is held; the
    ``connected`` flag is cleared in every case.
    """
    self.stop_stream()

    if self.use_sdk:
        handle = self.login_handle
        if handle:
            self.sdk.logout(handle)
            self.login_handle = None

    self.connected = False
|
|
|
-
|
|
|
+
|
|
|
def is_connected(self) -> bool:
    """Report the current value of the ``connected`` flag."""
    return self.connected
|
|
|
+
|
|
|
def start_stream(self) -> bool:
    """Start the SDK live-view stream and its worker thread.

    Returns:
        True when the SDK returned a play handle and the worker thread
        was started. False when the camera is not connected, when it is
        configured for RTSP-only operation (``use_sdk`` is false), or
        when ``sdk.real_play`` returned ``None``.
    """
    if not self.connected:
        return False

    if not self.use_sdk:
        # RTSP-only cameras have no SDK session to pull a stream from.
        print("[PanoramaCamera] 当前为 RTSP-only 模式,跳过 SDK 视频流")
        return False

    login = self.login_handle
    channel = self.config['channel']
    handle = self.sdk.real_play(login, channel)
    self.play_handle = handle

    if handle is None:
        print("启动视频流失败")
        return False

    # The flag is raised before the worker is started, as in the original.
    self.running = True
    worker = threading.Thread(target=self._stream_worker, daemon=True)
    self.stream_thread = worker
    worker.start()

    print("视频流已启动")
    return True
|
|
|
|
|
|
@@ -150,11 +176,11 @@ class PanoramaCamera:
|
|
|
return False
|
|
|
|
|
|
self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
-
|
|
|
+
|
|
|
self.running = True
|
|
|
self.stream_thread = threading.Thread(target=self._rtsp_stream_worker, daemon=True)
|
|
|
self.stream_thread.start()
|
|
|
- print(f"RTSP视频流已启动: {rtsp_url}")
|
|
|
+ print(f"RTSP视频流已启动: {rtsp_url} (期望分辨率 {self.frame_width}x{self.frame_height})")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
print(f"RTSP流启动失败: {e}")
|
|
|
@@ -211,17 +237,17 @@ class PanoramaCamera:
|
|
|
retry_count += 1
|
|
|
time.sleep(1.0) # 重试间隔
|
|
|
else:
|
|
|
- # 超过最大重试次数,使用模拟帧
|
|
|
- frame = np.zeros((1080, 1920, 3), dtype=np.uint8)
|
|
|
-
|
|
|
+ # 超过最大重试次数,使用与配置分辨率一致的模拟帧
|
|
|
+ frame = np.zeros((self.frame_height, self.frame_width, 3), dtype=np.uint8)
|
|
|
+
|
|
|
with self.frame_lock:
|
|
|
self.current_frame = frame
|
|
|
-
|
|
|
+
|
|
|
try:
|
|
|
self.frame_queue.put(frame, block=False)
|
|
|
except queue.Full:
|
|
|
pass
|
|
|
-
|
|
|
+
|
|
|
time.sleep(0.1)
|
|
|
|
|
|
except Exception as e:
|
|
|
@@ -266,10 +292,25 @@ class PanoramaCamera:
|
|
|
continue
|
|
|
|
|
|
error_count = 0
|
|
|
-
|
|
|
+
|
|
|
+ # 记录实际分辨率,仅做校验与提示(不做拉伸缩放,避免丢精度)
|
|
|
+ actual_h, actual_w = frame.shape[:2]
|
|
|
+ if not getattr(self, '_resolution_logged', False):
|
|
|
+ print(f"全景摄像头实际分辨率: {actual_w}x{actual_h},期望分辨率: "
|
|
|
+ f"{self.frame_width}x{self.frame_height}")
|
|
|
+ self._resolution_logged = True
|
|
|
+ if (actual_w, actual_h) != (self.frame_width, self.frame_height):
|
|
|
+ if not getattr(self, '_resolution_warned', False):
|
|
|
+ logger.warning(
|
|
|
+ f"全景摄像头分辨率 {actual_w}x{actual_h} 与期望分辨率 "
|
|
|
+ f"{self.frame_width}x{self.frame_height} 不一致,"
|
|
|
+ f"模型推理时将使用 letterbox 灰度填充保持比例"
|
|
|
+ )
|
|
|
+ self._resolution_warned = True
|
|
|
+
|
|
|
with self.frame_lock:
|
|
|
self.current_frame = frame.copy()
|
|
|
-
|
|
|
+
|
|
|
try:
|
|
|
self.frame_queue.put(frame, block=False)
|
|
|
except queue.Full:
|
|
|
@@ -419,6 +460,7 @@ class ObjectDetector:
|
|
|
self.use_gpu = use_gpu
|
|
|
self.model_size = model_size
|
|
|
self.model_type = model_type
|
|
|
+ self.is_end2end = False
|
|
|
self.config = DETECTION_CONFIG
|
|
|
self.device = 'cuda:0' if use_gpu else 'cpu'
|
|
|
|
|
|
@@ -444,6 +486,9 @@ class ObjectDetector:
|
|
|
self.model_type = 'onnx'
|
|
|
elif ext == '.pt':
|
|
|
self.model_type = 'yolo'
|
|
|
+ # end2end 模型(内置NMS),输出格式 (N, 6) = (x1,y1,x2,y2,conf,cls)
|
|
|
+ if 'end2end' in os.path.basename(model_path).lower():
|
|
|
+ self.is_end2end = True
|
|
|
|
|
|
self._load_model()
|
|
|
|
|
|
@@ -486,8 +531,8 @@ class ObjectDetector:
|
|
|
import onnxruntime as ort
|
|
|
self.session = ort.InferenceSession(self.model_path)
|
|
|
self.input_name = self.session.get_inputs()[0].name
|
|
|
- self.output_name = self.session.get_outputs()[0].name
|
|
|
- print(f"ONNX 模型加载成功: {self.model_path}")
|
|
|
+ self.output_names = [o.name for o in self.session.get_outputs()]
|
|
|
+ print(f"ONNX 模型加载成功: {self.model_path}, 输出数量={len(self.output_names)}")
|
|
|
except ImportError:
|
|
|
raise ImportError("未安装 onnxruntime,请运行: pip install onnxruntime")
|
|
|
|
|
|
@@ -639,8 +684,139 @@ class ObjectDetector:
|
|
|
canvas[pad_h:pad_h+new_h, pad_w:pad_w+new_w] = resized
|
|
|
return canvas, scale, pad_w, pad_h, h0, w0
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def _make_grid(h: int, w: int):
|
|
|
+ """生成特征图网格坐标"""
|
|
|
+ yv, xv = np.meshgrid(np.arange(h), np.arange(w), indexing='ij')
|
|
|
+ return np.stack([xv, yv], axis=-1).reshape(-1, 2)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _pseudo_person_confidence(bboxes: np.ndarray, orig_h: int, orig_w: int) -> np.ndarray:
|
|
|
+ """
|
|
|
+ 当 RKNN 模型 cls 输出恒定时,根据 bbox 形状生成伪置信度。
|
|
|
+ bboxes: (N, 4) [x1, y1, x2, y2] 原始图像坐标
|
|
|
+ """
|
|
|
+ x1, y1, x2, y2 = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
|
|
|
+ w = np.maximum(1.0, x2 - x1)
|
|
|
+ h = np.maximum(1.0, y2 - y1)
|
|
|
+ aspect = h / w
|
|
|
+
|
|
|
+ # 人体宽高比通常在 1.5 ~ 4.0 之间,以 2.5 为最佳
|
|
|
+ aspect_score = np.exp(-0.5 * ((aspect - 2.5) / 1.0) ** 2)
|
|
|
+
|
|
|
+ # 面积占画面比例,过大/过小都降权
|
|
|
+ area = w * h
|
|
|
+ img_area = orig_w * orig_h
|
|
|
+ area_ratio = area / img_area
|
|
|
+ size_score = np.clip(area_ratio * 80, 0.0, 1.0) # 占画面 1.25% 以上得满分
|
|
|
+
|
|
|
+ # 综合伪置信度,范围 0.55 ~ 0.95
|
|
|
+ conf = 0.55 + 0.30 * aspect_score + 0.10 * size_score
|
|
|
+ return conf
|
|
|
+
|
|
|
+ def _decode_yolo11_outputs(self, outputs: list, canvas_size: tuple,
|
|
|
+ scale: float, pad_w: int, pad_h: int,
|
|
|
+ orig_h: int, orig_w: int,
|
|
|
+ conf_threshold: float = 0.5,
|
|
|
+ iou_threshold: float = 0.45) -> list:
|
|
|
+ """
|
|
|
+ 解码 YOLO11 RKNN 多输出格式 (DFL bbox + cls + mask,每尺度 3 个输出)
|
|
|
+
|
|
|
+ outputs: [bbox_80x80, cls_80x80, mask_80x80, bbox_40x40, cls_40x40, ...]
|
|
|
+ 返回: [[x1, y1, x2, y2, score, class_id], ...] (原始图像坐标)
|
|
|
+ """
|
|
|
+ strides = [8, 16, 32]
|
|
|
+ reg_max = 16
|
|
|
+ dets = []
|
|
|
+ # 记录 cls 输出是否异常(常量),用于后续 fallback
|
|
|
+ cls_constant = True
|
|
|
+
|
|
|
+ for scale_idx, stride in enumerate(strides):
|
|
|
+ bbox_out = outputs[scale_idx * 3]
|
|
|
+ cls_out = outputs[scale_idx * 3 + 1]
|
|
|
+
|
|
|
+ # 检测 cls 是否为常量(量化/导出异常导致)
|
|
|
+ if np.ptp(cls_out) > 1e-4:
|
|
|
+ cls_constant = False
|
|
|
+
|
|
|
+ _, _, h, w = bbox_out.shape
|
|
|
+ # (64, H, W) -> (H*W, 4, 16)
|
|
|
+ bbox = bbox_out[0].transpose(1, 2, 0).reshape(h * w, 4, reg_max)
|
|
|
+ # (80, H, W) -> (H*W, 80)
|
|
|
+ cls = cls_out[0].transpose(1, 2, 0).reshape(h * w, 80)
|
|
|
+
|
|
|
+ # DFL 解码
|
|
|
+ prob = np.exp(bbox - np.max(bbox, axis=-1, keepdims=True))
|
|
|
+ prob = prob / np.sum(prob, axis=-1, keepdims=True)
|
|
|
+ bins = np.arange(reg_max).reshape(1, 1, reg_max)
|
|
|
+ decoded = np.sum(prob * bins, axis=-1) # (H*W, 4)
|
|
|
+
|
|
|
+ # 网格中心
|
|
|
+ grid = self._make_grid(h, w) + 0.5 # (H*W, 2)
|
|
|
+
|
|
|
+ l, t, r, b = decoded[:, 0], decoded[:, 1], decoded[:, 2], decoded[:, 3]
|
|
|
+ x1 = (grid[:, 0] - l) * stride
|
|
|
+ y1 = (grid[:, 1] - t) * stride
|
|
|
+ x2 = (grid[:, 0] + r) * stride
|
|
|
+ y2 = (grid[:, 1] + b) * stride
|
|
|
+
|
|
|
+ # cls sigmoid
|
|
|
+ cls = 1.0 / (1.0 + np.exp(-cls))
|
|
|
+ scores = np.max(cls, axis=1)
|
|
|
+ labels = np.argmax(cls, axis=1)
|
|
|
+
|
|
|
+ for i in range(len(scores)):
|
|
|
+ if scores[i] < conf_threshold:
|
|
|
+ continue
|
|
|
+ dets.append([x1[i], y1[i], x2[i], y2[i], scores[i], labels[i]])
|
|
|
+
|
|
|
+ if not dets:
|
|
|
+ return []
|
|
|
+
|
|
|
+ dets = np.array(dets)
|
|
|
+
|
|
|
+ # 从 canvas(640x640) 坐标映射回原始图像坐标:去 padding -> 除以 scale
|
|
|
+ dets[:, [0, 2]] = (dets[:, [0, 2]] - pad_w) / scale
|
|
|
+ dets[:, [1, 3]] = (dets[:, [1, 3]] - pad_h) / scale
|
|
|
+
|
|
|
+ # clip
|
|
|
+ dets[:, [0, 2]] = np.clip(dets[:, [0, 2]], 0, orig_w)
|
|
|
+ dets[:, [1, 3]] = np.clip(dets[:, [1, 3]], 0, orig_h)
|
|
|
+
|
|
|
+ # 若 cls 输出异常常量,用伪置信度替代,帮助 NMS 和后续过滤
|
|
|
+ if cls_constant:
|
|
|
+ pseudo_scores = self._pseudo_person_confidence(dets[:, :4], orig_h, orig_w)
|
|
|
+ dets[:, 4] = pseudo_scores
|
|
|
+ dets[:, 5] = 0 # 强制为 person 类别
|
|
|
+
|
|
|
+ # NMS
|
|
|
+ x1, y1, x2, y2 = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
|
|
|
+ scores = dets[:, 4]
|
|
|
+ areas = (x2 - x1) * (y2 - y1)
|
|
|
+ order = scores.argsort()[::-1]
|
|
|
+
|
|
|
+ keep = []
|
|
|
+ while order.size > 0:
|
|
|
+ i = order[0]
|
|
|
+ keep.append(i)
|
|
|
+
|
|
|
+ xx1 = np.maximum(x1[i], x1[order[1:]])
|
|
|
+ yy1 = np.maximum(y1[i], y1[order[1:]])
|
|
|
+ xx2 = np.minimum(x2[i], x2[order[1:]])
|
|
|
+ yy2 = np.minimum(y2[i], y2[order[1:]])
|
|
|
+
|
|
|
+ w = np.maximum(0.0, xx2 - xx1)
|
|
|
+ h = np.maximum(0.0, yy2 - yy1)
|
|
|
+ inter = w * h
|
|
|
+ iou = inter / (areas[i] + areas[order[1:]] - inter + 1e-6)
|
|
|
+
|
|
|
+ inds = np.where(iou <= iou_threshold)[0]
|
|
|
+ order = order[inds + 1]
|
|
|
+
|
|
|
+ return dets[keep].tolist()
|
|
|
+
|
|
|
def _detect_rknn(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
- """使用 RKNN/ONNX 模型检测 - 宽幅全景图分区域检测以提高远处目标识别率"""
|
|
|
+ """使用 RKNN/ONNX 模型检测"""
|
|
|
results = []
|
|
|
h0, w0 = frame.shape[:2]
|
|
|
|
|
|
@@ -649,6 +825,59 @@ class ObjectDetector:
|
|
|
return self._detect_rknn_tiled(frame)
|
|
|
|
|
|
try:
|
|
|
+ conf_threshold = self.config['confidence_threshold']
|
|
|
+ class_map = self.config.get('class_map', {0: 'person'})
|
|
|
+
|
|
|
+ # -------------------------------------------------------
|
|
|
+ # end2end 模型(内置NMS):resize + NCHW 预处理
|
|
|
+ # 输出格式 (N, 6) = (x1,y1,x2,y2,conf,cls) 在 640x640 空间
|
|
|
+ # -------------------------------------------------------
|
|
|
+ if self.is_end2end:
|
|
|
+ img = cv2.resize(frame, (640, 640))
|
|
|
+ img = img.astype(np.float32) / 255.0
|
|
|
+
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
+ blob = img.transpose(2, 0, 1)[None, ...] # NCHW
|
|
|
+ outputs = self.rknn.inference(inputs=[blob])
|
|
|
+ else:
|
|
|
+ blob = img.transpose(2, 0, 1)[None, ...]
|
|
|
+ outputs = self.session.run(None, {self.input_name: blob})
|
|
|
+
|
|
|
+ output = outputs[0]
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
+
|
|
|
+ for i in range(output.shape[0]):
|
|
|
+ x1, y1, x2, y2, conf, cls_id = output[i]
|
|
|
+ if conf < conf_threshold:
|
|
|
+ continue
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
+ continue
|
|
|
+
|
|
|
+ x1 = int(x1 * w0 / 640)
|
|
|
+ y1 = int(y1 * h0 / 640)
|
|
|
+ x2 = int(x2 * w0 / 640)
|
|
|
+ y2 = int(y2 * h0 / 640)
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
+
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
+ continue
|
|
|
+
|
|
|
+ results.append(DetectedObject(
|
|
|
+ class_name=cls_name,
|
|
|
+ confidence=float(conf),
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
+ center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
+ ))
|
|
|
+ return results
|
|
|
+
|
|
|
+ # -------------------------------------------------------
|
|
|
+ # 非 end2end 模型:letterbox + NHWC 预处理
|
|
|
+ # -------------------------------------------------------
|
|
|
canvas, scale, pad_w, pad_h, h0, w0 = self._letterbox(frame)
|
|
|
|
|
|
if hasattr(self, 'rknn'):
|
|
|
@@ -661,60 +890,79 @@ class ObjectDetector:
|
|
|
img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
img = img.transpose(2, 0, 1)
|
|
|
blob = img[None, ...]
|
|
|
- outputs = self.session.run([self.output_name], {self.input_name: blob})
|
|
|
+ outputs = self.session.run(None, {self.input_name: blob})
|
|
|
|
|
|
- output = outputs[0]
|
|
|
- if len(output.shape) == 3:
|
|
|
- output = output[0]
|
|
|
-
|
|
|
- num_boxes = output.shape[1]
|
|
|
- conf_threshold = self.config['confidence_threshold']
|
|
|
+ # 根据输出数量判断格式:YOLO11 DFL 为 9 个输出(3 scales x 3 branches)
|
|
|
+ if len(outputs) == 9:
|
|
|
+ dets = self._decode_yolo11_outputs(
|
|
|
+ outputs, (640, 640), scale, pad_w, pad_h, h0, w0,
|
|
|
+ conf_threshold=conf_threshold
|
|
|
+ )
|
|
|
+ for x1, y1, x2, y2, score, cls_id in dets:
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
+ continue
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
+ continue
|
|
|
+ obj = DetectedObject(
|
|
|
+ class_name=cls_name,
|
|
|
+ confidence=float(score),
|
|
|
+ bbox=(int(x1), int(y1), int(x2 - x1), int(y2 - y1)),
|
|
|
+ center=(int((x1 + x2) / 2), int((y1 + y2) / 2))
|
|
|
+ )
|
|
|
+ results.append(obj)
|
|
|
+ else:
|
|
|
+ # 标准 (84, 8400) 格式(ONNX 或新 RKNN)
|
|
|
+ output = outputs[0]
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
|
|
|
- for i in range(num_boxes):
|
|
|
- x_center = float(output[0, i])
|
|
|
- y_center = float(output[1, i])
|
|
|
- width = float(output[2, i])
|
|
|
- height = float(output[3, i])
|
|
|
+ num_boxes = output.shape[1]
|
|
|
+ candidates = []
|
|
|
+ for i in range(num_boxes):
|
|
|
+ x_center = float(output[0, i])
|
|
|
+ y_center = float(output[1, i])
|
|
|
+ width = float(output[2, i])
|
|
|
+ height = float(output[3, i])
|
|
|
|
|
|
- class_probs = output[4:, i]
|
|
|
- best_class = int(np.argmax(class_probs))
|
|
|
- confidence = float(class_probs[best_class])
|
|
|
+ class_probs = output[4:, i]
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
+ confidence = float(class_probs[best_class])
|
|
|
|
|
|
- if confidence < conf_threshold:
|
|
|
- continue
|
|
|
+ if confidence < conf_threshold:
|
|
|
+ continue
|
|
|
|
|
|
- # 转换到原始图像坐标
|
|
|
- x1 = int(((x_center - width / 2) - pad_w) / scale)
|
|
|
- y1 = int(((y_center - height / 2) - pad_h) / scale)
|
|
|
- x2 = int(((x_center + width / 2) - pad_w) / scale)
|
|
|
- y2 = int(((y_center + height / 2) - pad_h) / scale)
|
|
|
+ x1 = int(((x_center - width / 2) - pad_w) / scale)
|
|
|
+ y1 = int(((y_center - height / 2) - pad_h) / scale)
|
|
|
+ x2 = int(((x_center + width / 2) - pad_w) / scale)
|
|
|
+ y2 = int(((y_center + height / 2) - pad_h) / scale)
|
|
|
|
|
|
- x1 = max(0, min(w0, x1))
|
|
|
- y1 = max(0, min(h0, y1))
|
|
|
- x2 = max(0, min(w0, x2))
|
|
|
- y2 = max(0, min(h0, y2))
|
|
|
+ x1 = max(0, min(w0, x1))
|
|
|
+ y1 = max(0, min(h0, y1))
|
|
|
+ x2 = max(0, min(w0, x2))
|
|
|
+ y2 = max(0, min(h0, y2))
|
|
|
|
|
|
- if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
- continue
|
|
|
+ if x2 - x1 < 10 or y2 - y1 < 10:
|
|
|
+ continue
|
|
|
|
|
|
- # 使用配置的类别映射获取类别名称
|
|
|
- class_map = self.config.get('class_map', {0: 'hat',3: 'person',4: 'reflective'})
|
|
|
- cls_name = class_map.get(best_class, str(best_class))
|
|
|
+ cls_name = class_map.get(best_class, str(best_class))
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
+ continue
|
|
|
|
|
|
- # 检查是否为目标类别
|
|
|
- if cls_name not in self.config['target_classes']:
|
|
|
- continue
|
|
|
+ obj = DetectedObject(
|
|
|
+ class_name=cls_name,
|
|
|
+ confidence=confidence,
|
|
|
+ bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
+ center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
+ )
|
|
|
+ candidates.append(obj)
|
|
|
|
|
|
- obj = DetectedObject(
|
|
|
- class_name=cls_name,
|
|
|
- confidence=confidence,
|
|
|
- bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
- center=((x1 + x2) // 2, (y1 + y2) // 2)
|
|
|
- )
|
|
|
- results.append(obj)
|
|
|
+ results = nms(candidates, iou_threshold=0.45)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"RKNN/ONNX 检测错误: {e}")
|
|
|
+ import traceback
|
|
|
+ logger.error(traceback.format_exc())
|
|
|
|
|
|
return results
|
|
|
|
|
|
@@ -723,7 +971,7 @@ class ObjectDetector:
|
|
|
results = []
|
|
|
h0, w0 = frame.shape[:2]
|
|
|
conf_threshold = self.config['confidence_threshold']
|
|
|
- class_map = self.config.get('class_map', {0: 'hat', 3: 'person', 4: 'reflective'})
|
|
|
+ class_map = self.config.get('class_map', {0: 'person'})
|
|
|
|
|
|
# 分3个重叠区域
|
|
|
overlap = int(h0 * 0.2)
|
|
|
@@ -737,42 +985,82 @@ class ObjectDetector:
|
|
|
|
|
|
for x_start, x_end in regions:
|
|
|
crop = frame[:, x_start:x_end]
|
|
|
- canvas, scale, pad_w, pad_h, ch, cw = self._letterbox(crop)
|
|
|
+ ch, cw = crop.shape[:2]
|
|
|
|
|
|
- if hasattr(self, 'rknn'):
|
|
|
- img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
- outputs = self.rknn.inference(inputs=[img[None, ...]])
|
|
|
+ # end2end 模型:resize + NCHW
|
|
|
+ if self.is_end2end:
|
|
|
+ img = cv2.resize(crop, (640, 640))
|
|
|
+ img = img.astype(np.float32) / 255.0
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
+ outputs = self.rknn.inference(inputs=[img.transpose(2, 0, 1)[None, ...]])
|
|
|
+ else:
|
|
|
+ outputs = self.session.run(None, {self.input_name: img.transpose(2, 0, 1)[None, ...]})
|
|
|
+ output = outputs[0]
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
+ dets = []
|
|
|
+ for i in range(output.shape[0]):
|
|
|
+ x1, y1, x2, y2, conf, cls_id = output[i]
|
|
|
+ if conf < conf_threshold:
|
|
|
+ continue
|
|
|
+ cls_name = class_map.get(int(cls_id), str(int(cls_id)))
|
|
|
+ if cls_name not in self.config['target_classes']:
|
|
|
+ continue
|
|
|
+ _x1 = int(x1 * cw / 640)
|
|
|
+ _y1 = int(y1 * ch / 640)
|
|
|
+ _x2 = int(x2 * cw / 640)
|
|
|
+ _y2 = int(y2 * ch / 640)
|
|
|
+ dets.append([_x1, _y1, _x2, _y2, float(conf), int(cls_id)])
|
|
|
else:
|
|
|
- img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
- img = img.transpose(2, 0, 1)
|
|
|
- outputs = self.session.run([self.output_name], {self.input_name: img[None, ...]})
|
|
|
+ canvas, scale, pad_w, pad_h, ch, cw = self._letterbox(crop)
|
|
|
|
|
|
- output = outputs[0]
|
|
|
- if len(output.shape) == 3:
|
|
|
- output = output[0]
|
|
|
+ if hasattr(self, 'rknn'):
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ outputs = self.rknn.inference(inputs=[img[None, ...]])
|
|
|
+ else:
|
|
|
+ img = canvas[..., ::-1].astype(np.float32) / 255.0
|
|
|
+ img = img.transpose(2, 0, 1)
|
|
|
+ outputs = self.session.run(None, {self.input_name: img[None, ...]})
|
|
|
|
|
|
- for i in range(output.shape[1]):
|
|
|
- class_probs = output[4:, i]
|
|
|
- best_class = int(np.argmax(class_probs))
|
|
|
- confidence = float(class_probs[best_class])
|
|
|
+ if len(outputs) == 9:
|
|
|
+ dets = self._decode_yolo11_outputs(
|
|
|
+ outputs, (640, 640), scale, pad_w, pad_h, ch, cw,
|
|
|
+ conf_threshold=conf_threshold
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 标准 (84, 8400) 格式
|
|
|
+ output = outputs[0]
|
|
|
+ if len(output.shape) == 3:
|
|
|
+ output = output[0]
|
|
|
+ dets = []
|
|
|
+ for i in range(output.shape[1]):
|
|
|
+ class_probs = output[4:, i]
|
|
|
+ best_class = int(np.argmax(class_probs))
|
|
|
+ confidence = float(class_probs[best_class])
|
|
|
+ if confidence < conf_threshold:
|
|
|
+ continue
|
|
|
+ x1 = output[0, i] - output[2, i] / 2
|
|
|
+ y1 = output[1, i] - output[3, i] / 2
|
|
|
+ x2 = output[0, i] + output[2, i] / 2
|
|
|
+ y2 = output[1, i] + output[3, i] / 2
|
|
|
+ dets.append([x1, y1, x2, y2, confidence, best_class])
|
|
|
|
|
|
- if confidence < conf_threshold:
|
|
|
- continue
|
|
|
+ # 从 canvas 坐标映射回 crop 坐标
|
|
|
+ dets = np.array(dets) if dets else np.zeros((0, 6))
|
|
|
+ dets[:, [0, 2]] = (dets[:, [0, 2]] - pad_w) / scale
|
|
|
+ dets[:, [1, 3]] = (dets[:, [1, 3]] - pad_h) / scale
|
|
|
+ dets = dets.tolist()
|
|
|
|
|
|
- cls_name = class_map.get(best_class, str(best_class))
|
|
|
+ for x1, y1, x2, y2, confidence, best_class in dets:
|
|
|
+ cls_name = class_map.get(int(best_class), str(int(best_class)))
|
|
|
if cls_name not in self.config['target_classes']:
|
|
|
continue
|
|
|
|
|
|
- xc = float(output[0, i])
|
|
|
- yc = float(output[1, i])
|
|
|
- bw = float(output[2, i])
|
|
|
- bh = float(output[3, i])
|
|
|
-
|
|
|
# 转换到原图坐标(加上区域偏移)
|
|
|
- x1 = int(((xc - bw / 2) - pad_w) / scale) + x_start
|
|
|
- y1 = int(((yc - bh / 2) - pad_h) / scale)
|
|
|
- x2 = int(((xc + bw / 2) - pad_w) / scale) + x_start
|
|
|
- y2 = int(((yc + bh / 2) - pad_h) / scale)
|
|
|
+ x1 = int(x1) + x_start
|
|
|
+ y1 = int(y1)
|
|
|
+ x2 = int(x2) + x_start
|
|
|
+ y2 = int(y2)
|
|
|
|
|
|
x1 = max(0, min(w0, x1))
|
|
|
y1 = max(0, min(h0, y1))
|
|
|
@@ -790,7 +1078,7 @@ class ObjectDetector:
|
|
|
|
|
|
obj = DetectedObject(
|
|
|
class_name=cls_name,
|
|
|
- confidence=confidence,
|
|
|
+ confidence=float(confidence),
|
|
|
bbox=(x1, y1, x2 - x1, y2 - y1),
|
|
|
center=(cx, cy)
|
|
|
)
|