Просмотр исходного кода

perf(logging): 增加日志清理机制优化日志管理

- 添加日志文件清理功能,自动删除超出保留天数的日志文件
- 在日志配置中支持日志保留天数设置
- 启动日志清理后台线程,周期性执行清理操作
- 确保日志目录存在,避免写日志时出错
- 其他程序结构及注释优化,未改变核心功能逻辑
wenhongquan 5 дней назад
Родитель
Сommit
0dd118eb73

BIN
dual_camera_system/__pycache__/coordinator.cpython-310.pyc


BIN
dual_camera_system/__pycache__/main.cpython-310.pyc


BIN
dual_camera_system/__pycache__/paired_image_saver.cpython-310.pyc


BIN
dual_camera_system/__pycache__/safety_main.cpython-310.pyc


+ 2 - 1
dual_camera_system/config/camera.py

@@ -7,9 +7,10 @@ import os
 LOG_CONFIG = {
     'level': 'INFO',
     'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
-    'file': None,
+    'file': None,  # 日志文件路径,设置为None则只输出到控制台
     'max_bytes': 10 * 1024 * 1024,
     'backup_count': 5,
+    'retention_days': 7,  # 日志保留天数
 }
 
 # ============================================================

+ 61 - 3
dual_camera_system/main.py

@@ -14,6 +14,7 @@ os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
 
 import sys
 import time
+import glob
 import argparse
 import logging
 import threading
@@ -40,15 +41,64 @@ from coordinator import Coordinator, EventDrivenCoordinator, AsyncCoordinator, S
 
 
 # 配置日志 - 使用LOG_CONFIG
+def _cleanup_old_logs(log_file: str, retention_days: int):
+    """清理超过保留天数的日志文件"""
+    import glob
+    if not log_file:
+        return
+
+    log_dir = os.path.dirname(log_file) or '.'
+    log_basename = os.path.basename(log_file)
+
+    # 匹配所有轮转的日志文件:app.log, app.log.1, app.log.2, ...
+    patterns = [
+        log_basename,
+        f"{log_basename}.*",
+        f"{os.path.splitext(log_basename)[0]}.*",  # 处理没有扩展名的情况
+    ]
+
+    now = time.time()
+    cutoff = now - (retention_days * 86400)
+
+    for pattern in patterns:
+        full_pattern = os.path.join(log_dir, pattern)
+        for log_path in glob.glob(full_pattern):
+            try:
+                if os.path.isfile(log_path):
+                    mtime = os.path.getmtime(log_path)
+                    if mtime < cutoff:
+                        os.remove(log_path)
+                        print(f"[日志清理] 已删除过期日志: {log_path}")
+            except Exception as e:
+                pass  # 忽略删除失败的日志文件
+
+
+def _log_cleanup_worker(retention_days: int, interval_hours: int = 6):
+    """日志清理后台线程"""
+    log_file = LOG_CONFIG.get('file')
+    if not log_file:
+        return
+
+    while True:
+        _cleanup_old_logs(log_file, retention_days)
+        time.sleep(interval_hours * 3600)
+
+
 def setup_logging():
     """设置日志配置"""
     log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO)
     log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     log_file = LOG_CONFIG.get('file')
-    
+    retention_days = LOG_CONFIG.get('retention_days', 7)
+
     handlers = [logging.StreamHandler()]
-    
+
     if log_file:
+        # 确保日志目录存在
+        log_dir = os.path.dirname(log_file)
+        if log_dir:
+            os.makedirs(log_dir, exist_ok=True)
+
         from logging.handlers import RotatingFileHandler
         file_handler = RotatingFileHandler(
             log_file,
@@ -57,7 +107,15 @@ def setup_logging():
         )
         file_handler.setFormatter(logging.Formatter(log_format))
         handlers.append(file_handler)
