wenhongquan 2 maanden geleden
bovenliggende
commit
012839f5c5
1 gewijzigde bestanden met toevoegingen van 17 en 12 verwijderingen
  1. 17 12
      ruoyi-admin/src/main/java/org/dromara/web/task/KafkaMessageConsumer.java

+ 17 - 12
ruoyi-admin/src/main/java/org/dromara/web/task/KafkaMessageConsumer.java

@@ -50,11 +50,12 @@ public class KafkaMessageConsumer {
     @Scheduled(cron = "*/1 * * * * *")
     public void doChange() {
 //        System.out.println("事件判断");
-        try{
+
             eventMap.forEach((key, value) -> {
-                if(DateUtil.current()-value>Integer.parseInt(finalEventChangeTime)){
+                if((DateUtil.current()-value)>Integer.parseInt(finalEventChangeTime)*1000){
+                    try{
                     //改变状态
-                    ThreadUtil.execAsync(() -> {
+//                    ThreadUtil.execAsync(() -> {
                         //先查找状态
                         TblEventVo tblEventVo = SpringUtils.getBean(ITblEventService.class).queryById(eventSaveMap.get(key));
                         if(tblEventVo != null && tblEventVo.getStatus().equals("2")){
@@ -63,13 +64,15 @@ public class KafkaMessageConsumer {
                             tblEventBo.setStatus("4");
                             SpringUtils.getBean(ITblEventService.class).updateByBo(tblEventBo);
                         }
-                    });
+                        eventSaveMap.remove(key);
+//                    });
+                    } catch (Exception e) {
+
+                    }
                 }
             });
 
-        } catch (Exception e) {
 
-        }
 
 
     }
@@ -111,7 +114,7 @@ public class KafkaMessageConsumer {
         message.forEach(consumerRecord ->{
 
             if(consumerRecord.topic().equals("zs_events_r2p3")){
-                    ThreadUtil.execAsync(() -> {
+//                    ThreadUtil.execAsync(() -> {
                         try{JSONObject msg =  JSONUtil.parseObj( consumerRecord.value());
                             if(msg.getInt("MsgType")!=1001) return;
                             JSONArray list = msg.getJSONArray("ChannelEvtInfo");
@@ -121,13 +124,15 @@ public class KafkaMessageConsumer {
                                 for (int j = 0; j < envlist.size(); j++) {
                                     JSONObject envitem = envlist.getJSONObject(j);
                                     String key = envitem.getInt("EvtType")+msg.getStr("DevNo")+"源驶科技";
-                                    if(eventMap.get(key)!=null){
-                                        if(DateUtil.current()-eventMap.get(key)<Integer.parseInt(finalEventTime)){
-                                            continue;
+                                    try{
+                                        if(eventMap.get(key)!=null){
+                                            if((DateUtil.current()-eventMap.get(key))<Integer.parseInt(finalEventTime)*1000){
+                                                continue;
+                                            }
                                         }
+                                    } catch (Exception e) {
                                     }
                                     eventMap.put(key,DateUtil.current());
-
                                     envitem.set("from","源驶科技");
                                     envitem.set("lx", getEventName(envitem.getInt("EvtType")));
                                     TblEventBo tblEventBo = new TblEventBo();
@@ -178,7 +183,7 @@ public class KafkaMessageConsumer {
                             e.printStackTrace();
                         }
 
-                    });
+//                    });
 
             }
         });