浏览代码

室内能耗拆分采集和计量小时产出

learshaw 2 周之前
父节点
当前提交
1582d0efba

+ 9 - 0
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/TaskExecutor.java

@@ -76,6 +76,15 @@ public class TaskExecutor {
 //    }
 
     /**
+     * 每5min采集能耗数据
+     */
+    @Scheduled(cron = "0 0/15 * * * ?")
+    public void inDoorMeterCollect() {
+        int cnt = inDoorEnergyHandler.meterCollect();
+        log.debug("采集室内能耗设备计量数据: {} 条", cnt);
+    }
+
+    /**
      * 每小时产出设备计量数据
      */
     @Scheduled(cron = "0 0 0/1 * * ?")

+ 5 - 13
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/BaseDevHandler.java

@@ -11,6 +11,7 @@
 package com.ruoyi.ems.handle;
 
 import com.huashe.common.utils.DateUtils;
+import com.ruoyi.common.redis.service.RedisService;
 import com.ruoyi.ems.domain.EmsDevice;
 import com.ruoyi.ems.domain.EmsObjAbility;
 import com.ruoyi.ems.domain.EmsObjAbilityCallLog;
@@ -34,8 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 设备处理基类
@@ -49,6 +48,9 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public abstract class BaseDevHandler {
     @Autowired
+    protected RedisService redisService;
+
+    @Autowired
     protected IEmsDeviceService deviceService;
 
     @Autowired
@@ -91,11 +93,6 @@ public abstract class BaseDevHandler {
     public abstract List<EmsDevice> getDeviceList();
 
     /**
-     * 缓存设备属性
-     */
-    protected static final Map<String, Map<String, String>> attrCache = new ConcurrentHashMap<>();
-
-    /**
      * 最后一次消息时间缓存key
      */
     protected static final String MQTT_LAST_TIME = "mqttLastMsgTime";
@@ -110,8 +107,6 @@ public abstract class BaseDevHandler {
         try {
             String dateTime = DateUtils.dateToString(timeValue, DateUtils.YYYY_MM_DD_HH_MM_SS);
 
-            attrCache.computeIfAbsent(device.getDeviceCode(), k -> new ConcurrentHashMap<>());
-
             // 更新缓存
             updateCacheAfterSuccess(device.getDeviceCode(), MQTT_LAST_TIME, dateTime);
         }
@@ -128,10 +123,7 @@ public abstract class BaseDevHandler {
      * @param newValue
      */
     public void updateCacheAfterSuccess(String deviceCode, String attrKey, String newValue) {
-        attrCache.computeIfPresent(deviceCode, (k, attrMap) -> {
-            attrMap.put(attrKey, newValue);
-            return attrMap;
-        });
+        redisService.setCacheMapValue(deviceCode, attrKey, newValue);
     }
 
     /**

+ 39 - 35
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/BaseMeterDevHandler.java

@@ -68,15 +68,11 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
             Date date = DateUtils.adjustHour(new Date(), -1);
 
             for (MeterDevice device : deviceList) {
-                // 获取设备属性缓存
-                Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(),
-                    k -> new ConcurrentHashMap<>());
-
                 // 读取历史抄表
-                String lastMeterReading = attrMap.get(LAST_HOUR_READING);
+                String lastMeterReading = redisService.getCacheMapValue(device.getDeviceCode(), LAST_HOUR_READING);
 
                 // 读取最新抄表值
-                String newMeterReading = attrMap.get(NEW_HOUR_READING);
+                String newMeterReading = redisService.getCacheMapValue(device.getDeviceCode(), NEW_HOUR_READING);
 
                 ElecMeterH elecMeterH = getElecMeterH(device, lastMeterReading, newMeterReading);
 
@@ -99,24 +95,34 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
         return cnt;
     }
 
-    public ElecMeterH getElecMeterH(MeterDevice device, String lastMeterReading, String newMeterReading) {
+    public ElecMeterH getElecMeterH(MeterDevice mDevice, String lastMeterReading, String newMeterReading) {
+        return getElecMeterH(mDevice.getDeviceCode(), mDevice, lastMeterReading, newMeterReading);
+    }
+
+    public ElecMeterH getElecMeterH(String deviceCode, MeterDevice mDevice, String lastMeterReading,
+        String newMeterReading) {
         ElecMeterH elecMeterH = null;
 
         // 缓存不为空,使用缓存抄表计算
         if (StringUtils.isNotEmpty(lastMeterReading)) {
-            elecMeterH = execElecHourMeter(device, lastMeterReading, newMeterReading);
+            elecMeterH = execElecHourMeter(mDevice, lastMeterReading, newMeterReading);
+            // 本周期抄表完成,将新抄表值覆盖作为下个周期的起始
+            updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
         }
         // 缓存为空,使用数据库记录计算
         else {
-            ElecMeterH dbElecMeterH = elecMeterHService.selectLatelyItem(device.getDeviceCode());
+            ElecMeterH dbElecMeterH = elecMeterHService.selectLatelyItem(deviceCode);
 
             if (null != dbElecMeterH && null != dbElecMeterH.getMeterReading()) {
-                elecMeterH = execElecHourMeter(device, String.valueOf(dbElecMeterH.getMeterReading()), newMeterReading);
+                elecMeterH = execElecHourMeter(mDevice, String.valueOf(dbElecMeterH.getMeterReading()),
+                    newMeterReading);
+                // 本周期抄表完成,将新抄表值覆盖作为下个周期的起始
+                updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
             }
             else {
                 // 无缓存也无数据库记录,则认为是首次上报
                 if (StringUtils.isNotEmpty(newMeterReading)) {
-                    updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
+                    updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
                 }
             }
         }
@@ -127,12 +133,12 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
     /**
      * 执行小时抄表计算
      *
-     * @param device           设备信息
+     * @param mDevice          计量设备
      * @param lastMeterReading 上次抄表值
      * @param newMeterReading  本次抄表值
      * @return
      */
-    private ElecMeterH execElecHourMeter(MeterDevice device, String lastMeterReading, String newMeterReading) {
+    protected ElecMeterH execElecHourMeter(MeterDevice mDevice, String lastMeterReading, String newMeterReading) {
         ElecMeterH elecMeterH = null;
         BigDecimal lastEngValue = new BigDecimal(lastMeterReading);
 
@@ -141,16 +147,9 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
             // 计算电量差值
             BigDecimal useQuantity = newEngValue.subtract(lastEngValue);
             // 倍率计算
-            useQuantity = magnification(useQuantity, device.getMagnification());
-            // 更新缓存
-            updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
-            // 封装计量对象
-            elecMeterH = buildElecMeterH(device, newEngValue, useQuantity);
-        }
-        else {
-            BigDecimal useQuantity = BigDecimal.ZERO;
+            useQuantity = magnification(useQuantity, mDevice.getMagnification());
             // 封装计量对象
-            elecMeterH = buildElecMeterH(device, lastEngValue, useQuantity);
+            elecMeterH = buildElecMeterH(mDevice, newEngValue, useQuantity);
         }
 
         return elecMeterH;
@@ -179,24 +178,36 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
         return elecMeterH;
     }
 
-    public WaterMeterH getWaterMeterH(MeterDevice device, String lastMeterReading, String newMeterReading) {
+    public WaterMeterH getWaterMeterH(MeterDevice mDevice, String lastMeterReading, String newMeterReading) {
+        return getWaterMeterH(mDevice.getDeviceCode(), mDevice, lastMeterReading, newMeterReading);
+    }
+
+    public WaterMeterH getWaterMeterH(String deviceCode, MeterDevice mDevice, String lastMeterReading,
+        String newMeterReading) {
         WaterMeterH waterMeterH = null;
 
         // 缓存不为空,使用缓存抄表计算
         if (StringUtils.isNotEmpty(lastMeterReading)) {
-            waterMeterH = execWaterHourMeter(device, lastMeterReading, newMeterReading);
+            waterMeterH = execWaterHourMeter(mDevice, lastMeterReading, newMeterReading);
+
+            // 更新缓存
+            updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
         }
         // 缓存为空,使用数据库记录计算
         else {
-            WaterMeterH dbWaterMeterH = waterMeterHService.selectLatelyItem(device.getDeviceCode());
+            WaterMeterH dbWaterMeterH = waterMeterHService.selectLatelyItem(mDevice.getDeviceCode());
 
             if (null != dbWaterMeterH && null != dbWaterMeterH.getMeterReading()) {
-                waterMeterH = execWaterHourMeter(device, String.valueOf(dbWaterMeterH.getMeterReading()), newMeterReading);
+                waterMeterH = execWaterHourMeter(mDevice, String.valueOf(dbWaterMeterH.getMeterReading()),
+                    newMeterReading);
+
+                // 更新缓存
+                updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
             }
             else {
                 // 无缓存也无数据库记录,则认为是首次上报
                 if (StringUtils.isNotEmpty(newMeterReading)) {
-                    updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
+                    updateCacheAfterSuccess(deviceCode, LAST_HOUR_READING, newMeterReading);
                 }
             }
         }
@@ -212,7 +223,7 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
      * @param newMeterReading  本次抄表值
      * @return
      */
-    private WaterMeterH execWaterHourMeter(MeterDevice device, String lastMeterReading, String newMeterReading) {
+    protected WaterMeterH execWaterHourMeter(MeterDevice device, String lastMeterReading, String newMeterReading) {
         WaterMeterH waterMeterH = null;
         BigDecimal lastEngValue = new BigDecimal(lastMeterReading);
 
@@ -222,16 +233,9 @@ public abstract class BaseMeterDevHandler extends BaseDevHandler {
             BigDecimal useQuantity = newEngValue.subtract(lastEngValue);
             // 倍率计算
             useQuantity = magnification(useQuantity, device.getMagnification());
-            // 更新缓存
-            updateCacheAfterSuccess(device.getDeviceCode(), LAST_HOUR_READING, newMeterReading);
             // 封装计量对象
             waterMeterH = buildWaterMeterH(device, newEngValue, useQuantity);
         }
-        else {
-            BigDecimal useQuantity = BigDecimal.ZERO;
-            // 封装计量对象
-            waterMeterH = buildWaterMeterH(device, lastEngValue, useQuantity);
-        }
 
         return waterMeterH;
     }

+ 12 - 21
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/GeekOpenCbHandler.java

@@ -28,7 +28,6 @@ import com.ruoyi.ems.model.MqttCacheMsg;
 import com.ruoyi.ems.model.QueryDevice;
 import com.ruoyi.ems.util.IdUtils;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,9 +39,6 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
 
 /**
  * GeekOpen 断路器服务层
@@ -252,21 +248,18 @@ public class GeekOpenCbHandler extends BaseMeterDevHandler {
             String currentValue = jsonBody.getString(sourceKey);
 
             // 获取缓存
-            Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(), k -> {
-                List<EmsObjAttrValue> attrList = objAttrValueService.selectByObjCode(device.getDeviceModel(), k);
+            String cacheValue = redisService.getCacheMapValue(device.getDeviceCode(), targetKey);
 
-                if (CollectionUtils.isNotEmpty(attrList)) {
-                    return attrList.stream().collect(
-                        Collectors.toMap(EmsObjAttrValue::getAttrKey, EmsObjAttrValue::getAttrValue, (v1, v2) -> v1,
-                            ConcurrentHashMap::new));
-                }
-                else {
-                    return new ConcurrentHashMap<>();
-                }
-            });
+            if (null == cacheValue) {
+                EmsObjAttrValue param = new EmsObjAttrValue();
+                param.setObjCode(device.getDeviceCode());
+                param.setModelCode(device.getDeviceModel());
+                param.setAttrKey(targetKey);
 
-            // 从缓存中获取属性值
-            String cacheValue = attrMap.get(targetKey);
+                List<EmsObjAttrValue> attrList = objAttrValueService.selectObjAttrValueList(param);
+                EmsObjAttrValue attrValue = CollectionUtils.isNotEmpty(attrList) ? attrList.get(0) : null;
+                cacheValue = null != attrValue ? attrValue.getAttrValue() : null;
+            }
 
             // 比较属性值是否发生变化
             if (!StringUtils.equals(currentValue, cacheValue)) {
@@ -352,11 +345,9 @@ public class GeekOpenCbHandler extends BaseMeterDevHandler {
 
         if (CollectionUtils.isNotEmpty(deviceList)) {
             for (EmsDevice device : deviceList) {
-                Map<String, String> attrMap = attrCache.get(device.getDeviceCode());
-
-                if (MapUtils.isNotEmpty(attrMap) && null != attrMap.get(MQTT_LAST_TIME)) {
-                    String lastMsgTime = attrMap.get(MQTT_LAST_TIME);
+                String lastMsgTime = redisService.getCacheMapValue(device.getDeviceCode(), MQTT_LAST_TIME);
 
+                if (null != lastMsgTime) {
                     // 计算最后一次消息至今的时间差
                     long time =
                         currentTime - DateUtils.stringToDate(lastMsgTime, DateUtils.YYYY_MM_DD_HH_MM_SS).getTime();

+ 197 - 77
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/InDoorEnergyHandler.java

@@ -13,6 +13,7 @@ package com.ruoyi.ems.handle;
 import com.alibaba.fastjson.JSONObject;
 import com.huashe.common.exception.Assert;
 import com.huashe.common.utils.DateUtils;
+import com.ruoyi.common.redis.service.RedisService;
 import com.ruoyi.ems.config.InDoorEnergyConfig;
 import com.ruoyi.ems.core.InDoorEnergyTemplate;
 import com.ruoyi.ems.domain.ElecMeterH;
@@ -35,6 +36,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
@@ -70,6 +72,9 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
     @Resource
     private InDoorEnergyConfig config;
 
+    @Autowired
+    private RedisService redisService;
+
     @Override
     public List<MeterDevice> getMeterDeviceList() {
         return meterDeviceService.selectByModel(MODE_CODE);
@@ -130,16 +135,16 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
     }
 
     /**
-     * 能耗数据抄报
+     * 采集室内能耗计量数据
+     *
+     * @return 采集条数
      */
-    @Override
-    public int meterHourProd() {
+    public int meterCollect() {
         int cnt = 0;
 
         try {
             // 获取所有能源设备列表
             List<EmsDevice> deviceList = getDeviceList();
-            List<MeterDevice> meterDeviceList = getMeterDeviceList();
 
             // 遍历每个能源设备
             if (CollectionUtils.isNotEmpty(deviceList)) {
@@ -147,7 +152,7 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
                 InDoorEnergyTemplate template = new InDoorEnergyTemplate(config.getUrl());
 
                 for (EmsDevice emsDevice : deviceList) {
-                    cnt += meterDevHourProd(template, emsDevice, meterDeviceList);
+                    cnt += meterDevCollect(template, emsDevice);
                 }
             }
         }
@@ -158,8 +163,7 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
         return cnt;
     }
 
-    private int meterDevHourProd(InDoorEnergyTemplate template, EmsDevice emsDevice,
-        List<MeterDevice> meterDeviceList) {
+    private int meterDevCollect(InDoorEnergyTemplate template, EmsDevice emsDevice) {
         int cnt = 0;
 
         try {
@@ -189,6 +193,108 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
             String dataStr = resJson.getString("ResultPointObjArr");
             List<CodesVal> retList = JSONObject.parseArray(dataStr, CodesVal.class);
 
+            // 更新设备属性
+            updateDeviceAttrList(attrValues, retList);
+        }
+        catch (Exception e) {
+            log.error("meterDevHourProd error! deviceCode:{}", emsDevice.getDeviceCode(), e);
+        }
+
+        return cnt;
+    }
+
+    private void updateDeviceAttrList(List<EmsObjAttrValue> attrValues, List<CodesVal> retList) {
+        try {
+            // 将新采集数据转换为Map
+            Map<String, CodesVal> codesValMap = retList.stream()
+                .collect(Collectors.toMap(CodesVal::getPointId, Function.identity()));
+
+            for (EmsObjAttrValue dbAttr : attrValues) {
+                // 更新列表中的值
+                boolean dbNeedUpdate = false;
+                String dbAttrValue = dbAttr.getAttrValue();
+                List<ObjAttrTableItem> dbItems = JSON.parseArray(dbAttrValue, ObjAttrTableItem.class);
+
+                if (CollectionUtils.isNotEmpty(dbItems)) {
+                    Map<String, ObjAttrTableItem> itemMap = dbItems.stream()
+                        .collect(Collectors.toMap(ObjAttrTableItem::getKey, Function.identity()));
+
+                    for (Map.Entry<String, ObjAttrTableItem> dbItemEntry : itemMap.entrySet()) {
+                        String pointId = dbItemEntry.getKey();
+                        ObjAttrTableItem dbItem = dbItemEntry.getValue();
+
+                        CodesVal syncCodesVal = codesValMap.get(pointId);
+
+                        if (null != syncCodesVal) {
+                            // 更新缓存中的数据
+                            String hKey = NEW_HOUR_READING + "-" + pointId;
+                            redisService.setCacheMapValue(dbAttr.getObjCode(), hKey, syncCodesVal.getValue());
+
+                            if (!StringUtils.equals(syncCodesVal.getValue(), dbItem.getValue()) || !StringUtils.equals(
+                                syncCodesVal.getTime(), dbItem.getUpdateTime())) {
+                                dbNeedUpdate = true;
+
+                                dbItem.setValue(syncCodesVal.getValue());
+                                dbItem.setUpdateTime(syncCodesVal.getTime());
+                            }
+                        }
+                    }
+                }
+
+                // 如果有更新,则保存回数据库
+                if (dbNeedUpdate) {
+                    dbAttrValue = JSON.toJSONString(dbItems);
+                    dbAttr.setAttrValue(dbAttrValue);
+                    objAttrValueService.updateObjAttrValue(dbAttr);
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error("更新设备属性异常", e);
+        }
+    }
+
+    /**
+     * 能耗数据抄报
+     */
+    @Override
+    public int meterHourProd() {
+        int cnt = 0;
+
+        try {
+            // 获取所有能源设备列表
+            List<EmsDevice> deviceList = getDeviceList();
+            List<MeterDevice> meterDeviceList = getMeterDeviceList();
+
+            // 遍历每个能源设备
+            if (CollectionUtils.isNotEmpty(deviceList)) {
+                for (EmsDevice emsDevice : deviceList) {
+                    cnt += meterDevHourProd(emsDevice, meterDeviceList);
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error("能耗数据抄报异常", e);
+        }
+
+        return cnt;
+    }
+
+    private int meterDevHourProd(EmsDevice emsDevice, List<MeterDevice> meterDeviceList) {
+        int cnt = 0;
+
+        try {
+            // 查询当前设备的接口属性值
+            List<EmsObjAttrValue> attrValues = objAttrValueService.selectByObjCode(MODE_CODE,
+                emsDevice.getDeviceCode());
+            attrValues = attrValues.stream()
+                // 过滤条件:attrKey以"interface"开头
+                .filter(attr -> StringUtils.startsWith(attr.getAttrKey(), "interface"))
+                // 收集为新的List
+                .collect(Collectors.toList());
+
+            Set<String> pointIdSet = getPointIds(attrValues);
+
             // 电表过滤
             List<MeterDevice> elecDevs = meterDeviceList.stream()
                 .filter(device -> pointIdSet.contains(device.getDeviceCode()) && 45 == device.getMeterCls())
@@ -196,7 +302,7 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
 
             // 电表抄表数据保存
             if (CollectionUtils.isNotEmpty(elecDevs)) {
-                cnt += saveElecMeterReading(elecDevs, retList);
+                cnt += saveElecMeterReading(emsDevice, elecDevs);
             }
 
             // 水表过滤
@@ -206,11 +312,8 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
 
             // 水表抄表数据保存
             if (CollectionUtils.isNotEmpty(waterDevs)) {
-                cnt += saveWaterMeterReading(waterDevs, retList);
+                cnt += saveWaterMeterReading(emsDevice, waterDevs);
             }
-
-            // 更新设备属性
-            updateDeviceAttrList(attrValues, retList);
         }
         catch (Exception e) {
             log.error("meterDevHourProd error! deviceCode:{}", emsDevice.getDeviceCode(), e);
@@ -219,33 +322,28 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
         return cnt;
     }
 
-    private int saveElecMeterReading(List<MeterDevice> deviceList, List<CodesVal> retList) {
-        Map<String, CodesVal> retMap = retList.stream().collect(Collectors.toMap(CodesVal::getPointId, val -> val));
+    private int saveElecMeterReading(EmsDevice gwDevice, List<MeterDevice> deviceList) {
         Map<String, Price> priceMap = new ConcurrentHashMap<>();
         List<ElecMeterH> meterHList = new ArrayList<>();
+        Date date = DateUtils.adjustHour(new Date(), -1);
 
-        for (MeterDevice device : deviceList) {
+        for (MeterDevice meterDevice : deviceList) {
             // 读取最新抄表值
-            CodesVal codesVal = retMap.get(device.getDeviceCode());
+            String newMeterReading = redisService.getCacheMapValue(gwDevice.getDeviceCode(),
+                NEW_HOUR_READING + "-" + meterDevice.getDeviceCode());
 
-            if (null != codesVal) {
-                // 读取最新抄表值
-                String newMeterReading = codesVal.getValue();
-                Date date = DateUtils.stringToDate(codesVal.getTime(), "yyyy-MM-dd HH:mm:ss");
-
-                // 获取设备属性缓存
-                Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(),
-                    k -> new ConcurrentHashMap<>());
+            if (null != newMeterReading) {
+                String lastCacheKey = LAST_HOUR_READING + "-" + meterDevice.getDeviceCode();
 
                 // 读取历史抄表
-                String lastMeterReading = attrMap.get(LAST_HOUR_READING);
+                String lastMeterReading = redisService.getCacheMapValue(gwDevice.getDeviceCode(), lastCacheKey);
 
-                // 组装电表抄报数据
-                ElecMeterH elecMeterH = getElecMeterH(device, lastMeterReading, newMeterReading);
+                ElecMeterH elecMeterH = getElecMeterH(gwDevice.getDeviceCode(), meterDevice, lastMeterReading,
+                    newMeterReading);
 
                 if (null != elecMeterH) {
-                    Price price = priceMap.computeIfAbsent(device.getAreaCode(),
-                        k -> priceService.getElecHourPrice(device.getAreaCode(), date));
+                    Price price = priceMap.computeIfAbsent(gwDevice.getAreaCode(),
+                        k -> priceService.getElecHourPrice(gwDevice.getAreaCode(), date));
                     completeElecPrice(elecMeterH, price);
                     meterHList.add(elecMeterH);
                 }
@@ -255,32 +353,60 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
         return CollectionUtils.isNotEmpty(meterHList) ? elecMeterHService.insertBatch(meterHList) : 0;
     }
 
-    private int saveWaterMeterReading(List<MeterDevice> deviceList, List<CodesVal> retList) {
-        Map<String, CodesVal> retMap = retList.stream().collect(Collectors.toMap(CodesVal::getPointId, val -> val));
+    @Override
+    public ElecMeterH getElecMeterH(String deviceCode, MeterDevice mDevice, String lastMeterReading,
+        String newMeterReading) {
+        ElecMeterH elecMeterH = null;
+        String cacheKey = LAST_HOUR_READING + "-" + mDevice.getDeviceCode();
+
+        // 缓存不为空,使用缓存抄表计算
+        if (StringUtils.isNotEmpty(lastMeterReading)) {
+            elecMeterH = execElecHourMeter(mDevice, lastMeterReading, newMeterReading);
+            // 本周期抄表完成,将新抄表值覆盖作为下个周期的起始
+            updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
+        }
+        // 缓存为空,使用数据库记录计算
+        else {
+            ElecMeterH dbElecMeterH = elecMeterHService.selectLatelyItem(deviceCode);
+
+            if (null != dbElecMeterH && null != dbElecMeterH.getMeterReading()) {
+                elecMeterH = execElecHourMeter(mDevice, String.valueOf(dbElecMeterH.getMeterReading()),
+                    newMeterReading);
+                // 本周期抄表完成,将新抄表值覆盖作为下个周期的起始
+                updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
+            }
+            else {
+                // 无缓存也无数据库记录,则认为是首次上报
+                if (StringUtils.isNotEmpty(newMeterReading)) {
+                    updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
+                }
+            }
+        }
+
+        return elecMeterH;
+    }
+
+    private int saveWaterMeterReading(EmsDevice gwDevice, List<MeterDevice> deviceList) {
         Map<String, FdEnergyPriceConfig> priceMap = new ConcurrentHashMap<>();
         List<WaterMeterH> meterHList = new ArrayList<>();
 
-        for (MeterDevice device : deviceList) {
+        for (MeterDevice meterDevice : deviceList) {
             // 读取最新抄表值
-            CodesVal codesVal = retMap.get(device.getDeviceCode());
-
-            if (null != codesVal) {
-                // 读取最新抄表值
-                String newMeterReading = codesVal.getValue();
+            String newMeterReading = redisService.getCacheMapValue(gwDevice.getDeviceCode(),
+                NEW_HOUR_READING + "-" + meterDevice.getDeviceCode());
 
-                // 获取设备属性缓存
-                Map<String, String> attrMap = attrCache.computeIfAbsent(device.getDeviceCode(),
-                    k -> new ConcurrentHashMap<>());
+            if (null != newMeterReading) {
+                String lastCacheKey = LAST_HOUR_READING + "-" + meterDevice.getDeviceCode();
 
                 // 读取历史抄表
-                String lastMeterReading = attrMap.get(LAST_HOUR_READING);
+                String lastMeterReading = redisService.getCacheMapValue(gwDevice.getDeviceCode(), lastCacheKey);
 
-                // 组装电表抄报数据
-                WaterMeterH waterMeterH = getWaterMeterH(device, lastMeterReading, newMeterReading);
+                // 组装表抄报数据
+                WaterMeterH waterMeterH = getWaterMeterH(gwDevice.getDeviceCode(), meterDevice, lastMeterReading, newMeterReading);
 
                 if (null != waterMeterH) {
-                    FdEnergyPriceConfig price = priceMap.computeIfAbsent(device.getAreaCode(),
-                        k -> fdEnergyPriceConfigService.selectByAreaCode(device.getAreaCode(), 70));
+                    FdEnergyPriceConfig price = priceMap.computeIfAbsent(gwDevice.getAreaCode(),
+                        k -> fdEnergyPriceConfigService.selectByAreaCode(gwDevice.getAreaCode(), 70));
                     completeWaterPrice(waterMeterH, price);
                     meterHList.add(waterMeterH);
                 }
@@ -290,45 +416,39 @@ public class InDoorEnergyHandler extends BaseMeterDevHandler {
         return CollectionUtils.isNotEmpty(meterHList) ? waterMeterHService.insertBatch(meterHList) : 0;
     }
 
-    private void updateDeviceAttrList(List<EmsObjAttrValue> attrValues, List<CodesVal> retList) {
-        try {
-            Map<String, CodesVal> codesValMap = retList.stream()
-                .collect(Collectors.toMap(CodesVal::getPointId, Function.identity()));
-
-            for (EmsObjAttrValue objAttr : attrValues) {
-                // 更新列表中的值
-                boolean updated = false;
-                String attrValue = objAttr.getAttrValue();
-                List<ObjAttrTableItem> tableItems = JSON.parseArray(attrValue, ObjAttrTableItem.class);
+    @Override
+    public WaterMeterH getWaterMeterH(String deviceCode, MeterDevice mDevice, String lastMeterReading,
+        String newMeterReading) {
+        WaterMeterH waterMeterH = null;
+        String cacheKey = LAST_HOUR_READING + "-" + mDevice.getDeviceCode();
 
-                if (CollectionUtils.isNotEmpty(tableItems)) {
-                    Map<String, ObjAttrTableItem> itemMap = tableItems.stream()
-                        .collect(Collectors.toMap(ObjAttrTableItem::getKey, Function.identity()));
+        // 缓存不为空,使用缓存抄表计算
+        if (StringUtils.isNotEmpty(lastMeterReading)) {
+            waterMeterH = execWaterHourMeter(mDevice, lastMeterReading, newMeterReading);
 
-                    for (Map.Entry<String, ObjAttrTableItem> itemEntry : itemMap.entrySet()) {
-                        String itemKey = itemEntry.getKey();
-                        ObjAttrTableItem item = itemEntry.getValue();
+            // 更新缓存
+            updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
+        }
+        // 缓存为空,使用数据库记录计算
+        else {
+            WaterMeterH dbWaterMeterH = waterMeterHService.selectLatelyItem(mDevice.getDeviceCode());
 
-                        if (codesValMap.containsKey(itemKey)) {
-                            updated = true;
-                            CodesVal codesVal = codesValMap.get(itemKey);
-                            item.setValue(codesVal.getValue());
-                            item.setUpdateTime(codesVal.getTime());
-                        }
-                    }
-                }
+            if (null != dbWaterMeterH && null != dbWaterMeterH.getMeterReading()) {
+                waterMeterH = execWaterHourMeter(mDevice, String.valueOf(dbWaterMeterH.getMeterReading()),
+                    newMeterReading);
 
-                // 如果有更新,则保存回数据库
-                if (updated) {
-                    attrValue = JSON.toJSONString(tableItems);
-                    objAttr.setAttrValue(attrValue);
-                    objAttrValueService.updateObjAttrValue(objAttr);
+                // 更新缓存
+                updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
+            }
+            else {
+                // 无缓存也无数据库记录,则认为是首次上报
+                if (StringUtils.isNotEmpty(newMeterReading)) {
+                    updateCacheAfterSuccess(deviceCode, cacheKey, newMeterReading);
                 }
             }
         }
-        catch (Exception e) {
-            log.error("更新设备属性异常", e);
-        }
+
+        return waterMeterH;
     }
 
     private Set<String> getPointIds(List<EmsObjAttrValue> attrValues) {

+ 3 - 4
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/Keka86BsHandler.java

@@ -20,7 +20,6 @@ import com.ruoyi.ems.model.AbilityPayload;
 import com.ruoyi.ems.model.CallResponse;
 import com.ruoyi.ems.model.QueryDevice;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -95,10 +94,10 @@ public class Keka86BsHandler extends BaseDevHandler {
 
         if (CollectionUtils.isNotEmpty(deviceList)) {
             for (EmsDevice device : deviceList) {
-                Map<String, String> attrMap = attrCache.get(device.getDeviceCode());
+                Object cacheTimeObj = redisService.getCacheMapValue(device.getDeviceCode(), MQTT_LAST_TIME);
 
-                if (MapUtils.isNotEmpty(attrMap) && null != attrMap.get(MQTT_LAST_TIME)) {
-                    String lastMsgTime = attrMap.get(MQTT_LAST_TIME);
+                if (null != cacheTimeObj) {
+                    String lastMsgTime = cacheTimeObj.toString();
 
                     // 计算最后一次消息至今的时间差
                     long time =

+ 1 - 1
ems/ems-cloud/ems-server/src/main/resources/application-prod-ct.yml

@@ -39,7 +39,7 @@ spring:
         # 从库数据源
         slave:
           driver-class-name: com.mysql.cj.jdbc.Driver
-          url: jdbc:mysql://10.0.8.20:3306/ems_ct?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+          url: jdbc:mysql://10.0.8.21:3306/ems_ct?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
           username: ems_ct
           password: Cxndix7rd2EdtrAd