mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
refactor: optimize the code of the server module
1. use constructor based dependency injection to replace field injection and solve the problem of circular dependencies 2. replace deprecated API calls
This commit is contained in:
parent
5189634b60
commit
39eb79de54
@ -11,7 +11,9 @@ import tech.powerjob.server.common.timewheel.Timer;
|
||||
*/
|
||||
public class HashedWheelTimerHolder {
|
||||
|
||||
// 非精确时间轮,每 5S 走一格
|
||||
/**
|
||||
* 非精确时间轮,每 5S 走一格
|
||||
*/
|
||||
public static final Timer INACCURATE_TIMER = new HashedWheelTimer(5000, 16, 0);
|
||||
|
||||
private HashedWheelTimerHolder() {
|
||||
|
@ -19,14 +19,22 @@ public class InstanceTimeWheelService {
|
||||
|
||||
private static final Map<Long, TimerFuture> CARGO = Maps.newConcurrentMap();
|
||||
|
||||
// 精确调度时间轮,每 1MS 走一格
|
||||
/**
|
||||
* 精确调度时间轮,每 1MS 走一格
|
||||
*/
|
||||
private static final Timer TIMER = new HashedWheelTimer(1, 4096, Runtime.getRuntime().availableProcessors() * 4);
|
||||
// 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格
|
||||
/**
|
||||
* 非精确调度时间轮,用于处理高延迟任务,每 10S 走一格
|
||||
*/
|
||||
private static final Timer SLOW_TIMER = new HashedWheelTimer(10000, 12, 0);
|
||||
|
||||
// 支持取消的时间间隔,低于该阈值则不会放进 CARGO
|
||||
/**
|
||||
* 支持取消的时间间隔,低于该阈值则不会放进 CARGO
|
||||
*/
|
||||
private static final long MIN_INTERVAL_MS = 1000;
|
||||
// 长延迟阈值
|
||||
/**
|
||||
* 长延迟阈值
|
||||
*/
|
||||
private static final long LONG_DELAY_THRESHOLD_MS = 60000;
|
||||
|
||||
/**
|
||||
|
@ -25,8 +25,8 @@ import java.lang.reflect.Method;
|
||||
@Slf4j
|
||||
public class AOPUtils {
|
||||
|
||||
private static final ExpressionParser parser = new SpelExpressionParser();
|
||||
private static final ParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();
|
||||
private static final ExpressionParser PARSER = new SpelExpressionParser();
|
||||
private static final ParameterNameDiscoverer DISCOVERER = new LocalVariableTableParameterNameDiscoverer();
|
||||
|
||||
public static String parseRealClassName(JoinPoint joinPoint) {
|
||||
return joinPoint.getSignature().getDeclaringType().getSimpleName();
|
||||
@ -50,7 +50,7 @@ public class AOPUtils {
|
||||
}
|
||||
|
||||
public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {
|
||||
String[] params = discoverer.getParameterNames(method);
|
||||
String[] params = DISCOVERER.getParameterNames(method);
|
||||
assert params != null;
|
||||
|
||||
EvaluationContext context = new StandardEvaluationContext();
|
||||
@ -58,7 +58,7 @@ public class AOPUtils {
|
||||
context.setVariable(params[len], arguments[len]);
|
||||
}
|
||||
try {
|
||||
Expression expression = parser.parseExpression(spEl);
|
||||
Expression expression = PARSER.parseExpression(spEl);
|
||||
return expression.getValue(context, clazz);
|
||||
} catch (Exception e) {
|
||||
log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);
|
||||
|
@ -19,9 +19,13 @@ import java.util.List;
|
||||
@Slf4j
|
||||
public class TimeUtils {
|
||||
|
||||
// NTP 授时服务器(阿里云 -> 交大 -> 水果)
|
||||
/**
|
||||
* NTP 授时服务器(阿里云 -> 交大 -> 水果)
|
||||
*/
|
||||
private static final List<String> NTP_SERVER_LIST = Lists.newArrayList("ntp.aliyun.com", "ntp.sjtu.edu.cn", "time1.apple.com");
|
||||
// 最大误差 5S
|
||||
/**
|
||||
* 最大误差 5S
|
||||
*/
|
||||
private static final long MAX_OFFSET = 5000;
|
||||
|
||||
public static void check() throws TimeCheckException {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@ -22,7 +23,6 @@ import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepositor
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
@ -34,17 +34,18 @@ import java.util.stream.Collectors;
|
||||
* @author tjq
|
||||
* @since 2022/9/11
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
|
||||
|
||||
@Resource
|
||||
protected MonitorService monitorService;
|
||||
@Resource
|
||||
protected Environment environment;
|
||||
@Resource
|
||||
protected ContainerInfoRepository containerInfoRepository;
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
protected final MonitorService monitorService;
|
||||
|
||||
protected final Environment environment;
|
||||
|
||||
protected final ContainerInfoRepository containerInfoRepository;
|
||||
|
||||
private final WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
protected abstract void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event);
|
||||
|
||||
|
@ -1,9 +1,7 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* WorkerRequestHandlerHolder
|
||||
@ -16,13 +14,14 @@ public class WorkerRequestHandlerHolder {
|
||||
|
||||
private static IWorkerRequestHandler workerRequestHandler;
|
||||
|
||||
public WorkerRequestHandlerHolder(IWorkerRequestHandler injectedWorkerRequestHandler) {
|
||||
workerRequestHandler = injectedWorkerRequestHandler;
|
||||
}
|
||||
|
||||
public static IWorkerRequestHandler fetchWorkerRequestHandler() {
|
||||
if (workerRequestHandler == null){
|
||||
throw new IllegalStateException("WorkerRequestHandlerHolder not initialized!");
|
||||
}
|
||||
return workerRequestHandler;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
public void setWorkerRequestHandler(IWorkerRequestHandler workerRequestHandler) {
|
||||
WorkerRequestHandlerHolder.workerRequestHandler = workerRequestHandler;
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package tech.powerjob.server.core.handler;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
@ -12,12 +12,14 @@ import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.server.core.instance.InstanceLogService;
|
||||
import tech.powerjob.server.core.instance.InstanceManager;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import tech.powerjob.server.monitor.MonitorService;
|
||||
import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
|
||||
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
@ -30,12 +32,19 @@ import java.util.Optional;
|
||||
@Component
|
||||
public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler {
|
||||
|
||||
@Resource
|
||||
private InstanceManager instanceManager;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
private InstanceLogService instanceLogService;
|
||||
private final InstanceManager instanceManager;
|
||||
|
||||
private final WorkflowInstanceManager workflowInstanceManager;
|
||||
|
||||
private final InstanceLogService instanceLogService;
|
||||
|
||||
public WorkerRequestHandlerImpl(InstanceManager instanceManager, WorkflowInstanceManager workflowInstanceManager, InstanceLogService instanceLogService,
|
||||
MonitorService monitorService, Environment environment, ContainerInfoRepository containerInfoRepository, WorkerClusterQueryService workerClusterQueryService) {
|
||||
super(monitorService, environment, containerInfoRepository, workerClusterQueryService);
|
||||
this.instanceManager = instanceManager;
|
||||
this.workflowInstanceManager = workflowInstanceManager;
|
||||
this.instanceLogService = instanceLogService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
|
||||
|
@ -56,6 +56,7 @@ public class InstanceLogService {
|
||||
|
||||
@Resource
|
||||
private InstanceMetadataService instanceMetadataService;
|
||||
|
||||
@Resource
|
||||
private GridFsManager gridFsManager;
|
||||
/**
|
||||
@ -63,6 +64,7 @@ public class InstanceLogService {
|
||||
*/
|
||||
@Resource(name = "localTransactionTemplate")
|
||||
private TransactionTemplate localTransactionTemplate;
|
||||
|
||||
@Resource
|
||||
private LocalInstanceLogRepository localInstanceLogRepository;
|
||||
|
||||
|
@ -1,16 +1,15 @@
|
||||
package tech.powerjob.server.core.instance;
|
||||
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@ -23,17 +22,23 @@ import java.util.concurrent.ExecutionException;
|
||||
@Service
|
||||
public class InstanceMetadataService implements InitializingBean {
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
// 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变)
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
/**
|
||||
* 缓存,一旦生成任务实例,其对应的 JobInfo 不应该再改变(即使源数据改变)
|
||||
*/
|
||||
private Cache<Long, JobInfoDO> instanceId2JobInfoCache;
|
||||
|
||||
@Value("${oms.instance.metadata.cache.size}")
|
||||
private int instanceMetadataCacheSize;
|
||||
private static final int CACHE_CONCURRENCY_LEVEL = 16;
|
||||
private final int instanceMetadataCacheSize;
|
||||
|
||||
private static final int CACHE_CONCURRENCY_LEVEL = 32;
|
||||
|
||||
public InstanceMetadataService(JobInfoRepository jobInfoRepository, InstanceInfoRepository instanceInfoRepository, @Value("${oms.instance.metadata.cache.size}") Integer instanceMetadataCacheSize) {
|
||||
this.jobInfoRepository = jobInfoRepository;
|
||||
this.instanceInfoRepository = instanceInfoRepository;
|
||||
this.instanceMetadataCacheSize = instanceMetadataCacheSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
@ -1,13 +1,14 @@
|
||||
package tech.powerjob.server.core.instance;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.PowerQuery;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.Protocol;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.model.InstanceDetail;
|
||||
import tech.powerjob.common.request.ServerQueryInstanceStatusReq;
|
||||
import tech.powerjob.common.request.ServerStopInstanceReq;
|
||||
@ -28,7 +29,6 @@ import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
||||
import tech.powerjob.server.remote.transport.TransportService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -45,23 +45,22 @@ import static tech.powerjob.common.enums.InstanceStatus.STOPPED;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class InstanceService {
|
||||
|
||||
@Resource
|
||||
private TransportService transportService;
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
@Resource
|
||||
private IdGenerateService idGenerateService;
|
||||
@Resource
|
||||
private InstanceManager instanceManager;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
private final TransportService transportService;
|
||||
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
private final DispatchService dispatchService;
|
||||
|
||||
private final IdGenerateService idGenerateService;
|
||||
|
||||
private final InstanceManager instanceManager;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
/**
|
||||
* 创建任务实例(注意,该方法并不调用 saveAndFlush,如果有需要立即同步到DB的需求,请在方法结束后手动调用 flush)
|
||||
|
@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
@ -30,10 +31,10 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
@Aspect
|
||||
@Component
|
||||
@Order(1)
|
||||
@RequiredArgsConstructor
|
||||
public class UseCacheLockAspect {
|
||||
|
||||
@Resource
|
||||
private MonitorService monitorService;
|
||||
private final MonitorService monitorService;
|
||||
|
||||
private final Map<String, Cache<String, ReentrantLock>> lockContainer = Maps.newConcurrentMap();
|
||||
|
||||
|
@ -1,15 +1,5 @@
|
||||
package tech.powerjob.server.core.scheduler;
|
||||
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.server.common.constants.PJThreadPool;
|
||||
import tech.powerjob.server.common.utils.OmsFileUtils;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.mongodb.GridFsManager;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
|
||||
import tech.powerjob.server.extension.LockService;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -18,8 +8,17 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.server.common.constants.PJThreadPool;
|
||||
import tech.powerjob.server.common.utils.OmsFileUtils;
|
||||
import tech.powerjob.server.extension.LockService;
|
||||
import tech.powerjob.server.persistence.mongodb.GridFsManager;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterManagerService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.File;
|
||||
import java.util.Date;
|
||||
|
||||
@ -33,25 +32,21 @@ import java.util.Date;
|
||||
@Service
|
||||
public class CleanService {
|
||||
|
||||
@Resource
|
||||
private GridFsManager gridFsManager;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
@Resource
|
||||
private LockService lockService;
|
||||
private final GridFsManager gridFsManager;
|
||||
|
||||
@Value("${oms.instanceinfo.retention}")
|
||||
private int instanceInfoRetentionDay;
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
@Value("${oms.container.retention.local}")
|
||||
private int localContainerRetentionDay;
|
||||
@Value("${oms.container.retention.remote}")
|
||||
private int remoteContainerRetentionDay;
|
||||
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
|
||||
private final LockService lockService;
|
||||
|
||||
private final int instanceInfoRetentionDay;
|
||||
|
||||
private final int localContainerRetentionDay;
|
||||
|
||||
private final int remoteContainerRetentionDay;
|
||||
|
||||
private static final int TEMPORARY_RETENTION_DAY = 3;
|
||||
|
||||
@ -62,6 +57,21 @@ public class CleanService {
|
||||
|
||||
private static final String HISTORY_DELETE_LOCK = "history_delete_lock";
|
||||
|
||||
public CleanService(GridFsManager gridFsManager, InstanceInfoRepository instanceInfoRepository, WorkflowInstanceInfoRepository workflowInstanceInfoRepository,
|
||||
WorkflowNodeInfoRepository workflowNodeInfoRepository, LockService lockService,
|
||||
@Value("${oms.instanceinfo.retention}") int instanceInfoRetentionDay,
|
||||
@Value("${oms.container.retention.local}") int localContainerRetentionDay,
|
||||
@Value("${oms.container.retention.remote}") int remoteContainerRetentionDay) {
|
||||
this.gridFsManager = gridFsManager;
|
||||
this.instanceInfoRepository = instanceInfoRepository;
|
||||
this.workflowInstanceInfoRepository = workflowInstanceInfoRepository;
|
||||
this.workflowNodeInfoRepository = workflowNodeInfoRepository;
|
||||
this.lockService = lockService;
|
||||
this.instanceInfoRetentionDay = instanceInfoRetentionDay;
|
||||
this.localContainerRetentionDay = localContainerRetentionDay;
|
||||
this.remoteContainerRetentionDay = remoteContainerRetentionDay;
|
||||
}
|
||||
|
||||
|
||||
@Async(PJThreadPool.TIMING_POOL)
|
||||
@Scheduled(cron = CLEAN_TIME_EXPRESSION)
|
||||
|
@ -1,26 +1,27 @@
|
||||
package tech.powerjob.server.core.scheduler;
|
||||
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.server.common.constants.PJThreadPool;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
|
||||
import tech.powerjob.server.persistence.remote.model.*;
|
||||
import tech.powerjob.server.persistence.remote.repository.*;
|
||||
import tech.powerjob.server.core.DispatchService;
|
||||
import tech.powerjob.server.core.instance.InstanceManager;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.server.common.constants.PJThreadPool;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.core.DispatchService;
|
||||
import tech.powerjob.server.core.instance.InstanceManager;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import tech.powerjob.server.persistence.remote.model.*;
|
||||
import tech.powerjob.server.persistence.remote.repository.*;
|
||||
import tech.powerjob.server.remote.transport.starter.AkkaStarter;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -34,6 +35,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class InstanceStatusCheckService {
|
||||
|
||||
private static final int MAX_BATCH_NUM = 10;
|
||||
@ -42,23 +44,21 @@ public class InstanceStatusCheckService {
|
||||
private static final long RUNNING_TIMEOUT_MS = 60000;
|
||||
private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;
|
||||
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
@Resource
|
||||
private InstanceManager instanceManager;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
private final DispatchService dispatchService;
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
private final InstanceManager instanceManager;
|
||||
|
||||
private final WorkflowInstanceManager workflowInstanceManager;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
@Async(PJThreadPool.TIMING_POOL)
|
||||
@Scheduled(fixedDelay = 10000)
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.core.scheduler;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.LifeCycle;
|
||||
@ -30,7 +31,6 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -44,6 +44,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class PowerScheduleService {
|
||||
|
||||
/**
|
||||
@ -51,26 +52,23 @@ public class PowerScheduleService {
|
||||
*/
|
||||
private static final int MAX_APP_NUM = 10;
|
||||
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
private final DispatchService dispatchService;
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
private final InstanceService instanceService;
|
||||
|
||||
@Resource
|
||||
private JobService jobService;
|
||||
@Resource
|
||||
private TimingStrategyService timingStrategyService;
|
||||
private final WorkflowInstanceManager workflowInstanceManager;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final JobService jobService;
|
||||
|
||||
private final TimingStrategyService timingStrategyService;
|
||||
|
||||
private static final long SCHEDULE_RATE = 15000;
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.core.service;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
|
||||
@ -15,10 +16,10 @@ import java.util.Objects;
|
||||
* @since 2020/6/20
|
||||
*/
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class AppInfoService {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
/**
|
||||
* 验证应用访问权限
|
||||
|
@ -1,17 +1,16 @@
|
||||
package tech.powerjob.server.core.service;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
@ -25,19 +24,23 @@ import java.util.Optional;
|
||||
@Service
|
||||
public class CacheService {
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final Cache<Long, String> jobId2JobNameCache;
|
||||
private final Cache<Long, String> workflowId2WorkflowNameCache;
|
||||
private final Cache<Long, Long> instanceId2AppId;
|
||||
private final Cache<Long, Long> jobId2AppId;
|
||||
|
||||
public CacheService() {
|
||||
public CacheService(JobInfoRepository jobInfoRepository, WorkflowInfoRepository workflowInfoRepository, InstanceInfoRepository instanceInfoRepository) {
|
||||
|
||||
this.jobInfoRepository = jobInfoRepository;
|
||||
this.workflowInfoRepository = workflowInfoRepository;
|
||||
this.instanceInfoRepository = instanceInfoRepository;
|
||||
|
||||
jobId2JobNameCache = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(Duration.ofMinutes(1))
|
||||
.maximumSize(512)
|
||||
|
@ -2,6 +2,7 @@ package tech.powerjob.server.core.service;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.jpa.domain.Specification;
|
||||
@ -43,20 +44,18 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class JobService {
|
||||
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
private final InstanceService instanceService;
|
||||
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
@Resource
|
||||
private TimingStrategyService timingStrategyService;
|
||||
private final DispatchService dispatchService;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final TimingStrategyService timingStrategyService;
|
||||
|
||||
/**
|
||||
* 保存/修改任务
|
||||
@ -205,9 +204,8 @@ public class JobService {
|
||||
* 启用某个任务
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @throws ParseException 异常(CRON表达式错误)
|
||||
*/
|
||||
public void enableJob(Long jobId) throws ParseException {
|
||||
public void enableJob(Long jobId) {
|
||||
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));
|
||||
|
||||
jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
|
||||
|
@ -18,11 +18,10 @@ import tech.powerjob.server.remote.server.self.ServerInfoService;
|
||||
public class IdGenerateService {
|
||||
|
||||
private final SnowFlakeIdGenerator snowFlakeIdGenerator;
|
||||
|
||||
private static final int DATA_CENTER_ID = 0;
|
||||
|
||||
@Autowired
|
||||
public IdGenerateService(ServerInfoService serverInfoService) {
|
||||
|
||||
long id = serverInfoService.fetchServiceInfo().getId();
|
||||
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
|
||||
log.info("[IdGenerateService] initialize IdGenerateService successfully, ID:{}", id);
|
||||
|
@ -7,37 +7,50 @@ package tech.powerjob.server.core.uid;
|
||||
* @since 2020/4/6
|
||||
*/
|
||||
public class SnowFlakeIdGenerator {
|
||||
|
||||
/**
|
||||
* 起始的时间戳(a special day for me)
|
||||
*/
|
||||
private final static long START_STAMP = 1555776000000L;
|
||||
|
||||
/**
|
||||
* 每一部分占用的位数
|
||||
* 序列号占用的位数
|
||||
*/
|
||||
private final static long SEQUENCE_BIT = 6; //序列号占用的位数
|
||||
private final static long MACHINE_BIT = 14; //机器标识占用的位数
|
||||
private final static long DATA_CENTER_BIT = 2;//数据中心占用的位数
|
||||
|
||||
private final static long SEQUENCE_BIT = 6;
|
||||
/**
|
||||
* 机器标识占用的位数
|
||||
*/
|
||||
private final static long MACHINE_BIT = 14;
|
||||
/**
|
||||
* 数据中心占用的位数
|
||||
*/
|
||||
private final static long DATA_CENTER_BIT = 2;
|
||||
/**
|
||||
* 每一部分的最大值
|
||||
*/
|
||||
private final static long MAX_DATA_CENTER_NUM = ~(-1L << DATA_CENTER_BIT);
|
||||
private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT);
|
||||
private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT);
|
||||
|
||||
/**
|
||||
* 每一部分向左的位移
|
||||
*/
|
||||
private final static long MACHINE_LEFT = SEQUENCE_BIT;
|
||||
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
|
||||
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
|
||||
|
||||
private final long dataCenterId; //数据中心
|
||||
private final long machineId; //机器标识
|
||||
private long sequence = 0L; //序列号
|
||||
private long lastTimestamp = -1L;//上一次时间戳
|
||||
/**
|
||||
* 数据中心
|
||||
*/
|
||||
private final long dataCenterId;
|
||||
/**
|
||||
* 机器标识
|
||||
*/
|
||||
private final long machineId;
|
||||
/**
|
||||
* 序列号
|
||||
*/
|
||||
private long sequence = 0L;
|
||||
/**
|
||||
* 上一次时间戳
|
||||
*/
|
||||
private long lastTimestamp = -1L;
|
||||
|
||||
public SnowFlakeIdGenerator(long dataCenterId, long machineId) {
|
||||
if (dataCenterId > MAX_DATA_CENTER_NUM || dataCenterId < 0) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.core.validator;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.common.enums.WorkflowNodeType;
|
||||
@ -18,10 +19,10 @@ import javax.annotation.Resource;
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class JobNodeValidator implements NodeValidator {
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
@Override
|
||||
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.core.validator;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.common.enums.WorkflowNodeType;
|
||||
@ -13,7 +14,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
@ -23,12 +23,12 @@ import java.util.Optional;
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class NestedWorkflowNodeValidator implements NodeValidator {
|
||||
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
|
||||
@Override
|
||||
public void complexValidate(WorkflowNodeInfoDO node, WorkflowDAG dag) {
|
||||
|
@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@ -32,7 +33,6 @@ import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowNodeInfoRepository;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -47,25 +47,25 @@ import static tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils.isNo
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@SuppressWarnings("squid:S1192")
|
||||
public class WorkflowInstanceManager {
|
||||
|
||||
@Resource
|
||||
private AlarmCenter alarmCenter;
|
||||
@Resource
|
||||
private IdGenerateService idGenerateService;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private UserService userService;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
@Resource
|
||||
private WorkflowNodeHandleService workflowNodeHandleService;
|
||||
private final AlarmCenter alarmCenter;
|
||||
|
||||
private final IdGenerateService idGenerateService;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final UserService userService;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
|
||||
private final WorkflowNodeHandleService workflowNodeHandleService;
|
||||
|
||||
/**
|
||||
* 创建工作流任务实例
|
||||
|
@ -1,14 +1,20 @@
|
||||
package tech.powerjob.server.core.workflow;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.common.enums.WorkflowNodeType;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
import tech.powerjob.common.enums.WorkflowInstanceStatus;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.common.response.WorkflowInstanceInfoDTO;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
import tech.powerjob.server.core.lock.UseCacheLock;
|
||||
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
|
||||
@ -16,12 +22,7 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
|
||||
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
@ -35,18 +36,16 @@ import java.util.Optional;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class WorkflowInstanceService {
|
||||
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository wfInstanceInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceService self;
|
||||
private final InstanceService instanceService;
|
||||
|
||||
private final WorkflowInstanceInfoRepository wfInstanceInfoRepository;
|
||||
|
||||
private final WorkflowInstanceManager workflowInstanceManager;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
/**
|
||||
* 停止工作流实例(入口)
|
||||
@ -61,10 +60,10 @@ public class WorkflowInstanceService {
|
||||
}
|
||||
// 如果这是一个被嵌套的工作流,则终止父工作流
|
||||
if (wfInstance.getParentWfInstanceId() != null) {
|
||||
self.stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId);
|
||||
SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstance.getParentWfInstanceId(), appId);
|
||||
return;
|
||||
}
|
||||
self.stopWorkflowInstance(wfInstanceId, appId);
|
||||
SpringUtils.getBean(this.getClass()).stopWorkflowInstance(wfInstanceId, appId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,11 +1,13 @@
|
||||
package tech.powerjob.server.core.workflow.hanlder.impl;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.enums.WorkflowNodeType;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.core.DispatchService;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
|
||||
@ -13,7 +15,6 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @author Echo009
|
||||
@ -21,21 +22,15 @@ import javax.annotation.Resource;
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class JobNodeHandler implements TaskNodeHandler {
|
||||
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
|
||||
@Resource
|
||||
private DispatchService dispatchService;
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
@Override
|
||||
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long instanceId = instanceService.create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis());
|
||||
Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis());
|
||||
node.setInstanceId(instanceId);
|
||||
node.setStatus(InstanceStatus.RUNNING.getV());
|
||||
log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId);
|
||||
@ -46,7 +41,7 @@ public class JobNodeHandler implements TaskNodeHandler {
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
|
||||
// 洗去时间表达式类型
|
||||
jobInfo.setTimeExpressionType(TimeExpressionType.WORKFLOW.getV());
|
||||
dispatchService.dispatch(jobInfo, node.getInstanceId());
|
||||
SpringUtils.getBean(DispatchService.class).dispatch(jobInfo, node.getInstanceId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.core.workflow.hanlder.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.common.SystemInstanceResult;
|
||||
@ -11,6 +12,7 @@ import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceManager;
|
||||
import tech.powerjob.server.core.workflow.algorithm.WorkflowDAGUtils;
|
||||
import tech.powerjob.server.core.workflow.hanlder.TaskNodeHandler;
|
||||
@ -19,7 +21,6 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.WorkflowInstanceInfoRepository;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
@ -28,16 +29,12 @@ import java.util.Date;
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class NestedWorkflowNodeHandler implements TaskNodeHandler {
|
||||
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
@Resource
|
||||
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
|
||||
|
||||
@Override
|
||||
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
|
||||
@ -78,7 +75,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
|
||||
} else {
|
||||
// 透传当前的上下文创建新的工作流实例
|
||||
String wfContext = wfInstanceInfo.getWfContext();
|
||||
Long instanceId = workflowInstanceManager.create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
|
||||
Long instanceId = SpringUtils.getBean(WorkflowInstanceManager.class).create(targetWf, wfContext, System.currentTimeMillis(), wfInstanceInfo.getWfInstanceId());
|
||||
node.setInstanceId(instanceId);
|
||||
}
|
||||
node.setStartTime(CommonUtils.formatTime(System.currentTimeMillis()));
|
||||
@ -89,7 +86,7 @@ public class NestedWorkflowNodeHandler implements TaskNodeHandler {
|
||||
public void startTaskInstance(PEWorkflowDAG.Node node) {
|
||||
Long wfId = node.getJobId();
|
||||
WorkflowInfoDO targetWf = workflowInfoRepository.findById(wfId).orElse(null);
|
||||
workflowInstanceManager.start(targetWf, node.getInstanceId());
|
||||
SpringUtils.getBean(WorkflowInstanceManager.class).start(targetWf, node.getInstanceId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@ import org.springframework.stereotype.Service;
|
||||
public class DatabaseLockService implements LockService {
|
||||
|
||||
private final String ownerIp;
|
||||
|
||||
private final OmsLockRepository omsLockRepository;
|
||||
|
||||
@Autowired
|
||||
|
@ -24,9 +24,9 @@ import java.util.concurrent.*;
|
||||
public class AlarmCenter {
|
||||
|
||||
private final ExecutorService POOL;
|
||||
|
||||
private final List<Alarmable> BEANS = Lists.newLinkedList();
|
||||
|
||||
@Autowired
|
||||
public AlarmCenter(List<Alarmable> alarmables) {
|
||||
int cores = Runtime.getRuntime().availableProcessors();
|
||||
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
@ -30,17 +31,19 @@ import java.util.Set;
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DingTalkAlarmService implements Alarmable {
|
||||
|
||||
@Resource
|
||||
private Environment environment;
|
||||
private final Environment environment;
|
||||
|
||||
private Long agentId;
|
||||
private DingTalkUtils dingTalkUtils;
|
||||
private Cache<String, String> mobile2UserIdCache;
|
||||
|
||||
private static final int CACHE_SIZE = 8192;
|
||||
// 防止缓存击穿
|
||||
/**
|
||||
* 防止缓存击穿
|
||||
*/
|
||||
private static final String EMPTY_TAG = "EMPTY";
|
||||
|
||||
@Override
|
||||
|
@ -125,7 +125,7 @@ public class DingTalkUtils implements Closeable {
|
||||
|
||||
@AllArgsConstructor
|
||||
public static final class MarkdownEntity {
|
||||
private String title;
|
||||
private String detail;
|
||||
private final String title;
|
||||
private final String detail;
|
||||
}
|
||||
}
|
||||
|
@ -10,38 +10,70 @@ import lombok.Data;
|
||||
*/
|
||||
@Data
|
||||
public class JobInstanceAlarm implements Alarm {
|
||||
// 应用ID
|
||||
/**
|
||||
* 应用ID
|
||||
*/
|
||||
private long appId;
|
||||
// 任务ID
|
||||
/**
|
||||
* 任务ID
|
||||
*/
|
||||
private long jobId;
|
||||
// 任务实例ID
|
||||
/**
|
||||
* 任务实例ID
|
||||
*/
|
||||
private long instanceId;
|
||||
// 任务名称
|
||||
/**
|
||||
* 任务名称
|
||||
*/
|
||||
private String jobName;
|
||||
// 任务自带的参数
|
||||
/**
|
||||
* 任务自带的参数
|
||||
*/
|
||||
private String jobParams;
|
||||
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
/**
|
||||
* 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
*/
|
||||
private Integer timeExpressionType;
|
||||
// 时间表达式,CRON/NULL/LONG/LONG
|
||||
/**
|
||||
* 时间表达式,CRON/NULL/LONG/LONG
|
||||
*/
|
||||
private String timeExpression;
|
||||
// 执行类型,单机/广播/MR
|
||||
/**
|
||||
* 执行类型,单机/广播/MR
|
||||
*/
|
||||
private Integer executeType;
|
||||
// 执行器类型,Java/Shell
|
||||
/**
|
||||
* 执行器类型,Java/Shell
|
||||
*/
|
||||
private Integer processorType;
|
||||
// 执行器信息
|
||||
/**
|
||||
* 执行器信息
|
||||
*/
|
||||
private String processorInfo;
|
||||
|
||||
// 任务实例参数
|
||||
/**
|
||||
* 任务实例参数
|
||||
*/
|
||||
private String instanceParams;
|
||||
// 执行结果
|
||||
/**
|
||||
* 执行结果
|
||||
*/
|
||||
private String result;
|
||||
// 预计触发时间
|
||||
/**
|
||||
* 预计触发时间
|
||||
*/
|
||||
private Long expectedTriggerTime;
|
||||
// 实际触发时间
|
||||
/**
|
||||
* 实际触发时间
|
||||
*/
|
||||
private Long actualTriggerTime;
|
||||
// 结束时间
|
||||
/**
|
||||
* 结束时间
|
||||
*/
|
||||
private Long finishedTime;
|
||||
// TaskTracker地址
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private String taskTrackerAddress;
|
||||
|
||||
@Override
|
||||
|
@ -14,25 +14,39 @@ public class WorkflowInstanceAlarm implements Alarm {
|
||||
|
||||
private String workflowName;
|
||||
|
||||
// 任务所属应用的ID,冗余提高查询效率
|
||||
/**
|
||||
* 任务所属应用的ID,冗余提高查询效率
|
||||
*/
|
||||
private Long appId;
|
||||
private Long workflowId;
|
||||
// workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求)
|
||||
/**
|
||||
* workflowInstanceId(任务实例表都使用单独的ID作为主键以支持潜在的分表需求)
|
||||
*/
|
||||
private Long wfInstanceId;
|
||||
// workflow 状态(WorkflowInstanceStatus)
|
||||
/**
|
||||
* workflow 状态(WorkflowInstanceStatus)
|
||||
*/
|
||||
private Integer status;
|
||||
|
||||
private PEWorkflowDAG peWorkflowDAG;
|
||||
private String result;
|
||||
|
||||
// 实际触发时间
|
||||
/**
|
||||
* 实际触发时间
|
||||
*/
|
||||
private Long actualTriggerTime;
|
||||
// 结束时间
|
||||
/**
|
||||
* 结束时间
|
||||
*/
|
||||
private Long finishedTime;
|
||||
|
||||
// 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
/**
|
||||
* 时间表达式类型(CRON/API/FIX_RATE/FIX_DELAY)
|
||||
*/
|
||||
private Integer timeExpressionType;
|
||||
// 时间表达式,CRON/NULL/LONG/LONG
|
||||
/**
|
||||
* 时间表达式,CRON/NULL/LONG/LONG
|
||||
*/
|
||||
private String timeExpression;
|
||||
|
||||
@Override
|
||||
|
@ -2,9 +2,11 @@ package tech.powerjob.server.migrate;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.enums.ProcessorType;
|
||||
import tech.powerjob.common.model.PEWorkflowDAG;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.extension.LockService;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
|
||||
@ -20,7 +22,6 @@ import org.springframework.data.jpa.domain.Specification;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.persistence.criteria.Predicate;
|
||||
import javax.transaction.Transactional;
|
||||
import java.util.*;
|
||||
@ -35,23 +36,18 @@ import java.util.concurrent.TimeUnit;
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class V3ToV4MigrateService {
|
||||
|
||||
private static final String MIGRATE_LOCK_TEMPLATE = "v3to4MigrateLock-%s-%s";
|
||||
|
||||
@Resource
|
||||
private LockService lockService;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInfoRepository workflowInfoRepository;
|
||||
@Resource
|
||||
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
/**
|
||||
* 避免内部方法调用导致事务不生效
|
||||
*/
|
||||
@Resource
|
||||
private V3ToV4MigrateService self;
|
||||
private final LockService lockService;
|
||||
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
private final WorkflowInfoRepository workflowInfoRepository;
|
||||
|
||||
private final WorkflowNodeInfoRepository workflowNodeInfoRepository;
|
||||
|
||||
/* ********************** 3.x => 4.x ********************** */
|
||||
|
||||
@ -149,7 +145,7 @@ public class V3ToV4MigrateService {
|
||||
for (WorkflowInfoDO workflowInfo : workflowInfoList) {
|
||||
|
||||
try {
|
||||
boolean fixed = self.fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
|
||||
boolean fixed = SpringUtils.getBean(this.getClass()).fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
|
||||
if (fixed) {
|
||||
fixedWorkflowIds.add(workflowInfo.getId());
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package tech.powerjob.server.monitor;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
@ -19,9 +18,7 @@ public class PowerJobMonitorService implements MonitorService {
|
||||
|
||||
private final List<Monitor> monitors = Lists.newLinkedList();
|
||||
|
||||
@Autowired
|
||||
public PowerJobMonitorService(List<Monitor> monitors) {
|
||||
|
||||
monitors.forEach(m -> {
|
||||
log.info("[MonitorService] register monitor: {}", m.getClass().getName());
|
||||
this.monitors.add(m);
|
||||
|
@ -7,6 +7,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
|
||||
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
||||
@ -38,9 +39,6 @@ import java.util.Objects;
|
||||
)
|
||||
public class LocalJpaConfig {
|
||||
|
||||
@Resource(name = "omsLocalDatasource")
|
||||
private DataSource omsLocalDatasource;
|
||||
|
||||
public static final String LOCAL_PACKAGES = "tech.powerjob.server.persistence.local";
|
||||
|
||||
private static Map<String, Object> genDatasourceProperties() {
|
||||
@ -56,8 +54,7 @@ public class LocalJpaConfig {
|
||||
}
|
||||
|
||||
@Bean(name = "localEntityManagerFactory")
|
||||
public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(EntityManagerFactoryBuilder builder) {
|
||||
|
||||
public LocalContainerEntityManagerFactoryBean initLocalEntityManagerFactory(@Qualifier("omsLocalDatasource") DataSource omsLocalDatasource,EntityManagerFactoryBuilder builder) {
|
||||
return builder
|
||||
.dataSource(omsLocalDatasource)
|
||||
.properties(genDatasourceProperties())
|
||||
@ -66,10 +63,9 @@ public class LocalJpaConfig {
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Bean(name = "localTransactionManager")
|
||||
public PlatformTransactionManager initLocalTransactionManager(EntityManagerFactoryBuilder builder) {
|
||||
return new JpaTransactionManager(Objects.requireNonNull(initLocalEntityManagerFactory(builder).getObject()));
|
||||
public PlatformTransactionManager initLocalTransactionManager(@Qualifier("localEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
|
||||
return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
|
||||
}
|
||||
|
||||
@Bean(name = "localTransactionTemplate")
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.persistence.config;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
|
||||
@ -13,7 +14,6 @@ import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.sql.DataSource;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -36,12 +36,6 @@ import java.util.Objects;
|
||||
)
|
||||
public class RemoteJpaConfig {
|
||||
|
||||
@Resource(name = "omsRemoteDatasource")
|
||||
private DataSource omsRemoteDatasource;
|
||||
|
||||
@Resource(name = "multiDatasourceProperties")
|
||||
private MultiDatasourceProperties properties;
|
||||
|
||||
public static final String CORE_PACKAGES = "tech.powerjob.server.persistence.remote";
|
||||
|
||||
/**
|
||||
@ -69,7 +63,7 @@ public class RemoteJpaConfig {
|
||||
|
||||
@Primary
|
||||
@Bean(name = "remoteEntityManagerFactory")
|
||||
public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(EntityManagerFactoryBuilder builder) {
|
||||
public LocalContainerEntityManagerFactoryBean initRemoteEntityManagerFactory(@Qualifier("omsRemoteDatasource") DataSource omsRemoteDatasource,@Qualifier("multiDatasourceProperties") MultiDatasourceProperties properties, EntityManagerFactoryBuilder builder) {
|
||||
Map<String, Object> datasourceProperties = genDatasourceProperties();
|
||||
datasourceProperties.putAll(properties.getRemote().getHibernate().getProperties());
|
||||
return builder
|
||||
@ -83,7 +77,7 @@ public class RemoteJpaConfig {
|
||||
|
||||
@Primary
|
||||
@Bean(name = "remoteTransactionManager")
|
||||
public PlatformTransactionManager initRemoteTransactionManager(EntityManagerFactoryBuilder builder) {
|
||||
return new JpaTransactionManager(Objects.requireNonNull(initRemoteEntityManagerFactory(builder).getObject()));
|
||||
public PlatformTransactionManager initRemoteTransactionManager(@Qualifier("remoteEntityManagerFactory") LocalContainerEntityManagerFactoryBean localContainerEntityManagerFactoryBean) {
|
||||
return new JpaTransactionManager(Objects.requireNonNull(localContainerEntityManagerFactoryBean.getObject()));
|
||||
}
|
||||
}
|
||||
|
@ -16,16 +16,20 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceLogDO, Long> {
|
||||
|
||||
// 流式查询
|
||||
/**
|
||||
* 流式查询
|
||||
*/
|
||||
Stream<LocalInstanceLogDO> findByInstanceIdOrderByLogTime(Long instanceId);
|
||||
|
||||
// 删除数据
|
||||
/**
|
||||
* 删除数据
|
||||
*/
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
long deleteByInstanceId(Long instanceId);
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@CanIgnoreReturnValue
|
||||
long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t);
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
package tech.powerjob.server.persistence.mongodb;
|
||||
|
||||
import tech.powerjob.server.common.PowerJobServerConfigKey;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
@ -19,8 +18,8 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.data.mongodb.core.MongoTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.server.common.PowerJobServerConfigKey;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.*;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
@ -36,21 +35,24 @@ import java.util.function.Consumer;
|
||||
@Service
|
||||
public class GridFsManager implements InitializingBean {
|
||||
|
||||
@Resource
|
||||
private Environment environment;
|
||||
private final Environment environment;
|
||||
|
||||
private final MongoDatabase db;
|
||||
|
||||
private MongoDatabase db;
|
||||
private boolean available;
|
||||
|
||||
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
|
||||
|
||||
public static final String LOG_BUCKET = "log";
|
||||
|
||||
public static final String CONTAINER_BUCKET = "container";
|
||||
|
||||
@Autowired(required = false)
|
||||
public void setMongoTemplate(MongoTemplate mongoTemplate) {
|
||||
public GridFsManager(Environment environment, @Autowired(required = false) MongoTemplate mongoTemplate) {
|
||||
this.environment = environment;
|
||||
if (mongoTemplate != null) {
|
||||
this.db = mongoTemplate.getDb();
|
||||
} else {
|
||||
this.db = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.persistence.monitor;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
@ -11,7 +12,6 @@ import tech.powerjob.server.monitor.MonitorService;
|
||||
import tech.powerjob.server.monitor.events.db.DatabaseEvent;
|
||||
import tech.powerjob.server.monitor.events.db.DatabaseType;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
@ -25,10 +25,10 @@ import java.util.stream.Stream;
|
||||
@Slf4j
|
||||
@Aspect
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DatabaseMonitorAspect {
|
||||
|
||||
@Resource
|
||||
private MonitorService monitorService;
|
||||
private final MonitorService monitorService;
|
||||
|
||||
@Around("execution(* tech.powerjob.server.persistence.remote.repository..*.*(..))")
|
||||
public Object monitorCoreDB(ProceedingJoinPoint joinPoint) throws Throwable {
|
||||
|
@ -16,13 +16,13 @@ import javax.transaction.Transactional;
|
||||
public interface OmsLockRepository extends JpaRepository<OmsLockDO, Long> {
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
@Query(value = "delete from OmsLockDO where lockName = ?1")
|
||||
int deleteByLockName(String lockName);
|
||||
|
||||
OmsLockDO findByLockName(String lockName);
|
||||
|
||||
@Modifying
|
||||
@Transactional
|
||||
@Transactional(rollbackOn = Exception.class)
|
||||
int deleteByOwnerIP(String ip);
|
||||
}
|
||||
|
@ -37,20 +37,25 @@ import java.util.concurrent.TimeUnit;
|
||||
@Service
|
||||
public class ServerElectionService {
|
||||
|
||||
@Resource
|
||||
private LockService lockService;
|
||||
@Resource
|
||||
private TransportService transportService;
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
private final LockService lockService;
|
||||
|
||||
@Value("${oms.accurate.select.server.percentage}")
|
||||
private int accurateSelectServerPercentage;
|
||||
private final TransportService transportService;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private final int accurateSelectServerPercentage;
|
||||
|
||||
private static final int RETRY_TIMES = 10;
|
||||
private static final long PING_TIMEOUT_MS = 1000;
|
||||
private static final String SERVER_ELECT_LOCK = "server_elect_%d";
|
||||
|
||||
public ServerElectionService(LockService lockService, TransportService transportService, AppInfoRepository appInfoRepository,@Value("${oms.accurate.select.server.percentage}") int accurateSelectServerPercentage) {
|
||||
this.lockService = lockService;
|
||||
this.transportService = transportService;
|
||||
this.appInfoRepository = appInfoRepository;
|
||||
this.accurateSelectServerPercentage = accurateSelectServerPercentage;
|
||||
}
|
||||
|
||||
public String elect(Long appId, String protocol, String currentServer) {
|
||||
if (!accurate()) {
|
||||
// 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功
|
||||
|
@ -4,6 +4,7 @@ import akka.pattern.Patterns;
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
@ -39,10 +40,10 @@ import java.util.concurrent.CompletionStage;
|
||||
@Aspect
|
||||
@Component
|
||||
@Order(0)
|
||||
@RequiredArgsConstructor
|
||||
public class DesignateServerAspect {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
@ -70,7 +71,7 @@ public class DesignateServerAspect {
|
||||
}
|
||||
|
||||
if (appId == null) {
|
||||
throw new PowerJobException("can't find appId in params for:" + signature.toString());
|
||||
throw new PowerJobException("can't find appId in params for:" + signature);
|
||||
}
|
||||
|
||||
// 获取执行机器
|
||||
|
@ -21,11 +21,17 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
public class ClusterStatusHolder {
|
||||
|
||||
// 集群所属的应用名称
|
||||
/**
|
||||
* 集群所属的应用名称
|
||||
*/
|
||||
private final String appName;
|
||||
// 集群中所有机器的信息
|
||||
/**
|
||||
* 集群中所有机器的信息
|
||||
*/
|
||||
private final Map<String, WorkerInfo> address2WorkerInfo;
|
||||
// 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo)
|
||||
/**
|
||||
* 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo)
|
||||
*/
|
||||
private Map<Long, Map<String, DeployedContainerInfo>> containerId2Infos;
|
||||
|
||||
|
||||
|
@ -18,8 +18,10 @@ import java.util.Set;
|
||||
@Slf4j
|
||||
public class WorkerClusterManagerService {
|
||||
|
||||
// 存储Worker健康信息,appId -> ClusterStatusHolder
|
||||
private static final Map<Long, ClusterStatusHolder> appId2ClusterStatus = Maps.newConcurrentMap();
|
||||
/**
|
||||
* 存储Worker健康信息,appId -> ClusterStatusHolder
|
||||
*/
|
||||
private static final Map<Long, ClusterStatusHolder> APP_ID_2_CLUSTER_STATUS = Maps.newConcurrentMap();
|
||||
|
||||
/**
|
||||
* 更新状态
|
||||
@ -28,7 +30,7 @@ public class WorkerClusterManagerService {
|
||||
public static void updateStatus(WorkerHeartbeat heartbeat) {
|
||||
Long appId = heartbeat.getAppId();
|
||||
String appName = heartbeat.getAppName();
|
||||
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
|
||||
ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
|
||||
clusterStatusHolder.updateStatus(heartbeat);
|
||||
}
|
||||
|
||||
@ -38,7 +40,7 @@ public class WorkerClusterManagerService {
|
||||
*/
|
||||
public static void clean(List<Long> usingAppIds) {
|
||||
Set<Long> keys = Sets.newHashSet(usingAppIds);
|
||||
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
|
||||
APP_ID_2_CLUSTER_STATUS.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
|
||||
}
|
||||
|
||||
|
||||
@ -46,11 +48,11 @@ public class WorkerClusterManagerService {
|
||||
* 清理缓存信息,防止 OOM
|
||||
*/
|
||||
public static void cleanUp() {
|
||||
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
|
||||
APP_ID_2_CLUSTER_STATUS.values().forEach(ClusterStatusHolder::release);
|
||||
}
|
||||
|
||||
protected static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
|
||||
return appId2ClusterStatus;
|
||||
return APP_ID_2_CLUSTER_STATUS;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,9 +26,8 @@ import java.util.Optional;
|
||||
@Service
|
||||
public class WorkerClusterQueryService {
|
||||
|
||||
private List<WorkerFilter> workerFilters;
|
||||
private final List<WorkerFilter> workerFilters;
|
||||
|
||||
@Autowired
|
||||
public WorkerClusterQueryService(List<WorkerFilter> workerFilters) {
|
||||
this.workerFilters = workerFilters;
|
||||
}
|
||||
@ -92,7 +91,6 @@ public class WorkerClusterQueryService {
|
||||
*/
|
||||
public Optional<WorkerInfo> getWorkerInfoByAddress(Long appId, String address) {
|
||||
// this may cause NPE while address value is null .
|
||||
//return Optional.ofNullable(getWorkerInfosByAppId(appId).get(address));
|
||||
final Map<String, WorkerInfo> workerInfosByAppId = getWorkerInfosByAppId(appId);
|
||||
//add null check for both workerInfos Map and address
|
||||
if (null != workerInfosByAppId && null != address) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.config;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
@ -25,10 +26,10 @@ import static springfox.documentation.builders.PathSelectors.any;
|
||||
@Configuration
|
||||
@EnableSwagger2
|
||||
@ConditionalOnProperty(name = PowerJobServerConfigKey.SWAGGER_UI_ENABLE, havingValue = "true")
|
||||
@RequiredArgsConstructor
|
||||
public class SwaggerConfig {
|
||||
|
||||
@Resource
|
||||
private ServerInfoService serverInfoService;
|
||||
private final ServerInfoService serverInfoService;
|
||||
|
||||
@Bean
|
||||
public Docket createRestApi() {
|
||||
|
@ -62,7 +62,9 @@ public class ThreadPoolConfig {
|
||||
return executor;
|
||||
}
|
||||
|
||||
// 引入 WebSocket 支持后需要手动初始化调度线程池
|
||||
/**
|
||||
* 引入 WebSocket 支持后需要手动初始化调度线程池
|
||||
*/
|
||||
@Bean
|
||||
public TaskScheduler taskScheduler() {
|
||||
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.web.controller;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.persistence.remote.model.AppInfoDO;
|
||||
@ -31,12 +32,12 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/appInfo")
|
||||
@RequiredArgsConstructor
|
||||
public class AppInfoController {
|
||||
|
||||
@Resource
|
||||
private AppInfoService appInfoService;
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
private final AppInfoService appInfoService;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private static final int MAX_APP_NUM = 200;
|
||||
|
||||
|
@ -41,15 +41,21 @@ import java.util.stream.Collectors;
|
||||
@RequestMapping("/container")
|
||||
public class ContainerController {
|
||||
|
||||
@Value("${server.port}")
|
||||
private int port;
|
||||
|
||||
@Resource
|
||||
private ContainerService containerService;
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private ContainerInfoRepository containerInfoRepository;
|
||||
private final int port;
|
||||
|
||||
private final ContainerService containerService;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private final ContainerInfoRepository containerInfoRepository;
|
||||
|
||||
public ContainerController(@Value("${server.port}") int port, ContainerService containerService, AppInfoRepository appInfoRepository, ContainerInfoRepository containerInfoRepository) {
|
||||
this.port = port;
|
||||
this.containerService = containerService;
|
||||
this.appInfoRepository = appInfoRepository;
|
||||
this.containerInfoRepository = containerInfoRepository;
|
||||
}
|
||||
|
||||
@GetMapping("/downloadJar")
|
||||
public void downloadJar(String version, HttpServletResponse response) throws IOException {
|
||||
|
@ -1,8 +1,10 @@
|
||||
package tech.powerjob.server.web.controller;
|
||||
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import tech.powerjob.common.OpenAPIConstant;
|
||||
import tech.powerjob.common.PowerQuery;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
@ -11,19 +13,16 @@ import tech.powerjob.common.response.InstanceInfoDTO;
|
||||
import tech.powerjob.common.response.JobInfoDTO;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.common.response.WorkflowInstanceInfoDTO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
import tech.powerjob.server.core.service.AppInfoService;
|
||||
import tech.powerjob.server.core.service.CacheService;
|
||||
import tech.powerjob.server.core.service.JobService;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
import tech.powerjob.server.core.workflow.WorkflowInstanceService;
|
||||
import tech.powerjob.server.core.workflow.WorkflowService;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.WorkflowNodeInfoDO;
|
||||
import tech.powerjob.server.web.response.WorkflowInfoVO;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.text.ParseException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -34,21 +33,20 @@ import java.util.List;
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping(OpenAPIConstant.WEB_PATH)
|
||||
@RequiredArgsConstructor
|
||||
public class OpenAPIController {
|
||||
|
||||
@Resource
|
||||
private AppInfoService appInfoService;
|
||||
@Resource
|
||||
private JobService jobService;
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@Resource
|
||||
private WorkflowService workflowService;
|
||||
@Resource
|
||||
private WorkflowInstanceService workflowInstanceService;
|
||||
private final AppInfoService appInfoService;
|
||||
|
||||
@Resource
|
||||
private CacheService cacheService;
|
||||
private final JobService jobService;
|
||||
|
||||
private final InstanceService instanceService;
|
||||
|
||||
private final WorkflowService workflowService;
|
||||
|
||||
private final WorkflowInstanceService workflowInstanceService;
|
||||
|
||||
private final CacheService cacheService;
|
||||
|
||||
|
||||
@PostMapping(OpenAPIConstant.ASSERT)
|
||||
@ -59,7 +57,7 @@ public class OpenAPIController {
|
||||
/* ************* Job 区 ************* */
|
||||
|
||||
@PostMapping(OpenAPIConstant.SAVE_JOB)
|
||||
public ResultDTO<Long> saveJob(@RequestBody SaveJobInfoRequest request) throws ParseException {
|
||||
public ResultDTO<Long> saveJob(@RequestBody SaveJobInfoRequest request) {
|
||||
if (request.getId() != null) {
|
||||
checkJobIdValid(request.getId(), request.getAppId());
|
||||
}
|
||||
@ -102,7 +100,7 @@ public class OpenAPIController {
|
||||
}
|
||||
|
||||
@PostMapping(OpenAPIConstant.ENABLE_JOB)
|
||||
public ResultDTO<Void> enableJob(Long jobId, Long appId) throws ParseException {
|
||||
public ResultDTO<Void> enableJob(Long jobId, Long appId) {
|
||||
checkJobIdValid(jobId, appId);
|
||||
jobService.enableJob(jobId);
|
||||
return ResultDTO.success(null);
|
||||
@ -156,7 +154,7 @@ public class OpenAPIController {
|
||||
/* ************* Workflow 区 ************* */
|
||||
|
||||
@PostMapping(OpenAPIConstant.SAVE_WORKFLOW)
|
||||
public ResultDTO<Long> saveWorkflow(@RequestBody SaveWorkflowRequest request) throws ParseException {
|
||||
public ResultDTO<Long> saveWorkflow(@RequestBody SaveWorkflowRequest request) {
|
||||
return ResultDTO.success(workflowService.saveWorkflow(request));
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,11 @@ package tech.powerjob.server.web.controller;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
@ -10,12 +15,7 @@ import tech.powerjob.server.persistence.remote.repository.AppInfoRepository;
|
||||
import tech.powerjob.server.remote.server.election.ServerElectionService;
|
||||
import tech.powerjob.server.remote.transport.TransportService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Optional;
|
||||
import java.util.TimeZone;
|
||||
|
||||
@ -28,16 +28,16 @@ import java.util.TimeZone;
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/server")
|
||||
@RequiredArgsConstructor
|
||||
public class ServerController {
|
||||
|
||||
@Resource
|
||||
private TransportService transportService;
|
||||
@Resource
|
||||
private ServerElectionService serverElectionService;
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
private final TransportService transportService;
|
||||
|
||||
private final ServerElectionService serverElectionService;
|
||||
|
||||
private final AppInfoRepository appInfoRepository;
|
||||
|
||||
private final WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
@GetMapping("/assert")
|
||||
public ResultDTO<Long> assertAppName(String appName) {
|
||||
|
@ -1,25 +1,24 @@
|
||||
package tech.powerjob.server.web.controller;
|
||||
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.module.ServerInfo;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.server.self.ServerInfoService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.web.response.SystemOverviewVO;
|
||||
import tech.powerjob.server.web.response.WorkerStatusVO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import tech.powerjob.common.OmsConstant;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.common.constants.SwitchableStatus;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.server.self.ServerInfoService;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
import tech.powerjob.server.web.response.SystemOverviewVO;
|
||||
import tech.powerjob.server.web.response.WorkerStatusVO;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.TimeZone;
|
||||
@ -34,17 +33,16 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/system")
|
||||
@RequiredArgsConstructor
|
||||
public class SystemInfoController {
|
||||
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
private final JobInfoRepository jobInfoRepository;
|
||||
|
||||
@Resource
|
||||
private ServerInfoService serverInfoService;
|
||||
@Resource
|
||||
private WorkerClusterQueryService workerClusterQueryService;
|
||||
private final InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final ServerInfoService serverInfoService;
|
||||
|
||||
private final WorkerClusterQueryService workerClusterQueryService;
|
||||
|
||||
@GetMapping("/listWorker")
|
||||
public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.web.controller;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
@ -10,7 +11,6 @@ import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.core.scheduler.TimingStrategyService;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -22,10 +22,10 @@ import java.util.List;
|
||||
*/
|
||||
@RestController
|
||||
@RequestMapping("/validate")
|
||||
@RequiredArgsConstructor
|
||||
public class ValidateController {
|
||||
|
||||
@Resource
|
||||
private TimingStrategyService timingStrategyService;
|
||||
private final TimingStrategyService timingStrategyService;
|
||||
|
||||
@GetMapping("/timeExpression")
|
||||
public ResultDTO<List<String>> checkTimeExpression(TimeExpressionType timeExpressionType,
|
||||
|
@ -11,15 +11,25 @@ import lombok.Data;
|
||||
@Data
|
||||
public class GenerateContainerTemplateRequest {
|
||||
|
||||
// Maven Group
|
||||
/**
|
||||
* Maven Group
|
||||
*/
|
||||
private String group;
|
||||
// Maven artifact
|
||||
/**
|
||||
* Maven artifact
|
||||
*/
|
||||
private String artifact;
|
||||
// Maven name
|
||||
/**
|
||||
* Maven name
|
||||
*/
|
||||
private String name;
|
||||
// 包名(com.xx.xx.xx)
|
||||
/**
|
||||
* 包名(com.xx.xx.xx)
|
||||
*/
|
||||
private String packageName;
|
||||
// Java版本号,8或者11
|
||||
/**
|
||||
* Java版本号,8或者11
|
||||
*/
|
||||
private Integer javaVersion;
|
||||
|
||||
}
|
||||
|
@ -17,8 +17,12 @@ public class ModifyUserInfoRequest {
|
||||
private String password;
|
||||
private String webHook;
|
||||
|
||||
// 手机号
|
||||
/**
|
||||
* 手机号
|
||||
*/
|
||||
private String phone;
|
||||
// 邮箱地址
|
||||
/**
|
||||
* 邮箱地址
|
||||
*/
|
||||
private String email;
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package tech.powerjob.server.web.request;
|
||||
|
||||
import tech.powerjob.server.common.constants.InstanceType;
|
||||
import lombok.Data;
|
||||
import tech.powerjob.server.common.constants.InstanceType;
|
||||
|
||||
/**
|
||||
* 任务实例查询对象
|
||||
@ -12,14 +12,21 @@ import lombok.Data;
|
||||
@Data
|
||||
public class QueryInstanceRequest {
|
||||
|
||||
// 任务所属应用ID
|
||||
/**
|
||||
* 任务所属应用ID
|
||||
*/
|
||||
private Long appId;
|
||||
// 当前页码
|
||||
/**
|
||||
* 当前页码
|
||||
*/
|
||||
private Integer index;
|
||||
// 页大小
|
||||
/**
|
||||
* 页大小
|
||||
*/
|
||||
private Integer pageSize;
|
||||
|
||||
// 查询条件(NORMAL/WORKFLOW)
|
||||
/**
|
||||
* 查询条件(NORMAL/WORKFLOW)
|
||||
*/
|
||||
private InstanceType type;
|
||||
private Long instanceId;
|
||||
private Long jobId;
|
||||
|
@ -11,14 +11,21 @@ import lombok.Data;
|
||||
@Data
|
||||
public class QueryJobInfoRequest {
|
||||
|
||||
// 任务所属应用ID
|
||||
/**
|
||||
* 任务所属应用ID
|
||||
*/
|
||||
private Long appId;
|
||||
// 当前页码
|
||||
/**
|
||||
* 当前页码
|
||||
*/
|
||||
private Integer index;
|
||||
// 页大小
|
||||
/**
|
||||
* 页大小
|
||||
*/
|
||||
private Integer pageSize;
|
||||
|
||||
// 查询条件
|
||||
/**
|
||||
* 任务ID
|
||||
*/
|
||||
private Long jobId;
|
||||
private String keyword;
|
||||
}
|
||||
|
@ -11,14 +11,21 @@ import lombok.Data;
|
||||
@Data
|
||||
public class QueryWorkflowInfoRequest {
|
||||
|
||||
// 任务所属应用ID
|
||||
/**
|
||||
* 任务所属应用ID
|
||||
*/
|
||||
private Long appId;
|
||||
// 当前页码
|
||||
/**
|
||||
* 当前页码
|
||||
*/
|
||||
private Integer index;
|
||||
// 页大小
|
||||
/**
|
||||
* 页大小
|
||||
*/
|
||||
private Integer pageSize;
|
||||
|
||||
// 查询条件
|
||||
/**
|
||||
* 查询条件
|
||||
*/
|
||||
private Long workflowId;
|
||||
private String keyword;
|
||||
|
||||
|
@ -11,15 +11,23 @@ import lombok.Data;
|
||||
@Data
|
||||
public class QueryWorkflowInstanceRequest {
|
||||
|
||||
// 任务所属应用ID
|
||||
/**
|
||||
* 任务所属应用ID
|
||||
*/
|
||||
private Long appId;
|
||||
// 当前页码
|
||||
/**
|
||||
* 当前页码
|
||||
*/
|
||||
private Integer index;
|
||||
// 页大小
|
||||
/**
|
||||
* 页大小
|
||||
*/
|
||||
private Integer pageSize;
|
||||
|
||||
// 查询条件(NORMAL/WORKFLOW)
|
||||
/**
|
||||
* 查询条件(NORMAL/WORKFLOW)
|
||||
*/
|
||||
private Long wfInstanceId;
|
||||
|
||||
private Long workflowId;
|
||||
|
||||
private String status;
|
||||
|
@ -14,21 +14,33 @@ import lombok.Data;
|
||||
@Data
|
||||
public class SaveContainerInfoRequest {
|
||||
|
||||
// 容器ID,null -> 创建;否则代表修改
|
||||
/**
|
||||
* 容器ID,null -> 创建;否则代表修改
|
||||
*/
|
||||
private Long id;
|
||||
|
||||
// 所属的应用ID
|
||||
/**
|
||||
* 所属的应用ID
|
||||
*/
|
||||
private Long appId;
|
||||
|
||||
// 容器名称
|
||||
/**
|
||||
* 容器名称
|
||||
*/
|
||||
private String containerName;
|
||||
|
||||
// 容器类型,枚举值为 ContainerSourceType(JarFile/Git)
|
||||
/**
|
||||
* 容器类型,枚举值为 ContainerSourceType(JarFile/Git)
|
||||
*/
|
||||
private ContainerSourceType sourceType;
|
||||
// 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
|
||||
/**
|
||||
* 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
|
||||
*/
|
||||
private String sourceInfo;
|
||||
|
||||
// 状态,枚举值为 ContainerStatus(ENABLE/DISABLE)
|
||||
/**
|
||||
* 状态,枚举值为 ContainerStatus(ENABLE/DISABLE)
|
||||
*/
|
||||
private SwitchableStatus status;
|
||||
|
||||
public void valid() {
|
||||
|
@ -17,18 +17,25 @@ public class ContainerInfoVO {
|
||||
|
||||
private String containerName;
|
||||
|
||||
// 容器类型,枚举值为 ContainerSourceType
|
||||
/**
|
||||
* 容器类型,枚举值为 ContainerSourceType
|
||||
*/
|
||||
private String sourceType;
|
||||
// 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
|
||||
/**
|
||||
* 由 sourceType 决定,JarFile -> String,存储文件名称;Git -> JSON,包括 URL,branch,username,password
|
||||
*/
|
||||
private String sourceInfo;
|
||||
|
||||
// 版本 (Jar包使用md5,Git使用commitId,前者32位,后者40位,不会产生碰撞)
|
||||
/**
|
||||
* 版本 (Jar包使用md5,Git使用commitId,前者32位,后者40位,不会产生碰撞)
|
||||
*/
|
||||
private String version;
|
||||
|
||||
// 状态,枚举值为 ContainerStatus
|
||||
/**
|
||||
* 状态,枚举值为 ContainerStatus
|
||||
*/
|
||||
private String status;
|
||||
|
||||
// 上一次部署时间
|
||||
/**
|
||||
* 上一次部署时间
|
||||
*/
|
||||
private String lastDeployTime;
|
||||
|
||||
private Date gmtCreate;
|
||||
|
@ -15,29 +15,44 @@ import org.springframework.beans.BeanUtils;
|
||||
@Data
|
||||
public class InstanceInfoVO {
|
||||
|
||||
// 任务ID(JS精度丢失)
|
||||
/**
|
||||
* 任务ID(JS精度丢失)
|
||||
*/
|
||||
private String jobId;
|
||||
// 任务名称
|
||||
/**
|
||||
* 任务名称
|
||||
*/
|
||||
private String jobName;
|
||||
// 任务实例ID(JS精度丢失)
|
||||
/**
|
||||
* 任务实例ID(JS精度丢失)
|
||||
*/
|
||||
private String instanceId;
|
||||
// 该任务实例所属的 workflow ID,仅 workflow 任务存在
|
||||
/**
|
||||
* 该任务实例所属的 workflow ID,仅 workflow 任务存在
|
||||
*/
|
||||
private String wfInstanceId;
|
||||
|
||||
// 执行结果
|
||||
/**
|
||||
* 执行结果
|
||||
*/
|
||||
private String result;
|
||||
|
||||
// TaskTracker地址
|
||||
/**
|
||||
* TaskTracker地址
|
||||
*/
|
||||
private String taskTrackerAddress;
|
||||
|
||||
// 总共执行的次数(用于重试判断)
|
||||
/**
|
||||
* 总共执行的次数(用于重试判断)
|
||||
*/
|
||||
private Long runningTimes;
|
||||
private int status;
|
||||
|
||||
/* ********** 不一致区域 ********** */
|
||||
// 实际触发时间(需要格式化为人看得懂的时间)
|
||||
/**
|
||||
* 实际触发时间(需要格式化为人看得懂的时间)
|
||||
*/
|
||||
private String actualTriggerTime;
|
||||
// 结束时间(同理,需要格式化)
|
||||
/**
|
||||
* 结束时间(同理,需要格式化)
|
||||
*/
|
||||
private String finishedTime;
|
||||
|
||||
public static InstanceInfoVO from(InstanceInfoDO instanceInfoDo, String jobName) {
|
||||
|
@ -17,9 +17,13 @@ public class SystemOverviewVO {
|
||||
private long jobCount;
|
||||
private long runningInstanceCount;
|
||||
private long failedInstanceCount;
|
||||
// 服务器时区
|
||||
/**
|
||||
* 服务器时区
|
||||
*/
|
||||
private String timezone;
|
||||
// 服务器时间
|
||||
/**
|
||||
* 服务器时间
|
||||
*/
|
||||
private String serverTime;
|
||||
|
||||
private ServerInfo serverInfo;
|
||||
|
@ -27,12 +27,18 @@ public class WorkerStatusVO {
|
||||
private String tag;
|
||||
private String lastActiveTime;
|
||||
|
||||
// 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色,9999 -> 非在线机器
|
||||
/**
|
||||
* 1 -> 健康,绿色,2 -> 一般,橙色,3 -> 糟糕,红色,9999 -> 非在线机器
|
||||
*/
|
||||
private int status;
|
||||
|
||||
// 12.3%(4 cores)
|
||||
/**
|
||||
* 12.3%(4 cores)
|
||||
*/
|
||||
private static final String CPU_FORMAT = "%s / %s cores";
|
||||
// 27.7%(2.9/8.0 GB)
|
||||
/**
|
||||
* 27.7%(2.9/8.0 GB)
|
||||
*/
|
||||
private static final String OTHER_FORMAT = "%s%%(%s / %s GB)";
|
||||
private static final DecimalFormat df = new DecimalFormat("#.#");
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user