|
@@ -13,10 +13,11 @@ from enum import Enum
|
|
|
|
|
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
|
|
|
|
|
-from config import COORDINATOR_CONFIG, SYSTEM_CONFIG
|
|
|
|
|
|
|
+from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
|
|
|
from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
|
|
from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
|
|
|
from ptz_camera import PTZCamera, PTZController
|
|
from ptz_camera import PTZCamera, PTZController
|
|
|
from ocr_recognizer import NumberDetector, PersonInfo
|
|
from ocr_recognizer import NumberDetector, PersonInfo
|
|
|
|
|
+from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
@@ -38,6 +39,160 @@ class TrackingTarget:
|
|
|
last_update: float # 最后更新时间
|
|
last_update: float # 最后更新时间
|
|
|
person_info: Optional[PersonInfo] = None # 人员信息
|
|
person_info: Optional[PersonInfo] = None # 人员信息
|
|
|
priority: int = 0 # 优先级
|
|
priority: int = 0 # 优先级
|
|
|
|
|
+ area: int = 0 # 目标面积(像素²)
|
|
|
|
|
+ confidence: float = 0.0 # 置信度
|
|
|
|
|
+ center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
|
|
|
|
|
+ score: float = 0.0 # 综合得分
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class TargetSelector:
|
|
|
|
|
+ """
|
|
|
|
|
+ 目标选择策略类
|
|
|
|
|
+ 支持按面积、置信度、混合模式排序,支持优先级切换
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, config: Dict = None):
|
|
|
|
|
+ """
|
|
|
|
|
+ 初始化目标选择器
|
|
|
|
|
+ Args:
|
|
|
|
|
+ config: 目标选择配置
|
|
|
|
|
+ """
|
|
|
|
|
+ self.config = config or {
|
|
|
|
|
+ 'strategy': 'area',
|
|
|
|
|
+ 'area_weight': 0.6,
|
|
|
|
|
+ 'confidence_weight': 0.4,
|
|
|
|
|
+ 'min_area_threshold': 5000,
|
|
|
|
|
+ 'prefer_center': True,
|
|
|
|
|
+ 'center_weight': 0.2,
|
|
|
|
|
+ 'switch_on_lost': True,
|
|
|
|
|
+ 'stickiness': 0.3,
|
|
|
|
|
+ }
|
|
|
|
|
+ self.current_target_id: Optional[int] = None
|
|
|
|
|
+ self.current_target_score: float = 0.0
|
|
|
|
|
+
|
|
|
|
|
+ def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
|
|
|
|
|
+ """
|
|
|
|
|
+ 计算目标综合得分
|
|
|
|
|
+ Args:
|
|
|
|
|
+ target: 跟踪目标
|
|
|
|
|
+ frame_size: 帧尺寸(w, h),用于计算中心距离
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 综合得分(0-1)
|
|
|
|
|
+ """
|
|
|
|
|
+ strategy = self.config.get('strategy', 'area')
|
|
|
|
|
+ area_weight = self.config.get('area_weight', 0.6)
|
|
|
|
|
+ conf_weight = self.config.get('confidence_weight', 0.4)
|
|
|
|
|
+ min_area = self.config.get('min_area_threshold', 5000)
|
|
|
|
|
+ prefer_center = self.config.get('prefer_center', False)
|
|
|
|
|
+ center_weight = self.config.get('center_weight', 0.2)
|
|
|
|
|
+
|
|
|
|
|
+ # 归一化面积得分 (对数缩放,避免大目标得分过高)
|
|
|
|
|
+ import math
|
|
|
|
|
+ area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
|
|
|
|
|
+
|
|
|
|
|
+ # 小面积惩罚
|
|
|
|
|
+ if target.area < min_area:
|
|
|
|
|
+ area_score *= 0.5
|
|
|
|
|
+
|
|
|
|
|
+ # 置信度得分直接使用
|
|
|
|
|
+ conf_score = target.confidence
|
|
|
|
|
+
|
|
|
|
|
+ # 中心距离得分 (距离中心越近得分越高)
|
|
|
|
|
+ center_score = 1.0 - target.center_distance
|
|
|
|
|
+
|
|
|
|
|
+ # 根据策略计算综合得分
|
|
|
|
|
+ if strategy == 'area':
|
|
|
|
|
+ score = area_score * 0.8 + conf_score * 0.2
|
|
|
|
|
+ elif strategy == 'confidence':
|
|
|
|
|
+ score = conf_score * 0.8 + area_score * 0.2
|
|
|
|
|
+ else: # hybrid
|
|
|
|
|
+ score = area_score * area_weight + conf_score * conf_weight
|
|
|
|
|
+
|
|
|
|
|
+ # 加入中心距离权重
|
|
|
|
|
+ if prefer_center:
|
|
|
|
|
+ score = score * (1 - center_weight) + center_score * center_weight
|
|
|
|
|
+
|
|
|
|
|
+ return score
|
|
|
|
|
+
|
|
|
|
|
+ def select_target(self, targets: Dict[int, TrackingTarget],
|
|
|
|
|
+ frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 从多个目标中选择最优目标
|
|
|
|
|
+ Args:
|
|
|
|
|
+ targets: 目标字典 {track_id: TrackingTarget}
|
|
|
|
|
+ frame_size: 帧尺寸
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 最优目标
|
|
|
|
|
+ """
|
|
|
|
|
+ if not targets:
|
|
|
|
|
+ self.current_target_id = None
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ stickiness = self.config.get('stickiness', 0.3)
|
|
|
|
|
+ switch_on_lost = self.config.get('switch_on_lost', True)
|
|
|
|
|
+
|
|
|
|
|
+ # 计算所有目标得分
|
|
|
|
|
+ scored_targets = []
|
|
|
|
|
+ for track_id, target in targets.items():
|
|
|
|
|
+ target.score = self.calculate_score(target, frame_size)
|
|
|
|
|
+ scored_targets.append((track_id, target, target.score))
|
|
|
|
|
+
|
|
|
|
|
+ # 按得分排序
|
|
|
|
|
+ scored_targets.sort(key=lambda x: x[2], reverse=True)
|
|
|
|
|
+
|
|
|
|
|
+ # 检查当前目标是否仍在列表中
|
|
|
|
|
+ if self.current_target_id is not None:
|
|
|
|
|
+ current_exists = self.current_target_id in targets
|
|
|
|
|
+ if current_exists:
|
|
|
|
|
+ # 应用粘性:当前目标得分需要显著低于最优目标才切换
|
|
|
|
|
+ best_id, best_target, best_score = scored_targets[0]
|
|
|
|
|
+ current_target = targets[self.current_target_id]
|
|
|
|
|
+
|
|
|
|
|
+ # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
|
|
|
|
|
+ stickiness_threshold = best_score * (1 - stickiness)
|
|
|
|
|
+ if current_target.score > stickiness_threshold:
|
|
|
|
|
+ return current_target
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ # 选择得分最高的目标
|
|
|
|
|
+ best_id, best_target, best_score = scored_targets[0]
|
|
|
|
|
+ self.current_target_id = best_id
|
|
|
|
|
+ self.current_target_score = best_score
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(
|
|
|
|
|
+ f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
|
|
|
|
|
+ f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ return best_target
|
|
|
|
|
+
|
|
|
|
|
+ def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
|
|
|
|
|
+ frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 获取按得分排序的目标列表
|
|
|
|
|
+ Args:
|
|
|
|
|
+ targets: 目标字典
|
|
|
|
|
+ frame_size: 帧尺寸
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ 排序后的目标列表 [(target, score), ...]
|
|
|
|
|
+ """
|
|
|
|
|
+ scored = []
|
|
|
|
|
+ for target in targets.values():
|
|
|
|
|
+ target.score = self.calculate_score(target, frame_size)
|
|
|
|
|
+ scored.append((target, target.score))
|
|
|
|
|
+
|
|
|
|
|
+ scored.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
|
+ return scored
|
|
|
|
|
+
|
|
|
|
|
+ def set_strategy(self, strategy: str):
|
|
|
|
|
+ """设置选择策略"""
|
|
|
|
|
+ self.config['strategy'] = strategy
|
|
|
|
|
+ logger.info(f"[目标选择] 策略已切换为: {strategy}")
|
|
|
|
|
+
|
|
|
|
|
+ def set_stickiness(self, stickiness: float):
|
|
|
|
|
+ """设置目标粘性"""
|
|
|
|
|
+ self.config['stickiness'] = max(0.0, min(1.0, stickiness))
|
|
|
|
|
+ logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
|
|
|
|
|
|
|
|
|
|
|
|
|
class Coordinator:
|
|
class Coordinator:
|
|
@@ -75,6 +230,12 @@ class Coordinator:
|
|
|
self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
|
|
self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
|
|
|
self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
|
|
self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
|
|
|
|
|
|
|
|
|
|
+ # 球机端人体检测与自动对焦
|
|
|
|
|
+ self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
|
|
|
|
|
+ self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
|
|
|
|
|
+ self.ptz_detector = None
|
|
|
|
|
+ self.auto_zoom_controller = None
|
|
|
|
|
+
|
|
|
# 跟踪器
|
|
# 跟踪器
|
|
|
self.tracker = PersonTracker()
|
|
self.tracker = PersonTracker()
|
|
|
|
|
|
|
@@ -107,6 +268,11 @@ class Coordinator:
|
|
|
self.last_ptz_position = None
|
|
self.last_ptz_position = None
|
|
|
self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
|
|
self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
|
|
|
|
|
|
|
|
|
|
+ # 目标选择器
|
|
|
|
|
+ self.target_selector = TargetSelector(
|
|
|
|
|
+ self.config.get('target_selection', {})
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
# 结果队列
|
|
# 结果队列
|
|
|
self.result_queue = queue.Queue()
|
|
self.result_queue = queue.Queue()
|
|
|
|
|
|
|
@@ -292,6 +458,8 @@ class Coordinator:
|
|
|
frame_size: Tuple[int, int]):
|
|
frame_size: Tuple[int, int]):
|
|
|
"""更新跟踪目标"""
|
|
"""更新跟踪目标"""
|
|
|
current_time = time.time()
|
|
current_time = time.time()
|
|
|
|
|
+ frame_w, frame_h = frame_size
|
|
|
|
|
+ center_x, center_y = frame_w / 2, frame_h / 2
|
|
|
|
|
|
|
|
with self.targets_lock:
|
|
with self.targets_lock:
|
|
|
# 更新现有目标
|
|
# 更新现有目标
|
|
@@ -299,21 +467,36 @@ class Coordinator:
|
|
|
if det.track_id is None:
|
|
if det.track_id is None:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- x_ratio = det.center[0] / frame_size[0]
|
|
|
|
|
- y_ratio = det.center[1] / frame_size[1]
|
|
|
|
|
|
|
+ x_ratio = det.center[0] / frame_w
|
|
|
|
|
+ y_ratio = det.center[1] / frame_h
|
|
|
|
|
+
|
|
|
|
|
+ # 计算面积
|
|
|
|
|
+ _, _, width, height = det.bbox
|
|
|
|
|
+ area = width * height
|
|
|
|
|
+
|
|
|
|
|
+ # 计算到画面中心的距离比例
|
|
|
|
|
+ dx = abs(det.center[0] - center_x) / center_x
|
|
|
|
|
+ dy = abs(det.center[1] - center_y) / center_y
|
|
|
|
|
+ center_distance = (dx + dy) / 2 # 归一化到0-1
|
|
|
|
|
|
|
|
if det.track_id in self.tracking_targets:
|
|
if det.track_id in self.tracking_targets:
|
|
|
# 更新位置
|
|
# 更新位置
|
|
|
target = self.tracking_targets[det.track_id]
|
|
target = self.tracking_targets[det.track_id]
|
|
|
target.position = (x_ratio, y_ratio)
|
|
target.position = (x_ratio, y_ratio)
|
|
|
target.last_update = current_time
|
|
target.last_update = current_time
|
|
|
|
|
+ target.area = area
|
|
|
|
|
+ target.confidence = det.confidence
|
|
|
|
|
+ target.center_distance = center_distance
|
|
|
else:
|
|
else:
|
|
|
# 新目标
|
|
# 新目标
|
|
|
if len(self.tracking_targets) < self.config['max_tracking_targets']:
|
|
if len(self.tracking_targets) < self.config['max_tracking_targets']:
|
|
|
self.tracking_targets[det.track_id] = TrackingTarget(
|
|
self.tracking_targets[det.track_id] = TrackingTarget(
|
|
|
track_id=det.track_id,
|
|
track_id=det.track_id,
|
|
|
position=(x_ratio, y_ratio),
|
|
position=(x_ratio, y_ratio),
|
|
|
- last_update=current_time
|
|
|
|
|
|
|
+ last_update=current_time,
|
|
|
|
|
+ area=area,
|
|
|
|
|
+ confidence=det.confidence,
|
|
|
|
|
+ center_distance=center_distance
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
def _process_detections(self, detections: List[DetectedObject],
|
|
def _process_detections(self, detections: List[DetectedObject],
|
|
@@ -331,12 +514,10 @@ class Coordinator:
|
|
|
self.current_target = None
|
|
self.current_target = None
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- # 选择优先级最高的目标(这里选择最新的)
|
|
|
|
|
- if self.current_target is None or \
|
|
|
|
|
- self.current_target.track_id not in self.tracking_targets:
|
|
|
|
|
- # 选择一个新目标
|
|
|
|
|
- target_id = list(self.tracking_targets.keys())[0]
|
|
|
|
|
- self.current_target = self.tracking_targets[target_id]
|
|
|
|
|
|
|
+ # 使用目标选择器选择最优目标
|
|
|
|
|
+ self.current_target = self.target_selector.select_target(
|
|
|
|
|
+ self.tracking_targets, frame_size
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
if self.current_target:
|
|
if self.current_target:
|
|
|
# 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
|
|
# 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
|
|
@@ -603,6 +784,15 @@ class AsyncCoordinator(Coordinator):
|
|
|
print("连接球机失败")
|
|
print("连接球机失败")
|
|
|
self.panorama.disconnect()
|
|
self.panorama.disconnect()
|
|
|
return False
|
|
return False
|
|
|
|
|
+
|
|
|
|
|
+ # 启动球机RTSP流(用于球机端人体检测)
|
|
|
|
|
+ if self.enable_ptz_detection:
|
|
|
|
|
+ if not self.ptz.start_stream_rtsp():
|
|
|
|
|
+ print("球机RTSP流启动失败,禁用球机端检测功能")
|
|
|
|
|
+ self.enable_ptz_detection = False
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 初始化球机端人体检测器
|
|
|
|
|
+ self._init_ptz_detector()
|
|
|
else:
|
|
else:
|
|
|
print("PTZ球机功能已禁用")
|
|
print("PTZ球机功能已禁用")
|
|
|
|
|
|
|
@@ -738,10 +928,12 @@ class AsyncCoordinator(Coordinator):
|
|
|
# tracked 是 DetectedObject,使用 center 计算位置
|
|
# tracked 是 DetectedObject,使用 center 计算位置
|
|
|
x_ratio = t.center[0] / frame_size[0]
|
|
x_ratio = t.center[0] / frame_size[0]
|
|
|
y_ratio = t.center[1] / frame_size[1]
|
|
y_ratio = t.center[1] / frame_size[1]
|
|
|
|
|
+ _, _, w, h = t.bbox
|
|
|
|
|
+ area = w * h
|
|
|
logger.info(
|
|
logger.info(
|
|
|
f"[检测] ✓ 目标ID={t.track_id} "
|
|
f"[检测] ✓ 目标ID={t.track_id} "
|
|
|
f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
|
|
f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
|
|
|
- f"置信度={t.confidence:.2f}"
|
|
|
|
|
|
|
+ f"面积={area} 置信度={t.confidence:.2f}"
|
|
|
)
|
|
)
|
|
|
elif detections:
|
|
elif detections:
|
|
|
# 有检测但没跟踪上
|
|
# 有检测但没跟踪上
|
|
@@ -775,6 +967,32 @@ class AsyncCoordinator(Coordinator):
|
|
|
logger.error(f"检测线程错误: {e}")
|
|
logger.error(f"检测线程错误: {e}")
|
|
|
time.sleep(0.1)
|
|
time.sleep(0.1)
|
|
|
|
|
|
|
|
|
|
+ def _init_ptz_detector(self):
|
|
|
|
|
+ """初始化球机端人体检测器"""
|
|
|
|
|
+ try:
|
|
|
|
|
+ model_path = DETECTION_CONFIG.get('model_path')
|
|
|
|
|
+ model_type = DETECTION_CONFIG.get('model_type', 'auto')
|
|
|
|
|
+ conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
|
|
|
|
|
+
|
|
|
|
|
+ if model_path:
|
|
|
|
|
+ self.ptz_detector = PTZPersonDetector(
|
|
|
|
|
+ model_path=model_path,
|
|
|
|
|
+ model_type=model_type,
|
|
|
|
|
+ confidence_threshold=conf_threshold
|
|
|
|
|
+ )
|
|
|
|
|
+ self.auto_zoom_controller = PTZAutoZoomController(
|
|
|
|
|
+ ptz_camera=self.ptz,
|
|
|
|
|
+ detector=self.ptz_detector,
|
|
|
|
|
+ config=self.auto_zoom_config
|
|
|
|
|
+ )
|
|
|
|
|
+ print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
|
|
|
|
|
+ self.enable_ptz_detection = False
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
|
|
|
|
|
+ self.enable_ptz_detection = False
|
|
|
|
|
+
|
|
|
def _ptz_worker(self):
|
|
def _ptz_worker(self):
|
|
|
"""PTZ控制线程:从队列接收命令并控制球机"""
|
|
"""PTZ控制线程:从队列接收命令并控制球机"""
|
|
|
while self.running:
|
|
while self.running:
|
|
@@ -798,10 +1016,10 @@ class AsyncCoordinator(Coordinator):
|
|
|
self.current_target = None
|
|
self.current_target = None
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
- if self.current_target is None or \
|
|
|
|
|
- self.current_target.track_id not in self.tracking_targets:
|
|
|
|
|
- target_id = list(self.tracking_targets.keys())[0]
|
|
|
|
|
- self.current_target = self.tracking_targets[target_id]
|
|
|
|
|
|
|
+ # 使用目标选择器选择最优目标
|
|
|
|
|
+ self.current_target = self.target_selector.select_target(
|
|
|
|
|
+ self.tracking_targets
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
return self.current_target
|
|
return self.current_target
|
|
|
|
|
|
|
@@ -904,11 +1122,40 @@ class AsyncCoordinator(Coordinator):
|
|
|
|
|
|
|
|
if success:
|
|
if success:
|
|
|
time.sleep(self.PTZ_CONFIRM_WAIT)
|
|
time.sleep(self.PTZ_CONFIRM_WAIT)
|
|
|
- self._confirm_ptz_position(cmd.x_ratio, cmd.y_ratio)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ # 球机端人体检测与自动对焦
|
|
|
|
|
+ if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
|
|
|
|
|
+ self._auto_zoom_person(pan, tilt, zoom)
|
|
|
|
|
+ else:
|
|
|
|
|
+ self._confirm_ptz_position(cmd.x_ratio, cmd.y_ratio)
|
|
|
|
|
+
|
|
|
logger.info(f"[PTZ] 到位确认完成: pan={pan:.1f}° tilt={tilt:.1f}°")
|
|
logger.info(f"[PTZ] 到位确认完成: pan={pan:.1f}° tilt={tilt:.1f}°")
|
|
|
else:
|
|
else:
|
|
|
logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
|
|
logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
|
|
|
|
|
|
|
|
|
|
+ def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int):
|
|
|
|
|
+ """
|
|
|
|
|
+ 自动对焦人体
|
|
|
|
|
+ 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
|
|
|
|
|
+ """
|
|
|
|
|
+ if self.auto_zoom_controller is None:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("[AutoZoom] 开始自动对焦...")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
|
|
|
|
|
+ get_frame_func=self.ptz.get_frame,
|
|
|
|
|
+ max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if success:
|
|
|
|
|
+ logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.warning("[AutoZoom] 自动对焦未能定位人体")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"[AutoZoom] 自动对焦异常: {e}")
|
|
|
|
|
+
|
|
|
def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
|
|
def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
|
|
|
"""PTZ位置确认:读取球机帧验证目标是否可见"""
|
|
"""PTZ位置确认:读取球机帧验证目标是否可见"""
|
|
|
if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
|
|
if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
|