|
@@ -1,6 +1,7 @@
|
|
|
package com.ruoyi.web.job;
|
|
|
|
|
|
import cn.hutool.core.convert.Convert;
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
import com.alibaba.otter.canal.client.CanalConnectors;
|
|
@@ -19,6 +20,7 @@ import org.springframework.context.ApplicationContext;
|
|
|
import org.springframework.context.ApplicationContextAware;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.LinkedCaseInsensitiveMap;
|
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.HashMap;
|
|
@@ -110,8 +112,8 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
|
|
|
//获取改变后的数据
|
|
|
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
|
|
|
- Map<String, Object> beforeColumnsToMap = parseColumnsToMap(beforeColumnsList, tableName);
|
|
|
- Map<String, Object> afterColumnsToMap = parseColumnsToMap(afterColumnsList, tableName);
|
|
|
+ Map<String, Object> beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName));
|
|
|
+ Map<String, Object> afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName));
|
|
|
//插入es
|
|
|
indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
|
|
|
|
|
@@ -124,7 +126,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
if (column == null) {
|
|
|
return;
|
|
|
}
|
|
|
- map.put(column.getName(), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
|
|
|
+ map.put(StrUtil.toCamelCase(column.getName()), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
|
|
|
});
|
|
|
return map;
|
|
|
}
|
|
@@ -137,7 +139,10 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
}
|
|
|
|
|
|
//不是user表中的数据不处理
|
|
|
- if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition")) {
|
|
|
+ if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition",
|
|
|
+ "alert",
|
|
|
+ "monitor_point",
|
|
|
+ "ship_static_info")) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -147,22 +152,82 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
// 黑烟船舶数据入库
|
|
|
if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ // 检测点信息
|
|
|
+ client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
+ // 船舶信息
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
+ client.createDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
|
|
|
+ }
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
+ // 违规船舶
|
|
|
+ // 查询检测点和船舶信息
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
+ Map<String, Object> mmsi = client.getDocById(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), "name,destination");
|
|
|
+ if (mmsi != null && ObjectUtil.equal(mmsi.get("code"), 200)) {
|
|
|
+ Map<String, String> data = Convert.toMap(String.class, String.class, mmsi.get("data"));
|
|
|
+ afterDataMap.put("shipName", data.get("name"));
|
|
|
+ afterDataMap.put("destination", data.get("destination"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Map<String, Object> monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name");
|
|
|
+ if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) {
|
|
|
+ Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
|
|
|
+ afterDataMap.put("monitorPointName", data.get("name"));
|
|
|
+ }
|
|
|
+ client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
}
|
|
|
break;
|
|
|
case UPDATE:
|
|
|
if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
client.updateDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ // 检测点信息
|
|
|
+ client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
+ // 船舶信息
|
|
|
+ client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
+ // 违规船舶
|
|
|
+ client.updateDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
}
|
|
|
break;
|
|
|
case DELETE:
|
|
|
+ if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
+ client.deleteDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")));
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ // 检测点信息
|
|
|
+ client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")));
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
|
|
|
+ // 船舶信息
|
|
|
+ client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")));
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
+ // 违规船舶
|
|
|
+ client.deleteDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")));
|
|
|
+ }
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 或缺数据库字段的大小写
|
|
|
+ *
|
|
|
+ * @param data
|
|
|
+ * @param <V>
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
|
|
|
+ LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap();
|
|
|
+ map.putAll(data);
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
|
|
this.applicationContext = applicationContext;
|
|
|
}
|
|
|
+
|
|
|
}
|