Merge branch 'v3.2.1' into jenkins_auto_build

This commit is contained in:
朱八 2020-07-25 18:00:46 +08:00
commit b53438836a
42 changed files with 381 additions and 115 deletions

View File

@ -1,7 +1,6 @@
--- ---
name: bug report name: "\U0001F41B Bug Report"
about: Create a report to help us improve about: Something doesn't work as expected
title: "[BUG] bug report"
labels: bug labels: bug
assignees: KFCFans assignees: KFCFans

View File

@ -1,7 +1,6 @@
--- ---
name: feature request name: "\U0001F680 Feature Request"
about: Suggest an idea for this project about: Suggest an idea for this project
title: "[Feature] feature request"
labels: new feature labels: new feature
assignees: '' assignees: ''

14
.github/ISSUE_TEMPLATE/question.md vendored Normal file
View File

@ -0,0 +1,14 @@
---
name: "\U0001F914 Question"
about: Usage question that isn't answered in docs or discussion
---
## Question
Before asking a question, make sure you have在提问之前请确保你已经:
- Read documentation仔细阅读了官方文档
- Googled your question百度搜索了你的问题
- Searched open and closed GitHub issues搜索了开放和关闭的 GitHub issues
Please pay attention on issues you submitted, because we maybe need more details.

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId> <artifactId>powerjob-client</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
</properties> </properties>

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -31,13 +32,13 @@ public class OhMyClient {
private Long appId; private Long appId;
private String currentAddress; private String currentAddress;
private List<String> allAddress; private final List<String> allAddress;
private static final String URL_PATTERN = "http://%s%s%s"; private static final String URL_PATTERN = "http://%s%s%s";
/** /**
* 初始化 OhMyClient 客户端 * 初始化 OhMyClient 客户端
* @param domain www.oms-server.com内网域名自行完成DNS & Proxy * @param domain 比如 www.powerjob-server.com内网域名自行完成 DNS & Proxy
* @param appName 负责的应用名称 * @param appName 负责的应用名称
*/ */
public OhMyClient(String domain, String appName, String password) { public OhMyClient(String domain, String appName, String password) {
@ -52,8 +53,8 @@ public class OhMyClient {
*/ */
public OhMyClient(List<String> addressList, String appName, String password) { public OhMyClient(List<String> addressList, String appName, String password) {
Objects.requireNonNull(addressList, "domain can't be null!"); CommonUtils.requireNonNull(addressList, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null"); CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList; allAddress = addressList;
for (String addr : addressList) { for (String addr : addressList) {
@ -77,7 +78,7 @@ public class OhMyClient {
if (StringUtils.isEmpty(currentAddress)) { if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available"); throw new OmsException("no server available");
} }
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName); log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
} }
private static String assertApp(String appName, String password, String url) throws IOException { private static String assertApp(String appName, String password, String url) throws IOException {
@ -211,6 +212,22 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class); return JsonUtils.parseObject(post, ResultDTO.class);
} }
/**
* 取消任务实例
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
*/
public ResultDTO<Void> cancelInstance(Long instanceId) throws Exception {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/** /**
* 查询任务实例状态 * 查询任务实例状态
* @param instanceId 应用实例ID * @param instanceId 应用实例ID
@ -364,28 +381,35 @@ public class OhMyClient {
private String postHA(String path, RequestBody requestBody) { private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址 // 先尝试默认地址
String url = getUrl(path, currentAddress);
try { try {
String res = HttpUtils.post(getUrl(path, currentAddress), requestBody); String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) { if (StringUtils.isNotEmpty(res)) {
return res; return res;
} }
}catch (Exception ignore) { }catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
} }
// 失败开始重试 // 失败开始重试
for (String addr : allAddress) { for (String addr : allAddress) {
if (Objects.equals(addr, currentAddress)) {
continue;
}
url = getUrl(path, addr);
try { try {
String res = HttpUtils.post(getUrl(path, addr), requestBody); String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) { if (StringUtils.isNotEmpty(res)) {
log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr); log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr; currentAddress = addr;
return res; return res;
} }
}catch (Exception ignore) { }catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
} }
} }
log.error("[OhMyClient] no server available in {}.", allAddress); log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available"); throw new OmsException("no server available");
} }
} }

View File

@ -9,6 +9,8 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
/** /**
* 测试 Client * 测试 Client
* *
@ -87,4 +89,24 @@ public class TestClient {
public void testFetchInstanceStatus() throws Exception { public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
} }
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
@Test
public void testCancelInstanceInDatabase() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
// 手动重启 server干掉时间轮中的调度数据
TimeUnit.MINUTES.sleep(1);
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
} }

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId> <artifactId>powerjob-common</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>

View File

@ -21,6 +21,7 @@ public enum InstanceStatus {
RUNNING(3, "运行中"), RUNNING(3, "运行中"),
FAILED(4, "失败"), FAILED(4, "失败"),
SUCCEED(5, "成功"), SUCCEED(5, "成功"),
CANCELED(9, "取消"),
STOPPED(10, "手动停止"); STOPPED(10, "手动停止");
private int v; private int v;
@ -29,7 +30,7 @@ public enum InstanceStatus {
// 广义的运行状态 // 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v); public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
// 结束状态 // 结束状态
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);
public static InstanceStatus of(int v) { public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) { for (InstanceStatus is : values()) {

View File

@ -22,6 +22,7 @@ public class OpenAPIConstant {
/* ************* Instance 区 ************* */ /* ************* Instance 区 ************* */
public static final String STOP_INSTANCE = "/stopInstance"; public static final String STOP_INSTANCE = "/stopInstance";
public static final String CANCEL_INSTANCE = "/cancelInstance";
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus"; public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo"; public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo";

View File

@ -30,6 +30,7 @@ public class SystemInstanceResult {
// 被用户手动停止 // 被用户手动停止
public static final String STOPPED_BY_USER = "stopped by user"; public static final String STOPPED_BY_USER = "stopped by user";
public static final String CANCELED_BY_USER = "canceled by user";
} }

View File

@ -16,6 +16,8 @@ import java.util.List;
@NoArgsConstructor @NoArgsConstructor
public class InstanceDetail implements OmsSerializable { public class InstanceDetail implements OmsSerializable {
// 任务预计执行时间
private Long expectedTriggerTime;
// 任务整体开始时间 // 任务整体开始时间
private Long actualTriggerTime; private Long actualTriggerTime;
// 任务整体结束时间可能不存在 // 任务整体结束时间可能不存在

View File

@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class HttpUtils { public class HttpUtils {
private static OkHttpClient client; private static final OkHttpClient client;
private static final int HTTP_SUCCESS_CODE = 200; private static final int HTTP_SUCCESS_CODE = 200;
static { static {

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId> <artifactId>powerjob-server</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<swagger.version>2.9.2</swagger.version> <swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<mysql.version>8.0.19</mysql.version> <mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version> <zip4j.version>2.5.2</zip4j.version>
@ -24,6 +24,7 @@
<mvn.invoker.version>3.0.1</mvn.invoker.version> <mvn.invoker.version>3.0.1</mvn.invoker.version>
<commons.net.version>3.6</commons.net.version> <commons.net.version>3.6</commons.net.version>
<fastjson.version>1.2.68</fastjson.version> <fastjson.version>1.2.68</fastjson.version>
<druid.starter.version>1.1.23</druid.starter.version>
<!-- 部署时跳过该module --> <!-- 部署时跳过该module -->
<maven.deploy.skip>true</maven.deploy.skip> <maven.deploy.skip>true</maven.deploy.skip>
@ -78,6 +79,18 @@
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${springboot.version}</version> <version>${springboot.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.starter.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId> <artifactId>spring-boot-starter-data-mongodb</artifactId>

View File

@ -1,27 +1,59 @@
package com.github.kfcfans.powerjob.server; package com.github.kfcfans.powerjob.server;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import java.io.File;
/** /**
* SpringBoot 启动入口 * SpringBoot 启动入口
* *
* @author tjq * @author tjq
* @since 2020/3/29 * @since 2020/3/29
*/ */
@Slf4j
@EnableScheduling @EnableScheduling
@SpringBootApplication @SpringBootApplication
public class OhMyApplication { public class OhMyApplication {
private static final String TIPS = "\n\n" +
"******************* PowerJob Tips *******************\n" +
"如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" +
"if server can't startup, we recommend that you read the documentation to find a solution:\n" +
"https://www.yuque.com/powerjob/guidence/xp5ygc#xMQC9\n" +
"******************* PowerJob Tips *******************\n\n";
public static void main(String[] args) { public static void main(String[] args) {
// 完成前置工作
pre();
// 先启动 ActorSystem // 先启动 ActorSystem
OhMyServer.init(); OhMyServer.init();
// 再启动SpringBoot // 再启动SpringBoot
SpringApplication.run(OhMyApplication.class, args); try {
SpringApplication.run(OhMyApplication.class, args);
}catch (Exception e) {
log.error(TIPS);
throw e;
}
}
private static void pre() {
log.info(TIPS);
// 删除历史遗留的 H2 数据库文件
try {
FileUtils.forceDeleteOnExit(new File(OmsFileUtils.genH2Path()));
}catch (Exception e) {
log.warn("[PowerJob] delete h2 workspace({}) failed, if server can't startup successfully, please delete it manually", OmsFileUtils.genH2Path(), e);
}
} }
} }

View File

@ -53,6 +53,14 @@ public class OmsFileUtils {
return genTemporaryPath() + uuid + "/"; return genTemporaryPath() + uuid + "/";
} }
/**
* 获取 H2 数据库工作目录
* @return H2 工作目录
*/
public static String genH2Path() {
return COMMON_PATH + "h2/";
}
/** /**
* 将文本写入文件 * 将文本写入文件
* @param content 文本内容 * @param content 文本内容

View File

@ -30,7 +30,7 @@ public class HashedWheelTimer implements Timer {
private final Indicator indicator; private final Indicator indicator;
private long startTime; private final long startTime;
private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue(); private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue(); private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();

View File

@ -1,9 +1,8 @@
package com.github.kfcfans.powerjob.server.persistence.config; package com.github.kfcfans.powerjob.server.persistence.config;
import com.zaxxer.hikari.HikariConfig; import com.alibaba.druid.pool.DruidDataSource;
import com.zaxxer.hikari.HikariDataSource; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
@ -19,26 +18,30 @@ import javax.sql.DataSource;
@Configuration @Configuration
public class MultiDatasourceConfig { public class MultiDatasourceConfig {
private static final String H2_DRIVER_CLASS_NAME = "org.h2.Driver";
private static final String H2_JDBC_URL = "jdbc:h2:file:~/powerjob-server/h2/powerjob_server_db"; private static final String H2_JDBC_URL = "jdbc:h2:file:~/powerjob-server/h2/powerjob_server_db";
private static final int H2_INITIAL_SIZE = 4;
private static final int H2_MIN_SIZE = 4;
private static final int H2_MAX_ACTIVE_SIZE = 10;
private static final String H2_DATASOURCE_NAME = "localDatasource";
@Primary @Primary
@Bean("omsCoreDatasource") @Bean("omsCoreDatasource")
@ConfigurationProperties(prefix = "spring.datasource.core") @ConfigurationProperties(prefix = "spring.datasource.druid")
public DataSource initOmsCoreDatasource() { public DataSource initOmsCoreDatasource() {
return DataSourceBuilder.create().build(); return DruidDataSourceBuilder.create().build();
} }
@Bean("omsLocalDatasource") @Bean("omsLocalDatasource")
public DataSource initOmsLocalDatasource() { public DataSource initOmsLocalDatasource() {
DruidDataSource ds = new DruidDataSource();
HikariConfig config = new HikariConfig(); ds.setDriverClassName(H2_DRIVER_CLASS_NAME);
config.setDriverClassName("org.h2.Driver"); ds.setUrl(H2_JDBC_URL);
config.setJdbcUrl(H2_JDBC_URL); ds.setInitialSize(H2_INITIAL_SIZE);
config.setAutoCommit(true); ds.setMinIdle(H2_MIN_SIZE);
// 池中最小空闲连接数量 ds.setMaxActive(H2_MAX_ACTIVE_SIZE);
config.setMinimumIdle(4); ds.setName(H2_DATASOURCE_NAME);
// 池中最大连接数量 ds.setTestWhileIdle(false);
config.setMaximumPoolSize(32); return ds;
return new HikariDataSource(config);
} }
} }

View File

@ -66,6 +66,14 @@ public class DispatchService {
Date now = new Date(); Date now = new Date();
String dbInstanceParams = instanceParams == null ? "" : instanceParams; String dbInstanceParams = instanceParams == null ? "" : instanceParams;
// 检查当前任务是否被取消
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
InstanceStatus currentStatus = InstanceStatus.of(instanceInfo.getStatus());
if (currentStatus != WAITING_DISPATCH) {
log.info("[Dispatcher-{}|{}] cancel dispatch job due to instance status({}) is not WAITING_DISPATCH", jobId, instanceId, currentStatus.name());
return;
}
// 查询当前运行的实例数 // 查询当前运行的实例数
long current = System.currentTimeMillis(); long current = System.currentTimeMillis();

View File

@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -22,7 +22,6 @@ import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** /**
* 任务服务 * 任务服务
@ -103,6 +102,8 @@ public class JobService {
*/ */
public long runJob(Long jobId, String instanceParams, long delay) { public long runJob(Long jobId, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush(); instanceInfoRepository.flush();
@ -110,10 +111,11 @@ public class JobService {
if (delay <= 0) { if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else { }else {
HashedWheelTimerHolder.TIMER.schedule(() -> { InstanceTimeWheelService.schedule(instanceId, delay, () -> {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}, delay, TimeUnit.MILLISECONDS); });
} }
log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
return instanceId; return instanceId;
} }
@ -171,7 +173,7 @@ public class JobService {
return; return;
} }
if (executeLogs.size() > 1) { if (executeLogs.size() > 1) {
log.warn("[JobService] frequent job should just have one running instance, there must have some bug."); log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
} }
executeLogs.forEach(instance -> { executeLogs.forEach(instance -> {
try { try {

View File

@ -107,7 +107,7 @@ public class InstanceManager {
log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes()); log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败 // 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> { HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> {
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}, 10, TimeUnit.SECONDS); }, 10, TimeUnit.SECONDS);

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult; import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
@ -12,6 +13,7 @@ import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO; import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType; import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
@ -86,12 +88,7 @@ public class InstanceService {
log.info("[Instance-{}] try to stop the instance.", instanceId); log.info("[Instance-{}] try to stop the instance.", instanceId);
try { try {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (instanceInfo == null) {
log.warn("[Instance-{}] can't find instanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
// 判断状态只有运行中才能停止 // 判断状态只有运行中才能停止
if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfo.getStatus())) { if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfo.getStatus())) {
throw new IllegalArgumentException("can't stop finished instance!"); throw new IllegalArgumentException("can't stop finished instance!");
@ -124,17 +121,53 @@ public class InstanceService {
} }
} }
/**
* 取消任务实例的运行
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例
*/
public void cancelInstance(Long instanceId) {
log.info("[Instance-{}] try to cancel the instance.", instanceId);
try {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
TimerFuture timerFuture = InstanceTimeWheelService.fetchTimerFuture(instanceId);
boolean success;
// 本机时间轮中存在该任务且顺利取消抢救成功
if (timerFuture != null) {
success = timerFuture.cancel();
} else {
// 调用该接口时间和预计调度时间相近时理论上会出现问题cancel 状态还没写进去另一边就完成了 dispatch随后状态会被覆盖
// 解决该问题的成本极高分布式锁因此选择不解决
// 该接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
success = InstanceStatus.WAITING_DISPATCH.getV() == instanceInfo.getStatus();
}
if (success) {
instanceInfo.setStatus(InstanceStatus.CANCELED.getV());
instanceInfo.setResult(SystemInstanceResult.CANCELED_BY_USER);
// 如果写 DB 失败抛异常接口返回 false即取消失败任务会被 HA 机制重新调度执行因此此处不需要任何处理
instanceInfoRepository.saveAndFlush(instanceInfo);
log.info("[Instance-{}] cancel the instance successfully.", instanceId);
}else {
log.warn("[Instance-{}] cancel the instance failed.", instanceId);
throw new OmsException("instance already up and running");
}
}catch (Exception e) {
log.error("[Instance-{}] cancelInstance failed.", instanceId, e);
throw e;
}
}
/** /**
* 获取任务实例的信息 * 获取任务实例的信息
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
* @return 任务实例的信息 * @return 任务实例的信息
*/ */
public InstanceInfoDTO getInstanceInfo(Long instanceId) { public InstanceInfoDTO getInstanceInfo(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO(); InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO();
BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO); BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO);
return instanceInfoDTO; return instanceInfoDTO;
@ -146,11 +179,7 @@ public class InstanceService {
* @return 任务实例的状态 * @return 任务实例的状态
*/ */
public InstanceStatus getInstanceStatus(Long instanceId) { public InstanceStatus getInstanceStatus(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
return InstanceStatus.of(instanceInfoDO.getStatus()); return InstanceStatus.of(instanceInfoDO.getStatus());
} }
@ -161,11 +190,7 @@ public class InstanceService {
*/ */
public InstanceDetail getInstanceDetail(Long instanceId) { public InstanceDetail getInstanceDetail(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId); InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceStatus instanceStatus = InstanceStatus.of(instanceInfoDO.getStatus()); InstanceStatus instanceStatus = InstanceStatus.of(instanceInfoDO.getStatus());
@ -202,4 +227,12 @@ public class InstanceService {
return detail; return detail;
} }
private InstanceInfoDO fetchInstanceInfo(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
return instanceInfoDO;
}
} }

View File

@ -0,0 +1,51 @@
package com.github.kfcfans.powerjob.server.service.instance;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 定时调度任务实例
*
* @author tjq
* @since 2020/7/25
*/
public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// 精确时间轮 1S 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
/**
* 定时调度
* @param uniqueId 唯一 ID必须是 snowflake 算法生成的 ID
* @param delayMS 延迟毫秒数
* @param timerTask 需要执行的目标方法
*/
public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
}
}
/**
* 获取 TimerFuture
* @param uniqueId 唯一 ID
* @return TimerFuture
*/
public static TimerFuture fetchTimerFuture(Long uniqueId) {
return CARGO.get(uniqueId);
}
}

View File

@ -192,6 +192,6 @@ public class InstanceStatusCheckService {
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance); instanceInfoRepository.saveAndFlush(instance);
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT);
} }
} }

View File

@ -10,9 +10,6 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime
*/ */
public class HashedWheelTimerHolder { public class HashedWheelTimerHolder {
// 精确时间轮 1S 走一格
public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确时间轮 5S 走一格 // 非精确时间轮 5S 走一格
public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0); public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);

View File

@ -16,12 +16,12 @@ import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.JobService; import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
@ -34,7 +34,6 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -160,9 +159,9 @@ public class OmsScheduleService {
delay = targetTriggerTime - nowTime; delay = targetTriggerTime - nowTime;
} }
HashedWheelTimerHolder.TIMER.schedule(() -> { InstanceTimeWheelService.schedule(instanceId, delay, () -> {
dispatchService.dispatch(jobInfoDO, instanceId, 0, null, null); dispatchService.dispatch(jobInfoDO, instanceId, 0, null, null);
}, delay, TimeUnit.MILLISECONDS); });
}); });
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms // 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
@ -216,7 +215,7 @@ public class OmsScheduleService {
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis()); log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
delay = 0; delay = 0;
} }
HashedWheelTimerHolder.TIMER.schedule(() -> workflowInstanceManager.start(wfInfo, wfInstanceId), delay, TimeUnit.MILLISECONDS); InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
// 3. 重新计算下一次调度时间并更新 // 3. 重新计算下一次调度时间并更新
try { try {

View File

@ -94,6 +94,13 @@ public class OpenAPIController {
return ResultDTO.success(null); return ResultDTO.success(null);
} }
@PostMapping(OpenAPIConstant.CANCEL_INSTANCE)
public ResultDTO<Void> cancelInstance(Long instanceId, Long appId) {
checkInstanceIdValid(instanceId, appId);
instanceService.cancelInstance(instanceId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS) @PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS)
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) { public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) {
InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId); InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId);

