MqttServiceImpl.java 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package com.ruoyi.data.service.impl;
  2. import cn.hutool.core.bean.BeanUtil;
  3. import cn.hutool.core.util.StrUtil;
  4. import cn.hutool.json.JSON;
  5. import cn.hutool.json.JSONArray;
  6. import cn.hutool.core.lang.UUID;
  7. import cn.hutool.json.JSONObject;
  8. import cn.hutool.json.JSONUtil;
  9. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  10. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  11. import com.ruoyi.common.core.domain.entity.SysDictData;
  12. import com.ruoyi.common.utils.PoolUtils;
  13. import com.ruoyi.common.utils.StringUtils;
  14. import com.ruoyi.common.utils.mqtt.MQTTConnect;
  15. import com.ruoyi.common.utils.redis.CacheUtils;
  16. import com.ruoyi.data.domain.*;
  17. import com.ruoyi.data.domain.bo.*;
  18. import com.ruoyi.data.domain.vo.*;
  19. import com.ruoyi.data.mapper.*;
  20. import com.ruoyi.data.service.*;
  21. import com.ruoyi.system.service.ISysDictTypeService;
  22. import lombok.Data;
  23. import lombok.RequiredArgsConstructor;
  24. import lombok.extern.slf4j.Slf4j;
  25. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  26. import org.eclipse.paho.client.mqttv3.MqttCallback;
  27. import org.eclipse.paho.client.mqttv3.MqttException;
  28. import org.eclipse.paho.client.mqttv3.MqttMessage;
  29. import org.springframework.beans.factory.annotation.Value;
  30. import org.springframework.stereotype.Service;
  31. import org.mozilla.javascript.Context;
  32. import org.mozilla.javascript.Scriptable;
  33. import org.mozilla.javascript.ScriptableObject;
  34. import java.util.*;
  35. import java.util.stream.Collectors;
  36. @RequiredArgsConstructor
  37. @Service
  38. @Slf4j
  39. public class MqttServiceImpl implements MqttService {
  40. private final TblMqttMapper tblMqttMapper;
  41. private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
  42. private final TblRecordMapper tblRecordMapper;
  43. private final TblSensorMapper tblSensorMapper;
  44. private final TblSensorRecordMapper tblSensorRecordMapper;
  45. private final ISysDictTypeService sysDictTypeService;
  46. private final WebsocketService websocketService;
  47. private final ITblEquipmentSbookService equipmentSbookService;
  48. private final ITblRuleService iTblRuleService;
  49. private final ITblRuleFilterService iTblRuleFilterService;
  50. private final ITblBreakdownService iTblBreakdownService;
  51. private final ITblWarnService iTblWarnService;
  52. private final ITblRuleExecuteService iTblRuleExecuteService;
  53. private final TblWarnMapper tblWarnMapper;
  54. private final TblBreakdownMapper tblBreakdownMapper;
  55. @Value("${mqtt.url}")
  56. private String mqttUrl;
  57. @Value("${mqtt.clientid}")
  58. private String clientID;
  59. @Value("${mqtt.user}")
  60. private String mqttUser;
  61. @Value("${mqtt.password}")
  62. private String mqttPassword;
  63. @Value("${mqtt.topic}")
  64. private String mqttTopic;
  65. @Value("${mqtt.saveAndForward}")
  66. private Boolean saveAndForward;
  67. @Override
  68. public void pubMqttData(String mqttStr) {
  69. //数据分发到websocket 前端调用
  70. websocketService.sendMessageAll(mqttStr);
  71. MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class);
  72. TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId());
  73. filterData(dataBo,tblSensor);
  74. //数据保存到数据库
  75. if(saveAndForward) {
  76. // 解析数据 异步线程执行
  77. PoolUtils.getPool().execute(new Runnable() {
  78. @Override
  79. public void run() {
  80. MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class);
  81. Date timedata = new Date();
  82. if (dataBo.getTime() != null) {
  83. timedata=(dataBo.getTime());
  84. }
  85. Date finalTimedata = timedata;
  86. //放缓存内 sensorId 缓存数据
  87. MqttSensorDataBo dataBo1 = CacheUtils.get("sensorData", dataBo.getSensorId());
  88. if (dataBo1 == null) {
  89. dataBo.getData().forEach(i -> {
  90. i.setCreateTime(finalTimedata);
  91. });
  92. CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo);
  93. } else {
  94. dataBo1.getData().addAll(dataBo.getData());
  95. dataBo.getData().forEach(i -> {
  96. List<SensorDataBo> dataBoList = dataBo1.getData().stream().filter(j -> j.getName().equals(i.getName())).collect(Collectors.toList());
  97. if (dataBoList.size() > 0) {
  98. dataBo1.getData().removeAll(dataBoList);
  99. }
  100. i.setCreateTime(finalTimedata);
  101. dataBo1.getData().add(i);
  102. });
  103. CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo1);
  104. }
  105. //TODO 缓存获取对应设备的缓存数据
  106. TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId());
  107. filterData(dataBo,tblSensor);
  108. if (tblSensor != null) {
  109. //保存到数据库
  110. List<TblSensorRecord> tblSensorRecords = new ArrayList<>();
  111. dataBo.getData().forEach(i->{
  112. TblSensorRecord tblSensorRecord = new TblSensorRecord();
  113. tblSensorRecord.setEquipmentId(tblSensor.getDeviceId());
  114. tblSensorRecord.setCreateBy("admin");
  115. tblSensorRecord.setUpdateBy("admin");
  116. tblSensorRecord.setCreateTime(finalTimedata);
  117. tblSensorRecord.setSensorId(tblSensor.getId());
  118. tblSensorRecord.setPointName(i.getName());
  119. tblSensorRecord.setPointValue(i.getValue());
  120. tblSensorRecords.add(tblSensorRecord);
  121. });
  122. if(tblSensorRecords.size()>0){
  123. tblSensorRecordMapper.insertBatch(tblSensorRecords);
  124. }
  125. String protocolType = "";
  126. List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
  127. for (SysDictData sysDictData : sysDictTypeList) {
  128. if (Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()) {
  129. protocolType = sysDictData.getDictLabel();
  130. }
  131. }
  132. //转换
  133. SensorDataToOtherMqttBo mqttdata = new SensorDataToOtherMqttBo();
  134. mqttdata.setDeviceId(tblSensor.getDeviceId().toString());
  135. List<SensorDataBo> dataBoList = JSONUtil.toList(tblSensor.getDatapoints(), SensorDataBo.class);
  136. MqttSensorDataBo dataBo12 = CacheUtils.get("sensorData", dataBo.getSensorId());
  137. //推缓存数据
  138. dataBo12.getData().forEach(i -> {
  139. dataBoList.stream().filter(j -> j.getName().equals(i.getName())).forEach(j -> {
  140. i.setUnitSymbol(j.getUnitType());
  141. i.setUnit(j.getUnit());
  142. i.setUnitType(j.getUnit());
  143. });
  144. i.setParams(null);
  145. }
  146. );
  147. mqttdata.setData(dataBo12.getData());
  148. mqttdata.setSensorId(tblSensor.getId().toString());
  149. mqttdata.setCreateTime(finalTimedata);
  150. String mqttstr = JSONUtil.parseObj(mqttdata, true).toStringPretty();
  151. //转发mqtt数据
  152. //TODO 缓存获取对应mqtt的缓存数据
  153. MqttObj searchdata = new MqttObj();
  154. searchdata.setEquipmentId(tblSensor.getDeviceId());
  155. List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(searchdata);
  156. for (MqttObj obj : mqttObjList) {
  157. if (obj.getStatus() == 1) {
  158. JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
  159. MQTTConnect mqttConnect = new MQTTConnect();
  160. try {
  161. mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
  162. if (topicObj1.size() > 0) {
  163. String finalProtocolType = protocolType;
  164. topicObj1.forEach(i -> {
  165. JSONObject p = (JSONObject) i;
  166. String topic = p.getStr("name");
  167. topic = StrUtil.replace(topic, "$protocolType$", finalProtocolType);
  168. topic = StrUtil.replace(topic, "$sensorId$", tblSensor.getId().toString());
  169. try {
  170. mqttConnect.pub(topic, mqttstr.toString(), (Integer) p.get("qos"));
  171. } catch (MqttException e) {
  172. throw new RuntimeException(e);
  173. }
  174. });
  175. } else {
  176. String topic = "forwarding/" + protocolType + "/" + tblSensor.getId().toString();
  177. mqttConnect.pub(topic, mqttstr.toString(), 0);
  178. }
  179. } catch (Exception e) {
  180. log.error("mqtt转发失败", e);
  181. }
  182. }
  183. }
  184. }
  185. }
  186. });
  187. }
  188. }
  189. @Override
  190. public void createMqttMain(TblMqttBo bo){
  191. MQTTConnect mqttConnect = new MQTTConnect();
  192. try {
  193. System.out.println(mqttUrl);
  194. mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(),mqttUser,mqttPassword,new Callback("main"));
  195. mqttConnect.sub(mqttTopic);
  196. }catch (Exception e){
  197. e.printStackTrace();
  198. }
  199. }
  200. public void filterData(MqttSensorDataBo dataBo,TblSensor tblSensor){
  201. TblRuleBo tblRuleBo = new TblRuleBo();
  202. tblRuleBo.setSensorId(Long.valueOf(dataBo.getSensorId()));
  203. List<TblRuleVo> ruleVoList = iTblRuleService.queryList(tblRuleBo);
  204. dataBo.getData().forEach(i->{
  205. ruleVoList.forEach(ruleVo->{
  206. if(i.getName().equals(ruleVo.getTriggeringCondition())){
  207. ruleFilter(i,ruleVo.getId(),tblSensor);
  208. }
  209. });
  210. });
  211. }
  212. public void ruleFilter(SensorDataBo bo,Long ruleId,TblSensor tblSensor){
  213. TblRuleFilterBo tblRuleFilterBo = new TblRuleFilterBo();
  214. tblRuleFilterBo.setRuleId(Long.toString(ruleId));
  215. List<TblRuleFilterVo> tblRuleFilterVoList = iTblRuleFilterService.queryList(tblRuleFilterBo);
  216. tblRuleFilterVoList.forEach(i->{
  217. String value = bo.getValue();
  218. if(StringUtils.isNotBlank(i.getFormula())){
  219. try {
  220. Context context = Context.enter();
  221. Scriptable scope = context.initStandardObjects();
  222. String script = "function format(data) { return "+i.getFormula()+" ; } format("+value+");";
  223. Object result = context.evaluateString(scope, script, "<cmd>", 1, null);
  224. value = (String) result;
  225. System.out.println(value);
  226. } catch (Exception e) {
  227. System.out.println("表达式runtime错误:" + e.getMessage());
  228. }
  229. }
  230. Boolean isFilter = false;
  231. if(StringUtils.isNotBlank(i.getFilterAlgorithm())){
  232. Context context = Context.enter();
  233. Scriptable scope = context.initStandardObjects();
  234. String script = "function isFilter(data,up,down) { return "+i.getFilterAlgorithm()+" ; }";
  235. try {
  236. script+="isFilter("+ value+","+i.getUpperLimit()+","+i.getLowerLimit()+")";
  237. Object result = context.evaluateString(scope, script, "<cmd>", 1, null);
  238. isFilter = (Boolean) result;
  239. // isFilter = (Boolean) inv.invokeFunction("isFilter", value,i.getUpperLimit(),i.getLowerLimit() );
  240. System.out.println(isFilter);
  241. if(isFilter){
  242. ruleExecute(bo,i.getId(),tblSensor);
  243. }
  244. } catch (Exception e) {
  245. System.out.println("表达式runtime错误:" + e.getMessage());
  246. }
  247. }
  248. });
  249. }
  250. private void validEntityBeforeSave(TblSensorRecord entity){
  251. //TODO 做一些数据校验,如唯一约束
  252. }
  253. private void ruleExecute(SensorDataBo bo,String id,TblSensor tblSensor){
  254. TblRuleExecuteBo executeBo = new TblRuleExecuteBo();
  255. executeBo.setFilterId(Long.valueOf(id));
  256. List<TblRuleExecuteVo> list = iTblRuleExecuteService.queryList(executeBo);
  257. for(TblRuleExecuteVo vo:list){
  258. if(vo.getExecuteAction().equals("warn")){
  259. TblWarnBo warn = new TblWarnBo();
  260. TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId());
  261. warn.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"数据异常");
  262. warn.setVal(bo.getValue());
  263. warn.setEquipmentId(tblEquipment.getId());
  264. warn.setEquipmentName(tblEquipment.getName());
  265. warn.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark());
  266. warn.setAlarmTime(new Date());
  267. if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){
  268. JSONObject jsonObject = new JSONObject(vo.getExpr1());
  269. if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){
  270. Long postpone = Long.valueOf(jsonObject.getStr("postpone"));
  271. try{
  272. Thread.sleep(postpone);
  273. }catch (Exception e){
  274. e.printStackTrace();
  275. }
  276. }
  277. if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){
  278. TblWarn tblWarn = new TblWarn();
  279. tblWarn.setEquipmentId(tblEquipment.getId());
  280. tblWarn.setStatus(jsonObject.getStr("interval"));
  281. List<TblWarn> warnList = tblWarnMapper.getWarnDeviceList(tblWarn);
  282. Integer frequency = Integer.valueOf(jsonObject.getStr("frequency"));
  283. Integer interval = Integer.valueOf(jsonObject.getStr("interval"));
  284. Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation"));
  285. Integer time = frequency*interval + invalidation + 100;
  286. tblWarn.setStatus(String.valueOf(time));
  287. List<TblWarn> totalWarnList = tblWarnMapper.getWarnDeviceList(tblWarn);
  288. if(warnList.size() == 0 && totalWarnList.size() < frequency){
  289. iTblWarnService.insertByBo(warn);
  290. }
  291. }
  292. }else{
  293. iTblWarnService.insertByBo(warn);
  294. }
  295. } else if (vo.getExecuteAction().equals("breakbown")) {
  296. TblBreakdownBo tblBreakdownBo = new TblBreakdownBo();
  297. TblEquipmentSbookVo tblEquipment = equipmentSbookService.queryById(tblSensor.getDeviceId());
  298. tblBreakdownBo.setName("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+"故障");
  299. tblBreakdownBo.setVal(bo.getValue());
  300. tblBreakdownBo.setEquipmentId(tblEquipment.getId());
  301. tblBreakdownBo.setEquipmentName(tblEquipment.getName());
  302. tblBreakdownBo.setContent("设备"+tblEquipment.getName()+"传感器"+tblSensor.getName()+vo.getRemark());
  303. tblBreakdownBo.setFaultTime(new Date());
  304. if(StringUtils.isNotBlank(vo.getExpr1()) && vo.getExpr1() != null){
  305. JSONObject jsonObject = new JSONObject(vo.getExpr1());
  306. if(StringUtils.isNotBlank(jsonObject.getStr("postpone")) && jsonObject.getStr("postpone") != null){
  307. Long postpone = Long.valueOf(jsonObject.getStr("postpone"));
  308. try{
  309. Thread.sleep(postpone);
  310. }catch (Exception e){
  311. e.printStackTrace();
  312. }
  313. }
  314. if(jsonObject.getStr("frequency") != null && jsonObject.getStr("interval") != null && jsonObject.getStr("invalidation") != null){
  315. TblBreakdown tblBreakdown = new TblBreakdown();
  316. tblBreakdown.setEquipmentId(tblEquipment.getId());
  317. tblBreakdown.setStatus(jsonObject.getStr("interval"));
  318. List<TblBreakdown> breakList = tblBreakdownMapper.getBreakdownList(tblBreakdown);
  319. Integer frequency = Integer.valueOf(jsonObject.getStr("frequency"));
  320. Integer interval = Integer.valueOf(jsonObject.getStr("interval"));
  321. Integer invalidation = Integer.valueOf(jsonObject.getStr("invalidation"));
  322. Integer time = frequency*interval + invalidation + 100;
  323. tblBreakdown.setStatus(String.valueOf(time));
  324. List<TblBreakdown> totalbreakList = tblBreakdownMapper.getBreakdownList(tblBreakdown);
  325. if(breakList.size() == 0 && totalbreakList.size() < frequency){
  326. iTblBreakdownService.insertByBo(tblBreakdownBo);
  327. }
  328. }
  329. }else{
  330. iTblBreakdownService.insertByBo(tblBreakdownBo);
  331. }
  332. }
  333. }
  334. }
  335. @Override
  336. public void createMqtt(TblMqttBo bo){
  337. LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
  338. List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
  339. for(TblMqttVo mqttVo:mqttVoList){
  340. MQTTConnect mqttConnect = new MQTTConnect();
  341. try {
  342. mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
  343. }catch (Exception e){
  344. e.printStackTrace();
  345. }
  346. }
  347. }
  348. @Override
  349. public void pubOrder(OrderBean orderBean){
  350. JSONObject jsonObject = new JSONObject();
  351. jsonObject.put("deviceId",orderBean.getDeviceId());
  352. jsonObject.put("add",orderBean.getAdd());
  353. jsonObject.put("value",orderBean.getValue());
  354. jsonObject.put("addrOffset",orderBean.getAddrOffset());
  355. jsonObject.put("len",orderBean.getValue().split(",").length);
  356. MQTTConnect mqttConnect = new MQTTConnect();
  357. try {
  358. mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
  359. mqttConnect.pub("control",jsonObject.toString(),0);
  360. }catch (Exception e){
  361. e.printStackTrace();
  362. }
  363. }
  364. private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
  365. Map<String, Object> params = bo.getParams();
  366. LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
  367. lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
  368. lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
  369. lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
  370. lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
  371. lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
  372. lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
  373. lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
  374. lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
  375. lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
  376. return lqw;
  377. }
  378. class Callback implements MqttCallback {
  379. private String type;
  380. Callback(){
  381. type= "default";
  382. }
  383. Callback(String type){
  384. this.type = type;
  385. }
  386. /**
  387. * MQTT 断开连接会执行此方法
  388. */
  389. @Override
  390. public void connectionLost(Throwable throwable) {
  391. log.info("断开了MQTT连接111 :{}", throwable.getMessage());
  392. log.error(throwable.getMessage(), throwable);
  393. if(this.type.equals("main")){
  394. MqttServiceImpl.this.createMqtt(new TblMqttBo());
  395. }
  396. // MqttServiceImpl.this.createMqtt(new TblMqttBo());
  397. }
  398. /**
  399. * publish发布成功后会执行到这里
  400. */
  401. @Override
  402. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  403. log.info("发布消息成功");
  404. }
  405. /**
  406. * subscribe订阅后得到的消息会执行到这里
  407. */
  408. @Override
  409. public void messageArrived(String topic, MqttMessage message) throws Exception {
  410. // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
  411. String payload = String.valueOf(message.getPayload());
  412. // String msg = Byte.toString(message.getPayload());
  413. byte[] bytes = message.getPayload();
  414. String encoded = Base64.getEncoder().encodeToString(bytes);
  415. byte[] decoded = Base64.getDecoder().decode(encoded);
  416. String msg =new String(decoded);
  417. System.out.println(msg);
  418. log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
  419. MqttServiceImpl.this.pubMqttData(msg);
  420. }
  421. }
  422. private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
  423. Map<String, Object> params = bo.getParams();
  424. LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
  425. lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
  426. lqw.eq(StringUtils.isNotBlank(bo.getPointName()), TblSensorRecord::getPointName, bo.getPointName());
  427. lqw.eq(StringUtils.isNotBlank(bo.getPointValue()), TblSensorRecord::getPointValue, bo.getPointValue());
  428. lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
  429. return lqw;
  430. }
  431. @Override
  432. public void setMqttCache(){
  433. List<TblEquipmentSbookVo> equipmentVoList = equipmentSbookService.queryList(new TblEquipmentSbookBo());
  434. for(TblEquipmentSbookVo vo:equipmentVoList){
  435. MqttObj obj = new MqttObj();
  436. obj.setEquipmentId(vo.getId());
  437. List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(obj);
  438. CacheUtils.put("DeviceMqtt",vo.getId(),mqttObjList);
  439. }
  440. }
  441. }