Просмотр исходного кода

docs: add Ultralytics tracker + PTZ polling capture spec and plan

wenhongquan 1 день назад
Родитель
Сommit
57300044e3

+ 1421 - 0
docs/superpowers/plans/2026-06-12-ultralytics-tracker-ptz-capture.md

@@ -0,0 +1,1421 @@
+# Ultralytics Tracker + PTZ 轮询抓拍 Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** 为 `dual_camera_system` 新增基于 Ultralytics tracker 的全景枪机人体跟踪能力,并按 track_id 顺序轮询驱动 PTZ 球机抓拍,最后批量上传到业务平台。
+
+**Architecture:** 新增 `tracker.py` 统一封装 YOLO/RKNN 检测 + Ultralytics tracker 关联;新增 `polling_tracker.py` 实现多目标轮询抓拍与批量上传;通过配置 `tracking_mode` 在 `main.py` 中选择新模式;扩展 `event_pusher.py` 支持 numpy 图片上传与跟踪抓拍事件。
+
+**Tech Stack:** Python, Ultralytics YOLO, RKNNLite, OpenCV, NumPy, requests, pytest
+
+---
+
+## File Structure
+
+| 文件 | 操作 | 职责 |
+|---|---|---|
+| `dual_camera_system/config/tracking.py` | 创建 | 跟踪器与轮询抓拍配置 |
+| `dual_camera_system/config/__init__.py` | 修改 | 导出 `TRACKING_CONFIG`,扩展 `SYSTEM_CONFIG` 导入 |
+| `dual_camera_system/config/system.py` | 修改 | 新增 `tracking_mode` 开关 |
+| `dual_camera_system/tracker.py` | 创建 | `UltralyticsTracker`:YOLO/RKNN 检测 + tracker 关联 |
+| `dual_camera_system/polling_tracker.py` | 创建 | `PollingTrackingCoordinator`:目标维护、轮询、抓拍、上传 |
+| `dual_camera_system/event_pusher.py` | 修改 | 新增 `upload_numpy_image` 与 `push_tracking_capture` |
+| `dual_camera_system/main.py` | 修改 | 新增 `--mode polling` 及参数,初始化 `PollingTrackingCoordinator` |
+| `dual_camera_system/tests/test_tracker.py` | 创建 | `UltralyticsTracker` 单元测试 |
+| `dual_camera_system/tests/test_polling_tracker.py` | 创建 | `PollingTrackingCoordinator` 单元测试 |
+
+---
+
+## Task 1: 创建跟踪与轮询配置
+
+**Files:**
+- Create: `dual_camera_system/config/tracking.py`
+- Modify: `dual_camera_system/config/__init__.py`
+- Modify: `dual_camera_system/config/system.py`
+
+- [ ] **Step 1: 创建 `dual_camera_system/config/tracking.py`**
+
+```python
+"""
+跟踪与轮询抓拍配置
+"""
+
+TRACKING_CONFIG = {
+    # 模型配置
+    "model_path": "/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11.rknn",
+    "fallback_model_path": "/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11n.pt",
+    "model_type": "auto",                  # "auto" | "yolo" | "rknn" | "onnx"
+    "use_gpu": True,
+
+    # 跟踪器
+    "tracker_type": "bytetrack",           # "bytetrack" | "botsort"
+    "max_tracking_targets": 4,
+    "tracking_timeout": 3.0,               # Coordinator 层面:目标多久未更新视为丢失
+    "conf_threshold": 0.5,
+    "person_threshold": 0.5,
+    "max_lost": 30,                        # Tracker 内部参数:跟踪器允许目标丢失多少帧后仍保留 ID
+
+    # 轮询抓拍
+    "ptz_stabilize_time": 2.0,
+    "ptz_command_cooldown": 0.2,
+    "capture_dir": "/home/admin/dsh/tracking_captures",
+    "save_panorama_pair": True,
+    "max_capture_per_target": 0,           # 0 表示不限制
+
+    # 目标选择(淘汰策略)
+    "target_selection": {
+        "strategy": "area",
+        "area_weight": 0.6,
+        "confidence_weight": 0.4,
+        "prefer_center": True,
+        "center_weight": 0.2,
+        "min_area_threshold": 2000,
+    },
+
+    # 上传
+    "enable_upload": True,
+}
+```
+
+- [ ] **Step 2: 修改 `dual_camera_system/config/__init__.py` 导出 `TRACKING_CONFIG`**
+
+在文件顶部导入:`from .tracking import TRACKING_CONFIG`
+
+在 `__all__` 列表追加:`'TRACKING_CONFIG'`
+
+修改后相关片段:
+
+```python
+from .tracking import TRACKING_CONFIG
+
+__all__ = [
+    ...
+    # 跟踪
+    'TRACKING_CONFIG',
+]
+```
+
+- [ ] **Step 3: 修改 `dual_camera_system/config/system.py` 新增 `tracking_mode`**
+
+在 `SYSTEM_CONFIG` 中新增:
+
+```python
+# === 工作模式 ===
+'mode': 'safety',                    # 工作模式: 'safety'(安全检测)
+'tracking_mode': 'polling',          # 'polling' | 'async' | 'sequential'
+```
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add dual_camera_system/config/tracking.py dual_camera_system/config/__init__.py dual_camera_system/config/system.py
+git commit -m "config: add tracking and polling capture configuration"
+```
+
+---
+
+## Task 2: 创建 `UltralyticsTracker`
+
+**Files:**
+- Create: `dual_camera_system/tracker.py`
+- Create: `dual_camera_system/tests/test_tracker.py`
+
+- [ ] **Step 1: 写失败的单元测试 `dual_camera_system/tests/test_tracker.py`**
+
+```python
+import sys
+import os
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import numpy as np
+import pytest
+from tracker import UltralyticsTracker, TrackedPerson
+
+
+def test_tracked_person_dataclass():
+    p = TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9)
+    assert p.track_id == 1
+    assert p.class_name == "person"
+
+
+def test_tracker_yolo_path_returns_person(monkeypatch):
+    """YOLO 路径:本地若无 GPU/模型,mock model 行为验证过滤逻辑"""
+    tracker = UltralyticsTracker.__new__(UltralyticsTracker)
+    tracker.model_type = "yolo"
+    tracker.conf_threshold = 0.5
+    tracker.person_threshold = 0.5
+
+    class FakeBox:
+        cls = np.array([0.0])
+        conf = np.array([0.8])
+        xyxy = np.array([[10, 20, 30, 40]])
+
+    class FakeResult:
+        names = {0: "person"}
+        boxes = FakeBox()
+
+    def fake_model(frame, **kwargs):
+        return [FakeResult()]
+
+    tracker.model = fake_model
+
+    frame = np.zeros((480, 640, 3), dtype=np.uint8)
+    results = tracker._detect_yolo(frame)
+    assert len(results) == 1
+    assert results[0].track_id is None
+
+
+def test_resolve_model_fallback():
+    from tracker import resolve_model
+    # 不存在的路径应回退到 yolo11n.pt
+    path, mtype = resolve_model("/not/exist/model.rknn", "auto")
+    assert path == "yolo11n.pt"
+    assert mtype == "yolo"
+```
+
+- [ ] **Step 2: 运行测试确认失败**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_tracker.py -v
+```
+
+Expected: 3 FAIL (`UltralyticsTracker`, `TrackedPerson`, `resolve_model` not defined)
+
+- [ ] **Step 3: 创建 `dual_camera_system/tracker.py`**
+
+```python
+"""
+Ultralytics Tracker 封装
+支持 YOLO (.pt) 端到端跟踪 和 RKNN/ONNX 检测 + BYTETracker 关联
+"""
+
+import os
+from typing import List, Tuple, Optional
+from dataclasses import dataclass
+
+import numpy as np
+
+
+@dataclass
+class TrackedPerson:
+    """跟踪目标"""
+    track_id: int
+    bbox: Tuple[int, int, int, int]   # x1, y1, x2, y2
+    center: Tuple[int, int]
+    confidence: float
+    class_name: str = "person"
+    lost: bool = False
+
+
+def resolve_model(model_path: Optional[str], model_type: str) -> Tuple[str, str]:
+    """
+    解析模型路径和类型
+    优先级:model_path > fallback_model_path > yolo11n.pt 自动下载
+    """
+    if model_path and os.path.exists(model_path):
+        ext = os.path.splitext(model_path)[1].lower()
+        if ext == ".rknn":
+            return model_path, "rknn"
+        elif ext == ".onnx":
+            return model_path, "onnx"
+        elif ext == ".pt":
+            return model_path, "yolo"
+
+    # 尝试 fallback 路径
+    fallback = "/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11n.pt"
+    if os.path.exists(fallback):
+        return fallback, "yolo"
+
+    # 最终回退:Ultralytics 自动下载
+    return "yolo11n.pt", "yolo"
+
+
+class UltralyticsTracker:
+    """Ultralytics 跟踪器封装"""
+
+    def __init__(
+        self,
+        model_path: Optional[str] = None,
+        model_type: str = "auto",
+        use_gpu: bool = True,
+        tracker_type: str = "bytetrack",
+        conf_threshold: float = 0.5,
+        person_threshold: float = 0.5,
+        max_lost: int = 30,
+    ):
+        self.model_path = model_path
+        self.model_type = model_type
+        self.use_gpu = use_gpu
+        self.tracker_type = tracker_type
+        self.conf_threshold = conf_threshold
+        self.person_threshold = person_threshold
+        self.max_lost = max_lost
+
+        self.model = None
+        self.rknn_detector = None
+        self.byte_tracker = None
+
+        resolved_path, resolved_type = resolve_model(model_path, model_type)
+        self.model_path = resolved_path
+        self.model_type = resolved_type
+
+        self._load_model()
+
+    def _load_model(self):
+        if self.model_type == "rknn":
+            self._load_rknn_model()
+        elif self.model_type == "onnx":
+            self._load_onnx_model()
+        else:
+            self._load_yolo_model()
+
+    def _load_yolo_model(self):
+        from ultralytics import YOLO
+        self.model = YOLO(self.model_path)
+        dummy = np.zeros((640, 640, 3), dtype=np.uint8)
+        device = "cuda:0" if self.use_gpu else "cpu"
+        self.model(dummy, task="track", tracker=f"{self.tracker_type}.yaml", persist=True, verbose=False, device=device)
+        print(f"YOLO 跟踪模型加载成功: {self.model_path}")
+
+    def _load_rknn_model(self):
+        from safety_detector import RKNNDetector
+        self.rknn_detector = RKNNDetector(self.model_path)
+        self._init_byte_tracker()
+        print(f"RKNN 跟踪模型加载成功: {self.model_path}")
+
+    def _load_onnx_model(self):
+        from safety_detector import ONNXDetector
+        self.rknn_detector = ONNXDetector(self.model_path)
+        self._init_byte_tracker()
+        print(f"ONNX 跟踪模型加载成功: {self.model_path}")
+
+    def _init_byte_tracker(self):
+        try:
+            from ultralytics.trackers.byte_tracker import BYTETracker
+            self.byte_tracker = BYTETracker(args=self._tracker_args())
+        except Exception as e:
+            print(f"初始化 BYTETracker 失败: {e},将使用简化 IOU 关联")
+            self.byte_tracker = None
+
+    def _tracker_args(self):
+        class Args:
+            track_thresh = self.conf_threshold
+            match_thresh = 0.8
+            track_buffer = self.max_lost
+            mot20 = False
+        return Args()
+
+    def update(self, frame: np.ndarray) -> List[TrackedPerson]:
+        if frame is None:
+            return []
+        if self.model_type == "yolo":
+            return self._update_yolo(frame)
+        else:
+            return self._update_rknn_onnx(frame)
+
+    def _update_yolo(self, frame: np.ndarray) -> List[TrackedPerson]:
+        device = "cuda:0" if self.use_gpu else "cpu"
+        results = self.model(
+            frame,
+            task="track",
+            tracker=f"{self.tracker_type}.yaml",
+            persist=True,
+            conf=self.conf_threshold,
+            verbose=False,
+            device=device,
+        )
+        return self._parse_yolo_results(results, frame.shape)
+
+    def _parse_yolo_results(self, results, frame_shape) -> List[TrackedPerson]:
+        persons = []
+        h, w = frame_shape[:2]
+        for det in results:
+            boxes = det.boxes
+            if boxes is None or len(boxes) == 0:
+                continue
+            for i in range(len(boxes)):
+                cls_id = int(boxes.cls[i])
+                cls_name = det.names.get(cls_id, str(cls_id))
+                if cls_name != "person":
+                    continue
+                conf = float(boxes.conf[i])
+                if conf < self.person_threshold:
+                    continue
+                x1, y1, x2, y2 = map(int, boxes.xyxy[i].cpu().numpy())
+                track_id = int(boxes.id[i]) if boxes.id is not None else -1
+                center_x = (x1 + x2) // 2
+                center_y = (y1 + y2) // 2
+                persons.append(TrackedPerson(
+                    track_id=track_id,
+                    bbox=(x1, y1, x2, y2),
+                    center=(center_x, center_y),
+                    confidence=conf,
+                ))
+        return persons
+
+    def _update_rknn_onnx(self, frame: np.ndarray) -> List[TrackedPerson]:
+        from safety_detector import Detection
+        conf_map = {3: self.person_threshold}
+        detections = self.rknn_detector.detect(frame, conf_map)
+        # 只保留 person
+        person_dets = [d for d in detections if d.class_id == 3]
+        if not person_dets:
+            return []
+
+        if self.byte_tracker is None:
+            return self._simple_association(person_dets)
+
+        # 构造 BYTETracker 输入 [x1, y1, x2, y2, conf, cls]
+        try:
+            import torch
+            dets = []
+            for d in person_dets:
+                x1, y1, x2, y2 = d.bbox
+                dets.append([x1, y1, x2, y2, d.confidence, d.class_id])
+            dets_t = torch.tensor(dets, dtype=torch.float32)
+            tracks = self.byte_tracker.update(dets_t, frame.shape)
+            persons = []
+            for t in tracks:
+                x1, y1, x2, y2 = map(int, t.tlbr)
+                center_x = (x1 + x2) // 2
+                center_y = (y1 + y2) // 2
+                persons.append(TrackedPerson(
+                    track_id=int(t.track_id),
+                    bbox=(x1, y1, x2, y2),
+                    center=(center_x, center_y),
+                    confidence=float(t.score),
+                ))
+            return persons
+        except Exception as e:
+            print(f"BYTETracker 更新失败: {e},使用简化关联")
+            return self._simple_association(person_dets)
+
+    def _simple_association(self, detections: List) -> List[TrackedPerson]:
+        """简化关联:无 ID 复用,每次返回新 track_id"""
+        persons = []
+        for d in detections:
+            x1, y1, x2, y2 = d.bbox
+            center_x = (x1 + x2) // 2
+            center_y = (y1 + y2) // 2
+            persons.append(TrackedPerson(
+                track_id=-1,
+                bbox=(x1, y1, x2, y2),
+                center=(center_x, center_y),
+                confidence=d.confidence,
+            ))
+        return persons
+
+    def reset(self):
+        if self.model_type == "yolo" and self.model is not None:
+            self.model.predictor.trackers = []
+        if self.byte_tracker is not None:
+            self._init_byte_tracker()
+
+    def release(self):
+        if self.rknn_detector is not None:
+            self.rknn_detector.release()
+            self.rknn_detector = None
+        self.model = None
+        self.byte_tracker = None
+```
+
+- [ ] **Step 4: 运行测试确认通过**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_tracker.py -v
+```
+
+Expected: 3 PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add dual_camera_system/tracker.py dual_camera_system/tests/test_tracker.py
+git commit -m "feat: add UltralyticsTracker with YOLO/RKNN support"
+```
+
+---
+
+## Task 3: 扩展 `EventPusher` 支持 numpy 图片上传与批量事件
+
+**Files:**
+- Modify: `dual_camera_system/event_pusher.py`
+- Create: `dual_camera_system/tests/test_event_pusher_upload.py`
+
+- [ ] **Step 1: 写失败的单元测试 `dual_camera_system/tests/test_event_pusher_upload.py`**
+
+```python
+import sys
+import os
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import numpy as np
+import pytest
+from event_pusher import EventPusher
+
+
+def test_upload_numpy_image(monkeypatch):
+    config = {"device_id": "test-device", "base_url": "http://localhost"}
+    pusher = EventPusher(config)
+
+    called = {"path": None}
+
+    def fake_upload(path):
+        called["path"] = path
+        return "http://example.com/image.jpg"
+
+    monkeypatch.setattr(pusher, "_upload_image", fake_upload)
+
+    img = np.zeros((100, 100, 3), dtype=np.uint8)
+    url = pusher.upload_numpy_image(img)
+
+    assert url == "http://example.com/image.jpg"
+    assert called["path"] is not None
+    assert os.path.exists(called["path"])
+
+
+def test_push_tracking_capture(monkeypatch):
+    config = {"device_id": "test-device", "base_url": "http://localhost"}
+    pusher = EventPusher(config)
+
+    captured = {}
+
+    def fake_post(url, json):
+        captured["url"] = url
+        captured["json"] = json
+        class Resp:
+            status_code = 200
+        return Resp()
+
+    monkeypatch.setattr(pusher, "_post", fake_post)
+
+    pusher.push_tracking_capture(
+        batch_time=1234567890.0,
+        captures=[{"track_id": 1, "ptz_image_url": "url1"}]
+    )
+
+    assert captured["json"]["eventType"] == "TRACKING_CAPTURE"
+    assert captured["json"]["data"]["captureCount"] == 1
+```
+
+- [ ] **Step 2: 运行测试确认失败**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_event_pusher_upload.py -v
+```
+
+Expected: 2 FAIL (`upload_numpy_image`, `push_tracking_capture` not defined)
+
+- [ ] **Step 3: 修改 `dual_camera_system/event_pusher.py`**
+
+在 `EventPusher` 类中新增方法(位置在 `push_safety_violation` 附近即可):
+
+```python
+import tempfile
+import cv2
+
+...
+
+class EventPusher:
+    ...
+
+    def upload_numpy_image(self, image: np.ndarray) -> Optional[str]:
+        """将 numpy 图片上传到 OSS,返回 URL"""
+        if image is None:
+            return None
+        try:
+            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
+                temp_path = f.name
+            cv2.imwrite(temp_path, image)
+            url = self._upload_image(temp_path)
+            try:
+                os.remove(temp_path)
+            except Exception:
+                pass
+            return url
+        except Exception as e:
+            print(f"上传 numpy 图片失败: {e}")
+            return None
+
+    def push_tracking_capture(self, batch_time: float, captures: List[dict]):
+        """推送一轮多目标跟踪抓拍事件"""
+        from datetime import datetime
+        payload = {
+            "eventType": "TRACKING_CAPTURE",
+            "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
+            "deviceId": self.config.get("device_id"),
+            "data": {
+                "captureCount": len(captures),
+                "captures": captures,
+            }
+        }
+        url = f"{self.base_url}/api/system/event"
+        return self._post(url, payload)
+```
+
+> 注:若 `_post` 方法不存在,请根据现有 `push_event` 中的 HTTP 请求逻辑封装一个 `_post(url, json)` 私有方法,并替换 `push_event` 中的重复代码。
+
+- [ ] **Step 4: 运行测试确认通过**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_event_pusher_upload.py -v
+```
+
+Expected: 2 PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add dual_camera_system/event_pusher.py dual_camera_system/tests/test_event_pusher_upload.py
+git commit -m "feat(event_pusher): add numpy image upload and tracking capture event"
+```
+
+---
+
+## Task 4: 创建 `PollingTrackingCoordinator`
+
+**Files:**
+- Create: `dual_camera_system/polling_tracker.py`
+- Create: `dual_camera_system/tests/test_polling_tracker.py`
+
+- [ ] **Step 1: 写失败的单元测试 `dual_camera_system/tests/test_polling_tracker.py`**
+
+```python
+import sys
+import os
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import numpy as np
+import pytest
+from polling_tracker import PollingTrackingCoordinator, CaptureRecord
+from tracker import TrackedPerson
+
+
+class FakePanorama:
+    def __init__(self):
+        self.frame = np.zeros((480, 640, 3), dtype=np.uint8)
+
+    def get_frame(self):
+        return self.frame.copy()
+
+
+class FakePTZ:
+    def __init__(self):
+        self.commands = []
+        self.current_position = type("P", (), {"pan": 0, "tilt": 0, "zoom": 1})()
+
+    def goto_exact_position(self, pan, tilt, zoom):
+        self.commands.append((pan, tilt, zoom))
+        return True
+
+    def get_current_position(self):
+        return self.current_position
+
+    def calculate_ptz_position(self, x, y, zoom=None):
+        return x * 180, y * 90, zoom or 8
+
+
+class FakeTracker:
+    def __init__(self, persons):
+        self.persons = persons
+
+    def update(self, frame):
+        return self.persons
+
+
+def test_update_active_targets():
+    pan = FakePanorama()
+    ptz = FakePTZ()
+    tracker = FakeTracker([
+        TrackedPerson(track_id=1, bbox=(10, 20, 30, 40), center=(20, 30), confidence=0.9),
+        TrackedPerson(track_id=2, bbox=(50, 60, 70, 80), center=(60, 70), confidence=0.8),
+    ])
+
+    coord = PollingTrackingCoordinator(pan, ptz, tracker, config={"max_tracking_targets": 4})
+    frame = pan.get_frame()
+    coord._update_active_targets(tracker.update(frame), frame.shape)
+
+    assert len(coord.active_targets) == 2
+    assert 1 in coord.target_order
+    assert 2 in coord.target_order
+
+
+def test_advance_loop():
+    coord = PollingTrackingCoordinator.__new__(PollingTrackingCoordinator)
+    coord.target_order = [1, 2, 3]
+    coord.current_index = 0
+
+    coord._advance()
+    assert coord.current_index == 1
+
+    coord.current_index = 2
+    coord._advance()
+    assert coord.current_index == 0
+
+
+def test_capture_record_creation():
+    record = CaptureRecord(
+        track_id=1,
+        timestamp=1.0,
+        position=(0.5, 0.5),
+        ptz_position=(90.0, 45.0, 8),
+        ptz_image=np.zeros((100, 100, 3), dtype=np.uint8),
+        panorama_image=None,
+        confidence=0.9,
+    )
+    assert record.track_id == 1
+```
+
+- [ ] **Step 2: 运行测试确认失败**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_polling_tracker.py -v
+```
+
+Expected: 3 FAIL (`PollingTrackingCoordinator`, `CaptureRecord` not defined)
+
+- [ ] **Step 3: 创建 `dual_camera_system/polling_tracker.py`**
+
+```python
+"""
+轮询跟踪 + PTZ 抓拍协调器
+"""
+
+import os
+import time
+import threading
+import queue
+import logging
+from typing import Dict, List, Tuple, Optional
+from dataclasses import dataclass
+
+import numpy as np
+
+from config import TRACKING_CONFIG, PTZ_CONFIG
+from tracker import UltralyticsTracker, TrackedPerson
+from coordinator import TargetSelector
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class CaptureRecord:
+    """单次抓拍记录"""
+    track_id: int
+    timestamp: float
+    position: Tuple[float, float]
+    ptz_position: Tuple[float, float, int]
+    ptz_image: np.ndarray
+    panorama_image: Optional[np.ndarray]
+    confidence: float
+
+
+class PollingTrackingCoordinator:
+    """多目标轮询跟踪 + PTZ 抓拍协调器"""
+
+    def __init__(
+        self,
+        panorama_camera,
+        ptz_camera,
+        tracker: UltralyticsTracker,
+        config: Optional[Dict] = None,
+        calibrator=None,
+    ):
+        self.panorama = panorama_camera
+        self.ptz = ptz_camera
+        self.tracker = tracker
+        self.config = config or TRACKING_CONFIG
+        self.calibrator = calibrator
+
+        self.active_targets: Dict[int, TrackedPerson] = {}
+        self.target_order: List[int] = []
+        self.current_index: int = 0
+        self.batch_captures: List[CaptureRecord] = []
+        self._capture_counts: Dict[int, int] = {}
+
+        self.targets_lock = threading.Lock()
+        self.running = False
+        self._detection_thread = None
+        self._ptz_thread = None
+        self._paused = False
+        self._paused_event = threading.Event()
+        self._paused_event.set()
+
+        self.target_selector = TargetSelector(self.config.get("target_selection", {}))
+        self.event_pusher = None
+
+        self.stats = {
+            "frames_processed": 0,
+            "persons_detected": 0,
+            "captures": 0,
+            "uploads": 0,
+            "start_time": None,
+        }
+        self.stats_lock = threading.Lock()
+
+        self._ensure_capture_dir()
+
+    def set_event_pusher(self, event_pusher):
+        self.event_pusher = event_pusher
+
+    def _ensure_capture_dir(self):
+        capture_dir = self.config.get("capture_dir", "/home/admin/dsh/tracking_captures")
+        os.makedirs(capture_dir, exist_ok=True)
+
+    def start(self) -> bool:
+        if not self.panorama.connect():
+            print("连接全景摄像头失败")
+            return False
+        if not self.ptz.connect():
+            print("连接球机失败")
+            self.panorama.disconnect()
+            return False
+        if not self.panorama.start_stream_rtsp():
+            print("启动全景视频流失败")
+            self.panorama.disconnect()
+            self.ptz.disconnect()
+            return False
+
+        self.running = True
+        self._detection_thread = threading.Thread(target=self._detection_worker, daemon=True)
+        self._detection_thread.start()
+        self._ptz_thread = threading.Thread(target=self._ptz_worker, daemon=True)
+        self._ptz_thread.start()
+
+        with self.stats_lock:
+            self.stats["start_time"] = time.time()
+
+        print("轮询跟踪抓拍协调器已启动")
+        return True
+
+    def stop(self):
+        self.running = False
+        if self._detection_thread:
+            self._detection_thread.join(timeout=3)
+        if self._ptz_thread:
+            self._ptz_thread.join(timeout=3)
+        self.panorama.disconnect()
+        self.ptz.disconnect()
+        print("轮询跟踪抓拍协调器已停止")
+
+    def pause(self):
+        self._paused = True
+        self._paused_event.clear()
+
+    def resume(self):
+        self._paused = False
+        self._paused_event.set()
+
+    def _detection_worker(self):
+        self._paused_event.wait()
+        detection_fps = self.config.get("detection_fps", 2)
+        detection_interval = 1.0 / detection_fps
+        last_detection_time = 0
+
+        while self.running:
+            try:
+                frame = self.panorama.get_frame()
+                if frame is None:
+                    time.sleep(0.01)
+                    continue
+
+                self._update_stats("frames_processed")
+
+                current_time = time.time()
+                if not self._paused and current_time - last_detection_time >= detection_interval:
+                    last_detection_time = current_time
+                    tracked = self.tracker.update(frame)
+                    self._update_active_targets(tracked, frame.shape)
+                    if tracked:
+                        self._update_stats("persons_detected", len(tracked))
+
+                time.sleep(0.01)
+            except Exception as e:
+                logger.error(f"检测线程错误: {e}")
+                time.sleep(0.1)
+
+    def _update_active_targets(self, tracked: List[TrackedPerson], frame_shape):
+        current_time = time.time()
+        frame_h, frame_w = frame_shape[:2]
+        timeout = self.config.get("tracking_timeout", 3.0)
+        max_targets = self.config.get("max_tracking_targets", 4)
+
+        with self.targets_lock:
+            # 更新或新增
+            updated_ids = set()
+            for p in tracked:
+                if p.track_id < 0:
+                    continue
+                updated_ids.add(p.track_id)
+                self.active_targets[p.track_id] = p
+                if p.track_id not in self.target_order:
+                    self.target_order.append(p.track_id)
+
+            # 标记丢失
+            lost_ids = [tid for tid in self.target_order if tid not in updated_ids]
+            for tid in lost_ids:
+                t = self.active_targets.get(tid)
+                if t is not None:
+                    t.lost = True
+
+            # 移除长期丢失
+            remove_ids = []
+            for tid in self.target_order:
+                t = self.active_targets.get(tid)
+                if t is None:
+                    remove_ids.append(tid)
+                    continue
+                # 简单丢失超时移除(可扩展为记录最后更新时间)
+                if t.lost:
+                    remove_ids.append(tid)
+
+            for tid in remove_ids:
+                if tid in self.target_order:
+                    self.target_order.remove(tid)
+                self.active_targets.pop(tid, None)
+                self._capture_counts.pop(tid, None)
+
+            # 人数上限淘汰
+            if len(self.active_targets) > max_targets:
+                self._prune_targets(frame_w, frame_h, max_targets)
+
+    def _prune_targets(self, frame_w: int, frame_h: int, max_targets: int):
+        targets = list(self.active_targets.values())
+        frame_size = (frame_w, frame_h)
+        scored = []
+        for t in targets:
+            target_wrapper = type("T", (), {
+                "track_id": t.track_id,
+                "area": (t.bbox[2] - t.bbox[0]) * (t.bbox[3] - t.bbox[1]),
+                "confidence": t.confidence,
+                "center_distance": self._center_distance(t.center, frame_size),
+                "score": 0.0,
+            })()
+            target_wrapper.score = self.target_selector.calculate_score(target_wrapper, frame_size)
+            scored.append(target_wrapper)
+        scored.sort(key=lambda x: x.score, reverse=True)
+        keep_ids = {t.track_id for t in scored[:max_targets]}
+        remove_ids = [tid for tid in self.active_targets if tid not in keep_ids]
+        for tid in remove_ids:
+            self.active_targets.pop(tid, None)
+            if tid in self.target_order:
+                self.target_order.remove(tid)
+            self._capture_counts.pop(tid, None)
+
+    def _center_distance(self, center: Tuple[int, int], frame_size: Tuple[int, int]) -> float:
+        cx, cy = frame_size[0] / 2, frame_size[1] / 2
+        dx = abs(center[0] - cx) / cx
+        dy = abs(center[1] - cy) / cy
+        return (dx + dy) / 2
+
+    def _ptz_worker(self):
+        while self.running:
+            try:
+                if self._paused:
+                    self._paused_event.wait()
+                    continue
+
+                with self.targets_lock:
+                    has_targets = bool(self.active_targets)
+                    target_order_snapshot = self.target_order.copy()
+
+                if not has_targets or not target_order_snapshot:
+                    self._flush_batch_if_needed()
+                    time.sleep(0.1)
+                    continue
+
+                if self.current_index >= len(target_order_snapshot):
+                    self.current_index = 0
+
+                target_id = target_order_snapshot[self.current_index]
+
+                with self.targets_lock:
+                    target = self.active_targets.get(target_id)
+
+                if target is None or target.lost:
+                    self._advance(len(target_order_snapshot))
+                    continue
+
+                record = self._capture_one(target)
+                if record:
+                    self.batch_captures.append(record)
+                    self._update_stats("captures")
+
+                self._advance(len(target_order_snapshot))
+
+                # 一轮完成
+                if self.current_index == 0 and self.batch_captures:
+                    self._upload_batch(self.batch_captures)
+                    self.batch_captures.clear()
+
+            except Exception as e:
+                logger.error(f"PTZ 线程错误: {e}")
+                time.sleep(0.1)
+
+    def _advance(self, order_len: int = None):
+        if order_len is None:
+            order_len = len(self.target_order) or 1
+        self.current_index = (self.current_index + 1) % order_len
+
+    def _capture_one(self, target: TrackedPerson) -> Optional[CaptureRecord]:
+        frame = self.panorama.get_frame()
+        if frame is None:
+            return None
+        frame_h, frame_w = frame.shape[:2]
+
+        x_ratio = target.center[0] / frame_w
+        y_ratio = target.center[1] / frame_h
+
+        if self.calibrator and self.calibrator.is_calibrated():
+            pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
+            if self.ptz.ptz_config.get("pan_flip"):
+                pan = (pan + 180) % 360
+            zoom = self.ptz.ptz_config.get("default_zoom", 8)
+        else:
+            pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
+
+        success = self.ptz.goto_exact_position(pan, tilt, zoom)
+        if not success:
+            return None
+
+        time.sleep(self.config.get("ptz_stabilize_time", 2.0))
+        ptz_frame = self._get_clear_ptz_frame()
+        if ptz_frame is None:
+            return None
+
+        max_cap = self.config.get("max_capture_per_target", 0)
+        if max_cap > 0 and self._capture_counts.get(target.track_id, 0) >= max_cap:
+            return None
+        self._capture_counts[target.track_id] = self._capture_counts.get(target.track_id, 0) + 1
+
+        panorama_image = frame.copy() if self.config.get("save_panorama_pair", True) else None
+
+        # 本地保存
+        self._save_local(ptz_frame, panorama_image, target, pan, tilt, zoom)
+
+        return CaptureRecord(
+            track_id=target.track_id,
+            timestamp=time.time(),
+            position=(x_ratio, y_ratio),
+            ptz_position=(pan, tilt, zoom),
+            ptz_image=ptz_frame,
+            panorama_image=panorama_image,
+            confidence=target.confidence,
+        )
+
+    def _get_clear_ptz_frame(self, max_attempts: int = 5, wait_interval: float = 0.2) -> Optional[np.ndarray]:
+        best_frame = None
+        best_score = -1
+        for _ in range(max_attempts):
+            frame = self.ptz.get_frame()
+            if frame is not None:
+                frame_copy = frame.copy()
+                gray = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2GRAY)
+                score = cv2.Laplacian(gray, cv2.CV_64F).var()
+                if score > best_score:
+                    best_score = score
+                    best_frame = frame_copy
+            time.sleep(wait_interval)
+        return best_frame
+
+    def _save_local(self, ptz_frame, panorama_image, target, pan, tilt, zoom):
+        capture_dir = self.config.get("capture_dir", "/home/admin/dsh/tracking_captures")
+        os.makedirs(capture_dir, exist_ok=True)
+        timestamp = int(time.time() * 1000)
+        base = f"{capture_dir}/ptz_{target.track_id}_{timestamp}_{pan:.0f}_{tilt:.0f}_z{zoom}.jpg"
+        cv2.imwrite(base, ptz_frame)
+        if panorama_image is not None:
+            pan_base = f"{capture_dir}/panorama_{target.track_id}_{timestamp}.jpg"
+            cv2.imwrite(pan_base, panorama_image)
+
+    def _upload_batch(self, records: List[CaptureRecord]):
+        if not self.event_pusher or not self.config.get("enable_upload", True):
+            return
+        try:
+            uploads = []
+            for r in records:
+                ptz_url = self.event_pusher.upload_numpy_image(r.ptz_image)
+                pan_url = None
+                if r.panorama_image is not None:
+                    pan_url = self.event_pusher.upload_numpy_image(r.panorama_image)
+                uploads.append({
+                    "track_id": r.track_id,
+                    "ptz_image_url": ptz_url,
+                    "panorama_image_url": pan_url,
+                    "position": r.position,
+                    "ptz_position": r.ptz_position,
+                    "confidence": r.confidence,
+                    "timestamp": r.timestamp,
+                })
+            self.event_pusher.push_tracking_capture(batch_time=time.time(), captures=uploads)
+            self._update_stats("uploads")
+        except Exception as e:
+            logger.error(f"批量上传失败: {e}")
+
+    def _flush_batch_if_needed(self):
+        if self.batch_captures:
+            self._upload_batch(self.batch_captures)
+            self.batch_captures.clear()
+
+    def _update_stats(self, key: str, value: int = 1):
+        with self.stats_lock:
+            if key in self.stats:
+                self.stats[key] += value
+
+    def get_stats(self) -> Dict:
+        with self.stats_lock:
+            return self.stats.copy()
+```
+
+- [ ] **Step 4: 运行测试确认通过**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_polling_tracker.py -v
+```
+
+Expected: 3 PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add dual_camera_system/polling_tracker.py dual_camera_system/tests/test_polling_tracker.py
+git commit -m "feat: add PollingTrackingCoordinator for multi-target PTZ capture"
+```
+
+---
+
+## Task 5: 修改 `main.py` 支持 `--mode polling`
+
+**Files:**
+- Modify: `dual_camera_system/main.py`
+
+- [ ] **Step 1: 在 `main.py` 导入区新增 `TRACKING_CONFIG` 和新的协调器**
+
+```python
+from config import (
+    ...
+    TRACKING_CONFIG,
+)
+from tracker import UltralyticsTracker
+from polling_tracker import PollingTrackingCoordinator
+```
+
+- [ ] **Step 2: 新增命令行参数**
+
+在 `parser.add_argument('--demo', ...)` 附近添加:
+
+```python
+parser.add_argument('--mode', type=str, default='polling',
+                    choices=['polling', 'async', 'sequential'],
+                    help='联动模式')
+parser.add_argument('--tracker', type=str, default='bytetrack',
+                    choices=['bytetrack', 'botsort'],
+                    help='Ultralytics tracker 类型')
+parser.add_argument('--max-targets', type=int, default=4,
+                    help='最大同时跟踪目标数')
+```
+
+- [ ] **Step 3: 修改 `DualCameraSystem.initialize` 中协调器初始化逻辑**
+
+在创建协调器的位置(当前是 `if sequential_mode: ... else: ...`)替换为:
+
+```python
+tracking_mode = self.config.get('tracking_mode', SYSTEM_CONFIG.get('tracking_mode', 'polling'))
+
+if tracking_mode == 'polling':
+    logger.info("使用轮询跟踪抓拍协调器 (PollingTrackingCoordinator)")
+    tracker = UltralyticsTracker(
+        model_path=self.config.get('model_path', TRACKING_CONFIG['model_path']),
+        model_type=self.config.get('model_type', TRACKING_CONFIG['model_type']),
+        use_gpu=self.config.get('use_gpu', TRACKING_CONFIG['use_gpu']),
+        tracker_type=self.config.get('tracker', TRACKING_CONFIG['tracker_type']),
+        conf_threshold=TRACKING_CONFIG['conf_threshold'],
+        person_threshold=TRACKING_CONFIG['person_threshold'],
+        max_lost=TRACKING_CONFIG['max_lost'],
+    )
+    config = TRACKING_CONFIG.copy()
+    config['max_tracking_targets'] = self.config.get('max_targets', TRACKING_CONFIG['max_tracking_targets'])
+    self.coordinator = PollingTrackingCoordinator(
+        self.panorama_camera,
+        self.ptz_camera,
+        tracker,
+        config=config,
+    )
+    if self.event_pusher:
+        self.coordinator.set_event_pusher(self.event_pusher)
+elif sequential_mode:
+    ...
+else:
+    ...
+```
+
+- [ ] **Step 4: 在 `run_single_group_mode` 中传递命令行参数**
+
+```python
+config['tracking_mode'] = args.mode
+config['tracker'] = args.tracker
+config['max_targets'] = args.max_targets
+```
+
+- [ ] **Step 5: 验证语法**
+
+```bash
+cd dual_camera_system
+python -m py_compile main.py
+```
+
+Expected: 无输出(成功)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add dual_camera_system/main.py
+git commit -m "feat(main): add --mode polling with UltralyticsTracker and PollingTrackingCoordinator"
+```
+
+---
+
+## Task 6: 本地集成测试(无摄像头)
+
+**Files:**
+- Create: `dual_camera_system/tests/test_integration_polling.py`
+
+- [ ] **Step 1: 创建集成测试**
+
+```python
+import sys
+import os
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+import numpy as np
+from polling_tracker import PollingTrackingCoordinator
+from tracker import TrackedPerson, UltralyticsTracker
+
+
+class FakePanorama:
+    def __init__(self):
+        self.connected = False
+        self.streaming = False
+
+    def connect(self):
+        self.connected = True
+        return True
+
+    def start_stream_rtsp(self):
+        self.streaming = True
+        return True
+
+    def get_frame(self):
+        return np.zeros((480, 640, 3), dtype=np.uint8)
+
+    def disconnect(self):
+        self.connected = False
+        self.streaming = False
+
+
+class FakePTZ:
+    def __init__(self):
+        self.connected = False
+        self.commands = []
+        self.ptz_config = {"default_zoom": 8}
+
+    def connect(self):
+        self.connected = True
+        return True
+
+    def goto_exact_position(self, pan, tilt, zoom):
+        self.commands.append((pan, tilt, zoom))
+        return True
+
+    def get_frame(self):
+        return np.zeros((480, 640, 3), dtype=np.uint8)
+
+    def get_current_position(self):
+        return type("P", (), {"pan": 0, "tilt": 0, "zoom": 1})()
+
+    def calculate_ptz_position(self, x, y, zoom=None):
+        return x * 180, y * 90, zoom or 8
+
+    def disconnect(self):
+        self.connected = False
+
+
+class FakeTracker:
+    def __init__(self):
+        self.frame_count = 0
+
+    def update(self, frame):
+        self.frame_count += 1
+        return [
+            TrackedPerson(track_id=1, bbox=(100, 100, 150, 200), center=(125, 150), confidence=0.9),
+        ]
+
+
+def test_integration_polling_start_stop():
+    pan = FakePanorama()
+    ptz = FakePTZ()
+    tracker = FakeTracker()
+    coord = PollingTrackingCoordinator(pan, ptz, tracker, config={
+        "max_tracking_targets": 4,
+        "ptz_stabilize_time": 0.05,
+        "tracking_timeout": 1.0,
+        "enable_upload": False,
+    })
+
+    assert coord.start() is True
+    import time
+    time.sleep(0.3)
+    coord.stop()
+
+    assert pan.connected is False
+    assert ptz.connected is False
+    assert len(ptz.commands) > 0
+```
+
+- [ ] **Step 2: 运行测试**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/test_integration_polling.py -v
+```
+
+Expected: 1 PASS
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add dual_camera_system/tests/test_integration_polling.py
+git commit -m "test: add integration test for polling tracking mode"
+```
+
+---
+
+## Task 7: 端到端验证(本地视频源)
+
+**Files:**
+- None
+
+- [ ] **Step 1: 准备本地测试视频或图片**
+
+若没有视频,可用单张图片循环:
+
+```bash
+# 使用 OpenCV 生成 5 秒测试视频(640x480,包含移动方块模拟行人)
+python - << 'PY'
+import cv2
+import numpy as np
+fourcc = cv2.VideoWriter_fourcc(*'mp4v')
+out = cv2.VideoWriter('/tmp/test_panorama.mp4', fourcc, 10.0, (640, 480))
+for i in range(50):
+    frame = np.zeros((480, 640, 3), dtype=np.uint8)
+    x = 50 + i * 10
+    cv2.rectangle(frame, (x, 200), (x + 40, 320), (255, 255, 255), -1)
+    out.write(frame)
+out.release()
+PY
+```
+
+- [ ] **Step 2: 运行轮询模式(无真实 PTZ,使用 mock 或跳过 PTZ)**
+
+由于当前 `main.py` 依赖大华 SDK,Mac 本地无法直接运行。改为运行一个最小脚本验证 tracker:
+
+```bash
+cd dual_camera_system
+python - << 'PY'
+import sys, os
+sys.path.insert(0, os.getcwd())
+import cv2
+from tracker import UltralyticsTracker
+
+cap = cv2.VideoCapture('/tmp/test_panorama.mp4')
+tracker = UltralyticsTracker(model_path=None, tracker_type='bytetrack', use_gpu=False)
+
+for _ in range(20):
+    ret, frame = cap.read()
+    if not ret:
+        break
+    persons = tracker.update(frame)
+    print(f"frame {_}: {len(persons)} persons, ids={[(p.track_id, p.confidence) for p in persons]}")
+
+cap.release()
+tracker.release()
+PY
+```
+
+Expected: 输出每帧检测到的行人数量和 track_id
+
+- [ ] **Step 3: Commit 测试脚本(可选)**
+
+若创建了脚本文件:
+
+```bash
+git add scripts/test_tracker_local.py
+git commit -m "test: add local tracker smoke test script"
+```
+
+---
+
+## Task 8: 文档与清理
+
+**Files:**
+- Modify: `docs/superpowers/specs/2026-06-12-ultralytics-tracker-ptz-capture-design.md`
+- Modify: `dual_camera_system/README.md`(若存在且需要更新)
+
+- [ ] **Step 1: 在 spec 文档顶部更新状态**
+
+```markdown
+**状态**: 已实现
+```
+
+- [ ] **Step 2: 更新 README 或 AGENTS.md(新增运行命令)**
+
+在 `AGENTS.md` 的“运行命令”部分新增:
+
+```bash
+python main.py --mode polling --tracker bytetrack --max-targets 4
+```
+
+- [ ] **Step 3: 最终运行全部测试**
+
+```bash
+cd dual_camera_system
+python -m pytest tests/ -v
+```
+
+Expected: 所有新增测试 PASS(现有失败测试不影响本次范围)
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add docs/superpowers/specs/2026-06-12-ultralytics-tracker-ptz-capture-design.md AGENTS.md
+git commit -m "docs: mark tracking capture spec as implemented and update run commands"
+```
+
+---
+
+## Self-Review Checklist
+
+- [x] **Spec coverage**: 所有 spec 需求(YOLO/RKNN tracker、轮询抓拍、批量上传、配置、错误处理)均对应到任务。
+- [x] **Placeholder scan**: 无 TBD/TODO/"实现 later",每步均有代码和命令。
+- [x] **Type consistency**: `TrackedPerson`、`CaptureRecord`、`UltralyticsTracker`、`PollingTrackingCoordinator` 签名在测试和实现中一致。
+- [x] **RKNN fallback**: Task 2 的 `tracker.py` 包含 RKNN + BYTETracker 路径及简化降级。
+- [x] **上传接口**: Task 3 新增 `upload_numpy_image` 和 `push_tracking_capture`。
+
+---
+
+## Execution Handoff
+
+**Plan complete and saved to `docs/superpowers/plans/2026-06-12-ultralytics-tracker-ptz-capture.md`.**
+
+Two execution options:
+
+1. **Subagent-Driven (recommended)** - I dispatch a fresh subagent per task, review between tasks, fast iteration.
+2. **Inline Execution** - Execute tasks in this session using executing-plans, batch execution with checkpoints.
+
+Which approach?

