ready to release worker and client's 1.0.0 version

This commit is contained in:
tjq 2020-04-20 16:51:23 +08:00
parent 67bbf9e352
commit 0726e1f630
16 changed files with 110 additions and 60 deletions

View File

@ -1,3 +1,4 @@
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.client.OhMyClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -18,13 +19,12 @@ public class TestClient {
}
@Test
public void testInstanceOpenAPI() throws Exception {
System.out.println(ohMyClient.stopInstance(1586855173043L));
System.out.println(ohMyClient.fetchInstanceStatus(1586855173043L));
public void testStopInstance() throws Exception {
ResultDTO<Void> res = ohMyClient.stopInstance(132522955178508352L);
System.out.println(res.toString());
}
@Test
public void testJobOpenAPI() throws Exception {
System.out.println(ohMyClient.runJob(1L, "hhhh"));
public void testFetchInstanceStatus() throws Exception {
System.out.println(ohMyClient.fetchInstanceStatus(132522955178508352L));
}
}

View File

@ -12,7 +12,7 @@ public class RemoteConstant {
/* ************************ AKKA WORKER ************************ */
public static final int DEFAULT_WORKER_PORT = 2777;
public static final int DEFAULT_WORKER_PORT = 27777;
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";

View File

@ -29,7 +29,7 @@ public class NetUtils {
// valid port range is (0, 65535]
private static final int MIN_PORT = 0;
private static final int MAX_PORT = 65535;
public static final int MAX_PORT = 65535;
private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$");
private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$");

View File

@ -116,7 +116,7 @@ public class DispatchService {
req.setThreadConcurrency(jobInfo.getConcurrency());
// 发送请求不可靠需要一个后台线程定期轮询状态
String taskTrackerAddress = allAvailableWorker.get(0);
String taskTrackerAddress = finalWorkers.get(0);
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
taskTrackerActor.tell(req, null);
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());

View File

@ -86,6 +86,7 @@ public class ServerSelectService {
appInfo.setGmtModified(new Date());
appInfoRepository.saveAndFlush(appInfo);
log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
return appInfo.getCurrentServer();
}catch (Exception e) {
log.warn("[ServerSelectService] write new server to db failed for app {}.", appName);
@ -121,7 +122,7 @@ public class ServerSelectService {
downServerCache.remove(serverAddress);
return response.isSuccess();
}catch (Exception e) {
log.warn("[ServerSelectService] server({}) was down, I will be the new server.", serverAddress);
log.warn("[ServerSelectService] server({}) was down.", serverAddress);
}
downServerCache.add(serverAddress);
return false;

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.oms.server.processors;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import org.springframework.stereotype.Component;
/**
 * Processor used for testing timed-out tasks: it blocks the executing thread
 * for the amount of time given in the job parameters before reporting success.
 *
 * @author tjq
 * @since 2020/4/20
 */
@Component
public class TimeoutProcessor implements BasicProcessor {

    /**
     * Sleeps for the duration parsed from {@code context.getJobParams()} and then
     * returns a successful result.
     *
     * @param context task context; its job parameters must hold the sleep time
     *                (passed to {@link Thread#sleep(long)}, i.e. milliseconds) as a decimal long
     * @return a successful {@link ProcessResult}, produced only once the sleep has finished
     * @throws Exception if the job parameters cannot be parsed as a long or the sleep is interrupted
     */
    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        final String rawParams = context.getJobParams();
        final long sleepMillis = Long.parseLong(rawParams);
        Thread.sleep(sleepMillis);
        return new ProcessResult(true, "impossible~~~~QAQ~");
    }
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.oms.worker;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.remote.RemoteTransportException;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.JsonUtils;
@ -81,21 +82,28 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env.");
}
// 初始化 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();
int port = NetUtils.getAvailablePort(RemoteConstant.DEFAULT_WORKER_PORT);
overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
overrideConfig.put("akka.remote.artery.canonical.port", port);
workerAddress = NetUtils.getLocalHost() + ":" + port;
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
for (int port = NetUtils.getAvailablePort(RemoteConstant.DEFAULT_WORKER_PORT); port < NetUtils.MAX_PORT; port++) {
try {
Map<String, Object> overrideConfig = Maps.newHashMap();
overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
overrideConfig.put("akka.remote.artery.canonical.port", port);
workerAddress = NetUtils.getLocalHost() + ":" + port;
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
break;
}catch (RemoteTransportException ignore) {
log.warn("[OhMyWorker] port:{} already in use, try to use a new port.", port);
}
}
// 初始化存储
TaskPersistenceService.INSTANCE.init();

View File

@ -32,28 +32,12 @@ public class ProcessorTrackerActor extends AbstractActor {
* 处理来自TaskTracker的task执行请求
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
Long jobId = req.getInstanceInfo().getJobId();
Long instanceId = req.getInstanceInfo().getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> {
try {
ProcessorTracker pt = new ProcessorTracker(req);
log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId);
return pt;
}catch (Exception e) {
log.warn("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) failed.", jobId, instanceId, e);
// 直接上报失败
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, req.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), e.getMessage(), System.currentTimeMillis());
getSender().tell(report, getSelf());
}
return null;
});
// 创建失败直接返回
if (processorTracker == null) {
return;
}
// 创建 ProcessorTracker 一定能成功且每个任务实例只会创建一个 ProcessorTracker
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req));
TaskDO task = new TaskDO();

