Эх сурвалжийг харах

使用canal client来监控binlog的数据变化

459242451@qq.com 4 жил өмнө
parent
commit
13b3f59ac4

+ 58 - 1
README.md

@@ -3,5 +3,62 @@
 ### 四个数据库
 二氧化硫、黑烟、光谱、基础库
 ```
-光谱分析:地址:47.92.161.189  端口:3306 账号 js_test 密码 js123456 
+光谱分析:地址:47.92.161.189  端口:3306 账号 js_test 密码 js123456
+本地测试库:地址:200.200.19.126:3306 root/root 
+```
+
+> ElasticSearch
+```
+内网测试环境
+http://200.200.19.122:5601/app/dev_tools#/console
+http://200.200.19.122:9200
+
+外网测试环境
+http://212.129.138.23:9100
+http://212.129.138.23:9200
+```
+
+> canal
+```
+启动流程:
+1.cd /Users/huangcheng/workspace/canal/canal.admin-1.1.5/bin 
+    sh startup.sh
+2.cd /Users/huangcheng/workspace/canal/canal.deployer-1.1.5/bin
+    sh startup.sh
+3.cd /Users/huangcheng/workspace/canal/canal.adapter-1.1.5/bin
+    sh startup.sh
+canal admin管理后台 http://127.0.0.1:8089   admin/123456
+
+```
+```
+Entry=====> RowChange=====> Column
+
+Entry
+    Header
+        logfileName [binlog文件名]
+        logfileOffset [binlog position]
+        executeTime [发生的变更]
+        schemaName 
+        tableName
+        eventType [insert/update/delete类型]
+    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]
+    storeValue  [byte数据,可展开,对应的类型为RowChange]    
+ 
+ 
+RowChange
+    isDdl       [是否是ddl变更操作,比如create table/drop table]
+    sql     [具体的ddl sql]
+    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
+        beforeColumns [Column类型的数组]
+        afterColumns [Column类型的数组]  
+ 
+    
+Column 
+    index       
+    sqlType     [jdbc type]
+    name        [column name]
+    isKey       [是否为主键]
+    updated     [是否发生过变更]
+    isNull      [值是否为null]
+
 ```

+ 2 - 0
ruoyi-admin/src/main/java/com/ruoyi/RuoYiApplication.java

@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 /**
  * 启动程序
@@ -12,6 +13,7 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
  */
 @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
 @Slf4j
+@EnableScheduling
 public class RuoYiApplication
 {
     public static void main(String[] args)

+ 49 - 0
ruoyi-admin/src/main/java/com/ruoyi/web/core/config/CanalClient.java

@@ -0,0 +1,49 @@
+package com.ruoyi.web.core.config;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @Description: canal配置类
+ * @Author: huangcheng
+ * @Date: 2021/8/16
+ * @Version V1.0
+ */
+@Component
+@Slf4j
+public class CanalClient implements DisposableBean {
+
+    private CanalConnector canalConnector;
+
+    @Bean
+    public CanalConnector getCanalConnector() {
+        canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
+                11111), "heiyan", "", "");
+        canalConnector.connect();
+        //指定filter,格式{database}.{table}
+        canalConnector.subscribe();
+        //回滚寻找上次中断的为止
+        canalConnector.rollback();
+        return canalConnector;
+    }
+
+    /**
+     * 在spring容器销毁的时候,需要断开canal客户端的连接
+     * 防止canal连接的泄露
+     *
+     * @throws Exception
+     */
+    @Override
+    public void destroy() throws Exception {
+        if (canalConnector != null) {
+            log.info("断开canal客户端的连接");
+            canalConnector.disconnect();
+        }
+    }
+}

+ 27 - 0
ruoyi-admin/src/main/java/com/ruoyi/web/core/config/CanalConfig.java

@@ -0,0 +1,27 @@
+package com.ruoyi.web.core.config;
+
+import com.ruoyi.web.job.CanalScheduling;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+
+/**
+ * @Description: TODO
+ * @Author: huangcheng
+ * @Date: 2021/8/17
+ * @Version V1.0
+ */
+@Configuration
+@ConfigurationProperties("canal")
+@Data
+public class CanalConfig {
+    private List<Config> configs;
+    @Data
+    public static class Config {
+        private String hostname;
+        private int port;
+        private String destination;
+    }
+}

+ 168 - 0
ruoyi-admin/src/main/java/com/ruoyi/web/job/CanalScheduling.java

