paired_image_saver.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192
  1. """
  2. 配对图片保存管理器
  3. 将全景检测图片和对应的球机聚焦图片保存到同一目录
  4. 支持 OSS 上传和 batch_info.json 格式
  5. """
  6. import os
  7. import cv2
  8. import time
  9. import json
  10. import logging
  11. import threading
  12. from pathlib import Path
  13. from datetime import datetime
  14. from typing import Optional, List, Dict, Tuple, Callable
  15. from dataclasses import dataclass, field, asdict
# Module-level logger named after this module; used for all "[配对保存]" / "[OSS]" messages below.
logger = logging.getLogger(__name__)
  17. @dataclass
  18. class PersonInfo:
  19. """人员信息"""
  20. person_index: int # 人员序号(0-based)
  21. position: Tuple[float, float] # (x_ratio, y_ratio)
  22. bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2)
  23. confidence: float
  24. ptz_position: Optional[Tuple[float, float, int]] = None # (pan, tilt, zoom)
  25. ptz_bbox: Optional[Tuple[int, int, int, int]] = None # 球机图中检测到的bbox (x1, y1, x2, y2)
  26. ptz_image_saved: bool = False
  27. ptz_image_path: Optional[str] = None # 标记后的球机图路径
  28. ptz_image_original_path: Optional[str] = None # 未标记的球机图路径
  29. ptz_oss_url: Optional[str] = None # 标记后球机图 OSS URL
  30. ptz_oss_url_original: Optional[str] = None # 未标记球机图 OSS URL
  31. @dataclass
  32. class DetectionBatch:
  33. """一批检测记录"""
  34. batch_id: str
  35. timestamp: float
  36. panorama_image: Optional[object] = None # numpy array
  37. panorama_path: Optional[str] = None # 标记后的全景图路径
  38. panorama_original_path: Optional[str] = None # 未标记的全景图路径
  39. panorama_oss_url: Optional[str] = None # 标记后全景图 OSS URL
  40. panorama_oss_url_original: Optional[str] = None # 未标记全景图 OSS URL
  41. persons: List[PersonInfo] = field(default_factory=list)
  42. total_persons: int = 0
  43. ptz_images_count: int = 0
  44. completed: bool = False
  45. device_id: str = '' # 设备编号
  46. project_id: str = '' # 项目编号
