|
|
@@ -1,345 +0,0 @@
|
|
|
-"""
|
|
|
-简化版联动控制器
|
|
|
-核心功能:全景检测到人 → 球机逐个定位抓拍
|
|
|
-"""
|
|
|
-
|
|
|
-import os
|
|
|
-os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
|
|
|
-
|
|
|
-import time
|
|
|
-import threading
|
|
|
-import logging
|
|
|
-from typing import Optional, List, Tuple
|
|
|
-from dataclasses import dataclass
|
|
|
-from datetime import datetime
|
|
|
-from pathlib import Path
|
|
|
-
|
|
|
-import cv2
|
|
|
-import numpy as np
|
|
|
-
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
-
|
|
|
-
|
|
|
-@dataclass
|
|
|
-class PersonTarget:
|
|
|
- """检测到的人员目标"""
|
|
|
- index: int # 序号(用于标记)
|
|
|
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
|
|
|
- bbox: Tuple[int, int, int, int] # 边界框 (x, y, w, h)
|
|
|
- area: int # 面积
|
|
|
- confidence: float # 置信度
|
|
|
-
|
|
|
-
|
|
|
-class SimpleCoordinator:
|
|
|
- """
|
|
|
- 简化版联动控制器
|
|
|
-
|
|
|
- 工作流程:
|
|
|
- 1. 全景摄像头实时检测人体
|
|
|
- 2. 检测到人后,按面积从大到小排序
|
|
|
- 3. 球机逐个定位到每个人,抓拍保存
|
|
|
- 4. 所有人抓拍完成后,回到初始位置
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, panorama_camera, ptz_camera, detector, calibrator=None):
|
|
|
- """
|
|
|
- 初始化
|
|
|
-
|
|
|
- Args:
|
|
|
- panorama_camera: 全景摄像头实例
|
|
|
- ptz_camera: 球机实例
|
|
|
- detector: 人体检测器
|
|
|
- calibrator: 校准器(可选,用于坐标转换)
|
|
|
- """
|
|
|
- self.panorama = panorama_camera
|
|
|
- self.ptz = ptz_camera
|
|
|
- self.detector = detector
|
|
|
- self.calibrator = calibrator
|
|
|
-
|
|
|
- # 配置
|
|
|
- self.detection_interval = 1.0 # 检测间隔(秒)
|
|
|
- self.ptz_move_wait = 0.5 # PTZ移动后等待时间(秒)
|
|
|
- self.min_person_area = 2000 # 最小人体面积(像素²)
|
|
|
- self.confidence_threshold = 0.7 # 置信度阈值
|
|
|
- self.default_zoom = 8 # 默认变倍
|
|
|
- self.save_dir = Path('/home/admin/dsh/captures') # 抓拍保存目录
|
|
|
- self.initial_position = (90, 0, 1) # 初始位置 (pan, tilt, zoom)
|
|
|
-
|
|
|
- # 状态
|
|
|
- self.running = False
|
|
|
- self.capturing = False # 是否正在抓拍
|
|
|
- self.worker_thread = None
|
|
|
- self.lock = threading.Lock()
|
|
|
-
|
|
|
- # 统计
|
|
|
- self.stats = {
|
|
|
- 'frames_processed': 0,
|
|
|
- 'persons_detected': 0,
|
|
|
- 'captures_taken': 0,
|
|
|
- }
|
|
|
-
|
|
|
- # 创建保存目录
|
|
|
- self.save_dir.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- def start(self) -> bool:
|
|
|
- """启动联动系统"""
|
|
|
- if self.running:
|
|
|
- return True
|
|
|
-
|
|
|
- # 连接全景摄像头
|
|
|
- if not self.panorama.connect():
|
|
|
- logger.error("连接全景摄像头失败")
|
|
|
- return False
|
|
|
-
|
|
|
- # 连接球机
|
|
|
- if not self.ptz.connect():
|
|
|
- logger.error("连接球机失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- return False
|
|
|
-
|
|
|
- # 启动全景视频流
|
|
|
- if not self.panorama.start_stream_rtsp():
|
|
|
- logger.error("启动全景视频流失败")
|
|
|
- self.panorama.disconnect()
|
|
|
- self.ptz.disconnect()
|
|
|
- return False
|
|
|
-
|
|
|
- # 启动球机视频流(用于抓拍)
|
|
|
- if not self.ptz.start_stream_rtsp():
|
|
|
- logger.warning("球机视频流启动失败,抓拍可能失败")
|
|
|
-
|
|
|
- # 移动球机到初始位置
|
|
|
- self.ptz.goto_exact_position(*self.initial_position)
|
|
|
-
|
|
|
- # 启动工作线程
|
|
|
- self.running = True
|
|
|
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
|
|
|
- self.worker_thread.start()
|
|
|
-
|
|
|
- logger.info("简化版联动系统已启动")
|
|
|
- return True
|
|
|
-
|
|
|
- def stop(self):
|
|
|
- """停止联动系统"""
|
|
|
- self.running = False
|
|
|
-
|
|
|
- if self.worker_thread:
|
|
|
- self.worker_thread.join(timeout=3)
|
|
|
-
|
|
|
- self.panorama.disconnect()
|
|
|
- self.ptz.disconnect()
|
|
|
-
|
|
|
- logger.info(f"联动系统已停止,统计: {self.stats}")
|
|
|
-
|
|
|
- def _worker(self):
|
|
|
- """工作线程"""
|
|
|
- last_detection_time = 0
|
|
|
-
|
|
|
- while self.running:
|
|
|
- try:
|
|
|
- current_time = time.time()
|
|
|
-
|
|
|
- # 获取当前帧
|
|
|
- frame = self.panorama.get_frame()
|
|
|
- if frame is None:
|
|
|
- time.sleep(0.01)
|
|
|
- continue
|
|
|
-
|
|
|
- self.stats['frames_processed'] += 1
|
|
|
- frame_size = (frame.shape[1], frame.shape[0])
|
|
|
-
|
|
|
- # 周期性检测
|
|
|
- if current_time - last_detection_time >= self.detection_interval:
|
|
|
- last_detection_time = current_time
|
|
|
-
|
|
|
- # 检测人体
|
|
|
- persons = self._detect_persons(frame)
|
|
|
-
|
|
|
- if persons:
|
|
|
- logger.info(f"[检测] 检测到 {len(persons)} 人")
|
|
|
- self.stats['persons_detected'] += len(persons)
|
|
|
-
|
|
|
- # 保存全景检测图(标记人员)
|
|
|
- self._save_panorama_detection(frame, persons)
|
|
|
-
|
|
|
- # 逐个抓拍
|
|
|
- self._capture_all_persons(persons, frame_size)
|
|
|
-
|
|
|
- time.sleep(0.01)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"工作线程错误: {e}")
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- def _detect_persons(self, frame: np.ndarray) -> List[PersonTarget]:
|
|
|
- """检测人体"""
|
|
|
- if self.detector is None:
|
|
|
- return []
|
|
|
-
|
|
|
- # 使用检测器
|
|
|
- detections = self.detector.detect_persons(frame)
|
|
|
-
|
|
|
- if not detections:
|
|
|
- return []
|
|
|
-
|
|
|
- frame_h, frame_w = frame.shape[:2]
|
|
|
- persons = []
|
|
|
-
|
|
|
- for i, det in enumerate(detections):
|
|
|
- # 过滤低置信度
|
|
|
- if det.confidence < self.confidence_threshold:
|
|
|
- continue
|
|
|
-
|
|
|
- x, y, w, h = det.bbox
|
|
|
- area = w * h
|
|
|
-
|
|
|
- # 过滤小目标
|
|
|
- if area < self.min_person_area:
|
|
|
- continue
|
|
|
-
|
|
|
- # 计算位置比例
|
|
|
- center_x = x + w // 2
|
|
|
- center_y = y + h // 2
|
|
|
- x_ratio = center_x / frame_w
|
|
|
- y_ratio = center_y / frame_h
|
|
|
-
|
|
|
- persons.append(PersonTarget(
|
|
|
- index=i,
|
|
|
- position=(x_ratio, y_ratio),
|
|
|
- bbox=det.bbox,
|
|
|
- area=area,
|
|
|
- confidence=det.confidence
|
|
|
- ))
|
|
|
-
|
|
|
- # 按面积从大到小排序
|
|
|
- persons.sort(key=lambda p: p.area, reverse=True)
|
|
|
-
|
|
|
- # 重新分配序号(按排序后的顺序)
|
|
|
- for i, p in enumerate(persons):
|
|
|
- p.index = i
|
|
|
-
|
|
|
- return persons
|
|
|
-
|
|
|
- def _save_panorama_detection(self, frame: np.ndarray, persons: List[PersonTarget]):
|
|
|
- """保存全景检测图(标记人员)"""
|
|
|
- marked_frame = frame.copy()
|
|
|
-
|
|
|
- for p in persons:
|
|
|
- x, y, w, h = p.bbox
|
|
|
-
|
|
|
- # 绘制边界框(绿色)
|
|
|
- cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
|
|
|
-
|
|
|
- # 绘制序号标签
|
|
|
- label = f"person_{p.index}"
|
|
|
- cv2.putText(
|
|
|
- marked_frame, label,
|
|
|
- (x, y - 10),
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
|
|
|
- (0, 255, 0), 2
|
|
|
- )
|
|
|
-
|
|
|
- # 保存
|
|
|
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
- filename = f"panorama_{timestamp}_n{len(persons)}.jpg"
|
|
|
- filepath = self.save_dir / filename
|
|
|
- cv2.imwrite(str(filepath), marked_frame)
|
|
|
-
|
|
|
- logger.info(f"[全景] 保存检测图: {filepath}")
|
|
|
-
|
|
|
- def _capture_all_persons(self, persons: List[PersonTarget], frame_size: Tuple[int, int]):
|
|
|
- """逐个抓拍所有人员"""
|
|
|
- with self.lock:
|
|
|
- if self.capturing:
|
|
|
- logger.info("正在抓拍中,跳过本次检测")
|
|
|
- return
|
|
|
- self.capturing = True
|
|
|
-
|
|
|
- try:
|
|
|
- logger.info(f"[抓拍] 开始逐个抓拍 {len(persons)} 人")
|
|
|
-
|
|
|
- # 创建本次抓拍的子目录
|
|
|
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
- batch_dir = self.save_dir / f"batch_{timestamp}"
|
|
|
- batch_dir.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- for i, person in enumerate(persons):
|
|
|
- if not self.running:
|
|
|
- break
|
|
|
-
|
|
|
- logger.info(f"[抓拍] 正在抓拍 person_{person.index} ({i+1}/{len(persons)})")
|
|
|
-
|
|
|
- # 计算PTZ角度
|
|
|
- pan, tilt, zoom = self._calculate_ptz_position(
|
|
|
- person.position[0], person.position[1]
|
|
|
- )
|
|
|
-
|
|
|
- # 移动球机
|
|
|
- logger.info(f"[PTZ] 移动到: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
|
|
|
- self.ptz.goto_exact_position(pan, tilt, zoom)
|
|
|
-
|
|
|
- # 等待画面稳定
|
|
|
- time.sleep(self.ptz_move_wait)
|
|
|
-
|
|
|
- # 抓拍球机画面
|
|
|
- ptz_frame = self._get_stable_frame()
|
|
|
-
|
|
|
- if ptz_frame is not None:
|
|
|
- # 保存抓拍图
|
|
|
- filename = f"ptz_person_{person.index}.jpg"
|
|
|
- filepath = batch_dir / filename
|
|
|
- cv2.imwrite(str(filepath), ptz_frame)
|
|
|
-
|
|
|
- self.stats['captures_taken'] += 1
|
|
|
- logger.info(f"[抓拍] 保存球机图: {filepath}")
|
|
|
- else:
|
|
|
- logger.warning(f"[抓拍] 获取球机画面失败: person_{person.index}")
|
|
|
-
|
|
|
- # 所有人抓拍完成,回到初始位置
|
|
|
- logger.info(f"[抓拍] 完成 {len(persons)} 人抓拍,返回初始位置")
|
|
|
- self.ptz.goto_exact_position(*self.initial_position)
|
|
|
-
|
|
|
- finally:
|
|
|
- self.capturing = False
|
|
|
-
|
|
|
- def _calculate_ptz_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
|
|
|
- """计算PTZ位置"""
|
|
|
- # 如果有校准器,使用校准结果
|
|
|
- if self.calibrator and self.calibrator.is_calibrated():
|
|
|
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
|
|
|
- return (pan, tilt, self.default_zoom)
|
|
|
-
|
|
|
- # 否则使用估算
|
|
|
- # 假设全景视野对应 pan: 0-180°, tilt: -45°~45°
|
|
|
- pan = x_ratio * 180
|
|
|
- tilt = -45 + y_ratio * 90 # y_ratio=0.5 对应 tilt=0
|
|
|
-
|
|
|
- return (pan, tilt, self.default_zoom)
|
|
|
-
|
|
|
- def _get_stable_frame(self, max_attempts: int = 5) -> Optional[np.ndarray]:
|
|
|
- """获取稳定清晰的帧"""
|
|
|
- best_frame = None
|
|
|
- best_score = -1
|
|
|
-
|
|
|
- for _ in range(max_attempts):
|
|
|
- frame = self.ptz.get_frame()
|
|
|
- if frame is not None:
|
|
|
- # 评估清晰度
|
|
|
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
- score = cv2.Laplacian(gray, cv2.CV_64F).var()
|
|
|
-
|
|
|
- if score > best_score:
|
|
|
- best_score = score
|
|
|
- best_frame = frame.copy()
|
|
|
-
|
|
|
- # 清晰度足够高就直接返回
|
|
|
- if score > 100:
|
|
|
- return frame
|
|
|
-
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- return best_frame
|
|
|
-
|
|
|
- def get_stats(self) -> dict:
|
|
|
- """获取统计信息"""
|
|
|
- return self.stats.copy()
|