Explorar o código

feat(paired_image_saver): 重构配对图片保存器,支持OSS上传及JSON批次信息

- 完善配对图片保存器,支持将全景图与球机图保存于同一目录
- 新增OSS上传支持,可异步上传图片至对象存储服务
- 批次信息保存格式改为batch_info.json,包含设备与项目ID
- 添加人员图片OSS URL存储,助力后续访问和管理
- 提供上传状态跟踪及上传结果回调机制
- 增强线程安全与错误日志,改进目录管理与资源清理
- 保持旧接口兼容,新增初始化参数支持OSS及设备配置
wenhongquan hai 1 mes
pai
achega
3826437811

+ 121 - 0
CLAUDE.md

@@ -0,0 +1,121 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+**施工现场安全行为智能识别系统 v2.0.0** - Construction site safety behavior intelligent recognition system.
+
+A dual-camera coordinated capture system that:
+- Uses panoramic camera for real-time monitoring with YOLO11 detection (person/hard hat/reflective vest)
+- Controls PTZ dome camera to zoom and track detected targets
+- Performs OCR number recognition via llama-server API
+- Detects safety violations (missing hard hat/reflective vest)
+- Pushes events to business platform and announces via TTS
+
+## Running Commands
+
+### OCR Mode (Number Recognition)
+```bash
+cd dual_camera_system
+python main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
+python main.py --interactive      # Interactive mode
+python main.py --skip-calibration # Skip auto-calibration
+```
+
+### Safety Mode (Safety Detection)
+```bash
+cd dual_camera_system
+python safety_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
+```
+
+### Common Parameters
+```bash
+--model-size {n,s,m,l,x}    # YOLO11 model size
+--no-gpu                    # Disable GPU
+--ocr-host localhost --ocr-port 8111  # OCR API endpoint
+```
+
+### Dependencies
+```bash
+pip install opencv-python opencv-contrib-python ultralytics
+# OCR requires llama-server running on localhost:8111
+```
+
+## Architecture
+
+```
+dual_camera_system/
+├── main.py              # OCR mode entry point
+├── safety_main.py       # Safety mode entry point
+├── config.py            # Config aggregation (imports from config/)
+├── config/              # Modular configuration
+│   ├── system.py        # Feature toggles and working mode
+│   ├── camera.py        # Camera IPs, credentials, SDK paths
+│   ├── detection.py     # YOLO detection config
+│   └── ...              # Other module configs
+├── dahua_sdk.py         # Dahua SDK ctypes wrapper
+├── panorama_camera.py   # Panoramic camera + YOLO detection
+├── ptz_camera.py        # PTZ dome camera control
+├── calibration.py       # Visual calibration (motion detection + feature matching)
+├── coordinator.py       # Camera coordination logic (AsyncCoordinator, SequentialCoordinator)
+├── safety_detector.py   # Safety detection (hard hat/reflective vest)
+├── safety_coordinator.py # Safety mode coordinator
+├── ocr_recognizer.py    # OCR number recognition
+├── event_pusher.py      # Event push to platform
+├── voice_announcer.py   # TTS voice announcement
+└── llm_service.py       # LLM service wrapper (Qwen2.5-VL)
+```
+
+## Critical Technical Details
+
+### SDK Initialization Order (CRITICAL)
+In `main.py`, YOLO/PyTorch MUST load before Dahua SDK. The SDK's `CLIENT_Init` modifies process memory mapping; if loaded before PyTorch, it causes segfault. The current code handles this correctly - do not change the order.
+
+### SDK Path Auto-Selection
+`config/camera.py` auto-selects SDK path by CPU architecture:
+- `aarch64` → `/home/admin/dsh/dh/arm/Bin` (Orange Pi)
+- `x86_64` → `/home/wen/dsh/dh/Bin` (x86 Linux server)
+- Other → `../dh/Bin` (development reference)
+
+### Camera Ports
+- SDK login: port 37777
+- RTSP stream: port 554 (rtsp_port in config)
+
+### Calibration
+- Interval: 24 hours (not 5 minutes as noted in some docs)
+- Daily calibration time: configurable via `CALIBRATION_CONFIG.daily_calibration_time`
+- Methods: motion detection + SIFT/ORB feature matching
+- The system can fallback to angle estimation if visual calibration fails
+
+### Working Modes
+Controlled by `config/system.py`:
+- `mode: 'safety'` - Safety detection mode (safety_main.py)
+- `mode: 'ocr'` - Number recognition mode (main.py)
+
+Feature toggles control enabled modules:
+- `enable_detection`, `enable_safety_detection`
+- `enable_calibration`, `enable_ptz_tracking`
+- `enable_ocr`, `enable_llm`
+- `enable_event_push`, `enable_voice_announce`
+
+## RKNN Test Environment
+
+```bash
+ssh admin@192.168.20.84
+conda activate rknn
+cd /home/admin/dsh/dual_camera_system
+```
+
+## Model Paths
+
+- Safety detection model: `/home/wen/dsh/yolo/yolo11m_safety.pt`
+- Class mapping: 0=helmet, 3=person, 4=reflective vest
+- YOLO11 weights auto-download on first run
+
+## RTSP URL Pattern
+
+```
+rtsp://username:password@ip:554/cam/realmonitor?channel=0&subtype=0
+```
+Note: subtype=0 for main stream, subtype=1 for sub stream.

BIN=BIN
dual_camera_system/__pycache__/oss_uploader.cpython-310.pyc


BIN=BIN
dual_camera_system/__pycache__/paired_image_saver.cpython-310.pyc


BIN=BIN
dual_camera_system/__pycache__/third_party_pusher.cpython-310.pyc


+ 8 - 0
dual_camera_system/config/__init__.py

@@ -17,6 +17,10 @@ from .event import EVENT_PUSHER_CONFIG, EVENT_LISTENER_CONFIG
 from .voice import TTS_CONFIG, AUDIO_PLAYER_CONFIG, VOICE_ANNOUNCER_CONFIG
 from .llm import LLM_CONFIG, LLM_SAFETY_CONFIG
 from .system import SYSTEM_CONFIG
+from .oss import OSS_CONFIG, S3_COMPATIBLE_CONFIG
+from .device import (
+    DEVICE_CONFIG, THIRD_PARTY_CONFIG, BATCH_REPORT_CONFIG
+)
 
 
 # 导出所有配置 (保持向后兼容)
@@ -41,4 +45,8 @@ __all__ = [
     'LLM_CONFIG', 'LLM_SAFETY_CONFIG',
     # 系统
     'SYSTEM_CONFIG',
+    # OSS
+    'OSS_CONFIG', 'S3_COMPATIBLE_CONFIG',
+    # 设备与第三方平台
+    'DEVICE_CONFIG', 'THIRD_PARTY_CONFIG', 'BATCH_REPORT_CONFIG',
 ]

BIN=BIN
dual_camera_system/config/__pycache__/__init__.cpython-310.pyc


BIN=BIN
dual_camera_system/config/__pycache__/camera.cpython-310.pyc


BIN=BIN
dual_camera_system/config/__pycache__/device.cpython-310.pyc


BIN=BIN
dual_camera_system/config/__pycache__/oss.cpython-310.pyc


