|
@@ -9,13 +9,12 @@ import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
import javax.annotation.Resource;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
-public abstract class EvtFusionEngine {
|
|
|
+public abstract class EvtFusionEngine implements IFusionEngine {
|
|
|
private final static Logger logger = LoggerFactory.getLogger(EvtFusionEngine.class);
|
|
|
|
|
|
@Resource
|
|
@@ -33,7 +32,6 @@ public abstract class EvtFusionEngine {
|
|
|
@Resource(name = "threadPoolTaskExecutor")
|
|
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
|
|
|
|
|
- @PostConstruct
|
|
|
public void init() {
|
|
|
for (int i = 0; i < threadPoolSize; i++) {
|
|
|
messageQueueList.add(new LinkedBlockingQueue<>());
|
|
@@ -55,14 +53,6 @@ public abstract class EvtFusionEngine {
|
|
|
messageQueue.offer(msg);
|
|
|
}
|
|
|
|
|
|
- public abstract Boolean check(LocationInfo older, LocationInfo newer);
|
|
|
-
|
|
|
- public abstract String getKey(LocationInfo msg);
|
|
|
-
|
|
|
- public abstract void getBizId(LocationInfo msg);
|
|
|
-
|
|
|
- public abstract void processOlderData(LocationInfo msg);
|
|
|
-
|
|
|
public void start() {
|
|
|
for (int i = 0; i < threadPoolSize; i++) {
|
|
|
int finalI = i;
|
|
@@ -76,6 +66,7 @@ public abstract class EvtFusionEngine {
|
|
|
if (!redisCache.hasKey(key)) {
|
|
|
getBizId(locationInfoNew);
|
|
|
redisCache.setCacheObject(key, locationInfoNew);
|
|
|
+ newEvtCallback(locationInfoNew);
|
|
|
continue;
|
|
|
}
|
|
|
LocationInfo locationInfo = redisCache.getCacheObject(key);
|
|
@@ -90,6 +81,7 @@ public abstract class EvtFusionEngine {
|
|
|
processOlderData(locationInfo);
|
|
|
getBizId(locationInfoNew);
|
|
|
redisCache.setCacheObject(key, locationInfoNew);
|
|
|
+ newEvtCallback(locationInfoNew);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
logger.error("{} error", this.engineName, e);
|