Browse Source

kafka同步用户信息

温红权 3 years ago
parent
commit
6434f13ec9

+ 31 - 1
ruoyi-admin/nacos/yjzh-sq-dev.yaml

@@ -79,7 +79,7 @@ spring:
 spring:
   redis:
     # 地址
-    host: 175.27.190.58
+    host: 127.0.0.1
     # 端口,默认为6379
     port: 6379
     # 数据库索引
@@ -91,6 +91,36 @@ spring:
     # 是否开启ssl
     ssl: false
 
+--- # kafka 配置
+spring:
+    kafka:
+        # 指定kafkaserver的地址,集群配多个,中间,逗号隔开
+        bootstrap-servers: 200.200.19.121:9092
+        producer:
+            #producer要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
+            #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
+            #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
+            #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
+            #可以设置的值为:all, -1, 0, 1
+            acks: 1
+            # 每次批量发送消息的数量,produce积累到一定数据,一次发送
+            batch-size: 16384
+            # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
+            buffer-memory: 33554432
+            # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
+            # 当retries为0时,produce不会重复。retries重发,此时repli节点完全成为leader节点,不会产生消息丢失。
+            retries: 0
+            # 指定消息key和消息体的编解码方式
+            key-serializer: org.apache.kafka.common.serialization.StringSerializer
+            value-serializer: org.apache.kafka.common.serialization.StringSerializer
+        consumer:
+            # 默认的消费组ID
+            group-id: defaultConsumerGroup
+            enable-auto-commit: true
+            auto-offset-reset: latest
+            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            auto-commit-interval: 1000
 redisson:
   # 线程池数量
   threads: 16

+ 30 - 0
ruoyi-admin/nacos/yjzh-sq-prod.yaml

@@ -133,6 +133,36 @@ spring:
         username: ruoyi
         password: 123456
 
+--- # kafka 配置
+spring:
+    kafka:
+        # 指定kafkaserver的地址,集群配多个,中间,逗号隔开
+        bootstrap-servers: 200.200.19.121:9092
+        producer:
+            #producer要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
+            #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
+            #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
+            #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
+            #可以设置的值为:all, -1, 0, 1
+            acks: 1
+            # 每次批量发送消息的数量,produce积累到一定数据,一次发送
+            batch-size: 16384
+            # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
+            buffer-memory: 33554432
+            # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
+            # 当retries为0时,produce不会重复。retries重发,此时repli节点完全成为leader节点,不会产生消息丢失。
+            retries: 0
+            # 指定消息key和消息体的编解码方式
+            key-serializer: org.apache.kafka.common.serialization.StringSerializer
+            value-serializer: org.apache.kafka.common.serialization.StringSerializer
+        consumer:
+            # 默认的消费组ID
+            group-id: defaultConsumerGroup
+            enable-auto-commit: true
+            auto-offset-reset: latest
+            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            auto-commit-interval: 1000
 ruoyi:
     # 文件路径 示例( Windows配置D:/tocc/uploadPath,Linux配置 /xintong/server/uploadPath)
     profile: /xintong/server

+ 30 - 0
ruoyi-admin/nacos/yjzh-sq-test.yaml

@@ -86,6 +86,36 @@ spring:
     # 是否开启ssl
     ssl: false
 
+--- # kafka 配置
+spring:
+    kafka:
+        # 指定kafkaserver的地址,集群配多个,中间,逗号隔开
+        bootstrap-servers: 200.200.19.121:9092
+        producer:
+            #producer要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
+            #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
+            #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
+            #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
+            #可以设置的值为:all, -1, 0, 1
+            acks: 1
+            # 每次批量发送消息的数量,produce积累到一定数据,一次发送
+            batch-size: 16384
+            # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
+            buffer-memory: 33554432
+            # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
+            # 当retries为0时,produce不会重复。retries重发,此时repli节点完全成为leader节点,不会产生消息丢失。
+            retries: 0
+            # 指定消息key和消息体的编解码方式
+            key-serializer: org.apache.kafka.common.serialization.StringSerializer
+            value-serializer: org.apache.kafka.common.serialization.StringSerializer
+        consumer:
+            # 默认的消费组ID
+            group-id: defaultConsumerGroup
+            enable-auto-commit: true
+            auto-offset-reset: latest
+            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+            auto-commit-interval: 1000
 redisson:
   # 线程池数量
   threads: 16

