diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index 52331437..b6fc9d28 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -4,6 +4,8 @@ import com.github.kfcfans.powerjob.common.OmsSerializable; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import lombok.*; +import java.nio.charset.StandardCharsets; + /** * Pattens.ask 的响应 @@ -31,7 +33,11 @@ public class AskResponse implements OmsSerializable { AskResponse r = new AskResponse(); r.success = true; if (data != null) { - r.data = JsonUtils.toBytes(data); + if (data instanceof String) { + r.data = ((String) data).getBytes(StandardCharsets.UTF_8); + } else { + r.data = JsonUtils.toBytes(data); + } } return r; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 3d0a61ae..2e91357f 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.kfcfans.powerjob.common.PowerJobException; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -48,6 +49,10 @@ public class JsonUtils { return objectMapper.readValue(b, clz); } + public static T parseObject(byte[] b, TypeReference typeReference) throws Exception { + return objectMapper.readValue(b, typeReference); + } + public static T parseObjectUnsafe(String json, Class clz) { try { return objectMapper.readValue(json, clz); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java index 42114b90..efb465df 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.common.utils; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.common.RemoteConstant; @@ -45,13 +46,20 @@ public class AkkaUtils { */ public static boolean reliableTransmit(ActorSelection remote, Object msg) { try { - CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - return response.isSuccess(); + return easyAsk(remote, msg).isSuccess(); }catch (Exception e) { log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString()); } return false; } + public static AskResponse easyAsk(ActorSelection remote, Object msg) { + try { + CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + return (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + }catch (Exception e) { + throw new PowerJobException(e); + } + } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java index 461aca34..9e2c5267 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -74,4 +74,20 @@ public class ProcessorTrackerStatusHolder { }); return result; } + + /** + * 注册新的执行节点 + * @param address 新的执行节点地址 + * @return true: 注册成功 / false:已存在 + */ + public boolean register(String address) { + ProcessorTrackerStatus pts = address2Status.get(address); + if (pts != null) { + return false; + } + pts = new ProcessorTrackerStatus(); + pts.init(address); + address2Status.put(address, pts); + return true; + } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 27d3af71..00903371 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; -import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -85,7 +84,7 @@ public class FrequentTaskTracker extends TaskTracker { // 1. 初始化定时调度线程池 String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d"; ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build(); - this.scheduledPool = Executors.newScheduledThreadPool(3, factory); + this.scheduledPool = Executors.newScheduledThreadPool(4, factory); // 2. 启动任务发射器 launcher = new Launcher(); @@ -103,6 +102,8 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS); // 4. 启动状态检查器 scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); + // 5. 启动执行器动态检测装置 + scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); } @Override diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index bb9bb88f..55c38634 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task; import akka.actor.ActorSelection; +import com.fasterxml.jackson.core.type.TypeReference; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.RemoteConstant; @@ -8,7 +9,10 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import com.github.kfcfans.powerjob.common.request.WorkerQueryExecutorClusterReq; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; @@ -63,10 +67,10 @@ public abstract class TaskTracker { // 是否结束 protected AtomicBoolean finished; // 上报时间缓存 - private Cache taskId2LastReportTime; + private final Cache taskId2LastReportTime; // 分段锁 - private SegmentLock segmentLock; + private final SegmentLock segmentLock; private static final int UPDATE_CONCURRENCY = 4; protected TaskTracker(ServerScheduleJobReq req) { @@ -471,6 +475,37 @@ public abstract class TaskTracker { } } + /** + * 执行器动态上线(for 秒级任务和 MR 任务) + * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 + */ + protected class WorkerDetector implements Runnable { + @Override + public void run() { + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + if (StringUtils.isEmpty(serverPath)) { + log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); + return; + } + WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId()); + AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req); + if (!response.isSuccess()) { + log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); + return; + } + try { + List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); + workerList.forEach(address -> { + if (ptStatusHolder.register(address)) { + log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address); + } + }); + }catch (Exception e) { + log.warn("[TaskTracker-{}] detective failed!", instanceId, e); + } + } + } + /** * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 */