package com.ruoyi.data.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.json.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.common.utils.mqtt.MQTTConnect; import com.ruoyi.data.domain.*; import com.ruoyi.data.domain.bo.TblMqttBo; import com.ruoyi.data.domain.bo.TblSensorRecordBo; import com.ruoyi.data.domain.vo.TblMqttVo; import com.ruoyi.data.domain.vo.TblRecordVo; import com.ruoyi.data.domain.vo.TblSensorRecordVo; import com.ruoyi.data.mapper.*; import com.ruoyi.data.service.MqttService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import sun.management.Sensor; import java.text.SimpleDateFormat; import java.util.Base64; import java.util.Date; import java.util.List; import java.util.Map; @RequiredArgsConstructor @Service @Slf4j public class MqttServiceImpl implements MqttService { private final TblMqttMapper tblMqttMapper; private final TblEquipmentMqttMapper tblEquipmentMqttMapper; private final TblRecordMapper tblRecordMapper; private final TblSensorMapper tblSensorMapper; private final TblSensorRecordMapper tblSensorRecordMapper; @Value("${mqtt.url}") private String mqttUrl; @Value("${mqtt.clientid}") private String clientID; @Value("${mqtt.user}") private String mqttUser; @Value("${mqtt.password}") private String mqttPassword; @Value("${mqtt.topic}") private String mqttTopic; // @Value("${mqtt.url}") // private String mqttUrl; @Override public void pubMqttData(String mqttStr) { JSONObject jsonObject = new JSONObject(mqttStr); Long sensorId = Long.valueOf((String) jsonObject.get("sensorId")); TblSensor tblSensor = tblSensorMapper.selectById(sensorId); Long deviceId = tblSensor.getDeviceId(); SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(System.currentTimeMillis()); jsonObject.put("created_time",formatter.format(date)); MqttObj mqttObj = new MqttObj(); mqttObj.setEquipmentId(deviceId); List mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj); TblRecord tblRecord = new TblRecord(); tblRecord.setEquipmentId(deviceId); tblRecord.setSensorId(sensorId); tblRecord.setJson(mqttStr); tblRecord.setCreateBy("admin"); tblRecord.setUpdateBy("admin"); tblRecord.setSensorId(sensorId); tblRecordMapper.insert(tblRecord); TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo(); tblSensorRecord.setEquipmentId(deviceId); tblSensorRecord.setJson(mqttStr); tblSensorRecord.setCreateBy("admin"); tblSensorRecord.setUpdateBy("admin"); tblSensorRecord.setSensorId(sensorId); updateNowRecord(tblSensorRecord); // tblSensorRecordMapper.insert(tblSensorRecord); for(MqttObj obj:mqttObjList){ if(obj.getStatus() == 1) { JSONObject topicObj = obj.getTopicQos("tcp"); MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback()); if (topicObj != null) { String topic = topicObj.get("name").toString().replace("#", ""); mqttConnect.pub(topic, jsonObject.toString(), Integer.valueOf((String) topicObj.get("qos"))); } else { String topic = "sensor/modbustcp/" + deviceId; mqttConnect.pub(topic, jsonObject.toString(), 0); } } catch (Exception e) { e.printStackTrace(); } } } } @Override public void createMqttMain(TblMqttBo bo){ MQTTConnect mqttConnect = new MQTTConnect(); try { System.out.println(mqttUrl); mqttConnect.createMqttClient(mqttUrl,clientID,mqttUser,mqttPassword,new Callback()); mqttConnect.sub(mqttTopic); }catch (Exception e){ e.printStackTrace(); } } public void updateNowRecord(TblSensorRecordBo bo){ TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo(); tblSensorRecordBo.setSensorId(bo.getSensorId()); LambdaQueryWrapper lqw = buildSensorRecordWrapper(bo); List tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw); if(tblSensorRecordVoList.size() > 0 ){ bo.setId(tblSensorRecordVoList.get(0).getId()); TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class); validEntityBeforeSave(update); tblSensorRecordMapper.updateById(update); }else{ TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class); validEntityBeforeSave(add); tblSensorRecordMapper.insert(add); } } private void validEntityBeforeSave(TblSensorRecord entity){ //TODO 做一些数据校验,如唯一约束 } @Override public void createMqtt(TblMqttBo bo){ LambdaQueryWrapper lqw = buildQueryWrapper(bo); List mqttVoList = tblMqttMapper.selectVoList(lqw); for(TblMqttVo mqttVo:mqttVoList){ MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback()); }catch (Exception e){ e.printStackTrace(); } } } @Override public void pubOrder(OrderBean orderBean){ JSONObject jsonObject = new JSONObject(); jsonObject.put("deviceId",orderBean.getDeviceId()); jsonObject.put("add",orderBean.getAdd()); jsonObject.put("value",orderBean.getValue()); jsonObject.put("addrOffset",orderBean.getAddrOffset()); jsonObject.put("len",orderBean.getValue().split(",").length); MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback()); mqttConnect.pub("control",jsonObject.toString(),0); }catch (Exception e){ e.printStackTrace(); } } private LambdaQueryWrapper buildQueryWrapper(TblMqttBo bo) { Map params = bo.getParams(); LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName()); lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc()); lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType()); lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress()); lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic()); lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount()); lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword()); lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid()); // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus()); return lqw; } class Callback implements MqttCallback { /** * MQTT 断开连接会执行此方法 */ @Override public void connectionLost(Throwable throwable) { log.info("断开了MQTT连接111 :{}", throwable.getMessage()); log.error(throwable.getMessage(), throwable); } /** * publish发布成功后会执行到这里 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("发布消息成功"); } /** * subscribe订阅后得到的消息会执行到这里 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // TODO 此处可以将订阅得到的消息进行业务处理、数据存储 String payload = String.valueOf(message.getPayload()); // String msg = Byte.toString(message.getPayload()); byte[] bytes = message.getPayload(); String encoded = Base64.getEncoder().encodeToString(bytes); byte[] decoded = Base64.getDecoder().decode(encoded); String msg =new String(decoded); System.out.println(msg); log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload())); MqttServiceImpl.this.pubMqttData(msg); } } private LambdaQueryWrapper buildSensorRecordWrapper(TblSensorRecordBo bo) { Map params = bo.getParams(); LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId()); lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson()); lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId()); return lqw; } }