""" 事件推送模块 将识别到的安全违规事件推送到业务平台 """ import os import time import json import threading import queue import requests import http.client import mimetypes from typing import Optional, Dict, Any, List from dataclasses import dataclass from enum import Enum from codecs import encode import cv2 import numpy as np class EventType(Enum): """事件类型""" SAFETY_VIOLATION = "安全违规" # 安全违规(未戴安全帽/未穿反光衣) INTRUSION = "入侵检测" # 禁区入侵 LOITERING = "徘徊检测" # 徘徊检测 @dataclass class SafetyEvent: """安全事件""" event_type: EventType # 事件类型 description: str # 事件描述 image_path: Optional[str] = None # 图片路径 image_url: Optional[str] = None # 图片URL (上传后) track_id: int = 0 # 跟踪ID confidence: float = 0.0 # 置信度 location: str = "" # 位置信息 timestamp: float = 0.0 # 时间戳 extra: Dict[str, Any] = None # 额外信息 def __post_init__(self): if self.timestamp == 0.0: self.timestamp = time.time() if self.extra is None: self.extra = {} def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { 'eventType': self.event_type.value, 'description': self.description, 'imageUrl': self.image_url, 'trackId': self.track_id, 'confidence': self.confidence, 'location': self.location, 'timestamp': self.timestamp, 'extra': self.extra } class EventPusher: """ 事件推送器 负责将安全事件推送到业务平台 """ def __init__(self, config: Dict[str, Any] = None): """ 初始化事件推送器 Args: config: 配置字典 """ self.config = config or {} # API 配置 self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top') self.api_port = self.config.get('api_port', 8583) self.use_https = self.config.get('use_https', True) # 上传接口 self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload') self.event_url = self.config.get('event_url', '/api/system/event') # 推送控制 self.enabled = self.config.get('enabled', True) self.upload_interval = self.config.get('upload_interval', 2.0) # 推送间隔 self.retry_count = self.config.get('retry_count', 3) # 重试次数 self.retry_delay = self.config.get('retry_delay', 1.0) # 重试延迟 # 事件队列 self.event_queue = queue.Queue() # 工作线程 self.running = False self.worker_thread = None # 上次推送时间 self.last_push_time = 0 # 统计 self.stats = { 'total_events': 0, 'pushed_events': 0, 'failed_events': 0, 'upload_success': 0, 'upload_failed': 0 } self.stats_lock = threading.Lock() def start(self): """启动推送器""" if self.running: return self.running = True self.worker_thread = threading.Thread(target=self._worker, daemon=True) self.worker_thread.start() print("事件推送器已启动") def stop(self): """停止推送器""" self.running = False if self.worker_thread: self.worker_thread.join(timeout=3) print("事件推送器已停止") def push_event(self, event: SafetyEvent): """ 推送事件 Args: event: 安全事件 """ if not self.enabled: return # 检查推送间隔 current_time = time.time() if current_time - self.last_push_time < self.upload_interval: return self.event_queue.put(event) with self.stats_lock: self.stats['total_events'] += 1 def push_safety_violation(self, description: str, image: np.ndarray = None, track_id: int = 0, confidence: float = 0.0, location: str = "施工现场") -> bool: """ 推送安全违规事件 Args: description: 违规描述 image: 图像 (可选) track_id: 跟踪ID confidence: 置信度 location: 位置 Returns: 是否成功加入队列 """ event = SafetyEvent( event_type=EventType.SAFETY_VIOLATION, description=description, track_id=track_id, confidence=confidence, location=location ) # 保存图像 if image is not None: temp_path = f"/tmp/safety_event_{int(time.time() * 1000)}.jpg" cv2.imwrite(temp_path, image) event.image_path = temp_path self.push_event(event) return True def _worker(self): """工作线程""" while self.running: try: # 获取事件 try: event = self.event_queue.get(timeout=1.0) except queue.Empty: continue # 处理事件 self._process_event(event) except Exception as e: print(f"事件处理错误: {e}") def _process_event(self, event: SafetyEvent): """处理单个事件""" try: # 上传图片 if event.image_path: image_url = self._upload_image(event.image_path) if image_url: event.image_url = image_url with self.stats_lock: self.stats['upload_success'] += 1 else: with self.stats_lock: self.stats['upload_failed'] += 1 # 清理临时文件 if os.path.exists(event.image_path): try: os.remove(event.image_path) except: pass # 创建事件 success = self._create_event(event) if success: self.last_push_time = time.time() with self.stats_lock: self.stats['pushed_events'] += 1 print(f"事件推送成功: {event.description}") else: with self.stats_lock: self.stats['failed_events'] += 1 print(f"事件推送失败: {event.description}") except Exception as e: print(f"处理事件错误: {e}") with self.stats_lock: self.stats['failed_events'] += 1 def _upload_image(self, image_path: str) -> Optional[str]: """ 上传图片到 OSS Args: image_path: 图片路径 Returns: 图片URL或None """ if not os.path.exists(image_path): return None for attempt in range(self.retry_count): try: filename = os.path.basename(image_path) # 创建连接 if self.use_https: conn = http.client.HTTPSConnection(self.api_host, self.api_port) else: conn = http.client.HTTPConnection(self.api_host, self.api_port) # 准备 multipart/form-data boundary = f'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T{int(time.time())}' dataList = [] dataList.append(encode(f'--{boundary}')) dataList.append(encode(f'Content-Disposition: form-data; name=file; filename={filename}')) # 文件类型 fileType = mimetypes.guess_type(image_path)[0] or 'image/jpeg' dataList.append(encode(f'Content-Type: {fileType}')) dataList.append(encode('')) # 读取文件 with open(image_path, 'rb') as f: dataList.append(f.read()) dataList.append(encode(f'--{boundary}--')) dataList.append(encode('')) body = b'\r\n'.join(dataList) headers = { 'User-Agent': 'SafetySystem/1.0', 'Accept': '*/*', 'Host': f'{self.api_host}:{self.api_port}', 'Connection': 'keep-alive', 'Content-Type': f'multipart/form-data; boundary={boundary}' } conn.request("POST", self.upload_url, body, headers) res = conn.getresponse() data = res.read() conn.close() if res.status == 200: result = json.loads(data.decode("utf-8")) if result.get('code') == 200: return result.get('data', {}).get('purl') else: print(f"上传失败: {result.get('msg', '未知错误')}") else: print(f"上传失败: HTTP {res.status}") except Exception as e: print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}") if attempt < self.retry_count - 1: time.sleep(self.retry_delay) return None def _create_event(self, event: SafetyEvent) -> bool: """ 在业务平台创建事件 Args: event: 安全事件 Returns: 是否成功 """ for attempt in range(self.retry_count): try: # 构建请求 if self.use_https: base_url = f"https://{self.api_host}:{self.api_port}" else: base_url = f"http://{self.api_host}:{self.api_port}" url = f"{base_url}{self.event_url}" create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(event.timestamp)) data = { "createTime": create_time, "addr": event.description, "ext1": json.dumps([event.image_url]) if event.image_url else "[]", "ext2": json.dumps({ "lx": "工地安全", "type": event.event_type.value, "trackId": event.track_id, "confidence": event.confidence, "location": event.location }) } response = requests.post(url, json=data, verify=False, timeout=10) if response.status_code == 200: result = response.json() if result.get('code') == 200: return True else: print(f"创建事件失败: {result.get('msg', '未知错误')}") else: print(f"创建事件失败: HTTP {response.status_code}") except Exception as e: print(f"创建事件异常 (尝试 {attempt + 1}/{self.retry_count}): {e}") if attempt < self.retry_count - 1: time.sleep(self.retry_delay) return False def get_stats(self) -> Dict[str, int]: """获取统计信息""" with self.stats_lock: return self.stats.copy() class EventListener: """ 事件监听器 监听业务平台的指令(如语音播放指令) """ def __init__(self, config: Dict[str, Any] = None): """ 初始化事件监听器 Args: config: 配置字典 """ self.config = config or {} # WebSocket 或 HTTP 长轮询配置 self.listen_url = self.config.get('listen_url', '') self.poll_interval = self.config.get('poll_interval', 5.0) # 回调 self.on_voice_command = None self.on_other_command = None # 运行状态 self.running = False self.listener_thread = None def start(self): """启动监听""" if self.running: return self.running = True self.listener_thread = threading.Thread(target=self._listener_worker, daemon=True) self.listener_thread.start() print("事件监听器已启动") def stop(self): """停止监听""" self.running = False if self.listener_thread: self.listener_thread.join(timeout=3) print("事件监听器已停止") def _listener_worker(self): """监听工作线程""" while self.running: try: # 轮询获取指令 # TODO: 实现 WebSocket 或 HTTP 长轮询 commands = self._poll_commands() for cmd in commands: self._process_command(cmd) time.sleep(self.poll_interval) except Exception as e: print(f"监听错误: {e}") time.sleep(1.0) def _poll_commands(self) -> List[Dict[str, Any]]: """ 轮询获取指令 Returns: 指令列表 """ # TODO: 实现具体的轮询逻辑 # 这里可以对接业务平台的指令接口 return [] def _process_command(self, cmd: Dict[str, Any]): """处理指令""" cmd_type = cmd.get('type', '') if cmd_type == 'voice': # 语音播放指令 if self.on_voice_command: self.on_voice_command(cmd) else: # 其他指令 if self.on_other_command: self.on_other_command(cmd) def set_voice_callback(self, callback): """设置语音播放回调""" self.on_voice_command = callback def set_other_callback(self, callback): """设置其他指令回调""" self.on_other_command = callback