Merge branch 'v3.4.0' into jenkins_auto_build

This commit is contained in:
tjq 2020-11-28 16:56:41 +08:00
commit 3b8fb2f28f
31 changed files with 294 additions and 102 deletions

View File

@ -47,7 +47,7 @@ PowerJob 的设计目标为企业级的分布式任务调度平台,即成为
| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** |
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **邮件与钉钉,并支持开发者扩展** |
| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |

Binary file not shown.

Before

Width:  |  Height:  |  Size: 618 KiB

After

Width:  |  Height:  |  Size: 162 KiB

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
<powerjob.common.version>3.3.3</powerjob.common.version>
<powerjob.common.version>3.4.0</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>

View File

@ -11,7 +11,7 @@ public class SystemInstanceResult {
/* *********** 普通instance 专用 *********** */
// 同时运行的任务实例数过多
public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)";
public static final String TOO_MANY_INSTANCES = "too many instances(%d>%d)";
// 无可用worker
public static final String NO_WORKER_AVAILABLE = "no worker available";
// 任务执行超时

View File

@ -24,7 +24,8 @@ public enum WorkflowInstanceStatus {
// 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING.v, RUNNING.v);
// 结束状态
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
private int v;
private String des;

View File

@ -27,6 +27,8 @@ public class WorkflowInstanceInfoDTO {
private String dag;
private String result;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.3.3</powerjob.common.version>
<powerjob.common.version>3.4.0</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>

View File

@ -0,0 +1,58 @@
package com.github.kfcfans.powerjob.server.common;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
/**
* 拒绝策略
*
* @author tjq
* @since 2020/11/28
*/
@Slf4j
public class RejectedExecutionHandlerFactory {
/**
* 直接丢弃该任务
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newReject(String source) {
return (r, p) -> {
log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
};
}
/**
* 调用线程运行
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newCallerRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
if (!p.isShutdown()) {
r.run();
}
};
}
/**
* 新线程运行
* @param source log name
* @return A handler for tasks that cannot be executed by ThreadPool
*/
public static RejectedExecutionHandler newThreadRun(String source) {
return (r, p) -> {
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r);
log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source);
if (!p.isShutdown()) {
new Thread(r).start();
}
};
}
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.common.config;
import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -33,11 +34,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsTimingPool-");
executor.setRejectedExecutionHandler((r, e) -> {
log.warn("[OmsTimingService] timing pool can't schedule job immediately, maybe some job using too much cpu times.");
// 定时任务优先级较高不惜一些代价都需要继续执行开线程继续干
new Thread(r).start();
});
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTimingPool"));
return executor;
}
@ -49,7 +46,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");
executor.setRejectedExecutionHandler(new LogOnRejected());
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool"));
return executor;
}
@ -63,11 +60,4 @@ public class ThreadPoolConfig {
return scheduler;
}
private static final class LogOnRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
log.error("[OmsThreadPool] Task({}) rejected from pool({}).", r, p);
}
}
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.RejectedExecutionHandlerFactory;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -66,9 +67,9 @@ public class HashedWheelTimer implements Timer {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
taskProcessPool = new ThreadPoolExecutor(core, 4 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
@ -172,6 +173,11 @@ public class HashedWheelTimer implements Timer {
removeIf(timerFuture -> {
// processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
return true;
}
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;

View File

@ -48,6 +48,8 @@ public class WorkflowInstanceInfoDO {
@Column
private String result;
// 预计触发时间
private Long expectedTriggerTime;
// 实际触发时间
private Long actualTriggerTime;
// 结束时间

View File

@ -71,6 +71,6 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
// 结果只能用 int 接收
@Modifying
@Transactional
@Query(value = "delete from InstanceInfoDO where gmtModified < ?1")
int deleteAllByGmtModifiedBefore(Date time);
@Query(value = "delete from InstanceInfoDO where gmtModified < ?1 and status in ?2")
int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);
}

View File

@ -24,11 +24,11 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository<WorkflowIn
// 结果只能用 int 接收
@Modifying
@Transactional
@Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1")
int deleteAllByGmtModifiedBefore(Date time);
@Query(value = "delete from WorkflowInstanceInfoDO where gmtModified < ?1 and status in ?2")
int deleteAllByGmtModifiedBeforeAndStatusIn(Date time, List<Integer> status);
int countByWorkflowIdAndStatusIn(Long workflowId, List<Integer> status);
// 状态检查
List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> appIds, int status, Date before);
List<WorkflowInstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time);
}

