// CanalScheduling.java
  1. package com.ruoyi.web.job;
  2. import cn.hutool.core.collection.CollUtil;
  3. import cn.hutool.core.convert.Convert;
  4. import cn.hutool.core.util.ObjectUtil;
  5. import cn.hutool.core.util.StrUtil;
  6. import com.alibaba.otter.canal.client.CanalConnector;
  7. import com.alibaba.otter.canal.client.CanalConnectors;
  8. import com.alibaba.otter.canal.protocol.CanalEntry;
  9. import com.alibaba.otter.canal.protocol.Message;
  10. import com.google.protobuf.InvalidProtocolBufferException;
  11. import com.ruoyi.common.constant.CacheConstants;
  12. import com.ruoyi.common.constant.ElasticConstants;
  13. import com.ruoyi.common.core.redis.RedisCache;
  14. import com.ruoyi.common.utils.JdbcTypeUtil;
  15. import com.ruoyi.framework.config.ElasticSearchClient;
  16. import com.ruoyi.web.core.config.CanalConfig;
  17. import lombok.SneakyThrows;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.beans.BeansException;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.context.ApplicationContext;
  22. import org.springframework.context.ApplicationContextAware;
  23. import org.springframework.scheduling.annotation.Scheduled;
  24. import org.springframework.stereotype.Component;
  25. import org.springframework.util.LinkedCaseInsensitiveMap;
  26. import java.net.InetSocketAddress;
  27. import java.util.HashMap;
  28. import java.util.List;
  29. import java.util.Map;
  30. /**
  31. * @Description: TODO
  32. * @Author: huangcheng
  33. * @Date: 2021/8/16
  34. * @Version V1.0
  35. */
  36. @Slf4j
  37. @Component
  38. public class CanalScheduling implements Runnable, ApplicationContextAware {
  39. private ApplicationContext applicationContext;
  40. @Autowired
  41. private ElasticSearchClient client;
  42. @Autowired
  43. private RedisCache redisCache;
  44. // @Resource
  45. // private CanalConnector canalConnector;
  46. @Autowired
  47. private CanalConfig canalConfig;
  48. @Override
  49. @Scheduled(fixedDelay = 100) //每隔100秒执行
  50. public void run() {
  51. for (CanalConfig.Config config : canalConfig.getConfigs()) {
  52. // 创建链接
  53. CanalConnector canalConnector = CanalConnectors
  54. .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", "");
  55. long batchId = -1;
  56. try {
  57. canalConnector.connect();
  58. canalConnector.subscribe();
  59. canalConnector.rollback();
  60. //每次拉取条数
  61. int batchSize = 1000;
  62. Message message = canalConnector.getWithoutAck(batchSize);
  63. //批次id
  64. batchId = message.getId();
  65. List<CanalEntry.Entry> entries = message.getEntries();
  66. if (batchId != -1 && entries.size() > 0) {
  67. entries.forEach(entry -> {
  68. //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型
  69. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
  70. //解析处理
  71. publishCanalEvent(entry);
  72. }
  73. });
  74. }
  75. canalConnector.ack(batchId);
  76. } catch (Exception e) {
  77. log.info("canal存在异常:{}", e.getMessage());
  78. e.printStackTrace();
  79. canalConnector.rollback(batchId);
  80. } finally {
  81. // 断开连接
  82. canalConnector.disconnect();
  83. }
  84. }
  85. }
  86. private void publishCanalEvent(CanalEntry.Entry entry) {
  87. //表名
  88. String tableName = entry.getHeader().getTableName();
  89. //数据库名
  90. String database = entry.getHeader().getSchemaName();
  91. // 操作类型
  92. CanalEntry.EventType eventType = entry.getHeader().getEventType();
  93. log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]",
  94. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  95. database, tableName,
  96. eventType));
  97. CanalEntry.RowChange rowChange;
  98. try {
  99. rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  100. } catch (InvalidProtocolBufferException e) {
  101. e.printStackTrace();
  102. return;
  103. }
  104. rowChange.getRowDatasList().forEach(rowData -> {
  105. //获取改变前的数据
  106. List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
  107. //获取改变后的数据
  108. List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
  109. Map<String, Object> beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName));
  110. Map<String, Object> afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName));
  111. //插入es
  112. indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
  113. });
  114. }
  115. Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns, String tableName) {
  116. Map<String, Object> map = new HashMap<>();
  117. columns.forEach(column -> {
  118. if (column == null) {
  119. return;
  120. }
  121. map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
  122. });
  123. return map;
  124. }
  125. @SneakyThrows
  126. private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) {
  127. log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
  128. if (!StrUtil.equalsAnyIgnoreCase(database, "heiyan", "so2", "ais_database")) {
  129. return;
  130. }
  131. //不是user表中的数据不处理
  132. if (!StrUtil.equalsAnyIgnoreCase(table,
  133. "ship_recognition",
  134. "ship_snap_address",
  135. "alert",
  136. "monitor_point",
  137. "device",
  138. "ship_static_info",
  139. "illegal_ship",
  140. "sem_instrument",
  141. "sem_instrument_test")) {
  142. return;
  143. }
  144. // 根据不同类型处理相应的逻辑
  145. switch (eventType) {
  146. case INSERT:
  147. // 黑烟船舶数据入库
  148. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  149. client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  150. } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  151. redisCache.deleteObject(CollUtil.set(false,
  152. CacheConstants.DEVICE_STATIC,
  153. CacheConstants.DEVICE_TYPE_STATIC));
  154. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  155. // 检测点信息
  156. client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  157. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  158. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  159. // 设备信息
  160. redisCache.deleteObject(CollUtil.set(false,
  161. CacheConstants.DEVICE_STATIC,
  162. CacheConstants.DEVICE_TYPE_STATIC));
  163. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  164. // 船舶信息
  165. if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
  166. client.createDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
  167. }
  168. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  169. // 违规船舶
  170. // 查询检测点和船舶信息
  171. if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
  172. Map<String, Object> mmsi = client.getDocById(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), "name,destination");
  173. if (mmsi != null && ObjectUtil.equal(mmsi.get("code"), 200)) {
  174. Map<String, String> data = Convert.toMap(String.class, String.class, mmsi.get("data"));
  175. afterDataMap.put("shipName", data.get("name"));
  176. afterDataMap.put("destination", data.get("destination"));
  177. }
  178. }
  179. Map<String, Object> monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name");
  180. if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) {
  181. Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
  182. afterDataMap.put("monitorPointName", data.get("name"));
  183. }
  184. client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  185. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  186. // 嗅探系统-站点信息
  187. client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  188. redisCache.deleteObject(CollUtil.set(false,
  189. CacheConstants.DEVICE_STATIC,
  190. CacheConstants.DEVICE_TYPE_STATIC));
  191. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  192. // 嗅探系统-违规船舶
  193. // 查询站点信息
  194. Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category");
  195. if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
  196. Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
  197. afterDataMap.put("semName", data.get("name"));
  198. afterDataMap.put("monitorPointName", data.get("category"));
  199. }
  200. // 初始化设为未上传状态
  201. afterDataMap.put("uploadFlag", 0);
  202. client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  203. }
  204. break;
  205. case UPDATE:
  206. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  207. client.updateDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  208. } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  209. redisCache.deleteObject(CollUtil.set(false,
  210. CacheConstants.DEVICE_STATIC,
  211. CacheConstants.DEVICE_TYPE_STATIC));
  212. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  213. // 检测点信息
  214. client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  215. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  216. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  217. // 设备信息
  218. redisCache.deleteObject(CollUtil.set(false,
  219. CacheConstants.DEVICE_STATIC,
  220. CacheConstants.DEVICE_TYPE_STATIC));
  221. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  222. // 船舶信息
  223. client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
  224. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  225. // 违规船舶
  226. client.updateDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  227. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  228. // 嗅探系统-站点信息
  229. client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  230. redisCache.deleteObject(CollUtil.set(false,
  231. CacheConstants.DEVICE_STATIC,
  232. CacheConstants.DEVICE_TYPE_STATIC));
  233. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  234. // 嗅探系统-违规船舶
  235. // 查询站点信息
  236. Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name,category");
  237. if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
  238. Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
  239. afterDataMap.put("semName", data.get("name"));
  240. afterDataMap.put("monitorPointName", data.get("category"));
  241. }
  242. client.updateDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  243. }
  244. break;
  245. case DELETE:
  246. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  247. client.deleteDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")));
  248. } else if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_snap_address")) {
  249. redisCache.deleteObject(CollUtil.set(false,
  250. CacheConstants.DEVICE_STATIC,
  251. CacheConstants.DEVICE_TYPE_STATIC));
  252. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  253. // 检测点信息
  254. client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")));
  255. redisCache.deleteObject(CacheConstants.DEVICE_STATIC);
  256. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  257. // 设备信息
  258. redisCache.deleteObject(CollUtil.set(false,
  259. CacheConstants.DEVICE_STATIC,
  260. CacheConstants.DEVICE_TYPE_STATIC));
  261. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  262. // 船舶信息
  263. client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")));
  264. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  265. // 违规船舶
  266. client.deleteDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")));
  267. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  268. // 嗅探系统-站点信息
  269. client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")));
  270. redisCache.deleteObject(CollUtil.set(false,
  271. CacheConstants.DEVICE_STATIC,
  272. CacheConstants.DEVICE_TYPE_STATIC));
  273. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  274. // 嗅探系统-违规船舶
  275. client.deleteDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")));
  276. }
  277. break;
  278. default:
  279. break;
  280. }
  281. }
  282. /**
  283. * 或缺数据库字段的大小写
  284. *
  285. * @param data
  286. * @param <V>
  287. * @return
  288. */
  289. private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
  290. LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap();
  291. map.putAll(data);
  292. return map;
  293. }
  294. @Override
  295. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  296. this.applicationContext = applicationContext;
  297. }
  298. }