learshaw 4 месяцев назад
Родитель
Сommit
37526a584d
17 измененных файлов с 1140 добавлено и 725 удалено
  1. 9 9
      ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/TaskExecutor.java
  2. 7 1
      ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/GeekOpenCbHandler.java
  3. 197 112
      ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/Keka86BsHandler.java
  4. 47 0
      ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/config/SchedulerConfig.java
  5. 74 48
      ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/controller/OpEnergyStrategyController.java
  6. 262 180
      ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/task/StrategyScheduler.java
  7. 175 365
      ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/task/StrategyTriggerListener.java
  8. 0 1
      ems/ems-core/src/main/java/com/ruoyi/ems/domain/OpEnergyStrategy.java
  9. 2 0
      ems/ems-core/src/main/java/com/ruoyi/ems/mapper/OpEnergyStrategyTriggerMapper.java
  10. 2 0
      ems/ems-core/src/main/java/com/ruoyi/ems/service/IOpEnergyStrategyTriggerService.java
  11. 5 0
      ems/ems-core/src/main/java/com/ruoyi/ems/service/impl/OpEnergyStrategyTriggerServiceImpl.java
  12. 342 0
      ems/ems-core/src/main/java/com/ruoyi/ems/strategy/executor/ParallelStepExecutor.java
  13. 11 0
      ems/ems-core/src/main/java/com/ruoyi/ems/strategy/executor/StrategyExecutor.java
  14. 1 7
      ems/ems-core/src/main/resources/mapper/ems/OpEnergyStrategyMapper.xml
  15. 5 0
      ems/ems-core/src/main/resources/mapper/ems/OpEnergyStrategyTriggerMapper.xml
  16. 1 1
      ems/sql/ems_init_data_test.sql
  17. 0 1
      ems/sql/ems_server.sql

+ 9 - 9
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/TaskExecutor.java

