feat: support LogConfig

This commit is contained in:
tjq 2022-09-17 00:24:26 +08:00
parent a4a41c4ab7
commit e501cb9dfa
13 changed files with 171 additions and 34 deletions

View File

@ -16,7 +16,7 @@ import lombok.experimental.Accessors;
@Setter
@ToString
@Accessors(chain = true)
public class JobLogConfig {
public class LogConfig {
/**
* log type {@link LogType}
*/
@ -26,6 +26,8 @@ public class JobLogConfig {
*/
private Integer level;
private String loggerName;
@Getter
@AllArgsConstructor
public enum LogType {

View File

@ -94,6 +94,10 @@ public class ServerScheduleJobReq implements PowerSerializable {
*/
private String alarmConfig;
/**
* 日志配置
*/
private String logConfig;
@Override
public String path() {

View File

@ -5,7 +5,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobLogConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
import lombok.Data;
@ -148,7 +148,7 @@ public class SaveJobInfoRequest {
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private JobLogConfig logConfig;
private LogConfig logConfig;
/**

View File

@ -2,7 +2,7 @@ package tech.powerjob.common.response;
import lombok.Data;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobLogConfig;
import tech.powerjob.common.model.LogConfig;
import java.util.Date;
@ -135,6 +135,6 @@ public class JobInfoDTO {
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private JobLogConfig logConfig;
private LogConfig logConfig;
}

View File

@ -1,5 +1,6 @@
package tech.powerjob.official.processors;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.log.impl.OmsLocalLogger;
@ -24,7 +25,7 @@ public class TestUtils {
taskContext.setJobParams(jobParams);
taskContext.setTaskId("0.0");
taskContext.setTaskName("TEST_TASK");
taskContext.setOmsLogger(new OmsLocalLogger());
taskContext.setOmsLogger(new OmsLocalLogger(new LogConfig()));
taskContext.setWorkflowContext(new WorkflowContext(null, null));
return taskContext;
}

View File

@ -6,7 +6,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.JobLogConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.SJ;
@ -154,7 +154,7 @@ public class JobInfoVO {
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private JobLogConfig logConfig;
private LogConfig logConfig;
public static JobInfoVO from(JobInfoDO jobInfoDO) {
JobInfoVO jobInfoVO = new JobInfoVO();
@ -186,7 +186,7 @@ public class JobInfoVO {
}
if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) {
jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), JobLogConfig.class));
jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), LogConfig.class));
}
return jobInfoVO;

View File

@ -11,6 +11,8 @@ import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -22,6 +24,7 @@ import tech.powerjob.worker.core.ProcessorBeanFactory;
import tech.powerjob.worker.core.executor.ProcessorRunnable;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.log.impl.OmsServerLogger;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
@ -116,7 +119,7 @@ public class ProcessorTracker {
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler());
this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L;
this.lastCompletedTaskCount = 0L;

View File

@ -0,0 +1,34 @@
package tech.powerjob.worker.log;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.log.impl.OmsLocalLogger;
import tech.powerjob.worker.log.impl.OmsServerLogger;
/**
* OmsLoggerFactory
*
* @author tjq
* @since 2022/9/17
*/
public class OmsLoggerFactory {
public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
LogConfig cfg;
if (logConfig == null) {
cfg = new LogConfig();
} else {
try {
cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
} catch (Exception ignore) {
cfg = new LogConfig();
}
}
if (LogConfig.LogType.LOCAL.getV().equals(cfg.getType())) {
return new OmsLocalLogger(cfg);
}
return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
}
}

View File

@ -0,0 +1,68 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.log.OmsLogger;
/**
* AbstractOmsLogger
*
* @author tjq
* @since 2022/9/16
*/
public abstract class AbstractOmsLogger implements OmsLogger {
private final LogConfig logConfig;
public AbstractOmsLogger(LogConfig logConfig) {
this.logConfig = logConfig;
// 兼容空数据场景添加默认值尽量与原有逻辑保持兼容
if (logConfig.getLevel() == null) {
logConfig.setLevel(LogLevel.INFO.getV());
}
if (logConfig.getType() == null) {
logConfig.setType(LogConfig.LogType.ONLINE.getV());
}
}
abstract void debug0(String messagePattern, Object... args);
abstract void info0(String messagePattern, Object... args);
abstract void warn0(String messagePattern, Object... args);
abstract void error0(String messagePattern, Object... args);
@Override
public void debug(String messagePattern, Object... args) {
if (LogLevel.DEBUG.getV() < logConfig.getLevel()) {
return;
}
debug0(messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
if (LogLevel.INFO.getV() < logConfig.getLevel()) {
return;
}
info0(messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
if (LogLevel.WARN.getV() < logConfig.getLevel()) {
return;
}
warn0(messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
if (LogLevel.ERROR.getV() < logConfig.getLevel()) {
return;
}
error0(messagePattern, args);
}
}

View File

@ -1,33 +1,46 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.worker.log.OmsLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.model.LogConfig;
/**
* for local test
* More user feedback when the task volume server timeout serious. After pressure testing, we found that there is no bottleneck in the server processing scheduling tasks, and it is assumed that the large amount of logs is causing a serious bottleneck. Therefore, we need to provide local logging API for large MR tasks.
*
* @author tjq
* @since 2021/2/4
*/
@Slf4j
public class OmsLocalLogger implements OmsLogger {
@Override
public void debug(String messagePattern, Object... args) {
log.debug(messagePattern, args);
public class OmsLocalLogger extends AbstractOmsLogger {
private final Logger LOGGER;
private static final String DEFAULT_LOGGER_NAME = OmsLocalLogger.class.getName();
public OmsLocalLogger(LogConfig logConfig) {
super(logConfig);
String loggerName = StringUtils.isEmpty(logConfig.getLoggerName()) ? DEFAULT_LOGGER_NAME : logConfig.getLoggerName();
LOGGER = LoggerFactory.getLogger(loggerName);
}
@Override
public void info(String messagePattern, Object... args) {
log.info(messagePattern, args);
public void debug0(String messagePattern, Object... args) {
LOGGER.debug(messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
log.warn(messagePattern, args);
public void info0(String messagePattern, Object... args) {
LOGGER.info(messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
log.error(messagePattern, args);
public void warn0(String messagePattern, Object... args) {
LOGGER.warn(messagePattern, args);
}
@Override
public void error0(String messagePattern, Object... args) {
LOGGER.error(messagePattern, args);
}
}

View File

@ -1,43 +1,49 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.log.OmsLogger;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
/**
* PowerJob 在线日志直接上报到 Server可在控制台直接查看
* WARNPlease do not use this logger to print large amounts of logs!
* WARNPlease do not use this logger to print large amounts of logs!
* WARNPlease do not use this logger to print large amounts of logs!
*
* @author tjq
* @since 2020/4/21
* @since 2022/9/16
*/
@AllArgsConstructor
public class OmsServerLogger implements OmsLogger {
public class OmsServerLogger extends AbstractOmsLogger {
private final long instanceId;
private final OmsLogHandler omsLogHandler;
public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
super(logConfig);
this.instanceId = instanceId;
this.omsLogHandler = omsLogHandler;
}
@Override
public void debug(String messagePattern, Object... args) {
public void debug0(String messagePattern, Object... args) {
process(LogLevel.DEBUG, messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
public void info0(String messagePattern, Object... args) {
process(LogLevel.INFO, messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
public void warn0(String messagePattern, Object... args) {
process(LogLevel.WARN, messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
public void error0(String messagePattern, Object... args) {
process(LogLevel.ERROR, messagePattern, args);
}

View File

@ -51,4 +51,6 @@ public class InstanceInfo implements Serializable {
private int threadConcurrency;
// 子任务重试次数任务本身的重试机制由server控制
private int taskRetryNum;
private String logConfig;
}

View File

@ -32,6 +32,8 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
// 秒级任务专用
private long subInstanceId;
private String logConfig;
/**
* 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用
@ -47,5 +49,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
this.taskCurrentRetryNums = task.getFailedCnt();
this.subInstanceId = task.getSubInstanceId();
this.logConfig = instanceInfo.getLogConfig();
}
}