diff --git a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java index f2f2e5ff..a2d751ec 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/RemoteConstant.java @@ -29,31 +29,43 @@ public class RemoteConstant { public static final String EMPTY_ADDRESS = "N/A"; public static final long DEFAULT_TIMEOUT_MS = 5000; - /* ************************ SERVER ************************ */ - public static final String SERVER_PATH = "server"; + /* ************************ SERVER-self_side (s4s == server for server side) ************************ */ + public static final String S4S_PATH = "friend"; + + /** + * server 集群间的心跳处理 + */ + public static final String S4S_HANDLER_PING = "ping"; + /** + * 处理其他 server 的执行请求 + */ + public static final String S4S_HANDLER_PROCESS = "process"; + + /* ************************ SERVER-worker_side(s4w == server for worker side) ************************ */ + public static final String S4W_PATH = "server"; /** * server 处理在线日志 */ - public static final String SERVER_HANDLER_REPORT_LOG = "reportLog"; + public static final String S4W_HANDLER_REPORT_LOG = "reportLog"; /** * server 处理 worker 心跳 */ - public static final String SERVER_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat"; + public static final String S4W_HANDLER_WORKER_HEARTBEAT = "workerHeartbeat"; /** * server 处理 TaskTracker 上报的任务实例状态 */ - public static final String SERVER_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus"; + public static final String S4W_HANDLER_REPORT_INSTANCE_STATUS = "reportInstanceStatus"; /** * server 查询任务的可执行集群 */ - public static final String SERVER_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster"; + public static final String S4W_HANDLER_QUERY_JOB_CLUSTER = "queryJobCluster"; /** * server 处理 worker 请求部署容器命令 */ - public static final String SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "container"; + public static final String S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER = "queryContainer"; /* ************************ Worker-TaskTracker ************************ */ public static final String WTT_PATH = "taskTracker"; diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java index 904b47f2..ac4392d6 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaMappingService.java @@ -22,7 +22,8 @@ public class AkkaMappingService { private static final Map RP_2_ACTOR_CFG = Maps.newHashMap(); static { - addMappingRule(RemoteConstant.SERVER_PATH, "server_actor", null); + addMappingRule(RemoteConstant.S4W_PATH, "server_actor", "w-r-c-d"); + addMappingRule(RemoteConstant.S4S_PATH, "friend_actor", "friend-request-actor-dispatcher"); } private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher"; diff --git a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java index 15700cc0..b2d2fc5e 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java +++ b/powerjob-remote/powerjob-remote-impl-akka/src/main/java/tech/powerjob/remote/akka/AkkaProxyActor.java @@ -55,9 +55,15 @@ public class AkkaProxyActor extends AbstractActor { try { final Object ret = handlerInfo.getMethod().invoke(actorInfo.getActor(), req); - if (ret != null) { - getSender().tell(ret, getSelf()); + if (ret == null) { + return; } + if (ret instanceof Optional) { + if (!((Optional) ret).isPresent()) { + return; + } + } + getSender().tell(ret, getSelf()); } catch (Exception e) { log.error("[PowerJob-AKKA] process failed!", e); } diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 37f64bbc..dedfefba 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -28,7 +28,7 @@ 2.9.2 2.7.4 - 4.2.1 + 8.0.30 19.7.0.0 @@ -50,6 +50,8 @@ 3.0.10 9.1.6 + 4.2.1 + 4.2.1 @@ -99,11 +101,16 @@ - + tech.powerjob - powerjob-common - ${powerjob.common.version} + powerjob-remote-impl-http + ${powerjob-remote-impl-http.version} + + + tech.powerjob + powerjob-remote-impl-akka + ${powerjob-remote-impl-akka.version} @@ -267,6 +274,12 @@ io.springfox springfox-swagger2 ${swagger.version} + + + guava + com.google.guava + + diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java index 4a80cf87..29f10c7b 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/DispatchService.java @@ -11,6 +11,7 @@ import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.*; import tech.powerjob.common.request.ServerScheduleJobReq; +import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.Holder; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.core.instance.InstanceManager; @@ -19,7 +20,8 @@ import tech.powerjob.server.core.lock.UseCacheLock; import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.remote.transport.TransportService; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import java.util.ArrayList; @@ -165,7 +167,8 @@ public class DispatchService { WorkerInfo taskTracker = suitableWorkers.get(0); String taskTrackerAddress = taskTracker.getAddress(); - transportService.tell(Protocol.of(taskTracker.getProtocol()), taskTrackerAddress, req); + URL workerUrl = ServerURLFactory.dispatchJob2Worker(taskTrackerAddress); + transportService.tell(taskTracker.getProtocol(), workerUrl, req); log.info("[Dispatcher-{}|{}] send schedule request to TaskTracker[protocol:{},address:{}] successfully: {}.", jobId, instanceId, taskTracker.getProtocol(), taskTrackerAddress, req); // 修改状态 diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java index 56dc22ee..89875876 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/container/ContainerService.java @@ -1,25 +1,5 @@ package tech.powerjob.server.core.container; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.model.DeployedContainerInfo; -import tech.powerjob.common.model.GitRepoInfo; -import tech.powerjob.common.request.ServerDeployContainerRequest; -import tech.powerjob.common.request.ServerDestroyContainerRequest; -import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.common.utils.SegmentLock; -import tech.powerjob.server.common.constants.ContainerSourceType; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.common.utils.OmsFileUtils; -import tech.powerjob.server.extension.LockService; -import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; -import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; -import tech.powerjob.server.persistence.mongodb.GridFsManager; -import tech.powerjob.server.remote.transport.TransportService; -import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import tech.powerjob.server.common.module.WorkerInfo; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; @@ -28,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.maven.shared.invoker.DefaultInvocationRequest; @@ -43,8 +24,29 @@ import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.springframework.web.multipart.MultipartFile; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.model.DeployedContainerInfo; +import tech.powerjob.common.model.GitRepoInfo; +import tech.powerjob.common.request.ServerDeployContainerRequest; +import tech.powerjob.common.request.ServerDestroyContainerRequest; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.server.common.constants.ContainerSourceType; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.module.WorkerInfo; +import tech.powerjob.server.common.utils.OmsFileUtils; +import tech.powerjob.server.extension.LockService; +import tech.powerjob.server.persistence.mongodb.GridFsManager; +import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.remote.server.redirector.DesignateServer; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; +import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import javax.annotation.Resource; import javax.websocket.RemoteEndpoint; @@ -128,7 +130,8 @@ public class ContainerService { ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId()); workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> { - transportService.tell(Protocol.AKKA, workerInfo.getAddress(), destroyRequest); + final URL url = ServerURLFactory.destroyContainer2Worker(workerInfo.getAddress()); + transportService.tell(workerInfo.getProtocol(), url, destroyRequest); }); log.info("[ContainerService] delete container: {}.", container); @@ -247,11 +250,8 @@ public class ContainerService { containerInfoRepository.saveAndFlush(container); // 开始部署(需要分批进行) - Set workerAddressList = workerClusterQueryService.getAllAliveWorkers(container.getAppId()) - .stream() - .map(WorkerInfo::getAddress) - .collect(Collectors.toSet()); - if (workerAddressList.isEmpty()) { + final List allAliveWorkers = workerClusterQueryService.getAllAliveWorkers(container.getAppId()); + if (allAliveWorkers.isEmpty()) { remote.sendText("SYSTEM: there is no worker available now, deploy failed!"); return; } @@ -262,10 +262,12 @@ public class ContainerService { long sleepTime = calculateSleepTime(jarFile.length()); AtomicInteger count = new AtomicInteger(); - workerAddressList.forEach(akkaAddress -> { - transportService.tell(Protocol.AKKA, akkaAddress, req); + allAliveWorkers.forEach(workerInfo -> { - remote.sendText("SYSTEM: send deploy request to " + akkaAddress); + final URL url = ServerURLFactory.deployContainer2Worker(workerInfo.getAddress()); + transportService.tell(workerInfo.getProtocol(), url, req); + + remote.sendText("SYSTEM: send deploy request to " + url.getAddress()); if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) { CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime)); @@ -285,6 +287,7 @@ public class ContainerService { * @param containerId 容器ID * @return 拼接好的可阅读字符串 */ + @DesignateServer public String fetchDeployedInfo(Long appId, Long containerId) { List infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java index 6fe30e70..a5231685 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/IWorkerRequestHandler.java @@ -2,21 +2,27 @@ package tech.powerjob.server.core.handler; import tech.powerjob.common.request.*; import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.remote.framework.actor.ProcessType; +import tech.powerjob.server.remote.actoes.ServerActor; import java.util.Optional; +import static tech.powerjob.common.RemoteConstant.*; + /** * 定义 server 与 worker 之间需要处理的协议 * * @author tjq * @since 2022/9/10 */ -public interface IWorkerRequestHandler { +public interface IWorkerRequestHandler extends ServerActor { /** * 处理 worker 上报的心跳信息 * @param heartbeat 心跳信息 */ + @Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING) void processWorkerHeartbeat(WorkerHeartbeat heartbeat); /** @@ -24,6 +30,7 @@ public interface IWorkerRequestHandler { * @param req 上报请求 * @return 响应信息 */ + @Handler(path = S4W_HANDLER_REPORT_INSTANCE_STATUS, processType = ProcessType.BLOCKING) Optional processTaskTrackerReportInstanceStatus(TaskTrackerReportInstanceStatusReq req); /** @@ -31,12 +38,14 @@ public interface IWorkerRequestHandler { * @param req 请求 * @return cluster info */ + @Handler(path = S4W_HANDLER_QUERY_JOB_CLUSTER, processType = ProcessType.BLOCKING) AskResponse processWorkerQueryExecutorCluster(WorkerQueryExecutorClusterReq req); /** - * 处理 worker 日志推送请求 + * 处理 worker 日志推送请求(内部使用线程池异步处理,非阻塞) * @param req 请求 */ + @Handler(path = S4W_HANDLER_REPORT_LOG, processType = ProcessType.NO_BLOCKING) void processWorkerLogReport(WorkerLogReportReq req); /** @@ -44,5 +53,6 @@ public interface IWorkerRequestHandler { * @param request 请求 * @return 容器部署信息 */ + @Handler(path = S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, processType = ProcessType.BLOCKING) AskResponse processWorkerNeedDeployContainer(WorkerNeedDeployContainerRequest request); } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java index ed4e9206..13eef738 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java @@ -4,11 +4,13 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import tech.powerjob.common.request.WorkerHeartbeat; import tech.powerjob.common.request.WorkerLogReportReq; import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.actor.Actor; import tech.powerjob.server.core.instance.InstanceLogService; import tech.powerjob.server.core.instance.InstanceManager; import tech.powerjob.server.core.workflow.WorkflowInstanceManager; @@ -30,6 +32,7 @@ import java.util.Optional; */ @Slf4j @Component +@Actor(path = RemoteConstant.S4W_PATH) public class WorkerRequestHandlerImpl extends AbWorkerRequestHandler { private final InstanceManager instanceManager; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java deleted file mode 100644 index d34d7469..00000000 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/handler/impl/Initializer.java +++ /dev/null @@ -1,28 +0,0 @@ -package tech.powerjob.server.core.handler.impl; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.stereotype.Component; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; - -import javax.annotation.PostConstruct; - -/** - * 初始化器 - * - * @author tjq - * @since 2022/9/11 - */ -@Component -@ConditionalOnExpression("'${execution.env}'!='test'") -public class Initializer { - - @PostConstruct - public void initHandler() { - // init akka - AkkaStarter.actorSystem.actorOf(WorkerRequestAkkaHandler.defaultProps(), RemoteConstant.SERVER_ACTOR_NAME); - // init vert.x - VertXStarter.vertx.deployVerticle(new WorkerRequestHttpHandler()); - } -} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java index 40f066e1..2c641c27 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceManager.java @@ -2,15 +2,16 @@ package tech.powerjob.server.core.instance; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; -import org.apache.commons.lang3.StringUtils; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.timewheel.holder.HashedWheelTimerHolder; import tech.powerjob.server.common.utils.SpringUtils; @@ -22,10 +23,10 @@ import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.model.UserInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; -import tech.powerjob.server.remote.transport.TransportService; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; -import javax.annotation.Resource; import java.util.Date; import java.util.List; import java.util.Optional; @@ -176,7 +177,8 @@ public class InstanceManager { if (workerInfoOpt.isPresent()) { ServerStopInstanceReq stopInstanceReq = new ServerStopInstanceReq(instanceId); WorkerInfo workerInfo = workerInfoOpt.get(); - transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), stopInstanceReq); + final URL url = ServerURLFactory.stopInstance2Worker(workerInfo.getAddress()); + transportService.tell(workerInfo.getProtocol(), url, stopInstanceReq); } } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 2d465b96..db12ab99 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -5,15 +5,16 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Service; import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; -import tech.powerjob.common.enums.Protocol; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerStopInstanceReq; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.InstanceInfoDTO; +import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.constants.InstanceType; import tech.powerjob.server.common.module.WorkerInfo; import tech.powerjob.server.common.timewheel.TimerFuture; @@ -26,12 +27,14 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.remote.transport.TransportService; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static tech.powerjob.common.enums.InstanceStatus.RUNNING; @@ -136,7 +139,7 @@ public class InstanceService { if (workerInfoOpt.isPresent()) { ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId); WorkerInfo workerInfo = workerInfoOpt.get(); - transportService.tell(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req); + transportService.tell(workerInfo.getProtocol(), ServerURLFactory.stopInstance2Worker(workerInfo.getAddress()), req); log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId); } else { log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId); @@ -280,7 +283,10 @@ public class InstanceService { WorkerInfo workerInfo = workerInfoOpt.get(); ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId); try { - AskResponse askResponse = transportService.ask(Protocol.of(workerInfo.getProtocol()), workerInfo.getAddress(), req); + final URL url = ServerURLFactory.queryInstance2Worker(workerInfo.getAddress()); + AskResponse askResponse = transportService.ask(workerInfo.getProtocol(), url, req, AskResponse.class) + .toCompletableFuture() + .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (askResponse.isSuccess()) { InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class); instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes()); diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java index 879cd749..3d80692f 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java @@ -22,9 +22,8 @@ import tech.powerjob.server.persistence.remote.model.WorkflowInfoDO; import tech.powerjob.server.persistence.remote.model.WorkflowInstanceInfoDO; import tech.powerjob.server.persistence.remote.model.brief.BriefInstanceInfo; import tech.powerjob.server.persistence.remote.repository.*; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.tp.TransportService; -import javax.annotation.Resource; import java.util.*; import java.util.stream.Collectors; @@ -49,6 +48,8 @@ public class InstanceStatusCheckService { public static final long CHECK_INTERVAL = 10000; + private final TransportService transportService; + private final DispatchService dispatchService; private final InstanceManager instanceManager; @@ -61,7 +62,6 @@ public class InstanceStatusCheckService { private final InstanceInfoRepository instanceInfoRepository; - private final WorkflowInfoRepository workflowInfoRepository; private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository; @@ -69,7 +69,7 @@ public class InstanceStatusCheckService { public void checkWorkflowInstance() { Stopwatch stopwatch = Stopwatch.createStarted(); // 查询 DB 获取该 Server 需要负责的 AppGroup - List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[InstanceStatusChecker] current server has no app's job to check"); return; @@ -89,7 +89,7 @@ public class InstanceStatusCheckService { public void checkWaitingDispatchInstance() { Stopwatch stopwatch = Stopwatch.createStarted(); // 查询 DB 获取该 Server 需要负责的 AppGroup - List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[InstanceStatusChecker] current server has no app's job to check"); return; @@ -110,7 +110,7 @@ public class InstanceStatusCheckService { public void checkWaitingWorkerReceiveInstance() { Stopwatch stopwatch = Stopwatch.createStarted(); // 查询 DB 获取该 Server 需要负责的 AppGroup - List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[InstanceStatusChecker] current server has no app's job to check"); return; @@ -131,7 +131,7 @@ public class InstanceStatusCheckService { public void checkRunningInstance() { Stopwatch stopwatch = Stopwatch.createStarted(); // 查询 DB 获取该 Server 需要负责的 AppGroup - List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[InstanceStatusChecker] current server has no app's job to check"); return; diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java index 33bf97f3..c63326ee 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/PowerScheduleService.java @@ -23,10 +23,9 @@ import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.persistence.remote.repository.WorkflowInfoRepository; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; +import tech.powerjob.server.remote.tp.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterManagerService; -import javax.annotation.Resource; import java.util.*; /** @@ -47,6 +46,7 @@ public class PowerScheduleService { */ private static final int MAX_APP_NUM = 10; + private final TransportService transportService; private final DispatchService dispatchService; private final InstanceService instanceService; @@ -72,7 +72,7 @@ public class PowerScheduleService { long start = System.currentTimeMillis(); // 调度 CRON 表达式 JOB try { - final List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + final List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[CronJobSchedule] current server has no app's job to schedule."); return; @@ -92,7 +92,7 @@ public class PowerScheduleService { long start = System.currentTimeMillis(); // 调度 CRON 表达式 WORKFLOW try { - final List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + final List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule."); return; @@ -113,7 +113,7 @@ public class PowerScheduleService { long start = System.currentTimeMillis(); // 调度 FIX_RATE/FIX_DELAY 表达式 JOB try { - final List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + final List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (CollectionUtils.isEmpty(allAppIds)) { log.info("[FrequentJobSchedule] current server has no app's job to schedule."); return; @@ -132,7 +132,7 @@ public class PowerScheduleService { public void cleanData() { try { - final List allAppIds = appInfoRepository.listAppIdByCurrentServer(AkkaStarter.getActorSystemAddress()); + final List allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress()); if (allAppIds.isEmpty()) { return; } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java new file mode 100644 index 00000000..04f6fa60 --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/actoes/ServerActor.java @@ -0,0 +1,10 @@ +package tech.powerjob.server.remote.actoes; + +/** + * ServerActor 声明接口 + * + * @author tjq + * @since 2023/1/21 + */ +public interface ServerActor { +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java new file mode 100644 index 00000000..5e3ceeb2 --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendActor.java @@ -0,0 +1,52 @@ +package tech.powerjob.server.remote.server; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.stereotype.Component; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.remote.framework.actor.Handler; +import tech.powerjob.remote.framework.actor.ProcessType; +import tech.powerjob.server.remote.actoes.ServerActor; +import tech.powerjob.server.remote.server.election.Ping; +import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; +import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; + +import static tech.powerjob.common.RemoteConstant.*; + +/** + * 处理朋友们的信息(处理服务器与服务器之间的通讯) + * + * @author tjq + * @since 2020/4/9 + */ +@Slf4j +@Component +@Handler(path = S4S_PATH) +public class FriendActor implements ServerActor { + + private static final String SK = "dGVuZ2ppcWlAZ21haWwuY29tIA=="; + + /** + * 处理存活检测的请求 + */ + @Handler(path = S4S_HANDLER_PING, processType = ProcessType.NO_BLOCKING) + public AskResponse onReceivePing(Ping ping) { + return AskResponse.succeed(SK); + } + + @Handler(path = S4S_HANDLER_PROCESS, processType = ProcessType.BLOCKING) + private AskResponse onReceiveRemoteProcessReq(RemoteProcessReq req) { + + AskResponse response = new AskResponse(); + response.setSuccess(true); + try { + response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req))); + } catch (Throwable t) { + log.error("[FriendActor] process remote request[{}] failed!", req, t); + response.setSuccess(false); + response.setMessage(ExceptionUtils.getMessage(t)); + } + return response; + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java deleted file mode 100644 index edda488e..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/FriendRequestHandler.java +++ /dev/null @@ -1,86 +0,0 @@ -package tech.powerjob.server.remote.server; - -import akka.actor.AbstractActor; -import akka.actor.Props; -import akka.routing.DefaultResizer; -import akka.routing.RoundRobinPool; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.exception.ExceptionUtils; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.server.remote.server.election.Ping; -import tech.powerjob.server.remote.server.redirector.RemoteProcessReq; -import tech.powerjob.server.remote.server.redirector.RemoteRequestProcessor; -import tech.powerjob.server.remote.transport.TransportService; - -/** - * 处理朋友们的信息(处理服务器与服务器之间的通讯) - * - * @author tjq - * @since 2020/4/9 - */ -@Slf4j -public class FriendRequestHandler extends AbstractActor { - - - public static Props defaultProps() { - return Props.create(FriendRequestHandler.class) - .withDispatcher("akka.friend-request-actor-dispatcher") - .withRouter( - new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4) - .withResizer(new DefaultResizer( - Runtime.getRuntime().availableProcessors() * 4, - Runtime.getRuntime().availableProcessors() * 10, - 1, - 0.2d, - 0.3d, - 0.1d, - 10 - )) - ); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(Ping.class, this::onReceivePing) - .match(RemoteProcessReq.class, this::onReceiveRemoteProcessReq) - .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) - .build(); - } - - - @Override - public void preStart() throws Exception { - super.preStart(); - log.debug("[FriendRequestHandler]init FriendRequestActor"); - } - - - @Override - public void postStop() throws Exception { - super.postStop(); - log.debug("[FriendRequestHandler]stop FriendRequestActor"); - } - - /** - * 处理存活检测的请求 - */ - private void onReceivePing(Ping ping) { - getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf()); - } - - private void onReceiveRemoteProcessReq(RemoteProcessReq req) { - - AskResponse response = new AskResponse(); - response.setSuccess(true); - try { - response.setData(JsonUtils.toBytes(RemoteRequestProcessor.processRemoteRequest(req))); - } catch (Throwable t) { - log.error("[FriendActor] process remote request[{}] failed!", req, t); - response.setSuccess(false); - response.setMessage(ExceptionUtils.getMessage(t)); - } - getSender().tell(response, getSelf()); - } -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java index 1db65201..e47599ba 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/election/ServerElectionService.java @@ -1,29 +1,23 @@ package tech.powerjob.server.remote.server.election; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; -import com.alibaba.fastjson.JSONObject; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.serialize.JsonUtils; -import tech.powerjob.server.extension.LockService; -import tech.powerjob.server.persistence.remote.model.AppInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; -import tech.powerjob.server.remote.transport.TransportService; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.Protocol; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.server.extension.LockService; +import tech.powerjob.server.persistence.remote.model.AppInfoDO; +import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; -import javax.annotation.Resource; -import java.time.Duration; import java.util.Date; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -59,7 +53,7 @@ public class ServerElectionService { public String elect(Long appId, String protocol, String currentServer) { if (!accurate()) { // 如果是本机,就不需要查数据库那么复杂的操作了,直接返回成功 - if (getProtocolServerAddress(protocol).equals(currentServer)) { + if (transportService.defaultProtocol().getAddress().equals(currentServer)) { return currentServer; } } @@ -104,13 +98,14 @@ public class ServerElectionService { } // 篡位,本机作为Server - // 注意,写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址,仅在返回的时候特殊处理 - appInfo.setCurrentServer(transportService.getTransporter(Protocol.AKKA).getAddress()); + // 注意,写入 AppInfoDO#currentServer 的永远是 ActorSystem 的地址,仅在返回的时候特殊处理 (4.3.0 更改为 HTTP) + final String selfDefaultAddress = transportService.defaultProtocol().getAddress(); + appInfo.setCurrentServer(selfDefaultAddress); appInfo.setGmtModified(new Date()); appInfoRepository.saveAndFlush(appInfo); log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId); - return getProtocolServerAddress(protocol); + return selfDefaultAddress; }catch (Exception e) { log.error("[ServerElection] write new server to db failed for app {}.", appName, e); }finally { @@ -139,16 +134,18 @@ public class ServerElectionService { Ping ping = new Ping(); ping.setCurrentTime(System.currentTimeMillis()); - ActorSelection serverActor = AkkaStarter.getFriendActor(serverAddress); + URL targetUrl = ServerURLFactory.ping2Friend(serverAddress); try { - CompletionStage askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); - AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); - downServerCache.remove(serverAddress); + AskResponse response = transportService.ask(Protocol.HTTP.name(), targetUrl, ping, AskResponse.class) + .toCompletableFuture() + .get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (response.isSuccess()) { - return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol); + log.info("[ServerElection] server[{}] is active, it will be the master.", serverAddress); + downServerCache.remove(serverAddress); + return serverAddress; } }catch (Exception e) { - log.warn("[ServerElection] server({}) was down.", serverAddress); + log.warn("[ServerElection] server[{}] was down.", serverAddress); } downServerCache.add(serverAddress); return null; @@ -157,9 +154,4 @@ public class ServerElectionService { private boolean accurate() { return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage; } - - private String getProtocolServerAddress(String protocol) { - Protocol pt = Protocol.of(protocol); - return TransportService.getAllAddress().get(pt); - } } diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java index 9b5280b5..56eda4e4 100644 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java @@ -1,17 +1,9 @@ package tech.powerjob.server.remote.server.redirector; -import akka.pattern.Patterns; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.type.TypeFactory; import lombok.RequiredArgsConstructor; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.response.AskResponse; -import org.springframework.core.annotation.Order; -import tech.powerjob.server.persistence.remote.model.AppInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.aspectj.lang.ProceedingJoinPoint; @@ -19,16 +11,25 @@ import org.aspectj.lang.Signature; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import tech.powerjob.common.RemoteConstant; +import tech.powerjob.common.enums.Protocol; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.response.AskResponse; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.server.persistence.remote.model.AppInfoDO; +import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; +import tech.powerjob.server.remote.tp.ServerURLFactory; +import tech.powerjob.server.remote.tp.TransportService; -import javax.annotation.Resource; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.time.Duration; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; /** * 指定服务器运行切面 @@ -43,6 +44,7 @@ import java.util.concurrent.CompletionStage; @RequiredArgsConstructor public class DesignateServerAspect { + private final TransportService transportService; private final AppInfoRepository appInfoRepository; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -84,7 +86,7 @@ public class DesignateServerAspect { } // 目标IP与本地符合则本地执行 - if (Objects.equals(targetServer, AkkaStarter.getActorSystemAddress())) { + if (Objects.equals(targetServer, transportService.defaultProtocol())) { return point.proceed(); } @@ -96,8 +98,10 @@ public class DesignateServerAspect { .setParameterTypes(parameterTypes) .setArgs(args); - CompletionStage askCS = Patterns.ask(AkkaStarter.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(); + final URL friendUrl = ServerURLFactory.process2Friend(targetServer); + + CompletionStage askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class); + AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!askResponse.isSuccess()) { throw new PowerJobException("remote process failed: " + askResponse.getMessage()); diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/PowerTransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/PowerTransportService.java new file mode 100644 index 00000000..1b2f0e2e --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/PowerTransportService.java @@ -0,0 +1,169 @@ +package tech.powerjob.server.remote.tp; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.common.enums.Protocol; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.base.URL; +import tech.powerjob.remote.framework.engine.EngineConfig; +import tech.powerjob.remote.framework.engine.EngineOutput; +import tech.powerjob.remote.framework.engine.RemoteEngine; +import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine; +import tech.powerjob.server.remote.actoes.ServerActor; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * server 数据传输服务 + * + * @author tjq + * @since 2023/1/21 + */ +@Slf4j +@Service +public class PowerTransportService implements TransportService, InitializingBean { + + @Value("${oms.transporter.active.protocols}") + private String activeProtocols; + private static final String PROTOCOL_PORT_CONFIG = "oms.%s.port"; + + private final Environment environment; + private final List serverActors; + + private ProtocolInfo defaultProtocol; + private final Map protocol2Transporter = Maps.newHashMap(); + + public PowerTransportService(List serverActors, Environment environment) { + this.serverActors = serverActors; + this.environment = environment; + } + + @Override + public ProtocolInfo defaultProtocol() { + return defaultProtocol; + } + + private ProtocolInfo fetchProtocolInfo(String protocol) { + // 兼容老版 worker 未上报 protocol 的情况 + protocol = compatibleProtocol(protocol); + final ProtocolInfo protocolInfo = protocol2Transporter.get(protocol); + if (protocolInfo == null) { + throw new IllegalArgumentException("can't find Transporter by protocol :" + protocol); + } + return protocolInfo; + } + + @Override + public void tell(String protocol, URL url, PowerSerializable request) { + fetchProtocolInfo(protocol).getTransporter().tell(url, request); + } + + @Override + public CompletionStage ask(String protocol, URL url, PowerSerializable request, Class clz) throws RemotingException { + return fetchProtocolInfo(protocol).getTransporter().ask(url, request, clz); + } + + private void initRemoteFrameWork(String protocol, int port) { + Address address = new Address() + .setHost(NetUtils.getLocalHost()) + .setPort(port); + EngineConfig engineConfig = new EngineConfig() + .setServerType(ServerType.SERVER) + .setType(protocol.toUpperCase()) + .setBindAddress(address) + .setActorList(Lists.newArrayList(serverActors)); + log.info("[PowerTransportService] start to initialize RemoteEngine[type={},address={}]", protocol, address); + RemoteEngine re = new PowerJobRemoteEngine(); + final EngineOutput engineOutput = re.start(engineConfig); + log.info("[PowerTransportService] start RemoteEngine[type={},address={}] successfully", protocol, address); + + this.protocol2Transporter.put(protocol, new ProtocolInfo(protocol, address.toFullAddress(), engineOutput.getTransporter())); + } + + @Override + public void afterPropertiesSet() throws Exception { + + log.info("[PowerTransportService] start to initialize whole PowerTransportService!"); + log.info("[PowerTransportService] activeProtocols: {}", activeProtocols); + + if (StringUtils.isEmpty(activeProtocols)) { + throw new IllegalArgumentException("activeProtocols can't be empty!"); + } + + for (String protocol : activeProtocols.split(OmsConstant.COMMA)) { + try { + final int port = parseProtocolPort(protocol); + initRemoteFrameWork(protocol, port); + } catch (Throwable t) { + log.error("[PowerTransportService] initialize protocol[{}] failed. If you don't need to use this protocol, you can turn it off by 'oms.transporter.active.protocols'", protocol); + ExceptionUtils.rethrow(t); + } + } + + choseDefault(); + + log.info("[PowerTransportService] initialize successfully!"); + log.info("[PowerTransportService] ALL_PROTOCOLS: {}", protocol2Transporter); + } + + /** + * 获取协议端口,考虑兼容性 & 用户仔细扩展的场景,选择动态从 env 获取 port + * @return port + */ + private int parseProtocolPort(String protocol) { + final String key1 = String.format(PROTOCOL_PORT_CONFIG, protocol.toLowerCase()); + final String key2 = String.format(PROTOCOL_PORT_CONFIG, protocol.toUpperCase()); + String portStr = environment.getProperty(key1); + if (StringUtils.isEmpty(portStr)) { + portStr = environment.getProperty(key2); + } + log.info("[PowerTransportService] fetch port for protocol[{}], key={}, value={}", protocol, key1, portStr); + + if (StringUtils.isEmpty(portStr)) { + throw new IllegalArgumentException(String.format("can't find protocol config by key: %s, please check your spring config!", key1)); + } + + return Integer.parseInt(portStr); + } + + private String compatibleProtocol(String p) { + if (p == null) { + return Protocol.AKKA.name(); + } + return p; + } + + /** + * HTTP 优先,否则默认取第一个协议 + */ + private void choseDefault() { + ProtocolInfo httpP = protocol2Transporter.get(Protocol.HTTP.name()); + if (httpP != null) { + log.info("[PowerTransportService] exist HTTP protocol, chose this as the default protocol!"); + this.defaultProtocol = httpP; + return; + } + + String firstProtocol = activeProtocols.split(OmsConstant.COMMA)[0]; + this.defaultProtocol = this.protocol2Transporter.get(firstProtocol); + log.info("[PowerTransportService] chose [{}] as the default protocol!", firstProtocol); + + if (this.defaultProtocol == null) { + throw new IllegalArgumentException("can't find default protocol, please check your config!"); + } + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ProtocolInfo.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ProtocolInfo.java new file mode 100644 index 00000000..c405d1b3 --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ProtocolInfo.java @@ -0,0 +1,28 @@ +package tech.powerjob.server.remote.tp; + +import lombok.Getter; +import lombok.ToString; +import tech.powerjob.remote.framework.transporter.Transporter; + +/** + * ProtocolInfo + * + * @author tjq + * @since 2023/1/21 + */ +@Getter +@ToString +public class ProtocolInfo { + + private final String protocol; + + private final String address; + + private final transient Transporter transporter; + + public ProtocolInfo(String protocol, String address, Transporter transporter) { + this.protocol = protocol; + this.address = address; + this.transporter = transporter; + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ServerURLFactory.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ServerURLFactory.java new file mode 100644 index 00000000..d8345ab0 --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/ServerURLFactory.java @@ -0,0 +1,52 @@ +package tech.powerjob.server.remote.tp; + +import tech.powerjob.remote.framework.base.Address; +import tech.powerjob.remote.framework.base.HandlerLocation; +import tech.powerjob.remote.framework.base.ServerType; +import tech.powerjob.remote.framework.base.URL; + +import static tech.powerjob.common.RemoteConstant.*; + +/** + * 统一生成地址 + * + * @author tjq + * @since 2023/1/21 + */ +public class ServerURLFactory { + + public static URL dispatchJob2Worker(String address) { + return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_RUN_JOB); + } + + public static URL stopInstance2Worker(String address) { + return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_STOP_INSTANCE); + } + + public static URL queryInstance2Worker(String address) { + return simileBuild(address, ServerType.WORKER, WORKER_PATH, WTT_HANDLER_QUERY_INSTANCE_STATUS); + } + + public static URL deployContainer2Worker(String address) { + return simileBuild(address, ServerType.WORKER, WORKER_PATH, WORKER_HANDLER_DEPLOY_CONTAINER); + } + + public static URL destroyContainer2Worker(String address) { + return simileBuild(address, ServerType.WORKER, WORKER_PATH, WORKER_HANDLER_DESTROY_CONTAINER); + } + + public static URL ping2Friend(String address) { + return simileBuild(address, ServerType.SERVER, S4S_PATH, S4S_HANDLER_PING); + } + + public static URL process2Friend(String address) { + return simileBuild(address, ServerType.SERVER, S4S_PATH, S4S_HANDLER_PROCESS); + } + + public static URL simileBuild(String address, ServerType type, String rootPath, String handlerPath) { + return new URL() + .setServerType(type) + .setAddress(Address.fromIpv4(address)) + .setLocation(new HandlerLocation().setRootPath(rootPath).setMethodPath(handlerPath)); + } +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/TransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/TransportService.java new file mode 100644 index 00000000..67473326 --- /dev/null +++ b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/tp/TransportService.java @@ -0,0 +1,29 @@ +package tech.powerjob.server.remote.tp; + +import tech.powerjob.common.PowerSerializable; +import tech.powerjob.remote.framework.base.RemotingException; +import tech.powerjob.remote.framework.base.URL; + +import java.util.concurrent.CompletionStage; + +/** + * server 数据传输服务 + * + * @author tjq + * @since 2023/1/21 + */ +public interface TransportService { + + /** + * 自用地址,用于维护 server -> appId 和 server 间通讯 + * 4.3.0 前为 ActorSystem Address(ip:10086) + * 4.3.0 后 PowerJob 将主协议切换为 HTTP,使用 HTTP address (ip:10010) + * @return 自用地址 + */ + ProtocolInfo defaultProtocol(); + + void tell(String protocol, URL url, PowerSerializable request); + + CompletionStage ask(String protocol, URL url, PowerSerializable request, Class clz) throws RemotingException; + +} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/TransportService.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/TransportService.java deleted file mode 100644 index 3ee154d5..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/TransportService.java +++ /dev/null @@ -1,66 +0,0 @@ -package tech.powerjob.server.remote.transport; - -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.response.AskResponse; -import com.google.common.collect.Maps; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.List; -import java.util.Map; - -/** - * TransportService - * - * @author tjq - * @since 2021/2/7 - */ -@Slf4j -@Service -public class TransportService { - - private static final Map protocol2Address = Maps.newHashMap(); - - @Getter - private final Map protocol2Transporter = Maps.newConcurrentMap(); - - @Autowired - public TransportService(List transporters) { - transporters.forEach(t -> { - log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress()); - protocol2Transporter.put(t.getProtocol(), t); - protocol2Address.put(t.getProtocol(), t.getAddress()); - }); - } - - public void tell(Protocol protocol, String address, PowerSerializable object) { - getTransporter(protocol).tell(address, object); - } - - public AskResponse ask(Protocol protocol, String address, PowerSerializable object) throws Exception { - - return getTransporter(protocol).ask(address, object); - } - - public Transporter getTransporter(Protocol protocol) { - Transporter transporter = protocol2Transporter.get(protocol); - if (transporter == null) { - log.error("[TransportService] can't find transporter by protocol[{}], this is a bug!", protocol); - throw new UnknownProtocolException("can't find transporter by protocol: " + protocol); - } - return transporter; - } - - public static class UnknownProtocolException extends RuntimeException { - public UnknownProtocolException(String message) { - super(message); - } - } - - public static Map getAllAddress() { - return protocol2Address; - } -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/Transporter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/Transporter.java deleted file mode 100644 index 6d77dd69..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/Transporter.java +++ /dev/null @@ -1,22 +0,0 @@ -package tech.powerjob.server.remote.transport; - -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.response.AskResponse; - -/** - * Transporter - * - * @author tjq - * @since 2021/2/7 - */ -public interface Transporter { - - Protocol getProtocol(); - - String getAddress(); - - void tell(String address, PowerSerializable object); - - AskResponse ask(String address, PowerSerializable object) throws Exception; -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/AkkaTransporter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/AkkaTransporter.java deleted file mode 100644 index 3a7ae9a1..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/AkkaTransporter.java +++ /dev/null @@ -1,48 +0,0 @@ -package tech.powerjob.server.remote.transport.impl; - -import akka.actor.ActorSelection; -import akka.pattern.Patterns; -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.server.remote.transport.Transporter; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import org.springframework.stereotype.Service; - -import java.time.Duration; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -/** - * akka transporter - * - * @author tjq - * @since 2021/2/7 - */ -@Service -public class AkkaTransporter implements Transporter { - - @Override - public Protocol getProtocol() { - return Protocol.AKKA; - } - - @Override - public String getAddress() { - return AkkaStarter.getActorSystemAddress(); - } - - @Override - public void tell(String address, PowerSerializable object) { - ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address); - taskTrackerActor.tell(object, null); - } - - @Override - public AskResponse ask(String address, PowerSerializable object) throws Exception { - ActorSelection taskTrackerActor = AkkaStarter.getWorkerActor(address); - CompletionStage askCS = Patterns.ask(taskTrackerActor, object, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - return (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/HttpTransporter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/HttpTransporter.java deleted file mode 100644 index 365b60a5..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/impl/HttpTransporter.java +++ /dev/null @@ -1,73 +0,0 @@ -package tech.powerjob.server.remote.transport.impl; - -import tech.powerjob.common.PowerSerializable; -import tech.powerjob.common.enums.Protocol; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.response.AskResponse; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.remote.transport.Transporter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.buffer.Buffer; -import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; -import org.springframework.stereotype.Service; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -/** - * http transporter powered by vert.x - * - * @author tjq - * @since 2021/2/8 - */ -@Slf4j -@Service -public class HttpTransporter implements Transporter { - - private final WebClient webClient; - - public HttpTransporter() { - WebClientOptions options = new WebClientOptions() - .setKeepAlive(false) - .setConnectTimeout((int) RemoteConstant.DEFAULT_TIMEOUT_MS); - webClient = WebClient.create(Vertx.vertx(), options); - } - - @Override - public Protocol getProtocol() { - return Protocol.HTTP; - } - - @Override - public String getAddress() { - return VertXStarter.getAddress(); - } - - @Override - public void tell(String address, PowerSerializable object) { - postRequest(address, object); - } - - @Override - public AskResponse ask(String address, PowerSerializable object) throws Exception { - CompletableFuture> future = postRequest(address, object).toCompletionStage().toCompletableFuture(); - HttpResponse httpResponse = future.get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - return httpResponse.bodyAsJson(AskResponse.class); - } - - private Future> postRequest(String address, PowerSerializable object) { - Pair ipAndPort = NetUtils.splitAddress2IpAndPort(address); - String ip = ipAndPort.getLeft(); - int port = ipAndPort.getRight(); - return webClient.post(port, ip, object.path()) - .sendJson(object) - .onSuccess(res -> log.info("[HttpTransporter] send request to {}{} successfully: {}, response: {}", address, object.path(), object, res)) - .onFailure(t -> log.warn("[HttpTransporter] send request to {}{} failed: {}", address, object.path(), object, t)); - } -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java deleted file mode 100644 index de54e273..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/AkkaStarter.java +++ /dev/null @@ -1,88 +0,0 @@ -package tech.powerjob.server.remote.transport.starter; - -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.RemoteConstant; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.PowerJobServerConfigKey; -import tech.powerjob.server.common.utils.PropertyUtils; -import tech.powerjob.server.remote.server.FriendRequestHandler; - -import java.util.Map; -import java.util.Properties; - -/** - * 服务端 ActorSystem 启动器 - * - * @author tjq - * @since 2020/4/2 - */ -@Slf4j -public class AkkaStarter { - - public static ActorSystem actorSystem; - @Getter - private static String actorSystemAddress; - - private static final String AKKA_PATH = "akka://%s@%s/user/%s"; - - public static void init() { - - Stopwatch stopwatch = Stopwatch.createStarted(); - log.info("[PowerJob] PowerJob's akka system start to bootstrap..."); - - // 忽略了一个问题,机器是没办法访问外网的,除非架设自己的NTP服务器 - // TimeUtils.check(); - - // 解析配置文件 - Config akkaFinalConfig = parseConfig(); - actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(FriendRequestHandler.defaultProps(), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); - log.info("[PowerJob] PowerJob's akka system started successfully, using time {}.", stopwatch); - } - - private static Config parseConfig() { - Properties properties = PropertyUtils.getProperties(); - int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_AKKA_PORT))); - String portFromJvm = System.getProperty(PowerJobServerConfigKey.AKKA_PORT); - if (StringUtils.isNotEmpty(portFromJvm)) { - log.info("[PowerJob] use port from jvm params: {}", portFromJvm); - port = Integer.parseInt(portFromJvm); - } - - // 启动 ActorSystem - Map overrideConfig = Maps.newHashMap(); - String localIp = NetUtils.getLocalHost(); - overrideConfig.put("akka.remote.artery.canonical.hostname", localIp); - overrideConfig.put("akka.remote.artery.canonical.port", port); - actorSystemAddress = localIp + ":" + port; - log.info("[PowerJob] akka-remote server address: {}", actorSystemAddress); - - Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.SERVER_AKKA_CONFIG_NAME); - return ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); - } - - /** - * 获取 ServerActor 的 ActorSelection - * - * @param address IP:port - * @return ActorSelection - */ - public static ActorSelection getFriendActor(String address) { - String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME); - return actorSystem.actorSelection(path); - } - - public static ActorSelection getWorkerActor(String address) { - String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.WORKER_ACTOR_NAME); - return actorSystem.actorSelection(path); - } -} diff --git a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/VertXStarter.java b/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/VertXStarter.java deleted file mode 100644 index 5f419c26..00000000 --- a/powerjob-server/powerjob-server-remote/src/main/java/tech/powerjob/server/remote/transport/starter/VertXStarter.java +++ /dev/null @@ -1,47 +0,0 @@ -package tech.powerjob.server.remote.transport.starter; - -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.utils.NetUtils; -import tech.powerjob.server.common.PowerJobServerConfigKey; -import tech.powerjob.server.common.utils.PropertyUtils; -import com.google.common.base.Stopwatch; -import io.vertx.core.Vertx; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -import java.util.Properties; - -/** - * vert.x starter - * - * @author tjq - * @since 2021/2/8 - */ -@Slf4j -public class VertXStarter { - - public static Vertx vertx; - @Getter - private static String address; - - public static void init() { - Stopwatch stopwatch = Stopwatch.createStarted(); - log.info("[PowerJob] PowerJob's vert.x system start to bootstrap..."); - - Properties properties = PropertyUtils.getProperties(); - int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.HTTP_PORT, String.valueOf(OmsConstant.SERVER_DEFAULT_HTTP_PORT))); - String portFromJVM = System.getProperty(PowerJobServerConfigKey.HTTP_PORT); - if (StringUtils.isNotEmpty(portFromJVM)) { - port = Integer.parseInt(portFromJVM); - } - String localIP = NetUtils.getLocalHost(); - - address = localIP + ":" + port; - log.info("[PowerJob] vert.x server address: {}", address); - - vertx = Vertx.vertx(); - - log.info("[PowerJob] PowerJob's vert.x system started successfully, using time {}.", stopwatch); - } -} diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/PowerJobServerApplication.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/PowerJobServerApplication.java index 22a660af..75af8667 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/PowerJobServerApplication.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/PowerJobServerApplication.java @@ -1,8 +1,6 @@ package tech.powerjob.server; import tech.powerjob.server.common.utils.PropertyUtils; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.remote.transport.starter.VertXStarter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -30,9 +28,6 @@ public class PowerJobServerApplication { pre(); - AkkaStarter.init(); - VertXStarter.init(); - // Start SpringBoot application. try { SpringApplication.run(PowerJobServerApplication.class, args); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java index 8f13e0ef..4b1aca3e 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ContainerController.java @@ -1,20 +1,5 @@ package tech.powerjob.server.web.controller; -import tech.powerjob.common.OmsConstant; -import tech.powerjob.common.response.ResultDTO; -import tech.powerjob.server.remote.transport.starter.AkkaStarter; -import tech.powerjob.server.common.constants.ContainerSourceType; -import tech.powerjob.server.common.constants.SwitchableStatus; -import tech.powerjob.server.core.container.ContainerTemplateGenerator; -import tech.powerjob.server.common.utils.OmsFileUtils; -import tech.powerjob.server.persistence.remote.model.AppInfoDO; -import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; -import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; -import tech.powerjob.server.core.container.ContainerService; -import tech.powerjob.server.web.request.GenerateContainerTemplateRequest; -import tech.powerjob.server.web.request.SaveContainerInfoRequest; -import tech.powerjob.server.web.response.ContainerInfoVO; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.DateFormatUtils; @@ -22,8 +7,21 @@ import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; +import tech.powerjob.common.OmsConstant; +import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.server.common.constants.ContainerSourceType; +import tech.powerjob.server.common.constants.SwitchableStatus; +import tech.powerjob.server.common.utils.OmsFileUtils; +import tech.powerjob.server.core.container.ContainerService; +import tech.powerjob.server.core.container.ContainerTemplateGenerator; +import tech.powerjob.server.persistence.remote.model.AppInfoDO; +import tech.powerjob.server.persistence.remote.model.ContainerInfoDO; +import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; +import tech.powerjob.server.persistence.remote.repository.ContainerInfoRepository; +import tech.powerjob.server.web.request.GenerateContainerTemplateRequest; +import tech.powerjob.server.web.request.SaveContainerInfoRequest; +import tech.powerjob.server.web.response.ContainerInfoVO; -import javax.annotation.Resource; import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.IOException; @@ -114,17 +112,6 @@ public class ContainerController { return ResultDTO.failed("No workers have even registered!"); } - // 转发 HTTP 请求 - if (!AkkaStarter.getActorSystemAddress().equals(targetServer)) { - String targetIp = targetServer.split(":")[0]; - String url = String.format("http://%s:%d/container/listDeployedWorker?appId=%d&containerId=%d", targetIp, port, appId, containerId); - try { - response.sendRedirect(url); - return ResultDTO.success(null); - }catch (Exception e) { - return ResultDTO.failed(e); - } - } return ResultDTO.success(containerService.fetchDeployedInfo(appId, containerId)); } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index ada346e3..aa844214 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -13,7 +13,7 @@ import tech.powerjob.common.utils.NetUtils; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; import tech.powerjob.server.remote.server.election.ServerElectionService; -import tech.powerjob.server.remote.transport.TransportService; +import tech.powerjob.server.remote.tp.TransportService; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import java.util.Optional; @@ -55,7 +55,7 @@ public class ServerController { public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject(); res.put("localHost", NetUtils.getLocalHost()); - res.put("communicationSystemInfo", transportService.getProtocol2Transporter()); + res.put("defaultAddress", transportService.defaultProtocol()); res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis())); res.put("serverTimeZone", TimeZone.getDefault().getDisplayName()); res.put("appIds", workerClusterQueryService.getAppId2ClusterStatus().keySet()); diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/application.properties b/powerjob-server/powerjob-server-starter/src/main/resources/application.properties index 9f65c568..72fd1ec8 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/application.properties +++ b/powerjob-server/powerjob-server-starter/src/main/resources/application.properties @@ -18,6 +18,7 @@ spring.main.allow-circular-references=true ###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ###### # Akka ActorSystem port. +oms.transporter.active.protocols=AKKA,HTTP oms.akka.port=10086 oms.http.port=10010 # Prefix for all tables. Default empty string. Config if you have needs, i.e. pj_ diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java index ac2f185f..3a7f1127 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/TransportUtils.java @@ -30,7 +30,7 @@ import static tech.powerjob.common.RemoteConstant.*; public class TransportUtils { public static void ttReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) { - final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address); + final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_INSTANCE_STATUS, address); transporter.tell(url, req); } @@ -55,12 +55,12 @@ public class TransportUtils { } public static void reportLogs(WorkerLogReportReq req, String address, Transporter transporter) { - final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_LOG, address); + final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_LOG, address); transporter.tell(url, req); } public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) { - final URL url = easyBuildUrl(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_HEARTBEAT, address); + final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address); transporter.tell(url, req); } @@ -83,17 +83,17 @@ public class TransportUtils { @SneakyThrows public static boolean reliableTtReportInstanceStatus(TaskTrackerReportInstanceStatusReq req, String address, Transporter transporter) { - return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter).isSuccess(); + return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_REPORT_INSTANCE_STATUS, address, req, transporter).isSuccess(); } @SneakyThrows public static AskResponse reliableQueryJobCluster(WorkerQueryExecutorClusterReq req, String address, Transporter transporter) { - return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter); + return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_QUERY_JOB_CLUSTER, address, req, transporter); } @SneakyThrows public static AskResponse reliableQueryContainerInfo(WorkerNeedDeployContainerRequest req, String address, Transporter transporter) { - return reliableAsk(ServerType.SERVER, SERVER_PATH, SERVER_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter); + return reliableAsk(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_NEED_DEPLOY_CONTAINER, address, req, transporter); } private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {