Merge branch 'v3.3.3' into jenkins_auto_build

This commit is contained in:
KFCFans 2020-11-21 17:09:02 +08:00
commit 8b30881b43
15 changed files with 65 additions and 129 deletions

View File

@ -56,7 +56,7 @@ public class OhMyClient {
*/ */
public OhMyClient(List<String> addressList, String appName, String password) { public OhMyClient(List<String> addressList, String appName, String password) {
CommonUtils.requireNonNull(addressList, "domain can't be null!"); CommonUtils.requireNonNull(addressList, "addressList can't be null!");
CommonUtils.requireNonNull(appName, "appName can't be null"); CommonUtils.requireNonNull(appName, "appName can't be null");
allAddress = addressList; allAddress = addressList;
@ -81,7 +81,7 @@ public class OhMyClient {
if (StringUtils.isEmpty(currentAddress)) { if (StringUtils.isEmpty(currentAddress)) {
throw new PowerJobException("no server available"); throw new PowerJobException("no server available");
} }
log.info("[OhMyClient] {}'s oms-client bootstrap successfully, using server: {}", appName, currentAddress); log.info("[OhMyClient] {}'s OhMyClient bootstrap successfully, using server: {}", appName, currentAddress);
} }
private static String assertApp(String appName, String password, String url) throws IOException { private static String assertApp(String appName, String password, String url) throws IOException {

View File

@ -28,7 +28,7 @@ public class SwaggerConfig {
.description("Distributed scheduling and computing framework.") .description("Distributed scheduling and computing framework.")
.license("Apache Licence 2") .license("Apache Licence 2")
.termsOfServiceUrl("https://github.com/KFCFans/PowerJob") .termsOfServiceUrl("https://github.com/KFCFans/PowerJob")
.version("3.1.3") .version("3.3.3")
.build(); .build();
return new Docket(DocumentationType.SWAGGER_2) return new Docket(DocumentationType.SWAGGER_2)

View File

@ -50,7 +50,7 @@ public class MailAlarmService implements Alarmable {
javaMailSender.send(sm); javaMailSender.send(sm);
}catch (Exception e) { }catch (Exception e) {
log.error("[MailAlarmService] send mail failed, reason is {}", e.getMessage()); log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage());
} }
} }

View File

@ -12,6 +12,7 @@ import com.github.kfcfans.powerjob.server.service.lock.LockService;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -20,6 +21,7 @@ import java.util.Date;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -37,17 +39,25 @@ public class ServerSelectService {
@Resource @Resource
private AppInfoRepository appInfoRepository; private AppInfoRepository appInfoRepository;
@Value("${oms.accurate.select.server.percentage}")
private int accurateSelectServerPercentage;
private static final int RETRY_TIMES = 10; private static final int RETRY_TIMES = 10;
private static final long PING_TIMEOUT_MS = 1000; private static final long PING_TIMEOUT_MS = 1000;
private static final String SERVER_ELECT_LOCK = "server_elect_%d"; private static final String SERVER_ELECT_LOCK = "server_elect_%d";
/**
* 获取某个应用对应的Server public String getServer(Long appId, String currentServer) {
* if (!accurate()) {
* @param appId 应用ID // 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
* @return 当前可用的Server if (OhMyServer.getActorSystemAddress().equals(currentServer)) {
*/ return currentServer;
public String getServer(Long appId) { }
}
return getServer0(appId);
}
private String getServer0(Long appId) {
Set<String> downServerCache = Sets.newHashSet(); Set<String> downServerCache = Sets.newHashSet();
@ -95,7 +105,7 @@ public class ServerSelectService {
lockService.unlock(lockName); lockService.unlock(lockName);
} }
} }
throw new RuntimeException("server elect failed for app " + appId); throw new PowerJobException("server elect failed for app " + appId);
} }
/** /**
@ -113,6 +123,10 @@ public class ServerSelectService {
return false; return false;
} }
if (OhMyServer.getActorSystemAddress().equals(serverAddress)) {
return true;
}
Ping ping = new Ping(); Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis()); ping.setCurrentTime(System.currentTimeMillis());
@ -128,4 +142,8 @@ public class ServerSelectService {
downServerCache.add(serverAddress); downServerCache.add(serverAddress);
return false; return false;
} }
private boolean accurate() {
return ThreadLocalRandom.current().nextInt(100) < accurateSelectServerPercentage;
}
} }

View File

@ -2,26 +2,20 @@ package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.ha.ClusterStatusHolder;
import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService; import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService; import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import com.taobao.api.internal.cluster.ClusterManager;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.TimeZone; import java.util.TimeZone;
@ -40,8 +34,6 @@ public class ServerController {
private ServerSelectService serverSelectService; private ServerSelectService serverSelectService;
@Resource @Resource
private AppInfoRepository appInfoRepository; private AppInfoRepository appInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@GetMapping("/assert") @GetMapping("/assert")
public ResultDTO<Long> assertAppName(String appName) { public ResultDTO<Long> assertAppName(String appName) {
@ -52,13 +44,7 @@ public class ServerController {
@GetMapping("/acquire") @GetMapping("/acquire")
public ResultDTO<String> acquireServer(Long appId, String currentServer) { public ResultDTO<String> acquireServer(Long appId, String currentServer) {
return ResultDTO.success(serverSelectService.getServer(appId, currentServer));
// 如果是本机就不需要查数据库那么复杂的操作了直接返回成功
if (OhMyServer.getActorSystemAddress().equals(currentServer)) {
return ResultDTO.success(currentServer);
}
String server = serverSelectService.getServer(appId);
return ResultDTO.success(server);
} }
@GetMapping("/hello") @GetMapping("/hello")

View File

@ -33,3 +33,6 @@ oms.container.retention.remote=-1
####### 缓存配置 ####### ####### 缓存配置 #######
oms.instance.metadata.cache.size=1024 oms.instance.metadata.cache.size=1024
####### 精确获取 server 的百分比0100100代表每次 worker 获取 server 都会进行完整的探活流程,不存在脑裂问题,但有性能开销 #######
oms.accurate.select.server.percentage = 50

View File

@ -33,3 +33,6 @@ oms.container.retention.remote=-1
####### 缓存配置 ####### ####### 缓存配置 #######
oms.instance.metadata.cache.size=1024 oms.instance.metadata.cache.size=1024
####### 精确获取 server 的百分比0100100代表每次 worker 获取 server 都会进行完整的探活流程,不存在脑裂问题,但有性能开销 #######
oms.accurate.select.server.percentage = 50

View File

@ -33,3 +33,6 @@ oms.container.retention.remote=-1
####### 缓存配置 ####### ####### 缓存配置 #######
oms.instance.metadata.cache.size=2048 oms.instance.metadata.cache.size=2048
####### 精确获取 server 的百分比0100100代表每次 worker 获取 server 都会进行完整的探活流程,不存在脑裂问题,但有性能开销 #######
oms.accurate.select.server.percentage = 50

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.worker.autoconfigure; package com.github.kfcfans.powerjob.worker.autoconfigure;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
@ -37,8 +38,14 @@ public class PowerJobAutoConfiguration {
// 1. 创建配置文件 // 1. 创建配置文件
OhMyConfig config = new OhMyConfig(); OhMyConfig config = new OhMyConfig();
// 可以不显式设置默认值 27777
config.setPort(worker.getAkkaPort()); // 端口配置支持随机端口
int port = worker.getAkkaPort();
if (port <= 0) {
port = NetUtils.getRandomPort();
}
config.setPort(port);
// appName需要提前在控制台注册否则启动报错 // appName需要提前在控制台注册否则启动报错
config.setAppName(worker.getAppName()); config.setAppName(worker.getAppName());
config.setServerAddress(serverAddress); config.setServerAddress(serverAddress);

View File

@ -50,60 +50,6 @@
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy", "type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"description": "本地持久化方式,默认使用磁盘", "description": "本地持久化方式,默认使用磁盘",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker" "sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties$Worker"
},
{
"name": "powerjob.akka-port",
"type": "java.lang.Integer",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.akka-port"
}
},
{
"name": "powerjob.app-name",
"type": "java.lang.String",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.app-name"
}
},
{
"name": "powerjob.enable-test-mode",
"type": "java.lang.Boolean",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.enable-test-mode"
}
},
{
"name": "powerjob.max-result-length",
"type": "java.lang.Integer",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.max-result-length"
}
},
{
"name": "powerjob.server-address",
"type": "java.lang.String",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.server-address"
}
},
{
"name": "powerjob.store-strategy",
"type": "com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy",
"sourceType": "com.github.kfcfans.powerjob.worker.autoconfigure.PowerJobProperties",
"deprecated": true,
"deprecation": {
"replacement": "powerjob.worker.store-strategy"
}
} }
], ],
"hints": [] "hints": []

View File

@ -20,7 +20,7 @@ import com.github.kfcfans.powerjob.worker.background.OmsLogHandler;
import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService; import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter; import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.OmsBannerPrinter; import com.github.kfcfans.powerjob.worker.common.PowerBannerPrinter;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils; import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
@ -80,7 +80,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker..."); log.info("[OhMyWorker] start to initialize OhMyWorker...");
try { try {
OmsBannerPrinter.print(); PowerBannerPrinter.print();
// 校验 appName // 校验 appName
if (!config.isEnableTestMode()) { if (!config.isEnableTestMode()) {
appId = assertAppName(); appId = assertAppName();

View File

@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OmsWorkerVersion; import com.github.kfcfans.powerjob.worker.common.PowerJobWorkerVersion;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
@ -40,7 +40,7 @@ public class WorkerHealthReporter implements Runnable {
heartbeat.setAppName(OhMyWorker.getConfig().getAppName()); heartbeat.setAppName(OhMyWorker.getConfig().getAppName());
heartbeat.setAppId(OhMyWorker.getAppId()); heartbeat.setAppId(OhMyWorker.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis()); heartbeat.setHeartbeatTime(System.currentTimeMillis());
heartbeat.setVersion(OmsWorkerVersion.getVersion()); heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
// 获取当前加载的容器列表 // 获取当前加载的容器列表
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos()); heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.powerjob.worker.common; package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.List; import java.util.List;
@ -15,6 +15,7 @@ import java.util.List;
* @author tjq * @author tjq
* @since 2020/3/16 * @since 2020/3/16
*/ */
@Getter
@Setter @Setter
public class OhMyConfig { public class OhMyConfig {
/** /**
@ -48,35 +49,4 @@ public class OhMyConfig {
* true -> 用于本地写单元测试调试 false -> 默认值标准模式 * true -> 用于本地写单元测试调试 false -> 默认值标准模式
*/ */
private boolean enableTestMode = false; private boolean enableTestMode = false;
public String getAppName() {
return appName;
}
public int getPort() {
if (port > 0) {
return port;
}
return NetUtils.getRandomPort();
}
public List<String> getServerAddress() {
return serverAddress;
}
public StoreStrategy getStoreStrategy() {
return storeStrategy;
}
public int getMaxResultLength() {
return maxResultLength;
}
public Object getUserContext() {
return userContext;
}
public boolean isEnableTestMode() {
return enableTestMode;
}
} }

