| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- """
- 事件推送模块
- 当前仅保留跟踪抓拍事件的多图片上传与批量推送功能。
- OCR/LLM/安全违规相关事件推送已移除。
- """
- import os
- import time
- import json
- import tempfile
- import threading
- import requests
- import mimetypes
- from typing import Optional, Dict, Any, List
- from datetime import datetime
- import cv2
- import numpy as np
class EventPusher:
    """Push tracking-capture events to the business platform.

    The pusher uploads snapshot images to the platform's OSS upload endpoint
    and posts batched ``TRACKING_CAPTURE`` events to the event endpoint.
    Counters for events and uploads are kept in ``stats`` and are guarded by
    ``stats_lock``.

    Recognised ``config`` keys (all optional):
        api_host, api_port, use_https: used to build the base URL when
            ``base_url`` is not given.
        base_url: full base URL; takes precedence over host/port/https.
        upload_url, event_url: endpoint paths appended to the base URL.
        enabled: when false, no upload or push request is sent.
        retry_count, retry_delay: number of attempts and the pause in
            seconds between attempts.
        timeout: per-request timeout in seconds (default 10).
        verify_ssl: value passed as ``verify`` to ``requests`` (default
            False, matching the previous hard-coded behaviour).
        device_id: sent as ``deviceId`` in the event payload.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialise the pusher from an optional configuration dict.

        Args:
            config: configuration dictionary; ``None`` means "use defaults".
        """
        self.config = config or {}

        # API endpoint configuration.
        self.api_host = self.config.get('api_host', 'jtjai.device.wenhq.top')
        self.api_port = self.config.get('api_port', 8583)
        self.use_https = self.config.get('use_https', True)

        # An explicit base_url wins; otherwise derive it from host/port.
        self.base_url = self.config.get('base_url')
        if not self.base_url:
            protocol = 'https' if self.use_https else 'http'
            self.base_url = f"{protocol}://{self.api_host}:{self.api_port}"

        # Endpoint paths.
        self.upload_url = self.config.get('upload_url', '/api/resource/oss/upload')
        self.event_url = self.config.get('event_url', '/api/system/event')

        # Push control.
        self.enabled = self.config.get('enabled', True)
        self.retry_count = self.config.get('retry_count', 3)
        self.retry_delay = self.config.get('retry_delay', 1.0)
        self.timeout = self.config.get('timeout', 10)
        # NOTE(security): certificate verification is off by default to keep
        # the previous behaviour; set ``verify_ssl`` to True (or a CA bundle
        # path) in production so HTTPS connections are actually authenticated.
        self.verify_ssl = self.config.get('verify_ssl', False)

        # Lifecycle flag toggled by start()/stop().
        self.running = False

        # Counters; always read or write them while holding stats_lock.
        self.stats = {
            'total_events': 0,
            'pushed_events': 0,
            'failed_events': 0,
            'upload_success': 0,
            'upload_failed': 0
        }
        self.stats_lock = threading.Lock()

    def start(self):
        """Mark the pusher as running.

        Calls are stateless HTTP requests, so nothing is spawned here; the
        method exists so the pusher fits a uniform start/stop lifecycle.
        """
        self.running = True
        print("事件推送器已启动")

    def stop(self):
        """Mark the pusher as stopped."""
        self.running = False
        print("事件推送器已停止")

    def upload_numpy_image(self, image: Optional["np.ndarray"]) -> Optional[str]:
        """Encode a numpy image as JPEG and upload it to OSS.

        Args:
            image: image array accepted by ``cv2.imwrite``, or ``None``.

        Returns:
            The uploaded image URL, or ``None`` when the image is ``None``,
            the pusher is disabled, encoding fails, or the upload fails.
        """
        if image is None or not self.enabled:
            return None

        temp_path = None
        try:
            # Reserve a file name, then close the handle so OpenCV can write
            # to the path itself.
            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as f:
                temp_path = f.name
            # cv2.imwrite reports failure through its return value; without
            # this check an empty temp file would be uploaded.
            if not cv2.imwrite(temp_path, image):
                print("上传 numpy 图片失败: 图片编码失败")
                return None
            return self._upload_image(temp_path)
        except Exception as e:
            # Best-effort boundary: an upload problem must not crash callers.
            print(f"上传 numpy 图片失败: {e}")
            return None
        finally:
            if temp_path:
                try:
                    os.remove(temp_path)
                except OSError:
                    # Cleanup is best-effort; a leftover temp file is harmless.
                    pass

    def push_tracking_capture(self, batch_time: float, captures: List[Dict[str, Any]]) -> Optional["requests.Response"]:
        """Push one batch of multi-target tracking-capture records.

        Args:
            batch_time: batch timestamp in seconds since the epoch.
            captures: list of capture records; must be JSON-serialisable.

        Returns:
            The HTTP response, or ``None`` when the pusher is disabled or
            every attempt raised. A response with an error status is still
            returned, but it is counted as a failed event.
        """
        if not self.enabled:
            return None

        captures = captures or []
        payload = {
            "eventType": "TRACKING_CAPTURE",
            "eventTime": datetime.fromtimestamp(batch_time).isoformat(),
            "deviceId": self.config.get("device_id"),
            "data": {
                "captureCount": len(captures),
                "captures": captures,
            }
        }

        self._bump('total_events')
        response = self._post(self._build_url(self.event_url), payload)

        # Only a response with a non-error HTTP status counts as delivered.
        delivered = response is not None and response.ok
        self._bump('pushed_events' if delivered else 'failed_events')
        if response is not None and not delivered:
            print(f"事件推送失败: HTTP {response.status_code}")
        return response

    def get_stats(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self.stats_lock:
            return self.stats.copy()

    def _bump(self, key: str) -> None:
        """Increment one counter in ``stats`` while holding the lock."""
        with self.stats_lock:
            self.stats[key] += 1

    def _build_url(self, path: str) -> str:
        """Join ``base_url`` and an endpoint path with exactly one slash."""
        return f"{str(self.base_url).rstrip('/')}/{str(path).lstrip('/')}"

    @staticmethod
    def _extract_upload_url(result: Any) -> Optional[str]:
        """Return the file URL from a parsed upload response body.

        Args:
            result: decoded JSON body of the upload response.

        Returns:
            ``result['data']['purl']`` when ``result['code'] == 200`` and the
            URL is present and non-empty; otherwise ``None``.
        """
        if not isinstance(result, dict) or result.get('code') != 200:
            return None
        data = result.get('data')
        # "data" may be missing or JSON null; both mean "no URL".
        if not isinstance(data, dict):
            return None
        return data.get('purl') or None

    def _post(self, url: str, json_data: Dict[str, Any]) -> Optional["requests.Response"]:
        """Send a JSON POST request, retrying when the request raises.

        Args:
            url: request URL.
            json_data: JSON-serialisable request body.

        Returns:
            The first response received (whatever its status code), or
            ``None`` if every attempt raised.
        """
        for attempt in range(self.retry_count):
            try:
                return requests.post(
                    url,
                    json=json_data,
                    verify=self.verify_ssl,
                    timeout=self.timeout
                )
            except Exception as e:
                print(f"POST 请求异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
                # No pause after the final attempt.
                if attempt < self.retry_count - 1:
                    time.sleep(self.retry_delay)
        return None

    def _upload_image(self, image_path: str) -> Optional[str]:
        """Upload an image file to OSS, retrying on failure.

        Args:
            image_path: path of the image file to upload.

        Returns:
            The uploaded image URL, or ``None`` if the file does not exist
            or no attempt produced a URL.
        """
        if not os.path.exists(image_path):
            return None

        filename = os.path.basename(image_path)
        content_type = mimetypes.guess_type(image_path)[0] or 'image/jpeg'
        url = self._build_url(self.upload_url)

        for attempt in range(self.retry_count):
            try:
                # Reopen per attempt so every retry sends the file from the start.
                with open(image_path, 'rb') as f:
                    files = {'file': (filename, f, content_type)}
                    response = requests.post(
                        url,
                        files=files,
                        headers={'User-Agent': 'SafetySystem/1.0'},
                        verify=self.verify_ssl,
                        timeout=self.timeout
                    )
                if response.status_code == 200:
                    result = response.json()
                    purl = self._extract_upload_url(result)
                    if purl:
                        # Count success only once a usable URL is in hand.
                        self._bump('upload_success')
                        return purl
                    msg = result.get('msg', '未知错误') if isinstance(result, dict) else '未知错误'
                    print(f"上传失败: {msg}")
                else:
                    print(f"上传失败: HTTP {response.status_code}")
            except Exception as e:
                print(f"上传异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
            # No pause after the final attempt.
            if attempt < self.retry_count - 1:
                time.sleep(self.retry_delay)

        self._bump('upload_failed')
        return None
|