""" 大华SDK Python封装 使用ctypes调用大华NetSDK """ import ctypes import os import threading import queue from ctypes import c_int, c_long, c_uint32, c_char_p, c_ubyte, POINTER, Structure, byref, CFUNCTYPE # 大华SDK Linux类型映射 (dhnetsdk.h非Windows定义): # DWORD = unsigned int (4 bytes) → c_uint32 # LONG = int (4 bytes) → c_int # LLONG = long (8 bytes on LP64) → c_long # LDWORD = long (8 bytes on LP64) → c_long from typing import Optional, Callable, Tuple, Dict, Any class VideoFrameBuffer: """ 视频帧缓冲区 用于SDK回调存储帧数据 """ def __init__(self, max_size: int = 10): self.buffer = queue.Queue(maxsize=max_size) self.lock = threading.Lock() self.latest_frame = None def put(self, frame_data: bytes, width: int, height: int, frame_type: int = 0): """存入帧数据""" with self.lock: self.latest_frame = { 'data': frame_data, 'width': width, 'height': height, 'type': frame_type } try: self.buffer.put_nowait(self.latest_frame) except queue.Full: pass def get(self, timeout: float = 0.1) -> Optional[Dict[str, Any]]: """获取帧数据""" try: return self.buffer.get(timeout=timeout) except queue.Empty: return None def get_latest(self) -> Optional[Dict[str, Any]]: """获取最新帧数据""" with self.lock: return self.latest_frame # 加载SDK库 class DahuaSDK: """大华SDK封装类""" def __init__(self, lib_path: str): """ 初始化SDK Args: lib_path: SDK库文件路径 """ self.lib_path = lib_path self.sdk = None self.initialized = False self._disconnect_callback = None # 保存回调引用,防止被垃圾回收 # 视频回调相关 self._video_callback = None self._video_frame_buffers: Dict[int, VideoFrameBuffer] = {} self._setup_structures() self._load_library() def _setup_structures(self): """设置SDK所需的C结构体""" # 登录输入参数 (必须与SDK头文件 NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY 严格对齐) class NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure): _fields_ = [ ('dwSize', c_uint32), # DWORD = uint32 ('szIP', ctypes.c_char * 64), # IP ('nPort', c_int), # 端口 ('szUserName', ctypes.c_char * 64), # 用户名 ('szPassword', ctypes.c_char * 64), # 密码 ('emSpecCap', c_int), # 登录模式 ('byReserved', ctypes.c_ubyte * 4), # 对齐保留 ('pCapParam', ctypes.c_void_p), # 扩展参数 ('emTLSCap', c_int), # TLS模式 ('szLocalIP', ctypes.c_char * 64), # 本地IP ] # 设备信息 (NET_DEVICEINFO_Ex) class NET_DEVICEINFO_Ex(Structure): _fields_ = [ ('sSerialNumber', ctypes.c_ubyte * 48), # 序列号 ('nAlarmInPortNum', c_int), # 报警输入数 ('nAlarmOutPortNum', c_int), # 报警输出数 ('nDiskNum', c_int), # 硬盘数 ('nDVRType', c_int), # DVR类型 ('nChanNum', c_int), # 通道数 ('byLimitLoginTime', ctypes.c_ubyte), # 限制登录时间 ('byLeftLogTimes', ctypes.c_ubyte), # 剩余登录次数 ('bReserved', ctypes.c_ubyte * 2), # 保留 ('nLockLeftTime', c_int), # 锁定剩余时间 ('Reserved', ctypes.c_char * 4), # 保留 ('nNTlsPort', c_int), # TLS端口 ('nKeyFrameEncrypt', c_int), # 关键帧加密 ('emAlgorithm', c_int), # 加密算法 ('Reserved2', ctypes.c_char * 8), # 保留 ] # 登录输出参数 (必须与SDK头文件严格对齐) class NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure): _fields_ = [ ('dwSize', c_uint32), # DWORD = uint32 ('stuDeviceInfo', NET_DEVICEINFO_Ex), # 设备信息 ('nError', c_int), # 错误码 ('byReserved', ctypes.c_char * 132), # 保留 ] # 抓拍参数 class SNAP_PARAMS(Structure): _fields_ = [ ('Channel', c_int), ('mode', c_int), ('CmdSerial', c_int), ('PicTransType', c_int), ('SendTotal', c_int), ('Quality', c_int), ('PicFormat', c_int), ('PicWidth', c_int), ('PicHeight', c_int), ('SnapDelayTime', c_int), ('byRes', ctypes.c_char * 12), ] self.NET_IN_LOGIN = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY self.NET_OUT_LOGIN = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY self.SNAP_PARAMS = SNAP_PARAMS def _load_library(self): """加载SDK动态库""" try: lib_dir = os.path.dirname(self.lib_path) # 预加载依赖库 (确保库依赖能正确解析) # 大华SDK依赖 libcrypto.so, libssl.so 等 if lib_dir: # 使用 RTLD_GLOBAL 标志加载依赖库,使其符号对后续库可见 try: # 尝试预加载常见的依赖库 for dep_lib in ['libcrypto.so', 'libssl.so']: dep_path = os.path.join(lib_dir, dep_lib) if os.path.exists(dep_path): ctypes.CDLL(dep_path, mode=ctypes.RTLD_GLOBAL) except Exception as e: # 依赖库加载失败不一定是致命错误,继续尝试加载主库 print(f"加载依赖库警告: {e}") # 加载主SDK库 self.sdk = ctypes.CDLL(self.lib_path) self._setup_functions() self.initialized = True print(f"成功加载大华SDK: {self.lib_path}") except Exception as e: print(f"加载SDK失败: {e}") raise def _setup_functions(self): """设置SDK函数签名 不同平台/版本的SDK可能缺少部分函数,用可选绑定处理: - 必需函数:缺失则初始化失败 - 可选函数:缺失时设为None,运行时检查可用性 注意: 在 Linux 上 BOOL = int (4 bytes),不是 Windows 上的 bool (1 byte) 所有返回 BOOL 的函数使用 c_int 作为返回值类型 """ # === 必需函数 === # CLIENT_Init(BOOL (CALLBACK *cbDisConnect), LDWORD) -> BOOL self.sdk.CLIENT_Init.argtypes = [ctypes.c_void_p, c_long] self.sdk.CLIENT_Init.restype = c_int # BOOL = int on Linux # CLIENT_Cleanup() self.sdk.CLIENT_Cleanup.argtypes = [] self.sdk.CLIENT_Cleanup.restype = None # CLIENT_LoginWithHighLevelSecurity -> LLONG self.sdk.CLIENT_LoginWithHighLevelSecurity.argtypes = [ POINTER(self.NET_IN_LOGIN), POINTER(self.NET_OUT_LOGIN) ] self.sdk.CLIENT_LoginWithHighLevelSecurity.restype = c_long # CLIENT_Logout(LLONG) -> BOOL self.sdk.CLIENT_Logout.argtypes = [c_long] self.sdk.CLIENT_Logout.restype = c_int # BOOL = int on Linux # CLIENT_RealPlay(LLONG, int, void*) -> LLONG self.sdk.CLIENT_RealPlay.argtypes = [c_long, c_int, ctypes.c_void_p] self.sdk.CLIENT_RealPlay.restype = c_long # CLIENT_StopRealPlay(LLONG) -> BOOL self.sdk.CLIENT_StopRealPlay.argtypes = [c_long] self.sdk.CLIENT_StopRealPlay.restype = c_int # BOOL = int on Linux # CLIENT_DHPTZControlEx(LLONG, int, DWORD, LONG, LONG, LONG, BOOL) -> BOOL # 注意: 在 Linux 上 BOOL = int (4 bytes),不是 Windows 上的 bool # 使用 c_int 而不是 c_bool 以确保参数大小正确 self.sdk.CLIENT_DHPTZControlEx.argtypes = [ c_long, c_int, c_uint32, c_int, c_int, c_int, c_int ] self.sdk.CLIENT_DHPTZControlEx.restype = c_int # BOOL = int on Linux # CLIENT_SnapPicture(LLONG, SNAP_PARAMS*) -> BOOL self.sdk.CLIENT_SnapPicture.argtypes = [c_long, POINTER(self.SNAP_PARAMS)] self.sdk.CLIENT_SnapPicture.restype = c_int # BOOL = int on Linux # CLIENT_GetLastError() -> DWORD self.sdk.CLIENT_GetLastError.argtypes = [] self.sdk.CLIENT_GetLastError.restype = c_uint32 # === 可选函数(某些SDK版本/平台可能缺失)=== self._optional_funcs = {} self._bind_optional('CLIENT_SetSnapRevCallBack', [ctypes.c_void_p, c_long], None) self._bind_optional('CLIENT_RealPlayEx', [c_long, c_int, ctypes.c_void_p, c_int], c_long) self._bind_optional('CLIENT_StopRealPlayEx', [c_long], c_int) # BOOL = int on Linux self._bind_optional('CLIENT_SetVideoProcCallBack', [ctypes.c_void_p, c_long], None) def _bind_optional(self, name: str, argtypes: list, restype): """绑定可选SDK函数,缺失时设为None而不报错""" try: func = getattr(self.sdk, name, None) if func is not None: func.argtypes = argtypes func.restype = restype self._optional_funcs[name] = func else: self._optional_funcs[name] = None print(f"SDK可选函数缺失: {name}") except (AttributeError, OSError) as e: self._optional_funcs[name] = None print(f"SDK可选函数不可用: {name} - {e}") def _has_func(self, name: str) -> bool: """检查SDK函数是否可用""" return self._optional_funcs.get(name) is not None def init(self, disconnect_callback: Callable = None) -> bool: """ 初始化SDK Args: disconnect_callback: 断线回调函数 Returns: 是否成功 """ if not self.sdk: return False if disconnect_callback: # 保存回调引用,防止被垃圾回收 # 回调签名: BOOL (CALLBACK *cbDisConnect)(LLONG lLoginID, char *pchDVRIP, LONG nDVRPort, LDWORD dwUser) # 在 Linux 上 BOOL = int,所以使用 c_int 作为返回值类型 self._disconnect_callback = ctypes.CFUNCTYPE( c_int, c_long, c_char_p, c_int, c_long )(disconnect_callback) result = self.sdk.CLIENT_Init(self._disconnect_callback, 0) # SDK 返回 TRUE(非0) 表示成功 self.initialized = (result != 0) return self.initialized def cleanup(self): """清理SDK资源""" if self.sdk and self.initialized: self.sdk.CLIENT_Cleanup() self.initialized = False def login(self, ip: str, port: int, username: str, password: str) -> Tuple[Optional[int], int]: """ 登录设备 Args: ip: 设备IP port: 端口号 username: 用户名 password: 密码 Returns: (登录句柄, 错误码) - 成功时错误码为0 """ if not self.sdk: return None, -1 in_param = self.NET_IN_LOGIN() in_param.dwSize = ctypes.sizeof(self.NET_IN_LOGIN) in_param.szIP = ip.encode('utf-8') in_param.nPort = port in_param.szUserName = username.encode('utf-8') in_param.szPassword = password.encode('utf-8') in_param.emSpecCap = 0 # EM_LOGIN_SPEC_CAP_TCP out_param = self.NET_OUT_LOGIN() out_param.dwSize = ctypes.sizeof(self.NET_OUT_LOGIN) login_handle = self.sdk.CLIENT_LoginWithHighLevelSecurity( byref(in_param), byref(out_param) ) if login_handle == 0: return None, out_param.nError return login_handle, 0 def logout(self, login_handle: int) -> bool: """登出设备""" if not self.sdk or login_handle <= 0: return False result = self.sdk.CLIENT_Logout(login_handle) return result != 0 # SDK 返回 TRUE(非0) 表示成功 def real_play(self, login_handle: int, channel: int = 0) -> Optional[int]: """ 开始实时预览 Args: login_handle: 登录句柄 channel: 通道号 Returns: 预览句柄 """ if not self.sdk or login_handle <= 0: return None play_handle = self.sdk.CLIENT_RealPlay(login_handle, channel, None) return play_handle if play_handle > 0 else None def real_play_ex(self, login_handle: int, channel: int = 0, use_callback: bool = True) -> Optional[int]: if not self.sdk or login_handle <= 0: return None if not self._has_func('CLIENT_RealPlayEx'): return self.real_play(login_handle, channel) if use_callback and self._has_func('CLIENT_SetVideoProcCallBack'): if channel not in self._video_frame_buffers: self._video_frame_buffers[channel] = VideoFrameBuffer() if self._video_callback is None: self._setup_video_callback() play_handle = self.sdk.CLIENT_RealPlayEx(login_handle, channel, None, 0) else: play_handle = self.sdk.CLIENT_RealPlayEx(login_handle, channel, None, 0) if self._has_func('CLIENT_RealPlayEx') else self.real_play(login_handle, channel) return play_handle if play_handle > 0 else None def _setup_video_callback(self): if not self._has_func('CLIENT_SetVideoProcCallBack'): print("SDK不支持CLIENT_SetVideoProcCallBack,视频回调不可用") return def video_callback(real_handle: c_long, data_type: c_uint32, buffer: POINTER(c_ubyte), buf_size: c_uint32, param: c_long, user_data: c_long): if data_type == 1 and buf_size > 0: pass self._video_callback = CFUNCTYPE( None, c_long, c_uint32, POINTER(c_ubyte), c_uint32, c_long, c_long )(video_callback) self.sdk.CLIENT_SetVideoProcCallBack(self._video_callback, 0) def get_video_frame_buffer(self, channel: int = 0) -> Optional[VideoFrameBuffer]: """获取视频帧缓冲区""" return self._video_frame_buffers.get(channel) def stop_real_play(self, play_handle: int) -> bool: """停止实时预览""" if not self.sdk or play_handle <= 0: return False result = self.sdk.CLIENT_StopRealPlay(play_handle) return result != 0 # SDK 返回 TRUE(非0) 表示成功 def ptz_control(self, login_handle: int, channel: int, command: int, param1: int, param2: int, param3: int, stop: bool = False) -> bool: """ PTZ控制 Args: login_handle: 登录句柄 channel: 通道号 command: 控制命令 (DH_EXTPTZ_ControlType) param1-param3: 控制参数 stop: 是否停止(用于持续移动) Returns: 是否成功 """ if not self.sdk or login_handle <= 0: print(f"[PTZ] 失败: sdk={self.sdk is not None}, handle={login_handle}") return False # 命令名称映射 (用于调试日志) cmd_names = { 0: 'UP', 1: 'DOWN', 2: 'LEFT', 3: 'RIGHT', 4: 'ZOOM_ADD', 5: 'ZOOM_DEC', 0x20: 'LEFTTOP', 0x21: 'RIGHTTOP', 0x22: 'LEFTDOWN', 0x23: 'RIGHTDOWN', 0x33: 'FASTGOTO', 0x43: 'EXACTGOTO', 0x44: 'RESETZERO', 0x45: 'MOVE_ABSOLUTELY', 0x46: 'MOVE_CONTINUOUSLY', 0x47: 'GOTOPRESET', } cmd_name = cmd_names.get(command, f'CMD_{command:#x}') # 将 stop (bool) 转换为 int (TRUE=1, FALSE=0),匹配 Linux SDK 的 BOOL 定义 stop_int = 1 if stop else 0 result = self.sdk.CLIENT_DHPTZControlEx( login_handle, channel, command, param1, param2, param3, stop_int ) # SDK 返回 TRUE(非0) 表示成功,FALSE(0) 表示失败 success = (result != 0) print(f"[PTZ] {cmd_name}(ch={channel}, p1={param1}, p2={param2}, p3={param3}, stop={stop}) → {'✓' if success else '✗'} (ret={result})") if not success: # 获取SDK错误码 error = self.sdk.CLIENT_GetLastError() if hasattr(self.sdk, 'CLIENT_GetLastError') else -1 print(f"[PTZ] 错误码: {error}") return success def snap_picture(self, login_handle: int, channel: int = 0, cmd_serial: int = 0) -> bool: """ 抓拍图片 Args: login_handle: 登录句柄 channel: 通道号 cmd_serial: 命令序列号 Returns: 是否成功 """ if not self.sdk or login_handle <= 0: return False params = self.SNAP_PARAMS() params.Channel = channel params.mode = 0 # SNAP_TYP_TIMING params.CmdSerial = cmd_serial params.PicTransType = 0 params.Quality = 1 params.PicFormat = 0 # BMP result = self.sdk.CLIENT_SnapPicture(login_handle, byref(params)) return result != 0 # SDK 返回 TRUE(非0) 表示成功 # PTZ控制命令常量 (从dhnetsdk.h提取) class PTZCommand: """PTZ控制命令常量 (DH_EXTPTZ_ControlType from dhnetsdk.h)""" # 基本控制 (DH_PTZ_ControlType) UP = 0 DOWN = 1 LEFT = 2 RIGHT = 3 ZOOM_ADD = 4 ZOOM_DEC = 5 FOCUS_ADD = 6 FOCUS_DEC = 7 APERTURE_ADD = 8 APERTURE_DEC = 9 # 扩展控制 (DH_EXTPTZ_ControlType, 从0x20开始) LEFTTOP = 0x20 RIGHTTOP = 0x21 LEFTDOWN = 0x22 RIGHTDOWN = 0x23 ADDTOLOOP = 0x24 DELFROMLOOP = 0x25 CLOSELOOP = 0x26 STARTPANCRUISE = 0x27 STOPPANCRUISE = 0x28 SETLEFTBORDER = 0x29 SETRIGHTBORDER = 0x2a STARTLINESCAN = 0x2b CLOSELINESCAN = 0x2c SETMODESTART = 0x2d SETMODESTOP = 0x2e RUNMODE = 0x2f STOPMODE = 0x30 DELETEMODE = 0x31 REVERSECOMM = 0x32 FASTGOTO = 0x33 AUXIOPEN = 0x34 AUXICLOSE = 0x35 OPENMENU = 0x36 CLOSEMENU = 0x37 MENUOK = 0x38 MENUCANCEL = 0x39 MENUUP = 0x3a MENUDOWN = 0x3b MENULEFT = 0x3c MENURIGHT = 0x3d ALARMHANDLE = 0x40 MATRIXSWITCH = 0x41 LIGHTCONTROL = 0x42 EXACTGOTO = 0x43 # 三维精确定位 RESETZERO = 0x44 MOVE_ABSOLUTELY = 0x45 MOVE_CONTINUOUSLY = 0x46 GOTOPRESET = 0x47 SET_VIEW_RANGE = 0x49 FOCUS_ABSOLUTELY = 0x4a class SDKError: """SDK错误码""" SUCCESS = 0 NET_ERROR_PASSWORD = 1 NET_ERROR_USER = 2 NET_ERROR_TIMEOUT = 3 NET_ERROR_RECONNECT = 4