UWBVideoTraceSocketServer.java 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package com.ruoyi.bd.socket;
  2. import java.io.IOException;
  3. import java.util.concurrent.ConcurrentHashMap;
  4. import java.util.concurrent.atomic.AtomicInteger;
  5. import javax.annotation.PostConstruct;
  6. import javax.websocket.*;
  7. import javax.websocket.server.PathParam;
  8. import javax.websocket.server.ServerEndpoint;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.stereotype.Component;
  12. import org.springframework.web.bind.annotation.CrossOrigin;
  13. @ServerEndpoint("/ws/video/trace/{client}/{device}")
  14. @CrossOrigin(origins = "*")
  15. @Component
  16. public class UWBVideoTraceSocketServer {
  17. private static final Logger logger = LoggerFactory.getLogger(UWBVideoTraceSocketServer.class);
  18. // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  19. private static AtomicInteger onlineNum = new AtomicInteger();
  20. // concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
  21. private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
  22. public static final String SESSION_KEY_TEMPLATE = "%s-%s";
  23. @PostConstruct
  24. public void init() {
  25. logger.info("WebSocketServer init");
  26. }
  27. // 发送消息
  28. public void sendMessage(Session session, String message) throws IOException {
  29. if (session != null) {
  30. synchronized (session) {
  31. logger.info("发送数据:{}", message);
  32. session.getBasicRemote().sendText(message);
  33. }
  34. }
  35. }
  36. // 群发消息
  37. public void broadcast(String message) {
  38. for (Session session : sessionPools.values()) {
  39. try {
  40. sendMessage(session, message);
  41. }
  42. catch (Exception e) {
  43. logger.info("server get {}", e.getMessage());
  44. }
  45. }
  46. }
  47. // 建立连接成功调用
  48. @OnOpen
  49. public void onOpen(Session session, @PathParam(value = "client") String client,
  50. @PathParam(value = "device") String device) {
  51. sessionPools.put(String.format(SESSION_KEY_TEMPLATE, client, device), session);
  52. addOnlineCount();
  53. }
  54. // 关闭连接时调用
  55. @OnClose
  56. public void onClose( @PathParam(value = "client") String client,
  57. @PathParam(value = "device") String device) {
  58. sessionPools.remove(String.format(SESSION_KEY_TEMPLATE, client, device));
  59. subOnlineCount();
  60. logger.info("{}断开webSocket连接!当前人数为:{}", device, onlineNum);
  61. }
  62. // 收到客户端信息后,根据接收人的username把消息推下去或者群发
  63. // to=-1群发消息
  64. @OnMessage
  65. public void onMessage(String message) throws IOException {
  66. logger.info("server get {}", message);
  67. }
  68. // 错误时调用
  69. @OnError
  70. public void onError(Session session, Throwable throwable) {
  71. logger.info("server get {}", throwable.getMessage());
  72. }
  73. public static void addOnlineCount() {
  74. onlineNum.incrementAndGet();
  75. }
  76. public static void subOnlineCount() {
  77. onlineNum.decrementAndGet();
  78. }
  79. public static AtomicInteger getOnlineNumber() {
  80. return onlineNum;
  81. }
  82. public static ConcurrentHashMap<String, Session> getSessionPools() {
  83. return sessionPools;
  84. }
  85. }