|
|
@@ -9,19 +9,19 @@ import cn.hutool.json.JSONUtil;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
import com.ruoyi.common.core.domain.entity.SysDictData;
|
|
|
+import com.ruoyi.common.utils.PoolUtils;
|
|
|
import com.ruoyi.common.utils.StringUtils;
|
|
|
import com.ruoyi.common.utils.mqtt.MQTTConnect;
|
|
|
import com.ruoyi.common.utils.redis.CacheUtils;
|
|
|
import com.ruoyi.data.domain.*;
|
|
|
-import com.ruoyi.data.domain.bo.TblEquipmentSbookBo;
|
|
|
-import com.ruoyi.data.domain.bo.TblMqttBo;
|
|
|
-import com.ruoyi.data.domain.bo.TblSensorRecordBo;
|
|
|
+import com.ruoyi.data.domain.bo.*;
|
|
|
import com.ruoyi.data.domain.vo.*;
|
|
|
import com.ruoyi.data.mapper.*;
|
|
|
import com.ruoyi.data.service.ITblEquipmentSbookService;
|
|
|
import com.ruoyi.data.service.MqttService;
|
|
|
import com.ruoyi.data.service.WebsocketService;
|
|
|
import com.ruoyi.system.service.ISysDictTypeService;
|
|
|
+import lombok.Data;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
|
|
@@ -32,10 +32,8 @@ import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.text.SimpleDateFormat;
|
|
|
-import java.util.Base64;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
@RequiredArgsConstructor
|
|
|
@Service
|
|
|
@@ -78,76 +76,135 @@ public class MqttServiceImpl implements MqttService {
|
|
|
|
|
|
@Override
|
|
|
public void pubMqttData(String mqttStr) {
|
|
|
+
|
|
|
+ //数据分发到websocket 前端调用
|
|
|
websocketService.sendMessageAll(mqttStr);
|
|
|
+ //数据保存到数据库
|
|
|
if(saveAndForward) {
|
|
|
- JSONObject jsonObject = new JSONObject(mqttStr);
|
|
|
- Long sensorId = jsonObject.getLong("sensorId");
|
|
|
- TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
|
|
|
- Long deviceId = tblSensor.getDeviceId();
|
|
|
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
|
- Date date = new Date(System.currentTimeMillis());
|
|
|
- JSONObject mqttMsg = new JSONObject();
|
|
|
- mqttMsg.put("deviceSn", deviceId);
|
|
|
- mqttMsg.put("sensorSn", sensorId);
|
|
|
- mqttMsg.put("sensorType", tblSensor.getSensorType());
|
|
|
- mqttMsg.put("created_time", formatter.format(date));
|
|
|
- JSONArray sensorData = new JSONArray();
|
|
|
- JSONArray dataArry = new JSONArray(jsonObject.get("data"));
|
|
|
- for (Object obj : dataArry) {
|
|
|
- JSONObject dataObj = new JSONObject(obj);
|
|
|
- dataObj.remove("gcdinfo");
|
|
|
- sensorData.add(dataObj);
|
|
|
- }
|
|
|
- mqttMsg.put("sensorData", sensorData);
|
|
|
- MqttObj mqttObj = new MqttObj();
|
|
|
- mqttObj.setEquipmentId(deviceId);
|
|
|
- List<MqttObj> mqttObjList = CacheUtils.get("DeviceMqtt",deviceId);
|
|
|
- TblRecord tblRecord = new TblRecord();
|
|
|
- tblRecord.setEquipmentId(deviceId);
|
|
|
- tblRecord.setJson(mqttMsg.toString());
|
|
|
- tblRecord.setCreateBy("admin");
|
|
|
- tblRecord.setUpdateBy("admin");
|
|
|
- tblRecord.setSensorId(sensorId);
|
|
|
- tblRecordMapper.insert(tblRecord);
|
|
|
- CacheUtils.put("sensorData",sensorId,mqttMsg.toString());
|
|
|
- String protocolType = "";
|
|
|
- List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
|
|
|
- for(SysDictData sysDictData:sysDictTypeList){
|
|
|
- if(Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()){
|
|
|
- protocolType = sysDictData.getDictLabel();
|
|
|
- }
|
|
|
- }
|
|
|
- for(MqttObj obj:mqttObjList){
|
|
|
- if(obj.getStatus() == 1) {
|
|
|
- JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
|
|
|
- MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
- try {
|
|
|
- mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
|
|
|
- if (topicObj1.size() >0) {
|
|
|
- String finalProtocolType = protocolType;
|
|
|
- topicObj1.forEach(i->{
|
|
|
- JSONObject p = (JSONObject)i;
|
|
|
- String topic = p.getStr("name");
|
|
|
- topic = StrUtil.replace(topic,"$protocolType$", finalProtocolType);
|
|
|
- topic = StrUtil.replace(topic,"$sensorId$", sensorId.toString());
|
|
|
- try {
|
|
|
- mqttConnect.pub(topic, mqttMsg.toString(), (Integer) p.get("qos"));
|
|
|
- } catch (MqttException e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- String topic = "sensor/"+protocolType+"/" + sensorId;
|
|
|
- mqttConnect.pub(topic, mqttMsg.toString(), 0);
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ // 解析数据 异步线程执行
|
|
|
+ PoolUtils.getPool().execute(new Runnable() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+
|
|
|
+ MqttSensorDataBo dataBo = JSONUtil.toBean(mqttStr, MqttSensorDataBo.class);
|
|
|
+ Date timedata = new Date();
|
|
|
+ if (dataBo.getTime() != null) {
|
|
|
+ timedata=(dataBo.getTime());
|
|
|
+ }
|
|
|
+ Date finalTimedata = timedata;
|
|
|
+
|
|
|
+ //放缓存内 sensorId 缓存数据
|
|
|
+ MqttSensorDataBo dataBo1 = CacheUtils.get("sensorData", dataBo.getSensorId());
|
|
|
+ if (dataBo1 == null) {
|
|
|
+ dataBo.getData().forEach(i -> {
|
|
|
+ i.setCreateTime(finalTimedata);
|
|
|
+ });
|
|
|
+ CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo);
|
|
|
+ } else {
|
|
|
+ dataBo1.getData().addAll(dataBo.getData());
|
|
|
+ dataBo.getData().forEach(i -> {
|
|
|
+ List<SensorDataBo> dataBoList = dataBo1.getData().stream().filter(j -> j.getName().equals(i.getName())).collect(Collectors.toList());
|
|
|
+ if (dataBoList.size() > 0) {
|
|
|
+ dataBo1.getData().removeAll(dataBoList);
|
|
|
+ }
|
|
|
+ i.setCreateTime(finalTimedata);
|
|
|
+ dataBo1.getData().add(i);
|
|
|
+ });
|
|
|
+ CacheUtils.put("sensorData", dataBo.getSensorId(), dataBo1);
|
|
|
+ }
|
|
|
+ //TODO 缓存获取对应设备的缓存数据
|
|
|
+ TblSensor tblSensor = tblSensorMapper.selectById(dataBo.getSensorId());
|
|
|
+ if (tblSensor != null) {
|
|
|
+ //保存到数据库
|
|
|
+
|
|
|
+
|
|
|
+ List<TblSensorRecord> tblSensorRecords = new ArrayList<>();
|
|
|
+
|
|
|
+ dataBo.getData().forEach(i->{
|
|
|
+ TblSensorRecord tblSensorRecord = new TblSensorRecord();
|
|
|
+ tblSensorRecord.setEquipmentId(tblSensor.getDeviceId());
|
|
|
+ tblSensorRecord.setCreateBy("admin");
|
|
|
+ tblSensorRecord.setUpdateBy("admin");
|
|
|
+ tblSensorRecord.setCreateTime(finalTimedata);
|
|
|
+ tblSensorRecord.setSensorId(tblSensor.getId());
|
|
|
+ tblSensorRecord.setPointName(i.getName());
|
|
|
+ tblSensorRecord.setPointValue(i.getValue());
|
|
|
+ tblSensorRecords.add(tblSensorRecord);
|
|
|
+ });
|
|
|
+ if(tblSensorRecords.size()>0){
|
|
|
+ tblSensorRecordMapper.insertBatch(tblSensorRecords);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ String protocolType = "";
|
|
|
+ List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
|
|
|
+ for (SysDictData sysDictData : sysDictTypeList) {
|
|
|
+ if (Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()) {
|
|
|
+ protocolType = sysDictData.getDictLabel();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //转换
|
|
|
+ SensorDataToOtherMqttBo mqttdata = new SensorDataToOtherMqttBo();
|
|
|
+ mqttdata.setDeviceId(tblSensor.getDeviceId().toString());
|
|
|
+ List<SensorDataBo> dataBoList = JSONUtil.toList(tblSensor.getDatapoints(), SensorDataBo.class);
|
|
|
+ dataBo.getData().forEach(i -> {
|
|
|
+ dataBoList.stream().filter(j -> j.getName().equals(i.getName())).forEach(j -> {
|
|
|
+ i.setUnitSymbol(j.getUnitType());
|
|
|
+ i.setUnit(j.getUnit());
|
|
|
+ i.setUnitType(j.getUnit());
|
|
|
+ });
|
|
|
+ i.setParams(null);
|
|
|
+ }
|
|
|
+ );
|
|
|
+ mqttdata.setData(dataBo.getData());
|
|
|
+ mqttdata.setSensorId(tblSensor.getId().toString());
|
|
|
+ mqttdata.setCreateTime(finalTimedata);
|
|
|
+ String mqttstr = JSONUtil.parseObj(mqttdata, true).toStringPretty();
|
|
|
+ //转发mqtt数据
|
|
|
+
|
|
|
+ //TODO 缓存获取对应mqtt的缓存数据
|
|
|
+ MqttObj searchdata = new MqttObj();
|
|
|
+ searchdata.setEquipmentId(tblSensor.getDeviceId());
|
|
|
+ List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(searchdata);
|
|
|
+
|
|
|
+ for (MqttObj obj : mqttObjList) {
|
|
|
+ if (obj.getStatus() == 1) {
|
|
|
+ JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
|
|
|
+ MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
+ try {
|
|
|
+ mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
|
|
|
+ if (topicObj1.size() > 0) {
|
|
|
+ String finalProtocolType = protocolType;
|
|
|
+ topicObj1.forEach(i -> {
|
|
|
+ JSONObject p = (JSONObject) i;
|
|
|
+ String topic = p.getStr("name");
|
|
|
+ topic = StrUtil.replace(topic, "$protocolType$", finalProtocolType);
|
|
|
+ topic = StrUtil.replace(topic, "$sensorId$", tblSensor.getId().toString());
|
|
|
+ try {
|
|
|
+ mqttConnect.pub(topic, mqttstr.toString(), (Integer) p.get("qos"));
|
|
|
+ } catch (MqttException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ String topic = "sensor/" + protocolType + "/" + tblSensor.getId().toString();
|
|
|
+ mqttConnect.pub(topic, mqttstr.toString(), 0);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("mqtt转发失败", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
@Override
|
|
|
public void createMqttMain(TblMqttBo bo){
|
|
|
MQTTConnect mqttConnect = new MQTTConnect();
|
|
|
@@ -268,7 +325,8 @@ public class MqttServiceImpl implements MqttService {
|
|
|
Map<String, Object> params = bo.getParams();
|
|
|
LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
|
|
|
lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
|
|
|
- lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getPointName()), TblSensorRecord::getPointName, bo.getPointName());
|
|
|
+ lqw.eq(StringUtils.isNotBlank(bo.getPointValue()), TblSensorRecord::getPointValue, bo.getPointValue());
|
|
|
lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
|
|
|
return lqw;
|
|
|
}
|