| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192 |
- """
- 配对图片保存管理器
- 将全景检测图片和对应的球机聚焦图片保存到同一目录
- 支持 OSS 上传和 batch_info.json 格式
- """
- import os
- import cv2
- import time
- import json
- import logging
- import threading
- from pathlib import Path
- from datetime import datetime
- from typing import Optional, List, Dict, Tuple, Callable
- from dataclasses import dataclass, field, asdict
- logger = logging.getLogger(__name__)
- @dataclass
- class PersonInfo:
- """人员信息"""
- person_index: int # 人员序号(0-based)
- position: Tuple[float, float] # (x_ratio, y_ratio)
- bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2)
- confidence: float
- ptz_position: Optional[Tuple[float, float, int]] = None # (pan, tilt, zoom)
- ptz_bbox: Optional[Tuple[int, int, int, int]] = None # 球机图中检测到的bbox (x1, y1, x2, y2)
- ptz_image_saved: bool = False
- ptz_image_path: Optional[str] = None # 标记后的球机图路径
- ptz_image_original_path: Optional[str] = None # 未标记的球机图路径
- ptz_oss_url: Optional[str] = None # 标记后球机图 OSS URL
- ptz_oss_url_original: Optional[str] = None # 未标记球机图 OSS URL
- @dataclass
- class DetectionBatch:
- """一批检测记录"""
- batch_id: str
- timestamp: float
- panorama_image: Optional[object] = None # numpy array
- panorama_path: Optional[str] = None # 标记后的全景图路径
- panorama_original_path: Optional[str] = None # 未标记的全景图路径
- panorama_oss_url: Optional[str] = None # 标记后全景图 OSS URL
- panorama_oss_url_original: Optional[str] = None # 未标记全景图 OSS URL
- persons: List[PersonInfo] = field(default_factory=list)
- total_persons: int = 0
- ptz_images_count: int = 0
- completed: bool = False
- device_id: str = '' # 设备编号
- project_id: str = '' # 项目编号
- class PairedImageSaver:
- """
- 配对图片保存管理器
-
- 功能:
- 1. 为每次全景检测创建批次目录
- 2. 保存全景标记图到批次目录
- 3. 为每个人员保存对应的球机聚焦图到同一目录
- 4. 支持时间窗口内的批量保存
- 5. 支持 OSS 上传
- 6. 生成 batch_info.json
- """
-
- def __init__(self, base_dir: str = None,
- time_window: float = None,
- max_batches: int = None,
- cleanup_enabled: bool = None,
- retention_days: int = None,
- enable_oss: bool = False,
- oss_uploader = None,
- device_config: Dict = None):
- """
- 初始化
- Args:
- base_dir: 基础保存目录(默认从配置读取)
- time_window: 批次时间窗口(秒),同一窗口内的检测归为一批
- max_batches: 最大保留批次数量
- cleanup_enabled: 是否启用自动清理
- retention_days: 保留天数
- enable_oss: 是否启用 OSS 上传
- oss_uploader: OSS 上传器实例
- device_config: 设备配置字典
- """
- # 从配置模块读取配对图片保存配置
- try:
- from config import PAIRED_IMAGE_CONFIG
- config = PAIRED_IMAGE_CONFIG
- except ImportError:
- config = {}
- # 使用传入参数或配置默认值
- self.base_dir = Path(base_dir or config.get('base_dir', '/home/admin/dsh/paired_images'))
- self.time_window = time_window or config.get('time_window', 5.0)
- self.max_batches = max_batches if max_batches is not None else config.get('max_batches', 100)
- self.cleanup_enabled = cleanup_enabled if cleanup_enabled is not None else config.get('cleanup_enabled', True)
- self.retention_days = retention_days if retention_days is not None else config.get('retention_days', 7)
-
- # 从配置模块读取 OSS 和设备配置(确保即使外部不传也能正确配置)
- try:
- from config import S3_COMPATIBLE_CONFIG, DEVICE_CONFIG
- # OSS 配置:优先使用传入参数,否则从配置模块读取
- oss_enabled_in_config = S3_COMPATIBLE_CONFIG.get('enabled', False)
- if enable_oss or (not enable_oss and oss_enabled_in_config):
- self.enable_oss = True
- else:
- self.enable_oss = enable_oss
- logger.info(f"[配对保存] OSS配置: enable_oss={enable_oss}, config_enabled={oss_enabled_in_config}, 最终启用={self.enable_oss}")
- # OSS 上传器:优先使用传入的实例,否则从全局获取
- if oss_uploader is not None:
- self.oss_uploader = oss_uploader
- logger.info("[配对保存] 使用传入的 OSS 上传器")
- elif self.enable_oss:
- try:
- from oss_uploader import get_oss_uploader
- self.oss_uploader = get_oss_uploader()
- logger.info(f"[配对保存] 获取 OSS 上传器: enabled={self.oss_uploader.enabled}, running={getattr(self.oss_uploader, 'running', False)}")
- if not self.oss_uploader.running:
- self.oss_uploader.start()
- logger.info("[配对保存] OSS 上传器已启动")
- except Exception as e:
- logger.warning(f"[配对保存] OSS 上传器初始化失败: {e}")
- self.oss_uploader = None
- self.enable_oss = False
- else:
- self.oss_uploader = None
- # 设备配置:合并传入参数和配置模块
- self.device_config = DEVICE_CONFIG.copy()
- if device_config:
- self.device_config.update(device_config)
- except ImportError as e:
- # 配置模块不可用时使用传入参数
- logger.warning(f"[配对保存] 配置模块导入失败: {e}")
- self.enable_oss = enable_oss
- self.oss_uploader = oss_uploader
- self.device_config = device_config or {}
-
- self._current_batch: Optional[DetectionBatch] = None
- self._batch_lock = threading.Lock()
- self._last_batch_time = 0.0
- # 保存任务队列(必须在线程启动前初始化)
- self._save_queue = []
- self._save_queue_lock = threading.Lock()
- # 后台线程池(用于异步保存图片和上传OSS,不阻塞主识别线程)
- self._save_thread_pool = threading.Thread(target=self._save_worker, daemon=True)
- self._save_thread_pool.start()
- # 上传状态追踪
- self._upload_status: Dict[str, Dict] = {} # batch_id -> {panorama: bool, ptz: Dict}
- self._upload_callback: Optional[Callable] = None
- # 统计信息
- self._stats = {
- 'total_batches': 0,
- 'total_persons': 0,
- 'total_ptz_images': 0,
- 'oss_upload_success': 0,
- 'oss_upload_failed': 0,
- }
- self._stats_lock = threading.Lock()
-
- # 确保目录存在
- self._ensure_base_dir()
-
- logger.info(f"[配对保存] 初始化完成: 目录={self.base_dir}, 时间窗口={self.time_window}s, "
- f"最大批次={self.max_batches}, 清理={self.cleanup_enabled}, 保留天数={self.retention_days}, OSS={enable_oss}")
-
- def _ensure_base_dir(self):
- """确保基础目录存在"""
- try:
- self.base_dir.mkdir(parents=True, exist_ok=True)
- except Exception as e:
- logger.error(f"[配对保存] 创建目录失败: {e}")
- def _save_worker(self):
- """后台工作线程:异步保存图片和上传OSS"""
- logger.info("[配对保存] 后台保存线程已启动")
- while True:
- task = None
- with self._save_queue_lock:
- if self._save_queue:
- task = self._save_queue.pop(0)
- if task is not None:
- task_type = task.get('type')
- try:
- if task_type == 'panorama':
- self._async_save_panorama(task)
- elif task_type == 'ptz':
- self._async_save_ptz(task)
- elif task_type == 'finalize':
- self._async_finalize_batch(task)
- except Exception as e:
- logger.error(f"[配对保存] 后台任务执行失败: {e}")
- else:
- time.sleep(0.1) # 无任务时短暂休眠
- def _async_save_panorama(self, task: Dict):
- """异步保存全景图并上传OSS"""
- batch_id = task['batch_id']
- batch_dir = task['batch_dir']
- frame = task['frame']
- persons = task['persons']
- try:
- # 保存原图 - JPEG 压缩 (质量 85%)
- original_filename = f"00_panorama_original_n{len(persons)}.jpg"
- original_filepath = batch_dir / original_filename
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', frame, encode_param)
- if result:
- with open(str(original_filepath), 'wb') as f:
- f.write(encoded)
- # 保存标记图
- marked_frame = frame.copy()
- for i, person in enumerate(persons):
- bbox = person.get('bbox', (0, 0, 0, 0))
- x1, y1, x2, y2 = bbox
- conf = person.get('confidence', 0.0)
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
- label = f"person_{i}({conf:.2f})"
- (label_w, label_h), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
- cv2.rectangle(marked_frame, (x1, y1 - label_h - 8), (x1 + label_w, y1), (0, 255, 0), -1)
- cv2.putText(marked_frame, label, (x1, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
- marked_filename = f"00_panorama_marked_n{len(persons)}.jpg"
- marked_filepath = batch_dir / marked_filename
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
- if result:
- with open(str(marked_filepath), 'wb') as f:
- f.write(encoded)
- logger.info(f"[配对保存] 全景图已保存: batch={batch_id}")
- # 异步上传OSS
- if self.enable_oss and self.oss_uploader:
- if original_filepath.exists():
- self._upload_panorama_to_oss(batch_id, str(original_filepath), image_type='panorama_original')
- if marked_filepath.exists():
- self._upload_panorama_to_oss(batch_id, str(marked_filepath), image_type='panorama')
- except Exception as e:
- logger.error(f"[配对保存] 异步保存全景图失败: {e}")
- def _async_save_ptz(self, task: Dict):
- """异步保存球机图并上传OSS"""
- batch_id = task['batch_id']
- batch_dir = task['batch_dir']
- person_index = task['person_index']
- ptz_frame = task['ptz_frame']
- ptz_position = task['ptz_position']
- ptz_frame_marked = task.get('ptz_frame_marked')
- ptz_bbox = task.get('ptz_bbox')
- try:
- pan, tilt, zoom = ptz_position
- # 保存原图(未标记)- JPEG 压缩
- original_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_original.jpg"
- original_filepath = batch_dir / original_filename
- if ptz_frame is not None:
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', ptz_frame, encode_param)
- if result:
- with open(str(original_filepath), 'wb') as f:
- f.write(encoded)
- # 保存标记图 - JPEG 压缩
- marked_frame = ptz_frame_marked if ptz_frame_marked is not None else None
- if marked_frame is None and ptz_frame is not None:
- marked_frame = ptz_frame.copy()
- h, w = marked_frame.shape[:2]
- info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
- cv2.putText(marked_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- person_text = f"person_{person_index}"
- cv2.putText(marked_frame, person_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- if ptz_bbox is not None:
- x1, y1, x2, y2 = ptz_bbox
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
- bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
- cv2.putText(marked_frame, bbox_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
- marked_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_marked.jpg"
- marked_filepath = batch_dir / marked_filename
- if marked_frame is not None:
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
- if result:
- with open(str(marked_filepath), 'wb') as f:
- f.write(encoded)
- logger.info(f"[配对保存] 球机图已保存: batch={batch_id}, person={person_index}")
- # 异步上传OSS
- if self.enable_oss and self.oss_uploader:
- if original_filepath.exists():
- self._upload_ptz_to_oss(batch_id, person_index, str(original_filepath), str(marked_filepath))
- except Exception as e:
- logger.error(f"[配对保存] 异步保存球机图失败: {e}")
- def _async_finalize_batch(self, task: Dict):
- """异步完成批次(生成batch_info.json + 上报第三方)"""
- batch = task['batch']
- try:
- batch.completed = True
- batch_dir = self.base_dir / f"batch_{batch.batch_id}"
- # 等待 OSS 上传完成(最多等待30秒)
- if self.enable_oss:
- wait_start = time.time()
- max_wait = 30.0
- # 统计需要等待的 PTZ 上传数量
- expected_ptz_count = sum(1 for p in batch.persons if p.ptz_image_saved)
- while time.time() - wait_start < max_wait:
- status = self._upload_status.get(batch.batch_id, {})
- panorama_done = status.get('panorama_url') is not None or not batch.panorama_path
- ptz_marked_status = status.get('ptz_marked', {})
- ptz_original_status = status.get('ptz_original', {})
- # 只有当 PTZ 上传状态已提交后才判断完成,避免空字典误判
- if expected_ptz_count == 0:
- ptz_done = True
- else:
- ptz_done = (
- len(ptz_marked_status) >= expected_ptz_count
- and len(ptz_original_status) >= expected_ptz_count
- and all(
- ptz_marked_status.get(idx) is not None and ptz_original_status.get(idx) is not None
- for idx, person in enumerate(batch.persons)
- if person.ptz_image_saved
- )
- )
- if panorama_done and ptz_done:
- break
- time.sleep(0.5)
- # 构建并保存 batch_info.json
- batch_info = self._build_batch_info_json(batch)
- json_path = batch_dir / "batch_info.json"
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(batch_info, f, ensure_ascii=False, indent=2)
- txt_path = batch_dir / "batch_info.txt"
- self._save_batch_info_txt(batch, txt_path)
- # 校验 OSS URL 不能为 null,上报前确保所有图片已上传
- upload_status = self._upload_status.get(batch.batch_id, {})
- missing_urls = []
- # 检查全景图
- if not upload_status.get('panorama_url') and not batch.panorama_oss_url:
- missing_urls.append('panorama')
- if not upload_status.get('panorama_original_url') and not batch.panorama_oss_url_original:
- missing_urls.append('panorama_original')
- # 检查球机图
- for idx, person in enumerate(batch.persons):
- if person.ptz_image_saved:
- ptz_marked = upload_status.get('ptz_marked', {}).get(idx) or person.ptz_oss_url
- ptz_original = upload_status.get('ptz_original', {}).get(idx) or person.ptz_oss_url_original
- if not ptz_marked:
- missing_urls.append(f'ptz_{idx}_marked')
- if not ptz_original:
- missing_urls.append(f'ptz_{idx}_original')
- if missing_urls:
- logger.warning(f"[配对保存] OSS 上传未完成,跳过上报: batch_id={batch.batch_id}, 缺失: {missing_urls}")
- # 上报第三方平台
- self._report_to_third_party(batch_info)
- # 标记上传完成
- if batch.batch_id in self._upload_status:
- self._upload_status[batch.batch_id]['completed'] = True
- # 触发回调
- if self._upload_callback:
- try:
- self._upload_callback(batch_info)
- except Exception as e:
- logger.error(f"[配对保存] 回调执行错误: {e}")
- logger.info(f"[配对保存] 批次完成: {batch.batch_id}")
- except Exception as e:
- logger.error(f"[配对保存] 异步完成批次失败: {e}")
- finally:
- # 确保清理逻辑始终执行
- self._cleanup_old_batches()
- def _queue_save_task(self, task: Dict):
- """添加保存任务到队列"""
- with self._save_queue_lock:
- self._save_queue.append(task)
-
- def _generate_batch_id(self) -> str:
- """生成批次ID"""
- return datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
-
- def _create_batch_dir(self, batch_id: str) -> Path:
- """创建批次目录"""
- batch_dir = self.base_dir / f"batch_{batch_id}"
- try:
- batch_dir.mkdir(parents=True, exist_ok=True)
- return batch_dir
- except Exception as e:
- logger.error(f"[配对保存] 创建批次目录失败: {e}")
- return self.base_dir
-
- def start_new_batch(self, panorama_frame, persons: List[Dict]) -> Optional[str]:
- """
- 开始新批次
-
- Args:
- panorama_frame: 全景帧图像
- persons: 人员列表,每项包含 track_id, position, bbox, confidence
-
- Returns:
- batch_id: 批次ID,失败返回 None
- """
- with self._batch_lock:
- current_time = time.time()
-
- # 完成上一批次(如果有)- 异步执行,不阻塞主线程
- if self._current_batch is not None:
- self._queue_save_task({
- 'type': 'finalize',
- 'batch': self._current_batch
- })
-
- # 创建新批次
- batch_id = self._generate_batch_id()
- batch_dir = self._create_batch_dir(batch_id)
- # 异步保存全景图片(原图和标记图),不阻塞主线程
- panorama_original_path = None
- panorama_marked_path = None
- if panorama_frame is not None:
- # 异步保存,路径在保存完成后会更新
- panorama_original_path, panorama_marked_path = self._save_panorama_image(
- batch_dir, batch_id, panorama_frame, persons
- )
- # 异步上传OSS(不阻塞主线程)
- if panorama_original_path and panorama_marked_path:
- self._queue_save_task({
- 'type': 'panorama',
- 'batch_id': batch_id,
- 'batch_dir': batch_dir,
- 'frame': panorama_frame,
- 'persons': persons
- })
- # 创建人员信息
- person_infos = []
- for i, p in enumerate(persons):
- info = PersonInfo(
- person_index=i,
- position=p.get('position', (0, 0)),
- bbox=p.get('bbox', (0, 0, 0, 0)),
- confidence=p.get('confidence', 0.0)
- )
- person_infos.append(info)
- # 获取设备信息
- device_id = self.device_config.get('device_id', 'UNKNOWN')
- project_id = self.device_config.get('project_id', 'UNKNOWN')
- # 创建批次记录
- self._current_batch = DetectionBatch(
- batch_id=batch_id,
- timestamp=current_time,
- panorama_image=panorama_frame,
- panorama_path=panorama_marked_path, # 标记图作为主路径
- panorama_original_path=panorama_original_path, # 原图路径
- persons=person_infos,
- total_persons=len(persons),
- device_id=device_id,
- project_id=project_id
- )
-
- # 初始化上传状态
- self._upload_status[batch_id] = {
- 'panorama': False,
- 'panorama_url': None,
- 'ptz': {},
- 'completed': False
- }
-
- self._last_batch_time = current_time
-
- with self._stats_lock:
- self._stats['total_batches'] += 1
- self._stats['total_persons'] += len(persons)
-
- logger.info(
- f"[配对保存] 新批次创建: {batch_id}, "
- f"人员={len(persons)}, 目录={batch_dir}"
- )
- return batch_id
-
- def _save_panorama_image(self, batch_dir: Path, batch_id: str,
- frame, persons: List[Dict]) -> Tuple[Optional[str], Optional[str]]:
- """
- 保存全景原图和标记图片
- Args:
- batch_dir: 批次目录
- batch_id: 批次ID
- frame: 全景帧
- persons: 人员列表(已由调用方过滤,此处不再过滤)
- Returns:
- (原图路径, 标记图路径) 或 (None, None)
- """
- try:
- # 保存原图(未标记)- JPEG 压缩
- original_filename = f"00_panorama_original_n{len(persons)}.jpg"
- original_filepath = batch_dir / original_filename
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', frame, encode_param)
- if result:
- with open(str(original_filepath), 'wb') as f:
- f.write(encoded)
- # 复制图像避免修改原图
- marked_frame = frame.copy()
- # 绘制每个人员的标记(使用连续的序号)
- for i, person in enumerate(persons):
- bbox = person.get('bbox', (0, 0, 0, 0))
- x1, y1, x2, y2 = bbox
- conf = person.get('confidence', 0.0)
- # 绘制边界框(绿色)
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
- # 绘制序号标签(带置信度)
- label = f"person_{i}({conf:.2f})"
- (label_w, label_h), baseline = cv2.getTextSize(
- label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
- )
- # 标签背景
- cv2.rectangle(
- marked_frame,
- (x1, y1 - label_h - 8),
- (x1 + label_w, y1),
- (0, 255, 0),
- -1
- )
- # 标签文字(黑色)
- cv2.putText(
- marked_frame, label,
- (x1, y1 - 4),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
- (0, 0, 0), 2
- )
- # 保存标记图
- marked_filename = f"00_panorama_marked_n{len(persons)}.jpg"
- marked_filepath = batch_dir / marked_filename
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
- if result:
- with open(str(marked_filepath), 'wb') as f:
- f.write(encoded)
- logger.info(f"[配对保存] 全景图已保存: 原图={original_filepath}, 标记图={marked_filepath}, 人员数量 {len(persons)}")
- return str(original_filepath), str(marked_filepath)
- except Exception as e:
- logger.error(f"[配对保存] 保存全景图失败: {e}")
- return None, None
-
- def save_ptz_image(self, batch_id: str, person_index: int,
- ptz_frame, ptz_position: Tuple[float, float, int],
- ptz_bbox: Tuple[int, int, int, int] = None,
- person_info: Dict = None,
- ptz_frame_marked: object = None) -> Tuple[Optional[str], Optional[str]]:
- """
- 保存球机聚焦图片(原图和标记图)
- Args:
- batch_id: 批次ID
- person_index: 人员序号(0-based)
- ptz_frame: 球机原始帧(未标记)
- ptz_position: PTZ位置 (pan, tilt, zoom)
- ptz_bbox: 球机图中检测到的bbox (x1, y1, x2, y2)
- person_info: 额外人员信息
- ptz_frame_marked: 外部传入的已标记帧(可选,不传则在内部生成标记图)
- Returns:
- (原图路径, 标记图路径) 或 (None, None)
- """
- logger.info(f"[配对保存] save_ptz_image: batch={batch_id}, person={person_index}, "
- f"PTZ=({ptz_position[0]:.1f}°, {ptz_position[1]:.1f}°, zoom={ptz_position[2]})")
- with self._batch_lock:
- if self._current_batch is None or self._current_batch.batch_id != batch_id:
- logger.warning(f"[配对保存] 批次不存在或已过期: {batch_id}")
- return None, None
- batch_dir = self.base_dir / f"batch_{batch_id}"
- try:
- # 保存原图(未标记)- JPEG 压缩
- original_filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}_original.jpg"
- original_filepath = batch_dir / original_filename
- if ptz_frame is not None:
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', ptz_frame, encode_param)
- if result:
- with open(str(original_filepath), 'wb') as f:
- f.write(encoded)
- # 优先使用外部传入的标记帧,否则内部生成
- if ptz_frame_marked is not None:
- marked_frame = ptz_frame_marked
- else:
- # 复制图像用于标记
- marked_frame = ptz_frame.copy() if ptz_frame is not None else None
- if marked_frame is not None:
- # 在球机图上添加标记
- h, w = marked_frame.shape[:2]
- # 添加PTZ位置信息
- pan, tilt, zoom = ptz_position
- info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
- cv2.putText(marked_frame, info_text, (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- # 添加人员序号
- person_text = f"person_{person_index}"
- cv2.putText(marked_frame, person_text, (10, 60),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
- # 绘制PTZ检测到的bbox
- if ptz_bbox is not None:
- x1, y1, x2, y2 = ptz_bbox
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
- bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
- cv2.putText(marked_frame, bbox_text, (10, 90),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
- # 保存标记图 - JPEG 压缩
- marked_filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}_marked.jpg"
- marked_filepath = batch_dir / marked_filename
- if marked_frame is not None:
- encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
- result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
- if result:
- with open(str(marked_filepath), 'wb') as f:
- f.write(encoded)
- logger.info(f"[配对保存] 球机图已保存: 原图={original_filepath}, 标记图={marked_filepath}")
- # 更新批次信息
- if person_index < len(self._current_batch.persons):
- self._current_batch.persons[person_index].ptz_position = ptz_position
- self._current_batch.persons[person_index].ptz_bbox = ptz_bbox
- self._current_batch.persons[person_index].ptz_image_saved = True
- self._current_batch.persons[person_index].ptz_image_path = str(marked_filepath)
- self._current_batch.persons[person_index].ptz_image_original_path = str(original_filepath)
- self._current_batch.ptz_images_count += 1
- with self._stats_lock:
- self._stats['total_ptz_images'] += 1
- # 异步上传OSS(不阻塞主线程)
- if self.enable_oss and self.oss_uploader:
- self._queue_save_task({
- 'type': 'ptz',
- 'batch_id': batch_id,
- 'batch_dir': batch_dir,
- 'person_index': person_index,
- 'ptz_frame': ptz_frame,
- 'ptz_position': ptz_position,
- 'ptz_frame_marked': ptz_frame_marked,
- 'ptz_bbox': ptz_bbox
- })
- return str(original_filepath), str(marked_filepath)
- except Exception as e:
- logger.error(f"[配对保存] 保存球机图失败: {e}")
- return None, None
-
- def _upload_panorama_to_oss(self, batch_id: str, panorama_path: str, image_type: str = 'panorama'):
- """上传全景图到 OSS"""
- logger.info(f"[OSS] _upload_panorama_to_oss: batch_id={batch_id}, path={panorama_path}, type={image_type}")
- # 确定是原图还是标记图
- is_original = image_type == 'panorama_original'
- def on_upload_complete(result):
- logger.info(f"[OSS] 全景图上传完成回调: type={image_type}, url={result.oss_url}")
- if is_original:
- self._upload_status[batch_id]['panorama_original'] = True
- self._upload_status[batch_id]['panorama_original_url'] = result.oss_url
- if self._current_batch and self._current_batch.batch_id == batch_id:
- self._current_batch.panorama_oss_url_original = result.oss_url
- else:
- self._upload_status[batch_id]['panorama'] = True
- self._upload_status[batch_id]['panorama_url'] = result.oss_url
- if self._current_batch and self._current_batch.batch_id == batch_id:
- self._current_batch.panorama_oss_url = result.oss_url
- with self._stats_lock:
- self._stats['oss_upload_success'] += 1
- def on_upload_error(result):
- logger.error(f"[OSS] 全景图上传失败回调: type={image_type}, error={result.error}")
- with self._stats_lock:
- self._stats['oss_upload_failed'] += 1
- def on_upload_done(result):
- if result.success:
- on_upload_complete(result)
- else:
- on_upload_error(result)
- try:
- oss_key = self.oss_uploader.upload_image(
- local_path=panorama_path,
- batch_id=batch_id,
- image_type=image_type,
- callback=on_upload_done
- )
- logger.info(f"[OSS] 全景图已加入上传队列: type={image_type}, oss_key={oss_key}")
- except Exception as e:
- logger.error(f"[OSS] 全景图上传异常: {e}")
-
- def _upload_ptz_to_oss(self, batch_id: str, person_index: int, original_path: str = None, marked_path: str = None):
- """上传球机图到 OSS(原图和标记图)"""
- logger.info(f"[OSS] _upload_ptz_to_oss: batch={batch_id}, person={person_index}, original={original_path}, marked={marked_path}")
- # 初始化上传状态
- if batch_id not in self._upload_status:
- self._upload_status[batch_id] = {'ptz': {}}
- if 'ptz_original' not in self._upload_status[batch_id]:
- self._upload_status[batch_id]['ptz_original'] = {}
- if 'ptz_marked' not in self._upload_status[batch_id]:
- self._upload_status[batch_id]['ptz_marked'] = {}
- # 上传原图
- if original_path:
- def on_original_complete(result):
- self._upload_status[batch_id]['ptz_original'][person_index] = result.oss_url
- if self._current_batch and self._current_batch.batch_id == batch_id:
- if person_index < len(self._current_batch.persons):
- self._current_batch.persons[person_index].ptz_oss_url_original = result.oss_url
- with self._stats_lock:
- self._stats['oss_upload_success'] += 1
- logger.info(f"[OSS] 球机原图上传成功: {result.oss_url}")
- def on_original_done(result):
- if result.success:
- on_original_complete(result)
- else:
- logger.error(f"[OSS] 球机原图上传失败: {result.error}")
- try:
- self.oss_uploader.upload_image(
- local_path=original_path,
- batch_id=batch_id,
- image_type='ptz_original',
- person_index=person_index,
- callback=on_original_done
- )
- except Exception as e:
- logger.error(f"[OSS] 球机原图上传异常: {e}")
- # 上传标记图
- if marked_path:
- def on_marked_complete(result):
- self._upload_status[batch_id]['ptz_marked'][person_index] = result.oss_url
- if self._current_batch and self._current_batch.batch_id == batch_id:
- if person_index < len(self._current_batch.persons):
- self._current_batch.persons[person_index].ptz_oss_url = result.oss_url
- with self._stats_lock:
- self._stats['oss_upload_success'] += 1
- logger.info(f"[OSS] 球机标记图上传成功: {result.oss_url}")
- def on_marked_done(result):
- if result.success:
- on_marked_complete(result)
- else:
- logger.error(f"[OSS] 球机标记图上传失败: {result.error}")
- try:
- self.oss_uploader.upload_image(
- local_path=marked_path,
- batch_id=batch_id,
- image_type='ptz',
- person_index=person_index,
- callback=on_marked_done
- )
- except Exception as e:
- logger.error(f"[OSS] 球机标记图上传异常: {e}")
-
- def _finalize_batch(self, batch: DetectionBatch):
- """完成批次处理"""
- batch.completed = True
- # 等待 OSS 上传完成(最多等待10秒)
- if self.enable_oss and batch.batch_id in self._upload_status:
- wait_start = time.time()
- max_wait = 10.0 # 增加等待时间
- while time.time() - wait_start < max_wait:
- status = self._upload_status[batch.batch_id]
- # 检查全景图是否上传完成(优先检查 _upload_status 中的状态)
- panorama_url = status.get('panorama_url')
- panorama_done = panorama_url is not None or not batch.panorama_path
- # 检查所有球机图是否上传完成
- ptz_status = status.get('ptz', {})
- ptz_done = all(
- ptz_status.get(idx) is not None
- for idx, person in enumerate(batch.persons)
- if person.ptz_image_saved
- )
- if panorama_done and ptz_done:
- break
- time.sleep(0.2)
-
- # 创建 batch_info.json 文件
- try:
- batch_dir = self.base_dir / f"batch_{batch.batch_id}"
-
- # 构建 JSON 数据
- batch_info = self._build_batch_info_json(batch)
-
- # 保存为 JSON 文件
- json_path = batch_dir / "batch_info.json"
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(batch_info, f, ensure_ascii=False, indent=2)
-
- logger.info(f"[配对保存] batch_info.json 已保存: {json_path}")
-
- # 同时保留 txt 格式用于兼容(可选)
- txt_path = batch_dir / "batch_info.txt"
- self._save_batch_info_txt(batch, txt_path)
-
- # 标记上传完成
- if batch.batch_id in self._upload_status:
- self._upload_status[batch.batch_id]['completed'] = True
-
- # 触发回调
- if self._upload_callback:
- try:
- self._upload_callback(batch_info)
- except Exception as e:
- logger.error(f"[配对保存] 回调执行错误: {e}")
-
- logger.info(f"[配对保存] 批次完成: {batch.batch_id}, "
- f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}")
-
- except Exception as e:
- logger.error(f"[配对保存] 保存批次信息失败: {e}")
-
- # 清理旧批次
- self._cleanup_old_batches()
- def _report_to_third_party(self, batch_info: Dict):
- """上报批次信息到第三方平台"""
- try:
- from config import THIRD_PARTY_CONFIG
- except ImportError:
- logger.warning("[第三方] 配置模块不可用,跳过上报")
- return
- if not THIRD_PARTY_CONFIG.get('enabled', False):
- return
- base_url = THIRD_PARTY_CONFIG.get('base_url', '')
- endpoint = THIRD_PARTY_CONFIG.get('endpoints', {}).get('batch_report', '')
- if not base_url or not endpoint:
- logger.warning("[第三方] 接口地址未配置,跳过上报")
- return
- url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
- retry_count = THIRD_PARTY_CONFIG.get('retry_count', 3)
- retry_delay = THIRD_PARTY_CONFIG.get('retry_delay', 2.0)
- timeout = THIRD_PARTY_CONFIG.get('timeout', 10)
- for attempt in range(1, retry_count + 1):
- try:
- import requests
- resp = requests.post(url, json=batch_info, timeout=timeout)
- if resp.status_code in (200, 201):
- logger.info(f"[第三方] 批次上报成功: batch_id={batch_info.get('batch_id')}, status={resp.status_code}")
- return
- else:
- logger.warning(f"[第三方] 批次上报失败: status={resp.status_code}, body={resp.text[:200]}")
- except Exception as e:
- logger.warning(f"[第三方] 批次上报异常(第{attempt}次): {e}")
- if attempt < retry_count:
- time.sleep(retry_delay)
- logger.error(f"[第三方] 批次上报最终失败: batch_id={batch_info.get('batch_id')}")
- def _build_batch_info_json(self, batch: DetectionBatch) -> Dict:
- """
- 构建 batch_info.json 数据结构
- Returns:
- Dict: 批次信息字典
- """
- # 从上传状态获取最新的 OSS URL
- upload_status = self._upload_status.get(batch.batch_id, {})
- # 获取全景图 OSS URL
- panorama_oss_url = upload_status.get('panorama_url', batch.panorama_oss_url)
- panorama_oss_url_original = upload_status.get('panorama_original_url', batch.panorama_oss_url_original)
- # 人员信息列表
- persons_list = []
- for person in batch.persons:
- # 获取球机图 OSS URL
- ptz_oss_url = upload_status.get('ptz_marked', {}).get(person.person_index, person.ptz_oss_url)
- ptz_oss_url_original = upload_status.get('ptz_original', {}).get(person.person_index, person.ptz_oss_url_original)
- person_data = {
- 'person_index': person.person_index,
- 'position': {
- 'x': round(person.position[0], 4),
- 'y': round(person.position[1], 4)
- },
- 'bbox': {
- 'x1': person.bbox[0],
- 'y1': person.bbox[1],
- 'x2': person.bbox[2],
- 'y2': person.bbox[3]
- },
- 'confidence': round(person.confidence, 4),
- 'ptz_position': {
- 'pan': round(person.ptz_position[0], 2) if person.ptz_position else None,
- 'tilt': round(person.ptz_position[1], 2) if person.ptz_position else None,
- 'zoom': person.ptz_position[2] if person.ptz_position else None
- } if person.ptz_position else None,
- 'ptz_bbox': {
- 'x1': person.ptz_bbox[0],
- 'y1': person.ptz_bbox[1],
- 'x2': person.ptz_bbox[2],
- 'y2': person.ptz_bbox[3]
- } if person.ptz_bbox else None,
- 'ptz_image_saved': person.ptz_image_saved,
- 'ptz_image_path': person.ptz_image_path, # 标记图
- 'ptz_image_original_path': person.ptz_image_original_path, # 原图
- 'ptz_oss_url': ptz_oss_url, # 标记图 OSS URL
- 'ptz_oss_url_original': ptz_oss_url_original # 原图 OSS URL
- }
- persons_list.append(person_data)
- # 构建完整批次信息
- batch_info = {
- 'batch_id': batch.batch_id,
- 'device_id': batch.device_id,
- 'project_id': batch.project_id,
- 'timestamp': batch.timestamp,
- 'datetime': datetime.fromtimestamp(batch.timestamp).isoformat(),
- 'total_persons': batch.total_persons,
- 'ptz_images_count': batch.ptz_images_count,
- 'panorama': {
- 'local_path': batch.panorama_path, # 标记图
- 'local_path_original': batch.panorama_original_path, # 原图
- 'oss_url': panorama_oss_url, # 标记图 OSS URL
- 'oss_url_original': panorama_oss_url_original # 原图 OSS URL
- },
- 'persons': persons_list,
- 'upload_status': {
- 'panorama_uploaded': panorama_oss_url is not None,
- 'panorama_original_uploaded': panorama_oss_url_original is not None,
- 'all_ptz_uploaded': all(
- upload_status.get('ptz', {}).get(p.person_index) is not None
- for p in batch.persons if p.ptz_image_saved
- )
- }
- }
- return batch_info
-
- def _save_batch_info_txt(self, batch: DetectionBatch, txt_path: Path):
- """保存批次信息为 TXT 格式(兼容旧版本)"""
- try:
- with open(txt_path, 'w', encoding='utf-8') as f:
- f.write(f"批次ID: {batch.batch_id}\n")
- f.write(f"设备ID: {batch.device_id}\n")
- f.write(f"项目ID: {batch.project_id}\n")
- f.write(f"时间戳: {datetime.fromtimestamp(batch.timestamp)}\n")
- f.write(f"总人数: {batch.total_persons}\n")
- f.write(f"球机图数量: {batch.ptz_images_count}\n")
- f.write(f"全景图: {batch.panorama_path}\n")
- f.write(f"全景图OSS: {batch.panorama_oss_url}\n")
- f.write("\n人员详情:\n")
-
- for i, person in enumerate(batch.persons):
- f.write(f"\n Person {i}:\n")
- f.write(f" Person Index: {person.person_index}\n")
- f.write(f" Position: ({person.position[0]:.3f}, {person.position[1]:.3f})\n")
- f.write(f" BBox: ({person.bbox[0]}, {person.bbox[1]}, {person.bbox[2]}, {person.bbox[3]})\n")
- f.write(f" Confidence: {person.confidence:.2f}\n")
- f.write(f" PTZ Position: {person.ptz_position}\n")
- if person.ptz_bbox:
- f.write(f" PTZ BBox: ({person.ptz_bbox[0]}, {person.ptz_bbox[1]}, {person.ptz_bbox[2]}, {person.ptz_bbox[3]})\n")
- else:
- f.write(f" PTZ BBox: None\n")
- f.write(f" PTZ Image: {person.ptz_image_path}\n")
- f.write(f" PTZ OSS URL: {person.ptz_oss_url}\n")
- except Exception as e:
- logger.error(f"[配对保存] 保存 TXT 批次信息失败: {e}")
-
- def _cleanup_old_batches(self):
- """清理旧批次目录"""
- if not self.cleanup_enabled:
- logger.debug("[配对保存] 自动清理已禁用")
- return
- try:
- batch_dirs = sorted(
- [d for d in self.base_dir.iterdir() if d.is_dir() and d.name.startswith('batch_')],
- key=lambda x: x.stat().st_mtime
- )
- if not batch_dirs:
- return
- now = time.time()
- to_delete = []
- for d in batch_dirs:
- # 按数量清理:超出最大保留数量时删除最旧的
- if len(batch_dirs) - len(to_delete) > self.max_batches:
- to_delete.append(d)
- continue
- # 按天数清理:超过保留天数时删除
- if self.retention_days > 0:
- age_days = (now - d.stat().st_mtime) / 86400
- if age_days > self.retention_days:
- to_delete.append(d)
- # 执行删除
- deleted_count = 0
- for d in to_delete:
- import shutil
- shutil.rmtree(d)
- deleted_count += 1
- logger.info(f"[配对保存] 清理旧批次: {d.name}")
- if deleted_count > 0:
- logger.info(f"[配对保存] 共清理 {deleted_count} 个旧批次")
- except Exception as e:
- logger.error(f"[配对保存] 清理旧批次失败: {e}")
-
- def get_current_batch_id(self) -> Optional[str]:
- """获取当前批次ID"""
- with self._batch_lock:
- return self._current_batch.batch_id if self._current_batch else None
-
- def get_stats(self) -> Dict:
- """获取统计信息"""
- with self._stats_lock:
- return self._stats.copy()
-
- def set_upload_callback(self, callback: Callable):
- """
- 设置批次完成回调函数
-
- Args:
- callback: 回调函数,接收 batch_info_dict 参数
- """
- self._upload_callback = callback
-
- def get_batch_info(self, batch_id: str) -> Optional[Dict]:
- """
- 获取指定批次的 batch_info.json 数据
-
- Args:
- batch_id: 批次ID
-
- Returns:
- Dict 或 None
- """
- try:
- batch_dir = self.base_dir / f"batch_{batch_id}"
- json_path = batch_dir / "batch_info.json"
-
- if json_path.exists():
- with open(json_path, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- logger.error(f"[配对保存] 读取 batch_info.json 失败: {e}")
-
- return None
-
- def close(self):
- """关闭管理器,完成当前批次(异步)"""
- with self._batch_lock:
- if self._current_batch is not None:
- # 异步完成最后批次
- self._queue_save_task({
- 'type': 'finalize',
- 'batch': self._current_batch
- })
- # 短暂等待让后台线程处理
- time.sleep(0.5)
- self._current_batch = None
- logger.info("[配对保存] 管理器已关闭")
- # 全局单例实例
- _paired_saver_instance: Optional[PairedImageSaver] = None
- def get_paired_saver(base_dir: str = None, time_window: float = None,
- max_batches: int = None, cleanup_enabled: bool = None,
- retention_days: int = None,
- enable_oss: bool = False, oss_uploader = None,
- device_config: Dict = None) -> PairedImageSaver:
- """
- 获取配对保存管理器实例(单例模式)
- 如果实例已存在但缺少 OSS/设备配置,会自动从配置模块更新
- Args:
- base_dir: 基础保存目录(默认从配置读取)
- time_window: 时间窗口(默认从配置读取)
- max_batches: 最大保留批次数量(默认从配置读取)
- cleanup_enabled: 是否启用自动清理(默认从配置读取)
- retention_days: 保留天数(默认从配置读取)
- enable_oss: 是否启用 OSS 上传
- oss_uploader: OSS 上传器实例
- device_config: 设备配置字典
- Returns:
- PairedImageSaver 实例
- """
- global _paired_saver_instance
- if _paired_saver_instance is None:
- _paired_saver_instance = PairedImageSaver(
- base_dir=base_dir,
- time_window=time_window,
- max_batches=max_batches,
- cleanup_enabled=cleanup_enabled,
- retention_days=retention_days,
- enable_oss=enable_oss,
- oss_uploader=oss_uploader,
- device_config=device_config
- )
- else:
- # 单例已存在,检查是否需要更新配置
- # PairedImageSaver.__init__ 已从配置模块自动读取,
- # 这里只处理外部显式传入的参数覆盖
- if oss_uploader is not None and _paired_saver_instance.oss_uploader is None:
- _paired_saver_instance.oss_uploader = oss_uploader
- _paired_saver_instance.enable_oss = True
- logger.info("[配对保存] 更新 OSS 上传器配置")
- if device_config is not None:
- _paired_saver_instance.device_config.update(device_config)
- logger.info(f"[配对保存] 更新设备配置: {device_config}")
- return _paired_saver_instance
- def reset_paired_saver():
- """重置单例实例(用于测试)"""
- global _paired_saver_instance
- if _paired_saver_instance is not None:
- _paired_saver_instance.close()
- _paired_saver_instance = None
|