| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- """
- 大华SDK Python封装
- 使用ctypes调用大华NetSDK
- """
- import ctypes
- import os
- import threading
- import queue
- from ctypes import c_int, c_long, c_ulong, c_char_p, c_ubyte, POINTER, Structure, byref, CFUNCTYPE
- from typing import Optional, Callable, Tuple, Dict, Any
class VideoFrameBuffer:
    """Bounded store for frames handed over by an SDK callback.

    Every frame is kept in two places: a FIFO queue (``buffer``) for
    consumers that want each frame in order, and a single most-recent
    snapshot (``latest_frame``) for consumers that only want the newest one.
    """

    def __init__(self, max_size: int = 10):
        """Create an empty buffer.

        Args:
            max_size: Capacity passed to ``queue.Queue(maxsize=...)``.
        """
        self.latest_frame = None
        self.lock = threading.Lock()
        self.buffer = queue.Queue(maxsize=max_size)

    def put(self, frame_data: bytes, width: int, height: int, frame_type: int = 0):
        """Record a frame as the latest one and try to enqueue it.

        Args:
            frame_data: Raw frame payload.
            width: Frame width.
            height: Frame height.
            frame_type: Caller-defined frame type tag.
        """
        frame = {
            'data': frame_data,
            'width': width,
            'height': height,
            'type': frame_type
        }
        with self.lock:
            self.latest_frame = frame
            try:
                self.buffer.put_nowait(frame)
            except queue.Full:
                # Queue is at capacity: the new frame is not enqueued, but it
                # is still visible through ``latest_frame``.
                pass

    def get(self, timeout: float = 0.1) -> Optional[Dict[str, Any]]:
        """Pop the oldest queued frame.

        Args:
            timeout: Seconds to wait for a frame before giving up.

        Returns:
            The frame dict, or ``None`` if nothing arrived within ``timeout``.
        """
        try:
            frame = self.buffer.get(block=True, timeout=timeout)
        except queue.Empty:
            frame = None
        return frame

    def get_latest(self) -> Optional[Dict[str, Any]]:
        """Return the most recently stored frame, or ``None`` if none was stored."""
        with self.lock:
            newest = self.latest_frame
        return newest
