| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- """本地图片文件清理工具."""
- import logging
- import os
- import threading
- import time
- from pathlib import Path
- from typing import Callable, List, Optional
- logger = logging.getLogger(__name__)
- def _file_mtime(path: Path) -> float:
- """获取文件修改时间,失败返回 0。"""
- try:
- return path.stat().st_mtime
- except OSError:
- return 0.0
- def cleanup_directory(
- directory: str,
- retention_days: int,
- max_files: Optional[int] = None,
- pattern: str = "*",
- now: Optional[float] = None,
- ) -> int:
- """
- 清理目录中的过期文件。
- 规则:
- 1. 删除修改时间超过 retention_days 的文件。
- 2. 如果指定 max_files 且文件总数超过,删除最旧的文件直到数量符合限制。
- Args:
- directory: 要清理的目录。
- retention_days: 保留天数。
- max_files: 最大文件数,None 表示不限制。
- pattern: 匹配模式,默认匹配所有文件。
- now: 当前时间戳,默认 time.time()。
- Returns:
- 删除的文件数量。
- """
- if retention_days <= 0 and max_files is None:
- return 0
- dir_path = Path(directory)
- if not dir_path.exists():
- return 0
- now = now or time.time()
- cutoff = now - retention_days * 86400.0
- try:
- files = [p for p in dir_path.rglob(pattern) if p.is_file()]
- except OSError as exc:
- logger.warning("[cleanup] 扫描目录失败: %s, %s", directory, exc)
- return 0
- deleted = 0
- for p in files:
- mtime = _file_mtime(p)
- if retention_days > 0 and mtime < cutoff:
- try:
- p.unlink()
- deleted += 1
- logger.debug("[cleanup] 删除过期文件: %s", p)
- except OSError as exc:
- logger.warning("[cleanup] 删除文件失败: %s, %s", p, exc)
- # 如果仍然超过最大数量,按修改时间删除最旧的
- if max_files is not None and max_files > 0:
- try:
- remaining = sorted(
- [p for p in dir_path.rglob(pattern) if p.is_file()],
- key=_file_mtime,
- )
- except OSError:
- remaining = []
- while len(remaining) > max_files:
- oldest = remaining.pop(0)
- try:
- oldest.unlink()
- deleted += 1
- logger.debug("[cleanup] 删除超量旧文件: %s", oldest)
- except OSError as exc:
- logger.warning("[cleanup] 删除文件失败: %s, %s", oldest, exc)
- break
- if deleted:
- logger.info("[cleanup] %s 清理完成,删除 %d 个文件", directory, deleted)
- return deleted
- class CleanupWorker:
- """定时清理 worker,每个实例管理一个目录。"""
- def __init__(
- self,
- directory: str,
- retention_days: int,
- max_files: Optional[int] = None,
- interval_seconds: float = 3600.0,
- pattern: str = "*",
- ):
- self.directory = directory
- self.retention_days = retention_days
- self.max_files = max_files
- self.interval_seconds = interval_seconds
- self.pattern = pattern
- self._stop_event = threading.Event()
- self._thread: Optional[threading.Thread] = None
- def start(self) -> None:
- if self._thread is not None and self._thread.is_alive():
- return
- self._stop_event.clear()
- self._thread = threading.Thread(target=self._loop, daemon=True)
- self._thread.start()
- logger.info(
- "[cleanup] worker 启动: %s, retention=%d天, max=%s, interval=%.0fs",
- self.directory,
- self.retention_days,
- self.max_files,
- self.interval_seconds,
- )
- def stop(self) -> None:
- self._stop_event.set()
- if self._thread is not None:
- self._thread.join(timeout=2.0)
- def run_once(self) -> int:
- return cleanup_directory(
- self.directory,
- self.retention_days,
- self.max_files,
- self.pattern,
- )
- def _loop(self) -> None:
- while not self._stop_event.is_set():
- try:
- self.run_once()
- except Exception as exc:
- logger.error("[cleanup] 清理异常: %s", exc)
- # 等待下一次清理或停止信号
- self._stop_event.wait(timeout=self.interval_seconds)
- def make_cleanup_workers(
- storage_config: dict,
- group_ids: List[str],
- base_path: str = ".",
- ) -> List[CleanupWorker]:
- """
- 根据 STORAGE_CONFIG 为 captures 和 previews 创建清理 worker。
- Args:
- storage_config: STORAGE_CONFIG 字典。
- group_ids: 摄像头组 ID 列表,用于为 captures 每组创建 worker。
- base_path: 项目根目录,相对路径基于此。
- Returns:
- CleanupWorker 列表。
- """
- workers: List[CleanupWorker] = []
- captures_cfg = storage_config.get("captures", {})
- captures_base = captures_cfg.get("base_dir", "data/captures")
- captures_retention = captures_cfg.get("retention_days", 7)
- captures_max = captures_cfg.get("max_files")
- captures_interval = captures_cfg.get("cleanup_interval_seconds", 3600)
- for gid in group_ids:
- workers.append(
- CleanupWorker(
- directory=os.path.join(base_path, captures_base, gid),
- retention_days=captures_retention,
- max_files=captures_max,
- interval_seconds=captures_interval,
- pattern="*",
- )
- )
- previews_cfg = storage_config.get("previews", {})
- previews_base = previews_cfg.get("base_dir", "data/previews")
- previews_retention = previews_cfg.get("retention_days", 7)
- previews_max = previews_cfg.get("max_files")
- previews_interval = previews_cfg.get("cleanup_interval_seconds", 3600)
- workers.append(
- CleanupWorker(
- directory=os.path.join(base_path, previews_base),
- retention_days=previews_retention,
- max_files=previews_max,
- interval_seconds=previews_interval,
- pattern="*",
- )
- )
- return workers
|