""" 安全联动控制器 整合安全检测、事件推送和语音播报功能 """ import time import threading import queue from typing import Optional, List, Dict, Tuple, Callable from dataclasses import dataclass from enum import Enum import numpy as np import cv2 from config import ( COORDINATOR_CONFIG, SAFETY_DETECTION_CONFIG, EVENT_PUSHER_CONFIG, EVENT_LISTENER_CONFIG, VOICE_ANNOUNCER_CONFIG, SYSTEM_CONFIG ) from safety_detector import ( SafetyDetector, SafetyDetection, PersonSafetyStatus, SafetyViolationType, draw_safety_result ) from event_pusher import EventPusher, EventListener, SafetyEvent, EventType from voice_announcer import VoiceAnnouncer, VoicePriority class CoordinatorState(Enum): """控制器状态""" IDLE = 0 # 空闲 DETECTING = 1 # 检测中 TRACKING = 2 # 跟踪中 ALERTING = 3 # 告警中 @dataclass class AlertRecord: """告警记录""" track_id: int # 跟踪ID violation_type: str # 违规类型 description: str # 描述 frame: Optional[np.ndarray] # 图像 timestamp: float # 时间戳 pushed: bool = False # 是否已推送 announced: bool = False # 是否已播报 class SafetyCoordinator: """安全联动控制器:协调摄像头、安全检测、事件推送、语音播报、PTZ跟踪""" def __init__(self, camera, config: Dict = None, ptz_camera=None, calibrator=None): self.camera = camera self.config = config or {} self.ptz = ptz_camera # PTZ球机(可选) self.calibrator = calibrator # 校准器(可选) self.detector = None self.event_pusher = None self.voice_announcer = None self.event_listener = None self.state = CoordinatorState.IDLE self.state_lock = threading.Lock() self.running = False self.worker_thread = None self.alert_records: List[AlertRecord] = [] self.alert_cooldown = {} # 告警冷却时间(按违规类型) self._violation_cooldown = {} self.stats = { 'frames_processed': 0, 'persons_detected': 0, 'violations_detected': 0, 'events_pushed': 0, 'voice_announced': 0, 'ptz_commands_sent': 0, 'start_time': None } self.stats_lock = threading.Lock() self.on_violation_detected: Optional[Callable] = None self.on_frame_processed: Optional[Callable] = None self._init_components() def _init_components(self): """初始化各组件""" # 从 SYSTEM_CONFIG 读取功能开关 enable_detection = SYSTEM_CONFIG.get('enable_detection', True) enable_safety_detection = SYSTEM_CONFIG.get('enable_safety_detection', True) enable_event_push = SYSTEM_CONFIG.get('enable_event_push', True) enable_voice_announce = SYSTEM_CONFIG.get('enable_voice_announce', True) # 安全检测器 if enable_detection and enable_safety_detection: try: self.detector = SafetyDetector( model_path=SAFETY_DETECTION_CONFIG.get('model_path'), use_gpu=SAFETY_DETECTION_CONFIG.get('use_gpu', True), conf_threshold=SAFETY_DETECTION_CONFIG.get('conf_threshold', 0.5), person_threshold=SAFETY_DETECTION_CONFIG.get('person_threshold', 0.8) ) print("安全检测器初始化成功") except Exception as e: print(f"安全检测器初始化失败: {e}") else: print("安全检测功能已禁用") # 事件推送器 if enable_event_push: try: self.event_pusher = EventPusher(EVENT_PUSHER_CONFIG) print("事件推送器初始化成功") except Exception as e: print(f"事件推送器初始化失败: {e}") else: print("事件推送功能已禁用") # 语音播报器 if enable_voice_announce: try: self.voice_announcer = VoiceAnnouncer( tts_config=VOICE_ANNOUNCER_CONFIG.get('tts', {}), player_config=VOICE_ANNOUNCER_CONFIG.get('player', {}) ) print("语音播报器初始化成功") except Exception as e: print(f"语音播报器初始化失败: {e}") else: print("语音播报功能已禁用") # 事件监听器 if EVENT_LISTENER_CONFIG.get('enabled', True): try: self.event_listener = EventListener(EVENT_LISTENER_CONFIG) # 设置语音播放回调 self.event_listener.set_voice_callback(self._on_voice_command) print("事件监听器初始化成功") except Exception as e: print(f"事件监听器初始化失败: {e}") def _on_voice_command(self, cmd: Dict): """处理语音播放指令""" if not self.voice_announcer: return text = cmd.get('text', '') priority = VoicePriority(cmd.get('priority', 2)) if text: self.voice_announcer.announce(text, priority=priority) def start(self) -> bool: """启动控制器""" if self.running: return True if self.event_pusher: self.event_pusher.start() if self.voice_announcer: self.voice_announcer.start() if self.event_listener: self.event_listener.start() self.running = True self.worker_thread = threading.Thread(target=self._worker, daemon=True) self.worker_thread.start() # PTZ跟踪已禁用 with self.stats_lock: self.stats['start_time'] = time.time() print("安全联动控制器已启动") return True def stop(self): """停止控制器""" self.running = False if self.worker_thread: self.worker_thread.join(timeout=3) # PTZ跟踪已禁用 if self.event_pusher: self.event_pusher.stop() if self.voice_announcer: self.voice_announcer.stop() if self.event_listener: self.event_listener.stop() self._print_stats() print("安全联动控制器已停止") def _worker(self): """工作线程""" # 优先使用 detection_fps,默认每秒2帧 detection_fps = SAFETY_DETECTION_CONFIG.get('detection_fps', 2) detection_interval = 1.0 / detection_fps # 根据FPS计算间隔 last_detection_time = 0 detection_run_count = 0 detection_violation_count = 0 frame_count = 0 last_log_time = time.time() heartbeat_interval = 30.0 last_no_detect_log_time = 0 import logging sc_logger = logging.getLogger(__name__) if self.detector is None: sc_logger.warning("[安全检测] ⚠️ 安全检测器未初始化! 安全检测不可用") else: sc_logger.info(f"[安全检测] ✓ 安全检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s)") while self.running: try: current_time = time.time() frame = self.camera.get_frame() if self.camera else None if frame is None: time.sleep(0.01) continue frame_count += 1 self._update_stats('frames_processed') if current_time - last_log_time >= heartbeat_interval: stats = self.get_stats() state_str = self.state.name if hasattr(self.state, 'name') else str(self.state) sc_logger.info( f"[安全检测] 状态={state_str}, " f"检测轮次={detection_run_count}(有人={detection_violation_count}), " f"帧数={frame_count}" ) frame_count = 0 last_log_time = current_time if current_time - last_detection_time >= detection_interval: last_detection_time = current_time detection_run_count += 1 result = self._process_frame_with_logging(frame, detection_run_count, detection_violation_count, last_no_detect_log_time, sc_logger) detection_violation_count = result self._cleanup_tracks() time.sleep(0.01) except Exception as e: sc_logger.error(f"[安全检测] 处理错误: {e}") time.sleep(0.1) def _process_frame_with_logging(self, frame: np.ndarray, run_count: int, violation_count: int, last_no_detect_time: float, sc_logger) -> int: """处理帧并返回更新的violation_count""" if self.detector is None: return violation_count self._set_state(CoordinatorState.DETECTING) detections = self.detector.detect(frame) status_list = self.detector.check_safety(frame, detections) self._update_stats('persons_detected', len(status_list)) # 轨迹追踪已禁用 has_violation = False for status in status_list: if status.is_violation: self._handle_violation(status, frame) has_violation = True if has_violation: violation_count += 1 if not status_list: current_time = time.time() if current_time - last_no_detect_time >= 30.0: sc_logger.info( f"[安全检测] · YOLO检测运行正常, 本轮未检测到人员 " f"(累计检测{run_count}轮, 违规{violation_count}轮)" ) if self.on_frame_processed: self.on_frame_processed(frame, detections, status_list) return violation_count def _process_frame(self, frame: np.ndarray): """处理帧""" if self.detector is None: return self._set_state(CoordinatorState.DETECTING) # 安全检测 detections = self.detector.detect(frame) status_list = self.detector.check_safety(frame, detections) self._update_stats('persons_detected', len(status_list)) # 检查违规(轨迹追踪已禁用) for status in status_list: if status.is_violation: self._handle_violation(status, frame) # 回调 if self.on_frame_processed: self.on_frame_processed(frame, detections, status_list) # 轨迹追踪已禁用 - _update_tracks 和 _cleanup_tracks 方法已移除 def _handle_violation(self, status: PersonSafetyStatus, frame: np.ndarray): """处理违规""" current_time = time.time() # 检查冷却时间(按违规类型) violation_key = status.get_violation_desc() cooldown = SAFETY_DETECTION_CONFIG.get('alert_cooldown', 3.0) if violation_key in self.alert_cooldown: if current_time - self.alert_cooldown[violation_key] < cooldown: return # 记录告警 self.alert_cooldown[violation_key] = current_time description = status.get_violation_desc() violation_type = status.violation_types[0].value if status.violation_types else "未知" # 裁剪人体区域 x1, y1, x2, y2 = status.person_bbox margin = 20 x1 = max(0, x1 - margin) y1 = max(0, y1 - margin) x2 = min(frame.shape[1], x2 + margin) y2 = min(frame.shape[0], y2 + margin) person_image = frame[y1:y2, x1:x2].copy() record = AlertRecord( track_id=0, # 轨迹追踪已禁用 violation_type=violation_type, description=description, frame=person_image, timestamp=current_time ) self.alert_records.append(record) self._update_stats('violations_detected') # PTZ跟踪已禁用 # 回调 if self.on_violation_detected: self.on_violation_detected(status, frame) # 推送事件 if self.event_pusher: self.event_pusher.push_safety_violation( description=description, image=person_image, track_id=0, # 轨迹追踪已禁用 confidence=status.person_conf ) self._update_stats('events_pushed') # 语音播报 if self.voice_announcer: self.voice_announcer.announce_violation(description, urgent=True) self._update_stats('voice_announced') print(f"[告警] {description}") # PTZ跟踪已禁用 - _track_violator_ptz 和 _ptz_worker 方法已移除 def _set_state(self, state: CoordinatorState): """设置状态""" with self.state_lock: self.state = state def get_state(self) -> CoordinatorState: """获取状态""" with self.state_lock: return self.state def _update_stats(self, key: str, value: int = 1): """更新统计""" with self.stats_lock: if key in self.stats: self.stats[key] += value def _print_stats(self): """打印统计""" with self.stats_lock: if self.stats['start_time']: elapsed = time.time() - self.stats['start_time'] print("\n=== 安全检测统计 ===") print(f"运行时长: {elapsed:.1f}秒") print(f"处理帧数: {self.stats['frames_processed']}") print(f"检测人员: {self.stats['persons_detected']}次") print(f"违规检测: {self.stats['violations_detected']}次") print(f"事件推送: {self.stats['events_pushed']}次") print(f"语音播报: {self.stats['voice_announced']}次") if self.event_pusher: push_stats = self.event_pusher.get_stats() print(f"推送详情: 成功{push_stats['pushed_events']}, 失败{push_stats['failed_events']}") if self.voice_announcer: voice_stats = self.voice_announcer.get_stats() print(f"播报详情: 成功{voice_stats['played_commands']}, 失败{voice_stats['failed_commands']}") print("===================\n") def get_stats(self) -> Dict: """获取统计""" with self.stats_lock: return self.stats.copy() def get_alerts(self) -> List[AlertRecord]: """获取告警记录""" return self.alert_records.copy() def announce(self, text: str, priority: VoicePriority = VoicePriority.NORMAL): """ 手动播报语音 Args: text: 播报文本 priority: 优先级 """ if self.voice_announcer: self.voice_announcer.announce(text, priority=priority) def force_detect(self, frame: np.ndarray = None) -> Tuple[List[SafetyDetection], List[PersonSafetyStatus]]: """ 强制执行一次检测 Args: frame: 输入帧,如果为 None 则从摄像头获取 Returns: (检测结果, 安全状态列表) """ if frame is None: frame = self.camera.get_frame() if self.camera else None if frame is None or self.detector is None: return [], [] detections = self.detector.detect(frame) status_list = self.detector.check_safety(frame, detections) return detections, status_list class SimpleCamera: """简单摄像头封装(用于测试)""" def __init__(self, source=0): """ 初始化摄像头 Args: source: 视频源 (摄像头索引、RTSP地址、视频文件路径) """ self.cap = None self.source = source self.connected = False def connect(self) -> bool: """连接摄像头""" try: # 使用 FFmpeg 单线程模式避免线程安全崩溃 import os os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1' self.cap = cv2.VideoCapture(self.source, cv2.CAP_FFMPEG) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.connected = self.cap.isOpened() return self.connected except Exception as e: print(f"连接摄像头失败: {e}") return False def disconnect(self): """断开连接""" if self.cap: self.cap.release() self.connected = False def get_frame(self) -> Optional[np.ndarray]: """获取帧""" if self.cap is None or not self.cap.isOpened(): return None ret, frame = self.cap.read() return frame if ret else None def create_coordinator(camera_source=0, config: Dict = None) -> SafetyCoordinator: """ 创建安全联动控制器 Args: camera_source: 摄像头源 config: 配置 Returns: SafetyCoordinator 实例 """ camera = SimpleCamera(camera_source) return SafetyCoordinator(camera, config)