MQTTConnect.java 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.utils.cache.CacheManager;
import com.ruoyi.common.utils.cache.CacheManagerEntity;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
  9. /**
  10. * MQTT工具类操作
  11. *
  12. * @author Mr.Qu
  13. * @since v1.1.0 2020-01-10
  14. */
  15. @Slf4j
  16. @Component
  17. public class MQTTConnect {
  18. private MqttClient mqttClient;
  19. /**
  20. * 客户端connect连接mqtt服务器
  21. *
  22. * @param userName 用户名
  23. * @param passWord 密码
  24. * @param mqttCallback 回调函数
  25. **/
  26. public void setMqttClient(String Host,String clientId,String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
  27. MqttConnectOptions options = mqttConnectOptions(Host,clientId,userName, passWord);
  28. if (mqttCallback == null) {
  29. mqttClient.setCallback(new Callback());
  30. } else {
  31. mqttClient.setCallback(mqttCallback);
  32. }
  33. mqttClient.connect(options);
  34. String key = "mqtt"+clientId;
  35. CacheManagerEntity cacheManagerEntity = new CacheManagerEntity(mqttClient);
  36. CacheManager.putCache(key,cacheManagerEntity);
  37. }
  38. /**
  39. * MQTT连接参数设置
  40. */
  41. private MqttConnectOptions mqttConnectOptions(String host,String clientId,String userName, String passWord) throws MqttException {
  42. mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
  43. MqttConnectOptions options = new MqttConnectOptions();
  44. options.setUserName(userName);
  45. options.setPassword(passWord.toCharArray());
  46. options.setConnectionTimeout(10);///默认:30
  47. options.setAutomaticReconnect(true);//默认:false
  48. options.setCleanSession(false);//默认:true
  49. //options.setKeepAliveInterval(20);//默认:60
  50. return options;
  51. }
  52. /**
  53. * 关闭MQTT连接
  54. */
  55. public void close() throws MqttException {
  56. mqttClient.disconnect();
  57. mqttClient.close();
  58. }
  59. /**
  60. * 向某个主题发布消息 默认qos:1
  61. *
  62. * @param topic:发布的主题
  63. * @param msg:发布的消息
  64. */
  65. public void pub(String topic, String msg) throws MqttException {
  66. MqttMessage mqttMessage = new MqttMessage();
  67. //mqttMessage.setQos(2);
  68. mqttMessage.setPayload(msg.getBytes());
  69. MqttTopic mqttTopic = mqttClient.getTopic(topic);
  70. MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
  71. token.waitForCompletion();
  72. }
  73. /**
  74. * 向某个主题发布消息
  75. *
  76. * @param topic: 发布的主题
  77. * @param msg: 发布的消息
  78. * @param qos: 消息质量 Qos:0、1、2
  79. */
  80. public void pub(String topic, String msg, int qos) throws MqttException {
  81. MqttMessage mqttMessage = new MqttMessage();
  82. mqttMessage.setQos(qos);
  83. mqttMessage.setPayload(msg.getBytes());
  84. MqttTopic mqttTopic = mqttClient.getTopic(topic);
  85. MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
  86. token.waitForCompletion();
  87. }
  88. public void createMqttConnect(String Host,String clientId,String userName, String passWord) throws MqttException {
  89. MQTTConnect mqttConnect = new MQTTConnect();
  90. mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
  91. }
  92. /**
  93. * 订阅某一个主题 ,此方法默认的的Qos等级为:1
  94. *
  95. * @param topic 主题
  96. */
  97. public void sub(String topic) throws MqttException {
  98. mqttClient.subscribe(topic);
  99. }
  100. /**
  101. * 订阅某一个主题,可携带Qos
  102. *
  103. * @param topic 所要订阅的主题
  104. * @param qos 消息质量:0、1、2
  105. */
  106. public void sub(String topic, int qos) throws MqttException {
  107. mqttClient.subscribe(topic, qos);
  108. }
  109. public void pubMqttClient(String Host,String clientId,String userName, String passWord,String topic,int qos,String msg) throws MqttException {
  110. MQTTConnect mqttConnect = new MQTTConnect();
  111. MqttClient mqttClientOld = (MqttClient) CacheManager.getCacheDataByKey("mqtt"+clientId);
  112. if(mqttClientOld != null){
  113. mqttConnect.setMqttClient(mqttClientOld);
  114. //mqttClient = mqttClientOld;
  115. }else{
  116. mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
  117. }
  118. mqttConnect.pub(topic,msg,qos);
  119. }
  120. public MqttClient getMqttClient() {
  121. return mqttClient;
  122. }
  123. public void setMqttClient(MqttClient mqttClient) {
  124. this.mqttClient = mqttClient;
  125. }
  126. /**
  127. * main函数自己测试用
  128. */
  129. // public static void main(String[] args) throws MqttException {
  130. // MQTTConnect mqttConnect = new MQTTConnect();
  131. // mqttConnect.setMqttClient("admin", "public", new Callback());
  132. // mqttConnect.sub("com/iot/init");
  133. // mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
  134. // }
  135. }