fix serialize bug (Object Pool is a good thing but you have to use carefully)

This commit is contained in:
tjq 2020-03-26 11:34:50 +08:00
parent d44f818b81
commit 63f082c3b3
15 changed files with 110 additions and 53 deletions

View File

@ -21,7 +21,7 @@
<hikaricp.version>3.4.2</hikaricp.version>
<guava.version>28.2-jre</guava.version>
<junit.version>5.6.1</junit.version>
<fst.version>2.56</fst.version>
<kryo.version>5.0.0-RC5</kryo.version>
</properties>
<dependencies>
@ -68,11 +68,11 @@
<version>${guava.version}</version>
</dependency>
<!-- FST 超超超高性能序列化框架 -->
<!-- kryo 超超超高性能序列化框架 -->
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>${fst.version}</version>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
<!-- Junit 测试 -->
@ -91,10 +91,6 @@
<version>1.2.3</version>
</dependency>
</dependencies>

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.oms.worker.common;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import java.util.concurrent.atomic.AtomicLong;
/**
* 存储一些不方便直接传递的东西
* #attention警惕内存泄漏问题最好在 ProcessorTracker destroy 执行 remove
@ -13,4 +15,6 @@ public class ThreadLocalStore {
public static final ThreadLocal<TaskContext> TASK_CONTEXT_THREAD_LOCAL = new ThreadLocal<>();
public static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
}

View File

@ -1,35 +1,54 @@
package com.github.kfcfans.oms.worker.common.utils;
import org.nustaq.serialization.FSTConfiguration;
import java.nio.charset.StandardCharsets;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.Pool;
/**
* 序列化框架
* 序列化
*
* @author tjq
* @since 2020/3/25
*/
public class SerializerUtils {
private static FSTConfiguration conf = FSTConfiguration.createDefaultConfiguration();
private static final int DEFAULT_CAPACITY = Runtime.getRuntime().availableProcessors();
private static final Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, DEFAULT_CAPACITY) {
@Override
protected Kryo create() {
Kryo kryo = new Kryo();
// 关闭序列化注册会导致性能些许下降但在分布式环境中注册类生成ID不一致会导致错误
kryo.setRegistrationRequired(false);
// 支持循环引用也会导致性能些许下降 T_T
kryo.setReferences(true);
return kryo;
}
};
public static byte[] serialize(Object obj) {
return conf.asByteArray(obj);
}
public static Object deSerialized(byte[] bytes) {
return conf.asObject(bytes);
}
Kryo kryo = kryoPool.obtain();
public static String toJSON(Object object) {
if (object == null) {
return null;
// 使用 Output 对象池会导致序列化重复的错误getBuffer返回了Output对象的buffer引用
try (Output opt = new Output(1024, -1)) {
kryo.writeClassAndObject(opt, obj);
opt.flush();
return opt.getBuffer();
}finally {
kryoPool.free(kryo);
}
}
public static Object deSerialized(byte[] buffer) {
Kryo kryo = kryoPool.obtain();
try {
return new String(serialize(object), StandardCharsets.UTF_8);
}catch (Exception ignore) {
return kryo.readClassAndObject(new Input(buffer));
}finally {
kryoPool.free(kryo);
}
return null;
}
}

View File

@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* Processor 执行器
@ -57,6 +58,7 @@ public class ProcessorRunnable implements Runnable {
taskContext.setSubTask(SerializerUtils.deSerialized(request.getSubTaskContent()));
}
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
ThreadLocalStore.TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
reportStatus(TaskStatus.PROCESSING, null);

View File

@ -96,12 +96,14 @@ public class ProcessorTracker {
newTask.setFailedCnt(newTaskReq.getCurrentRetryTimes());
newTask.setCreatedTime(System.currentTimeMillis());
newTask.setLastModifiedTime(System.currentTimeMillis());
// 特殊处理 instanceId防止冲突
newTask.setInstanceId(getSPInstanceId(instanceId));
boolean save = TaskPersistenceService.INSTANCE.save(newTask);
if (save) {
log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask);
log.debug("[ProcessorTracker] persistent task({}) succeed.", newTask);
}else {
log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask);
log.warn("[ProcessorTracker] persistent task({}) failed.", newTask);
}
return;
}
@ -157,7 +159,9 @@ public class ProcessorTracker {
}
TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE;
List<TaskDO> taskDOList =taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.RECEIVE_SUCCESS, MAX_QUEUE_SIZE / 2);
// 查询时也用特殊处理的 instanceId 查即可
List<TaskDO> taskDOList =taskPersistenceService.getTaskByStatus(getSPInstanceId(instanceId), TaskStatus.RECEIVE_SUCCESS, MAX_QUEUE_SIZE / 2);
if (CollectionUtils.isEmpty(taskDOList)) {
return;
@ -169,12 +173,15 @@ public class ProcessorTracker {
// 提交到线程池执行
taskDOList.forEach(task -> {
// 还原 instanceId
task.setInstanceId(instanceId);
runTask(task);
deletedIds.add(task.getTaskId());
});
// 删除任务
taskPersistenceService.batchDelete(instanceId, deletedIds);
// 删除任务需要使用特殊instanceId
taskPersistenceService.batchDelete(getSPInstanceId(instanceId), deletedIds);
}
private void runTask(TaskDO task) {
@ -195,4 +202,8 @@ public class ProcessorTracker {
threadPool.submit(processorRunnable);
}
}
private static String getSPInstanceId(String instanceId) {
return "L" + instanceId;
}
}

