From 415da176edcf6ab65f74088f7801dcf6e5923636 Mon Sep 17 00:00:00 2001 From: tjq Date: Tue, 17 Mar 2020 21:16:07 +0800 Subject: [PATCH] it is too difficult to guarantee the ha and reliability... --- .../github/kfcfans/oms/worker/OhMyWorker.java | 9 ++ ...RequestActor.java => JobTrackerActor.java} | 5 +- ...ckerRequestActor.java => WorkerActor.java} | 4 +- .../kfcfans/oms/worker/common/OhMyConfig.java | 7 +- .../worker/common/constants/AkkaConstant.java | 19 +++ .../worker/common/constants/TaskStatus.java | 27 +++- .../oms/worker/common/utils/AkkaUtils.java | 23 ++++ .../worker/persistence/SimpleTaskQuery.java | 12 +- .../oms/worker/persistence/TaskDAO.java | 2 + .../oms/worker/persistence/TaskDAOImpl.java | 50 ++++++-- .../oms/worker/persistence/TaskDO.java | 54 ++++++-- .../persistence/TaskPersistenceService.java | 17 +++ .../worker/pojo/model/JobInstanceInfo.java | 5 +- .../pojo/request/ServerScheduleJobReq.java | 6 + .../pojo/request/TaskTrackerStartTaskReq.java | 57 +++++++++ .../request/WorkerReportTaskStatusReq.java | 24 ++++ .../oms/worker/tracker/TaskTracker.java | 117 +++++++++++++++++- .../main/resources/oms-akka-application.conf | 14 +++ 18 files changed, 414 insertions(+), 38 deletions(-) rename oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/{ServerRequestActor.java => JobTrackerActor.java} (82%) rename oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/{TaskTrackerRequestActor.java => WorkerActor.java} (65%) create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java create mode 100644 oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerReportTaskStatusReq.java create mode 100644 oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java index 19a92f67..83b9810f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -1,6 +1,10 @@ package com.github.kfcfans.oms.worker; +import akka.actor.ActorSystem; +import com.github.kfcfans.oms.worker.common.OhMyConfig; import com.github.kfcfans.oms.worker.common.utils.SpringUtils; +import lombok.Getter; +import lombok.Setter; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; @@ -14,6 +18,11 @@ import org.springframework.context.ApplicationContextAware; */ public class OhMyWorker implements ApplicationContextAware, InitializingBean { + public static ActorSystem actorSystem; + @Getter + @Setter + private static OhMyConfig config; + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.inject(applicationContext); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java similarity index 82% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java index cbbdbb2c..7bd0cd50 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/JobTrackerActor.java @@ -5,14 +5,13 @@ import com.github.kfcfans.oms.worker.pojo.request.ServerScheduleJobReq; import lombok.extern.slf4j.Slf4j; /** - * 处理来自服务器的请求 - * 请求链:server -> taskTracker -> worker + * worker的master节点,处理来自server的jobInstance请求和来自worker的task请求 * * @author tjq * @since 2020/3/17 */ @Slf4j -public class ServerRequestActor extends AbstractActor { +public class JobTrackerActor extends AbstractActor { @Override public Receive createReceive() { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java similarity index 65% rename from oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java rename to oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java index 2f0717ce..29569d09 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/WorkerActor.java @@ -3,12 +3,12 @@ package com.github.kfcfans.oms.worker.actors; import akka.actor.AbstractActor; /** - * 处理来自 TaskTracker 的请求 + * 普通计算节点,处理来自 JobTracker 的请求 * * @author tjq * @since 2020/3/17 */ -public class TaskTrackerRequestActor extends AbstractActor { +public class WorkerActor extends AbstractActor { @Override public Receive createReceive() { return null; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java index 0e888878..feaa427e 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -1,6 +1,6 @@ package com.github.kfcfans.oms.worker.common; -import java.util.Set; +import lombok.Data; /** * Worker 配置文件 @@ -8,6 +8,7 @@ import java.util.Set; * @author tjq * @since 2020/3/16 */ +@Data public class OhMyConfig { /** * 应用名称 @@ -17,4 +18,8 @@ public class OhMyConfig { * 调度服务器地址,ip:port (多值使用 , 分隔) */ private String serverAddress; + /** + * 通讯端口 + */ + private int listeningPort; } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java new file mode 100644 index 00000000..b81ce7b5 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/AkkaConstant.java @@ -0,0 +1,19 @@ +package com.github.kfcfans.oms.worker.common.constants; + +/** + * akka actor 名称 + * + * @author tjq + * @since 2020/3/17 + */ +public class AkkaConstant { + + /** + * 顶层Actor(actorSystem名称) + */ + public static final String ACTOR_SYSTEM_NAME = "oms"; + + public static final String JOB_TRACKER_ACTOR_NAME = "job_tracker"; + public static final String WORKER_ACTOR_NAME = "worker"; + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java index cfdd886a..e337f59b 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java @@ -15,14 +15,33 @@ public enum TaskStatus { /* ******************* TaskTracker 专用 ******************* */ WAITING_DISPATCH(1, "等待调度器调度"), - DISPATCH_SUCCESS(2, "调度成功"), - DISPATCH_FAILED(3, "调度失败"), + DISPATCH_SUCCESS(2, "调度成功(但不保证worker收到)"), + WORKER_PROCESSING(3, "worker开始执行"), WORKER_PROCESS_SUCCESS(4, "worker执行成功"), - WORKER_PROCESS_FAILED(5, "worker执行失败"); - + WORKER_PROCESS_FAILED(5, "worker执行失败"), /* ******************* Worker 专用 ******************* */ + RECEIVE_SUCCESS(11, "成功接受任务但未开始执行(此时worker满载,暂时无法运行)"), + PROCESSING(12, "执行中"), + PROCESS_SUCCESS(13, "执行成功"), + PROCESS_FAILED(14, "执行失败"); private int value; private String des; + + public static TaskStatus of(int v) { + switch (v) { + case 1: return WAITING_DISPATCH; + case 2: return DISPATCH_SUCCESS; + case 3: return WORKER_PROCESSING; + case 4: return WORKER_PROCESS_SUCCESS; + case 5: return WORKER_PROCESS_FAILED; + + case 11: return RECEIVE_SUCCESS; + case 12: return PROCESSING; + case 13: return PROCESS_SUCCESS; + case 14: return PROCESS_FAILED; + } + throw new IllegalArgumentException("no TaskStatus match the value of " + v); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java new file mode 100644 index 00000000..6099643a --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/AkkaUtils.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.oms.worker.common.utils; + +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; + +/** + * AKKA 工具类 + * + * @author tjq + * @since 2020/3/17 + */ +public class AkkaUtils { + + /** + * akka://@:/ + */ + private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/%s"; + + public static String getAkkaRemotePath(String ip, String actorName) { + return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, OhMyWorker.getConfig().getListeningPort(), actorName); + } + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java index 27665bce..f2ff4399 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java @@ -12,7 +12,6 @@ import org.springframework.util.StringUtils; @Data public class SimpleTaskQuery { - private static final String PREFIX_SQL = "select * from task_info where "; private static final String LINK = " and "; private String taskId; @@ -22,10 +21,13 @@ public class SimpleTaskQuery { private String address; private Integer status; + // 自定义的查询条件(where 后面的语句),如 crated_time > 10086 and status = 3 + private String conditionSQL; + private Integer limit; - public String getQuerySQL() { - StringBuilder sb = new StringBuilder(PREFIX_SQL); + public String getConditionSQL() { + StringBuilder sb = new StringBuilder(); if (!StringUtils.isEmpty(taskId)) { sb.append("task_id = '").append(taskId).append("'").append(LINK); } @@ -45,6 +47,10 @@ public class SimpleTaskQuery { sb.append("status = ").append(status).append(LINK); } + if (!StringUtils.isEmpty(conditionSQL)) { + sb.append(conditionSQL).append(LINK); + } + String substring = sb.substring(0, sb.length() - LINK.length()); if (limit != null) { substring = substring + " limit " + limit; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java index 77c2c51f..f140d22c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -31,4 +31,6 @@ public interface TaskDAO { List simpleQuery(SimpleTaskQuery query); + boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField); + } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java index cc90b0d4..01fd8d64 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.persistence; +import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; @@ -21,7 +22,7 @@ public class TaskDAOImpl implements TaskDAO { public boolean initTable() { String delTableSQL = "drop table if exists task_info"; - String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content text, address varchar(20), status int(11), result text, created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))"; + String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))"; try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); @@ -35,7 +36,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean save(TaskDO task) { - String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)"; + String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { fillInsertPreparedStatement(task, ps); return ps.execute(); @@ -47,7 +48,7 @@ public class TaskDAOImpl implements TaskDAO { @Override public boolean batchSave(Collection tasks) { - String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)"; + String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { for (TaskDO task : tasks) { @@ -97,9 +98,8 @@ public class TaskDAOImpl implements TaskDAO { @Override public List simpleQuery(SimpleTaskQuery query) { - ResultSet rs = null; - String sql = query.getQuerySQL(); + String sql = "select * from task_info where " + query.getConditionSQL(); List result = Lists.newLinkedList(); try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { rs = ps.executeQuery(); @@ -119,16 +119,29 @@ public class TaskDAOImpl implements TaskDAO { return result; } + @Override + public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) { + String sqlFormat = "update task_info set %s where %s"; + String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getConditionSQL()); + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { + return stat.execute(); + }catch (Exception e) { + log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e); + return false; + } + } + private static TaskDO convert(ResultSet rs) throws SQLException { TaskDO task = new TaskDO(); task.setTaskId(rs.getString("task_id")); task.setInstanceId(rs.getString("instance_id")); task.setJobId(rs.getString("job_id")); task.setTaskName(rs.getString("task_name")); - task.setTaskContent(rs.getString("task_content")); + task.setTaskContent(rs.getBytes("task_content")); task.setAddress(rs.getString("address")); task.setStatus(rs.getInt("status")); task.setResult(rs.getString("result")); + task.setFailedCnt(rs.getInt("failed_cnt")); task.setCreatedTime(rs.getLong("created_time")); task.setLastModifiedTime(rs.getLong("last_modified_time")); return task; @@ -139,12 +152,13 @@ public class TaskDAOImpl implements TaskDAO { ps.setString(2, task.getInstanceId()); ps.setString(3, task.getJobId()); ps.setString(4, task.getTaskName()); - ps.setString(5, task.getTaskContent()); + ps.setBytes(5, task.getTaskContent()); ps.setString(6, task.getAddress()); ps.setInt(7, task.getStatus()); ps.setString(8, task.getResult()); - ps.setLong(9, task.getCreatedTime()); - ps.setLong(10, task.getLastModifiedTime()); + ps.setInt(9, task.getFailedCnt()); + ps.setLong(10, task.getCreatedTime()); + ps.setLong(11, task.getLastModifiedTime()); } public static void main(String[] args) throws Exception { @@ -156,14 +170,28 @@ public class TaskDAOImpl implements TaskDAO { taskDO.setInstanceId("22"); taskDO.setTaskId("2.1"); taskDO.setTaskName("zzz"); - taskDO.setTaskContent("hhhh"); + taskDO.setTaskContent("hhhh".getBytes()); + taskDO.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + taskDO.setLastModifiedTime(System.currentTimeMillis()); + taskDO.setCreatedTime(System.currentTimeMillis()); + taskDO.setFailedCnt(0); taskDAO.save(taskDO); SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId("22"); query.setTaskId("2.1"); - System.out.println(taskDAO.simpleQuery(query)); + final List res = taskDAO.simpleQuery(query); + System.out.println(res); + System.out.println(new String(res.get(0).getTaskContent())); + + // update + TaskDO update = new TaskDO(); + update.setFailedCnt(8); + taskDAO.simpleUpdate(query, update); + + final List res2 = taskDAO.simpleQuery(query); + System.out.println(res2); Thread.sleep(100000); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java index 6a84e136..167902f2 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -1,6 +1,8 @@ package com.github.kfcfans.oms.worker.persistence; -import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import org.springframework.util.StringUtils; /** * TaskDO(为了简化 DAO 层,一张表实现两种功能) @@ -10,7 +12,8 @@ import lombok.Data; * @author tjq * @since 2020/3/17 */ -@Data +@Getter +@Setter public class TaskDO { // 层次命名法,可以表示 Map 后的父子关系,如 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task @@ -20,16 +23,53 @@ public class TaskDO { private String instanceId; // 任务名称 private String taskName; - // 任务参数 - private String taskContent; + // 任务对象(序列化后的二进制数据) + private byte[] taskContent; // 对于JobTracker为workerAddress,对于普通Worker为jobTrackerAddress private String address; // 任务状态,0~10代表 JobTracker 使用,11~20代表普通Worker使用 - private int status; + private Integer status; // 执行结果 private String result; + // 失败次数 + private Integer failedCnt; // 创建时间 - private long createdTime; + private Long createdTime; // 最后修改时间 - private long lastModifiedTime; + private Long lastModifiedTime; + + public String getUpdateSQL() { + StringBuilder sb = new StringBuilder(); + if (!StringUtils.isEmpty(address)) { + sb.append(" address = '").append(address).append("',"); + } + if (status != null) { + sb.append(" status = ").append(status).append(","); + } + if (!StringUtils.isEmpty(result)) { + sb.append(" result = '").append(result).append("',"); + } + if (failedCnt != null) { + sb.append(" failed_cnt = ").append(failedCnt).append(","); + } + sb.append(" last_modified_time = ").append(lastModifiedTime); + return sb.toString(); + } + + @Override + public String toString() { + return "TaskDO{" + + "taskId='" + taskId + '\'' + + ", jobId='" + jobId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", taskName='" + taskName + '\'' + + ", taskContent=" + new String(taskContent) + + ", address='" + address + '\'' + + ", status=" + status + + ", result='" + result + '\'' + + ", failedCnt=" + failedCnt + + ", createdTime=" + createdTime + + ", lastModifiedTime=" + lastModifiedTime + + '}'; + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java index f3cd1c6e..d5f458ff 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -15,6 +15,11 @@ import java.util.List; */ public class TaskPersistenceService { + public static TaskPersistenceService INSTANCE = new TaskPersistenceService(); + + private TaskPersistenceService() { + } + private TaskDAO taskDAO = new TaskDAOImpl(); private static final int MAX_BATCH_SIZE = 50; @@ -50,4 +55,16 @@ public class TaskPersistenceService { query.setLimit(100); return taskDAO.simpleQuery(query); } + + /** + * 更新 Task 的状态 + */ + public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status) { + SimpleTaskQuery condition = new SimpleTaskQuery(); + condition.setInstanceId(instanceId); + condition.setTaskId(taskId); + TaskDO updateEntity = new TaskDO(); + updateEntity.setStatus(status.getValue()); + return taskDAO.simpleUpdate(condition, updateEntity); + } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java index 0ba2a0c7..bc7a601f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java @@ -22,7 +22,10 @@ public class JobInstanceInfo { // 任务执行时间限制,单位毫秒 private long timeLimit; // 可用处理器地址,可能多值,逗号分隔 - private String workerAddress; + private String allWorkerAddress; + + private String jobParams; + private String instanceParams; /* *********************** Map/MapReduce 任务专用 *********************** */ diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java index 9bbdfba9..f9cf1652 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java @@ -1,11 +1,14 @@ package com.github.kfcfans.oms.worker.pojo.request; +import lombok.Data; + /** * 服务端调度任务请求(一次任务处理的入口) * * @author tjq * @since 2020/3/17 */ +@Data public class ServerScheduleJobReq { // 调度的服务器地址,默认通讯目标 @@ -26,6 +29,9 @@ public class ServerScheduleJobReq { // 可用处理器地址,可能多值,逗号分隔 private String workerAddress; + private String jobParams; + private String instanceParams; + /* *********************** Map/MapReduce 任务专用 *********************** */ // 每台机器的处理线程数上限 diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java new file mode 100644 index 00000000..91a18ef8 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -0,0 +1,57 @@ +package com.github.kfcfans.oms.worker.pojo.request; + +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.persistence.TaskDO; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * JobTracker 派发 task 进行执行 + * + * @author tjq + * @since 2020/3/17 + */ +@Data +@NoArgsConstructor +public class TaskTrackerStartTaskReq { + + private String jobId; + private String instanceId; + // 处理器类型(JavaBean、Jar、脚本等) + private String processorType; + // 处理器信息 + private String processorInfo; + // 并发计算线程数 + private int threadConcurrency; + // JobTracker 地址 + private String jobTrackerAddress; + + private String jobParams; + private String instanceParams; + + private String taskName; + private byte[] taskContent; + // 子任务允许的重试次数 + private int taskRetryNum; + // 子任务当前重试次数 + private int currentRetryNum; + + public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) { + jobId = instanceInfo.getJobId(); + instanceId = instanceInfo.getInstanceId(); + processorType = instanceInfo.getProcessorType(); + processorInfo = instanceInfo.getProcessorInfo(); + threadConcurrency = instanceInfo.getThreadConcurrency(); + jobTrackerAddress = NetUtils.getLocalHost(); + + jobParams = instanceInfo.getJobParams(); + instanceParams = instanceInfo.getInstanceParams(); + + taskName = task.getTaskName(); + taskContent = task.getTaskContent(); + + taskRetryNum = instanceInfo.getTaskRetryNum(); + currentRetryNum = task.getFailedCnt(); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerReportTaskStatusReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerReportTaskStatusReq.java new file mode 100644 index 00000000..c69b9004 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerReportTaskStatusReq.java @@ -0,0 +1,24 @@ +package com.github.kfcfans.oms.worker.pojo.request; + +import lombok.Data; + +/** + * worker 上报 task 执行情况 + * + * @author tjq + * @since 2020/3/17 + */ +@Data +public class WorkerReportTaskStatusReq { + + private String jobId; + private String instanceId; + private String taskId; + + private int status; + /** + * 执行完成时才有 + */ + private String result; + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java index 5a91d942..f4473da1 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java @@ -1,16 +1,29 @@ package com.github.kfcfans.oms.worker.tracker; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.constants.AkkaConstant; import com.github.kfcfans.oms.worker.common.constants.CommonSJ; import com.github.kfcfans.oms.worker.common.constants.TaskConstant; +import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; import com.github.kfcfans.oms.worker.common.utils.NetUtils; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq; +import com.github.kfcfans.oms.worker.pojo.request.WorkerReportTaskStatusReq; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StringUtils; import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 @@ -18,13 +31,38 @@ import java.util.List; * @author tjq * @since 2020/3/17 */ +@Slf4j public abstract class TaskTracker { // 任务实例信息 protected JobInstanceInfo jobInstanceInfo; - protected ActorRef actor; + protected ActorRef taskTrackerActorRef; + + protected List allWorkerAddress; protected TaskPersistenceService taskPersistenceService; + protected ScheduledExecutorService scheduledPool; + + // 统计 + protected AtomicBoolean finished = new AtomicBoolean(false); + protected AtomicLong needDispatchTaskNum = new AtomicLong(0); + protected AtomicLong dispatchedTaskNum = new AtomicLong(0); + protected AtomicLong waitingToRunTaskNum = new AtomicLong(0); + protected AtomicLong runningTaskNum = new AtomicLong(0); + protected AtomicLong successTaskNum = new AtomicLong(0); + protected AtomicLong failedTaskNum = new AtomicLong(0); + + public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { + + this.jobInstanceInfo = jobInstanceInfo; + this.taskTrackerActorRef = taskTrackerActorRef; + this.taskPersistenceService = TaskPersistenceService.INSTANCE; + + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("TaskTrackerTimingPool-%s").build(); + this.scheduledPool = Executors.newScheduledThreadPool(2, factory); + + allWorkerAddress = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress()); + } /** @@ -32,11 +70,21 @@ public abstract class TaskTracker { */ public abstract void dispatch(); - public void updateTaskStatus() { + public void updateTaskStatus(WorkerReportTaskStatusReq statusReportRequest) { + TaskStatus taskStatus = TaskStatus.of(statusReportRequest.getStatus()); + // 持久化 + + // 更新统计数据 + switch (taskStatus) { + case RECEIVE_SUCCESS: + waitingToRunTaskNum.incrementAndGet();break; + case PROCESSING: + + } } public boolean finished() { - return false; + return finished.get(); } /** @@ -50,26 +98,29 @@ public abstract class TaskTracker { // 单机、MR模型下,根任务模型本机直接执行(JobTracker一般为负载最小的机器,且MR的根任务通常伴随着 map 操作,本机执行可以有效减少网络I/O开销) if (executeType != ExecuteType.BROADCAST) { TaskDO rootTask = new TaskDO(); - rootTask.setStatus(1); + rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); rootTask.setJobId(jobInstanceInfo.getJobId()); rootTask.setInstanceId(jobInstanceInfo.getInstanceId()); rootTask.setTaskId(TaskConstant.ROOT_TASK_ID); + rootTask.setFailedCnt(0); rootTask.setAddress(NetUtils.getLocalHost()); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setCreatedTime(System.currentTimeMillis()); persistenceResult = taskPersistenceService.save(rootTask); + needDispatchTaskNum.incrementAndGet(); }else { List taskList = Lists.newLinkedList(); - List addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getWorkerAddress()); + List addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress()); for (int i = 0; i < addrList.size(); i++) { TaskDO task = new TaskDO(); - task.setStatus(1); + task.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); task.setJobId(jobInstanceInfo.getJobId()); task.setInstanceId(jobInstanceInfo.getInstanceId()); task.setTaskId(String.valueOf(i)); task.setAddress(addrList.get(i)); + task.setFailedCnt(0); task.setTaskName(TaskConstant.ROOT_TASK_NAME); task.setCreatedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); @@ -77,6 +128,7 @@ public abstract class TaskTracker { taskList.add(task); } persistenceResult = taskPersistenceService.batchSave(taskList); + needDispatchTaskNum.addAndGet(taskList.size()); } if (!persistenceResult) { @@ -90,4 +142,57 @@ public abstract class TaskTracker { private void initDispatcher() { } + + public void destroy() { + scheduledPool.shutdown(); + } + + /** + * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 + */ + private class DispatcherRunnable implements Runnable { + + @Override + public void run() { + taskPersistenceService.getNeedDispatchTask(jobInstanceInfo.getInstanceId()).forEach(task -> { + try { + // 构造 worker 执行请求 + TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(jobInstanceInfo, task); + + // 构造 akka 可访问节点路径 + String targetIP = task.getAddress(); + if (StringUtils.isEmpty(targetIP)) { + targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size())); + } + String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.WORKER_ACTOR_NAME); + ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath); + + // 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...) + targetActor.tell(req, taskTrackerActorRef); + + // 更新数据库(如果更新数据库失败,可能导致重复执行,先不处理) + taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS); + + // 更新统计数据 + needDispatchTaskNum.decrementAndGet(); + dispatchedTaskNum.incrementAndGet(); + + }catch (Exception e) { + // 调度失败,不修改数据库,下次重新随机派发给 remote actor + log.warn("[TaskTracker] dispatch task({}) failed.", task); + } + }); + } + } + + /** + * 定时检查当前任务的执行状态 + */ + private class StatusCheckRunnable implements Runnable { + + @Override + public void run() { + + } + } } diff --git a/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf b/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf new file mode 100644 index 00000000..7972a078 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/resources/oms-akka-application.conf @@ -0,0 +1,14 @@ +akka { + actor { + # cluster is better(recommend by official document), but I prefer remote + provider = remote + } + remote { + artery { + transport = tcp # See Selecting a transport below + # over write by code + canonical.hostname = "127.0.0.1" + canonical.port = 25520 + } + } +} \ No newline at end of file