diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java index 18f8e94e..d4eec769 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/akka/actors/ServerActor.java @@ -1,6 +1,7 @@ package com.github.kfcfans.oms.server.akka.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.WorkerHeartbeat; import com.github.kfcfans.common.response.AskResponse; @@ -45,8 +46,10 @@ public class ServerActor extends AbstractActor { try { InstanceManager.updateStatus(req); - // 回复接收成功 - getSender().tell(AskResponse.succeed(null), getSelf()); + // 结束状态(成功/失败)需要回复消息 + if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { + getSender().tell(AskResponse.succeed(null), getSelf()); + } }catch (Exception e) { log.error("[ServerActor] update instance status failed for request: {}.", req, e); } diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java index 9bb526c1..b8aec9ba 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/BroadcastProcessorDemo.java @@ -6,6 +6,7 @@ import com.github.kfcfans.oms.worker.core.processor.TaskContext; import com.github.kfcfans.oms.worker.core.processor.TaskResult; import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -18,6 +19,7 @@ import java.util.concurrent.ThreadLocalRandom; * @since 2020/4/17 */ @Slf4j +@Component public class BroadcastProcessorDemo extends BroadcastProcessor { @Override diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java index 43943ff2..5883e5bc 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/MapReduceProcessorDemo.java @@ -12,6 +12,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -24,6 +25,7 @@ import java.util.concurrent.ThreadLocalRandom; * @since 2020/4/17 */ @Slf4j +@Component public class MapReduceProcessorDemo extends MapReduceProcessor { // 每一批发送任务大小 diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java index 3a9b88dd..a15fac47 100644 --- a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/processors/StandaloneProcessorDemo.java @@ -5,6 +5,7 @@ import com.github.kfcfans.oms.worker.core.processor.ProcessResult; import com.github.kfcfans.oms.worker.core.processor.TaskContext; import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; import java.util.concurrent.ThreadLocalRandom; @@ -16,15 +17,16 @@ import java.util.concurrent.ThreadLocalRandom; * @since 2020/4/17 */ @Slf4j +@Component public class StandaloneProcessorDemo implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { System.out.println("================ StandaloneProcessorDemo#process ================"); - System.out.println("TaskContext: " + JSONObject.toJSONString(context)); - boolean success = ThreadLocalRandom.current().nextBoolean(); + System.out.println("TaskContext: " + JSONObject.toJSONString(context)); + System.out.println("ProcessSuccess: " + success); return new ProcessResult(success, context + ": " + success); } } diff --git a/oh-my-scheduler-worker-samples/src/main/resources/logback.xml b/oh-my-scheduler-worker-samples/src/main/resources/logback.xml new file mode 100644 index 00000000..136026ef --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/resources/logback.xml @@ -0,0 +1,27 @@ + + + + + + + + + %red(%d{yyyy-MM-dd HH:mm:ss}) %highlight(%-5level) - %cyan(%msg%n) + + UTF-8 + + + + + + + + + + + + + + + + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java index 780d0c8d..8ed39f70 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/FrequentTaskTracker.java @@ -257,17 +257,17 @@ public class FrequentTaskTracker extends TaskTracker { continue; } - String result; switch (executeType) { // STANDALONE 代表任务确实已经执行完毕了 case STANDALONE: // 查询数据库获取结果(STANDALONE每个SubInstance只会有一条Task记录) - result = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0).getResult(); - onFinished(subInstanceId, true, result, iterator); + TaskDO resultTask = taskPersistenceService.getAllTask(instanceId, subInstanceId).get(0); + boolean success = resultTask.getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(); + onFinished(subInstanceId, success, resultTask.getResult(), iterator); continue; // MAP 不关心结果,最简单 case MAP: - result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); + String result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum); onFinished(subInstanceId, holder.failedNum == 0, result, iterator); continue; // MapReduce 和 BroadCast 需要根据是否有 LAST_TASK 来判断结束与否 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java index 5fe3426f..29806fe0 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/task/TaskTracker.java @@ -146,7 +146,7 @@ public abstract class TaskTracker { // 处理失败的情况 int configTaskRetryNum = instanceInfo.getTaskRetryNum(); - if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum > 1) { + if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) { // 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB) Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId);