test the TaskTracker(Standalone mode) and fix a lot of children bug

This commit is contained in:
tjq 2020-03-25 20:21:26 +08:00
parent 5e4463061e
commit ab642805c4
17 changed files with 173 additions and 61 deletions

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.common.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* 服务端调度任务请求一次任务处理的入口 * 服务端调度任务请求一次任务处理的入口
* *
@ -9,7 +11,7 @@ import lombok.Data;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Data @Data
public class ServerScheduleJobReq { public class ServerScheduleJobReq implements Serializable {
// 调度的服务器地址默认通讯目标 // 调度的服务器地址默认通讯目标
private String serverAddress; private String serverAddress;
@ -27,7 +29,7 @@ public class ServerScheduleJobReq {
// 任务执行时间限制单位毫秒 // 任务执行时间限制单位毫秒
private long timeLimit; private long timeLimit;
// 可用处理器地址可能多值逗号分隔 // 可用处理器地址可能多值逗号分隔
private String workerAddress; private String allWorkerAddress;
private String jobParams; private String jobParams;
private String instanceParams; private String instanceParams;

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.common.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* TaskTracker 将状态上报给服务器 * TaskTracker 将状态上报给服务器
* *
@ -9,7 +11,7 @@ import lombok.Data;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Data @Data
public class TaskTrackerReportInstanceStatusReq { public class TaskTrackerReportInstanceStatusReq implements Serializable {
private String jobId; private String jobId;
private String instanceId; private String instanceId;

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.common.request;
import com.github.kfcfans.common.model.SystemMetrics; import com.github.kfcfans.common.model.SystemMetrics;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* Worker 上报健康信息worker定时发送的heartbeat * Worker 上报健康信息worker定时发送的heartbeat
* *
@ -10,7 +12,7 @@ import lombok.Data;
* @since 2020/3/25 * @since 2020/3/25
*/ */
@Data @Data
public class WorkerHealthReportReq { public class WorkerHealthReportReq implements Serializable {
// 本机地址 -> IP:port // 本机地址 -> IP:port
private String totalAddress; private String totalAddress;

View File

@ -4,6 +4,8 @@ import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.io.Serializable;
/** /**
* Pattens.ask 的响应 * Pattens.ask 的响应
* *
@ -13,6 +15,6 @@ import lombok.NoArgsConstructor;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class AskResponse { public class AskResponse implements Serializable {
private boolean success; private boolean success;
} }

View File

@ -7,12 +7,14 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.oms.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.oms.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.oms.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.List; import java.util.List;
@ -45,7 +47,11 @@ public class TaskTrackerActor extends AbstractActor {
if (taskTracker == null) { if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else { } else {
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), req.getStatus(), req.getResult(), false);
// 状态转化
TaskStatus status = TaskStatus.convertStatus(TaskStatus.of(req.getStatus()));
taskTracker.updateTaskStatus(req.getInstanceId(), req.getTaskId(), status.getValue(), req.getResult(), false);
} }
} }
@ -120,7 +126,21 @@ public class TaskTrackerActor extends AbstractActor {
* 服务器任务调度处理器 * 服务器任务调度处理器
*/ */
private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
// 接受到任务创建 TaskTracker String instanceId = req.getInstanceId();
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(instanceId);
if (taskTracker != null) {
log.warn("[TaskTrackerActor] TaskTracker({}) for instance(id={}) already exists.", taskTracker, instanceId);
return;
} }
// 原子创建防止多实例的存在
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> {
JobInstanceInfo jobInstanceInfo = new JobInstanceInfo();
BeanUtils.copyProperties(req, jobInstanceInfo);
return new TaskTracker(jobInstanceInfo);
});
}
} }

View File

@ -16,9 +16,10 @@ public enum TaskStatus {
/* ******************* TaskTracker 专用 ******************* */ /* ******************* TaskTracker 专用 ******************* */
WAITING_DISPATCH(1, "等待调度器调度"), WAITING_DISPATCH(1, "等待调度器调度"),
DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功但不保证worker收到"), DISPATCH_SUCCESS_WORKER_UNCHECK(2, "调度成功但不保证worker收到"),
WORKER_PROCESSING(3, "worker开始执行"), WORKER_RECEIVED(3, "worker接收成功但未开始执行"),
WORKER_PROCESS_FAILED(4, "worker执行失败"), WORKER_PROCESSING(4, "worker正在执行"),
WORKER_PROCESS_SUCCESS(5, "worker执行成功"), WORKER_PROCESS_FAILED(5, "worker执行失败"),
WORKER_PROCESS_SUCCESS(6, "worker执行成功"),
/* ******************* Worker 专用 ******************* */ /* ******************* Worker 专用 ******************* */
RECEIVE_SUCCESS(11, "成功接受任务但未开始执行此时worker满载暂时无法运行"), RECEIVE_SUCCESS(11, "成功接受任务但未开始执行此时worker满载暂时无法运行"),
@ -30,18 +31,21 @@ public enum TaskStatus {
private String des; private String des;
public static TaskStatus of(int v) { public static TaskStatus of(int v) {
switch (v) { for (TaskStatus taskStatus : values()) {
case 1: return WAITING_DISPATCH; if (v == taskStatus.value) {
case 2: return DISPATCH_SUCCESS_WORKER_UNCHECK; return taskStatus;
case 3: return WORKER_PROCESSING; }
case 4: return WORKER_PROCESS_FAILED;
case 5: return WORKER_PROCESS_SUCCESS;
case 11: return RECEIVE_SUCCESS;
case 12: return PROCESSING;
case 13: return PROCESS_FAILED;
case 14: return PROCESS_SUCCESS;
} }
throw new IllegalArgumentException("no TaskStatus match the value of " + v); throw new IllegalArgumentException("no TaskStatus match the value of " + v);
} }
public static TaskStatus convertStatus(TaskStatus processorStatus) {
switch (processorStatus) {
case RECEIVE_SUCCESS: return WORKER_RECEIVED;
case PROCESSING: return WORKER_PROCESSING;
case PROCESS_FAILED: return WORKER_PROCESS_FAILED;
case PROCESS_SUCCESS: return WORKER_PROCESS_SUCCESS;
}
throw new IllegalArgumentException(processorStatus.name() + " is not the processor status.");
}
} }

View File

@ -58,7 +58,7 @@ public class ProcessorRunnable implements Runnable {
} }
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext); ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
reportStatus(TaskStatus.WORKER_PROCESSING, null); reportStatus(TaskStatus.PROCESSING, null);
// 1. 获取 Processor // 1. 获取 Processor
BasicProcessor processor = getProcessor(); BasicProcessor processor = getProcessor();
@ -123,7 +123,7 @@ public class ProcessorRunnable implements Runnable {
log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e); log.warn("[ProcessorRunnable] execute last task(instanceId={},taskId={}) failed.", instanceId, taskId, e);
} }
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED; TaskStatus status = lastResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED;
reportStatus(status, lastResult.getMsg()); reportStatus(status, lastResult.getMsg());
log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch); log.info("[ProcessorRunnable] instance(instanceId={},taskId={})' last task execute successfully, using time: {}", instanceId, taskId, stopwatch);
@ -139,7 +139,7 @@ public class ProcessorRunnable implements Runnable {
log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e); log.warn("[ProcessorRunnable] task({}) process failed.", taskContext.getDescription(), e);
processResult = new ProcessResult(false, e.toString()); processResult = new ProcessResult(false, e.toString());
} }
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg()); reportStatus(processResult.isSuccess() ? TaskStatus.PROCESS_SUCCESS : TaskStatus.PROCESS_FAILED, processResult.getMsg());
}catch (Exception e) { }catch (Exception e) {
log.error("[ProcessorRunnable] execute failed, please fix this bug!", e); log.error("[ProcessorRunnable] execute failed, please fix this bug!", e);

View File

@ -1,16 +1,13 @@
package com.github.kfcfans.oms.worker.core.tracker.task; package com.github.kfcfans.oms.worker.core.tracker.task;
import akka.actor.ActorRef;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import ch.qos.logback.core.util.SystemInfo;
import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.JobInstanceStatus; import com.github.kfcfans.common.JobInstanceStatus;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.worker.OhMyWorker; import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.AkkaConstant; import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ; import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
@ -24,6 +21,7 @@ import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStopInstanceReq;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter; import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -42,14 +40,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j @Slf4j
public abstract class TaskTracker { @ToString
public class TaskTracker {
protected long startTime; protected long startTime;
protected long jobTimeLimitMS; protected long jobTimeLimitMS;
// 任务实例信息 // 任务实例信息
protected JobInstanceInfo jobInstanceInfo; protected JobInstanceInfo jobInstanceInfo;
protected ActorRef taskTrackerActorRef;
@Getter @Getter
protected List<String> allWorkerAddress; protected List<String> allWorkerAddress;
@ -59,13 +57,14 @@ public abstract class TaskTracker {
protected AtomicBoolean finished = new AtomicBoolean(false); protected AtomicBoolean finished = new AtomicBoolean(false);
public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) { public TaskTracker(JobInstanceInfo jobInstanceInfo) {
log.info("[TaskTracker] start to create TaskTracker for instance({}).", jobInstanceInfo);
this.startTime = System.currentTimeMillis(); this.startTime = System.currentTimeMillis();
this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit(); this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit();
this.jobInstanceInfo = jobInstanceInfo; this.jobInstanceInfo = jobInstanceInfo;
this.taskTrackerActorRef = taskTrackerActorRef;
this.taskPersistenceService = TaskPersistenceService.INSTANCE; this.taskPersistenceService = TaskPersistenceService.INSTANCE;
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build();
@ -81,6 +80,7 @@ public abstract class TaskTracker {
// 定时任务2状态检查 // 定时任务2状态检查
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
} }
@ -165,11 +165,12 @@ public abstract class TaskTracker {
rootTask.setAddress(NetUtils.getLocalHost()); rootTask.setAddress(NetUtils.getLocalHost());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setLastModifiedTime(System.currentTimeMillis());
if (!taskPersistenceService.save(rootTask)) { if (!taskPersistenceService.save(rootTask)) {
throw new RuntimeException("create root task failed."); throw new RuntimeException("create root task failed.");
} }
log.info("[TaskTracker] create root task successfully for instance(instanceId={}).", jobInstanceInfo.getInstanceId());
} }
@ -199,10 +200,12 @@ public abstract class TaskTracker {
ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath); ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath);
// 发送请求Akka的tell是至少投递一次经实验表明无法投递消息也不会报错...印度啊... // 发送请求Akka的tell是至少投递一次经实验表明无法投递消息也不会报错...印度啊...
targetActor.tell(req, taskTrackerActorRef); targetActor.tell(req, null);
// 更新数据库如果更新数据库失败可能导致重复执行先不处理 // 更新数据库如果更新数据库失败可能导致重复执行先不处理
taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null); taskPersistenceService.updateTaskStatus(task.getInstanceId(), task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, null);
log.debug("[TaskTracker] dispatch task({instanceId={},taskId={},taskName={}} successfully.)", task.getInstanceId(), task.getTaskId(), task.getTaskName());
}catch (Exception e) { }catch (Exception e) {
// 调度失败不修改数据库下次重新随机派发给 remote actor // 调度失败不修改数据库下次重新随机派发给 remote actor
log.warn("[TaskTracker] dispatch task({}) failed.", task); log.warn("[TaskTracker] dispatch task({}) failed.", task);
@ -218,24 +221,25 @@ public abstract class TaskTracker {
private static final long TIME_OUT_MS = 5000; private static final long TIME_OUT_MS = 5000;
@Override
public void run() { private void innerRun() {
final String instanceId = jobInstanceInfo.getInstanceId(); final String instanceId = jobInstanceInfo.getInstanceId();
// 1. 查询统计信息 // 1. 查询统计信息
Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId); Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId);
long waitingDispatchNum = status2Num.get(TaskStatus.WAITING_DISPATCH); long waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
long workerUnreceivedNum = status2Num.get(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK); long workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
long receivedNum = status2Num.get(TaskStatus.RECEIVE_SUCCESS); long receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
long succeedNum = status2Num.get(TaskStatus.WORKER_PROCESS_SUCCESS); long runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
long failedNum = status2Num.get(TaskStatus.WORKER_PROCESS_FAILED); long succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
long failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
long finishedNum = succeedNum + failedNum; long finishedNum = succeedNum + failedNum;
long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum; long unfinishedNum = waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum;
log.debug("[TaskTracker] status check result({})", status2Num); log.debug("[TaskTracker] status check result: {}", status2Num);
TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
req.setJobId(jobInstanceInfo.getJobId()); req.setJobId(jobInstanceInfo.getJobId());
@ -348,5 +352,13 @@ public abstract class TaskTracker {
} }
@Override
public void run() {
try {
innerRun();
}catch (Exception e) {
log.warn("[TaskTracker] status checker execute failed, please fix the bug (@tjq)!", e);
}
}
} }
} }

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
/** /**
* 持有 Processor 对象 * 持有 Processor 对象
@ -25,4 +26,8 @@ public class TaskTrackerPool {
instanceId2TaskTracker.remove(instanceId); instanceId2TaskTracker.remove(instanceId);
} }
public static void atomicCreateTaskTracker(String instanceId, Function<String, TaskTracker> creator) {
instanceId2TaskTracker.computeIfAbsent(instanceId, creator);
}
} }

View File

@ -23,11 +23,6 @@ public interface TaskDAO {
boolean save(TaskDO task); boolean save(TaskDO task);
boolean batchSave(Collection<TaskDO> tasks); boolean batchSave(Collection<TaskDO> tasks);
/**
* 更新任务数据必须有主键 instanceId + taskId
*/
boolean update(TaskDO task);
int batchDelete(String instanceId, List<String> taskIds); int batchDelete(String instanceId, List<String> taskIds);
List<TaskDO> simpleQuery(SimpleTaskQuery query); List<TaskDO> simpleQuery(SimpleTaskQuery query);

