[fix] fix the bug of container can't be destory

This commit is contained in:
tjq 2020-07-01 19:28:42 +08:00
parent f332b7dd95
commit 50ad38ba0b
10 changed files with 44 additions and 18 deletions

View File

@ -22,11 +22,21 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
* 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
* 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
[在线试用地址](https://www.yuque.com/powerjob/guidence/hnbskn)
### 适用场景
* 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
* 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
* 有需要分布式处理的业务场景比如需要更新一大批数据单机执行耗时非常长可以使用Map/MapReduce处理器完成任务的分发调动整个集群加速计算。
* 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
### 设计目标
PowerJob 的设计目标为企业级的分布式任务调度平台,即成为公司内部的**任务调度中间件**。整个公司统一部署调度中心 powerjob-server旗下所有业务线应用只需要依赖 `powerjob-worker` 即可接入调度中心获取任务调度与分布式计算能力。
### 在线试用
试用地址:[try.powerjob.tech](http://try.powerjob.tech/)
试用应用名称powerjob-agent-test
控制台密码123
[建议点击查看试用文档了解相关操作](https://www.yuque.com/powerjob/guidence/hnbskn)
### 同类产品对比
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
@ -43,7 +53,9 @@ PowerJob原OhMyScheduler是全新一代分布式调度与计算框架
# 文档
**[超详细中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)** OR **[备用地址(内容可能更新不及时)](https://kfcfans.github.io/)**
**[中文文档](https://www.yuque.com/powerjob/guidence/ztn4i5)**
**[Document](https://www.yuque.com/powerjob/en/xrdoqw)**
PS感谢文档翻译平台[breword](https://www.breword.com/)对本项目英文文档翻译做出的巨大贡献!

View File

@ -66,9 +66,11 @@ public class OhMyClient {
appId = Long.parseLong(resultDTO.getData().toString());
currentAddress = addr;
break;
}else {
throw new OmsException(resultDTO.getMessage());
}
}
}catch (Exception ignore) {
}catch (IOException ignore) {
}
}

View File

@ -21,7 +21,7 @@ public class TestClient {
@BeforeAll
public static void initClient() throws Exception {
ohMyClient = new OhMyClient("127.0.0.1:7700", "oms-test2", null);
ohMyClient = new OhMyClient("127.0.0.1:7700", "powerjob-agent-test", "123");
}
@Test
@ -70,7 +70,7 @@ public class TestClient {
@Test
public void testRunJob() throws Exception {
System.out.println(ohMyClient.runJob(8L, "this is instanceParams", 20));
System.out.println(ohMyClient.runJob(6L, "this is instanceParams", 60000));
}
@Test

View File

@ -10,6 +10,7 @@ import com.github.kfcfans.powerjob.common.request.WorkerNeedDeployContainerReque
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ContainerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ContainerInfoRepository;
@ -91,8 +92,10 @@ public class ServerActor extends AbstractActor {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(req.getContainerId());
AskResponse askResponse = new AskResponse();
if (!containerInfoOpt.isPresent() || containerInfoOpt.get().getStatus() != SwitchableStatus.ENABLE.getV()) {
askResponse.setSuccess(false);
if (containerInfoOpt.isPresent()) {
askResponse.setMessage("can't find container by id: " + req.getContainerId());
}else {
ContainerInfoDO containerInfo = containerInfoOpt.get();
askResponse.setSuccess(true);
@ -104,7 +107,6 @@ public class ServerActor extends AbstractActor {
askResponse.setData(JsonUtils.toBytes(dpReq));
}
getSender().tell(askResponse, getSelf());
}

View File

@ -45,9 +45,9 @@ public class HashedWheelTimer implements Timer {
* 新建时间轮定时器
* @param tickDuration 时间间隔单位毫秒ms
* @param ticksPerWheel 轮盘个数
* @param processTaskNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
* @param processThreadNum 处理任务的线程个数0代表不启用新线程如果定时任务需要耗时操作请启用线程池
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processTaskNum) {
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
@ -60,12 +60,13 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1;
// 初始化执行线程池
if (processTaskNum <= 0) {
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(16);
taskProcessPool = new ThreadPoolExecutor(2, processTaskNum,
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
}

View File

@ -59,7 +59,7 @@ public class SystemInfoController {
}
String server =appInfoOpt.get().getCurrentServer();
// 没有Server
// 没有 Server说明从来没有该 appId worker 集群连接过
if (StringUtils.isEmpty(server)) {
return ResultDTO.success(Collections.emptyList());
}

View File

@ -31,7 +31,7 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(port);
config.setAppName("powerjob");
config.setAppName("powerjob-agent-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 为了本地模拟多个实例只能使用 MEMORY 启动文件只能由一个应用占有

View File

@ -39,16 +39,21 @@ public class OmsContainerFactory {
/**
* 获取容器
* @param containerId 容器ID
* @param loadFromServer 当本地不存在时尝试从 server 加载
* @return 容器示例可能为 null
*/
public static OmsContainer getContainer(Long containerId) {
public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) {
OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) {
return omsContainer;
}
// 尝试下载
if (!loadFromServer) {
return null;
}
// 尝试从 server 加载
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
@ -66,6 +71,8 @@ public class OmsContainerFactory {
ServerDeployContainerRequest deployRequest = askResponse.getData(ServerDeployContainerRequest.class);
log.info("[OmsContainer-{}] fetch containerInfo from server successfully.", containerId);
deployContainer(deployRequest);
}else {
log.warn("[OmsContainer-{}] fetch containerInfo failed, reason is {}.", containerId, askResponse.getMessage());
}
}catch (Exception e) {
log.error("[OmsContainer-{}] get container failed, exception is {}", containerId, e.toString());

View File

@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer {
// 需要满足的条件引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) {
OmsContainer container = OmsContainerFactory.getContainer(containerId);
OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false);
if (container != this) {
try {
destroy();

View File

@ -309,9 +309,11 @@ public class ProcessorTracker {
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true);
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}else {
log.warn("[ProcessorTracker-{}] load container failed.", instanceId);
}
break;
default: