|
|
@@ -1,2154 +0,0 @@
|
|
|
-"""
|
|
|
-联动控制器
|
|
|
-协调全景摄像头和球机的工作
|
|
|
-"""
|
|
|
-
|
|
|
-import time
|
|
|
-import threading
|
|
|
-import queue
|
|
|
-import logging
|
|
|
-import math
|
|
|
-from typing import Optional, List, Dict, Tuple, Callable
|
|
|
-from dataclasses import dataclass, field
|
|
|
-from enum import Enum
|
|
|
-
|
|
|
-import numpy as np
|
|
|
-import cv2
|
|
|
-
|
|
|
-from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
|
|
|
-from panorama_camera import PanoramaCamera, ObjectDetector, DetectedObject
|
|
|
-from ptz_camera import PTZCamera, PTZController
|
|
|
-from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
|
|
|
-from paired_image_saver import PairedImageSaver, get_paired_saver, PersonInfo
|
|
|
-
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
-
|
|
|
-
|
|
|
-class TrackingState(Enum):
|
|
|
- """跟踪状态"""
|
|
|
- IDLE = 0 # 空闲
|
|
|
- SEARCHING = 1 # 搜索目标
|
|
|
- TRACKING = 2 # 跟踪中
|
|
|
- ZOOMING = 3 # 变焦中
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class TrackingTarget:
|
|
|
- """跟踪目标"""
|
|
|
- track_id: int # 跟踪ID
|
|
|
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
|
|
|
- last_update: float # 最后更新时间
|
|
|
- person_info: Optional[dict] = None # 人员信息
|
|
|
- priority: int = 0 # 优先级
|
|
|
- area: int = 0 # 目标面积(像素²)
|
|
|
- confidence: float = 0.0 # 置信度
|
|
|
- center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
|
|
|
- score: float = 0.0 # 综合得分
|
|
|
-
|
|
|
-
|
|
|
-class TargetSelector:
|
|
|
- """
|
|
|
- 目标选择策略类
|
|
|
- 支持按面积、置信度、混合模式排序,支持优先级切换
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, config: Dict = None):
|
|
|
- """
|
|
|
- 初始化目标选择器
|
|
|
- Args:
|
|
|
- config: 目标选择配置
|
|
|
- """
|
|
|
- self.config = config or {
|
|
|
- 'strategy': 'area',
|
|
|
- 'area_weight': 0.6,
|
|
|
- 'confidence_weight': 0.4,
|
|
|
- 'min_area_threshold': 5000,
|
|
|
- 'prefer_center': True,
|
|
|
- 'center_weight': 0.2,
|
|
|
- 'switch_on_lost': True,
|
|
|
- 'stickiness': 0.3,
|
|
|
- }
|
|
|
- self.current_target_id: Optional[int] = None
|
|
|
- self.current_target_score: float = 0.0
|
|
|
-
|
|
|
- def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
|
|
|
- """
|
|
|
- 计算目标综合得分
|
|
|
- Args:
|
|
|
- target: 跟踪目标
|
|
|
- frame_size: 帧尺寸(w, h),用于计算中心距离
|
|
|
- Returns:
|
|
|
- 综合得分(0-1)
|
|
|
- """
|
|
|
- strategy = self.config.get('strategy', 'area')
|
|
|
- area_weight = self.config.get('area_weight', 0.6)
|
|
|
- conf_weight = self.config.get('confidence_weight', 0.4)
|
|
|
- min_area = self.config.get('min_area_threshold', 5000)
|
|
|
- prefer_center = self.config.get('prefer_center', False)
|
|
|
- center_weight = self.config.get('center_weight', 0.2)
|
|
|
-
|
|
|
- # 归一化面积得分 (对数缩放,避免大目标得分过高)
|
|
|
- import math
|
|
|
- area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
|
|
|
-
|
|
|
- # 小面积惩罚
|
|
|
- if target.area < min_area:
|
|
|
- area_score *= 0.5
|
|
|
-
|
|
|
- # 置信度得分直接使用
|
|
|
- conf_score = target.confidence
|
|
|
-
|
|
|
- # 中心距离得分 (距离中心越近得分越高)
|
|
|
- center_score = 1.0 - target.center_distance
|
|
|
-
|
|
|
- # 根据策略计算综合得分
|
|
|
- if strategy == 'area':
|
|
|
- score = area_score * 0.8 + conf_score * 0.2
|
|
|
- elif strategy == 'confidence':
|
|
|
- score = conf_score * 0.8 + area_score * 0.2
|
|
|
- else: # hybrid
|
|
|
- score = area_score * area_weight + conf_score * conf_weight
|
|
|
-
|
|
|
- # 加入中心距离权重
|
|
|
- if prefer_center:
|
|
|
- score = score * (1 - center_weight) + center_score * center_weight
|
|
|
-
|
|
|
- return score
|
|
|
-
|
|
|
- def select_target(self, targets: Dict[int, TrackingTarget],
|
|
|
- frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
|
|
|
- """
|
|
|
- 从多个目标中选择最优目标
|
|
|
- Args:
|
|
|
- targets: 目标字典 {track_id: TrackingTarget}
|
|
|
- frame_size: 帧尺寸
|
|
|
- Returns:
|
|
|
- 最优目标
|
|
|
- """
|
|
|
- if not targets:
|
|
|
- self.current_target_id = None
|
|
|
- return None
|
|
|
-
|
|
|
- stickiness = self.config.get('stickiness', 0.3)
|
|
|
- switch_on_lost = self.config.get('switch_on_lost', True)
|
|
|
-
|
|
|
- # 计算所有目标得分
|
|
|
- scored_targets = []
|
|
|
- for track_id, target in targets.items():
|
|
|
- target.score = self.calculate_score(target, frame_size)
|
|
|
- scored_targets.append((track_id, target, target.score))
|
|
|
-
|
|
|
- # 按得分排序
|
|
|
- scored_targets.sort(key=lambda x: x[2], reverse=True)
|
|
|
-
|
|
|
- # 检查当前目标是否仍在列表中
|
|
|
- if self.current_target_id is not None:
|
|
|
- current_exists = self.current_target_id in targets
|
|
|
- if current_exists:
|
|
|
- # 应用粘性:当前目标得分需要显著低于最优目标才切换
|
|
|
- best_id, best_target, best_score = scored_targets[0]
|
|
|
- current_target = targets[self.current_target_id]
|
|
|
-
|
|
|
- # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
|
|
|
- stickiness_threshold = best_score * (1 - stickiness)
|
|
|
- if current_target.score > stickiness_threshold:
|
|
|
- return current_target
|
|
|
-
|
|
|
-
|
|
|
- # 选择得分最高的目标
|
|
|
- best_id, best_target, best_score = scored_targets[0]
|
|
|
- self.current_target_id = best_id
|
|
|
- self.current_target_score = best_score
|
|
|
-
|
|
|
- logger.debug(
|
|
|
- f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
|
|
|
- f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
|
|
|
- )
|
|
|
-
|
|
|
- return best_target
|
|
|
-
|
|
|
- def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
|
|
|
- frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
|
|
|
- """
|
|
|
- 获取按得分排序的目标列表
|
|
|
- Args:
|
|
|
- targets: 目标字典
|
|
|
- frame_size: 帧尺寸
|
|
|
- Returns:
|
|
|
- 排序后的目标列表 [(target, score), ...]
|
|
|
- """
|
|
|
- scored = []
|
|
|
- for target in targets.values():
|
|
|
- target.score = self.calculate_score(target, frame_size)
|
|
|
- scored.append((target, target.score))
|
|
|
-
|
|
|
- scored.sort(key=lambda x: x[1], reverse=True)
|
|
|
- return scored
|
|
|
-
|
|
|
- def set_strategy(self, strategy: str):
|
|
|
- """设置选择策略"""
|
|
|
- self.config['strategy'] = strategy
|
|
|
- logger.info(f"[目标选择] 策略已切换为: {strategy}")
|
|
|
-
|
|
|
- def set_stickiness(self, stickiness: float):
|
|
|
- """设置目标粘性"""
|
|
|
- self.config['stickiness'] = max(0.0, min(1.0, stickiness))
|
|
|
- logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
|
|
|
-
|
|
|
-
|
|
|
-class Coordinator:
|
|
|
- """
|
|
|
- 联动控制器
|
|
|
- 协调全景摄像头和球机实现联动抓拍
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, panorama_camera: PanoramaCamera,
|
|
|
- ptz_camera: PTZCamera,
|
|
|
- detector: ObjectDetector = None,
|
|
|
- calibrator = None):
|
|
|
- """
|
|
|
- 初始化联动控制器
|
|
|
- Args:
|
|
|
- panorama_camera: 全景摄像头
|
|
|
- ptz_camera: 球机
|
|
|
- detector: 物体检测器
|
|
|
- calibrator: 校准器 (用于坐标转换)
|
|
|
- """
|
|
|
- self.panorama = panorama_camera
|
|
|
- self.ptz = ptz_camera
|
|
|
- self.detector = detector
|
|
|
- self.calibrator = calibrator
|
|
|
-
|
|
|
- self.config = COORDINATOR_CONFIG
|
|
|
-
|
|
|
- # 功能开关 - 从 SYSTEM_CONFIG 读取
|
|
|
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
|
|
|
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
|
|
|
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
|
|
|
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
|
|
|
-
|
|
|
- # 球机端人体检测与自动对焦
|
|
|
- self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
|
|
|
- self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
|
|
|
- self.ptz_detector = None
|
|
|
- self.auto_zoom_controller = None
|
|
|
-
|
|
|
- # 状态
|
|
|
- self.state = TrackingState.IDLE
|
|
|
- self.state_lock = threading.Lock()
|
|
|
-
|
|
|
- # 跟踪目标
|
|
|
- self.tracking_targets: Dict[int, TrackingTarget] = {}
|
|
|
- self.targets_lock = threading.Lock()
|
|
|
-
|
|
|
- # 当前跟踪目标
|
|
|
- self.current_target: Optional[TrackingTarget] = None
|
|
|
-
|
|
|
- # 回调函数
|
|
|
- self.on_person_detected: Optional[Callable] = None
|
|
|
- self.on_tracking_started: Optional[Callable] = None
|
|
|
- self.on_tracking_stopped: Optional[Callable] = None
|
|
|
-
|
|
|
- # 控制标志
|
|
|
- self.running = False
|
|
|
- self._paused = False
|
|
|
- self._paused_event = threading.Event()
|
|
|
- self._paused_event.set() # 默认非暂停状态
|
|
|
- self.coordinator_thread = None
|
|
|
-
|
|
|
- # PTZ优化 - 避免频繁发送相同位置的命令
|
|
|
- self.last_ptz_position = None
|
|
|
- self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
|
|
|
-
|
|
|
- # 目标选择器
|
|
|
- self.target_selector = TargetSelector(
|
|
|
- self.config.get('target_selection', {})
|
|
|
- )
|
|
|
-
|
|
|
- # 结果队列
|
|
|
- self.result_queue = queue.Queue()
|
|
|
-
|
|
|
- # ByteTrack 跟踪器(基于 Ultralytics BYTETracker)
|
|
|
- self.byte_tracker = None
|
|
|
- self._byte_tracker_init_lock = threading.Lock()
|
|
|
-
|
|
|
- # 跨帧跟踪:全局track_id计数器
|
|
|
- self._next_track_id = 1
|
|
|
- self._track_id_lock = threading.Lock()
|
|
|
-
|
|
|
- # 性能统计
|
|
|
- self.stats = {
|
|
|
- 'frames_processed': 0,
|
|
|
- 'persons_detected': 0,
|
|
|
- 'start_time': None,
|
|
|
- 'last_frame_time': None,
|
|
|
- }
|
|
|
- self.stats_lock = threading.Lock()
|
|
|
-
|
|
|
- def set_calibrator(self, calibrator):
|
|
|
- """设置校准器"""
|
|
|
- self.calibrator = calibrator
|
|
|
-
|
|
|
- def pause_detection(self):
|
|
|
- """暂停检测(校准时使用,线程不退出,仅跳过检测逻辑)"""
|
|
|
- self._paused = True
|
|
|
- self._paused_event.clear()
|
|
|
- logger.info("[协调器] 检测已暂停")
|
|
|
-
|
|
|
- def resume_detection(self):
|
|
|
- """恢复检测(校准完成后恢复)"""
|
|
|
- self._paused = False
|
|
|
- self._paused_event.set()
|
|
|
- logger.info("[协调器] 检测已恢复")
|
|
|
-
|
|
|
- def is_paused(self) -> bool:
|
|
|
- """检测是否暂停"""
|
|
|
- return self._paused
|
|
|
-
|
|
|
- def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
|
|
|
- """
|
|
|
- 将全景坐标转换为PTZ角度
|
|
|
- Args:
|
|
|
- x_ratio: X方向比例
|
|
|
- y_ratio: Y方向比例
|
|
|
- Returns:
|
|
|
- (pan, tilt, zoom)
|
|
|
- """
|
|
|
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- # 使用校准结果进行转换(tilt偏移已在calibrator.transform中应用)
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- zoom = 8 # 默认变倍
|
|
|
- else:
|
|
|
- # 使用默认估算
|
|
|
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
|
|
|
-
|
|
|
- return (pan, tilt, zoom)
|
|
|
-
|
|
|
- def start(self) -> bool:
|
|
|
- """
|
|
|
- 启动联动系统
|
|
|
- Returns:
|
|
|
- 是否成功
|
|
|
- """
|
|
|
- # 连接全景摄像头
|
|
|
- if not self.panorama.connect():
|
|
|
- print("连接全景摄像头失败")
|
|
|
- return False
|
|
|
-
|
|
|
- # 连接 PTZ 球机 (可选)
|
|
|
- if self.enable_ptz_camera:
|
|
|
- if not self.ptz.connect():
|
|
|
- print("连接球机失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- return False
|
|
|
- else:
|
|
|
- print("PTZ 球机功能已禁用")
|
|
|
-
|
|
|
- # 启动视频流(优先RTSP,SDK回调不可用时回退)
|
|
|
- if not self.panorama.start_stream_rtsp():
|
|
|
- print("RTSP视频流启动失败,尝试SDK方式...")
|
|
|
- if not self.panorama.start_stream():
|
|
|
- print("启动视频流失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- if self.enable_ptz_camera:
|
|
|
- self.ptz.disconnect()
|
|
|
- return False
|
|
|
-
|
|
|
- # 启动联动线程
|
|
|
- self.running = True
|
|
|
- self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
|
|
|
- self.coordinator_thread.start()
|
|
|
-
|
|
|
- print("联动系统已启动")
|
|
|
- return True
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- """停止联动系统"""
|
|
|
- self.running = False
|
|
|
-
|
|
|
- if self.coordinator_thread:
|
|
|
- self.coordinator_thread.join(timeout=3)
|
|
|
-
|
|
|
- self.panorama.disconnect()
|
|
|
-
|
|
|
- if self.enable_ptz_camera:
|
|
|
- self.ptz.disconnect()
|
|
|
-
|
|
|
- # 清理 BYTETracker
|
|
|
- self.byte_tracker = None
|
|
|
-
|
|
|
- # 打印统计信息
|
|
|
- self._print_stats()
|
|
|
-
|
|
|
- print("联动系统已停止")
|
|
|
-
|
|
|
- def _update_stats(self, key: str, value: int = 1):
|
|
|
- """更新统计信息"""
|
|
|
- with self.stats_lock:
|
|
|
- if key in self.stats:
|
|
|
- self.stats[key] += value
|
|
|
-
|
|
|
- def _print_stats(self):
|
|
|
- """打印统计信息"""
|
|
|
- with self.stats_lock:
|
|
|
- if self.stats['start_time'] and self.stats['frames_processed'] > 0:
|
|
|
- elapsed = time.time() - self.stats['start_time']
|
|
|
- fps = self.stats['frames_processed'] / elapsed
|
|
|
- print("\n=== 性能统计 ===")
|
|
|
- print(f"运行时长: {elapsed:.1f}秒")
|
|
|
- print(f"处理帧数: {self.stats['frames_processed']}")
|
|
|
- print(f"平均帧率: {fps:.1f} fps")
|
|
|
- print(f"检测人体: {self.stats['persons_detected']}次")
|
|
|
- print("================\n")
|
|
|
-
|
|
|
- def get_stats(self) -> dict:
|
|
|
- """获取统计信息"""
|
|
|
- with self.stats_lock:
|
|
|
- return self.stats.copy()
|
|
|
-
|
|
|
- def _coordinator_worker(self):
|
|
|
- """联动工作线程"""
|
|
|
- # 暂停时阻塞等待恢复,不消耗CPU
|
|
|
- self._paused_event.wait()
|
|
|
-
|
|
|
- last_detection_time = 0
|
|
|
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
|
|
|
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
|
|
|
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
|
|
|
-
|
|
|
- # 初始化统计
|
|
|
- with self.stats_lock:
|
|
|
- self.stats['start_time'] = time.time()
|
|
|
-
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- current_time = time.time()
|
|
|
-
|
|
|
- # 获取当前帧
|
|
|
- frame = self.panorama.get_frame()
|
|
|
- if frame is None:
|
|
|
- time.sleep(0.01)
|
|
|
- continue
|
|
|
-
|
|
|
- # 更新帧统计
|
|
|
- self._update_stats('frames_processed')
|
|
|
-
|
|
|
- frame_size = (frame.shape[1], frame.shape[0])
|
|
|
-
|
|
|
- # 周期性检测(暂停时跳过)
|
|
|
- if not self._paused and current_time - last_detection_time >= detection_interval:
|
|
|
- last_detection_time = current_time
|
|
|
-
|
|
|
- # 检测人体
|
|
|
- detections = self._detect_persons(frame)
|
|
|
- # 使用 BYTETracker 进行跟踪(失败时回退到位置匹配)
|
|
|
- detections = self._update_with_bytetrack(detections, frame, frame_size)
|
|
|
-
|
|
|
- # 更新检测统计
|
|
|
- if detections:
|
|
|
- self._update_stats('persons_detected', len(detections))
|
|
|
-
|
|
|
- # 处理检测结果
|
|
|
- if detections:
|
|
|
- self._process_detections(detections, frame, frame_size)
|
|
|
-
|
|
|
- # 处理当前跟踪目标(暂停时跳过PTZ控制)
|
|
|
- if not self._paused:
|
|
|
- self._process_current_target(frame, frame_size)
|
|
|
-
|
|
|
- # 清理过期目标
|
|
|
- self._cleanup_expired_targets()
|
|
|
-
|
|
|
- time.sleep(0.01)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"联动处理错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _init_byte_tracker(self):
|
|
|
- """初始化 BYTETracker"""
|
|
|
- with self._byte_tracker_init_lock:
|
|
|
- if self.byte_tracker is not None:
|
|
|
- return
|
|
|
- try:
|
|
|
- from ultralytics.trackers.byte_tracker import BYTETracker
|
|
|
- import types
|
|
|
- self._bt_args = types.SimpleNamespace(
|
|
|
- track_high_thresh=0.5,
|
|
|
- track_low_thresh=0.1,
|
|
|
- new_track_thresh=0.3,
|
|
|
- match_thresh=0.8,
|
|
|
- fuse_score=False,
|
|
|
- track_buffer=30,
|
|
|
- mot20=False,
|
|
|
- )
|
|
|
- self.byte_tracker = BYTETracker(args=self._bt_args)
|
|
|
- logger.info("[跟踪] BYTETracker 初始化成功")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"[跟踪] BYTETracker 初始化失败: {e},将使用简化位置匹配跟踪")
|
|
|
- self.byte_tracker = None
|
|
|
-
|
|
|
- def _update_with_bytetrack(self, detections: List[DetectedObject],
|
|
|
- frame: np.ndarray,
|
|
|
- frame_size: Tuple[int, int]) -> List[DetectedObject]:
|
|
|
- """
|
|
|
- 使用 ObjectDetector + BYTETracker 进行跟踪
|
|
|
- ByteTrack 失败时回退到旧位置匹配
|
|
|
- """
|
|
|
- conf_thr = DETECTION_CONFIG.get('confidence_threshold', 0.35)
|
|
|
- person_dets = [d for d in detections if d.class_name == 'person'
|
|
|
- and d.confidence >= conf_thr]
|
|
|
-
|
|
|
- self._init_byte_tracker()
|
|
|
- if self.byte_tracker is not None and person_dets:
|
|
|
- try:
|
|
|
- import torch
|
|
|
- dets_t = torch.tensor([[d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3], d.confidence, 0] for d in person_dets], dtype=torch.float32)
|
|
|
- class _R:
|
|
|
- def __init__(s, x):
|
|
|
- s._raw = x
|
|
|
- s.xywh = torch.stack([(x[:,0]+x[:,2])/2,(x[:,1]+x[:,3])/2,x[:,2]-x[:,0],x[:,3]-x[:,1]], dim=-1)
|
|
|
- s.conf = x[:, 4]; s.cls = x[:, 5].long()
|
|
|
- def __getitem__(s, i): return _R(s._raw[i])
|
|
|
- def __len__(s): return len(s.conf)
|
|
|
- tracks = self.byte_tracker.update(_R(dets_t), None)
|
|
|
- if tracks is not None and len(tracks) > 0:
|
|
|
- frame_w, frame_h = frame_size
|
|
|
- cx_, cy_ = frame_w / 2, frame_h / 2
|
|
|
- now = time.time()
|
|
|
- with self.targets_lock:
|
|
|
- self.tracking_targets.clear()
|
|
|
- for tr in tracks:
|
|
|
- tx1, ty1, tx2, ty2, tid, tsc = int(tr[0]), int(tr[1]), int(tr[2]), int(tr[3]), int(tr[4]), float(tr[5])
|
|
|
- cxc, cyc = (tx1+tx2)//2, (ty1+ty2)//2
|
|
|
- self.tracking_targets[tid] = TrackingTarget(
|
|
|
- track_id=tid, position=(cxc/frame_w, cyc/frame_h),
|
|
|
- last_update=now, area=(tx2-tx1)*(ty2-ty1),
|
|
|
- confidence=tsc, center_distance=(abs(cxc-cx_)/cx_ + abs(cyc-cy_)/cy_)/2 if cx_>0 else 0)
|
|
|
- # IOU match
|
|
|
- import numpy as np
|
|
|
- for tr in np.array(tracks):
|
|
|
- tx1, ty1, tx2, ty2, tid = int(tr[0]), int(tr[1]), int(tr[2]), int(tr[3]), int(tr[4])
|
|
|
- best = None
|
|
|
- for d in person_dets:
|
|
|
- dx1, dy1, dw, dh = d.bbox; dx2, dy2 = dx1+dw, dy1+dh
|
|
|
- ix1, iy1 = max(tx1,dx1), max(ty1,dy1); ix2, iy2 = min(tx2,dx2), min(ty2,dy2)
|
|
|
- if ix1<ix2 and iy1<iy2 and ((ix2-ix1)*(iy2-iy1))/((tx2-tx1)*(ty2-ty1)+dw*dh-(ix2-ix1)*(iy2-iy1)+1e-6) > 0.3:
|
|
|
- best = d
|
|
|
- if best is not None:
|
|
|
- best.track_id = tid
|
|
|
- return [d for d in person_dets if d.track_id is not None]
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"[跟踪] ByteTrack 执行异常: {e}")
|
|
|
-
|
|
|
- # ByteTrack 不可用/无结果 → 回退旧位置匹配
|
|
|
- self._update_tracking_targets(detections, frame_size)
|
|
|
- return detections
|
|
|
-
|
|
|
- def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
|
|
|
- """检测人体"""
|
|
|
- if not self.enable_detection or self.detector is None:
|
|
|
- return []
|
|
|
- return self.detector.detect_persons(frame)
|
|
|
-
|
|
|
- def _update_tracking_targets(self, detections: List[DetectedObject],
|
|
|
- frame_size: Tuple[int, int]):
|
|
|
- """更新跟踪目标(跨帧匹配,支持粘性跟踪)
|
|
|
-
|
|
|
- 改进:不再每轮清空目标,而是使用位置匹配关联连续帧的目标
|
|
|
- """
|
|
|
- current_time = time.time()
|
|
|
- frame_w, frame_h = frame_size
|
|
|
- center_x, center_y = frame_w / 2, frame_h / 2
|
|
|
-
|
|
|
- # 获取人员置信度阈值
|
|
|
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
|
|
|
-
|
|
|
- # 过滤有效人员
|
|
|
- valid_detections = []
|
|
|
- low_conf_count = 0
|
|
|
- for det in detections:
|
|
|
- if det.class_name != 'person':
|
|
|
- continue
|
|
|
- if det.confidence < person_threshold:
|
|
|
- low_conf_count += 1
|
|
|
- continue
|
|
|
- valid_detections.append(det)
|
|
|
-
|
|
|
- # 【调试日志】显示过滤结果
|
|
|
- if detections:
|
|
|
- logger.info(f"[跟踪] 检测到 {len(detections)} 个目标, 置信度>={person_threshold} 的有 {len(valid_detections)} 个 (过滤掉 {low_conf_count} 个)")
|
|
|
-
|
|
|
- if not valid_detections:
|
|
|
- return
|
|
|
-
|
|
|
- with self.targets_lock:
|
|
|
- # 匹配阈值:位置距离小于此值认为是同一目标
|
|
|
- MATCH_THRESHOLD = 0.15 # 画面比例
|
|
|
-
|
|
|
- # 已匹配的检测索引
|
|
|
- matched_det_indices = set()
|
|
|
-
|
|
|
- # 步骤1:尝试匹配现有目标
|
|
|
- for track_id, target in list(self.tracking_targets.items()):
|
|
|
- best_match_idx = None
|
|
|
- best_match_dist = MATCH_THRESHOLD
|
|
|
-
|
|
|
- for idx, det in enumerate(valid_detections):
|
|
|
- if idx in matched_det_indices:
|
|
|
- continue
|
|
|
-
|
|
|
- det_x = det.center[0] / frame_w
|
|
|
- det_y = det.center[1] / frame_h
|
|
|
-
|
|
|
- # 计算位置距离
|
|
|
- dist = math.sqrt(
|
|
|
- (det_x - target.position[0]) ** 2 +
|
|
|
- (det_y - target.position[1]) ** 2
|
|
|
- )
|
|
|
-
|
|
|
- if dist < best_match_dist:
|
|
|
- best_match_dist = dist
|
|
|
- best_match_idx = idx
|
|
|
-
|
|
|
-
|
|
|
- if best_match_idx is not None:
|
|
|
- # 找到匹配,更新目标
|
|
|
- det = valid_detections[best_match_idx]
|
|
|
- matched_det_indices.add(best_match_idx)
|
|
|
-
|
|
|
- x_ratio = det.center[0] / frame_w
|
|
|
- y_ratio = det.center[1] / frame_h
|
|
|
- _, _, width, height = det.bbox
|
|
|
- area = width * height
|
|
|
-
|
|
|
- dx = abs(det.center[0] - center_x) / center_x
|
|
|
- dy = abs(det.center[1] - center_y) / center_y
|
|
|
- center_distance = (dx + dy) / 2
|
|
|
-
|
|
|
- # 更新目标属性
|
|
|
- self.tracking_targets[track_id] = TrackingTarget(
|
|
|
- track_id=track_id,
|
|
|
- position=(x_ratio, y_ratio),
|
|
|
- last_update=current_time,
|
|
|
- area=area,
|
|
|
- confidence=det.confidence,
|
|
|
- center_distance=center_distance,
|
|
|
- person_info=target.person_info # 保留之前识别的信息
|
|
|
- )
|
|
|
-
|
|
|
- # 步骤2:为未匹配的检测创建新目标
|
|
|
- for idx, det in enumerate(valid_detections):
|
|
|
- if idx in matched_det_indices:
|
|
|
- continue
|
|
|
-
|
|
|
- x_ratio = det.center[0] / frame_w
|
|
|
- y_ratio = det.center[1] / frame_h
|
|
|
- _, _, width, height = det.bbox
|
|
|
- area = width * height
|
|
|
-
|
|
|
- dx = abs(det.center[0] - center_x) / center_x
|
|
|
- dy = abs(det.center[1] - center_y) / center_y
|
|
|
- center_distance = (dx + dy) / 2
|
|
|
-
|
|
|
- # 分配全局唯一track_id
|
|
|
- with self._track_id_lock:
|
|
|
- new_track_id = self._next_track_id
|
|
|
- self._next_track_id += 1
|
|
|
-
|
|
|
-
|
|
|
- det.track_id = new_track_id # 更新检测对象的track_id
|
|
|
-
|
|
|
- self.tracking_targets[new_track_id] = TrackingTarget(
|
|
|
- track_id=new_track_id,
|
|
|
- position=(x_ratio, y_ratio),
|
|
|
- last_update=current_time,
|
|
|
- area=area,
|
|
|
- confidence=det.confidence,
|
|
|
- center_distance=center_distance
|
|
|
- )
|
|
|
-
|
|
|
- def _process_detections(self, detections: List[DetectedObject],
|
|
|
- frame: np.ndarray, frame_size: Tuple[int, int]):
|
|
|
- """处理检测结果"""
|
|
|
- if self.on_person_detected:
|
|
|
- for det in detections:
|
|
|
- self.on_person_detected(det, frame)
|
|
|
-
|
|
|
- def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
|
|
|
- """处理当前跟踪目标"""
|
|
|
- with self.targets_lock:
|
|
|
- if not self.tracking_targets:
|
|
|
- self._set_state(TrackingState.IDLE)
|
|
|
- self.current_target = None
|
|
|
- return
|
|
|
-
|
|
|
- # 使用目标选择器选择最优目标
|
|
|
- self.current_target = self.target_selector.select_target(
|
|
|
- self.tracking_targets, frame_size
|
|
|
- )
|
|
|
-
|
|
|
- if self.current_target:
|
|
|
- # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
|
|
|
- if self.enable_ptz_tracking and self.enable_ptz_camera:
|
|
|
- self._set_state(TrackingState.TRACKING)
|
|
|
-
|
|
|
- x_ratio, y_ratio = self.current_target.position
|
|
|
-
|
|
|
- # 检查位置是否变化超过阈值
|
|
|
- should_move = True
|
|
|
- if self.last_ptz_position is not None:
|
|
|
- last_x, last_y = self.last_ptz_position
|
|
|
- if (abs(x_ratio - last_x) < self.ptz_position_threshold and
|
|
|
- abs(y_ratio - last_y) < self.ptz_position_threshold):
|
|
|
- should_move = False
|
|
|
-
|
|
|
- if should_move:
|
|
|
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- # 校准器返回的是可直接发送给球机的真实 PTZ 角度,不再应用 pan_flip
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
|
|
|
- self.ptz.goto_exact_position(pan, tilt, zoom)
|
|
|
- else:
|
|
|
- self.ptz.track_target(x_ratio, y_ratio)
|
|
|
- self.last_ptz_position = (x_ratio, y_ratio)
|
|
|
-
|
|
|
- def _cleanup_expired_targets(self):
|
|
|
- """清理过期目标"""
|
|
|
- current_time = time.time()
|
|
|
- timeout = self.config['tracking_timeout']
|
|
|
-
|
|
|
- with self.targets_lock:
|
|
|
- expired_ids = [
|
|
|
- target_id for target_id, target in self.tracking_targets.items()
|
|
|
- if current_time - target.last_update > timeout
|
|
|
- ]
|
|
|
-
|
|
|
- for target_id in expired_ids:
|
|
|
- del self.tracking_targets[target_id]
|
|
|
- if self.current_target and self.current_target.track_id == target_id:
|
|
|
- self.current_target = None
|
|
|
-
|
|
|
- def _set_state(self, state: TrackingState):
|
|
|
- """设置状态"""
|
|
|
- with self.state_lock:
|
|
|
- self.state = state
|
|
|
-
|
|
|
- def get_state(self) -> TrackingState:
|
|
|
- """获取状态"""
|
|
|
- with self.state_lock:
|
|
|
- return self.state
|
|
|
-
|
|
|
- def get_results(self) -> List[PersonInfo]:
|
|
|
- """
|
|
|
- 获取识别结果
|
|
|
- Returns:
|
|
|
- 人员信息列表
|
|
|
- """
|
|
|
- results = []
|
|
|
- while not self.result_queue.empty():
|
|
|
- try:
|
|
|
- results.append(self.result_queue.get_nowait())
|
|
|
- except queue.Empty:
|
|
|
- break
|
|
|
- return results
|
|
|
-
|
|
|
- def get_tracking_targets(self) -> List[TrackingTarget]:
|
|
|
- """获取当前跟踪目标"""
|
|
|
- with self.targets_lock:
|
|
|
- return list(self.tracking_targets.values())
|
|
|
-
|
|
|
- def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
|
|
|
- """
|
|
|
- 强制跟踪指定位置
|
|
|
- Args:
|
|
|
- x_ratio: X方向比例
|
|
|
- y_ratio: Y方向比例
|
|
|
- zoom: 变倍
|
|
|
- """
|
|
|
- if self.enable_ptz_tracking and self.enable_ptz_camera:
|
|
|
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- # 校准器返回的是可直接发送给球机的真实 PTZ 角度,不再应用 pan_flip
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- self.ptz.goto_exact_position(pan, tilt, zoom or self.ptz.ptz_config.get('default_zoom', 8))
|
|
|
- else:
|
|
|
- self.ptz.move_to_target(x_ratio, y_ratio, zoom)
|
|
|
-
|
|
|
- def capture_snapshot(self) -> Optional[np.ndarray]:
|
|
|
- """
|
|
|
- 抓拍快照
|
|
|
- Returns:
|
|
|
- 快照图像
|
|
|
- """
|
|
|
- return self.panorama.get_frame()
|
|
|
-
|
|
|
-
|
|
|
-class EventDrivenCoordinator(Coordinator):
|
|
|
- """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
|
|
|
-
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
- self.event_types = {
|
|
|
- 'intruder': True,
|
|
|
- 'crossline': True,
|
|
|
- 'motion': True,
|
|
|
- }
|
|
|
- self.event_queue = queue.Queue()
|
|
|
-
|
|
|
- def on_event(self, event_type: str, event_data: dict):
|
|
|
- if not self.event_types.get(event_type, False):
|
|
|
- return
|
|
|
- self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
|
|
|
-
|
|
|
- def _coordinator_worker(self):
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- try:
|
|
|
- event = self.event_queue.get(timeout=0.1)
|
|
|
- self._process_event(event)
|
|
|
- except queue.Empty:
|
|
|
- pass
|
|
|
-
|
|
|
- frame = self.panorama.get_frame()
|
|
|
- if frame is not None:
|
|
|
- frame_size = (frame.shape[1], frame.shape[0])
|
|
|
- detections = self._detect_persons(frame)
|
|
|
- if detections:
|
|
|
- # 更新跟踪目标(track_id 在此方法内分配)
|
|
|
- self._update_tracking_targets(detections, frame_size)
|
|
|
- self._process_current_target(frame, frame_size)
|
|
|
-
|
|
|
- self._cleanup_expired_targets()
|
|
|
- except Exception as e:
|
|
|
- print(f"事件处理错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _process_event(self, event: dict):
|
|
|
- event_type = event['type']
|
|
|
- event_data = event['data']
|
|
|
- print(f"处理事件: {event_type}")
|
|
|
-
|
|
|
- if event_type == 'intruder' and 'position' in event_data:
|
|
|
- x_ratio, y_ratio = event_data['position']
|
|
|
- self.force_track_position(x_ratio, y_ratio)
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class PTZCommand:
|
|
|
- """PTZ控制命令"""
|
|
|
- pan: float
|
|
|
- tilt: float
|
|
|
- zoom: int
|
|
|
- x_ratio: float = 0.0
|
|
|
- y_ratio: float = 0.0
|
|
|
- use_calibration: bool = True
|
|
|
- track_id: Optional[int] = None # 跟踪目标ID(用于配对图片保存)
|
|
|
- batch_id: Optional[str] = None # 批次ID(用于配对图片保存)
|
|
|
- person_index: int = -1 # 人员在批次中的序号(用于配对图片保存)
|
|
|
-
|
|
|
-
|
|
|
-class AsyncCoordinator(Coordinator):
|
|
|
- """
|
|
|
- 异步联动控制器 — 检测线程与PTZ控制线程分离
|
|
|
-
|
|
|
- 改进:
|
|
|
- 1. 检测线程:持续读取全景帧 + YOLO推理
|
|
|
- 2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
|
|
|
- 3. 两线程通过 queue 通信,互不阻塞
|
|
|
- 4. PTZ位置确认:移动后等待球机到位并验证帧
|
|
|
- """
|
|
|
-
|
|
|
- PTZ_CONFIRM_WAIT = 0.3 # PTZ命令后等待稳定的秒数
|
|
|
- PTZ_CONFIRM_TIMEOUT = 2.0 # PTZ位置确认超时
|
|
|
- PTZ_COMMAND_COOLDOWN = 0.15 # PTZ命令最小间隔秒数
|
|
|
-
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
-
|
|
|
- # PTZ命令队列(检测→PTZ)
|
|
|
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
|
|
|
-
|
|
|
- # 线程
|
|
|
- self._detection_thread = None
|
|
|
- self._ptz_thread = None
|
|
|
-
|
|
|
- # PTZ确认回调
|
|
|
- self._on_ptz_confirmed: Optional[Callable] = None
|
|
|
-
|
|
|
- # 上次PTZ命令时间(添加线程锁保护)
|
|
|
- self._last_ptz_time = 0.0
|
|
|
- self._last_ptz_time_lock = threading.Lock()
|
|
|
-
|
|
|
- # 帧获取配置
|
|
|
- self._frame_config = {
|
|
|
- 'wait_interval': PTZ_CONFIG.get('frame_wait_interval', 0.2),
|
|
|
- 'max_attempts': PTZ_CONFIG.get('frame_max_attempts', 8),
|
|
|
- 'min_clarity': PTZ_CONFIG.get('min_clarity', 200),
|
|
|
- }
|
|
|
-
|
|
|
- # 配对图片保存器
|
|
|
- self._enable_paired_saving = DETECTION_CONFIG.get('enable_paired_saving', False)
|
|
|
- self._paired_saver: Optional[PairedImageSaver] = None
|
|
|
- self._current_batch_id: Optional[str] = None
|
|
|
- self._person_ptz_index: Dict[int, int] = {} # track_id -> person_index
|
|
|
-
|
|
|
- if self._enable_paired_saving:
|
|
|
- save_dir = DETECTION_CONFIG.get('paired_image_dir', '/home/admin/dsh/paired_images')
|
|
|
- time_window = DETECTION_CONFIG.get('paired_time_window', 5.0)
|
|
|
- self._paired_saver = get_paired_saver(base_dir=save_dir, time_window=time_window)
|
|
|
- logger.info(f"[AsyncCoordinator] 配对图片保存已启用: 目录={save_dir}, 时间窗口={time_window}s")
|
|
|
-
|
|
|
- def start(self) -> bool:
|
|
|
- """启动联动(覆盖父类,启动双线程)"""
|
|
|
- if not self.panorama.connect():
|
|
|
- print("连接全景摄像头失败")
|
|
|
- return False
|
|
|
-
|
|
|
- if self.enable_ptz_camera:
|
|
|
- if not self.ptz.connect():
|
|
|
- print("连接球机失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- return False
|
|
|
-
|
|
|
- # 启动球机RTSP流(用于球机端人体检测)
|
|
|
- if self.enable_ptz_detection:
|
|
|
- if not self.ptz.start_stream_rtsp():
|
|
|
- print("球机RTSP流启动失败,禁用球机端检测功能")
|
|
|
- self.enable_ptz_detection = False
|
|
|
- else:
|
|
|
- # 初始化球机端人体检测器
|
|
|
- self._init_ptz_detector()
|
|
|
- else:
|
|
|
- print("PTZ球机功能已禁用")
|
|
|
-
|
|
|
- if not self.panorama.start_stream_rtsp():
|
|
|
- print("RTSP视频流启动失败,尝试SDK方式...")
|
|
|
- if not self.panorama.start_stream():
|
|
|
- print("启动视频流失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- if self.enable_ptz_camera:
|
|
|
- self.ptz.disconnect()
|
|
|
- return False
|
|
|
-
|
|
|
- self.running = True
|
|
|
-
|
|
|
- # 启动检测线程
|
|
|
- self._detection_thread = threading.Thread(
|
|
|
- target=self._detection_worker, name="detection-worker", daemon=True)
|
|
|
- self._detection_thread.start()
|
|
|
-
|
|
|
- # 启动PTZ控制线程
|
|
|
- if self.enable_ptz_camera and self.enable_ptz_tracking:
|
|
|
- self._ptz_thread = threading.Thread(
|
|
|
- target=self._ptz_worker, name="ptz-worker", daemon=True)
|
|
|
- self._ptz_thread.start()
|
|
|
-
|
|
|
- print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
|
|
|
- return True
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- """停止联动"""
|
|
|
- self.running = False
|
|
|
-
|
|
|
- # 清空PTZ队列,让工作线程退出
|
|
|
- while not self._ptz_queue.empty():
|
|
|
- try:
|
|
|
- self._ptz_queue.get_nowait()
|
|
|
- except queue.Empty:
|
|
|
- break
|
|
|
-
|
|
|
- if self._detection_thread:
|
|
|
- self._detection_thread.join(timeout=3)
|
|
|
- if self._ptz_thread:
|
|
|
- self._ptz_thread.join(timeout=3)
|
|
|
-
|
|
|
- # 停止父类线程(如果有的话)
|
|
|
- if self.coordinator_thread:
|
|
|
- self.coordinator_thread.join(timeout=1)
|
|
|
-
|
|
|
- # 关闭配对保存器
|
|
|
- if self._paired_saver is not None:
|
|
|
- self._paired_saver.close()
|
|
|
- self._paired_saver = None
|
|
|
-
|
|
|
- # 清理 BYTETracker
|
|
|
- self.byte_tracker = None
|
|
|
-
|
|
|
- self.panorama.disconnect()
|
|
|
- if self.enable_ptz_camera:
|
|
|
- self.ptz.disconnect()
|
|
|
-
|
|
|
- self._print_stats()
|
|
|
- print("异步联动系统已停止")
|
|
|
-
|
|
|
- def _detection_worker(self):
|
|
|
- """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
|
|
|
- # 暂停时阻塞等待恢复
|
|
|
- self._paused_event.wait()
|
|
|
-
|
|
|
- last_detection_time = 0
|
|
|
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
|
|
|
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
|
|
|
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
|
|
|
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
|
|
|
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
|
|
|
- frame_count = 0
|
|
|
- last_log_time = time.time()
|
|
|
- log_interval = 5.0 # 每5秒打印一次帧率统计
|
|
|
- detection_run_count = 0
|
|
|
- detection_person_count = 0
|
|
|
- detection_last_seen = 0
|
|
|
- last_no_detect_log_time = 0
|
|
|
- no_detect_log_interval = 30.0
|
|
|
-
|
|
|
- with self.stats_lock:
|
|
|
- self.stats['start_time'] = time.time()
|
|
|
-
|
|
|
- if self.detector is None:
|
|
|
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化! 检测功能不可用, 请检查 YOLO 模型是否正确加载")
|
|
|
- elif not self.enable_detection:
|
|
|
- logger.warning("[检测线程] ⚠️ 人体检测已禁用 (enable_detection=False)")
|
|
|
- else:
|
|
|
- logger.info(f"[检测线程] ✓ 人体检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s), PTZ冷却={ptz_cooldown}s")
|
|
|
-
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- current_time = time.time()
|
|
|
- frame = self.panorama.get_frame()
|
|
|
- if frame is None:
|
|
|
- time.sleep(0.01)
|
|
|
- continue
|
|
|
-
|
|
|
- frame_count += 1
|
|
|
- self._update_stats('frames_processed')
|
|
|
- frame_size = (frame.shape[1], frame.shape[0])
|
|
|
-
|
|
|
- if current_time - last_log_time >= log_interval:
|
|
|
- elapsed = current_time - last_log_time
|
|
|
- fps = frame_count / elapsed if elapsed > 0 else 0
|
|
|
-
|
|
|
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
|
|
|
- stats_parts = [f"帧率={fps:.1f}fps", f"处理帧={frame_count}", f"状态={state_str}"]
|
|
|
-
|
|
|
- if self.detector is None:
|
|
|
- stats_parts.append("检测器=未加载")
|
|
|
- elif not self.enable_detection:
|
|
|
- stats_parts.append("检测=已禁用")
|
|
|
- else:
|
|
|
- if detection_last_seen > 0:
|
|
|
- ago = int(current_time - detection_last_seen)
|
|
|
- stats_parts.append(f"检测轮次={detection_run_count}(最后有人={ago}s前)")
|
|
|
- else:
|
|
|
- stats_parts.append(f"检测轮次={detection_run_count}(未检出)")
|
|
|
-
|
|
|
- with self.targets_lock:
|
|
|
- target_count = len(self.tracking_targets)
|
|
|
- stats_parts.append(f"跟踪目标={target_count}")
|
|
|
-
|
|
|
- logger.info(f"[检测线程] {', '.join(stats_parts)}")
|
|
|
- frame_count = 0
|
|
|
- last_log_time = current_time
|
|
|
-
|
|
|
- # 周期性检测(暂停时跳过检测和PTZ命令)
|
|
|
- if not self._paused and current_time - last_detection_time >= detection_interval:
|
|
|
- last_detection_time = current_time
|
|
|
- detection_run_count += 1
|
|
|
-
|
|
|
- # YOLO 人体检测
|
|
|
- detections = self._detect_persons(frame)
|
|
|
- # 使用 BYTETracker 进行跟踪(失败时回退到位置匹配)
|
|
|
- detections = self._update_with_bytetrack(detections, frame, frame_size)
|
|
|
-
|
|
|
- if detections:
|
|
|
- self._update_stats('persons_detected', len(detections))
|
|
|
- detection_person_count += 1
|
|
|
- detection_last_seen = current_time
|
|
|
-
|
|
|
- # 配对图片保存:创建新批次
|
|
|
- if detections and self._enable_paired_saving and self._paired_saver is not None:
|
|
|
- self._create_detection_batch(frame, detections, frame_size)
|
|
|
-
|
|
|
- # 打印检测日志(使用连续序号,与图片标记一致)
|
|
|
- if detections:
|
|
|
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
|
|
|
- person_idx = 0
|
|
|
- for t in detections:
|
|
|
- # detections 是 DetectedObject,使用 center 计算位置
|
|
|
- x_ratio = t.center[0] / frame_size[0]
|
|
|
- y_ratio = t.center[1] / frame_size[1]
|
|
|
- _, _, w, h = t.bbox
|
|
|
- area = w * h
|
|
|
- # 只对达到阈值的人员打印日志并分配序号
|
|
|
- if t.class_name == 'person' and t.confidence >= person_threshold:
|
|
|
- logger.info(
|
|
|
- f"[检测] ✓ person_{person_idx} "
|
|
|
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
|
|
|
- f"面积={area} 置信度={t.confidence:.2f}"
|
|
|
- )
|
|
|
- person_idx += 1
|
|
|
- else:
|
|
|
- logger.debug(
|
|
|
- f"[检测] · 目标ID={t.track_id}({t.class_name}) "
|
|
|
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
|
|
|
- f"置信度={t.confidence:.2f}(低于阈值{person_threshold})"
|
|
|
- )
|
|
|
- else:
|
|
|
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
|
|
|
- logger.info(
|
|
|
- f"[检测] · YOLO检测运行正常, 本轮未检测到人员 "
|
|
|
- f"(累计检测{detection_run_count}轮, 检测到人{detection_person_count}轮)"
|
|
|
- )
|
|
|
- last_no_detect_log_time = current_time
|
|
|
-
|
|
|
- if detections:
|
|
|
- self._process_detections(detections, frame, frame_size)
|
|
|
-
|
|
|
- # 为每个检测到的人发送PTZ命令(不再只选一个)
|
|
|
- if self.enable_ptz_tracking and self.enable_ptz_camera:
|
|
|
- targets = self._get_all_valid_targets()
|
|
|
- for target in targets:
|
|
|
- self._send_ptz_command_with_log(target, frame_size)
|
|
|
- elif not detections and self.current_target:
|
|
|
- # 目标消失,切回IDLE
|
|
|
- self._set_state(TrackingState.IDLE)
|
|
|
- logger.info("[检测] 目标丢失,球机进入IDLE状态")
|
|
|
- self.current_target = None
|
|
|
-
|
|
|
- self._cleanup_expired_targets()
|
|
|
- time.sleep(0.01)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"检测线程错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _init_ptz_detector(self):
|
|
|
- """初始化球机端人体检测器"""
|
|
|
- try:
|
|
|
- model_path = DETECTION_CONFIG.get('model_path')
|
|
|
- model_type = DETECTION_CONFIG.get('model_type', 'auto')
|
|
|
- conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
|
|
|
-
|
|
|
- if model_path:
|
|
|
- self.ptz_detector = PTZPersonDetector(
|
|
|
- model_path=model_path,
|
|
|
- model_type=model_type,
|
|
|
- confidence_threshold=conf_threshold
|
|
|
- )
|
|
|
- self.auto_zoom_controller = PTZAutoZoomController(
|
|
|
- ptz_camera=self.ptz,
|
|
|
- detector=self.ptz_detector,
|
|
|
- config=self.auto_zoom_config
|
|
|
- )
|
|
|
- print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
|
|
|
- else:
|
|
|
- print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
|
|
|
- self.enable_ptz_detection = False
|
|
|
- except Exception as e:
|
|
|
- print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
|
|
|
- self.enable_ptz_detection = False
|
|
|
-
|
|
|
- def _deduplicate_detections(self, detections: List[DetectedObject],
|
|
|
- frame_size: Tuple[int, int]) -> List[DetectedObject]:
|
|
|
- """
|
|
|
- 去重检测结果(按位置合并重叠的检测框)
|
|
|
-
|
|
|
- Args:
|
|
|
- detections: 检测列表
|
|
|
- frame_size: 帧尺寸
|
|
|
-
|
|
|
- Returns:
|
|
|
- 去重后的人员检测列表
|
|
|
- """
|
|
|
- # 过滤有效人员
|
|
|
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
|
|
|
- valid_persons = [d for d in detections
|
|
|
- if d.class_name == 'person' and d.confidence >= person_threshold]
|
|
|
-
|
|
|
- if not valid_persons:
|
|
|
- return []
|
|
|
-
|
|
|
- # 去重:按位置合并重叠的检测框
|
|
|
- DEDUP_DISTANCE = 0.05 # 画面比例 5%
|
|
|
- dedup_persons = []
|
|
|
-
|
|
|
- for det in valid_persons:
|
|
|
- det_x = det.center[0] / frame_size[0]
|
|
|
- det_y = det.center[1] / frame_size[1]
|
|
|
-
|
|
|
- # 检查是否与已有人员重叠
|
|
|
- is_duplicate = False
|
|
|
- for i, existing in enumerate(dedup_persons):
|
|
|
- ex_x = existing.center[0] / frame_size[0]
|
|
|
- ex_y = existing.center[1] / frame_size[1]
|
|
|
-
|
|
|
- dist = math.sqrt((det_x - ex_x)**2 + (det_y - ex_y)**2)
|
|
|
- if dist < DEDUP_DISTANCE:
|
|
|
- # 重叠,保留置信度更高的
|
|
|
- is_duplicate = True
|
|
|
- if det.confidence > existing.confidence:
|
|
|
- dedup_persons[i] = det
|
|
|
- break
|
|
|
-
|
|
|
- if not is_duplicate:
|
|
|
- dedup_persons.append(det)
|
|
|
-
|
|
|
-
|
|
|
- return dedup_persons
|
|
|
-
|
|
|
- def _create_detection_batch(self, frame: np.ndarray,
|
|
|
- detections: List[DetectedObject],
|
|
|
- frame_size: Tuple[int, int]) -> List[DetectedObject]:
|
|
|
- """
|
|
|
- 创建检测批次,用于配对图片保存
|
|
|
-
|
|
|
- Args:
|
|
|
- frame: 全景帧
|
|
|
- detections: 检测到的人员列表
|
|
|
- frame_size: 帧尺寸
|
|
|
-
|
|
|
- Returns:
|
|
|
- 去重后的人员检测列表
|
|
|
- """
|
|
|
- if self._paired_saver is None:
|
|
|
- return []
|
|
|
-
|
|
|
- # 过滤有效人员(必须是 person 且置信度 >= 阈值)
|
|
|
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
|
|
|
- valid_persons = []
|
|
|
- for det in detections:
|
|
|
- # 只处理 class_name 为 person 的目标,排除安全帽、反光衣等
|
|
|
- if det.class_name == 'person' and det.confidence >= person_threshold:
|
|
|
- valid_persons.append(det)
|
|
|
-
|
|
|
- # 【关键修复】去重:按位置合并重叠的检测框
|
|
|
- # 如果两个检测框的中心距离小于阈值,只保留置信度更高的
|
|
|
- DEDUP_DISTANCE = 0.05 # 画面比例 5%
|
|
|
- dedup_persons = []
|
|
|
- for det in valid_persons:
|
|
|
- det_x = det.center[0] / frame_size[0]
|
|
|
- det_y = det.center[1] / frame_size[1]
|
|
|
-
|
|
|
- # 检查是否与已有人员重叠
|
|
|
- is_duplicate = False
|
|
|
- for i, existing in enumerate(dedup_persons):
|
|
|
- ex_x = existing.center[0] / frame_size[0]
|
|
|
- ex_y = existing.center[1] / frame_size[1]
|
|
|
-
|
|
|
- dist = math.sqrt((det_x - ex_x)**2 + (det_y - ex_y)**2)
|
|
|
- if dist < DEDUP_DISTANCE:
|
|
|
- # 重叠,保留置信度更高的
|
|
|
- is_duplicate = True
|
|
|
- if det.confidence > existing.confidence:
|
|
|
- dedup_persons[i] = det
|
|
|
- break
|
|
|
-
|
|
|
- if not is_duplicate:
|
|
|
- dedup_persons.append(det)
|
|
|
-
|
|
|
-
|
|
|
- if not dedup_persons:
|
|
|
- logger.debug(f"[配对保存] 无有效人员(阈值={person_threshold}),跳过批次创建")
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
- logger.info(f"[配对保存] 检测结果去重: {len(valid_persons)} -> {len(dedup_persons)} 个人员")
|
|
|
-
|
|
|
- # 构建人员信息列表(只包含去重后的人员)
|
|
|
- persons = []
|
|
|
- self._person_ptz_index = {} # 重置索引映射
|
|
|
-
|
|
|
- for i, det in enumerate(dedup_persons):
|
|
|
- x_ratio = det.center[0] / frame_size[0]
|
|
|
- y_ratio = det.center[1] / frame_size[1]
|
|
|
-
|
|
|
- person_info = {
|
|
|
- 'track_id': det.track_id,
|
|
|
- 'position': (x_ratio, y_ratio),
|
|
|
- 'bbox': (det.bbox[0], det.bbox[1],
|
|
|
- det.bbox[0] + det.bbox[2],
|
|
|
- det.bbox[1] + det.bbox[3]),
|
|
|
- 'confidence': det.confidence
|
|
|
- }
|
|
|
- persons.append(person_info)
|
|
|
- self._person_ptz_index[det.track_id] = i
|
|
|
-
|
|
|
- # 创建新批次
|
|
|
- batch_id = self._paired_saver.start_new_batch(frame, persons)
|
|
|
- if batch_id:
|
|
|
- self._current_batch_id = batch_id
|
|
|
- logger.info(f"[配对保存] 创建批次: {batch_id}, 有效人员={len(persons)}/{len(detections)}")
|
|
|
-
|
|
|
- return dedup_persons
|
|
|
-
|
|
|
- def _save_ptz_image_for_person(self, track_id: int,
|
|
|
- ptz_frame: np.ndarray,
|
|
|
- ptz_position: Tuple[float, float, int]):
|
|
|
- """
|
|
|
- 保存球机聚焦图片到对应批次
|
|
|
-
|
|
|
- Args:
|
|
|
- track_id: 人员跟踪ID
|
|
|
- ptz_frame: 球机帧
|
|
|
- ptz_position: PTZ位置 (pan, tilt, zoom)
|
|
|
- """
|
|
|
- if (self._paired_saver is None or
|
|
|
- self._current_batch_id is None or
|
|
|
- track_id not in self._person_ptz_index):
|
|
|
- return
|
|
|
-
|
|
|
- person_index = self._person_ptz_index[track_id]
|
|
|
-
|
|
|
- self._paired_saver.save_ptz_image(
|
|
|
- batch_id=self._current_batch_id,
|
|
|
- person_index=person_index,
|
|
|
- ptz_frame=ptz_frame,
|
|
|
- ptz_position=ptz_position,
|
|
|
- ptz_bbox=getattr(self, '_last_ptz_bbox', None)
|
|
|
- )
|
|
|
-
|
|
|
- def _save_ptz_image_for_person_batch(self, batch_id: str, person_index: int,
|
|
|
- ptz_frame: np.ndarray,
|
|
|
- ptz_position: Tuple[float, float, int],
|
|
|
- ptz_frame_marked: np.ndarray = None):
|
|
|
- """
|
|
|
- 保存球机聚焦图片到指定批次(直接使用 batch_id,不依赖当前批次)
|
|
|
-
|
|
|
- Args:
|
|
|
- batch_id: 批次ID
|
|
|
- person_index: 人员序号
|
|
|
- ptz_frame: 球机原始帧
|
|
|
- ptz_position: PTZ位置 (pan, tilt, zoom)
|
|
|
- ptz_frame_marked: 球机标记帧(可选,不传则在内部标记)
|
|
|
- """
|
|
|
- if self._paired_saver is None:
|
|
|
- return
|
|
|
-
|
|
|
- self._paired_saver.save_ptz_image(
|
|
|
- batch_id=batch_id,
|
|
|
- person_index=person_index,
|
|
|
- ptz_frame=ptz_frame,
|
|
|
- ptz_position=ptz_position,
|
|
|
- ptz_bbox=getattr(self, '_last_ptz_bbox', None),
|
|
|
- ptz_frame_marked=ptz_frame_marked
|
|
|
- )
|
|
|
-
|
|
|
- def _ptz_worker(self):
|
|
|
- """PTZ控制线程:从队列接收命令并控制球机"""
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- # 暂停时等待恢复
|
|
|
- if self._paused:
|
|
|
- self._paused_event.wait()
|
|
|
- continue
|
|
|
-
|
|
|
- try:
|
|
|
- cmd = self._ptz_queue.get(timeout=0.1)
|
|
|
- except queue.Empty:
|
|
|
- continue
|
|
|
-
|
|
|
- # 执行PTZ命令(batch_id 和 person_index 已在命令中)
|
|
|
- self._execute_ptz_command(cmd)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"PTZ控制线程错误: {e}")
|
|
|
- time.sleep(0.05)
|
|
|
-
|
|
|
- def _select_tracking_target(self) -> Optional[TrackingTarget]:
|
|
|
- """选择当前跟踪目标"""
|
|
|
- with self.targets_lock:
|
|
|
- if not self.tracking_targets:
|
|
|
- self._set_state(TrackingState.IDLE)
|
|
|
- self.current_target = None
|
|
|
- return None
|
|
|
-
|
|
|
- # 使用目标选择器选择最优目标
|
|
|
- self.current_target = self.target_selector.select_target(
|
|
|
- self.tracking_targets
|
|
|
- )
|
|
|
-
|
|
|
- return self.current_target
|
|
|
-
|
|
|
- def _get_all_valid_targets(self) -> List[TrackingTarget]:
|
|
|
- """
|
|
|
- 获取所有有效的检测目标(用于多目标PTZ定位)
|
|
|
- 返回按优先级排序的目标列表
|
|
|
- """
|
|
|
- with self.targets_lock:
|
|
|
- if not self.tracking_targets:
|
|
|
- self._set_state(TrackingState.IDLE)
|
|
|
- self.current_target = None
|
|
|
- return []
|
|
|
-
|
|
|
- # 按得分排序所有目标
|
|
|
- targets = list(self.tracking_targets.values())
|
|
|
- targets.sort(key=lambda t: t.score, reverse=True)
|
|
|
-
|
|
|
- if targets:
|
|
|
- self._set_state(TrackingState.TRACKING)
|
|
|
- self.current_target = targets[0] # 第一个作为当前目标
|
|
|
-
|
|
|
- return targets
|
|
|
-
|
|
|
- def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
|
|
|
- """将跟踪目标转化为PTZ命令放入队列"""
|
|
|
- x_ratio, y_ratio = target.position
|
|
|
-
|
|
|
- # 检查位置变化是否超过阈值
|
|
|
- if self.last_ptz_position is not None:
|
|
|
- last_x, last_y = self.last_ptz_position
|
|
|
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
|
|
|
- abs(y_ratio - last_y) < self.ptz_position_threshold:
|
|
|
- return
|
|
|
-
|
|
|
- # 冷却检查(线程安全)
|
|
|
- current_time = time.time()
|
|
|
- with self._last_ptz_time_lock:
|
|
|
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
|
|
|
- return
|
|
|
-
|
|
|
- cmd = PTZCommand(
|
|
|
- pan=0, tilt=0, zoom=0,
|
|
|
- x_ratio=x_ratio, y_ratio=y_ratio,
|
|
|
- use_calibration=self.enable_calibration
|
|
|
- )
|
|
|
-
|
|
|
- try:
|
|
|
- self._ptz_queue.put_nowait(cmd)
|
|
|
- self.last_ptz_position = (x_ratio, y_ratio)
|
|
|
- except queue.Full:
|
|
|
- pass # 丢弃命令,下一个检测周期会重发
|
|
|
-
|
|
|
- def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
|
|
|
- """发送PTZ命令并打印日志"""
|
|
|
- x_ratio, y_ratio = target.position
|
|
|
-
|
|
|
- # 冷却检查(线程安全)
|
|
|
- current_time = time.time()
|
|
|
- with self._last_ptz_time_lock:
|
|
|
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
|
|
|
- return
|
|
|
-
|
|
|
- # 位置变化阈值检查
|
|
|
- if self.last_ptz_position is not None:
|
|
|
- last_x, last_y = self.last_ptz_position
|
|
|
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
|
|
|
- abs(y_ratio - last_y) < self.ptz_position_threshold:
|
|
|
- return
|
|
|
-
|
|
|
- # 计算PTZ角度(用于日志)
|
|
|
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
|
|
|
- coord_type = "校准坐标"
|
|
|
- else:
|
|
|
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
|
|
|
- coord_type = "估算坐标"
|
|
|
-
|
|
|
- # 获取当前批次信息和人员序号
|
|
|
- batch_id = self._current_batch_id if self._enable_paired_saving else None
|
|
|
- person_index = self._person_ptz_index.get(target.track_id, -1) if self._enable_paired_saving else -1
|
|
|
-
|
|
|
- cmd = PTZCommand(
|
|
|
- pan=0, tilt=0, zoom=0,
|
|
|
- x_ratio=x_ratio, y_ratio=y_ratio,
|
|
|
- use_calibration=self.enable_calibration,
|
|
|
- track_id=target.track_id, # 传递跟踪ID
|
|
|
- batch_id=batch_id, # 传递批次ID
|
|
|
- person_index=person_index # 传递人员序号
|
|
|
- )
|
|
|
-
|
|
|
- try:
|
|
|
- self._ptz_queue.put_nowait(cmd)
|
|
|
- self.last_ptz_position = (x_ratio, y_ratio) # 更新位置记录
|
|
|
- self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
|
|
|
- logger.info(
|
|
|
- f"[PTZ] 命令已发送: 目标ID={target.track_id} "
|
|
|
- f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
|
|
|
- f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
|
|
|
- )
|
|
|
- except queue.Full:
|
|
|
- logger.warning("[PTZ] 命令队列满,丢弃本次命令")
|
|
|
-
|
|
|
- def _execute_ptz_command(self, cmd: PTZCommand):
|
|
|
- """
|
|
|
- 执行PTZ命令(在PTZ线程中)
|
|
|
-
|
|
|
- Args:
|
|
|
- cmd: PTZ命令(包含 batch_id, person_index, track_id 用于配对保存)
|
|
|
- """
|
|
|
- # 更新最后执行时间(线程安全)
|
|
|
- with self._last_ptz_time_lock:
|
|
|
- self._last_ptz_time = time.time()
|
|
|
-
|
|
|
- # 从命令中提取配对保存相关信息
|
|
|
- track_id = cmd.track_id
|
|
|
- batch_id = cmd.batch_id
|
|
|
- person_index = cmd.person_index
|
|
|
-
|
|
|
- if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- # 校准器返回的是可直接发送给球机的真实 PTZ 角度,不再应用 pan_flip
|
|
|
- pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
|
|
|
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
|
|
|
- else:
|
|
|
- pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
|
|
|
-
|
|
|
- self._set_state(TrackingState.TRACKING)
|
|
|
- logger.info(
|
|
|
- f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
|
|
|
- f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}), "
|
|
|
- f"batch={batch_id}, person={person_index})"
|
|
|
- )
|
|
|
-
|
|
|
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
|
|
|
-
|
|
|
- if success:
|
|
|
- # 等待球机物理移动到位(增加额外等待确保画面清晰)
|
|
|
- time.sleep(self.PTZ_CONFIRM_WAIT)
|
|
|
-
|
|
|
- # 球机端人体检测与自动对焦
|
|
|
- final_pan, final_tilt, final_zoom = pan, tilt, zoom
|
|
|
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
|
|
|
- auto_zoom_result = self._auto_zoom_person(pan, tilt, zoom)
|
|
|
- if auto_zoom_result != zoom:
|
|
|
- final_zoom = auto_zoom_result
|
|
|
- # 自动变焦后再次等待画面稳定
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- # 获取清晰的球机画面(尝试多次获取最新帧)
|
|
|
- ptz_frame = self._get_clear_ptz_frame()
|
|
|
-
|
|
|
- # 保存球机图片到配对批次(使用命令中的 batch_id 和 person_index)
|
|
|
- if self._enable_paired_saving and batch_id is not None and person_index >= 0 and ptz_frame is not None:
|
|
|
- # 使用球机端检测器检测人体并标记(用于标记图)
|
|
|
- ptz_frame_marked = self._mark_ptz_frame_with_detection(ptz_frame, person_index=person_index)
|
|
|
- # 传入原始帧保存为原图,标记帧保存为标记图
|
|
|
- self._save_ptz_image_for_person_batch(batch_id, person_index, ptz_frame, (final_pan, final_tilt, final_zoom), ptz_frame_marked=ptz_frame_marked)
|
|
|
- elif self._enable_paired_saving:
|
|
|
- logger.warning(f"[配对保存] 跳过球机图保存: batch_id={batch_id}, person_index={person_index}, frame={ptz_frame is not None}")
|
|
|
-
|
|
|
- logger.info(f"[PTZ] 到位确认完成: pan={final_pan:.1f}° tilt={final_tilt:.1f}° zoom={final_zoom}")
|
|
|
- else:
|
|
|
- logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
|
|
|
-
|
|
|
- def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int) -> int:
|
|
|
- """
|
|
|
- 自动对焦人体
|
|
|
- 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
|
|
|
-
|
|
|
- Returns:
|
|
|
- 最终的 zoom 值
|
|
|
- """
|
|
|
- if self.auto_zoom_controller is None:
|
|
|
- return initial_zoom
|
|
|
-
|
|
|
- logger.info("[AutoZoom] 开始自动对焦...")
|
|
|
-
|
|
|
- try:
|
|
|
- success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
|
|
|
- get_frame_func=self.ptz.get_frame,
|
|
|
- max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
|
|
|
- )
|
|
|
-
|
|
|
- if success:
|
|
|
- logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
|
|
|
- return final_zoom
|
|
|
- else:
|
|
|
- logger.warning("[AutoZoom] 自动对焦未能定位人体")
|
|
|
- return initial_zoom
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[AutoZoom] 自动对焦异常: {e}")
|
|
|
- return initial_zoom
|
|
|
-
|
|
|
- def _get_clear_ptz_frame(self, max_attempts: int = None, wait_interval: float = None) -> Optional[np.ndarray]:
|
|
|
- """
|
|
|
- 获取清晰的球机画面
|
|
|
- 尝试多次获取,丢弃模糊/过渡帧
|
|
|
-
|
|
|
- Args:
|
|
|
- max_attempts: 最大尝试次数(默认从配置读取)
|
|
|
- wait_interval: 每次等待间隔(默认从配置读取)
|
|
|
-
|
|
|
- Returns:
|
|
|
- 清晰的球机帧或 None
|
|
|
- """
|
|
|
- # 使用配置值或默认值
|
|
|
- cfg = self._frame_config
|
|
|
- max_attempts = max_attempts or cfg.get('max_attempts', 8)
|
|
|
- wait_interval = wait_interval or cfg.get('wait_interval', 0.2)
|
|
|
- min_clarity = cfg.get('min_clarity', 200)
|
|
|
-
|
|
|
- best_frame = None
|
|
|
- best_score = -1
|
|
|
-
|
|
|
- # 先刷新缓冲区,丢弃旧帧
|
|
|
- logger.debug("[帧获取] 刷新RTSP缓冲区...")
|
|
|
- for _ in range(5):
|
|
|
- self.ptz.get_frame()
|
|
|
- time.sleep(0.05)
|
|
|
-
|
|
|
- # 尝试获取清晰帧
|
|
|
- for i in range(max_attempts):
|
|
|
- frame = self.ptz.get_frame()
|
|
|
- if frame is not None:
|
|
|
- # 立即复制帧,防止 RTSP 流更新导致帧被覆盖
|
|
|
- frame_copy = frame.copy()
|
|
|
-
|
|
|
- # 使用拉普拉斯算子评估图像清晰度
|
|
|
- gray = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2GRAY)
|
|
|
- laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
|
|
-
|
|
|
- logger.debug(f"[帧获取] 尝试 {i+1}/{max_attempts}: 清晰度={laplacian_var:.1f}")
|
|
|
-
|
|
|
- if laplacian_var > best_score:
|
|
|
- best_score = laplacian_var
|
|
|
- best_frame = frame_copy
|
|
|
-
|
|
|
- # 如果清晰度足够高,直接返回
|
|
|
- if laplacian_var > min_clarity:
|
|
|
- logger.info(f"[帧获取] 获取清晰帧: 尝试 {i+1} 次, 清晰度={laplacian_var:.1f}")
|
|
|
- return frame_copy
|
|
|
-
|
|
|
- time.sleep(wait_interval)
|
|
|
-
|
|
|
- if best_frame is not None:
|
|
|
- logger.info(f"[帧获取] 返回最佳帧: 清晰度={best_score:.1f}")
|
|
|
- else:
|
|
|
- logger.warning("[帧获取] 未能获取有效帧")
|
|
|
-
|
|
|
- return best_frame
|
|
|
-
|
|
|
- def _mark_ptz_frame_with_detection(self, frame: np.ndarray, person_index: int) -> np.ndarray:
|
|
|
- """
|
|
|
- 在球机帧上标记检测到的人体
|
|
|
-
|
|
|
- Args:
|
|
|
- frame: 球机帧
|
|
|
- person_index: 人员序号
|
|
|
-
|
|
|
- Returns:
|
|
|
- 标记后的帧
|
|
|
- """
|
|
|
- marked_frame = frame.copy()
|
|
|
- h, w = marked_frame.shape[:2]
|
|
|
-
|
|
|
- # 重置保存的bbox
|
|
|
- self._last_ptz_bbox = None
|
|
|
-
|
|
|
- # 使用球机端检测器检测人体
|
|
|
- if self.ptz_detector is not None:
|
|
|
- try:
|
|
|
- persons = self.ptz_detector.detect(frame)
|
|
|
- if persons:
|
|
|
- # 找到最大的人体(假设是目标)
|
|
|
- largest_person = max(persons, key=lambda p: p.area)
|
|
|
- x1, y1, x2, y2 = largest_person.bbox
|
|
|
-
|
|
|
- # 保存bbox供后续使用
|
|
|
- self._last_ptz_bbox = (x1, y1, x2, y2)
|
|
|
-
|
|
|
- # 绘制边界框(红色,区别于全景的绿色)
|
|
|
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
|
|
|
-
|
|
|
- # 绘制标签
|
|
|
- label = f"person_{person_index} ({largest_person.confidence:.2f})"
|
|
|
- (label_w, label_h), _ = cv2.getTextSize(
|
|
|
- label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2
|
|
|
- )
|
|
|
-
|
|
|
- # 标签背景(红色)
|
|
|
- cv2.rectangle(
|
|
|
- marked_frame,
|
|
|
- (x1, y1 - label_h - 8),
|
|
|
- (x1 + label_w, y1),
|
|
|
- (0, 0, 255),
|
|
|
- -1
|
|
|
- )
|
|
|
-
|
|
|
- # 标签文字(白色)
|
|
|
- cv2.putText(
|
|
|
- marked_frame, label,
|
|
|
- (x1, y1 - 4),
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.7,
|
|
|
- (255, 255, 255), 2
|
|
|
- )
|
|
|
-
|
|
|
- logger.info(f"[配对保存] 球机图标记: person_{person_index}, "
|
|
|
- f"位置=({x1},{y1},{x2},{y2}), 置信度={largest_person.confidence:.2f}")
|
|
|
- else:
|
|
|
- # 未检测到人体,在画面中心添加提示
|
|
|
- cv2.putText(
|
|
|
- marked_frame, f"person_{person_index} (no detection)",
|
|
|
- (w // 2 - 100, h // 2),
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
|
|
|
- (0, 0, 255), 2
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[配对保存] 球机图检测标记失败: {e}")
|
|
|
-
|
|
|
- return marked_frame
|
|
|
-
|
|
|
- def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
|
|
|
- """PTZ位置确认:读取球机帧验证目标是否可见"""
|
|
|
- if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
|
|
|
- return
|
|
|
-
|
|
|
- ptz_frame = self.ptz.get_frame()
|
|
|
- if ptz_frame is None:
|
|
|
- return
|
|
|
-
|
|
|
- # 未来可以在这里添加球机帧目标验证逻辑
|
|
|
- # 例如:在球机帧中检测目标是否在画面中心附近
|
|
|
-
|
|
|
- def on_ptz_confirmed(self, callback: Callable):
|
|
|
- """注册PTZ位置确认回调"""
|
|
|
- self._on_ptz_confirmed = callback
|
|
|
-
|
|
|
-
|
|
|
-class SequentialCoordinator(AsyncCoordinator):
|
|
|
- """
|
|
|
- 顺序联动控制器 — 全景检测→逐个PTZ抓拍→回到全景的循环模式
|
|
|
-
|
|
|
- 工作流程:
|
|
|
- 1. 全景摄像头检测人员
|
|
|
- 2. 检测到人员后,暂停全景检测
|
|
|
- 3. 球机依次对每个检测到的人员进行PTZ定位+变焦抓拍
|
|
|
- 4. 所有人员抓拍完成后,球机回到默认位置
|
|
|
- 5. 恢复全景检测,进入下一轮循环
|
|
|
-
|
|
|
- 适用于需要高质量抓拍的场景,确保每个目标都能被球机清晰拍摄
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, *args, **kwargs):
|
|
|
- super().__init__(*args, **kwargs)
|
|
|
-
|
|
|
- # 顺序抓拍状态
|
|
|
- self._capture_state = 'idle' # 'idle', 'detecting', 'capturing', 'returning'
|
|
|
- self._capture_state_lock = threading.Lock()
|
|
|
-
|
|
|
- # 当前批次检测到的目标
|
|
|
- self._batch_targets: List[TrackingTarget] = []
|
|
|
- self._batch_targets_lock = threading.Lock()
|
|
|
- self._current_capture_index = 0
|
|
|
-
|
|
|
- # 【关键修复】保存当前批次的完整信息,防止抓拍过程中被覆盖
|
|
|
- self._capture_batch_id: Optional[str] = None
|
|
|
- self._capture_batch_size: int = 0
|
|
|
-
|
|
|
- # 抓拍完成事件(用于同步)
|
|
|
- self._capture_done_event = threading.Event()
|
|
|
-
|
|
|
- # 配置参数 - 从 PTZ_CONFIG 读取
|
|
|
- ptz_capture_config = PTZ_CONFIG.get('capture', {})
|
|
|
- self._capture_config = {
|
|
|
- 'ptz_stabilize_time': ptz_capture_config.get('stabilize_time', 3.0), # PTZ到位后稳定等待时间(秒)
|
|
|
- 'capture_wait_time': 0.5, # 抓拍等待时间
|
|
|
- 'auto_zoom_wait_time': 1.5, # AutoZoom变焦后额外等待时间(秒),增加以确保对焦完成
|
|
|
- 'return_to_panorama': True, # 完成后是否回到全景默认位置
|
|
|
- 'default_pan': 0.0, # 默认pan角度
|
|
|
- 'default_tilt': 0.0, # 默认tilt角度
|
|
|
- 'default_zoom': 1, # 默认zoom(广角)
|
|
|
- }
|
|
|
-
|
|
|
- # 帧获取配置(覆盖父类默认值)
|
|
|
- self._frame_config.update({
|
|
|
- 'wait_interval': ptz_capture_config.get('frame_wait_interval', 0.2),
|
|
|
- 'max_attempts': ptz_capture_config.get('frame_max_attempts', 8),
|
|
|
- 'min_clarity': ptz_capture_config.get('min_clarity', 200),
|
|
|
- })
|
|
|
-
|
|
|
- # 覆盖父类的PTZ冷却时间(顺序模式下可以更短)
|
|
|
- self.PTZ_COMMAND_COOLDOWN = 0.1
|
|
|
-
|
|
|
- logger.info("[SequentialCoordinator] 顺序联动控制器初始化完成")
|
|
|
-
|
|
|
- def _detection_worker(self):
|
|
|
- """检测线程:顺序模式下的检测逻辑"""
|
|
|
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
|
|
|
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
|
|
|
- detection_interval = 1.0 / detection_fps
|
|
|
- last_detection_time = 0
|
|
|
-
|
|
|
- frame_count = 0
|
|
|
- last_log_time = time.time()
|
|
|
- log_interval = 5.0
|
|
|
- detection_run_count = 0
|
|
|
- detection_person_count = 0
|
|
|
- detection_last_seen = 0
|
|
|
- last_no_detect_log_time = 0
|
|
|
- no_detect_log_interval = 30.0
|
|
|
-
|
|
|
- with self.stats_lock:
|
|
|
- self.stats['start_time'] = time.time()
|
|
|
-
|
|
|
- if self.detector is None:
|
|
|
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化!")
|
|
|
- else:
|
|
|
- logger.info(f"[检测线程] ✓ 顺序模式已就绪, 检测帧率={detection_fps}fps")
|
|
|
-
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- current_time = time.time()
|
|
|
-
|
|
|
- # 获取当前帧
|
|
|
- frame = self.panorama.get_frame()
|
|
|
- if frame is None:
|
|
|
- time.sleep(0.01)
|
|
|
- continue
|
|
|
-
|
|
|
- frame_count += 1
|
|
|
- self._update_stats('frames_processed')
|
|
|
- frame_size = (frame.shape[1], frame.shape[0])
|
|
|
-
|
|
|
- # 日志输出
|
|
|
- if current_time - last_log_time >= log_interval:
|
|
|
- elapsed = current_time - last_log_time
|
|
|
- fps = frame_count / elapsed if elapsed > 0 else 0
|
|
|
- state_str = self._get_capture_state()
|
|
|
- if detection_last_seen > 0:
|
|
|
- ago = int(current_time - detection_last_seen)
|
|
|
- person_info = f"最后有人={ago}s前"
|
|
|
- else:
|
|
|
- person_info = "未检出"
|
|
|
- logger.info(f"[检测线程] 帧率={fps:.1f}fps, 状态={state_str}, "
|
|
|
- f"检测轮次={detection_run_count}({person_info})")
|
|
|
- frame_count = 0
|
|
|
- last_log_time = current_time
|
|
|
-
|
|
|
- # 状态机处理
|
|
|
- state = self._get_capture_state()
|
|
|
-
|
|
|
- if state == 'idle':
|
|
|
- # 【关键修复】每轮检测开始前清空跟踪目标,防止跨帧累积
|
|
|
- with self.targets_lock:
|
|
|
- if self.tracking_targets:
|
|
|
- logger.debug(f"[顺序模式] 清空上一轮跟踪目标: {len(self.tracking_targets)} 个")
|
|
|
- self.tracking_targets.clear()
|
|
|
-
|
|
|
- # 空闲状态:周期性检测(暂停时跳过)
|
|
|
- if not self._paused and current_time - last_detection_time >= detection_interval:
|
|
|
- last_detection_time = current_time
|
|
|
- detection_run_count += 1
|
|
|
-
|
|
|
- # 执行检测
|
|
|
- detections = self._detect_persons(frame)
|
|
|
- # 使用 BYTETracker 进行跟踪(失败时回退到位置匹配)
|
|
|
- detections = self._update_with_bytetrack(detections, frame, frame_size)
|
|
|
-
|
|
|
- if detections:
|
|
|
- self._update_stats('persons_detected', len(detections))
|
|
|
- detection_last_seen = current_time
|
|
|
- detection_person_count += 1
|
|
|
-
|
|
|
- # 【调试日志】检查跟踪目标数量
|
|
|
- with self.targets_lock:
|
|
|
- tracking_count = len(self.tracking_targets)
|
|
|
- logger.info(f"[顺序模式] 检测到 {len(detections)} 个目标, 跟踪列表 {tracking_count} 个")
|
|
|
-
|
|
|
- # 【关键修复】先创建配对批次并获取去重后的人员列表
|
|
|
- dedup_persons = []
|
|
|
- if self._enable_paired_saving and self._paired_saver is not None:
|
|
|
- dedup_persons = self._create_detection_batch(frame, detections, frame_size)
|
|
|
- else:
|
|
|
- # 如果未启用配对保存,也需要去重
|
|
|
- dedup_persons = self._deduplicate_detections(detections, frame_size)
|
|
|
-
|
|
|
- # 【调试日志】显示去重后目标数量
|
|
|
- logger.info(f"[顺序模式] 去重后有效目标数量: {len(dedup_persons)}")
|
|
|
-
|
|
|
- if dedup_persons:
|
|
|
- # 将去重后的检测结果转换为抓拍目标
|
|
|
- capture_targets = []
|
|
|
- for i, det in enumerate(dedup_persons):
|
|
|
- x_ratio = det.center[0] / frame_size[0]
|
|
|
- y_ratio = det.center[1] / frame_size[1]
|
|
|
- target = TrackingTarget(
|
|
|
- track_id=det.track_id,
|
|
|
- position=(x_ratio, y_ratio),
|
|
|
- last_update=current_time,
|
|
|
- area=det.bbox[2] * det.bbox[3],
|
|
|
- confidence=det.confidence
|
|
|
- )
|
|
|
- capture_targets.append(target)
|
|
|
-
|
|
|
- logger.info(f"[顺序模式] 检测到 {len(capture_targets)} 个目标,开始顺序抓拍")
|
|
|
-
|
|
|
- # 【关键修复】切换到抓拍状态后立即清空 tracking_targets
|
|
|
- # 防止后续检测再次获取到同一批目标
|
|
|
- with self.targets_lock:
|
|
|
- self.tracking_targets.clear()
|
|
|
- logger.info("[顺序模式] 已清空跟踪目标,防止重复抓拍")
|
|
|
-
|
|
|
- # 切换到抓拍状态(使用去重后的目标)
|
|
|
- self._start_capture_sequence(capture_targets)
|
|
|
- else:
|
|
|
- logger.warning(f"[顺序模式] 去重后无有效目标,跳过抓拍")
|
|
|
- else:
|
|
|
- # 未检测到人员
|
|
|
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
|
|
|
- logger.info(f"[检测] 本轮未检测到人员 (累计{detection_run_count}轮)")
|
|
|
- last_no_detect_log_time = current_time
|
|
|
-
|
|
|
- elif state == 'capturing':
|
|
|
- # 抓拍状态中:检测线程等待,不执行新检测
|
|
|
- # 等待PTZ线程完成当前批次
|
|
|
- pass
|
|
|
-
|
|
|
- elif state == 'returning':
|
|
|
- # 球机回到默认位置中
|
|
|
- pass
|
|
|
-
|
|
|
- # 清理过期目标
|
|
|
- self._cleanup_expired_targets()
|
|
|
- time.sleep(0.01)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[检测线程] 错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _ptz_worker(self):
|
|
|
- """PTZ控制线程:顺序模式下的PTZ控制逻辑"""
|
|
|
- logger.info("[PTZ线程] 顺序模式PTZ控制线程启动")
|
|
|
-
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- # 暂停时等待恢复
|
|
|
- if self._paused:
|
|
|
- self._paused_event.wait()
|
|
|
- time.sleep(0.05)
|
|
|
- continue
|
|
|
-
|
|
|
- state = self._get_capture_state()
|
|
|
-
|
|
|
- if state == 'capturing':
|
|
|
- # 执行顺序抓拍(阻塞直到当前目标抓拍完成)
|
|
|
- self._execute_sequential_capture()
|
|
|
-
|
|
|
- elif state == 'returning':
|
|
|
- # 回到默认位置
|
|
|
- self._return_to_default_position()
|
|
|
-
|
|
|
- # idle 状态不做任何操作,让检测线程控制
|
|
|
- time.sleep(0.05)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[PTZ线程] 错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _get_capture_state(self) -> str:
|
|
|
- """获取当前抓拍状态"""
|
|
|
- with self._capture_state_lock:
|
|
|
- return self._capture_state
|
|
|
-
|
|
|
- def _set_capture_state(self, state: str):
|
|
|
- """设置抓拍状态"""
|
|
|
- with self._capture_state_lock:
|
|
|
- old_state = self._capture_state
|
|
|
- self._capture_state = state
|
|
|
- logger.info(f"[顺序模式] 状态切换: {old_state} -> {state}")
|
|
|
-
|
|
|
- def _start_capture_sequence(self, targets: List[TrackingTarget]):
|
|
|
- """开始顺序抓拍序列"""
|
|
|
- with self._batch_targets_lock:
|
|
|
- # 【关键修复】先清空再赋值,防止累积
|
|
|
- old_count = len(self._batch_targets)
|
|
|
- self._batch_targets = targets.copy() # 直接替换,不追加
|
|
|
- self._current_capture_index = 0
|
|
|
-
|
|
|
- # 【关键修复】保存批次信息,防止抓拍过程中被新批次覆盖
|
|
|
- self._capture_batch_id = self._current_batch_id
|
|
|
- self._capture_batch_size = len(targets)
|
|
|
-
|
|
|
- logger.info(f"[顺序模式] 开始抓拍序列: {old_count} -> {len(self._batch_targets)} 个目标, "
|
|
|
- f"批次ID={self._capture_batch_id}")
|
|
|
-
|
|
|
- self._set_capture_state('capturing')
|
|
|
-
|
|
|
- def _execute_sequential_capture(self):
|
|
|
- """执行顺序抓拍(依次对每个目标进行PTZ定位和抓拍)"""
|
|
|
- with self._batch_targets_lock:
|
|
|
- targets = self._batch_targets.copy()
|
|
|
- current_idx = self._current_capture_index
|
|
|
- batch_size = self._capture_batch_size
|
|
|
- batch_id = self._capture_batch_id
|
|
|
-
|
|
|
- # 【调试日志】详细输出抓拍状态
|
|
|
- logger.info(f"[顺序模式] 执行抓拍: idx={current_idx}, batch_size={batch_size}, "
|
|
|
- f"targets_len={len(targets)}, batch_id={batch_id}")
|
|
|
-
|
|
|
- # 使用保存的批次大小进行检查,而不是 len(targets)
|
|
|
- if current_idx >= batch_size:
|
|
|
- # 所有目标已抓拍完成
|
|
|
- logger.info(f"[顺序模式] 所有目标抓拍完成: {current_idx}/{batch_size}")
|
|
|
- self._set_capture_state('returning')
|
|
|
- return
|
|
|
-
|
|
|
-
|
|
|
- # 【安全检查】确保 targets 有足够的数据
|
|
|
- if current_idx >= len(targets):
|
|
|
- logger.warning(f"[顺序模式] 索引越界: idx={current_idx} >= targets_len={len(targets)}, "
|
|
|
- f"batch_size={batch_size}。可能批次信息不一致!")
|
|
|
- self._set_capture_state('returning')
|
|
|
- return
|
|
|
-
|
|
|
- # 获取当前目标
|
|
|
- target = targets[current_idx]
|
|
|
- x_ratio, y_ratio = target.position
|
|
|
-
|
|
|
- # 计算PTZ角度
|
|
|
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
|
|
|
- # 校准器返回的是可直接发送给球机的真实 PTZ 角度,不再应用 pan_flip
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
|
|
|
- coord_type = "校准坐标"
|
|
|
- else:
|
|
|
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
|
|
|
- coord_type = "估算坐标"
|
|
|
-
|
|
|
- # 获取批次信息(使用保存的批次ID,防止被新批次覆盖)
|
|
|
- with self._batch_targets_lock:
|
|
|
- batch_id = self._capture_batch_id if self._enable_paired_saving else None
|
|
|
- batch_size = self._capture_batch_size
|
|
|
- person_index = current_idx # 使用当前索引作为人员序号
|
|
|
-
|
|
|
- logger.info(f"[顺序模式] 抓拍目标 {current_idx + 1}/{batch_size}: "
|
|
|
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) -> "
|
|
|
- f"PTZ=({pan:.1f}°, {tilt:.1f}°, zoom={zoom}) [{coord_type}], 批次={batch_id}")
|
|
|
-
|
|
|
- # 【关键修复】先递增索引,防止 PTZ 线程重复进入时重复执行
|
|
|
- with self._batch_targets_lock:
|
|
|
- self._current_capture_index += 1
|
|
|
-
|
|
|
- # 执行PTZ移动
|
|
|
- self._set_state(TrackingState.TRACKING)
|
|
|
-
|
|
|
- # 【调试】记录移动前的 PTZ 位置
|
|
|
- try:
|
|
|
- current_pos = self.ptz.get_current_position()
|
|
|
- logger.info(f"[顺序模式] PTZ移动前: pan={current_pos.pan:.1f}° tilt={current_pos.tilt:.1f}° zoom={current_pos.zoom}")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"[顺序模式] 获取当前PTZ位置失败: {e}")
|
|
|
- current_pos = None
|
|
|
-
|
|
|
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
|
|
|
-
|
|
|
- if success:
|
|
|
- # 等待球机物理移动到位
|
|
|
- stabilize_time = self._capture_config['ptz_stabilize_time']
|
|
|
- logger.info(f"[顺序模式] 等待球机稳定 {stabilize_time}s...")
|
|
|
-
|
|
|
- # 【调试】记录移动后的 PTZ 位置
|
|
|
- time.sleep(0.5) # 短暂等待后检查
|
|
|
- try:
|
|
|
- after_pos = self.ptz.get_current_position()
|
|
|
- logger.info(f"[顺序模式] PTZ移动后: pan={after_pos.pan:.1f}° tilt={after_pos.tilt:.1f}° zoom={after_pos.zoom}")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"[顺序模式] 获取PTZ位置失败: {e}")
|
|
|
- time.sleep(stabilize_time)
|
|
|
-
|
|
|
- # 【关键修复】清空RTSP缓冲区,确保获取的是新位置的帧
|
|
|
- logger.debug("[顺序模式] 清空RTSP缓冲区...")
|
|
|
- for _ in range(5):
|
|
|
- self.ptz.get_frame()
|
|
|
- time.sleep(0.05)
|
|
|
-
|
|
|
- # 自动变焦(如果启用)
|
|
|
- final_zoom = zoom
|
|
|
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
|
|
|
- auto_zoom_result = self._auto_zoom_person(pan, tilt, zoom)
|
|
|
- if auto_zoom_result != zoom:
|
|
|
- final_zoom = auto_zoom_result
|
|
|
- # 变焦后等待镜头对焦
|
|
|
- auto_zoom_wait = self._capture_config.get('auto_zoom_wait_time', 1.0)
|
|
|
- logger.info(f"[顺序模式] 变焦完成,等待镜头对焦 {auto_zoom_wait}s...")
|
|
|
- time.sleep(auto_zoom_wait)
|
|
|
- # 变焦后再次清空缓冲区
|
|
|
- for _ in range(3):
|
|
|
- self.ptz.get_frame()
|
|
|
- time.sleep(0.05)
|
|
|
-
|
|
|
- # 获取清晰的球机画面
|
|
|
- ptz_frame = self._get_clear_ptz_frame()
|
|
|
-
|
|
|
- if ptz_frame is not None:
|
|
|
- # 【调试】记录抓拍的图像信息
|
|
|
- frame_h, frame_w = ptz_frame.shape[:2]
|
|
|
- logger.info(f"[顺序模式] 抓拍帧: {frame_w}x{frame_h}, 目标序号={person_index}, PTZ=({pan:.1f}°, {tilt:.1f}°, zoom={final_zoom})")
|
|
|
-
|
|
|
- # 保存球机图片
|
|
|
- if self._enable_paired_saving and batch_id is not None:
|
|
|
- # 使用球机端检测器检测人体并标记(用于标记图)
|
|
|
- ptz_frame_marked = self._mark_ptz_frame_with_detection(ptz_frame, person_index=person_index)
|
|
|
- # 传入原始帧保存为原图,标记帧保存为标记图
|
|
|
- self._save_ptz_image_for_person_batch(
|
|
|
- batch_id, person_index, ptz_frame,
|
|
|
- (pan, tilt, final_zoom), ptz_frame_marked=ptz_frame_marked
|
|
|
- )
|
|
|
-
|
|
|
- logger.info(f"[顺序模式] 目标 {current_idx + 1} 抓拍完成")
|
|
|
- else:
|
|
|
- logger.warning(f"[顺序模式] 获取球机画面失败")
|
|
|
- else:
|
|
|
- logger.warning(f"[顺序模式] PTZ移动失败")
|
|
|
-
|
|
|
- # 抓拍间隔
|
|
|
- time.sleep(self._capture_config['capture_wait_time'])
|
|
|
-
|
|
|
- def _return_to_default_position(self):
|
|
|
- """球机回到默认位置(广角全景)"""
|
|
|
- if not self._capture_config['return_to_panorama']:
|
|
|
- # 不回到默认位置,直接回到空闲状态
|
|
|
- self._set_capture_state('idle')
|
|
|
- return
|
|
|
-
|
|
|
- default_pan = self._capture_config['default_pan']
|
|
|
- default_tilt = self._capture_config['default_tilt']
|
|
|
- default_zoom = self._capture_config['default_zoom']
|
|
|
-
|
|
|
- # 【关键修复】等待一小段时间,确保最后的帧已经被读取和保存
|
|
|
- # 因为 _get_clear_ptz_frame 是异步获取帧,可能在抓拍后立刻触发返回默认位置
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- logger.info(f"[顺序模式] 球机回到默认位置: ({default_pan}°, {default_tilt}°, zoom={default_zoom})")
|
|
|
-
|
|
|
- success = self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
|
|
|
-
|
|
|
- if success:
|
|
|
- # 等待到位
|
|
|
- time.sleep(self._capture_config['ptz_stabilize_time'])
|
|
|
- logger.info("[顺序模式] 球机已回到默认位置")
|
|
|
- else:
|
|
|
- logger.warning("[顺序模式] 球机回到默认位置失败")
|
|
|
-
|
|
|
- # 回到空闲状态,恢复全景检测
|
|
|
- self._set_capture_state('idle')
|
|
|
-
|
|
|
- # 清空批次目标
|
|
|
- with self._batch_targets_lock:
|
|
|
- self._batch_targets = []
|
|
|
- self._current_capture_index = 0
|
|
|
- # 【关键修复】清空保存的批次信息
|
|
|
- self._capture_batch_id = None
|
|
|
- self._capture_batch_size = 0
|
|
|
-
|
|
|
- # 【关键修复】清空跟踪目标,防止跨帧累积
|
|
|
- with self.targets_lock:
|
|
|
- self.tracking_targets.clear()
|
|
|
- logger.info("[顺序模式] 已清空跟踪目标列表")
|
|
|
-
|
|
|
- def set_capture_config(self, **kwargs):
|
|
|
- """设置抓拍配置"""
|
|
|
- self._capture_config.update(kwargs)
|
|
|
- logger.info(f"[顺序模式] 配置已更新: {kwargs}")
|
|
|
-
|
|
|
- def get_capture_status(self) -> dict:
|
|
|
- """获取当前抓拍状态"""
|
|
|
- with self._batch_targets_lock:
|
|
|
- total = len(self._batch_targets)
|
|
|
- current = self._current_capture_index
|
|
|
-
|
|
|
- return {
|
|
|
- 'state': self._get_capture_state(),
|
|
|
- 'total_targets': total,
|
|
|
- 'current_index': current,
|
|
|
- 'remaining': max(0, total - current)
|
|
|
- }
|