+ 87 - 0
dual_camera_system/config/device.py

@@ -0,0 +1,87 @@
+"""
+设备配置
+设备编号、第三方平台接口等配置
+"""
+
+# 设备配置
+DEVICE_CONFIG = {
+    # 设备编号(必填,用于标识当前设备)
+    'device_id': 'DEVICE_001',
+    
+    # 设备名称
+    'device_name': '施工现场安全识别设备',
+    
+    # 设备安装位置
+    'location': '施工现场A区',
+    
+    # 项目编号
+    'project_id': 'PROJECT_001',
+    
+    # 项目密钥(用于接口鉴权)
+    'project_secret': '',
+}
+
+# 第三方平台接口配置
+THIRD_PARTY_CONFIG = {
+    'enabled': False,  # 是否启用第三方平台推送
+    
+    # 平台类型: 'custom', 'jtjai', 'huawei', 'aliyun'
+    'platform_type': 'custom',
+    
+    # 接口基础配置
+    'base_url': '',  # 如: https://api.example.com
+    'api_version': 'v1',
+    
+    # 认证配置
+    'auth_type': 'none',  # 可选: 'none', 'api_key', 'oauth2', 'basic'
+    'api_key': '',
+    'api_secret': '',
+    
+    # OAuth2 配置(当 auth_type='oauth2' 时使用)
+    'oauth2': {
+        'token_url': '',
+        'client_id': '',
+        'client_secret': '',
+        'scope': '',
+    },
+    
+    # 接口路径配置
+    'endpoints': {
+        # 批次信息上报接口(接收 batch_info.json)
+        'batch_report': '/api/batch/report',
+        
+        # 心跳接口
+        'heartbeat': '/api/device/heartbeat',
+        
+        # 图片上传回调接口(可选,如果第三方平台需要单独通知)
+        'image_upload_callback': '/api/image/uploaded',
+    },
+    
+    # 推送控制
+    'push_interval': 1.0,     # 推送间隔(秒)
+    'retry_count': 3,         # 重试次数
+    'retry_delay': 2.0,       # 重试延迟(秒)
+    'timeout': 10,            # 请求超时(秒)
+    
+    # 数据格式
+    'data_format': 'json',    # 可选: 'json', 'form'
+    
+    # 是否包含图片文件(multipart/form-data 上传)
+    'include_images': False,
+}
+
+# 批次信息上报配置
+BATCH_REPORT_CONFIG = {
+    # 上报时机
+    'report_on_complete': True,   # 批次完成时上报
+    'report_realtime': False,     # 实时上报(每保存一张图就上报)
+    
+    # 上报内容
+    'include_panorama_url': True,   # 包含全景图 OSS URL
+    'include_ptz_urls': True,       # 包含球机图 OSS URLs
+    'include_raw_images': False,    # 是否包含原始图片数据(Base64)
+    
+    # 本地保留
+    'keep_local_copy': True,        # 上报后是否保留本地副本
+    'local_retention_days': 7,      # 本地保留天数
+}

+ 45 - 0
dual_camera_system/config/oss.py

@@ -0,0 +1,45 @@
+"""
+OSS 配置
+阿里云 OSS 或其他兼容 S3 的对象存储配置
+"""
+
+# OSS 配置
+OSS_CONFIG = {
+    'enabled': False,  # 是否启用 OSS 上传
+    
+    # 阿里云 OSS 配置
+    'provider': 'custom',  # 可选: 'aliyun', 'minio', 'aws', 'custom'
+    
+    # 访问密钥
+    'access_key_id': '',
+    'access_key_secret': '',
+    
+    # 存储桶配置
+    'bucket_name': '',
+    'endpoint': '',  # 如: oss-cn-hangzhou.aliyuncs.com
+    
+    # 自定义域名(可选,用于生成访问 URL)
+    'custom_domain': '',  # 如: https://cdn.example.com
+    
+    # 上传配置
+    'upload_timeout': 30,  # 上传超时时间(秒)
+    'retry_times': 3,      # 重试次数
+    
+    # 路径前缀
+    'path_prefix': 'device',  # OSS 上的路径前缀
+    
+    # 图片访问权限
+    'acl': 'public-read',  # 可选: 'private', 'public-read'
+}
+
+# 兼容 S3 的 OSS 配置(如 MinIO、AWS S3)
+S3_COMPATIBLE_CONFIG = {
+    'enabled': True,
+    'endpoint_url': 'https://oss.dnnbuild.com',  # 如: http://localhost:9000
+    'region_name': 'us-east-1',
+    'access_key_id': 'wvp',
+    'secret_access_key': '6MnZFxZxRwbvS01khA9ldiawJuc9mytyiq2kEv3k',
+    'bucket_name': 'wvp',
+    'path_prefix': 'device',
+    'use_ssl': True,
+}

+ 469 - 0
dual_camera_system/oss_uploader.py

