diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index 222b912e..4345c17a 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -21,6 +21,7 @@ 3.4.2 28.2-jre 1.2.58 + 5.6.1 @@ -74,6 +75,14 @@ ${fastjson.version} + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + @@ -84,6 +93,8 @@ + + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index e037542c..c616d58f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -1,27 +1,23 @@ package com.github.kfcfans.oms.worker; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.oms.worker.actors.TaskTrackerActor; import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; -import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; -import org.springframework.util.StopWatch; import org.springframework.util.StringUtils; import java.util.Map; @@ -45,7 +41,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { @Override public void afterPropertiesSet() throws Exception { - + init(); } public void init() { @@ -68,11 +64,15 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig); actorSystem.actorOf(Props.create(TaskTrackerActor.class)); - actorSystem.actorOf(Props.create(ProcessorTracker.class)); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)); + log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); // 初始化存储 TaskPersistenceService.INSTANCE.init(); + log.info("[OhMyWorker] local storage initialized successfully."); + + log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); }catch (Exception e) { log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e); } @@ -85,4 +85,17 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean { public void setConfig(OhMyConfig cfg) { config = cfg; } + + public static void main(String[] args) { + + System.out.println(org.h2.util.NetUtils.getLocalAddress()); + + OhMyConfig config = new OhMyConfig(); + config.setAppName("oms"); + OhMyWorker ohMyWorker = new OhMyWorker(); + ohMyWorker.setConfig(config); + ohMyWorker.init(); + + + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 19ee154b..76e2cf77 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -214,52 +214,4 @@ public class TaskDAOImpl implements TaskDAO { collection.forEach(str -> sb.append("'").append(str).append("',")); return sb.replace(sb.length() -1, sb.length(), " ) ").toString(); } - - public static void main(String[] args) throws Exception { - - System.out.println(getInStringCondition(Lists.newArrayList("2.1"))); - - TaskDAOImpl taskDAO = new TaskDAOImpl(); - taskDAO.initTable(); - - TaskDO taskDO = new TaskDO(); - taskDO.setJobId("11"); - taskDO.setInstanceId("22"); - taskDO.setTaskId("2.1"); - taskDO.setTaskName("zzz"); - taskDO.setTaskContent("hhhh".getBytes()); - taskDO.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); - taskDO.setLastModifiedTime(System.currentTimeMillis()); - taskDO.setCreatedTime(System.currentTimeMillis()); - taskDO.setFailedCnt(0); - - taskDAO.save(taskDO); - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId("22"); - query.setTaskId("2.1"); - final List res = taskDAO.simpleQuery(query); - System.out.println(res); - System.out.println(new String(res.get(0).getTaskContent())); - - // update - TaskDO update = new TaskDO(); - update.setFailedCnt(8); - taskDAO.simpleUpdate(query, update); - - final List res2 = taskDAO.simpleQuery(query); - System.out.println(res2); - - SimpleTaskQuery query3 = new SimpleTaskQuery(); - query.setInstanceId("22"); - query.setQueryContent("status, count(*) as num"); - query.setOtherCondition("GROUP BY status"); - List> dbRES = taskDAO.simpleQueryPlus(query); - System.out.println(dbRES); - - System.out.println("=========== start to delete ==========="); - System.out.println(taskDAO.batchDelete("22", Lists.newArrayList("2.1")));; - - Thread.sleep(100000); - } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java index 167902f2..afc48883 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -63,7 +63,7 @@ public class TaskDO { ", jobId='" + jobId + '\'' + ", instanceId='" + instanceId + '\'' + ", taskName='" + taskName + '\'' + - ", taskContent=" + new String(taskContent) + + ", taskContent=" + (taskContent == null ? "" : new String(taskContent)) + ", address='" + address + '\'' + ", status=" + status + ", result='" + result + '\'' + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index 10bbc750..48a0ac76 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -122,4 +122,10 @@ public class TaskPersistenceService { public int batchDelete(String instanceId, List taskIds) { return taskDAO.batchDelete(instanceId, taskIds); } + + public List listAll() { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setQueryCondition("1 = 1"); + return taskDAO.simpleQuery(query); + } } diff --git a/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java new file mode 100644 index 00000000..f7fdf195 --- /dev/null +++ b/oh-my-scheduler-worker/src/test/java/com/github/kfcfans/oms/PersistenceServiceTest.java @@ -0,0 +1,69 @@ +package com.github.kfcfans.oms; + +import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.github.kfcfans.oms.worker.persistence.TaskDO; +import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; +import com.google.common.collect.Lists; +import org.junit.jupiter.api.*; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * H2 数据库持久化测试 + * + * @author tjq + * @since 2020/3/23 + */ +public class PersistenceServiceTest { + + private static TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE; + + @BeforeAll + public static void initTable() throws Exception { + taskPersistenceService.init(); + + List taskList = Lists.newLinkedList(); + for (int i = 0; i < 4; i++) { + TaskDO task = new TaskDO(); + taskList.add(task); + + task.setJobId("1"); + task.setInstanceId("10086" + ThreadLocalRandom.current().nextInt(2)); + task.setTaskId(i + ""); + task.setFailedCnt(0); + task.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue()); + task.setTaskName("ROOT_TASK"); + task.setLastModifiedTime(System.currentTimeMillis()); + task.setCreatedTime(System.currentTimeMillis()); + } + + taskPersistenceService.batchSave(taskList); + System.out.println("=============== init data ==============="); + taskList.forEach(System.out::println); + } + + @AfterAll + public static void stop() throws Exception { + Thread.sleep(60000); + } + + @AfterEach + public void listData() { + System.out.println("============= listData ============="); + List result = taskPersistenceService.listAll(); + System.out.println("size: " + result.size()); + result.forEach(System.out::println); + } + + + @Test + public void testBatchDelete() { + + System.out.println("=============== testBatchDelete ==============="); + int delete = taskPersistenceService.batchDelete("100860", Lists.newArrayList("0", "1")); + System.out.println("delete result:" + delete); + } + + +}