Merge branch 'v3.3.1' into jenkins_auto_build

This commit is contained in:
“tjq” 2020-10-18 20:20:10 +08:00
commit 8f2949e9af
27 changed files with 310 additions and 75 deletions

View File

@ -10,20 +10,37 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.3.0</powerjob.common.version>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>
<dependencies>
<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- oms-common -->
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-common</artifactId>
<version>${powerjob.common.version}</version>
<exclusions>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Junit 测试 -->
@ -35,4 +52,47 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${mvn.shade.plugin.version}</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>okhttp3</pattern>
<shadedPattern>shade.powerjob.okhttp3</shadedPattern>
</relocation>
<relocation>
<pattern>okio</pattern>
<shadedPattern>shade.powerjob.okio</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>shade.powerjob.com.google</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache</pattern>
<shadedPattern>shade.powerjob.org.apache</shadedPattern>
</relocation>
<relocation>
<pattern>com.alibaba</pattern>
<shadedPattern>shade.powerjob.com.alibaba</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.client;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.OpenAPIConstant;
@ -8,7 +9,6 @@ import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.response.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
@ -62,7 +62,7 @@ public class OhMyClient {
try {
String result = assertApp(appName, password, url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
currentAddress = addr;
@ -107,9 +107,9 @@ public class OhMyClient {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
String json = JSONObject.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -124,7 +124,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -139,7 +139,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -154,7 +154,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -169,7 +169,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_JOB, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -190,7 +190,7 @@ public class OhMyClient {
builder.add("instanceParams", instanceParams);
}
String post = postHA(OpenAPIConstant.RUN_JOB, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
return runJob(jobId, null, 0);
@ -209,7 +209,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -225,7 +225,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -241,7 +241,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -255,7 +255,7 @@ public class OhMyClient {
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -269,7 +269,7 @@ public class OhMyClient {
.add("instanceId", instanceId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/* ************* Workflow 区 ************* */
@ -282,9 +282,9 @@ public class OhMyClient {
public ResultDTO<Long> saveWorkflow(SaveWorkflowRequest request) throws Exception {
request.setAppId(appId);
MediaType jsonType = MediaType.parse("application/json; charset=utf-8");
String json = JsonUtils.toJSONStringUnsafe(request);
String json = JSONObject.toJSONString(request);
String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json));
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -299,7 +299,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -314,7 +314,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -329,7 +329,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -344,7 +344,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -364,7 +364,7 @@ public class OhMyClient {
builder.add("initParams", initParams);
}
String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build());
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runWorkflow(Long workflowId) throws Exception {
return runWorkflow(workflowId, null, 0);
@ -383,7 +383,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}
/**
@ -398,7 +398,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body);
return JsonUtils.parseObject(post, ResultDTO.class);
return JSONObject.parseObject(post, ResultDTO.class);
}

View File

@ -1,3 +1,4 @@
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
@ -5,7 +6,6 @@ import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -46,13 +46,13 @@ public class TestClient {
newJobInfo.setMinDiskSpace(1.3);
ResultDTO<Long> resultDTO = ohMyClient.saveJob(newJobInfo);
System.out.println(JsonUtils.toJSONString(resultDTO));
System.out.println(JSONObject.toJSONString(resultDTO));
}
@Test
public void testFetchJob() throws Exception {
ResultDTO<JobInfoDTO> fetchJob = ohMyClient.fetchJob(1L);
System.out.println(JsonUtils.toJSONStringUnsafe(fetchJob));
System.out.println(JSONObject.toJSONString(fetchJob));
}
@Test
@ -93,21 +93,21 @@ public class TestClient {
@Test
public void testCancelInstanceInTimeWheel() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));
}
@Test
public void testCancelInstanceInDatabase() throws Exception {
ResultDTO<Long> startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000);
System.out.println("runJob result: " + JsonUtils.toJSONString(startRes));
System.out.println("runJob result: " + JSONObject.toJSONString(startRes));
// 手动重启 server干掉时间轮中的调度数据
TimeUnit.MINUTES.sleep(1);
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes));
}
@Test

View File

@ -1,3 +1,4 @@
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.client.OhMyClient;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.ProcessorType;
@ -5,7 +6,6 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@ -37,7 +37,7 @@ public class TestWorkflow {
base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor");
for (int i = 0; i < 5; i++) {
SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class);
SaveJobInfoRequest request = JSONObject.parseObject(JSONObject.toJSONBytes(base), SaveJobInfoRequest.class);
request.setJobName(request.getJobName() + i);
System.out.println(ohMyClient.saveJob(request));
}

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.powerjob.common.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* worker 查询 执行器集群动态上线需要
*
* @author tjq
* @since 10/17/20
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkerQueryExecutorClusterReq implements OmsSerializable {
private Long appId;
private Long jobId;
}

View File

@ -4,6 +4,8 @@ import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import lombok.*;
import java.nio.charset.StandardCharsets;
/**
* Pattens.ask 的响应
@ -31,7 +33,11 @@ public class AskResponse implements OmsSerializable {
AskResponse r = new AskResponse();
r.success = true;
if (data != null) {
r.data = JsonUtils.toBytes(data);
if (data instanceof String) {
r.data = ((String) data).getBytes(StandardCharsets.UTF_8);
} else {
r.data = JsonUtils.toBytes(data);
}
}
return r;
}

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.powerjob.common.PowerJobException;
import org.apache.commons.lang3.exception.ExceptionUtils;
@ -48,6 +49,10 @@ public class JsonUtils {
return objectMapper.readValue(b, clz);
}
public static <T> T parseObject(byte[] b, TypeReference<T> typeReference) throws Exception {
return objectMapper.readValue(b, typeReference);
}
public static <T> T parseObjectUnsafe(String json, Class<T> clz) {
try {
return objectMapper.readValue(json, clz);

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.3.0</powerjob.common.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>

View File

@ -21,7 +21,7 @@ public class OhMyApplication {
"******************* PowerJob Tips *******************\n" +
"如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" +
"if server can't startup, we recommend that you read the documentation to find a solution:\n" +
"https://www.yuque.com/powerjob/guidence/xp5ygc#xMQC9\n" +
"https://www.yuque.com/powerjob/guidence/problem\n" +
"******************* PowerJob Tips *******************\n\n";
public static void main(String[] args) {

View File

@ -2,18 +2,18 @@ package com.github.kfcfans.powerjob.server.akka.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.request.*;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import java.util.List;
import java.util.Optional;
/**
@ -41,6 +42,7 @@ public class ServerActor extends AbstractActor {
.match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq)
.match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq)
.match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest)
.match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
@ -110,6 +112,33 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf());
}
/**
* 处理 worker 请求获取当前任务所有处理器节点的请求
* @param req jobId + appId
*/
private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) {
AskResponse askResponse;
Long jobId = req.getJobId();
Long appId = req.getAppId();
JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
if (jobInfoOpt.isPresent()) {
JobInfoDO jobInfo = jobInfoOpt.get();
if (!jobInfo.getAppId().equals(appId)) {
askResponse = AskResponse.failed("Permission Denied!");
}else {
List<String> sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace());
askResponse = AskResponse.succeed(sortedAvailableWorker);
}
}else {
askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId);
}
getSender().tell(askResponse, getSelf());
}
// 不需要加锁 Spring IOC 中重复取并没什么问题
private InstanceManager getInstanceManager() {
if (instanceManager == null) {

View File

@ -2,20 +2,26 @@ package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.ha.ClusterStatusHolder;
import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.taobao.api.internal.cluster.ClusterManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.TimeZone;
@ -34,8 +40,10 @@ public class ServerController {
private ServerSelectService serverSelectService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@GetMapping("assert")
@GetMapping("/assert")
public ResultDTO<Long> assertAppName(String appName) {
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findByAppName(appName);
return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId())).

View File

@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.0</powerjob.worker.version>
<powerjob.worker.version>3.3.1</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.3.0</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.3.1</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -34,9 +35,16 @@ public class BroadcastProcessorDemo extends BroadcastProcessor {
@Override
public ProcessResult process(TaskContext taskContext) throws Exception {
OmsLogger logger = taskContext.getOmsLogger();
System.out.println("===== BroadcastProcessorDemo#process ======");
taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
Thread.sleep(45 * 1000);
logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost());
long sleepTime = 1000;
try {
sleepTime = Long.parseLong(taskContext.getJobParams());
}catch (Exception e) {
logger.warn("[BroadcastProcessor] parse sleep time failed!", e);
}
Thread.sleep(Math.max(sleepTime, 1000));
return new ProcessResult(true);
}

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.3.0</powerjob.worker.version>
<powerjob.worker.version>3.3.1</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.3.0</powerjob.common.version>
<powerjob.common.version>3.3.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.common.utils;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant;
@ -45,13 +46,20 @@ public class AkkaUtils {
*/
public static boolean reliableTransmit(ActorSelection remote, Object msg) {
try {
CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return response.isSuccess();
return easyAsk(remote, msg).isSuccess();
}catch (Exception e) {
log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString());
}
return false;
}
public static AskResponse easyAsk(ActorSelection remote, Object msg) {
try {
CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
return (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}catch (Exception e) {
throw new PowerJobException(e);
}
}
}

View File

@ -33,7 +33,7 @@ public class ProcessorTrackerStatus {
*/
public void init(String address) {
this.address = address;
this.lastActiveTime = System.currentTimeMillis();
this.lastActiveTime = - 1;
this.remainTaskNum = 0;
this.dispatched = false;
this.connected = false;

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.ha;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
@ -13,6 +14,7 @@ import java.util.Map;
* @author tjq
* @since 2020/3/28
*/
@Slf4j
public class ProcessorTrackerStatusHolder {
// ProcessorTracker的address(IP:Port) -> 状态
@ -36,8 +38,14 @@ public class ProcessorTrackerStatusHolder {
* 根据 ProcessorTracker 的心跳更新状态
*/
public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) {
ProcessorTrackerStatus processorTrackerStatus = address2Status.get(heartbeatReq.getAddress());
processorTrackerStatus.update(heartbeatReq);
// remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况0.001% 概率
ProcessorTrackerStatus pts = address2Status.computeIfAbsent(heartbeatReq.getAddress(), ignore-> {
log.warn("[ProcessorTrackerStatusHolder] unregistered worker's heartbeat request: {}", heartbeatReq);
ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus();
processorTrackerStatus.init(heartbeatReq.getAddress());
return processorTrackerStatus;
});
pts.update(heartbeatReq);
}
/**
@ -74,4 +82,24 @@ public class ProcessorTrackerStatusHolder {
});
return result;
}
/**
* 注册新的执行节点
* @param address 新的执行节点地址
* @return true: 注册成功 / false已存在
*/
public boolean register(String address) {
ProcessorTrackerStatus pts = address2Status.get(address);
if (pts != null) {
return false;
}
pts = new ProcessorTrackerStatus();
pts.init(address);
address2Status.put(address, pts);
return true;
}
public void remove(List<String> addressList) {
addressList.forEach(address2Status::remove);
}
}

View File

@ -49,7 +49,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
throw new RuntimeException("create script file failed");
}
// 如果是下载则从网络获取
// 如果是下载则从网络获取
for (String protocol : DOWNLOAD_PROTOCOL) {
if (processorInfo.startsWith(protocol)) {
FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000);
@ -57,7 +57,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
}
}
// 持久化到本地
// 非下载链接 processInfo 生成可执行文件
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
bw.write(processorInfo);
bw.flush();

View File

@ -58,7 +58,13 @@ public class CommonTaskTracker extends TaskTracker {
// 启动定时任务任务派发 & 状态检查
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 5, TimeUnit.SECONDS);
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS);
scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 13, 13, TimeUnit.SECONDS);
// 如果是 MR 任务则需要启动执行器动态检测装置
ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType());
if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
}
@Override
@ -284,7 +290,10 @@ public class CommonTaskTracker extends TaskTracker {
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) {
log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
taskPersistenceService.updateLostTasks(disconnectedPTs);
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
ptStatusHolder.remove(disconnectedPTs);
log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
}
}
}

