calibration_scanner.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. #!/usr/bin/env python3
  2. """
  3. 独立校准扫描脚本
  4. 按用户方案执行:
  5. 1. 球机水平 360°、步长 20° 转一圈
  6. 2. 每个水平位置由下朝上扫描 tilt
  7. 3. 每个位置拍一张球机图,文件名带 pan_tilt
  8. 4. 同时拍一张全景图
  9. 5. 基于这些图做特征匹配 / 人工确认,生成映射表 JSON
  10. 用法:
  11. cd /home/admin/dsh/dual_camera_system
  12. conda activate rknn
  13. python scripts/calibration_scanner.py --output /home/admin/dsh/calibration_scan
  14. """
  15. import os
  16. import sys
  17. import json
  18. import time
  19. import argparse
  20. import logging
  21. from pathlib import Path
  22. from datetime import datetime
  23. from typing import List, Tuple, Optional
  24. # 必须在导入 cv2 之前设置
  25. os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
  26. import cv2
  27. import numpy as np
  28. # 把项目根目录加入路径
  29. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  30. from config import CAMERA_GROUPS, SDK_PATH
  31. from config.camera import parse_resolution
  32. from dahua_sdk import DahuaSDK
  33. from ptz_camera import PTZCamera
  34. from panorama_camera import PanoramaCamera
  35. logging.basicConfig(
  36. level=logging.INFO,
  37. format='%(asctime)s - %(levelname)s - %(message)s'
  38. )
  39. logger = logging.getLogger(__name__)
  40. def ensure_dir(path: Path):
  41. path.mkdir(parents=True, exist_ok=True)
  42. return path
  43. def clear_frame_buffer(cap, count: int = 5, interval: float = 0.05):
  44. """丢弃缓存中的旧帧,获取最新帧"""
  45. for _ in range(count):
  46. cap.grab()
  47. time.sleep(interval)
  48. return cap.read()
  49. def save_image(path: Path, img: np.ndarray):
  50. cv2.imwrite(str(path), img, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
  51. def match_to_panorama(
  52. ptz_img: np.ndarray,
  53. panorama_img: np.ndarray,
  54. min_matches: int = 10,
  55. ratio_thresh: float = 0.75,
  56. ) -> Tuple[Optional[Tuple[float, float]], Optional[np.ndarray]]:
  57. """
  58. 将球机图与全景图做 SIFT 特征匹配,返回全景中的归一化位置 (x_ratio, y_ratio)
  59. 以及可视化匹配图
  60. """
  61. if ptz_img is None or panorama_img is None:
  62. return None, None
  63. # 转换为灰度图
  64. gray_p = cv2.cvtColor(ptz_img, cv2.COLOR_BGR2GRAY)
  65. gray_g = cv2.cvtColor(panorama_img, cv2.COLOR_BGR2GRAY)
  66. # 使用 SIFT
  67. sift = cv2.SIFT_create(nfeatures=500)
  68. kp_p, des_p = sift.detectAndCompute(gray_p, None)
  69. kp_g, des_g = sift.detectAndCompute(gray_g, None)
  70. if des_p is None or des_g is None or len(kp_p) < 4 or len(kp_g) < 4:
  71. return None, None
  72. # FLANN 匹配
  73. index_params = dict(algorithm=1, trees=5)
  74. search_params = dict(checks=50)
  75. flann = cv2.FlannBasedMatcher(index_params, search_params)
  76. matches = flann.knnMatch(des_p, des_g, k=2)
  77. good = []
  78. for m_n in matches:
  79. if len(m_n) == 2:
  80. m, n = m_n
  81. if m.distance < ratio_thresh * n.distance:
  82. good.append(m)
  83. if len(good) < min_matches:
  84. return None, None
  85. # 提取匹配点坐标
  86. pts_p = np.float32([kp_p[m.queryIdx].pt for m in good])
  87. pts_g = np.float32([kp_g[m.trainIdx].pt for m in good])
  88. # 用 RANSAC 过滤异常点
  89. H, mask = cv2.findHomography(pts_p, pts_g, cv2.RANSAC, 5.0)
  90. if mask is None:
  91. return None, None
  92. inlier_g = pts_g[mask.ravel() == 1]
  93. if len(inlier_g) < min_matches:
  94. return None, None
  95. # 计算全景位置中心
  96. center_x = float(np.median(inlier_g[:, 0]))
  97. center_y = float(np.median(inlier_g[:, 1]))
  98. h, w = panorama_img.shape[:2]
  99. x_ratio = np.clip(center_x / w, 0.0, 1.0)
  100. y_ratio = np.clip(center_y / h, 0.0, 1.0)
  101. # 绘制匹配可视化
  102. vis = cv2.drawMatches(
  103. ptz_img, kp_p, panorama_img, kp_g,
  104. [good[i] for i in range(len(good)) if mask[i]],
  105. None, flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
  106. )
  107. return (x_ratio, y_ratio), vis
  108. def run_scan(
  109. output_dir: Path,
  110. pan_step: int = 20,
  111. tilt_range: Tuple[int, int] = (-35, 45),
  112. tilt_step: int = 10,
  113. zoom: int = 1,
  114. stabilize_time: float = 1.0,
  115. ):
  116. """执行完整扫描"""
  117. ensure_dir(output_dir)
  118. ptz_dir = ensure_dir(output_dir / 'ptz_images')
  119. match_dir = ensure_dir(output_dir / 'matches')
  120. # 加载第一组配置
  121. groups = [g for g in CAMERA_GROUPS if g.get('enabled', False)]
  122. if not groups:
  123. logger.error('没有启用的摄像头组')
  124. return
  125. group = groups[0]
  126. pano_cfg = group['panorama']
  127. ptz_cfg = group['ptz']
  128. # 初始化 SDK
  129. sdk_lib = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk'])
  130. sdk = DahuaSDK(sdk_lib)
  131. if not sdk.initialized or not sdk.init():
  132. logger.error('大华 SDK 初始化失败')
  133. return
  134. # 连接枪机
  135. logger.info('连接全景摄像头...')
  136. panorama = PanoramaCamera(sdk, pano_cfg)
  137. if not panorama.connect():
  138. logger.error('全景摄像头连接失败')
  139. sdk.cleanup()
  140. return
  141. if not panorama.start_stream_rtsp():
  142. logger.error('全景 RTSP 启动失败')
  143. panorama.disconnect()
  144. sdk.cleanup()
  145. return
  146. # 连接球机
  147. logger.info('连接 PTZ 球机...')
  148. ptz = PTZCamera(sdk, ptz_cfg)
  149. if not ptz.connect():
  150. logger.error('PTZ 球机连接失败')
  151. panorama.disconnect()
  152. sdk.cleanup()
  153. return
  154. if not ptz.start_stream_rtsp():
  155. logger.warning('PTZ RTSP 启动失败,但仍可控制云台')
  156. # 等待视频流稳定
  157. logger.info('等待视频流稳定 5 秒...')
  158. time.sleep(5)
  159. # 获取并保存全景图
  160. logger.info('保存全景图...')
  161. pano_frame = None
  162. for _ in range(10):
  163. pano_frame = panorama.get_frame()
  164. if pano_frame is not None:
  165. break
  166. time.sleep(0.5)
  167. if pano_frame is None:
  168. logger.error('无法获取全景图')
  169. ptz.disconnect()
  170. panorama.disconnect()
  171. sdk.cleanup()
  172. return
  173. pano_path = output_dir / 'panorama.jpg'
  174. save_image(pano_path, pano_frame)
  175. logger.info(f'全景图已保存: {pano_path}')
  176. h, w = pano_frame.shape[:2]
  177. logger.info(f'全景图尺寸: {w}x{h}')
  178. # 构建扫描位置列表
  179. scan_positions: List[Tuple[int, int]] = []
  180. pan = 0
  181. while pan < 360:
  182. tilt = tilt_range[0]
  183. while tilt <= tilt_range[1]:
  184. scan_positions.append((pan, tilt))
  185. tilt += tilt_step
  186. pan += pan_step
  187. logger.info(f'扫描位置总数: {len(scan_positions)}')
  188. # 扫描并保存图片
  189. mapping_records = []
  190. failed_positions = []
  191. for idx, (pan, tilt) in enumerate(scan_positions, 1):
  192. logger.info(f'[{idx}/{len(scan_positions)}] pan={pan}°, tilt={tilt}°')
  193. # 移动球机
  194. if not ptz.goto_exact_position(float(pan), float(tilt), zoom):
  195. logger.warning(f' 移动到 pan={pan}, tilt={tilt} 失败')
  196. failed_positions.append((pan, tilt))
  197. continue
  198. # 等待稳定
  199. time.sleep(stabilize_time)
  200. # 获取清晰帧
  201. frame = None
  202. for _ in range(5):
  203. frame = ptz.get_frame()
  204. if frame is not None:
  205. break
  206. time.sleep(0.2)
  207. if frame is None:
  208. logger.warning(f' 获取帧失败: pan={pan}, tilt={tilt}')
  209. failed_positions.append((pan, tilt))
  210. continue
  211. # 保存球机图
  212. filename = f'ptz_p{pan:03d}_t{tilt:+03d}.jpg'
  213. img_path = ptz_dir / filename
  214. save_image(img_path, frame)
  215. # 尝试与全景图匹配
  216. pos, vis = match_to_panorama(frame, pano_frame)
  217. record = {
  218. 'pan': pan,
  219. 'tilt': tilt,
  220. 'zoom': zoom,
  221. 'filename': filename,
  222. 'matched': pos is not None,
  223. }
  224. if pos:
  225. record['x_ratio'] = round(pos[0], 4)
  226. record['y_ratio'] = round(pos[1], 4)
  227. record['panorama_x'] = int(pos[0] * w)
  228. record['panorama_y'] = int(pos[1] * h)
  229. logger.info(f' 匹配成功: 全景位置=({pos[0]:.3f}, {pos[1]:.3f})')
  230. # 保存可视化匹配图
  231. if vis is not None:
  232. vis_path = match_dir / f'match_{filename}'
  233. save_image(vis_path, vis)
  234. else:
  235. logger.info(' 未匹配到全景区域')
  236. mapping_records.append(record)
  237. # 生成映射表
  238. mapping = {
  239. 'created_at': datetime.now().isoformat(),
  240. 'panorama_size': {'width': w, 'height': h},
  241. 'pan_step': pan_step,
  242. 'tilt_range': tilt_range,
  243. 'tilt_step': tilt_step,
  244. 'zoom': zoom,
  245. 'total_positions': len(scan_positions),
  246. 'failed_positions': failed_positions,
  247. 'records': mapping_records,
  248. }
  249. mapping_path = output_dir / 'mapping_raw.json'
  250. with open(mapping_path, 'w', encoding='utf-8') as f:
  251. json.dump(mapping, f, indent=2, ensure_ascii=False)
  252. logger.info(f'原始映射表已保存: {mapping_path}')
  253. # 生成可直接用于校准器的查找表(仅包含有匹配的点)
  254. valid_records = [r for r in mapping_records if r.get('matched')]
  255. pan_lookup = sorted([[r['x_ratio'], float(r['pan'])] for r in valid_records], key=lambda x: x[0])
  256. tilt_lookup = sorted([[r['y_ratio'], float(r['tilt'])] for r in valid_records], key=lambda x: x[0])
  257. lookup = {
  258. 'created_at': datetime.now().isoformat(),
  259. 'pan_lookup': pan_lookup,
  260. 'tilt_lookup': tilt_lookup,
  261. 'valid_count': len(valid_records),
  262. }
  263. lookup_path = output_dir / 'lookup_table.json'
  264. with open(lookup_path, 'w', encoding='utf-8') as f:
  265. json.dump(lookup, f, indent=2, ensure_ascii=False)
  266. logger.info(f'查找表已保存: {lookup_path} (有效点 {len(valid_records)}/{len(scan_positions)})')
  267. # 生成人工复核用 CSV
  268. csv_path = output_dir / 'mapping_for_review.csv'
  269. with open(csv_path, 'w', encoding='utf-8') as f:
  270. f.write('filename,pan,tilt,x_ratio,y_ratio,panorama_x,panorama_y,matched,review_x,review_y\n')
  271. for r in mapping_records:
  272. f.write(
  273. f"{r['filename']},{r['pan']},{r['tilt']},"
  274. f"{r.get('x_ratio', '')},{r.get('y_ratio', '')},"
  275. f"{r.get('panorama_x', '')},{r.get('panorama_y', '')},"
  276. f"{r['matched']},,\n"
  277. )
  278. logger.info(f'人工复核 CSV 已保存: {csv_path}')
  279. # 回到初始位置
  280. ptz.goto_exact_position(0.0, 0.0, 1)
  281. # 清理
  282. ptz.disconnect()
  283. panorama.disconnect()
  284. sdk.cleanup()
  285. logger.info('扫描完成')
  286. def main():
  287. parser = argparse.ArgumentParser(description='PTZ 校准扫描工具')
  288. parser.add_argument('--output', type=str, default='/home/admin/dsh/calibration_scan',
  289. help='扫描结果输出目录')
  290. parser.add_argument('--pan-step', type=int, default=20,
  291. help='水平扫描步长(度)')
  292. parser.add_argument('--tilt-min', type=int, default=-35,
  293. help='最小 tilt(度)')
  294. parser.add_argument('--tilt-max', type=int, default=45,
  295. help='最大 tilt(度)')
  296. parser.add_argument('--tilt-step', type=int, default=10,
  297. help='tilt 扫描步长(度)')
  298. parser.add_argument('--zoom', type=int, default=1,
  299. help='扫描时使用 zoom')
  300. parser.add_argument('--stabilize', type=float, default=1.0,
  301. help='PTZ 到位后等待时间(秒)')
  302. args = parser.parse_args()
  303. run_scan(
  304. output_dir=Path(args.output),
  305. pan_step=args.pan_step,
  306. tilt_range=(args.tilt_min, args.tilt_max),
  307. tilt_step=args.tilt_step,
  308. zoom=args.zoom,
  309. stabilize_time=args.stabilize,
  310. )
  311. if __name__ == '__main__':
  312. main()