diff --git a/pom.xml b/pom.xml index ab2a2670..99022ea4 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ 1.0.0 pom powerjob - https://github.com/KFCFans/OhMyScheduler + https://github.com/KFCFans/PowerJob Distributed scheduling and execution framework @@ -19,8 +19,8 @@ - https://github.com/KFCFans/OhMyScheduler - https://github.com/KFCFans/OhMyScheduler.git + https://github.com/KFCFans/PowerJob + https://github.com/KFCFans/PowerJob.git diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java index 7729b75b..749926db 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java @@ -28,6 +28,8 @@ public enum InstanceStatus { // 广义的运行状态 public static final List generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v); + // 结束状态 + public static final List finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); public static InstanceStatus of(int v) { for (InstanceStatus is : values()) { diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index b409347e..420de9c1 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -17,6 +17,7 @@ public class RemoteConstant { public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; + public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf"; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index ca3b5c2c..ae7a24e2 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -46,8 +46,8 @@ public class SaveJobInfoRequest { /* ************************** 运行时配置 ************************** */ - // 最大同时运行任务数 - private Integer maxInstanceNum = 1; + // 最大同时运行任务数,0 代表不限 + private Integer maxInstanceNum = 0; // 并发度,同时执行的线程数量 private Integer concurrency = 5; // 任务整体超时时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index d173801f..41439727 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.akka; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.actors.FriendActor; @@ -58,7 +59,9 @@ public class OhMyServer { Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ServerActor.class) + .withDispatcher("akka.server-actor-dispatcher") + .withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME); actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index fc83f9a2..fdd82196 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor { getInstanceManager().updateStatus(req); // 结束状态(成功/失败)需要回复消息 - if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { + if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { getSender().tell(AskResponse.succeed(null), getSelf()); } }catch (Exception e) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 7f5200a8..13281259 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -68,16 +68,20 @@ public class DispatchService { // 查询当前运行的实例数 long current = System.currentTimeMillis(); - long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); - // 超出最大同时运行限制,不执行调度 - if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { - String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); - log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + // 0 代表不限制在线任务,还能省去一次 DB 查询 + if (jobInfo.getMaxInstanceNum() > 0) { - instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); - return; + long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); + // 超出最大同时运行限制,不执行调度 + if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { + String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); + return; + } } // 获取当前所有可用的Worker diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 781dd5e5..da429f08 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -35,17 +35,14 @@ public class CleanService { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Value("${oms.log.retention.local}") - private int localLogRetentionDay; - @Value("${oms.log.retention.remote}") - private int remoteLogRetentionDay; + @Value("${oms.instanceinfo.retention}") + private int instanceInfoRetentionDay; + @Value("${oms.container.retention.local}") private int localContainerRetentionDay; @Value("${oms.container.retention.remote}") private int remoteContainerRetentionDay; - @Value("${oms.instanceinfo.retention}") - private int instanceInfoRetentionDay; private static final int TEMPORARY_RETENTION_DAY = 3; @@ -65,12 +62,12 @@ public class CleanService { cleanWorkflowInstanceLog(); // 释放磁盘空间 - cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay); + cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); // 删除 GridFS 过期文件 - cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay); + cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java index d8693f46..1b83499a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java @@ -8,9 +8,11 @@ import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; +import org.springframework.core.io.Resource; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; +import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -86,6 +88,11 @@ public class WebLogAspect { if (obj instanceof HttpServletRequest || obj instanceof HttpServletResponse) { break; } + // FatJar + if (obj instanceof MultipartFile || obj instanceof Resource) { + break; + } + objList.add(obj); } return JSONObject.toJSONString(objList); diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 87036311..f0870b5f 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=1 -oms.log.retention.remote=1 +oms.instanceinfo.retention=1 oms.container.retention.local=1 oms.container.retention.remote=-1 -oms.instanceinfo.retention=1 ####### 缓存配置 ####### oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-pre.properties b/powerjob-server/src/main/resources/application-pre.properties index 5e2291d8..f2d33b88 100644 --- a/powerjob-server/src/main/resources/application-pre.properties +++ b/powerjob-server/src/main/resources/application-pre.properties @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=3 -oms.log.retention.remote=3 +oms.instanceinfo.retention=3 oms.container.retention.local=3 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 ####### 缓存配置 ####### oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index 8df29624..0a4e2556 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=7 -oms.log.retention.remote=7 +oms.instanceinfo.retention=7 oms.container.retention.local=7 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 ####### 缓存配置 ####### oms.instance.metadata.cache.size=2048 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/oms-server.akka.conf b/powerjob-server/src/main/resources/oms-server.akka.conf index fe6ab702..d7ff91c0 100644 --- a/powerjob-server/src/main/resources/oms-server.akka.conf +++ b/powerjob-server/src/main/resources/oms-server.akka.conf @@ -20,4 +20,24 @@ akka { canonical.port = 0 } } + + server-actor-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } } \ No newline at end of file diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 4175bfe7..0b763742 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -1,7 +1,10 @@ package com.github.kfcfans.powerjob.worker; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.DeadLetter; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; @@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor; import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor; import com.github.kfcfans.powerjob.worker.actors.WorkerActor; @@ -93,11 +97,20 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + int cores = Runtime.getRuntime().availableProcessors(); actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(TaskTrackerActor.class) + .withDispatcher("akka.task-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) + .withDispatcher("akka.processor-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME); + // 处理系统中产生的异常情况 + ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME); + actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); + log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java new file mode 100644 index 00000000..3b731b40 --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.powerjob.worker.actors; + +import akka.actor.AbstractActor; +import akka.actor.DeadLetter; +import lombok.extern.slf4j.Slf4j; + +/** + * 处理系统异常的 Actor + * + * @author 朱八 + * @since 2020/7/16 + */ +@Slf4j +public class TroubleshootingActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeadLetter.class, this::onReceiveDeadLetter) + .build(); + } + + public void onReceiveDeadLetter(DeadLetter dl) { + log.warn("[IndianActor] receive DeadLetter: {}", dl); + } +} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 78b20322..01e1f9ac 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -37,6 +37,10 @@ public class CommonTaskTracker extends TaskTracker { // 可以是除 ROOT_TASK_ID 的任何数字 private static final String LAST_TASK_ID = "1111"; + // 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down + private int reportFailedCnt = 0; + private static final int MAX_REPORT_FAILED_THRESHOLD = 5; + protected CommonTaskTracker(ServerScheduleJobReq req) { super(req); } @@ -232,6 +236,10 @@ public class CommonTaskTracker extends TaskTracker { // 服务器未接受上报,则等待下次重新上报 if (!serverAccepted) { + if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) { + log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result); + destroy(); + } return; } diff --git a/powerjob-worker/src/main/resources/oms-worker.akka.conf b/powerjob-worker/src/main/resources/oms-worker.akka.conf index 94bf50af..409eee9c 100644 --- a/powerjob-worker/src/main/resources/oms-worker.akka.conf +++ b/powerjob-worker/src/main/resources/oms-worker.akka.conf @@ -20,4 +20,36 @@ akka { canonical.port = 25520 } } + + # dispatcher + task-tracker-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 2.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 10 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 100 + } + + processor-tracker-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 4 + } + throughput = 100 + } } \ No newline at end of file