oss_uploader.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. """
  2. OSS 上传模块
  3. 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
  4. """
  5. import os
  6. import time
  7. import json
  8. import logging
  9. import threading
  10. import queue
  11. from typing import Optional, Dict, Any, Tuple, List
  12. from pathlib import Path
  13. from datetime import datetime
  14. from dataclasses import dataclass, field
  15. import cv2
  16. import numpy as np
  17. logger = logging.getLogger(__name__)
  18. # 尝试导入 boto3
  19. try:
  20. import boto3
  21. from botocore.exceptions import ClientError
  22. BOTO3_AVAILABLE = True
  23. except ImportError:
  24. BOTO3_AVAILABLE = False
  25. logger.warning("boto3 未安装,S3 兼容存储功能不可用")
  26. @dataclass
  27. class UploadTask:
  28. """上传任务"""
  29. local_path: str
  30. oss_key: str
  31. batch_id: str
  32. image_type: str # 'panorama' 或 'ptz'
  33. person_index: Optional[int] = None # 仅用于 PTZ 图片
  34. retry_count: int = 0
  35. callback: Optional[callable] = None
  36. @dataclass
  37. class UploadResult:
  38. """上传结果"""
  39. success: bool
  40. local_path: str
  41. oss_key: str
  42. oss_url: str
  43. error: Optional[str] = None
  44. class OSSUploader:
  45. """
  46. OSS 上传器
  47. 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
  48. """
  49. def __init__(self, config: Dict[str, Any] = None):
  50. """
  51. 初始化 OSS 上传器
  52. Args:
  53. config: OSS 配置字典
  54. """
  55. from config import S3_COMPATIBLE_CONFIG
  56. self.config = config or S3_COMPATIBLE_CONFIG
  57. self.enabled = self.config.get('enabled', False)
  58. logger.info(f"[OSS] OSSUploader 初始化: enabled={self.enabled}, config={self.config}")
  59. # S3 客户端
  60. self.s3_client = None
  61. # 上传队列
  62. self.upload_queue = queue.Queue()
  63. self.result_queue = queue.Queue()
  64. # 工作线程
  65. self.running = False
  66. self.worker_thread = None
  67. # 统计
  68. self.stats = {
  69. 'total_uploads': 0,
  70. 'success_uploads': 0,
  71. 'failed_uploads': 0,
  72. }
  73. self.stats_lock = threading.Lock()
  74. # 初始化客户端
  75. if self.enabled:
  76. self._init_client()
  77. def _init_client(self):
  78. """初始化 OSS 客户端"""
  79. logger.info("[OSS] _init_client 开始...")
  80. try:
  81. self._init_s3_client()
  82. logger.info(f"[OSS] _init_client 完成: enabled={self.enabled}")
  83. except Exception as e:
  84. logger.error(f"[OSS] 初始化客户端失败: {e}")
  85. self.enabled = False
  86. def _init_s3_client(self):
  87. """初始化 S3 兼容客户端"""
  88. logger.info(f"[OSS] _init_s3_client 开始: boto3={BOTO3_AVAILABLE}, enabled={self.config.get('enabled', False)}")
  89. if not BOTO3_AVAILABLE:
  90. logger.error("[OSS] boto3 未安装")
  91. self.enabled = False
  92. return
  93. if not self.config.get('enabled', False):
  94. logger.error("[OSS] S3 兼容配置未启用")
  95. self.enabled = False
  96. return
  97. endpoint_url = self.config.get('endpoint_url', '')
  98. region_name = self.config.get('region_name', 'us-east-1')
  99. access_key_id = self.config.get('access_key_id', '')
  100. secret_access_key = self.config.get('secret_access_key', '')
  101. logger.info(f"[OSS] 配置检查: endpoint={endpoint_url}, key={access_key_id[:4] if access_key_id else 'None'}..., bucket={self.config.get('bucket_name', '')}")
  102. if not all([endpoint_url, access_key_id, secret_access_key]):
  103. logger.error("[OSS] S3 配置不完整")
  104. self.enabled = False
  105. return
  106. logger.info("[OSS] 尝试创建 S3 客户端...")
  107. self.s3_client = boto3.client(
  108. 's3',
  109. endpoint_url=endpoint_url,
  110. region_name=region_name,
  111. aws_access_key_id=access_key_id,
  112. aws_secret_access_key=secret_access_key,
  113. use_ssl=self.config.get('use_ssl', True),
  114. verify=False
  115. )
  116. logger.info(f"[OSS] S3 客户端初始化成功: {endpoint_url}")
  117. def start(self):
  118. """启动上传器"""
  119. logger.info(f"[OSS] start() 被调用: enabled={self.enabled}, running={self.running}")
  120. if not self.enabled:
  121. logger.info("[OSS] 上传器未启用,直接返回")
  122. return
  123. if self.running:
  124. logger.info("[OSS] 上传器已经在运行")
  125. return
  126. self.running = True
  127. self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
  128. self.worker_thread.start()
  129. logger.info("[OSS] 上传器已启动")
  130. def stop(self):
  131. """停止上传器"""
  132. self.running = False
  133. if self.worker_thread:
  134. self.worker_thread.join(timeout=5)
  135. logger.info("[OSS] 上传器已停止")
  136. def _worker_loop(self):
  137. """工作线程循环"""
  138. while self.running:
  139. try:
  140. task = self.upload_queue.get(timeout=1.0)
  141. self._process_upload(task)
  142. except queue.Empty:
  143. continue
  144. except Exception as e:
  145. logger.error(f"[OSS] 上传处理错误: {e}")
  146. def _process_upload(self, task: UploadTask):
  147. """处理单个上传任务"""
  148. result = self._upload_file(task)
  149. with self.stats_lock:
  150. self.stats['total_uploads'] += 1
  151. if result.success:
  152. self.stats['success_uploads'] += 1
  153. else:
  154. self.stats['failed_uploads'] += 1
  155. # 调用回调
  156. if task.callback:
  157. try:
  158. task.callback(result)
  159. except Exception as e:
  160. logger.error(f"[OSS] 回调执行错误: {e}")
  161. # 放入结果队列
  162. self.result_queue.put(result)
  163. def _upload_file(self, task: UploadTask) -> UploadResult:
  164. """
  165. 上传文件到 OSS
  166. Args:
  167. task: 上传任务
  168. Returns:
  169. UploadResult: 上传结果
  170. """
  171. local_path = task.local_path
  172. oss_key = task.oss_key
  173. if not os.path.exists(local_path):
  174. return UploadResult(
  175. success=False,
  176. local_path=local_path,
  177. oss_key=oss_key,
  178. oss_url='',
  179. error='文件不存在'
  180. )
  181. max_retries = self.config.get('retry_times', 3)
  182. for attempt in range(max_retries):
  183. try:
  184. if self.s3_client:
  185. # S3 兼容上传
  186. bucket_name = self.config.get('bucket_name', '')
  187. self.s3_client.upload_file(local_path, bucket_name, oss_key)
  188. # 生成 URL
  189. custom_domain = self.config.get('custom_domain', '')
  190. if custom_domain:
  191. oss_url = f"{custom_domain}/{oss_key}"
  192. else:
  193. endpoint_url = self.config.get('endpoint_url', '')
  194. oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
  195. else:
  196. return UploadResult(
  197. success=False,
  198. local_path=local_path,
  199. oss_key=oss_key,
  200. oss_url='',
  201. error='OSS 客户端未初始化'
  202. )
  203. logger.info(f"[OSS] 上传成功: {local_path} -> {oss_key}")
  204. return UploadResult(
  205. success=True,
  206. local_path=local_path,
  207. oss_key=oss_key,
  208. oss_url=oss_url
  209. )
  210. except Exception as e:
  211. error_msg = str(e)
  212. logger.warning(f"[OSS] 上传失败 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
  213. if attempt < max_retries - 1:
  214. time.sleep(1)
  215. else:
  216. return UploadResult(
  217. success=False,
  218. local_path=local_path,
  219. oss_key=oss_key,
  220. oss_url='',
  221. error=error_msg
  222. )
  223. return UploadResult(
  224. success=False,
  225. local_path=local_path,
  226. oss_key=oss_key,
  227. oss_url='',
  228. error='达到最大重试次数'
  229. )
  230. def upload_image(self, local_path: str, batch_id: str, image_type: str,
  231. person_index: Optional[int] = None,
  232. callback: Optional[callable] = None) -> str:
  233. """
  234. 上传图片(异步)
  235. Args:
  236. local_path: 本地图片路径
  237. batch_id: 批次ID
  238. image_type: 图片类型 ('panorama' 或 'ptz')
  239. person_index: 人员序号(仅用于 PTZ 图片)
  240. callback: 上传完成回调函数
  241. Returns:
  242. oss_key: OSS 对象键(用于后续查询结果)
  243. """
  244. if not self.enabled:
  245. logger.warning(f"[OSS] upload_image 返回空: enabled={self.enabled}")
  246. return ''
  247. logger.info(f"[OSS] upload_image 开始: path={local_path}, type={image_type}, batch={batch_id}")
  248. # 生成 OSS 路径
  249. timestamp = datetime.now().strftime("%Y%m%d")
  250. filename = os.path.basename(local_path)
  251. path_prefix = self.config.get('path_prefix', 'safety-system')
  252. if image_type == 'panorama':
  253. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
  254. else:
  255. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
  256. # 创建上传任务
  257. task = UploadTask(
  258. local_path=local_path,
  259. oss_key=oss_key,
  260. batch_id=batch_id,
  261. image_type=image_type,
  262. person_index=person_index,
  263. callback=callback
  264. )
  265. # 加入队列
  266. self.upload_queue.put(task)
  267. return oss_key
  268. def upload_image_sync(self, local_path: str, batch_id: str,
  269. image_type: str,
  270. person_index: Optional[int] = None) -> UploadResult:
  271. """
  272. 同步上传图片
  273. Args:
  274. local_path: 本地图片路径
  275. batch_id: 批次ID
  276. image_type: 图片类型
  277. person_index: 人员序号
  278. Returns:
  279. UploadResult: 上传结果
  280. """
  281. if not self.enabled:
  282. return UploadResult(
  283. success=False,
  284. local_path=local_path,
  285. oss_key='',
  286. oss_url='',
  287. error='OSS 未启用'
  288. )
  289. # 生成 OSS 路径
  290. timestamp = datetime.now().strftime("%Y%m%d")
  291. filename = os.path.basename(local_path)
  292. path_prefix = self.config.get('path_prefix', 'safety-system')
  293. if image_type == 'panorama':
  294. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
  295. else:
  296. oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
  297. task = UploadTask(
  298. local_path=local_path,
  299. oss_key=oss_key,
  300. batch_id=batch_id,
  301. image_type=image_type,
  302. person_index=person_index
  303. )
  304. return self._upload_file(task)
  305. def get_upload_result(self, timeout: float = 0.1) -> Optional[UploadResult]:
  306. """
  307. 获取上传结果(非阻塞)
  308. Args:
  309. timeout: 等待超时时间
  310. Returns:
  311. UploadResult 或 None
  312. """
  313. try:
  314. return self.result_queue.get(timeout=timeout)
  315. except queue.Empty:
  316. return None
  317. def get_stats(self) -> Dict[str, int]:
  318. """获取统计信息"""
  319. with self.stats_lock:
  320. return self.stats.copy()
  321. def is_enabled(self) -> bool:
  322. """检查是否启用"""
  323. return self.enabled
  324. # 全局单例
  325. _oss_uploader_instance: Optional[OSSUploader] = None
  326. def get_oss_uploader(config: Dict[str, Any] = None) -> OSSUploader:
  327. """
  328. 获取 OSS 上传器实例(单例模式)
  329. Args:
  330. config: OSS 配置
  331. Returns:
  332. OSSUploader 实例
  333. """
  334. global _oss_uploader_instance
  335. if _oss_uploader_instance is None:
  336. _oss_uploader_instance = OSSUploader(config)
  337. return _oss_uploader_instance
  338. def reset_oss_uploader():
  339. """重置 OSS 上传器实例"""
  340. global _oss_uploader_instance
  341. if _oss_uploader_instance is not None:
  342. _oss_uploader_instance.stop()
  343. _oss_uploader_instance = None