@@ -74,9 +74,9 @@ public class TaskExecutor {
      */
     @Scheduled(cron = "0 0/5 * * * ?")
     public void refresh5min() {
-        CompletableFuture.runAsync(() -> squareLightCtlHandler.execSyncDevAttrAll());
-        CompletableFuture.runAsync(() -> acrelElecMonitorHandler.execSyncDevAttrAll());
-        squareLightCtlHandler.refreshOnline();
+//        CompletableFuture.runAsync(() -> squareLightCtlHandler.execSyncDevAttrAll());
+//        CompletableFuture.runAsync(() -> acrelElecMonitorHandler.execSyncDevAttrAll());
+//        squareLightCtlHandler.refreshOnline();
     }
 
 //    /**
@@ -93,12 +93,12 @@ public class TaskExecutor {
      */
     @Scheduled(cron = "0 0/15 * * * ?")
     public void baMeterCollect() {
-        baCtlHandler.meterCollect();
-        baCtlHandler.xfCollect();
-        baCtlHandler.ahuCollect();
-        baCtlHandler.wtCollect();
-        baCtlHandler.wpCollect();
-        baCtlHandler.lightCollect();
+//        baCtlHandler.meterCollect();
+//        baCtlHandler.xfCollect();
+//        baCtlHandler.ahuCollect();
+//        baCtlHandler.wtCollect();
+//        baCtlHandler.wpCollect();
+//        baCtlHandler.lightCollect();
     }
 
     /**

+ 7 - 1
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/GeekOpenCbHandler.java

@@ -16,6 +16,8 @@ import com.huashe.common.exception.Assert;
 import com.huashe.common.exception.BusinessException;
 import com.huashe.common.utils.DateUtils;
 import com.huashe.common.utils.ThreadUtils;
+import com.ruoyi.ems.config.EmsConfig;
+import com.ruoyi.ems.core.EmsApiTemplate;
 import com.ruoyi.ems.core.MqttTemplate;
 import com.ruoyi.ems.core.ObjectCache;
 import com.ruoyi.ems.domain.EmsDevice;
@@ -67,6 +69,9 @@ public class GeekOpenCbHandler extends BaseMeterDevHandler {
     // 设备模型代码
     private static final String MODE_CODE = "M_W2_QF_GEEKOPEN";
 
+    @Autowired
+    private EmsConfig emsConfig;
+
     /**
      * 能力执行
      *
@@ -266,7 +271,8 @@ public class GeekOpenCbHandler extends BaseMeterDevHandler {
                 objAttrValueService.mergeObjAttrValue(attrValue);
                 // 更新缓存
                 updateCacheAfterSuccess(device.getDeviceCode(), targetKey, currentValue);
-
+                new EmsApiTemplate(emsConfig).notifyAttrValueChangedAsync(device.getDeviceCode(), targetKey, cacheValue,
+                    currentValue);
                 flag = true;
             }
         }

+ 197 - 112
ems/ems-cloud/ems-dev-adapter/src/main/java/com/ruoyi/ems/handle/Keka86BsHandler.java

@@ -11,13 +11,11 @@
 package com.ruoyi.ems.handle;
 
 import com.huashe.common.exception.Assert;
-import com.huashe.common.utils.DateUtils;
 import com.ruoyi.ems.config.EmsConfig;
 import com.ruoyi.ems.core.EmsApiTemplate;
 import com.ruoyi.ems.core.MqttTemplate;
 import com.ruoyi.ems.domain.EmsDevice;
 import com.ruoyi.ems.domain.EmsObjAttrValue;
-import com.ruoyi.ems.enums.DevOnlineStatus;
 import com.ruoyi.ems.model.AbilityPayload;
 import com.ruoyi.ems.model.CallResponse;
 import com.ruoyi.ems.model.ModbusCommand;
@@ -33,9 +31,9 @@ import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
@@ -58,31 +56,25 @@ public class Keka86BsHandler extends BaseDevHandler {
     @Autowired
     private EmsConfig emsConfig;
 
-    // 主题前置
     private static final String TOPIC_PREFIX = "/sc/dtu/ctl/";
-
-    // 设备模型代码
     private static final String MODE_CODE = "M_W2_QS_KEKA_86";
 
-    // Modbus功能码
-    private static final byte FUNC_READ = 0x03;   // 读保持寄存器
-
-    private static final byte FUNC_WRITE = 0x06;  // 写单个寄存器
-
-    // 设备地址
+    private static final byte FUNC_READ = 0x03;
+    private static final byte FUNC_WRITE = 0x06;
     private static final byte DEVICE_ADDR = 0x01;
 
-    // 灯控制寄存器地址 (44100=0x1004开启, 44101=0x1005关闭)
-    // 这里保持0x1021以匹配实际测试指令,如需修改请改为0x1004
+    // 单按键控制寄存器基地址 (协议: 44129=0x1021)
     private static final int LIGHT_BASE_ADDR = 0x1021;
 
-    // 控制值
-    private static final int VALUE_ON = 0x0001;   // 开
+    private static final int VALUE_ON = 0x0001;   // 闭合/开
+    private static final int VALUE_OFF = 0x0000;  // 断开/关
 
-    private static final int VALUE_OFF = 0x0000;  // 关
+    // 修正:读取单个寄存器只需要1个
+    private static final int READ_COUNT = 0x0001;
 
-    // 读取寄存器数量
-    private static final int READ_COUNT = 0x0002; // 读2个寄存器
+    // ========== 新增:用于追踪待处理的读取请求 ==========
+    // Key: gatewayId, Value: 最近请求的buttonId
+    private final Map<String, Integer> pendingReadRequests = new ConcurrentHashMap<>();
 
     @Override
     public CallResponse<Void> call(AbilityPayload abilityParam) {
@@ -99,16 +91,19 @@ public class Keka86BsHandler extends BaseDevHandler {
         if (StringUtils.equals("on-off", abilityParam.getAbilityKey())) {
             ModbusCommand command = buildControlCommand(Integer.parseInt(buttonId),
                 Integer.parseInt(abilityParam.getAbilityParam()));
-            // 发送消息到MQTT服务器
             String topic = TOPIC_PREFIX + gatewayId;
             sendMqttHex(topic, command);
             saveCallLog(abilityParam, command.getCommandHex(), System.currentTimeMillis(), 0);
             callResponse = new CallResponse<>(0, "执行成功!");
         }
         else if (StringUtils.equals("syncState", abilityParam.getAbilityKey())) {
-            ModbusCommand command = buildReadCommand(Integer.parseInt(buttonId));
-            // 发送消息到MQTT服务器
+            int btnId = Integer.parseInt(buttonId);
+            ModbusCommand command = buildReadCommand(btnId);
             String topic = TOPIC_PREFIX + gatewayId;
+
+            // 记录待处理的读取请求
+            pendingReadRequests.put(gatewayId + "_" + getRegisterAddr(btnId), btnId);
+
             sendMqttHex(topic, command);
             saveCallLog(abilityParam, command.getCommandHex(), System.currentTimeMillis(), 0);
             callResponse = new CallResponse<>(0, "执行成功!");
@@ -136,32 +131,26 @@ public class Keka86BsHandler extends BaseDevHandler {
         try {
             log.info("[Keka86] 网关:{}, 收到消息:{}", gatewayId, payload);
 
-            // 检查消息长度
             String[] hexBytes = payload.trim().split("\\s+");
-            if (hexBytes.length < 7) {
+            if (hexBytes.length < 5) {
                 log.warn("[Keka86] 消息长度不足,忽略: {}", payload);
                 return;
             }
 
-            // 判断消息类型 (功能码在第2个字节,索引为1)
-            String funcCodeStr = hexBytes[1].toUpperCase();
-            int funcCode = Integer.parseInt(funcCodeStr, 16);
+            int funcCode = Integer.parseInt(hexBytes[1].toUpperCase(), 16);
 
             if (funcCode == 0x03) {
-                // 读取响应 (03H)
                 handleReadResponse(gatewayId, payload);
             }
             else if (funcCode == 0x06) {
-                // 写入确认响应 (06H) - 这就是你遇到的情况
                 handleWriteResponse(gatewayId, payload);
             }
             else if ((funcCode & 0x80) != 0) {
-                // 错误响应 (功能码最高位为1,如83H、86H)
                 byte originalFuncCode = (byte) (funcCode & 0x7F);
                 handleErrorResponse(gatewayId, payload, originalFuncCode);
             }
             else {
-                log.warn("[Keka86] 未知功能码: 0x{}, 消息: {}", funcCodeStr, payload);
+                log.warn("[Keka86] 未知功能码: 0x%02X, 消息: {}", funcCode, payload);
             }
 
         }
@@ -171,95 +160,85 @@ public class Keka86BsHandler extends BaseDevHandler {
     }
 
     private String getDeviceCode(String gatewayId, int buttonId) {
-        List<EmsObjAttrValue> list = objAttrValueService.selectByAttrKeyValue(MODE_CODE, "gatewayId", gatewayId);
+        List<EmsObjAttrValue> list = objAttrValueService
+            .selectByAttrKeyValue(MODE_CODE, "gatewayId", gatewayId);
 
         if (CollectionUtils.isNotEmpty(list)) {
-            for (EmsObjAttrValue gatewayIdValue : list) {
-                EmsObjAttrValue buttonIdValue = objAttrValueService.selectObjAttrValue(MODE_CODE,
-                    gatewayIdValue.getObjCode(), "buttonId");
+            for (EmsObjAttrValue gatewayAttr : list) {
+                EmsObjAttrValue buttonAttr = objAttrValueService
+                    .selectObjAttrValue(MODE_CODE, gatewayAttr.getObjCode(), "buttonId");
 
-                if (null != buttonIdValue && StringUtils.equals(buttonIdValue.getObjCode(),
-                    gatewayIdValue.getObjCode())) {
-                    return gatewayIdValue.getObjCode();
+                if (buttonAttr != null
+                    && StringUtils.equals(buttonAttr.getAttrValue(), String.valueOf(buttonId))) {
+                    return gatewayAttr.getObjCode();
                 }
             }
         }
-
         return null;
     }
 
     /**
-     * 生成控制指令 (开/关) - 优化版本
-     *
-     * @param lightId 灯ID (1, 2, 3)
-     * @param state   状态 (1=开, 0=关)
-     * @return ModbusCommand 包含字节数组和十六进制字符串
+     * 生成控制指令 (开/关)
      */
-    public ModbusCommand buildControlCommand(int lightId, int state) {
-        if (lightId < 1 || lightId > 3) {
-            throw new IllegalArgumentException("灯ID必须是1-3");
+    public ModbusCommand buildControlCommand(int buttonId, int state) {
+        if (buttonId < 1 || buttonId > 6) {
+            throw new IllegalArgumentException("按键ID必须是1-6");
         }
         if (state != 0 && state != 1) {
             throw new IllegalArgumentException("状态必须是0(关)或1(开)");
         }
 
-        // 计算寄存器地址
-        int registerAddr = LIGHT_BASE_ADDR + (lightId - 1);
-
-        // 控制值
+        int registerAddr = getRegisterAddr(buttonId);
         int value = (state == 1) ? VALUE_ON : VALUE_OFF;
 
-        // 构建Modbus帧 (不含CRC)
         byte[] frame = new byte[6];
-        frame[0] = DEVICE_ADDR;                          // 设备地址
-        frame[1] = FUNC_WRITE;                           // 功能码 06H
-        frame[2] = (byte) (registerAddr >> 8);           // 寄存器地址高字节
-        frame[3] = (byte) (registerAddr & 0xFF);         // 寄存器地址低字节
-        frame[4] = (byte) (value >> 8);                  // 数据高字节
-        frame[5] = (byte) (value & 0xFF);                // 数据低字节
-
-        // 计算并添加CRC
+        frame[0] = DEVICE_ADDR;
+        frame[1] = FUNC_WRITE;
+        frame[2] = (byte) (registerAddr >> 8);
+        frame[3] = (byte) (registerAddr & 0xFF);
+        frame[4] = (byte) (value >> 8);
+        frame[5] = (byte) (value & 0xFF);
+
         int crc = calculateCRC16(frame);
         byte[] fullFrame = new byte[8];
         System.arraycopy(frame, 0, fullFrame, 0, 6);
-        fullFrame[6] = (byte) (crc & 0xFF);              // CRC低字节在前
-        fullFrame[7] = (byte) (crc >> 8);                // CRC高字节在后
+        fullFrame[6] = (byte) (crc & 0xFF);
+        fullFrame[7] = (byte) (crc >> 8);
 
         String hexString = bytesToHexString(fullFrame);
+        log.debug("[Keka86] 构建控制指令: 按键{}, 状态{}, 寄存器0x{}, 指令:{}",
+            buttonId, state == 1 ? "开" : "关", String.format("%04X", registerAddr), hexString);
         return new ModbusCommand(fullFrame, hexString);
     }
 
     /**
-     * 生成读取指令 - 优化版本
-     *
-     * @param lightId 灯ID (1, 2, 3)
-     * @return ModbusCommand 包含字节数组和十六进制字符串
+     * 生成读取指令
+     * 读取单个按键状态,寄存器数量=1
      */
-    public ModbusCommand buildReadCommand(int lightId) {
-        if (lightId < 1 || lightId > 3) {
-            throw new IllegalArgumentException("灯ID必须是1-3");
+    public ModbusCommand buildReadCommand(int buttonId) {
+        if (buttonId < 1 || buttonId > 6) {
+            throw new IllegalArgumentException("按键ID必须是1-6");
         }
 
-        // 计算寄存器地址
-        int registerAddr = LIGHT_BASE_ADDR + (lightId - 1);
+        int registerAddr = getRegisterAddr(buttonId);
 
-        // 构建Modbus帧 (不含CRC)
         byte[] frame = new byte[6];
-        frame[0] = DEVICE_ADDR;                          // 设备地址
-        frame[1] = FUNC_READ;                            // 功能码 03H
-        frame[2] = (byte) (registerAddr >> 8);           // 寄存器地址高字节
-        frame[3] = (byte) (registerAddr & 0xFF);         // 寄存器地址低字节
-        frame[4] = (byte) (READ_COUNT >> 8);             // 寄存器数量高字节
-        frame[5] = (byte) (READ_COUNT & 0xFF);           // 寄存器数量低字节
-
-        // 计算并添加CRC
+        frame[0] = DEVICE_ADDR;
+        frame[1] = FUNC_READ;
+        frame[2] = (byte) (registerAddr >> 8);
+        frame[3] = (byte) (registerAddr & 0xFF);
+        frame[4] = (byte) (READ_COUNT >> 8);    // 0x00
+        frame[5] = (byte) (READ_COUNT & 0xFF);  // 0x01 (读1个寄存器)
+
         int crc = calculateCRC16(frame);
         byte[] fullFrame = new byte[8];
         System.arraycopy(frame, 0, fullFrame, 0, 6);
-        fullFrame[6] = (byte) (crc & 0xFF);              // CRC低字节
-        fullFrame[7] = (byte) (crc >> 8);                // CRC高字节
+        fullFrame[6] = (byte) (crc & 0xFF);
+        fullFrame[7] = (byte) (crc >> 8);
 
         String hexString = bytesToHexString(fullFrame);
+        log.debug("[Keka86] 构建读取指令: 按键{}, 寄存器0x{}, 指令:{}",
+            buttonId, String.format("%04X", registerAddr), hexString);
         return new ModbusCommand(fullFrame, hexString);
     }
 
@@ -321,6 +300,20 @@ public class Keka86BsHandler extends BaseDevHandler {
     }
 
     /**
+     * 获取按键对应的寄存器地址
+     */
+    private int getRegisterAddr(int buttonId) {
+        return LIGHT_BASE_ADDR + (buttonId - 1);
+    }
+
+    /**
+     * 根据寄存器地址反推按键ID
+     */
+    private int getButtonIdFromAddr(int registerAddr) {
+        return registerAddr - LIGHT_BASE_ADDR + 1;
+    }
+
+    /**
      * 获取Modbus错误信息
      */
     private static String getModbusErrorMessage(byte errorCode) {
@@ -401,34 +394,106 @@ public class Keka86BsHandler extends BaseDevHandler {
 
     /**
      * 处理读取响应 (功能码 03H)
-     * 响应格式: 01 03 04 00 01 00 00 FA 33
+     *
+     * 响应格式 (读取1个寄存器):
+     * [0] 设备地址: 01
+     * [1] 功能码:   03
+     * [2] 字节数:   02 (1个寄存器=2字节)
+     * [3] 数据高:   00
+     * [4] 数据低:   01 或 00
+     * [5] CRC低
+     * [6] CRC高
+     *
+     * 示例: 01 03 02 00 01 79 84 表示状态=开
+     *       01 03 02 00 00 B8 44 表示状态=关
      */
     private void handleReadResponse(String gatewayId, String hexMessage) {
         try {
-            // 从响应中提取寄存器地址,推断是哪个按键
             byte[] response = hexStringToBytes(hexMessage);
 
-            // 方法1: 如果响应中没有地址信息,需要从上下文推断
-            // 这里先假设返回的是第一个寄存器的数据
-            // 实际需要根据之前发送的读取指令来匹配
+            // 最小长度: 地址(1) + 功能码(1) + 字节数(1) + 数据(2) + CRC(2) = 7
+            if (response.length < 7) {
+                log.warn("[Keka86-Read] 响应长度不足: {}", hexMessage);
+                return;
+            }
 
-            // 临时处理: 遍历所有可能的按键
-            for (int buttonId = 1; buttonId <= 3; buttonId++) {
-                DeviceStatus deviceStatus = parseReadResponse(hexMessage, buttonId);
-                String deviceCode = getDeviceCode(gatewayId, buttonId);
+            // CRC校验
+            byte[] dataWithoutCrc = new byte[response.length - 2];
+            System.arraycopy(response, 0, dataWithoutCrc, 0, response.length - 2);
+            int calculatedCrc = calculateCRC16(dataWithoutCrc);
+            int receivedCrc = (response[response.length - 2] & 0xFF)
+                | ((response[response.length - 1] & 0xFF) << 8);
 
-                if (null == deviceCode) {
-                    log.debug("[Keka86-Read] 网关:{}, 按键{} 未注册", gatewayId, buttonId);
-                    continue;
-                }
+            if (calculatedCrc != receivedCrc) {
+                log.error("[Keka86-Read] CRC校验失败, 计算:0x{}, 接收:0x{}, Hex:{}",
+                    String.format("%04X", calculatedCrc),
+                    String.format("%04X", receivedCrc), hexMessage);
+                return;
+            }
 
-                // 更新设备状态
-                updateDeviceStatus(deviceCode, deviceStatus);
+            // 解析响应数据
+            int deviceAddr = response[0] & 0xFF;
+            int funcCode = response[1] & 0xFF;
+            int byteCount = response[2] & 0xFF;
+
+            if (funcCode != 0x03) {
+                log.warn("[Keka86-Read] 功能码不匹配: 0x{}", String.format("%02X", funcCode));
+                return;
+            }
+
+            // 提取寄存器值 (2字节)
+            int dataValue = ((response[3] & 0xFF) << 8) | (response[4] & 0xFF);
+            int status = (dataValue == VALUE_ON) ? 1 : 0;
+
+            log.info("[Keka86-Read] 网关:{}, 设备地址:{}, 字节数:{}, 数据值:0x{}, 状态:{}",
+                gatewayId, deviceAddr, byteCount,
+                String.format("%04X", dataValue), status == 1 ? "开" : "关");
+
+            // ========== 关键修复:确定是哪个按键的响应 ==========
+            // 方案1: 从pendingReadRequests中查找 (需要记录发送时的按键)
+            // 方案2: 遍历所有按键配置,更新匹配的设备
+
+            // 这里采用方案2: 查找该网关下所有按键设备并更新
+            // 实际生产中建议用方案1,通过请求-响应匹配
+
+            List<EmsObjAttrValue> gatewayDevices = objAttrValueService
+                .selectByAttrKeyValue(MODE_CODE, "gatewayId", gatewayId);
+
+            if (CollectionUtils.isEmpty(gatewayDevices)) {
+                log.warn("[Keka86-Read] 网关:{} 无注册设备", gatewayId);
+                return;
+            }
 
-                log.info("[Keka86-Read] 网关:{}, 设备:{}, 按键{}, 状态:{}", gatewayId, deviceCode, buttonId,
-                    deviceStatus.getStatus() == 1 ? "开" : "关");
+            // 检查是否有匹配的待处理请求
+            for (EmsObjAttrValue gw : gatewayDevices) {
+                String deviceCode = gw.getObjCode();
+                EmsObjAttrValue btnAttr = objAttrValueService
+                    .selectObjAttrValue(MODE_CODE, deviceCode, "buttonId");
+
+                if (btnAttr != null) {
+                    int buttonId = Integer.parseInt(btnAttr.getAttrValue());
+                    int registerAddr = getRegisterAddr(buttonId);
+                    String requestKey = gatewayId + "_" + registerAddr;
+
+                    // 检查是否是该按键的响应
+                    if (pendingReadRequests.containsKey(requestKey)) {
+                        pendingReadRequests.remove(requestKey);
+
+                        DeviceStatus deviceStatus = new DeviceStatus(
+                            deviceAddr, buttonId, status, hexMessage);
+                        updateDeviceStatus(deviceCode, deviceStatus);
+
+                        log.info("[Keka86-Read] 更新设备:{}, 按键:{}, 状态:{}",
+                            deviceCode, buttonId, status == 1 ? "开" : "关");
+                        return; // 找到匹配的请求,处理完毕
+                    }
+                }
             }
 
+            // 如果没有待处理请求匹配,可能是设备主动上报
+            log.info("[Keka86-Read] 网关:{} 收到状态上报(可能是主动上报), 状态:{}",
+                gatewayId, status == 1 ? "开" : "关");
+
         }
         catch (Exception e) {
             log.error("[Keka86-Read] 网关:{}, 解析失败, Hex:{}", gatewayId, hexMessage, e);
@@ -437,18 +502,33 @@ public class Keka86BsHandler extends BaseDevHandler {
 
     /**
      * 处理写入响应 (功能码 06H)
-     * 响应格式: 01 06 10 21 00 00 DD 00
-     * 这是对写入指令的确认,表示设备已接收并执行
+     *
+     * 响应格式 (写入确认 - 回显请求):
+     * [0] 设备地址: 01
+     * [1] 功能码:   06
+     * [2] 寄存器高: 10
+     * [3] 寄存器低: 21 (0x1021 = 按键1)
+     * [4] 数据高:   00
+     * [5] 数据低:   01 或 00
+     * [6] CRC低
+     * [7] CRC高
+     *
+     * 示例: 01 06 10 21 00 01 DD 00 表示按键1设置为开
      */
     private void handleWriteResponse(String gatewayId, String hexMessage) {
         try {
             byte[] response = hexStringToBytes(hexMessage);
 
-            // 验证CRC
-            byte[] dataWithoutCrc = new byte[response.length - 2];
-            System.arraycopy(response, 0, dataWithoutCrc, 0, response.length - 2);
+            if (response.length < 8) {
+                log.warn("[Keka86-Write] 响应长度不足: {}", hexMessage);
+                return;
+            }
+
+            // CRC校验
+            byte[] dataWithoutCrc = new byte[6];
+            System.arraycopy(response, 0, dataWithoutCrc, 0, 6);
             int calculatedCrc = calculateCRC16(dataWithoutCrc);
-            int receivedCrc = (response[response.length - 2] & 0xFF) | ((response[response.length - 1] & 0xFF) << 8);
+            int receivedCrc = (response[6] & 0xFF) | ((response[7] & 0xFF) << 8);
 
             if (calculatedCrc != receivedCrc) {
                 log.error("[Keka86-Write] CRC校验失败, 网关:{}, Hex:{}", gatewayId, hexMessage);
@@ -457,21 +537,26 @@ public class Keka86BsHandler extends BaseDevHandler {
 
             // 解析寄存器地址和值
             int registerAddr = ((response[2] & 0xFF) << 8) | (response[3] & 0xFF);
-            int value = ((response[4] & 0xFF) << 8) | (response[5] & 0xFF);
+            int dataValue = ((response[4] & 0xFF) << 8) | (response[5] & 0xFF);
 
             // 根据寄存器地址推断按键ID
-            int buttonId = registerAddr - LIGHT_BASE_ADDR + 1;
-            int status = (value == VALUE_ON) ? 1 : 0;
+            int buttonId = getButtonIdFromAddr(registerAddr);
+            int status = (dataValue == VALUE_ON) ? 1 : 0;
 
-            log.info("[Keka86-Write] 网关:{}, 写入确认成功, 寄存器:0x{}, 按键:{}, 状态:{}", gatewayId,
-                String.format("%04X", registerAddr), buttonId, status == 1 ? "开" : "关");
+            log.info("[Keka86-Write] 网关:{}, 写入确认, 寄存器:0x{}, 按键:{}, 状态:{}",
+                gatewayId, String.format("%04X", registerAddr),
+                buttonId, status == 1 ? "开" : "关");
 
-            // 写入成功后更新数据库状态
+            // 更新数据库状态
             String deviceCode = getDeviceCode(gatewayId, buttonId);
             if (deviceCode != null) {
-                DeviceStatus deviceStatus = new DeviceStatus(response[0] & 0xFF, buttonId, status, hexMessage);
+                DeviceStatus deviceStatus = new DeviceStatus(
+                    response[0] & 0xFF, buttonId, status, hexMessage);
                 updateDeviceStatus(deviceCode, deviceStatus);
             }
+            else {
+                log.warn("[Keka86-Write] 网关:{}, 按键{} 未找到对应设备", gatewayId, buttonId);
+            }
 
         }
         catch (Exception e) {

+ 47 - 0
ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/config/SchedulerConfig.java

@@ -0,0 +1,47 @@
+/*
+ * 文 件 名:  TaskSchedulerConfig
+ * 版    权:  华设设计集团股份有限公司
+ * 描    述:  <描述>
+ * 修 改 人:  lvwenbin
+ * 修改时间:  2025/12/9
+ * 跟踪单号:  <跟踪单号>
+ * 修改单号:  <修改单号>
+ * 修改内容:  <修改内容>
+ */
+package com.ruoyi.ems.config;
+
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+/**
+ * 任务调度器配置
+ */
+@Configuration
+@EnableScheduling
+public class SchedulerConfig {
+
+    /**
+     * 任务调度器
+     * 用于策略的定时触发
+     */
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(10);
+        scheduler.setThreadNamePrefix("strategy-scheduler-");
+        scheduler.setAwaitTerminationSeconds(60);
+        scheduler.setWaitForTasksToCompleteOnShutdown(true);
+        scheduler.setRemoveOnCancelPolicy(true);
+        scheduler.setErrorHandler(t -> {
+            // 定时任务异常处理,避免任务因异常而停止
+            LoggerFactory.getLogger("StrategyScheduler")
+                .error("定时任务执行异常", t);
+        });
+        scheduler.initialize();
+        return scheduler;
+    }
+}

+ 74 - 48
ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/controller/OpEnergyStrategyController.java

@@ -118,7 +118,7 @@ public class OpEnergyStrategyController extends BaseController {
 
     /**
      * 修改能源策略
-     * -修改后刷新调度器
+     * 重构:修改后刷新调度器
      */
     @RequiresPermissions("power-mgr:strategy:edit")
     @Log(title = "能源策略", businessType = BusinessType.UPDATE)
@@ -126,17 +126,9 @@ public class OpEnergyStrategyController extends BaseController {
     public AjaxResult edit(@RequestBody OpEnergyStrategy strategy) {
         int result = strategyService.updateStrategy(strategy);
 
-        if (result > 0) {
+        if (result > 0 && strategy.getStrategyState() != null && strategy.getStrategyState() == 1) {
             // 刷新调度器中的策略配置
             strategyScheduler.refreshStrategy(strategy.getStrategyCode());
-
-            // 【新增】如果是轮询触发类型且已启用,刷新轮询配置
-            if (strategy.getStrategyState() != null && strategy.getStrategyState() == 1) {
-                if (strategy.getTriggerType() == 4 || strategy.getTriggerType() == 5) {
-                    pollingMonitorService.refreshPollingStrategy(strategy.getStrategyCode());
-                }
-            }
-
             log.info("策略[{}]已更新并刷新调度器", strategy.getStrategyCode());
         }
 
@@ -167,7 +159,7 @@ public class OpEnergyStrategyController extends BaseController {
 
     /**
      * 启用/停用策略
-     * 核心修改:同时更新调度器
+     * 重构:统一通过 StrategyScheduler 管理触发器注册
      */
     @PutMapping("/state/{strategyCode}/{state}")
     @ApiOperation("启用/停用策略")
@@ -181,20 +173,14 @@ public class OpEnergyStrategyController extends BaseController {
         int result = strategyService.updateStrategy(strategy);
 
         if (result > 0) {
-            // 根据状态更新调度器
             if (state == 1) {
+                // 启用策略 - 由调度器统一处理所有触发器类型
                 strategyScheduler.registerStrategy(strategy);
-
-                // 如果是轮询触发类型,同时注册轮询监控
-                if (strategy.getTriggerType() == 4 || strategy.getTriggerType() == 5) {
-                    pollingMonitorService.registerPollingStrategy(strategy);
-                }
-
                 log.info("策略[{}]已启用并注册到调度器", strategyCode);
             }
             else {
+                // 停用策略 - 注销所有触发器
                 strategyScheduler.unregisterStrategy(strategyCode);
-                pollingMonitorService.unregisterPollingStrategy(strategyCode);
                 log.info("策略[{}]已停用并从调度器注销", strategyCode);
             }
         }
@@ -301,23 +287,27 @@ public class OpEnergyStrategyController extends BaseController {
 
     /**
      * 获取调度器状态
+     * 新增:更详细的状态信息
      */
     @GetMapping("/scheduler/status")
     @ApiOperation("获取调度器状态")
     public AjaxResult getSchedulerStatus() {
-        Map<String, Object> status = new HashMap<>();
-        status.put("registeredStrategies", strategyScheduler.getRegisteredCount());
-        status.put("scheduledTasks", strategyScheduler.getScheduledTaskCount());
-        status.put("attrTriggers", strategyScheduler.getAttrTriggerCount());
-        status.put("triggers", triggerListener.getRegisteredTriggers());
+        Map<String, Object> status = strategyScheduler.getStatus();
+
+        // 添加触发监听器状态
+        if (triggerListener != null) {
+            status.put("triggers", triggerListener.getRegisteredTriggers());
+        }
 
         // 添加轮询监控状态
-        status.put("pollingTasks", pollingMonitorService.getPollingTaskCount());
-        status.put("pollingStatus", pollingMonitorService.getPollingStatus());
+        if (pollingMonitorService != null) {
+            status.put("polling", pollingMonitorService.getPollingStatus());
+        }
 
         return success(status);
     }
 
+
     /**
      * 重新加载所有策略
      */
@@ -369,7 +359,8 @@ public class OpEnergyStrategyController extends BaseController {
             result.put("message", "已触发轮询检查,请查看日志确认执行结果");
 
             return success(result);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("手动触发轮询检查失败: strategy={}", strategyCode, e);
             return error("触发轮询检查失败: " + e.getMessage());
         }
@@ -474,36 +465,64 @@ public class OpEnergyStrategyController extends BaseController {
         return success(triggers);
     }
 
+    /**
+     * 保存触发器
+     *
+     * @param trigger
+     * @return
+     */
     @PostMapping("/trigger")
     @ApiOperation("保存触发器")
     public AjaxResult saveTrigger(@RequestBody OpEnergyStrategyTrigger trigger) {
+        // 确保 strategyCode 存在
+        if (trigger.getStrategyCode() == null || trigger.getStrategyCode().isEmpty()) {
+            return error("策略代码不能为空");
+        }
+
         int result;
         if (trigger.getId() == null) {
             result = triggerService.insertTrigger(trigger);
-        } else {
+        }
+        else {
             result = triggerService.updateTrigger(trigger);
         }
 
         if (result > 0) {
-            // 同步更新策略的触发类型
+            // 同步更新策略的触发类型(用于列表筛选)
             syncStrategyTriggerType(trigger.getStrategyCode());
 
-            // 刷新调度器
-            strategyScheduler.refreshStrategy(trigger.getStrategyCode());
-
-            // 如果是轮询触发器,刷新轮询配置
-            if ("POLLING".equals(trigger.getTriggerType())) {
-                pollingMonitorService.refreshPollingStrategy(trigger.getStrategyCode());
+            // 检查策略是否已启用,如果是则刷新调度器
+            OpEnergyStrategy strategy = strategyService.selectStrategyByCode(trigger.getStrategyCode());
+            if (strategy != null && strategy.getStrategyState() == 1) {
+                strategyScheduler.refreshStrategy(trigger.getStrategyCode());
+                log.info("触发器保存后刷新调度器: strategy={}", trigger.getStrategyCode());
             }
         }
 
         return toAjax(result);
     }
 
+    /**
+     * 删除触发器
+     */
     @DeleteMapping("/trigger/{id}")
     @ApiOperation("删除触发器")
     public AjaxResult deleteTrigger(@PathVariable Long id) {
-        return toAjax(triggerService.deleteTrigger(id));
+        // 先查询触发器获取策略代码
+        OpEnergyStrategyTrigger trigger = triggerService.selectById(id);
+        String strategyCode = trigger != null ? trigger.getStrategyCode() : null;
+
+        int result = triggerService.deleteTrigger(id);
+
+        if (result > 0 && strategyCode != null) {
+            // 检查策略是否已启用,如果是则刷新调度器
+            OpEnergyStrategy strategy = strategyService.selectStrategyByCode(strategyCode);
+            if (strategy != null && strategy.getStrategyState() == 1) {
+                strategyScheduler.refreshStrategy(strategyCode);
+            }
+        }
+
+        return toAjax(result);
     }
 
     // ==================== 模板管理 ====================
@@ -534,10 +553,12 @@ public class OpEnergyStrategyController extends BaseController {
                 String username = SecurityUtils.getUsername();
                 if (username != null && !username.isEmpty()) {
                     params.put("exec_by", username);
-                } else {
+                }
+                else {
                     params.put("exec_by", "UNKNOWN_USER");
                 }
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 params.put("exec_by", "ANONYMOUS");
                 log.debug("无法获取当前用户: {}", e.getMessage());
             }
@@ -551,7 +572,8 @@ public class OpEnergyStrategyController extends BaseController {
             result.put("execBy", params.get("exec_by"));
 
             return success(result);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.error("手动执行策略失败: {}", strategyCode, e);
             return error("策略执行失败: " + e.getMessage());
         }
@@ -598,9 +620,7 @@ public class OpEnergyStrategyController extends BaseController {
         }
 
         // 取第一个启用的触发器的类型
-        OpEnergyStrategyTrigger firstTrigger = triggers.stream()
-            .filter(t -> t.getEnable() == 1)
-            .findFirst()
+        OpEnergyStrategyTrigger firstTrigger = triggers.stream().filter(t -> t.getEnable() == 1).findFirst()
             .orElse(null);
 
         if (firstTrigger != null) {
@@ -617,13 +637,19 @@ public class OpEnergyStrategyController extends BaseController {
      * 触发器类型字符串转整数
      */
     private Integer mapTriggerType(String triggerType) {
-        if (triggerType == null) return 3;
+        if (triggerType == null)
+            return 3;
         switch (triggerType) {
-            case "EVENT": return 1;
-            case "TIME": return 2;
-            case "ATTR": return 4;
-            case "POLLING": return 5;
-            default: return 3;
+            case "EVENT":
+                return 1;
+            case "TIME":
+                return 2;
+            case "ATTR":
+                return 4;
+            case "POLLING":
+                return 5;
+            default:
+                return 3;
         }
     }
 }

+ 262 - 180
ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/task/StrategyScheduler.java

@@ -1,25 +1,16 @@
-/*
- * 文 件 名:  StrategyScheduler
- * 版    权:  华设设计集团股份有限公司
- * 描    述:  <描述>
- * 修 改 人:  lvwenbin
- * 修改时间:  2025/12/9
- * 跟踪单号:  <跟踪单号>
- * 修改单号:  <修改单号>
- * 修改内容:  <修改内容>
- */
 package com.ruoyi.ems.task;
 
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.ruoyi.ems.domain.OpEnergyStrategy;
 import com.ruoyi.ems.domain.OpEnergyStrategyTrigger;
 import com.ruoyi.ems.service.IOpEnergyStrategyService;
 import com.ruoyi.ems.service.IOpEnergyStrategyTriggerService;
 import com.ruoyi.ems.strategy.PollingMonitorService;
 import com.ruoyi.ems.strategy.executor.StrategyExecutor;
-
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.ApplicationContext;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Component;
@@ -33,13 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 
 /**
- * 策略调度器
- * 支持的触发类型:
- * 1. 事件触发(1) - 监听设备事件
- * 2. 定时触发(2) - CRON定时执行
- * 3. 手动触发(3) - 用户手动执行
- * 4. 条件触发(4) - 属性变化触发
- * 5. 轮询触发(5) - 主动轮询监控
+ * 策略调度器(重构版)
+ *
+ * 核心改动:所有触发配置统一从触发器表读取
+ *
+ * 触发类型说明:
+ * - EVENT: 事件触发,由 StrategyTriggerListener 处理
+ * - TIME: 定时触发,从触发器的 condition_expr 读取 CRON
+ * - ATTR: 属性变化触发,由 StrategyTriggerListener 处理
+ * - POLLING: 轮询触发,由 PollingMonitorService 处理
+ *
+ * @author lvwenbin
  */
 @Slf4j
 @Component
@@ -52,46 +47,58 @@ public class StrategyScheduler {
     private IOpEnergyStrategyTriggerService triggerService;
 
     @Autowired
-    private StrategyExecutor strategyExecutor;
+    private TaskScheduler taskScheduler;
 
     @Autowired
-    private StrategyTriggerListener triggerListener;
+    private ApplicationContext applicationContext;
 
-    /**
-     * 轮询监控服务
-     */
-    @Autowired
+    @Autowired(required = false)
     private PollingMonitorService pollingMonitorService;
 
-    @Qualifier("strategyTaskScheduler")
-    @Autowired
-    private TaskScheduler taskScheduler;
+    @Autowired(required = false)
+    private StrategyTriggerListener triggerListener;
+
+    private StrategyExecutor strategyExecutor;
 
     /**
-     * 存储定时任务的Future
+     * 定时任务缓存
+     * key: strategyCode:triggerId
      */
     private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
 
     /**
-     * 存储已注册策略
+     * 已注册策略
      */
     private final Map<String, OpEnergyStrategy> registeredStrategies = new ConcurrentHashMap<>();
 
+    /**
+     * 统计计数
+     */
+    private int timeTriggerCount = 0;
+    private int eventTriggerCount = 0;
+    private int attrTriggerCount = 0;
+    private int pollingTriggerCount = 0;
+
+    private StrategyExecutor getStrategyExecutor() {
+        if (strategyExecutor == null) {
+            synchronized (this) {
+                if (strategyExecutor == null) {
+                    strategyExecutor = applicationContext.getBean(StrategyExecutor.class);
+                }
+            }
+        }
+        return strategyExecutor;
+    }
+
     @PostConstruct
     public void init() {
-        log.info("====== 策略调度器V2初始化开始 ======");
+        log.info("====== 策略调度器初始化(重构版) ======");
         loadEnabledStrategies();
-        log.info("====== 策略调度器V2初始化完成 ======");
-        log.info("  - 已注册策略: {}", registeredStrategies.size());
-        log.info("  - 定时任务: {}", scheduledTasks.size());
-        log.info("  - 属性触发器: {}", triggerListener.getAttrTriggerCount());
-        log.info("  - 事件触发器: {}", triggerListener.getEventTriggerCount());
-        log.info("  - 轮询任务: {}", pollingMonitorService.getPollingTaskCount());
     }
 
     @PreDestroy
     public void destroy() {
-        log.info("====== 策略调度器V2销毁 ======");
+        log.info("====== 策略调度器销毁 ======");
         scheduledTasks.values().forEach(future -> future.cancel(true));
         scheduledTasks.clear();
         registeredStrategies.clear();
@@ -101,213 +108,296 @@ public class StrategyScheduler {
      * 加载所有已启用的策略
      */
     public void loadEnabledStrategies() {
+        log.info("开始加载已启用的策略...");
+
+        // 清理现有任务
+        clearAllTasks();
+
+        // 查询所有已启用的策略
         OpEnergyStrategy query = new OpEnergyStrategy();
         query.setStrategyState(1);
         List<OpEnergyStrategy> strategies = strategyService.selectStrategyList(query);
 
-        log.info("加载到 {} 个已启用的策略", strategies.size());
+        log.info("共查询到 {} 个已启用的策略", strategies.size());
 
         for (OpEnergyStrategy strategy : strategies) {
             try {
                 registerStrategy(strategy);
+            } catch (Exception e) {
+                log.error("注册策略[{}]失败", strategy.getStrategyCode(), e);
             }
-            catch (Exception e) {
-                log.error("注册策略失败: {}", strategy.getStrategyCode(), e);
-            }
+        }
+
+        log.info("====== 策略加载完成 ======");
+        log.info("  定时触发: {} 个", timeTriggerCount);
+        log.info("  事件触发: {} 个", eventTriggerCount);
+        log.info("  属性触发: {} 个", attrTriggerCount);
+        log.info("  轮询触发: {} 个", pollingTriggerCount);
+    }
+
+    /**
+     * 清理所有任务
+     */
+    private void clearAllTasks() {
+        scheduledTasks.values().forEach(future -> future.cancel(true));
+        scheduledTasks.clear();
+        registeredStrategies.clear();
+        timeTriggerCount = 0;
+        eventTriggerCount = 0;
+        attrTriggerCount = 0;
+        pollingTriggerCount = 0;
+
+        // 清理触发监听器
+        if (triggerListener != null) {
+            triggerListener.clearAll();
         }
     }
 
     /**
      * 注册策略
+     * 核心改动:遍历触发器表,根据每个触发器的类型进行注册
      */
     public void registerStrategy(OpEnergyStrategy strategy) {
         String strategyCode = strategy.getStrategyCode();
-        Integer triggerType = strategy.getTriggerType();
+        log.info("注册策略[{}]: {}", strategyCode, strategy.getStrategyName());
 
-        log.info("注册策略: code={}, name={}, triggerType={}",
-            strategyCode, strategy.getStrategyName(), triggerType);
+        registeredStrategies.put(strategyCode, strategy);
 
-        // 先取消已有的注册
-        unregisterStrategy(strategyCode);
+        // 查询该策略的所有触发器
+        List<OpEnergyStrategyTrigger> triggers = triggerService.selectByStrategyCode(strategyCode);
 
-        switch (triggerType) {
-            case 1: // 事件触发
-                registerEventTrigger(strategy);
-                break;
+        if (triggers == null || triggers.isEmpty()) {
+            log.warn("策略[{}]没有配置触发器", strategyCode);
+            return;
+        }
+
+        // 遍历触发器,根据类型进行注册
+        for (OpEnergyStrategyTrigger trigger : triggers) {
+            if (trigger.getEnable() != 1) {
+                log.debug("触发器[{}]已禁用,跳过", trigger.getTriggerName());
+                continue;
+            }
+
+            registerTrigger(strategyCode, trigger);
+        }
+    }
+
+    /**
+     * 注册单个触发器
+     */
+    private void registerTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        String triggerType = trigger.getTriggerType();
+        String triggerName = trigger.getTriggerName();
+
+        log.debug("注册触发器[{}]: type={}, name={}", strategyCode, triggerType, triggerName);
 
-            case 2: // 定时触发
-                registerTimeTrigger(strategy);
+        switch (triggerType) {
+            case "TIME":
+                registerTimeTrigger(strategyCode, trigger);
                 break;
 
-            case 3: // 手动触发
-                log.info("策略[{}]为手动触发类型,不自动执行", strategyCode);
+            case "EVENT":
+                registerEventTrigger(strategyCode, trigger);
                 break;
 
-            case 4: // 条件触发(属性变化)
-                // 同时注册被动触发和轮询触发
-                registerConditionTrigger(strategy);
-                registerPollingTrigger(strategy);
+            case "ATTR":
+                registerAttrTrigger(strategyCode, trigger);
                 break;
 
-            case 5: // 轮询触发
-                registerPollingTrigger(strategy);
+            case "POLLING":
+                registerPollingTrigger(strategyCode, trigger);
                 break;
 
             default:
-                log.warn("未知的触发类型: {}", triggerType);
+                log.warn("未知的触发类型: {}", triggerType);
         }
-
-        registeredStrategies.put(strategyCode, strategy);
     }
 
     /**
-     * 注销策略
+     * 注册定时触发器
+     * 从 condition_expr 中读取 CRON 表达式
      */
-    public void unregisterStrategy(String strategyCode) {
-        // 取消定时任务
-        ScheduledFuture<?> future = scheduledTasks.remove(strategyCode);
-        if (future != null) {
-            future.cancel(true);
-            log.info("已取消策略[{}]的定时任务", strategyCode);
+    private void registerTimeTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        String cronExpression = parseCronFromCondition(trigger.getConditionExpr());
+
+        if (cronExpression == null || cronExpression.isEmpty()) {
+            log.warn("触发器[{}:{}]未配置CRON表达式", strategyCode, trigger.getTriggerName());
+            return;
         }
 
-        // 从触发监听器中移除
-        triggerListener.unregisterStrategy(strategyCode);
+        // 验证 CRON 表达式
+        try {
+            new CronTrigger(cronExpression);
+        } catch (IllegalArgumentException e) {
+            log.error("触发器[{}:{}]的CRON表达式[{}]格式错误",
+                strategyCode, trigger.getTriggerName(), cronExpression, e);
+            return;
+        }
 
-        // 从轮询监控服务中移除
-        pollingMonitorService.unregisterPollingStrategy(strategyCode);
+        // 构建任务key
+        String taskKey = strategyCode + ":TIME:" + trigger.getId();
 
-        registeredStrategies.remove(strategyCode);
+        // 取消已有任务
+        cancelTask(taskKey);
+
+        // 创建定时任务
+        final Long triggerId = trigger.getId();
+        final String triggerName = trigger.getTriggerName();
+
+        ScheduledFuture<?> future = taskScheduler.schedule(
+            () -> executeTimeTrigger(strategyCode, triggerId, triggerName),
+            new CronTrigger(cronExpression)
+        );
+
+        scheduledTasks.put(taskKey, future);
+        timeTriggerCount++;
+
+        log.info("✓ 定时触发器注册成功: strategy={}, trigger={}, cron={}",
+            strategyCode, triggerName, cronExpression);
     }
 
     /**
-     * 注册事件触发策略
+     * 从 condition_expr 解析 CRON 表达式
+     * 支持格式: {"cron":"0 0 8 * * ?"}
      */
-    private void registerEventTrigger(OpEnergyStrategy strategy) {
-        String strategyCode = strategy.getStrategyCode();
-        List<OpEnergyStrategyTrigger> triggers = triggerService.selectByStrategyCode(strategyCode);
+    private String parseCronFromCondition(String conditionExpr) {
+        if (conditionExpr == null || conditionExpr.isEmpty()) {
+            return null;
+        }
 
-        for (OpEnergyStrategyTrigger trigger : triggers) {
-            if (trigger.getEnable() != 1) continue;
-
-            if ("EVENT".equals(trigger.getTriggerType())) {
-                triggerListener.registerEventTrigger(
-                    strategyCode,
-                    trigger.getSourceObjCode(),
-                    trigger.getEventKey(),
-                    trigger.getConditionExpr()
-                );
-                log.info("策略[{}]注册事件触发: obj={}, event={}",
-                    strategyCode, trigger.getSourceObjCode(), trigger.getEventKey());
-            }
+        try {
+            JSONObject condition = JSON.parseObject(conditionExpr);
+            return condition.getString("cron");
+        } catch (Exception e) {
+            log.warn("解析CRON表达式失败: {}", conditionExpr);
+            return null;
         }
     }
 
     /**
-     * 注册定时触发策略
+     * 执行定时触发
      */
-    private void registerTimeTrigger(OpEnergyStrategy strategy) {
-        String strategyCode = strategy.getStrategyCode();
-        String cronExpr = strategy.getExecRule();
-
-        if (cronExpr == null || cronExpr.trim().isEmpty()) {
-            log.warn("策略[{}]的CRON表达式为空,跳过注册", strategyCode);
-            return;
-        }
+    private void executeTimeTrigger(String strategyCode, Long triggerId, String triggerName) {
+        log.info(">>> 定时触发执行: strategy={}, trigger={}", strategyCode, triggerName);
 
         try {
-            ScheduledFuture<?> future = taskScheduler.schedule(
-                () -> executeStrategyAsync(strategyCode, "TIME", "SCHEDULER"),
-                new CronTrigger(cronExpr)
-            );
+            Map<String, Object> params = new HashMap<>();
+            params.put("trigger_type", "TIME");
+            params.put("trigger_source", "SCHEDULER:" + triggerName);
+            params.put("trigger_id", triggerId);
+            params.put("exec_by", "SYSTEM_SCHEDULER");
 
-            scheduledTasks.put(strategyCode, future);
-            log.info("策略[{}]注册定时触发: cron={}", strategyCode, cronExpr);
+            getStrategyExecutor().executeStrategy(strategyCode, params);
 
         } catch (Exception e) {
-            log.error("策略[{}]注册定时触发失败: cron={}", strategyCode, cronExpr, e);
+            log.error("定时触发策略[{}]执行失败", strategyCode, e);
         }
     }
 
     /**
-     * 注册条件触发策略(属性变化触发 - 被动模式)
+     * 注册事件触发器
      */
-    private void registerConditionTrigger(OpEnergyStrategy strategy) {
-        String strategyCode = strategy.getStrategyCode();
-        List<OpEnergyStrategyTrigger> triggers = triggerService.selectByStrategyCode(strategyCode);
+    private void registerEventTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        if (triggerListener == null) {
+            log.warn("TriggerListener未注入,无法注册事件触发器");
+            return;
+        }
 
-        for (OpEnergyStrategyTrigger trigger : triggers) {
-            if (trigger.getEnable() != 1) continue;
-
-            if ("ATTR".equals(trigger.getTriggerType())) {
-                triggerListener.registerAttrTrigger(
-                    strategyCode,
-                    trigger.getSourceObjCode(),
-                    trigger.getAttrKey(),
-                    trigger.getConditionExpr()
-                );
-                log.info("策略[{}]注册属性触发(被动): obj={}, attr={}, condition={}",
-                    strategyCode, trigger.getSourceObjCode(), trigger.getAttrKey(),
-                    trigger.getConditionExpr());
-            }
+        triggerListener.registerEventTrigger(strategyCode, trigger);
+        eventTriggerCount++;
+
+        log.info("✓ 事件触发器注册成功: strategy={}, device={}, event={}",
+            strategyCode, trigger.getSourceObjCode(), trigger.getEventKey());
+    }
+
+    /**
+     * 注册属性变化触发器
+     */
+    private void registerAttrTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        if (triggerListener == null) {
+            log.warn("TriggerListener未注入,无法注册属性触发器");
+            return;
         }
+
+        triggerListener.registerAttrTrigger(strategyCode, trigger);
+        attrTriggerCount++;
+
+        log.info("✓ 属性触发器注册成功: strategy={}, device={}, attr={}",
+            strategyCode, trigger.getSourceObjCode(), trigger.getAttrKey());
     }
 
     /**
-     * 注册轮询触发策略(主动轮询模式)
+     * 注册轮询触发
      */
-    private void registerPollingTrigger(OpEnergyStrategy strategy) {
-        pollingMonitorService.registerPollingStrategy(strategy);
+    private void registerPollingTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        if (pollingMonitorService == null) {
+            log.warn("PollingMonitorService未注入,无法注册轮询触发器");
+            return;
+        }
+
+        // 轮询服务需要策略对象
+        OpEnergyStrategy strategy = registeredStrategies.get(strategyCode);
+        if (strategy != null) {
+            pollingMonitorService.registerPollingStrategy(strategy);
+            pollingTriggerCount++;
+
+            log.info("✓ 轮询触发器注册成功: strategy={}, device={}, attr={}",
+                strategyCode, trigger.getSourceObjCode(), trigger.getAttrKey());
+        }
     }
 
     /**
-     * 异步执行策略
+     * 注销策略
      */
-    private void executeStrategyAsync(String strategyCode, String triggerType, String triggerSource) {
-        try {
-            Map<String, Object> params = new HashMap<>();
-            params.put("trigger_type", triggerType);
-            params.put("trigger_source", triggerSource);
-
-            // 根据触发类型设置执行人
-            switch (triggerType) {
-                case "TIME":
-                    params.put("exec_by", "SYSTEM_SCHEDULER");
-                    break;
-                case "EVENT":
-                    params.put("exec_by", "SYSTEM_EVENT");
-                    break;
-                case "ATTR":
-                    params.put("exec_by", "SYSTEM_ATTR_CHANGE");
-                    break;
-                case "POLLING":
-                    params.put("exec_by", "SYSTEM_POLLING");
-                    break;
-                default:
-                    params.put("exec_by", "SYSTEM");
+    public void unregisterStrategy(String strategyCode) {
+        log.info("注销策略[{}]", strategyCode);
+
+        // 取消所有相关定时任务
+        scheduledTasks.entrySet().removeIf(entry -> {
+            if (entry.getKey().startsWith(strategyCode + ":")) {
+                entry.getValue().cancel(true);
+                return true;
             }
+            return false;
+        });
 
-            strategyExecutor.executeStrategy(strategyCode, params);
-        } catch (Exception e) {
-            log.error("自动执行策略失败: {}", strategyCode, e);
+        // 移除策略信息
+        registeredStrategies.remove(strategyCode);
+
+        // 注销轮询监控
+        if (pollingMonitorService != null) {
+            pollingMonitorService.unregisterPollingStrategy(strategyCode);
+        }
+
+        // 注销事件/属性触发
+        if (triggerListener != null) {
+            triggerListener.unregisterTriggers(strategyCode);
         }
     }
 
     /**
-     * 刷新策略
+     * 刷新策略配置
      */
     public void refreshStrategy(String strategyCode) {
-        OpEnergyStrategy strategy = strategyService.selectStrategyByCode(strategyCode);
+        log.info("刷新策略[{}]", strategyCode);
 
-        if (strategy == null) {
-            unregisterStrategy(strategyCode);
-            return;
-        }
+        // 先注销
+        unregisterStrategy(strategyCode);
 
-        if (strategy.getStrategyState() == 1) {
+        // 重新查询并注册
+        OpEnergyStrategy strategy = strategyService.selectStrategyByCode(strategyCode);
+        if (strategy != null && strategy.getStrategyState() == 1) {
             registerStrategy(strategy);
-        } else {
-            unregisterStrategy(strategyCode);
+        }
+    }
+
+    /**
+     * 取消指定任务
+     */
+    private void cancelTask(String taskKey) {
+        ScheduledFuture<?> future = scheduledTasks.remove(taskKey);
+        if (future != null) {
+            future.cancel(true);
         }
     }
 
@@ -322,25 +412,17 @@ public class StrategyScheduler {
     }
 
     public int getAttrTriggerCount() {
-        return triggerListener.getAttrTriggerCount();
+        return attrTriggerCount;
     }
 
-    public int getPollingTaskCount() {
-        return pollingMonitorService.getPollingTaskCount();
-    }
-
-    /**
-     * 获取完整的调度器状态
-     */
-    public Map<String, Object> getFullStatus() {
+    public Map<String, Object> getStatus() {
         Map<String, Object> status = new HashMap<>();
         status.put("registeredStrategies", registeredStrategies.size());
-        status.put("scheduledTasks", scheduledTasks.size());
-        status.put("attrTriggers", triggerListener.getAttrTriggerCount());
-        status.put("eventTriggers", triggerListener.getEventTriggerCount());
-        status.put("pollingTasks", pollingMonitorService.getPollingTaskCount());
-        status.put("triggers", triggerListener.getRegisteredTriggers());
-        status.put("polling", pollingMonitorService.getPollingStatus());
+        status.put("timeTriggers", timeTriggerCount);
+        status.put("eventTriggers", eventTriggerCount);
+        status.put("attrTriggers", attrTriggerCount);
+        status.put("pollingTriggers", pollingTriggerCount);
+        status.put("scheduledTasks", scheduledTasks.keySet());
         return status;
     }
 }

+ 175 - 365
ems/ems-cloud/ems-server/src/main/java/com/ruoyi/ems/task/StrategyTriggerListener.java

@@ -1,17 +1,10 @@
-/*
- * 文 件 名:  StrategyTriggerListener
- * 版    权:  华设设计集团股份有限公司
- * 描    述:  <描述>
- * 修 改 人:  lvwenbin
- * 修改时间:  2025/12/9
- * 跟踪单号:  <跟踪单号>
- * 修改单号:  <修改单号>
- * 修改内容:  <修改内容>
- */
 package com.ruoyi.ems.task;
 
+import com.ruoyi.ems.domain.OpEnergyStrategyTrigger;
+import com.ruoyi.ems.service.IOpEnergyStrategyTriggerService;
 import com.ruoyi.ems.strategy.evaluator.ConditionEvaluator;
 import com.ruoyi.ems.strategy.executor.StrategyExecutor;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationContext;
@@ -22,57 +15,45 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
- * 策略触发监听器
- * 由 Controller 的 onAttrValueChanged 接口调用
+ * 策略触发监听器 (事件 / 属性)
+ *
+ * 针对 StrategyScheduler 的新版 registerTrigger() 逻辑进行了修复:
+ * - registerEventTrigger()
+ * - registerAttrTrigger()
+ * - unregisterTriggers()
+ * - clearAll()
+ * - 分发入口 onEventReceived() / onAttrChanged()
+ *
+ * @author
  */
 @Slf4j
 @Component
 public class StrategyTriggerListener {
 
     @Autowired
-    private ApplicationContext applicationContext;
+    private IOpEnergyStrategyTriggerService triggerService;
 
     @Autowired
     private ConditionEvaluator conditionEvaluator;
 
-    /**
-     * StrategyExecutor 延迟加载,避免循环依赖
-     */
+    @Autowired
+    private ApplicationContext applicationContext;
+
     private StrategyExecutor strategyExecutor;
 
-    /**
-     * 属性触发器注册表
-     * key: objCode + ":" + attrKey
-     * value: 触发器配置列表
-     */
-    private final Map<String, List<AttrTriggerConfig>> attrTriggers = new ConcurrentHashMap<>();
+    /** key = objCode:eventKey  */
+    private final Map<String, List<TriggerInfo>> eventTriggers = new ConcurrentHashMap<>();
 
-    /**
-     * 事件触发器注册表
-     * key: objCode + ":" + eventKey
-     * value: 触发器配置列表
-     */
-    private final Map<String, List<EventTriggerConfig>> eventTriggers = new ConcurrentHashMap<>();
+    /** key = objCode:attrKey */
+    private final Map<String, List<TriggerInfo>> attrTriggers = new ConcurrentHashMap<>();
 
-    /**
-     * 策略代码到触发器Key的映射(用于注销时快速查找)
-     * key: strategyCode
-     * value: 触发器Key列表 (格式: "ATTR:objCode:attrKey" 或 "EVENT:objCode:eventKey")
-     */
-    private final Map<String, List<String>> strategyTriggerKeys = new ConcurrentHashMap<>();
+    /** 缓存触发器 → 用于注销 */
+    private final Map<String, List<TriggerInfo>> strategyTriggerIndex = new ConcurrentHashMap<>();
 
-    /**
-     * 异步执行线程池
-     */
-    private final ExecutorService asyncExecutor = Executors.newFixedThreadPool(5);
 
-    /**
-     * 获取 StrategyExecutor(延迟加载,避免循环依赖)
-     */
+    // 延迟加载
     private StrategyExecutor getStrategyExecutor() {
         if (strategyExecutor == null) {
             synchronized (this) {
@@ -84,399 +65,228 @@ public class StrategyTriggerListener {
         return strategyExecutor;
     }
 
-    // ==================== 触发器注册方法 ====================
+    /* ============================================================
+     * 注册触发器
+     * ============================================================ */
 
-    /**
-     * 注册属性变化触发器
-     *
-     * @param strategyCode  策略代码
-     * @param objCode       设备代码
-     * @param attrKey       属性键
-     * @param conditionExpr 条件表达式(JSON格式)
-     */
-    public void registerAttrTrigger(String strategyCode, String objCode, String attrKey, String conditionExpr) {
-        String triggerKey = objCode + ":" + attrKey;
-
-        // 创建触发器配置
-        AttrTriggerConfig config = new AttrTriggerConfig();
-        config.strategyCode = strategyCode;
-        config.objCode = objCode;
-        config.attrKey = attrKey;
-        config.conditionExpr = conditionExpr;
-
-        // 添加到触发器列表
-        List<AttrTriggerConfig> configList = attrTriggers.computeIfAbsent(triggerKey, k -> new ArrayList<>());
-
-        // 避免重复注册
-        boolean exists = configList.stream().anyMatch(c -> c.strategyCode.equals(strategyCode));
-        if (!exists) {
-            configList.add(config);
-        }
+    /** 单个事件触发器 */
+    public void registerEventTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        String key = trigger.getSourceObjCode() + ":" + trigger.getEventKey();
 
-        // 记录策略到触发器的映射(用于注销)
-        List<String> keys = strategyTriggerKeys.computeIfAbsent(strategyCode, k -> new ArrayList<>());
-        String mappingKey = "ATTR:" + triggerKey;
-        if (!keys.contains(mappingKey)) {
-            keys.add(mappingKey);
-        }
-
-        log.debug("注册属性触发器: triggerKey={}, strategy={}, condition={}", triggerKey, strategyCode, conditionExpr);
-    }
+        TriggerInfo info = new TriggerInfo(strategyCode, trigger);
 
-    /**
-     * 注册事件触发器
-     *
-     * @param strategyCode  策略代码
-     * @param objCode       设备代码
-     * @param eventKey      事件键
-     * @param conditionExpr 条件表达式(JSON格式)
-     */
-    public void registerEventTrigger(String strategyCode, String objCode, String eventKey, String conditionExpr) {
-        String triggerKey = objCode + ":" + eventKey;
-
-        // 创建触发器配置
-        EventTriggerConfig config = new EventTriggerConfig();
-        config.strategyCode = strategyCode;
-        config.objCode = objCode;
-        config.eventKey = eventKey;
-        config.conditionExpr = conditionExpr;
-
-        // 添加到触发器列表
-        List<EventTriggerConfig> configList = eventTriggers.computeIfAbsent(triggerKey, k -> new ArrayList<>());
-
-        // 避免重复注册
-        boolean exists = configList.stream().anyMatch(c -> c.strategyCode.equals(strategyCode));
-        if (!exists) {
-            configList.add(config);
-        }
+        eventTriggers.computeIfAbsent(key, k -> new ArrayList<>()).add(info);
 
-        // 记录策略到触发器的映射
-        List<String> keys = strategyTriggerKeys.computeIfAbsent(strategyCode, k -> new ArrayList<>());
-        String mappingKey = "EVENT:" + triggerKey;
-        if (!keys.contains(mappingKey)) {
-            keys.add(mappingKey);
-        }
+        // 建立反向索引,注销时用
+        strategyTriggerIndex.computeIfAbsent(strategyCode, k -> new ArrayList<>()).add(info);
 
-        log.debug("注册事件触发器: triggerKey={}, strategy={}", triggerKey, strategyCode);
+        log.info("事件触发器已注册: key={}, strategy={}", key, strategyCode);
     }
 
-    /**
-     * 注销策略的所有触发器
-     *
-     * @param strategyCode 策略代码
-     */
-    public void unregisterStrategy(String strategyCode) {
-        List<String> keys = strategyTriggerKeys.remove(strategyCode);
-        if (keys == null || keys.isEmpty()) {
-            log.debug("策略[{}]没有注册的触发器", strategyCode);
-            return;
-        }
+    /** 单个属性触发器 */
+    public void registerAttrTrigger(String strategyCode, OpEnergyStrategyTrigger trigger) {
+        String key = trigger.getSourceObjCode() + ":" + trigger.getAttrKey();
 
-        for (String key : keys) {
-            if (key.startsWith("ATTR:")) {
-                // 属性触发器
-                String triggerKey = key.substring(5); // 去掉 "ATTR:" 前缀
-                List<AttrTriggerConfig> configList = attrTriggers.get(triggerKey);
-                if (configList != null) {
-                    configList.removeIf(c -> strategyCode.equals(c.strategyCode));
-                    if (configList.isEmpty()) {
-                        attrTriggers.remove(triggerKey);
-                    }
-                }
-            }
-            else if (key.startsWith("EVENT:")) {
-                // 事件触发器
-                String triggerKey = key.substring(6); // 去掉 "EVENT:" 前缀
-                List<EventTriggerConfig> configList = eventTriggers.get(triggerKey);
-                if (configList != null) {
-                    configList.removeIf(c -> strategyCode.equals(c.strategyCode));
-                    if (configList.isEmpty()) {
-                        eventTriggers.remove(triggerKey);
-                    }
-                }
-            }
-        }
+        TriggerInfo info = new TriggerInfo(strategyCode, trigger);
 
-        log.debug("注销策略[{}]的所有触发器,共{}个", strategyCode, keys.size());
-    }
+        attrTriggers.computeIfAbsent(key, k -> new ArrayList<>()).add(info);
+        strategyTriggerIndex.computeIfAbsent(strategyCode, k -> new ArrayList<>()).add(info);
 
-    // ==================== 触发器处理方法 ====================
+        log.info("属性触发器已注册: key={}, strategy={}", key, strategyCode);
+    }
 
-    /**
-     * 处理属性变化事件
-     * 由 Controller 的 /onAttrValueChanged 接口调用
-     *
-     * @param objCode  设备代码
-     * @param attrKey  属性键
-     * @param oldValue 旧值(可为null)
-     * @param newValue 新值
-     * @return 触发的策略数量
-     */
-    public int handleAttrChange(String objCode, String attrKey, Object oldValue, Object newValue) {
-        String triggerKey = objCode + ":" + attrKey;
-        List<AttrTriggerConfig> configList = attrTriggers.get(triggerKey);
 
-        if (configList == null || configList.isEmpty()) {
-            log.debug("无匹配的属性触发器: {}", triggerKey);
-            return 0;
-        }
+    /* ============================================================
+     * 注销 & 清空
+     * ============================================================ */
 
-        log.info("属性变化触发检查: obj={}, attr={}, {} -> {}", objCode, attrKey, oldValue, newValue);
+    /** 注销某个策略的所有事件/属性触发器 */
+    public void unregisterTriggers(String strategyCode) {
+        List<TriggerInfo> list = strategyTriggerIndex.remove(strategyCode);
+        if (list == null || list.isEmpty()) return;
 
-        int triggeredCount = 0;
+        for (TriggerInfo info : list) {
+            OpEnergyStrategyTrigger t = info.getTrigger();
+            String key;
 
-        // 遍历所有匹配的触发器
-        for (AttrTriggerConfig config : configList) {
-            try {
-                boolean triggered = checkAndExecuteAttrTrigger(config, oldValue, newValue);
-                if (triggered) {
-                    triggeredCount++;
-                }
-            }
-            catch (Exception e) {
-                log.error("属性触发器执行异常: strategy={}", config.strategyCode, e);
+            if ("EVENT".equals(t.getTriggerType())) {
+                key = t.getSourceObjCode() + ":" + t.getEventKey();
+                removeTrigger(eventTriggers, key, info);
+            } else if ("ATTR".equals(t.getTriggerType())) {
+                key = t.getSourceObjCode() + ":" + t.getAttrKey();
+                removeTrigger(attrTriggers, key, info);
             }
         }
 
-        return triggeredCount;
+        log.info("触发器已注销: strategy={}", strategyCode);
     }
 
-    /**
-     * 处理设备事件
-     *
-     * @param objCode   设备代码
-     * @param eventKey  事件键
-     * @param eventData 事件数据
-     * @return 触发的策略数量
-     */
-    public int handleDeviceEvent(String objCode, String eventKey, Map<String, Object> eventData) {
-        String triggerKey = objCode + ":" + eventKey;
-        List<EventTriggerConfig> configList = eventTriggers.get(triggerKey);
-
-        if (configList == null || configList.isEmpty()) {
-            log.debug("无匹配的事件触发器: {}", triggerKey);
-            return 0;
-        }
-
-        log.info("设备事件触发检查: obj={}, event={}", objCode, eventKey);
-
-        int triggeredCount = 0;
-
-        for (EventTriggerConfig config : configList) {
-            try {
-                boolean triggered = checkAndExecuteEventTrigger(config, eventData);
-                if (triggered) {
-                    triggeredCount++;
-                }
-            }
-            catch (Exception e) {
-                log.error("事件触发器执行异常: strategy={}", config.strategyCode, e);
-            }
-        }
-
-        return triggeredCount;
+    /** 完全清空所有触发器 */
+    public void clearAll() {
+        eventTriggers.clear();
+        attrTriggers.clear();
+        strategyTriggerIndex.clear();
+        log.info("TriggerListener: 所有触发器已清空");
     }
 
-    /**
-     * 检查属性触发条件并执行策略
-     * 修复:完善触发信息传递
-     *
-     * @return 是否触发执行
-     */
-    private boolean checkAndExecuteAttrTrigger(AttrTriggerConfig config, Object oldValue, Object newValue) {
-        // 构建条件评估上下文
-        Map<String, Object> context = new HashMap<>();
-        context.put("oldValue", oldValue);
-        context.put("newValue", newValue);
-        context.put(config.attrKey, newValue);
-
-        // 评估条件
-        boolean shouldTrigger = true;
-        if (config.conditionExpr != null && !config.conditionExpr.trim().isEmpty()) {
-            shouldTrigger = conditionEvaluator.evaluate(config.conditionExpr, context);
+    private void removeTrigger(Map<String, List<TriggerInfo>> map, String key, TriggerInfo info) {
+        List<TriggerInfo> list = map.get(key);
+        if (list != null) {
+            list.remove(info);
+            if (list.isEmpty()) map.remove(key);
         }
+    }
 
-        if (shouldTrigger) {
-            log.info("属性触发条件满足,执行策略: strategy={}, obj={}.{}, value={}",
-                config.strategyCode, config.objCode, config.attrKey, newValue);
-
-            // 异步执行策略
-            asyncExecutor.submit(() -> {
-                try {
-                    Map<String, Object> params = new HashMap<>();
 
-                    // 触发类型和触发源
-                    params.put("trigger_type", "ATTR");
-                    params.put("trigger_source", config.objCode + "." + config.attrKey);
+    /* ============================================================
+     * 触发入口(外部系统调用)
+     * ============================================================ */
 
-                    // 执行人(属性变化为系统自动触发)
-                    params.put("exec_by", "SYSTEM_ATTR_CHANGE");
+    /** 外部系统事件上报 → 事件触发 */
+    public void onEventReceived(String objCode, String eventKey, Map<String, Object> eventData) {
+        String key = objCode + ":" + eventKey;
 
-                    // 属性变化信息
-                    params.put("old_value", oldValue);
-                    params.put("new_value", newValue);
-                    params.put("device_code", config.objCode);
-                    params.put("attr_key", config.attrKey);
+        List<TriggerInfo> list = eventTriggers.get(key);
+        if (list == null || list.isEmpty()) return;
 
-                    // 方便上下文变量访问
-                    params.put(config.attrKey, newValue);
-                    params.put("current_" + config.attrKey, newValue);
+        for (TriggerInfo info : list) {
+            OpEnergyStrategyTrigger trigger = info.getTrigger();
 
-                    getStrategyExecutor().executeStrategy(config.strategyCode, params);
-                } catch (Exception e) {
-                    log.error("策略异步执行失败: strategy={}", config.strategyCode, e);
-                }
-            });
+            // condition_expr 判断
+            if (!checkCondition(trigger, eventData)) continue;
 
-            return true;
-        } else {
-            log.debug("属性触发条件不满足: strategy={}, condition={}", config.strategyCode, config.conditionExpr);
-            return false;
+            executeTrigger(info, "EVENT", eventData, "EVENT:" + eventKey);
         }
     }
 
-    /**
-     * 检查事件触发条件并执行策略
-     * 修复:完善触发信息传递
-     *
-     * @return 是否触发执行
-     */
-    private boolean checkAndExecuteEventTrigger(EventTriggerConfig config, Map<String, Object> eventData) {
-        // 评估条件
-        boolean shouldTrigger = true;
-        if (config.conditionExpr != null && !config.conditionExpr.trim().isEmpty()) {
-            Map<String, Object> context = eventData != null ? eventData : new HashMap<>();
-            shouldTrigger = conditionEvaluator.evaluate(config.conditionExpr, context);
-        }
+    /** 外部系统属性变化上报 → 属性触发 */
+    public void onAttrChanged(String objCode, String attrKey, Object newVal) {
+        String key = objCode + ":" + attrKey;
 
-        if (shouldTrigger) {
-            log.info("事件触发条件满足,执行策略: strategy={}, obj={}.{}",
-                config.strategyCode, config.objCode, config.eventKey);
+        List<TriggerInfo> list = attrTriggers.get(key);
+        if (list == null || list.isEmpty()) return;
 
-            // 异步执行策略
-            asyncExecutor.submit(() -> {
-                try {
-                    Map<String, Object> params = new HashMap<>();
-                    if (eventData != null) {
-                        params.putAll(eventData);
-                    }
+        Map<String, Object> data = new HashMap<>();
+        data.put(attrKey, newVal);
 
-                    // 触发类型和触发源
-                    params.put("trigger_type", "EVENT");
-                    params.put("trigger_source", config.objCode + "." + config.eventKey);
+        for (TriggerInfo info : list) {
+            OpEnergyStrategyTrigger trigger = info.getTrigger();
 
-                    // 执行人(事件触发为系统自动执行)
-                    params.put("exec_by", "SYSTEM_EVENT");
+            if (!checkCondition(trigger, data)) continue;
 
-                    // 事件信息
-                    params.put("device_code", config.objCode);
-                    params.put("event_key", config.eventKey);
+            executeTrigger(info, "ATTR", data, "ATTR:" + attrKey);
+        }
+    }
 
-                    getStrategyExecutor().executeStrategy(config.strategyCode, params);
-                } catch (Exception e) {
-                    log.error("策略异步执行失败: strategy={}", config.strategyCode, e);
-                }
-            });
+    /* ============================================================
+     * 公共方法
+     * ============================================================ */
 
+    private boolean checkCondition(OpEnergyStrategyTrigger trigger, Map<String, Object> data) {
+        if (trigger.getConditionExpr() == null || trigger.getConditionExpr().isEmpty()) {
             return true;
-        } else {
-            log.debug("事件触发条件不满足: strategy={}, condition={}", config.strategyCode, config.conditionExpr);
+        }
+
+        try {
+            return conditionEvaluator.evaluate(trigger.getConditionExpr(), data);
+        } catch (Exception e) {
+            log.error("condition_expr 解析失败: {}", trigger.getConditionExpr(), e);
             return false;
         }
     }
 
-    // ==================== 统计和调试方法 ====================
+    /** 执行策略 */
+    private void executeTrigger(TriggerInfo info, String type, Map<String, Object> data, String source) {
+        String strategyCode = info.getStrategyCode();
 
-    /**
-     * 获取已注册的属性触发器数量
-     */
-    public int getAttrTriggerCount() {
-        int count = 0;
-        for (List<AttrTriggerConfig> list : attrTriggers.values()) {
-            count += list.size();
-        }
-        return count;
-    }
+        log.info(">>> 触发策略: strategy={}, type={}, source={}", strategyCode, type, source);
 
-    /**
-     * 获取已注册的事件触发器数量
-     */
-    public int getEventTriggerCount() {
-        int count = 0;
-        for (List<EventTriggerConfig> list : eventTriggers.values()) {
-            count += list.size();
+        Map<String, Object> params = new HashMap<>();
+        params.put("trigger_type", type);
+        params.put("trigger_source", source);
+        params.put("trigger_id", info.getTrigger().getId());
+        params.put("event_data", data);
+
+        try {
+            getStrategyExecutor().executeStrategy(strategyCode, params);
+        } catch (Exception e) {
+            log.error("触发策略失败: {}", strategyCode, e);
         }
-        return count;
     }
 
+    /* ============================================================
+     * Controller 兼容方法(你的控制器中正在使用)
+     * ============================================================ */
+
     /**
-     * 获取所有已注册的触发器信息(用于调试)
+     * 属性变化入口(兼容 Controller
      */
-    public Map<String, Object> getRegisteredTriggers() {
-        Map<String, Object> result = new HashMap<>();
+    public int handleAttrChange(String objCode, String attrKey, Object oldValue, Object newValue) {
+        String key = objCode + ":" + attrKey;
 
-        // 属性触发器列表
-        List<String> attrTriggerList = new ArrayList<>(attrTriggers.keySet());
-        result.put("attrTriggers", attrTriggerList);
+        List<TriggerInfo> list = attrTriggers.get(key);
+        if (list == null || list.isEmpty()) return 0;
 
-        // 事件触发器列表
-        List<String> eventTriggerList = new ArrayList<>(eventTriggers.keySet());
-        result.put("eventTriggers", eventTriggerList);
+        Map<String, Object> eventData = new HashMap<>();
+        eventData.put("oldValue", oldValue);
+        eventData.put("newValue", newValue);
 
-        // 数量统计
-        result.put("attrTriggerCount", getAttrTriggerCount());
-        result.put("eventTriggerCount", getEventTriggerCount());
+        int count = 0;
+        for (TriggerInfo info : list) {
+            OpEnergyStrategyTrigger trigger = info.getTrigger();
 
-        // 策略与触发器的映射关系
-        Map<String, List<String>> strategyMappings = new HashMap<>(strategyTriggerKeys);
-        result.put("strategyMappings", strategyMappings);
+            // 条件判断
+            if (!checkCondition(trigger, eventData)) continue;
 
-        return result;
-    }
+            executeTrigger(info, "ATTR", eventData, "ATTR:" + attrKey);
+            count++;
+        }
 
-    /**
-     * 检查某个属性是否有触发器监听
-     */
-    public boolean hasAttrTrigger(String objCode, String attrKey) {
-        String triggerKey = objCode + ":" + attrKey;
-        List<AttrTriggerConfig> list = attrTriggers.get(triggerKey);
-        return list != null && !list.isEmpty();
+        return count;
     }
 
     /**
-     * 检查某个事件是否有触发器监听
+     * 设备事件入口(兼容 Controller)
      */
-    public boolean hasEventTrigger(String objCode, String eventKey) {
-        String triggerKey = objCode + ":" + eventKey;
-        List<EventTriggerConfig> list = eventTriggers.get(triggerKey);
-        return list != null && !list.isEmpty();
-    }
-
-    // ==================== 内部配置类 ====================
+    public int handleDeviceEvent(String objCode, String eventKey, Map<String, Object> eventData) {
+        String key = objCode + ":" + eventKey;
+        List<TriggerInfo> list = eventTriggers.get(key);
+        if (list == null || list.isEmpty()) return 0;
 
-    /**
-     * 属性触发器配置
-     */
-    private static class AttrTriggerConfig {
-        String strategyCode;   // 策略代码
+        int count = 0;
+        for (TriggerInfo info : list) {
+            OpEnergyStrategyTrigger trigger = info.getTrigger();
 
-        String objCode;        // 设备代码
+            if (!checkCondition(trigger, eventData)) continue;
 
-        String attrKey;        // 属性键
+            executeTrigger(info, "EVENT", eventData, "EVENT:" + eventKey);
+            count++;
+        }
 
-        String conditionExpr;  // 条件表达式
+        return count;
     }
 
     /**
-     * 事件触发器配置
+     * 获取已注册触发器(用于 /scheduler/status 接口)
      */
-    private static class EventTriggerConfig {
-        String strategyCode;   // 策略代码
+    public Map<String, Object> getRegisteredTriggers() {
+        Map<String, Object> map = new HashMap<>();
+
+        map.put("eventTriggers", eventTriggers.keySet());
+        map.put("attrTriggers", attrTriggers.keySet());
+        map.put("strategyIndex", strategyTriggerIndex.keySet());
+
+        return map;
+    }
 
-        String objCode;        // 设备代码
 
-        String eventKey;       // 事件键
+    /* ============================================================
+     * 内部结构
+     * ============================================================ */
 
-        String conditionExpr;  // 条件表达式
+    @Data
+    private static class TriggerInfo {
+        private final String strategyCode;
+        private final OpEnergyStrategyTrigger trigger;
     }
-}
+}

+ 0 - 1
ems/ems-core/src/main/java/com/ruoyi/ems/domain/OpEnergyStrategy.java

@@ -29,7 +29,6 @@ public class OpEnergyStrategy extends BaseEntity
     private Integer priority;
     private String strategyDesc;
     private Integer execMode;
-    private String execRule;
     private Integer timeout;
     private Integer retryTimes;
 

+ 2 - 0
ems/ems-core/src/main/java/com/ruoyi/ems/mapper/OpEnergyStrategyTriggerMapper.java

@@ -54,6 +54,8 @@ public interface OpEnergyStrategyTriggerMapper {
         @Param("attrKey") String attrKey
     );
 
+    OpEnergyStrategyTrigger selectById(@Param("id") Long id);
+
     /**
      * 新增触发器
      * @param trigger 触发器

+ 2 - 0
ems/ems-core/src/main/java/com/ruoyi/ems/service/IOpEnergyStrategyTriggerService.java

@@ -47,6 +47,8 @@ public interface IOpEnergyStrategyTriggerService {
      */
     List<OpEnergyStrategyTrigger> findBySourceAndAttr(String sourceObjCode, String attrKey);
 
+    OpEnergyStrategyTrigger selectById(Long id);
+
     /**
      * 新增触发器
      * @param trigger 触发器

+ 5 - 0
ems/ems-core/src/main/java/com/ruoyi/ems/service/impl/OpEnergyStrategyTriggerServiceImpl.java

@@ -48,6 +48,11 @@ public class OpEnergyStrategyTriggerServiceImpl implements IOpEnergyStrategyTrig
     }
 
     @Override
+    public OpEnergyStrategyTrigger selectById(Long id) {
+        return triggerMapper.selectById( id);
+    }
+
+    @Override
     public int insertTrigger(OpEnergyStrategyTrigger trigger) {
         return triggerMapper.insertTrigger(trigger);
     }

+ 342 - 0
ems/ems-core/src/main/java/com/ruoyi/ems/strategy/executor/ParallelStepExecutor.java

@@ -0,0 +1,342 @@
+package com.ruoyi.ems.strategy.executor;
+
+import com.ruoyi.ems.domain.OpEnergyStrategyStep;
+import com.ruoyi.ems.domain.StrategyExecutionContext;
+import com.ruoyi.ems.service.IOpEnergyStrategyStepService;
+import com.ruoyi.ems.service.IStepExecutor;
+import com.ruoyi.ems.strategy.evaluator.ConditionEvaluator;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * 并行执行器(修复版)
+ *
+ * @author lvwenbin
+ */
+@Slf4j
+@Component
+public class ParallelStepExecutor implements IStepExecutor {
+
+    @Autowired
+    private IOpEnergyStrategyStepService stepService;
+
+    @Autowired
+    private ConditionEvaluator conditionEvaluator;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    private StepExecutorFactory stepExecutorFactory;
+
+    /**
+     * 并行执行线程池
+     * 修复:使用固定大小线程池,避免资源耗尽
+     */
+    private final ExecutorService parallelExecutor = Executors.newFixedThreadPool(10);
+
+    /**
+     * 默认超时时间(秒)
+     */
+    private static final int DEFAULT_TIMEOUT = 60;
+
+    /**
+     * 获取 StepExecutorFactory(延迟加载,避免循环依赖)
+     */
+    private StepExecutorFactory getStepExecutorFactory() {
+        if (stepExecutorFactory == null) {
+            synchronized (this) {
+                if (stepExecutorFactory == null) {
+                    stepExecutorFactory = applicationContext.getBean(StepExecutorFactory.class);
+                }
+            }
+        }
+        return stepExecutorFactory;
+    }
+
+    @Override
+    public Object execute(OpEnergyStrategyStep step, StrategyExecutionContext context) throws Exception {
+        String stepCode = step.getStepCode();
+        String strategyCode = step.getStrategyCode();
+
+        log.info("========== 并行步骤[{}]开始执行 ==========", stepCode);
+        log.info("并行步骤信息: strategyCode={}, stepName={}, parentStepCode={}",
+            strategyCode, step.getStepName(), step.getParentStepCode());
+
+        // 1. 查询所有子步骤
+        List<OpEnergyStrategyStep> childSteps = stepService.selectStepsByParentCode(strategyCode, stepCode);
+
+        // 【关键修复1】增加详细日志,确认查询结果
+        log.info("并行步骤[{}]查询子步骤: strategyCode={}, parentStepCode={}", stepCode, strategyCode, stepCode);
+
+        if (childSteps == null) {
+            log.warn("并行步骤[{}]子步骤查询返回null,可能是Service方法问题", stepCode);
+            childSteps = new ArrayList<>();
+        }
+
+        log.info("并行步骤[{}]查询到{}个子步骤(过滤前)", stepCode, childSteps.size());
+
+        // 打印所有查询到的子步骤
+        for (OpEnergyStrategyStep child : childSteps) {
+            log.info("  - 子步骤: code={}, name={}, type={}, enable={}, parentCode={}",
+                child.getStepCode(), child.getStepName(), child.getStepType(),
+                child.getEnable(), child.getParentStepCode());
+        }
+
+        // 过滤启用的步骤并排序
+        childSteps = childSteps.stream()
+            .filter(s -> {
+                boolean enabled = s.getEnable() != null && s.getEnable() == 1;
+                if (!enabled) {
+                    log.info("  - 子步骤[{}]已禁用,跳过", s.getStepCode());
+                }
+                return enabled;
+            })
+            .sorted(Comparator.comparingInt(s -> s.getStepIndex() != null ? s.getStepIndex() : 0))
+            .collect(Collectors.toList());
+
+        log.info("并行步骤[{}]过滤后有{}个启用的子步骤", stepCode, childSteps.size());
+
+        if (childSteps.isEmpty()) {
+            log.warn("并行步骤[{}]没有启用的子步骤,跳过执行", stepCode);
+            return buildResult(true, "并行步骤无启用的子步骤", new ArrayList<>());
+        }
+
+        log.info("并行步骤[{}]将并行执行{}个子步骤: {}",
+            stepCode,
+            childSteps.size(),
+            childSteps.stream().map(s -> s.getStepName() + "(" + s.getStepCode() + ")").collect(Collectors.joining(", ")));
+
+        // 2. 并行执行所有子步骤
+        List<CompletableFuture<ChildStepResult>> futures = new ArrayList<>();
+
+        // 【关键修复2】为每个子步骤创建独立的Future,并增加异常处理
+        for (int i = 0; i < childSteps.size(); i++) {
+            final OpEnergyStrategyStep childStep = childSteps.get(i);
+            final int index = i;
+
+            log.info("并行步骤[{}]创建子步骤任务[{}/{}]: {}",
+                stepCode, index + 1, childSteps.size(), childStep.getStepCode());
+
+            CompletableFuture<ChildStepResult> future = CompletableFuture.supplyAsync(() -> {
+                log.info(">>> 并行子任务[{}]开始执行: {}", index + 1, childStep.getStepCode());
+                ChildStepResult result = executeChildStep(childStep, context, index + 1);
+                log.info("<<< 并行子任务[{}]执行完成: {} - {}",
+                    index + 1, childStep.getStepCode(), result.success ? "成功" : "失败");
+                return result;
+            }, parallelExecutor).exceptionally(ex -> {
+                // 【关键修复3】捕获异步执行中的异常
+                log.error("并行子任务[{}]执行异常: {}", index + 1, childStep.getStepCode(), ex);
+                return new ChildStepResult(
+                    childStep.getStepCode(),
+                    childStep.getStepName(),
+                    false,
+                    null,
+                    "异步执行异常: " + ex.getMessage()
+                );
+            });
+
+            futures.add(future);
+        }
+
+        log.info("并行步骤[{}]已创建{}个并行任务,等待执行完成...", stepCode, futures.size());
+
+        // 3. 等待所有子步骤完成
+        int timeout = step.getTimeout() != null ? step.getTimeout() : DEFAULT_TIMEOUT;
+
+        try {
+            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+                futures.toArray(new CompletableFuture[0])
+            );
+
+            // 等待所有任务完成(带超时)
+            allFutures.get(timeout, TimeUnit.SECONDS);
+            log.info("并行步骤[{}]所有子任务执行完成", stepCode);
+
+        } catch (java.util.concurrent.TimeoutException e) {
+            log.error("并行步骤[{}]等待超时({}秒)", stepCode, timeout);
+        } catch (Exception e) {
+            log.error("并行步骤[{}]等待子步骤完成时发生异常", stepCode, e);
+        }
+
+        // 4. 收集执行结果
+        List<ChildStepResult> results = new ArrayList<>();
+        int successCount = 0;
+        int failCount = 0;
+
+        for (int i = 0; i < futures.size(); i++) {
+            CompletableFuture<ChildStepResult> future = futures.get(i);
+            OpEnergyStrategyStep childStep = childSteps.get(i);
+
+            try {
+                if (future.isDone()) {
+                    ChildStepResult result = future.get();
+                    results.add(result);
+
+                    if (result.success) {
+                        successCount++;
+                        // 保存结果到上下文
+                        context.setStepResult(childStep.getStepCode(), result.result);
+                        log.info("并行子步骤[{}]执行成功", childStep.getStepCode());
+                    } else {
+                        failCount++;
+                        log.warn("并行子步骤[{}]执行失败: {}", childStep.getStepCode(), result.errorMessage);
+                    }
+                } else {
+                    // 超时未完成
+                    log.warn("并行子步骤[{}]执行超时", childStep.getStepCode());
+                    results.add(new ChildStepResult(childStep.getStepCode(), childStep.getStepName(),
+                        false, null, "执行超时"));
+                    failCount++;
+
+                    // 取消未完成的任务
+                    future.cancel(true);
+                }
+            } catch (Exception e) {
+                log.error("并行子步骤[{}]结果收集异常", childStep.getStepCode(), e);
+                results.add(new ChildStepResult(childStep.getStepCode(), childStep.getStepName(),
+                    false, null, e.getMessage()));
+                failCount++;
+            }
+        }
+
+        // 5. 构建并返回结果
+        boolean allSuccess = failCount == 0;
+        String message = String.format("并行执行完成: 总数=%d, 成功=%d, 失败=%d",
+            childSteps.size(), successCount, failCount);
+
+        log.info("========== 并行步骤[{}]执行完成: {} ==========", stepCode, message);
+
+        return buildResult(allSuccess, message, results);
+    }
+
+    /**
+     * 执行单个子步骤
+     * 【关键修复4】增加更详细的执行日志和异常处理
+     */
+    private ChildStepResult executeChildStep(OpEnergyStrategyStep childStep,
+        StrategyExecutionContext context,
+        int taskIndex) {
+        String stepCode = childStep.getStepCode();
+        String stepName = childStep.getStepName();
+        long startTime = System.currentTimeMillis();
+
+        try {
+            log.info("[Task-{}] 开始执行子步骤: code={}, name={}, type={}",
+                taskIndex, stepCode, stepName, childStep.getStepType());
+
+            // 打印步骤详细信息
+            log.info("[Task-{}] 步骤详情: targetObj={}, abilityKey={}, param={}",
+                taskIndex,
+                childStep.getTargetObjCode(),
+                childStep.getAbilityKey(),
+                childStep.getAbilityParam());
+
+            // 评估执行条件
+            if (childStep.getConditionExpr() != null && !childStep.getConditionExpr().isEmpty()) {
+                log.info("[Task-{}] 评估执行条件: {}", taskIndex, childStep.getConditionExpr());
+                boolean conditionMet = conditionEvaluator.evaluate(
+                    childStep.getConditionExpr(),
+                    context.getVariables()
+                );
+                if (!conditionMet) {
+                    log.info("[Task-{}] 子步骤[{}]条件不满足,跳过", taskIndex, stepCode);
+                    return new ChildStepResult(stepCode, stepName, true, "条件不满足,已跳过", null);
+                }
+            }
+
+            // 获取对应的执行器
+            log.info("[Task-{}] 获取步骤类型[{}]的执行器", taskIndex, childStep.getStepType());
+            IStepExecutor executor = getStepExecutorFactory().getExecutor(childStep.getStepType());
+
+            if (executor == null) {
+                log.error("[Task-{}] 未找到步骤类型[{}]的执行器", taskIndex, childStep.getStepType());
+                return new ChildStepResult(stepCode, stepName, false, null,
+                    "未找到执行器: " + childStep.getStepType());
+            }
+
+            // 执行步骤
+            log.info("[Task-{}] 调用执行器执行步骤...", taskIndex);
+            Object result = executor.execute(childStep, context);
+
+            long duration = System.currentTimeMillis() - startTime;
+            log.info("[Task-{}] 子步骤[{}]执行成功, 耗时{}ms, 结果: {}",
+                taskIndex, stepCode, duration, result);
+
+            return new ChildStepResult(stepCode, stepName, true, result, null);
+
+        } catch (Exception e) {
+            long duration = System.currentTimeMillis() - startTime;
+            log.error("[Task-{}] 子步骤[{}]执行失败, 耗时{}ms", taskIndex, stepCode, duration, e);
+
+            // 检查是否配置了失败继续
+            if (childStep.getContinueOnFail() != null && childStep.getContinueOnFail() == 1) {
+                log.warn("[Task-{}] 子步骤[{}]执行失败,但配置了失败继续", taskIndex, stepCode);
+                return new ChildStepResult(stepCode, stepName, true, null, "失败但继续: " + e.getMessage());
+            }
+
+            return new ChildStepResult(stepCode, stepName, false, null, e.getMessage());
+        }
+    }
+
+    /**
+     * 构建执行结果
+     */
+    private Map<String, Object> buildResult(boolean success, String message, List<ChildStepResult> childResults) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("success", success);
+        result.put("message", message);
+        result.put("childResults", childResults);
+        result.put("childCount", childResults.size());
+
+        // 统计成功和失败数量
+        long successCount = childResults.stream().filter(r -> r.success).count();
+        long failCount = childResults.size() - successCount;
+        result.put("successCount", successCount);
+        result.put("failCount", failCount);
+
+        return result;
+    }
+
+    @Override
+    public String getSupportedType() {
+        return "PARALLEL";
+    }
+
+    /**
+     * 子步骤执行结果
+     */
+    private static class ChildStepResult {
+        String stepCode;
+        String stepName;
+        boolean success;
+        Object result;
+        String errorMessage;
+
+        ChildStepResult(String stepCode, String stepName, boolean success, Object result, String errorMessage) {
+            this.stepCode = stepCode;
+            this.stepName = stepName;
+            this.success = success;
+            this.result = result;
+            this.errorMessage = errorMessage;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("{stepCode='%s', stepName='%s', success=%s, error='%s'}",
+                stepCode, stepName, success, errorMessage);
+        }
+    }
+}

+ 11 - 0
ems/ems-core/src/main/java/com/ruoyi/ems/strategy/executor/StrategyExecutor.java

@@ -211,6 +211,17 @@ public class StrategyExecutor {
             // 执行步骤
             StepExecutionResult result = executeStep(step, context);
 
+            // ✅ 修复:增加空判断
+            if (result == null) {
+                log.error("步骤[{}]执行返回null,视为失败", step.getStepCode());
+                allSuccess = false;
+                if (step.getContinueOnFail() != 1) {
+                    log.error("步骤执行失败且未配置失败继续,终止策略: {}", step.getStepCode());
+                    break;
+                }
+                continue;
+            }
+
             if (!result.isSuccess()) {
                 allSuccess = false;
                 if (step.getContinueOnFail() != 1) {

+ 1 - 7
ems/ems-core/src/main/resources/mapper/ems/OpEnergyStrategyMapper.xml

@@ -17,7 +17,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         <result property="strategyState" column="strategy_state"/>
         <result property="priority" column="priority"/>
         <result property="execMode" column="exec_mode"/>
-        <result property="execRule" column="exec_rule"/>
         <result property="timeout" column="timeout"/>
         <result property="retryTimes" column="retry_times"/>
         <result property="lastExecTime" column="last_exec_time"/>
@@ -35,7 +34,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
     <sql id="selectStrategyVo">
         select id, area_code, strategy_code, strategy_name, strategy_desc, scene_type,
                strategy_category, trigger_type, trigger_config, strategy_state, priority,
-               exec_mode, exec_rule, timeout, retry_times, last_exec_time, last_exec_result,
+               exec_mode, timeout, retry_times, last_exec_time, last_exec_result,
                exec_count, success_count, fail_count, version,
                create_by, create_time, update_by, update_time
         from adm_op_energy_strategy
@@ -86,8 +85,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         <include refid="selectStrategyVo"/>
         where strategy_state = 1
         and trigger_type = 2
-        and exec_rule is not null
-        and exec_rule != ''
         order by priority desc
     </select>
 
@@ -105,7 +102,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="strategyState != null">strategy_state,</if>
             <if test="priority != null">priority,</if>
             <if test="execMode != null">exec_mode,</if>
-            <if test="execRule != null">exec_rule,</if>
             <if test="timeout != null">timeout,</if>
             <if test="retryTimes != null">retry_times,</if>
             <if test="version != null">version,</if>
@@ -123,7 +119,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="strategyState != null">#{strategyState},</if>
             <if test="priority != null">#{priority},</if>
             <if test="execMode != null">#{execMode},</if>
-            <if test="execRule != null">#{execRule},</if>
             <if test="timeout != null">#{timeout},</if>
             <if test="retryTimes != null">#{retryTimes},</if>
             <if test="version != null">#{version},</if>
@@ -144,7 +139,6 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
             <if test="strategyState != null">strategy_state = #{strategyState},</if>
             <if test="priority != null">priority = #{priority},</if>
             <if test="execMode != null">exec_mode = #{execMode},</if>
-            <if test="execRule != null">exec_rule = #{execRule},</if>
             <if test="timeout != null">timeout = #{timeout},</if>
             <if test="retryTimes != null">retry_times = #{retryTimes},</if>
             <if test="lastExecTime != null">last_exec_time = #{lastExecTime},</if>

+ 5 - 0
ems/ems-core/src/main/resources/mapper/ems/OpEnergyStrategyTriggerMapper.xml

@@ -50,6 +50,11 @@
         order by priority desc
     </select>
 
+    <select id="selectById" parameterType="Long" resultMap="TriggerResult">
+        <include refid="selectTriggerVo"/>
+        where id = #{id}
+    </select>
+
     <insert id="insertTrigger" parameterType="com.ruoyi.ems.domain.OpEnergyStrategyTrigger" useGeneratedKeys="true" keyProperty="id">
         insert into adm_op_energy_strategy_trigger
         <trim prefix="(" suffix=")" suffixOverrides=",">

+ 1 - 1
ems/sql/ems_init_data_test.sql

@@ -126,7 +126,7 @@ INSERT INTO `adm_ems_obj_event` (`model_code`, `event_type`, `event_key`, `event
 INSERT INTO `adm_ems_obj_event` (`model_code`, `event_type`, `event_key`, `event_name`, `event_desc`, `event_code`, `ext_event_code`) VALUES ('M_W2_QF_GEEKOPEN', 2, 'key-off', '设备断电', '设备断电', 'e-qf-off-0001', null);
 
 
-INSERT INTO `adm_op_energy_strategy` (`area_code`, `strategy_code`, `strategy_name`, `scene_type`, `strategy_category`, `trigger_type`, `trigger_config`, `strategy_state`, `priority`, `strategy_desc`, `exec_mode`, `exec_rule`, `timeout`, `retry_times`, `last_exec_time`, `last_exec_result`, `exec_count`, `success_count`, `fail_count`, `version`, `create_by`, `create_time`, `update_by`, `update_time`) VALUES ('320100', 'POLLING_LIGHT_001', '灯1开启后打开灯2(轮询监控V2)', 'ENERGY_SAVE', 'AUTO', 5, NULL, 1, 50, '使用轮询监控机制:每2秒调用syncState能力查询灯1状态,当灯1开启时自动打开灯2', 1, NULL, 300, 0, '2025-12-11 17:10:06', 0, 16, 16, 0, 1, NULL, '2025-12-11 14:16:20', NULL, '2025-12-11 17:08:33');
+INSERT INTO `adm_op_energy_strategy` (`area_code`, `strategy_code`, `strategy_name`, `scene_type`, `strategy_category`, `trigger_type`, `trigger_config`, `strategy_state`, `priority`, `strategy_desc`, `exec_mode`, `timeout`, `retry_times`, `last_exec_time`, `last_exec_result`, `exec_count`, `success_count`, `fail_count`, `version`, `create_by`, `create_time`, `update_by`, `update_time`) VALUES ('320100', 'POLLING_LIGHT_001', '灯1开启后打开灯2(轮询监控V2)', 'ENERGY_SAVE', 'AUTO', 5, NULL, 1, 50, '使用轮询监控机制:每2秒调用syncState能力查询灯1状态,当灯1开启时自动打开灯2', 1, 300, 0, '2025-12-11 17:10:06', 0, 16, 16, 0, 1, NULL, '2025-12-11 14:16:20', NULL, '2025-12-11 17:08:33');
 INSERT INTO `adm_op_energy_strategy_trigger` (`strategy_code`, `trigger_name`, `trigger_type`, `source_obj_type`, `source_obj_code`, `source_model_code`, `event_key`, `attr_key`, `condition_expr`, `enable`, `priority`) VALUES ('POLLING_LIGHT_001', '监控灯1开关状态', 'POLLING', 2, 'D-B-QS-10000001', 'M_W2_QS_KEKA_86', '', 'Switch', '{\"left\":\"Switch\",\"op\":\"==\",\"right\":\"1\",\"polling\":{\"enabled\":true,\"interval\":2000,\"initialDelay\":1000,\"activeQuery\":true,\"queryWaitTime\":500,\"triggerMode\":\"ALWAYS\",\"queryAbility\":{\"abilityKey\":\"syncState\",\"abilityParam\":\"\"}}}', 1, 50);
 INSERT INTO `adm_op_energy_strategy_step` (`strategy_code`, `step_code`, `step_name`, `step_type`, `step_index`, `parent_step_code`, `condition_expr`, `target_obj_type`, `target_obj_code`, `target_model_code`, `ability_key`, `ability_param`, `param_source`, `param_mapping`, `delay_seconds`, `retry_on_fail`, `retry_times`, `retry_interval`, `continue_on_fail`, `timeout`, `enable`, `remark`, `loop_max_count`, `loop_interval`, `loop_condition`) VALUES ('POLLING_LIGHT_001', 'STEP_1765444159961_kin3j', '能力调用', 'ABILITY', 1, NULL, NULL, 2, 'D-B-QS-10000002', 'M_W2_QS_KEKA_86', 'on-off', '1', 'STATIC', NULL, 10, NULL, NULL, NULL, 0, NULL, 1, NULL, 100, 1000, '');
 

+ 0 - 1
ems/sql/ems_server.sql

@@ -896,7 +896,6 @@ CREATE TABLE adm_op_energy_strategy (
   `priority` INT DEFAULT 50 COMMENT '优先级:0-100,数值越大优先级越高',
   `strategy_desc` VARCHAR(256) DEFAULT NULL COMMENT '策略描述',
   `exec_mode` INT DEFAULT NULL COMMENT '执行模式:1-串行,2-并行,99-手动',
-  `exec_rule` VARCHAR(128) DEFAULT NULL COMMENT 'CRON表达式',
   `timeout` INT DEFAULT 300 COMMENT '超时时间(秒)',
   `retry_times` INT DEFAULT 0 COMMENT '失败重试次数',
   `last_exec_time` DATETIME DEFAULT NULL COMMENT '最后执行时间',