From e501cb9dfac786edf713f0b2f3d84454af47e564 Mon Sep 17 00:00:00 2001 From: tjq Date: Sat, 17 Sep 2022 00:24:26 +0800 Subject: [PATCH] feat: support LogConfig --- .../{JobLogConfig.java => LogConfig.java} | 4 +- .../common/request/ServerScheduleJobReq.java | 4 ++ .../request/http/SaveJobInfoRequest.java | 4 +- .../powerjob/common/response/JobInfoDTO.java | 4 +- .../official/processors/TestUtils.java | 3 +- .../server/web/response/JobInfoVO.java | 6 +- .../tracker/processor/ProcessorTracker.java | 5 +- .../powerjob/worker/log/OmsLoggerFactory.java | 34 ++++++++++ .../worker/log/impl/AbstractOmsLogger.java | 68 +++++++++++++++++++ .../worker/log/impl/OmsLocalLogger.java | 41 +++++++---- .../worker/log/impl/OmsServerLogger.java | 26 ++++--- .../worker/pojo/model/InstanceInfo.java | 2 + .../pojo/request/TaskTrackerStartTaskReq.java | 4 ++ 13 files changed, 171 insertions(+), 34 deletions(-) rename powerjob-common/src/main/java/tech/powerjob/common/model/{JobLogConfig.java => LogConfig.java} (93%) create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java create mode 100644 powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java diff --git a/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java b/powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java similarity index 93% rename from powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java rename to powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java index edc54467..6f7c291c 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/model/JobLogConfig.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/model/LogConfig.java @@ -16,7 +16,7 @@ import lombok.experimental.Accessors; @Setter @ToString @Accessors(chain = true) -public class JobLogConfig { +public class LogConfig { /** * log type {@link LogType} */ @@ -26,6 +26,8 @@ public class JobLogConfig { */ private Integer level; + private String loggerName; + @Getter @AllArgsConstructor public enum LogType { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java index 92fef38e..364aba55 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/ServerScheduleJobReq.java @@ -94,6 +94,10 @@ public class ServerScheduleJobReq implements PowerSerializable { */ private String alarmConfig; + /** + * 日志配置 + */ + private String logConfig; @Override public String path() { diff --git a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java index a031b35f..65fd892b 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/request/http/SaveJobInfoRequest.java @@ -5,7 +5,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import lombok.Data; @@ -148,7 +148,7 @@ public class SaveJobInfoRequest { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; /** diff --git a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java index 567a9614..34b63613 100644 --- a/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java +++ b/powerjob-common/src/main/java/tech/powerjob/common/response/JobInfoDTO.java @@ -2,7 +2,7 @@ package tech.powerjob.common.response; import lombok.Data; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import java.util.Date; @@ -135,6 +135,6 @@ public class JobInfoDTO { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; } diff --git a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java index 54671ead..c88b923e 100644 --- a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java +++ b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/TestUtils.java @@ -1,5 +1,6 @@ package tech.powerjob.official.processors; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.core.processor.WorkflowContext; import tech.powerjob.worker.log.impl.OmsLocalLogger; @@ -24,7 +25,7 @@ public class TestUtils { taskContext.setJobParams(jobParams); taskContext.setTaskId("0.0"); taskContext.setTaskName("TEST_TASK"); - taskContext.setOmsLogger(new OmsLocalLogger()); + taskContext.setOmsLogger(new OmsLocalLogger(new LogConfig())); taskContext.setWorkflowContext(new WorkflowContext(null, null)); return taskContext; } diff --git a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java index d30679b9..6e7ebc0b 100644 --- a/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/powerjob-server-starter/src/main/java/tech/powerjob/server/web/response/JobInfoVO.java @@ -6,7 +6,7 @@ import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; import tech.powerjob.common.model.AlarmConfig; -import tech.powerjob.common.model.JobLogConfig; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.common.model.LifeCycle; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.server.common.SJ; @@ -154,7 +154,7 @@ public class JobInfoVO { /** * 日志配置,包括日志级别、日志方式等配置信息 */ - private JobLogConfig logConfig; + private LogConfig logConfig; public static JobInfoVO from(JobInfoDO jobInfoDO) { JobInfoVO jobInfoVO = new JobInfoVO(); @@ -186,7 +186,7 @@ public class JobInfoVO { } if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) { - jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), JobLogConfig.class)); + jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), LogConfig.class)); } return jobInfoVO; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 2526c96f..a81367cd 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -11,6 +11,8 @@ import tech.powerjob.common.RemoteConstant; import tech.powerjob.common.enums.ExecuteType; import tech.powerjob.common.enums.ProcessorType; import tech.powerjob.common.enums.TimeExpressionType; +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.common.serialize.JsonUtils; import tech.powerjob.common.utils.CommonUtils; import tech.powerjob.worker.common.WorkerRuntime; import tech.powerjob.worker.common.constants.TaskStatus; @@ -22,6 +24,7 @@ import tech.powerjob.worker.core.ProcessorBeanFactory; import tech.powerjob.worker.core.executor.ProcessorRunnable; import tech.powerjob.worker.core.processor.sdk.BasicProcessor; import tech.powerjob.worker.log.OmsLogger; +import tech.powerjob.worker.log.OmsLoggerFactory; import tech.powerjob.worker.log.impl.OmsServerLogger; import tech.powerjob.worker.persistence.TaskDO; import tech.powerjob.worker.pojo.model.InstanceInfo; @@ -116,7 +119,7 @@ public class ProcessorTracker { String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath); - this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler()); + this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.lastIdleTime = -1L; this.lastCompletedTaskCount = 0L; diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java new file mode 100644 index 00000000..d7fc1e5f --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/OmsLoggerFactory.java @@ -0,0 +1,34 @@ +package tech.powerjob.worker.log; + +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.common.serialize.JsonUtils; +import tech.powerjob.worker.common.WorkerRuntime; +import tech.powerjob.worker.log.impl.OmsLocalLogger; +import tech.powerjob.worker.log.impl.OmsServerLogger; + +/** + * OmsLoggerFactory + * + * @author tjq + * @since 2022/9/17 + */ +public class OmsLoggerFactory { + + public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) { + LogConfig cfg; + if (logConfig == null) { + cfg = new LogConfig(); + } else { + try { + cfg = JsonUtils.parseObject(logConfig, LogConfig.class); + } catch (Exception ignore) { + cfg = new LogConfig(); + } + } + + if (LogConfig.LogType.LOCAL.getV().equals(cfg.getType())) { + return new OmsLocalLogger(cfg); + } + return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler()); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java new file mode 100644 index 00000000..bb5cd2bf --- /dev/null +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/AbstractOmsLogger.java @@ -0,0 +1,68 @@ +package tech.powerjob.worker.log.impl; + +import tech.powerjob.common.enums.LogLevel; +import tech.powerjob.common.model.LogConfig; +import tech.powerjob.worker.log.OmsLogger; + +/** + * AbstractOmsLogger + * + * @author tjq + * @since 2022/9/16 + */ +public abstract class AbstractOmsLogger implements OmsLogger { + + private final LogConfig logConfig; + + public AbstractOmsLogger(LogConfig logConfig) { + this.logConfig = logConfig; + + // 兼容空数据场景,添加默认值,尽量与原有逻辑保持兼容 + if (logConfig.getLevel() == null) { + logConfig.setLevel(LogLevel.INFO.getV()); + } + if (logConfig.getType() == null) { + logConfig.setType(LogConfig.LogType.ONLINE.getV()); + } + } + + abstract void debug0(String messagePattern, Object... args); + + abstract void info0(String messagePattern, Object... args); + + abstract void warn0(String messagePattern, Object... args); + + abstract void error0(String messagePattern, Object... args); + + @Override + public void debug(String messagePattern, Object... args) { + if (LogLevel.DEBUG.getV() < logConfig.getLevel()) { + return; + } + debug0(messagePattern, args); + } + + @Override + public void info(String messagePattern, Object... args) { + if (LogLevel.INFO.getV() < logConfig.getLevel()) { + return; + } + info0(messagePattern, args); + } + + @Override + public void warn(String messagePattern, Object... args) { + if (LogLevel.WARN.getV() < logConfig.getLevel()) { + return; + } + warn0(messagePattern, args); + } + + @Override + public void error(String messagePattern, Object... args) { + if (LogLevel.ERROR.getV() < logConfig.getLevel()) { + return; + } + error0(messagePattern, args); + } +} diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java index fe1e7d01..ae7da742 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsLocalLogger.java @@ -1,33 +1,46 @@ package tech.powerjob.worker.log.impl; -import tech.powerjob.worker.log.OmsLogger; -import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.powerjob.common.model.LogConfig; /** - * for local test + * More user feedback when the task volume server timeout serious. After pressure testing, we found that there is no bottleneck in the server processing scheduling tasks, and it is assumed that the large amount of logs is causing a serious bottleneck. Therefore, we need to provide local logging API for large MR tasks. * * @author tjq * @since 2021/2/4 */ -@Slf4j -public class OmsLocalLogger implements OmsLogger { - @Override - public void debug(String messagePattern, Object... args) { - log.debug(messagePattern, args); +public class OmsLocalLogger extends AbstractOmsLogger { + + private final Logger LOGGER; + + private static final String DEFAULT_LOGGER_NAME = OmsLocalLogger.class.getName(); + + public OmsLocalLogger(LogConfig logConfig) { + super(logConfig); + + String loggerName = StringUtils.isEmpty(logConfig.getLoggerName()) ? DEFAULT_LOGGER_NAME : logConfig.getLoggerName(); + LOGGER = LoggerFactory.getLogger(loggerName); } @Override - public void info(String messagePattern, Object... args) { - log.info(messagePattern, args); + public void debug0(String messagePattern, Object... args) { + LOGGER.debug(messagePattern, args); } @Override - public void warn(String messagePattern, Object... args) { - log.warn(messagePattern, args); + public void info0(String messagePattern, Object... args) { + LOGGER.info(messagePattern, args); } @Override - public void error(String messagePattern, Object... args) { - log.error(messagePattern, args); + public void warn0(String messagePattern, Object... args) { + LOGGER.warn(messagePattern, args); + } + + @Override + public void error0(String messagePattern, Object... args) { + LOGGER.error(messagePattern, args); } } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java index 57807042..feb69e9e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/log/impl/OmsServerLogger.java @@ -1,43 +1,49 @@ package tech.powerjob.worker.log.impl; import tech.powerjob.common.enums.LogLevel; +import tech.powerjob.common.model.LogConfig; import tech.powerjob.worker.background.OmsLogHandler; -import tech.powerjob.worker.log.OmsLogger; -import lombok.AllArgsConstructor; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.helpers.FormattingTuple; import org.slf4j.helpers.MessageFormatter; /** - * PowerJob 在线日志,直接上报到 Server,可在控制台直接查看 + * WARN:Please do not use this logger to print large amounts of logs! + * WARN:Please do not use this logger to print large amounts of logs! + * WARN:Please do not use this logger to print large amounts of logs! * * @author tjq - * @since 2020/4/21 + * @since 2022/9/16 */ -@AllArgsConstructor -public class OmsServerLogger implements OmsLogger { +public class OmsServerLogger extends AbstractOmsLogger { private final long instanceId; private final OmsLogHandler omsLogHandler; + public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) { + super(logConfig); + this.instanceId = instanceId; + this.omsLogHandler = omsLogHandler; + } + @Override - public void debug(String messagePattern, Object... args) { + public void debug0(String messagePattern, Object... args) { process(LogLevel.DEBUG, messagePattern, args); } @Override - public void info(String messagePattern, Object... args) { + public void info0(String messagePattern, Object... args) { process(LogLevel.INFO, messagePattern, args); } @Override - public void warn(String messagePattern, Object... args) { + public void warn0(String messagePattern, Object... args) { process(LogLevel.WARN, messagePattern, args); } @Override - public void error(String messagePattern, Object... args) { + public void error0(String messagePattern, Object... args) { process(LogLevel.ERROR, messagePattern, args); } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java index 28df58d2..c53e9b5b 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java @@ -51,4 +51,6 @@ public class InstanceInfo implements Serializable { private int threadConcurrency; // 子任务重试次数(任务本身的重试机制由server控制) private int taskRetryNum; + + private String logConfig; } diff --git a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java index 531dfa7c..7481ce5e 100644 --- a/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java +++ b/powerjob-worker/src/main/java/tech/powerjob/worker/pojo/request/TaskTrackerStartTaskReq.java @@ -32,6 +32,8 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { // 秒级任务专用 private long subInstanceId; + private String logConfig; + /** * 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用 @@ -47,5 +49,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable { this.taskCurrentRetryNums = task.getFailedCnt(); this.subInstanceId = task.getSubInstanceId(); + + this.logConfig = instanceInfo.getLogConfig(); } }