Merge branch 'v3.4.7' into jenkins_auto_build

This commit is contained in:
tjq 2021-02-23 00:47:11 +08:00
commit a900704669
20 changed files with 109 additions and 49 deletions

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
<powerjob.common.version>3.4.6</powerjob.common.version>
<powerjob.common.version>3.4.7</powerjob.common.version>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
</properties>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>

View File

@ -47,7 +47,12 @@ public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics>
* Used disk ratio.
*/
private double diskUsage;
/**
* user-customized system metrics collector, eg. GPU usage
* implement SystemMetricsCollector to set the value in worker side
* implement WorkerFilter to filter the worker in server side
*/
private String extra;
/**
* Score of cache.
*/

View File

@ -20,7 +20,7 @@
<!-- 不会被打包的部分scope 只能是 test 或 provide -->
<junit.version>5.6.1</junit.version>
<logback.version>1.2.3</logback.version>
<powerjob.worker.version>3.4.6</powerjob.worker.version>
<powerjob.worker.version>3.4.7</powerjob.worker.version>
<!-- 全部 shade 化,避免依赖冲突 -->
<fastjson.version>1.2.68</fastjson.version>

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<powerjob.common.version>3.4.6</powerjob.common.version>
<powerjob.common.version>3.4.7</powerjob.common.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>

View File

@ -0,0 +1,21 @@
package com.github.kfcfans.powerjob.server.extension;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
/**
* filter worker by system metrics or other info
*
* @author tjq
* @since 2021/2/16
*/
public interface WorkerFilter {
/**
*
* @param workerInfo worker info, maybe you need to use your customized info in SystemMetrics#extra
* @param jobInfoDO job info
* @return true will remove the worker in process list
*/
boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO);
}

View File

@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.server.remote.server;
import akka.actor.AbstractActor;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq;
import com.github.kfcfans.powerjob.server.remote.transport.TransportService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService;
import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.ReflectionUtils;
@ -38,7 +39,7 @@ public class FriendRequestHandler extends AbstractActor {
* 处理存活检测的请求
*/
private void onReceivePing(Ping ping) {
getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf());
getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf());
}
/**

View File

@ -2,9 +2,11 @@ package com.github.kfcfans.powerjob.server.remote.server.election;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.extension.ServerElectionService;
import com.github.kfcfans.powerjob.server.remote.server.request.Ping;
@ -75,8 +77,9 @@ public class DefaultServerElectionService implements ServerElectionService {
}
String appName = appInfoOpt.get().getAppName();
String originServer = appInfoOpt.get().getCurrentServer();
if (isActive(originServer, downServerCache)) {
return originServer;
String activeAddress = activeAddress(originServer, downServerCache, protocol);
if (StringUtils.isNotEmpty(activeAddress)) {
return activeAddress;
}
// 无可用Server重新进行Server选举需要加锁
@ -93,8 +96,9 @@ public class DefaultServerElectionService implements ServerElectionService {
// 可能上一台机器已经完成了Server选举需要再次判断
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
if (isActive(appInfo.getCurrentServer(), downServerCache)) {
return appInfo.getCurrentServer();
String address = activeAddress(originServer, downServerCache, protocol);
if (StringUtils.isNotEmpty(address)) {
return address;
}
// 篡位本机作为Server
@ -118,15 +122,16 @@ public class DefaultServerElectionService implements ServerElectionService {
* 判断指定server是否存活
* @param serverAddress 需要检测的server地址
* @param downServerCache 缓存防止多次发送PING这个QPS其实还蛮爆表的...
* @return true 存活 / false down机
* @param protocol 协议用于返回指定的地址
* @return null or address
*/
private boolean isActive(String serverAddress, Set<String> downServerCache) {
private String activeAddress(String serverAddress, Set<String> downServerCache, String protocol) {
if (downServerCache.contains(serverAddress)) {
return false;
return null;
}
if (StringUtils.isEmpty(serverAddress)) {
return false;
return null;
}
Ping ping = new Ping();
@ -137,12 +142,14 @@ public class DefaultServerElectionService implements ServerElectionService {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
downServerCache.remove(serverAddress);
return response.isSuccess();
if (response.isSuccess()) {
return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol);
}
}catch (Exception e) {
log.warn("[ServerElection] server({}) was down.", serverAddress);
}
downServerCache.add(serverAddress);
return false;
return null;
}
private boolean accurate() {
@ -151,6 +158,6 @@ public class DefaultServerElectionService implements ServerElectionService {
private String getProtocolServerAddress(String protocol) {
Protocol pt = Protocol.of(protocol);
return transportService.getTransporter(pt).getAddress();
return TransportService.getAllAddress().get(pt);
}
}

View File

@ -22,6 +22,8 @@ import java.util.Map;
@Service
public class TransportService {
private static final Map<Protocol, String> protocol2Address = Maps.newHashMap();
@Getter
private final Map<Protocol, Transporter> protocol2Transporter = Maps.newConcurrentMap();
@ -30,6 +32,7 @@ public class TransportService {
transporters.forEach(t -> {
log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress());
protocol2Transporter.put(t.getProtocol(), t);
protocol2Address.put(t.getProtocol(), t.getAddress());
});
}
@ -56,4 +59,8 @@ public class TransportService {
super(message);
}
}
public static Map<Protocol, String> getAllAddress() {
return protocol2Address;
}
}

