feat: official processors adapt for workflow

This commit is contained in:
Echo009 2021-03-03 20:17:10 +08:00
parent 5f1ab82f0e
commit 7575bbd4e1
4 changed files with 19 additions and 8 deletions

View File

@ -1,6 +1,6 @@
package tech.powerjob.official.processors.impl;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONValidator;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
@ -24,14 +24,16 @@ import java.util.concurrent.TimeUnit;
*/
public class HttpProcessor extends CommonBasicProcessor {
// 60 seconds
/**
* 60 seconds
*/
private static final int DEFAULT_TIMEOUT = 60;
private static final Map<Integer, OkHttpClient> CLIENT_STORE = new ConcurrentHashMap<>();
@Override
public ProcessResult process0(TaskContext taskContext) throws Exception {
OmsLogger omsLogger = taskContext.getOmsLogger();
HttpParams httpParams = JSONObject.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);
HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class);
if (StringUtils.isEmpty(httpParams.url)) {
return new ProcessResult(false, "url can't be empty!");
@ -52,11 +54,9 @@ public class HttpProcessor extends CommonBasicProcessor {
}
// set default mediaType
if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType)) {
if (JSONValidator.from(httpParams.body).validate()) {
httpParams.mediaType = "application/json";
omsLogger.warn("try to use 'application/json' as media type");
}
if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType) && JSONValidator.from(httpParams.body).validate()) {
httpParams.mediaType = "application/json";
omsLogger.warn("try to use 'application/json' as media type");
}
// set default timeout

View File

@ -11,7 +11,15 @@ import org.apache.commons.lang3.StringUtils;
*/
public class CommonUtils {
private CommonUtils() {
}
public static String parseParams(TaskContext context) {
// 工作流中的总是优先使用 jobParams
if (context.getWfInstanceId() == null) {
return context.getJobParams();
}
if (StringUtils.isNotEmpty(context.getInstanceParams())) {
return context.getInstanceParams();
}

View File

@ -104,6 +104,7 @@ public class ProcessorRunnable implements Runnable {
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(task, taskContext);
taskContext.setJobId(instanceInfo.getJobId());
taskContext.setWfInstanceId(instanceInfo.getWfInstanceId());
taskContext.setMaxRetryTimes(instanceInfo.getTaskRetryNum());
taskContext.setCurrentRetryTimes(task.getFailedCnt());
taskContext.setJobParams(instanceInfo.getJobParams());

View File

@ -31,6 +31,8 @@ public class TaskContext {
private Long instanceId;
private Long wfInstanceId;
private Long subInstanceId;
private String taskId;