diff --git a/README.md b/README.md index 2eaabf71..62e8fa8f 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,21 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, * 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。 * 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。 -[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn) ### 适用场景 * 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。 * 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。 * 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。 +* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。 + +### 设计目标 +PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。 + +### 在线试用 +试用地址:[try.powerjob.tech](http://try.powerjob.tech/) +试用应用名称:powerjob-agent-test +控制台密码:123 + +[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn) ### 同类产品对比 | | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob | @@ -43,7 +53,9 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架, # 文档 -**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)** +**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** + +**[Document](https://www.yuque.com/powerjob/en/xrdoqw)** PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献! diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index 3ae451b5..54d00ad5 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -66,9 +66,11 @@ public class OhMyClient { appId = Long.parseLong(resultDTO.getData().toString()); currentAddress = addr; break; + }else { + throw new OmsException(resultDTO.getMessage()); } } - }catch (Exception ignore) { + }catch (IOException ignore) { } } diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index 3106ea01..f1f7dec8 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -21,7 +21,7 @@ public class TestClient { @BeforeAll public static void initClient() throws Exception { - ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null); + ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123"); } @Test @@ -70,7 +70,7 @@ public class TestClient { @Test public void testRunJob() throws Exception { - System.out.println(ohMyClient.runJob(8L, "this is instanceParams", 20)); + System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000)); } @Test diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index 6acb41fb..fc83f9a2 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; @@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor { Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId()); AskResponse askResponse = new AskResponse(); - askResponse.setSuccess(false); - if (containerInfoOpt.isPresent()) { + if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) { + askResponse.setSuccess(false); + askResponse.setMessage("can't find container by id: " + req.getContainerId()); + }else { ContainerInfoDO containerInfo = containerInfoOpt.get(); askResponse.setSuccess(true); @@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor { askResponse.setData(JsonUtils.toBytes(dpReq)); } - getSender().tell(askResponse, getSelf()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java index 941b12e9..b1077bb1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java @@ -45,9 +45,9 @@ public class HashedWheelTimer implements Timer { * 新建时间轮定时器 * @param tickDuration 时间间隔,单位毫秒(ms) * @param ticksPerWheel 轮盘个数 - * @param processTaskNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) + * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池) */ - public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) { + public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) { this.tickDuration = tickDuration; @@ -60,12 +60,13 @@ public class HashedWheelTimer implements Timer { mask = wheel.length - 1; // 初始化执行线程池 - if (processTaskNum <= 0) { + if (processThreadNum <= 0) { taskProcessPool = null; }else { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build(); BlockingQueue queue = Queues.newLinkedBlockingQueue(16); - taskProcessPool = new ThreadPoolExecutor(2, processTaskNum, + int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum); + taskProcessPool = new ThreadPoolExecutor(core, 2 * core, 60, TimeUnit.SECONDS, queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java index 6d3e8bb2..1a7508ed 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java @@ -59,7 +59,7 @@ public class SystemInfoController { } String server =appInfoOpt.get().getCurrentServer(); - // 没有Server + // 没有 Server,说明从来没有该 appId 的 worker 集群连接过 if (StringUtils.isEmpty(server)) { return ResultDTO.success(Collections.emptyList()); } diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java index 55426d59..3cc8b8ca 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java @@ -31,7 +31,7 @@ public class OhMySchedulerConfig { // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); config.setPort(port); - config.setAppName("powerjob"); + config.setAppName("powerjob-agent-test"); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 // 为了本地模拟多个实例,只能使用 MEMORY 启动(文件只能由一个应用占有) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java index 0dffc768..7661f7e7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java @@ -39,16 +39,21 @@ public class OmsContainerFactory { /** * 获取容器 * @param containerId 容器ID + * @param loadFromServer 当本地不存在时尝试从 server 加载 * @return 容器示例,可能为 null */ - public static OmsContainer getContainer(Long containerId) { + public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) { OmsContainer omsContainer = CARGO.get(containerId); if (omsContainer != null) { return omsContainer; } - // 尝试下载 + if (!loadFromServer) { + return null; + } + + // 尝试从 server 加载 log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId); WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId); @@ -66,6 +71,8 @@ public class OmsContainerFactory { ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class); log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId); deployContainer(deployRequest); + }else { + log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage()); } }catch (Exception e) { log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString()); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java index f1eb08f9..6dfacce6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java @@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer { // 需要满足的条件:引用计数器减为0 & 有更新的容器出现 if (referenceCount.decrementAndGet() <= 0) { - OmsContainer container = OmsContainerFactory.getContainer(containerId); + OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false); if (container != this) { try { destroy(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index cc85bf8a..2fd99c0e 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -309,9 +309,11 @@ public class ProcessorTracker { String[] split = processorInfo.split("#"); log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]); - omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0])); + omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true); if (omsContainer != null) { processor = omsContainer.getProcessor(split[1]); + }else { + log.warn("[ProcessorTracker-{}] load container failed.", instanceId); } break; default: