| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- package com.ruoyi.data.service.impl;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSON;
- import cn.hutool.json.JSONArray;
- import cn.hutool.core.lang.UUID;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.ruoyi.common.core.domain.entity.SysDictData;
- import com.ruoyi.common.utils.PoolUtils;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.common.utils.mqtt.MQTTConnect;
- import com.ruoyi.common.utils.redis.CacheUtils;
- import com.ruoyi.data.domain.*;
- import com.ruoyi.data.domain.bo.*;
- import com.ruoyi.data.domain.vo.*;
- import com.ruoyi.data.mapper.*;
- import com.ruoyi.data.service.*;
- import com.ruoyi.system.service.ISysDictTypeService;
- import lombok.Data;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import org.mozilla.javascript.Context;
- import org.mozilla.javascript.Scriptable;
- import org.mozilla.javascript.ScriptableObject;
- import java.util.*;
- import java.util.stream.Collectors;
- @RequiredArgsConstructor
- @Service
- @Slf4j
- public class MqttServiceImpl implements MqttService {
- private final TblMqttMapper tblMqttMapper;
- private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
- private final TblRecordMapper tblRecordMapper;
- private final TblSensorMapper tblSensorMapper;
- private final TblSensorRecordMapper tblSensorRecordMapper;
- private final ISysDictTypeService sysDictTypeService;
- private final WebsocketService websocketService;
- private final ITblEquipmentSbookService equipmentSbookService;
- private final ITblRuleService iTblRuleService;
- private final ITblRuleFilterService iTblRuleFilterService;
- private final ITblBreakdownService iTblBreakdownService;
- private final ITblWarnService iTblWarnService;
- private final ITblRuleExecuteService iTblRuleExecuteService;
- private final TblWarnMapper tblWarnMapper;
- private final TblBreakdownMapper tblBreakdownMapper;
- @Value("${mqtt.url}")
- private String mqttUrl;
- @Value("${mqtt.clientid}")
- private String clientID;
- @Value("${mqtt.user}")
- private String mqttUser;
- @Value("${mqtt.password}")
- private String mqttPassword;
- @Value("${mqtt.topic}")
- private String mqttTopic;
- @Value("${mqtt.saveAndForward}")
- private Boolean saveAndForward;
- @Override
- public void pubMqttData(String mqttStr) {
- //数据分发到websocket 前端调用
- websocketService.sendMessageAll(mqttStr);
- MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class);
- TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId());
- filterData(dataBo,tblSensor);
- //数据保存到数据库
- if(saveAndForward) {
- // 解析数据 异步线程执行
- PoolUtils.getPool().execute(new Runnable() {
- @Override
- public void run() {
- MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class);
- Date timedata = new Date();
- if (dataBo.getTime() != null) {
- timedata=(dataBo.getTime());
- }
- Date finalTimedata = timedata;
- //放缓存内 sensorId 缓存数据
- MqttSensorDataBo dataBo1 = CacheUtils.get("sensorData", dataBo.getSensorId());
- if (dataBo1 == null) {
- dataBo.getData().forEach(i -> {
- i.setCreateTime(finalTimedata);
- });
- CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo);
- } else {
- dataBo1.getData().addAll(dataBo.getData());
- dataBo.getData().forEach(i -> {
- List<SensorDataBo> dataBoList = dataBo1.getData().stream().filter(j -> j.getName().equals(i.getName())).collect(Collectors.toList());
- if (dataBoList.size() > 0) {
- dataBo1.getData().removeAll(dataBoList);
- }
- i.setCreateTime(finalTimedata);
- dataBo1.getData().add(i);
- });
- CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo1);
- }
- //TODO 缓存获取对应设备的缓存数据
- TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId());
- filterData(dataBo,tblSensor);
- if (tblSensor != null) {
- //保存到数据库
- List<TblSensorRecord> tblSensorRecords = new ArrayList<>();
- dataBo.getData().forEach(i->{
- TblSensorRecord tblSensorRecord = new TblSensorRecord();
- tblSensorRecord.setEquipmentId(tblSensor.getDeviceId());
- tblSensorRecord.setCreateBy("admin");
- tblSensorRecord.setUpdateBy("admin");
- tblSensorRecord.setCreateTime(finalTimedata);
- tblSensorRecord.setSensorId(tblSensor.getId());
- tblSensorRecord.setPointName(i.getName());
- tblSensorRecord.setPointValue(i.getValue());
- tblSensorRecords.add(tblSensorRecord);
- });
- if(tblSensorRecords.size()>0){
- tblSensorRecordMapper.insertBatch(tblSensorRecords);
- }
- String protocolType = "";
- List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
- for (SysDictData sysDictData : sysDictTypeList) {
- if (Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()) {
- protocolType = sysDictData.getDictLabel();
- }
- }
- //转换
- SensorDataToOtherMqttBo mqttdata = new SensorDataToOtherMqttBo();
- mqttdata.setDeviceId(tblSensor.getDeviceId().toString());
- List<SensorDataBo> dataBoList = JSONUtil.toList(tblSensor.getDatapoints(), SensorDataBo.class);
- MqttSensorDataBo dataBo12 = CacheUtils.get("sensorData", dataBo.getSensorId());
- //推缓存数据
- dataBo12.getData().forEach(i -> {
- dataBoList.stream().filter(j -> j.getName().equals(i.getName())).forEach(j -> {
- i.setUnitSymbol(j.getUnitType());
- i.setUnit(j.getUnit());
- i.setUnitType(j.getUnit());
- });
- i.setParams(null);
- }
- );
- mqttdata.setData(dataBo12.getData());
- mqttdata.setSensorId(tblSensor.getId().toString());
- mqttdata.setCreateTime(finalTimedata);
- String mqttstr = JSONUtil.parseObj(mqttdata, true).toStringPretty();
- //转发mqtt数据
- //TODO 缓存获取对应mqtt的缓存数据
- MqttObj searchdata = new MqttObj();
- searchdata.setEquipmentId(tblSensor.getDeviceId());
- List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(searchdata);
- for (MqttObj obj : mqttObjList) {
- if (obj.getStatus() == 1) {
- JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
- if (topicObj1.size() > 0) {
- String finalProtocolType = protocolType;
- topicObj1.forEach(i -> {
- JSONObject p = (JSONObject) i;
- String topic = p.getStr("name");
- topic = StrUtil.replace(topic, "$protocolType$", finalProtocolType);
- topic = StrUtil.replace(topic, "$sensorId$", tblSensor.getId().toString());
- try {
- mqttConnect.pub(topic, mqttstr.toString(), (Integer) p.get("qos"));
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
- });
- } else {
- String topic = "forwarding/" + protocolType + "/" + tblSensor.getId().toString();
- mqttConnect.pub(topic, mqttstr.toString(), 0);
- }
- } catch (Exception e) {
- log.error("mqtt转发失败", e);
- }
- }
- }
- }
- }
- });
- }
- }
- @Override
- public void createMqttMain(TblMqttBo bo){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- System.out.println(mqttUrl);
- mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(),mqttUser,mqttPassword,new Callback("main"));
- mqttConnect.sub(mqttTopic);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- public void filterData(MqttSensorDataBo dataBo,TblSensor tblSensor){
- TblRuleBo tblRuleBo = new TblRuleBo();
- tblRuleBo.setSensorId(Long.valueOf(dataBo.getSensorId()));
- List<TblRuleVo> ruleVoList = iTblRuleService.queryList(tblRuleBo);
- dataBo.getData().forEach(i->{
- ruleVoList.forEach(ruleVo->{
- if(i.getName().equals(ruleVo.getTriggeringCondition())){
- ruleFilter(i,ruleVo.getId(),tblSensor);
- }
- });
- });
- }
- public void ruleFilter(SensorDataBo bo,Long ruleId,TblSensor tblSensor){
- TblRuleFilterBo tblRuleFilterBo = new TblRuleFilterBo();
- tblRuleFilterBo.setRuleId(Long.toString(ruleId));
- List<TblRuleFilterVo> tblRuleFilterVoList = iTblRuleFilterService.queryList(tblRuleFilterBo);
- tblRuleFilterVoList.forEach(i->{
- String value = bo.getValue();
- if(StringUtils.isNotBlank(i.getFormula())){
- try {
- Context context = Context.enter();
- Scriptable scope = context.initStandardObjects();
- String script = "function format(data) { return "+i.getFormula()+" ; } format("+value+");";
- Object result = context.evaluateString(scope, script, "<cmd>", 1, null);
- value = (String) result;
- System.out.println(value);
- } catch (Exception e) {
- System.out.println("表达式runtime错误:" + e.getMessage());
- }
- }
- Boolean isFilter = false;
- if(StringUtils.isNotBlank(i.getFilterAlgorithm())){
- Context context = Context.enter();
- Scriptable scope = context.initStandardObjects();
- String script = "function isFilter(data,up,down) { return "+i.getFilterAlgorithm()+" ; }";
- try {
- script+="isFilter("+ value+","+i.getUpperLimit()+","+i.getLowerLimit()+")";
- Object result = context.evaluateString(scope, script, "<cmd>", 1, null);
- isFilter = (Boolean) result;
- // isFilter = (Boolean) inv.invokeFunction("isFilter", value,i.getUpperLimit(),i.getLowerLimit() );
- System.out.println(isFilter);
- if(isFilter){
- ruleExecute(bo,i.getId(),tblSensor);
- }
- } catch (Exception e) {
- System.out.println("表达式runtime错误:" + e.getMessage());
- }
- }
- });
- }
- private void validEntityBeforeSave(TblSensorRecord entity){
- //TODO 做一些数据校验,如唯一约束
- }
- private void ruleExecute(SensorDataBo bo,String id,TblSensor tblSensor){
- TblRuleExecuteBo executeBo = new TblRuleExecuteBo();
- executeBo.setFilterId(Long.valueOf(id));
- List<TblRuleExecuteVo> list = iTblRuleExecuteService.queryList(executeBo);
- for(TblRuleExecuteVo vo:list){
- if(vo.getExecuteAction().equals("warn")){
- TblWarnBo warn = new TblWarnBo();
- TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId());
- warn.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"数据异常");
- warn.setVal(bo.getValue());
- warn.setEquipmentId(tblEquipment.getId());
- warn.setEquipmentName(tblEquipment.getName());
- warn.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark());
- warn.setAlarmTime(new Date());
- if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){
- JSONObject jsonObject = new JSONObject(vo.getExpr1());
- if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){
- Long postpone = Long.valueOf(jsonObject.getStr("postpone"));
- try{
- Thread.sleep(postpone);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){
- TblWarn tblWarn = new TblWarn();
- tblWarn.setEquipmentId(tblEquipment.getId());
- tblWarn.setStatus(jsonObject.getStr("interval"));
- List<TblWarn> warnList = tblWarnMapper.getWarnDeviceList(tblWarn);
- Integer frequency = Integer.valueOf(jsonObject.getStr("frequency"));
- Integer interval = Integer.valueOf(jsonObject.getStr("interval"));
- Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation"));
- Integer time = frequency*interval + invalidation + 100;
- tblWarn.setStatus(String.valueOf(time));
- List<TblWarn> totalWarnList = tblWarnMapper.getWarnDeviceList(tblWarn);
- if(warnList.size() == 0 && totalWarnList.size() < frequency){
- iTblWarnService.insertByBo(warn);
- }
- }
- }else{
- iTblWarnService.insertByBo(warn);
- }
- } else if (vo.getExecuteAction().equals("breakbown")) {
- TblBreakdownBo tblBreakdownBo = new TblBreakdownBo();
- TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId());
- tblBreakdownBo.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"故障");
- tblBreakdownBo.setVal(bo.getValue());
- tblBreakdownBo.setEquipmentId(tblEquipment.getId());
- tblBreakdownBo.setEquipmentName(tblEquipment.getName());
- tblBreakdownBo.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark());
- tblBreakdownBo.setFaultTime(new Date());
- if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){
- JSONObject jsonObject = new JSONObject(vo.getExpr1());
- if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){
- Long postpone = Long.valueOf(jsonObject.getStr("postpone"));
- try{
- Thread.sleep(postpone);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){
- TblBreakdown tblBreakdown = new TblBreakdown();
- tblBreakdown.setEquipmentId(tblEquipment.getId());
- tblBreakdown.setStatus(jsonObject.getStr("interval"));
- List<TblBreakdown> breakList = tblBreakdownMapper.getBreakdownList(tblBreakdown);
- Integer frequency = Integer.valueOf(jsonObject.getStr("frequency"));
- Integer interval = Integer.valueOf(jsonObject.getStr("interval"));
- Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation"));
- Integer time = frequency*interval + invalidation + 100;
- tblBreakdown.setStatus(String.valueOf(time));
- List<TblBreakdown> totalbreakList = tblBreakdownMapper.getBreakdownList(tblBreakdown);
- if(breakList.size() == 0 && totalbreakList.size() < frequency){
- iTblBreakdownService.insertByBo(tblBreakdownBo);
- }
- }
- }else{
- iTblBreakdownService.insertByBo(tblBreakdownBo);
- }
- }
- }
- }
- @Override
- public void createMqtt(TblMqttBo bo){
- LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
- List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
- for(TblMqttVo mqttVo:mqttVoList){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- @Override
- public void pubOrder(OrderBean orderBean){
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("deviceId",orderBean.getDeviceId());
- jsonObject.put("add",orderBean.getAdd());
- jsonObject.put("value",orderBean.getValue());
- jsonObject.put("addrOffset",orderBean.getAddrOffset());
- jsonObject.put("len",orderBean.getValue().split(",").length);
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
- mqttConnect.pub("control",jsonObject.toString(),0);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
- lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
- lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
- lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
- lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
- lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
- lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
- lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
- return lqw;
- }
- class Callback implements MqttCallback {
- private String type;
- Callback(){
- type= "default";
- }
- Callback(String type){
- this.type = type;
- }
- /**
- * MQTT 断开连接会执行此方法
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.info("断开了MQTT连接111 :{}", throwable.getMessage());
- log.error(throwable.getMessage(), throwable);
- if(this.type.equals("main")){
- MqttServiceImpl.this.createMqtt(new TblMqttBo());
- }
- // MqttServiceImpl.this.createMqtt(new TblMqttBo());
- }
- /**
- * publish发布成功后会执行到这里
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- log.info("发布消息成功");
- }
- /**
- * subscribe订阅后得到的消息会执行到这里
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
- String payload = String.valueOf(message.getPayload());
- // String msg = Byte.toString(message.getPayload());
- byte[] bytes = message.getPayload();
- String encoded = Base64.getEncoder().encodeToString(bytes);
- byte[] decoded = Base64.getDecoder().decode(encoded);
- String msg =new String(decoded);
- System.out.println(msg);
- log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
- MqttServiceImpl.this.pubMqttData(msg);
- }
- }
- private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
- lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
- lqw.eq(StringUtils.isNotBlank(bo.getPointName()), TblSensorRecord::getPointName, bo.getPointName());
- lqw.eq(StringUtils.isNotBlank(bo.getPointValue()), TblSensorRecord::getPointValue, bo.getPointValue());
- lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
- return lqw;
- }
- @Override
- public void setMqttCache(){
- List<TblEquipmentSbookVo> equipmentVoList = equipmentSbookService.queryList(new TblEquipmentSbookBo());
- for(TblEquipmentSbookVo vo:equipmentVoList){
- MqttObj obj = new MqttObj();
- obj.setEquipmentId(vo.getId());
- List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(obj);
- CacheUtils.put("DeviceMqtt",vo.getId(),mqttObjList);
- }
- }
- }
|