| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876 |
- """
- 联动控制器
- 协调全景摄像头和球机的工作
- """
- import time
- import threading
- import queue
- import logging
- import math
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- import numpy as np
- import cv2
- from config import COORDINATOR_CONFIG, SYSTEM_CONFIG, PTZ_CONFIG, DETECTION_CONFIG
- from panorama_camera import PanoramaCamera, ObjectDetector, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from ocr_recognizer import NumberDetector, PersonInfo
- from ptz_person_tracker import PTZPersonDetector, PTZAutoZoomController
- from paired_image_saver import PairedImageSaver, get_paired_saver
- logger = logging.getLogger(__name__)
- class TrackingState(Enum):
- """跟踪状态"""
- IDLE = 0 # 空闲
- SEARCHING = 1 # 搜索目标
- TRACKING = 2 # 跟踪中
- ZOOMING = 3 # 变焦中
- OCR_PROCESSING = 4 # OCR处理中
- @dataclass
- class TrackingTarget:
- """跟踪目标"""
- track_id: int # 跟踪ID
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
- last_update: float # 最后更新时间
- person_info: Optional[PersonInfo] = None # 人员信息
- priority: int = 0 # 优先级
- area: int = 0 # 目标面积(像素²)
- confidence: float = 0.0 # 置信度
- center_distance: float = 1.0 # 到画面中心的距离比例(0-1)
- score: float = 0.0 # 综合得分
- class TargetSelector:
- """
- 目标选择策略类
- 支持按面积、置信度、混合模式排序,支持优先级切换
- """
-
- def __init__(self, config: Dict = None):
- """
- 初始化目标选择器
- Args:
- config: 目标选择配置
- """
- self.config = config or {
- 'strategy': 'area',
- 'area_weight': 0.6,
- 'confidence_weight': 0.4,
- 'min_area_threshold': 5000,
- 'prefer_center': True,
- 'center_weight': 0.2,
- 'switch_on_lost': True,
- 'stickiness': 0.3,
- }
- self.current_target_id: Optional[int] = None
- self.current_target_score: float = 0.0
-
- def calculate_score(self, target: TrackingTarget, frame_size: Tuple[int, int] = None) -> float:
- """
- 计算目标综合得分
- Args:
- target: 跟踪目标
- frame_size: 帧尺寸(w, h),用于计算中心距离
- Returns:
- 综合得分(0-1)
- """
- strategy = self.config.get('strategy', 'area')
- area_weight = self.config.get('area_weight', 0.6)
- conf_weight = self.config.get('confidence_weight', 0.4)
- min_area = self.config.get('min_area_threshold', 5000)
- prefer_center = self.config.get('prefer_center', False)
- center_weight = self.config.get('center_weight', 0.2)
-
- # 归一化面积得分 (对数缩放,避免大目标得分过高)
- import math
- area_score = min(1.0, math.log10(max(target.area, 1)) / 5.0) # 100000像素² ≈ 1.0
-
- # 小面积惩罚
- if target.area < min_area:
- area_score *= 0.5
-
- # 置信度得分直接使用
- conf_score = target.confidence
-
- # 中心距离得分 (距离中心越近得分越高)
- center_score = 1.0 - target.center_distance
-
- # 根据策略计算综合得分
- if strategy == 'area':
- score = area_score * 0.8 + conf_score * 0.2
- elif strategy == 'confidence':
- score = conf_score * 0.8 + area_score * 0.2
- else: # hybrid
- score = area_score * area_weight + conf_score * conf_weight
-
- # 加入中心距离权重
- if prefer_center:
- score = score * (1 - center_weight) + center_score * center_weight
-
- return score
-
- def select_target(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> Optional[TrackingTarget]:
- """
- 从多个目标中选择最优目标
- Args:
- targets: 目标字典 {track_id: TrackingTarget}
- frame_size: 帧尺寸
- Returns:
- 最优目标
- """
- if not targets:
- self.current_target_id = None
- return None
-
- stickiness = self.config.get('stickiness', 0.3)
- switch_on_lost = self.config.get('switch_on_lost', True)
-
- # 计算所有目标得分
- scored_targets = []
- for track_id, target in targets.items():
- target.score = self.calculate_score(target, frame_size)
- scored_targets.append((track_id, target, target.score))
-
- # 按得分排序
- scored_targets.sort(key=lambda x: x[2], reverse=True)
-
- # 检查当前目标是否仍在列表中
- if self.current_target_id is not None:
- current_exists = self.current_target_id in targets
- if current_exists:
- # 应用粘性:当前目标得分需要显著低于最优目标才切换
- best_id, best_target, best_score = scored_targets[0]
- current_target = targets[self.current_target_id]
-
- # 粘性阈值: 当前目标得分 > 最优得分 * (1 - stickiness) 时保持
- stickiness_threshold = best_score * (1 - stickiness)
- if current_target.score > stickiness_threshold:
- return current_target
-
-
- # 选择得分最高的目标
- best_id, best_target, best_score = scored_targets[0]
- self.current_target_id = best_id
- self.current_target_score = best_score
-
- logger.debug(
- f"[目标选择] 选择目标ID={best_id} 得分={best_score:.3f} "
- f"面积={best_target.area} 置信度={best_target.confidence:.2f}"
- )
-
- return best_target
-
- def get_sorted_targets(self, targets: Dict[int, TrackingTarget],
- frame_size: Tuple[int, int] = None) -> List[Tuple[TrackingTarget, float]]:
- """
- 获取按得分排序的目标列表
- Args:
- targets: 目标字典
- frame_size: 帧尺寸
- Returns:
- 排序后的目标列表 [(target, score), ...]
- """
- scored = []
- for target in targets.values():
- target.score = self.calculate_score(target, frame_size)
- scored.append((target, target.score))
-
- scored.sort(key=lambda x: x[1], reverse=True)
- return scored
-
- def set_strategy(self, strategy: str):
- """设置选择策略"""
- self.config['strategy'] = strategy
- logger.info(f"[目标选择] 策略已切换为: {strategy}")
-
- def set_stickiness(self, stickiness: float):
- """设置目标粘性"""
- self.config['stickiness'] = max(0.0, min(1.0, stickiness))
- logger.info(f"[目标选择] 粘性已设置为: {self.config['stickiness']}")
- class Coordinator:
- """
- 联动控制器
- 协调全景摄像头和球机实现联动抓拍
- """
-
- def __init__(self, panorama_camera: PanoramaCamera,
- ptz_camera: PTZCamera,
- detector: ObjectDetector = None,
- number_detector: NumberDetector = None,
- calibrator = None):
- """
- 初始化联动控制器
- Args:
- panorama_camera: 全景摄像头
- ptz_camera: 球机
- detector: 物体检测器
- number_detector: 编号检测器
- calibrator: 校准器 (用于坐标转换)
- """
- self.panorama = panorama_camera
- self.ptz = ptz_camera
- self.detector = detector
- self.number_detector = number_detector
- self.calibrator = calibrator
-
- self.config = COORDINATOR_CONFIG
-
- # 功能开关 - 从 SYSTEM_CONFIG 读取
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
-
- # 球机端人体检测与自动对焦
- self.enable_ptz_detection = PTZ_CONFIG.get('enable_ptz_detection', False)
- self.auto_zoom_config = PTZ_CONFIG.get('auto_zoom', {})
- self.ptz_detector = None
- self.auto_zoom_controller = None
-
- # 状态
- self.state = TrackingState.IDLE
- self.state_lock = threading.Lock()
-
- # 跟踪目标
- self.tracking_targets: Dict[int, TrackingTarget] = {}
- self.targets_lock = threading.Lock()
-
- # 当前跟踪目标
- self.current_target: Optional[TrackingTarget] = None
-
- # 回调函数
- self.on_person_detected: Optional[Callable] = None
- self.on_number_recognized: Optional[Callable] = None
- self.on_tracking_started: Optional[Callable] = None
- self.on_tracking_stopped: Optional[Callable] = None
-
- # 控制标志
- self.running = False
- self.coordinator_thread = None
-
- # OCR频率控制
- self.last_ocr_time = 0
- self.ocr_interval = 1.0 # OCR间隔(秒),避免过于频繁调用API
-
- # PTZ优化 - 避免频繁发送相同位置的命令
- self.last_ptz_position = None
- self.ptz_position_threshold = self.config.get('ptz_position_threshold', 0.03)
-
- # 目标选择器
- self.target_selector = TargetSelector(
- self.config.get('target_selection', {})
- )
-
- # 结果队列
- self.result_queue = queue.Queue()
-
- # 性能统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'ocr_attempts': 0,
- 'ocr_success': 0,
- 'start_time': None,
- 'last_frame_time': None,
- }
- self.stats_lock = threading.Lock()
-
- def set_calibrator(self, calibrator):
- """设置校准器"""
- self.calibrator = calibrator
-
- def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
- """
- 将全景坐标转换为PTZ角度
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- Returns:
- (pan, tilt, zoom)
- """
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- # 使用校准结果进行转换
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = 8 # 默认变倍
- else:
- # 使用默认估算
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
-
- return (pan, tilt, zoom)
-
- def start(self) -> bool:
- """
- 启动联动系统
- Returns:
- 是否成功
- """
- # 连接全景摄像头
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- # 连接 PTZ 球机 (可选)
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
- else:
- print("PTZ 球机功能已禁用")
-
- # 启动视频流(优先RTSP,SDK回调不可用时回退)
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- # 启动联动线程
- self.running = True
- self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
- self.coordinator_thread.start()
-
- print("联动系统已启动")
- return True
-
- def stop(self):
- """停止联动系统"""
- self.running = False
-
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=3)
-
- self.panorama.disconnect()
-
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- # 打印统计信息
- self._print_stats()
-
- print("联动系统已停止")
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计信息"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计信息"""
- with self.stats_lock:
- if self.stats['start_time'] and self.stats['frames_processed'] > 0:
- elapsed = time.time() - self.stats['start_time']
- fps = self.stats['frames_processed'] / elapsed
- print("\n=== 性能统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"平均帧率: {fps:.1f} fps")
- print(f"检测人体: {self.stats['persons_detected']}次")
- print(f"OCR尝试: {self.stats['ocr_attempts']}次")
- print(f"OCR成功: {self.stats['ocr_success']}次")
- print("================\n")
-
- def get_stats(self) -> dict:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
-
- def _coordinator_worker(self):
- """联动工作线程"""
- last_detection_time = 0
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
-
- # 初始化统计
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- # 更新帧统计
- self._update_stats('frames_processed')
-
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
-
- # 检测人体
- detections = self._detect_persons(frame)
-
- # 更新检测统计
- if detections:
- self._update_stats('persons_detected', len(detections))
-
- # 更新跟踪目标(track_id 在此方法内分配)
- self._update_tracking_targets(detections, frame_size)
-
- # 处理检测结果
- if detections:
- self._process_detections(detections, frame, frame_size)
-
- # 处理当前跟踪目标
- self._process_current_target(frame, frame_size)
-
- # 清理过期目标
- self._cleanup_expired_targets()
-
- time.sleep(0.01)
-
- except Exception as e:
- print(f"联动处理错误: {e}")
- time.sleep(0.1)
-
- def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
- """检测人体"""
- if not self.enable_detection or self.detector is None:
- return []
- return self.detector.detect_persons(frame)
-
- def _update_tracking_targets(self, detections: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """更新跟踪目标(跨帧匹配,支持粘性跟踪)
-
- 改进:不再每轮清空目标,而是使用位置匹配关联连续帧的目标
- """
- current_time = time.time()
- frame_w, frame_h = frame_size
- center_x, center_y = frame_w / 2, frame_h / 2
-
- # 获取人员置信度阈值
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
-
- # 过滤有效人员
- valid_detections = []
- for det in detections:
- if det.class_name != 'person':
- continue
- if det.confidence < person_threshold:
- continue
- valid_detections.append(det)
-
- with self.targets_lock:
- # 匹配阈值:位置距离小于此值认为是同一目标
- MATCH_THRESHOLD = 0.15 # 画面比例
-
- # 已匹配的检测索引
- matched_det_indices = set()
-
- # 步骤1:尝试匹配现有目标
- for track_id, target in list(self.tracking_targets.items()):
- best_match_idx = None
- best_match_dist = MATCH_THRESHOLD
-
- for idx, det in enumerate(valid_detections):
- if idx in matched_det_indices:
- continue
-
- det_x = det.center[0] / frame_w
- det_y = det.center[1] / frame_h
-
- # 计算位置距离
- dist = math.sqrt(
- (det_x - target.position[0]) ** 2 +
- (det_y - target.position[1]) ** 2
- )
-
- if dist < best_match_dist:
- best_match_dist = dist
- best_match_idx = idx
-
-
- if best_match_idx is not None:
- # 找到匹配,更新目标
- det = valid_detections[best_match_idx]
- matched_det_indices.add(best_match_idx)
-
- x_ratio = det.center[0] / frame_w
- y_ratio = det.center[1] / frame_h
- _, _, width, height = det.bbox
- area = width * height
-
- dx = abs(det.center[0] - center_x) / center_x
- dy = abs(det.center[1] - center_y) / center_y
- center_distance = (dx + dy) / 2
-
- # 更新目标属性
- self.tracking_targets[track_id] = TrackingTarget(
- track_id=track_id,
- position=(x_ratio, y_ratio),
- last_update=current_time,
- area=area,
- confidence=det.confidence,
- center_distance=center_distance,
- person_info=target.person_info # 保留之前识别的信息
- )
-
- # 步骤2:为未匹配的检测创建新目标
- for idx, det in enumerate(valid_detections):
- if idx in matched_det_indices:
- continue
-
- x_ratio = det.center[0] / frame_w
- y_ratio = det.center[1] / frame_h
- _, _, width, height = det.bbox
- area = width * height
-
- dx = abs(det.center[0] - center_x) / center_x
- dy = abs(det.center[1] - center_y) / center_y
- center_distance = (dx + dy) / 2
-
- # 分配全局唯一track_id
- with self._track_id_lock:
- new_track_id = self._next_track_id
- self._next_track_id += 1
-
-
- det.track_id = new_track_id # 更新检测对象的track_id
-
- self.tracking_targets[new_track_id] = TrackingTarget(
- track_id=new_track_id,
- position=(x_ratio, y_ratio),
- last_update=current_time,
- area=area,
- confidence=det.confidence,
- center_distance=center_distance
- )
-
- def _process_detections(self, detections: List[DetectedObject],
- frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理检测结果"""
- if self.on_person_detected:
- for det in detections:
- self.on_person_detected(det, frame)
-
- def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
- """处理当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets, frame_size
- )
-
- if self.current_target:
- # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- self._set_state(TrackingState.TRACKING)
-
- x_ratio, y_ratio = self.current_target.position
-
- # 检查位置是否变化超过阈值
- should_move = True
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if (abs(x_ratio - last_x) < self.ptz_position_threshold and
- abs(y_ratio - last_y) < self.ptz_position_threshold):
- should_move = False
-
- if should_move:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- self.ptz.goto_exact_position(pan, tilt, zoom)
- else:
- self.ptz.track_target(x_ratio, y_ratio)
- self.last_ptz_position = (x_ratio, y_ratio)
-
- # 执行OCR识别 (仅在 OCR 启用时)
- if self.enable_ocr:
- self._perform_ocr(frame, self.current_target)
-
- def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
- """执行OCR识别"""
- if not self.enable_ocr or self.number_detector is None:
- return
-
- # 频率控制 - 避免过于频繁调用OCR API
- current_time = time.time()
- if current_time - self.last_ocr_time < self.ocr_interval:
- return
- self.last_ocr_time = current_time
-
- # 更新OCR尝试统计
- self._update_stats('ocr_attempts')
-
- # 计算人体边界框 (基于位置估算)
- frame_h, frame_w = frame.shape[:2]
-
- # 人体占画面比例 (可配置,默认宽20%、高40%)
- person_width_ratio = self.config.get('person_width_ratio', 0.2)
- person_height_ratio = self.config.get('person_height_ratio', 0.4)
-
- person_width = int(frame_w * person_width_ratio)
- person_height = int(frame_h * person_height_ratio)
-
- x_ratio, y_ratio = target.position
- center_x = int(x_ratio * frame_w)
- center_y = int(y_ratio * frame_h)
-
- # 计算边界框,确保不超出画面范围
- x1 = max(0, center_x - person_width // 2)
- y1 = max(0, center_y - person_height // 2)
- x2 = min(frame_w, x1 + person_width)
- y2 = min(frame_h, y1 + person_height)
-
- # 更新实际宽高 (可能因边界裁剪而变小)
- actual_width = x2 - x1
- actual_height = y2 - y1
-
- person_bbox = (x1, y1, actual_width, actual_height)
-
- # 检测编号
- self._set_state(TrackingState.OCR_PROCESSING)
- person_info = self.number_detector.detect_number(frame, person_bbox)
- person_info.person_id = target.track_id
-
- # 更新OCR成功统计
- if person_info.number_text:
- self._update_stats('ocr_success')
-
- # 更新目标信息
- with self.targets_lock:
- if target.track_id in self.tracking_targets:
- self.tracking_targets[target.track_id].person_info = person_info
-
- # 回调
- if self.on_number_recognized and person_info.number_text:
- self.on_number_recognized(person_info)
-
- # 放入结果队列
- self.result_queue.put(person_info)
-
- def _cleanup_expired_targets(self):
- """清理过期目标"""
- current_time = time.time()
- timeout = self.config['tracking_timeout']
-
- with self.targets_lock:
- expired_ids = [
- target_id for target_id, target in self.tracking_targets.items()
- if current_time - target.last_update > timeout
- ]
-
- for target_id in expired_ids:
- del self.tracking_targets[target_id]
- if self.current_target and self.current_target.track_id == target_id:
- self.current_target = None
-
- def _set_state(self, state: TrackingState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> TrackingState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def get_results(self) -> List[PersonInfo]:
- """
- 获取识别结果
- Returns:
- 人员信息列表
- """
- results = []
- while not self.result_queue.empty():
- try:
- results.append(self.result_queue.get_nowait())
- except queue.Empty:
- break
- return results
-
- def get_tracking_targets(self) -> List[TrackingTarget]:
- """获取当前跟踪目标"""
- with self.targets_lock:
- return list(self.tracking_targets.values())
-
- def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
- """
- 强制跟踪指定位置
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- zoom: 变倍
- """
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- self.ptz.goto_exact_position(pan, tilt, zoom or self.ptz.ptz_config.get('default_zoom', 8))
- else:
- self.ptz.move_to_target(x_ratio, y_ratio, zoom)
-
- def capture_snapshot(self) -> Optional[np.ndarray]:
- """
- 抓拍快照
- Returns:
- 快照图像
- """
- return self.panorama.get_frame()
- class EventDrivenCoordinator(Coordinator):
- """事件驱动联动控制器,当全景摄像头检测到事件时触发联动"""
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.event_types = {
- 'intruder': True,
- 'crossline': True,
- 'motion': True,
- }
- self.event_queue = queue.Queue()
-
- def on_event(self, event_type: str, event_data: dict):
- if not self.event_types.get(event_type, False):
- return
- self.event_queue.put({'type': event_type, 'data': event_data, 'time': time.time()})
-
- def _coordinator_worker(self):
- while self.running:
- try:
- try:
- event = self.event_queue.get(timeout=0.1)
- self._process_event(event)
- except queue.Empty:
- pass
-
- frame = self.panorama.get_frame()
- if frame is not None:
- frame_size = (frame.shape[1], frame.shape[0])
- detections = self._detect_persons(frame)
- if detections:
- # 更新跟踪目标(track_id 在此方法内分配)
- self._update_tracking_targets(detections, frame_size)
- self._process_current_target(frame, frame_size)
-
- self._cleanup_expired_targets()
- except Exception as e:
- print(f"事件处理错误: {e}")
- time.sleep(0.1)
-
- def _process_event(self, event: dict):
- event_type = event['type']
- event_data = event['data']
- print(f"处理事件: {event_type}")
-
- if event_type == 'intruder' and 'position' in event_data:
- x_ratio, y_ratio = event_data['position']
- self.force_track_position(x_ratio, y_ratio)
- @dataclass
- class PTZCommand:
- """PTZ控制命令"""
- pan: float
- tilt: float
- zoom: int
- x_ratio: float = 0.0
- y_ratio: float = 0.0
- use_calibration: bool = True
- track_id: Optional[int] = None # 跟踪目标ID(用于配对图片保存)
- batch_id: Optional[str] = None # 批次ID(用于配对图片保存)
- person_index: int = -1 # 人员在批次中的序号(用于配对图片保存)
- class AsyncCoordinator(Coordinator):
- """
- 异步联动控制器 — 检测线程与PTZ控制线程分离
-
- 改进:
- 1. 检测线程:持续读取全景帧 + YOLO推理
- 2. PTZ控制线程:通过命令队列接收目标位置,独立控制球机
- 3. 两线程通过 queue 通信,互不阻塞
- 4. PTZ位置确认:移动后等待球机到位并验证帧
- """
-
- PTZ_CONFIRM_WAIT = 0.3 # PTZ命令后等待稳定的秒数
- PTZ_CONFIRM_TIMEOUT = 2.0 # PTZ位置确认超时
- PTZ_COMMAND_COOLDOWN = 0.15 # PTZ命令最小间隔秒数
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # PTZ命令队列(检测→PTZ)
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
-
- # 线程
- self._detection_thread = None
- self._ptz_thread = None
-
- # PTZ确认回调
- self._on_ptz_confirmed: Optional[Callable] = None
-
- # 上次PTZ命令时间(添加线程锁保护)
- self._last_ptz_time = 0.0
- self._last_ptz_time_lock = threading.Lock()
-
- # 跨帧跟踪:全局track_id计数器
- self._next_track_id = 1
- self._track_id_lock = threading.Lock()
-
- # 配对图片保存器
- self._enable_paired_saving = DETECTION_CONFIG.get('enable_paired_saving', False)
- self._paired_saver: Optional[PairedImageSaver] = None
- self._current_batch_id: Optional[str] = None
- self._person_ptz_index: Dict[int, int] = {} # track_id -> person_index
-
- if self._enable_paired_saving:
- save_dir = DETECTION_CONFIG.get('paired_image_dir', '/home/admin/dsh/paired_images')
- time_window = DETECTION_CONFIG.get('paired_time_window', 5.0)
- self._paired_saver = get_paired_saver(base_dir=save_dir, time_window=time_window)
- logger.info(f"[AsyncCoordinator] 配对图片保存已启用: 目录={save_dir}, 时间窗口={time_window}s")
-
- def start(self) -> bool:
- """启动联动(覆盖父类,启动双线程)"""
- if not self.panorama.connect():
- print("连接全景摄像头失败")
- return False
-
- if self.enable_ptz_camera:
- if not self.ptz.connect():
- print("连接球机失败")
- self.panorama.disconnect()
- return False
-
- # 启动球机RTSP流(用于球机端人体检测)
- if self.enable_ptz_detection:
- if not self.ptz.start_stream_rtsp():
- print("球机RTSP流启动失败,禁用球机端检测功能")
- self.enable_ptz_detection = False
- else:
- # 初始化球机端人体检测器
- self._init_ptz_detector()
- else:
- print("PTZ球机功能已禁用")
-
- if not self.panorama.start_stream_rtsp():
- print("RTSP视频流启动失败,尝试SDK方式...")
- if not self.panorama.start_stream():
- print("启动视频流失败")
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
- return False
-
- self.running = True
-
- # 启动检测线程
- self._detection_thread = threading.Thread(
- target=self._detection_worker, name="detection-worker", daemon=True)
- self._detection_thread.start()
-
- # 启动PTZ控制线程
- if self.enable_ptz_camera and self.enable_ptz_tracking:
- self._ptz_thread = threading.Thread(
- target=self._ptz_worker, name="ptz-worker", daemon=True)
- self._ptz_thread.start()
-
- print("异步联动系统已启动 (检测线程 + PTZ控制线程)")
- return True
-
- def stop(self):
- """停止联动"""
- self.running = False
-
- # 清空PTZ队列,让工作线程退出
- while not self._ptz_queue.empty():
- try:
- self._ptz_queue.get_nowait()
- except queue.Empty:
- break
-
- if self._detection_thread:
- self._detection_thread.join(timeout=3)
- if self._ptz_thread:
- self._ptz_thread.join(timeout=3)
-
- # 停止父类线程(如果有的话)
- if self.coordinator_thread:
- self.coordinator_thread.join(timeout=1)
-
- # 关闭配对保存器
- if self._paired_saver is not None:
- self._paired_saver.close()
- self._paired_saver = None
-
- self.panorama.disconnect()
- if self.enable_ptz_camera:
- self.ptz.disconnect()
-
- self._print_stats()
- print("异步联动系统已停止")
-
- def _detection_worker(self):
- """检测线程:持续读帧 + YOLO推理 + 发送PTZ命令 + 打印检测日志"""
- last_detection_time = 0
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- ptz_cooldown = self.config.get('ptz_command_cooldown', 0.5)
- ptz_threshold = self.config.get('ptz_position_threshold', 0.03)
- frame_count = 0
- last_log_time = time.time()
- log_interval = 5.0 # 每5秒打印一次帧率统计
- detection_run_count = 0
- detection_person_count = 0
- last_no_detect_log_time = 0
- no_detect_log_interval = 30.0
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- if self.detector is None:
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化! 检测功能不可用, 请检查 YOLO 模型是否正确加载")
- elif not self.enable_detection:
- logger.warning("[检测线程] ⚠️ 人体检测已禁用 (enable_detection=False)")
- else:
- logger.info(f"[检测线程] ✓ 人体检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s), PTZ冷却={ptz_cooldown}s")
-
- while self.running:
- try:
- current_time = time.time()
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
- frame_size = (frame.shape[1], frame.shape[0])
-
- if current_time - last_log_time >= log_interval:
- elapsed = current_time - last_log_time
- fps = frame_count / elapsed if elapsed > 0 else 0
-
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
- stats_parts = [f"帧率={fps:.1f}fps", f"处理帧={frame_count}", f"状态={state_str}"]
-
- if self.detector is None:
- stats_parts.append("检测器=未加载")
- elif not self.enable_detection:
- stats_parts.append("检测=已禁用")
- else:
- stats_parts.append(f"检测轮次={detection_run_count}(有人={detection_person_count})")
-
- with self.targets_lock:
- target_count = len(self.tracking_targets)
- stats_parts.append(f"跟踪目标={target_count}")
-
- logger.info(f"[检测线程] {', '.join(stats_parts)}")
- frame_count = 0
- last_log_time = current_time
-
- # 周期性检测(约1次/秒)
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- # YOLO 人体检测
- detections = self._detect_persons(frame)
-
- if detections:
- self._update_stats('persons_detected', len(detections))
- detection_person_count += 1
-
- # 更新跟踪目标(track_id 在此方法内分配)
- self._update_tracking_targets(detections, frame_size)
-
- # 配对图片保存:创建新批次(在 _update_tracking_targets 之后,使用正确的 track_id)
- if detections and self._enable_paired_saving and self._paired_saver is not None:
- self._create_detection_batch(frame, detections, frame_size)
-
- # 打印检测日志(使用连续序号,与图片标记一致)
- if detections:
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
- person_idx = 0
- for t in detections:
- # detections 是 DetectedObject,使用 center 计算位置
- x_ratio = t.center[0] / frame_size[0]
- y_ratio = t.center[1] / frame_size[1]
- _, _, w, h = t.bbox
- area = w * h
- # 只对达到阈值的人员打印日志并分配序号
- if t.class_name == 'person' and t.confidence >= person_threshold:
- logger.info(
- f"[检测] ✓ person_{person_idx} "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"面积={area} 置信度={t.confidence:.2f}"
- )
- person_idx += 1
- else:
- logger.debug(
- f"[检测] · 目标ID={t.track_id}({t.class_name}) "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"置信度={t.confidence:.2f}(低于阈值{person_threshold})"
- )
- else:
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
- logger.info(
- f"[检测] · YOLO检测运行正常, 本轮未检测到人员 "
- f"(累计检测{detection_run_count}轮, 检测到人{detection_person_count}轮)"
- )
- last_no_detect_log_time = current_time
-
- if detections:
- self._process_detections(detections, frame, frame_size)
-
- # 为每个检测到的人发送PTZ命令(不再只选一个)
- if self.enable_ptz_tracking and self.enable_ptz_camera:
- targets = self._get_all_valid_targets()
- for target in targets:
- self._send_ptz_command_with_log(target, frame_size)
- elif not detections and self.current_target:
- # 目标消失,切回IDLE
- self._set_state(TrackingState.IDLE)
- logger.info("[检测] 目标丢失,球机进入IDLE状态")
- self.current_target = None
-
- self._cleanup_expired_targets()
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"检测线程错误: {e}")
- time.sleep(0.1)
-
- def _init_ptz_detector(self):
- """初始化球机端人体检测器"""
- try:
- model_path = DETECTION_CONFIG.get('model_path')
- model_type = DETECTION_CONFIG.get('model_type', 'auto')
- conf_threshold = DETECTION_CONFIG.get('person_threshold', 0.5)
-
- if model_path:
- self.ptz_detector = PTZPersonDetector(
- model_path=model_path,
- model_type=model_type,
- confidence_threshold=conf_threshold
- )
- self.auto_zoom_controller = PTZAutoZoomController(
- ptz_camera=self.ptz,
- detector=self.ptz_detector,
- config=self.auto_zoom_config
- )
- print(f"[AsyncCoordinator] 球机端人体检测器初始化成功")
- else:
- print("[AsyncCoordinator] 未配置球机检测模型路径,禁用球机端检测")
- self.enable_ptz_detection = False
- except Exception as e:
- print(f"[AsyncCoordinator] 球机端检测器初始化失败: {e}")
- self.enable_ptz_detection = False
- def _create_detection_batch(self, frame: np.ndarray,
- detections: List[DetectedObject],
- frame_size: Tuple[int, int]):
- """
- 创建检测批次,用于配对图片保存
-
- Args:
- frame: 全景帧
- detections: 检测到的人员列表
- frame_size: 帧尺寸
- """
- if self._paired_saver is None:
- return
-
- # 过滤有效人员(必须是 person 且置信度 >= 阈值)
- person_threshold = DETECTION_CONFIG.get('person_threshold', 0.8)
- valid_persons = []
- for det in detections:
- # 只处理 class_name 为 person 的目标,排除安全帽、反光衣等
- if det.class_name == 'person' and det.confidence >= person_threshold:
- valid_persons.append(det)
-
- if not valid_persons:
- logger.debug(f"[配对保存] 无有效人员(阈值={person_threshold}),跳过批次创建")
- return
-
- # 构建人员信息列表(只包含有效人员)
- persons = []
- self._person_ptz_index = {} # 重置索引映射
-
- for i, det in enumerate(valid_persons):
- x_ratio = det.center[0] / frame_size[0]
- y_ratio = det.center[1] / frame_size[1]
-
- person_info = {
- 'track_id': det.track_id,
- 'position': (x_ratio, y_ratio),
- 'bbox': (det.bbox[0], det.bbox[1],
- det.bbox[0] + det.bbox[2],
- det.bbox[1] + det.bbox[3]),
- 'confidence': det.confidence
- }
- persons.append(person_info)
- self._person_ptz_index[det.track_id] = i
-
- # 创建新批次
- batch_id = self._paired_saver.start_new_batch(frame, persons)
- if batch_id:
- self._current_batch_id = batch_id
- logger.info(f"[配对保存] 创建批次: {batch_id}, 有效人员={len(persons)}/{len(detections)}")
- def _save_ptz_image_for_person(self, track_id: int,
- ptz_frame: np.ndarray,
- ptz_position: Tuple[float, float, int]):
- """
- 保存球机聚焦图片到对应批次
-
- Args:
- track_id: 人员跟踪ID
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- """
- if (self._paired_saver is None or
- self._current_batch_id is None or
- track_id not in self._person_ptz_index):
- return
-
- person_index = self._person_ptz_index[track_id]
-
- self._paired_saver.save_ptz_image(
- batch_id=self._current_batch_id,
- person_index=person_index,
- ptz_frame=ptz_frame,
- ptz_position=ptz_position,
- ptz_bbox=getattr(self, '_last_ptz_bbox', None)
- )
- def _save_ptz_image_for_person_batch(self, batch_id: str, person_index: int,
- ptz_frame: np.ndarray,
- ptz_position: Tuple[float, float, int]):
- """
- 保存球机聚焦图片到指定批次(直接使用 batch_id,不依赖当前批次)
-
- Args:
- batch_id: 批次ID
- person_index: 人员序号
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- """
- if self._paired_saver is None:
- return
-
- self._paired_saver.save_ptz_image(
- batch_id=batch_id,
- person_index=person_index,
- ptz_frame=ptz_frame,
- ptz_position=ptz_position,
- ptz_bbox=getattr(self, '_last_ptz_bbox', None)
- )
- def _ptz_worker(self):
- """PTZ控制线程:从队列接收命令并控制球机"""
- while self.running:
- try:
- try:
- cmd = self._ptz_queue.get(timeout=0.1)
- except queue.Empty:
- continue
-
- # 执行PTZ命令(batch_id 和 person_index 已在命令中)
- self._execute_ptz_command(cmd)
-
- except Exception as e:
- print(f"PTZ控制线程错误: {e}")
- time.sleep(0.05)
-
- def _select_tracking_target(self) -> Optional[TrackingTarget]:
- """选择当前跟踪目标"""
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return None
-
- # 使用目标选择器选择最优目标
- self.current_target = self.target_selector.select_target(
- self.tracking_targets
- )
-
- return self.current_target
-
- def _get_all_valid_targets(self) -> List[TrackingTarget]:
- """
- 获取所有有效的检测目标(用于多目标PTZ定位)
- 返回按优先级排序的目标列表
- """
- with self.targets_lock:
- if not self.tracking_targets:
- self._set_state(TrackingState.IDLE)
- self.current_target = None
- return []
-
- # 按得分排序所有目标
- targets = list(self.tracking_targets.values())
- targets.sort(key=lambda t: t.score, reverse=True)
-
- if targets:
- self._set_state(TrackingState.TRACKING)
- self.current_target = targets[0] # 第一个作为当前目标
-
- return targets
-
- def _send_ptz_command(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """将跟踪目标转化为PTZ命令放入队列"""
- x_ratio, y_ratio = target.position
-
- # 检查位置变化是否超过阈值
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
- abs(y_ratio - last_y) < self.ptz_position_threshold:
- return
-
- # 冷却检查(线程安全)
- current_time = time.time()
- with self._last_ptz_time_lock:
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
- return
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio)
- except queue.Full:
- pass # 丢弃命令,下一个检测周期会重发
-
- def _send_ptz_command_with_log(self, target: TrackingTarget, frame_size: Tuple[int, int]):
- """发送PTZ命令并打印日志"""
- x_ratio, y_ratio = target.position
-
- # 冷却检查(线程安全)
- current_time = time.time()
- with self._last_ptz_time_lock:
- if current_time - self._last_ptz_time < self.PTZ_COMMAND_COOLDOWN:
- return
-
- # 位置变化阈值检查
- if self.last_ptz_position is not None:
- last_x, last_y = self.last_ptz_position
- if abs(x_ratio - last_x) < self.ptz_position_threshold and \
- abs(y_ratio - last_y) < self.ptz_position_threshold:
- return
-
- # 计算PTZ角度(用于日志)
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- coord_type = "校准坐标"
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
- coord_type = "估算坐标"
-
- # 获取当前批次信息和人员序号
- batch_id = self._current_batch_id if self._enable_paired_saving else None
- person_index = self._person_ptz_index.get(target.track_id, -1) if self._enable_paired_saving else -1
-
- cmd = PTZCommand(
- pan=0, tilt=0, zoom=0,
- x_ratio=x_ratio, y_ratio=y_ratio,
- use_calibration=self.enable_calibration,
- track_id=target.track_id, # 传递跟踪ID
- batch_id=batch_id, # 传递批次ID
- person_index=person_index # 传递人员序号
- )
-
- try:
- self._ptz_queue.put_nowait(cmd)
- self.last_ptz_position = (x_ratio, y_ratio) # 更新位置记录
- self._update_stats('ptz_commands_sent' if 'ptz_commands_sent' in self.stats else 'persons_detected')
- logger.info(
- f"[PTZ] 命令已发送: 目标ID={target.track_id} "
- f"全景位置=({x_ratio:.3f}, {y_ratio:.3f}) → "
- f"PTZ角度=(pan={pan:.1f}°, tilt={tilt:.1f}°, zoom={zoom}) [{coord_type}]"
- )
- except queue.Full:
- logger.warning("[PTZ] 命令队列满,丢弃本次命令")
-
- def _execute_ptz_command(self, cmd: PTZCommand):
- """
- 执行PTZ命令(在PTZ线程中)
-
- Args:
- cmd: PTZ命令(包含 batch_id, person_index, track_id 用于配对保存)
- """
- # 更新最后执行时间(线程安全)
- with self._last_ptz_time_lock:
- self._last_ptz_time = time.time()
-
- # 从命令中提取配对保存相关信息
- track_id = cmd.track_id
- batch_id = cmd.batch_id
- person_index = cmd.person_index
-
- if cmd.use_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(cmd.x_ratio, cmd.y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(cmd.x_ratio, cmd.y_ratio)
-
- self._set_state(TrackingState.TRACKING)
- logger.info(
- f"[PTZ] 执行: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} "
- f"(全景位置=({cmd.x_ratio:.3f}, {cmd.y_ratio:.3f}), "
- f"batch={batch_id}, person={person_index})"
- )
-
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
-
- if success:
- # 等待球机物理移动到位(增加额外等待确保画面清晰)
- time.sleep(self.PTZ_CONFIRM_WAIT)
-
- # 球机端人体检测与自动对焦
- final_pan, final_tilt, final_zoom = pan, tilt, zoom
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
- auto_zoom_result = self._auto_zoom_person(pan, tilt, zoom)
- if auto_zoom_result != zoom:
- final_zoom = auto_zoom_result
- # 自动变焦后再次等待画面稳定
- time.sleep(0.5)
-
- # 获取清晰的球机画面(尝试多次获取最新帧)
- ptz_frame = self._get_clear_ptz_frame()
-
- # 保存球机图片到配对批次(使用命令中的 batch_id 和 person_index)
- if self._enable_paired_saving and batch_id is not None and person_index >= 0 and ptz_frame is not None:
- # 使用球机端检测器检测人体并标记
- ptz_frame_marked = self._mark_ptz_frame_with_detection(ptz_frame, person_index=person_index)
- self._save_ptz_image_for_person_batch(batch_id, person_index, ptz_frame_marked, (final_pan, final_tilt, final_zoom))
- elif self._enable_paired_saving:
- logger.warning(f"[配对保存] 跳过球机图保存: batch_id={batch_id}, person_index={person_index}, frame={ptz_frame is not None}")
-
- logger.info(f"[PTZ] 到位确认完成: pan={final_pan:.1f}° tilt={final_tilt:.1f}° zoom={final_zoom}")
- else:
- logger.warning(f"[PTZ] 命令执行失败: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
-
- def _auto_zoom_person(self, initial_pan: float, initial_tilt: float, initial_zoom: int) -> int:
- """
- 自动对焦人体
- 在球机画面中检测人体,自动调整zoom使人体居中且大小合适
-
- Returns:
- 最终的 zoom 值
- """
- if self.auto_zoom_controller is None:
- return initial_zoom
-
- logger.info("[AutoZoom] 开始自动对焦...")
-
- try:
- success, final_zoom = self.auto_zoom_controller.auto_focus_loop(
- get_frame_func=self.ptz.get_frame,
- max_attempts=self.auto_zoom_config.get('max_adjust_attempts', 3)
- )
-
- if success:
- logger.info(f"[AutoZoom] 自动对焦成功: zoom={final_zoom}")
- return final_zoom
- else:
- logger.warning("[AutoZoom] 自动对焦未能定位人体")
- return initial_zoom
- except Exception as e:
- logger.error(f"[AutoZoom] 自动对焦异常: {e}")
- return initial_zoom
-
- def _get_clear_ptz_frame(self, max_attempts: int = 5, wait_interval: float = 0.1) -> Optional[np.ndarray]:
- """
- 获取清晰的球机画面
- 尝试多次获取,丢弃模糊/过渡帧
-
- Args:
- max_attempts: 最大尝试次数
- wait_interval: 每次等待间隔
-
- Returns:
- 清晰的球机帧或 None
- """
- best_frame = None
- best_score = -1
-
- for i in range(max_attempts):
- frame = self.ptz.get_frame()
- if frame is not None:
- # 使用拉普拉斯算子评估图像清晰度
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
-
- if laplacian_var > best_score:
- best_score = laplacian_var
- best_frame = frame.copy()
-
- # 如果清晰度足够高,直接返回
- if laplacian_var > 100: # 清晰度阈值
- return frame
-
- time.sleep(wait_interval)
-
- return best_frame
-
- def _mark_ptz_frame_with_detection(self, frame: np.ndarray, person_index: int) -> np.ndarray:
- """
- 在球机帧上标记检测到的人体
-
- Args:
- frame: 球机帧
- person_index: 人员序号
-
- Returns:
- 标记后的帧
- """
- marked_frame = frame.copy()
- h, w = marked_frame.shape[:2]
-
- # 重置保存的bbox
- self._last_ptz_bbox = None
-
- # 使用球机端检测器检测人体
- if self.ptz_detector is not None:
- try:
- persons = self.ptz_detector.detect(frame)
- if persons:
- # 找到最大的人体(假设是目标)
- largest_person = max(persons, key=lambda p: p.area)
- x1, y1, x2, y2 = largest_person.bbox
-
- # 保存bbox供后续使用
- self._last_ptz_bbox = (x1, y1, x2, y2)
-
- # 绘制边界框(红色,区别于全景的绿色)
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
-
- # 绘制标签
- label = f"person_{person_index} ({largest_person.confidence:.2f})"
- (label_w, label_h), _ = cv2.getTextSize(
- label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2
- )
-
- # 标签背景(红色)
- cv2.rectangle(
- marked_frame,
- (x1, y1 - label_h - 8),
- (x1 + label_w, y1),
- (0, 0, 255),
- -1
- )
-
- # 标签文字(白色)
- cv2.putText(
- marked_frame, label,
- (x1, y1 - 4),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7,
- (255, 255, 255), 2
- )
-
- logger.info(f"[配对保存] 球机图标记: person_{person_index}, "
- f"位置=({x1},{y1},{x2},{y2}), 置信度={largest_person.confidence:.2f}")
- else:
- # 未检测到人体,在画面中心添加提示
- cv2.putText(
- marked_frame, f"person_{person_index} (no detection)",
- (w // 2 - 100, h // 2),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
- (0, 0, 255), 2
- )
- except Exception as e:
- logger.error(f"[配对保存] 球机图检测标记失败: {e}")
-
- return marked_frame
-
- def _confirm_ptz_position(self, x_ratio: float, y_ratio: float):
- """PTZ位置确认:读取球机帧验证目标是否可见"""
- if not hasattr(self.ptz, 'get_frame') or self.ptz.get_frame() is None:
- return
-
- ptz_frame = self.ptz.get_frame()
- if ptz_frame is None:
- return
-
- # 未来可以在这里添加球机帧目标验证逻辑
- # 例如:在球机帧中检测目标是否在画面中心附近
-
- def on_ptz_confirmed(self, callback: Callable):
- """注册PTZ位置确认回调"""
- self._on_ptz_confirmed = callback
- class SequentialCoordinator(AsyncCoordinator):
- """
- 顺序联动控制器 — 全景检测→逐个PTZ抓拍→回到全景的循环模式
-
- 工作流程:
- 1. 全景摄像头检测人员
- 2. 检测到人员后,暂停全景检测
- 3. 球机依次对每个检测到的人员进行PTZ定位+变焦抓拍
- 4. 所有人员抓拍完成后,球机回到默认位置
- 5. 恢复全景检测,进入下一轮循环
-
- 适用于需要高质量抓拍的场景,确保每个目标都能被球机清晰拍摄
- """
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
-
- # 顺序抓拍状态
- self._capture_state = 'idle' # 'idle', 'detecting', 'capturing', 'returning'
- self._capture_state_lock = threading.Lock()
-
- # 当前批次检测到的目标
- self._batch_targets: List[TrackingTarget] = []
- self._batch_targets_lock = threading.Lock()
- self._current_capture_index = 0
-
- # 抓拍完成事件(用于同步)
- self._capture_done_event = threading.Event()
-
- # 配置参数
- self._capture_config = {
- 'ptz_stabilize_time': 1.0, # PTZ到位后稳定等待时间
- 'capture_wait_time': 0.5, # 抓拍等待时间
- 'return_to_panorama': True, # 完成后是否回到全景默认位置
- 'default_pan': 0.0, # 默认pan角度
- 'default_tilt': 0.0, # 默认tilt角度
- 'default_zoom': 1, # 默认zoom(广角)
- }
-
- # 覆盖父类的PTZ冷却时间(顺序模式下可以更短)
- self.PTZ_COMMAND_COOLDOWN = 0.1
-
- logger.info("[SequentialCoordinator] 顺序联动控制器初始化完成")
-
- def _detection_worker(self):
- """检测线程:顺序模式下的检测逻辑"""
- # 从 DETECTION_CONFIG 获取检测帧率,默认每秒2帧
- detection_fps = self.config.get('detection_fps', DETECTION_CONFIG.get('detection_fps', 2))
- detection_interval = 1.0 / detection_fps
- last_detection_time = 0
-
- frame_count = 0
- last_log_time = time.time()
- log_interval = 5.0
- detection_run_count = 0
- detection_person_count = 0
- last_no_detect_log_time = 0
- no_detect_log_interval = 30.0
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- if self.detector is None:
- logger.warning("[检测线程] ⚠️ 人体检测器未初始化!")
- else:
- logger.info(f"[检测线程] ✓ 顺序模式已就绪, 检测帧率={detection_fps}fps")
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 日志输出
- if current_time - last_log_time >= log_interval:
- elapsed = current_time - last_log_time
- fps = frame_count / elapsed if elapsed > 0 else 0
- state_str = self._get_capture_state()
- logger.info(f"[检测线程] 帧率={fps:.1f}fps, 状态={state_str}, "
- f"检测轮次={detection_run_count}(有人={detection_person_count})")
- frame_count = 0
- last_log_time = current_time
-
- # 状态机处理
- state = self._get_capture_state()
-
- if state == 'idle':
- # 空闲状态:周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- # 执行检测
- detections = self._detect_persons(frame)
-
- if detections:
- self._update_stats('persons_detected', len(detections))
- detection_person_count += 1
-
- # 更新跟踪目标
- self._update_tracking_targets(detections, frame_size)
-
- # 获取有效目标列表
- targets = self._get_all_valid_targets()
-
- if targets:
- logger.info(f"[顺序模式] 检测到 {len(targets)} 个目标,开始顺序抓拍")
-
- # 创建配对保存批次
- if self._enable_paired_saving and self._paired_saver is not None:
- self._create_detection_batch(frame, detections, frame_size)
-
- # 切换到抓拍状态
- self._start_capture_sequence(targets)
- else:
- # 未检测到人员
- if current_time - last_no_detect_log_time >= no_detect_log_interval:
- logger.info(f"[检测] 本轮未检测到人员 (累计{detection_run_count}轮)")
- last_no_detect_log_time = current_time
-
- elif state == 'capturing':
- # 抓拍状态中:检测线程等待,不执行新检测
- # 等待PTZ线程完成当前批次
- pass
-
- elif state == 'returning':
- # 球机回到默认位置中
- pass
-
- # 清理过期目标
- self._cleanup_expired_targets()
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"[检测线程] 错误: {e}")
- time.sleep(0.1)
-
- def _ptz_worker(self):
- """PTZ控制线程:顺序模式下的PTZ控制逻辑"""
- logger.info("[PTZ线程] 顺序模式PTZ控制线程启动")
-
- while self.running:
- try:
- state = self._get_capture_state()
-
- if state == 'capturing':
- # 执行顺序抓拍
- self._execute_sequential_capture()
-
- elif state == 'returning':
- # 回到默认位置
- self._return_to_default_position()
-
- time.sleep(0.05)
-
- except Exception as e:
- logger.error(f"[PTZ线程] 错误: {e}")
- time.sleep(0.1)
-
- def _get_capture_state(self) -> str:
- """获取当前抓拍状态"""
- with self._capture_state_lock:
- return self._capture_state
-
- def _set_capture_state(self, state: str):
- """设置抓拍状态"""
- with self._capture_state_lock:
- old_state = self._capture_state
- self._capture_state = state
- logger.info(f"[顺序模式] 状态切换: {old_state} -> {state}")
-
- def _start_capture_sequence(self, targets: List[TrackingTarget]):
- """开始顺序抓拍序列"""
- with self._batch_targets_lock:
- self._batch_targets = targets.copy()
- self._current_capture_index = 0
-
- self._set_capture_state('capturing')
-
- def _execute_sequential_capture(self):
- """执行顺序抓拍(依次对每个目标进行PTZ定位和抓拍)"""
- with self._batch_targets_lock:
- targets = self._batch_targets.copy()
- current_idx = self._current_capture_index
-
- if current_idx >= len(targets):
- # 所有目标已抓拍完成
- logger.info("[顺序模式] 所有目标抓拍完成")
- self._set_capture_state('returning')
- return
-
- # 获取当前目标
- target = targets[current_idx]
- x_ratio, y_ratio = target.position
-
- # 计算PTZ角度
- if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- coord_type = "校准坐标"
- else:
- pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
- coord_type = "估算坐标"
-
- # 获取批次信息
- batch_id = self._current_batch_id if self._enable_paired_saving else None
- person_index = current_idx # 使用当前索引作为人员序号
-
- logger.info(f"[顺序模式] 抓拍目标 {current_idx + 1}/{len(targets)}: "
- f"位置=({x_ratio:.3f}, {y_ratio:.3f}) -> "
- f"PTZ=({pan:.1f}°, {tilt:.1f}°, zoom={zoom}) [{coord_type}]")
-
- # 执行PTZ移动
- self._set_state(TrackingState.TRACKING)
- success = self.ptz.goto_exact_position(pan, tilt, zoom)
-
- if success:
- # 等待球机稳定
- stabilize_time = self._capture_config['ptz_stabilize_time']
- logger.info(f"[顺序模式] 等待球机稳定 {stabilize_time}s...")
- time.sleep(stabilize_time)
-
- # 自动变焦(如果启用)
- final_zoom = zoom
- if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
- auto_zoom_result = self._auto_zoom_person(pan, tilt, zoom)
- if auto_zoom_result != zoom:
- final_zoom = auto_zoom_result
- time.sleep(0.5) # 变焦后再次等待
-
- # 获取清晰的球机画面
- ptz_frame = self._get_clear_ptz_frame()
-
- if ptz_frame is not None:
- # 保存球机图片
- if self._enable_paired_saving and batch_id is not None:
- ptz_frame_marked = self._mark_ptz_frame_with_detection(ptz_frame, person_index=person_index)
- self._save_ptz_image_for_person_batch(
- batch_id, person_index, ptz_frame_marked,
- (pan, tilt, final_zoom)
- )
-
- # 保存到本地(无论是否启用配对保存)
- self._save_local_snapshot(ptz_frame, current_idx, pan, tilt, final_zoom)
-
- logger.info(f"[顺序模式] 目标 {current_idx + 1} 抓拍完成")
- else:
- logger.warning(f"[顺序模式] 获取球机画面失败")
- else:
- logger.warning(f"[顺序模式] PTZ移动失败")
-
- # 移动到下一个目标
- with self._batch_targets_lock:
- self._current_capture_index += 1
-
- # 抓拍间隔
- time.sleep(self._capture_config['capture_wait_time'])
-
- def _return_to_default_position(self):
- """球机回到默认位置(广角全景)"""
- if not self._capture_config['return_to_panorama']:
- # 不回到默认位置,直接回到空闲状态
- self._set_capture_state('idle')
- return
-
- default_pan = self._capture_config['default_pan']
- default_tilt = self._capture_config['default_tilt']
- default_zoom = self._capture_config['default_zoom']
-
- logger.info(f"[顺序模式] 球机回到默认位置: ({default_pan}°, {default_tilt}°, zoom={default_zoom})")
-
- success = self.ptz.goto_exact_position(default_pan, default_tilt, default_zoom)
-
- if success:
- # 等待到位
- time.sleep(self._capture_config['ptz_stabilize_time'])
- logger.info("[顺序模式] 球机已回到默认位置")
- else:
- logger.warning("[顺序模式] 球机回到默认位置失败")
-
- # 回到空闲状态,恢复全景检测
- self._set_capture_state('idle')
-
- # 清空批次目标
- with self._batch_targets_lock:
- self._batch_targets = []
- self._current_capture_index = 0
-
- def _save_local_snapshot(self, frame: np.ndarray, index: int,
- pan: float, tilt: float, zoom: int):
- """保存本地快照"""
- try:
- import os
- from datetime import datetime
-
- # 创建保存目录
- save_dir = '/home/admin/dsh/captures'
- os.makedirs(save_dir, exist_ok=True)
-
- # 生成文件名
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
- filename = f"capture_{timestamp}_person{index}_p{int(pan)}_t{int(tilt)}_z{zoom}.jpg"
- filepath = os.path.join(save_dir, filename)
-
- # 保存图片
- cv2.imwrite(filepath, frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
- logger.info(f"[顺序模式] 快照已保存: {filepath}")
-
- except Exception as e:
- logger.error(f"[顺序模式] 保存快照失败: {e}")
-
- def set_capture_config(self, **kwargs):
- """设置抓拍配置"""
- self._capture_config.update(kwargs)
- logger.info(f"[顺序模式] 配置已更新: {kwargs}")
-
- def get_capture_status(self) -> dict:
- """获取当前抓拍状态"""
- with self._batch_targets_lock:
- total = len(self._batch_targets)
- current = self._current_capture_index
-
- return {
- 'state': self._get_capture_state(),
- 'total_targets': total,
- 'current_index': current,
- 'remaining': max(0, total - current)
- }
|