| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- """
- Python wrapper for the Dahua SDK.
- Calls the Dahua NetSDK through ctypes.
- """
- import ctypes
- import os
- import threading
- import queue
- from ctypes import c_int, c_long, c_uint32, c_char_p, c_ubyte, POINTER, Structure, byref, CFUNCTYPE
- # Dahua SDK type mapping on Linux (the non-Windows definitions in dhnetsdk.h):
- # DWORD = unsigned int (4 bytes) → c_uint32
- # LONG = int (4 bytes) → c_int
- # LLONG = long (8 bytes on LP64) → c_long
- # LDWORD = long (8 bytes on LP64) → c_long
- from typing import Optional, Callable, Tuple, Dict, Any
class VideoFrameBuffer:
    """Bounded buffer for video frames delivered by SDK callbacks.

    Frames are stored as dicts with the keys ``'data'``, ``'width'``,
    ``'height'`` and ``'type'``.  Two access paths are offered:

    * ``get()`` pops frames in arrival order from a bounded queue.
    * ``get_latest()`` returns the most recently stored frame without
      consuming anything from the queue.
    """

    def __init__(self, max_size: int = 10):
        """Create the buffer.

        Args:
            max_size: Maximum number of queued frames.  Passed straight to
                ``queue.Queue``, so a value <= 0 means "unbounded".
        """
        self.buffer = queue.Queue(maxsize=max_size)
        self.lock = threading.Lock()
        self.latest_frame = None

    def put(self, frame_data: bytes, width: int, height: int, frame_type: int = 0):
        """Store a frame.

        When the queue is already full the *oldest* queued frame is evicted
        to make room, so a slow consumer keeps seeing recent frames instead
        of an ever-staler backlog.

        Args:
            frame_data: Raw frame bytes.
            width: Frame width.
            height: Frame height.
            frame_type: Caller-defined frame type tag (defaults to 0).
        """
        frame = {
            'data': frame_data,
            'width': width,
            'height': height,
            'type': frame_type,
        }
        with self.lock:
            self.latest_frame = frame
            try:
                self.buffer.put_nowait(frame)
                return
            except queue.Full:
                pass

            # Queue is full: drop the oldest entry.  A consumer may have
            # drained the queue in the meantime, hence the Empty guard.
            try:
                self.buffer.get_nowait()
            except queue.Empty:
                pass

            try:
                self.buffer.put_nowait(frame)
            except queue.Full:
                # Something outside this lock refilled the public queue;
                # the frame is still reachable through get_latest().
                pass

    def get(self, timeout: float = 0.1) -> Optional[Dict[str, Any]]:
        """Pop the oldest queued frame.

        Args:
            timeout: Seconds to wait for a frame before giving up.

        Returns:
            The frame dict, or ``None`` if nothing arrived within *timeout*.
        """
        try:
            return self.buffer.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_latest(self) -> Optional[Dict[str, Any]]:
        """Return the most recently stored frame, or ``None`` if there is none.

        The frame is not removed from the queue.
        """
        with self.lock:
            return self.latest_frame
