mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
feat: runJob support dynamic designatedWorkers #978
This commit is contained in:
parent
cbeabd20c9
commit
155e471939
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.client;
|
||||
|
||||
import tech.powerjob.common.request.http.RunJobOpenApiRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
@ -38,6 +39,8 @@ public interface IPowerJobClient {
|
||||
|
||||
ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS);
|
||||
|
||||
PowerResultDTO<Long> runJob(RunJobOpenApiRequest request);
|
||||
|
||||
/* ************* Instance API list ************* */
|
||||
|
||||
ResultDTO<Void> stopInstance(Long instanceId);
|
||||
|
@ -14,6 +14,7 @@ import tech.powerjob.common.OpenAPIConstant;
|
||||
import tech.powerjob.common.enums.EncryptType;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.request.http.RunJobOpenApiRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
@ -237,21 +238,28 @@ public class PowerJobClient implements IPowerJobClient, Closeable {
|
||||
*/
|
||||
@Override
|
||||
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) {
|
||||
|
||||
Map<String, String> param = Maps.newHashMap();
|
||||
param.put("jobId", jobId.toString());
|
||||
param.put("appId", appId.toString());
|
||||
param.put("delay", String.valueOf(delayMS));
|
||||
|
||||
if (StringUtils.isNotEmpty(instanceParams)) {
|
||||
param.put("instanceParams", instanceParams);
|
||||
}
|
||||
String post = requestService.request(OpenAPIConstant.RUN_JOB, PowerRequestBody.newFormRequestBody(param));
|
||||
return JSON.parseObject(post, LONG_RESULT_TYPE);
|
||||
RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest();
|
||||
runJobOpenApiRequest.setJobId(jobId);
|
||||
runJobOpenApiRequest.setInstanceParams(instanceParams);
|
||||
runJobOpenApiRequest.setDelay(delayMS);
|
||||
return runJob(runJobOpenApiRequest);
|
||||
}
|
||||
|
||||
public ResultDTO<Long> runJob(Long jobId) {
|
||||
return runJob(jobId, null, 0);
|
||||
RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest();
|
||||
runJobOpenApiRequest.setJobId(jobId);
|
||||
return runJob(runJobOpenApiRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PowerResultDTO<Long> runJob(RunJobOpenApiRequest request) {
|
||||
|
||||
request.setAppId(appId);
|
||||
|
||||
// 中坑记录:用 FastJSON 序列化会导致 Server 接收时 pEWorkflowDAG 为 null,无语.jpg
|
||||
String json = JsonUtils.toJSONStringUnsafe(request);
|
||||
String post = requestService.request(OpenAPIConstant.RUN_JOB_PLUS, PowerRequestBody.newJsonRequestBody(json));
|
||||
return JSON.parseObject(post, LONG_POWER_RESULT_TYPE);
|
||||
}
|
||||
|
||||
/* ************* Instance API list ************* */
|
||||
|
@ -22,6 +22,8 @@ public class TypeStore {
|
||||
|
||||
public static final TypeReference<ResultDTO<Long>> LONG_RESULT_TYPE = new TypeReference<ResultDTO<Long>>(){};
|
||||
|
||||
public static final TypeReference<PowerResultDTO<Long>> LONG_POWER_RESULT_TYPE = new TypeReference<PowerResultDTO<Long>>(){};
|
||||
|
||||
public static final TypeReference<ResultDTO<JobInfoDTO>> JOB_RESULT_TYPE = new TypeReference<ResultDTO<JobInfoDTO>>(){};
|
||||
|
||||
public static final TypeReference<ResultDTO<SaveJobInfoRequest>> SAVE_JOB_INFO_REQUEST_RESULT_TYPE = new TypeReference<ResultDTO<SaveJobInfoRequest>>(){};
|
||||
|
@ -32,6 +32,8 @@ public class OpenAPIConstant {
|
||||
public static final String DELETE_JOB = "/deleteJob";
|
||||
public static final String RUN_JOB = "/runJob";
|
||||
|
||||
public static final String RUN_JOB_PLUS = "/runJobPlus";
|
||||
|
||||
/* ************* Instance 区 ************* */
|
||||
|
||||
public static final String STOP_INSTANCE = "/stopInstance";
|
||||
|
@ -17,4 +17,6 @@ import lombok.NoArgsConstructor;
|
||||
public class WorkerQueryExecutorClusterReq implements PowerSerializable {
|
||||
private Long appId;
|
||||
private Long jobId;
|
||||
|
||||
private Long instanceId;
|
||||
}
|
||||
|
@ -0,0 +1,33 @@
|
||||
package tech.powerjob.common.request.common;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* 运行任务
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/8/23
|
||||
*/
|
||||
@Data
|
||||
public class RunJobRequest {
|
||||
|
||||
/**
|
||||
* 目标任务ID
|
||||
*/
|
||||
private Long jobId;
|
||||
|
||||
/**
|
||||
* 任务实例参数
|
||||
*/
|
||||
private String instanceParams;
|
||||
|
||||
/**
|
||||
* 延迟执行时间
|
||||
*/
|
||||
private Long delay;
|
||||
|
||||
/**
|
||||
* 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割)
|
||||
*/
|
||||
private String designatedWorkers;
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package tech.powerjob.common.request.http;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import tech.powerjob.common.request.common.RunJobRequest;
|
||||
|
||||
/**
|
||||
* RunJobOpenApiRequest
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2024/8/23
|
||||
*/
|
||||
@Data
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class RunJobOpenApiRequest extends RunJobRequest {
|
||||
/**
|
||||
* 自动填充 appId
|
||||
*/
|
||||
private Long appId;
|
||||
}
|
@ -148,7 +148,7 @@ public class DispatchService {
|
||||
}
|
||||
|
||||
// 获取当前最合适的 worker 列表
|
||||
List<WorkerInfo> suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo);
|
||||
List<WorkerInfo> suitableWorkers = workerClusterQueryService.geAvailableWorkers(jobInfo, instanceInfo);
|
||||
|
||||
if (CollectionUtils.isEmpty(suitableWorkers)) {
|
||||
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available", jobId, instanceId);
|
||||
|
@ -6,13 +6,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.core.env.Environment;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.SwitchableStatus;
|
||||
import tech.powerjob.common.request.*;
|
||||
import tech.powerjob.common.response.AskResponse;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.NetUtils;
|
||||
import tech.powerjob.remote.framework.actor.Handler;
|
||||
import tech.powerjob.remote.framework.actor.ProcessType;
|
||||
import tech.powerjob.common.enums.SwitchableStatus;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.common.utils.SpringUtils;
|
||||
import tech.powerjob.server.monitor.MonitorService;
|
||||
@ -20,8 +20,10 @@ import tech.powerjob.server.monitor.events.w2s.TtReportInstanceStatusEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerHeartbeatEvent;
|
||||
import tech.powerjob.server.monitor.events.w2s.WorkerLogReportEvent;
|
||||
import tech.powerjob.server.persistence.remote.model.ContainerInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository;
|
||||
import tech.powerjob.server.persistence.remote.repository.JobInfoRepository;
|
||||
import tech.powerjob.server.remote.worker.WorkerClusterQueryService;
|
||||
|
||||
@ -129,18 +131,26 @@ public abstract class AbWorkerRequestHandler implements IWorkerRequestHandler {
|
||||
|
||||
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
|
||||
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
|
||||
if (jobInfoOpt.isPresent()) {
|
||||
|
||||
if (!jobInfoOpt.isPresent()) {
|
||||
return AskResponse.failed("can't find jobInfo by jobId: " + jobId);
|
||||
}
|
||||
|
||||
InstanceInfoRepository instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class);
|
||||
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(req.getInstanceId());
|
||||
if (instanceInfoDO == null) {
|
||||
return AskResponse.failed("can't find instanceInfo by instanceId: " + req.getInstanceId());
|
||||
}
|
||||
|
||||
JobInfoDO jobInfo = jobInfoOpt.get();
|
||||
if (!jobInfo.getAppId().equals(appId)) {
|
||||
askResponse = AskResponse.failed("Permission Denied!");
|
||||
}else {
|
||||
List<String> sortedAvailableWorker = workerClusterQueryService.geAvailableWorkers(jobInfo)
|
||||
List<String> sortedAvailableWorker = workerClusterQueryService.geAvailableWorkers(jobInfo, instanceInfoDO)
|
||||
.stream().map(WorkerInfo::getAddress).collect(Collectors.toList());
|
||||
askResponse = AskResponse.succeed(sortedAvailableWorker);
|
||||
}
|
||||
}else {
|
||||
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
|
||||
}
|
||||
|
||||
return askResponse;
|
||||
}
|
||||
|
||||
|
@ -80,7 +80,7 @@ public class InstanceService {
|
||||
* @param expectTriggerTime 预期执行时间
|
||||
* @return 任务实例ID
|
||||
*/
|
||||
public InstanceInfoDO create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime) {
|
||||
public InstanceInfoDO create(Long jobId, Long appId, String jobParams, String instanceParams, Long wfInstanceId, Long expectTriggerTime, String designatedWorkers) {
|
||||
|
||||
Long instanceId = idGenerateService.allocate();
|
||||
Date now = new Date();
|
||||
@ -93,6 +93,7 @@ public class InstanceService {
|
||||
newInstanceInfo.setInstanceParams(instanceParams);
|
||||
newInstanceInfo.setType(wfInstanceId == null ? InstanceType.NORMAL.getV() : InstanceType.WORKFLOW.getV());
|
||||
newInstanceInfo.setWfInstanceId(wfInstanceId);
|
||||
newInstanceInfo.setDesignatedWorkers(designatedWorkers);
|
||||
|
||||
newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
|
||||
newInstanceInfo.setRunningTimes(0L);
|
||||
|
@ -12,6 +12,7 @@ import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.LifeCycle;
|
||||
import tech.powerjob.common.enums.SwitchableStatus;
|
||||
import tech.powerjob.common.request.common.RunJobRequest;
|
||||
import tech.powerjob.server.common.timewheel.holder.InstanceTimeWheelService;
|
||||
import tech.powerjob.server.core.DispatchService;
|
||||
import tech.powerjob.server.core.instance.InstanceService;
|
||||
@ -167,7 +168,7 @@ public class PowerScheduleService {
|
||||
log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);
|
||||
|
||||
jobInfos.forEach(jobInfo -> {
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime(), jobInfo.getDesignatedWorkers()).getInstanceId();
|
||||
jobId2InstanceId.put(jobInfo.getId(), instanceId);
|
||||
});
|
||||
instanceInfoRepository.flush();
|
||||
@ -276,7 +277,10 @@ public class PowerScheduleService {
|
||||
log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());
|
||||
} else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {
|
||||
log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());
|
||||
jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
|
||||
RunJobRequest runJobRequest = new RunJobRequest();
|
||||
runJobRequest.setJobId(jobId);
|
||||
runJobRequest.setDelay(Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
|
||||
jobService.runJob(jobInfoDO.getAppId(), runJobRequest);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.core.service;
|
||||
|
||||
import tech.powerjob.common.PowerQuery;
|
||||
import tech.powerjob.common.request.common.RunJobRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.response.JobInfoDTO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
@ -25,7 +26,7 @@ public interface JobService {
|
||||
|
||||
List<JobInfoDTO> queryJob(PowerQuery powerQuery);
|
||||
|
||||
long runJob(Long appId, Long jobId, String instanceParams, Long delay);
|
||||
long runJob(Long appId, RunJobRequest runJobRequest);
|
||||
|
||||
void deleteJob(Long jobId);
|
||||
|
||||
|
@ -14,6 +14,7 @@ import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.model.AlarmConfig;
|
||||
import tech.powerjob.common.model.LifeCycle;
|
||||
import tech.powerjob.common.request.common.RunJobRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.response.JobInfoDTO;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
@ -170,33 +171,34 @@ public class JobServiceImpl implements JobService {
|
||||
}
|
||||
|
||||
/**
|
||||
* 手动立即运行某个任务
|
||||
*
|
||||
* @param jobId 任务ID
|
||||
* @param instanceParams 任务实例参数(仅 OpenAPI 存在)
|
||||
* @param delay 延迟时间,单位 毫秒
|
||||
* 立即运行某个任务
|
||||
* @param appId appId,用于集群路由
|
||||
* @param runJobRequest 请求
|
||||
* @return 任务实例ID
|
||||
*/
|
||||
@Override
|
||||
@DesignateServer
|
||||
public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {
|
||||
public long runJob(Long appId, RunJobRequest runJobRequest) {
|
||||
|
||||
Long jobId = runJobRequest.getJobId();
|
||||
long delay = runJobRequest.getDelay() == null ? 0 : runJobRequest.getDelay();
|
||||
|
||||
delay = delay == null ? 0 : delay;
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
|
||||
|
||||
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
|
||||
final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
|
||||
log.info("[Job-{}] try to run job in app[{}], runJobRequest: {}", jobInfo.getId(), appId, runJobRequest);
|
||||
|
||||
String designatedWorkers = Optional.ofNullable(runJobRequest.getDesignatedWorkers()).orElse(jobInfo.getDesignatedWorkers());
|
||||
final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), runJobRequest.getInstanceParams(), null, System.currentTimeMillis() + Math.max(delay, 0), designatedWorkers);
|
||||
instanceInfoRepository.flush();
|
||||
if (delay <= 0) {
|
||||
dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty());
|
||||
} else {
|
||||
InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty()));
|
||||
}
|
||||
log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams);
|
||||
log.info("[Job-{}|{}] execute 'runJob' successfully", jobInfo.getId(), instanceInfo.getInstanceId());
|
||||
return instanceInfo.getInstanceId();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除某个任务
|
||||
*
|
||||
|
@ -30,8 +30,9 @@ public class JobNodeHandler implements TaskNodeHandler {
|
||||
|
||||
@Override
|
||||
public void createTaskInstance(PEWorkflowDAG.Node node, PEWorkflowDAG dag, WorkflowInstanceInfoDO wfInstanceInfo) {
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseGet(JobInfoDO::new);
|
||||
// instanceParam 传递的是工作流实例的 wfContext
|
||||
Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis()).getInstanceId();
|
||||
Long instanceId = SpringUtils.getBean(InstanceService.class).create(node.getJobId(), wfInstanceInfo.getAppId(), node.getNodeParams(), wfInstanceInfo.getWfContext(), wfInstanceInfo.getWfInstanceId(), System.currentTimeMillis(), jobInfo.getDesignatedWorkers()).getInstanceId();
|
||||
node.setInstanceId(instanceId);
|
||||
node.setStatus(InstanceStatus.RUNNING.getV());
|
||||
log.info("[Workflow-{}|{}] create readyNode(JOB) instance(nodeId={},jobId={},instanceId={}) successfully~", wfInstanceInfo.getWorkflowId(), wfInstanceInfo.getWfInstanceId(), node.getNodeId(), node.getJobId(), instanceId);
|
||||
|
@ -104,4 +104,12 @@ public class InstanceInfoDO {
|
||||
|
||||
private Date gmtModified;
|
||||
|
||||
/**
|
||||
* 指定机器运行,空代表不限,非空则只会使用其中的机器运行(多值逗号分割)
|
||||
*/
|
||||
private String designatedWorkers;
|
||||
/**
|
||||
* 扩展参数
|
||||
*/
|
||||
private String extra;
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import tech.powerjob.common.model.DeployedContainerInfo;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.remote.server.redirector.DesignateServer;
|
||||
import tech.powerjob.server.remote.worker.filter.WorkerFilter;
|
||||
@ -34,14 +35,15 @@ public class WorkerClusterQueryService {
|
||||
* get worker for job
|
||||
*
|
||||
* @param jobInfo job
|
||||
* @param instanceInfo instanceInfo
|
||||
* @return worker cluster info, sorted by metrics desc
|
||||
*/
|
||||
public List<WorkerInfo> geAvailableWorkers(JobInfoDO jobInfo) {
|
||||
public List<WorkerInfo> geAvailableWorkers(JobInfoDO jobInfo, InstanceInfoDO instanceInfo) {
|
||||
|
||||
List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());
|
||||
|
||||
// 过滤不符合要求的机器
|
||||
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));
|
||||
workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo, instanceInfo));
|
||||
|
||||
// 限定集群大小(0代表不限制)
|
||||
if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
|
||||
@ -120,11 +122,12 @@ public class WorkerClusterQueryService {
|
||||
*
|
||||
* @param workerInfo worker info
|
||||
* @param jobInfo job info
|
||||
* @param instanceInfoDO instanceInfo
|
||||
* @return filter this worker when return true
|
||||
*/
|
||||
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
|
||||
private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) {
|
||||
for (WorkerFilter filter : workerFilters) {
|
||||
if (filter.filter(workerInfo, jobInfo)) {
|
||||
if (filter.filter(workerInfo, jobInfo, instanceInfoDO)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -6,8 +6,10 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.server.common.SJ;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -21,9 +23,10 @@ import java.util.Set;
|
||||
public class DesignatedWorkerFilter implements WorkerFilter {
|
||||
|
||||
@Override
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) {
|
||||
|
||||
String designatedWorkers = jobInfo.getDesignatedWorkers();
|
||||
// 优先取 instance 上的指定运行时配置
|
||||
String designatedWorkers = Optional.ofNullable(instanceInfoDO.getDesignatedWorkers()).orElse(jobInfo.getDesignatedWorkers());
|
||||
|
||||
// no worker is specified, no filter of any
|
||||
if (StringUtils.isEmpty(designatedWorkers)) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.remote.worker.filter;
|
||||
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -16,7 +17,7 @@ import org.springframework.stereotype.Component;
|
||||
public class DisconnectedWorkerFilter implements WorkerFilter {
|
||||
|
||||
@Override
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) {
|
||||
boolean timeout = workerInfo.timeout();
|
||||
if (timeout) {
|
||||
log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.remote.worker.filter;
|
||||
|
||||
import tech.powerjob.common.model.SystemMetrics;
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -17,7 +18,7 @@ import org.springframework.stereotype.Component;
|
||||
public class SystemMetricsWorkerFilter implements WorkerFilter {
|
||||
|
||||
@Override
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
|
||||
public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo, InstanceInfoDO instanceInfoDO) {
|
||||
SystemMetrics metrics = workerInfo.getSystemMetrics();
|
||||
boolean filter = !metrics.available(jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
|
||||
if (filter) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.server.remote.worker.filter;
|
||||
|
||||
import tech.powerjob.server.persistence.remote.model.InstanceInfoDO;
|
||||
import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import tech.powerjob.server.common.module.WorkerInfo;
|
||||
|
||||
@ -17,5 +18,5 @@ public interface WorkerFilter {
|
||||
* @param jobInfoDO job info
|
||||
* @return true will remove the worker in process list
|
||||
*/
|
||||
boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO);
|
||||
boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO, InstanceInfoDO instanceInfoDO);
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import tech.powerjob.common.PowerQuery;
|
||||
import tech.powerjob.common.enums.ErrorCodes;
|
||||
import tech.powerjob.common.enums.InstanceStatus;
|
||||
import tech.powerjob.common.exception.PowerJobException;
|
||||
import tech.powerjob.common.request.http.RunJobOpenApiRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowNodeRequest;
|
||||
import tech.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
@ -146,10 +147,25 @@ public class OpenAPIController {
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
@PostMapping(OpenAPIConstant.RUN_JOB)
|
||||
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) {
|
||||
checkJobIdValid(jobId, appId);
|
||||
return ResultDTO.success(jobService.runJob(appId, jobId, instanceParams, delay));
|
||||
|
||||
RunJobOpenApiRequest runJobOpenApiRequest = new RunJobOpenApiRequest();
|
||||
runJobOpenApiRequest.setAppId(appId);
|
||||
runJobOpenApiRequest.setJobId(jobId);
|
||||
runJobOpenApiRequest.setInstanceParams(instanceParams);
|
||||
runJobOpenApiRequest.setDelay(delay);
|
||||
|
||||
return ResultDTO.success(jobService.runJob(appId, runJobOpenApiRequest));
|
||||
}
|
||||
|
||||
@PostMapping(OpenAPIConstant.RUN_JOB_PLUS)
|
||||
public PowerResultDTO<Long> runJob(@RequestBody RunJobOpenApiRequest runJobOpenApiRequest) {
|
||||
checkJobIdValid(runJobOpenApiRequest.getJobId(), runJobOpenApiRequest.getAppId());
|
||||
long instanceId = jobService.runJob(runJobOpenApiRequest.getAppId(), runJobOpenApiRequest);
|
||||
return PowerResultDTO.s(instanceId);
|
||||
}
|
||||
|
||||
/* ************* Instance 区 ************* */
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.web.controller;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import tech.powerjob.common.request.common.RunJobRequest;
|
||||
import tech.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import tech.powerjob.common.response.ResultDTO;
|
||||
import tech.powerjob.server.auth.Permission;
|
||||
@ -75,13 +76,16 @@ public class JobController {
|
||||
}
|
||||
|
||||
@GetMapping("/run")
|
||||
@ApiPermission(name = "Job-Copy", roleScope = RoleScope.APP, requiredPermission = Permission.OPS)
|
||||
@ApiPermission(name = "Job-Run", roleScope = RoleScope.APP, requiredPermission = Permission.OPS)
|
||||
public ResultDTO<Long> runImmediately(String appId, String jobId, @RequestParam(required = false) String instanceParams) {
|
||||
return ResultDTO.success(jobService.runJob(Long.valueOf(appId), Long.valueOf(jobId), instanceParams, 0L));
|
||||
RunJobRequest runJobRequest = new RunJobRequest();
|
||||
runJobRequest.setJobId(Long.valueOf(jobId));
|
||||
runJobRequest.setInstanceParams(instanceParams);
|
||||
return ResultDTO.success(jobService.runJob(Long.valueOf(appId), runJobRequest));
|
||||
}
|
||||
|
||||
@PostMapping("/list")
|
||||
@ApiPermission(name = "Job-Copy", roleScope = RoleScope.APP, requiredPermission = Permission.READ)
|
||||
@ApiPermission(name = "Job-List", roleScope = RoleScope.APP, requiredPermission = Permission.READ)
|
||||
public ResultDTO<PageResult<JobInfoVO>> listJobs(@RequestBody QueryJobInfoRequest request) {
|
||||
|
||||
Sort sort = Sort.by(Sort.Direction.ASC, "id");
|
||||
|
@ -549,7 +549,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
|
||||
}
|
||||
|
||||
try {
|
||||
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId());
|
||||
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(workerRuntime.getAppId(), instanceInfo.getJobId(), instanceId);
|
||||
AskResponse response = TransportUtils.reliableQueryJobCluster(req, currentServerAddress, workerRuntime.getTransporter());
|
||||
if (!response.isSuccess()) {
|
||||
log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
|
||||
|
Loading…
x
Reference in New Issue
Block a user