瀏覽代碼

+ 新增uwb视频跟随socketserver

chen.cheng 6 月之前
父節點
當前提交
98ec50aa39

+ 2 - 1
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/cfg/SessionHandshakeInterceptor.java

@@ -14,7 +14,7 @@ public class SessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor
     // 拦截器返回false,则不会进行websocket协议的转换
     // 可以获取请求参数做认证鉴权
     @Override
-    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
+    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
         HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
         // 将所有查询参数复制到 WebSocketSession 属性映射
         Enumeration<String> parameterNames = servletRequest.getParameterNames();
@@ -25,6 +25,7 @@ public class SessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor
         }
         return true;
     }
+
     @Override
     public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
     }

+ 48 - 0
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/cfg/WebSocketConfig.java

@@ -0,0 +1,48 @@
+package com.huashe.park.application.socket.cfg;
+
+import com.huashe.park.common.SpringBeanUtils;
+import com.huashe.park.common.websocket.SocketEndPoint;
+import com.huashe.park.common.websocket.SocketHandle;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
+
+import java.util.Map;
+
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig implements WebSocketConfigurer {
+    @Override
+    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+        Map<String, WebSocketHandler> beansOfType = SpringBeanUtils.getBeansOfType(WebSocketHandler.class);
+        beansOfType.forEach((k, v) -> {
+            SocketEndPoint annotation = AnnotationUtils.findAnnotation(v.getClass(), SocketEndPoint.class);
+            registry.addHandler(v, annotation.url())
+                    // 添加拦截器,可以获取连接的param和 header 用作认证鉴权
+                    .addInterceptors(new SessionHandshakeInterceptor())
+                    // 设置运行跨域
+                    .setAllowedOrigins("*");
+        });
+    }
+
+    @Bean
+    public ServletServerContainerFactoryBean createWebSocketContainer() {
+        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
+        // 设置默认会话空闲超时 以毫秒为单位 非正值意味着无限超时,默认值 0 ,默认没10s检查一次空闲就关闭
+        container.setMaxSessionIdleTimeout(10 * 1000L);
+        // 设置异步发送消息的默认超时时间 以毫秒为单位 非正值意味着无限超时 ,默认值-1,还没看到作用
+        container.setAsyncSendTimeout(10 * 1000L);
+        // 设置文本消息的默认最大缓冲区大小 以字符为单位,默认值 8 * 1024
+        container.setMaxTextMessageBufferSize(8 * 1024);
+        // 设置二进制消息的默认最大缓冲区大小 以字节为单位,默认值 8 * 1024
+        container.setMaxBinaryMessageBufferSize(8 * 1024);
+        return container;
+    }
+
+}

+ 0 - 13
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/server/TestSocket.java

@@ -1,13 +0,0 @@
-package com.huashe.park.application.socket.server;
-
-import com.huashe.park.common.websocket.SocketEndPoint;
-import com.huashe.park.common.websocket.SocketHandle;
-import org.springframework.web.socket.WebSocketSession;
-
-@SocketEndPoint(url = "/pkb/test")
-public class TestSocket extends SocketHandle {
-    @Override
-    public String custSessionKey(WebSocketSession session) {
-        return "test";
-    }
-}

+ 0 - 14
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/server/TestSocket1.java

@@ -1,14 +0,0 @@
-package com.huashe.park.application.socket.server;
-
-import com.huashe.park.common.websocket.SocketEndPoint;
-import com.huashe.park.common.websocket.SocketHandle;
-import org.springframework.stereotype.Service;
-import org.springframework.web.socket.WebSocketSession;
-
-@SocketEndPoint(url = "/pkd/test1")
-public class TestSocket1 extends SocketHandle {
-    @Override
-    public String custSessionKey(WebSocketSession session) {
-        return session.getId();
-    }
-}

+ 0 - 102
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/server/UWBVideoTraceSocketServer.java

