|
|
@@ -0,0 +1,642 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+交互式手动标定脚本(Web UI 版)
|
|
|
+
|
|
|
+流程:
|
|
|
+1. 用户在 panorama.jpg 上选择标定点,写入 points.txt(每行:x_ratio y_ratio [备注])
|
|
|
+2. 脚本抓取全景图、启动 Web UI
|
|
|
+3. 脚本控制球机移动到每个点的初始估计位置并抓拍
|
|
|
+4. 用户在浏览器点击按钮调整 pan/tilt,确认后进入下一点
|
|
|
+5. 最终生成 calibration_group1_manual.json
|
|
|
+
|
|
|
+用法:
|
|
|
+ python manual_calibrate.py --points points.txt --output manual_calib --http-port 8000
|
|
|
+"""
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import json
|
|
|
+import time
|
|
|
+import argparse
|
|
|
+import http.server
|
|
|
+import socketserver
|
|
|
+import threading
|
|
|
+import queue
|
|
|
+from pathlib import Path
|
|
|
+from datetime import datetime
|
|
|
+from urllib.parse import urlparse, parse_qs
|
|
|
+
|
|
|
+# 必须在导入 cv2 前设置
|
|
|
+os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
|
|
|
+
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+
|
|
|
+sys.path.insert(0, '/home/admin/dsh/dual_camera_system')
|
|
|
+
|
|
|
+from dahua_sdk import DahuaSDK
|
|
|
+from ptz_camera import PTZCamera
|
|
|
+
|
|
|
+
|
|
|
+# 全局状态
|
|
|
+calib_state = {
|
|
|
+ 'running': True,
|
|
|
+ 'point_idx': 0,
|
|
|
+ 'total_points': 0,
|
|
|
+ 'pan': 0.0,
|
|
|
+ 'tilt': 0.0,
|
|
|
+ 'zoom': 1,
|
|
|
+ 'note': '',
|
|
|
+ 'status': 'initializing', # initializing, moving, waiting, finished, error
|
|
|
+ 'message': '初始化中...',
|
|
|
+ 'image_ts': 0,
|
|
|
+ 'calib_points': [],
|
|
|
+ 'http_port': 8000,
|
|
|
+}
|
|
|
+
|
|
|
+cmd_queue = queue.Queue()
|
|
|
+
|
|
|
+
|
|
|
+def load_points(points_path: str) -> list:
|
|
|
+ """从文件加载标定点,格式:每行 x_ratio y_ratio [备注]"""
|
|
|
+ points = []
|
|
|
+ if not os.path.exists(points_path):
|
|
|
+ return points
|
|
|
+ with open(points_path, 'r', encoding='utf-8') as f:
|
|
|
+ for line in f:
|
|
|
+ line = line.strip()
|
|
|
+ if not line or line.startswith('#'):
|
|
|
+ continue
|
|
|
+ parts = line.split()
|
|
|
+ if len(parts) >= 2:
|
|
|
+ x = float(parts[0])
|
|
|
+ y = float(parts[1])
|
|
|
+ note = ' '.join(parts[2:]) if len(parts) > 2 else ''
|
|
|
+ points.append({'x': x, 'y': y, 'note': note})
|
|
|
+ return points
|
|
|
+
|
|
|
+
|
|
|
+def estimate_initial_ptz(x_ratio: float, y_ratio: float) -> tuple:
|
|
|
+ """用粗略线性模型估计初始 PTZ 角度"""
|
|
|
+ pan = 340.0 - 160.0 * x_ratio
|
|
|
+ tilt = 25.0 - 40.0 * y_ratio
|
|
|
+ return pan, tilt
|
|
|
+
|
|
|
+
|
|
|
+def capture_ptz_frame(ptz: PTZCamera, attempts: int = 15) -> np.ndarray:
|
|
|
+ """抓取清晰的 PTZ 帧"""
|
|
|
+ best_frame = None
|
|
|
+ best_var = 0
|
|
|
+ for _ in range(attempts):
|
|
|
+ frame = ptz.get_frame()
|
|
|
+ if frame is not None:
|
|
|
+ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
+ var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
|
|
+ if var > best_var:
|
|
|
+ best_var = var
|
|
|
+ best_frame = frame
|
|
|
+ if best_var > 200:
|
|
|
+ break
|
|
|
+ time.sleep(0.05)
|
|
|
+ return best_frame
|
|
|
+
|
|
|
+
|
|
|
+def save_panorama_with_marks(panorama: np.ndarray, points: list, output_path: Path):
|
|
|
+ """在全景图上标记所有标定点"""
|
|
|
+ vis = panorama.copy()
|
|
|
+ h, w = vis.shape[:2]
|
|
|
+ for i, p in enumerate(points):
|
|
|
+ cx, cy = int(p['x'] * w), int(p['y'] * h)
|
|
|
+ cv2.circle(vis, (cx, cy), 30, (0, 0, 255), 4)
|
|
|
+ cv2.putText(vis, f"P{i+1}", (cx + 40, cy),
|
|
|
+ cv2.FONT_HERSHEY_SIMPLEX, 2.0, (0, 255, 0), 4)
|
|
|
+ cv2.imwrite(str(output_path), vis)
|
|
|
+ return output_path
|
|
|
+
|
|
|
+
|
|
|
+def save_ptz_shot(frame: np.ndarray, shots_dir: Path, latest_path: Path,
|
|
|
+ point_idx: int, pan: float, tilt: float):
|
|
|
+ """保存 PTZ 历史图和 latest_ptz.jpg"""
|
|
|
+ history_path = shots_dir / f'P{point_idx:02d}_p{int(pan)}_t{int(tilt)}.jpg'
|
|
|
+ cv2.imwrite(str(history_path), frame)
|
|
|
+ cv2.imwrite(str(latest_path), frame)
|
|
|
+ return history_path
|
|
|
+
|
|
|
+
|
|
|
+def generate_calibration(calib_points: list, output_path: Path):
|
|
|
+ """从标定点生成 calibration_group1.json"""
|
|
|
+ if len(calib_points) < 4:
|
|
|
+ print(f"标定点不足 ({len(calib_points)} 个),无法生成校准文件")
|
|
|
+ return False
|
|
|
+
|
|
|
+ pan_entries = sorted([[p['x'], float(p['pan'])] for p in calib_points], key=lambda item: item[0])
|
|
|
+ tilt_entries = sorted([[p['y'], float(p['tilt'])] for p in calib_points], key=lambda item: item[0])
|
|
|
+
|
|
|
+ def merge_entries(entries):
|
|
|
+ merged = []
|
|
|
+ for x, v in entries:
|
|
|
+ if merged and abs(merged[-1][0] - x) < 0.001:
|
|
|
+ merged[-1] = [x, (merged[-1][1] + v) / 2]
|
|
|
+ else:
|
|
|
+ merged.append([x, v])
|
|
|
+ return merged
|
|
|
+
|
|
|
+ pan_lookup = merge_entries(pan_entries)
|
|
|
+ tilt_lookup = merge_entries(tilt_entries)
|
|
|
+
|
|
|
+ X = np.array([[1.0, p['x'], p['y']] for p in calib_points])
|
|
|
+ pans = np.array([p['pan'] for p in calib_points])
|
|
|
+ tilts = np.array([p['tilt'] for p in calib_points])
|
|
|
+ pan_params, _, _, _ = np.linalg.lstsq(X, pans, rcond=None)
|
|
|
+ tilt_params, _, _, _ = np.linalg.lstsq(X, tilts, rcond=None)
|
|
|
+
|
|
|
+ total_err = 0.0
|
|
|
+ for p in calib_points:
|
|
|
+ pred_pan = pan_params[0] + pan_params[1]*p['x'] + pan_params[2]*p['y']
|
|
|
+ pred_tilt = tilt_params[0] + tilt_params[1]*p['x'] + tilt_params[2]*p['y']
|
|
|
+ err = ((pred_pan - p['pan'])**2 + (pred_tilt - p['tilt'])**2) ** 0.5
|
|
|
+ total_err += err ** 2
|
|
|
+ rms = (total_err / len(calib_points)) ** 0.5
|
|
|
+
|
|
|
+ calibration = {
|
|
|
+ 'pan_offset': float(pan_params[0]),
|
|
|
+ 'pan_scale_x': float(pan_params[1]),
|
|
|
+ 'pan_scale_y': float(pan_params[2]),
|
|
|
+ 'tilt_offset': float(tilt_params[0]),
|
|
|
+ 'tilt_scale_x': float(tilt_params[1]),
|
|
|
+ 'tilt_scale_y': float(tilt_params[2]),
|
|
|
+ 'rms_error': float(rms),
|
|
|
+ 'pan_rms_error': 0.0,
|
|
|
+ 'tilt_rms_error': 0.0,
|
|
|
+ 'pan_lookup': pan_lookup,
|
|
|
+ 'tilt_lookup': tilt_lookup,
|
|
|
+ 'overlap_ranges': [{
|
|
|
+ 'pan_start': 180.0,
|
|
|
+ 'pan_end': 340.0,
|
|
|
+ 'tilt_start': -35.0,
|
|
|
+ 'tilt_end': 45.0,
|
|
|
+ }],
|
|
|
+ 'mount_type': 'wall',
|
|
|
+ 'tilt_flip': False,
|
|
|
+ 'pan_flip': False,
|
|
|
+ 'generated_from': 'manual_interactive_calibration',
|
|
|
+ 'note': 'x_ratio,y_ratio 为全景图归一化坐标;transform 输入归一化坐标',
|
|
|
+ 'calib_points': calib_points,
|
|
|
+ }
|
|
|
+
|
|
|
+ with open(output_path, 'w', encoding='utf-8') as f:
|
|
|
+ json.dump(calibration, f, indent=2, ensure_ascii=False)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+# Web UI HTML
|
|
|
+INDEX_HTML = r"""<!DOCTYPE html>
|
|
|
+<html lang="zh-CN">
|
|
|
+<head>
|
|
|
+ <meta charset="utf-8">
|
|
|
+ <meta name="viewport" content="width=device-width, initial-scale=1">
|
|
|
+ <title>PTZ 手动标定</title>
|
|
|
+ <style>
|
|
|
+ * { box-sizing: border-box; }
|
|
|
+ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Arial, sans-serif; margin: 0; padding: 16px; background: #0f172a; color: #e2e8f0; }
|
|
|
+ h1 { margin: 0 0 12px; font-size: 22px; color: #38bdf8; }
|
|
|
+ .status { background: #1e293b; border-radius: 10px; padding: 14px 18px; margin-bottom: 16px; }
|
|
|
+ .status-row { display: flex; flex-wrap: wrap; gap: 18px; }
|
|
|
+ .status-item { font-size: 15px; }
|
|
|
+ .status-item span { color: #94a3b8; }
|
|
|
+ .images { display: grid; grid-template-columns: minmax(320px, 1fr) minmax(320px, 1fr); gap: 16px; margin-bottom: 16px; }
|
|
|
+ .img-box { background: #1e293b; border-radius: 10px; padding: 12px; }
|
|
|
+ .img-box h3 { margin: 0 0 10px; font-size: 15px; color: #94a3b8; }
|
|
|
+ .img-box img { width: 100%; border-radius: 6px; background: #0b1120; }
|
|
|
+ .controls { background: #1e293b; border-radius: 10px; padding: 16px; }
|
|
|
+ .control-group { margin-bottom: 14px; }
|
|
|
+ .control-group label { display: block; font-size: 13px; color: #94a3b8; margin-bottom: 8px; }
|
|
|
+ .btn-row { display: flex; flex-wrap: wrap; gap: 8px; }
|
|
|
+ button { border: none; border-radius: 6px; padding: 10px 16px; font-size: 14px; cursor: pointer; transition: transform .05s; }
|
|
|
+ button:active { transform: scale(0.96); }
|
|
|
+ .btn-pan { background: #3b82f6; color: white; }
|
|
|
+ .btn-tilt { background: #8b5cf6; color: white; }
|
|
|
+ .btn-ok { background: #22c55e; color: white; font-weight: bold; padding: 12px 28px; }
|
|
|
+ .btn-skip { background: #f59e0b; color: white; }
|
|
|
+ .btn-quit { background: #ef4444; color: white; }
|
|
|
+ .message { margin-top: 12px; padding: 10px; border-radius: 6px; background: #0f172a; font-size: 14px; min-height: 20px; }
|
|
|
+ .message.waiting { color: #facc15; }
|
|
|
+ .message.done { color: #4ade80; }
|
|
|
+ @media (max-width: 900px) { .images { grid-template-columns: 1fr; } }
|
|
|
+ </style>
|
|
|
+</head>
|
|
|
+<body>
|
|
|
+ <h1>🔭 PTZ-全景手动标定</h1>
|
|
|
+ <div class="status">
|
|
|
+ <div class="status-row">
|
|
|
+ <div class="status-item"><span>当前点:</span><strong id="point">- / -</strong></div>
|
|
|
+ <div class="status-item"><span>PAN:</span><strong id="pan">-</strong>°</div>
|
|
|
+ <div class="status-item"><span>TILT:</span><strong id="tilt">-</strong>°</div>
|
|
|
+ <div class="status-item"><span>状态:</span><strong id="status">连接中...</strong></div>
|
|
|
+ </div>
|
|
|
+ </div>
|
|
|
+
|
|
|
+ <div class="images">
|
|
|
+ <div class="img-box">
|
|
|
+ <h3>实时 PTZ 画面</h3>
|
|
|
+ <img id="ptz-img" src="ptz_shots/latest_ptz.jpg" alt="ptz">
|
|
|
+ </div>
|
|
|
+ <div class="img-box">
|
|
|
+ <h3>全景标定点标记</h3>
|
|
|
+ <img id="pano-img" src="panorama_marks.jpg" alt="panorama">
|
|
|
+ </div>
|
|
|
+ </div>
|
|
|
+
|
|
|
+ <div class="controls">
|
|
|
+ <div class="control-group">
|
|
|
+ <label>水平方向 (Pan)</label>
|
|
|
+ <div class="btn-row">
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p-10')">← 10°</button>
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p-5')">← 5°</button>
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p-3')">← 3°</button>
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p+3')">3° →</button>
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p+5')">5° →</button>
|
|
|
+ <button class="btn-pan" onclick="sendCmd('p+10')">10° →</button>
|
|
|
+ </div>
|
|
|
+ </div>
|
|
|
+ <div class="control-group">
|
|
|
+ <label>垂直方向 (Tilt)</label>
|
|
|
+ <div class="btn-row">
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t-10')">↓ 10°</button>
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t-5')">↓ 5°</button>
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t-3')">↓ 3°</button>
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t+3')">↑ 3°</button>
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t+5')">↑ 5°</button>
|
|
|
+ <button class="btn-tilt" onclick="sendCmd('t+10')">↑ 10°</button>
|
|
|
+ </div>
|
|
|
+ </div>
|
|
|
+ <div class="control-group">
|
|
|
+ <label>操作</label>
|
|
|
+ <div class="btn-row">
|
|
|
+ <button class="btn-ok" onclick="sendCmd('ok')">✓ 确认该点</button>
|
|
|
+ <button class="btn-skip" onclick="sendCmd('skip')">跳过该点</button>
|
|
|
+ <button class="btn-quit" onclick="sendCmd('quit')">退出标定</button>
|
|
|
+ </div>
|
|
|
+ </div>
|
|
|
+ <div class="message" id="message">等待服务器连接...</div>
|
|
|
+ </div>
|
|
|
+
|
|
|
+ <script>
|
|
|
+ function sendCmd(cmd) {
|
|
|
+ fetch('/api/command', {
|
|
|
+ method: 'POST',
|
|
|
+ headers: {'Content-Type': 'application/json'},
|
|
|
+ body: JSON.stringify({command: cmd})
|
|
|
+ }).then(r => r.json()).then(data => {
|
|
|
+ updateStatus(data);
|
|
|
+ }).catch(err => {
|
|
|
+ document.getElementById('message').textContent = '发送失败: ' + err;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ function updateStatus(data) {
|
|
|
+ document.getElementById('point').textContent = data.point_idx + ' / ' + data.total_points;
|
|
|
+ document.getElementById('pan').textContent = data.pan.toFixed(1);
|
|
|
+ document.getElementById('tilt').textContent = data.tilt.toFixed(1);
|
|
|
+ document.getElementById('status').textContent = data.status_text;
|
|
|
+ document.getElementById('message').textContent = data.message;
|
|
|
+ document.getElementById('message').className = 'message ' + (data.status === 'waiting' ? 'waiting' : '');
|
|
|
+ const ts = data.image_ts || Date.now();
|
|
|
+ document.getElementById('ptz-img').src = 'ptz_shots/latest_ptz.jpg?t=' + ts;
|
|
|
+ }
|
|
|
+
|
|
|
+ async function poll() {
|
|
|
+ try {
|
|
|
+ const res = await fetch('/api/status');
|
|
|
+ const data = await res.json();
|
|
|
+ updateStatus(data);
|
|
|
+ } catch (e) {
|
|
|
+ document.getElementById('message').textContent = '状态获取失败: ' + e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ setInterval(poll, 1200);
|
|
|
+ setInterval(() => {
|
|
|
+ const img = document.getElementById('ptz-img');
|
|
|
+ img.src = 'ptz_shots/latest_ptz.jpg?t=' + Date.now();
|
|
|
+ }, 1500);
|
|
|
+ poll();
|
|
|
+ </script>
|
|
|
+</body>
|
|
|
+</html>
|
|
|
+"""
|
|
|
+
|
|
|
+
|
|
|
+class CalibHTTPHandler(http.server.SimpleHTTPRequestHandler):
|
|
|
+ """自定义 HTTP Handler,支持 API 路由"""
|
|
|
+
|
|
|
+ def __init__(self, *args, directory=None, **kwargs):
|
|
|
+ self.directory = directory
|
|
|
+ super().__init__(*args, directory=str(directory), **kwargs)
|
|
|
+
|
|
|
+ def log_message(self, format, *args):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def do_GET(self):
|
|
|
+ parsed = urlparse(self.path)
|
|
|
+ path = parsed.path
|
|
|
+
|
|
|
+ if path == '/' or path == '/index.html':
|
|
|
+ self.send_html(INDEX_HTML)
|
|
|
+ elif path == '/api/status':
|
|
|
+ self.send_json(calib_state)
|
|
|
+ else:
|
|
|
+ # 静态文件(图片等)
|
|
|
+ super().do_GET()
|
|
|
+
|
|
|
+ def do_POST(self):
|
|
|
+ parsed = urlparse(self.path)
|
|
|
+ if parsed.path == '/api/command':
|
|
|
+ content_length = int(self.headers.get('Content-Length', 0))
|
|
|
+ body = self.rfile.read(content_length).decode('utf-8')
|
|
|
+ try:
|
|
|
+ data = json.loads(body)
|
|
|
+ cmd = data.get('command', '').strip().lower()
|
|
|
+ if cmd:
|
|
|
+ cmd_queue.put(cmd)
|
|
|
+ calib_state['message'] = f'收到指令: {cmd},执行中...'
|
|
|
+ self.send_json({'ok': True, 'command': cmd})
|
|
|
+ except Exception as e:
|
|
|
+ self.send_json({'ok': False, 'error': str(e)})
|
|
|
+ else:
|
|
|
+ self.send_response(404)
|
|
|
+ self.end_headers()
|
|
|
+
|
|
|
+ def send_html(self, html: str):
|
|
|
+ self.send_response(200)
|
|
|
+ self.send_header('Content-Type', 'text/html; charset=utf-8')
|
|
|
+ self.send_header('Cache-Control', 'no-cache')
|
|
|
+ self.end_headers()
|
|
|
+ self.wfile.write(html.encode('utf-8'))
|
|
|
+
|
|
|
+ def send_json(self, data: dict):
|
|
|
+ self.send_response(200)
|
|
|
+ self.send_header('Content-Type', 'application/json; charset=utf-8')
|
|
|
+ self.send_header('Cache-Control', 'no-cache')
|
|
|
+ self.end_headers()
|
|
|
+ self.wfile.write(json.dumps(data, ensure_ascii=False).encode('utf-8'))
|
|
|
+
|
|
|
+
|
|
|
+def start_http_server(directory: Path, port: int = 8000):
|
|
|
+ """启动后台 HTTP 服务"""
|
|
|
+ def handler_factory(*args, **kwargs):
|
|
|
+ return CalibHTTPHandler(*args, directory=directory, **kwargs)
|
|
|
+
|
|
|
+ socketserver.TCPServer.allow_reuse_address = True
|
|
|
+ httpd = socketserver.TCPServer(("", port), handler_factory)
|
|
|
+ httpd.allow_reuse_address = True
|
|
|
+ thread = threading.Thread(target=httpd.serve_forever, daemon=True)
|
|
|
+ thread.start()
|
|
|
+ return httpd
|
|
|
+
|
|
|
+
|
|
|
+def update_state(point_idx=0, total_points=0, pan=0.0, tilt=0.0, zoom=1,
|
|
|
+ note='', status='initializing', message=''):
|
|
|
+ calib_state['point_idx'] = point_idx
|
|
|
+ calib_state['total_points'] = total_points
|
|
|
+ calib_state['pan'] = pan
|
|
|
+ calib_state['tilt'] = tilt
|
|
|
+ calib_state['zoom'] = zoom
|
|
|
+ calib_state['note'] = note
|
|
|
+ calib_state['status'] = status
|
|
|
+ calib_state['image_ts'] = int(time.time() * 1000)
|
|
|
+
|
|
|
+ status_map = {
|
|
|
+ 'initializing': '初始化',
|
|
|
+ 'moving': '移动中',
|
|
|
+ 'waiting': '等待确认',
|
|
|
+ 'finished': '已完成',
|
|
|
+ 'error': '错误',
|
|
|
+ }
|
|
|
+ calib_state['status_text'] = status_map.get(status, status)
|
|
|
+ calib_state['message'] = message
|
|
|
+
|
|
|
+
|
|
|
+def wait_for_command(timeout: float = 0.5) -> str:
|
|
|
+ """从队列等待命令,支持优雅退出"""
|
|
|
+ while calib_state['running']:
|
|
|
+ try:
|
|
|
+ return cmd_queue.get(timeout=timeout)
|
|
|
+ except queue.Empty:
|
|
|
+ continue
|
|
|
+ return 'quit'
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ parser = argparse.ArgumentParser()
|
|
|
+ parser.add_argument('--points', default='points.txt', help='标定点文件')
|
|
|
+ parser.add_argument('--output', default='/home/admin/dsh/calibration_scan_180_360_z1/manual_calib', help='输出目录')
|
|
|
+ parser.add_argument('--stabilize', type=float, default=1.5, help='移动后稳定秒数')
|
|
|
+ parser.add_argument('--http-port', type=int, default=8000, help='HTTP 预览服务端口')
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ calib_state['http_port'] = args.http_port
|
|
|
+
|
|
|
+ output_dir = Path(args.output)
|
|
|
+ output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ shots_dir = output_dir / 'ptz_shots'
|
|
|
+ shots_dir.mkdir(exist_ok=True)
|
|
|
+ latest_path = shots_dir / 'latest_ptz.jpg'
|
|
|
+
|
|
|
+ # 加载标定点
|
|
|
+ points = load_points(args.points)
|
|
|
+ if not points:
|
|
|
+ print(f"未找到 {args.points},使用默认标定点")
|
|
|
+ points = [
|
|
|
+ {'x': 0.2, 'y': 0.5, 'note': '左侧'},
|
|
|
+ {'x': 0.4, 'y': 0.5, 'note': '左中'},
|
|
|
+ {'x': 0.6, 'y': 0.5, 'note': '右中'},
|
|
|
+ {'x': 0.8, 'y': 0.5, 'note': '右侧'},
|
|
|
+ {'x': 0.5, 'y': 0.3, 'note': '中上'},
|
|
|
+ {'x': 0.5, 'y': 0.7, 'note': '中下'},
|
|
|
+ {'x': 0.5, 'y': 0.9, 'note': '底部'},
|
|
|
+ ]
|
|
|
+ with open(output_dir / 'points_default.txt', 'w') as f:
|
|
|
+ for p in points:
|
|
|
+ f.write(f"{p['x']} {p['y']} {p['note']}\n")
|
|
|
+ else:
|
|
|
+ print(f"从 {args.points} 加载了 {len(points)} 个标定点")
|
|
|
+
|
|
|
+ update_state(total_points=len(points), message='抓取全景图...')
|
|
|
+
|
|
|
+ # 抓取全景图
|
|
|
+ print("抓取全景图...")
|
|
|
+ pano_url = 'rtsp://admin:Aa1234567@192.168.20.196:554/cam/realmonitor?channel=1&subtype=0'
|
|
|
+ cap = cv2.VideoCapture(pano_url, cv2.CAP_FFMPEG)
|
|
|
+ cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
|
|
|
+ panorama = None
|
|
|
+ for _ in range(30):
|
|
|
+ ret, frame = cap.read()
|
|
|
+ if ret and frame is not None:
|
|
|
+ panorama = frame
|
|
|
+ break
|
|
|
+ time.sleep(0.1)
|
|
|
+ cap.release()
|
|
|
+ if panorama is None:
|
|
|
+ print("无法获取全景图")
|
|
|
+ update_state(status='error', message='无法获取全景图')
|
|
|
+ return 1
|
|
|
+ cv2.imwrite(str(output_dir / 'panorama.jpg'), panorama)
|
|
|
+ save_panorama_with_marks(panorama, points, output_dir / 'panorama_marks.jpg')
|
|
|
+
|
|
|
+ # 启动 HTTP 服务
|
|
|
+ try:
|
|
|
+ httpd = start_http_server(output_dir, port=args.http_port)
|
|
|
+ print(f"\nHTTP 预览服务已启动: http://<rk3588-ip>:{args.http_port}/")
|
|
|
+ except OSError as e:
|
|
|
+ print(f"启动 HTTP 服务失败 (端口 {args.http_port}): {e}")
|
|
|
+ httpd = None
|
|
|
+ return 1
|
|
|
+
|
|
|
+ # 初始化 SDK
|
|
|
+ sdk = DahuaSDK(lib_path='/home/admin/dsh/dh/arm/Bin/libdhnetsdk.so')
|
|
|
+ if not sdk.init():
|
|
|
+ print("SDK 初始化失败")
|
|
|
+ return 1
|
|
|
+
|
|
|
+ try:
|
|
|
+ ptz_config = {
|
|
|
+ 'ip': '192.168.20.197',
|
|
|
+ 'port': 37777,
|
|
|
+ 'rtsp_port': 554,
|
|
|
+ 'username': 'admin',
|
|
|
+ 'password': 'Aa1234567',
|
|
|
+ 'channel': 0,
|
|
|
+ 'rtsp_url': 'rtsp://admin:Aa1234567@192.168.20.197:554/cam/realmonitor?channel=1&subtype=1',
|
|
|
+ }
|
|
|
+ ptz = PTZCamera(sdk, ptz_config)
|
|
|
+ if not ptz.connect():
|
|
|
+ print("连接球机失败")
|
|
|
+ return 1
|
|
|
+ if not ptz.start_stream_rtsp():
|
|
|
+ print("启动球机 RTSP 失败")
|
|
|
+ return 1
|
|
|
+
|
|
|
+ calib_points = []
|
|
|
+
|
|
|
+ for i, p in enumerate(points):
|
|
|
+ if not calib_state['running']:
|
|
|
+ break
|
|
|
+
|
|
|
+ print(f"\n标定点 {i+1}/{len(points)}: ({p['x']:.3f}, {p['y']:.3f}) {p.get('note', '')}")
|
|
|
+
|
|
|
+ init_pan, init_tilt = estimate_initial_ptz(p['x'], p['y'])
|
|
|
+ pan, tilt = init_pan, init_tilt
|
|
|
+
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='moving',
|
|
|
+ message=f'移动到 P{i+1} 初始位置...')
|
|
|
+
|
|
|
+ while True:
|
|
|
+ if not calib_state['running']:
|
|
|
+ break
|
|
|
+
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='moving',
|
|
|
+ message=f'移动到 pan={pan:.1f}, tilt={tilt:.1f}...')
|
|
|
+ print(f"移动到 pan={pan:.1f}, tilt={tilt:.1f}, zoom=1")
|
|
|
+ if not ptz.goto_exact_position(pan, tilt, 1):
|
|
|
+ print("移动失败,重试...")
|
|
|
+ time.sleep(1)
|
|
|
+ continue
|
|
|
+ time.sleep(args.stabilize)
|
|
|
+
|
|
|
+ frame = capture_ptz_frame(ptz)
|
|
|
+ if frame is None:
|
|
|
+ print("抓拍失败,重试...")
|
|
|
+ continue
|
|
|
+
|
|
|
+ save_ptz_shot(frame, shots_dir, latest_path, i + 1, pan, tilt)
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='waiting',
|
|
|
+ message=f'P{i+1} 已就位,请在浏览器调整并确认')
|
|
|
+ print("已抓拍,等待 Web UI 指令...")
|
|
|
+
|
|
|
+ cmd = wait_for_command()
|
|
|
+ print(f"收到指令: {cmd}")
|
|
|
+
|
|
|
+ if cmd == 'ok':
|
|
|
+ calib_points.append({
|
|
|
+ 'x': p['x'],
|
|
|
+ 'y': p['y'],
|
|
|
+ 'pan': pan,
|
|
|
+ 'tilt': tilt,
|
|
|
+ 'zoom': 1,
|
|
|
+ 'note': p.get('note', '')
|
|
|
+ })
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='moving',
|
|
|
+ message=f'P{i+1} 已记录,进入下一点')
|
|
|
+ print(f"记录标定点: ({p['x']:.3f}, {p['y']:.3f}) -> ({pan:.1f}, {tilt:.1f})")
|
|
|
+ time.sleep(0.5)
|
|
|
+ break
|
|
|
+ elif cmd == 'skip':
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='moving',
|
|
|
+ message=f'跳过 P{i+1},进入下一点')
|
|
|
+ print("跳过这个点")
|
|
|
+ break
|
|
|
+ elif cmd == 'quit':
|
|
|
+ print("退出标定")
|
|
|
+ calib_state['running'] = False
|
|
|
+ raise KeyboardInterrupt
|
|
|
+ elif cmd.startswith('p') or cmd.startswith('t'):
|
|
|
+ try:
|
|
|
+ delta = float(cmd[1:])
|
|
|
+ if cmd.startswith('p'):
|
|
|
+ pan += delta
|
|
|
+ pan = pan % 360
|
|
|
+ else:
|
|
|
+ tilt += delta
|
|
|
+ tilt = max(-90, min(90, tilt))
|
|
|
+ update_state(point_idx=i+1, total_points=len(points), pan=pan, tilt=tilt,
|
|
|
+ note=p.get('note', ''), status='moving',
|
|
|
+ message=f'调整至 pan={pan:.1f}, tilt={tilt:.1f}')
|
|
|
+ print(f"调整: pan={pan:.1f}, tilt={tilt:.1f}")
|
|
|
+ except ValueError:
|
|
|
+ update_state(message='指令格式错误,请使用按钮')
|
|
|
+ print("指令格式错误")
|
|
|
+ else:
|
|
|
+ update_state(message=f'未知指令: {cmd}')
|
|
|
+ print("未知指令")
|
|
|
+
|
|
|
+ # 保存中间结果
|
|
|
+ with open(output_dir / 'manual_points.json', 'w', encoding='utf-8') as f:
|
|
|
+ json.dump({
|
|
|
+ 'points': calib_points,
|
|
|
+ 'time': datetime.now().isoformat(),
|
|
|
+ }, f, indent=2, ensure_ascii=False)
|
|
|
+
|
|
|
+ # 生成校准文件
|
|
|
+ if len(calib_points) >= 4:
|
|
|
+ generate_calibration(calib_points, output_dir / 'calibration_group1_manual.json')
|
|
|
+ update_state(status='finished',
|
|
|
+ message=f'标定完成,{len(calib_points)} 个有效点,已生成校准文件')
|
|
|
+ print(f"\n标定完成,共 {len(calib_points)} 个有效点")
|
|
|
+ print(f"校准文件: {output_dir / 'calibration_group1_manual.json'}")
|
|
|
+ else:
|
|
|
+ update_state(status='error',
|
|
|
+ message=f'有效标定点不足 ({len(calib_points)} 个)')
|
|
|
+ print(f"\n有效标定点不足 ({len(calib_points)} 个),未生成校准文件")
|
|
|
+
|
|
|
+ ptz.stop_stream()
|
|
|
+ ptz.disconnect()
|
|
|
+
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ print("\n标定被中断")
|
|
|
+ finally:
|
|
|
+ sdk.cleanup()
|
|
|
+ if httpd is not None:
|
|
|
+ httpd.shutdown()
|
|
|
+ print("HTTP 服务已关闭")
|
|
|
+
|
|
|
+ return 0
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ sys.exit(main())
|