View File

@ -36,7 +36,7 @@ public class TaskDAOImpl implements TaskDAO {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)"; String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
fillInsertPreparedStatement(task, ps); fillInsertPreparedStatement(task, ps);
return ps.execute(); return ps.executeUpdate() == 1;
}catch (Exception e) { }catch (Exception e) {
log.error("[TaskDAO] insert failed.", e); log.error("[TaskDAO] insert failed.", e);
} }
@ -64,11 +64,6 @@ public class TaskDAOImpl implements TaskDAO {
} }
@Override
public boolean update(TaskDO task) {
return false;
}
@Override @Override
public int batchDelete(String instanceId, List<String> taskIds) { public int batchDelete(String instanceId, List<String> taskIds) {
String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s"; String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s";
@ -144,7 +139,8 @@ public class TaskDAOImpl implements TaskDAO {
String sqlFormat = "update task_info set %s where %s"; String sqlFormat = "update task_info set %s where %s";
String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition());
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) {
return stat.execute(); stat.executeUpdate();
return true;
}catch (Exception e) { }catch (Exception e) {
log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e); log.error("[TaskDAO] simpleUpdate failed(sql = {}).", updateField, e);
return false; return false;

View File

@ -97,8 +97,9 @@ public class TaskPersistenceService {
Map<TaskStatus, Long> result = Maps.newHashMap(); Map<TaskStatus, Long> result = Maps.newHashMap();
dbRES.forEach(row -> { dbRES.forEach(row -> {
int status = Integer.parseInt(String.valueOf(row.get("status"))); // H2 数据库都是大写...
long num = Long.parseLong(String.valueOf(row.get("num"))); int status = Integer.parseInt(String.valueOf(row.get("STATUS")));
long num = Long.parseLong(String.valueOf(row.get("NUM")));
result.put(TaskStatus.of(status), num); result.put(TaskStatus.of(status), num);
}); });
return result; return result;

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* 广播任务 preExecute 结束信息 * 广播任务 preExecute 结束信息
* *
@ -9,7 +11,7 @@ import lombok.Data;
* @since 2020/3/23 * @since 2020/3/23
*/ */
@Data @Data
public class BroadcastTaskPreExecuteFinishedReq { public class BroadcastTaskPreExecuteFinishedReq implements Serializable {
private String instanceId; private String instanceId;
private String taskId; private String taskId;

View File

@ -8,6 +8,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@ -18,7 +19,7 @@ import java.util.List;
*/ */
@Getter @Getter
@NoArgsConstructor @NoArgsConstructor
public class ProcessorMapTaskRequest { public class ProcessorMapTaskRequest implements Serializable {
private String instanceId; private String instanceId;

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* worker 上报 task 执行情况 * worker 上报 task 执行情况
* *
@ -9,7 +11,7 @@ import lombok.Data;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Data @Data
public class ProcessorReportTaskStatusReq { public class ProcessorReportTaskStatusReq implements Serializable {
private String instanceId; private String instanceId;
private String taskId; private String taskId;

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.pojo.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable;
/** /**
* TaskTracker 停止 ProcessorTracker释放相关资源 * TaskTracker 停止 ProcessorTracker释放相关资源
* 任务执行完毕后停止 OR 手动强制停止 * 任务执行完毕后停止 OR 手动强制停止
@ -10,7 +12,7 @@ import lombok.Data;
* @since 2020/3/25 * @since 2020/3/25
*/ */
@Data @Data
public class TaskTrackerStopInstanceReq { public class TaskTrackerStopInstanceReq implements Serializable {
private String instanceId; private String instanceId;
// 保留字段暂时没用 // 保留字段暂时没用

View File

@ -0,0 +1,64 @@
package com.github.kfcfans.oms;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.common.AkkaConstant;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
* 测试完整的 JobInstance 执行流程
*
* @author tjq
* @since 2020/3/25
*/
public class TaskTrackerTest {
private static ActorSelection remoteTaskTracker;
@BeforeAll
public static void init() {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(NetUtils.getLocalHost(), AkkaConstant.Task_TRACKER_ACTOR_NAME);
remoteTaskTracker = testAS.actorSelection(akkaRemotePath);
}
@Test
public void testStandaloneJob() throws Exception {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId("1");
req.setInstanceId("10086");
req.setAllWorkerAddress(NetUtils.getLocalHost());
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setJobParams("this is job Params");
req.setInstanceParams("this is instance Params");
req.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
req.setTaskRetryNum(3);
req.setThreadConcurrency(5);
req.setTimeLimit(500000);
remoteTaskTracker.tell(req, null);
Thread.sleep(500000);
}
}