refactor: 将工作流上下文相关的逻辑集中至 WorkflowContext

This commit is contained in:
Echo009 2021-02-19 15:48:43 +08:00
parent fa3981d167
commit d6d461c77e
6 changed files with 104 additions and 67 deletions

View File

@ -59,7 +59,7 @@ public class MapProcessorDemo extends MapProcessor {
return map(subTasks, "MAP_TEST_TASK"); return map(subTasks, "MAP_TEST_TASK");
}else { }else {
// 测试在 Map 任务中追加上下文 // 测试在 Map 任务中追加上下文
context.appendData2WfContext("Yasuo","A sword's poor company for a long road."); context.getWorkflowContext().appendData2WfContext("Yasuo","A sword's poor company for a long road.");
System.out.println("==== PROCESS ===="); System.out.println("==== PROCESS ====");
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
boolean b = ThreadLocalRandom.current().nextBoolean(); boolean b = ThreadLocalRandom.current().nextBoolean();

View File

@ -25,7 +25,7 @@ public class AppendWorkflowContextTester implements BasicProcessor {
@SuppressWarnings("squid:S106") @SuppressWarnings("squid:S106")
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
Map<String, String> workflowContext = context.fetchWorkflowContext(); Map<String, String> workflowContext = context.getWorkflowContext().fetchWorkflowContext();
String originValue = workflowContext.get(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY); String originValue = workflowContext.get(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY);
System.out.println("======= AppendWorkflowContextTester#start ======="); System.out.println("======= AppendWorkflowContextTester#start =======");
System.out.println("current instance id : " + context.getInstanceId()); System.out.println("current instance id : " + context.getInstanceId());
@ -38,7 +38,7 @@ public class AppendWorkflowContextTester implements BasicProcessor {
} catch (Exception e) { } catch (Exception e) {
// ignore // ignore
} }
context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1); context.getWorkflowContext().appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1);
System.out.println("======= AppendWorkflowContextTester#end ======="); System.out.println("======= AppendWorkflowContextTester#end =======");
if (FAIL_CODE.equals(context.getJobParams())) { if (FAIL_CODE.equals(context.getJobParams())) {
return new ProcessResult(false, "Failed!"); return new ProcessResult(false, "Failed!");

View File

@ -26,7 +26,7 @@ public class WorkflowStandaloneProcessor implements BasicProcessor {
System.out.println("currentContext:"+JSON.toJSONString(context)); System.out.println("currentContext:"+JSON.toJSONString(context));
// 尝试获取上游任务 // 尝试获取上游任务
Map<String, String> workflowContext = context.fetchWorkflowContext(); Map<String, String> workflowContext = context.getWorkflowContext().fetchWorkflowContext();
System.out.println("工作流上下文数据:"); System.out.println("工作流上下文数据:");
System.out.println(workflowContext); System.out.println(workflowContext);

View File

@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor;
@ -63,7 +64,9 @@ public class ProcessorRunnable implements Runnable {
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
ThreadLocalStore.setTask(task); ThreadLocalStore.setTask(task);
// 0. 构造任务上下文 // 0. 构造任务上下文
WorkflowContext workflowContext = constructWorkflowContext();
TaskContext taskContext = constructTaskContext(); TaskContext taskContext = constructTaskContext();
taskContext.setWorkflowContext(workflowContext);
// 1. 上报执行信息 // 1. 上报执行信息
reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null); reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);
@ -91,7 +94,7 @@ public class ProcessorRunnable implements Runnable {
processResult = new ProcessResult(false, e.toString()); processResult = new ProcessResult(false, e.toString());
} }
// //
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
} }
@ -111,6 +114,10 @@ public class ProcessorRunnable implements Runnable {
return taskContext; return taskContext;
} }
private WorkflowContext constructWorkflowContext(){
return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams());
}
/** /**
* 处理最终任务 * 处理最终任务
* BROADCAST => {@link BroadcastProcessor#postProcess} * BROADCAST => {@link BroadcastProcessor#postProcess}
@ -151,7 +158,7 @@ public class ProcessorRunnable implements Runnable {
} }
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); reportStatus(status, suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
} }
@ -177,7 +184,7 @@ public class ProcessorRunnable implements Runnable {
processResult = new ProcessResult(true, "NO_PREPOST_TASK"); processResult = new ProcessResult(true, "NO_PREPOST_TASK");
} }
// 通知 TaskerTracker 创建广播子任务 // 通知 TaskerTracker 创建广播子任务
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getAppendedContextData()); reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData());
} }

View File

@ -1,19 +1,13 @@
package com.github.kfcfans.powerjob.worker.core.processor; package com.github.kfcfans.powerjob.worker.core.processor;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Maps;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.ToString; import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/** /**
* 任务上下文 * 任务上下文
* 概念统一所有的worker只处理TaskJob和JobInstance的概念只存在于Server和TaskTracker * 概念统一所有的worker只处理TaskJob和JobInstance的概念只存在于Server和TaskTracker
@ -24,6 +18,7 @@ import java.util.Map;
* 2021/02/04 移除 fetchUpstreamTaskResult 方法 * 2021/02/04 移除 fetchUpstreamTaskResult 方法
* *
* @author tjq * @author tjq
* @author Echo009
* @since 2020/3/18 * @since 2020/3/18
*/ */
@Getter @Getter
@ -72,61 +67,9 @@ public class TaskContext {
* 用户自定义上下文通过 {@link OhMyConfig} 初始化 * 用户自定义上下文通过 {@link OhMyConfig} 初始化
*/ */
private Object userContext; private Object userContext;
/** /**
* 追加的上下文数据 * 工作流上下文数据
*/ */
private final Map<String,String> appendedContextData = Maps.newConcurrentMap(); private WorkflowContext workflowContext;
/**
* 获取工作流上下文 (MAP)本质上是将 instanceParams 解析成 MAP
* 初始参数的 key {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY}
* 注意在没有传递初始参数时通过 CONTEXT_INIT_PARAMS_KEY 获取到的是 null
*
* @return 工作流上下文
* @author Echo009
* @since 2021/02/04
*/
@SuppressWarnings({"rawtypes","unchecked"})
public Map<String, String> fetchWorkflowContext() {
Map<String, String> res = Maps.newHashMap();
try {
Map originMap = JsonUtils.parseObject(instanceParams, Map.class);
originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v)));
return res;
} catch (Exception ignore) {
// ignore
}
return Maps.newHashMap();
}
/**
* 往工作流上下文添加数据
* 注意如果 key 在当前上下文中已存在那么会直接覆盖
*/
public synchronized void appendData2WfContext(String key,Object value){
String finalValue;
try {
// 先判断当前上下文大小是否超出限制
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize();
if (appendedContextData.size() >= sizeThreshold) {
log.warn("[TaskContext-{}|{}|{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,sizeThreshold,key);
}
finalValue = JsonUtils.toJSONStringUnsafe(value);
final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
// 判断 key & value 是否超长度限制
if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) {
log.warn("[TaskContext-{}|{}|{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,lengthThreshold,key);
return;
}
} catch (Exception e) {
log.warn("[TaskContext-{}|{}|{}] fail to append data to workflow context, key : {}",instanceId,taskId,taskName, key);
return;
}
appendedContextData.put(key, JsonUtils.toJSONString(value));
}
} }

