MqttServiceImpl.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package com.ruoyi.data.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.json.JSONArray;
  4. import cn.hutool.json.JSONObject;
  5. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  6. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  7. import com.ruoyi.common.utils.StringUtils;
  8. import com.ruoyi.common.utils.mqtt.MQTTConnect;
  9. import com.ruoyi.data.domain.*;
  10. import com.ruoyi.data.domain.bo.TblMqttBo;
  11. import com.ruoyi.data.domain.bo.TblSensorRecordBo;
  12. import com.ruoyi.data.domain.vo.TblMqttVo;
  13. import com.ruoyi.data.domain.vo.TblRecordVo;
  14. import com.ruoyi.data.domain.vo.TblSensorRecordVo;
  15. import com.ruoyi.data.mapper.*;
  16. import com.ruoyi.data.service.MqttService;
  17. import lombok.RequiredArgsConstructor;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  20. import org.eclipse.paho.client.mqttv3.MqttCallback;
  21. import org.eclipse.paho.client.mqttv3.MqttMessage;
  22. import org.springframework.beans.factory.annotation.Value;
  23. import org.springframework.stereotype.Service;
  24. import sun.management.Sensor;
  25. import java.text.SimpleDateFormat;
  26. import java.util.Base64;
  27. import java.util.Date;
  28. import java.util.List;
  29. import java.util.Map;
  30. @RequiredArgsConstructor
  31. @Service
  32. @Slf4j
  33. public class MqttServiceImpl implements MqttService {
  34. private final TblMqttMapper tblMqttMapper;
  35. private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
  36. private final TblRecordMapper tblRecordMapper;
  37. private final TblSensorMapper tblSensorMapper;
  38. private final TblSensorRecordMapper tblSensorRecordMapper;
  39. @Value("${mqtt.url}")
  40. private String mqttUrl;
  41. @Value("${mqtt.clientid}")
  42. private String clientID;
  43. @Value("${mqtt.user}")
  44. private String mqttUser;
  45. @Value("${mqtt.password}")
  46. private String mqttPassword;
  47. @Value("${mqtt.topic}")
  48. private String mqttTopic;
  49. // @Value("${mqtt.url}")
  50. // private String mqttUrl;
  51. @Override
  52. public void pubMqttData(String mqttStr) {
  53. JSONObject jsonObject = new JSONObject(mqttStr);
  54. Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
  55. TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
  56. Long deviceId = tblSensor.getDeviceId();
  57. SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  58. Date date = new Date(System.currentTimeMillis());
  59. JSONObject mqttMsg = new JSONObject();
  60. mqttMsg.put("deviceSn",deviceId);
  61. mqttMsg.put("sensorSn",sensorId);
  62. mqttMsg.put("sensorType",tblSensor.getSensorType());
  63. mqttMsg.put("created_time",formatter.format(date));
  64. JSONArray sensorData = new JSONArray();
  65. JSONArray dataArry = new JSONArray(jsonObject.get("data"));
  66. for(Object obj: dataArry){
  67. JSONObject dataObj = new JSONObject(obj);
  68. dataObj.remove("gcdinfo");
  69. sensorData.add(dataObj);
  70. }
  71. mqttMsg.put("sensorData",sensorData);
  72. MqttObj mqttObj = new MqttObj();
  73. mqttObj.setEquipmentId(deviceId);
  74. List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
  75. TblRecord tblRecord = new TblRecord();
  76. tblRecord.setEquipmentId(deviceId);
  77. tblRecord.setJson(mqttStr);
  78. tblRecord.setCreateBy("admin");
  79. tblRecord.setUpdateBy("admin");
  80. tblRecord.setSensorId(sensorId);
  81. tblRecordMapper.insert(tblRecord);
  82. // TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
  83. // tblSensorRecord.setEquipmentId(deviceId);
  84. // tblSensorRecord.setJson(mqttStr);
  85. // tblSensorRecord.setCreateBy("admin");
  86. // tblSensorRecord.setUpdateBy("admin");
  87. // tblSensorRecord.setSensorId(sensorId);
  88. // updateNowRecord(tblSensorRecord);
  89. // tblSensorRecordMapper.insert(tblSensorRecord);
  90. String protocolType = "";
  91. for(MqttObj obj:mqttObjList){
  92. if(obj.getStatus() == 1) {
  93. JSONObject topicObj = obj.getTopicQos("tcp");
  94. MQTTConnect mqttConnect = new MQTTConnect();
  95. try {
  96. mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
  97. if (topicObj != null) {
  98. String topic = topicObj.get("name").toString().replace("#", "");
  99. mqttConnect.pub(topic, mqttMsg.toString(), Integer.valueOf((String) topicObj.get("qos")));
  100. } else {
  101. String topic = "sensor/modbustcp/" + deviceId;
  102. mqttConnect.pub(topic, mqttMsg.toString(), 0);
  103. }
  104. } catch (Exception e) {
  105. e.printStackTrace();
  106. }
  107. }
  108. }
  109. }
  110. @Override
  111. public void createMqttMain(TblMqttBo bo){
  112. MQTTConnect mqttConnect = new MQTTConnect();
  113. try {
  114. System.out.println(mqttUrl);
  115. mqttConnect.createMqttClient(mqttUrl,clientID,mqttUser,mqttPassword,new Callback());
  116. mqttConnect.sub(mqttTopic);
  117. }catch (Exception e){
  118. e.printStackTrace();
  119. }
  120. }
  121. public void updateNowRecord(TblSensorRecordBo bo){
  122. TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo();
  123. tblSensorRecordBo.setSensorId(bo.getSensorId());
  124. LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
  125. List<TblSensorRecordVo> tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw);
  126. if(tblSensorRecordVoList.size() > 0 ){
  127. bo.setId(tblSensorRecordVoList.get(0).getId());
  128. TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
  129. validEntityBeforeSave(update);
  130. tblSensorRecordMapper.updateById(update);
  131. }else{
  132. TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
  133. validEntityBeforeSave(add);
  134. tblSensorRecordMapper.insert(add);
  135. }
  136. }
  137. private void validEntityBeforeSave(TblSensorRecord entity){
  138. //TODO 做一些数据校验,如唯一约束
  139. }
  140. @Override
  141. public void createMqtt(TblMqttBo bo){
  142. LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
  143. List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
  144. for(TblMqttVo mqttVo:mqttVoList){
  145. MQTTConnect mqttConnect = new MQTTConnect();
  146. try {
  147. mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
  148. }catch (Exception e){
  149. e.printStackTrace();
  150. }
  151. }
  152. }
  153. @Override
  154. public void pubOrder(OrderBean orderBean){
  155. JSONObject jsonObject = new JSONObject();
  156. jsonObject.put("deviceId",orderBean.getDeviceId());
  157. jsonObject.put("add",orderBean.getAdd());
  158. jsonObject.put("value",orderBean.getValue());
  159. jsonObject.put("addrOffset",orderBean.getAddrOffset());
  160. jsonObject.put("len",orderBean.getValue().split(",").length);
  161. MQTTConnect mqttConnect = new MQTTConnect();
  162. try {
  163. mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
  164. mqttConnect.pub("control",jsonObject.toString(),0);
  165. }catch (Exception e){
  166. e.printStackTrace();
  167. }
  168. }
  169. private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
  170. Map<String, Object> params = bo.getParams();
  171. LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
  172. lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
  173. lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
  174. lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
  175. lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
  176. lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
  177. lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
  178. lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
  179. lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
  180. // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
  181. return lqw;
  182. }
  183. class Callback implements MqttCallback {
  184. /**
  185. * MQTT 断开连接会执行此方法
  186. */
  187. @Override
  188. public void connectionLost(Throwable throwable) {
  189. log.info("断开了MQTT连接111 :{}", throwable.getMessage());
  190. log.error(throwable.getMessage(), throwable);
  191. }
  192. /**
  193. * publish发布成功后会执行到这里
  194. */
  195. @Override
  196. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  197. log.info("发布消息成功");
  198. }
  199. /**
  200. * subscribe订阅后得到的消息会执行到这里
  201. */
  202. @Override
  203. public void messageArrived(String topic, MqttMessage message) throws Exception {
  204. // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
  205. String payload = String.valueOf(message.getPayload());
  206. // String msg = Byte.toString(message.getPayload());
  207. byte[] bytes = message.getPayload();
  208. String encoded = Base64.getEncoder().encodeToString(bytes);
  209. byte[] decoded = Base64.getDecoder().decode(encoded);
  210. String msg =new String(decoded);
  211. System.out.println(msg);
  212. log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
  213. MqttServiceImpl.this.pubMqttData(msg);
  214. }
  215. }
  216. private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
  217. Map<String, Object> params = bo.getParams();
  218. LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
  219. lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
  220. lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
  221. lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
  222. return lqw;
  223. }
  224. }