diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java index 145f0c57..6b57e6af 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/persistence/core/model/InstanceInfoDO.java @@ -18,7 +18,7 @@ import java.util.Date; @Entity @NoArgsConstructor @AllArgsConstructor -@Table(name = "instance_log", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId")}) +@Table(name = "instance_info", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")}) public class InstanceInfoDO { @Id diff --git a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java index 9e5dbe44..f3bdf4f6 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ContainerService.java @@ -329,8 +329,10 @@ public class ContainerService { if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) { remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version."); - gridFsManager.download(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName); + gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName); remote.sendText("SYSTEM: upload to GridFS successfully~"); + }else { + remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore."); } // 将文件从临时工作目录移动到正式目录 diff --git 
a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java index 426921d1..c4f178a4 100644 --- a/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java +++ b/oh-my-scheduler-server/src/main/java/com/github/kfcfans/oms/server/service/ha/ClusterStatusHolder.java @@ -8,6 +8,7 @@ import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -25,7 +26,7 @@ public class ClusterStatusHolder { // 集群中所有机器的健康状态 private Map address2Metrics; // 集群中所有机器的容器部署状态 - private Map> containerId2Infos; + private Map> containerId2Infos; // 集群中所有机器的最后心跳时间 private Map address2ActiveTime; @@ -58,10 +59,8 @@ public class ClusterStatusHolder { List containerInfos = heartbeat.getContainerInfos(); if (!CollectionUtils.isEmpty(containerInfos)) { containerInfos.forEach(containerInfo -> { - List infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Lists.newLinkedList()); - // 设置机器地址 - containerInfo.setWorkerAddress(heartbeat.getWorkerAddress()); - infos.add(containerInfo); + Map infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap()); + infos.put(workerAddress, containerInfo); }); } } @@ -121,7 +120,12 @@ public class ClusterStatusHolder { * @return 该容器的部署情况 */ public List getDeployedContainerInfos(Long containerId) { - return containerId2Infos.getOrDefault(containerId, Lists.newLinkedList()); + List res = Lists.newLinkedList(); + containerId2Infos.getOrDefault(containerId, Collections.emptyMap()).forEach((address, info) -> { + info.setWorkerAddress(address); + res.add(info); + }); + return res; } private boolean timeout(String address) { diff --git 
a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java index af1333cc..99248566 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SerializerUtils.java @@ -8,47 +8,43 @@ import com.esotericsoftware.kryo.util.Pool; /** * 序列化器 + * V1.0.0:对象池,因无法解决反序列化容器类(外部类)的原因而被移除,LastCommitId: a14f554e0085b6a179375a8ca04665434b73c7bd + * V1.2.0:ThreadLocal + 手动设置Kryo所使用的类加载器(默认类加载器为创建kryo的类对象(Kryo.class)的类加载器)实现容器类的序列化和反序列化 * * @author tjq * @since 2020/3/25 */ public class SerializerUtils { - private static final int DEFAULT_CAPACITY = Runtime.getRuntime().availableProcessors(); - private static final Pool kryoPool = new Pool(true, false, DEFAULT_CAPACITY) { - @Override - protected Kryo create() { + //每个线程的 Kryo 实例 + private static final ThreadLocal kryoLocal = ThreadLocal.withInitial(() -> { - Kryo kryo = new Kryo(); - // 关闭序列化注册,会导致性能些许下降,但在分布式环境中,注册类生成ID不一致会导致错误 - kryo.setRegistrationRequired(false); - // 支持循环引用,也会导致性能些许下降 T_T - kryo.setReferences(true); - return kryo; - } - }; + Kryo kryo = new Kryo(); + // 支持对象循环引用(否则会栈溢出),会导致性能些许下降 T_T + kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置 + // 关闭序列化注册,会导致性能些许下降,但在分布式环境中,注册类生成ID不一致会导致错误 + kryo.setRegistrationRequired(false); + // 设置类加载器为线程上下文类加载器(如果Processor来源于容器,必须使用容器的类加载器,否则妥妥的CNF) + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + + return kryo; + }); public static byte[] serialize(Object obj) { - Kryo kryo = kryoPool.obtain(); + Kryo kryo = kryoLocal.get(); // 使用 Output 对象池会导致序列化重复的错误(getBuffer返回了Output对象的buffer引用) try (Output opt = new Output(1024, -1)) { kryo.writeClassAndObject(opt, obj); opt.flush(); return opt.getBuffer(); - }finally { - kryoPool.free(kryo); } } public static Object 
deSerialized(byte[] buffer) { - Kryo kryo = kryoPool.obtain(); - try { - return kryo.readClassAndObject(new Input(buffer)); - }finally { - kryoPool.free(kryo); - } + Kryo kryo = kryoLocal.get(); + return kryo.readClassAndObject(new Input(buffer)); } } diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java index 052ddd67..431e777a 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsContainer.java @@ -17,6 +17,12 @@ public interface OmsContainer extends LifeCycle { */ BasicProcessor getProcessor(String className); + /** + * 获取容器的类加载器 + * @return 类加载器 + */ + OhMyClassLoader getContainerClassLoader(); + Long getContainerId(); Long getDeployedTime(); String getName(); diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java index 59c242c0..3cb2699f 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/container/OmsJarContainer.java @@ -99,7 +99,7 @@ public class OmsJarContainer implements OmsContainer { URL jarURL = localJarFile.toURI().toURL(); - // 创建类加载器 + // 创建类加载器(父类加载为 Worker 的类加载) this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader()); // 获取资源文件 @@ -180,6 +180,10 @@ public class OmsJarContainer implements OmsContainer { public Long getDeployedTime() { return deployedTime; } + @Override + public OhMyClassLoader getContainerClassLoader() { + return containerClassLoader; + } @Override public void tryRelease() { diff --git 
a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java index 107ab83b..d107c638 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/executor/ProcessorRunnable.java @@ -42,6 +42,8 @@ public class ProcessorRunnable implements Runnable { private final TaskDO task; private final BasicProcessor processor; private final OmsLogger omsLogger; + // 类加载器 + private final ClassLoader classLoader; public void innerRun() throws InterruptedException { @@ -175,6 +177,8 @@ public class ProcessorRunnable implements Runnable { @Override public void run() { + // 切换线程上下文类加载器(否则用的是 Worker 类加载器,不存在容器类,在序列化/反序列化时会报 ClassNotFoundException) + Thread.currentThread().setContextClassLoader(classLoader); try { innerRun(); }catch (InterruptedException ignore) { diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java index e7a47ecf..7a262ed0 100644 --- a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/core/tracker/processor/ProcessorTracker.java @@ -121,7 +121,8 @@ public class ProcessorTracker { newTask.setInstanceId(instanceInfo.getInstanceId()); newTask.setAddress(taskTrackerAddress); - ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger); + ClassLoader classLoader = omsContainer == null ? 
getClass().getClassLoader() : omsContainer.getContainerClassLoader(); + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader); try { threadPool.submit(processorRunnable); success = true; diff --git a/others/logs/ContainerTestRecord.md b/others/logs/ContainerTestRecord.md new file mode 100644 index 00000000..e390e336 --- /dev/null +++ b/others/logs/ContainerTestRecord.md @@ -0,0 +1,34 @@ +# 容器测试日志 +## ClassNotFound问题 +>玩热加载这一套,不来几个ClassNotFound都没那味 [滑稽]~ + +测试容器化的MapReduce任务时,发现如下错误: +```text +2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq! +com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask + at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182) + at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151) + at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684) + at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795) + at com.github.kfcfans.oms.worker.common.utils.SerializerUtils.deSerialized(SerializerUtils.java:48) + at com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable.innerRun(ProcessorRunnable.java:63) + at com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable.run(ProcessorRunnable.java:179) + at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) + at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) + at java.util.concurrent.FutureTask.run(FutureTask.java) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at java.lang.Thread.run(Thread.java:748) +Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask + at 
java.net.URLClassLoader.findClass(URLClassLoader.java:382) + at java.lang.ClassLoader.loadClass(ClassLoader.java:418) + at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) + at java.lang.ClassLoader.loadClass(ClassLoader.java:351) + at java.lang.Class.forName0(Native Method) + at java.lang.Class.forName(Class.java:348) + at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176) + ... 12 common frames omitted +``` + +* 原因分析:经过分析,原因在于序列化与反序列化过程中,框架为了追求性能,采用了**对象池**技术(库存代码: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils),而Kryo在序列化和反序列化过程中只会使用固定的类加载器(创建kryo的类对象(Kryo.class)的类加载器),因此无法找到由OMS自定义类加载器创建的容器类。 +* 解决方案:弃用性能优异的对象池技术,改用ThreadLocal + 手动设置Kryo类加载器。 \ No newline at end of file