feat: PowerJobClient Support Authentication

This commit is contained in:
tjq 2024-08-10 11:41:07 +08:00
parent 53be566173
commit e711ed7251
14 changed files with 409 additions and 303 deletions

View File

@ -15,6 +15,7 @@
<properties>
<junit.version>5.9.1</junit.version>
<logback.version>1.2.13</logback.version>
<fastjson.version>1.2.83</fastjson.version>
<powerjob.common.version>5.0.1-beta</powerjob.common.version>
@ -44,6 +45,13 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- log for test stage -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -3,6 +3,7 @@ package tech.powerjob.client;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import tech.powerjob.client.common.Protocol;
import java.io.Serializable;
@ -18,6 +19,7 @@ import java.util.Map;
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class ClientConfig implements Serializable {
/**

View File

@ -1,28 +1,29 @@
package tech.powerjob.client;
import com.alibaba.fastjson.JSON;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.OmsConstant;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.client.module.AppAuthRequest;
import tech.powerjob.client.module.AppAuthResult;
import tech.powerjob.client.service.PowerRequestBody;
import tech.powerjob.client.service.RequestService;
import tech.powerjob.client.service.impl.ClusterRequestServiceOkHttp3Impl;
import tech.powerjob.common.OpenAPIConstant;
import tech.powerjob.common.enums.InstanceStatus;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
import tech.powerjob.common.request.http.SaveWorkflowRequest;
import tech.powerjob.common.request.query.JobInfoQuery;
import tech.powerjob.common.response.*;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.common.serialize.JsonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.DigestUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import static tech.powerjob.client.TypeStore.*;
@ -36,11 +37,40 @@ import static tech.powerjob.client.TypeStore.*;
public class PowerJobClient implements IPowerJobClient {
private Long appId;
private String currentAddress;
private final List<String> allAddress;
private final RequestService requestService;
private static final String URL_PATTERN = "http://%s%s%s";
public PowerJobClient(ClientConfig config) {
List<String> addressList = config.getAddressList();
String appName = config.getAppName();
CommonUtils.requireNonNull(addressList, "addressList can't be null!");
CommonUtils.requireNonNull(appName, "appName can't be null");
this.requestService = new ClusterRequestServiceOkHttp3Impl(config);
AppAuthRequest appAuthRequest = new AppAuthRequest();
appAuthRequest.setAppName(appName);
appAuthRequest.setEncryptedPassword(DigestUtils.md5(config.getPassword()));
String assertResponse = requestService.request(OpenAPIConstant.AUTH_APP, PowerRequestBody.newJsonRequestBody(appAuthRequest));
if (StringUtils.isNotEmpty(assertResponse)) {
ResultDTO<AppAuthResult> resultDTO = JSON.parseObject(assertResponse, APP_AUTH_RESULT_TYPE);
if (resultDTO.isSuccess()) {
appId = resultDTO.getData().getAppId();
} else {
throw new PowerJobException(resultDTO.getMessage());
}
}
if (appId == null) {
throw new PowerJobException("appId is null, please check your config");
}
log.info("[PowerJobClient] [INIT] {}'s PowerJobClient bootstrap successfully", appName);
}
/**
* Init PowerJobClient with domain, appName and password.
*
@ -49,7 +79,7 @@ public class PowerJobClient implements IPowerJobClient {
* @param password password of the application
*/
public PowerJobClient(String domain, String appName, String password) {
this(Lists.newArrayList(domain), appName, password);
this(new ClientConfig().setAppName(appName).setPassword(password).setAddressList(Lists.newArrayList(domain)));
}
@ -61,48 +91,7 @@ public class PowerJobClient implements IPowerJobClient {
* @param password password of the application
*/
public PowerJobClient(List<String> addressList, String appName, String password) {
CommonUtils.requireNonNull(addressList, "addressList can't be null!");
CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList;
for (String addr : addressList) {
String url = getUrl(OpenAPIConstant.ASSERT, addr);
try {
String result = assertApp(appName, password, url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO<Long> resultDTO = JSON.parseObject(result, LONG_RESULT_TYPE);
if (resultDTO.isSuccess()) {
appId = resultDTO.getData();
currentAddress = addr;
break;
} else {
throw new PowerJobException(resultDTO.getMessage());
}
}
} catch (IOException ignore) {
//
}
}
if (StringUtils.isEmpty(currentAddress)) {
throw new PowerJobException("no server available for PowerJobClient");
}
log.info("[PowerJobClient] {}'s PowerJobClient bootstrap successfully, using server: {}", appName, currentAddress);
}
private static String assertApp(String appName, String password, String url) throws IOException {
FormBody.Builder builder = new FormBody.Builder()
.add("appName", appName);
if (password != null) {
builder.add("password", password);
}
return HttpUtils.post(url, builder.build());
}
private static String getUrl(String path, String address) {
return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path);
this(new ClientConfig().setAppName(appName).setPassword(password).setAddressList(addressList));
}
/* ************* Job 区 ************* */
@ -118,9 +107,7 @@ public class PowerJobClient implements IPowerJobClient {
public ResultDTO<Long> saveJob(SaveJobInfoRequest request) {
request.setAppId(appId);
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JSON.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
String post = requestService.request(OpenAPIConstant.SAVE_JOB, PowerRequestBody.newJsonRequestBody(request));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@ -133,21 +120,20 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Long> copyJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.COPY_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.COPY_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@Override
public ResultDTO<SaveJobInfoRequest> exportJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.EXPORT_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.EXPORT_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, SAVE_JOB_INFO_REQUEST_RESULT_TYPE);
}
@ -159,11 +145,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<JobInfoDTO> fetchJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, JOB_RESULT_TYPE);
}
@ -174,10 +159,9 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<List<JobInfoDTO>> fetchAllJob() {
RequestBody body = new FormBody.Builder()
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_ALL_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_ALL_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, LIST_JOB_RESULT_TYPE);
}
@ -190,9 +174,7 @@ public class PowerJobClient implements IPowerJobClient {
@Override
public ResultDTO<List<JobInfoDTO>> queryJob(JobInfoQuery powerQuery) {
powerQuery.setAppIdEq(appId);
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JsonUtils.toJSONStringUnsafe(powerQuery);
String post = postHA(OpenAPIConstant.QUERY_JOB, RequestBody.create(jsonType, json));
String post = requestService.request(OpenAPIConstant.QUERY_JOB, PowerRequestBody.newJsonRequestBody(powerQuery));
return JSON.parseObject(post, LIST_JOB_RESULT_TYPE);
}
@ -204,11 +186,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> disableJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.DISABLE_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -220,11 +201,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> enableJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.ENABLE_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -236,11 +216,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> deleteJob(Long jobId) {
RequestBody body = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_JOB, body);
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.DELETE_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -254,15 +233,16 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
Map<String, String> param = Maps.newHashMap();
param.put("jobId", jobId.toString());
param.put("appId", appId.toString());
param.put("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
param.put("instanceParams", instanceParams);
}
String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
String post = requestService.request(OpenAPIConstant.RUN_JOB, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@ -280,11 +260,12 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> stopInstance(Long instanceId) {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
Map<String, String> param = Maps.newHashMap();
param.put("instanceId", instanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.STOP_INSTANCE, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -297,11 +278,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> cancelInstance(Long instanceId) {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
Map<String, String> param = Maps.newHashMap();
param.put("instanceId", instanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.CANCEL_INSTANCE, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -314,11 +294,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> retryInstance(Long instanceId) {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
Map<String, String> param = Maps.newHashMap();
param.put("instanceId", instanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.RETRY_INSTANCE, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -330,10 +309,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
Map<String, String> param = Maps.newHashMap();
param.put("instanceId", instanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_INSTANCE_STATUS, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, INTEGER_RESULT_TYPE);
}
@ -345,10 +324,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<InstanceInfoDTO> fetchInstanceInfo(Long instanceId) {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
Map<String, String> param = Maps.newHashMap();
param.put("instanceId", instanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_INSTANCE_INFO, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, INSTANCE_RESULT_TYPE);
}
@ -364,10 +343,9 @@ public class PowerJobClient implements IPowerJobClient {
@Override
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) {
request.setAppId(appId);
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
// 中坑记录 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG null无语.jpg
String json = JsonUtils.toJSONStringUnsafe(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
String post = requestService.request(OpenAPIConstant.SAVE_WORKFLOW, PowerRequestBody.newJsonRequestBody(json));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@ -379,11 +357,12 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Long> copyWorkflow(Long workflowId) {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.COPY_WORKFLOW, body);
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.COPY_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@ -399,9 +378,9 @@ public class PowerJobClient implements IPowerJobClient {
for (SaveWorkflowNodeRequest saveWorkflowNodeRequest : requestList) {
saveWorkflowNodeRequest.setAppId(appId);
}
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String json = JsonUtils.toJSONStringUnsafe(requestList);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW_NODE, RequestBody.create(jsonType, json));
String post = requestService.request(OpenAPIConstant.SAVE_WORKFLOW_NODE, PowerRequestBody.newJsonRequestBody(json));
return JSON.parseObject(post, WF_NODE_LIST_RESULT_TYPE);
}
@ -415,11 +394,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<WorkflowInfoDTO> fetchWorkflow(Long workflowId) {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, WF_RESULT_TYPE);
}
@ -431,11 +409,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> disableWorkflow(Long workflowId) {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.DISABLE_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -447,11 +424,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> enableWorkflow(Long workflowId) {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.ENABLE_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -463,11 +439,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> deleteWorkflow(Long workflowId) {
RequestBody body = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.DELETE_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -481,14 +456,17 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Long> runWorkflow(Long workflowId, String initParams, long delayMS) {
FormBody.Builder builder = new FormBody.Builder()
.add("workflowId", workflowId.toString())
.add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
Map<String, String> param = Maps.newHashMap();
param.put("workflowId", workflowId.toString());
param.put("appId", appId.toString());
param.put("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(initParams)) {
builder.add("initParams", initParams);
param.put("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
String post = requestService.request(OpenAPIConstant.RUN_WORKFLOW, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, LONG_RESULT_TYPE);
}
@ -506,11 +484,12 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> stopWorkflowInstance(Long wfInstanceId) {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
Map<String, String> param = Maps.newHashMap();
param.put("wfInstanceId", wfInstanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -522,11 +501,10 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> retryWorkflowInstance(Long wfInstanceId) {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_WORKFLOW_INSTANCE, body);
Map<String, String> param = Maps.newHashMap();
param.put("wfInstanceId", wfInstanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.RETRY_WORKFLOW_INSTANCE, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -539,12 +517,13 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<Void> markWorkflowNodeAsSuccess(Long wfInstanceId, Long nodeId) {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("nodeId", nodeId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS, body);
Map<String, String> param = Maps.newHashMap();
param.put("wfInstanceId", wfInstanceId.toString());
param.put("appId", appId.toString());
param.put("nodeId", nodeId.toString());
String post = requestService.request(OpenAPIConstant.MARK_WORKFLOW_NODE_AS_SUCCESS, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, VOID_RESULT_TYPE);
}
@ -556,47 +535,13 @@ public class PowerJobClient implements IPowerJobClient {
*/
@Override
public ResultDTO<WorkflowInstanceInfoDTO> fetchWorkflowInstanceInfo(Long wfInstanceId) {
RequestBody body = new FormBody.Builder()
.add("wfInstanceId", wfInstanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
Map<String, String> param = Maps.newHashMap();
param.put("wfInstanceId", wfInstanceId.toString());
param.put("appId", appId.toString());
String post = requestService.request(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, PowerRequestBody.newFormRequestBody(param));
return JSON.parseObject(post, WF_INSTANCE_RESULT_TYPE);
}
private String postHA(String path, RequestBody requestBody) {
// 先尝试默认地址
String url = getUrl(path, currentAddress);
try {
String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
return res;
}
} catch (IOException e) {
log.warn("[PowerJobClient] request url:{} failed, reason is {}.", url, e.toString());
}
// 失败开始重试
for (String addr : allAddress) {
if (Objects.equals(addr, currentAddress)) {
continue;
}
url = getUrl(path, addr);
try {
String res = HttpUtils.post(url, requestBody);
if (StringUtils.isNotEmpty(res)) {
log.warn("[PowerJobClient] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
}
} catch (IOException e) {
log.warn("[PowerJobClient] request url:{} failed, reason is {}.", url, e.toString());
}
}
log.error("[PowerJobClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new PowerJobException("no server available when send post request");
}
}

View File

@ -1,6 +1,7 @@
package tech.powerjob.client;
import com.alibaba.fastjson.TypeReference;
import tech.powerjob.client.module.AppAuthResult;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.*;
@ -14,6 +15,7 @@ import java.util.List;
*/
public class TypeStore {
public static final TypeReference<ResultDTO<AppAuthResult>> APP_AUTH_RESULT_TYPE = new TypeReference<ResultDTO<AppAuthResult>>(){};
public static final TypeReference<ResultDTO<Void>> VOID_RESULT_TYPE = new TypeReference<ResultDTO<Void>>(){};
public static final TypeReference<ResultDTO<Integer>> INTEGER_RESULT_TYPE = new TypeReference<ResultDTO<Integer>>(){};

View File

@ -0,0 +1,26 @@
package tech.powerjob.client.service;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Map;
/**
* HTTP 响应
*
* @author tjq
* @since 2024/8/10
*/
@Data
@Accessors(chain = true)
public class HttpResponse implements Serializable {
private boolean success;
private int code;
private String response;
private Map<String, String> headers;
}

View File

@ -0,0 +1,47 @@
package tech.powerjob.client.service;
import com.google.common.collect.Maps;
import lombok.Getter;
import tech.powerjob.common.enums.MIME;
import java.util.Map;
/**
* 请求体
*
* @author tjq
* @since 2024/8/10
*/
@Getter
public class PowerRequestBody {
private MIME mime;
private Object payload;
private final Map<String, String> headers = Maps.newHashMap();
private PowerRequestBody() {
}
public static PowerRequestBody newJsonRequestBody(Object data) {
PowerRequestBody powerRequestBody = new PowerRequestBody();
powerRequestBody.mime = MIME.APPLICATION_JSON;
powerRequestBody.payload = data;
return powerRequestBody;
}
public static PowerRequestBody newFormRequestBody(Map<String, String> form) {
PowerRequestBody powerRequestBody = new PowerRequestBody();
powerRequestBody.mime = MIME.APPLICATION_FORM;
powerRequestBody.payload = form;
return powerRequestBody;
}
public void addHeaders(Map<String, String> hs) {
if (hs == null || hs.isEmpty()) {
return;
}
this.headers.putAll(hs);
}
}

View File

@ -9,5 +9,5 @@ package tech.powerjob.client.service;
public interface RequestService {
String request(String path, Object body);
String request(String path, PowerRequestBody powerRequestBody);
}

View File

@ -1,16 +1,19 @@
package tech.powerjob.client.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.client.ClientConfig;
import tech.powerjob.client.TypeStore;
import tech.powerjob.client.module.AppAuthRequest;
import tech.powerjob.client.module.AppAuthResult;
import tech.powerjob.client.service.HttpResponse;
import tech.powerjob.client.service.PowerRequestBody;
import tech.powerjob.common.OpenAPIConstant;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.DigestUtils;
import tech.powerjob.common.utils.MapUtils;
import java.util.Map;
@ -30,18 +33,41 @@ abstract class AppAuthClusterRequestService extends ClusterRequestService {
@Override
public String request(String path, Object body) {
public String request(String path, PowerRequestBody powerRequestBody) {
// 若不存在 appAuthResult则首先进行鉴权
if (appAuthResult == null) {
refreshAppAuthResult();
}
HttpResponse httpResponse = doRequest(path, powerRequestBody);
// 如果 auth 成功则代表请求有效直接返回
String authStatus = MapUtils.getString(httpResponse.getHeaders(), OpenAPIConstant.RESPONSE_HEADER_AUTH_STATUS);
if (Boolean.TRUE.toString().equalsIgnoreCase(authStatus)) {
return httpResponse.getResponse();
}
// 否则请求无效刷新鉴权后重新请求
refreshAppAuthResult();
httpResponse = doRequest(path, powerRequestBody);
// 只要请求不失败直接返回如果鉴权失败则返回鉴权错误信息server 保证 response 永远非空
return httpResponse.getResponse();
}
private HttpResponse doRequest(String path, PowerRequestBody powerRequestBody) {
// 添加鉴权信息
Map<String, String> authHeaders = buildAuthHeader();
String clusterResponse = clusterHaRequest(path, body, authHeaders);
powerRequestBody.addHeaders(authHeaders);
// TODO
HttpResponse httpResponse = clusterHaRequest(path, powerRequestBody);
return null;
// 任何请求不成功都直接报错
if (!httpResponse.isSuccess()) {
throw new PowerJobException("REMOTE_SERVER_INNER_EXCEPTION");
}
return httpResponse;
}
private Map<String, String> buildAuthHeader() {
@ -54,12 +80,15 @@ abstract class AppAuthClusterRequestService extends ClusterRequestService {
@SneakyThrows
private void refreshAppAuthResult() {
AppAuthRequest appAuthRequest = buildAppAuthRequest();
String authResponse = clusterHaRequest(OpenAPIConstant.AUTH_APP, appAuthRequest, null);
if (StringUtils.isEmpty(authResponse)) {
throw new PowerJobException(ErrorCodes.CLIENT_HTTP_REQUEST_FAILED, "EMPTY_RESPONSE");
HttpResponse httpResponse = clusterHaRequest(OpenAPIConstant.AUTH_APP, PowerRequestBody.newJsonRequestBody(appAuthRequest));
if (!httpResponse.isSuccess()) {
throw new PowerJobException("auth_app_exception!");
}
this.appAuthResult = JsonUtils.parseObject(authResponse, AppAuthResult.class);
ResultDTO<AppAuthResult> authResultDTO = JSONObject.parseObject(httpResponse.getResponse(), TypeStore.APP_AUTH_RESULT_TYPE);
if (!authResultDTO.isSuccess()) {
throw new PowerJobException("auth_failed:" + authResultDTO.getMessage());
}
this.appAuthResult = authResultDTO.getData();
}
protected AppAuthRequest buildAppAuthRequest() {

View File

@ -1,18 +1,17 @@
package tech.powerjob.client.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.client.ClientConfig;
import tech.powerjob.client.service.HttpResponse;
import tech.powerjob.client.service.PowerRequestBody;
import tech.powerjob.client.service.RequestService;
import tech.powerjob.common.OpenAPIConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.serialize.JsonUtils;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
@ -47,37 +46,32 @@ abstract class ClusterRequestService implements RequestService {
public ClusterRequestService(ClientConfig config) {
this.config = config;
this.currentAddress = config.getAddressList().get(0);
}
/**
* 具体某一次 HTTP 请求的实现
* @param url 完整请求地址
* @param body 请求体
* @param headers 请求头
* @return 响应
* @throws IOException 异常
*/
protected abstract String sendHttpRequest(String url, String body, Map<String, String> headers) throws IOException;
protected abstract HttpResponse sendHttpRequest(String url, PowerRequestBody body) throws IOException;
/**
* 封装集群请求能力
* @param path 请求 PATH
* @param obj 请求体
* @param powerRequestBody 请求体
* @param headers 请求头
* @return 响应
*/
protected String clusterHaRequest(String path, Object obj, Map<String, String> headers) {
String body = obj instanceof String ? (String) obj : JsonUtils.toJSONStringUnsafe(obj);
protected HttpResponse clusterHaRequest(String path, PowerRequestBody powerRequestBody) {
List<String> addressList = config.getAddressList();
// 先尝试默认地址
String url = getUrl(path, currentAddress);
try {
String res = sendHttpRequest(url, body, headers);
if (StringUtils.isNotEmpty(res)) {
return res;
}
return sendHttpRequest(url, powerRequestBody);
} catch (IOException e) {
log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString());
}
@ -89,12 +83,10 @@ abstract class ClusterRequestService implements RequestService {
}
url = getUrl(path, addr);
try {
String res = sendHttpRequest(url, body, headers);
if (StringUtils.isNotEmpty(res)) {
log.warn("[ClusterRequestService] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
}
HttpResponse res = sendHttpRequest(url, powerRequestBody);
log.warn("[ClusterRequestService] server change: from({}) -> to({}).", currentAddress, addr);
currentAddress = addr;
return res;
} catch (IOException e) {
log.warn("[ClusterRequestService] request url:{} failed, reason is {}.", url, e.toString());
}

View File

@ -6,14 +6,17 @@ import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import tech.powerjob.client.ClientConfig;
import tech.powerjob.client.common.Protocol;
import tech.powerjob.client.service.HttpResponse;
import tech.powerjob.client.service.PowerRequestBody;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.serialize.JsonUtils;
import javax.net.ssl.*;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@ -23,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* @since 2024/2/20
*/
@Slf4j
public class ClusterRequestServiceOkHttp3Impl extends ClusterRequestService {
public class ClusterRequestServiceOkHttp3Impl extends AppAuthClusterRequestService {
private final OkHttpClient okHttpClient;
@ -40,42 +43,55 @@ public class ClusterRequestServiceOkHttp3Impl extends ClusterRequestService {
}
@Override
public String request(String path, Object body) {
// TODO
return null;
}
protected HttpResponse sendHttpRequest(String url, PowerRequestBody powerRequestBody) throws IOException {
@Override
protected String sendHttpRequest(String url, String payload, Map<String, String> h) throws IOException {
// 添加公共 header
powerRequestBody.addHeaders(config.getDefaultHeaders());
// 公共 header
Map<String, String> headers = Maps.newHashMap();
if (config.getDefaultHeaders() != null) {
headers.putAll(config.getDefaultHeaders());
}
if (h != null) {
headers.putAll(h);
Object obj = powerRequestBody.getPayload();
RequestBody requestBody = null;
switch (powerRequestBody.getMime()) {
case APPLICATION_JSON:
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
String body = obj instanceof String ? (String) obj : JsonUtils.toJSONStringUnsafe(obj);
requestBody = RequestBody.create(jsonType, body);
break;
case APPLICATION_FORM:
FormBody.Builder formBuilder = new FormBody.Builder();
Map<String, String> formObj = (Map<String, String>) obj;
formObj.forEach(formBuilder::add);
requestBody = formBuilder.build();
}
MediaType jsonType = MediaType.parse(OmsConstant.JSON_MEDIA_TYPE);
RequestBody requestBody = RequestBody.create(jsonType, payload);
Request request = new Request.Builder()
.post(requestBody)
.headers(Headers.of(powerRequestBody.getHeaders()))
.url(url)
.headers(Headers.of(headers))
.build();
try (Response response = okHttpClient.newCall(request).execute()) {
int responseCode = response.code();
if (responseCode == HTTP_SUCCESS_CODE) {
ResponseBody body = response.body();
if (body == null) {
return null;
}else {
return body.string();
}
int code = response.code();
HttpResponse httpResponse = new HttpResponse()
.setCode(code)
.setSuccess(code == HTTP_SUCCESS_CODE);
ResponseBody body = response.body();
if (body != null) {
httpResponse.setResponse(body.string());
}
throw new PowerJobException(String.format("http request failed,code=%d", responseCode));
Headers respHeaders = response.headers();
Set<String> headerNames = respHeaders.names();
Map<String, String> respHeaderMap = Maps.newHashMap();
headerNames.forEach(hdKey -> respHeaderMap.put(hdKey, respHeaders.get(hdKey)));
httpResponse.setHeaders(respHeaderMap);
return httpResponse;
}
}

View File

@ -1,6 +1,8 @@
package tech.powerjob.client.test;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import tech.powerjob.client.PowerJobClient;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
@ -9,11 +11,6 @@ import tech.powerjob.common.request.http.SaveJobInfoRequest;
import tech.powerjob.common.response.InstanceInfoDTO;
import tech.powerjob.common.response.JobInfoDTO;
import tech.powerjob.common.response.ResultDTO;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.TimeUnit;
/**
* Test cases for {@link PowerJobClient}
@ -24,15 +21,15 @@ import java.util.concurrent.TimeUnit;
*/
class TestClient extends ClientInitializer {
public static final long JOB_ID = 4L;
public static final long JOB_ID = 1L;
@Test
void testSaveJob() {
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
newJobInfo.setId(JOB_ID);
newJobInfo.setJobName("omsOpenAPIJobccccc");
newJobInfo.setJobDescription("test OpenAPI");
newJobInfo.setJobName("omsOpenAPIJobccccc" + System.currentTimeMillis());
newJobInfo.setJobDescription("test OpenAPI" + System.currentTimeMillis());
newJobInfo.setJobParams("{'aa':'bb'}");
newJobInfo.setTimeExpressionType(TimeExpressionType.CRON);
newJobInfo.setTimeExpression("0 0 * * * ? ");
@ -107,21 +104,21 @@ class TestClient extends ClientInitializer {
@Test
void testFetchInstanceInfo() {
ResultDTO<InstanceInfoDTO> res = powerJobClient.fetchInstanceInfo(205436386851946560L);
ResultDTO<InstanceInfoDTO> res = powerJobClient.fetchInstanceInfo(702482902331424832L);
System.out.println(res);
Assertions.assertNotNull(res);
}
@Test
void testStopInstance() {
ResultDTO<Void> res = powerJobClient.stopInstance(205436995885858880L);
ResultDTO<Void> res = powerJobClient.stopInstance(702482902331424832L);
System.out.println(res);
Assertions.assertNotNull(res);
}
@Test
void testFetchInstanceStatus() {
ResultDTO<Integer> res = powerJobClient.fetchInstanceStatus(205436995885858880L);
ResultDTO<Integer> res = powerJobClient.fetchInstanceStatus(702482902331424832L);
System.out.println(res);
Assertions.assertNotNull(res);
}
@ -135,19 +132,19 @@ class TestClient extends ClientInitializer {
Assertions.assertTrue(cancelRes.isSuccess());
}
@Test
@SneakyThrows
void testCancelInstanceInDatabase() {
ResultDTO<Long> startRes = powerJobClient.runJob(15L, "start by OhMyClient", 2000000);
System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
// Restart server manually and clear all the data in time wheeler.
TimeUnit.MINUTES.sleep(1);
ResultDTO<Void> cancelRes = powerJobClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));
Assertions.assertTrue(cancelRes.isSuccess());
}
// @Test
// @SneakyThrows
// void testCancelInstanceInDatabase() {
// ResultDTO<Long> startRes = powerJobClient.runJob(15L, "start by OhMyClient", 2000000);
// System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
//
// // Restart server manually and clear all the data in time wheeler.
// TimeUnit.MINUTES.sleep(1);
//
// ResultDTO<Void> cancelRes = powerJobClient.cancelInstance(startRes.getData());
// System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));
// Assertions.assertTrue(cancelRes.isSuccess());
// }
@Test
void testRetryInstance() {

View File

@ -29,7 +29,7 @@ import java.util.List;
*/
class TestWorkflow extends ClientInitializer {
private static final long WF_ID = 1;
private static final long WF_ID = 2;
@Test
void initTestData() {

View File

@ -0,0 +1,22 @@
package tech.powerjob.common.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types">消息内容类型</a>
*
* @author tjq
* @since 2024/8/10
*/
@Getter
@AllArgsConstructor
public enum MIME {
APPLICATION_JSON("application/json; charset=utf-8"),
APPLICATION_FORM("application/x-www-form-urlencoded")
;
private final String code;
}

View File

@ -12,6 +12,26 @@ import java.util.Map;
*/
public class MapUtils {
public static <K> String getString(Map<? super K, ?> map, K key) {
if (map != null) {
Object answer = map.get(key);
if (answer != null) {
return answer.toString();
}
}
return null;
}
public static <K> String getString(Map<? super K, ?> map, K key, String defaultValue) {
String answer = getString(map, key);
if (answer == null) {
answer = defaultValue;
}
return answer;
}
public static <K> Long getLong(Map<? super K, ?> map, K key, Long defaultValue) {
Long answer = getLong(map, key);
if (answer == null) {