fix: runJob or runWorkflow failed when there are multi server in cluster

This commit is contained in:
“tjq” 2020-11-07 21:03:04 +08:00
parent 6924088d16
commit 0cc7cc26b4
6 changed files with 139 additions and 5 deletions

View File

@ -53,4 +53,8 @@ public class AskResponse implements OmsSerializable {
return JsonUtils.parseObject(data, clz);
}
public String getDataAsString() {
return new String(data, StandardCharsets.UTF_8);
}
}

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.server.akka;
import akka.actor.*;
import akka.pattern.Patterns;
import akka.routing.RoundRobinPool;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerActor;
@ -16,8 +19,10 @@ import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletionStage;
/**
* 服务端 ActorSystem 启动器
@ -90,4 +95,19 @@ public class OhMyServer {
String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME);
return actorSystem.actorSelection(path);
}
/**
* ASK 其他 powejob-server要求 AskResponse 中的 Data String
* @param address 其他 powejob-server 的地址ip:port
* @param request 请求
* @return 返回值 OR 异常
*/
public static String askFriend(String address, Object request) throws Exception {
CompletionStage<Object> askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
if (askResponse.isSuccess()) {
return askResponse.getDataAsString();
}
throw new PowerJobException("remote server process failed:" + askResponse.getMessage());
}
}

View File

@ -1,11 +1,16 @@
package com.github.kfcfans.powerjob.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.akka.requests.Ping;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowService;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@ -22,6 +27,7 @@ public class FriendActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(Ping.class, this::onReceivePing)
.match(RunJobOrWorkflowReq.class, this::onReceiveFriendResendRunRequest)
.match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build();
@ -42,4 +48,27 @@ public class FriendActor extends AbstractActor {
AskResponse askResponse = AskResponse.succeed(workerInfo);
getSender().tell(askResponse, getSelf());
}
/**
* 处理 run 转发
*/
private void onReceiveFriendResendRunRequest(RunJobOrWorkflowReq req) {
try {
Long resultId;
switch (req.getType()) {
case RunJobOrWorkflowReq.WORKFLOW:
resultId = SpringUtils.getBean(WorkflowService.class).runWorkflow(req.getId(), req.getAppId(), req.getParams(), req.getDelay());
break;
case RunJobOrWorkflowReq.JOB:
resultId = SpringUtils.getBean(JobService.class).runJob(req.getId(), req.getParams(), req.getDelay());
break;
default:
throw new PowerJobException("unknown type: " + req.getType());
}
getSender().tell(AskResponse.succeed(String.valueOf(resultId)), getSelf());
} catch (Exception e) {
log.error("[FriendActor] process run request [{}] failed!", req, e);
getSender().tell(AskResponse.failed(e.getMessage()), getSelf());
}
}
}

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.powerjob.server.akka.requests;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 运行 Job 工作流需要转发到 server 进行否则没有集群信息
*
* @author tjq
* @since 11/7/20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RunJobOrWorkflowReq implements OmsSerializable {
public static final int JOB = 1;
public static final int WORKFLOW = 2;
private int type;
private long id;
private long delay;
private String params;
private long appId;
}

View File

@ -5,11 +5,15 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
@ -22,6 +26,7 @@ import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
@ -44,6 +49,9 @@ public class JobService {
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private AppInfoRepository appInfoRepository;
/**
* 保存/修改任务
* @param request 任务请求
@ -103,12 +111,30 @@ public class JobService {
*/
public long runJob(Long jobId, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay);
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
AppInfoDO appInfo = appInfoRepository.findById(jobInfo.getAppId()).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + jobInfo.getAppId()));
String targetServer = appInfo.getCurrentServer();
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
return realRunJob(jobInfo, instanceParams, delay);
}
// 转发请求
log.info("[Job-{}] redirect run request to target server: {}", jobId, targetServer);
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId());
try {
return Long.parseLong(OhMyServer.askFriend(targetServer, req));
}catch (Exception e) {
log.error("[Job-{}] redirect run request[params={}] to target server[{}] failed!", jobId, instanceParams, targetServer);
throw new PowerJobException("redirect run request failed!", e);
}
}
private long realRunJob(JobInfoDO jobInfo, String instanceParams, long delay) {
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobInfo.getId(), instanceParams, delay);
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else {
@ -116,7 +142,7 @@ public class JobService {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
});
}
log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId);
log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId);
return instanceId;
}

View File

@ -5,13 +5,18 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -24,9 +29,12 @@ import java.util.Date;
* @author tjq
* @since 2020/5/26
*/
@Slf4j
@Service
public class WorkflowService {
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
@ -138,8 +146,28 @@ public class WorkflowService {
public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId));
String targetServer = OhMyServer.getActorSystemAddress();
if (targetServer.equals(appInfo.getCurrentServer())) {
return realRunWorkflow(wfInfo, initParams, delay);
}
log.info("[WorkflowService-{}] redirect run request to target server: {}", wfId, targetServer);
// 转发请求
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId);
try {
return Long.valueOf(OhMyServer.askFriend(targetServer, req));
}catch (Exception e) {
log.error("[WorkflowService-{}] redirect run request[params={}] to target server[{}] failed!", wfId, initParams, targetServer);
throw new PowerJobException("redirect run request failed!", e);
}
}
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams);
if (delay <= 0) {
workflowInstanceManager.start(wfInfo, wfInstanceId, initParams);
}else {