MqttServiceImpl.java 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package com.ruoyi.data.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.json.JSONObject;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  6. import com.ruoyi.common.utils.StringUtils;
  7. import com.ruoyi.common.utils.mqtt.MQTTConnect;
  8. import com.ruoyi.data.domain.*;
  9. import com.ruoyi.data.domain.bo.TblMqttBo;
  10. import com.ruoyi.data.domain.bo.TblSensorRecordBo;
  11. import com.ruoyi.data.domain.vo.TblMqttVo;
  12. import com.ruoyi.data.domain.vo.TblRecordVo;
  13. import com.ruoyi.data.domain.vo.TblSensorRecordVo;
  14. import com.ruoyi.data.mapper.*;
  15. import com.ruoyi.data.service.MqttService;
  16. import lombok.RequiredArgsConstructor;
  17. import lombok.extern.slf4j.Slf4j;
  18. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  19. import org.eclipse.paho.client.mqttv3.MqttCallback;
  20. import org.eclipse.paho.client.mqttv3.MqttMessage;
  21. import org.springframework.beans.factory.annotation.Value;
  22. import org.springframework.stereotype.Service;
  23. import sun.management.Sensor;
  24. import java.text.SimpleDateFormat;
  25. import java.util.Base64;
  26. import java.util.Date;
  27. import java.util.List;
  28. import java.util.Map;
  29. @RequiredArgsConstructor
  30. @Service
  31. @Slf4j
  32. public class MqttServiceImpl implements MqttService {
  33. private final TblMqttMapper tblMqttMapper;
  34. private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
  35. private final TblRecordMapper tblRecordMapper;
  36. private final TblSensorMapper tblSensorMapper;
  37. private final TblSensorRecordMapper tblSensorRecordMapper;
  38. @Value("${mqtt.url}")
  39. private String mqttUrl;
  40. @Value("${mqtt.clientid}")
  41. private String clientID;
  42. @Value("${mqtt.user}")
  43. private String mqttUser;
  44. @Value("${mqtt.password}")
  45. private String mqttPassword;
  46. @Value("${mqtt.topic}")
  47. private String mqttTopic;
  48. // @Value("${mqtt.url}")
  49. // private String mqttUrl;
  50. @Override
  51. public void pubMqttData(String mqttStr) {
  52. JSONObject jsonObject = new JSONObject(mqttStr);
  53. Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
  54. TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
  55. Long deviceId = tblSensor.getDeviceId();
  56. SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  57. Date date = new Date(System.currentTimeMillis());
  58. jsonObject.put("created_time",formatter.format(date));
  59. MqttObj mqttObj = new MqttObj();
  60. mqttObj.setEquipmentId(deviceId);
  61. List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
  62. TblRecord tblRecord = new TblRecord();
  63. tblRecord.setEquipmentId(deviceId);
  64. tblRecord.setSensorId(sensorId);
  65. tblRecord.setJson(mqttStr);
  66. tblRecord.setCreateBy("admin");
  67. tblRecord.setUpdateBy("admin");
  68. tblRecord.setSensorId(sensorId);
  69. tblRecordMapper.insert(tblRecord);
  70. TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
  71. tblSensorRecord.setEquipmentId(deviceId);
  72. tblSensorRecord.setJson(mqttStr);
  73. tblSensorRecord.setCreateBy("admin");
  74. tblSensorRecord.setUpdateBy("admin");
  75. tblSensorRecord.setSensorId(sensorId);
  76. updateNowRecord(tblSensorRecord);
  77. // tblSensorRecordMapper.insert(tblSensorRecord);
  78. for(MqttObj obj:mqttObjList){
  79. if(obj.getStatus() == 1) {
  80. JSONObject topicObj = obj.getTopicQos("tcp");
  81. MQTTConnect mqttConnect = new MQTTConnect();
  82. try {
  83. mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
  84. if (topicObj != null) {
  85. String topic = topicObj.get("name").toString().replace("#", "");
  86. mqttConnect.pub(topic, jsonObject.toString(), Integer.valueOf((String) topicObj.get("qos")));
  87. } else {
  88. String topic = "sensor/modbustcp/" + deviceId;
  89. mqttConnect.pub(topic, jsonObject.toString(), 0);
  90. }
  91. } catch (Exception e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. }
  96. }
  97. @Override
  98. public void createMqttMain(TblMqttBo bo){
  99. MQTTConnect mqttConnect = new MQTTConnect();
  100. try {
  101. System.out.println(mqttUrl);
  102. mqttConnect.createMqttClient(mqttUrl,clientID,mqttUser,mqttPassword,new Callback());
  103. mqttConnect.sub(mqttTopic);
  104. }catch (Exception e){
  105. e.printStackTrace();
  106. }
  107. }
  108. public void updateNowRecord(TblSensorRecordBo bo){
  109. TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo();
  110. tblSensorRecordBo.setSensorId(bo.getSensorId());
  111. LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
  112. List<TblSensorRecordVo> tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw);
  113. if(tblSensorRecordVoList.size() > 0 ){
  114. bo.setId(tblSensorRecordVoList.get(0).getId());
  115. TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
  116. validEntityBeforeSave(update);
  117. tblSensorRecordMapper.updateById(update);
  118. }else{
  119. TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
  120. validEntityBeforeSave(add);
  121. tblSensorRecordMapper.insert(add);
  122. }
  123. }
  124. private void validEntityBeforeSave(TblSensorRecord entity){
  125. //TODO 做一些数据校验,如唯一约束
  126. }
  127. @Override
  128. public void createMqtt(TblMqttBo bo){
  129. LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
  130. List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
  131. for(TblMqttVo mqttVo:mqttVoList){
  132. MQTTConnect mqttConnect = new MQTTConnect();
  133. try {
  134. mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
  135. }catch (Exception e){
  136. e.printStackTrace();
  137. }
  138. }
  139. }
  140. @Override
  141. public void pubOrder(OrderBean orderBean){
  142. JSONObject jsonObject = new JSONObject();
  143. jsonObject.put("deviceId",orderBean.getDeviceId());
  144. jsonObject.put("add",orderBean.getAdd());
  145. jsonObject.put("value",orderBean.getValue());
  146. jsonObject.put("addrOffset",orderBean.getAddrOffset());
  147. jsonObject.put("len",orderBean.getValue().split(",").length);
  148. MQTTConnect mqttConnect = new MQTTConnect();
  149. try {
  150. mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
  151. mqttConnect.pub("control",jsonObject.toString(),0);
  152. }catch (Exception e){
  153. e.printStackTrace();
  154. }
  155. }
  156. private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
  157. Map<String, Object> params = bo.getParams();
  158. LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
  159. lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
  160. lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
  161. lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
  162. lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
  163. lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
  164. lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
  165. lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
  166. lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
  167. // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
  168. return lqw;
  169. }
  170. class Callback implements MqttCallback {
  171. /**
  172. * MQTT 断开连接会执行此方法
  173. */
  174. @Override
  175. public void connectionLost(Throwable throwable) {
  176. log.info("断开了MQTT连接111 :{}", throwable.getMessage());
  177. log.error(throwable.getMessage(), throwable);
  178. }
  179. /**
  180. * publish发布成功后会执行到这里
  181. */
  182. @Override
  183. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  184. log.info("发布消息成功");
  185. }
  186. /**
  187. * subscribe订阅后得到的消息会执行到这里
  188. */
  189. @Override
  190. public void messageArrived(String topic, MqttMessage message) throws Exception {
  191. // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
  192. String payload = String.valueOf(message.getPayload());
  193. // String msg = Byte.toString(message.getPayload());
  194. byte[] bytes = message.getPayload();
  195. String encoded = Base64.getEncoder().encodeToString(bytes);
  196. byte[] decoded = Base64.getDecoder().decode(encoded);
  197. String msg =new String(decoded);
  198. System.out.println(msg);
  199. log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
  200. MqttServiceImpl.this.pubMqttData(msg);
  201. }
  202. }
  203. private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
  204. Map<String, Object> params = bo.getParams();
  205. LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
  206. lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
  207. lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
  208. lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
  209. return lqw;
  210. }
  211. }