@@ -0,0 +1,469 @@
+"""
+OSS 上传模块
+支持阿里云 OSS、MinIO、AWS S3 等兼容 S3 的对象存储
+"""
+
+import os
+import time
+import json
+import logging
+import threading
+import queue
+from typing import Optional, Dict, Any, Tuple, List
+from pathlib import Path
+from datetime import datetime
+from dataclasses import dataclass, field
+from enum import Enum
+
+import cv2
+import numpy as np
+
+logger = logging.getLogger(__name__)
+
+# 尝试导入 OSS SDK
+try:
+    import oss2
+    ALIYUN_OSS_AVAILABLE = True
+except ImportError:
+    ALIYUN_OSS_AVAILABLE = False
+    logger.warning("阿里云 OSS SDK (oss2) 未安装,阿里云 OSS 功能不可用")
+
+try:
+    import boto3
+    from botocore.exceptions import ClientError
+    BOTO3_AVAILABLE = True
+except ImportError:
+    BOTO3_AVAILABLE = False
+    logger.warning("boto3 未安装,S3 兼容存储功能不可用")
+
+
+class OSSProvider(Enum):
+    """OSS 提供商类型"""
+    ALIYUN = "aliyun"
+    MINIO = "minio"
+    AWS = "aws"
+    CUSTOM = "custom"
+
+
+@dataclass
+class UploadTask:
+    """上传任务"""
+    local_path: str
+    oss_key: str
+    batch_id: str
+    image_type: str  # 'panorama' 或 'ptz'
+    person_index: Optional[int] = None  # 仅用于 PTZ 图片
+    retry_count: int = 0
+    callback: Optional[callable] = None
+
+
+@dataclass
+class UploadResult:
+    """上传结果"""
+    success: bool
+    local_path: str
+    oss_key: str
+    oss_url: str
+    error: Optional[str] = None
+
+
+class OSSUploader:
+    """
+    OSS 上传器
+    支持阿里云 OSS 和 S3 兼容存储
+    """
+    
+    def __init__(self, config: Dict[str, Any] = None):
+        """
+        初始化 OSS 上传器
+        
+        Args:
+            config: OSS 配置字典
+        """
+        from config import OSS_CONFIG, S3_COMPATIBLE_CONFIG
+        
+        self.config = config or OSS_CONFIG
+        self.s3_config = S3_COMPATIBLE_CONFIG
+        
+        self.enabled = self.config.get('enabled', False)
+        self.provider = OSSProvider(self.config.get('provider', 'aliyun'))
+        
+        # 阿里云 OSS 客户端
+        self.aliyun_bucket = None
+        
+        # S3 客户端
+        self.s3_client = None
+        
+        # 上传队列
+        self.upload_queue = queue.Queue()
+        self.result_queue = queue.Queue()
+        
+        # 工作线程
+        self.running = False
+        self.worker_thread = None
+        
+        # 统计
+        self.stats = {
+            'total_uploads': 0,
+            'success_uploads': 0,
+            'failed_uploads': 0,
+        }
+        self.stats_lock = threading.Lock()
+        
+        # 初始化客户端
+        if self.enabled:
+            self._init_client()
+    
+    def _init_client(self):
+        """初始化 OSS 客户端"""
+        try:
+            if self.provider == OSSProvider.ALIYUN:
+                self._init_aliyun_oss()
+            elif self.provider in [OSSProvider.MINIO, OSSProvider.AWS, OSSProvider.CUSTOM]:
+                self._init_s3_client()
+        except Exception as e:
+            logger.error(f"[OSS] 初始化客户端失败: {e}")
+            self.enabled = False
+    
+    def _init_aliyun_oss(self):
+        """初始化阿里云 OSS"""
+        if not ALIYUN_OSS_AVAILABLE:
+            logger.error("[OSS] 阿里云 OSS SDK 未安装")
+            self.enabled = False
+            return
+        
+        access_key_id = self.config.get('access_key_id', '')
+        access_key_secret = self.config.get('access_key_secret', '')
+        endpoint = self.config.get('endpoint', '')
+        bucket_name = self.config.get('bucket_name', '')
+        
+        if not all([access_key_id, access_key_secret, endpoint, bucket_name]):
+            logger.error("[OSS] 阿里云 OSS 配置不完整")
+            self.enabled = False
+            return
+        
+        auth = oss2.Auth(access_key_id, access_key_secret)
+        self.aliyun_bucket = oss2.Bucket(auth, endpoint, bucket_name)
+        
+        # 测试连接
+        try:
+            self.aliyun_bucket.get_bucket_info()
+            logger.info(f"[OSS] 阿里云 OSS 连接成功: {bucket_name}")
+        except Exception as e:
+            logger.error(f"[OSS] 阿里云 OSS 连接失败: {e}")
+            self.enabled = False
+    
+    def _init_s3_client(self):
+        """初始化 S3 兼容客户端"""
+        if not BOTO3_AVAILABLE:
+            logger.error("[OSS] boto3 未安装")
+            self.enabled = False
+            return
+        
+        if not self.s3_config.get('enabled', False):
+            logger.error("[OSS] S3 兼容配置未启用")
+            self.enabled = False
+            return
+        
+        endpoint_url = self.s3_config.get('endpoint_url', '')
+        region_name = self.s3_config.get('region_name', 'us-east-1')
+        access_key_id = self.s3_config.get('access_key_id', '')
+        secret_access_key = self.s3_config.get('secret_access_key', '')
+        
+        if not all([endpoint_url, access_key_id, secret_access_key]):
+            logger.error("[OSS] S3 配置不完整")
+            self.enabled = False
+            return
+        
+        self.s3_client = boto3.client(
+            's3',
+            endpoint_url=endpoint_url,
+            region_name=region_name,
+            aws_access_key_id=access_key_id,
+            aws_secret_access_key=secret_access_key,
+            use_ssl=self.s3_config.get('use_ssl', True),
+            verify=False
+        )
+        
+        logger.info(f"[OSS] S3 客户端初始化成功: {endpoint_url}")
+    
+    def start(self):
+        """启动上传器"""
+        if not self.enabled:
+            logger.info("[OSS] 上传器未启用")
+            return
+        
+        if self.running:
+            return
+        
+        self.running = True
+        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
+        self.worker_thread.start()
+        logger.info("[OSS] 上传器已启动")
+    
+    def stop(self):
+        """停止上传器"""
+        self.running = False
+        if self.worker_thread:
+            self.worker_thread.join(timeout=5)
+        logger.info("[OSS] 上传器已停止")
+    
+    def _worker_loop(self):
+        """工作线程循环"""
+        while self.running:
+            try:
+                task = self.upload_queue.get(timeout=1.0)
+                self._process_upload(task)
+            except queue.Empty:
+                continue
+            except Exception as e:
+                logger.error(f"[OSS] 上传处理错误: {e}")
+    
+    def _process_upload(self, task: UploadTask):
+        """处理单个上传任务"""
+        result = self._upload_file(task)
+        
+        with self.stats_lock:
+            self.stats['total_uploads'] += 1
+            if result.success:
+                self.stats['success_uploads'] += 1
+            else:
+                self.stats['failed_uploads'] += 1
+        
+        # 调用回调
+        if task.callback:
+            try:
+                task.callback(result)
+            except Exception as e:
+                logger.error(f"[OSS] 回调执行错误: {e}")
+        
+        # 放入结果队列
+        self.result_queue.put(result)
+    
+    def _upload_file(self, task: UploadTask) -> UploadResult:
+        """
+        上传文件到 OSS
+        
+        Args:
+            task: 上传任务
+            
+        Returns:
+            UploadResult: 上传结果
+        """
+        local_path = task.local_path
+        oss_key = task.oss_key
+        
+        if not os.path.exists(local_path):
+            return UploadResult(
+                success=False,
+                local_path=local_path,
+                oss_key=oss_key,
+                oss_url='',
+                error='文件不存在'
+            )
+        
+        max_retries = self.config.get('retry_times', 3)
+        
+        for attempt in range(max_retries):
+            try:
+                if self.provider == OSSProvider.ALIYUN and self.aliyun_bucket:
+                    # 阿里云 OSS 上传
+                    self.aliyun_bucket.put_object_from_file(oss_key, local_path)
+                    
+                    # 生成访问 URL
+                    custom_domain = self.config.get('custom_domain', '')
+                    if custom_domain:
+                        oss_url = f"{custom_domain}/{oss_key}"
+                    else:
+                        oss_url = f"https://{self.config.get('bucket_name')}.{self.config.get('endpoint')}/{oss_key}"
+                    
+                elif self.s3_client:
+                    # S3 兼容上传
+                    bucket_name = self.s3_config.get('bucket_name', '')
+                    self.s3_client.upload_file(local_path, bucket_name, oss_key)
+                    
+                    # 生成 URL
+                    endpoint_url = self.s3_config.get('endpoint_url', '')
+                    oss_url = f"{endpoint_url}/{bucket_name}/{oss_key}"
+                else:
+                    return UploadResult(
+                        success=False,
+                        local_path=local_path,
+                        oss_key=oss_key,
+                        oss_url='',
+                        error='OSS 客户端未初始化'
+                    )
+                
+                logger.info(f"[OSS] 上传成功: {local_path} -> {oss_key}")
+                
+                return UploadResult(
+                    success=True,
+                    local_path=local_path,
+                    oss_key=oss_key,
+                    oss_url=oss_url
+                )
+                
+            except Exception as e:
+                error_msg = str(e)
+                logger.warning(f"[OSS] 上传失败 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
+                
+                if attempt < max_retries - 1:
+                    time.sleep(1)
+                else:
+                    return UploadResult(
+                        success=False,
+                        local_path=local_path,
+                        oss_key=oss_key,
+                        oss_url='',
+                        error=error_msg
+                    )
+        
+        return UploadResult(
+            success=False,
+            local_path=local_path,
+            oss_key=oss_key,
+            oss_url='',
+            error='达到最大重试次数'
+        )
+    
+    def upload_image(self, local_path: str, batch_id: str, image_type: str,
+                     person_index: Optional[int] = None,
+                     callback: Optional[callable] = None) -> str:
+        """
+        上传图片(异步)
+        
+        Args:
+            local_path: 本地图片路径
+            batch_id: 批次ID
+            image_type: 图片类型 ('panorama' 或 'ptz')
+            person_index: 人员序号(仅用于 PTZ 图片)
+            callback: 上传完成回调函数
+            
+        Returns:
+            oss_key: OSS 对象键(用于后续查询结果)
+        """
+        if not self.enabled:
+            return ''
+        
+        # 生成 OSS 路径
+        timestamp = datetime.now().strftime("%Y%m%d")
+        filename = os.path.basename(local_path)
+        path_prefix = self.config.get('path_prefix', 'safety-system')
+        
+        if image_type == 'panorama':
+            oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
+        else:
+            oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
+        
+        # 创建上传任务
+        task = UploadTask(
+            local_path=local_path,
+            oss_key=oss_key,
+            batch_id=batch_id,
+            image_type=image_type,
+            person_index=person_index,
+            callback=callback
+        )
+        
+        # 加入队列
+        self.upload_queue.put(task)
+        
+        return oss_key
+    
+    def upload_image_sync(self, local_path: str, batch_id: str, 
+                          image_type: str,
+                          person_index: Optional[int] = None) -> UploadResult:
+        """
+        同步上传图片
+        
+        Args:
+            local_path: 本地图片路径
+            batch_id: 批次ID
+            image_type: 图片类型
+            person_index: 人员序号
+            
+        Returns:
+            UploadResult: 上传结果
+        """
+        if not self.enabled:
+            return UploadResult(
+                success=False,
+                local_path=local_path,
+                oss_key='',
+                oss_url='',
+                error='OSS 未启用'
+            )
+        
+        # 生成 OSS 路径
+        timestamp = datetime.now().strftime("%Y%m%d")
+        filename = os.path.basename(local_path)
+        path_prefix = self.config.get('path_prefix', 'safety-system')
+        
+        if image_type == 'panorama':
+            oss_key = f"{path_prefix}/{timestamp}/{batch_id}/panorama/{filename}"
+        else:
+            oss_key = f"{path_prefix}/{timestamp}/{batch_id}/ptz/{filename}"
+        
+        task = UploadTask(
+            local_path=local_path,
+            oss_key=oss_key,
+            batch_id=batch_id,
+            image_type=image_type,
+            person_index=person_index
+        )
+        
+        return self._upload_file(task)
+    
+    def get_upload_result(self, timeout: float = 0.1) -> Optional[UploadResult]:
+        """
+        获取上传结果(非阻塞)
+        
+        Args:
+            timeout: 等待超时时间
+            
+        Returns:
+            UploadResult 或 None
+        """
+        try:
+            return self.result_queue.get(timeout=timeout)
+        except queue.Empty:
+            return None
+    
+    def get_stats(self) -> Dict[str, int]:
+        """获取统计信息"""
+        with self.stats_lock:
+            return self.stats.copy()
+    
+    def is_enabled(self) -> bool:
+        """检查是否启用"""
+        return self.enabled
+
+
+# 全局单例
+_oss_uploader_instance: Optional[OSSUploader] = None
+
+
+def get_oss_uploader(config: Dict[str, Any] = None) -> OSSUploader:
+    """
+    获取 OSS 上传器实例(单例模式)
+    
+    Args:
+        config: OSS 配置
+        
+    Returns:
+        OSSUploader 实例
+    """
+    global _oss_uploader_instance
+    
+    if _oss_uploader_instance is None:
+        _oss_uploader_instance = OSSUploader(config)
+    
+    return _oss_uploader_instance
+
+
+def reset_oss_uploader():
+    """重置 OSS 上传器实例"""
+    global _oss_uploader_instance
+    if _oss_uploader_instance is not None:
+        _oss_uploader_instance.stop()
+    _oss_uploader_instance = None

+ 259 - 19
dual_camera_system/paired_image_saver.py

@@ -1,17 +1,19 @@
 """
 配对图片保存管理器
 将全景检测图片和对应的球机聚焦图片保存到同一目录
+支持 OSS 上传和 batch_info.json 格式
 """
 
 import os
 import cv2
 import time
+import json
 import logging
 import threading
 from pathlib import Path
 from datetime import datetime
-from typing import Optional, List, Dict, Tuple
-from dataclasses import dataclass, field
+from typing import Optional, List, Dict, Tuple, Callable
+from dataclasses import dataclass, field, asdict
 
 logger = logging.getLogger(__name__)
 
@@ -27,6 +29,7 @@ class PersonInfo:
     ptz_bbox: Optional[Tuple[int, int, int, int]] = None  # 球机图中检测到的bbox (x1, y1, x2, y2)
     ptz_image_saved: bool = False
     ptz_image_path: Optional[str] = None
+    ptz_oss_url: Optional[str] = None  # 球机图 OSS URL
 
 
 @dataclass
@@ -36,10 +39,13 @@ class DetectionBatch:
     timestamp: float
     panorama_image: Optional[object] = None  # numpy array
     panorama_path: Optional[str] = None
+    panorama_oss_url: Optional[str] = None  # 全景图 OSS URL
     persons: List[PersonInfo] = field(default_factory=list)
     total_persons: int = 0
     ptz_images_count: int = 0
     completed: bool = False
+    device_id: str = ''  # 设备编号
+    project_id: str = ''  # 项目编号
 
 
 class PairedImageSaver:
@@ -51,11 +57,16 @@ class PairedImageSaver:
     2. 保存全景标记图到批次目录
     3. 为每个人员保存对应的球机聚焦图到同一目录
     4. 支持时间窗口内的批量保存
+    5. 支持 OSS 上传
+    6. 生成 batch_info.json
     """
     
     def __init__(self, base_dir: str = '/home/admin/dsh/paired_images',
                  time_window: float = 5.0,  # 时间窗口(秒)
-                 max_batches: int = 100):
+                 max_batches: int = 100,
+                 enable_oss: bool = False,
+                 oss_uploader = None,
+                 device_config: Dict = None):
         """
         初始化
         
@@ -63,27 +74,39 @@ class PairedImageSaver:
             base_dir: 基础保存目录
             time_window: 批次时间窗口(秒),同一窗口内的检测归为一批
             max_batches: 最大保留批次数量
+            enable_oss: 是否启用 OSS 上传
+            oss_uploader: OSS 上传器实例
+            device_config: 设备配置字典
         """
         self.base_dir = Path(base_dir)
         self.time_window = time_window
         self.max_batches = max_batches
+        self.enable_oss = enable_oss
+        self.oss_uploader = oss_uploader
+        self.device_config = device_config or {}
         
         self._current_batch: Optional[DetectionBatch] = None
         self._batch_lock = threading.Lock()
         self._last_batch_time = 0.0
         
+        # 上传状态追踪
+        self._upload_status: Dict[str, Dict] = {}  # batch_id -> {panorama: bool, ptz: Dict}
+        self._upload_callback: Optional[Callable] = None
+        
         # 统计信息
         self._stats = {
             'total_batches': 0,
             'total_persons': 0,
-            'total_ptz_images': 0
+            'total_ptz_images': 0,
+            'oss_upload_success': 0,
+            'oss_upload_failed': 0,
         }
         self._stats_lock = threading.Lock()
         
         # 确保目录存在
         self._ensure_base_dir()
         
-        logger.info(f"[配对保存] 初始化完成: 目录={base_dir}, 时间窗口={time_window}s")
+        logger.info(f"[配对保存] 初始化完成: 目录={base_dir}, 时间窗口={time_window}s, OSS={enable_oss}")
     
     def _ensure_base_dir(self):
         """确保基础目录存在"""
@@ -147,6 +170,10 @@ class PairedImageSaver:
                 )
                 person_infos.append(info)
             
+            # 获取设备信息
+            device_id = self.device_config.get('device_id', 'UNKNOWN')
+            project_id = self.device_config.get('project_id', 'UNKNOWN')
+            
             # 创建批次记录
             self._current_batch = DetectionBatch(
                 batch_id=batch_id,
@@ -154,9 +181,18 @@ class PairedImageSaver:
                 panorama_image=panorama_frame,
                 panorama_path=panorama_path,
                 persons=person_infos,
-                total_persons=len(persons)
+                total_persons=len(persons),
+                device_id=device_id,
+                project_id=project_id
             )
             
+            # 初始化上传状态
+            self._upload_status[batch_id] = {
+                'panorama': False,
+                'ptz': {},
+                'completed': False
+            }
+            
             self._last_batch_time = current_time
             
             with self._stats_lock:
@@ -168,6 +204,10 @@ class PairedImageSaver:
                 f"人员={len(persons)}, 目录={batch_dir}"
             )
             
+            # 上传全景图到 OSS
+            if self.enable_oss and panorama_path and self.oss_uploader:
+                self._upload_panorama_to_oss(batch_id, panorama_path)
+            
             return batch_id
     
     def _save_panorama_image(self, batch_dir: Path, batch_id: str,
@@ -319,6 +359,10 @@ class PairedImageSaver:
                         self._stats['total_ptz_images'] += 1
                     
                     logger.info(f"[配对保存] 球机图已保存: {filepath}, BBox={ptz_bbox}")
+                    
+                    # 上传球机图到 OSS
+                    if self.enable_oss and self.oss_uploader:
+                        self._upload_ptz_to_oss(batch_id, person_index, str(filepath))
                 else:
                     # 人员索引超出范围,说明批次信息不一致,跳过保存
                     logger.warning(f"[配对保存] 人员索引 {person_index} 超出批次范围 {len(self._current_batch.persons)},跳过计数")
@@ -329,21 +373,184 @@ class PairedImageSaver:
                 logger.error(f"[配对保存] 保存球机图失败: {e}")
                 return None
     
+    def _upload_panorama_to_oss(self, batch_id: str, panorama_path: str):
+        """上传全景图到 OSS"""
+        def on_upload_complete(result):
+            if result.success:
+                self._current_batch.panorama_oss_url = result.oss_url
+                self._upload_status[batch_id]['panorama'] = True
+                with self._stats_lock:
+                    self._stats['oss_upload_success'] += 1
+                logger.info(f"[OSS] 全景图上传成功: {result.oss_url}")
+            else:
+                with self._stats_lock:
+                    self._stats['oss_upload_failed'] += 1
+                logger.error(f"[OSS] 全景图上传失败: {result.error}")
+        
+        self.oss_uploader.upload_image(
+            local_path=panorama_path,
+            batch_id=batch_id,
+            image_type='panorama',
+            callback=on_upload_complete
+        )
+    
+    def _upload_ptz_to_oss(self, batch_id: str, person_index: int, ptz_path: str):
+        """上传球机图到 OSS"""
+        def on_upload_complete(result):
+            if result.success:
+                # 更新人员信息中的 OSS URL
+                if person_index < len(self._current_batch.persons):
+                    self._current_batch.persons[person_index].ptz_oss_url = result.oss_url
+                self._upload_status[batch_id]['ptz'][person_index] = result.oss_url
+                with self._stats_lock:
+                    self._stats['oss_upload_success'] += 1
+                logger.info(f"[OSS] 球机图上传成功 (person_{person_index}): {result.oss_url}")
+            else:
+                with self._stats_lock:
+                    self._stats['oss_upload_failed'] += 1
+                logger.error(f"[OSS] 球机图上传失败 (person_{person_index}): {result.error}")
+        
+        self.oss_uploader.upload_image(
+            local_path=ptz_path,
+            batch_id=batch_id,
+            image_type='ptz',
+            person_index=person_index,
+            callback=on_upload_complete
+        )
+    
     def _finalize_batch(self, batch: DetectionBatch):
         """完成批次处理"""
         batch.completed = True
         
-        # 创建批次信息文件
+        # 等待 OSS 上传完成(最多等待5秒)
+        if self.enable_oss and batch.batch_id in self._upload_status:
+            wait_start = time.time()
+            while time.time() - wait_start < 5.0:
+                status = self._upload_status[batch.batch_id]
+                # 检查全景图是否上传完成
+                panorama_done = status.get('panorama', False) or not batch.panorama_path
+                # 检查所有球机图是否上传完成
+                ptz_done = all(
+                    idx in status.get('ptz', {})
+                    for idx, person in enumerate(batch.persons)
+                    if person.ptz_image_saved
+                )
+                if panorama_done and ptz_done:
+                    break
+                time.sleep(0.1)
+        
+        # 创建 batch_info.json 文件
         try:
             batch_dir = self.base_dir / f"batch_{batch.batch_id}"
-            info_path = batch_dir / "batch_info.txt"
             
-            with open(info_path, 'w', encoding='utf-8') as f:
+            # 构建 JSON 数据
+            batch_info = self._build_batch_info_json(batch)
+            
+            # 保存为 JSON 文件
+            json_path = batch_dir / "batch_info.json"
+            with open(json_path, 'w', encoding='utf-8') as f:
+                json.dump(batch_info, f, ensure_ascii=False, indent=2)
+            
+            logger.info(f"[配对保存] batch_info.json 已保存: {json_path}")
+            
+            # 同时保留 txt 格式用于兼容(可选)
+            txt_path = batch_dir / "batch_info.txt"
+            self._save_batch_info_txt(batch, txt_path)
+            
+            # 标记上传完成
+            if batch.batch_id in self._upload_status:
+                self._upload_status[batch.batch_id]['completed'] = True
+            
+            # 触发回调
+            if self._upload_callback:
+                try:
+                    self._upload_callback(batch_info)
+                except Exception as e:
+                    logger.error(f"[配对保存] 回调执行错误: {e}")
+            
+            logger.info(f"[配对保存] 批次完成: {batch.batch_id}, "
+                       f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}")
+            
+        except Exception as e:
+            logger.error(f"[配对保存] 保存批次信息失败: {e}")
+        
+        # 清理旧批次
+        self._cleanup_old_batches()
+    
+    def _build_batch_info_json(self, batch: DetectionBatch) -> Dict:
+        """
+        构建 batch_info.json 数据结构
+        
+        Returns:
+            Dict: 批次信息字典
+        """
+        # 人员信息列表
+        persons_list = []
+        for person in batch.persons:
+            person_data = {
+                'person_index': person.person_index,
+                'position': {
+                    'x': round(person.position[0], 4),
+                    'y': round(person.position[1], 4)
+                },
+                'bbox': {
+                    'x1': person.bbox[0],
+                    'y1': person.bbox[1],
+                    'x2': person.bbox[2],
+                    'y2': person.bbox[3]
+                },
+                'confidence': round(person.confidence, 4),
+                'ptz_position': {
+                    'pan': round(person.ptz_position[0], 2) if person.ptz_position else None,
+                    'tilt': round(person.ptz_position[1], 2) if person.ptz_position else None,
+                    'zoom': person.ptz_position[2] if person.ptz_position else None
+                } if person.ptz_position else None,
+                'ptz_bbox': {
+                    'x1': person.ptz_bbox[0],
+                    'y1': person.ptz_bbox[1],
+                    'x2': person.ptz_bbox[2],
+                    'y2': person.ptz_bbox[3]
+                } if person.ptz_bbox else None,
+                'ptz_image_saved': person.ptz_image_saved,
+                'ptz_image_path': person.ptz_image_path,
+                'ptz_oss_url': person.ptz_oss_url
+            }
+            persons_list.append(person_data)
+        
+        # 构建完整批次信息
+        batch_info = {
+            'batch_id': batch.batch_id,
+            'device_id': batch.device_id,
+            'project_id': batch.project_id,
+            'timestamp': batch.timestamp,
+            'datetime': datetime.fromtimestamp(batch.timestamp).isoformat(),
+            'total_persons': batch.total_persons,
+            'ptz_images_count': batch.ptz_images_count,
+            'panorama': {
+                'local_path': batch.panorama_path,
+                'oss_url': batch.panorama_oss_url
+            },
+            'persons': persons_list,
+            'upload_status': {
+                'panorama_uploaded': batch.panorama_oss_url is not None,
+                'all_ptz_uploaded': all(p.ptz_oss_url is not None for p in batch.persons if p.ptz_image_saved)
+            }
+        }
+        
+        return batch_info
+    
+    def _save_batch_info_txt(self, batch: DetectionBatch, txt_path: Path):
+        """保存批次信息为 TXT 格式(兼容旧版本)"""
+        try:
+            with open(txt_path, 'w', encoding='utf-8') as f:
                 f.write(f"批次ID: {batch.batch_id}\n")
+                f.write(f"设备ID: {batch.device_id}\n")
+                f.write(f"项目ID: {batch.project_id}\n")
                 f.write(f"时间戳: {datetime.fromtimestamp(batch.timestamp)}\n")
                 f.write(f"总人数: {batch.total_persons}\n")
                 f.write(f"球机图数量: {batch.ptz_images_count}\n")
                 f.write(f"全景图: {batch.panorama_path}\n")
+                f.write(f"全景图OSS: {batch.panorama_oss_url}\n")
                 f.write("\n人员详情:\n")
                 
                 for i, person in enumerate(batch.persons):
@@ -358,15 +565,9 @@ class PairedImageSaver:
                     else:
                         f.write(f"    PTZ BBox: None\n")
                     f.write(f"    PTZ Image: {person.ptz_image_path}\n")
-            
-            logger.info(f"[配对保存] 批次完成: {batch.batch_id}, "
-                       f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}")
-            
+                    f.write(f"    PTZ OSS URL: {person.ptz_oss_url}\n")
         except Exception as e:
-            logger.error(f"[配对保存] 保存批次信息失败: {e}")
-        
-        # 清理旧批次
-        self._cleanup_old_batches()
+            logger.error(f"[配对保存] 保存 TXT 批次信息失败: {e}")
     
     def _cleanup_old_batches(self):
         """清理旧批次目录"""
@@ -396,6 +597,37 @@ class PairedImageSaver:
         with self._stats_lock:
             return self._stats.copy()
     
+    def set_upload_callback(self, callback: Callable):
+        """
+        设置批次完成回调函数
+        
+        Args:
+            callback: 回调函数,接收 batch_info_dict 参数
+        """
+        self._upload_callback = callback
+    
+    def get_batch_info(self, batch_id: str) -> Optional[Dict]:
+        """
+        获取指定批次的 batch_info.json 数据
+        
+        Args:
+            batch_id: 批次ID
+            
+        Returns:
+            Dict 或 None
+        """
+        try:
+            batch_dir = self.base_dir / f"batch_{batch_id}"
+            json_path = batch_dir / "batch_info.json"
+            
+            if json_path.exists():
+                with open(json_path, 'r', encoding='utf-8') as f:
+                    return json.load(f)
+        except Exception as e:
+            logger.error(f"[配对保存] 读取 batch_info.json 失败: {e}")
+        
+        return None
+    
     def close(self):
         """关闭管理器,完成当前批次"""
         with self._batch_lock:
@@ -410,13 +642,18 @@ class PairedImageSaver:
 _paired_saver_instance: Optional[PairedImageSaver] = None
 
 
-def get_paired_saver(base_dir: str = None, time_window: float = 5.0) -> PairedImageSaver:
+def get_paired_saver(base_dir: str = None, time_window: float = 5.0,
+                     enable_oss: bool = False, oss_uploader = None,
+                     device_config: Dict = None) -> PairedImageSaver:
     """
     获取配对保存管理器实例(单例模式)
     
     Args:
         base_dir: 基础保存目录
         time_window: 时间窗口
+        enable_oss: 是否启用 OSS 上传
+        oss_uploader: OSS 上传器实例
+        device_config: 设备配置字典
         
     Returns:
         PairedImageSaver 实例
@@ -426,7 +663,10 @@ def get_paired_saver(base_dir: str = None, time_window: float = 5.0) -> PairedIm
     if _paired_saver_instance is None:
         _paired_saver_instance = PairedImageSaver(
             base_dir=base_dir or '/home/admin/dsh/paired_images',
-            time_window=time_window
+            time_window=time_window,
+            enable_oss=enable_oss,
+            oss_uploader=oss_uploader,
+            device_config=device_config
         )
     
     return _paired_saver_instance

+ 485 - 0
dual_camera_system/scripts/start.sh

@@ -0,0 +1,485 @@
+"""
+第三方平台推送模块
+将批次信息推送到第三方平台接口
+"""
+
+import os
+import time
+import json
+import logging
+import threading
+import queue
+import requests
+from typing import Optional, Dict, Any, List, Callable
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BatchReport:
+    """批次上报数据"""
+    batch_id: str
+    device_id: str
+    project_id: str
+    timestamp: float
+    batch_info: Dict[str, Any]  # batch_info.json 的完整内容
+    local_path: Optional[str] = None  # batch_info.json 本地路径
+
+
+class ThirdPartyPusher:
+    """
+    第三方平台推送器
+    负责将批次信息推送到配置的第三方平台接口
+    """
+    
+    def __init__(self, config: Dict[str, Any] = None):
+        """
+        初始化第三方平台推送器
+        
+        Args:
+            config: 第三方平台配置字典
+        """
+        from config import THIRD_PARTY_CONFIG, DEVICE_CONFIG
+        
+        self.config = config or THIRD_PARTY_CONFIG
+        self.device_config = DEVICE_CONFIG
+        
+        # 功能开关
+        self.enabled = self.config.get('enabled', False)
+        
+        # 平台配置
+        self.platform_type = self.config.get('platform_type', 'custom')
+        self.base_url = self.config.get('base_url', '')
+        self.api_version = self.config.get('api_version', 'v1')
+        
+        # 认证配置
+        self.auth_type = self.config.get('auth_type', 'none')
+        self.api_key = self.config.get('api_key', '')
+        self.api_secret = self.config.get('api_secret', '')
+        self.oauth2_config = self.config.get('oauth2', {})
+        
+        # 接口路径
+        self.endpoints = self.config.get('endpoints', {})
+        self.batch_report_url = self.endpoints.get('batch_report', '/api/batch/report')
+        self.heartbeat_url = self.endpoints.get('heartbeat', '/api/device/heartbeat')
+        
+        # 推送控制
+        self.push_interval = self.config.get('push_interval', 1.0)
+        self.retry_count = self.config.get('retry_count', 3)
+        self.retry_delay = self.config.get('retry_delay', 2.0)
+        self.timeout = self.config.get('timeout', 10)
+        self.data_format = self.config.get('data_format', 'json')
+        self.include_images = self.config.get('include_images', False)
+        
+        # OAuth2 Token
+        self._access_token = None
+        self._token_expires_at = 0
+        
+        # 上报队列
+        self.report_queue = queue.Queue()
+        
+        # 工作线程
+        self.running = False
+        self.worker_thread = None
+        
+        # 统计
+        self.stats = {
+            'total_reports': 0,
+            'success_reports': 0,
+            'failed_reports': 0,
+        }
+        self.stats_lock = threading.Lock()
+        
+        # 回调
+        self.on_report_success: Optional[Callable] = None
+        self.on_report_failed: Optional[Callable] = None
+        
+        # 最后上报时间
+        self.last_report_time = 0
+        
+        if self.enabled:
+            logger.info(f"[第三方平台] 推送器初始化完成: {self.base_url}")
+    
+    def start(self):
+        """启动推送器"""
+        if not self.enabled:
+            logger.info("[第三方平台] 推送器未启用")
+            return
+        
+        if self.running:
+            return
+        
+        self.running = True
+        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
+        self.worker_thread.start()
+        logger.info("[第三方平台] 推送器已启动")
+    
+    def stop(self):
+        """停止推送器"""
+        self.running = False
+        if self.worker_thread:
+            self.worker_thread.join(timeout=5)
+        logger.info("[第三方平台] 推送器已停止")
+    
+    def _worker_loop(self):
+        """工作线程循环"""
+        while self.running:
+            try:
+                report = self.report_queue.get(timeout=1.0)
+                self._process_report(report)
+            except queue.Empty:
+                continue
+            except Exception as e:
+                logger.error(f"[第三方平台] 处理上报错误: {e}")
+    
+    def _get_auth_headers(self) -> Dict[str, str]:
+        """获取认证请求头"""
+        headers = {
+            'Content-Type': 'application/json',
+            'Accept': 'application/json',
+        }
+        
+        if self.auth_type == 'api_key':
+            headers['X-API-Key'] = self.api_key
+            if self.api_secret:
+                headers['X-API-Secret'] = self.api_secret
+        
+        elif self.auth_type == 'oauth2':
+            token = self._get_oauth2_token()
+            if token:
+                headers['Authorization'] = f'Bearer {token}'
+        
+        elif self.auth_type == 'basic':
+            import base64
+            credentials = base64.b64encode(f"{self.api_key}:{self.api_secret}".encode()).decode()
+            headers['Authorization'] = f'Basic {credentials}'
+        
+        return headers
+    
+    def _get_oauth2_token(self) -> Optional[str]:
+        """获取 OAuth2 Token"""
+        # 检查现有 token 是否有效
+        if self._access_token and time.time() < self._token_expires_at - 60:
+            return self._access_token
+        
+        # 重新获取 token
+        token_url = self.oauth2_config.get('token_url', '')
+        client_id = self.oauth2_config.get('client_id', '')
+        client_secret = self.oauth2_config.get('client_secret', '')
+        scope = self.oauth2_config.get('scope', '')
+        
+        if not all([token_url, client_id, client_secret]):
+            logger.error("[第三方平台] OAuth2 配置不完整")
+            return None
+        
+        try:
+            data = {
+                'grant_type': 'client_credentials',
+                'client_id': client_id,
+                'client_secret': client_secret,
+            }
+            if scope:
+                data['scope'] = scope
+            
+            response = requests.post(token_url, data=data, timeout=self.timeout)
+            
+            if response.status_code == 200:
+                result = response.json()
+                self._access_token = result.get('access_token')
+                expires_in = result.get('expires_in', 3600)
+                self._token_expires_at = time.time() + expires_in
+                logger.info("[第三方平台] OAuth2 Token 获取成功")
+                return self._access_token
+            else:
+                logger.error(f"[第三方平台] OAuth2 Token 获取失败: {response.status_code}")
+                return None
+                
+        except Exception as e:
+            logger.error(f"[第三方平台] OAuth2 Token 请求异常: {e}")
+            return None
+    
+    def _process_report(self, report: BatchReport):
+        """处理单个上报任务"""
+        # 检查推送间隔
+        current_time = time.time()
+        time_since_last = current_time - self.last_report_time
+        if time_since_last < self.push_interval:
+            time.sleep(self.push_interval - time_since_last)
+        
+        success = self._send_batch_report(report)
+        
+        with self.stats_lock:
+            self.stats['total_reports'] += 1
+            if success:
+                self.stats['success_reports'] += 1
+            else:
+                self.stats['failed_reports'] += 1
+        
+        self.last_report_time = time.time()
+        
+        # 触发回调
+        if success and self.on_report_success:
+            try:
+                self.on_report_success(report)
+            except Exception as e:
+                logger.error(f"[第三方平台] 成功回调执行错误: {e}")
+        elif not success and self.on_report_failed:
+            try:
+                self.on_report_failed(report)
+            except Exception as e:
+                logger.error(f"[第三方平台] 失败回调执行错误: {e}")
+    
+    def _send_batch_report(self, report: BatchReport) -> bool:
+        """
+        发送批次上报请求
+        
+        Args:
+            report: 批次上报数据
+            
+        Returns:
+            bool: 是否成功
+        """
+        if not self.base_url:
+            logger.error("[第三方平台] 未配置 base_url")
+            return False
+        
+        url = f"{self.base_url}{self.batch_report_url}"
+        
+        # 构建请求数据
+        payload = self._build_payload(report)
+        
+        headers = self._get_auth_headers()
+        
+        for attempt in range(self.retry_count):
+            try:
+                if self.data_format == 'json':
+                    response = requests.post(
+                        url,
+                        json=payload,
+                        headers=headers,
+                        timeout=self.timeout,
+                        verify=False
+                    )
+                else:
+                    response = requests.post(
+                        url,
+                        data=payload,
+                        headers=headers,
+                        timeout=self.timeout,
+                        verify=False
+                    )
+                
+                if response.status_code == 200:
+                    result = response.json()
+                    if result.get('code') == 200 or result.get('success') == True:
+                        logger.info(f"[第三方平台] 批次上报成功: {report.batch_id}")
+                        return True
+                    else:
+                        logger.warning(f"[第三方平台] 批次上报失败: {result.get('msg', '未知错误')}")
+                else:
+                    logger.warning(f"[第三方平台] 批次上报失败: HTTP {response.status_code}")
+                
+                if attempt < self.retry_count - 1:
+                    time.sleep(self.retry_delay)
+                    
+            except requests.exceptions.Timeout:
+                logger.warning(f"[第三方平台] 请求超时 (尝试 {attempt + 1}/{self.retry_count})")
+                if attempt < self.retry_count - 1:
+                    time.sleep(self.retry_delay)
+            except Exception as e:
+                logger.error(f"[第三方平台] 请求异常 (尝试 {attempt + 1}/{self.retry_count}): {e}")
+                if attempt < self.retry_count - 1:
+                    time.sleep(self.retry_delay)
+        
+        logger.error(f"[第三方平台] 批次上报最终失败: {report.batch_id}")
+        return False
+    
+    def _build_payload(self, report: BatchReport) -> Dict[str, Any]:
+        """
+        构建上报请求体
+        
+        Args:
+            report: 批次上报数据
+            
+        Returns:
+            Dict: 请求体字典
+        """
+        batch_info = report.batch_info
+        
+        # 标准上报格式
+        payload = {
+            'deviceId': report.device_id,
+            'projectId': report.project_id,
+            'batchId': report.batch_id,
+            'timestamp': report.timestamp,
+            'datetime': datetime.fromtimestamp(report.timestamp).isoformat(),
+            'totalPersons': batch_info.get('total_persons', 0),
+            'ptzImagesCount': batch_info.get('ptz_images_count', 0),
+            'panorama': batch_info.get('panorama', {}),
+            'persons': batch_info.get('persons', []),
+            'uploadStatus': batch_info.get('upload_status', {}),
+        }
+        
+        # 根据平台类型调整格式
+        if self.platform_type == 'jtjai':
+            # jtjai 平台特定格式
+            payload = {
+                'createTime': datetime.fromtimestamp(report.timestamp).strftime("%Y-%m-%d %H:%M:%S"),
+                'addr': f"设备{report.device_id}批次上报",
+                'ext1': json.dumps([batch_info.get('panorama', {}).get('oss_url')]),
+                'ext2': json.dumps({
+                    'batchId': report.batch_id,
+                    'deviceId': report.device_id,
+                    'projectId': report.project_id,
+                    'totalPersons': batch_info.get('total_persons', 0),
+                    'ptzImagesCount': batch_info.get('ptz_images_count', 0),
+                    'persons': batch_info.get('persons', []),
+                })
+            }
+        
+        return payload
+    
+    def report_batch(self, batch_info: Dict[str, Any], local_path: Optional[str] = None):
+        """
+        上报批次信息
+        
+        Args:
+            batch_info: batch_info.json 的字典内容
+            local_path: batch_info.json 的本地文件路径(可选)
+        """
+        if not self.enabled:
+            return
+        
+        report = BatchReport(
+            batch_id=batch_info.get('batch_id', ''),
+            device_id=batch_info.get('device_id', ''),
+            project_id=batch_info.get('project_id', ''),
+            timestamp=batch_info.get('timestamp', time.time()),
+            batch_info=batch_info,
+            local_path=local_path
+        )
+        
+        self.report_queue.put(report)
+        
+        with self.stats_lock:
+            self.stats['total_reports'] += 1
+    
+    def report_batch_sync(self, batch_info: Dict[str, Any], 
+                          local_path: Optional[str] = None) -> bool:
+        """
+        同步上报批次信息
+        
+        Args:
+            batch_info: batch_info.json 的字典内容
+            local_path: batch_info.json 的本地文件路径(可选)
+            
+        Returns:
+            bool: 是否成功
+        """
+        if not self.enabled:
+            return False
+        
+        report = BatchReport(
+            batch_id=batch_info.get('batch_id', ''),
+            device_id=batch_info.get('device_id', ''),
+            project_id=batch_info.get('project_id', ''),
+            timestamp=batch_info.get('timestamp', time.time()),
+            batch_info=batch_info,
+            local_path=local_path
+        )
+        
+        return self._send_batch_report(report)
+    
+    def send_heartbeat(self) -> bool:
+        """
+        发送心跳
+        
+        Returns:
+            bool: 是否成功
+        """
+        if not self.enabled or not self.heartbeat_url:
+            return False
+        
+        url = f"{self.base_url}{self.heartbeat_url}"
+        
+        payload = {
+            'deviceId': self.device_config.get('device_id', ''),
+            'projectId': self.device_config.get('project_id', ''),
+            'timestamp': time.time(),
+            'status': 'online',
+        }
+        
+        headers = self._get_auth_headers()
+        
+        try:
+            response = requests.post(
+                url,
+                json=payload,
+                headers=headers,
+                timeout=self.timeout,
+                verify=False
+            )
+            
+            if response.status_code == 200:
+                logger.debug("[第三方平台] 心跳发送成功")
+                return True
+            else:
+                logger.warning(f"[第三方平台] 心跳发送失败: HTTP {response.status_code}")
+                return False
+                
+        except Exception as e:
+            logger.error(f"[第三方平台] 心跳发送异常: {e}")
+            return False
+    
+    def set_callbacks(self, on_success: Callable = None, on_failed: Callable = None):
+        """
+        设置回调函数
+        
+        Args:
+            on_success: 上报成功回调
+            on_failed: 上报失败回调
+        """
+        self.on_report_success = on_success
+        self.on_report_failed = on_failed
+    
+    def get_stats(self) -> Dict[str, int]:
+        """获取统计信息"""
+        with self.stats_lock:
+            return self.stats.copy()
+    
+    def is_enabled(self) -> bool:
+        """检查是否启用"""
+        return self.enabled
+
+
+# 全局单例
+_third_party_pusher_instance: Optional[ThirdPartyPusher] = None
+
+
+def get_third_party_pusher(config: Dict[str, Any] = None) -> ThirdPartyPusher:
+    """
+    获取第三方平台推送器实例(单例模式)
+    
+    Args:
+        config: 第三方平台配置
+        
+    Returns:
+        ThirdPartyPusher 实例
+    """
+    global _third_party_pusher_instance
+    
+    if _third_party_pusher_instance is None:
+        _third_party_pusher_instance = ThirdPartyPusher(config)
+    
+    return _third_party_pusher_instance
+
+
+def reset_third_party_pusher():
+    """重置第三方平台推送器实例"""
+    global _third_party_pusher_instance
+    if _third_party_pusher_instance is not None:
+        _third_party_pusher_instance.stop()
+    _third_party_pusher_instance = None