@@ -1,102 +0,0 @@
-package com.huashe.park.application.socket.server;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import org.springframework.web.bind.annotation.CrossOrigin;
-
-import javax.annotation.PostConstruct;
-import javax.websocket.*;
-import javax.websocket.server.PathParam;
-import javax.websocket.server.ServerEndpoint;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@ServerEndpoint("/ws/video/trace/{client}/{device}")
-@CrossOrigin(origins = "*")
-@Component
-public class UWBVideoTraceSocketServer {
-    private static final Logger logger = LoggerFactory.getLogger(UWBVideoTraceSocketServer.class);
-
-    // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
-    private static AtomicInteger onlineNum = new AtomicInteger();
-
-    // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
-    private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
-
-    public static final String SESSION_KEY_TEMPLATE = "%s-%s";
-
-    @PostConstruct
-    public void init() {
-        logger.info("WebSocketServer init");
-    }
-
-    // 发送消息
-    public void sendMessage(Session session, String message) throws IOException {
-        if (session != null) {
-            synchronized (session) {
-                logger.info("发送数据:{}", message);
-                session.getBasicRemote().sendText(message);
-            }
-        }
-    }
-
-    // 群发消息
-    public void broadcast(String message) {
-        for (Session session : sessionPools.values()) {
-            try {
-                sendMessage(session, message);
-            }
-            catch (Exception e) {
-                logger.info("server get {}", e.getMessage());
-            }
-        }
-    }
-
-    // 建立连接成功调用
-    @OnOpen
-    public void onOpen(Session session, @PathParam(value = "client") String client,
-        @PathParam(value = "device") String device) {
-        sessionPools.put(String.format(SESSION_KEY_TEMPLATE, client, device), session);
-        addOnlineCount();
-    }
-
-    // 关闭连接时调用
-    @OnClose
-    public void onClose( @PathParam(value = "client") String client,
-                         @PathParam(value = "device") String device) {
-        sessionPools.remove(String.format(SESSION_KEY_TEMPLATE, client, device));
-        subOnlineCount();
-        logger.info("{}断开webSocket连接!当前人数为:{}", device, onlineNum);
-    }
-
-    // 收到客户端信息后,根据接收人的username把消息推下去或者群发
-    // to=-1群发消息
-    @OnMessage
-    public void onMessage(String message) throws IOException {
-        logger.info("server get {}", message);
-    }
-
-    // 错误时调用
-    @OnError
-    public void onError(Session session, Throwable throwable) {
-        logger.info("server get {}", throwable.getMessage());
-    }
-
-    public static void addOnlineCount() {
-        onlineNum.incrementAndGet();
-    }
-
-    public static void subOnlineCount() {
-        onlineNum.decrementAndGet();
-    }
-
-    public static AtomicInteger getOnlineNumber() {
-        return onlineNum;
-    }
-
-    public static ConcurrentHashMap<String, Session> getSessionPools() {
-        return sessionPools;
-    }
-}

+ 39 - 0
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/socket/server/UwbVideoTraceSocketServer.java

@@ -0,0 +1,39 @@
+package com.huashe.park.application.socket.server;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.springframework.web.socket.WebSocketSession;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.huashe.park.common.websocket.SocketEndPoint;
+import com.huashe.park.common.websocket.SocketHandle;
+
+@SocketEndPoint(url = "/pkb/uwb/trail")
+public class UwbVideoTraceSocketServer extends SocketHandle {
+    private static final String SESSION_KEY_TEMPLATE = "%s-%s";
+
+    private static final String UWB_KEY = "deviceId";
+
+    private static final String CLIENT = "client";
+
+    @Override
+    public String custSessionKey(WebSocketSession session) {
+        Map<String, Object> attributes = session.getAttributes();
+        return String.format(SESSION_KEY_TEMPLATE, attributes.get(CLIENT), attributes.get(UWB_KEY));
+    }
+
+    @Override
+    public void sendMessage(String sessionKey, JSONObject message) {
+        super.sendMessage(sessionKey, message);
+    }
+
+    @Override
+    public List<WebSocketSession> filterSession(String sessionKey, Map<String, WebSocketSession> webSocketSessionMap) {
+        return webSocketSessionMap.entrySet().stream()
+            .filter(entry -> entry.getKey().contains(sessionKey)).map(Map.Entry::getValue).collect(Collectors.toList());
+    }
+
+}

+ 5 - 6
bd-park/park-backend/park-application/src/main/java/com/huashe/park/application/web/controller/bd/BdFenceInfoController.java

@@ -1,8 +1,7 @@
 package com.huashe.park.application.web.controller.bd;
 
-import com.alibaba.fastjson2.JSON;
 import com.huashe.common.domain.AjaxResult;
-import com.huashe.park.application.socket.server.TestSocket;
+import com.huashe.park.application.socket.server.UwbVideoTraceSocketServer;
 import com.huashe.park.core.service.IBdFenceInfoService;
 import com.huashe.park.domain.entity.BdFenceInfo;
 import com.ruoyi.common.annotation.Anonymous;