View File

@ -84,7 +84,7 @@ public class DispatchService {
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > maxInstanceNum) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, maxInstanceNum);
String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);

View File

@ -18,10 +18,15 @@ public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// 精确时间轮 1S 走一格
// 精确调度时间轮 1MS 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确调度时间轮用于处理高延迟任务 10S 走一格
private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
// 长延迟阈值
private static final long LONG_DELAY_THRESHOLD_MS = 60000;
/**
* 定时调度
@ -30,13 +35,17 @@ public class InstanceTimeWheelService {
* @param timerTask 需要执行的目标方法
*/
public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
if (delayMS <= LONG_DELAY_THRESHOLD_MS) {
realSchedule(uniqueId, delayMS, timerTask);
return;
}
long expectTriggerTime = System.currentTimeMillis() + delayMS;
TimerFuture longDelayTask = SLOW_TIMER.schedule(() -> {
CARGO.remove(uniqueId);
realSchedule(uniqueId, expectTriggerTime - System.currentTimeMillis(), timerTask);
}, delayMS - LONG_DELAY_THRESHOLD_MS, TimeUnit.MILLISECONDS);
CARGO.put(uniqueId, longDelayTask);
}
/**
@ -48,4 +57,15 @@ public class InstanceTimeWheelService {
return CARGO.get(uniqueId);
}
private static void realSchedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
}
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.powerjob.server.service.timing;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
@ -128,7 +130,7 @@ public class CleanService {
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
int num = instanceInfoRepository.deleteAllByGmtModifiedBefore(t);
int num = instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, InstanceStatus.finishedStatus);
log.info("[CleanService] deleted {} instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean instanceInfo failed.", e);
@ -142,7 +144,7 @@ public class CleanService {
}
try {
Date t = DateUtils.addDays(new Date(), -instanceInfoRetentionDay);
int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(t);
int num = workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(t, WorkflowInstanceStatus.finishedStatus);
log.info("[CleanService] deleted {} workflow instanceInfo records whose modify time before {}.", num, t);
}catch (Exception e) {
log.warn("[CleanService] clean workflow instanceInfo failed.", e);

View File

@ -162,7 +162,7 @@ public class InstanceStatusCheckService {
// 重试长时间处于 WAITING 状态的工作流实例
long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, WorkflowInstanceStatus.WAITING.getV(), new Date(threshold));
List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {
List<Long> wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());

View File

@ -193,7 +193,7 @@ public class OmsScheduleService {
wfInfos.forEach(wfInfo -> {
// 1. 先生成调度记录防止不调度的情况发生
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, null, wfInfo.getNextTriggerTime());
// 2. 推入时间轮准备调度执行
long delay = wfInfo.getNextTriggerTime() - System.currentTimeMillis();
@ -220,6 +220,9 @@ public class OmsScheduleService {
try {
// 查询所有的秒级任务只包含ID
List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
if (CollectionUtils.isEmpty(jobIds)) {
return;
}
// 查询日志记录表中是否存在相关的任务
List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);

View File

@ -68,9 +68,10 @@ public class WorkflowInstanceManager {
* 创建工作流任务实例
* @param wfInfo 工作流任务元数据描述信息
* @param initParams 启动参数
* @param expectTriggerTime 预计执行时间
* @return wfInstanceId
*/
public Long create(WorkflowInfoDO wfInfo, String initParams) {
public Long create(WorkflowInfoDO wfInfo, String initParams, Long expectTriggerTime) {
Long wfId = wfInfo.getId();
Long wfInstanceId = idGenerateService.allocate();
@ -82,6 +83,7 @@ public class WorkflowInstanceManager {
newWfInstance.setWfInstanceId(wfInstanceId);
newWfInstance.setWorkflowId(wfId);
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
newWfInstance.setExpectedTriggerTime(expectTriggerTime);
newWfInstance.setActualTriggerTime(System.currentTimeMillis());
newWfInstance.setWfInitParams(initParams);
@ -129,7 +131,7 @@ public class WorkflowInstanceManager {
// 并发度控制
int instanceConcurrency = workflowInstanceInfoRepository.countByWorkflowIdAndStatusIn(wfInfo.getId(), WorkflowInstanceStatus.generalizedRunningStatus);
if (instanceConcurrency > wfInfo.getMaxWfInstanceNum()) {
onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
onWorkflowInstanceFailed(String.format(SystemInstanceResult.TOO_MANY_INSTANCES, instanceConcurrency, wfInfo.getMaxWfInstanceNum()), wfInstanceInfo);
return;
}

View File

@ -168,7 +168,7 @@ public class WorkflowService {
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {

View File

@ -32,6 +32,8 @@ public class WorkflowInstanceInfoVO {
private PEWorkflowDAG pEWorkflowDAG;
private String result;
// 预计触发时间
private String expectedTriggerTime;
// 实际触发时间需要格式化为人看得懂的时间
private String actualTriggerTime;
// 结束时间同理需要格式化
@ -49,6 +51,7 @@ public class WorkflowInstanceInfoVO {
vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId()));
// 格式化时间
vo.setExpectedTriggerTime(DateFormatUtils.format(wfInstanceDO.getExpectedTriggerTime(), OmsConstant.TIME_PATTERN));
vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
if (wfInstanceDO.getFinishedTime() == null) {
vo.setFinishedTime(OmsConstant.NONE);

View File

@ -2,18 +2,17 @@
<!-- 生产环境日志 -->
<configuration>
<!--默认配置-->
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<!--配置控制台(Console)-->
<include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
<!--
日志路径,注意权限问题,否则无法打印日志。
大坑记录:`~/logs`不会在用户目录下创建文件夹,而是在项目目录下创建名为~的文件夹
-->
<property name="LOG_PATH" value="${user.home}/powerjob-server/logs"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<!-- 系统所有异常日志ERROR双写 start -->
<appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender">

View File

@ -0,0 +1,126 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 时间轮测试
*
* @author tjq
* @since 2020/11/28
*/
@Slf4j
public class HashedWheelTimerTest {
@Test
public void testHashedWheelTimer() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
List<TimerFuture> futures = Lists.newLinkedList();
for (int i = 0; i < 1000; i++) {
String name = "Task" + i;
long nowMS = System.currentTimeMillis();
int delayMS = ThreadLocalRandom.current().nextInt(60000);
long targetTime = delayMS + nowMS;
TimerTask timerTask = () -> {
System.out.println("============= " + name + "============= ");
System.out.println("ThreadInfo:" + Thread.currentThread().getName());
System.out.println("expectTime:" + targetTime);;
System.out.println("currentTime:" + System.currentTimeMillis());
System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
System.out.println("============= " + name + "============= ");
};
futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
}
// 随机取消
futures.forEach(future -> {
int x = ThreadLocalRandom.current().nextInt(2);
if (x == 1) {
future.cancel();
}
});
Thread.sleep(1000);
// 关闭
System.out.println(timer.stop().size());
System.out.println("Finished");
Thread.sleep(277777777);
}
@Test
public void testPerformance() throws Exception {
Stopwatch sw = Stopwatch.createStarted();
for (long i = 0; i < 10000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(100, 120000);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
log.info("[Performance] deviation:{}", (System.currentTimeMillis() - expect));
});
}
log.info("[Performance] insert cost: {}", sw);
Thread.sleep(90000);
}
@Test
public void testLongDelayTask() throws Exception {
for (long i = 0; i < 1000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 3);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
log.info("[LongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
});
}
Thread.sleep(60000 * 4);
}
@Test
public void testCancelDelayTask() throws Exception {
AtomicLong executeNum = new AtomicLong();
AtomicLong cancelNum = new AtomicLong();
for (long i = 0; i < 1000000; i++) {
long delay = ThreadLocalRandom.current().nextLong(60000, 60000 * 2);
long expect = System.currentTimeMillis() + delay;
InstanceTimeWheelService.schedule(i, delay, () -> {
executeNum.incrementAndGet();
log.info("[CancelLongDelayTask] deviation: {}", (System.currentTimeMillis() - expect));
});
}
Thread.sleep(10000);
for (long i = 0; i < 1000000; i++) {
boolean nextBoolean = ThreadLocalRandom.current().nextBoolean();
if (nextBoolean) {
continue;
}
boolean cancel = InstanceTimeWheelService.fetchTimerFuture(i).cancel();
log.info("[CancelLongDelayTask] id:{},status:{}", i, cancel);
cancelNum.incrementAndGet();
}
Thread.sleep(60000 * 4);
log.info("[CancelLongDelayTask] result -> executeNum:{},cancelNum:{}", executeNum, cancelNum);
}
}

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.WorkflowInstanceStatus;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
@ -104,12 +106,12 @@ public class RepositoryTest {
@Test
public void testDeleteInstanceInfo() {
instanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
instanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), InstanceStatus.finishedStatus);
}
@Test
public void testDeleteWorkflowInstanceInfo() {
workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(new Date());
workflowInstanceInfoRepository.deleteAllByGmtModifiedBeforeAndStatusIn(new Date(), WorkflowInstanceStatus.finishedStatus);
}
}

