|
@@ -8,7 +8,9 @@ import com.alibaba.otter.canal.client.CanalConnectors;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
import com.alibaba.otter.canal.protocol.Message;
|
|
|
import com.google.protobuf.InvalidProtocolBufferException;
|
|
|
+import com.ruoyi.common.constant.CacheConstants;
|
|
|
import com.ruoyi.common.constant.ElasticConstants;
|
|
|
+import com.ruoyi.common.core.redis.RedisCache;
|
|
|
import com.ruoyi.common.utils.JdbcTypeUtil;
|
|
|
import com.ruoyi.framework.config.ElasticSearchClient;
|
|
|
import com.ruoyi.web.core.config.CanalConfig;
|
|
@@ -40,6 +42,8 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
private ApplicationContext applicationContext;
|
|
|
@Autowired
|
|
|
private ElasticSearchClient client;
|
|
|
+ @Autowired
|
|
|
+ private RedisCache redisCache;
|
|
|
// @Resource
|
|
|
// private CanalConnector canalConnector;
|
|
|
|
|
@@ -142,6 +146,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition",
|
|
|
"alert",
|
|
|
"monitor_point",
|
|
|
+ "device",
|
|
|
"ship_static_info",
|
|
|
"illegal_ship",
|
|
|
"sem_instrument",
|
|
@@ -158,6 +163,10 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ // 设备信息
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
// 船舶信息
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
@@ -183,6 +192,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
|
client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
|
|
|
// 嗅探系统-违规船舶
|
|
|
// 查询站点信息
|
|
@@ -202,6 +212,10 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ // 设备信息
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
// 船舶信息
|
|
|
client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
|
|
@@ -211,6 +225,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
|
client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
|
|
|
// 嗅探系统-违规船舶
|
|
|
// 查询站点信息
|
|
@@ -228,6 +243,10 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")));
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ // 设备信息
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
// 船舶信息
|
|
|
client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")));
|
|
@@ -237,6 +256,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
|
client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")));
|
|
|
+ redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
|
|
|
// 嗅探系统-违规船舶
|
|
|
client.deleteDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")));
|