| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177 |
- """
- 联动控制器
- 协调全景摄像头和球机的工作
- """
- import time
- import threading
- import queue
- import logging
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- import numpy as np
- from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
- from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from ocr_recognizer import NumberDetector, PersonInfo
- from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
- logger = logging.getLogger(__name__)
- class TrackingState(Enum):
- """跟踪状态"""
- IDLE = 0 # 空闲
- SEARCHING = 1 # 搜索目标
- TRACKING = 2 # 跟踪中
- ZOOMING = 3 # 变焦中
- OCR_PROCESSING = 4 # OCR处理中
- @dataclass
- class TrackingTarget:
- """跟踪目标"""
- track_id: int # 跟踪ID
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
- last_update: float # 最后更新时间
- person_info: Optional[PersonInfo] = None # 人员信息
- priority: int = 0 # 优先级
- area: int = 0 # 目标面积(像素²)
- confidence: float = 0.0 # 置信度
- center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
- score: float = 0.0 # 综合得分
- class TargetSelector:
- """
- 目标选择策略类
- 支持按面积、置信度、混合模式排序,支持优先级切换
- """
-
- def __init__(self, config: Dict = None):
- """
- 初始化目标选择器
- Args:
- config: 目标选择配置
- """
- self.config = config or {
- 'strategy': 'area',
- 'area_weight': 0.6,
- 'confidence_weight': 0.4,
- 'min_area_threshold': 5000,
- 'prefer_center': True,
- 'center_weight': 0.2,
- 'switch_on_lost': True,
- 'stickiness': 0.3,
- }
- self.current_target_id: Optional[int] = None
- self.current_target_score: float = 0.0
-
- def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
- """
- 计算目标综合得分
- Args:
- target: 跟踪目标
- frame_size: 帧尺寸(w, h),用于计算中心距离
- Returns:
- 综合得分(0-1)
- """
- strategy = self.config.get('strategy', 'area')
- area_weight = self.config.get('area_weight', 0.6)
- conf_weight = self.config.get('confidence_weight', 0.4)
- min_area = self.config.get('min_area_threshold', 5000)
- prefer_center = self.config.get('prefer_center', False)
- center_weight = self.config.get('center_weight', 0.2)
-
- # 归一化面积得分 (对数缩放,避免大目标得分过高)
- import math
- area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
-
- # 小面积惩罚
- if target.area < min_area:
- area_score *= 0.5
-
- # 置信度得分直接使用
- conf_score = target.confidence
-
- # 中心距离得分 (距离中心越近得分越高)
- center_score = 1.0 - target.center_distance
-
- # 根据策略计算综合得分
- if strategy == 'area':
- score = area_score * 0.8 + conf_score * 0.2
- elif strategy == 'confidence':
- score = conf_score * 0.8 + area_score * 0.2
- else: # hybrid
- score = area_score * area_weight + conf_score * conf_weight
-
- # 加入中心距离权重
- if prefer_center:
- score = score * (1 - center_weight) + center_score * center_weight
-
- return score
-
- def select_target(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
- """
- 从多个目标中选择最优目标
- Args:
- targets: 目标字典 {track_id: TrackingTarget}
- frame_size: 帧尺寸
- Returns:
- 最优目标
- """
- if not targets:
- self.current_target_id = None
- return None
-
- stickiness = self.config.get('stickiness', 0.3)
- switch_on_lost = self.config.get('switch_on_lost', True)
-
- # 计算所有目标得分
- scored_targets = []
- for track_id, target in targets.items():
- target.score = self.calculate_score(target, frame_size)
- scored_targets.append((track_id, target, target.score))
-
- # 按得分排序
- scored_targets.sort(key=lambda x: x[2], reverse=True)
-
- # 检查当前目标是否仍在列表中
- if self.current_target_id is not None:
- current_exists = self.current_target_id in targets
- if current_exists:
- # 应用粘性:当前目标得分需要显著低于最优目标才切换
- best_id, best_target, best_score = scored_targets[0]
- current_target = targets[self.current_target_id]
-
- # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
- stickiness_threshold = best_score * (1 - stickiness)
- if current_target.score > stickiness_threshold:
- return current_target
-
-
- # 选择得分最高的目标
- best_id, best_target, best_score = scored_targets[0]
- self.current_target_id = best_id
- self.current_target_score = best_score
-
- logger.debug(
- f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
- f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
- )
-
- return best_target
-
- def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
- """
- 获取按得分排序的目标列表
- Args:
- targets: 目标字典
- frame_size: 帧尺寸
- Returns:
- 排序后的目标列表 [(target, score), ...]
- """
- scored = []
- for target in targets.values():
- target.score = self.calculate_score(target, frame_size)
- scored.append((target, target.score))
-
- scored.sort(key=lambda x: x[1], reverse=True)
- return scored
-
- def set_strategy(self, strategy: str):
- """设置选择策略"""
- self.config['strategy'] = strategy
- logger.info(f"[目标选择] 策略已切换为: {strategy}")
-
- def set_stickiness(self, stickiness: float):
- """设置目标粘性"""
- self.config['stickiness'] = max(0.0, min(1.0, stickiness))
- logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
- class Coordinator:
- """
- 联动控制器
- 协调全景摄像头和球机实现联动抓拍
- """
-
- def __init__(self, panorama_camera: PanoramaCamera,
- ptz_camera: PTZCamera,
- detector: ObjectDetector = None,
- number_detector: NumberDetector = None,
- calibrator = None):
- """
- 初始化联动控制器
- Args:
- panorama_camera: 全景摄像头
- ptz_camera: 球机
- detector: 物体检测器
- number_detector: 编号检测器
- calibrator: 校准器 (用于坐标转换)
- """
- self.panorama = panorama_camera
- self.ptz = ptz_camera
- self.detector = detector
- self.number_detector = number_detector
- self.calibrator = calibrator
-
- self.config = COORDINATOR_CONFIG
-
- # 功能开关 - 从 SYSTEM_CONFIG 读取
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
-
- # 球机端人体检测与自动对焦
- self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
- self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
- self.ptz_detector = None
- self.auto_zoom_controller = None
-
- # 跟踪器
- self.tracker = PersonTracker()
-
- # 状态
- self.state = TrackingState.IDLE
- self.state_lock = threading.Lock()
-
- # 跟踪目标
- self.tracking_targets: Dict[int, TrackingTarget] = {}
- self.targets_lock = threading.Lock()
-
- # 当前跟踪目标
- self.current_target: Optional[TrackingTarget] = None
-
- # 回调函数
- self.on_person_detected: Optional[Callable] = None
- self.on_number_recognized: Optional[Callable] = None
- self.on_tracking_started: Optional[Callable] = None
- self.on_tracking_stopped: Optional[Callable] = None
-
- # 控制标志
- self.running = False
- self.coordinator_thread = None
-
- # OCR频率控制
- self.last_ocr_time = 0
- self.ocr_interval = 1.0 # OCR间隔(秒),避免过于频繁调用API
-
- # PTZ优化 - 避免频繁发送相同位置的命令
- self.last_ptz_position = None
- self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
-
- # 目标选择器
- self.target_selector = TargetSelector(
- self.config.get('target_selection', {})
- )
-
- # 结果队列
- self.result_queue = queue.Queue()
-
- # 性能统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'ocr_attempts': 0,
- 'ocr_success': 0,
- 'start_time': None,
- 'last_frame_time': None,
- }
- self.stats_lock = threading.Lock()
-
- def set_calibrator(self, calibrator):
- """设置校准器"""
- self.calibrator = calibrator
-
- def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
- """
- 将全景坐标转换为PTZ角度
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- Returns:
- (pan, tilt, zoom)
- """
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- # 使用校准结果进行转换
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = 8 # 默认变倍
- else:
- # 使用默认估算
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
-
- return (pan, tilt, zoom)
-
- def start(self) -> bool:
- """
- 启动联动系统
- Returns:
- 是否成功
- """
- # 连接全景摄像头
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- # 连接 PTZ 球机 (可选)
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
- else:
- print("PTZ 球机功能已禁用")
-
- # 启动视频流(优先RTSP,SDK回调不可用时回退)
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- # 启动联动线程
- self.running = True
- self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
- self.coordinator_thread.start()
-
- print("联动系统已启动")
- return True
-
- def stop(self):
- """停止联动系统"""
- self.running = False
-
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=3)
-
- self.panorama.disconnect()
-
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- # 打印统计信息
- self._print_stats()
-
- print("联动系统已停止")
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计信息"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计信息"""
- with self.stats_lock:
- if self.stats['start_time'] and self.stats['frames_processed'] > 0:
- elapsed = time.time() - self.stats['start_time']
- fps = self.stats['frames_processed'] / elapsed
- print("\n=== 性能统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"平均帧率: {fps:.1f} fps")
- print(f"检测人体: {self.stats['persons_detected']}次")
- print(f"OCR尝试: {self.stats['ocr_attempts']}次")
- print(f"OCR成功: {self.stats['ocr_success']}次")
- print("================\n")
-
- def get_stats(self) -> dict:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
-
- def _coordinator_worker(self):
- """联动工作线程"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
-
- # 初始化统计
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- # 更新帧统计
- self._update_stats('frames_processed')
-
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
-
- # 检测人体
- detections = self._detect_persons(frame)
-
- # 更新检测统计
- if detections:
- self._update_stats('persons_detected', len(detections))
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
-
- # 更新跟踪目标
- self._update_tracking_targets(tracked, frame_size)
-
- # 处理检测结果
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 处理当前跟踪目标
- self._process_current_target(frame, frame_size)
-
- # 清理过期目标
- self._cleanup_expired_targets()
-
- time.sleep(0.01)
-
- except Exception as e:
- print(f"联动处理错误: {e}")
- time.sleep(0.1)
-
- def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
- """检测人体"""
- if not self.enable_detection or self.detector is None:
- return []
- return self.detector.detect_persons(frame)
-
- def _update_tracking_targets(self, detections: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """更新跟踪目标"""
- current_time = time.time()
- frame_w, frame_h = frame_size
- center_x, center_y = frame_w / 2, frame_h / 2
-
- with self.targets_lock:
- # 更新现有目标
- for det in detections:
- if det.track_id is None:
- continue
-
- x_ratio = det.center[0] / frame_w
- y_ratio = det.center[1] / frame_h
-
- # 计算面积
- _, _, width, height = det.bbox
- area = width * height
-
- # 计算到画面中心的距离比例
- dx = abs(det.center[0] - center_x) / center_x
- dy = abs(det.center[1] - center_y) / center_y
- center_distance = (dx + dy) / 2 # 归一化到0-1
-
- if det.track_id in self.tracking_targets:
- # 更新位置
- target = self.tracking_targets[det.track_id]
- target.position = (x_ratio, y_ratio)
- target.last_update = current_time
- target.area = area
- target.confidence = det.confidence
- target.center_distance = center_distance
- else:
- # 新目标
- if len(self.tracking_targets) < self.config['max_tracking_targets']:
- self.tracking_targets[det.track_id] = TrackingTarget(
- track_id=det.track_id,
- position=(x_ratio, y_ratio),
- last_update=current_time,
- area=area,
- confidence=det.confidence,
- center_distance=center_distance
- )
-
- def _process_detections(self, detections: List[DetectedObject],
- frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理检测结果"""
- if self.on_person_detected:
- for det in detections:
- self.on_person_detected(det, frame)
-
- def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets, frame_size
- )
-
- if self.current_target:
- # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- self._set_state(TrackingState.TRACKING)
-
- x_ratio, y_ratio = self.current_target.position
-
- # 检查位置是否变化超过阈值
- should_move = True
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if (abs(x_ratio - last_x) < self.ptz_position_threshold and
- abs(y_ratio - last_y) < self.ptz_position_threshold):
- should_move = False
-
- if should_move:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- self.ptz.goto_exact_position(pan, tilt, zoom)
- else:
- self.ptz.track_target(x_ratio, y_ratio)
- self.last_ptz_position = (x_ratio, y_ratio)
-
- # 执行OCR识别 (仅在 OCR 启用时)
- if self.enable_ocr:
- self._perform_ocr(frame, self.current_target)
-
- def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
- """执行OCR识别"""
- if not self.enable_ocr or self.number_detector is None:
- return
-
- # 频率控制 - 避免过于频繁调用OCR API
- current_time = time.time()
- if current_time - self.last_ocr_time < self.ocr_interval:
- return
- self.last_ocr_time = current_time
-
- # 更新OCR尝试统计
- self._update_stats('ocr_attempts')
-
- # 计算人体边界框 (基于位置估算)
- frame_h, frame_w = frame.shape[:2]
-
- # 人体占画面比例 (可配置,默认宽20%、高40%)
- person_width_ratio = self.config.get('person_width_ratio', 0.2)
- person_height_ratio = self.config.get('person_height_ratio', 0.4)
-
- person_width = int(frame_w * person_width_ratio)
- person_height = int(frame_h * person_height_ratio)
-
- x_ratio, y_ratio = target.position
- center_x = int(x_ratio * frame_w)
- center_y = int(y_ratio * frame_h)
-
- # 计算边界框,确保不超出画面范围
- x1 = max(0, center_x - person_width // 2)
- y1 = max(0, center_y - person_height // 2)
- x2 = min(frame_w, x1 + person_width)
- y2 = min(frame_h, y1 + person_height)
-
- # 更新实际宽高 (可能因边界裁剪而变小)
- actual_width = x2 - x1
- actual_height = y2 - y1
-
- person_bbox = (x1, y1, actual_width, actual_height)
-
- # 检测编号
- self._set_state(TrackingState.OCR_PROCESSING)
- person_info = self.number_detector.detect_number(frame, person_bbox)
- person_info.person_id = target.track_id
-
- # 更新OCR成功统计
- if person_info.number_text:
- self._update_stats('ocr_success')
-
- # 更新目标信息
- with self.targets_lock:
- if target.track_id in self.tracking_targets:
- self.tracking_targets[target.track_id].person_info = person_info
-
- # 回调
- if self.on_number_recognized and person_info.number_text:
- self.on_number_recognized(person_info)
-
- # 放入结果队列
- self.result_queue.put(person_info)
-
- def _cleanup_expired_targets(self):
- """清理过期目标"""
- current_time = time.time()
- timeout = self.config['tracking_timeout']
-
- with self.targets_lock:
- expired_ids = [
- target_id for target_id, target in self.tracking_targets.items()
- if current_time - target.last_update > timeout
- ]
-
- for target_id in expired_ids:
- del self.tracking_targets[target_id]
- if self.current_target and self.current_target.track_id == target_id:
- self.current_target = None
-
- def _set_state(self, state: TrackingState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> TrackingState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def get_results(self) -> List[PersonInfo]:
- """
- 获取识别结果
- Returns:
- 人员信息列表
- """
- results = []
- while not self.result_queue.empty():
- try:
- results.append(self.result_queue.get_nowait())
- except queue.Empty:
- break
- return results
-
- def get_tracking_targets(self) -> List[TrackingTarget]:
- """获取当前跟踪目标"""
- with self.targets_lock:
- return list(self.tracking_targets.values())
-
- def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
- """
- 强制跟踪指定位置
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- zoom: 变倍
- """
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- self.ptz.goto_exact_position(pan, tilt, zoom or self.ptz.ptz_config.get('default_zoom', 8))
- else:
- self.ptz.move_to_target(x_ratio, y_ratio, zoom)
-
- def capture_snapshot(self) -> Optional[np.ndarray]:
- """
- 抓拍快照
- Returns:
- 快照图像
- """
- return self.panorama.get_frame()
- class EventDrivenCoordinator(Coordinator):
- """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.event_types = {
- 'intruder': True,
- 'crossline': True,
- 'motion': True,
- }
- self.event_queue = queue.Queue()
-
- def on_event(self, event_type: str, event_data: dict):
- if not self.event_types.get(event_type, False):
- return
- self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
-
- def _coordinator_worker(self):
- while self.running:
- try:
- try:
- event = self.event_queue.get(timeout=0.1)
- self._process_event(event)
- except queue.Empty:
- pass
-
- frame = self.panorama.get_frame()
- if frame is not None:
- frame_size = (frame.shape[1], frame.shape[0])
- detections = self._detect_persons(frame)
- if detections:
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
- self._process_current_target(frame, frame_size)
-
- self._cleanup_expired_targets()
- except Exception as e:
- print(f"事件处理错误: {e}")
- time.sleep(0.1)
-
- def _process_event(self, event: dict):
- event_type = event['type']
- event_data = event['data']
- print(f"处理事件: {event_type}")
-
- if event_type == 'intruder' and 'position' in event_data:
- x_ratio, y_ratio = event_data['position']
- self.force_track_position(x_ratio, y_ratio)
- @dataclass
- class PTZCommand:
- """PTZ控制命令"""
- pan: float
- tilt: float
- zoom: int
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- use_calibration: bool = True
- class AsyncCoordinator(Coordinator):
- """
- 异步联动控制器 — 检测线程与PTZ控制线程分离
-
- 改进:
- 1. 检测线程:持续读取全景帧 + YOLO推理
- 2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
- 3. 两线程通过 queue 通信,互不阻塞
- 4. PTZ位置确认:移动后等待球机到位并验证帧
- """
-
- PTZ_CONFIRM_WAIT = 0.3 # PTZ命令后等待稳定的秒数
- PTZ_CONFIRM_TIMEOUT = 2.0 # PTZ位置确认超时
- PTZ_COMMAND_COOLDOWN = 0.15 # PTZ命令最小间隔秒数
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # PTZ命令队列(检测→PTZ)
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
-
- # 线程
- self._detection_thread = None
- self._ptz_thread = None
-
- # PTZ确认回调
- self._on_ptz_confirmed: Optional[Callable] = None
-
- # 上次PTZ命令时间
- self._last_ptz_time = 0.0
-
- def start(self) -> bool:
- """启动联动(覆盖父类,启动双线程)"""
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
-
- # 启动球机RTSP流(用于球机端人体检测)
- if self.enable_ptz_detection:
- if not self.ptz.start_stream_rtsp():
- print("球机RTSP流启动失败,禁用球机端检测功能")
- self.enable_ptz_detection = False
- else:
- # 初始化球机端人体检测器
- self._init_ptz_detector()
- else:
- print("PTZ球机功能已禁用")
-
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- self.running = True
-
- # 启动检测线程
- self._detection_thread = threading.Thread(
- target=self._detection_worker, name="detection-worker", daemon=True)
- self._detection_thread.start()
-
- # 启动PTZ控制线程
- if self.enable_ptz_camera and self.enable_ptz_tracking:
- self._ptz_thread = threading.Thread(
- target=self._ptz_worker, name="ptz-worker", daemon=True)
- self._ptz_thread.start()
-
- print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
- return True
-
- def stop(self):
- """停止联动"""
- self.running = False
-
- # 清空PTZ队列,让工作线程退出
- while not self._ptz_queue.empty():
- try:
- self._ptz_queue.get_nowait()
- except queue.Empty:
- break
-
- if self._detection_thread:
- self._detection_thread.join(timeout=3)
- if self._ptz_thread:
- self._ptz_thread.join(timeout=3)
-
- # 停止父类线程(如果有的话)
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=1)
-
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- self._print_stats()
- print("异步联动系统已停止")
-
- def _detection_worker(self):
- """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- frame_count = 0
- last_log_time = time.time()
- log_interval = 5.0 # 每5秒打印一次帧率统计
- detection_run_count = 0
- detection_person_count = 0
- last_no_detect_log_time = 0
- no_detect_log_interval = 30.0
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- if self.detector is None:
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化! 检测功能不可用, 请检查 YOLO 模型是否正确加载")
- elif not self.enable_detection:
- logger.warning("[检测线程] ⚠️ 人体检测已禁用 (enable_detection=False)")
- else:
- logger.info(f"[检测线程] ✓ 人体检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s), PTZ冷却={ptz_cooldown}s")
-
- while self.running:
- try:
- current_time = time.time()
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
- frame_size = (frame.shape[1], frame.shape[0])
-
- if current_time - last_log_time >= log_interval:
- elapsed = current_time - last_log_time
- fps = frame_count / elapsed if elapsed > 0 else 0
-
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
- stats_parts = [f"帧率={fps:.1f}fps", f"处理帧={frame_count}", f"状态={state_str}"]
-
- if self.detector is None:
- stats_parts.append("检测器=未加载")
- elif not self.enable_detection:
- stats_parts.append("检测=已禁用")
- else:
- stats_parts.append(f"检测轮次={detection_run_count}(有人={detection_person_count})")
-
- with self.targets_lock:
- target_count = len(self.tracking_targets)
- stats_parts.append(f"跟踪目标={target_count}")
-
- logger.info(f"[检测线程] {', '.join(stats_parts)}")
- frame_count = 0
- last_log_time = current_time
-
- # 周期性检测(约1次/秒)
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- # YOLO 人体检测
- detections = self._detect_persons(frame)
-
- if detections:
- self._update_stats('persons_detected', len(detections))
- detection_person_count += 1
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
-
- # 打印检测日志
- if tracked:
- for t in tracked:
- # tracked 是 DetectedObject,使用 center 计算位置
- x_ratio = t.center[0] / frame_size[0]
- y_ratio = t.center[1] / frame_size[1]
- _, _, w, h = t.bbox
- area = w * h
- logger.info(
- f"[检测] ✓ 目标ID={t.track_id} "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"面积={area} 置信度={t.confidence:.2f}"
- )
- elif detections:
- # 有检测但没跟踪上
- for d in detections:
- logger.debug(f"[检测] 未跟踪: {d.class_name} @ {d.center}")
- else:
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
- logger.info(
- f"[检测] · YOLO检测运行正常, 本轮未检测到人员 "
- f"(累计检测{detection_run_count}轮, 检测到人{detection_person_count}轮)"
- )
- last_no_detect_log_time = current_time
-
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 选择跟踪目标并发送PTZ命令
- target = self._select_tracking_target()
- if target and self.enable_ptz_tracking and self.enable_ptz_camera:
- self._send_ptz_command_with_log(target, frame_size)
- elif not tracked and self.current_target:
- # 目标消失,切回IDLE
- self._set_state(TrackingState.IDLE)
- logger.info("[检测] 目标丢失,球机进入IDLE状态")
- self.current_target = None
-
- self._cleanup_expired_targets()
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"检测线程错误: {e}")
- time.sleep(0.1)
-
- def _init_ptz_detector(self):
- """初始化球机端人体检测器"""
- try:
- model_path = DETECTION_CONFIG.get('model_path')
- model_type = DETECTION_CONFIG.get('model_type', 'auto')
- conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
-
- if model_path:
- self.ptz_detector = PTZPersonDetector(
- model_path=model_path,
- model_type=model_type,
- confidence_threshold=conf_threshold
- )
- self.auto_zoom_controller = PTZAutoZoomController(
- ptz_camera=self.ptz,
- detector=self.ptz_detector,
- config=self.auto_zoom_config
- )
- print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
- else:
- print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
- self.enable_ptz_detection = False
- except Exception as e:
- print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
- self.enable_ptz_detection = False
- def _ptz_worker(self):
- """PTZ控制线程:从队列接收命令并控制球机"""
- while self.running:
- try:
- try:
- cmd = self._ptz_queue.get(timeout=0.1)
- except queue.Empty:
- continue
-
- self._execute_ptz_command(cmd)
-
- except Exception as e:
- print(f"PTZ控制线程错误: {e}")
- time.sleep(0.05)
-
- def _select_tracking_target(self) -> Optional[TrackingTarget]:
- """选择当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return None
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets
- )
-
- return self.current_target
-
- def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """将跟踪目标转化为PTZ命令放入队列"""
- x_ratio, y_ratio = target.position
-
- # 检查位置变化是否超过阈值
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
- abs(y_ratio - last_y) < self.ptz_position_threshold:
- return
-
- # 冷却检查
- current_time = time.time()
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- except queue.Full:
- pass # 丢弃命令,下一个检测周期会重发
-
- def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """发送PTZ命令并打印日志"""
- x_ratio, y_ratio = target.position
-
- # 计算PTZ角度(用于日志)
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- coord_type = "校准坐标"
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
- coord_type = "估算坐标"
-
- logger.info(
- f"[PTZ] 发送命令: 目标ID={target.track_id} "
- f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
- f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
- )
-
- # 检查位置变化是否超过阈值
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- dx = abs(x_ratio - last_x)
- dy = abs(y_ratio - last_y)
- if dx < ptz_threshold and dy < ptz_threshold:
- logger.debug(f"[PTZ] 位置变化太小(dx={dx:.4f}, dy={dy:.4f}),跳过")
- return
-
- # 冷却检查
- current_time = time.time()
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- if current_time - self._last_ptz_time < ptz_cooldown:
- logger.debug(f"[PTZ] 冷却中,跳过 (间隔={current_time - self._last_ptz_time:.2f}s < {ptz_cooldown}s)")
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
- except queue.Full:
- logger.warning("[PTZ] 命令队列满,丢弃本次命令")
-
- def _execute_ptz_command(self, cmd: PTZCommand):
- """执行PTZ命令(在PTZ线程中)"""
- self._last_ptz_time = time.time()
-
- if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
-
- self._set_state(TrackingState.TRACKING)
- logger.info(
- f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
- f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}))"
- )
-
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
-
- if success:
- time.sleep(self.PTZ_CONFIRM_WAIT)
-
- # 球机端人体检测与自动对焦
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
- self._auto_zoom_person(pan, tilt, zoom)
- else:
- self._confirm_ptz_position(cmd.x_ratio, cmd.y_ratio)
-
- logger.info(f"[PTZ] 到位确认完成: pan={pan:.1f}° tilt={tilt:.1f}°")
- else:
- logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
-
- def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int):
- """
- 自动对焦人体
- 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
- """
- if self.auto_zoom_controller is None:
- return
-
- logger.info("[AutoZoom] 开始自动对焦...")
-
- try:
- success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
- get_frame_func=self.ptz.get_frame,
- max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
- )
-
- if success:
- logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
- else:
- logger.warning("[AutoZoom] 自动对焦未能定位人体")
- except Exception as e:
- logger.error(f"[AutoZoom] 自动对焦异常: {e}")
-
- def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
- """PTZ位置确认:读取球机帧验证目标是否可见"""
- if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
- return
-
- ptz_frame = self.ptz.get_frame()
- if ptz_frame is None:
- return
-
- # 未来可以在这里添加球机帧目标验证逻辑
- # 例如:在球机帧中检测目标是否在画面中心附近
-
- def on_ptz_confirmed(self, callback: Callable):
- """注册PTZ位置确认回调"""
- self._on_ptz_confirmed = callback
|