Jelajahi Sumber

新增点位信息WebSocket服务并集成到系统中

- 新增PointWebSocketServer类,用于处理WebSocket连接和消息发送
- 在PointFusionEngine中集成WebSocket服务,实现位置信息实时推送
- 更新pom.xml文件,添加WebSocket相关的依赖- 修改SecurityConfig,允许WebSocket相关URL的匿名访问
- 新增WebSocketConfig配置类,注册ServerEndpointExporter
chen.cheng 9 bulan lalu
induk
melakukan
d96c048269

+ 4 - 0
bd-location/pom.xml

@@ -57,6 +57,10 @@
             <groupId>net.dreamlu</groupId>
             <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

+ 9 - 0
bd-location/src/main/java/com/ruoyi/bd/service/engine/impl/PointFusionEngine.java

@@ -4,6 +4,7 @@ import com.ruoyi.bd.domain.BdDevcTrailUwb;
 import com.ruoyi.bd.service.IBdDevcTrailUwbService;
 import com.ruoyi.bd.service.engine.EvtFusionEngine;
 import com.ruoyi.bd.service.engine.LocationInfo;
+import com.ruoyi.bd.socket.PointWebSocketServer;
 import com.ruoyi.common.BDConst;
 import com.ruoyi.common.utils.DateTimeUtil;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -38,6 +39,9 @@ public class PointFusionEngine extends EvtFusionEngine {
      */
     private static final int MAX_POINT_TIME_GAP = 2;
 
+    @Autowired
+    private PointWebSocketServer webSocketServer;
+
 
     @PostConstruct
     public void init() {
@@ -71,4 +75,9 @@ public class PointFusionEngine extends EvtFusionEngine {
         bdDevcTrailUwbService.insertBdDevcTrailUwb(bdDevcTrailUwb);
         msg.setBizId(bdDevcTrailUwb.getId().toString());
     }
+
+    @Override
+    public void newEvtCallback(LocationInfo msg) {
+        webSocketServer.sendInfo(msg.getMsg().getString("deviceId"), msg.getMsg().toJSONString());
+    }
 }

+ 110 - 0
bd-location/src/main/java/com/ruoyi/bd/socket/PointWebSocketServer.java

@@ -0,0 +1,110 @@
+package com.ruoyi.bd.socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.CrossOrigin;
+
+import javax.annotation.PostConstruct;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@ServerEndpoint("/ws/point/{device}")
+@CrossOrigin(origins = "*")
+@Component
+public class PointWebSocketServer {
+    private static final Logger logger = LoggerFactory.getLogger(PointWebSocketServer.class);
+    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
+    private static AtomicInteger onlineNum = new AtomicInteger();
+    //concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
+    private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void init() {
+        logger.info("WebSocketServer init");
+    }
+
+    //发送消息
+    public void sendMessage(Session session, String message) throws IOException {
+        if (session != null) {
+            synchronized (session) {
+                logger.info("发送数据:{}", message);
+                session.getBasicRemote().sendText(message);
+            }
+        }
+    }
+
+    //给指定用户发送信息
+    public void sendInfo(String userName, String message) {
+        Session session = sessionPools.get(userName);
+        if (session == null) return;
+        try {
+            sendMessage(session, message);
+        } catch (Exception e) {
+            logger.info("server get {}", e.getMessage());
+        }
+    }
+
+    // 群发消息
+    public void broadcast(String message) {
+        for (Session session : sessionPools.values()) {
+            try {
+                sendMessage(session, message);
+            } catch (Exception e) {
+                logger.info("server get {}", e.getMessage());
+            }
+        }
+    }
+
+    //建立连接成功调用
+    @OnOpen
+    public void onOpen(Session session, @PathParam(value = "device") String device) {
+        sessionPools.put(device, session);
+        addOnlineCount();
+    }
+
+    //关闭连接时调用
+    @OnClose
+    public void onClose(@PathParam(value = "device") String device) {
+        sessionPools.remove(device);
+        subOnlineCount();
+        logger.info("{}断开webSocket连接!当前人数为:{}", device, onlineNum);
+    }
+
+    //收到客户端信息后,根据接收人的username把消息推下去或者群发
+    // to=-1群发消息
+    @OnMessage
+    public void onMessage(String message) throws IOException {
+        logger.info("server get {}", message);
+    }
+
+    //错误时调用
+    @OnError
+    public void onError(Session session, Throwable throwable) {
+        logger.info("server get {}", throwable.getMessage());
+    }
+
+    public static void addOnlineCount() {
+        onlineNum.incrementAndGet();
+    }
+
+    public static void subOnlineCount() {
+        onlineNum.decrementAndGet();
+    }
+
+    public static AtomicInteger getOnlineNumber() {
+        return onlineNum;
+    }
+
+    public static ConcurrentHashMap<String, Session> getSessionPools() {
+        return sessionPools;
+    }
+}

+ 5 - 2
ruoyi-framework/pom.xml

@@ -58,7 +58,10 @@
             <groupId>com.ruoyi</groupId>
             <artifactId>ruoyi-system</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
     </dependencies>
 
-</project>
+</project>

+ 31 - 37
ruoyi-framework/src/main/java/com/ruoyi/framework/config/SecurityConfig.java

@@ -28,8 +28,7 @@ import org.springframework.web.filter.CorsFilter;
  */
 @EnableMethodSecurity(prePostEnabled = true, securedEnabled = true)
 @Configuration
-public class SecurityConfig
-{
+public class SecurityConfig {
     /**
      * 自定义用户认证逻辑
      */
@@ -71,8 +70,7 @@ public class SecurityConfig
      * 身份验证实现
      */
     @Bean
-    public AuthenticationManager authenticationManager()
-    {
+    public AuthenticationManager authenticationManager() {
         DaoAuthenticationProvider daoAuthenticationProvider = new DaoAuthenticationProvider();
         daoAuthenticationProvider.setUserDetailsService(userDetailsService);
         daoAuthenticationProvider.setPasswordEncoder(bCryptPasswordEncoder());
@@ -95,46 +93,42 @@ public class SecurityConfig
      * authenticated       |   用户登录后可访问
      */
     @Bean
-    protected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception
-    {
+    protected SecurityFilterChain filterChain(HttpSecurity httpSecurity) throws Exception {
         return httpSecurity
-            // CSRF禁用,因为不使用session
-            .csrf(csrf -> csrf.disable())
-            // 禁用HTTP响应标头
-            .headers((headersCustomizer) -> {
-                headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());
-            })
-            // 认证失败处理类
-            .exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))
-            // 基于token,所以不需要session
-            .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
-            // 注解标记允许匿名访问的url
-            .authorizeHttpRequests((requests) -> {
-                permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
-                // 对于登录login 注册register 验证码captchaImage 允许匿名访问
-                requests.antMatchers("/login", "/register", "/captchaImage").permitAll()
-                    // 静态资源,可匿名访问
-                    .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll()
-                    .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll()
-                    // 除上面外的所有请求全部需要鉴权认证
-                    .anyRequest().authenticated();
-            })
-            // 添加Logout filter
-            .logout(logout -> logout.logoutUrl("/logout").logoutSuccessHandler(logoutSuccessHandler))
-            // 添加JWT filter
-            .addFilterBefore(authenticationTokenFilter, UsernamePasswordAuthenticationFilter.class)
-            // 添加CORS filter
-            .addFilterBefore(corsFilter, JwtAuthenticationTokenFilter.class)
-            .addFilterBefore(corsFilter, LogoutFilter.class)
-            .build();
+                // CSRF禁用,因为不使用session
+                .csrf(csrf -> csrf.disable())
+                // 禁用HTTP响应标头
+                .headers((headersCustomizer) -> {
+                    headersCustomizer.cacheControl(cache -> cache.disable()).frameOptions(options -> options.sameOrigin());
+                })
+                // 认证失败处理类
+                .exceptionHandling(exception -> exception.authenticationEntryPoint(unauthorizedHandler))
+                // 基于token,所以不需要session
+                .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
+                // 注解标记允许匿名访问的url
+                .authorizeHttpRequests((requests) -> {
+                    permitAllUrl.getUrls().forEach(url -> requests.antMatchers(url).permitAll());
+                    // 对于登录login 注册register 验证码captchaImage 允许匿名访问
+                    requests.antMatchers("/login", "/register", "/captchaImage").permitAll().antMatchers("/ws/**", "/websocket/**").permitAll()
+                            // 静态资源,可匿名访问
+                            .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**").permitAll().antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**").permitAll()
+                            // 除上面外的所有请求全部需要鉴权认证
+                            .anyRequest().authenticated();
+                })
+
+                // 添加Logout filter
+                .logout(logout -> logout.logoutUrl("/logout").logoutSuccessHandler(logoutSuccessHandler))
+                // 添加JWT filter
+                .addFilterBefore(authenticationTokenFilter, UsernamePasswordAuthenticationFilter.class)
+                // 添加CORS filter
+                .addFilterBefore(corsFilter, JwtAuthenticationTokenFilter.class).addFilterBefore(corsFilter, LogoutFilter.class).build();
     }
 
     /**
      * 强散列哈希加密实现
      */
     @Bean
-    public BCryptPasswordEncoder bCryptPasswordEncoder()
-    {
+    public BCryptPasswordEncoder bCryptPasswordEncoder() {
         return new BCryptPasswordEncoder();
     }
 }

+ 18 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/websocket/WebSocketConfig.java

@@ -0,0 +1,18 @@
+package com.ruoyi.framework.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * websocket 配置
+ *
+ * @author ruoyi
+ */
+@Configuration
+public class WebSocketConfig {
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+}