package com.ruoyi.data.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.ruoyi.common.core.domain.entity.SysDictData;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.common.utils.mqtt.MQTTConnect;
import com.ruoyi.common.utils.redis.CacheUtils;
import com.ruoyi.data.domain.*;
import com.ruoyi.data.domain.bo.TblMqttBo;
import com.ruoyi.data.domain.bo.TblSensorRecordBo;
import com.ruoyi.data.domain.vo.TblMqttVo;
import com.ruoyi.data.domain.vo.TblRecordVo;
import com.ruoyi.data.domain.vo.TblSensorRecordVo;
import com.ruoyi.data.mapper.*;
import com.ruoyi.data.service.MqttService;
import com.ruoyi.data.service.WebsocketService;
import com.ruoyi.system.service.ISysDictTypeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
// NOTE(review): JDK-internal type that nothing in this file uses; it is not exported on
// JDK 9+ and will break compilation there. Kept only to leave the import list intact.
import sun.management.Sensor;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * MQTT bridge service.
 *
 * <p>Incoming sensor messages are broadcast through {@link WebsocketService} and, when
 * {@code mqtt.saveAndForward} is enabled, stored as a {@link TblRecord}, cached under the
 * {@code "sensorData"} cache and re-published to every enabled MQTT target returned by
 * {@code TblEquipmentMqttMapper#selectMqttListByDeviceId} for the sensor's device.
 */
@RequiredArgsConstructor
@Service
@Slf4j
public class MqttServiceImpl implements MqttService {

    /** Timestamp layout written into the {@code created_time} field of forwarded messages. */
    private static final DateTimeFormatter CREATED_TIME_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private final TblMqttMapper tblMqttMapper;
    private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
    private final TblRecordMapper tblRecordMapper;
    private final TblSensorMapper tblSensorMapper;
    private final TblSensorRecordMapper tblSensorRecordMapper;
    private final ISysDictTypeService sysDictTypeService;
    private final WebsocketService websocketService;

    @Value("${mqtt.url}")
    private String mqttUrl;

    @Value("${mqtt.clientid}")
    private String clientID;

    @Value("${mqtt.user}")
    private String mqttUser;

    @Value("${mqtt.password}")
    private String mqttPassword;

    @Value("${mqtt.topic}")
    private String mqttTopic;

    @Value("${mqtt.saveAndForward}")
    private Boolean saveAndForward;

    /**
     * Handles one raw sensor message.
     *
     * <p>The message is always pushed to all websocket clients first. If
     * {@code mqtt.saveAndForward} is true the message is additionally parsed, persisted,
     * cached and forwarded to the enabled MQTT targets of the sensor's device.
     *
     * @param mqttStr raw JSON text; the code reads the keys {@code sensorId} and {@code data}
     * @throws IllegalArgumentException if save-and-forward is enabled and the payload has no
     *                                  usable {@code sensorId} or the sensor is not in the database
     */
    @Override
    public void pubMqttData(String mqttStr) {
        websocketService.sendMessageAll(mqttStr);
        if (!Boolean.TRUE.equals(saveAndForward)) {
            return;
        }

        JSONObject payload = new JSONObject(mqttStr);
        // getLong accepts both a numeric and a string sensorId; the previous (String) cast
        // failed with ClassCastException on numeric ids.
        Long sensorId = payload.getLong("sensorId");
        if (sensorId == null) {
            throw new IllegalArgumentException("MQTT payload has no valid sensorId");
        }
        TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
        if (tblSensor == null) {
            throw new IllegalArgumentException("No sensor found for sensorId " + sensorId);
        }
        Long deviceId = tblSensor.getDeviceId();

        String outbound = buildForwardMessage(payload, tblSensor, sensorId, deviceId).toString();

        MqttObj targetQuery = new MqttObj();
        targetQuery.setEquipmentId(deviceId);
        List<MqttObj> targets = tblEquipmentMqttMapper.selectMqttListByDeviceId(targetQuery);

        saveRecord(mqttStr, sensorId, deviceId);
        // Upserting the latest TblSensorRecord via updateNowRecord(...) is disabled here
        // (it was commented out); only the cache below holds the latest message per sensor.
        CacheUtils.put("sensorData", sensorId, outbound);

        String protocolType = resolveProtocolType(tblSensor);
        for (MqttObj target : targets) {
            if (target.getStatus() == 1) {
                forwardToTarget(target, protocolType, sensorId, outbound);
            }
        }
    }

    /** Builds the JSON message that is cached and forwarded; each data entry loses its {@code gcdinfo} key. */
    private JSONObject buildForwardMessage(JSONObject payload, TblSensor tblSensor, Long sensorId, Long deviceId) {
        JSONObject mqttMsg = new JSONObject();
        mqttMsg.put("deviceSn", deviceId);
        mqttMsg.put("sensorSn", sensorId);
        mqttMsg.put("sensorType", tblSensor.getSensorType());
        mqttMsg.put("created_time", LocalDateTime.now().format(CREATED_TIME_FORMAT));

        JSONArray sensorData = new JSONArray();
        JSONArray incoming = new JSONArray(payload.get("data"));
        for (Object entry : incoming) {
            // Copy the entry before removing the key so the parsed payload is left untouched.
            JSONObject cleaned = new JSONObject(entry);
            cleaned.remove("gcdinfo");
            sensorData.add(cleaned);
        }
        mqttMsg.put("sensorData", sensorData);
        return mqttMsg;
    }

    /** Inserts the raw message as a {@link TblRecord} attributed to the fixed user {@code "admin"}. */
    private void saveRecord(String mqttStr, Long sensorId, Long deviceId) {
        TblRecord tblRecord = new TblRecord();
        tblRecord.setEquipmentId(deviceId);
        tblRecord.setJson(mqttStr);
        tblRecord.setCreateBy("admin");
        tblRecord.setUpdateBy("admin");
        tblRecord.setSensorId(sensorId);
        tblRecordMapper.insert(tblRecord);
    }

    /**
     * Looks up the label of the {@code protocal_type} dictionary entry whose numeric value
     * equals the sensor's protocol type.
     *
     * @return the matching label (the last match wins), or an empty string if nothing matches
     */
    private String resolveProtocolType(TblSensor tblSensor) {
        String label = "";
        if (tblSensor.getProtocalType() == null) {
            return label;
        }
        long wanted = tblSensor.getProtocalType().longValue();
        List<SysDictData> entries = sysDictTypeService.selectDictDataByType("protocal_type");
        if (entries == null) {
            // NOTE(review): guards against the service returning null for an unknown/empty type — confirm.
            return label;
        }
        for (SysDictData entry : entries) {
            long candidate;
            try {
                candidate = Long.parseLong(entry.getDictValue());
            } catch (NumberFormatException ignored) {
                // A non-numeric dictionary value can never equal a numeric protocol type.
                continue;
            }
            if (candidate == wanted) {
                label = entry.getDictLabel();
            }
        }
        return label;
    }

    /**
     * Publishes {@code message} to one MQTT target. Failures are logged and swallowed so that
     * one broken target does not stop forwarding to the remaining ones.
     *
     * <p>NOTE(review): a new {@link MQTTConnect} is created for every message and never
     * released in this method; whether {@code MQTTConnect} cleans up by itself is not visible here.
     */
    private void forwardToTarget(MqttObj target, String protocolType, Long sensorId, String message) {
        try {
            // Parsed inside the try so that a malformed topic configuration only affects this target.
            JSONArray topics = JSONUtil.parseArray(target.getServerTopic());
            MQTTConnect mqttConnect = new MQTTConnect();
            mqttConnect.createMqttClient(target.getServerAddress(), target.getUuid(),
                target.getAccount(), target.getPassword(), new Callback());

            if (topics.isEmpty()) {
                // No topic configured: fall back to the default topic layout with QoS 0.
                mqttConnect.pub("sensor/" + protocolType + "/" + sensorId, message, 0);
                return;
            }
            for (Object item : topics) {
                JSONObject topicConfig = (JSONObject) item;
                String topic = topicConfig.getStr("name");
                topic = StrUtil.replace(topic, "$protocolType$", protocolType);
                topic = StrUtil.replace(topic, "$sensorId$", sensorId.toString());
                // A missing qos falls back to 0 instead of failing on a null cast.
                mqttConnect.pub(topic, message, topicConfig.getInt("qos", 0));
            }
        } catch (Exception e) {
            log.error("Failed to forward sensor {} data to MQTT server {}", sensorId, target.getServerAddress(), e);
        }
    }

    /**
     * Connects to the configured main broker ({@code mqtt.url}) with a random client id and
     * subscribes to {@code mqtt.topic}. Connection errors are logged, not rethrown.
     *
     * @param bo not used by this method
     */
    @Override
    public void createMqttMain(TblMqttBo bo) {
        MQTTConnect mqttConnect = new MQTTConnect();
        try {
            log.info("Connecting to main MQTT broker {}", mqttUrl);
            mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(), mqttUser, mqttPassword, new Callback());
            mqttConnect.sub(mqttTopic);
        } catch (Exception e) {
            log.error("Failed to connect or subscribe to main MQTT broker {}", mqttUrl, e);
        }
    }

    /**
     * Updates the first sensor record matching {@code bo}, or inserts a new one if none matches.
     *
     * <p>NOTE(review): the lookup filters on every populated field of {@code bo}, including its
     * JSON text. The original code built (but never used) a second query object holding only the
     * sensor id, so the intended lookup may have been "by sensor id only" — confirm before
     * enabling this method again.
     *
     * @param bo record data; its id is overwritten when an existing record is found
     */
    public void updateNowRecord(TblSensorRecordBo bo) {
        LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
        List<TblSensorRecordVo> existing = tblSensorRecordMapper.selectVoList(lqw);
        if (!existing.isEmpty()) {
            bo.setId(existing.get(0).getId());
            TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
            validEntityBeforeSave(update);
            tblSensorRecordMapper.updateById(update);
        } else {
            TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
            validEntityBeforeSave(add);
            tblSensorRecordMapper.insert(add);
        }
    }

    /** Validation hook invoked before a sensor record is saved; currently performs no checks. */
    private void validEntityBeforeSave(TblSensorRecord entity) {
        // TODO: validate the data here, e.g. unique constraints
    }

    /**
     * Opens an MQTT client connection for every stored MQTT configuration matching {@code bo}.
     * A failure for one configuration is logged and does not stop the others.
     *
     * @param bo filter criteria, see {@link #buildQueryWrapper(TblMqttBo)}
     */
    @Override
    public void createMqtt(TblMqttBo bo) {
        LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
        List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
        for (TblMqttVo mqttVo : mqttVoList) {
            MQTTConnect mqttConnect = new MQTTConnect();
            try {
                mqttConnect.createMqttClient(mqttVo.getServerAddress(), mqttVo.getUuid(),
                    mqttVo.getAccount(), mqttVo.getPassword(), new Callback());
            } catch (Exception e) {
                log.error("Failed to connect to MQTT server {}", mqttVo.getServerAddress(), e);
            }
        }
    }

    /**
     * Publishes a control order as JSON to the {@code control} topic. Errors are logged, not rethrown.
     *
     * @param orderBean order to send; {@code len} is the number of comma-separated items in its value
     */
    @Override
    public void pubOrder(OrderBean orderBean) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("deviceId", orderBean.getDeviceId());
        jsonObject.put("add", orderBean.getAdd());
        jsonObject.put("value", orderBean.getValue());
        jsonObject.put("addrOffset", orderBean.getAddrOffset());
        jsonObject.put("len", orderBean.getValue().split(",").length);
        MQTTConnect mqttConnect = new MQTTConnect();
        try {
            // NOTE(review): broker address, client id and credentials are hard-coded in source.
            // They should come from configuration (like mqtt.url / mqtt.user / mqtt.password);
            // left unchanged here because switching brokers would alter deployed behavior.
            mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt", "adminTest", "ship", "ship@2021.11.24", new Callback());
            mqttConnect.pub("control", jsonObject.toString(), 0);
        } catch (Exception e) {
            log.error("Failed to publish control order", e);
        }
    }

    /**
     * Builds the query for MQTT configurations: a LIKE match on protocol name and equality on
     * every other non-blank text field of {@code bo}. Status is not filtered.
     */
    private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
        LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
        lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
        lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
        lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
        lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
        lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
        lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
        lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
        lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
        return lqw;
    }

    /**
     * MQTT callback shared by every client this service creates. It is an inner (non-static)
     * class because received messages are handed to the enclosing service.
     */
    class Callback implements MqttCallback {

        /**
         * Invoked when the MQTT connection is lost.
         */
        @Override
        public void connectionLost(Throwable throwable) {
            log.info("断开了MQTT连接111 :{}", throwable.getMessage());
            log.error(throwable.getMessage(), throwable);
        }

        /**
         * Invoked after a published message has been delivered.
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            log.info("发布消息成功");
        }

        /**
         * Invoked for every message received on a subscribed topic; the payload is decoded as
         * UTF-8 text and passed to {@link MqttServiceImpl#pubMqttData(String)}.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            String msg = new String(message.getPayload(), StandardCharsets.UTF_8);
            log.info("收到来自 {} 的消息:{}", topic, msg);
            try {
                MqttServiceImpl.this.pubMqttData(msg);
            } catch (Exception e) {
                // Paho shuts the client down if this callback throws, so a single bad message
                // must not escape; log it and keep the subscription alive.
                log.error("Failed to process MQTT message from topic {}", topic, e);
            }
        }
    }

    /**
     * Builds the query for sensor records: equality on sensor id, JSON text and equipment id,
     * each applied only when the corresponding field of {@code bo} is set.
     */
    private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
        LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
        lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
        lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
        lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
        return lqw;
    }
}