chore: merge master

This commit is contained in:
tjq 2023-01-16 00:14:30 +08:00
commit 59121684a8
177 changed files with 3435 additions and 1787 deletions

View File

@ -69,11 +69,9 @@ CREATE TABLE `instance_info`
`gmt_create` datetime not NULL COMMENT '创建时间',
`gmt_modified` datetime not NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx01_instance_info` (`job_id`),
KEY `idx02_instance_info` (`app_id`),
KEY `idx03_instance_info` (`instance_id`),
KEY `idx04_instance_info` (`wf_instance_id`),
KEY `idx05_instance_info` (`expected_trigger_time`)
KEY `idx01_instance_info` (`job_id`, `status`),
KEY `idx02_instance_info` (`app_id`, `status`),
KEY `idx03_instance_info` (`instance_id`, `status`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4
@ -110,15 +108,13 @@ CREATE TABLE `job_info`
`task_retry_num` int not NULL default 0 COMMENT 'Task重试次数',
`time_expression` varchar(255) default NULL COMMENT '时间表达式,内容取决于time_expression_type,1:CRON/2:NULL/3:LONG/4:LONG',
`time_expression_type` int not NULL COMMENT '时间表达式类型,1:CRON/2:API/3:FIX_RATE/4:FIX_DELAY,5:WORKFLOW\n',
`tag` varchar(255) DEFAULT NULL COMMENT 'TAG',
`log_config` varchar(255) DEFAULT NULL COMMENT '日志配置',
`tag` varchar(255) DEFAULT NULL COMMENT 'TAG',
`log_config` varchar(255) DEFAULT NULL COMMENT '日志配置',
`extra` varchar(255) DEFAULT NULL COMMENT '扩展字段',
`gmt_create` datetime not NULL COMMENT '创建时间',
`gmt_modified` datetime not NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx01_job_info` (`app_id`),
KEY `idx02_job_info` (`job_name`),
KEY `idx03_job_info` (`next_trigger_time`)
KEY `idx01_job_info` (`app_id`, `status`, `time_expression_type`, `next_trigger_time`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4
@ -154,7 +150,8 @@ CREATE TABLE `server_info`
`gmt_modified` datetime DEFAULT NULL COMMENT '更新时间',
`ip` varchar(128) DEFAULT NULL COMMENT '服务器IP地址',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx01_server_info` (`ip`)
UNIQUE KEY `uidx01_server_info` (`ip`),
KEY `idx01_server_info` (`gmt_modified`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4
@ -204,7 +201,7 @@ CREATE TABLE `workflow_info`
`gmt_create` datetime DEFAULT NULL COMMENT '创建时间',
`gmt_modified` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx01_workflow_info` (`app_id`)
KEY `idx01_workflow_info` (`app_id`, `status`, `time_expression_type`, `next_trigger_time`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4
@ -231,9 +228,9 @@ CREATE TABLE `workflow_instance_info`
`gmt_create` datetime DEFAULT NULL COMMENT '创建时间',
`gmt_modified` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
unique index uidx01_wf_instance (wf_instance_id),
index idx01_wf_instance (workflow_id),
index idx02_wf_instance (app_id, status)
unique index uidx01_wf_instance (`wf_instance_id`),
index idx01_wf_instance (`workflow_id`, `status`),
index idx02_wf_instance (`app_id`, `status`, `expected_trigger_time`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4
@ -258,9 +255,7 @@ CREATE TABLE `workflow_node_info`
`type` int DEFAULT NULL COMMENT '节点类型,1:任务JOB',
`workflow_id` bigint DEFAULT NULL COMMENT '工作流ID',
PRIMARY KEY (`id`),
KEY `idx01_workflow_node_info` (`app_id`),
KEY `idx02_workflow_node_info` (`workflow_id`),
KEY `idx03_workflow_node_info` (`job_id`)
KEY `idx01_workflow_node_info` (`workflow_id`,`gmt_create`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>4.2.0</version>
<version>4.2.1</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<junit.version>5.9.1</junit.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>4.2.0</powerjob.common.version>
<powerjob.common.version>4.2.1</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -10,18 +10,18 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>4.2.0</version>
<version>4.2.1</version>
<packaging>jar</packaging>
<properties>
<slf4j.version>1.7.30</slf4j.version>
<slf4j.version>1.7.36</slf4j.version>
<commons.lang.version>3.12.0</commons.lang.version>
<commons.io.version>2.7</commons.io.version>
<commons.io.version>2.11.0</commons.io.version>
<guava.version>31.1-jre</guava.version>
<okhttp.version>3.14.9</okhttp.version>
<akka.version>2.6.20</akka.version>
<kryo.version>5.0.4</kryo.version>
<jackson.version>2.12.2</jackson.version>
<kryo.version>5.3.0</kryo.version>
<jackson.version>2.14.0-rc1</jackson.version>
<junit.version>5.9.0</junit.version>
</properties>

View File

@ -35,6 +35,12 @@ public class PowerJobDKey {
public static final String TRANSPORTER_KEEP_ALIVE_TIMEOUT = "powerjob.transporter.keepalive.timeout";
public static final String WORKER_STATUS_CHECK_PERIOD = "powerjob.worker.status-check.normal.period";
/**
* allowed PowerJob to invoke Thread#stop to kill a thread when PowerJob can't interrupt the thread
* <a href="https://stackoverflow.com/questions/16504140/thread-stop-deprecated">It's VERY dangerous</a>
*/
public static final String WORKER_ALLOWED_FORCE_STOP_THREAD = "powerjob.worker.allowed-force-stop-thread";
/**
* ms
*/

View File

@ -26,6 +26,23 @@ public class SystemInstanceResult {
* 任务执行超时
*/
public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout";
/**
* 任务执行超时成功打断任务
*/
public static final String INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED = "instance execute timeout,interrupted success";
/**
* 任务执行超时强制终止任务
*/
public static final String INSTANCE_EXECUTE_TIMEOUT_FORCE_STOP= "instance execute timeout,force stop success";
/**
* 用户手动停止任务成功打断任务
*/
public static final String USER_STOP_INSTANCE_INTERRUPTED= "user stop instance,interrupted success";
/**
* 用户手动停止任务被系统强制终止
*/
public static final String USER_STOP_INSTANCE_FORCE_STOP= "user stop instance,force stop success";
/**
* 创建根任务失败
*/

View File

@ -26,8 +26,8 @@ public enum ExecuteType {
MAP_REDUCE(3, "MapReduce"),
MAP(4, "Map");
int v;
String des;
private final int v;
private final String des;
public static ExecuteType of(int v) {
for (ExecuteType type : values()) {

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import java.util.Collections;
import java.util.List;
/**
@ -24,13 +25,13 @@ public enum TimeExpressionType {
FIXED_DELAY(4),
WORKFLOW(5);
int v;
private final int v;
public static final List<Integer> FREQUENT_TYPES = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v);
public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));
/**
* 首次计算触发时间时必须计算出一个有效值
*/
public static final List<Integer> INSPECT_TYPES = Lists.newArrayList(CRON.v);
public static final List<Integer> INSPECT_TYPES = Collections.unmodifiableList(Lists.newArrayList(CRON.v));
public static TimeExpressionType of(int v) {
for (TimeExpressionType type : values()) {

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Collections;
import java.util.List;
/**
@ -27,11 +28,11 @@ public enum WorkflowInstanceStatus {
/**
* 广义的运行状态
*/
public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v);
public static final List<Integer> GENERALIZED_RUNNING_STATUS = Collections.unmodifiableList(Lists.newArrayList(WAITING.v, RUNNING.v));
/**
* 结束状态
*/
public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
public static final List<Integer> FINISHED_STATUS = Collections.unmodifiableList(Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v));
private final int v;

View File

@ -46,6 +46,8 @@ public class TaskTrackerReportInstanceStatusReq implements PowerSerializable {
private long startTime;
private Long endTime;
private long reportTime;
private String sourceAddress;

View File

@ -17,26 +17,55 @@ import java.util.List;
@Data
public class WorkerHeartbeat implements PowerSerializable {
// 本机地址 -> IP:port
/**
* 本机地址 -> IP:port
*/
private String workerAddress;
// 当前 appName
/**
* 当前 appName
*/
private String appName;
// 当前 appId
/**
* 当前 appId
*/
private Long appId;
// 当前时间
/**
* 当前时间
*/
private long heartbeatTime;
// 当前加载的容器容器名称 -> 容器版本
/**
* 当前加载的容器容器名称 -> 容器版本
*/
private List<DeployedContainerInfo> containerInfos;
// worker 版本信息
/**
* worker 版本信息
*/
private String version;
// 使用的通讯协议 AKKA / HTTP
/**
* 使用的通讯协议 AKKA / HTTP
*/
private String protocol;
// worker tag标识同一个 worker 下的一类集群 ISSUE: 226
/**
* worker tag标识同一个 worker 下的一类集群 ISSUE: 226
*/
private String tag;
// 客户端名称
/**
* 客户端名称
*/
private String client;
// 扩展字段
/**
* 扩展字段
*/
private String extra;
/**
* 是否已经超载,超载的情况下 Server 一段时间内不会再向其派发任务
*/
private boolean isOverload;
private int lightTaskTrackerNum;
private int heavyTaskTrackerNum;
private SystemMetrics systemMetrics;
}

View File

@ -4,40 +4,44 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import java.io.IOException;
/**
* JSON工具类
*
* @author tjq
* @since 2020/4/16
*/
@Slf4j
public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
static {
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
OBJECT_MAPPER.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
//
OBJECT_MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
}
private JsonUtils(){
}
public static String toJSONString(Object obj) {
if (obj instanceof String) {
return (String) obj;
}
try {
return objectMapper.writeValueAsString(obj);
return OBJECT_MAPPER.writeValueAsString(obj);
}catch (Exception ignore) {
}
return null;
}
public static String toJSONStringUnsafe(Object obj) {
if (obj instanceof String) {
return (String) obj;
}
try {
return objectMapper.writeValueAsString(obj);
return OBJECT_MAPPER.writeValueAsString(obj);
}catch (Exception e) {
throw new PowerJobException(e);
}
@ -45,27 +49,41 @@ public class JsonUtils {
public static byte[] toBytes(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
return OBJECT_MAPPER.writeValueAsBytes(obj);
}catch (Exception ignore) {
}
return null;
}
public static <T> T parseObject(String json, Class<T> clz) throws JsonProcessingException {
return objectMapper.readValue(json, clz);
return OBJECT_MAPPER.readValue(json, clz);
}
public static <T> T parseObject(byte[] b, Class<T> clz) throws Exception {
return objectMapper.readValue(b, clz);
public static <T> T parseObject(byte[] b, Class<T> clz) throws IOException {
return OBJECT_MAPPER.readValue(b, clz);
}
public static <T> T parseObject(byte[] b, TypeReference<T> typeReference) throws Exception {
return objectMapper.readValue(b, typeReference);
public static <T> T parseObject(byte[] b, TypeReference<T> typeReference) throws IOException {
return OBJECT_MAPPER.readValue(b, typeReference);
}
public static <T> T parseObject(String json, TypeReference<T> typeReference) throws IOException {
return OBJECT_MAPPER.readValue(json, typeReference);
}
public static <T> T parseObjectIgnoreException(String json, Class<T> clz) {
try {
return OBJECT_MAPPER.readValue(json, clz);
}catch (Exception e) {
log.error("unable to parse json string to object,current string:{}",json,e);
return null;
}
}
public static <T> T parseObjectUnsafe(String json, Class<T> clz) {
try {
return objectMapper.readValue(json, clz);
return OBJECT_MAPPER.readValue(json, clz);
}catch (Exception e) {
ExceptionUtils.rethrow(e);
}

View File

@ -0,0 +1,57 @@
package tech.powerjob.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.security.CodeSource;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
/**
* Java 语言相关的工具
*
* @author tjq
* @since 2022/10/23
*/
@Slf4j
public class JavaUtils {

    /**
     * Determines the version of the jar that contains the given class.
     * <p>
     * Tries the package's {@code Implementation-Version} attribute first, then
     * falls back to reading the manifest of the jar at the class's code source.
     *
     * @param clz the class whose containing package version should be resolved
     * @return the package version, or {@code null} when it cannot be determined
     */
    public static String determinePackageVersion(Class<?> clz) {
        try {
            // fast path: version recorded on the Package object.
            // Class#getPackage() may return null (e.g. default package or some
            // class loaders), which previously caused a swallowed NPE that also
            // skipped the manifest fallback below.
            Package pkg = clz.getPackage();
            String implementationVersion = (pkg == null) ? null : pkg.getImplementationVersion();
            if (implementationVersion != null) {
                return implementationVersion;
            }
            CodeSource codeSource = clz.getProtectionDomain().getCodeSource();
            if (codeSource == null) {
                return null;
            }
            URL codeSourceLocation = codeSource.getLocation();
            URLConnection connection = codeSourceLocation.openConnection();
            if (connection instanceof JarURLConnection) {
                return getImplementationVersion(((JarURLConnection) connection).getJarFile());
            }
            // plain file URL: open the jar directly, closing it when done
            try (JarFile jarFile = new JarFile(new File(codeSourceLocation.toURI()))) {
                return getImplementationVersion(jarFile);
            }
        }
        catch (Throwable t) {
            // best-effort lookup: version info is non-critical, never propagate
            log.warn("[JavaUtils] determinePackageVersion for clz[{}] failed, msg: {}", clz.getSimpleName(), t.toString());
        }
        return null;
    }

    /**
     * Reads the {@code Implementation-Version} main attribute from the jar manifest.
     *
     * @param jarFile the jar to inspect
     * @return the attribute value, or {@code null} when the jar has no manifest
     * @throws IOException if the manifest cannot be read
     */
    private static String getImplementationVersion(JarFile jarFile) throws IOException {
        // JarFile#getManifest() returns null for jars without META-INF/MANIFEST.MF
        java.util.jar.Manifest manifest = jarFile.getManifest();
        return manifest == null ? null : manifest.getMainAttributes().getValue(Attributes.Name.IMPLEMENTATION_VERSION);
    }
}

View File

@ -0,0 +1,25 @@
package tech.powerjob.common.utils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.*;
/**
* Java 语言相关的工具测试
*
* @author tjq
* @since 2022/10/23
*/
@Slf4j
class JavaUtilsTest {
@Test
void determinePackageVersion() {
// smoke test only: verifies the lookup completes without throwing.
// The resolved version may legitimately be null (e.g. when classes are
// not loaded from a jar), so no assertion is made on the returned value.
String packageVersion = JavaUtils.determinePackageVersion(LoggerFactory.class);
log.info("[determinePackageVersion] LoggerFactory's package version: {}", packageVersion);
}
}

View File

@ -18,18 +18,18 @@
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
<!-- 不会被打包的部分scope 只能是 test 或 provided -->
<junit.version>5.6.1</junit.version>
<logback.version>1.2.3</logback.version>
<powerjob.worker.version>4.2.0</powerjob.worker.version>
<junit.version>5.9.1</junit.version>
<logback.version>1.2.9</logback.version>
<powerjob.worker.version>4.2.1</powerjob.worker.version>
<spring.jdbc.version>5.2.9.RELEASE</spring.jdbc.version>
<h2.db.version>1.4.200</h2.db.version>
<h2.db.version>2.1.214</h2.db.version>
<mysql.version>8.0.28</mysql.version>
<!-- 全部 shade 化,避免依赖冲突 -->
<fastjson.version>1.2.83</fastjson.version>
<okhttp.version>3.14.9</okhttp.version>
<guava.version>30.1.1-jre</guava.version>
<commons.io.version>2.6</commons.io.version>
<commons.io.version>2.11.0</commons.io.version>
<commons.lang.version>3.10</commons.lang.version>
</properties>

View File

@ -4,7 +4,7 @@ import tech.powerjob.worker.core.processor.TaskContext;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import javax.sql.DataSource;
import java.sql.Connection;

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>4.2.0</version>
<version>4.2.1</version>
<packaging>pom</packaging>
<modules>
@ -28,19 +28,19 @@
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.7.4</springboot.version>
<powerjob.common.version>4.2.0</powerjob.common.version>
<powerjob.common.version>4.2.1</powerjob.common.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.28</mysql.version>
<mysql.version>8.0.30</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
<mssql-jdbc.version>7.4.1.jre8</mssql-jdbc.version>
<db2-jdbc.version>11.5.0.0</db2-jdbc.version>
<postgresql.version>42.2.14</postgresql.version>
<h2.db.version>1.4.200</h2.db.version>
<h2.db.version>2.1.214</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>
<zip4j.version>2.11.2</zip4j.version>
<jgit.version>5.7.0.202003110725-r</jgit.version>
<mvn.invoker.version>3.0.1</mvn.invoker.version>
<commons.net.version>3.6</commons.net.version>
<commons.net.version>3.8.0</commons.net.version>
<fastjson.version>1.2.83</fastjson.version>
<dingding.version>1.0.1</dingding.version>
<vertx-web.version>4.0.2</vertx-web.version>
@ -48,6 +48,8 @@
<!-- skip this module when deploying. -->
<maven.deploy.skip>true</maven.deploy.skip>
<groovy.version>3.0.10</groovy.version>
<cron-utils.version>9.1.6</cron-utils.version>
</properties>
<dependencyManagement>
@ -257,7 +259,7 @@
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.1.6</version>
<version>${cron-utils.version}</version>
</dependency>
<!-- swagger2 -->
@ -277,13 +279,13 @@
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>
<version>3.0.10</version>
<version>${groovy.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.codehaus.groovy/groovy-json -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>3.0.10</version>
<version>${groovy.version}</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -0,0 +1,23 @@
package tech.powerjob.server.common;
/**
* @author Echo009
* @since 2022/10/2
*/
public class Holder<T> {

    /** the wrapped reference; may be {@code null}. Not thread-safe. */
    private T held;

    /**
     * Creates a holder initialized with the given value.
     *
     * @param value initial value, may be {@code null}
     */
    public Holder(T value) {
        this.held = value;
    }

    /**
     * @return the currently held value, may be {@code null}
     */
    public T get() {
        return held;
    }

    /**
     * Replaces the currently held value.
     *
     * @param value the new value, may be {@code null}
     */
    public void set(T value) {
        this.held = value;
    }
}

View File

@ -1,9 +1,10 @@
package tech.powerjob.server.common.module;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.model.DeployedContainerInfo;
import tech.powerjob.common.model.SystemMetrics;
import tech.powerjob.common.request.WorkerHeartbeat;
import lombok.Data;
import java.util.List;
@ -14,6 +15,7 @@ import java.util.List;
* @since 2021/2/7
*/
@Data
@Slf4j
public class WorkerInfo {
private String address;
@ -26,6 +28,14 @@ public class WorkerInfo {
private String tag;
private int lightTaskTrackerNum;
private int heavyTaskTrackerNum;
private long lastOverloadTime;
private boolean overloading;
private SystemMetrics systemMetrics;
private List<DeployedContainerInfo> containerInfos;
@ -40,10 +50,25 @@ public class WorkerInfo {
tag = workerHeartbeat.getTag();
systemMetrics = workerHeartbeat.getSystemMetrics();
containerInfos = workerHeartbeat.getContainerInfos();
lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();
if (workerHeartbeat.isOverload()) {
overloading = true;
lastOverloadTime = workerHeartbeat.getHeartbeatTime();
log.warn("[WorkerInfo] worker {} is overload!", getAddress());
} else {
overloading = false;
}
}
public boolean timeout() {
long timeout = System.currentTimeMillis() - lastActiveTime;
return timeout > WORKER_TIMEOUT_MS;
}
public boolean overload() {
return overloading;
}
}

View File

@ -0,0 +1,33 @@
package tech.powerjob.server.common.thread;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Echo009
* @since 2022/10/12
*/
@Slf4j
public class NewThreadRunRejectedExecutionHandler implements RejectedExecutionHandler {
private static final AtomicLong COUNTER = new AtomicLong();
private final String source;
public NewThreadRunRejectedExecutionHandler(String source) {
this.source = source;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor p) {
log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r);
if (!p.isShutdown()) {
String threadName = source + "-T-" + COUNTER.getAndIncrement();
log.info("[{}] create new thread[{}] to run job", source, threadName);
new Thread(r, threadName).start();
}
}
}

View File

@ -65,9 +65,11 @@ public class HashedWheelTimer implements Timer {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
// 这里需要调整一下队列大小
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 4 * core,
// 基本都是 io 密集型任务
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}

View File

@ -11,7 +11,9 @@ import tech.powerjob.server.common.timewheel.Timer;
*/
public class HashedWheelTimerHolder {
// 非精确时间轮 5S 走一格
/**
* 非精确时间轮 5S 走一格
*/
public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0);
private HashedWheelTimerHolder() {

View File

@ -19,14 +19,22 @@ public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// 精确调度时间轮 1MS 走一格
/**
* 精确调度时间轮 1MS 走一格
*/
private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确调度时间轮用于处理高延迟任务 10S 走一格
/**
* 非精确调度时间轮用于处理高延迟任务 10S 走一格
*/
private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
/**
* 支持取消的时间间隔低于该阈值则不会放进 CARGO
*/
private static final long MIN_INTERVAL_MS = 1000;
// 长延迟阈值
/**
* 长延迟阈值
*/
private static final long LONG_DELAY_THRESHOLD_MS = 60000;
/**

View File

@ -25,8 +25,8 @@ import java.lang.reflect.Method;
@Slf4j
public class AOPUtils {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
private static final ExpressionParser PARSER = new SpelExpressionParser();
private static final ParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer();
public static String parseRealClassName(JoinPoint joinPoint) {
return joinPoint.getSignature().getDeclaringType().getSimpleName();
@ -50,7 +50,7 @@ public class AOPUtils {
}
public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
String[] params = discoverer.getParameterNames(method);
String[] params = DISCOVERER.getParameterNames(method);
assert params != null;
EvaluationContext context = new StandardEvaluationContext();
@ -58,7 +58,7 @@ public class AOPUtils {
context.setVariable(params[len], arguments[len]);
}
try {
Expression expression = parser.parseExpression(spEl);
Expression expression = PARSER.parseExpression(spEl);
return expression.getValue(context, clazz);
} catch (Exception e) {
log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);

View File

@ -19,9 +19,13 @@ import java.util.List;
@Slf4j
public class TimeUtils {
// NTP 授时服务器阿里云 -> 交大 -> 水果
/**
* NTP 授时服务器阿里云 -> 交大 -> 水果
*/
private static final List<String> NTP_SERVER_LIST = Lists.newArrayList("ntp.aliyun.com", "ntp.sjtu.edu.cn", "time1.apple.com");
// 最大误差 5S
/**
* 最大误差 5S
*/
private static final long MAX_OFFSET = 5000;
public static void check() throws TimeCheckException {

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -1,9 +1,18 @@
package tech.powerjob.server.core;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.*;
import tech.powerjob.common.request.ServerScheduleJobReq;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.module.WorkerInfo;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.instance.InstanceMetadataService;
import tech.powerjob.server.core.lock.UseCacheLock;
@ -12,18 +21,11 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import tech.powerjob.server.common.module.WorkerInfo;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Optional;
import java.util.stream.Collectors;
import static tech.powerjob.common.enums.InstanceStatus.*;
@ -38,35 +40,39 @@ import static tech.powerjob.common.enums.InstanceStatus.*;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DispatchService {
@Resource
private TransportService transportService;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
@Resource
private InstanceManager instanceManager;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final TransportService transportService;
private final WorkerClusterQueryService workerClusterQueryService;
private final InstanceManager instanceManager;
private final InstanceMetadataService instanceMetadataService;
private final InstanceInfoRepository instanceInfoRepository;
/**
* 重新派发任务实例不考虑实例当前的状态
* 异步重新派发
*
* @param jobInfo 任务信息注意这里传入的任务信息有可能为
* @param instanceId 实例ID
* @param instanceId 实例 ID
*/
@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void redispatch(JobInfoDO jobInfo, Long instanceId) {
InstanceInfoDO instance = instanceInfoRepository.findByInstanceId(instanceId);
@UseCacheLock(type = "processJobInstance", key = "#instanceId", concurrencyLevel = 1024)
public void redispatchAsync(Long instanceId, int originStatus) {
// 将状态重置为等待派发
instance.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
instance.setGmtModified(new Date());
instanceInfoRepository.saveAndFlush(instance);
dispatch(jobInfo, instanceId);
instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(instanceId, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
}
/**
* 异步批量重新派发不加锁
*/
public void redispatchBatchAsyncLockFree(List<Long> instanceIdList, int originStatus) {
// 将状态重置为等待派发
instanceInfoRepository.updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(instanceIdList, originStatus, InstanceStatus.WAITING_DISPATCH.getV(), new Date());
}
/**
* 将任务从Server派发到WorkerTaskTracker
* 只会派发当前状态为等待派发的任务实例
@ -78,13 +84,16 @@ public class DispatchService {
* 迁移至 {@link InstanceManager#updateStatus} 中处理
* **************************************************
*
* @param jobInfo 任务的元信息
* @param instanceId 任务实例ID
* @param jobInfo 任务的元信息
* @param instanceId 任务实例ID
* @param instanceInfoOptional 任务实例信息可选
* @param overloadOptional 超载信息可选
*/
@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId) {
public void dispatch(JobInfoDO jobInfo, Long instanceId, Optional<InstanceInfoDO> instanceInfoOptional, Optional<Holder<Boolean>> overloadOptional) {
// 允许从外部传入实例信息减少 io 次数
// 检查当前任务是否被取消
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
InstanceInfoDO instanceInfo = instanceInfoOptional.orElseGet(() -> instanceInfoRepository.findByInstanceId(instanceId));
Long jobId = instanceInfo.getJobId();
if (CANCELED.getV() == instanceInfo.getStatus()) {
log.info("[Dispatcher-{}|{}] cancel dispatch due to instance has been canceled", jobId, instanceId);
@ -125,7 +134,6 @@ public class DispatchService {
String result = String.format(SystemInstanceResult.TOO_MANY_INSTANCES, runningInstanceCount, maxInstanceNum);
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, result, now);
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, result);
return;
}
@ -141,8 +149,15 @@ public class DispatchService {
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
return;
}
// 判断是否超载在所有可用 worker 超载的情况下直接跳过当前任务
suitableWorkers = filterOverloadWorker(suitableWorkers);
if (suitableWorkers.isEmpty()) {
// 直接取消派发减少一次数据库 io
overloadOptional.ifPresent(booleanHolder -> booleanHolder.set(true));
log.warn("[Dispatcher-{}|{}] cancel to dispatch job due to all worker is overload", jobId, instanceId);
return;
}
List<String> workerIpList = suitableWorkers.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
// 构造任务调度请求
ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList);
@ -154,12 +169,23 @@ public class DispatchService {
log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req);
// 修改状态
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now);
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), current, taskTrackerAddress, now, instanceInfo.getStatus());
// 装载缓存
instanceMetadataService.loadJobInfo(instanceId, jobInfo);
}
private List<WorkerInfo> filterOverloadWorker(List<WorkerInfo> suitableWorkers) {
List<WorkerInfo> res = new ArrayList<>(suitableWorkers.size());
for (WorkerInfo suitableWorker : suitableWorkers) {
if (suitableWorker.overload()){
continue;
}
res.add(suitableWorker);
}
return res;
}
/**
* 构造任务调度请求
*/

View File

@ -43,7 +43,7 @@ import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
@ -84,7 +84,9 @@ public class ContainerService {
// 并发部署的机器数量
private static final int DEPLOY_BATCH_NUM = 50;
// 部署间隔
private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000;
private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000L;
// 最长部署时间
private static final long DEPLOY_MAX_COST_TIME = 10 * 60 * 1000L;
/**
* 保存容器
@ -208,14 +210,13 @@ public class ContainerService {
String deployLock = "containerDeployLock-" + containerId;
RemoteEndpoint.Async remote = session.getAsyncRemote();
// 最长部署时间10分钟
boolean lock = lockService.tryLock(deployLock, 10 * 60 * 1000);
boolean lock = lockService.tryLock(deployLock, DEPLOY_MAX_COST_TIME);
if (!lock) {
remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
return;
}
try {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(containerId);
if (!containerInfoOpt.isPresent()) {
remote.sendText("SYSTEM: can't find container by id: " + containerId);

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.core.handler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.BeanUtils;
@ -22,7 +23,6 @@ import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepositor
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
@ -34,17 +34,18 @@ import java.util.stream.Collectors;
* @author tjq
* @since 2022/9/11
*/
@RequiredArgsConstructor
@Slf4j
public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
@Resource
protected MonitorService monitorService;
@Resource
protected Environment environment;
@Resource
protected ContainerInfoRepository containerInfoRepository;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
protected final MonitorService monitorService;
protected final Environment environment;
protected final ContainerInfoRepository containerInfoRepository;
private final WorkerClusterQueryService workerClusterQueryService;
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);

View File

@ -1,9 +1,7 @@
package tech.powerjob.server.core.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* WorkerRequestHandlerHolder
@ -16,13 +14,14 @@ public class WorkerRequestHandlerHolder {
private static IWorkerRequestHandler workerRequestHandler;
public WorkerRequestHandlerHolder(IWorkerRequestHandler injectedWorkerRequestHandler) {
workerRequestHandler = injectedWorkerRequestHandler;
}
/**
 * Returns the globally registered {@link IWorkerRequestHandler}.
 *
 * @return the handler injected when the holder bean was constructed
 * @throws IllegalStateException if the holder has not been initialized yet
 */
public static IWorkerRequestHandler fetchWorkerRequestHandler() {
    final IWorkerRequestHandler handler = workerRequestHandler;
    if (handler != null) {
        return handler;
    }
    throw new IllegalStateException("WorkerRequestHandlerHolder not initialized!");
}
@Autowired
public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) {
WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler;
}
}

View File

@ -1,7 +1,7 @@
package tech.powerjob.server.core.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.InstanceStatus;
@ -12,12 +12,14 @@ import tech.powerjob.common.response.AskResponse;
import tech.powerjob.server.core.instance.InstanceLogService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import java.util.Optional;
/**
@ -30,12 +32,19 @@ import java.util.Optional;
@Component
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private InstanceLogService instanceLogService;
private final InstanceManager instanceManager;
private final WorkflowInstanceManager workflowInstanceManager;
private final InstanceLogService instanceLogService;
public WorkerRequestHandlerImpl(InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, InstanceLogService instanceLogService,
MonitorService monitorService, Environment environment, ContainerInfoRepository containerInfoRepository, WorkerClusterQueryService workerClusterQueryService) {
super(monitorService, environment, containerInfoRepository, workerClusterQueryService);
this.instanceManager = instanceManager;
this.workflowInstanceManager = workflowInstanceManager;
this.instanceLogService = instanceLogService;
}
@Override
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {

View File

@ -56,6 +56,7 @@ public class InstanceLogService {
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private GridFsManager gridFsManager;
/**
@ -63,6 +64,7 @@ public class InstanceLogService {
*/
@Resource(name = "localTransactionTemplate")
private TransactionTemplate localTransactionTemplate;
@Resource
private LocalInstanceLogRepository localInstanceLogRepository;

View File

@ -1,9 +1,10 @@
package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.enums.TimeExpressionType;
@ -39,22 +40,22 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceManager {
@Resource
private AlarmCenter alarmCenter;
@Resource
private InstanceLogService instanceLogService;
@Resource
private InstanceMetadataService instanceMetadataService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private TransportService transportService;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
private final AlarmCenter alarmCenter;
private final InstanceLogService instanceLogService;
private final InstanceMetadataService instanceMetadataService;
private final InstanceInfoRepository instanceInfoRepository;
private final WorkflowInstanceManager workflowInstanceManager;
private final TransportService transportService;
private final WorkerClusterQueryService workerClusterQueryService;
/**
* 更新任务状态
@ -69,7 +70,6 @@ public class InstanceManager {
public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
Long instanceId = req.getInstanceId();
// 获取相关数据
JobInfoDO jobInfo = instanceMetadataService.fetchJobInfoByInstanceId(req.getInstanceId());
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
@ -77,6 +77,7 @@ public class InstanceManager {
log.warn("[InstanceManager-{}] can't find InstanceInfo from database", instanceId);
return;
}
int originStatus = instanceInfo.getStatus();
// 丢弃过期的上报数据
if (req.getReportTime() <= instanceInfo.getLastReportTime()) {
log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will be dropped.", instanceId, req);
@ -134,8 +135,7 @@ public class InstanceManager {
boolean finished = false;
if (receivedInstanceStatus == InstanceStatus.SUCCEED) {
instanceInfo.setResult(req.getResult());
instanceInfo.setFinishedTime(System.currentTimeMillis());
instanceInfo.setFinishedTime(req.getEndTime() == null ? System.currentTimeMillis() : req.getEndTime());
finished = true;
} else if (receivedInstanceStatus == InstanceStatus.FAILED) {
@ -152,21 +152,23 @@ public class InstanceManager {
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
} else {
instanceInfo.setResult(req.getResult());
instanceInfo.setFinishedTime(System.currentTimeMillis());
instanceInfo.setFinishedTime(req.getEndTime() == null ? System.currentTimeMillis() : req.getEndTime());
finished = true;
log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId);
}
}
// 同步状态变更信息到数据库
instanceInfoRepository.saveAndFlush(instanceInfo);
if (finished) {
// 最终状态允许直接覆盖更新
instanceInfoRepository.saveAndFlush(instanceInfo);
// 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报
processFinishedInstance(instanceId, req.getWfInstanceId(), receivedInstanceStatus, req.getResult());
return;
}
// 带条件更新
final int i = instanceInfoRepository.updateStatusChangeInfoByInstanceIdAndStatus(instanceInfo.getLastReportTime(), instanceInfo.getGmtModified(), instanceInfo.getRunningTimes(), instanceInfo.getStatus(), instanceInfo.getInstanceId(), originStatus);
if (i == 0) {
log.warn("[InstanceManager-{}] update instance status failed, maybe the instance status has been changed by other thread. discard this status change,{}", instanceId, instanceInfo);
}
}
private void stopInstance(Long instanceId, InstanceInfoDO instanceInfo) {

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
@ -21,14 +22,16 @@ import java.util.concurrent.ExecutionException;
* @since 2020/6/23
*/
@Service
@RequiredArgsConstructor
public class InstanceMetadataService implements InitializingBean {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final JobInfoRepository jobInfoRepository;
// 缓存一旦生成任务实例其对应的 JobInfo 不应该再改变即使源数据改变
private final InstanceInfoRepository instanceInfoRepository;
/**
* 缓存一旦生成任务实例其对应的 JobInfo 不应该再改变即使源数据改变
*/
private Cache<Long, JobInfoDO> instanceId2JobInfoCache;
@Value("${oms.instance.metadata.cache.size}")

View File

@ -1,13 +1,14 @@
package tech.powerjob.server.core.instance;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.PowerQuery;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.InstanceDetail;
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
import tech.powerjob.common.request.ServerStopInstanceReq;
@ -28,7 +29,6 @@ import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.remote.transport.TransportService;
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
@ -45,23 +45,22 @@ import static tech.powerjob.common.enums.InstanceStatus.STOPPED;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceService {
@Resource
private TransportService transportService;
@Resource
private DispatchService dispatchService;
@Resource
private IdGenerateService idGenerateService;
@Resource
private InstanceManager instanceManager;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final TransportService transportService;
@Resource
private WorkerClusterQueryService workerClusterQueryService;
private final DispatchService dispatchService;
private final IdGenerateService idGenerateService;
private final InstanceManager instanceManager;
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final WorkerClusterQueryService workerClusterQueryService;
/**
* 创建任务实例注意该方法并不调用 saveAndFlush如果有需要立即同步到DB的需求请在方法结束后手动调用 flush
@ -78,7 +77,7 @@ public class InstanceService {
* @param expectTriggerTime 预期执行时间
* @return 任务实例ID
*/
public Long create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
public InstanceInfoDO create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
Long instanceId = idGenerateService.allocate();
Date now = new Date();
@ -100,7 +99,7 @@ public class InstanceService {
newInstanceInfo.setGmtModified(now);
instanceInfoRepository.save(newInstanceInfo);
return instanceId;
return newInstanceInfo;
}
/**
@ -181,7 +180,7 @@ public class InstanceService {
// 派发任务
Long jobId = instanceInfo.getJobId();
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
dispatchService.redispatch(jobInfo, instanceId);
dispatchService.dispatch(jobInfo, instanceId,Optional.of(instanceInfo),Optional.empty());
}
/**

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@ -30,10 +31,10 @@ import java.util.concurrent.locks.ReentrantLock;
@Aspect
@Component
@Order(1)
@RequiredArgsConstructor
public class UseCacheLockAspect {
@Resource
private MonitorService monitorService;
private final MonitorService monitorService;
private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();

View File

@ -1,15 +1,5 @@
package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.extension.LockService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
@ -18,8 +8,17 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.utils.OmsFileUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.mongodb.GridFsManager;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import javax.annotation.Resource;
import java.io.File;
import java.util.Date;
@ -33,25 +32,21 @@ import java.util.Date;
@Service
public class CleanService {
@Resource
private GridFsManager gridFsManager;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private LockService lockService;
private final GridFsManager gridFsManager;
@Value("${oms.instanceinfo.retention}")
private int instanceInfoRetentionDay;
private final InstanceInfoRepository instanceInfoRepository;
@Value("${oms.container.retention.local}")
private int localContainerRetentionDay;
@Value("${oms.container.retention.remote}")
private int remoteContainerRetentionDay;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
private final LockService lockService;
private final int instanceInfoRetentionDay;
private final int localContainerRetentionDay;
private final int remoteContainerRetentionDay;
private static final int TEMPORARY_RETENTION_DAY = 3;
@ -62,6 +57,21 @@ public class CleanService {
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
/**
 * Constructor injection for all collaborators and retention settings, which lets the
 * retention-day fields be declared {@code final}.
 *
 * @param gridFsManager                  GridFS access used to clean remote container artifacts
 * @param instanceInfoRetentionDay       days to keep instance records ({@code oms.instanceinfo.retention})
 * @param localContainerRetentionDay     days to keep local container files ({@code oms.container.retention.local})
 * @param remoteContainerRetentionDay    days to keep remote container files ({@code oms.container.retention.remote})
 */
public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
@Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
@Value("${oms.container.retention.local}") int localContainerRetentionDay,
@Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
this.gridFsManager = gridFsManager;
this.instanceInfoRepository = instanceInfoRepository;
this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
this.workflowNodeInfoRepository = workflowNodeInfoRepository;
this.lockService = lockService;
this.instanceInfoRetentionDay = instanceInfoRetentionDay;
this.localContainerRetentionDay = localContainerRetentionDay;
this.remoteContainerRetentionDay = remoteContainerRetentionDay;
}
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(cron = CLEAN_TIME_EXPRESSION)
@ -84,7 +94,7 @@ public class CleanService {
*/
private void cleanByOneServer() {
// 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了
boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000);
boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000L);
if (!lock) {
log.info("[CleanService] clean job is already running, just return.");
return;

View File

@ -0,0 +1,80 @@
package tech.powerjob.server.core.scheduler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
/**
 * Manages the server's core background threads (scheduling, data cleaning, status checks).
 * <p>
 * Each task runs on its own dedicated thread in a fixed-interval loop (see {@link LoopRunnable}).
 * Threads are started once this bean is initialized and interrupted on container shutdown.
 *
 * @author Echo009
 * @since 2022/10/12
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {

    private final PowerScheduleService powerScheduleService;

    private final InstanceStatusCheckService instanceStatusCheckService;

    /** Holds every core thread so all of them can be interrupted in {@link #destroy()}. */
    private final List<Thread> coreThreadContainer = new ArrayList<>();

    @SuppressWarnings("AlibabaAvoidManuallyCreateThread")
    @Override
    public void afterPropertiesSet() {
        // timed scheduling
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronJob), "Thread-ScheduleCronJob"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
        coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
        // data cleaning
        coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
        // status checks
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
        coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));
        coreThreadContainer.forEach(Thread::start);
    }

    @Override
    public void destroy() {
        // signal every loop thread to exit; LoopRunnable breaks on interruption
        coreThreadContainer.forEach(Thread::interrupt);
    }

    /**
     * Runs a task repeatedly with a fixed pause between executions.
     * <p>
     * The loop exits when the thread is interrupted. Any other failure is logged and the
     * next round is attempted after the normal interval — the pause is taken even when the
     * task throws, so a persistently failing task cannot busy-spin the CPU.
     */
    @RequiredArgsConstructor
    private static class LoopRunnable implements Runnable {

        private final String taskName;

        private final Long runningInterval;

        private final Runnable innerRunnable;

        @SuppressWarnings("BusyWait")
        @Override
        public void run() {
            log.info("start task : {}.", taskName);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    innerRunnable.run();
                } catch (Exception e) {
                    // keep looping: one failed round must not kill the schedule thread
                    log.error("[{}] task failed!", taskName, e);
                }
                try {
                    Thread.sleep(runningInterval);
                } catch (InterruptedException e) {
                    log.warn("[{}] task has been interrupted!", taskName, e);
                    // restore the interrupt flag for any outer observer, then stop the loop
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

View File

@ -1,29 +1,31 @@
package tech.powerjob.server.core.scheduler;
import tech.powerjob.common.enums.InstanceStatus;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.common.Holder;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.persistence.remote.model.*;
import tech.powerjob.server.persistence.remote.repository.*;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceManager;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import tech.powerjob.server.persistence.remote.repository.*;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
/**
@ -34,135 +36,220 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceStatusCheckService {
private static final int MAX_BATCH_NUM = 10;
private static final int MAX_BATCH_NUM_APP = 10;
private static final int MAX_BATCH_NUM_INSTANCE = 3000;
private static final int MAX_BATCH_UPDATE_NUM = 500;
private static final long DISPATCH_TIMEOUT_MS = 30000;
private static final long RECEIVE_TIMEOUT_MS = 60000;
private static final long RUNNING_TIMEOUT_MS = 60000;
private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;
@Resource
private DispatchService dispatchService;
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
public static final long CHECK_INTERVAL = 10000;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
private final DispatchService dispatchService;
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = 10000)
public void timingStatusCheck() {
private final InstanceManager instanceManager;
private final WorkflowInstanceManager workflowInstanceManager;
private final AppInfoRepository appInfoRepository;
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
public void checkWorkflowInstance() {
Stopwatch stopwatch = Stopwatch.createStarted();
// 查询DB获取该Server需要负责的AppGroup
List<AppInfoDO> appInfoList = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(appInfoList)) {
// 查询 DB 获取该 Server 需要负责的 AppGroup
List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[InstanceStatusChecker] current server has no app's job to check");
return;
}
List<Long> allAppIds = appInfoList.stream().map(AppInfoDO::getId).collect(Collectors.toList());
try {
checkInstance(allAppIds);
checkWorkflowInstance(allAppIds);
} catch (Exception e) {
log.error("[InstanceStatusChecker] status check failed.", e);
log.error("[InstanceStatusChecker] WorkflowInstance status check failed.", e);
}
log.info("[InstanceStatusChecker] status check using {}.", stopwatch.stop());
log.info("[InstanceStatusChecker] WorkflowInstance status check using {}.", stopwatch.stop());
}
/**
* 检查任务实例的状态发现异常及时重试包括
* 检查等待派发的实例
* WAITING_DISPATCH 超时写入时间轮但未调度前 server down
* WAITING_WORKER_RECEIVE 超时由于网络错误导致 worker 未接受成功
* RUNNING 超时TaskTracker down断开与 server 的心跳连接
*
* @param allAppIds 本系统所承担的所有 appIds
*/
private void checkInstance(List<Long> allAppIds) {
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
// 1. 检查等待 WAITING_DISPATCH 状态的任务
handleWaitingDispatchInstance(partAppIds);
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
handleWaitingWorkerReceiveInstance(partAppIds);
// 3. 检查 RUNNING 状态的任务一定时间内没收到 TaskTracker 的状态报告视为失败
handleRunningInstance(partAppIds);
});
/**
 * Checks instances stuck in WAITING_DISPATCH for the apps owned by this server
 * (e.g. written to the time wheel but the server went down before dispatching).
 */
public void checkWaitingDispatchInstance() {
    final Stopwatch watch = Stopwatch.createStarted();
    // query the DB for the app groups this server is responsible for
    final List<Long> appIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
    if (CollectionUtils.isEmpty(appIds)) {
        log.info("[InstanceStatusChecker] current server has no app's job to check");
        return;
    }
    try {
        // process the WAITING_DISPATCH instances one app-batch at a time
        for (List<Long> batch : Lists.partition(appIds, MAX_BATCH_NUM_APP)) {
            handleWaitingDispatchInstance(batch);
        }
    } catch (Exception e) {
        log.error("[InstanceStatusChecker] WaitingDispatchInstance status check failed.", e);
    }
    log.info("[InstanceStatusChecker] WaitingDispatchInstance status check using {}.", watch.stop());
}
/**
* 检查等待 worker 接收的实例
* WAITING_WORKER_RECEIVE 超时由于网络错误导致 worker 未接受成功
*/
/**
 * Checks instances stuck in WAITING_WORKER_RECEIVE for the apps owned by this server
 * (a network error may have prevented the worker from accepting the dispatch).
 */
public void checkWaitingWorkerReceiveInstance() {
    final Stopwatch timer = Stopwatch.createStarted();
    // query the DB for the app groups this server is responsible for
    final List<Long> ownedAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
    if (CollectionUtils.isEmpty(ownedAppIds)) {
        log.info("[InstanceStatusChecker] current server has no app's job to check");
        return;
    }
    try {
        // handle the WAITING_WORKER_RECEIVE instances app-batch by app-batch
        Lists.partition(ownedAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingWorkerReceiveInstance);
    } catch (Exception e) {
        log.error("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check failed.", e);
    }
    log.info("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check using {}.", timer.stop());
}
/**
* 检查运行中的实例
* RUNNING 超时TaskTracker down断开与 server 的心跳连接
*/
/**
 * Checks RUNNING instances for the apps owned by this server: if no TaskTracker
 * status report arrives within the timeout (e.g. the TaskTracker went down and its
 * heartbeat stopped), the instance is treated as failed.
 */
public void checkRunningInstance() {
    Stopwatch costTimer = Stopwatch.createStarted();
    // fetch the app groups owned by the current server from the DB
    List<Long> currentAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
    if (CollectionUtils.isEmpty(currentAppIds)) {
        log.info("[InstanceStatusChecker] current server has no app's job to check");
        return;
    }
    try {
        // RUNNING instances without a timely TaskTracker report are considered failed
        Lists.partition(currentAppIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
    } catch (Exception e) {
        log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
    }
    log.info("[InstanceStatusChecker] RunningInstance status check using {}.", costTimer.stop());
}
private void handleWaitingDispatchInstance(List<Long> partAppIds) {
// 1. 检查等待 WAITING_DISPATCH 状态的任务
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
log.warn("[InstanceStatusChecker] find some instance which is not triggered as expected: {}", waitingDispatchInstances);
waitingDispatchInstances.forEach(instance -> {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(instance.getJobId());
if (jobInfoOpt.isPresent()) {
dispatchService.redispatch(jobInfoOpt.get(), instance.getInstanceId());
} else {
log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
updateFailedInstance(instance, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO);
List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
while (!waitingDispatchInstances.isEmpty()) {
List<Long> overloadAppIdList = new ArrayList<>();
long startTime = System.currentTimeMillis();
// 按照 appId 分组处理方便处理超载的逻辑
Map<Long, List<InstanceInfoDO>> waitingDispatchInstancesMap = waitingDispatchInstances.stream().collect(Collectors.groupingBy(InstanceInfoDO::getAppId));
for (Map.Entry<Long, List<InstanceInfoDO>> entry : waitingDispatchInstancesMap.entrySet()) {
final Long currentAppId = entry.getKey();
final List<InstanceInfoDO> currentAppWaitingDispatchInstances = entry.getValue();
// collect job id
Set<Long> jobIds = currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getJobId).collect(Collectors.toSet());
// query job info and map
Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
log.warn("[InstanceStatusChecker] find some instance in app({}) which is not triggered as expected: {}", currentAppId, currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getInstanceId).collect(Collectors.toList()));
final Holder<Boolean> overloadFlag = new Holder<>(false);
// 先这么简单处理没问题毕竟只有这一个地方用了 parallelStream
currentAppWaitingDispatchInstances.parallelStream().forEach(instance -> {
if (overloadFlag.get()) {
// 直接忽略
return;
}
Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
if (jobInfoOpt.isPresent()) {
// 处理等待派发的任务没有必要再重置一次状态减少 io 次数
dispatchService.dispatch(jobInfoOpt.get(), instance.getInstanceId(), Optional.of(instance), Optional.of(overloadFlag));
} else {
log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
opt.ifPresent(instanceInfoDO -> updateFailedInstance(instanceInfoDO, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO));
}
});
threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
if (overloadFlag.get()) {
overloadAppIdList.add(currentAppId);
}
});
}
log.info("[InstanceStatusChecker] process {} task,use {} ms", waitingDispatchInstances.size(), System.currentTimeMillis() - startTime);
if (!overloadAppIdList.isEmpty()) {
log.warn("[InstanceStatusChecker] app[{}] is overload, so skip check waiting dispatch instance", overloadAppIdList);
partAppIds.removeAll(overloadAppIdList);
}
if (partAppIds.isEmpty()) {
break;
}
waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
}
}
private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
log.warn("[InstanceStatusChecker] find one instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances);
waitingWorkerReceiveInstances.forEach(instance -> {
// 重新派发
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
dispatchService.redispatch(jobInfoDO, instance.getInstanceId());
});
List<BriefInstanceInfo> waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
while (!waitingWorkerReceiveInstances.isEmpty()) {
log.warn("[InstanceStatusChecker] find some instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
final List<List<BriefInstanceInfo>> partitions = Lists.partition(waitingWorkerReceiveInstances, MAX_BATCH_UPDATE_NUM);
for (List<BriefInstanceInfo> partition : partitions) {
dispatchService.redispatchBatchAsyncLockFree(partition.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()), InstanceStatus.WAITING_WORKER_RECEIVE.getV());
}
// 重新查询
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
}
}
private void handleRunningInstance(List<Long> partAppIds) {
// 3. 检查 RUNNING 状态的任务一定时间没收到 TaskTracker 的状态报告视为失败
long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
List<InstanceInfoDO> failedInstances = instanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
if (!CollectionUtils.isEmpty(failedInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
List<BriefInstanceInfo> failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
while (!failedInstances.isEmpty()) {
// collect job id
Set<Long> jobIds = failedInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
// query job info and map
Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}", failedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
failedInstances.forEach(instance -> {
JobInfoDO jobInfoDO = jobInfoRepository.findById(instance.getJobId()).orElseGet(JobInfoDO::new);
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoDO.getStatus());
// 如果任务已关闭则不进行重试将任务置为失败即可秒级任务也直接置为失败由派发器重新调度
if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
if (!jobInfoOpt.isPresent()) {
final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
return;
}
// CRON API一样失败次数 + 1根据重试配置进行重试
if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) {
dispatchService.redispatch(jobInfoDO, instance.getInstanceId());
} else {
updateFailedInstance(instance, SystemInstanceResult.REPORT_TIMEOUT);
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoOpt.get().getTimeExpressionType());
SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoOpt.get().getStatus());
// 如果任务已关闭则不进行重试将任务置为失败即可秒级任务也直接置为失败由派发器重新调度
if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
return;
}
// CRON API一样失败次数 + 1根据重试配置进行重试
if (instance.getRunningTimes() < jobInfoOpt.get().getInstanceRetryNum()) {
dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
} else {
final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
}
});
threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
}
}
/**
@ -175,7 +262,7 @@ public class InstanceStatusCheckService {
// 重试长时间处于 WAITING 状态的工作流实例
long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
Lists.partition(allAppIds, MAX_BATCH_NUM).forEach(partAppIds -> {
Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(partAppIds -> {
List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {

View File

@ -1,38 +1,33 @@
package tech.powerjob.server.core.scheduler;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.server.common.constants.PJThreadPool;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.service.JobService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.service.JobService;
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* 任务调度执行服务调度 CRON 表达式的任务进行执行
@ -44,6 +39,7 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PowerScheduleService {
/**
@ -51,83 +47,105 @@ public class PowerScheduleService {
*/
private static final int MAX_APP_NUM = 10;
@Resource
private DispatchService dispatchService;
@Resource
private InstanceService instanceService;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
private final DispatchService dispatchService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final InstanceService instanceService;
@Resource
private JobService jobService;
@Resource
private TimingStrategyService timingStrategyService;
private final WorkflowInstanceManager workflowInstanceManager;
private static final long SCHEDULE_RATE = 15000;
private final AppInfoRepository appInfoRepository;
@Async(PJThreadPool.TIMING_POOL)
@Scheduled(fixedDelay = SCHEDULE_RATE)
public void timingSchedule() {
private final JobInfoRepository jobInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final JobService jobService;
private final TimingStrategyService timingStrategyService;
public static final long SCHEDULE_RATE = 15000;
public void scheduleCronJob() {
long start = System.currentTimeMillis();
Stopwatch stopwatch = Stopwatch.createStarted();
// 先查询DB查看本机需要负责的任务
List<AppInfoDO> allAppInfos = appInfoRepository.findAllByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(allAppInfos)) {
log.info("[JobScheduleService] current server has no app's job to schedule.");
return;
}
List<Long> allAppIds = allAppInfos.stream().map(AppInfoDO::getId).collect(Collectors.toList());
// 清理不需要维护的数据
WorkerClusterManagerService.clean(allAppIds);
// 调度 CRON 表达式 JOB
try {
scheduleCronJob(allAppIds);
final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
if (CollectionUtils.isEmpty(allAppIds)) {
log.info("[CronJobSchedule] current server has no app's job to schedule.");
return;
}
scheduleCronJobCore(allAppIds);
} catch (Exception e) {
log.error("[CronScheduler] schedule cron job failed.", e);
log.error("[CronJobSchedule] schedule cron job failed.", e);
}
String cronTime = stopwatch.toString();
stopwatch.reset().start();
// 调度 workflow 任务
try {
scheduleWorkflow(allAppIds);
} catch (Exception e) {
log.error("[WorkflowScheduler] schedule workflow job failed.", e);
}
String wfTime = stopwatch.toString();
stopwatch.reset().start();
// 调度 秒级任务
try {
scheduleFrequentJob(allAppIds);
} catch (Exception e) {
log.error("[FrequentScheduler] schedule frequent job failed.", e);
}
log.info("[JobScheduleService] cron schedule: {}, workflow schedule: {}, frequent schedule: {}.", cronTime, wfTime, stopwatch.stop());
long cost = System.currentTimeMillis() - start;
log.info("[CronJobSchedule] cron job schedule use {} ms.", cost);
if (cost > SCHEDULE_RATE) {
log.warn("[JobScheduleService] The database query is using too much time({}ms), please check if the database load is too high!", cost);
log.warn("[CronJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
}
}
/**
 * Schedules CRON-expression workflows for all apps owned by the current server.
 * Looks up the app ids bound to this server, delegates the real work to
 * {@link #scheduleWorkflowCore}, and warns when one round takes longer than the
 * schedule rate (a sign of an overloaded database).
 */
public void scheduleCronWorkflow() {
    final long begin = System.currentTimeMillis();
    try {
        // only workflows of apps assigned to this server instance are scheduled here
        final List<Long> appIdsOnThisServer = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
        if (CollectionUtils.isEmpty(appIdsOnThisServer)) {
            log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");
            return;
        }
        scheduleWorkflowCore(appIdsOnThisServer);
    } catch (Exception e) {
        // never let a scheduling failure escape into the timing framework
        log.error("[CronWorkflowSchedule] schedule cron workflow failed.", e);
    }
    final long elapsed = System.currentTimeMillis() - begin;
    log.info("[CronWorkflowSchedule] cron workflow schedule use {} ms.", elapsed);
    if (elapsed > SCHEDULE_RATE) {
        log.warn("[CronWorkflowSchedule] The database query is using too much time({}ms), please check if the database load is too high!", elapsed);
    }
}
/**
 * Schedules FIX_RATE / FIX_DELAY (frequent) jobs for all apps owned by the
 * current server. Delegates to {@link #scheduleFrequentJobCore} and warns when
 * one round exceeds the schedule rate.
 */
public void scheduleFrequentJob() {
    final long begin = System.currentTimeMillis();
    try {
        // fetch only the app ids this server is responsible for
        final List<Long> appIdsOnThisServer = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
        if (CollectionUtils.isEmpty(appIdsOnThisServer)) {
            log.info("[FrequentJobSchedule] current server has no app's job to schedule.");
            return;
        }
        scheduleFrequentJobCore(appIdsOnThisServer);
    } catch (Exception e) {
        // swallow and log: scheduling must keep running on the next tick
        log.error("[FrequentJobSchedule] schedule frequent job failed.", e);
    }
    final long elapsed = System.currentTimeMillis() - begin;
    log.info("[FrequentJobSchedule] frequent job schedule use {} ms.", elapsed);
    if (elapsed > SCHEDULE_RATE) {
        log.warn("[FrequentJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", elapsed);
    }
}
/**
 * Cleans worker-cluster bookkeeping data for the apps owned by the current server.
 * Any failure is logged and swallowed so the periodic task keeps running.
 */
public void cleanData() {
    try {
        final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress());
        // null-safe emptiness check, consistent with the other schedule entry points
        // (a bare allAppIds.isEmpty() would NPE if the repository ever returned null)
        if (CollectionUtils.isEmpty(allAppIds)) {
            return;
        }
        WorkerClusterManagerService.clean(allAppIds);
    } catch (Exception e) {
        log.error("[CleanData] clean data failed.", e);
    }
}
/**
* 调度 CRON 表达式类型的任务
*/
private void scheduleCronJob(List<Long> appIds) {
private void scheduleCronJobCore(List<Long> appIds) {
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
@ -147,7 +165,7 @@ public class PowerScheduleService {
log.info("[CronScheduler] These cron jobs will be scheduled: {}.", jobInfos);
jobInfos.forEach(jobInfo -> {
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime());
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
jobId2InstanceId.put(jobInfo.getId(), instanceId);
});
instanceInfoRepository.flush();
@ -165,7 +183,7 @@ public class PowerScheduleService {
delay = targetTriggerTime - nowTime;
}
InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId));
InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));
});
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
@ -185,7 +203,7 @@ public class PowerScheduleService {
});
}
private void scheduleWorkflow(List<Long> appIds) {
private void scheduleWorkflowCore(List<Long> appIds) {
long nowTime = System.currentTimeMillis();
long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
@ -220,7 +238,7 @@ public class PowerScheduleService {
});
}
private void scheduleFrequentJob(List<Long> appIds) {
private void scheduleFrequentJobCore(List<Long> appIds) {
Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
try {

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.core.service;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
@ -15,10 +16,10 @@ import java.util.Objects;
* @since 2020/6/20
*/
@Service
@RequiredArgsConstructor
public class AppInfoService {
@Resource
private AppInfoRepository appInfoRepository;
private final AppInfoRepository appInfoRepository;
/**
* 验证应用访问权限

View File

@ -1,17 +1,16 @@
package tech.powerjob.server.core.service;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Optional;
@ -25,19 +24,23 @@ import java.util.Optional;
@Service
public class CacheService {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final JobInfoRepository jobInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final Cache<Long, String> jobId2JobNameCache;
private final Cache<Long, String> workflowId2WorkflowNameCache;
private final Cache<Long, Long> instanceId2AppId;
private final Cache<Long, Long> jobId2AppId;
public CacheService() {
public CacheService(JobInfoRepository jobInfoRepository, WorkflowInfoRepository workflowInfoRepository, InstanceInfoRepository instanceInfoRepository) {
this.jobInfoRepository = jobInfoRepository;
this.workflowInfoRepository = workflowInfoRepository;
this.instanceInfoRepository = instanceInfoRepository;
jobId2JobNameCache = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(1))
.maximumSize(512)

View File

@ -2,6 +2,7 @@ package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
@ -43,20 +44,18 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class JobService {
@Resource
private InstanceService instanceService;
private final InstanceService instanceService;
@Resource
private DispatchService dispatchService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private TimingStrategyService timingStrategyService;
private final DispatchService dispatchService;
private final JobInfoRepository jobInfoRepository;
private final InstanceInfoRepository instanceInfoRepository;
private final TimingStrategyService timingStrategyService;
/**
* 保存/修改任务
@ -173,15 +172,15 @@ public class JobService {
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId);
dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty());
} else {
InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfo, instanceId));
InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty()));
}
log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceId, instanceParams);
return instanceId;
log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams);
return instanceInfo.getInstanceId();
}
@ -205,9 +204,8 @@ public class JobService {
* 启用某个任务
*
* @param jobId 任务ID
* @throws ParseException 异常CRON表达式错误
*/
public void enableJob(Long jobId) throws ParseException {
public void enableJob(Long jobId) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());

View File

@ -5,7 +5,7 @@ import tech.powerjob.server.persistence.remote.repository.UserInfoRepository;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Resource;
import java.util.Date;

View File

@ -18,11 +18,10 @@ import tech.powerjob.server.remote.server.self.ServerInfoService;
public class IdGenerateService {
private final SnowFlakeIdGenerator snowFlakeIdGenerator;
private static final int DATA_CENTER_ID = 0;
@Autowired
public IdGenerateService(ServerInfoService serverInfoService) {
long id = serverInfoService.fetchServiceInfo().getId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);

View File

@ -7,37 +7,50 @@ package tech.powerjob.server.core.uid;
* @since 2020/4/6
*/
public class SnowFlakeIdGenerator {
/**
* 起始的时间戳(a special day for me)
*/
private final static long START_STAMP = 1555776000000L;
/**
* 每一部分占用的位数
* 序列号占用的位数
*/
private final static long SEQUENCE_BIT = 6; //序列号占用的位数
private final static long MACHINE_BIT = 14; //机器标识占用的位数
private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数
private final static long SEQUENCE_BIT = 6;
/**
* 机器标识占用的位数
*/
private final static long MACHINE_BIT = 14;
/**
* 数据中心占用的位数
*/
private final static long DATA_CENTER_BIT = 2;
/**
* 每一部分的最大值
*/
private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
private final long dataCenterId; //数据中心
private final long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastTimestamp = -1L;//上一次时间戳
/**
* 数据中心
*/
private final long dataCenterId;
/**
* 机器标识
*/
private final long machineId;
/**
* 序列号
*/
private long sequence = 0L;
/**
* 上一次时间戳
*/
private long lastTimestamp = -1L;
public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
@ -56,7 +69,7 @@ public class SnowFlakeIdGenerator {
public synchronized long nextId() {
long currStamp = getNewStamp();
if (currStamp < lastTimestamp) {
throw new RuntimeException("clock moved backwards, refusing to generate id");
return futureId();
}
if (currStamp == lastTimestamp) {
@ -79,6 +92,22 @@ public class SnowFlakeIdGenerator {
| sequence; //序列号部分
}
/**
 * Generates an id by borrowing future time when the clock has moved backwards,
 * so job scheduling and workflows do not become unavailable at runtime.
 * NOTE: this does not solve duplicate ids caused by a clock rollback that
 * happens while the server is stopped (the original algorithm's limitation).
 */
private long futureId() {
    // advance the sequence within its bit mask; on wrap-around, borrow one more millisecond
    final long nextSeq = (sequence + 1) & MAX_SEQUENCE;
    sequence = nextSeq;
    if (nextSeq == 0L) {
        lastTimestamp += 1;
    }
    final long timePart = (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT;
    final long dcPart = dataCenterId << DATA_CENTER_LEFT;
    final long machinePart = machineId << MACHINE_LEFT;
    // timestamp | data center | machine | sequence
    return timePart | dcPart | machinePart | nextSeq;
}
private long getNextMill() {
long mill = getNewStamp();
while (mill <= lastTimestamp) {

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.core.validator;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
@ -18,10 +19,10 @@ import javax.annotation.Resource;
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class JobNodeValidator implements NodeValidator {
@Resource
private JobInfoRepository jobInfoRepository;
private final JobInfoRepository jobInfoRepository;
@Override
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.core.validator;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.WorkflowNodeType;
@ -13,7 +14,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.Optional;
@ -23,12 +23,12 @@ import java.util.Optional;
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class NestedWorkflowNodeValidator implements NodeValidator {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Override
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
@ -18,6 +19,7 @@ import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.helper.StatusMappingHelper;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.service.UserService;
@ -32,7 +34,6 @@ import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
@ -47,25 +48,25 @@ import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNo
*/
@Slf4j
@Service
@RequiredArgsConstructor
@SuppressWarnings("squid:S1192")
public class WorkflowInstanceManager {
@Resource
private AlarmCenter alarmCenter;
@Resource
private IdGenerateService idGenerateService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private UserService userService;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
@Resource
private WorkflowNodeHandleService workflowNodeHandleService;
private final AlarmCenter alarmCenter;
private final IdGenerateService idGenerateService;
private final JobInfoRepository jobInfoRepository;
private final UserService userService;
private final WorkflowInfoRepository workflowInfoRepository;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
private final WorkflowNodeHandleService workflowNodeHandleService;
/**
* 创建工作流任务实例
@ -440,10 +441,10 @@ public class WorkflowInstanceManager {
if (workflowInstanceStatus == WorkflowInstanceStatus.SUCCEED){
HashMap<String, String> wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference<HashMap<String, String>>() {
});
updateWorkflowContext(wfInstance.getParentWfInstanceId(),wfContext);
SpringUtils.getBean(this.getClass()).updateWorkflowContext(wfInstance.getParentWfInstanceId(), wfContext);
}
// 处理父工作流
move(wfInstance.getParentWfInstanceId(), wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result);
// 处理父工作流, fix https://github.com/PowerJob/PowerJob/issues/465
SpringUtils.getBean(this.getClass()).move(wfInstance.getParentWfInstanceId(), wfInstance.getWfInstanceId(), StatusMappingHelper.toInstanceStatus(workflowInstanceStatus), result);
}
// 报警

View File

@ -1,14 +1,20 @@
package tech.powerjob.server.core.workflow;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.SystemInstanceResult;
import tech.powerjob.common.enums.WorkflowInstanceStatus;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.response.WorkflowInstanceInfoDTO;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.lock.UseCacheLock;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
@ -16,12 +22,7 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import tech.powerjob.server.remote.server.redirector.DesignateServer;
import tech.powerjob.server.core.instance.InstanceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
@ -35,18 +36,16 @@ import java.util.Optional;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class WorkflowInstanceService {
@Resource
private InstanceService instanceService;
@Resource
private WorkflowInstanceInfoRepository wfInstanceInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceService self;
private final InstanceService instanceService;
private final WorkflowInstanceInfoRepository wfInstanceInfoRepository;
private final WorkflowInstanceManager workflowInstanceManager;
private final WorkflowInfoRepository workflowInfoRepository;
/**
* 停止工作流实例入口
@ -61,10 +60,10 @@ public class WorkflowInstanceService {
}
// 如果这是一个被嵌套的工作流则终止父工作流
if (wfInstance.getParentWfInstanceId() != null) {
self.stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId);
SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId);
return;
}
self.stopWorkflowInstance(wfInstanceId, appId);
SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstanceId, appId);
}
/**

View File

@ -7,7 +7,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.LifeCycle;

View File

@ -1,11 +1,13 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.enums.WorkflowNodeType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.DispatchService;
import tech.powerjob.server.core.instance.InstanceService;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
@ -13,7 +15,7 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
import javax.annotation.Resource;
import java.util.Optional;
/**
* @author Echo009
@ -21,21 +23,15 @@ import javax.annotation.Resource;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class JobNodeHandler implements TaskNodeHandler {
@Resource
private InstanceService instanceService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private DispatchService dispatchService;
private final JobInfoRepository jobInfoRepository;
@Override
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
// instanceParam 传递的是工作流实例的 wfContext
Long instanceId = instanceService.create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis());
Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()).getInstanceId();
node.setInstanceId(instanceId);
node.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId);
@ -46,7 +42,7 @@ public class JobNodeHandler implements TaskNodeHandler {
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
// 洗去时间表达式类型
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
dispatchService.dispatch(jobInfo, node.getInstanceId());
SpringUtils.getBean(DispatchService.class).dispatch(jobInfo, node.getInstanceId(), Optional.empty(), Optional.empty());
}
@Override

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.core.workflow.hanlder.impl;
import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.powerjob.common.SystemInstanceResult;
@ -11,6 +12,7 @@ import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.constants.SwitchableStatus;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
@ -19,7 +21,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
import javax.annotation.Resource;
import java.util.Date;
/**
@ -28,16 +29,12 @@ import java.util.Date;
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class NestedWorkflowNodeHandler implements TaskNodeHandler {
@Resource
private WorkflowInfoRepository workflowInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
@Override
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
@ -78,7 +75,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
} else {
// 透传当前的上下文创建新的工作流实例
String wfContext = wfInstanceInfo.getWfContext();
Long instanceId = workflowInstanceManager.create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
Long instanceId = SpringUtils.getBean(WorkflowInstanceManager.class).create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
node.setInstanceId(instanceId);
}
node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
@ -89,7 +86,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
public void startTaskInstance(PEWorkflowDAG.Node node) {
Long wfId = node.getJobId();
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
workflowInstanceManager.start(targetWf, node.getInstanceId());
SpringUtils.getBean(WorkflowInstanceManager.class).start(targetWf, node.getInstanceId());
}
@Override

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -21,6 +21,7 @@ import org.springframework.stereotype.Service;
public class DatabaseLockService implements LockService {
private final String ownerIp;
private final OmsLockRepository omsLockRepository;
@Autowired

View File

@ -24,9 +24,9 @@ import java.util.concurrent.*;
public class AlarmCenter {
private final ExecutorService POOL;
private final List<Alarmable> BEANS = Lists.newLinkedList();
@Autowired
public AlarmCenter(List<Alarmable> alarmables) {
int cores = Runtime.getRuntime().availableProcessors();
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
@ -30,17 +31,19 @@ import java.util.Set;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DingTalkAlarmService implements Alarmable {
@Resource
private Environment environment;
private final Environment environment;
private Long agentId;
private DingTalkUtils dingTalkUtils;
private Cache<String, String> mobile2UserIdCache;
private static final int CACHE_SIZE = 8192;
// 防止缓存击穿
/**
* 防止缓存击穿
*/
private static final String EMPTY_TAG = "EMPTY";
@Override

View File

@ -125,7 +125,7 @@ public class DingTalkUtils implements Closeable {
@AllArgsConstructor
public static final class MarkdownEntity {
private String title;
private String detail;
private final String title;
private final String detail;
}
}

View File

@ -1,5 +1,7 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import org.springframework.beans.factory.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
@ -10,7 +12,6 @@ import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
@ -31,12 +32,11 @@ public class MailAlarmService implements Alarmable {
private JavaMailSender javaMailSender;
@Value("${spring.mail.username:''}")
private String from;
private static final String FROM_KEY = "spring.mail.username";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
initFrom();
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
return;
}
@ -59,10 +59,4 @@ public class MailAlarmService implements Alarmable {
this.javaMailSender = javaMailSender;
}
// 不能直接使用 @Value 注入不存在的时候会报错
private void initFrom() {
if (StringUtils.isEmpty(from)) {
from = environment.getProperty(FROM_KEY);
}
}
}

View File

@ -11,7 +11,7 @@ import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;

View File

@ -10,38 +10,70 @@ import lombok.Data;
*/
@Data
public class JobInstanceAlarm implements Alarm {
// 应用ID
/**
* 应用ID
*/
private long appId;
// 任务ID
/**
* 任务ID
*/
private long jobId;
// 任务实例ID
/**
* 任务实例ID
*/
private long instanceId;
// 任务名称
/**
* 任务名称
*/
private String jobName;
// 任务自带的参数
/**
* 任务自带的参数
*/
private String jobParams;
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
/**
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
/**
* 时间表达式CRON/NULL/LONG/LONG
*/
private String timeExpression;
// 执行类型单机/广播/MR
/**
* 执行类型单机/广播/MR
*/
private Integer executeType;
// 执行器类型Java/Shell
/**
* 执行器类型Java/Shell
*/
private Integer processorType;
// 执行器信息
/**
* 执行器信息
*/
private String processorInfo;
// 任务实例参数
/**
* 任务实例参数
*/
private String instanceParams;
// 执行结果
/**
* 执行结果
*/
private String result;
// 预计触发时间
/**
* 预计触发时间
*/
private Long expectedTriggerTime;
// 实际触发时间
/**
* 实际触发时间
*/
private Long actualTriggerTime;
// 结束时间
/**
* 结束时间
*/
private Long finishedTime;
// TaskTracker地址
/**
*
*/
private String taskTrackerAddress;
@Override

View File

@ -14,25 +14,39 @@ public class WorkflowInstanceAlarm implements Alarm {
private String workflowName;
// 任务所属应用的ID冗余提高查询效率
/**
* 任务所属应用的ID冗余提高查询效率
*/
private Long appId;
private Long workflowId;
// workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
/**
* workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
*/
private Long wfInstanceId;
// workflow 状态WorkflowInstanceStatus
/**
* workflow 状态WorkflowInstanceStatus
*/
private Integer status;
private PEWorkflowDAG peWorkflowDAG;
private String result;
// 实际触发时间
/**
* 实际触发时间
*/
private Long actualTriggerTime;
// 结束时间
/**
* 结束时间
*/
private Long finishedTime;
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
/**
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
/**
* 时间表达式CRON/NULL/LONG/LONG
*/
private String timeExpression;
@Override

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -2,9 +2,11 @@ package tech.powerjob.server.migrate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.model.PEWorkflowDAG;
import tech.powerjob.server.common.utils.SpringUtils;
import tech.powerjob.server.extension.LockService;
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
@ -20,7 +22,6 @@ import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;
import javax.transaction.Transactional;
import java.util.*;
@ -35,23 +36,18 @@ import java.util.concurrent.TimeUnit;
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class V3ToV4MigrateService {
private static final String MIGRATE_LOCK_TEMPLATE = "v3to4MigrateLock-%s-%s";
@Resource
private LockService lockService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
/**
* 避免内部方法调用导致事务不生效
*/
@Resource
private V3ToV4MigrateService self;
private final LockService lockService;
private final JobInfoRepository jobInfoRepository;
private final WorkflowInfoRepository workflowInfoRepository;
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
/* ********************** 3.x => 4.x ********************** */
@ -149,7 +145,7 @@ public class V3ToV4MigrateService {
for (WorkflowInfoDO workflowInfo : workflowInfoList) {
try {
boolean fixed = self.fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
boolean fixed = SpringUtils.getBean(this.getClass()).fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
if (fixed) {
fixedWorkflowIds.add(workflowInfo.getId());
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -2,7 +2,6 @@ package tech.powerjob.server.monitor;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@ -19,9 +18,7 @@ public class PowerJobMonitorService implements MonitorService {
private final List<Monitor> monitors = Lists.newLinkedList();
@Autowired
public PowerJobMonitorService(List<Monitor> monitors) {
monitors.forEach(m -> {
log.info("[MonitorService] register monitor: {}", m.getClass().getName());
this.monitors.add(m);

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -7,6 +7,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
@ -38,9 +39,6 @@ import java.util.Objects;
)
public class LocalJpaConfig {
@Resource(name = "omsLocalDatasource")
private DataSource omsLocalDatasource;
public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local";
private static Map<String, Object> genDatasourceProperties() {
@ -56,8 +54,7 @@ public class LocalJpaConfig {
}
@Bean(name = "localEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(EntityManagerFactoryBuilder builder) {
public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource,EntityManagerFactoryBuilder builder) {
return builder
.dataSource(omsLocalDatasource)
.properties(genDatasourceProperties())
@ -66,10 +63,9 @@ public class LocalJpaConfig {
.build();
}
@Bean(name = "localTransactionManager")
public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject()));
public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
}
@Bean(name = "localTransactionTemplate")

View File

@ -5,7 +5,7 @@ import tech.powerjob.server.common.utils.PropertyUtils;
import org.hibernate.boot.model.naming.Identifier;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
import org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy;
import org.springframework.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
@ -39,7 +39,7 @@ public class PowerJobPhysicalNamingStrategy extends SpringPhysicalNamingStrategy
String text = name.getText();
String noDOText = StringUtils.endsWithIgnoreCase(text, "do") ? text.substring(0, text.length() - 2) : text;
String newText = StringUtils.hasLength(tablePrefix) ? tablePrefix + noDOText : noDOText;
String newText = StringUtils.isEmpty(tablePrefix) ? tablePrefix + noDOText : noDOText;
return super.toPhysicalTableName(new Identifier(newText, name.isQuoted()), jdbcEnvironment);
}

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.persistence.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
@ -13,7 +14,6 @@ import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Objects;
@ -36,12 +36,6 @@ import java.util.Objects;
)
public class RemoteJpaConfig {
@Resource(name = "omsRemoteDatasource")
private DataSource omsRemoteDatasource;
@Resource(name = "multiDatasourceProperties")
private MultiDatasourceProperties properties;
public static final String CORE_PACKAGES = "tech.powerjob.server.persistence.remote";
/**
@ -69,7 +63,7 @@ public class RemoteJpaConfig {
@Primary
@Bean(name = "remoteEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(EntityManagerFactoryBuilder builder) {
public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(@Qualifier("omsRemoteDatasource") DataSource omsRemoteDatasource,@Qualifier("multiDatasourceProperties") MultiDatasourceProperties properties, EntityManagerFactoryBuilder builder) {
Map<String, Object> datasourceProperties = genDatasourceProperties();
datasourceProperties.putAll(properties.getRemote().getHibernate().getProperties());
return builder
@ -83,7 +77,7 @@ public class RemoteJpaConfig {
@Primary
@Bean(name = "remoteTransactionManager")
public PlatformTransactionManager initRemoteTransactionManager(EntityManagerFactoryBuilder builder) {
return new JpaTransactionManager(Objects.requireNonNull(initRemoteEntityManagerFactory(builder).getObject()));
public PlatformTransactionManager initRemoteTransactionManager(@Qualifier("remoteEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
}
}

View File

@ -16,16 +16,20 @@ import java.util.stream.Stream;
*/
public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceLogDO, Long> {
// 流式查询
/**
* 流式查询
*/
Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(Long instanceId);
// 删除数据
/**
* 删除数据
*/
@Modifying
@Transactional
@Transactional(rollbackOn = Exception.class)
long deleteByInstanceId(Long instanceId);
@Modifying
@Transactional
@Transactional(rollbackOn = Exception.class)
@CanIgnoreReturnValue
long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t);

View File

@ -1,6 +1,5 @@
package tech.powerjob.server.persistence.mongodb;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.mongodb.client.MongoDatabase;
@ -19,8 +18,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import javax.annotation.Resource;
import java.io.*;
import java.util.Date;
import java.util.Map;
@ -36,21 +35,24 @@ import java.util.function.Consumer;
@Service
public class GridFsManager implements InitializingBean {
@Resource
private Environment environment;
private final Environment environment;
private final MongoDatabase db;
private MongoDatabase db;
private boolean available;
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
@Autowired(required = false)
public void setMongoTemplate(MongoTemplate mongoTemplate) {
public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
this.environment = environment;
if (mongoTemplate != null) {
this.db = mongoTemplate.getDb();
} else {
this.db = null;
}
}

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.persistence.monitor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@ -11,7 +12,6 @@ import tech.powerjob.server.monitor.MonitorService;
import tech.powerjob.server.monitor.events.db.DatabaseEvent;
import tech.powerjob.server.monitor.events.db.DatabaseType;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
@ -25,10 +25,10 @@ import java.util.stream.Stream;
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class DatabaseMonitorAspect {
@Resource
private MonitorService monitorService;
private final MonitorService monitorService;
@Around("execution(* tech.powerjob.server.persistence.remote.repository..*.*(..))")
public Object monitorCoreDB(ProceedingJoinPoint joinPoint) throws Throwable {

View File

@ -14,7 +14,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})})
@Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_app_info", columnNames = {"appName"})})
public class AppInfoDO {
@Id

View File

@ -14,7 +14,7 @@ import java.util.Date;
*/
@Data
@Entity
@Table(indexes = {@Index(columnList = "appId")})
@Table(indexes = {@Index(name = "idx01_container_info", columnList = "appId")})
public class ContainerInfoDO {
@Id

View File

@ -19,7 +19,11 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")})
@Table(indexes = {
@Index(name = "idx01_instance_info", columnList = "jobId,status"),
@Index(name = "idx02_instance_info", columnList = "appId,status"),
@Index(name = "idx03_instance_info", columnList = "instanceId,status")
})
public class InstanceInfoDO {
@Id
@ -40,6 +44,7 @@ public class InstanceInfoDO {
private Long instanceId;
/**
* 任务参数静态
*
* @since 2021/2/01
*/
@Lob

View File

@ -19,7 +19,9 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {@Index(columnList = "appId")})
@Table(indexes = {
@Index(name = "idx01_job_info", columnList = "appId,status,timeExpressionType,nextTriggerTime"),
})
public class JobInfoDO {

View File

@ -16,7 +16,7 @@ import java.util.Date;
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})})
@Table(uniqueConstraints = {@UniqueConstraint(name = "uidx01_oms_lock", columnNames = {"lockName"})})
public class OmsLockDO {
@Id

View File

@ -16,7 +16,10 @@ import java.util.Date;
@Data
@Entity
@NoArgsConstructor
@Table(uniqueConstraints = {@UniqueConstraint(columnNames = "ip")})
@Table(
uniqueConstraints = {@UniqueConstraint(name = "uidx01_server_info", columnNames = "ip")},
indexes = {@Index(name = "idx01_server_info", columnList = "gmtModified")}
)
public class ServerInfoDO {
@Id

View File

@ -14,7 +14,10 @@ import java.util.Date;
*/
@Data
@Entity
@Table
@Table(indexes = {
@Index(name = "uidx01_user_info", columnList = "username"),
@Index(name = "uidx02_user_info", columnList = "email")
})
public class UserInfoDO {
@Id

View File

@ -18,7 +18,9 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {@Index(columnList = "appId")})
@Table(indexes = {
@Index(name = "idx01_workflow_info",columnList = "appId,status,timeExpressionType,nextTriggerTime")
})
public class WorkflowInfoDO {
@Id

View File

@ -18,7 +18,13 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table
@Table(
uniqueConstraints = {@UniqueConstraint(name = "uidx01_wf_instance", columnNames = {"wfInstanceId"})},
indexes = {
@Index(name = "idx01_wf_instance", columnList = "workflowId,status"),
@Index(name = "idx01_wf_instance", columnList = "appId,status,expectedTriggerTime")
}
)
public class WorkflowInstanceInfoDO {
@Id

View File

@ -21,7 +21,9 @@ import java.util.Date;
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {@Index(columnList = "appId"), @Index(columnList = "workflowId")})
@Table(indexes = {
@Index(name = "idx01_workflow_node_info", columnList = "workflowId,gmtCreate")
})
public class WorkflowNodeInfoDO {
@Id

View File

@ -0,0 +1,45 @@
package tech.powerjob.server.persistence.remote.model.brief;
import lombok.Data;
/**
* @author Echo009
* @since 2022/9/13
*/
@Data
public class BriefInstanceInfo {
private Long appId;
private Long id;
/**
* 任务ID
*/
private Long jobId;
/**
* 任务所属应用的ID冗余提高查询效率
*/
private Long instanceId;
/**
* 总共执行的次数用于重试判断
*/
private Long runningTimes;
public BriefInstanceInfo(Long appId, Long id, Long jobId, Long instanceId) {
this.appId = appId;
this.id = id;
this.jobId = jobId;
this.instanceId = instanceId;
}
public BriefInstanceInfo(Long appId, Long id, Long jobId, Long instanceId, Long runningTimes) {
this.appId = appId;
this.id = id;
this.jobId = jobId;
this.instanceId = instanceId;
this.runningTimes = runningTimes;
}
}

View File

@ -1,5 +1,7 @@
package tech.powerjob.server.persistence.remote.repository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@ -25,4 +27,8 @@ public interface AppInfoRepository extends JpaRepository<AppInfoDO, Long> {
* 其实只需要 id处于性能考虑可以直接写SQL只返回ID
*/
List<AppInfoDO> findAllByCurrentServer(String currentServer);
@Query(value = "select id from AppInfoDO where currentServer = :currentServer")
List<Long> listAppIdByCurrentServer(@Param("currentServer")String currentServer);
}

View File

@ -1,12 +1,14 @@
package tech.powerjob.server.persistence.remote.repository;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo;
import javax.transaction.Transactional;
import java.util.Date;
@ -23,10 +25,26 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
/**
* 统计当前JOB有多少实例正在运行
*/
@Query(value = "select count(*) from InstanceInfoDO where jobId = ?1 and status in ?2")
long countByJobIdAndStatusIn(long jobId, List<Integer> status);
List<InstanceInfoDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
/**
* 更新状态变更信息
*
* @param lastReportTime 最近一次上报时间
* @param modifyTime 更新时间
* @param runningTimes 运行次数
* @param instanceId 实例 ID
* @param status 目标状态
* @param oldStatus 旧状态
* @return 更新记录数
*/
@Transactional(rollbackOn = Exception.class)
@Modifying
@Query(value = "update InstanceInfoDO set lastReportTime = :lastReportTime, gmtModified = :modifyTime, runningTimes = :runningTimes, status = :status where instanceId = :instanceId and status = :oldStatus")
int updateStatusChangeInfoByInstanceIdAndStatus(@Param("lastReportTime") long lastReportTime, @Param("modifyTime") Date modifyTime, @Param("runningTimes") long runningTimes, @Param("status") int status, @Param("instanceId") long instanceId, @Param("oldStatus") int oldStatus);
/**
* 更新任务执行记录内容DispatchService专用
@ -46,6 +64,7 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
@Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, finishedTime = :finishedTime, taskTrackerAddress = :taskTrackerAddress, result = :result, gmtModified = :modifyTime where instanceId = :instanceId")
int update4TriggerFailed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("finishedTime") long finishedTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("result") String result, @Param("modifyTime") Date modifyTime);
/**
* 更新任务执行记录内容DispatchService专用
*
@ -54,13 +73,28 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
* @param actualTriggerTime 实际调度时间
* @param taskTrackerAddress taskTracker 地址
* @param modifyTime 更新时间
* @param oldStatus 旧状态
* @return 更新记录数量
*/
@Transactional(rollbackOn = Exception.class)
@Modifying
@CanIgnoreReturnValue
@Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, taskTrackerAddress = :taskTrackerAddress, gmtModified = :modifyTime where instanceId = :instanceId")
int update4TriggerSucceed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("modifyTime") Date modifyTime);
@Query(value = "update InstanceInfoDO set status = :status, actualTriggerTime = :actualTriggerTime, taskTrackerAddress = :taskTrackerAddress, gmtModified = :modifyTime where instanceId = :instanceId and status = :oldStatus")
int update4TriggerSucceed(@Param("instanceId") long instanceId, @Param("status") int status, @Param("actualTriggerTime") long actualTriggerTime, @Param("taskTrackerAddress") String taskTrackerAddress, @Param("modifyTime") Date modifyTime, @Param("oldStatus") int oldStatus);
/**
 * CAS-style status transition: set status and gmtModified only while the instance
 * is still in {@code originStatus}.
 *
 * @param instanceId   instance ID
 * @param originStatus expected current status (the guard)
 * @param status       target status
 * @param modifyTime   modification time (written to gmtModified)
 * @return number of updated rows (0 means the instance was no longer in originStatus)
 */
@Transactional(rollbackOn = Exception.class)
@Modifying
@CanIgnoreReturnValue
@Query(value = "update InstanceInfoDO set status = :status, gmtModified = :modifyTime where instanceId = :instanceId and status = :originStatus ")
int updateStatusAndGmtModifiedByInstanceIdAndOriginStatus(@Param("instanceId") long instanceId, @Param("originStatus") int originStatus, @Param("status") int status, @Param("modifyTime") Date modifyTime);
/**
 * Batch variant of the CAS-style status transition: for every instance in the list
 * that is still in {@code originStatus}, set status and gmtModified.
 *
 * @param instanceIdList instance IDs to update
 * @param originStatus   expected current status (the guard, applied per row)
 * @param status         target status
 * @param modifyTime     modification time (written to gmtModified)
 * @return number of rows actually updated (rows not in originStatus are skipped)
 */
@Transactional(rollbackOn = Exception.class)
@Modifying
@CanIgnoreReturnValue
@Query(value = "update InstanceInfoDO set status = :status, gmtModified = :modifyTime where instanceId in (:instanceIdList) and status = :originStatus ")
int updateStatusAndGmtModifiedByInstanceIdListAndOriginStatus(@Param("instanceIdList") List<Long> instanceIdList, @Param("originStatus") int originStatus, @Param("status") int status, @Param("modifyTime") Date modifyTime);
/**
* 更新固定频率任务的执行记录
@ -77,19 +111,20 @@ public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Lo
// Update the execution record of a fixed-frequency job run: status, runningTimes and
// gmtModified, unconditionally by instanceId (no status guard).
@Query(value = "update InstanceInfoDO set status = :status, runningTimes = :runningTimes, gmtModified = :modifyTime where instanceId = :instanceId")
int update4FrequentJob(@Param("instanceId") long instanceId, @Param("status") int status, @Param("runningTimes") long runningTimes, @Param("modifyTime") Date modifyTime);
/*
 * Status-check queries for the three phases WAITING_DISPATCH, WAITING_WORKER_RECEIVE and
 * RUNNING. Data volume is usually small, so no hand-tuned SQL is written for IO optimisation.
 */

/**
 * Find instances of the given apps in the given status whose expected trigger time has
 * already passed the given timestamp.
 *
 * This is a derived (method-name) query; {@code @Param} annotations were removed because
 * they are only consumed by {@code @Query} methods and had no effect here — binding is
 * positional for derived queries, so behavior is unchanged.
 *
 * @param appIds   app IDs to scan
 * @param status   required instance status
 * @param time     upper bound (exclusive) for expectedTriggerTime
 * @param pageable page/limit specification
 * @return matching instance records, at most one page
 */
List<InstanceInfoDO> findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> appIds, int status, long time, Pageable pageable);
List<InstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
// Projection query: load only the brief fields (appId, id, jobId, instanceId) via a JPQL
// constructor expression instead of full InstanceInfoDO rows, for instances of the given
// apps in the given status whose actualTriggerTime is before the given timestamp.
@Query(value = "select new tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo(i.appId,i.id,i.jobId,i.instanceId) from InstanceInfoDO i where i.appId in (:appIds) and i.status = :status and i.actualTriggerTime < :time")
List<BriefInstanceInfo> selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(@Param("appIds") List<Long> appIds, @Param("status") int status, @Param("time") long time, Pageable pageable);
List<InstanceInfoDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
// Projection query: load only the brief fields (appId, id, jobId, instanceId, runningTimes)
// via a JPQL constructor expression, for instances of the given apps in the given status
// whose gmtModified is before the given time.
@Query(value = "select new tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo(i.appId,i.id,i.jobId,i.instanceId,i.runningTimes) from InstanceInfoDO i where i.appId in (:appIds) and i.status = :status and i.gmtModified < :time")
List<BriefInstanceInfo> selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(@Param("appIds") List<Long> appIds, @Param("status") int status, @Param("time") Date time, Pageable pageable);
/** Look up a single instance by its instanceId; returns null when no row matches (Spring Data convention for non-Optional single results). */
InstanceInfoDO findByInstanceId(long instanceId);
/* -- statistics queries -- */

/**
 * Count instances of an app in the given status.
 *
 * The previous explicit JPQL ("select count(*) ... where appId = ?1 and status = ?2") was
 * redundant: Spring Data derives exactly that count query from the method name, and the
 * positional parameters were inconsistent with the named parameters used elsewhere in this
 * repository. Dropping the {@code @Query} is behavior-identical.
 *
 * @param appId  app ID
 * @param status instance status
 * @return number of matching instances
 */
long countByAppIdAndStatus(long appId, int status);

/**
 * Count instances of an app in the given status created after the given time.
 *
 * @param appId  app ID
 * @param status instance status
 * @param time   lower bound (exclusive) for gmtCreate
 * @return number of matching instances
 */
long countByAppIdAndStatusAndGmtCreateAfter(long appId, int status, Date time);

View File

@ -7,6 +7,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@ -44,4 +45,6 @@ public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long>, JpaSp
List<JobInfoDO> findByAppId(Long appId);
List<JobInfoDO> findByIdIn(Collection<Long> jobIds);
}

View File

@ -16,13 +16,13 @@ import javax.transaction.Transactional;
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
@Modifying
@Transactional
@Transactional(rollbackOn = Exception.class)
// Delete the lock row with the given name; returns the number of deleted rows
// (0 when the lock did not exist). Uses a positional JPQL parameter (?1).
@Query(value = "delete from OmsLockDO where lockName = ?1")
int deleteByLockName(String lockName);
// Look up a lock row by name; returns null when the lock is not held.
OmsLockDO findByLockName(String lockName);
@Modifying
@Transactional
@Transactional(rollbackOn = Exception.class)
// Derived delete query: remove all lock rows owned by the given server IP
// (used to release every lock a server holds); returns the number of deleted rows.
int deleteByOwnerIP(String ip);
}

View File

@ -20,13 +20,18 @@ public interface WorkflowInfoRepository extends JpaRepository<WorkflowInfoDO, Lo
/**
 * Query all workflow records under the given app.
 *
 * @param appId APP ID
 * @return all workflow records belonging to the app (empty list when none exist)
 */
List<WorkflowInfoDO> findByAppId(Long appId);
/** 对外查询list三兄弟 */
/**
* 对外查询list三兄弟
*/
// Paged lookups for the web layer, each excluding workflows in the given status
// (typically the logically-deleted status): by app, by exact id, and by app + name LIKE.
Page<WorkflowInfoDO> findByAppIdAndStatusNot(Long appId, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByIdAndStatusNot(Long id, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByAppIdAndStatusNotAndWfNameLike(Long appId, int nStatus, String condition, Pageable pageable);
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -37,20 +37,25 @@ import java.util.concurrent.TimeUnit;
@Service
public class ServerElectionService {
@Resource
private LockService lockService;
@Resource
private TransportService transportService;
@Resource
private AppInfoRepository appInfoRepository;
private final LockService lockService;
@Value("${oms.accurate.select.server.percentage}")
private int accurateSelectServerPercentage;
private final TransportService transportService;
private final AppInfoRepository appInfoRepository;
private final int accurateSelectServerPercentage;
private static final int RETRY_TIMES = 10;
private static final long PING_TIMEOUT_MS = 1000;
private static final String SERVER_ELECT_LOCK = "server_elect_%d";
/**
 * Constructor injection of collaborators plus the accurate-select percentage,
 * which is read from the "oms.accurate.select.server.percentage" property.
 * Constructor injection (rather than field @Resource) keeps the fields final.
 */
public ServerElectionService(LockService lockService, TransportService transportService, AppInfoRepository appInfoRepository,@Value("${oms.accurate.select.server.percentage}") int accurateSelectServerPercentage) {
this.lockService = lockService;
this.transportService = transportService;
this.appInfoRepository = appInfoRepository;
this.accurateSelectServerPercentage = accurateSelectServerPercentage;
}
public String elect(Long appId, String protocol, String currentServer) {
if (!accurate()) {
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功

View File

@ -4,6 +4,7 @@ import akka.pattern.Patterns;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.response.AskResponse;
@ -39,10 +40,10 @@ import java.util.concurrent.CompletionStage;
@Aspect
@Component
@Order(0)
@RequiredArgsConstructor
public class DesignateServerAspect {
@Resource
private AppInfoRepository appInfoRepository;
private final AppInfoRepository appInfoRepository;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -70,7 +71,7 @@ public class DesignateServerAspect {
}
if (appId == null) {
throw new PowerJobException("can't find appId in params for:" + signature.toString());
throw new PowerJobException("can't find appId in params for:" + signature);
}
// 获取执行机器

View File

@ -21,11 +21,17 @@ import java.util.Map;
@Slf4j
public class ClusterStatusHolder {
// 集群所属的应用名称
/**
* 集群所属的应用名称
*/
private final String appName;
// 集群中所有机器的信息
/**
* 集群中所有机器的信息
*/
private final Map<String, WorkerInfo> address2WorkerInfo;
// 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo)
/**
* 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo)
*/
private Map<Long, Map<String, DeployedContainerInfo>> containerId2Infos;

View File

@ -18,8 +18,10 @@ import java.util.Set;
@Slf4j
public class WorkerClusterManagerService {
// 存储Worker健康信息appId -> ClusterStatusHolder
private static final Map<Long, ClusterStatusHolder> appId2ClusterStatus = Maps.newConcurrentMap();
/**
* 存储Worker健康信息appId -> ClusterStatusHolder
*/
private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();
/**
* 更新状态
@ -28,7 +30,7 @@ public class WorkerClusterManagerService {
public static void updateStatus(WorkerHeartbeat heartbeat) {
Long appId = heartbeat.getAppId();
String appName = heartbeat.getAppName();
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
clusterStatusHolder.updateStatus(heartbeat);
}
@ -38,7 +40,7 @@ public class WorkerClusterManagerService {
*/
public static void clean(List<Long> usingAppIds) {
Set<Long> keys = Sets.newHashSet(usingAppIds);
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
}
@ -46,11 +48,11 @@ public class WorkerClusterManagerService {
* 清理缓存信息防止 OOM
*/
public static void cleanUp() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);
}
protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
return appId2ClusterStatus;
return APP_ID_2_CLUSTER_STATUS;
}
}

View File

@ -26,9 +26,8 @@ import java.util.Optional;
@Service
public class WorkerClusterQueryService {
private List<WorkerFilter> workerFilters;
private final List<WorkerFilter> workerFilters;
@Autowired
public WorkerClusterQueryService(List<WorkerFilter> workerFilters) {
this.workerFilters = workerFilters;
}
@ -92,7 +91,6 @@ public class WorkerClusterQueryService {
*/
public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
// this may cause NPE while address value is null .
//return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address));
final Map<String, WorkerInfo> workerInfosByAppId = getWorkerInfosByAppId(appId);
//add null check for both workerInfos Map and address
if (null != workerInfosByAppId && null != address) {

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>powerjob-server</artifactId>
<groupId>tech.powerjob</groupId>
<version>4.2.0</version>
<version>4.2.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.config;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -25,10 +26,10 @@ import static springfox.documentation.builders.PathSelectors.any;
@Configuration
@EnableSwagger2
@ConditionalOnProperty(name = PowerJobServerConfigKey.SWAGGER_UI_ENABLE, havingValue = "true")
@RequiredArgsConstructor
public class SwaggerConfig {
@Resource
private ServerInfoService serverInfoService;
private final ServerInfoService serverInfoService;
@Bean
public Docket createRestApi() {

View File

@ -1,18 +1,17 @@
package tech.powerjob.server.config;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import tech.powerjob.server.common.RejectedExecutionHandlerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import tech.powerjob.server.common.RejectedExecutionHandlerFactory;
import tech.powerjob.server.common.constants.PJThreadPool;
import java.util.concurrent.*;
import tech.powerjob.server.common.thread.NewThreadRunRejectedExecutionHandler;
/**
* 公用线程池配置
@ -34,7 +33,7 @@ public class ThreadPoolConfig {
executor.setQueueCapacity(0);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("PJ-TIMING-");
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun(PJThreadPool.TIMING_POOL));
executor.setRejectedExecutionHandler(new NewThreadRunRejectedExecutionHandler(PJThreadPool.TIMING_POOL));
return executor;
}
@ -62,12 +61,14 @@ public class ThreadPoolConfig {
return executor;
}
// 引入 WebSocket 支持后需要手动初始化调度线程池
/**
* 引入 WebSocket 支持后需要手动初始化调度线程池
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setThreadNamePrefix("PJ-WS-");
scheduler.setPoolSize(Math.max(Runtime.getRuntime().availableProcessors() * 8, 32));
scheduler.setThreadNamePrefix("PJ-DEFAULT-");
scheduler.setDaemon(true);
return scheduler;
}

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.web.controller;
import lombok.RequiredArgsConstructor;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
@ -31,12 +32,12 @@ import java.util.stream.Collectors;
*/
@RestController
@RequestMapping("/appInfo")
@RequiredArgsConstructor
public class AppInfoController {
@Resource
private AppInfoService appInfoService;
@Resource
private AppInfoRepository appInfoRepository;
private final AppInfoService appInfoService;
private final AppInfoRepository appInfoRepository;
private static final int MAX_APP_NUM = 200;

View File

@ -41,15 +41,21 @@ import java.util.stream.Collectors;
@RequestMapping("/container")
public class ContainerController {
@Value("${server.port}")
private int port;
@Resource
private ContainerService containerService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private ContainerInfoRepository containerInfoRepository;
private final int port;
private final ContainerService containerService;
private final AppInfoRepository appInfoRepository;
private final ContainerInfoRepository containerInfoRepository;
/**
 * Constructor injection of collaborators; the HTTP port is injected from the
 * "server.port" property. Constructor injection keeps the fields final.
 */
public ContainerController(@Value("${server.port}") int port, ContainerService containerService, AppInfoRepository appInfoRepository, ContainerInfoRepository containerInfoRepository) {
this.port = port;
this.containerService = containerService;
this.appInfoRepository = appInfoRepository;
this.containerInfoRepository = containerInfoRepository;
}
@GetMapping("/downloadJar")
public void downloadJar(String version, HttpServletResponse response) throws IOException {

Some files were not shown because too many files have changed in this diff Show More