ptz_camera.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. """
  2. 球机(PTZ)控制模块
  3. 负责PTZ控制和精确定位
  4. """
  5. import math
  6. import time
  7. import threading
  8. from typing import Optional, Tuple, Dict
  9. from dataclasses import dataclass
  10. from config import PTZ_CAMERA, PTZ_CONFIG
  11. from dahua_sdk import DahuaSDK, PTZCommand
  12. @dataclass
  13. class PTZPosition:
  14. """PTZ位置信息"""
  15. pan: float # 水平角度 (0-360度)
  16. tilt: float # 垂直角度 (-90到90度)
  17. zoom: float # 变倍 (1-最大倍数)
  18. class PTZCamera:
  19. """球机控制类"""
  20. def __init__(self, sdk: DahuaSDK, camera_config: Dict = None):
  21. """
  22. 初始化球机
  23. Args:
  24. sdk: 大华SDK实例
  25. camera_config: 摄像头配置
  26. """
  27. self.sdk = sdk
  28. self.config = camera_config or PTZ_CAMERA
  29. self.ptz_config = PTZ_CONFIG
  30. self.login_handle = None
  31. self.connected = False
  32. # 当前位置
  33. self.current_position = PTZPosition(pan=0, tilt=0, zoom=1)
  34. self.position_lock = threading.Lock()
  35. def connect(self) -> bool:
  36. """
  37. 连接球机
  38. Returns:
  39. 是否成功
  40. """
  41. login_handle, error = self.sdk.login(
  42. self.config['ip'],
  43. self.config['port'],
  44. self.config['username'],
  45. self.config['password']
  46. )
  47. if login_handle is None:
  48. print(f"连接球机失败: IP={self.config['ip']}, 错误码={error}")
  49. return False
  50. self.login_handle = login_handle
  51. self.connected = True
  52. print(f"成功连接球机: {self.config['ip']}")
  53. return True
  54. def disconnect(self):
  55. """断开连接"""
  56. if self.login_handle:
  57. self.sdk.logout(self.login_handle)
  58. self.login_handle = None
  59. self.connected = False
  60. def ptz_control(self, command: int, param1: int = 0, param2: int = 0,
  61. param3: int = 0, stop: bool = False) -> bool:
  62. """
  63. PTZ控制
  64. Args:
  65. command: 控制命令
  66. param1-3: 参数
  67. stop: 是否停止
  68. Returns:
  69. 是否成功
  70. """
  71. if not self.connected:
  72. return False
  73. return self.sdk.ptz_control(
  74. self.login_handle,
  75. self.config['channel'],
  76. command, param1, param2, param3, stop
  77. )
  78. def move_up(self, speed: int = 4, stop: bool = False) -> bool:
  79. """向上移动"""
  80. return self.ptz_control(PTZCommand.UP, 0, speed, 0, stop)
  81. def move_down(self, speed: int = 4, stop: bool = False) -> bool:
  82. """向下移动"""
  83. return self.ptz_control(PTZCommand.DOWN, 0, speed, 0, stop)
  84. def move_left(self, speed: int = 4, stop: bool = False) -> bool:
  85. """向左移动"""
  86. return self.ptz_control(PTZCommand.LEFT, 0, speed, 0, stop)
  87. def move_right(self, speed: int = 4, stop: bool = False) -> bool:
  88. """向右移动"""
  89. return self.ptz_control(PTZCommand.RIGHT, 0, speed, 0, stop)
  90. def zoom_in(self, speed: int = 4, stop: bool = False) -> bool:
  91. """放大"""
  92. return self.ptz_control(PTZCommand.ZOOM_ADD, 0, speed, 0, stop)
  93. def zoom_out(self, speed: int = 4, stop: bool = False) -> bool:
  94. """缩小"""
  95. return self.ptz_control(PTZCommand.ZOOM_DEC, 0, speed, 0, stop)
  96. def stop_move(self) -> bool:
  97. """停止移动"""
  98. # 发送停止命令
  99. self.move_up(0, True)
  100. self.move_left(0, True)
  101. self.zoom_in(0, True)
  102. return True
  103. def goto_exact_position(self, pan: float, tilt: float, zoom: int) -> bool:
  104. """
  105. 三维精确定位
  106. Args:
  107. pan: 水平角度 (0-360度)
  108. tilt: 垂直角度 (-90到90度)
  109. zoom: 变倍 (1-128)
  110. Returns:
  111. 是否成功
  112. """
  113. # SDK参数: pan 0~3600 (0.1度单位), tilt -1800~1800, zoom 1~128
  114. param1 = int(pan * 10) # 转换为0.1度单位
  115. param2 = int(tilt * 10) # 转换为0.1度单位
  116. param3 = int(min(zoom, 128)) # 限制最大128倍
  117. result = self.ptz_control(PTZCommand.EXACTGOTO, param1, param2, param3)
  118. if result:
  119. with self.position_lock:
  120. self.current_position = PTZPosition(pan=pan, tilt=tilt, zoom=zoom)
  121. return result
  122. def goto_preset(self, preset_id: int) -> bool:
  123. """
  124. 转到预置点
  125. Args:
  126. preset_id: 预置点ID
  127. Returns:
  128. 是否成功
  129. """
  130. return self.ptz_control(PTZCommand.POINT_GO, 0, preset_id, 0)
  131. def set_preset(self, preset_id: int) -> bool:
  132. """
  133. 设置预置点
  134. Args:
  135. preset_id: 预置点ID
  136. Returns:
  137. 是否成功
  138. """
  139. return self.ptz_control(PTZCommand.POINT_SET, 0, preset_id, 0)
  140. def clear_preset(self, preset_id: int) -> bool:
  141. """
  142. 清除预置点
  143. Args:
  144. preset_id: 预置点ID
  145. Returns:
  146. 是否成功
  147. """
  148. return self.ptz_control(PTZCommand.POINT_CLEAR, 0, preset_id, 0)
  149. def calculate_ptz_position(self, x_ratio: float, y_ratio: float,
  150. zoom: int = None) -> Tuple[float, float, int]:
  151. """
  152. 根据全景画面中的位置计算PTZ角度
  153. Args:
  154. x_ratio: X方向比例 (0-1)
  155. y_ratio: Y方向比例 (0-1)
  156. zoom: 变倍 (默认使用配置值)
  157. Returns:
  158. (pan, tilt, zoom) PTZ位置
  159. """
  160. if zoom is None:
  161. zoom = self.ptz_config['default_zoom']
  162. # 应用坐标偏移校准
  163. offset_x, offset_y = self.ptz_config['coordinate_offset']
  164. x_ratio = max(0, min(1, x_ratio + offset_x))
  165. y_ratio = max(0, min(1, y_ratio + offset_y))
  166. # 从配置获取视野参数
  167. pan_range = self.ptz_config.get('pan_range', (0, 180))
  168. tilt_range = self.ptz_config.get('tilt_range', (-45, 45))
  169. pan_center = self.ptz_config.get('pan_center', 90)
  170. tilt_center = self.ptz_config.get('tilt_center', 0)
  171. # 将画面比例转换为角度
  172. # x_ratio=0 对应 pan_range[0], x_ratio=1 对应 pan_range[1]
  173. pan = pan_range[0] + (pan_range[1] - pan_range[0]) * x_ratio
  174. # y_ratio=0.5 对应 tilt_center, y_ratio=0 对应 tilt_range[0], y_ratio=1 对应 tilt_range[1]
  175. tilt = tilt_range[0] + (tilt_range[1] - tilt_range[0]) * y_ratio
  176. return (pan, tilt, zoom)
  177. def move_to_target(self, x_ratio: float, y_ratio: float,
  178. zoom: int = None) -> bool:
  179. """
  180. 移动到目标位置
  181. Args:
  182. x_ratio: X方向比例 (0-1)
  183. y_ratio: Y方向比例 (0-1)
  184. zoom: 变倍
  185. Returns:
  186. 是否成功
  187. """
  188. pan, tilt, zoom = self.calculate_ptz_position(x_ratio, y_ratio, zoom)
  189. return self.goto_exact_position(pan, tilt, zoom)
  190. def track_target(self, x_ratio: float, y_ratio: float,
  191. zoom: int = None) -> bool:
  192. """
  193. 跟踪目标 - 与move_to_target相同,但可以添加跟踪特定逻辑
  194. Args:
  195. x_ratio: X方向比例
  196. y_ratio: Y方向比例
  197. zoom: 变倍
  198. Returns:
  199. 是否成功
  200. """
  201. return self.move_to_target(x_ratio, y_ratio, zoom)
  202. def get_current_position(self) -> PTZPosition:
  203. """获取当前位置"""
  204. with self.position_lock:
  205. return PTZPosition(
  206. pan=self.current_position.pan,
  207. tilt=self.current_position.tilt,
  208. zoom=self.current_position.zoom
  209. )
  210. class PTZController:
  211. """
  212. PTZ高级控制器
  213. 提供平滑移动、跟踪等功能
  214. """
  215. def __init__(self, ptz_camera: PTZCamera):
  216. """
  217. 初始化控制器
  218. Args:
  219. ptz_camera: PTZ摄像头实例
  220. """
  221. self.ptz = ptz_camera
  222. self.tracking = False
  223. self.tracking_thread = None
  224. self.target_position = None
  225. def smooth_move_to(self, pan: float, tilt: float, zoom: int,
  226. steps: int = 10, delay: float = 0.1) -> bool:
  227. """
  228. 平滑移动到目标位置
  229. Args:
  230. pan: 目标水平角度
  231. tilt: 目标垂直角度
  232. zoom: 目标变倍
  233. steps: 移动步数
  234. delay: 步间延迟
  235. Returns:
  236. 是否成功
  237. """
  238. current = self.ptz.get_current_position()
  239. # 计算步长
  240. pan_step = (pan - current.pan) / steps
  241. tilt_step = (tilt - current.tilt) / steps
  242. zoom_step = (zoom - current.zoom) / steps
  243. for i in range(1, steps + 1):
  244. current_pan = current.pan + pan_step * i
  245. current_tilt = current.tilt + tilt_step * i
  246. current_zoom = int(current.zoom + zoom_step * i)
  247. self.ptz.goto_exact_position(current_pan, current_tilt, current_zoom)
  248. time.sleep(delay)
  249. return True
  250. def start_tracking(self, get_target_func, update_interval: float = 0.1):
  251. """
  252. 开始跟踪
  253. Args:
  254. get_target_func: 获取目标位置的函数 (返回 x_ratio, y_ratio 或 None)
  255. update_interval: 更新间隔
  256. """
  257. self.tracking = True
  258. def tracking_worker():
  259. while self.tracking:
  260. try:
  261. target = get_target_func()
  262. if target:
  263. x_ratio, y_ratio = target
  264. self.ptz.track_target(x_ratio, y_ratio)
  265. time.sleep(update_interval)
  266. except Exception as e:
  267. print(f"跟踪错误: {e}")
  268. time.sleep(0.1)
  269. self.tracking_thread = threading.Thread(target=tracking_worker, daemon=True)
  270. self.tracking_thread.start()
  271. def stop_tracking(self):
  272. """停止跟踪"""
  273. self.tracking = False
  274. if self.tracking_thread:
  275. self.tracking_thread.join(timeout=1)
  276. self.tracking_thread = None
  277. def zoom_to_target_size(self, target_size: Tuple[int, int],
  278. frame_size: Tuple[int, int],
  279. min_zoom: int = 2, max_zoom: int = 20) -> int:
  280. """
  281. 根据目标大小计算合适的变倍
  282. Args:
  283. target_size: 目标尺寸 (width, height)
  284. frame_size: 画面尺寸 (width, height)
  285. min_zoom: 最小变倍
  286. max_zoom: 最大变倍
  287. Returns:
  288. 计算的变倍值
  289. """
  290. target_area = target_size[0] * target_size[1]
  291. frame_area = frame_size[0] * frame_size[1]
  292. # 目标占画面比例
  293. ratio = target_area / frame_area
  294. # 根据比例计算变倍 (目标占画面30%时变倍为1)
  295. if ratio > 0:
  296. ideal_zoom = math.sqrt(0.3 / ratio)
  297. zoom = int(max(min_zoom, min(max_zoom, ideal_zoom)))
  298. else:
  299. zoom = min_zoom
  300. return zoom