mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[fix] fix ClassNotFoundException when deSerialized the container class
This commit is contained in:
parent
a14f554e00
commit
7537cc1f5d
@ -18,7 +18,7 @@ import java.util.Date;
|
|||||||
@Entity
|
@Entity
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
@Table(name = "instance_log", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId")})
|
@Table(name = "instance_info", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")})
|
||||||
public class InstanceInfoDO {
|
public class InstanceInfoDO {
|
||||||
|
|
||||||
@Id
|
@Id
|
||||||
|
@ -329,8 +329,10 @@ public class ContainerService {
|
|||||||
|
|
||||||
if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) {
|
if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) {
|
||||||
remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
|
remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
|
||||||
gridFsManager.download(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName);
|
gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName);
|
||||||
remote.sendText("SYSTEM: upload to GridFS successfully~");
|
remote.sendText("SYSTEM: upload to GridFS successfully~");
|
||||||
|
}else {
|
||||||
|
remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 将文件从临时工作目录移动到正式目录
|
// 将文件从临时工作目录移动到正式目录
|
||||||
|
@ -8,6 +8,7 @@ import com.google.common.collect.Maps;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@ -25,7 +26,7 @@ public class ClusterStatusHolder {
|
|||||||
// 集群中所有机器的健康状态
|
// 集群中所有机器的健康状态
|
||||||
private Map<String, SystemMetrics> address2Metrics;
|
private Map<String, SystemMetrics> address2Metrics;
|
||||||
// 集群中所有机器的容器部署状态
|
// 集群中所有机器的容器部署状态
|
||||||
private Map<Long, List<DeployedContainerInfo>> containerId2Infos;
|
private Map<Long, Map<String, DeployedContainerInfo>> containerId2Infos;
|
||||||
// 集群中所有机器的最后心跳时间
|
// 集群中所有机器的最后心跳时间
|
||||||
private Map<String, Long> address2ActiveTime;
|
private Map<String, Long> address2ActiveTime;
|
||||||
|
|
||||||
@ -58,10 +59,8 @@ public class ClusterStatusHolder {
|
|||||||
List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
|
List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
|
||||||
if (!CollectionUtils.isEmpty(containerInfos)) {
|
if (!CollectionUtils.isEmpty(containerInfos)) {
|
||||||
containerInfos.forEach(containerInfo -> {
|
containerInfos.forEach(containerInfo -> {
|
||||||
List<DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Lists.newLinkedList());
|
Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
|
||||||
// 设置机器地址
|
infos.put(workerAddress, containerInfo);
|
||||||
containerInfo.setWorkerAddress(heartbeat.getWorkerAddress());
|
|
||||||
infos.add(containerInfo);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -121,7 +120,12 @@ public class ClusterStatusHolder {
|
|||||||
* @return 该容器的部署情况
|
* @return 该容器的部署情况
|
||||||
*/
|
*/
|
||||||
public List<DeployedContainerInfo> getDeployedContainerInfos(Long containerId) {
|
public List<DeployedContainerInfo> getDeployedContainerInfos(Long containerId) {
|
||||||
return containerId2Infos.getOrDefault(containerId, Lists.newLinkedList());
|
List<DeployedContainerInfo> res = Lists.newLinkedList();
|
||||||
|
containerId2Infos.getOrDefault(containerId, Collections.emptyMap()).forEach((address, info) -> {
|
||||||
|
info.setWorkerAddress(address);
|
||||||
|
res.add(info);
|
||||||
|
});
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean timeout(String address) {
|
private boolean timeout(String address) {
|
||||||
|
@ -8,47 +8,43 @@ import com.esotericsoftware.kryo.util.Pool;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* 序列化器
|
* 序列化器
|
||||||
|
* V1.0.0:对象池,因无法解决反序列化容器类(外部类)的原因而被移除,LastCommitId: a14f554e0085b6a179375a8ca04665434b73c7bd
|
||||||
|
* V1.2.0:ThreadLocal + 手动设置Kryo所使用的类加载器(默认类加载器为创建kryo的类对象(Kryo.class)的类加载器)实现容器类的序列化和反序列化
|
||||||
*
|
*
|
||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2020/3/25
|
* @since 2020/3/25
|
||||||
*/
|
*/
|
||||||
public class SerializerUtils {
|
public class SerializerUtils {
|
||||||
|
|
||||||
private static final int DEFAULT_CAPACITY = Runtime.getRuntime().availableProcessors();
|
//每个线程的 Kryo 实例
|
||||||
private static final Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, DEFAULT_CAPACITY) {
|
private static final ThreadLocal<Kryo> kryoLocal = ThreadLocal.withInitial(() -> {
|
||||||
@Override
|
|
||||||
protected Kryo create() {
|
|
||||||
|
|
||||||
Kryo kryo = new Kryo();
|
Kryo kryo = new Kryo();
|
||||||
// 关闭序列化注册,会导致性能些许下降,但在分布式环境中,注册类生成ID不一致会导致错误
|
// 支持对象循环引用(否则会栈溢出),会导致性能些许下降 T_T
|
||||||
kryo.setRegistrationRequired(false);
|
kryo.setReferences(true); //默认值就是 true,添加此行的目的是为了提醒维护者,不要改变这个配置
|
||||||
// 支持循环引用,也会导致性能些许下降 T_T
|
// 关闭序列化注册,会导致性能些许下降,但在分布式环境中,注册类生成ID不一致会导致错误
|
||||||
kryo.setReferences(true);
|
kryo.setRegistrationRequired(false);
|
||||||
return kryo;
|
// 设置类加载器为线程上下文类加载器(如果Processor来源于容器,必须使用容器的类加载器,否则妥妥的CNF)
|
||||||
}
|
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
|
||||||
};
|
|
||||||
|
return kryo;
|
||||||
|
});
|
||||||
|
|
||||||
public static byte[] serialize(Object obj) {
|
public static byte[] serialize(Object obj) {
|
||||||
|
|
||||||
Kryo kryo = kryoPool.obtain();
|
Kryo kryo = kryoLocal.get();
|
||||||
|
|
||||||
// 使用 Output 对象池会导致序列化重复的错误(getBuffer返回了Output对象的buffer引用)
|
// 使用 Output 对象池会导致序列化重复的错误(getBuffer返回了Output对象的buffer引用)
|
||||||
try (Output opt = new Output(1024, -1)) {
|
try (Output opt = new Output(1024, -1)) {
|
||||||
kryo.writeClassAndObject(opt, obj);
|
kryo.writeClassAndObject(opt, obj);
|
||||||
opt.flush();
|
opt.flush();
|
||||||
return opt.getBuffer();
|
return opt.getBuffer();
|
||||||
}finally {
|
|
||||||
kryoPool.free(kryo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Object deSerialized(byte[] buffer) {
|
public static Object deSerialized(byte[] buffer) {
|
||||||
Kryo kryo = kryoPool.obtain();
|
Kryo kryo = kryoLocal.get();
|
||||||
try {
|
return kryo.readClassAndObject(new Input(buffer));
|
||||||
return kryo.readClassAndObject(new Input(buffer));
|
|
||||||
}finally {
|
|
||||||
kryoPool.free(kryo);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,12 @@ public interface OmsContainer extends LifeCycle {
|
|||||||
*/
|
*/
|
||||||
BasicProcessor getProcessor(String className);
|
BasicProcessor getProcessor(String className);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取容器的类加载器
|
||||||
|
* @return 类加载器
|
||||||
|
*/
|
||||||
|
OhMyClassLoader getContainerClassLoader();
|
||||||
|
|
||||||
Long getContainerId();
|
Long getContainerId();
|
||||||
Long getDeployedTime();
|
Long getDeployedTime();
|
||||||
String getName();
|
String getName();
|
||||||
|
@ -99,7 +99,7 @@ public class OmsJarContainer implements OmsContainer {
|
|||||||
|
|
||||||
URL jarURL = localJarFile.toURI().toURL();
|
URL jarURL = localJarFile.toURI().toURL();
|
||||||
|
|
||||||
// 创建类加载器
|
// 创建类加载器(父类加载为 Worker 的类加载)
|
||||||
this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());
|
this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());
|
||||||
|
|
||||||
// 获取资源文件
|
// 获取资源文件
|
||||||
@ -180,6 +180,10 @@ public class OmsJarContainer implements OmsContainer {
|
|||||||
public Long getDeployedTime() {
|
public Long getDeployedTime() {
|
||||||
return deployedTime;
|
return deployedTime;
|
||||||
}
|
}
|
||||||
|
@Override
|
||||||
|
public OhMyClassLoader getContainerClassLoader() {
|
||||||
|
return containerClassLoader;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void tryRelease() {
|
public void tryRelease() {
|
||||||
|
@ -42,6 +42,8 @@ public class ProcessorRunnable implements Runnable {
|
|||||||
private final TaskDO task;
|
private final TaskDO task;
|
||||||
private final BasicProcessor processor;
|
private final BasicProcessor processor;
|
||||||
private final OmsLogger omsLogger;
|
private final OmsLogger omsLogger;
|
||||||
|
// 类加载器
|
||||||
|
private final ClassLoader classLoader;
|
||||||
|
|
||||||
public void innerRun() throws InterruptedException {
|
public void innerRun() throws InterruptedException {
|
||||||
|
|
||||||
@ -175,6 +177,8 @@ public class ProcessorRunnable implements Runnable {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
// 切换线程上下文类加载器(否则用的是 Worker 类加载器,不存在容器类,在序列化/反序列化时会报 ClassNotFoundException)
|
||||||
|
Thread.currentThread().setContextClassLoader(classLoader);
|
||||||
try {
|
try {
|
||||||
innerRun();
|
innerRun();
|
||||||
}catch (InterruptedException ignore) {
|
}catch (InterruptedException ignore) {
|
||||||
|
@ -121,7 +121,8 @@ public class ProcessorTracker {
|
|||||||
newTask.setInstanceId(instanceInfo.getInstanceId());
|
newTask.setInstanceId(instanceInfo.getInstanceId());
|
||||||
newTask.setAddress(taskTrackerAddress);
|
newTask.setAddress(taskTrackerAddress);
|
||||||
|
|
||||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger);
|
ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
|
||||||
|
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader);
|
||||||
try {
|
try {
|
||||||
threadPool.submit(processorRunnable);
|
threadPool.submit(processorRunnable);
|
||||||
success = true;
|
success = true;
|
||||||
|
34
others/logs/ContainerTestRecord.md
Normal file
34
others/logs/ContainerTestRecord.md
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# 容器测试日志
|
||||||
|
## ClassNotFound问题
|
||||||
|
>玩热加载这一套,不来几个ClassNotFound都没那味 [滑稽]~
|
||||||
|
|
||||||
|
测试容器化的MapReduce任务时,发现如下错误:
|
||||||
|
```text
|
||||||
|
2020-05-19 09:33:18 ERROR - [ProcessorRunnable-142925055284740224] execute failed, please fix this bug @tjq!
|
||||||
|
com.esotericsoftware.kryo.KryoException: Unable to find class: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
|
||||||
|
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:182)
|
||||||
|
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:151)
|
||||||
|
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:684)
|
||||||
|
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:795)
|
||||||
|
at com.github.kfcfans.oms.worker.common.utils.SerializerUtils.deSerialized(SerializerUtils.java:48)
|
||||||
|
at com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable.innerRun(ProcessorRunnable.java:63)
|
||||||
|
at com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable.run(ProcessorRunnable.java:179)
|
||||||
|
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
|
||||||
|
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
|
||||||
|
at java.util.concurrent.FutureTask.run(FutureTask.java)
|
||||||
|
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
|
||||||
|
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
|
||||||
|
at java.lang.Thread.run(Thread.java:748)
|
||||||
|
Caused by: java.lang.ClassNotFoundException: cn.edu.zju.oms.container.ContainerMRProcessor$TestSubTask
|
||||||
|
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
|
||||||
|
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
|
||||||
|
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
|
||||||
|
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
|
||||||
|
at java.lang.Class.forName0(Native Method)
|
||||||
|
at java.lang.Class.forName(Class.java:348)
|
||||||
|
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:176)
|
||||||
|
... 12 common frames omitted
|
||||||
|
```
|
||||||
|
|
||||||
|
* 原因分析:经过分析,原有在于序列化与反序列化过程中,框架为了追求性能,采用了**对象池**技术(库存代码: a14f554e0085b6a179375a8ca04665434b73c7bd#SerializerUtils),而Kryo在序列化和反序列化过程中只会使用固定的类加载器(创建kryo的类对象(Kryo.class)的类加载器),因此无法找到由OMS自定义类加载器创建的容器类。
|
||||||
|
* 解决方案:弃用性能优异的对象池技术,该用ThreadLocal + 手动设置Kryo类加载器。
|
Loading…
x
Reference in New Issue
Block a user