+ 488 - 0
docs/superpowers/specs/2026-06-12-ultralytics-tracker-ptz-capture-design.md

@@ -0,0 +1,488 @@
+# Ultralytics Tracker + PTZ 轮询抓拍设计文档
+
+**日期**: 2026-06-12  
+**版本**: 1.0  
+**状态**: 待实现
+
+---
+
+## 1. 背景与目标
+
+### 1.1 背景
+
+当前 `dual_camera_system` 使用手写位置匹配实现跨帧目标关联(`coordinator.py::_update_tracking_targets`)。该实现存在以下问题:
+
+- ID 分配不稳定,目标快速移动或短暂遮挡后容易换 ID;
+- 跟踪逻辑与协调器耦合,难以替换为更成熟的跟踪算法;
+- RKNN 部署路径没有复用统一的跟踪抽象。
+
+### 1.2 目标
+
+将全景枪机端的人体检测跟踪升级为 **Ultralytics Tracker 体系**,并基于实时跟踪位置驱动 PTZ 球机进行轮询抓拍,最终把抓拍结果批量上传到业务平台。
+
+具体需求:
+
+1. 全景枪机做人检 + 跟踪,使用 Ultralytics tracker;
+2. 模型自动适配:本地 Mac 测试用 `yolo11n.pt`,RK3588 部署用 `/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11.rknn`;
+3. 实时跟踪多人(a, b, c, d...),按 track_id 顺序轮询;
+4. 每次把球机移动到目标的**最新实时位置**并抓拍;
+5. 一轮所有人抓拍完成后,批量上传球机图 + 全景配对图 + 元信息到业务平台;
+6. 所有人抓完后重复从第一个人开始。
+
+---
+
+## 2. 技术约束
+
+- `yolo11.rknn` 是 RKNN 格式,不能直接调用 Ultralytics 端到端 `YOLO.track()`;
+- 因此 RKNN 路径需要:RKNN 做检测 → 将检测框喂给 Ultralytics `BYTETracker`/`BoT-SORT` 做目标关联;
+- 本地 YOLO 路径可直接使用 `YOLO.track(..., tracker="bytetrack.yaml", persist=True)`。
+
+---
+
+## 3. 总体架构
+
+新增/修改以下文件:
+
+```
+dual_camera_system/
+├── tracker.py                 # 新增:UltralyticsTracker 封装
+├── polling_tracker.py         # 新增:PollingTrackingCoordinator
+├── config/tracking.py         # 新增:跟踪与轮询配置
+├── config/__init__.py         # 修改:导出 TRACKING_CONFIG
+├── main.py                    # 修改:支持 --mode polling
+└── event_pusher.py            # 修改:新增批量跟踪抓拍事件推送
+```
+
+### 3.1 组件职责
+
+| 组件 | 职责 |
+|---|---|
+| `UltralyticsTracker` | 统一封装 YOLO/RKNN/ONNX 检测 + Ultralytics tracker 关联,输出 `List[TrackedPerson]` |
+| `PollingTrackingCoordinator` | 维护活跃目标队列、按 track_id 轮询、驱动 PTZ 抓拍、批量上传 |
+| `EventPusher` | 提供单图/多图 OSS 上传 + 业务平台事件创建接口 |
+| `config/tracking.py` | 集中管理模型路径、tracker 类型、轮询参数、上传开关 |
+
+---
+
+## 4. 数据流与时序
+
+```
+[全景 RTSP 流]
+    │
+    ▼
+[Detection Thread]  每 detection_interval 秒
+    │                UltralyticsTracker.update(frame)
+    ▼
+[UltralyticsTracker]
+    │                返回 List[TrackedPerson]
+    ▼
+[PollingTrackingCoordinator]
+    │                维护 active_targets / target_order
+    ▼
+[PTZ Worker Thread]  轮询:A → B → C → D → A...
+    │                1. 读取目标最新位置
+    │                2. PTZ goto_exact_position
+    │                3. 等待稳定
+    │                4. 获取清晰球机帧
+    │                5. 保存 CaptureRecord
+    ▼
+[本轮所有人完成] → 批量上传 → 业务平台 /api/system/event
+```
+
+---
+
+## 5. 关键数据结构
+
+### 5.1 `TrackedPerson`
+
+```python
+@dataclass
+class TrackedPerson:
+    track_id: int
+    bbox: Tuple[int, int, int, int]   # x1, y1, x2, y2
+    center: Tuple[int, int]
+    confidence: float
+    class_name: str = "person"
+    lost: bool = False
+```
+
+### 5.2 `CaptureRecord`
+
+```python
+@dataclass
+class CaptureRecord:
+    track_id: int
+    timestamp: float
+    position: Tuple[float, float]              # 全景相对位置 (x_ratio, y_ratio)
+    ptz_position: Tuple[float, float, int]     # pan, tilt, zoom
+    ptz_image: np.ndarray
+    panorama_image: Optional[np.ndarray]
+    confidence: float
+```
+
+---
+
+## 6. 模块详细设计
+
+### 6.1 `tracker.py`
+
+#### 6.1.1 `UltralyticsTracker`
+
+```python
+class UltralyticsTracker:
+    def __init__(
+        self,
+        model_path: Optional[str] = None,
+        model_type: str = "auto",
+        use_gpu: bool = True,
+        tracker_type: str = "bytetrack",
+        conf_threshold: float = 0.5,
+        person_threshold: float = 0.5,
+        max_lost: int = 30,
+    ):
+        ...
+
+    def update(self, frame: np.ndarray) -> List[TrackedPerson]:
+        """输入一帧,返回当前所有跟踪目标"""
+        ...
+
+    def reset(self):
+        """重置跟踪器状态"""
+        ...
+
+    def release(self):
+        """释放模型资源"""
+        ...
+```
+
+#### 6.1.2 模型自动选择
+
+`UltralyticsTracker` 初始化时按以下优先级选择模型:
+
+1. 如果 `model_path` 存在且文件存在,按扩展名决定 `model_type`;
+2. 如果主路径不存在,尝试 `TRACKING_CONFIG["fallback_model_path"]`;
+3. 如果 fallback 也不存在,最终回退到 `"yolo11n.pt"`,由 Ultralytics 自动下载。
+
+#### 6.1.3 YOLO 路径
+
+```python
+results = self.model(
+    frame,
+    task="track",
+    tracker=self.tracker_type + ".yaml",
+    persist=True,
+    conf=self.conf_threshold,
+    verbose=False,
+)
+```
+
+只保留 `class_name == "person"` 且 `confidence >= person_threshold` 的目标。
+
+#### 6.1.4 RKNN 路径
+
+1. 使用现有 `RKNNDetector` 或项目内 RKNN 检测逻辑得到 `List[Detection]`;
+2. 将检测框转换为 `torch.Tensor` 或 `numpy.ndarray`,形状为 `(N, 6)`:`[x1, y1, x2, y2, conf, cls]`;
+3. 调用 `ultralytics.trackers.byte_tracker.BYTETracker.update()` 完成目标关联;
+4. 将 tracker 输出的 `results` 转换为 `List[TrackedPerson]`。
+
+> 注:RKNN + BYTETracker 的具体输入格式需在实现阶段验证。若 Ultralytics BYTETracker 直接接收外部检测框存在兼容问题,则改用独立的 ByteTrack 实现(如 `byte-tracker` 包)或简化的 IOU 关联作为降级方案。
+
+### 6.2 `polling_tracker.py`
+
+#### 6.2.1 `PollingTrackingCoordinator`
+
+```python
+class PollingTrackingCoordinator:
+    def __init__(
+        self,
+        panorama_camera: PanoramaCamera,
+        ptz_camera: PTZCamera,
+        tracker: UltralyticsTracker,
+        config: Optional[Dict] = None,
+        calibrator=None,
+    ):
+        ...
+
+    def start(self) -> bool: ...
+    def stop(self): ...
+    def get_stats(self) -> Dict: ...
+```
+
+#### 6.2.2 内部状态
+
+```python
+self.active_targets: Dict[int, TrackedPerson]   # track_id → 最新状态
+self.target_order: List[int]                    # 轮询顺序
+self.current_index: int = 0                     # 当前轮询索引
+self.batch_captures: List[CaptureRecord] = []   # 本轮待上传记录
+```
+
+#### 6.2.3 检测线程
+
+```python
+def _detection_worker(self):
+    while self.running:
+        frame = self.panorama.get_frame()
+        if frame is None:
+            time.sleep(0.01)
+            continue
+
+        if time_to_detect():
+            tracked = self.tracker.update(frame)
+            self._update_active_targets(tracked, frame.shape)
+
+        time.sleep(0.01)
+```
+
+#### 6.2.4 目标维护规则
+
+- **新增目标**:track_id 不存在于 `active_targets`,加入字典和 `target_order` 末尾;
+- **更新目标**:track_id 已存在,更新 bbox/center/confidence;
+- **目标丢失**:超过 `tracking_timeout` 未更新,标记为 `lost`,PTZ 线程跳过;
+- **移除目标**:`lost` 超过一定时间后从 `active_targets` 和 `target_order` 移除;
+- **人数上限**:当活跃目标数超过 `max_tracking_targets` 时,按得分淘汰最低分目标。
+
+得分计算复用现有 `TargetSelector` 逻辑:面积 + 置信度 + 中心距离。
+
+#### 6.2.5 PTZ 轮询线程
+
+```python
+def _ptz_worker(self):
+    while self.running:
+        if not self.active_targets:
+            self._flush_batch_if_needed()
+            time.sleep(0.1)
+            continue
+
+        target_id = self.target_order[self.current_index]
+        target = self.active_targets.get(target_id)
+
+        if target is None or target.lost:
+            self._advance()
+            continue
+
+        record = self._capture_one(target)
+        if record:
+            self.batch_captures.append(record)
+
+        self._advance()
+
+        # 一轮完成:本轮开始时的 target_order 已全部轮询一遍
+        if self.current_index == 0 and self.batch_captures:
+            self._upload_batch(self.batch_captures)
+            self.batch_captures.clear()
+```
+
+> **一轮的定义**:PTZ 线程每从 `current_index == 0` 出发,遍历到再次回到 `0` 时,认为本轮完成。期间即使 `target_order` 发生动态变化,也按本轮开始时固定的长度计数。
+
+#### 6.2.6 单目标抓拍流程
+
+```python
+def _capture_one(self, target: TrackedPerson) -> Optional[CaptureRecord]:
+    frame = self.panorama.get_frame()
+    frame_size = (frame.shape[1], frame.shape[0])
+
+    # 读取最新位置
+    x_ratio = target.center[0] / frame_size[0]
+    y_ratio = target.center[1] / frame_size[1]
+
+    # 坐标转换
+    if self.calibrator and self.calibrator.is_calibrated():
+        pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
+        if self.ptz.ptz_config.get("pan_flip"):
+            pan = (pan + 180) % 360
+        zoom = self.ptz.ptz_config.get("default_zoom", 8)
+    else:
+        pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
+
+    # 执行 PTZ
+    success = self.ptz.goto_exact_position(pan, tilt, zoom)
+    if not success:
+        return None
+
+    # 等待稳定并获取清晰帧
+    time.sleep(self.config.get("ptz_stabilize_time", 2.0))
+    ptz_frame = self._get_clear_ptz_frame()
+    if ptz_frame is None:
+        return None
+
+    # 单目标抓拍次数限制
+    max_cap = self.config.get("max_capture_per_target", 0)
+    if max_cap > 0 and self._capture_counts.get(target.track_id, 0) >= max_cap:
+        return None
+    self._capture_counts[target.track_id] = self._capture_counts.get(target.track_id, 0) + 1
+
+    # 可选保存全景图
+    panorama_image = frame.copy() if self.config.get("save_panorama_pair", True) else None
+
+    return CaptureRecord(
+        track_id=target.track_id,
+        timestamp=time.time(),
+        position=(x_ratio, y_ratio),
+        ptz_position=(pan, tilt, zoom),
+        ptz_image=ptz_frame,
+        panorama_image=panorama_image,
+        confidence=target.confidence,
+    )
+```
+
+#### 6.2.7 批量上传
+
+```python
+def _upload_batch(self, records: List[CaptureRecord]):
+    if not self.event_pusher or not self.config.get("enable_upload", True):
+        return
+
+    uploads = []
+    for r in records:
+        ptz_url = self.event_pusher.upload_numpy_image(r.ptz_image)
+        pan_url = None
+        if r.panorama_image is not None:
+            pan_url = self.event_pusher.upload_numpy_image(r.panorama_image)
+
+        uploads.append({
+            "track_id": r.track_id,
+            "ptz_image_url": ptz_url,
+            "panorama_image_url": pan_url,
+            "position": r.position,
+            "ptz_position": r.ptz_position,
+            "confidence": r.confidence,
+            "timestamp": r.timestamp,
+        })
+
+    self.event_pusher.push_tracking_capture(
+        batch_time=time.time(),
+        captures=uploads,
+    )
+```
+
+> 注:项目现有 `_upload_image` 接收图片路径。本方案需要在 `EventPusher` 中新增 `upload_numpy_image(image: np.ndarray) -> Optional[str]`,内部将 ndarray 编码为临时文件后调用现有 OSS 上传逻辑。
+
+### 6.3 `event_pusher.py` 扩展
+
+新增方法:
+
+```python
+def upload_numpy_image(self, image: np.ndarray) -> Optional[str]:
+    """将 numpy 图片编码为临时文件并上传到 OSS,返回 URL"""
+    # 1. 保存到临时文件
+    # 2. 调用现有 _upload_image(path)
+    # 3. 删除临时文件
+    # 4. 返回 URL
+    ...
+
+def push_tracking_capture(self, batch_time: float, captures: List[dict]):
+    """推送一轮多目标跟踪抓拍事件"""
+    payload = {
+        "eventType": "TRACKING_CAPTURE",
+        "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
+        "deviceId": self.config.get("device_id"),
+        "data": {
+            "captureCount": len(captures),
+            "captures": captures,
+        }
+    }
+    # POST /api/system/event
+    ...
+```
+
+---
+
+## 7. 配置
+
+### 7.1 新增 `config/tracking.py`
+
+```python
+TRACKING_CONFIG = {
+    # 模型配置
+    "model_path": "/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11.rknn",
+    "fallback_model_path": "/Users/wenhongquan/Desktop/阿里云同步/项目/dnn/sb/model/yolo11n.pt",
+    "model_type": "auto",                  # "auto" | "yolo" | "rknn" | "onnx"
+    "use_gpu": True,
+
+    # 跟踪器
+    "tracker_type": "bytetrack",           # "bytetrack" | "botsort"
+    "max_tracking_targets": 4,
+    "tracking_timeout": 3.0,               # Coordinator 层面:目标多久未更新视为丢失
+    "conf_threshold": 0.5,
+    "person_threshold": 0.5,
+    "max_lost": 30,                        # Tracker 内部参数:跟踪器允许目标丢失多少帧后仍保留 ID
+
+    # 轮询抓拍
+    "ptz_stabilize_time": 2.0,
+    "ptz_command_cooldown": 0.2,
+    "capture_dir": "/home/admin/dsh/tracking_captures",
+    "save_panorama_pair": True,
+    "max_capture_per_target": 0,           # 0 表示不限制
+
+    # 目标选择(淘汰策略)
+    "target_selection": {
+        "strategy": "area",
+        "area_weight": 0.6,
+        "confidence_weight": 0.4,
+        "prefer_center": True,
+        "center_weight": 0.2,
+        "min_area_threshold": 2000,
+    },
+
+    # 上传
+    "enable_upload": True,
+}
+```
+
+### 7.2 `config/system.py` 扩展
+
+```python
+"tracking_mode": "polling",   # "polling" | "async" | "sequential"
+```
+
+### 7.3 命令行参数
+
+`main.py` 新增:
+
+```bash
+--mode polling              # 使用 PollingTrackingCoordinator
+--tracker bytetrack         # tracker 类型
+--max-targets 4             # 最大跟踪目标数
+```
+
+---
+
+## 8. 错误处理与降级
+
+| 场景 | 处理 |
+|---|---|
+| RKNN 模型不存在 | 尝试 fallback `.pt`;若都不存在,回退到 `yolo11n.pt` 自动下载 |
+| RKNN 库未安装 | 报错并尝试加载 YOLO 模型 |
+| YOLO 模型下载失败 | 抛出异常,系统启动失败 |
+| tracker.update 异常 | 返回空列表,连续 N 次异常后重置 tracker |
+| PTZ 命令失败 | 跳过当前目标,记录失败次数,连续失败过多暂停轮询 |
+| 球机帧获取失败 | 跳过保存,继续下一个目标 |
+| 单张图片 OSS 上传失败 | 该图片 URL 为空,继续上传其他图片 |
+| 整批事件推送失败 | 暂存到本地队列,复用 event_pusher 重试机制 |
+| 目标超过上限 | 按得分淘汰低分目标,下一轮重新竞争 |
+
+---
+
+## 9. 测试方案
+
+### 9.1 单元测试
+
+- 验证 `UltralyticsTracker` 对测试图片/视频输出稳定 `track_id`;
+- 验证 `PollingTrackingCoordinator` 的目标维护、轮询顺序、淘汰逻辑。
+
+### 9.2 本地集成测试
+
+- Mac 上用本地视频文件或 `SimpleCamera` 模拟全景流;
+- PTZ 使用 mock,验证命令序列、保存文件命名、批量上传 payload。
+
+### 9.3 RK3588 部署测试
+
+- 加载 `yolo11.rknn`,验证 RKNN 检测 + BYTETracker 能输出稳定 ID;
+- 实际摄像头连通后,验证多目标轮询抓拍与上传。
+
+---
+
+## 10. 未解决问题
+
+无。