diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index b6fec269..92b97b84 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -183,7 +183,7 @@ public class ProcessorRunnable implements Runnable { } else { processResult = new ProcessResult(true, "NO_PREPOST_TASK"); } - // 通知 TaskerTracker 创建广播子任务 + // 通知 TaskTracker 创建广播子任务 reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData()); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java index b4b91ed1..46a9a839 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java @@ -4,7 +4,7 @@ import com.github.kfcfans.powerjob.common.WorkflowContextConstant; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.google.common.collect.Maps; -import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -15,7 +15,7 @@ import java.util.Map; * @author Echo009 * @since 2021/2/19 */ -@Data +@Getter @Slf4j public class WorkflowContext { /** @@ -26,12 +26,22 @@ public class WorkflowContext { * 当前工作流上下文数据 * 这里的 data 实际上等价于 {@link TaskContext} 中的 instanceParams */ - private final String data; + private final Map data = Maps.newHashMap(); /** * 追加的上下文信息 */ private final Map appendedContextData = Maps.newConcurrentMap(); + @SuppressWarnings({"rawtypes", "unchecked"}) + public WorkflowContext(Long instanceId, String data) { + this.instanceId = instanceId; + try { + Map originMap = JsonUtils.parseObject(data, Map.class); + originMap.forEach((k, v) -> this.data.put(String.valueOf(k), v == null ? null : String.valueOf(v))); + } catch (Exception exception) { + log.warn("[WorkflowContext-{}] parse workflow context failed, {}", instanceId, exception); + } + } /** * 获取工作流上下文 (MAP),本质上是将 data 解析成 MAP @@ -42,18 +52,8 @@ public class WorkflowContext { * @author Echo009 * @since 2021/02/04 */ - @SuppressWarnings({"rawtypes", "unchecked"}) public Map fetchWorkflowContext() { - Map res = Maps.newHashMap(); - try { - Map originMap = JsonUtils.parseObject(data, Map.class); - originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v))); - return res; - } catch (Exception exception) { - // log - log.warn("[WorkflowContext-{}] fail to fetch workflow context , {}", instanceId, exception); - } - return Maps.newHashMap(); + return data; } /**