diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java index 5d73a46a..f8abc5be 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/HashedWheelTimerHolder.java @@ -11,7 +11,9 @@ import tech.powerjob.server.common.timewheel.Timer; */ public class HashedWheelTimerHolder { - // 非精确时间轮,每 5S 走一格 + /** + * 非精确时间轮,每 5S 走一格 + */ public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0); private HashedWheelTimerHolder() { diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java index 35d06d4c..d38d53d8 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/timewheel/holder/InstanceTimeWheelService.java @@ -19,14 +19,22 @@ public class InstanceTimeWheelService { private static final Map CARGO = Maps.newConcurrentMap(); - // 精确调度时间轮,每 1MS 走一格 + /** + * 精确调度时间轮,每 1MS 走一格 + */ private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4); - // 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格 + /** + * 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格 + */ private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0); - // 支持取消的时间间隔,低于该阈值则不会放进 CARGO + /** + * 支持取消的时间间隔,低于该阈值则不会放进 CARGO + */ private static final long MIN_INTERVAL_MS = 1000; - // 长延迟阈值 + /** + * 长延迟阈值 + */ private static final long LONG_DELAY_THRESHOLD_MS = 60000; /** diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java index 3d06dd64..66d98f14 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/AOPUtils.java @@ -25,8 +25,8 @@ import java.lang.reflect.Method; @Slf4j public class AOPUtils { - private static final ExpressionParser parser = new SpelExpressionParser(); - private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer(); + private static final ExpressionParser PARSER = new SpelExpressionParser(); + private static final ParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer(); public static String parseRealClassName(JoinPoint joinPoint) { return joinPoint.getSignature().getDeclaringType().getSimpleName(); @@ -50,7 +50,7 @@ public class AOPUtils { } public static T parseSpEl(Method method, Object[] arguments, String spEl, Class clazz, T defaultResult) { - String[] params = discoverer.getParameterNames(method); + String[] params = DISCOVERER.getParameterNames(method); assert params != null; EvaluationContext context = new StandardEvaluationContext(); @@ -58,7 +58,7 @@ public class AOPUtils { context.setVariable(params[len], arguments[len]); } try { - Expression expression = parser.parseExpression(spEl); + Expression expression = PARSER.parseExpression(spEl); return expression.getValue(context, clazz); } catch (Exception e) { log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e); diff --git a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java index 09df140f..ea6e388b 100644 --- a/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java +++ b/powerjob-server/powerjob-server-common/src/main/java/tech/powerjob/server/common/utils/TimeUtils.java @@ -19,9 +19,13 @@ import java.util.List; @Slf4j public class TimeUtils { - // NTP 授时服务器(阿里云 -> 交大 -> 水果) + /** + * NTP 授时服务器(阿里云 -> 交大 -> 水果) + */ private static final List NTP_SERVER_LIST = Lists.newArrayList("ntp.aliyun.com", "ntp.sjtu.edu.cn", "time1.apple.com"); - // 最大误差 5S + /** + * 最大误差 5S + */ private static final long MAX_OFFSET = 5000; public static void check() throws TimeCheckException { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java index f1b68eec..b0e8047c 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java @@ -1,5 +1,6 @@ package tech.powerjob.server.core.handler; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.BeanUtils; @@ -22,7 +23,6 @@ import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepositor import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import javax.annotation.Resource; import java.util.List; import java.util.Optional; import java.util.concurrent.RejectedExecutionException; @@ -34,17 +34,18 @@ import java.util.stream.Collectors; * @author tjq * @since 2022/9/11 */ +@RequiredArgsConstructor @Slf4j public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { - @Resource - protected MonitorService monitorService; - @Resource - protected Environment environment; - @Resource - protected ContainerInfoRepository containerInfoRepository; - @Resource - private WorkerClusterQueryService workerClusterQueryService; + + protected final MonitorService monitorService; + + protected final Environment environment; + + protected final ContainerInfoRepository containerInfoRepository; + + private final WorkerClusterQueryService workerClusterQueryService; protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java index ac259e03..f9c267cc 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerHolder.java @@ -1,9 +1,7 @@ package tech.powerjob.server.core.handler; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.Resource; /** * WorkerRequestHandlerHolder @@ -16,13 +14,14 @@ public class WorkerRequestHandlerHolder { private static IWorkerRequestHandler workerRequestHandler; + public WorkerRequestHandlerHolder(IWorkerRequestHandler injectedWorkerRequestHandler) { + workerRequestHandler = injectedWorkerRequestHandler; + } public static IWorkerRequestHandler fetchWorkerRequestHandler() { + if (workerRequestHandler == null){ + throw new IllegalStateException("WorkerRequestHandlerHolder not initialized!"); + } return workerRequestHandler; } - - @Autowired - public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) { - WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler; - } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java index 77e573be..ed4e9206 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java @@ -1,7 +1,7 @@ package tech.powerjob.server.core.handler; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import tech.powerjob.common.enums.InstanceStatus; @@ -12,12 +12,14 @@ import tech.powerjob.common.response.AskResponse; import tech.powerjob.server.core.instance.InstanceLogService; import tech.powerjob.server.core.instance.InstanceManager; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.monitor.MonitorService; import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; import tech.powerjob.server.remote.worker.WorkerClusterManagerService; +import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import javax.annotation.Resource; import java.util.Optional; /** @@ -30,12 +32,19 @@ import java.util.Optional; @Component public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler { - @Resource - private InstanceManager instanceManager; - @Resource - private WorkflowInstanceManager workflowInstanceManager; - @Resource - private InstanceLogService instanceLogService; + private final InstanceManager instanceManager; + + private final WorkflowInstanceManager workflowInstanceManager; + + private final InstanceLogService instanceLogService; + + public WorkerRequestHandlerImpl(InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, InstanceLogService instanceLogService, + MonitorService monitorService, Environment environment, ContainerInfoRepository containerInfoRepository, WorkerClusterQueryService workerClusterQueryService) { + super(monitorService, environment, containerInfoRepository, workerClusterQueryService); + this.instanceManager = instanceManager; + this.workflowInstanceManager = workflowInstanceManager; + this.instanceLogService = instanceLogService; + } @Override protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java index 9f75587a..5adfe8d7 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceLogService.java @@ -56,6 +56,7 @@ public class InstanceLogService { @Resource private InstanceMetadataService instanceMetadataService; + @Resource private GridFsManager gridFsManager; /** @@ -63,6 +64,7 @@ public class InstanceLogService { */ @Resource(name = "localTransactionTemplate") private TransactionTemplate localTransactionTemplate; + @Resource private LocalInstanceLogRepository localInstanceLogRepository; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java index 79a320e1..3ca67dc2 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceMetadataService.java @@ -1,16 +1,15 @@ package tech.powerjob.server.core.instance; -import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; -import tech.powerjob.server.persistence.remote.model.JobInfoDO; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; +import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import javax.annotation.Resource; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -23,17 +22,23 @@ import java.util.concurrent.ExecutionException; @Service public class InstanceMetadataService implements InitializingBean { - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; + private final JobInfoRepository jobInfoRepository; - // 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变) + private final InstanceInfoRepository instanceInfoRepository; + /** + * 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变) + */ private Cache instanceId2JobInfoCache; - @Value("${oms.instance.metadata.cache.size}") - private int instanceMetadataCacheSize; - private static final int CACHE_CONCURRENCY_LEVEL = 16; + private final int instanceMetadataCacheSize; + + private static final int CACHE_CONCURRENCY_LEVEL = 32; + + public InstanceMetadataService(JobInfoRepository jobInfoRepository, InstanceInfoRepository instanceInfoRepository, @Value("${oms.instance.metadata.cache.size}") Integer instanceMetadataCacheSize) { + this.jobInfoRepository = jobInfoRepository; + this.instanceInfoRepository = instanceInfoRepository; + this.instanceMetadataCacheSize = instanceMetadataCacheSize; + } @Override public void afterPropertiesSet() throws Exception { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index bd506b5d..b5e2f991 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -1,13 +1,14 @@ package tech.powerjob.server.core.instance; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.PowerQuery; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.Protocol; +import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerStopInstanceReq; @@ -28,7 +29,6 @@ import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.transport.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; @@ -45,23 +45,22 @@ import static tech.powerjob.common.enums.InstanceStatus.STOPPED; */ @Slf4j @Service +@RequiredArgsConstructor public class InstanceService { - @Resource - private TransportService transportService; - @Resource - private DispatchService dispatchService; - @Resource - private IdGenerateService idGenerateService; - @Resource - private InstanceManager instanceManager; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; + private final TransportService transportService; - @Resource - private WorkerClusterQueryService workerClusterQueryService; + private final DispatchService dispatchService; + + private final IdGenerateService idGenerateService; + + private final InstanceManager instanceManager; + + private final JobInfoRepository jobInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; + + private final WorkerClusterQueryService workerClusterQueryService; /** * 创建任务实例(注意,该方法并不调用 saveAndFlush,如果有需要立即同步到DB的需求,请在方法结束后手动调用 flush) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java index 11b4591c..1bf467cb 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/lock/UseCacheLockAspect.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -30,10 +31,10 @@ import java.util.concurrent.locks.ReentrantLock; @Aspect @Component @Order(1) +@RequiredArgsConstructor public class UseCacheLockAspect { - @Resource - private MonitorService monitorService; + private final MonitorService monitorService; private final Map> lockContainer = Maps.newConcurrentMap(); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java index d9c9e137..0191e800 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CleanService.java @@ -1,15 +1,5 @@ package tech.powerjob.server.core.scheduler; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.enums.WorkflowInstanceStatus; -import tech.powerjob.server.common.constants.PJThreadPool; -import tech.powerjob.server.common.utils.OmsFileUtils; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; -import tech.powerjob.server.persistence.mongodb.GridFsManager; -import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; -import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import tech.powerjob.server.extension.LockService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import lombok.extern.slf4j.Slf4j; @@ -18,8 +8,17 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; +import tech.powerjob.server.common.utils.OmsFileUtils; +import tech.powerjob.server.extension.LockService; +import tech.powerjob.server.persistence.mongodb.GridFsManager; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; +import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import javax.annotation.Resource; import java.io.File; import java.util.Date; @@ -33,25 +32,21 @@ import java.util.Date; @Service public class CleanService { - @Resource - private GridFsManager gridFsManager; - @Resource - private InstanceInfoRepository instanceInfoRepository; - @Resource - private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Resource - private WorkflowNodeInfoRepository workflowNodeInfoRepository; - @Resource - private LockService lockService; + private final GridFsManager gridFsManager; - @Value("${oms.instanceinfo.retention}") - private int instanceInfoRetentionDay; + private final InstanceInfoRepository instanceInfoRepository; - @Value("${oms.container.retention.local}") - private int localContainerRetentionDay; - @Value("${oms.container.retention.remote}") - private int remoteContainerRetentionDay; + private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + private final WorkflowNodeInfoRepository workflowNodeInfoRepository; + + private final LockService lockService; + + private final int instanceInfoRetentionDay; + + private final int localContainerRetentionDay; + + private final int remoteContainerRetentionDay; private static final int TEMPORARY_RETENTION_DAY = 3; @@ -62,6 +57,21 @@ public class CleanService { private static final String HISTORY_DELETE_LOCK = "history_delete_lock"; + public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository, + WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService, + @Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay, + @Value("${oms.container.retention.local}") int localContainerRetentionDay, + @Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) { + this.gridFsManager = gridFsManager; + this.instanceInfoRepository = instanceInfoRepository; + this.workflowInstanceInfoRepository = workflowInstanceInfoRepository; + this.workflowNodeInfoRepository = workflowNodeInfoRepository; + this.lockService = lockService; + this.instanceInfoRetentionDay = instanceInfoRetentionDay; + this.localContainerRetentionDay = localContainerRetentionDay; + this.remoteContainerRetentionDay = remoteContainerRetentionDay; + } + @Async(PJThreadPool.TIMING_POOL) @Scheduled(cron = CLEAN_TIME_EXPRESSION) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java index b324923e..a9af475b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java @@ -1,26 +1,27 @@ package tech.powerjob.server.core.scheduler; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.SystemInstanceResult; -import tech.powerjob.common.enums.TimeExpressionType; -import tech.powerjob.common.enums.WorkflowInstanceStatus; -import tech.powerjob.server.common.constants.PJThreadPool; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.persistence.remote.model.*; -import tech.powerjob.server.persistence.remote.repository.*; -import tech.powerjob.server.core.DispatchService; -import tech.powerjob.server.core.instance.InstanceManager; -import tech.powerjob.server.core.workflow.WorkflowInstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.SystemInstanceResult; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.enums.WorkflowInstanceStatus; +import tech.powerjob.server.common.constants.PJThreadPool; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.core.DispatchService; +import tech.powerjob.server.core.instance.InstanceManager; +import tech.powerjob.server.core.workflow.WorkflowInstanceManager; +import tech.powerjob.server.persistence.remote.model.*; +import tech.powerjob.server.persistence.remote.repository.*; +import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; @@ -34,6 +35,7 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class InstanceStatusCheckService { private static final int MAX_BATCH_NUM = 10; @@ -42,23 +44,21 @@ public class InstanceStatusCheckService { private static final long RUNNING_TIMEOUT_MS = 60000; private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000; - @Resource - private DispatchService dispatchService; - @Resource - private InstanceManager instanceManager; - @Resource - private WorkflowInstanceManager workflowInstanceManager; + private final DispatchService dispatchService; - @Resource - private AppInfoRepository appInfoRepository; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + private final InstanceManager instanceManager; + + private final WorkflowInstanceManager workflowInstanceManager; + + private final AppInfoRepository appInfoRepository; + + private final JobInfoRepository jobInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; + + private final WorkflowInfoRepository workflowInfoRepository; + + private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Async(PJThreadPool.TIMING_POOL) @Scheduled(fixedDelay = 10000) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 862acf8e..f48555e9 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -1,5 +1,6 @@ package tech.powerjob.server.core.scheduler; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; @@ -30,7 +31,6 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; -import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; @@ -44,6 +44,7 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class PowerScheduleService { /** @@ -51,26 +52,23 @@ public class PowerScheduleService { */ private static final int MAX_APP_NUM = 10; - @Resource - private DispatchService dispatchService; - @Resource - private InstanceService instanceService; - @Resource - private WorkflowInstanceManager workflowInstanceManager; + private final DispatchService dispatchService; - @Resource - private AppInfoRepository appInfoRepository; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; + private final InstanceService instanceService; - @Resource - private JobService jobService; - @Resource - private TimingStrategyService timingStrategyService; + private final WorkflowInstanceManager workflowInstanceManager; + + private final AppInfoRepository appInfoRepository; + + private final JobInfoRepository jobInfoRepository; + + private final WorkflowInfoRepository workflowInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; + + private final JobService jobService; + + private final TimingStrategyService timingStrategyService; private static final long SCHEDULE_RATE = 15000; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java index 66863750..e768bd77 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/AppInfoService.java @@ -1,5 +1,6 @@ package tech.powerjob.server.core.service; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; @@ -15,10 +16,10 @@ import java.util.Objects; * @since 2020/6/20 */ @Service +@RequiredArgsConstructor public class AppInfoService { - @Resource - private AppInfoRepository appInfoRepository; + private final AppInfoRepository appInfoRepository; /** * 验证应用访问权限 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/CacheService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/CacheService.java index 619e5356..491ea328 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/CacheService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/CacheService.java @@ -1,17 +1,16 @@ package tech.powerjob.server.core.service; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; import java.time.Duration; import java.util.Optional; @@ -25,19 +24,23 @@ import java.util.Optional; @Service public class CacheService { - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; + private final JobInfoRepository jobInfoRepository; + + private final WorkflowInfoRepository workflowInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; private final Cache jobId2JobNameCache; private final Cache workflowId2WorkflowNameCache; private final Cache instanceId2AppId; private final Cache jobId2AppId; - public CacheService() { + public CacheService(JobInfoRepository jobInfoRepository, WorkflowInfoRepository workflowInfoRepository, InstanceInfoRepository instanceInfoRepository) { + + this.jobInfoRepository = jobInfoRepository; + this.workflowInfoRepository = workflowInfoRepository; + this.instanceInfoRepository = instanceInfoRepository; + jobId2JobNameCache = CacheBuilder.newBuilder() .expireAfterWrite(Duration.ofMinutes(1)) .maximumSize(512) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index b3fe7703..87307b72 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -2,6 +2,7 @@ package tech.powerjob.server.core.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.jpa.domain.Specification; @@ -43,20 +44,18 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class JobService { - @Resource - private InstanceService instanceService; + private final InstanceService instanceService; - @Resource - private DispatchService dispatchService; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; - @Resource - private TimingStrategyService timingStrategyService; + private final DispatchService dispatchService; + private final JobInfoRepository jobInfoRepository; + + private final InstanceInfoRepository instanceInfoRepository; + + private final TimingStrategyService timingStrategyService; /** * 保存/修改任务 @@ -205,9 +204,8 @@ public class JobService { * 启用某个任务 * * @param jobId 任务ID - * @throws ParseException 异常(CRON表达式错误) */ - public void enableJob(Long jobId) throws ParseException { + public void enableJob(Long jobId) { JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId)); jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV()); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java index 2f42e087..d583f69b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/IdGenerateService.java @@ -18,11 +18,10 @@ import tech.powerjob.server.remote.server.self.ServerInfoService; public class IdGenerateService { private final SnowFlakeIdGenerator snowFlakeIdGenerator; + private static final int DATA_CENTER_ID = 0; - @Autowired public IdGenerateService(ServerInfoService serverInfoService) { - long id = serverInfoService.fetchServiceInfo().getId(); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java index c700cd60..a04dd67b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/uid/SnowFlakeIdGenerator.java @@ -7,37 +7,50 @@ package tech.powerjob.server.core.uid; * @since 2020/4/6 */ public class SnowFlakeIdGenerator { - /** * 起始的时间戳(a special day for me) */ private final static long START_STAMP = 1555776000000L; - /** - * 每一部分占用的位数 + * 序列号占用的位数 */ - private final static long SEQUENCE_BIT = 6; //序列号占用的位数 - private final static long MACHINE_BIT = 14; //机器标识占用的位数 - private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数 - + private final static long SEQUENCE_BIT = 6; + /** + * 机器标识占用的位数 + */ + private final static long MACHINE_BIT = 14; + /** + * 数据中心占用的位数 + */ + private final static long DATA_CENTER_BIT = 2; /** * 每一部分的最大值 */ private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT); private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT); private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT); - /** * 每一部分向左的位移 */ private final static long MACHINE_LEFT = SEQUENCE_BIT; private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT; - - private final long dataCenterId; //数据中心 - private final long machineId; //机器标识 - private long sequence = 0L; //序列号 - private long lastTimestamp = -1L;//上一次时间戳 + /** + * 数据中心 + */ + private final long dataCenterId; + /** + * 机器标识 + */ + private final long machineId; + /** + * 序列号 + */ + private long sequence = 0L; + /** + * 上一次时间戳 + */ + private long lastTimestamp = -1L; public SnowFlakeIdGenerator(long dataCenterId, long machineId) { if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java index e95a34e5..7765aec8 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/JobNodeValidator.java @@ -1,5 +1,6 @@ package tech.powerjob.server.core.validator; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.WorkflowNodeType; @@ -18,10 +19,10 @@ import javax.annotation.Resource; */ @Component @Slf4j +@RequiredArgsConstructor public class JobNodeValidator implements NodeValidator { - @Resource - private JobInfoRepository jobInfoRepository; + private final JobInfoRepository jobInfoRepository; @Override public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java index 08147e0c..0c22fa1b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/validator/NestedWorkflowNodeValidator.java @@ -1,6 +1,7 @@ package tech.powerjob.server.core.validator; import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.WorkflowNodeType; @@ -13,7 +14,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; -import javax.annotation.Resource; import java.util.Objects; import java.util.Optional; @@ -23,12 +23,12 @@ import java.util.Optional; */ @Component @Slf4j +@RequiredArgsConstructor public class NestedWorkflowNodeValidator implements NodeValidator { - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowNodeInfoRepository workflowNodeInfoRepository; + private final WorkflowInfoRepository workflowInfoRepository; + + private final WorkflowNodeInfoRepository workflowNodeInfoRepository; @Override public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) { diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java index 0cd9373c..520e0780 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceManager.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; @@ -32,7 +33,6 @@ import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository; -import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; @@ -47,25 +47,25 @@ import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNo */ @Slf4j @Service +@RequiredArgsConstructor @SuppressWarnings("squid:S1192") public class WorkflowInstanceManager { - @Resource - private AlarmCenter alarmCenter; - @Resource - private IdGenerateService idGenerateService; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private UserService userService; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Resource - private WorkflowNodeInfoRepository workflowNodeInfoRepository; - @Resource - private WorkflowNodeHandleService workflowNodeHandleService; + private final AlarmCenter alarmCenter; + + private final IdGenerateService idGenerateService; + + private final JobInfoRepository jobInfoRepository; + + private final UserService userService; + + private final WorkflowInfoRepository workflowInfoRepository; + + private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; + + private final WorkflowNodeInfoRepository workflowNodeInfoRepository; + + private final WorkflowNodeHandleService workflowNodeHandleService; /** * 创建工作流任务实例 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java index fae06d15..047a706d 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/WorkflowInstanceService.java @@ -1,14 +1,20 @@ package tech.powerjob.server.core.workflow; import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.SystemInstanceResult; -import tech.powerjob.common.enums.WorkflowInstanceStatus; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.response.WorkflowInstanceInfoDTO; import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.utils.SpringUtils; +import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.lock.UseCacheLock; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; @@ -16,12 +22,7 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.core.instance.InstanceService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; -import org.springframework.stereotype.Service; -import javax.annotation.Resource; import java.util.Date; import java.util.Objects; import java.util.Optional; @@ -35,18 +36,16 @@ import java.util.Optional; */ @Slf4j @Service +@RequiredArgsConstructor public class WorkflowInstanceService { - @Resource - private InstanceService instanceService; - @Resource - private WorkflowInstanceInfoRepository wfInstanceInfoRepository; - @Resource - private WorkflowInstanceManager workflowInstanceManager; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowInstanceService self; + private final InstanceService instanceService; + + private final WorkflowInstanceInfoRepository wfInstanceInfoRepository; + + private final WorkflowInstanceManager workflowInstanceManager; + + private final WorkflowInfoRepository workflowInfoRepository; /** * 停止工作流实例(入口) @@ -61,10 +60,10 @@ public class WorkflowInstanceService { } // 如果这是一个被嵌套的工作流,则终止父工作流 if (wfInstance.getParentWfInstanceId() != null) { - self.stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId); + SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId); return; } - self.stopWorkflowInstance(wfInstanceId, appId); + SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstanceId, appId); } /** diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java index 531123e9..4481fabe 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java @@ -1,11 +1,13 @@ package tech.powerjob.server.core.workflow.hanlder.impl; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.enums.WorkflowNodeType; import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.core.DispatchService; import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; @@ -13,7 +15,6 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import javax.annotation.Resource; /** * @author Echo009 @@ -21,21 +22,15 @@ import javax.annotation.Resource; */ @Slf4j @Component +@RequiredArgsConstructor public class JobNodeHandler implements TaskNodeHandler { - @Resource - private InstanceService instanceService; - - @Resource - private JobInfoRepository jobInfoRepository; - - @Resource - private DispatchService dispatchService; + private final JobInfoRepository jobInfoRepository; @Override public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { // instanceParam 传递的是工作流实例的 wfContext - Long instanceId = instanceService.create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()); + Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()); node.setInstanceId(instanceId); node.setStatus(InstanceStatus.RUNNING.getV()); log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId); @@ -46,7 +41,7 @@ public class JobNodeHandler implements TaskNodeHandler { JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); // 洗去时间表达式类型 jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV()); - dispatchService.dispatch(jobInfo, node.getInstanceId()); + SpringUtils.getBean(DispatchService.class).dispatch(jobInfo, node.getInstanceId()); } @Override diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java index e7d0de99..b08413a6 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/NestedWorkflowNodeHandler.java @@ -1,6 +1,7 @@ package tech.powerjob.server.core.workflow.hanlder.impl; import com.alibaba.fastjson.JSON; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.powerjob.common.SystemInstanceResult; @@ -11,6 +12,7 @@ import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.PEWorkflowDAG; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils; import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler; @@ -19,7 +21,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository; -import javax.annotation.Resource; import java.util.Date; /** @@ -28,16 +29,12 @@ import java.util.Date; */ @Component @Slf4j +@RequiredArgsConstructor public class NestedWorkflowNodeHandler implements TaskNodeHandler { - @Resource - private WorkflowInfoRepository workflowInfoRepository; + private final WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - - @Resource - private WorkflowInstanceManager workflowInstanceManager; + private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @Override public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { @@ -78,7 +75,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler { } else { // 透传当前的上下文创建新的工作流实例 String wfContext = wfInstanceInfo.getWfContext(); - Long instanceId = workflowInstanceManager.create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId()); + Long instanceId = SpringUtils.getBean(WorkflowInstanceManager.class).create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId()); node.setInstanceId(instanceId); } node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis())); @@ -89,7 +86,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler { public void startTaskInstance(PEWorkflowDAG.Node node) { Long wfId = node.getJobId(); WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null); - workflowInstanceManager.start(targetWf, node.getInstanceId()); + SpringUtils.getBean(WorkflowInstanceManager.class).start(targetWf, node.getInstanceId()); } @Override diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java index d329e96b..15a3d91e 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/DatabaseLockService.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Service; public class DatabaseLockService implements LockService { private final String ownerIp; + private final OmsLockRepository omsLockRepository; @Autowired diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/AlarmCenter.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/AlarmCenter.java index 48914cf5..118e5d41 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/AlarmCenter.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/AlarmCenter.java @@ -24,9 +24,9 @@ import java.util.concurrent.*; public class AlarmCenter { private final ExecutorService POOL; + private final List BEANS = Lists.newLinkedList(); - @Autowired public AlarmCenter(List alarmables) { int cores = Runtime.getRuntime().availableProcessors(); ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build(); diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java index ab705c0a..b42d9c05 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkAlarmService.java @@ -1,5 +1,6 @@ package tech.powerjob.server.extension.defaultimpl.alarm.impl; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.OmsConstant; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.utils.NetUtils; @@ -30,17 +31,19 @@ import java.util.Set; */ @Slf4j @Service +@RequiredArgsConstructor public class DingTalkAlarmService implements Alarmable { - @Resource - private Environment environment; + private final Environment environment; private Long agentId; private DingTalkUtils dingTalkUtils; private Cache mobile2UserIdCache; private static final int CACHE_SIZE = 8192; - // 防止缓存击穿 + /** + * 防止缓存击穿 + */ private static final String EMPTY_TAG = "EMPTY"; @Override diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkUtils.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkUtils.java index 39425c10..e2cd77f0 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkUtils.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/impl/DingTalkUtils.java @@ -125,7 +125,7 @@ public class DingTalkUtils implements Closeable { @AllArgsConstructor public static final class MarkdownEntity { - private String title; - private String detail; + private final String title; + private final String detail; } } diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/JobInstanceAlarm.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/JobInstanceAlarm.java index 13e091ec..4924966e 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/JobInstanceAlarm.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/JobInstanceAlarm.java @@ -10,38 +10,70 @@ import lombok.Data; */ @Data public class JobInstanceAlarm implements Alarm { - // 应用ID + /** + * 应用ID + */ private long appId; - // 任务ID + /** + * 任务ID + */ private long jobId; - // 任务实例ID + /** + * 任务实例ID + */ private long instanceId; - // 任务名称 + /** + * 任务名称 + */ private String jobName; - // 任务自带的参数 + /** + * 任务自带的参数 + */ private String jobParams; - // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + /** + * 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + */ private Integer timeExpressionType; - // 时间表达式,CRON/NULL/LONG/LONG + /** + * 时间表达式,CRON/NULL/LONG/LONG + */ private String timeExpression; - // 执行类型,单机/广播/MR + /** + * 执行类型,单机/广播/MR + */ private Integer executeType; - // 执行器类型,Java/Shell + /** + * 执行器类型,Java/Shell + */ private Integer processorType; - // 执行器信息 + /** + * 执行器信息 + */ private String processorInfo; - // 任务实例参数 + /** + * 任务实例参数 + */ private String instanceParams; - // 执行结果 + /** + * 执行结果 + */ private String result; - // 预计触发时间 + /** + * 预计触发时间 + */ private Long expectedTriggerTime; - // 实际触发时间 + /** + * 实际触发时间 + */ private Long actualTriggerTime; - // 结束时间 + /** + * 结束时间 + */ private Long finishedTime; - // TaskTracker地址 + /** + * + */ private String taskTrackerAddress; @Override diff --git a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/WorkflowInstanceAlarm.java b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/WorkflowInstanceAlarm.java index 3435de1c..9639dc54 100644 --- a/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/WorkflowInstanceAlarm.java +++ b/powerjob-server/powerjob-server-extension/src/main/java/tech/powerjob/server/extension/defaultimpl/alarm/module/WorkflowInstanceAlarm.java @@ -14,25 +14,39 @@ public class WorkflowInstanceAlarm implements Alarm { private String workflowName; - // 任务所属应用的ID,冗余提高查询效率 + /** + * 任务所属应用的ID,冗余提高查询效率 + */ private Long appId; private Long workflowId; - // workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求) + /** + * workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求) + */ private Long wfInstanceId; - // workflow 状态(WorkflowInstanceStatus) + /** + * workflow 状态(WorkflowInstanceStatus) + */ private Integer status; private PEWorkflowDAG peWorkflowDAG; private String result; - // 实际触发时间 + /** + * 实际触发时间 + */ private Long actualTriggerTime; - // 结束时间 + /** + * 结束时间 + */ private Long finishedTime; - // 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + /** + * 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY) + */ private Integer timeExpressionType; - // 时间表达式,CRON/NULL/LONG/LONG + /** + * 时间表达式,CRON/NULL/LONG/LONG + */ private String timeExpression; @Override diff --git a/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java b/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java index 3a07857f..b9781f9d 100644 --- a/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java +++ b/powerjob-server/powerjob-server-migrate/src/main/java/tech/powerjob/server/migrate/V3ToV4MigrateService.java @@ -2,9 +2,11 @@ package tech.powerjob.server.migrate; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.model.PEWorkflowDAG; +import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.extension.LockService; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; @@ -20,7 +22,6 @@ import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; -import javax.annotation.Resource; import javax.persistence.criteria.Predicate; import javax.transaction.Transactional; import java.util.*; @@ -35,23 +36,18 @@ import java.util.concurrent.TimeUnit; */ @Service @Slf4j +@RequiredArgsConstructor public class V3ToV4MigrateService { private static final String MIGRATE_LOCK_TEMPLATE = "v3to4MigrateLock-%s-%s"; - @Resource - private LockService lockService; - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private WorkflowInfoRepository workflowInfoRepository; - @Resource - private WorkflowNodeInfoRepository workflowNodeInfoRepository; - /** - * 避免内部方法调用导致事务不生效 - */ - @Resource - private V3ToV4MigrateService self; + private final LockService lockService; + + private final JobInfoRepository jobInfoRepository; + + private final WorkflowInfoRepository workflowInfoRepository; + + private final WorkflowNodeInfoRepository workflowNodeInfoRepository; /* ********************** 3.x => 4.x ********************** */ @@ -149,7 +145,7 @@ public class V3ToV4MigrateService { for (WorkflowInfoDO workflowInfo : workflowInfoList) { try { - boolean fixed = self.fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap); + boolean fixed = SpringUtils.getBean(this.getClass()).fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap); if (fixed) { fixedWorkflowIds.add(workflowInfo.getId()); } diff --git a/powerjob-server/powerjob-server-monitor/src/main/java/tech/powerjob/server/monitor/PowerJobMonitorService.java b/powerjob-server/powerjob-server-monitor/src/main/java/tech/powerjob/server/monitor/PowerJobMonitorService.java index 90caaa5e..9969a94d 100644 --- a/powerjob-server/powerjob-server-monitor/src/main/java/tech/powerjob/server/monitor/PowerJobMonitorService.java +++ b/powerjob-server/powerjob-server-monitor/src/main/java/tech/powerjob/server/monitor/PowerJobMonitorService.java @@ -2,7 +2,6 @@ package tech.powerjob.server.monitor; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.List; @@ -19,9 +18,7 @@ public class PowerJobMonitorService implements MonitorService { private final List monitors = Lists.newLinkedList(); - @Autowired public PowerJobMonitorService(List monitors) { - monitors.forEach(m -> { log.info("[MonitorService] register monitor: {}", m.getClass().getName()); this.monitors.add(m); diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/LocalJpaConfig.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/LocalJpaConfig.java index c33f0b9e..215ef406 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/LocalJpaConfig.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/LocalJpaConfig.java @@ -7,6 +7,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties; import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; @@ -38,9 +39,6 @@ import java.util.Objects; ) public class LocalJpaConfig { - @Resource(name = "omsLocalDatasource") - private DataSource omsLocalDatasource; - public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local"; private static Map genDatasourceProperties() { @@ -56,8 +54,7 @@ public class LocalJpaConfig { } @Bean(name = "localEntityManagerFactory") - public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(EntityManagerFactoryBuilder builder) { - + public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource,EntityManagerFactoryBuilder builder) { return builder .dataSource(omsLocalDatasource) .properties(genDatasourceProperties()) @@ -66,10 +63,9 @@ public class LocalJpaConfig { .build(); } - @Bean(name = "localTransactionManager") - public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) { - return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject())); + public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) { + return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject())); } @Bean(name = "localTransactionTemplate") diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/RemoteJpaConfig.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/RemoteJpaConfig.java index 3bc52603..fb784152 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/RemoteJpaConfig.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/config/RemoteJpaConfig.java @@ -1,5 +1,6 @@ package tech.powerjob.server.persistence.config; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties; import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings; import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties; @@ -13,7 +14,6 @@ import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; -import javax.annotation.Resource; import javax.sql.DataSource; import java.util.Map; import java.util.Objects; @@ -36,12 +36,6 @@ import java.util.Objects; ) public class RemoteJpaConfig { - @Resource(name = "omsRemoteDatasource") - private DataSource omsRemoteDatasource; - - @Resource(name = "multiDatasourceProperties") - private MultiDatasourceProperties properties; - public static final String CORE_PACKAGES = "tech.powerjob.server.persistence.remote"; /** @@ -69,7 +63,7 @@ public class RemoteJpaConfig { @Primary @Bean(name = "remoteEntityManagerFactory") - public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(EntityManagerFactoryBuilder builder) { + public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(@Qualifier("omsRemoteDatasource") DataSource omsRemoteDatasource,@Qualifier("multiDatasourceProperties") MultiDatasourceProperties properties, EntityManagerFactoryBuilder builder) { Map datasourceProperties = genDatasourceProperties(); datasourceProperties.putAll(properties.getRemote().getHibernate().getProperties()); return builder @@ -83,7 +77,7 @@ public class RemoteJpaConfig { @Primary @Bean(name = "remoteTransactionManager") - public PlatformTransactionManager initRemoteTransactionManager(EntityManagerFactoryBuilder builder) { - return new JpaTransactionManager(Objects.requireNonNull(initRemoteEntityManagerFactory(builder).getObject())); + public PlatformTransactionManager initRemoteTransactionManager(@Qualifier("remoteEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) { + return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject())); } } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/local/LocalInstanceLogRepository.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/local/LocalInstanceLogRepository.java index cd8057f7..ab72a745 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/local/LocalInstanceLogRepository.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/local/LocalInstanceLogRepository.java @@ -16,16 +16,20 @@ import java.util.stream.Stream; */ public interface LocalInstanceLogRepository extends JpaRepository { - // 流式查询 + /** + * 流式查询 + */ Stream findByInstanceIdOrderByLogTime(Long instanceId); - // 删除数据 + /** + * 删除数据 + */ @Modifying - @Transactional + @Transactional(rollbackOn = Exception.class) long deleteByInstanceId(Long instanceId); @Modifying - @Transactional + @Transactional(rollbackOn = Exception.class) @CanIgnoreReturnValue long deleteByInstanceIdInAndLogTimeLessThan(List instanceIds, Long t); diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java index 9a607ad5..d275a0fd 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/mongodb/GridFsManager.java @@ -1,6 +1,5 @@ package tech.powerjob.server.persistence.mongodb; -import tech.powerjob.server.common.PowerJobServerConfigKey; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import com.mongodb.client.MongoDatabase; @@ -19,8 +18,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.stereotype.Service; +import tech.powerjob.server.common.PowerJobServerConfigKey; -import javax.annotation.Resource; import java.io.*; import java.util.Date; import java.util.Map; @@ -36,21 +35,24 @@ import java.util.function.Consumer; @Service public class GridFsManager implements InitializingBean { - @Resource - private Environment environment; + private final Environment environment; + + private final MongoDatabase db; - private MongoDatabase db; private boolean available; private final Map bucketCache = Maps.newConcurrentMap(); public static final String LOG_BUCKET = "log"; + public static final String CONTAINER_BUCKET = "container"; - @Autowired(required = false) - public void setMongoTemplate(MongoTemplate mongoTemplate) { + public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) { + this.environment = environment; if (mongoTemplate != null) { this.db = mongoTemplate.getDb(); + } else { + this.db = null; } } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java index 6d5adb49..8be9a4aa 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/monitor/DatabaseMonitorAspect.java @@ -1,5 +1,6 @@ package tech.powerjob.server.persistence.monitor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -11,7 +12,6 @@ import tech.powerjob.server.monitor.MonitorService; import tech.powerjob.server.monitor.events.db.DatabaseEvent; import tech.powerjob.server.monitor.events.db.DatabaseType; -import javax.annotation.Resource; import java.util.Collection; import java.util.Optional; import java.util.stream.Stream; @@ -25,10 +25,10 @@ import java.util.stream.Stream; @Slf4j @Aspect @Component +@RequiredArgsConstructor public class DatabaseMonitorAspect { - @Resource - private MonitorService monitorService; + private final MonitorService monitorService; @Around("execution(* tech.powerjob.server.persistence.remote.repository..*.*(..))") public Object monitorCoreDB(ProceedingJoinPoint joinPoint) throws Throwable { diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java index 64dedf28..54baa3a4 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/repository/OmsLockRepository.java @@ -16,13 +16,13 @@ import javax.transaction.Transactional; public interface OmsLockRepository extends JpaRepository { @Modifying - @Transactional + @Transactional(rollbackOn = Exception.class) @Query(value = "delete from OmsLockDO where lockName = ?1") int deleteByLockName(String lockName); OmsLockDO findByLockName(String lockName); @Modifying - @Transactional + @Transactional(rollbackOn = Exception.class) int deleteByOwnerIP(String ip); } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java index 1370949e..1db65201 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java @@ -37,20 +37,25 @@ import java.util.concurrent.TimeUnit; @Service public class ServerElectionService { - @Resource - private LockService lockService; - @Resource - private TransportService transportService; - @Resource - private AppInfoRepository appInfoRepository; + private final LockService lockService; - @Value("${oms.accurate.select.server.percentage}") - private int accurateSelectServerPercentage; + private final TransportService transportService; + + private final AppInfoRepository appInfoRepository; + + private final int accurateSelectServerPercentage; private static final int RETRY_TIMES = 10; private static final long PING_TIMEOUT_MS = 1000; private static final String SERVER_ELECT_LOCK = "server_elect_%d"; + public ServerElectionService(LockService lockService, TransportService transportService, AppInfoRepository appInfoRepository,@Value("${oms.accurate.select.server.percentage}") int accurateSelectServerPercentage) { + this.lockService = lockService; + this.transportService = transportService; + this.appInfoRepository = appInfoRepository; + this.accurateSelectServerPercentage = accurateSelectServerPercentage; + } + public String elect(Long appId, String protocol, String currentServer) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java index 2d9278ce..9b5280b5 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java @@ -4,6 +4,7 @@ import akka.pattern.Patterns; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.response.AskResponse; @@ -39,10 +40,10 @@ import java.util.concurrent.CompletionStage; @Aspect @Component @Order(0) +@RequiredArgsConstructor public class DesignateServerAspect { - @Resource - private AppInfoRepository appInfoRepository; + private final AppInfoRepository appInfoRepository; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -70,7 +71,7 @@ public class DesignateServerAspect { } if (appId == null) { - throw new PowerJobException("can't find appId in params for:" + signature.toString()); + throw new PowerJobException("can't find appId in params for:" + signature); } // 获取执行机器 diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/ClusterStatusHolder.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/ClusterStatusHolder.java index e104bd3c..98b9293d 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/ClusterStatusHolder.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/ClusterStatusHolder.java @@ -21,11 +21,17 @@ import java.util.Map; @Slf4j public class ClusterStatusHolder { - // 集群所属的应用名称 + /** + * 集群所属的应用名称 + */ private final String appName; - // 集群中所有机器的信息 + /** + * 集群中所有机器的信息 + */ private final Map address2WorkerInfo; - // 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo) + /** + * 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo) + */ private Map> containerId2Infos; diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterManagerService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterManagerService.java index 37b94b8a..0fb8c224 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterManagerService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterManagerService.java @@ -18,8 +18,10 @@ import java.util.Set; @Slf4j public class WorkerClusterManagerService { - // 存储Worker健康信息,appId -> ClusterStatusHolder - private static final Map appId2ClusterStatus = Maps.newConcurrentMap(); + /** + * 存储Worker健康信息,appId -> ClusterStatusHolder + */ + private static final Map APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap(); /** * 更新状态 @@ -28,7 +30,7 @@ public class WorkerClusterManagerService { public static void updateStatus(WorkerHeartbeat heartbeat) { Long appId = heartbeat.getAppId(); String appName = heartbeat.getAppName(); - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); + ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName)); clusterStatusHolder.updateStatus(heartbeat); } @@ -38,7 +40,7 @@ public class WorkerClusterManagerService { */ public static void clean(List usingAppIds) { Set keys = Sets.newHashSet(usingAppIds); - appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); + APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); } @@ -46,11 +48,11 @@ public class WorkerClusterManagerService { * 清理缓存信息,防止 OOM */ public static void cleanUp() { - appId2ClusterStatus.values().forEach(ClusterStatusHolder::release); + APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release); } protected static Map getAppId2ClusterStatus() { - return appId2ClusterStatus; + return APP_ID_2_CLUSTER_STATUS; } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java index 04002cb7..2cbef095 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java @@ -26,9 +26,8 @@ import java.util.Optional; @Service public class WorkerClusterQueryService { - private List workerFilters; + private final List workerFilters; - @Autowired public WorkerClusterQueryService(List workerFilters) { this.workerFilters = workerFilters; } @@ -92,7 +91,6 @@ public class WorkerClusterQueryService { */ public Optional getWorkerInfoByAddress(Long appId, String address) { // this may cause NPE while address value is null . - //return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address)); final Map workerInfosByAppId = getWorkerInfosByAppId(appId); //add null check for both workerInfos Map and address if (null != workerInfosByAppId && null != address) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java index fac926c5..f9c7a2fd 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/SwaggerConfig.java @@ -1,5 +1,6 @@ package tech.powerjob.server.config; +import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -25,10 +26,10 @@ import static springfox.documentation.builders.PathSelectors.any; @Configuration @EnableSwagger2 @ConditionalOnProperty(name = PowerJobServerConfigKey.SWAGGER_UI_ENABLE, havingValue = "true") +@RequiredArgsConstructor public class SwaggerConfig { - @Resource - private ServerInfoService serverInfoService; + private final ServerInfoService serverInfoService; @Bean public Docket createRestApi() { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java index 899c182e..4c7777ce 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/config/ThreadPoolConfig.java @@ -62,7 +62,9 @@ public class ThreadPoolConfig { return executor; } - // 引入 WebSocket 支持后需要手动初始化调度线程池 + /** + * 引入 WebSocket 支持后需要手动初始化调度线程池 + */ @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java index 1e677c32..f50e9a90 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java @@ -1,5 +1,6 @@ package tech.powerjob.server.web.controller; +import lombok.RequiredArgsConstructor; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.server.persistence.remote.model.AppInfoDO; @@ -31,12 +32,12 @@ import java.util.stream.Collectors; */ @RestController @RequestMapping("/appInfo") +@RequiredArgsConstructor public class AppInfoController { - @Resource - private AppInfoService appInfoService; - @Resource - private AppInfoRepository appInfoRepository; + private final AppInfoService appInfoService; + + private final AppInfoRepository appInfoRepository; private static final int MAX_APP_NUM = 200; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java index f637e5d8..8f13e0ef 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java @@ -41,15 +41,21 @@ import java.util.stream.Collectors; @RequestMapping("/container") public class ContainerController { - @Value("${server.port}") - private int port; - @Resource - private ContainerService containerService; - @Resource - private AppInfoRepository appInfoRepository; - @Resource - private ContainerInfoRepository containerInfoRepository; + private final int port; + + private final ContainerService containerService; + + private final AppInfoRepository appInfoRepository; + + private final ContainerInfoRepository containerInfoRepository; + + public ContainerController(@Value("${server.port}") int port, ContainerService containerService, AppInfoRepository appInfoRepository, ContainerInfoRepository containerInfoRepository) { + this.port = port; + this.containerService = containerService; + this.appInfoRepository = appInfoRepository; + this.containerInfoRepository = containerInfoRepository; + } @GetMapping("/downloadJar") public void downloadJar(String version, HttpServletResponse response) throws IOException { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java index cd181695..0db6a9e3 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/OpenAPIController.java @@ -1,8 +1,10 @@ package tech.powerjob.server.web.controller; -import tech.powerjob.common.enums.InstanceStatus; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.*; import tech.powerjob.common.OpenAPIConstant; import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; @@ -11,19 +13,16 @@ import tech.powerjob.common.response.InstanceInfoDTO; import tech.powerjob.common.response.JobInfoDTO; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.response.WorkflowInstanceInfoDTO; -import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; -import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; +import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.service.AppInfoService; import tech.powerjob.server.core.service.CacheService; import tech.powerjob.server.core.service.JobService; -import tech.powerjob.server.core.instance.InstanceService; import tech.powerjob.server.core.workflow.WorkflowInstanceService; import tech.powerjob.server.core.workflow.WorkflowService; +import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; +import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO; import tech.powerjob.server.web.response.WorkflowInfoVO; -import org.springframework.web.bind.annotation.*; -import javax.annotation.Resource; -import java.text.ParseException; import java.util.List; /** @@ -34,21 +33,20 @@ import java.util.List; */ @RestController @RequestMapping(OpenAPIConstant.WEB_PATH) +@RequiredArgsConstructor public class OpenAPIController { - @Resource - private AppInfoService appInfoService; - @Resource - private JobService jobService; - @Resource - private InstanceService instanceService; - @Resource - private WorkflowService workflowService; - @Resource - private WorkflowInstanceService workflowInstanceService; + private final AppInfoService appInfoService; - @Resource - private CacheService cacheService; + private final JobService jobService; + + private final InstanceService instanceService; + + private final WorkflowService workflowService; + + private final WorkflowInstanceService workflowInstanceService; + + private final CacheService cacheService; @PostMapping(OpenAPIConstant.ASSERT) @@ -59,7 +57,7 @@ public class OpenAPIController { /* ************* Job 区 ************* */ @PostMapping(OpenAPIConstant.SAVE_JOB) - public ResultDTO saveJob(@RequestBody SaveJobInfoRequest request) throws ParseException { + public ResultDTO saveJob(@RequestBody SaveJobInfoRequest request) { if (request.getId() != null) { checkJobIdValid(request.getId(), request.getAppId()); } @@ -102,7 +100,7 @@ public class OpenAPIController { } @PostMapping(OpenAPIConstant.ENABLE_JOB) - public ResultDTO enableJob(Long jobId, Long appId) throws ParseException { + public ResultDTO enableJob(Long jobId, Long appId) { checkJobIdValid(jobId, appId); jobService.enableJob(jobId); return ResultDTO.success(null); @@ -156,7 +154,7 @@ public class OpenAPIController { /* ************* Workflow 区 ************* */ @PostMapping(OpenAPIConstant.SAVE_WORKFLOW) - public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) throws ParseException { + public ResultDTO saveWorkflow(@RequestBody SaveWorkflowRequest request) { return ResultDTO.success(workflowService.saveWorkflow(request)); } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index 7f60fb05..ada346e3 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -2,6 +2,11 @@ package tech.powerjob.server.web.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.NetUtils; @@ -10,12 +15,7 @@ import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; import tech.powerjob.server.remote.server.election.ServerElectionService; import tech.powerjob.server.remote.transport.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; -import javax.annotation.Resource; import java.util.Optional; import java.util.TimeZone; @@ -28,16 +28,16 @@ import java.util.TimeZone; */ @RestController @RequestMapping("/server") +@RequiredArgsConstructor public class ServerController { - @Resource - private TransportService transportService; - @Resource - private ServerElectionService serverElectionService; - @Resource - private AppInfoRepository appInfoRepository; - @Resource - private WorkerClusterQueryService workerClusterQueryService; + private final TransportService transportService; + + private final ServerElectionService serverElectionService; + + private final AppInfoRepository appInfoRepository; + + private final WorkerClusterQueryService workerClusterQueryService; @GetMapping("/assert") public ResultDTO assertAppName(String appName) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java index 9dda6ea2..5864fecc 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/SystemInfoController.java @@ -1,25 +1,24 @@ package tech.powerjob.server.web.controller; -import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.response.ResultDTO; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.module.ServerInfo; -import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; -import tech.powerjob.server.remote.server.self.ServerInfoService; -import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import tech.powerjob.server.common.module.WorkerInfo; -import tech.powerjob.server.web.response.SystemOverviewVO; -import tech.powerjob.server.web.response.WorkerStatusVO; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; +import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; +import tech.powerjob.server.remote.server.self.ServerInfoService; +import tech.powerjob.server.remote.worker.WorkerClusterQueryService; +import tech.powerjob.server.web.response.SystemOverviewVO; +import tech.powerjob.server.web.response.WorkerStatusVO; -import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.TimeZone; @@ -34,17 +33,16 @@ import java.util.stream.Collectors; @Slf4j @RestController @RequestMapping("/system") +@RequiredArgsConstructor public class SystemInfoController { - @Resource - private JobInfoRepository jobInfoRepository; - @Resource - private InstanceInfoRepository instanceInfoRepository; + private final JobInfoRepository jobInfoRepository; - @Resource - private ServerInfoService serverInfoService; - @Resource - private WorkerClusterQueryService workerClusterQueryService; + private final InstanceInfoRepository instanceInfoRepository; + + private final ServerInfoService serverInfoService; + + private final WorkerClusterQueryService workerClusterQueryService; @GetMapping("/listWorker") public ResultDTO> listWorker(Long appId) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java index 4b0e2c2a..5b07bb24 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ValidateController.java @@ -1,6 +1,7 @@ package tech.powerjob.server.web.controller; import com.google.common.collect.Lists; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -10,7 +11,6 @@ import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.server.core.scheduler.TimingStrategyService; -import javax.annotation.Resource; import java.util.List; /** @@ -22,10 +22,10 @@ import java.util.List; */ @RestController @RequestMapping("/validate") +@RequiredArgsConstructor public class ValidateController { - @Resource - private TimingStrategyService timingStrategyService; + private final TimingStrategyService timingStrategyService; @GetMapping("/timeExpression") public ResultDTO> checkTimeExpression(TimeExpressionType timeExpressionType, diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/GenerateContainerTemplateRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/GenerateContainerTemplateRequest.java index af8e5085..0ce75280 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/GenerateContainerTemplateRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/GenerateContainerTemplateRequest.java @@ -11,15 +11,25 @@ import lombok.Data; @Data public class GenerateContainerTemplateRequest { - // Maven Group + /** + * Maven Group + */ private String group; - // Maven artifact + /** + * Maven artifact + */ private String artifact; - // Maven name + /** + * Maven name + */ private String name; - // 包名(com.xx.xx.xx) + /** + * 包名(com.xx.xx.xx) + */ private String packageName; - // Java版本号,8或者11 + /** + * Java版本号,8或者11 + */ private Integer javaVersion; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyUserInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyUserInfoRequest.java index 486c5617..db43e346 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyUserInfoRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyUserInfoRequest.java @@ -17,8 +17,12 @@ public class ModifyUserInfoRequest { private String password; private String webHook; - // 手机号 + /** + * 手机号 + */ private String phone; - // 邮箱地址 + /** + * 邮箱地址 + */ private String email; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceRequest.java index cf80f1db..f8e3e9c5 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryInstanceRequest.java @@ -1,7 +1,7 @@ package tech.powerjob.server.web.request; -import tech.powerjob.server.common.constants.InstanceType; import lombok.Data; +import tech.powerjob.server.common.constants.InstanceType; /** * 任务实例查询对象 @@ -12,14 +12,21 @@ import lombok.Data; @Data public class QueryInstanceRequest { - // 任务所属应用ID + /** + * 任务所属应用ID + */ private Long appId; - // 当前页码 + /** + * 当前页码 + */ private Integer index; - // 页大小 + /** + * 页大小 + */ private Integer pageSize; - - // 查询条件(NORMAL/WORKFLOW) + /** + * 查询条件(NORMAL/WORKFLOW) + */ private InstanceType type; private Long instanceId; private Long jobId; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryJobInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryJobInfoRequest.java index 10fd568c..9f615265 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryJobInfoRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryJobInfoRequest.java @@ -11,14 +11,21 @@ import lombok.Data; @Data public class QueryJobInfoRequest { - // 任务所属应用ID + /** + * 任务所属应用ID + */ private Long appId; - // 当前页码 + /** + * 当前页码 + */ private Integer index; - // 页大小 + /** + * 页大小 + */ private Integer pageSize; - - // 查询条件 + /** + * 任务ID + */ private Long jobId; private String keyword; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInfoRequest.java index 4c6b96b1..6e89862a 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInfoRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInfoRequest.java @@ -11,14 +11,21 @@ import lombok.Data; @Data public class QueryWorkflowInfoRequest { - // 任务所属应用ID + /** + * 任务所属应用ID + */ private Long appId; - // 当前页码 + /** + * 当前页码 + */ private Integer index; - // 页大小 + /** + * 页大小 + */ private Integer pageSize; - - // 查询条件 + /** + * 查询条件 + */ private Long workflowId; private String keyword; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInstanceRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInstanceRequest.java index 9eb5c1a5..39343506 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInstanceRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/QueryWorkflowInstanceRequest.java @@ -11,15 +11,23 @@ import lombok.Data; @Data public class QueryWorkflowInstanceRequest { - // 任务所属应用ID + /** + * 任务所属应用ID + */ private Long appId; - // 当前页码 + /** + * 当前页码 + */ private Integer index; - // 页大小 + /** + * 页大小 + */ private Integer pageSize; - - // 查询条件(NORMAL/WORKFLOW) + /** + * 查询条件(NORMAL/WORKFLOW) + */ private Long wfInstanceId; + private Long workflowId; private String status; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/SaveContainerInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/SaveContainerInfoRequest.java index 69c0664b..aa734c14 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/SaveContainerInfoRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/SaveContainerInfoRequest.java @@ -14,21 +14,33 @@ import lombok.Data; @Data public class SaveContainerInfoRequest { - // 容器ID,null -> 创建;否则代表修改 + /** + * 容器ID,null -> 创建;否则代表修改 + */ private Long id; - // 所属的应用ID + /** + * 所属的应用ID + */ private Long appId; - // 容器名称 + /** + * 容器名称 + */ private String containerName; - // 容器类型,枚举值为 ContainerSourceType(JarFile/Git) + /** + * 容器类型,枚举值为 ContainerSourceType(JarFile/Git) + */ private ContainerSourceType sourceType; - // 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password + /** + * 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password + */ private String sourceInfo; - // 状态,枚举值为 ContainerStatus(ENABLE/DISABLE) + /** + * 状态,枚举值为 ContainerStatus(ENABLE/DISABLE) + */ private SwitchableStatus status; public void valid() { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/ContainerInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/ContainerInfoVO.java index dfc7d4ec..c1b2ce63 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/ContainerInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/ContainerInfoVO.java @@ -17,18 +17,25 @@ public class ContainerInfoVO { private String containerName; - // 容器类型,枚举值为 ContainerSourceType + /** + * 容器类型,枚举值为 ContainerSourceType + */ private String sourceType; - // 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password + /** + * 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password + */ private String sourceInfo; - - // 版本 (Jar包使用md5,Git使用commitId,前者32位,后者40位,不会产生碰撞) + /** + * 版本 (Jar包使用md5,Git使用commitId,前者32位,后者40位,不会产生碰撞) + */ private String version; - - // 状态,枚举值为 ContainerStatus + /** + * 状态,枚举值为 ContainerStatus + */ private String status; - - // 上一次部署时间 + /** + * 上一次部署时间 + */ private String lastDeployTime; private Date gmtCreate; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceInfoVO.java index 67fa351f..399e627b 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/InstanceInfoVO.java @@ -15,29 +15,44 @@ import org.springframework.beans.BeanUtils; @Data public class InstanceInfoVO { - // 任务ID(JS精度丢失) + /** + * 任务ID(JS精度丢失) + */ private String jobId; - // 任务名称 + /** + * 任务名称 + */ private String jobName; - // 任务实例ID(JS精度丢失) + /** + * 任务实例ID(JS精度丢失) + */ private String instanceId; - // 该任务实例所属的 workflow ID,仅 workflow 任务存在 + /** + * 该任务实例所属的 workflow ID,仅 workflow 任务存在 + */ private String wfInstanceId; - - // 执行结果 + /** + * 执行结果 + */ private String result; - - // TaskTracker地址 + /** + * TaskTracker地址 + */ private String taskTrackerAddress; - - // 总共执行的次数(用于重试判断) + /** + * 总共执行的次数(用于重试判断) + */ private Long runningTimes; private int status; /* ********** 不一致区域 ********** */ - // 实际触发时间(需要格式化为人看得懂的时间) + /** + * 实际触发时间(需要格式化为人看得懂的时间) + */ private String actualTriggerTime; - // 结束时间(同理,需要格式化) + /** + * 结束时间(同理,需要格式化) + */ private String finishedTime; public static InstanceInfoVO from(InstanceInfoDO instanceInfoDo, String jobName) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java index c7b94500..0f4f9526 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/SystemOverviewVO.java @@ -17,9 +17,13 @@ public class SystemOverviewVO { private long jobCount; private long runningInstanceCount; private long failedInstanceCount; - // 服务器时区 + /** + * 服务器时区 + */ private String timezone; - // 服务器时间 + /** + * 服务器时间 + */ private String serverTime; private ServerInfo serverInfo; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/WorkerStatusVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/WorkerStatusVO.java index 32bd6ee0..fc817d95 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/WorkerStatusVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/WorkerStatusVO.java @@ -27,12 +27,18 @@ public class WorkerStatusVO { private String tag; private String lastActiveTime; - // 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色,9999 -> 非在线机器 + /** + * 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色,9999 -> 非在线机器 + */ private int status; - // 12.3%(4 cores) + /** + * 12.3%(4 cores) + */ private static final String CPU_FORMAT = "%s / %s cores"; - // 27.7%(2.9/8.0 GB) + /** + * 27.7%(2.9/8.0 GB) + */ private static final String OTHER_FORMAT = "%s%%(%s / %s GB)"; private static final DecimalFormat df = new DecimalFormat("#.#");