| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- """
- 安全联动控制器
- 整合安全检测、事件推送和语音播报功能
- """
- import time
- import threading
- import queue
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass
- from enum import Enum
- import numpy as np
- import cv2
- from config import (
- COORDINATOR_CONFIG,
- SAFETY_DETECTION_CONFIG,
- EVENT_PUSHER_CONFIG,
- EVENT_LISTENER_CONFIG,
- VOICE_ANNOUNCER_CONFIG,
- SYSTEM_CONFIG
- )
- from safety_detector import (
- SafetyDetector, SafetyDetection, PersonSafetyStatus,
- SafetyViolationType, draw_safety_result
- )
- from event_pusher import EventPusher, EventListener, SafetyEvent, EventType
- from voice_announcer import VoiceAnnouncer, VoicePriority
- class CoordinatorState(Enum):
- """控制器状态"""
- IDLE = 0 # 空闲
- DETECTING = 1 # 检测中
- TRACKING = 2 # 跟踪中
- ALERTING = 3 # 告警中
- @dataclass
- class AlertRecord:
- """告警记录"""
- track_id: int # 跟踪ID
- violation_type: str # 违规类型
- description: str # 描述
- frame: Optional[np.ndarray] # 图像
- timestamp: float # 时间戳
- pushed: bool = False # 是否已推送
- announced: bool = False # 是否已播报
- class SafetyCoordinator:
- """安全联动控制器:协调摄像头、安全检测、事件推送、语音播报、PTZ跟踪"""
-
- def __init__(self, camera, config: Dict = None, ptz_camera=None, calibrator=None):
- self.camera = camera
- self.config = config or {}
- self.ptz = ptz_camera # PTZ球机(可选)
- self.calibrator = calibrator # 校准器(可选)
-
- self.detector = None
- self.event_pusher = None
- self.voice_announcer = None
- self.event_listener = None
-
- self.state = CoordinatorState.IDLE
- self.state_lock = threading.Lock()
-
- self.running = False
- self.worker_thread = None
-
- self.alert_records: List[AlertRecord] = []
- self.alert_cooldown = {}
-
- # 告警冷却时间(按违规类型)
- self._violation_cooldown = {}
-
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'violations_detected': 0,
- 'events_pushed': 0,
- 'voice_announced': 0,
- 'ptz_commands_sent': 0,
- 'start_time': None
- }
- self.stats_lock = threading.Lock()
-
- self.on_violation_detected: Optional[Callable] = None
- self.on_frame_processed: Optional[Callable] = None
-
- self._init_components()
-
- def _init_components(self):
- """初始化各组件"""
- # 从 SYSTEM_CONFIG 读取功能开关
- enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- enable_safety_detection = SYSTEM_CONFIG.get('enable_safety_detection', True)
- enable_event_push = SYSTEM_CONFIG.get('enable_event_push', True)
- enable_voice_announce = SYSTEM_CONFIG.get('enable_voice_announce', True)
-
- # 安全检测器
- if enable_detection and enable_safety_detection:
- try:
- self.detector = SafetyDetector(
- model_path=SAFETY_DETECTION_CONFIG.get('model_path'),
- use_gpu=SAFETY_DETECTION_CONFIG.get('use_gpu', True),
- conf_threshold=SAFETY_DETECTION_CONFIG.get('conf_threshold', 0.5),
- person_threshold=SAFETY_DETECTION_CONFIG.get('person_threshold', 0.8)
- )
- print("安全检测器初始化成功")
- except Exception as e:
- print(f"安全检测器初始化失败: {e}")
- else:
- print("安全检测功能已禁用")
-
- # 事件推送器
- if enable_event_push:
- try:
- self.event_pusher = EventPusher(EVENT_PUSHER_CONFIG)
- print("事件推送器初始化成功")
- except Exception as e:
- print(f"事件推送器初始化失败: {e}")
- else:
- print("事件推送功能已禁用")
-
- # 语音播报器
- if enable_voice_announce:
- try:
- self.voice_announcer = VoiceAnnouncer(
- tts_config=VOICE_ANNOUNCER_CONFIG.get('tts', {}),
- player_config=VOICE_ANNOUNCER_CONFIG.get('player', {})
- )
- print("语音播报器初始化成功")
- except Exception as e:
- print(f"语音播报器初始化失败: {e}")
- else:
- print("语音播报功能已禁用")
-
- # 事件监听器
- if EVENT_LISTENER_CONFIG.get('enabled', True):
- try:
- self.event_listener = EventListener(EVENT_LISTENER_CONFIG)
- # 设置语音播放回调
- self.event_listener.set_voice_callback(self._on_voice_command)
- print("事件监听器初始化成功")
- except Exception as e:
- print(f"事件监听器初始化失败: {e}")
-
- def _on_voice_command(self, cmd: Dict):
- """处理语音播放指令"""
- if not self.voice_announcer:
- return
-
- text = cmd.get('text', '')
- priority = VoicePriority(cmd.get('priority', 2))
-
- if text:
- self.voice_announcer.announce(text, priority=priority)
-
- def start(self) -> bool:
- """启动控制器"""
- if self.running:
- return True
-
- if self.event_pusher:
- self.event_pusher.start()
-
- if self.voice_announcer:
- self.voice_announcer.start()
-
- if self.event_listener:
- self.event_listener.start()
-
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
- self.worker_thread.start()
-
- # PTZ跟踪已禁用
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- print("安全联动控制器已启动")
- return True
-
- def stop(self):
- """停止控制器"""
- self.running = False
-
- if self.worker_thread:
- self.worker_thread.join(timeout=3)
-
- # PTZ跟踪已禁用
-
- if self.event_pusher:
- self.event_pusher.stop()
-
- if self.voice_announcer:
- self.voice_announcer.stop()
-
- if self.event_listener:
- self.event_listener.stop()
-
- self._print_stats()
- print("安全联动控制器已停止")
-
- def _worker(self):
- """工作线程"""
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = SAFETY_DETECTION_CONFIG.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- last_detection_time = 0
- detection_run_count = 0
- detection_violation_count = 0
- frame_count = 0
- last_log_time = time.time()
- heartbeat_interval = 30.0
- last_no_detect_log_time = 0
-
- import logging
- sc_logger = logging.getLogger(__name__)
-
- if self.detector is None:
- sc_logger.warning("[安全检测] ⚠️ 安全检测器未初始化! 安全检测不可用")
- else:
- sc_logger.info(f"[安全检测] ✓ 安全检测器已就绪, 检测帧率={detection_fps}fps(间隔={detection_interval:.2f}s)")
-
- while self.running:
- try:
- current_time = time.time()
-
- frame = self.camera.get_frame() if self.camera else None
- if frame is None:
- time.sleep(0.01)
- continue
-
- frame_count += 1
- self._update_stats('frames_processed')
-
- if current_time - last_log_time >= heartbeat_interval:
- stats = self.get_stats()
- state_str = self.state.name if hasattr(self.state, 'name') else str(self.state)
- sc_logger.info(
- f"[安全检测] 状态={state_str}, "
- f"检测轮次={detection_run_count}(有人={detection_violation_count}), "
- f"帧数={frame_count}"
- )
- frame_count = 0
- last_log_time = current_time
-
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- detection_run_count += 1
-
- result = self._process_frame_with_logging(frame, detection_run_count, detection_violation_count, last_no_detect_log_time, sc_logger)
- detection_violation_count = result
-
- self._cleanup_tracks()
-
- time.sleep(0.01)
-
- except Exception as e:
- sc_logger.error(f"[安全检测] 处理错误: {e}")
- time.sleep(0.1)
-
- def _process_frame_with_logging(self, frame: np.ndarray, run_count: int, violation_count: int, last_no_detect_time: float, sc_logger) -> int:
- """处理帧并返回更新的violation_count"""
- if self.detector is None:
- return violation_count
-
- self._set_state(CoordinatorState.DETECTING)
-
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- self._update_stats('persons_detected', len(status_list))
- # 轨迹追踪已禁用
-
- has_violation = False
- for status in status_list:
- if status.is_violation:
- self._handle_violation(status, frame)
- has_violation = True
-
- if has_violation:
- violation_count += 1
-
- if not status_list:
- current_time = time.time()
- if current_time - last_no_detect_time >= 30.0:
- sc_logger.info(
- f"[安全检测] · YOLO检测运行正常, 本轮未检测到人员 "
- f"(累计检测{run_count}轮, 违规{violation_count}轮)"
- )
-
- if self.on_frame_processed:
- self.on_frame_processed(frame, detections, status_list)
-
- return violation_count
-
- def _process_frame(self, frame: np.ndarray):
- """处理帧"""
- if self.detector is None:
- return
-
- self._set_state(CoordinatorState.DETECTING)
-
- # 安全检测
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- self._update_stats('persons_detected', len(status_list))
-
- # 检查违规(轨迹追踪已禁用)
- for status in status_list:
- if status.is_violation:
- self._handle_violation(status, frame)
-
- # 回调
- if self.on_frame_processed:
- self.on_frame_processed(frame, detections, status_list)
-
- # 轨迹追踪已禁用 - _update_tracks 和 _cleanup_tracks 方法已移除
-
- def _handle_violation(self, status: PersonSafetyStatus, frame: np.ndarray):
- """处理违规"""
- current_time = time.time()
-
- # 检查冷却时间(按违规类型)
- violation_key = status.get_violation_desc()
- cooldown = SAFETY_DETECTION_CONFIG.get('alert_cooldown', 3.0)
- if violation_key in self.alert_cooldown:
- if current_time - self.alert_cooldown[violation_key] < cooldown:
- return
-
- # 记录告警
- self.alert_cooldown[violation_key] = current_time
-
- description = status.get_violation_desc()
- violation_type = status.violation_types[0].value if status.violation_types else "未知"
-
- # 裁剪人体区域
- x1, y1, x2, y2 = status.person_bbox
- margin = 20
- x1 = max(0, x1 - margin)
- y1 = max(0, y1 - margin)
- x2 = min(frame.shape[1], x2 + margin)
- y2 = min(frame.shape[0], y2 + margin)
- person_image = frame[y1:y2, x1:x2].copy()
-
- record = AlertRecord(
- track_id=0, # 轨迹追踪已禁用
- violation_type=violation_type,
- description=description,
- frame=person_image,
- timestamp=current_time
- )
-
- self.alert_records.append(record)
- self._update_stats('violations_detected')
-
- # PTZ跟踪已禁用
-
- # 回调
- if self.on_violation_detected:
- self.on_violation_detected(status, frame)
-
- # 推送事件
- if self.event_pusher:
- self.event_pusher.push_safety_violation(
- description=description,
- image=person_image,
- track_id=0, # 轨迹追踪已禁用
- confidence=status.person_conf
- )
- self._update_stats('events_pushed')
-
- # 语音播报
- if self.voice_announcer:
- self.voice_announcer.announce_violation(description, urgent=True)
- self._update_stats('voice_announced')
-
- print(f"[告警] {description}")
-
- # PTZ跟踪已禁用 - _track_violator_ptz 和 _ptz_worker 方法已移除
-
- def _set_state(self, state: CoordinatorState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> CoordinatorState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计"""
- with self.stats_lock:
- if self.stats['start_time']:
- elapsed = time.time() - self.stats['start_time']
- print("\n=== 安全检测统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"检测人员: {self.stats['persons_detected']}次")
- print(f"违规检测: {self.stats['violations_detected']}次")
- print(f"事件推送: {self.stats['events_pushed']}次")
- print(f"语音播报: {self.stats['voice_announced']}次")
-
- if self.event_pusher:
- push_stats = self.event_pusher.get_stats()
- print(f"推送详情: 成功{push_stats['pushed_events']}, 失败{push_stats['failed_events']}")
-
- if self.voice_announcer:
- voice_stats = self.voice_announcer.get_stats()
- print(f"播报详情: 成功{voice_stats['played_commands']}, 失败{voice_stats['failed_commands']}")
-
- print("===================\n")
-
- def get_stats(self) -> Dict:
- """获取统计"""
- with self.stats_lock:
- return self.stats.copy()
-
- def get_alerts(self) -> List[AlertRecord]:
- """获取告警记录"""
- return self.alert_records.copy()
-
- def announce(self, text: str, priority: VoicePriority = VoicePriority.NORMAL):
- """
- 手动播报语音
-
- Args:
- text: 播报文本
- priority: 优先级
- """
- if self.voice_announcer:
- self.voice_announcer.announce(text, priority=priority)
-
- def force_detect(self, frame: np.ndarray = None) -> Tuple[List[SafetyDetection], List[PersonSafetyStatus]]:
- """
- 强制执行一次检测
-
- Args:
- frame: 输入帧,如果为 None 则从摄像头获取
-
- Returns:
- (检测结果, 安全状态列表)
- """
- if frame is None:
- frame = self.camera.get_frame() if self.camera else None
-
- if frame is None or self.detector is None:
- return [], []
-
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- return detections, status_list
- class SimpleCamera:
- """简单摄像头封装(用于测试)"""
-
- def __init__(self, source=0):
- """
- 初始化摄像头
-
- Args:
- source: 视频源 (摄像头索引、RTSP地址、视频文件路径)
- """
- self.cap = None
- self.source = source
- self.connected = False
-
- def connect(self) -> bool:
- """连接摄像头"""
- try:
- # 使用 FFmpeg 单线程模式避免线程安全崩溃
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- self.cap = cv2.VideoCapture(self.source, cv2.CAP_FFMPEG)
- self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- self.connected = self.cap.isOpened()
- return self.connected
- except Exception as e:
- print(f"连接摄像头失败: {e}")
- return False
-
- def disconnect(self):
- """断开连接"""
- if self.cap:
- self.cap.release()
- self.connected = False
-
- def get_frame(self) -> Optional[np.ndarray]:
- """获取帧"""
- if self.cap is None or not self.cap.isOpened():
- return None
-
- ret, frame = self.cap.read()
- return frame if ret else None
- def create_coordinator(camera_source=0, config: Dict = None) -> SafetyCoordinator:
- """
- 创建安全联动控制器
-
- Args:
- camera_source: 摄像头源
- config: 配置
-
- Returns:
- SafetyCoordinator 实例
- """
- camera = SimpleCamera(camera_source)
- return SafetyCoordinator(camera, config)
|