[release] Merge branch 'v3.0.2'

This commit is contained in:
tjq 2020-06-21 19:31:09 +08:00
commit 774126b0f1
39 changed files with 371 additions and 138 deletions

View File

@ -38,7 +38,7 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** |
| 系统依赖 | MySQL | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| 系统依赖 | JDBC支持的关系型数据库MySQL、Oracle... | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库MySQL、Oracle...** |
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |

View File

@ -1,13 +1,15 @@
<p align="center">
<img src="https://raw.githubusercontent.com/KFCFans/OhMyScheduler/master/others/images/oms-logo.png" alt="OhMyScheduler" title="OhMyScheduler" width="557"/>
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
</p>
<p align="center">
<a href="https://github.com/KFCFans/OhMyScheduler/actions"><img src="https://github.com/KFCFans/OhMyScheduler/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master"></a>
<a href="https://github.com/KFCFans/OhMyScheduler/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/OhMyScheduler"></a>
<a href="https://github.com/KFCFans/PowerJob/actions"><img src="https://github.com/KFCFans/PowerJob/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master" alt="actions"></a>
<a href="https://search.maven.org/search?q=com.github.kfcfans"><img alt="Maven Central" src="https://img.shields.io/maven-central/v/com.github.kfcfans/powerjob-worker"></a>
<a href="https://github.com/KFCFans/PowerJob/releases"><img alt="GitHub release (latest SemVer)" src="https://img.shields.io/github/v/release/kfcfans/powerjob?color=%23E59866"></a>
<a href="https://github.com/KFCFans/PowerJob/blob/master/LICENSE"><img src="https://img.shields.io/github/license/KFCFans/PowerJob" alt="LICENSE"></a>
</p>
OhMyScheduler is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily.
PowerJob is a powerful distributed scheduling platform and distributed computing framework based on Akka architecture.It provides you a chance to schedule job and distributed computing easily.
# Introduction
@ -30,7 +32,7 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
### Comparison of similar products
| | QuartZ | xxl-job | SchedulerX 2.0 | OhMyScheduler |
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
@ -39,13 +41,13 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
| Log blanking | not support | support | not support | **support** |
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
| Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** |
| System dependence | MySQL | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| DAG workflow | not support | not support | support | **support** |
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
| workflow | not support | not support | support | **support** |
# Document
**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)**
**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)**
**[中文文档](https://www.yuque.com/ohmyscheduler/product)**
**[中文文档](https://www.yuque.com/powerjob/product)**
# Others

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.0.1</powerjob.common.version>
<powerjob.common.version>3.1.0</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -15,6 +15,7 @@ import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
@ -39,8 +40,8 @@ public class OhMyClient {
* @param domain www.oms-server.com内网域名自行完成DNS & Proxy
* @param appName 负责的应用名称
*/
public OhMyClient(String domain, String appName) {
this(Lists.newArrayList(domain), appName);
public OhMyClient(String domain, String appName, String password) {
this(Lists.newArrayList(domain), appName, password);
}
@ -49,16 +50,16 @@ public class OhMyClient {
* @param addressList IP:Port 列表
* @param appName 负责的应用名称
*/
public OhMyClient(List<String> addressList, String appName) {
public OhMyClient(List<String> addressList, String appName, String password) {
Objects.requireNonNull(addressList, "domain can't be null!");
Objects.requireNonNull(appName, "appName can't be null");
allAddress = addressList;
for (String addr : addressList) {
String url = getUrl(OpenAPIConstant.ASSERT, addr) + "?appName=" + appName;
String url = getUrl(OpenAPIConstant.ASSERT, addr);
try {
String result = HttpUtils.get(url);
String result = assertApp(appName, password, url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
@ -77,6 +78,15 @@ public class OhMyClient {
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
}
private static String assertApp(String appName, String password, String url) throws IOException {
FormBody.Builder builder = new FormBody.Builder()
.add("appName", appName);
if (password != null) {
builder.add("password", password);
}
return HttpUtils.post(url, builder.build());
}
private static String getUrl(String path, String address) {
return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path);

View File

@ -21,7 +21,7 @@ public class TestClient {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2");
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
}
@Test
@ -29,7 +29,7 @@ public class TestClient {
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
// newJobInfo.setId(8L);
newJobInfo.setJobName("omsOpenAPIJob");
newJobInfo.setJobName("omsOpenAPIJobccccc");
newJobInfo.setJobDescription("tes OpenAPI");
newJobInfo.setJobParams("{'aa':'bb'}");
newJobInfo.setTimeExpressionType(TimeExpressionType.CRON);

View File

@ -20,7 +20,7 @@ public class TestWorkflow {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test");
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null);
}
@Test

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>

View File

@ -1,4 +1,4 @@
package com.github.kfcfans.powerjob.common.request.http;
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.0.1</powerjob.common.version>
<powerjob.common.version>3.1.0</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -6,7 +6,7 @@ import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;

View File

@ -21,7 +21,8 @@ public class AppInfoDO {
private Long id;
private String appName;
private String description;
// 应用分组密码
private String password;
// 当前负责该 appName 旗下任务调度的server地址IP:Port注意该地址为ActorSystem地址而不是HTTP地址两者端口不同
private String currentServer;

View File

@ -0,0 +1,38 @@
package com.github.kfcfans.powerjob.server.service;
import com.github.kfcfans.powerjob.common.OmsException;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 应用信息服务
*
* @author tjq
* @since 2020/6/20
*/
@Service
public class AppInfoService {
@Resource
private AppInfoRepository appInfoRepository;
/**
* 验证应用访问权限
* @param appName 应用名称
* @param password 密码
* @return 应用ID
*/
public Long assertApp(String appName, String password) {
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName));
if (Objects.equals(appInfo.getPassword(), password)) {
return appInfo.getId();
}
throw new OmsException("password error!");
}
}

View File

@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.service.AppInfoService;
import com.github.kfcfans.powerjob.server.web.request.AppAssertRequest;
import com.github.kfcfans.powerjob.server.web.request.ModifyAppInfoRequest;
import com.google.common.collect.Lists;
import lombok.Data;
@ -29,6 +31,8 @@ import java.util.stream.Collectors;
@RequestMapping("/appInfo")
public class AppInfoController {
@Resource
private AppInfoService appInfoService;
@Resource
private AppInfoRepository appInfoRepository;
@ -53,6 +57,11 @@ public class AppInfoController {
return ResultDTO.success(null);
}
@PostMapping("/assert")
public ResultDTO<Long> assertApp(@RequestBody AppAssertRequest request) {
return ResultDTO.success(appInfoService.assertApp(request.getAppName(), request.getPassword()));
}
@GetMapping("/delete")
public ResultDTO<Void> deleteAppInfo(Long appId) {
appInfoRepository.deleteById(appId);

View File

@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.server.web.controller;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.service.AppInfoService;
import com.github.kfcfans.powerjob.server.service.CacheService;
import com.github.kfcfans.powerjob.server.service.JobService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
@ -15,7 +14,6 @@ import com.github.kfcfans.powerjob.common.response.*;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Optional;
/**
* 开放接口OpenAPI控制器对接 oms-client
@ -27,6 +25,8 @@ import java.util.Optional;
@RequestMapping(OpenAPIConstant.WEB_PATH)
public class OpenAPIController {
@Resource
private AppInfoService appInfoService;
@Resource
private JobService jobService;
@Resource
@ -39,14 +39,10 @@ public class OpenAPIController {
@Resource
private CacheService cacheService;
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping(OpenAPIConstant.ASSERT)
public ResultDTO<Long> assertAppName(String appName) {
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId()))
.orElseGet(() -> ResultDTO.failed(appName + " is not registered!"));
@PostMapping(OpenAPIConstant.ASSERT)
public ResultDTO<Long> assertAppName(String appName, @RequestParam(required = false) String password) {
return ResultDTO.success(appInfoService.assertApp(appName, password));
}
/* ************* Job 区 ************* */

View File

@ -0,0 +1,15 @@
package com.github.kfcfans.powerjob.server.web.request;
import lombok.Data;
/**
* 验证应用应用登陆
*
* @author tjq
* @since 2020/6/20
*/
@Data
public class AppAssertRequest {
private String appName;
private String password;
}

View File

@ -13,5 +13,5 @@ public class ModifyAppInfoRequest {
private Long id;
private String appName;
private String description;
private String password;
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.0.1</powerjob.worker.version>
<powerjob.worker.version>3.1.0</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.0.1</powerjob.worker.version>
<powerjob.worker.version>3.1.0</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,16 +10,18 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.0.1</powerjob.common.version>
<powerjob.common.version>3.1.0</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>
<kryo.version>5.0.0-RC5</kryo.version>
<logback.version>1.2.3</logback.version>
</properties>
<dependencies>
@ -66,6 +68,13 @@
<scope>test</scope>
</dependency>
<!-- log for test stage -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

View File

@ -127,6 +127,7 @@ public class TaskTrackerActor extends AbstractActor {
* ProcessorTracker 心跳处理器
*/
private void onReceiveProcessorTrackerStatusReportReq(ProcessorTrackerStatusReportReq req) {
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorTrackerStatusReportReq({}) but system can't find TaskTracker.", req);

View File

@ -48,7 +48,9 @@ public class ServerDiscoveryService {
String ip = currentServer.split(":")[0];
// 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担
String firstServerAddress = IP2ADDRESS.get(ip);
result = acquire(firstServerAddress);
if (firstServerAddress != null) {
result = acquire(firstServerAddress);
}
}
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {

View File

@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.google.common.collect.Lists;
import lombok.Data;
import java.util.Collections;
import java.util.List;
/**
@ -26,7 +28,7 @@ public class OhMyConfig {
/**
* 调度服务器地址ip:port 域名
*/
private List<String> serverAddress;
private List<String> serverAddress = Lists.newArrayList();
/**
* 本地持久化方式默认使用磁盘
*/

View File

@ -5,7 +5,7 @@ import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.http.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;

View File

@ -54,6 +54,8 @@ public class ProcessorTracker {
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
// 上一次空闲时间
private long lastIdleTime;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@ -62,6 +64,8 @@ public class ProcessorTracker {
private ScheduledExecutorService timingPool;
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
// 长时间空闲的 ProcessorTracker 会发起销毁请求
private static final long MAX_IDLE_TIME = 120000;
// ProcessorTracker 出现根本性错误比如 Processor 创建失败所有的任务直接失败
private boolean lethal = false;
@ -82,6 +86,7 @@ public class ProcessorTracker {
this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L;
// 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE
initThreadPool();
@ -221,6 +226,7 @@ public class ProcessorTracker {
@Override
public void run() {
// 超时检查如果超时则自动关闭 TaskTracker
long interval = System.currentTimeMillis() - startTime;
// 秒级任务的ProcessorTracker不应该关闭
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
@ -231,6 +237,25 @@ public class ProcessorTracker {
}
}
// 判断线程池活跃状态长时间空闲则上报 TaskTracker 请求检查
if (threadPool.getActiveCount() > 0) {
lastIdleTime = -1;
}else {
if (lastIdleTime == -1) {
lastIdleTime = System.currentTimeMillis();
}else {
long idleTime = System.currentTimeMillis() - lastIdleTime;
if (idleTime > MAX_IDLE_TIME) {
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null);
destroy();
return;
}
}
}
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
@ -245,8 +270,7 @@ public class ProcessorTracker {
// 上报当前 ProcessorTracker 负载
long waitingNum = threadPool.getQueue().size();
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
taskTrackerActorRef.tell(req, null);
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null);
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
}

View File

@ -2,15 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -105,10 +102,11 @@ public class CommonTaskTracker extends TaskTracker {
rootTask.setLastReportTime(-1L);
rootTask.setSubInstanceId(instanceId);
if (!taskPersistenceService.save(rootTask)) {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
}else {
if (taskPersistenceService.save(rootTask)) {
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
}else {
log.error("[TaskTracker-{}] create root task failed.", instanceId);
throw new OmsException("create root task failed for instance: " + instanceId);
}
}
@ -176,7 +174,7 @@ public class CommonTaskTracker extends TaskTracker {
success = holder.failedNum == 0;
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
break;
// MapReduce Broadcast 任务实例是否完成根据**Last_Task**的执行情况判断
// MapReduce Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
default:
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
@ -267,8 +265,8 @@ public class CommonTaskTracker extends TaskTracker {
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId());
log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
}
});
@ -280,9 +278,6 @@ public class CommonTaskTracker extends TaskTracker {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
taskPersistenceService.updateLostTasks(disconnectedPTs);
}
// 6.3 超时检查 -> 检查超时的Task
}
@Override
@ -290,7 +285,7 @@ public class CommonTaskTracker extends TaskTracker {
try {
innerRun();
}catch (Exception e) {
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e);
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
}
}
}

View File

@ -267,8 +267,20 @@ public abstract class TaskTracker {
* @param heartbeatReq ProcessorTracker任务的执行管理器发来的心跳包包含了其当前状态
*/
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
ptStatusHolder.updateStatus(heartbeatReq);
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
ptStatusHolder.updateStatus(heartbeatReq);
// 上报空闲检查是否已经接收到全部该 ProcessorTracker 负责的任务
if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
String idlePtAddress = heartbeatReq.getAddress();
// ProcessorTracker 已销毁重置为初始状态
ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
}
}
}
/**

View File

@ -33,7 +33,8 @@ public class ConnectionFactory {
synchronized (ConnectionFactory.class) {
if (dataSource == null) {
StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy();
// 兼容单元测试否则没办法单独测试 DAO 层了
StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");

View File

@ -66,9 +66,8 @@ public class TaskDO {
@Override
public String toString() {
return "TaskDO{" +
return "{" +
"taskId='" + taskId + '\'' +
", instanceId='" + instanceId + '\'' +
", taskName='" + taskName + '\'' +
", address='" + address + '\'' +
", status=" + status +

View File

@ -156,6 +156,23 @@ public class TaskPersistenceService {
return Lists.newArrayList();
}
// 获取某个 ProcessorTracker 未完成的任务
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
try {
String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setAddress(address);
query.setQueryCondition(condition);
return execute(() -> taskDAO.simpleQuery(query));
}catch (Exception e) {
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
}
return Lists.newArrayList();
}
/**
* 获取指定状态的Task
*/

View File

@ -16,6 +16,12 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
public static final int IDLE = 1;
public static final int LOAD = 2;
// IDLE 代表 ProcessorTracker 长期处于空闲状态LOAD 代表 负载上报请求
private int type;
private Long instanceId;
/**
@ -33,11 +39,24 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
*/
private String address;
public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
this.instanceId = instanceId;
this.remainTaskNum = remainTaskNum;
this.time = System.currentTimeMillis();
this.address = OhMyWorker.getWorkerAddress();
public static ProcessorTrackerStatusReportReq buildIdleReport(Long instanceId) {
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
req.type = IDLE;
req.instanceId = instanceId;
req.time = System.currentTimeMillis();
req.address = OhMyWorker.getWorkerAddress();
req.setRemainTaskNum(0);
return req;
}
public static ProcessorTrackerStatusReportReq buildLoadReport(Long instanceId, Long remainTaskNum) {
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
req.type = LOAD;
req.instanceId = instanceId;
req.time = System.currentTimeMillis();
req.address = OhMyWorker.getWorkerAddress();
req.setRemainTaskNum(remainTaskNum);
return req;
}
}

View File

@ -0,0 +1,78 @@
package com.github.kfcfans.powerjob;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
/**
* 启动公共服务
*
* @author tjq
* @since 2020/6/17
*/
public class CommonTest {
protected static ActorSelection remoteProcessorTracker;
protected static ActorSelection remoteTaskTracker;
@BeforeAll
public static void startWorker() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setEnableTestMode(true);
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String address = NetUtils.getLocalHost() + ":27777";
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
}
@AfterAll
public static void stop() throws Exception {
Thread.sleep(120000);
}
public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
InstanceInfo instanceInfo = new InstanceInfo();
instanceInfo.setJobId(1L);
instanceInfo.setInstanceId(10086L);
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
instanceInfo.setProcessorInfo(processor);
instanceInfo.setInstanceTimeoutMS(500000);
instanceInfo.setThreadConcurrency(5);
instanceInfo.setTaskRetryNum(3);
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777");
req.setInstanceInfo(instanceInfo);
req.setTaskId("0");
req.setTaskName("ROOT_TASK");
req.setTaskCurrentRetryNums(0);
return req;
}
}

View File

@ -24,6 +24,7 @@ public class PersistenceServiceTest {
public static void initTable() throws Exception {
taskPersistenceService.init();
System.out.println("=============== init data ===============");
List<TaskDO> taskList = Lists.newLinkedList();
for (int i = 0; i < 10; i++) {
TaskDO task = new TaskDO();
@ -39,10 +40,11 @@ public class PersistenceServiceTest {
task.setAddress(NetUtils.getLocalHost());
task.setLastModifiedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
task.setLastReportTime(System.currentTimeMillis());
task.setResult("");
}
taskPersistenceService.batchSave(taskList);
System.out.println("=============== init data ===============");
taskList.forEach(System.out::println);
}
@ -75,4 +77,11 @@ public class PersistenceServiceTest {
System.out.println("updateLostTasks: " + success);
}
@Test
public void testGetAllUnFinishedTaskByAddress() throws Exception {
System.out.println("=============== testGetAllUnFinishedTaskByAddress ===============");
List<TaskDO> res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost());
System.out.println(res);
}
}

View File

@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.typesafe.config.ConfigFactory;
@ -23,68 +24,20 @@ import org.junit.jupiter.api.Test;
* @author tjq
* @since 2020/3/24
*/
public class ProcessorTrackerTest {
private static ActorSelection remoteProcessorTracker;
@BeforeAll
public static void startWorker() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
}
@AfterAll
public static void stop() throws Exception {
Thread.sleep(120000);
}
public class ProcessorTrackerTest extends CommonTest {
@Test
public void testBasicProcessor() throws Exception {
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestBasicProcessor");
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
remoteProcessorTracker.tell(req, null);
Thread.sleep(30000);
}
@Test
public void testMapReduceProcessor() throws Exception {
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
remoteProcessorTracker.tell(req, null);
Thread.sleep(30000);
}
private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
InstanceInfo instanceInfo = new InstanceInfo();
instanceInfo.setJobId(1L);
instanceInfo.setInstanceId(10086L);
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
instanceInfo.setProcessorInfo(processor);
instanceInfo.setInstanceTimeoutMS(500000);
instanceInfo.setThreadConcurrency(5);
instanceInfo.setTaskRetryNum(3);
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
req.setTaskTrackerAddress(NetUtils.getLocalHost());
req.setInstanceInfo(instanceInfo);
req.setTaskId("0");
req.setTaskName("ROOT_TASK");
req.setTaskCurrentRetryNums(0);
return req;
}
}

View File

@ -19,6 +19,7 @@ public class TestUtils {
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
ServerScheduleJobReq req = new ServerScheduleJobReq();
req.setJobId(1L);
req.setInstanceId(10086L);
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
@ -38,15 +39,15 @@ public class TestUtils {
switch (executeType) {
case STANDALONE:
req.setExecuteType(ExecuteType.STANDALONE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
break;
case MAP_REDUCE:
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
break;
case BROADCAST:
req.setExecuteType(ExecuteType.BROADCAST.name());
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBroadcastProcessor");
break;
}

View File

@ -0,0 +1,40 @@
package com.github.kfcfans.powerjob.function;
import com.github.kfcfans.powerjob.CommonTest;
import com.github.kfcfans.powerjob.TestUtils;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import org.junit.jupiter.api.Test;
/**
* 空闲测试
*
* @author tjq
* @since 2020/6/17
*/
public class IdleTest extends CommonTest {
@Test
public void testProcessorTrackerSendIdleReport() throws Exception {
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
ProcessorTracker pt = new ProcessorTracker(req);
Thread.sleep(300000);
}
@Test
public void testTaskTrackerProcessorIdle() throws Exception {
ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L);
ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API);
TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq);
if (taskTracker != null) {
taskTracker.receiveProcessorTrackerHeartbeat(req);
}
}
}

View File

@ -5,7 +5,7 @@ akka {
allow-java-serialization = off
serialization-bindings {
"OmsSerializable" = jackson-cbor
"com.github.kfcfans.powerjob.common.OmsSerializable" = jackson-cbor
}
}
remote {