diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/ExecuteType.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/ExecuteType.java index 98864a5a..f84957b3 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/ExecuteType.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/ExecuteType.java @@ -26,8 +26,8 @@ public enum ExecuteType { MAP_REDUCE(3, "MapReduce"), MAP(4, "Map"); - int v; - String des; + private final int v; + private final String des; public static ExecuteType of(int v) { for (ExecuteType type : values()) { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java index 30c10f2e..92d90d88 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/TimeExpressionType.java @@ -5,6 +5,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; +import java.util.Collections; import java.util.List; /** @@ -24,13 +25,13 @@ public enum TimeExpressionType { FIXED_DELAY(4), WORKFLOW(5); - int v; + private final int v; - public static final List FREQUENT_TYPES = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v); + public static final List FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v)); /** * 首次计算触发时间时必须计算出一个有效值 */ - public static final List INSPECT_TYPES = Lists.newArrayList(CRON.v); + public static final List INSPECT_TYPES = Collections.unmodifiableList(Lists.newArrayList(CRON.v)); public static TimeExpressionType of(int v) { for (TimeExpressionType type : values()) { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowInstanceStatus.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowInstanceStatus.java index 474cbc7c..751083ab 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowInstanceStatus.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/WorkflowInstanceStatus.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.Collections; import java.util.List; /** @@ -27,11 +28,11 @@ public enum WorkflowInstanceStatus { /** * 广义的运行状态 */ - public static final List GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v); + public static final List GENERALIZED_RUNNING_STATUS = Collections.unmodifiableList(Lists.newArrayList(WAITING.v, RUNNING.v)); /** * 结束状态 */ - public static final List FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); + public static final List FINISHED_STATUS = Collections.unmodifiableList(Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v)); private final int v; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java index c88e7ea2..56dc22ee 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java @@ -84,7 +84,9 @@ public class ContainerService { // 并发部署的机器数量 private static final int DEPLOY_BATCH_NUM = 50; // 部署间隔 - private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000; + private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000L; + // 最长部署时间 + private static final long DEPLOY_MAX_COST_TIME = 10 * 60 * 1000L; /** * 保存容器 @@ -208,14 +210,13 @@ public class ContainerService { String deployLock = "containerDeployLock-" + containerId; RemoteEndpoint.Async remote = session.getAsyncRemote(); // 最长部署时间:10分钟 - boolean lock = lockService.tryLock(deployLock, 10 * 60 * 1000); + boolean lock = lockService.tryLock(deployLock, DEPLOY_MAX_COST_TIME); if (!lock) { remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished."); return; } try { - Optional containerInfoOpt = containerInfoRepository.findById(containerId); if (!containerInfoOpt.isPresent()) { remote.sendText("SYSTEM: can't find container by id: " + containerId); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java index 0191e800..06037106 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java @@ -94,7 +94,7 @@ public class CleanService { */ private void cleanByOneServer() { // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 - boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); + boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000L); if (!lock) { log.info("[CleanService] clean job is already running, just return."); return; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java index a04dd67b..b3f70241 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java @@ -69,7 +69,7 @@ public class SnowFlakeIdGenerator { public synchronized long nextId() { long currStamp = getNewStamp(); if (currStamp < lastTimestamp) { - throw new RuntimeException("clock moved backwards, refusing to generate id"); + return futureId(); } if (currStamp == lastTimestamp) { @@ -92,6 +92,22 @@ public class SnowFlakeIdGenerator { | sequence; //序列号部分 } + /** + * 发生时钟回拨时借用未来时间生成Id,避免运行过程中任务调度和工作流直接进入不可用状态 + * 注:该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题 + */ + private long futureId() { + sequence = (sequence + 1) & MAX_SEQUENCE; + if (sequence == 0L) { + lastTimestamp = lastTimestamp + 1; + } + + return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分 + | dataCenterId << DATA_CENTER_LEFT //数据中心部分 + | machineId << MACHINE_LEFT //机器标识部分 + | sequence; //序列号部分 + } + private long getNextMill() { long mill = getNewStamp(); while (mill <= lastTimestamp) { diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java index a27689ae..d9438879 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/MailAlarmService.java @@ -1,5 +1,6 @@ package tech.powerjob.server.extension.defaultimpl.alarm.impl; +import org.springframework.beans.factory.annotation.Value; import org.apache.commons.lang3.StringUtils; import tech.powerjob.server.persistence.remote.model.UserInfoDO; import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm; @@ -31,12 +32,11 @@ public class MailAlarmService implements Alarmable { private JavaMailSender javaMailSender; + @Value("${spring.mail.username:''}") private String from; - private static final String FROM_KEY = "spring.mail.username"; @Override public void onFailed(Alarm alarm, List targetUserList) { - initFrom(); if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) { return; } @@ -59,10 +59,4 @@ public class MailAlarmService implements Alarmable { this.javaMailSender = javaMailSender; } - // 不能直接使用 @Value 注入,不存在的时候会报错 - private void initFrom() { - if (StringUtils.isEmpty(from)) { - from = environment.getProperty(FROM_KEY); - } - } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 9c84287e..f1e7f8e8 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -527,12 +527,11 @@ public abstract class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; + int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum); AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { - - int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum); List needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit); currentDispatchNum += needDispatchTasks.size();