| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642 |
- #!/usr/bin/env python3
- """
- 交互式手动标定脚本(Web UI 版)
- 流程:
- 1. 用户在 panorama.jpg 上选择标定点,写入 points.txt(每行:x_ratio y_ratio [备注])
- 2. 脚本抓取全景图、启动 Web UI
- 3. 脚本控制球机移动到每个点的初始估计位置并抓拍
- 4. 用户在浏览器点击按钮调整 pan/tilt,确认后进入下一点
- 5. 最终生成 calibration_group1_manual.json
- 用法:
- python manual_calibrate.py --points points.txt --output manual_calib --http-port 8000
- """
- import os
- import sys
- import json
- import time
- import argparse
- import http.server
- import socketserver
- import threading
- import queue
- from pathlib import Path
- from datetime import datetime
- from urllib.parse import urlparse, parse_qs
- # 必须在导入 cv2 前设置
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;tcp|threads;1'
- import cv2
- import numpy as np
- sys.path.insert(0, '/home/admin/dsh/dual_camera_system')
- from dahua_sdk import DahuaSDK
- from ptz_camera import PTZCamera
# Shared calibration state. It is mutated by update_state()/main() and
# serialized as-is by the /api/status endpoint, so every value must remain
# JSON-serializable.
calib_state = {
    'running': True,
    'point_idx': 0,
    'total_points': 0,
    'pan': 0.0,
    'tilt': 0.0,
    'zoom': 1,
    'note': '',
    'status': 'initializing',  # initializing, moving, waiting, finished, error
    # Display label for `status`. Present from the start so the status
    # payload always carries the key the Web UI reads; update_state()
    # overwrites it on every call.
    'status_text': '初始化',
    'message': '初始化中...',
    'image_ts': 0,
    'calib_points': [],
    'http_port': 8000,
}

# Commands posted by the Web UI (HTTP handler) and consumed by the
# calibration loop through wait_for_command().
cmd_queue = queue.Queue()
def load_points(points_path: str) -> list:
    """Load calibration points from a text file.

    Each meaningful line has the form ``x_ratio y_ratio [note]``. Blank lines
    and lines starting with ``#`` are ignored. Lines that cannot be parsed, or
    whose ratios fall outside ``[0, 1]`` (this also rejects NaN/inf), are
    reported on stdout and skipped instead of aborting the whole run.

    Args:
        points_path: Path of the points file.

    Returns:
        A list of ``{'x': float, 'y': float, 'note': str}`` dicts in file
        order; empty when the file does not exist or holds no valid point.
    """
    points = []
    if not os.path.exists(points_path):
        return points
    # utf-8-sig drops a leading BOM, which would otherwise be glued to the
    # first number and make float() fail.
    with open(points_path, 'r', encoding='utf-8-sig') as f:
        for line_no, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.split()
            try:
                if len(parts) < 2:
                    raise ValueError('expected at least two fields')
                x = float(parts[0])
                y = float(parts[1])
                # Chained comparisons are False for NaN, so this also
                # filters non-finite values.
                if not (0.0 <= x <= 1.0 and 0.0 <= y <= 1.0):
                    raise ValueError('ratio outside [0, 1]')
            except ValueError:
                print(f"{points_path} 第 {line_no} 行无效,已跳过: {line}")
                continue
            points.append({'x': x, 'y': y, 'note': ' '.join(parts[2:])})
    return points
def estimate_initial_ptz(x_ratio: float, y_ratio: float) -> tuple:
    """Return a rough ``(pan, tilt)`` starting guess for a panorama point.

    Both angles come from fixed linear maps of the normalized coordinates:
    pan falls from 340 to 180 as ``x_ratio`` goes 0 -> 1, and tilt falls from
    25 to -15 as ``y_ratio`` goes 0 -> 1.
    """
    return (340.0 - 160.0 * x_ratio, 25.0 - 40.0 * y_ratio)
