| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- """
- 本地冒烟测试脚本
- 无需大华 SDK 即可运行,用途:
- 1. 验证 Hikvision 枪机 RTSP 流可连接
- 2. 验证 YOLO 人体检测可运行
- 3. 验证根据检测坐标计算 PTZ 角度(不实际转动球机)
- 运行方式:
- cd dual_camera_system
- python scripts/local_test.py [--frames 10]
- """
- import os
- import sys
- import argparse
- import time
- # 必须在 import cv2 之前
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
- import cv2
- import numpy as np
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from config import CAMERA_GROUPS, get_enabled_groups, DETECTION_CONFIG
- from config.camera import parse_resolution
- from panorama_camera import ObjectDetector
- from ptz_camera import PTZCamera
- def open_rtsp(url: str, timeout_sec: float = 10.0):
- """尝试打开 RTSP 流,等待首帧"""
- print(f"[RTSP] 正在打开: {url}")
- cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
- if not cap.isOpened():
- return None
- deadline = time.time() + timeout_sec
- while time.time() < deadline:
- ret, frame = cap.read()
- if ret and frame is not None:
- return cap
- time.sleep(0.2)
- cap.release()
- return None
- def main():
- parser = argparse.ArgumentParser(description='本地冒烟测试(无需大华 SDK)')
- parser.add_argument('--frames', type=int, default=10, help='读取并检测的帧数')
- parser.add_argument('--show', action='store_true', help='显示检测结果窗口')
- args = parser.parse_args()
- groups = get_enabled_groups()
- if not groups:
- print('[错误] 没有启用的摄像头组,请检查 config/camera.py')
- return 1
- group = groups[0]
- panorama_cfg = group['panorama']
- ptz_cfg = group['ptz']
- expected_w, expected_h = parse_resolution(panorama_cfg.get('resolution'))
- print(f"[配置] 组: {group.get('name', group['group_id'])}")
- print(f"[配置] 枪机 IP: {panorama_cfg['ip']}, 期望分辨率: {expected_w}x{expected_h}")
- print(f"[配置] 球机 IP: {ptz_cfg['ip']}, mount_type={ptz_cfg.get('mount_type')}, pan_flip={ptz_cfg.get('pan_flip')}")
- # 1. 打开 RTSP
- cap = open_rtsp(panorama_cfg['rtsp_url'])
- if cap is None:
- print('[错误] 无法打开枪机 RTSP 流,请检查网络、用户名密码、RTSP 地址')
- return 1
- ret, frame = cap.read()
- if not ret or frame is None:
- print('[错误] RTSP 已打开但读取不到帧')
- cap.release()
- return 1
- actual_h, actual_w = frame.shape[:2]
- print(f"[RTSP] 连接成功,实际分辨率: {actual_w}x{actual_h}")
- cap.release()
- # 2. 初始化检测器
- print('[检测] 正在加载 YOLO 检测器...')
- detector = ObjectDetector(
- model_path=DETECTION_CONFIG.get('model_path'),
- use_gpu=DETECTION_CONFIG.get('use_gpu', True),
- model_type=DETECTION_CONFIG.get('model_type', 'yolo'),
- model_size='n',
- )
- print('[检测] 检测器加载完成')
- # 3. 实例化 PTZCamera(不连接 SDK,仅用于坐标计算)
- ptz = PTZCamera(sdk=None, camera_config=ptz_cfg)
- # 4. 重新打开 RTSP 并检测
- cap = open_rtsp(panorama_cfg['rtsp_url'])
- if cap is None:
- print('[错误] 第二次打开 RTSP 失败')
- return 1
- print(f'[检测] 开始读取 {args.frames} 帧...')
- for i in range(args.frames):
- ret, frame = cap.read()
- if not ret or frame is None:
- print(f'[检测] 第 {i+1}/{args.frames} 帧读取失败,跳过')
- time.sleep(0.5)
- continue
- detections = detector.detect(frame)
- persons = [d for d in detections if d.class_name == 'person']
- print(f"[检测] 帧 {i+1}/{args.frames}: 检测到 {len(persons)} 个人体")
- for p in persons:
- cx, cy = p.center
- x_ratio = cx / frame.shape[1]
- y_ratio = cy / frame.shape[0]
- pan, tilt, zoom = ptz.calculate_ptz_position(x_ratio, y_ratio)
- print(f" 人体: center=({cx}, {cy}), ratio=({x_ratio:.3f}, {y_ratio:.3f}) "
- f"-> PTZ(pan={pan:.1f}, tilt={tilt:.1f}, zoom={zoom})")
- if args.show:
- x1, y1, x2, y2 = p.bbox
- cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
- cv2.circle(frame, (cx, cy), 4, (0, 0, 255), -1)
- cv2.putText(frame, f"P{pan:.0f} T{tilt:.0f} Z{zoom}",
- (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
- if args.show:
- cv2.imshow('Local Test', frame)
- if cv2.waitKey(1) & 0xFF == ord('q'):
- break
- cap.release()
- if args.show:
- cv2.destroyAllWindows()
- print('[完成] 本地冒烟测试结束')
- return 0
- if __name__ == '__main__':
- sys.exit(main())
|