# event_pusher.py
"""
Event push module.

Pushes detected safety-violation events to the business platform.
"""
import http.client
import json
import mimetypes
import os
import queue
import tempfile
import threading
import time
from codecs import encode
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import cv2
import numpy as np
import requests
  19. class EventType(Enum):
  20. """事件类型"""
  21. SAFETY_VIOLATION = "安全违规" # 安全违规(未戴安全帽/未穿反光衣)
  22. INTRUSION = "入侵检测" # 禁区入侵
  23. LOITERING = "徘徊检测" # 徘徊检测
  24. @dataclass
  25. class SafetyEvent:
  26. """安全事件"""
  27. event_type: EventType # 事件类型
  28. description: str # 事件描述
  29. image_path: Optional[str] = None # 图片路径
  30. image_url: Optional[str] = None # 图片URL (上传后)
  31. track_id: int = 0 # 跟踪ID
  32. confidence: float = 0.0 # 置信度
  33. location: str = "" # 位置信息
  34. timestamp: float = 0.0 # 时间戳
  35. extra: Dict[str, Any] = None # 额外信息
  36. def __post_init__(self):
  37. if self.timestamp == 0.0:
  38. self.timestamp = time.time()
  39. if self.extra is None:
  40. self.extra = {}
  41. def to_dict(self) -> Dict[str, Any]:
  42. """转换为字典"""
  43. return {
  44. 'eventType': self.event_type.value,
  45. 'description': self.description,
  46. 'imageUrl': self.image_url,
  47. 'trackId': self.track_id,
  48. 'confidence': self.confidence,
  49. 'location': self.location,
  50. 'timestamp': self.timestamp,
  51. 'extra': self.extra
  52. }
  53. class EventPusher:
  54. """
  55. 事件推送器
  56. 负责将安全事件推送到业务平台
  57. """
  58. def __init__(self, config: Dict[str, Any] = None):
  59. """
  60. 初始化事件推送器
  61. Args:
  62. config: 配置字典
  63. """
  64. self.config = config or {}
  65. # API 配置
  66. self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
  67. self.api_port = self.config.get('api_port', 8583)
  68. self.use_https = self.config.get('use_https', True)
  69. # 上传接口
  70. self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
  71. self.event_url = self.config.get('event_url', '/api/system/event')
  72. # 推送控制
  73. self.enabled = self.config.get('enabled', True)
  74. self.upload_interval = self.config.get('upload_interval', 2.0) # 推送间隔
  75. self.retry_count = self.config.get('retry_count', 3) # 重试次数
  76. self.retry_delay = self.config.get('retry_delay', 1.0) # 重试延迟
  77. # 事件队列
  78. self.event_queue = queue.Queue()
  79. # 工作线程
  80. self.running = False
  81. self.worker_thread = None
  82. # 上次推送时间
  83. self.last_push_time = 0
  84. # 统计
  85. self.stats = {
  86. 'total_events': 0,
  87. 'pushed_events': 0,
  88. 'failed_events': 0,
  89. 'upload_success': 0,
  90. 'upload_failed': 0
  91. }
  92. self.stats_lock = threading.Lock()
  93. def start(self):
  94. """启动推送器"""
  95. if self.running:
  96. return
  97. self.running = True
  98. self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  99. self.worker_thread.start()
  100. print("事件推送器已启动")
  101. def stop(self):
  102. """停止推送器"""
  103. self.running = False
  104. if self.worker_thread:
  105. self.worker_thread.join(timeout=3)
  106. print("事件推送器已停止")
  107. def push_event(self, event: SafetyEvent):
  108. """
  109. 推送事件
  110. Args:
  111. event: 安全事件
  112. """
  113. if not self.enabled:
  114. return
  115. # 检查推送间隔
  116. current_time = time.time()
  117. if current_time - self.last_push_time < self.upload_interval:
  118. return
  119. self.event_queue.put(event)
  120. with self.stats_lock:
  121. self.stats['total_events'] += 1
  122. def push_safety_violation(self, description: str, image: np.ndarray = None,
  123. track_id: int = 0, confidence: float = 0.0,
  124. location: str = "施工现场") -> bool:
  125. """
  126. 推送安全违规事件
  127. Args:
  128. description: 违规描述
  129. image: 图像 (可选)
  130. track_id: 跟踪ID
  131. confidence: 置信度
  132. location: 位置
  133. Returns:
  134. 是否成功加入队列
  135. """
  136. event = SafetyEvent(
  137. event_type=EventType.SAFETY_VIOLATION,
  138. description=description,
  139. track_id=track_id,
  140. confidence=confidence,
  141. location=location
  142. )
  143. # 保存图像
  144. if image is not None:
  145. temp_path = f"/tmp/safety_event_{int(time.time() * 1000)}.jpg"
  146. cv2.imwrite(temp_path, image)
  147. event.image_path = temp_path
  148. self.push_event(event)
  149. return True
  150. def _worker(self):
  151. """工作线程"""
  152. while self.running:
  153. try:
  154. # 获取事件
  155. try:
  156. event = self.event_queue.get(timeout=1.0)
  157. except queue.Empty:
  158. continue
  159. # 处理事件
  160. self._process_event(event)
  161. except Exception as e:
  162. print(f"事件处理错误: {e}")
  163. def _process_event(self, event: SafetyEvent):
  164. """处理单个事件"""
  165. try:
  166. # 上传图片
  167. if event.image_path:
  168. image_url = self._upload_image(event.image_path)
  169. if image_url:
  170. event.image_url = image_url
  171. with self.stats_lock:
  172. self.stats['upload_success'] += 1
  173. else:
  174. with self.stats_lock:
  175. self.stats['upload_failed'] += 1
  176. # 清理临时文件
  177. if os.path.exists(event.image_path):
  178. try:
  179. os.remove(event.image_path)
  180. except:
  181. pass
  182. # 创建事件
  183. success = self._create_event(event)
  184. if success:
  185. self.last_push_time = time.time()
  186. with self.stats_lock:
  187. self.stats['pushed_events'] += 1
  188. print(f"事件推送成功: {event.description}")
  189. else:
  190. with self.stats_lock:
  191. self.stats['failed_events'] += 1
  192. print(f"事件推送失败: {event.description}")
  193. except Exception as e:
  194. print(f"处理事件错误: {e}")
  195. with self.stats_lock:
  196. self.stats['failed_events'] += 1
  197. def _upload_image(self, image_path: str) -> Optional[str]:
  198. """
  199. 上传图片到 OSS
  200. Args:
  201. image_path: 图片路径
  202. Returns:
  203. 图片URL或None
  204. """
  205. if not os.path.exists(image_path):
  206. return None
  207. for attempt in range(self.retry_count):
  208. try:
  209. filename = os.path.basename(image_path)
  210. # 创建连接
  211. if self.use_https:
  212. conn = http.client.HTTPSConnection(self.api_host, self.api_port)
  213. else:
  214. conn = http.client.HTTPConnection(self.api_host, self.api_port)
  215. # 准备 multipart/form-data
  216. boundary = f'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T{int(time.time())}'
  217. dataList = []
  218. dataList.append(encode(f'--{boundary}'))
  219. dataList.append(encode(f'Content-Disposition: form-data; name=file; filename={filename}'))
  220. # 文件类型
  221. fileType = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
  222. dataList.append(encode(f'Content-Type: {fileType}'))
  223. dataList.append(encode(''))
  224. # 读取文件
  225. with open(image_path, 'rb') as f:
  226. dataList.append(f.read())
  227. dataList.append(encode(f'--{boundary}--'))
  228. dataList.append(encode(''))
  229. body = b'\r\n'.join(dataList)
  230. headers = {
  231. 'User-Agent': 'SafetySystem/1.0',
  232. 'Accept': '*/*',
  233. 'Host': f'{self.api_host}:{self.api_port}',
  234. 'Connection': 'keep-alive',
  235. 'Content-Type': f'multipart/form-data; boundary={boundary}'
  236. }
  237. conn.request("POST", self.upload_url, body, headers)
  238. res = conn.getresponse()
  239. data = res.read()
  240. conn.close()
  241. if res.status == 200:
  242. result = json.loads(data.decode("utf-8"))
  243. if result.get('code') == 200:
  244. return result.get('data', {}).get('purl')
  245. else:
  246. print(f"上传失败: {result.get('msg', '未知错误')}")
  247. else:
  248. print(f"上传失败: HTTP {res.status}")
  249. except Exception as e:
  250. print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  251. if attempt < self.retry_count - 1:
  252. time.sleep(self.retry_delay)
  253. return None
  254. def _create_event(self, event: SafetyEvent) -> bool:
  255. """
  256. 在业务平台创建事件
  257. Args:
  258. event: 安全事件
  259. Returns:
  260. 是否成功
  261. """
  262. for attempt in range(self.retry_count):
  263. try:
  264. # 构建请求
  265. if self.use_https:
  266. base_url = f"https://{self.api_host}:{self.api_port}"
  267. else:
  268. base_url = f"http://{self.api_host}:{self.api_port}"
  269. url = f"{base_url}{self.event_url}"
  270. create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(event.timestamp))
  271. data = {
  272. "createTime": create_time,
  273. "addr": event.description,
  274. "ext1": json.dumps([event.image_url]) if event.image_url else "[]",
  275. "ext2": json.dumps({
  276. "lx": "工地安全",
  277. "type": event.event_type.value,
  278. "trackId": event.track_id,
  279. "confidence": event.confidence,
  280. "location": event.location
  281. })
  282. }
  283. response = requests.post(url, json=data, verify=False, timeout=10)
  284. if response.status_code == 200:
  285. result = response.json()
  286. if result.get('code') == 200:
  287. return True
  288. else:
  289. print(f"创建事件失败: {result.get('msg', '未知错误')}")
  290. else:
  291. print(f"创建事件失败: HTTP {response.status_code}")
  292. except Exception as e:
  293. print(f"创建事件异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  294. if attempt < self.retry_count - 1:
  295. time.sleep(self.retry_delay)
  296. return False
  297. def get_stats(self) -> Dict[str, int]:
  298. """获取统计信息"""
  299. with self.stats_lock:
  300. return self.stats.copy()
  301. class EventListener:
  302. """
  303. 事件监听器
  304. 监听业务平台的指令(如语音播放指令)
  305. """
  306. def __init__(self, config: Dict[str, Any] = None):
  307. """
  308. 初始化事件监听器
  309. Args:
  310. config: 配置字典
  311. """
  312. self.config = config or {}
  313. # WebSocket 或 HTTP 长轮询配置
  314. self.listen_url = self.config.get('listen_url', '')
  315. self.poll_interval = self.config.get('poll_interval', 5.0)
  316. # 回调
  317. self.on_voice_command = None
  318. self.on_other_command = None
  319. # 运行状态
  320. self.running = False
  321. self.listener_thread = None
  322. def start(self):
  323. """启动监听"""
  324. if self.running:
  325. return
  326. self.running = True
  327. self.listener_thread = threading.Thread(target=self._listener_worker, daemon=True)
  328. self.listener_thread.start()
  329. print("事件监听器已启动")
  330. def stop(self):
  331. """停止监听"""
  332. self.running = False
  333. if self.listener_thread:
  334. self.listener_thread.join(timeout=3)
  335. print("事件监听器已停止")
  336. def _listener_worker(self):
  337. """监听工作线程"""
  338. while self.running:
  339. try:
  340. # 轮询获取指令
  341. # TODO: 实现 WebSocket 或 HTTP 长轮询
  342. commands = self._poll_commands()
  343. for cmd in commands:
  344. self._process_command(cmd)
  345. time.sleep(self.poll_interval)
  346. except Exception as e:
  347. print(f"监听错误: {e}")
  348. time.sleep(1.0)
  349. def _poll_commands(self) -> List[Dict[str, Any]]:
  350. """
  351. 轮询获取指令
  352. Returns:
  353. 指令列表
  354. """
  355. # TODO: 实现具体的轮询逻辑
  356. # 这里可以对接业务平台的指令接口
  357. return []
  358. def _process_command(self, cmd: Dict[str, Any]):
  359. """处理指令"""
  360. cmd_type = cmd.get('type', '')
  361. if cmd_type == 'voice':
  362. # 语音播放指令
  363. if self.on_voice_command:
  364. self.on_voice_command(cmd)
  365. else:
  366. # 其他指令
  367. if self.on_other_command:
  368. self.on_other_command(cmd)
  369. def set_voice_callback(self, callback):
  370. """设置语音播放回调"""
  371. self.on_voice_command = callback
  372. def set_other_callback(self, callback):
  373. """设置其他指令回调"""
  374. self.on_other_command = callback