+ 12 - 0
ruoyi-admin/pom.xml

@@ -71,6 +71,18 @@
             <artifactId>ruoyi-demo</artifactId>
         </dependency>
 
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!--<dependency>
             <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-loadbalancer</artifactId>

+ 213 - 0
ruoyi-admin/src/main/java/com/ruoyi/web/controller/task/SysGatewayUserTask.java

@@ -0,0 +1,213 @@
+package com.ruoyi.web.controller.task;
+
+import cn.hutool.json.JSONUtil;
+import com.ruoyi.common.core.domain.entity.SysDept;
+import com.ruoyi.common.core.domain.entity.SysUser;
+import com.ruoyi.system.service.ISysDeptService;
+import com.ruoyi.system.service.ISysUserService;
+import com.ruoyi.zhdd.domain.GatewayDept;
+import com.ruoyi.zhdd.domain.GatewayOrg;
+import com.ruoyi.zhdd.domain.GatewaySUser;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+
+@Component
+public class SysGatewayUserTask {
+
+    @Autowired
+    ISysUserService userService;
+
+    @Autowired
+    ISysDeptService deptService;
+
+    @KafkaListener(topics = "t_uc_user")
+    public void onUserMessage(String message){
+        try{
+            GatewaySUser gatewaySUser =  JSONUtil.toBean(message,GatewaySUser.class);
+            if(gatewaySUser!=null){
+                SysUser quser = new SysUser();
+                quser.setOtherId(gatewaySUser.getUserId());
+                List<SysUser> userList = userService.selectUserList(quser);
+                if(userList.isEmpty()){
+                    //插入
+                    SysUser user = new SysUser();
+                    user.setOtherId(gatewaySUser.getUserId());
+                    {
+                        SysDept qdept = new SysDept();
+                        qdept.setOtherId(gatewaySUser.getDeptId());
+                        List<SysDept> deptList = deptService.selectDeptList(qdept);
+                        if(!deptList.isEmpty()){
+                            user.setDeptId(deptList.get(0).getDeptId());
+                        }else{
+                            //还未有部门暂不设置  //需要先同步部门
+
+                        }
+                    }
+                    user.setPhonenumber(gatewaySUser.getMobile());
+                    user.setNickName(gatewaySUser.getName());
+                    user.setUserName(gatewaySUser.getUserName());
+                    user.setRemark(gatewaySUser.getUserGroupId()+"");
+                    user.setUpdateBy("kafka");
+                    user.setUpdateTime(new Date());
+                    user.setCreateBy("kafka");
+                    user.setCreateTime(new Date());
+                    user.setPassword("$2a$10$7JB720yubVSZvUI0rEqK/.VqGOZTH.ulu33dHOiBE8ByOhJIrdAu2");
+                    user.setStatus(gatewaySUser.getStatus());
+                    userService.insertUser(user);
+                }else{
+                    //更新
+                    SysUser user = userList.get(0);
+                    user.setOtherId(gatewaySUser.getUserId());
+                    {
+                        SysDept qdept = new SysDept();
+                        qdept.setOtherId(gatewaySUser.getDeptId());
+                        List<SysDept> deptList = deptService.selectDeptList(qdept);
+                        if(!deptList.isEmpty()){
+                            user.setDeptId(deptList.get(0).getDeptId());
+                        }else{
+                            //还未有部门暂不设置  //需要先同步部门
+
+                        }
+                    }
+                    user.setPhonenumber(gatewaySUser.getMobile());
+                    user.setNickName(gatewaySUser.getName());
+                    user.setUserName(gatewaySUser.getUserName());
+                    user.setRemark(gatewaySUser.getUserGroupId()+"");
+                    user.setUpdateBy("kafka");
+                    user.setUpdateTime(new Date());
+                    user.setPassword("$2a$10$7JB720yubVSZvUI0rEqK/.VqGOZTH.ulu33dHOiBE8ByOhJIrdAu2");
+                    user.setStatus(gatewaySUser.getStatus());
+                    userService.updateUser(user);
+                }
+
+            }
+
+        }catch (Exception e){
+
+        }
+        System.out.println(message);
+    }
+    @KafkaListener(topics = "t_uc_org")
+    public void onOrgMessage(String message){
+        //默认组织是0   组织间的层级先不考虑
+
+        try{
+           GatewayOrg gatewayOrg =  JSONUtil.toBean(message,GatewayOrg.class);
+            if(gatewayOrg!=null){
+
+
+                    SysDept qdept = new SysDept();
+                    qdept.setOtherId(gatewayOrg.getOrgId());
+                    //为0 的作为组织
+                    qdept.setParentId(0L);
+                    List<SysDept> deptList = deptService.selectDeptList(qdept);
+                    if(deptList.isEmpty()){
+                        //插入
+                        SysDept dept = deptList.get(0);
+                        dept.setOtherId(gatewayOrg.getOrgId());
+                        dept.setParentId(0L);
+                        dept.setDeptName(gatewayOrg.getOrgName());
+                        dept.setCreateBy("kafka");
+                        dept.setCreateTime(new Date());
+                        dept.setUpdateBy("kafka");
+                        dept.setUpdateTime(new Date());
+                        dept.setDelFlag(gatewayOrg.getDelFlag());
+                        deptService.insertDept(dept);
+
+                    }else{
+                        //更新
+                        SysDept dept = deptList.get(0);
+                        dept.setOtherId(gatewayOrg.getOrgId());
+                        dept.setParentId(0L);
+                        dept.setDeptName(gatewayOrg.getOrgName());
+                        dept.setUpdateBy("kafka");
+                        dept.setUpdateTime(new Date());
+                        dept.setDelFlag(gatewayOrg.getDelFlag());
+                        deptService.updateDept(dept);
+                    }
+            }
+
+
+        }catch (Exception e){
+
+        }
+        System.out.println(message);
+    }
+    @KafkaListener(topics = "t_uc_dept")
+    public void onDeptMessage(String message){
+        try{
+            GatewayDept gatewayDept =  JSONUtil.toBean(message,GatewayDept.class);
+            if(gatewayDept!=null){
+                SysDept qdept = new SysDept();
+                qdept.setOtherId(gatewayDept.getDeptId());
+
+                List<SysDept> deptList = deptService.selectDeptList(qdept);
+                if(deptList.isEmpty()){
+                    //插入
+                    SysDept dept = new SysDept();
+                    //查询父部门
+                    {
+                        SysDept qdept1 = new SysDept();
+                        if(gatewayDept.getParentId()==null||gatewayDept.getParentId().equals("")){
+                            qdept1.setParentId(0L);
+                            qdept1.setOtherId(gatewayDept.getOrgId());
+                        }else{
+                            qdept1.setOtherId(gatewayDept.getParentId());
+                        }
+                        List<SysDept> deptList1 =  deptService.selectDeptList(qdept1);
+                        if(!deptList1.isEmpty()){
+                            dept.setParentId(deptList1.get(0).getDeptId());
+                        }else{
+                            //无父组织或部门 暂不处理吧
+
+                        }
+                    }
+                    dept.setOtherId(gatewayDept.getDeptId());
+                    dept.setDeptName(gatewayDept.getDeptName());
+                    dept.setUpdateBy("kafka");
+                    dept.setUpdateTime(new Date());
+                    dept.setCreateBy("kafka");
+                    dept.setCreateTime(new Date());
+                    dept.setDelFlag(gatewayDept.getDelFlag());
+                    deptService.insertDept(dept);
+                }else{
+                    //更新
+                    SysDept dept = deptList.get(0);
+                    //查询父部门
+                    {
+                        SysDept qdept1 = new SysDept();
+                        if(gatewayDept.getParentId()==null||gatewayDept.getParentId().equals("")){
+                            qdept1.setParentId(0L);
+                            qdept1.setOtherId(gatewayDept.getOrgId());
+                        }else{
+                            qdept1.setOtherId(gatewayDept.getParentId());
+                        }
+                        List<SysDept> deptList1 =  deptService.selectDeptList(qdept1);
+                        if(!deptList1.isEmpty()){
+                            dept.setParentId(deptList1.get(0).getDeptId());
+                        }else{
+                            //无父组织或部门 暂不处理吧
+
+                        }
+                    }
+                    dept.setOtherId(gatewayDept.getDeptId());
+                    dept.setDeptName(gatewayDept.getDeptName());
+                    dept.setUpdateBy("kafka");
+                    dept.setUpdateTime(new Date());
+                    dept.setDelFlag(gatewayDept.getDelFlag());
+                    deptService.updateDept(dept);
+                }
+
+            }
+
+        }catch (Exception e){
+
+        }
+        System.out.println(message);
+    }
+
+}

