|
@@ -2,6 +2,7 @@ package com.ruoyi.web.job;
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
import cn.hutool.core.convert.Convert;
|
|
|
+import cn.hutool.core.util.NumberUtil;
|
|
|
import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
@@ -14,6 +15,7 @@ import com.ruoyi.common.constant.ElasticConstants;
|
|
|
import com.ruoyi.common.core.redis.RedisCache;
|
|
|
import com.ruoyi.common.utils.JdbcTypeUtil;
|
|
|
import com.ruoyi.framework.config.ElasticSearchClient;
|
|
|
+import com.ruoyi.system.service.ISysConfigService;
|
|
|
import com.ruoyi.web.core.config.CanalConfig;
|
|
|
import lombok.SneakyThrows;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -45,6 +47,8 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
private ElasticSearchClient client;
|
|
|
@Autowired
|
|
|
private RedisCache redisCache;
|
|
|
+ @Autowired
|
|
|
+ private ISysConfigService configService;
|
|
|
// @Resource
|
|
|
// private CanalConnector canalConnector;
|
|
|
|
|
@@ -157,6 +161,9 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ String so2ThresholdVal = configService.selectConfigByKey("so2.so2");
|
|
|
+ String blackThresholdVal = configService.selectConfigByKey("black.rcgSoot");
|
|
|
+
|
|
|
// 根据不同类型处理相应的逻辑
|
|
|
switch (eventType) {
|
|
|
case INSERT:
|
|
@@ -176,13 +183,13 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
+ } /*else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
// 船舶信息
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
client.createDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
|
|
|
}
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
- // 违规船舶
|
|
|
+ }*/ else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
+ // 船舶记录
|
|
|
// 查询检测点和船舶信息
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
Map<String, Object> mmsi = client.getDocById(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), "name,destination");
|
|
@@ -197,6 +204,15 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
|
|
|
afterDataMap.put("monitorPointName", data.get("name"));
|
|
|
}
|
|
|
+ // 判断是否大于阈值
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) {
|
|
|
+ // 嫌疑船舶
|
|
|
+ afterDataMap.put("illegalStatus", 2);
|
|
|
+ // TODO 上报行政检查系统
|
|
|
+
|
|
|
+ } else {
|
|
|
+ afterDataMap.put("illegalStatus", 1);
|
|
|
+ }
|
|
|
client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
@@ -234,12 +250,6 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
- // 船舶信息
|
|
|
- client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
- // 违规船舶
|
|
|
- client.updateDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
|
client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
@@ -259,9 +269,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
}
|
|
|
break;
|
|
|
case DELETE:
|
|
|
- if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
- client.deleteDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
+ if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
@@ -277,18 +285,12 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
// 船舶信息
|
|
|
client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
- // 违规船舶
|
|
|
- client.deleteDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")));
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
|
// 嗅探系统-站点信息
|
|
|
client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")));
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
|
|
|
- // 嗅探系统-违规船舶
|
|
|
- client.deleteDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")));
|
|
|
}
|
|
|
break;
|
|
|
default:
|