diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java index 93093236..92fef38e 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java @@ -20,6 +20,11 @@ public class ServerScheduleJobReq implements PowerSerializable { */ private List allWorkerAddress; + /** + * 最大机器数量 + */ + private Integer maxWorkerCount; + /* *********************** 任务相关属性 *********************** */ /** diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java index 55cf865d..8347d6ac 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/TaskTrackerReportInstanceStatusReq.java @@ -15,6 +15,12 @@ import java.util.Map; @Data public class TaskTrackerReportInstanceStatusReq implements PowerSerializable { + /** + * 追加上报自己的 appId + * 方便后续的监控日志埋点 + */ + private Long appId; + private Long jobId; private Long instanceId; diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 08e6b04d..268dba77 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -21,6 +21,7 @@ powerjob-server-extension powerjob-server-migrate powerjob-server-core + powerjob-server-monitor @@ -56,6 +57,11 @@ powerjob-server-common ${project.version} + + tech.powerjob + powerjob-server-monitor + ${project.version} + tech.powerjob powerjob-server-extension diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/RejectedExecutionHandlerFactory.java 
b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/RejectedExecutionHandlerFactory.java index e7ea7cc0..6e0e1527 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/RejectedExecutionHandlerFactory.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/RejectedExecutionHandlerFactory.java @@ -2,6 +2,7 @@ package tech.powerjob.server.common; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.atomic.AtomicLong; @@ -16,15 +17,27 @@ public class RejectedExecutionHandlerFactory { private static final AtomicLong COUNTER = new AtomicLong(); + /** + * 拒绝执行,抛出 RejectedExecutionException + * @param source name for log + * @return A handler for tasks that cannot be executed by ThreadPool + */ + public static RejectedExecutionHandler newAbort(String source) { + return (r, e) -> { + log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Abort, Maybe you need to adjust the ThreadPool config!", source, e, r); + throw new RejectedExecutionException("Task " + r.toString() + + " rejected from " + source); + }; + } + /** * 直接丢弃该任务 * @param source log name * @return A handler for tasks that cannot be executed by ThreadPool */ - public static RejectedExecutionHandler newReject(String source) { + public static RejectedExecutionHandler newDiscard(String source) { return (r, p) -> { - log.error("[{}] ThreadPool[{}] overload, the task[{}] will be dropped!", source, p, r); - log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + log.error("[{}] ThreadPool[{}] overload, the task[{}] will be Discard, Maybe you need to adjust the ThreadPool config!", source, p, r); }; } @@ -35,8 +48,7 @@ public class RejectedExecutionHandlerFactory { */ public static RejectedExecutionHandler newCallerRun(String source) { return (r, p) -> { - 
log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread!", source, p, r); - log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by caller thread, Maybe you need to adjust the ThreadPool config!", source, p, r); if (!p.isShutdown()) { r.run(); } @@ -50,8 +62,7 @@ public class RejectedExecutionHandlerFactory { */ public static RejectedExecutionHandler newThreadRun(String source) { return (r, p) -> { - log.warn("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!", source, p, r); - log.warn("[{}] Maybe you need to adjust the ThreadPool config!", source); + log.error("[{}] ThreadPool[{}] overload, the task[{}] will run by a new thread!, Maybe you need to adjust the ThreadPool config!", source, p, r); if (!p.isShutdown()) { String threadName = source + "-T-" + COUNTER.getAndIncrement(); log.info("[{}] create new thread[{}] to run job", source, threadName); diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/SJ.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/SJ.java index 443300b2..89a89730 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/SJ.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/SJ.java @@ -14,4 +14,6 @@ public class SJ { public static final Splitter COMMA_SPLITTER = Splitter.on(","); public static final Joiner COMMA_JOINER = Joiner.on(","); + public static final Joiner MONITOR_JOINER = Joiner.on("|").useForNull("-"); + } diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/PowerJobAware.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/PowerJobAware.java new file mode 100644 index 00000000..fc124fa9 --- /dev/null +++ 
b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/PowerJobAware.java @@ -0,0 +1,10 @@ +package tech.powerjob.server.common.aware; + +/** + * PowerJobAware: parent interface of the aware callbacks (e.g. ServerInfoAware); declares no methods of its own. + * + * @author tjq + * @since 2022/9/12 + */ +public interface PowerJobAware { +} diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/ServerInfoAware.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/ServerInfoAware.java new file mode 100644 index 00000000..57706f4b --- /dev/null +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/aware/ServerInfoAware.java @@ -0,0 +1,14 @@ +package tech.powerjob.server.common.aware; + +import tech.powerjob.server.common.module.ServerInfo; + +/** + * Callback interface through which an implementing component is handed a ServerInfo via setServerInfo. + * + * @author tjq + * @since 2022/9/12 + */ +public interface ServerInfoAware extends PowerJobAware { + + void setServerInfo(ServerInfo serverInfo); +} diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java new file mode 100644 index 00000000..ef8b2b06 --- /dev/null +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/constants/PJThreadPool.java @@ -0,0 +1,26 @@ +package tech.powerjob.server.common.constants; + +/** + * Thread pool name constants. + * + * @author tjq + * @since 2022/9/12 + */ +public class PJThreadPool { + + /** + * Thread pool used for timed scheduling. + */ + public static final String TIMING_POOL = "PowerJobTimingPool"; + + /** + * Asynchronous thread pool for background tasks. + */ + public static final String BACKGROUND_POOL = "PowerJobBackgroundPool"; + + /** + * Thread pool dedicated to the local database. + */ + public static final String LOCAL_DB_POOL = "PowerJobLocalDbPool"; + +} diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/module/ServerInfo.java 
b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/module/ServerInfo.java new file mode 100644 index 00000000..d2c004fa --- /dev/null +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/module/ServerInfo.java @@ -0,0 +1,19 @@ +package tech.powerjob.server.common.module; + +import lombok.Data; + +/** + * current server info + * + * @author tjq + * @since 2022/9/12 + */ +@Data +public class ServerInfo { + + private Long id; + + private String ip; + + private String version = "UNKNOWN"; +} diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java index 0d9556ba..5d73a46a 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java @@ -1,6 +1,7 @@ package tech.powerjob.server.common.timewheel.holder; import tech.powerjob.server.common.timewheel.HashedWheelTimer; +import tech.powerjob.server.common.timewheel.Timer; /** * 时间轮单例 @@ -11,7 +12,7 @@ import tech.powerjob.server.common.timewheel.HashedWheelTimer; public class HashedWheelTimerHolder { // 非精确时间轮,每 5S 走一格 - public static final HashedWheelTimer INACCURATE_TIMER = new HashedWheelTimer(5, 16, 0); + public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0); private HashedWheelTimerHolder() { } diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java index a6453916..35d06d4c 100644 --- 
a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java @@ -1,6 +1,7 @@ package tech.powerjob.server.common.timewheel.holder; import tech.powerjob.server.common.timewheel.HashedWheelTimer; +import tech.powerjob.server.common.timewheel.Timer; import tech.powerjob.server.common.timewheel.TimerFuture; import tech.powerjob.server.common.timewheel.TimerTask; import com.google.common.collect.Maps; @@ -19,9 +20,9 @@ public class InstanceTimeWheelService { private static final Map CARGO = Maps.newConcurrentMap(); // 精确调度时间轮,每 1MS 走一格 - private static final HashedWheelTimer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); + private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); // 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格 - private static final HashedWheelTimer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0); + private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0); // 支持取消的时间间隔,低于该阈值则不会放进 CARGO private static final long MIN_INTERVAL_MS = 1000; diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java index c1edeb09..3d06dd64 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java @@ -2,6 +2,7 @@ package tech.powerjob.server.common.utils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.aspectj.lang.JoinPoint; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.Signature; import 
org.aspectj.lang.reflect.MethodSignature; @@ -27,6 +28,10 @@ public class AOPUtils { private static final ExpressionParser parser = new SpelExpressionParser(); private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); + public static String parseRealClassName(JoinPoint joinPoint) { + return joinPoint.getSignature().getDeclaringType().getSimpleName(); + } + public static Method parseMethod(ProceedingJoinPoint joinPoint) { Signature pointSignature = joinPoint.getSignature(); if (!(pointSignature instanceof MethodSignature)) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index 44d36d60..db0183f7 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -146,9 +146,8 @@ public class DispatchService { // 构造任务调度请求 ServerScheduleJobReq req = constructServerScheduleJobReq(jobInfo, instanceInfo, workerIpList); - // 发送请求(不可靠,需要一个后台线程定期轮询状态) - WorkerInfo taskTracker = selectTaskTracker(jobInfo, suitableWorkers); + WorkerInfo taskTracker = suitableWorkers.get(0); String taskTrackerAddress = taskTracker.getAddress(); transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); @@ -182,6 +181,7 @@ public class DispatchService { } req.setInstanceId(instanceInfo.getInstanceId()); req.setAllWorkerAddress(finalWorkersIpList); + req.setMaxWorkerCount(jobInfo.getMaxWorkerCount()); // 设置工作流ID req.setWfInstanceId(instanceInfo.getWfInstanceId()); @@ -196,17 +196,4 @@ public class DispatchService { req.setThreadConcurrency(jobInfo.getConcurrency()); return req; } - - private WorkerInfo selectTaskTracker(JobInfoDO jobInfo, List workerInfos) { - DispatchStrategy dispatchStrategy = 
DispatchStrategy.of(jobInfo.getDispatchStrategy()); - switch (dispatchStrategy) { - case HEALTH_FIRST: - return workerInfos.get(0); - case RANDOM: - return workerInfos.get(ThreadLocalRandom.current().nextInt(workerInfos.size())); - default: - } - // impossible, indian java - return workerInfos.get(0); - } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java new file mode 100644 index 00000000..f1b68eec --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java @@ -0,0 +1,161 @@ +package tech.powerjob.server.core.handler; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.BeanUtils; +import org.springframework.core.env.Environment; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.request.*; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.common.utils.SpringUtils; +import tech.powerjob.server.monitor.MonitorService; +import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; +import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import 
tech.powerjob.server.remote.worker.WorkerClusterQueryService; + +import javax.annotation.Resource; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.RejectedExecutionException; +import java.util.stream.Collectors; + +/** + * wrapper monitor for IWorkerRequestHandler + * + * @author tjq + * @since 2022/9/11 + */ +@Slf4j +public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { + + @Resource + protected MonitorService monitorService; + @Resource + protected Environment environment; + @Resource + protected ContainerInfoRepository containerInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + + protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event); + + protected abstract Optional processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception; + + protected abstract void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event); + + + @Override + public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) { + long startMs = System.currentTimeMillis(); + WorkerHeartbeatEvent event = new WorkerHeartbeatEvent() + .setAppName(heartbeat.getAppName()) + .setAppId(heartbeat.getAppId()) + .setVersion(heartbeat.getVersion()) + .setProtocol(heartbeat.getProtocol()) + .setTag(heartbeat.getTag()) + .setWorkerAddress(heartbeat.getWorkerAddress()) + .setDelayMs(startMs - heartbeat.getHeartbeatTime()) + .setScore(heartbeat.getSystemMetrics().getScore()); + processWorkerHeartbeat0(heartbeat, event); + monitorService.monitor(event); + } + + @Override + public Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req) { + long startMs = System.currentTimeMillis(); + TtReportInstanceStatusEvent event = new TtReportInstanceStatusEvent() + .setAppId(req.getAppId()) + .setJobId(req.getJobId()) + .setInstanceId(req.getInstanceId()) + 
.setWfInstanceId(req.getWfInstanceId()) + .setInstanceStatus(InstanceStatus.of(req.getInstanceStatus())) + .setDelayMs(startMs - req.getReportTime()) + .setServerProcessStatus(TtReportInstanceStatusEvent.Status.SUCCESS); + try { + return processTaskTrackerReportInstanceStatus0(req, event); + } catch (Exception e) { + event.setServerProcessStatus(TtReportInstanceStatusEvent.Status.FAILED); + log.error("[WorkerRequestHandler] processTaskTrackerReportInstanceStatus failed for request: {}", req, e); + return Optional.of(AskResponse.failed(ExceptionUtils.getMessage(e))); + } finally { + event.setServerProcessCost(System.currentTimeMillis() - startMs); + monitorService.monitor(event); + } + } + + @Override + public void processWorkerLogReport(WorkerLogReportReq req) { + + WorkerLogReportEvent event = new WorkerLogReportEvent() + .setWorkerAddress(req.getWorkerAddress()) + .setLogNum(req.getInstanceLogContents().size()); + try { + processWorkerLogReport0(req, event); + event.setStatus(WorkerLogReportEvent.Status.SUCCESS); + } catch (RejectedExecutionException re) { + event.setStatus(WorkerLogReportEvent.Status.REJECTED); + } catch (Throwable t) { + event.setStatus(WorkerLogReportEvent.Status.EXCEPTION); + log.warn("[WorkerRequestHandler] process worker report failed!", t); + } finally { + monitorService.monitor(event); + } + } + + @Override + public AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req) { + AskResponse askResponse; + + Long jobId = req.getJobId(); + Long appId = req.getAppId(); + + JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + if (jobInfoOpt.isPresent()) { + JobInfoDO jobInfo = jobInfoOpt.get(); + if (!jobInfo.getAppId().equals(appId)) { + askResponse = AskResponse.failed("Permission Denied!"); + }else { + List sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo) + 
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); + askResponse = AskResponse.succeed(sortedAvailableWorker); + } + }else { + askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); + } + return askResponse; + } + + @Override + public AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest req) { + String port = environment.getProperty("local.server.port"); + + Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); + AskResponse askResponse = new AskResponse(); + if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { + askResponse.setSuccess(false); + askResponse.setMessage("can't find container by id: " + req.getContainerId()); + }else { + ContainerInfoDO containerInfo = containerInfoOpt.get(); + askResponse.setSuccess(true); + + ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); + BeanUtils.copyProperties(containerInfo, dpReq); + dpReq.setContainerId(containerInfo.getId()); + String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion()); + dpReq.setDownloadURL(downloadURL); + + askResponse.setData(JsonUtils.toBytes(dpReq)); + } + return askResponse; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java new file mode 100644 index 00000000..6fe30e70 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java @@ -0,0 +1,48 @@ +package tech.powerjob.server.core.handler; + +import tech.powerjob.common.request.*; +import tech.powerjob.common.response.AskResponse; + +import java.util.Optional; + +/** + * 定义 server 与 worker 之间需要处理的协议 + * + * @author tjq + * @since 
2022/9/10 + */ +public interface IWorkerRequestHandler { + + /** + * 处理 worker 上报的心跳信息 + * @param heartbeat 心跳信息 + */ + void processWorkerHeartbeat(WorkerHeartbeat heartbeat); + + /** + * 处理 TaskTracker 的任务实例上报 + * @param req 上报请求 + * @return 响应信息 + */ + Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req); + + /** + * 处理 worker 查询执行器集群 + * @param req 请求 + * @return cluster info + */ + AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req); + + /** + * 处理 worker 日志推送请求 + * @param req 请求 + */ + void processWorkerLogReport(WorkerLogReportReq req); + + /** + * 处理 worker 的容器部署请求 + * @param request 请求 + * @return 容器部署信息 + */ + AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request); +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java deleted file mode 100644 index ed5787a6..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandler.java +++ /dev/null @@ -1,171 +0,0 @@ -package tech.powerjob.server.core.handler; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.core.env.Environment; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.request.*; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.server.common.utils.SpringUtils; -import 
tech.powerjob.server.core.handler.impl.WorkerRequestAkkaHandler; -import tech.powerjob.server.core.handler.impl.WorkerRequestHttpHandler; -import tech.powerjob.server.core.instance.InstanceLogService; -import tech.powerjob.server.core.instance.InstanceManager; -import tech.powerjob.server.core.workflow.WorkflowInstanceManager; -import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; -import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import tech.powerjob.server.remote.worker.WorkerClusterQueryService; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -/** - * receive and process worker's request - * - * @author tjq - * @since 2021/2/8 - */ -@Slf4j -@Component -public class WorkerRequestHandler { - - @Resource - private Environment environment; - @Resource - private InstanceManager instanceManager; - @Resource - private WorkflowInstanceManager workflowInstanceManager; - @Resource - private InstanceLogService instanceLogService; - @Resource - private ContainerInfoRepository containerInfoRepository; - - @Resource - private WorkerClusterQueryService workerClusterQueryService; - - private static WorkerRequestHandler workerRequestHandler; - - @PostConstruct - public void initHandler() { - // init akka - AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); - // init vert.x - VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); - } - - /** - * 处理 
Worker 的心跳请求 - * @param heartbeat 心跳包 - */ - public void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) { - WorkerClusterManagerService.updateStatus(heartbeat); - } - - /** - * 处理 instance 状态 - * @param req 任务实例的状态上报请求 - */ - public Optional onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) throws ExecutionException { - // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 - if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { - // 更新工作流上下文信息 - workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); - } - - instanceManager.updateStatus(req); - - // 结束状态(成功/失败)需要回复消息 - if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) { - return Optional.of(AskResponse.succeed(null)); - } - return Optional.empty(); - } - - /** - * 处理OMS在线日志请求 - * @param req 日志请求 - */ - public void onReceiveWorkerLogReportReq(WorkerLogReportReq req) { - // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... 
- instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); - } - - /** - * 处理 Worker容器部署请求 - * @param req 容器部署请求 - */ - public AskResponse onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - - String port = environment.getProperty("local.server.port"); - - Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); - AskResponse askResponse = new AskResponse(); - if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { - askResponse.setSuccess(false); - askResponse.setMessage("can't find container by id: " + req.getContainerId()); - }else { - ContainerInfoDO containerInfo = containerInfoOpt.get(); - askResponse.setSuccess(true); - - ServerDeployContainerRequest dpReq = new ServerDeployContainerRequest(); - BeanUtils.copyProperties(containerInfo, dpReq); - dpReq.setContainerId(containerInfo.getId()); - String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, containerInfo.getVersion()); - dpReq.setDownloadURL(downloadURL); - - askResponse.setData(JsonUtils.toBytes(dpReq)); - } - return askResponse; - } - - /** - * 处理 worker 请求获取当前任务所有处理器节点的请求 - * @param req jobId + appId - */ - public AskResponse onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { - - AskResponse askResponse; - - Long jobId = req.getJobId(); - Long appId = req.getAppId(); - - JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); - Optional jobInfoOpt = jobInfoRepository.findById(jobId); - if (jobInfoOpt.isPresent()) { - JobInfoDO jobInfo = jobInfoOpt.get(); - if (!jobInfo.getAppId().equals(appId)) { - askResponse = AskResponse.failed("Permission Denied!"); - }else { - List sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo) - .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); - askResponse = 
AskResponse.succeed(sortedAvailableWorker); - } - }else { - askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); - } - return askResponse; - } - - public static WorkerRequestHandler getWorkerRequestHandler() { - if (workerRequestHandler == null) { - workerRequestHandler = SpringUtils.getBean(WorkerRequestHandler.class); - } - return workerRequestHandler; - } -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java new file mode 100644 index 00000000..ac259e03 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java @@ -0,0 +1,28 @@ +package tech.powerjob.server.core.handler; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * WorkerRequestHandlerHolder + * + * @author tjq + * @since 2022/9/11 + */ +@Component +public class WorkerRequestHandlerHolder { + + private static IWorkerRequestHandler workerRequestHandler; + + + public static IWorkerRequestHandler fetchWorkerRequestHandler() { + return workerRequestHandler; + } + + @Autowired + public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) { + WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler; + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java new file mode 100644 index 00000000..77e573be --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java @@ -0,0 +1,67 @@ +package 
tech.powerjob.server.core.handler; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.common.request.WorkerHeartbeat; +import tech.powerjob.common.request.WorkerLogReportReq; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.server.core.instance.InstanceLogService; +import tech.powerjob.server.core.instance.InstanceManager; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; +import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; +import tech.powerjob.server.remote.worker.WorkerClusterManagerService; + +import javax.annotation.Resource; +import java.util.Optional; + +/** + * receive and process worker's request + * + * @author tjq + * @since 2022/9/11 + */ +@Slf4j +@Component +public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler { + + @Resource + private InstanceManager instanceManager; + @Resource + private WorkflowInstanceManager workflowInstanceManager; + @Resource + private InstanceLogService instanceLogService; + + @Override + protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) { + WorkerClusterManagerService.updateStatus(heartbeat); + } + + @Override + protected Optional processTaskTrackerReportInstanceStatus0(TaskTrackerReportInstanceStatusReq req, TtReportInstanceStatusEvent event) throws Exception { + // 2021/02/05 如果是工作流中的实例先尝试更新上下文信息,再更新实例状态,这里一定不会有异常 + if (req.getWfInstanceId() != null && !CollectionUtils.isEmpty(req.getAppendedWfContext())) { + // 更新工作流上下文信息 + 
workflowInstanceManager.updateWorkflowContext(req.getWfInstanceId(),req.getAppendedWfContext()); + } + + instanceManager.updateStatus(req); + + // 结束状态(成功/失败)需要回复消息 + if (InstanceStatus.FINISHED_STATUS.contains(req.getInstanceStatus())) { + return Optional.of(AskResponse.succeed(null)); + } + return Optional.empty(); + } + + @Override + protected void processWorkerLogReport0(WorkerLogReportReq req, WorkerLogReportEvent event) { + // 这个效率应该不会拉垮吧...也就是一些判断 + Map#get 吧... + instanceLogService.submitLogs(req.getWorkerAddress(), req.getInstanceLogContents()); + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java new file mode 100644 index 00000000..77951c32 --- /dev/null +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java @@ -0,0 +1,26 @@ +package tech.powerjob.server.core.handler.impl; + +import org.springframework.stereotype.Component; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.transport.starter.VertXStarter; + +import javax.annotation.PostConstruct; + +/** + * 初始化器 + * + * @author tjq + * @since 2022/9/11 + */ +@Component +public class Initializer { + + @PostConstruct + public void initHandler() { + // init akka + AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); + // init vert.x + VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); + } +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java index 99aa0cd3..26f18be0 100644 --- 
a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestAkkaHandler.java @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.Optional; -import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler; +import static tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; /** * 处理 Worker 请求 @@ -24,7 +24,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { public static Props defaultProps(){ return Props.create(WorkerRequestAkkaHandler.class) - .withDispatcher("akka.worker-request-actor-dispatcher") + .withDispatcher("akka.w-r-c-d") .withRouter( new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4) .withResizer(new DefaultResizer( @@ -42,9 +42,9 @@ public class WorkerRequestAkkaHandler extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() - .match(WorkerHeartbeat.class, hb -> getWorkerRequestHandler().onReceiveWorkerHeartbeat(hb)) + .match(WorkerHeartbeat.class, hb -> fetchWorkerRequestHandler().processWorkerHeartbeat(hb)) .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) - .match(WorkerLogReportReq.class, req -> getWorkerRequestHandler().onReceiveWorkerLogReportReq(req)) + .match(WorkerLogReportReq.class, req -> fetchWorkerRequestHandler().processWorkerLogReport(req)) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) .matchAny(obj -> log.warn("[WorkerRequestAkkaHandler] receive unknown request: {}.", obj)) @@ -71,7 +71,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { private void 
onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { try { - Optional askResponseOpt = getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + Optional askResponseOpt = fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); if (askResponseOpt.isPresent()) { getSender().tell(AskResponse.succeed(null), getSelf()); } @@ -85,7 +85,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { * @param req 容器部署请求 */ private void onReceiveWorkerNeedDeployContainerRequest(WorkerNeedDeployContainerRequest req) { - getSender().tell(getWorkerRequestHandler().onReceiveWorkerNeedDeployContainerRequest(req), getSelf()); + getSender().tell(fetchWorkerRequestHandler().processWorkerNeedDeployContainer(req), getSelf()); } /** @@ -94,7 +94,7 @@ public class WorkerRequestAkkaHandler extends AbstractActor { */ private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { - getSender().tell(getWorkerRequestHandler().onReceiveWorkerQueryExecutorClusterReq(req), getSelf()); + getSender().tell(fetchWorkerRequestHandler().processWorkerQueryExecutorCluster(req), getSelf()); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java index 615dab2c..3749158f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/WorkerRequestHttpHandler.java @@ -21,7 +21,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import java.util.Properties; -import static tech.powerjob.server.core.handler.WorkerRequestHandler.getWorkerRequestHandler; +import static 
tech.powerjob.server.core.handler.WorkerRequestHandlerHolder.fetchWorkerRequestHandler; /** * WorkerRequestHandler @@ -46,14 +46,14 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { router.post(ProtocolConstant.SERVER_PATH_HEARTBEAT) .handler(ctx -> { WorkerHeartbeat heartbeat = ctx.getBodyAsJson().mapTo(WorkerHeartbeat.class); - getWorkerRequestHandler().onReceiveWorkerHeartbeat(heartbeat); + fetchWorkerRequestHandler().processWorkerHeartbeat(heartbeat); success(ctx); }); router.post(ProtocolConstant.SERVER_PATH_STATUS_REPORT) .blockingHandler(ctx -> { TaskTrackerReportInstanceStatusReq req = ctx.getBodyAsJson().mapTo(TaskTrackerReportInstanceStatusReq.class); try { - getWorkerRequestHandler().onReceiveTaskTrackerReportInstanceStatusReq(req); + fetchWorkerRequestHandler().processTaskTrackerReportInstanceStatus(req); out(ctx, AskResponse.succeed(null)); } catch (Exception e) { log.error("[WorkerRequestHttpHandler] update instance status failed for request: {}.", req, e); @@ -63,7 +63,7 @@ public class WorkerRequestHttpHandler extends AbstractVerticle { router.post(ProtocolConstant.SERVER_PATH_LOG_REPORT) .blockingHandler(ctx -> { WorkerLogReportReq req = ctx.getBodyAsJson().mapTo(WorkerLogReportReq.class); - getWorkerRequestHandler().onReceiveWorkerLogReportReq(req); + fetchWorkerRequestHandler().processWorkerLogReport(req); success(ctx); }); server.requestHandler(router).listen(port); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index f24f776c..9f75587a 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -1,5 +1,7 @@ package tech.powerjob.server.core.instance; +import 
org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import tech.powerjob.common.enums.LogLevel; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.enums.TimeExpressionType; @@ -7,6 +9,7 @@ import tech.powerjob.common.model.InstanceLogContent; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.StringPage; @@ -67,7 +70,9 @@ public class InstanceLogService { * 本地维护了在线日志的任务实例ID */ private final Map instanceId2LastReportTime = Maps.newConcurrentMap(); - private final ExecutorService workerPool; + + @Resource(name = PJThreadPool.BACKGROUND_POOL) + private AsyncTaskExecutor powerJobBackgroundPool; /** * 分段锁 @@ -87,16 +92,12 @@ public class InstanceLogService { */ private static final long EXPIRE_INTERVAL_MS = 60000; - public InstanceLogService() { - int coreSize = Runtime.getRuntime().availableProcessors(); - workerPool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue()); - } - /** * 提交日志记录,持久化到本地数据库中 * @param workerAddress 上报机器地址 * @param logs 任务实例运行时日志 */ + @Async(value = PJThreadPool.LOCAL_DB_POOL) public void submitLogs(String workerAddress, List logs) { List logList = logs.stream().map(x -> { @@ -190,7 +191,7 @@ public class InstanceLogService { * @return 异步结果 */ private Future prepareLogFile(long instanceId) { - return workerPool.submit(() -> { + return powerJobBackgroundPool.submit(() -> { // 在线日志还在不断更新,需要使用本地数据库中的数据 if (instanceId2LastReportTime.containsKey(instanceId)) { return genTemporaryLogFile(instanceId); @@ -203,7 +204,7 @@ public class InstanceLogService { * 将本地的任务实例运行日志同步到 mongoDB 存储,在任务执行结束后异步执行 * @param instanceId 任务实例ID */ - 
@Async("omsBackgroundPool") + @Async(PJThreadPool.BACKGROUND_POOL) public void sync(Long instanceId) { Stopwatch sw = Stopwatch.createStarted(); @@ -345,7 +346,7 @@ public class InstanceLogService { } - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = 120000) public void timingCheck() { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java index a4751c0b..351fbf9f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java @@ -191,7 +191,7 @@ public class InstanceManager { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); // 上报日志数据 - HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 15, TimeUnit.SECONDS); + HashedWheelTimerHolder.INACCURATE_TIMER.schedule(() -> instanceLogService.sync(instanceId), 60, TimeUnit.SECONDS); // workflow 特殊处理 if (wfInstanceId != null) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java index 4304a74b..79a320e1 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java @@ -33,13 +33,14 @@ public class InstanceMetadataService implements InitializingBean { @Value("${oms.instance.metadata.cache.size}") private int instanceMetadataCacheSize; - private static final int CACHE_CONCURRENCY_LEVEL = 4; + private 
static final int CACHE_CONCURRENCY_LEVEL = 16; @Override public void afterPropertiesSet() throws Exception { instanceId2JobInfoCache = CacheBuilder.newBuilder() .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) .maximumSize(instanceMetadataCacheSize) + .softValues() .build(); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java index 631036e2..11b4591c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java @@ -11,7 +11,10 @@ import org.aspectj.lang.annotation.Aspect; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import tech.powerjob.server.common.utils.AOPUtils; +import tech.powerjob.server.monitor.MonitorService; +import tech.powerjob.server.monitor.events.lock.SlowLockEvent; +import javax.annotation.Resource; import java.lang.reflect.Method; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -29,6 +32,9 @@ import java.util.concurrent.locks.ReentrantLock; @Order(1) public class UseCacheLockAspect { + @Resource + private MonitorService monitorService; + private final Map> lockContainer = Maps.newConcurrentMap(); private static final long SLOW_THRESHOLD = 100; @@ -53,8 +59,19 @@ public class UseCacheLockAspect { try { long timeCost = System.currentTimeMillis() - start; if (timeCost > SLOW_THRESHOLD) { + + final SlowLockEvent slowLockEvent = new SlowLockEvent() + .setType(SlowLockEvent.Type.LOCAL) + .setLockType(useCacheLock.type()) + .setLockKey(String.valueOf(key)) + .setCallerService(method.getDeclaringClass().getSimpleName()) + .setCallerMethod(method.getName()) + .setCost(timeCost); + + monitorService.monitor(slowLockEvent); + log.warn("[UseSegmentLockAspect] 
wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost, - useCacheLock.key(), + key, JSON.toJSONString(point.getArgs())); } return point.proceed(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java index 03ba4ffb..d9c9e137 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java @@ -2,6 +2,7 @@ package tech.powerjob.server.core.scheduler; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; @@ -62,7 +63,7 @@ public class CleanService { private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(cron = CLEAN_TIME_EXPRESSION) public void timingClean() { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java index 4ca3e327..b324923e 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java @@ -4,6 +4,7 @@ import 
tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.persistence.remote.model.*; @@ -59,7 +60,7 @@ public class InstanceStatusCheckService { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = 10000) public void timingStatusCheck() { Stopwatch stopwatch = Stopwatch.createStarted(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 7e46c452..862acf8e 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -3,6 +3,7 @@ package tech.powerjob.server.core.scheduler; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.server.common.constants.PJThreadPool; import tech.powerjob.server.remote.transport.starter.AkkaStarter; import tech.powerjob.server.common.constants.SwitchableStatus; import tech.powerjob.server.persistence.remote.model.AppInfoDO; @@ -73,7 +74,7 @@ public class PowerScheduleService { private static final long SCHEDULE_RATE = 15000; - @Async("omsTimingPool") + @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = SCHEDULE_RATE) public void timingSchedule() { diff --git 
a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java index eae67869..2f42e087 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java @@ -1,9 +1,9 @@ package tech.powerjob.server.core.uid; -import tech.powerjob.server.remote.server.ServerInfoService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import tech.powerjob.server.remote.server.self.ServerInfoService; /** * 唯一ID生成服务,使用 Twitter snowflake 算法 @@ -23,7 +23,7 @@ public class IdGenerateService { @Autowired public IdGenerateService(ServerInfoService serverInfoService) { - long id = serverInfoService.getServerId(); + long id = serverInfoService.fetchServiceInfo().getId(); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id); } diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml new file mode 100644 index 00000000..dce0a058 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/pom.xml @@ -0,0 +1,28 @@ + + + + powerjob-server + tech.powerjob + 4.1.0 + ../pom.xml + + 4.0.0 + + powerjob-server-monitor + ${project.parent.version} + + + 8 + 8 + + + + + tech.powerjob + powerjob-server-common + + + + \ No newline at end of file diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Event.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Event.java new file mode 100644 index 00000000..e62a304f --- /dev/null +++ 
b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Event.java @@ -0,0 +1,22 @@ +package tech.powerjob.server.monitor; + +/** + * 监控事件 + * + * @author tjq + * @since 2022/9/6 + */ +public interface Event { + + /** + * 监控事件的类型 + * @return 监控类型 + */ + String type(); + + /** + * 监控事件的内容 + * @return 监控事件的内容 + */ + String message(); +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Monitor.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Monitor.java new file mode 100644 index 00000000..6df7030d --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/Monitor.java @@ -0,0 +1,21 @@ +package tech.powerjob.server.monitor; + +/** + * 监视器 + * + * @author tjq + * @since 2022/9/6 + */ +public interface Monitor { + + /** + * 全局上下文绑定 & 初始化 + */ + void init(); + /** + * 记录监控事件 + * 请注意该方法务必异步不阻塞!!! + * @param event 事件 + */ + void record(Event event); +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/MonitorService.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/MonitorService.java new file mode 100644 index 00000000..0fa1465f --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/MonitorService.java @@ -0,0 +1,11 @@ +package tech.powerjob.server.monitor; + +/** + * 对外暴露的监控服务 + * + * @author tjq + * @since 2022/9/10 + */ +public interface MonitorService { + void monitor(Event event); +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/PowerJobMonitorService.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/PowerJobMonitorService.java new file mode 100644 index 00000000..90caaa5e --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/PowerJobMonitorService.java @@ -0,0 +1,35 @@ +package tech.powerjob.server.monitor; + +import 
com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * PowerJob 服务端监控 + * + * @author tjq + * @since 2022/9/10 + */ +@Slf4j +@Component +public class PowerJobMonitorService implements MonitorService { + + private final List monitors = Lists.newLinkedList(); + + @Autowired + public PowerJobMonitorService(List monitors) { + + monitors.forEach(m -> { + log.info("[MonitorService] register monitor: {}", m.getClass().getName()); + this.monitors.add(m); + }); + } + + @Override + public void monitor(Event event) { + monitors.forEach(m -> m.record(event)); + } +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseEvent.java new file mode 100644 index 00000000..1980cf54 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseEvent.java @@ -0,0 +1,48 @@ +package tech.powerjob.server.monitor.events.db; + +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * 数据库操作事件 + * + * @author tjq + * @since 2022/9/6 + */ +@Setter +@Accessors(chain = true) +public class DatabaseEvent implements Event { + + private DatabaseType type; + + private String serviceName; + + private String methodName; + + private Status status; + + private Integer rows; + + private long cost; + + private String errorMsg; + + private String extra; + + public enum Status { + SUCCESS, + FAILED + } + + @Override + public String type() { + return "MONITOR_LOGGER_DB_OPERATION"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(type, serviceName, methodName, status, rows, cost, errorMsg, extra); + } +} diff --git 
a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseType.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseType.java new file mode 100644 index 00000000..673e16b0 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/db/DatabaseType.java @@ -0,0 +1,22 @@ +package tech.powerjob.server.monitor.events.db; + +/** + * DatabaseEventType + * + * @author tjq + * @since 2022/9/6 + */ +public enum DatabaseType { + /** + * 本地存储库,H2 + */ + LOCAL, + /** + * 远程核心库 + */ + CORE, + /** + * 扩展库 + */ + EXTRA +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/lock/SlowLockEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/lock/SlowLockEvent.java new file mode 100644 index 00000000..9211c90b --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/lock/SlowLockEvent.java @@ -0,0 +1,39 @@ +package tech.powerjob.server.monitor.events.lock; + +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * 长时间等待锁事件 + * + * @author tjq + * @since 2022/9/9 + */ +@Setter +@Accessors(chain = true) +public class SlowLockEvent implements Event { + + private Type type; + private String lockType; + private String lockKey; + private String callerService; + private String callerMethod; + private long cost; + + public enum Type { + LOCAL, + DB + } + + @Override + public String type() { + return "MONITOR_LOGGER_SLOW_LOCK"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(type, lockType, lockKey, callerService, callerMethod, cost); + } +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java 
b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java new file mode 100644 index 00000000..d43e7db0 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/TtReportInstanceStatusEvent.java @@ -0,0 +1,47 @@ +package tech.powerjob.server.monitor.events.w2s; + +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * TaskTrackerReportInstanceStatus + * + * @author tjq + * @since 2022/9/9 + */ +@Setter +@Accessors(chain = true) +public class TtReportInstanceStatusEvent implements Event { + + private Long appId; + private Long jobId; + private Long instanceId; + + private Long wfInstanceId; + + private InstanceStatus instanceStatus; + + private Long delayMs; + + private Status serverProcessStatus; + + private Long serverProcessCost; + + public enum Status { + SUCCESS, + FAILED + } + + @Override + public String type() { + return "MONITOR_LOGGER_TT_REPORT_STATUS"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(appId, jobId, instanceId, wfInstanceId, instanceStatus, delayMs, serverProcessStatus, serverProcessCost); + } +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java new file mode 100644 index 00000000..d2686dd8 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerHeartbeatEvent.java @@ -0,0 +1,44 @@ +package tech.powerjob.server.monitor.events.w2s; + +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * worker 心跳事件监控 + * + * @author tjq + * 
@since 2022/9/9 + */ +@Setter +@Accessors(chain = true) +public class WorkerHeartbeatEvent implements Event { + + private String appName; + /** + * 虽然和 AppName 冗余,但考虑到其他日志使用 appId 监控,此处可方便潜在的其他处理 + */ + private Long appId; + private String version; + + private String protocol; + + private String tag; + private String workerAddress; + /** + * worker 上报时间与 server 之间的延迟 + */ + private long delayMs; + private Integer score; + + @Override + public String type() { + return "MONITOR_LOGGER_WORKER_HEART_BEAT"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(appName, appId, version, protocol, tag, workerAddress, delayMs, score); + } +} diff --git a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java new file mode 100644 index 00000000..fe12778c --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/events/w2s/WorkerLogReportEvent.java @@ -0,0 +1,48 @@ +package tech.powerjob.server.monitor.events.w2s; + +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import tech.powerjob.server.common.SJ; +import tech.powerjob.server.monitor.Event; + +/** + * description + * + * @author tjq + * @since 2022/9/11 + */ +@Setter +@Accessors(chain = true) +public class WorkerLogReportEvent implements Event { + + private String workerAddress; + + /** + * 日志条数 + */ + private long logNum; + + /** + * 日志大小,用于统计 IO 压力 + */ + private long logSize; + + private Status status; + + public enum Status { + SUCCESS, + REJECTED, + EXCEPTION + } + + @Override + public String type() { + return "MONITOR_LOGGER_WORKER_LOG_REPORT"; + } + + @Override + public String message() { + return SJ.MONITOR_JOINER.join(workerAddress, logNum, logSize, status); + } +} diff --git 
a/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/monitors/LogMonitor.java b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/monitors/LogMonitor.java new file mode 100644 index 00000000..050e5ed7 --- /dev/null +++ b/powerjob-server/powerjob-server-monitor/src/tech/powerjob/server/monitor/monitors/LogMonitor.java @@ -0,0 +1,43 @@ +package tech.powerjob.server.monitor.monitors; + +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.stereotype.Component; +import tech.powerjob.server.common.aware.ServerInfoAware; +import tech.powerjob.server.common.module.ServerInfo; +import tech.powerjob.server.monitor.Event; +import tech.powerjob.server.monitor.Monitor; + +/** + * 系统默认实现——基于日志的监控监视器 + * 需要接入方自行基于类 ELK 系统采集 + * + * @author tjq + * @since 2022/9/6 + */ +@Component +public class LogMonitor implements Monitor, ServerInfoAware { + + /** + * server 启动依赖 DB,DB会被 monitor,因此最初的几条 log serverInfo 一定为空,在此处简单防空 + */ + private ServerInfo serverInfo = new ServerInfo(); + + private static final String MDC_KEY_SERVER_ID = "serverId"; + + + @Override + public void init() { + } + + @Override + public void record(Event event) { + MDC.put(MDC_KEY_SERVER_ID, String.valueOf(serverInfo.getId())); + LoggerFactory.getLogger(event.type()).info(event.message()); + } + + @Override + public void setServerInfo(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } +} diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index 444fac74..b26d81d8 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -23,6 +23,10 @@ tech.powerjob powerjob-server-common + + tech.powerjob + powerjob-server-monitor + \ No newline at end of file diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java 
b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java new file mode 100644 index 00000000..6d5adb49 --- /dev/null +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java @@ -0,0 +1,90 @@ +package tech.powerjob.server.persistence.monitor; + +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.springframework.data.domain.Slice; +import org.springframework.stereotype.Component; +import tech.powerjob.server.common.utils.AOPUtils; +import tech.powerjob.server.monitor.MonitorService; +import tech.powerjob.server.monitor.events.db.DatabaseEvent; +import tech.powerjob.server.monitor.events.db.DatabaseType; + +import javax.annotation.Resource; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; + +/** + * 监控切面 + * + * @author tjq + * @since 2022/9/6 + */ +@Slf4j +@Aspect +@Component +public class DatabaseMonitorAspect { + + @Resource + private MonitorService monitorService; + + @Around("execution(* tech.powerjob.server.persistence.remote.repository..*.*(..))") + public Object monitorCoreDB(ProceedingJoinPoint joinPoint) throws Throwable { + return wrapperMonitor(joinPoint, DatabaseType.CORE); + } + + @Around("execution(* tech.powerjob.server.persistence.local..*.*(..))") + public Object monitorLocalDB(ProceedingJoinPoint joinPoint) throws Throwable { + return wrapperMonitor(joinPoint, DatabaseType.LOCAL); + } + + private Object wrapperMonitor(ProceedingJoinPoint point, DatabaseType type) throws Throwable { + String classNameMini = AOPUtils.parseRealClassName(point); + final String methodName = point.getSignature().getName(); + + DatabaseEvent event = new DatabaseEvent().setType(type) + .setServiceName(classNameMini) + .setMethodName(methodName) + 
.setStatus(DatabaseEvent.Status.SUCCESS); + + long startTs = System.currentTimeMillis(); + try { + final Object ret = point.proceed(); + event.setRows(parseEffectRows(ret)); + return ret; + } catch (Throwable t) { + event.setErrorMsg(t.getClass().getSimpleName()).setStatus(DatabaseEvent.Status.FAILED); + throw t; + } finally { + long cost = System.currentTimeMillis() - startTs; + monitorService.monitor(event.setCost(cost)); + } + } + + private static Integer parseEffectRows(Object ret) { + + // 从性能角度考虑,最高频场景放在最前面判断 + + if (ret instanceof Number) { + return ((Number) ret).intValue(); + } + if (ret instanceof Optional) { + return ((Optional) ret).isPresent() ? 1 : 0; + } + if (ret instanceof Collection) { + return ((Collection) ret).size(); + } + if (ret instanceof Slice) { + return ((Slice) ret).getSize(); + } + + if (ret instanceof Stream) { + return null; + } + // TODO: 直接返回对象的方法全部改成 Optional + + return ret == null ? 0 : 1; + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoService.java new file mode 100644 index 00000000..5c799a7a --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoService.java @@ -0,0 +1,19 @@ +package tech.powerjob.server.remote.server.self; + +import tech.powerjob.server.common.module.ServerInfo; + +/** + * ServerInfoService + * + * @author tjq + * @since 2022/9/12 + */ +public interface ServerInfoService { + + /** + * fetch current server info + * @return ServerInfo + */ + ServerInfo fetchServiceInfo(); + +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/ServerInfoService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java similarity index 80% rename from 
powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/ServerInfoService.java rename to powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java index 6b83a007..eb94b58b 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/ServerInfoService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/self/ServerInfoServiceImpl.java @@ -1,10 +1,13 @@ -package tech.powerjob.server.remote.server; +package tech.powerjob.server.remote.server.self; import org.apache.commons.lang3.StringUtils; import org.springframework.boot.info.BuildProperties; +import org.springframework.scheduling.annotation.Async; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.server.common.constants.PJThreadPool; +import tech.powerjob.server.common.module.ServerInfo; import tech.powerjob.server.extension.LockService; import tech.powerjob.server.persistence.remote.model.ServerInfoDO; import tech.powerjob.server.persistence.remote.repository.ServerInfoRepository; @@ -28,37 +31,25 @@ import java.util.stream.Collectors; */ @Slf4j @Service -public class ServerInfoService { +public class ServerInfoServiceImpl implements ServerInfoService { - private final String ip; - private final long serverId; + private final ServerInfo serverInfo; private final ServerInfoRepository serverInfoRepository; - private String version = "UNKNOWN"; - private static final long MAX_SERVER_CLUSTER_SIZE = 10000; private static final String SERVER_INIT_LOCK = "server_init_lock"; private static final int SERVER_INIT_LOCK_MAX_TIME = 15000; - public long getServerId() { - return serverId; - } - - public String getServerIp() { - return ip; - } - - public String getServerVersion() { - return version; - } - @Autowired - public 
ServerInfoService(LockService lockService, ServerInfoRepository serverInfoRepository) { + public ServerInfoServiceImpl(LockService lockService, ServerInfoRepository serverInfoRepository) { - this.ip = NetUtils.getLocalHost(); + this.serverInfo = new ServerInfo(); + + String ip = NetUtils.getLocalHost(); + serverInfo.setIp(ip); this.serverInfoRepository = serverInfoRepository; Stopwatch sw = Stopwatch.createStarted(); @@ -80,10 +71,11 @@ public class ServerInfoService { } if (server.getId() < MAX_SERVER_CLUSTER_SIZE) { - this.serverId = server.getId(); + serverInfo.setId(server.getId()); } else { - this.serverId = retryServerId(); - serverInfoRepository.updateIdByIp(this.serverId, ip); + long retryServerId = retryServerId(); + serverInfo.setId(retryServerId); + serverInfoRepository.updateIdByIp(retryServerId, ip); } } catch (Exception e) { @@ -93,12 +85,12 @@ public class ServerInfoService { lockService.unlock(SERVER_INIT_LOCK); } - log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverId, sw); + log.info("[ServerInfoService] ip:{}, id:{}, cost:{}", ip, serverInfo.getId(), sw); } @Scheduled(fixedRate = 15000, initialDelay = 15000) public void heartbeat() { - serverInfoRepository.updateGmtModifiedByIp(ip, new Date()); + serverInfoRepository.updateGmtModifiedByIp(serverInfo.getIp(), new Date()); } @@ -142,7 +134,12 @@ public class ServerInfoService { } String pomVersion = buildProperties.getVersion(); if (StringUtils.isNotBlank(pomVersion)) { - version = pomVersion; + serverInfo.setVersion(pomVersion); } } + + @Override + public ServerInfo fetchServiceInfo() { + return serverInfo; + } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java index da555c0b..04002cb7 100644 --- 
a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.DispatchStrategy; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.extension.WorkerFilter; @@ -44,8 +45,17 @@ public class WorkerClusterQueryService { workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); - // 按健康度排序 - workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy()); + switch (dispatchStrategy) { + case RANDOM: + Collections.shuffle(workers); + break; + case HEALTH_FIRST: + workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + break; + default: + // do nothing + } // 限定集群大小(0代表不限制) if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) { diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml index c1fb09f1..b6e95a20 100644 --- a/powerjob-server/powerjob-server-starter/pom.xml +++ b/powerjob-server/powerjob-server-starter/pom.xml @@ -31,6 +31,10 @@ tech.powerjob powerjob-server-common + + tech.powerjob + powerjob-server-monitor + tech.powerjob powerjob-server-persistence diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java 
index 77bb4310..fac926c5 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java @@ -9,7 +9,7 @@ import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; import tech.powerjob.server.common.PowerJobServerConfigKey; -import tech.powerjob.server.remote.server.ServerInfoService; +import tech.powerjob.server.remote.server.self.ServerInfoService; import javax.annotation.Resource; @@ -39,7 +39,7 @@ public class SwaggerConfig { .description("Distributed scheduling and computing framework.") .license("Apache Licence 2") .termsOfServiceUrl("https://github.com/PowerJob/PowerJob") - .version(serverInfoService.getServerVersion()) + .version(serverInfoService.fetchServiceInfo().getVersion()) .build(); return new Docket(DocumentationType.SWAGGER_2) diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java index b9f42e86..899c182e 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java @@ -1,5 +1,7 @@ package tech.powerjob.server.config; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; import tech.powerjob.server.common.RejectedExecutionHandlerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; @@ -8,14 +10,12 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableAsync; import 
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import tech.powerjob.server.common.constants.PJThreadPool; import java.util.concurrent.*; /** * 公用线程池配置 - * omsTimingPool:用于执行定时任务的线程池 - * omsBackgroundPool:用于执行后台任务的线程池,这类任务对时间不敏感,慢慢执行细水长流即可 - * taskScheduler:用于定时调度的线程池 * * @author tjq * @since 2020/4/28 @@ -25,28 +25,40 @@ import java.util.concurrent.*; @Configuration public class ThreadPoolConfig { - @Bean("omsTimingPool") - public Executor getTimingPool() { + @Bean(PJThreadPool.TIMING_POOL) + public TaskExecutor getTimingPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4); // use SynchronousQueue executor.setQueueCapacity(0); executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsTimingPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun("PowerJobTiming")); + executor.setThreadNamePrefix("PJ-TIMING-"); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newThreadRun(PJThreadPool.TIMING_POOL)); return executor; } - @Bean("omsBackgroundPool") - public Executor initBackgroundPool() { + @Bean(PJThreadPool.BACKGROUND_POOL) + public AsyncTaskExecutor initBackgroundPool() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 8); executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 16); executor.setQueueCapacity(8192); executor.setKeepAliveSeconds(60); - executor.setThreadNamePrefix("omsBackgroundPool-"); - executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newReject("PowerJobBackgroundPool")); + executor.setThreadNamePrefix("PJ-BG-"); + 
executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newDiscard(PJThreadPool.BACKGROUND_POOL)); + return executor; + } + + @Bean(PJThreadPool.LOCAL_DB_POOL) + public TaskExecutor initOmsLocalDbPool() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + int tSize = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + executor.setCorePoolSize(tSize); + executor.setMaxPoolSize(tSize); + executor.setQueueCapacity(2048); + executor.setThreadNamePrefix("PJ-LOCALDB-"); + executor.setRejectedExecutionHandler(RejectedExecutionHandlerFactory.newAbort(PJThreadPool.LOCAL_DB_POOL)); return executor; } @@ -55,7 +67,7 @@ public class ThreadPoolConfig { public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); - scheduler.setThreadNamePrefix("PowerJobSchedulePool-"); + scheduler.setThreadNamePrefix("PJ-WS-"); scheduler.setDaemon(true); return scheduler; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/support/ServerInfoAwareProcessor.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/support/ServerInfoAwareProcessor.java new file mode 100644 index 00000000..6a4aa54a --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/support/ServerInfoAwareProcessor.java @@ -0,0 +1,29 @@ +package tech.powerjob.server.support; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.server.common.aware.ServerInfoAware; +import tech.powerjob.server.common.module.ServerInfo; +import tech.powerjob.server.remote.server.self.ServerInfoService; + +import java.util.List; + +/** + * ServerInfoAwareProcessor + * + * @author tjq + * @since 2022/9/12 + */ +@Slf4j +@Component +public class ServerInfoAwareProcessor { + + public ServerInfoAwareProcessor(ServerInfoService serverInfoService, 
List awareList) { + final ServerInfo serverInfo = serverInfoService.fetchServiceInfo(); + log.info("[ServerInfoAwareProcessor] current server info: {}", serverInfo); + awareList.forEach(aware -> { + aware.setServerInfo(serverInfo); + log.info("[ServerInfoAwareProcessor] set ServerInfo for: {} successfully", aware); + }); + } +} diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/WebLogAspect.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/WebLogAspect.java index 10aa456f..33b5812e 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/WebLogAspect.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/WebLogAspect.java @@ -13,6 +13,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.multipart.MultipartFile; +import tech.powerjob.server.common.utils.AOPUtils; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -60,8 +61,8 @@ public class WebLogAspect { } HttpServletRequest request = requestAttributes.getRequest(); - String[] classNameSplit = joinPoint.getSignature().getDeclaringTypeName().split("\\."); - String classNameMini = classNameSplit[classNameSplit.length - 1]; + + String classNameMini = AOPUtils.parseRealClassName(joinPoint); String classMethod = classNameMini + "." 
+ joinPoint.getSignature().getName(); // 排除特殊类 diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java index 6d747129..9dda6ea2 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java @@ -4,9 +4,10 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.ServerInfo; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.server.ServerInfoService; +import tech.powerjob.server.remote.server.self.ServerInfoService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.web.response.SystemOverviewVO; @@ -70,8 +71,7 @@ public class SystemInfoController { // 服务器时间 overview.setServerTime(DateFormatUtils.format(new Date(), OmsConstant.TIME_PATTERN)); - SystemOverviewVO.CurrentServerInfo info = new SystemOverviewVO.CurrentServerInfo(serverInfoService.getServerId(), serverInfoService.getServerIp(), serverInfoService.getServerVersion()); - overview.setCurrentServerInfo(info); + overview.setServerInfo(serverInfoService.fetchServiceInfo()); return ResultDTO.success(overview); } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java 
b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java index 3bc86786..c7b94500 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java @@ -3,6 +3,7 @@ package tech.powerjob.server.web.response; import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; +import tech.powerjob.server.common.module.ServerInfo; /** * 系统概览 @@ -21,13 +22,5 @@ public class SystemOverviewVO { // 服务器时间 private String serverTime; - private CurrentServerInfo currentServerInfo; - - @Getter - @AllArgsConstructor - public static class CurrentServerInfo { - private final long id; - private final String ip; - private final String version; - } + private ServerInfo serverInfo; } diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/logback-config/powerjob_monitor.xml b/powerjob-server/powerjob-server-starter/src/main/resources/logback-config/powerjob_monitor.xml new file mode 100644 index 00000000..d357a833 --- /dev/null +++ b/powerjob-server/powerjob-server-starter/src/main/resources/logback-config/powerjob_monitor.xml @@ -0,0 +1,132 @@ + + + + + + + + + + + ${MONITOR_LOG_PATH}/database.log + + ${MONITOR_LOG_PATTERN} + UTF-8 + + + ${MONITOR_LOG_PATH}/database.log.${ROTATE_PATTERN} + 3 + 200MB + 1000MB + + + + 512 + 0 + true + + + + + + + + + + ${MONITOR_LOG_PATH}/tt_status_report.log + + ${MONITOR_LOG_PATTERN} + UTF-8 + + + ${MONITOR_LOG_PATH}/tt_status_report.log.${ROTATE_PATTERN} + 3 + 200MB + 1000MB + + + + 512 + 0 + true + + + + + + + + + ${MONITOR_LOG_PATH}/worker_heartbeat.log + + ${MONITOR_LOG_PATTERN} + UTF-8 + + + ${MONITOR_LOG_PATH}/worker_heartbeat.log.${ROTATE_PATTERN} + 3 + 200MB + 1000MB + + + + 512 + 0 + true + + + + + + + + + + ${MONITOR_LOG_PATH}/worker_log_report.log + + ${MONITOR_LOG_PATTERN} + UTF-8 + + + 
${MONITOR_LOG_PATH}/worker_log_report.log.${ROTATE_PATTERN} + 3 + 200MB + 1000MB + + + + 512 + 0 + true + + + + + + + + + + ${MONITOR_LOG_PATH}/lock.log + + ${MONITOR_LOG_PATTERN} + UTF-8 + + + ${MONITOR_LOG_PATH}/lock.log.${ROTATE_PATTERN} + 3 + 200MB + 1000MB + + + + 512 + 0 + true + + + + + + + \ No newline at end of file diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/logback-product.xml b/powerjob-server/powerjob-server-starter/src/main/resources/logback-product.xml index b1d2b1f4..80be04f6 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/logback-product.xml +++ b/powerjob-server/powerjob-server-starter/src/main/resources/logback-product.xml @@ -14,6 +14,9 @@ --> + + + ${LOG_PATH}/powerjob-server-error.log diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf b/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf index 7e2274cd..95afe889 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf +++ b/powerjob-server/powerjob-server-starter/src/main/resources/oms-server.akka.conf @@ -25,7 +25,8 @@ akka { } } - worker-request-actor-dispatcher { + # worker-request-core-dispatcher + w-r-c-d { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java index 1a67c378..b5721b91 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -17,10 +17,15 @@ import java.util.Map; @Slf4j public class ProcessorTrackerStatusHolder { + private final Long instanceId; + private final Integer maxWorkerCount; // 
ProcessorTracker的address(IP:Port) -> 状态 private final Map address2Status; - public ProcessorTrackerStatusHolder(List allWorkerAddress) { + public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List allWorkerAddress) { + + this.instanceId = instanceId; + this.maxWorkerCount = maxWorkerCount; address2Status = Maps.newConcurrentMap(); allWorkerAddress.forEach(address -> { @@ -38,7 +43,7 @@ public class ProcessorTrackerStatusHolder { public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 return address2Status.computeIfAbsent(address, ignore -> { - log.warn("[ProcessorTrackerStatusHolder] unregistered worker: {}", address); + log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address); ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); processorTrackerStatus.init(address); return processorTrackerStatus; @@ -90,9 +95,9 @@ public class ProcessorTrackerStatusHolder { /** * 注册新的执行节点 * @param address 新的执行节点地址 - * @return true: 注册成功 / false:已存在 + * @return true: register successfully / false: already exists */ - public boolean register(String address) { + private boolean registerOne(String address) { ProcessorTrackerStatus pts = address2Status.get(address); if (pts != null) { return false; @@ -100,9 +105,50 @@ public class ProcessorTrackerStatusHolder { pts = new ProcessorTrackerStatus(); pts.init(address); address2Status.put(address, pts); + log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address); return true; } + public void register(List workerIpList) { + if (endlessWorkerNum()) { + workerIpList.forEach(this::registerOne); + return; + } + List availableProcessorTrackers = getAvailableProcessorTrackers(); + int currentWorkerSize = availableProcessorTrackers.size(); + int needMoreNum = maxWorkerCount - currentWorkerSize; + if (needMoreNum <= 0) { + return; + } + + log.info("[PTStatusHolder-{}] currentWorkerSize: {}, 
needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum); + + for (String newIp : workerIpList) { + boolean success = registerOne(newIp); + if (success) { + needMoreNum --; + } + if (needMoreNum <= 0) { + return; + } + } + } + + /** + * 检查是否需要动态加载新的执行器 + * @return check need more workers + */ + public boolean checkNeedMoreWorker() { + if (endlessWorkerNum()) { + return true; + } + return getAvailableProcessorTrackers().size() < maxWorkerCount; + } + + private boolean endlessWorkerNum() { + return maxWorkerCount == null || maxWorkerCount == 0; + } + public void remove(List addressList) { addressList.forEach(address2Status::remove); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 2a188a54..1cf442b5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -158,6 +158,7 @@ public class CommonTaskTracker extends TaskTracker { log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder); TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); + req.setAppId(workerRuntime.getAppId()); req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setWfInstanceId(instanceInfo.getWfInstanceId()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index a4e79f83..071477d5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -7,14 +7,13 @@ import com.google.common.collect.Lists; import 
com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; -import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.AlarmConfig; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; @@ -339,6 +338,7 @@ public class FrequentTaskTracker extends TaskTracker { } TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq(); + req.setAppId(workerRuntime.getAppId()); req.setJobId(instanceInfo.getJobId()); req.setInstanceId(instanceId); req.setReportTime(System.currentTimeMillis()); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index fdf35fa9..492b8d7b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -124,7 +124,7 @@ public abstract class TaskTracker { // 保护性操作 instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); - this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); + this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); this.finished = new AtomicBoolean(false); // 只有工作流中的任务允许向工作流中追加上下文数据 @@ -562,6 +562,13 @@ public abstract class TaskTracker { protected class WorkerDetector 
implements Runnable { @Override public void run() { + + boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker(); + log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker); + if (!needMoreWorker) { + return; + } + String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); if (StringUtils.isEmpty(serverPath)) { log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); @@ -575,11 +582,7 @@ public abstract class TaskTracker { } try { List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); - workerList.forEach(address -> { - if (ptStatusHolder.register(address)) { - log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address); - } - }); + ptStatusHolder.register(workerList); } catch (Exception e) { log.warn("[TaskTracker-{}] detective failed!", instanceId, e); }