-    
+
+        # 启动日志清理后台线程
+        cleanup_thread = threading.Thread(
+            target=_log_cleanup_worker,
+            args=(retention_days, 6),
+            daemon=True
+        )
+        cleanup_thread.start()
+
     logging.basicConfig(
         level=log_level,
         format=log_format,

+ 211 - 25
dual_camera_system/paired_image_saver.py

@@ -147,11 +147,17 @@ class PairedImageSaver:
         self._current_batch: Optional[DetectionBatch] = None
         self._batch_lock = threading.Lock()
         self._last_batch_time = 0.0
-        
+
+        # 后台线程池(用于异步保存图片和上传OSS,不阻塞主识别线程)
+        self._save_thread_pool = threading.Thread(target=self._save_worker, daemon=True)
+        self._save_thread_pool.start()
+        self._save_queue = []  # 保存任务队列
+        self._save_queue_lock = threading.Lock()
+
         # 上传状态追踪
         self._upload_status: Dict[str, Dict] = {}  # batch_id -> {panorama: bool, ptz: Dict}
         self._upload_callback: Optional[Callable] = None
-        
+
         # 统计信息
         self._stats = {
             'total_batches': 0,
@@ -174,6 +180,173 @@ class PairedImageSaver:
             self.base_dir.mkdir(parents=True, exist_ok=True)
         except Exception as e:
             logger.error(f"[配对保存] 创建目录失败: {e}")
+
+    def _save_worker(self):
+        """后台工作线程:异步保存图片和上传OSS"""
+        logger.info("[配对保存] 后台保存线程已启动")
+        while True:
+            task = None
+            with self._save_queue_lock:
+                if self._save_queue:
+                    task = self._save_queue.pop(0)
+
+            if task is not None:
+                task_type = task.get('type')
+                try:
+                    if task_type == 'panorama':
+                        self._async_save_panorama(task)
+                    elif task_type == 'ptz':
+                        self._async_save_ptz(task)
+                    elif task_type == 'finalize':
+                        self._async_finalize_batch(task)
+                except Exception as e:
+                    logger.error(f"[配对保存] 后台任务执行失败: {e}")
+            else:
+                time.sleep(0.1)  # 无任务时短暂休眠
+
+    def _async_save_panorama(self, task: Dict):
+        """异步保存全景图并上传OSS"""
+        batch_id = task['batch_id']
+        batch_dir = task['batch_dir']
+        frame = task['frame']
+        persons = task['persons']
+
+        try:
+            # 保存原图
+            original_filename = f"00_panorama_original_n{len(persons)}.png"
+            original_filepath = batch_dir / original_filename
+            cv2.imwrite(str(original_filepath), frame)
+
+            # 保存标记图
+            marked_frame = frame.copy()
+            for i, person in enumerate(persons):
+                bbox = person.get('bbox', (0, 0, 0, 0))
+                x1, y1, x2, y2 = bbox
+                conf = person.get('confidence', 0.0)
+                cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
+                label = f"person_{i}({conf:.2f})"
+                (label_w, label_h), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
+                cv2.rectangle(marked_frame, (x1, y1 - label_h - 8), (x1 + label_w, y1), (0, 255, 0), -1)
+                cv2.putText(marked_frame, label, (x1, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
+
+            marked_filename = f"00_panorama_marked_n{len(persons)}.png"
+            marked_filepath = batch_dir / marked_filename
+            cv2.imwrite(str(marked_filepath), marked_frame)
+
+            logger.info(f"[配对保存] 全景图已保存: batch={batch_id}")
+
+            # 异步上传OSS
+            if self.enable_oss and self.oss_uploader:
+                if original_filepath.exists():
+                    self._upload_panorama_to_oss(batch_id, str(original_filepath), image_type='panorama_original')
+                if marked_filepath.exists():
+                    self._upload_panorama_to_oss(batch_id, str(marked_filepath), image_type='panorama')
+
+        except Exception as e:
+            logger.error(f"[配对保存] 异步保存全景图失败: {e}")
+
+    def _async_save_ptz(self, task: Dict):
+        """异步保存球机图并上传OSS"""
+        batch_id = task['batch_id']
+        batch_dir = task['batch_dir']
+        person_index = task['person_index']
+        ptz_frame = task['ptz_frame']
+        ptz_position = task['ptz_position']
+        ptz_frame_marked = task.get('ptz_frame_marked')
+        ptz_bbox = task.get('ptz_bbox')
+
+        try:
+            pan, tilt, zoom = ptz_position
+
+            # 保存原图(未标记)
+            original_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_original.png"
+            original_filepath = batch_dir / original_filename
+            if ptz_frame is not None:
+                cv2.imwrite(str(original_filepath), ptz_frame)
+
+            # 保存标记图
+            marked_frame = ptz_frame_marked if ptz_frame_marked is not None else None
+            if marked_frame is None and ptz_frame is not None:
+                marked_frame = ptz_frame.copy()
+                h, w = marked_frame.shape[:2]
+                info_text = f"PTZ: P={pan:.1f} T={tilt:.1f} Z={zoom}"
+                cv2.putText(marked_frame, info_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
+                person_text = f"person_{person_index}"
+                cv2.putText(marked_frame, person_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
+                if ptz_bbox is not None:
+                    x1, y1, x2, y2 = ptz_bbox
+                    cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
+                    bbox_text = f"PTZ_BBox: ({x1},{y1},{x2},{y2})"
+                    cv2.putText(marked_frame, bbox_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
+
+            marked_filename = f"01_ptz_person{person_index}_p{int(pan)}_t{int(tilt)}_z{int(zoom)}_marked.png"
+            marked_filepath = batch_dir / marked_filename
+            if marked_frame is not None:
+                cv2.imwrite(str(marked_filepath), marked_frame)
+
+            logger.info(f"[配对保存] 球机图已保存: batch={batch_id}, person={person_index}")
+
+            # 异步上传OSS
+            if self.enable_oss and self.oss_uploader:
+                if original_filepath.exists():
+                    self._upload_ptz_to_oss(batch_id, person_index, str(original_filepath), str(marked_filepath))
+
+        except Exception as e:
+            logger.error(f"[配对保存] 异步保存球机图失败: {e}")
+
+    def _async_finalize_batch(self, task: Dict):
+        """异步完成批次(生成batch_info.json)"""
+        batch = task['batch']
+        try:
+            batch.completed = True
+            batch_dir = self.base_dir / f"batch_{batch.batch_id}"
+
+            # 等待 OSS 上传完成(最多等待8秒,避免阻塞太久)
+            if self.enable_oss and batch.batch_id in self._upload_status:
+                wait_start = time.time()
+                max_wait = 8.0
+                while time.time() - wait_start < max_wait:
+                    status = self._upload_status[batch.batch_id]
+                    panorama_done = status.get('panorama_url') is not None or not batch.panorama_path
+                    ptz_status = status.get('ptz', {})
+                    ptz_done = all(
+                        ptz_status.get(idx) is not None
+                        for idx, person in enumerate(batch.persons)
+                        if person.ptz_image_saved
+                    )
+                    if panorama_done and ptz_done:
+                        break
+                    time.sleep(0.2)
+
+            # 构建并保存 batch_info.json
+            batch_info = self._build_batch_info_json(batch)
+            json_path = batch_dir / "batch_info.json"
+            with open(json_path, 'w', encoding='utf-8') as f:
+                json.dump(batch_info, f, ensure_ascii=False, indent=2)
+
+            txt_path = batch_dir / "batch_info.txt"
+            self._save_batch_info_txt(batch, txt_path)
+
+            # 标记上传完成
+            if batch.batch_id in self._upload_status:
+                self._upload_status[batch.batch_id]['completed'] = True
+
+            # 触发回调
+            if self._upload_callback:
+                try:
+                    self._upload_callback(batch_info)
+                except Exception as e:
+                    logger.error(f"[配对保存] 回调执行错误: {e}")
+
+            logger.info(f"[配对保存] 批次完成: {batch.batch_id}")
+
+        except Exception as e:
+            logger.error(f"[配对保存] 异步完成批次失败: {e}")
+
+    def _queue_save_task(self, task: Dict):
+        """添加保存任务到队列"""
+        with self._save_queue_lock:
+            self._save_queue.append(task)
     
     def _generate_batch_id(self) -> str:
         """生成批次ID"""
@@ -203,22 +376,34 @@ class PairedImageSaver:
         with self._batch_lock:
             current_time = time.time()
             
-            # 完成上一批次(如果有)
-            # 注意:每次检测都创建独立批次,不复用,确保 batch_info 与实际检测一致
+            # 完成上一批次(如果有)- 异步执行,不阻塞主线程
             if self._current_batch is not None:
-                self._finalize_batch(self._current_batch)
+                self._queue_save_task({
+                    'type': 'finalize',
+                    'batch': self._current_batch
+                })
             
             # 创建新批次
             batch_id = self._generate_batch_id()
             batch_dir = self._create_batch_dir(batch_id)
 
-            # 保存全景图片(原图和标记图)
+            # 异步保存全景图片(原图和标记图),不阻塞主线程
             panorama_original_path = None
             panorama_marked_path = None
             if panorama_frame is not None:
+                # 异步保存,路径在保存完成后会更新
                 panorama_original_path, panorama_marked_path = self._save_panorama_image(
                     batch_dir, batch_id, panorama_frame, persons
                 )
+                # 异步上传OSS(不阻塞主线程)
+                if panorama_original_path and panorama_marked_path:
+                    self._queue_save_task({
+                        'type': 'panorama',
+                        'batch_id': batch_id,
+                        'batch_dir': batch_dir,
+                        'frame': panorama_frame,
+                        'persons': persons
+                    })
 
             # 创建人员信息
             person_infos = []
@@ -267,20 +452,6 @@ class PairedImageSaver:
                 f"人员={len(persons)}, 目录={batch_dir}"
             )
 
-            # 上传全景图到 OSS(原图和标记图)
-            logger.info(f"[配对保存] 开始新批次: enable_oss={self.enable_oss}, uploader={self.oss_uploader}")
-            if self.enable_oss and self.oss_uploader:
-                # 上传原图
-                if panorama_original_path:
-                    logger.info(f"[配对保存] 准备上传全景原图到 OSS: {panorama_original_path}")
-                    self._upload_panorama_to_oss(batch_id, panorama_original_path, image_type='panorama_original')
-                # 上传标记图
-                if panorama_marked_path:
-                    logger.info(f"[配对保存] 准备上传全景标记图到 OSS: {panorama_marked_path}")
-                    self._upload_panorama_to_oss(batch_id, panorama_marked_path, image_type='panorama')
-            else:
-                logger.warning(f"[配对保存] OSS未启用或上传器不可用: enable_oss={self.enable_oss}, uploader={self.oss_uploader}")
-            
             return batch_id
     
     def _save_panorama_image(self, batch_dir: Path, batch_id: str,
@@ -440,9 +611,18 @@ class PairedImageSaver:
                     with self._stats_lock:
                         self._stats['total_ptz_images'] += 1
 
-                    # 上传原图和标记图到 OSS
+                    # 异步上传OSS(不阻塞主线程)
                     if self.enable_oss and self.oss_uploader:
-                        self._upload_ptz_to_oss(batch_id, person_index, str(original_filepath), str(marked_filepath))
+                        self._queue_save_task({
+                            'type': 'ptz',
+                            'batch_id': batch_id,
+                            'batch_dir': batch_dir,
+                            'person_index': person_index,
+                            'ptz_frame': ptz_frame,
+                            'ptz_position': ptz_position,
+                            'ptz_frame_marked': ptz_frame_marked,
+                            'ptz_bbox': ptz_bbox
+                        })
 
                 return str(original_filepath), str(marked_filepath)
 
@@ -823,12 +1003,18 @@ class PairedImageSaver:
         return None
     
     def close(self):
-        """关闭管理器,完成当前批次"""
+        """关闭管理器,完成当前批次(异步)"""
         with self._batch_lock:
             if self._current_batch is not None:
-                self._finalize_batch(self._current_batch)
+                # 异步完成最后批次
+                self._queue_save_task({
+                    'type': 'finalize',
+                    'batch': self._current_batch
+                })
+                # 短暂等待让后台线程处理
+                time.sleep(0.5)
                 self._current_batch = None
-        
+
         logger.info("[配对保存] 管理器已关闭")
 
 

+ 58 - 3
dual_camera_system/safety_main.py

@@ -12,6 +12,7 @@
 import os
 import sys
 import time
+import glob
 import argparse
 import logging
 import threading
@@ -41,15 +42,61 @@ from safety_coordinator import SafetyCoordinator, SimpleCamera
 
 
 # 配置日志
+def _cleanup_old_logs(log_file: str, retention_days: int):
+    """清理超过保留天数的日志文件"""
+    if not log_file:
+        return
+
+    log_dir = os.path.dirname(log_file) or '.'
+    log_basename = os.path.basename(log_file)
+
+    patterns = [
+        log_basename,
+        f"{log_basename}.*",
+        f"{os.path.splitext(log_basename)[0]}.*",
+    ]
+
+    now = time.time()
+    cutoff = now - (retention_days * 86400)
+
+    for pattern in patterns:
+        full_pattern = os.path.join(log_dir, pattern)
+        for log_path in glob.glob(full_pattern):
+            try:
+                if os.path.isfile(log_path):
+                    mtime = os.path.getmtime(log_path)
+                    if mtime < cutoff:
+                        os.remove(log_path)
+                        print(f"[日志清理] 已删除过期日志: {log_path}")
+            except Exception:
+                pass
+
+
+def _log_cleanup_worker(retention_days: int, interval_hours: int = 6):
+    """日志清理后台线程"""
+    log_file = LOG_CONFIG.get('file')
+    if not log_file:
+        return
+
+    while True:
+        _cleanup_old_logs(log_file, retention_days)
+        time.sleep(interval_hours * 3600)
+
+
 def setup_logging():
     """设置日志配置"""
     log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO)
     log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
     log_file = LOG_CONFIG.get('file')
-    
+    retention_days = LOG_CONFIG.get('retention_days', 7)
+
     handlers = [logging.StreamHandler()]
-    
+
     if log_file:
+        log_dir = os.path.dirname(log_file)
+        if log_dir:
+            os.makedirs(log_dir, exist_ok=True)
+
         from logging.handlers import RotatingFileHandler
         file_handler = RotatingFileHandler(
             log_file,
@@ -58,7 +105,15 @@ def setup_logging():
         )
         file_handler.setFormatter(logging.Formatter(log_format))
         handlers.append(file_handler)
-    
+
+        # 启动日志清理后台线程
+        cleanup_thread = threading.Thread(
+            target=_log_cleanup_worker,
+            args=(retention_days, 6),
+            daemon=True
+        )
+        cleanup_thread.start()
+
     logging.basicConfig(
         level=log_level,
         format=log_format,