From 4fccc816979fecec2b90b915aabd0ac8d761a916 Mon Sep 17 00:00:00 2001 From: fddc Date: Thu, 13 May 2021 11:02:42 +0800 Subject: [PATCH 01/21] =?UTF-8?q?=E9=9D=9Ewindows=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E6=89=8D=E9=9C=80=E8=A6=81chmod?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/script/AbstractScriptProcessor.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index b344f703..8f65f67f 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -55,11 +55,12 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { } // 授权 - ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodPb.start().waitFor(); - omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); - + if ( !SystemUtils.IS_OS_WINDOWS) { + ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) + chmodPb.start().waitFor(); + omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); + } // 2. 执行目标脚本 ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); From 49c7d18c0032f71cd924d64477d1f8f154c8b163 Mon Sep 17 00:00:00 2001 From: fddc Date: Fri, 14 May 2021 18:23:27 +0800 Subject: [PATCH 02/21] =?UTF-8?q?=E5=A2=9E=E5=8A=A0powershell=EF=BC=8C?= =?UTF-8?q?=E4=BB=A5=E6=94=AF=E6=8C=81windws=E5=B9=B3=E5=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/script/PowerShellProcessor.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java new file mode 100644 index 00000000..9b7a40b7 --- /dev/null +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java @@ -0,0 +1,20 @@ +package tech.powerjob.official.processors.impl.script; + +/** + * python processor + * + * @author fddc + * @since 2021/5/14 + */ +public class PowerShellProcessor extends AbstractScriptProcessor { + + @Override + protected String getScriptName(Long instanceId) { + return String.format("powershell_%d.bat", instanceId); + } + + @Override + protected String getRunCommand() { + return "powershell.exe"; + } +} From 973322370abdeb79d8f4446de7004436deacf573 Mon Sep 17 00:00:00 2001 From: fddc Date: Mon, 31 May 2021 13:06:09 +0800 Subject: [PATCH 03/21] =?UTF-8?q?agent=E6=96=B0=E5=A2=9Etag=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processors/impl/script/CMDProcessor.java | 20 +++++++++++++++++++ .../tech/powerjob/agent/MainApplication.java | 4 ++++ 2 files changed, 24 insertions(+) create mode 100644 powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java new file mode 100644 index 00000000..9b7a40b7 --- /dev/null +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java @@ -0,0 +1,20 @@ +package tech.powerjob.official.processors.impl.script; + +/** + * python processor + * + * @author fddc + * @since 2021/5/14 + */ +public class PowerShellProcessor extends AbstractScriptProcessor { + + @Override + protected String getScriptName(Long instanceId) { + return String.format("powershell_%d.bat", instanceId); + } + + @Override + protected String getRunCommand() { + return "powershell.exe"; + } +} diff --git a/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java b/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java index c8ca2e36..b89767f1 100644 --- a/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java +++ b/powerjob-worker-agent/src/main/java/tech/powerjob/agent/MainApplication.java @@ -36,6 +36,9 @@ public class MainApplication implements Runnable { @Option(names = {"-l", "--length"}, description = "ProcessResult#msg max length") private int length = 1024; + @Option(names = {"-t", "--tag"}, description = "worker-agent's tag") + private String tag; + public static void main(String[] args) { CommandLine commandLine = new CommandLine(new MainApplication()); commandLine.execute(args); @@ -52,6 +55,7 @@ public class MainApplication implements Runnable { cfg.setServerAddress(Splitter.on(",").splitToList(server)); cfg.setStoreStrategy(StoreStrategy.MEMORY.name().equals(storeStrategy) ? StoreStrategy.MEMORY : StoreStrategy.DISK); cfg.setMaxResultLength(length); + cfg.setTag(tag); PowerJobWorker worker = new PowerJobWorker(); worker.setConfig(cfg); From 0aa06d1ae6b033815951733e9f620d4209847fd3 Mon Sep 17 00:00:00 2001 From: fddc Date: Fri, 6 Aug 2021 20:41:24 +0800 Subject: [PATCH 04/21] =?UTF-8?q?=E8=A7=A3=E5=86=B3win=E5=B9=B3=E5=8F=B0ba?= =?UTF-8?q?t=E8=84=9A=E6=9C=AC=E4=B8=AD=E6=96=87=E8=B7=AF=E5=BE=84?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E4=B9=B1=E7=A0=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/script/AbstractScriptProcessor.java | 31 ++++++++++++++----- .../processors/impl/script/CMDProcessor.java | 6 ++-- .../impl/script/PowerShellProcessor.java | 2 +- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 8f65f67f..5de1cbbd 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -13,6 +13,7 @@ import tech.powerjob.official.processors.util.CommonUtils; import java.io.*; import java.net.URL; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Set; import java.util.concurrent.ForkJoinPool; @@ -30,6 +31,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); protected static final String SH_SHELL = "/bin/sh"; + protected static final String CMD_SHELL = "cmd.exe"; private static final String WORKER_DIR = System.getProperty("user.home") + "/powerjob/worker/official_script_processor/"; @@ -62,7 +64,9 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); } // 2. 执行目标脚本 - ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath); + ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ? + new ProcessBuilder(getRunCommand(), "/c", scriptPath) + : new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); StringBuilder inputBuilder = new StringBuilder(); @@ -71,10 +75,12 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { boolean success = true; String result; + //解决windows平台中文乱码 + Charset loggerCharset = SystemUtils.IS_OS_WINDOWS? Charset.forName("GBK"):StandardCharsets.UTF_8; try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { - POOL.execute(() -> copyStream(is, inputBuilder, omsLogger)); - POOL.execute(() -> copyStream(es, errorBuilder, omsLogger)); + POOL.execute(() -> copyStream(is, inputBuilder, omsLogger,loggerCharset)); + POOL.execute(() -> copyStream(es, errorBuilder, omsLogger,loggerCharset)); success = process.waitFor() == 0; @@ -108,16 +114,25 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { } // 非下载链接,为 processInfo 生成可执行文件 - try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { - bw.write(processorInfo); - bw.flush(); + if(SystemUtils.IS_OS_WINDOWS) + { + try (Writer fstream = new OutputStreamWriter(new FileOutputStream(script), Charset.forName("GBK")); BufferedWriter out = new BufferedWriter(fstream)) { + out.write(processorInfo); + out.flush(); + } + } + else { + try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { + bw.write(processorInfo); + bw.flush(); + } } return scriptPath; } - private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) { + private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; - try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { sb.append(line); // 同步到在线日志 diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java index 9b7a40b7..954e26cc 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java @@ -6,15 +6,15 @@ package tech.powerjob.official.processors.impl.script; * @author fddc * @since 2021/5/14 */ -public class PowerShellProcessor extends AbstractScriptProcessor { +public class CMDProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { - return String.format("powershell_%d.bat", instanceId); + return String.format("cmd_%d.bat", instanceId); } @Override protected String getRunCommand() { - return "powershell.exe"; + return "cmd.exe"; } } diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java index 9b7a40b7..47530813 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java @@ -10,7 +10,7 @@ public class PowerShellProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { - return String.format("powershell_%d.bat", instanceId); + return String.format("powershell_%d.ps1", instanceId); } @Override From 3842acf9529c3fa5c4ad64aa5a056610306a688b Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 16 Sep 2022 23:10:25 +0800 Subject: [PATCH 05/21] feat: remove useless code --- .../tech/powerjob/server/web/controller/InstanceController.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java index bf776539..0300b199 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/controller/InstanceController.java @@ -54,8 +54,6 @@ public class InstanceController { @Resource private CacheService cacheService; @Resource - private AppInfoRepository appInfoRepository; - @Resource private InstanceInfoRepository instanceInfoRepository; @GetMapping("/stop") From a4a41c4ab71aeaf87fc9cbdf7d05a008fbc49025 Mon Sep 17 00:00:00 2001 From: tjq Date: Fri, 16 Sep 2022 23:28:42 +0800 Subject: [PATCH 06/21] feat: define JobLogConfig --- .../powerjob/common/model/JobLogConfig.java | 45 +++++++++++++++++++ .../request/http/SaveJobInfoRequest.java | 11 +++++ .../common/request/query/JobInfoQuery.java | 2 + .../powerjob/common/response/JobInfoDTO.java | 12 +++++ .../server/core/service/JobService.java | 5 +++ .../persistence/remote/model/JobInfoDO.java | 9 ++++ .../server/web/response/JobInfoVO.java | 16 +++++++ 7 files changed, 100 insertions(+) create mode 100644 powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java b/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java new file mode 100644 index 00000000..edc54467 --- /dev/null +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java @@ -0,0 +1,45 @@ +package tech.powerjob.common.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; + +/** + * 任务日志配置 + * + * @author yhz + * @since 2022/9/16 + */ +@Getter +@Setter +@ToString +@Accessors(chain = true) +public class JobLogConfig { + /** + * log type {@link LogType} + */ + private Integer type; + /** + * log level {@link tech.powerjob.common.enums.LogLevel} + */ + private Integer level; + + @Getter + @AllArgsConstructor + public enum LogType { + ONLINE(1), + LOCAL(2); + private final Integer v; + + public LogType of(Integer type) { + for (LogType logType : values()) { + if (logType.v.equals(type)) { + return logType; + } + } + return ONLINE; + } + } +} diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java index e0541527..a031b35f 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java @@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobLogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import lombok.Data; @@ -139,6 +140,16 @@ public class SaveJobInfoRequest { */ private AlarmConfig alarmConfig; + /** + * 任务归类,开放给接入方自由定制 + */ + private String tag; + + /** + * 日志配置,包括日志级别、日志方式等配置信息 + */ + private JobLogConfig logConfig; + /** * Check non-null properties. diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/query/JobInfoQuery.java b/powerjob-common/src/main/java/tech/powerjob/common/request/query/JobInfoQuery.java index aa208900..d726b545 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/query/JobInfoQuery.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/query/JobInfoQuery.java @@ -50,4 +50,6 @@ public class JobInfoQuery extends PowerQuery { private Date gmtModifiedGt; private Integer dispatchStrategyEq; + + private String tagEq; } diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java index 0c04ffa5..567a9614 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java @@ -2,6 +2,7 @@ package tech.powerjob.common.response; import lombok.Data; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobLogConfig; import java.util.Date; @@ -125,4 +126,15 @@ public class JobInfoDTO { private String lifecycle; private AlarmConfig alarmConfig; + + /** + * 任务归类,开放给接入方自由定制 + */ + private String tag; + + /** + * 日志配置,包括日志级别、日志方式等配置信息 + */ + private JobLogConfig logConfig; + } diff --git a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java index 62591f3b..b3fe7703 100644 --- a/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java +++ b/powerjob-server/powerjob-server-core/src/main/java/tech/powerjob/server/core/service/JobService.java @@ -1,6 +1,7 @@ package tech.powerjob.server.core.service; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.data.jpa.domain.Specification; @@ -107,6 +108,10 @@ public class JobService { } jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig())); } + // 日志配置 + if (request.getLogConfig() != null) { + jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig())); + } JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO); return res.getId(); } diff --git a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java index 7abdd9dc..3fba4f45 100644 --- a/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java +++ b/powerjob-server/powerjob-server-persistence/src/main/java/tech/powerjob/server/persistence/remote/model/JobInfoDO.java @@ -145,4 +145,13 @@ public class JobInfoDO { */ private String alarmConfig; + /** + * 任务归类,开放给接入方自由定制 + */ + private String tag; + + /** + * 日志配置,包括日志级别、日志方式等配置信息 + */ + private String logConfig; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java index 9bda80ff..d30679b9 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java @@ -1,10 +1,12 @@ package tech.powerjob.server.web.response; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; +import tech.powerjob.common.model.JobLogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.SJ; @@ -144,6 +146,16 @@ public class JobInfoVO { private AlarmConfig alarmConfig; + /** + * 任务归类,开放给接入方自由定制 + */ + private String tag; + + /** + * 日志配置,包括日志级别、日志方式等配置信息 + */ + private JobLogConfig logConfig; + public static JobInfoVO from(JobInfoDO jobInfoDO) { JobInfoVO jobInfoVO = new JobInfoVO(); BeanUtils.copyProperties(jobInfoDO, jobInfoVO); @@ -173,6 +185,10 @@ public class JobInfoVO { jobInfoVO.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle())); } + if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) { + jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), JobLogConfig.class)); + } + return jobInfoVO; } } From e501cb9dfac786edf713f0b2f3d84454af47e564 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 17 Sep 2022 00:24:26 +0800 Subject: [PATCH 07/21] feat: support LogConfig --- .../{JobLogConfig.java => LogConfig.java} | 4 +- .../common/request/ServerScheduleJobReq.java | 4 ++ .../request/http/SaveJobInfoRequest.java | 4 +- .../powerjob/common/response/JobInfoDTO.java | 4 +- .../official/processors/TestUtils.java | 3 +- .../server/web/response/JobInfoVO.java | 6 +- .../tracker/processor/ProcessorTracker.java | 5 +- .../powerjob/worker/log/OmsLoggerFactory.java | 34 ++++++++++ .../worker/log/impl/AbstractOmsLogger.java | 68 +++++++++++++++++++ .../worker/log/impl/OmsLocalLogger.java | 41 +++++++---- .../worker/log/impl/OmsServerLogger.java | 26 ++++--- .../worker/pojo/model/InstanceInfo.java | 2 + .../pojo/request/TaskTrackerStartTaskReq.java | 4 ++ 13 files changed, 171 insertions(+), 34 deletions(-) rename powerjob-common/src/main/java/tech/powerjob/common/model/{JobLogConfig.java => LogConfig.java} (93%) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java b/powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java similarity index 93% rename from powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java rename to powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java index edc54467..6f7c291c 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java @@ -16,7 +16,7 @@ import lombok.experimental.Accessors; @Setter @ToString @Accessors(chain = true) -public class JobLogConfig { +public class LogConfig { /** * log type {@link LogType} */ @@ -26,6 +26,8 @@ public class JobLogConfig { */ private Integer level; + private String loggerName; + @Getter @AllArgsConstructor public enum LogType { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java index 92fef38e..364aba55 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java @@ -94,6 +94,10 @@ public class ServerScheduleJobReq implements PowerSerializable { */ private String alarmConfig; + /** + * 日志配置 + */ + private String logConfig; @Override public String path() { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java index a031b35f..65fd892b 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java @@ -5,7 +5,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import lombok.Data; @@ -148,7 +148,7 @@ public class SaveJobInfoRequest { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; /** diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java index 567a9614..34b63613 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java @@ -2,7 +2,7 @@ package tech.powerjob.common.response; import lombok.Data; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import java.util.Date; @@ -135,6 +135,6 @@ public class JobInfoDTO { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; } diff --git a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java index 54671ead..c88b923e 100644 --- a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java +++ b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java @@ -1,5 +1,6 @@ package tech.powerjob.official.processors; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.WorkflowContext; import tech.powerjob.worker.log.impl.OmsLocalLogger; @@ -24,7 +25,7 @@ public class TestUtils { taskContext.setJobParams(jobParams); taskContext.setTaskId("0.0"); taskContext.setTaskName("TEST_TASK"); - taskContext.setOmsLogger(new OmsLocalLogger()); + taskContext.setOmsLogger(new OmsLocalLogger(new LogConfig())); taskContext.setWorkflowContext(new WorkflowContext(null, null)); return taskContext; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java index d30679b9..6e7ebc0b 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java @@ -6,7 +6,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.SJ; @@ -154,7 +154,7 @@ public class JobInfoVO { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; public static JobInfoVO from(JobInfoDO jobInfoDO) { JobInfoVO jobInfoVO = new JobInfoVO(); @@ -186,7 +186,7 @@ public class JobInfoVO { } if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) { - jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), JobLogConfig.class)); + jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), LogConfig.class)); } return jobInfoVO; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 2526c96f..a81367cd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -11,6 +11,8 @@ import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; @@ -22,6 +24,7 @@ import tech.powerjob.worker.core.ProcessorBeanFactory; import tech.powerjob.worker.core.executor.ProcessorRunnable; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.log.OmsLogger; +import tech.powerjob.worker.log.OmsLoggerFactory; import tech.powerjob.worker.log.impl.OmsServerLogger; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; @@ -116,7 +119,7 @@ public class ProcessorTracker { String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath); - this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler()); + this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; this.lastCompletedTaskCount = 0L; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java new file mode 100644 index 00000000..d7fc1e5f --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java @@ -0,0 +1,34 @@ +package tech.powerjob.worker.log; + +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.log.impl.OmsLocalLogger; +import tech.powerjob.worker.log.impl.OmsServerLogger; + +/** + * OmsLoggerFactory + * + * @author tjq + * @since 2022/9/17 + */ +public class OmsLoggerFactory { + + public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) { + LogConfig cfg; + if (logConfig == null) { + cfg = new LogConfig(); + } else { + try { + cfg = JsonUtils.parseObject(logConfig, LogConfig.class); + } catch (Exception ignore) { + cfg = new LogConfig(); + } + } + + if (LogConfig.LogType.LOCAL.getV().equals(cfg.getType())) { + return new OmsLocalLogger(cfg); + } + return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler()); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java new file mode 100644 index 00000000..bb5cd2bf --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java @@ -0,0 +1,68 @@ +package tech.powerjob.worker.log.impl; + +import tech.powerjob.common.enums.LogLevel; +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.worker.log.OmsLogger; + +/** + * AbstractOmsLogger + * + * @author tjq + * @since 2022/9/16 + */ +public abstract class AbstractOmsLogger implements OmsLogger { + + private final LogConfig logConfig; + + public AbstractOmsLogger(LogConfig logConfig) { + this.logConfig = logConfig; + + // 兼容空数据场景,添加默认值,尽量与原有逻辑保持兼容 + if (logConfig.getLevel() == null) { + logConfig.setLevel(LogLevel.INFO.getV()); + } + if (logConfig.getType() == null) { + logConfig.setType(LogConfig.LogType.ONLINE.getV()); + } + } + + abstract void debug0(String messagePattern, Object... args); + + abstract void info0(String messagePattern, Object... args); + + abstract void warn0(String messagePattern, Object... args); + + abstract void error0(String messagePattern, Object... args); + + @Override + public void debug(String messagePattern, Object... args) { + if (LogLevel.DEBUG.getV() < logConfig.getLevel()) { + return; + } + debug0(messagePattern, args); + } + + @Override + public void info(String messagePattern, Object... args) { + if (LogLevel.INFO.getV() < logConfig.getLevel()) { + return; + } + info0(messagePattern, args); + } + + @Override + public void warn(String messagePattern, Object... args) { + if (LogLevel.WARN.getV() < logConfig.getLevel()) { + return; + } + warn0(messagePattern, args); + } + + @Override + public void error(String messagePattern, Object... args) { + if (LogLevel.ERROR.getV() < logConfig.getLevel()) { + return; + } + error0(messagePattern, args); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java index fe1e7d01..ae7da742 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java @@ -1,33 +1,46 @@ package tech.powerjob.worker.log.impl; -import tech.powerjob.worker.log.OmsLogger; -import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.powerjob.common.model.LogConfig; /** - * for local test + * More user feedback when the task volume server timeout serious. After pressure testing, we found that there is no bottleneck in the server processing scheduling tasks, and it is assumed that the large amount of logs is causing a serious bottleneck. Therefore, we need to provide local logging API for large MR tasks. * * @author tjq * @since 2021/2/4 */ -@Slf4j -public class OmsLocalLogger implements OmsLogger { - @Override - public void debug(String messagePattern, Object... args) { - log.debug(messagePattern, args); +public class OmsLocalLogger extends AbstractOmsLogger { + + private final Logger LOGGER; + + private static final String DEFAULT_LOGGER_NAME = OmsLocalLogger.class.getName(); + + public OmsLocalLogger(LogConfig logConfig) { + super(logConfig); + + String loggerName = StringUtils.isEmpty(logConfig.getLoggerName()) ? DEFAULT_LOGGER_NAME : logConfig.getLoggerName(); + LOGGER = LoggerFactory.getLogger(loggerName); } @Override - public void info(String messagePattern, Object... args) { - log.info(messagePattern, args); + public void debug0(String messagePattern, Object... args) { + LOGGER.debug(messagePattern, args); } @Override - public void warn(String messagePattern, Object... args) { - log.warn(messagePattern, args); + public void info0(String messagePattern, Object... args) { + LOGGER.info(messagePattern, args); } @Override - public void error(String messagePattern, Object... args) { - log.error(messagePattern, args); + public void warn0(String messagePattern, Object... args) { + LOGGER.warn(messagePattern, args); + } + + @Override + public void error0(String messagePattern, Object... args) { + LOGGER.error(messagePattern, args); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java index 57807042..feb69e9e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java @@ -1,43 +1,49 @@ package tech.powerjob.worker.log.impl; import tech.powerjob.common.enums.LogLevel; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.worker.background.OmsLogHandler; -import tech.powerjob.worker.log.OmsLogger; -import lombok.AllArgsConstructor; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.helpers.FormattingTuple; import org.slf4j.helpers.MessageFormatter; /** - * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 + * WARN:Please do not use this logger to print large amounts of logs! + * WARN:Please do not use this logger to print large amounts of logs! + * WARN:Please do not use this logger to print large amounts of logs! * * @author tjq - * @since 2020/4/21 + * @since 2022/9/16 */ -@AllArgsConstructor -public class OmsServerLogger implements OmsLogger { +public class OmsServerLogger extends AbstractOmsLogger { private final long instanceId; private final OmsLogHandler omsLogHandler; + public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) { + super(logConfig); + this.instanceId = instanceId; + this.omsLogHandler = omsLogHandler; + } + @Override - public void debug(String messagePattern, Object... args) { + public void debug0(String messagePattern, Object... args) { process(LogLevel.DEBUG, messagePattern, args); } @Override - public void info(String messagePattern, Object... args) { + public void info0(String messagePattern, Object... args) { process(LogLevel.INFO, messagePattern, args); } @Override - public void warn(String messagePattern, Object... args) { + public void warn0(String messagePattern, Object... args) { process(LogLevel.WARN, messagePattern, args); } @Override - public void error(String messagePattern, Object... args) { + public void error0(String messagePattern, Object... args) { process(LogLevel.ERROR, messagePattern, args); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java index 28df58d2..c53e9b5b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java @@ -51,4 +51,6 @@ public class InstanceInfo implements Serializable { private int threadConcurrency; // 子任务重试次数(任务本身的重试机制由server控制) private int taskRetryNum; + + private String logConfig; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java index 531dfa7c..7481ce5e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -32,6 +32,8 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { // 秒级任务专用 private long subInstanceId; + private String logConfig; + /** * 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用 @@ -47,5 +49,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { this.taskCurrentRetryNums = task.getFailedCnt(); this.subInstanceId = task.getSubInstanceId(); + + this.logConfig = instanceInfo.getLogConfig(); } } From dc90f272c773a4cf2db5090f7ddf290b6f246c04 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 17 Sep 2022 23:47:10 +0800 Subject: [PATCH 08/21] feat: optimize worker log --- .../powerjob/worker/background/ServerDiscoveryService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java index 7402772c..35e1c9a0 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/ServerDiscoveryService.java @@ -87,7 +87,7 @@ public class ServerDiscoveryService { } if (StringUtils.isEmpty(result)) { - log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined."); + log.warn("[PowerDiscovery] can't find any available server, this worker[appId={}] has been quarantined.", appId); // 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务 if (FAILED_COUNT++ > MAX_FAILED_COUNT) { @@ -108,7 +108,7 @@ public class ServerDiscoveryService { } else { // 重置失败次数 FAILED_COUNT = 0; - log.debug("[PowerDiscovery] current server is {}.", result); + log.debug("[PowerDiscovery] appId={}, current server is {}.", appId, result); return result; } } From 74b6acc927fe3e37f81e342759acb1cb8fb8b815 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 00:12:04 +0800 Subject: [PATCH 09/21] feat: add ConfigProcessor in official-processor --- .../processors/impl/ConfigProcessor.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java new file mode 100644 index 00000000..f92ec4c2 --- /dev/null +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java @@ -0,0 +1,84 @@ +package tech.powerjob.official.processors.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import tech.powerjob.official.processors.util.CommonUtils; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Optional; + +/** + * 配置处理器 + * 超简易的配置中心,用于配置的下发,需要配合秒级 + 广播任务使用! + * 超低成本下的解决方案,强配置 or 高SLA 场景,请使用标准的配置管理中间件。 + * 外部调用方法 {@link ConfigProcessor#fetchConfig()} + * + * @author tjq + * @since 2022/9/17 + */ +@Slf4j +public class ConfigProcessor implements BroadcastProcessor { + + /** + * 获取配置 + * @return 控制台下发的配置 + */ + public static Map fetchConfig() { + if (config == null) { + return Maps.newHashMap(); + } + return Optional.ofNullable(config.getConfig()).orElse(Maps.newHashMap()); + } + + private static Config config; + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + Config newCfg = JSON.parseObject(CommonUtils.parseParams(context), Config.class); + context.getOmsLogger().info("[ConfigProcessor] receive and update config: {}", config); + + // 空场景不更新 + final Map realConfig = newCfg.config; + if (realConfig == null) { + return new ProcessResult(false, "CONFIG_IS_NULL"); + } + + config = newCfg; + + if (StringUtils.isNotEmpty(config.persistentFileName)) { + final File file = new File(config.persistentFileName); + + String content = JSONObject.toJSONString(realConfig); + FileUtils.copyToFile(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), file); + } + + return new ProcessResult(true, "UPDATE_SUCCESS"); + } + + @Data + public static class Config implements Serializable { + + /** + * 原始配置 + */ + private Map config; + + /** + * 持久到本地的全路径名称 + */ + private String persistentFileName; + } +} From ec47f5a8c50c841da724de08853d28a6ea94ac5a Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 00:22:03 +0800 Subject: [PATCH 10/21] feat: add ConfigProcessor in official-processor --- .../powerjob/official/processors/impl/ConfigProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java index f92ec4c2..6d6e1e0e 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/ConfigProcessor.java @@ -1,12 +1,12 @@ package tech.powerjob.official.processors.impl; -import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.official.processors.util.CommonUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; @@ -47,7 +47,7 @@ public class ConfigProcessor implements BroadcastProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { - Config newCfg = JSON.parseObject(CommonUtils.parseParams(context), Config.class); + Config newCfg = JsonUtils.parseObject(CommonUtils.parseParams(context), Config.class); context.getOmsLogger().info("[ConfigProcessor] receive and update config: {}", config); // 空场景不更新 From 45f7b17e14b61352821a331e5d1b753a02fc8cb5 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 00:40:35 +0800 Subject: [PATCH 11/21] feat: script processor support cmd and powershel by fddc --- .../impl/script/AbstractScriptProcessor.java | 23 +++++++++++++------ .../processors/impl/script/CMDProcessor.java | 7 ++++++ .../impl/script/PowerShellProcessor.java | 7 ++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 5de1cbbd..935df2ca 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -15,6 +15,7 @@ import java.io.*; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Set; import java.util.concurrent.ForkJoinPool; @@ -75,12 +76,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { boolean success = true; String result; - //解决windows平台中文乱码 - Charset loggerCharset = SystemUtils.IS_OS_WINDOWS? Charset.forName("GBK"):StandardCharsets.UTF_8; + final Charset charset = getCharset(); try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { - POOL.execute(() -> copyStream(is, inputBuilder, omsLogger,loggerCharset)); - POOL.execute(() -> copyStream(es, errorBuilder, omsLogger,loggerCharset)); + POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); + POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; @@ -113,10 +113,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { } } - // 非下载链接,为 processInfo 生成可执行文件 - if(SystemUtils.IS_OS_WINDOWS) + final Charset charset = getCharset(); + + if(charset != null) { - try (Writer fstream = new OutputStreamWriter(new FileOutputStream(script), Charset.forName("GBK")); BufferedWriter out = new BufferedWriter(fstream)) { + try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) { out.write(processorInfo); out.flush(); } @@ -158,4 +159,12 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { * @return 执行脚本的命令 */ protected abstract String getRunCommand(); + + /** + * 默认不指定 + * @return Charset + */ + protected Charset getCharset() { + return StandardCharsets.UTF_8; + } } diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java index 954e26cc..9f14dd44 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java @@ -1,5 +1,7 @@ package tech.powerjob.official.processors.impl.script; +import java.nio.charset.Charset; + /** * python processor * @@ -17,4 +19,9 @@ public class CMDProcessor extends AbstractScriptProcessor { protected String getRunCommand() { return "cmd.exe"; } + + @Override + protected Charset getCharset() { + return Charset.forName("GBK"); + } } diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java index 47530813..592ef841 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java @@ -1,5 +1,7 @@ package tech.powerjob.official.processors.impl.script; +import java.nio.charset.Charset; + /** * python processor * @@ -17,4 +19,9 @@ public class PowerShellProcessor extends AbstractScriptProcessor { protected String getRunCommand() { return "powershell.exe"; } + + @Override + protected Charset getCharset() { + return Charset.forName("GBK"); + } } From 483227f840469332d668747caf91a57feff2748d Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 00:48:02 +0800 Subject: [PATCH 12/21] feat: script processor support cmd and powershel by fddc --- .../powerjob/official/processors/impl/script/CMDProcessor.java | 2 +- .../official/processors/impl/script/PowerShellProcessor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java index 9f14dd44..22130c4f 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/CMDProcessor.java @@ -22,6 +22,6 @@ public class CMDProcessor extends AbstractScriptProcessor { @Override protected Charset getCharset() { - return Charset.forName("GBK"); + return Charset.defaultCharset(); } } diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java index 592ef841..e0827e9d 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/PowerShellProcessor.java @@ -22,6 +22,6 @@ public class PowerShellProcessor extends AbstractScriptProcessor { @Override protected Charset getCharset() { - return Charset.forName("GBK"); + return Charset.defaultCharset(); } } From f3c7ed8baf03f298e9bdeb0db2b18d00103ad9d5 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 14:02:05 +0800 Subject: [PATCH 13/21] feat: add max queue size for log handler in worker to prevent OOM --- .../tech/powerjob/worker/background/OmsLogHandler.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java index f84e8a66..ad4f51ba 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/background/OmsLogHandler.java @@ -35,7 +35,7 @@ public class OmsLogHandler { // 上报锁,只需要一个线程上报即可 private final Lock reportLock = new ReentrantLock(); // 生产者消费者模式,异步上传日志 - private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(); + private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(10240); // 每次上报携带的数据条数 private static final int BATCH_SIZE = 20; @@ -61,7 +61,10 @@ public class OmsLogHandler { } InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent); - logQueue.offer(tuple); + boolean offerRet = logQueue.offer(tuple); + if (!offerRet) { + log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId); + } } From f20a849a93639c59a12229afeea296a9af7c55aa Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 15:13:17 +0800 Subject: [PATCH 14/21] feat: support shutdown log by OFF level --- .../src/main/java/tech/powerjob/common/enums/LogLevel.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/powerjob-common/src/main/java/tech/powerjob/common/enums/LogLevel.java b/powerjob-common/src/main/java/tech/powerjob/common/enums/LogLevel.java index 53071f3f..3640baec 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/enums/LogLevel.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/enums/LogLevel.java @@ -18,7 +18,8 @@ public enum LogLevel { DEBUG(1), INFO(2), WARN(3), - ERROR(4); + ERROR(4), + OFF(99); private final int v; From b2b8241295b87efff8bb4d2a78e0983186324493 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 18:30:42 +0800 Subject: [PATCH 15/21] feat: use worker-samples as try demo --- others/script/jenkins_auto_build.sh | 31 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/others/script/jenkins_auto_build.sh b/others/script/jenkins_auto_build.sh index b92b074c..3bf04a8b 100755 --- a/others/script/jenkins_auto_build.sh +++ b/others/script/jenkins_auto_build.sh @@ -4,22 +4,21 @@ echo "================== 构建 jar ==================" mvn clean package -Pdev -DskipTests -U -e echo "================== 拷贝 jar ==================" /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar -/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar echo "================== 关闭老应用 ==================" docker stop powerjob-server -docker stop powerjob-agent -docker stop powerjob-agent2 +docker stop powerjob-worker-samples +docker stop powerjob-worker-samples2 echo "================== 删除老容器 ==================" docker container rm powerjob-server -docker container rm powerjob-agent -docker container rm powerjob-agent2 +docker container rm powerjob-worker-samples +docker container rm powerjob-worker-samples2 echo "================== 删除旧镜像 ==================" docker rmi -f tjqq/powerjob-server:latest -docker rmi -f tjqq/powerjob-agent:latest +docker rmi -f tjqq/powerjob-worker-samples:latest echo "================== 构建 powerjob-server 镜像 ==================" docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit -echo "================== 构建 powerjob-agent 镜像 ==================" -docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit +echo "================== 构建 powerjob-worker-samples 镜像 ==================" +docker build -t tjqq/powerjob-worker-samples:latest powerjob-worker-samples/. || exit echo "================== 准备启动 powerjob-server ==================" docker run -d \ --restart=always \ @@ -37,19 +36,19 @@ echo "使用的Server地址:$serverAddress" docker run -d \ --restart=always \ - --name powerjob-agent \ + --name powerjob-worker-samples \ -p 27777:27777 -p 5002:5005 -p 10002:10000 \ -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ - -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ - -v ~/docker/powerjob-agent:/root \ - tjqq/powerjob-agent:latest + -e PARAMS="-Dpowerjob.worker.server-address $serverAddress" \ + -v ~/docker/powerjob-worker-samples:/root \ + tjqq/powerjob-worker-samples:latest docker run -d \ --restart=always \ - --name powerjob-agent2 \ + --name powerjob-worker-samples2 \ -p 27778:27777 -p 5003:5005 -p 10003:10000 \ -e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \ - -e PARAMS="--app powerjob-agent-test --server $serverAddress" \ - -v ~/docker/powerjob-agent2:/root \ - tjqq/powerjob-agent:latest + -e PARAMS="-Dpowerjob.worker.server-address $serverAddress" \ + -v ~/docker/powerjob-worker-samples2:/root \ + tjqq/powerjob-worker-samples:latest From e5d31399902d634b750a45afe7b4c9eeaef8ab3d Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 21:18:56 +0800 Subject: [PATCH 16/21] feat: use worker-samples as try demo --- others/script/jenkins_auto_build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/others/script/jenkins_auto_build.sh b/others/script/jenkins_auto_build.sh index 3bf04a8b..33542e16 100755 --- a/others/script/jenkins_auto_build.sh +++ b/others/script/jenkins_auto_build.sh @@ -1,7 +1,7 @@ #!/bin/bash cd `dirname $0`/../.. || exit echo "================== 构建 jar ==================" -mvn clean package -Pdev -DskipTests -U -e +mvn clean package -Pdev -DskipTests -e echo "================== 拷贝 jar ==================" /bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar echo "================== 关闭老应用 ==================" From ce555ad18f33f70f8b995c5480b3c4e4e8821e55 Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 21:56:03 +0800 Subject: [PATCH 17/21] test: add test log processor in samples --- .../processors/test/LogTestProcessor.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/LogTestProcessor.java diff --git a/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/LogTestProcessor.java b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/LogTestProcessor.java new file mode 100644 index 00000000..ab423884 --- /dev/null +++ b/powerjob-worker-samples/src/main/java/tech/powerjob/samples/processors/test/LogTestProcessor.java @@ -0,0 +1,41 @@ +package tech.powerjob.samples.processors.test; + +import com.alibaba.fastjson.JSONObject; +import org.springframework.stereotype.Component; +import tech.powerjob.official.processors.util.CommonUtils; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; +import tech.powerjob.worker.log.OmsLogger; + +import java.util.Date; +import java.util.Optional; + +/** + * LogTestProcessor + * + * @author tjq + * @since 2022/9/18 + */ +@Component +public class LogTestProcessor implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + final OmsLogger omsLogger = context.getOmsLogger(); + final String parseParams = CommonUtils.parseParams(context); + final JSONObject config = Optional.ofNullable(JSONObject.parseObject(parseParams)).orElse(new JSONObject()); + + final long loopTimes = Optional.ofNullable(config.getLong("loopTimes")).orElse(1000L); + + for (int i = 0; i < loopTimes; i++) { + omsLogger.debug("[DEBUG] one DEBUG log in {}", new Date()); + omsLogger.info("[INFO] one INFO log in {}", new Date()); + omsLogger.warn("[WARN] one WARN log in {}", new Date()); + omsLogger.error("[ERROR] one ERROR log in {}", new Date()); + } + + return new ProcessResult(true); + } +} From 653dcb4a923a3958c95ea87795f2239667a4bd6a Mon Sep 17 00:00:00 2001 From: tjq Date: Sun, 18 Sep 2022 22:06:29 +0800 Subject: [PATCH 18/21] feat: update front-end --- .../src/main/resources/static/index.html | 2 +- .../src/main/resources/static/js/0.js | 11945 ++++++++++++++- .../src/main/resources/static/js/1.js | 12568 +--------------- .../src/main/resources/static/js/10.js | 101 +- .../src/main/resources/static/js/11.js | 80 +- .../src/main/resources/static/js/12.js | 97 - .../src/main/resources/static/js/2.js | 106 +- .../src/main/resources/static/js/3.js | 178 +- .../src/main/resources/static/js/4.js | 236 +- .../src/main/resources/static/js/5.js | 286 +- .../src/main/resources/static/js/6.js | 134 +- .../src/main/resources/static/js/7.js | 165 +- .../src/main/resources/static/js/8.js | 150 +- .../src/main/resources/static/js/9.js | 136 +- .../src/main/resources/static/js/app.js | 252 +- .../main/resources/static/js/chunk-vendors.js | 5900 ++++---- 16 files changed, 15279 insertions(+), 17057 deletions(-) delete mode 100644 powerjob-server/powerjob-server-starter/src/main/resources/static/js/12.js diff --git a/powerjob-server/powerjob-server-starter/src/main/resources/static/index.html b/powerjob-server/powerjob-server-starter/src/main/resources/static/index.html index ac4f12e8..8d933786 100644 --- a/powerjob-server/powerjob-server-starter/src/main/resources/static/index.html +++ b/powerjob-server/powerjob-server-starter/src/main/resources/static/index.html @@ -6,7 +6,7 @@ PowerJob - +