diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java new file mode 100644 index 00000000..9a2237ca --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/model/InstanceLogContent.java @@ -0,0 +1,26 @@ +package com.github.kfcfans.common.model; + +import com.github.kfcfans.common.OmsSerializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 任务实例日志对象 + * + * @author tjq + * @since 2020/4/21 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class InstanceLogContent implements OmsSerializable { + + // 实例ID + private long instanceId; + // 日志提交时间 + private long timestamp; + // 日志内容 + private String content; +} + diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java new file mode 100644 index 00000000..c60a149f --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/request/WorkerLogReportReq.java @@ -0,0 +1,22 @@ +package com.github.kfcfans.common.request; + +import com.github.kfcfans.common.OmsSerializable; +import com.github.kfcfans.common.model.InstanceLogContent; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * 日志上报请求 + * + * @author tjq + * @since 2020/4/23 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WorkerLogReportReq implements OmsSerializable { + private List instanceLogContents; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java new file mode 100644 index 00000000..06054bd6 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java @@ -0,0 +1,93 @@ +package com.github.kfcfans.oms.worker.background; + +import akka.actor.ActorSelection; +import com.github.kfcfans.common.RemoteConstant; +import com.github.kfcfans.common.model.InstanceLogContent; +import com.github.kfcfans.common.request.WorkerLogReportReq; +import com.github.kfcfans.oms.worker.OhMyWorker; +import com.github.kfcfans.oms.worker.common.utils.AkkaUtils; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * 日志处理器 + * + * @author tjq + * @since 2020/4/21 + */ +public class OmsLogHandler { + + private OmsLogHandler() { + } + + // 单例 + public static final OmsLogHandler INSTANCE = new OmsLogHandler(); + // 生产者消费者模式,异步上传日志 + private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(); + // 处理线程,需要通过线程池启动 + public final Runnable logSubmitter = new LogSubmitter(); + + + private static final int BATCH_SIZE = 10; + private static final int MAX_QUEUE_SIZE = 8096; + + /** + * 提交日志 + * @param instanceId 任务实例ID + * @param logContent 日志内容 + */ + public void submitLog(long instanceId, String logContent) { + InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent); + logQueue.add(tuple); + } + + private class LogSubmitter implements Runnable { + + @Override + public void run() { + + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + // 当前无可用 Server + if (StringUtils.isEmpty(serverPath)) { + + // 防止长时间无可用Server导致的堆积 + if (logQueue.size() > MAX_QUEUE_SIZE) { + for (int i = 0; i < 1024; i++) { + logQueue.remove(); + } + } + return; + } + + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + List logs = Lists.newLinkedList(); + + while (!logQueue.isEmpty()) { + try { + InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS); + logs.add(logContent); + + if (logs.size() >= BATCH_SIZE) { + WorkerLogReportReq req = new WorkerLogReportReq(logs); + // 不可靠请求,WEB日志不追求极致 + serverActor.tell(req, null); + logs.clear(); + } + + }catch (Exception ignore) { + break; + } + } + + if (!logs.isEmpty()) { + WorkerLogReportReq req = new WorkerLogReportReq(logs); + serverActor.tell(req, null); + } + } + } +} \ No newline at end of file diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index d30f7a62..06afc21c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -8,6 +8,7 @@ import com.github.kfcfans.oms.worker.common.constants.TaskConstant; import com.github.kfcfans.oms.worker.common.constants.TaskStatus; import com.github.kfcfans.oms.worker.common.utils.SerializerUtils; import com.github.kfcfans.oms.worker.core.processor.TaskResult; +import com.github.kfcfans.oms.worker.log.OmsLogger; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; @@ -41,6 +42,7 @@ public class ProcessorRunnable implements Runnable { private final ActorSelection taskTrackerActor; private final TaskDO task; private final BasicProcessor processor; + private final OmsLogger omsLogger; public void innerRun() { @@ -56,6 +58,7 @@ public class ProcessorRunnable implements Runnable { taskContext.setCurrentRetryTimes(task.getFailedCnt()); taskContext.setJobParams(instanceInfo.getJobParams()); taskContext.setInstanceParams(instanceInfo.getInstanceParams()); + taskContext.setOmsLogger(omsLogger); if (task.getTaskContent() != null && task.getTaskContent().length > 0) { taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent())); } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java index da9e9d8e..c88951b9 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/processor/TaskContext.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.worker.core.processor; +import com.github.kfcfans.oms.worker.log.OmsLogger; import lombok.Getter; import lombok.Setter; @@ -22,14 +23,30 @@ public class TaskContext { private String taskId; private String taskName; + /** + * 通过控制台传递的参数 + */ private String jobParams; + /** + * 通过 OpenAPI 传递的参数 + */ private String instanceParams; - + /** + * 最大重试次数 + */ private int maxRetryTimes; + /** + * 当前重试次数 + */ private int currentRetryTimes; - + /** + * 子任务对象,通过Map/MapReduce处理器的map方法生成 + */ private Object subTask; - + /** + * 在线日志记录 + */ + private OmsLogger omsLogger; public String getDescription() { return "subInstanceId='" + subInstanceId + '\'' + diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index c3f9781c..e7044b0a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -14,6 +14,8 @@ import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory; import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable; import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor; import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor; +import com.github.kfcfans.oms.worker.log.OmsLogger; +import com.github.kfcfans.oms.worker.log.impl.OmsServerLogger; import com.github.kfcfans.oms.worker.persistence.TaskDO; import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo; import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq; @@ -46,6 +48,8 @@ public class ProcessorTracker { // 任务执行器 private BasicProcessor processor; + // 在线日志 + private OmsLogger omsLogger; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -72,6 +76,8 @@ public class ProcessorTracker { String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); + this.omsLogger = new OmsServerLogger(instanceId); + // 初始化 线程池 initThreadPool(); // 初始化 Processor @@ -112,7 +118,7 @@ public class ProcessorTracker { newTask.setInstanceId(instanceInfo.getInstanceId()); newTask.setAddress(taskTrackerAddress); - ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor); + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger); try { threadPool.submit(processorRunnable); success = true; diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/OmsLogger.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/OmsLogger.java new file mode 100644 index 00000000..4a827a1e --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/OmsLogger.java @@ -0,0 +1,38 @@ +package com.github.kfcfans.oms.worker.log; + +/** + * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * + * @author tjq + * @since 2020/4/21 + */ +public interface OmsLogger { + + /** + * 输出 DEBUG 类型的日志,与 Slf4j 用法一致 + * @param messagePattern 日志格式,比如 [XXXService] process task(taskId={},jobId={}) failed. + * @param args 填充 messagePattern 中 {} 的对象 + */ + void debug(String messagePattern, Object... args); + + /** + * 输出 INFO 类型的日志,与 Slf4j 用法一致 + * @param messagePattern 日志格式 + * @param args 填充对象 + */ + void info(String messagePattern, Object... args); + + /** + * 输出 WARN 类型的日志,与 Slf4j 用法一致 + * @param messagePattern 日志格式 + * @param args 填充对象 + */ + void warn(String messagePattern, Object... args); + + /** + * 输出 ERROR 类型的日志,与 Slf4j 用法一致 + * @param messagePattern 日志格式 + * @param args 填充对象 + */ + void error(String messagePattern, Object... args); +} \ No newline at end of file diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java new file mode 100644 index 00000000..532a54a9 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/log/impl/OmsServerLogger.java @@ -0,0 +1,74 @@ +package com.github.kfcfans.oms.worker.log.impl; + +import com.github.kfcfans.oms.worker.background.OmsLogHandler; +import com.github.kfcfans.oms.worker.log.OmsLogger; +import lombok.AllArgsConstructor; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; + + +/** + * OhMyScheduler 在线日志,直接上报到 Server,可在控制台直接查看 + * + * @author tjq + * @since 2020/4/21 + */ +@AllArgsConstructor +public class OmsServerLogger implements OmsLogger { + + private final long instanceId; + + // Level|业务方自身的日志 + private static final String LOG_PREFIX = "{} "; + + @Override + public void debug(String messagePattern, Object... args) { + process("DEBUG", messagePattern, args); + } + + @Override + public void info(String messagePattern, Object... args) { + process("INFO", messagePattern, args); + } + + @Override + public void warn(String messagePattern, Object... args) { + process("WARN", messagePattern, args); + } + + @Override + public void error(String messagePattern, Object... args) { + process("ERROR", messagePattern, args); + } + + /** + * 生成日志内容 + * @param level 级别,DEBUG/INFO/WARN/ERROR + * @param messagePattern 日志格式 + * @param arg 填充参数 + * @return 生成完毕的日志内容 + */ + private static String genLog(String level, String messagePattern, Object... arg) { + + String pattern = LOG_PREFIX + messagePattern; + Object[] newArgs = new Object[arg.length + 2]; + newArgs[0] = level; + System.arraycopy(arg, 0, newArgs, 1, arg.length); + + // 借用 Slf4J 直接生成日志信息 + FormattingTuple formattingTuple = MessageFormatter.arrayFormat(pattern, newArgs); + if (formattingTuple.getThrowable() != null) { + String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable()); + return formattingTuple.getMessage() + System.lineSeparator() + stackTrace; + }else { + return formattingTuple.getMessage(); + } + } + + private void process(String level, String messagePattern, Object... args) { + String logContent = genLog(level, messagePattern, args); + OmsLogHandler.INSTANCE.submitLog(instanceId, logContent); + } + +} \ No newline at end of file