From dcd96fbe84072c3308dd51accbabdaaefc21375a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E5=85=AB?= Date: Sat, 18 Jul 2020 23:26:33 +0800 Subject: [PATCH] [dev] script processor support online log (support realtime output runtime info to console now) --- .../core/processor/built/PythonProcessor.java | 2 +- .../core/processor/built/ScriptProcessor.java | 27 +++++++++++++------ .../core/processor/built/ShellProcessor.java | 2 +- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java index 317319c2..8973efad 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java @@ -13,7 +13,7 @@ public class PythonProcessor extends ScriptProcessor { } @Override - protected String genScriptName(Long instanceId) { + protected String genScriptName() { return String.format("python_%d.py", instanceId); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java index 32e8120c..0672b8fa 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java @@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public abstract class ScriptProcessor implements BasicProcessor { - private final Long instanceId; + protected final Long instanceId; // 脚本绝对路径 private final String scriptPath; private final long timeout; @@ -33,7 +34,7 @@ public abstract class ScriptProcessor implements BasicProcessor { public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { this.instanceId = instanceId; - this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId); + this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(); this.timeout = timeout; File script = new File(scriptPath); @@ -66,6 +67,10 @@ public abstract class ScriptProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { + OmsLogger omsLogger = context.getOmsLogger(); + + omsLogger.info("SYSTEM===> ScriptProcessor start to process"); + // 1. 授权 ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) @@ -80,40 +85,46 @@ public abstract class ScriptProcessor implements BasicProcessor { // 为了代码优雅而牺牲那么一点点点点点点点点性能 // 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题,想来想去还是直接创建算了。 - new Thread(() -> copyStream(process.getInputStream(), inputSB)).start(); - new Thread(() -> copyStream(process.getErrorStream(), errorSB)).start(); + new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start(); + new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start(); try { boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS); if (!s) { + omsLogger.info("SYSTEM===> process timeout"); return new ProcessResult(false, "TIMEOUT"); } String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); - log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result); + + omsLogger.info("SYSTEM===> ScriptProcessor finished process"); return new ProcessResult(true, result); }catch (InterruptedException ie) { + omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted"); return new ProcessResult(false, "Interrupted"); } } - private void copyStream(InputStream is, StringBuilder sb) { + private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { while ((line = br.readLine()) != null) { sb.append(line); + // 同步到在线日志 + omsLogger.info(line); } } catch (Exception e) { log.warn("[ScriptProcessor] copyStream failed.", e); + omsLogger.warn("[ScriptProcessor] copyStream failed.", e); + sb.append("Exception: ").append(e); } } /** * 生成脚本名称 - * @param instanceId 任务实例ID,作为文件名称(使用JobId会有更改不生效的问题) * @return 文件名称 */ - protected abstract String genScriptName(Long instanceId); + protected abstract String genScriptName(); /** * 获取运行命令(eg,shell返回 /bin/sh) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java index 02478e0c..d712bd5c 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java @@ -17,7 +17,7 @@ public class ShellProcessor extends ScriptProcessor { } @Override - protected String genScriptName(Long instanceId) { + protected String genScriptName() { return String.format("shell_%d.sh", instanceId); }