| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- """
- 事件推送模块
- 将识别到的安全违规事件推送到业务平台
- """
- import os
- import time
- import json
- import threading
- import queue
- import requests
- import http.client
- import mimetypes
- from typing import Optional, Dict, Any, List
- from dataclasses import dataclass
- from enum import Enum
- from codecs import encode
- import cv2
- import numpy as np
- class EventType(Enum):
- """事件类型"""
- SAFETY_VIOLATION = "安全违规" # 安全违规(未戴安全帽/未穿反光衣)
- INTRUSION = "入侵检测" # 禁区入侵
- LOITERING = "徘徊检测" # 徘徊检测
- @dataclass
- class SafetyEvent:
- """安全事件"""
- event_type: EventType # 事件类型
- description: str # 事件描述
- image_path: Optional[str] = None # 图片路径
- image_url: Optional[str] = None # 图片URL (上传后)
- track_id: int = 0 # 跟踪ID
- confidence: float = 0.0 # 置信度
- location: str = "" # 位置信息
- timestamp: float = 0.0 # 时间戳
- extra: Dict[str, Any] = None # 额外信息
-
- def __post_init__(self):
- if self.timestamp == 0.0:
- self.timestamp = time.time()
- if self.extra is None:
- self.extra = {}
-
- def to_dict(self) -> Dict[str, Any]:
- """转换为字典"""
- return {
- 'eventType': self.event_type.value,
- 'description': self.description,
- 'imageUrl': self.image_url,
- 'trackId': self.track_id,
- 'confidence': self.confidence,
- 'location': self.location,
- 'timestamp': self.timestamp,
- 'extra': self.extra
- }
- class EventPusher:
- """
- 事件推送器
- 负责将安全事件推送到业务平台
- """
-
- def __init__(self, config: Dict[str, Any] = None):
- """
- 初始化事件推送器
-
- Args:
- config: 配置字典
- """
- self.config = config or {}
-
- # API 配置
- self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
- self.api_port = self.config.get('api_port', 8583)
- self.use_https = self.config.get('use_https', True)
-
- # 上传接口
- self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
- self.event_url = self.config.get('event_url', '/api/system/event')
-
- # 推送控制
- self.enabled = self.config.get('enabled', True)
- self.upload_interval = self.config.get('upload_interval', 2.0) # 推送间隔
- self.retry_count = self.config.get('retry_count', 3) # 重试次数
- self.retry_delay = self.config.get('retry_delay', 1.0) # 重试延迟
-
- # 事件队列
- self.event_queue = queue.Queue()
-
- # 工作线程
- self.running = False
- self.worker_thread = None
-
- # 上次推送时间
- self.last_push_time = 0
-
- # 统计
- self.stats = {
- 'total_events': 0,
- 'pushed_events': 0,
- 'failed_events': 0,
- 'upload_success': 0,
- 'upload_failed': 0
- }
- self.stats_lock = threading.Lock()
-
- def start(self):
- """启动推送器"""
- if self.running:
- return
-
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
- self.worker_thread.start()
- print("事件推送器已启动")
-
- def stop(self):
- """停止推送器"""
- self.running = False
- if self.worker_thread:
- self.worker_thread.join(timeout=3)
- print("事件推送器已停止")
-
- def push_event(self, event: SafetyEvent):
- """
- 推送事件
-
- Args:
- event: 安全事件
- """
- if not self.enabled:
- return
-
- # 检查推送间隔
- current_time = time.time()
- if current_time - self.last_push_time < self.upload_interval:
- return
-
- self.event_queue.put(event)
-
- with self.stats_lock:
- self.stats['total_events'] += 1
-
- def push_safety_violation(self, description: str, image: np.ndarray = None,
- track_id: int = 0, confidence: float = 0.0,
- location: str = "施工现场") -> bool:
- """
- 推送安全违规事件
-
- Args:
- description: 违规描述
- image: 图像 (可选)
- track_id: 跟踪ID
- confidence: 置信度
- location: 位置
-
- Returns:
- 是否成功加入队列
- """
- event = SafetyEvent(
- event_type=EventType.SAFETY_VIOLATION,
- description=description,
- track_id=track_id,
- confidence=confidence,
- location=location
- )
-
- # 保存图像
- if image is not None:
- temp_path = f"/tmp/safety_event_{int(time.time() * 1000)}.jpg"
- cv2.imwrite(temp_path, image)
- event.image_path = temp_path
-
- self.push_event(event)
- return True
-
- def _worker(self):
- """工作线程"""
- while self.running:
- try:
- # 获取事件
- try:
- event = self.event_queue.get(timeout=1.0)
- except queue.Empty:
- continue
-
- # 处理事件
- self._process_event(event)
-
- except Exception as e:
- print(f"事件处理错误: {e}")
-
- def _process_event(self, event: SafetyEvent):
- """处理单个事件"""
- try:
- # 上传图片
- if event.image_path:
- image_url = self._upload_image(event.image_path)
- if image_url:
- event.image_url = image_url
- with self.stats_lock:
- self.stats['upload_success'] += 1
- else:
- with self.stats_lock:
- self.stats['upload_failed'] += 1
-
- # 清理临时文件
- if os.path.exists(event.image_path):
- try:
- os.remove(event.image_path)
- except:
- pass
-
- # 创建事件
- success = self._create_event(event)
-
- if success:
- self.last_push_time = time.time()
- with self.stats_lock:
- self.stats['pushed_events'] += 1
- print(f"事件推送成功: {event.description}")
- else:
- with self.stats_lock:
- self.stats['failed_events'] += 1
- print(f"事件推送失败: {event.description}")
-
- except Exception as e:
- print(f"处理事件错误: {e}")
- with self.stats_lock:
- self.stats['failed_events'] += 1
-
- def _upload_image(self, image_path: str) -> Optional[str]:
- """
- 上传图片到 OSS
-
- Args:
- image_path: 图片路径
-
- Returns:
- 图片URL或None
- """
- if not os.path.exists(image_path):
- return None
-
- for attempt in range(self.retry_count):
- try:
- filename = os.path.basename(image_path)
-
- # 创建连接
- if self.use_https:
- conn = http.client.HTTPSConnection(self.api_host, self.api_port)
- else:
- conn = http.client.HTTPConnection(self.api_host, self.api_port)
-
- # 准备 multipart/form-data
- boundary = f'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T{int(time.time())}'
- dataList = []
- dataList.append(encode(f'--{boundary}'))
- dataList.append(encode(f'Content-Disposition: form-data; name=file; filename={filename}'))
-
- # 文件类型
- fileType = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
- dataList.append(encode(f'Content-Type: {fileType}'))
- dataList.append(encode(''))
-
- # 读取文件
- with open(image_path, 'rb') as f:
- dataList.append(f.read())
-
- dataList.append(encode(f'--{boundary}--'))
- dataList.append(encode(''))
-
- body = b'\r\n'.join(dataList)
-
- headers = {
- 'User-Agent': 'SafetySystem/1.0',
- 'Accept': '*/*',
- 'Host': f'{self.api_host}:{self.api_port}',
- 'Connection': 'keep-alive',
- 'Content-Type': f'multipart/form-data; boundary={boundary}'
- }
-
- conn.request("POST", self.upload_url, body, headers)
- res = conn.getresponse()
- data = res.read()
- conn.close()
-
- if res.status == 200:
- result = json.loads(data.decode("utf-8"))
- if result.get('code') == 200:
- return result.get('data', {}).get('purl')
- else:
- print(f"上传失败: {result.get('msg', '未知错误')}")
- else:
- print(f"上传失败: HTTP {res.status}")
-
- except Exception as e:
- print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
- if attempt < self.retry_count - 1:
- time.sleep(self.retry_delay)
-
- return None
-
- def _create_event(self, event: SafetyEvent) -> bool:
- """
- 在业务平台创建事件
-
- Args:
- event: 安全事件
-
- Returns:
- 是否成功
- """
- for attempt in range(self.retry_count):
- try:
- # 构建请求
- if self.use_https:
- base_url = f"https://{self.api_host}:{self.api_port}"
- else:
- base_url = f"http://{self.api_host}:{self.api_port}"
-
- url = f"{base_url}{self.event_url}"
-
- create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(event.timestamp))
-
- data = {
- "createTime": create_time,
- "addr": event.description,
- "ext1": json.dumps([event.image_url]) if event.image_url else "[]",
- "ext2": json.dumps({
- "lx": "工地安全",
- "type": event.event_type.value,
- "trackId": event.track_id,
- "confidence": event.confidence,
- "location": event.location
- })
- }
-
- response = requests.post(url, json=data, verify=False, timeout=10)
-
- if response.status_code == 200:
- result = response.json()
- if result.get('code') == 200:
- return True
- else:
- print(f"创建事件失败: {result.get('msg', '未知错误')}")
- else:
- print(f"创建事件失败: HTTP {response.status_code}")
-
- except Exception as e:
- print(f"创建事件异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
- if attempt < self.retry_count - 1:
- time.sleep(self.retry_delay)
-
- return False
-
- def get_stats(self) -> Dict[str, int]:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
- class EventListener:
- """
- 事件监听器
- 监听业务平台的指令(如语音播放指令)
- """
-
- def __init__(self, config: Dict[str, Any] = None):
- """
- 初始化事件监听器
-
- Args:
- config: 配置字典
- """
- self.config = config or {}
-
- # WebSocket 或 HTTP 长轮询配置
- self.listen_url = self.config.get('listen_url', '')
- self.poll_interval = self.config.get('poll_interval', 5.0)
-
- # 回调
- self.on_voice_command = None
- self.on_other_command = None
-
- # 运行状态
- self.running = False
- self.listener_thread = None
-
- def start(self):
- """启动监听"""
- if self.running:
- return
-
- self.running = True
- self.listener_thread = threading.Thread(target=self._listener_worker, daemon=True)
- self.listener_thread.start()
- print("事件监听器已启动")
-
- def stop(self):
- """停止监听"""
- self.running = False
- if self.listener_thread:
- self.listener_thread.join(timeout=3)
- print("事件监听器已停止")
-
- def _listener_worker(self):
- """监听工作线程"""
- while self.running:
- try:
- # 轮询获取指令
- # TODO: 实现 WebSocket 或 HTTP 长轮询
- commands = self._poll_commands()
-
- for cmd in commands:
- self._process_command(cmd)
-
- time.sleep(self.poll_interval)
-
- except Exception as e:
- print(f"监听错误: {e}")
- time.sleep(1.0)
-
- def _poll_commands(self) -> List[Dict[str, Any]]:
- """
- 轮询获取指令
-
- Returns:
- 指令列表
- """
- # TODO: 实现具体的轮询逻辑
- # 这里可以对接业务平台的指令接口
- return []
-
- def _process_command(self, cmd: Dict[str, Any]):
- """处理指令"""
- cmd_type = cmd.get('type', '')
-
- if cmd_type == 'voice':
- # 语音播放指令
- if self.on_voice_command:
- self.on_voice_command(cmd)
- else:
- # 其他指令
- if self.on_other_command:
- self.on_other_command(cmd)
-
- def set_voice_callback(self, callback):
- """设置语音播放回调"""
- self.on_voice_command = callback
-
- def set_other_callback(self, callback):
- """设置其他指令回调"""
- self.on_other_command = callback
|