feat: support frequent job's worker dynamic online #81

This commit is contained in:
“tjq” 2020-10-17 16:34:48 +08:00
parent 2a6bb0b7f3
commit de3561ed4d
6 changed files with 79 additions and 8 deletions

View File

@ -4,6 +4,8 @@ import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import lombok.*;
import java.nio.charset.StandardCharsets;
/**
* Response to Patterns.ask
@ -31,7 +33,11 @@ public class AskResponse implements OmsSerializable {
AskResponse r = new AskResponse();
r.success = true;
if (data != null) {
r.data = JsonUtils.toBytes(data);
if (data instanceof String) {
r.data = ((String) data).getBytes(StandardCharsets.UTF_8);
} else {
r.data = JsonUtils.toBytes(data);
}
}
return r;
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.powerjob.common.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -48,6 +49,10 @@ public class JsonUtils {
return objectMapper.readValue(b, clz);
}
/**
 * Deserializes a JSON byte array into the generic type captured by the given type reference.
 *
 * @param b             raw JSON bytes
 * @param typeReference type token describing the target type
 * @param <T>           target type
 * @return the value produced by the shared objectMapper
 * @throws Exception whatever objectMapper.readValue throws for these bytes
 */
public static <T> T parseObject(byte[] b, TypeReference<T> typeReference) throws Exception {
    final T parsed = objectMapper.readValue(b, typeReference);
    return parsed;
}
public static <T> T parseObjectUnsafe(String json, Class<T> clz) {
try {
return objectMapper.readValue(json, clz);

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.common.utils;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant;
@ -45,13 +46,20 @@ public class AkkaUtils {
*/
public static boolean reliableTransmit(ActorSelection remote, Object msg) {
// Sends msg to remote and reports whether the reply was flagged as successful.
try {
// NOTE(review): the next three statements and the easyAsk call that follows them look like the
// old and new sides of one change shown together; as written the easyAsk line sits after a
// return statement — confirm which version is intended.
// Ask with a DEFAULT_TIMEOUT_MS timeout, then block for the reply for at most the same duration.
CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return response.isSuccess();
return easyAsk(remote, msg).isSuccess();
}catch (Exception e) {
// Any exception is logged at warn level; the method then falls through and returns false.
log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString());
}
return false;
}
/**
 * Sends {@code msg} to {@code remote} with the ask pattern and blocks for the reply.
 * Both the ask itself and the blocking wait use RemoteConstant.DEFAULT_TIMEOUT_MS.
 *
 * @param remote target actor selection
 * @param msg    message to send
 * @return the reply, cast to AskResponse
 * @throws PowerJobException wrapping any failure (timeout, interruption, failed ask, unexpected reply type)
 */
public static AskResponse easyAsk(ActorSelection remote, Object msg) {
    try {
        CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
        return (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before wrapping, so the calling thread (e.g. a pool worker
        // being shut down) can still observe that it was interrupted.
        Thread.currentThread().interrupt();
        throw new PowerJobException(e);
    } catch (Exception e) {
        throw new PowerJobException(e);
    }
}
}

View File

@ -74,4 +74,20 @@ public class ProcessorTrackerStatusHolder {
});
return result;
}
/**
 * Registers a new processor-tracker (executor node) address.
 * NOTE(review): the lookup and the insert are two separate steps; whether that is safe depends on
 * how address2Status is declared and accessed elsewhere — confirm.
 *
 * @param address address of the new executor node
 * @return true if the address was registered; false if it was already present
 */
public boolean register(String address) {
    if (address2Status.get(address) == null) {
        // Unknown address: create a fresh status entry and store it.
        ProcessorTrackerStatus fresh = new ProcessorTrackerStatus();
        fresh.init(address);
        address2Status.put(address, fresh);
        return true;
    }
    return false;
}
}

View File

@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -85,7 +84,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 1. 初始化定时调度线程池
String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(3, factory);
this.scheduledPool = Executors.newScheduledThreadPool(4, factory);
// 2. 启动任务发射器
launcher = new Launcher();
@ -103,6 +102,8 @@ public class FrequentTaskTracker extends TaskTracker {
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
// 4. 启动状态检查器
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
// 5. 启动执行器动态检测装置
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
@Override

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.RemoteConstant;
@ -8,7 +9,10 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.WorkerQueryExecutorClusterReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
@ -63,10 +67,10 @@ public abstract class TaskTracker {
// 是否结束
protected AtomicBoolean finished;
// 上报时间缓存
private Cache<String, Long> taskId2LastReportTime;
private final Cache<String, Long> taskId2LastReportTime;
// 分段锁
private SegmentLock segmentLock;
private final SegmentLock segmentLock;
private static final int UPDATE_CONCURRENCY = 4;
protected TaskTracker(ServerScheduleJobReq req) {
@ -471,6 +475,37 @@ public abstract class TaskTracker {
}
}
/**
 * Dynamic executor discovery, for second-level (frequent) jobs and MR jobs.
 * Principle: the executor list returned by the server never interferes with the state this worker
 * maintains itself; addresses are only added, nothing existing is modified.
 */
protected class WorkerDetector implements Runnable {
    @Override
    public void run() {
        // The whole body is guarded: AkkaUtils.easyAsk throws an unchecked PowerJobException on
        // timeout or failure, and an exception escaping a periodically scheduled task suppresses
        // all of its subsequent executions.
        try {
            String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
            if (StringUtils.isEmpty(serverPath)) {
                log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
                return;
            }

            // Ask the server for the executors currently available to this app / job.
            WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId());
            AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req);
            if (!response.isSuccess()) {
                log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
                return;
            }

            // The response payload is parsed as a JSON list of worker addresses.
            List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
            workerList.forEach(address -> {
                // register returns true only for addresses that were not known before.
                if (ptStatusHolder.register(address)) {
                    log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address);
                }
            });
        } catch (Exception e) {
            log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
        }
    }
}
/**
* 存储任务实例产生的各个Task状态用于分析任务实例执行情况
*/