new feature -> OmsLogger

This commit is contained in:
tjq 2020-04-23 14:13:20 +08:00
parent b31b96f36b
commit 8382e71da0
8 changed files with 283 additions and 4 deletions

View File

@ -0,0 +1,26 @@
package com.github.kfcfans.common.model;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 任务实例日志对象
*
* @author tjq
* @since 2020/4/21
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class InstanceLogContent implements OmsSerializable {
// 实例ID
private long instanceId;
// 日志提交时间
private long timestamp;
// 日志内容
private String content;
}

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import com.github.kfcfans.common.model.InstanceLogContent;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 日志上报请求
*
* @author tjq
* @since 2020/4/23
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WorkerLogReportReq implements OmsSerializable {
private List<InstanceLogContent> instanceLogContents;
}

View File

@ -0,0 +1,93 @@
package com.github.kfcfans.oms.worker.background;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.model.InstanceLogContent;
import com.github.kfcfans.common.request.WorkerLogReportReq;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 日志处理器
*
* @author tjq
* @since 2020/4/21
*/
public class OmsLogHandler {
private OmsLogHandler() {
}
// 单例
public static final OmsLogHandler INSTANCE = new OmsLogHandler();
// 生产者消费者模式异步上传日志
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
// 处理线程需要通过线程池启动
public final Runnable logSubmitter = new LogSubmitter();
private static final int BATCH_SIZE = 10;
private static final int MAX_QUEUE_SIZE = 8096;
/**
* 提交日志
* @param instanceId 任务实例ID
* @param logContent 日志内容
*/
public void submitLog(long instanceId, String logContent) {
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent);
logQueue.add(tuple);
}
private class LogSubmitter implements Runnable {
@Override
public void run() {
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
// 当前无可用 Server
if (StringUtils.isEmpty(serverPath)) {
// 防止长时间无可用Server导致的堆积
if (logQueue.size() > MAX_QUEUE_SIZE) {
for (int i = 0; i < 1024; i++) {
logQueue.remove();
}
}
return;
}
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
List<InstanceLogContent> logs = Lists.newLinkedList();
while (!logQueue.isEmpty()) {
try {
InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
logs.add(logContent);
if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(logs);
// 不可靠请求WEB日志不追求极致
serverActor.tell(req, null);
logs.clear();
}
}catch (Exception ignore) {
break;
}
}
if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(logs);
serverActor.tell(req, null);
}
}
}
}

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.core.processor.TaskResult;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
@ -41,6 +42,7 @@ public class ProcessorRunnable implements Runnable {
private final ActorSelection taskTrackerActor;
private final TaskDO task;
private final BasicProcessor processor;
private final OmsLogger omsLogger;
public void innerRun() {
@ -56,6 +58,7 @@ public class ProcessorRunnable implements Runnable {
taskContext.setCurrentRetryTimes(task.getFailedCnt());
taskContext.setJobParams(instanceInfo.getJobParams());
taskContext.setInstanceParams(instanceInfo.getInstanceParams());
taskContext.setOmsLogger(omsLogger);
if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.core.processor;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import lombok.Getter;
import lombok.Setter;
@ -22,14 +23,30 @@ public class TaskContext {
private String taskId;
private String taskName;
/**
* 通过控制台传递的参数
*/
private String jobParams;
/**
* 通过 OpenAPI 传递的参数
*/
private String instanceParams;
/**
* 最大重试次数
*/
private int maxRetryTimes;
/**
* 当前重试次数
*/
private int currentRetryTimes;
/**
* 子任务对象通过Map/MapReduce处理器的map方法生成
*/
private Object subTask;
/**
* 在线日志记录
*/
private OmsLogger omsLogger;
public String getDescription() {
return "subInstanceId='" + subInstanceId + '\'' +

View File

@ -14,6 +14,8 @@ import com.github.kfcfans.oms.worker.core.classloader.ProcessorBeanFactory;
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
import com.github.kfcfans.oms.worker.core.processor.built.PythonProcessor;
import com.github.kfcfans.oms.worker.core.processor.built.ShellProcessor;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import com.github.kfcfans.oms.worker.log.impl.OmsServerLogger;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
@ -46,6 +48,8 @@ public class ProcessorTracker {
// 任务执行器
private BasicProcessor processor;
// 在线日志
private OmsLogger omsLogger;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@ -72,6 +76,8 @@ public class ProcessorTracker {
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId);
// 初始化 线程池
initThreadPool();
// 初始化 Processor
@ -112,7 +118,7 @@ public class ProcessorTracker {
newTask.setInstanceId(instanceInfo.getInstanceId());
newTask.setAddress(taskTrackerAddress);
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor);
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger);
try {
threadPool.submit(processorRunnable);
success = true;

View File

@ -0,0 +1,38 @@
package com.github.kfcfans.oms.worker.log;
/**
* OhMyScheduler 在线日志直接上报到 Server可在控制台直接查看
*
* @author tjq
* @since 2020/4/21
*/
public interface OmsLogger {
/**
* 输出 DEBUG 类型的日志 Slf4j 用法一致
* @param messagePattern 日志格式比如 [XXXService] process task(taskId={},jobId={}) failed.
* @param args 填充 messagePattern {} 的对象
*/
void debug(String messagePattern, Object... args);
/**
* 输出 INFO 类型的日志 Slf4j 用法一致
* @param messagePattern 日志格式
* @param args 填充对象
*/
void info(String messagePattern, Object... args);
/**
* 输出 WARN 类型的日志 Slf4j 用法一致
* @param messagePattern 日志格式
* @param args 填充对象
*/
void warn(String messagePattern, Object... args);
/**
* 输出 ERROR 类型的日志 Slf4j 用法一致
* @param messagePattern 日志格式
* @param args 填充对象
*/
void error(String messagePattern, Object... args);
}

View File

@ -0,0 +1,74 @@
package com.github.kfcfans.oms.worker.log.impl;
import com.github.kfcfans.oms.worker.background.OmsLogHandler;
import com.github.kfcfans.oms.worker.log.OmsLogger;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
/**
* OhMyScheduler 在线日志直接上报到 Server可在控制台直接查看
*
* @author tjq
* @since 2020/4/21
*/
@AllArgsConstructor
public class OmsServerLogger implements OmsLogger {
private final long instanceId;
// Level|业务方自身的日志
private static final String LOG_PREFIX = "{} ";
@Override
public void debug(String messagePattern, Object... args) {
process("DEBUG", messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
process("INFO", messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
process("WARN", messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
process("ERROR", messagePattern, args);
}
/**
* 生成日志内容
* @param level 级别DEBUG/INFO/WARN/ERROR
* @param messagePattern 日志格式
* @param arg 填充参数
* @return 生成完毕的日志内容
*/
private static String genLog(String level, String messagePattern, Object... arg) {
String pattern = LOG_PREFIX + messagePattern;
Object[] newArgs = new Object[arg.length + 2];
newArgs[0] = level;
System.arraycopy(arg, 0, newArgs, 1, arg.length);
// 借用 Slf4J 直接生成日志信息
FormattingTuple formattingTuple = MessageFormatter.arrayFormat(pattern, newArgs);
if (formattingTuple.getThrowable() != null) {
String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable());
return formattingTuple.getMessage() + System.lineSeparator() + stackTrace;
}else {
return formattingTuple.getMessage();
}
}
private void process(String level, String messagePattern, Object... args) {
String logContent = genLog(level, messagePattern, args);
OmsLogHandler.INSTANCE.submitLog(instanceId, logContent);
}
}