omsOnlineLog test failed, need to use new solution

This commit is contained in:
tjq 2020-05-03 17:14:51 +08:00
parent 0d6c60c826
commit 635140177d
6 changed files with 201 additions and 34 deletions

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.server.persistence.local;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
@ -23,6 +24,7 @@ public interface LocalInstanceLogRepository extends JpaRepository<LocalInstanceL
@Transactional
long deleteByInstanceId(Long instanceId);
@CanIgnoreReturnValue
long deleteByInstanceIdInAndLogTimeLessThan(List<Long> instanceIds, Long t);
long countByInstanceId(Long instanceId);

View File

@ -11,6 +11,7 @@ import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.mongodb.gridfs.GridFS;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.beans.BeanUtils;

View File

@ -0,0 +1,46 @@
package com.github.kfcfans.oms.server.tester;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.oms.worker.log.OmsLogger;
/**
* 测试 Oms 在线日志的性能
*
* @author tjq
* @since 2020/5/3
*/
public class OmsLogPerformanceTester implements BasicProcessor {
private static final int BATCH = 1000;
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger logger = context.getOmsLogger();
// 控制台参数格式为 {"num":10000, "interval": 200}
JSONObject jobParams = JSONObject.parseObject(context.getJobParams());
Long num = jobParams.getLong("num");
Long interval = jobParams.getLong("interval");
RuntimeException re = new RuntimeException("This is a exception~~~");
long times = num / BATCH;
for (long i = 0; i < times; i++) {
for (long j = 0; j < BATCH; j++) {
long index = i * BATCH + j;
logger.info("[OmsLogPerformanceTester] testing omsLogger performance, current index is {}.", index);
}
logger.error("[OmsLogPerformanceTester] Oh, we have an exception to log~", re);
try {
Thread.sleep(interval);
}catch (Exception ignore) {
}
}
logger.info("[OmsLogPerformanceTester] success!");
return new ProcessResult(true, "good job");
}
}

View File

@ -14,6 +14,7 @@ import org.springframework.util.StringUtils;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 日志处理器
@ -33,10 +34,14 @@ public class OmsLogHandler {
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
// 处理线程需要通过线程池启动
public final Runnable logSubmitter = new LogSubmitter();
// 上报锁只需要一个线程上报即可
private final ReentrantLock reportLock = new ReentrantLock();
private static final int BATCH_SIZE = 10;
private static final int MAX_QUEUE_SIZE = 8096;
// 每次上报携带的数据条数
private static final int BATCH_SIZE = 20;
// 本地囤积阈值
private static final int REPORT_SIZE = 1024;
/**
* 提交日志
@ -44,52 +49,65 @@ public class OmsLogHandler {
* @param logContent 日志内容
*/
public void submitLog(long instanceId, String logContent) {
if (logQueue.size() > REPORT_SIZE) {
// 线程的生命周期是个不可循环的过程一个线程对象结束了不能再次start只能一直创建和销毁
new Thread(logSubmitter).start();
}
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logContent);
logQueue.add(tuple);
logQueue.offer(tuple);
}
private class LogSubmitter implements Runnable {
@Override
public void run() {
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
// 当前无可用 Server
if (StringUtils.isEmpty(serverPath)) {
// 防止长时间无可用Server导致的堆积
if (logQueue.size() > MAX_QUEUE_SIZE) {
for (int i = 0; i < 1024; i++) {
logQueue.remove();
}
log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded some logs.");
}
boolean lockResult = reportLock.tryLock();
if (!lockResult) {
return;
}
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
List<InstanceLogContent> logs = Lists.newLinkedList();
try {
while (!logQueue.isEmpty()) {
try {
InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
logs.add(logContent);
if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs);
// 不可靠请求WEB日志不追求极致
serverActor.tell(req, null);
logs.clear();
}
}catch (Exception ignore) {
break;
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
// 当前无可用 Server
if (StringUtils.isEmpty(serverPath)) {
logQueue.clear();
log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded all logs.");
return;
}
}
if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs);
serverActor.tell(req, null);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
List<InstanceLogContent> logs = Lists.newLinkedList();
while (!logQueue.isEmpty()) {
try {
InstanceLogContent logContent = logQueue.poll(100, TimeUnit.MILLISECONDS);
logs.add(logContent);
if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), Lists.newLinkedList(logs));
// 不可靠请求WEB日志不追求极致
serverActor.tell(req, null);
logs.clear();
}
}catch (Exception ignore) {
break;
}
}
if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs);
serverActor.tell(req, null);
}
}finally {
reportLock.unlock();
}
}
}