# ctypes wrapper around the Dahua NetSDK shared library
class DahuaSDK:
    """Thin ctypes wrapper around the Dahua NetSDK shared library.

    Constructing an instance loads the library and declares the prototypes of
    the SDK functions used by this class; ``init()`` must still be called to
    run ``CLIENT_Init``.

    Handles returned by the SDK are plain integers. A handle is considered
    valid when it is not ``None`` and not ``0`` (the same rule ``login`` uses
    to detect failure).
    """

    def __init__(self, lib_path: str):
        """Load the SDK library.

        Args:
            lib_path: Path of the SDK shared library file.

        Raises:
            Exception: Whatever loading the library or binding its functions
                raises (re-raised after being printed).
        """
        self.lib_path = lib_path
        self.sdk = None
        self.initialized = False
        # Keep a reference to the ctypes callback so it is not garbage
        # collected while the SDK may still call it.
        self._disconnect_callback = None

        # Video callback state
        self._video_callback = None
        self._video_frame_buffers: Dict[int, "VideoFrameBuffer"] = {}

        self._setup_structures()
        self._load_library()

    @staticmethod
    def _is_valid_handle(handle: Optional[int]) -> bool:
        """Return True when ``handle`` is usable (neither ``None`` nor ``0``)."""
        return handle is not None and handle != 0

    def _setup_structures(self):
        """Declare the C structures passed to the SDK.

        NOTE(review): the field layouts are hand-written here; confirm them
        against dhnetsdk.h for the target platform (``c_ulong`` follows the
        platform's ``unsigned long`` width).
        """

        # Login input parameters
        class NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure):
            _fields_ = [
                ('dwSize', c_ulong),
                ('szIP', ctypes.c_char * 64),
                ('nPort', c_int),
                ('szUserName', ctypes.c_char * 64),
                ('szPassword', ctypes.c_char * 64),
                ('emSpecCap', c_int),
            ]

        # Login output parameters
        class NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure):
            _fields_ = [
                ('dwSize', c_ulong),
                ('pstuDeviceInfo', ctypes.c_void_p),
                ('nError', c_int),
                ('byRes', ctypes.c_char * 128),
            ]

        # Snapshot parameters
        class SNAP_PARAMS(Structure):
            _fields_ = [
                ('Channel', c_int),
                ('mode', c_int),
                ('CmdSerial', c_int),
                ('PicTransType', c_int),
                ('SendTotal', c_int),
                ('Quality', c_int),
                ('PicFormat', c_int),
                ('PicWidth', c_int),
                ('PicHeight', c_int),
                ('SnapDelayTime', c_int),
                ('byRes', ctypes.c_char * 12),
            ]

        self.NET_IN_LOGIN = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY
        self.NET_OUT_LOGIN = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY
        self.SNAP_PARAMS = SNAP_PARAMS

    @staticmethod
    def _preload_dependencies(lib_dir: str) -> None:
        """Best-effort preload of libraries the SDK depends on.

        Each library found in ``lib_dir`` is loaded with ``RTLD_GLOBAL`` so its
        symbols are visible to libraries loaded afterwards. A failure to load
        one dependency is reported and does not prevent trying the next one.
        """
        for dep_lib in ('libcrypto.so', 'libssl.so'):
            dep_path = os.path.join(lib_dir, dep_lib)
            if not os.path.exists(dep_path):
                continue
            try:
                ctypes.CDLL(dep_path, mode=ctypes.RTLD_GLOBAL)
            except OSError as e:
                # Not necessarily fatal: the main library may still load.
                print(f"加载依赖库警告: {e}")

    def _load_library(self):
        """Load the SDK shared library and bind its function prototypes.

        Raises:
            Exception: Any error from loading the main library or from
                ``_setup_functions`` is printed and re-raised.
        """
        try:
            lib_dir = os.path.dirname(self.lib_path)
            if lib_dir:
                self._preload_dependencies(lib_dir)

            # Load the main SDK library
            self.sdk = ctypes.CDLL(self.lib_path)
            self._setup_functions()
            self.initialized = True
            print(f"成功加载大华SDK: {self.lib_path}")
        except Exception as e:
            print(f"加载SDK失败: {e}")
            raise

    def _setup_functions(self):
        """Declare argument and return types of the SDK functions in use.

        NOTE(review): these prototypes are hand-written; confirm them against
        dhnetsdk.h for the target platform.
        """
        sdk = self.sdk

        # CLIENT_Init
        sdk.CLIENT_Init.argtypes = [ctypes.c_void_p, c_ulong]
        sdk.CLIENT_Init.restype = ctypes.c_bool

        # CLIENT_Cleanup
        sdk.CLIENT_Cleanup.argtypes = []
        sdk.CLIENT_Cleanup.restype = None

        # CLIENT_LoginWithHighLevelSecurity
        sdk.CLIENT_LoginWithHighLevelSecurity.argtypes = [
            POINTER(self.NET_IN_LOGIN),
            POINTER(self.NET_OUT_LOGIN)
        ]
        sdk.CLIENT_LoginWithHighLevelSecurity.restype = c_long

        # CLIENT_Logout
        sdk.CLIENT_Logout.argtypes = [c_long]
        sdk.CLIENT_Logout.restype = ctypes.c_bool

        # CLIENT_RealPlay
        sdk.CLIENT_RealPlay.argtypes = [c_long, c_int, ctypes.c_void_p]
        sdk.CLIENT_RealPlay.restype = c_long

        # CLIENT_StopRealPlay
        sdk.CLIENT_StopRealPlay.argtypes = [c_long]
        sdk.CLIENT_StopRealPlay.restype = ctypes.c_bool

        # CLIENT_DHPTZControlEx
        sdk.CLIENT_DHPTZControlEx.argtypes = [
            c_long, c_int, c_ulong, c_long, c_long, c_long, ctypes.c_bool
        ]
        sdk.CLIENT_DHPTZControlEx.restype = ctypes.c_bool

        # CLIENT_SnapPicture
        sdk.CLIENT_SnapPicture.argtypes = [c_long, POINTER(self.SNAP_PARAMS)]
        sdk.CLIENT_SnapPicture.restype = ctypes.c_bool

        # CLIENT_SetSnapRevCallBack
        sdk.CLIENT_SetSnapRevCallBack.argtypes = [ctypes.c_void_p, c_ulong]
        sdk.CLIENT_SetSnapRevCallBack.restype = None

        # CLIENT_RealPlayEx - extended live preview
        sdk.CLIENT_RealPlayEx.argtypes = [c_long, c_int, ctypes.c_void_p, c_int]
        sdk.CLIENT_RealPlayEx.restype = c_long

        # CLIENT_StopRealPlayEx
        sdk.CLIENT_StopRealPlayEx.argtypes = [c_long]
        sdk.CLIENT_StopRealPlayEx.restype = ctypes.c_bool

        # CLIENT_SetVideoProcCallBack - video callback registration
        # NOTE(review): confirm this export exists in the SDK build in use;
        # a missing symbol makes the whole library load fail here.
        sdk.CLIENT_SetVideoProcCallBack.argtypes = [ctypes.c_void_p, c_ulong]
        sdk.CLIENT_SetVideoProcCallBack.restype = None

    def init(self, disconnect_callback: Optional[Callable] = None) -> bool:
        """Run ``CLIENT_Init``.

        Args:
            disconnect_callback: Optional Python callable wrapped as a C
                callback taking (c_long, c_char_p, c_int, c_ulong) and
                passed to ``CLIENT_Init``.

        Returns:
            The result of ``CLIENT_Init``, or ``False`` when the library is
            not loaded.
        """
        if not self.sdk:
            return False

        if disconnect_callback:
            # Keep a reference to the ctypes callback so it is not garbage
            # collected.
            self._disconnect_callback = ctypes.CFUNCTYPE(
                None, c_long, c_char_p, c_int, c_ulong
            )(disconnect_callback)

        result = self.sdk.CLIENT_Init(self._disconnect_callback, 0)
        self.initialized = result
        return result

    def cleanup(self):
        """Call ``CLIENT_Cleanup`` if the SDK is loaded and marked initialized."""
        if self.sdk and self.initialized:
            self.sdk.CLIENT_Cleanup()
            self.initialized = False

    def login(self, ip: str, port: int, username: str, password: str) -> Tuple[Optional[int], int]:
        """Log in to a device.

        Args:
            ip: Device IP address.
            port: Device port.
            username: Account name.
            password: Account password.

        Returns:
            ``(login_handle, 0)`` on success; ``(None, error_code)`` when the
            SDK returns a zero handle; ``(None, -1)`` when the library is not
            loaded.

        Raises:
            ValueError: If an encoded string does not fit its 64-byte field.
        """
        if not self.sdk:
            return None, -1

        in_param = self.NET_IN_LOGIN()
        in_param.dwSize = ctypes.sizeof(self.NET_IN_LOGIN)
        in_param.szIP = ip.encode('utf-8')
        in_param.nPort = port
        in_param.szUserName = username.encode('utf-8')
        in_param.szPassword = password.encode('utf-8')
        in_param.emSpecCap = 0  # EM_LOGIN_SPEC_CAP_TCP

        out_param = self.NET_OUT_LOGIN()
        out_param.dwSize = ctypes.sizeof(self.NET_OUT_LOGIN)

        login_handle = self.sdk.CLIENT_LoginWithHighLevelSecurity(
            byref(in_param), byref(out_param)
        )

        if login_handle == 0:
            return None, out_param.nError

        return login_handle, 0

    def logout(self, login_handle: int) -> bool:
        """Log out of a device; returns ``False`` for an invalid handle."""
        if not self.sdk or not self._is_valid_handle(login_handle):
            return False
        return self.sdk.CLIENT_Logout(login_handle)

    def real_play(self, login_handle: int, channel: int = 0) -> Optional[int]:
        """Start a live preview with ``CLIENT_RealPlay``.

        Args:
            login_handle: Handle returned by ``login``.
            channel: Channel number.

        Returns:
            The preview handle, or ``None`` when the library is not loaded,
            the login handle is invalid, or the SDK returns a zero handle.
        """
        if not self.sdk or not self._is_valid_handle(login_handle):
            return None
        play_handle = self.sdk.CLIENT_RealPlay(login_handle, channel, None)
        return play_handle if self._is_valid_handle(play_handle) else None

    def real_play_ex(self, login_handle: int, channel: int = 0,
                     use_callback: bool = True) -> Optional[int]:
        """Start a live preview, optionally registering the video callback.

        Args:
            login_handle: Handle returned by ``login``.
            channel: Channel number.
            use_callback: When true, a frame buffer is created for the channel,
                the video callback is registered once, and the preview is
                started with ``CLIENT_RealPlayEx``; otherwise
                ``CLIENT_RealPlay`` is used.

        Returns:
            The preview handle, or ``None`` when the library is not loaded,
            the login handle is invalid, or the SDK returns a zero handle.
        """
        if not self.sdk or not self._is_valid_handle(login_handle):
            return None

        if use_callback:
            # Create the frame buffer for this channel on first use
            if channel not in self._video_frame_buffers:
                self._video_frame_buffers[channel] = VideoFrameBuffer()

            # Register the video callback once
            if self._video_callback is None:
                self._setup_video_callback()

            # Start the preview through RealPlayEx (rType=0: standard preview)
            play_handle = self.sdk.CLIENT_RealPlayEx(login_handle, channel, None, 0)
        else:
            play_handle = self.sdk.CLIENT_RealPlay(login_handle, channel, None)

        return play_handle if self._is_valid_handle(play_handle) else None

    def _setup_video_callback(self):
        """Create the video data callback and register it with the SDK.

        The callback currently discards everything it receives, so the frame
        buffers created by ``real_play_ex`` are not filled by this class.
        """
        # Callback signature expected here:
        # void CALLBACK RealDataCallBackEx(LONG lRealHandle, DWORD dwDataType,
        #     BYTE *pBuffer, DWORD dwBufSize, LONG param, LDWORD dwUser)
        def video_callback(real_handle, data_type, buffer, buf_size,
                           param, user_data):
            """Receive one chunk of stream data from the SDK."""
            # data_type:
            #   0 = system header
            #   1 = stream data (P/I frames)
            #   2 = audio data
            #   3 = intelligent (analytics) info
            if data_type == 1 and buf_size > 0:  # video stream data
                # The payload is still encoded; turning it into images needs
                # ffmpeg or the SDK's own decoding interface.
                pass

        # Build the ctypes callback and keep a reference to it
        self._video_callback = CFUNCTYPE(
            None, c_long, c_ulong, POINTER(c_ubyte), c_ulong, c_long, c_ulong
        )(video_callback)

        # Register the callback with the SDK
        self.sdk.CLIENT_SetVideoProcCallBack(self._video_callback, 0)

    def get_video_frame_buffer(self, channel: int = 0) -> Optional["VideoFrameBuffer"]:
        """Return the frame buffer created for ``channel``, or ``None``."""
        return self._video_frame_buffers.get(channel)

    def stop_real_play(self, play_handle: int) -> bool:
        """Stop a live preview; returns ``False`` for an invalid handle.

        NOTE(review): handles started through ``CLIENT_RealPlayEx`` are also
        stopped with ``CLIENT_StopRealPlay`` here; ``CLIENT_StopRealPlayEx`` is
        bound but unused - confirm which call the SDK expects.
        """
        if not self.sdk or not self._is_valid_handle(play_handle):
            return False
        return self.sdk.CLIENT_StopRealPlay(play_handle)

    def ptz_control(self, login_handle: int, channel: int,
                    command: int, param1: int, param2: int, param3: int,
                    stop: bool = False) -> bool:
        """Send a PTZ command through ``CLIENT_DHPTZControlEx``.

        Args:
            login_handle: Handle returned by ``login``.
            channel: Channel number.
            command: Control command (DH_EXTPTZ_ControlType).
            param1: First command parameter.
            param2: Second command parameter.
            param3: Third command parameter.
            stop: Stop flag (used for continuous movement).

        Returns:
            The SDK result, or ``False`` when the library is not loaded or
            the login handle is invalid.
        """
        if not self.sdk or not self._is_valid_handle(login_handle):
            return False
        return self.sdk.CLIENT_DHPTZControlEx(
            login_handle, channel, command, param1, param2, param3, stop
        )

    def snap_picture(self, login_handle: int, channel: int = 0,
                     cmd_serial: int = 0) -> bool:
        """Request a snapshot through ``CLIENT_SnapPicture``.

        Args:
            login_handle: Handle returned by ``login``.
            channel: Channel number.
            cmd_serial: Command serial number.

        Returns:
            The SDK result, or ``False`` when the library is not loaded or
            the login handle is invalid.
        """
        if not self.sdk or not self._is_valid_handle(login_handle):
            return False

        params = self.SNAP_PARAMS()
        params.Channel = channel
        params.mode = 0  # SNAP_TYP_TIMING
        params.CmdSerial = cmd_serial
        params.PicTransType = 0
        params.Quality = 1
        params.PicFormat = 0  # BMP

        return self.sdk.CLIENT_SnapPicture(login_handle, byref(params))
