| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- #!/usr/bin/env python3
- """
- 独立校准扫描脚本
- 按用户方案执行:
- 1. 球机水平 360°、步长 20° 转一圈
- 2. 每个水平位置由下朝上扫描 tilt
- 3. 每个位置拍一张球机图,文件名带 pan_tilt
- 4. 同时拍一张全景图
- 5. 基于这些图做特征匹配 / 人工确认,生成映射表 JSON
- 用法:
- cd /home/admin/dsh/dual_camera_system
- conda activate rknn
- python scripts/calibration_scanner.py --output /home/admin/dsh/calibration_scan
- """
- import os
- import sys
- import json
- import time
- import argparse
- import logging
- from pathlib import Path
- from datetime import datetime
- from typing import List, Tuple, Optional
- # 必须在导入 cv2 之前设置
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
- import cv2
- import numpy as np
- # 把项目根目录加入路径
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from config import CAMERA_GROUPS, SDK_PATH
- from config.camera import parse_resolution
- from dahua_sdk import DahuaSDK
- from ptz_camera import PTZCamera
- from panorama_camera import PanoramaCamera
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger(__name__)
- def ensure_dir(path: Path):
- path.mkdir(parents=True, exist_ok=True)
- return path
- def clear_frame_buffer(cap, count: int = 5, interval: float = 0.05):
- """丢弃缓存中的旧帧,获取最新帧"""
- for _ in range(count):
- cap.grab()
- time.sleep(interval)
- return cap.read()
- def save_image(path: Path, img: np.ndarray):
- cv2.imwrite(str(path), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
- def match_to_panorama(
- ptz_img: np.ndarray,
- panorama_img: np.ndarray,
- min_matches: int = 10,
- ratio_thresh: float = 0.75,
- ) -> Tuple[Optional[Tuple[float, float]], Optional[np.ndarray]]:
- """
- 将球机图与全景图做 SIFT 特征匹配,返回全景中的归一化位置 (x_ratio, y_ratio)
- 以及可视化匹配图
- """
- if ptz_img is None or panorama_img is None:
- return None, None
- # 转换为灰度图
- gray_p = cv2.cvtColor(ptz_img, cv2.COLOR_BGR2GRAY)
- gray_g = cv2.cvtColor(panorama_img, cv2.COLOR_BGR2GRAY)
- # 使用 SIFT
- sift = cv2.SIFT_create(nfeatures=500)
- kp_p, des_p = sift.detectAndCompute(gray_p, None)
- kp_g, des_g = sift.detectAndCompute(gray_g, None)
- if des_p is None or des_g is None or len(kp_p) < 4 or len(kp_g) < 4:
- return None, None
- # FLANN 匹配
- index_params = dict(algorithm=1, trees=5)
- search_params = dict(checks=50)
- flann = cv2.FlannBasedMatcher(index_params, search_params)
- matches = flann.knnMatch(des_p, des_g, k=2)
- good = []
- for m_n in matches:
- if len(m_n) == 2:
- m, n = m_n
- if m.distance < ratio_thresh * n.distance:
- good.append(m)
- if len(good) < min_matches:
- return None, None
- # 提取匹配点坐标
- pts_p = np.float32([kp_p[m.queryIdx].pt for m in good])
- pts_g = np.float32([kp_g[m.trainIdx].pt for m in good])
- # 用 RANSAC 过滤异常点
- H, mask = cv2.findHomography(pts_p, pts_g, cv2.RANSAC, 5.0)
- if mask is None:
- return None, None
- inlier_g = pts_g[mask.ravel() == 1]
- if len(inlier_g) < min_matches:
- return None, None
- # 计算全景位置中心
- center_x = float(np.median(inlier_g[:, 0]))
- center_y = float(np.median(inlier_g[:, 1]))
- h, w = panorama_img.shape[:2]
- x_ratio = np.clip(center_x / w, 0.0, 1.0)
- y_ratio = np.clip(center_y / h, 0.0, 1.0)
- # 绘制匹配可视化
- vis = cv2.drawMatches(
- ptz_img, kp_p, panorama_img, kp_g,
- [good[i] for i in range(len(good)) if mask[i]],
- None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
- )
- return (x_ratio, y_ratio), vis
- def run_scan(
- output_dir: Path,
- pan_step: int = 20,
- tilt_range: Tuple[int, int] = (-35, 45),
- tilt_step: int = 10,
- zoom: int = 1,
- stabilize_time: float = 1.0,
- ):
- """执行完整扫描"""
- ensure_dir(output_dir)
- ptz_dir = ensure_dir(output_dir / 'ptz_images')
- match_dir = ensure_dir(output_dir / 'matches')
- # 加载第一组配置
- groups = [g for g in CAMERA_GROUPS if g.get('enabled', False)]
- if not groups:
- logger.error('没有启用的摄像头组')
- return
- group = groups[0]
- pano_cfg = group['panorama']
- ptz_cfg = group['ptz']
- # 初始化 SDK
- sdk_lib = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk'])
- sdk = DahuaSDK(sdk_lib)
- if not sdk.initialized or not sdk.init():
- logger.error('大华 SDK 初始化失败')
- return
- # 连接枪机
- logger.info('连接全景摄像头...')
- panorama = PanoramaCamera(sdk, pano_cfg)
- if not panorama.connect():
- logger.error('全景摄像头连接失败')
- sdk.cleanup()
- return
- if not panorama.start_stream_rtsp():
- logger.error('全景 RTSP 启动失败')
- panorama.disconnect()
- sdk.cleanup()
- return
- # 连接球机
- logger.info('连接 PTZ 球机...')
- ptz = PTZCamera(sdk, ptz_cfg)
- if not ptz.connect():
- logger.error('PTZ 球机连接失败')
- panorama.disconnect()
- sdk.cleanup()
- return
- if not ptz.start_stream_rtsp():
- logger.warning('PTZ RTSP 启动失败,但仍可控制云台')
- # 等待视频流稳定
- logger.info('等待视频流稳定 5 秒...')
- time.sleep(5)
- # 获取并保存全景图
- logger.info('保存全景图...')
- pano_frame = None
- for _ in range(10):
- pano_frame = panorama.get_frame()
- if pano_frame is not None:
- break
- time.sleep(0.5)
- if pano_frame is None:
- logger.error('无法获取全景图')
- ptz.disconnect()
- panorama.disconnect()
- sdk.cleanup()
- return
- pano_path = output_dir / 'panorama.jpg'
- save_image(pano_path, pano_frame)
- logger.info(f'全景图已保存: {pano_path}')
- h, w = pano_frame.shape[:2]
- logger.info(f'全景图尺寸: {w}x{h}')
- # 构建扫描位置列表
- scan_positions: List[Tuple[int, int]] = []
- pan = 0
- while pan < 360:
- tilt = tilt_range[0]
- while tilt <= tilt_range[1]:
- scan_positions.append((pan, tilt))
- tilt += tilt_step
- pan += pan_step
- logger.info(f'扫描位置总数: {len(scan_positions)}')
- # 扫描并保存图片
- mapping_records = []
- failed_positions = []
- for idx, (pan, tilt) in enumerate(scan_positions, 1):
- logger.info(f'[{idx}/{len(scan_positions)}] pan={pan}°, tilt={tilt}°')
- # 移动球机
- if not ptz.goto_exact_position(float(pan), float(tilt), zoom):
- logger.warning(f' 移动到 pan={pan}, tilt={tilt} 失败')
- failed_positions.append((pan, tilt))
- continue
- # 等待稳定
- time.sleep(stabilize_time)
- # 获取清晰帧
- frame = None
- for _ in range(5):
- frame = ptz.get_frame()
- if frame is not None:
- break
- time.sleep(0.2)
- if frame is None:
- logger.warning(f' 获取帧失败: pan={pan}, tilt={tilt}')
- failed_positions.append((pan, tilt))
- continue
- # 保存球机图
- filename = f'ptz_p{pan:03d}_t{tilt:+03d}.jpg'
- img_path = ptz_dir / filename
- save_image(img_path, frame)
- # 尝试与全景图匹配
- pos, vis = match_to_panorama(frame, pano_frame)
- record = {
- 'pan': pan,
- 'tilt': tilt,
- 'zoom': zoom,
- 'filename': filename,
- 'matched': pos is not None,
- }
- if pos:
- record['x_ratio'] = round(pos[0], 4)
- record['y_ratio'] = round(pos[1], 4)
- record['panorama_x'] = int(pos[0] * w)
- record['panorama_y'] = int(pos[1] * h)
- logger.info(f' 匹配成功: 全景位置=({pos[0]:.3f}, {pos[1]:.3f})')
- # 保存可视化匹配图
- if vis is not None:
- vis_path = match_dir / f'match_{filename}'
- save_image(vis_path, vis)
- else:
- logger.info(' 未匹配到全景区域')
- mapping_records.append(record)
- # 生成映射表
- mapping = {
- 'created_at': datetime.now().isoformat(),
- 'panorama_size': {'width': w, 'height': h},
- 'pan_step': pan_step,
- 'tilt_range': tilt_range,
- 'tilt_step': tilt_step,
- 'zoom': zoom,
- 'total_positions': len(scan_positions),
- 'failed_positions': failed_positions,
- 'records': mapping_records,
- }
- mapping_path = output_dir / 'mapping_raw.json'
- with open(mapping_path, 'w', encoding='utf-8') as f:
- json.dump(mapping, f, indent=2, ensure_ascii=False)
- logger.info(f'原始映射表已保存: {mapping_path}')
- # 生成可直接用于校准器的查找表(仅包含有匹配的点)
- valid_records = [r for r in mapping_records if r.get('matched')]
- pan_lookup = sorted([[r['x_ratio'], float(r['pan'])] for r in valid_records], key=lambda x: x[0])
- tilt_lookup = sorted([[r['y_ratio'], float(r['tilt'])] for r in valid_records], key=lambda x: x[0])
- lookup = {
- 'created_at': datetime.now().isoformat(),
- 'pan_lookup': pan_lookup,
- 'tilt_lookup': tilt_lookup,
- 'valid_count': len(valid_records),
- }
- lookup_path = output_dir / 'lookup_table.json'
- with open(lookup_path, 'w', encoding='utf-8') as f:
- json.dump(lookup, f, indent=2, ensure_ascii=False)
- logger.info(f'查找表已保存: {lookup_path} (有效点 {len(valid_records)}/{len(scan_positions)})')
- # 生成人工复核用 CSV
- csv_path = output_dir / 'mapping_for_review.csv'
- with open(csv_path, 'w', encoding='utf-8') as f:
- f.write('filename,pan,tilt,x_ratio,y_ratio,panorama_x,panorama_y,matched,review_x,review_y\n')
- for r in mapping_records:
- f.write(
- f"{r['filename']},{r['pan']},{r['tilt']},"
- f"{r.get('x_ratio', '')},{r.get('y_ratio', '')},"
- f"{r.get('panorama_x', '')},{r.get('panorama_y', '')},"
- f"{r['matched']},,\n"
- )
- logger.info(f'人工复核 CSV 已保存: {csv_path}')
- # 回到初始位置
- ptz.goto_exact_position(0.0, 0.0, 1)
- # 清理
- ptz.disconnect()
- panorama.disconnect()
- sdk.cleanup()
- logger.info('扫描完成')
- def main():
- parser = argparse.ArgumentParser(description='PTZ 校准扫描工具')
- parser.add_argument('--output', type=str, default='/home/admin/dsh/calibration_scan',
- help='扫描结果输出目录')
- parser.add_argument('--pan-step', type=int, default=20,
- help='水平扫描步长(度)')
- parser.add_argument('--tilt-min', type=int, default=-35,
- help='最小 tilt(度)')
- parser.add_argument('--tilt-max', type=int, default=45,
- help='最大 tilt(度)')
- parser.add_argument('--tilt-step', type=int, default=10,
- help='tilt 扫描步长(度)')
- parser.add_argument('--zoom', type=int, default=1,
- help='扫描时使用 zoom')
- parser.add_argument('--stabilize', type=float, default=1.0,
- help='PTZ 到位后等待时间(秒)')
- args = parser.parse_args()
- run_scan(
- output_dir=Path(args.output),
- pan_step=args.pan_step,
- tilt_range=(args.tilt_min, args.tilt_max),
- tilt_step=args.tilt_step,
- zoom=args.zoom,
- stabilize_time=args.stabilize,
- )
- if __name__ == '__main__':
- main()
|