| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- package com.ruoyi.common.utils.mqtt;
- import com.ruoyi.common.utils.cache.CacheManager;
- import com.ruoyi.common.utils.cache.CacheManagerEntity;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- /**
- * MQTT工具类操作
- *
- * @author Mr.Qu
- * @since v1.1.0 2020-01-10
- */
- @Slf4j
- @Component
- public class MQTTConnect {
- private MqttClient mqttClient;
- /**
- * 客户端connect连接mqtt服务器
- *
- * @param userName 用户名
- * @param passWord 密码
- * @param mqttCallback 回调函数
- **/
- public void setMqttClient(String Host,String clientId,String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
- MqttConnectOptions options = mqttConnectOptions(Host,clientId,userName, passWord);
- if (mqttCallback == null) {
- mqttClient.setCallback(new Callback());
- } else {
- mqttClient.setCallback(mqttCallback);
- }
- mqttClient.connect(options);
- String key = "mqtt"+clientId;
- CacheManagerEntity cacheManagerEntity = new CacheManagerEntity(mqttClient);
- CacheManager.putCache(key,cacheManagerEntity);
- }
- /**
- * MQTT连接参数设置
- */
- private MqttConnectOptions mqttConnectOptions(String host,String clientId,String userName, String passWord) throws MqttException {
- mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(userName);
- options.setPassword(passWord.toCharArray());
- options.setConnectionTimeout(10);///默认:30
- options.setAutomaticReconnect(true);//默认:false
- options.setCleanSession(false);//默认:true
- //options.setKeepAliveInterval(20);//默认:60
- return options;
- }
- /**
- * 关闭MQTT连接
- */
- public void close() throws MqttException {
- mqttClient.disconnect();
- mqttClient.close();
- }
- /**
- * 向某个主题发布消息 默认qos:1
- *
- * @param topic:发布的主题
- * @param msg:发布的消息
- */
- public void pub(String topic, String msg) throws MqttException {
- MqttMessage mqttMessage = new MqttMessage();
- //mqttMessage.setQos(2);
- mqttMessage.setPayload(msg.getBytes());
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
- MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
- token.waitForCompletion();
- }
- /**
- * 向某个主题发布消息
- *
- * @param topic: 发布的主题
- * @param msg: 发布的消息
- * @param qos: 消息质量 Qos:0、1、2
- */
- public void pub(String topic, String msg, int qos) throws MqttException {
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setQos(qos);
- mqttMessage.setPayload(msg.getBytes());
- MqttTopic mqttTopic = mqttClient.getTopic(topic);
- MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
- token.waitForCompletion();
- }
- public void createMqttConnect(String Host,String clientId,String userName, String passWord) throws MqttException {
- MQTTConnect mqttConnect = new MQTTConnect();
- mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
- }
- /**
- * 订阅某一个主题 ,此方法默认的的Qos等级为:1
- *
- * @param topic 主题
- */
- public void sub(String topic) throws MqttException {
- mqttClient.subscribe(topic);
- }
- /**
- * 订阅某一个主题,可携带Qos
- *
- * @param topic 所要订阅的主题
- * @param qos 消息质量:0、1、2
- */
- public void sub(String topic, int qos) throws MqttException {
- mqttClient.subscribe(topic, qos);
- }
- public void pubMqttClient(String Host,String clientId,String userName, String passWord,String topic,int qos,String msg) throws MqttException {
- MQTTConnect mqttConnect = new MQTTConnect();
- MqttClient mqttClientOld = (MqttClient) CacheManager.getCacheDataByKey("mqtt"+clientId);
- if(mqttClientOld != null){
- mqttConnect.setMqttClient(mqttClientOld);
- //mqttClient = mqttClientOld;
- }else{
- mqttConnect.setMqttClient(Host, clientId, userName,passWord,new Callback());
- }
- mqttConnect.pub(topic,msg,qos);
- }
- public MqttClient getMqttClient() {
- return mqttClient;
- }
- public void setMqttClient(MqttClient mqttClient) {
- this.mqttClient = mqttClient;
- }
- /**
- * main函数自己测试用
- */
- // public static void main(String[] args) throws MqttException {
- // MQTTConnect mqttConnect = new MQTTConnect();
- // mqttConnect.setMqttClient("admin", "public", new Callback());
- // mqttConnect.sub("com/iot/init");
- // mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
- // }
- }
|