|
@@ -1,17 +1,20 @@
|
|
|
package com.huashe.park.application.engine;
|
|
|
|
|
|
-import com.alibaba.fastjson2.JSONObject;
|
|
|
-import com.huashe.common.utils.StringUtils;
|
|
|
-import com.huashe.park.common.DateTimeUtil;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+
|
|
|
+
|
|
|
import com.huashe.park.core.redis.RedisProxy;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
|
|
-import javax.annotation.Resource;
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import com.huashe.common.utils.StringUtils;
|
|
|
+import com.huashe.park.common.DateTimeUtil;
|
|
|
|
|
|
public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
private final static Logger logger = LoggerFactory.getLogger(EvtFusionEngine.class);
|
|
@@ -34,7 +37,6 @@ public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
public void setEngineName(String name) {
|
|
|
this.engineName = name;
|
|
|
}
|
|
@@ -57,10 +59,13 @@ public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
threadPoolTaskExecutor.execute(() -> {
|
|
|
while (true) {
|
|
|
try {
|
|
|
+ logger.info("{} is running", this.engineName);
|
|
|
JSONObject msg = queue.get(finalI).take();
|
|
|
- LocationInfo locationInfoNew = new LocationInfo(msg.getDouble("latitude"), msg.getDouble("longitude"), msg.getLong("srcTimestamp"));
|
|
|
+ LocationInfo locationInfoNew = new LocationInfo(msg.getDouble("latitude"),
|
|
|
+ msg.getDouble("longitude"), msg.getLong("srcTimestamp"));
|
|
|
locationInfoNew.setMsg(msg);
|
|
|
String key = getKey(locationInfoNew);
|
|
|
+ logger.info("{} is running {}", this.engineName, key);
|
|
|
if (!redisService.hasKey(key)) {
|
|
|
getBizId(locationInfoNew);
|
|
|
redisService.setCacheObject(key, locationInfoNew);
|
|
@@ -74,14 +79,16 @@ public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
locationInfo.setSrcTimestamp(locationInfoNew.getSrcTimestamp());
|
|
|
locationInfo.setHandleTimestamp(DateTimeUtil.timestampMillis());
|
|
|
redisService.setCacheObject(key, locationInfo);
|
|
|
- } else {
|
|
|
+ }
|
|
|
+ else {
|
|
|
// 老数据释放
|
|
|
processOlderData(locationInfo);
|
|
|
getBizId(locationInfoNew);
|
|
|
redisService.setCacheObject(key, locationInfoNew);
|
|
|
newEvtCallback(locationInfoNew);
|
|
|
}
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ catch (InterruptedException e) {
|
|
|
logger.error("{} error", this.engineName, e);
|
|
|
// 重置中断状态
|
|
|
Thread.currentThread().interrupt();
|
|
@@ -90,7 +97,8 @@ public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
logger.error("任务已中断,不再继续执行");
|
|
|
break;
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
logger.error("{} error", this.engineName, e);
|
|
|
}
|
|
|
}
|
|
@@ -101,20 +109,20 @@ public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
/**
|
|
|
* 计算字符串的哈希值并对某个数取余数。
|
|
|
*
|
|
|
- * @param str 字符串
|
|
|
+ * @param str 字符串
|
|
|
* @param modulo 取余数的基数
|
|
|
* @return 字符串哈希值对 modulo 取余的结果
|
|
|
*/
|
|
|
private int computeHashModulo(String str, int modulo) {
|
|
|
long hash = 0;
|
|
|
long base = 31;
|
|
|
- long baseMod = base % modulo; // 预计算乘法因子
|
|
|
+ long baseMod = base % modulo; // 预计算乘法因子
|
|
|
|
|
|
for (char c : str.toCharArray()) {
|
|
|
- hash = (hash * baseMod + c) % modulo; // 避免整数溢出
|
|
|
+ hash = (hash * baseMod + c) % modulo; // 避免整数溢出
|
|
|
}
|
|
|
|
|
|
- return (int) (hash % modulo); // 最终结果转换为 int
|
|
|
+ return (int) (hash % modulo); // 最终结果转换为 int
|
|
|
}
|
|
|
|
|
|
}
|