package com.ruoyi.web.job; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.ruoyi.common.constant.CacheConstants; import com.ruoyi.common.constant.ElasticConstants; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.JdbcTypeUtil; import com.ruoyi.common.utils.uuid.IdUtils; import com.ruoyi.framework.config.ElasticSearchClient; import com.ruoyi.system.domain.IllegalShipData; import com.ruoyi.system.domain.vo.AisShipInfo; import com.ruoyi.system.domain.vo.ShipEepReportRecInfo; import com.ruoyi.system.service.IAisInfoService; import com.ruoyi.system.service.IIllegalShipDataService; import com.ruoyi.system.service.ISysConfigService; import com.ruoyi.web.core.config.CanalConfig; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.LinkedCaseInsensitiveMap; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Description: TODO * @Author: huangcheng * @Date: 2021/8/16 * @Version V1.0 */ @Slf4j @Component public class CanalScheduling implements Runnable, ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private ElasticSearchClient client; @Autowired private RedisCache redisCache; @Autowired private ISysConfigService configService; @Autowired private IAisInfoService aisInfoService; // @Resource // private CanalConnector canalConnector; @Value("${black.snapImgUrl}") private String blackImgUrl; @Autowired private IIllegalShipDataService illegalShipDataService; @Autowired private CanalConfig canalConfig; @Value("${spring.profiles.active}") private String env; public final static Map orgMap = new HashMap() {{ put("南京三桥", "南京海事局"); put("南京四桥", "南京海事局"); put("润扬大桥", "镇江海事局"); put("泰州大桥", "泰州海事局"); put("江阴大桥", "江阴海事局"); put("苏通大桥", "常熟海事局"); }}; public final static Map heiyanDeviceMap = new HashMap() {{ put("南京三桥", "1106"); put("润扬大桥", "2104"); put("江阴大桥", "6106"); put("苏通大桥", "9108"); }}; @Override @Scheduled(fixedDelay = 100) //每隔100秒执行 public void run() { for (CanalConfig.Config config : canalConfig.getConfigs()) { // 创建链接 CanalConnector canalConnector = CanalConnectors .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", ""); long batchId = -1; try { canalConnector.connect(); canalConnector.subscribe(); canalConnector.rollback(); //每次拉取条数 int batchSize = 1000; Message message = canalConnector.getWithoutAck(batchSize); //批次id batchId = message.getId(); List entries = message.getEntries(); if (batchId != -1 && entries.size() > 0) { entries.forEach(entry -> { //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型 if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { //解析处理 publishCanalEvent(entry); } }); } canalConnector.ack(batchId); } catch (Exception e) { log.info("canal存在异常:{}", e.getMessage()); e.printStackTrace(); canalConnector.rollback(batchId); } finally { // 断开连接 canalConnector.disconnect(); } } } private void publishCanalEvent(CanalEntry.Entry entry) { //表名 String tableName = entry.getHeader().getTableName(); //数据库名 String database = entry.getHeader().getSchemaName(); // 操作类型 CanalEntry.EventType eventType = entry.getHeader().getEventType(); log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), database, tableName, eventType)); CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); return; } rowChange.getRowDatasList().forEach(rowData -> { //获取改变前的数据 List beforeColumnsList = rowData.getBeforeColumnsList(); //获取改变后的数据 List afterColumnsList = rowData.getAfterColumnsList(); Map beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName)); Map afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName)); //插入es indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName); }); } Map parseColumnsToMap(List columns, String tableName) { Map map = new HashMap<>(); columns.forEach(column -> { if (column == null) { return; } map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType())); }); return map; } @SneakyThrows private void indexES(Map beforeDataMap, Map afterDataMap, CanalEntry.EventType eventType, String database, String table) { if (MapUtil.isNotEmpty(afterDataMap)) { log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap); } if (!StrUtil.equalsAnyIgnoreCase(database, "smoke_api", "ship", "ais_database")) { return; } //不是user表中的数据不处理 if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition", "ship_snap_address", "common_record", "monitor_point", "device", "illegal_ship", "sem_instrument")) { return; } String so2ThresholdVal = configService.selectConfigByKey("so2.so2"); String blackThresholdVal = configService.selectConfigByKey("black.rcgSoot"); // 根据不同类型处理相应的逻辑 switch (eventType) { case INSERT: // 黑烟船舶数据入库 if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) { afterDataMap.put("mmsi", afterDataMap.get("aisMmsi")); afterDataMap.put("illegalType", "heiyan"); afterDataMap.put("createTime", afterDataMap.get("snapTimeFmt")); afterDataMap.put("monitorPointName", afterDataMap.get("snapPos")); if (ObjectUtil.isEmpty(afterDataMap.get("mmsi")) /*|| Convert.toStr(afterDataMap.get("mmsi")).length() != 9*/) { break; } // 判断数据状态 if (ObjectUtil.isNotEmpty(afterDataMap.get("rcgSoot")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("rcgSoot")), Convert.toBigDecimal(blackThresholdVal))) { // 查询黑烟图片 if ("prod".equals(env)) { String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id"))); JSONObject response = JSON.parseObject(imgUrl); if (200 == response.getInteger("code")) { List newImgList = new ArrayList<>(); JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList"); String sootImgUrl = response.getJSONObject("data").getString("sootImgUrl"); for (Object o : imgUrlList) { // 只保存细节、全貌开头的图片 String s = Convert.toStr(o); // if (StrUtil.containsAny(s, "全貌", "细节")) { newImgList.add("http://" + s); // } } afterDataMap.put("sootImgUrl", StrUtil.isNotBlank(sootImgUrl) ? "http://" + sootImgUrl : ""); afterDataMap.put("allImgUrl", newImgList); } } // 查询检测点和船舶信息、船舶进出港记录 ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (eepReportRecInfo != null) { afterDataMap.put("destination", eepReportRecInfo.getNextPortName()); afterDataMap.put("berthName", eepReportRecInfo.getBerthName()); afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime()); } // 超过阈值后直接判定为违规船舶 afterDataMap.put("illegalStatus", 3); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } // 查询检测点和船舶信息 AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } // 对于没有从ais系统拿到船名的情况下,直接取rcgShipName if (ObjectUtil.isEmpty(afterDataMap.get("shipName"))) { afterDataMap.put("shipName", afterDataMap.get("rcgShipName")); } if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) { afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos")))); afterDataMap.put("deviceId", heiyanDeviceMap.get(Convert.toStr(afterDataMap.get("snapPos")))); } // 保存数据(黑烟暂时不需要推送到行政检查) // saveIllegalData(afterDataMap, ElasticConstants.HEIYAN_SHIP_RECOGNITION); client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { // 删除设备和类型的统计缓存 log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) { // 嗅探系统-违规船舶 afterDataMap.put("mmsi", afterDataMap.get("shipMmsi")); afterDataMap.put("createTime", afterDataMap.get("peakTime")); afterDataMap.put("so2Percent", afterDataMap.get("sPercent")); if ("prod".equals(env)) { // 查询检测点和船舶信息、船舶进出港记录 AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } } if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) { // 查询检测点和船舶信息、船舶进出港记录 ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (eepReportRecInfo != null) { afterDataMap.put("destination", eepReportRecInfo.getNextPortName()); afterDataMap.put("berthName", eepReportRecInfo.getBerthName()); afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime()); } // 嫌疑船舶 afterDataMap.put("illegalStatus", 2); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } afterDataMap.put("deviceId", afterDataMap.get("semId")); // 查询站点信息 Map sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category"); if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, sem.get("data")); afterDataMap.put("semName", data.get("name")); afterDataMap.put("monitorPointName", data.get("category")); afterDataMap.put("orgName", orgMap.get(data.get("category"))); } afterDataMap.put("illegalType", "xiutan"); // 保存数据 saveIllegalData(afterDataMap, ElasticConstants.AIS_ILLEGAL_SHIP); client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap); } break; case UPDATE: if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { if (ObjectUtil.notEqual(beforeDataMap.get("deviceImgUrl"), afterDataMap.get("deviceImgUrl")) || ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude")) || ObjectUtil.notEqual(beforeDataMap.get("status"), afterDataMap.get("status"))) { log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "common_record")) { // 船舶记录 if ("prod".equals(env)) { AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (shipInfo != null) { afterDataMap.put("shipName", shipInfo.getShipName()); afterDataMap.put("shipRegionType", shipInfo.getShipRegionType()); } } // 判断是否大于阈值 if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) { // 查询检测点和船舶信息、船舶进出港记录 ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", ""); if (eepReportRecInfo != null) { afterDataMap.put("destination", eepReportRecInfo.getNextPortName()); afterDataMap.put("berthName", eepReportRecInfo.getBerthName()); afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime()); } // 嫌疑船舶 afterDataMap.put("illegalStatus", 2); afterDataMap.put("uploadFlag", 0); // TODO 上报行政检查系统 } else { afterDataMap.put("uploadFlag", 0); afterDataMap.put("illegalStatus", 1); } Map monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name"); if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, monitor.get("data")); afterDataMap.put("monitorPointName", data.get("name")); afterDataMap.put("orgName", orgMap.get(data.get("name"))); } afterDataMap.put("illegalType", "guangpu"); // 保存数据 saveIllegalData(afterDataMap, ElasticConstants.SO2_ALERT); client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 // 如果状态和经纬度变更才去删除redis if (ObjectUtil.notEqual(beforeDataMap.get("online_status"), afterDataMap.get("online_status")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude")) || ObjectUtil.notEqual(beforeDataMap.get( "latitude"), afterDataMap.get("latitude"))) { log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); if (ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude"))) { log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } } break; case DELETE: if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(beforeDataMap.get("id"))); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(beforeDataMap.get("id"))); log.info("删除redis设备缓存"); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC, CacheConstants.DEVICE_LIST_STATIC)); } break; default: break; } } private void saveIllegalData(Map afterDataMap, String indexName) { if (Convert.toInt(afterDataMap.get("illegalStatus")) != 1) { IllegalShipData illegalShipData = new IllegalShipData(); BeanUtil.fillBeanWithMap(afterDataMap, illegalShipData, CopyOptions.create().setIgnoreProperties("id")); if (StrUtil.isBlank(illegalShipData.getMmsi()) || "0".equals(illegalShipData.getMmsi())) { return; } illegalShipData.setId(IdUtils.fastSimpleUUID()); illegalShipData.setSystemId(Convert.toStr(afterDataMap.get("id"))); illegalShipData.setSystemEsIndex(indexName); // 查询此船是否已上报过,如果是,则不需要再上报.根据mmsi和uploadFlag=1,illegalStatus=2 IllegalShipData existBean = new IllegalShipData(); existBean.setMmsi(illegalShipData.getMmsi()); existBean.setIllegalStatus(2); existBean.setUploadFlag(1); List exist = illegalShipDataService.selectIllegalShipDataList(existBean); if (exist != null && exist.size() > 0) { illegalShipData.setIllegalStatus(4); } try { illegalShipDataService.insertIllegalShipData(illegalShipData); } catch (Exception ignored) { ignored.printStackTrace(); } } } /** * 或缺数据库字段的大小写 * * @param data * @param * @return */ private static LinkedCaseInsensitiveMap toCaseInsensitiveMap(Map data) { LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap(); map.putAll(data); return map; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }