package com.ruoyi.common.utils.mqtt; import com.ruoyi.common.utils.cache.CacheManager; import com.ruoyi.common.utils.cache.CacheManagerEntity; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * MQTT工具类操作 * * @author Mr.Qu * @since v1.1.0 2020-01-10 */ @Slf4j @Component public class MQTTConnect { private MqttClient mqttClient; /** * 客户端connect连接mqtt服务器 * * @param userName 用户名 * @param passWord 密码 * @param mqttCallback 回调函数 **/ public void setMqttClient(String Host,String clientId,String userName, String passWord, MqttCallback mqttCallback) throws MqttException { MqttConnectOptions options = mqttConnectOptions(Host,clientId,userName, passWord); if (mqttCallback == null) { mqttClient.setCallback(new Callback()); } else { mqttClient.setCallback(mqttCallback); } mqttClient.connect(options); String key = "mqtt"+clientId; CacheManagerEntity cacheManagerEntity = new CacheManagerEntity(mqttClient); CacheManager.putCache(key,cacheManagerEntity); } /** * MQTT连接参数设置 */ private MqttConnectOptions mqttConnectOptions(String host,String clientId,String userName, String passWord) throws MqttException { mqttClient = new MqttClient(host, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10);///默认:30 options.setAutomaticReconnect(true);//默认:false options.setCleanSession(false);//默认:true //options.setKeepAliveInterval(20);//默认:60 return options; } /** * 关闭MQTT连接 */ public void close() throws MqttException { mqttClient.disconnect(); mqttClient.close(); } /** * 向某个主题发布消息 默认qos:1 * * @param topic:发布的主题 * @param msg:发布的消息 */ public void pub(String topic, String msg) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); //mqttMessage.setQos(2); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 向某个主题发布消息 * * @param topic: 发布的主题 * @param msg: 发布的消息 * @param qos: 消息质量 Qos:0、1、2 */ public void pub(String topic, String msg, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } public void createMqttConnect(String Host,String clientId,String userName, String passWord) throws MqttException { MQTTConnect mqttConnect = new MQTTConnect(); mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback()); } /** * 订阅某一个主题 ,此方法默认的的Qos等级为:1 * * @param topic 主题 */ public void sub(String topic) throws MqttException { mqttClient.subscribe(topic); } /** * 订阅某一个主题,可携带Qos * * @param topic 所要订阅的主题 * @param qos 消息质量:0、1、2 */ public void sub(String topic, int qos) throws MqttException { mqttClient.subscribe(topic, qos); } public void pubMqttClient(String Host,String clientId,String userName, String passWord,String topic,int qos,String msg) throws MqttException { MQTTConnect mqttConnect = new MQTTConnect(); MqttClient mqttClientOld = (MqttClient) CacheManager.getCacheDataByKey("mqtt"+clientId); if(mqttClientOld != null){ mqttConnect.setMqttClient(mqttClientOld); //mqttClient = mqttClientOld; }else{ mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback()); } mqttConnect.pub(topic,msg,qos); } public MqttClient getMqttClient() { return mqttClient; } public void setMqttClient(MqttClient mqttClient) { this.mqttClient = mqttClient; } /** * main函数自己测试用 */ // public static void main(String[] args) throws MqttException { // MQTTConnect mqttConnect = new MQTTConnect(); // mqttConnect.setMqttClient("admin", "public", new Callback()); // mqttConnect.sub("com/iot/init"); // mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000)); // } }