| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284 |
- """
- 联动控制器
- 协调全景摄像头和球机的工作
- """
- import time
- import threading
- import queue
- import logging
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- import numpy as np
- from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
- from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from ocr_recognizer import NumberDetector, PersonInfo
- from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
- from paired_image_saver import PairedImageSaver, get_paired_saver
- logger = logging.getLogger(__name__)
- class TrackingState(Enum):
- """跟踪状态"""
- IDLE = 0 # 空闲
- SEARCHING = 1 # 搜索目标
- TRACKING = 2 # 跟踪中
- ZOOMING = 3 # 变焦中
- OCR_PROCESSING = 4 # OCR处理中
- @dataclass
- class TrackingTarget:
- """跟踪目标"""
- track_id: int # 跟踪ID
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
- last_update: float # 最后更新时间
- person_info: Optional[PersonInfo] = None # 人员信息
- priority: int = 0 # 优先级
- area: int = 0 # 目标面积(像素²)
- confidence: float = 0.0 # 置信度
- center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
- score: float = 0.0 # 综合得分
- class TargetSelector:
- """
- 目标选择策略类
- 支持按面积、置信度、混合模式排序,支持优先级切换
- """
-
- def __init__(self, config: Dict = None):
- """
- 初始化目标选择器
- Args:
- config: 目标选择配置
- """
- self.config = config or {
- 'strategy': 'area',
- 'area_weight': 0.6,
- 'confidence_weight': 0.4,
- 'min_area_threshold': 5000,
- 'prefer_center': True,
- 'center_weight': 0.2,
- 'switch_on_lost': True,
- 'stickiness': 0.3,
- }
- self.current_target_id: Optional[int] = None
- self.current_target_score: float = 0.0
-
- def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
- """
- 计算目标综合得分
- Args:
- target: 跟踪目标
- frame_size: 帧尺寸(w, h),用于计算中心距离
- Returns:
- 综合得分(0-1)
- """
- strategy = self.config.get('strategy', 'area')
- area_weight = self.config.get('area_weight', 0.6)
- conf_weight = self.config.get('confidence_weight', 0.4)
- min_area = self.config.get('min_area_threshold', 5000)
- prefer_center = self.config.get('prefer_center', False)
- center_weight = self.config.get('center_weight', 0.2)
-
- # 归一化面积得分 (对数缩放,避免大目标得分过高)
- import math
- area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
-
- # 小面积惩罚
- if target.area < min_area:
- area_score *= 0.5
-
- # 置信度得分直接使用
- conf_score = target.confidence
-
- # 中心距离得分 (距离中心越近得分越高)
- center_score = 1.0 - target.center_distance
-
- # 根据策略计算综合得分
- if strategy == 'area':
- score = area_score * 0.8 + conf_score * 0.2
- elif strategy == 'confidence':
- score = conf_score * 0.8 + area_score * 0.2
- else: # hybrid
- score = area_score * area_weight + conf_score * conf_weight
-
- # 加入中心距离权重
- if prefer_center:
- score = score * (1 - center_weight) + center_score * center_weight
-
- return score
-
- def select_target(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
- """
- 从多个目标中选择最优目标
- Args:
- targets: 目标字典 {track_id: TrackingTarget}
- frame_size: 帧尺寸
- Returns:
- 最优目标
- """
- if not targets:
- self.current_target_id = None
- return None
-
- stickiness = self.config.get('stickiness', 0.3)
- switch_on_lost = self.config.get('switch_on_lost', True)
-
- # 计算所有目标得分
- scored_targets = []
- for track_id, target in targets.items():
- target.score = self.calculate_score(target, frame_size)
- scored_targets.append((track_id, target, target.score))
-
- # 按得分排序
- scored_targets.sort(key=lambda x: x[2], reverse=True)
-
- # 检查当前目标是否仍在列表中
- if self.current_target_id is not None:
- current_exists = self.current_target_id in targets
- if current_exists:
- # 应用粘性:当前目标得分需要显著低于最优目标才切换
- best_id, best_target, best_score = scored_targets[0]
- current_target = targets[self.current_target_id]
-
- # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
- stickiness_threshold = best_score * (1 - stickiness)
- if current_target.score > stickiness_threshold:
- return current_target
-
-
- # 选择得分最高的目标
- best_id, best_target, best_score = scored_targets[0]
- self.current_target_id = best_id
- self.current_target_score = best_score
-
- logger.debug(
- f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
- f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
- )
-
- return best_target
-
- def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
- """
- 获取按得分排序的目标列表
- Args:
- targets: 目标字典
- frame_size: 帧尺寸
- Returns:
- 排序后的目标列表 [(target, score), ...]
- """
- scored = []
- for target in targets.values():
- target.score = self.calculate_score(target, frame_size)
- scored.append((target, target.score))
-
- scored.sort(key=lambda x: x[1], reverse=True)
- return scored
-
- def set_strategy(self, strategy: str):
- """设置选择策略"""
- self.config['strategy'] = strategy
- logger.info(f"[目标选择] 策略已切换为: {strategy}")
-
- def set_stickiness(self, stickiness: float):
- """设置目标粘性"""
- self.config['stickiness'] = max(0.0, min(1.0, stickiness))
- logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
- class Coordinator:
- """
- 联动控制器
- 协调全景摄像头和球机实现联动抓拍
- """
-
- def __init__(self, panorama_camera: PanoramaCamera,
- ptz_camera: PTZCamera,
- detector: ObjectDetector = None,
- number_detector: NumberDetector = None,
- calibrator = None):
- """
- 初始化联动控制器
- Args:
- panorama_camera: 全景摄像头
- ptz_camera: 球机
- detector: 物体检测器
- number_detector: 编号检测器
- calibrator: 校准器 (用于坐标转换)
- """
- self.panorama = panorama_camera
- self.ptz = ptz_camera
- self.detector = detector
- self.number_detector = number_detector
- self.calibrator = calibrator
-
- self.config = COORDINATOR_CONFIG
-
- # 功能开关 - 从 SYSTEM_CONFIG 读取
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
-
- # 球机端人体检测与自动对焦
- self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
- self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
- self.ptz_detector = None
- self.auto_zoom_controller = None
-
- # 跟踪器
- self.tracker = PersonTracker()
-
- # 状态
- self.state = TrackingState.IDLE
- self.state_lock = threading.Lock()
-
- # 跟踪目标
- self.tracking_targets: Dict[int, TrackingTarget] = {}
- self.targets_lock = threading.Lock()
-
- # 当前跟踪目标
- self.current_target: Optional[TrackingTarget] = None
-
- # 回调函数
- self.on_person_detected: Optional[Callable] = None
- self.on_number_recognized: Optional[Callable] = None
- self.on_tracking_started: Optional[Callable] = None
- self.on_tracking_stopped: Optional[Callable] = None
-
- # 控制标志
- self.running = False
- self.coordinator_thread = None
-
- # OCR频率控制
- self.last_ocr_time = 0
- self.ocr_interval = 1.0 # OCR间隔(秒),避免过于频繁调用API
-
- # PTZ优化 - 避免频繁发送相同位置的命令
- self.last_ptz_position = None
- self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
-
- # 目标选择器
- self.target_selector = TargetSelector(
- self.config.get('target_selection', {})
- )
-
- # 结果队列
- self.result_queue = queue.Queue()
-
- # 性能统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'ocr_attempts': 0,
- 'ocr_success': 0,
- 'start_time': None,
- 'last_frame_time': None,
- }
- self.stats_lock = threading.Lock()
-
- def set_calibrator(self, calibrator):
- """设置校准器"""
- self.calibrator = calibrator
-
- def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
- """
- 将全景坐标转换为PTZ角度
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- Returns:
- (pan, tilt, zoom)
- """
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- # 使用校准结果进行转换
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = 8 # 默认变倍
- else:
- # 使用默认估算
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
-
- return (pan, tilt, zoom)
-
- def start(self) -> bool:
- """
- 启动联动系统
- Returns:
- 是否成功
- """
- # 连接全景摄像头
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- # 连接 PTZ 球机 (可选)
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
- else:
- print("PTZ 球机功能已禁用")
-
- # 启动视频流(优先RTSP,SDK回调不可用时回退)
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- # 启动联动线程
- self.running = True
- self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
- self.coordinator_thread.start()
-
- print("联动系统已启动")
- return True
-
- def stop(self):
- """停止联动系统"""
- self.running = False
-
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=3)
-
- self.panorama.disconnect()
-
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- # 打印统计信息
- self._print_stats()
-
- print("联动系统已停止")
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计信息"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计信息"""
- with self.stats_lock:
- if self.stats['start_time'] and self.stats['frames_processed'] > 0:
- elapsed = time.time() - self.stats['start_time']
- fps = self.stats['frames_processed'] / elapsed
- print("\n=== 性能统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"平均帧率: {fps:.1f} fps")
- print(f"检测人体: {self.stats['persons_detected']}次")
- print(f"OCR尝试: {self.stats['ocr_attempts']}次")
- print(f"OCR成功: {self.stats['ocr_success']}次")
- print("================\n")
-
- def get_stats(self) -> dict:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
-
- def _coordinator_worker(self):
- """联动工作线程"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
-
- # 初始化统计
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- # 更新帧统计
- self._update_stats('frames_processed')
-
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
-
- # 检测人体
- detections = self._detect_persons(frame)
-
- # 更新检测统计
- if detections:
- self._update_stats('persons_detected', len(detections))
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
-
- # 更新跟踪目标
- self._update_tracking_targets(tracked, frame_size)
-
- # 处理检测结果
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 处理当前跟踪目标
- self._process_current_target(frame, frame_size)
-
- # 清理过期目标
- self._cleanup_expired_targets()
-
- time.sleep(0.01)
-
- except Exception as e:
- print(f"联动处理错误: {e}")
- time.sleep(0.1)
-
- def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
- """检测人体"""
- if not self.enable_detection or self.detector is None:
- return []
- return self.detector.detect_persons(frame)
-
- def _update_tracking_targets(self, detections: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """更新跟踪目标"""
- current_time = time.time()
- frame_w, frame_h = frame_size
- center_x, center_y = frame_w / 2, frame_h / 2
-
- with self.targets_lock:
- # 更新现有目标
- for det in detections:
- if det.track_id is None:
- continue
-
- x_ratio = det.center[0] / frame_w
- y_ratio = det.center[1] / frame_h
-
- # 计算面积
- _, _, width, height = det.bbox
- area = width * height
-
- # 计算到画面中心的距离比例
- dx = abs(det.center[0] - center_x) / center_x
- dy = abs(det.center[1] - center_y) / center_y
- center_distance = (dx + dy) / 2 # 归一化到0-1
-
- if det.track_id in self.tracking_targets:
- # 更新位置
- target = self.tracking_targets[det.track_id]
- target.position = (x_ratio, y_ratio)
- target.last_update = current_time
- target.area = area
- target.confidence = det.confidence
- target.center_distance = center_distance
- else:
- # 新目标
- if len(self.tracking_targets) < self.config['max_tracking_targets']:
- self.tracking_targets[det.track_id] = TrackingTarget(
- track_id=det.track_id,
- position=(x_ratio, y_ratio),
- last_update=current_time,
- area=area,
- confidence=det.confidence,
- center_distance=center_distance
- )
-
- def _process_detections(self, detections: List[DetectedObject],
- frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理检测结果"""
- if self.on_person_detected:
- for det in detections:
- self.on_person_detected(det, frame)
-
- def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets, frame_size
- )
-
- if self.current_target:
- # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- self._set_state(TrackingState.TRACKING)
-
- x_ratio, y_ratio = self.current_target.position
-
- # 检查位置是否变化超过阈值
- should_move = True
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if (abs(x_ratio - last_x) < self.ptz_position_threshold and
- abs(y_ratio - last_y) < self.ptz_position_threshold):
- should_move = False
-
- if should_move:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- self.ptz.goto_exact_position(pan, tilt, zoom)
- else:
- self.ptz.track_target(x_ratio, y_ratio)
- self.last_ptz_position = (x_ratio, y_ratio)
-
- # 执行OCR识别 (仅在 OCR 启用时)
- if self.enable_ocr:
- self._perform_ocr(frame, self.current_target)
-
- def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
- """执行OCR识别"""
- if not self.enable_ocr or self.number_detector is None:
- return
-
- # 频率控制 - 避免过于频繁调用OCR API
- current_time = time.time()
- if current_time - self.last_ocr_time < self.ocr_interval:
- return
- self.last_ocr_time = current_time
-
- # 更新OCR尝试统计
- self._update_stats('ocr_attempts')
-
- # 计算人体边界框 (基于位置估算)
- frame_h, frame_w = frame.shape[:2]
-
- # 人体占画面比例 (可配置,默认宽20%、高40%)
- person_width_ratio = self.config.get('person_width_ratio', 0.2)
- person_height_ratio = self.config.get('person_height_ratio', 0.4)
-
- person_width = int(frame_w * person_width_ratio)
- person_height = int(frame_h * person_height_ratio)
-
- x_ratio, y_ratio = target.position
- center_x = int(x_ratio * frame_w)
- center_y = int(y_ratio * frame_h)
-
- # 计算边界框,确保不超出画面范围
- x1 = max(0, center_x - person_width // 2)
- y1 = max(0, center_y - person_height // 2)
- x2 = min(frame_w, x1 + person_width)
- y2 = min(frame_h, y1 + person_height)
-
- # 更新实际宽高 (可能因边界裁剪而变小)
- actual_width = x2 - x1
- actual_height = y2 - y1
-
- person_bbox = (x1, y1, actual_width, actual_height)
-
- # 检测编号
- self._set_state(TrackingState.OCR_PROCESSING)
- person_info = self.number_detector.detect_number(frame, person_bbox)
- person_info.person_id = target.track_id
-
- # 更新OCR成功统计
- if person_info.number_text:
- self._update_stats('ocr_success')
-
- # 更新目标信息
- with self.targets_lock:
- if target.track_id in self.tracking_targets:
- self.tracking_targets[target.track_id].person_info = person_info
-
- # 回调
- if self.on_number_recognized and person_info.number_text:
- self.on_number_recognized(person_info)
-
- # 放入结果队列
- self.result_queue.put(person_info)
-
- def _cleanup_expired_targets(self):
- """清理过期目标"""
- current_time = time.time()
- timeout = self.config['tracking_timeout']
-
- with self.targets_lock:
- expired_ids = [
- target_id for target_id, target in self.tracking_targets.items()
- if current_time - target.last_update > timeout
- ]
-
- for target_id in expired_ids:
- del self.tracking_targets[target_id]
- if self.current_target and self.current_target.track_id == target_id:
- self.current_target = None
-
- def _set_state(self, state: TrackingState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> TrackingState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def get_results(self) -> List[PersonInfo]:
- """
- 获取识别结果
- Returns:
- 人员信息列表
- """
- results = []
- while not self.result_queue.empty():
- try:
- results.append(self.result_queue.get_nowait())
- except queue.Empty:
- break
- return results
-
- def get_tracking_targets(self) -> List[TrackingTarget]:
- """获取当前跟踪目标"""
- with self.targets_lock:
- return list(self.tracking_targets.values())
-
- def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
- """
- 强制跟踪指定位置
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- zoom: 变倍
- """
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- self.ptz.goto_exact_position(pan, tilt, zoom or self.ptz.ptz_config.get('default_zoom', 8))
- else:
- self.ptz.move_to_target(x_ratio, y_ratio, zoom)
-
- def capture_snapshot(self) -> Optional[np.ndarray]:
- """
- 抓拍快照
- Returns:
- 快照图像
- """
- return self.panorama.get_frame()
- class EventDrivenCoordinator(Coordinator):
- """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.event_types = {
- 'intruder': True,
- 'crossline': True,
- 'motion': True,
- }
- self.event_queue = queue.Queue()
-
- def on_event(self, event_type: str, event_data: dict):
- if not self.event_types.get(event_type, False):
- return
- self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
-
- def _coordinator_worker(self):
- while self.running:
- try:
- try:
- event = self.event_queue.get(timeout=0.1)
- self._process_event(event)
- except queue.Empty:
- pass
-
- frame = self.panorama.get_frame()
- if frame is not None:
- frame_size = (frame.shape[1], frame.shape[0])
- detections = self._detect_persons(frame)
- if detections:
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
- self._process_current_target(frame, frame_size)
-
- self._cleanup_expired_targets()
- except Exception as e:
- print(f"事件处理错误: {e}")
- time.sleep(0.1)
-
- def _process_event(self, event: dict):
- event_type = event['type']
- event_data = event['data']
- print(f"处理事件: {event_type}")
-
- if event_type == 'intruder' and 'position' in event_data:
- x_ratio, y_ratio = event_data['position']
- self.force_track_position(x_ratio, y_ratio)
- @dataclass
- class PTZCommand:
- """PTZ控制命令"""
- pan: float
- tilt: float
- zoom: int
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- use_calibration: bool = True
- track_id: Optional[int] = None # 跟踪目标ID(用于配对图片保存)
- class AsyncCoordinator(Coordinator):
- """
- 异步联动控制器 — 检测线程与PTZ控制线程分离
-
- 改进:
- 1. 检测线程:持续读取全景帧 + YOLO推理
- 2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
- 3. 两线程通过 queue 通信,互不阻塞
- 4. PTZ位置确认:移动后等待球机到位并验证帧
- """
-
- PTZ_CONFIRM_WAIT = 0.3 # PTZ命令后等待稳定的秒数
- PTZ_CONFIRM_TIMEOUT = 2.0 # PTZ位置确认超时
- PTZ_COMMAND_COOLDOWN = 0.15 # PTZ命令最小间隔秒数
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # PTZ命令队列(检测→PTZ)
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
-
- # 线程
- self._detection_thread = None
- self._ptz_thread = None
-
- # PTZ确认回调
- self._on_ptz_confirmed: Optional[Callable] = None
-
- # 上次PTZ命令时间
- self._last_ptz_time = 0.0
-
- # 配对图片保存器
- self._enable_paired_saving = DETECTION_CONFIG.get('enable_paired_saving', False)
- self._paired_saver: Optional[PairedImageSaver] = None
- self._current_batch_id: Optional[str] = None
- self._person_ptz_index: Dict[int, int] = {} # track_id -> person_index
-
- if self._enable_paired_saving:
- save_dir = DETECTION_CONFIG.get('paired_image_dir', '/home/admin/dsh/paired_images')
- time_window = DETECTION_CONFIG.get('paired_time_window', 5.0)
- self._paired_saver = get_paired_saver(base_dir=save_dir, time_window=time_window)
- logger.info(f"[AsyncCoordinator] 配对图片保存已启用: 目录={save_dir}, 时间窗口={time_window}s")
-
- def start(self) -> bool:
- """启动联动(覆盖父类,启动双线程)"""
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
-
- # 启动球机RTSP流(用于球机端人体检测)
- if self.enable_ptz_detection:
- if not self.ptz.start_stream_rtsp():
- print("球机RTSP流启动失败,禁用球机端检测功能")
- self.enable_ptz_detection = False
- else:
- # 初始化球机端人体检测器
- self._init_ptz_detector()
- else:
- print("PTZ球机功能已禁用")
-
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- self.running = True
-
- # 启动检测线程
- self._detection_thread = threading.Thread(
- target=self._detection_worker, name="detection-worker", daemon=True)
- self._detection_thread.start()
-
- # 启动PTZ控制线程
- if self.enable_ptz_camera and self.enable_ptz_tracking:
- self._ptz_thread = threading.Thread(
- target=self._ptz_worker, name="ptz-worker", daemon=True)
- self._ptz_thread.start()
-
- print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
- return True
-
- def stop(self):
- """停止联动"""
- self.running = False
-
- # 清空PTZ队列,让工作线程退出
- while not self._ptz_queue.empty():
- try:
- self._ptz_queue.get_nowait()
- except queue.Empty:
- break
-
- if self._detection_thread:
- self._detection_thread.join(timeout=3)
- if self._ptz_thread:
- self._ptz_thread.join(timeout=3)
-
- # 停止父类线程(如果有的话)
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=1)
-
- # 关闭配对保存器
- if self._paired_saver is not None:
- self._paired_saver.close()
- self._paired_saver = None
-
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- self._print_stats()
- print("异步联动系统已停止")
-
- def _detection_worker(self):
- """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- frame_count = 0
- last_log_time = time.time()
- log_interval = 5.0 # 每5秒打印一次帧率统计
- detection_run_count = 0
- detection_person_count = 0
- last_no_detect_log_time = 0
- no_detect_log_interval = 30.0
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- if self.detector is None:
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化! 检测功能不可用, 请检查 YOLO 模型是否正确加载")
- elif not self.enable_detection:
- logger.warning("[检测线程] ⚠️ 人体检测已禁用 (enable_detection=False)")
- else:
- logger.info(f"[检测线程] ✓ 人体检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s), PTZ冷却={ptz_cooldown}s")
-
- while self.running:
- try:
- current_time = time.time()
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
- frame_size = (frame.shape[1], frame.shape[0])
-
- if current_time - last_log_time >= log_interval:
- elapsed = current_time - last_log_time
- fps = frame_count / elapsed if elapsed > 0 else 0
-
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
- stats_parts = [f"帧率={fps:.1f}fps", f"处理帧={frame_count}", f"状态={state_str}"]
-
- if self.detector is None:
- stats_parts.append("检测器=未加载")
- elif not self.enable_detection:
- stats_parts.append("检测=已禁用")
- else:
- stats_parts.append(f"检测轮次={detection_run_count}(有人={detection_person_count})")
-
- with self.targets_lock:
- target_count = len(self.tracking_targets)
- stats_parts.append(f"跟踪目标={target_count}")
-
- logger.info(f"[检测线程] {', '.join(stats_parts)}")
- frame_count = 0
- last_log_time = current_time
-
- # 周期性检测(约1次/秒)
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- # YOLO 人体检测
- detections = self._detect_persons(frame)
-
- if detections:
- self._update_stats('persons_detected', len(detections))
- detection_person_count += 1
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
-
- # 配对图片保存:创建新批次
- if tracked and self._enable_paired_saving and self._paired_saver is not None:
- self._create_detection_batch(frame, tracked, frame_size)
-
- # 打印检测日志
- if tracked:
- for t in tracked:
- # tracked 是 DetectedObject,使用 center 计算位置
- x_ratio = t.center[0] / frame_size[0]
- y_ratio = t.center[1] / frame_size[1]
- _, _, w, h = t.bbox
- area = w * h
- logger.info(
- f"[检测] ✓ 目标ID={t.track_id} "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"面积={area} 置信度={t.confidence:.2f}"
- )
- elif detections:
- # 有检测但没跟踪上
- for d in detections:
- logger.debug(f"[检测] 未跟踪: {d.class_name} @ {d.center}")
- else:
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
- logger.info(
- f"[检测] · YOLO检测运行正常, 本轮未检测到人员 "
- f"(累计检测{detection_run_count}轮, 检测到人{detection_person_count}轮)"
- )
- last_no_detect_log_time = current_time
-
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 选择跟踪目标并发送PTZ命令
- target = self._select_tracking_target()
- if target and self.enable_ptz_tracking and self.enable_ptz_camera:
- self._send_ptz_command_with_log(target, frame_size)
- elif not tracked and self.current_target:
- # 目标消失,切回IDLE
- self._set_state(TrackingState.IDLE)
- logger.info("[检测] 目标丢失,球机进入IDLE状态")
- self.current_target = None
-
- self._cleanup_expired_targets()
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"检测线程错误: {e}")
- time.sleep(0.1)
-
- def _init_ptz_detector(self):
- """初始化球机端人体检测器"""
- try:
- model_path = DETECTION_CONFIG.get('model_path')
- model_type = DETECTION_CONFIG.get('model_type', 'auto')
- conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
-
- if model_path:
- self.ptz_detector = PTZPersonDetector(
- model_path=model_path,
- model_type=model_type,
- confidence_threshold=conf_threshold
- )
- self.auto_zoom_controller = PTZAutoZoomController(
- ptz_camera=self.ptz,
- detector=self.ptz_detector,
- config=self.auto_zoom_config
- )
- print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
- else:
- print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
- self.enable_ptz_detection = False
- except Exception as e:
- print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
- self.enable_ptz_detection = False
- def _create_detection_batch(self, frame: np.ndarray,
- tracked: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """
- 创建检测批次,用于配对图片保存
-
- Args:
- frame: 全景帧
- tracked: 跟踪到的人员列表
- frame_size: 帧尺寸
- """
- if self._paired_saver is None:
- return
-
- # 构建人员信息列表
- persons = []
- self._person_ptz_index = {} # 重置索引映射
-
- for i, det in enumerate(tracked):
- x_ratio = det.center[0] / frame_size[0]
- y_ratio = det.center[1] / frame_size[1]
-
- person_info = {
- 'track_id': det.track_id,
- 'position': (x_ratio, y_ratio),
- 'bbox': (det.bbox[0], det.bbox[1],
- det.bbox[0] + det.bbox[2],
- det.bbox[1] + det.bbox[3]),
- 'confidence': det.confidence
- }
- persons.append(person_info)
- self._person_ptz_index[det.track_id] = i
-
- # 创建新批次
- batch_id = self._paired_saver.start_new_batch(frame, persons)
- if batch_id:
- self._current_batch_id = batch_id
- logger.info(f"[配对保存] 创建批次: {batch_id}, 人员={len(persons)}")
- def _save_ptz_image_for_person(self, track_id: int,
- ptz_frame: np.ndarray,
- ptz_position: Tuple[float, float, int]):
- """
- 保存球机聚焦图片到对应批次
-
- Args:
- track_id: 人员跟踪ID
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- """
- if (self._paired_saver is None or
- self._current_batch_id is None or
- track_id not in self._person_ptz_index):
- return
-
- person_index = self._person_ptz_index[track_id]
-
- self._paired_saver.save_ptz_image(
- batch_id=self._current_batch_id,
- person_index=person_index,
- ptz_frame=ptz_frame,
- ptz_position=ptz_position
- )
- def _ptz_worker(self):
- """PTZ控制线程:从队列接收命令并控制球机"""
- while self.running:
- try:
- try:
- cmd = self._ptz_queue.get(timeout=0.1)
- except queue.Empty:
- continue
-
- # 从命令中提取 track_id 并传递
- self._execute_ptz_command(cmd, track_id=cmd.track_id)
-
- except Exception as e:
- print(f"PTZ控制线程错误: {e}")
- time.sleep(0.05)
-
- def _select_tracking_target(self) -> Optional[TrackingTarget]:
- """选择当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return None
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets
- )
-
- return self.current_target
-
- def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """将跟踪目标转化为PTZ命令放入队列"""
- x_ratio, y_ratio = target.position
-
- # 检查位置变化是否超过阈值
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
- abs(y_ratio - last_y) < self.ptz_position_threshold:
- return
-
- # 冷却检查
- current_time = time.time()
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- except queue.Full:
- pass # 丢弃命令,下一个检测周期会重发
-
- def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """发送PTZ命令并打印日志"""
- x_ratio, y_ratio = target.position
-
- # 计算PTZ角度(用于日志)
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- coord_type = "校准坐标"
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
- coord_type = "估算坐标"
-
- logger.info(
- f"[PTZ] 发送命令: 目标ID={target.track_id} "
- f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
- f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
- )
-
- # 检查位置变化是否超过阈值
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- dx = abs(x_ratio - last_x)
- dy = abs(y_ratio - last_y)
- if dx < ptz_threshold and dy < ptz_threshold:
- logger.debug(f"[PTZ] 位置变化太小(dx={dx:.4f}, dy={dy:.4f}),跳过")
- return
-
- # 冷却检查
- current_time = time.time()
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- if current_time - self._last_ptz_time < ptz_cooldown:
- logger.debug(f"[PTZ] 冷却中,跳过 (间隔={current_time - self._last_ptz_time:.2f}s < {ptz_cooldown}s)")
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration,
- track_id=target.track_id # 传递跟踪ID
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
- except queue.Full:
- logger.warning("[PTZ] 命令队列满,丢弃本次命令")
-
- def _execute_ptz_command(self, cmd: PTZCommand, track_id: int = None):
- """
- 执行PTZ命令(在PTZ线程中)
-
- Args:
- cmd: PTZ命令
- track_id: 跟踪目标ID(用于配对图片保存)
- """
- self._last_ptz_time = time.time()
-
- if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
-
- self._set_state(TrackingState.TRACKING)
- logger.info(
- f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
- f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}))"
- )
-
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
-
- if success:
- time.sleep(self.PTZ_CONFIRM_WAIT)
-
- # 球机端人体检测与自动对焦
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
- final_zoom = self._auto_zoom_person(pan, tilt, zoom)
- if final_zoom != zoom:
- zoom = final_zoom
-
- # 保存球机图片到配对批次
- if self._enable_paired_saving and track_id is not None:
- ptz_frame = self.ptz.get_frame()
- if ptz_frame is not None:
- self._save_ptz_image_for_person(track_id, ptz_frame, (pan, tilt, zoom))
-
- logger.info(f"[PTZ] 到位确认完成: pan={pan:.1f}° tilt={tilt:.1f}°")
- else:
- logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
-
- def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int) -> int:
- """
- 自动对焦人体
- 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
-
- Returns:
- 最终的 zoom 值
- """
- if self.auto_zoom_controller is None:
- return initial_zoom
-
- logger.info("[AutoZoom] 开始自动对焦...")
-
- try:
- success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
- get_frame_func=self.ptz.get_frame,
- max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
- )
-
- if success:
- logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
- return final_zoom
- else:
- logger.warning("[AutoZoom] 自动对焦未能定位人体")
- return initial_zoom
- except Exception as e:
- logger.error(f"[AutoZoom] 自动对焦异常: {e}")
- return initial_zoom
-
- def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
- """PTZ位置确认:读取球机帧验证目标是否可见"""
- if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
- return
-
- ptz_frame = self.ptz.get_frame()
- if ptz_frame is None:
- return
-
- # 未来可以在这里添加球机帧目标验证逻辑
- # 例如:在球机帧中检测目标是否在画面中心附近
-
- def on_ptz_confirmed(self, callback: Callable):
- """注册PTZ位置确认回调"""
- self._on_ptz_confirmed = callback
|