+ 1 - 0
ruoyi-admin/src/main/resources/bootstrap-dev.yml

@@ -28,3 +28,4 @@ spring:
 
     compatibility-verifier:
       enabled: false
+

+ 1 - 1
ruoyi-admin/src/main/resources/bootstrap.yml

@@ -1,6 +1,6 @@
 spring:
   profiles:
-    active: test
+    active: dev
 ruoyi:
   # 名称
   name: YJZH-SQ

+ 2 - 0
ruoyi-common/src/main/java/com/ruoyi/common/core/domain/entity/SysDept.java

@@ -124,4 +124,6 @@ public class SysDept implements Serializable {
 	@TableField(exist = false)
 	private Map<String, Object> params = new HashMap<>();
 
+    private String otherId;
+
 }

+ 2 - 0
ruoyi-common/src/main/java/com/ruoyi/common/core/domain/entity/SysUser.java

@@ -176,6 +176,8 @@ public class SysUser implements Serializable {
 	@TableField(exist = false)
 	private Long[] postIds;
 
+    private String otherId;
+
 	/**
 	 * 角色ID
 	 */

+ 5 - 1
ruoyi-system/src/main/resources/mapper/system/SysDeptMapper.xml

@@ -20,10 +20,11 @@
         <result property="createTime" column="create_time"/>
         <result property="updateBy" column="update_by"/>
         <result property="updateTime" column="update_time"/>
+        <result property="otherId" column="other_id"/>
     </resultMap>
 
     <sql id="selectDeptVo">
-        select d.dept_id, d.parent_id, d.ancestors, d.dept_name, d.order_num, d.leader, d.phone, d.email, d.status, d.del_flag, d.create_by, d.create_time
+        select d.dept_id, d.parent_id, d.ancestors, d.dept_name, d.order_num, d.leader, d.phone, d.email, d.status, d.del_flag, d.create_by, d.create_time,d.other_id
         from sys_dept d
     </sql>
 
@@ -33,6 +34,9 @@
 		<if test="deptId != null and deptId != 0">
 			AND dept_id = #{deptId}
 		</if>
+        <if test="otherId != null and otherId != 0">
+            AND other_id = #{otherId}
+        </if>
         <if test="parentId != null and parentId != 0">
             AND parent_id = #{parentId}
         </if>

+ 5 - 0
ruoyi-system/src/main/resources/mapper/system/SysUserMapper.xml

@@ -23,6 +23,7 @@
         <result property="updateBy" column="update_by"/>
         <result property="updateTime" column="update_time"/>
         <result property="remark" column="remark"/>
+        <result property="otherId" column="other_id"/>
         <association property="dept" column="dept_id" javaType="SysDept" resultMap="deptResult"/>
         <collection property="roles" javaType="java.util.List" resultMap="RoleResult"/>
     </resultMap>
@@ -73,6 +74,7 @@
                r.role_key,
                r.role_sort,
                r.data_scope,
+               r.other_id,
                r.status as role_status
         from sys_user u
 		    left join sys_dept d on u.dept_id = d.dept_id
@@ -123,6 +125,9 @@
         <if test="userId != null and userId != 0">
             AND u.user_id = #{userId}
         </if>
+        <if test="otherId != null and otherId != 0">
+            AND u.other_id = #{otherId}
+        </if>
         <if test="userName != null and userName != ''">
             AND u.user_name like concat('%', #{userName}, '%')
         </if>

+ 8 - 0
ruoyi-zhdd/pom.xml

@@ -21,6 +21,14 @@
             <groupId>com.ruoyi</groupId>
             <artifactId>ruoyi-common</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.ruoyi</groupId>
+            <artifactId>ruoyi-system</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 13 - 0
ruoyi-zhdd/src/main/java/com/ruoyi/zhdd/domain/EntInfo.java

@@ -0,0 +1,13 @@
+package com.ruoyi.zhdd.domain;
+
+import lombok.Data;
+
+@Data
+public class EntInfo {
+    private String entName;
+    private String businessLicenseNo;
+    private String creditCode;
+    private String entLegal;
+    private String legalRepName;
+    private String legalRepIdNumber;
+}

+ 19 - 0
ruoyi-zhdd/src/main/java/com/ruoyi/zhdd/domain/GatewayDept.java

@@ -0,0 +1,19 @@
+package com.ruoyi.zhdd.domain;
+
+import lombok.Data;
+
+@Data
+public class GatewayDept {
+    private String createTime;
+    private String createUser;
+    private String delFlag;
+    private String deptId;
+    private String deptName;
+    private String deptShortName;
+    private String orgId;
+    private String orgName;
+    private String parentId;
+    private String parentName;
+    private String updateTime;
+    private String updateUser;
+}

+ 11 - 0
ruoyi-zhdd/src/main/java/com/ruoyi/zhdd/domain/GatewayOrg.java

@@ -0,0 +1,11 @@
+package com.ruoyi.zhdd.domain;
+
+import lombok.Data;
+
+@Data
+public class GatewayOrg {
+    private String delFlag;
+    private String orgId;
+    private String orgName;
+    private String parentId;
+}

+ 21 - 0
ruoyi-zhdd/src/main/java/com/ruoyi/zhdd/domain/GatewaySUser.java

@@ -0,0 +1,21 @@
+package com.ruoyi.zhdd.domain;
+
+import lombok.Data;
+
+@Data
+public class GatewaySUser {
+        private String delFlag;
+        private String updateTime;
+        private String userId;
+        private String name;
+        private String userName;
+        private String deptId;
+        private String deptName;
+        private String orgId;
+        private String orgName;
+        private String status;
+        private String idNumber;
+        private String mobile;
+        private int userGroupId;
+        private EntInfo entInfo;
+}

+ 9 - 0
sql/20211215.sql

@@ -0,0 +1,9 @@
+ALTER TABLE "public"."sys_dept"
+    ADD COLUMN "other_id" varchar(64);
+
+COMMENT ON COLUMN "public"."sys_dept"."other_id" IS '其他系统编号';
+
+ALTER TABLE "public"."sys_user"
+    ADD COLUMN "other_id" varchar(64);
+
+COMMENT ON COLUMN "public"."sys_user"."other_id" IS '其他系统编号';