From 93dadab83271faec63c7466ceb2eb1a9a4964bba Mon Sep 17 00:00:00 2001 From: songyinyin Date: Mon, 14 Aug 2023 14:12:39 +0800 Subject: [PATCH 1/2] fix: When debugging, ShellProcessor appears Java.io.IOException: Stream closed #682 --- .../impl/script/AbstractScriptProcessor.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 768160e2..40d4dde1 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; /** * 脚本处理器 @@ -78,17 +79,23 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { String result; final Charset charset = getCharset(); - try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { + try { + InputStream is = process.getInputStream(); + InputStream es = process.getErrorStream(); - POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); - POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); + ForkJoinTask inputSubmit = POOL.submit(() -> copyStream(is, inputBuilder, omsLogger, charset)); + ForkJoinTask errorSubmit = POOL.submit(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; + // 阻塞等待日志读取 + inputSubmit.get(); + errorSubmit.get(); + } catch (InterruptedException ie) { omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted"); } finally { - result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); + result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder, errorBuilder); } return new ProcessResult(success, result); } @@ -132,11 +139,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { return scriptPath; } - private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { + private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { - sb.append(line); + sb.append(line).append(System.lineSeparator()); // 同步到在线日志 omsLogger.info(line); } @@ -145,6 +152,13 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { omsLogger.warn("[SYSTEM] copyStream failed.", e); sb.append("Exception: ").append(e); + } finally { + try { + is.close(); + } catch (IOException e) { + log.warn("[ScriptProcessor] close stream failed.", e); + omsLogger.warn("[SYSTEM] close stream failed.", e); + } } } From 3544f76aaa4fbb4dfb42b0259f2f114cb890e7f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E5=BF=97=E8=BE=89?= <1179307527@qq.com> Date: Thu, 24 Aug 2023 11:06:16 +0800 Subject: [PATCH 2/2] fix: Memory overflow caused by mounting cloud disks --- .../tech/powerjob/worker/common/utils/SystemInfoUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SystemInfoUtils.java b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SystemInfoUtils.java index 747b7d1b..c181002e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SystemInfoUtils.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/common/utils/SystemInfoUtils.java @@ -68,7 +68,8 @@ public class SystemInfoUtils { } metrics.setDiskUsed(bytes2GB(total - free)); - metrics.setDiskTotal(bytes2GB(total)); + // 防止内存溢出导致total为负数,导致找不到worker实例 + metrics.setDiskTotal(bytes2GB(total < 0 ? Long.MAX_VALUE >> 6 : total )); metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal())); }