""" 配对图片保存管理器 将全景检测图片和对应的球机聚焦图片保存到同一目录 """ import os import cv2 import time import logging import threading from pathlib import Path from datetime import datetime from typing import Optional, List, Dict, Tuple from dataclasses import dataclass, field logger = logging.getLogger(__name__) @dataclass class PersonInfo: """人员信息(轨迹追踪已禁用)""" person_index: int # 人员序号(0-based) position: Tuple[float, float] # (x_ratio, y_ratio) bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2) confidence: float ptz_position: Optional[Tuple[float, float, int]] = None # (pan, tilt, zoom) ptz_bbox: Optional[Tuple[int, int, int, int]] = None # 球机图中检测到的bbox (x1, y1, x2, y2) ptz_image_saved: bool = False ptz_image_path: Optional[str] = None @dataclass class DetectionBatch: """一批检测记录""" batch_id: str timestamp: float panorama_image: Optional[object] = None # numpy array panorama_path: Optional[str] = None persons: List[PersonInfo] = field(default_factory=list) total_persons: int = 0 ptz_images_count: int = 0 completed: bool = False class PairedImageSaver: """ 配对图片保存管理器 功能: 1. 为每次全景检测创建批次目录 2. 保存全景标记图到批次目录 3. 为每个人员保存对应的球机聚焦图到同一目录 4. 支持时间窗口内的批量保存 """ def __init__(self, base_dir: str = '/home/admin/dsh/paired_images', time_window: float = 5.0, # 时间窗口(秒) max_batches: int = 100): """ 初始化 Args: base_dir: 基础保存目录 time_window: 批次时间窗口(秒),同一窗口内的检测归为一批 max_batches: 最大保留批次数量 """ self.base_dir = Path(base_dir) self.time_window = time_window self.max_batches = max_batches self._current_batch: Optional[DetectionBatch] = None self._batch_lock = threading.Lock() self._last_batch_time = 0.0 # 统计信息 self._stats = { 'total_batches': 0, 'total_persons': 0, 'total_ptz_images': 0 } self._stats_lock = threading.Lock() # 确保目录存在 self._ensure_base_dir() logger.info(f"[配对保存] 初始化完成: 目录={base_dir}, 时间窗口={time_window}s") def _ensure_base_dir(self): """确保基础目录存在""" try: self.base_dir.mkdir(parents=True, exist_ok=True) except Exception as e: logger.error(f"[配对保存] 创建目录失败: {e}") def _generate_batch_id(self) -> str: """生成批次ID""" return datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3] def _create_batch_dir(self, batch_id: str) -> Path: """创建批次目录""" batch_dir = self.base_dir / f"batch_{batch_id}" try: batch_dir.mkdir(parents=True, exist_ok=True) return batch_dir except Exception as e: logger.error(f"[配对保存] 创建批次目录失败: {e}") return self.base_dir def start_new_batch(self, panorama_frame, persons: List[Dict]) -> Optional[str]: """ 开始新批次 Args: panorama_frame: 全景帧图像 persons: 人员列表,每项包含 track_id, position, bbox, confidence Returns: batch_id: 批次ID,失败返回 None """ with self._batch_lock: current_time = time.time() # 完成上一批次(如果有) # 注意:每次检测都创建独立批次,不复用,确保 batch_info 与实际检测一致 if self._current_batch is not None: self._finalize_batch(self._current_batch) # 创建新批次 batch_id = self._generate_batch_id() batch_dir = self._create_batch_dir(batch_id) # 保存全景图片 panorama_path = None if panorama_frame is not None: panorama_path = self._save_panorama_image( batch_dir, batch_id, panorama_frame, persons ) # 创建人员信息(轨迹追踪已禁用,使用序号代替track_id) person_infos = [] for i, p in enumerate(persons): info = PersonInfo( person_index=i, position=p.get('position', (0, 0)), bbox=p.get('bbox', (0, 0, 0, 0)), confidence=p.get('confidence', 0.0) ) person_infos.append(info) # 创建批次记录 self._current_batch = DetectionBatch( batch_id=batch_id, timestamp=current_time, panorama_image=panorama_frame, panorama_path=panorama_path, persons=person_infos, total_persons=len(persons) ) self._last_batch_time = current_time with self._stats_lock: self._stats['total_batches'] += 1 self._stats['total_persons'] += len(persons) logger.info( f"[配对保存] 新批次创建: {batch_id}, " f"人员={len(persons)}, 目录={batch_dir}" ) return batch_id def _save_panorama_image(self, batch_dir: Path, batch_id: str, frame, persons: List[Dict]) -> Optional[str]: """ 保存全景标记图片 Args: batch_dir: 批次目录 batch_id: 批次ID frame: 全景帧 persons: 人员列表(已由调用方过滤,此处不再过滤) Returns: 保存路径或 None """ try: # 复制图像避免修改原图 marked_frame = frame.copy() # 绘制每个人员的标记(使用连续的序号) # 注意:persons 已由调用方(coordinator)过滤,置信度均 >= 阈值 for i, person in enumerate(persons): bbox = person.get('bbox', (0, 0, 0, 0)) x1, y1, x2, y2 = bbox conf = person.get('confidence', 0.0) # 绘制边界框(绿色) cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2) # 绘制序号标签(带置信度) label = f"person_{i}({conf:.2f})" (label_w, label_h), baseline = cv2.getTextSize( label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2 ) # 标签背景 cv2.rectangle( marked_frame, (x1, y1 - label_h - 8), (x1 + label_w, y1), (0, 255, 0), -1 ) # 标签文字(黑色) cv2.putText( marked_frame, label, (x1, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2 ) # 保存图片(使用人员数量) filename = f"00_panorama_n{len(persons)}.jpg" filepath = batch_dir / filename cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90]) logger.info(f"[配对保存] 全景图已保存: {filepath},人员数量 {len(persons)}") return str(filepath) except Exception as e: logger.error(f"[配对保存] 保存全景图失败: {e}") return None def save_ptz_image(self, batch_id: str, person_index: int, ptz_frame, ptz_position: Tuple[float, float, int], ptz_bbox: Tuple[int, int, int, int] = None, person_info: Dict = None) -> Optional[str]: """ 保存球机聚焦图片 Args: batch_id: 批次ID person_index: 人员序号(0-based) ptz_frame: 球机帧 ptz_position: PTZ位置 (pan, tilt, zoom) ptz_bbox: 球机图中检测到的bbox (x1, y1, x2, y2) person_info: 额外人员信息 Returns: 保存路径或 None """ with self._batch_lock: if self._current_batch is None or self._current_batch.batch_id != batch_id: logger.warning(f"[配对保存] 批次不存在或已过期: {batch_id}") return None batch_dir = self.base_dir / f"batch_{batch_id}" try: # 复制图像 marked_frame = ptz_frame.copy() if ptz_frame is not None else None if marked_frame is not None: # 在球机图上添加标记(如果有检测框) h, w = marked_frame.shape[:2] # 添加PTZ位置信息到图片 pan, tilt, zoom = ptz_position info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}" cv2.putText( marked_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2 ) # 添加人员序号 person_text = f"person_{person_index}" cv2.putText( marked_frame, person_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2 ) # 绘制PTZ检测到的bbox(红色) if ptz_bbox is not None: x1, y1, x2, y2 = ptz_bbox cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2) bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})" cv2.putText( marked_frame, bbox_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2 ) # 保存图片 filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}.jpg" filepath = batch_dir / filename if marked_frame is not None: cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90]) # 更新批次信息 # 【关键修复】只有当人员索引有效时才更新和计数 if person_index < len(self._current_batch.persons): self._current_batch.persons[person_index].ptz_position = ptz_position self._current_batch.persons[person_index].ptz_bbox = ptz_bbox self._current_batch.persons[person_index].ptz_image_saved = True self._current_batch.persons[person_index].ptz_image_path = str(filepath) self._current_batch.ptz_images_count += 1 with self._stats_lock: self._stats['total_ptz_images'] += 1 logger.info(f"[配对保存] 球机图已保存: {filepath}, BBox={ptz_bbox}") else: # 人员索引超出范围,说明批次信息不一致,跳过保存 logger.warning(f"[配对保存] 人员索引 {person_index} 超出批次范围 {len(self._current_batch.persons)},跳过计数") return str(filepath) except Exception as e: logger.error(f"[配对保存] 保存球机图失败: {e}") return None def _finalize_batch(self, batch: DetectionBatch): """完成批次处理""" batch.completed = True # 创建批次信息文件 try: batch_dir = self.base_dir / f"batch_{batch.batch_id}" info_path = batch_dir / "batch_info.txt" with open(info_path, 'w', encoding='utf-8') as f: f.write(f"批次ID: {batch.batch_id}\n") f.write(f"时间戳: {datetime.fromtimestamp(batch.timestamp)}\n") f.write(f"总人数: {batch.total_persons}\n") f.write(f"球机图数量: {batch.ptz_images_count}\n") f.write(f"全景图: {batch.panorama_path}\n") f.write("\n人员详情:\n") for i, person in enumerate(batch.persons): f.write(f"\n Person {i}:\n") f.write(f" Person Index: {person.person_index}\n") f.write(f" Position: ({person.position[0]:.3f}, {person.position[1]:.3f})\n") f.write(f" BBox: ({person.bbox[0]}, {person.bbox[1]}, {person.bbox[2]}, {person.bbox[3]})\n") f.write(f" Confidence: {person.confidence:.2f}\n") f.write(f" PTZ Position: {person.ptz_position}\n") if person.ptz_bbox: f.write(f" PTZ BBox: ({person.ptz_bbox[0]}, {person.ptz_bbox[1]}, {person.ptz_bbox[2]}, {person.ptz_bbox[3]})\n") else: f.write(f" PTZ BBox: None\n") f.write(f" PTZ Image: {person.ptz_image_path}\n") logger.info(f"[配对保存] 批次完成: {batch.batch_id}, " f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}") except Exception as e: logger.error(f"[配对保存] 保存批次信息失败: {e}") # 清理旧批次 self._cleanup_old_batches() def _cleanup_old_batches(self): """清理旧批次目录""" try: batch_dirs = sorted( [d for d in self.base_dir.iterdir() if d.is_dir() and d.name.startswith('batch_')], key=lambda x: x.stat().st_mtime ) if len(batch_dirs) > self.max_batches: to_delete = batch_dirs[:len(batch_dirs) - self.max_batches] for d in to_delete: import shutil shutil.rmtree(d) logger.info(f"[配对保存] 清理旧批次: {d.name}") except Exception as e: logger.error(f"[配对保存] 清理旧批次失败: {e}") def get_current_batch_id(self) -> Optional[str]: """获取当前批次ID""" with self._batch_lock: return self._current_batch.batch_id if self._current_batch else None def get_stats(self) -> Dict: """获取统计信息""" with self._stats_lock: return self._stats.copy() def close(self): """关闭管理器,完成当前批次""" with self._batch_lock: if self._current_batch is not None: self._finalize_batch(self._current_batch) self._current_batch = None logger.info("[配对保存] 管理器已关闭") # 全局单例实例 _paired_saver_instance: Optional[PairedImageSaver] = None def get_paired_saver(base_dir: str = None, time_window: float = 5.0) -> PairedImageSaver: """ 获取配对保存管理器实例(单例模式) Args: base_dir: 基础保存目录 time_window: 时间窗口 Returns: PairedImageSaver 实例 """ global _paired_saver_instance if _paired_saver_instance is None: _paired_saver_instance = PairedImageSaver( base_dir=base_dir or '/home/admin/dsh/paired_images', time_window=time_window ) return _paired_saver_instance def reset_paired_saver(): """重置单例实例(用于测试)""" global _paired_saver_instance if _paired_saver_instance is not None: _paired_saver_instance.close() _paired_saver_instance = None