event_pusher.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. """
  2. 事件推送模块
  3. 将识别到的安全违规事件推送到业务平台
  4. """
  5. import os
  6. import time
  7. import json
  8. import threading
  9. import queue
  10. import tempfile
  11. import requests
  12. import http.client
  13. import mimetypes
  14. from typing import Optional, Dict, Any, List
  15. from dataclasses import dataclass
  16. from datetime import datetime
  17. from enum import Enum
  18. from codecs import encode
  19. import cv2
  20. import numpy as np
  21. class EventType(Enum):
  22. """事件类型"""
  23. SAFETY_VIOLATION = "安全违规" # 安全违规(未戴安全帽/未穿反光衣)
  24. INTRUSION = "入侵检测" # 禁区入侵
  25. LOITERING = "徘徊检测" # 徘徊检测
  26. @dataclass
  27. class SafetyEvent:
  28. """安全事件"""
  29. event_type: EventType # 事件类型
  30. description: str # 事件描述
  31. image_path: Optional[str] = None # 图片路径
  32. image_url: Optional[str] = None # 图片URL (上传后)
  33. track_id: int = 0 # 跟踪ID
  34. confidence: float = 0.0 # 置信度
  35. location: str = "" # 位置信息
  36. timestamp: float = 0.0 # 时间戳
  37. extra: Dict[str, Any] = None # 额外信息
  38. def __post_init__(self):
  39. if self.timestamp == 0.0:
  40. self.timestamp = time.time()
  41. if self.extra is None:
  42. self.extra = {}
  43. def to_dict(self) -> Dict[str, Any]:
  44. """转换为字典"""
  45. return {
  46. 'eventType': self.event_type.value,
  47. 'description': self.description,
  48. 'imageUrl': self.image_url,
  49. 'trackId': self.track_id,
  50. 'confidence': self.confidence,
  51. 'location': self.location,
  52. 'timestamp': self.timestamp,
  53. 'extra': self.extra
  54. }
  55. class EventPusher:
  56. """
  57. 事件推送器
  58. 负责将安全事件推送到业务平台
  59. """
  60. def __init__(self, config: Dict[str, Any] = None):
  61. """
  62. 初始化事件推送器
  63. Args:
  64. config: 配置字典
  65. """
  66. self.config = config or {}
  67. # API 配置
  68. self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
  69. self.api_port = self.config.get('api_port', 8583)
  70. self.use_https = self.config.get('use_https', True)
  71. # 基础 URL(优先使用配置中的 base_url)
  72. self.base_url = self.config.get('base_url')
  73. if not self.base_url:
  74. protocol = 'https' if self.use_https else 'http'
  75. self.base_url = f"{protocol}://{self.api_host}:{self.api_port}"
  76. # 上传接口
  77. self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
  78. self.event_url = self.config.get('event_url', '/api/system/event')
  79. # 推送控制
  80. self.enabled = self.config.get('enabled', True)
  81. self.upload_interval = self.config.get('upload_interval', 2.0) # 推送间隔
  82. self.retry_count = self.config.get('retry_count', 3) # 重试次数
  83. self.retry_delay = self.config.get('retry_delay', 1.0) # 重试延迟
  84. # 事件队列
  85. self.event_queue = queue.Queue()
  86. # 工作线程
  87. self.running = False
  88. self.worker_thread = None
  89. # 上次推送时间
  90. self.last_push_time = 0
  91. # 统计
  92. self.stats = {
  93. 'total_events': 0,
  94. 'pushed_events': 0,
  95. 'failed_events': 0,
  96. 'upload_success': 0,
  97. 'upload_failed': 0
  98. }
  99. self.stats_lock = threading.Lock()
  100. def start(self):
  101. """启动推送器"""
  102. if self.running:
  103. return
  104. self.running = True
  105. self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  106. self.worker_thread.start()
  107. print("事件推送器已启动")
  108. def stop(self):
  109. """停止推送器"""
  110. self.running = False
  111. if self.worker_thread:
  112. self.worker_thread.join(timeout=3)
  113. print("事件推送器已停止")
  114. def push_event(self, event: SafetyEvent):
  115. """
  116. 推送事件
  117. Args:
  118. event: 安全事件
  119. """
  120. if not self.enabled:
  121. return
  122. # 检查推送间隔
  123. current_time = time.time()
  124. if current_time - self.last_push_time < self.upload_interval:
  125. return
  126. self.event_queue.put(event)
  127. with self.stats_lock:
  128. self.stats['total_events'] += 1
  129. def push_safety_violation(self, description: str, image: np.ndarray = None,
  130. track_id: int = 0, confidence: float = 0.0,
  131. location: str = "施工现场") -> bool:
  132. """
  133. 推送安全违规事件
  134. Args:
  135. description: 违规描述
  136. image: 图像 (可选)
  137. track_id: 跟踪ID
  138. confidence: 置信度
  139. location: 位置
  140. Returns:
  141. 是否成功加入队列
  142. """
  143. event = SafetyEvent(
  144. event_type=EventType.SAFETY_VIOLATION,
  145. description=description,
  146. track_id=track_id,
  147. confidence=confidence,
  148. location=location
  149. )
  150. # 保存图像
  151. if image is not None:
  152. temp_path = f"/tmp/safety_event_{int(time.time() * 1000)}.jpg"
  153. cv2.imwrite(temp_path, image)
  154. event.image_path = temp_path
  155. self.push_event(event)
  156. return True
  157. def upload_numpy_image(self, image: np.ndarray) -> Optional[str]:
  158. """
  159. 将 numpy 图片上传到 OSS
  160. Args:
  161. image: numpy 图像数组
  162. Returns:
  163. 图片URL或None
  164. """
  165. if image is None:
  166. return None
  167. temp_path = None
  168. try:
  169. with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
  170. temp_path = f.name
  171. cv2.imwrite(temp_path, image)
  172. url = self._upload_image(temp_path)
  173. return url
  174. except Exception as e:
  175. print(f"上传 numpy 图片失败: {e}")
  176. return None
  177. finally:
  178. if temp_path:
  179. try:
  180. os.remove(temp_path)
  181. except Exception:
  182. pass
  183. def push_tracking_capture(self, batch_time: float, captures: List[dict]):
  184. """
  185. 推送一轮多目标跟踪抓拍事件
  186. Args:
  187. batch_time: 批次时间戳
  188. captures: 抓拍记录列表
  189. Returns:
  190. 响应对象或None
  191. """
  192. payload = {
  193. "eventType": "TRACKING_CAPTURE",
  194. "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
  195. "deviceId": self.config.get("device_id"),
  196. "data": {
  197. "captureCount": len(captures),
  198. "captures": captures,
  199. }
  200. }
  201. url = f"{self.base_url}{self.event_url}"
  202. return self._post(url, payload)
  203. def _post(self, url: str, json_data: dict):
  204. """
  205. 发送 POST 请求
  206. Args:
  207. url: 请求URL
  208. json_data: JSON数据
  209. Returns:
  210. 响应对象或None
  211. """
  212. for attempt in range(self.retry_count):
  213. try:
  214. response = requests.post(url, json=json_data, verify=False, timeout=10)
  215. return response
  216. except Exception as e:
  217. print(f"POST 请求异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  218. if attempt < self.retry_count - 1:
  219. time.sleep(self.retry_delay)
  220. return None
  221. def _worker(self):
  222. """工作线程"""
  223. while self.running:
  224. try:
  225. # 获取事件
  226. try:
  227. event = self.event_queue.get(timeout=1.0)
  228. except queue.Empty:
  229. continue
  230. # 处理事件
  231. self._process_event(event)
  232. except Exception as e:
  233. print(f"事件处理错误: {e}")
  234. def _process_event(self, event: SafetyEvent):
  235. """处理单个事件"""
  236. try:
  237. # 上传图片
  238. if event.image_path:
  239. image_url = self._upload_image(event.image_path)
  240. if image_url:
  241. event.image_url = image_url
  242. with self.stats_lock:
  243. self.stats['upload_success'] += 1
  244. else:
  245. with self.stats_lock:
  246. self.stats['upload_failed'] += 1
  247. # 清理临时文件
  248. if os.path.exists(event.image_path):
  249. try:
  250. os.remove(event.image_path)
  251. except:
  252. pass
  253. # 创建事件
  254. success = self._create_event(event)
  255. if success:
  256. self.last_push_time = time.time()
  257. with self.stats_lock:
  258. self.stats['pushed_events'] += 1
  259. print(f"事件推送成功: {event.description}")
  260. else:
  261. with self.stats_lock:
  262. self.stats['failed_events'] += 1
  263. print(f"事件推送失败: {event.description}")
  264. except Exception as e:
  265. print(f"处理事件错误: {e}")
  266. with self.stats_lock:
  267. self.stats['failed_events'] += 1
  268. def _upload_image(self, image_path: str) -> Optional[str]:
  269. """
  270. 上传图片到 OSS
  271. Args:
  272. image_path: 图片路径
  273. Returns:
  274. 图片URL或None
  275. """
  276. if not os.path.exists(image_path):
  277. return None
  278. for attempt in range(self.retry_count):
  279. conn = None
  280. try:
  281. filename = os.path.basename(image_path)
  282. # 创建连接
  283. if self.use_https:
  284. conn = http.client.HTTPSConnection(self.api_host, self.api_port)
  285. else:
  286. conn = http.client.HTTPConnection(self.api_host, self.api_port)
  287. # 准备 multipart/form-data
  288. boundary = f'wL36Yn8afVp8Ag7AmP8qZ0SA4n1v9T{int(time.time())}'
  289. dataList = []
  290. dataList.append(encode(f'--{boundary}'))
  291. dataList.append(encode(f'Content-Disposition: form-data; name=file; filename={filename}'))
  292. # 文件类型
  293. fileType = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
  294. dataList.append(encode(f'Content-Type: {fileType}'))
  295. dataList.append(encode(''))
  296. # 读取文件
  297. with open(image_path, 'rb') as f:
  298. dataList.append(f.read())
  299. dataList.append(encode(f'--{boundary}--'))
  300. dataList.append(encode(''))
  301. body = b'\r\n'.join(dataList)
  302. headers = {
  303. 'User-Agent': 'SafetySystem/1.0',
  304. 'Accept': '*/*',
  305. 'Host': f'{self.api_host}:{self.api_port}',
  306. 'Connection': 'keep-alive',
  307. 'Content-Type': f'multipart/form-data; boundary={boundary}'
  308. }
  309. conn.request("POST", self.upload_url, body, headers)
  310. res = conn.getresponse()
  311. data = res.read()
  312. if res.status == 200:
  313. result = json.loads(data.decode("utf-8"))
  314. if result.get('code') == 200:
  315. return result.get('data', {}).get('purl')
  316. else:
  317. print(f"上传失败: {result.get('msg', '未知错误')}")
  318. else:
  319. print(f"上传失败: HTTP {res.status}")
  320. except Exception as e:
  321. print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  322. if attempt < self.retry_count - 1:
  323. time.sleep(self.retry_delay)
  324. finally:
  325. if conn:
  326. conn.close()
  327. return None
  328. def _create_event(self, event: SafetyEvent) -> bool:
  329. """
  330. 在业务平台创建事件
  331. Args:
  332. event: 安全事件
  333. Returns:
  334. 是否成功
  335. """
  336. for attempt in range(self.retry_count):
  337. try:
  338. # 构建请求
  339. if self.use_https:
  340. base_url = f"https://{self.api_host}:{self.api_port}"
  341. else:
  342. base_url = f"http://{self.api_host}:{self.api_port}"
  343. url = f"{base_url}{self.event_url}"
  344. create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(event.timestamp))
  345. data = {
  346. "createTime": create_time,
  347. "addr": event.description,
  348. "ext1": json.dumps([event.image_url]) if event.image_url else "[]",
  349. "ext2": json.dumps({
  350. "lx": "工地安全",
  351. "type": event.event_type.value,
  352. "trackId": event.track_id,
  353. "confidence": event.confidence,
  354. "location": event.location
  355. })
  356. }
  357. response = requests.post(url, json=data, verify=False, timeout=10)
  358. if response.status_code == 200:
  359. result = response.json()
  360. if result.get('code') == 200:
  361. return True
  362. else:
  363. print(f"创建事件失败: {result.get('msg', '未知错误')}")
  364. else:
  365. print(f"创建事件失败: HTTP {response.status_code}")
  366. except Exception as e:
  367. print(f"创建事件异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
  368. if attempt < self.retry_count - 1:
  369. time.sleep(self.retry_delay)
  370. return False
  371. def get_stats(self) -> Dict[str, int]:
  372. """获取统计信息"""
  373. with self.stats_lock:
  374. return self.stats.copy()
  375. class EventListener:
  376. """
  377. 事件监听器
  378. 监听业务平台的指令(如语音播放指令)
  379. """
  380. def __init__(self, config: Dict[str, Any] = None):
  381. """
  382. 初始化事件监听器
  383. Args:
  384. config: 配置字典
  385. """
  386. self.config = config or {}
  387. # WebSocket 或 HTTP 长轮询配置
  388. self.listen_url = self.config.get('listen_url', '')
  389. self.poll_interval = self.config.get('poll_interval', 5.0)
  390. # 回调
  391. self.on_voice_command = None
  392. self.on_other_command = None
  393. # 运行状态
  394. self.running = False
  395. self.listener_thread = None
  396. def start(self):
  397. """启动监听"""
  398. if self.running:
  399. return
  400. self.running = True
  401. self.listener_thread = threading.Thread(target=self._listener_worker, daemon=True)
  402. self.listener_thread.start()
  403. print("事件监听器已启动")
  404. def stop(self):
  405. """停止监听"""
  406. self.running = False
  407. if self.listener_thread:
  408. self.listener_thread.join(timeout=3)
  409. print("事件监听器已停止")
  410. def _listener_worker(self):
  411. """监听工作线程"""
  412. while self.running:
  413. try:
  414. # 轮询获取指令
  415. # TODO: 实现 WebSocket 或 HTTP 长轮询
  416. commands = self._poll_commands()
  417. for cmd in commands:
  418. self._process_command(cmd)
  419. time.sleep(self.poll_interval)
  420. except Exception as e:
  421. print(f"监听错误: {e}")
  422. time.sleep(1.0)
  423. def _poll_commands(self) -> List[Dict[str, Any]]:
  424. """
  425. 轮询获取指令
  426. Returns:
  427. 指令列表
  428. """
  429. # TODO: 实现具体的轮询逻辑
  430. # 这里可以对接业务平台的指令接口
  431. return []
  432. def _process_command(self, cmd: Dict[str, Any]):
  433. """处理指令"""
  434. cmd_type = cmd.get('type', '')
  435. if cmd_type == 'voice':
  436. # 语音播放指令
  437. if self.on_voice_command:
  438. self.on_voice_command(cmd)
  439. else:
  440. # 其他指令
  441. if self.on_other_command:
  442. self.on_other_command(cmd)
  443. def set_voice_callback(self, callback):
  444. """设置语音播放回调"""
  445. self.on_voice_command = callback
  446. def set_other_callback(self, callback):
  447. """设置其他指令回调"""
  448. self.on_other_command = callback