| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- """
- 安全联动控制器
- 整合安全检测、事件推送和语音播报功能
- """
- import time
- import threading
- import queue
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass
- from enum import Enum
- import numpy as np
- import cv2
- from config import (
- COORDINATOR_CONFIG,
- SAFETY_DETECTION_CONFIG,
- EVENT_PUSHER_CONFIG,
- EVENT_LISTENER_CONFIG,
- VOICE_ANNOUNCER_CONFIG,
- SYSTEM_CONFIG
- )
- from safety_detector import (
- SafetyDetector, SafetyDetection, PersonSafetyStatus,
- SafetyViolationType, draw_safety_result
- )
- from event_pusher import EventPusher, EventListener, SafetyEvent, EventType
- from voice_announcer import VoiceAnnouncer, VoicePriority
- class CoordinatorState(Enum):
- """控制器状态"""
- IDLE = 0 # 空闲
- DETECTING = 1 # 检测中
- TRACKING = 2 # 跟踪中
- ALERTING = 3 # 告警中
- @dataclass
- class AlertRecord:
- """告警记录"""
- track_id: int # 跟踪ID
- violation_type: str # 违规类型
- description: str # 描述
- frame: Optional[np.ndarray] # 图像
- timestamp: float # 时间戳
- pushed: bool = False # 是否已推送
- announced: bool = False # 是否已播报
- class SafetyCoordinator:
- """安全联动控制器:协调摄像头、安全检测、事件推送、语音播报、PTZ跟踪"""
-
- def __init__(self, camera, config: Dict = None, ptz_camera=None, calibrator=None):
- self.camera = camera
- self.config = config or {}
- self.ptz = ptz_camera # PTZ球机(可选)
- self.calibrator = calibrator # 校准器(可选)
-
- self.detector = None
- self.event_pusher = None
- self.voice_announcer = None
- self.event_listener = None
-
- self.state = CoordinatorState.IDLE
- self.state_lock = threading.Lock()
-
- self.running = False
- self.worker_thread = None
-
- # PTZ跟踪线程(独立于检测线程)
- self._ptz_thread = None
- self._ptz_queue: queue.Queue = queue.Queue(maxsize=10)
- self._ptz_cooldown = 0.15
- self._last_ptz_time = 0.0
-
- # 跟踪状态
- self.tracks = {}
- self.next_track_id = 1
-
- self.alert_records: List[AlertRecord] = []
- self.alert_cooldown = {}
-
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'violations_detected': 0,
- 'events_pushed': 0,
- 'voice_announced': 0,
- 'ptz_commands_sent': 0,
- 'start_time': None
- }
- self.stats_lock = threading.Lock()
-
- self.on_violation_detected: Optional[Callable] = None
- self.on_frame_processed: Optional[Callable] = None
-
- self._init_components()
-
- def _init_components(self):
- """初始化各组件"""
- # 从 SYSTEM_CONFIG 读取功能开关
- enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- enable_safety_detection = SYSTEM_CONFIG.get('enable_safety_detection', True)
- enable_event_push = SYSTEM_CONFIG.get('enable_event_push', True)
- enable_voice_announce = SYSTEM_CONFIG.get('enable_voice_announce', True)
-
- # 安全检测器
- if enable_detection and enable_safety_detection:
- try:
- self.detector = SafetyDetector(
- model_path=SAFETY_DETECTION_CONFIG.get('model_path'),
- use_gpu=SAFETY_DETECTION_CONFIG.get('use_gpu', True),
- conf_threshold=SAFETY_DETECTION_CONFIG.get('conf_threshold', 0.5),
- person_threshold=SAFETY_DETECTION_CONFIG.get('person_threshold', 0.8)
- )
- print("安全检测器初始化成功")
- except Exception as e:
- print(f"安全检测器初始化失败: {e}")
- else:
- print("安全检测功能已禁用")
-
- # 事件推送器
- if enable_event_push:
- try:
- self.event_pusher = EventPusher(EVENT_PUSHER_CONFIG)
- print("事件推送器初始化成功")
- except Exception as e:
- print(f"事件推送器初始化失败: {e}")
- else:
- print("事件推送功能已禁用")
-
- # 语音播报器
- if enable_voice_announce:
- try:
- self.voice_announcer = VoiceAnnouncer(
- tts_config=VOICE_ANNOUNCER_CONFIG.get('tts', {}),
- player_config=VOICE_ANNOUNCER_CONFIG.get('player', {})
- )
- print("语音播报器初始化成功")
- except Exception as e:
- print(f"语音播报器初始化失败: {e}")
- else:
- print("语音播报功能已禁用")
-
- # 事件监听器
- if EVENT_LISTENER_CONFIG.get('enabled', True):
- try:
- self.event_listener = EventListener(EVENT_LISTENER_CONFIG)
- # 设置语音播放回调
- self.event_listener.set_voice_callback(self._on_voice_command)
- print("事件监听器初始化成功")
- except Exception as e:
- print(f"事件监听器初始化失败: {e}")
-
- def _on_voice_command(self, cmd: Dict):
- """处理语音播放指令"""
- if not self.voice_announcer:
- return
-
- text = cmd.get('text', '')
- priority = VoicePriority(cmd.get('priority', 2))
-
- if text:
- self.voice_announcer.announce(text, priority=priority)
-
- def start(self) -> bool:
- """启动控制器"""
- if self.running:
- return True
-
- if self.event_pusher:
- self.event_pusher.start()
-
- if self.voice_announcer:
- self.voice_announcer.start()
-
- if self.event_listener:
- self.event_listener.start()
-
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
- self.worker_thread.start()
-
- # 启动 PTZ 跟踪线程(如果 PTZ 可用)
- if self.ptz and SYSTEM_CONFIG.get('enable_ptz_tracking', True):
- self._ptz_thread = threading.Thread(target=self._ptz_worker, daemon=True)
- self._ptz_thread.start()
- print("[SafetyCoordinator] PTZ跟踪线程已启动")
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- print("安全联动控制器已启动")
- return True
-
- def stop(self):
- """停止控制器"""
- self.running = False
-
- if self.worker_thread:
- self.worker_thread.join(timeout=3)
-
- # 停止 PTZ 跟踪线程
- if self._ptz_thread:
- self._ptz_thread.join(timeout=2)
- self._ptz_thread = None
-
- if self.event_pusher:
- self.event_pusher.stop()
-
- if self.voice_announcer:
- self.voice_announcer.stop()
-
- if self.event_listener:
- self.event_listener.stop()
-
- self._print_stats()
- print("安全联动控制器已停止")
-
- def _worker(self):
- """工作线程"""
- detection_interval = SAFETY_DETECTION_CONFIG.get('detection_interval', 0.1)
- last_detection_time = 0
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取帧
- frame = self.camera.get_frame() if self.camera else None
- if frame is None:
- time.sleep(0.01)
- continue
-
- self._update_stats('frames_processed')
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
- self._process_frame(frame)
-
- # 清理过期跟踪
- self._cleanup_tracks()
-
- time.sleep(0.01)
-
- except Exception as e:
- print(f"处理错误: {e}")
- time.sleep(0.1)
-
- def _process_frame(self, frame: np.ndarray):
- """处理帧"""
- if self.detector is None:
- return
-
- self._set_state(CoordinatorState.DETECTING)
-
- # 安全检测
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- self._update_stats('persons_detected', len(status_list))
-
- # 更新跟踪
- self._update_tracks(detections)
-
- # 检查违规
- for status in status_list:
- if status.is_violation:
- self._handle_violation(status, frame)
-
- # 回调
- if self.on_frame_processed:
- self.on_frame_processed(frame, detections, status_list)
-
- def _update_tracks(self, detections: List[SafetyDetection]):
- """更新跟踪状态"""
- current_time = time.time()
- persons = [d for d in detections if d.class_id == 3] # 人
-
- # 匹配现有跟踪
- used_ids = set()
-
- for person in persons:
- best_id = None
- min_dist = float('inf')
-
- for track_id, track in self.tracks.items():
- if track_id in used_ids:
- continue
-
- dist = np.sqrt(
- (person.center[0] - track['center'][0])**2 +
- (person.center[1] - track['center'][1])**2
- )
-
- if dist < min_dist and dist < 100: # 距离阈值
- min_dist = dist
- best_id = track_id
-
- if best_id is not None:
- # 更新现有跟踪
- self.tracks[best_id]['center'] = person.center
- self.tracks[best_id]['last_update'] = current_time
- person.track_id = best_id
- used_ids.add(best_id)
- else:
- # 新跟踪
- track_id = self.next_track_id
- self.next_track_id += 1
- person.track_id = track_id
- self.tracks[track_id] = {
- 'center': person.center,
- 'last_update': current_time,
- 'alerts': []
- }
-
- def _cleanup_tracks(self):
- """清理过期跟踪"""
- current_time = time.time()
- timeout = COORDINATOR_CONFIG.get('tracking_timeout', 5.0)
-
- expired = [
- tid for tid, t in self.tracks.items()
- if current_time - t['last_update'] > timeout
- ]
-
- for tid in expired:
- del self.tracks[tid]
- self.alert_cooldown.pop(tid, None)
-
- def _handle_violation(self, status: PersonSafetyStatus, frame: np.ndarray):
- """处理违规"""
- current_time = time.time()
- track_id = status.track_id
-
- # 检查冷却时间
- cooldown = SAFETY_DETECTION_CONFIG.get('alert_cooldown', 3.0)
- if track_id in self.alert_cooldown:
- if current_time - self.alert_cooldown[track_id] < cooldown:
- return
-
- # 记录告警
- self.alert_cooldown[track_id] = current_time
-
- description = status.get_violation_desc()
- violation_type = status.violation_types[0].value if status.violation_types else "未知"
-
- # 裁剪人体区域
- x1, y1, x2, y2 = status.person_bbox
- margin = 20
- x1 = max(0, x1 - margin)
- y1 = max(0, y1 - margin)
- x2 = min(frame.shape[1], x2 + margin)
- y2 = min(frame.shape[0], y2 + margin)
- person_image = frame[y1:y2, x1:x2].copy()
-
- record = AlertRecord(
- track_id=track_id,
- violation_type=violation_type,
- description=description,
- frame=person_image,
- timestamp=current_time
- )
-
- self.alert_records.append(record)
- self._update_stats('violations_detected')
-
- # PTZ 跟踪违规人员(如果 PTZ 可用且启用)
- if self.ptz and SYSTEM_CONFIG.get('enable_ptz_tracking', True):
- self._track_violator_ptz(status, frame)
-
- # 回调
- if self.on_violation_detected:
- self.on_violation_detected(status, frame)
-
- # 推送事件
- if self.event_pusher:
- self.event_pusher.push_safety_violation(
- description=description,
- image=person_image,
- track_id=track_id,
- confidence=status.person_conf
- )
- self._update_stats('events_pushed')
-
- # 语音播报
- if self.voice_announcer:
- self.voice_announcer.announce_violation(description, urgent=True)
- self._update_stats('voice_announced')
-
- print(f"[告警] {description}, 跟踪ID: {track_id}")
-
- def _track_violator_ptz(self, status: PersonSafetyStatus, frame: np.ndarray):
- """违规人员PTZ跟踪:将违规人员在全景画面中的位置发送给PTZ线程"""
- if self.ptz is None:
- return
-
- frame_h, frame_w = frame.shape[:2]
- x1, y1, x2, y2 = status.person_bbox
-
- # 计算违规人员在全景画面中的相对位置
- center_x = (x1 + x2) / 2
- center_y = (y1 + y2) / 2
- x_ratio = center_x / frame_w
- y_ratio = center_y / frame_h
-
- # 冷却检查
- current_time = time.time()
- if current_time - self._last_ptz_time < self._ptz_cooldown:
- return
-
- # 发送PTZ命令
- try:
- self._ptz_queue.put_nowait({
- 'x_ratio': x_ratio,
- 'y_ratio': y_ratio,
- 'track_id': status.track_id,
- 'violation_type': status.violation_types[0].value if status.violation_types else 'unknown'
- })
- self._last_ptz_time = current_time
- self._update_stats('ptz_commands_sent')
- except queue.Full:
- pass # 队列满则丢弃,下一个检测周期会重发
-
- def _ptz_worker(self):
- """PTZ控制工作线程:独立处理所有PTZ命令"""
- while self.running:
- try:
- try:
- cmd = self._ptz_queue.get(timeout=0.1)
- except queue.Empty:
- continue
-
- if self.ptz is None:
- continue
-
- x_ratio = cmd['x_ratio']
- y_ratio = cmd['y_ratio']
-
- # 使用校准器转换坐标,或使用估算
- if self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- zoom = self.ptz.ptz_config.get('default_zoom', 8)
- if self.ptz.ptz_config.get('pan_flip', False):
- pan = (pan + 180) % 360
- self.ptz.goto_exact_position(pan, tilt, zoom)
- else:
- self.ptz.track_target(x_ratio, y_ratio)
-
- except Exception as e:
- print(f"[SafetyCoordinator] PTZ跟踪错误: {e}")
- time.sleep(0.05)
-
- def _set_state(self, state: CoordinatorState):
- """设置状态"""
- with self.state_lock:
- self.state = state
-
- def get_state(self) -> CoordinatorState:
- """获取状态"""
- with self.state_lock:
- return self.state
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计"""
- with self.stats_lock:
- if self.stats['start_time']:
- elapsed = time.time() - self.stats['start_time']
- print("\n=== 安全检测统计 ===")
- print(f"运行时长: {elapsed:.1f}秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"检测人员: {self.stats['persons_detected']}次")
- print(f"违规检测: {self.stats['violations_detected']}次")
- print(f"事件推送: {self.stats['events_pushed']}次")
- print(f"语音播报: {self.stats['voice_announced']}次")
-
- if self.event_pusher:
- push_stats = self.event_pusher.get_stats()
- print(f"推送详情: 成功{push_stats['pushed_events']}, 失败{push_stats['failed_events']}")
-
- if self.voice_announcer:
- voice_stats = self.voice_announcer.get_stats()
- print(f"播报详情: 成功{voice_stats['played_commands']}, 失败{voice_stats['failed_commands']}")
-
- print("===================\n")
-
- def get_stats(self) -> Dict:
- """获取统计"""
- with self.stats_lock:
- return self.stats.copy()
-
- def get_alerts(self) -> List[AlertRecord]:
- """获取告警记录"""
- return self.alert_records.copy()
-
- def announce(self, text: str, priority: VoicePriority = VoicePriority.NORMAL):
- """
- 手动播报语音
-
- Args:
- text: 播报文本
- priority: 优先级
- """
- if self.voice_announcer:
- self.voice_announcer.announce(text, priority=priority)
-
- def force_detect(self, frame: np.ndarray = None) -> Tuple[List[SafetyDetection], List[PersonSafetyStatus]]:
- """
- 强制执行一次检测
-
- Args:
- frame: 输入帧,如果为 None 则从摄像头获取
-
- Returns:
- (检测结果, 安全状态列表)
- """
- if frame is None:
- frame = self.camera.get_frame() if self.camera else None
-
- if frame is None or self.detector is None:
- return [], []
-
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- return detections, status_list
- class SimpleCamera:
- """简单摄像头封装(用于测试)"""
-
- def __init__(self, source=0):
- """
- 初始化摄像头
-
- Args:
- source: 视频源 (摄像头索引、RTSP地址、视频文件路径)
- """
- self.cap = None
- self.source = source
- self.connected = False
-
- def connect(self) -> bool:
- """连接摄像头"""
- try:
- # 使用 FFmpeg 单线程模式避免线程安全崩溃
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- self.cap = cv2.VideoCapture(self.source, cv2.CAP_FFMPEG)
- self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- self.connected = self.cap.isOpened()
- return self.connected
- except Exception as e:
- print(f"连接摄像头失败: {e}")
- return False
-
- def disconnect(self):
- """断开连接"""
- if self.cap:
- self.cap.release()
- self.connected = False
-
- def get_frame(self) -> Optional[np.ndarray]:
- """获取帧"""
- if self.cap is None or not self.cap.isOpened():
- return None
-
- ret, frame = self.cap.read()
- return frame if ret else None
- def create_coordinator(camera_source=0, config: Dict = None) -> SafetyCoordinator:
- """
- 创建安全联动控制器
-
- Args:
- camera_source: 摄像头源
- config: 配置
-
- Returns:
- SafetyCoordinator 实例
- """
- camera = SimpleCamera(camera_source)
- return SafetyCoordinator(camera, config)
|