| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- """兼容 S3 的对象存储上传模块."""
- import logging
- import os
- import time
- import uuid
- from pathlib import Path
- from typing import Optional
# Module-level logger shared by every OSSUploader instance.
logger = logging.getLogger(__name__)


class OSSUploader:
    """Uploader for S3-compatible object storage, built on boto3.

    Configuration keys read from ``config`` (all optional):

    * ``enabled`` -- master switch; when falsy no client is created and
      every upload returns ``None``.
    * ``bucket_name``, ``endpoint_url``, ``region_name``,
      ``access_key_id``, ``secret_access_key`` -- connection settings.
    * ``path_prefix`` -- first path component of generated object keys.
    * ``custom_domain`` -- if set, used instead of ``endpoint_url/bucket``
      when building the returned URL.
    * ``retry_times`` -- number of upload attempts made by :meth:`upload`
      (also handed to botocore as its ``max_attempts`` retry setting, so
      the two retry layers stack).
    * ``upload_timeout`` -- seconds, applied as botocore connect and read
      timeout.
    * ``verify_ssl`` -- passed to boto3 as ``verify``; defaults to
      ``False`` to preserve the historical behaviour.
    """

    # Content type used when the file extension is not recognised.
    _DEFAULT_CONTENT_TYPE = "image/jpeg"

    def __init__(self, config: Optional[dict] = None):
        """Read the configuration and, if enabled, create the S3 client.

        Args:
            config: Settings dictionary. When omitted (or empty) the
                project default ``config.oss.S3_COMPATIBLE_CONFIG`` is used.

        Any failure while creating the client is logged and disables the
        uploader instead of raising.
        """
        if not config:
            # Imported lazily so callers that supply their own config do
            # not need the project-level config module to be importable.
            from config.oss import S3_COMPATIBLE_CONFIG

            config = S3_COMPATIBLE_CONFIG
        self.config = config
        self.enabled = self.config.get("enabled", False)
        self.client = None
        self.bucket = self.config.get("bucket_name", "")
        self.path_prefix = self.config.get("path_prefix", "device")
        self.custom_domain = self.config.get("custom_domain", "")
        self.endpoint_url = self.config.get("endpoint_url", "")
        self.retry_times = self.config.get("retry_times", 3)
        self.upload_timeout = self.config.get("upload_timeout", 30)

        if self.enabled:
            try:
                self.client = self._create_client()
                logger.info("[OSS] 上传器初始化完成: %s/%s", self.endpoint_url, self.bucket)
            except Exception as exc:
                # Best effort: a broken OSS setup must not break the caller.
                logger.error("[OSS] 初始化失败: %s", exc)
                self.enabled = False

    def _create_client(self):
        """Build the boto3 S3 client from the stored settings."""
        import boto3
        from botocore.config import Config

        # Only override botocore's default timeouts when a positive
        # timeout is actually configured.
        timeouts = {}
        if self.upload_timeout:
            timeouts = {
                "connect_timeout": self.upload_timeout,
                "read_timeout": self.upload_timeout,
            }
        s3_config = Config(
            s3={"addressing_style": "path"},
            signature_version="s3v4",
            retries={"max_attempts": self.retry_times, "mode": "standard"},
            **timeouts,
        )
        return boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            region_name=self.config.get("region_name", "us-east-1"),
            aws_access_key_id=self.config.get("access_key_id", ""),
            aws_secret_access_key=self.config.get("secret_access_key", ""),
            config=s3_config,
            # NOTE(security): certificate verification is off unless
            # "verify_ssl" is set (True or a CA bundle path). The default
            # stays False only for backward compatibility.
            verify=self.config.get("verify_ssl", False),
        )

    @classmethod
    def _content_type(cls, local_path: str) -> str:
        """Guess the MIME type from the file name, defaulting to JPEG."""
        import mimetypes

        guessed, _encoding = mimetypes.guess_type(local_path)
        return guessed or cls._DEFAULT_CONTENT_TYPE

    def _object_key(self, local_path: str, suffix: str = "") -> str:
        """Build the object key ``<prefix>/<YYYYMMDD>/<8 hex>_<filename>``.

        Args:
            local_path: Path of the local file; only its base name is used.
            suffix: Optional tag inserted between the stem and extension
                (``name_<suffix>.ext``).

        Returns:
            The key, without leading or doubled slashes even when the
            configured prefix is empty or carries surrounding slashes.
        """
        path = Path(local_path)
        filename = f"{path.stem}_{suffix}{path.suffix}" if suffix else path.name
        date_dir = time.strftime("%Y%m%d")
        unique = uuid.uuid4().hex[:8]
        prefix = str(self.path_prefix or "").strip("/")
        parts = (prefix, date_dir, f"{unique}_{filename}")
        return "/".join(part for part in parts if part)

    def _make_url(self, key: str) -> str:
        """Return the access URL for ``key``.

        Uses ``custom_domain`` when configured, otherwise
        ``endpoint_url/bucket``. The key is percent-encoded (slashes kept)
        so names containing spaces or non-ASCII characters yield a valid URL.
        """
        from urllib.parse import quote

        encoded_key = quote(key, safe="/")
        if self.custom_domain:
            return f"{self.custom_domain.rstrip('/')}/{encoded_key}"
        return f"{self.endpoint_url.rstrip('/')}/{self.bucket}/{encoded_key}"

    def upload(self, local_path: str, suffix: str = "") -> Optional[str]:
        """Upload one file and return its URL.

        Args:
            local_path: Path of the file to upload.
            suffix: Optional tag appended to the file stem in the object key.

        Returns:
            The URL of the uploaded object, or ``None`` when the uploader
            is disabled, the file does not exist, or every attempt failed.
        """
        if not self.enabled or self.client is None:
            return None
        if not os.path.isfile(local_path):
            logger.warning("[OSS] 文件不存在: %s", local_path)
            return None

        key = self._object_key(local_path, suffix)
        extra_args = {"ContentType": self._content_type(local_path)}
        # Always try at least once, even if retry_times is configured as 0.
        attempts = max(1, self.retry_times)

        for attempt in range(1, attempts + 1):
            try:
                self.client.upload_file(
                    local_path,
                    self.bucket,
                    key,
                    ExtraArgs=extra_args,
                )
            except Exception as exc:
                logger.warning("[OSS] 上传失败 (尝试 %d/%d): %s - %s", attempt, attempts, local_path, exc)
                if attempt < attempts:
                    # Linear back-off between attempts.
                    time.sleep(0.5 * attempt)
            else:
                url = self._make_url(key)
                logger.info("[OSS] 上传成功: %s -> %s", local_path, url)
                return url

        logger.error("[OSS] 上传最终失败: %s", local_path)
        return None

    def upload_pair(self, original_path: str, marked_path: str, prefix: str = "") -> dict:
        """Upload an original image and its annotated counterpart.

        Args:
            original_path: Path of the original image.
            marked_path: Path of the annotated image.
            prefix: Optional tag appended to the ``original`` / ``marked``
                key suffixes.

        Returns:
            ``{"original": url_or_None, "marked": url_or_None}``.
        """
        orig_suffix = f"original_{prefix}" if prefix else "original"
        mark_suffix = f"marked_{prefix}" if prefix else "marked"
        return {
            "original": self.upload(original_path, suffix=orig_suffix),
            "marked": self.upload(marked_path, suffix=mark_suffix),
        }
|