View File

@ -9,7 +9,7 @@ import lombok.extern.slf4j.Slf4j;
* @since 2020/5/11 * @since 2020/5/11
*/ */
@Slf4j @Slf4j
public final class OmsBannerPrinter { public final class PowerBannerPrinter {
private static final String BANNER = "" + private static final String BANNER = "" +
"\n" + "\n" +
@ -30,7 +30,7 @@ public final class OmsBannerPrinter {
public static void print() { public static void print() {
log.info(BANNER); log.info(BANNER);
String version = OmsWorkerVersion.getVersion(); String version = PowerJobWorkerVersion.getVersion();
version = (version != null) ? " (v" + version + ")" : ""; version = (version != null) ? " (v" + version + ")" : "";
log.info(":: PowerJob Worker :: {}", version); log.info(":: PowerJob Worker :: {}", version);
} }

View File

@ -17,7 +17,7 @@ import java.util.jar.JarFile;
* @author tjq * @author tjq
* @since 2020/5/11 * @since 2020/5/11
*/ */
public final class OmsWorkerVersion { public final class PowerJobWorkerVersion {
private static String CACHE = null; private static String CACHE = null;
@ -29,17 +29,17 @@ public final class OmsWorkerVersion {
*/ */
public static String getVersion() { public static String getVersion() {
if (StringUtils.isEmpty(CACHE)) { if (StringUtils.isEmpty(CACHE)) {
CACHE = determineSpringBootVersion(); CACHE = determinePowerJobVersion();
} }
return CACHE; return CACHE;
} }
private static String determineSpringBootVersion() { private static String determinePowerJobVersion() {
String implementationVersion = OmsWorkerVersion.class.getPackage().getImplementationVersion(); String implementationVersion = PowerJobWorkerVersion.class.getPackage().getImplementationVersion();
if (implementationVersion != null) { if (implementationVersion != null) {
return implementationVersion; return implementationVersion;
} }
CodeSource codeSource = OmsWorkerVersion.class.getProtectionDomain().getCodeSource(); CodeSource codeSource = PowerJobWorkerVersion.class.getProtectionDomain().getCodeSource();
if (codeSource == null) { if (codeSource == null) {
return null; return null;
} }