|
@@ -15,12 +15,11 @@ import com.huashe.common.exception.Assert;
|
|
|
import com.huashe.common.exception.BusinessException;
|
|
|
import com.huashe.common.utils.DateUtils;
|
|
|
import com.huashe.common.utils.ThreadUtils;
|
|
|
-import com.ruoyi.ems.core.MessageCache;
|
|
|
+import com.ruoyi.ems.core.ObjectCache;
|
|
|
import com.ruoyi.ems.domain.ElecMeterH;
|
|
|
import com.ruoyi.ems.domain.EmsDevice;
|
|
|
import com.ruoyi.ems.domain.EmsObjAbilityCallLog;
|
|
|
import com.ruoyi.ems.domain.MeterDevice;
|
|
|
-import com.ruoyi.ems.enums.DevObjType;
|
|
|
import com.ruoyi.ems.enums.DevOnlineStatus;
|
|
|
import com.ruoyi.ems.model.AbilityPayload;
|
|
|
import com.ruoyi.ems.model.CallResponse;
|
|
@@ -31,14 +30,11 @@ import com.ruoyi.ems.service.IElecMeterHService;
|
|
|
import com.ruoyi.ems.service.IMeterDeviceService;
|
|
|
import com.ruoyi.ems.service.IPriceService;
|
|
|
import com.ruoyi.ems.util.IdUtils;
|
|
|
-import org.apache.commons.collections4.CollectionUtils;
|
|
|
-import org.apache.commons.collections4.MapUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
-import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
@@ -63,7 +59,7 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
private static final Logger log = LoggerFactory.getLogger(GeekOpenCbHandler.class);
|
|
|
|
|
|
@Autowired
|
|
|
- private MessageCache messageCache;
|
|
|
+ private ObjectCache messageCache;
|
|
|
|
|
|
@Autowired
|
|
|
private IMeterDeviceService meterDeviceService;
|
|
@@ -92,7 +88,7 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
String type = sendObject.getString("type");
|
|
|
String system = sendObject.getString("system");
|
|
|
String deviceCode = abilityParam.getObjCode();
|
|
|
- String messageId = StringUtils.equals("syncStatistic", type) ? "auto" : ("CALL-" + IdUtils.generateMessageId());
|
|
|
+ String messageId = StringUtils.equals("statistic", type) ? "auto" : ("CALL-" + IdUtils.generateMessageId());
|
|
|
String msgBody = addMsgId(abilityParam.getAbilityParam(), "messageId", messageId);
|
|
|
|
|
|
// 发送消息到MQTT服务器
|
|
@@ -106,7 +102,7 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
EmsObjAbilityCallLog logItem = saveCallLog(abilityParam, sendTime, 1);
|
|
|
|
|
|
while (true) {
|
|
|
- MqttCacheMsg cacheMsg = messageCache.getAndRemoveMqttMessage(messageId);
|
|
|
+ MqttCacheMsg cacheMsg = messageCache.readDevResponse(messageId);
|
|
|
|
|
|
if (null != cacheMsg) {
|
|
|
String receiveParam = cacheMsg.getPayload();
|
|
@@ -153,7 +149,7 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
// 前序调用的响应消息:1.写入消息队列,2.更新属性值
|
|
|
else if (StringUtils.isNotEmpty(messageId) && StringUtils.startsWith(messageId, "CALL-")) {
|
|
|
MqttCacheMsg mqttCacheMsg = new MqttCacheMsg(messageId, deviceCode, new Date(), payload);
|
|
|
- messageCache.addMqttMessage(messageId, mqttCacheMsg);
|
|
|
+ messageCache.setDevResCache(messageId, mqttCacheMsg);
|
|
|
updateBaseAttr(device, msgBody);
|
|
|
}
|
|
|
// 设备同步数据(INFO类,协议类):更新基础属性值
|
|
@@ -176,39 +172,15 @@ public class GeekOpenCbHandler extends MqttBaseHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public int getObjType() {
|
|
|
- return DevObjType.DEVC.getCode();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
- * 定时检测在线状态
|
|
|
- * <br/>每小时执行一次,扫描2个小时无消息的设备,标记为离线状态
|
|
|
+ * 获取设备列表
|
|
|
+ * @return 设备列表
|
|
|
*/
|
|
|
- @Scheduled(cron = "0 10 0/1 * * ?")
|
|
|
- public void checkOnlineStatus() {
|
|
|
- long currentTime = new Date().getTime();
|
|
|
- long threshold = 2 * 60 * 60 * 1000; // 120分钟
|
|
|
-
|
|
|
+ @Override
|
|
|
+ public List<EmsDevice> getDeviceList() {
|
|
|
QueryDevice queryDevice = new QueryDevice();
|
|
|
queryDevice.setDeviceModel("M_W2_QF_GEEKOPEN");
|
|
|
- List<EmsDevice> deviceList = deviceService.selectList(queryDevice);
|
|
|
-
|
|
|
- if (CollectionUtils.isNotEmpty(deviceList)) {
|
|
|
- for (EmsDevice device : deviceList) {
|
|
|
- Map<String, String> attrMap = attrCache.get(device.getDeviceCode());
|
|
|
-
|
|
|
- if (MapUtils.isNotEmpty(attrMap)) {
|
|
|
- String lastMsgTime = attrMap.get(MQTT_LAST_TIME);
|
|
|
-
|
|
|
- if (StringUtils.isNotEmpty(lastMsgTime)
|
|
|
- && (currentTime - DateUtils.stringToDate(lastMsgTime, DateUtils.YYYY_MM_DD_HH_MM_SS).getTime())
|
|
|
- > threshold) {
|
|
|
- refreshStatus(device, DevOnlineStatus.OFFLINE);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ return deviceService.selectList(queryDevice);
|
|
|
}
|
|
|
|
|
|
/**
|