import paho.mqtt.client as mqtt import threading import json import time import logging import ssl from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple, Any # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('mqtt_client') @dataclass class MQTTConfig: """MQTT配置数据类""" broker: str port: int = 1883 username: str = "" password: str = "" client_id: str = "" keepalive: int = 60 qos: int = 0 # TLS/SSL 配置 use_tls: bool = False tls_version: int = ssl.PROTOCOL_TLSv1_2 ca_certs: Optional[str] = None certfile: Optional[str] = None keyfile: Optional[str] = None # 重连配置 reconnect_delay: float = 1.0 reconnect_delay_max: float = 120.0 reconnect_exponential_backoff: bool = True max_reconnect_attempts: int = 0 # 0 表示无限尝试 # 消息配置 will_topic: Optional[str] = None will_payload: Optional[str] = None will_qos: int = 0 will_retain: bool = False class MQTTClient: """MQTT客户端封装类,提供高级MQTT通信功能""" def __init__(self): self.client = None self.is_connected = False self.config = None self.topics = [] # 存储主题和QoS的元组列表 self.data_callback = None self.status_callback = None self.error_callback = None self.connected_event = threading.Event() self.lock = threading.RLock() self.reconnect_attempts = 0 self.last_reconnect_time = 0 self.last_will_set = False def connect(self, broker: str, port: int = 1883, username: str = "", password: str = "", client_id: str = "", use_tls: bool = False, qos: int = 0, **kwargs) -> Tuple[bool, str]: """ 连接到MQTT服务器 Args: broker: MQTT服务器地址 port: MQTT服务器端口 username: 用户名 password: 密码 client_id: 客户端ID,为空时自动生成 use_tls: 是否使用TLS加密 qos: 默认QoS级别 **kwargs: 其他配置参数 Returns: tuple: (是否成功, 消息) """ try: # 构建配置 config = MQTTConfig( broker=broker, port=port, username=username, password=password, client_id=client_id or f"serial_mqtt_gateway_{time.time()}", use_tls=use_tls, qos=qos, **kwargs ) with self.lock: # 断开现有连接 if self.is_connected: self.disconnect() logger.info(f"尝试连接MQTT服务器: {broker}:{port}") # 创建MQTT客户端 self.client = mqtt.Client( client_id=config.client_id, clean_session=True, userdata=None ) # 设置回调函数 self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect self.client.on_message = self._on_message self.client.on_publish = self._on_publish self.client.on_subscribe = self._on_subscribe # 设置重连参数 self.client.reconnect_delay_set( min_delay=config.reconnect_delay, max_delay=config.reconnect_delay_max, exponential_backoff=config.reconnect_exponential_backoff ) # 设置用户名密码 if config.username and config.password: self.client.username_pw_set(config.username, config.password) logger.debug(f"已设置MQTT用户名密码认证") # 配置TLS if config.use_tls: try: self.client.tls_set( ca_certs=config.ca_certs, certfile=config.certfile, keyfile=config.keyfile, tls_version=config.tls_version ) logger.debug(f"已配置MQTT TLS连接") except Exception as e: error_msg = f"配置TLS失败: {str(e)}" logger.error(error_msg) return False, error_msg # 设置遗嘱消息 if config.will_topic: will_payload = config.will_payload or "{\"status\":\"offline\"}" self.client.will_set( topic=config.will_topic, payload=will_payload, qos=config.will_qos, retain=config.will_retain ) self.last_will_set = True logger.debug(f"已设置MQTT遗嘱消息: {config.will_topic}") # 连接到服务器 self.client.connect( broker=config.broker, port=config.port, keepalive=config.keepalive ) # 启动客户端循环 self.client.loop_start() # 保存配置 self.config = config # 等待连接成功或超时 connected = self.connected_event.wait(timeout=10) if connected: logger.info(f"已连接到MQTT服务器 {broker}:{port}") self.reconnect_attempts = 0 self.last_reconnect_time = time.time() if self.status_callback: self.status_callback(True) # 重新订阅之前的主题 if self.topics: self._resubscribe_topics() return True, f"已连接到MQTT服务器 {broker}:{port}" else: error_msg = "连接MQTT服务器超时" logger.error(error_msg) self.client.loop_stop() self.client = None if self.status_callback: self.status_callback(False) return False, error_msg except Exception as e: error_msg = f"连接失败: {str(e)}" logger.error(error_msg) if self.status_callback: self.status_callback(False) if self.error_callback: self.error_callback(error_msg) return False, error_msg def disconnect(self) -> Tuple[bool, str]: """ 断开MQTT连接 Returns: tuple: (是否成功, 消息) """ try: with self.lock: logger.info("断开MQTT连接") if self.client: # 发布在线状态(如果设置了遗嘱消息) if self.last_will_set and self.is_connected: try: self.client.publish( topic=self.config.will_topic, payload="{\"status\":\"online\"}", qos=self.config.will_qos, retain=self.config.will_retain ) time.sleep(0.1) # 给发布消息一些时间 except Exception as e: logger.warning(f"发布离线状态失败: {str(e)}") # 停止循环并断开连接 self.client.loop_stop() try: self.client.disconnect() except Exception as e: logger.error(f"断开连接时出错: {str(e)}") finally: self.client = None self.is_connected = False self.connected_event.clear() if self.status_callback: self.status_callback(False) return True, "已断开MQTT连接" except Exception as e: error_msg = f"断开连接失败: {str(e)}" logger.error(error_msg) return False, error_msg def subscribe(self, topics, qos: Optional[int] = None) -> Tuple[bool, str]: """ 订阅主题 Args: topics: 主题字符串或列表 qos: QoS级别,如果为None则使用配置中的默认值 Returns: tuple: (是否成功, 消息) """ try: with self.lock: if not self.client or not self.is_connected: return False, "MQTT未连接" # 确定QoS级别 qos_level = qos if qos is not None else (self.config.qos if self.config else 0) # 格式化主题列表 if isinstance(topics, str): topic_list = [(topics, qos_level)] topic_names = [topics] else: topic_list = [(topic, qos_level) for topic in topics] topic_names = topics # 订阅主题 result, _ = self.client.subscribe(topic_list) if result == mqtt.MQTT_ERR_SUCCESS: # 更新本地主题列表 self.topics = topic_list logger.info(f"已订阅主题: {', '.join(topic_names)}, QoS: {qos_level}") return True, f"已订阅主题: {', '.join(topic_names)}" else: error_msg = f"订阅失败: {mqtt.error_string(result)}" logger.error(error_msg) return False, error_msg except Exception as e: error_msg = f"订阅失败: {str(e)}" logger.error(error_msg) return False, error_msg def publish(self, topic: str, message: Any, qos: Optional[int] = None, retain: bool = False) -> Tuple[bool, str]: """ 发布消息 Args: topic: 发布主题 message: 消息内容(自动转换为JSON字符串) qos: QoS级别,如果为None则使用配置中的默认值 retain: 是否为保留消息 Returns: tuple: (是否成功, 消息) """ try: with self.lock: if not self.client or not self.is_connected: return False, "MQTT未连接" # 确定QoS级别 qos_level = qos if qos is not None else (self.config.qos if self.config else 0) # 序列化消息 if isinstance(message, (dict, list)): payload = json.dumps(message) else: payload = str(message) # 发布消息 result = self.client.publish( topic=topic, payload=payload, qos=qos_level, retain=retain ) # 检查发布是否成功 if result.rc == mqtt.MQTT_ERR_SUCCESS: logger.debug(f"消息已发布到主题 {topic}: {payload[:50]}...") return True, f"消息已发布到主题 {topic}" else: error_msg = f"发布失败: {mqtt.error_string(result.rc)}" logger.error(error_msg) return False, error_msg except Exception as e: error_msg = f"发布失败: {str(e)}" logger.error(error_msg) return False, error_msg def unsubscribe(self, topics) -> Tuple[bool, str]: """ 取消订阅主题 Args: topics: 主题字符串或列表 Returns: tuple: (是否成功, 消息) """ try: with self.lock: if not self.client or not self.is_connected: return False, "MQTT未连接" # 格式化主题列表 if isinstance(topics, str): topic_list = [topics] else: topic_list = topics # 取消订阅 result = self.client.unsubscribe(topic_list) if result.rc == mqtt.MQTT_ERR_SUCCESS: # 更新本地主题列表 self.topics = [(t, q) for t, q in self.topics if t not in topic_list] logger.info(f"已取消订阅主题: {', '.join(topic_list)}") return True, f"已取消订阅主题: {', '.join(topic_list)}" else: error_msg = f"取消订阅失败: {mqtt.error_string(result.rc)}" logger.error(error_msg) return False, error_msg except Exception as e: error_msg = f"取消订阅失败: {str(e)}" logger.error(error_msg) return False, error_msg def _resubscribe_topics(self): """重新订阅所有已保存的主题""" if self.topics: try: topic_names = [topic for topic, _ in self.topics] logger.info(f"重新订阅之前的主题: {', '.join(topic_names)}") self.client.subscribe(self.topics) except Exception as e: logger.error(f"重新订阅主题失败: {str(e)}") def _on_connect(self, client, userdata, flags, rc): """连接回调函数""" if rc == 0: self.is_connected = True self.connected_event.set() logger.info(f"成功连接到MQTT服务器: {self.config.broker}:{self.config.port}") # 重置重连计数 self.reconnect_attempts = 0 self.last_reconnect_time = time.time() else: self.is_connected = False self.connected_event.clear() error_msg = f"连接MQTT服务器失败: {mqtt.connack_string(rc)} (代码: {rc})" logger.error(error_msg) if self.error_callback: self.error_callback(error_msg) def _on_disconnect(self, client, userdata, rc): """断开连接回调函数""" self.is_connected = False self.connected_event.clear() # 区分主动断开和意外断开 if rc != 0: error_msg = f"意外断开MQTT连接 (代码: {rc})" logger.warning(error_msg) # 处理重连逻辑 self._handle_reconnect() if self.error_callback: self.error_callback(error_msg) else: logger.info("主动断开MQTT连接") if self.status_callback: self.status_callback(False) def _on_message(self, client, userdata, msg): """消息接收回调函数""" try: # 尝试多种编码解码 encodings = ['utf-8', 'latin-1', 'ascii'] payload = None for encoding in encodings: try: payload = msg.payload.decode(encoding) break except UnicodeDecodeError: continue # 如果都失败,转为十六进制 if payload is None: payload = msg.payload.hex() logger.warning(f"收到无法解码的二进制消息,已转为十六进制: {payload[:50]}...") # 构建消息数据结构 data = { 'topic': msg.topic, 'payload': payload, 'qos': msg.qos, 'retain': msg.retain, 'timestamp': time.time() } logger.debug(f"收到MQTT消息: 主题={msg.topic}, 长度={len(msg.payload)}字节") if self.data_callback: self.data_callback(data) except Exception as e: error_msg = f"处理MQTT消息错误: {str(e)}" logger.error(error_msg) if self.error_callback: self.error_callback(error_msg) def _on_publish(self, client, userdata, mid): """发布回调函数""" logger.debug(f"消息发布成功,消息ID: {mid}") def _on_subscribe(self, client, userdata, mid, granted_qos): """订阅回调函数""" logger.debug(f"主题订阅成功,消息ID: {mid}, 授权QoS: {granted_qos}") def _handle_reconnect(self): """处理重连逻辑""" if not self.config or self.config.max_reconnect_attempts == 0: # 如果未设置最大重连次数或为0,则无限重连 return self.reconnect_attempts += 1 if self.reconnect_attempts > self.config.max_reconnect_attempts: logger.error(f"已达到最大重连次数 ({self.config.max_reconnect_attempts}),停止重连") # 可以在这里调用断开连接或通知上层 self.disconnect() def set_data_callback(self, callback): """设置数据接收回调函数""" self.data_callback = callback def set_status_callback(self, callback): """设置状态变化回调函数""" self.status_callback = callback def set_error_callback(self, callback): """设置错误回调函数""" self.error_callback = callback def get_status(self) -> Dict[str, Any]: """ 获取当前连接状态 Returns: dict: 包含连接状态和详细信息 """ with self.lock: status = { 'connected': self.is_connected, 'broker': self.config.broker if self.config else None, 'port': self.config.port if self.config else None, 'client_id': self.config.client_id if self.config else None, 'topics': [topic for topic, _ in self.topics], 'reconnect_attempts': self.reconnect_attempts, 'last_reconnect_time': self.last_reconnect_time } return status def get_config(self) -> Optional[MQTTConfig]: """获取当前MQTT配置""" return self.config # 创建全局MQTT实例 global_mqtt = MQTTClient()