| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- package com.ruoyi.data.service.impl;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.json.JSONArray;
- import cn.hutool.core.lang.UUID;
- import cn.hutool.json.JSONObject;
- import cn.hutool.json.JSONUtil;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.ruoyi.common.core.domain.entity.SysDictData;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.common.utils.mqtt.MQTTConnect;
- import com.ruoyi.common.utils.redis.CacheUtils;
- import com.ruoyi.data.domain.*;
- import com.ruoyi.data.domain.bo.TblMqttBo;
- import com.ruoyi.data.domain.bo.TblSensorRecordBo;
- import com.ruoyi.data.domain.vo.TblMqttVo;
- import com.ruoyi.data.domain.vo.TblRecordVo;
- import com.ruoyi.data.domain.vo.TblSensorRecordVo;
- import com.ruoyi.data.mapper.*;
- import com.ruoyi.data.service.MqttService;
- import com.ruoyi.data.service.WebsocketService;
- import com.ruoyi.system.service.ISysDictTypeService;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import sun.management.Sensor;
- import javax.annotation.PostConstruct;
- import java.text.SimpleDateFormat;
- import java.util.Base64;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- @RequiredArgsConstructor
- @Service
- @Slf4j
- public class MqttServiceImpl implements MqttService {
- private final TblMqttMapper tblMqttMapper;
- private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
- private final TblRecordMapper tblRecordMapper;
- private final TblSensorMapper tblSensorMapper;
- private final TblSensorRecordMapper tblSensorRecordMapper;
- private final ISysDictTypeService sysDictTypeService;
- private final WebsocketService websocketService;
- @Value("${mqtt.url}")
- private String mqttUrl;
- @Value("${mqtt.clientid}")
- private String clientID;
- @Value("${mqtt.user}")
- private String mqttUser;
- @Value("${mqtt.password}")
- private String mqttPassword;
- @Value("${mqtt.topic}")
- private String mqttTopic;
- @Value("${mqtt.saveAndForward}")
- private Boolean saveAndForward;
- @Override
- public void pubMqttData(String mqttStr) {
- websocketService.sendMessageAll(mqttStr);
- if(saveAndForward) {
- JSONObject jsonObject = new JSONObject(mqttStr);
- Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
- TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
- Long deviceId = tblSensor.getDeviceId();
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date(System.currentTimeMillis());
- JSONObject mqttMsg = new JSONObject();
- mqttMsg.put("deviceSn", deviceId);
- mqttMsg.put("sensorSn", sensorId);
- mqttMsg.put("sensorType", tblSensor.getSensorType());
- mqttMsg.put("created_time", formatter.format(date));
- JSONArray sensorData = new JSONArray();
- JSONArray dataArry = new JSONArray(jsonObject.get("data"));
- for (Object obj : dataArry) {
- JSONObject dataObj = new JSONObject(obj);
- dataObj.remove("gcdinfo");
- sensorData.add(dataObj);
- }
- mqttMsg.put("sensorData", sensorData);
- MqttObj mqttObj = new MqttObj();
- mqttObj.setEquipmentId(deviceId);
- List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
- TblRecord tblRecord = new TblRecord();
- tblRecord.setEquipmentId(deviceId);
- tblRecord.setJson(mqttStr);
- tblRecord.setCreateBy("admin");
- tblRecord.setUpdateBy("admin");
- tblRecord.setSensorId(sensorId);
- tblRecordMapper.insert(tblRecord);
- // TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
- // tblSensorRecord.setEquipmentId(deviceId);
- // tblSensorRecord.setJson(mqttStr);
- // tblSensorRecord.setSensorId(sensorId);
- // tblSensorRecord.setCreateBy("admin");
- // tblSensorRecord.setUpdateBy("admin");
- // updateNowRecord(tblSensorRecord);
- CacheUtils.put("sensorData",sensorId,mqttMsg.toString());
- // tblSensorRecordMapper.insert(tblSensorRecord);
- String protocolType = "";
- List<SysDictData> sysDictTypeList = sysDictTypeService.selectDictDataByType("protocal_type");
- for(SysDictData sysDictData:sysDictTypeList){
- if(Long.valueOf(sysDictData.getDictValue()).longValue() == tblSensor.getProtocalType().longValue()){
- protocolType = sysDictData.getDictLabel();
- }
- }
- for(MqttObj obj:mqttObjList){
- if(obj.getStatus() == 1) {
- JSONArray topicObj1 = JSONUtil.parseArray(obj.getServerTopic());
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
- if (topicObj1.size() >0) {
- String finalProtocolType = protocolType;
- topicObj1.forEach(i->{
- JSONObject p = (JSONObject)i;
- String topic = p.getStr("name");
- topic = StrUtil.replace(topic,"$protocolType$", finalProtocolType);
- topic = StrUtil.replace(topic,"$sensorId$", sensorId.toString());
- try {
- mqttConnect.pub(topic, mqttMsg.toString(), (Integer) p.get("qos"));
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
- });
- } else {
- String topic = "sensor/"+protocolType+"/" + sensorId;
- mqttConnect.pub(topic, mqttMsg.toString(), 0);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- @Override
- public void createMqttMain(TblMqttBo bo){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- System.out.println(mqttUrl);
- mqttConnect.createMqttClient(mqttUrl, UUID.fastUUID().toString(),mqttUser,mqttPassword,new Callback());
- mqttConnect.sub(mqttTopic);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- public void updateNowRecord(TblSensorRecordBo bo){
- TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo();
- tblSensorRecordBo.setSensorId(bo.getSensorId());
- LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
- List<TblSensorRecordVo> tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw);
- if(tblSensorRecordVoList.size() > 0 ){
- bo.setId(tblSensorRecordVoList.get(0).getId());
- TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
- validEntityBeforeSave(update);
- tblSensorRecordMapper.updateById(update);
- }else{
- TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
- validEntityBeforeSave(add);
- tblSensorRecordMapper.insert(add);
- }
- }
- private void validEntityBeforeSave(TblSensorRecord entity){
- //TODO 做一些数据校验,如唯一约束
- }
- @Override
- public void createMqtt(TblMqttBo bo){
- LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
- List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
- for(TblMqttVo mqttVo:mqttVoList){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- @Override
- public void pubOrder(OrderBean orderBean){
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("deviceId",orderBean.getDeviceId());
- jsonObject.put("add",orderBean.getAdd());
- jsonObject.put("value",orderBean.getValue());
- jsonObject.put("addrOffset",orderBean.getAddrOffset());
- jsonObject.put("len",orderBean.getValue().split(",").length);
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
- mqttConnect.pub("control",jsonObject.toString(),0);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
- lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
- lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
- lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
- lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
- lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
- lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
- // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
- return lqw;
- }
- class Callback implements MqttCallback {
- /**
- * MQTT 断开连接会执行此方法
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.info("断开了MQTT连接111 :{}", throwable.getMessage());
- log.error(throwable.getMessage(), throwable);
- }
- /**
- * publish发布成功后会执行到这里
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- log.info("发布消息成功");
- }
- /**
- * subscribe订阅后得到的消息会执行到这里
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
- String payload = String.valueOf(message.getPayload());
- // String msg = Byte.toString(message.getPayload());
- byte[] bytes = message.getPayload();
- String encoded = Base64.getEncoder().encodeToString(bytes);
- byte[] decoded = Base64.getDecoder().decode(encoded);
- String msg =new String(decoded);
- System.out.println(msg);
- log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
- MqttServiceImpl.this.pubMqttData(msg);
- }
- }
- private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
- lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
- lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
- lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
- return lqw;
- }
- }
|