|
|
@@ -0,0 +1,377 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+独立校准扫描脚本
|
|
|
+按用户方案执行:
|
|
|
+1. 球机水平 360°、步长 20° 转一圈
|
|
|
+2. 每个水平位置由下朝上扫描 tilt
|
|
|
+3. 每个位置拍一张球机图,文件名带 pan_tilt
|
|
|
+4. 同时拍一张全景图
|
|
|
+5. 基于这些图做特征匹配 / 人工确认,生成映射表 JSON
|
|
|
+
|
|
|
+用法:
|
|
|
+ cd /home/admin/dsh/dual_camera_system
|
|
|
+ conda activate rknn
|
|
|
+ python scripts/calibration_scanner.py --output /home/admin/dsh/calibration_scan
|
|
|
+"""
|
|
|
+
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import json
|
|
|
+import time
|
|
|
+import argparse
|
|
|
+import logging
|
|
|
+from pathlib import Path
|
|
|
+from datetime import datetime
|
|
|
+from typing import List, Tuple, Optional
|
|
|
+
|
|
|
+# 必须在导入 cv2 之前设置
|
|
|
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
|
|
|
+
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+
|
|
|
+# 把项目根目录加入路径
|
|
|
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
+
|
|
|
+from config import CAMERA_GROUPS, SDK_PATH
|
|
|
+from config.camera import parse_resolution
|
|
|
+from dahua_sdk import DahuaSDK
|
|
|
+from ptz_camera import PTZCamera
|
|
|
+from panorama_camera import PanoramaCamera
|
|
|
+
|
|
|
+logging.basicConfig(
|
|
|
+ level=logging.INFO,
|
|
|
+ format='%(asctime)s - %(levelname)s - %(message)s'
|
|
|
+)
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+def ensure_dir(path: Path):
|
|
|
+ path.mkdir(parents=True, exist_ok=True)
|
|
|
+ return path
|
|
|
+
|
|
|
+
|
|
|
+def clear_frame_buffer(cap, count: int = 5, interval: float = 0.05):
|
|
|
+ """丢弃缓存中的旧帧,获取最新帧"""
|
|
|
+ for _ in range(count):
|
|
|
+ cap.grab()
|
|
|
+ time.sleep(interval)
|
|
|
+ return cap.read()
|
|
|
+
|
|
|
+
|
|
|
+def save_image(path: Path, img: np.ndarray):
|
|
|
+ cv2.imwrite(str(path), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
|
|
|
+
|
|
|
+
|
|
|
+def match_to_panorama(
|
|
|
+ ptz_img: np.ndarray,
|
|
|
+ panorama_img: np.ndarray,
|
|
|
+ min_matches: int = 10,
|
|
|
+ ratio_thresh: float = 0.75,
|
|
|
+) -> Tuple[Optional[Tuple[float, float]], Optional[np.ndarray]]:
|
|
|
+ """
|
|
|
+ 将球机图与全景图做 SIFT 特征匹配,返回全景中的归一化位置 (x_ratio, y_ratio)
|
|
|
+ 以及可视化匹配图
|
|
|
+ """
|
|
|
+ if ptz_img is None or panorama_img is None:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 转换为灰度图
|
|
|
+ gray_p = cv2.cvtColor(ptz_img, cv2.COLOR_BGR2GRAY)
|
|
|
+ gray_g = cv2.cvtColor(panorama_img, cv2.COLOR_BGR2GRAY)
|
|
|
+
|
|
|
+ # 使用 SIFT
|
|
|
+ sift = cv2.SIFT_create(nfeatures=500)
|
|
|
+ kp_p, des_p = sift.detectAndCompute(gray_p, None)
|
|
|
+ kp_g, des_g = sift.detectAndCompute(gray_g, None)
|
|
|
+
|
|
|
+ if des_p is None or des_g is None or len(kp_p) < 4 or len(kp_g) < 4:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # FLANN 匹配
|
|
|
+ index_params = dict(algorithm=1, trees=5)
|
|
|
+ search_params = dict(checks=50)
|
|
|
+ flann = cv2.FlannBasedMatcher(index_params, search_params)
|
|
|
+ matches = flann.knnMatch(des_p, des_g, k=2)
|
|
|
+
|
|
|
+ good = []
|
|
|
+ for m_n in matches:
|
|
|
+ if len(m_n) == 2:
|
|
|
+ m, n = m_n
|
|
|
+ if m.distance < ratio_thresh * n.distance:
|
|
|
+ good.append(m)
|
|
|
+
|
|
|
+ if len(good) < min_matches:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 提取匹配点坐标
|
|
|
+ pts_p = np.float32([kp_p[m.queryIdx].pt for m in good])
|
|
|
+ pts_g = np.float32([kp_g[m.trainIdx].pt for m in good])
|
|
|
+
|
|
|
+ # 用 RANSAC 过滤异常点
|
|
|
+ H, mask = cv2.findHomography(pts_p, pts_g, cv2.RANSAC, 5.0)
|
|
|
+ if mask is None:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ inlier_g = pts_g[mask.ravel() == 1]
|
|
|
+ if len(inlier_g) < min_matches:
|
|
|
+ return None, None
|
|
|
+
|
|
|
+ # 计算全景位置中心
|
|
|
+ center_x = float(np.median(inlier_g[:, 0]))
|
|
|
+ center_y = float(np.median(inlier_g[:, 1]))
|
|
|
+
|
|
|
+ h, w = panorama_img.shape[:2]
|
|
|
+ x_ratio = np.clip(center_x / w, 0.0, 1.0)
|
|
|
+ y_ratio = np.clip(center_y / h, 0.0, 1.0)
|
|
|
+
|
|
|
+ # 绘制匹配可视化
|
|
|
+ vis = cv2.drawMatches(
|
|
|
+ ptz_img, kp_p, panorama_img, kp_g,
|
|
|
+ [good[i] for i in range(len(good)) if mask[i]],
|
|
|
+ None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
|
|
|
+ )
|
|
|
+
|
|
|
+ return (x_ratio, y_ratio), vis
|
|
|
+
|
|
|
+
|
|
|
+def run_scan(
|
|
|
+ output_dir: Path,
|
|
|
+ pan_step: int = 20,
|
|
|
+ tilt_range: Tuple[int, int] = (-35, 45),
|
|
|
+ tilt_step: int = 10,
|
|
|
+ zoom: int = 1,
|
|
|
+ stabilize_time: float = 1.0,
|
|
|
+):
|
|
|
+ """执行完整扫描"""
|
|
|
+
|
|
|
+ ensure_dir(output_dir)
|
|
|
+ ptz_dir = ensure_dir(output_dir / 'ptz_images')
|
|
|
+ match_dir = ensure_dir(output_dir / 'matches')
|
|
|
+
|
|
|
+ # 加载第一组配置
|
|
|
+ groups = [g for g in CAMERA_GROUPS if g.get('enabled', False)]
|
|
|
+ if not groups:
|
|
|
+ logger.error('没有启用的摄像头组')
|
|
|
+ return
|
|
|
+ group = groups[0]
|
|
|
+ pano_cfg = group['panorama']
|
|
|
+ ptz_cfg = group['ptz']
|
|
|
+
|
|
|
+ # 初始化 SDK
|
|
|
+ sdk_lib = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk'])
|
|
|
+ sdk = DahuaSDK(sdk_lib)
|
|
|
+ if not sdk.initialized or not sdk.init():
|
|
|
+ logger.error('大华 SDK 初始化失败')
|
|
|
+ return
|
|
|
+
|
|
|
+ # 连接枪机
|
|
|
+ logger.info('连接全景摄像头...')
|
|
|
+ panorama = PanoramaCamera(sdk, pano_cfg)
|
|
|
+ if not panorama.connect():
|
|
|
+ logger.error('全景摄像头连接失败')
|
|
|
+ sdk.cleanup()
|
|
|
+ return
|
|
|
+ if not panorama.start_stream_rtsp():
|
|
|
+ logger.error('全景 RTSP 启动失败')
|
|
|
+ panorama.disconnect()
|
|
|
+ sdk.cleanup()
|
|
|
+ return
|
|
|
+
|
|
|
+ # 连接球机
|
|
|
+ logger.info('连接 PTZ 球机...')
|
|
|
+ ptz = PTZCamera(sdk, ptz_cfg)
|
|
|
+ if not ptz.connect():
|
|
|
+ logger.error('PTZ 球机连接失败')
|
|
|
+ panorama.disconnect()
|
|
|
+ sdk.cleanup()
|
|
|
+ return
|
|
|
+ if not ptz.start_stream_rtsp():
|
|
|
+ logger.warning('PTZ RTSP 启动失败,但仍可控制云台')
|
|
|
+
|
|
|
+ # 等待视频流稳定
|
|
|
+ logger.info('等待视频流稳定 5 秒...')
|
|
|
+ time.sleep(5)
|
|
|
+
|
|
|
+ # 获取并保存全景图
|
|
|
+ logger.info('保存全景图...')
|
|
|
+ pano_frame = None
|
|
|
+ for _ in range(10):
|
|
|
+ pano_frame = panorama.get_frame()
|
|
|
+ if pano_frame is not None:
|
|
|
+ break
|
|
|
+ time.sleep(0.5)
|
|
|
+
|
|
|
+ if pano_frame is None:
|
|
|
+ logger.error('无法获取全景图')
|
|
|
+ ptz.disconnect()
|
|
|
+ panorama.disconnect()
|
|
|
+ sdk.cleanup()
|
|
|
+ return
|
|
|
+
|
|
|
+ pano_path = output_dir / 'panorama.jpg'
|
|
|
+ save_image(pano_path, pano_frame)
|
|
|
+ logger.info(f'全景图已保存: {pano_path}')
|
|
|
+
|
|
|
+ h, w = pano_frame.shape[:2]
|
|
|
+ logger.info(f'全景图尺寸: {w}x{h}')
|
|
|
+
|
|
|
+ # 构建扫描位置列表
|
|
|
+ scan_positions: List[Tuple[int, int]] = []
|
|
|
+ pan = 0
|
|
|
+ while pan < 360:
|
|
|
+ tilt = tilt_range[0]
|
|
|
+ while tilt <= tilt_range[1]:
|
|
|
+ scan_positions.append((pan, tilt))
|
|
|
+ tilt += tilt_step
|
|
|
+ pan += pan_step
|
|
|
+
|
|
|
+ logger.info(f'扫描位置总数: {len(scan_positions)}')
|
|
|
+
|
|
|
+ # 扫描并保存图片
|
|
|
+ mapping_records = []
|
|
|
+ failed_positions = []
|
|
|
+
|
|
|
+ for idx, (pan, tilt) in enumerate(scan_positions, 1):
|
|
|
+ logger.info(f'[{idx}/{len(scan_positions)}] pan={pan}°, tilt={tilt}°')
|
|
|
+
|
|
|
+ # 移动球机
|
|
|
+ if not ptz.goto_exact_position(float(pan), float(tilt), zoom):
|
|
|
+ logger.warning(f' 移动到 pan={pan}, tilt={tilt} 失败')
|
|
|
+ failed_positions.append((pan, tilt))
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 等待稳定
|
|
|
+ time.sleep(stabilize_time)
|
|
|
+
|
|
|
+ # 获取清晰帧
|
|
|
+ frame = None
|
|
|
+ for _ in range(5):
|
|
|
+ frame = ptz.get_frame()
|
|
|
+ if frame is not None:
|
|
|
+ break
|
|
|
+ time.sleep(0.2)
|
|
|
+
|
|
|
+ if frame is None:
|
|
|
+ logger.warning(f' 获取帧失败: pan={pan}, tilt={tilt}')
|
|
|
+ failed_positions.append((pan, tilt))
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 保存球机图
|
|
|
+ filename = f'ptz_p{pan:03d}_t{tilt:+03d}.jpg'
|
|
|
+ img_path = ptz_dir / filename
|
|
|
+ save_image(img_path, frame)
|
|
|
+
|
|
|
+ # 尝试与全景图匹配
|
|
|
+ pos, vis = match_to_panorama(frame, pano_frame)
|
|
|
+ record = {
|
|
|
+ 'pan': pan,
|
|
|
+ 'tilt': tilt,
|
|
|
+ 'zoom': zoom,
|
|
|
+ 'filename': filename,
|
|
|
+ 'matched': pos is not None,
|
|
|
+ }
|
|
|
+ if pos:
|
|
|
+ record['x_ratio'] = round(pos[0], 4)
|
|
|
+ record['y_ratio'] = round(pos[1], 4)
|
|
|
+ record['panorama_x'] = int(pos[0] * w)
|
|
|
+ record['panorama_y'] = int(pos[1] * h)
|
|
|
+ logger.info(f' 匹配成功: 全景位置=({pos[0]:.3f}, {pos[1]:.3f})')
|
|
|
+
|
|
|
+ # 保存可视化匹配图
|
|
|
+ if vis is not None:
|
|
|
+ vis_path = match_dir / f'match_{filename}'
|
|
|
+ save_image(vis_path, vis)
|
|
|
+ else:
|
|
|
+ logger.info(' 未匹配到全景区域')
|
|
|
+
|
|
|
+ mapping_records.append(record)
|
|
|
+
|
|
|
+ # 生成映射表
|
|
|
+ mapping = {
|
|
|
+ 'created_at': datetime.now().isoformat(),
|
|
|
+ 'panorama_size': {'width': w, 'height': h},
|
|
|
+ 'pan_step': pan_step,
|
|
|
+ 'tilt_range': tilt_range,
|
|
|
+ 'tilt_step': tilt_step,
|
|
|
+ 'zoom': zoom,
|
|
|
+ 'total_positions': len(scan_positions),
|
|
|
+ 'failed_positions': failed_positions,
|
|
|
+ 'records': mapping_records,
|
|
|
+ }
|
|
|
+
|
|
|
+ mapping_path = output_dir / 'mapping_raw.json'
|
|
|
+ with open(mapping_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(mapping, f, indent=2, ensure_ascii=False)
|
|
|
+ logger.info(f'原始映射表已保存: {mapping_path}')
|
|
|
+
|
|
|
+ # 生成可直接用于校准器的查找表(仅包含有匹配的点)
|
|
|
+ valid_records = [r for r in mapping_records if r.get('matched')]
|
|
|
+ pan_lookup = sorted([[r['x_ratio'], float(r['pan'])] for r in valid_records], key=lambda x: x[0])
|
|
|
+ tilt_lookup = sorted([[r['y_ratio'], float(r['tilt'])] for r in valid_records], key=lambda x: x[0])
|
|
|
+
|
|
|
+ lookup = {
|
|
|
+ 'created_at': datetime.now().isoformat(),
|
|
|
+ 'pan_lookup': pan_lookup,
|
|
|
+ 'tilt_lookup': tilt_lookup,
|
|
|
+ 'valid_count': len(valid_records),
|
|
|
+ }
|
|
|
+ lookup_path = output_dir / 'lookup_table.json'
|
|
|
+ with open(lookup_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(lookup, f, indent=2, ensure_ascii=False)
|
|
|
+ logger.info(f'查找表已保存: {lookup_path} (有效点 {len(valid_records)}/{len(scan_positions)})')
|
|
|
+
|
|
|
+ # 生成人工复核用 CSV
|
|
|
+ csv_path = output_dir / 'mapping_for_review.csv'
|
|
|
+ with open(csv_path, 'w', encoding='utf-8') as f:
|
|
|
+ f.write('filename,pan,tilt,x_ratio,y_ratio,panorama_x,panorama_y,matched,review_x,review_y\n')
|
|
|
+ for r in mapping_records:
|
|
|
+ f.write(
|
|
|
+ f"{r['filename']},{r['pan']},{r['tilt']},"
|
|
|
+ f"{r.get('x_ratio', '')},{r.get('y_ratio', '')},"
|
|
|
+ f"{r.get('panorama_x', '')},{r.get('panorama_y', '')},"
|
|
|
+ f"{r['matched']},,\n"
|
|
|
+ )
|
|
|
+ logger.info(f'人工复核 CSV 已保存: {csv_path}')
|
|
|
+
|
|
|
+ # 回到初始位置
|
|
|
+ ptz.goto_exact_position(0.0, 0.0, 1)
|
|
|
+
|
|
|
+ # 清理
|
|
|
+ ptz.disconnect()
|
|
|
+ panorama.disconnect()
|
|
|
+ sdk.cleanup()
|
|
|
+ logger.info('扫描完成')
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ parser = argparse.ArgumentParser(description='PTZ 校准扫描工具')
|
|
|
+ parser.add_argument('--output', type=str, default='/home/admin/dsh/calibration_scan',
|
|
|
+ help='扫描结果输出目录')
|
|
|
+ parser.add_argument('--pan-step', type=int, default=20,
|
|
|
+ help='水平扫描步长(度)')
|
|
|
+ parser.add_argument('--tilt-min', type=int, default=-35,
|
|
|
+ help='最小 tilt(度)')
|
|
|
+ parser.add_argument('--tilt-max', type=int, default=45,
|
|
|
+ help='最大 tilt(度)')
|
|
|
+ parser.add_argument('--tilt-step', type=int, default=10,
|
|
|
+ help='tilt 扫描步长(度)')
|
|
|
+ parser.add_argument('--zoom', type=int, default=1,
|
|
|
+ help='扫描时使用 zoom')
|
|
|
+ parser.add_argument('--stabilize', type=float, default=1.0,
|
|
|
+ help='PTZ 到位后等待时间(秒)')
|
|
|
+
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ run_scan(
|
|
|
+ output_dir=Path(args.output),
|
|
|
+ pan_step=args.pan_step,
|
|
|
+ tilt_range=(args.tilt_min, args.tilt_max),
|
|
|
+ tilt_step=args.tilt_step,
|
|
|
+ zoom=args.zoom,
|
|
|
+ stabilize_time=args.stabilize,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ main()
|