diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index 79e356e3..b303af55 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -10,4 +10,6 @@ powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701 # Store strategy of H2 database. disk or memory. Default value is disk. powerjob.worker.store-strategy=disk # Max length of result. Results that are longer than the value will be truncated. -powerjob.worker.max-result-length=4096 \ No newline at end of file +powerjob.worker.max-result-length=4096 +# Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore. +powerjob.worker.max-appended-wf-context-length=4096 \ No newline at end of file diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index 07290900..3984959e 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -70,13 +70,9 @@ public class PowerJobAutoConfiguration { */ config.setEnableTestMode(worker.isEnableTestMode()); /* - * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. + * Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore. */ config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength()); - /* - * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncate. - */ - config.setMaxAppendedWfContextSize(worker.getMaxAppendedWfContextSize()); /* * Create OhMyWorker object and set properties. */ diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java index a2fb6ad0..b4130874 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java @@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; -import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; +import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -129,7 +128,7 @@ public class PowerJobProperties { * Max length of response result. Result that is longer than the value will be truncated. * {@link ProcessResult} max length for #msg */ - private int maxResultLength = 8096; + private int maxResultLength = 8192; /** * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName. * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application @@ -137,14 +136,9 @@ public class PowerJobProperties { private boolean enableTestMode = false; /** * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. - * {@link TaskContext} max length for #appendedContextData key and value. + * {@link WorkflowContext} max length for #appendedContextData */ - private int maxAppendedWfContextLength = 8096; - /** - * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated. - * {@link TaskContext} max size for #appendedContextData - * {@link TaskTracker} max size for #appendedWfContext - */ - private int maxAppendedWfContextSize = 16; + private int maxAppendedWfContextLength = 8192; + } } diff --git a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json index 883d147f..b74570cf 100644 --- a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json @@ -54,13 +54,7 @@ { "name": "powerjob.worker.max-appended-wf-context-length", "type": "java.lang.Integer", - "description": "Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.", - "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" - }, - { - "name": "powerjob.worker.max-appended-wf-context-size", - "type": "java.lang.Integer", - "description": "Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated.", + "description": "Max length of appended workflow context. Appended workflow context that is longer than the value will be ignore.", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" } ], diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java index 0091fb23..aa3f80ce 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java @@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.worker.common; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; -import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; +import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext; import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector; import com.google.common.collect.Lists; import lombok.Getter; @@ -58,15 +57,9 @@ public class OhMyConfig { private boolean enableTestMode = false; /** * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. - * {@link TaskContext} max length for #appendedContextData key and value. + * {@link WorkflowContext} max length for #appendedContextData */ - private int maxAppendedWfContextLength = 8096; - /** - * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated. - * {@link TaskContext} max size for #appendedContextData - * {@link TaskTracker} max size for #appendedWfContext - */ - private int maxAppendedWfContextSize = 16; + private int maxAppendedWfContextLength = 8192; private SystemMetricsCollector systemMetricsCollector; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java new file mode 100644 index 00000000..24a9468e --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/WorkflowContextUtils.java @@ -0,0 +1,34 @@ +package com.github.kfcfans.powerjob.worker.common.utils; + +import com.github.kfcfans.powerjob.common.utils.JsonUtils; +import com.github.kfcfans.powerjob.worker.OhMyWorker; + +import java.util.Map; + +/** + * 工作流上下文工具类 + * + * @author Echo009 + * @since 2021/2/20 + */ +public class WorkflowContextUtils { + + private WorkflowContextUtils() { + + } + + + public static boolean isExceededLengthLimit(Map appendedWfContext) { + + String jsonString = JsonUtils.toJSONString(appendedWfContext); + if (jsonString == null) { + // impossible + return true; + } + int maxAppendedWfContextLength = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); + + return maxAppendedWfContextLength < jsonString.length(); + + } + +} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index 92b97b84..cfb34bd6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; +import com.github.kfcfans.powerjob.worker.common.utils.WorkflowContextUtils; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; @@ -26,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Queue; @@ -114,8 +116,8 @@ public class ProcessorRunnable implements Runnable { return taskContext; } - private WorkflowContext constructWorkflowContext(){ - return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams()); + private WorkflowContext constructWorkflowContext() { + return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams()); } /** @@ -205,6 +207,12 @@ public class ProcessorRunnable implements Runnable { req.setResult(result); req.setReportTime(System.currentTimeMillis()); req.setCmd(cmd); + // 检查追加的上下文大小是否超出限制 + if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { + log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); + // ignore appended workflow context data + appendedWfContext = Collections.emptyMap(); + } req.setAppendedWfContext(appendedWfContext); // 最终结束状态要求可靠发送 @@ -253,4 +261,5 @@ public class ProcessorRunnable implements Runnable { task.getInstanceId(), task.getTaskId(), result.length(), maxLength); return result.substring(0, maxLength).concat("..."); } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java index 46a9a839..b435fe51 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/WorkflowContext.java @@ -2,7 +2,6 @@ package com.github.kfcfans.powerjob.worker.core.processor; import com.github.kfcfans.powerjob.common.WorkflowContextConstant; import com.github.kfcfans.powerjob.common.utils.JsonUtils; -import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.google.common.collect.Maps; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -63,24 +62,14 @@ public class WorkflowContext { public void appendData2WfContext(String key, Object value) { String finalValue; try { - // 先判断当前上下文大小是否超出限制 - final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); - if (appendedContextData.size() >= sizeThreshold) { - log.warn("[WorkflowContext-{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!", instanceId, sizeThreshold, key); - return; - } + // 这里不限制长度,完成任务之后上报至 TaskTracker 时再校验 finalValue = JsonUtils.toJSONStringUnsafe(value); - final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); - // 判断 key & value 是否超长度限制 - if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) { - log.warn("[WorkflowContext-{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!", instanceId, lengthThreshold, key); - return; - } + } catch (Exception e) { log.warn("[WorkflowContext-{}] fail to append data to workflow context, key : {}", instanceId, key); return; } - appendedContextData.put(key, JsonUtils.toJSONString(value)); + appendedContextData.put(key, finalValue); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 2e6cfa5d..677e7587 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -18,6 +18,7 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; +import com.github.kfcfans.powerjob.worker.common.utils.WorkflowContextUtils; import com.github.kfcfans.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; @@ -183,13 +184,14 @@ public abstract class TaskTracker { // 只有工作流中的任务才有存储的必要 return; } - // 先判断当前上下文大小是否超出限制 - final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); + // 检查追加的上下文大小是否超出限制 + if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { + log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); + // ignore appended workflow context data + return; + } + for (Map.Entry entry : newAppendedWfContext.entrySet()) { - if (appendedWfContext.size() >= sizeThreshold) { - log.warn("[TaskTracker-{}] current size of appended workflow context data is greater than {}, the rest of appended workflow context data will be ignore! ", instanceInfo.getInstanceId(), sizeThreshold); - break; - } String originValue = appendedWfContext.put(entry.getKey(), entry.getValue()); log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue()); }