diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java index 6da44241..64069cf3 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/local/LocalInstanceLogRepository.java @@ -1,5 +1,6 @@ package com.github.kfcfans.oms.server.persistence.local; +import com.google.errorprone.annotations.CanIgnoreReturnValue; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Modifying; @@ -23,6 +24,7 @@ public interface LocalInstanceLogRepository extends JpaRepository instanceIds, Long t); long countByInstanceId(Long instanceId); diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java index ee884e4a..aa75f026 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/InstanceLogService.java @@ -11,6 +11,7 @@ import com.github.kfcfans.oms.server.service.instance.InstanceManager; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.mongodb.gridfs.GridFS; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.FastDateFormat; import org.springframework.beans.BeanUtils; diff --git a/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java new file mode 100644 index 00000000..cb46756c --- /dev/null +++ b/oh-my-scheduler-worker-samples/src/main/java/com/github/kfcfans/oms/server/tester/OmsLogPerformanceTester.java @@ -0,0 +1,46 @@ +package com.github.kfcfans.oms.server.tester; + +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.oms.worker.core.processor.ProcessResult; +import com.github.kfcfans.oms.worker.core.processor.TaskContext; +import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.oms.worker.log.OmsLogger; + +/** + * 测试 Oms 在线日志的性能 + * + * @author tjq + * @since 2020/5/3 + */ +public class OmsLogPerformanceTester implements BasicProcessor { + + private static final int BATCH = 1000; + + @Override + public ProcessResult process(TaskContext context) throws Exception { + + OmsLogger logger = context.getOmsLogger(); + // 控制台参数,格式为 {"num":10000, "interval": 200} + JSONObject jobParams = JSONObject.parseObject(context.getJobParams()); + Long num = jobParams.getLong("num"); + Long interval = jobParams.getLong("interval"); + + RuntimeException re = new RuntimeException("This is a exception~~~"); + + long times = num / BATCH; + for (long i = 0; i < times; i++) { + for (long j = 0; j < BATCH; j++) { + long index = i * BATCH + j; + logger.info("[OmsLogPerformanceTester] testing omsLogger performance, current index is {}.", index); + } + logger.error("[OmsLogPerformanceTester] Oh, we have an exception to log~", re); + try { + Thread.sleep(interval); + }catch (Exception ignore) { + } + } + + logger.info("[OmsLogPerformanceTester] success!"); + return new ProcessResult(true, "good job"); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java index ab938efc..c6e53b4c 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/background/OmsLogHandler.java @@ -14,6 +14,7 @@ import org.springframework.util.StringUtils; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * 日志处理器 @@ -33,10 +34,14 @@ public class OmsLogHandler { private final BlockingQueue logQueue = Queues.newLinkedBlockingQueue(); // 处理线程,需要通过线程池启动 public final Runnable logSubmitter = new LogSubmitter(); + // 上报锁,只需要一个线程上报即可 + private final ReentrantLock reportLock = new ReentrantLock(); - private static final int BATCH_SIZE = 10; - private static final int MAX_QUEUE_SIZE = 8096; + // 每次上报携带的数据条数 + private static final int BATCH_SIZE = 20; + // 本地囤积阈值 + private static final int REPORT_SIZE = 1024; /** * 提交日志 @@ -44,52 +49,65 @@ public class OmsLogHandler { * @param logContent 日志内容 */ public void submitLog(long instanceId, String logContent) { + + if (logQueue.size() > REPORT_SIZE) { + // 线程的生命周期是个不可循环的过程,一个线程对象结束了不能再次start,只能一直创建和销毁 + new Thread(logSubmitter).start(); + } + InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent); - logQueue.add(tuple); + logQueue.offer(tuple); } + + private class LogSubmitter implements Runnable { @Override public void run() { - String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); - // 当前无可用 Server - if (StringUtils.isEmpty(serverPath)) { - - // 防止长时间无可用Server导致的堆积 - if (logQueue.size() > MAX_QUEUE_SIZE) { - for (int i = 0; i < 1024; i++) { - logQueue.remove(); - } - log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded some logs."); - } + boolean lockResult = reportLock.tryLock(); + if (!lockResult) { return; } - ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); - List logs = Lists.newLinkedList(); + try { - while (!logQueue.isEmpty()) { - try { - InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS); - logs.add(logContent); - - if (logs.size() >= BATCH_SIZE) { - WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); - // 不可靠请求,WEB日志不追求极致 - serverActor.tell(req, null); - logs.clear(); - } - - }catch (Exception ignore) { - break; + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + // 当前无可用 Server + if (StringUtils.isEmpty(serverPath)) { + logQueue.clear(); + log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs."); + return; } - } - if (!logs.isEmpty()) { - WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); - serverActor.tell(req, null); + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + List logs = Lists.newLinkedList(); + + while (!logQueue.isEmpty()) { + try { + InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS); + logs.add(logContent); + + if (logs.size() >= BATCH_SIZE) { + WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), Lists.newLinkedList(logs)); + // 不可靠请求,WEB日志不追求极致 + serverActor.tell(req, null); + logs.clear(); + } + + }catch (Exception ignore) { + break; + } + } + + if (!logs.isEmpty()) { + WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); + serverActor.tell(req, null); + } + + }finally { + reportLock.unlock(); } } } diff --git a/others/logs/TestRecord.md b/others/logs/BasicFunctionTestRecord.md similarity index 100% rename from others/logs/TestRecord.md rename to others/logs/BasicFunctionTestRecord.md diff --git a/others/logs/OnlineLogTestRecord.md b/others/logs/OnlineLogTestRecord.md new file mode 100644 index 00000000..7ce8eb94 --- /dev/null +++ b/others/logs/OnlineLogTestRecord.md @@ -0,0 +1,100 @@ +## V1.0.0 +#### 持久化链路 +1. 客户端使用内存队列异步化批量上报服务器 +2. 服务器接收到请求后无脑写H2数据库 +3. 任务结束后,流式同步到MongoDB持久化存储,维护一个包含Array的MongoDB对象 +4. 同步结束后删除本地所有数据 +#### 查询链路 +* 如果本地存在数据,则直接从本地数据库返回 +* 如果本地不存在数据,则直连MongoDB,获取数据再返回 + +*** +问题主要在于前台展示:测试100W条数据,本地H2占用82M,MongoDB未知(因为mongo shell不知道为啥用不了...),不过应该也小不到哪里去。这种情况下数据都没办法回传回来...需要更新方案。 + +```text +org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe + at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351) + at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776) + at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681) + at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386) + at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364) + at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96) + at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137) + at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150) + at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923) + at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287) + at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104) + at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287) + at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181) + at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82) + at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123) + at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403) + at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61) + at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141) + at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80) + at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300) + at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111) + at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057) + at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) + at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) + at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) + at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) + at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) + at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) + at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) + at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) + at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) + at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) + at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) + at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) + at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) + at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) + at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) + at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) + at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) + at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) + at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) + at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) + at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) + at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) + at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) + at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) + at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) + at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541) + at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) + at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) + at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) + at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748) +Caused by: java.io.IOException: Broken pipe + at sun.nio.ch.FileDispatcherImpl.write0(Native Method) + at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) + at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) + at sun.nio.ch.IOUtil.write(IOUtil.java:65) + at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) + at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138) + at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101) + at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152) + at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253) + at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740) + at org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560) + at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504) + at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538) + at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110) + at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190) + at org.apache.coyote.Response.doWrite(Response.java:601) + at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339) + ... 60 common frames omitted + +``` + +## V2.0.0 +>经过小小的调查,mongoDB似乎允许用户直接使用它的文件系统:GridFS,完成文件的存储。那么要不要改成文件对文件的形式呢?同步开始时,先在本地生成日志文件,然后同步到MongoDB。查询时则先下载文件。一旦拥有了完整的文件,分页什么的也就容易实现了,前端展示一次1000行之类的~