Просмотр исходного кода

refactor(camera_group): 优化配对图片保存器并集成OSS和第三方推送

- 使用单例模式获取配对图片保存器,保证资源共享
- 集成S3兼容的OSS上传功能,支持远程图片存储
- 启用第三方平台推送功能,实现配对图片自动上报
- 提升配对图片保存组件的灵活性与扩展性
- 停止摄像头组时清理全局OSS上传器和第三方推送器资源
wenhongquan 3 дней назад
Родитель
Сommit
df654f6d1b

BIN
dual_camera_system/__pycache__/calibration.cpython-310.pyc


BIN
dual_camera_system/__pycache__/camera_group.cpython-310.pyc


BIN
dual_camera_system/__pycache__/coordinator.cpython-310.pyc


BIN
dual_camera_system/__pycache__/oss_uploader.cpython-310.pyc


BIN
dual_camera_system/__pycache__/paired_image_saver.cpython-310.pyc


BIN
dual_camera_system/__pycache__/panorama_camera.cpython-310.pyc


BIN
dual_camera_system/__pycache__/ptz_person_tracker.cpython-310.pyc


BIN
dual_camera_system/__pycache__/third_party_pusher.cpython-310.pyc


+ 42 - 4
dual_camera_system/camera_group.py

@@ -16,7 +16,10 @@ from ptz_camera import PTZCamera
 from ocr_recognizer import NumberDetector, PersonInfo
 from coordinator import SequentialCoordinator
 from calibration import CameraCalibrator, CalibrationManager
-from paired_image_saver import PairedImageSaver
+from paired_image_saver import PairedImageSaver, get_paired_saver
+from oss_uploader import get_oss_uploader
+from third_party_pusher import get_third_party_pusher
+from config import DEVICE_CONFIG, S3_COMPATIBLE_CONFIG, THIRD_PARTY_CONFIG
 
 logger = logging.getLogger(__name__)
 
@@ -120,12 +123,38 @@ class CameraGroup:
         ptz_config['group_id'] = self.group_id
         self.ptz_camera = PTZCamera(self.sdk, ptz_config)
         
-        # 3. 初始化配对图片保存器
+        # 3. 初始化配对图片保存器(使用单例模式,支持 OSS)
         try:
-            self.paired_saver = PairedImageSaver(
+            # 获取 OSS 上传器
+            oss_uploader = None
+            enable_oss = S3_COMPATIBLE_CONFIG.get('enabled', False)
+            if enable_oss:
+                oss_uploader = get_oss_uploader()
+                if not oss_uploader.running:
+                    oss_uploader.start()
+                logger.info(f"[{self.group_id}] OSS 上传已启用")
+            
+            # 创建设备配置(包含 group_id)
+            device_config = DEVICE_CONFIG.copy()
+            device_config['group_id'] = self.group_id
+            
+            # 使用单例获取配对保存器
+            self.paired_saver = get_paired_saver(
                 base_dir=self.paired_image_dir,
-                time_window=5.0
+                time_window=5.0,
+                enable_oss=enable_oss,
+                oss_uploader=oss_uploader,
+                device_config=device_config
             )
+            
+            # 设置第三方平台推送回调
+            if THIRD_PARTY_CONFIG.get('enabled', False):
+                pusher = get_third_party_pusher()
+                if not pusher.running:
+                    pusher.start()
+                self.paired_saver.set_upload_callback(pusher.report_batch)
+                logger.info(f"[{self.group_id}] 第三方平台推送已启用")
+            
             logger.info(f"[{self.group_id}] 配对图片保存器初始化成功: {self.paired_image_dir}")
         except Exception as e:
             logger.warning(f"[{self.group_id}] 配对图片保存器初始化失败: {e}")
@@ -317,6 +346,15 @@ class CameraGroup:
         if self.paired_saver:
             self.paired_saver.close()
         
+        # 停止 OSS 上传器和第三方推送器(只在最后一个组停止时执行)
+        try:
+            from oss_uploader import reset_oss_uploader
+            from third_party_pusher import reset_third_party_pusher
+            reset_oss_uploader()
+            reset_third_party_pusher()
+        except Exception as e:
+            logger.debug(f"[{self.group_id}] 清理全局上传器: {e}")
+        
         logger.info(f"[{self.group_id}] 摄像头组已停止")
     
     def get_status(self) -> Dict[str, Any]:

+ 2 - 2
dual_camera_system/config/__init__.py

@@ -17,7 +17,7 @@ from .event import EVENT_PUSHER_CONFIG, EVENT_LISTENER_CONFIG
 from .voice import TTS_CONFIG, AUDIO_PLAYER_CONFIG, VOICE_ANNOUNCER_CONFIG
 from .llm import LLM_CONFIG, LLM_SAFETY_CONFIG
 from .system import SYSTEM_CONFIG
-from .oss import OSS_CONFIG, S3_COMPATIBLE_CONFIG
+from .oss import S3_COMPATIBLE_CONFIG
 from .device import (
     DEVICE_CONFIG, THIRD_PARTY_CONFIG, BATCH_REPORT_CONFIG
 )
@@ -46,7 +46,7 @@ __all__ = [
     # 系统
     'SYSTEM_CONFIG',
     # OSS
-    'OSS_CONFIG', 'S3_COMPATIBLE_CONFIG',
+    'S3_COMPATIBLE_CONFIG',
     # 设备与第三方平台
     'DEVICE_CONFIG', 'THIRD_PARTY_CONFIG', 'BATCH_REPORT_CONFIG',
 ]

BIN
dual_camera_system/config/__pycache__/__init__.cpython-310.pyc


BIN
dual_camera_system/config/__pycache__/device.cpython-310.pyc


BIN
dual_camera_system/config/__pycache__/oss.cpython-310.pyc


+ 7 - 32
dual_camera_system/config/oss.py

@@ -1,40 +1,15 @@
 """
 OSS 配置
-阿里云 OSS 或其他兼容 S3 的对象存储配置
+兼容 S3 的对象存储配置(MinIO、AWS S3、阿里云 OSS 等)
 """
 
-# OSS 配置
-OSS_CONFIG = {
-    'enabled': False,  # 是否启用 OSS 上传
-    
-    # 阿里云 OSS 配置
-    'provider': 'custom',  # 可选: 'aliyun', 'minio', 'aws', 'custom'
-    
-    # 访问密钥
-    'access_key_id': '',
-    'access_key_secret': '',
-    
-    # 存储桶配置
-    'bucket_name': '',
-    'endpoint': '',  # 如: oss-cn-hangzhou.aliyuncs.com
-    
-    # 自定义域名(可选,用于生成访问 URL)
-    'custom_domain': '',  # 如: https://cdn.example.com
-    
-    # 上传配置
-    'upload_timeout': 30,  # 上传超时时间(秒)
-    'retry_times': 3,      # 重试次数
-    
-    # 路径前缀
-    'path_prefix': 'device',  # OSS 上的路径前缀
-    
-    # 图片访问权限
-    'acl': 'public-read',  # 可选: 'private', 'public-read'
-}
-
-# 兼容 S3 的 OSS 配置(如 MinIO、AWS S3)
+# 兼容 S3 的 OSS 配置(如 MinIO、AWS S3、阿里云 OSS)
 S3_COMPATIBLE_CONFIG = {
-    'enabled': True,
+    'enabled': True,           # 是否启用 OSS 上传
+    'upload_timeout': 30,       # 上传超时时间(秒)
+    'retry_times': 3,           # 重试次数
+    'custom_domain': '',        # 自定义域名(可选,用于生成访问 URL)
+
     'endpoint_url': 'https://oss.dnnbuild.com',  # 如: http://localhost:9000
     'region_name': 'us-east-1',
     'access_key_id': 'wvp',

+ 11 - 1
dual_camera_system/coordinator.py

@@ -1936,11 +1936,17 @@ class SequentialCoordinator(AsyncCoordinator):
         success = self.ptz.goto_exact_position(pan, tilt, zoom)
         
         if success:
-            # 等待球机稳定
+            # 等待球机物理移动到位
             stabilize_time = self._capture_config['ptz_stabilize_time']
             logger.info(f"[顺序模式] 等待球机稳定 {stabilize_time}s...")
             time.sleep(stabilize_time)
             
+            # 【关键修复】清空RTSP缓冲区,确保获取的是新位置的帧
+            logger.debug("[顺序模式] 清空RTSP缓冲区...")
+            for _ in range(5):
+                self.ptz.get_frame()
+                time.sleep(0.05)
+            
             # 自动变焦(如果启用)
             final_zoom = zoom
             if self.enable_ptz_detection and self.auto_zoom_config.get('enabled', False):
@@ -1951,6 +1957,10 @@ class SequentialCoordinator(AsyncCoordinator):
                     auto_zoom_wait = self._capture_config.get('auto_zoom_wait_time', 1.0)
                     logger.info(f"[顺序模式] 变焦完成,等待镜头对焦 {auto_zoom_wait}s...")
                     time.sleep(auto_zoom_wait)
+                    # 变焦后再次清空缓冲区
+                    for _ in range(3):
+                        self.ptz.get_frame()
+                        time.sleep(0.05)
             
             # 获取清晰的球机画面
             ptz_frame = self._get_clear_ptz_frame()

+ 19 - 78
dual_camera_system/oss_uploader.py

@@ -1,6 +1,6 @@
 """
 OSS 上传模块
-支持阿里云 OSS、MinIO、AWS S3 等兼容 S3 的对象存储
+支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
 """
 
 import os
@@ -13,21 +13,13 @@ from typing import Optional, Dict, Any, Tuple, List
 from pathlib import Path
 from datetime import datetime
 from dataclasses import dataclass, field
-from enum import Enum
 
 import cv2
 import numpy as np
 
 logger = logging.getLogger(__name__)
 
-# 尝试导入 OSS SDK
-try:
-    import oss2
-    ALIYUN_OSS_AVAILABLE = True
-except ImportError:
-    ALIYUN_OSS_AVAILABLE = False
-    logger.warning("阿里云 OSS SDK (oss2) 未安装,阿里云 OSS 功能不可用")
-
+# 尝试导入 boto3
 try:
     import boto3
     from botocore.exceptions import ClientError
@@ -37,14 +29,6 @@ except ImportError:
     logger.warning("boto3 未安装,S3 兼容存储功能不可用")
 
 
-class OSSProvider(Enum):
-    """OSS 提供商类型"""
-    ALIYUN = "aliyun"
-    MINIO = "minio"
-    AWS = "aws"
-    CUSTOM = "custom"
-
-
 @dataclass
 class UploadTask:
     """上传任务"""
@@ -70,7 +54,7 @@ class UploadResult:
 class OSSUploader:
     """
     OSS 上传器
-    支持阿里云 OSS 和 S3 兼容存储
+    支持兼容 S3 的对象存储(MinIO、AWS S3、阿里云 OSS 等)
     """
     
     def __init__(self, config: Dict[str, Any] = None):
@@ -80,16 +64,11 @@ class OSSUploader:
         Args:
             config: OSS 配置字典
         """
-        from config import OSS_CONFIG, S3_COMPATIBLE_CONFIG
+        from config import S3_COMPATIBLE_CONFIG
         
-        self.config = config or OSS_CONFIG
-        self.s3_config = S3_COMPATIBLE_CONFIG
+        self.config = config or S3_COMPATIBLE_CONFIG
         
         self.enabled = self.config.get('enabled', False)
-        self.provider = OSSProvider(self.config.get('provider', 'aliyun'))
-        
-        # 阿里云 OSS 客户端
-        self.aliyun_bucket = None
         
         # S3 客户端
         self.s3_client = None
@@ -117,42 +96,11 @@ class OSSUploader:
     def _init_client(self):
         """初始化 OSS 客户端"""
         try:
-            if self.provider == OSSProvider.ALIYUN:
-                self._init_aliyun_oss()
-            elif self.provider in [OSSProvider.MINIO, OSSProvider.AWS, OSSProvider.CUSTOM]:
-                self._init_s3_client()
+            self._init_s3_client()
         except Exception as e:
             logger.error(f"[OSS] 初始化客户端失败: {e}")
             self.enabled = False
     
-    def _init_aliyun_oss(self):
-        """初始化阿里云 OSS"""
-        if not ALIYUN_OSS_AVAILABLE:
-            logger.error("[OSS] 阿里云 OSS SDK 未安装")
-            self.enabled = False
-            return
-        
-        access_key_id = self.config.get('access_key_id', '')
-        access_key_secret = self.config.get('access_key_secret', '')
-        endpoint = self.config.get('endpoint', '')
-        bucket_name = self.config.get('bucket_name', '')
-        
-        if not all([access_key_id, access_key_secret, endpoint, bucket_name]):
-            logger.error("[OSS] 阿里云 OSS 配置不完整")
-            self.enabled = False
-            return
-        
-        auth = oss2.Auth(access_key_id, access_key_secret)
-        self.aliyun_bucket = oss2.Bucket(auth, endpoint, bucket_name)
-        
-        # 测试连接
-        try:
-            self.aliyun_bucket.get_bucket_info()
-            logger.info(f"[OSS] 阿里云 OSS 连接成功: {bucket_name}")
-        except Exception as e:
-            logger.error(f"[OSS] 阿里云 OSS 连接失败: {e}")
-            self.enabled = False
-    
     def _init_s3_client(self):
         """初始化 S3 兼容客户端"""
         if not BOTO3_AVAILABLE:
@@ -160,15 +108,15 @@ class OSSUploader:
             self.enabled = False
             return
         
-        if not self.s3_config.get('enabled', False):
+        if not self.config.get('enabled', False):
             logger.error("[OSS] S3 兼容配置未启用")
             self.enabled = False
             return
         
-        endpoint_url = self.s3_config.get('endpoint_url', '')
-        region_name = self.s3_config.get('region_name', 'us-east-1')
-        access_key_id = self.s3_config.get('access_key_id', '')
-        secret_access_key = self.s3_config.get('secret_access_key', '')
+        endpoint_url = self.config.get('endpoint_url', '')
+        region_name = self.config.get('region_name', 'us-east-1')
+        access_key_id = self.config.get('access_key_id', '')
+        secret_access_key = self.config.get('secret_access_key', '')
         
         if not all([endpoint_url, access_key_id, secret_access_key]):
             logger.error("[OSS] S3 配置不完整")
@@ -181,7 +129,7 @@ class OSSUploader:
             region_name=region_name,
             aws_access_key_id=access_key_id,
             aws_secret_access_key=secret_access_key,
-            use_ssl=self.s3_config.get('use_ssl', True),
+            use_ssl=self.config.get('use_ssl', True),
             verify=False
         )
         
@@ -266,25 +214,18 @@ class OSSUploader:
         
         for attempt in range(max_retries):
             try:
-                if self.provider == OSSProvider.ALIYUN and self.aliyun_bucket:
-                    # 阿里云 OSS 上传
-                    self.aliyun_bucket.put_object_from_file(oss_key, local_path)
+                if self.s3_client:
+                    # S3 兼容上传
+                    bucket_name = self.config.get('bucket_name', '')
+                    self.s3_client.upload_file(local_path, bucket_name, oss_key)
                     
-                    # 生成访问 URL
+                    # 生成 URL
                     custom_domain = self.config.get('custom_domain', '')
                     if custom_domain:
                         oss_url = f"{custom_domain}/{oss_key}"
                     else:
-                        oss_url = f"https://{self.config.get('bucket_name')}.{self.config.get('endpoint')}/{oss_key}"
-                    
-                elif self.s3_client:
-                    # S3 兼容上传
-                    bucket_name = self.s3_config.get('bucket_name', '')
-                    self.s3_client.upload_file(local_path, bucket_name, oss_key)
-                    
-                    # 生成 URL
-                    endpoint_url = self.s3_config.get('endpoint_url', '')
-                    oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
+                        endpoint_url = self.config.get('endpoint_url', '')
+                        oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
                 else:
                     return UploadResult(
                         success=False,