def capture_ptz_frame(ptz: PTZCamera, attempts: int = 15) -> np.ndarray:
    """Grab the sharpest frame out of up to ``attempts`` reads.

    Sharpness is scored as the variance of the Laplacian of the grayscale
    frame. Sampling stops early once a frame scores above 200; otherwise the
    best-scoring frame seen is returned.

    Args:
        ptz: Camera object whose ``get_frame()`` returns a BGR image or None.
        attempts: Maximum number of frames to sample.

    Returns:
        The best frame, or None when no read returned a frame.
    """
    best_frame = None
    # Start below any possible variance: a perfectly flat frame scores 0.0
    # and must still be accepted, otherwise the caller would retry forever
    # on a black/covered image.
    best_var = -1.0
    for _ in range(attempts):
        frame = ptz.get_frame()
        if frame is not None:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            var = cv2.Laplacian(gray, cv2.CV_64F).var()
            if var > best_var:
                best_var = var
                best_frame = frame
            if best_var > 200:
                break
        # Short pause so the next read has a chance to return a newer frame.
        time.sleep(0.05)
    return best_frame
def save_panorama_with_marks(panorama: np.ndarray, points: list, output_path: Path):
    """Draw every calibration point on a copy of the panorama and save it.

    Each point gets a red circle at its pixel position plus a green ``P<n>``
    label (1-based) to the right of it. The input image is left untouched.

    Returns:
        ``output_path``, unchanged.
    """
    canvas = panorama.copy()
    height, width = canvas.shape[:2]
    for number, point in enumerate(points, start=1):
        # Normalized ratios -> pixel coordinates (truncated to int).
        center_x = int(point['x'] * width)
        center_y = int(point['y'] * height)
        cv2.circle(canvas, (center_x, center_y), 30, (0, 0, 255), 4)
        cv2.putText(
            canvas,
            f"P{number}",
            (center_x + 40, center_y),
            cv2.FONT_HERSHEY_SIMPLEX,
            2.0,
            (0, 255, 0),
            4,
        )
    cv2.imwrite(str(output_path), canvas)
    return output_path
def save_ptz_shot(frame: np.ndarray, shots_dir: Path, latest_path: Path,
                  point_idx: int, pan: float, tilt: float):
    """Save a PTZ frame as a history shot and as the "latest" preview image.

    Args:
        frame: Image to write.
        shots_dir: Directory receiving the per-position history file.
        latest_path: Path of the preview image.
        point_idx: Calibration point number used in the history file name.
        pan: Pan angle; truncated to int for the file name.
        tilt: Tilt angle; truncated to int for the file name.

    Returns:
        Path of the history image.
    """
    history_path = shots_dir / f'P{point_idx:02d}_p{int(pan)}_t{int(tilt)}.jpg'
    cv2.imwrite(str(history_path), frame)

    # The preview file is fetched by the browser while the calibration loop
    # keeps rewriting it. Encode into a sibling file first and swap it in with
    # os.replace() so a reader never observes a half-written JPEG. The staging
    # name keeps the original suffix because imwrite picks the encoder from it.
    staging_path = latest_path.with_name(
        f'.{latest_path.stem}.tmp{latest_path.suffix}')
    if cv2.imwrite(str(staging_path), frame):
        os.replace(staging_path, latest_path)
    return history_path