View File

@ -92,10 +92,17 @@ public class TaskTracker {
// 1. 读取当前Task状态防止过期消息重置任务状态
if (!force) {
TaskDO originTask = taskPersistenceService.selectTaskByKey(instanceId, taskId);
if (originTask.getStatus() > status) {
TaskStatus originTaskStatus = taskPersistenceService.getTaskStatus(instanceId, taskId);
if (originTaskStatus == null) {
log.warn("[TaskTracker] database may overload...");
return;
}
if (originTaskStatus.getValue() > status) {
log.warn("[TaskTracker] task(instanceId={},taskId={},dbStatus={},requestStatus={}) status conflict, this request will be drop.",
instanceId, taskId, originTask.getStatus(), status);
instanceId, taskId, originTaskStatus, status);
return;
}
}

View File

@ -36,7 +36,7 @@ public class ConnectionFactory {
// 池中最小空闲连接数量
config.setMinimumIdle(2);
// 池中最大连接数量
config.setMaximumPoolSize(16);
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
}
}

View File

@ -66,7 +66,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override
public int batchDelete(String instanceId, List<String> taskIds) {
String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s";
String deleteSQL = "delete from task_info where instance_id = '%s' and task_id in %s";
String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds));
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {

View File

@ -63,7 +63,6 @@ public class TaskDO {
", jobId='" + jobId + '\'' +
", instanceId='" + instanceId + '\'' +
", taskName='" + taskName + '\'' +
", taskContent=" + (taskContent == null ? "" : new String(taskContent)) +
", address='" + address + '\'' +
", status=" + status +
", result='" + result + '\'' +

View File

@ -113,17 +113,22 @@ public class TaskPersistenceService {
}
/**
* 根据主键查找任务
* 查询任务状态只查询 status节约 I/O 资源
*/
public TaskDO selectTaskByKey(String instanceId, String taskId) {
public TaskStatus getTaskStatus(String instanceId, String taskId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setTaskId(taskId);
List<TaskDO> results = taskDAO.simpleQuery(query);
if (CollectionUtils.isEmpty(results)) {
query.setQueryContent(" STATUS ");
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
if (CollectionUtils.isEmpty(rows)) {
return null;
}
return results.get(0);
return TaskStatus.of((int) rows.get(0).get("STATUS"));
}
/**

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.google.common.collect.Lists;
@ -39,13 +40,11 @@ public class ProcessorMapTaskRequest implements Serializable {
this.taskName = taskName;
this.subTasks = Lists.newLinkedList();
for (int i = 0; i < subTaskList.size(); i++) {
// 不同执行线程之间前缀taskId不同该ID可以保证分布式唯一
String subTaskId = taskContext.getTaskId() + "." + i;
subTaskList.forEach(subTask -> {
// 同一个 Task 内部可能多次 Map因此还是要确保线程级别的唯一
String subTaskId = taskContext.getTaskId() + "." + ThreadLocalStore.TASK_ID_THREAD_LOCAL.get().getAndIncrement();
// 写入类名方便反序列化
byte[] content = SerializerUtils.serialize(subTaskList.get(i));
subTasks.add(new SubTask(subTaskId, content));
}
subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
});
}
}

View File

@ -43,4 +43,9 @@ public class TaskContext {
", instanceParams='" + instanceParams + '\'' +
", taskTrackerAddress='" + taskTrackerAddress;
}
@Override
public String toString() {
return getDescription();
}
}

View File

@ -21,4 +21,9 @@ public class UtilsTest {
public void testSystemInfoUtils() {
System.out.println(SystemInfoUtils.getSystemMetrics());
}
@Test
public void testSerializeUtils() {
}
}

View File

@ -37,14 +37,19 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
if (isRootTask()) {
System.out.println("start to map");
List<TestSubTask> subTasks = Lists.newLinkedList();
for (int i = 0; i < 100; i++) {
subTasks.add(new TestSubTask("name" + i, i));
for (int j = 0; j < 2; j++) {
for (int i = 0; i < 100; i++) {
int x = j * 100 + i;
subTasks.add(new TestSubTask("name" + x, x));
}
ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK");
System.out.println("map result = " + mapResult);
subTasks.clear();
}
ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK");
System.out.println("map result = " + mapResult);
return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("start to process");
Thread.sleep(1000);
System.out.println(context.getSubTask());
return new ProcessResult(true, "PROCESS_SUCCESS");
}
@ -54,7 +59,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
@ToString
@NoArgsConstructor
@AllArgsConstructor
private static class TestSubTask implements Serializable {
private static class TestSubTask {
private String name;
private int age;
}

View File

@ -11,7 +11,7 @@
</encoder>
</appender>
<logger name="com.zaxxer.hikari" level="ERROR">
<logger name="com.zaxxer.hikari" level="INFO">
<appender-ref ref="STDOUT"/>
</logger>