diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index b6fc9d28..b41dba7d 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -53,4 +53,8 @@ public class AskResponse implements OmsSerializable { return JsonUtils.parseObject(data, clz); } + public String getDataAsString() { + return new String(data, StandardCharsets.UTF_8); + } + } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index f4568e0e..513c9bf2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -1,8 +1,11 @@ package com.github.kfcfans.powerjob.server.akka; import akka.actor.*; +import akka.pattern.Patterns; import akka.routing.RoundRobinPool; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.RemoteConstant; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.actors.FriendActor; import com.github.kfcfans.powerjob.server.akka.actors.ServerActor; @@ -16,8 +19,10 @@ import com.typesafe.config.ConfigFactory; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.time.Duration; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletionStage; /** * 服务端 ActorSystem 启动器 @@ -90,4 +95,19 @@ public class OhMyServer { String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME); return actorSystem.actorSelection(path); } + + /** + * ASK 其他 powejob-server,要求 AskResponse 中的 Data 为 String + * @param address 其他 powejob-server 的地址(ip:port) + * @param request 请求 + * @return 返回值 OR 异常 + */ + public static String askFriend(String address, Object request) throws Exception { + CompletionStage askCS = Patterns.ask(getFriendActor(address), request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); + if (askResponse.isSuccess()) { + return askResponse.getDataAsString(); + } + throw new PowerJobException("remote server process failed:" + askResponse.getMessage()); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java index f36aaa22..eb28c112 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/FriendActor.java @@ -1,11 +1,16 @@ package com.github.kfcfans.powerjob.server.akka.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.akka.requests.Ping; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; +import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; +import com.github.kfcfans.powerjob.server.service.JobService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; +import com.github.kfcfans.powerjob.server.service.workflow.WorkflowService; import lombok.extern.slf4j.Slf4j; import java.util.Map; @@ -22,6 +27,7 @@ public class FriendActor extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(Ping.class, this::onReceivePing) + .match(RunJobOrWorkflowReq.class, this::onReceiveFriendResendRunRequest) .match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .build(); @@ -42,4 +48,27 @@ public class FriendActor extends AbstractActor { AskResponse askResponse = AskResponse.succeed(workerInfo); getSender().tell(askResponse, getSelf()); } + + /** + * 处理 run 转发 + */ + private void onReceiveFriendResendRunRequest(RunJobOrWorkflowReq req) { + try { + Long resultId; + switch (req.getType()) { + case RunJobOrWorkflowReq.WORKFLOW: + resultId = SpringUtils.getBean(WorkflowService.class).runWorkflow(req.getId(), req.getAppId(), req.getParams(), req.getDelay()); + break; + case RunJobOrWorkflowReq.JOB: + resultId = SpringUtils.getBean(JobService.class).runJob(req.getId(), req.getParams(), req.getDelay()); + break; + default: + throw new PowerJobException("unknown type: " + req.getType()); + } + getSender().tell(AskResponse.succeed(String.valueOf(resultId)), getSelf()); + } catch (Exception e) { + log.error("[FriendActor] process run request [{}] failed!", req, e); + getSender().tell(AskResponse.failed(e.getMessage()), getSelf()); + } + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java new file mode 100644 index 00000000..517be316 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/requests/RunJobOrWorkflowReq.java @@ -0,0 +1,27 @@ +package com.github.kfcfans.powerjob.server.akka.requests; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 运行 Job 或 工作流,需要转发到 server 进行,否则没有集群信息 + * + * @author tjq + * @since 11/7/20 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RunJobOrWorkflowReq implements OmsSerializable { + public static final int JOB = 1; + public static final int WORKFLOW = 2; + + private int type; + private long id; + private long delay; + private String params; + + private long appId; +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java index ad81184f..cc6ee4c9 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java @@ -5,11 +5,15 @@ import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; +import com.github.kfcfans.powerjob.server.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; +import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; @@ -22,6 +26,7 @@ import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.Optional; /** @@ -44,6 +49,9 @@ public class JobService { @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private AppInfoRepository appInfoRepository; + /** * 保存/修改任务 * @param request 任务请求 @@ -103,12 +111,30 @@ public class JobService { */ public long runJob(Long jobId, String instanceParams, long delay) { - log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobId, instanceParams, delay); - JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId)); + + AppInfoDO appInfo = appInfoRepository.findById(jobInfo.getAppId()).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + jobInfo.getAppId())); + String targetServer = appInfo.getCurrentServer(); + + if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) { + return realRunJob(jobInfo, instanceParams, delay); + } + + // 转发请求 + log.info("[Job-{}] redirect run request to target server: {}", jobId, targetServer); + RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId()); + try { + return Long.parseLong(OhMyServer.askFriend(targetServer, req)); + }catch (Exception e) { + log.error("[Job-{}] redirect run request[params={}] to target server[{}] failed!", jobId, instanceParams, targetServer); + throw new PowerJobException("redirect run request failed!", e); + } + } + + private long realRunJob(JobInfoDO jobInfo, String instanceParams, long delay) { + log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobInfo.getId(), instanceParams, delay); Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0)); instanceInfoRepository.flush(); - if (delay <= 0) { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }else { @@ -116,7 +142,7 @@ public class JobService { dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null); }); } - log.info("[Job-{}] run job successfully, instanceId={}", jobId, instanceId); + log.info("[Job-{}] run job successfully, params= {}, instanceId={}", jobInfo.getId(), instanceParams, instanceId); return instanceId; } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java index 073d7491..81430898 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java @@ -5,13 +5,18 @@ import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO; +import com.github.kfcfans.powerjob.server.akka.OhMyServer; +import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq; import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.CronExpression; import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils; +import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; @@ -24,9 +29,12 @@ import java.util.Date; * @author tjq * @since 2020/5/26 */ +@Slf4j @Service public class WorkflowService { + @Resource + private AppInfoRepository appInfoRepository; @Resource private WorkflowInstanceManager workflowInstanceManager; @Resource @@ -138,8 +146,28 @@ public class WorkflowService { public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) { WorkflowInfoDO wfInfo = permissionCheck(wfId, appId); - Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); + AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId)); + + String targetServer = OhMyServer.getActorSystemAddress(); + if (targetServer.equals(appInfo.getCurrentServer())) { + return realRunWorkflow(wfInfo, initParams, delay); + } + + log.info("[WorkflowService-{}] redirect run request to target server: {}", wfId, targetServer); + // 转发请求 + RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId); + try { + return Long.valueOf(OhMyServer.askFriend(targetServer, req)); + }catch (Exception e) { + log.error("[WorkflowService-{}] redirect run request[params={}] to target server[{}] failed!", wfId, initParams, targetServer); + throw new PowerJobException("redirect run request failed!", e); + } + } + + private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) { + log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay); + Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams); if (delay <= 0) { workflowInstanceManager.start(wfInfo, wfInstanceId, initParams); }else {