Browse Source

add websocket server

459242451@qq.com 3 years ago
parent
commit
f1c1fa85c7

+ 2 - 1
ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java

@@ -101,7 +101,8 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
                 "/*.html",
                 "/**/*.html",
                 "/**/*.css",
-                "/**/*.js"
+                "/**/*.js",
+                "/websocket/**"
             ).permitAll()
             .antMatchers("/doc.html").anonymous()
             .antMatchers("/swagger-resources/**").anonymous()

+ 2 - 2
ruoyi-framework/src/main/java/com/ruoyi/framework/web/service/TokenService.java

@@ -65,7 +65,7 @@ public class TokenService {
             }
         } else {
             // 获取网关传过来的用户信息
-            String userStr = request.getHeader("user");
+            /*String userStr = request.getHeader("user");
             userStr = URLDecoder.decode(userStr, "UTF-8");
             JSONObject userJsonObject = new JSONObject(userStr);
             JSONObject principalObject = userJsonObject.getJSONObject("principal");
@@ -75,7 +75,7 @@ public class TokenService {
             userDTO.setOrgId(principalObject.getStr("orgId"));
             userDTO.setOrgName(principalObject.getStr("orgName"));
             userDTO.setRoles(Convert.toList(String.class, principalObject.get("permissions")));
-            System.out.println("网关用户信息:"+userDTO);
+            System.out.println("网关用户信息:"+userDTO);*/
 //            return userDTO;
         }
         return null;

+ 48 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/SemaphoreUtils.java

@@ -0,0 +1,48 @@
+package com.ruoyi.framework.websocket;
+
+import java.util.concurrent.Semaphore;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 信号量相关处理
+ *
+ * @author ruoyi
+ */
+@Slf4j
+public class SemaphoreUtils {
+
+    /**
+     * 获取信号量
+     *
+     * @param semaphore
+     * @return
+     */
+    public static boolean tryAcquire(Semaphore semaphore) {
+        boolean flag = false;
+
+        try {
+            flag = semaphore.tryAcquire();
+        } catch (Exception e) {
+            log.error("获取信号量异常", e);
+        }
+
+        return flag;
+    }
+
+    /**
+     * 释放信号量
+     *
+     * @param semaphore
+     */
+    public static void release(Semaphore semaphore) {
+
+        try {
+            semaphore.release();
+        } catch (Exception e) {
+            log.error("释放信号量异常", e);
+        }
+    }
+}

+ 18 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java

@@ -0,0 +1,18 @@
+package com.ruoyi.framework.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * websocket 配置
+ *
+ */
+@Configuration
+public class WebSocketConfig {
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}

+ 111 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketServer.java

@@ -0,0 +1,111 @@
+package com.ruoyi.framework.websocket;
+
+import java.util.concurrent.Semaphore;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import cn.hutool.json.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * websocket 消息处理
+ *
+ * @author ruoyi
+ */
+@Component
+@ServerEndpoint("/websocket/message/{userId}")
+@Slf4j
+public class WebSocketServer {
+
+    /**
+     * 默认最多允许同时在线人数100
+     */
+    public static int socketMaxOnlineCount = 100;
+
+    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam(value = "userId") String userId) throws Exception {
+        boolean semaphoreFlag;
+        // 尝试获取信号量
+        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
+        if (!semaphoreFlag) {
+            // 未获取到信号量
+            log.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
+            WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);
+            session.close();
+        } else {
+            // 添加用户
+            WebSocketUsers.put(userId, session);
+            log.info("\n 建立连接 - {}", session);
+            log.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
+            WebSocketUsers.sendMessageToUserByText(session, "连接成功");
+        }
+    }
+
+    /**
+     * 连接关闭时处理
+     */
+    @OnClose
+    public void onClose(Session session) {
+        log.info("\n 关闭连接 - {}", session);
+        // 移除用户
+        WebSocketUsers.remove(session.getId());
+        // 获取到信号量则需释放
+        SemaphoreUtils.release(socketSemaphore);
+    }
+
+    /**
+     * 抛出异常时处理
+     */
+    @OnError
+    public void onError(Session session, Throwable exception) throws Exception {
+        if (session.isOpen()) {
+            // 关闭连接
+            session.close();
+        }
+        String sessionId = session.getId();
+        log.info("\n 连接异常 - {}\n异常信息 - {}", sessionId, exception);
+        // 移出用户
+        WebSocketUsers.remove(sessionId);
+        // 获取到信号量则需释放
+        SemaphoreUtils.release(socketSemaphore);
+    }
+
+    /**
+     * 服务器接收到客户端消息时调用的方法
+     */
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        log.info("【websocket消息】收到客户端消息:{}", message);
+        JSONObject obj = new JSONObject();
+        obj.set(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);//业务类型
+        obj.set(WebsocketConst.MSG_TXT, "心跳响应");//消息内容
+        WebSocketUsers.sendMessageToUserByText(session, obj.toString());
+    }
+
+    // 此为单点消息
+    public void sendOneMessage(String userId, String message) {
+        WebSocketUsers.sendMessageToSingleUserByText(userId, message);
+    }
+
+    // 此为单点消息(多人)
+    public void sendMoreMessage(String[] userIds, String message) {
+        for (String userId : userIds) {
+            WebSocketUsers.sendMessageToSingleUserByText(userId, message);
+        }
+
+    }
+}

