[release] merge branch 'v3.2.1'

This commit is contained in:
朱八 2020-07-26 23:11:32 +08:00
commit 7e9af994c8
53 changed files with 584 additions and 98 deletions

View File

@ -40,8 +40,9 @@
<module>powerjob-server</module>
<module>powerjob-common</module>
<module>powerjob-client</module>
<module>powerjob-worker-samples</module>
<module>powerjob-worker-agent</module>
<module>powerjob-worker-spring-boot-starter</module>
<module>powerjob-worker-samples</module>
</modules>
<properties>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.2.0</powerjob.common.version>
<powerjob.common.version>3.2.1</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
@ -31,13 +32,13 @@ public class OhMyClient {
private Long appId;
private String currentAddress;
private List<String> allAddress;
private final List<String> allAddress;
private static final String URL_PATTERN = "http://%s%s%s";
/**
* 初始化 OhMyClient 客户端
* @param domain www.oms-server.com内网域名自行完成DNS & Proxy
* @param domain 比如 www.powerjob-server.com内网域名自行完成 DNS & Proxy
* @param appName 负责的应用名称
*/
public OhMyClient(String domain, String appName, String password) {
@ -52,8 +53,8 @@ public class OhMyClient {
*/
public OhMyClient(List<String> addressList, String appName, String password) {
Objects.requireNonNull(addressList, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null");
CommonUtils.requireNonNull(addressList, "domain can't be null!");
CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList;
for (String addr : addressList) {
@ -77,7 +78,7 @@ public class OhMyClient {
if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available");
}
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
}
private static String assertApp(String appName, String password, String url) throws IOException {
@ -211,6 +212,22 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
* 取消任务实例
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
*/
public ResultDTO<Void> cancelInstance(Long instanceId) throws Exception {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
* 查询任务实例状态
* @param instanceId 应用实例ID
@ -364,28 +381,35 @@ public class OhMyClient {
private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址
String url = getUrl(path, currentAddress);
try {
String res = HttpUtils.post(getUrl(path, currentAddress), requestBody);
String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
return res;
}
}catch (Exception ignore) {
}catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
// 失败开始重试
for (String addr : allAddress) {
if (Objects.equals(addr, currentAddress)) {
continue;
}
url = getUrl(path, addr);
try {
String res = HttpUtils.post(getUrl(path, addr), requestBody);
String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
}
}catch (Exception ignore) {
}catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
}
log.error("[OhMyClient] no server available in {}.", allAddress);
log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available");
}
}

View File

@ -9,6 +9,8 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
/**
* 测试 Client
*
@ -87,4 +89,24 @@ public class TestClient {
public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
}
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
@Test
public void testCancelInstanceInDatabase() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
// 手动重启 server干掉时间轮中的调度数据
TimeUnit.MINUTES.sleep(1);
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
}

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>

View File

@ -21,6 +21,7 @@ public enum InstanceStatus {
RUNNING(3, "运行中"),
FAILED(4, "失败"),
SUCCEED(5, "成功"),
CANCELED(9, "取消"),
STOPPED(10, "手动停止");
private int v;
@ -29,7 +30,7 @@ public enum InstanceStatus {
// 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
// 结束状态
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v);
public static final List<Integer> finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);
public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) {

View File

@ -22,6 +22,7 @@ public class OpenAPIConstant {
/* ************* Instance 区 ************* */
public static final String STOP_INSTANCE = "/stopInstance";
public static final String CANCEL_INSTANCE = "/cancelInstance";
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo";

View File

@ -30,6 +30,7 @@ public class SystemInstanceResult {
// 被用户手动停止
public static final String STOPPED_BY_USER = "stopped by user";
public static final String CANCELED_BY_USER = "canceled by user";
}

View File

@ -16,6 +16,8 @@ import java.util.List;
@NoArgsConstructor
public class InstanceDetail implements OmsSerializable {
// 任务预计执行时间
private Long expectedTriggerTime;
// 任务整体开始时间
private Long actualTriggerTime;
// 任务整体结束时间可能不存在

View File

@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit;
*/
public class HttpUtils {
private static OkHttpClient client;
private static final OkHttpClient client;
private static final int HTTP_SUCCESS_CODE = 200;
static {

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.0</powerjob.common.version>
<powerjob.common.version>3.2.1</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -1,27 +1,59 @@
package com.github.kfcfans.powerjob.server;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.io.File;
/**
* SpringBoot 启动入口
*
* @author tjq
* @since 2020/3/29
*/
@Slf4j
@EnableScheduling
@SpringBootApplication
public class OhMyApplication {
private static final String TIPS = "\n\n" +
"******************* PowerJob Tips *******************\n" +
"如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" +
"if server can't startup, we recommend that you read the documentation to find a solution:\n" +
"https://www.yuque.com/powerjob/guidence/xp5ygc#xMQC9\n" +
"******************* PowerJob Tips *******************\n\n";
public static void main(String[] args) {
// 完成前置工作
pre();
// 先启动 ActorSystem
OhMyServer.init();
// 再启动SpringBoot
SpringApplication.run(OhMyApplication.class, args);
try {
SpringApplication.run(OhMyApplication.class, args);
}catch (Throwable t) {
log.error(TIPS);
throw t;
}
}
private static void pre() {
log.info(TIPS);
// 删除历史遗留的 H2 数据库文件
try {
FileUtils.forceDeleteOnExit(new File(OmsFileUtils.genH2Path()));
}catch (Exception e) {
log.warn("[PowerJob] delete h2 workspace({}) failed, if server can't startup successfully, please delete it manually", OmsFileUtils.genH2Path(), e);
}
}
}

View File

@ -53,6 +53,14 @@ public class OmsFileUtils {
return genTemporaryPath() + uuid + "/";
}
/**
* 获取 H2 数据库工作目录
* @return H2 工作目录
*/
public static String genH2Path() {
return COMMON_PATH + "h2/";
}
/**
* 将文本写入文件
* @param content 文本内容

View File

@ -30,7 +30,7 @@ public class HashedWheelTimer implements Timer {
private final Indicator indicator;
private long startTime;
private final long startTime;
private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();
@ -184,7 +184,6 @@ public class HashedWheelTimer implements Timer {
log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
}
timerFuture.status = HashedWheelTimerFuture.RUNNING;
try {
// 提交执行
runTask(timerFuture);
@ -202,6 +201,7 @@ public class HashedWheelTimer implements Timer {
}
private void runTask(HashedWheelTimerFuture timerFuture) {
timerFuture.status = HashedWheelTimerFuture.RUNNING;
if (taskProcessPool == null) {
timerFuture.timerTask.run();
}else {

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.persistence.config;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -19,7 +20,10 @@ import javax.sql.DataSource;
@Configuration
public class MultiDatasourceConfig {
private static final String H2_JDBC_URL = "jdbc:h2:file:~/powerjob-server/h2/powerjob_server_db";
private static final String H2_DRIVER_CLASS_NAME = "org.h2.Driver";
private static final String H2_JDBC_URL_PATTERN = "jdbc:h2:file:%spowerjob_server_db";
private static final int H2_MIN_SIZE = 4;
private static final int H2_MAX_ACTIVE_SIZE = 10;
@Primary
@Bean("omsCoreDatasource")
@ -30,15 +34,14 @@ public class MultiDatasourceConfig {
@Bean("omsLocalDatasource")
public DataSource initOmsLocalDatasource() {
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");
config.setJdbcUrl(H2_JDBC_URL);
config.setDriverClassName(H2_DRIVER_CLASS_NAME);
config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, OmsFileUtils.genH2Path()));
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(4);
config.setMinimumIdle(H2_MIN_SIZE);
// 池中最大连接数量
config.setMaximumPoolSize(32);
config.setMaximumPoolSize(H2_MAX_ACTIVE_SIZE);
return new HikariDataSource(config);
}
}

View File

@ -66,17 +66,27 @@ public class DispatchService {
Date now = new Date();
String dbInstanceParams = instanceParams == null ? "" : instanceParams;
// 检查当前任务是否被取消
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
InstanceStatus currentStatus = InstanceStatus.of(instanceInfo.getStatus());
if (currentStatus != WAITING_DISPATCH) {
log.info("[Dispatcher-{}|{}] cancel dispatch job due to instance status({}) is not WAITING_DISPATCH", jobId, instanceId, currentStatus.name());
return;
}
// 查询当前运行的实例数
long current = System.currentTimeMillis();
// 0 代表不限制在线任务还能省去一次 DB 查询
if (jobInfo.getMaxInstanceNum() > 0) {
Integer maxInstanceNum = jobInfo.getMaxInstanceNum();
if (maxInstanceNum > 0) {
// 这个 runningInstanceCount 已经包含了本 instance
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
// 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
if (runningInstanceCount > maxInstanceNum) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, maxInstanceNum);
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance is running ({} > {}).", jobId, instanceId, runningInstanceCount, maxInstanceNum);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);

View File

@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -22,7 +22,6 @@ import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 任务服务
@ -103,6 +102,8 @@ public class JobService {
*/
public long runJob(Long jobId, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
@ -110,10 +111,11 @@ public class JobService {
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else {
HashedWheelTimerHolder.TIMER.schedule(() -> {
InstanceTimeWheelService.schedule(instanceId, delay, () -> {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}, delay, TimeUnit.MILLISECONDS);
});
}
log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
return instanceId;
}
@ -171,7 +173,7 @@ public class JobService {
return;
}
if (executeLogs.size() > 1) {
log.warn("[JobService] frequent job should just have one running instance, there must have some bug.");
log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
}
executeLogs.forEach(instance -> {
try {

View File

@ -107,7 +107,7 @@ public class InstanceManager {
log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> {
HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> {
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}, 10, TimeUnit.SECONDS);

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.service.instance;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
@ -12,6 +13,7 @@ import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
@ -86,12 +88,7 @@ public class InstanceService {
log.info("[Instance-{}] try to stop the instance.", instanceId);
try {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
log.warn("[Instance-{}] can't find instanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
// 判断状态只有运行中才能停止
if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfo.getStatus())) {
throw new IllegalArgumentException("can't stop finished instance!");
@ -124,17 +121,53 @@ public class InstanceService {
}
}
/**
* 取消任务实例的运行
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例
*/
public void cancelInstance(Long instanceId) {
log.info("[Instance-{}] try to cancel the instance.", instanceId);
try {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
TimerFuture timerFuture = InstanceTimeWheelService.fetchTimerFuture(instanceId);
boolean success;
// 本机时间轮中存在该任务且顺利取消抢救成功
if (timerFuture != null) {
success = timerFuture.cancel();
} else {
// 调用该接口时间和预计调度时间相近时理论上会出现问题cancel 状态还没写进去另一边就完成了 dispatch随后状态会被覆盖
// 解决该问题的成本极高分布式锁因此选择不解决
// 该接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
success = InstanceStatus.WAITING_DISPATCH.getV() == instanceInfo.getStatus();
}
if (success) {
instanceInfo.setStatus(InstanceStatus.CANCELED.getV());
instanceInfo.setResult(SystemInstanceResult.CANCELED_BY_USER);
// 如果写 DB 失败抛异常接口返回 false即取消失败任务会被 HA 机制重新调度执行因此此处不需要任何处理
instanceInfoRepository.saveAndFlush(instanceInfo);
log.info("[Instance-{}] cancel the instance successfully.", instanceId);
}else {
log.warn("[Instance-{}] cancel the instance failed.", instanceId);
throw new OmsException("instance already up and running");
}
}catch (Exception e) {
log.error("[Instance-{}] cancelInstance failed.", instanceId, e);
throw e;
}
}
/**
* 获取任务实例的信息
* @param instanceId 任务实例ID
* @return 任务实例的信息
*/
public InstanceInfoDTO getInstanceInfo(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
InstanceInfoDTO instanceInfoDTO = new InstanceInfoDTO();
BeanUtils.copyProperties(instanceInfoDO, instanceInfoDTO);
return instanceInfoDTO;
@ -146,11 +179,7 @@ public class InstanceService {
* @return 任务实例的状态
*/
public InstanceStatus getInstanceStatus(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId.", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
return InstanceStatus.of(instanceInfoDO.getStatus());
}
@ -161,11 +190,7 @@ public class InstanceService {
*/
public InstanceDetail getInstanceDetail(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId);
InstanceStatus instanceStatus = InstanceStatus.of(instanceInfoDO.getStatus());
@ -202,4 +227,12 @@ public class InstanceService {
return detail;
}
private InstanceInfoDO fetchInstanceInfo(Long instanceId) {
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfoDO == null) {
log.warn("[Instance-{}] can't find InstanceInfo by instanceId", instanceId);
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
}
return instanceInfoDO;
}
}

View File

@ -0,0 +1,51 @@
package com.github.kfcfans.powerjob.server.service.instance;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 定时调度任务实例
*
* @author tjq
* @since 2020/7/25
*/
public class InstanceTimeWheelService {
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
// 精确时间轮 1S 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
/**
* 定时调度
* @param uniqueId 唯一 ID必须是 snowflake 算法生成的 ID
* @param delayMS 延迟毫秒数
* @param timerTask 需要执行的目标方法
*/
public static void schedule(Long uniqueId, Long delayMS, TimerTask timerTask) {
TimerFuture timerFuture = TIMER.schedule(() -> {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture);
}
}
/**
* 获取 TimerFuture
* @param uniqueId 唯一 ID
* @return TimerFuture
*/
public static TimerFuture fetchTimerFuture(Long uniqueId) {
return CARGO.get(uniqueId);
}
}

View File

@ -192,6 +192,6 @@ public class InstanceStatusCheckService {
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance);
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!");
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, SystemInstanceResult.REPORT_TIMEOUT);
}
}

View File

@ -10,9 +10,6 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime
*/
public class HashedWheelTimerHolder {
// 精确时间轮 1S 走一格
public static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 非精确时间轮 5S 走一格
public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0);

View File

@ -16,12 +16,12 @@ import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async;
@ -34,7 +34,6 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -160,9 +159,9 @@ public class OmsScheduleService {
delay = targetTriggerTime - nowTime;
}
HashedWheelTimerHolder.TIMER.schedule(() -> {
InstanceTimeWheelService.schedule(instanceId, delay, () -> {
dispatchService.dispatch(jobInfoDO, instanceId, 0, null, null);
}, delay, TimeUnit.MILLISECONDS);
});
});
// 3. 计算下一次调度时间忽略5S内的重复执行即CRON模式下最小的连续执行间隔为 SCHEDULE_RATE ms
@ -216,7 +215,7 @@ public class OmsScheduleService {
log.warn("[Workflow-{}] workflow schedule delay, expect:{}, actual: {}", wfInfo.getId(), wfInfo.getNextTriggerTime(), System.currentTimeMillis());
delay = 0;
}
HashedWheelTimerHolder.TIMER.schedule(() -> workflowInstanceManager.start(wfInfo, wfInstanceId), delay, TimeUnit.MILLISECONDS);
InstanceTimeWheelService.schedule(wfInstanceId, delay, () -> workflowInstanceManager.start(wfInfo, wfInstanceId));
// 3. 重新计算下一次调度时间并更新
try {

View File

@ -204,7 +204,7 @@ public class WorkflowInstanceManager {
node.setStatus(status.getV());
node.setResult(result);
log.debug("[Workflow-{}|{}] node(jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getJobId(), instanceId, status.name(), result);
log.info("[Workflow-{}|{}] node(jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getJobId(), instanceId, status.name(), result);
}
if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {

View File

@ -41,6 +41,7 @@ public class AppInfoController {
@PostMapping("/save")
public ResultDTO<Void> saveAppInfo(@RequestBody ModifyAppInfoRequest req) {
req.valid();
AppInfoDO appInfoDO;
Long id = req.getId();

View File

@ -94,6 +94,13 @@ public class OpenAPIController {
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.CANCEL_INSTANCE)
public ResultDTO<Void> cancelInstance(Long instanceId, Long appId) {
checkInstanceIdValid(instanceId, appId);
instanceService.cancelInstance(instanceId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS)
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) {
InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId);

View File

@ -1,5 +1,8 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
@ -11,6 +14,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Optional;
import java.util.TimeZone;
/**
* 处理Worker请求的 Controller
@ -47,8 +51,13 @@ public class ServerController {
}
@GetMapping("/hello")
public ResultDTO<String> ping() {
return ResultDTO.success("this is powerjob-server~");
public ResultDTO<JSONObject> ping() {
JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("actorSystemAddress", OhMyServer.getActorSystemAddress());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
return ResultDTO.success(res);
}
}

View File

@ -1,6 +1,9 @@
package com.github.kfcfans.powerjob.server.web.request;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
/**
* 修改应用信息请求
@ -14,4 +17,11 @@ public class ModifyAppInfoRequest {
private Long id;
private String appName;
private String password;
public void valid() {
CommonUtils.requireNonNull(appName, "appName can't be empty");
if (StringUtils.containsWhitespace(appName)) {
throw new OmsException("appName can't contains white space!");
}
}
}

View File

@ -22,6 +22,8 @@ import java.util.List;
@NoArgsConstructor
public class InstanceDetailVO {
// 任务预计执行时间
private String expectedTriggerTime;
// 任务整体开始时间
private String actualTriggerTime;
// 任务整体结束时间可能不存在
@ -68,6 +70,7 @@ public class InstanceDetailVO {
// 格式化时间
vo.setFinishedTime(CommonUtils.formatTime(origin.getFinishedTime()));
vo.setActualTriggerTime(CommonUtils.formatTime(origin.getActualTriggerTime()));
vo.setExpectedTriggerTime(CommonUtils.formatTime(origin.getExpectedTriggerTime()));
// 拷贝 TaskDetail
if (origin.getTaskDetail() != null) {

View File

@ -1,7 +1,7 @@
oms.env=DAILY
logging.config=classpath:logback-dev.xml
####### 数据库配置 #######
####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
@ -10,7 +10,7 @@ spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 邮件配置(启用邮件报警则需要) #######
spring.mail.host=smtp.163.com

View File

@ -2,8 +2,10 @@
server.port=7700
spring.profiles.active=daily
spring.main.banner-mode=log
spring.jpa.open-in-view=false
spring.data.mongodb.repositories.type=none
logging.level.org.mongodb=warn
# 文件上传配置
spring.servlet.multipart.enabled=true

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTime
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerTask;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import java.util.Date;
@ -82,4 +83,12 @@ public class UtilsTest {
public void testTZ() {
System.out.println(TimeZone.getDefault());
}
@Test
public void testStringUtils() {
String goodAppName = "powerjob-server";
String appName = "powerjob-server ";
System.out.println(StringUtils.containsWhitespace(goodAppName));
System.out.println(StringUtils.containsWhitespace(appName));
}
}

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.0</powerjob.worker.version>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.2.0</powerjob.worker.version>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>
<dependencies>
<!-- oms-worker -->
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-worker</artifactId>
<version>${powerjob.worker.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${springboot.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,50 @@
package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.List;
/**
* PowerJob 自动装配
*
* @author songyinyin
* @since 2020/7/26 16:37
*/
@Configuration
@EnableConfigurationProperties(PowerJobProperties.class)
public class PowerJobAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public OhMyWorker initPowerJob(PowerJobProperties properties) {
// 服务器HTTP地址端口号为 server.port而不是 ActorSystem port请勿添加任何前缀http://
CommonUtils.requireNonNull(properties.getServerAddress(), "serverAddress can't be empty!");
List<String> serverAddress = Arrays.asList(properties.getServerAddress().split(","));
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
// 可以不显式设置默认值 27777
config.setPort(properties.getAkkaPort());
// appName需要提前在控制台注册否则启动报错
config.setAppName(properties.getAppName());
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 有大型 Map/MapReduce 需求可能产生大量子任务Task的场景请使用 DISK否则妥妥的 OutOfMemory
config.setStoreStrategy(properties.getStoreStrategy());
// 启动测试模式true情况下不再尝试连接 server 并验证appName
config.setEnableTestMode(properties.isEnableTestMode());
// 2. 创建 Worker 对象设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}
}

View File

@ -0,0 +1,44 @@
package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* PowerJob 配置项
*
* @author songyinyin
* @since 2020/7/26 16:37
*/
@Data
@ConfigurationProperties(prefix = "powerjob")
public class PowerJobProperties {
/**
* 应用名称需要提前在控制台注册否则启动报错
*/
private String appName;
/**
* 启动 akka 端口
*/
private int akkaPort = RemoteConstant.DEFAULT_WORKER_PORT;
/**
* 调度服务器地址ip:port 域名多个用英文逗号分隔
*/
private String serverAddress;
/**
* 本地持久化方式默认使用磁盘
*/
private StoreStrategy storeStrategy = StoreStrategy.DISK;
/**
* 最大返回值长度超过会被截断
* {@link ProcessResult}#msg 的最大长度
*/
private int maxResultLength = 8096;
/**
* 启动测试模式true情况下不再尝试连接 server 并验证appName
* true -> 用于本地写单元测试调试 false -> 默认值标准模式
*/
private boolean enableTestMode = false;
}

View File

@ -0,0 +1,50 @@
{
"groups": [
{
"name": "powerjob",
"type": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
}
],
"properties": [
{
"name": "powerjob.app-name",
"type": "java.lang.String",
"description": "应用名称,需要提前在控制台注册,否则启动报错",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.max-result-length",
"type": "java.lang.Integer",
"description": "最大返回值长度,超过会被截断 {@link ProcessResult}#msg 的最大长度",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"defaultValue": 8096
},
{
"name": "powerjob.akka-port",
"type": "java.lang.Integer",
"description": "启动 akka 端口",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.server-address",
"type": "java.lang.String",
"description": "调度服务器地址ip:port 或 域名,多值用英文逗号分隔",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties"
},
{
"name": "powerjob.enable-test-mode",
"type": "java.lang.Boolean",
"description": "启动测试模式true情况下不再尝试连接 server 并验证appName。true -> 用于本地写单元测试调试; false -> 默认值,标准模式",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"defaultValue": false
}
],
"hints": []
}

View File

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobAutoConfiguration

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration
class PowerJobAutoConfigurationTest {
@Test
void testAutoConfiguration() {
ConfigurableApplicationContext run = SpringApplication.run(PowerJobAutoConfigurationTest.class);
OhMyWorker worker = run.getBean(OhMyWorker.class);
Assert.assertNotNull(worker);
}
}

View File

@ -0,0 +1,2 @@
powerjob.enable-test-mode=true

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.2.0</version>
<version>3.2.1</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.0</powerjob.common.version>
<powerjob.common.version>3.2.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -21,6 +21,7 @@ import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.OmsBannerPrinter;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;
@ -30,6 +31,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@ -37,6 +39,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import java.io.File;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
@ -80,6 +83,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker...");
try {
pre();
OmsBannerPrinter.print();
// 校验 appName
if (!config.isEnableTestMode()) {
@ -179,4 +183,14 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
public void destroy() throws Exception {
timingPool.shutdownNow();
}
private static void pre() {
// 删除历史遗留的 H2 数据库文件
String h2Path = OmsWorkerFileUtils.getH2Dir();
try {
FileUtils.forceDeleteOnExit(new File(h2Path));
}catch (Exception e) {
log.warn("[PowerJob] delete h2 workspace({}) failed, if worker can't startup successfully, please delete it manually", h2Path, e);
}
}
}

View File

@ -18,4 +18,8 @@ public class OmsWorkerFileUtils {
public static String getContainerDir() {
return WORKER_DIR + "container/";
}
public static String getH2Dir() {
return WORKER_DIR + "h2/";
}
}

View File

@ -124,7 +124,7 @@ public class OmsContainerFactory {
oldContainer.destroy();
}
}catch (Exception e) {
} catch (Exception e) {
log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
// 如果部署失败则删除该 jar本次失败可能是下载jar出错导致不删除会导致这个版本永久无法重新部署
CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));

View File

@ -96,7 +96,9 @@ public abstract class ScriptProcessor implements BasicProcessor {
}
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
return new ProcessResult(true, result);
// 0 代表正常退出
int exitValue = process.exitValue();
return new ProcessResult(exitValue == 0, result);
}catch (InterruptedException ie) {
omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted");
return new ProcessResult(false, "Interrupted");

View File

@ -171,11 +171,13 @@ public class FrequentTaskTracker extends TaskTracker {
newRootTask.setLastReportTime(-1L);
// 判断是否超出最大执行实例数
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
return;
if (maxInstanceNum > 0) {
if (timeExpressionType == TimeExpressionType.FIX_RATE) {
if (subInstanceId2TimeHolder.size() > maxInstanceNum) {
log.warn("[TaskTracker-{}] cancel to launch the subInstance({}) due to too much subInstance is running.", instanceId, subInstanceId);
processFinishedSubInstance(subInstanceId, false, "TOO_MUCH_INSTANCE");
return;
}
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.persistence;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@ -19,8 +20,8 @@ public class ConnectionFactory {
private static volatile DataSource dataSource;
private static final String DISK_JDBC_URL = "jdbc:h2:file:~/powerjob/h2/oms_worker_db";
private static final String MEMORY_JDBC_URL = "jdbc:h2:mem:~/powerjob/h2/oms_worker_db";
private static final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db", OmsWorkerFileUtils.getH2Dir());
private static final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db", OmsWorkerFileUtils.getH2Dir());
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();

View File

@ -68,6 +68,8 @@ public class TaskDO {
public String toString() {
return "{" +
"taskId='" + taskId + '\'' +
", instanceId=" + instanceId +
", subInstanceId=" + subInstanceId +
", taskName='" + taskName + '\'' +
", address='" + address + '\'' +
", status=" + status +
@ -75,6 +77,7 @@ public class TaskDO {
", failedCnt=" + failedCnt +
", createdTime=" + createdTime +
", lastModifiedTime=" + lastModifiedTime +
", lastReportTime=" + lastReportTime +
'}';
}
}

View File

@ -1,9 +1,14 @@
package com.github.kfcfans.powerjob;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
/**
* 测试脚本处理器
*
@ -14,25 +19,37 @@ public class ScriptProcessorTest {
private static final long timeout = 10000;
private static final TaskContext context = new TaskContext();
@BeforeAll
public static void initContext() {
context.setOmsLogger(new OmsServerLogger(1L));
}
@Test
public void testLocalShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(1L, "ls -a", timeout);
sp.process(null);
System.out.println(sp.process(context));
ShellProcessor sp2 = new ShellProcessor(2777L, "pwd", timeout);
sp2.process(null);
System.out.println(sp2.process(context));
}
@Test
public void testLocalPythonProcessor() throws Exception {
PythonProcessor pp = new PythonProcessor(2L, "print 'Hello World!'", timeout);
pp.process(null);
System.out.println(pp.process(context));
}
@Test
public void testNetShellProcessor() throws Exception {
ShellProcessor sp = new ShellProcessor(18L, "http://localhost:8080/test/test.sh", timeout);
sp.process(null);
System.out.println(sp.process(context));
}
@Test
public void testFailedScript() throws Exception {
ShellProcessor sp3 = new ShellProcessor(ThreadLocalRandom.current().nextLong(), "mvn tjq", timeout);
System.out.println(sp3.process(context));
}
}