""" OSS 上传模块 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等) """ import os import time import json import logging import threading import queue from typing import Optional, Dict, Any, Tuple, List from pathlib import Path from datetime import datetime from dataclasses import dataclass, field import cv2 import numpy as np logger = logging.getLogger(__name__) # 尝试导入 boto3 try: import boto3 from botocore.exceptions import ClientError BOTO3_AVAILABLE = True except ImportError: BOTO3_AVAILABLE = False logger.warning("boto3 未安装,S3 兼容存储功能不可用") @dataclass class UploadTask: """上传任务""" local_path: str oss_key: str batch_id: str image_type: str # 'panorama' 或 'ptz' person_index: Optional[int] = None # 仅用于 PTZ 图片 retry_count: int = 0 callback: Optional[callable] = None @dataclass class UploadResult: """上传结果""" success: bool local_path: str oss_key: str oss_url: str error: Optional[str] = None class OSSUploader: """ OSS 上传器 支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等) """ def __init__(self, config: Dict[str, Any] = None): """ 初始化 OSS 上传器 Args: config: OSS 配置字典 """ from config import S3_COMPATIBLE_CONFIG self.config = config or S3_COMPATIBLE_CONFIG self.enabled = self.config.get('enabled', False) logger.info(f"[OSS] OSSUploader 初始化: enabled={self.enabled}, config={self.config}") # S3 客户端 self.s3_client = None # 上传队列 self.upload_queue = queue.Queue() self.result_queue = queue.Queue() # 工作线程 self.running = False self.worker_thread = None # 统计 self.stats = { 'total_uploads': 0, 'success_uploads': 0, 'failed_uploads': 0, } self.stats_lock = threading.Lock() # 初始化客户端 if self.enabled: self._init_client() def _init_client(self): """初始化 OSS 客户端""" logger.info("[OSS] _init_client 开始...") try: self._init_s3_client() logger.info(f"[OSS] _init_client 完成: enabled={self.enabled}") except Exception as e: logger.error(f"[OSS] 初始化客户端失败: {e}") self.enabled = False def _init_s3_client(self): """初始化 S3 兼容客户端""" logger.info(f"[OSS] _init_s3_client 开始: boto3={BOTO3_AVAILABLE}, enabled={self.config.get('enabled', False)}") if not BOTO3_AVAILABLE: logger.error("[OSS] boto3 未安装") self.enabled = False return if not self.config.get('enabled', False): logger.error("[OSS] S3 兼容配置未启用") self.enabled = False return endpoint_url = self.config.get('endpoint_url', '') region_name = self.config.get('region_name', 'us-east-1') access_key_id = self.config.get('access_key_id', '') secret_access_key = self.config.get('secret_access_key', '') logger.info(f"[OSS] 配置检查: endpoint={endpoint_url}, key={access_key_id[:4] if access_key_id else 'None'}..., bucket={self.config.get('bucket_name', '')}") if not all([endpoint_url, access_key_id, secret_access_key]): logger.error("[OSS] S3 配置不完整") self.enabled = False return logger.info("[OSS] 尝试创建 S3 客户端...") self.s3_client = boto3.client( 's3', endpoint_url=endpoint_url, region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key, use_ssl=self.config.get('use_ssl', True), verify=False ) logger.info(f"[OSS] S3 客户端初始化成功: {endpoint_url}") def start(self): """启动上传器""" logger.info(f"[OSS] start() 被调用: enabled={self.enabled}, running={self.running}") if not self.enabled: logger.info("[OSS] 上传器未启用,直接返回") return if self.running: logger.info("[OSS] 上传器已经在运行") return self.running = True self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True) self.worker_thread.start() logger.info("[OSS] 上传器已启动") def stop(self): """停止上传器""" self.running = False if self.worker_thread: self.worker_thread.join(timeout=5) logger.info("[OSS] 上传器已停止") def _worker_loop(self): """工作线程循环""" while self.running: try: task = self.upload_queue.get(timeout=1.0) self._process_upload(task) except queue.Empty: continue except Exception as e: logger.error(f"[OSS] 上传处理错误: {e}") def _process_upload(self, task: UploadTask): """处理单个上传任务""" result = self._upload_file(task) with self.stats_lock: self.stats['total_uploads'] += 1 if result.success: self.stats['success_uploads'] += 1 else: self.stats['failed_uploads'] += 1 # 调用回调 if task.callback: try: task.callback(result) except Exception as e: logger.error(f"[OSS] 回调执行错误: {e}") # 放入结果队列 self.result_queue.put(result) def _upload_file(self, task: UploadTask) -> UploadResult: """ 上传文件到 OSS Args: task: 上传任务 Returns: UploadResult: 上传结果 """ local_path = task.local_path oss_key = task.oss_key if not os.path.exists(local_path): return UploadResult( success=False, local_path=local_path, oss_key=oss_key, oss_url='', error='文件不存在' ) max_retries = self.config.get('retry_times', 3) for attempt in range(max_retries): try: if self.s3_client: # S3 兼容上传 bucket_name = self.config.get('bucket_name', '') self.s3_client.upload_file(local_path, bucket_name, oss_key) # 生成 URL custom_domain = self.config.get('custom_domain', '') if custom_domain: oss_url = f"{custom_domain}/{oss_key}" else: endpoint_url = self.config.get('endpoint_url', '') oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}" else: return UploadResult( success=False, local_path=local_path, oss_key=oss_key, oss_url='', error='OSS 客户端未初始化' ) logger.info(f"[OSS] 上传成功: {local_path} -> {oss_key}") return UploadResult( success=True, local_path=local_path, oss_key=oss_key, oss_url=oss_url ) except Exception as e: error_msg = str(e) logger.warning(f"[OSS] 上传失败 (尝试 {attempt + 1}/{max_retries}): {error_msg}") if attempt < max_retries - 1: time.sleep(1) else: return UploadResult( success=False, local_path=local_path, oss_key=oss_key, oss_url='', error=error_msg ) return UploadResult( success=False, local_path=local_path, oss_key=oss_key, oss_url='', error='达到最大重试次数' ) def upload_image(self, local_path: str, batch_id: str, image_type: str, person_index: Optional[int] = None, callback: Optional[callable] = None) -> str: """ 上传图片(异步) Args: local_path: 本地图片路径 batch_id: 批次ID image_type: 图片类型 ('panorama' 或 'ptz') person_index: 人员序号(仅用于 PTZ 图片) callback: 上传完成回调函数 Returns: oss_key: OSS 对象键(用于后续查询结果) """ if not self.enabled: logger.warning(f"[OSS] upload_image 返回空: enabled={self.enabled}") return '' logger.info(f"[OSS] upload_image 开始: path={local_path}, type={image_type}, batch={batch_id}") # 生成 OSS 路径 timestamp = datetime.now().strftime("%Y%m%d") filename = os.path.basename(local_path) path_prefix = self.config.get('path_prefix', 'safety-system') if image_type == 'panorama': oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}" else: oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}" # 创建上传任务 task = UploadTask( local_path=local_path, oss_key=oss_key, batch_id=batch_id, image_type=image_type, person_index=person_index, callback=callback ) # 加入队列 self.upload_queue.put(task) return oss_key def upload_image_sync(self, local_path: str, batch_id: str, image_type: str, person_index: Optional[int] = None) -> UploadResult: """ 同步上传图片 Args: local_path: 本地图片路径 batch_id: 批次ID image_type: 图片类型 person_index: 人员序号 Returns: UploadResult: 上传结果 """ if not self.enabled: return UploadResult( success=False, local_path=local_path, oss_key='', oss_url='', error='OSS 未启用' ) # 生成 OSS 路径 timestamp = datetime.now().strftime("%Y%m%d") filename = os.path.basename(local_path) path_prefix = self.config.get('path_prefix', 'safety-system') if image_type == 'panorama': oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}" else: oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}" task = UploadTask( local_path=local_path, oss_key=oss_key, batch_id=batch_id, image_type=image_type, person_index=person_index ) return self._upload_file(task) def get_upload_result(self, timeout: float = 0.1) -> Optional[UploadResult]: """ 获取上传结果(非阻塞) Args: timeout: 等待超时时间 Returns: UploadResult 或 None """ try: return self.result_queue.get(timeout=timeout) except queue.Empty: return None def get_stats(self) -> Dict[str, int]: """获取统计信息""" with self.stats_lock: return self.stats.copy() def is_enabled(self) -> bool: """检查是否启用""" return self.enabled # 全局单例 _oss_uploader_instance: Optional[OSSUploader] = None _oss_uploader_lock = threading.Lock() def get_oss_uploader(config: Dict[str, Any] = None) -> OSSUploader: """ 获取 OSS 上传器实例(单例模式,线程安全) Args: config: OSS 配置 Returns: OSSUploader 实例 """ global _oss_uploader_instance if _oss_uploader_instance is None: with _oss_uploader_lock: if _oss_uploader_instance is None: _oss_uploader_instance = OSSUploader(config) return _oss_uploader_instance def reset_oss_uploader(): """重置 OSS 上传器实例""" global _oss_uploader_instance with _oss_uploader_lock: if _oss_uploader_instance is not None: _oss_uploader_instance.stop() _oss_uploader_instance = None