View File

@ -0,0 +1,87 @@
package com.github.kfcfans.powerjob.worker.core.processor;
import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* 工作流上下文
*
* @author Echo009
* @since 2021/2/19
*/
@Data
@Slf4j
public class WorkflowContext {
/**
* 任务实例 ID
*/
private final Long instanceId;
/**
* 当前工作流上下文数据
* 这里的 data 实际上等价于 {@link TaskContext} 中的 instanceParams
*/
private final String data;
/**
* 追加的上下文信息
*/
private final Map<String, String> appendedContextData = Maps.newConcurrentMap();
/**
* 获取工作流上下文 (MAP)本质上是将 data 解析成 MAP
* 初始参数的 key {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY}
* 注意在没有传递初始参数时通过 CONTEXT_INIT_PARAMS_KEY 获取到的是 null
*
* @return 工作流上下文
* @author Echo009
* @since 2021/02/04
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public Map<String, String> fetchWorkflowContext() {
Map<String, String> res = Maps.newHashMap();
try {
Map originMap = JsonUtils.parseObject(data, Map.class);
originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v)));
return res;
} catch (Exception exception) {
// log
log.warn("[WorkflowContext-{}] fail to fetch workflow context , {}", instanceId, exception);
}
return Maps.newHashMap();
}
/**
* 往工作流上下文添加数据
* 注意如果 key 在当前上下文中已存在那么会直接覆盖
*/
public void appendData2WfContext(String key, Object value) {
String finalValue;
try {
// 先判断当前上下文大小是否超出限制
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize();
if (appendedContextData.size() >= sizeThreshold) {
log.warn("[WorkflowContext-{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!", instanceId, sizeThreshold, key);
return;
}
finalValue = JsonUtils.toJSONStringUnsafe(value);
final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
// 判断 key & value 是否超长度限制
if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) {
log.warn("[WorkflowContext-{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!", instanceId, lengthThreshold, key);
return;
}
} catch (Exception e) {
log.warn("[WorkflowContext-{}] fail to append data to workflow context, key : {}", instanceId, key);
return;
}
appendedContextData.put(key, JsonUtils.toJSONString(value));
}
}