mqtt_client.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. import paho.mqtt.client as mqtt
  2. import threading
  3. import json
  4. import time
  5. import logging
  6. import ssl
  7. from dataclasses import dataclass, field
  8. from typing import Dict, List, Optional, Tuple, Any
  9. # 配置日志
  10. logging.basicConfig(
  11. level=logging.INFO,
  12. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  13. )
  14. logger = logging.getLogger('mqtt_client')
  15. @dataclass
  16. class MQTTConfig:
  17. """MQTT配置数据类"""
  18. broker: str
  19. port: int = 1883
  20. username: str = ""
  21. password: str = ""
  22. client_id: str = ""
  23. keepalive: int = 60
  24. qos: int = 0
  25. # TLS/SSL 配置
  26. use_tls: bool = False
  27. tls_version: int = ssl.PROTOCOL_TLSv1_2
  28. ca_certs: Optional[str] = None
  29. certfile: Optional[str] = None
  30. keyfile: Optional[str] = None
  31. # 重连配置
  32. reconnect_delay: float = 1.0
  33. reconnect_delay_max: float = 120.0
  34. reconnect_exponential_backoff: bool = True
  35. max_reconnect_attempts: int = 0 # 0 表示无限尝试
  36. # 消息配置
  37. will_topic: Optional[str] = None
  38. will_payload: Optional[str] = None
  39. will_qos: int = 0
  40. will_retain: bool = False
  41. class MQTTClient:
  42. """MQTT客户端封装类,提供高级MQTT通信功能"""
  43. def __init__(self):
  44. self.client = None
  45. self.is_connected = False
  46. self.config = None
  47. self.topics = [] # 存储主题和QoS的元组列表
  48. self.data_callback = None
  49. self.status_callback = None
  50. self.error_callback = None
  51. self.connected_event = threading.Event()
  52. self.lock = threading.RLock()
  53. self.reconnect_attempts = 0
  54. self.last_reconnect_time = 0
  55. self.last_will_set = False
  56. def connect(self, broker: str, port: int = 1883, username: str = "",
  57. password: str = "", client_id: str = "", use_tls: bool = False,
  58. qos: int = 0, **kwargs) -> Tuple[bool, str]:
  59. """
  60. 连接到MQTT服务器
  61. Args:
  62. broker: MQTT服务器地址
  63. port: MQTT服务器端口
  64. username: 用户名
  65. password: 密码
  66. client_id: 客户端ID,为空时自动生成
  67. use_tls: 是否使用TLS加密
  68. qos: 默认QoS级别
  69. **kwargs: 其他配置参数
  70. Returns:
  71. tuple: (是否成功, 消息)
  72. """
  73. try:
  74. # 构建配置
  75. config = MQTTConfig(
  76. broker=broker,
  77. port=port,
  78. username=username,
  79. password=password,
  80. client_id=client_id or f"serial_mqtt_gateway_{time.time()}",
  81. use_tls=use_tls,
  82. qos=qos,
  83. **kwargs
  84. )
  85. with self.lock:
  86. # 断开现有连接
  87. if self.is_connected:
  88. self.disconnect()
  89. logger.info(f"尝试连接MQTT服务器: {broker}:{port}")
  90. # 创建MQTT客户端
  91. self.client = mqtt.Client(
  92. client_id=config.client_id,
  93. clean_session=True,
  94. userdata=None
  95. )
  96. # 设置回调函数
  97. self.client.on_connect = self._on_connect
  98. self.client.on_disconnect = self._on_disconnect
  99. self.client.on_message = self._on_message
  100. self.client.on_publish = self._on_publish
  101. self.client.on_subscribe = self._on_subscribe
  102. # 设置重连参数
  103. self.client.reconnect_delay_set(
  104. min_delay=config.reconnect_delay,
  105. max_delay=config.reconnect_delay_max,
  106. exponential_backoff=config.reconnect_exponential_backoff
  107. )
  108. # 设置用户名密码
  109. if config.username and config.password:
  110. self.client.username_pw_set(config.username, config.password)
  111. logger.debug(f"已设置MQTT用户名密码认证")
  112. # 配置TLS
  113. if config.use_tls:
  114. try:
  115. self.client.tls_set(
  116. ca_certs=config.ca_certs,
  117. certfile=config.certfile,
  118. keyfile=config.keyfile,
  119. tls_version=config.tls_version
  120. )
  121. logger.debug(f"已配置MQTT TLS连接")
  122. except Exception as e:
  123. error_msg = f"配置TLS失败: {str(e)}"
  124. logger.error(error_msg)
  125. return False, error_msg
  126. # 设置遗嘱消息
  127. if config.will_topic:
  128. will_payload = config.will_payload or "{\"status\":\"offline\"}"
  129. self.client.will_set(
  130. topic=config.will_topic,
  131. payload=will_payload,
  132. qos=config.will_qos,
  133. retain=config.will_retain
  134. )
  135. self.last_will_set = True
  136. logger.debug(f"已设置MQTT遗嘱消息: {config.will_topic}")
  137. # 连接到服务器
  138. self.client.connect(
  139. broker=config.broker,
  140. port=config.port,
  141. keepalive=config.keepalive
  142. )
  143. # 启动客户端循环
  144. self.client.loop_start()
  145. # 保存配置
  146. self.config = config
  147. # 等待连接成功或超时
  148. connected = self.connected_event.wait(timeout=10)
  149. if connected:
  150. logger.info(f"已连接到MQTT服务器 {broker}:{port}")
  151. self.reconnect_attempts = 0
  152. self.last_reconnect_time = time.time()
  153. if self.status_callback:
  154. self.status_callback(True)
  155. # 重新订阅之前的主题
  156. if self.topics:
  157. self._resubscribe_topics()
  158. return True, f"已连接到MQTT服务器 {broker}:{port}"
  159. else:
  160. error_msg = "连接MQTT服务器超时"
  161. logger.error(error_msg)
  162. self.client.loop_stop()
  163. self.client = None
  164. if self.status_callback:
  165. self.status_callback(False)
  166. return False, error_msg
  167. except Exception as e:
  168. error_msg = f"连接失败: {str(e)}"
  169. logger.error(error_msg)
  170. if self.status_callback:
  171. self.status_callback(False)
  172. if self.error_callback:
  173. self.error_callback(error_msg)
  174. return False, error_msg
  175. def disconnect(self) -> Tuple[bool, str]:
  176. """
  177. 断开MQTT连接
  178. Returns:
  179. tuple: (是否成功, 消息)
  180. """
  181. try:
  182. with self.lock:
  183. logger.info("断开MQTT连接")
  184. if self.client:
  185. # 发布在线状态(如果设置了遗嘱消息)
  186. if self.last_will_set and self.is_connected:
  187. try:
  188. self.client.publish(
  189. topic=self.config.will_topic,
  190. payload="{\"status\":\"online\"}",
  191. qos=self.config.will_qos,
  192. retain=self.config.will_retain
  193. )
  194. time.sleep(0.1) # 给发布消息一些时间
  195. except Exception as e:
  196. logger.warning(f"发布离线状态失败: {str(e)}")
  197. # 停止循环并断开连接
  198. self.client.loop_stop()
  199. try:
  200. self.client.disconnect()
  201. except Exception as e:
  202. logger.error(f"断开连接时出错: {str(e)}")
  203. finally:
  204. self.client = None
  205. self.is_connected = False
  206. self.connected_event.clear()
  207. if self.status_callback:
  208. self.status_callback(False)
  209. return True, "已断开MQTT连接"
  210. except Exception as e:
  211. error_msg = f"断开连接失败: {str(e)}"
  212. logger.error(error_msg)
  213. return False, error_msg
  214. def subscribe(self, topics, qos: Optional[int] = None) -> Tuple[bool, str]:
  215. """
  216. 订阅主题
  217. Args:
  218. topics: 主题字符串或列表
  219. qos: QoS级别,如果为None则使用配置中的默认值
  220. Returns:
  221. tuple: (是否成功, 消息)
  222. """
  223. try:
  224. with self.lock:
  225. if not self.client or not self.is_connected:
  226. return False, "MQTT未连接"
  227. # 确定QoS级别
  228. qos_level = qos if qos is not None else (self.config.qos if self.config else 0)
  229. # 格式化主题列表
  230. if isinstance(topics, str):
  231. topic_list = [(topics, qos_level)]
  232. topic_names = [topics]
  233. else:
  234. topic_list = [(topic, qos_level) for topic in topics]
  235. topic_names = topics
  236. # 订阅主题
  237. result, _ = self.client.subscribe(topic_list)
  238. if result == mqtt.MQTT_ERR_SUCCESS:
  239. # 更新本地主题列表
  240. self.topics = topic_list
  241. logger.info(f"已订阅主题: {', '.join(topic_names)}, QoS: {qos_level}")
  242. return True, f"已订阅主题: {', '.join(topic_names)}"
  243. else:
  244. error_msg = f"订阅失败: {mqtt.error_string(result)}"
  245. logger.error(error_msg)
  246. return False, error_msg
  247. except Exception as e:
  248. error_msg = f"订阅失败: {str(e)}"
  249. logger.error(error_msg)
  250. return False, error_msg
  251. def publish(self, topic: str, message: Any, qos: Optional[int] = None,
  252. retain: bool = False) -> Tuple[bool, str]:
  253. """
  254. 发布消息
  255. Args:
  256. topic: 发布主题
  257. message: 消息内容(自动转换为JSON字符串)
  258. qos: QoS级别,如果为None则使用配置中的默认值
  259. retain: 是否为保留消息
  260. Returns:
  261. tuple: (是否成功, 消息)
  262. """
  263. try:
  264. with self.lock:
  265. if not self.client or not self.is_connected:
  266. return False, "MQTT未连接"
  267. # 确定QoS级别
  268. qos_level = qos if qos is not None else (self.config.qos if self.config else 0)
  269. # 序列化消息
  270. if isinstance(message, (dict, list)):
  271. payload = json.dumps(message)
  272. else:
  273. payload = str(message)
  274. # 发布消息
  275. result = self.client.publish(
  276. topic=topic,
  277. payload=payload,
  278. qos=qos_level,
  279. retain=retain
  280. )
  281. # 检查发布是否成功
  282. if result.rc == mqtt.MQTT_ERR_SUCCESS:
  283. logger.debug(f"消息已发布到主题 {topic}: {payload[:50]}...")
  284. return True, f"消息已发布到主题 {topic}"
  285. else:
  286. error_msg = f"发布失败: {mqtt.error_string(result.rc)}"
  287. logger.error(error_msg)
  288. return False, error_msg
  289. except Exception as e:
  290. error_msg = f"发布失败: {str(e)}"
  291. logger.error(error_msg)
  292. return False, error_msg
  293. def unsubscribe(self, topics) -> Tuple[bool, str]:
  294. """
  295. 取消订阅主题
  296. Args:
  297. topics: 主题字符串或列表
  298. Returns:
  299. tuple: (是否成功, 消息)
  300. """
  301. try:
  302. with self.lock:
  303. if not self.client or not self.is_connected:
  304. return False, "MQTT未连接"
  305. # 格式化主题列表
  306. if isinstance(topics, str):
  307. topic_list = [topics]
  308. else:
  309. topic_list = topics
  310. # 取消订阅
  311. result = self.client.unsubscribe(topic_list)
  312. if result.rc == mqtt.MQTT_ERR_SUCCESS:
  313. # 更新本地主题列表
  314. self.topics = [(t, q) for t, q in self.topics if t not in topic_list]
  315. logger.info(f"已取消订阅主题: {', '.join(topic_list)}")
  316. return True, f"已取消订阅主题: {', '.join(topic_list)}"
  317. else:
  318. error_msg = f"取消订阅失败: {mqtt.error_string(result.rc)}"
  319. logger.error(error_msg)
  320. return False, error_msg
  321. except Exception as e:
  322. error_msg = f"取消订阅失败: {str(e)}"
  323. logger.error(error_msg)
  324. return False, error_msg
  325. def _resubscribe_topics(self):
  326. """重新订阅所有已保存的主题"""
  327. if self.topics:
  328. try:
  329. topic_names = [topic for topic, _ in self.topics]
  330. logger.info(f"重新订阅之前的主题: {', '.join(topic_names)}")
  331. self.client.subscribe(self.topics)
  332. except Exception as e:
  333. logger.error(f"重新订阅主题失败: {str(e)}")
  334. def _on_connect(self, client, userdata, flags, rc):
  335. """连接回调函数"""
  336. if rc == 0:
  337. self.is_connected = True
  338. self.connected_event.set()
  339. logger.info(f"成功连接到MQTT服务器: {self.config.broker}:{self.config.port}")
  340. # 重置重连计数
  341. self.reconnect_attempts = 0
  342. self.last_reconnect_time = time.time()
  343. else:
  344. self.is_connected = False
  345. self.connected_event.clear()
  346. error_msg = f"连接MQTT服务器失败: {mqtt.connack_string(rc)} (代码: {rc})"
  347. logger.error(error_msg)
  348. if self.error_callback:
  349. self.error_callback(error_msg)
  350. def _on_disconnect(self, client, userdata, rc):
  351. """断开连接回调函数"""
  352. self.is_connected = False
  353. self.connected_event.clear()
  354. # 区分主动断开和意外断开
  355. if rc != 0:
  356. error_msg = f"意外断开MQTT连接 (代码: {rc})"
  357. logger.warning(error_msg)
  358. # 处理重连逻辑
  359. self._handle_reconnect()
  360. if self.error_callback:
  361. self.error_callback(error_msg)
  362. else:
  363. logger.info("主动断开MQTT连接")
  364. if self.status_callback:
  365. self.status_callback(False)
  366. def _on_message(self, client, userdata, msg):
  367. """消息接收回调函数"""
  368. try:
  369. # 尝试多种编码解码
  370. encodings = ['utf-8', 'latin-1', 'ascii']
  371. payload = None
  372. for encoding in encodings:
  373. try:
  374. payload = msg.payload.decode(encoding)
  375. break
  376. except UnicodeDecodeError:
  377. continue
  378. # 如果都失败,转为十六进制
  379. if payload is None:
  380. payload = msg.payload.hex()
  381. logger.warning(f"收到无法解码的二进制消息,已转为十六进制: {payload[:50]}...")
  382. # 构建消息数据结构
  383. data = {
  384. 'topic': msg.topic,
  385. 'payload': payload,
  386. 'qos': msg.qos,
  387. 'retain': msg.retain,
  388. 'timestamp': time.time()
  389. }
  390. logger.debug(f"收到MQTT消息: 主题={msg.topic}, 长度={len(msg.payload)}字节")
  391. if self.data_callback:
  392. self.data_callback(data)
  393. except Exception as e:
  394. error_msg = f"处理MQTT消息错误: {str(e)}"
  395. logger.error(error_msg)
  396. if self.error_callback:
  397. self.error_callback(error_msg)
  398. def _on_publish(self, client, userdata, mid):
  399. """发布回调函数"""
  400. logger.debug(f"消息发布成功,消息ID: {mid}")
  401. def _on_subscribe(self, client, userdata, mid, granted_qos):
  402. """订阅回调函数"""
  403. logger.debug(f"主题订阅成功,消息ID: {mid}, 授权QoS: {granted_qos}")
  404. def _handle_reconnect(self):
  405. """处理重连逻辑"""
  406. if not self.config or self.config.max_reconnect_attempts == 0:
  407. # 如果未设置最大重连次数或为0,则无限重连
  408. return
  409. self.reconnect_attempts += 1
  410. if self.reconnect_attempts > self.config.max_reconnect_attempts:
  411. logger.error(f"已达到最大重连次数 ({self.config.max_reconnect_attempts}),停止重连")
  412. # 可以在这里调用断开连接或通知上层
  413. self.disconnect()
  414. def set_data_callback(self, callback):
  415. """设置数据接收回调函数"""
  416. self.data_callback = callback
  417. def set_status_callback(self, callback):
  418. """设置状态变化回调函数"""
  419. self.status_callback = callback
  420. def set_error_callback(self, callback):
  421. """设置错误回调函数"""
  422. self.error_callback = callback
  423. def get_status(self) -> Dict[str, Any]:
  424. """
  425. 获取当前连接状态
  426. Returns:
  427. dict: 包含连接状态和详细信息
  428. """
  429. with self.lock:
  430. status = {
  431. 'connected': self.is_connected,
  432. 'broker': self.config.broker if self.config else None,
  433. 'port': self.config.port if self.config else None,
  434. 'client_id': self.config.client_id if self.config else None,
  435. 'topics': [topic for topic, _ in self.topics],
  436. 'reconnect_attempts': self.reconnect_attempts,
  437. 'last_reconnect_time': self.last_reconnect_time
  438. }
  439. return status
  440. def get_config(self) -> Optional[MQTTConfig]:
  441. """获取当前MQTT配置"""
  442. return self.config
  443. # 创建全局MQTT实例
  444. global_mqtt = MQTTClient()