diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index 63aafe78..974f3612 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -131,4 +131,10 @@ public class JobInfoDO { private Date gmtModified; + /** + * 扩展参数,PowerJob 自身不会使用该数据,留给开发者扩展时使用 + * 比如 WorkerFilter 的自定义 worker 过滤逻辑,可在次传入过滤指标 GpuUsage < 10 + */ + private String extra; + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java similarity index 82% rename from powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java rename to powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java index 04e82603..356b699a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/DispatchService.java @@ -1,19 +1,17 @@ -package com.github.kfcfans.powerjob.server.service; +package com.github.kfcfans.powerjob.server.remote; import com.github.kfcfans.powerjob.common.*; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; +import com.github.kfcfans.powerjob.server.remote.transport.TransportService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.instance.InstanceMetadataService; import com.github.kfcfans.powerjob.server.service.lock.local.UseSegmentLock; -import com.github.kfcfans.powerjob.server.remote.transport.TransportService; -import com.google.common.base.Splitter; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -23,7 +21,6 @@ import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.Date; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import static com.github.kfcfans.powerjob.common.InstanceStatus.*; @@ -42,6 +39,8 @@ public class DispatchService { @Resource private TransportService transportService; + @Resource + private WorkerClusterQueryService workerClusterQueryService; @Resource private InstanceManager instanceManager; @@ -50,7 +49,6 @@ public class DispatchService { @Resource private InstanceInfoRepository instanceInfoRepository; - private static final Splitter COMMA_SPLITTER = Splitter.on(","); /** * 重新派发任务 @@ -120,12 +118,12 @@ public class DispatchService { return; } } + // 获取当前最合适的 worker 列表 - List suitableWorkers = obtainSuitableWorkers(jobInfo); + List suitableWorkers = workerClusterQueryService.getSuitableWorkers(jobInfo); if (CollectionUtils.isEmpty(suitableWorkers)) { - String clusterStatusDescription = WorkerClusterManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId()); - log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, now); instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); @@ -151,32 +149,6 @@ public class DispatchService { instanceMetadataService.loadJobInfo(instanceId, jobInfo); } - /** - * 获取当前最合适的 worker 列表 - */ - private List obtainSuitableWorkers(JobInfoDO jobInfo) { - // 获取当前所有可用的Worker - List allAvailableWorker = WorkerClusterManagerService.getSortedAvailableWorkers(jobInfo.getAppId(), jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); - - // 筛选指定的机器 - allAvailableWorker.removeIf(worker -> { - // 空,则全部不过滤 - if (StringUtils.isEmpty(jobInfo.getDesignatedWorkers())) { - return false; - } - // 非空,只有匹配上的 worker 才不被过滤 - Set designatedWorkers = Sets.newHashSet(COMMA_SPLITTER.splitToList(jobInfo.getDesignatedWorkers())); - return !designatedWorkers.contains(worker.getAddress()); - }); - - - // 限定集群大小(0代表不限制) - if (!allAvailableWorker.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && allAvailableWorker.size() > jobInfo.getMaxWorkerCount()) { - allAvailableWorker = allAvailableWorker.subList(0, jobInfo.getMaxWorkerCount()); - } - return allAvailableWorker; - } - /** * 构造任务调度请求 */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java index 6d185488..24dd631d 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/ClusterStatusHolder.java @@ -1,8 +1,6 @@ package com.github.kfcfans.powerjob.server.remote.worker.cluster; -import com.alibaba.fastjson.JSON; import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo; -import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -12,7 +10,6 @@ import org.springframework.util.CollectionUtils; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; /** * 管理Worker集群状态 @@ -30,7 +27,6 @@ public class ClusterStatusHolder { // 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo) private Map> containerId2Infos; - private static final long WORKER_TIMEOUT_MS = 60000; public ClusterStatusHolder(String appName) { this.appName = appName; @@ -68,53 +64,14 @@ public class ClusterStatusHolder { } } - /** - * 获取当前所有可用的 Worker - * @param minCPUCores 最低CPU核心数量 - * @param minMemorySpace 最低内存可用空间,单位GB - * @param minDiskSpace 最低磁盘可用空间,单位GB - * @return List - */ - public List getSortedAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) { - List workers = getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace); - - // 按机器健康度排序 - workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); - - return workers; + public List getAllWorkers() { + return Lists.newLinkedList(address2WorkerInfo.values()); } public WorkerInfo getWorkerInfo(String address) { return address2WorkerInfo.get(address); } - public List getAvailableWorkers(double minCPUCores, double minMemorySpace, double minDiskSpace) { - List workerInfos = Lists.newArrayList(); - address2WorkerInfo.forEach((address, workerInfo) -> { - - if (timeout(address)) { - log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered because of timeout, last active time is {}.", address, workerInfo.getSystemMetrics(), workerInfo.getLastActiveTime()); - return; - } - // 判断指标 - SystemMetrics metrics = workerInfo.getSystemMetrics(); - if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) { - workerInfos.add(workerInfo); - }else { - log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered by config(minCPUCores={},minMemory={},minDiskSpace={})", address, metrics, minCPUCores, minMemorySpace, minDiskSpace); - } - }); - return workerInfos; - } - - /** - * 获取整个集群的简介 - * @return 获取集群简介 - */ - public String getClusterDescription() { - return String.format("appName:%s,clusterStatus:%s", appName, JSON.toJSONString(address2WorkerInfo)); - } - /** * 获取当前连接的的机器详情 * @return map @@ -122,7 +79,7 @@ public class ClusterStatusHolder { public Map getActiveWorkerInfo() { Map res = Maps.newHashMap(); address2WorkerInfo.forEach((address, workerInfo) -> { - if (!timeout(address)) { + if (!workerInfo.timeout()) { res.put(address, workerInfo); } }); @@ -154,7 +111,7 @@ public class ClusterStatusHolder { // 丢弃超时机器的信息 List timeoutAddress = Lists.newLinkedList(); address2WorkerInfo.forEach((addr, workerInfo) -> { - if (timeout(addr)) { + if (workerInfo.timeout()) { timeoutAddress.add(addr); } }); @@ -164,15 +121,4 @@ public class ClusterStatusHolder { timeoutAddress.forEach(address2WorkerInfo::remove); } } - - private boolean timeout(String address) { - // 排除超时机器 - return Optional.ofNullable(address2WorkerInfo.get(address)) - .map(workerInfo -> { - long timeout = System.currentTimeMillis() - workerInfo.getLastActiveTime(); - return timeout > WORKER_TIMEOUT_MS; - }) - .orElse(true); - - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java index ea2687c7..edad4602 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterManagerService.java @@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.*; /** - * Worker 管理服务 + * 管理 worker 集群信息 * * @author tjq * @since 2020/4/5 @@ -32,25 +32,19 @@ public class WorkerClusterManagerService { } /** - * 获取有序的当前所有可用的Worker地址(按得分高低排序,排在前面的健康度更高) + * 获取某个 app 下所有连接过的机器信息 + * @param appId appId + * @return 所有连接过的机器信息列表 */ - public static List getSortedAvailableWorkers(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) { + public static List getWorkerInfosByAppId(Long appId) { ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); if (clusterStatusHolder == null) { log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); return Collections.emptyList(); } - return clusterStatusHolder.getSortedAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace); + return clusterStatusHolder.getAllWorkers(); } - public static List getAvailableWorkers(Long appId, double minCPUCores, double minMemorySpace, double minDiskSpace) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - log.warn("[WorkerManagerService] can't find any worker for app(appId={}) yet.", appId); - return Collections.emptyList(); - } - return clusterStatusHolder.getAvailableWorkers(minCPUCores, minMemorySpace, minDiskSpace); - } public static Optional getWorkerInfo(Long appId, String address) { ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); @@ -70,19 +64,6 @@ public class WorkerClusterManagerService { appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey())); } - /** - * 获取某个应用下的Worker集群状态描述 - * @param appId 应用ID - * @return 集群状态描述信息 - */ - public static String getWorkerClusterStatusDescription(Long appId) { - ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId); - if (clusterStatusHolder == null) { - return "CAN'T_FIND_ANY_WORKER"; - } - return clusterStatusHolder.getClusterDescription(); - } - /** * 获取当前连接到该Server的Worker信息 * @param appId 应用ID diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java new file mode 100644 index 00000000..8b9ad82f --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerClusterQueryService.java @@ -0,0 +1,55 @@ +package com.github.kfcfans.powerjob.server.remote.worker.cluster; + +import com.github.kfcfans.powerjob.server.extension.WorkerFilter; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * 获取 worker 集群信息 + * + * @author tjq + * @since 2021/2/19 + */ +@Service +public class WorkerClusterQueryService { + + private List workerFilters; + + @Autowired + public WorkerClusterQueryService(List workerFilters) { + this.workerFilters = workerFilters; + } + + /** + * get worker for job + * @param jobInfo job + * @return worker cluster info, sorted by metrics desc + */ + public List getSuitableWorkers(JobInfoDO jobInfo) { + + List workers = WorkerClusterManagerService.getWorkerInfosByAppId(jobInfo.getAppId()); + + workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); + + // 按健康度排序 + workers.sort((o1, o2) -> o2 .getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore()); + + // 限定集群大小(0代表不限制) + if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) { + workers = workers.subList(0, jobInfo.getMaxWorkerCount()); + } + return workers; + } + + private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { + for (WorkerFilter filter : workerFilters) { + if (filter.filter(workerInfo, jobInfo)) { + return true; + } + } + return false; + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerInfo.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerInfo.java index 7d4b069c..cd74b3a9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerInfo.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/WorkerInfo.java @@ -28,6 +28,8 @@ public class WorkerInfo { private List containerInfos; + private static final long WORKER_TIMEOUT_MS = 60000; + public void refresh(WorkerHeartbeat workerHeartbeat) { address = workerHeartbeat.getWorkerAddress(); lastActiveTime = workerHeartbeat.getHeartbeatTime(); @@ -36,4 +38,9 @@ public class WorkerInfo { systemMetrics = workerHeartbeat.getSystemMetrics(); containerInfos = workerHeartbeat.getContainerInfos(); } + + public boolean timeout() { + long timeout = System.currentTimeMillis() - lastActiveTime; + return timeout > WORKER_TIMEOUT_MS; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DesignatedWorkerFilter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DesignatedWorkerFilter.java new file mode 100644 index 00000000..d424dc92 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DesignatedWorkerFilter.java @@ -0,0 +1,37 @@ +package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter; + +import com.github.kfcfans.powerjob.server.common.SJ; +import com.github.kfcfans.powerjob.server.extension.WorkerFilter; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.util.Set; + +/** + * just use designated worker + * + * @author tjq + * @since 2021/2/19 + */ +@Component +public class DesignatedWorkerFilter implements WorkerFilter { + + // min length 1.1.1.1:1 + private static final int MIN_IP_LENGTH = 9; + + @Override + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO) { + + String designatedWorkers = jobInfoDO.getDesignatedWorkers(); + + if (StringUtils.isEmpty(designatedWorkers) || designatedWorkers.length() < MIN_IP_LENGTH) { + return false; + } + + Set designatedWorkersSet = Sets.newHashSet(SJ.commaSplitter.splitToList(designatedWorkers)); + return !designatedWorkersSet.contains(workerInfo.getAddress()); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DisconnectedWorkerFilter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DisconnectedWorkerFilter.java new file mode 100644 index 00000000..a56152f2 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/DisconnectedWorkerFilter.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter; + +import com.github.kfcfans.powerjob.server.extension.WorkerFilter; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; +import org.springframework.stereotype.Component; + +/** + * filter disconnected worker + * + * @author tjq + * @since 2021/2/19 + */ +@Component +public class DisconnectedWorkerFilter implements WorkerFilter { + + @Override + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO) { + return workerInfo.timeout(); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/SystemMetricsWorkerFilter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/SystemMetricsWorkerFilter.java new file mode 100644 index 00000000..0a40973b --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/cluster/filter/SystemMetricsWorkerFilter.java @@ -0,0 +1,23 @@ +package com.github.kfcfans.powerjob.server.remote.worker.cluster.filter; + +import com.github.kfcfans.powerjob.common.model.SystemMetrics; +import com.github.kfcfans.powerjob.server.extension.WorkerFilter; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; +import org.springframework.stereotype.Component; + +/** + * filter worker by system metric + * + * @author tjq + * @since 2021/2/19 + */ +@Component +public class SystemMetricsWorkerFilter implements WorkerFilter { + + @Override + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { + SystemMetrics metrics = workerInfo.getSystemMetrics(); + return !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java index d554d98a..0a8e4659 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/worker/handler/WorkerRequestHandler.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.server.remote.worker.handler; import com.github.kfcfans.powerjob.common.InstanceStatus; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.request.*; import com.github.kfcfans.powerjob.common.response.AskResponse; @@ -49,6 +50,9 @@ public class WorkerRequestHandler { @Resource private ContainerInfoRepository containerInfoRepository; + @Resource + private WorkerClusterQueryService workerClusterQueryService; + private static WorkerRequestHandler workerRequestHandler; /** @@ -134,7 +138,7 @@ public class WorkerRequestHandler { if (!jobInfo.getAppId().equals(appId)) { askResponse = AskResponse.failed("Permission Denied!"); }else { - List sortedAvailableWorker = WorkerClusterManagerService.getAvailableWorkers(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()) + List sortedAvailableWorker = workerClusterQueryService.getSuitableWorkers(jobInfo) .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); askResponse = AskResponse.succeed(sortedAvailableWorker); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index 23722ab5..86be4bac 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 5da90e35..e2ed4dde 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; -import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.UserService; import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index 32e86989..a6064873 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -15,7 +15,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; -import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import com.github.kfcfans.powerjob.server.remote.transport.TransportService; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index aad8508d..e96ead42 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.remote.transport.starter.AkkaStarter; import com.github.kfcfans.powerjob.server.persistence.core.model.*; import com.github.kfcfans.powerjob.server.persistence.core.repository.*; -import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; import com.google.common.base.Stopwatch; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index ddac25c3..eae4f403 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRep import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; -import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.service.JobService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java index 20c8f6f9..8d4a5fd0 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceManager.java @@ -13,7 +13,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRep import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository; -import com.github.kfcfans.powerjob.server.service.DispatchService; +import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.service.UserService; import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter; import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java index 32d75c4b..99816d4a 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/BasicProcessor.java @@ -13,8 +13,7 @@ public interface BasicProcessor { /** * 核心处理逻辑 - * 可通过 {@link TaskContext#fetchWorkflowContext} 获取工作流上下文 - * 可通过 {@link TaskContext#appendData2WfContext} 向工作流上下文中添加数据 + * 可通过 {@link TaskContext#workflowContext} 获取工作流上下文 * * @param context 任务上下文,可通过 jobParams 和 instanceParams 分别获取控制台参数和OpenAPI传递的任务实例参数 * @return 处理结果,msg有长度限制,超长会被裁剪,不允许返回 null