|
@@ -13,18 +13,24 @@ package com.ruoyi.ems.handle;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.huashe.common.utils.DateUtils;
|
|
|
import com.ruoyi.ems.core.MqttTemplate;
|
|
|
+import com.ruoyi.ems.domain.ElecMeterH;
|
|
|
import com.ruoyi.ems.domain.EmsDevice;
|
|
|
import com.ruoyi.ems.domain.EmsObjAbilityCallLog;
|
|
|
import com.ruoyi.ems.domain.EmsObjAttrValue;
|
|
|
import com.ruoyi.ems.domain.EmsObjReportLog;
|
|
|
+import com.ruoyi.ems.domain.MeterDevice;
|
|
|
import com.ruoyi.ems.enums.DevOnlineStatus;
|
|
|
import com.ruoyi.ems.model.AbilityPayload;
|
|
|
import com.ruoyi.ems.model.CallResponse;
|
|
|
import com.ruoyi.ems.model.MqttCacheMsg;
|
|
|
+import com.ruoyi.ems.model.Price;
|
|
|
+import com.ruoyi.ems.service.IElecMeterHService;
|
|
|
import com.ruoyi.ems.service.IEmsDeviceService;
|
|
|
import com.ruoyi.ems.service.IEmsObjAbilityCallLogService;
|
|
|
import com.ruoyi.ems.service.IEmsObjAttrValueService;
|
|
|
import com.ruoyi.ems.service.IEmsObjReportLogService;
|
|
|
+import com.ruoyi.ems.service.IMeterDeviceService;
|
|
|
+import com.ruoyi.ems.service.IPriceService;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.collections4.MapUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -34,6 +40,9 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Calendar;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
@@ -61,11 +70,15 @@ public abstract class MqttBaseHandler {
|
|
|
*/
|
|
|
protected static final Map<String, Map<String, String>> attrCache = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ protected static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
/**
|
|
|
* 最后一次消息时间缓存key
|
|
|
*/
|
|
|
protected static final String MQTT_LAST_TIME = "mqttLastMsgTime";
|
|
|
|
|
|
+ protected static final String LAST_HOUR_READING = "lastHourReading";
|
|
|
+
|
|
|
@Resource
|
|
|
@Qualifier("mqttTemplate")
|
|
|
protected MqttTemplate mqttTemplate;
|
|
@@ -74,13 +87,22 @@ public abstract class MqttBaseHandler {
|
|
|
protected IEmsDeviceService deviceService;
|
|
|
|
|
|
@Autowired
|
|
|
+ protected IMeterDeviceService meterDeviceService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
protected IEmsObjAttrValueService objAttrValueService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsObjAbilityCallLogService objAbilityCallLogService;
|
|
|
+ protected IElecMeterHService elecMeterHService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ protected IPriceService priceService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ protected IEmsObjAbilityCallLogService objAbilityCallLogService;
|
|
|
|
|
|
@Autowired
|
|
|
- private IEmsObjReportLogService objReportLogService;
|
|
|
+ protected IEmsObjReportLogService objReportLogService;
|
|
|
|
|
|
/**
|
|
|
* 抽象方法,用于能力调用
|
|
@@ -101,11 +123,19 @@ public abstract class MqttBaseHandler {
|
|
|
|
|
|
/**
|
|
|
* 获取设备列表
|
|
|
+ *
|
|
|
* @return 设备列表
|
|
|
*/
|
|
|
public abstract List<EmsDevice> getDeviceList();
|
|
|
|
|
|
/**
|
|
|
+ * 获取计量设备列表
|
|
|
+ *
|
|
|
+ * @return 设备列表
|
|
|
+ */
|
|
|
+ public abstract List<MeterDevice> getMeterDeviceList();
|
|
|
+
|
|
|
+ /**
|
|
|
* 添加消息ID到消息中
|
|
|
*
|
|
|
* @param msg 消息
|
|
@@ -119,9 +149,10 @@ public abstract class MqttBaseHandler {
|
|
|
|
|
|
/**
|
|
|
* 发送MQTT消息
|
|
|
- * @param topic 主题
|
|
|
- * @param payload 负载数据
|
|
|
- * @param qos 质量服务等级
|
|
|
+ *
|
|
|
+ * @param topic 主题
|
|
|
+ * @param payload 负载数据
|
|
|
+ * @param qos 质量服务等级
|
|
|
* @param retained 是否保留消息
|
|
|
*/
|
|
|
public void sendMqttMsg(String topic, String payload, int qos, boolean retained) {
|
|
@@ -131,7 +162,8 @@ public abstract class MqttBaseHandler {
|
|
|
|
|
|
/**
|
|
|
* 更新最后消息时
|
|
|
- * @param device 设备
|
|
|
+ *
|
|
|
+ * @param device 设备
|
|
|
* @param timeValue 时间值
|
|
|
*/
|
|
|
public void updateMsgTime(EmsDevice device, Date timeValue) {
|
|
@@ -141,7 +173,7 @@ public abstract class MqttBaseHandler {
|
|
|
attrCache.computeIfAbsent(device.getDeviceCode(), k -> new ConcurrentHashMap<>());
|
|
|
|
|
|
// 更新缓存
|
|
|
- updateCacheAfterSuccess(device, MQTT_LAST_TIME, dateTime);
|
|
|
+ updateCacheAfterSuccess(device.getDeviceCode(), MQTT_LAST_TIME, dateTime);
|
|
|
}
|
|
|
catch (Exception e) {
|
|
|
log.error("刷新缓存消息时间失败!", e);
|
|
@@ -149,12 +181,11 @@ public abstract class MqttBaseHandler {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 定时检测在线状态
|
|
|
+ * 定时刷新在线状态
|
|
|
* <br/>每小时执行一次,扫描2个小时无消息的设备,标记为离线状态
|
|
|
*/
|
|
|
- public void refreshOnline() {
|
|
|
+ public void refreshOnline(long threshold) {
|
|
|
long currentTime = new Date().getTime();
|
|
|
- long threshold = 2 * 60 * 60 * 1000; // 120分钟
|
|
|
|
|
|
List<EmsDevice> deviceList = getDeviceList();
|
|
|
|
|
@@ -162,12 +193,14 @@ public abstract class MqttBaseHandler {
|
|
|
for (EmsDevice device : deviceList) {
|
|
|
Map<String, String> attrMap = attrCache.get(device.getDeviceCode());
|
|
|
|
|
|
- if (MapUtils.isNotEmpty(attrMap)) {
|
|
|
+ if (MapUtils.isNotEmpty(attrMap) && null != attrMap.get(MQTT_LAST_TIME)) {
|
|
|
String lastMsgTime = attrMap.get(MQTT_LAST_TIME);
|
|
|
|
|
|
- if (StringUtils.isNotEmpty(lastMsgTime)
|
|
|
- && (currentTime - DateUtils.stringToDate(lastMsgTime, DateUtils.YYYY_MM_DD_HH_MM_SS).getTime())
|
|
|
- > threshold) {
|
|
|
+ // 计算最后一次消息至今的时间差
|
|
|
+ long time =
|
|
|
+ currentTime - DateUtils.stringToDate(lastMsgTime, DateUtils.YYYY_MM_DD_HH_MM_SS).getTime();
|
|
|
+
|
|
|
+ if (time > threshold) {
|
|
|
refreshStatus(device, DevOnlineStatus.OFFLINE);
|
|
|
}
|
|
|
}
|
|
@@ -176,6 +209,116 @@ public abstract class MqttBaseHandler {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 产出小时电量计量
|
|
|
+ */
|
|
|
+ public int meterHourProd() {
|
|
|
+ int cnt = 0;
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<MeterDevice> deviceList = getMeterDeviceList();
|
|
|
+ List<ElecMeterH> meterHList = new ArrayList<>();
|
|
|
+
|
|
|
+ for (MeterDevice device : deviceList) {
|
|
|
+ ElecMeterH elecMeterH = meterHourProdSub(device);
|
|
|
+
|
|
|
+ if (null != elecMeterH) {
|
|
|
+ meterHList.add(elecMeterH);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(meterHList)) {
|
|
|
+ cnt = elecMeterHService.insertBatch(meterHList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ log.error("定时产出小时电量计量失败!", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ return cnt;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ElecMeterH meterHourProdSub(MeterDevice device) {
|
|
|
+ ElecMeterH elecMeterH = null;
|
|
|
+
|
|
|
+ // 获取设备属性缓存
|
|
|
+ Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(), k -> new ConcurrentHashMap<>());
|
|
|
+
|
|
|
+ String lastMeterReading = attrMap.get(LAST_HOUR_READING);
|
|
|
+ String newMeterReading = attrMap.get("energy");
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(lastMeterReading)) {
|
|
|
+ elecMeterH = execHourMeter(device, lastMeterReading, newMeterReading);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ ElecMeterH dbElecMeterH = elecMeterHService.selectLatelyItem(device.getDeviceCode());
|
|
|
+
|
|
|
+ if (null != dbElecMeterH && null != dbElecMeterH.getMeterReading()) {
|
|
|
+ elecMeterH = execHourMeter(device, String.valueOf(dbElecMeterH.getMeterReading()), newMeterReading);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ // 无缓存也无数据库记录,则认为是首次上报
|
|
|
+ if (StringUtils.isNotEmpty(newMeterReading)) {
|
|
|
+ updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return elecMeterH;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ElecMeterH execHourMeter(MeterDevice device, String lastMeterReading, String newMeterReading) {
|
|
|
+ ElecMeterH elecMeterH = null;
|
|
|
+ BigDecimal lastEngValue = new BigDecimal(lastMeterReading);
|
|
|
+
|
|
|
+ if (StringUtils.isNotEmpty(newMeterReading)) {
|
|
|
+ BigDecimal newEngValue = new BigDecimal(newMeterReading);
|
|
|
+ // 计算电量差值
|
|
|
+ BigDecimal useQuantity = newEngValue.subtract(lastEngValue);
|
|
|
+ // 倍率计算
|
|
|
+ useQuantity = magnification(useQuantity, device.getMagnification());
|
|
|
+ // 更新缓存
|
|
|
+ updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
|
|
|
+ // 封装计量对象
|
|
|
+ elecMeterH = buildElecMeterH(device, newEngValue, useQuantity);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ BigDecimal useQuantity = BigDecimal.ZERO;
|
|
|
+ // 封装计量对象
|
|
|
+ elecMeterH = buildElecMeterH(device, lastEngValue, useQuantity);
|
|
|
+ }
|
|
|
+
|
|
|
+ return elecMeterH;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ElecMeterH buildElecMeterH(MeterDevice device, BigDecimal newEngValue, BigDecimal useQuantity) {
|
|
|
+ Date date = new Date();
|
|
|
+ ElecMeterH elecMeterH = new ElecMeterH();
|
|
|
+ elecMeterH.setAreaCode(device.getAreaCode());
|
|
|
+ elecMeterH.setDeviceCode(device.getDeviceCode());
|
|
|
+ elecMeterH.setRecordTime(date);
|
|
|
+ elecMeterH.setDate(date);
|
|
|
+ elecMeterH.setTime(date);
|
|
|
+ elecMeterH.setTimeIndex(getHourIndex(date));
|
|
|
+ elecMeterH.setElecQuantity(useQuantity.doubleValue());
|
|
|
+ elecMeterH.setMeterReading(newEngValue.doubleValue());
|
|
|
+
|
|
|
+ Price price = priceService.getElecHourPrice(device.getAreaCode(), date);
|
|
|
+
|
|
|
+ if (null != price) {
|
|
|
+ BigDecimal cost = useQuantity.multiply(new BigDecimal(String.valueOf(price.getPriceValue())));
|
|
|
+ elecMeterH.setMeterType(price.getMeterType());
|
|
|
+ elecMeterH.setMeterUnitPrice(price.getPriceValue());
|
|
|
+ elecMeterH.setUseElecCost(cost.doubleValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ return elecMeterH;
|
|
|
+ }
|
|
|
+
|
|
|
+ private BigDecimal magnification(BigDecimal diff, Integer magnification) {
|
|
|
+ return null != magnification ? diff.multiply(new BigDecimal(String.valueOf(magnification))) : diff;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* 校验是否需要更新属性值
|
|
|
*
|
|
|
* @param jsonBody jsonBody 消息体
|
|
@@ -218,7 +361,7 @@ public abstract class MqttBaseHandler {
|
|
|
// 更新数据库
|
|
|
objAttrValueService.mergeObjAttrValue(attrValue);
|
|
|
// 更新缓存
|
|
|
- updateCacheAfterSuccess(device, attrKey, currentValue);
|
|
|
+ updateCacheAfterSuccess(device.getDeviceCode(), attrKey, currentValue);
|
|
|
|
|
|
flag = true;
|
|
|
}
|
|
@@ -232,12 +375,13 @@ public abstract class MqttBaseHandler {
|
|
|
|
|
|
/**
|
|
|
* 更新缓存值
|
|
|
- * @param device 设备信息
|
|
|
- * @param attrKey 属性值
|
|
|
+ *
|
|
|
+ * @param deviceCode 设备信息
|
|
|
+ * @param attrKey 属性值
|
|
|
* @param newValue
|
|
|
*/
|
|
|
- public void updateCacheAfterSuccess(EmsDevice device, String attrKey, String newValue) {
|
|
|
- attrCache.computeIfPresent(device.getDeviceCode(), (k, attrMap) -> {
|
|
|
+ public void updateCacheAfterSuccess(String deviceCode, String attrKey, String newValue) {
|
|
|
+ attrCache.computeIfPresent(deviceCode, (k, attrMap) -> {
|
|
|
attrMap.put(attrKey, newValue);
|
|
|
return attrMap;
|
|
|
});
|