| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- import paho.mqtt.client as mqtt
- import threading
- import json
- import time
- import logging
- import ssl
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple, Any
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- logger = logging.getLogger('mqtt_client')
- @dataclass
- class MQTTConfig:
- """MQTT配置数据类"""
- broker: str
- port: int = 1883
- username: str = ""
- password: str = ""
- client_id: str = ""
- keepalive: int = 60
- qos: int = 0
- # TLS/SSL 配置
- use_tls: bool = False
- tls_version: int = ssl.PROTOCOL_TLSv1_2
- ca_certs: Optional[str] = None
- certfile: Optional[str] = None
- keyfile: Optional[str] = None
- # 重连配置
- reconnect_delay: float = 1.0
- reconnect_delay_max: float = 120.0
- reconnect_exponential_backoff: bool = True
- max_reconnect_attempts: int = 0 # 0 表示无限尝试
- # 消息配置
- will_topic: Optional[str] = None
- will_payload: Optional[str] = None
- will_qos: int = 0
- will_retain: bool = False
- class MQTTClient:
- """MQTT客户端封装类,提供高级MQTT通信功能"""
-
- def __init__(self):
- self.client = None
- self.is_connected = False
- self.config = None
- self.topics = [] # 存储主题和QoS的元组列表
- self.data_callback = None
- self.status_callback = None
- self.error_callback = None
- self.connected_event = threading.Event()
- self.lock = threading.RLock()
- self.reconnect_attempts = 0
- self.last_reconnect_time = 0
- self.last_will_set = False
-
- def connect(self, broker: str, port: int = 1883, username: str = "",
- password: str = "", client_id: str = "", use_tls: bool = False,
- qos: int = 0, **kwargs) -> Tuple[bool, str]:
- """
- 连接到MQTT服务器
-
- Args:
- broker: MQTT服务器地址
- port: MQTT服务器端口
- username: 用户名
- password: 密码
- client_id: 客户端ID,为空时自动生成
- use_tls: 是否使用TLS加密
- qos: 默认QoS级别
- **kwargs: 其他配置参数
-
- Returns:
- tuple: (是否成功, 消息)
- """
- try:
- # 构建配置
- config = MQTTConfig(
- broker=broker,
- port=port,
- username=username,
- password=password,
- client_id=client_id or f"serial_mqtt_gateway_{time.time()}",
- use_tls=use_tls,
- qos=qos,
- **kwargs
- )
-
- with self.lock:
- # 断开现有连接
- if self.is_connected:
- self.disconnect()
-
- logger.info(f"尝试连接MQTT服务器: {broker}:{port}")
-
- # 创建MQTT客户端
- self.client = mqtt.Client(
- client_id=config.client_id,
- clean_session=True,
- userdata=None
- )
-
- # 设置回调函数
- self.client.on_connect = self._on_connect
- self.client.on_disconnect = self._on_disconnect
- self.client.on_message = self._on_message
- self.client.on_publish = self._on_publish
- self.client.on_subscribe = self._on_subscribe
-
- # 设置重连参数
- self.client.reconnect_delay_set(
- min_delay=config.reconnect_delay,
- max_delay=config.reconnect_delay_max,
- exponential_backoff=config.reconnect_exponential_backoff
- )
-
- # 设置用户名密码
- if config.username and config.password:
- self.client.username_pw_set(config.username, config.password)
- logger.debug(f"已设置MQTT用户名密码认证")
-
- # 配置TLS
- if config.use_tls:
- try:
- self.client.tls_set(
- ca_certs=config.ca_certs,
- certfile=config.certfile,
- keyfile=config.keyfile,
- tls_version=config.tls_version
- )
- logger.debug(f"已配置MQTT TLS连接")
- except Exception as e:
- error_msg = f"配置TLS失败: {str(e)}"
- logger.error(error_msg)
- return False, error_msg
-
- # 设置遗嘱消息
- if config.will_topic:
- will_payload = config.will_payload or "{\"status\":\"offline\"}"
- self.client.will_set(
- topic=config.will_topic,
- payload=will_payload,
- qos=config.will_qos,
- retain=config.will_retain
- )
- self.last_will_set = True
- logger.debug(f"已设置MQTT遗嘱消息: {config.will_topic}")
-
- # 连接到服务器
- self.client.connect(
- broker=config.broker,
- port=config.port,
- keepalive=config.keepalive
- )
-
- # 启动客户端循环
- self.client.loop_start()
-
- # 保存配置
- self.config = config
-
- # 等待连接成功或超时
- connected = self.connected_event.wait(timeout=10)
-
- if connected:
- logger.info(f"已连接到MQTT服务器 {broker}:{port}")
- self.reconnect_attempts = 0
- self.last_reconnect_time = time.time()
-
- if self.status_callback:
- self.status_callback(True)
-
- # 重新订阅之前的主题
- if self.topics:
- self._resubscribe_topics()
-
- return True, f"已连接到MQTT服务器 {broker}:{port}"
- else:
- error_msg = "连接MQTT服务器超时"
- logger.error(error_msg)
- self.client.loop_stop()
- self.client = None
-
- if self.status_callback:
- self.status_callback(False)
-
- return False, error_msg
-
- except Exception as e:
- error_msg = f"连接失败: {str(e)}"
- logger.error(error_msg)
-
- if self.status_callback:
- self.status_callback(False)
- if self.error_callback:
- self.error_callback(error_msg)
-
- return False, error_msg
-
- def disconnect(self) -> Tuple[bool, str]:
- """
- 断开MQTT连接
-
- Returns:
- tuple: (是否成功, 消息)
- """
- try:
- with self.lock:
- logger.info("断开MQTT连接")
-
- if self.client:
- # 发布在线状态(如果设置了遗嘱消息)
- if self.last_will_set and self.is_connected:
- try:
- self.client.publish(
- topic=self.config.will_topic,
- payload="{\"status\":\"online\"}",
- qos=self.config.will_qos,
- retain=self.config.will_retain
- )
- time.sleep(0.1) # 给发布消息一些时间
- except Exception as e:
- logger.warning(f"发布离线状态失败: {str(e)}")
-
- # 停止循环并断开连接
- self.client.loop_stop()
- try:
- self.client.disconnect()
- except Exception as e:
- logger.error(f"断开连接时出错: {str(e)}")
- finally:
- self.client = None
-
- self.is_connected = False
- self.connected_event.clear()
-
- if self.status_callback:
- self.status_callback(False)
-
- return True, "已断开MQTT连接"
-
- except Exception as e:
- error_msg = f"断开连接失败: {str(e)}"
- logger.error(error_msg)
- return False, error_msg
-
- def subscribe(self, topics, qos: Optional[int] = None) -> Tuple[bool, str]:
- """
- 订阅主题
-
- Args:
- topics: 主题字符串或列表
- qos: QoS级别,如果为None则使用配置中的默认值
-
- Returns:
- tuple: (是否成功, 消息)
- """
- try:
- with self.lock:
- if not self.client or not self.is_connected:
- return False, "MQTT未连接"
-
- # 确定QoS级别
- qos_level = qos if qos is not None else (self.config.qos if self.config else 0)
-
- # 格式化主题列表
- if isinstance(topics, str):
- topic_list = [(topics, qos_level)]
- topic_names = [topics]
- else:
- topic_list = [(topic, qos_level) for topic in topics]
- topic_names = topics
-
- # 订阅主题
- result, _ = self.client.subscribe(topic_list)
-
- if result == mqtt.MQTT_ERR_SUCCESS:
- # 更新本地主题列表
- self.topics = topic_list
- logger.info(f"已订阅主题: {', '.join(topic_names)}, QoS: {qos_level}")
- return True, f"已订阅主题: {', '.join(topic_names)}"
- else:
- error_msg = f"订阅失败: {mqtt.error_string(result)}"
- logger.error(error_msg)
- return False, error_msg
-
- except Exception as e:
- error_msg = f"订阅失败: {str(e)}"
- logger.error(error_msg)
- return False, error_msg
-
- def publish(self, topic: str, message: Any, qos: Optional[int] = None,
- retain: bool = False) -> Tuple[bool, str]:
- """
- 发布消息
-
- Args:
- topic: 发布主题
- message: 消息内容(自动转换为JSON字符串)
- qos: QoS级别,如果为None则使用配置中的默认值
- retain: 是否为保留消息
-
- Returns:
- tuple: (是否成功, 消息)
- """
- try:
- with self.lock:
- if not self.client or not self.is_connected:
- return False, "MQTT未连接"
-
- # 确定QoS级别
- qos_level = qos if qos is not None else (self.config.qos if self.config else 0)
-
- # 序列化消息
- if isinstance(message, (dict, list)):
- payload = json.dumps(message)
- else:
- payload = str(message)
-
- # 发布消息
- result = self.client.publish(
- topic=topic,
- payload=payload,
- qos=qos_level,
- retain=retain
- )
-
- # 检查发布是否成功
- if result.rc == mqtt.MQTT_ERR_SUCCESS:
- logger.debug(f"消息已发布到主题 {topic}: {payload[:50]}...")
- return True, f"消息已发布到主题 {topic}"
- else:
- error_msg = f"发布失败: {mqtt.error_string(result.rc)}"
- logger.error(error_msg)
- return False, error_msg
-
- except Exception as e:
- error_msg = f"发布失败: {str(e)}"
- logger.error(error_msg)
- return False, error_msg
-
- def unsubscribe(self, topics) -> Tuple[bool, str]:
- """
- 取消订阅主题
-
- Args:
- topics: 主题字符串或列表
-
- Returns:
- tuple: (是否成功, 消息)
- """
- try:
- with self.lock:
- if not self.client or not self.is_connected:
- return False, "MQTT未连接"
-
- # 格式化主题列表
- if isinstance(topics, str):
- topic_list = [topics]
- else:
- topic_list = topics
-
- # 取消订阅
- result = self.client.unsubscribe(topic_list)
-
- if result.rc == mqtt.MQTT_ERR_SUCCESS:
- # 更新本地主题列表
- self.topics = [(t, q) for t, q in self.topics if t not in topic_list]
- logger.info(f"已取消订阅主题: {', '.join(topic_list)}")
- return True, f"已取消订阅主题: {', '.join(topic_list)}"
- else:
- error_msg = f"取消订阅失败: {mqtt.error_string(result.rc)}"
- logger.error(error_msg)
- return False, error_msg
-
- except Exception as e:
- error_msg = f"取消订阅失败: {str(e)}"
- logger.error(error_msg)
- return False, error_msg
-
- def _resubscribe_topics(self):
- """重新订阅所有已保存的主题"""
- if self.topics:
- try:
- topic_names = [topic for topic, _ in self.topics]
- logger.info(f"重新订阅之前的主题: {', '.join(topic_names)}")
- self.client.subscribe(self.topics)
- except Exception as e:
- logger.error(f"重新订阅主题失败: {str(e)}")
-
- def _on_connect(self, client, userdata, flags, rc):
- """连接回调函数"""
- if rc == 0:
- self.is_connected = True
- self.connected_event.set()
- logger.info(f"成功连接到MQTT服务器: {self.config.broker}:{self.config.port}")
-
- # 重置重连计数
- self.reconnect_attempts = 0
- self.last_reconnect_time = time.time()
- else:
- self.is_connected = False
- self.connected_event.clear()
- error_msg = f"连接MQTT服务器失败: {mqtt.connack_string(rc)} (代码: {rc})"
- logger.error(error_msg)
-
- if self.error_callback:
- self.error_callback(error_msg)
-
- def _on_disconnect(self, client, userdata, rc):
- """断开连接回调函数"""
- self.is_connected = False
- self.connected_event.clear()
-
- # 区分主动断开和意外断开
- if rc != 0:
- error_msg = f"意外断开MQTT连接 (代码: {rc})"
- logger.warning(error_msg)
-
- # 处理重连逻辑
- self._handle_reconnect()
-
- if self.error_callback:
- self.error_callback(error_msg)
- else:
- logger.info("主动断开MQTT连接")
-
- if self.status_callback:
- self.status_callback(False)
-
- def _on_message(self, client, userdata, msg):
- """消息接收回调函数"""
- try:
- # 尝试多种编码解码
- encodings = ['utf-8', 'latin-1', 'ascii']
- payload = None
- for encoding in encodings:
- try:
- payload = msg.payload.decode(encoding)
- break
- except UnicodeDecodeError:
- continue
-
- # 如果都失败,转为十六进制
- if payload is None:
- payload = msg.payload.hex()
- logger.warning(f"收到无法解码的二进制消息,已转为十六进制: {payload[:50]}...")
-
- # 构建消息数据结构
- data = {
- 'topic': msg.topic,
- 'payload': payload,
- 'qos': msg.qos,
- 'retain': msg.retain,
- 'timestamp': time.time()
- }
-
- logger.debug(f"收到MQTT消息: 主题={msg.topic}, 长度={len(msg.payload)}字节")
-
- if self.data_callback:
- self.data_callback(data)
- except Exception as e:
- error_msg = f"处理MQTT消息错误: {str(e)}"
- logger.error(error_msg)
- if self.error_callback:
- self.error_callback(error_msg)
-
- def _on_publish(self, client, userdata, mid):
- """发布回调函数"""
- logger.debug(f"消息发布成功,消息ID: {mid}")
-
- def _on_subscribe(self, client, userdata, mid, granted_qos):
- """订阅回调函数"""
- logger.debug(f"主题订阅成功,消息ID: {mid}, 授权QoS: {granted_qos}")
-
- def _handle_reconnect(self):
- """处理重连逻辑"""
- if not self.config or self.config.max_reconnect_attempts == 0:
- # 如果未设置最大重连次数或为0,则无限重连
- return
-
- self.reconnect_attempts += 1
- if self.reconnect_attempts > self.config.max_reconnect_attempts:
- logger.error(f"已达到最大重连次数 ({self.config.max_reconnect_attempts}),停止重连")
- # 可以在这里调用断开连接或通知上层
- self.disconnect()
-
- def set_data_callback(self, callback):
- """设置数据接收回调函数"""
- self.data_callback = callback
-
- def set_status_callback(self, callback):
- """设置状态变化回调函数"""
- self.status_callback = callback
-
- def set_error_callback(self, callback):
- """设置错误回调函数"""
- self.error_callback = callback
-
- def get_status(self) -> Dict[str, Any]:
- """
- 获取当前连接状态
-
- Returns:
- dict: 包含连接状态和详细信息
- """
- with self.lock:
- status = {
- 'connected': self.is_connected,
- 'broker': self.config.broker if self.config else None,
- 'port': self.config.port if self.config else None,
- 'client_id': self.config.client_id if self.config else None,
- 'topics': [topic for topic, _ in self.topics],
- 'reconnect_attempts': self.reconnect_attempts,
- 'last_reconnect_time': self.last_reconnect_time
- }
- return status
-
- def get_config(self) -> Optional[MQTTConfig]:
- """获取当前MQTT配置"""
- return self.config
- # 创建全局MQTT实例
- global_mqtt = MQTTClient()
|