""" Ultralytics Tracker 封装 支持 YOLO (.pt) 端到端跟踪 和 RKNN/ONNX 检测 + BYTETracker 关联 """ import logging import os import types from typing import Any, List, Tuple, Optional from dataclasses import dataclass import numpy as np from config import TRACKING_CONFIG from safety_detector import Detection logger = logging.getLogger(__name__) # Model type constants MODEL_TYPE_AUTO = "auto" MODEL_TYPE_RKNN = "rknn" MODEL_TYPE_ONNX = "onnx" MODEL_TYPE_YOLO = "yolo" # Default YOLO model used when no local model is found. # Ultralytics will automatically download the weights on first use. DEFAULT_YOLO_MODEL = "yolo11n.pt" @dataclass class TrackedPerson: """跟踪目标""" track_id: int bbox: Tuple[int, int, int, int] # x1, y1, x2, y2 center: Tuple[int, int] confidence: float class_name: str = "person" lost: bool = False def resolve_model(model_path: Optional[str], model_type: str) -> Tuple[str, str]: """ 解析模型路径和类型 优先级: 1. 显式 model_type(非 auto)优先于扩展名推断 2. model_path 存在时使用 model_path 3. 否则使用 TRACKING_CONFIG['fallback_model_path'] 4. 最终回退到 Ultralytics 默认模型(自动下载) Args: model_path: 模型文件路径,可为 None model_type: 模型类型,'auto' 时根据扩展名推断,否则使用给定值 Returns: (resolved_path, resolved_type) """ def _infer_type(path: str) -> str: ext = os.path.splitext(path)[1].lower() if ext == ".rknn": return MODEL_TYPE_RKNN elif ext == ".onnx": return MODEL_TYPE_ONNX return MODEL_TYPE_YOLO # 1. 优先使用传入的 model_path if model_path and os.path.exists(model_path): resolved_type = _infer_type(model_path) if model_type == MODEL_TYPE_AUTO else model_type return model_path, resolved_type # 2. 回退到配置中的 fallback 路径 fallback = TRACKING_CONFIG.get("fallback_model_path") if fallback and os.path.exists(fallback): resolved_type = _infer_type(fallback) if model_type == MODEL_TYPE_AUTO else model_type return fallback, resolved_type # 3. 最终回退:Ultralytics 自动下载 return DEFAULT_YOLO_MODEL, MODEL_TYPE_YOLO class UltralyticsTracker: """Ultralytics 跟踪器封装 阈值说明: - conf_threshold: 调用模型/跟踪器时传入的检测置信度阈值,用于控制进入 跟踪流程的候选框数量。 - person_threshold: 对检测到的 "person" 类别在解析结果时应用的过滤阈值, 仅保留置信度不低于该值的人员目标。 """ def __init__( self, model_path: Optional[str] = None, model_type: str = MODEL_TYPE_AUTO, use_gpu: bool = True, tracker_type: str = "bytetrack", conf_threshold: float = 0.5, person_threshold: float = 0.5, max_lost: int = 30, ): if model_path is None: model_path = TRACKING_CONFIG["model_path"] self.model_path = model_path self.model_type = model_type self.use_gpu = use_gpu self.tracker_type = tracker_type self.conf_threshold = conf_threshold self.person_threshold = person_threshold self.max_lost = max_lost self.model = None self.rknn_detector = None self.byte_tracker = None resolved_path, resolved_type = resolve_model(model_path, model_type) self.model_path = resolved_path self.model_type = resolved_type self._load_model() def _load_model(self) -> None: if self.model_type == MODEL_TYPE_RKNN: self._load_rknn_model() elif self.model_type == MODEL_TYPE_ONNX: self._load_onnx_model() else: self._load_yolo_model() def _load_yolo_model(self) -> None: from ultralytics import YOLO self.model = YOLO(self.model_path) dummy = np.zeros((640, 640, 3), dtype=np.uint8) device = "cuda:0" if self.use_gpu else "cpu" # Warmup / JIT:在空白图上执行一次跟踪,触发 ultralytics 内部 # 的 tracker 初始化与可能的 PyTorch JIT 编译,避免首帧真实推理延迟。 self.model(dummy, task="track", tracker=f"{self.tracker_type}.yaml", persist=True, verbose=False, device=device) logger.info("YOLO 跟踪模型加载成功: %s", self.model_path) def _load_rknn_model(self) -> None: try: from safety_detector import RKNNDetector self.rknn_detector = RKNNDetector(self.model_path) self._init_byte_tracker() logger.info("RKNN 跟踪模型加载成功: %s", self.model_path) except ImportError as e: logger.warning("RKNN 加载失败 (%s),回退到 YOLO 模型", e) self.model_type = MODEL_TYPE_YOLO self._load_yolo_model() def _load_onnx_model(self) -> None: try: from safety_detector import ONNXDetector self.rknn_detector = ONNXDetector(self.model_path) self._init_byte_tracker() logger.info("ONNX 跟踪模型加载成功: %s", self.model_path) except ImportError as e: logger.warning("ONNX 加载失败 (%s),回退到 YOLO 模型", e) self.model_type = MODEL_TYPE_YOLO self._load_yolo_model() def _init_byte_tracker(self) -> None: try: from ultralytics.trackers.byte_tracker import BYTETracker self.byte_tracker = BYTETracker(args=self._tracker_args()) except Exception as e: logger.warning("初始化 BYTETracker 失败: %s,将使用简化 IOU 关联", e) self.byte_tracker = None def _tracker_args(self) -> types.SimpleNamespace: return types.SimpleNamespace( track_thresh=self.conf_threshold, match_thresh=0.8, track_buffer=self.max_lost, mot20=False, ) def update(self, frame: Optional[np.ndarray]) -> List[TrackedPerson]: if frame is None: return [] if self.model_type == MODEL_TYPE_YOLO: return self._update_yolo(frame) else: return self._update_rknn_onnx(frame) def _update_yolo(self, frame: np.ndarray) -> List[TrackedPerson]: device = "cuda:0" if self.use_gpu else "cpu" results = self.model( frame, task="track", tracker=f"{self.tracker_type}.yaml", persist=True, conf=self.conf_threshold, verbose=False, device=device, ) return self._parse_yolo_results(results, frame.shape) def _parse_yolo_results(self, results: List[Any], frame_shape: Tuple[int, ...]) -> List[TrackedPerson]: persons = [] h, w = frame_shape[:2] for det in results: boxes = det.boxes if boxes is None or len(boxes) == 0: continue for i in range(len(boxes)): cls_id = int(boxes.cls[i]) cls_name = det.names.get(cls_id, str(cls_id)) if cls_name != "person": continue conf = float(boxes.conf[i]) if conf < self.person_threshold: continue xyxy = boxes.xyxy[i] if hasattr(xyxy, "cpu"): xyxy = xyxy.cpu().numpy() x1, y1, x2, y2 = map(int, xyxy) track_id = int(boxes.id[i]) if boxes.id is not None else -1 center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 persons.append(TrackedPerson( track_id=track_id, bbox=(x1, y1, x2, y2), center=(center_x, center_y), confidence=conf, )) return persons def _update_rknn_onnx(self, frame: np.ndarray) -> List[TrackedPerson]: conf_map = {3: self.person_threshold} detections = self.rknn_detector.detect(frame, conf_map) # 只保留 person person_dets = [d for d in detections if d.class_id == 3] if not person_dets: return [] if self.byte_tracker is None: return self._simple_association(person_dets) # 构造 BYTETracker 输入 [x1, y1, x2, y2, conf, cls] try: import torch dets = [] for d in person_dets: x1, y1, x2, y2 = d.bbox dets.append([x1, y1, x2, y2, d.confidence, d.class_id]) dets_t = torch.tensor(dets, dtype=torch.float32) tracks = self.byte_tracker.update(dets_t, frame.shape) persons = [] for t in tracks: x1, y1, x2, y2 = map(int, t.tlbr) center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 persons.append(TrackedPerson( track_id=int(t.track_id), bbox=(x1, y1, x2, y2), center=(center_x, center_y), confidence=float(t.score), )) return persons except Exception as e: logger.warning("BYTETracker 更新失败: %s,使用简化关联", e) return self._simple_association(person_dets) def _simple_association(self, detections: List[Detection]) -> List[TrackedPerson]: """简化关联:无 ID 复用,每次返回新 track_id""" persons = [] for d in detections: x1, y1, x2, y2 = d.bbox center_x = (x1 + x2) // 2 center_y = (y1 + y2) // 2 persons.append(TrackedPerson( track_id=-1, bbox=(x1, y1, x2, y2), center=(center_x, center_y), confidence=d.confidence, )) return persons def reset(self) -> None: if self.model_type == MODEL_TYPE_YOLO and self.model is not None: self.model.predictor.trackers = [] if self.byte_tracker is not None: self._init_byte_tracker() def release(self) -> None: if self.rknn_detector is not None: self.rknn_detector.release() self.rknn_detector = None self.model = None self.byte_tracker = None