Преглед изворни кода

feat(system): 实现简化版双摄像头联动抓拍系统

- 新增SimpleCoordinator类,实现全景摄像头检测人体后球机逐个定位抓拍功能
- 实现人体目标检测、排序及标记检测图保存
- 添加球机抓拍控制和抓拍图像保存机制
- 设计PTZ位置计算方式,支持可选校准器转换
- 实现稳定帧获取算法以保障抓拍质量
- 编写SimpleSystem类封装系统初始化、启动及停止流程
- 使用命令行参数支持摄像头IP、认证、检测模型及参数配置
- 添加日志记录,支持运行状态和统计信息输出
- 支持信号处理优雅关闭系统
wenhongquan пре 2 дана
родитељ
комит
0224a721b6
2 измењених фајлова са 599 додато и 0 уклоњено
  1. 345 0
      dual_camera_system/simple_coordinator.py
  2. 254 0
      dual_camera_system/simple_main.py

+ 345 - 0
dual_camera_system/simple_coordinator.py

@@ -0,0 +1,345 @@
+"""
+简化版联动控制器
+核心功能:全景检测到人 → 球机逐个定位抓拍
+"""
+
+import os
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
+
+import time
+import threading
+import logging
+from typing import Optional, List, Tuple
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+
+import cv2
+import numpy as np
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class PersonTarget:
+    """检测到的人员目标"""
+    index: int                    # 序号(用于标记)
+    position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
+    bbox: Tuple[int, int, int, int]  # 边界框 (x, y, w, h)
+    area: int                     # 面积
+    confidence: float             # 置信度
+
+
+class SimpleCoordinator:
+    """
+    简化版联动控制器
+    
+    工作流程:
+    1. 全景摄像头实时检测人体
+    2. 检测到人后,按面积从大到小排序
+    3. 球机逐个定位到每个人,抓拍保存
+    4. 所有人抓拍完成后,回到初始位置
+    """
+    
+    def __init__(self, panorama_camera, ptz_camera, detector, calibrator=None):
+        """
+        初始化
+        
+        Args:
+            panorama_camera: 全景摄像头实例
+            ptz_camera: 球机实例
+            detector: 人体检测器
+            calibrator: 校准器(可选,用于坐标转换)
+        """
+        self.panorama = panorama_camera
+        self.ptz = ptz_camera
+        self.detector = detector
+        self.calibrator = calibrator
+        
+        # 配置
+        self.detection_interval = 1.0      # 检测间隔(秒)
+        self.ptz_move_wait = 0.5           # PTZ移动后等待时间(秒)
+        self.min_person_area = 2000        # 最小人体面积(像素²)
+        self.confidence_threshold = 0.7    # 置信度阈值
+        self.default_zoom = 8              # 默认变倍
+        self.save_dir = Path('/home/admin/dsh/captures')  # 抓拍保存目录
+        self.initial_position = (90, 0, 1) # 初始位置 (pan, tilt, zoom)
+        
+        # 状态
+        self.running = False
+        self.capturing = False  # 是否正在抓拍
+        self.worker_thread = None
+        self.lock = threading.Lock()
+        
+        # 统计
+        self.stats = {
+            'frames_processed': 0,
+            'persons_detected': 0,
+            'captures_taken': 0,
+        }
+        
+        # 创建保存目录
+        self.save_dir.mkdir(parents=True, exist_ok=True)
+    
+    def start(self) -> bool:
+        """启动联动系统"""
+        if self.running:
+            return True
+        
+        # 连接全景摄像头
+        if not self.panorama.connect():
+            logger.error("连接全景摄像头失败")
+            return False
+        
+        # 连接球机
+        if not self.ptz.connect():
+            logger.error("连接球机失败")
+            self.panorama.disconnect()
+            return False
+        
+        # 启动全景视频流
+        if not self.panorama.start_stream_rtsp():
+            logger.error("启动全景视频流失败")
+            self.panorama.disconnect()
+            self.ptz.disconnect()
+            return False
+        
+        # 启动球机视频流(用于抓拍)
+        if not self.ptz.start_stream_rtsp():
+            logger.warning("球机视频流启动失败,抓拍可能失败")
+        
+        # 移动球机到初始位置
+        self.ptz.goto_exact_position(*self.initial_position)
+        
+        # 启动工作线程
+        self.running = True
+        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
+        self.worker_thread.start()
+        
+        logger.info("简化版联动系统已启动")
+        return True
+    
+    def stop(self):
+        """停止联动系统"""
+        self.running = False
+        
+        if self.worker_thread:
+            self.worker_thread.join(timeout=3)
+        
+        self.panorama.disconnect()
+        self.ptz.disconnect()
+        
+        logger.info(f"联动系统已停止,统计: {self.stats}")
+    
+    def _worker(self):
+        """工作线程"""
+        last_detection_time = 0
+        
+        while self.running:
+            try:
+                current_time = time.time()
+                
+                # 获取当前帧
+                frame = self.panorama.get_frame()
+                if frame is None:
+                    time.sleep(0.01)
+                    continue
+                
+                self.stats['frames_processed'] += 1
+                frame_size = (frame.shape[1], frame.shape[0])
+                
+                # 周期性检测
+                if current_time - last_detection_time >= self.detection_interval:
+                    last_detection_time = current_time
+                    
+                    # 检测人体
+                    persons = self._detect_persons(frame)
+                    
+                    if persons:
+                        logger.info(f"[检测] 检测到 {len(persons)} 人")
+                        self.stats['persons_detected'] += len(persons)
+                        
+                        # 保存全景检测图(标记人员)
+                        self._save_panorama_detection(frame, persons)
+                        
+                        # 逐个抓拍
+                        self._capture_all_persons(persons, frame_size)
+                
+                time.sleep(0.01)
+                
+            except Exception as e:
+                logger.error(f"工作线程错误: {e}")
+                time.sleep(0.1)
+    
+    def _detect_persons(self, frame: np.ndarray) -> List[PersonTarget]:
+        """检测人体"""
+        if self.detector is None:
+            return []
+        
+        # 使用检测器
+        detections = self.detector.detect_persons(frame)
+        
+        if not detections:
+            return []
+        
+        frame_h, frame_w = frame.shape[:2]
+        persons = []
+        
+        for i, det in enumerate(detections):
+            # 过滤低置信度
+            if det.confidence < self.confidence_threshold:
+                continue
+            
+            x, y, w, h = det.bbox
+            area = w * h
+            
+            # 过滤小目标
+            if area < self.min_person_area:
+                continue
+            
+            # 计算位置比例
+            center_x = x + w // 2
+            center_y = y + h // 2
+            x_ratio = center_x / frame_w
+            y_ratio = center_y / frame_h
+            
+            persons.append(PersonTarget(
+                index=i,
+                position=(x_ratio, y_ratio),
+                bbox=det.bbox,
+                area=area,
+                confidence=det.confidence
+            ))
+        
+        # 按面积从大到小排序
+        persons.sort(key=lambda p: p.area, reverse=True)
+        
+        # 重新分配序号(按排序后的顺序)
+        for i, p in enumerate(persons):
+            p.index = i
+        
+        return persons
+    
+    def _save_panorama_detection(self, frame: np.ndarray, persons: List[PersonTarget]):
+        """保存全景检测图(标记人员)"""
+        marked_frame = frame.copy()
+        
+        for p in persons:
+            x, y, w, h = p.bbox
+            
+            # 绘制边界框(绿色)
+            cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+            
+            # 绘制序号标签
+            label = f"person_{p.index}"
+            cv2.putText(
+                marked_frame, label,
+                (x, y - 10),
+                cv2.FONT_HERSHEY_SIMPLEX, 0.8,
+                (0, 255, 0), 2
+            )
+        
+        # 保存
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"panorama_{timestamp}_n{len(persons)}.jpg"
+        filepath = self.save_dir / filename
+        cv2.imwrite(str(filepath), marked_frame)
+        
+        logger.info(f"[全景] 保存检测图: {filepath}")
+    
+    def _capture_all_persons(self, persons: List[PersonTarget], frame_size: Tuple[int, int]):
+        """逐个抓拍所有人员"""
+        with self.lock:
+            if self.capturing:
+                logger.info("正在抓拍中,跳过本次检测")
+                return
+            self.capturing = True
+        
+        try:
+            logger.info(f"[抓拍] 开始逐个抓拍 {len(persons)} 人")
+            
+            # 创建本次抓拍的子目录
+            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+            batch_dir = self.save_dir / f"batch_{timestamp}"
+            batch_dir.mkdir(parents=True, exist_ok=True)
+            
+            for i, person in enumerate(persons):
+                if not self.running:
+                    break
+                
+                logger.info(f"[抓拍] 正在抓拍 person_{person.index} ({i+1}/{len(persons)})")
+                
+                # 计算PTZ角度
+                pan, tilt, zoom = self._calculate_ptz_position(
+                    person.position[0], person.position[1]
+                )
+                
+                # 移动球机
+                logger.info(f"[PTZ] 移动到: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
+                self.ptz.goto_exact_position(pan, tilt, zoom)
+                
+                # 等待画面稳定
+                time.sleep(self.ptz_move_wait)
+                
+                # 抓拍球机画面
+                ptz_frame = self._get_stable_frame()
+                
+                if ptz_frame is not None:
+                    # 保存抓拍图
+                    filename = f"ptz_person_{person.index}.jpg"
+                    filepath = batch_dir / filename
+                    cv2.imwrite(str(filepath), ptz_frame)
+                    
+                    self.stats['captures_taken'] += 1
+                    logger.info(f"[抓拍] 保存球机图: {filepath}")
+                else:
+                    logger.warning(f"[抓拍] 获取球机画面失败: person_{person.index}")
+            
+            # 所有人抓拍完成,回到初始位置
+            logger.info(f"[抓拍] 完成 {len(persons)} 人抓拍,返回初始位置")
+            self.ptz.goto_exact_position(*self.initial_position)
+            
+        finally:
+            self.capturing = False
+    
+    def _calculate_ptz_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
+        """计算PTZ位置"""
+        # 如果有校准器,使用校准结果
+        if self.calibrator and self.calibrator.is_calibrated():
+            pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
+            return (pan, tilt, self.default_zoom)
+        
+        # 否则使用估算
+        # 假设全景视野对应 pan: 0-180°, tilt: -45°~45°
+        pan = x_ratio * 180
+        tilt = -45 + y_ratio * 90  # y_ratio=0.5 对应 tilt=0
+        
+        return (pan, tilt, self.default_zoom)
+    
+    def _get_stable_frame(self, max_attempts: int = 5) -> Optional[np.ndarray]:
+        """获取稳定清晰的帧"""
+        best_frame = None
+        best_score = -1
+        
+        for _ in range(max_attempts):
+            frame = self.ptz.get_frame()
+            if frame is not None:
+                # 评估清晰度
+                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+                score = cv2.Laplacian(gray, cv2.CV_64F).var()
+                
+                if score > best_score:
+                    best_score = score
+                    best_frame = frame.copy()
+                
+                # 清晰度足够高就直接返回
+                if score > 100:
+                    return frame
+            
+            time.sleep(0.1)
+        
+        return best_frame
+    
+    def get_stats(self) -> dict:
+        """获取统计信息"""
+        return self.stats.copy()

+ 254 - 0
dual_camera_system/simple_main.py

@@ -0,0 +1,254 @@
+"""
+简化版双摄像头联动抓拍系统
+全景检测到人 → 球机逐个定位抓拍
+
+使用方法:
+    python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
+"""
+
+import os
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
+
+import sys
+import argparse
+import logging
+import time
+import signal
+
+# 添加项目路径
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from config import (
+    PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH,
+    DETECTION_CONFIG, PTZ_CONFIG, LOG_CONFIG
+)
+from dahua_sdk import DahuaSDK
+from panorama_camera import PanoramaCamera, ObjectDetector
+from ptz_camera import PTZCamera
+from simple_coordinator import SimpleCoordinator
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+
+class SimpleSystem:
+    """简化版双摄像头联动系统"""
+    
+    def __init__(self, config_override: dict = None):
+        self.config = config_override or {}
+        self.sdk = None
+        self.panorama_camera = None
+        self.ptz_camera = None
+        self.detector = None
+        self.coordinator = None
+        self.running = False
+    
+    def initialize(self) -> bool:
+        """初始化系统"""
+        logger.info("=" * 50)
+        logger.info("初始化简化版联动系统")
+        logger.info("=" * 50)
+        
+        # 1. 初始化检测器(必须先于SDK加载)
+        try:
+            self.detector = ObjectDetector(
+                model_path=self.config.get('model_path', DETECTION_CONFIG.get('model_path')),
+                use_gpu=self.config.get('use_gpu', True),
+                model_size=self.config.get('model_size', 'n'),
+                model_type=self.config.get('model_type', 'auto')
+            )
+            logger.info("✓ 检测器初始化成功")
+        except Exception as e:
+            logger.error(f"✗ 检测器初始化失败: {e}")
+            return False
+        
+        # 2. 初始化SDK
+        sdk_path = os.path.join(
+            self.config.get('sdk_path', SDK_PATH['lib_path']),
+            self.config.get('netsdk', SDK_PATH['netsdk'])
+        )
+        try:
+            self.sdk = DahuaSDK(sdk_path)
+            if not self.sdk.init():
+                logger.error("✗ SDK初始化失败")
+                return False
+            logger.info("✓ SDK初始化成功")
+        except Exception as e:
+            logger.error(f"✗ SDK加载失败: {e}")
+            return False
+        
+        # 3. 初始化摄像头
+        panorama_config = self.config.get('panorama_camera', PANORAMA_CAMERA)
+        self.panorama_camera = PanoramaCamera(self.sdk, panorama_config)
+        
+        ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
+        self.ptz_camera = PTZCamera(self.sdk, ptz_config)
+        
+        # 4. 初始化联动控制器
+        self.coordinator = SimpleCoordinator(
+            self.panorama_camera,
+            self.ptz_camera,
+            self.detector
+        )
+        
+        logger.info("=" * 50)
+        logger.info("系统初始化完成")
+        logger.info("=" * 50)
+        
+        return True
+    
+    def start(self) -> bool:
+        """启动系统"""
+        if self.running:
+            return True
+        
+        if not self.coordinator.start():
+            logger.error("启动联动系统失败")
+            return False
+        
+        self.running = True
+        logger.info("系统已启动")
+        return True
+    
+    def stop(self):
+        """停止系统"""
+        if not self.running:
+            return
+        
+        self.running = False
+        self.coordinator.stop()
+        
+        if self.sdk:
+            self.sdk.cleanup()
+        
+        logger.info("系统已停止")
+    
+    def get_stats(self):
+        """获取统计信息"""
+        if self.coordinator:
+            return self.coordinator.get_stats()
+        return {}
+
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(
+        description='简化版双摄像头联动抓拍系统',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例:
+  python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101
+  python simple_main.py --panorama-ip 192.168.1.100 --ptz-ip 192.168.1.101 --model-size m
+        """
+    )
+    
+    parser.add_argument('--panorama-ip', type=str, required=True,
+                        help='全景摄像头IP')
+    parser.add_argument('--ptz-ip', type=str, required=True,
+                        help='球机IP')
+    parser.add_argument('--username', type=str, default='admin',
+                        help='用户名 (默认: admin)')
+    parser.add_argument('--password', type=str, default='admin123',
+                        help='密码 (默认: admin123)')
+    parser.add_argument('--model', type=str,
+                        help='检测模型路径')
+    parser.add_argument('--model-size', type=str, default='n',
+                        choices=['n', 's', 'm', 'l', 'x'],
+                        help='YOLO11模型尺寸 (默认: n)')
+    parser.add_argument('--no-gpu', action='store_true',
+                        help='不使用GPU')
+    parser.add_argument('--detection-interval', type=float, default=1.0,
+                        help='检测间隔秒数 (默认: 1.0)')
+    parser.add_argument('--ptz-wait', type=float, default=0.5,
+                        help='PTZ移动后等待时间 (默认: 0.5)')
+    parser.add_argument('--min-area', type=int, default=2000,
+                        help='最小人体面积像素 (默认: 2000)')
+    parser.add_argument('--confidence', type=float, default=0.7,
+                        help='置信度阈值 (默认: 0.7)')
+    parser.add_argument('--zoom', type=int, default=8,
+                        help='默认变倍 (默认: 8)')
+    
+    args = parser.parse_args()
+    
+    # 构建配置
+    config = {
+        'panorama_camera': {
+            **PANORAMA_CAMERA,
+            'ip': args.panorama_ip,
+            'username': args.username,
+            'password': args.password,
+        },
+        'ptz_camera': {
+            **PTZ_CAMERA,
+            'ip': args.ptz_ip,
+            'username': args.username,
+            'password': args.password,
+        },
+        'model_path': args.model,
+        'model_size': args.model_size,
+        'use_gpu': not args.no_gpu,
+    }
+    
+    # 创建系统
+    system = SimpleSystem(config)
+    
+    # 设置信号处理
+    def signal_handler(sig, frame):
+        logger.info("\n接收到停止信号,正在退出...")
+        system.stop()
+        sys.exit(0)
+    
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+    
+    try:
+        # 初始化
+        if not system.initialize():
+            logger.error("系统初始化失败!")
+            return 1
+        
+        # 启动
+        if not system.start():
+            logger.error("系统启动失败!")
+            return 1
+        
+        # 打印运行状态
+        print("\n" + "=" * 50)
+        print("系统运行中,按 Ctrl+C 停止")
+        print("=" * 50)
+        print(f"全景摄像头: {args.panorama_ip}")
+        print(f"球机: {args.ptz_ip}")
+        print(f"检测间隔: {args.detection_interval}秒")
+        print(f"置信度阈值: {args.confidence}")
+        print(f"最小人体面积: {args.min_area}像素²")
+        print(f"默认变倍: {args.zoom}")
+        print("=" * 50 + "\n")
+        
+        # 运行循环
+        last_stats_time = time.time()
+        while system.running:
+            time.sleep(1)
+            
+            # 每30秒打印统计
+            if time.time() - last_stats_time >= 30:
+                stats = system.get_stats()
+                logger.info(f"[统计] 帧数={stats.get('frames_processed', 0)} "
+                           f"检测人数={stats.get('persons_detected', 0)} "
+                           f"抓拍次数={stats.get('captures_taken', 0)}")
+                last_stats_time = time.time()
+    
+    except KeyboardInterrupt:
+        pass
+    
+    finally:
+        system.stop()
+    
+    return 0
+
+
+if __name__ == '__main__':
+    sys.exit(main() or 0)