diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 76a28752..1638d059 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,20 +10,37 @@ 4.0.0 powerjob-client - 3.3.0 + 3.3.1 jar - 3.3.0 5.6.1 + 1.2.68 + 3.3.1 + + 3.2.4 + + + + com.alibaba + fastjson + ${fastjson.version} + + com.github.kfcfans powerjob-common ${powerjob.common.version} + + + com.typesafe.akka + * + + @@ -35,4 +52,47 @@ + + + + org.apache.maven.plugins + maven-shade-plugin + ${mvn.shade.plugin.version} + + false + + + okhttp3 + shade.powerjob.okhttp3 + + + okio + shade.powerjob.okio + + + com.google + shade.powerjob.com.google + + + org.apache + shade.powerjob.org.apache + + + com.alibaba + shade.powerjob.com.alibaba + + + + + + package + + shade + + + + + + + \ No newline at end of file diff --git a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java index a2f91a13..cadeb661 100644 --- a/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java +++ b/powerjob-client/src/main/java/com/github/kfcfans/powerjob/client/OhMyClient.java @@ -1,5 +1,6 @@ package com.github.kfcfans.powerjob.client; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.OpenAPIConstant; @@ -8,7 +9,6 @@ import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; import com.github.kfcfans.powerjob.common.response.*; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; -import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import okhttp3.FormBody; @@ -62,7 +62,7 @@ public class OhMyClient { try { String result = assertApp(appName, password, url); if (StringUtils.isNotEmpty(result)) { - ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class); + ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class); if (resultDTO.isSuccess()) { appId = Long.parseLong(resultDTO.getData().toString()); currentAddress = addr; @@ -107,9 +107,9 @@ public class OhMyClient { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); - String json = JsonUtils.toJSONStringUnsafe(request); + String json = JSONObject.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_JOB, RequestBody.create(jsonType, json)); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -124,7 +124,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_JOB, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -139,7 +139,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_JOB, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -154,7 +154,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_JOB, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -169,7 +169,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_JOB, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -190,7 +190,7 @@ public class OhMyClient { builder.add("instanceParams", instanceParams); } String post = postHA(OpenAPIConstant.RUN_JOB, builder.build()); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } public ResultDTO runJob(Long jobId) throws Exception { return runJob(jobId, null, 0); @@ -209,7 +209,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_INSTANCE, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -225,7 +225,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.CANCEL_INSTANCE, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -241,7 +241,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -255,7 +255,7 @@ public class OhMyClient { .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_STATUS, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -269,7 +269,7 @@ public class OhMyClient { .add("instanceId", instanceId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_INSTANCE_INFO, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /* ************* Workflow 区 ************* */ @@ -282,9 +282,9 @@ public class OhMyClient { public ResultDTO saveWorkflow(SaveWorkflowRequest request) throws Exception { request.setAppId(appId); MediaType jsonType = MediaType.parse("application/json; charset=utf-8"); - String json = JsonUtils.toJSONStringUnsafe(request); + String json = JSONObject.toJSONString(request); String post = postHA(OpenAPIConstant.SAVE_WORKFLOW, RequestBody.create(jsonType, json)); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -299,7 +299,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -314,7 +314,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DISABLE_WORKFLOW, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -329,7 +329,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.ENABLE_WORKFLOW, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -344,7 +344,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.DELETE_WORKFLOW, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -364,7 +364,7 @@ public class OhMyClient { builder.add("initParams", initParams); } String post = postHA(OpenAPIConstant.RUN_WORKFLOW, builder.build()); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } public ResultDTO runWorkflow(Long workflowId) throws Exception { return runWorkflow(workflowId, null, 0); @@ -383,7 +383,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.STOP_WORKFLOW_INSTANCE, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } /** @@ -398,7 +398,7 @@ public class OhMyClient { .add("appId", appId.toString()) .build(); String post = postHA(OpenAPIConstant.FETCH_WORKFLOW_INSTANCE_INFO, body); - return JsonUtils.parseObject(post, ResultDTO.class); + return JSONObject.parseObject(post, ResultDTO.class); } diff --git a/powerjob-client/src/test/java/TestClient.java b/powerjob-client/src/test/java/TestClient.java index 5cebd7ac..dd469c99 100644 --- a/powerjob-client/src/test/java/TestClient.java +++ b/powerjob-client/src/test/java/TestClient.java @@ -1,3 +1,4 @@ +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.TimeExpressionType; @@ -5,7 +6,6 @@ import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.JobInfoDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.client.OhMyClient; -import com.github.kfcfans.powerjob.common.utils.JsonUtils; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -46,13 +46,13 @@ public class TestClient { newJobInfo.setMinDiskSpace(1.3); ResultDTO resultDTO = ohMyClient.saveJob(newJobInfo); - System.out.println(JsonUtils.toJSONString(resultDTO)); + System.out.println(JSONObject.toJSONString(resultDTO)); } @Test public void testFetchJob() throws Exception { ResultDTO fetchJob = ohMyClient.fetchJob(1L); - System.out.println(JsonUtils.toJSONStringUnsafe(fetchJob)); + System.out.println(JSONObject.toJSONString(fetchJob)); } @Test @@ -93,21 +93,21 @@ public class TestClient { @Test public void testCancelInstanceInTimeWheel() throws Exception { ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 20000); - System.out.println("runJob result: " + JsonUtils.toJSONString(startRes)); + System.out.println("runJob result: " + JSONObject.toJSONString(startRes)); ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); - System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes)); + System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes)); } @Test public void testCancelInstanceInDatabase() throws Exception { ResultDTO startRes = ohMyClient.runJob(15L, "start by OhMyClient", 2000000); - System.out.println("runJob result: " + JsonUtils.toJSONString(startRes)); + System.out.println("runJob result: " + JSONObject.toJSONString(startRes)); // 手动重启 server,干掉时间轮中的调度数据 TimeUnit.MINUTES.sleep(1); ResultDTO cancelRes = ohMyClient.cancelInstance(startRes.getData()); - System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes)); + System.out.println("cancelJob result: " + JSONObject.toJSONString(cancelRes)); } @Test diff --git a/powerjob-client/src/test/java/TestWorkflow.java b/powerjob-client/src/test/java/TestWorkflow.java index 0e888742..9e04d80e 100644 --- a/powerjob-client/src/test/java/TestWorkflow.java +++ b/powerjob-client/src/test/java/TestWorkflow.java @@ -1,3 +1,4 @@ +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.client.OhMyClient; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ProcessorType; @@ -5,7 +6,6 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest; -import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.google.common.collect.Lists; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -37,7 +37,7 @@ public class TestWorkflow { base.setProcessorInfo("com.github.kfcfans.powerjob.samples.workflow.WorkflowStandaloneProcessor"); for (int i = 0; i < 5; i++) { - SaveJobInfoRequest request = JsonUtils.parseObject(JsonUtils.toBytes(base), SaveJobInfoRequest.class); + SaveJobInfoRequest request = JSONObject.parseObject(JSONObject.toJSONBytes(base), SaveJobInfoRequest.class); request.setJobName(request.getJobName() + i); System.out.println(ohMyClient.saveJob(request)); } diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 947b04b9..3bca0b62 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.3.0 + 3.3.1 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java new file mode 100644 index 00000000..dd72c3c5 --- /dev/null +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerQueryExecutorClusterReq.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.powerjob.common.request; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * worker 查询 执行器集群(动态上线需要) + * + * @author tjq + * @since 10/17/20 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkerQueryExecutorClusterReq implements OmsSerializable { + private Long appId; + private Long jobId; +} diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java index 52331437..b6fc9d28 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/response/AskResponse.java @@ -4,6 +4,8 @@ import com.github.kfcfans.powerjob.common.OmsSerializable; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import lombok.*; +import java.nio.charset.StandardCharsets; + /** * Pattens.ask 的响应 @@ -31,7 +33,11 @@ public class AskResponse implements OmsSerializable { AskResponse r = new AskResponse(); r.success = true; if (data != null) { - r.data = JsonUtils.toBytes(data); + if (data instanceof String) { + r.data = ((String) data).getBytes(StandardCharsets.UTF_8); + } else { + r.data = JsonUtils.toBytes(data); + } } return r; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java index 3d0a61ae..2e91357f 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/JsonUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.common.utils; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.kfcfans.powerjob.common.PowerJobException; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -48,6 +49,10 @@ public class JsonUtils { return objectMapper.readValue(b, clz); } + public static T parseObject(byte[] b, TypeReference typeReference) throws Exception { + return objectMapper.readValue(b, typeReference); + } + public static T parseObjectUnsafe(String json, Class clz) { try { return objectMapper.readValue(json, clz); diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 6b72e369..a5e18322 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.3.0 + 3.3.1 jar 2.9.2 2.3.4.RELEASE - 3.3.0 + 3.3.1 8.0.19 19.7.0.0 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java index 434a2e9b..ddb10420 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/OhMyApplication.java @@ -21,7 +21,7 @@ public class OhMyApplication { "******************* PowerJob Tips *******************\n" + "如果应用无法启动,我们建议您仔细阅读以下文档来解决:\n" + "if server can't startup, we recommend that you read the documentation to find a solution:\n" + - "https://www.yuque.com/powerjob/guidence/xp5ygc#xMQC9\n" + + "https://www.yuque.com/powerjob/guidence/problem\n" + "******************* PowerJob Tips *******************\n\n"; public static void main(String[] args) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index fdd82196..685a2056 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -2,18 +2,18 @@ package com.github.kfcfans.powerjob.server.akka.actors; import akka.actor.AbstractActor; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest; -import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; -import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; -import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerRequest; +import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.request.*; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.instance.InstanceManager; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.core.env.Environment; +import java.util.List; import java.util.Optional; /** @@ -41,6 +42,7 @@ public class ServerActor extends AbstractActor { .match(TaskTrackerReportInstanceStatusReq.class, this::onReceiveTaskTrackerReportInstanceStatusReq) .match(WorkerLogReportReq.class, this::onReceiveWorkerLogReportReq) .match(WorkerNeedDeployContainerRequest.class, this::onReceiveWorkerNeedDeployContainerRequest) + .match(WorkerQueryExecutorClusterReq.class, this::onReceiveWorkerQueryExecutorClusterReq) .matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj)) .build(); } @@ -110,6 +112,33 @@ public class ServerActor extends AbstractActor { getSender().tell(askResponse, getSelf()); } + /** + * 处理 worker 请求获取当前任务所有处理器节点的请求 + * @param req jobId + appId + */ + private void onReceiveWorkerQueryExecutorClusterReq(WorkerQueryExecutorClusterReq req) { + + AskResponse askResponse; + + Long jobId = req.getJobId(); + Long appId = req.getAppId(); + + JobInfoRepository jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + if (jobInfoOpt.isPresent()) { + JobInfoDO jobInfo = jobInfoOpt.get(); + if (!jobInfo.getAppId().equals(appId)) { + askResponse = AskResponse.failed("Permission Denied!"); + }else { + List sortedAvailableWorker = WorkerManagerService.getSortedAvailableWorker(appId, jobInfo.getMinCpuCores(), jobInfo.getMinMemorySpace(), jobInfo.getMinDiskSpace()); + askResponse = AskResponse.succeed(sortedAvailableWorker); + } + }else { + askResponse = AskResponse.failed("can't find jobInfo by jobId: " + jobId); + } + getSender().tell(askResponse, getSelf()); + } + // 不需要加锁,从 Spring IOC 中重复取并没什么问题 private InstanceManager getInstanceManager() { if (instanceManager == null) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java index 6cf50dce..1de5f06e 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/ServerController.java @@ -2,20 +2,26 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.powerjob.server.service.ha.ClusterStatusHolder; import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; +import com.taobao.api.internal.cluster.ClusterManager; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; +import java.util.List; import java.util.Optional; import java.util.TimeZone; @@ -34,8 +40,10 @@ public class ServerController { private ServerSelectService serverSelectService; @Resource private AppInfoRepository appInfoRepository; + @Resource + private JobInfoRepository jobInfoRepository; - @GetMapping("assert") + @GetMapping("/assert") public ResultDTO assertAppName(String appName) { Optional appInfoOpt = appInfoRepository.findByAppName(appName); return appInfoOpt.map(appInfoDO -> ResultDTO.success(appInfoDO.getId())). diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 6d83511f..9b1e2b7a 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -11,7 +11,7 @@ spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 ####### oms.mongodb.enable=true -spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily +spring.data.mongodb.uri=mongodb+srv://zqq:No1Bug2Please3!@cluster0.wie54.gcp.mongodb.net/powerjob_daily?retryWrites=true&w=majority ####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) ####### spring.mail.host=smtp.163.com diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index d95e203f..e51219e0 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.3.0 + 3.3.1 jar - 3.3.0 + 3.3.1 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 86835a94..c34da9af 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.3.0 + 3.3.1 2.2.6.RELEASE - 3.3.0 + 3.3.1 1.2.68 diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java index 73bb3ec7..70dc3759 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BroadcastProcessor; +import com.github.kfcfans.powerjob.worker.log.OmsLogger; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -34,9 +35,16 @@ public class BroadcastProcessorDemo extends BroadcastProcessor { @Override public ProcessResult process(TaskContext taskContext) throws Exception { + OmsLogger logger = taskContext.getOmsLogger(); System.out.println("===== BroadcastProcessorDemo#process ======"); - taskContext.getOmsLogger().info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); - Thread.sleep(45 * 1000); + logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); + long sleepTime = 1000; + try { + sleepTime = Long.parseLong(taskContext.getJobParams()); + }catch (Exception e) { + logger.warn("[BroadcastProcessor] parse sleep time failed!", e); + } + Thread.sleep(Math.max(sleepTime, 1000)); return new ProcessResult(true); } diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index d5ba6e8e..6ddad2fa 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.3.0 + 3.3.1 jar - 3.3.0 + 3.3.1 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 44d95e5d..f00034e0 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.3.0 + 3.3.1 jar 5.2.4.RELEASE - 3.3.0 + 3.3.1 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java index 42114b90..efb465df 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java @@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.common.utils; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.common.RemoteConstant; @@ -45,13 +46,20 @@ public class AkkaUtils { */ public static boolean reliableTransmit(ActorSelection remote, Object msg) { try { - CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); - AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); - return response.isSuccess(); + return easyAsk(remote, msg).isSuccess(); }catch (Exception e) { log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString()); } return false; } + public static AskResponse easyAsk(ActorSelection remote, Object msg) { + try { + CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + return (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + }catch (Exception e) { + throw new PowerJobException(e); + } + } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatus.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatus.java index 1fae1bb3..c3cc6d1d 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatus.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatus.java @@ -33,7 +33,7 @@ public class ProcessorTrackerStatus { */ public void init(String address) { this.address = address; - this.lastActiveTime = System.currentTimeMillis(); + this.lastActiveTime = - 1; this.remainTaskNum = 0; this.dispatched = false; this.connected = false; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java index 461aca34..d9ba0ce6 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ha/ProcessorTrackerStatusHolder.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.ha; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; import java.util.List; import java.util.Map; @@ -13,6 +14,7 @@ import java.util.Map; * @author tjq * @since 2020/3/28 */ +@Slf4j public class ProcessorTrackerStatusHolder { // ProcessorTracker的address(IP:Port) -> 状态 @@ -36,8 +38,14 @@ public class ProcessorTrackerStatusHolder { * 根据 ProcessorTracker 的心跳更新状态 */ public void updateStatus(ProcessorTrackerStatusReportReq heartbeatReq) { - ProcessorTrackerStatus processorTrackerStatus = address2Status.get(heartbeatReq.getAddress()); - processorTrackerStatus.update(heartbeatReq); + // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况,0.001% 概率 + ProcessorTrackerStatus pts = address2Status.computeIfAbsent(heartbeatReq.getAddress(), ignore-> { + log.warn("[ProcessorTrackerStatusHolder] unregistered worker's heartbeat request: {}", heartbeatReq); + ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); + processorTrackerStatus.init(heartbeatReq.getAddress()); + return processorTrackerStatus; + }); + pts.update(heartbeatReq); } /** @@ -74,4 +82,24 @@ public class ProcessorTrackerStatusHolder { }); return result; } + + /** + * 注册新的执行节点 + * @param address 新的执行节点地址 + * @return true: 注册成功 / false:已存在 + */ + public boolean register(String address) { + ProcessorTrackerStatus pts = address2Status.get(address); + if (pts != null) { + return false; + } + pts = new ProcessorTrackerStatus(); + pts.init(address); + address2Status.put(address, pts); + return true; + } + + public void remove(List addressList) { + addressList.forEach(address2Status::remove); + } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java index 2780f86c..9b84bb5f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java @@ -49,7 +49,7 @@ public abstract class ScriptProcessor implements BasicProcessor { throw new RuntimeException("create script file failed"); } - // 如果是下载连接,则从网络获取 + // 如果是下载链接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); @@ -57,7 +57,7 @@ public abstract class ScriptProcessor implements BasicProcessor { } } - // 持久化到本地 + // 非下载链接,为 processInfo 生成可执行文件 try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index e3f6a08e..513b8722 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -58,7 +58,13 @@ public class CommonTaskTracker extends TaskTracker { // 启动定时任务(任务派发 & 状态检查) scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 0, 5, TimeUnit.SECONDS); - scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 10, 10, TimeUnit.SECONDS); + scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 13, 13, TimeUnit.SECONDS); + + // 如果是 MR 任务,则需要启动执行器动态检测装置 + ExecuteType executeType = ExecuteType.valueOf(req.getExecuteType()); + if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) { + scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); + } } @Override @@ -284,7 +290,10 @@ public class CommonTaskTracker extends TaskTracker { List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); - taskPersistenceService.updateLostTasks(disconnectedPTs); + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) { + ptStatusHolder.remove(disconnectedPTs); + log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); + } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 27d3af71..9fa9b34b 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; -import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -85,7 +84,7 @@ public class FrequentTaskTracker extends TaskTracker { // 1. 初始化定时调度线程池 String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d"; ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build(); - this.scheduledPool = Executors.newScheduledThreadPool(3, factory); + this.scheduledPool = Executors.newScheduledThreadPool(4, factory); // 2. 启动任务发射器 launcher = new Launcher(); @@ -103,6 +102,8 @@ public class FrequentTaskTracker extends TaskTracker { scheduledPool.scheduleWithFixedDelay(new Dispatcher(), 1, 2, TimeUnit.SECONDS); // 4. 启动状态检查器 scheduledPool.scheduleWithFixedDelay(new Checker(), 5000, Math.min(Math.max(timeParams, 5000), 15000), TimeUnit.MILLISECONDS); + // 5. 启动执行器动态检测装置 + scheduledPool.scheduleAtFixedRate(new WorkerDetector(), 1, 1, TimeUnit.MINUTES); } @Override @@ -227,6 +228,17 @@ public class FrequentTaskTracker extends TaskTracker { private void checkStatus() { Stopwatch stopwatch = Stopwatch.createStarted(); + + // worker 挂掉的任务直接置为失败 + List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); + if (!disconnectedPTs.isEmpty()) { + log.warn("[FQTaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, false)) { + ptStatusHolder.remove(disconnectedPTs); + log.warn("[FQTaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); + } + } + ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); long instanceTimeoutMS = instanceInfo.getInstanceTimeoutMS(); long nowTS = System.currentTimeMillis(); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index bb9bb88f..55c38634 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -1,6 +1,7 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task; import akka.actor.ActorSelection; +import com.fasterxml.jackson.core.type.TypeReference; import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.RemoteConstant; @@ -8,7 +9,10 @@ import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import com.github.kfcfans.powerjob.common.request.WorkerQueryExecutorClusterReq; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; @@ -63,10 +67,10 @@ public abstract class TaskTracker { // 是否结束 protected AtomicBoolean finished; // 上报时间缓存 - private Cache taskId2LastReportTime; + private final Cache taskId2LastReportTime; // 分段锁 - private SegmentLock segmentLock; + private final SegmentLock segmentLock; private static final int UPDATE_CONCURRENCY = 4; protected TaskTracker(ServerScheduleJobReq req) { @@ -471,6 +475,37 @@ public abstract class TaskTracker { } } + /** + * 执行器动态上线(for 秒级任务和 MR 任务) + * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 + */ + protected class WorkerDetector implements Runnable { + @Override + public void run() { + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + if (StringUtils.isEmpty(serverPath)) { + log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); + return; + } + WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId()); + AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req); + if (!response.isSuccess()) { + log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); + return; + } + try { + List workerList = JsonUtils.parseObject(response.getData(), new TypeReference>() {}); + workerList.forEach(address -> { + if (ptStatusHolder.register(address)) { + log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address); + } + }); + }catch (Exception e) { + log.warn("[TaskTracker-{}] detective failed!", instanceId, e); + } + } + } + /** * 存储任务实例产生的各个Task状态,用于分析任务实例执行情况 */ diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java index b967af3e..5f5de67a 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/persistence/TaskPersistenceService.java @@ -98,19 +98,26 @@ public class TaskPersistenceService { * 更新被派发到已经失联的 ProcessorTracker 的任务,重新执行 * update task_info * set address = 'N/A', status = 0 - * where address in () and status not in (5,6) + * where address in () and status not in (5,6) and instance_id = 277 */ - public boolean updateLostTasks(List addressList) { + public boolean updateLostTasks(Long instanceId, List addressList, boolean retry) { TaskDO updateEntity = new TaskDO(); - updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); - updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); updateEntity.setLastModifiedTime(System.currentTimeMillis()); + if (retry) { + updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); + updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + }else { + updateEntity.setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue()); + updateEntity.setResult("maybe worker down"); + } SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); String queryConditionFormat = "address in %s and status not in (%d, %d)"; String queryCondition = String.format(queryConditionFormat, CommonUtils.getInStringCondition(addressList), TaskStatus.WORKER_PROCESS_FAILED.getValue(), TaskStatus.WORKER_PROCESS_SUCCESS.getValue()); query.setQueryCondition(queryCondition); + log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); try { return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java index 8f0482fb..368947c1 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/PersistenceServiceTest.java @@ -73,7 +73,7 @@ public class PersistenceServiceTest { @Test public void testUpdateLostTasks() throws Exception { Thread.sleep(1000); - boolean success = taskPersistenceService.updateLostTasks(Lists.newArrayList(NetUtils.getLocalHost())); + boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true); System.out.println("updateLostTasks: " + success); }