CanalScheduling.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package com.ruoyi.web.job;
  2. import cn.hutool.core.convert.Convert;
  3. import cn.hutool.core.util.ObjectUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import com.alibaba.otter.canal.client.CanalConnector;
  6. import com.alibaba.otter.canal.client.CanalConnectors;
  7. import com.alibaba.otter.canal.protocol.CanalEntry;
  8. import com.alibaba.otter.canal.protocol.Message;
  9. import com.google.protobuf.InvalidProtocolBufferException;
  10. import com.ruoyi.common.constant.CacheConstants;
  11. import com.ruoyi.common.constant.ElasticConstants;
  12. import com.ruoyi.common.core.redis.RedisCache;
  13. import com.ruoyi.common.utils.JdbcTypeUtil;
  14. import com.ruoyi.framework.config.ElasticSearchClient;
  15. import com.ruoyi.web.core.config.CanalConfig;
  16. import lombok.SneakyThrows;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.springframework.beans.BeansException;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.context.ApplicationContext;
  21. import org.springframework.context.ApplicationContextAware;
  22. import org.springframework.scheduling.annotation.Scheduled;
  23. import org.springframework.stereotype.Component;
  24. import org.springframework.util.LinkedCaseInsensitiveMap;
  25. import java.net.InetSocketAddress;
  26. import java.util.HashMap;
  27. import java.util.List;
  28. import java.util.Map;
  29. /**
  30. * @Description: TODO
  31. * @Author: huangcheng
  32. * @Date: 2021/8/16
  33. * @Version V1.0
  34. */
  35. @Slf4j
  36. @Component
  37. public class CanalScheduling implements Runnable, ApplicationContextAware {
  38. private ApplicationContext applicationContext;
  39. @Autowired
  40. private ElasticSearchClient client;
  41. @Autowired
  42. private RedisCache redisCache;
  43. // @Resource
  44. // private CanalConnector canalConnector;
  45. @Autowired
  46. private CanalConfig canalConfig;
  47. @Override
  48. @Scheduled(fixedDelay = 100) //每隔100秒执行
  49. public void run() {
  50. for (CanalConfig.Config config : canalConfig.getConfigs()) {
  51. // 创建链接
  52. CanalConnector canalConnector = CanalConnectors
  53. .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", "");
  54. long batchId = -1;
  55. try {
  56. canalConnector.connect();
  57. canalConnector.subscribe();
  58. canalConnector.rollback();
  59. //每次拉取条数
  60. int batchSize = 1000;
  61. Message message = canalConnector.getWithoutAck(batchSize);
  62. //批次id
  63. batchId = message.getId();
  64. List<CanalEntry.Entry> entries = message.getEntries();
  65. if (batchId != -1 && entries.size() > 0) {
  66. entries.forEach(entry -> {
  67. //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型
  68. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
  69. //解析处理
  70. publishCanalEvent(entry);
  71. }
  72. });
  73. }
  74. canalConnector.ack(batchId);
  75. } catch (Exception e) {
  76. log.info("canal存在异常:{}", e.getMessage());
  77. e.printStackTrace();
  78. canalConnector.rollback(batchId);
  79. } finally {
  80. // 断开连接
  81. canalConnector.disconnect();
  82. }
  83. }
  84. }
  85. private void publishCanalEvent(CanalEntry.Entry entry) {
  86. //表名
  87. String tableName = entry.getHeader().getTableName();
  88. //数据库名
  89. String database = entry.getHeader().getSchemaName();
  90. // 操作类型
  91. CanalEntry.EventType eventType = entry.getHeader().getEventType();
  92. log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]",
  93. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  94. database, tableName,
  95. eventType));
  96. CanalEntry.RowChange rowChange;
  97. try {
  98. rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  99. } catch (InvalidProtocolBufferException e) {
  100. e.printStackTrace();
  101. return;
  102. }
  103. rowChange.getRowDatasList().forEach(rowData -> {
  104. //获取改变前的数据
  105. List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
  106. //获取改变后的数据
  107. List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
  108. Map<String, Object> beforeColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(beforeColumnsList, tableName));
  109. Map<String, Object> afterColumnsToMap = toCaseInsensitiveMap(parseColumnsToMap(afterColumnsList, tableName));
  110. //插入es
  111. indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
  112. });
  113. }
  114. Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns, String tableName) {
  115. Map<String, Object> map = new HashMap<>();
  116. columns.forEach(column -> {
  117. if (column == null) {
  118. return;
  119. }
  120. map.put(StrUtil.lowerFirst(StrUtil.toCamelCase(column.getName())), JdbcTypeUtil.typeConvert(tableName, column.getName(), column.getValue(), column.getSqlType(), column.getMysqlType()));
  121. });
  122. return map;
  123. }
  124. @SneakyThrows
  125. private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) {
  126. log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
  127. if (!StrUtil.equalsAnyIgnoreCase(database, "heiyan", "so2", "ais_database")) {
  128. return;
  129. }
  130. //不是user表中的数据不处理
  131. if (!StrUtil.equalsAnyIgnoreCase(table, "ship_recognition",
  132. "alert",
  133. "monitor_point",
  134. "device",
  135. "ship_static_info",
  136. "illegal_ship",
  137. "sem_instrument",
  138. "sem_instrument_test")) {
  139. return;
  140. }
  141. // 根据不同类型处理相应的逻辑
  142. switch (eventType) {
  143. case INSERT:
  144. // 黑烟船舶数据入库
  145. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  146. client.createDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  147. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  148. // 检测点信息
  149. client.createDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  150. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  151. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  152. // 设备信息
  153. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  154. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  155. // 船舶信息
  156. if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
  157. client.createDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
  158. }
  159. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  160. // 违规船舶
  161. // 查询检测点和船舶信息
  162. if (ObjectUtil.isNotEmpty(afterDataMap.get("mmsi"))) {
  163. Map<String, Object> mmsi = client.getDocById(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), "name,destination");
  164. if (mmsi != null && ObjectUtil.equal(mmsi.get("code"), 200)) {
  165. Map<String, String> data = Convert.toMap(String.class, String.class, mmsi.get("data"));
  166. afterDataMap.put("shipName", data.get("name"));
  167. afterDataMap.put("destination", data.get("destination"));
  168. }
  169. }
  170. Map<String, Object> monitor = client.getDocById(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("monitorPointId")), "name");
  171. if (monitor != null && ObjectUtil.equal(monitor.get("code"), 200)) {
  172. Map<String, String> data = Convert.toMap(String.class, String.class, monitor.get("data"));
  173. afterDataMap.put("monitorPointName", data.get("name"));
  174. }
  175. client.createDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  176. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  177. // 嗅探系统-站点信息
  178. client.createDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  179. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  180. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  181. // 嗅探系统-违规船舶
  182. // 查询站点信息
  183. Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name");
  184. if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
  185. Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
  186. afterDataMap.put("semName", data.get("name"));
  187. }
  188. // 初始化设为未上传状态
  189. afterDataMap.put("uploadFlag", 0);
  190. client.createDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  191. }
  192. break;
  193. case UPDATE:
  194. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  195. client.updateDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  196. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  197. // 检测点信息
  198. client.updateDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  199. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  200. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  201. // 设备信息
  202. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  203. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  204. // 船舶信息
  205. client.updateDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")), afterDataMap);
  206. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  207. // 违规船舶
  208. client.updateDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  209. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  210. // 嗅探系统-站点信息
  211. client.updateDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  212. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  213. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  214. // 嗅探系统-违规船舶
  215. // 查询站点信息
  216. Map<String, Object> sem = client.getDocById(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("semId")), "name");
  217. if (sem != null && ObjectUtil.equal(sem.get("code"), 200)) {
  218. Map<String, String> data = Convert.toMap(String.class, String.class, sem.get("data"));
  219. afterDataMap.put("semName", data.get("name"));
  220. }
  221. client.updateDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")), afterDataMap);
  222. }
  223. break;
  224. case DELETE:
  225. if (StrUtil.equalsIgnoreCase(database, "heiyan") && StrUtil.equalsIgnoreCase(table, "ship_recognition")) {
  226. client.deleteDocument(ElasticConstants.HEIYAN_SHIP_RECOGNITION, Convert.toStr(afterDataMap.get("id")));
  227. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "monitor_point")) {
  228. // 检测点信息
  229. client.deleteDocument(ElasticConstants.SO2_MONITOR_POINT, Convert.toStr(afterDataMap.get("id")));
  230. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  231. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "device")) {
  232. // 设备信息
  233. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  234. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "ship_static_info")) {
  235. // 船舶信息
  236. client.deleteDocument(ElasticConstants.SO2_SHIP_STATIC_INFO, Convert.toStr(afterDataMap.get("mmsi")));
  237. } else if (StrUtil.equalsIgnoreCase(database, "so2") && StrUtil.equalsIgnoreCase(table, "alert")) {
  238. // 违规船舶
  239. client.deleteDocument(ElasticConstants.SO2_ALERT, Convert.toStr(afterDataMap.get("id")));
  240. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && (StrUtil.equalsIgnoreCase(table, "sem_instrument_test") || StrUtil.equalsIgnoreCase(table, "sem_instrument"))) {
  241. // 嗅探系统-站点信息
  242. client.deleteDocument(ElasticConstants.AIS_SEM_INSTRUMENT, Convert.toStr(afterDataMap.get("id")));
  243. redisCache.deleteObject(CacheConstants.SO2_DEVICE_STATIC);
  244. } else if (StrUtil.equalsIgnoreCase(database, "ais_database") && StrUtil.equalsIgnoreCase(table, "illegal_ship")) {
  245. // 嗅探系统-违规船舶
  246. client.deleteDocument(ElasticConstants.AIS_ILLEGAL_SHIP, Convert.toStr(afterDataMap.get("id")));
  247. }
  248. break;
  249. default:
  250. break;
  251. }
  252. }
  253. /**
  254. * 或缺数据库字段的大小写
  255. *
  256. * @param data
  257. * @param <V>
  258. * @return
  259. */
  260. private static <V> LinkedCaseInsensitiveMap<V> toCaseInsensitiveMap(Map<String, V> data) {
  261. LinkedCaseInsensitiveMap map = new LinkedCaseInsensitiveMap();
  262. map.putAll(data);
  263. return map;
  264. }
  265. @Override
  266. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  267. this.applicationContext = applicationContext;
  268. }
  269. }