# oss_uploader.py

"""Upload module for S3-compatible object storage."""
import logging
import mimetypes
import os
import time
import uuid
from pathlib import Path
from typing import Optional
from urllib.parse import quote
  8. logger = logging.getLogger(__name__)
  9. class OSSUploader:
  10. """基于 boto3 的 S3 兼容对象存储上传器."""
  11. def __init__(self, config: Optional[dict] = None):
  12. from config.oss import S3_COMPATIBLE_CONFIG
  13. self.config = config or S3_COMPATIBLE_CONFIG
  14. self.enabled = self.config.get("enabled", False)
  15. self.client = None
  16. self.bucket = self.config.get("bucket_name", "")
  17. self.path_prefix = self.config.get("path_prefix", "device")
  18. self.custom_domain = self.config.get("custom_domain", "")
  19. self.endpoint_url = self.config.get("endpoint_url", "")
  20. self.retry_times = self.config.get("retry_times", 3)
  21. self.upload_timeout = self.config.get("upload_timeout", 30)
  22. if self.enabled:
  23. try:
  24. import boto3
  25. from botocore.config import Config
  26. s3_config = Config(
  27. s3={"addressing_style": "path"},
  28. signature_version="s3v4",
  29. retries={"max_attempts": self.retry_times, "mode": "standard"},
  30. )
  31. self.client = boto3.client(
  32. "s3",
  33. endpoint_url=self.endpoint_url,
  34. region_name=self.config.get("region_name", "us-east-1"),
  35. aws_access_key_id=self.config.get("access_key_id", ""),
  36. aws_secret_access_key=self.config.get("secret_access_key", ""),
  37. config=s3_config,
  38. verify=False,
  39. )
  40. logger.info("[OSS] 上传器初始化完成: %s/%s", self.endpoint_url, self.bucket)
  41. except Exception as exc:
  42. logger.error("[OSS] 初始化失败: %s", exc)
  43. self.enabled = False
  44. def _object_key(self, local_path: str, suffix: str = "") -> str:
  45. """生成对象存储 key."""
  46. filename = Path(local_path).name
  47. if suffix:
  48. stem = Path(filename).stem
  49. ext = Path(filename).suffix
  50. filename = f"{stem}_{suffix}{ext}"
  51. date_dir = time.strftime("%Y%m%d")
  52. unique = uuid.uuid4().hex[:8]
  53. return f"{self.path_prefix}/{date_dir}/{unique}_{filename}"
  54. def _make_url(self, key: str) -> str:
  55. """根据 endpoint 或自定义域名生成访问 URL."""
  56. if self.custom_domain:
  57. base = self.custom_domain.rstrip("/")
  58. return f"{base}/{key}"
  59. base = self.endpoint_url.rstrip("/")
  60. return f"{base}/{self.bucket}/{key}"
  61. def upload(self, local_path: str, suffix: str = "") -> Optional[str]:
  62. """上传单个文件到 OSS,返回可访问 URL."""
  63. if not self.enabled or self.client is None:
  64. return None
  65. if not os.path.isfile(local_path):
  66. logger.warning("[OSS] 文件不存在: %s", local_path)
  67. return None
  68. key = self._object_key(local_path, suffix)
  69. for attempt in range(1, self.retry_times + 1):
  70. try:
  71. self.client.upload_file(
  72. local_path,
  73. self.bucket,
  74. key,
  75. ExtraArgs={"ContentType": "image/jpeg"},
  76. )
  77. url = self._make_url(key)
  78. logger.info("[OSS] 上传成功: %s -> %s", local_path, url)
  79. return url
  80. except Exception as exc:
  81. logger.warning("[OSS] 上传失败 (尝试 %d/%d): %s - %s", attempt, self.retry_times, local_path, exc)
  82. if attempt < self.retry_times:
  83. time.sleep(0.5 * attempt)
  84. logger.error("[OSS] 上传最终失败: %s", local_path)
  85. return None
  86. def upload_pair(self, original_path: str, marked_path: str, prefix: str = "") -> dict:
  87. """上传原图和标注图,返回 URL 字典."""
  88. orig_suffix = f"original_{prefix}" if prefix else "original"
  89. mark_suffix = f"marked_{prefix}" if prefix else "marked"
  90. return {
  91. "original": self.upload(original_path, suffix=orig_suffix),
  92. "marked": self.upload(marked_path, suffix=mark_suffix),
  93. }