[release] merge branch 'v3.1.3'

This commit is contained in:
tjq 2020-07-01 22:22:06 +08:00
commit 152524df1e
38 changed files with 187 additions and 92 deletions

View File

@ -22,11 +22,21 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn)
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景比如需要更新一大批数据单机执行耗时非常长可以使用Map/MapReduce处理器完成任务的分发调动整个集群加速计算。
* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
### 在线试用
试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
试用应用名称powerjob-agent-test
控制台密码123
[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
@ -43,7 +53,9 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
# 文档
**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)**
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.1.2</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -66,9 +66,11 @@ public class OhMyClient {
appId = Long.parseLong(resultDTO.getData().toString());
currentAddress = addr;
break;
}else {
throw new OmsException(resultDTO.getMessage());
}
}
}catch (Exception ignore) {
}catch (IOException ignore) {
}
}
@ -173,13 +175,15 @@ public class OhMyClient {
* 运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例的参数
* @param delayMS 延迟时间单位毫秒
* @return 任务实例IDinstanceId
* @throws Exception 异常
*/
public ResultDTO<Long> runJob(Long jobId, String instanceParams) throws Exception {
public ResultDTO<Long> runJob(Long jobId, String instanceParams, long delayMS) throws Exception {
FormBody.Builder builder = new FormBody.Builder()
.add("jobId", jobId.toString())
.add("appId", appId.toString());
.add("appId", appId.toString())
.add("delay", String.valueOf(delayMS));
if (StringUtils.isNotEmpty(instanceParams)) {
builder.add("instanceParams", instanceParams);
@ -188,7 +192,7 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
return runJob(jobId, null);
return runJob(jobId, null, 0);
}
/* ************* Instance 区 ************* */

View File

@ -21,7 +21,7 @@ public class TestClient {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
@ -70,7 +70,7 @@ public class TestClient {
@Test
public void testRunJob() throws Exception {
System.out.println(ohMyClient.runJob(8L, "this is instanceParams"));
System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
}
@Test

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>

View File

@ -15,5 +15,5 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class ServerDestroyContainerRequest implements OmsSerializable {
private String containerName;
private Long containerId;
}

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import lombok.Data;
import java.util.List;
@ -76,4 +77,14 @@ public class SaveJobInfoRequest {
// 报警用户ID列表
private List<Long> notifyUserIds;
public void valid() {
CommonUtils.requireNonNull(jobName, "jobName can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(processorInfo, "processorInfo can't be empty");
CommonUtils.requireNonNull(executeType, "executeType can't be empty");
CommonUtils.requireNonNull(processorType, "processorType can't be empty");
CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.request.http;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.google.common.collect.Lists;
import lombok.Data;
@ -43,4 +44,11 @@ public class SaveWorkflowRequest {
// 工作流整体失败的报警
private List<Long> notifyUserIds = Lists.newLinkedList();
public void valid() {
CommonUtils.requireNonNull(wfName, "workflow name can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(pEWorkflowDAG, "dag can't be empty");
CommonUtils.requireNonNull(timeExpressionType, "timeExpressionType can't be empty");
}
}

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.common.utils;
import com.github.kfcfans.powerjob.common.OmsException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
@ -114,4 +117,16 @@ public class CommonUtils {
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
public static <T> T requireNonNull(T obj, String msg) {
if (obj == null) {
throw new OmsException(msg);
}
if (obj instanceof String) {
if (StringUtils.isEmpty((String) obj)) {
throw new OmsException(msg);
}
}
return obj;
}
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.1.2</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(false);
if (containerInfoOpt.isPresent()) {
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor {
askResponse.setData(JsonUtils.toBytes(dpReq));
}
getSender().tell(askResponse, getSelf());
}

View File

@ -1,31 +0,0 @@
package com.github.kfcfans.powerjob.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 容器状态
* 由于命名约束准备采取硬删除策略
*
* @author tjq
* @since 2020/5/15
*/
@Getter
@AllArgsConstructor
public enum ContainerStatus {
ENABLE(1),
DISABLE(2),
DELETED(99);
private int v;
public static ContainerStatus of(int v) {
for (ContainerStatus type : values()) {
if (type.v == v) {
return type;
}
}
throw new IllegalArgumentException("unknown ContainerStatus of " + v);
}
}

View File

@ -1,5 +1,19 @@
package com.github.kfcfans.powerjob.server.common.utils;
/*
Copyright [2020] [KFCFans]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import java.io.Serializable;
import java.text.ParseException;
import java.util.Calendar;

View File

@ -37,8 +37,6 @@ public class HashedWheelTimer implements Timer {
private final ExecutorService taskProcessPool;
private static final int MAXIMUM_CAPACITY = 1 << 30;
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
@ -47,9 +45,9 @@ public class HashedWheelTimer implements Timer {
* 新建时间轮定时器
* @param tickDuration 时间间隔单位毫秒ms
* @param ticksPerWheel 轮盘个数
* @param processTaskNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
* @param processThreadNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) {
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
@ -62,12 +60,13 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1;
// 初始化执行线程池
if (processTaskNum <= 0) {
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}

View File

@ -1,7 +1,7 @@
package com.github.kfcfans.powerjob.server.common.utils.timewheel;
/**
* description
* TimerFuture
*
* @author tjq
* @since 2020/4/3

View File

@ -13,5 +13,5 @@ import java.util.List;
*/
public interface ContainerInfoRepository extends JpaRepository<ContainerInfoDO, Long> {
List<ContainerInfoDO> findByAppId(Long appId);
List<ContainerInfoDO> findByAppIdAndStatusNot(Long appId, Integer status);
}

View File

@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@ -85,6 +86,9 @@ public class ContainerService {
* @param request 容器保存请求
*/
public void save(SaveContainerInfoRequest request) {
request.valid();
ContainerInfoDO container;
Long originId = request.getId();
if (originId != null) {
@ -120,15 +124,17 @@ public class ContainerService {
throw new RuntimeException("Permission Denied!");
}
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getContainerName());
ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
WorkerManagerService.getActiveWorkerInfo(container.getAppId()).keySet().forEach(akkaAddress -> {
ActorSelection workerActor = OhMyServer.getWorkerActor(akkaAddress);
workerActor.tell(destroyRequest, null);
});
log.info("[ContainerService] delete container: {}.", container);
// 硬删除算了...留着好像也没什么用
containerInfoRepository.deleteById(containerId);
// 软删除
container.setStatus(SwitchableStatus.DELETED.getV());
container.setGmtModified(new Date());
containerInfoRepository.saveAndFlush(container);
}
/**

View File

@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
@ -21,6 +22,7 @@ import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* 任务服务
@ -50,6 +52,8 @@ public class JobService {
*/
public Long saveJob(SaveJobInfoRequest request) throws Exception {
request.valid();
JobInfoDO jobInfoDO;
if (request.getId() != null) {
jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
@ -94,16 +98,22 @@ public class JobService {
* 手动立即运行某个任务
* @param jobId 任务ID
* @param instanceParams 任务实例参数 OpenAPI 存在
* @param delay 延迟时间单位 毫秒
* @return 任务实例ID
*/
public long runJob(Long jobId, String instanceParams) {
public long runJob(Long jobId, String instanceParams, long delay) {
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis());
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
instanceInfoRepository.flush();
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
if (delay <= 0) {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}else {
HashedWheelTimerHolder.TIMER.schedule(() -> {
dispatchService.dispatch(jobInfo, instanceId, 0, instanceParams, null);
}, delay, TimeUnit.MILLISECONDS);
}
return instanceId;
}

View File

@ -43,7 +43,7 @@ public class ServerSelectService {
/**
* 获取某个应用对应的Server
* 缺点如果server死而复生可能造成worker集群脑裂不过感觉影响不是很大 & 概率极低就不管了
*
* @param appId 应用ID
* @return 当前可用的Server
*/

View File

@ -258,7 +258,7 @@ public class OmsScheduleService {
}
log.info("[FrequentScheduler] These frequent jobs will be scheduled {}.", notRunningJobIds);
notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null));
notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null, 0));
}catch (Exception e) {
log.error("[FrequentScheduler] schedule frequent job failed.", e);
}

View File

@ -39,6 +39,8 @@ public class WorkflowService {
*/
public Long saveWorkflow(SaveWorkflowRequest req) throws Exception {
req.valid();
if (!WorkflowDAGUtils.valid(req.getPEWorkflowDAG())) {
throw new OmsException("illegal DAG");
}

View File

@ -4,7 +4,7 @@ import com.github.kfcfans.powerjob.common.OmsConstant;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.ContainerTemplateGenerator;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
@ -87,7 +87,8 @@ public class ContainerController {
@GetMapping("/list")
public ResultDTO<List<ContainerInfoVO>> listContainers(Long appId) {
List<ContainerInfoVO> res = containerInfoRepository.findByAppId(appId).stream().map(ContainerController::convert).collect(Collectors.toList());
List<ContainerInfoVO> res = containerInfoRepository.findByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())
.stream().map(ContainerController::convert).collect(Collectors.toList());
return ResultDTO.success(res);
}
@ -122,7 +123,7 @@ public class ContainerController {
}else {
vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN));
}
ContainerStatus status = ContainerStatus.of(containerInfoDO.getStatus());
SwitchableStatus status = SwitchableStatus.of(containerInfoDO.getStatus());
vo.setStatus(status.name());
ContainerSourceType sourceType = ContainerSourceType.of(containerInfoDO.getSourceType());
vo.setSourceType(sourceType.name());

View File

@ -63,7 +63,7 @@ public class JobController {
@GetMapping("/run")
public ResultDTO<Long> runImmediately(String jobId) {
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null));
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null, 0));
}
@PostMapping("/list")

View File

@ -80,9 +80,9 @@ public class OpenAPIController {
}
@PostMapping(OpenAPIConstant.RUN_JOB)
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams) {
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) {
checkJobIdValid(jobId, appId);
return ResultDTO.success(jobService.runJob(jobId, instanceParams));
return ResultDTO.success(jobService.runJob(jobId, instanceParams, delay == null ? 0 : delay));
}
/* ************* Instance 区 ************* */

View File

@ -46,4 +46,9 @@ public class ServerController {
return ResultDTO.success(server);
}
@GetMapping("/hello")
public ResultDTO<String> ping() {
return ResultDTO.success("this is powerjob-server~");
}
}

View File

@ -59,7 +59,7 @@ public class SystemInfoController {
}
String server =appInfoOpt.get().getCurrentServer();
// 没有Server
// 没有 Server说明从来没有该 appId worker 集群连接过
if (StringUtils.isEmpty(server)) {
return ResultDTO.success(Collections.emptyList());
}

View File

@ -1,7 +1,8 @@
package com.github.kfcfans.powerjob.server.web.request;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.server.common.constans.ContainerSourceType;
import com.github.kfcfans.powerjob.server.common.constans.ContainerStatus;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import lombok.Data;
/**
@ -28,5 +29,11 @@ public class SaveContainerInfoRequest {
private String sourceInfo;
// 状态枚举值为 ContainerStatusENABLE/DISABLE
private ContainerStatus status;
private SwitchableStatus status;
public void valid() {
CommonUtils.requireNonNull(containerName, "containerName can't be empty");
CommonUtils.requireNonNull(appId, "appId can't be empty");
CommonUtils.requireNonNull(sourceInfo, "sourceInfo can't be empty");
}
}

File diff suppressed because one or more lines are too long

View File

@ -256,7 +256,7 @@ eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var _bar
/***/ (function(module, __webpack_exports__, __webpack_require__) {
"use strict";
eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Navbar\",\n data: function data() {\n return {\n changeAppInfoDialogVisible: false,\n appInfo: {\n id: this.$store.state.appInfo.id,\n appName: this.$store.state.appInfo.appName,\n password: undefined,\n password2: undefined\n }\n };\n },\n methods: {\n // 退出当前应用\n onClickCloseConsole: function onClickCloseConsole() {\n window.localStorage.removeItem('oms_auto_login');\n this.$router.push(\"/\");\n },\n // 处理系统设置的指令时间\n handleSettings: function handleSettings(cmd) {\n switch (cmd) {\n case \"logout\":\n this.onClickCloseConsole();\n break;\n\n case \"changeAppInfo\":\n this.changeAppInfoDialogVisible = true;\n break;\n }\n },\n // 更新应用信息\n saveNewAppInfo: function saveNewAppInfo() {\n var _this = this;\n\n if (this.appInfo.password === this.appInfo.password2) {\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appInfo).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.$router.push(\"/\");\n }, function (e) {\n return that.$message.error(e);\n });\n }\n }\n }\n});\n\n//# sourceURL=webpack:///./src/components/bar/Navbar.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Navbar\",\n data: function data() {\n return {\n changeAppInfoDialogVisible: false,\n appInfo: {\n id: this.$store.state.appInfo.id,\n appName: this.$store.state.appInfo.appName,\n password: undefined,\n password2: undefined\n }\n };\n },\n methods: {\n // 退出当前应用\n onClickCloseConsole: function onClickCloseConsole() {\n window.localStorage.removeItem('oms_auto_login');\n this.$router.push(\"/\");\n },\n // 处理系统设置的指令时间\n handleSettings: function handleSettings(cmd) {\n switch (cmd) {\n case \"logout\":\n this.onClickCloseConsole();\n break;\n\n case \"changeAppInfo\":\n this.changeAppInfoDialogVisible = true;\n break;\n }\n },\n // 更新应用信息\n saveNewAppInfo: function saveNewAppInfo() {\n var _this = this;\n\n if (this.appInfo.password === this.appInfo.password2) {\n var that = this;\n this.axios.post(\"/appInfo/save\", this.appInfo).then(function () {\n that.$message.success(_this.$t('message.success'));\n that.$router.push(\"/\");\n }, function (e) {\n return that.$message.error(e);\n });\n } else {\n this.$message.error(\"the password doesn't match\");\n }\n }\n }\n});\n\n//# sourceURL=webpack:///./src/components/bar/Navbar.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
/***/ }),

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.1.2</powerjob.worker.version>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.1.2</powerjob.worker.version>
<powerjob.worker.version>3.1.3</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -31,7 +31,7 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(port);
config.setAppName("oms-test");
config.setAppName("powerjob-agent-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 为了本地模拟多个实例只能使用 MEMORY 启动文件只能由一个应用占有

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.1.2</version>
<version>3.1.3</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.1.2</powerjob.common.version>
<powerjob.common.version>3.1.3</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -59,9 +59,7 @@ public class ProcessorTrackerActor extends AbstractActor {
Long instanceId = req.getInstanceId();
List<ProcessorTracker> removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId);
if (CollectionUtils.isEmpty(removedPts)) {
log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);
}else {
if (!CollectionUtils.isEmpty(removedPts)) {
removedPts.forEach(ProcessorTracker::destroy);
}
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
import lombok.extern.slf4j.Slf4j;
@ -18,6 +19,7 @@ public class WorkerActor extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(ServerDeployContainerRequest.class, this::onReceiveServerDeployContainerRequest)
.match(ServerDestroyContainerRequest.class, this::onReceiveServerDestroyContainerRequest)
.matchAny(obj -> log.warn("[WorkerActor] receive unknown request: {}.", obj))
.build();
}
@ -25,4 +27,8 @@ public class WorkerActor extends AbstractActor {
private void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
OmsContainerFactory.deployContainer(request);
}
private void onReceiveServerDestroyContainerRequest(ServerDestroyContainerRequest request) {
OmsContainerFactory.destroyContainer(request.getContainerId());
}
}

View File

@ -39,16 +39,21 @@ public class OmsContainerFactory {
/**
* 获取容器
* @param containerId 容器ID
* @param loadFromServer 当本地不存在时尝试从 server 加载
* @return 容器示例可能为 null
*/
public static OmsContainer getContainer(Long containerId) {
public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) {
OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) {
return omsContainer;
}
// 尝试下载
if (!loadFromServer) {
return null;
}
// 尝试从 server 加载
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
@ -66,6 +71,8 @@ public class OmsContainerFactory {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId);
deployContainer(deployRequest);
}else {
log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage());
}
}catch (Exception e) {
log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString());
@ -133,4 +140,21 @@ public class OmsContainerFactory {
CARGO.forEach((name, container) -> info.add(new DeployedContainerInfo(container.getContainerId(), container.getVersion(), container.getDeployedTime(), null)));
return info;
}
/**
* 销毁指定容器
* @param containerId 容器ID
*/
public static void destroyContainer(Long containerId) {
OmsContainer container = CARGO.remove(containerId);
if (container == null) {
log.info("[OmsContainer-{}] container not exists, so there is no need to destroy the container.", containerId);
return;
}
try {
container.destroy();
}catch (Exception e) {
log.warn("[OmsContainer-{}] destroy container failed.", containerId, e);
}
}
}

View File

@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer {
// 需要满足的条件引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) {
OmsContainer container = OmsContainerFactory.getContainer(containerId);
OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false);
if (container != this) {
try {
destroy();

View File

@ -65,7 +65,7 @@ public class ProcessorTracker {
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
// 长时间空闲的 ProcessorTracker 会发起销毁请求
private static final long MAX_IDLE_TIME = 120000;
private static final long MAX_IDLE_TIME = 300000;
// ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
private boolean lethal = false;
@ -179,7 +179,7 @@ public class ProcessorTracker {
statusReportRetryQueue.clear();
ProcessorTrackerPool.removeProcessorTracker(instanceId);
log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId);
log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", instanceId);
// 3. 关闭定时线程池
CommonUtils.executeIgnoreException(() -> timingPool.shutdownNow());
@ -309,9 +309,11 @@ public class ProcessorTracker {
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}else {
log.warn("[ProcessorTracker-{}] load container failed.", instanceId);
}
break;
default: