mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
Merge branch 'v3.0.2' into jenkins_auto_build
This commit is contained in:
commit
28b67c6574
@ -38,7 +38,7 @@ PowerJob(原OhMyScheduler)是全新一代分布式调度与计算框架,
|
|||||||
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
|
| 日志白屏化 | 不支持 | 支持 | 不支持 | **支持** |
|
||||||
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
|
| 调度方式及性能 | 基于数据库锁,有性能瓶颈 | 基于数据库锁,有性能瓶颈 | 不详 | **无锁化设计,性能强劲无上限** |
|
||||||
| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** |
|
| 报警监控 | 无 | 邮件 | 短信 | **邮件,提供接口允许开发者扩展** |
|
||||||
| 系统依赖 | MySQL | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** |
|
| 系统依赖 | JDBC支持的关系型数据库(MySQL、Oracle...) | MySQL | 人民币(公测期间免费,哎,帮打个广告吧) | **任意Spring Data Jpa支持的关系型数据库(MySQL、Oracle...)** |
|
||||||
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
|
| DAG工作流 | 不支持 | 不支持 | 支持 | **支持** |
|
||||||
|
|
||||||
|
|
||||||
|
@ -39,8 +39,8 @@ OhMyScheduler is a powerful distributed scheduling platform and distributed comp
|
|||||||
| Log blanking | not support | support | not support | **support** |
|
| Log blanking | not support | support | not support | **support** |
|
||||||
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
|
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
|
||||||
| Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** |
|
| Alarm monitoring | no | mail | SMS | **Email, providing an interface to allow developers to customize development** |
|
||||||
| System dependence | MySQL | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
|
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | Renminbi (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
|
||||||
| DAG workflow | not support | not support | support | **support** |
|
| workflow | not support | not support | support | **support** |
|
||||||
|
|
||||||
# Document
|
# Document
|
||||||
**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)**
|
**[GitHub Wiki](https://github.com/KFCFans/OhMyScheduler/wiki)**
|
||||||
|
@ -10,11 +10,11 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-client</artifactId>
|
<artifactId>powerjob-client</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<powerjob.common.version>3.0.1</powerjob.common.version>
|
<powerjob.common.version>3.1.0</powerjob.common.version>
|
||||||
<junit.version>5.6.1</junit.version>
|
<junit.version>5.6.1</junit.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ import okhttp3.MediaType;
|
|||||||
import okhttp3.RequestBody;
|
import okhttp3.RequestBody;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
@ -39,8 +40,8 @@ public class OhMyClient {
|
|||||||
* @param domain www.oms-server.com(内网域名,自行完成DNS & Proxy)
|
* @param domain www.oms-server.com(内网域名,自行完成DNS & Proxy)
|
||||||
* @param appName 负责的应用名称
|
* @param appName 负责的应用名称
|
||||||
*/
|
*/
|
||||||
public OhMyClient(String domain, String appName) {
|
public OhMyClient(String domain, String appName, String password) {
|
||||||
this(Lists.newArrayList(domain), appName);
|
this(Lists.newArrayList(domain), appName, password);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -49,16 +50,16 @@ public class OhMyClient {
|
|||||||
* @param addressList IP:Port 列表
|
* @param addressList IP:Port 列表
|
||||||
* @param appName 负责的应用名称
|
* @param appName 负责的应用名称
|
||||||
*/
|
*/
|
||||||
public OhMyClient(List<String> addressList, String appName) {
|
public OhMyClient(List<String> addressList, String appName, String password) {
|
||||||
|
|
||||||
Objects.requireNonNull(addressList, "domain can't be null!");
|
Objects.requireNonNull(addressList, "domain can't be null!");
|
||||||
Objects.requireNonNull(appName, "appName can't be null");
|
Objects.requireNonNull(appName, "appName can't be null");
|
||||||
|
|
||||||
allAddress = addressList;
|
allAddress = addressList;
|
||||||
for (String addr : addressList) {
|
for (String addr : addressList) {
|
||||||
String url = getUrl(OpenAPIConstant.ASSERT, addr) + "?appName=" + appName;
|
String url = getUrl(OpenAPIConstant.ASSERT, addr);
|
||||||
try {
|
try {
|
||||||
String result = HttpUtils.get(url);
|
String result = assertApp(appName, password, url);
|
||||||
if (StringUtils.isNotEmpty(result)) {
|
if (StringUtils.isNotEmpty(result)) {
|
||||||
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
|
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
|
||||||
if (resultDTO.isSuccess()) {
|
if (resultDTO.isSuccess()) {
|
||||||
@ -77,6 +78,15 @@ public class OhMyClient {
|
|||||||
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
|
log.info("[OhMyClient] {}'s oms-client bootstrap successfully.", appName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String assertApp(String appName, String password, String url) throws IOException {
|
||||||
|
FormBody.Builder builder = new FormBody.Builder()
|
||||||
|
.add("appName", appName);
|
||||||
|
if (password != null) {
|
||||||
|
builder.add("password", password);
|
||||||
|
}
|
||||||
|
return HttpUtils.post(url, builder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static String getUrl(String path, String address) {
|
private static String getUrl(String path, String address) {
|
||||||
return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path);
|
return String.format(URL_PATTERN, address, OpenAPIConstant.WEB_PATH, path);
|
||||||
|
@ -21,7 +21,7 @@ public class TestClient {
|
|||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void initClient() throws Exception {
|
public static void initClient() throws Exception {
|
||||||
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2");
|
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -29,7 +29,7 @@ public class TestClient {
|
|||||||
|
|
||||||
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
|
SaveJobInfoRequest newJobInfo = new SaveJobInfoRequest();
|
||||||
// newJobInfo.setId(8L);
|
// newJobInfo.setId(8L);
|
||||||
newJobInfo.setJobName("omsOpenAPIJob");
|
newJobInfo.setJobName("omsOpenAPIJobccccc");
|
||||||
newJobInfo.setJobDescription("tes OpenAPI");
|
newJobInfo.setJobDescription("tes OpenAPI");
|
||||||
newJobInfo.setJobParams("{'aa':'bb'}");
|
newJobInfo.setJobParams("{'aa':'bb'}");
|
||||||
newJobInfo.setTimeExpressionType(TimeExpressionType.CRON);
|
newJobInfo.setTimeExpressionType(TimeExpressionType.CRON);
|
||||||
|
@ -20,7 +20,7 @@ public class TestWorkflow {
|
|||||||
|
|
||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void initClient() throws Exception {
|
public static void initClient() throws Exception {
|
||||||
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test");
|
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -10,7 +10,7 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-common</artifactId>
|
<artifactId>powerjob-common</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package com.github.kfcfans.powerjob.common.request.http;
|
package com.github.kfcfans.powerjob.common.request;
|
||||||
|
|
||||||
import com.github.kfcfans.powerjob.common.OmsSerializable;
|
import com.github.kfcfans.powerjob.common.OmsSerializable;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
@ -10,13 +10,13 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-server</artifactId>
|
<artifactId>powerjob-server</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<swagger.version>2.9.2</swagger.version>
|
<swagger.version>2.9.2</swagger.version>
|
||||||
<springboot.version>2.2.6.RELEASE</springboot.version>
|
<springboot.version>2.2.6.RELEASE</springboot.version>
|
||||||
<powerjob.common.version>3.0.1</powerjob.common.version>
|
<powerjob.common.version>3.1.0</powerjob.common.version>
|
||||||
<mysql.version>8.0.19</mysql.version>
|
<mysql.version>8.0.19</mysql.version>
|
||||||
<h2.db.version>1.4.200</h2.db.version>
|
<h2.db.version>1.4.200</h2.db.version>
|
||||||
<zip4j.version>2.5.2</zip4j.version>
|
<zip4j.version>2.5.2</zip4j.version>
|
||||||
|
@ -6,7 +6,7 @@ import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
|
|||||||
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
||||||
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
|
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
|
||||||
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
|
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
|
||||||
import com.github.kfcfans.powerjob.common.request.http.WorkerNeedDeployContainerRequest;
|
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
|
||||||
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||||
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
|
||||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||||
|
@ -21,7 +21,8 @@ public class AppInfoDO {
|
|||||||
private Long id;
|
private Long id;
|
||||||
|
|
||||||
private String appName;
|
private String appName;
|
||||||
private String description;
|
// 应用分组密码
|
||||||
|
private String password;
|
||||||
|
|
||||||
// 当前负责该 appName 旗下任务调度的server地址,IP:Port(注意,该地址为ActorSystem地址,而不是HTTP地址,两者端口不同)
|
// 当前负责该 appName 旗下任务调度的server地址,IP:Port(注意,该地址为ActorSystem地址,而不是HTTP地址,两者端口不同)
|
||||||
private String currentServer;
|
private String currentServer;
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
package com.github.kfcfans.powerjob.server.service;
|
||||||
|
|
||||||
|
import com.github.kfcfans.powerjob.common.OmsException;
|
||||||
|
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
||||||
|
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 应用信息服务
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2020/6/20
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class AppInfoService {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private AppInfoRepository appInfoRepository;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证应用访问权限
|
||||||
|
* @param appName 应用名称
|
||||||
|
* @param password 密码
|
||||||
|
* @return 应用ID
|
||||||
|
*/
|
||||||
|
public Long assertApp(String appName, String password) {
|
||||||
|
|
||||||
|
AppInfoDO appInfo = appInfoRepository.findByAppName(appName).orElseThrow(() -> new OmsException("can't find appInfo by appName: " + appName));
|
||||||
|
if (Objects.equals(appInfo.getPassword(), password)) {
|
||||||
|
return appInfo.getId();
|
||||||
|
}
|
||||||
|
throw new OmsException("password error!");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -3,6 +3,8 @@ package com.github.kfcfans.powerjob.server.web.controller;
|
|||||||
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
||||||
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
||||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
||||||
|
import com.github.kfcfans.powerjob.server.service.AppInfoService;
|
||||||
|
import com.github.kfcfans.powerjob.server.web.request.AppAssertRequest;
|
||||||
import com.github.kfcfans.powerjob.server.web.request.ModifyAppInfoRequest;
|
import com.github.kfcfans.powerjob.server.web.request.ModifyAppInfoRequest;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
@ -29,6 +31,8 @@ import java.util.stream.Collectors;
|
|||||||
@RequestMapping("/appInfo")
|
@RequestMapping("/appInfo")
|
||||||
public class AppInfoController {
|
public class AppInfoController {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private AppInfoService appInfoService;
|
||||||
@Resource
|
@Resource
|
||||||
private AppInfoRepository appInfoRepository;
|
private AppInfoRepository appInfoRepository;
|
||||||
|
|
||||||
@ -53,6 +57,11 @@ public class AppInfoController {
|
|||||||
return ResultDTO.success(null);
|
return ResultDTO.success(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostMapping("/assert")
|
||||||
|
public ResultDTO<Long> assertApp(@RequestBody AppAssertRequest request) {
|
||||||
|
return ResultDTO.success(appInfoService.assertApp(request.getAppName(), request.getPassword()));
|
||||||
|
}
|
||||||
|
|
||||||
@GetMapping("/delete")
|
@GetMapping("/delete")
|
||||||
public ResultDTO<Void> deleteAppInfo(Long appId) {
|
public ResultDTO<Void> deleteAppInfo(Long appId) {
|
||||||
appInfoRepository.deleteById(appId);
|
appInfoRepository.deleteById(appId);
|
||||||
|
@ -3,8 +3,7 @@ package com.github.kfcfans.powerjob.server.web.controller;
|
|||||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
||||||
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
|
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
|
||||||
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
|
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
|
||||||
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
import com.github.kfcfans.powerjob.server.service.AppInfoService;
|
||||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
|
||||||
import com.github.kfcfans.powerjob.server.service.CacheService;
|
import com.github.kfcfans.powerjob.server.service.CacheService;
|
||||||
import com.github.kfcfans.powerjob.server.service.JobService;
|
import com.github.kfcfans.powerjob.server.service.JobService;
|
||||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
||||||
@ -15,7 +14,6 @@ import com.github.kfcfans.powerjob.common.response.*;
|
|||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 开放接口(OpenAPI)控制器,对接 oms-client
|
* 开放接口(OpenAPI)控制器,对接 oms-client
|
||||||
@ -27,6 +25,8 @@ import java.util.Optional;
|
|||||||
@RequestMapping(OpenAPIConstant.WEB_PATH)
|
@RequestMapping(OpenAPIConstant.WEB_PATH)
|
||||||
public class OpenAPIController {
|
public class OpenAPIController {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private AppInfoService appInfoService;
|
||||||
@Resource
|
@Resource
|
||||||
private JobService jobService;
|
private JobService jobService;
|
||||||
@Resource
|
@Resource
|
||||||
@ -39,14 +39,10 @@ public class OpenAPIController {
|
|||||||
@Resource
|
@Resource
|
||||||
private CacheService cacheService;
|
private CacheService cacheService;
|
||||||
|
|
||||||
@Resource
|
|
||||||
private AppInfoRepository appInfoRepository;
|
|
||||||
|
|
||||||
@GetMapping(OpenAPIConstant.ASSERT)
|
@PostMapping(OpenAPIConstant.ASSERT)
|
||||||
public ResultDTO<Long> assertAppName(String appName) {
|
public ResultDTO<Long> assertAppName(String appName, @RequestParam(required = false) String password) {
|
||||||
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
|
return ResultDTO.success(appInfoService.assertApp(appName, password));
|
||||||
return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId()))
|
|
||||||
.orElseGet(() -> ResultDTO.failed(appName + " is not registered!"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ************* Job 区 ************* */
|
/* ************* Job 区 ************* */
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.github.kfcfans.powerjob.server.web.request;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 验证应用(应用登陆)
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2020/6/20
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class AppAssertRequest {
|
||||||
|
private String appName;
|
||||||
|
private String password;
|
||||||
|
}
|
@ -13,5 +13,5 @@ public class ModifyAppInfoRequest {
|
|||||||
|
|
||||||
private Long id;
|
private Long id;
|
||||||
private String appName;
|
private String appName;
|
||||||
private String description;
|
private String password;
|
||||||
}
|
}
|
||||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -10,12 +10,12 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-worker-agent</artifactId>
|
<artifactId>powerjob-worker-agent</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<powerjob.worker.version>3.0.1</powerjob.worker.version>
|
<powerjob.worker.version>3.1.0</powerjob.worker.version>
|
||||||
<logback.version>1.2.3</logback.version>
|
<logback.version>1.2.3</logback.version>
|
||||||
<picocli.version>4.3.2</picocli.version>
|
<picocli.version>4.3.2</picocli.version>
|
||||||
|
|
||||||
|
@ -10,11 +10,11 @@
|
|||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
<artifactId>powerjob-worker-samples</artifactId>
|
<artifactId>powerjob-worker-samples</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<springboot.version>2.2.6.RELEASE</springboot.version>
|
<springboot.version>2.2.6.RELEASE</springboot.version>
|
||||||
<powerjob.worker.version>3.0.1</powerjob.worker.version>
|
<powerjob.worker.version>3.1.0</powerjob.worker.version>
|
||||||
<fastjson.version>1.2.68</fastjson.version>
|
<fastjson.version>1.2.68</fastjson.version>
|
||||||
|
|
||||||
<!-- 部署时跳过该module -->
|
<!-- 部署时跳过该module -->
|
||||||
|
@ -10,16 +10,18 @@
|
|||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<artifactId>powerjob-worker</artifactId>
|
<artifactId>powerjob-worker</artifactId>
|
||||||
<version>3.0.1</version>
|
<version>3.1.0</version>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<spring.version>5.2.4.RELEASE</spring.version>
|
<spring.version>5.2.4.RELEASE</spring.version>
|
||||||
<powerjob.common.version>3.0.1</powerjob.common.version>
|
<powerjob.common.version>3.1.0</powerjob.common.version>
|
||||||
<h2.db.version>1.4.200</h2.db.version>
|
<h2.db.version>1.4.200</h2.db.version>
|
||||||
<hikaricp.version>3.4.2</hikaricp.version>
|
<hikaricp.version>3.4.2</hikaricp.version>
|
||||||
<junit.version>5.6.1</junit.version>
|
<junit.version>5.6.1</junit.version>
|
||||||
<kryo.version>5.0.0-RC5</kryo.version>
|
<kryo.version>5.0.0-RC5</kryo.version>
|
||||||
|
|
||||||
|
<logback.version>1.2.3</logback.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
@ -66,6 +68,13 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- log for test stage -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
<version>${logback.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
@ -3,8 +3,10 @@ package com.github.kfcfans.powerjob.worker.common;
|
|||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
|
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
|
||||||
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
|
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -26,7 +28,7 @@ public class OhMyConfig {
|
|||||||
/**
|
/**
|
||||||
* 调度服务器地址,ip:port 或 域名
|
* 调度服务器地址,ip:port 或 域名
|
||||||
*/
|
*/
|
||||||
private List<String> serverAddress;
|
private List<String> serverAddress = Lists.newArrayList();
|
||||||
/**
|
/**
|
||||||
* 本地持久化方式,默认使用磁盘
|
* 本地持久化方式,默认使用磁盘
|
||||||
*/
|
*/
|
||||||
|
@ -5,7 +5,7 @@ import akka.pattern.Patterns;
|
|||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
|
import com.github.kfcfans.powerjob.common.model.DeployedContainerInfo;
|
||||||
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
|
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
|
||||||
import com.github.kfcfans.powerjob.common.request.http.WorkerNeedDeployContainerRequest;
|
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
|
||||||
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||||
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
||||||
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||||
|
@ -54,6 +54,8 @@ public class ProcessorTracker {
|
|||||||
private OmsLogger omsLogger;
|
private OmsLogger omsLogger;
|
||||||
// ProcessResult 上报失败的重试队列
|
// ProcessResult 上报失败的重试队列
|
||||||
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
|
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
|
||||||
|
// 上一次空闲时间
|
||||||
|
private long lastIdleTime;
|
||||||
|
|
||||||
private String taskTrackerAddress;
|
private String taskTrackerAddress;
|
||||||
private ActorSelection taskTrackerActorRef;
|
private ActorSelection taskTrackerActorRef;
|
||||||
@ -62,6 +64,8 @@ public class ProcessorTracker {
|
|||||||
private ScheduledExecutorService timingPool;
|
private ScheduledExecutorService timingPool;
|
||||||
|
|
||||||
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
|
private static final int THREAD_POOL_QUEUE_MAX_SIZE = 100;
|
||||||
|
// 最多,长时间空闲的 ProcessorTracker 会发起销毁请求
|
||||||
|
private static final long MAX_IDLE_TIME = 120000;
|
||||||
|
|
||||||
// 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
|
// 当 ProcessorTracker 出现根本性错误(比如 Processor 创建失败,所有的任务直接失败)
|
||||||
private boolean lethal = false;
|
private boolean lethal = false;
|
||||||
@ -82,6 +86,7 @@ public class ProcessorTracker {
|
|||||||
|
|
||||||
this.omsLogger = new OmsServerLogger(instanceId);
|
this.omsLogger = new OmsServerLogger(instanceId);
|
||||||
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
|
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
|
||||||
|
this.lastIdleTime = -1L;
|
||||||
|
|
||||||
// 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE
|
// 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE
|
||||||
initThreadPool();
|
initThreadPool();
|
||||||
@ -221,6 +226,7 @@ public class ProcessorTracker {
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
||||||
|
// 超时检查,如果超时则自动关闭 TaskTracker
|
||||||
long interval = System.currentTimeMillis() - startTime;
|
long interval = System.currentTimeMillis() - startTime;
|
||||||
// 秒级任务的ProcessorTracker不应该关闭
|
// 秒级任务的ProcessorTracker不应该关闭
|
||||||
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
|
if (!TimeExpressionType.frequentTypes.contains(instanceInfo.getTimeExpressionType())) {
|
||||||
@ -243,10 +249,27 @@ public class ProcessorTracker {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 判断线程池活跃状态,长时间空闲则上报 TaskTracker 请求检查
|
||||||
|
if (threadPool.getActiveCount() > 0) {
|
||||||
|
lastIdleTime = -1;
|
||||||
|
}else {
|
||||||
|
if (lastIdleTime == -1) {
|
||||||
|
lastIdleTime = System.currentTimeMillis();
|
||||||
|
}else {
|
||||||
|
long idleTime = System.currentTimeMillis() - lastIdleTime;
|
||||||
|
if (idleTime > MAX_IDLE_TIME) {
|
||||||
|
lastIdleTime = System.currentTimeMillis();
|
||||||
|
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker.", instanceId, idleTime);
|
||||||
|
|
||||||
|
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 上报当前 ProcessorTracker 负载
|
// 上报当前 ProcessorTracker 负载
|
||||||
long waitingNum = threadPool.getQueue().size();
|
long waitingNum = threadPool.getQueue().size();
|
||||||
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
|
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null);
|
||||||
taskTrackerActorRef.tell(req, null);
|
|
||||||
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
|
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,15 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task;
|
|||||||
|
|
||||||
import akka.actor.ActorSelection;
|
import akka.actor.ActorSelection;
|
||||||
import akka.pattern.Patterns;
|
import akka.pattern.Patterns;
|
||||||
import com.github.kfcfans.powerjob.common.ExecuteType;
|
import com.github.kfcfans.powerjob.common.*;
|
||||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
|
||||||
import com.github.kfcfans.powerjob.common.SystemInstanceResult;
|
|
||||||
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
|
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
|
||||||
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
|
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
|
||||||
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
|
||||||
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||||
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
|
||||||
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
|
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
|
||||||
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
|
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
|
||||||
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||||
@ -105,10 +102,11 @@ public class CommonTaskTracker extends TaskTracker {
|
|||||||
rootTask.setLastReportTime(-1L);
|
rootTask.setLastReportTime(-1L);
|
||||||
rootTask.setSubInstanceId(instanceId);
|
rootTask.setSubInstanceId(instanceId);
|
||||||
|
|
||||||
if (!taskPersistenceService.save(rootTask)) {
|
if (taskPersistenceService.save(rootTask)) {
|
||||||
log.error("[TaskTracker-{}] create root task failed.", instanceId);
|
|
||||||
}else {
|
|
||||||
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
|
log.info("[TaskTracker-{}] create root task successfully.", instanceId);
|
||||||
|
}else {
|
||||||
|
log.error("[TaskTracker-{}] create root task failed.", instanceId);
|
||||||
|
throw new OmsException("create root task failed for instance: " + instanceId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,7 +174,7 @@ public class CommonTaskTracker extends TaskTracker {
|
|||||||
success = holder.failedNum == 0;
|
success = holder.failedNum == 0;
|
||||||
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
|
result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
|
||||||
break;
|
break;
|
||||||
// MapReduce 和 Broadcast 任务实例是否完成根据**Last_Task**的执行情况判断
|
// MapReduce 和 Broadcast 任务实例是否完成根据**LastTask**的执行情况判断
|
||||||
default:
|
default:
|
||||||
|
|
||||||
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
|
Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
|
||||||
@ -267,8 +265,8 @@ public class CommonTaskTracker extends TaskTracker {
|
|||||||
|
|
||||||
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
|
taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);
|
||||||
|
|
||||||
log.warn("[TaskTracker-{}] task(taskId={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
|
log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
|
||||||
instanceId, uncheckTask.getTaskId());
|
instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
@ -280,9 +278,6 @@ public class CommonTaskTracker extends TaskTracker {
|
|||||||
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
|
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
|
||||||
taskPersistenceService.updateLostTasks(disconnectedPTs);
|
taskPersistenceService.updateLostTasks(disconnectedPTs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6.3 超时检查 -> 检查超时的Task
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -290,7 +285,7 @@ public class CommonTaskTracker extends TaskTracker {
|
|||||||
try {
|
try {
|
||||||
innerRun();
|
innerRun();
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceInfo.getInstanceId(), e);
|
log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -267,8 +267,17 @@ public abstract class TaskTracker {
|
|||||||
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
|
* @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态
|
||||||
*/
|
*/
|
||||||
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
|
public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) {
|
||||||
ptStatusHolder.updateStatus(heartbeatReq);
|
|
||||||
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
|
log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq);
|
||||||
|
ptStatusHolder.updateStatus(heartbeatReq);
|
||||||
|
|
||||||
|
// 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务
|
||||||
|
if (heartbeatReq.getType() == ProcessorTrackerStatusReportReq.IDLE) {
|
||||||
|
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, heartbeatReq.getAddress());
|
||||||
|
if (!CollectionUtils.isEmpty(unfinishedTask)) {
|
||||||
|
log.warn("[TaskTracker-{}] ProcessorTracker is idle now but have unfinished tasks: {}", instanceId, unfinishedTask);
|
||||||
|
unfinishedTask.forEach(task -> updateTaskStatus(task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,7 +33,8 @@ public class ConnectionFactory {
|
|||||||
synchronized (ConnectionFactory.class) {
|
synchronized (ConnectionFactory.class) {
|
||||||
if (dataSource == null) {
|
if (dataSource == null) {
|
||||||
|
|
||||||
StoreStrategy strategy = OhMyWorker.getConfig().getStoreStrategy();
|
// 兼容单元测试,否则没办法单独测试 DAO 层了
|
||||||
|
StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
|
||||||
|
|
||||||
HikariConfig config = new HikariConfig();
|
HikariConfig config = new HikariConfig();
|
||||||
config.setDriverClassName("org.h2.Driver");
|
config.setDriverClassName("org.h2.Driver");
|
||||||
|
@ -66,9 +66,8 @@ public class TaskDO {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "TaskDO{" +
|
return "{" +
|
||||||
"taskId='" + taskId + '\'' +
|
"taskId='" + taskId + '\'' +
|
||||||
", instanceId='" + instanceId + '\'' +
|
|
||||||
", taskName='" + taskName + '\'' +
|
", taskName='" + taskName + '\'' +
|
||||||
", address='" + address + '\'' +
|
", address='" + address + '\'' +
|
||||||
", status=" + status +
|
", status=" + status +
|
||||||
|
@ -156,6 +156,23 @@ public class TaskPersistenceService {
|
|||||||
return Lists.newArrayList();
|
return Lists.newArrayList();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取某个 ProcessorTracker 未完成的任务
|
||||||
|
public List<TaskDO> getAllUnFinishedTaskByAddress(Long instanceId, String address) {
|
||||||
|
try {
|
||||||
|
String condition = String.format("status not in (%d, %d)", TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), TaskStatus.WORKER_PROCESS_FAILED.getValue());
|
||||||
|
|
||||||
|
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||||
|
query.setInstanceId(instanceId);
|
||||||
|
query.setAddress(address);
|
||||||
|
query.setQueryCondition(condition);
|
||||||
|
|
||||||
|
return execute(() -> taskDAO.simpleQuery(query));
|
||||||
|
}catch (Exception e) {
|
||||||
|
log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e);
|
||||||
|
}
|
||||||
|
return Lists.newArrayList();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取指定状态的Task
|
* 获取指定状态的Task
|
||||||
*/
|
*/
|
||||||
|
@ -16,6 +16,12 @@ import lombok.NoArgsConstructor;
|
|||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
|
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
|
||||||
|
|
||||||
|
public static final int IDLE = 1;
|
||||||
|
public static final int LOAD = 2;
|
||||||
|
|
||||||
|
// IDLE 代表 ProcessorTracker 长期处于空闲状态,LOAD 代表 负载上报请求
|
||||||
|
private int type;
|
||||||
|
|
||||||
private Long instanceId;
|
private Long instanceId;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -33,11 +39,24 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
|
|||||||
*/
|
*/
|
||||||
private String address;
|
private String address;
|
||||||
|
|
||||||
public ProcessorTrackerStatusReportReq(Long instanceId, long remainTaskNum) {
|
|
||||||
this.instanceId = instanceId;
|
|
||||||
this.remainTaskNum = remainTaskNum;
|
|
||||||
|
|
||||||
this.time = System.currentTimeMillis();
|
public static ProcessorTrackerStatusReportReq buildIdleReport(Long instanceId) {
|
||||||
this.address = OhMyWorker.getWorkerAddress();
|
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
|
||||||
|
req.type = IDLE;
|
||||||
|
req.instanceId = instanceId;
|
||||||
|
req.time = System.currentTimeMillis();
|
||||||
|
req.address = OhMyWorker.getWorkerAddress();
|
||||||
|
req.setRemainTaskNum(0);
|
||||||
|
return req;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ProcessorTrackerStatusReportReq buildLoadReport(Long instanceId, Long remainTaskNum) {
|
||||||
|
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq();
|
||||||
|
req.type = LOAD;
|
||||||
|
req.instanceId = instanceId;
|
||||||
|
req.time = System.currentTimeMillis();
|
||||||
|
req.address = OhMyWorker.getWorkerAddress();
|
||||||
|
req.setRemainTaskNum(remainTaskNum);
|
||||||
|
return req;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,78 @@
|
|||||||
|
package com.github.kfcfans.powerjob;
|
||||||
|
|
||||||
|
import akka.actor.ActorSelection;
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import com.github.kfcfans.powerjob.common.ExecuteType;
|
||||||
|
import com.github.kfcfans.powerjob.common.ProcessorType;
|
||||||
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
|
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||||
|
import com.github.kfcfans.powerjob.worker.OhMyWorker;
|
||||||
|
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
|
||||||
|
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||||
|
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
|
||||||
|
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||||
|
import com.typesafe.config.ConfigFactory;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启动公共服务
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2020/6/17
|
||||||
|
*/
|
||||||
|
public class CommonTest {
|
||||||
|
|
||||||
|
protected static ActorSelection remoteProcessorTracker;
|
||||||
|
protected static ActorSelection remoteTaskTracker;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void startWorker() throws Exception {
|
||||||
|
OhMyConfig ohMyConfig = new OhMyConfig();
|
||||||
|
ohMyConfig.setAppName("oms-test");
|
||||||
|
ohMyConfig.setEnableTestMode(true);
|
||||||
|
|
||||||
|
OhMyWorker worker = new OhMyWorker();
|
||||||
|
worker.setConfig(ohMyConfig);
|
||||||
|
worker.init();
|
||||||
|
|
||||||
|
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
||||||
|
String address = NetUtils.getLocalHost() + ":27777";
|
||||||
|
|
||||||
|
remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME));
|
||||||
|
remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void stop() throws Exception {
|
||||||
|
Thread.sleep(120000);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
|
||||||
|
|
||||||
|
InstanceInfo instanceInfo = new InstanceInfo();
|
||||||
|
|
||||||
|
instanceInfo.setJobId(1L);
|
||||||
|
instanceInfo.setInstanceId(10086L);
|
||||||
|
|
||||||
|
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
|
||||||
|
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
||||||
|
instanceInfo.setProcessorInfo(processor);
|
||||||
|
|
||||||
|
instanceInfo.setInstanceTimeoutMS(500000);
|
||||||
|
|
||||||
|
instanceInfo.setThreadConcurrency(5);
|
||||||
|
instanceInfo.setTaskRetryNum(3);
|
||||||
|
|
||||||
|
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||||
|
|
||||||
|
req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777");
|
||||||
|
req.setInstanceInfo(instanceInfo);
|
||||||
|
|
||||||
|
req.setTaskId("0");
|
||||||
|
req.setTaskName("ROOT_TASK");
|
||||||
|
req.setTaskCurrentRetryNums(0);
|
||||||
|
|
||||||
|
return req;
|
||||||
|
}
|
||||||
|
}
|
@ -24,6 +24,7 @@ public class PersistenceServiceTest {
|
|||||||
public static void initTable() throws Exception {
|
public static void initTable() throws Exception {
|
||||||
taskPersistenceService.init();
|
taskPersistenceService.init();
|
||||||
|
|
||||||
|
System.out.println("=============== init data ===============");
|
||||||
List<TaskDO> taskList = Lists.newLinkedList();
|
List<TaskDO> taskList = Lists.newLinkedList();
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
TaskDO task = new TaskDO();
|
TaskDO task = new TaskDO();
|
||||||
@ -39,10 +40,11 @@ public class PersistenceServiceTest {
|
|||||||
task.setAddress(NetUtils.getLocalHost());
|
task.setAddress(NetUtils.getLocalHost());
|
||||||
task.setLastModifiedTime(System.currentTimeMillis());
|
task.setLastModifiedTime(System.currentTimeMillis());
|
||||||
task.setCreatedTime(System.currentTimeMillis());
|
task.setCreatedTime(System.currentTimeMillis());
|
||||||
|
task.setLastReportTime(System.currentTimeMillis());
|
||||||
|
task.setResult("");
|
||||||
}
|
}
|
||||||
|
|
||||||
taskPersistenceService.batchSave(taskList);
|
taskPersistenceService.batchSave(taskList);
|
||||||
System.out.println("=============== init data ===============");
|
|
||||||
taskList.forEach(System.out::println);
|
taskList.forEach(System.out::println);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,4 +77,11 @@ public class PersistenceServiceTest {
|
|||||||
System.out.println("updateLostTasks: " + success);
|
System.out.println("updateLostTasks: " + success);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetAllUnFinishedTaskByAddress() throws Exception {
|
||||||
|
System.out.println("=============== testGetAllUnFinishedTaskByAddress ===============");
|
||||||
|
List<TaskDO> res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost());
|
||||||
|
System.out.println(res);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
|
|||||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||||
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
|
||||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||||
|
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
|
||||||
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
|
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
|
||||||
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||||
import com.typesafe.config.ConfigFactory;
|
import com.typesafe.config.ConfigFactory;
|
||||||
@ -23,68 +24,20 @@ import org.junit.jupiter.api.Test;
|
|||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2020/3/24
|
* @since 2020/3/24
|
||||||
*/
|
*/
|
||||||
public class ProcessorTrackerTest {
|
public class ProcessorTrackerTest extends CommonTest {
|
||||||
|
|
||||||
private static ActorSelection remoteProcessorTracker;
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
public static void startWorker() throws Exception {
|
|
||||||
OhMyConfig ohMyConfig = new OhMyConfig();
|
|
||||||
ohMyConfig.setAppName("oms-test");
|
|
||||||
OhMyWorker worker = new OhMyWorker();
|
|
||||||
worker.setConfig(ohMyConfig);
|
|
||||||
worker.init();
|
|
||||||
|
|
||||||
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));
|
|
||||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost(), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
|
||||||
remoteProcessorTracker = testAS.actorSelection(akkaRemotePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
public static void stop() throws Exception {
|
|
||||||
Thread.sleep(120000);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicProcessor() throws Exception {
|
public void testBasicProcessor() throws Exception {
|
||||||
|
|
||||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestBasicProcessor");
|
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||||
remoteProcessorTracker.tell(req, null);
|
remoteProcessorTracker.tell(req, null);
|
||||||
Thread.sleep(30000);
|
Thread.sleep(30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMapReduceProcessor() throws Exception {
|
public void testMapReduceProcessor() throws Exception {
|
||||||
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
|
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
|
||||||
remoteProcessorTracker.tell(req, null);
|
remoteProcessorTracker.tell(req, null);
|
||||||
Thread.sleep(30000);
|
Thread.sleep(30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TaskTrackerStartTaskReq genTaskTrackerStartTaskReq(String processor) {
|
|
||||||
|
|
||||||
InstanceInfo instanceInfo = new InstanceInfo();
|
|
||||||
|
|
||||||
instanceInfo.setJobId(1L);
|
|
||||||
instanceInfo.setInstanceId(10086L);
|
|
||||||
|
|
||||||
instanceInfo.setExecuteType(ExecuteType.STANDALONE.name());
|
|
||||||
instanceInfo.setProcessorType(ProcessorType.EMBEDDED_JAVA.name());
|
|
||||||
instanceInfo.setProcessorInfo(processor);
|
|
||||||
|
|
||||||
instanceInfo.setInstanceTimeoutMS(500000);
|
|
||||||
|
|
||||||
instanceInfo.setThreadConcurrency(5);
|
|
||||||
instanceInfo.setTaskRetryNum(3);
|
|
||||||
|
|
||||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
|
||||||
|
|
||||||
req.setTaskTrackerAddress(NetUtils.getLocalHost());
|
|
||||||
req.setInstanceInfo(instanceInfo);
|
|
||||||
|
|
||||||
req.setTaskId("0");
|
|
||||||
req.setTaskName("ROOT_TASK");
|
|
||||||
req.setTaskCurrentRetryNums(0);
|
|
||||||
|
|
||||||
return req;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@ public class TestUtils {
|
|||||||
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
|
public static ServerScheduleJobReq genServerScheduleJobReq(ExecuteType executeType, TimeExpressionType timeExpressionType) {
|
||||||
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
ServerScheduleJobReq req = new ServerScheduleJobReq();
|
||||||
|
|
||||||
|
req.setJobId(1L);
|
||||||
req.setInstanceId(10086L);
|
req.setInstanceId(10086L);
|
||||||
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
|
req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT));
|
||||||
|
|
||||||
@ -38,15 +39,15 @@ public class TestUtils {
|
|||||||
switch (executeType) {
|
switch (executeType) {
|
||||||
case STANDALONE:
|
case STANDALONE:
|
||||||
req.setExecuteType(ExecuteType.STANDALONE.name());
|
req.setExecuteType(ExecuteType.STANDALONE.name());
|
||||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBasicProcessor");
|
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||||
break;
|
break;
|
||||||
case MAP_REDUCE:
|
case MAP_REDUCE:
|
||||||
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
|
req.setExecuteType(ExecuteType.MAP_REDUCE.name());
|
||||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestMapReduceProcessor");
|
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestMapReduceProcessor");
|
||||||
break;
|
break;
|
||||||
case BROADCAST:
|
case BROADCAST:
|
||||||
req.setExecuteType(ExecuteType.BROADCAST.name());
|
req.setExecuteType(ExecuteType.BROADCAST.name());
|
||||||
req.setProcessorInfo("com.github.kfcfans.oms.processors.TestBroadcastProcessor");
|
req.setProcessorInfo("com.github.kfcfans.powerjob.processors.TestBroadcastProcessor");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,40 @@
|
|||||||
|
package com.github.kfcfans.powerjob.function;
|
||||||
|
|
||||||
|
import com.github.kfcfans.powerjob.CommonTest;
|
||||||
|
import com.github.kfcfans.powerjob.TestUtils;
|
||||||
|
import com.github.kfcfans.powerjob.common.ExecuteType;
|
||||||
|
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||||
|
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
|
||||||
|
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
|
||||||
|
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
|
||||||
|
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
|
||||||
|
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 空闲测试
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2020/6/17
|
||||||
|
*/
|
||||||
|
public class IdleTest extends CommonTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testProcessorTrackerSendIdleReport() throws Exception {
|
||||||
|
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
|
||||||
|
ProcessorTracker pt = new ProcessorTracker(req);
|
||||||
|
Thread.sleep(300000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTaskTrackerProcessorIdle() throws Exception {
|
||||||
|
|
||||||
|
ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L);
|
||||||
|
ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API);
|
||||||
|
|
||||||
|
TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq);
|
||||||
|
if (taskTracker != null) {
|
||||||
|
taskTracker.receiveProcessorTrackerHeartbeat(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -5,7 +5,7 @@ akka {
|
|||||||
allow-java-serialization = off
|
allow-java-serialization = off
|
||||||
|
|
||||||
serialization-bindings {
|
serialization-bindings {
|
||||||
"OmsSerializable" = jackson-cbor
|
"com.github.kfcfans.powerjob.common.OmsSerializable" = jackson-cbor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
remote {
|
remote {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user