- # SDK wrapper class (loads the SDK shared library)
- class DahuaSDK:
- """大华SDK封装类"""
-
- def __init__(self, lib_path: str):
- """
- 初始化SDK
- Args:
- lib_path: SDK库文件路径
- """
- self.lib_path = lib_path
- self.sdk = None
- self.initialized = False
- self._disconnect_callback = None # 保存回调引用,防止被垃圾回收
-
- # 视频回调相关
- self._video_callback = None
- self._video_frame_buffers: Dict[int, VideoFrameBuffer] = {}
-
- self._setup_structures()
- self._load_library()
-
- def _setup_structures(self):
- """设置SDK所需的C结构体"""
-
- # 登录输入参数 (必须与SDK头文件 NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY 严格对齐)
- class NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure):
- _fields_ = [
- ('dwSize', c_uint32), # DWORD = uint32
- ('szIP', ctypes.c_char * 64), # IP
- ('nPort', c_int), # 端口
- ('szUserName', ctypes.c_char * 64), # 用户名
- ('szPassword', ctypes.c_char * 64), # 密码
- ('emSpecCap', c_int), # 登录模式
- ('byReserved', ctypes.c_ubyte * 4), # 对齐保留
- ('pCapParam', ctypes.c_void_p), # 扩展参数
- ('emTLSCap', c_int), # TLS模式
- ('szLocalIP', ctypes.c_char * 64), # 本地IP
- ]
-
- # 设备信息 (NET_DEVICEINFO_Ex)
- class NET_DEVICEINFO_Ex(Structure):
- _fields_ = [
- ('sSerialNumber', ctypes.c_ubyte * 48), # 序列号
- ('nAlarmInPortNum', c_int), # 报警输入数
- ('nAlarmOutPortNum', c_int), # 报警输出数
- ('nDiskNum', c_int), # 硬盘数
- ('nDVRType', c_int), # DVR类型
- ('nChanNum', c_int), # 通道数
- ('byLimitLoginTime', ctypes.c_ubyte), # 限制登录时间
- ('byLeftLogTimes', ctypes.c_ubyte), # 剩余登录次数
- ('bReserved', ctypes.c_ubyte * 2), # 保留
- ('nLockLeftTime', c_int), # 锁定剩余时间
- ('Reserved', ctypes.c_char * 4), # 保留
- ('nNTlsPort', c_int), # TLS端口
- ('nKeyFrameEncrypt', c_int), # 关键帧加密
- ('emAlgorithm', c_int), # 加密算法
- ('Reserved2', ctypes.c_char * 8), # 保留
- ]
-
- # 登录输出参数 (必须与SDK头文件严格对齐)
- class NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY(Structure):
- _fields_ = [
- ('dwSize', c_uint32), # DWORD = uint32
- ('stuDeviceInfo', NET_DEVICEINFO_Ex), # 设备信息
- ('nError', c_int), # 错误码
- ('byReserved', ctypes.c_char * 132), # 保留
- ]
-
- # 抓拍参数
- class SNAP_PARAMS(Structure):
- _fields_ = [
- ('Channel', c_int),
- ('mode', c_int),
- ('CmdSerial', c_int),
- ('PicTransType', c_int),
- ('SendTotal', c_int),
- ('Quality', c_int),
- ('PicFormat', c_int),
- ('PicWidth', c_int),
- ('PicHeight', c_int),
- ('SnapDelayTime', c_int),
- ('byRes', ctypes.c_char * 12),
- ]
-
- self.NET_IN_LOGIN = NET_IN_LOGIN_WITH_HIGHLEVEL_SECURITY
- self.NET_OUT_LOGIN = NET_OUT_LOGIN_WITH_HIGHLEVEL_SECURITY
- self.SNAP_PARAMS = SNAP_PARAMS
-
- def _load_library(self):
- """加载SDK动态库"""
- try:
- lib_dir = os.path.dirname(self.lib_path)
-
- # 预加载依赖库 (确保库依赖能正确解析)
- # 大华SDK依赖 libcrypto.so, libssl.so 等
- if lib_dir:
- # 使用 RTLD_GLOBAL 标志加载依赖库,使其符号对后续库可见
- try:
- # 尝试预加载常见的依赖库
- for dep_lib in ['libcrypto.so', 'libssl.so']:
- dep_path = os.path.join(lib_dir, dep_lib)
- if os.path.exists(dep_path):
- ctypes.CDLL(dep_path, mode=ctypes.RTLD_GLOBAL)
- except Exception as e:
- # 依赖库加载失败不一定是致命错误,继续尝试加载主库
- print(f"加载依赖库警告: {e}")
-
- # 加载主SDK库
- self.sdk = ctypes.CDLL(self.lib_path)
- self._setup_functions()
- self.initialized = True
- print(f"成功加载大华SDK: {self.lib_path}")
- except Exception as e:
- print(f"加载SDK失败: {e}")
- raise
-
- def _setup_functions(self):
- """设置SDK函数签名
-
- 不同平台/版本的SDK可能缺少部分函数,用可选绑定处理:
- - 必需函数:缺失则初始化失败
- - 可选函数:缺失时设为None,运行时检查可用性
-
- 注意: 在 Linux 上 BOOL = int (4 bytes),不是 Windows 上的 bool (1 byte)
- 所有返回 BOOL 的函数使用 c_int 作为返回值类型
- """
-
- # === 必需函数 ===
-
- # CLIENT_Init(BOOL (CALLBACK *cbDisConnect), LDWORD) -> BOOL
- self.sdk.CLIENT_Init.argtypes = [ctypes.c_void_p, c_long]
- self.sdk.CLIENT_Init.restype = c_int # BOOL = int on Linux
-
- # CLIENT_Cleanup()
- self.sdk.CLIENT_Cleanup.argtypes = []
- self.sdk.CLIENT_Cleanup.restype = None
-
- # CLIENT_LoginWithHighLevelSecurity -> LLONG
- self.sdk.CLIENT_LoginWithHighLevelSecurity.argtypes = [
- POINTER(self.NET_IN_LOGIN),
- POINTER(self.NET_OUT_LOGIN)
- ]
- self.sdk.CLIENT_LoginWithHighLevelSecurity.restype = c_long
-
- # CLIENT_Logout(LLONG) -> BOOL
- self.sdk.CLIENT_Logout.argtypes = [c_long]
- self.sdk.CLIENT_Logout.restype = c_int # BOOL = int on Linux
-
- # CLIENT_RealPlay(LLONG, int, void*) -> LLONG
- self.sdk.CLIENT_RealPlay.argtypes = [c_long, c_int, ctypes.c_void_p]
- self.sdk.CLIENT_RealPlay.restype = c_long
-
- # CLIENT_StopRealPlay(LLONG) -> BOOL
- self.sdk.CLIENT_StopRealPlay.argtypes = [c_long]
- self.sdk.CLIENT_StopRealPlay.restype = c_int # BOOL = int on Linux
-
- # CLIENT_DHPTZControlEx(LLONG, int, DWORD, LONG, LONG, LONG, BOOL) -> BOOL
- # 注意: 在 Linux 上 BOOL = int (4 bytes),不是 Windows 上的 bool
- # 使用 c_int 而不是 c_bool 以确保参数大小正确
- self.sdk.CLIENT_DHPTZControlEx.argtypes = [
- c_long, c_int, c_uint32, c_int, c_int, c_int, c_int
- ]
- self.sdk.CLIENT_DHPTZControlEx.restype = c_int # BOOL = int on Linux
-
- # CLIENT_SnapPicture(LLONG, SNAP_PARAMS*) -> BOOL
- self.sdk.CLIENT_SnapPicture.argtypes = [c_long, POINTER(self.SNAP_PARAMS)]
- self.sdk.CLIENT_SnapPicture.restype = c_int # BOOL = int on Linux
-
- # CLIENT_GetLastError() -> DWORD
- self.sdk.CLIENT_GetLastError.argtypes = []
- self.sdk.CLIENT_GetLastError.restype = c_uint32
-
- # === 可选函数(某些SDK版本/平台可能缺失)===
- self._optional_funcs = {}
-
- self._bind_optional('CLIENT_SetSnapRevCallBack', [ctypes.c_void_p, c_long], None)
- self._bind_optional('CLIENT_RealPlayEx', [c_long, c_int, ctypes.c_void_p, c_int], c_long)
- self._bind_optional('CLIENT_StopRealPlayEx', [c_long], c_int) # BOOL = int on Linux
- self._bind_optional('CLIENT_SetVideoProcCallBack', [ctypes.c_void_p, c_long], None)
-
- def _bind_optional(self, name: str, argtypes: list, restype):
- """绑定可选SDK函数,缺失时设为None而不报错"""
- try:
- func = getattr(self.sdk, name, None)
- if func is not None:
- func.argtypes = argtypes
- func.restype = restype
- self._optional_funcs[name] = func
- else:
- self._optional_funcs[name] = None
- print(f"SDK可选函数缺失: {name}")
- except (AttributeError, OSError) as e:
- self._optional_funcs[name] = None
- print(f"SDK可选函数不可用: {name} - {e}")
-
- def _has_func(self, name: str) -> bool:
- """检查SDK函数是否可用"""
- return self._optional_funcs.get(name) is not None
-
- def init(self, disconnect_callback: Callable = None) -> bool:
- """
- 初始化SDK
- Args:
- disconnect_callback: 断线回调函数
- Returns:
- 是否成功
- """
- if not self.sdk:
- return False
-
- if disconnect_callback:
- # 保存回调引用,防止被垃圾回收
- # 回调签名: BOOL (CALLBACK *cbDisConnect)(LLONG lLoginID, char *pchDVRIP, LONG nDVRPort, LDWORD dwUser)
- # 在 Linux 上 BOOL = int,所以使用 c_int 作为返回值类型
- self._disconnect_callback = ctypes.CFUNCTYPE(
- c_int, c_long, c_char_p, c_int, c_long
- )(disconnect_callback)
-
- result = self.sdk.CLIENT_Init(self._disconnect_callback, 0)
- # SDK 返回 TRUE(非0) 表示成功
- self.initialized = (result != 0)
- return self.initialized
-
- def cleanup(self):
- """清理SDK资源"""
- if self.sdk and self.initialized:
- self.sdk.CLIENT_Cleanup()
- self.initialized = False
-
- def login(self, ip: str, port: int, username: str, password: str) -> Tuple[Optional[int], int]:
- """
- 登录设备
- Args:
- ip: 设备IP
- port: 端口号
- username: 用户名
- password: 密码
- Returns:
- (登录句柄, 错误码) - 成功时错误码为0
- """
- if not self.sdk:
- return None, -1
-
- in_param = self.NET_IN_LOGIN()
- in_param.dwSize = ctypes.sizeof(self.NET_IN_LOGIN)
- in_param.szIP = ip.encode('utf-8')
- in_param.nPort = port
- in_param.szUserName = username.encode('utf-8')
- in_param.szPassword = password.encode('utf-8')
- in_param.emSpecCap = 0 # EM_LOGIN_SPEC_CAP_TCP
-
- out_param = self.NET_OUT_LOGIN()
- out_param.dwSize = ctypes.sizeof(self.NET_OUT_LOGIN)
-
- login_handle = self.sdk.CLIENT_LoginWithHighLevelSecurity(
- byref(in_param), byref(out_param)
- )
-
- if login_handle == 0:
- return None, out_param.nError
-
- return login_handle, 0
-
- def logout(self, login_handle: int) -> bool:
- """登出设备"""
- if not self.sdk or login_handle <= 0:
- return False
- result = self.sdk.CLIENT_Logout(login_handle)
- return result != 0 # SDK 返回 TRUE(非0) 表示成功
-
- def real_play(self, login_handle: int, channel: int = 0) -> Optional[int]:
- """
- 开始实时预览
- Args:
- login_handle: 登录句柄
- channel: 通道号
- Returns:
- 预览句柄
- """
- if not self.sdk or login_handle <= 0:
- return None
- play_handle = self.sdk.CLIENT_RealPlay(login_handle, channel, None)
- return play_handle if play_handle > 0 else None
-
- def real_play_ex(self, login_handle: int, channel: int = 0,
- use_callback: bool = True) -> Optional[int]:
- if not self.sdk or login_handle <= 0:
- return None
-
- if not self._has_func('CLIENT_RealPlayEx'):
- return self.real_play(login_handle, channel)
-
- if use_callback and self._has_func('CLIENT_SetVideoProcCallBack'):
- if channel not in self._video_frame_buffers:
- self._video_frame_buffers[channel] = VideoFrameBuffer()
- if self._video_callback is None:
- self._setup_video_callback()
- play_handle = self.sdk.CLIENT_RealPlayEx(login_handle, channel, None, 0)
- else:
- play_handle = self.sdk.CLIENT_RealPlayEx(login_handle, channel, None, 0) if self._has_func('CLIENT_RealPlayEx') else self.real_play(login_handle, channel)
-
- return play_handle if play_handle > 0 else None
-
- def _setup_video_callback(self):
- if not self._has_func('CLIENT_SetVideoProcCallBack'):
- print("SDK不支持CLIENT_SetVideoProcCallBack,视频回调不可用")
- return
-
- def video_callback(real_handle: c_long, data_type: c_uint32,
- buffer: POINTER(c_ubyte), buf_size: c_uint32,
- param: c_long, user_data: c_long):
- if data_type == 1 and buf_size > 0:
- pass
-
- self._video_callback = CFUNCTYPE(
- None, c_long, c_uint32, POINTER(c_ubyte), c_uint32, c_long, c_long
- )(video_callback)
-
- self.sdk.CLIENT_SetVideoProcCallBack(self._video_callback, 0)
-
- def get_video_frame_buffer(self, channel: int = 0) -> Optional[VideoFrameBuffer]:
- """获取视频帧缓冲区"""
- return self._video_frame_buffers.get(channel)
-
- def stop_real_play(self, play_handle: int) -> bool:
- """停止实时预览"""
- if not self.sdk or play_handle <= 0:
- return False
- result = self.sdk.CLIENT_StopRealPlay(play_handle)
- return result != 0 # SDK 返回 TRUE(非0) 表示成功
-
- def ptz_control(self, login_handle: int, channel: int,
- command: int, param1: int, param2: int, param3: int,
- stop: bool = False) -> bool:
- """
- PTZ控制
- Args:
- login_handle: 登录句柄
- channel: 通道号
- command: 控制命令 (DH_EXTPTZ_ControlType)
- param1-param3: 控制参数
- stop: 是否停止(用于持续移动)
- Returns:
- 是否成功
- """
- if not self.sdk or login_handle <= 0:
- print(f"[PTZ] 失败: sdk={self.sdk is not None}, handle={login_handle}")
- return False
-
- # 命令名称映射 (用于调试日志)
- cmd_names = {
- 0: 'UP', 1: 'DOWN', 2: 'LEFT', 3: 'RIGHT',
- 4: 'ZOOM_ADD', 5: 'ZOOM_DEC',
- 0x20: 'LEFTTOP', 0x21: 'RIGHTTOP', 0x22: 'LEFTDOWN', 0x23: 'RIGHTDOWN',
- 0x33: 'FASTGOTO', 0x43: 'EXACTGOTO', 0x44: 'RESETZERO',
- 0x45: 'MOVE_ABSOLUTELY', 0x46: 'MOVE_CONTINUOUSLY', 0x47: 'GOTOPRESET',
- }
- cmd_name = cmd_names.get(command, f'CMD_{command:#x}')
-
- # 将 stop (bool) 转换为 int (TRUE=1, FALSE=0),匹配 Linux SDK 的 BOOL 定义
- stop_int = 1 if stop else 0
-
- result = self.sdk.CLIENT_DHPTZControlEx(
- login_handle, channel, command, param1, param2, param3, stop_int
- )
-
- # SDK 返回 TRUE(非0) 表示成功,FALSE(0) 表示失败
- success = (result != 0)
-
- print(f"[PTZ] {cmd_name}(ch={channel}, p1={param1}, p2={param2}, p3={param3}, stop={stop}) → {'✓' if success else '✗'} (ret={result})")
-
- if not success:
- # 获取SDK错误码
- error = self.sdk.CLIENT_GetLastError() if hasattr(self.sdk, 'CLIENT_GetLastError') else -1
- print(f"[PTZ] 错误码: {error}")
-
- return success
-
- def snap_picture(self, login_handle: int, channel: int = 0,
- cmd_serial: int = 0) -> bool:
- """
- 抓拍图片
- Args:
- login_handle: 登录句柄
- channel: 通道号
- cmd_serial: 命令序列号
- Returns:
- 是否成功
- """
- if not self.sdk or login_handle <= 0:
- return False
-
- params = self.SNAP_PARAMS()
- params.Channel = channel
- params.mode = 0 # SNAP_TYP_TIMING
- params.CmdSerial = cmd_serial
- params.PicTransType = 0
- params.Quality = 1
- params.PicFormat = 0 # BMP
-
- result = self.sdk.CLIENT_SnapPicture(login_handle, byref(params))
- return result != 0 # SDK 返回 TRUE(非0) 表示成功
- # PTZ control command constants (extracted from dhnetsdk.h)
class PTZCommand:
    """PTZ control command codes (DH_EXTPTZ_ControlType from dhnetsdk.h).

    Plain integer class attributes; all values are written in hexadecimal.
    """

    # --- Basic control (DH_PTZ_ControlType) ---
    UP = 0x00
    DOWN = 0x01
    LEFT = 0x02
    RIGHT = 0x03
    ZOOM_ADD = 0x04
    ZOOM_DEC = 0x05
    FOCUS_ADD = 0x06
    FOCUS_DEC = 0x07
    APERTURE_ADD = 0x08
    APERTURE_DEC = 0x09

    # --- Extended control (DH_EXTPTZ_ControlType, starts at 0x20) ---
    LEFTTOP = 0x20
    RIGHTTOP = 0x21
    LEFTDOWN = 0x22
    RIGHTDOWN = 0x23
    ADDTOLOOP = 0x24
    DELFROMLOOP = 0x25
    CLOSELOOP = 0x26
    STARTPANCRUISE = 0x27
    STOPPANCRUISE = 0x28
    SETLEFTBORDER = 0x29
    SETRIGHTBORDER = 0x2A
    STARTLINESCAN = 0x2B
    CLOSELINESCAN = 0x2C
    SETMODESTART = 0x2D
    SETMODESTOP = 0x2E
    RUNMODE = 0x2F
    STOPMODE = 0x30
    DELETEMODE = 0x31
    REVERSECOMM = 0x32
    FASTGOTO = 0x33
    AUXIOPEN = 0x34
    AUXICLOSE = 0x35

    # --- Menu codes ---
    OPENMENU = 0x36
    CLOSEMENU = 0x37
    MENUOK = 0x38
    MENUCANCEL = 0x39
    MENUUP = 0x3A
    MENUDOWN = 0x3B
    MENULEFT = 0x3C
    MENURIGHT = 0x3D

    # --- Codes from 0x40 upward ---
    ALARMHANDLE = 0x40
    MATRIXSWITCH = 0x41
    LIGHTCONTROL = 0x42
    EXACTGOTO = 0x43  # 3D exact positioning
    RESETZERO = 0x44
    MOVE_ABSOLUTELY = 0x45
    MOVE_CONTINUOUSLY = 0x46
    GOTOPRESET = 0x47
    SET_VIEW_RANGE = 0x49
    FOCUS_ABSOLUTELY = 0x4A
- FOCUS_ABSOLUTELY = 0x4a
class SDKError:
    """SDK error codes as plain integer class attributes."""

    (
        SUCCESS,              # 0
        NET_ERROR_PASSWORD,   # 1
        NET_ERROR_USER,       # 2
        NET_ERROR_TIMEOUT,    # 3
        NET_ERROR_RECONNECT,  # 4
    ) = range(5)
|