ptz_person_tracker.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. """
  2. 球机端人体检测与自动对焦模块
  3. 在球机移动到位后,检测人体并自动调整焦距,使人体居中并占据合适比例
  4. """
  5. import time
  6. import cv2
  7. import numpy as np
  8. import logging
  9. from typing import Optional, List, Tuple, Dict
  10. from dataclasses import dataclass
  11. from pathlib import Path
  12. from datetime import datetime
  13. logger = logging.getLogger(__name__)
  14. try:
  15. from ultralytics import YOLO
  16. HAS_YOLO = True
  17. except ImportError:
  18. HAS_YOLO = False
  19. try:
  20. from rknnlite.api import RKNNLite
  21. HAS_RKNN = True
  22. except ImportError:
  23. HAS_RKNN = False
  24. @dataclass
  25. class DetectedPerson:
  26. """检测到的人体"""
  27. bbox: Tuple[int, int, int, int] # (x1, y1, x2, y2)
  28. center: Tuple[float, float] # 中心点 (x, y)
  29. width: int
  30. height: int
  31. confidence: float
  32. @property
  33. def area(self) -> int:
  34. return self.width * self.height
  35. @property
  36. def size_ratio(self) -> float:
  37. """人体宽高比"""
  38. return self.width / self.height if self.height > 0 else 0
  39. @dataclass
  40. class ZoomAdjustResult:
  41. """变焦调整结果"""
  42. success: bool
  43. new_zoom: int
  44. pan_adjust: float # pan调整量(度)
  45. tilt_adjust: float # tilt调整量(度)
  46. person_detected: bool
  47. person_centered: bool
  48. message: str
  49. class PTZPersonDetector:
  50. """
  51. 球机端人体检测器
  52. 支持YOLO(PT)和RKNN两种模型格式
  53. """
  54. def __init__(self, model_path: str = None, model_type: str = 'auto',
  55. confidence_threshold: float = 0.5, use_gpu: bool = False,
  56. save_image: bool = True, image_dir: str = '/home/admin/dsh/ptz_detection_images'):
  57. """
  58. 初始化检测器
  59. Args:
  60. model_path: 模型路径
  61. model_type: 模型类型 ('yolo', 'rknn', 'auto')
  62. confidence_threshold: 置信度阈值
  63. use_gpu: 是否使用GPU
  64. save_image: 是否保存检测图片
  65. image_dir: 图片保存目录
  66. """
  67. self.model_path = model_path
  68. self.model_type = model_type
  69. self.confidence_threshold = confidence_threshold
  70. self.use_gpu = use_gpu
  71. self.model = None
  72. self.person_class_id = 0 # YOLO默认person类别ID
  73. # 图片保存配置
  74. self._save_image_enabled = save_image
  75. self._image_save_dir = Path(image_dir)
  76. self._last_save_time = 0
  77. self._save_interval = 0.5 # 最小保存间隔(秒)
  78. if self._save_image_enabled:
  79. self._ensure_save_dir()
  80. if model_path:
  81. self._load_model(model_path, model_type)
  82. def _ensure_save_dir(self):
  83. """确保保存目录存在"""
  84. try:
  85. self._image_save_dir.mkdir(parents=True, exist_ok=True)
  86. logger.info(f"[球机] 检测图片保存目录: {self._image_save_dir}")
  87. except Exception as e:
  88. logger.error(f"[球机] 创建检测图片目录失败: {e}")
  89. self._save_image_enabled = False
  90. def _load_model(self, model_path: str, model_type: str = 'auto'):
  91. """加载模型"""
  92. if model_type == 'auto':
  93. if model_path.endswith('.rknn'):
  94. model_type = 'rknn'
  95. elif model_path.endswith('.onnx'):
  96. model_type = 'onnx'
  97. else:
  98. model_type = 'yolo'
  99. self.model_type = model_type
  100. if model_type == 'rknn':
  101. self._load_rknn_model(model_path)
  102. elif model_type == 'yolo':
  103. self._load_yolo_model(model_path)
  104. else:
  105. print(f"[PTZPersonDetector] 不支持的模型类型: {model_type}")
  106. def _load_yolo_model(self, model_path: str):
  107. """加载YOLO模型"""
  108. if not HAS_YOLO:
  109. print("[PTZPersonDetector] ultralytics未安装,无法使用YOLO模型")
  110. return
  111. try:
  112. self.model = YOLO(model_path)
  113. print(f"[PTZPersonDetector] YOLO模型加载成功: {model_path}")
  114. except Exception as e:
  115. print(f"[PTZPersonDetector] YOLO模型加载失败: {e}")
  116. def _load_rknn_model(self, model_path: str):
  117. """加载RKNN模型"""
  118. if not HAS_RKNN:
  119. print("[PTZPersonDetector] rknnlite未安装,无法使用RKNN模型")
  120. return
  121. try:
  122. self.model = RKNNLite()
  123. ret = self.model.load_rknn(model_path)
  124. if ret != 0:
  125. raise RuntimeError(f"加载RKNN模型失败: ret={ret}")
  126. ret = self.model.init_runtime(target=None) # 自动选择NPU核心
  127. if ret != 0:
  128. raise RuntimeError(f"初始化RKNN运行时失败: ret={ret}")
  129. # RKNN安全模型的person类别ID
  130. self.person_class_id = 3
  131. print(f"[PTZPersonDetector] RKNN模型加载成功: {model_path}")
  132. except Exception as e:
  133. print(f"[PTZPersonDetector] RKNN模型加载失败: {e}")
  134. self.model = None
  135. def detect(self, frame: np.ndarray) -> List[DetectedPerson]:
  136. """
  137. 检测人体
  138. Args:
  139. frame: BGR图像
  140. Returns:
  141. 检测到的人体列表
  142. """
  143. if self.model is None:
  144. return []
  145. if self.model_type == 'rknn':
  146. return self._detect_rknn(frame)
  147. elif self.model_type == 'yolo':
  148. return self._detect_yolo(frame)
  149. return []
  150. def _detect_yolo(self, frame: np.ndarray) -> List[DetectedPerson]:
  151. """YOLO检测"""
  152. persons = []
  153. try:
  154. results = self.model(frame, verbose=False)
  155. for r in results:
  156. if r.boxes is None:
  157. continue
  158. for box in r.boxes:
  159. cls_id = int(box.cls[0])
  160. if cls_id != self.person_class_id:
  161. continue
  162. conf = float(box.conf[0])
  163. if conf < self.confidence_threshold:
  164. continue
  165. x1, y1, x2, y2 = map(int, box.xyxy[0])
  166. center_x = (x1 + x2) / 2
  167. center_y = (y1 + y2) / 2
  168. width = x2 - x1
  169. height = y2 - y1
  170. persons.append(DetectedPerson(
  171. bbox=(x1, y1, x2, y2),
  172. center=(center_x, center_y),
  173. width=width,
  174. height=height,
  175. confidence=conf
  176. ))
  177. except Exception as e:
  178. print(f"[PTZPersonDetector] YOLO检测错误: {e}")
  179. return persons
  180. def _detect_rknn(self, frame: np.ndarray) -> List[DetectedPerson]:
  181. """RKNN检测"""
  182. persons = []
  183. try:
  184. # 预处理
  185. img = cv2.resize(frame, (640, 640))
  186. img = img.astype(np.float32) / 255.0
  187. img = np.expand_dims(img, 0)
  188. # 推理
  189. outputs = self.model.inference(inputs=[img])
  190. # 后处理 (YOLO格式输出)
  191. # outputs shape: [1, 84, 8400] 或类似
  192. if outputs is None or len(outputs) == 0:
  193. return []
  194. output = outputs[0]
  195. # 解析检测结果
  196. h, w = frame.shape[:2]
  197. for i in range(output.shape[-1]):
  198. data = output[0, :, i]
  199. # 获取类别和置信度
  200. class_scores = data[4:]
  201. class_id = np.argmax(class_scores)
  202. confidence = class_scores[class_id]
  203. if confidence < self.confidence_threshold:
  204. continue
  205. if class_id != self.person_class_id:
  206. continue
  207. # 获取边界框
  208. cx, cy, bw, bh = data[:4]
  209. # 转换为原图坐标
  210. x1 = int((cx - bw/2) * w / 640)
  211. y1 = int((cy - bh/2) * h / 640)
  212. x2 = int((cx + bw/2) * w / 640)
  213. y2 = int((cy + bh/2) * h / 640)
  214. persons.append(DetectedPerson(
  215. bbox=(x1, y1, x2, y2),
  216. center=((x1+x2)/2, (y1+y2)/2),
  217. width=x2-x1,
  218. height=y2-y1,
  219. confidence=float(confidence)
  220. ))
  221. except Exception as e:
  222. print(f"[PTZPersonDetector] RKNN检测错误: {e}")
  223. return persons
  224. def detect_largest_person(self, frame: np.ndarray) -> Optional[DetectedPerson]:
  225. """检测最大的人体并保存图片"""
  226. persons = self.detect(frame)
  227. if persons:
  228. self._save_detection_image(frame, persons)
  229. return max(persons, key=lambda p: p.area)
  230. return None
  231. def _save_detection_image(self, frame: np.ndarray, persons: List[DetectedPerson]):
  232. """
  233. 保存带有检测标记的图片(只标记达到置信度阈值的人)
  234. Args:
  235. frame: 原始图像
  236. persons: 检测到的人体列表
  237. """
  238. if not self._save_image_enabled or not persons:
  239. return
  240. # 检查保存间隔
  241. current_time = time.time()
  242. if current_time - self._last_save_time < self._save_interval:
  243. return
  244. try:
  245. # 复制图像避免修改原图
  246. marked_frame = frame.copy()
  247. # 只标记达到阈值的人
  248. person_count = 0
  249. for person in persons:
  250. # 未达阈值的不标记
  251. if person.confidence < self.confidence_threshold:
  252. continue
  253. x1, y1, x2, y2 = person.bbox
  254. # 绘制边界框(绿色)
  255. cv2.rectangle(marked_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
  256. # 绘制序号标签
  257. label = f"person_{person_count}"
  258. person_count += 1
  259. (label_w, label_h), baseline = cv2.getTextSize(
  260. label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2
  261. )
  262. cv2.rectangle(
  263. marked_frame,
  264. (x1, y1 - label_h - 8),
  265. (x1 + label_w, y1),
  266. (0, 255, 0),
  267. -1
  268. )
  269. # 绘制标签文字(黑色)
  270. cv2.putText(
  271. marked_frame, label,
  272. (x1, y1 - 4),
  273. cv2.FONT_HERSHEY_SIMPLEX, 0.8,
  274. (0, 0, 0), 2
  275. )
  276. # 无有效目标则不保存
  277. if person_count == 0:
  278. return
  279. # 生成文件名(时间戳+有效人数)
  280. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
  281. filename = f"ptz_{timestamp}_n{person_count}.jpg"
  282. filepath = self._image_save_dir / filename
  283. # 保存图片
  284. cv2.imwrite(str(filepath), marked_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
  285. self._last_save_time = current_time
  286. logger.info(f"[球机] 已保存检测图片: {filepath},有效人数 {person_count} (阈值={self.confidence_threshold})")
  287. except Exception as e:
  288. logger.error(f"[球机] 保存检测图片失败: {e}")
  289. class PTZAutoZoomController:
  290. """
  291. 球机自动变焦控制器
  292. 根据检测到的人体大小和位置,自动调整PTZ角度和变焦
  293. """
  294. def __init__(self, ptz_camera, detector: PTZPersonDetector = None, config: dict = None):
  295. """
  296. 初始化控制器
  297. Args:
  298. ptz_camera: PTZCamera实例
  299. detector: 人体检测器
  300. config: 自动变焦配置
  301. """
  302. self.ptz = ptz_camera
  303. self.detector = detector
  304. self.config = config or {}
  305. # 默认配置
  306. self.target_size_ratio = self.config.get('target_size_ratio', 0.4)
  307. self.min_zoom = self.config.get('min_zoom', 3)
  308. self.max_zoom = self.config.get('max_zoom', 20)
  309. self.zoom_step = self.config.get('zoom_step', 2)
  310. self.center_threshold = self.config.get('center_threshold', 0.1)
  311. self.max_adjust_attempts = self.config.get('max_adjust_attempts', 3)
  312. def adjust_to_person(self, frame: np.ndarray, current_pan: float,
  313. current_tilt: float, current_zoom: int) -> ZoomAdjustResult:
  314. """
  315. 调整PTZ使检测到的人体居中并占据合适比例
  316. Args:
  317. frame: 球机画面
  318. current_pan: 当前pan角度
  319. current_tilt: 当前tilt角度
  320. current_zoom: 当前变倍
  321. Returns:
  322. ZoomAdjustResult: 调整结果
  323. """
  324. if frame is None:
  325. return ZoomAdjustResult(
  326. success=False, new_zoom=current_zoom,
  327. pan_adjust=0, tilt_adjust=0,
  328. person_detected=False, person_centered=False,
  329. message="无法获取球机画面"
  330. )
  331. # 检测人体
  332. person = self.detector.detect_largest_person(frame) if self.detector else None
  333. if person is None:
  334. return ZoomAdjustResult(
  335. success=False, new_zoom=current_zoom,
  336. pan_adjust=0, tilt_adjust=0,
  337. person_detected=False, person_centered=False,
  338. message="球机画面中未检测到人体"
  339. )
  340. h, w = frame.shape[:2]
  341. # 计算人体中心偏离画面中心的程度
  342. frame_center_x = w / 2
  343. frame_center_y = h / 2
  344. offset_x = (person.center[0] - frame_center_x) / w # -0.5 ~ 0.5
  345. offset_y = (person.center[1] - frame_center_y) / h # -0.5 ~ 0.5
  346. # 计算人体占画面比例
  347. person_size_ratio = max(person.width / w, person.height / h)
  348. print(f"[AutoZoom] 检测到人体: 中心=({person.center[0]:.0f}, {person.center[1]:.0f}), "
  349. f"尺寸={person.width}x{person.height}, 占比={person.size_ratio:.2f}")
  350. print(f"[AutoZoom] 偏移: x={offset_x:.3f}, y={offset_y:.3f}")
  351. # 判断是否居中
  352. is_centered = abs(offset_x) < self.center_threshold and abs(offset_y) < self.center_threshold
  353. # 计算PTZ角度调整量 (根据视场角估算)
  354. # 假设当前zoom下的水平视场角约 60/zoom 度
  355. fov_per_pixel = (60.0 / current_zoom) / w
  356. pan_adjust = -offset_x * 60.0 / current_zoom # 简化计算
  357. tilt_adjust = -offset_y * 45.0 / current_zoom
  358. # 计算新的zoom
  359. if person_size_ratio < self.target_size_ratio * 0.8:
  360. # 人体太小,放大
  361. zoom_factor = self.target_size_ratio / person_size_ratio
  362. new_zoom = min(int(current_zoom * zoom_factor), self.max_zoom)
  363. elif person_size_ratio > self.target_size_ratio * 1.2:
  364. # 人体太大,缩小
  365. zoom_factor = self.target_size_ratio / person_size_ratio
  366. new_zoom = max(int(current_zoom * zoom_factor), self.min_zoom)
  367. else:
  368. # 大小合适
  369. new_zoom = current_zoom
  370. # 限制调整量
  371. if is_centered:
  372. pan_adjust = 0
  373. tilt_adjust = 0
  374. return ZoomAdjustResult(
  375. success=True,
  376. new_zoom=new_zoom,
  377. pan_adjust=pan_adjust,
  378. tilt_adjust=tilt_adjust,
  379. person_detected=True,
  380. person_centered=is_centered,
  381. message=f"人体居中={is_centered}, zoom调整={current_zoom}→{new_zoom}"
  382. )
  383. def auto_focus_loop(self, get_frame_func, max_attempts: int = None) -> Tuple[bool, int]:
  384. """
  385. 自动对焦循环
  386. 持续调整PTZ直到人体居中且大小合适
  387. Args:
  388. get_frame_func: 获取球机画面的函数
  389. max_attempts: 最大调整次数
  390. Returns:
  391. (是否成功, 最终zoom)
  392. """
  393. max_attempts = max_attempts or self.max_adjust_attempts
  394. current_pos = self.ptz.get_current_position()
  395. current_pan = current_pos.pan
  396. current_tilt = current_pos.tilt
  397. current_zoom = current_pos.zoom
  398. for attempt in range(max_attempts):
  399. print(f"[AutoZoom] 调整轮次 {attempt + 1}/{max_attempts}")
  400. # 等待画面稳定
  401. time.sleep(0.3)
  402. # 获取球机画面
  403. frame = get_frame_func()
  404. # 分析并计算调整
  405. result = self.adjust_to_person(frame, current_pan, current_tilt, current_zoom)
  406. if not result.person_detected:
  407. print(f"[AutoZoom] 未检测到人体,停止调整")
  408. return False, current_zoom
  409. if result.person_centered and result.new_zoom == current_zoom:
  410. print(f"[AutoZoom] 人体已居中且大小合适,调整完成")
  411. return True, current_zoom
  412. # 执行调整
  413. if result.pan_adjust != 0 or result.tilt_adjust != 0:
  414. new_pan = current_pan + result.pan_adjust
  415. new_tilt = current_tilt + result.tilt_adjust
  416. print(f"[AutoZoom] 调整角度: pan {current_pan:.1f}→{new_pan:.1f}, "
  417. f"tilt {current_tilt:.1f}→{new_tilt:.1f}")
  418. self.ptz.goto_exact_position(new_pan, new_tilt, result.new_zoom)
  419. current_pan = new_pan
  420. current_tilt = new_tilt
  421. elif result.new_zoom != current_zoom:
  422. print(f"[AutoZoom] 调整变焦: {current_zoom}→{result.new_zoom}")
  423. self.ptz.goto_exact_position(current_pan, current_tilt, result.new_zoom)
  424. current_zoom = result.new_zoom
  425. print(f"[AutoZoom] 达到最大调整次数,当前zoom={current_zoom}")
  426. return True, current_zoom