| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- """
- 球机(PTZ)控制模块
- 负责PTZ控制、精确定位和视频流获取
- """
- import os
- # 必须在导入cv2之前设置,防止FFmpeg多线程解码崩溃
- os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'threads;1'
- import math
- import time
- import threading
- import queue
- from typing import Optional, Tuple, Dict
- from dataclasses import dataclass
- import cv2
- import numpy as np
- from config import PTZ_CAMERA, PTZ_CONFIG
- from dahua_sdk import DahuaSDK, PTZCommand
- from video_lock import safe_read, safe_is_opened
- @dataclass
- class PTZPosition:
- """PTZ位置信息"""
- pan: float # 水平角度 (0-360度)
- tilt: float # 垂直角度 (-90到90度)
- zoom: float # 变倍 (1-最大倍数)
- class PTZCamera:
- """球机控制类"""
-
- def __init__(self, sdk: DahuaSDK, camera_config: Dict = None):
- """
- 初始化球机
- Args:
- sdk: 大华SDK实例
- camera_config: 摄像头配置
- """
- self.sdk = sdk
- self.config = camera_config or PTZ_CAMERA
- self.ptz_config = PTZ_CONFIG
-
- self.login_handle = None
- self.connected = False
-
- # 当前位置
- self.current_position = PTZPosition(pan=0, tilt=0, zoom=1)
- self.position_lock = threading.Lock()
-
- # 视频流 (用于校准时抓拍球机画面)
- self.rtsp_cap = None
- self.current_frame = None
- self.frame_lock = threading.Lock()
- self.stream_thread = None
- self.running_stream = False
- self._camera_id = 'ptz' # 用于per-camera锁
-
- def connect(self) -> bool:
- """
- 连接球机
- Returns:
- 是否成功
- """
- print(f"[PTZCamera] 正在连接球机: IP={self.config['ip']}:{self.config['port']}, 通道={self.config['channel']}")
-
- login_handle, error = self.sdk.login(
- self.config['ip'],
- self.config['port'],
- self.config['username'],
- self.config['password']
- )
-
- if login_handle is None:
- print(f"[PTZCamera] 连接球机失败: IP={self.config['ip']}, 错误码={error}")
- print(f"[PTZCamera] 请检查: 1)IP地址是否正确 2)网络是否连通 3)用户名密码是否正确")
- return False
-
- self.login_handle = login_handle
- self.connected = True
- print(f"[PTZCamera] 成功连接球机: {self.config['ip']}, handle={login_handle}")
- print(f"[PTZCamera] 配置: 通道={self.config['channel']}, 默认变倍={self.ptz_config['default_zoom']}")
- return True
-
- def disconnect(self):
- """断开连接"""
- self.stop_stream()
- if self.login_handle:
- self.sdk.logout(self.login_handle)
- self.login_handle = None
- self.connected = False
-
- def start_stream_rtsp(self, rtsp_url: str = None) -> bool:
- """启动RTSP视频流 (用于校准时获取球机画面)"""
- if rtsp_url is None:
- rtsp_url = self.config.get('rtsp_url') or \
- f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1"
-
- try:
- # 先尝试FFmpeg后端
- self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
- if not self.rtsp_cap.isOpened():
- # FFmpeg失败,尝试GStreamer后端(ARM64上更稳定)
- print(f"[PTZCamera] FFmpeg后端无法打开RTSP流,尝试GStreamer后端...")
- try:
- gst_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_GSTREAMER)
- if gst_cap.isOpened():
- self.rtsp_cap = gst_cap
- print(f"[PTZCamera] 使用GStreamer后端打开RTSP流成功")
- else:
- print(f"[PTZCamera] 无法打开RTSP流: {rtsp_url}")
- return False
- except Exception as ge:
- print(f"[PTZCamera] GStreamer后端也不可用: {ge}")
- return False
-
- self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
-
- self.running_stream = True
- self.stream_thread = threading.Thread(target=self._stream_worker, daemon=True)
- self.stream_thread.start()
- print(f"[PTZCamera] RTSP视频流已启动")
- return True
- except Exception as e:
- print(f"[PTZCamera] RTSP流启动失败: {e}")
- return False
-
- def _stream_worker(self):
- """视频流工作线程"""
- import signal
- if hasattr(signal, 'pthread_sigmask'):
- try:
- signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
- except (AttributeError, OSError):
- pass
-
- max_consecutive_errors = 50
- error_count = 0
-
- while self.running_stream:
- try:
- if self.rtsp_cap is None or not safe_is_opened(self.rtsp_cap, self._camera_id):
- time.sleep(0.1)
- continue
-
- ret, frame = safe_read(self.rtsp_cap, self._camera_id)
- if not ret or frame is None:
- error_count += 1
- if error_count > max_consecutive_errors:
- print("[PTZCamera] RTSP流连续读取失败,尝试重连...")
- self._reconnect_rtsp()
- error_count = 0
- time.sleep(0.01)
- continue
-
- error_count = 0
-
- with self.frame_lock:
- self.current_frame = frame.copy()
-
- time.sleep(0.001)
- except Exception as e:
- err_str = str(e)
- if 'async_lock' in err_str or 'Assertion' in err_str:
- print(f"[PTZCamera] FFmpeg内部错误,3秒后重建连接: {e}")
- time.sleep(3)
- self._reconnect_rtsp()
- else:
- print(f"[PTZCamera] 视频流错误: {e}")
- time.sleep(0.5)
-
- def _reconnect_rtsp(self):
- rtsp_url = self.config.get('rtsp_url') or \
- f"rtsp://{self.config['username']}:{self.config['password']}@{self.config['ip']}:{self.config.get('rtsp_port', 554)}/cam/realmonitor?channel=1&subtype=1"
-
- if self.rtsp_cap is not None:
- try:
- self.rtsp_cap.release()
- except Exception:
- pass
- self.rtsp_cap = None
-
- time.sleep(1)
-
- try:
- self.rtsp_cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
- if safe_is_opened(self.rtsp_cap, self._camera_id):
- self.rtsp_cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
- print("[PTZCamera] RTSP流重连成功")
- else:
- print("[PTZCamera] RTSP流重连失败")
- self.rtsp_cap = None
- except Exception as e:
- print(f"[PTZCamera] RTSP流重连异常: {e}")
- self.rtsp_cap = None
-
- def get_frame(self) -> Optional[np.ndarray]:
- """获取球机当前帧"""
- with self.frame_lock:
- return self.current_frame.copy() if self.current_frame is not None else None
-
- def stop_stream(self):
- """停止视频流"""
- self.running_stream = False
- if self.stream_thread:
- self.stream_thread.join(timeout=2)
- self.stream_thread = None
- if self.rtsp_cap:
- self.rtsp_cap.release()
- self.rtsp_cap = None
-
- def ptz_control(self, command: int, param1: int = 0, param2: int = 0,
- param3: int = 0, stop: bool = False) -> bool:
- """
- PTZ控制
- Args:
- command: 控制命令
- param1-3: 参数
- stop: 是否停止
- Returns:
- 是否成功
- """
- if not self.connected:
- print(f"[PTZCamera] PTZ控制失败: 未连接球机")
- return False
-
- if self.login_handle is None or self.login_handle <= 0:
- print(f"[PTZCamera] PTZ控制失败: 登录句柄无效 (handle={self.login_handle})")
- return False
-
- return self.sdk.ptz_control(
- self.login_handle,
- self.config['channel'],
- command, param1, param2, param3, stop
- )
-
- def move_up(self, speed: int = 4, stop: bool = False) -> bool:
- """向上移动"""
- return self.ptz_control(PTZCommand.UP, 0, speed, 0, stop)
-
- def move_down(self, speed: int = 4, stop: bool = False) -> bool:
- """向下移动"""
- return self.ptz_control(PTZCommand.DOWN, 0, speed, 0, stop)
-
- def move_left(self, speed: int = 4, stop: bool = False) -> bool:
- """向左移动"""
- return self.ptz_control(PTZCommand.LEFT, 0, speed, 0, stop)
-
- def move_right(self, speed: int = 4, stop: bool = False) -> bool:
- """向右移动"""
- return self.ptz_control(PTZCommand.RIGHT, 0, speed, 0, stop)
-
- def zoom_in(self, speed: int = 4, stop: bool = False) -> bool:
- """放大"""
- return self.ptz_control(PTZCommand.ZOOM_ADD, 0, speed, 0, stop)
-
- def zoom_out(self, speed: int = 4, stop: bool = False) -> bool:
- """缩小"""
- return self.ptz_control(PTZCommand.ZOOM_DEC, 0, speed, 0, stop)
-
- def stop_move(self) -> bool:
- """停止移动"""
- # 发送停止命令
- self.move_up(0, True)
- self.move_left(0, True)
- self.zoom_in(0, True)
- return True
-
- def goto_exact_position(self, pan: float, tilt: float, zoom: int) -> bool:
- """
- 三维精确定位
- Args:
- pan: 水平角度 (0-360度)
- tilt: 垂直角度 (-90到90度)
- zoom: 变倍 (1-128)
- Returns:
- 是否成功
- """
- param1 = int(pan * 10)
- param2 = int(tilt * 10)
- param3 = int(min(zoom, 128))
-
- print(f"[PTZCamera] goto_exact_position: pan={pan:.1f}° tilt={tilt:.1f}° zoom={zoom} → p1={param1} p2={param2} p3={param3}")
-
- result = self.ptz_control(PTZCommand.EXACTGOTO, param1, param2, param3)
-
- if result:
- with self.position_lock:
- self.current_position = PTZPosition(pan=pan, tilt=tilt, zoom=zoom)
- else:
- print(f"[PTZCamera] goto_exact_position FAILED!")
-
- return result
-
- def goto_preset(self, preset_id: int) -> bool:
- """
- 转到预置点
- Args:
- preset_id: 预置点ID
- Returns:
- 是否成功
- """
- return self.ptz_control(PTZCommand.POINT_GO, 0, preset_id, 0)
-
- def set_preset(self, preset_id: int) -> bool:
- """
- 设置预置点
- Args:
- preset_id: 预置点ID
- Returns:
- 是否成功
- """
- return self.ptz_control(PTZCommand.POINT_SET, 0, preset_id, 0)
-
- def clear_preset(self, preset_id: int) -> bool:
- """
- 清除预置点
- Args:
- preset_id: 预置点ID
- Returns:
- 是否成功
- """
- return self.ptz_control(PTZCommand.POINT_CLEAR, 0, preset_id, 0)
-
- def calculate_ptz_position(self, x_ratio: float, y_ratio: float,
- zoom: int = None) -> Tuple[float, float, int]:
- """
- 根据全景画面中的位置计算PTZ角度
- Args:
- x_ratio: X方向比例 (0-1)
- y_ratio: Y方向比例 (0-1)
- zoom: 变倍 (默认使用配置值)
- Returns:
- (pan, tilt, zoom) PTZ位置
- """
- if zoom is None:
- zoom = self.ptz_config['default_zoom']
-
- # 应用坐标偏移校准
- offset_x, offset_y = self.ptz_config['coordinate_offset']
- x_ratio = max(0, min(1, x_ratio + offset_x))
- y_ratio = max(0, min(1, y_ratio + offset_y))
-
- # 从配置获取视野参数
- pan_range = self.ptz_config.get('pan_range', (0, 180))
- tilt_range = self.ptz_config.get('tilt_range', (-45, 45))
- pan_center = self.ptz_config.get('pan_center', 90)
- tilt_center = self.ptz_config.get('tilt_center', 0)
-
- # 将画面比例转换为角度
- # x_ratio=0 对应 pan_range[0], x_ratio=1 对应 pan_range[1]
- pan = pan_range[0] + (pan_range[1] - pan_range[0]) * x_ratio
-
- # y_ratio=0.5 对应 tilt_center, y_ratio=0 对应 tilt_range[0], y_ratio=1 对应 tilt_range[1]
- tilt = tilt_range[0] + (tilt_range[1] - tilt_range[0]) * y_ratio
-
- return (pan, tilt, zoom)
-
- def move_to_target(self, x_ratio: float, y_ratio: float,
- zoom: int = None) -> bool:
- """
- 移动到目标位置
- Args:
- x_ratio: X方向比例 (0-1)
- y_ratio: Y方向比例 (0-1)
- zoom: 变倍
- Returns:
- 是否成功
- """
- pan, tilt, zoom = self.calculate_ptz_position(x_ratio, y_ratio, zoom)
- return self.goto_exact_position(pan, tilt, zoom)
-
- def track_target(self, x_ratio: float, y_ratio: float,
- zoom: int = None) -> bool:
- """
- 跟踪目标 - 与move_to_target相同,但可以添加跟踪特定逻辑
- Args:
- x_ratio: X方向比例
- y_ratio: Y方向比例
- zoom: 变倍
- Returns:
- 是否成功
- """
- return self.move_to_target(x_ratio, y_ratio, zoom)
-
- def get_current_position(self) -> PTZPosition:
- """获取当前位置"""
- with self.position_lock:
- return PTZPosition(
- pan=self.current_position.pan,
- tilt=self.current_position.tilt,
- zoom=self.current_position.zoom
- )
-
- def goto_and_confirm(self, pan: float, tilt: float, zoom: int,
- confirm_timeout: float = 1.0,
- get_frame_func=None) -> dict:
- """
- PTZ精确定位并确认到位
-
- Args:
- pan, tilt, zoom: 目标位置
- confirm_timeout: 确认超时秒数
- get_frame_func: 获取球机帧的函数,用于验证
-
- Returns:
- dict: {'success': bool, 'pan': float, 'tilt': float, 'zoom': int,
- 'frame_available': bool, 'elapsed_ms': float}
- """
- import time as _time
- start = _time.time()
-
- success = self.goto_exact_position(pan, tilt, zoom)
-
- result = {
- 'success': success,
- 'pan': pan,
- 'tilt': tilt,
- 'zoom': zoom,
- 'frame_available': False,
- 'elapsed_ms': (_time.time() - start) * 1000
- }
-
- if not success:
- return result
-
- # 等待球机物理移动到位
- time.sleep(0.2)
-
- # 如果有帧获取函数,验证球机画面
- if get_frame_func is not None:
- deadline = _time.time() + confirm_timeout
- while _time.time() < deadline:
- frame = get_frame_func()
- if frame is not None:
- result['frame_available'] = True
- break
- time.sleep(0.05)
-
- result['elapsed_ms'] = (_time.time() - start) * 1000
- return result
-
- def is_position_close(self, target_pan: float, target_tilt: float,
- threshold: float = 1.0) -> bool:
- """
- 检查当前位置是否接近目标位置
-
- Args:
- target_pan: 目标水平角度
- target_tilt: 目标垂直角度
- threshold: 角度容差(度)
- """
- current = self.get_current_position()
- pan_diff = abs(current.pan - target_pan)
- tilt_diff = abs(current.tilt - target_tilt)
- return pan_diff <= threshold and tilt_diff <= threshold
- class PTZController:
- """
- PTZ高级控制器
- 提供平滑移动、跟踪等功能
- """
-
- def __init__(self, ptz_camera: PTZCamera):
- """
- 初始化控制器
- Args:
- ptz_camera: PTZ摄像头实例
- """
- self.ptz = ptz_camera
- self.tracking = False
- self.tracking_thread = None
- self.target_position = None
-
- def smooth_move_to(self, pan: float, tilt: float, zoom: int,
- steps: int = 10, delay: float = 0.1) -> bool:
- """
- 平滑移动到目标位置
- Args:
- pan: 目标水平角度
- tilt: 目标垂直角度
- zoom: 目标变倍
- steps: 移动步数
- delay: 步间延迟
- Returns:
- 是否成功
- """
- current = self.ptz.get_current_position()
-
- # 计算步长
- pan_step = (pan - current.pan) / steps
- tilt_step = (tilt - current.tilt) / steps
- zoom_step = (zoom - current.zoom) / steps
-
- for i in range(1, steps + 1):
- current_pan = current.pan + pan_step * i
- current_tilt = current.tilt + tilt_step * i
- current_zoom = int(current.zoom + zoom_step * i)
-
- self.ptz.goto_exact_position(current_pan, current_tilt, current_zoom)
- time.sleep(delay)
-
- return True
-
- def start_tracking(self, get_target_func, update_interval: float = 0.1):
- """
- 开始跟踪
- Args:
- get_target_func: 获取目标位置的函数 (返回 x_ratio, y_ratio 或 None)
- update_interval: 更新间隔
- """
- self.tracking = True
-
- def tracking_worker():
- while self.tracking:
- try:
- target = get_target_func()
- if target:
- x_ratio, y_ratio = target
- self.ptz.track_target(x_ratio, y_ratio)
- time.sleep(update_interval)
- except Exception as e:
- print(f"跟踪错误: {e}")
- time.sleep(0.1)
-
- self.tracking_thread = threading.Thread(target=tracking_worker, daemon=True)
- self.tracking_thread.start()
-
- def stop_tracking(self):
- """停止跟踪"""
- self.tracking = False
- if self.tracking_thread:
- self.tracking_thread.join(timeout=1)
- self.tracking_thread = None
-
- def zoom_to_target_size(self, target_size: Tuple[int, int],
- frame_size: Tuple[int, int],
- min_zoom: int = 2, max_zoom: int = 20) -> int:
- """
- 根据目标大小计算合适的变倍
- Args:
- target_size: 目标尺寸 (width, height)
- frame_size: 画面尺寸 (width, height)
- min_zoom: 最小变倍
- max_zoom: 最大变倍
- Returns:
- 计算的变倍值
- """
- target_area = target_size[0] * target_size[1]
- frame_area = frame_size[0] * frame_size[1]
-
- # 目标占画面比例
- ratio = target_area / frame_area
-
- # 根据比例计算变倍 (目标占画面30%时变倍为1)
- if ratio > 0:
- ideal_zoom = math.sqrt(0.3 / ratio)
- zoom = int(max(min_zoom, min(max_zoom, ideal_zoom)))
- else:
- zoom = min_zoom
-
- return zoom
|