It is too difficult to guarantee the HA and reliability...

tjq 2020-03-17 21:16:07 +08:00
parent f74a87772a
commit 415da176ed
18 changed files with 414 additions and 38 deletions

View File

@ -1,6 +1,10 @@
package com.github.kfcfans.oms.worker;
import akka.actor.ActorSystem;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
@ -14,6 +18,11 @@ import org.springframework.context.ApplicationContextAware;
*/
public class OhMyWorker implements ApplicationContextAware, InitializingBean {
public static ActorSystem actorSystem;
@Getter
@Setter
private static OhMyConfig config;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.inject(applicationContext);

View File

@ -5,14 +5,13 @@ import com.github.kfcfans.oms.worker.pojo.request.ServerScheduleJobReq;
import lombok.extern.slf4j.Slf4j;
/**
* Handles requests from the server
* Request chain: server -> taskTracker -> worker
* The worker-side master node; handles jobInstance requests from the server and task requests from other workers
*
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class ServerRequestActor extends AbstractActor {
public class JobTrackerActor extends AbstractActor {
@Override
public Receive createReceive() {

View File

@ -3,12 +3,12 @@ package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
/**
* Handles requests from the TaskTracker
* Ordinary compute node; handles requests from the JobTracker
*
* @author tjq
* @since 2020/3/17
*/
public class TaskTrackerRequestActor extends AbstractActor {
public class WorkerActor extends AbstractActor {
@Override
public Receive createReceive() {
return null;

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.worker.common;
import java.util.Set;
import lombok.Data;
/**
* Worker configuration file
@ -8,6 +8,7 @@ import java.util.Set;
* @author tjq
* @since 2020/3/16
*/
@Data
public class OhMyConfig {
/**
* Application name
@ -17,4 +18,8 @@ public class OhMyConfig {
* Scheduling server address (ip:port); multiple values are separated by ','
*/
private String serverAddress;
/**
* Communication port
*/
private int listeningPort;
}

View File

@ -0,0 +1,19 @@
package com.github.kfcfans.oms.worker.common.constants;
/**
* Akka actor names
*
* @author tjq
* @since 2020/3/17
*/
public class AkkaConstant {
/**
* Name of the top-level actor (the ActorSystem)
*/
public static final String ACTOR_SYSTEM_NAME = "oms";
public static final String JOB_TRACKER_ACTOR_NAME = "job_tracker";
public static final String WORKER_ACTOR_NAME = "worker";
}

View File

@ -15,14 +15,33 @@ public enum TaskStatus {
/* ******************* TaskTracker specific ******************* */
WAITING_DISPATCH(1, "waiting to be dispatched"),
DISPATCH_SUCCESS(2, "dispatched successfully"),
DISPATCH_FAILED(3, "dispatch failed"),
DISPATCH_SUCCESS(2, "dispatched successfully, but receipt by the worker is not guaranteed"),
WORKER_PROCESSING(3, "worker started processing"),
WORKER_PROCESS_SUCCESS(4, "worker processed successfully"),
WORKER_PROCESS_FAILED(5, "worker processing failed");
WORKER_PROCESS_FAILED(5, "worker processing failed"),
/* ******************* Worker specific ******************* */
RECEIVE_SUCCESS(11, "task received but not yet started (worker at full capacity, cannot run it for now)"),
PROCESSING(12, "processing"),
PROCESS_SUCCESS(13, "processed successfully"),
PROCESS_FAILED(14, "processing failed");
private int value;
private String des;
public static TaskStatus of(int v) {
switch (v) {
case 1: return WAITING_DISPATCH;
case 2: return DISPATCH_SUCCESS;
case 3: return WORKER_PROCESSING;
case 4: return WORKER_PROCESS_SUCCESS;
case 5: return WORKER_PROCESS_FAILED;
case 11: return RECEIVE_SUCCESS;
case 12: return PROCESSING;
case 13: return PROCESS_SUCCESS;
case 14: return PROCESS_FAILED;
}
throw new IllegalArgumentException("no TaskStatus matches the value " + v);
}
}

View File

@ -0,0 +1,23 @@
package com.github.kfcfans.oms.worker.common.utils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
/**
* Akka utilities
*
* @author tjq
* @since 2020/3/17
*/
public class AkkaUtils {
/**
* akka://<actor system>@<hostname>:<port>/<actor path>
*/
private static final String AKKA_REMOTE_NODE_PATH = "akka://%s@%s:%d/%s";
public static String getAkkaRemotePath(String ip, String actorName) {
return String.format(AKKA_REMOTE_NODE_PATH, AkkaConstant.ACTOR_SYSTEM_NAME, ip, OhMyWorker.getConfig().getListeningPort(), actorName);
}
}
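For illustration only: assuming a target host of 192.168.1.1 and a locally configured listeningPort of 27777 (both hypothetical; note the port always comes from the local OhMyConfig, so all workers are expected to listen on the same port), the helper above would produce:

    String path = AkkaUtils.getAkkaRemotePath("192.168.1.1", AkkaConstant.WORKER_ACTOR_NAME);
    // -> "akka://oms@192.168.1.1:27777/worker"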

View File

@ -12,7 +12,6 @@ import org.springframework.util.StringUtils;
@Data
public class SimpleTaskQuery {
private static final String PREFIX_SQL = "select * from task_info where ";
private static final String LINK = " and ";
private String taskId;
@ -22,10 +21,13 @@ public class SimpleTaskQuery {
private String address;
private Integer status;
// custom query condition (the clause after WHERE), e.g. created_time > 10086 and status = 3
private String conditionSQL;
private Integer limit;
public String getQuerySQL() {
StringBuilder sb = new StringBuilder(PREFIX_SQL);
public String getConditionSQL() {
StringBuilder sb = new StringBuilder();
if (!StringUtils.isEmpty(taskId)) {
sb.append("task_id = '").append(taskId).append("'").append(LINK);
}
@ -45,6 +47,10 @@ public class SimpleTaskQuery {
sb.append("status = ").append(status).append(LINK);
}
if (!StringUtils.isEmpty(conditionSQL)) {
sb.append(conditionSQL).append(LINK);
}
String substring = sb.substring(0, sb.length() - LINK.length());
if (limit != null) {
substring = substring + " limit " + limit;
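A rough sketch of how this condition is consumed (the values below are made up): getConditionSQL() only builds the fragment after WHERE, and TaskDAOImpl.simpleQuery prepends the select itself.

    SimpleTaskQuery query = new SimpleTaskQuery();
    query.setTaskId("2.1");
    query.setStatus(3);
    query.setLimit(10);
    // getConditionSQL() -> "task_id = '2.1' and status = 3 limit 10"
    // simpleQuery(query) then runs: select * from task_info where task_id = '2.1' and status = 3 limit 10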

View File

@ -31,4 +31,6 @@ public interface TaskDAO {
List<TaskDO> simpleQuery(SimpleTaskQuery query);
boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField);
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
@ -21,7 +22,7 @@ public class TaskDAOImpl implements TaskDAO {
public boolean initTable() {
String delTableSQL = "drop table if exists task_info";
String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content text, address varchar(20), status int(11), result text, created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))";
String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))";
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL);
@ -35,7 +36,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override
public boolean save(TaskDO task) {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)";
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
fillInsertPreparedStatement(task, ps);
return ps.execute();
@ -47,7 +48,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override
public boolean batchSave(Collection<TaskDO> tasks) {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)";
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
for (TaskDO task : tasks) {
@ -97,9 +98,8 @@ public class TaskDAOImpl implements TaskDAO {
@Override
public List<TaskDO> simpleQuery(SimpleTaskQuery query) {
ResultSet rs = null;
String sql = query.getQuerySQL();
String sql = "select * from task_info where " + query.getConditionSQL();
List<TaskDO> result = Lists.newLinkedList();
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
rs = ps.executeQuery();
@ -119,16 +119,29 @@ public class TaskDAOImpl implements TaskDAO {
return result;
}
@Override
public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) {
String sqlFormat = "update task_info set %s where %s";
String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getConditionSQL());
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) {
return stat.execute();
}catch (Exception e) {
log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateSQL, e);
return false;
}
}
private static TaskDO convert(ResultSet rs) throws SQLException {
TaskDO task = new TaskDO();
task.setTaskId(rs.getString("task_id"));
task.setInstanceId(rs.getString("instance_id"));
task.setJobId(rs.getString("job_id"));
task.setTaskName(rs.getString("task_name"));
task.setTaskContent(rs.getString("task_content"));
task.setTaskContent(rs.getBytes("task_content"));
task.setAddress(rs.getString("address"));
task.setStatus(rs.getInt("status"));
task.setResult(rs.getString("result"));
task.setFailedCnt(rs.getInt("failed_cnt"));
task.setCreatedTime(rs.getLong("created_time"));
task.setLastModifiedTime(rs.getLong("last_modified_time"));
return task;
@ -139,12 +152,13 @@ public class TaskDAOImpl implements TaskDAO {
ps.setString(2, task.getInstanceId());
ps.setString(3, task.getJobId());
ps.setString(4, task.getTaskName());
ps.setString(5, task.getTaskContent());
ps.setBytes(5, task.getTaskContent());
ps.setString(6, task.getAddress());
ps.setInt(7, task.getStatus());
ps.setString(8, task.getResult());
ps.setLong(9, task.getCreatedTime());
ps.setLong(10, task.getLastModifiedTime());
ps.setInt(9, task.getFailedCnt());
ps.setLong(10, task.getCreatedTime());
ps.setLong(11, task.getLastModifiedTime());
}
public static void main(String[] args) throws Exception {
@ -156,14 +170,28 @@ public class TaskDAOImpl implements TaskDAO {
taskDO.setInstanceId("22");
taskDO.setTaskId("2.1");
taskDO.setTaskName("zzz");
taskDO.setTaskContent("hhhh");
taskDO.setTaskContent("hhhh".getBytes());
taskDO.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
taskDO.setLastModifiedTime(System.currentTimeMillis());
taskDO.setCreatedTime(System.currentTimeMillis());
taskDO.setFailedCnt(0);
taskDAO.save(taskDO);
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId("22");
query.setTaskId("2.1");
System.out.println(taskDAO.simpleQuery(query));
final List<TaskDO> res = taskDAO.simpleQuery(query);
System.out.println(res);
System.out.println(new String(res.get(0).getTaskContent()));
// update
TaskDO update = new TaskDO();
update.setFailedCnt(8);
taskDAO.simpleUpdate(query, update);
final List<TaskDO> res2 = taskDAO.simpleQuery(query);
System.out.println(res2);
Thread.sleep(100000);
}

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.oms.worker.persistence;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.util.StringUtils;
/**
* TaskDO: to keep the DAO simple, a single table serves two purposes
@ -10,7 +12,8 @@ import lombok.Data;
* @author tjq
* @since 2020/3/17
*/
@Data
@Getter
@Setter
public class TaskDO {
// Hierarchical naming: encodes the parent-child relationships created by Map, e.g. 0.1.2 is the 2nd task mapped from the 1st task mapped from the rootTask
@ -20,16 +23,53 @@ public class TaskDO {
private String instanceId;
// task name
private String taskName;
// task parameters
private String taskContent;
// binary data of the serialized task object
private byte[] taskContent;
// for the JobTracker this holds the workerAddress; for an ordinary worker it holds the jobTrackerAddress
private String address;
// task status: values 0-10 are used by the JobTracker, 11-20 by ordinary workers
private int status;
private Integer status;
// execution result
private String result;
// failure count
private Integer failedCnt;
// creation time
private long createdTime;
private Long createdTime;
// last modified time
private long lastModifiedTime;
private Long lastModifiedTime;
public String getUpdateSQL() {
StringBuilder sb = new StringBuilder();
if (!StringUtils.isEmpty(address)) {
sb.append(" address = '").append(address).append("',");
}
if (status != null) {
sb.append(" status = ").append(status).append(",");
}
if (!StringUtils.isEmpty(result)) {
sb.append(" result = '").append(result).append("',");
}
if (failedCnt != null) {
sb.append(" failed_cnt = ").append(failedCnt).append(",");
}
sb.append(" last_modified_time = ").append(lastModifiedTime);
return sb.toString();
}
@Override
public String toString() {
return "TaskDO{" +
"taskId='" + taskId + '\'' +
", jobId='" + jobId + '\'' +
", instanceId='" + instanceId + '\'' +
", taskName='" + taskName + '\'' +
", taskContent=" + new String(taskContent) +
", address='" + address + '\'' +
", status=" + status +
", result='" + result + '\'' +
", failedCnt=" + failedCnt +
", createdTime=" + createdTime +
", lastModifiedTime=" + lastModifiedTime +
'}';
}
}
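A minimal sketch of how getUpdateSQL composes with TaskDAOImpl.simpleUpdate (the field values are hypothetical). Note that last_modified_time is always appended, so it should be set before calling simpleUpdate, otherwise the literal null is written:

    TaskDO update = new TaskDO();
    update.setStatus(TaskStatus.WORKER_PROCESSING.getValue());
    update.setLastModifiedTime(System.currentTimeMillis());
    // getUpdateSQL() -> " status = 3, last_modified_time = <current millis>"
    // simpleUpdate(condition, update) then executes:
    // update task_info set <fragment above> where <condition.getConditionSQL()>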

View File

@ -15,6 +15,11 @@ import java.util.List;
*/
public class TaskPersistenceService {
public static TaskPersistenceService INSTANCE = new TaskPersistenceService();
private TaskPersistenceService() {
}
private TaskDAO taskDAO = new TaskDAOImpl();
private static final int MAX_BATCH_SIZE = 50;
@ -50,4 +55,16 @@ public class TaskPersistenceService {
query.setLimit(100);
return taskDAO.simpleQuery(query);
}
/**
* 更新 Task 的状态
*/
public boolean updateTaskStatus(String instanceId, String taskId, TaskStatus status) {
SimpleTaskQuery condition = new SimpleTaskQuery();
condition.setInstanceId(instanceId);
condition.setTaskId(taskId);
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(status.getValue());
return taskDAO.simpleUpdate(condition, updateEntity);
}
}

View File

@ -22,7 +22,10 @@ public class JobInstanceInfo {
// task execution time limit, in milliseconds
private long timeLimit;
// available processor (worker) addresses; may contain multiple comma-separated values
private String workerAddress;
private String allWorkerAddress;
private String jobParams;
private String instanceParams;
/* *********************** Map/MapReduce 任务专用 *********************** */

View File

@ -1,11 +1,14 @@
package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data;
/**
* Job scheduling request from the server; the entry point of a job instance run
*
* @author tjq
* @since 2020/3/17
*/
@Data
public class ServerScheduleJobReq {
// address of the scheduling server (the default communication target)
@ -26,6 +29,9 @@ public class ServerScheduleJobReq {
// available processor (worker) addresses; may contain multiple comma-separated values
private String workerAddress;
private String jobParams;
private String instanceParams;
/* *********************** Map/MapReduce 任务专用 *********************** */
// upper limit on processing threads per machine

View File

@ -0,0 +1,57 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Sent by the JobTracker to dispatch a task for execution
*
* @author tjq
* @since 2020/3/17
*/
@Data
@NoArgsConstructor
public class TaskTrackerStartTaskReq {
private String jobId;
private String instanceId;
// processor type (JavaBean, Jar, script, etc.)
private String processorType;
// processor info
private String processorInfo;
// number of concurrent computation threads
private int threadConcurrency;
// JobTracker address
private String jobTrackerAddress;
private String jobParams;
private String instanceParams;
private String taskName;
private byte[] taskContent;
// maximum number of retries allowed for a subtask
private int taskRetryNum;
// current retry count of this subtask
private int currentRetryNum;
public TaskTrackerStartTaskReq(JobInstanceInfo instanceInfo, TaskDO task) {
jobId = instanceInfo.getJobId();
instanceId = instanceInfo.getInstanceId();
processorType = instanceInfo.getProcessorType();
processorInfo = instanceInfo.getProcessorInfo();
threadConcurrency = instanceInfo.getThreadConcurrency();
jobTrackerAddress = NetUtils.getLocalHost();
jobParams = instanceInfo.getJobParams();
instanceParams = instanceInfo.getInstanceParams();
taskName = task.getTaskName();
taskContent = task.getTaskContent();
taskRetryNum = instanceInfo.getTaskRetryNum();
currentRetryNum = task.getFailedCnt();
}
}

View File

@ -0,0 +1,24 @@
package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data;
/**
* Worker report of a task's execution status
*
* @author tjq
* @since 2020/3/17
*/
@Data
public class WorkerReportTaskStatusReq {
private String jobId;
private String instanceId;
private String taskId;
private int status;
/**
* only present once execution has finished
*/
private String result;
}

View File

@ -1,16 +1,29 @@
package com.github.kfcfans.oms.worker.tracker;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.oms.worker.pojo.request.WorkerReportTaskStatusReq;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manages the execution of a JobInstance, mainly task dispatching (MapReduce may produce a large number of tasks) and status updates
@ -18,13 +31,38 @@ import java.util.List;
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public abstract class TaskTracker {
// job instance info
protected JobInstanceInfo jobInstanceInfo;
protected ActorRef actor;
protected ActorRef taskTrackerActorRef;
protected List<String> allWorkerAddress;
protected TaskPersistenceService taskPersistenceService;
protected ScheduledExecutorService scheduledPool;
// statistics
protected AtomicBoolean finished = new AtomicBoolean(false);
protected AtomicLong needDispatchTaskNum = new AtomicLong(0);
protected AtomicLong dispatchedTaskNum = new AtomicLong(0);
protected AtomicLong waitingToRunTaskNum = new AtomicLong(0);
protected AtomicLong runningTaskNum = new AtomicLong(0);
protected AtomicLong successTaskNum = new AtomicLong(0);
protected AtomicLong failedTaskNum = new AtomicLong(0);
public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) {
this.jobInstanceInfo = jobInstanceInfo;
this.taskTrackerActorRef = taskTrackerActorRef;
this.taskPersistenceService = TaskPersistenceService.INSTANCE;
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("TaskTrackerTimingPool-%s").build();
this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
allWorkerAddress = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress());
}
/**
@ -32,11 +70,21 @@ public abstract class TaskTracker {
*/
public abstract void dispatch();
public void updateTaskStatus() {
public void updateTaskStatus(WorkerReportTaskStatusReq statusReportRequest) {
TaskStatus taskStatus = TaskStatus.of(statusReportRequest.getStatus());
// persist the new status
// update statistics
switch (taskStatus) {
case RECEIVE_SUCCESS:
waitingToRunTaskNum.incrementAndGet();break;
case PROCESSING:
}
}
public boolean finished() {
return false;
return finished.get();
}
/**
@ -50,26 +98,29 @@ public abstract class TaskTracker {
// In the standalone and MR models, the root task runs directly on this machine (the JobTracker is usually the least-loaded machine, and an MR root task typically performs the map operation, so running it locally effectively reduces network I/O overhead)
if (executeType != ExecuteType.BROADCAST) {
TaskDO rootTask = new TaskDO();
rootTask.setStatus(1);
rootTask.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
rootTask.setJobId(jobInstanceInfo.getJobId());
rootTask.setInstanceId(jobInstanceInfo.getInstanceId());
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
rootTask.setFailedCnt(0);
rootTask.setAddress(NetUtils.getLocalHost());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setCreatedTime(System.currentTimeMillis());
persistenceResult = taskPersistenceService.save(rootTask);
needDispatchTaskNum.incrementAndGet();
}else {
List<TaskDO> taskList = Lists.newLinkedList();
List<String> addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getWorkerAddress());
List<String> addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getAllWorkerAddress());
for (int i = 0; i < addrList.size(); i++) {
TaskDO task = new TaskDO();
task.setStatus(1);
task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
task.setJobId(jobInstanceInfo.getJobId());
task.setInstanceId(jobInstanceInfo.getInstanceId());
task.setTaskId(String.valueOf(i));
task.setAddress(addrList.get(i));
task.setFailedCnt(0);
task.setTaskName(TaskConstant.ROOT_TASK_NAME);
task.setCreatedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
@ -77,6 +128,7 @@ public abstract class TaskTracker {
taskList.add(task);
}
persistenceResult = taskPersistenceService.batchSave(taskList);
needDispatchTaskNum.addAndGet(taskList.size());
}
if (!persistenceResult) {
@ -90,4 +142,57 @@ public abstract class TaskTracker {
private void initDispatcher() {
}
public void destroy() {
scheduledPool.shutdown();
}
/**
* Periodically scans the tasks in the database (at most 100 per scan, to limit memory usage) and dispatches those that need to run
*/
private class DispatcherRunnable implements Runnable {
@Override
public void run() {
taskPersistenceService.getNeedDispatchTask(jobInstanceInfo.getInstanceId()).forEach(task -> {
try {
// build the execution request for the worker
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(jobInstanceInfo, task);
// build the reachable Akka remote node path
String targetIP = task.getAddress();
if (StringUtils.isEmpty(targetIP)) {
targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size()));
}
String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.WORKER_ACTOR_NAME);
ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath);
// send the request; Akka's tell offers at-most-once delivery, and experiments show that no error is raised even when the message cannot be delivered...
targetActor.tell(req, taskTrackerActorRef);
// update the database (a failed update may cause duplicate execution; not handled for now)
taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS);
// update statistics
needDispatchTaskNum.decrementAndGet();
dispatchedTaskNum.incrementAndGet();
}catch (Exception e) {
// dispatch failed; leave the database untouched so the task is re-dispatched to a random remote actor next time
log.warn("[TaskTracker] dispatch task({}) failed.", task);
}
});
}
}
/**
* Periodically checks the execution status of the current job instance's tasks
*/
private class StatusCheckRunnable implements Runnable {
@Override
public void run() {
}
}
}
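initDispatcher() is still an empty stub in this commit; presumably the two runnables above are meant to be registered on scheduledPool. A minimal sketch under that assumption (the 1-second and 10-second intervals are placeholders, not taken from the source):

    private void initDispatcher() {
        // assumption: fixed-delay scheduling of the dispatcher and the status checker
        scheduledPool.scheduleWithFixedDelay(new DispatcherRunnable(), 0, 1, TimeUnit.SECONDS);
        scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
    }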

View File

@ -0,0 +1,14 @@
akka {
actor {
# cluster is better (recommended by the official documentation), but I prefer remote
provider = remote
}
remote {
artery {
transport = tcp # See Selecting a transport below
# overwritten by code
canonical.hostname = "127.0.0.1"
canonical.port = 25520
}
}
}
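The "overwritten by code" note above presumably means canonical.hostname and canonical.port are injected when the ActorSystem is created. A minimal sketch using the standard Typesafe Config API (com.typesafe.config.ConfigFactory); the resource name oms-worker.akka.conf and the exact wiring are assumptions, not shown in this commit:

    // build overrides for the values marked "overwritten by code"
    Map<String, Object> overrides = new HashMap<>();
    overrides.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
    overrides.put("akka.remote.artery.canonical.port", OhMyWorker.getConfig().getListeningPort());
    // overrides take precedence; the file above (assumed name: oms-worker.akka.conf) is the fallback
    Config akkaConfig = ConfigFactory.parseMap(overrides).withFallback(ConfigFactory.load("oms-worker.akka.conf"));
    OhMyWorker.actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaConfig);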