MqttServiceImpl.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package com.ruoyi.data.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.util.StrUtil;
  4. import cn.hutool.json.JSONArray;
  5. import cn.hutool.core.lang.UUID;
  6. import cn.hutool.json.JSONObject;
  7. import cn.hutool.json.JSONUtil;
  8. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  9. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  10. import com.ruoyi.common.core.domain.entity.SysDictData;
  11. import com.ruoyi.common.utils.StringUtils;
  12. import com.ruoyi.common.utils.mqtt.MQTTConnect;
  13. import com.ruoyi.common.utils.redis.CacheUtils;
  14. import com.ruoyi.data.domain.*;
  15. import com.ruoyi.data.domain.bo.TblMqttBo;
  16. import com.ruoyi.data.domain.bo.TblSensorRecordBo;
  17. import com.ruoyi.data.domain.vo.TblMqttVo;
  18. import com.ruoyi.data.domain.vo.TblRecordVo;
  19. import com.ruoyi.data.domain.vo.TblSensorRecordVo;
  20. import com.ruoyi.data.mapper.*;
  21. import com.ruoyi.data.service.MqttService;
  22. import com.ruoyi.data.service.WebsocketService;
  23. import com.ruoyi.system.service.ISysDictTypeService;
  24. import lombok.RequiredArgsConstructor;
  25. import lombok.extern.slf4j.Slf4j;
  26. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  27. import org.eclipse.paho.client.mqttv3.MqttCallback;
  28. import org.eclipse.paho.client.mqttv3.MqttException;
  29. import org.eclipse.paho.client.mqttv3.MqttMessage;
  30. import org.springframework.beans.factory.annotation.Value;
  31. import org.springframework.stereotype.Service;
  32. import sun.management.Sensor;
  33. import javax.annotation.PostConstruct;
  34. import java.text.SimpleDateFormat;
  35. import java.util.Base64;
  36. import java.util.Date;
  37. import java.util.List;
  38. import java.util.Map;
  39. @RequiredArgsConstructor
  40. @Service
  41. @Slf4j
  42. public class MqttServiceImpl implements MqttService {
  43. private final TblMqttMapper tblMqttMapper;
  44. private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
  45. private final TblRecordMapper tblRecordMapper;
  46. private final TblSensorMapper tblSensorMapper;
  47. private final TblSensorRecordMapper tblSensorRecordMapper;
  48. private final ISysDictTypeService sysDictTypeService;
  49. private final WebsocketService websocketService;
  50. @Value("${mqtt.url}")
  51. private String mqttUrl;
  52. @Value("${mqtt.clientid}")
  53. private String clientID;
  54. @Value("${mqtt.user}")
  55. private String mqttUser;
  56. @Value("${mqtt.password}")
  57. private String mqttPassword;
  58. @Value("${mqtt.topic}")
  59. private String mqttTopic;
  60. @Value("${mqtt.saveAndForward}")
  61. private Boolean saveAndForward;
  62. @Override
  63. public void pubMqttData(String mqttStr) {
  64. websocketService.sendMessageAll(mqttStr);
  65. if(saveAndForward) {
  66. JSONObject jsonObject = new JSONObject(mqttStr);
  67. Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
  68. TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
  69. Long deviceId = tblSensor.getDeviceId();
  70. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  71. Date date = new Date(System.currentTimeMillis());
  72. JSONObject mqttMsg = new JSONObject();
  73. mqttMsg.put("deviceSn", deviceId);
  74. mqttMsg.put("sensorSn", sensorId);
  75. mqttMsg.put("sensorType", tblSensor.getSensorType());
  76. mqttMsg.put("created_time", formatter.format(date));
  77. JSONArray sensorData = new JSONArray();
  78. JSONArray dataArry = new JSONArray(jsonObject.get("data"));
  79. for (Object obj : dataArry) {
  80. JSONObject dataObj = new JSONObject(obj);
  81. dataObj.remove("gcdinfo");
  82. sensorData.add(dataObj);
  83. }
  84. mqttMsg.put("sensorData", sensorData);
  85. MqttObj mqttObj = new MqttObj();
  86. mqttObj.setEquipmentId(deviceId);
  87. List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
  88. TblRecord tblRecord = new TblRecord();
  89. tblRecord.setEquipmentId(deviceId);
  90. tblRecord.setJson(mqttStr);
  91. tblRecord.setCreateBy("admin");
  92. tblRecord.setUpdateBy("admin");
  93. tblRecord.setSensorId(sensorId);
  94. tblRecordMapper.insert(tblRecord);
  95. // TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
  96. // tblSensorRecord.setEquipmentId(deviceId);
  97. // tblSensorRecord.setJson(mqttStr);
  98. // tblSensorRecord.setSensorId(sensorId);
  99. // tblSensorRecord.setCreateBy("admin");
  100. // tblSensorRecord.setUpdateBy("admin");
  101. // updateNowRecord(tblSensorRecord);
  102. CacheUtils.put("sensorData",sensorId,mqttMsg.toString());
  103. // tblSensorRecordMapper.insert(tblSensorRecord);
  104. String protocolType = "";
  105. List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
  106. for(SysDictData sysDictData:sysDictTypeList){
  107. if(Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()){
  108. protocolType = sysDictData.getDictLabel();
  109. }
  110. }
  111. for(MqttObj obj:mqttObjList){
  112. if(obj.getStatus() == 1) {
  113. JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
  114. MQTTConnect mqttConnect = new MQTTConnect();
  115. try {
  116. mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
  117. if (topicObj1.size() >0) {
  118. String finalProtocolType = protocolType;
  119. topicObj1.forEach(i->{
  120. JSONObject p = (JSONObject)i;
  121. String topic = p.getStr("name");
  122. topic = StrUtil.replace(topic,"$protocolType$", finalProtocolType);
  123. topic = StrUtil.replace(topic,"$sensorId$", sensorId.toString());
  124. try {
  125. mqttConnect.pub(topic, mqttMsg.toString(), (Integer) p.get("qos"));
  126. } catch (MqttException e) {
  127. throw new RuntimeException(e);
  128. }
  129. });
  130. } else {
  131. String topic = "sensor/"+protocolType+"/" + sensorId;
  132. mqttConnect.pub(topic, mqttMsg.toString(), 0);
  133. }
  134. } catch (Exception e) {
  135. e.printStackTrace();
  136. }
  137. }
  138. }
  139. }
  140. }
  141. @Override
  142. public void createMqttMain(TblMqttBo bo){
  143. MQTTConnect mqttConnect = new MQTTConnect();
  144. try {
  145. System.out.println(mqttUrl);
  146. mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(),mqttUser,mqttPassword,new Callback());
  147. mqttConnect.sub(mqttTopic);
  148. }catch (Exception e){
  149. e.printStackTrace();
  150. }
  151. }
  152. public void updateNowRecord(TblSensorRecordBo bo){
  153. TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo();
  154. tblSensorRecordBo.setSensorId(bo.getSensorId());
  155. LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
  156. List<TblSensorRecordVo> tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw);
  157. if(tblSensorRecordVoList.size() > 0 ){
  158. bo.setId(tblSensorRecordVoList.get(0).getId());
  159. TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
  160. validEntityBeforeSave(update);
  161. tblSensorRecordMapper.updateById(update);
  162. }else{
  163. TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
  164. validEntityBeforeSave(add);
  165. tblSensorRecordMapper.insert(add);
  166. }
  167. }
  168. private void validEntityBeforeSave(TblSensorRecord entity){
  169. //TODO 做一些数据校验,如唯一约束
  170. }
  171. @Override
  172. public void createMqtt(TblMqttBo bo){
  173. LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
  174. List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
  175. for(TblMqttVo mqttVo:mqttVoList){
  176. MQTTConnect mqttConnect = new MQTTConnect();
  177. try {
  178. mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
  179. }catch (Exception e){
  180. e.printStackTrace();
  181. }
  182. }
  183. }
  184. @Override
  185. public void pubOrder(OrderBean orderBean){
  186. JSONObject jsonObject = new JSONObject();
  187. jsonObject.put("deviceId",orderBean.getDeviceId());
  188. jsonObject.put("add",orderBean.getAdd());
  189. jsonObject.put("value",orderBean.getValue());
  190. jsonObject.put("addrOffset",orderBean.getAddrOffset());
  191. jsonObject.put("len",orderBean.getValue().split(",").length);
  192. MQTTConnect mqttConnect = new MQTTConnect();
  193. try {
  194. mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
  195. mqttConnect.pub("control",jsonObject.toString(),0);
  196. }catch (Exception e){
  197. e.printStackTrace();
  198. }
  199. }
  200. private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
  201. Map<String, Object> params = bo.getParams();
  202. LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
  203. lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
  204. lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
  205. lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
  206. lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
  207. lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
  208. lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
  209. lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
  210. lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
  211. // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
  212. return lqw;
  213. }
  214. class Callback implements MqttCallback {
  215. /**
  216. * MQTT 断开连接会执行此方法
  217. */
  218. @Override
  219. public void connectionLost(Throwable throwable) {
  220. log.info("断开了MQTT连接111 :{}", throwable.getMessage());
  221. log.error(throwable.getMessage(), throwable);
  222. }
  223. /**
  224. * publish发布成功后会执行到这里
  225. */
  226. @Override
  227. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  228. log.info("发布消息成功");
  229. }
  230. /**
  231. * subscribe订阅后得到的消息会执行到这里
  232. */
  233. @Override
  234. public void messageArrived(String topic, MqttMessage message) throws Exception {
  235. // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
  236. String payload = String.valueOf(message.getPayload());
  237. // String msg = Byte.toString(message.getPayload());
  238. byte[] bytes = message.getPayload();
  239. String encoded = Base64.getEncoder().encodeToString(bytes);
  240. byte[] decoded = Base64.getDecoder().decode(encoded);
  241. String msg =new String(decoded);
  242. System.out.println(msg);
  243. log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
  244. MqttServiceImpl.this.pubMqttData(msg);
  245. }
  246. }
  247. private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
  248. Map<String, Object> params = bo.getParams();
  249. LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
  250. lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
  251. lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
  252. lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
  253. return lqw;
  254. }
  255. }