|
@@ -13,35 +13,36 @@ package com.ruoyi.ems.handle;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.huashe.common.exception.Assert;
|
|
|
import com.huashe.common.exception.BusinessException;
|
|
|
+import com.huashe.common.utils.DateUtils;
|
|
|
import com.huashe.common.utils.ThreadUtils;
|
|
|
import com.ruoyi.ems.core.MessageCache;
|
|
|
-import com.ruoyi.ems.core.MqttTemplate;
|
|
|
+import com.ruoyi.ems.domain.ElecMeterH;
|
|
|
import com.ruoyi.ems.domain.EmsDevice;
|
|
|
import com.ruoyi.ems.domain.EmsObjAbilityCallLog;
|
|
|
-import com.ruoyi.ems.domain.EmsObjAttrValue;
|
|
|
+import com.ruoyi.ems.domain.MeterDevice;
|
|
|
import com.ruoyi.ems.enums.DevObjType;
|
|
|
+import com.ruoyi.ems.enums.DevOnlineStatus;
|
|
|
import com.ruoyi.ems.model.AbilityPayload;
|
|
|
import com.ruoyi.ems.model.MqttCacheMsg;
|
|
|
-import com.ruoyi.ems.service.IEmsDeviceService;
|
|
|
+import com.ruoyi.ems.model.Price;
|
|
|
+import com.ruoyi.ems.service.IElecMeterHService;
|
|
|
import com.ruoyi.ems.service.IEmsObjAbilityCallLogService;
|
|
|
-import com.ruoyi.ems.service.IEmsObjAttrService;
|
|
|
-import com.ruoyi.ems.service.IEmsObjAttrValueService;
|
|
|
+import com.ruoyi.ems.service.IMeterDeviceService;
|
|
|
+import com.ruoyi.ems.service.IPriceService;
|
|
|
import com.ruoyi.ems.util.IdUtils;
|
|
|
-import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import javax.annotation.Resource;
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.time.temporal.ChronoUnit;
|
|
|
import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* GeekOpen 断路器服务层
|
|
@@ -56,33 +57,24 @@ import java.util.stream.Collectors;
|
|
|
public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
private static final Logger log = LoggerFactory.getLogger(GeekOpenCbHandler.class);
|
|
|
|
|
|
- @Resource
|
|
|
- @Qualifier("mqttTemplate")
|
|
|
- private MqttTemplate mqttTemplate;
|
|
|
-
|
|
|
@Autowired
|
|
|
private MessageCache messageCache;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsDeviceService deviceService;
|
|
|
+ private IEmsObjAbilityCallLogService objAbilityCallLogService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsObjAttrService objAttrService;
|
|
|
+ private IMeterDeviceService meterDeviceService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsObjAttrValueService objAttrValueService;
|
|
|
+ private IElecMeterHService elecMeterHService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsObjAbilityCallLogService objAbilityCallLogService;
|
|
|
-
|
|
|
- /**
|
|
|
- * 缓存设备属性
|
|
|
- */
|
|
|
- private static final Map<String, Map<String, String>> attrCache = new ConcurrentHashMap<>();
|
|
|
+ private IPriceService priceService;
|
|
|
|
|
|
private static final String TOPIC_PREFIX = "/device/dlq/";
|
|
|
|
|
|
- private static final Integer DEVICE_OBJ_TYPE = DevObjType.DEVC.getCode();
|
|
|
+ private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
|
|
|
/**
|
|
|
* 能力执行
|
|
@@ -94,14 +86,14 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
public String call(AbilityPayload abilityParam) {
|
|
|
String retStr = "执行成功!";
|
|
|
|
|
|
- String messageId = IdUtils.generateMessageId();
|
|
|
+ String messageId = "CALL-" + IdUtils.generateMessageId();
|
|
|
String deviceCode = abilityParam.getObjCode();
|
|
|
String msgBody = addMsgId(abilityParam.getAbilityParam(), "messageId", messageId);
|
|
|
|
|
|
// 发送消息到MQTT服务器
|
|
|
long sendTime = System.currentTimeMillis();
|
|
|
String topic = TOPIC_PREFIX + deviceCode;
|
|
|
- mqttTemplate.send(topic, msgBody, 2, false);
|
|
|
+ sendMqttMsg(topic, msgBody, 2, false);
|
|
|
|
|
|
// 写入日志
|
|
|
EmsObjAbilityCallLog logItem = saveLog(abilityParam, sendTime, 1);
|
|
@@ -130,7 +122,9 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
|
|
|
ThreadUtils.sleep(100);
|
|
|
|
|
|
- if (System.currentTimeMillis() - sendTime > 60000) {
|
|
|
+ if (System.currentTimeMillis() - sendTime > 20000) {
|
|
|
+ EmsDevice device = deviceService.selectByCode(deviceCode);
|
|
|
+ refreshStatus(device, DevOnlineStatus.OFFLINE);
|
|
|
retStr = "响应超时!";
|
|
|
break;
|
|
|
}
|
|
@@ -148,21 +142,27 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
|
|
|
if (null != device) {
|
|
|
JSONObject msgBody = JSONObject.parseObject(payload);
|
|
|
- refreshStatus(device);
|
|
|
-
|
|
|
String messageId = msgBody.getString("messageId");
|
|
|
|
|
|
+ // 自动上报数据: 1.更新属性值,2.更新电量计量数据
|
|
|
if (StringUtils.equals("auto", messageId)) {
|
|
|
updateAutoAttr(device, msgBody);
|
|
|
}
|
|
|
- else if (StringUtils.isNotEmpty(messageId)) {
|
|
|
+ // 前序调用的响应消息:1.写入消息队列,2.更新属性值
|
|
|
+ else if (StringUtils.isNotEmpty(messageId) && StringUtils.startsWith(messageId, "CALL-")) {
|
|
|
MqttCacheMsg mqttCacheMsg = new MqttCacheMsg(messageId, deviceCode, new Date(), payload);
|
|
|
messageCache.addMqttMessage(messageId, mqttCacheMsg);
|
|
|
updateBaseAttr(device, msgBody);
|
|
|
}
|
|
|
+ // 设备同步数据(INFO类,协议类):更新基础属性值
|
|
|
else {
|
|
|
updateBaseAttr(device, msgBody);
|
|
|
}
|
|
|
+
|
|
|
+ // 设备消息抵达,将数据库离线状态改为在线
|
|
|
+ refreshStatus(device, DevOnlineStatus.ONLINE);
|
|
|
+ // 设备消息抵达,更新最后消息时间
|
|
|
+ updateMsgTime(device, new Date());
|
|
|
}
|
|
|
else {
|
|
|
log.warn("接收消息,设备未注册, deviceCode:{}\nmessageBody:{}", deviceCode, payload);
|
|
@@ -173,14 +173,87 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public int getObjType() {
|
|
|
+ return DevObjType.DEVC.getCode();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * 刷新设备状态
|
|
|
+ * 据根据累计量更新电量计量数
|
|
|
*
|
|
|
- * @param device 设备信息
|
|
|
+ * @param device 设备信息
|
|
|
+ * @param msgBody 消息体
|
|
|
*/
|
|
|
- private void refreshStatus(EmsDevice device) {
|
|
|
- device.setDeviceStatus(1);
|
|
|
- deviceService.updateEmsDevice(device);
|
|
|
+ private void syncElecMeterByCumulant(EmsDevice device, JSONObject msgBody) {
|
|
|
+ MeterDevice meterDev = meterDeviceService.selectMeterDeviceByCode(device.getDeviceCode());
|
|
|
+
|
|
|
+ if (null == meterDev) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, String> arrtMap = attrCache.get(device.getDeviceCode());
|
|
|
+
|
|
|
+ // 获取当前时间
|
|
|
+ String currentTime = DateUtils.dateToString(new Date(), "yyyy-MM-dd HH:00:00");
|
|
|
+ LocalDateTime currentLdt = LocalDateTime.parse(currentTime, TIME_FORMATTER);
|
|
|
+ // 获取缓存时间
|
|
|
+ String cacheTime = arrtMap.get("elecMeterHour");
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(cacheTime)) {
|
|
|
+ LocalDateTime cacheLdt = LocalDateTime.parse(cacheTime, TIME_FORMATTER);
|
|
|
+
|
|
|
+ String newEngValue = msgBody.getString("energy");
|
|
|
+
|
|
|
+ long hours = ChronoUnit.HOURS.between(cacheLdt, currentLdt);
|
|
|
+
|
|
|
+ if (hours == 1) {
|
|
|
+ String lastEngValue = arrtMap.get("elecMeterHourValue");
|
|
|
+
|
|
|
+ BigDecimal lastEngValueBig = new BigDecimal(lastEngValue);
|
|
|
+ BigDecimal newEngValueBig = new BigDecimal(newEngValue);
|
|
|
+ BigDecimal diff = newEngValueBig.subtract(lastEngValueBig);
|
|
|
+
|
|
|
+ Date date = DateUtils.stringToDate(cacheTime, "yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ ElecMeterH elecMeterH = new ElecMeterH();
|
|
|
+ elecMeterH.setAreaCode(meterDev.getAreaCode());
|
|
|
+ elecMeterH.setDeviceCode(meterDev.getDeviceCode());
|
|
|
+ elecMeterH.setRecordTime(date);
|
|
|
+ elecMeterH.setDate(date);
|
|
|
+ elecMeterH.setTime(date);
|
|
|
+ elecMeterH.setTimeIndex(getHourIndex(date));
|
|
|
+ elecMeterH.setElecQuantity(diff.doubleValue());
|
|
|
+
|
|
|
+ Price price = priceService.getElecHourPrice(meterDev.getAreaCode(), date);
|
|
|
+
|
|
|
+ if (null != price) {
|
|
|
+ BigDecimal cost = diff.multiply(new BigDecimal(String.valueOf(price.getPriceValue())));
|
|
|
+ elecMeterH.setMeterType(price.getMeterType());
|
|
|
+ elecMeterH.setMeterUnitPrice(price.getPriceValue());
|
|
|
+ elecMeterH.setUseElecCost(cost.doubleValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ elecMeterHService.insertElecMeterH(elecMeterH);
|
|
|
+ LocalDateTime nextTime = cacheLdt.plusHours(1);
|
|
|
+ String nextHour = nextTime.format(TIME_FORMATTER);
|
|
|
+ arrtMap.put("elecMeterHour", nextHour);
|
|
|
+ arrtMap.put("elecMeterHourValue", newEngValue);
|
|
|
+ }
|
|
|
+ else if (hours == 0) {
|
|
|
+ arrtMap.computeIfAbsent("elecMeterHourValue", key -> newEngValue);
|
|
|
+ }
|
|
|
+ else if (hours > 1) {
|
|
|
+ LocalDateTime nextTime = currentLdt.plusHours(1);
|
|
|
+ String nextHour = nextTime.format(TIME_FORMATTER);
|
|
|
+ arrtMap.put("elecMeterHour", nextHour);
|
|
|
+ arrtMap.remove("elecMeterHourValue");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ LocalDateTime nextTime = currentLdt.plusHours(1);
|
|
|
+ String nextHour = nextTime.format(TIME_FORMATTER);
|
|
|
+ arrtMap.put("elecMeterHour", nextHour);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -190,25 +263,25 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
* @param jsonBody jsonBody 消息体
|
|
|
*/
|
|
|
private void updateBaseAttr(EmsDevice device, JSONObject jsonBody) {
|
|
|
- checkForUpdate(device, jsonBody, "key");
|
|
|
- checkForUpdate(device, jsonBody, "iccid");
|
|
|
- checkForUpdate(device, jsonBody, "imei");
|
|
|
- checkForUpdate(device, jsonBody, "signal");
|
|
|
- checkForUpdate(device, jsonBody, "version");
|
|
|
- checkForUpdate(device, jsonBody, "timerEnable");
|
|
|
- checkForUpdate(device, jsonBody, "timerInterval");
|
|
|
- checkForUpdate(device, jsonBody, "keyLock");
|
|
|
- checkForUpdate(device, jsonBody, "onState");
|
|
|
- checkForUpdate(device, jsonBody, "resetLock");
|
|
|
+ updateAttr(device, jsonBody, "key");
|
|
|
+ updateAttr(device, jsonBody, "iccid");
|
|
|
+ updateAttr(device, jsonBody, "imei");
|
|
|
+ updateAttr(device, jsonBody, "signal");
|
|
|
+ updateAttr(device, jsonBody, "version");
|
|
|
+ updateAttr(device, jsonBody, "timerEnable");
|
|
|
+ updateAttr(device, jsonBody, "timerInterval");
|
|
|
+ updateAttr(device, jsonBody, "keyLock");
|
|
|
+ updateAttr(device, jsonBody, "onState");
|
|
|
+ updateAttr(device, jsonBody, "resetLock");
|
|
|
|
|
|
// 协议信息
|
|
|
- checkForUpdate(device, jsonBody, "protocol");
|
|
|
- checkForUpdate(device, jsonBody, "clientId");
|
|
|
- checkForUpdate(device, jsonBody, "server");
|
|
|
- checkForUpdate(device, jsonBody, "port");
|
|
|
- checkForUpdate(device, jsonBody, "username");
|
|
|
- checkForUpdate(device, jsonBody, "publish");
|
|
|
- checkForUpdate(device, jsonBody, "subcribe");
|
|
|
+ updateAttr(device, jsonBody, "protocol");
|
|
|
+ updateAttr(device, jsonBody, "clientId");
|
|
|
+ updateAttr(device, jsonBody, "server");
|
|
|
+ updateAttr(device, jsonBody, "port");
|
|
|
+ updateAttr(device, jsonBody, "username");
|
|
|
+ updateAttr(device, jsonBody, "publish");
|
|
|
+ updateAttr(device, jsonBody, "subcribe");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -218,53 +291,13 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
* @param jsonBody jsonBody 消息体
|
|
|
*/
|
|
|
private void updateAutoAttr(EmsDevice device, JSONObject jsonBody) {
|
|
|
- checkForUpdate(device, jsonBody, "voltage");
|
|
|
- checkForUpdate(device, jsonBody, "current");
|
|
|
- checkForUpdate(device, jsonBody, "power");
|
|
|
- checkForUpdate(device, jsonBody, "energy");
|
|
|
- }
|
|
|
+ updateAttr(device, jsonBody, "voltage");
|
|
|
+ updateAttr(device, jsonBody, "current");
|
|
|
+ updateAttr(device, jsonBody, "power");
|
|
|
|
|
|
- /**
|
|
|
- * 校验是否需要更新属性值
|
|
|
- *
|
|
|
- * @param jsonBody jsonBody 消息体
|
|
|
- * @param attrKey 属性key
|
|
|
- * @return true 更新,false 不更新
|
|
|
- */
|
|
|
- private void checkForUpdate(EmsDevice device, JSONObject jsonBody, String attrKey) {
|
|
|
- if (!jsonBody.containsKey(attrKey)) {
|
|
|
- return;
|
|
|
+ if (updateAttr(device, jsonBody, "energy")) {
|
|
|
+ syncElecMeterByCumulant(device, jsonBody);
|
|
|
}
|
|
|
- String currentValue = jsonBody.getString(attrKey);
|
|
|
-
|
|
|
- // Atomically initialize the cache entry if absent
|
|
|
- Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(), k -> {
|
|
|
- List<EmsObjAttrValue> attrList = objAttrValueService.selectByObjCode(DEVICE_OBJ_TYPE, k);
|
|
|
- if (CollectionUtils.isNotEmpty(attrList)) {
|
|
|
- return attrList.stream().collect(
|
|
|
- Collectors.toMap(EmsObjAttrValue::getAttrKey, EmsObjAttrValue::getAttrValue, (v1, v2) -> v1,
|
|
|
- ConcurrentHashMap::new));
|
|
|
- }
|
|
|
- else {
|
|
|
- return new ConcurrentHashMap<>();
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- String cachedValue = attrMap.get(attrKey);
|
|
|
-
|
|
|
- if (!StringUtils.equals(currentValue, cachedValue)) {
|
|
|
- EmsObjAttrValue attrValue = new EmsObjAttrValue(device.getDeviceCode(), DEVICE_OBJ_TYPE,
|
|
|
- device.getDeviceModel(), attrKey, currentValue);
|
|
|
- objAttrValueService.mergeObjAttrValue(attrValue);
|
|
|
- updateCacheAfterSuccess(device, attrKey, currentValue);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void updateCacheAfterSuccess(EmsDevice device, String attrKey, String newValue) {
|
|
|
- attrCache.computeIfPresent(device.getDeviceCode(), (k, attrMap) -> {
|
|
|
- attrMap.put(attrKey, newValue);
|
|
|
- return attrMap;
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
private boolean checkResult(JSONObject sendObject, JSONObject receiveObject) {
|