# PTZ control command constants (per the original note: taken from dhnetsdk.h)
class PTZCommand:
    """PTZ control command codes passed as ``command`` to PTZ control calls.

    NOTE(review): ``GOTOPRESET`` and ``POINT_GO`` are both described as
    "go to preset" but carry different values - confirm against dhnetsdk.h.
    """

    # Basic movement and lens control
    UP = 0            # tilt up
    DOWN = 1          # tilt down
    LEFT = 2          # pan left
    RIGHT = 3         # pan right
    ZOOM_ADD = 4      # zoom in
    ZOOM_DEC = 5      # zoom out
    FOCUS_ADD = 6     # focus +
    FOCUS_DEC = 7     # focus -
    APERTURE_ADD = 8  # aperture +
    APERTURE_DEC = 9  # aperture -

    # Preset point control
    POINT_SET = 16    # set preset
    POINT_CLEAR = 17  # clear preset
    POINT_GO = 18     # go to preset

    # Extended control
    # 3D exact positioning (param1: horizontal angle 0~3600,
    # param2: vertical angle -1800~1800, param3: zoom 1~128)
    EXACTGOTO = 23
    GOTOPRESET = 39       # go to preset
    MOVE_ABSOLUTELY = 41  # absolute move
class SDKError:
    """Integer error codes used by this wrapper."""

    SUCCESS = 0              # no error
    NET_ERROR_PASSWORD = 1   # password error
    NET_ERROR_USER = 2       # user error
    NET_ERROR_TIMEOUT = 3    # timeout
    NET_ERROR_RECONNECT = 4  # reconnect
|