+ 122 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketUsers.java

@@ -0,0 +1,122 @@
+package com.ruoyi.framework.websocket;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.websocket.Session;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * websocket 客户端用户集
+ *
+ * @author ruoyi
+ */
+@Slf4j
+public class WebSocketUsers {
+
+    /**
+     * 用户集
+     */
+    private static Map<String, Session> USERS = new ConcurrentHashMap<>();
+
+    /**
+     * 存储用户
+     *
+     * @param key     唯一键
+     * @param session 用户信息
+     */
+    public static void put(String key, Session session) {
+        USERS.put(key, session);
+    }
+
+    /**
+     * 移除用户
+     *
+     * @param session 用户信息
+     * @return 移除结果
+     */
+    public static boolean remove(Session session) {
+        String key = null;
+        boolean flag = USERS.containsValue(session);
+        if (flag) {
+            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
+            for (Map.Entry<String, Session> entry : entries) {
+                Session value = entry.getValue();
+                if (value.equals(session)) {
+                    key = entry.getKey();
+                    break;
+                }
+            }
+        } else {
+            return true;
+        }
+        return remove(key);
+    }
+
+    /**
+     * 移出用户
+     *
+     * @param key 键
+     */
+    public static boolean remove(String key) {
+        log.info("\n 正在移出用户 - {}", key);
+        Session remove = USERS.remove(key);
+        if (remove != null) {
+            boolean containsValue = USERS.containsValue(remove);
+            log.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");
+            return containsValue;
+        } else {
+            return true;
+        }
+    }
+
+    /**
+     * 获取在线用户列表
+     *
+     * @return 返回用户集合
+     */
+    public static Map<String, Session> getUsers() {
+        return USERS;
+    }
+
+    /**
+     * 群发消息文本消息
+     *
+     * @param message 消息内容
+     */
+    public static void sendMessageToUsersByText(String message) {
+        Collection<Session> values = USERS.values();
+        for (Session value : values) {
+            sendMessageToUserByText(value, message);
+        }
+    }
+
+    /**
+     * 发送文本消息
+     *
+     * @param session 自己的用户名
+     * @param message 消息内容
+     */
+    public static void sendMessageToUserByText(Session session, String message) {
+        if (session != null) {
+            session.getAsyncRemote().sendText(message);
+        } else {
+            log.info("\n[你已离线]");
+        }
+    }
+
+    public static void sendMessageToSingleUserByText(String userId, String message) {
+        log.info("【websocket消息】发送给:{}, 单点消息:{}", userId, message);
+        Session session = USERS.get(userId);
+        if (session != null) {
+            session.getAsyncRemote().sendText(message);
+        } else {
+            log.info("\n[你已离线]");
+        }
+    }
+}

+ 59 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebsocketConst.java

@@ -0,0 +1,59 @@
+package com.ruoyi.framework.websocket;
+
+/**
+ * @Description: Websocket常量类
+ */
+public class WebsocketConst {
+
+
+    /**
+     * 消息json key:cmd
+     */
+    public static final String MSG_CMD = "cmd";
+
+    /**
+     * 消息json key:msgId
+     */
+    public static final String MSG_ID = "msgId";
+
+    /**
+     * 消息json key:msgTxt
+     */
+    public static final String MSG_TXT = "msgTxt";
+
+    /**
+     * 消息json key:userId
+     */
+    public static final String MSG_USER_ID = "userId";
+
+    /**
+     * 消息类型 heartcheck
+     */
+    public static final String CMD_CHECK = "heartcheck";
+
+    /**
+     * 消息类型 user 用户消息
+     */
+    public static final String CMD_USER = "user";
+
+    /**
+     * 消息类型 topic 系统通知
+     */
+    public static final String CMD_TOPIC = "topic";
+
+    /**
+     * 消息类型 email
+     */
+    public static final String CMD_EMAIL = "email";
+
+    /**
+     * 消息类型 meetingsign 会议签到
+     */
+    public static final String CMD_SIGN = "sign";
+
+    /**
+     * 消息类型 新闻发布/取消
+     */
+    public static final String NEWS_PUBLISH = "publish";
+
+}