View File

@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml
####### Database properties(Configure according to the the environment) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3307/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20

View File

@ -10,18 +10,18 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.4.6</powerjob.worker.version>
<powerjob.worker.version>3.4.7</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>
<spring.boot.version>2.2.6.RELEASE</spring.boot.version>
<powerjob.official.processors.version>1.0.0</powerjob.official.processors.version>
<powerjob.official.processors.version>1.0.1</powerjob.official.processors.version>
</properties>
<dependencies>

View File

@ -17,7 +17,7 @@ import picocli.CommandLine.Option;
* @since 2020/5/20
*/
@Slf4j
@Command(name = "OhMyAgent", mixinStandardHelpOptions = true, version = "3.4.3", description = "powerjob-worker agent")
@Command(name = "OhMyAgent", mixinStandardHelpOptions = true, version = "3.4.7", description = "powerjob-worker agent")
public class MainApplication implements Runnable {
@Option(names = {"-a", "--app"}, description = "worker-agent's name", required = true)

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.starter.version>3.4.6</powerjob.worker.starter.version>
<powerjob.worker.starter.version>3.4.7</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.4.6</powerjob.worker.version>
<powerjob.worker.version>3.4.7</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.4.6</version>
<version>3.4.7</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.4.6</powerjob.common.version>
<powerjob.common.version>3.4.7</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -33,7 +33,14 @@ public class WorkerHealthReporter implements Runnable {
return;
}
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
SystemMetrics systemMetrics;
if (OhMyWorker.getConfig().getSystemMetricsCollector() == null) {
systemMetrics = SystemInfoUtils.getSystemMetrics();
} else {
systemMetrics = OhMyWorker.getConfig().getSystemMetricsCollector().collect();
}
WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics);

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.common;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
@ -53,4 +54,7 @@ public class OhMyConfig {
* Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application
*/
private boolean enableTestMode = false;
private SystemMetricsCollector systemMetricsCollector;
}

View File

@ -8,8 +8,6 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.*;
import java.net.URL;
@ -46,9 +44,6 @@ public abstract class ScriptProcessor implements BasicProcessor {
File dir = new File(script.getParent());
boolean success = dir.mkdirs();
if (!success) {
throw new RuntimeException("create script folder failed.");
}
success = script.createNewFile();
if (!success) {
throw new RuntimeException("create script file failed");
@ -75,18 +70,11 @@ public abstract class ScriptProcessor implements BasicProcessor {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("SYSTEM===> ScriptProcessor start to process");
if (SystemUtils.IS_OS_WINDOWS) {
if (StringUtils.equals(fetchRunCommand(), "/bin/bash")) {
omsLogger.warn("Current OS is {} where shell scripts cannot run.", SystemUtils.OS_NAME);
return new ProcessResult(false, "Shell scripts cannot run on Windows");
}
} else {
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodPb.start().waitFor();
}
// 1. 授权
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodPb.start().waitFor();
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath);

View File

@ -312,6 +312,8 @@ public abstract class TaskTracker {
subTask.setSubInstanceId(subInstanceId);
subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME);
subTask.setTaskId(preTaskId + "." + i);
// 广播任务直接写入派发地址
subTask.setAddress(allWorkerAddress.get(i));
subTaskList.add(subTask);
}
submitTask(subTaskList);

View File

@ -0,0 +1,18 @@
package com.github.kfcfans.powerjob.worker.extension;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
/**
* user-customized system metrics collector
*
* @author tjq
* @since 2021/2/16
*/
public interface SystemMetricsCollector {
/**
* SystemMetrics, you can put your custom metrics info in the 'extra' param
* @return SystemMetrics
*/
SystemMetrics collect();
}