| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570 |
- """
- 联动控制器
- 协调全景摄像头和球机的工作
- """
- import time
- import threading
- import queue
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field
- from enum import Enum
- import numpy as np
- from config import COORDINATOR_CONFIG, SYSTEM_CONFIG
- from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
- from ptz_camera import PTZCamera, PTZController
- from ocr_recognizer import NumberDetector, PersonInfo
class TrackingState(Enum):
    """Lifecycle states reported by the coordinator."""

    # Nothing to track.
    IDLE = 0
    # Looking for a target.
    SEARCHING = 1
    # Following a target.
    TRACKING = 2
    # Zoom adjustment in progress.
    ZOOMING = 3
    # OCR recognition in progress.
    OCR_PROCESSING = 4
@dataclass
class TrackingTarget:
    """A person currently followed by the coordinator."""

    # Identifier assigned by the tracker.
    track_id: int
    # Centre of the target as (x_ratio, y_ratio) fractions of the frame size.
    position: Tuple[float, float]
    # Timestamp (time.time()) of the most recent position update.
    last_update: float
    # Latest recognition result for this person, if any.
    person_info: Optional[PersonInfo] = None
    # Selection priority; defaults to 0.
    priority: int = 0
class Coordinator:
    """
    Linkage controller.

    Coordinates the panorama camera and the PTZ dome camera: persons are
    detected and tracked on panorama frames, the PTZ camera is steered to the
    selected target, and number recognition (OCR) is run on it.
    """

    def __init__(self, panorama_camera: PanoramaCamera,
                 ptz_camera: PTZCamera,
                 detector: ObjectDetector = None,
                 number_detector: NumberDetector = None,
                 calibrator=None):
        """
        Initialise the coordinator.

        Args:
            panorama_camera: Panorama camera providing frames.
            ptz_camera: PTZ dome camera to steer.
            detector: Object detector used to find persons (optional).
            number_detector: Number detector used for OCR (optional).
            calibrator: Calibrator used for coordinate transformation (optional).
        """
        self.panorama = panorama_camera
        self.ptz = ptz_camera
        self.detector = detector
        self.number_detector = number_detector
        self.calibrator = calibrator

        self.config = COORDINATOR_CONFIG

        # Feature switches, read from SYSTEM_CONFIG (all default to enabled).
        self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
        self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
        self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
        self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
        self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)

        # Tracker
        self.tracker = PersonTracker()

        # State
        self.state = TrackingState.IDLE
        self.state_lock = threading.Lock()

        # Tracked targets, keyed by track id.
        self.tracking_targets: Dict[int, TrackingTarget] = {}
        self.targets_lock = threading.Lock()

        # Target currently being followed.
        self.current_target: Optional[TrackingTarget] = None

        # Callbacks
        self.on_person_detected: Optional[Callable] = None
        self.on_number_recognized: Optional[Callable] = None
        self.on_tracking_started: Optional[Callable] = None
        self.on_tracking_stopped: Optional[Callable] = None

        # Control flags
        self.running = False
        self.coordinator_thread = None

        # OCR rate limiting: minimum seconds between OCR calls, to avoid
        # calling the OCR API too frequently.
        self.last_ocr_time = 0
        self.ocr_interval = 1.0

        # PTZ optimisation: skip commands when the position barely changed.
        self.last_ptz_position = None
        self.ptz_position_threshold = 0.02  # position change threshold (2%)

        # Result queue
        self.result_queue = queue.Queue()

        # Performance statistics
        self.stats = {
            'frames_processed': 0,
            'persons_detected': 0,
            'ocr_attempts': 0,
            'ocr_success': 0,
            'start_time': None,
            'last_frame_time': None,
        }
        self.stats_lock = threading.Lock()

    def set_calibrator(self, calibrator):
        """Set the calibrator used for coordinate transformation."""
        self.calibrator = calibrator

    def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
        """
        Convert a panorama position into PTZ parameters.

        Args:
            x_ratio: Horizontal position as a fraction of the frame width.
            y_ratio: Vertical position as a fraction of the frame height.

        Returns:
            (pan, tilt, zoom)
        """
        if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
            # Use the calibration result for the transformation.
            pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
            zoom = 8  # default zoom when calibrated
        else:
            # Fall back to the PTZ camera's own estimate.
            pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)

        return (pan, tilt, zoom)

    def start(self) -> bool:
        """
        Start the linkage system.

        Connects the cameras, starts the video stream and launches the worker
        thread. Calling it while a worker is already alive is a no-op.

        Returns:
            True on success (or if already running), False otherwise.
        """
        # Do not reconnect or spawn a second worker if one is already running.
        if (self.running and self.coordinator_thread is not None
                and self.coordinator_thread.is_alive()):
            return True

        # Connect the panorama camera.
        if not self.panorama.connect():
            print("连接全景摄像头失败")
            return False

        # Connect the PTZ camera (optional).
        if self.enable_ptz_camera:
            if not self.ptz.connect():
                print("连接球机失败")
                self.panorama.disconnect()
                return False
        else:
            print("PTZ 球机功能已禁用")

        # Start the video stream: RTSP first, then fall back to the SDK stream.
        if not self.panorama.start_stream_rtsp():
            print("RTSP视频流启动失败,尝试SDK方式...")
            if not self.panorama.start_stream():
                print("启动视频流失败")
                self.panorama.disconnect()
                if self.enable_ptz_camera:
                    self.ptz.disconnect()
                return False

        # Launch the worker thread.
        self.running = True
        self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
        self.coordinator_thread.start()

        print("联动系统已启动")
        return True

    def stop(self):
        """Stop the worker, disconnect the cameras and print statistics."""
        self.running = False

        if self.coordinator_thread:
            self.coordinator_thread.join(timeout=3)

        self.panorama.disconnect()

        if self.enable_ptz_camera:
            self.ptz.disconnect()

        # Print statistics.
        self._print_stats()

        print("联动系统已停止")

    def _update_stats(self, key: str, value: int = 1):
        """Add ``value`` to the counter ``key``; unknown keys are ignored."""
        with self.stats_lock:
            if key in self.stats:
                self.stats[key] += value

    def _print_stats(self):
        """Print the statistics summary if at least one frame was processed."""
        with self.stats_lock:
            if self.stats['start_time'] and self.stats['frames_processed'] > 0:
                elapsed = time.time() - self.stats['start_time']
                # Guard against a zero elapsed time (coarse clocks).
                fps = self.stats['frames_processed'] / elapsed if elapsed > 0 else 0.0
                print("\n=== 性能统计 ===")
                print(f"运行时长: {elapsed:.1f}秒")
                print(f"处理帧数: {self.stats['frames_processed']}")
                print(f"平均帧率: {fps:.1f} fps")
                print(f"检测人体: {self.stats['persons_detected']}次")
                print(f"OCR尝试: {self.stats['ocr_attempts']}次")
                print(f"OCR成功: {self.stats['ocr_success']}次")
                print("================\n")

    def get_stats(self) -> dict:
        """Return a shallow copy of the statistics dictionary."""
        with self.stats_lock:
            return self.stats.copy()

    def _coordinator_worker(self):
        """Worker loop: detect, track, steer the PTZ camera and run OCR."""
        last_detection_time = 0
        detection_interval = 0.1  # seconds between detection passes

        # Initialise statistics.
        with self.stats_lock:
            self.stats['start_time'] = time.time()

        while self.running:
            try:
                current_time = time.time()

                # Fetch the current frame.
                frame = self.panorama.get_frame()
                if frame is None:
                    time.sleep(0.01)
                    continue

                # Update frame statistics.
                self._update_stats('frames_processed')

                # (width, height)
                frame_size = (frame.shape[1], frame.shape[0])

                # Periodic detection.
                if current_time - last_detection_time >= detection_interval:
                    last_detection_time = current_time

                    # Detect persons.
                    detections = self._detect_persons(frame)

                    # Update detection statistics.
                    if detections:
                        self._update_stats('persons_detected', len(detections))

                    # Update the tracker.
                    tracked = self.tracker.update(detections)

                    # Update tracked targets.
                    self._update_tracking_targets(tracked, frame_size)

                    # Dispatch detection callbacks.
                    if tracked:
                        self._process_detections(tracked, frame, frame_size)

                # Handle the current target.
                self._process_current_target(frame, frame_size)

                # Drop expired targets.
                self._cleanup_expired_targets()

                time.sleep(0.01)

            except Exception as e:
                # Top-level boundary of the worker thread: report and keep running.
                print(f"联动处理错误: {e}")
                time.sleep(0.1)

    def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
        """Run person detection; returns [] when detection is disabled or unavailable."""
        if not self.enable_detection or self.detector is None:
            return []
        return self.detector.detect_persons(frame)

    def _update_tracking_targets(self, detections: List[DetectedObject],
                                 frame_size: Tuple[int, int]):
        """
        Refresh known targets and register new ones.

        Args:
            detections: Tracked detections; entries without a track id are skipped.
            frame_size: Frame size as (width, height).
        """
        current_time = time.time()

        with self.targets_lock:
            for det in detections:
                if det.track_id is None:
                    continue

                x_ratio = det.center[0] / frame_size[0]
                y_ratio = det.center[1] / frame_size[1]

                if det.track_id in self.tracking_targets:
                    # Known target: update its position.
                    target = self.tracking_targets[det.track_id]
                    target.position = (x_ratio, y_ratio)
                    target.last_update = current_time
                elif len(self.tracking_targets) < self.config['max_tracking_targets']:
                    # New target, accepted only while below the configured limit.
                    self.tracking_targets[det.track_id] = TrackingTarget(
                        track_id=det.track_id,
                        position=(x_ratio, y_ratio),
                        last_update=current_time
                    )

    def _process_detections(self, detections: List[DetectedObject],
                            frame: np.ndarray, frame_size: Tuple[int, int]):
        """Invoke ``on_person_detected`` (if set) for every detection."""
        if self.on_person_detected:
            for det in detections:
                self.on_person_detected(det, frame)

    def _select_current_target(self) -> Optional[TrackingTarget]:
        """Pick (or keep) the current target under the targets lock and return it."""
        with self.targets_lock:
            if not self.tracking_targets:
                self.current_target = None
                return None

            current = self.current_target
            if current is None or current.track_id not in self.tracking_targets:
                # Highest priority wins; max() returns the first maximal item,
                # so ties fall back to the earliest-registered target.
                current = max(self.tracking_targets.values(),
                              key=lambda candidate: candidate.priority)
                self.current_target = current
            return current

    def _move_ptz_if_needed(self, x_ratio: float, y_ratio: float) -> bool:
        """Steer the PTZ camera unless the position changed less than the threshold.

        Returns:
            True if a command was sent, False if it was skipped.
        """
        last = self.last_ptz_position
        if last is not None:
            threshold = self.ptz_position_threshold
            if abs(x_ratio - last[0]) < threshold and abs(y_ratio - last[1]) < threshold:
                return False

        self.ptz.track_target(x_ratio, y_ratio)
        self.last_ptz_position = (x_ratio, y_ratio)
        return True

    def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
        """
        Follow the current target: steer the PTZ camera and run OCR.

        Args:
            frame: Current panorama frame.
            frame_size: Frame size as (width, height).
        """
        # Selection happens under targets_lock; everything below runs outside
        # it because _perform_ocr acquires the same non-reentrant lock.
        target = self._select_current_target()
        if target is None:
            self._set_state(TrackingState.IDLE)
            return

        # Steer the PTZ camera (only when PTZ tracking is enabled).
        if self.enable_ptz_tracking and self.enable_ptz_camera:
            self._set_state(TrackingState.TRACKING)
            x_ratio, y_ratio = target.position
            self._move_ptz_if_needed(x_ratio, y_ratio)

        # Run OCR (only when OCR is enabled).
        if self.enable_ocr:
            self._perform_ocr(frame, target)

    def _estimate_person_bbox(self, frame: np.ndarray,
                              position: Tuple[float, float]) -> Tuple[int, int, int, int]:
        """Estimate the person box (x, y, width, height) around ``position``, clipped to the frame."""
        frame_h, frame_w = frame.shape[:2]

        # Share of the frame a person occupies (configurable; defaults are
        # 20% of the width and 40% of the height).
        person_width = int(frame_w * self.config.get('person_width_ratio', 0.2))
        person_height = int(frame_h * self.config.get('person_height_ratio', 0.4))

        x_ratio, y_ratio = position
        center_x = int(x_ratio * frame_w)
        center_y = int(y_ratio * frame_h)

        # Clip to the frame; the box may shrink at the right/bottom edges.
        x1 = max(0, center_x - person_width // 2)
        y1 = max(0, center_y - person_height // 2)
        x2 = min(frame_w, x1 + person_width)
        y2 = min(frame_h, y1 + person_height)

        return (x1, y1, x2 - x1, y2 - y1)

    def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
        """
        Run number recognition for ``target``, rate-limited by ``ocr_interval``.

        The result is stored on the tracked target, reported through
        ``on_number_recognized`` when text was found, and pushed to the
        result queue.
        """
        if not self.enable_ocr or self.number_detector is None:
            return

        # Rate limiting: avoid calling the OCR API too frequently.
        current_time = time.time()
        if current_time - self.last_ocr_time < self.ocr_interval:
            return
        self.last_ocr_time = current_time

        # Update OCR attempt statistics.
        self._update_stats('ocr_attempts')

        person_bbox = self._estimate_person_bbox(frame, target.position)

        # Detect the number; restore the previous state afterwards so the
        # coordinator does not stay in OCR_PROCESSING once OCR is done or fails.
        previous_state = self.get_state()
        self._set_state(TrackingState.OCR_PROCESSING)
        try:
            person_info = self.number_detector.detect_number(frame, person_bbox)
        finally:
            self._set_state(previous_state)

        if person_info is None:
            # Nothing to record for this attempt.
            return
        person_info.person_id = target.track_id

        # Update OCR success statistics.
        if person_info.number_text:
            self._update_stats('ocr_success')

        # Store the result on the tracked target (if it still exists).
        with self.targets_lock:
            if target.track_id in self.tracking_targets:
                self.tracking_targets[target.track_id].person_info = person_info

        # Callback
        if self.on_number_recognized and person_info.number_text:
            self.on_number_recognized(person_info)

        # Push to the result queue.
        self.result_queue.put(person_info)

    def _cleanup_expired_targets(self):
        """Remove targets not updated within ``config['tracking_timeout']`` seconds."""
        current_time = time.time()
        timeout = self.config['tracking_timeout']

        with self.targets_lock:
            expired_ids = [
                target_id for target_id, target in self.tracking_targets.items()
                if current_time - target.last_update > timeout
            ]

            for target_id in expired_ids:
                del self.tracking_targets[target_id]
                if self.current_target and self.current_target.track_id == target_id:
                    self.current_target = None

    def _set_state(self, state: TrackingState):
        """Set the current state under the state lock."""
        with self.state_lock:
            self.state = state

    def get_state(self) -> TrackingState:
        """Return the current state."""
        with self.state_lock:
            return self.state

    def get_results(self) -> List[PersonInfo]:
        """
        Drain and return all pending recognition results.

        Returns:
            List of person info objects, oldest first.
        """
        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except queue.Empty:
                return results

    def get_tracking_targets(self) -> List[TrackingTarget]:
        """Return a list snapshot of the currently tracked targets."""
        with self.targets_lock:
            return list(self.tracking_targets.values())

    def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
        """
        Steer the PTZ camera to a given position (no-op when PTZ is disabled).

        Args:
            x_ratio: Horizontal position as a fraction of the frame width.
            y_ratio: Vertical position as a fraction of the frame height.
            zoom: Zoom factor, forwarded to the PTZ camera as given.
        """
        if self.enable_ptz_tracking and self.enable_ptz_camera:
            self.ptz.move_to_target(x_ratio, y_ratio, zoom)

    def capture_snapshot(self) -> Optional[np.ndarray]:
        """
        Capture a snapshot.

        Returns:
            The panorama camera's current frame (whatever ``get_frame`` returns).
        """
        return self.panorama.get_frame()
class EventDrivenCoordinator(Coordinator):
    """
    Event-driven linkage controller.

    In addition to the regular detect/track processing, events reported for
    the panorama camera (via ``on_event``) are queued and handled by the
    worker thread.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base coordinator plus the event filter and queue."""
        super().__init__(*args, **kwargs)

        # Event types and whether each one is handled.
        self.event_types = {
            'intruder': True,   # intrusion detection
            'crossline': True,  # line-crossing detection
            'motion': True,     # motion detection
        }

        # Event queue
        self.event_queue = queue.Queue()

    def on_event(self, event_type: str, event_data: dict):
        """
        Event callback: queue the event if its type is enabled.

        Args:
            event_type: Event type; unknown or disabled types are dropped.
            event_data: Event payload.
        """
        if not self.event_types.get(event_type, False):
            return

        self.event_queue.put({
            'type': event_type,
            'data': event_data,
            'time': time.time()
        })

    def _coordinator_worker(self):
        """Worker loop (event-driven variant)."""
        # Initialise statistics, as the base worker does, so that
        # _print_stats()/get_stats() report meaningful values.
        with self.stats_lock:
            self.stats['start_time'] = time.time()

        while self.running:
            try:
                # Handle one pending event; the wait is bounded to 0.1 s.
                try:
                    event = self.event_queue.get(timeout=0.1)
                except queue.Empty:
                    pass
                else:
                    self._process_event(event)

                # Continue with the regular processing.
                frame = self.panorama.get_frame()
                if frame is not None:
                    self._update_stats('frames_processed')

                    # (width, height)
                    frame_size = (frame.shape[1], frame.shape[0])
                    detections = self._detect_persons(frame)

                    if detections:
                        self._update_stats('persons_detected', len(detections))
                        tracked = self.tracker.update(detections)
                        self._update_tracking_targets(tracked, frame_size)

                    self._process_current_target(frame, frame_size)

                self._cleanup_expired_targets()

            except Exception as e:
                # Top-level boundary of the worker thread: report and keep running.
                print(f"事件处理错误: {e}")
                time.sleep(0.1)

    def _process_event(self, event: dict):
        """Handle a queued event (a dict with 'type', 'data' and 'time' keys)."""
        event_type = event['type']
        event_data = event['data']

        print(f"处理事件: {event_type}")

        # Dispatch on the event type.
        if event_type == 'intruder':
            # Intrusion event: steer to the reported position, if any.
            # A missing payload is treated like a payload without a position.
            position = (event_data or {}).get('position')
            if position is not None:
                x_ratio, y_ratio = position
                self.force_track_position(x_ratio, y_ratio)

        elif event_type == 'crossline':
            # Line-crossing event: no action yet.
            pass

        elif event_type == 'motion':
            # Motion-detection event: no action yet.
            pass
|