|
@@ -0,0 +1,122 @@
|
|
|
+/*
|
|
|
+ * 文 件 名: MqttClient
|
|
|
+ * 版 权: 浩鲸云计算科技股份有限公司
|
|
|
+ * 描 述: <描述>
|
|
|
+ * 修 改 人: lvwenbin
|
|
|
+ * 修改时间: 2024/4/30
|
|
|
+ * 跟踪单号: <跟踪单号>
|
|
|
+ * 修改单号: <修改单号>
|
|
|
+ * 修改内容: <修改内容>
|
|
|
+ */
|
|
|
+package com.ruoyi.core;
|
|
|
+
|
|
|
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttCallback;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Mqtt操作模板
|
|
|
+ * <功能详细描述>
|
|
|
+ *
|
|
|
+ * @author lvwenbin
|
|
|
+ * @version [版本号, 2024/4/30]
|
|
|
+ * @see [相关类/方法]
|
|
|
+ * @since [产品/模块版本]
|
|
|
+ */
|
|
|
+public class MqttTemplate {
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(MqttTemplate.class);
|
|
|
+
|
|
|
+ private MqttClient mqttClient;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造器
|
|
|
+ *
|
|
|
+ * @param broker 服务地址
|
|
|
+ * @param clientId 客户端编码
|
|
|
+ */
|
|
|
+ public MqttTemplate(String broker, String clientId) throws MqttException {
|
|
|
+ MemoryPersistence persistence = new MemoryPersistence();
|
|
|
+
|
|
|
+ mqttClient = new MqttClient(broker, clientId, persistence);
|
|
|
+ // MQTT 连接选项
|
|
|
+ MqttConnectOptions connOpts = new MqttConnectOptions();
|
|
|
+ // 保留会话
|
|
|
+ connOpts.setCleanSession(false);
|
|
|
+ connOpts.setAutomaticReconnect(true);
|
|
|
+ // 建立连接
|
|
|
+ mqttClient.connect(connOpts);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 订阅
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param qos 0-最多1次,1-至少1次,2-仅1次
|
|
|
+ */
|
|
|
+ public void subscribe(String topic, Integer qos, MqttMessageHandler handler) {
|
|
|
+ try {
|
|
|
+ mqttClient.subscribe(topic, qos);
|
|
|
+ mqttClient.setCallback(new MqttCallback() {
|
|
|
+ // 链接丢失处理
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable throwable) {
|
|
|
+ // 连接丢失后,一般在这里面进行重连
|
|
|
+ log.info("mqtt服务连接断开,进行重连");
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 尝试重新连接
|
|
|
+ if (!mqttClient.isConnected()) {
|
|
|
+ mqttClient.reconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (MqttException e) {
|
|
|
+ log.error("重连失败:[{}]", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息接收处理
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
+ String content = new String(message.getPayload(), StandardCharsets.UTF_8);
|
|
|
+ handler.handle(topic, content);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 消息发送处理
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ log.info("消息发布结果[{}]", token.isComplete());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("mqtt订阅异常", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消息发送
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 报文
|
|
|
+ * @param qos 消息级别
|
|
|
+ * @param retained 回复
|
|
|
+ */
|
|
|
+ public void send(String topic, String payload, int qos, boolean retained) {
|
|
|
+ try {
|
|
|
+ MqttMessage message = new MqttMessage();
|
|
|
+ message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
|
|
|
+ message.setQos(qos);
|
|
|
+ message.setRetained(retained);
|
|
|
+ mqttClient.publish(topic, message);
|
|
|
+ } catch (MqttException e) {
|
|
|
+ log.error("[Send]fail!", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|