def generate_calibration(calib_points: list, output_path: Path):
    """Fit a panorama -> PTZ mapping and write it to ``output_path`` as JSON.

    Two artefacts are derived from the confirmed points:

    * ``pan_lookup`` / ``tilt_lookup``: ``[ratio, angle]`` tables sorted by
      ratio (x for pan, y for tilt). Consecutive entries whose ratios differ
      by less than 0.001 are merged into one entry holding the arithmetic
      mean of their angles.
    * A least-squares affine model ``angle = offset + sx * x + sy * y`` for
      pan and for tilt, together with its RMS errors.

    Args:
        calib_points: Dicts with at least ``x``, ``y``, ``pan`` and ``tilt``.
        output_path: Destination of the JSON calibration file.

    Returns:
        False (nothing written) when fewer than 4 points are given, else True.
    """
    if len(calib_points) < 4:
        print(f"标定点不足 ({len(calib_points)} 个),无法生成校准文件")
        return False

    def merge_entries(entries):
        # Track sum and count per group so a group of 3+ near-duplicates gets
        # a true mean rather than a running pairwise average that
        # over-weights the later samples.
        groups = []  # [key, angle_sum, count]
        for key, angle in entries:
            if groups and abs(groups[-1][0] - key) < 0.001:
                groups[-1][0] = key
                groups[-1][1] += angle
                groups[-1][2] += 1
            else:
                groups.append([key, angle, 1])
        return [[key, total / count] for key, total, count in groups]

    pan_lookup = merge_entries(sorted(
        ([p['x'], float(p['pan'])] for p in calib_points),
        key=lambda item: item[0]))
    tilt_lookup = merge_entries(sorted(
        ([p['y'], float(p['tilt'])] for p in calib_points),
        key=lambda item: item[0]))

    # Design matrix for the affine model: one row [1, x, y] per point.
    X = np.array([[1.0, p['x'], p['y']] for p in calib_points], dtype=float)
    pans = np.array([p['pan'] for p in calib_points], dtype=float)
    tilts = np.array([p['tilt'] for p in calib_points], dtype=float)
    pan_params = np.linalg.lstsq(X, pans, rcond=None)[0]
    tilt_params = np.linalg.lstsq(X, tilts, rcond=None)[0]

    # Residuals of the fitted model on the calibration points themselves.
    pan_sq = (X @ pan_params - pans) ** 2
    tilt_sq = (X @ tilt_params - tilts) ** 2
    pan_rms = float(np.sqrt(np.mean(pan_sq)))
    tilt_rms = float(np.sqrt(np.mean(tilt_sq)))
    # Combined error: RMS of the per-point Euclidean (pan, tilt) distance.
    rms = float(np.sqrt(np.mean(pan_sq + tilt_sq)))

    calibration = {
        'pan_offset': float(pan_params[0]),
        'pan_scale_x': float(pan_params[1]),
        'pan_scale_y': float(pan_params[2]),
        'tilt_offset': float(tilt_params[0]),
        'tilt_scale_x': float(tilt_params[1]),
        'tilt_scale_y': float(tilt_params[2]),
        'rms_error': rms,
        'pan_rms_error': pan_rms,
        'tilt_rms_error': tilt_rms,
        'pan_lookup': pan_lookup,
        'tilt_lookup': tilt_lookup,
        'overlap_ranges': [{
            'pan_start': 180.0,
            'pan_end': 340.0,
            'tilt_start': -35.0,
            'tilt_end': 45.0,
        }],
        'mount_type': 'wall',
        'tilt_flip': False,
        'pan_flip': False,
        'generated_from': 'manual_interactive_calibration',
        'note': 'x_ratio,y_ratio 为全景图归一化坐标;transform 输入归一化坐标',
        'calib_points': calib_points,
    }
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(calibration, f, indent=2, ensure_ascii=False)
    return True