class PairedImageSaver:
    """
    Paired image save manager.

    Features:
    1. Creates a batch directory for each panorama detection.
    2. Saves the marked panorama image into the batch directory.
    3. Saves the PTZ focus image of each person into the same directory.
    4. Supports batched saving within a time window.
    5. Supports OSS upload.
    6. Generates batch_info.json.
    """
  58. def __init__(self, base_dir: str = None,
  59. time_window: float = None,
  60. max_batches: int = None,
  61. cleanup_enabled: bool = None,
  62. retention_days: int = None,
  63. enable_oss: bool = False,
  64. oss_uploader = None,
  65. device_config: Dict = None):
  66. """
  67. 初始化
  68. Args:
  69. base_dir: 基础保存目录(默认从配置读取)
  70. time_window: 批次时间窗口(秒),同一窗口内的检测归为一批
  71. max_batches: 最大保留批次数量
  72. cleanup_enabled: 是否启用自动清理
  73. retention_days: 保留天数
  74. enable_oss: 是否启用 OSS 上传
  75. oss_uploader: OSS 上传器实例
  76. device_config: 设备配置字典
  77. """
  78. # 从配置模块读取配对图片保存配置
  79. try:
  80. from config import PAIRED_IMAGE_CONFIG
  81. config = PAIRED_IMAGE_CONFIG
  82. except ImportError:
  83. config = {}
  84. # 使用传入参数或配置默认值
  85. self.base_dir = Path(base_dir or config.get('base_dir', '/home/admin/dsh/paired_images'))
  86. self.time_window = time_window or config.get('time_window', 5.0)
  87. self.max_batches = max_batches if max_batches is not None else config.get('max_batches', 100)
  88. self.cleanup_enabled = cleanup_enabled if cleanup_enabled is not None else config.get('cleanup_enabled', True)
  89. self.retention_days = retention_days if retention_days is not None else config.get('retention_days', 7)
  90. # 从配置模块读取 OSS 和设备配置(确保即使外部不传也能正确配置)
  91. try:
  92. from config import S3_COMPATIBLE_CONFIG, DEVICE_CONFIG
  93. # OSS 配置:优先使用传入参数,否则从配置模块读取
  94. oss_enabled_in_config = S3_COMPATIBLE_CONFIG.get('enabled', False)
  95. if enable_oss or (not enable_oss and oss_enabled_in_config):
  96. self.enable_oss = True
  97. else:
  98. self.enable_oss = enable_oss
  99. logger.info(f"[配对保存] OSS配置: enable_oss={enable_oss}, config_enabled={oss_enabled_in_config}, 最终启用={self.enable_oss}")
  100. # OSS 上传器:优先使用传入的实例,否则从全局获取
  101. if oss_uploader is not None:
  102. self.oss_uploader = oss_uploader
  103. logger.info("[配对保存] 使用传入的 OSS 上传器")
  104. elif self.enable_oss:
  105. try:
  106. from oss_uploader import get_oss_uploader
  107. self.oss_uploader = get_oss_uploader()
  108. logger.info(f"[配对保存] 获取 OSS 上传器: enabled={self.oss_uploader.enabled}, running={getattr(self.oss_uploader, 'running', False)}")
  109. if not self.oss_uploader.running:
  110. self.oss_uploader.start()
  111. logger.info("[配对保存] OSS 上传器已启动")
  112. except Exception as e:
  113. logger.warning(f"[配对保存] OSS 上传器初始化失败: {e}")
  114. self.oss_uploader = None
  115. self.enable_oss = False
  116. else:
  117. self.oss_uploader = None
  118. # 设备配置:合并传入参数和配置模块
  119. self.device_config = DEVICE_CONFIG.copy()
  120. if device_config:
  121. self.device_config.update(device_config)
  122. except ImportError as e:
  123. # 配置模块不可用时使用传入参数
  124. logger.warning(f"[配对保存] 配置模块导入失败: {e}")
  125. self.enable_oss = enable_oss
  126. self.oss_uploader = oss_uploader
  127. self.device_config = device_config or {}
  128. self._current_batch: Optional[DetectionBatch] = None
  129. self._batch_lock = threading.Lock()
  130. self._last_batch_time = 0.0
  131. # 保存任务队列(必须在线程启动前初始化)
  132. self._save_queue = []
  133. self._save_queue_lock = threading.Lock()
  134. # 后台线程池(用于异步保存图片和上传OSS,不阻塞主识别线程)
  135. self._save_thread_pool = threading.Thread(target=self._save_worker, daemon=True)
  136. self._save_thread_pool.start()
  137. # 上传状态追踪
  138. self._upload_status: Dict[str, Dict] = {} # batch_id -> {panorama: bool, ptz: Dict}
  139. self._upload_callback: Optional[Callable] = None
  140. # 统计信息
  141. self._stats = {
  142. 'total_batches': 0,
  143. 'total_persons': 0,
  144. 'total_ptz_images': 0,
  145. 'oss_upload_success': 0,
  146. 'oss_upload_failed': 0,
  147. }
  148. self._stats_lock = threading.Lock()
  149. # 确保目录存在
  150. self._ensure_base_dir()
  151. logger.info(f"[配对保存] 初始化完成: 目录={self.base_dir}, 时间窗口={self.time_window}s, "
  152. f"最大批次={self.max_batches}, 清理={self.cleanup_enabled}, 保留天数={self.retention_days}, OSS={enable_oss}")
  153. def _ensure_base_dir(self):
  154. """确保基础目录存在"""
  155. try:
  156. self.base_dir.mkdir(parents=True, exist_ok=True)
  157. except Exception as e:
  158. logger.error(f"[配对保存] 创建目录失败: {e}")
  159. def _save_worker(self):
  160. """后台工作线程:异步保存图片和上传OSS"""
  161. logger.info("[配对保存] 后台保存线程已启动")
  162. while True:
  163. task = None
  164. with self._save_queue_lock:
  165. if self._save_queue:
  166. task = self._save_queue.pop(0)
  167. if task is not None:
  168. task_type = task.get('type')
  169. try:
  170. if task_type == 'panorama':
  171. self._async_save_panorama(task)
  172. elif task_type == 'ptz':
  173. self._async_save_ptz(task)
  174. elif task_type == 'finalize':
  175. self._async_finalize_batch(task)
  176. except Exception as e:
  177. logger.error(f"[配对保存] 后台任务执行失败: {e}")
  178. else:
  179. time.sleep(0.1) # 无任务时短暂休眠
  180. def _async_save_panorama(self, task: Dict):
  181. """异步保存全景图并上传OSS"""
  182. batch_id = task['batch_id']
  183. batch_dir = task['batch_dir']
  184. frame = task['frame']
  185. persons = task['persons']
  186. try:
  187. # 保存原图 - JPEG 压缩 (质量 85%)
  188. original_filename = f"00_panorama_original_n{len(persons)}.jpg"
  189. original_filepath = batch_dir / original_filename
  190. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  191. result, encoded = cv2.imencode('.jpg', frame, encode_param)
  192. if result:
  193. with open(str(original_filepath), 'wb') as f:
  194. f.write(encoded)
  195. # 保存标记图
  196. marked_frame = frame.copy()
  197. for i, person in enumerate(persons):
  198. bbox = person.get('bbox', (0, 0, 0, 0))
  199. x1, y1, x2, y2 = bbox
  200. conf = person.get('confidence', 0.0)
  201. cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
  202. label = f"person_{i}({conf:.2f})"
  203. (label_w, label_h), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
  204. cv2.rectangle(marked_frame, (x1, y1 - label_h - 8), (x1 + label_w, y1), (0, 255, 0), -1)
  205. cv2.putText(marked_frame, label, (x1, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
  206. marked_filename = f"00_panorama_marked_n{len(persons)}.jpg"
  207. marked_filepath = batch_dir / marked_filename
  208. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  209. result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
  210. if result:
  211. with open(str(marked_filepath), 'wb') as f:
  212. f.write(encoded)
  213. logger.info(f"[配对保存] 全景图已保存: batch={batch_id}")
  214. # 异步上传OSS
  215. if self.enable_oss and self.oss_uploader:
  216. if original_filepath.exists():
  217. self._upload_panorama_to_oss(batch_id, str(original_filepath), image_type='panorama_original')
  218. if marked_filepath.exists():
  219. self._upload_panorama_to_oss(batch_id, str(marked_filepath), image_type='panorama')
  220. except Exception as e:
  221. logger.error(f"[配对保存] 异步保存全景图失败: {e}")
  222. def _async_save_ptz(self, task: Dict):
  223. """异步保存球机图并上传OSS"""
  224. batch_id = task['batch_id']
  225. batch_dir = task['batch_dir']
  226. person_index = task['person_index']
  227. ptz_frame = task['ptz_frame']
  228. ptz_position = task['ptz_position']
  229. ptz_frame_marked = task.get('ptz_frame_marked')
  230. ptz_bbox = task.get('ptz_bbox')
  231. try:
  232. pan, tilt, zoom = ptz_position
  233. # 保存原图(未标记)- JPEG 压缩
  234. original_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_original.jpg"
  235. original_filepath = batch_dir / original_filename
  236. if ptz_frame is not None:
  237. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  238. result, encoded = cv2.imencode('.jpg', ptz_frame, encode_param)
  239. if result:
  240. with open(str(original_filepath), 'wb') as f:
  241. f.write(encoded)
  242. # 保存标记图 - JPEG 压缩
  243. marked_frame = ptz_frame_marked if ptz_frame_marked is not None else None
  244. if marked_frame is None and ptz_frame is not None:
  245. marked_frame = ptz_frame.copy()
  246. h, w = marked_frame.shape[:2]
  247. info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
  248. cv2.putText(marked_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
  249. person_text = f"person_{person_index}"
  250. cv2.putText(marked_frame, person_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
  251. if ptz_bbox is not None:
  252. x1, y1, x2, y2 = ptz_bbox
  253. cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
  254. bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
  255. cv2.putText(marked_frame, bbox_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
  256. marked_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_marked.jpg"
  257. marked_filepath = batch_dir / marked_filename
  258. if marked_frame is not None:
  259. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  260. result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
  261. if result:
  262. with open(str(marked_filepath), 'wb') as f:
  263. f.write(encoded)
  264. logger.info(f"[配对保存] 球机图已保存: batch={batch_id}, person={person_index}")
  265. # 异步上传OSS
  266. if self.enable_oss and self.oss_uploader:
  267. if original_filepath.exists():
  268. self._upload_ptz_to_oss(batch_id, person_index, str(original_filepath), str(marked_filepath))
  269. except Exception as e:
  270. logger.error(f"[配对保存] 异步保存球机图失败: {e}")
  271. def _async_finalize_batch(self, task: Dict):
  272. """异步完成批次(生成batch_info.json + 上报第三方)"""
  273. batch = task['batch']
  274. try:
  275. batch.completed = True
  276. batch_dir = self.base_dir / f"batch_{batch.batch_id}"
  277. # 等待 OSS 上传完成(最多等待30秒)
  278. if self.enable_oss:
  279. wait_start = time.time()
  280. max_wait = 30.0
  281. # 统计需要等待的 PTZ 上传数量
  282. expected_ptz_count = sum(1 for p in batch.persons if p.ptz_image_saved)
  283. while time.time() - wait_start < max_wait:
  284. status = self._upload_status.get(batch.batch_id, {})
  285. panorama_done = status.get('panorama_url') is not None or not batch.panorama_path
  286. ptz_marked_status = status.get('ptz_marked', {})
  287. ptz_original_status = status.get('ptz_original', {})
  288. # 只有当 PTZ 上传状态已提交后才判断完成,避免空字典误判
  289. if expected_ptz_count == 0:
  290. ptz_done = True
  291. else:
  292. ptz_done = (
  293. len(ptz_marked_status) >= expected_ptz_count
  294. and len(ptz_original_status) >= expected_ptz_count
  295. and all(
  296. ptz_marked_status.get(idx) is not None and ptz_original_status.get(idx) is not None
  297. for idx, person in enumerate(batch.persons)
  298. if person.ptz_image_saved
  299. )
  300. )
  301. if panorama_done and ptz_done:
  302. break
  303. time.sleep(0.5)
  304. # 构建并保存 batch_info.json
  305. batch_info = self._build_batch_info_json(batch)
  306. json_path = batch_dir / "batch_info.json"
  307. with open(json_path, 'w', encoding='utf-8') as f:
  308. json.dump(batch_info, f, ensure_ascii=False, indent=2)
  309. txt_path = batch_dir / "batch_info.txt"
  310. self._save_batch_info_txt(batch, txt_path)
  311. # 校验 OSS URL 不能为 null,上报前确保所有图片已上传
  312. upload_status = self._upload_status.get(batch.batch_id, {})
  313. missing_urls = []
  314. # 检查全景图
  315. if not upload_status.get('panorama_url') and not batch.panorama_oss_url:
  316. missing_urls.append('panorama')
  317. if not upload_status.get('panorama_original_url') and not batch.panorama_oss_url_original:
  318. missing_urls.append('panorama_original')
  319. # 检查球机图
  320. for idx, person in enumerate(batch.persons):
  321. if person.ptz_image_saved:
  322. ptz_marked = upload_status.get('ptz_marked', {}).get(idx) or person.ptz_oss_url
  323. ptz_original = upload_status.get('ptz_original', {}).get(idx) or person.ptz_oss_url_original
  324. if not ptz_marked:
  325. missing_urls.append(f'ptz_{idx}_marked')
  326. if not ptz_original:
  327. missing_urls.append(f'ptz_{idx}_original')
  328. if missing_urls:
  329. logger.warning(f"[配对保存] OSS 上传未完成,跳过上报: batch_id={batch.batch_id}, 缺失: {missing_urls}")
  330. # 上报第三方平台
  331. self._report_to_third_party(batch_info)
  332. # 标记上传完成
  333. if batch.batch_id in self._upload_status:
  334. self._upload_status[batch.batch_id]['completed'] = True
  335. # 触发回调
  336. if self._upload_callback:
  337. try:
  338. self._upload_callback(batch_info)
  339. except Exception as e:
  340. logger.error(f"[配对保存] 回调执行错误: {e}")
  341. logger.info(f"[配对保存] 批次完成: {batch.batch_id}")
  342. except Exception as e:
  343. logger.error(f"[配对保存] 异步完成批次失败: {e}")
  344. finally:
  345. # 确保清理逻辑始终执行
  346. self._cleanup_old_batches()
  347. def _queue_save_task(self, task: Dict):
  348. """添加保存任务到队列"""
  349. with self._save_queue_lock:
  350. self._save_queue.append(task)
  351. def _generate_batch_id(self) -> str:
  352. """生成批次ID"""
  353. return datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
  354. def _create_batch_dir(self, batch_id: str) -> Path:
  355. """创建批次目录"""
  356. batch_dir = self.base_dir / f"batch_{batch_id}"
  357. try:
  358. batch_dir.mkdir(parents=True, exist_ok=True)
  359. return batch_dir
  360. except Exception as e:
  361. logger.error(f"[配对保存] 创建批次目录失败: {e}")
  362. return self.base_dir
  363. def start_new_batch(self, panorama_frame, persons: List[Dict]) -> Optional[str]:
  364. """
  365. 开始新批次
  366. Args:
  367. panorama_frame: 全景帧图像
  368. persons: 人员列表,每项包含 track_id, position, bbox, confidence
  369. Returns:
  370. batch_id: 批次ID,失败返回 None
  371. """
  372. with self._batch_lock:
  373. current_time = time.time()
  374. # 完成上一批次(如果有)- 异步执行,不阻塞主线程
  375. if self._current_batch is not None:
  376. self._queue_save_task({
  377. 'type': 'finalize',
  378. 'batch': self._current_batch
  379. })
  380. # 创建新批次
  381. batch_id = self._generate_batch_id()
  382. batch_dir = self._create_batch_dir(batch_id)
  383. # 异步保存全景图片(原图和标记图),不阻塞主线程
  384. panorama_original_path = None
  385. panorama_marked_path = None
  386. if panorama_frame is not None:
  387. # 异步保存,路径在保存完成后会更新
  388. panorama_original_path, panorama_marked_path = self._save_panorama_image(
  389. batch_dir, batch_id, panorama_frame, persons
  390. )
  391. # 异步上传OSS(不阻塞主线程)
  392. if panorama_original_path and panorama_marked_path:
  393. self._queue_save_task({
  394. 'type': 'panorama',
  395. 'batch_id': batch_id,
  396. 'batch_dir': batch_dir,
  397. 'frame': panorama_frame,
  398. 'persons': persons
  399. })
  400. # 创建人员信息
  401. person_infos = []
  402. for i, p in enumerate(persons):
  403. info = PersonInfo(
  404. person_index=i,
  405. position=p.get('position', (0, 0)),
  406. bbox=p.get('bbox', (0, 0, 0, 0)),
  407. confidence=p.get('confidence', 0.0)
  408. )
  409. person_infos.append(info)
  410. # 获取设备信息
  411. device_id = self.device_config.get('device_id', 'UNKNOWN')
  412. project_id = self.device_config.get('project_id', 'UNKNOWN')
  413. # 创建批次记录
  414. self._current_batch = DetectionBatch(
  415. batch_id=batch_id,
  416. timestamp=current_time,
  417. panorama_image=panorama_frame,
  418. panorama_path=panorama_marked_path, # 标记图作为主路径
  419. panorama_original_path=panorama_original_path, # 原图路径
  420. persons=person_infos,
  421. total_persons=len(persons),
  422. device_id=device_id,
  423. project_id=project_id
  424. )
  425. # 初始化上传状态
  426. self._upload_status[batch_id] = {
  427. 'panorama': False,
  428. 'panorama_url': None,
  429. 'ptz': {},
  430. 'completed': False
  431. }
  432. self._last_batch_time = current_time
  433. with self._stats_lock:
  434. self._stats['total_batches'] += 1
  435. self._stats['total_persons'] += len(persons)
  436. logger.info(
  437. f"[配对保存] 新批次创建: {batch_id}, "
  438. f"人员={len(persons)}, 目录={batch_dir}"
  439. )
  440. return batch_id
  441. def _save_panorama_image(self, batch_dir: Path, batch_id: str,
  442. frame, persons: List[Dict]) -> Tuple[Optional[str], Optional[str]]:
  443. """
  444. 保存全景原图和标记图片
  445. Args:
  446. batch_dir: 批次目录
  447. batch_id: 批次ID
  448. frame: 全景帧
  449. persons: 人员列表(已由调用方过滤,此处不再过滤)
  450. Returns:
  451. (原图路径, 标记图路径) 或 (None, None)
  452. """
  453. try:
  454. # 保存原图(未标记)- JPEG 压缩
  455. original_filename = f"00_panorama_original_n{len(persons)}.jpg"
  456. original_filepath = batch_dir / original_filename
  457. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  458. result, encoded = cv2.imencode('.jpg', frame, encode_param)
  459. if result:
  460. with open(str(original_filepath), 'wb') as f:
  461. f.write(encoded)
  462. # 复制图像避免修改原图
  463. marked_frame = frame.copy()
  464. # 绘制每个人员的标记(使用连续的序号)
  465. for i, person in enumerate(persons):
  466. bbox = person.get('bbox', (0, 0, 0, 0))
  467. x1, y1, x2, y2 = bbox
  468. conf = person.get('confidence', 0.0)
  469. # 绘制边界框(绿色)
  470. cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
  471. # 绘制序号标签(带置信度)
  472. label = f"person_{i}({conf:.2f})"
  473. (label_w, label_h), baseline = cv2.getTextSize(
  474. label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
  475. )
  476. # 标签背景
  477. cv2.rectangle(
  478. marked_frame,
  479. (x1, y1 - label_h - 8),
  480. (x1 + label_w, y1),
  481. (0, 255, 0),
  482. -1
  483. )
  484. # 标签文字(黑色)
  485. cv2.putText(
  486. marked_frame, label,
  487. (x1, y1 - 4),
  488. cv2.FONT_HERSHEY_SIMPLEX, 0.8,
  489. (0, 0, 0), 2
  490. )
  491. # 保存标记图
  492. marked_filename = f"00_panorama_marked_n{len(persons)}.jpg"
  493. marked_filepath = batch_dir / marked_filename
  494. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  495. result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
  496. if result:
  497. with open(str(marked_filepath), 'wb') as f:
  498. f.write(encoded)
  499. logger.info(f"[配对保存] 全景图已保存: 原图={original_filepath}, 标记图={marked_filepath}, 人员数量 {len(persons)}")
  500. return str(original_filepath), str(marked_filepath)
  501. except Exception as e:
  502. logger.error(f"[配对保存] 保存全景图失败: {e}")
  503. return None, None
  504. def save_ptz_image(self, batch_id: str, person_index: int,
  505. ptz_frame, ptz_position: Tuple[float, float, int],
  506. ptz_bbox: Tuple[int, int, int, int] = None,
  507. person_info: Dict = None,
  508. ptz_frame_marked: object = None) -> Tuple[Optional[str], Optional[str]]:
  509. """
  510. 保存球机聚焦图片(原图和标记图)
  511. Args:
  512. batch_id: 批次ID
  513. person_index: 人员序号(0-based)
  514. ptz_frame: 球机原始帧(未标记)
  515. ptz_position: PTZ位置 (pan, tilt, zoom)
  516. ptz_bbox: 球机图中检测到的bbox (x1, y1, x2, y2)
  517. person_info: 额外人员信息
  518. ptz_frame_marked: 外部传入的已标记帧(可选,不传则在内部生成标记图)
  519. Returns:
  520. (原图路径, 标记图路径) 或 (None, None)
  521. """
  522. logger.info(f"[配对保存] save_ptz_image: batch={batch_id}, person={person_index}, "
  523. f"PTZ=({ptz_position[0]:.1f}°, {ptz_position[1]:.1f}°, zoom={ptz_position[2]})")
  524. with self._batch_lock:
  525. if self._current_batch is None or self._current_batch.batch_id != batch_id:
  526. logger.warning(f"[配对保存] 批次不存在或已过期: {batch_id}")
  527. return None, None
  528. batch_dir = self.base_dir / f"batch_{batch_id}"
  529. try:
  530. # 保存原图(未标记)- JPEG 压缩
  531. original_filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}_original.jpg"
  532. original_filepath = batch_dir / original_filename
  533. if ptz_frame is not None:
  534. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  535. result, encoded = cv2.imencode('.jpg', ptz_frame, encode_param)
  536. if result:
  537. with open(str(original_filepath), 'wb') as f:
  538. f.write(encoded)
  539. # 优先使用外部传入的标记帧,否则内部生成
  540. if ptz_frame_marked is not None:
  541. marked_frame = ptz_frame_marked
  542. else:
  543. # 复制图像用于标记
  544. marked_frame = ptz_frame.copy() if ptz_frame is not None else None
  545. if marked_frame is not None:
  546. # 在球机图上添加标记
  547. h, w = marked_frame.shape[:2]
  548. # 添加PTZ位置信息
  549. pan, tilt, zoom = ptz_position
  550. info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
  551. cv2.putText(marked_frame, info_text, (10, 30),
  552. cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
  553. # 添加人员序号
  554. person_text = f"person_{person_index}"
  555. cv2.putText(marked_frame, person_text, (10, 60),
  556. cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
  557. # 绘制PTZ检测到的bbox
  558. if ptz_bbox is not None:
  559. x1, y1, x2, y2 = ptz_bbox
  560. cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
  561. bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
  562. cv2.putText(marked_frame, bbox_text, (10, 90),
  563. cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
  564. # 保存标记图 - JPEG 压缩
  565. marked_filename = f"01_ptz_person{person_index}_p{int(ptz_position[0])}_t{int(ptz_position[1])}_z{int(ptz_position[2])}_marked.jpg"
  566. marked_filepath = batch_dir / marked_filename
  567. if marked_frame is not None:
  568. encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
  569. result, encoded = cv2.imencode('.jpg', marked_frame, encode_param)
  570. if result:
  571. with open(str(marked_filepath), 'wb') as f:
  572. f.write(encoded)
  573. logger.info(f"[配对保存] 球机图已保存: 原图={original_filepath}, 标记图={marked_filepath}")
  574. # 更新批次信息
  575. if person_index < len(self._current_batch.persons):
  576. self._current_batch.persons[person_index].ptz_position = ptz_position
  577. self._current_batch.persons[person_index].ptz_bbox = ptz_bbox
  578. self._current_batch.persons[person_index].ptz_image_saved = True
  579. self._current_batch.persons[person_index].ptz_image_path = str(marked_filepath)
  580. self._current_batch.persons[person_index].ptz_image_original_path = str(original_filepath)
  581. self._current_batch.ptz_images_count += 1
  582. with self._stats_lock:
  583. self._stats['total_ptz_images'] += 1
  584. # 异步上传OSS(不阻塞主线程)
  585. if self.enable_oss and self.oss_uploader:
  586. self._queue_save_task({
  587. 'type': 'ptz',
  588. 'batch_id': batch_id,
  589. 'batch_dir': batch_dir,
  590. 'person_index': person_index,
  591. 'ptz_frame': ptz_frame,
  592. 'ptz_position': ptz_position,
  593. 'ptz_frame_marked': ptz_frame_marked,
  594. 'ptz_bbox': ptz_bbox
  595. })
  596. return str(original_filepath), str(marked_filepath)
  597. except Exception as e:
  598. logger.error(f"[配对保存] 保存球机图失败: {e}")
  599. return None, None
  600. def _upload_panorama_to_oss(self, batch_id: str, panorama_path: str, image_type: str = 'panorama'):
  601. """上传全景图到 OSS"""
  602. logger.info(f"[OSS] _upload_panorama_to_oss: batch_id={batch_id}, path={panorama_path}, type={image_type}")
  603. # 确定是原图还是标记图
  604. is_original = image_type == 'panorama_original'
  605. def on_upload_complete(result):
  606. logger.info(f"[OSS] 全景图上传完成回调: type={image_type}, url={result.oss_url}")
  607. if is_original:
  608. self._upload_status[batch_id]['panorama_original'] = True
  609. self._upload_status[batch_id]['panorama_original_url'] = result.oss_url
  610. if self._current_batch and self._current_batch.batch_id == batch_id:
  611. self._current_batch.panorama_oss_url_original = result.oss_url
  612. else:
  613. self._upload_status[batch_id]['panorama'] = True
  614. self._upload_status[batch_id]['panorama_url'] = result.oss_url
  615. if self._current_batch and self._current_batch.batch_id == batch_id:
  616. self._current_batch.panorama_oss_url = result.oss_url
  617. with self._stats_lock:
  618. self._stats['oss_upload_success'] += 1
  619. def on_upload_error(result):
  620. logger.error(f"[OSS] 全景图上传失败回调: type={image_type}, error={result.error}")
  621. with self._stats_lock:
  622. self._stats['oss_upload_failed'] += 1
  623. def on_upload_done(result):
  624. if result.success:
  625. on_upload_complete(result)
  626. else:
  627. on_upload_error(result)
  628. try:
  629. oss_key = self.oss_uploader.upload_image(
  630. local_path=panorama_path,
  631. batch_id=batch_id,
  632. image_type=image_type,
  633. callback=on_upload_done
  634. )
  635. logger.info(f"[OSS] 全景图已加入上传队列: type={image_type}, oss_key={oss_key}")
  636. except Exception as e:
  637. logger.error(f"[OSS] 全景图上传异常: {e}")
  638. def _upload_ptz_to_oss(self, batch_id: str, person_index: int, original_path: str = None, marked_path: str = None):
  639. """上传球机图到 OSS(原图和标记图)"""
  640. logger.info(f"[OSS] _upload_ptz_to_oss: batch={batch_id}, person={person_index}, original={original_path}, marked={marked_path}")
  641. # 初始化上传状态
  642. if batch_id not in self._upload_status:
  643. self._upload_status[batch_id] = {'ptz': {}}
  644. if 'ptz_original' not in self._upload_status[batch_id]:
  645. self._upload_status[batch_id]['ptz_original'] = {}
  646. if 'ptz_marked' not in self._upload_status[batch_id]:
  647. self._upload_status[batch_id]['ptz_marked'] = {}
  648. # 上传原图
  649. if original_path:
  650. def on_original_complete(result):
  651. self._upload_status[batch_id]['ptz_original'][person_index] = result.oss_url
  652. if self._current_batch and self._current_batch.batch_id == batch_id:
  653. if person_index < len(self._current_batch.persons):
  654. self._current_batch.persons[person_index].ptz_oss_url_original = result.oss_url
  655. with self._stats_lock:
  656. self._stats['oss_upload_success'] += 1
  657. logger.info(f"[OSS] 球机原图上传成功: {result.oss_url}")
  658. def on_original_done(result):
  659. if result.success:
  660. on_original_complete(result)
  661. else:
  662. logger.error(f"[OSS] 球机原图上传失败: {result.error}")
  663. try:
  664. self.oss_uploader.upload_image(
  665. local_path=original_path,
  666. batch_id=batch_id,
  667. image_type='ptz_original',
  668. person_index=person_index,
  669. callback=on_original_done
  670. )
  671. except Exception as e:
  672. logger.error(f"[OSS] 球机原图上传异常: {e}")
  673. # 上传标记图
  674. if marked_path:
  675. def on_marked_complete(result):
  676. self._upload_status[batch_id]['ptz_marked'][person_index] = result.oss_url
  677. if self._current_batch and self._current_batch.batch_id == batch_id:
  678. if person_index < len(self._current_batch.persons):
  679. self._current_batch.persons[person_index].ptz_oss_url = result.oss_url
  680. with self._stats_lock:
  681. self._stats['oss_upload_success'] += 1
  682. logger.info(f"[OSS] 球机标记图上传成功: {result.oss_url}")
  683. def on_marked_done(result):
  684. if result.success:
  685. on_marked_complete(result)
  686. else:
  687. logger.error(f"[OSS] 球机标记图上传失败: {result.error}")
  688. try:
  689. self.oss_uploader.upload_image(
  690. local_path=marked_path,
  691. batch_id=batch_id,
  692. image_type='ptz',
  693. person_index=person_index,
  694. callback=on_marked_done
  695. )
  696. except Exception as e:
  697. logger.error(f"[OSS] 球机标记图上传异常: {e}")
  def _finalize_batch(self, batch: DetectionBatch):
      """Finalize a batch: wait for uploads, write metadata files, notify, clean up.

      The batch is flagged as completed first. When OSS upload is enabled and
      the batch has an upload-status entry, this polls (every 0.2 s, for at
      most 10 s) until the panorama and every saved PTZ image have a URL
      recorded. It then writes ``batch_info.json`` and the legacy
      ``batch_info.txt`` into ``base_dir / "batch_<batch_id>"``, marks the
      upload-status entry as completed and invokes the registered upload
      callback with the batch-info dict. Errors raised while writing the
      files are logged, not propagated. Old-batch cleanup runs at the end in
      every case.

      Args:
          batch: The batch to finalize; its ``completed`` flag is set to True.
      """
      batch.completed = True

      # Wait for OSS uploads to finish (at most 10 seconds).
      if self.enable_oss and batch.batch_id in self._upload_status:
          max_wait = 10.0
          poll_interval = 0.2
          # Monotonic clock so the timeout is unaffected by wall-clock changes.
          deadline = time.monotonic() + max_wait
          while time.monotonic() < deadline:
              status = self._upload_status[batch.batch_id]

              # Panorama is done once a URL is recorded, or trivially done
              # when the batch has no panorama file at all.
              panorama_done = (
                  status.get('panorama_url') is not None
                  or not batch.panorama_path
              )

              # The PTZ upload callbacks record marked-image URLs under
              # 'ptz_marked'; the older 'ptz' map is still honoured so that
              # any code path filling it keeps working. Checking only 'ptz'
              # made this loop run for the full timeout.
              marked_urls = status.get('ptz_marked', {})
              legacy_urls = status.get('ptz', {})
              ptz_done = all(
                  marked_urls.get(idx) is not None
                  or legacy_urls.get(idx) is not None
                  for idx, person in enumerate(batch.persons)
                  if person.ptz_image_saved
              )

              if panorama_done and ptz_done:
                  break
              time.sleep(poll_interval)

      # Write batch_info.json (and the legacy TXT file).
      try:
          batch_dir = self.base_dir / f"batch_{batch.batch_id}"

          batch_info = self._build_batch_info_json(batch)

          json_path = batch_dir / "batch_info.json"
          with open(json_path, 'w', encoding='utf-8') as f:
              json.dump(batch_info, f, ensure_ascii=False, indent=2)
          logger.info(f"[配对保存] batch_info.json 已保存: {json_path}")

          # Keep the TXT format as well for backward compatibility.
          txt_path = batch_dir / "batch_info.txt"
          self._save_batch_info_txt(batch, txt_path)

          # Mark the upload bookkeeping entry as completed.
          if batch.batch_id in self._upload_status:
              self._upload_status[batch.batch_id]['completed'] = True

          # Notify the registered callback; its failures must not abort
          # finalization.
          if self._upload_callback:
              try:
                  self._upload_callback(batch_info)
              except Exception as e:
                  logger.error(f"[配对保存] 回调执行错误: {e}")

          logger.info(f"[配对保存] 批次完成: {batch.batch_id}, "
                      f"人员={batch.total_persons}, 球机图={batch.ptz_images_count}")
      except Exception as e:
          logger.error(f"[配对保存] 保存批次信息失败: {e}")

      # Remove old batch directories.
      self._cleanup_old_batches()
  def _report_to_third_party(self, batch_info: Dict):
      """Report a batch-info dict to the third-party platform via HTTP POST.

      Settings are read from ``config.THIRD_PARTY_CONFIG``: ``enabled``,
      ``base_url``, ``endpoints['batch_report']``, ``retry_count``
      (default 3), ``retry_delay`` in seconds (default 2.0) and ``timeout``
      (default 10). The call returns silently when the config module is
      missing, reporting is disabled, or the URL is not configured. A
      response with status 200 or 201 counts as success; anything else, or
      any exception, is retried until ``retry_count`` attempts are used up.
      Nothing is raised to the caller.

      Args:
          batch_info: The JSON-serializable batch description to send.
      """
      try:
          from config import THIRD_PARTY_CONFIG
      except ImportError:
          logger.warning("[第三方] 配置模块不可用,跳过上报")
          return

      if not THIRD_PARTY_CONFIG.get('enabled', False):
          return

      base_url = THIRD_PARTY_CONFIG.get('base_url', '')
      endpoint = THIRD_PARTY_CONFIG.get('endpoints', {}).get('batch_report', '')
      if not base_url or not endpoint:
          logger.warning("[第三方] 接口地址未配置,跳过上报")
          return

      # Import once, before the retry loop: a missing dependency cannot be
      # cured by retrying, and the sleeps would only stall the caller.
      try:
          import requests
      except ImportError as e:
          logger.warning(f"[第三方] requests 库不可用,跳过上报: {e}")
          logger.error(f"[第三方] 批次上报最终失败: batch_id={batch_info.get('batch_id')}")
          return

      url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
      retry_count = THIRD_PARTY_CONFIG.get('retry_count', 3)
      retry_delay = THIRD_PARTY_CONFIG.get('retry_delay', 2.0)
      timeout = THIRD_PARTY_CONFIG.get('timeout', 10)

      for attempt in range(1, retry_count + 1):
          try:
              resp = requests.post(url, json=batch_info, timeout=timeout)
              if resp.status_code in (200, 201):
                  logger.info(f"[第三方] 批次上报成功: batch_id={batch_info.get('batch_id')}, status={resp.status_code}")
                  return
              logger.warning(f"[第三方] 批次上报失败: status={resp.status_code}, body={resp.text[:200]}")
          except Exception as e:
              logger.warning(f"[第三方] 批次上报异常(第{attempt}次): {e}")

          # No point sleeping after the final attempt.
          if attempt < retry_count:
              time.sleep(retry_delay)

      logger.error(f"[第三方] 批次上报最终失败: batch_id={batch_info.get('batch_id')}")
  def _build_batch_info_json(self, batch: DetectionBatch) -> Dict:
      """Build the dictionary that is serialized to ``batch_info.json``.

      OSS URLs are taken from this batch's upload-status entry when one is
      recorded there, otherwise from the attributes stored on the batch or
      person object. A recorded value of ``None`` counts as "not recorded",
      so it never hides a URL already stored on the batch or person.

      Args:
          batch: The batch to describe.

      Returns:
          Dict: Batch metadata with the keys ``batch_id``, ``device_id``,
          ``project_id``, ``timestamp``, ``datetime``, ``total_persons``,
          ``ptz_images_count``, ``panorama``, ``persons`` and
          ``upload_status``.
      """
      upload_status = self._upload_status.get(batch.batch_id, {})

      def prefer(recorded, fallback):
          # Use the upload-status value unless nothing has been recorded yet.
          return fallback if recorded is None else recorded

      # Panorama URLs (marked image and original image).
      panorama_oss_url = prefer(
          upload_status.get('panorama_url'), batch.panorama_oss_url
      )
      panorama_oss_url_original = prefer(
          upload_status.get('panorama_original_url'),
          batch.panorama_oss_url_original,
      )

      marked_urls = upload_status.get('ptz_marked', {})
      original_urls = upload_status.get('ptz_original', {})
      # Older bookkeeping key; still accepted as proof of a finished upload.
      legacy_urls = upload_status.get('ptz', {})

      persons_list = []
      all_ptz_uploaded = True
      for person in batch.persons:
          idx = person.person_index
          ptz_oss_url = prefer(marked_urls.get(idx), person.ptz_oss_url)
          ptz_oss_url_original = prefer(
              original_urls.get(idx), person.ptz_oss_url_original
          )

          # A saved PTZ image counts as uploaded once any URL is known for
          # it. Looking only at the 'ptz' map reported False even when the
          # marked-image URL was present under 'ptz_marked'.
          if (person.ptz_image_saved
                  and ptz_oss_url is None
                  and legacy_urls.get(idx) is None):
              all_ptz_uploaded = False

          ptz_position = None
          if person.ptz_position:
              ptz_position = {
                  'pan': round(person.ptz_position[0], 2),
                  'tilt': round(person.ptz_position[1], 2),
                  'zoom': person.ptz_position[2],
              }

          ptz_bbox = None
          if person.ptz_bbox:
              ptz_bbox = {
                  'x1': person.ptz_bbox[0],
                  'y1': person.ptz_bbox[1],
                  'x2': person.ptz_bbox[2],
                  'y2': person.ptz_bbox[3],
              }

          persons_list.append({
              'person_index': idx,
              'position': {
                  'x': round(person.position[0], 4),
                  'y': round(person.position[1], 4),
              },
              'bbox': {
                  'x1': person.bbox[0],
                  'y1': person.bbox[1],
                  'x2': person.bbox[2],
                  'y2': person.bbox[3],
              },
              'confidence': round(person.confidence, 4),
              'ptz_position': ptz_position,
              'ptz_bbox': ptz_bbox,
              'ptz_image_saved': person.ptz_image_saved,
              'ptz_image_path': person.ptz_image_path,  # marked image
              'ptz_image_original_path': person.ptz_image_original_path,  # original image
              'ptz_oss_url': ptz_oss_url,  # marked image OSS URL
              'ptz_oss_url_original': ptz_oss_url_original,  # original image OSS URL
          })

      return {
          'batch_id': batch.batch_id,
          'device_id': batch.device_id,
          'project_id': batch.project_id,
          'timestamp': batch.timestamp,
          'datetime': datetime.fromtimestamp(batch.timestamp).isoformat(),
          'total_persons': batch.total_persons,
          'ptz_images_count': batch.ptz_images_count,
          'panorama': {
              'local_path': batch.panorama_path,  # marked image
              'local_path_original': batch.panorama_original_path,  # original image
              'oss_url': panorama_oss_url,  # marked image OSS URL
              'oss_url_original': panorama_oss_url_original,  # original image OSS URL
          },
          'persons': persons_list,
          'upload_status': {
              'panorama_uploaded': panorama_oss_url is not None,
              'panorama_original_uploaded': panorama_oss_url_original is not None,
              'all_ptz_uploaded': all_ptz_uploaded,
          },
      }
  def _save_batch_info_txt(self, batch: DetectionBatch, txt_path: Path):
      """Write a plain-text summary of the batch (legacy format).

      The file gets a header with the batch-level fields followed by one
      section per person. Any exception raised while writing is logged and
      swallowed.

      Args:
          batch: The batch to describe.
          txt_path: Destination file; it is overwritten.
      """

      def render():
          # Batch-level header.
          yield f"批次ID: {batch.batch_id}\n"
          yield f"设备ID: {batch.device_id}\n"
          yield f"项目ID: {batch.project_id}\n"
          yield f"时间戳: {datetime.fromtimestamp(batch.timestamp)}\n"
          yield f"总人数: {batch.total_persons}\n"
          yield f"球机图数量: {batch.ptz_images_count}\n"
          yield f"全景图: {batch.panorama_path}\n"
          yield f"全景图OSS: {batch.panorama_oss_url}\n"
          yield "\n人员详情:\n"

          # One section per detected person.
          for ordinal, who in enumerate(batch.persons):
              pos, box, ptz_box = who.position, who.bbox, who.ptz_bbox
              yield f"\n Person {ordinal}:\n"
              yield f" Person Index: {who.person_index}\n"
              yield f" Position: ({pos[0]:.3f}, {pos[1]:.3f})\n"
              yield f" BBox: ({box[0]}, {box[1]}, {box[2]}, {box[3]})\n"
              yield f" Confidence: {who.confidence:.2f}\n"
              yield f" PTZ Position: {who.ptz_position}\n"
              if not ptz_box:
                  yield " PTZ BBox: None\n"
              else:
                  yield f" PTZ BBox: ({ptz_box[0]}, {ptz_box[1]}, {ptz_box[2]}, {ptz_box[3]})\n"
              yield f" PTZ Image: {who.ptz_image_path}\n"
              yield f" PTZ OSS URL: {who.ptz_oss_url}\n"

      try:
          with open(txt_path, 'w', encoding='utf-8') as out:
              # Lines are produced lazily, so a failure part-way through
              # leaves the lines written so far in the file.
              out.writelines(render())
      except Exception as e:
          logger.error(f"[配对保存] 保存 TXT 批次信息失败: {e}")
  def _cleanup_old_batches(self):
      """Delete old ``batch_*`` directories under ``base_dir``.

      Does nothing when ``cleanup_enabled`` is false. Directories are
      visited oldest-first (by modification time). A directory is removed
      when more than ``max_batches`` directories would otherwise remain, or
      when ``retention_days`` is positive and the directory is older than
      that many days. A failure to delete one directory is logged and does
      not stop the remaining deletions. Nothing is raised to the caller.
      """
      if not self.cleanup_enabled:
          logger.debug("[配对保存] 自动清理已禁用")
          return

      try:
          import shutil

          # Read each directory's mtime exactly once so sorting and the age
          # check agree, and a vanished directory cannot fail mid-sort.
          dated_dirs = sorted(
              (
                  (entry.stat().st_mtime, entry)
                  for entry in self.base_dir.iterdir()
                  if entry.is_dir() and entry.name.startswith('batch_')
              ),
              key=lambda item: item[0],
          )
          if not dated_dirs:
              return

          now = time.time()
          remaining = len(dated_dirs)
          to_delete = []
          for mtime, batch_dir in dated_dirs:
              # Count-based cleanup: drop the oldest while over the limit.
              if remaining > self.max_batches:
                  to_delete.append(batch_dir)
                  remaining -= 1
                  continue

              # Age-based cleanup: drop anything past the retention period.
              if self.retention_days > 0:
                  age_days = (now - mtime) / 86400
                  if age_days > self.retention_days:
                      to_delete.append(batch_dir)
                      remaining -= 1

          deleted_count = 0
          for batch_dir in to_delete:
              try:
                  shutil.rmtree(batch_dir)
              except OSError as e:
                  # Keep going: one stuck directory must not block the rest.
                  logger.error(f"[配对保存] 清理旧批次失败: {batch_dir.name}: {e}")
                  continue
              deleted_count += 1
              logger.info(f"[配对保存] 清理旧批次: {batch_dir.name}")

          if deleted_count > 0:
              logger.info(f"[配对保存] 共清理 {deleted_count} 个旧批次")

      except Exception as e:
          logger.error(f"[配对保存] 清理旧批次失败: {e}")
  def get_current_batch_id(self) -> Optional[str]:
      """Return the ID of the batch currently being collected, or None.

      The current batch is read while holding the batch lock.
      """
      with self._batch_lock:
          active = self._current_batch
          if not active:
              return None
          return active.batch_id
  def get_stats(self) -> Dict:
      """Return a shallow copy of the statistics counters.

      The copy is taken while holding the stats lock, so later counter
      updates do not show up in the returned mapping.
      """
      with self._stats_lock:
          snapshot = self._stats.copy()
      return snapshot
  def set_upload_callback(self, callback: Callable):
      """Register the function to call when a batch has been finalized.

      Args:
          callback: Callable that receives the batch-info dict as its only
              argument. It replaces any previously registered callback.
      """
      self._upload_callback = callback
  def get_batch_info(self, batch_id: str) -> Optional[Dict]:
      """Load the stored ``batch_info.json`` of a batch.

      Args:
          batch_id: ID of the batch to look up.

      Returns:
          The parsed JSON content, or None when the file does not exist or
          cannot be read (read failures are logged).
      """
      try:
          info_file = self.base_dir / f"batch_{batch_id}" / "batch_info.json"
          if not info_file.exists():
              return None
          with open(info_file, 'r', encoding='utf-8') as fh:
              return json.load(fh)
      except Exception as e:
          logger.error(f"[配对保存] 读取 batch_info.json 失败: {e}")
          return None
  def close(self):
      """Close the manager, queuing finalization of the current batch.

      If a batch is in progress, a ``finalize`` task for it is handed to the
      save queue, the call pauses for 0.5 s to give the background worker a
      chance to process it, and the current batch is cleared. All of this
      happens while holding the batch lock.
      """
      with self._batch_lock:
          pending = self._current_batch
          if pending is not None:
              # Finalize the last batch asynchronously.
              finalize_task = {'type': 'finalize', 'batch': pending}
              self._queue_save_task(finalize_task)
              # Brief pause so the background thread can pick the task up.
              time.sleep(0.5)
              self._current_batch = None
      logger.info("[配对保存] 管理器已关闭")
  # Module-level singleton: created lazily by get_paired_saver() and set
  # back to None by reset_paired_saver().
  _paired_saver_instance: Optional[PairedImageSaver] = None
  def get_paired_saver(
      base_dir: str = None,
      time_window: float = None,
      max_batches: int = None,
      cleanup_enabled: bool = None,
      retention_days: int = None,
      enable_oss: bool = False,
      oss_uploader = None,
      device_config: Dict = None,
  ) -> PairedImageSaver:
      """Return the shared PairedImageSaver, creating it on first use.

      On the first call every argument is forwarded to the
      ``PairedImageSaver`` constructor. On later calls the existing instance
      is returned and only two arguments have any effect: ``oss_uploader``
      is installed (and OSS enabled) if the instance has no uploader yet,
      and ``device_config`` is merged into the instance's device config.

      Args:
          base_dir: Base directory for saved batches.
          time_window: Batch time window.
          max_batches: Maximum number of batches to keep.
          cleanup_enabled: Whether automatic cleanup is enabled.
          retention_days: Number of days to keep batches.
          enable_oss: Whether OSS upload is enabled.
          oss_uploader: OSS uploader instance.
          device_config: Device configuration dictionary.

      Returns:
          The singleton PairedImageSaver instance.
      """
      global _paired_saver_instance

      saver = _paired_saver_instance
      if saver is None:
          saver = PairedImageSaver(
              base_dir=base_dir,
              time_window=time_window,
              max_batches=max_batches,
              cleanup_enabled=cleanup_enabled,
              retention_days=retention_days,
              enable_oss=enable_oss,
              oss_uploader=oss_uploader,
              device_config=device_config,
          )
          _paired_saver_instance = saver
          return saver

      # The singleton already exists: apply only explicitly passed overrides.
      if oss_uploader is not None and saver.oss_uploader is None:
          saver.oss_uploader = oss_uploader
          saver.enable_oss = True
          logger.info("[配对保存] 更新 OSS 上传器配置")

      if device_config is not None:
          saver.device_config.update(device_config)
          logger.info(f"[配对保存] 更新设备配置: {device_config}")

      return saver
  def reset_paired_saver():
      """Close and discard the singleton instance (intended for tests)."""
      global _paired_saver_instance

      current = _paired_saver_instance
      if current is None:
          # Nothing to close; the global already holds None.
          return
      current.close()
      _paired_saver_instance = None