View File

@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -85,7 +84,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 1. 初始化定时调度线程池
String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(3, factory);
this.scheduledPool = Executors.newScheduledThreadPool(4, factory);
// 2. 启动任务发射器
launcher = new Launcher();
@ -103,6 +102,8 @@ public class FrequentTaskTracker extends TaskTracker {
scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS);
// 4. 启动状态检查器
scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS);
// 5. 启动执行器动态检测装置
scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES);
}
@Override
@ -227,6 +228,17 @@ public class FrequentTaskTracker extends TaskTracker {
private void checkStatus() {
Stopwatch stopwatch = Stopwatch.createStarted();
// worker 挂掉的任务直接置为失败
List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
if (!disconnectedPTs.isEmpty()) {
log.warn("[FQTaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, false)) {
ptStatusHolder.remove(disconnectedPTs);
log.warn("[FQTaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
}
}
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS();
long nowTS = System.currentTimeMillis();

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.RemoteConstant;
@ -8,7 +9,10 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.WorkerQueryExecutorClusterReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
@ -63,10 +67,10 @@ public abstract class TaskTracker {
// 是否结束
protected AtomicBoolean finished;
// 上报时间缓存
private Cache<String, Long> taskId2LastReportTime;
private final Cache<String, Long> taskId2LastReportTime;
// 分段锁
private SegmentLock segmentLock;
private final SegmentLock segmentLock;
private static final int UPDATE_CONCURRENCY = 4;
protected TaskTracker(ServerScheduleJobReq req) {
@ -471,6 +475,37 @@ public abstract class TaskTracker {
}
}
/**
* 执行器动态上线for 秒级任务和 MR 任务
* 原则server 查询得到的 执行器状态不会干预 worker 自己维护的状态即只做新增不做任何修改
*/
protected class WorkerDetector implements Runnable {
@Override
public void run() {
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
if (StringUtils.isEmpty(serverPath)) {
log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
return;
}
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId());
AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req);
if (!response.isSuccess()) {
log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
return;
}
try {
List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
workerList.forEach(address -> {
if (ptStatusHolder.register(address)) {
log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address);
}
});
}catch (Exception e) {
log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
}
}
}
/**
* 存储任务实例产生的各个Task状态用于分析任务实例执行情况
*/

View File

@ -98,19 +98,26 @@ public class TaskPersistenceService {
* 更新被派发到已经失联的 ProcessorTracker 的任务重新执行
* update task_info
* set address = 'N/A', status = 0
* where address in () and status not in (5,6)
* where address in () and status not in (5,6) and instance_id = 277
*/
public boolean updateLostTasks(List<String> addressList) {
public boolean updateLostTasks(Long instanceId, List<String> addressList, boolean retry) {
TaskDO updateEntity = new TaskDO();
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
updateEntity.setLastModifiedTime(System.currentTimeMillis());
if (retry) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
}else {
updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue());
updateEntity.setResult("maybe worker down");
}
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
String queryConditionFormat = "address in %s and status not in (%d, %d)";
String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue());
query.setQueryCondition(queryCondition);
log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition());
try {
return execute(() -> taskDAO.simpleUpdate(query, updateEntity));

View File

@ -73,7 +73,7 @@ public class PersistenceServiceTest {
@Test
public void testUpdateLostTasks() throws Exception {
Thread.sleep(1000);
boolean success = taskPersistenceService.updateLostTasks(Lists.newArrayList(NetUtils.getLocalHost()));
boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true);
System.out.println("updateLostTasks: " + success);
}