simple_coordinator.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. """
  2. 简化版联动控制器
  3. 核心功能:全景检测到人 → 球机逐个定位抓拍
  4. """
  5. import os
  6. os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
  7. import time
  8. import threading
  9. import logging
  10. from typing import Optional, List, Tuple
  11. from dataclasses import dataclass
  12. from datetime import datetime
  13. from pathlib import Path
  14. import cv2
  15. import numpy as np
  16. logger = logging.getLogger(__name__)
  17. @dataclass
  18. class PersonTarget:
  19. """检测到的人员目标"""
  20. index: int # 序号(用于标记)
  21. position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
  22. bbox: Tuple[int, int, int, int] # 边界框 (x, y, w, h)
  23. area: int # 面积
  24. confidence: float # 置信度
  25. class SimpleCoordinator:
  26. """
  27. 简化版联动控制器
  28. 工作流程:
  29. 1. 全景摄像头实时检测人体
  30. 2. 检测到人后,按面积从大到小排序
  31. 3. 球机逐个定位到每个人,抓拍保存
  32. 4. 所有人抓拍完成后,回到初始位置
  33. """
  34. def __init__(self, panorama_camera, ptz_camera, detector, calibrator=None):
  35. """
  36. 初始化
  37. Args:
  38. panorama_camera: 全景摄像头实例
  39. ptz_camera: 球机实例
  40. detector: 人体检测器
  41. calibrator: 校准器(可选,用于坐标转换)
  42. """
  43. self.panorama = panorama_camera
  44. self.ptz = ptz_camera
  45. self.detector = detector
  46. self.calibrator = calibrator
  47. # 配置
  48. self.detection_interval = 1.0 # 检测间隔(秒)
  49. self.ptz_move_wait = 0.5 # PTZ移动后等待时间(秒)
  50. self.min_person_area = 2000 # 最小人体面积(像素²)
  51. self.confidence_threshold = 0.7 # 置信度阈值
  52. self.default_zoom = 8 # 默认变倍
  53. self.save_dir = Path('/home/admin/dsh/captures') # 抓拍保存目录
  54. self.initial_position = (90, 0, 1) # 初始位置 (pan, tilt, zoom)
  55. # 状态
  56. self.running = False
  57. self.capturing = False # 是否正在抓拍
  58. self.worker_thread = None
  59. self.lock = threading.Lock()
  60. # 统计
  61. self.stats = {
  62. 'frames_processed': 0,
  63. 'persons_detected': 0,
  64. 'captures_taken': 0,
  65. }
  66. # 创建保存目录
  67. self.save_dir.mkdir(parents=True, exist_ok=True)
  68. def start(self) -> bool:
  69. """启动联动系统"""
  70. if self.running:
  71. return True
  72. # 连接全景摄像头
  73. if not self.panorama.connect():
  74. logger.error("连接全景摄像头失败")
  75. return False
  76. # 连接球机
  77. if not self.ptz.connect():
  78. logger.error("连接球机失败")
  79. self.panorama.disconnect()
  80. return False
  81. # 启动全景视频流
  82. if not self.panorama.start_stream_rtsp():
  83. logger.error("启动全景视频流失败")
  84. self.panorama.disconnect()
  85. self.ptz.disconnect()
  86. return False
  87. # 启动球机视频流(用于抓拍)
  88. if not self.ptz.start_stream_rtsp():
  89. logger.warning("球机视频流启动失败,抓拍可能失败")
  90. # 移动球机到初始位置
  91. self.ptz.goto_exact_position(*self.initial_position)
  92. # 启动工作线程
  93. self.running = True
  94. self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  95. self.worker_thread.start()
  96. logger.info("简化版联动系统已启动")
  97. return True
  98. def stop(self):
  99. """停止联动系统"""
  100. self.running = False
  101. if self.worker_thread:
  102. self.worker_thread.join(timeout=3)
  103. self.panorama.disconnect()
  104. self.ptz.disconnect()
  105. logger.info(f"联动系统已停止,统计: {self.stats}")
  106. def _worker(self):
  107. """工作线程"""
  108. last_detection_time = 0
  109. while self.running:
  110. try:
  111. current_time = time.time()
  112. # 获取当前帧
  113. frame = self.panorama.get_frame()
  114. if frame is None:
  115. time.sleep(0.01)
  116. continue
  117. self.stats['frames_processed'] += 1
  118. frame_size = (frame.shape[1], frame.shape[0])
  119. # 周期性检测
  120. if current_time - last_detection_time >= self.detection_interval:
  121. last_detection_time = current_time
  122. # 检测人体
  123. persons = self._detect_persons(frame)
  124. if persons:
  125. logger.info(f"[检测] 检测到 {len(persons)} 人")
  126. self.stats['persons_detected'] += len(persons)
  127. # 保存全景检测图(标记人员)
  128. self._save_panorama_detection(frame, persons)
  129. # 逐个抓拍
  130. self._capture_all_persons(persons, frame_size)
  131. time.sleep(0.01)
  132. except Exception as e:
  133. logger.error(f"工作线程错误: {e}")
  134. time.sleep(0.1)
  135. def _detect_persons(self, frame: np.ndarray) -> List[PersonTarget]:
  136. """检测人体"""
  137. if self.detector is None:
  138. return []
  139. # 使用检测器
  140. detections = self.detector.detect_persons(frame)
  141. if not detections:
  142. return []
  143. frame_h, frame_w = frame.shape[:2]
  144. persons = []
  145. for i, det in enumerate(detections):
  146. # 过滤低置信度
  147. if det.confidence < self.confidence_threshold:
  148. continue
  149. x, y, w, h = det.bbox
  150. area = w * h
  151. # 过滤小目标
  152. if area < self.min_person_area:
  153. continue
  154. # 计算位置比例
  155. center_x = x + w // 2
  156. center_y = y + h // 2
  157. x_ratio = center_x / frame_w
  158. y_ratio = center_y / frame_h
  159. persons.append(PersonTarget(
  160. index=i,
  161. position=(x_ratio, y_ratio),
  162. bbox=det.bbox,
  163. area=area,
  164. confidence=det.confidence
  165. ))
  166. # 按面积从大到小排序
  167. persons.sort(key=lambda p: p.area, reverse=True)
  168. # 重新分配序号(按排序后的顺序)
  169. for i, p in enumerate(persons):
  170. p.index = i
  171. return persons
  172. def _save_panorama_detection(self, frame: np.ndarray, persons: List[PersonTarget]):
  173. """保存全景检测图(标记人员)"""
  174. marked_frame = frame.copy()
  175. for p in persons:
  176. x, y, w, h = p.bbox
  177. # 绘制边界框(绿色)
  178. cv2.rectangle(marked_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
  179. # 绘制序号标签
  180. label = f"person_{p.index}"
  181. cv2.putText(
  182. marked_frame, label,
  183. (x, y - 10),
  184. cv2.FONT_HERSHEY_SIMPLEX, 0.8,
  185. (0, 255, 0), 2
  186. )
  187. # 保存
  188. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  189. filename = f"panorama_{timestamp}_n{len(persons)}.jpg"
  190. filepath = self.save_dir / filename
  191. cv2.imwrite(str(filepath), marked_frame)
  192. logger.info(f"[全景] 保存检测图: {filepath}")
  193. def _capture_all_persons(self, persons: List[PersonTarget], frame_size: Tuple[int, int]):
  194. """逐个抓拍所有人员"""
  195. with self.lock:
  196. if self.capturing:
  197. logger.info("正在抓拍中,跳过本次检测")
  198. return
  199. self.capturing = True
  200. try:
  201. logger.info(f"[抓拍] 开始逐个抓拍 {len(persons)} 人")
  202. # 创建本次抓拍的子目录
  203. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  204. batch_dir = self.save_dir / f"batch_{timestamp}"
  205. batch_dir.mkdir(parents=True, exist_ok=True)
  206. for i, person in enumerate(persons):
  207. if not self.running:
  208. break
  209. logger.info(f"[抓拍] 正在抓拍 person_{person.index} ({i+1}/{len(persons)})")
  210. # 计算PTZ角度
  211. pan, tilt, zoom = self._calculate_ptz_position(
  212. person.position[0], person.position[1]
  213. )
  214. # 移动球机
  215. logger.info(f"[PTZ] 移动到: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom}")
  216. self.ptz.goto_exact_position(pan, tilt, zoom)
  217. # 等待画面稳定
  218. time.sleep(self.ptz_move_wait)
  219. # 抓拍球机画面
  220. ptz_frame = self._get_stable_frame()
  221. if ptz_frame is not None:
  222. # 保存抓拍图
  223. filename = f"ptz_person_{person.index}.jpg"
  224. filepath = batch_dir / filename
  225. cv2.imwrite(str(filepath), ptz_frame)
  226. self.stats['captures_taken'] += 1
  227. logger.info(f"[抓拍] 保存球机图: {filepath}")
  228. else:
  229. logger.warning(f"[抓拍] 获取球机画面失败: person_{person.index}")
  230. # 所有人抓拍完成,回到初始位置
  231. logger.info(f"[抓拍] 完成 {len(persons)} 人抓拍,返回初始位置")
  232. self.ptz.goto_exact_position(*self.initial_position)
  233. finally:
  234. self.capturing = False
  235. def _calculate_ptz_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
  236. """计算PTZ位置"""
  237. # 如果有校准器,使用校准结果
  238. if self.calibrator and self.calibrator.is_calibrated():
  239. pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
  240. return (pan, tilt, self.default_zoom)
  241. # 否则使用估算
  242. # 假设全景视野对应 pan: 0-180°, tilt: -45°~45°
  243. pan = x_ratio * 180
  244. tilt = -45 + y_ratio * 90 # y_ratio=0.5 对应 tilt=0
  245. return (pan, tilt, self.default_zoom)
  246. def _get_stable_frame(self, max_attempts: int = 5) -> Optional[np.ndarray]:
  247. """获取稳定清晰的帧"""
  248. best_frame = None
  249. best_score = -1
  250. for _ in range(max_attempts):
  251. frame = self.ptz.get_frame()
  252. if frame is not None:
  253. # 评估清晰度
  254. gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
  255. score = cv2.Laplacian(gray, cv2.CV_64F).var()
  256. if score > best_score:
  257. best_score = score
  258. best_frame = frame.copy()
  259. # 清晰度足够高就直接返回
  260. if score > 100:
  261. return frame
  262. time.sleep(0.1)
  263. return best_frame
  264. def get_stats(self) -> dict:
  265. """获取统计信息"""
  266. return self.stats.copy()