""" 施工现场安全行为智能识别系统 - 主程序 系统功能: 1. 实时视频监控 2. 人员、安全帽、反光衣检测 3. 安全违规识别(未戴安全帽、未穿反光衣) 4. 事件推送至业务平台 5. 接收平台指令,TTS 语音播报 """ import os import sys import time import argparse import logging import threading import signal from typing import Optional, List import cv2 import numpy as np # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import ( LOG_CONFIG, PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH, SAFETY_DETECTION_CONFIG, EVENT_PUSHER_CONFIG, VOICE_ANNOUNCER_CONFIG, SYSTEM_CONFIG, LLM_CONFIG, LLM_SAFETY_CONFIG ) from safety_detector import ( SafetyDetector, SafetyDetection, PersonSafetyStatus, draw_safety_result, SafetyViolationType, LLMSafetyDetector ) from llm_service import SafetyAnalyzer, NumberRecognizer from event_pusher import EventPusher, SafetyEvent, EventType from voice_announcer import VoiceAnnouncer, VoicePriority from safety_coordinator import SafetyCoordinator, SimpleCamera # 配置日志 def setup_logging(): """设置日志配置""" log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO) log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') log_file = LOG_CONFIG.get('file') handlers = [logging.StreamHandler()] if log_file: from logging.handlers import RotatingFileHandler file_handler = RotatingFileHandler( log_file, maxBytes=LOG_CONFIG.get('max_bytes', 10*1024*1024), backupCount=LOG_CONFIG.get('backup_count', 5) ) file_handler.setFormatter(logging.Formatter(log_format)) handlers.append(file_handler) logging.basicConfig( level=log_level, format=log_format, handlers=handlers ) setup_logging() logger = logging.getLogger(__name__) class SafetyMonitorSystem: """ 施工现场安全监控系统 """ def __init__(self, config: dict = None): """ 初始化系统 Args: config: 配置覆盖 """ self.config = config or {} # 摄像头 self.camera = None self.camera_source = None # PTZ球机(安全模式可选联动) self.ptz_camera = None self.calibrator = None self.sdk = None # 双路流管理器(用于PTZ模式下的双流并行) self.stream_manager = None # 组件 self.detector = None # 安全检测器 (支持 LLM) self.llm_analyzer = None # 大模型安全分析器 self.number_recognizer = None # 编号识别器 self.event_pusher = None # 事件推送器 self.voice_announcer = None # 语音播报器 # 功能开关 - 从 SYSTEM_CONFIG 读取 self.enable_panorama_camera = SYSTEM_CONFIG.get('enable_panorama_camera', True) self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True) self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True) self.enable_safety_detection = SYSTEM_CONFIG.get('enable_safety_detection', True) self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True) self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True) self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True) self.enable_llm = SYSTEM_CONFIG.get('enable_llm', True) self.enable_event_push = SYSTEM_CONFIG.get('enable_event_push', True) self.enable_voice_announce = SYSTEM_CONFIG.get('enable_voice_announce', True) # 状态 self.running = False self.display = True # 是否显示画面 # 帧处理 self.current_frame = None self.frame_lock = threading.Lock() # 统计 self.stats = { 'frames_processed': 0, 'persons_detected': 0, 'violations_detected': 0, 'events_pushed': 0, 'voice_announced': 0, 'start_time': None } self.stats_lock = threading.Lock() # 工作线程 self.detection_thread = None def initialize(self, camera_source=0) -> bool: """ 初始化系统组件 Args: camera_source: 摄像头源 (索引/RTSP/视频文件) Returns: 是否成功 """ logger.info("=" * 60) logger.info(f"初始化 {SYSTEM_CONFIG['name']} v{SYSTEM_CONFIG['version']}") logger.info("=" * 60) # 初始化摄像头 self.camera_source = camera_source if self.enable_panorama_camera: self.camera = SimpleCamera(camera_source) if not self.camera.connect(): logger.error("连接摄像头失败") return False logger.info(f"摄像头连接成功: {camera_source}") else: self.camera = None logger.info("摄像头功能已禁用") # 初始化 PTZ 球机(安全模式可选联动) if self.enable_ptz_camera and self.enable_ptz_tracking: try: from dahua_sdk import DahuaSDK sdk_path = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk']) self.sdk = DahuaSDK(sdk_path) if self.sdk.init(): from ptz_camera import PTZCamera ptz_config = self.config.get('ptz_camera', PTZ_CAMERA) self.ptz_camera = PTZCamera(self.sdk, ptz_config) if self.ptz_camera.connect(): logger.info(f"PTZ球机连接成功: {ptz_config['ip']}") if self.ptz_camera.start_stream_rtsp(): logger.info("PTZ球机RTSP流启动成功") else: logger.warning("PTZ球机RTSP流启动失败,PTZ跟踪将无法进行帧验证") else: logger.warning(f"PTZ球机连接失败: {ptz_config['ip']}") self.ptz_camera = None else: logger.warning("SDK初始化失败,PTZ功能不可用") self.sdk = None except Exception as e: logger.warning(f"PTZ球机初始化失败: {e},PTZ跟踪将不可用") self.ptz_camera = None # 初始化 LLM 大模型服务 if self.enable_llm: try: llm_config = {**LLM_CONFIG} if 'llm_host' in self.config: llm_config['api_host'] = self.config['llm_host'] if 'llm_port' in self.config: llm_config['api_port'] = self.config['llm_port'] self.llm_analyzer = SafetyAnalyzer(llm_config) logger.info(f"大模型分析器初始化成功: {llm_config['api_host']}:{llm_config['api_port']}") except Exception as e: logger.warning(f"大模型分析器初始化失败: {e},将使用规则判断") self.use_llm = False # 初始化安全检测器 (支持 LLM) if self.enable_detection and self.enable_safety_detection: try: llm_config = {**LLM_CONFIG} if self.enable_llm else None self.detector = LLMSafetyDetector( yolo_model_path=self.config.get('model_path', SAFETY_DETECTION_CONFIG.get('model_path')), llm_config=llm_config, use_gpu=self.config.get('use_gpu', SAFETY_DETECTION_CONFIG.get('use_gpu', True)), use_llm=self.enable_llm, model_type=self.config.get('model_type', SAFETY_DETECTION_CONFIG.get('model_type', 'auto')) ) logger.info("安全检测器初始化成功") except Exception as e: logger.error(f"安全检测器初始化失败: {e}") return False else: self.detector = None logger.info("安全检测功能已禁用") # 初始化编号识别器 if self.enable_ocr and self.enable_llm: try: self.number_recognizer = NumberRecognizer(LLM_CONFIG) logger.info("编号识别器初始化成功") except Exception as e: logger.warning(f"编号识别器初始化失败: {e}") self.number_recognizer = None # 初始化事件推送器 if self.enable_event_push: try: push_config = {**EVENT_PUSHER_CONFIG} if 'api_host' in self.config: push_config['api_host'] = self.config['api_host'] if 'api_port' in self.config: push_config['api_port'] = self.config['api_port'] self.event_pusher = EventPusher(push_config) logger.info("事件推送器初始化成功") except Exception as e: logger.warning(f"事件推送器初始化失败: {e}") # 初始化语音播报器 if self.enable_voice_announce: try: self.voice_announcer = VoiceAnnouncer( tts_config=VOICE_ANNOUNCER_CONFIG.get('tts', {}), player_config=VOICE_ANNOUNCER_CONFIG.get('player', {}) ) logger.info("语音播报器初始化成功") except Exception as e: logger.warning(f"语音播报器初始化失败: {e}") logger.info("系统初始化完成") return True def start(self) -> bool: """启动系统""" if self.running: logger.warning("系统已在运行") return True logger.info("启动安全监控系统...") # 启动事件推送器 if self.event_pusher: self.event_pusher.start() # 启动语音播报器 if self.voice_announcer: self.voice_announcer.start() # 启动检测线程 self.running = True self.detection_thread = threading.Thread(target=self._detection_worker, daemon=True) self.detection_thread.start() with self.stats_lock: self.stats['start_time'] = time.time() logger.info("安全监控系统启动成功") return True def stop(self): """停止系统""" if not self.running: return logger.info("停止安全监控系统...") self.running = False if self.detection_thread: self.detection_thread.join(timeout=3) if self.event_pusher: self.event_pusher.stop() if self.voice_announcer: self.voice_announcer.stop() if self.camera: self.camera.disconnect() if self.ptz_camera: self.ptz_camera.stop_stream() self.ptz_camera.disconnect() if self.stream_manager: self.stream_manager.stop_all() if self.sdk: try: self.sdk.cleanup() except Exception: pass self._print_stats() logger.info("安全监控系统已停止") def _detection_worker(self): """检测工作线程""" # 检查摄像头和检测是否启用 if not self.enable_panorama_camera or not self.enable_detection: logger.info("摄像头或检测功能已禁用,检测线程休眠") while self.running: time.sleep(1) return # 优先使用 detection_fps,默认每秒2帧 detection_fps = SAFETY_DETECTION_CONFIG.get('detection_fps', 2) detection_interval = 1.0 / detection_fps # 根据FPS计算间隔 last_detection_time = 0 # 告警冷却(按违规类型) alert_cooldown = {} cooldown_time = SAFETY_DETECTION_CONFIG.get('alert_cooldown', 3.0) while self.running: try: current_time = time.time() # 获取帧 frame = self.camera.get_frame() if self.camera else None if frame is None: time.sleep(0.01) continue with self.frame_lock: self.current_frame = frame.copy() self._update_stats('frames_processed') # 周期性检测 if current_time - last_detection_time >= detection_interval: last_detection_time = current_time # 执行检测 detections = self.detector.detect(frame) status_list = self.detector.check_safety(frame, detections) self._update_stats('persons_detected', len(status_list)) # 轨迹追踪已禁用 # 处理违规 for status in status_list: if status.is_violation: # 检查冷却(按违规类型) violation_key = status.get_violation_desc() if violation_key in alert_cooldown: if current_time - alert_cooldown[violation_key] < cooldown_time: continue alert_cooldown[violation_key] = current_time self._handle_violation(status, frame) # 显示画面 if self.display: self._display_frame(frame, detections, status_list) time.sleep(0.01) except Exception as e: logger.error(f"检测错误: {e}") time.sleep(0.1) def _handle_violation(self, status: PersonSafetyStatus, frame: np.ndarray): """处理违规""" description = status.get_violation_desc() self._update_stats('violations_detected') # 裁剪人体区域 x1, y1, x2, y2 = status.person_bbox margin = 20 x1 = max(0, x1 - margin) y1 = max(0, y1 - margin) x2 = min(frame.shape[1], x2 + margin) y2 = min(frame.shape[0], y2 + margin) person_image = frame[y1:y2, x1:x2].copy() # 编号识别 number_text = None if self.number_recognizer: try: number_result = self.number_recognizer.recognize_person_number(person_image) number_text = number_result.get('number') if number_text: logger.info(f"识别到编号: {number_text}") except Exception as e: logger.warning(f"编号识别失败: {e}") # 如果识别到编号,添加到描述中 if number_text: description = f"{description} (编号: {number_text})" # 推送事件 if self.event_pusher: self.event_pusher.push_safety_violation( description=description, image=person_image, track_id=status.track_id, confidence=status.person_conf ) self._update_stats('events_pushed') # 语音播报 if self.voice_announcer: self.voice_announcer.announce_violation(description, urgent=True) self._update_stats('voice_announced') logger.warning(f"[违规] {description}") def _display_frame(self, frame: np.ndarray, detections: List[SafetyDetection], status_list: List[PersonSafetyStatus]): """显示帧""" # 绘制检测结果 result_frame = draw_safety_result(frame, detections, status_list) # 添加统计信息 stats_text = f"FPS: {self._get_fps():.1f}" cv2.putText(result_frame, stats_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) cv2.imshow('Safety Monitor', result_frame) # 按 'q' 退出 if cv2.waitKey(1) & 0xFF == ord('q'): self.running = False def _get_fps(self) -> float: """获取帧率""" with self.stats_lock: if self.stats['start_time']: elapsed = time.time() - self.stats['start_time'] if elapsed > 0: return self.stats['frames_processed'] / elapsed return 0.0 def _update_stats(self, key: str, value: int = 1): """更新统计""" with self.stats_lock: if key in self.stats: self.stats[key] += value def _print_stats(self): """打印统计""" with self.stats_lock: if self.stats['start_time']: elapsed = time.time() - self.stats['start_time'] print("\n" + "=" * 50) print("安全检测统计") print("=" * 50) print(f"运行时长: {elapsed:.1f} 秒") print(f"处理帧数: {self.stats['frames_processed']}") print(f"检测人员: {self.stats['persons_detected']} 次") print(f"违规检测: {self.stats['violations_detected']} 次") print(f"事件推送: {self.stats['events_pushed']} 次") print(f"语音播报: {self.stats['voice_announced']} 次") if self.event_pusher: push_stats = self.event_pusher.get_stats() print(f"推送成功: {push_stats['pushed_events']}") print(f"推送失败: {push_stats['failed_events']}") if self.voice_announcer: voice_stats = self.voice_announcer.get_stats() print(f"播报成功: {voice_stats['played_commands']}") print(f"播报失败: {voice_stats['failed_commands']}") print("=" * 50 + "\n") def get_stats(self) -> dict: """获取统计""" with self.stats_lock: return self.stats.copy() def announce(self, text: str): """ 手动播报语音 Args: text: 播报文本 """ if self.voice_announcer: self.voice_announcer.announce(text, priority=VoicePriority.NORMAL) def run_interactive(system: SafetyMonitorSystem): """ 交互模式运行 Args: system: 系统实例 """ print("\n施工现场安全监控系统 - 交互模式") print("=" * 50) print("命令:") print(" s - 开始/停止监控") print(" a - 手动播报 (输入文本)") print(" r - 查看统计信息") print(" q - 退出") print("=" * 50) running = False while True: try: cmd = input("\n> ").strip().lower() if cmd == 'q': break elif cmd == 's': if running: system.stop() running = False print("监控已停止") else: if system.start(): running = True print("监控已启动") elif cmd == 'a': text = input("输入播报文本: ").strip() if text: system.announce(text) print(f"已播报: {text}") elif cmd == 'r': stats = system.get_stats() print("\n统计信息:") for k, v in stats.items(): if v is not None: print(f" {k}: {v}") else: print("未知命令") except KeyboardInterrupt: break except Exception as e: print(f"错误: {e}") print("退出交互模式") def main(): """主函数""" parser = argparse.ArgumentParser( description='施工现场安全行为智能识别系统' ) # 摄像头参数 parser.add_argument('--camera', type=str, default='0', help='摄像头源 (索引/RTSP地址/视频文件)') parser.add_argument('--no-display', action='store_true', help='不显示画面') # 检测参数 parser.add_argument('--model', type=str, help='安全检测模型路径') parser.add_argument('--conf', type=float, default=0.5, help='置信度阈值') parser.add_argument('--person-conf', type=float, default=0.8, help='人员检测置信度阈值') parser.add_argument('--no-gpu', action='store_true', help='不使用GPU') # LLM 大模型参数 parser.add_argument('--llm-host', type=str, help='大模型 API 主机') parser.add_argument('--llm-port', type=int, help='大模型 API 端口') parser.add_argument('--no-llm', action='store_true', help='禁用大模型判断,使用规则判断') parser.add_argument('--no-ocr', action='store_true', help='禁用编号识别') # 业务平台参数 parser.add_argument('--api-host', type=str, help='业务平台 API 主机') parser.add_argument('--api-port', type=int, help='业务平台 API 端口') parser.add_argument('--no-push', action='store_true', help='禁用事件推送') parser.add_argument('--no-voice', action='store_true', help='禁用语音播报') # 运行模式 parser.add_argument('--interactive', action='store_true', help='交互模式') parser.add_argument('--demo', action='store_true', help='演示模式') args = parser.parse_args() # 构建配置 config = {} if args.model: config['model_path'] = args.model config['conf_threshold'] = args.conf config['person_threshold'] = args.person_conf config['use_gpu'] = not args.no_gpu # LLM 配置 if args.llm_host: config['llm_host'] = args.llm_host if args.llm_port: config['llm_port'] = args.llm_port if args.api_host: config['api_host'] = args.api_host if args.api_port: config['api_port'] = args.api_port # 演示模式 if args.demo: print("\n施工现场安全行为智能识别系统") print("=" * 60) print(""" 系统功能: 1. 实时视频监控 2. YOLO11 检测: 人员、安全帽、反光衣 3. 大模型判断: 安全状态分析 4. 编号识别: OCR 识别衣服上的工号 5. 事件推送到业务平台 6. 接收平台指令,TTS 语音播报 系统架构: ┌─────────────────────────────────────────────────────┐ │ 摄像头视频流 │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ YOLO11 安全检测模型 │ │ 检测类别: 人员(3)、安全帽(0)、反光衣(4) │ └─────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ 大模型安全状态判断 │ │ 分析: 是否佩戴安全帽、是否穿反光衣 │ │ 识别: 衣服上的工号/编号 │ └─────────────────────────────────────────────────────┘ │ ┌─────────────┼─────────────┐ ▼ ▼ ▼ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 事件推送 │ │ 语音播报 │ │ 编号记录 │ │ 业务平台API │ │ TTS服务 │ │ 身份关联 │ └─────────────┘ └─────────────┘ └─────────────┘ 运行命令: python safety_main.py --camera 0 # 使用默认摄像头 python safety_main.py --camera rtsp://... # RTSP 流 python safety_main.py --camera video.mp4 # 视频文件 python safety_main.py --interactive # 交互模式 python safety_main.py --no-display # 无界面模式 python safety_main.py --no-llm # 禁用大模型,使用规则判断 python safety_main.py --no-ocr # 禁用编号识别 python safety_main.py --llm-host localhost --llm-port 8111 # 指定大模型服务 """) return 0 # 创建系统 system = SafetyMonitorSystem(config) system.display = not args.no_display # 设置信号处理 def signal_handler(sig, frame): print("\n接收到停止信号") system.stop() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) try: # 解析摄像头源 camera_source = args.camera if camera_source.isdigit(): camera_source = int(camera_source) # 初始化 if not system.initialize(camera_source): print("\n系统初始化失败!") return 1 # 禁用功能 (命令行参数覆盖配置) if args.no_llm: system.enable_llm = False print("大模型判断已禁用,使用规则判断") if args.no_ocr: system.enable_ocr = False system.number_recognizer = None print("编号识别已禁用") if args.no_push: system.enable_event_push = False system.event_pusher = None print("事件推送已禁用") if args.no_voice: system.enable_voice_announce = False system.voice_announcer = None print("语音播报已禁用") # 运行 if args.interactive: run_interactive(system) else: # 自动模式 if not system.start(): print("启动失败") return 1 print("\n系统运行中,按 Ctrl+C 停止") print("(按 'q' 键退出显示窗口)\n") # 主循环 while system.running: time.sleep(0.1) except KeyboardInterrupt: print("\n接收到停止信号") finally: system.stop() return 0 if __name__ == '__main__': sys.exit(main() or 0)