From fafb708c7a93e17e56d07ebf4032a62aa4dd4aa7 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 18 Mar 2020 20:11:26 +0800 Subject: [PATCH] finished processor api design --- oh-my-scheduler-common/pom.xml | 12 +++ .../kfcfans/common/JobInstanceStatus.java | 21 +++++ .../common}/request/ServerScheduleJobReq.java | 2 +- .../TaskTrackerReportInstanceStatusReq.java | 26 ++++++ .../kfcfans/common/utils/CommonUtils.java | 38 ++++++++ oh-my-scheduler-worker/pom.xml | 17 ++-- .../oms/worker/actors/JobTrackerActor.java | 2 +- .../oms/worker/common/ThreadLocalStore.java | 16 ++++ .../worker/common/constants/AkkaConstant.java | 2 +- .../worker/common/constants/TaskStatus.java | 4 +- .../worker/persistence/SimpleTaskQuery.java | 13 ++- .../oms/worker/persistence/TaskDAO.java | 3 + .../oms/worker/persistence/TaskDAOImpl.java | 49 ++++++++++- .../persistence/TaskPersistenceService.java | 22 +++++ .../pojo/request/WorkerMapTaskRequest.java | 43 +++++++++ .../worker/pojo/response/MapTaskResponse.java | 14 +++ .../kfcfans/oms/worker/sdk/ProcessResult.java | 23 +++++ .../kfcfans/oms/worker/sdk/TaskContext.java | 32 +++++++ .../oms/worker/sdk/api/BasicProcessor.java | 16 ++++ .../worker/sdk/api/BroadcastProcessor.java | 21 +++++ .../worker/sdk/api/MapReduceProcessor.java | 75 ++++++++++++++++ .../worker/tracker/BroadcastTaskTracker.java | 8 ++ .../worker/tracker/MapReduceTaskTracker.java | 7 ++ .../worker/tracker/StandaloneTaskTracker.java | 8 ++ .../oms/worker/tracker/TaskTracker.java | 87 ++++++++++++------- 25 files changed, 509 insertions(+), 52 deletions(-) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java rename {oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo => oh-my-scheduler-common/src/main/java/com/github/kfcfans/common}/request/ServerScheduleJobReq.java (95%) create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java create mode 100644 oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/response/MapTaskResponse.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml index f3a9cae9..361424dd 100644 --- a/oh-my-scheduler-common/pom.xml +++ b/oh-my-scheduler-common/pom.xml @@ -13,5 +13,17 @@ 1.0.0-SNAPSHOT jar + + 1.7.30 + + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + \ No newline at end of file diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java new file mode 100644 index 00000000..61d1f18d --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/JobInstanceStatus.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * description + * + * @author tjq + * @since 2020/3/17 + */ +@Getter +@AllArgsConstructor +public enum JobInstanceStatus { + RUNNING(1, "运行中"), + SUCCEED(2, "运行成功"), + FAILED(3, "运行失败"); + + private int value; + private String des; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java similarity index 95% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java rename to oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java index f9cf1652..812a3ab8 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/ServerScheduleJobReq.java @@ -1,4 +1,4 @@ -package com.github.kfcfans.oms.worker.pojo.request; +package com.github.kfcfans.common.request; import lombok.Data; diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java new file mode 100644 index 00000000..50ca11ae --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/TaskTrackerReportInstanceStatusReq.java @@ -0,0 +1,26 @@ +package com.github.kfcfans.common.request; + +import lombok.Data; + +/** + * TaskTracker 将状态上报给服务器 + * + * @author tjq + * @since 2020/3/17 + */ +@Data +public class TaskTrackerReportInstanceStatusReq { + + private String jobId; + private String instanceId; + + private int instanceStatus; + + private String result; + + /* ********* 统计信息 ********* */ + private long totalTaskNum; + private long runningTaskNum; + private long succeedTaskNum; + private long failedTaskNum; +} diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java new file mode 100644 index 00000000..b174fffe --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/utils/CommonUtils.java @@ -0,0 +1,38 @@ +package com.github.kfcfans.common.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Supplier; + +/** + * 公共工具类 + * + * @author tjq + * @since 2020/3/18 + */ +@Slf4j +public class CommonUtils { + + /** + * 重试执行,仅适用于失败抛出异常的方法 + * @param executor 需要执行的方法 + * @param retryTimes 重试的次数 + * @param intervalMS 失败后下一次执行的间隔时间 + * @return 函数成功执行后的返回值 + * @throws Exception 执行失败,调用方自行处理 + */ + public static T executeWithRetry(Supplier executor, int retryTimes, long intervalMS) throws Exception { + if (retryTimes <= 1 || intervalMS <= 0) { + return executor.get(); + } + for (int i = 1; i < retryTimes; i++) { + try { + return executor.get(); + }catch (Exception e) { + log.warn("[CommonUtils] executeWithRetry failed, system will retry after {}ms.", intervalMS, e); + Thread.sleep(intervalMS); + } + } + return executor.get(); + } +} diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml index adaf49cc..222b912e 100644 --- a/oh-my-scheduler-worker/pom.xml +++ b/oh-my-scheduler-worker/pom.xml @@ -16,11 +16,11 @@ 5.2.4.RELEASE 2.6.4 - 1.7.30 1.0.0-SNAPSHOT 1.4.200 3.4.2 28.2-jre + 1.2.58 @@ -40,13 +40,6 @@ ${akka.version} - - - org.slf4j - slf4j-api - 1.7.30 - - com.github.kfcfans @@ -74,6 +67,14 @@ ${guava.version} + + + com.alibaba + fastjson + ${fastjson.version} + + + ch.qos.logback diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java index 7bd0cd50..534cf306 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java @@ -1,7 +1,7 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; -import com.github.kfcfans.oms.worker.pojo.request.ServerScheduleJobReq; +import com.github.kfcfans.common.request.ServerScheduleJobReq; import lombok.extern.slf4j.Slf4j; /** diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java new file mode 100644 index 00000000..439e2086 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/ThreadLocalStore.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.oms.worker.common; + +import com.github.kfcfans.oms.worker.sdk.TaskContext; + +/** + * 存储一些不方便直接传递的东西 + * #attention:警惕内存泄漏问题,最好在 ProcessorTracker destroy 时,执行 remove + * + * @author tjq + * @since 2020/3/18 + */ +public class ThreadLocalStore { + + public static final ThreadLocal TASK_CONTEXT_THREAD_LOCAL = new ThreadLocal<>(); + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java index b81ce7b5..c2caa1fb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java @@ -13,7 +13,7 @@ public class AkkaConstant { */ public static final String ACTOR_SYSTEM_NAME = "oms"; - public static final String JOB_TRACKER_ACTOR_NAME = "job_tracker"; + public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java index e337f59b..23dbc984 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java @@ -15,7 +15,7 @@ public enum TaskStatus { /* ******************* TaskTracker 专用 ******************* */ WAITING_DISPATCH(1, "等待调度器调度"), - DISPATCH_SUCCESS(2, "调度成功(但不保证worker收到)"), + DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功(但不保证worker收到)"), WORKER_PROCESSING(3, "worker开始执行"), WORKER_PROCESS_SUCCESS(4, "worker执行成功"), WORKER_PROCESS_FAILED(5, "worker执行失败"), @@ -32,7 +32,7 @@ public enum TaskStatus { public static TaskStatus of(int v) { switch (v) { case 1: return WAITING_DISPATCH; - case 2: return DISPATCH_SUCCESS; + case 2: return DISPATCH_SUCCESS_WORKER_UNCHECK; case 3: return WORKER_PROCESSING; case 4: return WORKER_PROCESS_SUCCESS; case 5: return WORKER_PROCESS_FAILED; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java index f2ff4399..351a88f0 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java @@ -22,11 +22,16 @@ public class SimpleTaskQuery { private Integer status; // 自定义的查询条件(where 后面的语句),如 crated_time > 10086 and status = 3 - private String conditionSQL; + private String queryCondition; + // 自定义的查询条件,如 GROUP BY status + private String otherCondition; + + // 查询内容,默认为 * + private String queryContent = " * "; private Integer limit; - public String getConditionSQL() { + public String getQueryCondition() { StringBuilder sb = new StringBuilder(); if (!StringUtils.isEmpty(taskId)) { sb.append("task_id = '").append(taskId).append("'").append(LINK); @@ -47,8 +52,8 @@ public class SimpleTaskQuery { sb.append("status = ").append(status).append(LINK); } - if (!StringUtils.isEmpty(conditionSQL)) { - sb.append(conditionSQL).append(LINK); + if (!StringUtils.isEmpty(queryCondition)) { + sb.append(queryCondition).append(LINK); } String substring = sb.substring(0, sb.length() - LINK.length()); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index f140d22c..7d178dc6 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker.persistence; import java.util.Collection; import java.util.List; +import java.util.Map; /** * 任务持久化接口 @@ -31,6 +32,8 @@ public interface TaskDAO { List simpleQuery(SimpleTaskQuery query); + List> simpleQueryPlus(SimpleTaskQuery query); + boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index 01fd8d64..e33c42d5 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -2,12 +2,13 @@ package com.github.kfcfans.oms.worker.persistence; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import java.sql.*; import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Map; /** * 任务持久化实现层,表名:task_info @@ -22,7 +23,7 @@ public class TaskDAOImpl implements TaskDAO { public boolean initTable() { String delTableSQL = "drop table if exists task_info"; - String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))"; + String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))"; try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); @@ -99,7 +100,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public List simpleQuery(SimpleTaskQuery query) { ResultSet rs = null; - String sql = "select * from task_info where " + query.getConditionSQL(); + String sql = "select * from task_info where " + query.getQueryCondition(); List result = Lists.newLinkedList(); try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { rs = ps.executeQuery(); @@ -119,10 +120,43 @@ public class TaskDAOImpl implements TaskDAO { return result; } + @Override + public List> simpleQueryPlus(SimpleTaskQuery query) { + ResultSet rs = null; + String sqlFormat = "select %s from task_info where %s"; + String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition()); + List> result = Lists.newLinkedList(); + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + rs = ps.executeQuery(); + // 原数据,包含了列名 + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + Map row = Maps.newHashMap(); + result.add(row); + + for (int i = 0; i < metaData.getColumnCount(); i++) { + String colName = metaData.getColumnName(i + 1); + Object colValue = rs.getObject(colName); + row.put(colName, colValue); + } + } + }catch (Exception e) { + log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e); + }finally { + if (rs != null) { + try { + rs.close(); + }catch (Exception ignore) { + } + } + } + return result; + } + @Override public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) { String sqlFormat = "update task_info set %s where %s"; - String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getConditionSQL()); + String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { return stat.execute(); }catch (Exception e) { @@ -193,6 +227,13 @@ public class TaskDAOImpl implements TaskDAO { final List res2 = taskDAO.simpleQuery(query); System.out.println(res2); + SimpleTaskQuery query3 = new SimpleTaskQuery(); + query.setInstanceId("22"); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + List> dbRES = taskDAO.simpleQueryPlus(query); + System.out.println(dbRES); + Thread.sleep(100000); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index d5f458ff..0869c0b1 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -3,9 +3,11 @@ package com.github.kfcfans.oms.worker.persistence; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.Map; /** * 任务持久化服务 @@ -67,4 +69,24 @@ public class TaskPersistenceService { updateEntity.setStatus(status.getValue()); return taskDAO.simpleUpdate(condition, updateEntity); } + + /** + * 获取 TaskTracker 管理的子 task 状态统计信息 + * TaskStatus -> num + */ + public Map getTaskStatusStatistics(String instanceId) { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + List> dbRES = taskDAO.simpleQueryPlus(query); + + Map result = Maps.newHashMap(); + dbRES.forEach(row -> { + int status = Integer.parseInt(String.valueOf(row.get("status"))); + long num = Long.parseLong(String.valueOf(row.get("num"))); + result.put(TaskStatus.of(status), num); + }); + return result; + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java index 09660aca..c562936f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java @@ -1,10 +1,53 @@ package com.github.kfcfans.oms.worker.pojo.request; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.github.kfcfans.oms.worker.common.constants.TaskConstant; +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.google.common.collect.Lists; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.util.List; + /** * WorkerMapTaskRequest * * @author tjq * @since 2020/3/17 */ +@Getter +@NoArgsConstructor public class WorkerMapTaskRequest { + + private String instanceId; + private String jobId; + + private String taskName; + private List subTasks; + + @NoArgsConstructor + @AllArgsConstructor + private static class SubTask { + private String taskId; + private byte[] taskContent; + } + + public WorkerMapTaskRequest(TaskContext taskContext, List subTaskList, String taskName) { + + this.instanceId = taskContext.getInstanceId(); + this.jobId = taskContext.getJobId(); + this.taskName = taskName; + this.subTasks = Lists.newLinkedList(); + + for (int i = 0; i < subTaskList.size(); i++) { + // 不同执行线程之间,前缀(taskId)不同,该ID可以保证分布式唯一 + String subTaskId = taskContext.getTaskId() + "." + i; + // 写入类名,方便反序列化 + byte[] content = JSON.toJSONBytes(subTaskList.get(i), SerializerFeature.WriteClassName); + subTasks.add(new SubTask(subTaskId, content)); + } + + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/response/MapTaskResponse.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/response/MapTaskResponse.java new file mode 100644 index 00000000..4b28aab0 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/response/MapTaskResponse.java @@ -0,0 +1,14 @@ +package com.github.kfcfans.oms.worker.pojo.response; + +import lombok.Data; + +/** + * WorkerMapTaskRequest 的响应 + * + * @author tjq + * @since 2020/3/18 + */ +@Data +public class MapTaskResponse { + private boolean success; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java new file mode 100644 index 00000000..594dd788 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/ProcessResult.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.oms.worker.sdk; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * processor执行结果 + * + * @author tjq + * @since 2020/3/18 + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class ProcessResult { + + private boolean success = false; + private String msg; + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java new file mode 100644 index 00000000..8a7f2288 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/TaskContext.java @@ -0,0 +1,32 @@ +package com.github.kfcfans.oms.worker.sdk; + +import lombok.Data; + +/** + * 任务上下文 + * 概念统一,所有的worker只处理Task,Job和JobInstance的概念只存在于Server和TaskTracker + * 单机任务 -> 整个Job变成一个Task + * 广播任务 -> 整个jOb变成一堆一样的Task + * MR 任务 -> 被map出来的任务都视为根Task的子Task + * + * @author tjq + * @since 2020/3/18 + */ +@Data +public class TaskContext { + + private String jobId; + private String instanceId; + private String taskId; + private String taskName; + + private String jobParams; + private String instanceParams; + + private int maxRetryTimes; + private int currentRetryTimes; + + private Object subTask; + + private String taskTrackerAddress; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java new file mode 100644 index 00000000..1e65d378 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BasicProcessor.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.oms.worker.sdk.api; + +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.sdk.ProcessResult; + +/** + * 基础的处理器,适用于单机执行 + * + * @author tjq + * @since 2020/3/18 + */ +public interface BasicProcessor { + + ProcessResult process(TaskContext context); + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java new file mode 100644 index 00000000..78fe6b6a --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/BroadcastProcessor.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.oms.worker.sdk.api; + +import com.github.kfcfans.oms.worker.sdk.ProcessResult; + +/** + * 广播执行处理器,适用于广播执行 + * + * @author tjq + * @since 2020/3/18 + */ +public interface BroadcastProcessor extends BasicProcessor { + + /** + * 在所有节点广播执行前执行,只会在一台机器执行一次 + */ + ProcessResult preProcess(); + /** + * 在所有节点广播执行完成后执行,只会在一台机器执行一次 + */ + ProcessResult postProcess(); +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java new file mode 100644 index 00000000..49856f68 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/sdk/api/MapReduceProcessor.java @@ -0,0 +1,75 @@ +package com.github.kfcfans.oms.worker.sdk.api; + +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.ThreadLocalStore; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest; +import com.github.kfcfans.oms.worker.pojo.response.MapTaskResponse; +import com.github.kfcfans.oms.worker.sdk.TaskContext; +import com.github.kfcfans.oms.worker.sdk.ProcessResult; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +/** + * MapReduce执行处理器,适用于MapReduce任务 + * + * @author tjq + * @since 2020/3/18 + */ +@Slf4j +public abstract class MapReduceProcessor implements BasicProcessor { + + private static final int RECOMMEND_BATCH_SIZE = 200; + private static final int REQUEST_TIMEOUT_MS = 5000; + + /** + * 分发子任务 + * @param taskList 子任务,再次执行时可通过 TaskContext#getSubTask 获取 + * @param taskName 子任务名称,作用不大 + * @return map结果 + */ + public ProcessResult map(List taskList, String taskName) { + + if (CollectionUtils.isEmpty(taskList)) { + return new ProcessResult(false, "taskList can't be null"); + } + + if (taskList.size() > RECOMMEND_BATCH_SIZE) { + log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks."); + } + + TaskContext taskContext = ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.get(); + + // 1. 构造请求 + WorkerMapTaskRequest req = new WorkerMapTaskRequest(taskContext, taskList, taskName); + + // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) + boolean requestSucceed = false; + try { + String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskContext.getTaskTrackerAddress(), AkkaConstant.Task_TRACKER_ACTOR_NAME); + ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); + MapTaskResponse respObj = (MapTaskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + requestSucceed = respObj.isSuccess(); + }catch (Exception e) { + log.warn("[MapReduceProcessor] map failed.", e); + } + + if (requestSucceed) { + return new ProcessResult(true, "MAP_SUCCESS"); + }else { + return new ProcessResult(false, "MAP_FAILED"); + } + } + + public abstract ProcessResult reduce(TaskContext taskContext, Map taskId2Result); +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java index 6432bc1e..3904f514 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java @@ -1,5 +1,8 @@ package com.github.kfcfans.oms.worker.tracker; +import akka.actor.ActorRef; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; + /** * 广播任务使用的 TaskTracker * @@ -8,6 +11,11 @@ package com.github.kfcfans.oms.worker.tracker; */ public class BroadcastTaskTracker extends TaskTracker { + + public BroadcastTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + super(jobInstanceInfo, taskTrackerActorRef); + } + @Override public void dispatch() { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java index 3de1b3b9..bf5b16cb 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java @@ -1,5 +1,7 @@ package com.github.kfcfans.oms.worker.tracker; +import akka.actor.ActorRef; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest; @@ -11,6 +13,11 @@ import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest; */ public class MapReduceTaskTracker extends StandaloneTaskTracker { + + public MapReduceTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + super(jobInstanceInfo, taskTrackerActorRef); + } + public void newTask(WorkerMapTaskRequest mapRequest) { } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java index 70770a8b..c9dc9ad7 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java @@ -1,5 +1,8 @@ package com.github.kfcfans.oms.worker.tracker; +import akka.actor.ActorRef; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; + /** * 单机任务使用的 TaskTracker * @@ -8,6 +11,11 @@ package com.github.kfcfans.oms.worker.tracker; */ public class StandaloneTaskTracker extends TaskTracker { + + public StandaloneTaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + super(jobInstanceInfo, taskTrackerActorRef); + } + @Override public void dispatch() { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java index f4473da1..6e3020ab 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java @@ -3,6 +3,8 @@ package com.github.kfcfans.oms.worker.tracker; import akka.actor.ActorRef; import akka.actor.ActorSelection; import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.common.JobInstanceStatus; +import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; import com.github.kfcfans.oms.worker.common.constants.CommonSJ; @@ -21,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; import java.util.List; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -43,14 +46,7 @@ public abstract class TaskTracker { protected TaskPersistenceService taskPersistenceService; protected ScheduledExecutorService scheduledPool; - // 统计 protected AtomicBoolean finished = new AtomicBoolean(false); - protected AtomicLong needDispatchTaskNum = new AtomicLong(0); - protected AtomicLong dispatchedTaskNum = new AtomicLong(0); - protected AtomicLong waitingToRunTaskNum = new AtomicLong(0); - protected AtomicLong runningTaskNum = new AtomicLong(0); - protected AtomicLong successTaskNum = new AtomicLong(0); - protected AtomicLong failedTaskNum = new AtomicLong(0); public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { @@ -62,6 +58,15 @@ public abstract class TaskTracker { this.scheduledPool = Executors.newScheduledThreadPool(2, factory); allWorkerAddress = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress()); + + // 持久化根任务 + persistenceRootTask(); + + // 定时任务1:任务派发 + scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 5, TimeUnit.SECONDS); + + // 定时任务2:状态检查 + scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); } @@ -70,16 +75,17 @@ public abstract class TaskTracker { */ public abstract void dispatch(); - public void updateTaskStatus(WorkerReportTaskStatusReq statusReportRequest) { - TaskStatus taskStatus = TaskStatus.of(statusReportRequest.getStatus()); - // 持久化 - - // 更新统计数据 - switch (taskStatus) { - case RECEIVE_SUCCESS: - waitingToRunTaskNum.incrementAndGet();break; - case PROCESSING: + public void updateTaskStatus(WorkerReportTaskStatusReq req) { + TaskStatus taskStatus = TaskStatus.of(req.getStatus()); + // 持久化,失败则重试一次(本地数据库操作几乎可以认为可靠...吧...) + boolean updateResult = taskPersistenceService.updateTaskStatus(req.getInstanceId(), req.getTaskId(), taskStatus); + if (!updateResult) { + try { + Thread.sleep(100); + taskPersistenceService.updateTaskStatus(req.getInstanceId(), req.getTaskId(), taskStatus); + }catch (Exception ignore) { + } } } @@ -90,7 +96,7 @@ public abstract class TaskTracker { /** * 持久化根任务,只有完成持久化才能视为任务开始running(先持久化,再报告server) */ - private void persistenceTask() { + private void persistenceRootTask() { ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType()); boolean persistenceResult; @@ -109,7 +115,6 @@ public abstract class TaskTracker { rootTask.setCreatedTime(System.currentTimeMillis()); persistenceResult = taskPersistenceService.save(rootTask); - needDispatchTaskNum.incrementAndGet(); }else { List taskList = Lists.newLinkedList(); List addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress()); @@ -128,7 +133,6 @@ public abstract class TaskTracker { taskList.add(task); } persistenceResult = taskPersistenceService.batchSave(taskList); - needDispatchTaskNum.addAndGet(taskList.size()); } if (!persistenceResult) { @@ -136,12 +140,6 @@ public abstract class TaskTracker { } } - /** - * 启动任务分发器 - */ - private void initDispatcher() { - - } public void destroy() { scheduledPool.shutdown(); @@ -171,12 +169,7 @@ public abstract class TaskTracker { targetActor.tell(req, taskTrackerActorRef); // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) - taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS); - - // 更新统计数据 - needDispatchTaskNum.decrementAndGet(); - dispatchedTaskNum.incrementAndGet(); - + taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK); }catch (Exception e) { // 调度失败,不修改数据库,下次重新随机派发给 remote actor log.warn("[TaskTracker] dispatch task({}) failed.", task); @@ -193,6 +186,38 @@ public abstract class TaskTracker { @Override public void run() { + // 1. 查询统计信息 + Map status2Num = taskPersistenceService.getTaskStatusStatistics(jobInstanceInfo.getInstanceId()); + + long waitingDispatchNum = status2Num.get(TaskStatus.WAITING_DISPATCH); + long workerUnreceivedNum = status2Num.get(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK); + long receivedNum = status2Num.get(TaskStatus.RECEIVE_SUCCESS); + long succeedNum = status2Num.get(TaskStatus.WORKER_PROCESS_SUCCESS); + long failedNum = status2Num.get(TaskStatus.WORKER_PROCESS_FAILED); + + long finishedNum = succeedNum + failedNum; + long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum; + + log.debug("[TaskTracker] status check result({})", status2Num); + + TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); + + // 2. 如果未完成任务数为0,上报服务器 + if (unfinishedNum == 0) { + finished.set(true); + + if (failedNum == 0) { + req.setInstanceStatus(JobInstanceStatus.SUCCEED.getValue()); + }else { + req.setInstanceStatus(JobInstanceStatus.FAILED.getValue()); + } + + // 特殊处理MapReduce任务(执行reduce) + // 特殊处理广播任务任务(执行postProcess) + }else { + + } + } } }