diff --git a/README.md b/README.md
index bfcf5996..5800098a 100644
--- a/README.md
+++ b/README.md
@@ -47,7 +47,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
-| 报警监控 | 无 | 邮件 | 短信 | **邮件与钉钉,并支持开发者扩展** |
+| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** |
| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
diff --git a/others/images/user.png b/others/images/user.png
index 8de90911..fd3da626 100644
Binary files a/others/images/user.png and b/others/images/user.png differ
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index 2fa776a5..ac7d0292 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-client
- 3.3.3
+ 3.4.0
jar
5.6.1
1.2.68
- 3.3.3
+ 3.4.0
3.2.4
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index b28c9717..29436f48 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -10,7 +10,7 @@
4.0.0
powerjob-common
- 3.3.3
+ 3.4.0
jar
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
index 0682a94d..5f53fecc 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/SystemInstanceResult.java
@@ -11,7 +11,7 @@ public class SystemInstanceResult {
/* *********** 普通instance 专用 *********** */
// 同时运行的任务实例数过多
- public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)";
+ public static final String TOO_MANY_INSTANCES = "too many instances(%d>%d)";
// 无可用worker
public static final String NO_WORKER_AVAILABLE = "no worker available";
// 任务执行超时
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java
index 4a2a1e6f..7bba8d25 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/WorkflowInstanceStatus.java
@@ -24,7 +24,8 @@ public enum WorkflowInstanceStatus {
// 广义的运行状态
public static final List generalizedRunningStatus = Lists.newArrayList(WAITING.v, RUNNING.v);
-
+ // 结束状态
+ public static final List finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
private int v;
private String des;
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
index c9a22df4..b35ea919 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/WorkflowInstanceInfoDTO.java
@@ -27,6 +27,8 @@ public class WorkflowInstanceInfoDTO {
private String dag;
private String result;
+ // 预计触发时间
+ private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index 480559c7..68036a10 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-server
- 3.3.3
+ 3.4.0
jar
2.9.2
2.3.4.RELEASE
- 3.3.3
+ 3.4.0
8.0.19
19.7.0.0
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java
new file mode 100644
index 00000000..691dee87
--- /dev/null
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/RejectedExecutionHandlerFactory.java
@@ -0,0 +1,58 @@
+package com.github.kfcfans.powerjob.server.common;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.RejectedExecutionHandler;
+
+/**
+ * 拒绝策略
+ *
+ * @author tjq
+ * @since 2020/11/28
+ */
+@Slf4j
+public class RejectedExecutionHandlerFactory {
+
+ /**
+ * 直接丢弃该任务
+ * @param source log name
+ * @return A handler for tasks that cannot be executed by ThreadPool
+ */
+ public static RejectedExecutionHandler newReject(String source) {
+ return (r, p) -> {
+ log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r);
+ log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
+ };
+ }
+
+ /**
+ * 调用线程运行
+ * @param source log name
+ * @return A handler for tasks that cannot be executed by ThreadPool
+ */
+ public static RejectedExecutionHandler newCallerRun(String source) {
+ return (r, p) -> {
+ log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r);
+ log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
+ if (!p.isShutdown()) {
+ r.run();
+ }
+ };
+ }
+
+ /**
+ * 新线程运行
+ * @param source log name
+ * @return A handler for tasks that cannot be executed by ThreadPool
+ */
+ public static RejectedExecutionHandler newThreadRun(String source) {
+ return (r, p) -> {
+ log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r);
+ log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
+ if (!p.isShutdown()) {
+ new Thread(r).start();
+ }
+ };
+ }
+
+}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java
index 5597eaf5..5762f0ce 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java
@@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.common.config;
+import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -33,11 +34,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-");
- executor.setRejectedExecutionHandler((r, e) -> {
- log.warn("[OmsTimingService] timing pool can't schedule job immediately, maybe some job using too much cpu times.");
- // 定时任务优先级较高,不惜一些代价都需要继续执行,开线程继续干~
- new Thread(r).start();
- });
+ executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTimingPool"));
return executor;
}
@@ -49,7 +46,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");
- executor.setRejectedExecutionHandler(new LogOnRejected());
+ executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool"));
return executor;
}
@@ -63,11 +60,4 @@ public class ThreadPoolConfig {
return scheduler;
}
- private static final class LogOnRejected implements RejectedExecutionHandler {
-
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
- log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
- }
- }
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
index 847966a1..241bbe5d 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
@@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
+import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -66,9 +67,9 @@ public class HashedWheelTimer implements Timer {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue queue = Queues.newLinkedBlockingQueue(16);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
- taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
+ taskProcessPool = new ThreadPoolExecutor(core, 4 * core,
60, TimeUnit.SECONDS,
- queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+ queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
@@ -172,6 +173,11 @@ public class HashedWheelTimer implements Timer {
removeIf(timerFuture -> {
+ // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
+ if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
+ return true;
+ }
+
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
index c2a074a4..e6f483c4 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java
@@ -48,6 +48,8 @@ public class WorkflowInstanceInfoDO {
@Column
private String result;
+ // 预计触发时间
+ private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java
index f65fb299..3742e3c8 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java
@@ -71,6 +71,6 @@ public interface InstanceInfoRepository extends JpaRepository status);
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java
index 809bfdfc..6dabc143 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java
@@ -24,11 +24,11 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository status);
int countByWorkflowIdAndStatusIn(Long workflowId, List status);
// 状态检查
- List findByAppIdInAndStatusAndGmtModifiedBefore(List appIds, int status, Date before);
+ List findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List appIds, int status, long time);
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
index d92c7a01..8f98c7a0 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java
@@ -84,7 +84,7 @@ public class DispatchService {
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// 超出最大同时运行限制,不执行调度
if (runningInstanceCount > maxInstanceNum) {
- String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, maxInstanceNum);
+ String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
index 10a0c9d3..c4e626c9 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
@@ -18,10 +18,15 @@ public class InstanceTimeWheelService {
private static final Map CARGO = Maps.newConcurrentMap();
- // 精确时间轮,每 1S 走一格
+ // 精确调度时间轮,每 1MS 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
+ // 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格
+ private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
+
// 支持取消的时间间隔,低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
+ // 长延迟阈值
+ private static final long LONG_DELAY_THRESHOLD_MS = 60000;
/**
* 定时调度
@@ -30,13 +35,17 @@ public class InstanceTimeWheelService {
* @param timerTask 需要执行的目标方法
*/
public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
- TimerFuture timerFuture = TIMER.schedule(() -> {
- CARGO.remove(uniqueId);
- timerTask.run();
- }, delayMS, TimeUnit.MILLISECONDS);
- if (delayMS > MIN_INTERVAL_MS) {
- CARGO.put(uniqueId, timerFuture);
+ if (delayMS <= LONG_DELAY_THRESHOLD_MS) {
+ realSchedule(uniqueId, delayMS, timerTask);
+ return;
}
+
+ long expectTriggerTime = System.currentTimeMillis() + delayMS;
+ TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> {
+ CARGO.remove(uniqueId);
+ realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask);
+ }, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
+ CARGO.put(uniqueId, longDelayTask);
}
/**
@@ -48,4 +57,15 @@ public class InstanceTimeWheelService {
return CARGO.get(uniqueId);
}
+
+ private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
+ TimerFuture timerFuture = TIMER.schedule(() -> {
+ CARGO.remove(uniqueId);
+ timerTask.run();
+ }, delayMS, TimeUnit.MILLISECONDS);
+ if (delayMS > MIN_INTERVAL_MS) {
+ CARGO.put(uniqueId, timerFuture);
+ }
+ }
+
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
index da429f08..f180ce3d 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java
@@ -1,5 +1,7 @@
package com.github.kfcfans.powerjob.server.service.timing;
+import com.github.kfcfans.powerjob.common.InstanceStatus;
+import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
@@ -128,7 +130,7 @@ public class CleanService {
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
- int num = instanceInfoRepository.deleteAllByGmtModifiedBefore(t);
+ int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.finishedStatus);
log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean instanceInfo failed.", e);
@@ -142,7 +144,7 @@ public class CleanService {
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
- int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(t);
+ int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.finishedStatus);
log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean workflow instanceInfo failed.", e);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
index 489f2874..ddea791e 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java
@@ -162,7 +162,7 @@ public class InstanceStatusCheckService {
// 重试长时间处于 WAITING 状态的工作流实例
long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
- List waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, WorkflowInstanceStatus.WAITING.getV(), new Date(threshold));
+ List waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {
List wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
index 2115a664..f036b374 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
@@ -193,7 +193,7 @@ public class OmsScheduleService {
wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录,防止不调度的情况发生
- Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
+ Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime());
// 2. 推入时间轮,准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@@ -220,6 +220,9 @@ public class OmsScheduleService {
try {
// 查询所有的秒级任务(只包含ID)
List jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
+ if (CollectionUtils.isEmpty(jobIds)) {
+ return;
+ }
// 查询日志记录表中是否存在相关的任务
List runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
Set runningJobIdSet = Sets.newHashSet(runningJobIdList);
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
index 37ca9396..a10b8a92 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java
@@ -68,9 +68,10 @@ public class WorkflowInstanceManager {
* 创建工作流任务实例
* @param wfInfo 工作流任务元数据(描述信息)
* @param initParams 启动参数
+ * @param expectTriggerTime 预计执行时间
* @return wfInstanceId
*/
- public Long create(WorkflowInfoDO wfInfo, String initParams) {
+ public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
@@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWfInstanceId(wfInstanceId);
newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
+ newWfInstance.setExpectedTriggerTime(expectTriggerTime);
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setWfInitParams(initParams);
@@ -129,7 +131,7 @@ public class WorkflowInstanceManager {
// 并发度控制
int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.generalizedRunningStatus);
if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
- onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
+ onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
return;
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
index 51929452..53ae028b 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
@@ -168,7 +168,7 @@ public class WorkflowService {
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
- Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
+ Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java
index 20c55b06..ed1e2daa 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/WorkflowInstanceInfoVO.java
@@ -32,6 +32,8 @@ public class WorkflowInstanceInfoVO {
private PEWorkflowDAG pEWorkflowDAG;
private String result;
+ // 预计触发时间
+ private String expectedTriggerTime;
// 实际触发时间(需要格式化为人看得懂的时间)
private String actualTriggerTime;
// 结束时间(同理,需要格式化)
@@ -49,6 +51,7 @@ public class WorkflowInstanceInfoVO {
vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId()));
// 格式化时间
+ vo.setExpectedTriggerTime(DateFormatUtils.format(wfInstanceDO.getExpectedTriggerTime(), OmsConstant.TIME_PATTERN));
vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
if (wfInstanceDO.getFinishedTime() == null) {
vo.setFinishedTime(OmsConstant.NONE);
diff --git a/powerjob-server/src/main/resources/logback-product.xml b/powerjob-server/src/main/resources/logback-product.xml
index 9f63badf..2be6e263 100644
--- a/powerjob-server/src/main/resources/logback-product.xml
+++ b/powerjob-server/src/main/resources/logback-product.xml
@@ -2,18 +2,17 @@
+
+
+
+
+
-
-
- ${CONSOLE_LOG_PATTERN}
- utf8
-
-
diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java
new file mode 100644
index 00000000..86dd13ca
--- /dev/null
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/HashedWheelTimerTest.java
@@ -0,0 +1,126 @@
+package com.github.kfcfans.powerjob.server.test;
+
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
+import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
+import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 时间轮测试
+ *
+ * @author tjq
+ * @since 2020/11/28
+ */
+@Slf4j
+public class HashedWheelTimerTest {
+
+ @Test
+ public void testHashedWheelTimer() throws Exception {
+
+ HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
+ List futures = Lists.newLinkedList();
+
+ for (int i = 0; i < 1000; i++) {
+
+ String name = "Task" + i;
+ long nowMS = System.currentTimeMillis();
+ int delayMS = ThreadLocalRandom.current().nextInt(60000);
+ long targetTime = delayMS + nowMS;
+
+ TimerTask timerTask = () -> {
+ System.out.println("============= " + name + "============= ");
+ System.out.println("ThreadInfo:" + Thread.currentThread().getName());
+ System.out.println("expectTime:" + targetTime);;
+ System.out.println("currentTime:" + System.currentTimeMillis());
+ System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
+ System.out.println("============= " + name + "============= ");
+ };
+ futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
+ }
+
+ // 随机取消
+ futures.forEach(future -> {
+
+ int x = ThreadLocalRandom.current().nextInt(2);
+ if (x == 1) {
+ future.cancel();
+ }
+
+ });
+
+ Thread.sleep(1000);
+
+ // 关闭
+ System.out.println(timer.stop().size());
+ System.out.println("Finished!");
+
+ Thread.sleep(277777777);
+ }
+
+ @Test
+ public void testPerformance() throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ for (long i = 0; i < 10000000; i++) {
+ long delay = ThreadLocalRandom.current().nextLong(100, 120000);
+ long expect = System.currentTimeMillis() + delay;
+ InstanceTimeWheelService.schedule(i, delay, () -> {
+ log.info("[Performance] deviation:{}", (System.currentTimeMillis() - expect));
+ });
+ }
+ log.info("[Performance] insert cost: {}", sw);
+
+ Thread.sleep(90000);
+ }
+
+ @Test
+ public void testLongDelayTask() throws Exception {
+ for (long i = 0; i < 1000000; i++) {
+ long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 3);
+ long expect = System.currentTimeMillis() + delay;
+ InstanceTimeWheelService.schedule(i, delay, () -> {
+ log.info("[LongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
+ });
+ }
+
+ Thread.sleep(60000 * 4);
+ }
+
+ @Test
+ public void testCancelDelayTask() throws Exception {
+
+ AtomicLong executeNum = new AtomicLong();
+ AtomicLong cancelNum = new AtomicLong();
+ for (long i = 0; i < 1000000; i++) {
+ long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 2);
+ long expect = System.currentTimeMillis() + delay;
+ InstanceTimeWheelService.schedule(i, delay, () -> {
+ executeNum.incrementAndGet();
+ log.info("[CancelLongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
+ });
+ }
+
+ Thread.sleep(10000);
+
+ for (long i = 0; i < 1000000; i++) {
+ boolean nextBoolean = ThreadLocalRandom.current().nextBoolean();
+ if (nextBoolean) {
+ continue;
+ }
+ boolean cancel = InstanceTimeWheelService.fetchTimerFuture(i).cancel();
+ log.info("[CancelLongDelayTask] id:{},status:{}", i, cancel);
+ cancelNum.incrementAndGet();
+ }
+
+ Thread.sleep(60000 * 4);
+ log.info("[CancelLongDelayTask] result -> executeNum:{},cancelNum:{}", executeNum, cancelNum);
+ }
+}
diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
index 51cd52b3..341fe412 100644
--- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java
@@ -1,6 +1,8 @@
package com.github.kfcfans.powerjob.server.test;
+import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
@@ -104,12 +106,12 @@ public class RepositoryTest {
@Test
public void testDeleteInstanceInfo() {
- instanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
+ instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.finishedStatus);
}
@Test
public void testDeleteWorkflowInstanceInfo() {
- workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
+ workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.finishedStatus);
}
}
diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java
index c8b52f20..b3592a4a 100644
--- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java
+++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/UtilsTest.java
@@ -24,49 +24,6 @@ import java.util.stream.Collectors;
*/
public class UtilsTest {
- @Test
- public void testHashedWheelTimer() throws Exception {
-
- HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
- List futures = Lists.newLinkedList();
-
- for (int i = 0; i < 1000; i++) {
-
- String name = "Task" + i;
- long nowMS = System.currentTimeMillis();
- int delayMS = ThreadLocalRandom.current().nextInt(60000);
- long targetTime = delayMS + nowMS;
-
- TimerTask timerTask = () -> {
- System.out.println("============= " + name + "============= ");
- System.out.println("ThreadInfo:" + Thread.currentThread().getName());
- System.out.println("expectTime:" + targetTime);;
- System.out.println("currentTime:" + System.currentTimeMillis());
- System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
- System.out.println("============= " + name + "============= ");
- };
- futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
- }
-
- // 随机取消
- futures.forEach(future -> {
-
- int x = ThreadLocalRandom.current().nextInt(2);
- if (x == 1) {
- future.cancel();
- }
-
- });
-
- Thread.sleep(1000);
-
- // 关闭
- System.out.println(timer.stop().size());
- System.out.println("Finished!");
-
- Thread.sleep(277777777);
- }
-
@Test
public void testCronExpression() throws Exception {
String cron = "0 * * * * ? *";
diff --git a/powerjob-server/src/test/resources/logback-test.xml b/powerjob-server/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..41e6a2e7
--- /dev/null
+++ b/powerjob-server/src/test/resources/logback-test.xml
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+ ${CONSOLE_LOG_PATTERN}
+ utf8
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index b63a6860..177a00f7 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker-agent
- 3.3.3
+ 3.4.0
jar
- 3.3.3
+ 3.4.0
1.2.3
4.3.2
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index 2744ab12..44815bba 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-samples
- 3.3.3
+ 3.4.0
2.2.6.RELEASE
- 3.3.3
+ 3.4.0
1.2.68
diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml
index a9dadcd3..45191759 100644
--- a/powerjob-worker-spring-boot-starter/pom.xml
+++ b/powerjob-worker-spring-boot-starter/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-spring-boot-starter
- 3.3.3
+ 3.4.0
jar
- 3.3.3
+ 3.4.0
2.2.6.RELEASE
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index 9996b9fc..b320e73a 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker
- 3.3.3
+ 3.4.0
jar
5.2.4.RELEASE
- 3.3.3
+ 3.4.0
1.4.200
3.4.2
5.6.1