From b3ca7fd6709dbcf2593de463f9b6cc46b7c62f85 Mon Sep 17 00:00:00 2001 From: Echo009 Date: Wed, 3 Mar 2021 22:00:14 +0800 Subject: [PATCH] fix: the problem that can not stop the job instance which is managed by another server --- .../powerjob/server/service/JobService.java | 2 +- .../service/instance/InstanceService.java | 14 ++++++------ .../workflow/WorkflowInstanceService.java | 5 ++++- .../web/controller/InstanceController.java | 22 +++++-------------- .../web/controller/OpenAPIController.java | 2 +- 5 files changed, 18 insertions(+), 27 deletions(-) diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index a8629159..7618200c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -197,7 +197,7 @@ public class JobService { executeLogs.forEach(instance -> { try { // 重复查询了数据库,不过问题不大,这个调用量很小 - instanceService.stopInstance(instance.getInstanceId()); + instanceService.stopInstance(instance.getAppId(),instance.getInstanceId()); } catch (Exception ignore) { // ignore exception } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index 4307987f..9f13d7ca 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -2,14 +2,11 @@ package com.github.kfcfans.powerjob.server.service.instance; import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.model.InstanceDetail; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO; import com.github.kfcfans.powerjob.server.common.constans.InstanceType; -import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; @@ -17,9 +14,11 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.remote.DispatchService; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; -import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; +import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.remote.transport.TransportService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; +import com.github.kfcfans.powerjob.server.service.id.IdGenerateService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -104,9 +103,10 @@ public class InstanceService { * * @param instanceId 任务实例ID */ - public void stopInstance(Long instanceId) { + @DesignateServer(appIdParameterName = "appId") + public void stopInstance(Long appId,Long instanceId) { - log.info("[Instance-{}] try to stop the instance.", instanceId); + log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId); try { InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index 6d5fc14f..77793c75 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -13,6 +13,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; +import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; @@ -52,6 +53,7 @@ public class WorkflowInstanceService { * @param wfInstanceId 工作流实例ID * @param appId 所属应用ID */ + @DesignateServer(appIdParameterName = "appId") public void stopWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { @@ -67,7 +69,7 @@ public class WorkflowInstanceService { node.setStatus(InstanceStatus.STOPPED.getV()); node.setResult(SystemInstanceResult.STOPPED_BY_USER); // 注意,这里并不保证一定能终止正在运行的实例 - instanceService.stopInstance(node.getInstanceId()); + instanceService.stopInstance(appId,node.getInstanceId()); } } catch (Exception e) { log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e); @@ -90,6 +92,7 @@ public class WorkflowInstanceService { * @param wfInstanceId 工作流实例ID * @param appId 应用ID */ + @DesignateServer(appIdParameterName = "appId") public void retryWorkflowInstance(Long wfInstanceId, Long appId) { WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); // 仅允许重试 失败的工作流 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index d25c6282..bdc4e5c9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -16,6 +16,7 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest; import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO; import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.springframework.beans.BeanUtils; @@ -61,8 +62,8 @@ public class InstanceController { private InstanceInfoRepository instanceInfoRepository; @GetMapping("/stop") - public ResultDTO stopInstance(Long instanceId) { - instanceService.stopInstance(instanceId); + public ResultDTO stopInstance(Long appId,Long instanceId) { + instanceService.stopInstance(appId,instanceId); return ResultDTO.success(null); } @@ -95,7 +96,8 @@ public class InstanceController { } @GetMapping("/downloadLog4Console") - public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) throws Exception { + @SneakyThrows + public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) { // 获取内部下载链接 String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId); // 先下载到本机 @@ -139,18 +141,4 @@ public class InstanceController { return pageResult; } - /** - * 获取该 instanceId 对应的服务器地址 - * @param instanceId 任务实例ID - * @return 对应服务器地址 - */ - private String getTargetServer(Long instanceId) { - InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - if (instanceInfo == null) { - throw new PowerJobException("invalid instanceId: " + instanceId); - } - - Optional appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId()); - return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer(); - } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java index b2afbc35..7bbd2244 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java @@ -110,7 +110,7 @@ public class OpenAPIController { @PostMapping(OpenAPIConstant.STOP_INSTANCE) public ResultDTO stopInstance(Long instanceId, Long appId) { checkInstanceIdValid(instanceId, appId); - instanceService.stopInstance(instanceId); + instanceService.stopInstance(appId,instanceId); return ResultDTO.success(null); }