local_test.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. """
  2. 本地冒烟测试脚本
  3. 无需大华 SDK 即可运行,用途:
  4. 1. 验证 Hikvision 枪机 RTSP 流可连接
  5. 2. 验证 YOLO 人体检测可运行
  6. 3. 验证根据检测坐标计算 PTZ 角度(不实际转动球机)
  7. 运行方式:
  8. cd dual_camera_system
  9. python scripts/local_test.py [--frames 10]
  10. """
  11. import os
  12. import sys
  13. import argparse
  14. import time
  15. # 必须在 import cv2 之前
  16. os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
  17. import cv2
  18. import numpy as np
  19. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  20. from config import CAMERA_GROUPS, get_enabled_groups, DETECTION_CONFIG
  21. from config.camera import parse_resolution
  22. from panorama_camera import ObjectDetector
  23. from ptz_camera import PTZCamera
  24. def open_rtsp(url: str, timeout_sec: float = 10.0):
  25. """尝试打开 RTSP 流,等待首帧"""
  26. print(f"[RTSP] 正在打开: {url}")
  27. cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
  28. if not cap.isOpened():
  29. return None
  30. deadline = time.time() + timeout_sec
  31. while time.time() < deadline:
  32. ret, frame = cap.read()
  33. if ret and frame is not None:
  34. return cap
  35. time.sleep(0.2)
  36. cap.release()
  37. return None
  38. def main():
  39. parser = argparse.ArgumentParser(description='本地冒烟测试(无需大华 SDK)')
  40. parser.add_argument('--frames', type=int, default=10, help='读取并检测的帧数')
  41. parser.add_argument('--show', action='store_true', help='显示检测结果窗口')
  42. args = parser.parse_args()
  43. groups = get_enabled_groups()
  44. if not groups:
  45. print('[错误] 没有启用的摄像头组,请检查 config/camera.py')
  46. return 1
  47. group = groups[0]
  48. panorama_cfg = group['panorama']
  49. ptz_cfg = group['ptz']
  50. expected_w, expected_h = parse_resolution(panorama_cfg.get('resolution'))
  51. print(f"[配置] 组: {group.get('name', group['group_id'])}")
  52. print(f"[配置] 枪机 IP: {panorama_cfg['ip']}, 期望分辨率: {expected_w}x{expected_h}")
  53. print(f"[配置] 球机 IP: {ptz_cfg['ip']}, mount_type={ptz_cfg.get('mount_type')}, pan_flip={ptz_cfg.get('pan_flip')}")
  54. # 1. 打开 RTSP
  55. cap = open_rtsp(panorama_cfg['rtsp_url'])
  56. if cap is None:
  57. print('[错误] 无法打开枪机 RTSP 流,请检查网络、用户名密码、RTSP 地址')
  58. return 1
  59. ret, frame = cap.read()
  60. if not ret or frame is None:
  61. print('[错误] RTSP 已打开但读取不到帧')
  62. cap.release()
  63. return 1
  64. actual_h, actual_w = frame.shape[:2]
  65. print(f"[RTSP] 连接成功,实际分辨率: {actual_w}x{actual_h}")
  66. cap.release()
  67. # 2. 初始化检测器
  68. print('[检测] 正在加载 YOLO 检测器...')
  69. detector = ObjectDetector(
  70. model_path=DETECTION_CONFIG.get('model_path'),
  71. use_gpu=DETECTION_CONFIG.get('use_gpu', True),
  72. model_type=DETECTION_CONFIG.get('model_type', 'yolo'),
  73. model_size='n',
  74. )
  75. print('[检测] 检测器加载完成')
  76. # 3. 实例化 PTZCamera(不连接 SDK,仅用于坐标计算)
  77. ptz = PTZCamera(sdk=None, camera_config=ptz_cfg)
  78. # 4. 重新打开 RTSP 并检测
  79. cap = open_rtsp(panorama_cfg['rtsp_url'])
  80. if cap is None:
  81. print('[错误] 第二次打开 RTSP 失败')
  82. return 1
  83. print(f'[检测] 开始读取 {args.frames} 帧...')
  84. for i in range(args.frames):
  85. ret, frame = cap.read()
  86. if not ret or frame is None:
  87. print(f'[检测] 第 {i+1}/{args.frames} 帧读取失败,跳过')
  88. time.sleep(0.5)
  89. continue
  90. detections = detector.detect(frame)
  91. persons = [d for d in detections if d.class_name == 'person']
  92. print(f"[检测] 帧 {i+1}/{args.frames}: 检测到 {len(persons)} 个人体")
  93. for p in persons:
  94. cx, cy = p.center
  95. x_ratio = cx / frame.shape[1]
  96. y_ratio = cy / frame.shape[0]
  97. pan, tilt, zoom = ptz.calculate_ptz_position(x_ratio, y_ratio)
  98. print(f" 人体: center=({cx}, {cy}), ratio=({x_ratio:.3f}, {y_ratio:.3f}) "
  99. f"-> PTZ(pan={pan:.1f}, tilt={tilt:.1f}, zoom={zoom})")
  100. if args.show:
  101. x1, y1, x2, y2 = p.bbox
  102. cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
  103. cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)
  104. cv2.putText(frame, f"P{pan:.0f} T{tilt:.0f} Z{zoom}",
  105. (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
  106. if args.show:
  107. cv2.imshow('Local Test', frame)
  108. if cv2.waitKey(1) & 0xFF == ord('q'):
  109. break
  110. cap.release()
  111. if args.show:
  112. cv2.destroyAllWindows()
  113. print('[完成] 本地冒烟测试结束')
  114. return 0
  115. if __name__ == '__main__':
  116. sys.exit(main())