From 592dff8d75d1aa9ea79dd5800088a4bb31931071 Mon Sep 17 00:00:00 2001 From: songyinyin Date: Wed, 20 Sep 2023 17:16:51 +0800 Subject: [PATCH 01/17] chore: When the TaskTracker is successfully executed normally, the log level changes to Info #657 --- .../worker/core/tracker/task/light/LightTaskTracker.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 66da591b..791256d0 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -148,7 +148,12 @@ public class LightTaskTracker extends TaskTracker { } LightTaskTrackerManager.removeTaskTracker(instanceId); // 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间) - log.warn("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); + String msg = String.format("[TaskTracker-%s] remove TaskTracker,task status %s,start time:%s,end time:%s,real cost:%s,total time:%s", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); + if (TaskStatus.WORKER_PROCESS_SUCCESS.equals(status)) { + log.info(msg); + } else { + log.warn(msg); + } } @Override From 8f3803bda6a39eb5707a4a93e7ece1b889e82899 Mon Sep 17 00:00:00 2001 From: songyinyin Date: Wed, 20 Sep 2023 17:17:36 +0800 Subject: [PATCH 02/17] =?UTF-8?q?fix:=20=E5=91=A8=E6=9C=9F=E6=80=A7?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=87=BA=E7=8E=B0=E5=BC=82=E5=B8=B8=E6=97=B6?= =?UTF-8?q?=EF=BC=8C=E5=AF=BC=E8=87=B4=E4=BB=BB=E5=8A=A1=E5=81=9C=E6=AD=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../worker/background/OmsLogHandler.java | 4 +-- .../worker/background/RunnableAndCatch.java | 25 ++++++++++++++++ .../worker/background/RunnableWrapper.java | 30 +++++++++++++++++++ .../background/WorkerHealthReporter.java | 4 +-- .../tracker/processor/ProcessorTracker.java | 5 ++-- .../tracker/task/heavy/HeavyTaskTracker.java | 9 +++--- .../tracker/task/light/LightTaskTracker.java | 7 +++-- 7 files changed, 71 insertions(+), 13 deletions(-) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index a9ce0413..89c56afc 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -69,10 +69,10 @@ public class OmsLogHandler { - private class LogSubmitter implements Runnable { + private class LogSubmitter extends RunnableAndCatch { @Override - public void run() { + public void run0() { boolean lockResult = reportLock.tryLock(); if (!lockResult) { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java new file mode 100644 index 00000000..df45ebfe --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java @@ -0,0 +1,25 @@ +package tech.powerjob.worker.background; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * 使用 {@link ScheduledExecutorService} 执行任务时,推荐继承此类捕获并打印异常,避免因为抛出异常导致周期性任务终止 + * + * @author songyinyin + * @since 2023/9/20 15:52 + */ +@Slf4j +public abstract class RunnableAndCatch implements Runnable{ + @Override + public void run() { + try { + run0(); + } catch (Exception e) { + log.error("[RunnableAndCatch] run failed", e); + } + } + + protected abstract void run0(); +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java new file mode 100644 index 00000000..0622d382 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java @@ -0,0 +1,30 @@ +package tech.powerjob.worker.background; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * 使用 {@link ScheduledExecutorService} 执行任务时,推荐使用此对象包装一层,避免因为抛出异常导致周期性任务终止 + * + * @author songyinyin + * @since 2023/9/20 16:04 + */ +@Slf4j +public class RunnableWrapper implements Runnable { + + private final Runnable runnable; + + public RunnableWrapper(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Exception e) { + log.error("[RunnableWrapper] run failed", e); + } + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java index 645d74ed..295da2a3 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java @@ -22,12 +22,12 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; */ @Slf4j @RequiredArgsConstructor -public class WorkerHealthReporter implements Runnable { +public class WorkerHealthReporter extends RunnableAndCatch { private final WorkerRuntime workerRuntime; @Override - public void run() { + public void run0() { // 没有可用Server,无法上报 String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 7bc79f68..4accd113 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -9,6 +9,7 @@ import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; +import tech.powerjob.worker.background.RunnableAndCatch; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; @@ -237,11 +238,11 @@ public class ProcessorTracker { /** * 定时向 TaskTracker 汇报(携带任务执行信息的心跳) */ - private class CheckerAndReporter implements Runnable { + private class CheckerAndReporter extends RunnableAndCatch { @Override @SuppressWarnings({"squid:S1066","squid:S3776"}) - public void run() { + public void run0() { // 超时检查,如果超时则自动关闭 TaskTracker long interval = System.currentTimeMillis() - startTime; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 4f75e0ab..e8d9c7dd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -19,6 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; +import tech.powerjob.worker.background.RunnableAndCatch; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -445,13 +446,13 @@ public abstract class HeavyTaskTracker extends TaskTracker { /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ - protected class Dispatcher implements Runnable { + protected class Dispatcher extends RunnableAndCatch { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; @Override - public void run() { + public void run0() { if (finished.get()) { return; @@ -503,9 +504,9 @@ public abstract class HeavyTaskTracker extends TaskTracker { * 执行器动态上线(for 秒级任务和 MR 任务) * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 */ - protected class WorkerDetector implements Runnable { + protected class WorkerDetector extends RunnableAndCatch { @Override - public void run() { + public void run0() { boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker(); log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 791256d0..8f8a1819 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -9,6 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; +import tech.powerjob.worker.background.RunnableWrapper; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -93,14 +94,14 @@ public class LightTaskTracker extends TaskTracker { // 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段 long initDelay = RandomUtils.nextInt(5000, 10000); // 上报任务状态 - statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(this::checkAndReportStatus, initDelay, delay, TimeUnit.MILLISECONDS); + statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS); // 超时控制 if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { if (instanceInfo.getInstanceTimeoutMS() < 1000L) { - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); } else { // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(this::timeoutCheck, instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); } } else { timeoutCheckScheduledFuture = null; From 23d94ed46f040616d6f7c962944ea3d4a32ed79d Mon Sep 17 00:00:00 2001 From: yuhan Date: Wed, 3 Jan 2024 14:14:32 +0800 Subject: [PATCH 03/17] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BD=BF=E7=94=A8mysql?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E6=97=A5=E5=BF=97=E7=9A=84=E6=83=85=E5=86=B5?= =?UTF-8?q?=E4=B8=8B=E6=96=87=E4=BB=B6=E6=B5=81=E6=9C=AA=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E9=80=A0=E6=88=90=E7=9A=84=E6=96=87=E4=BB=B6=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=8F=A5=E6=9F=84=E4=B8=8D=E9=87=8A=E6=94=BE=E7=9A=84bug?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (cherry picked from commit 2a9444770d227ffe46d6c700a7e8570ef3e1bc17) --- .../persistence/storage/impl/MySqlSeriesDfsService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java index 9493f62c..4f30ba34 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java @@ -141,6 +141,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService { Map meta = Maps.newHashMap(); meta.put("_server_", NetUtils.getLocalHost()); meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath()); + BufferedInputStream bufferedInputStream = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())); Date date = new Date(System.currentTimeMillis()); @@ -153,7 +154,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService { pst.setString(4, JsonUtils.toJSONString(meta)); pst.setLong(5, storeRequest.getLocalFile().length()); pst.setInt(6, SwitchableStatus.ENABLE.getV()); - pst.setBlob(7, new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath()))); + pst.setBlob(7, bufferedInputStream); pst.setString(8, null); pst.setDate(9, date); pst.setDate(10, date); @@ -165,6 +166,8 @@ public class MySqlSeriesDfsService extends AbstractDFsService { } catch (Exception e) { log.error("[MySqlSeriesDfsService] store [{}] failed!", fileLocation); ExceptionUtils.rethrow(e); + }finally { + bufferedInputStream.close(); } } From d61d85abd44bbd8dc834dfe1ee2226fffcbc0743 Mon Sep 17 00:00:00 2001 From: liwh <155356751@qq.com> Date: Thu, 4 Jan 2024 14:04:32 +0800 Subject: [PATCH 04/17] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20email?= =?UTF-8?q?=E6=9C=AA=E5=A1=AB=E5=86=99=E5=AF=BC=E8=87=B4=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E6=8A=A5=E5=BC=82=E5=B8=B8,=20#808?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tech/powerjob/server/core/alarm/impl/MailAlarmService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/alarm/impl/MailAlarmService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/alarm/impl/MailAlarmService.java index dbbf1a0d..838d3291 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/alarm/impl/MailAlarmService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/alarm/impl/MailAlarmService.java @@ -44,7 +44,7 @@ public class MailAlarmService implements Alarmable { SimpleMailMessage sm = new SimpleMailMessage(); try { sm.setFrom(from); - sm.setTo(targetUserList.stream().map(AlarmTarget::getEmail).filter(Objects::nonNull).toArray(String[]::new)); + sm.setTo(targetUserList.stream().map(AlarmTarget::getEmail).filter(Objects::nonNull).filter(email -> !email.isEmpty()).toArray(String[]::new)); sm.setSubject(alarm.fetchTitle()); sm.setText(alarm.fetchContent()); From ff84d467132d042ddc94aaceb2d33c6be83ebd90 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 4 Feb 2024 22:17:53 +0800 Subject: [PATCH 05/17] perf: Discarding the results of the map task to improve performance --- .../powerjob/worker/core/tracker/task/TaskTracker.java | 5 +++++ .../worker/core/tracker/task/heavy/HeavyTaskTracker.java | 7 +++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java index 58a6bc3a..dd398485 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/TaskTracker.java @@ -2,6 +2,7 @@ package tech.powerjob.worker.core.tracker.task; import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; @@ -33,6 +34,7 @@ public abstract class TaskTracker { * 任务实例信息 */ protected final InstanceInfo instanceInfo; + protected final ExecuteType executeType; /** * 追加的工作流上下文数据 * @@ -75,6 +77,9 @@ public abstract class TaskTracker { instanceInfo.setLogConfig(req.getLogConfig()); instanceInfo.setInstanceTimeoutMS(req.getInstanceTimeoutMS()); + // 常用变量初始化 + executeType = ExecuteType.valueOf(req.getExecuteType()); + // 特殊处理超时时间 if (instanceInfo.getInstanceTimeoutMS() <= 0) { instanceInfo.setInstanceTimeoutMS(Integer.MAX_VALUE); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index 4f75e0ab..99f449e6 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -83,7 +83,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress()); this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); // 构建缓存 - taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).build(); + taskId2BriefInfo = CacheBuilder.newBuilder().maximumSize(1024).softValues().build(); // 构建分段锁 segmentLock = new SegmentLock(UPDATE_CONCURRENCY); @@ -226,7 +226,6 @@ public abstract class HeavyTaskTracker extends TaskTracker { 3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址) */ String taskName = taskOpt.get().getTaskName(); - ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) { updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS); } @@ -243,8 +242,8 @@ public abstract class HeavyTaskTracker extends TaskTracker { } } - // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...) - result = result == null ? "" : result; + // 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)(24.2.4 更新:大规模 MAP 任务追求极限性能,不持久化无用的子任务 result) + result = result == null || ExecuteType.MAP.equals(executeType) ? "" : result; boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result); if (!updateResult) { From 1ba74bf0af56dbb53039c4507f208b548865962a Mon Sep 17 00:00:00 2001 From: tjq Date: Mon, 5 Feb 2024 00:12:37 +0800 Subject: [PATCH 06/17] test: performance test for h2 --- .../worker/persistence/TaskDAOImpl.java | 18 ++- .../persistence/AbstractTaskDAOTest.java | 31 +++++ .../persistence/TaskDAOPerformanceTest.java | 111 ++++++++++++++++++ .../worker/persistence/TaskDAOTest.java | 20 +--- .../src/test/resources/logback-test.xml | 22 ++++ 5 files changed, 181 insertions(+), 21 deletions(-) create mode 100644 powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java create mode 100644 powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java index 96c4ab89..9863831d 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java @@ -17,11 +17,20 @@ import java.util.Map; * @author tjq * @since 2020/3/17 */ -@AllArgsConstructor public class TaskDAOImpl implements TaskDAO { - + + private final boolean useIndex; private final ConnectionFactory connectionFactory; + public TaskDAOImpl(ConnectionFactory connectionFactory) { + this(false, connectionFactory); + } + + public TaskDAOImpl(boolean useIndex, ConnectionFactory connectionFactory) { + this.useIndex = useIndex; + this.connectionFactory = connectionFactory; + } + @Override public void initTable() throws Exception { @@ -30,9 +39,14 @@ public class TaskDAOImpl implements TaskDAO { // bigint(20) 与 Java Long 取值范围完全一致 String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint, sub_instance_id bigint, task_name varchar(255), task_content blob, address varchar(255), status int, result text, failed_cnt int, created_time bigint, last_modified_time bigint, last_report_time bigint, constraint pkey unique (instance_id, task_id))"; + String createIndexSQL = "create INDEX idx_status ON task_info (status)"; + try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) { stat.execute(delTableSQL); stat.execute(createTableSQL); + if (useIndex) { + stat.execute(createIndexSQL); + } } } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java new file mode 100644 index 00000000..9e15ceb5 --- /dev/null +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/AbstractTaskDAOTest.java @@ -0,0 +1,31 @@ +package tech.powerjob.worker.persistence; + +import tech.powerjob.worker.common.constants.TaskStatus; + +import java.nio.charset.StandardCharsets; + +/** + * AbstractTaskDAOTest + * + * @author tjq + * @since 2024/2/4 + */ +public class AbstractTaskDAOTest { + + protected static TaskDO buildTaskDO(String taskId, Long instanceId, TaskStatus taskStatus) { + TaskDO taskDO = new TaskDO(); + taskDO.setTaskId(taskId); + taskDO.setInstanceId(instanceId); + taskDO.setSubInstanceId(instanceId); + taskDO.setTaskName("TEST_TASK"); + taskDO.setTaskContent("TEST_CONTENT".getBytes(StandardCharsets.UTF_8)); + taskDO.setAddress("127.0.0.1:10086"); + taskDO.setStatus(taskStatus.getValue()); + taskDO.setResult("SUCCESS"); + taskDO.setFailedCnt(0); + taskDO.setLastModifiedTime(System.currentTimeMillis()); + taskDO.setLastReportTime(System.currentTimeMillis()); + taskDO.setCreatedTime(System.currentTimeMillis()); + return taskDO; + } +} diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java new file mode 100644 index 00000000..39014fc7 --- /dev/null +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOPerformanceTest.java @@ -0,0 +1,111 @@ +package tech.powerjob.worker.persistence; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.util.StopWatch; +import tech.powerjob.worker.common.constants.StoreStrategy; +import tech.powerjob.worker.common.constants.TaskStatus; +import tech.powerjob.worker.core.processor.TaskResult; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 任务持久化层 - 性能测试 + * + * @author tjq + * @since 2024/2/4 + */ +@Slf4j(topic = "PERFORMANCE_TEST_LOGGER") +public class TaskDAOPerformanceTest extends AbstractTaskDAOTest { + + private static final int INSERT_NUM = 100000; + + private static final Long INSTANCE_ID = 10086L; + + @Test + void testInsert() throws Exception { + TaskDAO noIndexDao = initTaskDao(false); + TaskDAO indexDao = initTaskDao(true); + + for (int i = 0; i < 1; i++) { + testWriteThenRead(noIndexDao, INSERT_NUM, "no-idx-" + i); + testWriteThenRead(indexDao, INSERT_NUM, "uu-idx-" + i); + } + } + + @SneakyThrows + private void testWriteThenRead(TaskDAO taskDAO, int num, String taskName) { + + String logKey = "testWriteThenRead-" + taskName; + StopWatch stopWatch = new StopWatch(); + + + AtomicLong atomicLong = new AtomicLong(); + + ForkJoinPool pool = new ForkJoinPool(256); + + CountDownLatch latch = new CountDownLatch(num); + + stopWatch.start("Insert"); + for (int i = 0; i < num; i++) { + pool.execute(() -> { + long id = atomicLong.incrementAndGet(); + String taskId = String.format("%s.%d", taskName, id); + TaskDO taskDO = buildTaskDO(taskId, INSTANCE_ID, TaskStatus.of(ThreadLocalRandom.current().nextInt(1, 7))); + try { + long s = System.currentTimeMillis(); + taskDAO.save(taskDO); + long cost = System.currentTimeMillis() - s; + if (cost > 10) { + log.warn("[{}] id={} save cost too much: {}", logKey, id, cost); + } + } catch (Exception e) { + log.error("[{}] id={} save failed!", logKey, id, e); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + stopWatch.stop(); + + + stopWatch.start("READ-getAllTaskResult"); + // 测试读 + List allTaskResult = taskDAO.getAllTaskResult(INSTANCE_ID, INSTANCE_ID); + stopWatch.stop(); + + // 测试统计 + stopWatch.start("READ-countByStatus"); + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(INSTANCE_ID); + query.setSubInstanceId(INSTANCE_ID); + query.setQueryContent("status, count(*) as num"); + query.setOtherCondition("GROUP BY status"); + List> countByStatus = taskDAO.simpleQueryPlus(query); + stopWatch.stop(); + + String prettyPrint = stopWatch.prettyPrint(); + System.out.println(logKey + ": " + prettyPrint); + log.info("[{}] {}", logKey, prettyPrint); + + } + + @SneakyThrows + private TaskDAO initTaskDao(boolean useIndex) { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.initDatasource(StoreStrategy.DISK); + TaskDAO taskDAO = new TaskDAOImpl(useIndex, connectionFactory); + + taskDAO.initTable(); + return taskDAO; + } +} diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java index a0a4cd1f..7c02082f 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/persistence/TaskDAOTest.java @@ -24,7 +24,7 @@ import static org.junit.jupiter.api.Assertions.*; * @since 2022/10/23 */ @Slf4j -class TaskDAOTest { +class TaskDAOTest extends AbstractTaskDAOTest { private static TaskDAO taskDAO; @@ -94,22 +94,4 @@ class TaskDAOTest { assert allTaskResult.size() == 2; } - private static TaskDO buildTaskDO(String taskId, Long instanceId, TaskStatus taskStatus) { - TaskDO taskDO = new TaskDO(); - taskDO.setTaskId(taskId); - taskDO.setInstanceId(instanceId); - taskDO.setSubInstanceId(instanceId); - taskDO.setTaskName("TEST_TASK"); - taskDO.setTaskContent("TEST_CONTENT".getBytes(StandardCharsets.UTF_8)); - taskDO.setAddress("127.0.0.1:10086"); - taskDO.setStatus(taskStatus.getValue()); - taskDO.setResult("SUCCESS"); - taskDO.setFailedCnt(0); - taskDO.setLastModifiedTime(System.currentTimeMillis()); - taskDO.setLastReportTime(System.currentTimeMillis()); - taskDO.setCreatedTime(System.currentTimeMillis()); - return taskDO; - } - - } \ No newline at end of file diff --git a/powerjob-worker/src/test/resources/logback-test.xml b/powerjob-worker/src/test/resources/logback-test.xml index 97891583..733161e6 100644 --- a/powerjob-worker/src/test/resources/logback-test.xml +++ b/powerjob-worker/src/test/resources/logback-test.xml @@ -20,6 +20,28 @@ + + + ${user.home}/powerjob/worker/test/logs/performance_test.log + + %d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|%msg%n + UTF-8 + + + ${user.home}/powerjob/worker/test/logs/performance_test.log.%d{yyyy-MM-dd} + 7 + + + + 512 + 0 + true + + + + + + From 4be6a139dd553faa2477a2fc2f4e69f5628f26e8 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 10:42:47 +0800 Subject: [PATCH 07/17] fix: Cyclic Logging on Exception #769 --- .../server/core/scheduler/CoreScheduleTaskManager.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java index 4a46e497..a814e410 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java @@ -67,8 +67,11 @@ public class CoreScheduleTaskManager implements InitializingBean, DisposableBean log.info("start task : {}.", taskName); while (true) { try { - innerRunnable.run(); + + // 倒置顺序为 先 sleep 再执行,解决异常情况 while true 打日志的问题 https://github.com/PowerJob/PowerJob/issues/769 Thread.sleep(runningInterval); + + innerRunnable.run(); } catch (InterruptedException e) { log.warn("[{}] task has been interrupted!", taskName, e); break; From e63dc91643b11e85d49fd237fdf8e5f5b361b789 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 11:14:19 +0800 Subject: [PATCH 08/17] fix: @PowerJobHandler does not work in cglib proxy #770 --- .../samples/anno/ATestMethodAnnotation.java | 16 ++++++++++++++++ .../tester/SpringMethodProcessorService.java | 9 +++++++++ .../BuildInSpringMethodProcessorFactory.java | 7 +++++++ 3 files changed, 32 insertions(+) create mode 100644 powerjob-worker-samples/src/main/java/tech/powerjob/samples/anno/ATestMethodAnnotation.java diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/anno/ATestMethodAnnotation.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/anno/ATestMethodAnnotation.java new file mode 100644 index 00000000..1b1347f7 --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/anno/ATestMethodAnnotation.java @@ -0,0 +1,16 @@ +package tech.powerjob.samples.anno; + +import java.lang.annotation.*; + +/** + * 自定义方法注解 + * 自定义注解导致 @PowerJobHandler 失效 + * + * @author tjq + * @since 2024/2/8 + */ +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ATestMethodAnnotation { +} diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java index ce0777cd..8fd025f0 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/tester/SpringMethodProcessorService.java @@ -1,6 +1,7 @@ package tech.powerjob.samples.tester; import org.springframework.stereotype.Component; +import tech.powerjob.samples.anno.ATestMethodAnnotation; import tech.powerjob.worker.annotation.PowerJobHandler; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.log.OmsLogger; @@ -33,4 +34,12 @@ public class SpringMethodProcessorService { omsLogger.warn("testThrowException"); throw new IllegalArgumentException("test"); } + + @ATestMethodAnnotation + @PowerJobHandler(name = "testNormalReturnWithCustomAnno") + public String testNormalReturnWithCustomAnno(TaskContext context) { + OmsLogger omsLogger = context.getOmsLogger(); + omsLogger.warn("测试自定义注解"); + return "testNormalReturnWithCustomAnno"; + } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java index 00021bb6..342baf3d 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java @@ -3,6 +3,7 @@ package tech.powerjob.worker.processor.impl; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.ApplicationContext; +import org.springframework.core.annotation.AnnotationUtils; import tech.powerjob.worker.annotation.PowerJobHandler; import tech.powerjob.worker.extension.processor.ProcessorBean; import tech.powerjob.worker.extension.processor.ProcessorDefinition; @@ -51,6 +52,12 @@ public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringPr Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method : methods) { PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class); + + // CGLib代理对象拿不到该注解, 通过 AnnotationUtils.findAnnotation()可以获取到注解 by GitHub@zhangxiang0907 https://github.com/PowerJob/PowerJob/issues/770 + if (powerJob == null) { + powerJob = AnnotationUtils.findAnnotation(method, PowerJobHandler.class); + } + if (powerJob == null) { continue; } From f3dd56bf54a10624fe32c8ff56e372d248d30d10 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 12:26:22 +0800 Subject: [PATCH 09/17] chore: upgrade h2 and spring version --- powerjob-official-processors/pom.xml | 7 +++---- powerjob-remote/powerjob-remote-benchmark/pom.xml | 2 +- powerjob-server/pom.xml | 4 ++-- powerjob-worker-agent/pom.xml | 2 +- powerjob-worker-samples/pom.xml | 2 +- powerjob-worker-spring-boot-starter/pom.xml | 2 +- powerjob-worker/pom.xml | 4 ++-- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index 7a4eb979..74b846c6 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -21,10 +21,9 @@ 5.9.1 1.2.9 4.3.6 - 5.2.9.RELEASE - 2.2.220 + 2.2.224 8.0.28 - 5.3.23 + 5.3.31 1.2.83 @@ -87,7 +86,7 @@ org.springframework spring-jdbc - ${spring.jdbc.version} + ${spring.version} test diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml index c9b80b94..85121446 100644 --- a/powerjob-remote/powerjob-remote-benchmark/pom.xml +++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml @@ -20,7 +20,7 @@ 3.2.2 1.2.9 - 2.7.14 + 2.7.18 4.3.6 4.3.6 diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 81b49f4f..dea2964a 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -26,7 +26,7 @@ - 2.7.14 + 2.7.18 8.0.33 @@ -34,7 +34,7 @@ 7.4.1.jre8 11.5.0.0 42.6.0 - 2.2.220 + 2.2.224 4.10.2 2.11.2 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index b9dc6f4a..45f671bc 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -18,7 +18,7 @@ 4.3.6 1.2.9 4.3.2 - 5.3.23 + 5.3.31 2.3.4.RELEASE diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index c5784572..5160dbbc 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -13,7 +13,7 @@ 4.3.6 - 2.7.14 + 2.7.18 4.3.6 1.2.83 4.3.6 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index b70896a0..9903bde8 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -15,7 +15,7 @@ 4.3.6 - 2.7.14 + 2.7.18 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index d35c4aa0..fa12aa16 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -14,8 +14,8 @@ jar - 5.3.23 - 2.2.220 + 5.3.31 + 2.2.224 4.0.3 5.9.1 From debc2e0abba655bcd01e16f3a566b023d19aa9d0 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 13:14:59 +0800 Subject: [PATCH 10/17] fix: instanceInfo cannot display details on non-scheduled server nodes --- .../server/core/instance/InstanceService.java | 4 +++- .../server/web/controller/InstanceController.java | 11 ++++++++--- .../processors/MapReduceProcessorDemo.java | 15 ++++++++++----- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java index 53071a6d..29a0cfc2 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/instance/InstanceService.java @@ -260,10 +260,12 @@ public class InstanceService { /** * 获取任务实例的详细运行详细 * + * @param appId 用于远程 server 路由,勿删! * @param instanceId 任务实例ID * @return 详细运行状态 */ - public InstanceDetail getInstanceDetail(Long instanceId) { + @DesignateServer + public InstanceDetail getInstanceDetail(Long appId, Long instanceId) { InstanceInfoDO instanceInfoDO = fetchInstanceInfo(instanceId); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java index d2f729c2..008d05e7 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java @@ -6,7 +6,6 @@ import tech.powerjob.server.common.utils.OmsFileUtils; import tech.powerjob.server.persistence.PageResult; import tech.powerjob.server.persistence.StringPage; import tech.powerjob.server.persistence.remote.model.InstanceInfoDO; -import tech.powerjob.server.persistence.remote.repository.AppInfoRepository; import tech.powerjob.server.persistence.remote.repository.InstanceInfoRepository; import tech.powerjob.server.core.service.CacheService; import tech.powerjob.server.core.instance.InstanceLogService; @@ -69,8 +68,14 @@ public class InstanceController { } @GetMapping("/detail") - public ResultDTO getInstanceDetail(Long instanceId) { - return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId))); + public ResultDTO getInstanceDetail(Long appId, Long instanceId) { + + // 兼容老版本前端不存在 appId 的场景 + if (appId == null) { + appId = instanceService.getInstanceInfo(instanceId).getAppId(); + } + + return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(appId, instanceId))); } @GetMapping("/log") diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java index 2b77c370..2f69fd4b 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/MapReduceProcessorDemo.java @@ -2,6 +2,7 @@ package tech.powerjob.samples.processors; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import lombok.*; import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; @@ -9,10 +10,6 @@ import tech.powerjob.worker.core.processor.TaskResult; import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; import tech.powerjob.worker.log.OmsLogger; import com.google.common.collect.Lists; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -85,10 +82,18 @@ public class MapReduceProcessorDemo implements MapReduceProcessor { } @Getter + @Setter @ToString - @NoArgsConstructor @AllArgsConstructor public static class TestSubTask { + + /** + * 注意:代表子任务参数的类:一定要有无参构造方法!一定要有无参构造方法!一定要有无参构造方法! + * 最好把 GET / SET 方法也加上,减少序列化问题的概率 + */ + public TestSubTask() { + } + private String name; private int age; } From 6842fb6a7b143b10190dfa3872db9e3770354707 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 14:25:55 +0800 Subject: [PATCH 11/17] perf: add cost log for TaskPersistenceService --- .../persistence/TaskPersistenceService.java | 102 ++++++------------ .../worker/test/PersistenceServiceTest.java | 8 -- 2 files changed, 30 insertions(+), 80 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java index f490146c..42bdc8fe 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java @@ -13,10 +13,10 @@ import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.core.processor.TaskResult; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; /** * 任务持久化服务 @@ -36,6 +36,11 @@ public class TaskPersistenceService { private static final long RETRY_INTERVAL_MS = 100; + /** + * 慢查询定义:200ms + */ + private static final long SLOW_QUERY_RT_THRESHOLD = 200; + private TaskDAO taskDAO; public TaskPersistenceService(StoreStrategy strategy) { @@ -54,7 +59,7 @@ public class TaskPersistenceService { public boolean save(TaskDO task) { try { - return execute(() -> taskDAO.save(task)); + return execute(() -> taskDAO.save(task), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] taskId={} save cost {}ms", task.getInstanceId(), task.getTaskId(), cost)); }catch (Exception e) { log.error("[TaskPersistenceService] save task{} failed.", task, e); } @@ -66,7 +71,7 @@ public class TaskPersistenceService { return true; } try { - return execute(() -> taskDAO.batchSave(tasks)); + return execute(() -> taskDAO.batchSave(tasks), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] batchSave cost {}ms", tasks.get(0).getInstanceId(), cost)); }catch (Exception e) { log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e); } @@ -80,7 +85,7 @@ public class TaskPersistenceService { try { updateEntity.setLastModifiedTime(System.currentTimeMillis()); SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTask(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateTask failed.", e); } @@ -92,7 +97,7 @@ public class TaskPersistenceService { */ public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) { try { - return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result)); + return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateTaskStatus(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateTaskStatus failed.", e); } @@ -125,7 +130,7 @@ public class TaskPersistenceService { log.debug("[TaskPersistenceService] updateLostTasks-QUERY-SQL: {}", query.getQueryCondition()); try { - return execute(() -> taskDAO.simpleUpdate(query, updateEntity)); + return execute(() -> taskDAO.simpleUpdate(query, updateEntity), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] updateLostTasks cost {}ms", instanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] updateLostTasks failed.", e); } @@ -148,7 +153,7 @@ public class TaskPersistenceService { return Optional.empty(); } return Optional.of(taskDOS.get(0)); - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getLastTask cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] get last task for instance(id={}) failed.", instanceId, e); } @@ -161,7 +166,7 @@ public class TaskPersistenceService { SimpleTaskQuery query = new SimpleTaskQuery(); query.setInstanceId(instanceId); query.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTask cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getAllTask for instance(id={}) failed.", instanceId, e); } @@ -178,7 +183,7 @@ public class TaskPersistenceService { query.setAddress(address); query.setQueryCondition(condition); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query) , cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getAllUnFinishedTaskByAddress({}) cost {}ms", instanceId, address, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getAllTaskByAddress for instance(id={}) failed.", instanceId, e); } @@ -194,7 +199,7 @@ public class TaskPersistenceService { query.setInstanceId(instanceId); query.setStatus(status.getValue()); query.setLimit(limit); - return execute(() -> taskDAO.simpleQuery(query)); + return execute(() -> taskDAO.simpleQuery(query), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTaskByStatus({}) cost {}ms", instanceId, status, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskByStatus failed, params is instanceId={},status={}.", instanceId, status, e); } @@ -224,7 +229,7 @@ public class TaskPersistenceService { result.put(TaskStatus.of(status), num); }); return result; - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getTaskStatusStatistics cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e); } @@ -236,31 +241,13 @@ public class TaskPersistenceService { */ public List getAllTaskResult(Long instanceId, Long subInstanceId) { try { - return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId)); + return execute(() -> taskDAO.getAllTaskResult(instanceId, subInstanceId), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] getAllTaskResult cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTaskId2ResultMap for instance(id={}) failed.", instanceId, e); } return Lists.newLinkedList(); } - /** - * 查询任务状态(只查询 status,节约 I/O 资源 -> 测试表明,在(我高端的NVMe)SSD上都效果惊人...别说一般的HDD了...磁盘I/O果然是重要瓶颈...) - */ - public Optional getTaskStatus(Long instanceId, String taskId) { - - try { - SimpleTaskQuery query = genKeyQuery(instanceId, taskId); - query.setQueryContent("status"); - return execute(() -> { - List> rows = taskDAO.simpleQueryPlus(query); - return Optional.of(TaskStatus.of((int) rows.get(0).get("status"))); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e); - } - return Optional.empty(); - } - /** * 根据主键查询 Task */ @@ -273,7 +260,7 @@ public class TaskPersistenceService { return Optional.empty(); } return Optional.of(res.get(0)); - }); + }, cost -> log.warn("[TaskPersistenceService] [Slow] [{}] getTask(taskId={}) cost {}ms", instanceId, taskId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] getTask failed, instanceId={},taskId={}.", instanceId, taskId, e); } @@ -281,35 +268,11 @@ public class TaskPersistenceService { } - /** - * 批量更新 Task 状态 - */ - public boolean batchUpdateTaskStatus(Long instanceId, List taskIds, TaskStatus status, String result) { - try { - return execute(() -> { - - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setInstanceId(instanceId); - query.setQueryCondition(String.format(" task_id in %s ", CommonUtils.getInStringCondition(taskIds))); - - TaskDO updateEntity = new TaskDO(); - updateEntity.setStatus(status.getValue()); - updateEntity.setResult(result); - return taskDAO.simpleUpdate(query, updateEntity); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] updateTaskStatus failed, instanceId={},taskIds={},status={},result={}.", - instanceId, taskIds, status, result, e); - } - return false; - } - - public boolean deleteAllTasks(Long instanceId) { try { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); - return execute(() -> taskDAO.simpleDelete(condition)); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}] deleteAllTasks cost {}ms", instanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); } @@ -321,26 +284,13 @@ public class TaskPersistenceService { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); condition.setSubInstanceId(subInstanceId); - return execute(() -> taskDAO.simpleDelete(condition)); + return execute(() -> taskDAO.simpleDelete(condition), cost -> log.warn("[TaskPersistenceService] [Slow] [{}.{}] deleteAllSubInstanceTasks cost {}ms", instanceId, subInstanceId, cost)); }catch (Exception e) { log.error("[TaskPersistenceService] deleteAllTasks failed, instanceId={}.", instanceId, e); } return false; } - public List listAll() { - try { - return execute(() -> { - SimpleTaskQuery query = new SimpleTaskQuery(); - query.setQueryCondition("1 = 1"); - return taskDAO.simpleQuery(query); - }); - }catch (Exception e) { - log.error("[TaskPersistenceService] listAll failed.", e); - } - return Collections.emptyList(); - } - private static SimpleTaskQuery genKeyQuery(Long instanceId, String taskId) { SimpleTaskQuery condition = new SimpleTaskQuery(); condition.setInstanceId(instanceId); @@ -348,7 +298,15 @@ public class TaskPersistenceService { return condition; } - private static T execute(SupplierPlus executor) throws Exception { - return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); + private static T execute(SupplierPlus executor, Consumer slowQueryLogger) throws Exception { + long s = System.currentTimeMillis(); + try { + return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS); + } finally { + long cost = System.currentTimeMillis() - s; + if (cost > SLOW_QUERY_RT_THRESHOLD) { + slowQueryLogger.accept(cost); + } + } } } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java index 6c90b5ed..ba5e67a5 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java @@ -56,14 +56,6 @@ public class PersistenceServiceTest { Thread.sleep(60000); } - @AfterEach - public void listData() { - System.out.println("============= listData ============="); - List result = taskPersistenceService.listAll(); - System.out.println("size: " + result.size()); - result.forEach(System.out::println); - } - @Test public void testBatchSave(){ From 6de5e83a2fe61582300b2067ea860871174a5a52 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 15:12:55 +0800 Subject: [PATCH 12/17] chore: upgrade logback version to fix logback serialization vulnerability #80 --- powerjob-official-processors/pom.xml | 2 +- powerjob-remote/pom.xml | 2 +- powerjob-remote/powerjob-remote-benchmark/pom.xml | 2 +- powerjob-worker-agent/pom.xml | 2 +- powerjob-worker/pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index 74b846c6..c92a6903 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -19,7 +19,7 @@ 5.9.1 - 1.2.9 + 1.2.13 4.3.6 2.2.224 8.0.28 diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml index 511139c0..ede2ca6f 100644 --- a/powerjob-remote/pom.xml +++ b/powerjob-remote/pom.xml @@ -24,7 +24,7 @@ UTF-8 5.9.0 - 1.2.9 + 1.2.13 diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml index 85121446..d91e2507 100644 --- a/powerjob-remote/powerjob-remote-benchmark/pom.xml +++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml @@ -19,7 +19,7 @@ 3.10.1 3.2.2 - 1.2.9 + 1.2.13 2.7.18 4.3.6 4.3.6 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 45f671bc..dc1ff8f5 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -16,7 +16,7 @@ 4.3.6 - 1.2.9 + 1.2.13 4.3.2 5.3.31 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index fa12aa16..f92013cf 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -19,7 +19,7 @@ 4.0.3 5.9.1 - 1.2.9 + 1.2.13 4.3.6 4.3.6 From 61aecc6354e694861b35e0cc33f31d58d1af7a06 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 16:08:39 +0800 Subject: [PATCH 13/17] refactor: optimize NetUtils --- .../main/java/tech/powerjob/common/utils/NetUtils.java | 8 ++++++++ powerjob-common/src/test/java/NetUtilsTest.java | 6 +++--- .../processors/impl/VerificationProcessor.java | 4 ++-- .../server/persistence/storage/AbstractDFsService.java | 10 +++++++++- .../storage/impl/MySqlSeriesDfsService.java | 2 +- .../server/web/controller/ServerController.java | 2 +- .../java/tech/powerjob/server/test/RepositoryTest.java | 4 ++-- .../samples/processors/BroadcastProcessorDemo.java | 6 +++--- .../java/tech/powerjob/worker/test/CommonTest.java | 2 +- .../powerjob/worker/test/PersistenceServiceTest.java | 8 ++++---- .../test/java/tech/powerjob/worker/test/TestUtils.java | 2 +- 11 files changed, 35 insertions(+), 19 deletions(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java index 56fa7b3b..a17370d8 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java @@ -94,6 +94,14 @@ public class NetUtils { return LOCALHOST_VALUE; } + /** + * 隔离调用 scope,核心场景才能直接调用 getLocalHost,方便查看使用点 + * @return IP + */ + public static String getLocalHost4Test() { + return getLocalHost(); + } + /** * Find first valid IP from local network card * diff --git a/powerjob-common/src/test/java/NetUtilsTest.java b/powerjob-common/src/test/java/NetUtilsTest.java index 7d6c562a..98bfeda0 100644 --- a/powerjob-common/src/test/java/NetUtilsTest.java +++ b/powerjob-common/src/test/java/NetUtilsTest.java @@ -12,19 +12,19 @@ public class NetUtilsTest { @Test public void testOrigin() { - System.out.println(NetUtils.getLocalHost()); + System.out.println(NetUtils.getLocalHost4Test()); } @Test public void testPreferredNetworkInterface() { System.setProperty(PowerJobDKey.PREFERRED_NETWORK_INTERFACE, "en5"); - System.out.println(NetUtils.getLocalHost()); + System.out.println(NetUtils.getLocalHost4Test()); } @Test public void testIgnoredNetworkInterface() { System.setProperty(PowerJobDKey.IGNORED_NETWORK_INTERFACE_REGEX, "utun.|llw."); - System.out.println(NetUtils.getLocalHost()); + System.out.println(NetUtils.getLocalHost4Test()); } } diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java index 4588c22e..1a905616 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/VerificationProcessor.java @@ -123,14 +123,14 @@ public class VerificationProcessor extends CommonBasicProcessor implements MapRe @Override public ProcessResult preProcess(TaskContext context) throws Exception { - context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost()); + context.getOmsLogger().info("start to preProcess, current worker IP is {}.", NetUtils.getLocalHost4Test()); return new ProcessResult(true, "preProcess successfully!"); } @Override public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); - omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost()); + omsLogger.info("start to postProcess, current worker IP is {}.", NetUtils.getLocalHost4Test()); omsLogger.info("====== All Node's Process Result ======"); taskResults.forEach(r -> omsLogger.info("taskId:{},success:{},result:{}", r.getTaskId(), r.isSuccess(), r.getResult())); return new ProcessResult(true, "postProcess successfully!"); diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java index edb52a8c..abe0f477 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/AbstractDFsService.java @@ -6,6 +6,8 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.env.Environment; +import tech.powerjob.server.common.aware.ServerInfoAware; +import tech.powerjob.server.common.module.ServerInfo; import tech.powerjob.server.extension.dfs.DFsService; /** @@ -15,8 +17,9 @@ import tech.powerjob.server.extension.dfs.DFsService; * @since 2023/7/28 */ @Slf4j -public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, DisposableBean { +public abstract class AbstractDFsService implements DFsService, ApplicationContextAware, ServerInfoAware, DisposableBean { + protected ServerInfo serverInfo; protected ApplicationContext applicationContext; public AbstractDFsService() { @@ -38,4 +41,9 @@ public abstract class AbstractDFsService implements DFsService, ApplicationConte log.info("[DFsService] invoke [{}]'s setApplicationContext", this.getClass().getName()); init(applicationContext); } + + @Override + public void setServerInfo(ServerInfo serverInfo) { + this.serverInfo = serverInfo; + } } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java index 4f30ba34..fb79b3e7 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/storage/impl/MySqlSeriesDfsService.java @@ -139,7 +139,7 @@ public class MySqlSeriesDfsService extends AbstractDFsService { deleteByLocation(fileLocation); Map meta = Maps.newHashMap(); - meta.put("_server_", NetUtils.getLocalHost()); + meta.put("_server_", serverInfo.getIp()); meta.put("_local_file_path_", storeRequest.getLocalFile().getAbsolutePath()); BufferedInputStream bufferedInputStream = new BufferedInputStream(Files.newInputStream(storeRequest.getLocalFile().toPath())); diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index c3343941..60eaebd6 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -69,7 +69,7 @@ public class ServerController implements ServerInfoAware { @GetMapping("/hello") public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject(); - res.put("localHost", NetUtils.getLocalHost()); + res.put("localHost", serverInfo.getIp()); res.put("serverInfo", serverInfo); res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis())); res.put("serverTimeTs", System.currentTimeMillis()); diff --git a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/RepositoryTest.java b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/RepositoryTest.java index 0ea80988..0397ab92 100644 --- a/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/RepositoryTest.java +++ b/powerjob-server/powerjob-server-starter/src/test/java/tech/powerjob/server/test/RepositoryTest.java @@ -53,7 +53,7 @@ public class RepositoryTest { public void testBatchLock() { List locks = Lists.newArrayList(); for (int i = 0; i < 10; i++) { - OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost(), 10000L); + OmsLockDO lockDO = new OmsLockDO("lock" + i, NetUtils.getLocalHost4Test(), 10000L); locks.add(lockDO); } omsLockRepository.saveAll(locks); @@ -63,7 +63,7 @@ public class RepositoryTest { @Test public void testDeleteLock() { String lockName = "test-lock"; - OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost(), 10000L); + OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost4Test(), 10000L); omsLockRepository.save(lockDO); omsLockRepository.deleteByLockName(lockName); } diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java index 7515cb51..197a3430 100644 --- a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/BroadcastProcessorDemo.java @@ -24,7 +24,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor { @Override public ProcessResult preProcess(TaskContext context) { System.out.println("===== BroadcastProcessorDemo#preProcess ======"); - context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost()); + context.getOmsLogger().info("BroadcastProcessorDemo#preProcess, current host: {}", NetUtils.getLocalHost4Test()); if ("rootFailed".equals(context.getJobParams())) { return new ProcessResult(false, "console need failed"); } else { @@ -36,7 +36,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor { public ProcessResult process(TaskContext taskContext) throws Exception { OmsLogger logger = taskContext.getOmsLogger(); System.out.println("===== BroadcastProcessorDemo#process ======"); - logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost()); + logger.info("BroadcastProcessorDemo#process, current host: {}", NetUtils.getLocalHost4Test()); long sleepTime = 1000; try { sleepTime = Long.parseLong(taskContext.getJobParams()); @@ -50,7 +50,7 @@ public class BroadcastProcessorDemo implements BroadcastProcessor { @Override public ProcessResult postProcess(TaskContext context, List taskResults) { System.out.println("===== BroadcastProcessorDemo#postProcess ======"); - context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost(), taskResults); + context.getOmsLogger().info("BroadcastProcessorDemo#postProcess, current host: {}, taskResult: {}", NetUtils.getLocalHost4Test(), taskResults); return new ProcessResult(true, "success"); } } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java index 080bdf74..31ce58a0 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/CommonTest.java @@ -33,7 +33,7 @@ public class CommonTest { TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq(); - req.setTaskTrackerAddress(NetUtils.getLocalHost() + ":27777"); + req.setTaskTrackerAddress(NetUtils.getLocalHost4Test() + ":27777"); req.setInstanceInfo(instanceInfo); req.setTaskId("0"); diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java index ba5e67a5..52357f3e 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/PersistenceServiceTest.java @@ -40,7 +40,7 @@ public class PersistenceServiceTest { task.setFailedCnt(0); task.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); task.setTaskName("ROOT_TASK"); - task.setAddress(NetUtils.getLocalHost()); + task.setAddress(NetUtils.getLocalHost4Test()); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); task.setLastReportTime(System.currentTimeMillis()); @@ -70,7 +70,7 @@ public class PersistenceServiceTest { task.setFailedCnt(0); task.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); task.setTaskName("ROOT_TASK"); - task.setAddress(NetUtils.getLocalHost()); + task.setAddress(NetUtils.getLocalHost4Test()); task.setLastModifiedTime(System.currentTimeMillis()); task.setCreatedTime(System.currentTimeMillis()); task.setLastReportTime(System.currentTimeMillis()); @@ -93,14 +93,14 @@ public class PersistenceServiceTest { @Test public void testUpdateLostTasks() throws Exception { Thread.sleep(1000); - boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost()), true); + boolean success = taskPersistenceService.updateLostTasks(10086L, Lists.newArrayList(NetUtils.getLocalHost4Test()), true); System.out.println("updateLostTasks: " + success); } @Test public void testGetAllUnFinishedTaskByAddress() throws Exception { System.out.println("=============== testGetAllUnFinishedTaskByAddress ==============="); - List res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost()); + List res = taskPersistenceService.getAllUnFinishedTaskByAddress(10086L, NetUtils.getLocalHost4Test()); System.out.println(res); } diff --git a/powerjob-worker/src/test/java/tech/powerjob/worker/test/TestUtils.java b/powerjob-worker/src/test/java/tech/powerjob/worker/test/TestUtils.java index 8298f433..5d90a02e 100644 --- a/powerjob-worker/src/test/java/tech/powerjob/worker/test/TestUtils.java +++ b/powerjob-worker/src/test/java/tech/powerjob/worker/test/TestUtils.java @@ -21,7 +21,7 @@ public class TestUtils { req.setJobId(1L); req.setInstanceId(10086L); - req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT)); + req.setAllWorkerAddress(Lists.newArrayList(NetUtils.getLocalHost4Test() + ":" + RemoteConstant.DEFAULT_WORKER_PORT)); req.setJobParams("JobParams"); req.setInstanceParams("InstanceParams"); From b29e265e42d58d973fe4398018918d48091010cd Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 19:41:40 +0800 Subject: [PATCH 14/17] feat: Optimizing IP acquisition logic with PingPongSocketServer #762 --- .../tech/powerjob/common/utils/NetUtils.java | 46 +++++++++-- .../common/utils/net/PingPongServer.java | 14 ++++ .../utils/net/PingPongSocketServer.java | 57 ++++++++++++++ .../common/utils/net/PingPongUtils.java | 53 +++++++++++++ .../utils/net/PingPongSocketServerTest.java | 31 ++++++++ .../web/controller/ServerController.java | 13 +++- .../tech/powerjob/worker/PowerJobWorker.java | 9 ++- .../worker/common/utils/WorkerNetUtils.java | 76 +++++++++++++++++++ 8 files changed, 288 insertions(+), 11 deletions(-) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java create mode 100644 powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java index a17370d8..b6cc0e14 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/NetUtils.java @@ -77,6 +77,10 @@ public class NetUtils { * @return 本机 IP 地址 */ public static String getLocalHost() { + return getLocalHostWithNetworkInterfaceChecker(null); + } + + public static String getLocalHostWithNetworkInterfaceChecker(NetworkInterfaceChecker networkInterfaceChecker) { if (HOST_ADDRESS != null) { return HOST_ADDRESS; } @@ -87,7 +91,7 @@ public class NetUtils { return HOST_ADDRESS = addressFromJVM; } - InetAddress address = getLocalAddress(); + InetAddress address = getLocalAddress(networkInterfaceChecker); if (address != null) { return HOST_ADDRESS = address.getHostAddress(); } @@ -107,19 +111,19 @@ public class NetUtils { * * @return first valid local IP */ - public static InetAddress getLocalAddress() { + public static InetAddress getLocalAddress(NetworkInterfaceChecker networkInterfaceChecker) { if (LOCAL_ADDRESS != null) { return LOCAL_ADDRESS; } - InetAddress localAddress = getLocalAddress0(); + InetAddress localAddress = getLocalAddress0(networkInterfaceChecker); LOCAL_ADDRESS = localAddress; return localAddress; } - private static InetAddress getLocalAddress0() { + private static InetAddress getLocalAddress0(NetworkInterfaceChecker networkInterfaceChecker) { // @since 2.7.6, choose the {@link NetworkInterface} first try { - InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface()); + InetAddress addressOp = getFirstReachableInetAddress( findNetworkInterface(networkInterfaceChecker)); if (addressOp != null) { return addressOp; } @@ -169,7 +173,7 @@ public class NetUtils { * @return If no {@link NetworkInterface} is available , return null * @since 2.7.6 */ - public static NetworkInterface findNetworkInterface() { + public static NetworkInterface findNetworkInterface(NetworkInterfaceChecker networkInterfaceChecker) { List validNetworkInterfaces = emptyList(); try { @@ -184,7 +188,11 @@ public class NetUtils { // Try to find the preferred one for (NetworkInterface networkInterface : validNetworkInterfaces) { if (isPreferredNetworkInterface(networkInterface)) { - log.info("[Net] use preferred network interface: {}", networkInterface.getDisplayName()); + log.info("[Net] use preferred network interface: {}", networkInterface); + return networkInterface; + } + if (isPassedCheckNetworkInterface(networkInterface, networkInterfaceChecker)) { + log.info("[Net] use PassedCheckNetworkInterface: {}", networkInterface); return networkInterface; } } @@ -199,6 +207,25 @@ public class NetUtils { return first(validNetworkInterfaces); } + /** + * 通过用户方法判断是否为目标网卡 + * @param networkInterface networkInterface + * @param networkInterfaceChecker 判断方法 + * @return true or false + */ + static boolean isPassedCheckNetworkInterface(NetworkInterface networkInterface, NetworkInterfaceChecker networkInterfaceChecker) { + if (networkInterfaceChecker == null) { + return false; + } + log.info("[Net] try to choose NetworkInterface by NetworkInterfaceChecker, current NetworkInterface: {}", networkInterface); + try { + return networkInterfaceChecker.ok(networkInterface, getFirstReachableInetAddress(networkInterface)); + } catch (Exception e) { + log.warn("[Net] isPassedCheckerNetworkInterface failed, current networkInterface: {}", networkInterface, e); + } + return false; + } + private static Optional toValidAddress(InetAddress address) { if (address instanceof Inet6Address) { Inet6Address v6Address = (Inet6Address) address; @@ -352,4 +379,9 @@ public class NetUtils { } return false; } + + @FunctionalInterface + public interface NetworkInterfaceChecker { + boolean ok(NetworkInterface networkInterface, InetAddress inetAddress); + } } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java new file mode 100644 index 00000000..53e4f0c3 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongServer.java @@ -0,0 +1,14 @@ +package tech.powerjob.common.utils.net; + +import java.io.Closeable; + +/** + * socket 服务器,用于进行连通性测试 + * + * @author tjq + * @since 2024/2/8 + */ +public interface PingPongServer extends Closeable { + + void initialize(int port) throws Exception; +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java new file mode 100644 index 00000000..21090bfa --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java @@ -0,0 +1,57 @@ +package tech.powerjob.common.utils.net; + +import lombok.extern.slf4j.Slf4j; +import tech.powerjob.common.utils.CommonUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; + +/** + * 简易服务器 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class PingPongSocketServer implements PingPongServer { + + private Thread thread; + + private ServerSocket serverSocket; + + private transient boolean terminated = false; + + @Override + public void initialize(int port) throws Exception{ + serverSocket = new ServerSocket(port); + + thread = new Thread(() -> { + while (true) { + if (terminated) { + return; + } + // 接收连接,如果没有连接,accept() 方法会阻塞 + try (Socket socket = serverSocket.accept();OutputStream outputStream = socket.getOutputStream();) { + outputStream.write(PingPongUtils.PONG.getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + } catch (Exception e) { + if (!terminated) { + log.warn("[PingPongSocketServer] process accepted socket failed!", e); + } + } + } + }, "PingPongSocketServer-Thread"); + + thread.start(); + } + + @Override + public void close() throws IOException { + terminated = true; + CommonUtils.executeIgnoreException(() -> serverSocket.close()); + thread.interrupt(); + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java new file mode 100644 index 00000000..ee547254 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongUtils.java @@ -0,0 +1,53 @@ +package tech.powerjob.common.utils.net; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.exception.ExceptionUtils; + +import java.io.*; +import java.net.Socket; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; + +/** + * socket 连通性助手 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class PingPongUtils { + + static final String PING = "ping"; + static final String PONG = "pong"; + + /** + * 验证目标 IP 和 端口的连通性 + * @param targetIp 目标 IP + * @param targetPort 目标端口 + * @return true or false + */ + public static boolean checkConnectivity(String targetIp, int targetPort) { + + try (Socket s = new Socket(targetIp, targetPort);InputStream is = s.getInputStream();OutputStream os = s.getOutputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + + // 发送 PING 请求 + os.write(PING.getBytes(StandardCharsets.UTF_8)); + os.flush(); + + //读取服务器返回的消息 + String content = br.readLine(); + + if (PONG.equalsIgnoreCase(content)) { + return true; + } + } catch (UnknownHostException e) { + log.warn("[SocketConnectivityUtils] unknown host: {}:{}", targetIp, targetPort); + } catch (IOException e) { + log.warn("[SocketConnectivityUtils] IOException: {}:{}, msg: {}", targetIp, targetPort, ExceptionUtils.getMessage(e)); + } catch (Exception e) { + log.error("[SocketConnectivityUtils] unknown exception for check ip: {}:{}", targetIp, targetPort, e); + } + + return false; + } +} diff --git a/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java b/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java new file mode 100644 index 00000000..0d559111 --- /dev/null +++ b/powerjob-common/src/test/java/tech/powerjob/common/utils/net/PingPongSocketServerTest.java @@ -0,0 +1,31 @@ +package tech.powerjob.common.utils.net; + +import org.junit.jupiter.api.Test; +import tech.powerjob.common.utils.NetUtils; + +/** + * desc + * + * @author tjq + * @since 2024/2/8 + */ +class PingPongSocketServerTest { + + @Test + void test() throws Exception { + + int port = 8877; + + PingPongSocketServer pingPongSocketServer = new PingPongSocketServer(); + pingPongSocketServer.initialize(port); + + System.out.println("[PingPongSocketServerTest] finished initialize"); + + assert PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port); + + assert !PingPongUtils.checkConnectivity(NetUtils.getLocalHost(), port + 1); + + pingPongSocketServer.close(); + System.out.println("[PingPongSocketServerTest] finished close"); + } +} \ No newline at end of file diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java index 60eaebd6..c476e162 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/ServerController.java @@ -11,7 +11,7 @@ import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.request.ServerDiscoveryRequest; import tech.powerjob.common.response.ResultDTO; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.net.PingPongUtils; import tech.powerjob.server.common.aware.ServerInfoAware; import tech.powerjob.server.common.module.ServerInfo; import tech.powerjob.server.persistence.remote.model.AppInfoDO; @@ -66,6 +66,17 @@ public class ServerController implements ServerInfoAware { return ResultDTO.success(serverElectionService.elect(request)); } + @GetMapping("/checkConnectivity") + public ResultDTO checkConnectivity(String targetIp, Integer targetPort) { + try { + boolean ret = PingPongUtils.checkConnectivity(targetIp, targetPort); + return ResultDTO.success(ret); + } catch (Throwable t) { + return ResultDTO.failed(t); + } + } + + @GetMapping("/hello") public ResultDTO ping(@RequestParam(required = false) boolean debug) { JSONObject res = new JSONObject(); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java index eceac34c..404da4ca 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/PowerJobWorker.java @@ -6,7 +6,6 @@ import lombok.extern.slf4j.Slf4j; import tech.powerjob.common.PowerJobDKey; import tech.powerjob.common.model.WorkerAppInfo; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.common.utils.NetUtils; import tech.powerjob.common.utils.PropertyUtils; import tech.powerjob.remote.framework.base.Address; import tech.powerjob.remote.framework.base.ServerType; @@ -24,6 +23,7 @@ import tech.powerjob.worker.background.discovery.ServerDiscoveryService; import tech.powerjob.worker.common.PowerBannerPrinter; import tech.powerjob.worker.common.PowerJobWorkerConfig; import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.common.utils.WorkerNetUtils; import tech.powerjob.worker.core.executor.ExecutorManager; import tech.powerjob.worker.extension.processor.ProcessorFactory; import tech.powerjob.worker.persistence.TaskPersistenceService; @@ -74,13 +74,16 @@ public class PowerJobWorker { try { PowerBannerPrinter.print(); + + // 在发第一个请求之前,完成真正 IP 的解析 + int localBindPort = config.getPort(); + String localBindIp = WorkerNetUtils.parseLocalBindIp(localBindPort, config.getServerAddress()); + // 校验 appName WorkerAppInfo appInfo = serverDiscoveryService.assertApp(); workerRuntime.setAppInfo(appInfo); // 初始化网络数据,区别对待上报地址和本机绑定地址(对外统一使用上报地址) - String localBindIp = NetUtils.getLocalHost(); - int localBindPort = config.getPort(); String externalIp = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_ADDRESS, localBindIp); String externalPort = PropertyUtils.readProperty(PowerJobDKey.NT_EXTERNAL_PORT, String.valueOf(localBindPort)); log.info("[PowerJobWorker] [ADDRESS_INFO] localBindIp: {}, localBindPort: {}; externalIp: {}, externalPort: {}", localBindIp, localBindPort, externalIp, externalPort); diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java new file mode 100644 index 00000000..8d9d0a31 --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/WorkerNetUtils.java @@ -0,0 +1,76 @@ +package tech.powerjob.worker.common.utils; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.response.ResultDTO; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.common.utils.HttpUtils; +import tech.powerjob.common.utils.NetUtils; +import tech.powerjob.common.utils.net.PingPongServer; +import tech.powerjob.common.utils.net.PingPongSocketServer; + +import java.util.List; + +/** + * PowerJob Worker 专用的网络工具类 + * + * @author tjq + * @since 2024/2/8 + */ +@Slf4j +public class WorkerNetUtils { + + private static final String SERVER_CONNECTIVITY_CHECK_URL_PATTERN = "http://%s/server/checkConnectivity?targetIp=%s&targetPort=%d"; + + /** + * 多网卡情况下,解析可与 server 通讯的本地 IP 地址 + * @param port 目标端口 + * @param serverAddress server 服务地址 + * @return 本机IP + */ + public static String parseLocalBindIp(int port, List serverAddress) { + PingPongServer pingPongServer = null; + + try { + pingPongServer = new PingPongSocketServer(); + pingPongServer.initialize(port); + log.info("[WorkerNetUtils] initialize PingPongSocketServer successfully~"); + } catch (Exception e) { + log.warn("[WorkerNetUtils] PingPongSocketServer failed to start, which may result in an incorrectly bound IP, please pay attention to the initialize log.", e); + } + + String localHostWithNetworkInterfaceChecker = NetUtils.getLocalHostWithNetworkInterfaceChecker(((networkInterface, inetAddress) -> { + + if (inetAddress == null) { + return false; + } + + String workerIp = inetAddress.getHostAddress(); + for (String address : serverAddress) { + String url = String.format(SERVER_CONNECTIVITY_CHECK_URL_PATTERN, address, workerIp, port); + try { + String resp = HttpUtils.get(url); + log.info("[WorkerNetUtils] check connectivity by url[{}], response: {}", url, resp); + if (StringUtils.isNotEmpty(resp)) { + ResultDTO resultDTO = JsonUtils.parseObject(resp, ResultDTO.class); + return Boolean.TRUE.toString().equalsIgnoreCase(String.valueOf(resultDTO.getData())); + } + } catch (Exception ignore) { + } + } + return false; + })); + + if (pingPongServer != null) { + try { + pingPongServer.close(); + log.info("[WorkerNetUtils] close PingPongSocketServer successfully~"); + } catch (Exception e) { + log.warn("[WorkerNetUtils] close PingPongSocketServer failed!", e); + } + } + + return localHostWithNetworkInterfaceChecker; + } + +} From 01d7247efae923c31e280bb01139b9b4576352c9 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 19:45:56 +0800 Subject: [PATCH 15/17] chore: Upgrade project version to 4.3.7 --- pom.xml | 2 +- powerjob-client/pom.xml | 6 +++--- powerjob-common/pom.xml | 4 ++-- .../common/utils/net/PingPongSocketServer.java | 2 +- powerjob-official-processors/pom.xml | 6 +++--- powerjob-remote/pom.xml | 2 +- powerjob-remote/powerjob-remote-benchmark/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-framework/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-impl-akka/pom.xml | 6 +++--- powerjob-remote/powerjob-remote-impl-http/pom.xml | 6 +++--- powerjob-server/pom.xml | 10 +++++----- powerjob-server/powerjob-server-common/pom.xml | 2 +- powerjob-server/powerjob-server-core/pom.xml | 2 +- powerjob-server/powerjob-server-extension/pom.xml | 2 +- powerjob-server/powerjob-server-migrate/pom.xml | 2 +- powerjob-server/powerjob-server-monitor/pom.xml | 2 +- powerjob-server/powerjob-server-persistence/pom.xml | 2 +- powerjob-server/powerjob-server-remote/pom.xml | 2 +- powerjob-server/powerjob-server-starter/pom.xml | 2 +- powerjob-worker-agent/pom.xml | 8 ++++---- powerjob-worker-samples/pom.xml | 8 ++++---- powerjob-worker-spring-boot-starter/pom.xml | 6 +++--- powerjob-worker/pom.xml | 12 ++++++------ 23 files changed, 53 insertions(+), 53 deletions(-) diff --git a/pom.xml b/pom.xml index 790d7fc0..822a9cb8 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ tech.powerjob powerjob - 4.3.6 + 4.3.7 pom powerjob http://www.powerjob.tech diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 10b88c93..cf8ab18b 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -5,18 +5,18 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-client - 4.3.6 + 4.3.7 jar 5.9.1 1.2.83 - 4.3.6 + 4.3.7 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index d6640019..893a08d8 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-common - 4.3.6 + 4.3.7 jar diff --git a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java index 21090bfa..d2a837c8 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/utils/net/PingPongSocketServer.java @@ -22,7 +22,7 @@ public class PingPongSocketServer implements PingPongServer { private ServerSocket serverSocket; - private transient boolean terminated = false; + private volatile boolean terminated = false; @Override public void initialize(int port) throws Exception{ diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index c92a6903..5de9dfdb 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-official-processors - 4.3.6 + 4.3.7 jar @@ -20,7 +20,7 @@ 5.9.1 1.2.13 - 4.3.6 + 4.3.7 2.2.224 8.0.28 5.3.31 diff --git a/powerjob-remote/pom.xml b/powerjob-remote/pom.xml index ede2ca6f..3b339c5d 100644 --- a/powerjob-remote/pom.xml +++ b/powerjob-remote/pom.xml @@ -5,7 +5,7 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 pom diff --git a/powerjob-remote/powerjob-remote-benchmark/pom.xml b/powerjob-remote/powerjob-remote-benchmark/pom.xml index d91e2507..fa282187 100644 --- a/powerjob-remote/powerjob-remote-benchmark/pom.xml +++ b/powerjob-remote/powerjob-remote-benchmark/pom.xml @@ -5,7 +5,7 @@ powerjob-remote tech.powerjob - 4.3.6 + 4.3.7 4.0.0 @@ -21,8 +21,8 @@ 1.2.13 2.7.18 - 4.3.6 - 4.3.6 + 4.3.7 + 4.3.7 3.9.0 4.2.9 diff --git a/powerjob-remote/powerjob-remote-framework/pom.xml b/powerjob-remote/powerjob-remote-framework/pom.xml index e63f358a..883f27ba 100644 --- a/powerjob-remote/powerjob-remote-framework/pom.xml +++ b/powerjob-remote/powerjob-remote-framework/pom.xml @@ -5,11 +5,11 @@ powerjob-remote tech.powerjob - 4.3.6 + 4.3.7 4.0.0 - 4.3.6 + 4.3.7 powerjob-remote-framework @@ -17,7 +17,7 @@ 8 UTF-8 - 4.3.6 + 4.3.7 0.10.2 diff --git a/powerjob-remote/powerjob-remote-impl-akka/pom.xml b/powerjob-remote/powerjob-remote-impl-akka/pom.xml index 1ee9b4f4..ccbd9732 100644 --- a/powerjob-remote/powerjob-remote-impl-akka/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-akka/pom.xml @@ -5,19 +5,19 @@ powerjob-remote tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-remote-impl-akka - 4.3.6 + 4.3.7 8 8 UTF-8 - 4.3.6 + 4.3.7 2.6.13 diff --git a/powerjob-remote/powerjob-remote-impl-http/pom.xml b/powerjob-remote/powerjob-remote-impl-http/pom.xml index 84577fad..8ecb4baa 100644 --- a/powerjob-remote/powerjob-remote-impl-http/pom.xml +++ b/powerjob-remote/powerjob-remote-impl-http/pom.xml @@ -5,12 +5,12 @@ powerjob-remote tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-remote-impl-http - 4.3.6 + 4.3.7 8 @@ -18,7 +18,7 @@ UTF-8 4.3.7 - 4.3.6 + 4.3.7 diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index dea2964a..5eaec38d 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-server - 4.3.6 + 4.3.7 pom @@ -50,9 +50,9 @@ 3.0.10 9.2.1 - 4.3.6 - 4.3.6 - 4.3.6 + 4.3.7 + 4.3.7 + 4.3.7 1.6.14 3.17.1 8.5.2 diff --git a/powerjob-server/powerjob-server-common/pom.xml b/powerjob-server/powerjob-server-common/pom.xml index 34a5284d..7391b562 100644 --- a/powerjob-server/powerjob-server-common/pom.xml +++ b/powerjob-server/powerjob-server-common/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-core/pom.xml b/powerjob-server/powerjob-server-core/pom.xml index 33808bc4..20c19bbd 100644 --- a/powerjob-server/powerjob-server-core/pom.xml +++ b/powerjob-server/powerjob-server-core/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml index 7998e6bd..20c39dc1 100644 --- a/powerjob-server/powerjob-server-extension/pom.xml +++ b/powerjob-server/powerjob-server-extension/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml index 669eda19..ecb09e50 100644 --- a/powerjob-server/powerjob-server-migrate/pom.xml +++ b/powerjob-server/powerjob-server-migrate/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-monitor/pom.xml b/powerjob-server/powerjob-server-monitor/pom.xml index 587a604a..4b0b4d11 100644 --- a/powerjob-server/powerjob-server-monitor/pom.xml +++ b/powerjob-server/powerjob-server-monitor/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index 59c5f63e..b8bdfa56 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml index fd9c141e..cf491b41 100644 --- a/powerjob-server/powerjob-server-remote/pom.xml +++ b/powerjob-server/powerjob-server-remote/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml index df747a3d..9ac1e3cf 100644 --- a/powerjob-server/powerjob-server-starter/pom.xml +++ b/powerjob-server/powerjob-server-starter/pom.xml @@ -5,7 +5,7 @@ powerjob-server tech.powerjob - 4.3.6 + 4.3.7 ../pom.xml 4.0.0 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index dc1ff8f5..4418cfcb 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -5,24 +5,24 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-worker-agent - 4.3.6 + 4.3.7 jar - 4.3.6 + 4.3.7 1.2.13 4.3.2 5.3.31 2.3.4.RELEASE - 4.3.6 + 4.3.7 8.0.28 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 5160dbbc..a8fe3dea 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -5,18 +5,18 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-worker-samples - 4.3.6 + 4.3.7 2.7.18 - 4.3.6 + 4.3.7 1.2.83 - 4.3.6 + 4.3.7 true diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 9903bde8..881f4205 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -5,16 +5,16 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-worker-spring-boot-starter - 4.3.6 + 4.3.7 jar - 4.3.6 + 4.3.7 2.7.18 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index f92013cf..fc8e2370 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -5,12 +5,12 @@ powerjob tech.powerjob - 4.3.6 + 4.3.7 4.0.0 powerjob-worker - 4.3.6 + 4.3.7 jar @@ -21,10 +21,10 @@ 1.2.13 - 4.3.6 - 4.3.6 - 4.3.6 - 4.3.6 + 4.3.7 + 4.3.7 + 4.3.7 + 4.3.7 From 599d710e279f6beded841243625b3df6352fc511 Mon Sep 17 00:00:00 2001 From: tjq Date: Thu, 8 Feb 2024 20:14:35 +0800 Subject: [PATCH 16/17] refactor: rename RunnableAndCatch to SafeRunnable --- .../java/tech/powerjob/common/enhance/SafeRunnable.java | 7 ++++--- .../tech/powerjob/common/enhance/SafeRunnableWrapper.java | 8 ++++---- .../tech/powerjob/worker/background/OmsLogHandler.java | 3 ++- .../powerjob/worker/background/WorkerHealthReporter.java | 3 ++- .../worker/core/tracker/processor/ProcessorTracker.java | 4 ++-- .../worker/core/tracker/task/heavy/HeavyTaskTracker.java | 6 +++--- .../worker/core/tracker/task/light/LightTaskTracker.java | 8 ++++---- 7 files changed, 21 insertions(+), 18 deletions(-) rename powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java => powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnable.java (67%) rename powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java => powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnableWrapper.java (70%) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java b/powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnable.java similarity index 67% rename from powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java rename to powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnable.java index df45ebfe..73075995 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableAndCatch.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnable.java @@ -1,23 +1,24 @@ -package tech.powerjob.worker.background; +package tech.powerjob.common.enhance; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ScheduledExecutorService; /** + * 安全的 runnable,可防止因抛出异常导致周期性任务终止 * 使用 {@link ScheduledExecutorService} 执行任务时,推荐继承此类捕获并打印异常,避免因为抛出异常导致周期性任务终止 * * @author songyinyin * @since 2023/9/20 15:52 */ @Slf4j -public abstract class RunnableAndCatch implements Runnable{ +public abstract class SafeRunnable implements Runnable{ @Override public void run() { try { run0(); } catch (Exception e) { - log.error("[RunnableAndCatch] run failed", e); + log.error("[SafeRunnable] run failed", e); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java b/powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnableWrapper.java similarity index 70% rename from powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java rename to powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnableWrapper.java index 0622d382..2c847507 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/RunnableWrapper.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enhance/SafeRunnableWrapper.java @@ -1,4 +1,4 @@ -package tech.powerjob.worker.background; +package tech.powerjob.common.enhance; import lombok.extern.slf4j.Slf4j; @@ -11,11 +11,11 @@ import java.util.concurrent.ScheduledExecutorService; * @since 2023/9/20 16:04 */ @Slf4j -public class RunnableWrapper implements Runnable { +public class SafeRunnableWrapper implements Runnable { private final Runnable runnable; - public RunnableWrapper(Runnable runnable) { + public SafeRunnableWrapper(Runnable runnable) { this.runnable = runnable; } @@ -24,7 +24,7 @@ public class RunnableWrapper implements Runnable { try { runnable.run(); } catch (Exception e) { - log.error("[RunnableWrapper] run failed", e); + log.error("[SafeRunnableWrapper] run failed", e); } } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index 89c56afc..07cf3bab 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -1,5 +1,6 @@ package tech.powerjob.worker.background; +import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.common.enums.LogLevel; import tech.powerjob.common.model.InstanceLogContent; import tech.powerjob.common.request.WorkerLogReportReq; @@ -69,7 +70,7 @@ public class OmsLogHandler { - private class LogSubmitter extends RunnableAndCatch { + private class LogSubmitter extends SafeRunnable { @Override public void run0() { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java index 295da2a3..2d947ab5 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/WorkerHealthReporter.java @@ -3,6 +3,7 @@ package tech.powerjob.worker.background; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.common.model.SystemMetrics; import tech.powerjob.common.request.WorkerHeartbeat; import tech.powerjob.worker.common.PowerJobWorkerVersion; @@ -22,7 +23,7 @@ import tech.powerjob.worker.core.tracker.manager.LightTaskTrackerManager; */ @Slf4j @RequiredArgsConstructor -public class WorkerHealthReporter extends RunnableAndCatch { +public class WorkerHealthReporter extends SafeRunnable { private final WorkerRuntime workerRuntime; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 4accd113..3d7650cd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -9,7 +9,7 @@ import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; -import tech.powerjob.worker.background.RunnableAndCatch; +import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; import tech.powerjob.worker.common.utils.TransportUtils; @@ -238,7 +238,7 @@ public class ProcessorTracker { /** * 定时向 TaskTracker 汇报(携带任务执行信息的心跳) */ - private class CheckerAndReporter extends RunnableAndCatch { + private class CheckerAndReporter extends SafeRunnable { @Override @SuppressWarnings({"squid:S1066","squid:S3776"}) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java index d5b521dd..3d5db645 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java @@ -19,7 +19,7 @@ import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CollectionUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.common.utils.SegmentLock; -import tech.powerjob.worker.background.RunnableAndCatch; +import tech.powerjob.common.enhance.SafeRunnable; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -445,7 +445,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { /** * 定时扫描数据库中的task(出于内存占用量考虑,每次最多获取100个),并将需要执行的任务派发出去 */ - protected class Dispatcher extends RunnableAndCatch { + protected class Dispatcher extends SafeRunnable { // 数据库查询限制,每次最多查询几个任务 private static final int DB_QUERY_LIMIT = 100; @@ -503,7 +503,7 @@ public abstract class HeavyTaskTracker extends TaskTracker { * 执行器动态上线(for 秒级任务和 MR 任务) * 原则:server 查询得到的 执行器状态不会干预 worker 自己维护的状态,即只做新增,不做任何修改 */ - protected class WorkerDetector extends RunnableAndCatch { + protected class WorkerDetector extends SafeRunnable { @Override public void run0() { diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index 8f8a1819..b56eac16 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -9,7 +9,7 @@ import tech.powerjob.common.enums.InstanceStatus; import tech.powerjob.common.model.InstanceDetail; import tech.powerjob.common.request.ServerScheduleJobReq; import tech.powerjob.common.request.TaskTrackerReportInstanceStatusReq; -import tech.powerjob.worker.background.RunnableWrapper; +import tech.powerjob.common.enhance.SafeRunnableWrapper; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskConstant; import tech.powerjob.worker.common.constants.TaskStatus; @@ -94,14 +94,14 @@ public class LightTaskTracker extends TaskTracker { // 初始延迟加入随机值,避免在高并发场景下所有请求集中在一个时间段 long initDelay = RandomUtils.nextInt(5000, 10000); // 上报任务状态 - statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new RunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS); + statusReportScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleWithFixedDelay(new SafeRunnableWrapper(this::checkAndReportStatus), initDelay, delay, TimeUnit.MILLISECONDS); // 超时控制 if (instanceInfo.getInstanceTimeoutMS() != Integer.MAX_VALUE) { if (instanceInfo.getInstanceTimeoutMS() < 1000L) { - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), instanceInfo.getInstanceTimeoutMS() / 10, TimeUnit.MILLISECONDS); } else { // 执行时间超过 1 s 的任务,超时检测最小颗粒度为 1 s - timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new RunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); + timeoutCheckScheduledFuture = workerRuntime.getExecutorManager().getLightweightTaskStatusCheckExecutor().scheduleAtFixedRate(new SafeRunnableWrapper(this::timeoutCheck), instanceInfo.getInstanceTimeoutMS(), 1000L, TimeUnit.MILLISECONDS); } } else { timeoutCheckScheduledFuture = null; From ea919b102fec32ae7247ec61da1412eaac8f962b Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 9 Feb 2024 11:13:05 +0800 Subject: [PATCH 17/17] docs: Happy New Year --- README.md | 4 ++++ README_zhCN.md | 4 ++++ .../worker/core/tracker/task/light/LightTaskTracker.java | 7 +------ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index ab269bed..341fda31 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # English | [简体中文](./README_zhCN.md) +

