123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- package com.ruoyi.web.job;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.bean.copier.CopyOptions;
- import cn.hutool.core.collection.CollUtil;
- import cn.hutool.core.convert.Convert;
- import cn.hutool.core.date.DateUtil;
- import cn.hutool.core.map.MapUtil;
- import cn.hutool.core.util.NumberUtil;
- import cn.hutool.core.util.ObjectUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.http.HttpUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.google.protobuf.InvalidProtocolBufferException;
- import com.ruoyi.common.constant.CacheConstants;
- import com.ruoyi.common.constant.ElasticConstants;
- import com.ruoyi.common.core.redis.RedisCache;
- import com.ruoyi.common.utils.JdbcTypeUtil;
- import com.ruoyi.common.utils.uuid.IdUtils;
- import com.ruoyi.framework.config.ElasticSearchClient;
- import com.ruoyi.system.domain.IllegalShipData;
- import com.ruoyi.system.domain.vo.AisShipInfo;
- import com.ruoyi.system.domain.vo.ShipEepReportRecInfo;
- import com.ruoyi.system.service.IAisInfoService;
- import com.ruoyi.system.service.IIllegalShipDataService;
- import com.ruoyi.system.service.ISysConfigService;
- import com.ruoyi.web.core.config.CanalConfig;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import org.springframework.util.LinkedCaseInsensitiveMap;
- import java.net.InetSocketAddress;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
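/**
 * Scheduled Canal client: pulls row-level MySQL binlog events from every configured Canal
 * instance and mirrors them into Elasticsearch documents, Redis cache evictions and
 * illegal-ship records.
 *
 * @Author: huangcheng
 * @Date: 2021/8/16
 * @Version V1.0
 */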
- @Slf4j
- @Component
- public class CanalScheduling implements Runnable, ApplicationContextAware {
- private ApplicationContext applicationContext;
- @Autowired
- private ElasticSearchClient client;
- @Autowired
- private RedisCache redisCache;
- @Autowired
- private ISysConfigService configService;
- @Autowired
- private IAisInfoService aisInfoService;
- // @Resource
- // private CanalConnector canalConnector;
- @Value("${black.snapImgUrl}")
- private String blackImgUrl;
- @Autowired
- private IIllegalShipDataService illegalShipDataService;
- @Autowired
- private CanalConfig canalConfig;
- @Value("${spring.profiles.active}")
- private String env;
- public final static Map<String, String> orgMap = new HashMap<String, String>() {{
- put("南京三桥", "南京海事局");
- put("南京四桥", "南京海事局");
- put("润扬大桥", "镇江海事局");
- put("泰州大桥", "泰州海事局");
- put("江阴大桥", "江阴海事局");
- put("苏通大桥", "常熟海事局");
- }};
- public final static Map<String, String> heiyanDeviceMap = new HashMap<String, String>() {{
- put("南京三桥", "1106");
- put("润扬大桥", "2104");
- put("江阴大桥", "6106");
- put("苏通大桥", "9108");
- }};
- @Override
- @Scheduled(fixedDelay = 100) //每隔100秒执行
- public void run() {
- for (CanalConfig.Config config : canalConfig.getConfigs()) {
- // 创建链接
- CanalConnector canalConnector = CanalConnectors
- .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", "");
- long batchId = -1;
- try {
- canalConnector.connect();
- canalConnector.subscribe();
- canalConnector.rollback();
- //每次拉取条数
- int batchSize = 1000;
- Message message = canalConnector.getWithoutAck(batchSize);
- //批次id
- batchId = message.getId();
- List<CanalEntry.Entry> entries = message.getEntries();
- if (batchId != -1 && entries.size() > 0) {
- entries.forEach(entry -> {
- //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型
- if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
- //解析处理
- publishCanalEvent(entry);
- }
- });
- }
- canalConnector.ack(batchId);
- } catch (Exception e) {
- log.info("canal存在异常:{}", e.getMessage());
- e.printStackTrace();
- canalConnector.rollback(batchId);
- } finally {
- // 断开连接
- canalConnector.disconnect();
- }
- }
- }
- private void publishCanalEvent(CanalEntry.Entry entry) {
- //表名
- String tableName = entry.getHeader().getTableName();
- //数据库名
- String database = entry.getHeader().getSchemaName();
- // 操作类型
- CanalEntry.EventType eventType = entry.getHeader().getEventType();
- log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- database, tableName,
- eventType));
- CanalEntry.RowChange rowChange;
- try {
- rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- return;
- }
- rowChange.getRowDatasList().forEach(rowData -> {
- //获取改变前的数据
- List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
- //获取改变后的数据
- List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
- Map<String, Object> beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName));
- Map<String, Object> afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName));
- //插入es
- indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
- });
- }
- Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns, String tableName) {
- Map<String, Object> map = new HashMap<>();
- columns.forEach(column -> {
- if (column == null) {
- return;
- }
- map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
- });
- return map;
- }
- @SneakyThrows
- private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) {
- if (MapUtil.isNotEmpty(afterDataMap)) {
- log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
- }
- if (!StrUtil.equalsAnyIgnoreCase(database, "smoke_api", "ship", "ais_database")) {
- return;
- }
- //不是user表中的数据不处理
- if (!StrUtil.equalsAnyIgnoreCase(table,
- "ship_recognition",
- "ship_snap_address",
- "common_record",
- "monitor_point",
- "device",
- "illegal_ship",
- "sem_instrument")) {
- return;
- }
- String so2ThresholdVal = configService.selectConfigByKey("so2.so2");
- String blackThresholdVal = configService.selectConfigByKey("black.rcgSoot");
- // 根据不同类型处理相应的逻辑
- switch (eventType) {
- case INSERT:
- // 黑烟船舶数据入库
- if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
- afterDataMap.put("mmsi", afterDataMap.get("aisMmsi"));
- afterDataMap.put("illegalType", "heiyan");
- afterDataMap.put("createTime", afterDataMap.get("snapTimeFmt"));
- afterDataMap.put("monitorPointName", afterDataMap.get("snapPos"));
- if (ObjectUtil.isEmpty(afterDataMap.get("mmsi")) /*|| Convert.toStr(afterDataMap.get("mmsi")).length() != 9*/) {
- break;
- }
- // 判断数据状态
- if (ObjectUtil.isNotEmpty(afterDataMap.get("rcgSoot")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("rcgSoot")),
- Convert.toBigDecimal(blackThresholdVal))) {
- // 查询黑烟图片
- if ("prod".equals(env)) {
- String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id")));
- JSONObject response = JSON.parseObject(imgUrl);
- if (200 == response.getInteger("code")) {
- List<String> newImgList = new ArrayList<>();
- JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList");
- String sootImgUrl = response.getJSONObject("data").getString("sootImgUrl");
- for (Object o : imgUrlList) {
- // 只保存细节、全貌开头的图片
- String s = Convert.toStr(o);
- // if (StrUtil.containsAny(s, "全貌", "细节")) {
- newImgList.add("http://" + s);
- // }
- }
- afterDataMap.put("sootImgUrl", StrUtil.isNotBlank(sootImgUrl) ? "http://" + sootImgUrl : "");
- afterDataMap.put("allImgUrl", newImgList);
- }
- }
- // 查询检测点和船舶信息、船舶进出港记录
- ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
- if (eepReportRecInfo != null) {
- afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
- afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
- afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
- }
- // 超过阈值后直接判定为违规船舶
- afterDataMap.put("illegalStatus", 3);
- afterDataMap.put("uploadFlag", 0);
- // TODO 上报行政检查系统
- } else {
- afterDataMap.put("uploadFlag", 0);
- afterDataMap.put("illegalStatus", 1);
- }
- // 查询检测点和船舶信息
- AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
- if (shipInfo != null) {
- afterDataMap.put("shipName", shipInfo.getShipName());
- afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
- }
- // 对于没有从ais系统拿到船名的情况下,直接取rcgShipName
- if (ObjectUtil.isEmpty(afterDataMap.get("shipName"))) {
- afterDataMap.put("shipName", afterDataMap.get("rcgShipName"));
- }
- if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) {
- afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
- afterDataMap.put("deviceId", heiyanDeviceMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
- }
- // 保存数据(黑烟暂时不需要推送到行政检查)
- // saveIllegalData(afterDataMap, ElasticConstants.HEIYAN_SHIP_RECOGNITION);
- client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- } else if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
- // 删除设备和类型的统计缓存
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
- // 检测点信息
- client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
- // 设备信息
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
- // 嗅探系统-站点信息
- client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
- // 嗅探系统-违规船舶
- afterDataMap.put("mmsi", afterDataMap.get("shipMmsi"));
- afterDataMap.put("createTime", afterDataMap.get("peakTime"));
- afterDataMap.put("so2Percent", afterDataMap.get("sPercent"));
- if ("prod".equals(env)) {
- // 查询检测点和船舶信息、船舶进出港记录
- AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
- if (shipInfo != null) {
- afterDataMap.put("shipName", shipInfo.getShipName());
- afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
- }
- }
- if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) {
- // 查询检测点和船舶信息、船舶进出港记录
- ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "",
- "");
- if (eepReportRecInfo != null) {
- afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
- afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
- afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
- }
- // 嫌疑船舶
- afterDataMap.put("illegalStatus", 2);
- afterDataMap.put("uploadFlag", 0);
- // TODO 上报行政检查系统
- } else {
- afterDataMap.put("uploadFlag", 0);
- afterDataMap.put("illegalStatus", 1);
- }
- afterDataMap.put("deviceId", afterDataMap.get("semId"));
- // 查询站点信息
- Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category");
- if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
- Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
- afterDataMap.put("semName", data.get("name"));
- afterDataMap.put("monitorPointName", data.get("category"));
- afterDataMap.put("orgName", orgMap.get(data.get("category")));
- }
- afterDataMap.put("illegalType", "xiutan");
- // 保存数据
- saveIllegalData(afterDataMap, ElasticConstants.AIS_ILLEGAL_SHIP);
- client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- }
- break;
- case UPDATE:
- if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
- if (ObjectUtil.notEqual(beforeDataMap.get("deviceImgUrl"), afterDataMap.get("deviceImgUrl"))
- || ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude"))
- || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude"))
- || ObjectUtil.notEqual(beforeDataMap.get("status"), afterDataMap.get("status"))) {
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- }
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "common_record")) {
- // 船舶记录
- if ("prod".equals(env)) {
- AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
- if (shipInfo != null) {
- afterDataMap.put("shipName", shipInfo.getShipName());
- afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
- }
- }
- // 判断是否大于阈值
- if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) {
- // 查询检测点和船舶信息、船舶进出港记录
- ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
- if (eepReportRecInfo != null) {
- afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
- afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
- afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
- }
- // 嫌疑船舶
- afterDataMap.put("illegalStatus", 2);
- afterDataMap.put("uploadFlag", 0);
- // TODO 上报行政检查系统
- } else {
- afterDataMap.put("uploadFlag", 0);
- afterDataMap.put("illegalStatus", 1);
- }
- Map<String, Object> monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name");
- if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) {
- Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
- afterDataMap.put("monitorPointName", data.get("name"));
- afterDataMap.put("orgName", orgMap.get(data.get("name")));
- }
- afterDataMap.put("illegalType", "guangpu");
- // 保存数据
- saveIllegalData(afterDataMap, ElasticConstants.SO2_ALERT);
- client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
- // 检测点信息
- client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
- // 设备信息
- // 如果状态和经纬度变更才去删除redis
- if (ObjectUtil.notEqual(beforeDataMap.get("online_status"), afterDataMap.get("online_status")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude")) || ObjectUtil.notEqual(beforeDataMap.get(
- "latitude"), afterDataMap.get("latitude"))) {
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- }
- } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
- // 嗅探系统-站点信息
- client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
- if (ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude"))) {
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- }
- }
- break;
- case DELETE:
- if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
- // 检测点信息
- client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(beforeDataMap.get("id")));
- redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
- } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
- // 设备信息
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
- // 嗅探系统-站点信息
- client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(beforeDataMap.get("id")));
- log.info("删除redis设备缓存");
- redisCache.deleteObject(CollUtil.set(false,
- CacheConstants.DEVICE_STATIC,
- CacheConstants.DEVICE_TYPE_STATIC,
- CacheConstants.DEVICE_LIST_STATIC));
- }
- break;
- default:
- break;
- }
- }
- private void saveIllegalData(Map<String, Object> afterDataMap, String indexName) {
- if (Convert.toInt(afterDataMap.get("illegalStatus")) != 1) {
- IllegalShipData illegalShipData = new IllegalShipData();
- BeanUtil.fillBeanWithMap(afterDataMap, illegalShipData, CopyOptions.create().setIgnoreProperties("id"));
- if (StrUtil.isBlank(illegalShipData.getMmsi()) || "0".equals(illegalShipData.getMmsi())) {
- return;
- }
- illegalShipData.setId(IdUtils.fastSimpleUUID());
- illegalShipData.setSystemId(Convert.toStr(afterDataMap.get("id")));
- illegalShipData.setSystemEsIndex(indexName);
- // 查询此船是否已上报过,如果是,则不需要再上报.根据mmsi和uploadFlag=1,illegalStatus=2
- IllegalShipData existBean = new IllegalShipData();
- existBean.setMmsi(illegalShipData.getMmsi());
- existBean.setIllegalStatus(2);
- existBean.setUploadFlag(1);
- List<IllegalShipData> exist = illegalShipDataService.selectIllegalShipDataList(existBean);
- if (exist != null && exist.size() > 0) {
- illegalShipData.setIllegalStatus(4);
- }
- try {
- illegalShipDataService.insertIllegalShipData(illegalShipData);
- } catch (Exception ignored) {
- ignored.printStackTrace();
- }
- }
- }
- /**
- * 或缺数据库字段的大小写
- *
- * @param data
- * @param <V>
- * @return
- */
- private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
- LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap();
- map.putAll(data);
- return map;
- }
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
- }
|