| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445 |
- """
- 联动控制器
- 协调全景摄像头和球机的工作
- """
- import time
- import threading
- import queue
- import logging
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- import numpy as np
- import cv2
- from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
- from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from ocr_recognizer import NumberDetector, PersonInfo
- from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
- from paired_image_saver import PairedImageSaver, get_paired_saver
- logger = logging.getLogger(__name__)
- class TrackingState(Enum):
- """跟踪状态"""
- IDLE = 0 # 空闲
- SEARCHING = 1 # 搜索目标
- TRACKING = 2 # 跟踪中
- ZOOMING = 3 # 变焦中
- OCR_PROCESSING = 4 # OCR处理中
- @dataclass
- class TrackingTarget:
- """跟踪目标"""
- track_id: int # 跟踪ID
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
- last_update: float # 最后更新时间
- person_info: Optional[PersonInfo] = None # 人员信息
- priority: int = 0 # 优先级
- area: int = 0 # 目标面积(像素²)
- confidence: float = 0.0 # 置信度
- center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
- score: float = 0.0 # 综合得分
- class TargetSelector:
- """
- 目标选择策略类
- 支持按面积、置信度、混合模式排序,支持优先级切换
- """
-
- def __init__(self, config: Dict = None):
- """
- 初始化目标选择器
- Args:
- config: 目标选择配置
- """
- self.config = config or {
- 'strategy': 'area',
- 'area_weight': 0.6,
- 'confidence_weight': 0.4,
- 'min_area_threshold': 5000,
- 'prefer_center': True,
- 'center_weight': 0.2,
- 'switch_on_lost': True,
- 'stickiness': 0.3,
- }
- self.current_target_id: Optional[int] = None
- self.current_target_score: float = 0.0
-
- def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
- """
- 计算目标综合得分
- Args:
- target: 跟踪目标
- frame_size: 帧尺寸(w, h),用于计算中心距离
- Returns:
- 综合得分(0-1)
- """
- strategy = self.config.get('strategy', 'area')
- area_weight = self.config.get('area_weight', 0.6)
- conf_weight = self.config.get('confidence_weight', 0.4)
- min_area = self.config.get('min_area_threshold', 5000)
- prefer_center = self.config.get('prefer_center', False)
- center_weight = self.config.get('center_weight', 0.2)
-
- # 归一化面积得分 (对数缩放,避免大目标得分过高)
- import math
- area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
-
- # 小面积惩罚
- if target.area < min_area:
- area_score *= 0.5
-
- # 置信度得分直接使用
- conf_score = target.confidence
-
- # 中心距离得分 (距离中心越近得分越高)
- center_score = 1.0 - target.center_distance
-
- # 根据策略计算综合得分
- if strategy == 'area':
- score = area_score * 0.8 + conf_score * 0.2
- elif strategy == 'confidence':
- score = conf_score * 0.8 + area_score * 0.2
- else: # hybrid
- score = area_score * area_weight + conf_score * conf_weight
-
- # 加入中心距离权重
- if prefer_center:
- score = score * (1 - center_weight) + center_score * center_weight
-
- return score
-
- def select_target(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
- """
- 从多个目标中选择最优目标
- Args:
- targets: 目标字典 {track_id: TrackingTarget}
- frame_size: 帧尺寸
- Returns:
- 最优目标
- """
- if not targets:
- self.current_target_id = None
- return None
-
- stickiness = self.config.get('stickiness', 0.3)
- switch_on_lost = self.config.get('switch_on_lost', True)
-
- # 计算所有目标得分
- scored_targets = []
- for track_id, target in targets.items():
- target.score = self.calculate_score(target, frame_size)
- scored_targets.append((track_id, target, target.score))
-
- # 按得分排序
- scored_targets.sort(key=lambda x: x[2], reverse=True)
-
- # 检查当前目标是否仍在列表中
- if self.current_target_id is not None:
- current_exists = self.current_target_id in targets
- if current_exists:
- # 应用粘性:当前目标得分需要显著低于最优目标才切换
- best_id, best_target, best_score = scored_targets[0]
- current_target = targets[self.current_target_id]
-
- # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
- stickiness_threshold = best_score * (1 - stickiness)
- if current_target.score > stickiness_threshold:
- return current_target
-
-
- # 选择得分最高的目标
- best_id, best_target, best_score = scored_targets[0]
- self.current_target_id = best_id
- self.current_target_score = best_score
-
- logger.debug(
- f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
- f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
- )
-
- return best_target
-
- def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
- """
- 获取按得分排序的目标列表
- Args:
- targets: 目标字典
- frame_size: 帧尺寸
- Returns:
- 排序后的目标列表 [(target, score), ...]
- """
- scored = []
- for target in targets.values():
- target.score = self.calculate_score(target, frame_size)
- scored.append((target, target.score))
-
- scored.sort(key=lambda x: x[1], reverse=True)
- return scored
-
- def set_strategy(self, strategy: str):
- """设置选择策略"""
- self.config['strategy'] = strategy
- logger.info(f"[目标选择] 策略已切换为: {strategy}")
-
- def set_stickiness(self, stickiness: float):
- """设置目标粘性"""
- self.config['stickiness'] = max(0.0, min(1.0, stickiness))
- logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
- class Coordinator:
- """
- 联动控制器
- 协调全景摄像头和球机实现联动抓拍
- """
-
- def __init__(self, panorama_camera: PanoramaCamera,
- ptz_camera: PTZCamera,
- detector: ObjectDetector = None,
- number_detector: NumberDetector = None,
- calibrator = None):
- """
- 初始化联动控制器
- Args:
- panorama_camera: 全景摄像头
- ptz_camera: 球机
- detector: 物体检测器
- number_detector: 编号检测器
- calibrator: 校准器 (用于坐标转换)
- """
- self.panorama = panorama_camera
- self.ptz = ptz_camera
- self.detector = detector
- self.number_detector = number_detector
- self.calibrator = calibrator
-
- self.config = COORDINATOR_CONFIG
-
- # 功能开关 - 从 SYSTEM_CONFIG 读取
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
-
- # 球机端人体检测与自动对焦
- self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
- self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
- self.ptz_detector = None
- self.auto_zoom_controller = None
-
- # 跟踪器
- self.tracker = PersonTracker()
-
- # 状态
- self.state = TrackingState.IDLE
- self.state_lock = threading.Lock()
-
- # 跟踪目标
- self.tracking_targets: Dict[int, TrackingTarget] = {}
- self.targets_lock = threading.Lock()
-
- # 当前跟踪目标
- self.current_target: Optional[TrackingTarget] = None
-
- # 回调函数
- self.on_person_detected: Optional[Callable] = None
- self.on_number_recognized: Optional[Callable] = None
- self.on_tracking_started: Optional[Callable] = None
- self.on_tracking_stopped: Optional[Callable] = None
-
- # 控制标志
- self.running = False
- self.coordinator_thread = None
-
- # OCR频率控制
- self.last_ocr_time = 0
- self.ocr_interval = 1.0 # OCR间隔(秒),避免过于频繁调用API
-
- # PTZ优化 - 避免频繁发送相同位置的命令
- self.last_ptz_position = None
- self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
-
- # 目标选择器
- self.target_selector = TargetSelector(
- self.config.get('target_selection', {})
- )
-
- # 结果队列
- self.result_queue = queue.Queue()
-
- # 性能统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'ocr_attempts': 0,
- 'ocr_success': 0,
- 'start_time': None,
- 'last_frame_time': None,
- }
- self.stats_lock = threading.Lock()
-
- def set_calibrator(self, calibrator):
- """设置校准器"""
- self.calibrator = calibrator
-
- def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
- """
- 将全景坐标转换为PTZ角度
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- Returns:
- (pan, tilt, zoom)
- """
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- # 使用校准结果进行转换
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = 8 # 默认变倍
- else:
- # 使用默认估算
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
-
- return (pan, tilt, zoom)
-
- def start(self) -> bool:
- """
- 启动联动系统
- Returns:
- 是否成功
- """
- # 连接全景摄像头
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- # 连接 PTZ 球机 (可选)
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
- else:
- print("PTZ 球机功能已禁用")
-
- # 启动视频流(优先RTSP,SDK回调不可用时回退)
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- # 启动联动线程
- self.running = True
- self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
- self.coordinator_thread.start()
-
- print("联动系统已启动")
- return True
-
- def stop(self):
- """停止联动系统"""
- self.running = False
-
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=3)
-
- self.panorama.disconnect()
-
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- # 打印统计信息
- self._print_stats()
-
- print("联动系统已停止")
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计信息"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计信息"""
- with self.stats_lock:
- if self.stats['start_time'] and self.stats['frames_processed'] > 0:
- elapsed = time.time() - self.stats['start_time']
- fps = self.stats['frames_processed'] / elapsed
- print("\n=== 性能统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"平均帧率: {fps:.1f} fps")
- print(f"检测人体: {self.stats['persons_detected']}次")
- print(f"OCR尝试: {self.stats['ocr_attempts']}次")
- print(f"OCR成功: {self.stats['ocr_success']}次")
- print("================\n")
-
- def get_stats(self) -> dict:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
-
- def _coordinator_worker(self):
- """联动工作线程"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
-
- # 初始化统计
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- # 更新帧统计
- self._update_stats('frames_processed')
-
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
-
- # 检测人体
- detections = self._detect_persons(frame)
-
- # 更新检测统计
- if detections:
- self._update_stats('persons_detected', len(detections))
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
-
- # 更新跟踪目标
- self._update_tracking_targets(tracked, frame_size)
-
- # 处理检测结果
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 处理当前跟踪目标
- self._process_current_target(frame, frame_size)
-
- # 清理过期目标
- self._cleanup_expired_targets()
-
- time.sleep(0.01)
-
- except Exception as e:
- print(f"联动处理错误: {e}")
- time.sleep(0.1)
-
- def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
- """检测人体"""
- if not self.enable_detection or self.detector is None:
- return []
- return self.detector.detect_persons(frame)
-
- def _update_tracking_targets(self, detections: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """更新跟踪目标"""
- current_time = time.time()
- frame_w, frame_h = frame_size
- center_x, center_y = frame_w / 2, frame_h / 2
-
- with self.targets_lock:
- # 更新现有目标
- for det in detections:
- if det.track_id is None:
- continue
-
- x_ratio = det.center[0] / frame_w
- y_ratio = det.center[1] / frame_h
-
- # 计算面积
- _, _, width, height = det.bbox
- area = width * height
-
- # 计算到画面中心的距离比例
- dx = abs(det.center[0] - center_x) / center_x
- dy = abs(det.center[1] - center_y) / center_y
- center_distance = (dx + dy) / 2 # 归一化到0-1
-
- if det.track_id in self.tracking_targets:
- # 更新位置
- target = self.tracking_targets[det.track_id]
- target.position = (x_ratio, y_ratio)
- target.last_update = current_time
- target.area = area
- target.confidence = det.confidence
- target.center_distance = center_distance
- else:
- # 新目标
- if len(self.tracking_targets) < self.config['max_tracking_targets']:
- self.tracking_targets[det.track_id] = TrackingTarget(
- track_id=det.track_id,
- position=(x_ratio, y_ratio),
- last_update=current_time,
- area=area,
- confidence=det.confidence,
- center_distance=center_distance
- )
-
- def _process_detections(self, detections: List[DetectedObject],
- frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理检测结果"""
- if self.on_person_detected:
- for det in detections:
- self.on_person_detected(det, frame)
-
- def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets, frame_size
- )
-
- if self.current_target:
- # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- self._set_state(TrackingState.TRACKING)
-
- x_ratio, y_ratio = self.current_target.position
-
- # 检查位置是否变化超过阈值
- should_move = True
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if (abs(x_ratio - last_x) < self.ptz_position_threshold and
- abs(y_ratio - last_y) < self.ptz_position_threshold):
- should_move = False
-
- if should_move:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- self.ptz.goto_exact_position(pan, tilt, zoom)
- else:
- self.ptz.track_target(x_ratio, y_ratio)
- self.last_ptz_position = (x_ratio, y_ratio)
-
- # 执行OCR识别 (仅在 OCR 启用时)
- if self.enable_ocr:
- self._perform_ocr(frame, self.current_target)
-
- def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
- """执行OCR识别"""
- if not self.enable_ocr or self.number_detector is None:
- return
-
- # 频率控制 - 避免过于频繁调用OCR API
- current_time = time.time()
- if current_time - self.last_ocr_time < self.ocr_interval:
- return
- self.last_ocr_time = current_time
-
- # 更新OCR尝试统计
- self._update_stats('ocr_attempts')
-
- # 计算人体边界框 (基于位置估算)
- frame_h, frame_w = frame.shape[:2]
-
- # 人体占画面比例 (可配置,默认宽20%、高40%)
- person_width_ratio = self.config.get('person_width_ratio', 0.2)
- person_height_ratio = self.config.get('person_height_ratio', 0.4)
-
- person_width = int(frame_w * person_width_ratio)
- person_height = int(frame_h * person_height_ratio)
-
- x_ratio, y_ratio = target.position
- center_x = int(x_ratio * frame_w)
- center_y = int(y_ratio * frame_h)
-
- # 计算边界框,确保不超出画面范围
- x1 = max(0, center_x - person_width // 2)
- y1 = max(0, center_y - person_height // 2)
- x2 = min(frame_w, x1 + person_width)
- y2 = min(frame_h, y1 + person_height)
-
- # 更新实际宽高 (可能因边界裁剪而变小)
- actual_width = x2 - x1
- actual_height = y2 - y1
-
- person_bbox = (x1, y1, actual_width, actual_height)
-
- # 检测编号
- self._set_state(TrackingState.OCR_PROCESSING)
- person_info = self.number_detector.detect_number(frame, person_bbox)
- person_info.person_id = target.track_id
-
- # 更新OCR成功统计
- if person_info.number_text:
- self._update_stats('ocr_success')
-
- # 更新目标信息
- with self.targets_lock:
- if target.track_id in self.tracking_targets:
- self.tracking_targets[target.track_id].person_info = person_info
-
- # 回调
- if self.on_number_recognized and person_info.number_text:
- self.on_number_recognized(person_info)
-
- # 放入结果队列
- self.result_queue.put(person_info)
-
- def _cleanup_expired_targets(self):
- """清理过期目标"""
- current_time = time.time()
- timeout = self.config['tracking_timeout']
-
- with self.targets_lock:
- expired_ids = [
- target_id for target_id, target in self.tracking_targets.items()
- if current_time - target.last_update > timeout
- ]
-
- for target_id in expired_ids:
- del self.tracking_targets[target_id]
- if self.current_target and self.current_target.track_id == target_id:
- self.current_target = None
-
- def _set_state(self, state: TrackingState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> TrackingState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def get_results(self) -> List[PersonInfo]:
- """
- 获取识别结果
- Returns:
- 人员信息列表
- """
- results = []
- while not self.result_queue.empty():
- try:
- results.append(self.result_queue.get_nowait())
- except queue.Empty:
- break
- return results
-
- def get_tracking_targets(self) -> List[TrackingTarget]:
- """获取当前跟踪目标"""
- with self.targets_lock:
- return list(self.tracking_targets.values())
-
- def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
- """
- 强制跟踪指定位置
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- zoom: 变倍
- """
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- self.ptz.goto_exact_position(pan, tilt, zoom or self.ptz.ptz_config.get('default_zoom', 8))
- else:
- self.ptz.move_to_target(x_ratio, y_ratio, zoom)
-
- def capture_snapshot(self) -> Optional[np.ndarray]:
- """
- 抓拍快照
- Returns:
- 快照图像
- """
- return self.panorama.get_frame()
- class EventDrivenCoordinator(Coordinator):
- """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.event_types = {
- 'intruder': True,
- 'crossline': True,
- 'motion': True,
- }
- self.event_queue = queue.Queue()
-
- def on_event(self, event_type: str, event_data: dict):
- if not self.event_types.get(event_type, False):
- return
- self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
-
- def _coordinator_worker(self):
- while self.running:
- try:
- try:
- event = self.event_queue.get(timeout=0.1)
- self._process_event(event)
- except queue.Empty:
- pass
-
- frame = self.panorama.get_frame()
- if frame is not None:
- frame_size = (frame.shape[1], frame.shape[0])
- detections = self._detect_persons(frame)
- if detections:
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
- self._process_current_target(frame, frame_size)
-
- self._cleanup_expired_targets()
- except Exception as e:
- print(f"事件处理错误: {e}")
- time.sleep(0.1)
-
- def _process_event(self, event: dict):
- event_type = event['type']
- event_data = event['data']
- print(f"处理事件: {event_type}")
-
- if event_type == 'intruder' and 'position' in event_data:
- x_ratio, y_ratio = event_data['position']
- self.force_track_position(x_ratio, y_ratio)
- @dataclass
- class PTZCommand:
- """PTZ控制命令"""
- pan: float
- tilt: float
- zoom: int
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- use_calibration: bool = True
- track_id: Optional[int] = None # 跟踪目标ID(用于配对图片保存)
- batch_id: Optional[str] = None # 批次ID(用于配对图片保存)
- person_index: int = -1 # 人员在批次中的序号(用于配对图片保存)
- class AsyncCoordinator(Coordinator):
- """
- 异步联动控制器 — 检测线程与PTZ控制线程分离
-
- 改进:
- 1. 检测线程:持续读取全景帧 + YOLO推理
- 2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
- 3. 两线程通过 queue 通信,互不阻塞
- 4. PTZ位置确认:移动后等待球机到位并验证帧
- """
-
- PTZ_CONFIRM_WAIT = 0.3 # PTZ命令后等待稳定的秒数
- PTZ_CONFIRM_TIMEOUT = 2.0 # PTZ位置确认超时
- PTZ_COMMAND_COOLDOWN = 0.15 # PTZ命令最小间隔秒数
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # PTZ命令队列(检测→PTZ)
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
-
- # 线程
- self._detection_thread = None
- self._ptz_thread = None
-
- # PTZ确认回调
- self._on_ptz_confirmed: Optional[Callable] = None
-
- # 上次PTZ命令时间
- self._last_ptz_time = 0.0
-
- # 配对图片保存器
- self._enable_paired_saving = DETECTION_CONFIG.get('enable_paired_saving', False)
- self._paired_saver: Optional[PairedImageSaver] = None
- self._current_batch_id: Optional[str] = None
- self._person_ptz_index: Dict[int, int] = {} # track_id -> person_index
-
- if self._enable_paired_saving:
- save_dir = DETECTION_CONFIG.get('paired_image_dir', '/home/admin/dsh/paired_images')
- time_window = DETECTION_CONFIG.get('paired_time_window', 5.0)
- self._paired_saver = get_paired_saver(base_dir=save_dir, time_window=time_window)
- logger.info(f"[AsyncCoordinator] 配对图片保存已启用: 目录={save_dir}, 时间窗口={time_window}s")
-
- def start(self) -> bool:
- """启动联动(覆盖父类,启动双线程)"""
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
-
- # 启动球机RTSP流(用于球机端人体检测)
- if self.enable_ptz_detection:
- if not self.ptz.start_stream_rtsp():
- print("球机RTSP流启动失败,禁用球机端检测功能")
- self.enable_ptz_detection = False
- else:
- # 初始化球机端人体检测器
- self._init_ptz_detector()
- else:
- print("PTZ球机功能已禁用")
-
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- self.running = True
-
- # 启动检测线程
- self._detection_thread = threading.Thread(
- target=self._detection_worker, name="detection-worker", daemon=True)
- self._detection_thread.start()
-
- # 启动PTZ控制线程
- if self.enable_ptz_camera and self.enable_ptz_tracking:
- self._ptz_thread = threading.Thread(
- target=self._ptz_worker, name="ptz-worker", daemon=True)
- self._ptz_thread.start()
-
- print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
- return True
-
- def stop(self):
- """停止联动"""
- self.running = False
-
- # 清空PTZ队列,让工作线程退出
- while not self._ptz_queue.empty():
- try:
- self._ptz_queue.get_nowait()
- except queue.Empty:
- break
-
- if self._detection_thread:
- self._detection_thread.join(timeout=3)
- if self._ptz_thread:
- self._ptz_thread.join(timeout=3)
-
- # 停止父类线程(如果有的话)
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=1)
-
- # 关闭配对保存器
- if self._paired_saver is not None:
- self._paired_saver.close()
- self._paired_saver = None
-
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- self._print_stats()
- print("异步联动系统已停止")
-
- def _detection_worker(self):
- """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
- last_detection_time = 0
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = self.config.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- frame_count = 0
- last_log_time = time.time()
- log_interval = 5.0 # 每5秒打印一次帧率统计
- detection_run_count = 0
- detection_person_count = 0
- last_no_detect_log_time = 0
- no_detect_log_interval = 30.0
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- if self.detector is None:
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化! 检测功能不可用, 请检查 YOLO 模型是否正确加载")
- elif not self.enable_detection:
- logger.warning("[检测线程] ⚠️ 人体检测已禁用 (enable_detection=False)")
- else:
- logger.info(f"[检测线程] ✓ 人体检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s), PTZ冷却={ptz_cooldown}s")
-
- while self.running:
- try:
- current_time = time.time()
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
- frame_size = (frame.shape[1], frame.shape[0])
-
- if current_time - last_log_time >= log_interval:
- elapsed = current_time - last_log_time
- fps = frame_count / elapsed if elapsed > 0 else 0
-
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
- stats_parts = [f"帧率={fps:.1f}fps", f"处理帧={frame_count}", f"状态={state_str}"]
-
- if self.detector is None:
- stats_parts.append("检测器=未加载")
- elif not self.enable_detection:
- stats_parts.append("检测=已禁用")
- else:
- stats_parts.append(f"检测轮次={detection_run_count}(有人={detection_person_count})")
-
- with self.targets_lock:
- target_count = len(self.tracking_targets)
- stats_parts.append(f"跟踪目标={target_count}")
-
- logger.info(f"[检测线程] {', '.join(stats_parts)}")
- frame_count = 0
- last_log_time = current_time
-
- # 周期性检测(约1次/秒)
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- # YOLO 人体检测
- detections = self._detect_persons(frame)
-
- if detections:
- self._update_stats('persons_detected', len(detections))
- detection_person_count += 1
-
- # 更新跟踪
- tracked = self.tracker.update(detections)
- self._update_tracking_targets(tracked, frame_size)
-
- # 配对图片保存:创建新批次
- if tracked and self._enable_paired_saving and self._paired_saver is not None:
- self._create_detection_batch(frame, tracked, frame_size)
-
- # 打印检测日志
- if tracked:
- for t in tracked:
- # tracked 是 DetectedObject,使用 center 计算位置
- x_ratio = t.center[0] / frame_size[0]
- y_ratio = t.center[1] / frame_size[1]
- _, _, w, h = t.bbox
- area = w * h
- logger.info(
- f"[检测] ✓ 目标ID={t.track_id} "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"面积={area} 置信度={t.confidence:.2f}"
- )
- elif detections:
- # 有检测但没跟踪上
- for d in detections:
- logger.debug(f"[检测] 未跟踪: {d.class_name} @ {d.center}")
- else:
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
- logger.info(
- f"[检测] · YOLO检测运行正常, 本轮未检测到人员 "
- f"(累计检测{detection_run_count}轮, 检测到人{detection_person_count}轮)"
- )
- last_no_detect_log_time = current_time
-
- if tracked:
- self._process_detections(tracked, frame, frame_size)
-
- # 选择跟踪目标并发送PTZ命令
- target = self._select_tracking_target()
- if target and self.enable_ptz_tracking and self.enable_ptz_camera:
- self._send_ptz_command_with_log(target, frame_size)
- elif not tracked and self.current_target:
- # 目标消失,切回IDLE
- self._set_state(TrackingState.IDLE)
- logger.info("[检测] 目标丢失,球机进入IDLE状态")
- self.current_target = None
-
- self._cleanup_expired_targets()
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"检测线程错误: {e}")
- time.sleep(0.1)
-
- def _init_ptz_detector(self):
- """初始化球机端人体检测器"""
- try:
- model_path = DETECTION_CONFIG.get('model_path')
- model_type = DETECTION_CONFIG.get('model_type', 'auto')
- conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
-
- if model_path:
- self.ptz_detector = PTZPersonDetector(
- model_path=model_path,
- model_type=model_type,
- confidence_threshold=conf_threshold
- )
- self.auto_zoom_controller = PTZAutoZoomController(
- ptz_camera=self.ptz,
- detector=self.ptz_detector,
- config=self.auto_zoom_config
- )
- print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
- else:
- print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
- self.enable_ptz_detection = False
- except Exception as e:
- print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
- self.enable_ptz_detection = False
- def _create_detection_batch(self, frame: np.ndarray,
- tracked: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """
- 创建检测批次,用于配对图片保存
-
- Args:
- frame: 全景帧
- tracked: 跟踪到的人员列表
- frame_size: 帧尺寸
- """
- if self._paired_saver is None:
- return
-
- # 过滤有效人员(置信度 >= 阈值)
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
- valid_persons = []
- for det in tracked:
- if det.confidence >= person_threshold:
- valid_persons.append(det)
-
- if not valid_persons:
- logger.debug(f"[配对保存] 无有效人员(阈值={person_threshold}),跳过批次创建")
- return
-
- # 构建人员信息列表(只包含有效人员)
- persons = []
- self._person_ptz_index = {} # 重置索引映射
-
- for i, det in enumerate(valid_persons):
- x_ratio = det.center[0] / frame_size[0]
- y_ratio = det.center[1] / frame_size[1]
-
- person_info = {
- 'track_id': det.track_id,
- 'position': (x_ratio, y_ratio),
- 'bbox': (det.bbox[0], det.bbox[1],
- det.bbox[0] + det.bbox[2],
- det.bbox[1] + det.bbox[3]),
- 'confidence': det.confidence
- }
- persons.append(person_info)
- self._person_ptz_index[det.track_id] = i
-
- # 创建新批次
- batch_id = self._paired_saver.start_new_batch(frame, persons)
- if batch_id:
- self._current_batch_id = batch_id
- logger.info(f"[配对保存] 创建批次: {batch_id}, 有效人员={len(persons)}/{len(tracked)}")
- def _save_ptz_image_for_person(self, track_id: int,
- ptz_frame: np.ndarray,
- ptz_position: Tuple[float, float, int]):
- """
- 保存球机聚焦图片到对应批次
-
- Args:
- track_id: 人员跟踪ID
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- """
- if (self._paired_saver is None or
- self._current_batch_id is None or
- track_id not in self._person_ptz_index):
- return
-
- person_index = self._person_ptz_index[track_id]
-
- self._paired_saver.save_ptz_image(
- batch_id=self._current_batch_id,
- person_index=person_index,
- ptz_frame=ptz_frame,
- ptz_position=ptz_position,
- ptz_bbox=getattr(self, '_last_ptz_bbox', None)
- )
- def _save_ptz_image_for_person_batch(self, batch_id: str, person_index: int,
- ptz_frame: np.ndarray,
- ptz_position: Tuple[float, float, int]):
- """
- 保存球机聚焦图片到指定批次(直接使用 batch_id,不依赖当前批次)
-
- Args:
- batch_id: 批次ID
- person_index: 人员序号
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- """
- if self._paired_saver is None:
- return
-
- self._paired_saver.save_ptz_image(
- batch_id=batch_id,
- person_index=person_index,
- ptz_frame=ptz_frame,
- ptz_position=ptz_position,
- ptz_bbox=getattr(self, '_last_ptz_bbox', None)
- )
- def _ptz_worker(self):
- """PTZ控制线程:从队列接收命令并控制球机"""
- while self.running:
- try:
- try:
- cmd = self._ptz_queue.get(timeout=0.1)
- except queue.Empty:
- continue
-
- # 执行PTZ命令(batch_id 和 person_index 已在命令中)
- self._execute_ptz_command(cmd)
-
- except Exception as e:
- print(f"PTZ控制线程错误: {e}")
- time.sleep(0.05)
-
- def _select_tracking_target(self) -> Optional[TrackingTarget]:
- """选择当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return None
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets
- )
-
- return self.current_target
-
- def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """将跟踪目标转化为PTZ命令放入队列"""
- x_ratio, y_ratio = target.position
-
- # 检查位置变化是否超过阈值
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
- abs(y_ratio - last_y) < self.ptz_position_threshold:
- return
-
- # 冷却检查
- current_time = time.time()
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- except queue.Full:
- pass # 丢弃命令,下一个检测周期会重发
-
- def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """发送PTZ命令并打印日志"""
- x_ratio, y_ratio = target.position
-
- # 检查位置变化是否超过阈值
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- dx = abs(x_ratio - last_x)
- dy = abs(y_ratio - last_y)
- if dx < ptz_threshold and dy < ptz_threshold:
- logger.debug(f"[PTZ] 位置变化太小(dx={dx:.4f}, dy={dy:.4f}),跳过")
- return
-
- # 冷却检查
- current_time = time.time()
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- if current_time - self._last_ptz_time < ptz_cooldown:
- logger.debug(f"[PTZ] 冷却中,跳过 (间隔={current_time - self._last_ptz_time:.2f}s < {ptz_cooldown}s)")
- return
-
- # 计算PTZ角度(用于日志)
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- coord_type = "校准坐标"
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
- coord_type = "估算坐标"
-
- # 获取当前批次信息和人员序号
- batch_id = self._current_batch_id if self._enable_paired_saving else None
- person_index = self._person_ptz_index.get(target.track_id, -1) if self._enable_paired_saving else -1
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration,
- track_id=target.track_id, # 传递跟踪ID
- batch_id=batch_id, # 传递批次ID
- person_index=person_index # 传递人员序号
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
- logger.info(
- f"[PTZ] 命令已发送: 目标ID={target.track_id} "
- f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
- f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
- )
- except queue.Full:
- logger.warning("[PTZ] 命令队列满,丢弃本次命令")
-
- def _execute_ptz_command(self, cmd: PTZCommand):
- """
- 执行PTZ命令(在PTZ线程中)
-
- Args:
- cmd: PTZ命令(包含 batch_id, person_index, track_id 用于配对保存)
- """
- self._last_ptz_time = time.time()
-
- # 从命令中提取配对保存相关信息
- track_id = cmd.track_id
- batch_id = cmd.batch_id
- person_index = cmd.person_index
-
- if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
-
- self._set_state(TrackingState.TRACKING)
- logger.info(
- f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
- f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}), "
- f"batch={batch_id}, person={person_index})"
- )
-
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
-
- if success:
- # 等待球机物理移动到位(增加额外等待确保画面清晰)
- time.sleep(self.PTZ_CONFIRM_WAIT)
-
- # 球机端人体检测与自动对焦
- final_pan, final_tilt, final_zoom = pan, tilt, zoom
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
- auto_zoom_result = self._auto_zoom_person(pan, tilt, zoom)
- if auto_zoom_result != zoom:
- final_zoom = auto_zoom_result
- # 自动变焦后再次等待画面稳定
- time.sleep(0.5)
-
- # 获取清晰的球机画面(尝试多次获取最新帧)
- ptz_frame = self._get_clear_ptz_frame()
-
- # 保存球机图片到配对批次(使用命令中的 batch_id 和 person_index)
- if self._enable_paired_saving and batch_id is not None and person_index >= 0 and ptz_frame is not None:
- # 使用球机端检测器检测人体并标记
- ptz_frame_marked = self._mark_ptz_frame_with_detection(ptz_frame, person_index=person_index)
- self._save_ptz_image_for_person_batch(batch_id, person_index, ptz_frame_marked, (final_pan, final_tilt, final_zoom))
- elif self._enable_paired_saving:
- logger.warning(f"[配对保存] 跳过球机图保存: batch_id={batch_id}, person_index={person_index}, frame={ptz_frame is not None}")
-
- logger.info(f"[PTZ] 到位确认完成: pan={final_pan:.1f}° tilt={final_tilt:.1f}° zoom={final_zoom}")
- else:
- logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
-
- def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int) -> int:
- """
- 自动对焦人体
- 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
-
- Returns:
- 最终的 zoom 值
- """
- if self.auto_zoom_controller is None:
- return initial_zoom
-
- logger.info("[AutoZoom] 开始自动对焦...")
-
- try:
- success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
- get_frame_func=self.ptz.get_frame,
- max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
- )
-
- if success:
- logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
- return final_zoom
- else:
- logger.warning("[AutoZoom] 自动对焦未能定位人体")
- return initial_zoom
- except Exception as e:
- logger.error(f"[AutoZoom] 自动对焦异常: {e}")
- return initial_zoom
-
- def _get_clear_ptz_frame(self, max_attempts: int = 5, wait_interval: float = 0.1) -> Optional[np.ndarray]:
- """
- 获取清晰的球机画面
- 尝试多次获取,丢弃模糊/过渡帧
-
- Args:
- max_attempts: 最大尝试次数
- wait_interval: 每次等待间隔
-
- Returns:
- 清晰的球机帧或 None
- """
- best_frame = None
- best_score = -1
-
- for i in range(max_attempts):
- frame = self.ptz.get_frame()
- if frame is not None:
- # 使用拉普拉斯算子评估图像清晰度
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
-
- if laplacian_var > best_score:
- best_score = laplacian_var
- best_frame = frame.copy()
-
- # 如果清晰度足够高,直接返回
- if laplacian_var > 100: # 清晰度阈值
- return frame
-
- time.sleep(wait_interval)
-
- return best_frame
-
- def _mark_ptz_frame_with_detection(self, frame: np.ndarray, person_index: int) -> np.ndarray:
- """
- 在球机帧上标记检测到的人体
-
- Args:
- frame: 球机帧
- person_index: 人员序号
-
- Returns:
- 标记后的帧
- """
- marked_frame = frame.copy()
- h, w = marked_frame.shape[:2]
-
- # 重置保存的bbox
- self._last_ptz_bbox = None
-
- # 使用球机端检测器检测人体
- if self.ptz_detector is not None:
- try:
- persons = self.ptz_detector.detect(frame)
- if persons:
- # 找到最大的人体(假设是目标)
- largest_person = max(persons, key=lambda p: p.area)
- x1, y1, x2, y2 = largest_person.bbox
-
- # 保存bbox供后续使用
- self._last_ptz_bbox = (x1, y1, x2, y2)
-
- # 绘制边界框(红色,区别于全景的绿色)
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
-
- # 绘制标签
- label = f"person_{person_index} ({largest_person.confidence:.2f})"
- (label_w, label_h), _ = cv2.getTextSize(
- label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2
- )
-
- # 标签背景(红色)
- cv2.rectangle(
- marked_frame,
- (x1, y1 - label_h - 8),
- (x1 + label_w, y1),
- (0, 0, 255),
- -1
- )
-
- # 标签文字(白色)
- cv2.putText(
- marked_frame, label,
- (x1, y1 - 4),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7,
- (255, 255, 255), 2
- )
-
- logger.info(f"[配对保存] 球机图标记: person_{person_index}, "
- f"位置=({x1},{y1},{x2},{y2}), 置信度={largest_person.confidence:.2f}")
- else:
- # 未检测到人体,在画面中心添加提示
- cv2.putText(
- marked_frame, f"person_{person_index} (no detection)",
- (w // 2 - 100, h // 2),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
- (0, 0, 255), 2
- )
- except Exception as e:
- logger.error(f"[配对保存] 球机图检测标记失败: {e}")
-
- return marked_frame
-
- def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
- """PTZ位置确认:读取球机帧验证目标是否可见"""
- if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
- return
-
- ptz_frame = self.ptz.get_frame()
- if ptz_frame is None:
- return
-
- # 未来可以在这里添加球机帧目标验证逻辑
- # 例如:在球机帧中检测目标是否在画面中心附近
-
- def on_ptz_confirmed(self, callback: Callable):
- """注册PTZ位置确认回调"""
- self._on_ptz_confirmed = callback
|