diff --git a/.gitignore b/.gitignore index a1c2a238..5a7e1cf9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,23 +1,36 @@ -# Compiled class file -*.class +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/** +!**/src/test/** -# Log file -*.log +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache -# BlueJ files -*.ctxt +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr -# Mobile Tools for Java (J2ME) -.mtj.tmp/ +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ -# Package Files # +### VS Code ### +.vscode/ + +### others ### *.jar -*.war -*.nar -*.ear -*.zip -*.tar.gz -*.rar - -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* +*.log +*/.DS_Store diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..5c98b428 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,2 @@ +# Default ignored files +/workspace.xml \ No newline at end of file diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 00000000..a0f9964c --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +oh-my-scheduler \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 00000000..28802bf0 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/oh-my-scheduler-common/pom.xml b/oh-my-scheduler-common/pom.xml new file mode 100644 index 00000000..f3a9cae9 --- /dev/null +++ b/oh-my-scheduler-common/pom.xml @@ -0,0 +1,17 @@ + + + + oh-my-scheduler + com.github.kfcfans + 1.0.0-SNAPSHOT + + + 4.0.0 + oh-my-scheduler-common + 1.0.0-SNAPSHOT + jar + + + \ No newline at end of file diff --git a/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java new file mode 100644 index 00000000..73755a0f --- /dev/null +++ b/oh-my-scheduler-common/src/main/java/com/github/kfcfans/common/ExecuteType.java @@ -0,0 +1,13 @@ +package com.github.kfcfans.common; + +/** + * 任务执行类型 + * + * @author tjq + * @since 2020/3/17 + */ +public enum ExecuteType { + STANDALONE, + BROADCAST, + MAP_REDUCE +} diff --git a/oh-my-scheduler-server/pom.xml b/oh-my-scheduler-server/pom.xml new file mode 100644 index 00000000..9c369663 --- /dev/null +++ b/oh-my-scheduler-server/pom.xml @@ -0,0 +1,17 @@ + + + + oh-my-scheduler + com.github.kfcfans + 1.0.0-SNAPSHOT + + + 4.0.0 + oh-my-scheduler-server + 1.0.0-SNAPSHOT + jar + + + \ No newline at end of file diff --git a/oh-my-scheduler-worker/pom.xml b/oh-my-scheduler-worker/pom.xml new file mode 100644 index 00000000..adaf49cc --- /dev/null +++ b/oh-my-scheduler-worker/pom.xml @@ -0,0 +1,89 @@ + + + + oh-my-scheduler + com.github.kfcfans + 1.0.0-SNAPSHOT + + + 4.0.0 + oh-my-scheduler-worker + 1.0.0-SNAPSHOT + jar + + + 5.2.4.RELEASE + 2.6.4 + 1.7.30 + 1.0.0-SNAPSHOT + 1.4.200 + 3.4.2 + 28.2-jre + + + + + + + org.springframework + spring-context + ${spring.version} + + + + + + com.typesafe.akka + akka-remote_2.13 + ${akka.version} + + + + + org.slf4j + slf4j-api + 1.7.30 + + + + + com.github.kfcfans + oh-my-scheduler-common + 1.0.0-SNAPSHOT + + + + + com.h2database + h2 + ${h2.db.version} + + + + com.zaxxer + HikariCP + ${hikaricp.version} + + + + + com.google.guava + guava + ${guava.version} + + + + + ch.qos.logback + logback-classic + 1.2.3 + + + + + + + + \ No newline at end of file diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java new file mode 100644 index 00000000..19a92f67 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/OhMyWorker.java @@ -0,0 +1,30 @@ +package com.github.kfcfans.oms.worker; + +import com.github.kfcfans.oms.worker.common.utils.SpringUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +/** + * 客户端启动类 + * + * @author KFCFans + * @since 2020/3/16 + */ +public class OhMyWorker implements ApplicationContextAware, InitializingBean { + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + SpringUtils.inject(applicationContext); + } + + @Override + public void afterPropertiesSet() throws Exception { + + } + + public void init() { + + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java new file mode 100644 index 00000000..cbbdbb2c --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/ServerRequestActor.java @@ -0,0 +1,30 @@ +package com.github.kfcfans.oms.worker.actors; + +import akka.actor.AbstractActor; +import com.github.kfcfans.oms.worker.pojo.request.ServerScheduleJobReq; +import lombok.extern.slf4j.Slf4j; + +/** + * 处理来自服务器的请求 + * 请求链:server -> taskTracker -> worker + * + * @author tjq + * @since 2020/3/17 + */ +@Slf4j +public class ServerRequestActor extends AbstractActor { + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq) + .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) + .build(); + } + + private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) { + // 接受到任务,创建 TaskTracker + } + + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java new file mode 100644 index 00000000..2f0717ce --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/actors/TaskTrackerRequestActor.java @@ -0,0 +1,16 @@ +package com.github.kfcfans.oms.worker.actors; + +import akka.actor.AbstractActor; + +/** + * 处理来自 TaskTracker 的请求 + * + * @author tjq + * @since 2020/3/17 + */ +public class TaskTrackerRequestActor extends AbstractActor { + @Override + public Receive createReceive() { + return null; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java new file mode 100644 index 00000000..0e888878 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/OhMyConfig.java @@ -0,0 +1,20 @@ +package com.github.kfcfans.oms.worker.common; + +import java.util.Set; + +/** + * Worker 配置文件 + * + * @author tjq + * @since 2020/3/16 + */ +public class OhMyConfig { + /** + * 应用名称 + */ + private String appName; + /** + * 调度服务器地址,ip:port (多值使用 , 分隔) + */ + private String serverAddress; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java new file mode 100644 index 00000000..41ee4427 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/CommonSJ.java @@ -0,0 +1,13 @@ +package com.github.kfcfans.oms.worker.common.constants; + +import com.google.common.base.Splitter; + +/** + * splitter & joiner + * + * @author tjq + * @since 2020/3/17 + */ +public class CommonSJ { + public static final Splitter commaSplitter = Splitter.on(","); +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskConstant.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskConstant.java new file mode 100644 index 00000000..bb01c8da --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskConstant.java @@ -0,0 +1,14 @@ +package com.github.kfcfans.oms.worker.common.constants; + +/** + * task 常熟 + * + * @author tjq + * @since 2020/3/17 + */ +public class TaskConstant { + + public static final String ROOT_TASK_NAME = "OMS_ROOT_TASK"; + + public static final String ROOT_TASK_ID = "0"; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java new file mode 100644 index 00000000..cfdd886a --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/constants/TaskStatus.java @@ -0,0 +1,28 @@ +package com.github.kfcfans.oms.worker.common.constants; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * 任务状态,task_info 表中 status 字段的枚举值 + * + * @author tjq + * @since 2020/3/17 + */ +@Getter +@AllArgsConstructor +public enum TaskStatus { + + /* ******************* TaskTracker 专用 ******************* */ + WAITING_DISPATCH(1, "等待调度器调度"), + DISPATCH_SUCCESS(2, "调度成功"), + DISPATCH_FAILED(3, "调度失败"), + WORKER_PROCESS_SUCCESS(4, "worker执行成功"), + WORKER_PROCESS_FAILED(5, "worker执行失败"); + + + /* ******************* Worker 专用 ******************* */ + + private int value; + private String des; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/NetUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/NetUtils.java new file mode 100644 index 00000000..3f8348aa --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/NetUtils.java @@ -0,0 +1,460 @@ +package com.github.kfcfans.oms.worker.common.utils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.*; +import java.util.Enumeration; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; + +/** + * IP and Port Helper for RPC + * + * @author tjq borrowed from dubbo + * @since 2020/3/16 + */ +@Slf4j +@SuppressWarnings("all") +public class NetUtils { + + private static final String ANYHOST_VALUE = "0.0.0.0"; + private static final String LOCALHOST_KEY = "localhost"; + private static final String LOCALHOST_VALUE = "127.0.0.1"; + + // returned port range is [30000, 39999] + private static final int RND_PORT_START = 30000; + private static final int RND_PORT_RANGE = 10000; + + // valid port range is (0, 65535] + private static final int MIN_PORT = 0; + private static final int MAX_PORT = 65535; + + private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$"); + private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$"); + private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$"); + + private static volatile InetAddress LOCAL_ADDRESS = null; + + private static final String SPLIT_IPV4_CHARECTER = "\\."; + private static final String SPLIT_IPV6_CHARECTER = ":"; + + public static int getRandomPort() { + return RND_PORT_START + ThreadLocalRandom.current().nextInt(RND_PORT_RANGE); + } + + public static int getAvailablePort() { + try (ServerSocket ss = new ServerSocket()) { + ss.bind(null); + return ss.getLocalPort(); + } catch (IOException e) { + return getRandomPort(); + } + } + + public static int getAvailablePort(int port) { + if (port <= 0) { + return getAvailablePort(); + } + for (int i = port; i < MAX_PORT; i++) { + try (ServerSocket ss = new ServerSocket(i)) { + return i; + } catch (IOException e) { + // continue + } + } + return port; + } + + public static boolean isInvalidPort(int port) { + return port <= MIN_PORT || port > MAX_PORT; + } + + public static boolean isValidAddress(String address) { + return ADDRESS_PATTERN.matcher(address).matches(); + } + + public static boolean isLocalHost(String host) { + return host != null + && (LOCAL_IP_PATTERN.matcher(host).matches() + || host.equalsIgnoreCase(LOCALHOST_KEY)); + } + + public static boolean isAnyHost(String host) { + return ANYHOST_VALUE.equals(host); + } + + public static boolean isInvalidLocalHost(String host) { + return host == null + || host.length() == 0 + || host.equalsIgnoreCase(LOCALHOST_KEY) + || host.equals(ANYHOST_VALUE) + || (LOCAL_IP_PATTERN.matcher(host).matches()); + } + + public static boolean isValidLocalHost(String host) { + return !isInvalidLocalHost(host); + } + + public static InetSocketAddress getLocalSocketAddress(String host, int port) { + return isInvalidLocalHost(host) ? + new InetSocketAddress(port) : new InetSocketAddress(host, port); + } + + static boolean isValidV4Address(InetAddress address) { + if (address == null || address.isLoopbackAddress()) { + return false; + } + String name = address.getHostAddress(); + boolean result = (name != null + && IP_PATTERN.matcher(name).matches() + && !ANYHOST_VALUE.equals(name) + && !LOCALHOST_VALUE.equals(name)); + return result; + } + + /** + * Check if an ipv6 address + * + * @return true if it is reachable + */ + static boolean isPreferIPV6Address() { + boolean preferIpv6 = Boolean.getBoolean("java.net.preferIPv6Addresses"); + if (!preferIpv6) { + return false; + } + return false; + } + + /** + * normalize the ipv6 Address, convert scope name to scope id. + * e.g. + * convert + * fe80:0:0:0:894:aeec:f37d:23e1%en0 + * to + * fe80:0:0:0:894:aeec:f37d:23e1%5 + *

