From 83ad8546550caac6164d75a7900e095626cd31dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=85=AB?= Date: Sat, 25 Jul 2020 11:25:24 +0800 Subject: [PATCH] [dev] change pom version to v3.2.1 & optmize OhMyClient's request --- powerjob-client/pom.xml | 4 +-- .../kfcfans/powerjob/client/OhMyClient.java | 35 +++++++++++++------ powerjob-client/src/test/java/TestClient.java | 22 ++++++++++++ powerjob-common/pom.xml | 2 +- .../powerjob/common/utils/HttpUtils.java | 2 +- powerjob-server/pom.xml | 4 +-- .../powerjob/server/service/JobService.java | 5 ++- .../instance/InstanceTimeWheelService.java | 7 ++-- .../src/main/resources/application.properties | 2 +- powerjob-worker-agent/pom.xml | 4 +-- powerjob-worker-samples/pom.xml | 4 +-- powerjob-worker/pom.xml | 4 +-- 12 files changed, 69 insertions(+), 26 deletions(-) diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 39ee4363..2d4286da 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.2.0 + 3.2.1 jar - 3.2.0 + 3.2.1 5.6.1 diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 8d7f1fea..1b0d66d7 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.OpenAPIConstant; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.*; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; @@ -31,13 +32,13 @@ public class OhMyClient { private Long appId; private String currentAddress; - private List allAddress; + private final List allAddress; private static final String URL_PATTERN = "http://%s%s%s"; /** * 初始化 OhMyClient 客户端 - * @param domain www.oms-server.com(内网域名,自行完成DNS & Proxy) + * @param domain 比如 www.powerjob-server.com(内网域名,自行完成 DNS & Proxy) * @param appName 负责的应用名称 */ public OhMyClient(String domain, String appName, String password) { @@ -52,8 +53,8 @@ public class OhMyClient { */ public OhMyClient(List addressList, String appName, String password) { - Objects.requireNonNull(addressList, "domain can't be null!"); - Objects.requireNonNull(appName, "appName can't be null"); + CommonUtils.requireNonNull(addressList, "domain can't be null!"); + CommonUtils.requireNonNull(appName, "appName can't be null"); allAddress = addressList; for (String addr : addressList) { @@ -77,7 +78,7 @@ public class OhMyClient { if (StringUtils.isEmpty(currentAddress)) { throw new OmsException("no server available"); } - log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName); + log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress); } private static String assertApp(String appName, String password, String url) throws IOException { @@ -211,6 +212,13 @@ public class OhMyClient { return JsonUtils.parseObject(post, ResultDTO.class); } + /** + * 取消任务实例 + * 接口使用条件:调用接口时间与待取消任务的预计执行时间有一定时间间隔,否则不保证可靠性! + * @param instanceId 任务实例ID + * @return true 代表取消成功,false 取消失败 + * @throws Exception 异常 + */ public ResultDTO cancelInstance(Long instanceId) throws Exception { RequestBody body = new FormBody.Builder() .add("instanceId", instanceId.toString()) @@ -373,28 +381,35 @@ public class OhMyClient { private String postHA(String path, RequestBody requestBody) { // 先尝试默认地址 + String url = getUrl(path, currentAddress); try { - String res = HttpUtils.post(getUrl(path, currentAddress), requestBody); + String res = HttpUtils.post(url, requestBody); if (StringUtils.isNotEmpty(res)) { return res; } - }catch (Exception ignore) { + }catch (Exception e) { + log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString()); } // 失败,开始重试 for (String addr : allAddress) { + if (Objects.equals(addr, currentAddress)) { + continue; + } + url = getUrl(path, addr); try { - String res = HttpUtils.post(getUrl(path, addr), requestBody); + String res = HttpUtils.post(url, requestBody); if (StringUtils.isNotEmpty(res)) { log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr); currentAddress = addr; return res; } - }catch (Exception ignore) { + }catch (Exception e) { + log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString()); } } - log.error("[OhMyClient] no server available in {}.", allAddress); + log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress); throw new OmsException("no server available"); } } diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index f1f7dec8..b4d00ae8 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -9,6 +9,8 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.util.concurrent.TimeUnit; + /** * 测试 Client * @@ -87,4 +89,24 @@ public class TestClient { public void testFetchInstanceStatus() throws Exception { System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); } + + @Test + public void testCancelInstanceInTimeWheel() throws Exception { + ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000); + System.out.println("runJob result: " + JsonUtils.toJSONString(startRes)); + ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); + System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes)); + } + + @Test + public void testCancelInstanceInDatabase() throws Exception { + ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000); + System.out.println("runJob result: " + JsonUtils.toJSONString(startRes)); + + // 手动重启 server,干掉时间轮中的调度数据 + TimeUnit.MINUTES.sleep(1); + + ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); + System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes)); + } } diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 3e8766b4..bca0ae69 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.2.0 + 3.2.1 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java index 3612f179..21664614 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/HttpUtils.java @@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit; */ public class HttpUtils { - private static OkHttpClient client; + private static final OkHttpClient client; private static final int HTTP_SUCCESS_CODE = 200; static { diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index f290a457..464668af 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.2.0 + 3.2.1 jar 2.9.2 2.2.6.RELEASE - 3.2.0 + 3.2.1 8.0.19 1.4.200 2.5.2 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 4b864960..9b3810e8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -102,6 +102,8 @@ public class JobService { */ public long runJob(Long jobId, String instanceParams, long delay) { + log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay); + JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); instanceInfoRepository.flush(); @@ -113,6 +115,7 @@ public class JobService { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }); } + log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId); return instanceId; } @@ -170,7 +173,7 @@ public class JobService { return; } if (executeLogs.size() > 1) { - log.warn("[JobService] frequent job should just have one running instance, there must have some bug."); + log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId); } executeLogs.forEach(instance -> { try { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java index 0b710fa4..10a0c9d3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceTimeWheelService.java @@ -20,6 +20,8 @@ public class InstanceTimeWheelService { // 精确时间轮,每 1S 走一格 private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + // 支持取消的时间间隔,低于该阈值则不会放进 CARGO + private static final long MIN_INTERVAL_MS = 1000; /** * 定时调度 @@ -32,8 +34,9 @@ public class InstanceTimeWheelService { CARGO.remove(uniqueId); timerTask.run(); }, delayMS, TimeUnit.MILLISECONDS); - - CARGO.put(uniqueId, timerFuture); + if (delayMS > MIN_INTERVAL_MS) { + CARGO.put(uniqueId, timerFuture); + } } /** diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index 3ec409ea..a9383fbe 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -16,7 +16,7 @@ spring.datasource.druid.filters=stat spring.datasource.druid.stat-view-servlet.url-pattern=/druid/* spring.datasource.druid.filter.stat.enabled=true spring.datasource.druid.filter.stat.log-slow-sql=true -spring.datasource.druid.filter.stat.slow-sql-millis=3000 +spring.datasource.druid.filter.stat.slow-sql-millis=5000 spring.datasource.druid.stat-view-servlet.enabled=true spring.datasource.druid.stat-view-servlet.login-username=powerjob spring.datasource.druid.stat-view-servlet.login-password=powerjob diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 4b4c7db2..82211504 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.2.0 + 3.2.1 jar - 3.2.0 + 3.2.1 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index e11ca2cc..e2500e5a 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.2.0 + 3.2.1 2.2.6.RELEASE - 3.2.0 + 3.2.1 1.2.68 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index e4b55ef1..63661bc0 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.2.0 + 3.2.1 jar 5.2.4.RELEASE - 3.2.0 + 3.2.1 1.4.200 3.4.2 5.6.1