refactor: 调整工作流上下文控制策略

This commit is contained in:
Echo009 2021-02-20 21:31:50 +08:00
parent 5fd4b9ae9d
commit 449608293c
9 changed files with 69 additions and 56 deletions

View File

@ -10,4 +10,6 @@ powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# Store strategy of H2 database. disk or memory. Default value is disk. # Store strategy of H2 database. disk or memory. Default value is disk.
powerjob.worker.store-strategy=disk powerjob.worker.store-strategy=disk
# Max length of result. Results that are longer than the value will be truncated. # Max length of result. Results that are longer than the value will be truncated.
powerjob.worker.max-result-length=4096 powerjob.worker.max-result-length=4096
# Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore.
powerjob.worker.max-appended-wf-context-length=4096

View File

@ -70,13 +70,9 @@ public class PowerJobAutoConfiguration {
*/ */
config.setEnableTestMode(worker.isEnableTestMode()); config.setEnableTestMode(worker.isEnableTestMode());
/* /*
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. * Max length of appended workflow context . Appended workflow context value that is longer than the value will be ignore.
*/ */
config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength()); config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());
/*
* Max size of appended workflow context. Appended workflow context that is greater than the value will be truncate.
*/
config.setMaxAppendedWfContextSize(worker.getMaxAppendedWfContextSize());
/* /*
* Create OhMyWorker object and set properties. * Create OhMyWorker object and set properties.
*/ */

View File

@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
@ -129,7 +128,7 @@ public class PowerJobProperties {
* Max length of response result. Result that is longer than the value will be truncated. * Max length of response result. Result that is longer than the value will be truncated.
* {@link ProcessResult} max length for #msg * {@link ProcessResult} max length for #msg
*/ */
private int maxResultLength = 8096; private int maxResultLength = 8192;
/** /**
* If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName. * If test mode is set as true, Powerjob-worker no longer connects to the server or validates appName.
* Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
@ -137,14 +136,9 @@ public class PowerJobProperties {
private boolean enableTestMode = false; private boolean enableTestMode = false;
/** /**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
* {@link TaskContext} max length for #appendedContextData key and value. * {@link WorkflowContext} max length for #appendedContextData
*/ */
private int maxAppendedWfContextLength = 8096; private int maxAppendedWfContextLength = 8192;
/**
* Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated.
* {@link TaskContext} max size for #appendedContextData
* {@link TaskTracker} max size for #appendedWfContext
*/
private int maxAppendedWfContextSize = 16;
} }
} }

View File

@ -54,13 +54,7 @@
{ {
"name": "powerjob.worker.max-appended-wf-context-length", "name": "powerjob.worker.max-appended-wf-context-length",
"type": "java.lang.Integer", "type": "java.lang.Integer",
"description": "Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.", "description": "Max length of appended workflow context. Appended workflow context that is longer than the value will be ignore.",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.max-appended-wf-context-size",
"type": "java.lang.Integer",
"description": "Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated.",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
} }
], ],

View File

