event_pusher.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
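"""
Event push module.

Currently only the multi-image upload and batch push of tracking-capture
events are kept. Event pushing for OCR / LLM / safety-violation events has
been removed.
"""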
  6. import os
  7. import time
  8. import json
  9. import tempfile
  10. import threading
  11. import requests
  12. import mimetypes
  13. from typing import Optional, Dict, Any, List
  14. from datetime import datetime
  15. import cv2
  16. import numpy as np
  17. class EventPusher:
  18. """
  19. 事件推送器
  20. 负责将跟踪抓拍事件推送到业务平台
  21. """
  22. def __init__(self, config: Optional[Dict[str, Any]] = None):
  23. """
  24. 初始化事件推送器
  25. Args:
  26. config: 配置字典
  27. """
  28. self.config = config or {}
  29. # API 配置
  30. self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
  31. self.api_port = self.config.get('api_port', 8583)
  32. self.use_https = self.config.get('use_https', True)
  33. # 基础 URL(优先使用配置中的 base_url)
  34. self.base_url = self.config.get('base_url')
  35. if not self.base_url:
  36. protocol = 'https' if self.use_https else 'http'
  37. self.base_url = f"{protocol}://{self.api_host}:{self.api_port}"
  38. # 上传接口
  39. self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
  40. self.event_url = self.config.get('event_url', '/api/system/event')
  41. # 推送控制
  42. self.enabled = self.config.get('enabled', True)
  43. self.retry_count = self.config.get('retry_count', 3)
  44. self.retry_delay = self.config.get('retry_delay', 1.0)
  45. # 工作线程
  46. self.running = False
  47. # 统计
  48. self.stats = {
  49. 'total_events': 0,
  50. 'pushed_events': 0,
  51. 'failed_events': 0,
  52. 'upload_success': 0,
  53. 'upload_failed': 0
  54. }
  55. self.stats_lock = threading.Lock()
  56. def start(self):
  57. """启动推送器(当前为无状态接口调用,保留方法便于系统统一生命周期管理)"""
  58. self.running = True
  59. print("事件推送器已启动")
  60. def stop(self):
  61. """停止推送器"""
  62. self.running = False
  63. print("事件推送器已停止")
  64. def upload_numpy_image(self, image: Optional[np.ndarray]) -> Optional[str]:
  65. """
  66. 将 numpy 图片上传到 OSS
  67. Args:
  68. image: numpy 图像数组
  69. Returns:
  70. 图片URL或None
  71. """
  72. if image is None:
  73. return None
  74. temp_path = None
  75. try:
  76. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  77. temp_path = f.name
  78. cv2.imwrite(temp_path, image)
  79. url = self._upload_image(temp_path)
  80. return url
  81. except Exception as e:
  82. print(f"上传 numpy 图片失败: {e}")
  83. return None
  84. finally:
  85. if temp_path:
  86. try:
  87. os.remove(temp_path)
  88. except Exception:
  89. pass
  90. def push_tracking_capture(self, batch_time: float, captures: List[Dict[str, Any]]) -> Optional[requests.Response]:
  91. """
  92. 推送一轮多目标跟踪抓拍事件
  93. Args:
  94. batch_time: 批次时间戳
  95. captures: 抓拍记录列表
  96. Returns:
  97. 响应对象或None
  98. """
  99. payload = {
  100. "eventType": "TRACKING_CAPTURE",
  101. "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
  102. "deviceId": self.config.get("device_id"),
  103. "data": {
  104. "captureCount": len(captures),
  105. "captures": captures,
  106. }
  107. }
  108. url = f"{self.base_url}{self.event_url}"
  109. with self.stats_lock:
  110. self.stats['total_events'] += 1
  111. response = self._post(url, payload)
  112. if response is not None:
  113. with self.stats_lock:
  114. self.stats['pushed_events'] += 1
  115. else:
  116. with self.stats_lock:
  117. self.stats['failed_events'] += 1
  118. return response
  119. def _post(self, url: str, json_data: Dict[str, Any]) -> Optional[requests.Response]:
  120. """
  121. 发送 POST 请求
  122. Args:
  123. url: 请求URL
  124. json_data: JSON数据
  125. Returns:
  126. 响应对象或None
  127. """
  128. for attempt in range(self.retry_count):
  129. try:
  130. response = requests.post(url, json=json_data, verify=False, timeout=10)
  131. return response
  132. except Exception as e:
  133. print(f"POST 请求异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  134. if attempt < self.retry_count - 1:
  135. time.sleep(self.retry_delay)
  136. return None
  137. def _upload_image(self, image_path: str) -> Optional[str]:
  138. """
  139. 上传图片到 OSS
  140. Args:
  141. image_path: 图片路径
  142. Returns:
  143. 图片URL或None
  144. """
  145. if not os.path.exists(image_path):
  146. return None
  147. filename = os.path.basename(image_path)
  148. content_type = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
  149. url = f"{self.base_url}{self.upload_url}"
  150. for attempt in range(self.retry_count):
  151. try:
  152. with open(image_path, 'rb') as f:
  153. files = {'file': (filename, f, content_type)}
  154. response = requests.post(
  155. url,
  156. files=files,
  157. headers={'User-Agent': 'SafetySystem/1.0'},
  158. verify=False,
  159. timeout=10
  160. )
  161. if response.status_code == 200:
  162. result = response.json()
  163. if result.get('code') == 200:
  164. with self.stats_lock:
  165. self.stats['upload_success'] += 1
  166. return result.get('data', {}).get('purl')
  167. else:
  168. print(f"上传失败: {result.get('msg', '未知错误')}")
  169. else:
  170. print(f"上传失败: HTTP {response.status_code}")
  171. except Exception as e:
  172. print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  173. if attempt < self.retry_count - 1:
  174. time.sleep(self.retry_delay)
  175. with self.stats_lock:
  176. self.stats['upload_failed'] += 1
  177. return None
  178. def get_stats(self) -> Dict[str, int]:
  179. """获取统计信息"""
  180. with self.stats_lock:
  181. return self.stats.copy()