|
@@ -6,6 +6,10 @@ import cn.hutool.core.date.DateUtil;
|
|
|
import cn.hutool.core.util.NumberUtil;
|
|
|
import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
+import cn.hutool.http.HttpUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.otter.canal.client.CanalConnector;
|
|
|
import com.alibaba.otter.canal.client.CanalConnectors;
|
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
@@ -33,6 +37,7 @@ import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.LinkedCaseInsensitiveMap;
|
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -64,6 +69,9 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
@Autowired
|
|
|
private CanalConfig canalConfig;
|
|
|
|
|
|
+ @Value("${spring.profiles.active}")
|
|
|
+ private String env;
|
|
|
+
|
|
|
public final static Map<String, String> orgMap = new HashMap<String, String>() {{
|
|
|
put("南京三桥", "南京海事局");
|
|
|
put("南京四桥", "南京海事局");
|
|
@@ -161,7 +169,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
@SneakyThrows
|
|
|
private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) {
|
|
|
log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
|
|
|
- if (!StrUtil.equalsAnyIgnoreCase(database, "heiyan", "so2", "ais_database")) {
|
|
|
+ if (!StrUtil.equalsAnyIgnoreCase(database, "smoke_api", "ship", "ais_database")) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -184,35 +192,27 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
switch (eventType) {
|
|
|
case INSERT:
|
|
|
// 黑烟船舶数据入库
|
|
|
- if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
+ if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
|
|
|
// 判断数据状态
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("rcgSoot")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("rcgSoot")), Convert.toBigDecimal(blackThresholdVal))) {
|
|
|
-
|
|
|
// 查询黑烟图片
|
|
|
- /*String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id")));
|
|
|
- JSONObject response = JSON.parseObject(imgUrl);
|
|
|
- if (200 == response.getInteger("code")) {
|
|
|
- List<String> newImgList = new ArrayList<>();
|
|
|
- JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList");
|
|
|
- for (Object o : imgUrlList) {
|
|
|
- // 只保存细节、全貌开头的图片
|
|
|
- String s = Convert.toStr(o);
|
|
|
- if (StrUtil.containsAny(s, "全貌", "细节")) {
|
|
|
- newImgList.add(s);
|
|
|
+ if ("prod".equals(env)) {
|
|
|
+ String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id")));
|
|
|
+ JSONObject response = JSON.parseObject(imgUrl);
|
|
|
+ if (200 == response.getInteger("code")) {
|
|
|
+ List<String> newImgList = new ArrayList<>();
|
|
|
+ JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList");
|
|
|
+ String sootImgUrl = response.getJSONObject("data").getString("sootImgUrl");
|
|
|
+ for (Object o : imgUrlList) {
|
|
|
+ // 只保存细节、全貌开头的图片
|
|
|
+ String s = Convert.toStr(o);
|
|
|
+ if (StrUtil.containsAny(s, "全貌", "细节")) {
|
|
|
+ newImgList.add("http://" + s);
|
|
|
+ }
|
|
|
}
|
|
|
+ afterDataMap.put("sootImgUrl", StrUtil.isNotBlank(sootImgUrl) ? "http://" + sootImgUrl : "");
|
|
|
+ afterDataMap.put("allImgUrl", newImgList);
|
|
|
}
|
|
|
- afterDataMap.put("allImgUrl", newImgList);
|
|
|
- }*/
|
|
|
- // 查询检测点和船舶信息、船舶进出港记录
|
|
|
- if (ObjectUtil.isNotEmpty(afterDataMap.get("aisMmsi"))) {
|
|
|
- AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("aisMmsi")), "", "");
|
|
|
- if (shipInfo != null) {
|
|
|
- afterDataMap.put("aisShipName", shipInfo.getShipName());
|
|
|
- afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
|
|
|
- }
|
|
|
- }
|
|
|
- if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) {
|
|
|
- afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
|
|
|
}
|
|
|
// 超过阈值后直接判定为违规船舶
|
|
|
afterDataMap.put("illegalStatus", 3);
|
|
@@ -223,32 +223,55 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
afterDataMap.put("uploadFlag", 0);
|
|
|
afterDataMap.put("illegalStatus", 1);
|
|
|
}
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("aisMmsi"))) {
|
|
|
+ aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("aisMmsi")));
|
|
|
+ }
|
|
|
+ afterDataMap.put("mmsi", afterDataMap.get("aisMmsi"));
|
|
|
+ afterDataMap.put("illegalType", "heiyan");
|
|
|
+ // 查询检测点和船舶信息
|
|
|
+ if ("prod".equals(env)) {
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("aisMmsi")) && !"0".equals(Convert.toStr(afterDataMap.get("aisMmsi")))) {
|
|
|
+ AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("aisMmsi")), "", "");
|
|
|
+ if (shipInfo != null) {
|
|
|
+ afterDataMap.put("aisShipName", shipInfo.getShipName());
|
|
|
+ afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) {
|
|
|
+ afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
|
|
|
+ }
|
|
|
+ afterDataMap.put("createTime", afterDataMap.get("snapTimeFmt"));
|
|
|
client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
// 删除设备和类型的统计缓存
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
// 设备信息
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "alert")) {
|
|
|
// 船舶记录
|
|
|
- // 判断是否大于阈值
|
|
|
- if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) {
|
|
|
- // 查询检测点和船舶信息、船舶进出港记录
|
|
|
+ if ("prod".equals(env)) {
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
|
|
|
if (shipInfo != null) {
|
|
|
afterDataMap.put("shipName", shipInfo.getShipName());
|
|
|
afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
|
|
|
}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 判断是否大于阈值
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) {
|
|
|
+ // 查询检测点和船舶信息、船舶进出港记录
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.lastWeek()), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
|
|
|
if (eepReportRecInfo != null) {
|
|
|
afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
|
|
@@ -271,6 +294,9 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
afterDataMap.put("uploadFlag", 0);
|
|
|
afterDataMap.put("illegalStatus", 1);
|
|
|
}
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
+ aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("mmsi")));
|
|
|
+ }
|
|
|
afterDataMap.put("illegalType", "guangpu");
|
|
|
client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
|
|
@@ -281,10 +307,7 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
} else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
|
|
|
// 嗅探系统-违规船舶
|
|
|
- afterDataMap.put("mmsi", afterDataMap.get("shipMmsi"));
|
|
|
- afterDataMap.put("createTime", afterDataMap.get("peakTime"));
|
|
|
- afterDataMap.put("so2Percent", afterDataMap.get("sPercent"));
|
|
|
- if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) {
|
|
|
+ if ("prod".equals(env)) {
|
|
|
// 查询检测点和船舶信息、船舶进出港记录
|
|
|
if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
|
|
@@ -292,6 +315,14 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
afterDataMap.put("shipName", shipInfo.getShipName());
|
|
|
afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
|
|
|
}
|
|
|
+ }
|
|
|
+ }
|
|
|
+ afterDataMap.put("mmsi", afterDataMap.get("shipMmsi"));
|
|
|
+ afterDataMap.put("createTime", afterDataMap.get("peakTime"));
|
|
|
+ afterDataMap.put("so2Percent", afterDataMap.get("sPercent"));
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) {
|
|
|
+ // 查询检测点和船舶信息、船舶进出港记录
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.lastWeek()), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
|
|
|
if (eepReportRecInfo != null) {
|
|
|
afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
|
|
@@ -316,20 +347,23 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
afterDataMap.put("uploadFlag", 0);
|
|
|
afterDataMap.put("illegalStatus", 1);
|
|
|
}
|
|
|
+ if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
|
|
|
+ aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("mmsi")));
|
|
|
+ }
|
|
|
afterDataMap.put("illegalType", "xiutan");
|
|
|
client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
}
|
|
|
break;
|
|
|
case UPDATE:
|
|
|
- if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
+ if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
|
|
|
redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
// 设备信息
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
@@ -343,15 +377,15 @@ public class CanalScheduling implements Runnable, ApplicationContextAware {
|
|
|
}
|
|
|
break;
|
|
|
case DELETE:
|
|
|
- if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
+ if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|
|
|
CacheConstants.DEVICE_TYPE_STATIC));
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
|
|
|
// 检测点信息
|
|
|
client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")));
|
|
|
redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
|
|
|
- } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
+ } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
|
|
|
// 设备信息
|
|
|
redisCache.deleteObject(CollUtil.set(false,
|
|
|
CacheConstants.DEVICE_STATIC,
|