Przeglądaj źródła

+ SocketServer注解自动注册

chen.cheng 6 miesięcy temu
rodzic
commit
cd0f2071a2

+ 31 - 0
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/cfg/SessionHandshakeInterceptor.java

@@ -0,0 +1,31 @@
+package com.huashe.park.application.socket.cfg;
+
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.http.server.ServletServerHttpRequest;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
+
+import javax.servlet.http.HttpServletRequest;
+import java.util.Enumeration;
+import java.util.Map;
+
+public class SessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
+    // 拦截器返回false,则不会进行websocket协议的转换
+    // 可以获取请求参数做认证鉴权
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+        HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
+        // 将所有查询参数复制到 WebSocketSession 属性映射
+        Enumeration<String> parameterNames = servletRequest.getParameterNames();
+        while (parameterNames.hasMoreElements()) {
+            String parameterName = parameterNames.nextElement();
+            // 放入的值可以从WebSocketHandler里面的WebSocketSession拿出来
+            attributes.put(parameterName, servletRequest.getParameter(parameterName));
+        }
+        return true;
+    }
+    @Override
+    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
+    }
+}

+ 101 - 0
bd-park/park-backend/park-common/src/main/java/com/huashe/park/common/websocket/SocketHandle.java

@@ -0,0 +1,101 @@
+package com.huashe.park.common.websocket;
+
+import com.alibaba.fastjson2.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.socket.BinaryMessage;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.PongMessage;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.AbstractWebSocketHandler;
+import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class SocketHandle extends AbstractWebSocketHandler implements ISocketHandle {
+    private static final Logger log = LoggerFactory.getLogger(SocketHandle.class);
+    private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
+
+    //成功连接时
+    @Override
+    public void afterConnectionEstablished(WebSocketSession session) {
+        String sessionKey = custSessionKey(session);
+        webSocketSessionMap.put(sessionKey, new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
+    }
+
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        super.handleTextMessage(session, message);
+        // 有消息就广播下
+        for (Map.Entry<String, WebSocketSession> entry : webSocketSessionMap.entrySet()) {
+            String s = entry.getKey();
+            WebSocketSession webSocketSession = entry.getValue();
+            if (webSocketSession.isOpen()) {
+                webSocketSession.sendMessage(new TextMessage(message.getPayload()));
+                log.info("send to {} msg:{}", s, message.getPayload());
+            }
+        }
+    }
+
+
+    @Override
+    public void sendMessage(String sessionKey, JSONObject message) {
+        WebSocketSession webSocketSession = webSocketSessionMap.get(sessionKey);
+        if (webSocketSession != null && webSocketSession.isOpen()) {
+            try {
+                webSocketSession.sendMessage(new TextMessage(message.toJSONString()));
+            } catch (Exception e) {
+                log.error("sendMessage error:{}", e.getMessage(), e);
+            }
+        }
+    }
+
+    //报错时
+    @Override
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+        super.handleTransportError(session, exception);
+        log.warn("handleTransportError:: sessionId: {} {}", session.getId(), exception.getMessage(), exception);
+        if (session.isOpen()) {
+            try {
+                session.close();
+            } catch (Exception ex) {
+                // ignore
+            }
+        }
+    }
+
+    //连接关闭时
+    @Override
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+        super.afterConnectionClosed(session, status);
+        webSocketSessionMap.remove(session.getId());
+        if (session.isOpen()) {
+            try {
+                session.close();
+            } catch (Exception ex) {
+                // ignore
+            }
+        }
+    }
+
+    //处理binarymessage
+    @Override
+    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
+        super.handleBinaryMessage(session, message);
+    }
+
+    //处理pong
+    @Override
+    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
+        super.handlePongMessage(session, message);
+    }
+
+    //是否支持报文拆包
+    // 决定是否接受半包,因为websocket协议比较底层,好像Tcp协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false,这样底层会等待完整报文到达聚合后才回调。
+    @Override
+    public boolean supportsPartialMessages() {
+        return super.supportsPartialMessages();
+    }
+}