package com.ruoyi.data.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSON; import cn.hutool.json.JSONArray; import cn.hutool.core.lang.UUID; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.ruoyi.common.core.domain.entity.SysDictData; import com.ruoyi.common.utils.PoolUtils; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.common.utils.mqtt.MQTTConnect; import com.ruoyi.common.utils.redis.CacheUtils; import com.ruoyi.data.domain.*; import com.ruoyi.data.domain.bo.*; import com.ruoyi.data.domain.vo.*; import com.ruoyi.data.mapper.*; import com.ruoyi.data.service.*; import com.ruoyi.system.service.ISysDictTypeService; import lombok.Data; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.mozilla.javascript.Context; import org.mozilla.javascript.Scriptable; import org.mozilla.javascript.ScriptableObject; import java.util.*; import java.util.stream.Collectors; @RequiredArgsConstructor @Service @Slf4j public class MqttServiceImpl implements MqttService { private final TblMqttMapper tblMqttMapper; private final TblEquipmentMqttMapper tblEquipmentMqttMapper; private final TblRecordMapper tblRecordMapper; private final TblSensorMapper tblSensorMapper; private final TblSensorRecordMapper tblSensorRecordMapper; private final ISysDictTypeService sysDictTypeService; private final WebsocketService websocketService; private final ITblEquipmentSbookService equipmentSbookService; private final ITblRuleService iTblRuleService; private final ITblRuleFilterService iTblRuleFilterService; private final ITblBreakdownService iTblBreakdownService; private final ITblWarnService iTblWarnService; private final ITblRuleExecuteService iTblRuleExecuteService; private final TblWarnMapper tblWarnMapper; private final TblBreakdownMapper tblBreakdownMapper; @Value("${mqtt.url}") private String mqttUrl; @Value("${mqtt.clientid}") private String clientID; @Value("${mqtt.user}") private String mqttUser; @Value("${mqtt.password}") private String mqttPassword; @Value("${mqtt.topic}") private String mqttTopic; @Value("${mqtt.saveAndForward}") private Boolean saveAndForward; @Override public void pubMqttData(String mqttStr) { //数据分发到websocket 前端调用 websocketService.sendMessageAll(mqttStr); MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class); TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId()); filterData(dataBo,tblSensor); //数据保存到数据库 if(saveAndForward) { // 解析数据 异步线程执行 PoolUtils.getPool().execute(new Runnable() { @Override public void run() { MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class); Date timedata = new Date(); if (dataBo.getTime() != null) { timedata=(dataBo.getTime()); } Date finalTimedata = timedata; //放缓存内 sensorId 缓存数据 MqttSensorDataBo dataBo1 = CacheUtils.get("sensorData", dataBo.getSensorId()); if (dataBo1 == null) { dataBo.getData().forEach(i -> { i.setCreateTime(finalTimedata); }); CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo); } else { dataBo1.getData().addAll(dataBo.getData()); dataBo.getData().forEach(i -> { List dataBoList = dataBo1.getData().stream().filter(j -> j.getName().equals(i.getName())).collect(Collectors.toList()); if (dataBoList.size() > 0) { dataBo1.getData().removeAll(dataBoList); } i.setCreateTime(finalTimedata); dataBo1.getData().add(i); }); CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo1); } //TODO 缓存获取对应设备的缓存数据 TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId()); filterData(dataBo,tblSensor); if (tblSensor != null) { //保存到数据库 List tblSensorRecords = new ArrayList<>(); dataBo.getData().forEach(i->{ TblSensorRecord tblSensorRecord = new TblSensorRecord(); tblSensorRecord.setEquipmentId(tblSensor.getDeviceId()); tblSensorRecord.setCreateBy("admin"); tblSensorRecord.setUpdateBy("admin"); tblSensorRecord.setCreateTime(finalTimedata); tblSensorRecord.setSensorId(tblSensor.getId()); tblSensorRecord.setPointName(i.getName()); tblSensorRecord.setPointValue(i.getValue()); tblSensorRecords.add(tblSensorRecord); }); if(tblSensorRecords.size()>0){ tblSensorRecordMapper.insertBatch(tblSensorRecords); } String protocolType = ""; List sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type"); for (SysDictData sysDictData : sysDictTypeList) { if (Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()) { protocolType = sysDictData.getDictLabel(); } } //转换 SensorDataToOtherMqttBo mqttdata = new SensorDataToOtherMqttBo(); mqttdata.setDeviceId(tblSensor.getDeviceId().toString()); List dataBoList = JSONUtil.toList(tblSensor.getDatapoints(), SensorDataBo.class); MqttSensorDataBo dataBo12 = CacheUtils.get("sensorData", dataBo.getSensorId()); //推缓存数据 dataBo12.getData().forEach(i -> { dataBoList.stream().filter(j -> j.getName().equals(i.getName())).forEach(j -> { i.setUnitSymbol(j.getUnitType()); i.setUnit(j.getUnit()); i.setUnitType(j.getUnit()); }); i.setParams(null); } ); mqttdata.setData(dataBo12.getData()); mqttdata.setSensorId(tblSensor.getId().toString()); mqttdata.setCreateTime(finalTimedata); String mqttstr = JSONUtil.parseObj(mqttdata, true).toStringPretty(); //转发mqtt数据 //TODO 缓存获取对应mqtt的缓存数据 MqttObj searchdata = new MqttObj(); searchdata.setEquipmentId(tblSensor.getDeviceId()); List mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(searchdata); for (MqttObj obj : mqttObjList) { if (obj.getStatus() == 1) { JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic()); MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback()); if (topicObj1.size() > 0) { String finalProtocolType = protocolType; topicObj1.forEach(i -> { JSONObject p = (JSONObject) i; String topic = p.getStr("name"); topic = StrUtil.replace(topic, "$protocolType$", finalProtocolType); topic = StrUtil.replace(topic, "$sensorId$", tblSensor.getId().toString()); try { mqttConnect.pub(topic, mqttstr.toString(), (Integer) p.get("qos")); } catch (MqttException e) { throw new RuntimeException(e); } }); } else { String topic = "forwarding/" + protocolType + "/" + tblSensor.getId().toString(); mqttConnect.pub(topic, mqttstr.toString(), 0); } } catch (Exception e) { log.error("mqtt转发失败", e); } } } } } }); } } @Override public void createMqttMain(TblMqttBo bo){ MQTTConnect mqttConnect = new MQTTConnect(); try { System.out.println(mqttUrl); mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(),mqttUser,mqttPassword,new Callback("main")); mqttConnect.sub(mqttTopic); }catch (Exception e){ e.printStackTrace(); } } public void filterData(MqttSensorDataBo dataBo,TblSensor tblSensor){ TblRuleBo tblRuleBo = new TblRuleBo(); tblRuleBo.setSensorId(Long.valueOf(dataBo.getSensorId())); List ruleVoList = iTblRuleService.queryList(tblRuleBo); dataBo.getData().forEach(i->{ ruleVoList.forEach(ruleVo->{ if(i.getName().equals(ruleVo.getTriggeringCondition())){ ruleFilter(i,ruleVo.getId(),tblSensor); } }); }); } public void ruleFilter(SensorDataBo bo,Long ruleId,TblSensor tblSensor){ TblRuleFilterBo tblRuleFilterBo = new TblRuleFilterBo(); tblRuleFilterBo.setRuleId(Long.toString(ruleId)); List tblRuleFilterVoList = iTblRuleFilterService.queryList(tblRuleFilterBo); tblRuleFilterVoList.forEach(i->{ String value = bo.getValue(); if(StringUtils.isNotBlank(i.getFormula())){ try { Context context = Context.enter(); Scriptable scope = context.initStandardObjects(); String script = "function format(data) { return "+i.getFormula()+" ; } format("+value+");"; Object result = context.evaluateString(scope, script, "", 1, null); value = (String) result; System.out.println(value); } catch (Exception e) { System.out.println("表达式runtime错误:" + e.getMessage()); } } Boolean isFilter = false; if(StringUtils.isNotBlank(i.getFilterAlgorithm())){ Context context = Context.enter(); Scriptable scope = context.initStandardObjects(); String script = "function isFilter(data,up,down) { return "+i.getFilterAlgorithm()+" ; }"; try { script+="isFilter("+ value+","+i.getUpperLimit()+","+i.getLowerLimit()+")"; Object result = context.evaluateString(scope, script, "", 1, null); isFilter = (Boolean) result; // isFilter = (Boolean) inv.invokeFunction("isFilter", value,i.getUpperLimit(),i.getLowerLimit() ); System.out.println(isFilter); if(isFilter){ ruleExecute(bo,i.getId(),tblSensor); } } catch (Exception e) { System.out.println("表达式runtime错误:" + e.getMessage()); } } }); } private void validEntityBeforeSave(TblSensorRecord entity){ //TODO 做一些数据校验,如唯一约束 } private void ruleExecute(SensorDataBo bo,String id,TblSensor tblSensor){ TblRuleExecuteBo executeBo = new TblRuleExecuteBo(); executeBo.setFilterId(Long.valueOf(id)); List list = iTblRuleExecuteService.queryList(executeBo); for(TblRuleExecuteVo vo:list){ if(vo.getExecuteAction().equals("warn")){ TblWarnBo warn = new TblWarnBo(); TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId()); warn.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"数据异常"); warn.setVal(bo.getValue()); warn.setEquipmentId(tblEquipment.getId()); warn.setEquipmentName(tblEquipment.getName()); warn.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark()); warn.setAlarmTime(new Date()); if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){ JSONObject jsonObject = new JSONObject(vo.getExpr1()); if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){ Long postpone = Long.valueOf(jsonObject.getStr("postpone")); try{ Thread.sleep(postpone); }catch (Exception e){ e.printStackTrace(); } } if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){ TblWarn tblWarn = new TblWarn(); tblWarn.setEquipmentId(tblEquipment.getId()); tblWarn.setStatus(jsonObject.getStr("interval")); List warnList = tblWarnMapper.getWarnDeviceList(tblWarn); Integer frequency = Integer.valueOf(jsonObject.getStr("frequency")); Integer interval = Integer.valueOf(jsonObject.getStr("interval")); Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation")); Integer time = frequency*interval + invalidation + 100; tblWarn.setStatus(String.valueOf(time)); List totalWarnList = tblWarnMapper.getWarnDeviceList(tblWarn); if(warnList.size() == 0 && totalWarnList.size() < frequency){ iTblWarnService.insertByBo(warn); } } }else{ iTblWarnService.insertByBo(warn); } } else if (vo.getExecuteAction().equals("breakbown")) { TblBreakdownBo tblBreakdownBo = new TblBreakdownBo(); TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId()); tblBreakdownBo.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"故障"); tblBreakdownBo.setVal(bo.getValue()); tblBreakdownBo.setEquipmentId(tblEquipment.getId()); tblBreakdownBo.setEquipmentName(tblEquipment.getName()); tblBreakdownBo.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark()); tblBreakdownBo.setFaultTime(new Date()); if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){ JSONObject jsonObject = new JSONObject(vo.getExpr1()); if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){ Long postpone = Long.valueOf(jsonObject.getStr("postpone")); try{ Thread.sleep(postpone); }catch (Exception e){ e.printStackTrace(); } } if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){ TblBreakdown tblBreakdown = new TblBreakdown(); tblBreakdown.setEquipmentId(tblEquipment.getId()); tblBreakdown.setStatus(jsonObject.getStr("interval")); List breakList = tblBreakdownMapper.getBreakdownList(tblBreakdown); Integer frequency = Integer.valueOf(jsonObject.getStr("frequency")); Integer interval = Integer.valueOf(jsonObject.getStr("interval")); Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation")); Integer time = frequency*interval + invalidation + 100; tblBreakdown.setStatus(String.valueOf(time)); List totalbreakList = tblBreakdownMapper.getBreakdownList(tblBreakdown); if(breakList.size() == 0 && totalbreakList.size() < frequency){ iTblBreakdownService.insertByBo(tblBreakdownBo); } } }else{ iTblBreakdownService.insertByBo(tblBreakdownBo); } } } } @Override public void createMqtt(TblMqttBo bo){ LambdaQueryWrapper lqw = buildQueryWrapper(bo); List mqttVoList = tblMqttMapper.selectVoList(lqw); for(TblMqttVo mqttVo:mqttVoList){ MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback()); }catch (Exception e){ e.printStackTrace(); } } } @Override public void pubOrder(OrderBean orderBean){ JSONObject jsonObject = new JSONObject(); jsonObject.put("deviceId",orderBean.getDeviceId()); jsonObject.put("add",orderBean.getAdd()); jsonObject.put("value",orderBean.getValue()); jsonObject.put("addrOffset",orderBean.getAddrOffset()); jsonObject.put("len",orderBean.getValue().split(",").length); MQTTConnect mqttConnect = new MQTTConnect(); try { mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback()); mqttConnect.pub("control",jsonObject.toString(),0); }catch (Exception e){ e.printStackTrace(); } } private LambdaQueryWrapper buildQueryWrapper(TblMqttBo bo) { Map params = bo.getParams(); LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName()); lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc()); lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType()); lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress()); lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic()); lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount()); lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword()); lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid()); lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus()); return lqw; } class Callback implements MqttCallback { private String type; Callback(){ type= "default"; } Callback(String type){ this.type = type; } /** * MQTT 断开连接会执行此方法 */ @Override public void connectionLost(Throwable throwable) { log.info("断开了MQTT连接111 :{}", throwable.getMessage()); log.error(throwable.getMessage(), throwable); if(this.type.equals("main")){ MqttServiceImpl.this.createMqtt(new TblMqttBo()); } // MqttServiceImpl.this.createMqtt(new TblMqttBo()); } /** * publish发布成功后会执行到这里 */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("发布消息成功"); } /** * subscribe订阅后得到的消息会执行到这里 */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // TODO 此处可以将订阅得到的消息进行业务处理、数据存储 String payload = String.valueOf(message.getPayload()); // String msg = Byte.toString(message.getPayload()); byte[] bytes = message.getPayload(); String encoded = Base64.getEncoder().encodeToString(bytes); byte[] decoded = Base64.getDecoder().decode(encoded); String msg =new String(decoded); System.out.println(msg); log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload())); MqttServiceImpl.this.pubMqttData(msg); } } private LambdaQueryWrapper buildSensorRecordWrapper(TblSensorRecordBo bo) { Map params = bo.getParams(); LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId()); lqw.eq(StringUtils.isNotBlank(bo.getPointName()), TblSensorRecord::getPointName, bo.getPointName()); lqw.eq(StringUtils.isNotBlank(bo.getPointValue()), TblSensorRecord::getPointValue, bo.getPointValue()); lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId()); return lqw; } @Override public void setMqttCache(){ List equipmentVoList = equipmentSbookService.queryList(new TblEquipmentSbookBo()); for(TblEquipmentSbookVo vo:equipmentVoList){ MqttObj obj = new MqttObj(); obj.setEquipmentId(vo.getId()); List mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(obj); CacheUtils.put("DeviceMqtt",vo.getId(),mqttObjList); } } }