|
@@ -0,0 +1,120 @@
|
|
|
+package com.huashe.park.collect.controller;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+
|
|
|
+import org.junit.jupiter.api.Test;
|
|
|
+import org.junit.jupiter.api.extension.ExtendWith;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.test.context.SpringBootTest;
|
|
|
+import org.springframework.test.context.junit.jupiter.SpringExtension;
|
|
|
+
|
|
|
+import com.huashe.park.collect.SpingMqttApplication;
|
|
|
+import com.huashe.park.collect.core.MqttTemplate;
|
|
|
+import com.huashe.park.common.ByteArrayUtil;
|
|
|
+import com.huashe.park.core.service.IMachineProcessResultService;
|
|
|
+import com.huashe.park.core.service.IMachineProcessService;
|
|
|
+import com.huashe.park.domain.entity.MachineProcess;
|
|
|
+import com.huashe.park.domain.entity.MachineProcessResult;
|
|
|
+
|
|
|
+@SpringBootTest(classes = SpingMqttApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
|
|
+@ExtendWith(SpringExtension.class)
|
|
|
+class TopicControllerTest {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MqttTemplate mqttTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IMachineProcessResultService machineProcessResultService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IMachineProcessService machineProcessService;
|
|
|
+
|
|
|
+ @Test
|
|
|
+ void unitTest() {
|
|
|
+ List<MachineProcessResult> machineProcessResults = machineProcessResultService
|
|
|
+ .selectMachineProcessResultList(new MachineProcessResult() {
|
|
|
+ {
|
|
|
+ setPileId("ilf171fk");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ List<MachineProcess> machineProcesses = machineProcessService.selectMachineProcessList(new MachineProcess() {
|
|
|
+ {
|
|
|
+ setPileId("ilf171fk");
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ CountDownLatch latch = new CountDownLatch(1);
|
|
|
+ long l = System.currentTimeMillis();
|
|
|
+ // 过程线程:处理数据并发送信号
|
|
|
+ Thread processThread = new Thread(() -> {
|
|
|
+ System.out.println("[过程线程] 开始处理数据...");
|
|
|
+ machineProcesses.forEach(machineProcess -> {
|
|
|
+ byte[] bytes;
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ machineProcess.setId(null);
|
|
|
+ machineProcess.setPileId("talv1qmq");
|
|
|
+ machineProcess.setMachineId("wi4mdee7");
|
|
|
+ machineProcess.setDataTime(System.currentTimeMillis());
|
|
|
+ machineProcess.setUuid(Long.toString(System.currentTimeMillis()));
|
|
|
+ if (machineProcesses.indexOf(machineProcess) > machineProcesses.size() - 3) {
|
|
|
+ latch.countDown(); // 发送信号(计数减1)
|
|
|
+ }
|
|
|
+ bytes = ByteArrayUtil.serializeObject(machineProcess);
|
|
|
+ mqttTemplate.sendByte("/gl41DkJLD3N/123457", "/gl41DkJLD3N/123457/machine/piling/process", bytes, 0,
|
|
|
+ false);
|
|
|
+
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ System.out.println("[过程线程] 数据处理完成,发送启动信号");
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+ // 结果线程:等待信号后处理结果
|
|
|
+ Thread resultThread = new Thread(() -> {
|
|
|
+ try {
|
|
|
+ System.out.println("[结果线程] 等待启动信号...");
|
|
|
+ latch.await(); // 阻塞直到门闩计数归零
|
|
|
+ machineProcessResults.forEach(result -> {
|
|
|
+ byte[] bytes;
|
|
|
+ try {
|
|
|
+ result.setId(null);
|
|
|
+ result.setPileId("talv1qmq");
|
|
|
+
|
|
|
+ result.setMachineId("wi4mdee7");
|
|
|
+ result.setDataTime(System.currentTimeMillis());
|
|
|
+ result.setStartTime(l);
|
|
|
+ result.setEndTime(System.currentTimeMillis());
|
|
|
+ result.setUuid(Long.toString(System.currentTimeMillis()));
|
|
|
+ bytes = ByteArrayUtil.serializeObject(result);
|
|
|
+ mqttTemplate.sendByte("/gl41DkJLD3N/123457", "/gl41DkJLD3N/123457/machine/piling/result", bytes,
|
|
|
+ 0, false);
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ System.out.println("[结果线程] 收到信号,开始处理结果数据");
|
|
|
+ // 此处添加具体的结果处理逻辑
|
|
|
+ }
|
|
|
+ catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // 启动线程
|
|
|
+ processThread.start();
|
|
|
+ resultThread.start();
|
|
|
+ // 等待线程执行完成
|
|
|
+ try {
|
|
|
+ processThread.join();
|
|
|
+ resultThread.join();
|
|
|
+ }
|
|
|
+ catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|