""" 球机(PTZ)控制模块 负责PTZ控制、精确定位和视频流获取 """ import os # 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃 os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1' import math import time import threading import queue from typing import Optional, Tuple, Dict from dataclasses import dataclass import cv2 import numpy as np from config import PTZ_CAMERA, PTZ_CONFIG from dahua_sdk import DahuaSDK, PTZCommand from video_lock import safe_read, safe_is_opened @dataclass class PTZPosition: """PTZ位置信息""" pan: float # 水平角度 (0-360度) tilt: float # 垂直角度 (-90到90度) zoom: float # 变倍 (1-最大倍数) class PTZCamera: """球机控制类""" def __init__(self, sdk: DahuaSDK, camera_config: Dict = None): """ 初始化球机 Args: sdk: 大华SDK实例 camera_config: 摄像头配置 """ self.sdk = sdk self.config = camera_config or PTZ_CAMERA self.ptz_config = PTZ_CONFIG self.login_handle = None self.connected = False # 当前位置 self.current_position = PTZPosition(pan=0, tilt=0, zoom=1) self.position_lock = threading.Lock() # 视频流 (用于校准时抓拍球机画面) self.rtsp_cap = None self.current_frame = None self.frame_lock = threading.Lock() self.stream_thread = None self.running_stream = False self._camera_id = 'ptz' # 用于per-camera锁 def connect(self) -> bool: """ 连接球机 Returns: 是否成功 """ print(f"[PTZCamera] 正在连接球机: IP={self.config['ip']}:{self.config['port']}, 通道={self.config['channel']}") login_handle, error = self.sdk.login( self.config['ip'], self.config['port'], self.config['username'], self.config['password'] ) if login_handle is None: print(f"[PTZCamera] 连接球机失败: IP={self.config['ip']}, 错误码={error}") print(f"[PTZCamera] 请检查: 1)IP地址是否正确 2)网络是否连通 3)用户名密码是否正确") return False self.login_handle = login_handle self.connected = True print(f"[PTZCamera] 成功连接球机: {self.config['ip']}, handle={login_handle}") print(f"[PTZCamera] 配置: 通道={self.config['channel']}, 默认变倍={self.ptz_config['default_zoom']}") return True def disconnect(self): """断开连接""" self.stop_stream() if self.login_handle: self.sdk.logout(self.login_handle) self.login_handle = None self.connected = False def start_stream_rtsp(self, rtsp_url: str = None) -> bool: """启动RTSP视频流 (用于校准时获取球机画面)""" if rtsp_url is None: rtsp_url = self.config.get('rtsp_url') or \ f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1" try: # 先尝试FFmpeg后端 self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if not self.rtsp_cap.isOpened(): # FFmpeg失败,尝试GStreamer后端(ARM64上更稳定) print(f"[PTZCamera] FFmpeg后端无法打开RTSP流,尝试GStreamer后端...") try: gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER) if gst_cap.isOpened(): self.rtsp_cap = gst_cap print(f"[PTZCamera] 使用GStreamer后端打开RTSP流成功") else: print(f"[PTZCamera] 无法打开RTSP流: {rtsp_url}") return False except Exception as ge: print(f"[PTZCamera] GStreamer后端也不可用: {ge}") return False self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.running_stream = True self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True) self.stream_thread.start() print(f"[PTZCamera] RTSP视频流已启动") return True except Exception as e: print(f"[PTZCamera] RTSP流启动失败: {e}") return False def _stream_worker(self): """视频流工作线程""" import signal if hasattr(signal, 'pthread_sigmask'): try: signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT}) except (AttributeError, OSError): pass max_consecutive_errors = 50 error_count = 0 while self.running_stream: try: if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id): time.sleep(0.1) continue ret, frame = safe_read(self.rtsp_cap, self._camera_id) if not ret or frame is None: error_count += 1 if error_count > max_consecutive_errors: print("[PTZCamera] RTSP流连续读取失败,尝试重连...") self._reconnect_rtsp() error_count = 0 time.sleep(0.01) continue error_count = 0 with self.frame_lock: self.current_frame = frame.copy() time.sleep(0.001) except Exception as e: err_str = str(e) if 'async_lock' in err_str or 'Assertion' in err_str: print(f"[PTZCamera] FFmpeg内部错误,3秒后重建连接: {e}") time.sleep(3) self._reconnect_rtsp() else: print(f"[PTZCamera] 视频流错误: {e}") time.sleep(0.5) def _reconnect_rtsp(self): rtsp_url = self.config.get('rtsp_url') or \ f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1" if self.rtsp_cap is not None: try: self.rtsp_cap.release() except Exception: pass self.rtsp_cap = None time.sleep(1) try: self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG) if safe_is_opened(self.rtsp_cap, self._camera_id): self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) print("[PTZCamera] RTSP流重连成功") else: print("[PTZCamera] RTSP流重连失败") self.rtsp_cap = None except Exception as e: print(f"[PTZCamera] RTSP流重连异常: {e}") self.rtsp_cap = None def get_frame(self) -> Optional[np.ndarray]: """获取球机当前帧""" with self.frame_lock: return self.current_frame.copy() if self.current_frame is not None else None def stop_stream(self): """停止视频流""" self.running_stream = False if self.stream_thread: self.stream_thread.join(timeout=2) self.stream_thread = None if self.rtsp_cap: self.rtsp_cap.release() self.rtsp_cap = None def ptz_control(self, command: int, param1: int = 0, param2: int = 0, param3: int = 0, stop: bool = False) -> bool: """ PTZ控制 Args: command: 控制命令 param1-3: 参数 stop: 是否停止 Returns: 是否成功 """ if not self.connected: print(f"[PTZCamera] PTZ控制失败: 未连接球机") return False if self.login_handle is None or self.login_handle <= 0: print(f"[PTZCamera] PTZ控制失败: 登录句柄无效 (handle={self.login_handle})") return False return self.sdk.ptz_control( self.login_handle, self.config['channel'], command, param1, param2, param3, stop ) def move_up(self, speed: int = 4, stop: bool = False) -> bool: """向上移动""" return self.ptz_control(PTZCommand.UP, 0, speed, 0, stop) def move_down(self, speed: int = 4, stop: bool = False) -> bool: """向下移动""" return self.ptz_control(PTZCommand.DOWN, 0, speed, 0, stop) def move_left(self, speed: int = 4, stop: bool = False) -> bool: """向左移动""" return self.ptz_control(PTZCommand.LEFT, 0, speed, 0, stop) def move_right(self, speed: int = 4, stop: bool = False) -> bool: """向右移动""" return self.ptz_control(PTZCommand.RIGHT, 0, speed, 0, stop) def zoom_in(self, speed: int = 4, stop: bool = False) -> bool: """放大""" return self.ptz_control(PTZCommand.ZOOM_ADD, 0, speed, 0, stop) def zoom_out(self, speed: int = 4, stop: bool = False) -> bool: """缩小""" return self.ptz_control(PTZCommand.ZOOM_DEC, 0, speed, 0, stop) def stop_move(self) -> bool: """停止移动""" # 发送停止命令 self.move_up(0, True) self.move_left(0, True) self.zoom_in(0, True) return True def goto_exact_position(self, pan: float, tilt: float, zoom: int) -> bool: """ 三维精确定位 Args: pan: 水平角度 (0-360度) tilt: 垂直角度 (-90到90度) zoom: 变倍 (1-128) Returns: 是否成功 """ param1 = int(pan * 10) param2 = int(tilt * 10) param3 = int(min(zoom, 128)) print(f"[PTZCamera] goto_exact_position: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} → p1={param1} p2={param2} p3={param3}") result = self.ptz_control(PTZCommand.EXACTGOTO, param1, param2, param3) if result: with self.position_lock: self.current_position = PTZPosition(pan=pan, tilt=tilt, zoom=zoom) else: print(f"[PTZCamera] goto_exact_position FAILED!") return result def goto_preset(self, preset_id: int) -> bool: """ 转到预置点 Args: preset_id: 预置点ID Returns: 是否成功 """ return self.ptz_control(PTZCommand.POINT_GO, 0, preset_id, 0) def set_preset(self, preset_id: int) -> bool: """ 设置预置点 Args: preset_id: 预置点ID Returns: 是否成功 """ return self.ptz_control(PTZCommand.POINT_SET, 0, preset_id, 0) def clear_preset(self, preset_id: int) -> bool: """ 清除预置点 Args: preset_id: 预置点ID Returns: 是否成功 """ return self.ptz_control(PTZCommand.POINT_CLEAR, 0, preset_id, 0) def calculate_ptz_position(self, x_ratio: float, y_ratio: float, zoom: int = None) -> Tuple[float, float, int]: """ 根据全景画面中的位置计算PTZ角度 Args: x_ratio: X方向比例 (0-1) y_ratio: Y方向比例 (0-1) zoom: 变倍 (默认使用配置值) Returns: (pan, tilt, zoom) PTZ位置 """ if zoom is None: zoom = self.ptz_config['default_zoom'] # 应用坐标偏移校准 offset_x, offset_y = self.ptz_config['coordinate_offset'] x_ratio = max(0, min(1, x_ratio + offset_x)) y_ratio = max(0, min(1, y_ratio + offset_y)) # 从配置获取视野参数 pan_range = self.ptz_config.get('pan_range', (0, 180)) tilt_range = self.ptz_config.get('tilt_range', (-45, 45)) pan_center = self.ptz_config.get('pan_center', 90) tilt_center = self.ptz_config.get('tilt_center', 0) # 将画面比例转换为角度 # x_ratio=0 对应 pan_range[0], x_ratio=1 对应 pan_range[1] pan = pan_range[0] + (pan_range[1] - pan_range[0]) * x_ratio # y_ratio=0.5 对应 tilt_center, y_ratio=0 对应 tilt_range[0], y_ratio=1 对应 tilt_range[1] tilt = tilt_range[0] + (tilt_range[1] - tilt_range[0]) * y_ratio # 应用方向修正 # 1. 检查安装类型,吸顶安装需要反转tilt mount_type = self.ptz_config.get('mount_type', 'wall') tilt_flip = self.ptz_config.get('tilt_flip', False) # 吸顶安装时自动反转tilt方向 if mount_type == 'ceiling' or tilt_flip: # 反转tilt:原来的正角度变负,负角度变正 tilt = -tilt print(f"[PTZCamera] 吸顶安装,tilt方向修正: {-tilt} -> {tilt}") # 2. 应用pan方向修正 pan_flip = self.ptz_config.get('pan_flip', False) if pan_flip: # 反转pan方向(180度偏移) pan = (pan + 180) % 360 print(f"[PTZCamera] pan方向翻转: {(pan - 180) % 360} -> {pan}") return (pan, tilt, zoom) def move_to_target(self, x_ratio: float, y_ratio: float, zoom: int = None) -> bool: """ 移动到目标位置 Args: x_ratio: X方向比例 (0-1) y_ratio: Y方向比例 (0-1) zoom: 变倍 Returns: 是否成功 """ pan, tilt, zoom = self.calculate_ptz_position(x_ratio, y_ratio, zoom) return self.goto_exact_position(pan, tilt, zoom) def track_target(self, x_ratio: float, y_ratio: float, zoom: int = None) -> bool: """ 跟踪目标 - 与move_to_target相同,但可以添加跟踪特定逻辑 Args: x_ratio: X方向比例 y_ratio: Y方向比例 zoom: 变倍 Returns: 是否成功 """ return self.move_to_target(x_ratio, y_ratio, zoom) def get_current_position(self) -> PTZPosition: """获取当前位置""" with self.position_lock: return PTZPosition( pan=self.current_position.pan, tilt=self.current_position.tilt, zoom=self.current_position.zoom ) def goto_and_confirm(self, pan: float, tilt: float, zoom: int, confirm_timeout: float = 1.0, get_frame_func=None) -> dict: """ PTZ精确定位并确认到位 Args: pan, tilt, zoom: 目标位置 confirm_timeout: 确认超时秒数 get_frame_func: 获取球机帧的函数,用于验证 Returns: dict: {'success': bool, 'pan': float, 'tilt': float, 'zoom': int, 'frame_available': bool, 'elapsed_ms': float} """ import time as _time start = _time.time() success = self.goto_exact_position(pan, tilt, zoom) result = { 'success': success, 'pan': pan, 'tilt': tilt, 'zoom': zoom, 'frame_available': False, 'elapsed_ms': (_time.time() - start) * 1000 } if not success: return result # 等待球机物理移动到位 time.sleep(0.2) # 如果有帧获取函数,验证球机画面 if get_frame_func is not None: deadline = _time.time() + confirm_timeout while _time.time() < deadline: frame = get_frame_func() if frame is not None: result['frame_available'] = True break time.sleep(0.05) result['elapsed_ms'] = (_time.time() - start) * 1000 return result def is_position_close(self, target_pan: float, target_tilt: float, threshold: float = 1.0) -> bool: """ 检查当前位置是否接近目标位置 Args: target_pan: 目标水平角度 target_tilt: 目标垂直角度 threshold: 角度容差(度) """ current = self.get_current_position() pan_diff = abs(current.pan - target_pan) tilt_diff = abs(current.tilt - target_tilt) return pan_diff <= threshold and tilt_diff <= threshold class PTZController: """ PTZ高级控制器 提供平滑移动、跟踪等功能 """ def __init__(self, ptz_camera: PTZCamera): """ 初始化控制器 Args: ptz_camera: PTZ摄像头实例 """ self.ptz = ptz_camera self.tracking = False self.tracking_thread = None self.target_position = None def smooth_move_to(self, pan: float, tilt: float, zoom: int, steps: int = 10, delay: float = 0.1) -> bool: """ 平滑移动到目标位置 Args: pan: 目标水平角度 tilt: 目标垂直角度 zoom: 目标变倍 steps: 移动步数 delay: 步间延迟 Returns: 是否成功 """ current = self.ptz.get_current_position() # 计算步长 pan_step = (pan - current.pan) / steps tilt_step = (tilt - current.tilt) / steps zoom_step = (zoom - current.zoom) / steps for i in range(1, steps + 1): current_pan = current.pan + pan_step * i current_tilt = current.tilt + tilt_step * i current_zoom = int(current.zoom + zoom_step * i) self.ptz.goto_exact_position(current_pan, current_tilt, current_zoom) time.sleep(delay) return True def start_tracking(self, get_target_func, update_interval: float = 0.1): """ 开始跟踪 Args: get_target_func: 获取目标位置的函数 (返回 x_ratio, y_ratio 或 None) update_interval: 更新间隔 """ self.tracking = True def tracking_worker(): while self.tracking: try: target = get_target_func() if target: x_ratio, y_ratio = target self.ptz.track_target(x_ratio, y_ratio) time.sleep(update_interval) except Exception as e: print(f"跟踪错误: {e}") time.sleep(0.1) self.tracking_thread = threading.Thread(target=tracking_worker, daemon=True) self.tracking_thread.start() def stop_tracking(self): """停止跟踪""" self.tracking = False if self.tracking_thread: self.tracking_thread.join(timeout=1) self.tracking_thread = None def zoom_to_target_size(self, target_size: Tuple[int, int], frame_size: Tuple[int, int], min_zoom: int = 2, max_zoom: int = 20) -> int: """ 根据目标大小计算合适的变倍 Args: target_size: 目标尺寸 (width, height) frame_size: 画面尺寸 (width, height) min_zoom: 最小变倍 max_zoom: 最大变倍 Returns: 计算的变倍值 """ target_area = target_size[0] * target_size[1] frame_area = frame_size[0] * frame_size[1] # 目标占画面比例 ratio = target_area / frame_area # 根据比例计算变倍 (目标占画面30%时变倍为1) if ratio > 0: ideal_zoom = math.sqrt(0.3 / ratio) zoom = int(max(min_zoom, min(max_zoom, ideal_zoom))) else: zoom = min_zoom return zoom