From f9dd8d7713b657c610a45e1e2435e7f85bccb805 Mon Sep 17 00:00:00 2001 From: tjq Date: Wed, 6 Nov 2024 23:23:52 +0800 Subject: [PATCH 01/12] fix: PADDLING not work --- powerjob-worker/pom.xml | 2 +- .../tracker/task/heavy/HeavyTaskTracker.java | 27 ++++++++++--------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index ae74a553..038ff954 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-worker - 5.1.0-bugfix + 5.1.0-bugfix2-SNAPSHOT jar diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 443cbfd1..797a8475 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -478,6 +479,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { long currentDispatchNum = 0; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); + AtomicBoolean skipThisRound = new AtomicBoolean(false); // 4. 循环查询数据库,获取需要派发的任务 while (maxDispatchNum > currentDispatchNum) { @@ -490,8 +492,15 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 获取 ProcessorTracker 地址,如果 Task 中自带了 Address,则使用该 Address String ptAddress = task.getAddress(); if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) { - if (taskNeedByPassTaskTracker(availablePtIps)) { + if (taskNeedByPassTaskTracker()) { + int loopTime = 0; do { + loopTime++; + if (loopTime > 3) { + log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId); + skipThisRound.set(true); + return; + } ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size()); } while (workerRuntime.getWorkerAddress().equals(ptAddress)); } else { @@ -501,6 +510,10 @@ public abstract class HeavyTaskTracker extends TaskTracker { dispatchTask(task, ptAddress); }); + if (skipThisRound.get()) { + break; + } + // 数量不足 或 查询失败,则终止循环 if (needDispatchTasks.size() < dbQueryLimit) { break; @@ -510,18 +523,8 @@ public abstract class HeavyTaskTracker extends TaskTracker { log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); } - /** - * padding的生效条件: 在map || mapReduce 的情况下, 且是该appId的worker是 非单机运行时,才生效。 - * fix: 当该appId的worker是单机运行 且 padding时, 导致Dispatcher分发任务处于死循环中, 致使无法分发任务,状态一直为运行中, - * 且该线程不能通过停止任务的方式去停止,只能通过重启该work实例的方式释放该线程。 - */ - private boolean taskNeedByPassTaskTracker(List availablePtIps) { + private boolean taskNeedByPassTaskTracker() { if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) { - - if (availablePtIps.size() <= 1) { - return false; - } - return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior()); } return false; From e912e2c31d305e79189132369480efd280d30c10 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 9 Nov 2024 18:05:26 +0800 Subject: [PATCH 02/12] feat: NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS --- .../java/tech/powerjob/common/PowerJobDKey.java | 2 ++ .../service/impl/PwjbUserWebServiceImplImpl.java | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java index cf9819a6..97f859f6 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/PowerJobDKey.java @@ -64,4 +64,6 @@ public class PowerJobDKey { public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval"; + public static final String SERVER_TEST_ACCOUNT_USERNAME = "powerjob.server.test-accounts"; + } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java index 356ff200..20e752d7 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/PwjbUserWebServiceImplImpl.java @@ -1,14 +1,17 @@ package tech.powerjob.server.web.service.impl; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import lombok.SneakyThrows; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import tech.powerjob.common.PowerJobDKey; +import tech.powerjob.common.enums.ErrorCodes; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.enums.ErrorCodes; -import tech.powerjob.server.auth.common.PowerJobAuthException; import tech.powerjob.common.utils.DigestUtils; +import tech.powerjob.server.auth.common.PowerJobAuthException; +import tech.powerjob.server.common.SJ; import tech.powerjob.server.persistence.remote.model.PwjbUserInfoDO; import tech.powerjob.server.persistence.remote.repository.PwjbUserInfoRepository; import tech.powerjob.server.web.request.ChangePasswordRequest; @@ -88,7 +91,12 @@ public class PwjbUserWebServiceImplImpl implements PwjbUserWebService { } // 测试账号特殊处理 - if (NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS.contains(username)) { + Set testAccounts = Sets.newHashSet(NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS); + String testAccountsStr = System.getProperty(PowerJobDKey.SERVER_TEST_ACCOUNT_USERNAME); + if (StringUtils.isNotEmpty(testAccountsStr)) { + testAccounts.addAll(Lists.newArrayList(SJ.COMMA_SPLITTER.split(testAccountsStr))); + } + if (testAccounts.contains(username)) { throw new IllegalArgumentException("this account not allowed change the password"); } From f44bd43d133841b6393d5ec6467aa6a12a59bfe7 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 21 Nov 2024 22:11:44 +0800 Subject: [PATCH 03/12] fix: reduce Probabilistic non-execution #1033 --- .../processors/test/IdleBugTestProcessor.java | 46 +++++++++++++++++++ .../tracker/processor/ProcessorTracker.java | 37 ++++++++++++--- .../tracker/task/heavy/CommonTaskTracker.java | 2 + .../tracker/task/heavy/HeavyTaskTracker.java | 2 +- 4 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java new file mode 100644 index 00000000..abe9f9c2 --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/IdleBugTestProcessor.java @@ -0,0 +1,46 @@ +package tech.powerjob.samples.processors.test; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * 测试长时间执行的任务 idle 导致 reduce 不执行 + * + * @author tjq + * @since 2024/11/21 + */ +@Slf4j +@Component +public class IdleBugTestProcessor implements MapReduceProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + if (isRootTask()) { + map(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7"), "L1_TASK"); + return new ProcessResult(true, "MAP_SUCCESS"); + } + + Object subTask = context.getSubTask(); + log.info("[IdleBugTestProcessor] subTask:={}, start to process!", subTask); + + // 同步修改 idle 阈值 + CommonUtils.easySleep(ThreadLocalRandom.current().nextInt(40001, 60000)); + log.info("[IdleBugTestProcessor] subTask:={}, finished process", subTask); + return new ProcessResult(true, "SUCCESS_" + subTask); + } + + @Override + public ProcessResult reduce(TaskContext context, List taskResults) { + log.info("[IdleBugTestProcessor] [REDUCE] REDUCE!!!"); + return new ProcessResult(true, "SUCCESS"); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 3d7650cd..c9daebd6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -265,14 +265,21 @@ public class ProcessorTracker { } else { long idleTime = System.currentTimeMillis() - lastIdleTime; if (idleTime > MAX_IDLE_TIME) { - log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime); - // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受 - ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId); - statusReportReq.setAddress(workerRuntime.getWorkerAddress()); - TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime); - destroy(); - return; + boolean shouldDestroyWhenIdle = shouldDestroyWhenIdle(); + log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, shouldDestroyWhenIdle: {}", instanceId, idleTime, shouldDestroyWhenIdle); + + if (shouldDestroyWhenIdle) { + + log.warn("[ProcessorTracker-{}] it's time to tell TaskTracker and then destroy self.", instanceId); + + // 不可靠通知,如果该请求失败,则整个任务处理集群缺失一个 ProcessorTracker,影响可接受 + ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId); + statusReportReq.setAddress(workerRuntime.getWorkerAddress()); + TransportUtils.ptReportSelfStatus(statusReportReq, taskTrackerAddress, workerRuntime); + destroy(); + return; + } } } } @@ -300,6 +307,22 @@ public class ProcessorTracker { } + /** + * 空闲的时候是否需要自我销毁 + * @return true or false + */ + private boolean shouldDestroyWhenIdle() { + /* + https://github.com/PowerJob/PowerJob/issues/1033 + map 情况下,如果子任务执行较长,任务末期可能出现某一个节点的任务仍在执行,其他机器都已经无任务可执行,被 idle 逻辑关闭节点。如果不幸在‘生成reduce任务后并派发前’关闭了 TaskTracker 所在节点的 PT,那 reduce 任务就会直接失败 + 解决方案:同 TT 节点的 PT,本身不存在分布式不一致问题,因此不需要 idle 直接关闭 PT 的机制 + */ + if (taskTrackerAddress.equalsIgnoreCase(workerRuntime.getWorkerAddress())) { + return false; + } + return true; + } + /** * 计算线程池大小 diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 2ad7ac80..6fc8b1ea 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -232,6 +232,8 @@ public class CommonTaskTracker extends HeavyTaskTracker { } else { + log.info("[TaskTracker-{}] all subTask has done, start to create final task", instanceId); + // 不存在,代表前置任务刚刚执行完毕,需要创建 lastTask,最终任务必须在本机执行! TaskDO newLastTask = new TaskDO(); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 443cbfd1..fe03a5fc 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -298,7 +298,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { * @param heartbeatReq ProcessorTracker(任务的执行管理器)发来的心跳包,包含了其当前状态 */ public void receiveProcessorTrackerHeartbeat(ProcessorTrackerStatusReportReq heartbeatReq) { - log.debug("[TaskTracker-{}] receive heartbeat: {}", instanceId, heartbeatReq); + log.debug("[TaskTracker-{}] receive PT's heartbeat: {}", instanceId, heartbeatReq); ptStatusHolder.updateStatus(heartbeatReq); // 上报空闲,检查是否已经接收到全部该 ProcessorTracker 负责的任务 From 4fe2d7fdf143f51bd522ecba16f975b3b77fa97e Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 21 Nov 2024 22:53:57 +0800 Subject: [PATCH 04/12] feat: Add PowerjobClient api /queryInstance #1034 --- .../tech/powerjob/client/IPowerJobClient.java | 3 ++ .../tech/powerjob/client/PowerJobClient.java | 8 ++++ .../java/tech/powerjob/client/TypeStore.java | 2 + .../tech/powerjob/client/test/TestClient.java | 13 ++++++ .../tech/powerjob/client/test/TestUtils.java | 17 ++++++++ .../request/query/InstancePageQuery.java | 33 +++++++++++++++ .../common/request/query/PowerPageQuery.java | 41 ++++++++++++++++++ .../powerjob/common/response/PageResult.java | 42 +++++++++++++++++++ .../server/core/instance/InstanceService.java | 29 +++++++++---- .../server/persistence/QueryConvertUtils.java | 30 +++++++++++-- .../server/openapi/OpenAPIController.java | 4 +- 11 files changed, 209 insertions(+), 13 deletions(-) create mode 100644 powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java diff --git a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java index 0df519b1..e489297c 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/IPowerJobClient.java @@ -3,6 +3,7 @@ package tech.powerjob.client; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; +import tech.powerjob.common.request.query.InstancePageQuery; import tech.powerjob.common.request.query.JobInfoQuery; import tech.powerjob.common.response.*; @@ -50,6 +51,8 @@ public interface IPowerJobClient { ResultDTO fetchInstanceInfo(Long instanceId); + ResultDTO> queryInstanceInfo(InstancePageQuery instancePageQuery); + /* ************* Workflow API list ************* */ ResultDTO saveWorkflow(SaveWorkflowRequest request); diff --git a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java index f2e668ce..3c454516 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/PowerJobClient.java @@ -17,6 +17,7 @@ import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; +import tech.powerjob.common.request.query.InstancePageQuery; import tech.powerjob.common.request.query.JobInfoQuery; import tech.powerjob.common.response.*; import tech.powerjob.common.serialize.JsonUtils; @@ -335,6 +336,13 @@ public class PowerJobClient implements IPowerJobClient, Closeable { return JSON.parseObject(post, INSTANCE_RESULT_TYPE); } + @Override + public ResultDTO> queryInstanceInfo(InstancePageQuery instancePageQuery) { + instancePageQuery.setAppIdEq(appId); + String post = requestService.request(OpenAPIConstant.QUERY_INSTANCE, PowerRequestBody.newJsonRequestBody(instancePageQuery)); + return JSON.parseObject(post, PAGE_INSTANCE_RESULT_TYPE); + } + /* ************* Workflow API list ************* */ /** diff --git a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java index c3c23406..2c19460e 100644 --- a/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java +++ b/powerjob-client/src/main/java/tech/powerjob/client/TypeStore.java @@ -32,6 +32,8 @@ public class TypeStore { public static final TypeReference>> LIST_INSTANCE_RESULT_TYPE = new TypeReference>>(){}; + public static final TypeReference>> PAGE_INSTANCE_RESULT_TYPE = new TypeReference>>(){}; + public static final TypeReference> WF_RESULT_TYPE = new TypeReference>() {}; public static final TypeReference> WF_INSTANCE_RESULT_TYPE = new TypeReference>() {}; diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java index ac5658d6..9c1b30c0 100644 --- a/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java +++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestClient.java @@ -1,6 +1,7 @@ package tech.powerjob.client.test; import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -9,6 +10,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.request.http.SaveJobInfoRequest; +import tech.powerjob.common.request.query.InstancePageQuery; import tech.powerjob.common.response.InstanceInfoDTO; import tech.powerjob.common.response.JobInfoDTO; import tech.powerjob.common.response.ResultDTO; @@ -113,6 +115,17 @@ class TestClient extends ClientInitializer { Assertions.assertNotNull(res); } + @Test + void testQueryInstanceInfo() { + InstancePageQuery instancePageQuery = new InstancePageQuery(); + instancePageQuery.setJobIdEq(11L); + instancePageQuery.setSortBy("actualTriggerTime"); + instancePageQuery.setAsc(true); + instancePageQuery.setPageSize(3); + instancePageQuery.setStatusIn(Lists.newArrayList(1,2,5)); + TestUtils.output(powerJobClient.queryInstanceInfo(instancePageQuery)); + } + @Test void testStopInstance() { ResultDTO res = powerJobClient.stopInstance(702482902331424832L); diff --git a/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java b/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java new file mode 100644 index 00000000..e09f4071 --- /dev/null +++ b/powerjob-client/src/test/java/tech/powerjob/client/test/TestUtils.java @@ -0,0 +1,17 @@ +package tech.powerjob.client.test; + +import com.alibaba.fastjson.JSONObject; + +/** + * TestUtils + * + * @author tjq + * @since 2024/11/21 + */ +public class TestUtils { + + public static void output(Object v) { + String str = JSONObject.toJSONString(v); + System.out.println(str); + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java b/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java new file mode 100644 index 00000000..dff4c6f0 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/query/InstancePageQuery.java @@ -0,0 +1,33 @@ +package tech.powerjob.common.request.query; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; +import java.util.List; + +/** + * 任务实例分页查询 + * + * @author tjq + * @since 2024/11/21 + */ +@Getter +@Setter +public class InstancePageQuery extends PowerPageQuery { + + private Long instanceIdEq; + private Long instanceIdLt; + private Long instanceIdGt; + + private Long jobIdEq; + + private List statusIn; + + + private Date gmtCreateLt; + private Date gmtCreateGt; + + private Date gmtModifiedLt; + private Date gmtModifiedGt; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java b/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java new file mode 100644 index 00000000..cd65139b --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/query/PowerPageQuery.java @@ -0,0 +1,41 @@ +package tech.powerjob.common.request.query; + +import lombok.Getter; +import lombok.Setter; +import tech.powerjob.common.PowerQuery; + +import java.io.Serializable; + +/** + * 分页查询 + * + * @author tjq + * @since 2024/11/21 + */ +@Getter +@Setter +public class PowerPageQuery extends PowerQuery implements Serializable { + + + /* ****************** 分页参数 ****************** */ + /** + * 当前页码 + */ + protected Integer index = 0; + /** + * 页大小 + */ + protected Integer pageSize = 10; + + /* ****************** 排序参数 ****************** */ + + /** + * 排序参数,如 gmtCreate、instanceId + */ + protected String sortBy; + + /** + * asc是指定列按升序排列,desc则是指定列按降序排列 + */ + protected boolean asc = false; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java b/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java new file mode 100644 index 00000000..a90c5ee5 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/PageResult.java @@ -0,0 +1,42 @@ +package tech.powerjob.common.response; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.util.List; + +/** + * 分页对象 + * + * @author tjq + * @since 2020/4/12 + */ +@Data +@NoArgsConstructor +@Accessors(chain = true) +public class PageResult implements Serializable { + + /** + * 当前页数 + */ + private int index; + /** + * 页大小 + */ + private int pageSize; + /** + * 总页数 + */ + private int totalPages; + /** + * 总数据量 + */ + private long totalItems; + /** + * 数据 + */ + private List data; + +} diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 2f09a052..d9a8ed24 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -3,8 +3,10 @@ package tech.powerjob.server.core.instance; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; -import tech.powerjob.common.PowerQuery; import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.SystemInstanceResult; import tech.powerjob.common.enums.InstanceStatus; @@ -12,8 +14,10 @@ import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerQueryInstanceStatusReq; import tech.powerjob.common.request.ServerStopInstanceReq; +import tech.powerjob.common.request.query.InstancePageQuery; import tech.powerjob.common.response.AskResponse; import tech.powerjob.common.response.InstanceInfoDTO; +import tech.powerjob.common.response.PageResult; import tech.powerjob.remote.framework.base.URL; import tech.powerjob.server.common.constants.InstanceType; import tech.powerjob.server.common.module.WorkerInfo; @@ -27,8 +31,8 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.persistence.remote.repository.JobInfoRepository; import tech.powerjob.server.remote.server.redirector.DesignateServer; -import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.transporter.TransportService; +import tech.powerjob.server.remote.transporter.impl.ServerURLFactory; import tech.powerjob.server.remote.worker.WorkerClusterQueryService; import java.util.Date; @@ -228,12 +232,21 @@ public class InstanceService { } } - public List queryInstanceInfo(PowerQuery powerQuery) { - return instanceInfoRepository - .findAll(QueryConvertUtils.toSpecification(powerQuery)) - .stream() - .map(InstanceService::directConvert) - .collect(Collectors.toList()); + public PageResult queryInstanceInfo(InstancePageQuery instancePageQuery) { + Specification specification = QueryConvertUtils.toSpecification(instancePageQuery); + Pageable pageable = QueryConvertUtils.toPageable(instancePageQuery); + Page instanceInfoDOPage = instanceInfoRepository.findAll(specification, pageable); + + PageResult ret = new PageResult<>(); + List instanceInfoDTOList = instanceInfoDOPage.get().map(InstanceService::directConvert).collect(Collectors.toList()); + + ret.setData(instanceInfoDTOList) + .setIndex(instanceInfoDOPage.getNumber()) + .setPageSize(instanceInfoDOPage.getSize()) + .setTotalPages(instanceInfoDOPage.getTotalPages()) + .setTotalItems(instanceInfoDOPage.getTotalElements()); + + return ret; } /** diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java index 3f4fe41d..ce62c4f3 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/QueryConvertUtils.java @@ -1,14 +1,18 @@ package tech.powerjob.server.persistence; import com.alibaba.fastjson.JSONArray; -import tech.powerjob.common.exception.PowerJobException; -import tech.powerjob.common.PowerQuery; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; import org.springframework.data.jpa.domain.Specification; +import tech.powerjob.common.PowerQuery; +import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.request.query.PowerPageQuery; -import javax.persistence.criteria.*; +import javax.persistence.criteria.Predicate; import java.lang.reflect.Field; import java.util.List; @@ -86,6 +90,26 @@ public class QueryConvertUtils { }; } + public static Pageable toPageable(PowerPageQuery powerPageQuery) { + + Sort sorter = null; + String sortBy = powerPageQuery.getSortBy(); + if (StringUtils.isNoneEmpty(sortBy)) { + sorter = Sort.by(sortBy); + if (powerPageQuery.isAsc()) { + sorter.ascending(); + } else { + sorter.descending(); + } + } + + if (sorter == null) { + return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize()); + } + + return PageRequest.of(powerPageQuery.getIndex(), powerPageQuery.getPageSize(), sorter); + } + public static String convertLikeParams(Object o) { String s = (String) o; if (!s.startsWith("%")) { diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java index 40a9abf3..37e4b2bb 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/openapi/OpenAPIController.java @@ -7,13 +7,13 @@ import org.springframework.web.bind.annotation.*; import tech.powerjob.client.module.AppAuthRequest; import tech.powerjob.client.module.AppAuthResult; import tech.powerjob.common.OpenAPIConstant; -import tech.powerjob.common.PowerQuery; import tech.powerjob.common.enums.ErrorCodes; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.exception.PowerJobException; import tech.powerjob.common.request.http.SaveJobInfoRequest; import tech.powerjob.common.request.http.SaveWorkflowNodeRequest; import tech.powerjob.common.request.http.SaveWorkflowRequest; +import tech.powerjob.common.request.query.InstancePageQuery; import tech.powerjob.common.request.query.JobInfoQuery; import tech.powerjob.common.response.*; import tech.powerjob.server.core.instance.InstanceService; @@ -183,7 +183,7 @@ public class OpenAPIController { } @PostMapping(OpenAPIConstant.QUERY_INSTANCE) - public ResultDTO> queryInstance(@RequestBody PowerQuery powerQuery) { + public ResultDTO> queryInstance(@RequestBody InstancePageQuery powerQuery) { return ResultDTO.success(instanceService.queryInstanceInfo(powerQuery)); } From 4e84bc60d7b8cb9bc9fb058aa4e86b442ea78589 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 22 Nov 2024 21:05:23 +0800 Subject: [PATCH 05/12] feat: support method job direct return ProcessResult #798 --- .../worker/processor/impl/MethodBasicProcessor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java index b148591f..8912cf8f 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/MethodBasicProcessor.java @@ -24,6 +24,12 @@ class MethodBasicProcessor implements BasicProcessor { public ProcessResult process(TaskContext context) throws Exception { try { Object result = method.invoke(bean, context); + + // 支持直接返回 ProcessResult https://github.com/PowerJob/PowerJob/issues/798 + if (result instanceof ProcessResult) { + return (ProcessResult) result; + } + return new ProcessResult(true, JsonUtils.toJSONString(result)); } catch (InvocationTargetException ite) { ExceptionUtils.rethrow(ite.getTargetException()); From 57627305faa035374e4b82195023e2462eda37ee Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 22 Nov 2024 21:32:13 +0800 Subject: [PATCH 06/12] fix: Repeated execution after broadcast worker node down #1003 --- .../worker/core/tracker/task/heavy/CommonTaskTracker.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java index 6fc8b1ea..20a88a3b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java @@ -296,8 +296,10 @@ public class CommonTaskTracker extends HeavyTaskTracker { // 6.2 定期检查 -> 重新执行被派发到宕机ProcessorTracker上的任务 List disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers(); if (!disconnectedPTs.isEmpty()) { - log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs); - if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) { + // 广播任务节点丢失后若直接移除 IP 重试,后续会派发到其他节点,导致重复执行,因此此处不能重试 https://github.com/PowerJob/PowerJob/issues/1003 + boolean needRetry = !ExecuteType.BROADCAST.equals(executeType); + log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}, needRetry: {}.", instanceId, disconnectedPTs, needRetry); + if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, needRetry)) { ptStatusHolder.remove(disconnectedPTs); log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs); } From 508127426f9156e40e575065c18be8276740ff9e Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 22 Nov 2024 21:45:54 +0800 Subject: [PATCH 07/12] feat: appname and namespace duplicate check #1009 --- .../tech/powerjob/common/enums/ErrorCodes.java | 4 ++++ .../exception/PowerJobExceptionLauncher.java | 17 +++++++++++++++++ .../web/controller/AppInfoController.java | 7 +++++++ .../service/impl/NamespaceWebServiceImpl.java | 7 ++++++- 4 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java index db0043c6..dfd42ce5 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/ErrorCodes.java @@ -49,6 +49,10 @@ public enum ErrorCodes { * 系统内部异常 */ SYSTEM_UNKNOWN_ERROR("-500", "SYS_UNKNOWN_ERROR"), + /** + * 非法参数 + */ + ILLEGAL_ARGS_ERROR("-501", "ILLEGAL_ARGS_ERROR"), /** * OPENAPI 错误码号段 -10XX diff --git a/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java b/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java new file mode 100644 index 00000000..d86e13af --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/exception/PowerJobExceptionLauncher.java @@ -0,0 +1,17 @@ +package tech.powerjob.common.exception; + + +import tech.powerjob.common.enums.ErrorCodes; + +/** + * PowerJobExceptionLauncher + * + * @author tjq + * @since 2024/11/22 + */ +public class PowerJobExceptionLauncher { + + public PowerJobExceptionLauncher(ErrorCodes errorCode, String message) { + throw new PowerJobException(errorCode, message); + } +} diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java index 36d048ef..6b312ad4 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java @@ -17,6 +17,8 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; +import tech.powerjob.common.enums.ErrorCodes; +import tech.powerjob.common.exception.PowerJobExceptionLauncher; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; @@ -85,9 +87,14 @@ public class AppInfoController { Long id = req.getId(); if (id == null) { + + // 前置校验,防止部分没加唯一索引的 DB 重复创建记录导致异常 + appInfoRepository.findByAppName(req.getAppName()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("App[%s] already exists", req.getAppName()))); + appInfoDO = new AppInfoDO(); appInfoDO.setGmtCreate(new Date()); appInfoDO.setCreator(LoginUserHolder.getUserId()); + } else { appInfoDO = appInfoService.findById(id, false).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by id:" + id)); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java index e504bc81..342dbfd6 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java @@ -8,12 +8,14 @@ import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; +import tech.powerjob.common.enums.ErrorCodes; +import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.common.exception.PowerJobException; +import tech.powerjob.common.exception.PowerJobExceptionLauncher; import tech.powerjob.server.auth.LoginUserHolder; import tech.powerjob.server.auth.RoleScope; import tech.powerjob.server.auth.service.WebAuthService; import tech.powerjob.server.common.SJ; -import tech.powerjob.common.enums.SwitchableStatus; import tech.powerjob.server.persistence.QueryConvertUtils; import tech.powerjob.server.persistence.remote.model.AppInfoDO; import tech.powerjob.server.persistence.remote.model.NamespaceDO; @@ -57,6 +59,9 @@ public class NamespaceWebServiceImpl implements NamespaceWebService { boolean isCreate = id == null; if (isCreate) { + + namespaceRepository.findByCode(req.getCode()).ifPresent(x -> new PowerJobExceptionLauncher(ErrorCodes.ILLEGAL_ARGS_ERROR, String.format("namespace[%s] already exists", req.getCode()))); + namespaceDO = new NamespaceDO(); namespaceDO.setGmtCreate(new Date()); From 92ddc6af4d52ee1915544ce1d060cfaf5ffec494 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 22 Nov 2024 22:03:06 +0800 Subject: [PATCH 08/12] feat: support create app with namespace_code #976 --- .../powerjob/server/web/controller/AppInfoController.java | 5 +++++ .../powerjob/server/web/request/ModifyAppInfoRequest.java | 4 ++++ .../powerjob/server/web/service/NamespaceWebService.java | 2 ++ .../server/web/service/impl/NamespaceWebServiceImpl.java | 8 ++++++++ 4 files changed, 19 insertions(+) diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java index 6b312ad4..3206548c 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/AppInfoController.java @@ -82,6 +82,11 @@ public class AppInfoController { @ApiPermission(name = "App-Save", roleScope = RoleScope.APP, dynamicPermissionPlugin = ModifyOrCreateDynamicPermission.class, grandPermissionPlugin = SaveAppGrantPermissionPlugin.class) public ResultDTO saveAppInfo(@RequestBody ModifyAppInfoRequest req) { + // 根据 ns code 填充 namespaceId(自动化创建过程中,固定的 namespace-code 对用户更友好) + if (StringUtils.isNotEmpty(req.getNamespaceCode())) { + namespaceWebService.findByCode(req.getNamespaceCode()).ifPresent(x -> req.setNamespaceId(x.getId())); + } + req.valid(); AppInfoDO appInfoDO; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java index 4669cf62..7d86fbd0 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/request/ModifyAppInfoRequest.java @@ -17,7 +17,11 @@ public class ModifyAppInfoRequest { private Long id; private String appName; + /** + * namespace 唯一标识,任选其一传递即可 + */ private Long namespaceId; + private String namespaceCode; private String oldPassword; private String password; diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java index 07f9d218..8ff706aa 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/NamespaceWebService.java @@ -22,6 +22,8 @@ public interface NamespaceWebService { Optional findById(Long id); + Optional findByCode(String code); + Page list(QueryNamespaceRequest queryNamespaceRequest); List listAll(); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java index 342dbfd6..e376adf2 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/service/impl/NamespaceWebServiceImpl.java @@ -114,6 +114,14 @@ public class NamespaceWebServiceImpl implements NamespaceWebService { return namespaceRepository.findById(id); } + @Override + public Optional findByCode(String code) { + if (StringUtils.isEmpty(code)) { + return Optional.empty(); + } + return namespaceRepository.findByCode(code); + } + @Override public Page list(QueryNamespaceRequest queryNamespaceRequest) { String codeLike = queryNamespaceRequest.getCodeLike(); From fdd80f6cf97e5433f2c105572c654e90aef1b263 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 7 Dec 2024 17:32:20 +0800 Subject: [PATCH 09/12] feat: opt log --- powerjob-worker/pom.xml | 2 +- .../core/tracker/task/heavy/HeavyTaskTracker.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 038ff954..ae74a553 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-worker - 5.1.0-bugfix2-SNAPSHOT + 5.1.0-bugfix jar diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 7308ba24..314294c8 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; /** * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 @@ -477,6 +478,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { // 3. 避免大查询,分批派发任务 long currentDispatchNum = 0; + LongAdder realDispatchNum = new LongAdder(); long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; AtomicInteger index = new AtomicInteger(0); AtomicBoolean skipThisRound = new AtomicBoolean(false); @@ -496,7 +498,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { int loopTime = 0; do { loopTime++; - if (loopTime > 3) { + if (loopTime > 2) { log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId); skipThisRound.set(true); return; @@ -508,6 +510,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { } } dispatchTask(task, ptAddress); + realDispatchNum.increment(); }); if (skipThisRound.get()) { @@ -520,7 +523,10 @@ public abstract class HeavyTaskTracker extends TaskTracker { } } - log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop()); + long realDispatchNumL = realDispatchNum.longValue(); + if (realDispatchNumL > 0) { + log.info("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, realDispatchNum, stopwatch.stop()); + } } private boolean taskNeedByPassTaskTracker() { From aefa9290c9a12da15fa22cf5b231299f2b67f18d Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 7 Dec 2024 20:36:16 +0800 Subject: [PATCH 10/12] chore: github workflows --- .github/workflows/docker-image.yml | 40 ---------------- .github/workflows/docker_publish.yml | 69 ++++++++++++++++++++++++++++ .github/workflows/maven.yml | 38 --------------- .github/workflows/maven_build.yml | 28 +++++++++++ .github/workflows/maven_publish.yml | 22 +++++++++ 5 files changed, 119 insertions(+), 78 deletions(-) delete mode 100644 .github/workflows/docker-image.yml create mode 100644 .github/workflows/docker_publish.yml delete mode 100644 .github/workflows/maven.yml create mode 100644 .github/workflows/maven_build.yml create mode 100644 .github/workflows/maven_publish.yml diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml deleted file mode 100644 index 65635e0c..00000000 --- a/.github/workflows/docker-image.yml +++ /dev/null @@ -1,40 +0,0 @@ -name: Docker Image CI - -on: - push: - branches: [ master ] - -jobs: - - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Build the Docker image - run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar - - uses: docker/build-push-action@v1 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - repository: tjqq/powerjob-server - tag_with_ref: true - tags: latest - path: powerjob-server/docker/ - - uses: docker/build-push-action@v1 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - repository: tjqq/powerjob-agent - tag_with_ref: true - tags: latest - path: powerjob-worker-agent/ - - uses: docker/build-push-action@v1 - with: - username: ${{ secrets.DOCKER_USERNAME }} - password: ${{ secrets.DOCKER_PASSWORD }} - repository: tjqq/powerjob-worker-samples - tag_with_ref: true - tags: latest - path: powerjob-worker-samples/ \ No newline at end of file diff --git a/.github/workflows/docker_publish.yml b/.github/workflows/docker_publish.yml new file mode 100644 index 00000000..4b0d7f1e --- /dev/null +++ b/.github/workflows/docker_publish.yml @@ -0,0 +1,69 @@ +name: build_docker + +on: + push: + branches: [master] + tags: + - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10 + +jobs: + build_docker: + name: Build docker + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Build Maven Project + uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'temurin' + - name: Publish package + run: mvn clean package -Pdev -DskipTests -U -e && /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar && /bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar && /bin/cp -rf powerjob-worker-samples/target/*.jar powerjob-worker-samples/powerjob-worker-samples.jar + + # Login + - name: Login to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Build And Push [powerjob-server] + uses: docker/build-push-action@v6 + with: + context: powerjob-server/docker/ + push: true + platforms: linux/amd64,linux/arm64 + tags: | + tjqq/powerjob-server:latest + powerjob/powerjob-server:latest + tjqq/powerjob-server:${{ env.GITHUB_REF_NAME }} + powerjob/powerjob-server:${{ env.GITHUB_REF_NAME }} + + - name: Build And Push [powerjob-agent] + uses: docker/build-push-action@v6 + with: + context: powerjob-worker-agent/ + push: true + platforms: linux/amd64,linux/arm64 + tags: | + tjqq/powerjob-agent:latest + powerjob/powerjob-agent:latest + tjqq/powerjob-agent:${{ env.GITHUB_REF_NAME }} + powerjob/powerjob-agent:${{ env.GITHUB_REF_NAME }} + + - name: Build And Push [powerjob-worker-samples] + uses: docker/build-push-action@v6 + with: + context: powerjob-worker-samples/ + push: true + platforms: linux/amd64,linux/arm64 + tags: | + tjqq/powerjob-worker-samples:latest + powerjob/powerjob-worker-samples:latest + tjqq/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }} + powerjob/powerjob-worker-samples:${{ env.GITHUB_REF_NAME }} diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml deleted file mode 100644 index c2f99971..00000000 --- a/.github/workflows/maven.yml +++ /dev/null @@ -1,38 +0,0 @@ -# This workflow will build a Java project with Maven -# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven - -name: Java CI with Maven - -on: - push: - branches: [ master ] - pull_request: - branches: [ master ] - -jobs: - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Build with Maven - run: mvn -B clean package -Pdev -DskipTests --file pom.xml - - name: upload build result - run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar - - uses: actions/upload-artifact@v1 - with: - name: powerjob-server.jar - path: staging/powerjob-server.jar - - uses: actions/upload-artifact@v1 - with: - name: powerjob-client.jar - path: staging/powerjob-client.jar - - uses: actions/upload-artifact@v1 - with: - name: powerjob-agent.jar - path: staging/powerjob-agent.jar diff --git a/.github/workflows/maven_build.yml b/.github/workflows/maven_build.yml new file mode 100644 index 00000000..f0042b9a --- /dev/null +++ b/.github/workflows/maven_build.yml @@ -0,0 +1,28 @@ +# This workflow will build a Java project with Maven +# For more information see: https://docs.github.com/zh/actions/use-cases-and-examples/building-and-testing/building-and-testing-java-with-maven + +name: Java CI with Maven + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + build: + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'temurin' + - run: mvn -B clean package -Pdev -DskipTests --file pom.xml + - run: mkdir staging && cp powerjob-server/powerjob-server-starter/target/*.jar staging/powerjob-server.jar && cp powerjob-client/target/*.jar staging/powerjob-client.jar && cp powerjob-worker-agent/target/*.jar staging/powerjob-agent.jar && cp powerjob-worker-spring-boot-starter/target/*.jar staging/powerjob-worker-spring-boot-starter.jar + - uses: actions/upload-artifact@v4 + with: + name: Package + path: staging + diff --git a/.github/workflows/maven_publish.yml b/.github/workflows/maven_publish.yml new file mode 100644 index 00000000..f374635a --- /dev/null +++ b/.github/workflows/maven_publish.yml @@ -0,0 +1,22 @@ +name: Publish package to the Maven Central Repository +on: + release: + types: [created] +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Maven Central Repository + uses: actions/setup-java@v4 + with: + java-version: '8' + distribution: 'temurin' + server-id: ossrh + server-username: MAVEN_USERNAME + server-password: MAVEN_PASSWORD + - name: Publish package + run: mvn --batch-mode clean deploy -pl powerjob-worker,powerjob-client,powerjob-worker-spring-boot-starter,powerjob-official-processors,powerjob-worker-agent -DskipTests -Prelease -am + env: + MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} From c6277767649885be7f7eebf3bcc162ef3ca8ea24 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 7 Dec 2024 20:38:31 +0800 Subject: [PATCH 11/12] chore: upgrade project version to 5.1.1 --- pom.xml | 2 +- powerjob-client/pom.xml | 6 +++--- powerjob-common/pom.xml | 4 ++-- powerjob-official-processors/pom.xml | 6 +++--- powerjob-remote/pom.xml | 2 +- powerjob-remote/powerjob-remote-benchmark/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-framework/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-impl-akka/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-impl-http/pom.xml | 6 +++--- powerjob-server/pom.xml | 10 +++++----- powerjob-server/powerjob-server-auth/pom.xml | 2 +- powerjob-server/powerjob-server-common/pom.xml | 2 +- powerjob-server/powerjob-server-core/pom.xml | 2 +- powerjob-server/powerjob-server-extension/pom.xml | 2 +- powerjob-server/powerjob-server-migrate/pom.xml | 2 +- powerjob-server/powerjob-server-monitor/pom.xml | 2 +- powerjob-server/powerjob-server-persistence/pom.xml | 2 +- powerjob-server/powerjob-server-remote/pom.xml | 2 +- powerjob-server/powerjob-server-starter/pom.xml | 2 +- powerjob-worker-agent/pom.xml | 8 ++++---- powerjob-worker-samples/pom.xml | 10 +++++----- powerjob-worker-spring-boot-starter/pom.xml | 6 +++--- powerjob-worker/pom.xml | 12 ++++++------ 23 files changed, 54 insertions(+), 54 deletions(-) diff --git a/pom.xml b/pom.xml index 0f4cbdb3..1863f8a8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ tech.powerjob powerjob - 5.1.0-bugfix + 5.1.1 pom powerjob http://www.powerjob.tech diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 7ac9a906..ddd317bc 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -5,19 +5,19 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-client - 5.1.0-bugfix + 5.1.1 jar 5.9.1 1.2.13 1.2.83 - 5.1.0-bugfix + 5.1.1 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index 3c17297b..ae50dbdf 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-common - 5.1.0-bugfix + 5.1.1 jar diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index 254f0eaf..efa0739f 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-official-processors - 5.1.0-bugfix + 5.1.1 jar @@ -20,7 +20,7 @@ 5.9.1 1.2.13 - 5.1.0-bugfix + 5.1.1 2.2.224 8.0.28 5.3.31 diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml index 5064f6b1..44937c7b 100644 --- a/powerjob-remote/pom.xml +++ b/powerjob-remote/pom.xml @@ -5,7 +5,7 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 pom diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml index 917e91c9..dfdeb76b 100644 --- a/powerjob-remote/powerjob-remote-benchmark/pom.xml +++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml @@ -5,7 +5,7 @@ powerjob-remote tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 @@ -21,8 +21,8 @@ 1.2.13 2.7.18 - 5.1.0-bugfix - 5.1.0-bugfix + 5.1.1 + 5.1.1 3.9.0 4.2.9 diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml index 3e647ec0..5e883ed8 100644 --- a/powerjob-remote/powerjob-remote-framework/pom.xml +++ b/powerjob-remote/powerjob-remote-framework/pom.xml @@ -5,11 +5,11 @@ powerjob-remote tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 - 5.1.0-bugfix + 5.1.1 powerjob-remote-framework @@ -17,7 +17,7 @@ 8 UTF-8 - 5.1.0-bugfix + 5.1.1 0.10.2 diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml index 97335b2e..28d7d2ae 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml @@ -5,19 +5,19 @@ powerjob-remote tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-remote-impl-akka - 5.1.0-bugfix + 5.1.1 8 8 UTF-8 - 5.1.0-bugfix + 5.1.1 2.6.13 diff --git a/powerjob-remote/powerjob-remote-impl-http/pom.xml b/powerjob-remote/powerjob-remote-impl-http/pom.xml index e911aef8..572b2731 100644 --- a/powerjob-remote/powerjob-remote-impl-http/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-http/pom.xml @@ -5,12 +5,12 @@ powerjob-remote tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-remote-impl-http - 5.1.0-bugfix + 5.1.1 8 @@ -18,7 +18,7 @@ UTF-8 4.3.7 - 5.1.0-bugfix + 5.1.1 diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 22bf07ac..1071da86 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-server - 5.1.0-bugfix + 5.1.1 pom @@ -51,9 +51,9 @@ 3.0.10 9.2.1 - 5.1.0-bugfix - 5.1.0-bugfix - 5.1.0-bugfix + 5.1.1 + 5.1.1 + 5.1.1 1.6.14 3.17.1 1.12.665 diff --git a/powerjob-server/powerjob-server-auth/pom.xml b/powerjob-server/powerjob-server-auth/pom.xml index 5e232bec..17ce8bbe 100644 --- a/powerjob-server/powerjob-server-auth/pom.xml +++ b/powerjob-server/powerjob-server-auth/pom.xml @@ -6,7 +6,7 @@ tech.powerjob powerjob-server - 5.1.0-bugfix + 5.1.1 4.0.0 diff --git a/powerjob-server/powerjob-server-common/pom.xml b/powerjob-server/powerjob-server-common/pom.xml index 13738427..6cab2dae 100644 --- a/powerjob-server/powerjob-server-common/pom.xml +++ b/powerjob-server/powerjob-server-common/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-core/pom.xml b/powerjob-server/powerjob-server-core/pom.xml index c6b2278e..5b7d7cc9 100644 --- a/powerjob-server/powerjob-server-core/pom.xml +++ b/powerjob-server/powerjob-server-core/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml index 776d9f82..7a0fa142 100644 --- a/powerjob-server/powerjob-server-extension/pom.xml +++ b/powerjob-server/powerjob-server-extension/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml index d7b8cfbe..c539fce3 100644 --- a/powerjob-server/powerjob-server-migrate/pom.xml +++ b/powerjob-server/powerjob-server-migrate/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml index f93b02fd..720974d6 100644 --- a/powerjob-server/powerjob-server-monitor/pom.xml +++ b/powerjob-server/powerjob-server-monitor/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index aeaf95a9..3d8eec79 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml index 82f294cc..cf8d4b52 100644 --- a/powerjob-server/powerjob-server-remote/pom.xml +++ b/powerjob-server/powerjob-server-remote/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml index 2e5ff247..b3d7927d 100644 --- a/powerjob-server/powerjob-server-starter/pom.xml +++ b/powerjob-server/powerjob-server-starter/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 5.1.0-bugfix + 5.1.1 ../pom.xml 4.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index aef2a46a..e2a5b3e6 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -5,24 +5,24 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-worker-agent - 5.1.0-bugfix + 5.1.1 jar - 5.1.0-bugfix + 5.1.1 1.2.13 4.3.2 5.3.31 2.3.4.RELEASE - 5.1.0-bugfix + 5.1.1 8.0.28 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 24aa9f16..79f24de8 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -5,22 +5,22 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-worker-samples - 5.1.0-bugfix + 5.1.1 2.7.18 - 5.1.0-bugfix + 5.1.1 1.2.83 - 5.1.0-bugfix + 5.1.1 true - 5.1.0-bugfix + 5.1.1 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 094454c5..e18be923 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -5,16 +5,16 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-worker-spring-boot-starter - 5.1.0-bugfix + 5.1.1 jar - 5.1.0-bugfix + 5.1.1 2.7.18 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index ae74a553..4938d56e 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 5.1.0-bugfix + 5.1.1 4.0.0 powerjob-worker - 5.1.0-bugfix + 5.1.1 jar @@ -21,10 +21,10 @@ 1.2.13 - 5.1.0-bugfix - 5.1.0-bugfix - 5.1.0-bugfix - 5.1.0-bugfix + 5.1.1 + 5.1.1 + 5.1.1 + 5.1.1 From dc62c1b992eb41b4c9d1c6e972d034e5b0b001ee Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 7 Dec 2024 20:42:45 +0800 Subject: [PATCH 12/12] refactor: optimize worker skip round log --- .../worker/core/tracker/task/heavy/HeavyTaskTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 314294c8..8977e2bf 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -499,7 +499,6 @@ public abstract class HeavyTaskTracker extends TaskTracker { do { loopTime++; if (loopTime > 2) { - log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId); skipThisRound.set(true); return; } @@ -514,6 +513,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { }); if (skipThisRound.get()) { + log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId); break; }