|
@@ -1,9 +1,9 @@
|
|
|
package com.ruoyi.bd.service.engine;
|
|
|
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.ruoyi.common.core.redis.RedisCache;
|
|
|
import com.ruoyi.common.utils.DateTimeUtil;
|
|
|
import com.ruoyi.common.utils.StringUtils;
|
|
|
-import org.apache.commons.lang3.ObjectUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
@@ -13,17 +13,20 @@ import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
public abstract class EvtFusionEngine {
|
|
|
private final static Logger logger = LoggerFactory.getLogger(EvtFusionEngine.class);
|
|
|
- private final ConcurrentHashMap<String, LocationInfo> evtBucket = new ConcurrentHashMap<>();
|
|
|
|
|
|
- private final List<LinkedBlockingQueue<JSONObject>> messageQueueList = new ArrayList<>(5);
|
|
|
+ @Resource
|
|
|
+ private RedisCache redisCache;
|
|
|
+
|
|
|
|
|
|
private String engineName;
|
|
|
|
|
|
+ private static final List<LinkedBlockingQueue<JSONObject>> messageQueueList = new ArrayList<>(5);
|
|
|
+
|
|
|
+
|
|
|
@Value("${evt-fusion.thread-pool-size:5}")
|
|
|
private int threadPoolSize;
|
|
|
|
|
@@ -37,6 +40,7 @@ public abstract class EvtFusionEngine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
public void setEngineName(String name) {
|
|
|
this.engineName = name;
|
|
|
}
|
|
@@ -67,22 +71,35 @@ public abstract class EvtFusionEngine {
|
|
|
try {
|
|
|
JSONObject msg = messageQueueList.get(finalI).take();
|
|
|
LocationInfo locationInfoNew = new LocationInfo(msg.getDouble("latitude"), msg.getDouble("longitude"), msg.getLong("srcTimestamp"));
|
|
|
+ locationInfoNew.setMsg(msg);
|
|
|
String key = getKey(locationInfoNew);
|
|
|
- LocationInfo locationInfo = evtBucket.get(key);
|
|
|
- locationInfoNew.setEvtTimestamp(DateTimeUtil.timestampMillis());
|
|
|
- if (ObjectUtils.isEmpty(locationInfo)) {
|
|
|
+ if (!redisCache.hasKey(key)) {
|
|
|
getBizId(locationInfoNew);
|
|
|
- evtBucket.put(key, locationInfoNew);
|
|
|
+ redisCache.setCacheObject(key, locationInfoNew);
|
|
|
continue;
|
|
|
}
|
|
|
+ LocationInfo locationInfo = redisCache.getCacheObject(key);
|
|
|
+ locationInfoNew.setEvtTimestamp(DateTimeUtil.timestampMillis());
|
|
|
// 判断消息是否需要融合
|
|
|
- if (!check(locationInfo, locationInfoNew)) {
|
|
|
+ if (check(locationInfo, locationInfoNew)) {
|
|
|
+ locationInfo.setSrcTimestamp(locationInfoNew.getSrcTimestamp());
|
|
|
+ locationInfo.setHandleTimestamp(DateTimeUtil.timestampMillis());
|
|
|
+ redisCache.setCacheObject(key, locationInfo);
|
|
|
+ } else {
|
|
|
// 老数据释放
|
|
|
processOlderData(locationInfo);
|
|
|
+ getBizId(locationInfoNew);
|
|
|
+ redisCache.setCacheObject(key, locationInfoNew);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error("{} error", this.engineName, e);
|
|
|
+ // 重置中断状态
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ // 根据业务逻辑决定是否继续执行
|
|
|
+ if (Thread.currentThread().isInterrupted()) {
|
|
|
+ logger.error("任务已中断,不再继续执行");
|
|
|
+ break;
|
|
|
}
|
|
|
- getBizId(locationInfoNew);
|
|
|
- locationInfo.setSrcTimestamp(locationInfoNew.getSrcTimestamp());
|
|
|
- evtBucket.put(key, locationInfoNew);
|
|
|
} catch (Exception e) {
|
|
|
logger.error("{} error", this.engineName, e);
|
|
|
}
|