+🏮PowerJob 全体成员祝大家龙年腾飞,新的一年身体健康,万事如意,阖家欢乐,幸福安康!🏮 +

+

PowerJob

diff --git a/README_zhCN.md b/README_zhCN.md index 41820938..51f38e6a 100644 --- a/README_zhCN.md +++ b/README_zhCN.md @@ -1,5 +1,9 @@ # [English](./README.md) | 简体中文 +

+🏮PowerJob 全体成员祝大家龙年腾飞,新的一年身体健康,万事如意,阖家欢乐,幸福安康!🏮 +

+

PowerJob

diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java index b56eac16..79b58fbf 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java @@ -149,12 +149,7 @@ public class LightTaskTracker extends TaskTracker { } LightTaskTrackerManager.removeTaskTracker(instanceId); // 最后一列为总耗时(即占用资源的耗时,当前时间减去创建时间) - String msg = String.format("[TaskTracker-%s] remove TaskTracker,task status %s,start time:%s,end time:%s,real cost:%s,total time:%s", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); - if (TaskStatus.WORKER_PROCESS_SUCCESS.equals(status)) { - log.info(msg); - } else { - log.warn(msg); - } + log.info("[TaskTracker-{}] remove TaskTracker,task status {},start time:{},end time:{},real cost:{},total time:{}", instanceId, status, taskStartTime, taskEndTime, taskEndTime != null ? taskEndTime - taskStartTime : "unknown", System.currentTimeMillis() - createTime); } @Override