@@ -0,0 +1,168 @@
+package com.ruoyi.web.job;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.ruoyi.framework.config.ElasticSearchClient;
+import com.ruoyi.web.core.config.CanalConfig;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @Description: TODO
+ * @Author: huangcheng
+ * @Date: 2021/8/16
+ * @Version V1.0
+ */
+@Slf4j
+@Component
+public class CanalScheduling implements Runnable, ApplicationContextAware {
+
+    private ApplicationContext applicationContext;
+    @Autowired
+    private ElasticSearchClient client;
+//    @Resource
+//    private CanalConnector canalConnector;
+
+    @Autowired
+    private CanalConfig canalConfig;
+
+    @Override
+    @Scheduled(cron = "0/1 * * * * ?", fixedDelay = 100) //每隔100秒执行
+    public void run() {
+        for (CanalConfig.Config config : canalConfig.getConfigs()) {
+            // 创建链接
+            CanalConnector canalConnector = CanalConnectors
+                    .newSingleConnector(new InetSocketAddress(config.getHostname(), config.getPort()), config.getDestination(), "", "");
+            long batchId = -1;
+            try {
+                canalConnector.connect();
+                canalConnector.subscribe();
+                canalConnector.rollback();
+
+                //每次拉取条数
+                int batchSize = 1000;
+                Message message = canalConnector.getWithoutAck(batchSize);
+                //批次id
+                batchId = message.getId();
+                List<CanalEntry.Entry> entries = message.getEntries();
+                if (batchId != -1 && entries.size() > 0) {
+                    entries.forEach(entry -> {
+                        //MySQL种my.cnf中配置的是binlog_format = ROW,这里只解析ROW类型
+                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
+                            //解析处理
+                            publishCanalEvent(entry);
+                        }
+                    });
+                }
+                canalConnector.ack(batchId);
+            } catch (Exception e) {
+                log.info("canal存在异常:{}", e.getMessage());
+                e.printStackTrace();
+                canalConnector.rollback(batchId);
+            } finally {
+                // 断开连接
+                canalConnector.disconnect();
+            }
+        }
+    }
+
+    private void publishCanalEvent(CanalEntry.Entry entry) {
+        //表名
+        String tableName = entry.getHeader().getTableName();
+        //数据库名
+        String database = entry.getHeader().getSchemaName();
+        // 操作类型
+        CanalEntry.EventType eventType = entry.getHeader().getEventType();
+
+        log.info(String.format("========> binlog[%s:%s] , name[%s,%s] , eventType[%s]",
+                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
+                database, tableName,
+                eventType));
+
+        CanalEntry.RowChange rowChange;
+        try {
+            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+        } catch (InvalidProtocolBufferException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        rowChange.getRowDatasList().forEach(rowData -> {
+            //获取改变前的数据
+            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
+            //获取改变后的数据
+            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
+            Map<String, Object> beforeColumnsToMap = parseColumnsToMap(beforeColumnsList);
+            Map<String, Object> afterColumnsToMap = parseColumnsToMap(afterColumnsList);
+            try {
+                //插入es
+                indexES(beforeColumnsToMap, afterColumnsToMap, eventType, database, tableName);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
+        Map<String, Object> map = new HashMap<>();
+        columns.forEach(column -> {
+            if (column == null) {
+                return;
+            }
+            map.put(column.getName(), column.getValue());
+        });
+        return map;
+    }
+
+    private void indexES(Map<String, Object> beforeDataMap, Map<String, Object> afterDataMap, CanalEntry.EventType eventType, String database, String table) throws IOException {
+        log.info("eventType:{},database:{},table:{}\nbeforeMap:{},\n afterMap:{}", eventType, database, table, beforeDataMap, afterDataMap);
+        if (!StrUtil.equals("test", database)) {
+            return;
+        }
+
+        //不是user表中的数据不处理
+        if (!StrUtil.equals("user", table)) {
+            return;
+        }
+
+        // 根据不同类型处理相应的逻辑
+        switch (eventType) {
+            case INSERT:
+                break;
+            case UPDATE:
+                break;
+            case DELETE:
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+}

+ 18 - 0
ruoyi-admin/src/main/resources/application-prod.yml

@@ -93,6 +93,24 @@ spring:
                 # #连接池最大阻塞等待时间(使用负值表示没有限制)
                 max-wait: -1ms
 
+elasticsearch:
+    port: 9200
+    ip: 212.129.138.23
+    type: http  #访问方式
+    numberOfReplicas: 2  #副本数
+    numberOfShards: 5  #分片数
+    defaultIndex: materials  #默认加载索引
+    searchTimeOut: 10    #搜索超时时长,单位:秒
+    masterShardTimeOut: 1 #主分片执行更新的超时时长 ,单位:秒
+
+canal:
+    configs:
+        - hostname: 127.0.0.1
+          port: 11111
+          destination: heiyan
+        - hostname: 127.0.0.1
+          port: 11111
+          destination: so2
 
 # Swagger配置
 swagger:

+ 9 - 0
ruoyi-admin/src/main/resources/application-test.yml

@@ -102,6 +102,15 @@ elasticsearch:
     searchTimeOut: 10    #搜索超时时长,单位:秒
     masterShardTimeOut: 1 #主分片执行更新的超时时长 ,单位:秒
 
+canal:
+    configs:
+        - hostname: 127.0.0.1
+          port: 11111
+          destination: heiyan
+        - hostname: 127.0.0.1
+          port: 11111
+          destination: so2
+
 # Swagger配置
 swagger:
     # 是否开启swagger

+ 17 - 0
ruoyi-framework/pom.xml

@@ -99,6 +99,23 @@
             <version>7.14.0</version>
         </dependency>
 
+        <!-- canal -->
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.client</artifactId>
+            <version>1.1.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.common</artifactId>
+            <version>1.1.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.otter</groupId>
+            <artifactId>canal.protocol</artifactId>
+            <version>1.1.5</version>
+        </dependency>
+
     </dependencies>
 
 </project>