fix: the problem that can not stop the job instance which is managed by another server

This commit is contained in:
Echo009 2021-03-03 22:00:14 +08:00
parent 7575bbd4e1
commit b3ca7fd670
5 changed files with 18 additions and 27 deletions

View File

@ -197,7 +197,7 @@ public class JobService {
executeLogs.forEach(instance -> { executeLogs.forEach(instance -> {
try { try {
// 重复查询了数据库不过问题不大这个调用量很小 // 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getInstanceId()); instanceService.stopInstance(instance.getAppId(),instance.getInstanceId());
} catch (Exception ignore) { } catch (Exception ignore) {
// ignore exception // ignore exception
} }

View File

@ -2,14 +2,11 @@ package com.github.kfcfans.powerjob.server.service.instance;
import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO; import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType; import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils; import com.github.kfcfans.powerjob.server.common.utils.QueryConvertUtils;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture; import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
@ -17,9 +14,11 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.remote.DispatchService; import com.github.kfcfans.powerjob.server.remote.DispatchService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService; import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterQueryService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -104,9 +103,10 @@ public class InstanceService {
* *
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
*/ */
public void stopInstance(Long instanceId) { @DesignateServer(appIdParameterName = "appId")
public void stopInstance(Long appId,Long instanceId) {
log.info("[Instance-{}] try to stop the instance.", instanceId); log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);
try { try {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId); InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);

View File

@ -13,6 +13,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.remote.server.redirector.DesignateServer;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
@ -52,6 +53,7 @@ public class WorkflowInstanceService {
* @param wfInstanceId 工作流实例ID * @param wfInstanceId 工作流实例ID
* @param appId 所属应用ID * @param appId 所属应用ID
*/ */
@DesignateServer(appIdParameterName = "appId")
public void stopWorkflowInstance(Long wfInstanceId, Long appId) { public void stopWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) { if (!WorkflowInstanceStatus.GENERALIZED_RUNNING_STATUS.contains(wfInstance.getStatus())) {
@ -67,7 +69,7 @@ public class WorkflowInstanceService {
node.setStatus(InstanceStatus.STOPPED.getV()); node.setStatus(InstanceStatus.STOPPED.getV());
node.setResult(SystemInstanceResult.STOPPED_BY_USER); node.setResult(SystemInstanceResult.STOPPED_BY_USER);
// 注意这里并不保证一定能终止正在运行的实例 // 注意这里并不保证一定能终止正在运行的实例
instanceService.stopInstance(node.getInstanceId()); instanceService.stopInstance(appId,node.getInstanceId());
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e); log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSON.toJSONString(node), e);
@ -90,6 +92,7 @@ public class WorkflowInstanceService {
* @param wfInstanceId 工作流实例ID * @param wfInstanceId 工作流实例ID
* @param appId 应用ID * @param appId 应用ID
*/ */
@DesignateServer(appIdParameterName = "appId")
public void retryWorkflowInstance(Long wfInstanceId, Long appId) { public void retryWorkflowInstance(Long wfInstanceId, Long appId) {
WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId); WorkflowInstanceInfoDO wfInstance = fetchWfInstance(wfInstanceId, appId);
// 仅允许重试 失败的工作流 // 仅允许重试 失败的工作流

View File

@ -16,6 +16,7 @@ import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest; import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest;
import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO; import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO;
import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO; import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
@ -61,8 +62,8 @@ public class InstanceController {
private InstanceInfoRepository instanceInfoRepository; private InstanceInfoRepository instanceInfoRepository;
@GetMapping("/stop") @GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long instanceId) { public ResultDTO<Void> stopInstance(Long appId,Long instanceId) {
instanceService.stopInstance(instanceId); instanceService.stopInstance(appId,instanceId);
return ResultDTO.success(null); return ResultDTO.success(null);
} }
@ -95,7 +96,8 @@ public class InstanceController {
} }
@GetMapping("/downloadLog4Console") @GetMapping("/downloadLog4Console")
public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) throws Exception { @SneakyThrows
public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) {
// 获取内部下载链接 // 获取内部下载链接
String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId); String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
// 先下载到本机 // 先下载到本机
@ -139,18 +141,4 @@ public class InstanceController {
return pageResult; return pageResult;
} }
/**
* 获取该 instanceId 对应的服务器地址
* @param instanceId 任务实例ID
* @return 对应服务器地址
*/
private String getTargetServer(Long instanceId) {
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo == null) {
throw new PowerJobException("invalid instanceId: " + instanceId);
}
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(instanceInfo.getAppId());
return appInfoOpt.orElseThrow(() -> new PowerJobException("impossible")).getCurrentServer();
}
} }

View File

@ -110,7 +110,7 @@ public class OpenAPIController {
@PostMapping(OpenAPIConstant.STOP_INSTANCE) @PostMapping(OpenAPIConstant.STOP_INSTANCE)
public ResultDTO<Void> stopInstance(Long instanceId, Long appId) { public ResultDTO<Void> stopInstance(Long instanceId, Long appId) {
checkInstanceIdValid(instanceId, appId); checkInstanceIdValid(instanceId, appId);
instanceService.stopInstance(instanceId); instanceService.stopInstance(appId,instanceId);
return ResultDTO.success(null); return ResultDTO.success(null);
} }