feat: workflow tasks can now update the workflow context (instance parameters passed to downstream tasks)

This commit is contained in:
Echo009 2021-02-18 15:06:49 +08:00
parent 814d4321a1
commit 11b712332d
20 changed files with 600 additions and 220 deletions
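In short: a processor running inside a workflow stages key/value pairs via TaskContext#appendData2WfContext; ProcessorRunnable reports them to the TaskTracker together with the task status, the TaskTracker collects them in appendedWfContext, the final TaskTrackerReportInstanceStatusReq carries them to the server, and WorkflowInstanceManager#updateWorkflowContext merges them into the workflow instance's wfContext, which downstream nodes then receive as their instanceParams. A minimal usage sketch built only on the APIs shown in this diff (the class name and the "orderCount" key are illustrative):

import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;

import java.util.Map;

public class OrderCountProcessor implements BasicProcessor {

    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        // read whatever upstream nodes (or the workflow's initParams) put into the context
        Map<String, String> wfContext = context.fetchWorkflowContext();
        String upstreamValue = wfContext.get("orderCount");

        // append / overwrite a value; it is shipped to the TaskTracker with the task status
        // and merged into the workflow instance's context on the server
        context.appendData2WfContext("orderCount", 42);

        return new ProcessResult(true, "upstream orderCount=" + upstreamValue);
    }
}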


@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Data;
import java.util.Map;
/**
* TaskTracker 将状态上报给服务器
@ -14,19 +16,31 @@ import lombok.Data;
public class TaskTrackerReportInstanceStatusReq implements OmsSerializable {
private Long jobId;
private Long instanceId;
private Long wfInstanceId;
/**
* appended workflow context data
* @since 2021/02/05
*/
private Map<String,String> appendedWfContext;
private int instanceStatus;
private String result;
/* ********* 统计信息 ********* */
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
private long startTime;
private long reportTime;
private String sourceAddress;
}


@ -15,14 +15,17 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRep
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
@ -40,6 +43,8 @@ public class WorkerRequestHandler {
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
private InstanceLogService instanceLogService;
@Resource
private ContainerInfoRepository containerInfoRepository;
@ -58,7 +63,13 @@ public class WorkerRequestHandler {
* 处理 instance 状态
* @param req 任务实例的状态上报请求
*/
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws Exception {
public Optional<AskResponse> onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException {
// 2021/02/05 if the instance belongs to a workflow, try to update the workflow context first, then update the instance status; no exception can be thrown here
if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) {
// update the workflow context
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(), req.getAppendedWfContext());
}
instanceManager.updateStatus(req);
// 结束状态成功/失败需要回复消息


@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.service.workflow;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
@ -20,6 +21,7 @@ import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock;
import com.google.common.collect.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -36,6 +38,7 @@ import java.util.*;
*/
@Slf4j
@Service
@SuppressWarnings("squid:S1192")
public class WorkflowInstanceManager {
@Resource
@ -155,7 +158,7 @@ public class WorkflowInstanceManager {
JobInfoDO jobInfo = jobInfoRepository.findById(root.getJobId()).orElseGet(JobInfoDO::new);
if (jobInfo.getId() == null) {
// 在创建工作流实例到当前的这段时间内 job 信息被物理删除了
log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId());
log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfInfo.getId(), wfInstanceId, root.getNodeId(), root.getJobId());
}
nodeId2JobInfoMap.put(root.getNodeId(), jobInfo);
// instanceParam 传递的是工作流实例的 wfContext
@ -163,7 +166,7 @@ public class WorkflowInstanceManager {
root.setInstanceId(instanceId);
root.setStatus(InstanceStatus.RUNNING.getV());
log.info("[Workflow-{}|{}] create root instance(jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId, root.getJobId(), instanceId);
log.info("[Workflow-{}|{}] create root instance(nodeId={},jobId={},instanceId={}) successfully~", wfInfo.getId(), wfInstanceId,root.getNodeId(), root.getJobId(), instanceId);
});
// 持久化
@ -195,7 +198,7 @@ public class WorkflowInstanceManager {
* @param status 完成任务的任务实例状态SUCCEED/FAILED/STOPPED
* @param result 完成任务的任务实例结果
*/
@SuppressWarnings({"squid:S3776","squid:S2142","squid:S1141"})
@SuppressWarnings({"squid:S3776", "squid:S2142", "squid:S1141"})
public void move(Long wfInstanceId, Long instanceId, InstanceStatus status, String result) {
int lockId = wfInstanceId.hashCode();
@ -228,7 +231,7 @@ public class WorkflowInstanceManager {
node.setStatus(status.getV());
node.setResult(result);
log.info("[Workflow-{}|{}] node(jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId, node.getJobId(), instanceId, status.name(), result);
log.info("[Workflow-{}|{}] node(nodeId={},jobId={},instanceId={}) finished in workflowInstance, status={},result={}", wfId, wfInstanceId,node.getNodeId(), node.getJobId(), instanceId, status.name(), result);
}
if (InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) {
@ -299,7 +302,7 @@ public class WorkflowInstanceManager {
JobInfoDO jobInfo = jobInfoRepository.findById(currentNode.getJobId()).orElseGet(JobInfoDO::new);
if (jobInfo.getId() == null) {
// 在创建工作流实例到当前的这段时间内 job 信息被物理删除了
log.error("[Workflow-{}|{}]job info of current node{nodeId={},jobId={}} is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId());
log.error("[Workflow-{}|{}]job info of current node(nodeId={},jobId={}) is not present! maybe you have deleted the job!", wfId, wfInstanceId, currentNode.getNodeId(), currentNode.getJobId());
}
nodeId2JobInfoMap.put(nodeId, jobInfo);
// instanceParam 传递的是工作流实例的 wfContext
@ -327,6 +330,37 @@ public class WorkflowInstanceManager {
}
}
/**
* update the workflow context
* @since 2021/02/05
* @param wfInstanceId workflow instance id
* @param appendedWfContextData appended workflow context data
*/
@UseSegmentLock(type = "updateWfContext", key = "#wfInstanceId.intValue()", concurrencyLevel = 1024)
public void updateWorkflowContext(Long wfInstanceId, Map<String, String> appendedWfContextData) {
try {
Optional<WorkflowInstanceInfoDO> wfInstanceInfoOpt = workflowInstanceInfoRepository.findByWfInstanceId(wfInstanceId);
if (!wfInstanceInfoOpt.isPresent()) {
log.error("[WorkflowInstanceManager] can't find metadata by workflowInstanceId({}).", wfInstanceId);
return;
}
WorkflowInstanceInfoDO wfInstance = wfInstanceInfoOpt.get();
HashMap<String, String> wfContext = JSON.parseObject(wfInstance.getWfContext(), new TypeReference<HashMap<String, String>>() {
});
for (Map.Entry<String, String> entry : appendedWfContextData.entrySet()) {
String key = entry.getKey();
String originValue = wfContext.put(key, entry.getValue());
log.info("[Workflow-{}|{}] update workflow context {} : {} -> {}", wfInstance.getWorkflowId(), wfInstance.getWfInstanceId(), key, originValue, entry.getValue());
}
wfInstance.setWfContext(JSON.toJSONString(wfContext));
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
} catch (Exception e) {
log.error("[WorkflowInstanceManager] update workflow(workflowInstanceId={}) context failed.", wfInstanceId, e);
}
}
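For reference, the merge above is a plain map overwrite on the JSON stored in the workflow instance's wfContext field, and the merged JSON is what downstream node instances receive as their instanceParams. A self-contained sketch of the same semantics (all keys and values are made up):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import java.util.HashMap;
import java.util.Map;

public class WfContextMergeSketch {
    public static void main(String[] args) {
        // context currently persisted on the workflow instance
        HashMap<String, String> wfContext = JSON.parseObject(
                "{\"initParams\":\"1\",\"orderCount\":\"10\"}",
                new TypeReference<HashMap<String, String>>() {});

        // data appended by a finished task instance; existing keys are overwritten
        Map<String, String> appended = new HashMap<>();
        appended.put("orderCount", "42");
        appended.put("traceId", "abc");
        appended.forEach(wfContext::put);

        // this JSON is persisted back and later handed to downstream node instances as instanceParams
        System.out.println(JSON.toJSONString(wfContext));
    }
}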
/**
* 运行任务实例


@ -27,10 +27,14 @@ public class MapProcessorDemo extends MapProcessor {
@Resource
private MysteryService mysteryService;
// 每一批发送任务大小
private static final int batchSize = 100;
// 发送的批次
private static final int batchNum = 2;
/**
* 每一批发送任务大小
*/
private static final int BATCH_SIZE = 100;
/**
* 发送的批次
*/
private static final int BATCH_NUM = 5;
@Override
public ProcessResult process(TaskContext context) throws Exception {
@ -43,17 +47,19 @@ public class MapProcessorDemo extends MapProcessor {
if (isRootTask()) {
System.out.println("==== MAP ====");
List<SubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
for (int j = 0; j < BATCH_NUM; j++) {
SubTask subTask = new SubTask();
subTask.siteId = j;
subTask.itemIds = Lists.newLinkedList();
subTasks.add(subTask);
for (int i = 0; i < batchSize; i++) {
for (int i = 0; i < BATCH_SIZE; i++) {
subTask.itemIds.add(i);
}
}
return map(subTasks, "MAP_TEST_TASK");
}else {
// test appending data to the workflow context from a Map task
context.appendData2WfContext("Yasuo","A sword's poor company for a long road.");
System.out.println("==== PROCESS ====");
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
boolean b = ThreadLocalRandom.current().nextBoolean();


@ -0,0 +1,43 @@
package com.github.kfcfans.powerjob.samples.tester;
import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Tests appending data to the workflow context
* com.github.kfcfans.powerjob.samples.tester.AppendWorkflowContextTester
*
* @author Echo009
* @since 2021/2/6
*/
@Component
public class AppendWorkflowContextTester implements BasicProcessor {
@Override
@SuppressWarnings("squid:S106")
public ProcessResult process(TaskContext context) throws Exception {
Map<String, String> workflowContext = context.fetchWorkflowContext();
String originValue = workflowContext.get(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY);
System.out.println("======= AppendWorkflowContextTester#start =======");
System.out.println("current instance id : " + context.getInstanceId());
System.out.println("current workflow context : " + workflowContext);
System.out.println("initParam of workflow context : " + originValue);
int num = 0;
try {
num = Integer.parseInt(originValue);
} catch (Exception e) {
// ignore
}
context.appendData2WfContext(WorkflowContextConstant.CONTEXT_INIT_PARAMS_KEY, num + 1);
System.out.println("======= AppendWorkflowContextTester#end =======");
return new ProcessResult(true, JsonUtils.toJSONString(context));
}
}
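When this tester is configured as several consecutive nodes of one workflow whose initParams is a number (say "1"), each node reads the current value of CONTEXT_INIT_PARAMS_KEY from the context, prints it, and appends value + 1, so the printed value increases by one from node to node, which is a quick way to verify that appended data actually reaches downstream instances.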


@ -1,6 +1,6 @@
package com.github.kfcfans.powerjob.samples.workflow;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSON;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
@ -22,14 +22,13 @@ public class WorkflowStandaloneProcessor implements BasicProcessor {
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
logger.info("current:" + context.getJobParams());
System.out.println("current: " + context.getJobParams());
System.out.println("currentContext:");
System.out.println(JSONObject.toJSONString(context));
System.out.println("jobParams: " + context.getJobParams());
System.out.println("currentContext:"+JSON.toJSONString(context));
// 尝试获取上游任务
Map<Long, String> upstreamTaskResult = context.fetchUpstreamTaskResult();
System.out.println("工作流上游任务数据:");
System.out.println(upstreamTaskResult);
Map<String, String> workflowContext = context.fetchWorkflowContext();
System.out.println("工作流上下文数据:");
System.out.println(workflowContext);
return new ProcessResult(true, context.getJobId() + " process successfully.");
}


@ -69,6 +69,14 @@ public class PowerJobAutoConfiguration {
* or validate appName.
*/
config.setEnableTestMode(worker.isEnableTestMode());
/*
* Max length of an appended workflow context value. Appended workflow context values longer than this limit will be ignored.
*/
config.setMaxAppendedWfContextLength(worker.getMaxAppendedWfContextLength());
/*
* Max size of the appended workflow context. Appended workflow context entries beyond this limit will be truncated.
*/
config.setMaxAppendedWfContextSize(worker.getMaxAppendedWfContextSize());
/*
* Create OhMyWorker object and set properties.
*/
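Both limits are also exposed as Spring properties (powerjob.worker.max-appended-wf-context-length and powerjob.worker.max-appended-wf-context-size, see the configuration metadata below). For a worker bootstrapped without Spring, a hedged sketch of setting them programmatically, assuming the usual OhMyConfig / OhMyWorker#init bootstrap; the app name is a placeholder and the limit values are the defaults introduced by this commit:

import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;

public class WorkerBootstrapSketch {
    public static void main(String[] args) throws Exception {
        OhMyConfig config = new OhMyConfig();
        config.setAppName("powerjob-worker-samples");   // placeholder app name
        config.setEnableTestMode(true);                 // allow startup without a powerjob-server
        // limits for workflow context data appended by processors
        config.setMaxAppendedWfContextLength(8096);
        config.setMaxAppendedWfContextSize(16);

        OhMyWorker worker = new OhMyWorker();
        worker.setConfig(config);
        worker.init();
    }
}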


@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -133,5 +135,16 @@ public class PowerJobProperties {
* Test mode is used for the case where you have no powerjob-server in your development env, so you can't start up the application
*/
private boolean enableTestMode = false;
/**
* Max length of an appended workflow context value. Appended workflow context values longer than this limit will be ignored.
* {@link TaskContext} max length for #appendedContextData key and value.
*/
private int maxAppendedWfContextLength = 8096;
/**
* Max size of the appended workflow context. Appended workflow context entries beyond this limit will be truncated.
* {@link TaskContext} max size for #appendedContextData
* {@link TaskTracker} max size for #appendedWfContext
*/
private int maxAppendedWfContextSize = 16;
}
}


@ -50,6 +50,18 @@
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "Local store strategy, disk or memory",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.max-appended-wf-context-length",
"type": "java.lang.Integer",
"description": "Max length of appended workflow context value length. Appended workflow context value that is longer than the value will be ignore.",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.worker.max-appended-wf-context-size",
"type": "java.lang.Integer",
"description": "Max size of appended workflow context. Appended workflow context that is greater than the value will be truncated.",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
}
],
"hints": []


@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* worker的master节点处理来自server的jobInstance请求和来自worker的task请求
* worker master 节点处理来自 server jobInstance 请求和来自 worker 的task 请求
*
* @author tjq
* @since 2020/3/17
@ -66,6 +66,9 @@ public class TaskTrackerActor extends AbstractActor {
}
taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
// update the workflow context
taskTracker.updateAppendedWfContext(req.getAppendedWfContext());
}
/**


@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector;
import com.google.common.collect.Lists;
import lombok.Getter;
@ -54,6 +56,18 @@ public class OhMyConfig {
* Test mode is used for the case where you have no powerjob-server in your development env, so you can't start up the application
*/
private boolean enableTestMode = false;
/**
* Max length of an appended workflow context value. Appended workflow context values longer than this limit will be ignored.
* {@link TaskContext} max length for #appendedContextData key and value.
*/
private int maxAppendedWfContextLength = 8096;
/**
* Max size of the appended workflow context. Appended workflow context entries beyond this limit will be truncated.
* {@link TaskContext} max size for #appendedContextData
* {@link TaskTracker} max size for #appendedWfContext
*/
private int maxAppendedWfContextSize = 16;
private SystemMetricsCollector systemMetricsCollector;


@ -1,18 +1,21 @@
package com.github.kfcfans.powerjob.worker.common.constants;
/**
* task
* task
*
* @author tjq
* @since 2020/3/17
*/
public class TaskConstant {
private TaskConstant() {
}
/**
* 所有根任务的名称
*/
public static final String ROOT_TASK_NAME = "OMS_ROOT_TASK";
/**
* 广播执行任务的名称
*/
@ -21,7 +24,5 @@ public class TaskConstant {
* 终极任务的名称MapReduce的reduceTask和Broadcast的postProcess会有该任务
*/
public static final String LAST_TASK_NAME = "OMS_LAST_TASK";
// 除0外任何数都可以
public static final String LAST_TASK_ID = "9999";
}


@ -2,24 +2,23 @@ package com.github.kfcfans.powerjob.worker.core.executor;
import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.base.Stopwatch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -27,6 +26,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
@ -37,6 +37,7 @@ import java.util.Queue;
*/
@Slf4j
@AllArgsConstructor
@SuppressWarnings("squid:S1181")
public class ProcessorRunnable implements Runnable {
@ -45,9 +46,13 @@ public class ProcessorRunnable implements Runnable {
private final TaskDO task;
private final BasicProcessor processor;
private final OmsLogger omsLogger;
// 类加载器
/**
* 类加载器
*/
private final ClassLoader classLoader;
// 重试队列ProcessorTracker 将会定期重新上报处理结果
/**
* 重试队列ProcessorTracker 将会定期重新上报处理结果
*/
private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
public void innerRun() throws InterruptedException {
@ -56,8 +61,41 @@ public class ProcessorRunnable implements Runnable {
Long instanceId = task.getInstanceId();
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
ThreadLocalStore.setTask(task);
// 0. 构造任务上下文
TaskContext taskContext = constructTaskContext();
// 1. 上报执行信息
reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);
// 0. 完成执行上下文准备 & 上报执行信息
ProcessResult processResult;
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
// 2. 根任务 & 广播执行 特殊处理
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
// 广播执行先选本机执行 preProcess完成后 TaskTracker 再为所有 Worker 生成子 Task
handleBroadcastRootTask(instanceId, taskContext);
return;
}
// 3. 最终任务特殊处理一定和 TaskTracker 处于相同的机器
if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
handleLastTask(taskId, instanceId, taskContext, executeType);
return;
}
// 4. 正式提交运行
try {
processResult = processor.process(taskContext);
} catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
//
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, taskContext.getAppendedContextData());
}
private TaskContext constructTaskContext() {
TaskContext taskContext = new TaskContext();
BeanUtils.copyProperties(task, taskContext);
taskContext.setJobId(instanceInfo.getJobId());
@ -70,101 +108,87 @@ public class ProcessorRunnable implements Runnable {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
}
taskContext.setUserContext(OhMyWorker.getConfig().getUserContext());
ThreadLocalStore.setTask(task);
return taskContext;
}
reportStatus(TaskStatus.WORKER_PROCESSING, null, null);
// 1. 根任务特殊处理
/**
* 处理最终任务
* BROADCAST => {@link BroadcastProcessor#postProcess}
* MAP_REDUCE => {@link MapReduceProcessor#reduce}
*/
private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
ProcessResult processResult;
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
processResult = broadcastProcessor.preProcess(taskContext);
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
processResult = new ProcessResult(false, e.toString());
}
}else {
processResult = new ProcessResult(true, "NO_PREPOST_TASK");
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST);
// 广播执行的第一个 task 只执行 preProcess 部分
return;
}
}
// 2. 最终任务特殊处理一定和 TaskTracker 处于相同的机器
if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId());
try {
switch (executeType) {
case BROADCAST:
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
processResult = broadcastProcessor.postProcess(taskContext, taskResults);
}else {
processResult = BroadcastProcessor.defaultResult(taskResults);
}
break;
case MAP_REDUCE:
if (processor instanceof MapReduceProcessor) {
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
processResult = mapReduceProcessor.reduce(taskContext, taskResults);
}else {
processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
}
break;
default:
processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
}catch (Throwable e) {
processResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null);
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
return;
}
// 3. 正式提交运行
List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId());
try {
processResult = processor.process(taskContext);
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
switch (executeType) {
case BROADCAST:
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
processResult = broadcastProcessor.postProcess(taskContext, taskResults);
} else {
processResult = BroadcastProcessor.defaultResult(taskResults);
}
break;
case MAP_REDUCE:
if (processor instanceof MapReduceProcessor) {
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
processResult = mapReduceProcessor.reduce(taskContext, taskResults);
} else {
processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
}
break;
default:
processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
} catch (Throwable e) {
processResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null);
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null, taskContext.getAppendedContextData());
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
}
/**
* 处理广播执行的根任务
* 即执行 {@link BroadcastProcessor#preProcess}并通知 TaskTracker 创建广播子任务
*/
private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
ProcessResult processResult;
// 广播执行的第一个 task 只执行 preProcess 部分
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
processResult = broadcastProcessor.preProcess(taskContext);
} catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
processResult = new ProcessResult(false, e.toString());
}
} else {
processResult = new ProcessResult(true, "NO_PREPOST_TASK");
}
// 通知 TaskTracker 创建广播子任务
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getAppendedContextData());
}
/**
* 上报状态给 TaskTracker
*
* @param status Task状态
* @param result 执行结果只有结束时才存在
* @param cmd 特殊需求比如广播执行需要创建广播任务
* @param cmd 特殊需求比如广播执行需要创建广播任务
*/
private void reportStatus(TaskStatus status, String result, Integer cmd) {
CommonUtils.easySleep(1);
private void reportStatus(TaskStatus status, String result, Integer cmd, Map<String, String> appendedWfContext) {
ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
req.setInstanceId(task.getInstanceId());
@ -174,6 +198,7 @@ public class ProcessorRunnable implements Runnable {
req.setResult(result);
req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd);
req.setAppendedWfContext(appendedWfContext);
// 最终结束状态要求可靠发送
if (TaskStatus.finishedStatus.contains(status.getValue())) {
@ -189,21 +214,25 @@ public class ProcessorRunnable implements Runnable {
}
@Override
@SuppressWarnings("squid:S2142")
public void run() {
// 切换线程上下文类加载器否则用的是 Worker 类加载器不存在容器类在序列化/反序列化时会报 ClassNotFoundException
Thread.currentThread().setContextClassLoader(classLoader);
try {
innerRun();
}catch (InterruptedException ignore) {
}catch (Throwable e) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null);
} catch (InterruptedException ignore) {
// ignore
} catch (Throwable e) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null, null);
log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
}finally {
} finally {
ThreadLocalStore.clear();
}
}
// 裁剪返回结果到合适的大小
/**
* 裁剪返回结果到合适的大小
*/
private String suit(String result) {
if (StringUtils.isEmpty(result)) {


@ -1,12 +1,16 @@
package com.github.kfcfans.powerjob.worker.core.processor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.github.kfcfans.powerjob.common.WorkflowContextConstant;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@ -16,6 +20,8 @@ import java.util.Map;
* 单机任务整个Job变成一个Task
* 广播任务整个job变成一堆一样的Task
* MR 任务被map出来的任务都视为根Task的子Task
* <p>
* 2021/02/04 the fetchUpstreamTaskResult method was removed
*
* @author tjq
* @since 2020/3/18
@ -23,14 +29,18 @@ import java.util.Map;
@Getter
@Setter
@ToString
@Slf4j
public class TaskContext {
private Long jobId;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private String taskName;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private String taskName;
/**
* 通过控制台传递的参数
*/
@ -38,7 +48,7 @@ public class TaskContext {
/**
* runtime parameters of the task instance
* if this task instance was triggered via OpenAPI, this holds the params passed by OpenAPI
* if this task is a node of a workflow, this holds the data passed down by upstream tasks; use {@link TaskContext#fetchUpstreamTaskResult()} to read it
* if this task is a node of a workflow, this holds the workflow instance's context ( wfContext )
*/
private String instanceParams;
/**
@ -56,29 +66,67 @@ public class TaskContext {
/**
* 在线日志记录
*/
@JsonIgnore
private OmsLogger omsLogger;
/**
* 用户自定义上下文
* 用户自定义上下文通过 {@link OhMyConfig} 初始化
*/
private Object userContext;
/**
* appended workflow context data
*/
private final Map<String,String> appendedContextData = Maps.newConcurrentMap();
/**
* Get the data passed down by upstream workflow tasks; only present when this task instance was triggered by a workflow
* @return key: jobId of the upstream task, value: ProcessResult#result of the upstream task
* Get the workflow context (a MAP); essentially parses instanceParams into a MAP
* The key of the initial params is {@link WorkflowContextConstant#CONTEXT_INIT_PARAMS_KEY}
* Note: when no initial params were passed, the value of CONTEXT_INIT_PARAMS_KEY is null
*
* @return the workflow context
* @author Echo009
* @since 2021/02/04
*/
@SuppressWarnings("rawtypes, unchecked")
public Map<Long, String> fetchUpstreamTaskResult() {
Map<Long, String> res = Maps.newHashMap();
if (StringUtils.isEmpty(instanceParams)) {
return res;
}
@SuppressWarnings({"rawtypes","unchecked"})
public Map<String, String> fetchWorkflowContext() {
Map<String, String> res = Maps.newHashMap();
try {
Map originMap = JsonUtils.parseObject(instanceParams, Map.class);
originMap.forEach((k, v) -> res.put(Long.valueOf(String.valueOf(k)), String.valueOf(v)));
originMap.forEach((k, v) -> res.put(String.valueOf(k), v == null ? null : String.valueOf(v)));
return res;
}catch (Exception ignore) {
} catch (Exception ignore) {
// ignore
}
return Maps.newHashMap();
}
/**
* append data to the workflow context
* note: if the key already exists in the current context, its value will be overwritten
*/
public synchronized void appendData2WfContext(String key, Object value) {
String finalValue;
try {
// first check whether the current appended context size exceeds the limit
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize();
if (appendedContextData.size() >= sizeThreshold) {
log.warn("[TaskContext-{}|{}|{}] appended workflow context data size must be less than {}, current appended workflow context data(key={}) will be ignored!", instanceId, taskId, taskName, sizeThreshold, key);
return;
}
finalValue = JsonUtils.toJSONStringUnsafe(value);
final int lengthThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
// check whether the key or the serialized value exceeds the length limit
if (key.length() > lengthThreshold || finalValue.length() > lengthThreshold) {
log.warn("[TaskContext-{}|{}|{}] appended workflow context data length must be shorter than {}, current appended workflow context data(key={}) will be ignored!", instanceId, taskId, taskName, lengthThreshold, key);
return;
}
} catch (Exception e) {
log.warn("[TaskContext-{}|{}|{}] fail to append data to workflow context, key : {}", instanceId, taskId, taskName, key);
return;
}
// reuse the value that has already been serialized above
appendedContextData.put(key, finalValue);
}
}


@ -13,6 +13,9 @@ public interface BasicProcessor {
/**
* 核心处理逻辑
* 可通过 {@link TaskContext#fetchWorkflowContext} 获取工作流上下文
* 可通过 {@link TaskContext#appendData2WfContext} 向工作流上下文中添加数据
*
* @param context 任务上下文可通过 jobParams instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数
* @return 处理结果msg有长度限制超长会被裁剪不允许返回 null
* @throws Exception 异常允许抛出异常但不推荐最好由业务开发者自己处理


@ -13,6 +13,7 @@ import com.github.kfcfans.powerjob.worker.core.ProcessorBeanFactory;
import com.github.kfcfans.powerjob.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.powerjob.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
@ -20,7 +21,6 @@ import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@ -40,43 +40,67 @@ import java.util.concurrent.*;
@Slf4j
public class ProcessorTracker {
// 记录创建时间
/**
* 记录创建时间
*/
private long startTime;
// 任务实例信息
/**
* 任务实例信息
*/
private InstanceInfo instanceInfo;
// 冗余 instanceId方便日志
/**
* 冗余 instanceId方便日志
*/
private Long instanceId;
// 任务执行器
/**
* 任务执行器
*/
private BasicProcessor processor;
// 容器可能为空
/**
* 容器可能为空
*/
private OmsContainer omsContainer;
// 在线日志
/**
* 在线日志
*/
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
/**
* ProcessResult 上报失败的重试队列
*/
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
// 上一次空闲时间用于闲置判定
/**
* 上一次空闲时间用于闲置判定
*/
private long lastIdleTime;
// 上次完成任务数量用于闲置判定
/**
* 上次完成任务数量用于闲置判定
*/
private long lastCompletedTaskCount;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
private ThreadPoolExecutor threadPool;
private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
// 长时间空闲的 ProcessorTracker 会发起销毁请求
/**
* 长时间空闲的 ProcessorTracker 会发起销毁请求
*/
private static final long MAX_IDLE_TIME = 120000;
// ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
/**
* ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
*/
private boolean lethal = false;
private String lethalReason;
/**
* 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T
*/
@SuppressWarnings("squid:S1181")
public ProcessorTracker(TaskTrackerStartTaskReq request) {
try {
// 赋值
@ -122,7 +146,14 @@ public class ProcessorTracker {
// 一旦 ProcessorTracker 出现异常所有提交到此处的任务直接返回失败防止形成死锁
// 死锁分析TT创建PTPT创建失败无法定期汇报心跳TT长时间未收到PT心跳认为PT宕机确实宕机了无法选择可用的PT再次派发任务死锁形成GG斯密达 T_T
if (lethal) {
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null);
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
.setInstanceId(instanceId)
.setSubInstanceId(newTask.getSubInstanceId())
.setTaskId(newTask.getTaskId())
.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
.setResult(lethalReason)
.setReportTime(System.currentTimeMillis());
taskTrackerActorRef.tell(report, null);
return;
}
@ -137,10 +168,10 @@ public class ProcessorTracker {
try {
threadPool.submit(processorRunnable);
success = true;
}catch (RejectedExecutionException ignore) {
} catch (RejectedExecutionException ignore) {
log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.",
instanceId, newTask.getTaskId(), newTask.getTaskName());
}catch (Exception e) {
} catch (Exception e) {
log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
}
@ -228,6 +259,7 @@ public class ProcessorTracker {
private class CheckerAndReporter implements Runnable {
@Override
@SuppressWarnings({"squid:S1066","squid:S3776"})
public void run() {
// 超时检查如果超时则自动关闭 TaskTracker
@ -245,10 +277,10 @@ public class ProcessorTracker {
if (threadPool.getActiveCount() > 0 || threadPool.getCompletedTaskCount() > lastCompletedTaskCount) {
lastIdleTime = -1;
lastCompletedTaskCount = threadPool.getCompletedTaskCount();
}else {
} else {
if (lastIdleTime == -1) {
lastIdleTime = System.currentTimeMillis();
}else {
} else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
@ -296,7 +328,7 @@ public class ProcessorTracker {
if (SpringUtils.supportSpringBean()) {
try {
processor = SpringUtils.getBean(processorInfo);
}catch (Exception e) {
} catch (Exception e) {
log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, ExceptionUtils.getMessage(e));
}
}
@ -318,7 +350,7 @@ public class ProcessorTracker {
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}else {
} else {
log.warn("[ProcessorTracker-{}] load container failed.", instanceId);
}
break;


@ -33,12 +33,20 @@ import java.util.concurrent.*;
@ToString
public class CommonTaskTracker extends TaskTracker {
private static final String ROOT_TASK_ID = "0";
// 可以是除 ROOT_TASK_ID 的任何数字
private static final String LAST_TASK_ID = "1111";
// 连续上报多次失败后放弃上报视为结果不可达TaskTracker down
/**
* 连续上报多次失败后放弃上报视为结果不可达TaskTracker down
*/
private int reportFailedCnt = 0;
/**
* 根任务 ID
*/
public static final String ROOT_TASK_ID = "0";
/**
* 最后一个任务 ID
* {@link #ROOT_TASK_ID} 外任何数都可以
*/
public static final String LAST_TASK_ID = "9999";
private static final int MAX_REPORT_FAILED_THRESHOLD = 5;
protected CommonTaskTracker(ServerScheduleJobReq req) {
@ -131,6 +139,7 @@ public class CommonTaskTracker extends TaskTracker {
private static final long DISPATCH_TIME_OUT_MS = 15000;
@SuppressWarnings("squid:S3776")
private void innerRun() {
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
@ -144,7 +153,6 @@ public class CommonTaskTracker extends TaskTracker {
req.setJobId(instanceInfo.getJobId());
req.setInstanceId(instanceId);
req.setWfInstanceId(instanceInfo.getWfInstanceId());
req.setTotalTaskNum(finishedNum + unfinishedNum);
req.setSucceedTaskNum(holder.succeedNum);
req.setFailedTaskNum(holder.failedNum);
@ -161,7 +169,6 @@ public class CommonTaskTracker extends TaskTracker {
// 数据库中一个任务都没有说明根任务创建失败该任务实例失败
if (finishedNum == 0) {
finished.set(true);
success = false;
result = SystemInstanceResult.TASK_INIT_FAILED;
}else {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
@ -231,6 +238,8 @@ public class CommonTaskTracker extends TaskTracker {
if (finished.get()) {
req.setResult(result);
// report the appended workflow context data
req.setAppendedWfContext(appendedWfContext);
req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
CompletionStage<Object> askCS = Patterns.ask(serverActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));


@ -29,6 +29,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -36,6 +37,7 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -52,24 +54,48 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public abstract class TaskTracker {
// TaskTracker创建时间
protected long createTime;
// 任务实例ID使用频率过高 InstanceInfo 提取出来单独保存一份
protected long instanceId;
// 任务实例信息
protected InstanceInfo instanceInfo;
// ProcessTracker 状态管理
protected ProcessorTrackerStatusHolder ptStatusHolder;
// 数据库持久化服务
protected TaskPersistenceService taskPersistenceService;
// 定时任务线程池
/**
* TaskTracker创建时间
*/
protected final long createTime;
/**
* 任务实例ID使用频率过高 InstanceInfo 提取出来单独保存一份
*/
protected final long instanceId;
/**
* 任务实例信息
*/
protected final InstanceInfo instanceInfo;
/**
* ProcessTracker 状态管理
*/
protected final ProcessorTrackerStatusHolder ptStatusHolder;
/**
* 数据库持久化服务
*/
protected final TaskPersistenceService taskPersistenceService;
/**
* 定时任务线程池
*/
protected ScheduledExecutorService scheduledPool;
// 是否结束
protected AtomicBoolean finished;
// 上报时间缓存
/**
* 是否结束
*/
protected final AtomicBoolean finished;
/**
* appended workflow context data
*
* @since 2021/02/05
*/
protected final Map<String, String> appendedWfContext;
/**
* 上报时间缓存
*/
private final Cache<String, Long> taskId2LastReportTime;
// 分段锁
/**
* 分段锁
*/
private final SegmentLock segmentLock;
private static final int UPDATE_CONCURRENCY = 4;
@ -93,7 +119,8 @@ public abstract class TaskTracker {
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
this.taskPersistenceService = TaskPersistenceService.INSTANCE;
this.finished = new AtomicBoolean(false);
// only tasks that belong to a workflow are allowed to append workflow context data
this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
// 构建缓存
taskId2LastReportTime = CacheBuilder.newBuilder().maximumSize(1024).build();
@ -108,6 +135,7 @@ public abstract class TaskTracker {
/**
* 静态方法创建 TaskTracker
*
* @param req 服务端调度任务请求
* @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker
*/
@ -116,8 +144,10 @@ public abstract class TaskTracker {
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
switch (timeExpressionType) {
case FIXED_RATE:
case FIXED_DELAY:return new FrequentTaskTracker(req);
default:return new CommonTaskTracker(req);
case FIXED_DELAY:
return new FrequentTaskTracker(req);
default:
return new CommonTaskTracker(req);
}
} catch (Exception e) {
log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e);
@ -139,15 +169,45 @@ public abstract class TaskTracker {
}
/* *************************** 对外方法区 *************************** */
/**
* update the appended workflow context data
*
* @param newAppendedWfContext newly appended workflow context data
* @since 2021/02/05
*/
public void updateAppendedWfContext(Map<String, String> newAppendedWfContext) {
// check
if (instanceInfo.getWfInstanceId() == null || CollectionUtils.isEmpty(newAppendedWfContext)) {
// only tasks inside a workflow need to store this data
return;
}
// first check whether the current appended context size exceeds the limit
final int sizeThreshold = OhMyWorker.getConfig().getMaxAppendedWfContextSize();
for (Map.Entry<String, String> entry : newAppendedWfContext.entrySet()) {
if (appendedWfContext.size() >= sizeThreshold) {
log.warn("[TaskTracker-{}] current size of appended workflow context data is greater than {}, the rest of appended workflow context data will be ignore! ", instanceInfo.getInstanceId(), sizeThreshold);
break;
}
String originValue = appendedWfContext.put(entry.getKey(), entry.getValue());
log.info("[TaskTracker-{}] update appended workflow context data {} : {} -> {}", instanceInfo.getInstanceId(), entry.getKey(), originValue, entry.getValue());
}
}
/**
* 更新Task状态
* V1.0.0 -> V1.0.1 (e405e283ad7f97b0b4e5d369c7de884c0caf9192) 锁方案变更 synchronized (taskId.intern()) 修改为分段锁能大大减少内存占用损失的只有理论并发度而已
*
* @param subInstanceId 子任务实例ID
* @param taskId task的IDtask为任务实例的执行单位
* @param newStatus task的新状态
* @param reportTime 上报时间
* @param result task的执行结果未执行完成时为空
* @param taskId task的IDtask为任务实例的执行单位
* @param newStatus task的新状态
* @param reportTime 上报时间
* @param result task的执行结果未执行完成时为空
*/
@SuppressWarnings({"squid:S3776","squid:S2142"})
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
@ -168,7 +228,7 @@ public abstract class TaskTracker {
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
if (taskOpt.isPresent()) {
lastReportTime = taskOpt.get().getLastReportTime();
}else {
} else {
// 理论上不存在这种情况除非数据库异常
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
}
@ -235,6 +295,7 @@ public abstract class TaskTracker {
}
} catch (InterruptedException ignore) {
// ignore
} catch (Exception e) {
log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
} finally {
@ -244,6 +305,7 @@ public abstract class TaskTracker {
/**
* 提交Task任务(MapReduce的MapBroadcast的广播)上层保证 batchSize同时插入过多数据可能导致失败
*
* @param newTaskList 新增的子任务列表
*/
public boolean submitTask(List<TaskDO> newTaskList) {
@ -269,6 +331,7 @@ public abstract class TaskTracker {
/**
* 处理 ProcessorTracker 的心跳信息
*
* @param heartbeatReq ProcessorTracker任务的执行管理器发来的心跳包包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
@ -290,10 +353,11 @@ public abstract class TaskTracker {
/**
* 生成广播任务
*
* @param preExecuteSuccess 预执行广播任务运行状态
* @param subInstanceId 子实例ID
* @param preTaskId 预执行广播任务的taskId
* @param result 预执行广播任务的结果
* @param subInstanceId 子实例ID
* @param preTaskId 预执行广播任务的taskId
* @param result 预执行广播任务的结果
*/
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
@ -317,7 +381,7 @@ public abstract class TaskTracker {
subTaskList.add(subTask);
}
submitTask(subTaskList);
}else {
} else {
log.warn("[TaskTracker-{}-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, subInstanceId, result);
}
}
@ -334,7 +398,6 @@ public abstract class TaskTracker {
scheduledPool.shutdown();
// 1. 通知 ProcessorTracker 释放资源
Long instanceId = instanceInfo.getInstanceId();
TaskTrackerStopInstanceReq stopRequest = new TaskTrackerStopInstanceReq();
stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
@ -348,7 +411,7 @@ public abstract class TaskTracker {
boolean dbSuccess = taskPersistenceService.deleteAllTasks(instanceId);
if (!dbSuccess) {
log.error("[TaskTracker-{}] delete tasks from database failed.", instanceId);
}else {
} else {
log.debug("[TaskTracker-{}] delete all tasks from database successfully.", instanceId);
}
@ -368,7 +431,8 @@ public abstract class TaskTracker {
/**
* 派发任务到 ProcessorTracker
* @param task 需要被执行的任务
*
* @param task 需要被执行的任务
* @param processorTrackerAddress ProcessorTracker的地址IP:Port
*/
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {
@ -400,6 +464,7 @@ public abstract class TaskTracker {
/**
* 获取任务实例产生的各个Task状态用于分析任务实例执行情况
*
* @param subInstanceId 子任务实例ID
* @return InstanceStatisticsHolder
*/
@ -418,7 +483,6 @@ public abstract class TaskTracker {
}
/**
* 定时扫描数据库中的task出于内存占用量考虑每次最多获取100个并将需要执行的任务派发出去
*/
@ -435,7 +499,6 @@ public abstract class TaskTracker {
}
Stopwatch stopwatch = Stopwatch.createStarted();
Long instanceId = instanceInfo.getInstanceId();
// 1. 获取可以派发任务的 ProcessorTracker
List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();
@ -502,7 +565,7 @@ public abstract class TaskTracker {
log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address);
}
});
}catch (Exception e) {
} catch (Exception e) {
log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
}
}
@ -531,13 +594,15 @@ public abstract class TaskTracker {
/**
* 初始化 TaskTracker
*
* @param req 服务器调度任务实例运行请求
*/
abstract protected void initTaskTracker(ServerScheduleJobReq req);
protected abstract void initTaskTracker(ServerScheduleJobReq req);
/**
* 查询任务实例的详细运行状态
*
* @return 任务实例的详细运行状态
*/
abstract public InstanceDetail fetchRunningStatus();
public abstract InstanceDetail fetchRunningStatus();
}


@ -16,29 +16,55 @@ import org.springframework.util.StringUtils;
@Setter
public class TaskDO {
// 层次命名法可以表示 Map 后的父子关系 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task
/**
* 层次命名法可以表示 Map 后的父子关系 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task
*/
private String taskId;
/**
* 任务实例 ID
*/
private Long instanceId;
// 秒级任务专用
/**
* 秒级任务专用
* 对于普通任务而言 等于 instanceId
* 对于秒级固定频率任务 自增长
*/
private Long subInstanceId;
// 任务名称
/**
* 任务名称
*/
private String taskName;
// 任务对象序列化后的二进制数据
/**
* 任务对象序列化后的二进制数据
*/
private byte[] taskContent;
// 对于TaskTracker为workerAddress派发地址对于普通Worker为TaskTrackerAddress汇报地址所有地址都是 IP:Port
/**
* 对于TaskTracker为workerAddress派发地址对于普通Worker为TaskTrackerAddress汇报地址所有地址都是 IP:Port
*/
private String address;
// 任务状态010代表 JobTracker 使用1120代表普通Worker使用
/**
* 任务状态010代表 JobTracker 使用1120代表普通Worker使用
*/
private Integer status;
// 执行结果
/**
* 执行结果
*/
private String result;
// 失败次数
/**
* 失败次数
*/
private Integer failedCnt;
// 创建时间
/**
* 创建时间
*/
private Long createdTime;
// 最后修改时间
/**
* 最后修改时间
*/
private Long lastModifiedTime;
// ProcessorTracker 最后上报时间
/**
* ProcessorTracker 最后上报时间
*/
private Long lastReportTime;
public String getUpdateSQL() {


@ -1,9 +1,10 @@
package com.github.kfcfans.powerjob.worker.pojo.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.*;
import lombok.experimental.Accessors;
import java.util.Map;
/**
@ -13,14 +14,16 @@ import lombok.NoArgsConstructor;
* @since 2020/3/17
*/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorReportTaskStatusReq implements OmsSerializable {
public static final Integer BROADCAST = 1;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private int status;
@ -28,10 +31,17 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable {
* 执行完成时才有
*/
private String result;
// 上报时间
/**
* 上报时间
*/
private long reportTime;
// 特殊请求名称
/**
* 特殊请求名称
*/
private Integer cmd;
/**
* appended workflow context data
* @since 2021/02/05
*/
private Map<String,String> appendedWfContext;
}