| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- """
- OSS 上传模块
- 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
- """
- import os
- import time
- import json
- import logging
- import threading
- import queue
- from typing import Optional, Dict, Any, Tuple, List
- from pathlib import Path
- from datetime import datetime
- from dataclasses import dataclass, field
- import cv2
- import numpy as np
- logger = logging.getLogger(__name__)
- # 尝试导入 boto3
- try:
- import boto3
- from botocore.exceptions import ClientError
- BOTO3_AVAILABLE = True
- except ImportError:
- BOTO3_AVAILABLE = False
- logger.warning("boto3 未安装,S3 兼容存储功能不可用")
- @dataclass
- class UploadTask:
- """上传任务"""
- local_path: str
- oss_key: str
- batch_id: str
- image_type: str # 'panorama' 或 'ptz'
- person_index: Optional[int] = None # 仅用于 PTZ 图片
- retry_count: int = 0
- callback: Optional[callable] = None
- @dataclass
- class UploadResult:
- """上传结果"""
- success: bool
- local_path: str
- oss_key: str
- oss_url: str
- error: Optional[str] = None
- class OSSUploader:
- """
- OSS 上传器
- 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
- """
-
- def __init__(self, config: Dict[str, Any] = None):
- """
- 初始化 OSS 上传器
-
- Args:
- config: OSS 配置字典
- """
- from config import S3_COMPATIBLE_CONFIG
-
- self.config = config or S3_COMPATIBLE_CONFIG
-
- self.enabled = self.config.get('enabled', False)
-
- # S3 客户端
- self.s3_client = None
-
- # 上传队列
- self.upload_queue = queue.Queue()
- self.result_queue = queue.Queue()
-
- # 工作线程
- self.running = False
- self.worker_thread = None
-
- # 统计
- self.stats = {
- 'total_uploads': 0,
- 'success_uploads': 0,
- 'failed_uploads': 0,
- }
- self.stats_lock = threading.Lock()
-
- # 初始化客户端
- if self.enabled:
- self._init_client()
-
- def _init_client(self):
- """初始化 OSS 客户端"""
- try:
- self._init_s3_client()
- except Exception as e:
- logger.error(f"[OSS] 初始化客户端失败: {e}")
- self.enabled = False
-
- def _init_s3_client(self):
- """初始化 S3 兼容客户端"""
- if not BOTO3_AVAILABLE:
- logger.error("[OSS] boto3 未安装")
- self.enabled = False
- return
-
- if not self.config.get('enabled', False):
- logger.error("[OSS] S3 兼容配置未启用")
- self.enabled = False
- return
-
- endpoint_url = self.config.get('endpoint_url', '')
- region_name = self.config.get('region_name', 'us-east-1')
- access_key_id = self.config.get('access_key_id', '')
- secret_access_key = self.config.get('secret_access_key', '')
-
- if not all([endpoint_url, access_key_id, secret_access_key]):
- logger.error("[OSS] S3 配置不完整")
- self.enabled = False
- return
-
- self.s3_client = boto3.client(
- 's3',
- endpoint_url=endpoint_url,
- region_name=region_name,
- aws_access_key_id=access_key_id,
- aws_secret_access_key=secret_access_key,
- use_ssl=self.config.get('use_ssl', True),
- verify=False
- )
-
- logger.info(f"[OSS] S3 客户端初始化成功: {endpoint_url}")
-
- def start(self):
- """启动上传器"""
- if not self.enabled:
- logger.info("[OSS] 上传器未启用")
- return
-
- if self.running:
- return
-
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
- self.worker_thread.start()
- logger.info("[OSS] 上传器已启动")
-
- def stop(self):
- """停止上传器"""
- self.running = False
- if self.worker_thread:
- self.worker_thread.join(timeout=5)
- logger.info("[OSS] 上传器已停止")
-
- def _worker_loop(self):
- """工作线程循环"""
- while self.running:
- try:
- task = self.upload_queue.get(timeout=1.0)
- self._process_upload(task)
- except queue.Empty:
- continue
- except Exception as e:
- logger.error(f"[OSS] 上传处理错误: {e}")
-
- def _process_upload(self, task: UploadTask):
- """处理单个上传任务"""
- result = self._upload_file(task)
-
- with self.stats_lock:
- self.stats['total_uploads'] += 1
- if result.success:
- self.stats['success_uploads'] += 1
- else:
- self.stats['failed_uploads'] += 1
-
- # 调用回调
- if task.callback:
- try:
- task.callback(result)
- except Exception as e:
- logger.error(f"[OSS] 回调执行错误: {e}")
-
- # 放入结果队列
- self.result_queue.put(result)
-
- def _upload_file(self, task: UploadTask) -> UploadResult:
- """
- 上传文件到 OSS
-
- Args:
- task: 上传任务
-
- Returns:
- UploadResult: 上传结果
- """
- local_path = task.local_path
- oss_key = task.oss_key
-
- if not os.path.exists(local_path):
- return UploadResult(
- success=False,
- local_path=local_path,
- oss_key=oss_key,
- oss_url='',
- error='文件不存在'
- )
-
- max_retries = self.config.get('retry_times', 3)
-
- for attempt in range(max_retries):
- try:
- if self.s3_client:
- # S3 兼容上传
- bucket_name = self.config.get('bucket_name', '')
- self.s3_client.upload_file(local_path, bucket_name, oss_key)
-
- # 生成 URL
- custom_domain = self.config.get('custom_domain', '')
- if custom_domain:
- oss_url = f"{custom_domain}/{oss_key}"
- else:
- endpoint_url = self.config.get('endpoint_url', '')
- oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
- else:
- return UploadResult(
- success=False,
- local_path=local_path,
- oss_key=oss_key,
- oss_url='',
- error='OSS 客户端未初始化'
- )
-
- logger.info(f"[OSS] 上传成功: {local_path} -> {oss_key}")
-
- return UploadResult(
- success=True,
- local_path=local_path,
- oss_key=oss_key,
- oss_url=oss_url
- )
-
- except Exception as e:
- error_msg = str(e)
- logger.warning(f"[OSS] 上传失败 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
-
- if attempt < max_retries - 1:
- time.sleep(1)
- else:
- return UploadResult(
- success=False,
- local_path=local_path,
- oss_key=oss_key,
- oss_url='',
- error=error_msg
- )
-
- return UploadResult(
- success=False,
- local_path=local_path,
- oss_key=oss_key,
- oss_url='',
- error='达到最大重试次数'
- )
-
- def upload_image(self, local_path: str, batch_id: str, image_type: str,
- person_index: Optional[int] = None,
- callback: Optional[callable] = None) -> str:
- """
- 上传图片(异步)
-
- Args:
- local_path: 本地图片路径
- batch_id: 批次ID
- image_type: 图片类型 ('panorama' 或 'ptz')
- person_index: 人员序号(仅用于 PTZ 图片)
- callback: 上传完成回调函数
-
- Returns:
- oss_key: OSS 对象键(用于后续查询结果)
- """
- if not self.enabled:
- return ''
-
- # 生成 OSS 路径
- timestamp = datetime.now().strftime("%Y%m%d")
- filename = os.path.basename(local_path)
- path_prefix = self.config.get('path_prefix', 'safety-system')
-
- if image_type == 'panorama':
- oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
- else:
- oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
-
- # 创建上传任务
- task = UploadTask(
- local_path=local_path,
- oss_key=oss_key,
- batch_id=batch_id,
- image_type=image_type,
- person_index=person_index,
- callback=callback
- )
-
- # 加入队列
- self.upload_queue.put(task)
-
- return oss_key
-
- def upload_image_sync(self, local_path: str, batch_id: str,
- image_type: str,
- person_index: Optional[int] = None) -> UploadResult:
- """
- 同步上传图片
-
- Args:
- local_path: 本地图片路径
- batch_id: 批次ID
- image_type: 图片类型
- person_index: 人员序号
-
- Returns:
- UploadResult: 上传结果
- """
- if not self.enabled:
- return UploadResult(
- success=False,
- local_path=local_path,
- oss_key='',
- oss_url='',
- error='OSS 未启用'
- )
-
- # 生成 OSS 路径
- timestamp = datetime.now().strftime("%Y%m%d")
- filename = os.path.basename(local_path)
- path_prefix = self.config.get('path_prefix', 'safety-system')
-
- if image_type == 'panorama':
- oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
- else:
- oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
-
- task = UploadTask(
- local_path=local_path,
- oss_key=oss_key,
- batch_id=batch_id,
- image_type=image_type,
- person_index=person_index
- )
-
- return self._upload_file(task)
-
- def get_upload_result(self, timeout: float = 0.1) -> Optional[UploadResult]:
- """
- 获取上传结果(非阻塞)
-
- Args:
- timeout: 等待超时时间
-
- Returns:
- UploadResult 或 None
- """
- try:
- return self.result_queue.get(timeout=timeout)
- except queue.Empty:
- return None
-
- def get_stats(self) -> Dict[str, int]:
- """获取统计信息"""
- with self.stats_lock:
- return self.stats.copy()
-
- def is_enabled(self) -> bool:
- """检查是否启用"""
- return self.enabled
- # 全局单例
- _oss_uploader_instance: Optional[OSSUploader] = None
- def get_oss_uploader(config: Dict[str, Any] = None) -> OSSUploader:
- """
- 获取 OSS 上传器实例(单例模式)
-
- Args:
- config: OSS 配置
-
- Returns:
- OSSUploader 实例
- """
- global _oss_uploader_instance
-
- if _oss_uploader_instance is None:
- _oss_uploader_instance = OSSUploader(config)
-
- return _oss_uploader_instance
- def reset_oss_uploader():
- """重置 OSS 上传器实例"""
- global _oss_uploader_instance
- if _oss_uploader_instance is not None:
- _oss_uploader_instance.stop()
- _oss_uploader_instance = None
|