oss_uploader.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. """
  2. OSS 上传模块
  3. 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
  4. """
  5. import os
  6. import time
  7. import json
  8. import logging
  9. import threading
  10. import queue
  11. from typing import Optional, Dict, Any, Tuple, List
  12. from pathlib import Path
  13. from datetime import datetime
  14. from dataclasses import dataclass, field
  15. import cv2
  16. import numpy as np
  17. logger = logging.getLogger(__name__)
  18. # 尝试导入 boto3
  19. try:
  20. import boto3
  21. from botocore.exceptions import ClientError
  22. BOTO3_AVAILABLE = True
  23. except ImportError:
  24. BOTO3_AVAILABLE = False
  25. logger.warning("boto3 未安装,S3 兼容存储功能不可用")
  26. @dataclass
  27. class UploadTask:
  28. """上传任务"""
  29. local_path: str
  30. oss_key: str
  31. batch_id: str
  32. image_type: str # 'panorama' 或 'ptz'
  33. person_index: Optional[int] = None # 仅用于 PTZ 图片
  34. retry_count: int = 0
  35. callback: Optional[callable] = None
  36. @dataclass
  37. class UploadResult:
  38. """上传结果"""
  39. success: bool
  40. local_path: str
  41. oss_key: str
  42. oss_url: str
  43. error: Optional[str] = None
  44. class OSSUploader:
  45. """
  46. OSS 上传器
  47. 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
  48. """
  49. def __init__(self, config: Dict[str, Any] = None):
  50. """
  51. 初始化 OSS 上传器
  52. Args:
  53. config: OSS 配置字典
  54. """
  55. from config import S3_COMPATIBLE_CONFIG
  56. self.config = config or S3_COMPATIBLE_CONFIG
  57. self.enabled = self.config.get('enabled', False)
  58. # S3 客户端
  59. self.s3_client = None
  60. # 上传队列
  61. self.upload_queue = queue.Queue()
  62. self.result_queue = queue.Queue()
  63. # 工作线程
  64. self.running = False
  65. self.worker_thread = None
  66. # 统计
  67. self.stats = {
  68. 'total_uploads': 0,
  69. 'success_uploads': 0,
  70. 'failed_uploads': 0,
  71. }
  72. self.stats_lock = threading.Lock()
  73. # 初始化客户端
  74. if self.enabled:
  75. self._init_client()
  76. def _init_client(self):
  77. """初始化 OSS 客户端"""
  78. try:
  79. self._init_s3_client()
  80. except Exception as e:
  81. logger.error(f"[OSS] 初始化客户端失败: {e}")
  82. self.enabled = False
  83. def _init_s3_client(self):
  84. """初始化 S3 兼容客户端"""
  85. if not BOTO3_AVAILABLE:
  86. logger.error("[OSS] boto3 未安装")
  87. self.enabled = False
  88. return
  89. if not self.config.get('enabled', False):
  90. logger.error("[OSS] S3 兼容配置未启用")
  91. self.enabled = False
  92. return
  93. endpoint_url = self.config.get('endpoint_url', '')
  94. region_name = self.config.get('region_name', 'us-east-1')
  95. access_key_id = self.config.get('access_key_id', '')
  96. secret_access_key = self.config.get('secret_access_key', '')
  97. if not all([endpoint_url, access_key_id, secret_access_key]):
  98. logger.error("[OSS] S3 配置不完整")
  99. self.enabled = False
  100. return
  101. self.s3_client = boto3.client(
  102. 's3',
  103. endpoint_url=endpoint_url,
  104. region_name=region_name,
  105. aws_access_key_id=access_key_id,
  106. aws_secret_access_key=secret_access_key,
  107. use_ssl=self.config.get('use_ssl', True),
  108. verify=False
  109. )
  110. logger.info(f"[OSS] S3 客户端初始化成功: {endpoint_url}")
  111. def start(self):
  112. """启动上传器"""
  113. if not self.enabled:
  114. logger.info("[OSS] 上传器未启用")
  115. return
  116. if self.running:
  117. return
  118. self.running = True
  119. self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
  120. self.worker_thread.start()
  121. logger.info("[OSS] 上传器已启动")
  122. def stop(self):
  123. """停止上传器"""
  124. self.running = False
  125. if self.worker_thread:
  126. self.worker_thread.join(timeout=5)
  127. logger.info("[OSS] 上传器已停止")
  128. def _worker_loop(self):
  129. """工作线程循环"""
  130. while self.running:
  131. try:
  132. task = self.upload_queue.get(timeout=1.0)
  133. self._process_upload(task)
  134. except queue.Empty:
  135. continue
  136. except Exception as e:
  137. logger.error(f"[OSS] 上传处理错误: {e}")
  138. def _process_upload(self, task: UploadTask):
  139. """处理单个上传任务"""
  140. result = self._upload_file(task)
  141. with self.stats_lock:
  142. self.stats['total_uploads'] += 1
  143. if result.success:
  144. self.stats['success_uploads'] += 1
  145. else:
  146. self.stats['failed_uploads'] += 1
  147. # 调用回调
  148. if task.callback:
  149. try:
  150. task.callback(result)
  151. except Exception as e:
  152. logger.error(f"[OSS] 回调执行错误: {e}")
  153. # 放入结果队列
  154. self.result_queue.put(result)
  155. def _upload_file(self, task: UploadTask) -> UploadResult:
  156. """
  157. 上传文件到 OSS
  158. Args:
  159. task: 上传任务
  160. Returns:
  161. UploadResult: 上传结果
  162. """
  163. local_path = task.local_path
  164. oss_key = task.oss_key
  165. if not os.path.exists(local_path):
  166. return UploadResult(
  167. success=False,
  168. local_path=local_path,
  169. oss_key=oss_key,
  170. oss_url='',
  171. error='文件不存在'
  172. )
  173. max_retries = self.config.get('retry_times', 3)
  174. for attempt in range(max_retries):
  175. try:
  176. if self.s3_client:
  177. # S3 兼容上传
  178. bucket_name = self.config.get('bucket_name', '')
  179. self.s3_client.upload_file(local_path, bucket_name, oss_key)
  180. # 生成 URL
  181. custom_domain = self.config.get('custom_domain', '')
  182. if custom_domain:
  183. oss_url = f"{custom_domain}/{oss_key}"
  184. else:
  185. endpoint_url = self.config.get('endpoint_url', '')
  186. oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
  187. else:
  188. return UploadResult(
  189. success=False,
  190. local_path=local_path,
  191. oss_key=oss_key,
  192. oss_url='',
  193. error='OSS 客户端未初始化'
  194. )
  195. logger.info(f"[OSS] 上传成功: {local_path} -> {oss_key}")
  196. return UploadResult(
  197. success=True,
  198. local_path=local_path,
  199. oss_key=oss_key,
  200. oss_url=oss_url
  201. )
  202. except Exception as e:
  203. error_msg = str(e)
  204. logger.warning(f"[OSS] 上传失败 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
  205. if attempt < max_retries - 1:
  206. time.sleep(1)
  207. else:
  208. return UploadResult(
  209. success=False,
  210. local_path=local_path,
  211. oss_key=oss_key,
  212. oss_url='',
  213. error=error_msg
  214. )
  215. return UploadResult(
  216. success=False,
  217. local_path=local_path,
  218. oss_key=oss_key,
  219. oss_url='',
  220. error='达到最大重试次数'
  221. )
  222. def upload_image(self, local_path: str, batch_id: str, image_type: str,
  223. person_index: Optional[int] = None,
  224. callback: Optional[callable] = None) -> str:
  225. """
  226. 上传图片(异步)
  227. Args:
  228. local_path: 本地图片路径
  229. batch_id: 批次ID
  230. image_type: 图片类型 ('panorama' 或 'ptz')
  231. person_index: 人员序号(仅用于 PTZ 图片)
  232. callback: 上传完成回调函数
  233. Returns:
  234. oss_key: OSS 对象键(用于后续查询结果)
  235. """
  236. if not self.enabled:
  237. return ''
  238. # 生成 OSS 路径
  239. timestamp = datetime.now().strftime("%Y%m%d")
  240. filename = os.path.basename(local_path)
  241. path_prefix = self.config.get('path_prefix', 'safety-system')
  242. if image_type == 'panorama':
  243. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
  244. else:
  245. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
  246. # 创建上传任务
  247. task = UploadTask(
  248. local_path=local_path,
  249. oss_key=oss_key,
  250. batch_id=batch_id,
  251. image_type=image_type,
  252. person_index=person_index,
  253. callback=callback
  254. )
  255. # 加入队列
  256. self.upload_queue.put(task)
  257. return oss_key
  258. def upload_image_sync(self, local_path: str, batch_id: str,
  259. image_type: str,
  260. person_index: Optional[int] = None) -> UploadResult:
  261. """
  262. 同步上传图片
  263. Args:
  264. local_path: 本地图片路径
  265. batch_id: 批次ID
  266. image_type: 图片类型
  267. person_index: 人员序号
  268. Returns:
  269. UploadResult: 上传结果
  270. """
  271. if not self.enabled:
  272. return UploadResult(
  273. success=False,
  274. local_path=local_path,
  275. oss_key='',
  276. oss_url='',
  277. error='OSS 未启用'
  278. )
  279. # 生成 OSS 路径
  280. timestamp = datetime.now().strftime("%Y%m%d")
  281. filename = os.path.basename(local_path)
  282. path_prefix = self.config.get('path_prefix', 'safety-system')
  283. if image_type == 'panorama':
  284. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
  285. else:
  286. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
  287. task = UploadTask(
  288. local_path=local_path,
  289. oss_key=oss_key,
  290. batch_id=batch_id,
  291. image_type=image_type,
  292. person_index=person_index
  293. )
  294. return self._upload_file(task)
  295. def get_upload_result(self, timeout: float = 0.1) -> Optional[UploadResult]:
  296. """
  297. 获取上传结果(非阻塞)
  298. Args:
  299. timeout: 等待超时时间
  300. Returns:
  301. UploadResult 或 None
  302. """
  303. try:
  304. return self.result_queue.get(timeout=timeout)
  305. except queue.Empty:
  306. return None
  307. def get_stats(self) -> Dict[str, int]:
  308. """获取统计信息"""
  309. with self.stats_lock:
  310. return self.stats.copy()
  311. def is_enabled(self) -> bool:
  312. """检查是否启用"""
  313. return self.enabled
  314. # 全局单例
  315. _oss_uploader_instance: Optional[OSSUploader] = None
  316. def get_oss_uploader(config: Dict[str, Any] = None) -> OSSUploader:
  317. """
  318. 获取 OSS 上传器实例(单例模式)
  319. Args:
  320. config: OSS 配置
  321. Returns:
  322. OSSUploader 实例
  323. """
  324. global _oss_uploader_instance
  325. if _oss_uploader_instance is None:
  326. _oss_uploader_instance = OSSUploader(config)
  327. return _oss_uploader_instance
  328. def reset_oss_uploader():
  329. """重置 OSS 上传器实例"""
  330. global _oss_uploader_instance
  331. if _oss_uploader_instance is not None:
  332. _oss_uploader_instance.stop()
  333. _oss_uploader_instance = None