mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
Merge branch 'v3.2.0' into jenkins_auto_build
This commit is contained in:
commit
675194f9cf
6
pom.xml
6
pom.xml
@ -9,7 +9,7 @@
|
||||
<version>1.0.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>powerjob</name>
|
||||
<url>https://github.com/KFCFans/OhMyScheduler</url>
|
||||
<url>https://github.com/KFCFans/PowerJob</url>
|
||||
<description>Distributed scheduling and execution framework</description>
|
||||
<licenses>
|
||||
<license>
|
||||
@ -19,8 +19,8 @@
|
||||
</license>
|
||||
</licenses>
|
||||
<scm>
|
||||
<url>https://github.com/KFCFans/OhMyScheduler</url>
|
||||
<connection>https://github.com/KFCFans/OhMyScheduler.git</connection>
|
||||
<url>https://github.com/KFCFans/PowerJob</url>
|
||||
<connection>https://github.com/KFCFans/PowerJob.git</connection>
|
||||
</scm>
|
||||
|
||||
<developers>
|
||||
|
@ -28,6 +28,8 @@ public enum InstanceStatus {
|
||||
|
||||
// 广义的运行状态
|
||||
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
|
||||
// 结束状态
|
||||
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
|
||||
|
||||
public static InstanceStatus of(int v) {
|
||||
for (InstanceStatus is : values()) {
|
||||
|
@ -17,6 +17,7 @@ public class RemoteConstant {
|
||||
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
||||
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
||||
public static final String WORKER_ACTOR_NAME = "worker";
|
||||
public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting";
|
||||
|
||||
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
|
||||
|
||||
|
@ -46,8 +46,8 @@ public class SaveJobInfoRequest {
|
||||
|
||||
|
||||
/* ************************** 运行时配置 ************************** */
|
||||
// 最大同时运行任务数
|
||||
private Integer maxInstanceNum = 1;
|
||||
// 最大同时运行任务数,0 代表不限
|
||||
private Integer maxInstanceNum = 0;
|
||||
// 并发度,同时执行的线程数量
|
||||
private Integer concurrency = 5;
|
||||
// 任务整体超时时间
|
||||
|
@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.akka;
|
||||
import akka.actor.ActorSelection;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.routing.RoundRobinPool;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
|
||||
@ -58,7 +59,9 @@ public class OhMyServer {
|
||||
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
||||
actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||
|
||||
actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(ServerActor.class)
|
||||
.withDispatcher("akka.server-actor-dispatcher")
|
||||
.withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
|
||||
|
||||
log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch);
|
||||
|
@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor {
|
||||
getInstanceManager().updateStatus(req);
|
||||
|
||||
// 结束状态(成功/失败)需要回复消息
|
||||
if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
|
||||
if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) {
|
||||
getSender().tell(AskResponse.succeed(null), getSelf());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
|
@ -68,8 +68,11 @@ public class DispatchService {
|
||||
|
||||
// 查询当前运行的实例数
|
||||
long current = System.currentTimeMillis();
|
||||
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
|
||||
|
||||
// 0 代表不限制在线任务,还能省去一次 DB 查询
|
||||
if (jobInfo.getMaxInstanceNum() > 0) {
|
||||
|
||||
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
|
||||
// 超出最大同时运行限制,不执行调度
|
||||
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
|
||||
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
|
||||
@ -79,6 +82,7 @@ public class DispatchService {
|
||||
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 获取当前所有可用的Worker
|
||||
List<String> allAvailableWorker = WorkerManagerService.getSortedAvailableWorker(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
|
||||
|
@ -35,17 +35,14 @@ public class CleanService {
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
@Value("${oms.log.retention.local}")
|
||||
private int localLogRetentionDay;
|
||||
@Value("${oms.log.retention.remote}")
|
||||
private int remoteLogRetentionDay;
|
||||
@Value("${oms.instanceinfo.retention}")
|
||||
private int instanceInfoRetentionDay;
|
||||
|
||||
@Value("${oms.container.retention.local}")
|
||||
private int localContainerRetentionDay;
|
||||
@Value("${oms.container.retention.remote}")
|
||||
private int remoteContainerRetentionDay;
|
||||
|
||||
@Value("${oms.instanceinfo.retention}")
|
||||
private int instanceInfoRetentionDay;
|
||||
|
||||
private static final int TEMPORARY_RETENTION_DAY = 3;
|
||||
|
||||
@ -65,12 +62,12 @@ public class CleanService {
|
||||
cleanWorkflowInstanceLog();
|
||||
|
||||
// 释放磁盘空间
|
||||
cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay);
|
||||
cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay);
|
||||
cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay);
|
||||
cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY);
|
||||
|
||||
// 删除 GridFS 过期文件
|
||||
cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay);
|
||||
cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay);
|
||||
cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay);
|
||||
}
|
||||
|
||||
|
@ -8,9 +8,11 @@ import org.aspectj.lang.JoinPoint;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.annotation.Before;
|
||||
import org.aspectj.lang.annotation.Pointcut;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.context.request.RequestContextHolder;
|
||||
import org.springframework.web.context.request.ServletRequestAttributes;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
@ -86,6 +88,11 @@ public class WebLogAspect {
|
||||
if (obj instanceof HttpServletRequest || obj instanceof HttpServletResponse) {
|
||||
break;
|
||||
}
|
||||
// FatJar
|
||||
if (obj instanceof MultipartFile || obj instanceof Resource) {
|
||||
break;
|
||||
}
|
||||
|
||||
objList.add(obj);
|
||||
}
|
||||
return JSONObject.toJSONString(objList);
|
||||
|
@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
|
||||
spring.mail.properties.mail.smtp.starttls.required=true
|
||||
|
||||
####### 资源清理配置 #######
|
||||
oms.log.retention.local=1
|
||||
oms.log.retention.remote=1
|
||||
oms.instanceinfo.retention=1
|
||||
oms.container.retention.local=1
|
||||
oms.container.retention.remote=-1
|
||||
oms.instanceinfo.retention=1
|
||||
|
||||
####### 缓存配置 #######
|
||||
oms.instance.metadata.cache.size=1024
|
@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
|
||||
spring.mail.properties.mail.smtp.starttls.required=true
|
||||
|
||||
####### 资源清理配置 #######
|
||||
oms.log.retention.local=3
|
||||
oms.log.retention.remote=3
|
||||
oms.instanceinfo.retention=3
|
||||
oms.container.retention.local=3
|
||||
oms.container.retention.remote=-1
|
||||
oms.instanceinfo.retention=3
|
||||
|
||||
####### 缓存配置 #######
|
||||
oms.instance.metadata.cache.size=1024
|
@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true
|
||||
spring.mail.properties.mail.smtp.starttls.required=true
|
||||
|
||||
####### 资源清理配置 #######
|
||||
oms.log.retention.local=7
|
||||
oms.log.retention.remote=7
|
||||
oms.instanceinfo.retention=7
|
||||
oms.container.retention.local=7
|
||||
oms.container.retention.remote=-1
|
||||
oms.instanceinfo.retention=3
|
||||
|
||||
####### 缓存配置 #######
|
||||
oms.instance.metadata.cache.size=2048
|
@ -20,4 +20,24 @@ akka {
|
||||
canonical.port = 0
|
||||
}
|
||||
}
|
||||
|
||||
server-actor-dispatcher {
|
||||
# Dispatcher is the name of the event-based dispatcher
|
||||
type = Dispatcher
|
||||
# What kind of ExecutionService to use
|
||||
executor = "fork-join-executor"
|
||||
# Configuration for the fork join pool
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 2
|
||||
# Parallelism (threads) ... ceil(available processors * factor)
|
||||
parallelism-factor = 2.0
|
||||
# Max number of threads to cap factor-based parallelism number to
|
||||
parallelism-max = 10
|
||||
}
|
||||
# Throughput defines the maximum number of messages to be
|
||||
# processed per actor before the thread jumps to the next actor.
|
||||
# Set to 1 for as fair as possible.
|
||||
throughput = 100
|
||||
}
|
||||
}
|
@ -1,7 +1,10 @@
|
||||
package com.github.kfcfans.powerjob.worker;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.DeadLetter;
|
||||
import akka.actor.Props;
|
||||
import akka.routing.RoundRobinPool;
|
||||
import com.github.kfcfans.powerjob.common.OmsException;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
||||
@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
|
||||
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
|
||||
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
|
||||
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
|
||||
@ -93,11 +97,20 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
|
||||
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
|
||||
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
||||
|
||||
int cores = Runtime.getRuntime().availableProcessors();
|
||||
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(TaskTrackerActor.class)
|
||||
.withDispatcher("akka.task-tracker-dispatcher")
|
||||
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class)
|
||||
.withDispatcher("akka.processor-tracker-dispatcher")
|
||||
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME);
|
||||
|
||||
// 处理系统中产生的异常情况
|
||||
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME);
|
||||
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
|
||||
|
||||
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
|
||||
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
|
||||
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.github.kfcfans.powerjob.worker.actors;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.DeadLetter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 处理系统异常的 Actor
|
||||
*
|
||||
* @author 朱八
|
||||
* @since 2020/7/16
|
||||
*/
|
||||
@Slf4j
|
||||
public class TroubleshootingActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(DeadLetter.class, this::onReceiveDeadLetter)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void onReceiveDeadLetter(DeadLetter dl) {
|
||||
log.warn("[IndianActor] receive DeadLetter: {}", dl);
|
||||
}
|
||||
}
|
@ -37,6 +37,10 @@ public class CommonTaskTracker extends TaskTracker {
|
||||
// 可以是除 ROOT_TASK_ID 的任何数字
|
||||
private static final String LAST_TASK_ID = "1111";
|
||||
|
||||
// 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down
|
||||
private int reportFailedCnt = 0;
|
||||
private static final int MAX_REPORT_FAILED_THRESHOLD = 5;
|
||||
|
||||
protected CommonTaskTracker(ServerScheduleJobReq req) {
|
||||
super(req);
|
||||
}
|
||||
@ -232,6 +236,10 @@ public class CommonTaskTracker extends TaskTracker {
|
||||
|
||||
// 服务器未接受上报,则等待下次重新上报
|
||||
if (!serverAccepted) {
|
||||
if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) {
|
||||
log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result);
|
||||
destroy();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -20,4 +20,36 @@ akka {
|
||||
canonical.port = 25520
|
||||
}
|
||||
}
|
||||
|
||||
# dispatcher
|
||||
task-tracker-dispatcher {
|
||||
# Dispatcher is the name of the event-based dispatcher
|
||||
type = Dispatcher
|
||||
# What kind of ExecutionService to use
|
||||
executor = "fork-join-executor"
|
||||
# Configuration for the fork join pool
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 2
|
||||
# Parallelism (threads) ... ceil(available processors * factor)
|
||||
parallelism-factor = 2.0
|
||||
# Max number of threads to cap factor-based parallelism number to
|
||||
parallelism-max = 10
|
||||
}
|
||||
# Throughput defines the maximum number of messages to be
|
||||
# processed per actor before the thread jumps to the next actor.
|
||||
# Set to 1 for as fair as possible.
|
||||
throughput = 100
|
||||
}
|
||||
|
||||
processor-tracker-dispatcher {
|
||||
type = Dispatcher
|
||||
executor = "fork-join-executor"
|
||||
fork-join-executor {
|
||||
parallelism-min = 2
|
||||
parallelism-factor = 2.0
|
||||
parallelism-max = 4
|
||||
}
|
||||
throughput = 100
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user