[dev] add cluster info cleaner

This commit is contained in:
tjq 2020-06-24 20:01:53 +08:00
parent b189cd0ba7
commit 24d098d863
10 changed files with 52 additions and 27 deletions

View File

@ -26,7 +26,7 @@ docker run -d \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://124.70.67.79:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://124.70.67.79:27017/powerjob-product" \
-e PARAMS="--spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://139.224.83.134:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://139.224.83.134:27017/powerjob-product" \
-v ~/docker/powerjob-server:/root/powerjob-server -v ~/.m2:/root/.m2 \
tjqq/powerjob-server:latest
sleep 60
@ -45,8 +45,8 @@ docker run -d \
tjqq/powerjob-agent:latest
docker run -d \
--name powerjob-agent2 \
--restart=always \
--name powerjob-agent2 \
-p 27778:27777 -p 5003:5005 -p 10003:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \

View File

@ -43,7 +43,7 @@ public class InstanceDetail implements OmsSerializable {
private String startTime;
private String finishedTime;
private String result;
private String status;
private int status;
}
// MapReduce Broadcast 任务的 extra ->

View File

@ -30,7 +30,7 @@ public class ClusterStatusHolder {
// 集群中所有机器的最后心跳时间
private Map<String, Long> address2ActiveTime;
private static final long WORKER_TIMEOUT_MS = 30000;
private static final long WORKER_TIMEOUT_MS = 60000;
public ClusterStatusHolder(String appName) {
this.appName = appName;
@ -131,10 +131,25 @@ public class ClusterStatusHolder {
/**
* 释放所有本地存储的容器信息该操作会导致短暂的 listDeployedContainer 服务不可用
*/
public void releaseContainerInfos() {
public void release() {
log.info("[ClusterStatusHolder-{}] clean the containerInfos, listDeployedContainer service may down about 1min~", appName);
// 丢弃原来的所有数据准备重建
containerId2Infos = Maps.newConcurrentMap();
// 丢弃超时机器的信息
List<String> timeoutAddress = Lists.newLinkedList();
address2Metrics.forEach((addr, lastActiveTime) -> {
if (timeout(addr)) {
timeoutAddress.add(addr);
}
});
if (!timeoutAddress.isEmpty()) {
log.info("[ClusterStatusHolder-{}] detective timeout workers({}), try to release their infos.", appName, timeoutAddress);
timeoutAddress.forEach(addr -> {
address2ActiveTime.remove(addr);
address2Metrics.remove(addr);
});
}
}
private boolean timeout(String address) {

View File

@ -94,9 +94,9 @@ public class WorkerManagerService {
}
/**
* 释放所有本地存储的容器信息该操作会导致短暂的 listDeployedContainer 服务不可用
* 清理缓存信息防止 OOM
*/
public static void releaseContainerInfos() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::releaseContainerInfos);
public static void cleanUp() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
}
}

View File

@ -5,7 +5,6 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceIn
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.mongodb.GridFsManager;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
@ -59,7 +58,7 @@ public class CleanService {
public void timingClean() {
// 释放本地缓存
WorkerManagerService.releaseContainerInfos();
WorkerManagerService.cleanUp();
// 删除数据库运行记录
cleanInstanceLog();

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -497,7 +497,7 @@ eval("module.exports = __webpack_require__.p + \"img/powerjob-console-logo.ac01c
/***/ (function(module, __webpack_exports__, __webpack_require__) {
"use strict";
eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! ./i18n/i18n */ \"./src/i18n/i18n.js\");\n\n\nvar timestamp2Str = function timestamp2Str(ts) {\n if (ts < 10000) {\n return \"N/A\";\n }\n\n try {\n if (ts) {\n var time = new Date(ts);\n var y = time.getFullYear();\n var M = time.getMonth() + 1;\n var d = time.getDate();\n var h = time.getHours();\n var m = time.getMinutes();\n var s = time.getSeconds();\n return y + '-' + addZero(M) + '-' + addZero(d) + ' ' + addZero(h) + ':' + addZero(m) + ':' + addZero(s);\n } else {\n return '';\n }\n } catch (e) {\n return \"N/A\";\n }\n}; // 公共函数,涉及到 i18n ,放进 common.js 报错,暂时先放在这里吧\n\n\nvar translateInstanceStatus = function translateInstanceStatus(status) {\n console.log(\"zzzzzzzz %o\", status);\n\n switch (status) {\n case 1:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.waitingDispatch');\n\n case 2:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.waitingWorkerReceive');\n\n case 3:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.running');\n\n case 4:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.failed');\n\n case 5:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.success');\n\n case 10:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.stopped');\n\n default:\n return \"unknown\";\n }\n};\n\nvar translateWfInstanceStatus = function translateWfInstanceStatus(status) {\n switch (status) {\n case 1:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.wfWaiting');\n\n case 2:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.running');\n\n case 3:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.failed');\n\n case 4:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.success');\n\n case 10:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.stopped');\n\n default:\n return \"unknown\";\n }\n}; // 更换语言\n\n\nvar switchLanguage = function switchLanguage(cmd) {\n console.log(\"switch language to %o\", cmd);\n _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].locale = cmd; // 存储到LangStorage\n\n window.localStorage.setItem('oms_lang', cmd);\n};\n\nfunction addZero(m) {\n return m < 10 ? '0' + m : m;\n}\n\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n timestamp2Str: timestamp2Str,\n translateInstanceStatus: translateInstanceStatus,\n translateWfInstanceStatus: translateWfInstanceStatus,\n switchLanguage: switchLanguage\n});\n\n//# sourceURL=webpack:///./src/common.js?");
eval("__webpack_require__.r(__webpack_exports__);\n/* harmony import */ var _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__ = __webpack_require__(/*! ./i18n/i18n */ \"./src/i18n/i18n.js\");\n\n\nvar timestamp2Str = function timestamp2Str(ts) {\n if (ts < 10000) {\n return \"N/A\";\n }\n\n try {\n if (ts) {\n var time = new Date(ts);\n var y = time.getFullYear();\n var M = time.getMonth() + 1;\n var d = time.getDate();\n var h = time.getHours();\n var m = time.getMinutes();\n var s = time.getSeconds();\n return y + '-' + addZero(M) + '-' + addZero(d) + ' ' + addZero(h) + ':' + addZero(m) + ':' + addZero(s);\n } else {\n return '';\n }\n } catch (e) {\n return \"N/A\";\n }\n}; // 公共函数,涉及到 i18n ,放进 common.js 报错,暂时先放在这里吧\n\n\nvar translateInstanceStatus = function translateInstanceStatus(status) {\n switch (status) {\n case 1:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.waitingDispatch');\n\n case 2:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.waitingWorkerReceive');\n\n case 3:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.running');\n\n case 4:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.failed');\n\n case 5:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.success');\n\n case 10:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.stopped');\n\n default:\n return \"unknown\";\n }\n};\n\nvar translateWfInstanceStatus = function translateWfInstanceStatus(status) {\n switch (status) {\n case 1:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.wfWaiting');\n\n case 2:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.running');\n\n case 3:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.failed');\n\n case 4:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.success');\n\n case 10:\n return _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].t('message.stopped');\n\n default:\n return \"unknown\";\n }\n}; // 更换语言\n\n\nvar switchLanguage = function switchLanguage(cmd) {\n console.log(\"switch language to %o\", cmd);\n _i18n_i18n__WEBPACK_IMPORTED_MODULE_0__[\"default\"].locale = cmd; // 存储到LangStorage\n\n window.localStorage.setItem('oms_lang', cmd);\n};\n\nfunction addZero(m) {\n return m < 10 ? '0' + m : m;\n}\n\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n timestamp2Str: timestamp2Str,\n translateInstanceStatus: translateInstanceStatus,\n translateWfInstanceStatus: translateWfInstanceStatus,\n switchLanguage: switchLanguage\n});\n\n//# sourceURL=webpack:///./src/common.js?");
/***/ }),

View File

@ -1,26 +1,37 @@
package com.github.kfcfans.powerjob.worker.common.utils;
import java.util.LinkedHashMap;
import java.util.Map;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.function.BiConsumer;
/**
* LRULeast Recently Used 缓存
* before v3.1.1 使用 LinkedHashMap但存在修改时访问报错问题改用 Guava
*
* @author tjq
* @since 2020/4/8
*/
public class LRUCache<K, V> extends LinkedHashMap<K, V> {
public class LRUCache<K, V> {
private final int cacheSize;
private final Cache<K, V> innerCache;
public LRUCache(int cacheSize) {
super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, false);
this.cacheSize = cacheSize;
innerCache = CacheBuilder.newBuilder()
.concurrencyLevel(2)
.initialCapacity(cacheSize)
.build();
}
@Override
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
// 超过阈值时返回true进行LRU淘汰
return size() > cacheSize;
public void forEach(BiConsumer<? super K, ? super V> action) {
innerCache.asMap().forEach(action);
}
public V get(K key) {
return innerCache.getIfPresent(key);
}
public void put(K key, V value) {
innerCache.put(key, value);
}
}

View File

@ -114,7 +114,7 @@ public class FrequentTaskTracker extends TaskTracker {
InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail();
BeanUtils.copyProperties(subInstanceInfo, subDetail);
InstanceStatus status = InstanceStatus.of(subInstanceInfo.status);
subDetail.setStatus(status.getDes());
subDetail.setStatus(status.getV());
subDetail.setSubInstanceId(subId);
// 设置时间
@ -347,8 +347,8 @@ public class FrequentTaskTracker extends TaskTracker {
subInstanceId2TimeHolder.remove(subInstanceId);
// 更新缓存数据
if (recentSubInstanceInfo.containsKey(subInstanceId)) {
SubInstanceInfo subInstanceInfo = recentSubInstanceInfo.get(subInstanceId);
if (subInstanceInfo != null) {
subInstanceInfo.status = success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV();
subInstanceInfo.result = result;
subInstanceInfo.finishedTime = System.currentTimeMillis();