+ * The %5 after ipv6 address is called scope id. + * see java doc of {@link Inet6Address} for more details. + * + * @param address the input address + * @return the normalized address, with scope id converted to int + */ + static InetAddress normalizeV6Address(Inet6Address address) { + String addr = address.getHostAddress(); + int i = addr.lastIndexOf('%'); + if (i > 0) { + try { + return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId()); + } catch (UnknownHostException e) { + // ignore + log.debug("Unknown IPV6 address: ", e); + } + } + return address; + } + + /** + * 获取本机 IP 地址 + */ + public static String getLocalHost() { + InetAddress address = getLocalAddress(); + return address == null ? LOCALHOST_VALUE : address.getHostAddress(); + } + + + /** + * Find first valid IP from local network card + * + * @return first valid local IP + */ + public static InetAddress getLocalAddress() { + if (LOCAL_ADDRESS != null) { + return LOCAL_ADDRESS; + } + InetAddress localAddress = getLocalAddress0(); + LOCAL_ADDRESS = localAddress; + return localAddress; + } + + private static Optional toValidAddress(InetAddress address) { + if (address instanceof Inet6Address) { + Inet6Address v6Address = (Inet6Address) address; + if (isPreferIPV6Address()) { + return Optional.ofNullable(normalizeV6Address(v6Address)); + } + } + if (isValidV4Address(address)) { + return Optional.of(address); + } + return Optional.empty(); + } + + private static InetAddress getLocalAddress0() { + InetAddress localAddress = null; + try { + localAddress = InetAddress.getLocalHost(); + Optional addressOp = toValidAddress(localAddress); + if (addressOp.isPresent()) { + return addressOp.get(); + } + } catch (Throwable e) { + log.warn("[Triple]", e); + } + + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + if (null == interfaces) { + return localAddress; + } + while (interfaces.hasMoreElements()) { + try { + NetworkInterface network = interfaces.nextElement(); + if (network.isLoopback() || network.isVirtual() || !network.isUp()) { + continue; + } + Enumeration addresses = network.getInetAddresses(); + while (addresses.hasMoreElements()) { + try { + Optional addressOp = toValidAddress(addresses.nextElement()); + if (addressOp.isPresent()) { + try { + if(addressOp.get().isReachable(100)){ + return addressOp.get(); + } + } catch (IOException e) { + // ignore + } + } + } catch (Throwable e) { + log.warn("[Triple]", e); + } + } + } catch (Throwable e) { + log.warn("[Triple]", e); + } + } + } catch (Throwable e) { + log.warn("[Triple]", e); + } + return localAddress; + } + + public static String getHostName(String address) { + try { + int i = address.indexOf(':'); + if (i > -1) { + address = address.substring(0, i); + } + InetAddress inetAddress = InetAddress.getByName(address); + if (inetAddress != null) { + return inetAddress.getHostName(); + } + } catch (Throwable e) { + // ignore + } + return address; + } + + /** + * @param hostName + * @return ip address or hostName if UnknownHostException + */ + public static String getIpByHost(String hostName) { + try { + return InetAddress.getByName(hostName).getHostAddress(); + } catch (UnknownHostException e) { + return hostName; + } + } + + public static String toAddressString(InetSocketAddress address) { + return address.getAddress().getHostAddress() + ":" + address.getPort(); + } + + public static InetSocketAddress toAddress(String address) { + int i = address.indexOf(':'); + String host; + int port; + if (i > -1) { + host = address.substring(0, i); + port = Integer.parseInt(address.substring(i + 1)); + } else { + host = address; + port = 0; + } + return new InetSocketAddress(host, port); + } + + public static String toURL(String protocol, String host, int port, String path) { + StringBuilder sb = new StringBuilder(); + sb.append(protocol).append("://"); + sb.append(host).append(':').append(port); + if (path.charAt(0) != '/') { + sb.append('/'); + } + sb.append(path); + return sb.toString(); + } + + public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException { + setInterface(multicastSocket, multicastAddress instanceof Inet6Address); + multicastSocket.setLoopbackMode(false); + multicastSocket.joinGroup(multicastAddress); + } + + public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException { + boolean interfaceSet = false; + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface i = (NetworkInterface) interfaces.nextElement(); + Enumeration addresses = i.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress address = (InetAddress) addresses.nextElement(); + if (preferIpv6 && address instanceof Inet6Address) { + try { + if(address.isReachable(100)){ + multicastSocket.setInterface(address); + interfaceSet = true; + break; + } + } catch (IOException e) { + // ignore + } + } else if (!preferIpv6 && address instanceof Inet4Address) { + try { + if(address.isReachable(100)){ + multicastSocket.setInterface(address); + interfaceSet = true; + break; + } + } catch (IOException e) { + // ignore + } + } + } + if (interfaceSet) { + break; + } + } + } + + /** + * @param pattern + * @param host + * @param port + * @return + * @throws UnknownHostException + */ + public static boolean matchIpRange(String pattern, String host, int port) throws UnknownHostException { + if (pattern == null || host == null) { + throw new IllegalArgumentException("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host); + } + pattern = pattern.trim(); + if ("*.*.*.*".equals(pattern) || "*".equals(pattern)) { + return true; + } + + InetAddress inetAddress = InetAddress.getByName(host); + boolean isIpv4 = isValidV4Address(inetAddress) ? true : false; + String[] hostAndPort = getPatternHostAndPort(pattern, isIpv4); + if (hostAndPort[1] != null && !hostAndPort[1].equals(String.valueOf(port))) { + return false; + } + pattern = hostAndPort[0]; + + String splitCharacter = SPLIT_IPV4_CHARECTER; + if (!isIpv4) { + splitCharacter = SPLIT_IPV6_CHARECTER; + } + String[] mask = pattern.split(splitCharacter); + //check format of pattern + checkHostPattern(pattern, mask, isIpv4); + + host = inetAddress.getHostAddress(); + + String[] ipAddress = host.split(splitCharacter); + if (pattern.equals(host)) { + return true; + } + // short name condition + if (!ipPatternContainExpression(pattern)) { + InetAddress patternAddress = InetAddress.getByName(pattern); + if (patternAddress.getHostAddress().equals(host)) { + return true; + } else { + return false; + } + } + for (int i = 0; i < mask.length; i++) { + if ("*".equals(mask[i]) || mask[i].equals(ipAddress[i])) { + continue; + } else if (mask[i].contains("-")) { + String[] rangeNumStrs = mask[i].split("-"); + if (rangeNumStrs.length != 2) { + throw new IllegalArgumentException("There is wrong format of ip Address: " + mask[i]); + } + Integer min = getNumOfIpSegment(rangeNumStrs[0], isIpv4); + Integer max = getNumOfIpSegment(rangeNumStrs[1], isIpv4); + Integer ip = getNumOfIpSegment(ipAddress[i], isIpv4); + if (ip < min || ip > max) { + return false; + } + } else if ("0".equals(ipAddress[i]) && ("0".equals(mask[i]) || "00".equals(mask[i]) || "000".equals(mask[i]) || "0000".equals(mask[i]))) { + continue; + } else if (!mask[i].equals(ipAddress[i])) { + return false; + } + } + return true; + } + + private static boolean ipPatternContainExpression(String pattern) { + return pattern.contains("*") || pattern.contains("-"); + } + + private static void checkHostPattern(String pattern, String[] mask, boolean isIpv4) { + if (!isIpv4) { + if (mask.length != 8 && ipPatternContainExpression(pattern)) { + throw new IllegalArgumentException("If you config ip expression that contains '*' or '-', please fill qulified ip pattern like 234e:0:4567:0:0:0:3d:*. "); + } + if (mask.length != 8 && !pattern.contains("::")) { + throw new IllegalArgumentException("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern); + } + } else { + if (mask.length != 4) { + throw new IllegalArgumentException("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern); + } + } + } + + private static String[] getPatternHostAndPort(String pattern, boolean isIpv4) { + String[] result = new String[2]; + if (pattern.startsWith("[") && pattern.contains("]:")) { + int end = pattern.indexOf("]:"); + result[0] = pattern.substring(1, end); + result[1] = pattern.substring(end + 2); + return result; + } else if (pattern.startsWith("[") && pattern.endsWith("]")) { + result[0] = pattern.substring(1, pattern.length() - 1); + result[1] = null; + return result; + } else if (isIpv4 && pattern.contains(":")) { + int end = pattern.indexOf(":"); + result[0] = pattern.substring(0, end); + result[1] = pattern.substring(end + 1); + return result; + } else { + result[0] = pattern; + return result; + } + } + + private static Integer getNumOfIpSegment(String ipSegment, boolean isIpv4) { + if (isIpv4) { + return Integer.parseInt(ipSegment); + } + return Integer.parseInt(ipSegment, 16); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SpringUtils.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SpringUtils.java new file mode 100644 index 00000000..1b8c94d0 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/common/utils/SpringUtils.java @@ -0,0 +1,34 @@ +package com.github.kfcfans.oms.worker.common.utils; + +import org.springframework.context.ApplicationContext; + +/** + * Spring ApplicationContext 工具类 + * + * @author tjq + * @since 2020/3/16 + */ +public class SpringUtils { + + private static boolean supportSpringBean = false; + private static ApplicationContext context; + + public static void inject(ApplicationContext ctx) { + context = ctx; + supportSpringBean = true; + } + + public static boolean supportSpringBean() { + return supportSpringBean; + } + + public static T getBean(Class clz) { + return context.getBean(clz); + } + + @SuppressWarnings("unchecked") + public static T getBean(String className) { + return (T) context.getBean(className); + } + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java new file mode 100644 index 00000000..00a5f3f9 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/ConnectionFactory.java @@ -0,0 +1,43 @@ +package com.github.kfcfans.oms.worker.persistence; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +/** + * 数据库连接管理 + * + * @author tjq + * @since 2020/3/17 + */ +public class ConnectionFactory { + + private static DataSource dataSource; + + public static Connection getConnection() throws SQLException { + return getDataSource().getConnection(); + } + + private static DataSource getDataSource() { + if (dataSource != null) { + return dataSource; + } + synchronized (ConnectionFactory.class) { + if (dataSource == null) { + HikariConfig config = new HikariConfig(); + config.setDriverClassName("org.h2.Driver"); + config.setJdbcUrl("jdbc:h2:file:~/.h2/oms/oms_worker_db"); + config.setAutoCommit(true); + // 池中最小空闲连接数量 + config.setMinimumIdle(2); + // 池中最大连接数量 + config.setMaximumPoolSize(16); + dataSource = new HikariDataSource(config); + } + } + return dataSource; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java new file mode 100644 index 00000000..27665bce --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/SimpleTaskQuery.java @@ -0,0 +1,54 @@ +package com.github.kfcfans.oms.worker.persistence; + +import lombok.Data; +import org.springframework.util.StringUtils; + +/** + * 简单查询直接类,只支持 select * from task_info where xxx = xxx and xxx = xxx 的查询 + * + * @author tjq + * @since 2020/3/17 + */ +@Data +public class SimpleTaskQuery { + + private static final String PREFIX_SQL = "select * from task_info where "; + private static final String LINK = " and "; + + private String taskId; + private String jobId; + private String instanceId; + private String taskName; + private String address; + private Integer status; + + private Integer limit; + + public String getQuerySQL() { + StringBuilder sb = new StringBuilder(PREFIX_SQL); + if (!StringUtils.isEmpty(taskId)) { + sb.append("task_id = '").append(taskId).append("'").append(LINK); + } + if (!StringUtils.isEmpty(jobId)) { + sb.append("job_id = '").append(jobId).append("'").append(LINK); + } + if (!StringUtils.isEmpty(instanceId)) { + sb.append("instance_id = '").append(instanceId).append("'").append(LINK); + } + if (!StringUtils.isEmpty(address)) { + sb.append("address = '").append(address).append("'").append(LINK); + } + if (!StringUtils.isEmpty(taskName)) { + sb.append("task_name = '").append(taskName).append("'").append(LINK); + } + if (status != null) { + sb.append("status = ").append(status).append(LINK); + } + + String substring = sb.substring(0, sb.length() - LINK.length()); + if (limit != null) { + substring = substring + " limit " + limit; + } + return substring; + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java new file mode 100644 index 00000000..77c2c51f --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAO.java @@ -0,0 +1,34 @@ +package com.github.kfcfans.oms.worker.persistence; + +import java.util.Collection; +import java.util.List; + +/** + * 任务持久化接口 + * + * @author tjq + * @since 2020/3/17 + */ +public interface TaskDAO { + + /** + * 初始化任务表 + */ + boolean initTable(); + + /** + * 插入任务数据 + */ + boolean save(TaskDO task); + boolean batchSave(Collection tasks); + + /** + * 更新任务数据,必须有主键 instanceId + taskId + */ + boolean update(TaskDO task); + + TaskDO selectByKey(String instanceId, String taskId); + + List simpleQuery(SimpleTaskQuery query); + +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java new file mode 100644 index 00000000..cc90b0d4 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDAOImpl.java @@ -0,0 +1,170 @@ +package com.github.kfcfans.oms.worker.persistence; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.sql.*; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * 任务持久化实现层,表名:task_info + * + * @author tjq + * @since 2020/3/17 + */ +@Slf4j +public class TaskDAOImpl implements TaskDAO { + + @Override + public boolean initTable() { + + String delTableSQL = "drop table if exists task_info"; + String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content text, address varchar(20), status int(11), result text, created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))"; + + try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { + stat.execute(delTableSQL); + stat.execute(createTableSQL); + }catch (Exception e) { + log.error("[TaskDAO] initTable failed.", e); + return false; + } + return true; + } + + @Override + public boolean save(TaskDO task) { + String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)"; + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { + fillInsertPreparedStatement(task, ps); + return ps.execute(); + }catch (Exception e) { + log.error("[TaskDAO] insert failed.", e); + } + return false; + } + + @Override + public boolean batchSave(Collection tasks) { + String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)"; + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { + + for (TaskDO task : tasks) { + + fillInsertPreparedStatement(task, ps); + ps.addBatch(); + } + + ps.executeBatch(); + return true; + + }catch (Exception e) { + log.error("[TaskDAO] insert failed.", e); + } + return false; + } + + + @Override + public boolean update(TaskDO task) { + return false; + } + + @Override + public TaskDO selectByKey(String instanceId, String taskId) { + String selectSQL = "select * from task_info where instance_id = ? and task_id = ?"; + ResultSet rs = null; + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(selectSQL)) { + ps.setString(1, instanceId); + ps.setString(2, taskId); + rs = ps.executeQuery(); + if (rs.next()) { + return convert(rs); + } + }catch (Exception e) { + log.error("[TaskDAO] selectByKey failed(instanceId = {}, taskId = {}).", instanceId, taskId, e); + }finally { + if (rs != null) { + try { + rs.close(); + }catch (Exception ignore) { + } + } + } + return null; + } + + @Override + public List simpleQuery(SimpleTaskQuery query) { + + ResultSet rs = null; + String sql = query.getQuerySQL(); + List result = Lists.newLinkedList(); + try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { + rs = ps.executeQuery(); + while (rs.next()) { + result.add(convert(rs)); + } + }catch (Exception e) { + log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e); + }finally { + if (rs != null) { + try { + rs.close(); + }catch (Exception ignore) { + } + } + } + return result; + } + + private static TaskDO convert(ResultSet rs) throws SQLException { + TaskDO task = new TaskDO(); + task.setTaskId(rs.getString("task_id")); + task.setInstanceId(rs.getString("instance_id")); + task.setJobId(rs.getString("job_id")); + task.setTaskName(rs.getString("task_name")); + task.setTaskContent(rs.getString("task_content")); + task.setAddress(rs.getString("address")); + task.setStatus(rs.getInt("status")); + task.setResult(rs.getString("result")); + task.setCreatedTime(rs.getLong("created_time")); + task.setLastModifiedTime(rs.getLong("last_modified_time")); + return task; + } + + private static void fillInsertPreparedStatement(TaskDO task, PreparedStatement ps) throws SQLException { + ps.setString(1, task.getTaskId()); + ps.setString(2, task.getInstanceId()); + ps.setString(3, task.getJobId()); + ps.setString(4, task.getTaskName()); + ps.setString(5, task.getTaskContent()); + ps.setString(6, task.getAddress()); + ps.setInt(7, task.getStatus()); + ps.setString(8, task.getResult()); + ps.setLong(9, task.getCreatedTime()); + ps.setLong(10, task.getLastModifiedTime()); + } + + public static void main(String[] args) throws Exception { + TaskDAOImpl taskDAO = new TaskDAOImpl(); + taskDAO.initTable(); + + TaskDO taskDO = new TaskDO(); + taskDO.setJobId("11"); + taskDO.setInstanceId("22"); + taskDO.setTaskId("2.1"); + taskDO.setTaskName("zzz"); + taskDO.setTaskContent("hhhh"); + + taskDAO.save(taskDO); + + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId("22"); + query.setTaskId("2.1"); + System.out.println(taskDAO.simpleQuery(query)); + + Thread.sleep(100000); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java new file mode 100644 index 00000000..6a84e136 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskDO.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.worker.persistence; + +import lombok.Data; + +/** + * TaskDO(为了简化 DAO 层,一张表实现两种功能) + * 对于 TaskTracker,task_info 存储了当前 JobInstance 所有的子任务信息 + * 对于普通的 Worker,task_info 存储了当前无法处理的任务信息 + * + * @author tjq + * @since 2020/3/17 + */ +@Data +public class TaskDO { + + // 层次命名法,可以表示 Map 后的父子关系,如 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task + private String taskId; + + private String jobId; + private String instanceId; + // 任务名称 + private String taskName; + // 任务参数 + private String taskContent; + // 对于JobTracker为workerAddress,对于普通Worker为jobTrackerAddress + private String address; + // 任务状态,0~10代表 JobTracker 使用,11~20代表普通Worker使用 + private int status; + // 执行结果 + private String result; + // 创建时间 + private long createdTime; + // 最后修改时间 + private long lastModifiedTime; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java new file mode 100644 index 00000000..f3cd1c6e --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/persistence/TaskPersistenceService.java @@ -0,0 +1,53 @@ +package com.github.kfcfans.oms.worker.persistence; + + +import com.github.kfcfans.oms.worker.common.constants.TaskStatus; +import com.google.common.collect.Lists; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +/** + * 任务持久化服务 + * + * @author tjq + * @since 2020/3/17 + */ +public class TaskPersistenceService { + + private TaskDAO taskDAO = new TaskDAOImpl(); + private static final int MAX_BATCH_SIZE = 50; + + public boolean save(TaskDO task) { + return taskDAO.save(task); + } + + public boolean batchSave(List tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return true; + } + if (tasks.size() <= MAX_BATCH_SIZE) { + return taskDAO.batchSave(tasks); + } + List> partition = Lists.partition(tasks, MAX_BATCH_SIZE); + for (List p : partition) { + boolean b = taskDAO.batchSave(p); + if (!b) { + return false; + } + } + return true; + } + + + /** + * 获取 TaskTracker 准备派发给 Worker 执行的 task + */ + public List getNeedDispatchTask(String instanceId) { + SimpleTaskQuery query = new SimpleTaskQuery(); + query.setInstanceId(instanceId); + query.setStatus(TaskStatus.WAITING_DISPATCH.getValue()); + query.setLimit(100); + return taskDAO.simpleQuery(query); + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java new file mode 100644 index 00000000..0ba2a0c7 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/model/JobInstanceInfo.java @@ -0,0 +1,33 @@ +package com.github.kfcfans.oms.worker.pojo.model; + +import lombok.Data; + +/** + * 被调度执行的任务实例详情 + * + * @author tjq + * @since 2020/3/16 + */ +@Data +public class JobInstanceInfo { + + private String jobId; + private String instanceId; + // 任务执行类型,单机、广播、MR + private String executeType; + // 处理器类型(JavaBean、Jar、脚本等) + private String processorType; + // 处理器信息 + private String processorInfo; + // 任务执行时间限制,单位毫秒 + private long timeLimit; + // 可用处理器地址,可能多值,逗号分隔 + private String workerAddress; + + /* *********************** Map/MapReduce 任务专用 *********************** */ + + // 每台机器的处理线程数上限 + private int threadConcurrency; + // 子任务重试次数(任务本身的重试机制由server控制) + private int taskRetryNum; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java new file mode 100644 index 00000000..9bbdfba9 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/ServerScheduleJobReq.java @@ -0,0 +1,35 @@ +package com.github.kfcfans.oms.worker.pojo.request; + +/** + * 服务端调度任务请求(一次任务处理的入口) + * + * @author tjq + * @since 2020/3/17 + */ +public class ServerScheduleJobReq { + + // 调度的服务器地址,默认通讯目标 + private String serverAddress; + + /* *********************** 任务相关属性 *********************** */ + + private String jobId; + private String instanceId; + // 任务执行类型,单机、广播、MR + private String executeType; + // 处理器类型(JavaBean、Jar、脚本等) + private String processorType; + // 处理器信息 + private String processorInfo; + // 任务执行时间限制,单位毫秒 + private long timeLimit; + // 可用处理器地址,可能多值,逗号分隔 + private String workerAddress; + + /* *********************** Map/MapReduce 任务专用 *********************** */ + + // 每台机器的处理线程数上限 + private int threadConcurrency; + // 子任务重试次数(任务本身的重试机制由server控制) + private int taskRetryNum; +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java new file mode 100644 index 00000000..09660aca --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/pojo/request/WorkerMapTaskRequest.java @@ -0,0 +1,10 @@ +package com.github.kfcfans.oms.worker.pojo.request; + +/** + * WorkerMapTaskRequest + * + * @author tjq + * @since 2020/3/17 + */ +public class WorkerMapTaskRequest { +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java new file mode 100644 index 00000000..6432bc1e --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/BroadcastTaskTracker.java @@ -0,0 +1,15 @@ +package com.github.kfcfans.oms.worker.tracker; + +/** + * 广播任务使用的 TaskTracker + * + * @author tjq + * @since 2020/3/17 + */ +public class BroadcastTaskTracker extends TaskTracker { + + @Override + public void dispatch() { + + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java new file mode 100644 index 00000000..3de1b3b9 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/MapReduceTaskTracker.java @@ -0,0 +1,17 @@ +package com.github.kfcfans.oms.worker.tracker; + +import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest; + + +/** + * MapReduce 任务使用的 TaskTracker + * + * @author tjq + * @since 2020/3/17 + */ +public class MapReduceTaskTracker extends StandaloneTaskTracker { + + public void newTask(WorkerMapTaskRequest mapRequest) { + + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java new file mode 100644 index 00000000..70770a8b --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/StandaloneTaskTracker.java @@ -0,0 +1,15 @@ +package com.github.kfcfans.oms.worker.tracker; + +/** + * 单机任务使用的 TaskTracker + * + * @author tjq + * @since 2020/3/17 + */ +public class StandaloneTaskTracker extends TaskTracker { + + @Override + public void dispatch() { + + } +} diff --git a/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java new file mode 100644 index 00000000..5a91d942 --- /dev/null +++ b/oh-my-scheduler-worker/src/main/java/com/github/kfcfans/oms/worker/tracker/TaskTracker.java @@ -0,0 +1,93 @@ +package com.github.kfcfans.oms.worker.tracker; + +import akka.actor.ActorRef; +import com.github.kfcfans.common.ExecuteType; +import com.github.kfcfans.oms.worker.common.constants.CommonSJ; +import com.github.kfcfans.oms.worker.common.constants.TaskConstant; +import com.github.kfcfans.oms.worker.common.utils.NetUtils; +import com.github.kfcfans.oms.worker.persistence.TaskDO; +import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService; +import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo; +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * 负责管理 JobInstance 的运行,主要包括任务的派发(MR可能存在大量的任务)和状态的更新 + * + * @author tjq + * @since 2020/3/17 + */ +public abstract class TaskTracker { + + // 任务实例信息 + protected JobInstanceInfo jobInstanceInfo; + protected ActorRef actor; + + protected TaskPersistenceService taskPersistenceService; + + + /** + * 分发任务 + */ + public abstract void dispatch(); + + public void updateTaskStatus() { + } + + public boolean finished() { + return false; + } + + /** + * 持久化根任务,只有完成持久化才能视为任务开始running(先持久化,再报告server) + */ + private void persistenceTask() { + + ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType()); + boolean persistenceResult; + + // 单机、MR模型下,根任务模型本机直接执行(JobTracker一般为负载最小的机器,且MR的根任务通常伴随着 map 操作,本机执行可以有效减少网络I/O开销) + if (executeType != ExecuteType.BROADCAST) { + TaskDO rootTask = new TaskDO(); + rootTask.setStatus(1); + rootTask.setJobId(jobInstanceInfo.getJobId()); + rootTask.setInstanceId(jobInstanceInfo.getInstanceId()); + rootTask.setTaskId(TaskConstant.ROOT_TASK_ID); + rootTask.setAddress(NetUtils.getLocalHost()); + rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); + rootTask.setCreatedTime(System.currentTimeMillis()); + rootTask.setCreatedTime(System.currentTimeMillis()); + + persistenceResult = taskPersistenceService.save(rootTask); + }else { + List taskList = Lists.newLinkedList(); + List addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getWorkerAddress()); + for (int i = 0; i < addrList.size(); i++) { + TaskDO task = new TaskDO(); + task.setStatus(1); + task.setJobId(jobInstanceInfo.getJobId()); + task.setInstanceId(jobInstanceInfo.getInstanceId()); + task.setTaskId(String.valueOf(i)); + task.setAddress(addrList.get(i)); + task.setTaskName(TaskConstant.ROOT_TASK_NAME); + task.setCreatedTime(System.currentTimeMillis()); + task.setCreatedTime(System.currentTimeMillis()); + + taskList.add(task); + } + persistenceResult = taskPersistenceService.batchSave(taskList); + } + + if (!persistenceResult) { + throw new RuntimeException("create root task failed."); + } + } + + /** + * 启动任务分发器 + */ + private void initDispatcher() { + + } +} diff --git a/oh-my-scheduler.iml b/oh-my-scheduler.iml new file mode 100644 index 00000000..78b2cc53 --- /dev/null +++ b/oh-my-scheduler.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 00000000..1cf11b5a --- /dev/null +++ b/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + com.github.kfcfans + oh-my-scheduler + 1.0.0-SNAPSHOT + + oh-my-scheduler-worker + oh-my-scheduler-server + oh-my-scheduler-common + + pom + + + 1.8 + 1.8 + 1.8 + UTF-8 + UTF-8 + 1.18.12 + + + + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + \ No newline at end of file