View File

@ -24,49 +24,6 @@ import java.util.stream.Collectors;
*/
public class UtilsTest {
@Test
public void testHashedWheelTimer() throws Exception {
HashedWheelTimer timer = new HashedWheelTimer(1, 1024, 32);
List<TimerFuture> futures = Lists.newLinkedList();
for (int i = 0; i < 1000; i++) {
String name = "Task" + i;
long nowMS = System.currentTimeMillis();
int delayMS = ThreadLocalRandom.current().nextInt(60000);
long targetTime = delayMS + nowMS;
TimerTask timerTask = () -> {
System.out.println("============= " + name + "============= ");
System.out.println("ThreadInfo:" + Thread.currentThread().getName());
System.out.println("expectTime:" + targetTime);;
System.out.println("currentTime:" + System.currentTimeMillis());
System.out.println("deviation:" + (System.currentTimeMillis() - targetTime));
System.out.println("============= " + name + "============= ");
};
futures.add(timer.schedule(timerTask, delayMS, TimeUnit.MILLISECONDS));
}
// 随机取消
futures.forEach(future -> {
int x = ThreadLocalRandom.current().nextInt(2);
if (x == 1) {
future.cancel();
}
});
Thread.sleep(1000);
// 关闭
System.out.println(timer.stop().size());
System.out.println("Finished");
Thread.sleep(277777777);
}
@Test
public void testCronExpression() throws Exception {
String cron = "0 * * * * ? *";

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<!-- 生产环境日志 -->
<configuration>
<property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %m%n"/>
<!-- Console 输出设置 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.3</powerjob.worker.version>
<powerjob.worker.version>3.4.0</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.3.3</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.4.0</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.3</powerjob.worker.version>
<powerjob.worker.version>3.4.0</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.3.3</version>
<version>3.4.0</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.3.3</powerjob.common.version>
<powerjob.common.version>3.4.0</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>