diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java index 95489b59..4d29f1ab 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java @@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.common.request; import com.github.kfcfans.powerjob.common.OmsSerializable; import lombok.Data; +import java.util.Map; + /** * TaskTracker 将状态上报给服务器 @@ -14,19 +16,31 @@ import lombok.Data; public class TaskTrackerReportInstanceStatusReq implements OmsSerializable { private Long jobId; + private Long instanceId; + private Long wfInstanceId; + /** + * 追加的工作流上下文数据 + * @since 2021/02/05 + */ + private Map appendedWfContext; private int instanceStatus; private String result; /* ********* 统计信息 ********* */ + private long totalTaskNum; + private long succeedTaskNum; + private long failedTaskNum; private long startTime; + private long reportTime; + private String sourceAddress; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java index 8d836d96..f2b47c78 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java @@ -15,14 +15,17 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRep import com.github.kfcfans.powerjob.server.service.InstanceLogService; import 
com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; +import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** @@ -40,6 +43,8 @@ public class WorkerRequestHandler { @Resource private InstanceManager instanceManager; @Resource + private WorkflowInstanceManager workflowInstanceManager; + @Resource private InstanceLogService instanceLogService; @Resource private ContainerInfoRepository containerInfoRepository; @@ -58,7 +63,13 @@ public class WorkerRequestHandler { * 处理 instance 状态 * @param req 任务实例的状态上报请求 */ - public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws Exception { + public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException { + // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 + if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { + // 更新工作流上下文信息 + workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); + } + instanceManager.updateStatus(req); // 结束状态(成功/失败)需要回复消息 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 73c94e4e..d67f53f4 100644 --- 
a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.service.workflow; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.utils.JsonUtils; @@ -20,6 +21,7 @@ import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; +import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock; import com.google.common.collect.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -36,6 +38,7 @@ import java.util.*; */ @Slf4j @Service +@SuppressWarnings("squid:S1192") public class WorkflowInstanceManager { @Resource @@ -155,7 +158,7 @@ public class WorkflowInstanceManager { JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new); if (jobInfo.getId() == null) { // 在创建工作流实例到当前的这段时间内 job 信息被物理删除了 - log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId()); + log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! 
maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId()); } nodeId2JobInfoMap.put(root.getNodeId(), jobInfo); // instanceParam 传递的是工作流实例的 wfContext @@ -163,7 +166,7 @@ public class WorkflowInstanceManager { root.setInstanceId(instanceId); root.setStatus(InstanceStatus.RUNNING.getV()); - log.info("[Workflow-{}|{}] create root instance(jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getJobId(), instanceId); + log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId,root.getNodeId(), root.getJobId(), instanceId); }); // 持久化 @@ -195,7 +198,7 @@ public class WorkflowInstanceManager { * @param status 完成任务的任务实例状态(SUCCEED/FAILED/STOPPED) * @param result 完成任务的任务实例结果 */ - @SuppressWarnings({"squid:S3776","squid:S2142","squid:S1141"}) + @SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"}) public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) { int lockId = wfInstanceId.hashCode(); @@ -228,7 +231,7 @@ public class WorkflowInstanceManager { node.setStatus(status.getV()); node.setResult(result); - log.info("[Workflow-{}|{}] node(jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getJobId(), instanceId, status.name(), result); + log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId,node.getNodeId(), node.getJobId(), instanceId, status.name(), result); } if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) { @@ -299,7 +302,7 @@ public class WorkflowInstanceManager { JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new); if (jobInfo.getId() == null) { // 在创建工作流实例到当前的这段时间内 job 信息被物理删除了 - log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! 
maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId()); + log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId()); } nodeId2JobInfoMap.put(nodeId, jobInfo); // instanceParam 传递的是工作流实例的 wfContext @@ -327,6 +330,37 @@ public class WorkflowInstanceManager { } } + /** + * 更新工作流上下文 + * @since 2021/02/05 + * @param wfInstanceId 工作流实例 + * @param appendedWfContextData 追加的上下文数据 + */ + @UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024) + public void updateWorkflowContext(Long wfInstanceId, Map appendedWfContextData) { + + try { + Optional wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId); + if (!wfInstanceInfoOpt.isPresent()) { + log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId); + return; + } + WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get(); + HashMap wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference>() { + }); + for (Map.Entry entry : appendedWfContextData.entrySet()) { + String key = entry.getKey(); + String originValue = wfContext.put(key, entry.getValue()); + log.info("[Workflow-{}|{}] update workflow context {} : {} -> {}", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), key, originValue, entry.getValue()); + } + wfInstance.setWfContext(JSON.toJSONString(wfContext)); + workflowInstanceInfoRepository.saveAndFlush(wfInstance); + + } catch (Exception e) { + log.error("[WorkflowInstanceManager] update workflow(workflowInstanceId={}) context failed.", wfInstanceId, e); + } + + } /** * 运行任务实例 diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java 
index b60aae22..c28b4052 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/MapProcessorDemo.java @@ -27,10 +27,14 @@ public class MapProcessorDemo extends MapProcessor { @Resource private MysteryService mysteryService; - // 每一批发送任务大小 - private static final int batchSize = 100; - // 发送的批次 - private static final int batchNum = 2; + /** + * 每一批发送任务大小 + */ + private static final int BATCH_SIZE = 100; + /** + * 发送的批次 + */ + private static final int BATCH_NUM = 5; @Override public ProcessResult process(TaskContext context) throws Exception { @@ -43,17 +47,19 @@ public class MapProcessorDemo extends MapProcessor { if (isRootTask()) { System.out.println("==== MAP ===="); List subTasks = Lists.newLinkedList(); - for (int j = 0; j < batchNum; j++) { + for (int j = 0; j < BATCH_NUM; j++) { SubTask subTask = new SubTask(); subTask.siteId = j; subTask.itemIds = Lists.newLinkedList(); subTasks.add(subTask); - for (int i = 0; i < batchSize; i++) { + for (int i = 0; i < BATCH_SIZE; i++) { subTask.itemIds.add(i); } } return map(subTasks, "MAP_TEST_TASK"); }else { + // 测试在 Map 任务中追加上下文 + context.appendData2WfContext("Yasuo","A sword's poor company for a long road."); System.out.println("==== PROCESS ===="); System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask())); boolean b = ThreadLocalRandom.current().nextBoolean(); diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java new file mode 100644 index 00000000..0cd46e65 --- /dev/null +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/tester/AppendWorkflowContextTester.java @@ -0,0 +1,43 @@ +package com.github.kfcfans.powerjob.samples.tester; + 
+import com.github.kfcfans.powerjob.common.WorkflowContextConstant; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; +import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; +import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; +import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * 测试追加工作流上下文数据 + * com.github.kfcfans.powerjob.samples.tester.AppendWorkflowContextTester + * + * @author Echo009 + * @since 2021/2/6 + */ +@Component +public class AppendWorkflowContextTester implements BasicProcessor { + + + @Override + @SuppressWarnings("squid:S106") + public ProcessResult process(TaskContext context) throws Exception { + + Map workflowContext = context.fetchWorkflowContext(); + String originValue = workflowContext.get(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY); + System.out.println("======= AppendWorkflowContextTester#start ======="); + System.out.println("current instance id : " + context.getInstanceId()); + System.out.println("current workflow context : " + workflowContext); + System.out.println("initParam of workflow context : " + originValue); + int num = 0; + try { + num = Integer.parseInt(originValue); + } catch (Exception e) { + // ignore + } + context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1); + System.out.println("======= AppendWorkflowContextTester#end ======="); + return new ProcessResult(true, JsonUtils.toJSONString(context)); + } +} diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java index efd133cf..de71a2e9 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java +++ 
b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/workflow/WorkflowStandaloneProcessor.java @@ -1,6 +1,6 @@ package com.github.kfcfans.powerjob.samples.workflow; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.JSON; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; @@ -22,14 +22,13 @@ public class WorkflowStandaloneProcessor implements BasicProcessor { public ProcessResult process(TaskContext context) throws Exception { OmsLogger logger = context.getOmsLogger(); logger.info("current:" + context.getJobParams()); - System.out.println("current: " + context.getJobParams()); - System.out.println("currentContext:"); - System.out.println(JSONObject.toJSONString(context)); + System.out.println("jobParams: " + context.getJobParams()); + System.out.println("currentContext:"+JSON.toJSONString(context)); // 尝试获取上游任务 - Map upstreamTaskResult = context.fetchUpstreamTaskResult(); - System.out.println("工作流上游任务数据:"); - System.out.println(upstreamTaskResult); + Map workflowContext = context.fetchWorkflowContext(); + System.out.println("工作流上下文数据:"); + System.out.println(workflowContext); return new ProcessResult(true, context.getJobId() + " process successfully."); } diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java index 577d7db6..07290900 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobAutoConfiguration.java @@ -69,6 +69,14 @@ public class 
PowerJobAutoConfiguration { * or validate appName. */ config.setEnableTestMode(worker.isEnableTestMode()); + /* + * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored. + */ + config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength()); + /* + * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated. + */ + config.setMaxAppendedWfContextSize(worker.getMaxAppendedWfContextSize()); + /* + * Create OhMyWorker object and set properties. */ diff --git a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java index 358dc55e..a2fb6ad0 100644 --- a/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java +++ b/powerjob-worker-spring-boot-starter/src/main/java/com/github/kfcfans/powerjob/worker/autoconfigure/PowerJobProperties.java @@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.worker.autoconfigure; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; +import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; +import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -133,5 +135,16 @@ public class PowerJobProperties { * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application */ private boolean enableTestMode = false; + /** + * Max length of appended workflow context value length. 
Appended workflow context value that is longer than the value will be ignored. + * {@link TaskContext} max length for #appendedContextData key and value. + */ + private int maxAppendedWfContextLength = 8096; + /** + * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated. + * {@link TaskContext} max size for #appendedContextData + * {@link TaskTracker} max size for #appendedWfContext + */ + private int maxAppendedWfContextSize = 16; } } diff --git a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json index 01a0eb3d..883d147f 100644 --- a/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/powerjob-worker-spring-boot-starter/src/main/resources/META-INF/spring-configuration-metadata.json @@ -50,6 +50,18 @@ "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", "description": "Local store strategy, disk or memory", "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.max-appended-wf-context-length", + "type": "java.lang.Integer", + "description": "Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored.", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" + }, + { + "name": "powerjob.worker.max-appended-wf-context-size", + "type": "java.lang.Integer", + "description": "Max size of appended workflow context. 
Appended workflow context that is greater than the value will be truncated.", + "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" } ], "hints": [] diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index 4b53ae84..58893603 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.List; /** - * worker的master节点,处理来自server的jobInstance请求和来自worker的task请求 + * worker 的 master 节点,处理来自 server 的 jobInstance 请求和来自 worker 的task 请求 * * @author tjq * @since 2020/3/17 @@ -66,6 +66,9 @@ public class TaskTrackerActor extends AbstractActor { } taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); + + // 更新工作流上下文 + taskTracker.updateAppendedWfContext(req.getAppendedWfContext()); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java index 4fef6213..0091fb23 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java @@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.worker.common; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; +import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; +import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import 
com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector; import com.google.common.collect.Lists; import lombok.Getter; @@ -54,6 +56,18 @@ public class OhMyConfig { * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application */ private boolean enableTestMode = false; + /** + * Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignored. + * {@link TaskContext} max length for #appendedContextData key and value. + */ + private int maxAppendedWfContextLength = 8096; + /** + * Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated. + * {@link TaskContext} max size for #appendedContextData + * {@link TaskTracker} max size for #appendedWfContext + */ + private int maxAppendedWfContextSize = 16; + private SystemMetricsCollector systemMetricsCollector; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskConstant.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskConstant.java index bf06ce92..f2ec2786 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskConstant.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskConstant.java @@ -1,18 +1,21 @@ package com.github.kfcfans.powerjob.worker.common.constants; /** - * task 常熟 + * task 常量 * * @author tjq * @since 2020/3/17 */ public class TaskConstant { + private TaskConstant() { + + } + /** * 所有根任务的名称 */ public static final String ROOT_TASK_NAME = "OMS_ROOT_TASK"; - /** * 广播执行任务的名称 */ @@ -21,7 +24,5 @@ public class TaskConstant { * 终极任务的名称(MapReduce的reduceTask和Broadcast的postProcess会有该任务) */ public static final String LAST_TASK_NAME = "OMS_LAST_TASK"; - // 除0外任何数都可以 - public static final String LAST_TASK_ID = "9999"; } diff --git 
a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index ef0e61ad..c75dec81 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -2,24 +2,23 @@ package com.github.kfcfans.powerjob.worker.core.executor; import akka.actor.ActorSelection; import com.github.kfcfans.powerjob.common.ExecuteType; -import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; +import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; +import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; +import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor; +import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; -import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; -import 
com.github.kfcfans.powerjob.worker.core.processor.TaskContext; -import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; -import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor; -import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor; import com.google.common.base.Stopwatch; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -27,6 +26,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; import java.util.List; +import java.util.Map; import java.util.Queue; /** @@ -37,6 +37,7 @@ import java.util.Queue; */ @Slf4j @AllArgsConstructor +@SuppressWarnings("squid:S1181") public class ProcessorRunnable implements Runnable { @@ -45,9 +46,13 @@ public class ProcessorRunnable implements Runnable { private final TaskDO task; private final BasicProcessor processor; private final OmsLogger omsLogger; - // 类加载器 + /** + * 类加载器 + */ private final ClassLoader classLoader; - // 重试队列,ProcessorTracker 将会定期重新上报处理结果 + /** + * 重试队列,ProcessorTracker 将会定期重新上报处理结果 + */ private final Queue statusReportRetryQueue; public void innerRun() throws InterruptedException { @@ -56,8 +61,41 @@ public class ProcessorRunnable implements Runnable { Long instanceId = task.getInstanceId(); log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); + ThreadLocalStore.setTask(task); + // 0. 构造任务上下文 + TaskContext taskContext = constructTaskContext(); + // 1. 上报执行信息 + reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null); - // 0. 完成执行上下文准备 & 上报执行信息 + ProcessResult processResult; + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); + + // 2. 
根任务 & 广播执行 特殊处理 + if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) { + // 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 Task + handleBroadcastRootTask(instanceId, taskContext); + return; + } + + // 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器) + if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) { + handleLastTask(taskId, instanceId, taskContext, executeType); + return; + } + + // 4. 正式提交运行, + try { + processResult = processor.process(taskContext); + } catch (Throwable e) { + log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); + processResult = new ProcessResult(false, e.toString()); + } + // + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); + } + + + private TaskContext constructTaskContext() { TaskContext taskContext = new TaskContext(); BeanUtils.copyProperties(task, taskContext); taskContext.setJobId(instanceInfo.getJobId()); @@ -70,101 +108,87 @@ public class ProcessorRunnable implements Runnable { taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent())); } taskContext.setUserContext(OhMyWorker.getConfig().getUserContext()); - ThreadLocalStore.setTask(task); + return taskContext; + } - reportStatus(TaskStatus.WORKER_PROCESSING, null, null); - - // 1. 
根任务特殊处理 + /** + * 处理最终任务 + * BROADCAST => {@link BroadcastProcessor#postProcess} + * MAP_REDUCE => {@link MapReduceProcessor#reduce} + */ + private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) { ProcessResult processResult; - ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); - if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) { + Stopwatch stopwatch = Stopwatch.createStarted(); + log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task - if (executeType == ExecuteType.BROADCAST) { - - if (processor instanceof BroadcastProcessor) { - - BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; - try { - processResult = broadcastProcessor.preProcess(taskContext); - }catch (Throwable e) { - log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); - processResult = new ProcessResult(false, e.toString()); - } - - }else { - processResult = new ProcessResult(true, "NO_PREPOST_TASK"); - } - - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST); - // 广播执行的第一个 task 只执行 preProcess 部分 - return; - } - } - - // 2. 
最终任务特殊处理(一定和 TaskTracker 处于相同的机器) - if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) { - - Stopwatch stopwatch = Stopwatch.createStarted(); - log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - - List taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); - try { - switch (executeType) { - case BROADCAST: - - if (processor instanceof BroadcastProcessor) { - BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; - processResult = broadcastProcessor.postProcess(taskContext, taskResults); - }else { - processResult = BroadcastProcessor.defaultResult(taskResults); - } - break; - case MAP_REDUCE: - - if (processor instanceof MapReduceProcessor) { - MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor; - processResult = mapReduceProcessor.reduce(taskContext, taskResults); - }else { - processResult = new ProcessResult(false, "not implement the MapReduceProcessor"); - } - break; - default: - processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); - } - }catch (Throwable e) { - processResult = new ProcessResult(false, e.toString()); - log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e); - } - - TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; - reportStatus(status, suit(processResult.getMsg()), null); - - log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); - return; - } - - - // 3. 
正式提交运行 + List taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); try { - processResult = processor.process(taskContext); - }catch (Throwable e) { - log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); + switch (executeType) { + case BROADCAST: + + if (processor instanceof BroadcastProcessor) { + BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; + processResult = broadcastProcessor.postProcess(taskContext, taskResults); + } else { + processResult = BroadcastProcessor.defaultResult(taskResults); + } + break; + case MAP_REDUCE: + + if (processor instanceof MapReduceProcessor) { + MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor; + processResult = mapReduceProcessor.reduce(taskContext, taskResults); + } else { + processResult = new ProcessResult(false, "not implement the MapReduceProcessor"); + } + break; + default: + processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); + } + } catch (Throwable e) { processResult = new ProcessResult(false, e.toString()); + log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e); } - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null); + + TaskStatus status = processResult.isSuccess() ? 
TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; + reportStatus(status, suit(processResult.getMsg()), null, taskContext.getAppendedContextData()); + + log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); + } + + /** + * 处理广播执行的根任务 + * 即执行 {@link BroadcastProcessor#preProcess},并通知 TaskerTracker 创建广播子任务 + */ + private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) { + ProcessResult processResult; + // 广播执行的第一个 task 只执行 preProcess 部分 + if (processor instanceof BroadcastProcessor) { + + BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; + try { + processResult = broadcastProcessor.preProcess(taskContext); + } catch (Throwable e) { + log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); + processResult = new ProcessResult(false, e.toString()); + } + + } else { + processResult = new ProcessResult(true, "NO_PREPOST_TASK"); + } + // 通知 TaskerTracker 创建广播子任务 + reportStatus(processResult.isSuccess() ? 
TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getAppendedContextData()); + } /** * 上报状态给 TaskTracker + * * @param status Task状态 * @param result 执行结果,只有结束时才存在 - * @param cmd 特殊需求,比如广播执行需要创建广播任务 + * @param cmd 特殊需求,比如广播执行需要创建广播任务 */ - private void reportStatus(TaskStatus status, String result, Integer cmd) { - - CommonUtils.easySleep(1); - + private void reportStatus(TaskStatus status, String result, Integer cmd, Map appendedWfContext) { ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq(); req.setInstanceId(task.getInstanceId()); @@ -174,6 +198,7 @@ public class ProcessorRunnable implements Runnable { req.setResult(result); req.setReportTime(System.currentTimeMillis()); req.setCmd(cmd); + req.setAppendedWfContext(appendedWfContext); // 最终结束状态要求可靠发送 if (TaskStatus.finishedStatus.contains(status.getValue())) { @@ -189,21 +214,25 @@ public class ProcessorRunnable implements Runnable { } @Override + @SuppressWarnings("squid:S2142") public void run() { // 切换线程上下文类加载器(否则用的是 Worker 类加载器,不存在容器类,在序列化/反序列化时会报 ClassNotFoundException) Thread.currentThread().setContextClassLoader(classLoader); try { innerRun(); - }catch (InterruptedException ignore) { - }catch (Throwable e) { - reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null); + } catch (InterruptedException ignore) { + // ignore + } catch (Throwable e) { + reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null); log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e); - }finally { + } finally { ThreadLocalStore.clear(); } } - // 裁剪返回结果到合适的大小 + /** + * 裁剪返回结果到合适的大小 + */ private String suit(String result) { if (StringUtils.isEmpty(result)) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java 
b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java index 52e01b1c..e2d9a190 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/TaskContext.java @@ -1,12 +1,16 @@ package com.github.kfcfans.powerjob.worker.core.processor; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.github.kfcfans.powerjob.common.WorkflowContextConstant; import com.github.kfcfans.powerjob.common.utils.JsonUtils; +import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Maps; import lombok.Getter; import lombok.Setter; import lombok.ToString; -import org.springframework.util.StringUtils; +import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -16,6 +20,8 @@ import java.util.Map; * 单机任务:整个Job变成一个Task * 广播任务:整个job变成一堆一样的Task * MR 任务:被map出来的任务都视为根Task的子Task + *

+ * 2021/02/04 移除 fetchUpstreamTaskResult 方法 * * @author tjq * @since 2020/3/18 @@ -23,14 +29,18 @@ import java.util.Map; @Getter @Setter @ToString +@Slf4j public class TaskContext { private Long jobId; - private Long instanceId; - private Long subInstanceId; - private String taskId; - private String taskName; + private Long instanceId; + + private Long subInstanceId; + + private String taskId; + + private String taskName; /** * 通过控制台传递的参数 */ @@ -38,7 +48,7 @@ public class TaskContext { /** * 任务实例运行中参数 * 若该任务实例通过 OpenAPI 触发,则该值为 OpenAPI 传递的参数 - * 若该任务为工作流的某个节点,则该值为上游任务传递下来的数据,推荐通过 {@link TaskContext#fetchUpstreamTaskResult()} 方法获取 + * 若该任务为工作流的某个节点,则该值为工作流实例的上下文 ( wfContext ) */ private String instanceParams; /** @@ -56,29 +66,67 @@ public class TaskContext { /** * 在线日志记录 */ + @JsonIgnore private OmsLogger omsLogger; /** - * 用户自定义上下文 + * 用户自定义上下文,通过 {@link OhMyConfig} 初始化 */ private Object userContext; + /** + * 追加的上下文数据 + */ + private final Map appendedContextData = Maps.newConcurrentMap(); + + /** - * 获取工作流上游任务传递的数据(仅该任务实例由工作流触发时存在) - * @return key: 上游任务的 jobId;value: 上游任务的 ProcessResult#result + * 获取工作流上下文 (MAP),本质上是将 instanceParams 解析成 MAP + * 初始参数的 key 为 {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY} + * 注意,在没有传递初始参数时,通过 CONTEXT_INIT_PARAMS_KEY 获取到的是 null + * + * @return 工作流上下文 + * @author Echo009 + * @since 2021/02/04 */ - @SuppressWarnings("rawtypes, unchecked") - public Map fetchUpstreamTaskResult() { - Map res = Maps.newHashMap(); - if (StringUtils.isEmpty(instanceParams)) { - return res; - } + @SuppressWarnings({"rawtypes","unchecked"}) + public Map fetchWorkflowContext() { + Map res = Maps.newHashMap(); try { Map originMap = JsonUtils.parseObject(instanceParams, Map.class); - originMap.forEach((k, v) -> res.put(Long.valueOf(String.valueOf(k)), String.valueOf(v))); + originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? 
null : String.valueOf(v))); return res; - }catch (Exception ignore) { + } catch (Exception ignore) { + // ignore } return Maps.newHashMap(); } + + + /** + * 往工作流上下文添加数据 + * 注意:如果 key 在当前上下文中已存在,那么会直接覆盖 + */ + public synchronized void appendData2WfContext(String key,Object value){ + String finalValue; + try { + // 先判断当前上下文大小是否超出限制 + final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); + if (appendedContextData.size() >= sizeThreshold) { + log.warn("[TaskContext-{}|{}|{}] appended workflow context data size must be lesser than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,sizeThreshold,key); + } + finalValue = JsonUtils.toJSONStringUnsafe(value); + final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength(); + // 判断 key & value 是否超长度限制 + if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) { + log.warn("[TaskContext-{}|{}|{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!",instanceId,taskId,taskName,lengthThreshold,key); + return; + } + } catch (Exception e) { + log.warn("[TaskContext-{}|{}|{}] fail to append data to workflow context, key : {}",instanceId,taskId,taskName, key); + return; + } + appendedContextData.put(key, JsonUtils.toJSONString(value)); + } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java index af2eae24..32d75c4b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java @@ -13,6 +13,9 @@ public interface BasicProcessor { /** * 核心处理逻辑 + * 可通过 {@link TaskContext#fetchWorkflowContext} 获取工作流上下文 + * 可通过 {@link 
TaskContext#appendData2WfContext} 向工作流上下文中添加数据 + * * @param context 任务上下文,可通过 jobParams 和 instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数 * @return 处理结果,msg有长度限制,超长会被裁剪,不允许返回 null * @throws Exception 异常,允许抛出异常,但不推荐,最好由业务开发者自己处理 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index c7ed24aa..fdef8c7b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -13,6 +13,7 @@ import com.github.kfcfans.powerjob.worker.core.ProcessorBeanFactory; import com.github.kfcfans.powerjob.worker.core.executor.ProcessorRunnable; import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor; import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor; +import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; @@ -20,7 +21,6 @@ import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; -import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -40,43 +40,67 @@ import java.util.concurrent.*; @Slf4j public class ProcessorTracker { - // 记录创建时间 + /** + * 记录创建时间 + */ private long 
startTime; - // 任务实例信息 + /** + * 任务实例信息 + */ private InstanceInfo instanceInfo; - // 冗余 instanceId,方便日志 + /** + * 冗余 instanceId,方便日志 + */ private Long instanceId; - - // 任务执行器 + /** + * 任务执行器 + */ private BasicProcessor processor; - // 容器(可能为空) + /** + * 容器(可能为空) + */ private OmsContainer omsContainer; - // 在线日志 + /** + * 在线日志 + */ private OmsLogger omsLogger; - // ProcessResult 上报失败的重试队列 + /** + * ProcessResult 上报失败的重试队列 + */ private Queue statusReportRetryQueue; - // 上一次空闲时间(用于闲置判定) + /** + * 上一次空闲时间(用于闲置判定) + */ private long lastIdleTime; - // 上次完成任务数量(用于闲置判定) + /** + * 上次完成任务数量(用于闲置判定) + */ private long lastCompletedTaskCount; private String taskTrackerAddress; + private ActorSelection taskTrackerActorRef; private ThreadPoolExecutor threadPool; + private ScheduledExecutorService timingPool; private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128; - // 长时间空闲的 ProcessorTracker 会发起销毁请求 + /** + * 长时间空闲的 ProcessorTracker 会发起销毁请求 + */ private static final long MAX_IDLE_TIME = 120000; - - // 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败) + /** + * 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败) + */ private boolean lethal = false; + private String lethalReason; /** * 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T) */ + @SuppressWarnings("squid:S1181") public ProcessorTracker(TaskTrackerStartTaskReq request) { try { // 赋值 @@ -122,7 +146,14 @@ public class ProcessorTracker { // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁 // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T if (lethal) { - ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null); + ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq() + .setInstanceId(instanceId) + .setSubInstanceId(newTask.getSubInstanceId()) + .setTaskId(newTask.getTaskId()) + 
.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()) + .setResult(lethalReason) + .setReportTime(System.currentTimeMillis()); + taskTrackerActorRef.tell(report, null); return; } @@ -137,10 +168,10 @@ public class ProcessorTracker { try { threadPool.submit(processorRunnable); success = true; - }catch (RejectedExecutionException ignore) { + } catch (RejectedExecutionException ignore) { log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", instanceId, newTask.getTaskId(), newTask.getTaskName()); - }catch (Exception e) { + } catch (Exception e) { log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e); } @@ -228,6 +259,7 @@ public class ProcessorTracker { private class CheckerAndReporter implements Runnable { @Override + @SuppressWarnings({"squid:S1066","squid:S3776"}) public void run() { // 超时检查,如果超时则自动关闭 TaskTracker @@ -245,10 +277,10 @@ public class ProcessorTracker { if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) { lastIdleTime = -1; lastCompletedTaskCount = threadPool.getCompletedTaskCount(); - }else { + } else { if (lastIdleTime == -1) { lastIdleTime = System.currentTimeMillis(); - }else { + } else { long idleTime = System.currentTimeMillis() - lastIdleTime; if (idleTime > MAX_IDLE_TIME) { log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime); @@ -296,7 +328,7 @@ public class ProcessorTracker { if (SpringUtils.supportSpringBean()) { try { processor = SpringUtils.getBean(processorInfo); - }catch (Exception e) { + } catch (Exception e) { log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, 
ExceptionUtils.getMessage(e)); } } @@ -318,7 +350,7 @@ public class ProcessorTracker { omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true); if (omsContainer != null) { processor = omsContainer.getProcessor(split[1]); - }else { + } else { log.warn("[ProcessorTracker-{}] load container failed.", instanceId); } break; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 7a928c6e..b6f4777f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -33,12 +33,20 @@ import java.util.concurrent.*; @ToString public class CommonTaskTracker extends TaskTracker { - private static final String ROOT_TASK_ID = "0"; - // 可以是除 ROOT_TASK_ID 的任何数字 - private static final String LAST_TASK_ID = "1111"; - - // 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down + /** + * 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down + */ private int reportFailedCnt = 0; + /** + * 根任务 ID + */ + public static final String ROOT_TASK_ID = "0"; + /** + * 最后一个任务 ID + * 除 {@link #ROOT_TASK_ID} 外任何数都可以 + */ + public static final String LAST_TASK_ID = "9999"; + private static final int MAX_REPORT_FAILED_THRESHOLD = 5; protected CommonTaskTracker(ServerScheduleJobReq req) { @@ -131,6 +139,7 @@ public class CommonTaskTracker extends TaskTracker { private static final long DISPATCH_TIME_OUT_MS = 15000; + @SuppressWarnings("squid:S3776") private void innerRun() { InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); @@ -144,7 +153,6 @@ public class CommonTaskTracker extends TaskTracker { req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setWfInstanceId(instanceInfo.getWfInstanceId()); - 
req.setTotalTaskNum(finishedNum + unfinishedNum); req.setSucceedTaskNum(holder.succeedNum); req.setFailedTaskNum(holder.failedNum); @@ -161,7 +169,6 @@ public class CommonTaskTracker extends TaskTracker { // 数据库中一个任务都没有,说明根任务创建失败,该任务实例失败 if (finishedNum == 0) { finished.set(true); - success = false; result = SystemInstanceResult.TASK_INIT_FAILED; }else { ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); @@ -231,6 +238,8 @@ public class CommonTaskTracker extends TaskTracker { if (finished.get()) { req.setResult(result); + // 上报追加的工作流上下文信息 + req.setAppendedWfContext(appendedWfContext); req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV()); CompletionStage askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 7eafa6e3..2e6cfa5d 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -29,6 +29,7 @@ import com.google.common.base.Stopwatch; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -36,6 +37,7 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -52,24 +54,48 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public abstract class TaskTracker { - // TaskTracker创建时间 - protected 
long createTime; - // 任务实例ID,使用频率过高,从 InstanceInfo 提取出来单独保存一份 - protected long instanceId; - // 任务实例信息 - protected InstanceInfo instanceInfo; - // ProcessTracker 状态管理 - protected ProcessorTrackerStatusHolder ptStatusHolder; - // 数据库持久化服务 - protected TaskPersistenceService taskPersistenceService; - // 定时任务线程池 + /** + * TaskTracker创建时间 + */ + protected final long createTime; + /** + * 任务实例ID,使用频率过高,从 InstanceInfo 提取出来单独保存一份 + */ + protected final long instanceId; + /** + * 任务实例信息 + */ + protected final InstanceInfo instanceInfo; + /** + * ProcessTracker 状态管理 + */ + protected final ProcessorTrackerStatusHolder ptStatusHolder; + /** + * 数据库持久化服务 + */ + protected final TaskPersistenceService taskPersistenceService; + /** + * 定时任务线程池 + */ protected ScheduledExecutorService scheduledPool; - // 是否结束 - protected AtomicBoolean finished; - // 上报时间缓存 + /** + * 是否结束 + */ + protected final AtomicBoolean finished; + /** + * 追加的工作流上下文数据 + * + * @since 2021/02/05 + */ + protected final Map appendedWfContext; + /** + * 上报时间缓存 + */ private final Cache taskId2LastReportTime; - // 分段锁 + /** + * 分段锁 + */ private final SegmentLock segmentLock; private static final int UPDATE_CONCURRENCY = 4; @@ -93,7 +119,8 @@ public abstract class TaskTracker { this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.taskPersistenceService = TaskPersistenceService.INSTANCE; this.finished = new AtomicBoolean(false); - + // 只有工作流中的任务允许向工作流中追加上下文数据 + this.appendedWfContext = req.getWfInstanceId() == null ? 
Collections.emptyMap() : Maps.newConcurrentMap(); // 构建缓存 taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build(); @@ -108,6 +135,7 @@ public abstract class TaskTracker { /** * 静态方法创建 TaskTracker + * * @param req 服务端调度任务请求 * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker */ @@ -116,8 +144,10 @@ public abstract class TaskTracker { TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); switch (timeExpressionType) { case FIXED_RATE: - case FIXED_DELAY:return new FrequentTaskTracker(req); - default:return new CommonTaskTracker(req); + case FIXED_DELAY: + return new FrequentTaskTracker(req); + default: + return new CommonTaskTracker(req); } } catch (Exception e) { log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e); @@ -139,15 +169,45 @@ public abstract class TaskTracker { } /* *************************** 对外方法区 *************************** */ + + /** + * 更新追加的上下文数据 + * + * @param newAppendedWfContext 追加的上下文数据 + * @since 2021/02/05 + */ + public void updateAppendedWfContext(Map newAppendedWfContext) { + + // check + if (instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) { + // 只有工作流中的任务才有存储的必要 + return; + } + // 先判断当前上下文大小是否超出限制 + final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize(); + for (Map.Entry entry : newAppendedWfContext.entrySet()) { + if (appendedWfContext.size() >= sizeThreshold) { + log.warn("[TaskTracker-{}] current size of appended workflow context data is greater than {}, the rest of appended workflow context data will be ignore! 
", instanceInfo.getInstanceId(), sizeThreshold); + break; + } + String originValue = appendedWfContext.put(entry.getKey(), entry.getValue()); + log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue()); + } + + } + + /** * 更新Task状态 * V1.0.0 -> V1.0.1(e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更,从 synchronized (taskId.intern()) 修改为分段锁,能大大减少内存占用,损失的只有理论并发度而已 + * * @param subInstanceId 子任务实例ID - * @param taskId task的ID(task为任务实例的执行单位) - * @param newStatus task的新状态 - * @param reportTime 上报时间 - * @param result task的执行结果,未执行完成时为空 + * @param taskId task的ID(task为任务实例的执行单位) + * @param newStatus task的新状态 + * @param reportTime 上报时间 + * @param result task的执行结果,未执行完成时为空 */ + @SuppressWarnings({"squid:S3776","squid:S2142"}) public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) { if (finished.get()) { @@ -168,7 +228,7 @@ public abstract class TaskTracker { Optional taskOpt = taskPersistenceService.getTask(instanceId, taskId); if (taskOpt.isPresent()) { lastReportTime = taskOpt.get().getLastReportTime(); - }else { + } else { // 理论上不存在这种情况,除非数据库异常 log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId); } @@ -235,6 +295,7 @@ public abstract class TaskTracker { } } catch (InterruptedException ignore) { + // ignore } catch (Exception e) { log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e); } finally { @@ -244,6 +305,7 @@ public abstract class TaskTracker { /** * 提交Task任务(MapReduce的Map,Broadcast的广播),上层保证 batchSize,同时插入过多数据可能导致失败 + * * @param newTaskList 新增的子任务列表 */ public boolean submitTask(List newTaskList) { @@ -269,6 +331,7 @@ public abstract class TaskTracker { /** * 处理 ProcessorTracker 的心跳信息 + * * @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态 */ public void 
receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) { @@ -290,10 +353,11 @@ public abstract class TaskTracker { /** * 生成广播任务 + * * @param preExecuteSuccess 预执行广播任务运行状态 - * @param subInstanceId 子实例ID - * @param preTaskId 预执行广播任务的taskId - * @param result 预执行广播任务的结果 + * @param subInstanceId 子实例ID + * @param preTaskId 预执行广播任务的taskId + * @param result 预执行广播任务的结果 */ public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) { @@ -317,7 +381,7 @@ public abstract class TaskTracker { subTaskList.add(subTask); } submitTask(subTaskList); - }else { + } else { log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result); } } @@ -334,7 +398,6 @@ public abstract class TaskTracker { scheduledPool.shutdown(); // 1. 通知 ProcessorTracker 释放资源 - Long instanceId = instanceInfo.getInstanceId(); TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq(); stopRequest.setInstanceId(instanceId); ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { @@ -348,7 +411,7 @@ public abstract class TaskTracker { boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId); if (!dbSuccess) { log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId); - }else { + } else { log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId); } @@ -368,7 +431,8 @@ public abstract class TaskTracker { /** * 派发任务到 ProcessorTracker - * @param task 需要被执行的任务 + * + * @param task 需要被执行的任务 * @param processorTrackerAddress ProcessorTracker的地址(IP:Port) */ protected void dispatchTask(TaskDO task, String processorTrackerAddress) { @@ -400,6 +464,7 @@ public abstract class TaskTracker { /** * 获取任务实例产生的各个Task状态,用于分析任务实例执行情况 + * * @param subInstanceId 子任务实例ID * @return InstanceStatisticsHolder */ @@ -418,7 +483,6 @@ public abstract class TaskTracker { } - /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 
*/ @@ -435,7 +499,6 @@ public abstract class TaskTracker { } Stopwatch stopwatch = Stopwatch.createStarted(); - Long instanceId = instanceInfo.getInstanceId(); // 1. 获取可以派发任务的 ProcessorTracker List availablePtIps = ptStatusHolder.getAvailableProcessorTrackers(); @@ -502,7 +565,7 @@ public abstract class TaskTracker { log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address); } }); - }catch (Exception e) { + } catch (Exception e) { log.warn("[TaskTracker-{}] detective failed!", instanceId, e); } } @@ -531,13 +594,15 @@ public abstract class TaskTracker { /** * 初始化 TaskTracker + * * @param req 服务器调度任务实例运行请求 */ - abstract protected void initTaskTracker(ServerScheduleJobReq req); + protected abstract void initTaskTracker(ServerScheduleJobReq req); /** * 查询任务实例的详细运行状态 + * * @return 任务实例的详细运行状态 */ - abstract public InstanceDetail fetchRunningStatus(); + public abstract InstanceDetail fetchRunningStatus(); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java index 33adcc8a..6a41f942 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskDO.java @@ -16,29 +16,55 @@ import org.springframework.util.StringUtils; @Setter public class TaskDO { - // 层次命名法,可以表示 Map 后的父子关系,如 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task + /** + * 层次命名法,可以表示 Map 后的父子关系,如 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task + */ private String taskId; - + /** + * 任务实例 ID + */ private Long instanceId; - // 秒级任务专用 + /** + * 秒级任务专用 + * 对于普通任务而言 等于 instanceId + * 对于秒级(固定频率)任务 自增长 + */ private Long subInstanceId; - // 任务名称 + /** + * 任务名称 + */ private String taskName; - // 任务对象(序列化后的二进制数据) + /** + * 任务对象(序列化后的二进制数据) + */ private byte[] taskContent; - // 
对于TaskTracker为workerAddress(派发地址),对于普通Worker为TaskTrackerAddress(汇报地址),所有地址都是 IP:Port + /** + * 对于TaskTracker为workerAddress(派发地址),对于普通Worker为TaskTrackerAddress(汇报地址),所有地址都是 IP:Port + */ private String address; - // 任务状态,0~10代表 JobTracker 使用,11~20代表普通Worker使用 + /** + * 任务状态,0~10代表 JobTracker 使用,11~20代表普通Worker使用 + */ private Integer status; - // 执行结果 + /** + * 执行结果 + */ private String result; - // 失败次数 + /** + * 失败次数 + */ private Integer failedCnt; - // 创建时间 + /** + * 创建时间 + */ private Long createdTime; - // 最后修改时间 + /** + * 最后修改时间 + */ private Long lastModifiedTime; - // ProcessorTracker 最后上报时间 + /** + * ProcessorTracker 最后上报时间 + */ private Long lastReportTime; public String getUpdateSQL() { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java index 9d33b8b4..aad7c815 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java @@ -1,9 +1,10 @@ package com.github.kfcfans.powerjob.worker.pojo.request; import com.github.kfcfans.powerjob.common.OmsSerializable; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; +import lombok.*; +import lombok.experimental.Accessors; + +import java.util.Map; /** @@ -13,14 +14,16 @@ import lombok.NoArgsConstructor; * @since 2020/3/17 */ @Data +@Accessors(chain = true) @NoArgsConstructor -@AllArgsConstructor public class ProcessorReportTaskStatusReq implements OmsSerializable { public static final Integer BROADCAST = 1; private Long instanceId; + private Long subInstanceId; + private String taskId; private int status; @@ -28,10 +31,17 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable { * 执行完成时才有 */ private String 
result; - - // 上报时间 + /** + * 上报时间 + */ private long reportTime; - - // 特殊请求名称 + /** + * 特殊请求名称 + */ private Integer cmd; + /** + * 追加的工作流下文数据 + * @since 2021/02/05 + */ + private Map appendedWfContext; }