| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- """
- 配对图片保存管理器
- 将全景检测图片和对应的球机聚焦图片保存到同一目录
- """
- import os
- import cv2
- import time
- import logging
- import threading
- from pathlib import Path
- from datetime import datetime
- from typing import Optional, List, Dict, Tuple
- from dataclasses import dataclass, field
- logger = logging.getLogger(__name__)
- @dataclass
- class PersonInfo:
- """人员信息(轨迹追踪已禁用)"""
- person_index: int # 人员序号(0-based)
- position: Tuple[float, float] # (x_ratio, y_ratio)
- bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2)
- confidence: float
- ptz_position: Optional[Tuple[float, float, int]] = None # (pan, tilt, zoom)
- ptz_bbox: Optional[Tuple[int, int, int, int]] = None # 球机图中检测到的bbox (x1, y1, x2, y2)
- ptz_image_saved: bool = False
- ptz_image_path: Optional[str] = None
- @dataclass
- class DetectionBatch:
- """一批检测记录"""
- batch_id: str
- timestamp: float
- panorama_image: Optional[object] = None # numpy array
- panorama_path: Optional[str] = None
- persons: List[PersonInfo] = field(default_factory=list)
- total_persons: int = 0
- ptz_images_count: int = 0
- completed: bool = False
- class PairedImageSaver:
- """
- 配对图片保存管理器
-
- 功能:
- 1. 为每次全景检测创建批次目录
- 2. 保存全景标记图到批次目录
- 3. 为每个人员保存对应的球机聚焦图到同一目录
- 4. 支持时间窗口内的批量保存
- """
-
- def __init__(self, base_dir: str = '/home/admin/dsh/paired_images',
- time_window: float = 5.0, # 时间窗口(秒)
- max_batches: int = 100):
- """
- 初始化
-
- Args:
- base_dir: 基础保存目录
- time_window: 批次时间窗口(秒),同一窗口内的检测归为一批
- max_batches: 最大保留批次数量
- """
- self.base_dir = Path(base_dir)
- self.time_window = time_window
- self.max_batches = max_batches
-
- self._current_batch: Optional[DetectionBatch] = None
- self._batch_lock = threading.Lock()
- self._last_batch_time = 0.0
-
- # 统计信息
- self._stats = {
- 'total_batches': 0,
- 'total_persons': 0,
- 'total_ptz_images': 0
- }
- self._stats_lock = threading.Lock()
-
- # 确保目录存在
- self._ensure_base_dir()
-
- logger.info(f"[配对保存] 初始化完成: 目录={base_dir}, 时间窗口={time_window}s")
-
- def _ensure_base_dir(self):
- """确保基础目录存在"""
- try:
- self.base_dir.mkdir(parents=True, exist_ok=True)
- except Exception as e:
- logger.error(f"[配对保存] 创建目录失败: {e}")
-
- def _generate_batch_id(self) -> str:
- """生成批次ID"""
- return datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
-
- def _create_batch_dir(self, batch_id: str) -> Path:
- """创建批次目录"""
- batch_dir = self.base_dir / f"batch_{batch_id}"
- try:
- batch_dir.mkdir(parents=True, exist_ok=True)
- return batch_dir
- except Exception as e:
- logger.error(f"[配对保存] 创建批次目录失败: {e}")
- return self.base_dir
-
- def start_new_batch(self, panorama_frame, persons: List[Dict]) -> Optional[str]:
- """
- 开始新批次
-
- Args:
- panorama_frame: 全景帧图像
- persons: 人员列表,每项包含 track_id, position, bbox, confidence
-
- Returns:
- batch_id: 批次ID,失败返回 None
- """
- with self._batch_lock:
- current_time = time.time()
-
- # 完成上一批次(如果有)
- # 注意:每次检测都创建独立批次,不复用,确保 batch_info 与实际检测一致
- if self._current_batch is not None:
- self._finalize_batch(self._current_batch)
-
- # 创建新批次
- batch_id = self._generate_batch_id()
- batch_dir = self._create_batch_dir(batch_id)
-
- # 保存全景图片
- panorama_path = None
- if panorama_frame is not None:
- panorama_path = self._save_panorama_image(
- batch_dir, batch_id, panorama_frame, persons
- )
-
- # 创建人员信息(轨迹追踪已禁用,使用序号代替track_id)
- person_infos = []
- for i, p in enumerate(persons):
- info = PersonInfo(
- person_index=i,
- position=p.get('position', (0, 0)),
- bbox=p.get('bbox', (0, 0, 0, 0)),
- confidence=p.get('confidence', 0.0)
- )
- person_infos.append(info)
-
- # 创建批次记录
- self._current_batch = DetectionBatch(
- batch_id=batch_id,
- timestamp=current_time,
- panorama_image=panorama_frame,
- panorama_path=panorama_path,
- persons=person_infos,
- total_persons=len(persons)
- )
-
- self._last_batch_time = current_time
-
- with self._stats_lock:
- self._stats['total_batches'] += 1
- self._stats['total_persons'] += len(persons)
-
- logger.info(
- f"[配对保存] 新批次创建: {batch_id}, "
- f"人员={len(persons)}, 目录={batch_dir}"
- )
-
- return batch_id
-
- def _save_panorama_image(self, batch_dir: Path, batch_id: str,
- frame, persons: List[Dict]) -> Optional[str]:
- """
- 保存全景标记图片
-
- Args:
- batch_dir: 批次目录
- batch_id: 批次ID
- frame: 全景帧
- persons: 人员列表(已由调用方过滤,此处不再过滤)
-
- Returns:
- 保存路径或 None
- """
- try:
- # 复制图像避免修改原图
- marked_frame = frame.copy()
-
- # 绘制每个人员的标记(使用连续的序号)
- # 注意:persons 已由调用方(coordinator)过滤,置信度均 >= 阈值
- for i, person in enumerate(persons):
- bbox = person.get('bbox', (0, 0, 0, 0))
- x1, y1, x2, y2 = bbox
- conf = person.get('confidence', 0.0)
-
- # 绘制边界框(绿色)
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
-
- # 绘制序号标签(带置信度)
- label = f"person_{i}({conf:.2f})"
- (label_w, label_h), baseline = cv2.getTextSize(
- label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
- )
-
- # 标签背景
- cv2.rectangle(
- marked_frame,
- (x1, y1 - label_h - 8),
- (x1 + label_w, y1),
- (0, 255, 0),
- -1
- )
-
- # 标签文字(黑色)
- cv2.putText(
- marked_frame, label,
- (x1, y1 - 4),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
- (0, 0, 0), 2
- )
-
- # 保存图片(使用人员数量)
- filename = f"00_panorama_n{len(persons)}.jpg"
- filepath = batch_dir / filename
- cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
-
- logger.info(f"[配对保存] 全景图已保存: {filepath},人员数量 {len(persons)}")
- return str(filepath)
-
- except Exception as e:
- logger.error(f"[配对保存] 保存全景图失败: {e}")
- return None
-
- def save_ptz_image(self, batch_id: str, person_index: int,
- ptz_frame, ptz_position: Tuple[float, float, int],
- ptz_bbox: Tuple[int, int, int, int] = None,
- person_info: Dict = None) -> Optional[str]:
- """
- 保存球机聚焦图片
-
- Args:
- batch_id: 批次ID
- person_index: 人员序号(0-based)
- ptz_frame: 球机帧
- ptz_position: PTZ位置 (pan, tilt, zoom)
- ptz_bbox: 球机图中检测到的bbox (x1, y1, x2, y2)
- person_info: 额外人员信息
-
- Returns:
- 保存路径或 None
- """
- with self._batch_lock:
- if self._current_batch is None or self._current_batch.batch_id != batch_id:
- logger.warning(f"[配对保存] 批次不存在或已过期: {batch_id}")
- return None
-
- batch_dir = self.base_dir / f"batch_{batch_id}"
-
- try:
- # 复制图像
- marked_frame = ptz_frame.copy() if ptz_frame is not None else None
-
- if marked_frame is not None:
- # 在球机图上添加标记(如果有检测框)
- h, w = marked_frame.shape[:2]
-
- # 添加PTZ位置信息到图片
- pan, tilt, zoom = ptz_position
- info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
- cv2.putText(
- marked_frame, info_text,
- (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7,
- (0, 255, 0), 2
- )
-
- # 添加人员序号
- person_text = f"person_{person_index}"
- cv2.putText(
- marked_frame, person_text,
- (10, 60),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7,
- (0, 255, 0), 2
- )
-
- # 绘制PTZ检测到的bbox(红色)
- if ptz_bbox is not None:
- x1, y1, x2, y2 = ptz_bbox
- cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
- bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
- cv2.putText(
- marked_frame, bbox_text,
- (10, 90),
- cv2.FONT_HERSHEY_SIMPLEX, 0.6,
- (0, 0, 255), 2
- )
-
- # 保存图片
- filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}.jpg"
- filepath = batch_dir / filename
-
- if marked_frame is not None:
- cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
-
- # 更新批次信息
- if person_index < len(self._current_batch.persons):
- self._current_batch.persons[person_index].ptz_position = ptz_position
- self._current_batch.persons[person_index].ptz_bbox = ptz_bbox
- self._current_batch.persons[person_index].ptz_image_saved = True
- self._current_batch.persons[person_index].ptz_image_path = str(filepath)
-
- self._current_batch.ptz_images_count += 1
-
- with self._stats_lock:
- self._stats['total_ptz_images'] += 1
-
- logger.info(f"[配对保存] 球机图已保存: {filepath}, BBox={ptz_bbox}")
- return str(filepath)
-
- except Exception as e:
- logger.error(f"[配对保存] 保存球机图失败: {e}")
- return None
-
- def _finalize_batch(self, batch: DetectionBatch):
- """完成批次处理"""
- batch.completed = True
-
- # 创建批次信息文件
- try:
- batch_dir = self.base_dir / f"batch_{batch.batch_id}"
- info_path = batch_dir / "batch_info.txt"
-
- with open(info_path, 'w', encoding='utf-8') as f:
- f.write(f"批次ID: {batch.batch_id}\n")
- f.write(f"时间戳: {datetime.fromtimestamp(batch.timestamp)}\n")
- f.write(f"总人数: {batch.total_persons}\n")
- f.write(f"球机图数量: {batch.ptz_images_count}\n")
- f.write(f"全景图: {batch.panorama_path}\n")
- f.write("\n人员详情:\n")
-
- for i, person in enumerate(batch.persons):
- f.write(f"\n Person {i}:\n")
- f.write(f" Person Index: {person.person_index}\n")
- f.write(f" Position: ({person.position[0]:.3f}, {person.position[1]:.3f})\n")
- f.write(f" BBox: ({person.bbox[0]}, {person.bbox[1]}, {person.bbox[2]}, {person.bbox[3]})\n")
- f.write(f" Confidence: {person.confidence:.2f}\n")
- f.write(f" PTZ Position: {person.ptz_position}\n")
- if person.ptz_bbox:
- f.write(f" PTZ BBox: ({person.ptz_bbox[0]}, {person.ptz_bbox[1]}, {person.ptz_bbox[2]}, {person.ptz_bbox[3]})\n")
- else:
- f.write(f" PTZ BBox: None\n")
- f.write(f" PTZ Image: {person.ptz_image_path}\n")
-
- logger.info(f"[配对保存] 批次完成: {batch.batch_id}, "
- f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}")
-
- except Exception as e:
- logger.error(f"[配对保存] 保存批次信息失败: {e}")
-
- # 清理旧批次
- self._cleanup_old_batches()
-
- def _cleanup_old_batches(self):
- """清理旧批次目录"""
- try:
- batch_dirs = sorted(
- [d for d in self.base_dir.iterdir() if d.is_dir() and d.name.startswith('batch_')],
- key=lambda x: x.stat().st_mtime
- )
-
- if len(batch_dirs) > self.max_batches:
- to_delete = batch_dirs[:len(batch_dirs) - self.max_batches]
- for d in to_delete:
- import shutil
- shutil.rmtree(d)
- logger.info(f"[配对保存] 清理旧批次: {d.name}")
-
- except Exception as e:
- logger.error(f"[配对保存] 清理旧批次失败: {e}")
-
- def get_current_batch_id(self) -> Optional[str]:
- """获取当前批次ID"""
- with self._batch_lock:
- return self._current_batch.batch_id if self._current_batch else None
-
- def get_stats(self) -> Dict:
- """获取统计信息"""
- with self._stats_lock:
- return self._stats.copy()
-
- def close(self):
- """关闭管理器,完成当前批次"""
- with self._batch_lock:
- if self._current_batch is not None:
- self._finalize_batch(self._current_batch)
- self._current_batch = None
-
- logger.info("[配对保存] 管理器已关闭")
- # 全局单例实例
- _paired_saver_instance: Optional[PairedImageSaver] = None
- def get_paired_saver(base_dir: str = None, time_window: float = 5.0) -> PairedImageSaver:
- """
- 获取配对保存管理器实例(单例模式)
-
- Args:
- base_dir: 基础保存目录
- time_window: 时间窗口
-
- Returns:
- PairedImageSaver 实例
- """
- global _paired_saver_instance
-
- if _paired_saver_instance is None:
- _paired_saver_instance = PairedImageSaver(
- base_dir=base_dir or '/home/admin/dsh/paired_images',
- time_window=time_window
- )
-
- return _paired_saver_instance
- def reset_paired_saver():
- """重置单例实例(用于测试)"""
- global _paired_saver_instance
- if _paired_saver_instance is not None:
- _paired_saver_instance.close()
- _paired_saver_instance = None
|