diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 768160e2..40d4dde1 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; /** * 脚本处理器 @@ -78,17 +79,23 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { String result; final Charset charset = getCharset(); - try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { + try { + InputStream is = process.getInputStream(); + InputStream es = process.getErrorStream(); - POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); - POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); + ForkJoinTask inputSubmit = POOL.submit(() -> copyStream(is, inputBuilder, omsLogger, charset)); + ForkJoinTask errorSubmit = POOL.submit(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; + // 阻塞等待日志读取 + inputSubmit.get(); + errorSubmit.get(); + } catch (InterruptedException ie) { omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted"); } finally { - result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); + result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder, errorBuilder); } return new ProcessResult(success, result); } @@ -132,11 +139,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { return scriptPath; } - private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { + private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { - sb.append(line); + sb.append(line).append(System.lineSeparator()); // 同步到在线日志 omsLogger.info(line); } @@ -145,6 +152,13 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { omsLogger.warn("[SYSTEM] copyStream failed.", e); sb.append("Exception: ").append(e); + } finally { + try { + is.close(); + } catch (IOException e) { + log.warn("[ScriptProcessor] close stream failed.", e); + omsLogger.warn("[SYSTEM] close stream failed.", e); + } } }