@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.WorkflowContext;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector; import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.Getter; import lombok.Getter;
@ -58,15 +57,9 @@ public class OhMyConfig {
private boolean enableTestMode = false; private boolean enableTestMode = false;
/** /**
* Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore. * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.
* {@link TaskContext} max length for #appendedContextData key and value. * {@link WorkflowContext} max length for #appendedContextData
*/ */
private int maxAppendedWfContextLength = 8096; private int maxAppendedWfContextLength = 8192;
/**
* Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated.
* {@link TaskContext} max size for #appendedContextData
* {@link TaskTracker} max size for #appendedWfContext
*/
private int maxAppendedWfContextSize = 16;
private SystemMetricsCollector systemMetricsCollector; private SystemMetricsCollector systemMetricsCollector;

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.powerjob.worker.common.utils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import java.util.Map;
/**
* 工作流上下文工具类
*
* @author Echo009
* @since 2021/2/20
*/
public class WorkflowContextUtils {
private WorkflowContextUtils() {
}
public static boolean isExceededLengthLimit(Map<String, String> appendedWfContext) {
String jsonString = JsonUtils.toJSONString(appendedWfContext);
if (jsonString == null) {
// impossible
return true;
}
int maxAppendedWfContextLength = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
return maxAppendedWfContextLength < jsonString.length();
}
}

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.common.utils.WorkflowContextUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
@ -26,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
@ -114,8 +116,8 @@ public class ProcessorRunnable implements Runnable {
return taskContext; return taskContext;
} }
private WorkflowContext constructWorkflowContext(){ private WorkflowContext constructWorkflowContext() {
return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams()); return new WorkflowContext(task.getInstanceId(), instanceInfo.getInstanceParams());
} }
/** /**
@ -205,6 +207,12 @@ public class ProcessorRunnable implements Runnable {
req.setResult(result); req.setResult(result);
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd); req.setCmd(cmd);
// 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) {
log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
appendedWfContext = Collections.emptyMap();
}
req.setAppendedWfContext(appendedWfContext); req.setAppendedWfContext(appendedWfContext);
// 最终结束状态要求可靠发送 // 最终结束状态要求可靠发送
@ -253,4 +261,5 @@ public class ProcessorRunnable implements Runnable {
task.getInstanceId(), task.getTaskId(), result.length(), maxLength); task.getInstanceId(), task.getTaskId(), result.length(), maxLength);
return result.substring(0, maxLength).concat("..."); return result.substring(0, maxLength).concat("...");
} }
} }

View File

@ -2,7 +2,6 @@ package com.github.kfcfans.powerjob.worker.core.processor;
import com.github.kfcfans.powerjob.common.WorkflowContextConstant; import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -63,24 +62,14 @@ public class WorkflowContext {
public void appendData2WfContext(String key, Object value) { public void appendData2WfContext(String key, Object value) {
String finalValue; String finalValue;
try { try {
// 先判断当前上下文大小是否超出限制 // 这里不限制长度完成任务之后上报至 TaskTracker 时再校验
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize();
if (appendedContextData.size() >= sizeThreshold) {
log.warn("[WorkflowContext-{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!", instanceId, sizeThreshold, key);
return;
}
finalValue = JsonUtils.toJSONStringUnsafe(value); finalValue = JsonUtils.toJSONStringUnsafe(value);
final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
// 判断 key & value 是否超长度限制
if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) {
log.warn("[WorkflowContext-{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!", instanceId, lengthThreshold, key);
return;
}
} catch (Exception e) { } catch (Exception e) {
log.warn("[WorkflowContext-{}] fail to append data to workflow context, key : {}", instanceId, key); log.warn("[WorkflowContext-{}] fail to append data to workflow context, key : {}", instanceId, key);
return; return;
} }
appendedContextData.put(key, JsonUtils.toJSONString(value)); appendedContextData.put(key, finalValue);
} }

View File

@ -18,6 +18,7 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.WorkflowContextUtils;
import com.github.kfcfans.powerjob.worker.core.ha.ProcessorTrackerStatusHolder; import com.github.kfcfans.powerjob.worker.core.ha.ProcessorTrackerStatusHolder;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
@ -183,13 +184,14 @@ public abstract class TaskTracker {
// 只有工作流中的任务才有存储的必要 // 只有工作流中的任务才有存储的必要
return; return;
} }
// 先判断当前上下文大小是否超出限制 // 检查追加的上下文大小是否超出限制
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
return;
}
for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) { for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) {
if (appendedWfContext.size() >= sizeThreshold) {
log.warn("[TaskTracker-{}] current size of appended workflow context data is greater than {}, the rest of appended workflow context data will be ignore! ", instanceInfo.getInstanceId(), sizeThreshold);
break;
}
String originValue = appendedWfContext.put(entry.getKey(), entry.getValue()); String originValue = appendedWfContext.put(entry.getKey(), entry.getValue());
log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue()); log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue());
} }