package com.ruoyi.web.job; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import com.ruoyi.common.constant.CacheConstants; import com.ruoyi.common.constant.ElasticConstants; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.JdbcTypeUtil; import com.ruoyi.framework.config.ElasticSearchClient; import com.ruoyi.web.core.config.CanalConfig; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.LinkedCaseInsensitiveMap; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Description: TODO * @Author: huangcheng * @Date: 2021/8/16 * @Version V1.0 */ @Slf4j @Component public class CanalScheduling implements Runnable, ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private ElasticSearchClient client; @Autowired private RedisCache redisCache; // @Resource // private CanalConnector canalConnector; @Autowired private CanalConfig canalConfig; @Override @Scheduled(fixedDelay = 100) //每隔100秒执行 public void run() { for (CanalConfig.Config config : canalConfig.getConfigs()) { // 创建链接 CanalConnector canalConnector = CanalConnectors .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", ""); long batchId = -1; try { canalConnector.connect(); canalConnector.subscribe(); canalConnector.rollback(); //每次拉取条数 int batchSize = 1000; Message message = canalConnector.getWithoutAck(batchSize); //批次id batchId = message.getId(); List entries = message.getEntries(); if (batchId != -1 && entries.size() > 0) { entries.forEach(entry -> { //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型 if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { //解析处理 publishCanalEvent(entry); } }); } canalConnector.ack(batchId); } catch (Exception e) { log.info("canal存在异常:{}", e.getMessage()); e.printStackTrace(); canalConnector.rollback(batchId); } finally { // 断开连接 canalConnector.disconnect(); } } } private void publishCanalEvent(CanalEntry.Entry entry) { //表名 String tableName = entry.getHeader().getTableName(); //数据库名 String database = entry.getHeader().getSchemaName(); // 操作类型 CanalEntry.EventType eventType = entry.getHeader().getEventType(); log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), database, tableName, eventType)); CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); return; } rowChange.getRowDatasList().forEach(rowData -> { //获取改变前的数据 List beforeColumnsList = rowData.getBeforeColumnsList(); //获取改变后的数据 List afterColumnsList = rowData.getAfterColumnsList(); Map beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName)); Map afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName)); //插入es indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName); }); } Map parseColumnsToMap(List columns, String tableName) { Map map = new HashMap<>(); columns.forEach(column -> { if (column == null) { return; } map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType())); }); return map; } @SneakyThrows private void indexES(Map beforeDataMap, Map afterDataMap, CanalEntry.EventType eventType, String database, String table) { log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap); if (!StrUtil.equalsAnyIgnoreCase(database, "heiyan", "so2", "ais_database")) { return; } //不是user表中的数据不处理 if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition", "ship_snap_address", "alert", "monitor_point", "device", "ship_static_info", "illegal_ship", "sem_instrument", "sem_instrument_test")) { return; } // 根据不同类型处理相应的逻辑 switch (eventType) { case INSERT: // 黑烟船舶数据入库 if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) { client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) { // 船舶信息 if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) { client.createDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap); } } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) { // 违规船舶 // 查询检测点和船舶信息 if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) { Map mmsi = client.getDocById(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), "name,destination"); if (mmsi != null && ObjectUtil.equal(mmsi.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, mmsi.get("data")); afterDataMap.put("shipName", data.get("name")); afterDataMap.put("destination", data.get("destination")); } } Map monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name"); if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, monitor.get("data")); afterDataMap.put("monitorPointName", data.get("name")); } client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) { // 嗅探系统-违规船舶 // 查询站点信息 Map sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category"); if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, sem.get("data")); afterDataMap.put("semName", data.get("name")); afterDataMap.put("monitorPointName", data.get("category")); } // 初始化设为未上传状态 afterDataMap.put("uploadFlag", 0); client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap); } break; case UPDATE: if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) { client.updateDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) { // 船舶信息 client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) { // 违规船舶 client.updateDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) { // 嗅探系统-违规船舶 // 查询站点信息 Map sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category"); if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) { Map data = Convert.toMap(String.class, String.class, sem.get("data")); afterDataMap.put("semName", data.get("name")); afterDataMap.put("monitorPointName", data.get("category")); } client.updateDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap); } break; case DELETE: if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) { client.deleteDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id"))); } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) { redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) { // 检测点信息 client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id"))); redisCache.deleteObject(CacheConstants.DEVICE_STATIC); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) { // 设备信息 redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) { // 船舶信息 client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi"))); } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) { // 违规船舶 client.deleteDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id"))); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) { // 嗅探系统-站点信息 client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id"))); redisCache.deleteObject(CollUtil.set(false, CacheConstants.DEVICE_STATIC, CacheConstants.DEVICE_TYPE_STATIC)); } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) { // 嗅探系统-违规船舶 client.deleteDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id"))); } break; default: break; } } /** * 或缺数据库字段的大小写 * * @param data * @param * @return */ private static LinkedCaseInsensitiveMap toCaseInsensitiveMap(Map data) { LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap(); map.putAll(data); return map; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }