CanalScheduling.java 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. package com.ruoyi.web.job;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.bean.copier.CopyOptions;
  4. import cn.hutool.core.collection.CollUtil;
  5. import cn.hutool.core.convert.Convert;
  6. import cn.hutool.core.date.DateUtil;
  7. import cn.hutool.core.map.MapUtil;
  8. import cn.hutool.core.util.NumberUtil;
  9. import cn.hutool.core.util.ObjectUtil;
  10. import cn.hutool.core.util.StrUtil;
  11. import cn.hutool.http.HttpUtil;
  12. import com.alibaba.fastjson.JSON;
  13. import com.alibaba.fastjson.JSONArray;
  14. import com.alibaba.fastjson.JSONObject;
  15. import com.alibaba.otter.canal.client.CanalConnector;
  16. import com.alibaba.otter.canal.client.CanalConnectors;
  17. import com.alibaba.otter.canal.protocol.CanalEntry;
  18. import com.alibaba.otter.canal.protocol.Message;
  19. import com.google.protobuf.InvalidProtocolBufferException;
  20. import com.ruoyi.common.constant.CacheConstants;
  21. import com.ruoyi.common.constant.ElasticConstants;
  22. import com.ruoyi.common.core.redis.RedisCache;
  23. import com.ruoyi.common.utils.JdbcTypeUtil;
  24. import com.ruoyi.common.utils.uuid.IdUtils;
  25. import com.ruoyi.framework.config.ElasticSearchClient;
  26. import com.ruoyi.system.domain.IllegalShipData;
  27. import com.ruoyi.system.domain.vo.AisShipInfo;
  28. import com.ruoyi.system.domain.vo.ShipEepReportRecInfo;
  29. import com.ruoyi.system.service.IAisInfoService;
  30. import com.ruoyi.system.service.IIllegalShipDataService;
  31. import com.ruoyi.system.service.ISysConfigService;
  32. import com.ruoyi.web.core.config.CanalConfig;
  33. import lombok.SneakyThrows;
  34. import lombok.extern.slf4j.Slf4j;
  35. import org.springframework.beans.BeansException;
  36. import org.springframework.beans.factory.annotation.Autowired;
  37. import org.springframework.beans.factory.annotation.Value;
  38. import org.springframework.context.ApplicationContext;
  39. import org.springframework.context.ApplicationContextAware;
  40. import org.springframework.scheduling.annotation.Scheduled;
  41. import org.springframework.stereotype.Component;
  42. import org.springframework.util.LinkedCaseInsensitiveMap;
  43. import java.net.InetSocketAddress;
  44. import java.util.ArrayList;
  45. import java.util.Date;
  46. import java.util.HashMap;
  47. import java.util.List;
  48. import java.util.Map;
  49. /**
  50. * @Description: TODO
  51. * @Author: huangcheng
  52. * @Date: 2021/8/16
  53. * @Version V1.0
  54. */
  55. @Slf4j
  56. @Component
  57. public class CanalScheduling implements Runnable, ApplicationContextAware {
  58. private ApplicationContext applicationContext;
  59. @Autowired
  60. private ElasticSearchClient client;
  61. @Autowired
  62. private RedisCache redisCache;
  63. @Autowired
  64. private ISysConfigService configService;
  65. @Autowired
  66. private IAisInfoService aisInfoService;
  67. // @Resource
  68. // private CanalConnector canalConnector;
  69. @Value("${black.snapImgUrl}")
  70. private String blackImgUrl;
  71. @Autowired
  72. private IIllegalShipDataService illegalShipDataService;
  73. @Autowired
  74. private CanalConfig canalConfig;
  75. @Value("${spring.profiles.active}")
  76. private String env;
  77. public final static Map<String, String> orgMap = new HashMap<String, String>() {{
  78. put("南京三桥", "南京海事局");
  79. put("南京四桥", "南京海事局");
  80. put("润扬大桥", "镇江海事局");
  81. put("泰州大桥", "泰州海事局");
  82. put("江阴大桥", "江阴海事局");
  83. put("苏通大桥", "常熟海事局");
  84. }};
  85. public final static Map<String, String> heiyanDeviceMap = new HashMap<String, String>() {{
  86. put("南京三桥", "1106");
  87. put("润扬大桥", "2104");
  88. put("江阴大桥", "6106");
  89. put("苏通大桥", "9108");
  90. }};
  91. @Override
  92. @Scheduled(fixedDelay = 100) //每隔100秒执行
  93. public void run() {
  94. for (CanalConfig.Config config : canalConfig.getConfigs()) {
  95. // 创建链接
  96. CanalConnector canalConnector = CanalConnectors
  97. .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", "");
  98. long batchId = -1;
  99. try {
  100. canalConnector.connect();
  101. canalConnector.subscribe();
  102. canalConnector.rollback();
  103. //每次拉取条数
  104. int batchSize = 1000;
  105. Message message = canalConnector.getWithoutAck(batchSize);
  106. //批次id
  107. batchId = message.getId();
  108. List<CanalEntry.Entry> entries = message.getEntries();
  109. if (batchId != -1 && entries.size() > 0) {
  110. entries.forEach(entry -> {
  111. //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型
  112. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
  113. //解析处理
  114. publishCanalEvent(entry);
  115. }
  116. });
  117. }
  118. canalConnector.ack(batchId);
  119. } catch (Exception e) {
  120. log.info("canal存在异常:{}", e.getMessage());
  121. e.printStackTrace();
  122. canalConnector.rollback(batchId);
  123. } finally {
  124. // 断开连接
  125. canalConnector.disconnect();
  126. }
  127. }
  128. }
  129. private void publishCanalEvent(CanalEntry.Entry entry) {
  130. //表名
  131. String tableName = entry.getHeader().getTableName();
  132. //数据库名
  133. String database = entry.getHeader().getSchemaName();
  134. // 操作类型
  135. CanalEntry.EventType eventType = entry.getHeader().getEventType();
  136. log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]",
  137. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  138. database, tableName,
  139. eventType));
  140. CanalEntry.RowChange rowChange;
  141. try {
  142. rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  143. } catch (InvalidProtocolBufferException e) {
  144. e.printStackTrace();
  145. return;
  146. }
  147. rowChange.getRowDatasList().forEach(rowData -> {
  148. //获取改变前的数据
  149. List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
  150. //获取改变后的数据
  151. List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
  152. Map<String, Object> beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName));
  153. Map<String, Object> afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName));
  154. //插入es
  155. indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
  156. });
  157. }
  158. Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns, String tableName) {
  159. Map<String, Object> map = new HashMap<>();
  160. columns.forEach(column -> {
  161. if (column == null) {
  162. return;
  163. }
  164. map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
  165. });
  166. return map;
  167. }
  168. @SneakyThrows
  169. private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) {
  170. if (MapUtil.isNotEmpty(afterDataMap)) {
  171. log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
  172. }
  173. if (!StrUtil.equalsAnyIgnoreCase(database, "smoke_api", "ship", "ais_database")) {
  174. return;
  175. }
  176. //不是user表中的数据不处理
  177. if (!StrUtil.equalsAnyIgnoreCase(table,
  178. "ship_recognition",
  179. "ship_snap_address",
  180. "common_record",
  181. "monitor_point",
  182. "device",
  183. "illegal_ship",
  184. "sem_instrument")) {
  185. return;
  186. }
  187. String so2ThresholdVal = configService.selectConfigByKey("so2.so2");
  188. String blackThresholdVal = configService.selectConfigByKey("black.rcgSoot");
  189. // 根据不同类型处理相应的逻辑
  190. switch (eventType) {
  191. case INSERT:
  192. // 黑烟船舶数据入库
  193. if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  194. afterDataMap.put("mmsi", afterDataMap.get("aisMmsi"));
  195. afterDataMap.put("illegalType", "heiyan");
  196. afterDataMap.put("createTime", afterDataMap.get("snapTimeFmt"));
  197. afterDataMap.put("monitorPointName", afterDataMap.get("snapPos"));
  198. if (ObjectUtil.isEmpty(afterDataMap.get("mmsi")) /*|| Convert.toStr(afterDataMap.get("mmsi")).length() != 9*/) {
  199. break;
  200. }
  201. // 判断数据状态
  202. if (ObjectUtil.isNotEmpty(afterDataMap.get("rcgSoot")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("rcgSoot")),
  203. Convert.toBigDecimal(blackThresholdVal))) {
  204. // 查询黑烟图片
  205. if ("prod".equals(env)) {
  206. String imgUrl = HttpUtil.get(blackImgUrl + Convert.toStr(afterDataMap.get("id")));
  207. JSONObject response = JSON.parseObject(imgUrl);
  208. if (200 == response.getInteger("code")) {
  209. List<String> newImgList = new ArrayList<>();
  210. JSONArray imgUrlList = response.getJSONObject("data").getJSONArray("imgUrlList");
  211. String sootImgUrl = response.getJSONObject("data").getString("sootImgUrl");
  212. for (Object o : imgUrlList) {
  213. // 只保存细节、全貌开头的图片
  214. String s = Convert.toStr(o);
  215. // if (StrUtil.containsAny(s, "全貌", "细节")) {
  216. newImgList.add("http://" + s);
  217. // }
  218. }
  219. afterDataMap.put("sootImgUrl", StrUtil.isNotBlank(sootImgUrl) ? "http://" + sootImgUrl : "");
  220. afterDataMap.put("allImgUrl", newImgList);
  221. }
  222. }
  223. // 查询检测点和船舶信息、船舶进出港记录
  224. ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
  225. if (eepReportRecInfo != null) {
  226. afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
  227. afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
  228. afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
  229. }
  230. // 超过阈值后直接判定为违规船舶
  231. afterDataMap.put("illegalStatus", 3);
  232. afterDataMap.put("uploadFlag", 0);
  233. // TODO 上报行政检查系统
  234. } else {
  235. afterDataMap.put("uploadFlag", 0);
  236. afterDataMap.put("illegalStatus", 1);
  237. }
  238. // 查询检测点和船舶信息
  239. AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
  240. if (shipInfo != null) {
  241. afterDataMap.put("shipName", shipInfo.getShipName());
  242. afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
  243. }
  244. // 对于没有从ais系统拿到船名的情况下,直接取rcgShipName
  245. if (ObjectUtil.isEmpty(afterDataMap.get("shipName"))) {
  246. afterDataMap.put("shipName", afterDataMap.get("rcgShipName"));
  247. }
  248. if (ObjectUtil.isNotEmpty(afterDataMap.get("snapPos"))) {
  249. afterDataMap.put("orgName", orgMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
  250. afterDataMap.put("deviceId", heiyanDeviceMap.get(Convert.toStr(afterDataMap.get("snapPos"))));
  251. }
  252. // 保存数据(黑烟暂时不需要推送到行政检查)
  253. // saveIllegalData(afterDataMap, ElasticConstants.HEIYAN_SHIP_RECOGNITION);
  254. client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  255. } else if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  256. // 删除设备和类型的统计缓存
  257. log.info("删除redis设备缓存");
  258. redisCache.deleteObject(CollUtil.set(false,
  259. CacheConstants.DEVICE_STATIC,
  260. CacheConstants.DEVICE_TYPE_STATIC,
  261. CacheConstants.DEVICE_LIST_STATIC));
  262. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  263. // 检测点信息
  264. client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  265. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  266. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
  267. // 设备信息
  268. log.info("删除redis设备缓存");
  269. redisCache.deleteObject(CollUtil.set(false,
  270. CacheConstants.DEVICE_STATIC,
  271. CacheConstants.DEVICE_TYPE_STATIC,
  272. CacheConstants.DEVICE_LIST_STATIC));
  273. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  274. // 嗅探系统-站点信息
  275. client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  276. log.info("删除redis设备缓存");
  277. redisCache.deleteObject(CollUtil.set(false,
  278. CacheConstants.DEVICE_STATIC,
  279. CacheConstants.DEVICE_TYPE_STATIC,
  280. CacheConstants.DEVICE_LIST_STATIC));
  281. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  282. // 嗅探系统-违规船舶
  283. afterDataMap.put("mmsi", afterDataMap.get("shipMmsi"));
  284. afterDataMap.put("createTime", afterDataMap.get("peakTime"));
  285. afterDataMap.put("so2Percent", afterDataMap.get("sPercent"));
  286. if ("prod".equals(env)) {
  287. // 查询检测点和船舶信息、船舶进出港记录
  288. AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
  289. if (shipInfo != null) {
  290. afterDataMap.put("shipName", shipInfo.getShipName());
  291. afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
  292. }
  293. }
  294. if (ObjectUtil.isNotEmpty(afterDataMap.get("sPercent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("sPercent")), Convert.toBigDecimal(so2ThresholdVal))) {
  295. // 查询检测点和船舶信息、船舶进出港记录
  296. ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "",
  297. "");
  298. if (eepReportRecInfo != null) {
  299. afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
  300. afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
  301. afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
  302. }
  303. // 嫌疑船舶
  304. afterDataMap.put("illegalStatus", 2);
  305. afterDataMap.put("uploadFlag", 0);
  306. // TODO 上报行政检查系统
  307. } else {
  308. afterDataMap.put("uploadFlag", 0);
  309. afterDataMap.put("illegalStatus", 1);
  310. }
  311. afterDataMap.put("deviceId", afterDataMap.get("semId"));
  312. // 查询站点信息
  313. Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category");
  314. if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
  315. Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
  316. afterDataMap.put("semName", data.get("name"));
  317. afterDataMap.put("monitorPointName", data.get("category"));
  318. afterDataMap.put("orgName", orgMap.get(data.get("category")));
  319. }
  320. afterDataMap.put("illegalType", "xiutan");
  321. // 保存数据
  322. saveIllegalData(afterDataMap, ElasticConstants.AIS_ILLEGAL_SHIP);
  323. client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  324. }
  325. break;
  326. case UPDATE:
  327. if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  328. if (ObjectUtil.notEqual(beforeDataMap.get("deviceImgUrl"), afterDataMap.get("deviceImgUrl"))
  329. || ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude"))
  330. || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude"))
  331. || ObjectUtil.notEqual(beforeDataMap.get("status"), afterDataMap.get("status"))) {
  332. log.info("删除redis设备缓存");
  333. redisCache.deleteObject(CollUtil.set(false,
  334. CacheConstants.DEVICE_STATIC,
  335. CacheConstants.DEVICE_TYPE_STATIC,
  336. CacheConstants.DEVICE_LIST_STATIC));
  337. }
  338. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "common_record")) {
  339. // 船舶记录
  340. if ("prod".equals(env)) {
  341. AisShipInfo shipInfo = aisInfoService.getShipInfo("", Convert.toStr(afterDataMap.get("mmsi")), "", "");
  342. if (shipInfo != null) {
  343. afterDataMap.put("shipName", shipInfo.getShipName());
  344. afterDataMap.put("shipRegionType", shipInfo.getShipRegionType());
  345. }
  346. }
  347. // 判断是否大于阈值
  348. if (ObjectUtil.isNotEmpty(afterDataMap.get("so2Percent")) && NumberUtil.isGreaterOrEqual(Convert.toBigDecimal(afterDataMap.get("so2Percent")), Convert.toBigDecimal(so2ThresholdVal))) {
  349. // 查询检测点和船舶信息、船舶进出港记录
  350. ShipEepReportRecInfo eepReportRecInfo = aisInfoService.getShipEepReportRecInfo(DateUtil.formatDate(DateUtil.offsetDay(new Date(), -20)), DateUtil.formatDate(DateUtil.tomorrow()), Convert.toStr(afterDataMap.get("mmsi")), "", "");
  351. if (eepReportRecInfo != null) {
  352. afterDataMap.put("destination", eepReportRecInfo.getNextPortName());
  353. afterDataMap.put("berthName", eepReportRecInfo.getBerthName());
  354. afterDataMap.put("expectTime", eepReportRecInfo.getExpectTime());
  355. }
  356. // 嫌疑船舶
  357. afterDataMap.put("illegalStatus", 2);
  358. afterDataMap.put("uploadFlag", 0);
  359. // TODO 上报行政检查系统
  360. } else {
  361. afterDataMap.put("uploadFlag", 0);
  362. afterDataMap.put("illegalStatus", 1);
  363. }
  364. Map<String, Object> monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name");
  365. if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) {
  366. Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
  367. afterDataMap.put("monitorPointName", data.get("name"));
  368. afterDataMap.put("orgName", orgMap.get(data.get("name")));
  369. }
  370. afterDataMap.put("illegalType", "guangpu");
  371. // 保存数据
  372. saveIllegalData(afterDataMap, ElasticConstants.SO2_ALERT);
  373. client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  374. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  375. // 检测点信息
  376. client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  377. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  378. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
  379. // 设备信息
  380. // 如果状态和经纬度变更才去删除redis
  381. if (ObjectUtil.notEqual(beforeDataMap.get("online_status"), afterDataMap.get("online_status")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude")) || ObjectUtil.notEqual(beforeDataMap.get(
  382. "latitude"), afterDataMap.get("latitude"))) {
  383. log.info("删除redis设备缓存");
  384. redisCache.deleteObject(CollUtil.set(false,
  385. CacheConstants.DEVICE_STATIC,
  386. CacheConstants.DEVICE_TYPE_STATIC,
  387. CacheConstants.DEVICE_LIST_STATIC));
  388. }
  389. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  390. // 嗅探系统-站点信息
  391. client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  392. if (ObjectUtil.notEqual(beforeDataMap.get("latitude"), afterDataMap.get("latitude")) || ObjectUtil.notEqual(beforeDataMap.get("longitude"), afterDataMap.get("longitude"))) {
  393. log.info("删除redis设备缓存");
  394. redisCache.deleteObject(CollUtil.set(false,
  395. CacheConstants.DEVICE_STATIC,
  396. CacheConstants.DEVICE_TYPE_STATIC,
  397. CacheConstants.DEVICE_LIST_STATIC));
  398. }
  399. }
  400. break;
  401. case DELETE:
  402. if (StrUtil.equalsIgnoreCase(database, "smoke_api") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  403. log.info("删除redis设备缓存");
  404. redisCache.deleteObject(CollUtil.set(false,
  405. CacheConstants.DEVICE_STATIC,
  406. CacheConstants.DEVICE_TYPE_STATIC,
  407. CacheConstants.DEVICE_LIST_STATIC));
  408. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  409. // 检测点信息
  410. client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(beforeDataMap.get("id")));
  411. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  412. } else if (StrUtil.equalsIgnoreCase(database, "ship") && StrUtil.equalsIgnoreCase(table, "device")) {
  413. // 设备信息
  414. log.info("删除redis设备缓存");
  415. redisCache.deleteObject(CollUtil.set(false,
  416. CacheConstants.DEVICE_STATIC,
  417. CacheConstants.DEVICE_TYPE_STATIC,
  418. CacheConstants.DEVICE_LIST_STATIC));
  419. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  420. // 嗅探系统-站点信息
  421. client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(beforeDataMap.get("id")));
  422. log.info("删除redis设备缓存");
  423. redisCache.deleteObject(CollUtil.set(false,
  424. CacheConstants.DEVICE_STATIC,
  425. CacheConstants.DEVICE_TYPE_STATIC,
  426. CacheConstants.DEVICE_LIST_STATIC));
  427. }
  428. break;
  429. default:
  430. break;
  431. }
  432. }
  433. private void saveIllegalData(Map<String, Object> afterDataMap, String indexName) {
  434. if (Convert.toInt(afterDataMap.get("illegalStatus")) != 1) {
  435. IllegalShipData illegalShipData = new IllegalShipData();
  436. BeanUtil.fillBeanWithMap(afterDataMap, illegalShipData, CopyOptions.create().setIgnoreProperties("id"));
  437. if (StrUtil.isBlank(illegalShipData.getMmsi()) || "0".equals(illegalShipData.getMmsi())) {
  438. return;
  439. }
  440. illegalShipData.setId(IdUtils.fastSimpleUUID());
  441. illegalShipData.setSystemId(Convert.toStr(afterDataMap.get("id")));
  442. illegalShipData.setSystemEsIndex(indexName);
  443. // 查询此船是否已上报过,如果是,则不需要再上报.根据mmsi和uploadFlag=1,illegalStatus=2
  444. IllegalShipData existBean = new IllegalShipData();
  445. existBean.setMmsi(illegalShipData.getMmsi());
  446. existBean.setIllegalStatus(2);
  447. existBean.setUploadFlag(1);
  448. List<IllegalShipData> exist = illegalShipDataService.selectIllegalShipDataList(existBean);
  449. if (exist != null && exist.size() > 0) {
  450. illegalShipData.setIllegalStatus(4);
  451. }
  452. try {
  453. illegalShipDataService.insertIllegalShipData(illegalShipData);
  454. } catch (Exception ignored) {
  455. ignored.printStackTrace();
  456. }
  457. }
  458. }
  459. /**
  460. * 或缺数据库字段的大小写
  461. *
  462. * @param data
  463. * @param <V>
  464. * @return
  465. */
  466. private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
  467. LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap();
  468. map.putAll(data);
  469. return map;
  470. }
  471. @Override
  472. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  473. this.applicationContext = applicationContext;
  474. }
  475. }