diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index 39ee4363..2d4286da 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-client
- 3.2.0
+ 3.2.1
jar
- 3.2.0
+ 3.2.1
5.6.1
diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
index 8d7f1fea..1b0d66d7 100644
--- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
@@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*;
+import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
@@ -31,13 +32,13 @@ public class OhMyClient {
private Long appId;
private String currentAddress;
- private List allAddress;
+ private final List allAddress;
private static final String URL_PATTERN = "http://%s%s%s";
/**
* 初始化 OhMyClient 客户端
- * @param domain www.oms-server.com(内网域名,自行完成DNS & Proxy)
+ * @param domain 比如 www.powerjob-server.com(内网域名,自行完成 DNS & Proxy)
* @param appName 负责的应用名称
*/
public OhMyClient(String domain, String appName, String password) {
@@ -52,8 +53,8 @@ public class OhMyClient {
*/
public OhMyClient(List addressList, String appName, String password) {
- Objects.requireNonNull(addressList, "domain can't be null!");
- Objects.requireNonNull(appName, "appName can't be null");
+ CommonUtils.requireNonNull(addressList, "domain can't be null!");
+ CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList;
for (String addr : addressList) {
@@ -77,7 +78,7 @@ public class OhMyClient {
if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available");
}
- log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
+ log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
}
private static String assertApp(String appName, String password, String url) throws IOException {
@@ -211,6 +212,13 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
+ /**
+ * 取消任务实例
+ * 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性!
+ * @param instanceId 任务实例ID
+ * @return true 代表取消成功,false 取消失败
+ * @throws Exception 异常
+ */
public ResultDTO cancelInstance(Long instanceId) throws Exception {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
@@ -373,28 +381,35 @@ public class OhMyClient {
private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址
+ String url = getUrl(path, currentAddress);
try {
- String res = HttpUtils.post(getUrl(path, currentAddress), requestBody);
+ String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
return res;
}
- }catch (Exception ignore) {
+ }catch (Exception e) {
+ log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
// 失败,开始重试
for (String addr : allAddress) {
+ if (Objects.equals(addr, currentAddress)) {
+ continue;
+ }
+ url = getUrl(path, addr);
try {
- String res = HttpUtils.post(getUrl(path, addr), requestBody);
+ String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
}
- }catch (Exception ignore) {
+ }catch (Exception e) {
+ log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
}
}
- log.error("[OhMyClient] no server available in {}.", allAddress);
+ log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available");
}
}
diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java
index f1f7dec8..b4d00ae8 100644
--- a/powerjob-client/src/test/java/TestClient.java
+++ b/powerjob-client/src/test/java/TestClient.java
@@ -9,6 +9,8 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.TimeUnit;
+
/**
* 测试 Client
*
@@ -87,4 +89,24 @@ public class TestClient {
public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
}
+
+ @Test
+ public void testCancelInstanceInTimeWheel() throws Exception {
+ ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
+ System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
+ ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData());
+ System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
+ }
+
+ @Test
+ public void testCancelInstanceInDatabase() throws Exception {
+ ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000);
+ System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
+
+ // 手动重启 server,干掉时间轮中的调度数据
+ TimeUnit.MINUTES.sleep(1);
+
+ ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData());
+ System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
+ }
}
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 3e8766b4..bca0ae69 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -10,7 +10,7 @@
4.0.0
powerjob-common
- 3.2.0
+ 3.2.1
jar
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java
index 3612f179..21664614 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java
@@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit;
*/
public class HttpUtils {
- private static OkHttpClient client;
+ private static final OkHttpClient client;
private static final int HTTP_SUCCESS_CODE = 200;
static {
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index f290a457..464668af 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-server
- 3.2.0
+ 3.2.1
jar
2.9.2
2.2.6.RELEASE
- 3.2.0
+ 3.2.1
8.0.19
1.4.200
2.5.2
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
index 4b864960..9b3810e8 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
@@ -102,6 +102,8 @@ public class JobService {
*/
public long runJob(Long jobId, String instanceParams, long delay) {
+ log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
+
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
@@ -113,6 +115,7 @@ public class JobService {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
});
}
+ log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
return instanceId;
}
@@ -170,7 +173,7 @@ public class JobService {
return;
}
if (executeLogs.size() > 1) {
- log.warn("[JobService] frequent job should just have one running instance, there must have some bug.");
+ log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
}
executeLogs.forEach(instance -> {
try {
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
index 0b710fa4..10a0c9d3 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java
@@ -20,6 +20,8 @@ public class InstanceTimeWheelService {
// 精确时间轮,每 1S 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
+ // 支持取消的时间间隔,低于该阈值则不会放进 CARGO
+ private static final long MIN_INTERVAL_MS = 1000;
/**
* 定时调度
@@ -32,8 +34,9 @@ public class InstanceTimeWheelService {
CARGO.remove(uniqueId);
timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS);
-
- CARGO.put(uniqueId, timerFuture);
+ if (delayMS > MIN_INTERVAL_MS) {
+ CARGO.put(uniqueId, timerFuture);
+ }
}
/**
diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties
index 3ec409ea..a9383fbe 100644
--- a/powerjob-server/src/main/resources/application.properties
+++ b/powerjob-server/src/main/resources/application.properties
@@ -16,7 +16,7 @@ spring.datasource.druid.filters=stat
spring.datasource.druid.stat-view-servlet.url-pattern=/druid/*
spring.datasource.druid.filter.stat.enabled=true
spring.datasource.druid.filter.stat.log-slow-sql=true
-spring.datasource.druid.filter.stat.slow-sql-millis=3000
+spring.datasource.druid.filter.stat.slow-sql-millis=5000
spring.datasource.druid.stat-view-servlet.enabled=true
spring.datasource.druid.stat-view-servlet.login-username=powerjob
spring.datasource.druid.stat-view-servlet.login-password=powerjob
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index 4b4c7db2..82211504 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker-agent
- 3.2.0
+ 3.2.1
jar
- 3.2.0
+ 3.2.1
1.2.3
4.3.2
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index e11ca2cc..e2500e5a 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-samples
- 3.2.0
+ 3.2.1
2.2.6.RELEASE
- 3.2.0
+ 3.2.1
1.2.68
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index e4b55ef1..63661bc0 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker
- 3.2.0
+ 3.2.1
jar
5.2.4.RELEASE
- 3.2.0
+ 3.2.1
1.4.200
3.4.2
5.6.1