| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- package com.ruoyi.data.service.impl;
- import cn.hutool.core.bean.BeanUtil;
- import cn.hutool.json.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import com.ruoyi.common.utils.StringUtils;
- import com.ruoyi.common.utils.mqtt.MQTTConnect;
- import com.ruoyi.data.domain.*;
- import com.ruoyi.data.domain.bo.TblMqttBo;
- import com.ruoyi.data.domain.bo.TblSensorRecordBo;
- import com.ruoyi.data.domain.vo.TblMqttVo;
- import com.ruoyi.data.domain.vo.TblRecordVo;
- import com.ruoyi.data.domain.vo.TblSensorRecordVo;
- import com.ruoyi.data.mapper.*;
- import com.ruoyi.data.service.MqttService;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import sun.management.Sensor;
- import java.text.SimpleDateFormat;
- import java.util.Base64;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- @RequiredArgsConstructor
- @Service
- @Slf4j
- public class MqttServiceImpl implements MqttService {
- private final TblMqttMapper tblMqttMapper;
- private final TblEquipmentMqttMapper tblEquipmentMqttMapper;
- private final TblRecordMapper tblRecordMapper;
- private final TblSensorMapper tblSensorMapper;
- private final TblSensorRecordMapper tblSensorRecordMapper;
- @Value("${mqtt.url}")
- private String mqttUrl;
- @Value("${mqtt.clientid}")
- private String clientID;
- @Value("${mqtt.user}")
- private String mqttUser;
- @Value("${mqtt.password}")
- private String mqttPassword;
- @Value("${mqtt.topic}")
- private String mqttTopic;
- // @Value("${mqtt.url}")
- // private String mqttUrl;
- @Override
- public void pubMqttData(String mqttStr) {
- JSONObject jsonObject = new JSONObject(mqttStr);
- Long sensorId = Long.valueOf((String) jsonObject.get("sensorId"));
- TblSensor tblSensor = tblSensorMapper.selectById(sensorId);
- Long deviceId = tblSensor.getDeviceId();
- SimpleDateFormat formatter= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date date = new Date(System.currentTimeMillis());
- jsonObject.put("created_time",formatter.format(date));
- MqttObj mqttObj = new MqttObj();
- mqttObj.setEquipmentId(deviceId);
- List<MqttObj> mqttObjList = tblEquipmentMqttMapper.selectMqttListByDeviceId(mqttObj);
- TblRecord tblRecord = new TblRecord();
- tblRecord.setEquipmentId(deviceId);
- tblRecord.setSensorId(sensorId);
- tblRecord.setJson(mqttStr);
- tblRecord.setCreateBy("admin");
- tblRecord.setUpdateBy("admin");
- tblRecord.setSensorId(sensorId);
- tblRecordMapper.insert(tblRecord);
- TblSensorRecordBo tblSensorRecord = new TblSensorRecordBo();
- tblSensorRecord.setEquipmentId(deviceId);
- tblSensorRecord.setJson(mqttStr);
- tblSensorRecord.setCreateBy("admin");
- tblSensorRecord.setUpdateBy("admin");
- tblSensorRecord.setSensorId(sensorId);
- updateNowRecord(tblSensorRecord);
- // tblSensorRecordMapper.insert(tblSensorRecord);
- for(MqttObj obj:mqttObjList){
- if(obj.getStatus() == 1) {
- JSONObject topicObj = obj.getTopicQos("tcp");
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(obj.getServerAddress(), obj.getUuid(), obj.getAccount(), obj.getPassword(), new Callback());
- if (topicObj != null) {
- String topic = topicObj.get("name").toString().replace("#", "");
- mqttConnect.pub(topic, jsonObject.toString(), Integer.valueOf((String) topicObj.get("qos")));
- } else {
- String topic = "sensor/modbustcp/" + deviceId;
- mqttConnect.pub(topic, jsonObject.toString(), 0);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- @Override
- public void createMqttMain(TblMqttBo bo){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- System.out.println(mqttUrl);
- mqttConnect.createMqttClient(mqttUrl,clientID,mqttUser,mqttPassword,new Callback());
- mqttConnect.sub(mqttTopic);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- public void updateNowRecord(TblSensorRecordBo bo){
- TblSensorRecordBo tblSensorRecordBo = new TblSensorRecordBo();
- tblSensorRecordBo.setSensorId(bo.getSensorId());
- LambdaQueryWrapper<TblSensorRecord> lqw = buildSensorRecordWrapper(bo);
- List<TblSensorRecordVo> tblSensorRecordVoList = tblSensorRecordMapper.selectVoList(lqw);
- if(tblSensorRecordVoList.size() > 0 ){
- bo.setId(tblSensorRecordVoList.get(0).getId());
- TblSensorRecord update = BeanUtil.toBean(bo, TblSensorRecord.class);
- validEntityBeforeSave(update);
- tblSensorRecordMapper.updateById(update);
- }else{
- TblSensorRecord add = BeanUtil.toBean(bo, TblSensorRecord.class);
- validEntityBeforeSave(add);
- tblSensorRecordMapper.insert(add);
- }
- }
- private void validEntityBeforeSave(TblSensorRecord entity){
- //TODO 做一些数据校验,如唯一约束
- }
- @Override
- public void createMqtt(TblMqttBo bo){
- LambdaQueryWrapper<TblMqtt> lqw = buildQueryWrapper(bo);
- List<TblMqttVo> mqttVoList = tblMqttMapper.selectVoList(lqw);
- for(TblMqttVo mqttVo:mqttVoList){
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient(mqttVo.getServerAddress(),mqttVo.getUuid(),mqttVo.getAccount(),mqttVo.getPassword(),new Callback());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- @Override
- public void pubOrder(OrderBean orderBean){
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("deviceId",orderBean.getDeviceId());
- jsonObject.put("add",orderBean.getAdd());
- jsonObject.put("value",orderBean.getValue());
- jsonObject.put("addrOffset",orderBean.getAddrOffset());
- jsonObject.put("len",orderBean.getValue().split(",").length);
- MQTTConnect mqttConnect = new MQTTConnect();
- try {
- mqttConnect.createMqttClient("ws://52.130.249.112:8083/mqtt","adminTest","ship","ship@2021.11.24",new Callback());
- mqttConnect.pub("control",jsonObject.toString(),0);
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- private LambdaQueryWrapper<TblMqtt> buildQueryWrapper(TblMqttBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblMqtt> lqw = Wrappers.lambdaQuery();
- lqw.like(StringUtils.isNotBlank(bo.getProtocolName()), TblMqtt::getProtocolName, bo.getProtocolName());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolDesc()), TblMqtt::getProtocolDesc, bo.getProtocolDesc());
- lqw.eq(StringUtils.isNotBlank(bo.getProtocolType()), TblMqtt::getProtocolType, bo.getProtocolType());
- lqw.eq(StringUtils.isNotBlank(bo.getServerAddress()), TblMqtt::getServerAddress, bo.getServerAddress());
- lqw.eq(StringUtils.isNotBlank(bo.getServerTopic()), TblMqtt::getServerTopic, bo.getServerTopic());
- lqw.eq(StringUtils.isNotBlank(bo.getAccount()), TblMqtt::getAccount, bo.getAccount());
- lqw.eq(StringUtils.isNotBlank(bo.getPassword()), TblMqtt::getPassword, bo.getPassword());
- lqw.eq(StringUtils.isNotBlank(bo.getUuid()), TblMqtt::getUuid, bo.getUuid());
- // lqw.eq(bo.getStatus() != null, TblMqtt::getStatus, bo.getStatus());
- return lqw;
- }
- class Callback implements MqttCallback {
- /**
- * MQTT 断开连接会执行此方法
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.info("断开了MQTT连接111 :{}", throwable.getMessage());
- log.error(throwable.getMessage(), throwable);
- }
- /**
- * publish发布成功后会执行到这里
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- log.info("发布消息成功");
- }
- /**
- * subscribe订阅后得到的消息会执行到这里
- */
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // TODO 此处可以将订阅得到的消息进行业务处理、数据存储
- String payload = String.valueOf(message.getPayload());
- // String msg = Byte.toString(message.getPayload());
- byte[] bytes = message.getPayload();
- String encoded = Base64.getEncoder().encodeToString(bytes);
- byte[] decoded = Base64.getDecoder().decode(encoded);
- String msg =new String(decoded);
- System.out.println(msg);
- log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
- MqttServiceImpl.this.pubMqttData(msg);
- }
- }
- private LambdaQueryWrapper<TblSensorRecord> buildSensorRecordWrapper(TblSensorRecordBo bo) {
- Map<String, Object> params = bo.getParams();
- LambdaQueryWrapper<TblSensorRecord> lqw = Wrappers.lambdaQuery();
- lqw.eq(bo.getSensorId() != null, TblSensorRecord::getSensorId, bo.getSensorId());
- lqw.eq(StringUtils.isNotBlank(bo.getJson()), TblSensorRecord::getJson, bo.getJson());
- lqw.eq(bo.getEquipmentId() != null, TblSensorRecord::getEquipmentId, bo.getEquipmentId());
- return lqw;
- }
- }
|