|
@@ -0,0 +1,187 @@
|
|
|
|
|
+# build_calibration_mapping.py
|
|
|
|
|
+import argparse
|
|
|
|
|
+import json
|
|
|
|
|
+import re
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+from typing import Dict, List, Optional, Tuple
|
|
|
|
|
+
|
|
|
|
|
+import cv2
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+
|
|
|
|
|
+from matchers import TemplateMatcher, FeatureMatcher
|
|
|
|
|
+from mapping_model import MappingModel
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def parse_filename(filename: str) -> Optional[Tuple[float, float]]:
|
|
|
|
|
+ m = re.match(r'ptz_p(\d+)_t([+-]?\d+)\.jpg', filename)
|
|
|
|
|
+ if not m:
|
|
|
|
|
+ return None
|
|
|
|
|
+ return float(m.group(1)), float(m.group(2))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _to_native(value):
|
|
|
|
|
+ """Convert numpy scalar to native Python type for JSON serialization."""
|
|
|
|
|
+ if isinstance(value, np.generic):
|
|
|
|
|
+ return value.item()
|
|
|
|
|
+ return value
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _convert_result(result):
|
|
|
|
|
+ """Convert matcher result tuple to JSON-serializable native types."""
|
|
|
|
|
+ if result is None:
|
|
|
|
|
+ return None
|
|
|
|
|
+ return tuple(_to_native(v) for v in result)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def fuse_results(tm_result, fm_result) -> Optional[Dict]:
|
|
|
|
|
+ if tm_result is None and fm_result is None:
|
|
|
|
|
+ return None
|
|
|
|
|
+ if tm_result is None:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'x_ratio': fm_result[0],
|
|
|
|
|
+ 'y_ratio': fm_result[1],
|
|
|
|
|
+ 'confidence': 'feature_only',
|
|
|
|
|
+ 'tm_score': None,
|
|
|
|
|
+ 'fm_inliers': fm_result[2],
|
|
|
|
|
+ }
|
|
|
|
|
+ if fm_result is None:
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'x_ratio': tm_result[0],
|
|
|
|
|
+ 'y_ratio': tm_result[1],
|
|
|
|
|
+ 'confidence': 'template_only',
|
|
|
|
|
+ 'tm_score': tm_result[2],
|
|
|
|
|
+ 'fm_inliers': None,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ tm_x, tm_y, tm_score, _ = tm_result
|
|
|
|
|
+ fm_x, fm_y, fm_inliers, _ = fm_result
|
|
|
|
|
+
|
|
|
|
|
+ diff = abs(tm_x - fm_x) + abs(tm_y - fm_y)
|
|
|
|
|
+ if diff > 0.30: # 差异过大,取更可信的一方
|
|
|
|
|
+ if tm_score > 0.7 and fm_inliers < 15:
|
|
|
|
|
+ x, y = tm_x, tm_y
|
|
|
|
|
+ conf = 'template_high_disagree'
|
|
|
|
|
+ else:
|
|
|
|
|
+ x, y = fm_x, fm_y
|
|
|
|
|
+ conf = 'feature_high_disagree'
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 加权融合
|
|
|
|
|
+ w_tm = tm_score * 0.4
|
|
|
|
|
+ w_fm = min(fm_inliers / 30.0, 1.0) * 0.6
|
|
|
|
|
+ total = w_tm + w_fm
|
|
|
|
|
+ if total < 1e-6:
|
|
|
|
|
+ x, y = (tm_x + fm_x) / 2, (tm_y + fm_y) / 2
|
|
|
|
|
+ else:
|
|
|
|
|
+ x = (tm_x * w_tm + fm_x * w_fm) / total
|
|
|
|
|
+ y = (tm_y * w_tm + fm_y * w_fm) / total
|
|
|
|
|
+ conf = 'fused'
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'x_ratio': x,
|
|
|
|
|
+ 'y_ratio': y,
|
|
|
|
|
+ 'confidence': conf,
|
|
|
|
|
+ 'tm_score': tm_score,
|
|
|
|
|
+ 'fm_inliers': fm_inliers,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def draw_match(panorama: np.ndarray, x_ratio: float, y_ratio: float,
|
|
|
|
|
+ pan: float, tilt: float, out_path: Path) -> None:
|
|
|
|
|
+ vis = panorama.copy()
|
|
|
|
|
+ h, w = vis.shape[:2]
|
|
|
|
|
+ cx, cy = int(x_ratio * w), int(y_ratio * h)
|
|
|
|
|
+ cv2.circle(vis, (cx, cy), 15, (0, 0, 255), 2)
|
|
|
|
|
+ cv2.putText(vis, f'P{pan:.0f} T{tilt:.0f}', (cx + 20, cy),
|
|
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2)
|
|
|
|
|
+ cv2.imwrite(str(out_path), vis)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def main():
|
|
|
|
|
+ parser = argparse.ArgumentParser(description='Build PTZ-panorama coordinate mapping')
|
|
|
|
|
+ parser.add_argument('--ptz-dir', default='ptz_images')
|
|
|
|
|
+ parser.add_argument('--panorama', default='panorama.jpg')
|
|
|
|
|
+ parser.add_argument('--output', default='calibration_group1.json')
|
|
|
|
|
+ parser.add_argument('--detail-output', default='mapping_fused.json')
|
|
|
|
|
+ parser.add_argument('--vis-dir', default='matches_fused')
|
|
|
|
|
+ parser.add_argument('--save-vis', action='store_true')
|
|
|
|
|
+ args = parser.parse_args()
|
|
|
|
|
+
|
|
|
|
|
+ ptz_dir = Path(args.ptz_dir)
|
|
|
|
|
+ panorama_path = Path(args.panorama)
|
|
|
|
|
+ panorama = cv2.imread(str(panorama_path))
|
|
|
|
|
+ if panorama is None:
|
|
|
|
|
+ raise FileNotFoundError(f'Cannot read panorama: {panorama_path}')
|
|
|
|
|
+
|
|
|
|
|
+ tm = TemplateMatcher()
|
|
|
|
|
+ fm = FeatureMatcher()
|
|
|
|
|
+ model = MappingModel()
|
|
|
|
|
+
|
|
|
|
|
+ records = []
|
|
|
|
|
+ detail_records = []
|
|
|
|
|
+
|
|
|
|
|
+ ptz_files = sorted(ptz_dir.glob('ptz_p*_t*.jpg'))
|
|
|
|
|
+ print(f'Found {len(ptz_files)} PTZ images')
|
|
|
|
|
+
|
|
|
|
|
+ if args.save_vis:
|
|
|
|
|
+ vis_dir = Path(args.vis_dir)
|
|
|
|
|
+ vis_dir.mkdir(exist_ok=True)
|
|
|
|
|
+
|
|
|
|
|
+ for ptz_path in ptz_files:
|
|
|
|
|
+ parsed = parse_filename(ptz_path.name)
|
|
|
|
|
+ if parsed is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+ pan, tilt = parsed
|
|
|
|
|
+ ptz_img = cv2.imread(str(ptz_path))
|
|
|
|
|
+ if ptz_img is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ tm_result = _convert_result(tm.match(ptz_img, panorama))
|
|
|
|
|
+ fm_result = _convert_result(fm.match(ptz_img, panorama))
|
|
|
|
|
+ fused = fuse_results(tm_result, fm_result)
|
|
|
|
|
+
|
|
|
|
|
+ detail = {
|
|
|
|
|
+ 'filename': ptz_path.name,
|
|
|
|
|
+ 'pan': pan,
|
|
|
|
|
+ 'tilt': tilt,
|
|
|
|
|
+ 'matched': fused is not None,
|
|
|
|
|
+ 'tm': tm_result,
|
|
|
|
|
+ 'fm': fm_result,
|
|
|
|
|
+ 'fused': fused,
|
|
|
|
|
+ }
|
|
|
|
|
+ detail_records.append(detail)
|
|
|
|
|
+
|
|
|
|
|
+ if fused is not None:
|
|
|
|
|
+ record = {
|
|
|
|
|
+ 'pan': pan,
|
|
|
|
|
+ 'tilt': tilt,
|
|
|
|
|
+ 'x_ratio': fused['x_ratio'],
|
|
|
|
|
+ 'y_ratio': fused['y_ratio'],
|
|
|
|
|
+ 'confidence': fused['confidence'],
|
|
|
|
|
+ }
|
|
|
|
|
+ records.append(record)
|
|
|
|
|
+ if args.save_vis:
|
|
|
|
|
+ draw_match(panorama, fused['x_ratio'], fused['y_ratio'], pan, tilt,
|
|
|
|
|
+ vis_dir / f'match_{ptz_path.stem}.jpg')
|
|
|
|
|
+
|
|
|
|
|
+ print(f'Valid fused records: {len(records)}/{len(ptz_files)}')
|
|
|
|
|
+
|
|
|
|
|
+ if len(records) < 4:
|
|
|
|
|
+ raise RuntimeError(f'Too few valid records: {len(records)}')
|
|
|
|
|
+
|
|
|
|
|
+ ok = model.fit(records)
|
|
|
|
|
+ if not ok:
|
|
|
|
|
+ raise RuntimeError('Failed to fit mapping model')
|
|
|
|
|
+
|
|
|
|
|
+ model.save(args.output)
|
|
|
|
|
+ print(f'Saved mapping to {args.output}, RMS error: {model.rms_error:.2f}°')
|
|
|
|
|
+
|
|
|
|
|
+ with open(args.detail_output, 'w', encoding='utf-8') as f:
|
|
|
|
|
+ json.dump({
|
|
|
|
|
+ 'records': detail_records,
|
|
|
|
|
+ 'panorama_size': {'width': int(panorama.shape[1]), 'height': int(panorama.shape[0])},
|
|
|
|
|
+ }, f, indent=2, ensure_ascii=False)
|
|
|
|
|
+ print(f'Saved detail to {args.detail_output}')
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
|
+ main()
|