View File

@ -0,0 +1,100 @@
## V1.0.0
#### 持久化链路
1. 客户端使用内存队列异步化批量上报服务器
2. 服务器接收到请求后无脑写H2数据库
3. 任务结束后流式同步到MongoDB持久化存储维护一个包含Array的MongoDB对象
4. 同步结束后删除本地所有数据
#### 查询链路
* 如果本地存在数据,则直接从本地数据库返回
* 如果本地不存在数据则直连MongoDB获取数据再返回
***
问题主要在于前台展示测试100W条数据本地H2占用82MMongoDB未知因为mongo shell不知道为啥用不了...),不过应该也小不到哪里去。这种情况下数据都没办法回传回来...需要更新方案。
```text
org.apache.catalina.connector.ClientAbortException: java.io.IOException: Broken pipe
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:351)
at org.apache.catalina.connector.OutputBuffer.flushByteBuffer(OutputBuffer.java:776)
at org.apache.catalina.connector.OutputBuffer.append(OutputBuffer.java:681)
at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:386)
at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:364)
at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:96)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2137)
at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1150)
at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:923)
at org.springframework.http.converter.json.AbstractJackson2HttpMessageConverter.writeInternal(AbstractJackson2HttpMessageConverter.java:287)
at org.springframework.http.converter.AbstractGenericHttpMessageConverter.write(AbstractGenericHttpMessageConverter.java:104)
at org.springframework.web.servlet.mvc.method.annotation.AbstractMessageConverterMethodProcessor.writeWithMessageConverters(AbstractMessageConverterMethodProcessor.java:287)
at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.handleReturnValue(RequestResponseBodyMethodProcessor.java:181)
at org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite.handleReturnValue(HandlerMethodReturnValueHandlerComposite.java:82)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:123)
at org.springframework.web.servlet.mvc.method.annotation.ExceptionHandlerExceptionResolver.doResolveHandlerMethodException(ExceptionHandlerExceptionResolver.java:403)
at org.springframework.web.servlet.handler.AbstractHandlerMethodExceptionResolver.doResolveException(AbstractHandlerMethodExceptionResolver.java:61)
at org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.resolveException(AbstractHandlerExceptionResolver.java:141)
at org.springframework.web.servlet.handler.HandlerExceptionResolverComposite.resolveException(HandlerExceptionResolverComposite.java:80)
at org.springframework.web.servlet.DispatcherServlet.processHandlerException(DispatcherServlet.java:1300)
at org.springframework.web.servlet.DispatcherServlet.processDispatchResult(DispatcherServlet.java:1111)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1057)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:634)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1594)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138)
at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101)
at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740)
at org.apache.tomcat.util.net.SocketWrapperBase.writeBlocking(SocketWrapperBase.java:560)
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:504)
at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.doWrite(Http11OutputBuffer.java:538)
at org.apache.coyote.http11.filters.ChunkedOutputFilter.doWrite(ChunkedOutputFilter.java:110)
at org.apache.coyote.http11.Http11OutputBuffer.doWrite(Http11OutputBuffer.java:190)
at org.apache.coyote.Response.doWrite(Response.java:601)
at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:339)
... 60 common frames omitted
```
## V2.0.0
>经过小小的调查mongoDB似乎允许用户直接使用它的文件系统:GridFS完成文件的存储。那么要不要改成文件对文件的形式呢同步开始时先在本地生成日志文件然后同步到MongoDB。查询时则先下载文件。一旦拥有了完整的文件分页什么的也就容易实现了前端展示一次1000行之类的