diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 873c16d5..28d2b152 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-client - 3.4.6 + 3.4.7 jar 5.6.1 1.2.68 - 3.4.6 + 3.4.7 3.2.4 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index fe718257..a45cd012 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.4.6 + 3.4.7 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/SystemMetrics.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/SystemMetrics.java index 6a065789..f2d48849 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/SystemMetrics.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/SystemMetrics.java @@ -47,7 +47,12 @@ public class SystemMetrics implements OmsSerializable, Comparable * Used disk ratio. */ private double diskUsage; - + /** + * user-customized system metrics collector, eg. GPU usage + * implement SystemMetricsCollector to set the value in worker side + * implement WorkerFilter to filter the worker in server side + */ + private String extra; /** * Score of cache. */ diff --git a/powerjob-official-processors/pom.xml b/powerjob-official-processors/pom.xml index b28b61fc..5e11dc48 100644 --- a/powerjob-official-processors/pom.xml +++ b/powerjob-official-processors/pom.xml @@ -20,7 +20,7 @@ 5.6.1 1.2.3 - 3.4.6 + 3.4.7 1.2.68 diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 9aac61ce..263a1930 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.4.6 + 3.4.7 jar 2.9.2 2.3.4.RELEASE - 3.4.6 + 3.4.7 8.0.19 19.7.0.0 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/WorkerFilter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/WorkerFilter.java new file mode 100644 index 00000000..adbecb45 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/extension/WorkerFilter.java @@ -0,0 +1,21 @@ +package com.github.kfcfans.powerjob.server.extension; + +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; + +/** + * filter worker by system metrics or other info + * + * @author tjq + * @since 2021/2/16 + */ +public interface WorkerFilter { + + /** + * + * @param workerInfo worker info, maybe you need to use your customized info in SystemMetrics#extra + * @param jobInfoDO job info + * @return true will remove the worker in process list + */ + boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfoDO); +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java index 2c246c1b..b6d8cef1 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/FriendRequestHandler.java @@ -2,13 +2,14 @@ package com.github.kfcfans.powerjob.server.remote.server; import akka.actor.AbstractActor; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.server.common.utils.SpringUtils; import com.github.kfcfans.powerjob.server.remote.server.request.FriendQueryWorkerClusterStatusReq; import com.github.kfcfans.powerjob.server.remote.server.request.Ping; import com.github.kfcfans.powerjob.server.remote.server.request.RemoteProcessReq; +import com.github.kfcfans.powerjob.server.remote.transport.TransportService; import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerClusterManagerService; +import com.github.kfcfans.powerjob.server.remote.worker.cluster.WorkerInfo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.util.ReflectionUtils; @@ -38,7 +39,7 @@ public class FriendRequestHandler extends AbstractActor { * 处理存活检测的请求 */ private void onReceivePing(Ping ping) { - getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf()); + getSender().tell(AskResponse.succeed(TransportService.getAllAddress()), getSelf()); } /** diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java index 716ba850..236b24a3 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/server/election/DefaultServerElectionService.java @@ -2,9 +2,11 @@ package com.github.kfcfans.powerjob.server.remote.server.election; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.alibaba.fastjson.JSONObject; import com.github.kfcfans.powerjob.common.PowerJobException; import com.github.kfcfans.powerjob.common.Protocol; import com.github.kfcfans.powerjob.common.response.AskResponse; +import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.server.extension.LockService; import com.github.kfcfans.powerjob.server.extension.ServerElectionService; import com.github.kfcfans.powerjob.server.remote.server.request.Ping; @@ -75,8 +77,9 @@ public class DefaultServerElectionService implements ServerElectionService { } String appName = appInfoOpt.get().getAppName(); String originServer = appInfoOpt.get().getCurrentServer(); - if (isActive(originServer, downServerCache)) { - return originServer; + String activeAddress = activeAddress(originServer, downServerCache, protocol); + if (StringUtils.isNotEmpty(activeAddress)) { + return activeAddress; } // 无可用Server,重新进行Server选举,需要加锁 @@ -93,8 +96,9 @@ public class DefaultServerElectionService implements ServerElectionService { // 可能上一台机器已经完成了Server选举,需要再次判断 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database.")); - if (isActive(appInfo.getCurrentServer(), downServerCache)) { - return appInfo.getCurrentServer(); + String address = activeAddress(originServer, downServerCache, protocol); + if (StringUtils.isNotEmpty(address)) { + return address; } // 篡位,本机作为Server @@ -118,15 +122,16 @@ public class DefaultServerElectionService implements ServerElectionService { * 判断指定server是否存活 * @param serverAddress 需要检测的server地址 * @param downServerCache 缓存,防止多次发送PING(这个QPS其实还蛮爆表的...) - * @return true 存活 / false down机 + * @param protocol 协议,用于返回指定的地址 + * @return null or address */ - private boolean isActive(String serverAddress, Set downServerCache) { + private String activeAddress(String serverAddress, Set downServerCache, String protocol) { if (downServerCache.contains(serverAddress)) { - return false; + return null; } if (StringUtils.isEmpty(serverAddress)) { - return false; + return null; } Ping ping = new Ping(); @@ -137,12 +142,14 @@ public class DefaultServerElectionService implements ServerElectionService { CompletionStage askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); downServerCache.remove(serverAddress); - return response.isSuccess(); + if (response.isSuccess()) { + return JsonUtils.parseObject(response.getData(), JSONObject.class).getString(protocol); + } }catch (Exception e) { log.warn("[ServerElection] server({}) was down.", serverAddress); } downServerCache.add(serverAddress); - return false; + return null; } private boolean accurate() { @@ -151,6 +158,6 @@ public class DefaultServerElectionService implements ServerElectionService { private String getProtocolServerAddress(String protocol) { Protocol pt = Protocol.of(protocol); - return transportService.getTransporter(pt).getAddress(); + return TransportService.getAllAddress().get(pt); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java index 53c764e5..bfd6b947 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/TransportService.java @@ -22,6 +22,8 @@ import java.util.Map; @Service public class TransportService { + private static final Map protocol2Address = Maps.newHashMap(); + @Getter private final Map protocol2Transporter = Maps.newConcurrentMap(); @@ -30,6 +32,7 @@ public class TransportService { transporters.forEach(t -> { log.info("[TransportService] Transporter[protocol:{},address:{}] registration successful!", t.getProtocol(), t.getAddress()); protocol2Transporter.put(t.getProtocol(), t); + protocol2Address.put(t.getProtocol(), t.getAddress()); }); } @@ -56,4 +59,8 @@ public class TransportService { super(message); } } + + public static Map getAllAddress() { + return protocol2Address; + } } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index bb286003..d99fa81a 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml ####### Database properties(Configure according to the the environment) ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai +spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3307/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index fc9097fc..0fc5b74a 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,18 +10,18 @@ 4.0.0 powerjob-worker-agent - 3.4.6 + 3.4.7 jar - 3.4.6 + 3.4.7 1.2.3 4.3.2 2.2.6.RELEASE - 1.0.0 + 1.0.1 diff --git a/powerjob-worker-agent/src/main/java/com/github/kfcfans/powerjob/worker/MainApplication.java b/powerjob-worker-agent/src/main/java/com/github/kfcfans/powerjob/worker/MainApplication.java index caebd148..a51d3b71 100644 --- a/powerjob-worker-agent/src/main/java/com/github/kfcfans/powerjob/worker/MainApplication.java +++ b/powerjob-worker-agent/src/main/java/com/github/kfcfans/powerjob/worker/MainApplication.java @@ -17,7 +17,7 @@ import picocli.CommandLine.Option; * @since 2020/5/20 */ @Slf4j -@Command(name = "OhMyAgent", mixinStandardHelpOptions = true, version = "3.4.3", description = "powerjob-worker agent") +@Command(name = "OhMyAgent", mixinStandardHelpOptions = true, version = "3.4.7", description = "powerjob-worker agent") public class MainApplication implements Runnable { @Option(names = {"-a", "--app"}, description = "worker-agent's name", required = true) diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 9526b515..b825734f 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.4.6 + 3.4.7 2.2.6.RELEASE - 3.4.6 + 3.4.7 1.2.68 diff --git a/powerjob-worker-spring-boot-starter/pom.xml b/powerjob-worker-spring-boot-starter/pom.xml index 98432094..bfa72297 100644 --- a/powerjob-worker-spring-boot-starter/pom.xml +++ b/powerjob-worker-spring-boot-starter/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-spring-boot-starter - 3.4.6 + 3.4.7 jar - 3.4.6 + 3.4.7 2.2.6.RELEASE diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index b4c6758a..43430970 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.4.6 + 3.4.7 jar 5.2.4.RELEASE - 3.4.6 + 3.4.7 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java index fbad814d..d50f6e75 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java @@ -33,7 +33,14 @@ public class WorkerHealthReporter implements Runnable { return; } - SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics(); + SystemMetrics systemMetrics; + + if (OhMyWorker.getConfig().getSystemMetricsCollector() == null) { + systemMetrics = SystemInfoUtils.getSystemMetrics(); + } else { + systemMetrics = OhMyWorker.getConfig().getSystemMetricsCollector().collect(); + } + WorkerHeartbeat heartbeat = new WorkerHeartbeat(); heartbeat.setSystemMetrics(systemMetrics); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java index b1007279..4fef6213 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/OhMyConfig.java @@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.common; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; +import com.github.kfcfans.powerjob.worker.extension.SystemMetricsCollector; import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; @@ -53,4 +54,7 @@ public class OhMyConfig { * Test mode is used for conditions that your have no powerjob-server in your develop env so you can't startup the application */ private boolean enableTestMode = false; + + private SystemMetricsCollector systemMetricsCollector; + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java index 7d070a3e..9b84bb5f 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java @@ -8,8 +8,6 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; import java.io.*; import java.net.URL; @@ -46,9 +44,6 @@ public abstract class ScriptProcessor implements BasicProcessor { File dir = new File(script.getParent()); boolean success = dir.mkdirs(); - if (!success) { - throw new RuntimeException("create script folder failed."); - } success = script.createNewFile(); if (!success) { throw new RuntimeException("create script file failed"); @@ -75,18 +70,11 @@ public abstract class ScriptProcessor implements BasicProcessor { OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info("SYSTEM===> ScriptProcessor start to process"); - - if (SystemUtils.IS_OS_WINDOWS) { - if (StringUtils.equals(fetchRunCommand(), "/bin/bash")) { - omsLogger.warn("Current OS is {} where shell scripts cannot run.", SystemUtils.OS_NAME); - return new ProcessResult(false, "Shell scripts cannot run on Windows"); - } - } else { - // 1. 授权 - ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodPb.start().waitFor(); - } + + // 1. 授权 + ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) + chmodPb.start().waitFor(); // 2. 执行目标脚本 ProcessBuilder pb = new ProcessBuilder(fetchRunCommand(), scriptPath); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index 4efc5430..7eafa6e3 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -312,6 +312,8 @@ public abstract class TaskTracker { subTask.setSubInstanceId(subInstanceId); subTask.setTaskName(TaskConstant.BROADCAST_TASK_NAME); subTask.setTaskId(preTaskId + "." + i); + // 广播任务直接写入派发地址 + subTask.setAddress(allWorkerAddress.get(i)); subTaskList.add(subTask); } submitTask(subTaskList); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/extension/SystemMetricsCollector.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/extension/SystemMetricsCollector.java new file mode 100644 index 00000000..ab2c78f0 --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/extension/SystemMetricsCollector.java @@ -0,0 +1,18 @@ +package com.github.kfcfans.powerjob.worker.extension; + +import com.github.kfcfans.powerjob.common.model.SystemMetrics; + +/** + * user-customized system metrics collector + * + * @author tjq + * @since 2021/2/16 + */ +public interface SystemMetricsCollector { + + /** + * SystemMetrics, you can put your custom metrics info in the 'extra' param + * @return SystemMetrics + */ + SystemMetrics collect(); +}