diff --git a/README.md b/README.md
index 2eaabf71..62e8fa8f 100644
--- a/README.md
+++ b/README.md
@@ -22,11 +22,21 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
-[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn)
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。
+* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
+
+### 设计目标
+PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server,旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
+
+### 在线试用
+试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
+试用应用名称:powerjob-agent-test
+控制台密码:123
+
+[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
@@ -43,7 +53,9 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,
# 文档
-**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)**
+**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
+
+**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!
diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml
index 43c59747..376f6977 100644
--- a/powerjob-client/pom.xml
+++ b/powerjob-client/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-client
- 3.1.2
+ 3.1.3
jar
- 3.1.2
+ 3.1.3
5.6.1
diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
index b6ee21b8..54d00ad5 100644
--- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
+++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java
@@ -66,9 +66,11 @@ public class OhMyClient {
appId = Long.parseLong(resultDTO.getData().toString());
currentAddress = addr;
break;
+ }else {
+ throw new OmsException(resultDTO.getMessage());
}
}
- }catch (Exception ignore) {
+ }catch (IOException ignore) {
}
}
@@ -173,13 +175,15 @@ public class OhMyClient {
* 运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例的参数
+ * @param delayMS 延迟时间,单位毫秒
* @return 任务实例ID(instanceId)
* @throws Exception 异常
*/
- public ResultDTO runJob(Long jobId, String instanceParams) throws Exception {
+ public ResultDTO runJob(Long jobId, String instanceParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
- .add("appId", appId.toString());
+ .add("appId", appId.toString())
+ .add("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
@@ -188,7 +192,7 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO runJob(Long jobId) throws Exception {
- return runJob(jobId, null);
+ return runJob(jobId, null, 0);
}
/* ************* Instance 区 ************* */
diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java
index 2306dffc..f1f7dec8 100644
--- a/powerjob-client/src/test/java/TestClient.java
+++ b/powerjob-client/src/test/java/TestClient.java
@@ -21,7 +21,7 @@ public class TestClient {
@BeforeAll
public static void initClient() throws Exception {
- ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
+ ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
@@ -70,7 +70,7 @@ public class TestClient {
@Test
public void testRunJob() throws Exception {
- System.out.println(ohMyClient.runJob(8L, "this is instanceParams"));
+ System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
}
@Test
diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml
index 625fa6db..b4ebcc1f 100644
--- a/powerjob-common/pom.xml
+++ b/powerjob-common/pom.xml
@@ -10,7 +10,7 @@
4.0.0
powerjob-common
- 3.1.2
+ 3.1.3
jar
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java
index aa7ea83f..718f543f 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/ServerDestroyContainerRequest.java
@@ -15,5 +15,5 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class ServerDestroyContainerRequest implements OmsSerializable {
- private String containerName;
+ private Long containerId;
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java
index b2fa7f78..ca3b5c2c 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java
@@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
+import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import java.util.List;
@@ -76,4 +77,14 @@ public class SaveJobInfoRequest {
// 报警用户ID列表
private List notifyUserIds;
+
+
+ public void valid() {
+ CommonUtils.requireNonNull(jobName, "jobName can't be empty");
+ CommonUtils.requireNonNull(appId, "appId can't be empty");
+ CommonUtils.requireNonNull(processorInfo, "processorInfo can't be empty");
+ CommonUtils.requireNonNull(executeType, "executeType can't be empty");
+ CommonUtils.requireNonNull(processorType, "processorType can't be empty");
+ CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
+ }
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
index ec06754a..3e0e03e1 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveWorkflowRequest.java
@@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
+import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
@@ -43,4 +44,11 @@ public class SaveWorkflowRequest {
// 工作流整体失败的报警
private List notifyUserIds = Lists.newLinkedList();
+
+ public void valid() {
+ CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
+ CommonUtils.requireNonNull(appId, "appId can't be empty");
+ CommonUtils.requireNonNull(pEWorkflowDAG, "dag can't be empty");
+ CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
+ }
}
diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
index 8126dde8..52277188 100644
--- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
+++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java
@@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.common.utils;
+import com.github.kfcfans.powerjob.common.OmsException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import java.util.Collection;
+import java.util.Objects;
import java.util.function.Supplier;
@@ -114,4 +117,16 @@ public class CommonUtils {
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
+ public static T requireNonNull(T obj, String msg) {
+ if (obj == null) {
+ throw new OmsException(msg);
+ }
+ if (obj instanceof String) {
+ if (StringUtils.isEmpty((String) obj)) {
+ throw new OmsException(msg);
+ }
+ }
+ return obj;
+ }
+
}
diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml
index 0eee88e4..7f3cdbf5 100644
--- a/powerjob-server/pom.xml
+++ b/powerjob-server/pom.xml
@@ -10,13 +10,13 @@
4.0.0
powerjob-server
- 3.1.2
+ 3.1.3
jar
2.9.2
2.2.6.RELEASE
- 3.1.2
+ 3.1.3
8.0.19
1.4.200
2.5.2
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java
index 6acb41fb..fc83f9a2 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java
@@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
+import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor {
Optional containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
- askResponse.setSuccess(false);
- if (containerInfoOpt.isPresent()) {
+ if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
+ askResponse.setSuccess(false);
+ askResponse.setMessage("can't find container by id: " + req.getContainerId());
+ }else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
@@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor {
askResponse.setData(JsonUtils.toBytes(dpReq));
}
-
getSender().tell(askResponse, getSelf());
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java
deleted file mode 100644
index f93b4e90..00000000
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/constans/ContainerStatus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package com.github.kfcfans.powerjob.server.common.constans;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * 容器状态
- * 由于命名约束,准备采取硬删除策略
- *
- * @author tjq
- * @since 2020/5/15
- */
-@Getter
-@AllArgsConstructor
-public enum ContainerStatus {
-
- ENABLE(1),
- DISABLE(2),
- DELETED(99);
-
- private int v;
-
- public static ContainerStatus of(int v) {
- for (ContainerStatus type : values()) {
- if (type.v == v) {
- return type;
- }
- }
- throw new IllegalArgumentException("unknown ContainerStatus of " + v);
- }
-}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java
index e753cd49..de59f0f5 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/CronExpression.java
@@ -1,5 +1,19 @@
package com.github.kfcfans.powerjob.server.common.utils;
+/*
+Copyright [2020] [KFCFans]
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ */
import java.io.Serializable;
import java.text.ParseException;
import java.util.Calendar;
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
index 41960391..b1077bb1 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/HashedWheelTimer.java
@@ -37,8 +37,6 @@ public class HashedWheelTimer implements Timer {
private final ExecutorService taskProcessPool;
- private static final int MAXIMUM_CAPACITY = 1 << 30;
-
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
@@ -47,9 +45,9 @@ public class HashedWheelTimer implements Timer {
* 新建时间轮定时器
* @param tickDuration 时间间隔,单位毫秒(ms)
* @param ticksPerWheel 轮盘个数
- * @param processTaskNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
+ * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
*/
- public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) {
+ public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
@@ -62,12 +60,13 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1;
// 初始化执行线程池
- if (processTaskNum <= 0) {
+ if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue queue = Queues.newLinkedBlockingQueue(16);
- taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
+ int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
+ taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java
index 61ec9ed6..68f50609 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/common/utils/timewheel/TimerFuture.java
@@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
/**
- * description
+ * TimerFuture
*
* @author tjq
* @since 2020/4/3
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java
index 1f913039..f1a4c916 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ContainerInfoRepository.java
@@ -13,5 +13,5 @@ import java.util.List;
*/
public interface ContainerInfoRepository extends JpaRepository {
- List findByAppId(Long appId);
+ List findByAppIdAndStatusNot(Long appId, Integer status);
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java
index aaf05ac8..0f10b0b4 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ContainerService.java
@@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
+import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@@ -85,6 +86,9 @@ public class ContainerService {
* @param request 容器保存请求
*/
public void save(SaveContainerInfoRequest request) {
+
+ request.valid();
+
ContainerInfoDO container;
Long originId = request.getId();
if (originId != null) {
@@ -120,15 +124,17 @@ public class ContainerService {
throw new RuntimeException("Permission Denied!");
}
- ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getContainerName());
+ ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
workerActor.tell(destroyRequest, null);
});
log.info("[ContainerService] delete container: {}.", container);
- // 硬删除算了...留着好像也没什么用
- containerInfoRepository.deleteById(containerId);
+ // 软删除
+ container.setStatus(SwitchableStatus.DELETED.getV());
+ container.setGmtModified(new Date());
+ containerInfoRepository.saveAndFlush(container);
}
/**
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
index 6c187d45..b485edf2 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/JobService.java
@@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
+import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@@ -21,6 +22,7 @@ import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* 任务服务
@@ -50,6 +52,8 @@ public class JobService {
*/
public Long saveJob(SaveJobInfoRequest request) throws Exception {
+ request.valid();
+
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
@@ -94,16 +98,22 @@ public class JobService {
* 手动立即运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例参数(仅 OpenAPI 存在)
+ * @param delay 延迟时间,单位 毫秒
* @return 任务实例ID
*/
- public long runJob(Long jobId, String instanceParams) {
+ public long runJob(Long jobId, String instanceParams, long delay) {
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
-
- Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis());
+ Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
- dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
+ if (delay <= 0) {
+ dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
+ }else {
+ HashedWheelTimerHolder.TIMER.schedule(() -> {
+ dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
+ }, delay, TimeUnit.MILLISECONDS);
+ }
return instanceId;
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
index 5b232a3e..e9037176 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/ha/ServerSelectService.java
@@ -43,7 +43,7 @@ public class ServerSelectService {
/**
* 获取某个应用对应的Server
- * 缺点:如果server死而复生,可能造成worker集群脑裂,不过感觉影响不是很大 & 概率极低,就不管了
+ *
* @param appId 应用ID
* @return 当前可用的Server
*/
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
index 0ebcd68a..d4aa4efd 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java
@@ -258,7 +258,7 @@ public class OmsScheduleService {
}
log.info("[FrequentScheduler] These frequent jobs will be scheduled: {}.", notRunningJobIds);
- notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null));
+ notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null, 0));
}catch (Exception e) {
log.error("[FrequentScheduler] schedule frequent job failed.", e);
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
index c2884f65..63e224b5 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowService.java
@@ -39,6 +39,8 @@ public class WorkflowService {
*/
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
+ req.valid();
+
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG");
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java
index c11eae81..bd0453a4 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ContainerController.java
@@ -4,7 +4,7 @@ import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
-import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
+import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
@@ -87,7 +87,8 @@ public class ContainerController {
@GetMapping("/list")
public ResultDTO> listContainers(Long appId) {
- List res = containerInfoRepository.findByAppId(appId).stream().map(ContainerController::convert).collect(Collectors.toList());
+ List res = containerInfoRepository.findByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())
+ .stream().map(ContainerController::convert).collect(Collectors.toList());
return ResultDTO.success(res);
}
@@ -122,7 +123,7 @@ public class ContainerController {
}else {
vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN));
}
- ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus());
+ SwitchableStatus status = SwitchableStatus.of(containerInfoDO.getStatus());
vo.setStatus(status.name());
ContainerSourceType sourceType = ContainerSourceType.of(containerInfoDO.getSourceType());
vo.setSourceType(sourceType.name());
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java
index 34b476e9..c8afcab9 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java
@@ -63,7 +63,7 @@ public class JobController {
@GetMapping("/run")
public ResultDTO runImmediately(String jobId) {
- return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null));
+ return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null, 0));
}
@PostMapping("/list")
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
index df4662c2..b9519f9c 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/OpenAPIController.java
@@ -80,9 +80,9 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.RUN_JOB)
- public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) {
+ public ResultDTO runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) {
checkJobIdValid(jobId, appId);
- return ResultDTO.success(jobService.runJob(jobId, instanceParams));
+ return ResultDTO.success(jobService.runJob(jobId, instanceParams, delay == null ? 0 : delay));
}
/* ************* Instance 区 ************* */
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java
index 6a3c2588..768a6f71 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java
@@ -46,4 +46,9 @@ public class ServerController {
return ResultDTO.success(server);
}
+ @GetMapping("/hello")
+ public ResultDTO ping() {
+ return ResultDTO.success("this is powerjob-server~");
+ }
+
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java
index 6d3e8bb2..1a7508ed 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/SystemInfoController.java
@@ -59,7 +59,7 @@ public class SystemInfoController {
}
String server =appInfoOpt.get().getCurrentServer();
- // 没有Server
+ // 没有 Server,说明从来没有该 appId 的 worker 集群连接过
if (StringUtils.isEmpty(server)) {
return ResultDTO.success(Collections.emptyList());
}
diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java
index 5e2536c7..b56837c1 100644
--- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java
+++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/request/SaveContainerInfoRequest.java
@@ -1,7 +1,8 @@
package com.github.kfcfans.powerjob.server.web.request;
+import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
-import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
+import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import lombok.Data;
/**
@@ -28,5 +29,11 @@ public class SaveContainerInfoRequest {
private String sourceInfo;
// 状态,枚举值为 ContainerStatus(ENABLE/DISABLE)
- private ContainerStatus status;
+ private SwitchableStatus status;
+
+ public void valid() {
+ CommonUtils.requireNonNull(containerName, "containerName can't be empty");
+ CommonUtils.requireNonNull(appId, "appId can't be empty");
+ CommonUtils.requireNonNull(sourceInfo, "sourceInfo can't be empty");
+ }
}
diff --git a/powerjob-server/src/main/resources/static/js/2.js b/powerjob-server/src/main/resources/static/js/2.js
index 2337571d..876f03dc 100644
--- a/powerjob-server/src/main/resources/static/js/2.js
+++ b/powerjob-server/src/main/resources/static/js/2.js
@@ -8,7 +8,7 @@
/***/ (function(module, __webpack_exports__, __webpack_require__) {
"use strict";
-eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! core-js/modules/es.array.includes */ \"./node_modules/core-js/modules/es.array.includes.js\");\n/* harmony import */ var core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0__);\n/* harmony import */ var core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1__ = __webpack_require__(/*! core-js/modules/es.array.splice */ \"./node_modules/core-js/modules/es.array.splice.js\");\n/* harmony import */ var core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1__);\n/* harmony import */ var core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2__ = __webpack_require__(/*! core-js/modules/es.regexp.exec */ \"./node_modules/core-js/modules/es.regexp.exec.js\");\n/* harmony import */ var core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2__);\n/* harmony import */ var core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3__ = __webpack_require__(/*! core-js/modules/es.string.includes */ \"./node_modules/core-js/modules/es.string.includes.js\");\n/* harmony import */ var core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3__);\n/* harmony import */ var core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4__ = __webpack_require__(/*! core-js/modules/es.string.replace */ \"./node_modules/core-js/modules/es.string.replace.js\");\n/* harmony import */ var core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4__);\n/* harmony import */ var core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5__ = __webpack_require__(/*! core-js/modules/es.string.split */ \"./node_modules/core-js/modules/es.string.split.js\");\n/* harmony import */ var core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5__);\n/* harmony import */ var _main__WEBPACK_IMPORTED_MODULE_6__ = __webpack_require__(/*! ../../main */ \"./src/main.js\");\n\n\n\n\n\n\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n\nvar ws;\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"ContainerManager\",\n data: function data() {\n return {\n form: {\n sourceType: 'Git',\n containerName: ''\n },\n gitForm: {\n repo: '',\n branch: '',\n username: '',\n password: ''\n },\n sourceInfo: '',\n id: '',\n appId: this.$store.state.appInfo.id,\n dialogVisible: false,\n arrangeTitle: '',\n arrangeVisible: false,\n containerList: [],\n logs: [],\n requestUrl: \"\",\n fileList: []\n };\n },\n methods: {\n onSubmit: function onSubmit() {\n var _this = this;\n\n // 接口参数\n var data = {\n appId: this.appId,\n containerName: this.form.containerName,\n status: \"ENABLE\",\n id: this.id,\n sourceType: this.form.sourceType\n };\n\n if (this.form.sourceType == 'Git') {\n data.sourceInfo = JSON.stringify(this.gitForm);\n } else {\n data.sourceInfo = this.sourceInfo;\n data.sourceType = 'FatJar';\n }\n\n this.flyio.post(\"container/save\", data).then(function (res) {\n if (res.data.success) {\n var appId = _this.$store.state.appInfo.id;\n\n _this.flyio.get(\"/container/list?appId=\" + appId).then(function (res) {\n if (res.data.success) {\n _this.$message.info(_this.$t('message.success')); // 恢复默认表单\n\n\n _this.dialogVisible = false;\n _this.form.containerName = '';\n _this.gitForm = {};\n _this.sourceInfo = '';\n _this.id = ''; // 刷新容器表单\n\n _this.containerList = res.data.data;\n }\n });\n } else {\n _this.$message.warning(_this.$t('message.failed'));\n }\n });\n },\n // 文件上传成功后 修改来源信息\n onSuccess: function onSuccess(response) {\n this.sourceInfo = response.data;\n },\n deleteItem: function deleteItem(item, index) {\n var _this2 = this;\n\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/delete?containerId=\" + item.id + '&appId=' + appId).then(function (res) {\n console.log(res);\n\n _this2.containerList.splice(index, 1);\n\n _this2.$message.info(_this2.$t('message.success'));\n });\n },\n editItem: function editItem(item) {\n if (item.sourceType == 'Git') {\n this.form.sourceType = 'Git';\n this.gitForm = JSON.parse(item.sourceInfo);\n } else {\n this.form.sourceType = 'FatJar';\n }\n\n this.form.containerName = item.containerName;\n this.id = item.id;\n this.dialogVisible = true;\n },\n arrangeItem: function arrangeItem(item) {\n var _this3 = this;\n\n var wsBase = this.requestUrl.replace(\"http\", \"ws\") + \"/container/deploy/\";\n var wsUrl = wsBase + item.id;\n ws = new WebSocket(wsUrl);\n\n ws.onopen = function () {\n _this3.arrangeTitle = _this3.$t('message.deploy');\n _this3.arrangeVisible = true;\n console.log(\"Connection open ...\");\n ws.send(\"Hello WebSockets!\");\n };\n\n ws.onmessage = function (evt) {\n _this3.logs.push(evt.data);\n };\n\n ws.onclose = function () {\n console.log(\"Connection closed.\");\n };\n },\n // 关闭部署页面时 关闭ws避免dialog内的信息有上台机器信息\n closeArrange: function closeArrange() {\n ws.close();\n this.logs = [];\n },\n closeEdit: function closeEdit() {\n this.sourceInfo = '';\n this.fileList = [];\n },\n listOfItem: function listOfItem(item) {\n var _this4 = this;\n\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/listDeployedWorker?containerId=\" + item.id + '&appId=' + appId).then(function (res) {\n if (res.data.data) {\n _this4.logs = res.data.data.split('\\n');\n _this4.arrangeTitle = _this4.$t('message.deployedWorkerList');\n _this4.arrangeVisible = true;\n } // this.containerList.splice(index,1);\n // this.$message(`容器${item.containerName}已删除`);\n\n });\n },\n // 兼容 java build in 模式下 baseURL 为 / 的情况(将当前url作为请求路径)\n calculateRequestUrl: function calculateRequestUrl() {\n if (_main__WEBPACK_IMPORTED_MODULE_6__[\"default\"] === undefined || !_main__WEBPACK_IMPORTED_MODULE_6__[\"default\"].includes(\"http\")) {\n var url = window.location.href;\n var urlSplit = url.split('//'); // str1[0]--协议头\n\n var ip = urlSplit[1].split('/')[0];\n this.requestUrl = urlSplit[0] + '//' + ip;\n console.log(\"calculateRequestUrl: \" + this.requestUrl);\n } else {\n this.requestUrl = _main__WEBPACK_IMPORTED_MODULE_6__[\"default\"];\n }\n }\n },\n mounted: function mounted() {\n var _this5 = this;\n\n this.calculateRequestUrl();\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/list?appId=\" + appId).then(function (res) {\n console.log(res);\n\n if (res.data.success) {\n _this5.containerList = res.data.data;\n }\n });\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/ContainerManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
+eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! core-js/modules/es.array.includes */ \"./node_modules/core-js/modules/es.array.includes.js\");\n/* harmony import */ var core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_includes__WEBPACK_IMPORTED_MODULE_0__);\n/* harmony import */ var core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1__ = __webpack_require__(/*! core-js/modules/es.array.splice */ \"./node_modules/core-js/modules/es.array.splice.js\");\n/* harmony import */ var core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_array_splice__WEBPACK_IMPORTED_MODULE_1__);\n/* harmony import */ var core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2__ = __webpack_require__(/*! core-js/modules/es.regexp.exec */ \"./node_modules/core-js/modules/es.regexp.exec.js\");\n/* harmony import */ var core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_regexp_exec__WEBPACK_IMPORTED_MODULE_2__);\n/* harmony import */ var core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3__ = __webpack_require__(/*! core-js/modules/es.string.includes */ \"./node_modules/core-js/modules/es.string.includes.js\");\n/* harmony import */ var core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_includes__WEBPACK_IMPORTED_MODULE_3__);\n/* harmony import */ var core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4__ = __webpack_require__(/*! core-js/modules/es.string.replace */ \"./node_modules/core-js/modules/es.string.replace.js\");\n/* harmony import */ var core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_replace__WEBPACK_IMPORTED_MODULE_4__);\n/* harmony import */ var core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5__ = __webpack_require__(/*! core-js/modules/es.string.split */ \"./node_modules/core-js/modules/es.string.split.js\");\n/* harmony import */ var core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5___default = /*#__PURE__*/__webpack_require__.n(core_js_modules_es_string_split__WEBPACK_IMPORTED_MODULE_5__);\n/* harmony import */ var _main__WEBPACK_IMPORTED_MODULE_6__ = __webpack_require__(/*! ../../main */ \"./src/main.js\");\n\n\n\n\n\n\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n\nvar ws;\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"ContainerManager\",\n data: function data() {\n return {\n form: {\n sourceType: 'Git',\n containerName: ''\n },\n gitForm: {\n repo: '',\n branch: '',\n username: '',\n password: ''\n },\n sourceInfo: '',\n id: '',\n appId: this.$store.state.appInfo.id,\n dialogVisible: false,\n arrangeTitle: '',\n arrangeVisible: false,\n containerList: [],\n logs: [],\n requestUrl: \"\",\n fileList: []\n };\n },\n methods: {\n onSubmit: function onSubmit() {\n var _this = this;\n\n // 接口参数\n var data = {\n appId: this.appId,\n containerName: this.form.containerName,\n status: \"ENABLE\",\n id: this.id,\n sourceType: this.form.sourceType\n };\n\n if (this.form.sourceType == 'Git') {\n data.sourceInfo = JSON.stringify(this.gitForm);\n } else {\n data.sourceInfo = this.sourceInfo;\n data.sourceType = 'FatJar';\n }\n\n this.axios.post(\"container/save\", data).then(function () {\n var appId = _this.$store.state.appInfo.id;\n\n _this.axios.get(\"/container/list?appId=\" + appId).then(function (res) {\n _this.$message.info(_this.$t('message.success')); // 恢复默认表单\n\n\n _this.dialogVisible = false;\n _this.form.containerName = '';\n _this.gitForm = {};\n _this.sourceInfo = '';\n _this.id = ''; // 刷新容器表单\n\n _this.containerList = res;\n });\n });\n },\n // 文件上传成功后 修改来源信息\n onSuccess: function onSuccess(response) {\n this.sourceInfo = response.data;\n },\n deleteItem: function deleteItem(item, index) {\n var _this2 = this;\n\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/delete?containerId=\" + item.id + '&appId=' + appId).then(function (res) {\n console.log(res);\n\n _this2.containerList.splice(index, 1);\n\n _this2.$message.info(_this2.$t('message.success'));\n });\n },\n editItem: function editItem(item) {\n if (item.sourceType == 'Git') {\n this.form.sourceType = 'Git';\n this.gitForm = JSON.parse(item.sourceInfo);\n } else {\n this.form.sourceType = 'FatJar';\n }\n\n this.form.containerName = item.containerName;\n this.id = item.id;\n this.dialogVisible = true;\n },\n arrangeItem: function arrangeItem(item) {\n var _this3 = this;\n\n var wsBase = this.requestUrl.replace(\"http\", \"ws\") + \"/container/deploy/\";\n var wsUrl = wsBase + item.id;\n ws = new WebSocket(wsUrl);\n\n ws.onopen = function () {\n _this3.arrangeTitle = _this3.$t('message.deploy');\n _this3.arrangeVisible = true;\n console.log(\"Connection open ...\");\n ws.send(\"Hello WebSockets!\");\n };\n\n ws.onmessage = function (evt) {\n _this3.logs.push(evt.data);\n };\n\n ws.onclose = function () {\n console.log(\"Connection closed.\");\n };\n },\n // 关闭部署页面时 关闭ws避免dialog内的信息有上台机器信息\n closeArrange: function closeArrange() {\n ws.close();\n this.logs = [];\n },\n closeEdit: function closeEdit() {\n this.sourceInfo = '';\n this.fileList = [];\n },\n listOfItem: function listOfItem(item) {\n var _this4 = this;\n\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/listDeployedWorker?containerId=\" + item.id + '&appId=' + appId).then(function (res) {\n if (res.data.data) {\n _this4.logs = res.data.data.split('\\n');\n _this4.arrangeTitle = _this4.$t('message.deployedWorkerList');\n _this4.arrangeVisible = true;\n } // this.containerList.splice(index,1);\n // this.$message(`容器${item.containerName}已删除`);\n\n });\n },\n // 兼容 java build in 模式下 baseURL 为 / 的情况(将当前url作为请求路径)\n calculateRequestUrl: function calculateRequestUrl() {\n if (_main__WEBPACK_IMPORTED_MODULE_6__[\"default\"] === undefined || !_main__WEBPACK_IMPORTED_MODULE_6__[\"default\"].includes(\"http\")) {\n var url = window.location.href;\n var urlSplit = url.split('//'); // str1[0]--协议头\n\n var ip = urlSplit[1].split('/')[0];\n this.requestUrl = urlSplit[0] + '//' + ip;\n console.log(\"calculateRequestUrl: \" + this.requestUrl);\n } else {\n this.requestUrl = _main__WEBPACK_IMPORTED_MODULE_6__[\"default\"];\n }\n }\n },\n mounted: function mounted() {\n var _this5 = this;\n\n this.calculateRequestUrl();\n var appId = this.$store.state.appInfo.id;\n this.flyio.get(\"/container/list?appId=\" + appId).then(function (res) {\n console.log(res);\n\n if (res.data.success) {\n _this5.containerList = res.data.data;\n }\n });\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/ContainerManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
/***/ }),
diff --git a/powerjob-server/src/main/resources/static/js/app.js b/powerjob-server/src/main/resources/static/js/app.js
index 64088f63..51bad83a 100644
--- a/powerjob-server/src/main/resources/static/js/app.js
+++ b/powerjob-server/src/main/resources/static/js/app.js
@@ -256,7 +256,7 @@ eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var _bar
/***/ (function(module, __webpack_exports__, __webpack_require__) {
"use strict";
-eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Navbar\",\n data: function data() {\n return {\n changeAppInfoDialogVisible: false,\n appInfo: {\n id: this.$store.state.appInfo.id,\n appName: this.$store.state.appInfo.appName,\n password: undefined,\n password2: undefined\n }\n };\n },\n methods: {\n // 退出当前应用\n onClickCloseConsole: function onClickCloseConsole() {\n window.localStorage.removeItem('oms_auto_login');\n this.$router.push(\"/\");\n },\n // 处理系统设置的指令时间\n handleSettings: function handleSettings(cmd) {\n switch (cmd) {\n case \"logout\":\n this.onClickCloseConsole();\n break;\n\n case \"changeAppInfo\":\n this.changeAppInfoDialogVisible = true;\n break;\n }\n },\n // 更新应用信息\n saveNewAppInfo: function saveNewAppInfo() {\n var _this = this;\n\n if (this.appInfo.password === this.appInfo.password2) {\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appInfo).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.$router.push(\"/\");\n }, function (e) {\n return that.$message.error(e);\n });\n }\n }\n }\n});\n\n//# sourceURL=webpack:///./src/components/bar/Navbar.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
+eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Navbar\",\n data: function data() {\n return {\n changeAppInfoDialogVisible: false,\n appInfo: {\n id: this.$store.state.appInfo.id,\n appName: this.$store.state.appInfo.appName,\n password: undefined,\n password2: undefined\n }\n };\n },\n methods: {\n // 退出当前应用\n onClickCloseConsole: function onClickCloseConsole() {\n window.localStorage.removeItem('oms_auto_login');\n this.$router.push(\"/\");\n },\n // 处理系统设置的指令时间\n handleSettings: function handleSettings(cmd) {\n switch (cmd) {\n case \"logout\":\n this.onClickCloseConsole();\n break;\n\n case \"changeAppInfo\":\n this.changeAppInfoDialogVisible = true;\n break;\n }\n },\n // 更新应用信息\n saveNewAppInfo: function saveNewAppInfo() {\n var _this = this;\n\n if (this.appInfo.password === this.appInfo.password2) {\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appInfo).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.$router.push(\"/\");\n }, function (e) {\n return that.$message.error(e);\n });\n } else {\n this.$message.error(\"the password doesn't match\");\n }\n }\n }\n});\n\n//# sourceURL=webpack:///./src/components/bar/Navbar.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
/***/ }),
diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml
index c7c73d5c..4efd4128 100644
--- a/powerjob-worker-agent/pom.xml
+++ b/powerjob-worker-agent/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker-agent
- 3.1.2
+ 3.1.3
jar
- 3.1.2
+ 3.1.3
1.2.3
4.3.2
diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml
index b911cf21..b6dc739a 100644
--- a/powerjob-worker-samples/pom.xml
+++ b/powerjob-worker-samples/pom.xml
@@ -10,11 +10,11 @@
4.0.0
powerjob-worker-samples
- 3.1.2
+ 3.1.3
2.2.6.RELEASE
- 3.1.2
+ 3.1.3
1.2.68
diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java
index 7c7b410b..3cc8b8ca 100644
--- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java
+++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java
@@ -31,7 +31,7 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(port);
- config.setAppName("oms-test");
+ config.setAppName("powerjob-agent-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算
// 为了本地模拟多个实例,只能使用 MEMORY 启动(文件只能由一个应用占有)
diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml
index e44f9f98..6e7e9236 100644
--- a/powerjob-worker/pom.xml
+++ b/powerjob-worker/pom.xml
@@ -10,12 +10,12 @@
4.0.0
powerjob-worker
- 3.1.2
+ 3.1.3
jar
5.2.4.RELEASE
- 3.1.2
+ 3.1.3
1.4.200
3.4.2
5.6.1
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java
index 3df267c7..e30779cb 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java
@@ -59,9 +59,7 @@ public class ProcessorTrackerActor extends AbstractActor {
Long instanceId = req.getInstanceId();
List removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId);
- if (CollectionUtils.isEmpty(removedPts)) {
- log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);
- }else {
+ if (!CollectionUtils.isEmpty(removedPts)) {
removedPts.forEach(ProcessorTracker::destroy);
}
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java
index 6188d419..c10aed00 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/WorkerActor.java
@@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
+import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
@@ -18,6 +19,7 @@ public class WorkerActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest)
+ .match(ServerDestroyContainerRequest.class, this::onReceiveServerDestroyContainerRequest)
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
@@ -25,4 +27,8 @@ public class WorkerActor extends AbstractActor {
private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
OmsContainerFactory.deployContainer(request);
}
+
+ private void onReceiveServerDestroyContainerRequest(ServerDestroyContainerRequest request) {
+ OmsContainerFactory.destroyContainer(request.getContainerId());
+ }
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java
index 3a86a13e..2eb289c4 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsContainerFactory.java
@@ -39,16 +39,21 @@ public class OmsContainerFactory {
/**
* 获取容器
* @param containerId 容器ID
+ * @param loadFromServer 当本地不存在时尝试从 server 加载
* @return 容器示例,可能为 null
*/
- public static OmsContainer getContainer(Long containerId) {
+ public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) {
OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) {
return omsContainer;
}
- // 尝试下载
+ if (!loadFromServer) {
+ return null;
+ }
+
+ // 尝试从 server 加载
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
@@ -66,6 +71,8 @@ public class OmsContainerFactory {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId);
deployContainer(deployRequest);
+ }else {
+ log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage());
}
}catch (Exception e) {
log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString());
@@ -133,4 +140,21 @@ public class OmsContainerFactory {
CARGO.forEach((name, container) -> info.add(new DeployedContainerInfo(container.getContainerId(), container.getVersion(), container.getDeployedTime(), null)));
return info;
}
+
+ /**
+ * 销毁指定容器
+ * @param containerId 容器ID
+ */
+ public static void destroyContainer(Long containerId) {
+ OmsContainer container = CARGO.remove(containerId);
+ if (container == null) {
+ log.info("[OmsContainer-{}] container not exists, so there is no need to destroy the container.", containerId);
+ return;
+ }
+ try {
+ container.destroy();
+ }catch (Exception e) {
+ log.warn("[OmsContainer-{}] destroy container failed.", containerId, e);
+ }
+ }
}
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
index f1eb08f9..6dfacce6 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java
@@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer {
// 需要满足的条件:引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) {
- OmsContainer container = OmsContainerFactory.getContainer(containerId);
+ OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false);
if (container != this) {
try {
destroy();
diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
index cc85bf8a..e9957127 100644
--- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
+++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java
@@ -65,7 +65,7 @@ public class ProcessorTracker {
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
// 长时间空闲的 ProcessorTracker 会发起销毁请求
- private static final long MAX_IDLE_TIME = 120000;
+ private static final long MAX_IDLE_TIME = 300000;
// 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
private boolean lethal = false;
@@ -179,7 +179,7 @@ public class ProcessorTracker {
statusReportRetryQueue.clear();
ProcessorTrackerPool.removeProcessorTracker(instanceId);
- log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId);
+ log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", instanceId);
// 3. 关闭定时线程池
CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
@@ -309,9 +309,11 @@ public class ProcessorTracker {
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
- omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
+ omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
+ }else {
+ log.warn("[ProcessorTracker-{}] load container failed.", instanceId);
}
break;
default: