# file_cleanup.py (6.1 KB)

  1. """本地图片文件清理工具."""
  2. import logging
  3. import os
  4. import threading
  5. import time
  6. from pathlib import Path
  7. from typing import Callable, List, Optional
  8. logger = logging.getLogger(__name__)
  9. def _file_mtime(path: Path) -> float:
  10. """获取文件修改时间,失败返回 0。"""
  11. try:
  12. return path.stat().st_mtime
  13. except OSError:
  14. return 0.0
  15. def cleanup_directory(
  16. directory: str,
  17. retention_days: int,
  18. max_files: Optional[int] = None,
  19. pattern: str = "*",
  20. now: Optional[float] = None,
  21. ) -> int:
  22. """
  23. 清理目录中的过期文件。
  24. 规则:
  25. 1. 删除修改时间超过 retention_days 的文件。
  26. 2. 如果指定 max_files 且文件总数超过,删除最旧的文件直到数量符合限制。
  27. Args:
  28. directory: 要清理的目录。
  29. retention_days: 保留天数。
  30. max_files: 最大文件数,None 表示不限制。
  31. pattern: 匹配模式,默认匹配所有文件。
  32. now: 当前时间戳,默认 time.time()。
  33. Returns:
  34. 删除的文件数量。
  35. """
  36. if retention_days <= 0 and max_files is None:
  37. return 0
  38. dir_path = Path(directory)
  39. if not dir_path.exists():
  40. return 0
  41. now = now or time.time()
  42. cutoff = now - retention_days * 86400.0
  43. try:
  44. files = [p for p in dir_path.rglob(pattern) if p.is_file()]
  45. except OSError as exc:
  46. logger.warning("[cleanup] 扫描目录失败: %s, %s", directory, exc)
  47. return 0
  48. deleted = 0
  49. for p in files:
  50. mtime = _file_mtime(p)
  51. if retention_days > 0 and mtime < cutoff:
  52. try:
  53. p.unlink()
  54. deleted += 1
  55. logger.debug("[cleanup] 删除过期文件: %s", p)
  56. except OSError as exc:
  57. logger.warning("[cleanup] 删除文件失败: %s, %s", p, exc)
  58. # 如果仍然超过最大数量,按修改时间删除最旧的
  59. if max_files is not None and max_files > 0:
  60. try:
  61. remaining = sorted(
  62. [p for p in dir_path.rglob(pattern) if p.is_file()],
  63. key=_file_mtime,
  64. )
  65. except OSError:
  66. remaining = []
  67. while len(remaining) > max_files:
  68. oldest = remaining.pop(0)
  69. try:
  70. oldest.unlink()
  71. deleted += 1
  72. logger.debug("[cleanup] 删除超量旧文件: %s", oldest)
  73. except OSError as exc:
  74. logger.warning("[cleanup] 删除文件失败: %s, %s", oldest, exc)
  75. break
  76. if deleted:
  77. logger.info("[cleanup] %s 清理完成,删除 %d 个文件", directory, deleted)
  78. return deleted
  79. class CleanupWorker:
  80. """定时清理 worker,每个实例管理一个目录。"""
  81. def __init__(
  82. self,
  83. directory: str,
  84. retention_days: int,
  85. max_files: Optional[int] = None,
  86. interval_seconds: float = 3600.0,
  87. pattern: str = "*",
  88. ):
  89. self.directory = directory
  90. self.retention_days = retention_days
  91. self.max_files = max_files
  92. self.interval_seconds = interval_seconds
  93. self.pattern = pattern
  94. self._stop_event = threading.Event()
  95. self._thread: Optional[threading.Thread] = None
  96. def start(self) -> None:
  97. if self._thread is not None and self._thread.is_alive():
  98. return
  99. self._stop_event.clear()
  100. self._thread = threading.Thread(target=self._loop, daemon=True)
  101. self._thread.start()
  102. logger.info(
  103. "[cleanup] worker 启动: %s, retention=%d天, max=%s, interval=%.0fs",
  104. self.directory,
  105. self.retention_days,
  106. self.max_files,
  107. self.interval_seconds,
  108. )
  109. def stop(self) -> None:
  110. self._stop_event.set()
  111. if self._thread is not None:
  112. self._thread.join(timeout=2.0)
  113. def run_once(self) -> int:
  114. return cleanup_directory(
  115. self.directory,
  116. self.retention_days,
  117. self.max_files,
  118. self.pattern,
  119. )
  120. def _loop(self) -> None:
  121. while not self._stop_event.is_set():
  122. try:
  123. self.run_once()
  124. except Exception as exc:
  125. logger.error("[cleanup] 清理异常: %s", exc)
  126. # 等待下一次清理或停止信号
  127. self._stop_event.wait(timeout=self.interval_seconds)
  128. def make_cleanup_workers(
  129. storage_config: dict,
  130. group_ids: List[str],
  131. base_path: str = ".",
  132. ) -> List[CleanupWorker]:
  133. """
  134. 根据 STORAGE_CONFIG 为 captures 和 previews 创建清理 worker。
  135. Args:
  136. storage_config: STORAGE_CONFIG 字典。
  137. group_ids: 摄像头组 ID 列表,用于为 captures 每组创建 worker。
  138. base_path: 项目根目录,相对路径基于此。
  139. Returns:
  140. CleanupWorker 列表。
  141. """
  142. workers: List[CleanupWorker] = []
  143. captures_cfg = storage_config.get("captures", {})
  144. captures_base = captures_cfg.get("base_dir", "data/captures")
  145. captures_retention = captures_cfg.get("retention_days", 7)
  146. captures_max = captures_cfg.get("max_files")
  147. captures_interval = captures_cfg.get("cleanup_interval_seconds", 3600)
  148. for gid in group_ids:
  149. workers.append(
  150. CleanupWorker(
  151. directory=os.path.join(base_path, captures_base, gid),
  152. retention_days=captures_retention,
  153. max_files=captures_max,
  154. interval_seconds=captures_interval,
  155. pattern="*",
  156. )
  157. )
  158. previews_cfg = storage_config.get("previews", {})
  159. previews_base = previews_cfg.get("base_dir", "data/previews")
  160. previews_retention = previews_cfg.get("retention_days", 7)
  161. previews_max = previews_cfg.get("max_files")
  162. previews_interval = previews_cfg.get("cleanup_interval_seconds", 3600)
  163. workers.append(
  164. CleanupWorker(
  165. directory=os.path.join(base_path, previews_base),
  166. retention_days=previews_retention,
  167. max_files=previews_max,
  168. interval_seconds=previews_interval,
  169. pattern="*",
  170. )
  171. )
  172. return workers