[dev] change pom version to v3.2.1 & optmize OhMyClient's request

This commit is contained in:
朱八 2020-07-25 11:25:24 +08:00
parent f530ce89f6
commit 83ad854655
12 changed files with 69 additions and 26 deletions

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId> <artifactId>powerjob-client</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
</properties> </properties>

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -31,13 +32,13 @@ public class OhMyClient {
private Long appId; private Long appId;
private String currentAddress; private String currentAddress;
private List<String> allAddress; private final List<String> allAddress;
private static final String URL_PATTERN = "http://%s%s%s"; private static final String URL_PATTERN = "http://%s%s%s";
/** /**
* 初始化 OhMyClient 客户端 * 初始化 OhMyClient 客户端
* @param domain www.oms-server.com内网域名自行完成DNS & Proxy * @param domain 比如 www.powerjob-server.com内网域名自行完成 DNS & Proxy
* @param appName 负责的应用名称 * @param appName 负责的应用名称
*/ */
public OhMyClient(String domain, String appName, String password) { public OhMyClient(String domain, String appName, String password) {
@ -52,8 +53,8 @@ public class OhMyClient {
*/ */
public OhMyClient(List<String> addressList, String appName, String password) { public OhMyClient(List<String> addressList, String appName, String password) {
Objects.requireNonNull(addressList, "domain can't be null!"); CommonUtils.requireNonNull(addressList, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null"); CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList; allAddress = addressList;
for (String addr : addressList) { for (String addr : addressList) {
@ -77,7 +78,7 @@ public class OhMyClient {
if (StringUtils.isEmpty(currentAddress)) { if (StringUtils.isEmpty(currentAddress)) {
throw new OmsException("no server available"); throw new OmsException("no server available");
} }
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName); log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress);
} }
private static String assertApp(String appName, String password, String url) throws IOException { private static String assertApp(String appName, String password, String url) throws IOException {
@ -211,6 +212,13 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class); return JsonUtils.parseObject(post, ResultDTO.class);
} }
/**
* 取消任务实例
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
*/
public ResultDTO<Void> cancelInstance(Long instanceId) throws Exception { public ResultDTO<Void> cancelInstance(Long instanceId) throws Exception {
RequestBody body = new FormBody.Builder() RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString()) .add("instanceId", instanceId.toString())
@ -373,28 +381,35 @@ public class OhMyClient {
private String postHA(String path, RequestBody requestBody) { private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址 // 先尝试默认地址
String url = getUrl(path, currentAddress);
try { try {
String res = HttpUtils.post(getUrl(path, currentAddress), requestBody); String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) { if (StringUtils.isNotEmpty(res)) {
return res; return res;
} }
}catch (Exception ignore) { }catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
} }
// 失败开始重试 // 失败开始重试
for (String addr : allAddress) { for (String addr : allAddress) {
if (Objects.equals(addr, currentAddress)) {
continue;
}
url = getUrl(path, addr);
try { try {
String res = HttpUtils.post(getUrl(path, addr), requestBody); String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) { if (StringUtils.isNotEmpty(res)) {
log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr); log.warn("[OhMyClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr; currentAddress = addr;
return res; return res;
} }
}catch (Exception ignore) { }catch (Exception e) {
log.warn("[OhMyClient] request url:{} failed, reason is {}.", url, e.toString());
} }
} }
log.error("[OhMyClient] no server available in {}.", allAddress); log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available"); throw new OmsException("no server available");
} }
} }

View File

@ -9,6 +9,8 @@ import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
/** /**
* 测试 Client * 测试 Client
* *
@ -87,4 +89,24 @@ public class TestClient {
public void testFetchInstanceStatus() throws Exception { public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L)); System.out.println(ohMyClient.fetchInstanceStatus(141251409466097728L));
} }
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
@Test
public void testCancelInstanceInDatabase() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
// 手动重启 server干掉时间轮中的调度数据
TimeUnit.MINUTES.sleep(1);
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
} }

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId> <artifactId>powerjob-common</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>

View File

@ -17,7 +17,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class HttpUtils { public class HttpUtils {
private static OkHttpClient client; private static final OkHttpClient client;
private static final int HTTP_SUCCESS_CODE = 200; private static final int HTTP_SUCCESS_CODE = 200;
static { static {

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId> <artifactId>powerjob-server</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<swagger.version>2.9.2</swagger.version> <swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<mysql.version>8.0.19</mysql.version> <mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version> <zip4j.version>2.5.2</zip4j.version>

View File

@ -102,6 +102,8 @@ public class JobService {
*/ */
public long runJob(Long jobId, String instanceParams, long delay) { public long runJob(Long jobId, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush(); instanceInfoRepository.flush();
@ -113,6 +115,7 @@ public class JobService {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}); });
} }
log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
return instanceId; return instanceId;
} }
@ -170,7 +173,7 @@ public class JobService {
return; return;
} }
if (executeLogs.size() > 1) { if (executeLogs.size() > 1) {
log.warn("[JobService] frequent job should just have one running instance, there must have some bug."); log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
} }
executeLogs.forEach(instance -> { executeLogs.forEach(instance -> {
try { try {

View File

@ -20,6 +20,8 @@ public class InstanceTimeWheelService {
// 精确时间轮 1S 走一格 // 精确时间轮 1S 走一格
private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
// 支持取消的时间间隔低于该阈值则不会放进 CARGO
private static final long MIN_INTERVAL_MS = 1000;
/** /**
* 定时调度 * 定时调度
@ -32,8 +34,9 @@ public class InstanceTimeWheelService {
CARGO.remove(uniqueId); CARGO.remove(uniqueId);
timerTask.run(); timerTask.run();
}, delayMS, TimeUnit.MILLISECONDS); }, delayMS, TimeUnit.MILLISECONDS);
if (delayMS > MIN_INTERVAL_MS) {
CARGO.put(uniqueId, timerFuture); CARGO.put(uniqueId, timerFuture);
}
} }
/** /**

View File

@ -16,7 +16,7 @@ spring.datasource.druid.filters=stat
spring.datasource.druid.stat-view-servlet.url-pattern=/druid/* spring.datasource.druid.stat-view-servlet.url-pattern=/druid/*
spring.datasource.druid.filter.stat.enabled=true spring.datasource.druid.filter.stat.enabled=true
spring.datasource.druid.filter.stat.log-slow-sql=true spring.datasource.druid.filter.stat.log-slow-sql=true
spring.datasource.druid.filter.stat.slow-sql-millis=3000 spring.datasource.druid.filter.stat.slow-sql-millis=5000
spring.datasource.druid.stat-view-servlet.enabled=true spring.datasource.druid.stat-view-servlet.enabled=true
spring.datasource.druid.stat-view-servlet.login-username=powerjob spring.datasource.druid.stat-view-servlet.login-username=powerjob
spring.datasource.druid.stat-view-servlet.login-password=powerjob spring.datasource.druid.stat-view-servlet.login-password=powerjob

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId> <artifactId>powerjob-worker-agent</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<powerjob.worker.version>3.2.0</powerjob.worker.version> <powerjob.worker.version>3.2.1</powerjob.worker.version>
<logback.version>1.2.3</logback.version> <logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version> <picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId> <artifactId>powerjob-worker-samples</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<properties> <properties>
<springboot.version>2.2.6.RELEASE</springboot.version> <springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.2.0</powerjob.worker.version> <powerjob.worker.version>3.2.1</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version> <fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module --> <!-- 部署时跳过该module -->

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId> <artifactId>powerjob-worker</artifactId>
<version>3.2.0</version> <version>3.2.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<spring.version>5.2.4.RELEASE</spring.version> <spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.0</powerjob.common.version> <powerjob.common.version>3.2.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version> <hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>