From d6d461c77e2b6ff172d44ea95a9e6e5a07f91c83 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Fri, 19 Feb 2021 15:48:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=B0=86=E5=B7=A5=E4=BD=9C?= =?UTF-8?q?=E6=B5=81=E4=B8=8A=E4=B8=8B=E6=96=87=E7=9B=B8=E5=85=B3=E7=9A=84?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E9=9B=86=E4=B8=AD=E8=87=B3=20WorkflowContext?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../samples/processors/MapProcessorDemo.java | 2 +- .../tester/AppendWorkflowContextTester.java | 4 +- .../workflow/WorkflowStandaloneProcessor.java | 2 +- .../core/executor/ProcessorRunnable.java | 13 ++- .../worker/core/processor/TaskContext.java | 63 +------------- .../core/processor/WorkflowContext.java | 87 +++++++++++++++++++ 6 files changed, 104 insertions(+), 67 deletions(-) create mode 100644 powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java index c28b4052..02184ca0 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java @@ -59,7 +59,7 @@ public class MapProcessorDemo extends MapProcessor { return map(subTasks, "MAP_TEST_TASK"); }else { // 测试在 Map 任务中追加上下文 - context.appendData2WfContext("Yasuo","A sword's poor company for a long road."); + context.getWorkflowContext().appendData2WfContext("Yasuo","A sword's poor company for a long road."); System.out.println("==== PROCESS ===="); System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); boolean b = ThreadLocalRandom.current().nextBoolean(); diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java index 993ab002..e8b89f38 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java @@ -25,7 +25,7 @@ public class AppendWorkflowContextTester implements BasicProcessor { @SuppressWarnings("squid:S106") public ProcessResult process(TaskContext context) throws Exception { - Map workflowContext = context.fetchWorkflowContext(); + Map workflowContext = context.getWorkflowContext().fetchWorkflowContext(); String originValue = workflowContext.get(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY); System.out.println("======= AppendWorkflowContextTester#start ======="); System.out.println("current instance id : " + context.getInstanceId()); @@ -38,7 +38,7 @@ public class AppendWorkflowContextTester implements BasicProcessor { } catch (Exception e) { // ignore } - context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1); + context.getWorkflowContext().appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1); System.out.println("======= AppendWorkflowContextTester#end ======="); if (FAIL_CODE.equals(context.getJobParams())) { return new ProcessResult(false, "Failed!"); diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java index de71a2e9..0b253c19 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java @@ -26,7 +26,7 @@ public class WorkflowStandaloneProcessor implements BasicProcessor { System.out.println("currentContext:"+JSON.toJSONString(context)); // 尝试获取上游任务 - Map workflowContext = context.fetchWorkflowContext(); + Map workflowContext = context.getWorkflowContext().fetchWorkflowContext(); System.out.println("工作流上下文数据:"); System.out.println(workflowContext); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index c75dec81..b6fec269 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -11,6 +11,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; +import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor; @@ -63,7 +64,9 @@ public class ProcessorRunnable implements Runnable { log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); ThreadLocalStore.setTask(task); // 0. 构造任务上下文 + WorkflowContext workflowContext = constructWorkflowContext(); TaskContext taskContext = constructTaskContext(); + taskContext.setWorkflowContext(workflowContext); // 1. 上报执行信息 reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null); @@ -91,7 +94,7 @@ public class ProcessorRunnable implements Runnable { processResult = new ProcessResult(false, e.toString()); } // - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData()); } @@ -111,6 +114,10 @@ public class ProcessorRunnable implements Runnable { return taskContext; } + private WorkflowContext constructWorkflowContext(){ + return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams()); + } + /** * 处理最终任务 * BROADCAST => {@link BroadcastProcessor#postProcess} @@ -151,7 +158,7 @@ public class ProcessorRunnable implements Runnable { } TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; - reportStatus(status, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); + reportStatus(status, suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData()); log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); } @@ -177,7 +184,7 @@ public class ProcessorRunnable implements Runnable { processResult = new ProcessResult(true, "NO_PREPOST_TASK"); } // 通知 TaskerTracker 创建广播子任务 - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getAppendedContextData()); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData()); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java index e2d9a190..8a3c2c35 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java @@ -1,19 +1,13 @@ package com.github.kfcfans.powerjob.worker.core.processor; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.github.kfcfans.powerjob.common.WorkflowContextConstant; -import com.github.kfcfans.powerjob.common.utils.JsonUtils; -import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.log.OmsLogger; -import com.google.common.collect.Maps; import lombok.Getter; import lombok.Setter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import java.util.Map; - /** * 任务上下文 * 概念统一,所有的worker只处理Task,Job和JobInstance的概念只存在于Server和TaskTracker @@ -24,6 +18,7 @@ import java.util.Map; * 2021/02/04 移除 fetchUpstreamTaskResult 方法 * * @author tjq + * @author Echo009 * @since 2020/3/18 */ @Getter @@ -72,61 +67,9 @@ public class TaskContext { * 用户自定义上下文,通过 {@link OhMyConfig} 初始化 */ private Object userContext; - /** - * 追加的上下文数据 + * 工作流上下文数据 */ - private final Map appendedContextData = Maps.newConcurrentMap(); - - - - /** - * 获取工作流上下文 (MAP),本质上是将 instanceParams 解析成 MAP - * 初始参数的 key 为 {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY} - * 注意,在没有传递初始参数时,通过 CONTEXT_INIT_PARAMS_KEY 获取到的是 null - * - * @return 工作流上下文 - * @author Echo009 - * @since 2021/02/04 - */ - @SuppressWarnings({"rawtypes","unchecked"}) - public Map fetchWorkflowContext() { - Map res = Maps.newHashMap(); - try { - Map originMap = JsonUtils.parseObject(instanceParams, Map.class); - originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v))); - return res; - } catch (Exception ignore) { - // ignore - } - return Maps.newHashMap(); - } - - - /** - * 往工作流上下文添加数据 - * 注意:如果 key 在当前上下文中已存在,那么会直接覆盖 - */ - public synchronized void appendData2WfContext(String key,Object value){ - String finalValue; - try { - // 先判断当前上下文大小是否超出限制 - final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); - if (appendedContextData.size() >= sizeThreshold) { - log.warn("[TaskContext-{}|{}|{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,sizeThreshold,key); - } - finalValue = JsonUtils.toJSONStringUnsafe(value); - final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); - // 判断 key & value 是否超长度限制 - if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) { - log.warn("[TaskContext-{}|{}|{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,lengthThreshold,key); - return; - } - } catch (Exception e) { - log.warn("[TaskContext-{}|{}|{}] fail to append data to workflow context, key : {}",instanceId,taskId,taskName, key); - return; - } - appendedContextData.put(key, JsonUtils.toJSONString(value)); - } + private WorkflowContext workflowContext; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java new file mode 100644 index 00000000..b4b91ed1 --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java @@ -0,0 +1,87 @@ +package com.github.kfcfans.powerjob.worker.core.processor; + +import com.github.kfcfans.powerjob.common.WorkflowContextConstant; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; +import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.google.common.collect.Maps; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +/** + * 工作流上下文 + * + * @author Echo009 + * @since 2021/2/19 + */ +@Data +@Slf4j +public class WorkflowContext { + /** + * 任务实例 ID + */ + private final Long instanceId; + /** + * 当前工作流上下文数据 + * 这里的 data 实际上等价于 {@link TaskContext} 中的 instanceParams + */ + private final String data; + /** + * 追加的上下文信息 + */ + private final Map appendedContextData = Maps.newConcurrentMap(); + + + /** + * 获取工作流上下文 (MAP),本质上是将 data 解析成 MAP + * 初始参数的 key 为 {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY} + * 注意,在没有传递初始参数时,通过 CONTEXT_INIT_PARAMS_KEY 获取到的是 null + * + * @return 工作流上下文 + * @author Echo009 + * @since 2021/02/04 + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + public Map fetchWorkflowContext() { + Map res = Maps.newHashMap(); + try { + Map originMap = JsonUtils.parseObject(data, Map.class); + originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v))); + return res; + } catch (Exception exception) { + // log + log.warn("[WorkflowContext-{}] fail to fetch workflow context , {}", instanceId, exception); + } + return Maps.newHashMap(); + } + + /** + * 往工作流上下文添加数据 + * 注意:如果 key 在当前上下文中已存在,那么会直接覆盖 + */ + public void appendData2WfContext(String key, Object value) { + String finalValue; + try { + // 先判断当前上下文大小是否超出限制 + final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); + if (appendedContextData.size() >= sizeThreshold) { + log.warn("[WorkflowContext-{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!", instanceId, sizeThreshold, key); + return; + } + finalValue = JsonUtils.toJSONStringUnsafe(value); + final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); + // 判断 key & value 是否超长度限制 + if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) { + log.warn("[WorkflowContext-{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!", instanceId, lengthThreshold, key); + return; + } + } catch (Exception e) { + log.warn("[WorkflowContext-{}] fail to append data to workflow context, key : {}", instanceId, key); + return; + } + appendedContextData.put(key, JsonUtils.toJSONString(value)); + } + + +}