QueueUtils.java 7.4 KB


  1. package com.ruoyi.common.utils.redis;
  2. import com.ruoyi.common.utils.spring.SpringUtils;
  3. import lombok.AccessLevel;
  4. import lombok.NoArgsConstructor;
  5. import org.redisson.api.*;
  6. import java.util.concurrent.TimeUnit;
  7. import java.util.function.Consumer;
  8. /**
  9. * 分布式队列工具
  10. * 轻量级队列 重量级数据量 请使用 MQ
  11. * 要求 redis 5.X 以上
  12. *
  13. * @author Lion Li
  14. * @version 3.6.0 新增
  15. */
  16. @NoArgsConstructor(access = AccessLevel.PRIVATE)
  17. public class QueueUtils {
  18. private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
  19. /**
  20. * 获取客户端实例
  21. */
  22. public static RedissonClient getClient() {
  23. return CLIENT;
  24. }
  25. /**
  26. * 添加普通队列数据
  27. *
  28. * @param queueName 队列名
  29. * @param data 数据
  30. */
  31. public static <T> boolean addQueueObject(String queueName, T data) {
  32. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  33. return queue.offer(data);
  34. }
  35. /**
  36. * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
  37. *
  38. * @param queueName 队列名
  39. */
  40. public static <T> T getQueueObject(String queueName) {
  41. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  42. return queue.poll();
  43. }
  44. /**
  45. * 通用删除队列数据(不支持延迟队列)
  46. */
  47. public static <T> boolean removeQueueObject(String queueName, T data) {
  48. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  49. return queue.remove(data);
  50. }
  51. /**
  52. * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
  53. */
  54. public static <T> boolean destroyQueue(String queueName) {
  55. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  56. return queue.delete();
  57. }
  58. /**
  59. * 添加延迟队列数据 默认毫秒
  60. *
  61. * @param queueName 队列名
  62. * @param data 数据
  63. * @param time 延迟时间
  64. */
  65. public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
  66. addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
  67. }
  68. /**
  69. * 添加延迟队列数据
  70. *
  71. * @param queueName 队列名
  72. * @param data 数据
  73. * @param time 延迟时间
  74. * @param timeUnit 单位
  75. */
  76. public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
  77. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  78. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  79. delayedQueue.offer(data, time, timeUnit);
  80. }
  81. /**
  82. * 获取一个延迟队列数据 没有数据返回 null
  83. *
  84. * @param queueName 队列名
  85. */
  86. public static <T> T getDelayedQueueObject(String queueName) {
  87. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  88. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  89. return delayedQueue.poll();
  90. }
  91. /**
  92. * 删除延迟队列数据
  93. */
  94. public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
  95. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  96. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  97. return delayedQueue.remove(data);
  98. }
  99. /**
  100. * 销毁延迟队列 所有阻塞监听 报错
  101. */
  102. public static <T> void destroyDelayedQueue(String queueName) {
  103. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  104. RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
  105. delayedQueue.destroy();
  106. }
  107. /**
  108. * 添加优先队列数据
  109. *
  110. * @param queueName 队列名
  111. * @param data 数据
  112. */
  113. public static <T> boolean addPriorityQueueObject(String queueName, T data) {
  114. RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
  115. return priorityBlockingQueue.offer(data);
  116. }
  117. /**
  118. * 优先队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
  119. *
  120. * @param queueName 队列名
  121. */
  122. public static <T> T getPriorityQueueObject(String queueName) {
  123. RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
  124. return queue.poll();
  125. }
  126. /**
  127. * 优先队列删除队列数据(不支持延迟队列)
  128. */
  129. public static <T> boolean removePriorityQueueObject(String queueName, T data) {
  130. RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
  131. return queue.remove(data);
  132. }
  133. /**
  134. * 优先队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
  135. */
  136. public static <T> boolean destroyPriorityQueue(String queueName) {
  137. RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
  138. return queue.delete();
  139. }
  140. /**
  141. * 尝试设置 有界队列 容量 用于限制数量
  142. *
  143. * @param queueName 队列名
  144. * @param capacity 容量
  145. */
  146. public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
  147. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  148. return boundedBlockingQueue.trySetCapacity(capacity);
  149. }
  150. /**
  151. * 尝试设置 有界队列 容量 用于限制数量
  152. *
  153. * @param queueName 队列名
  154. * @param capacity 容量
  155. * @param destroy 已存在是否销毁
  156. */
  157. public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
  158. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  159. if (boundedBlockingQueue.isExists() && destroy) {
  160. destroyQueue(queueName);
  161. }
  162. return boundedBlockingQueue.trySetCapacity(capacity);
  163. }
  164. /**
  165. * 添加有界队列数据
  166. *
  167. * @param queueName 队列名
  168. * @param data 数据
  169. * @return 添加成功 true 已达到界限 false
  170. */
  171. public static <T> boolean addBoundedQueueObject(String queueName, T data) {
  172. RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
  173. return boundedBlockingQueue.offer(data);
  174. }
  175. /**
  176. * 有界队列获取一个队列数据 没有数据返回 null(不支持延迟队列)
  177. *
  178. * @param queueName 队列名
  179. */
  180. public static <T> T getBoundedQueueObject(String queueName) {
  181. RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
  182. return queue.poll();
  183. }
  184. /**
  185. * 有界队列删除队列数据(不支持延迟队列)
  186. */
  187. public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
  188. RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
  189. return queue.remove(data);
  190. }
  191. /**
  192. * 有界队列销毁队列 所有阻塞监听 报错(不支持延迟队列)
  193. */
  194. public static <T> boolean destroyBoundedQueue(String queueName) {
  195. RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
  196. return queue.delete();
  197. }
  198. /**
  199. * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等)
  200. */
  201. public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
  202. RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
  203. queue.subscribeOnElements(consumer);
  204. }
  205. }