event_pusher.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
"""
Event push module.

Pushes detected safety-violation events to the business platform.
"""
  5. import os
  6. import time
  7. import json
  8. import threading
  9. import queue
  10. import tempfile
  11. import requests
  12. import mimetypes
  13. from typing import Optional, Dict, Any, List, Callable
  14. from dataclasses import dataclass
  15. from datetime import datetime
  16. from enum import Enum
  17. import cv2
  18. import numpy as np
  19. class EventType(Enum):
  20. """事件类型"""
  21. SAFETY_VIOLATION = "安全违规" # 安全违规(未戴安全帽/未穿反光衣)
  22. INTRUSION = "入侵检测" # 禁区入侵
  23. LOITERING = "徘徊检测" # 徘徊检测
  24. @dataclass
  25. class SafetyEvent:
  26. """安全事件"""
  27. event_type: EventType # 事件类型
  28. description: str # 事件描述
  29. image_path: Optional[str] = None # 图片路径
  30. image_url: Optional[str] = None # 图片URL (上传后)
  31. track_id: int = 0 # 跟踪ID
  32. confidence: float = 0.0 # 置信度
  33. location: str = "" # 位置信息
  34. timestamp: float = 0.0 # 时间戳
  35. extra: Optional[Dict[str, Any]] = None # 额外信息
  36. def __post_init__(self):
  37. if self.timestamp == 0.0:
  38. self.timestamp = time.time()
  39. if self.extra is None:
  40. self.extra = {}
  41. def to_dict(self) -> Dict[str, Any]:
  42. """转换为字典"""
  43. return {
  44. 'eventType': self.event_type.value,
  45. 'description': self.description,
  46. 'imageUrl': self.image_url,
  47. 'trackId': self.track_id,
  48. 'confidence': self.confidence,
  49. 'location': self.location,
  50. 'timestamp': self.timestamp,
  51. 'extra': self.extra
  52. }
  53. class EventPusher:
  54. """
  55. 事件推送器
  56. 负责将安全事件推送到业务平台
  57. """
  58. def __init__(self, config: Optional[Dict[str, Any]] = None):
  59. """
  60. 初始化事件推送器
  61. Args:
  62. config: 配置字典
  63. """
  64. self.config = config or {}
  65. # API 配置
  66. self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
  67. self.api_port = self.config.get('api_port', 8583)
  68. self.use_https = self.config.get('use_https', True)
  69. # 基础 URL(优先使用配置中的 base_url)
  70. self.base_url = self.config.get('base_url')
  71. if not self.base_url:
  72. protocol = 'https' if self.use_https else 'http'
  73. self.base_url = f"{protocol}://{self.api_host}:{self.api_port}"
  74. # 上传接口
  75. self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
  76. self.event_url = self.config.get('event_url', '/api/system/event')
  77. # 推送控制
  78. self.enabled = self.config.get('enabled', True)
  79. self.upload_interval = self.config.get('upload_interval', 2.0) # 推送间隔
  80. self.retry_count = self.config.get('retry_count', 3) # 重试次数
  81. self.retry_delay = self.config.get('retry_delay', 1.0) # 重试延迟
  82. # 事件队列
  83. self.event_queue = queue.Queue()
  84. # 工作线程
  85. self.running = False
  86. self.worker_thread = None
  87. # 上次推送时间
  88. self.last_push_time = 0
  89. # 统计
  90. self.stats = {
  91. 'total_events': 0,
  92. 'pushed_events': 0,
  93. 'failed_events': 0,
  94. 'upload_success': 0,
  95. 'upload_failed': 0
  96. }
  97. self.stats_lock = threading.Lock()
  98. def start(self):
  99. """启动推送器"""
  100. if self.running:
  101. return
  102. self.running = True
  103. self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  104. self.worker_thread.start()
  105. print("事件推送器已启动")
  106. def stop(self):
  107. """停止推送器"""
  108. self.running = False
  109. if self.worker_thread:
  110. self.worker_thread.join(timeout=3)
  111. print("事件推送器已停止")
  112. def push_event(self, event: SafetyEvent):
  113. """
  114. 推送事件
  115. Args:
  116. event: 安全事件
  117. """
  118. if not self.enabled:
  119. return
  120. # 检查推送间隔
  121. current_time = time.time()
  122. if current_time - self.last_push_time < self.upload_interval:
  123. return
  124. self.event_queue.put(event)
  125. with self.stats_lock:
  126. self.stats['total_events'] += 1
  127. def push_safety_violation(self, description: str, image: Optional[np.ndarray] = None,
  128. track_id: int = 0, confidence: float = 0.0,
  129. location: str = "施工现场") -> bool:
  130. """
  131. 推送安全违规事件
  132. Args:
  133. description: 违规描述
  134. image: 图像 (可选)
  135. track_id: 跟踪ID
  136. confidence: 置信度
  137. location: 位置
  138. Returns:
  139. 是否成功加入队列
  140. """
  141. event = SafetyEvent(
  142. event_type=EventType.SAFETY_VIOLATION,
  143. description=description,
  144. track_id=track_id,
  145. confidence=confidence,
  146. location=location
  147. )
  148. # 保存图像
  149. if image is not None:
  150. temp_path = f"/tmp/safety_event_{int(time.time() * 1000)}.jpg"
  151. cv2.imwrite(temp_path, image)
  152. event.image_path = temp_path
  153. self.push_event(event)
  154. return True
  155. def upload_numpy_image(self, image: Optional[np.ndarray]) -> Optional[str]:
  156. """
  157. 将 numpy 图片上传到 OSS
  158. Args:
  159. image: numpy 图像数组
  160. Returns:
  161. 图片URL或None
  162. """
  163. if image is None:
  164. return None
  165. temp_path = None
  166. try:
  167. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  168. temp_path = f.name
  169. cv2.imwrite(temp_path, image)
  170. url = self._upload_image(temp_path)
  171. return url
  172. except Exception as e:
  173. print(f"上传 numpy 图片失败: {e}")
  174. return None
  175. finally:
  176. if temp_path:
  177. try:
  178. os.remove(temp_path)
  179. except Exception:
  180. pass
  181. def push_tracking_capture(self, batch_time: float, captures: List[Dict[str, Any]]) -> Optional[requests.Response]:
  182. """
  183. 推送一轮多目标跟踪抓拍事件
  184. Args:
  185. batch_time: 批次时间戳
  186. captures: 抓拍记录列表
  187. Returns:
  188. 响应对象或None
  189. """
  190. payload = {
  191. "eventType": "TRACKING_CAPTURE",
  192. "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
  193. "deviceId": self.config.get("device_id"),
  194. "data": {
  195. "captureCount": len(captures),
  196. "captures": captures,
  197. }
  198. }
  199. url = f"{self.base_url}{self.event_url}"
  200. return self._post(url, payload)
  201. def _post(self, url: str, json_data: Dict[str, Any]) -> Optional[requests.Response]:
  202. """
  203. 发送 POST 请求
  204. Args:
  205. url: 请求URL
  206. json_data: JSON数据
  207. Returns:
  208. 响应对象或None
  209. """
  210. for attempt in range(self.retry_count):
  211. try:
  212. response = requests.post(url, json=json_data, verify=False, timeout=10)
  213. return response
  214. except Exception as e:
  215. print(f"POST 请求异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  216. if attempt < self.retry_count - 1:
  217. time.sleep(self.retry_delay)
  218. return None
  219. def _worker(self):
  220. """工作线程"""
  221. while self.running:
  222. try:
  223. # 获取事件
  224. try:
  225. event = self.event_queue.get(timeout=1.0)
  226. except queue.Empty:
  227. continue
  228. # 处理事件
  229. self._process_event(event)
  230. except Exception as e:
  231. print(f"事件处理错误: {e}")
  232. def _process_event(self, event: SafetyEvent):
  233. """处理单个事件"""
  234. try:
  235. # 上传图片
  236. if event.image_path:
  237. image_url = self._upload_image(event.image_path)
  238. if image_url:
  239. event.image_url = image_url
  240. with self.stats_lock:
  241. self.stats['upload_success'] += 1
  242. else:
  243. with self.stats_lock:
  244. self.stats['upload_failed'] += 1
  245. # 清理临时文件
  246. if os.path.exists(event.image_path):
  247. try:
  248. os.remove(event.image_path)
  249. except Exception as e:
  250. print(f"清理临时文件失败: {e}")
  251. # 创建事件
  252. success = self._create_event(event)
  253. if success:
  254. self.last_push_time = time.time()
  255. with self.stats_lock:
  256. self.stats['pushed_events'] += 1
  257. print(f"事件推送成功: {event.description}")
  258. else:
  259. with self.stats_lock:
  260. self.stats['failed_events'] += 1
  261. print(f"事件推送失败: {event.description}")
  262. except Exception as e:
  263. print(f"处理事件错误: {e}")
  264. with self.stats_lock:
  265. self.stats['failed_events'] += 1
  266. def _upload_image(self, image_path: str) -> Optional[str]:
  267. """
  268. 上传图片到 OSS
  269. Args:
  270. image_path: 图片路径
  271. Returns:
  272. 图片URL或None
  273. """
  274. if not os.path.exists(image_path):
  275. return None
  276. filename = os.path.basename(image_path)
  277. content_type = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
  278. url = f"{self.base_url}{self.upload_url}"
  279. for attempt in range(self.retry_count):
  280. try:
  281. with open(image_path, 'rb') as f:
  282. files = {'file': (filename, f, content_type)}
  283. response = requests.post(
  284. url,
  285. files=files,
  286. headers={'User-Agent': 'SafetySystem/1.0'},
  287. verify=False,
  288. timeout=10
  289. )
  290. if response.status_code == 200:
  291. result = response.json()
  292. if result.get('code') == 200:
  293. return result.get('data', {}).get('purl')
  294. else:
  295. print(f"上传失败: {result.get('msg', '未知错误')}")
  296. else:
  297. print(f"上传失败: HTTP {response.status_code}")
  298. except Exception as e:
  299. print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  300. if attempt < self.retry_count - 1:
  301. time.sleep(self.retry_delay)
  302. return None
  303. def _create_event(self, event: SafetyEvent) -> bool:
  304. """
  305. 在业务平台创建事件
  306. Args:
  307. event: 安全事件
  308. Returns:
  309. 是否成功
  310. """
  311. create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(event.timestamp))
  312. data = {
  313. "createTime": create_time,
  314. "addr": event.description,
  315. "ext1": json.dumps([event.image_url]) if event.image_url else "[]",
  316. "ext2": json.dumps({
  317. "lx": "工地安全",
  318. "type": event.event_type.value,
  319. "trackId": event.track_id,
  320. "confidence": event.confidence,
  321. "location": event.location
  322. })
  323. }
  324. url = f"{self.base_url}{self.event_url}"
  325. response = self._post(url, data)
  326. if response is None:
  327. return False
  328. try:
  329. if response.status_code == 200:
  330. result = response.json()
  331. if result.get('code') == 200:
  332. return True
  333. else:
  334. print(f"创建事件失败: {result.get('msg', '未知错误')}")
  335. else:
  336. print(f"创建事件失败: HTTP {response.status_code}")
  337. except Exception as e:
  338. print(f"创建事件解析响应异常: {e}")
  339. return False
  340. def get_stats(self) -> Dict[str, int]:
  341. """获取统计信息"""
  342. with self.stats_lock:
  343. return self.stats.copy()
  344. class EventListener:
  345. """
  346. 事件监听器
  347. 监听业务平台的指令(如语音播放指令)
  348. """
  349. def __init__(self, config: Optional[Dict[str, Any]] = None):
  350. """
  351. 初始化事件监听器
  352. Args:
  353. config: 配置字典
  354. """
  355. self.config = config or {}
  356. # WebSocket 或 HTTP 长轮询配置
  357. self.listen_url = self.config.get('listen_url', '')
  358. self.poll_interval = self.config.get('poll_interval', 5.0)
  359. # 回调
  360. self.on_voice_command = None
  361. self.on_other_command = None
  362. # 运行状态
  363. self.running = False
  364. self.listener_thread = None
  365. def start(self):
  366. """启动监听"""
  367. if self.running:
  368. return
  369. self.running = True
  370. self.listener_thread = threading.Thread(target=self._listener_worker, daemon=True)
  371. self.listener_thread.start()
  372. print("事件监听器已启动")
  373. def stop(self):
  374. """停止监听"""
  375. self.running = False
  376. if self.listener_thread:
  377. self.listener_thread.join(timeout=3)
  378. print("事件监听器已停止")
  379. def _listener_worker(self):
  380. """监听工作线程"""
  381. while self.running:
  382. try:
  383. # 轮询获取指令
  384. # TODO: 实现 WebSocket 或 HTTP 长轮询
  385. commands = self._poll_commands()
  386. for cmd in commands:
  387. self._process_command(cmd)
  388. time.sleep(self.poll_interval)
  389. except Exception as e:
  390. print(f"监听错误: {e}")
  391. time.sleep(1.0)
  392. def _poll_commands(self) -> List[Dict[str, Any]]:
  393. """
  394. 轮询获取指令
  395. Returns:
  396. 指令列表
  397. """
  398. # TODO: 实现具体的轮询逻辑
  399. # 这里可以对接业务平台的指令接口
  400. return []
  401. def _process_command(self, cmd: Dict[str, Any]):
  402. """处理指令"""
  403. cmd_type = cmd.get('type', '')
  404. if cmd_type == 'voice':
  405. # 语音播放指令
  406. if self.on_voice_command:
  407. self.on_voice_command(cmd)
  408. else:
  409. # 其他指令
  410. if self.on_other_command:
  411. self.on_other_command(cmd)
  412. def set_voice_callback(self, callback: Optional[Callable[[Dict[str, Any]], None]]):
  413. """设置语音播放回调"""
  414. self.on_voice_command = callback
  415. def set_other_callback(self, callback: Optional[Callable[[Dict[str, Any]], None]]):
  416. """设置其他指令回调"""
  417. self.on_other_command = callback