# Single-page Web UI served at "/" and "/index.html".
# The page polls GET /api/status to refresh the read-outs and the PTZ image,
# and sends button presses to POST /api/command as {"command": "..."}.
# A command reply is only an acknowledgement, so sendCmd() checks its `ok`
# flag and then re-polls /api/status instead of rendering the reply as if it
# were a full status payload.
INDEX_HTML = r"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>PTZ 手动标定</title>
<style>
* { box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Arial, sans-serif; margin: 0; padding: 16px; background: #0f172a; color: #e2e8f0; }
h1 { margin: 0 0 12px; font-size: 22px; color: #38bdf8; }
.status { background: #1e293b; border-radius: 10px; padding: 14px 18px; margin-bottom: 16px; }
.status-row { display: flex; flex-wrap: wrap; gap: 18px; }
.status-item { font-size: 15px; }
.status-item span { color: #94a3b8; }
.images { display: grid; grid-template-columns: minmax(320px, 1fr) minmax(320px, 1fr); gap: 16px; margin-bottom: 16px; }
.img-box { background: #1e293b; border-radius: 10px; padding: 12px; }
.img-box h3 { margin: 0 0 10px; font-size: 15px; color: #94a3b8; }
.img-box img { width: 100%; border-radius: 6px; background: #0b1120; }
.controls { background: #1e293b; border-radius: 10px; padding: 16px; }
.control-group { margin-bottom: 14px; }
.control-group label { display: block; font-size: 13px; color: #94a3b8; margin-bottom: 8px; }
.btn-row { display: flex; flex-wrap: wrap; gap: 8px; }
button { border: none; border-radius: 6px; padding: 10px 16px; font-size: 14px; cursor: pointer; transition: transform .05s; }
button:active { transform: scale(0.96); }
.btn-pan { background: #3b82f6; color: white; }
.btn-tilt { background: #8b5cf6; color: white; }
.btn-ok { background: #22c55e; color: white; font-weight: bold; padding: 12px 28px; }
.btn-skip { background: #f59e0b; color: white; }
.btn-quit { background: #ef4444; color: white; }
.message { margin-top: 12px; padding: 10px; border-radius: 6px; background: #0f172a; font-size: 14px; min-height: 20px; }
.message.waiting { color: #facc15; }
.message.done { color: #4ade80; }
@media (max-width: 900px) { .images { grid-template-columns: 1fr; } }
</style>
</head>
<body>
<h1>🔭 PTZ-全景手动标定</h1>
<div class="status">
  <div class="status-row">
    <div class="status-item"><span>当前点:</span><strong id="point">- / -</strong></div>
    <div class="status-item"><span>PAN:</span><strong id="pan">-</strong>°</div>
    <div class="status-item"><span>TILT:</span><strong id="tilt">-</strong>°</div>
    <div class="status-item"><span>状态:</span><strong id="status">连接中...</strong></div>
  </div>
</div>
<div class="images">
  <div class="img-box">
    <h3>实时 PTZ 画面</h3>
    <img id="ptz-img" src="ptz_shots/latest_ptz.jpg" alt="ptz">
  </div>
  <div class="img-box">
    <h3>全景标定点标记</h3>
    <img id="pano-img" src="panorama_marks.jpg" alt="panorama">
  </div>
</div>
<div class="controls">
  <div class="control-group">
    <label>水平方向 (Pan)</label>
    <div class="btn-row">
      <button class="btn-pan" onclick="sendCmd('p-10')">← 10°</button>
      <button class="btn-pan" onclick="sendCmd('p-5')">← 5°</button>
      <button class="btn-pan" onclick="sendCmd('p-3')">← 3°</button>
      <button class="btn-pan" onclick="sendCmd('p+3')">3° →</button>
      <button class="btn-pan" onclick="sendCmd('p+5')">5° →</button>
      <button class="btn-pan" onclick="sendCmd('p+10')">10° →</button>
    </div>
  </div>
  <div class="control-group">
    <label>垂直方向 (Tilt)</label>
    <div class="btn-row">
      <button class="btn-tilt" onclick="sendCmd('t-10')">↓ 10°</button>
      <button class="btn-tilt" onclick="sendCmd('t-5')">↓ 5°</button>
      <button class="btn-tilt" onclick="sendCmd('t-3')">↓ 3°</button>
      <button class="btn-tilt" onclick="sendCmd('t+3')">↑ 3°</button>
      <button class="btn-tilt" onclick="sendCmd('t+5')">↑ 5°</button>
      <button class="btn-tilt" onclick="sendCmd('t+10')">↑ 10°</button>
    </div>
  </div>
  <div class="control-group">
    <label>操作</label>
    <div class="btn-row">
      <button class="btn-ok" onclick="sendCmd('ok')">✓ 确认该点</button>
      <button class="btn-skip" onclick="sendCmd('skip')">跳过该点</button>
      <button class="btn-quit" onclick="sendCmd('quit')">退出标定</button>
    </div>
  </div>
  <div class="message" id="message">等待服务器连接...</div>
</div>
<script>
function sendCmd(cmd) {
  fetch('/api/command', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({command: cmd})
  }).then(r => r.json()).then(data => {
    if (data.ok === false) {
      document.getElementById('message').textContent = '指令被拒绝: ' + (data.error || '未知错误');
      return;
    }
    poll();
  }).catch(err => {
    document.getElementById('message').textContent = '发送失败: ' + err;
  });
}
function updateStatus(data) {
  document.getElementById('point').textContent = data.point_idx + ' / ' + data.total_points;
  document.getElementById('pan').textContent = data.pan.toFixed(1);
  document.getElementById('tilt').textContent = data.tilt.toFixed(1);
  document.getElementById('status').textContent = data.status_text;
  document.getElementById('message').textContent = data.message;
  const cls = data.status === 'waiting' ? 'waiting' : (data.status === 'finished' ? 'done' : '');
  document.getElementById('message').className = 'message ' + cls;
  const ts = data.image_ts || Date.now();
  document.getElementById('ptz-img').src = 'ptz_shots/latest_ptz.jpg?t=' + ts;
}
async function poll() {
  try {
    const res = await fetch('/api/status');
    const data = await res.json();
    updateStatus(data);
  } catch (e) {
    document.getElementById('message').textContent = '状态获取失败: ' + e;
  }
}
setInterval(poll, 1200);
setInterval(() => {
  const img = document.getElementById('ptz-img');
  img.src = 'ptz_shots/latest_ptz.jpg?t=' + Date.now();
}, 1500);
poll();
</script>
</body>
</html>
"""
class CalibHTTPHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler for the calibration Web UI.

    Routes:
        GET  /, /index.html  -> the embedded single-page UI (INDEX_HTML)
        GET  /api/status     -> JSON snapshot of ``calib_state``
        POST /api/command    -> enqueue ``{"command": "..."}`` on ``cmd_queue``
        GET  anything else   -> static file from ``directory`` (images)
    """

    def __init__(self, *args, directory=None, **kwargs):
        # Only stringify a real path: str(None) would hand the base class a
        # directory literally named "None" instead of its own default.
        if directory is not None:
            directory = str(directory)
        super().__init__(*args, directory=directory, **kwargs)

    def log_message(self, format, *args):
        """Drop per-request access logs so polling does not flood the console."""
        pass

    def do_GET(self):
        """Serve the UI page, the status API, or fall back to static files."""
        path = urlparse(self.path).path
        if path in ('/', '/index.html'):
            self.send_html(INDEX_HTML)
        elif path == '/api/status':
            # Serialize a copy: the calibration thread mutates the live dict.
            self.send_json(dict(calib_state))
        else:
            # Static files (images etc.)
            super().do_GET()

    def do_POST(self):
        """Accept a UI command and queue it for the calibration loop.

        Malformed requests and empty commands get a 400 reply with
        ``ok: false``. An accepted command is acknowledged with the current
        state plus ``ok`` and ``command``, so a client may render the reply
        like a status payload.
        """
        if urlparse(self.path).path != '/api/command':
            self.send_response(404)
            self.end_headers()
            return
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                # read(-1) would block until the peer closes the connection.
                raise ValueError('negative Content-Length')
            body = self.rfile.read(content_length).decode('utf-8')
            cmd = json.loads(body).get('command', '').strip().lower()
        except (ValueError, AttributeError, TypeError) as e:
            # ValueError also covers JSON and UTF-8 decoding errors;
            # AttributeError covers non-object bodies / non-string commands.
            self.send_json({'ok': False, 'error': str(e)}, status=400)
            return
        if not cmd:
            self.send_json({'ok': False, 'error': 'empty command'}, status=400)
            return
        cmd_queue.put(cmd)
        calib_state['message'] = f'收到指令: {cmd},执行中...'
        self.send_json({**calib_state, 'ok': True, 'command': cmd})

    def send_html(self, html: str):
        """Send ``html`` as an uncached UTF-8 text/html response."""
        payload = html.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_json(self, data: dict, status: int = 200):
        """Send ``data`` as an uncached UTF-8 JSON response with ``status``."""
        payload = json.dumps(data, ensure_ascii=False).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json; charset=utf-8')
        self.send_header('Cache-Control', 'no-cache')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
def start_http_server(directory: Path, port: int = 8000):
    """Start the Web UI HTTP server on a background daemon thread.

    Args:
        directory: Root directory for static files served by the handler.
        port: TCP port to listen on, on all interfaces.

    Returns:
        The running server object; call ``shutdown()`` on it to stop serving.

    Raises:
        OSError: If the port cannot be bound.
    """
    def handler_factory(*args, **kwargs):
        return CalibHTTPHandler(*args, directory=directory, **kwargs)

    class _CalibServer(socketserver.ThreadingTCPServer):
        # One thread per connection: a client that connects without sending
        # a request can no longer block every other request.
        # Both options are class attributes because allow_reuse_address is
        # consulted while binding inside __init__; setting it on the instance
        # afterwards has no effect. Using a local subclass also avoids
        # changing socketserver.TCPServer globally.
        allow_reuse_address = True
        daemon_threads = True

    httpd = _CalibServer(("", port), handler_factory)
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    return httpd
def update_state(point_idx=None, total_points=None, pan=None, tilt=None,
                 zoom=None, note=None, status=None, message=None):
    """Update the shared ``calib_state`` published to the Web UI.

    Only the fields that are actually passed are written; omitted (None)
    arguments keep their current value. This makes partial calls such as
    ``update_state(message=...)`` safe: they no longer reset the point index,
    angles and status back to defaults.

    Every call also refreshes ``image_ts`` (current time in milliseconds) and
    recomputes ``status_text``, the display label for the current status.
    """
    updates = {
        'point_idx': point_idx,
        'total_points': total_points,
        'pan': pan,
        'tilt': tilt,
        'zoom': zoom,
        'note': note,
        'status': status,
        'message': message,
    }
    for key, value in updates.items():
        if value is not None:
            calib_state[key] = value
    calib_state['image_ts'] = int(time.time() * 1000)

    status_map = {
        'initializing': '初始化',
        'moving': '移动中',
        'waiting': '等待确认',
        'finished': '已完成',
        'error': '错误',
    }
    # Unknown status codes are shown verbatim.
    current_status = calib_state['status']
    calib_state['status_text'] = status_map.get(current_status, current_status)
def wait_for_command(timeout: float = 0.5) -> str:
    """Block until a UI command arrives and return it.

    The queue is polled in slices of ``timeout`` seconds so that the
    ``calib_state['running']`` flag is re-checked between waits; once the flag
    is false, ``'quit'`` is returned instead of a queued command.
    """
    while True:
        if not calib_state['running']:
            return 'quit'
        try:
            command = cmd_queue.get(timeout=timeout)
        except queue.Empty:
            pass
        else:
            return command
def main():
    """Run one interactive calibration session.

    Steps:
        1. Parse arguments and load the calibration points (falling back to a
           built-in default set, which is also written to points_default.txt).
        2. Grab one panorama frame over RTSP and save it with the points
           marked.
        3. Start the Web UI HTTP server.
        4. For each point: move the PTZ camera to the estimated position,
           capture a frame, then apply UI commands (pan/tilt nudges, ok,
           skip, quit) until the point is confirmed or skipped.
        5. Write manual_points.json after every point and, with at least 4
           confirmed points, calibration_group1_manual.json.

    Returns:
        1 when setup fails (no panorama, HTTP port unavailable, SDK or camera
        connection failure); otherwise 0, including when the session is
        interrupted or ends with too few points.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--points', default='points.txt', help='标定点文件')
    parser.add_argument('--output', default='/home/admin/dsh/calibration_scan_180_360_z1/manual_calib', help='输出目录')
    parser.add_argument('--stabilize', type=float, default=1.5, help='移动后稳定秒数')
    parser.add_argument('--http-port', type=int, default=8000, help='HTTP 预览服务端口')
    args = parser.parse_args()
    calib_state['http_port'] = args.http_port

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    shots_dir = output_dir / 'ptz_shots'
    shots_dir.mkdir(exist_ok=True)
    latest_path = shots_dir / 'latest_ptz.jpg'

    # Load the calibration points.
    points = load_points(args.points)
    if not points:
        print(f"未找到 {args.points},使用默认标定点")
        points = [
            {'x': 0.2, 'y': 0.5, 'note': '左侧'},
            {'x': 0.4, 'y': 0.5, 'note': '左中'},
            {'x': 0.6, 'y': 0.5, 'note': '右中'},
            {'x': 0.8, 'y': 0.5, 'note': '右侧'},
            {'x': 0.5, 'y': 0.3, 'note': '中上'},
            {'x': 0.5, 'y': 0.7, 'note': '中下'},
            {'x': 0.5, 'y': 0.9, 'note': '底部'},
        ]
        # Explicit UTF-8: the notes are non-ASCII and load_points() reads
        # UTF-8, so the locale default must not be used here.
        with open(output_dir / 'points_default.txt', 'w', encoding='utf-8') as f:
            for p in points:
                f.write(f"{p['x']} {p['y']} {p['note']}\n")
    else:
        print(f"从 {args.points} 加载了 {len(points)} 个标定点")

    total = len(points)
    # Current position shown in the UI. publish() reads these at call time,
    # so every status update carries the complete state and a message-only
    # update can never reset the point index or the angles.
    point_no, pan, tilt, note = 0, 0.0, 0.0, ''

    def publish(status, message):
        update_state(point_idx=point_no, total_points=total, pan=pan, tilt=tilt,
                     note=note, status=status, message=message)

    publish('initializing', '抓取全景图...')

    # Grab the panorama.
    # NOTE(review): camera credentials are hard-coded in the URLs/config
    # below; consider moving them to arguments or environment variables.
    print("抓取全景图...")
    pano_url = 'rtsp://admin:Aa1234567@192.168.20.196:554/cam/realmonitor?channel=1&subtype=0'
    cap = cv2.VideoCapture(pano_url, cv2.CAP_FFMPEG)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    panorama = None
    for _ in range(30):
        ret, frame = cap.read()
        if ret and frame is not None:
            panorama = frame
            break
        time.sleep(0.1)
    cap.release()
    if panorama is None:
        print("无法获取全景图")
        publish('error', '无法获取全景图')
        return 1
    cv2.imwrite(str(output_dir / 'panorama.jpg'), panorama)
    save_panorama_with_marks(panorama, points, output_dir / 'panorama_marks.jpg')

    # Start the HTTP server.
    try:
        httpd = start_http_server(output_dir, port=args.http_port)
    except OSError as e:
        print(f"启动 HTTP 服务失败 (端口 {args.http_port}): {e}")
        return 1
    print(f"\nHTTP 预览服务已启动: http://<rk3588-ip>:{args.http_port}/")

    # Initialise the SDK.
    sdk = DahuaSDK(lib_path='/home/admin/dsh/dh/arm/Bin/libdhnetsdk.so')
    if not sdk.init():
        print("SDK 初始化失败")
        return 1

    ptz = None
    connected = False
    streaming = False
    try:
        ptz_config = {
            'ip': '192.168.20.197',
            'port': 37777,
            'rtsp_port': 554,
            'username': 'admin',
            'password': 'Aa1234567',
            'channel': 0,
            'rtsp_url': 'rtsp://admin:Aa1234567@192.168.20.197:554/cam/realmonitor?channel=1&subtype=1',
        }
        ptz = PTZCamera(sdk, ptz_config)
        if not ptz.connect():
            print("连接球机失败")
            return 1
        connected = True
        if not ptz.start_stream_rtsp():
            print("启动球机 RTSP 失败")
            return 1
        streaming = True

        calib_points = []
        for i, p in enumerate(points):
            if not calib_state['running']:
                break
            point_no = i + 1
            note = p.get('note', '')
            print(f"\n标定点 {point_no}/{len(points)}: ({p['x']:.3f}, {p['y']:.3f}) {note}")
            pan, tilt = estimate_initial_ptz(p['x'], p['y'])
            publish('moving', f'移动到 P{point_no} 初始位置...')

            # Move / capture / wait-for-command cycle for the current point.
            while True:
                if not calib_state['running']:
                    break
                publish('moving', f'移动到 pan={pan:.1f}, tilt={tilt:.1f}...')
                print(f"移动到 pan={pan:.1f}, tilt={tilt:.1f}, zoom=1")
                if not ptz.goto_exact_position(pan, tilt, 1):
                    print("移动失败,重试...")
                    time.sleep(1)
                    continue
                time.sleep(args.stabilize)
                frame = capture_ptz_frame(ptz)
                if frame is None:
                    print("抓拍失败,重试...")
                    continue
                save_ptz_shot(frame, shots_dir, latest_path, point_no, pan, tilt)
                publish('waiting', f'P{point_no} 已就位,请在浏览器调整并确认')
                print("已抓拍,等待 Web UI 指令...")

                cmd = wait_for_command()
                print(f"收到指令: {cmd}")
                if cmd == 'ok':
                    calib_points.append({
                        'x': p['x'],
                        'y': p['y'],
                        'pan': pan,
                        'tilt': tilt,
                        'zoom': 1,
                        'note': note,
                    })
                    publish('moving', f'P{point_no} 已记录,进入下一点')
                    print(f"记录标定点: ({p['x']:.3f}, {p['y']:.3f}) -> ({pan:.1f}, {tilt:.1f})")
                    time.sleep(0.5)
                    break
                elif cmd == 'skip':
                    publish('moving', f'跳过 P{point_no},进入下一点')
                    print("跳过这个点")
                    break
                elif cmd == 'quit':
                    print("退出标定")
                    calib_state['running'] = False
                    # Reuse the Ctrl-C path to unwind to the cleanup below.
                    raise KeyboardInterrupt
                elif cmd.startswith(('p', 't')):
                    try:
                        delta = float(cmd[1:])
                        # float() accepts "nan"/"inf"; the range check is
                        # False for both and keeps them away from the camera.
                        if not -360.0 <= delta <= 360.0:
                            raise ValueError(cmd)
                    except ValueError:
                        publish('waiting', '指令格式错误,请使用按钮')
                        print("指令格式错误")
                    else:
                        if cmd.startswith('p'):
                            pan = (pan + delta) % 360
                        else:
                            tilt = max(-90.0, min(90.0, tilt + delta))
                        publish('moving', f'调整至 pan={pan:.1f}, tilt={tilt:.1f}')
                        print(f"调整: pan={pan:.1f}, tilt={tilt:.1f}")
                else:
                    publish('waiting', f'未知指令: {cmd}')
                    print("未知指令")

            # Save intermediate results after every point.
            with open(output_dir / 'manual_points.json', 'w', encoding='utf-8') as f:
                json.dump({
                    'points': calib_points,
                    'time': datetime.now().isoformat(),
                }, f, indent=2, ensure_ascii=False)

        # Generate the calibration file.
        if len(calib_points) >= 4:
            generate_calibration(calib_points, output_dir / 'calibration_group1_manual.json')
            publish('finished', f'标定完成,{len(calib_points)} 个有效点,已生成校准文件')
            print(f"\n标定完成,共 {len(calib_points)} 个有效点")
            print(f"校准文件: {output_dir / 'calibration_group1_manual.json'}")
        else:
            publish('error', f'有效标定点不足 ({len(calib_points)} 个)')
            print(f"\n有效标定点不足 ({len(calib_points)} 个),未生成校准文件")
        # Keep serving a little longer than the page's polling interval so
        # the browser can fetch the final status before the server stops.
        time.sleep(2.5)
    except KeyboardInterrupt:
        print("\n标定被中断")
    finally:
        # Release the camera on every exit path (normal end, quit, failure),
        # then the SDK and the HTTP server even if the camera teardown raises.
        try:
            if streaming:
                ptz.stop_stream()
            if connected:
                ptz.disconnect()
        finally:
            sdk.cleanup()
            httpd.shutdown()
            httpd.server_close()
            print("HTTP 服务已关闭")
    return 0
# Script entry point: run the calibration session and use main()'s return
# value as the process exit code.
if __name__ == '__main__':
    sys.exit(main())
|