| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/env python3
- """
- 用 ORB 重新匹配已扫描的球机图与全景图,生成更可靠的映射表。
- 用法:
- python scripts/re_match_orb.py --scan-dir /home/admin/dsh/calibration_scan
- """
- import os
- import sys
- import json
- import argparse
- import logging
- from pathlib import Path
- from typing import List, Tuple, Optional
- import cv2
- import numpy as np
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- def match_orb(
- ptz_img: np.ndarray,
- panorama_img: np.ndarray,
- min_matches: int = 8,
- ratio_thresh: float = 0.8,
- ) -> Tuple[Optional[Tuple[float, float]], Optional[np.ndarray]]:
- """ORB 特征匹配"""
- if ptz_img is None or panorama_img is None:
- return None, None
- gray_p = cv2.cvtColor(ptz_img, cv2.COLOR_BGR2GRAY)
- gray_g = cv2.cvtColor(panorama_img, cv2.COLOR_BGR2GRAY)
- orb = cv2.ORB_create(nfeatures=1000)
- kp_p, des_p = orb.detectAndCompute(gray_p, None)
- kp_g, des_g = orb.detectAndCompute(gray_g, None)
- if des_p is None or des_g is None or len(kp_p) < 4 or len(kp_g) < 4:
- return None, None
- bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
- matches = bf.knnMatch(des_p, des_g, k=2)
- good = []
- for m_n in matches:
- if len(m_n) == 2:
- m, n = m_n
- if m.distance < ratio_thresh * n.distance:
- good.append(m)
- if len(good) < min_matches:
- return None, None
- pts_p = np.float32([kp_p[m.queryIdx].pt for m in good])
- pts_g = np.float32([kp_g[m.trainIdx].pt for m in good])
- H, mask = cv2.findHomography(pts_p, pts_g, cv2.RANSAC, 5.0)
- if mask is None:
- return None, None
- inlier_g = pts_g[mask.ravel() == 1]
- if len(inlier_g) < min_matches:
- return None, None
- center_x = float(np.median(inlier_g[:, 0]))
- center_y = float(np.median(inlier_g[:, 1]))
- h, w = panorama_img.shape[:2]
- x_ratio = np.clip(center_x / w, 0.0, 1.0)
- y_ratio = np.clip(center_y / h, 0.0, 1.0)
- vis = cv2.drawMatches(
- ptz_img, kp_p, panorama_img, kp_g,
- [good[i] for i in range(len(good)) if mask[i]],
- None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
- )
- return (x_ratio, y_ratio), vis
- def run_rematch(scan_dir: Path):
- ptz_dir = scan_dir / 'ptz_images'
- pano_path = scan_dir / 'panorama.jpg'
- out_dir = scan_dir / 'rematch_orb'
- out_dir.mkdir(exist_ok=True)
- panorama = cv2.imread(str(pano_path))
- if panorama is None:
- logger.error(f'无法读取全景图: {pano_path}')
- return
- h, w = panorama.shape[:2]
- records = []
- for img_path in sorted(ptz_dir.glob('ptz_p*_t*.jpg')):
- # 解析文件名中的 pan/tilt
- stem = img_path.stem # e.g. ptz_p080_t-05
- parts = stem.replace('ptz_p', '').split('_t')
- pan = int(parts[0])
- tilt = int(parts[1])
- ptz_img = cv2.imread(str(img_path))
- if ptz_img is None:
- continue
- pos, vis = match_orb(ptz_img, panorama)
- record = {
- 'filename': img_path.name,
- 'pan': pan,
- 'tilt': tilt,
- 'matched': pos is not None,
- }
- if pos:
- record['x_ratio'] = round(pos[0], 4)
- record['y_ratio'] = round(pos[1], 4)
- record['panorama_x'] = int(pos[0] * w)
- record['panorama_y'] = int(pos[1] * h)
- vis_path = out_dir / f'match_{img_path.name}'
- cv2.imwrite(str(vis_path), vis, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
- logger.info(f'{img_path.name} -> ({pos[0]:.3f}, {pos[1]:.3f})')
- else:
- logger.info(f'{img_path.name} -> 未匹配')
- records.append(record)
- # 保存映射表
- mapping_path = scan_dir / 'mapping_raw_orb.json'
- with open(mapping_path, 'w', encoding='utf-8') as f:
- json.dump({
- 'method': 'ORB',
- 'panorama_size': {'width': w, 'height': h},
- 'records': records,
- }, f, indent=2, ensure_ascii=False)
- logger.info(f'原始映射已保存: {mapping_path}')
- # 生成查找表
- valid = [r for r in records if r.get('matched')]
- pan_lookup = sorted([[r['x_ratio'], float(r['pan'])] for r in valid], key=lambda x: x[0])
- tilt_lookup = sorted([[r['y_ratio'], float(r['tilt'])] for r in valid], key=lambda x: x[0])
- lookup_path = scan_dir / 'lookup_table_orb.json'
- with open(lookup_path, 'w', encoding='utf-8') as f:
- json.dump({
- 'method': 'ORB',
- 'pan_lookup': pan_lookup,
- 'tilt_lookup': tilt_lookup,
- 'valid_count': len(valid),
- }, f, indent=2, ensure_ascii=False)
- logger.info(f'ORB 查找表已保存: {lookup_path} (有效 {len(valid)}/{len(records)})')
- # 更新 CSV
- csv_path = scan_dir / 'mapping_for_review_orb.csv'
- with open(csv_path, 'w', encoding='utf-8') as f:
- f.write('filename,pan,tilt,x_ratio,y_ratio,panorama_x,panorama_y,matched,review_x,review_y\n')
- for r in records:
- f.write(
- f"{r['filename']},{r['pan']},{r['tilt']},"
- f"{r.get('x_ratio', '')},{r.get('y_ratio', '')},"
- f"{r.get('panorama_x', '')},{r.get('panorama_y', '')},"
- f"{r['matched']},,\n"
- )
- logger.info(f'复核 CSV 已保存: {csv_path}')
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument('--scan-dir', type=str, default='/home/admin/dsh/calibration_scan',
- help='扫描结果目录')
- args = parser.parse_args()
- run_rematch(Path(args.scan_dir))
- if __name__ == '__main__':
- main()
|