#!/usr/bin/env python3 """ 独立校准扫描脚本 按用户方案执行: 1. 球机水平 360°、步长 20° 转一圈 2. 每个水平位置由下朝上扫描 tilt 3. 每个位置拍一张球机图,文件名带 pan_tilt 4. 同时拍一张全景图 5. 基于这些图做特征匹配 / 人工确认,生成映射表 JSON 用法: cd /home/admin/dsh/dual_camera_system conda activate rknn python scripts/calibration_scanner.py --output /home/admin/dsh/calibration_scan """ import os import sys import json import time import argparse import logging from pathlib import Path from datetime import datetime from typing import List, Tuple, Optional # 必须在导入 cv2 之前设置 os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1' import cv2 import numpy as np # 把项目根目录加入路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from config import CAMERA_GROUPS, SDK_PATH from config.camera import parse_resolution from dahua_sdk import DahuaSDK from ptz_camera import PTZCamera from panorama_camera import PanoramaCamera logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def ensure_dir(path: Path): path.mkdir(parents=True, exist_ok=True) return path def clear_frame_buffer(cap, count: int = 5, interval: float = 0.05): """丢弃缓存中的旧帧,获取最新帧""" for _ in range(count): cap.grab() time.sleep(interval) return cap.read() def save_image(path: Path, img: np.ndarray): cv2.imwrite(str(path), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) def match_to_panorama( ptz_img: np.ndarray, panorama_img: np.ndarray, min_matches: int = 10, ratio_thresh: float = 0.75, ) -> Tuple[Optional[Tuple[float, float]], Optional[np.ndarray]]: """ 将球机图与全景图做 SIFT 特征匹配,返回全景中的归一化位置 (x_ratio, y_ratio) 以及可视化匹配图 """ if ptz_img is None or panorama_img is None: return None, None # 转换为灰度图 gray_p = cv2.cvtColor(ptz_img, cv2.COLOR_BGR2GRAY) gray_g = cv2.cvtColor(panorama_img, cv2.COLOR_BGR2GRAY) # 使用 SIFT sift = cv2.SIFT_create(nfeatures=500) kp_p, des_p = sift.detectAndCompute(gray_p, None) kp_g, des_g = sift.detectAndCompute(gray_g, None) if des_p is None or des_g is None or len(kp_p) < 4 or len(kp_g) < 4: return None, None # FLANN 匹配 index_params = dict(algorithm=1, trees=5) search_params = dict(checks=50) flann = cv2.FlannBasedMatcher(index_params, search_params) matches = flann.knnMatch(des_p, des_g, k=2) good = [] for m_n in matches: if len(m_n) == 2: m, n = m_n if m.distance < ratio_thresh * n.distance: good.append(m) if len(good) < min_matches: return None, None # 提取匹配点坐标 pts_p = np.float32([kp_p[m.queryIdx].pt for m in good]) pts_g = np.float32([kp_g[m.trainIdx].pt for m in good]) # 用 RANSAC 过滤异常点 H, mask = cv2.findHomography(pts_p, pts_g, cv2.RANSAC, 5.0) if mask is None: return None, None inlier_g = pts_g[mask.ravel() == 1] if len(inlier_g) < min_matches: return None, None # 计算全景位置中心 center_x = float(np.median(inlier_g[:, 0])) center_y = float(np.median(inlier_g[:, 1])) h, w = panorama_img.shape[:2] x_ratio = np.clip(center_x / w, 0.0, 1.0) y_ratio = np.clip(center_y / h, 0.0, 1.0) # 绘制匹配可视化 vis = cv2.drawMatches( ptz_img, kp_p, panorama_img, kp_g, [good[i] for i in range(len(good)) if mask[i]], None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS ) return (x_ratio, y_ratio), vis def run_scan( output_dir: Path, pan_step: int = 20, tilt_range: Tuple[int, int] = (-35, 45), tilt_step: int = 10, zoom: int = 1, stabilize_time: float = 1.0, ): """执行完整扫描""" ensure_dir(output_dir) ptz_dir = ensure_dir(output_dir / 'ptz_images') match_dir = ensure_dir(output_dir / 'matches') # 加载第一组配置 groups = [g for g in CAMERA_GROUPS if g.get('enabled', False)] if not groups: logger.error('没有启用的摄像头组') return group = groups[0] pano_cfg = group['panorama'] ptz_cfg = group['ptz'] # 初始化 SDK sdk_lib = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk']) sdk = DahuaSDK(sdk_lib) if not sdk.initialized or not sdk.init(): logger.error('大华 SDK 初始化失败') return # 连接枪机 logger.info('连接全景摄像头...') panorama = PanoramaCamera(sdk, pano_cfg) if not panorama.connect(): logger.error('全景摄像头连接失败') sdk.cleanup() return if not panorama.start_stream_rtsp(): logger.error('全景 RTSP 启动失败') panorama.disconnect() sdk.cleanup() return # 连接球机 logger.info('连接 PTZ 球机...') ptz = PTZCamera(sdk, ptz_cfg) if not ptz.connect(): logger.error('PTZ 球机连接失败') panorama.disconnect() sdk.cleanup() return if not ptz.start_stream_rtsp(): logger.warning('PTZ RTSP 启动失败,但仍可控制云台') # 等待视频流稳定 logger.info('等待视频流稳定 5 秒...') time.sleep(5) # 获取并保存全景图 logger.info('保存全景图...') pano_frame = None for _ in range(10): pano_frame = panorama.get_frame() if pano_frame is not None: break time.sleep(0.5) if pano_frame is None: logger.error('无法获取全景图') ptz.disconnect() panorama.disconnect() sdk.cleanup() return pano_path = output_dir / 'panorama.jpg' save_image(pano_path, pano_frame) logger.info(f'全景图已保存: {pano_path}') h, w = pano_frame.shape[:2] logger.info(f'全景图尺寸: {w}x{h}') # 构建扫描位置列表 scan_positions: List[Tuple[int, int]] = [] pan = 0 while pan < 360: tilt = tilt_range[0] while tilt <= tilt_range[1]: scan_positions.append((pan, tilt)) tilt += tilt_step pan += pan_step logger.info(f'扫描位置总数: {len(scan_positions)}') # 扫描并保存图片 mapping_records = [] failed_positions = [] for idx, (pan, tilt) in enumerate(scan_positions, 1): logger.info(f'[{idx}/{len(scan_positions)}] pan={pan}°, tilt={tilt}°') # 移动球机 if not ptz.goto_exact_position(float(pan), float(tilt), zoom): logger.warning(f' 移动到 pan={pan}, tilt={tilt} 失败') failed_positions.append((pan, tilt)) continue # 等待稳定 time.sleep(stabilize_time) # 获取清晰帧 frame = None for _ in range(5): frame = ptz.get_frame() if frame is not None: break time.sleep(0.2) if frame is None: logger.warning(f' 获取帧失败: pan={pan}, tilt={tilt}') failed_positions.append((pan, tilt)) continue # 保存球机图 filename = f'ptz_p{pan:03d}_t{tilt:+03d}.jpg' img_path = ptz_dir / filename save_image(img_path, frame) # 尝试与全景图匹配 pos, vis = match_to_panorama(frame, pano_frame) record = { 'pan': pan, 'tilt': tilt, 'zoom': zoom, 'filename': filename, 'matched': pos is not None, } if pos: record['x_ratio'] = round(pos[0], 4) record['y_ratio'] = round(pos[1], 4) record['panorama_x'] = int(pos[0] * w) record['panorama_y'] = int(pos[1] * h) logger.info(f' 匹配成功: 全景位置=({pos[0]:.3f}, {pos[1]:.3f})') # 保存可视化匹配图 if vis is not None: vis_path = match_dir / f'match_{filename}' save_image(vis_path, vis) else: logger.info(' 未匹配到全景区域') mapping_records.append(record) # 生成映射表 mapping = { 'created_at': datetime.now().isoformat(), 'panorama_size': {'width': w, 'height': h}, 'pan_step': pan_step, 'tilt_range': tilt_range, 'tilt_step': tilt_step, 'zoom': zoom, 'total_positions': len(scan_positions), 'failed_positions': failed_positions, 'records': mapping_records, } mapping_path = output_dir / 'mapping_raw.json' with open(mapping_path, 'w', encoding='utf-8') as f: json.dump(mapping, f, indent=2, ensure_ascii=False) logger.info(f'原始映射表已保存: {mapping_path}') # 生成可直接用于校准器的查找表(仅包含有匹配的点) valid_records = [r for r in mapping_records if r.get('matched')] pan_lookup = sorted([[r['x_ratio'], float(r['pan'])] for r in valid_records], key=lambda x: x[0]) tilt_lookup = sorted([[r['y_ratio'], float(r['tilt'])] for r in valid_records], key=lambda x: x[0]) lookup = { 'created_at': datetime.now().isoformat(), 'pan_lookup': pan_lookup, 'tilt_lookup': tilt_lookup, 'valid_count': len(valid_records), } lookup_path = output_dir / 'lookup_table.json' with open(lookup_path, 'w', encoding='utf-8') as f: json.dump(lookup, f, indent=2, ensure_ascii=False) logger.info(f'查找表已保存: {lookup_path} (有效点 {len(valid_records)}/{len(scan_positions)})') # 生成人工复核用 CSV csv_path = output_dir / 'mapping_for_review.csv' with open(csv_path, 'w', encoding='utf-8') as f: f.write('filename,pan,tilt,x_ratio,y_ratio,panorama_x,panorama_y,matched,review_x,review_y\n') for r in mapping_records: f.write( f"{r['filename']},{r['pan']},{r['tilt']}," f"{r.get('x_ratio', '')},{r.get('y_ratio', '')}," f"{r.get('panorama_x', '')},{r.get('panorama_y', '')}," f"{r['matched']},,\n" ) logger.info(f'人工复核 CSV 已保存: {csv_path}') # 回到初始位置 ptz.goto_exact_position(0.0, 0.0, 1) # 清理 ptz.disconnect() panorama.disconnect() sdk.cleanup() logger.info('扫描完成') def main(): parser = argparse.ArgumentParser(description='PTZ 校准扫描工具') parser.add_argument('--output', type=str, default='/home/admin/dsh/calibration_scan', help='扫描结果输出目录') parser.add_argument('--pan-step', type=int, default=20, help='水平扫描步长(度)') parser.add_argument('--tilt-min', type=int, default=-35, help='最小 tilt(度)') parser.add_argument('--tilt-max', type=int, default=45, help='最大 tilt(度)') parser.add_argument('--tilt-step', type=int, default=10, help='tilt 扫描步长(度)') parser.add_argument('--zoom', type=int, default=1, help='扫描时使用 zoom') parser.add_argument('--stabilize', type=float, default=1.0, help='PTZ 到位后等待时间(秒)') args = parser.parse_args() run_scan( output_dir=Path(args.output), pan_step=args.pan_step, tilt_range=(args.tilt_min, args.tilt_max), tilt_step=args.tilt_step, zoom=args.zoom, stabilize_time=args.stabilize, ) if __name__ == '__main__': main()