|
@@ -3,19 +3,15 @@ package org.dromara.web.task;
|
|
|
import cn.hutool.core.date.DateUtil;
|
|
|
import cn.hutool.core.io.FileUtil;
|
|
|
import cn.hutool.core.lang.UUID;
|
|
|
-import cn.hutool.core.thread.ThreadUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import cn.hutool.http.HttpUtil;
|
|
|
-import cn.hutool.json.JSON;
|
|
|
import cn.hutool.json.JSONArray;
|
|
|
import cn.hutool.json.JSONObject;
|
|
|
import cn.hutool.json.JSONUtil;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
-import org.dromara.common.core.service.OssService;
|
|
|
import org.dromara.common.core.utils.SpringUtils;
|
|
|
import org.dromara.system.domain.bo.TblDeviceBo;
|
|
|
import org.dromara.system.domain.bo.TblEventBo;
|
|
|
-import org.dromara.system.domain.vo.SysOssUploadVo;
|
|
|
import org.dromara.system.domain.vo.SysOssVo;
|
|
|
import org.dromara.system.domain.vo.TblDeviceVo;
|
|
|
import org.dromara.system.domain.vo.TblEventVo;
|
|
@@ -30,10 +26,11 @@ import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.io.File;
|
|
|
-import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
@Component
|
|
|
public class KafkaMessageConsumer {
|
|
@@ -49,9 +46,21 @@ public class KafkaMessageConsumer {
|
|
|
private String ffmpegFilePath = "/usr/local/bin/ffmpeg";
|
|
|
@Scheduled(cron = "*/1 * * * * *")
|
|
|
public void doChange() {
|
|
|
+ String eventTime = SpringUtils.getBean(ISysConfigService.class).selectConfigByKey("event_time");
|
|
|
+ if(StrUtil.isEmptyIfStr(eventTime)){
|
|
|
+ eventTime = "200";
|
|
|
+ }
|
|
|
+ finalEventTime = eventTime;
|
|
|
+ String eventChangeTime = SpringUtils.getBean(ISysConfigService.class).selectConfigByKey("event_change_time");
|
|
|
+ if(StrUtil.isEmptyIfStr(eventChangeTime)){
|
|
|
+ eventChangeTime = "400";
|
|
|
+ }
|
|
|
+ finalEventChangeTime = eventChangeTime;
|
|
|
// System.out.println("事件判断");
|
|
|
|
|
|
+ AtomicBoolean succes = new AtomicBoolean(false);
|
|
|
eventMap.forEach((key, value) -> {
|
|
|
+
|
|
|
if((DateUtil.current()-value)>Integer.parseInt(finalEventChangeTime)*1000){
|
|
|
try{
|
|
|
//改变状态
|
|
@@ -63,13 +72,19 @@ public class KafkaMessageConsumer {
|
|
|
tblEventBo.setId(eventSaveMap.get(key));
|
|
|
tblEventBo.setStatus("4");
|
|
|
SpringUtils.getBean(ITblEventService.class).updateByBo(tblEventBo);
|
|
|
+ succes.set(true);
|
|
|
}
|
|
|
eventSaveMap.remove(key);
|
|
|
+
|
|
|
// });
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if(succes.get()){
|
|
|
+ eventMap.remove(key);
|
|
|
+ }
|
|
|
});
|
|
|
|
|
|
|
|
@@ -78,33 +93,96 @@ public class KafkaMessageConsumer {
|
|
|
}
|
|
|
|
|
|
|
|
|
+ public void processEvent(Object message){
|
|
|
+ if(devmap.keySet().size()<1){
|
|
|
+ List<TblDeviceVo> listdev = SpringUtils.getBean(ITblDeviceService.class).queryList(new TblDeviceBo());
|
|
|
+ devmap = new java.util.HashMap<>();
|
|
|
+ listdev.forEach(item ->{
|
|
|
+ if(item.getExt1()!=null){
|
|
|
+ try{
|
|
|
+ JSONObject ysObject = JSONUtil.parseObj(item.getExt1());
|
|
|
+ if(ysObject.get("name")!=null){
|
|
|
+ devmap.put(ysObject.getStr("name"), item);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
+ }
|
|
|
|
|
|
+ try{JSONObject msg = JSONUtil.parseObj( message);
|
|
|
+ if(msg.getInt("MsgType")!=1001) return;
|
|
|
+ JSONArray list = msg.getJSONArray("ChannelEvtInfo");
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ JSONObject item = list.getJSONObject(i);
|
|
|
+ JSONArray envlist = item.getJSONArray("Evt_List");
|
|
|
+ for (int j = 0; j < envlist.size(); j++) {
|
|
|
+ JSONObject envitem = envlist.getJSONObject(j);
|
|
|
+ String key = envitem.getInt("EvtType")+msg.getStr("DevNo")+"源驶科技";
|
|
|
+ try{
|
|
|
+ if(eventMap.get(key)!=null){
|
|
|
+ if((DateUtil.current()-eventMap.get(key))<Integer.parseInt(finalEventTime)*1000){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ }
|
|
|
+ eventMap.put(key,DateUtil.current());
|
|
|
+ envitem.set("from","源驶科技");
|
|
|
+ envitem.set("lx", getEventName(envitem.getInt("EvtType")));
|
|
|
+ TblEventBo tblEventBo = new TblEventBo();
|
|
|
+ tblEventBo.setCreateTime(DateUtil.date(msg.getLong("Timestamp")));
|
|
|
+ tblEventBo.setExt2(JSONUtil.toJsonStr(envitem));
|
|
|
+ List<String> urls = new ArrayList<>();
|
|
|
+ //下载图片
|
|
|
+ String path =envitem.getStr("EventImagePath");
|
|
|
+ String path5 =envitem.getStr("EventVideoPath");
|
|
|
+// String path2 =envitem.getStr("EventImagePath2");
|
|
|
+// String path3 =envitem.getStr("EventImagePath3");
|
|
|
+// String path4 =envitem.getStr("EventImagePath4");
|
|
|
+
|
|
|
+ if(!StrUtil.isEmptyIfStr( path)){
|
|
|
+ path = downloadImage(path);
|
|
|
+ urls.add( path);
|
|
|
+ }
|
|
|
+ if(!StrUtil.isEmptyIfStr( path5)){
|
|
|
+ path5 = downloadImage(path5);
|
|
|
+ urls.add( path5);
|
|
|
+ }
|
|
|
+ tblEventBo.setExt1(JSONUtil.toJsonStr(urls));
|
|
|
+ tblEventBo.setAddr(msg.getStr("DevNo"));
|
|
|
+ tblEventBo.setStatus("2");
|
|
|
+ //根据DevNo 查询设备名称
|
|
|
+ TblDeviceVo deviceVo = devmap.get(msg.getStr("DevNo"));
|
|
|
+ if(deviceVo!=null){
|
|
|
+ tblEventBo.setAddr(deviceVo.getName());
|
|
|
+ envitem.set("deviceVo", deviceVo);
|
|
|
+ tblEventBo.setExt2(JSONUtil.toJsonStr(envitem));
|
|
|
+ }
|
|
|
+ SpringUtils.getBean(ITblEventService.class).insertByBo(tblEventBo);
|
|
|
+ eventSaveMap.put(key,tblEventBo.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- public void consumerMessage(List<ConsumerRecord<String, Object>> message) {
|
|
|
- System.out.println("收到消息:" + message);
|
|
|
-
|
|
|
- String eventTime = SpringUtils.getBean(ISysConfigService.class).selectConfigByKey("event_time");
|
|
|
- if(StrUtil.isEmptyIfStr(eventTime)){
|
|
|
- eventTime = "200";
|
|
|
+ }catch (Exception e){
|
|
|
+ e.printStackTrace();
|
|
|
}
|
|
|
- finalEventTime = eventTime;
|
|
|
- String eventChangeTime = SpringUtils.getBean(ISysConfigService.class).selectConfigByKey("event_change_time");
|
|
|
- if(StrUtil.isEmptyIfStr(eventChangeTime)){
|
|
|
- eventChangeTime = "400";
|
|
|
- }
|
|
|
- finalEventChangeTime = eventChangeTime;
|
|
|
+ }
|
|
|
+ private Map<String, TblDeviceVo> devmap = new HashMap<>();
|
|
|
|
|
|
+ public void consumerMessage(List<ConsumerRecord<String, Object>> message) {
|
|
|
+ System.out.println("收到消息:" + message);
|
|
|
//查询摄像头
|
|
|
List<TblDeviceVo> listdev = SpringUtils.getBean(ITblDeviceService.class).queryList(new TblDeviceBo());
|
|
|
- Map<String, TblDeviceVo> map = new java.util.HashMap<>();
|
|
|
+ devmap = new java.util.HashMap<>();
|
|
|
listdev.forEach(item ->{
|
|
|
if(item.getExt1()!=null){
|
|
|
try{
|
|
|
JSONObject ysObject = JSONUtil.parseObj(item.getExt1());
|
|
|
if(ysObject.get("name")!=null){
|
|
|
- map.put(ysObject.getStr("name"), item);
|
|
|
+ devmap.put(ysObject.getStr("name"), item);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
|
|
@@ -114,82 +192,10 @@ public class KafkaMessageConsumer {
|
|
|
message.forEach(consumerRecord ->{
|
|
|
|
|
|
if(consumerRecord.topic().equals("zs_events_r2p3")){
|
|
|
-// ThreadUtil.execAsync(() -> {
|
|
|
- try{JSONObject msg = JSONUtil.parseObj( consumerRecord.value());
|
|
|
- if(msg.getInt("MsgType")!=1001) return;
|
|
|
- JSONArray list = msg.getJSONArray("ChannelEvtInfo");
|
|
|
- for (int i = 0; i < list.size(); i++) {
|
|
|
- JSONObject item = list.getJSONObject(i);
|
|
|
- JSONArray envlist = item.getJSONArray("Evt_List");
|
|
|
- for (int j = 0; j < envlist.size(); j++) {
|
|
|
- JSONObject envitem = envlist.getJSONObject(j);
|
|
|
- String key = envitem.getInt("EvtType")+msg.getStr("DevNo")+"源驶科技";
|
|
|
- try{
|
|
|
- if(eventMap.get(key)!=null){
|
|
|
- if((DateUtil.current()-eventMap.get(key))<Integer.parseInt(finalEventTime)*1000){
|
|
|
- continue;
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- }
|
|
|
- eventMap.put(key,DateUtil.current());
|
|
|
- envitem.set("from","源驶科技");
|
|
|
- envitem.set("lx", getEventName(envitem.getInt("EvtType")));
|
|
|
- TblEventBo tblEventBo = new TblEventBo();
|
|
|
- tblEventBo.setCreateTime(DateUtil.date(msg.getLong("Timestamp")));
|
|
|
- tblEventBo.setExt2(JSONUtil.toJsonStr(envitem));
|
|
|
- List<String> urls = new ArrayList<>();
|
|
|
- //下载图片
|
|
|
- String path =envitem.getStr("EventImagePath");
|
|
|
- String path2 =envitem.getStr("EventImagePath2");
|
|
|
- String path3 =envitem.getStr("EventImagePath3");
|
|
|
- String path4 =envitem.getStr("EventImagePath4");
|
|
|
- String path5 =envitem.getStr("EventVideoPath");
|
|
|
- if(!StrUtil.isEmptyIfStr( path)){
|
|
|
- path = downloadImage(path);
|
|
|
- urls.add( path);
|
|
|
- }
|
|
|
- if(!StrUtil.isEmptyIfStr( path2)){
|
|
|
- path2 = downloadImage(path2);
|
|
|
- urls.add( path2);
|
|
|
- }
|
|
|
- if(!StrUtil.isEmptyIfStr( path3)){
|
|
|
- path3 = downloadImage(path3);
|
|
|
- urls.add( path3);
|
|
|
- }
|
|
|
- if(!StrUtil.isEmptyIfStr( path4)){
|
|
|
- path4 = downloadImage(path4);
|
|
|
- urls.add( path4);
|
|
|
- }
|
|
|
- if(!StrUtil.isEmptyIfStr( path5)){
|
|
|
- path5 = downloadImage(path5);
|
|
|
- urls.add( path5);
|
|
|
- }
|
|
|
- tblEventBo.setExt1(JSONUtil.toJsonStr(urls));
|
|
|
- tblEventBo.setAddr(msg.getStr("DevNo"));
|
|
|
- //根据DevNo 查询设备名称
|
|
|
- TblDeviceVo deviceVo = map.get(msg.getStr("DevNo"));
|
|
|
- if(deviceVo!=null){
|
|
|
- tblEventBo.setAddr(deviceVo.getName());
|
|
|
- envitem.set("deviceVo", deviceVo);
|
|
|
- tblEventBo.setExt2(JSONUtil.toJsonStr(envitem));
|
|
|
- }
|
|
|
- SpringUtils.getBean(ITblEventService.class).insertByBo(tblEventBo);
|
|
|
- eventSaveMap.put(key,tblEventBo.getId());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- }catch (Exception e){
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
-
|
|
|
-// });
|
|
|
-
|
|
|
+ processEvent(consumerRecord.value());
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- //消息确认
|
|
|
-// ack.acknowledge();
|
|
|
}
|
|
|
|
|
|
public String downloadImage(String path){
|
|
@@ -211,11 +217,7 @@ public class KafkaMessageConsumer {
|
|
|
FileUtil.del(file);
|
|
|
return oss.getFileName();
|
|
|
}
|
|
|
-// public static void main(String[] args) {
|
|
|
-// KafkaMessageConsumer kafkaMessageConsumer = new KafkaMessageConsumer();
|
|
|
-// kafkaMessageConsumer.downloadImage("http://jtjai.xt.wenhq.top:8083/api/oss/local/upload/image/2025/06/13/1.mp4");
|
|
|
-// }
|
|
|
-//
|
|
|
+
|
|
|
|
|
|
|
|
|
|