View File

@ -1,5 +1,8 @@
package com.github.kfcfans.powerjob.server.web.controller; package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
@ -11,6 +14,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Optional; import java.util.Optional;
import java.util.TimeZone;
/** /**
* 处理Worker请求的 Controller * 处理Worker请求的 Controller
@ -47,8 +51,13 @@ public class ServerController {
} }
@GetMapping("/hello") @GetMapping("/hello")
public ResultDTO<String> ping() { public ResultDTO<JSONObject> ping() {
return ResultDTO.success("this is powerjob-server~"); JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("actorSystemAddress", OhMyServer.getActorSystemAddress());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
return ResultDTO.success(res);
} }
} }

View File

@ -22,6 +22,8 @@ import java.util.List;
@NoArgsConstructor @NoArgsConstructor
public class InstanceDetailVO { public class InstanceDetailVO {
// 任务预计执行时间
private String expectedTriggerTime;
// 任务整体开始时间 // 任务整体开始时间
private String actualTriggerTime; private String actualTriggerTime;
// 任务整体结束时间可能不存在 // 任务整体结束时间可能不存在
@ -68,6 +70,7 @@ public class InstanceDetailVO {
// 格式化时间 // 格式化时间
vo.setFinishedTime(CommonUtils.formatTime(origin.getFinishedTime())); vo.setFinishedTime(CommonUtils.formatTime(origin.getFinishedTime()));
vo.setActualTriggerTime(CommonUtils.formatTime(origin.getActualTriggerTime())); vo.setActualTriggerTime(CommonUtils.formatTime(origin.getActualTriggerTime()));
vo.setExpectedTriggerTime(CommonUtils.formatTime(origin.getExpectedTriggerTime()));
// 拷贝 TaskDetail // 拷贝 TaskDetail
if (origin.getTaskDetail() != null) { if (origin.getTaskDetail() != null) {

View File

@ -1,13 +1,15 @@
oms.env=DAILY oms.env=DAILY
logging.config=classpath:logback-dev.xml logging.config=classpath:logback-dev.xml
####### 数据库配置 ####### ####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 spring.datasource.druid.url=jdbc:mysql://remotehost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root spring.datasource.druid.username=root
spring.datasource.core.password=No1Bug2Please3! spring.datasource.druid.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.druid.initial-size=5
spring.datasource.core.hikari.minimum-idle=5 spring.datasource.druid.max-active=10
spring.datasource.druid.test-while-idle=false
spring.datasource.druid.name=remoteDatasource
####### mongoDB配置非核心依赖可移除 ####### ####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily

View File

@ -2,12 +2,14 @@ oms.env=PRE
logging.config=classpath:logback-product.xml logging.config=classpath:logback-product.xml
####### 数据库配置 ####### ####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8 spring.datasource.druid.url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root spring.datasource.druid.username=root
spring.datasource.core.password=No1Bug2Please3! spring.datasource.druid.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.druid.initial-size=5
spring.datasource.core.hikari.minimum-idle=5 spring.datasource.druid.max-active=20
spring.datasource.druid.test-while-idle=false
spring.datasource.druid.name=remoteDatasource
####### mongoDB配置非核心依赖可移除 ####### ####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre

View File

@ -2,12 +2,14 @@ oms.env=PRODUCT
logging.config=classpath:logback-product.xml logging.config=classpath:logback-product.xml
####### 数据库配置 ####### ####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 spring.datasource.druid.url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root spring.datasource.druid.username=root
spring.datasource.core.password=No1Bug2Please3! spring.datasource.druid.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.druid.initial-size=5
spring.datasource.core.hikari.minimum-idle=5 spring.datasource.druid.max-active=20
spring.datasource.druid.test-while-idle=false
spring.datasource.druid.name=remoteDatasource
####### mongoDB配置非核心依赖可移除 ####### ####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product

View File

@ -11,6 +11,16 @@ spring.servlet.multipart.file-size-threshold=0
spring.servlet.multipart.max-file-size=209715200 spring.servlet.multipart.max-file-size=209715200
spring.servlet.multipart.max-request-size=209715200 spring.servlet.multipart.max-request-size=209715200
# druid 监控配置(监控地址 /druid登陆账号和密码默认都是 powerjob
spring.datasource.druid.filters=stat
spring.datasource.druid.stat-view-servlet.url-pattern=/druid/*
spring.datasource.druid.filter.stat.enabled=true
spring.datasource.druid.filter.stat.log-slow-sql=true
spring.datasource.druid.filter.stat.slow-sql-millis=5000
spring.datasource.druid.stat-view-servlet.enabled=true
spring.datasource.druid.stat-view-servlet.login-username=powerjob
spring.datasource.druid.stat-view-servlet.login-password=powerjob
###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ###### ###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ######
# akka ActorSystem 服务端口 # akka ActorSystem 服务端口
oms.akka.port=10086 oms.akka.port=10086

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId> <artifactId>powerjob-worker-agent</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.worker.version>3.2.0</powerjob.worker.version> <powerjob.worker.version>3.2.1</powerjob.worker.version>
<logback.version>1.2.3</logback.version> <logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version> <picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId> <artifactId>powerjob-worker-samples</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<properties> <properties>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.2.0</powerjob.worker.version> <powerjob.worker.version>3.2.1</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version> <fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module --> <!-- 部署时跳过该module -->

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId> <artifactId>powerjob-worker</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<spring.version>5.2.4.RELEASE</spring.version> <spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version> <hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>

View File

@ -124,7 +124,7 @@ public class OmsContainerFactory {
oldContainer.destroy(); oldContainer.destroy();
} }
}catch (Exception e) { } catch (Exception e) {
log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e); log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
// 如果部署失败则删除该 jar本次失败可能是下载jar出错导致不删除会导致这个版本永久无法重新部署 // 如果部署失败则删除该 jar本次失败可能是下载jar出错导致不删除会导致这个版本永久无法重新部署
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile)); CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));

View File

@ -96,7 +96,9 @@ public abstract class ScriptProcessor implements BasicProcessor {
} }
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
return new ProcessResult(true, result); // 0 代表正常退出
int exitValue = process.exitValue();
return new ProcessResult(exitValue == 0, result);
}catch (InterruptedException ie) { }catch (InterruptedException ie) {
omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted"); omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted");
return new ProcessResult(false, "Interrupted"); return new ProcessResult(false, "Interrupted");

View File

@ -68,6 +68,8 @@ public class TaskDO {
public String toString() { public String toString() {
return "{" + return "{" +
"taskId='" + taskId + '\'' + "taskId='" + taskId + '\'' +
", instanceId=" + instanceId +
", subInstanceId=" + subInstanceId +
", taskName='" + taskName + '\'' + ", taskName='" + taskName + '\'' +
", address='" + address + '\'' + ", address='" + address + '\'' +
", status=" + status + ", status=" + status +
@ -75,6 +77,7 @@ public class TaskDO {
", failedCnt=" + failedCnt + ", failedCnt=" + failedCnt +
", createdTime=" + createdTime + ", createdTime=" + createdTime +
", lastModifiedTime=" + lastModifiedTime + ", lastModifiedTime=" + lastModifiedTime +
", lastReportTime=" + lastReportTime +
'}'; '}';
} }
} }

View File

@ -1,9 +1,14 @@
package com.github.kfcfans.powerjob; package com.github.kfcfans.powerjob;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor; import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor; import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
/** /**
* 测试脚本处理器 * 测试脚本处理器
* *
@ -14,25 +19,37 @@ public class ScriptProcessorTest {
private static final long timeout = 10000; private static final long timeout = 10000;
private static final TaskContext context = new TaskContext();
@BeforeAll
public static void initContext() {
context.setOmsLogger(new OmsServerLogger(1L));
}
@Test @Test
public void testLocalShellProcessor() throws Exception { public void testLocalShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout); ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout);
sp.process(null); System.out.println(sp.process(context));
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout); ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout);
sp2.process(null); System.out.println(sp2.process(context));
} }
@Test @Test
public void testLocalPythonProcessor() throws Exception { public void testLocalPythonProcessor() throws Exception {
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout); PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout);
pp.process(null); System.out.println(pp.process(context));
} }
@Test @Test
public void testNetShellProcessor() throws Exception { public void testNetShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout); ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout);
sp.process(null); System.out.println(sp.process(context));
} }
@Test
public void testFailedScript() throws Exception {
ShellProcessor sp3 = new ShellProcessor(ThreadLocalRandom.current().nextLong(), "mvn tjq", timeout);
System.out.println(sp3.process(context));
}
} }