diff --git a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java index 0df519b1..05591927 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java @@ -1,5 +1,6 @@ package tech.powerjob.client; +import tech.powerjob.common.request.http.RunJobOpenApiRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; @@ -38,6 +39,8 @@ public interface IPowerJobClient { ResultDTO runJob(Long jobId, String instanceParams, long delayMS); + PowerResultDTO runJob(RunJobOpenApiRequest request); + /* ************* Instance API list ************* */ ResultDTO stopInstance(Long instanceId); diff --git a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java index f2e668ce..cf638f02 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java @@ -14,6 +14,7 @@ import tech.powerjob.common.OpenAPIConstant; import tech.powerjob.common.enums.EncryptType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.http.RunJobOpenApiRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; @@ -237,21 +238,28 @@ public class PowerJobClient implements IPowerJobClient, Closeable { */ @Override public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) { - - Map param = Maps.newHashMap(); - param.put("jobId", jobId.toString()); - param.put("appId", appId.toString()); - param.put("delay", String.valueOf(delayMS)); - - if (StringUtils.isNotEmpty(instanceParams)) { - param.put("instanceParams", instanceParams); - } - String post = requestService.request(OpenAPIConstant.RUN_JOB, PowerRequestBody.newFormRequestBody(param)); - return JSON.parseObject(post, LONG_RESULT_TYPE); + RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest(); + runJobOpenApiRequest.setJobId(jobId); + runJobOpenApiRequest.setInstanceParams(instanceParams); + runJobOpenApiRequest.setDelay(delayMS); + return runJob(runJobOpenApiRequest); } public ResultDTO runJob(Long jobId) { - return runJob(jobId, null, 0); + RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest(); + runJobOpenApiRequest.setJobId(jobId); + return runJob(runJobOpenApiRequest); + } + + @Override + public PowerResultDTO runJob(RunJobOpenApiRequest request) { + + request.setAppId(appId); + + // 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg + String json = JsonUtils.toJSONStringUnsafe(request); + String post = requestService.request(OpenAPIConstant.RUN_JOB_PLUS, PowerRequestBody.newJsonRequestBody(json)); + return JSON.parseObject(post, LONG_POWER_RESULT_TYPE); } /* ************* Instance API list ************* */ diff --git a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java index c3c23406..eee3c069 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java @@ -22,6 +22,8 @@ public class TypeStore { public static final TypeReference> LONG_RESULT_TYPE = new TypeReference>(){}; + public static final TypeReference> LONG_POWER_RESULT_TYPE = new TypeReference>(){}; + public static final TypeReference> JOB_RESULT_TYPE = new TypeReference>(){}; public static final TypeReference> SAVE_JOB_INFO_REQUEST_RESULT_TYPE = new TypeReference>(){}; diff --git a/powerjob-common/src/main/java/tech/powerjob/common/OpenAPIConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/OpenAPIConstant.java index 2d7d8f55..ccb36923 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/OpenAPIConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/OpenAPIConstant.java @@ -32,6 +32,8 @@ public class OpenAPIConstant { public static final String DELETE_JOB = "/deleteJob"; public static final String RUN_JOB = "/runJob"; + public static final String RUN_JOB_PLUS = "/runJobPlus"; + /* ************* Instance 区 ************* */ public static final String STOP_INSTANCE = "/stopInstance"; diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/WorkerQueryExecutorClusterReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/WorkerQueryExecutorClusterReq.java index a1af21e6..1aa254dd 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/WorkerQueryExecutorClusterReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/WorkerQueryExecutorClusterReq.java @@ -17,4 +17,6 @@ import lombok.NoArgsConstructor; public class WorkerQueryExecutorClusterReq implements PowerSerializable { private Long appId; private Long jobId; + + private Long instanceId; } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/common/RunJobRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/common/RunJobRequest.java new file mode 100644 index 00000000..3103999c --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/common/RunJobRequest.java @@ -0,0 +1,33 @@ +package tech.powerjob.common.request.common; + +import lombok.Data; + +/** + * 运行任务 + * + * @author tjq + * @since 2024/8/23 + */ +@Data +public class RunJobRequest { + + /** + * 目标任务ID + */ + private Long jobId; + + /** + * 任务实例参数 + */ + private String instanceParams; + + /** + * 延迟执行时间 + */ + private Long delay; + + /** + * 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割) + */ + private String designatedWorkers; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/RunJobOpenApiRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/RunJobOpenApiRequest.java new file mode 100644 index 00000000..a97eb1d8 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/RunJobOpenApiRequest.java @@ -0,0 +1,20 @@ +package tech.powerjob.common.request.http; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import tech.powerjob.common.request.common.RunJobRequest; + +/** + * RunJobOpenApiRequest + * + * @author tjq + * @since 2024/8/23 + */ +@Data +@EqualsAndHashCode(callSuper = true) +public class RunJobOpenApiRequest extends RunJobRequest { + /** + * 自动填充 appId + */ + private Long appId; +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index be565503..b76f3d68 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -148,7 +148,7 @@ public class DispatchService { } // 获取当前最合适的 worker 列表 - List suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo); + List suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo, instanceInfo); if (CollectionUtils.isEmpty(suitableWorkers)) { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java index 595f135d..bfb03967 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/AbWorkerRequestHandler.java @@ -6,13 +6,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; import tech.powerjob.common.enums.InstanceStatus; +import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.NetUtils; import tech.powerjob.remote.framework.actor.Handler; import tech.powerjob.remote.framework.actor.ProcessType; -import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.utils.SpringUtils; import tech.powerjob.server.monitor.MonitorService; @@ -20,8 +20,10 @@ import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent; import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent; import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent; import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; @@ -129,18 +131,26 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler { JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); Optional jobInfoOpt = jobInfoRepository.findById(jobId); - if (jobInfoOpt.isPresent()) { - JobInfoDO jobInfo = jobInfoOpt.get(); - if (!jobInfo.getAppId().equals(appId)) { - askResponse = AskResponse.failed("Permission Denied!"); - }else { - List sortedAvailableWorker = workerClusterQueryService.geAvailableWorkers(jobInfo) - .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); - askResponse = AskResponse.succeed(sortedAvailableWorker); - } - }else { - askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); + + if (!jobInfoOpt.isPresent()) { + return AskResponse.failed("can't find jobInfo by jobId: " + jobId); } + + InstanceInfoRepository instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class); + InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(req.getInstanceId()); + if (instanceInfoDO == null) { + return AskResponse.failed("can't find instanceInfo by instanceId: " + req.getInstanceId()); + } + + JobInfoDO jobInfo = jobInfoOpt.get(); + if (!jobInfo.getAppId().equals(appId)) { + askResponse = AskResponse.failed("Permission Denied!"); + }else { + List sortedAvailableWorker = workerClusterQueryService.geAvailableWorkers(jobInfo, instanceInfoDO) + .stream().map(WorkerInfo::getAddress).collect(Collectors.toList()); + askResponse = AskResponse.succeed(sortedAvailableWorker); + } + return askResponse; } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 2f09a052..68c3b3de 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -80,7 +80,7 @@ public class InstanceService { * @param expectTriggerTime 预期执行时间 * @return 任务实例ID */ - public InstanceInfoDO create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) { + public InstanceInfoDO create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime, String designatedWorkers) { Long instanceId = idGenerateService.allocate(); Date now = new Date(); @@ -93,6 +93,7 @@ public class InstanceService { newInstanceInfo.setInstanceParams(instanceParams); newInstanceInfo.setType(wfInstanceId == null ? InstanceType.NORMAL.getV() : InstanceType.WORKFLOW.getV()); newInstanceInfo.setWfInstanceId(wfInstanceId); + newInstanceInfo.setDesignatedWorkers(designatedWorkers); newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); newInstanceInfo.setRunningTimes(0L); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 7ab429d6..eee67409 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -12,6 +12,7 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.enums.SwitchableStatus; +import tech.powerjob.common.request.common.RunJobRequest; import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService; import tech.powerjob.server.core.DispatchService; import tech.powerjob.server.core.instance.InstanceService; @@ -167,7 +168,7 @@ public class PowerScheduleService { log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos); jobInfos.forEach(jobInfo -> { - Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId(); + Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime(), jobInfo.getDesignatedWorkers()).getInstanceId(); jobId2InstanceId.put(jobInfo.getId(), instanceId); }); instanceInfoRepository.flush(); @@ -276,7 +277,10 @@ public class PowerScheduleService { log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId()); } else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) { log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId()); - jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis()); + RunJobRequest runJobRequest = new RunJobRequest(); + runJobRequest.setJobId(jobId); + runJobRequest.setDelay(Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis()); + jobService.runJob(jobInfoDO.getAppId(), runJobRequest); } }); }); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index 1ad88b8d..63d9e41f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -1,6 +1,7 @@ package tech.powerjob.server.core.service; import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.request.common.RunJobRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.response.JobInfoDTO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; @@ -25,7 +26,7 @@ public interface JobService { List queryJob(PowerQuery powerQuery); - long runJob(Long appId, Long jobId, String instanceParams, Long delay); + long runJob(Long appId, RunJobRequest runJobRequest); void deleteJob(Long jobId); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java index 550c7f68..eb07b0aa 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/impl/job/JobServiceImpl.java @@ -14,6 +14,7 @@ import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.AlarmConfig; import tech.powerjob.common.model.LifeCycle; +import tech.powerjob.common.request.common.RunJobRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.response.JobInfoDTO; import tech.powerjob.common.serialize.JsonUtils; @@ -170,33 +171,34 @@ public class JobServiceImpl implements JobService { } /** - * 手动立即运行某个任务 - * - * @param jobId 任务ID - * @param instanceParams 任务实例参数(仅 OpenAPI 存在) - * @param delay 延迟时间,单位 毫秒 + * 立即运行某个任务 + * @param appId appId,用于集群路由 + * @param runJobRequest 请求 * @return 任务实例ID */ @Override @DesignateServer - public long runJob(Long appId, Long jobId, String instanceParams, Long delay) { + public long runJob(Long appId, RunJobRequest runJobRequest) { + + Long jobId = runJobRequest.getJobId(); + long delay = runJobRequest.getDelay() == null ? 0 : runJobRequest.getDelay(); - delay = delay == null ? 0 : delay; JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); - log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay); - final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); + log.info("[Job-{}] try to run job in app[{}], runJobRequest: {}", jobInfo.getId(), appId, runJobRequest); + + String designatedWorkers = Optional.ofNullable(runJobRequest.getDesignatedWorkers()).orElse(jobInfo.getDesignatedWorkers()); + final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), runJobRequest.getInstanceParams(), null, System.currentTimeMillis() + Math.max(delay, 0), designatedWorkers); instanceInfoRepository.flush(); if (delay <= 0) { dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty()); } else { InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty())); } - log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams); + log.info("[Job-{}|{}] execute 'runJob' successfully", jobInfo.getId(), instanceInfo.getInstanceId()); return instanceInfo.getInstanceId(); } - /** * 删除某个任务 * diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java index a3fc30ed..d0ac3cab 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/workflow/hanlder/impl/JobNodeHandler.java @@ -30,8 +30,9 @@ public class JobNodeHandler implements TaskNodeHandler { @Override public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) { + JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new); // instanceParam 传递的是工作流实例的 wfContext - Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()).getInstanceId(); + Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis(), jobInfo.getDesignatedWorkers()).getInstanceId(); node.setInstanceId(instanceId); node.setStatus(InstanceStatus.RUNNING.getV()); log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId); diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/InstanceInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/InstanceInfoDO.java index c74e6546..c8db26eb 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/InstanceInfoDO.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/InstanceInfoDO.java @@ -104,4 +104,12 @@ public class InstanceInfoDO { private Date gmtModified; + /** + * 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割) + */ + private String designatedWorkers; + /** + * 扩展参数 + */ + private String extra; } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java index 9364a5a1..6d2069b4 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/WorkerClusterQueryService.java @@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import tech.powerjob.common.model.DeployedContainerInfo; import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.remote.server.redirector.DesignateServer; import tech.powerjob.server.remote.worker.filter.WorkerFilter; @@ -34,14 +35,15 @@ public class WorkerClusterQueryService { * get worker for job * * @param jobInfo job + * @param instanceInfo instanceInfo * @return worker cluster info, sorted by metrics desc */ - public List geAvailableWorkers(JobInfoDO jobInfo) { + public List geAvailableWorkers(JobInfoDO jobInfo, InstanceInfoDO instanceInfo) { List workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values()); // 过滤不符合要求的机器 - workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo)); + workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo, instanceInfo)); // 限定集群大小(0代表不限制) if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) { @@ -120,11 +122,12 @@ public class WorkerClusterQueryService { * * @param workerInfo worker info * @param jobInfo job info + * @param instanceInfoDO instanceInfo * @return filter this worker when return true */ - private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) { + private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) { for (WorkerFilter filter : workerFilters) { - if (filter.filter(workerInfo, jobInfo)) { + if (filter.filter(workerInfo, jobInfo, instanceInfoDO)) { return true; } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java index c03a9f9f..94b3b102 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DesignatedWorkerFilter.java @@ -6,8 +6,10 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import tech.powerjob.server.common.SJ; import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; +import java.util.Optional; import java.util.Set; /** @@ -21,9 +23,10 @@ import java.util.Set; public class DesignatedWorkerFilter implements WorkerFilter { @Override - public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) { - String designatedWorkers = jobInfo.getDesignatedWorkers(); + // 优先取 instance 上的指定运行时配置 + String designatedWorkers = Optional.ofNullable(instanceInfoDO.getDesignatedWorkers()).orElse(jobInfo.getDesignatedWorkers()); // no worker is specified, no filter of any if (StringUtils.isEmpty(designatedWorkers)) { diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java index 5cdfb9a8..f46cca1d 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/DisconnectedWorkerFilter.java @@ -1,5 +1,6 @@ package tech.powerjob.server.remote.worker.filter; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; import lombok.extern.slf4j.Slf4j; @@ -16,7 +17,7 @@ import org.springframework.stereotype.Component; public class DisconnectedWorkerFilter implements WorkerFilter { @Override - public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) { boolean timeout = workerInfo.timeout(); if (timeout) { log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime()); diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java index 2270189a..631e9c52 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/SystemMetricsWorkerFilter.java @@ -1,6 +1,7 @@ package tech.powerjob.server.remote.worker.filter; import tech.powerjob.common.model.SystemMetrics; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; import lombok.extern.slf4j.Slf4j; @@ -17,7 +18,7 @@ import org.springframework.stereotype.Component; public class SystemMetricsWorkerFilter implements WorkerFilter { @Override - public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) { + public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) { SystemMetrics metrics = workerInfo.getSystemMetrics(); boolean filter = !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); if (filter) { diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java index 35e41586..6b4c6655 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/worker/filter/WorkerFilter.java @@ -1,5 +1,6 @@ package tech.powerjob.server.remote.worker.filter; +import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.common.module.WorkerInfo; @@ -17,5 +18,5 @@ public interface WorkerFilter { * @param jobInfoDO job info * @return true will remove the worker in process list */ - boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO); + boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO); } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java index 267c4355..57c8d659 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java @@ -11,6 +11,7 @@ import tech.powerjob.common.PowerQuery; import tech.powerjob.common.enums.ErrorCodes; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.http.RunJobOpenApiRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; @@ -146,10 +147,25 @@ public class OpenAPIController { return ResultDTO.success(null); } + @Deprecated @PostMapping(OpenAPIConstant.RUN_JOB) public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) { checkJobIdValid(jobId, appId); - return ResultDTO.success(jobService.runJob(appId, jobId, instanceParams, delay)); + + RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest(); + runJobOpenApiRequest.setAppId(appId); + runJobOpenApiRequest.setJobId(jobId); + runJobOpenApiRequest.setInstanceParams(instanceParams); + runJobOpenApiRequest.setDelay(delay); + + return ResultDTO.success(jobService.runJob(appId, runJobOpenApiRequest)); + } + + @PostMapping(OpenAPIConstant.RUN_JOB_PLUS) + public PowerResultDTO runJob(@RequestBody RunJobOpenApiRequest runJobOpenApiRequest) { + checkJobIdValid(runJobOpenApiRequest.getJobId(), runJobOpenApiRequest.getAppId()); + long instanceId = jobService.runJob(runJobOpenApiRequest.getAppId(), runJobOpenApiRequest); + return PowerResultDTO.s(instanceId); } /* ************* Instance 区 ************* */ diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java index 74431895..2b26a836 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/JobController.java @@ -1,6 +1,7 @@ package tech.powerjob.server.web.controller; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.request.common.RunJobRequest; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.server.auth.Permission; @@ -75,13 +76,16 @@ public class JobController { } @GetMapping("/run") - @ApiPermission(name = "Job-Copy", roleScope = RoleScope.APP, requiredPermission = Permission.OPS) + @ApiPermission(name = "Job-Run", roleScope = RoleScope.APP, requiredPermission = Permission.OPS) public ResultDTO runImmediately(String appId, String jobId, @RequestParam(required = false) String instanceParams) { - return ResultDTO.success(jobService.runJob(Long.valueOf(appId), Long.valueOf(jobId), instanceParams, 0L)); + RunJobRequest runJobRequest = new RunJobRequest(); + runJobRequest.setJobId(Long.valueOf(jobId)); + runJobRequest.setInstanceParams(instanceParams); + return ResultDTO.success(jobService.runJob(Long.valueOf(appId), runJobRequest)); } @PostMapping("/list") - @ApiPermission(name = "Job-Copy", roleScope = RoleScope.APP, requiredPermission = Permission.READ) + @ApiPermission(name = "Job-List", roleScope = RoleScope.APP, requiredPermission = Permission.READ) public ResultDTO> listJobs(@RequestBody QueryJobInfoRequest request) { Sort sort = Sort.by(Sort.Direction.ASC, "id"); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 443cbfd1..d39bf565 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -549,7 +549,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { } try { - WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId()); + WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId(), instanceId); AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter()); if (!response.isSuccess()) { log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());