wenhongquan hace 2 años
padre
commit
0d6923543b

+ 1 - 0
ruoyi-admin/src/main/resources/application.yml

@@ -274,3 +274,4 @@ mqtt:
   user:
   password:
   topic: /sensor/#
+  saveAndForward: ${DATASAVE:true}

+ 51 - 35
ruoyi-system/src/main/java/com/ruoyi/data/service/impl/MqttServiceImpl.java

@@ -1,6 +1,7 @@
 package com.ruoyi.data.service.impl;
 
 import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
 import cn.hutool.json.JSONArray;
 import cn.hutool.core.lang.UUID;
 import cn.hutool.json.JSONObject;
@@ -25,6 +26,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -70,40 +72,43 @@ public class MqttServiceImpl implements MqttService {
 
     @Value("${mqtt.topic}")
     private String mqttTopic;
-//    @Value("${mqtt.url}")
-//    private String mqttUrl;
+
+    @Value("${mqtt.saveAndForward}")
+    private Boolean saveAndForward;
 
     @Override
     public void pubMqttData(String mqttStr) {
-        JSONObject jsonObject = new JSONObject(mqttStr);
-        Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
-        TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
-        Long deviceId = tblSensor.getDeviceId();
-        SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        Date date = new Date(System.currentTimeMillis());
-        JSONObject mqttMsg = new JSONObject();
-        mqttMsg.put("deviceSn",deviceId);
-        mqttMsg.put("sensorSn",sensorId);
-        mqttMsg.put("sensorType",tblSensor.getSensorType());
-        mqttMsg.put("created_time",formatter.format(date));
-        JSONArray sensorData = new JSONArray();
-        JSONArray dataArry = new JSONArray(jsonObject.get("data"));
-        for(Object obj: dataArry){
-             JSONObject dataObj = new JSONObject(obj);
-             dataObj.remove("gcdinfo");
-             sensorData.add(dataObj);
-        }
-        mqttMsg.put("sensorData",sensorData);
-        MqttObj mqttObj = new MqttObj();
-        mqttObj.setEquipmentId(deviceId);
-        List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
-        TblRecord tblRecord = new TblRecord();
-        tblRecord.setEquipmentId(deviceId);
-        tblRecord.setJson(mqttStr);
-        tblRecord.setCreateBy("admin");
-        tblRecord.setUpdateBy("admin");
-        tblRecord.setSensorId(sensorId);
-        tblRecordMapper.insert(tblRecord);
+        if(saveAndForward) {
+
+            JSONObject jsonObject = new JSONObject(mqttStr);
+            Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
+            TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
+            Long deviceId = tblSensor.getDeviceId();
+            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+            Date date = new Date(System.currentTimeMillis());
+            JSONObject mqttMsg = new JSONObject();
+            mqttMsg.put("deviceSn", deviceId);
+            mqttMsg.put("sensorSn", sensorId);
+            mqttMsg.put("sensorType", tblSensor.getSensorType());
+            mqttMsg.put("created_time", formatter.format(date));
+            JSONArray sensorData = new JSONArray();
+            JSONArray dataArry = new JSONArray(jsonObject.get("data"));
+            for (Object obj : dataArry) {
+                JSONObject dataObj = new JSONObject(obj);
+                dataObj.remove("gcdinfo");
+                sensorData.add(dataObj);
+            }
+            mqttMsg.put("sensorData", sensorData);
+            MqttObj mqttObj = new MqttObj();
+            mqttObj.setEquipmentId(deviceId);
+            List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
+            TblRecord tblRecord = new TblRecord();
+            tblRecord.setEquipmentId(deviceId);
+            tblRecord.setJson(mqttStr);
+            tblRecord.setCreateBy("admin");
+            tblRecord.setUpdateBy("admin");
+            tblRecord.setSensorId(sensorId);
+            tblRecordMapper.insert(tblRecord);
 //        TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
 //        tblSensorRecord.setEquipmentId(deviceId);
 //        tblSensorRecord.setJson(mqttStr);
@@ -123,13 +128,23 @@ public class MqttServiceImpl implements MqttService {
         }
         for(MqttObj obj:mqttObjList){
                if(obj.getStatus() == 1) {
-                   JSONObject topicObj = obj.getTopicQos(protocolType);
+                   JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
                    MQTTConnect mqttConnect = new MQTTConnect();
                    try {
                        mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
-                       if (topicObj != null) {
-                           String topic = topicObj.get("name").toString().replace("#", "");
-                           mqttConnect.pub(topic, mqttMsg.toString(), Integer.valueOf((String) topicObj.get("qos")));
+                       if (topicObj1.size() >0) {
+                           String finalProtocolType = protocolType;
+                           topicObj1.forEach(i->{
+                               JSONObject p = (JSONObject)i;
+                               String topic = p.getStr("name");
+                               topic = StrUtil.replace(topic,"$protocolType$", finalProtocolType);
+                               topic = StrUtil.replace(topic,"$sensorId$", sensorId.toString());
+                               try {
+                                   mqttConnect.pub(topic, mqttMsg.toString(), (Integer) p.get("qos"));
+                               } catch (MqttException e) {
+                                   throw new RuntimeException(e);
+                               }
+                           });
                        } else {
                            String topic = "sensor/"+protocolType+"/" + sensorId;
                            mqttConnect.pub(topic, mqttMsg.toString(), 0);
@@ -139,6 +154,7 @@ public class MqttServiceImpl implements MqttService {
                    }
                }
             }
+        }
     }
 
     @Override

+ 3 - 0
ruoyi-ui-vue3/.env.development

@@ -7,9 +7,12 @@ VITE_APP_ENV = 'development'
 # 若依管理系统/开发环境
 VITE_APP_BASE_API = '/dev-api'
 
+VITE_APP_WS_API = '/dev-ws'
+
 # 应用访问路径 例如使用前缀 /admin/
 VITE_APP_CONTEXT_PATH = '/'
 
+
 # 监控地址
 VITE_APP_MONITRO_ADMIN = 'http://localhost:9090/admin/applications'
 

+ 2 - 1
ruoyi-ui-vue3/src/store/modules/websocket.js

@@ -2,7 +2,8 @@ const useWSStore = defineStore(
   'ws',
   {
     state: () => ({
-      isConnected: false
+      isConnected: false,
+      message: '',
     }),
     actions: {
       connect() {

+ 13 - 1
ruoyi-ui-vue3/src/utils/websocket.ts

@@ -3,10 +3,20 @@ import useWSStore from '../store/modules/websocket'
 
 let websocket: WebSocket | null = null; // 用于存储实例化后websocket
 let rec: any; // 断线重连后,延迟5秒重新创建WebSocket连接  rec用来存储延迟请求的代码
-
+// @ts-ignore
+const baseUrl = import.meta.env.VITE_APP_WS_API;
 const store = useWSStore();
 // 创建websocket
 function creatWebSocket(wsUrl: string) {
+  wsUrl = baseUrl+wsUrl;
+  wsUrl = wsUrl.replace("http","ws");
+  wsUrl = wsUrl.replace("https","wss");
+  if(wsUrl.indexOf("ws") == -1){
+    var start =  window.location.href.split("://")[0];
+    start = start.replace("http","ws");
+    start = start.replace("https","wss");
+    wsUrl =start+"://"+ window.location.href.split("://")[1].split("/")[0]+wsUrl
+  }
   console.log("websocket==================");
   // 判断当前浏览器是否支持WebSocket
   if ("WebSocket" in window) {
@@ -62,6 +72,7 @@ function initWebSocket(wsUrl: string) {
 
 // 定义重连函数
 let reConnect = (wsUrl: string) => {
+
   console.log("尝试重新连接");
   if (store.state.isConnected) return; // 如果已经连上就不在重连了
   rec && clearTimeout(rec);
@@ -81,6 +92,7 @@ function websocketOpen() {
 function websocketonmessage(e: MessageEvent<any>) {
   console.log("数据接收", e.data);
   const data = JSON.parse(e.data);  // 解析JSON格式的数据
+  store.state.message = data;
   // 下面的判断则是后台返回的接收到的数据  如何处理自己决定
   if (data.code === 400) {
     console.log("数据接收", data.msg);

+ 2 - 1
ruoyi-ui-vue3/src/views/device/sensordash/index.vue

@@ -272,7 +272,7 @@ import {listSensor,delSensor,addSensor,updateSensor} from "@/api/data/sensor"
 import {listDatapoint} from "@/api/data/datapoint";
 import {listUnit} from "@/api/data/unit";
 
-import {closeWebSocket,initWebSocket } from "@/utils/websocket";
+import {closeWebSocket,creatWebSocket } from "@/utils/websocket";
 import {ElMessage, ElMessageBox} from "element-plus";
 
 const {proxy} = getCurrentInstance();
@@ -293,6 +293,7 @@ const cloumdata = ref([
   {label: '类型', prop: 'sensorType', visible: true},
 ])
 
+creatWebSocket("/ws/realtimedata/ss");
 
 const onchangepage = (page) => {
   pagedata.value.current = page;

+ 5 - 0
ruoyi-ui-vue3/vite.config.js

@@ -33,6 +33,11 @@ export default defineConfig(({ mode, command }) => {
           target: 'http://localhost:8989',
           changeOrigin: true,
           rewrite: (p) => p.replace(/^\/dev-api/, '')
+        },
+        '/dev-ws': {
+          target: 'ws://localhost:8989',
+          changeOrigin: true,
+          rewrite: (p) => p.replace(/^\/dev-ws/, '')
         }
       }
     },