[dev] script processor support online log (support realtime output runtime info to console now)

This commit is contained in:
朱八 2020-07-18 23:26:33 +08:00
parent e860e74717
commit dcd96fbe84
3 changed files with 21 additions and 10 deletions

View File

@ -13,7 +13,7 @@ public class PythonProcessor extends ScriptProcessor {
} }
@Override @Override
protected String genScriptName(Long instanceId) { protected String genScriptName() {
return String.format("python_%d.py", instanceId); return String.format("python_%d.py", instanceId);
} }

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public abstract class ScriptProcessor implements BasicProcessor { public abstract class ScriptProcessor implements BasicProcessor {
private final Long instanceId; protected final Long instanceId;
// 脚本绝对路径 // 脚本绝对路径
private final String scriptPath; private final String scriptPath;
private final long timeout; private final long timeout;
@ -33,7 +34,7 @@ public abstract class ScriptProcessor implements BasicProcessor {
public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception {
this.instanceId = instanceId; this.instanceId = instanceId;
this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId); this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName();
this.timeout = timeout; this.timeout = timeout;
File script = new File(scriptPath); File script = new File(scriptPath);
@ -66,6 +67,10 @@ public abstract class ScriptProcessor implements BasicProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("SYSTEM===> ScriptProcessor start to process");
// 1. 授权 // 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁 // 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
@ -80,40 +85,46 @@ public abstract class ScriptProcessor implements BasicProcessor {
// 为了代码优雅而牺牲那么一点点点点点点点点性能 // 为了代码优雅而牺牲那么一点点点点点点点点性能
// 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题想来想去还是直接创建算了 // 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题想来想去还是直接创建算了
new Thread(() -> copyStream(process.getInputStream(), inputSB)).start(); new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start();
new Thread(() -> copyStream(process.getErrorStream(), errorSB)).start(); new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start();
try { try {
boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS); boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS);
if (!s) { if (!s) {
omsLogger.info("SYSTEM===> process timeout");
return new ProcessResult(false, "TIMEOUT"); return new ProcessResult(false, "TIMEOUT");
} }
String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString());
log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result);
omsLogger.info("SYSTEM===> ScriptProcessor finished process");
return new ProcessResult(true, result); return new ProcessResult(true, result);
}catch (InterruptedException ie) { }catch (InterruptedException ie) {
omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted");
return new ProcessResult(false, "Interrupted"); return new ProcessResult(false, "Interrupted");
} }
} }
private void copyStream(InputStream is, StringBuilder sb) { private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) {
String line; String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
while ((line = br.readLine()) != null) { while ((line = br.readLine()) != null) {
sb.append(line); sb.append(line);
// 同步到在线日志
omsLogger.info(line);
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[ScriptProcessor] copyStream failed.", e); log.warn("[ScriptProcessor] copyStream failed.", e);
omsLogger.warn("[ScriptProcessor] copyStream failed.", e);
sb.append("Exception: ").append(e); sb.append("Exception: ").append(e);
} }
} }
/** /**
* 生成脚本名称 * 生成脚本名称
* @param instanceId 任务实例ID作为文件名称使用JobId会有更改不生效的问题
* @return 文件名称 * @return 文件名称
*/ */
protected abstract String genScriptName(Long instanceId); protected abstract String genScriptName();
/** /**
* 获取运行命令egshell返回 /bin/sh * 获取运行命令egshell返回 /bin/sh

View File

@ -17,7 +17,7 @@ public class ShellProcessor extends ScriptProcessor {
} }
@Override @Override
protected String genScriptName(Long instanceId) { protected String genScriptName() {
return String.format("shell_%d.sh", instanceId); return String.format("shell_%d.sh", instanceId);
} }