| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- """
- 简化版联动控制器
- 核心功能:全景检测到人 → 球机逐个定位抓拍
- """
- import os
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- import time
- import threading
- import logging
- from typing import Optional, List, Tuple
- from dataclasses import dataclass
- from datetime import datetime
- from pathlib import Path
- import cv2
- import numpy as np
- logger = logging.getLogger(__name__)
- @dataclass
- class PersonTarget:
- """检测到的人员目标"""
- index: int # 序号(用于标记)
- position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
- bbox: Tuple[int, int, int, int] # 边界框 (x, y, w, h)
- area: int # 面积
- confidence: float # 置信度
- class SimpleCoordinator:
- """
- 简化版联动控制器
-
- 工作流程:
- 1. 全景摄像头实时检测人体
- 2. 检测到人后,按面积从大到小排序
- 3. 球机逐个定位到每个人,抓拍保存
- 4. 所有人抓拍完成后,回到初始位置
- """
-
- def __init__(self, panorama_camera, ptz_camera, detector, calibrator=None):
- """
- 初始化
-
- Args:
- panorama_camera: 全景摄像头实例
- ptz_camera: 球机实例
- detector: 人体检测器
- calibrator: 校准器(可选,用于坐标转换)
- """
- self.panorama = panorama_camera
- self.ptz = ptz_camera
- self.detector = detector
- self.calibrator = calibrator
-
- # 配置
- self.detection_interval = 1.0 # 检测间隔(秒)
- self.ptz_move_wait = 0.5 # PTZ移动后等待时间(秒)
- self.min_person_area = 2000 # 最小人体面积(像素²)
- self.confidence_threshold = 0.7 # 置信度阈值
- self.default_zoom = 8 # 默认变倍
- self.save_dir = Path('/home/admin/dsh/captures') # 抓拍保存目录
- self.initial_position = (90, 0, 1) # 初始位置 (pan, tilt, zoom)
-
- # 状态
- self.running = False
- self.capturing = False # 是否正在抓拍
- self.worker_thread = None
- self.lock = threading.Lock()
-
- # 统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'captures_taken': 0,
- }
-
- # 创建保存目录
- self.save_dir.mkdir(parents=True, exist_ok=True)
-
- def start(self) -> bool:
- """启动联动系统"""
- if self.running:
- return True
-
- # 连接全景摄像头
- if not self.panorama.connect():
- logger.error("连接全景摄像头失败")
- return False
-
- # 连接球机
- if not self.ptz.connect():
- logger.error("连接球机失败")
- self.panorama.disconnect()
- return False
-
- # 启动全景视频流
- if not self.panorama.start_stream_rtsp():
- logger.error("启动全景视频流失败")
- self.panorama.disconnect()
- self.ptz.disconnect()
- return False
-
- # 启动球机视频流(用于抓拍)
- if not self.ptz.start_stream_rtsp():
- logger.warning("球机视频流启动失败,抓拍可能失败")
-
- # 移动球机到初始位置
- self.ptz.goto_exact_position(*self.initial_position)
-
- # 启动工作线程
- self.running = True
- self.worker_thread = threading.Thread(target=self._worker, daemon=True)
- self.worker_thread.start()
-
- logger.info("简化版联动系统已启动")
- return True
-
- def stop(self):
- """停止联动系统"""
- self.running = False
-
- if self.worker_thread:
- self.worker_thread.join(timeout=3)
-
- self.panorama.disconnect()
- self.ptz.disconnect()
-
- logger.info(f"联动系统已停止,统计: {self.stats}")
-
- def _worker(self):
- """工作线程"""
- last_detection_time = 0
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取当前帧
- frame = self.panorama.get_frame()
- if frame is None:
- time.sleep(0.01)
- continue
-
- self.stats['frames_processed'] += 1
- frame_size = (frame.shape[1], frame.shape[0])
-
- # 周期性检测
- if current_time - last_detection_time >= self.detection_interval:
- last_detection_time = current_time
-
- # 检测人体
- persons = self._detect_persons(frame)
-
- if persons:
- logger.info(f"[检测] 检测到 {len(persons)} 人")
- self.stats['persons_detected'] += len(persons)
-
- # 保存全景检测图(标记人员)
- self._save_panorama_detection(frame, persons)
-
- # 逐个抓拍
- self._capture_all_persons(persons, frame_size)
-
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"工作线程错误: {e}")
- time.sleep(0.1)
-
- def _detect_persons(self, frame: np.ndarray) -> List[PersonTarget]:
- """检测人体"""
- if self.detector is None:
- return []
-
- # 使用检测器
- detections = self.detector.detect_persons(frame)
-
- if not detections:
- return []
-
- frame_h, frame_w = frame.shape[:2]
- persons = []
-
- for i, det in enumerate(detections):
- # 过滤低置信度
- if det.confidence < self.confidence_threshold:
- continue
-
- x, y, w, h = det.bbox
- area = w * h
-
- # 过滤小目标
- if area < self.min_person_area:
- continue
-
- # 计算位置比例
- center_x = x + w // 2
- center_y = y + h // 2
- x_ratio = center_x / frame_w
- y_ratio = center_y / frame_h
-
- persons.append(PersonTarget(
- index=i,
- position=(x_ratio, y_ratio),
- bbox=det.bbox,
- area=area,
- confidence=det.confidence
- ))
-
- # 按面积从大到小排序
- persons.sort(key=lambda p: p.area, reverse=True)
-
- # 重新分配序号(按排序后的顺序)
- for i, p in enumerate(persons):
- p.index = i
-
- return persons
-
- def _save_panorama_detection(self, frame: np.ndarray, persons: List[PersonTarget]):
- """保存全景检测图(标记人员)"""
- marked_frame = frame.copy()
-
- for p in persons:
- x, y, w, h = p.bbox
-
- # 绘制边界框(绿色)
- cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
-
- # 绘制序号标签
- label = f"person_{p.index}"
- cv2.putText(
- marked_frame, label,
- (x, y - 10),
- cv2.FONT_HERSHEY_SIMPLEX, 0.8,
- (0, 255, 0), 2
- )
-
- # 保存
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"panorama_{timestamp}_n{len(persons)}.jpg"
- filepath = self.save_dir / filename
- cv2.imwrite(str(filepath), marked_frame)
-
- logger.info(f"[全景] 保存检测图: {filepath}")
-
- def _capture_all_persons(self, persons: List[PersonTarget], frame_size: Tuple[int, int]):
- """逐个抓拍所有人员"""
- with self.lock:
- if self.capturing:
- logger.info("正在抓拍中,跳过本次检测")
- return
- self.capturing = True
-
- try:
- logger.info(f"[抓拍] 开始逐个抓拍 {len(persons)} 人")
-
- # 创建本次抓拍的子目录
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- batch_dir = self.save_dir / f"batch_{timestamp}"
- batch_dir.mkdir(parents=True, exist_ok=True)
-
- for i, person in enumerate(persons):
- if not self.running:
- break
-
- logger.info(f"[抓拍] 正在抓拍 person_{person.index} ({i+1}/{len(persons)})")
-
- # 计算PTZ角度
- pan, tilt, zoom = self._calculate_ptz_position(
- person.position[0], person.position[1]
- )
-
- # 移动球机
- logger.info(f"[PTZ] 移动到: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
- self.ptz.goto_exact_position(pan, tilt, zoom)
-
- # 等待画面稳定
- time.sleep(self.ptz_move_wait)
-
- # 抓拍球机画面
- ptz_frame = self._get_stable_frame()
-
- if ptz_frame is not None:
- # 保存抓拍图
- filename = f"ptz_person_{person.index}.jpg"
- filepath = batch_dir / filename
- cv2.imwrite(str(filepath), ptz_frame)
-
- self.stats['captures_taken'] += 1
- logger.info(f"[抓拍] 保存球机图: {filepath}")
- else:
- logger.warning(f"[抓拍] 获取球机画面失败: person_{person.index}")
-
- # 所有人抓拍完成,回到初始位置
- logger.info(f"[抓拍] 完成 {len(persons)} 人抓拍,返回初始位置")
- self.ptz.goto_exact_position(*self.initial_position)
-
- finally:
- self.capturing = False
-
- def _calculate_ptz_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
- """计算PTZ位置"""
- # 如果有校准器,使用校准结果
- if self.calibrator and self.calibrator.is_calibrated():
- pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
- return (pan, tilt, self.default_zoom)
-
- # 否则使用估算
- # 假设全景视野对应 pan: 0-180°, tilt: -45°~45°
- pan = x_ratio * 180
- tilt = -45 + y_ratio * 90 # y_ratio=0.5 对应 tilt=0
-
- return (pan, tilt, self.default_zoom)
-
- def _get_stable_frame(self, max_attempts: int = 5) -> Optional[np.ndarray]:
- """获取稳定清晰的帧"""
- best_frame = None
- best_score = -1
-
- for _ in range(max_attempts):
- frame = self.ptz.get_frame()
- if frame is not None:
- # 评估清晰度
- gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
- score = cv2.Laplacian(gray, cv2.CV_64F).var()
-
- if score > best_score:
- best_score = score
- best_frame = frame.copy()
-
- # 清晰度足够高就直接返回
- if score > 100:
- return frame
-
- time.sleep(0.1)
-
- return best_frame
-
- def get_stats(self) -> dict:
- """获取统计信息"""
- return self.stats.copy()
|