View File

@ -91,7 +91,11 @@ public class ProcessorTrackerStatus {
* 是否超时超过一定时间没有收到心跳
*/
public boolean isTimeout() {
// NOTE(review): this unconditional return looks like the pre-change line kept by the diff view;
// the dispatched-guarded version below appears to supersede it — confirm against the real file.
return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS;
if (dispatched) {
// Timed out when the time elapsed since lastActiveTime exceeds HEARTBEAT_TIMEOUT_MS.
return System.currentTimeMillis() - lastActiveTime > HEARTBEAT_TIMEOUT_MS;
}
// Machines that have never been dispatched a task do not need timeout handling.
return false;
}
}

View File

@ -26,6 +26,7 @@ import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 负责管理 Processor 的执行
@ -54,25 +55,36 @@ public class ProcessorTracker {
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
// ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
private boolean lethal = false;
private String lethalReason;
/**
* 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T
*/
public ProcessorTracker(TaskTrackerStartTaskReq request) throws Exception {
public ProcessorTracker(TaskTrackerStartTaskReq request) {
try {
// 赋值
this.startTime = System.currentTimeMillis();
this.instanceInfo = request.getInstanceInfo();
this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
// 赋值
this.startTime = System.currentTimeMillis();
this.instanceInfo = request.getInstanceInfo();
this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
// 初始化 线程池
initThreadPool();
// 初始化 Processor
initProcessor();
// 初始化定时任务
initTimingJob();
// 初始化 线程池
initThreadPool();
// 初始化 Processor
initProcessor();
// 初始化定时任务
initTimingJob();
log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
}catch (Exception e) {
log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e);
lethal = true;
lethalReason = e.toString();
}
}
/**
@ -87,6 +99,14 @@ public class ProcessorTracker {
*/
public void submitTask(TaskDO newTask) {
// 一旦 ProcessorTracker 出现异常所有提交到此处的任务直接返回失败防止形成死锁
// 死锁分析TT创建PTPT创建失败无法定期汇报心跳TT长时间未收到PT心跳认为PT宕机确实宕机了无法选择可用的PT再次派发任务死锁形成GG斯密达 T_T
if (lethal) {
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis());
taskTrackerActorRef.tell(report, null);
return;
}
boolean success = false;
// 1. 设置值并提交执行
newTask.setInstanceId(instanceInfo.getInstanceId());
@ -215,7 +235,7 @@ public class ProcessorTracker {
try {
processor = SpringUtils.getBean(processorInfo);
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}).", instanceId, processorInfo, e);
log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString());
}
}
// 反射加载

View File

@ -71,7 +71,8 @@ public abstract class TaskTracker {
BeanUtils.copyProperties(req, instanceInfo);
// 特殊处理超时时间
if (instanceInfo.getInstanceTimeoutMS() <= 0) {
instanceInfo.setInstanceTimeoutMS(Long.MAX_VALUE);
// Integer最大值2147483647一天的毫秒数86400000够执行24天了...要是不满足需求就让开发者手动指定吧
instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE);
}
// 赋予时间表达式类型
instanceInfo.setTimeExpressionType(TimeExpressionType.valueOf(req.getTimeExpressionType()).getV());

Binary file not shown.

Before

Width:  |  Height:  |  Size: 92 KiB

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 76 KiB

After

Width:  |  Height:  |  Size: 139 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

After

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 102 KiB

After

Width:  |  Height:  |  Size: 149 KiB

View File

@ -91,4 +91,15 @@ java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6).
```
解决方案初步怀疑在连续更改时由于数据库锁的存在导致行不可见不知道H2具体的特性。因此需要保证同一个taskId串行更新 -> synchronize Yes
解决方案:初步怀疑在连续更改时,由于数据库锁的存在导致行不可见(不知道H2具体的特性)。因此需要保证同一个taskId串行更新 -> synchronize Yes
# 2020.4.20 1.0.0发布前测试
#### Server & Worker
* 指定机器执行 -> 验证通过
* Map/MapReduce/Standalone/Broadcast/Shell/Python处理器的执行 -> 验证通过
* 超时失败 -> 验证通过
* 破坏测试:指定错误的处理器 -> 发现问题,会造成死锁(TT创建PT,PT创建失败,无法定期汇报心跳;TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T)。通过确保ProcessorTracker一定能创建成功来解决:如果处理器构建失败,之后所有提交的任务直接返回错误。
#### Client
* StopInstance -> success
* FetchInstanceStatus -> success