liwei19941102 2 tahun lalu
induk
melakukan
5bee02d8bf

+ 3 - 3
ruoyi-common/src/main/java/com/ruoyi/common/utils/PoolUtils.java

@@ -7,15 +7,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class TaskUtils {
+public class PoolUtils {
 
-     private  static ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
+    private  static ScheduledExecutorService pool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
 
     public static ScheduledExecutorService getPool() {
         return pool;
     }
 
     public static void setPool(ScheduledExecutorService pool) {
-        TaskUtils.pool = pool;
+        PoolUtils.pool = pool;
     }
 }

+ 3 - 3
ruoyi-common/src/main/java/com/ruoyi/common/utils/mqtt/MQTTConnect.java

@@ -27,7 +27,7 @@ public class MQTTConnect {
      * @param passWord     密码
      * @param mqttCallback 回调函数
      **/
-    public void setMqttClient(String Host,String clientId,String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
+    public void createMqttClient(String Host,String clientId,String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
         MqttConnectOptions options = mqttConnectOptions(Host,clientId,userName, passWord);
         if (mqttCallback == null) {
             mqttClient.setCallback(new Callback());
@@ -96,7 +96,7 @@ public class MQTTConnect {
 
     public void createMqttConnect(String Host,String clientId,String userName, String passWord) throws MqttException {
            MQTTConnect mqttConnect = new MQTTConnect();
-           mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
+           mqttConnect.createMqttClient(Host, clientId, userName,passWord,new Callback());
     }
 
     /**
@@ -125,7 +125,7 @@ public class MQTTConnect {
                   mqttConnect.setMqttClient(mqttClientOld);
                   //mqttClient = mqttClientOld;
               }else{
-                  mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
+                  mqttConnect.createMqttClient(Host, clientId, userName,passWord,new Callback());
               }
               mqttConnect.pub(topic,msg,qos);
     }

+ 63 - 40
ruoyi-system/src/main/java/com/ruoyi/data/controller/TblModBusController.java

@@ -1,36 +1,32 @@
 package com.ruoyi.data.controller;
 
-import cn.dev33.satoken.annotation.SaCheckPermission;
+
 import com.ruoyi.common.core.controller.BaseController;
-import com.ruoyi.common.core.domain.PageQuery;
-import com.ruoyi.common.core.page.TableDataInfo;
 import com.ruoyi.common.utils.ModbusUtils;
-import com.ruoyi.common.utils.mqtt.MQTTConnect;
-import com.ruoyi.common.utils.redis.CacheUtils;
+
+import com.ruoyi.common.utils.PoolUtils;
 import com.ruoyi.data.domain.DataFormatBean;
-import com.ruoyi.data.domain.TblDatapoint;
-import com.ruoyi.data.domain.TblEquipment;
+
 import com.ruoyi.data.domain.TblModbusTcp;
-import com.ruoyi.data.domain.bo.TblMqttBo;
+import com.ruoyi.data.domain.bo.TblEquipmentBo;
 import com.ruoyi.data.domain.vo.TblEquipmentVo;
-import com.ruoyi.data.domain.vo.TblMqttVo;
+import com.ruoyi.data.domain.vo.TblModbusTcpVo;
 import com.ruoyi.data.service.DataDealservice;
 import com.ruoyi.data.service.ITblDatapointService;
 import com.ruoyi.data.service.ITblEquipmentService;
+import com.ruoyi.data.service.ITblModbusTcpService;
 import lombok.RequiredArgsConstructor;
-import org.ehcache.xml.model.ThreadPoolsType;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
+
+import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.TimerTask;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 @Validated
 @RequiredArgsConstructor
@@ -46,6 +42,8 @@ public class TblModBusController extends BaseController {
 
     private final ITblEquipmentService tblEquipmentService;
 
+    private final ITblModbusTcpService tblModbusTcpService;
+
     static Queue<String> queue = new ConcurrentLinkedQueue<String>();
 
     @GetMapping("/test")
@@ -56,31 +54,56 @@ public class TblModBusController extends BaseController {
           dealservice.getDataToMqtt(tblEquipment);
     }
 
+    @GetMapping("/tests")
+    public void equipment(TblEquipmentBo bo){
+        List<TblEquipmentVo> tblEquipmentVoList = tblEquipmentService.queryList(bo);
+        for(TblEquipmentVo vo:tblEquipmentVoList){
+            dealservice.getDataToMqtt(vo);
+        }
+        ScheduledExecutorService pool = PoolUtils.getPool();
+        for(TblEquipmentVo vo:tblEquipmentVoList){
+            TblModbusTcpVo tblModbusTcp = tblModbusTcpService.queryById(vo.getProtocalId());
+            pool.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                        dealservice.getDataToMqtt(vo);
+                }
+            },5,tblModbusTcp.getIntervals(),TimeUnit.SECONDS);//推迟5秒执行,然后每10秒执行一次
+
+        }
+    }
+
+
+    @GetMapping("/thread")
+    public void thread(DataFormatBean bean){
+        ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
+        AtomicLong atomicLong = new AtomicLong(0L);
+        //提交初始延迟1秒执行,固定周期为2秒的runnable任务
+        ScheduledFuture<?> runnableFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
+            //记录当前时间
+            Long current = System.currentTimeMillis();
+            //判断是否为第一次运行
+            if (atomicLong.get()==0){
+                atomicLong.set(current);
+                System.out.printf("first running [%d]\n",atomicLong.get());
+            }else{
+                //记录与上次的间隔时间
+                System.out.printf("running time:[%d]\n",current-atomicLong.get());
+            }
+            //将当前时间保存
+            atomicLong.set(current);
+            //模拟超过固定周期时间
+//            mySleep(5);
+        }, 1,10, TimeUnit.SECONDS);
+    }
 
-//    @GetMapping("/thread")
-//    public void thread(DataFormatBean bean){
-////        dealservice.setModBusTcp(bean);
-////        dealservice.getModBusTcp(bean);
-//        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
-//        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
-//        System.out.println("时间:" + sdf.format(new Date()) );
-//        for (int i = 0; i < queue.size(); i++) {
-//            executor.scheduleWithFixedDelay(new Runnable() {
-//                @Override
-//                public void run() {
-//                    String value = ThreadPoolTask.queue.poll();
-//                    if (value != "" && null != value) {
-//                        System.out.println("时间:" + sdf.format(new Date()) + " 线程" + Thread.currentThread().getName() + " 执行了task: " + value);
-//                    }
-//                    try {
-//                        TimeUnit.SECONDS.sleep(5);
-//                    } catch (InterruptedException e) {
-//                        e.printStackTrace();
-//                    }
-//                }
-//            }, 2, 6, TimeUnit.SECONDS);
-//        }
-//    }
+    private static void mySleep(int seconds){
+        try {
+            TimeUnit.SECONDS.sleep(seconds);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
 
     @GetMapping("/modbusTcpCache")
     public void addModbusCache(TblModbusTcp tblModbusTcp){

+ 16 - 1
ruoyi-system/src/main/java/com/ruoyi/data/domain/TblDtaDevices.java

@@ -1,2 +1,17 @@
-package com.ruoyi.data.domain;public class TblDtaDevices {
+package com.ruoyi.data.domain;
+
+import com.ruoyi.data.domain.vo.TblEquipmentVo;
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class TblDtaDevices {
+
+    private static final long serialVersionUID=1L;
+
+    private TblEquipmentVo tblEquipmentVo;
+
+    private List<DataPointUnit> dataPointUnitList;
+
 }

+ 43 - 27
ruoyi-system/src/main/java/com/ruoyi/data/service/impl/DataDealServiceImpl.java

@@ -10,7 +10,13 @@ import com.ruoyi.data.service.DataDealservice;
 import lombok.RequiredArgsConstructor;
 import org.springframework.stereotype.Service;
 
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 @RequiredArgsConstructor
@@ -85,6 +91,11 @@ public class DataDealServiceImpl implements DataDealservice {
         }
     }
 
+    @Override
+    public void getDataToMqtt(TblEquipmentVo tblEquipment) {
+
+    }
+
     public void pubMqttClient(MqttObj tblMqtt,String topic,int qos,String msg) {
         MQTTConnect mqttConnect = new MQTTConnect();
         try {
@@ -94,32 +105,37 @@ public class DataDealServiceImpl implements DataDealservice {
         }
     }
 
-    @Override
-    public void getDataToMqtt(TblEquipmentVo tblEquipment) {
-            TblDatapoint dataPointParam = new TblDatapoint();
-            dataPointParam.setDeviceId(tblEquipment.getId());
-            TblModbusTcp tblModbusTcp = tblModbusTcpMapper.selectById(tblEquipment.getProtocalId());
-            List<TblDatapoint> datapointList= datapointMapper.selectByDeviceId(dataPointParam);
-            JSONObject jsonObject = new JSONObject ();
-            for(TblDatapoint datapoint : datapointList){
-                String value = this.getModBusTcp(datapoint,tblModbusTcp);
-                jsonObject.put(datapoint.getValueType(),value);
-            }
-            String jsonStr = jsonObject.toString();
-            System.out.println(jsonStr);
-            MqttObj tblMqtt = new MqttObj();
-            tblMqtt.setEquipmentId(tblEquipment.getId());
-            List<MqttObj> tblMqttList = tbleEquipmentMqttMapper.selectMqttListByDeviceId(tblMqtt);
-            for(MqttObj obj:tblMqttList){
-                JSONObject topicObj = obj.getTopicQos("tcp");
-                if(topicObj != null){
-                    String topic = topicObj.get("name").toString().replace("#","");
-                    this.pubMqttClient(obj,topic,Integer.parseInt(topicObj.get("qos").toString()),jsonStr);
-                }else{
-                    String topic = "sensor/modbustcp/"+tblEquipment.getId();
-                    this.pubMqttClient(obj,topic,0,jsonStr);
-                }
-            }
-    }
+//    @Override
+//    public void getDataToMqtt(TblEquipmentVo tblEquipment) {
+//            TblDatapoint dataPointParam = new TblDatapoint();
+//            dataPointParam.setDeviceId(tblEquipment.getId());
+//            TblModbusTcp tblModbusTcp = tblModbusTcpMapper.selectById(tblEquipment.getProtocalId());
+//            List<TblDatapoint> datapointList= datapointMapper.selectByDeviceId(dataPointParam);
+//            JSONObject jsonObject = new JSONObject ();
+//            jsonObject.put("deviceId",tblEquipment.getId());
+//            jsonObject.put("deviceName",tblEquipment.getName());
+//            SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
+//            Date date = new Date(System.currentTimeMillis());
+//            jsonObject.put("dataTime",formatter.format(date));
+//            for(TblDatapoint datapoint : datapointList){
+//                String value = this.getModBusTcp(datapoint,tblModbusTcp);
+//                jsonObject.put(datapoint.getValueType(),value);
+//            }
+//            String jsonStr = jsonObject.toString();
+//            System.out.println(jsonStr);
+//            MqttObj tblMqtt = new MqttObj();
+//            tblMqtt.setEquipmentId(tblEquipment.getId());
+//            List<MqttObj> tblMqttList = tbleEquipmentMqttMapper.selectMqttListByDeviceId(tblMqtt);
+//            for(MqttObj obj:tblMqttList){
+//                JSONObject topicObj = obj.getTopicQos("tcp");
+//                if(topicObj != null){
+//                    String topic = topicObj.get("name").toString().replace("#","");
+//                    this.pubMqttClient(obj,topic,Integer.parseInt(topicObj.get("qos").toString()),jsonStr);
+//                }else{
+//                    String topic = "sensor/modbustcp/"+tblEquipment.getId();
+//                    this.pubMqttClient(obj,topic,0,jsonStr);
+//                }
+//            }
+//    }
 
 }