@@ -41,16 +40,16 @@ public class BdFenceInfoController extends BaseController {
     private IBdFenceInfoService bdFenceInfoService;
 
     @Resource
-    private TestSocket testSocket;
+    private UwbVideoTraceSocketServer uwbVideoTraceSocket;
 
     /**
      * 查询围栏基础信息列表
      */
     @GetMapping("/list")
     public TableDataInfo list(BdFenceInfo bdFenceInfo) {
-        testSocket.sendMessage("test", JSON.parseObject(JSON.toJSONString(new BdFenceInfo() {{
-            setAltitude(22);
-        }})));
+//        testSocket.sendMessage("test", JSON.parseObject(JSON.toJSONString(new BdFenceInfo() {{
+//            setAltitude(22);
+//        }})));
         startPage();
         List<BdFenceInfo> list = bdFenceInfoService.selectBdFenceInfoList(bdFenceInfo);
         return getDataTable(list);

+ 28 - 25
bd-park/park-backend/park-common/src/main/java/com/huashe/park/common/websocket/SocketHandle.java

@@ -1,8 +1,12 @@
 package com.huashe.park.common.websocket;
 
-import com.alibaba.fastjson2.JSONObject;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
 import org.springframework.web.socket.BinaryMessage;
 import org.springframework.web.socket.CloseStatus;
 import org.springframework.web.socket.PongMessage;
@@ -11,14 +15,14 @@ import org.springframework.web.socket.WebSocketSession;
 import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import com.alibaba.fastjson2.JSONObject;
 
 public abstract class SocketHandle extends AbstractWebSocketHandler implements ISocketHandle {
     private static final Logger log = LoggerFactory.getLogger(SocketHandle.class);
+
     private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
 
-    //成功连接时
+    // 成功连接时
     @Override
     public void afterConnectionEstablished(WebSocketSession session) {
         String sessionKey = custSessionKey(session);
@@ -28,31 +32,28 @@ public abstract class SocketHandle extends AbstractWebSocketHandler implements I
     @Override
     protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
         super.handleTextMessage(session, message);
-        // 有消息就广播下
-        for (Map.Entry<String, WebSocketSession> entry : webSocketSessionMap.entrySet()) {
-            String s = entry.getKey();
-            WebSocketSession webSocketSession = entry.getValue();
-            if (webSocketSession.isOpen()) {
-                webSocketSession.sendMessage(new TextMessage(message.getPayload()));
-                log.info("send to {} msg:{}", s, message.getPayload());
-            }
-        }
     }
 
-
     @Override
     public void sendMessage(String sessionKey, JSONObject message) {
-        WebSocketSession webSocketSession = webSocketSessionMap.get(sessionKey);
-        if (webSocketSession != null && webSocketSession.isOpen()) {
+        List<WebSocketSession> filterSessions = filterSession(sessionKey, webSocketSessionMap);
+        if (CollectionUtils.isEmpty(filterSessions)) {
+            return;
+        }
+        filterSessions.forEach(webSocketSession -> {
             try {
                 webSocketSession.sendMessage(new TextMessage(message.toJSONString()));
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.error("sendMessage error:{}", e.getMessage(), e);
             }
-        }
+        });
     }
 
-    //报错时
+    public abstract List<WebSocketSession> filterSession(String sessionKey,
+        Map<String, WebSocketSession> webSocketSessionMap);
+
+    // 报错时
     @Override
     public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
         super.handleTransportError(session, exception);
@@ -60,13 +61,14 @@ public abstract class SocketHandle extends AbstractWebSocketHandler implements I
         if (session.isOpen()) {
             try {
                 session.close();
-            } catch (Exception ex) {
+            }
+            catch (Exception ex) {
                 // ignore
             }
         }
     }
 
-    //连接关闭时
+    // 连接关闭时
     @Override
     public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
         super.afterConnectionClosed(session, status);
@@ -74,25 +76,26 @@ public abstract class SocketHandle extends AbstractWebSocketHandler implements I
         if (session.isOpen()) {
             try {
                 session.close();
-            } catch (Exception ex) {
+            }
+            catch (Exception ex) {
                 // ignore
             }
         }
     }
 
-    //处理binarymessage
+    // 处理binarymessage
     @Override
     protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
         super.handleBinaryMessage(session, message);
     }
 
-    //处理pong
+    // 处理pong
     @Override
     protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
         super.handlePongMessage(session, message);
     }
 
-    //是否支持报文拆包
+    // 是否支持报文拆包
     // 决定是否接受半包,因为websocket协议比较底层,好像Tcp协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false,这样底层会等待完整报文到达聚合后才回调。
     @Override
     public boolean supportsPartialMessages() {