QueueUtils.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package com.ruoyi.common.utils.redis;
  2. import com.ruoyi.common.utils.spring.SpringUtils;
  3. import lombok.AccessLevel;
  4. import lombok.NoArgsConstructor;
  5. import org.redisson.api.*;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.function.Consumer;
  8. /**
  9. * 分布式队列工具
  10. * 轻量级队列 重量级数据量 请使用 MQ
  11. * 要求 redis 5.X 以上
  12. *
  13. * @author Lion Li
  14. * @version 3.6.0 新增
  15. */
  16. @NoArgsConstructor(access = AccessLevel.PRIVATE)
  17. public class QueueUtils {
  18. private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
  19. /**
  20. * 获取客户端实例
  21. */
  22. public static RedissonClient getClient() {
  23. return CLIENT;
  24. }
  25. /**
  26. * 添加普通队列数据
  27. *
  28. * @param queueName 队列名
  29. * @param data 数据
  30. */
  31. public static <T> boolean addQueueObject(String queueName, T data) {
  32. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  33. return queue.offer(data);
  34. }
  35. /**
  36. * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
  37. *
  38. * @param queueName 队列名
  39. */
  40. public static <T> T getQueueObject(String queueName) {
  41. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  42. return queue.poll();
  43. }
  44. /**
  45. * 通用删除队列数据(不支持延迟队列)
  46. */
  47. public static <T> boolean removeQueueObject(String queueName, T data) {
  48. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  49. return queue.remove(data);
  50. }
  51. /**
  52. * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
  53. */
  54. public static <T> boolean destroyQueue(String queueName) {
  55. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  56. return queue.delete();
  57. }
  58. /**
  59. * 添加延迟队列数据 默认毫秒
  60. *
  61. * @param queueName 队列名
  62. * @param data 数据
  63. * @param time 延迟时间
  64. */
  65. public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
  66. addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
  67. }
  68. /**
  69. * 添加延迟队列数据
  70. *
  71. * @param queueName 队列名
  72. * @param data 数据
  73. * @param time 延迟时间
  74. * @param timeUnit 单位
  75. */
  76. public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
  77. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  78. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  79. delayedQueue.offer(data, time, timeUnit);
  80. }
  81. /**
  82. * 获取一个延迟队列数据 没有数据返回 null
  83. *
  84. * @param queueName 队列名
  85. */
  86. public static <T> T getDelayedQueueObject(String queueName) {
  87. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  88. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  89. return delayedQueue.poll();
  90. }
  91. /**
  92. * 删除延迟队列数据
  93. */
  94. public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
  95. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  96. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  97. return delayedQueue.remove(data);
  98. }
  99. /**
  100. * 销毁延迟队列 所有阻塞监听 报错
  101. */
  102. public static <T> void destroyDelayedQueue(String queueName) {
  103. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  104. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  105. delayedQueue.destroy();
  106. }
  107. /**
  108. * 添加优先队列数据
  109. *
  110. * @param queueName 队列名
  111. * @param data 数据
  112. */
  113. public static <T> boolean addPriorityQueueObject(String queueName, T data) {
  114. RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
  115. return priorityBlockingQueue.offer(data);
  116. }
  117. /**
  118. * 尝试设置 有界队列 容量 用于限制数量
  119. *
  120. * @param queueName 队列名
  121. * @param capacity 容量
  122. */
  123. public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
  124. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  125. return boundedBlockingQueue.trySetCapacity(capacity);
  126. }
  127. /**
  128. * 尝试设置 有界队列 容量 用于限制数量
  129. *
  130. * @param queueName 队列名
  131. * @param capacity 容量
  132. * @param destroy 已存在是否销毁
  133. */
  134. public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
  135. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  136. if (boundedBlockingQueue.isExists() && destroy) {
  137. destroyQueue(queueName);
  138. }
  139. return boundedBlockingQueue.trySetCapacity(capacity);
  140. }
  141. /**
  142. * 添加有界队列数据
  143. *
  144. * @param queueName 队列名
  145. * @param data 数据
  146. * @return 添加成功 true 已达到界限 false
  147. */
  148. public static <T> boolean addBoundedQueueObject(String queueName, T data) {
  149. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  150. return boundedBlockingQueue.offer(data);
  151. }
  152. /**
  153. * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
  154. */
  155. public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
  156. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  157. queue.subscribeOnElements(consumer);
  158. }
  159. }