liwei19941102 2 лет назад
Родитель
Сommit
4a335d9ce0

+ 20 - 0
ruoyi-common/src/main/java/com/ruoyi/common/utils/mqtt/MQTTConnect.java

@@ -1,5 +1,7 @@
 package com.ruoyi.common.utils.mqtt;
 
+import com.ruoyi.common.utils.cache.CacheManager;
+import com.ruoyi.common.utils.cache.CacheManagerEntity;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@@ -89,6 +91,14 @@ public class MQTTConnect {
         token.waitForCompletion();
     }
 
+    public void createMqttConnect(String Host,String clientId,String userName, String passWord) throws MqttException {
+           MQTTConnect mqttConnect = new MQTTConnect();
+           mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
+           String key = "mqtt"+clientId;
+           CacheManagerEntity cacheManagerEntity = new CacheManagerEntity(mqttClient);
+           CacheManager.putCache(key,cacheManagerEntity);
+    }
+
     /**
      * 订阅某一个主题 ,此方法默认的的Qos等级为:1
      *
@@ -108,6 +118,16 @@ public class MQTTConnect {
         mqttClient.subscribe(topic, qos);
     }
 
+    public void pubMqttClient(String Host,String clientId,String userName, String passWord,String topic,int qos,String msg) throws MqttException {
+              MqttClient mqttClientOld = (MqttClient) CacheManager.getCacheDataByKey("mqtt"+clientId);
+              if(mqttClientOld != null){
+                  mqttClient = mqttClientOld;
+              }else{
+                  this.createMqttConnect(Host,clientId,userName,passWord);
+              }
+              this.pub(topic,msg,qos);
+    }
+
     /**
      * main函数自己测试用
      */

+ 8 - 1
ruoyi-system/src/main/java/com/ruoyi/data/service/DataDealservice.java

@@ -2,13 +2,20 @@ package com.ruoyi.data.service;
 
 import com.ruoyi.data.domain.DataFormatBean;
 import com.ruoyi.data.domain.TblModbusTcp;
+import com.ruoyi.data.domain.TblMqtt;
 
 //协议获取数据接口
 public interface DataDealservice {
 
-    void getModBusTcp(DataFormatBean dataFormatBean);
+    String getModBusTcp(DataFormatBean dataFormatBean);
 
     void setModBusTcp(DataFormatBean dataFormatBean);
 
     void createTcpConnect(TblModbusTcp tblModbusTcp);
+
+    void createMqttConnect(TblMqtt tblMqtt);
+
+    void pubMqttClient(TblMqtt tblMqtt,String topic,int qos,String msg);
+
+
 }

+ 26 - 2
ruoyi-system/src/main/java/com/ruoyi/data/service/impl/DataDealServiceImpl.java

@@ -1,9 +1,11 @@
 package com.ruoyi.data.service.impl;
 
 import com.ruoyi.common.utils.ModbusUtils;
+import com.ruoyi.common.utils.mqtt.MQTTConnect;
 import com.ruoyi.data.domain.DataFormatBean;
 import com.ruoyi.data.domain.TblDatapoint;
 import com.ruoyi.data.domain.TblModbusTcp;
+import com.ruoyi.data.domain.TblMqtt;
 import com.ruoyi.data.mapper.TblDatapointMapper;
 import com.ruoyi.data.mapper.TblModbusTcpMapper;
 import com.ruoyi.data.service.DataDealservice;
@@ -22,7 +24,8 @@ public class DataDealServiceImpl implements DataDealservice {
 
     private final ModbusUtils modbusUtils;
     @Override
-    public void getModBusTcp(DataFormatBean dataFormatBean) {
+    public String getModBusTcp(DataFormatBean dataFormatBean) {
+        String value = "";
         TblDatapoint tblDatapoint = datapointMapper.selectById(dataFormatBean.getPointId());
         TblModbusTcp tblModbusTcp = tblModbusTcpMapper.selectById(dataFormatBean.getModbusUuId());
         String str = "holding-register:"+(tblDatapoint.getAddr()+tblDatapoint.getAddrOffset())+":UINT["+tblDatapoint.getLen()+"]";
@@ -30,10 +33,11 @@ public class DataDealServiceImpl implements DataDealservice {
         String connectKey = "tcp"+tblModbusTcp.getUuid();
         String dataKey = "tcp"+tblDatapoint.getId().toString();
         try{
-            modbusUtils.plcRead(url,connectKey,dataKey,str);
+            value = modbusUtils.plcRead(url,connectKey,dataKey,str);
         }catch (Exception e){
             e.printStackTrace();
         }
+        return value;
     }
 
     @Override
@@ -63,5 +67,25 @@ public class DataDealServiceImpl implements DataDealservice {
         }
     }
 
+    @Override
+    public void createMqttConnect(TblMqtt tblMqtt) {
+        MQTTConnect mqttConnect = new MQTTConnect();
+        try {
+            mqttConnect.createMqttConnect(tblMqtt.getServerAddress(),tblMqtt.getUuid(),tblMqtt.getAccount(),tblMqtt.getPassword());
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void pubMqttClient(TblMqtt tblMqtt,String topic,int qos,String msg) {
+        MQTTConnect mqttConnect = new MQTTConnect();
+        try {
+            mqttConnect.pubMqttClient(tblMqtt.getServerAddress(),tblMqtt.getUuid(),tblMqtt.getAccount(),tblMqtt.getPassword(),topic,qos,msg);
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
 
 }