|
@@ -0,0 +1,172 @@
|
|
|
|
|
+package com.ruoyi.data.service.impl;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.json.JSONObject;
|
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
|
|
+import com.ruoyi.common.utils.StringUtils;
|
|
|
|
|
+import com.ruoyi.common.utils.mqtt.MQTTConnect;
|
|
|
|
|
+import com.ruoyi.data.domain.MqttObj;
|
|
|
|
|
+import com.ruoyi.data.domain.OrderBean;
|
|
|
|
|
+import com.ruoyi.data.domain.TblMqtt;
|
|
|
|
|
+import com.ruoyi.data.domain.TblRecord;
|
|
|
|
|
+import com.ruoyi.data.domain.bo.TblMqttBo;
|
|
|
|
|
+import com.ruoyi.data.domain.vo.TblMqttVo;
|
|
|
|
|
+import com.ruoyi.data.mapper.TblEquipmentMqttMapper;
|
|
|
|
|
+import com.ruoyi.data.mapper.TblMqttMapper;
|
|
|
|
|
+import com.ruoyi.data.mapper.TblRecordMapper;
|
|
|
|
|
+import com.ruoyi.data.service.MqttService;
|
|
|
|
|
+import lombok.RequiredArgsConstructor;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttCallback;
|
|
|
|
|
+import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
|
+
|
|
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
|
|
+import java.util.Base64;
|
|
|
|
|
+import java.util.Date;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+
|
|
|
|
|
+@RequiredArgsConstructor
|
|
|
|
|
+@Service
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+public class MqttServiceImpl implements MqttService {
|
|
|
|
|
+
|
|
|
|
|
+ private final TblMqttMapper tblMqttMapper;
|
|
|
|
|
+
|
|
|
|
|
+ private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
|
|
|
|
|
+
|
|
|
|
|
+ private final TblRecordMapper tblRecordMapper;
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void pubMqttData(String mqttStr) {
|
|
|
|
|
+ JSONObject jsonObject = new JSONObject(mqttStr);
|
|
|
|
|
+ Long deviceId = Long.valueOf((String) jsonObject.get("deviceId"));
|
|
|
|
|
+ SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss");
|
|
|
|
|
+ Date date = new Date(System.currentTimeMillis());
|
|
|
|
|
+ jsonObject.put("created_time",formatter.format(date));
|
|
|
|
|
+ MqttObj mqttObj = new MqttObj();
|
|
|
|
|
+ mqttObj.setEquipmentId(deviceId);
|
|
|
|
|
+ List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
|
|
|
|
|
+ for(MqttObj obj:mqttObjList){
|
|
|
|
|
+ JSONObject topicObj = obj.getTopicQos("tcp");
|
|
|
|
|
+ MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
|
|
+ try{
|
|
|
|
|
+ mqttConnect.createMqttClient(obj.getServerAddress(),obj.getUuid(),obj.getAccount(),obj.getPassword(),new Callback());
|
|
|
|
|
+ if(topicObj != null){
|
|
|
|
|
+ String topic = topicObj.get("name").toString().replace("#","");
|
|
|
|
|
+ mqttConnect.pub(topic,mqttStr,Integer.valueOf((String) topicObj.get("qos")));
|
|
|
|
|
+ }else{
|
|
|
|
|
+ String topic = "sensor/modbustcp/"+deviceId;
|
|
|
|
|
+ mqttConnect.pub(topic,mqttStr,0);
|
|
|
|
|
+ }
|
|
|
|
|
+ TblRecord tblRecord = new TblRecord();
|
|
|
|
|
+ tblRecord.setEquipmentId(deviceId);
|
|
|
|
|
+ tblRecord.setJson(mqttStr);
|
|
|
|
|
+ tblRecord.setCreateBy("admin");
|
|
|
|
|
+ tblRecord.setUpdateBy("admin");
|
|
|
|
|
+ tblRecordMapper.insert(tblRecord);
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ e.printStackTrace();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void createMqttMain(TblMqttBo bo){
|
|
|
|
|
+ MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
|
|
|
|
|
+ mqttConnect.sub("device/#");
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ e.printStackTrace();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void createMqtt(TblMqttBo bo){
|
|
|
|
|
+ LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
|
|
|
|
|
+ List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
|
|
|
|
|
+ for(TblMqttVo mqttVo:mqttVoList){
|
|
|
|
|
+ MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ e.printStackTrace();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void pubOrder(OrderBean orderBean){
|
|
|
|
|
+ JSONObject jsonObject = new JSONObject();
|
|
|
|
|
+ jsonObject.put("deviceId",orderBean.getDeviceId());
|
|
|
|
|
+ jsonObject.put("add",orderBean.getAdd());
|
|
|
|
|
+ jsonObject.put("value",orderBean.getValue());
|
|
|
|
|
+ jsonObject.put("addrOffset",orderBean.getAddrOffset());
|
|
|
|
|
+ jsonObject.put("len",orderBean.getValue().split(",").length);
|
|
|
|
|
+ MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
|
|
+ try {
|
|
|
|
|
+ mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
|
|
|
|
|
+ mqttConnect.pub("control",jsonObject.toString(),0);
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ e.printStackTrace();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
|
|
|
|
|
+ Map<String, Object> params = bo.getParams();
|
|
|
|
|
+ LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
|
|
|
|
|
+ lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
|
|
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
|
|
|
|
|
+// lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
|
|
|
|
|
+ return lqw;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ class Callback implements MqttCallback {
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * MQTT 断开连接会执行此方法
|
|
|
|
|
+ */
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void connectionLost(Throwable throwable) {
|
|
|
|
|
+ log.info("断开了MQTT连接111 :{}", throwable.getMessage());
|
|
|
|
|
+ log.error(throwable.getMessage(), throwable);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * publish发布成功后会执行到这里
|
|
|
|
|
+ */
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
|
|
|
+ log.info("发布消息成功");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * subscribe订阅后得到的消息会执行到这里
|
|
|
|
|
+ */
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
|
|
+ // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
|
|
|
|
|
+ String payload = String.valueOf(message.getPayload());
|
|
|
|
|
+ // String msg = Byte.toString(message.getPayload());
|
|
|
|
|
+ byte[] bytes = message.getPayload();
|
|
|
|
|
+ String encoded = Base64.getEncoder().encodeToString(bytes);
|
|
|
|
|
+ byte[] decoded = Base64.getDecoder().decode(encoded);
|
|
|
|
|
+ String msg =new String(decoded);
|
|
|
|
|
+ System.out.println(msg);
|
|
|
|
|
+ log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
|
|
|
|
|
+ MqttServiceImpl.this.pubMqttData(msg);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|