From 12162f29554655298c3f91e280fcee721c5c0598 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 20 Dec 2020 20:40:36 +0800 Subject: [PATCH 01/24] feat: log full stack info when can't fetch processor #134 --- .../powerjob/worker/common/utils/SpringUtils.java | 8 +++++--- .../powerjob/worker/core/ProcessorBeanFactory.java | 4 +++- .../worker/core/tracker/processor/ProcessorTracker.java | 9 +++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java index b8cafad3..c495dbda 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/SpringUtils.java @@ -1,15 +1,15 @@ package com.github.kfcfans.powerjob.worker.common.utils; +import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; -import java.util.Objects; - /** * Spring ApplicationContext 工具类 * * @author tjq * @since 2020/3/16 */ +@Slf4j public class SpringUtils { private static boolean supportSpringBean = false; @@ -43,7 +43,9 @@ public class SpringUtils { // 小写转大写 char[] cs = beanName.toCharArray(); cs[0] += 32; - return (T) context.getBean(String.valueOf(cs)); + String beanName0 = String.valueOf(cs); + log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", context, beanName0); + return (T) context.getBean(beanName0); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java index 2b890d90..27dfbf32 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.Map; @@ -37,7 +38,8 @@ public class ProcessorBeanFactory { return (BasicProcessor) clz.getDeclaredConstructor().newInstance(); }catch (Exception e) { - log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed, reason is {}", className, e.getMessage()); + log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e); + ExceptionUtils.rethrow(e); } return null; }); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 4c7e42e2..e12f48cc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -24,6 +24,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.CollectionUtils; import java.util.List; @@ -96,10 +97,10 @@ public class ProcessorTracker { initProcessor(); log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId); - }catch (Throwable e) { - log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e); + } catch (Throwable t) { + log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t); lethal = true; - lethalReason = e.toString(); + lethalReason = ExceptionUtils.getMessage(t); } } @@ -291,7 +292,7 @@ public class ProcessorTracker { try { processor = SpringUtils.getBean(processorInfo); }catch (Exception e) { - log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString()); + log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, ExceptionUtils.getMessage(e)); } } // 反射加载 From 2d989d2b0be42f33de5155e34dee939360f7c778 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 20 Dec 2020 21:28:37 +0800 Subject: [PATCH 02/24] revert: temporary remove @Type(StringType) due to it will change the db colum type to varchar --- .../powerjob/server/common/redirect/DesignateServer.java | 2 +- .../server/common/redirect/DesignateServerAspect.java | 2 +- .../server/persistence/core/model/InstanceInfoDO.java | 3 --- .../server/persistence/core/model/JobInfoDO.java | 2 -- .../server/persistence/core/model/TypeDefConstant.java | 9 --------- .../server/persistence/core/model/WorkflowInfoDO.java | 2 -- .../persistence/core/model/WorkflowInstanceInfoDO.java | 4 ---- .../server/persistence/core/model/package-info.java | 7 ------- .../kfcfans/powerjob/server/service/DispatchService.java | 2 +- 9 files changed, 3 insertions(+), 30 deletions(-) delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java delete mode 100644 powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java index bc31aafd..7954b7e6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServer.java @@ -6,7 +6,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * 执行服务器运行 + * 需要在指定的服务器运行 * 注意:该注解所在方法的参数必须为对象,不可以是 long 等基本类型 * * @author tjq diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java index 87a47448..9d8684b3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/redirect/DesignateServerAspect.java @@ -62,7 +62,7 @@ public class DesignateServerAspect { } if (appId == null) { - throw new PowerJobException("can't find appId in params!"); + throw new PowerJobException("can't find appId in params for:" + signature.toString()); } // 获取执行机器 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index a956830c..5af69945 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -5,7 +5,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -37,7 +36,6 @@ public class InstanceInfoDO { // 任务实例参数 @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String instanceParams; // 该任务实例的类型,普通/工作流(InstanceType) @@ -51,7 +49,6 @@ public class InstanceInfoDO { // 执行结果(允许存储稍大的结果) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 private Long expectedTriggerTime; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index ddccbf7a..b5d2b171 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -5,7 +5,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -53,7 +52,6 @@ public class JobInfoDO { // 执行器信息(可能需要存储整个脚本文件) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String processorInfo; /* ************************** 运行时配置 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java deleted file mode 100644 index fbe4b199..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.github.kfcfans.powerjob.server.persistence.core.model; - -/** - * @see package-info.java - * @author user - */ -public final class TypeDefConstant { - public static final String STRING_TYPE = "string-type"; -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java index ee8c53a8..f3c5d560 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java @@ -4,7 +4,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -36,7 +35,6 @@ public class WorkflowInfoDO { // 工作流的DAG图信息(点线式DAG的json) @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String peDAG; /* ************************** 定时参数 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index d6290205..e6f483c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -4,7 +4,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; -import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -40,16 +39,13 @@ public class WorkflowInstanceInfoDO { // 工作流启动参数 @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String wfInitParams; @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String dag; @Lob @Column - @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java deleted file mode 100644 index 543fd8a0..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java +++ /dev/null @@ -1,7 +0,0 @@ -@TypeDefs({ - @TypeDef(name = TypeDefConstant.STRING_TYPE, typeClass = org.hibernate.type.StringType.class) -}) -package com.github.kfcfans.powerjob.server.persistence.core.model; - -import org.hibernate.annotations.TypeDef; -import org.hibernate.annotations.TypeDefs; \ No newline at end of file diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 3e5c41f4..6399a72f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -81,7 +81,7 @@ public class DispatchService { if (maxInstanceNum > 0) { // 这个 runningInstanceCount 已经包含了本 instance - // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务显然不应该统计进去(比如 delay 是 1 天) + // 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天) long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); // 超出最大同时运行限制,不执行调度 if (runningInstanceCount > maxInstanceNum) { From c176a447e72845eef947a4fc1cdfaa77dc6e1c36 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 20 Dec 2020 22:53:56 +0800 Subject: [PATCH 03/24] feat: support LogLevel --- .../kfcfans/powerjob/common/LogLevel.java | 34 +++++++++++++++++++ .../common/model/InstanceLogContent.java | 2 ++ .../persistence/local/LocalInstanceLogDO.java | 4 +++ .../server/service/InstanceLogService.java | 9 +++-- .../worker/background/OmsLogHandler.java | 5 +-- .../powerjob/worker/log/OmsLogger.java | 2 +- .../worker/log/impl/OmsServerLogger.java | 31 ++++++----------- 7 files changed, 62 insertions(+), 25 deletions(-) create mode 100644 powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java new file mode 100644 index 00000000..58c1c920 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/LogLevel.java @@ -0,0 +1,34 @@ +package com.github.kfcfans.powerjob.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +import java.util.Objects; + +/** + * 日志级别 + * + * @author tjq + * @since 12/20/20 + */ +@Getter +@AllArgsConstructor +public enum LogLevel { + + DEBUG(1), + INFO(2), + WARN(3), + ERROR(4); + + private final int v; + + public static String genLogLevelString(Integer v) { + + for (LogLevel logLevel : values()) { + if (Objects.equals(logLevel.v, v)) { + return logLevel.name(); + } + } + return "UNKNOWN"; + } +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java index a22db900..0be65e93 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceLogContent.java @@ -20,6 +20,8 @@ public class InstanceLogContent implements OmsSerializable { private long instanceId; // 日志提交时间 private long logTime; + // 级别 + private int logLevel; // 日志内容 private String logContent; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java index 040f8b2b..e9e5028a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/local/LocalInstanceLogDO.java @@ -28,6 +28,10 @@ public class LocalInstanceLogDO { * 日志时间 */ private Long logTime; + /** + * 日志级别 {@link com.github.kfcfans.powerjob.common.LogLevel} + */ + private Integer logLevel; /** * 日志内容 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index b197d31e..18fde5c4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.service; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.InstanceLogContent; @@ -320,12 +321,16 @@ public class InstanceLogService { /** - * 拼接日志 -> 2020-04-29 22:07:10.059 192.168.1.1:2777 INFO XXX + * 拼接日志 -> 2020-04-29 22:07:10.059 [192.168.1.1:2777] INFO XXX * @param instanceLog 日志对象 * @return 字符串 */ private static String convertLog(LocalInstanceLogDO instanceLog) { - return String.format("%s [%s] -%s", dateFormat.format(instanceLog.getLogTime()), instanceLog.getWorkerAddress(), instanceLog.getLogContent()); + return String.format("%s [%s] %s %s", + dateFormat.format(instanceLog.getLogTime()), + instanceLog.getWorkerAddress(), + LogLevel.genLogLevelString(instanceLog.getLogLevel()), + instanceLog.getLogContent()); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java index 0ad1261e..40aed030 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/OmsLogHandler.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.worker.background; import akka.actor.ActorSelection; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.InstanceLogContent; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; @@ -48,14 +49,14 @@ public class OmsLogHandler { * @param instanceId 任务实例ID * @param logContent 日志内容 */ - public void submitLog(long instanceId, String logContent) { + public void submitLog(long instanceId, LogLevel logLevel, String logContent) { if (logQueue.size() > REPORT_SIZE) { // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁 new Thread(logSubmitter).start(); } - InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent); + InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent); logQueue.offer(tuple); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java index 0ecc4e90..77074c18 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/OmsLogger.java @@ -1,7 +1,7 @@ package com.github.kfcfans.powerjob.worker.log; /** - * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 * * @author tjq * @since 2020/4/21 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java index 50021cc2..38d7c1b7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/log/impl/OmsServerLogger.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.worker.log.impl; +import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.worker.background.OmsLogHandler; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import lombok.AllArgsConstructor; @@ -9,7 +10,7 @@ import org.slf4j.helpers.MessageFormatter; /** - * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 * * @author tjq * @since 2020/4/21 @@ -19,45 +20,35 @@ public class OmsServerLogger implements OmsLogger { private final long instanceId; - // Level|业务方自身的日志 - private static final String LOG_PREFIX = "{} "; - @Override public void debug(String messagePattern, Object... args) { - process("DEBUG", messagePattern, args); + process(LogLevel.DEBUG, messagePattern, args); } @Override public void info(String messagePattern, Object... args) { - process("INFO", messagePattern, args); + process(LogLevel.INFO, messagePattern, args); } @Override public void warn(String messagePattern, Object... args) { - process("WARN", messagePattern, args); + process(LogLevel.WARN, messagePattern, args); } @Override public void error(String messagePattern, Object... args) { - process("ERROR", messagePattern, args); + process(LogLevel.ERROR, messagePattern, args); } /** * 生成日志内容 - * @param level 级别,DEBUG/INFO/WARN/ERROR * @param messagePattern 日志格式 * @param arg 填充参数 * @return 生成完毕的日志内容 */ - private static String genLog(String level, String messagePattern, Object... arg) { - - String pattern = LOG_PREFIX + messagePattern; - Object[] newArgs = new Object[arg.length + 1]; - newArgs[0] = level; - System.arraycopy(arg, 0, newArgs, 1, arg.length); - + private static String genLogContent(String messagePattern, Object... arg) { // 借用 Slf4J 直接生成日志信息 - FormattingTuple formattingTuple = MessageFormatter.arrayFormat(pattern, newArgs); + FormattingTuple formattingTuple = MessageFormatter.arrayFormat(messagePattern, arg); if (formattingTuple.getThrowable() != null) { String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable()); return formattingTuple.getMessage() + System.lineSeparator() + stackTrace; @@ -66,9 +57,9 @@ public class OmsServerLogger implements OmsLogger { } } - private void process(String level, String messagePattern, Object... args) { - String logContent = genLog(level, messagePattern, args); - OmsLogHandler.INSTANCE.submitLog(instanceId, logContent); + private void process(LogLevel level, String messagePattern, Object... args) { + String logContent = genLogContent(messagePattern, args); + OmsLogHandler.INSTANCE.submitLog(instanceId, level, logContent); } } \ No newline at end of file From e8ada3789f1d07c16990afb3566d18dabead6474 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 20 Dec 2020 23:14:44 +0800 Subject: [PATCH 04/24] fix: can't download log file when sever and browser are not in the same network --- .../web/controller/InstanceController.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index ae36a78a..61d806f2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -17,6 +17,7 @@ import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest; import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO; import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; @@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import java.io.File; +import java.net.URL; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -92,6 +94,24 @@ public class InstanceController { OmsFileUtils.file2HttpResponse(file, response); } + @GetMapping("/downloadLog4Console") + public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) throws Exception { + // 获取内部下载链接 + String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId); + // 先下载到本机 + String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId); + File logFile = new File(logFilePath); + + try { + FileUtils.copyURLToFile(new URL(downloadUrl), logFile); + + // 再推送到浏览器 + OmsFileUtils.file2HttpResponse(logFile, response); + } finally { + FileUtils.forceDelete(logFile); + } + } + @PostMapping("/list") public ResultDTO> list(@RequestBody QueryInstanceRequest request) { From 4f3b6057b6a5fd831538a375d3f306b238d14829 Mon Sep 17 00:00:00 2001 From: jjnnzb Date: Sun, 27 Dec 2020 15:58:16 +0800 Subject: [PATCH 05/24] Swagger API document version keep update-to-date with pom.xml version. --- powerjob-server/pom.xml | 1 + .../server/common/config/SwaggerConfig.java | 15 +++++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 80a62d2e..ad6e673e 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -214,6 +214,7 @@ + build-info repackage diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index 804e45ce..6334df98 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.common.config; +import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; @@ -8,18 +9,24 @@ import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; +import javax.annotation.Resource; + import static springfox.documentation.builders.PathSelectors.any; /** * Swagger UI 配置 * * @author tjq + * @author Jiang Jining * @since 2020/3/29 */ @Configuration @EnableSwagger2 public class SwaggerConfig { - + + @Resource + private BuildProperties buildProperties; + @Bean public Docket createRestApi() { // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 @@ -28,9 +35,9 @@ public class SwaggerConfig { .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") .termsOfServiceUrl("https://github.com/KFCFans/PowerJob") - .version("3.3.3") + .version(buildProperties.getVersion()) .build(); - + return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo) // select()函数返回一个ApiSelectorBuilder实例 @@ -39,5 +46,5 @@ public class SwaggerConfig { .paths(any()) .build(); } - + } From d4af8138d0afd167edaad17022502ad7bfbfc38c Mon Sep 17 00:00:00 2001 From: jjnnzb Date: Sun, 27 Dec 2020 21:53:36 +0800 Subject: [PATCH 06/24] Prevent codes from reporting error when pom.xml is not reinstalled. --- .../server/common/config/SwaggerConfig.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index 6334df98..296e57fd 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.common.config; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,12 +10,10 @@ import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; -import javax.annotation.Resource; - import static springfox.documentation.builders.PathSelectors.any; /** - * Swagger UI 配置 + * Configuration class for Swagger UI. * * @author tjq * @author Jiang Jining @@ -24,18 +23,25 @@ import static springfox.documentation.builders.PathSelectors.any; @EnableSwagger2 public class SwaggerConfig { - @Resource - private BuildProperties buildProperties; + private final BuildProperties buildProperties; + + public SwaggerConfig(@Autowired(required = false) final BuildProperties buildProperties) { + this.buildProperties = buildProperties; + } @Bean public Docket createRestApi() { + String version = "3.3.3"; + if (buildProperties != null) { + version = buildProperties.getVersion(); + } // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() .title("PowerJob") .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") .termsOfServiceUrl("https://github.com/KFCFans/PowerJob") - .version(buildProperties.getVersion()) + .version(version) .build(); return new Docket(DocumentationType.SWAGGER_2) From 5329fba6b07d12312f585aa3dc94bd4d5a3c6d85 Mon Sep 17 00:00:00 2001 From: jjnnzb Date: Mon, 28 Dec 2020 23:19:18 +0800 Subject: [PATCH 07/24] Set deafult version to Unknown and trim version info. --- .../kfcfans/powerjob/server/common/config/SwaggerConfig.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index 296e57fd..694c83b8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.server.common.config; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; @@ -31,9 +32,9 @@ public class SwaggerConfig { @Bean public Docket createRestApi() { - String version = "3.3.3"; + String version = "Unknown"; if (buildProperties != null) { - version = buildProperties.getVersion(); + version = StringUtils.trimToEmpty(buildProperties.getVersion()); } // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() From ef881cfcac315753ff494d6c301293716614fad9 Mon Sep 17 00:00:00 2001 From: jjnnzb Date: Tue, 29 Dec 2020 07:46:07 +0800 Subject: [PATCH 08/24] Change version judgement info. --- .../kfcfans/powerjob/server/common/config/SwaggerConfig.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index 694c83b8..f4411d09 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -34,7 +34,10 @@ public class SwaggerConfig { public Docket createRestApi() { String version = "Unknown"; if (buildProperties != null) { - version = StringUtils.trimToEmpty(buildProperties.getVersion()); + String pomVersion = buildProperties.getVersion(); + if (StringUtils.isNotBlank(pomVersion)) { + version = pomVersion; + } } // apiInfo()用来创建该Api的基本信息(这些基本信息会展现在文档页面中 ApiInfo apiInfo = new ApiInfoBuilder() From 6ae24ac9ed24f649263ad22fa38158962883ed1b Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Jan 2021 12:33:52 +0800 Subject: [PATCH 09/24] fix: OpenAPI can't save workflow --- .../powerjob/server/web/controller/OpenAPIController.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index 7ee1cc58..a9bf3968 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -122,9 +122,6 @@ public class OpenAPIController { /* ************* Workflow 区 ************* */ @PostMapping(OpenAPIConstant.SAVE_WORKFLOW) public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) throws Exception { - if (request.getId() != null) { - checkJobIdValid(request.getId(), request.getAppId()); - } return ResultDTO.success(workflowService.saveWorkflow(request)); } From 2b7936b39b39fb8d739c0b75e9217c3d08a65254 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Jan 2021 12:53:51 +0800 Subject: [PATCH 10/24] docs: update readme --- README.md | 113 ++++++++++++++++++++++++++----------------------- README_enUS.md | 83 ------------------------------------ README_zhCN.md | 78 ++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 135 deletions(-) delete mode 100644 README_enUS.md create mode 100644 README_zhCN.md diff --git a/README.md b/README.md index 5800098a..fa40f09d 100644 --- a/README.md +++ b/README.md @@ -1,76 +1,85 @@ -

+English | [简体中文](./README_zhCN.md) + +

PowerJob

-

+

actions Maven Central GitHub release (latest SemVer) LICENSE

-PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,能让您轻松完成作业的调度与繁杂任务的分布式计算。 -# 简介 -### 主要特性 -* 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。 -* 定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。 -* 执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。 -* DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递 -* 执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。 -* 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。 -* 依赖精简:最小仅依赖关系型数据库(MySQL/Oracle/MS SQLServer...),扩展依赖为MongoDB(用于存储庞大的在线日志)。 -* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 -* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 +- Have you ever wondered how cron jobs could be organized orderly? +- Have you ever felt upset when scheduling tasks suddenly terminated without any warning? +- Have you ever felt helpless when batches of business tasks require handling? +- Have you ever felt depressed about tasks that carry with complex dependencies? -### 适用场景 -* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 -* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 -* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 -* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。 +Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you! -### 设计目标 -PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 +# Introduction -### 在线试用 -试用地址:[try.powerjob.tech](http://try.powerjob.tech/) -试用应用名称:powerjob-agent-test -控制台密码:123 +### Features +- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online. +- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API. +- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability. +- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow. +- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on. +- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online. +- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need. +- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally. +- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly. -[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) +### Applicable scenes -### 同类产品对比 -| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | -| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ | -| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** | -| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | **内置Java、外置Java(容器)、Shell、Python等脚本** | -| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** | -| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | -| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | -| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | -| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** | -| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | -| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | +- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time. +- Scenarios that require all machines to run tasks simultaneously: such as log cleanup. +- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster. +- Scenarios with delayed tasks: For instance, disposal of overdue orders. +### Design goals -# 官方文档 -**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** +PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center, +all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker. -**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** +### Online trial -PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! +Trial address: [Online Trial Address](http://try.powerjob.tech/) +Application name: powerjob-agent-test +Application password: 123 -# 接入登记 -[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6) +### Comparison with similar products -ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ +| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | +| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | +| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** | +| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** | +| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** | +| Online task management | Unsupported | Supported | Supported | **Supported** | +| Online logging | Unsupported | Supported | Unsupported | **Supported** | +| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** | +| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** | +| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | +| workflow | Unsupported | Unsupported | Supported | **Supported** | + +# Document +**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)** -

+**[中文文档](https://www.yuque.com/powerjob/product)** + +# User Registration +[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6) +ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ +

PowerJob User

-# 其他 -* 开源许可证:Apache License, Version 2.0 -* 欢迎共同参与本项目的贡献,PR和Issue都大大滴欢迎(求求了)~ -* 觉得还不错的话,可以点个Star支持一下哦~ = ̄ω ̄= -* 联系方式@KFCFans -> `tengjiqi@gmail.com` -* 用户交流QQ群:487453839 \ No newline at end of file + +# Others + +- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production! +- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs! +- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious. +- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ = +- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly. \ No newline at end of file diff --git a/README_enUS.md b/README_enUS.md deleted file mode 100644 index 3cbcad04..00000000 --- a/README_enUS.md +++ /dev/null @@ -1,83 +0,0 @@ -

-PowerJob -

- -

-actions -Maven Central -GitHub release (latest SemVer) -LICENSE -

- -- Have you ever wondered how cron jobs could be organized orderly? -- Have you ever felt upset when scheduling tasks suddenly terminated without any warning? -- Have you ever felt helpless when batches of business tasks require handling? -- Have you ever felt depressed about tasks that carry with complex dependencies? - -Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you! - -# Introduction - -### Features -- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online. -- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API. -- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability. -- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow. -- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on. -- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online. -- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need. -- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally. -- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly. - -### Applicable scenes - -- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time. -- Scenarios that require all machines to run tasks simultaneously: such as log cleanup. -- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster. -- Scenarios with delayed tasks: For instance, disposal of overdue orders. - -### Design goals - -PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center, -all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker. - -### Online trial - -Trial address: [Online Trial Address](http://try.powerjob.tech/) -Application name: powerjob-agent-test -Application password: 123 - -### Comparison with similar products - -| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | -| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** | -| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** | -| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** | -| Online task management | Unsupported | Supported | Supported | **Supported** | -| Online logging | Unsupported | Supported | Unsupported | **Supported** | -| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** | -| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** | -| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** | -| workflow | Unsupported | Unsupported | Supported | **Supported** | - -# Document -**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)** - -**[中文文档](https://www.yuque.com/powerjob/product)** - -# User Registration -[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6) -ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ -

-PowerJob User -

- - -# Others - -- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production! -- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs! -- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious. -- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ = -- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly. \ No newline at end of file diff --git a/README_zhCN.md b/README_zhCN.md new file mode 100644 index 00000000..1482418b --- /dev/null +++ b/README_zhCN.md @@ -0,0 +1,78 @@ +[English](./README.md) | 简体中文 + +

+PowerJob +

+ +

+actions +Maven Central +GitHub release (latest SemVer) +LICENSE +

+ +PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,能让您轻松完成作业的调度与繁杂任务的分布式计算。 +# 简介 +### 主要特性 +* 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。 +* 定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。 +* 执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。 +* DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递 +* 执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。 +* 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。 +* 依赖精简:最小仅依赖关系型数据库(MySQL/Oracle/MS SQLServer...),扩展依赖为MongoDB(用于存储庞大的在线日志)。 +* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 +* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 + +### 适用场景 +* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 +* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 +* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 +* 有需要**延迟执行**某些任务的业务场景:比如订单过期处理等。 + +### 设计目标 +PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 + +### 在线试用 +试用地址:[try.powerjob.tech](http://try.powerjob.tech/) +试用应用名称:powerjob-agent-test +控制台密码:123 + +[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) + +### 同类产品对比 +| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | +| -------------- | ------------------------ | ---------------------------------------- | ------------------------------------------------- | ------------------------------------------------------------ | +| 定时类型 | CRON | CRON | CRON、固定频率、固定延迟、OpenAPI | **CRON、固定频率、固定延迟、OpenAPI** | +| 任务类型 | 内置Java | 内置Java、GLUE Java、Shell、Python等脚本 | 内置Java、外置Java(FatJar)、Shell、Python等脚本 | **内置Java、外置Java(容器)、Shell、Python等脚本** | +| 分布式计算 | 无 | 静态分片 | MapReduce动态分片 | **MapReduce动态分片** | +| 在线任务治理 | 不支持 | 支持 | 支持 | **支持** | +| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** | +| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** | +| 报警监控 | 无 | 邮件 | 短信 | **WebHook、邮件、钉钉与自定义扩展** | +| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币 | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** | +| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** | + + +# 官方文档 +**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** + +**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** + +PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! + +# 接入登记 +[点击进行接入登记,为 PowerJob 的发展贡献自己的力量!](https://github.com/KFCFans/PowerJob/issues/6) + +ღ( ´・ᴗ・\` )ღ 感谢以下接入用户的大力支持 ღ( ´・ᴗ・\` )ღ + +

+PowerJob User +

+ +# 其他 +* 开源许可证:Apache License, Version 2.0 +* 欢迎共同参与本项目的贡献,PR和Issue都大大滴欢迎(求求了)~ +* 觉得还不错的话,可以点个Star支持一下哦~ = ̄ω ̄= +* 联系方式@KFCFans -> `tengjiqi@gmail.com` +* 用户交流QQ群:487453839 \ No newline at end of file From 83dae8ddd30bb449f29d37c021d2c34ae2811a62 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Jan 2021 12:55:35 +0800 Subject: [PATCH 11/24] docs: update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fa40f09d..4e82e41b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ English | [简体中文](./README_zhCN.md) -

+

PowerJob

From 1b68fdb7d7d6d5546dcfa3b3105b9bbaa7859c9f Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Jan 2021 12:56:51 +0800 Subject: [PATCH 12/24] docs: update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4e82e41b..83536535 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ English | [简体中文](./README_zhCN.md) PowerJob

-

+

actions Maven Central GitHub release (latest SemVer) From 269d64065cc85904cf009bdeadf2fda14266588a Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 1 Jan 2021 13:00:44 +0800 Subject: [PATCH 13/24] docs: update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 83536535..f0115858 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ Application password: 123 | workflow | Unsupported | Unsupported | Supported | **Supported** | # Document -**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)** +**[Docs](https://www.yuque.com/powerjob/en/introduce)** **[中文文档](https://www.yuque.com/powerjob/product)** From 0e77a23e76e0e30f6839cfe4d814350fefb13e57 Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Fri, 1 Jan 2021 14:17:54 +0800 Subject: [PATCH 14/24] =?UTF-8?q?fix=E5=B9=B6=E5=8F=91=E6=83=85=E5=86=B5?= =?UTF-8?q?=E4=B8=8Bserver=E7=AB=AF=E5=88=A0=E9=99=A4=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=97=B6=E6=95=B0=E6=8D=AE=E5=B7=B2=E7=BB=8F=E8=A2=AB=E5=85=B6?= =?UTF-8?q?=E4=BB=96server=E5=88=A0=E9=99=A4=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/mongodb/GridFsManager.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java index 71d7d8b8..da4f8449 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.persistence.mongodb; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; +import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.mongodb.client.MongoDatabase; @@ -38,6 +39,8 @@ public class GridFsManager implements InitializingBean { @Resource private Environment environment; + @Resource + private LockService lockService; private MongoDatabase db; private boolean available; @@ -107,9 +110,22 @@ public class GridFsManager implements InitializingBean { * @param day 日期偏移量,单位 天 */ public void deleteBefore(String bucketName, int day) { + String deleteFsLock = "deleteFsLock"; + // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 + boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000); + if (!lock) { + log.info("[GridFsManager] deleted task is running, it's ok to return."); + return; + } + try{ + deleteHistoryFile(bucketName, day); + }finally { + lockService.unlock(deleteFsLock); + } + } + private void deleteHistoryFile(String bucketName, int day) { Stopwatch sw = Stopwatch.createStarted(); - Date date = DateUtils.addDays(new Date(), -day); GridFSBucket bucket = getBucket(bucketName); Bson filter = Filters.lt("uploadDate", date); From 3ecefd22cb4b0fb7e331b6da743f19172832dc32 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 11:31:48 +0800 Subject: [PATCH 15/24] fix: the bug of idle check #146 --- .../server/service/workflow/WorkflowInstanceManager.java | 2 +- .../worker/core/tracker/processor/ProcessorTracker.java | 8 ++++++-- .../powerjob/worker/core/tracker/task/TaskTracker.java | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index a10b8a92..adf8cc4c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -314,7 +314,7 @@ public class WorkflowInstanceManager { } /** - * 允许任务实例 + * 运行任务实例 * 需要将创建和运行任务实例分离,否则在秒失败情况下,会发生DAG覆盖更新的问题 * @param jobId 任务ID * @param instanceId 任务实例ID diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index e12f48cc..15876cb6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -55,8 +55,10 @@ public class ProcessorTracker { private OmsLogger omsLogger; // ProcessResult 上报失败的重试队列 private Queue statusReportRetryQueue; - // 上一次空闲时间 + // 上一次空闲时间(用于闲置判定) private long lastIdleTime; + // 上次完成任务数量(用于闲置判定) + private long lastCompletedTaskCount; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -88,6 +90,7 @@ public class ProcessorTracker { this.omsLogger = new OmsServerLogger(instanceId); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; + this.lastCompletedTaskCount = 0L; // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -239,8 +242,9 @@ public class ProcessorTracker { } // 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查 - if (threadPool.getActiveCount() > 0) { + if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) { lastIdleTime = -1; + lastCompletedTaskCount = threadPool.getCompletedTaskCount(); }else { if (lastIdleTime == -1) { lastIdleTime = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 55c38634..15827976 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -446,7 +446,7 @@ public abstract class TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; - long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2; + long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); // 4. 循环查询数据库,获取需要派发的任务 From ead9f08e5285ff3ee8bc9b5c0a181662637ad0f5 Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 2 Jan 2021 12:38:19 +0800 Subject: [PATCH 16/24] =?UTF-8?q?=E5=9B=9E=E6=BB=9AGridFsManager=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8A=8A=E9=94=81=E5=8A=A0=E5=9C=A8CleanServ?= =?UTF-8?q?ice=E4=B8=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../persistence/mongodb/GridFsManager.java | 18 +----------------- .../server/service/timing/CleanService.java | 12 ++++++++++++ 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java index da4f8449..71d7d8b8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/mongodb/GridFsManager.java @@ -1,7 +1,6 @@ package com.github.kfcfans.powerjob.server.persistence.mongodb; import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey; -import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.mongodb.client.MongoDatabase; @@ -39,8 +38,6 @@ public class GridFsManager implements InitializingBean { @Resource private Environment environment; - @Resource - private LockService lockService; private MongoDatabase db; private boolean available; @@ -110,22 +107,9 @@ public class GridFsManager implements InitializingBean { * @param day 日期偏移量,单位 天 */ public void deleteBefore(String bucketName, int day) { - String deleteFsLock = "deleteFsLock"; - // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 - boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000); - if (!lock) { - log.info("[GridFsManager] deleted task is running, it's ok to return."); - return; - } - try{ - deleteHistoryFile(bucketName, day); - }finally { - lockService.unlock(deleteFsLock); - } - } - private void deleteHistoryFile(String bucketName, int day) { Stopwatch sw = Stopwatch.createStarted(); + Date date = DateUtils.addDays(new Date(), -day); GridFSBucket bucket = getBucket(bucketName); Bson filter = Filters.lt("uploadDate", date); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index f180ce3d..837917fe 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; +import com.github.kfcfans.powerjob.server.service.lock.LockService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -36,6 +37,8 @@ public class CleanService { private InstanceInfoRepository instanceInfoRepository; @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + @Resource + private LockService lockService; @Value("${oms.instanceinfo.retention}") private int instanceInfoRetentionDay; @@ -113,11 +116,20 @@ public class CleanService { return; } if (gridFsManager.available()) { + String deleteFsLock = "deleteFsLock"; + // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 + boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000); + if (!lock) { + log.info("[GridFsManager] deleted task is running, it's ok to return."); + return; + } Stopwatch stopwatch = Stopwatch.createStarted(); try { gridFsManager.deleteBefore(bucketName, day); }catch (Exception e) { log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e); + }finally { + lockService.unlock(deleteFsLock); } log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop()); } From 05181c34ecca4fde1976442b30440bff7105de2c Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 2 Jan 2021 12:39:36 +0800 Subject: [PATCH 17/24] =?UTF-8?q?=E5=9B=9E=E6=BB=9AGridFsManager=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=8A=8A=E9=94=81=E5=8A=A0=E5=9C=A8CleanServ?= =?UTF-8?q?ice=E4=B8=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kfcfans/powerjob/server/service/timing/CleanService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 837917fe..889fb207 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -120,7 +120,7 @@ public class CleanService { // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000); if (!lock) { - log.info("[GridFsManager] deleted task is running, it's ok to return."); + log.info("[GridFsManager] deleted task is running, just return."); return; } Stopwatch stopwatch = Stopwatch.createStarted(); From 9a661aa177745442e97c4280dc5d18a093ac1c03 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 13:14:25 +0800 Subject: [PATCH 18/24] fix: DatabaseLock can't unlock when timeout --- .../service/lock/DatabaseLockService.java | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java index 19708f67..22f3d0ee 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/lock/DatabaseLockService.java @@ -4,14 +4,11 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; -import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * 基于数据库实现的分布式锁 @@ -26,35 +23,27 @@ public class DatabaseLockService implements LockService { @Resource private OmsLockRepository omsLockRepository; - private Map lockName2FailedTimes = Maps.newConcurrentMap(); - private static final int MAX_FAILED_NUM = 5; - @Override public boolean lock(String name, long maxLockTime) { - AtomicInteger failedCount = lockName2FailedTimes.computeIfAbsent(name, ignore -> new AtomicInteger(0)); OmsLockDO newLock = new OmsLockDO(name, NetUtils.getLocalHost(), maxLockTime); try { omsLockRepository.saveAndFlush(newLock); - failedCount.set(0); return true; - }catch (DataIntegrityViolationException ignore) { - }catch (Exception e) { + } catch (DataIntegrityViolationException ignore) { + } catch (Exception e) { log.warn("[DatabaseLockService] write lock to database failed, lockName = {}.", name, e); } - // 连续失败一段时间,需要判断是否为锁释放失败的情况 - if (failedCount.incrementAndGet() > MAX_FAILED_NUM) { + OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); + long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); - OmsLockDO omsLockDO = omsLockRepository.findByLockName(name); - long lockedMillions = System.currentTimeMillis() - omsLockDO.getGmtCreate().getTime(); - if (lockedMillions > omsLockDO.getMaxLockTime()) { + // 锁超时,强制释放锁并重新尝试获取 + if (lockedMillions > omsLockDO.getMaxLockTime()) { - log.warn("[DatabaseLockService] The lock({}) already timeout, will be deleted now.", omsLockDO); - unlock(name); - } else { - failedCount.set(0); - } + log.warn("[DatabaseLockService] The lock[{}] already timeout, will be unlocked now.", omsLockDO); + unlock(name); + return lock(name, maxLockTime); } return false; } From 0daa097d20e8a657f8b3c2b9056a5989453ac893 Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 2 Jan 2021 13:28:55 +0800 Subject: [PATCH 19/24] =?UTF-8?q?=E6=A0=B9=E6=8D=AE=E6=84=8F=E8=A7=81?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/service/timing/CleanService.java | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 889fb207..274a5f59 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -54,6 +54,8 @@ public class CleanService { // 每天凌晨3点定时清理 private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?"; + private static final String HISTORY_DELETE_LOCK = "history_delete"; + @Async("omsTimingPool") @Scheduled(cron = CLEAN_TIME_EXPRESSION) @@ -62,18 +64,35 @@ public class CleanService { // 释放本地缓存 WorkerManagerService.cleanUp(); - // 删除数据库运行记录 - cleanInstanceLog(); - cleanWorkflowInstanceLog(); - // 释放磁盘空间 cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); - // 删除 GridFS 过期文件 - cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); - cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); + // 删除数据库历史的数据 + cleanOneServer(); + } + + /** + * 只能一台server清理的操作统一到这里执行 + */ + private void cleanOneServer() { + // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 + boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); + if (!lock) { + log.info("[GridFsManager] deleted task is running, just return."); + return; + } + try { + // 删除数据库运行记录 + cleanInstanceLog(); + cleanWorkflowInstanceLog(); + // 删除 GridFS 过期文件 + cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); + cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); + }finally { + lockService.unlock(HISTORY_DELETE_LOCK); + } } @VisibleForTesting @@ -116,20 +135,11 @@ public class CleanService { return; } if (gridFsManager.available()) { - String deleteFsLock = "deleteFsLock"; - // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 - boolean lock = lockService.lock(deleteFsLock, 10 * 60 * 1000); - if (!lock) { - log.info("[GridFsManager] deleted task is running, just return."); - return; - } Stopwatch stopwatch = Stopwatch.createStarted(); try { gridFsManager.deleteBefore(bucketName, day); }catch (Exception e) { log.warn("[CleanService] clean remote bucket({}) failed.", bucketName, e); - }finally { - lockService.unlock(deleteFsLock); } log.info("[CleanService] clean remote bucket({}) successfully, using {}.", bucketName, stopwatch.stop()); } From 3776d4ad843585178926ae2748d4583b8cead1b8 Mon Sep 17 00:00:00 2001 From: ocean23 <43363687@qq.com> Date: Sat, 2 Jan 2021 13:30:46 +0800 Subject: [PATCH 20/24] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kfcfans/powerjob/server/service/timing/CleanService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 274a5f59..71f80475 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -80,7 +80,7 @@ public class CleanService { // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); if (!lock) { - log.info("[GridFsManager] deleted task is running, just return."); + log.info("task is already running, just return."); return; } try { From 6a85995937f0f66f9908cbc89c3571aa01d9231b Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 19:52:22 +0800 Subject: [PATCH 21/24] refactor: optimize thread pool config --- .../powerjob/server/common/config/ThreadPoolConfig.java | 4 ++-- .../server/service/timing/InstanceStatusCheckService.java | 4 ++-- .../server/service/timing/schedule/OmsScheduleService.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java index 5762f0ce..6ec3f08f 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/ThreadPoolConfig.java @@ -28,8 +28,8 @@ public class ThreadPoolConfig { @Bean("omsTimingPool") public Executor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 16); - executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 32); + executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); + executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // use SynchronousQueue executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index ddea791e..8ba71733 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -60,7 +60,7 @@ public class InstanceStatusCheckService { private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Async("omsTimingPool") - @Scheduled(fixedRate = 10000) + @Scheduled(fixedDelay = 10000) public void timingStatusCheck() { Stopwatch stopwatch = Stopwatch.createStarted(); @@ -115,7 +115,7 @@ public class InstanceStatusCheckService { threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS; List waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold); if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) { - log.warn("[InstanceStatusChecker] instances({}) didn't receive any reply from worker.", waitingWorkerReceiveInstances); + log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances); waitingWorkerReceiveInstances.forEach(instance -> { // 重新派发 JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index 144af178..c8a4e383 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -133,7 +133,7 @@ public class OmsScheduleService { // 1. 批量写日志表 Map jobId2InstanceId = Maps.newHashMap(); - log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); + log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos); jobInfos.forEach(jobInfo -> { Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), null, null, jobInfo.getNextTriggerTime()); From 52ea3fb80d4d0acf0771058e751dafaa51b10320 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 20:06:18 +0800 Subject: [PATCH 22/24] fix: the bug of concurrent clean the same thing #144 --- .../powerjob/server/service/timing/CleanService.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 71f80475..e8eaf305 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -54,7 +54,7 @@ public class CleanService { // 每天凌晨3点定时清理 private static final String CLEAN_TIME_EXPRESSION = "0 0 3 * * ?"; - private static final String HISTORY_DELETE_LOCK = "history_delete"; + private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; @Async("omsTimingPool") @@ -70,17 +70,17 @@ public class CleanService { cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); // 删除数据库历史的数据 - cleanOneServer(); + cleanByOneServer(); } /** * 只能一台server清理的操作统一到这里执行 */ - private void cleanOneServer() { + private void cleanByOneServer() { // 只要第一个server抢到锁其他server就会返回,所以锁10分钟应该足够了 boolean lock = lockService.lock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); if (!lock) { - log.info("task is already running, just return."); + log.info("[CleanService] clean job is already running, just return."); return; } try { @@ -90,7 +90,7 @@ public class CleanService { // 删除 GridFS 过期文件 cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); - }finally { + } finally { lockService.unlock(HISTORY_DELETE_LOCK); } } @@ -113,7 +113,7 @@ public class CleanService { } // 计算最大偏移量 - long maxOffset = day * 24 * 60 * 60 * 1000; + long maxOffset = day * 24 * 60 * 60 * 1000L; for (File f : logFiles) { long offset = System.currentTimeMillis() - f.lastModified(); From dfd2106a3f142b8c09815f6e9ff7a5947d09dda4 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 20:10:04 +0800 Subject: [PATCH 23/24] feat: Swagger API version keep up-to-date with POM version #139 --- .../kfcfans/powerjob/server/common/config/SwaggerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java index f4411d09..68878f1e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/config/SwaggerConfig.java @@ -32,7 +32,7 @@ public class SwaggerConfig { @Bean public Docket createRestApi() { - String version = "Unknown"; + String version = "unknown"; if (buildProperties != null) { String pomVersion = buildProperties.getVersion(); if (StringUtils.isNotBlank(pomVersion)) { From 6c3c6695e4fa8d38064aa9c2147cdd95800dc47a Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 2 Jan 2021 20:15:09 +0800 Subject: [PATCH 24/24] refactor: change version to 3.4.2 and ready to release --- powerjob-client/pom.xml | 4 ++-- powerjob-common/pom.xml | 2 +- powerjob-server/pom.xml | 4 ++-- powerjob-worker-agent/pom.xml | 4 ++-- powerjob-worker-samples/pom.xml | 4 ++-- powerjob-worker-spring-boot-starter/pom.xml | 4 ++-- powerjob-worker/pom.xml | 4 ++-- 7 files changed, 13 insertions(+), 13 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index d49a6a17..8a8a6621 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-client - 3.4.1 + 3.4.2 jar 5.6.1 1.2.68 - 3.4.1 + 3.4.2 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index e92f4a92..b1e2d8e3 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.4.1 + 3.4.2 jar diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index ad6e673e..83885a05 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.4.1 + 3.4.2 jar 2.9.2 2.3.4.RELEASE - 3.4.1 + 3.4.2 8.0.19 19.7.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 817bffdb..94ea9386 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.4.1 + 3.4.2 jar - 3.4.1 + 3.4.2 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index cdacbb5e..6db5e26f 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.4.1 + 3.4.2 2.2.6.RELEASE - 3.4.1 + 3.4.2 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index dfddbb77..8a6f7749 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.4.1 + 3.4.2 jar - 3.4.1 + 3.4.2 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 9a73566b..e3dc356d 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.4.1 + 3.4.2 jar 5.2.4.RELEASE - 3.4.1 + 3.4.2 1.4.200 3.4.2 5.6.1