package com.ruoyi.web.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.ruoyi.common.constant.CacheConstants; import com.ruoyi.common.constant.ElasticConstants; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.JdbcTypeUtil; import com.ruoyi.common.utils.uuid.IdUtils; import com.ruoyi.framework.config.ElasticSearchClient; import com.ruoyi.system.domain.IllegalShipData; import com.ruoyi.system.domain.vo.AisShipInfo; import com.ruoyi.system.domain.vo.ShipEepReportRecInfo; import com.ruoyi.system.service.IAisInfoService; import com.ruoyi.system.service.IIllegalShipDataService; import com.ruoyi.system.service.ISysConfigService; import com.ruoyi.web.core.config.CanalConfig; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.LinkedCaseInsensitiveMap; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Description: TODO * @Author: huangcheng * @Date: 2021/8/16 * @Version V1.0 */ @Slf4j @Component public class CanalScheduling implements Runnable, ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private ElasticSearchClient client; @Autowired private RedisCache redisCache; @Autowired private ISysConfigService configService; @Autowired private IAisInfoService aisInfoService; // @Resource // private CanalConnector canalConnector; @Value("${black.snapImgUrl}") private String blackImgUrl; @Autowired private IIllegalShipDataService illegalShipDataService; @Autowired private CanalConfig canalConfig; @Value("${spring.profiles.active}") private String env; public final static Map orgMap = new HashMap() {{ put("南京三桥", "南京海事局"); put("南京四桥", "南京海事局"); put("润扬大桥", "扬州海事局"); put("泰州大桥", "泰州海事局"); put("江阴大桥", "江阴海事局"); put("苏通大桥", "常熟海事局"); }}; public final static Map heiyanDeviceMap = new HashMap() {{ put("南京三桥", "65"); put("润扬大桥", "66"); put("江阴大桥", "67"); put("苏通大桥", "68"); }}; @Override @Scheduled(fixedDelay = 100) //每隔100秒执行 public void run() { for (CanalConfig.Config config : canalConfig.getConfigs()) { // 创建链接 CanalConnector canalConnector = CanalConnectors .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", ""); long batchId = -1; try { canalConnector.connect(); canalConnector.subscribe(); canalConnector.rollback(); //每次拉取条数 int batchSize = 1000; Message message = canalConnector.getWithoutAck(batchSize); //批次id batchId = message.getId(); List entries = message.getEntries(); if (batchId != -1 && entries.size() > 0) { entries.forEach(entry -> { //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型 if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { //解析处理 publishCanalEvent(entry); } }); } canalConnector.ack(batchId); } catch (Exception e) { log.info("canal存在异常:{}", e.getMessage()); e.printStackTrace(); canalConnector.rollback(batchId); } finally { // 断开连接 canalConnector.disconnect(); } } } private void publishCanalEvent(CanalEntry.Entry entry) { //表名 String tableName = entry.getHeader().getTableName(); //数据库名 String database = entry.getHeader().getSchemaName(); // 操作类型 CanalEntry.EventType eventType = entry.getHeader().getEventType(); log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), database, tableName, eventType)); CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); return; } rowChange.getRowDatasList().forEach(rowData -> { //获取改变前的数据 List beforeColumnsList = rowData.getBeforeColumnsList(); //获取改变后的数据 List afterColumnsList = rowData.getAfterColumnsList(); Map beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName)); Map afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName)); //插入es indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName); }); } Map parseColumnsToMap(List columns, String tableName) { Map map = new HashMap<>(); columns.forEach(column -> { if (column == null) { return; } map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType())); }); return map; } @SneakyThrows private void indexES(Map beforeDataMap, Map afterDataMap, CanalEntry.EventType eventType, String database, String table) { log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap); if (!StrUtil.equalsAnyIgnoreCase(database, "smoke_api", "ship", "ais_database")) { return; } //不是user表中的数据不处理 if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition", "ship_snap_address", "alert", "monitor_point", "device", "illegal_ship", "sem_instrument")) { return; } String so2ThresholdVal = configService.selectConfigByKey("so2.so2"); String blackThresholdVal = configService.selectConfigByKey("black.rcgSoot"); // 根据不同类型处理相应的逻辑 switch (eventType) { case INSERT: // 黑烟船舶数据入库 if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) { if (ObjectUtil.isEmpty(afterDataMap.get("aisMmsi")) /*|| Convert.toStr(afterDataMap.get("aisMmsi")).length() != 9*/) { break; } // 判断数据状态 if (ObjectUtil.isNotEmpty(afterDataMap.get("rcgSoot")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("rcgSoot")), Convert.toBigDecimal(blackThresholdVal))) { // 查询黑烟图片 if ("prod".equals(env)) { String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id"))); JSONObject response = JSON.parseObject(imgUrl); if (200 == response.getInteger("code")) { List newImgList = new ArrayList<>(); JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList"); String sootImgUrl = response.getJSONObject("data").getString("sootImgUrl"); for (Object o : imgUrlList) { // 只保存细节、全貌开头的图片 String s = Convert.toStr(o); if (StrUtil.containsAny(s, "全貌", "细节")) { newImgList.add("http://" + s); } } afterDataMap.put("sootImgUrl", StrUtil.isNotBlank(sootImgUrl) ? "http://" + sootImgUrl : ""); afterDataMap.put("allImgUrl", newImgList); } } // 超过阈值后直接判定为违规船舶 afterDataMap.put("illegalStatus", 3); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } if (ObjectUtil.isNotEmpty(afterDataMap.get("aisMmsi")) && Convert.toStr(afterDataMap.get("aisMmsi")).length() == 9) { aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("aisMmsi"))); } afterDataMap.put("mmsi", afterDataMap.get("aisMmsi")); afterDataMap.put("illegalType", "heiyan"); // 查询检测点和船舶信息 if (ObjectUtil.isNotEmpty(afterDataMap.get("aisMmsi")) && !"0".equals(Convert.toStr(afterDataMap.get("aisMmsi")))) { AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("aisMmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } } // 对于没有从ais系统拿到船名的情况下,直接取rcgShipName if (ObjectUtil.isEmpty(afterDataMap.get("shipName"))) { afterDataMap.put("shipName", afterDataMap.get("rcgShipName")); } if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) { afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos")))); afterDataMap.put("deviceId", heiyanDeviceMap.get(Convert.toStr(afterDataMap.get("snapPos")))); } afterDataMap.put("createTime", afterDataMap.get("snapTimeFmt")); afterDataMap.put("monitorPointName", afterDataMap.get("snapPos")); // 保存数据 saveIllegalData(afterDataMap, ElasticConstants.HEIYAN_SHIP_RECOGNITION); client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { // 删除设备和类型的统计缓存 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "alert")) { // 船舶记录 if ("prod".equals(env)) { if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) { AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } } } // 判断是否大于阈值 if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) { // 查询检测点和船舶信息、船舶进出港记录 if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) { ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.lastWeek()), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (eepReportRecInfo != null) { afterDataMap.put("destination", eepReportRecInfo.getNextPortName()); afterDataMap.put("berthName", eepReportRecInfo.getBerthName()); } } Map monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name"); if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, monitor.get("data")); afterDataMap.put("monitorPointName", data.get("name")); afterDataMap.put("orgName", orgMap.get(data.get("name"))); } // 嫌疑船舶 afterDataMap.put("illegalStatus", 2); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi")) && Convert.toStr(afterDataMap.get("mmsi")).length() == 9) { aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("mmsi"))); } afterDataMap.put("illegalType", "guangpu"); // 保存数据 saveIllegalData(afterDataMap, ElasticConstants.SO2_ALERT); client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) { // 嗅探系统-违规船舶 if ("prod".equals(env)) { // 查询检测点和船舶信息、船舶进出港记录 if (ObjectUtil.isNotEmpty(afterDataMap.get("shipMmsi"))) { AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("shipMmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } } } afterDataMap.put("mmsi", afterDataMap.get("shipMmsi")); afterDataMap.put("createTime", afterDataMap.get("peakTime")); afterDataMap.put("so2Percent", afterDataMap.get("sPercent")); if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) { // 查询检测点和船舶信息、船舶进出港记录 if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) { ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.lastWeek()), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (eepReportRecInfo != null) { afterDataMap.put("destination", eepReportRecInfo.getNextPortName()); afterDataMap.put("berthName", eepReportRecInfo.getBerthName()); } } afterDataMap.put("deviceId", afterDataMap.get("semId")); // 查询站点信息 Map sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category"); if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, sem.get("data")); afterDataMap.put("semName", data.get("name")); afterDataMap.put("monitorPointName", data.get("category")); afterDataMap.put("orgName", orgMap.get(data.get("category"))); } // 嫌疑船舶 afterDataMap.put("illegalStatus", 2); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi")) && Convert.toStr(afterDataMap.get("mmsi")).length() == 9) { aisInfoService.getDynamicShipInfo(Convert.toStr(afterDataMap.get("mmsi"))); } afterDataMap.put("illegalType", "xiutan"); // 保存数据 saveIllegalData(afterDataMap, ElasticConstants.AIS_ILLEGAL_SHIP); client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap); } break; case UPDATE: if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } break; case DELETE: if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id"))); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id"))); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } break; default: break; } } private void saveIllegalData(Map afterDataMap, String indexName) { if (Convert.toInt(afterDataMap.get("illegalStatus")) != 1) { IllegalShipData illegalShipData = new IllegalShipData(); BeanUtil.fillBeanWithMap(afterDataMap, illegalShipData, CopyOptions.create().setIgnoreProperties("id")); illegalShipData.setId(IdUtils.fastSimpleUUID()); illegalShipData.setSystemId(Convert.toStr(afterDataMap.get("id"))); illegalShipData.setSystemEsIndex(indexName); try { illegalShipDataService.insertIllegalShipData(illegalShipData); } catch (Exception ignored) { ignored.printStackTrace(); } } } /** * 或缺数据库字段的大小写 * * @param data * @param * @return */ private static LinkedCaseInsensitiveMap toCaseInsensitiveMap(Map data) { LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap(); map.putAll(data); return map; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }