init project

This commit is contained in:
tjq 2020-03-17 17:27:19 +08:00
parent fe53dfd7e3
commit f74a87772a
33 changed files with 1485 additions and 18 deletions

49
.gitignore vendored
View File

@ -1,23 +1,36 @@
# Compiled class file
*.class
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
# Log file
*.log
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
# BlueJ files
*.ctxt
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
# Mobile Tools for Java (J2ME)
.mtj.tmp/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
# Package Files #
### VS Code ###
.vscode/
### others ###
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
*.log
*/.DS_Store

2
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,2 @@
# Default ignored files
/workspace.xml

1
.idea/.name generated Normal file
View File

@ -0,0 +1 @@
oh-my-scheduler

19
.idea/misc.xml generated Normal file
View File

@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
<option value="$PROJECT_DIR$/pom.xml" />
</list>
</option>
<option name="ignoredFiles">
<set>
<option value="$PROJECT_DIR$/oh-my-scheduler-sdk/pom.xml" />
</set>
</option>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8.0_231" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.common;
/**
* 任务执行类型
*
* @author tjq
* @since 2020/3/17
*/
public enum ExecuteType {
STANDALONE,
BROADCAST,
MAP_REDUCE
}

View File

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-server</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-worker</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<akka.version>2.6.4</akka.version>
<slf4j.version>1.7.30</slf4j.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<guava.version>28.2-jre</guava.version>
</properties>
<dependencies>
<!-- Spring 编译期依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<!-- oms-common -->
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<!-- h2 database -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.db.version}</version>
</dependency>
<!-- HikariCP -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- 开发阶段输出日志 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,30 @@
package com.github.kfcfans.oms.worker;
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
* 客户端启动类
*
* @author KFCFans
* @since 2020/3/16
*/
public class OhMyWorker implements ApplicationContextAware, InitializingBean {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringUtils.inject(applicationContext);
}
@Override
public void afterPropertiesSet() throws Exception {
}
public void init() {
}
}

View File

@ -0,0 +1,30 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.oms.worker.pojo.request.ServerScheduleJobReq;
import lombok.extern.slf4j.Slf4j;
/**
* 处理来自服务器的请求
* 请求链server -> taskTracker -> worker
*
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class ServerRequestActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq)
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
.build();
}
private void onReceiveServerScheduleJobReq(ServerScheduleJobReq req) {
// 接受到任务创建 TaskTracker
}
}

View File

@ -0,0 +1,16 @@
package com.github.kfcfans.oms.worker.actors;
import akka.actor.AbstractActor;
/**
* 处理来自 TaskTracker 的请求
*
* @author tjq
* @since 2020/3/17
*/
public class TaskTrackerRequestActor extends AbstractActor {
@Override
public Receive createReceive() {
return null;
}
}

View File

@ -0,0 +1,20 @@
package com.github.kfcfans.oms.worker.common;
import java.util.Set;
/**
* Worker 配置文件
*
* @author tjq
* @since 2020/3/16
*/
public class OhMyConfig {
/**
* 应用名称
*/
private String appName;
/**
* 调度服务器地址ip:port 多值使用 , 分隔
*/
private String serverAddress;
}

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.oms.worker.common.constants;
import com.google.common.base.Splitter;
/**
* splitter & joiner
*
* @author tjq
* @since 2020/3/17
*/
public class CommonSJ {
public static final Splitter commaSplitter = Splitter.on(",");
}

View File

@ -0,0 +1,14 @@
package com.github.kfcfans.oms.worker.common.constants;
/**
* task 常熟
*
* @author tjq
* @since 2020/3/17
*/
public class TaskConstant {
public static final String ROOT_TASK_NAME = "OMS_ROOT_TASK";
public static final String ROOT_TASK_ID = "0";
}

View File

@ -0,0 +1,28 @@
package com.github.kfcfans.oms.worker.common.constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 任务状态task_info 表中 status 字段的枚举值
*
* @author tjq
* @since 2020/3/17
*/
@Getter
@AllArgsConstructor
public enum TaskStatus {
/* ******************* TaskTracker 专用 ******************* */
WAITING_DISPATCH(1, "等待调度器调度"),
DISPATCH_SUCCESS(2, "调度成功"),
DISPATCH_FAILED(3, "调度失败"),
WORKER_PROCESS_SUCCESS(4, "worker执行成功"),
WORKER_PROCESS_FAILED(5, "worker执行失败");
/* ******************* Worker 专用 ******************* */
private int value;
private String des;
}

View File

@ -0,0 +1,460 @@
package com.github.kfcfans.oms.worker.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.*;
import java.util.Enumeration;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
/**
* IP and Port Helper for RPC
*
* @author tjq borrowed from dubbo
* @since 2020/3/16
*/
@Slf4j
@SuppressWarnings("all")
public class NetUtils {
private static final String ANYHOST_VALUE = "0.0.0.0";
private static final String LOCALHOST_KEY = "localhost";
private static final String LOCALHOST_VALUE = "127.0.0.1";
// returned port range is [30000, 39999]
private static final int RND_PORT_START = 30000;
private static final int RND_PORT_RANGE = 10000;
// valid port range is (0, 65535]
private static final int MIN_PORT = 0;
private static final int MAX_PORT = 65535;
private static final Pattern ADDRESS_PATTERN = Pattern.compile("^\\d{1,3}(\\.\\d{1,3}){3}\\:\\d{1,5}$");
private static final Pattern LOCAL_IP_PATTERN = Pattern.compile("127(\\.\\d{1,3}){3}$");
private static final Pattern IP_PATTERN = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");
private static volatile InetAddress LOCAL_ADDRESS = null;
private static final String SPLIT_IPV4_CHARECTER = "\\.";
private static final String SPLIT_IPV6_CHARECTER = ":";
public static int getRandomPort() {
return RND_PORT_START + ThreadLocalRandom.current().nextInt(RND_PORT_RANGE);
}
public static int getAvailablePort() {
try (ServerSocket ss = new ServerSocket()) {
ss.bind(null);
return ss.getLocalPort();
} catch (IOException e) {
return getRandomPort();
}
}
public static int getAvailablePort(int port) {
if (port <= 0) {
return getAvailablePort();
}
for (int i = port; i < MAX_PORT; i++) {
try (ServerSocket ss = new ServerSocket(i)) {
return i;
} catch (IOException e) {
// continue
}
}
return port;
}
public static boolean isInvalidPort(int port) {
return port <= MIN_PORT || port > MAX_PORT;
}
public static boolean isValidAddress(String address) {
return ADDRESS_PATTERN.matcher(address).matches();
}
public static boolean isLocalHost(String host) {
return host != null
&& (LOCAL_IP_PATTERN.matcher(host).matches()
|| host.equalsIgnoreCase(LOCALHOST_KEY));
}
public static boolean isAnyHost(String host) {
return ANYHOST_VALUE.equals(host);
}
public static boolean isInvalidLocalHost(String host) {
return host == null
|| host.length() == 0
|| host.equalsIgnoreCase(LOCALHOST_KEY)
|| host.equals(ANYHOST_VALUE)
|| (LOCAL_IP_PATTERN.matcher(host).matches());
}
public static boolean isValidLocalHost(String host) {
return !isInvalidLocalHost(host);
}
public static InetSocketAddress getLocalSocketAddress(String host, int port) {
return isInvalidLocalHost(host) ?
new InetSocketAddress(port) : new InetSocketAddress(host, port);
}
static boolean isValidV4Address(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
}
String name = address.getHostAddress();
boolean result = (name != null
&& IP_PATTERN.matcher(name).matches()
&& !ANYHOST_VALUE.equals(name)
&& !LOCALHOST_VALUE.equals(name));
return result;
}
/**
* Check if an ipv6 address
*
* @return true if it is reachable
*/
static boolean isPreferIPV6Address() {
boolean preferIpv6 = Boolean.getBoolean("java.net.preferIPv6Addresses");
if (!preferIpv6) {
return false;
}
return false;
}
/**
* normalize the ipv6 Address, convert scope name to scope id.
* e.g.
* convert
* fe80:0:0:0:894:aeec:f37d:23e1%en0
* to
* fe80:0:0:0:894:aeec:f37d:23e1%5
* <p>
* The %5 after ipv6 address is called scope id.
* see java doc of {@link Inet6Address} for more details.
*
* @param address the input address
* @return the normalized address, with scope id converted to int
*/
static InetAddress normalizeV6Address(Inet6Address address) {
String addr = address.getHostAddress();
int i = addr.lastIndexOf('%');
if (i > 0) {
try {
return InetAddress.getByName(addr.substring(0, i) + '%' + address.getScopeId());
} catch (UnknownHostException e) {
// ignore
log.debug("Unknown IPV6 address: ", e);
}
}
return address;
}
/**
* 获取本机 IP 地址
*/
public static String getLocalHost() {
InetAddress address = getLocalAddress();
return address == null ? LOCALHOST_VALUE : address.getHostAddress();
}
/**
* Find first valid IP from local network card
*
* @return first valid local IP
*/
public static InetAddress getLocalAddress() {
if (LOCAL_ADDRESS != null) {
return LOCAL_ADDRESS;
}
InetAddress localAddress = getLocalAddress0();
LOCAL_ADDRESS = localAddress;
return localAddress;
}
private static Optional<InetAddress> toValidAddress(InetAddress address) {
if (address instanceof Inet6Address) {
Inet6Address v6Address = (Inet6Address) address;
if (isPreferIPV6Address()) {
return Optional.ofNullable(normalizeV6Address(v6Address));
}
}
if (isValidV4Address(address)) {
return Optional.of(address);
}
return Optional.empty();
}
private static InetAddress getLocalAddress0() {
InetAddress localAddress = null;
try {
localAddress = InetAddress.getLocalHost();
Optional<InetAddress> addressOp = toValidAddress(localAddress);
if (addressOp.isPresent()) {
return addressOp.get();
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
if (null == interfaces) {
return localAddress;
}
while (interfaces.hasMoreElements()) {
try {
NetworkInterface network = interfaces.nextElement();
if (network.isLoopback() || network.isVirtual() || !network.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = network.getInetAddresses();
while (addresses.hasMoreElements()) {
try {
Optional<InetAddress> addressOp = toValidAddress(addresses.nextElement());
if (addressOp.isPresent()) {
try {
if(addressOp.get().isReachable(100)){
return addressOp.get();
}
} catch (IOException e) {
// ignore
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
}
} catch (Throwable e) {
log.warn("[Triple]", e);
}
return localAddress;
}
public static String getHostName(String address) {
try {
int i = address.indexOf(':');
if (i > -1) {
address = address.substring(0, i);
}
InetAddress inetAddress = InetAddress.getByName(address);
if (inetAddress != null) {
return inetAddress.getHostName();
}
} catch (Throwable e) {
// ignore
}
return address;
}
/**
* @param hostName
* @return ip address or hostName if UnknownHostException
*/
public static String getIpByHost(String hostName) {
try {
return InetAddress.getByName(hostName).getHostAddress();
} catch (UnknownHostException e) {
return hostName;
}
}
public static String toAddressString(InetSocketAddress address) {
return address.getAddress().getHostAddress() + ":" + address.getPort();
}
public static InetSocketAddress toAddress(String address) {
int i = address.indexOf(':');
String host;
int port;
if (i > -1) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = 0;
}
return new InetSocketAddress(host, port);
}
public static String toURL(String protocol, String host, int port, String path) {
StringBuilder sb = new StringBuilder();
sb.append(protocol).append("://");
sb.append(host).append(':').append(port);
if (path.charAt(0) != '/') {
sb.append('/');
}
sb.append(path);
return sb.toString();
}
public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException {
setInterface(multicastSocket, multicastAddress instanceof Inet6Address);
multicastSocket.setLoopbackMode(false);
multicastSocket.joinGroup(multicastAddress);
}
public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException {
boolean interfaceSet = false;
Enumeration interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface i = (NetworkInterface) interfaces.nextElement();
Enumeration addresses = i.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = (InetAddress) addresses.nextElement();
if (preferIpv6 && address instanceof Inet6Address) {
try {
if(address.isReachable(100)){
multicastSocket.setInterface(address);
interfaceSet = true;
break;
}
} catch (IOException e) {
// ignore
}
} else if (!preferIpv6 && address instanceof Inet4Address) {
try {
if(address.isReachable(100)){
multicastSocket.setInterface(address);
interfaceSet = true;
break;
}
} catch (IOException e) {
// ignore
}
}
}
if (interfaceSet) {
break;
}
}
}
/**
* @param pattern
* @param host
* @param port
* @return
* @throws UnknownHostException
*/
public static boolean matchIpRange(String pattern, String host, int port) throws UnknownHostException {
if (pattern == null || host == null) {
throw new IllegalArgumentException("Illegal Argument pattern or hostName. Pattern:" + pattern + ", Host:" + host);
}
pattern = pattern.trim();
if ("*.*.*.*".equals(pattern) || "*".equals(pattern)) {
return true;
}
InetAddress inetAddress = InetAddress.getByName(host);
boolean isIpv4 = isValidV4Address(inetAddress) ? true : false;
String[] hostAndPort = getPatternHostAndPort(pattern, isIpv4);
if (hostAndPort[1] != null && !hostAndPort[1].equals(String.valueOf(port))) {
return false;
}
pattern = hostAndPort[0];
String splitCharacter = SPLIT_IPV4_CHARECTER;
if (!isIpv4) {
splitCharacter = SPLIT_IPV6_CHARECTER;
}
String[] mask = pattern.split(splitCharacter);
//check format of pattern
checkHostPattern(pattern, mask, isIpv4);
host = inetAddress.getHostAddress();
String[] ipAddress = host.split(splitCharacter);
if (pattern.equals(host)) {
return true;
}
// short name condition
if (!ipPatternContainExpression(pattern)) {
InetAddress patternAddress = InetAddress.getByName(pattern);
if (patternAddress.getHostAddress().equals(host)) {
return true;
} else {
return false;
}
}
for (int i = 0; i < mask.length; i++) {
if ("*".equals(mask[i]) || mask[i].equals(ipAddress[i])) {
continue;
} else if (mask[i].contains("-")) {
String[] rangeNumStrs = mask[i].split("-");
if (rangeNumStrs.length != 2) {
throw new IllegalArgumentException("There is wrong format of ip Address: " + mask[i]);
}
Integer min = getNumOfIpSegment(rangeNumStrs[0], isIpv4);
Integer max = getNumOfIpSegment(rangeNumStrs[1], isIpv4);
Integer ip = getNumOfIpSegment(ipAddress[i], isIpv4);
if (ip < min || ip > max) {
return false;
}
} else if ("0".equals(ipAddress[i]) && ("0".equals(mask[i]) || "00".equals(mask[i]) || "000".equals(mask[i]) || "0000".equals(mask[i]))) {
continue;
} else if (!mask[i].equals(ipAddress[i])) {
return false;
}
}
return true;
}
private static boolean ipPatternContainExpression(String pattern) {
return pattern.contains("*") || pattern.contains("-");
}
private static void checkHostPattern(String pattern, String[] mask, boolean isIpv4) {
if (!isIpv4) {
if (mask.length != 8 && ipPatternContainExpression(pattern)) {
throw new IllegalArgumentException("If you config ip expression that contains '*' or '-', please fill qulified ip pattern like 234e:0:4567:0:0:0:3d:*. ");
}
if (mask.length != 8 && !pattern.contains("::")) {
throw new IllegalArgumentException("The host is ipv6, but the pattern is not ipv6 pattern : " + pattern);
}
} else {
if (mask.length != 4) {
throw new IllegalArgumentException("The host is ipv4, but the pattern is not ipv4 pattern : " + pattern);
}
}
}
private static String[] getPatternHostAndPort(String pattern, boolean isIpv4) {
String[] result = new String[2];
if (pattern.startsWith("[") && pattern.contains("]:")) {
int end = pattern.indexOf("]:");
result[0] = pattern.substring(1, end);
result[1] = pattern.substring(end + 2);
return result;
} else if (pattern.startsWith("[") && pattern.endsWith("]")) {
result[0] = pattern.substring(1, pattern.length() - 1);
result[1] = null;
return result;
} else if (isIpv4 && pattern.contains(":")) {
int end = pattern.indexOf(":");
result[0] = pattern.substring(0, end);
result[1] = pattern.substring(end + 1);
return result;
} else {
result[0] = pattern;
return result;
}
}
private static Integer getNumOfIpSegment(String ipSegment, boolean isIpv4) {
if (isIpv4) {
return Integer.parseInt(ipSegment);
}
return Integer.parseInt(ipSegment, 16);
}
}

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.oms.worker.common.utils;
import org.springframework.context.ApplicationContext;
/**
* Spring ApplicationContext 工具类
*
* @author tjq
* @since 2020/3/16
*/
public class SpringUtils {
private static boolean supportSpringBean = false;
private static ApplicationContext context;
public static void inject(ApplicationContext ctx) {
context = ctx;
supportSpringBean = true;
}
public static boolean supportSpringBean() {
return supportSpringBean;
}
public static <T> T getBean(Class<T> clz) {
return context.getBean(clz);
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String className) {
return (T) context.getBean(className);
}
}

View File

@ -0,0 +1,43 @@
package com.github.kfcfans.oms.worker.persistence;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
/**
* 数据库连接管理
*
* @author tjq
* @since 2020/3/17
*/
public class ConnectionFactory {
private static DataSource dataSource;
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();
}
private static DataSource getDataSource() {
if (dataSource != null) {
return dataSource;
}
synchronized (ConnectionFactory.class) {
if (dataSource == null) {
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");
config.setJdbcUrl("jdbc:h2:file:~/.h2/oms/oms_worker_db");
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(2);
// 池中最大连接数量
config.setMaximumPoolSize(16);
dataSource = new HikariDataSource(config);
}
}
return dataSource;
}
}

View File

@ -0,0 +1,54 @@
package com.github.kfcfans.oms.worker.persistence;
import lombok.Data;
import org.springframework.util.StringUtils;
/**
* 简单查询直接类只支持 select * from task_info where xxx = xxx and xxx = xxx 的查询
*
* @author tjq
* @since 2020/3/17
*/
@Data
public class SimpleTaskQuery {
private static final String PREFIX_SQL = "select * from task_info where ";
private static final String LINK = " and ";
private String taskId;
private String jobId;
private String instanceId;
private String taskName;
private String address;
private Integer status;
private Integer limit;
public String getQuerySQL() {
StringBuilder sb = new StringBuilder(PREFIX_SQL);
if (!StringUtils.isEmpty(taskId)) {
sb.append("task_id = '").append(taskId).append("'").append(LINK);
}
if (!StringUtils.isEmpty(jobId)) {
sb.append("job_id = '").append(jobId).append("'").append(LINK);
}
if (!StringUtils.isEmpty(instanceId)) {
sb.append("instance_id = '").append(instanceId).append("'").append(LINK);
}
if (!StringUtils.isEmpty(address)) {
sb.append("address = '").append(address).append("'").append(LINK);
}
if (!StringUtils.isEmpty(taskName)) {
sb.append("task_name = '").append(taskName).append("'").append(LINK);
}
if (status != null) {
sb.append("status = ").append(status).append(LINK);
}
String substring = sb.substring(0, sb.length() - LINK.length());
if (limit != null) {
substring = substring + " limit " + limit;
}
return substring;
}
}

View File

@ -0,0 +1,34 @@
package com.github.kfcfans.oms.worker.persistence;
import java.util.Collection;
import java.util.List;
/**
* 任务持久化接口
*
* @author tjq
* @since 2020/3/17
*/
public interface TaskDAO {
/**
* 初始化任务表
*/
boolean initTable();
/**
* 插入任务数据
*/
boolean save(TaskDO task);
boolean batchSave(Collection<TaskDO> tasks);
/**
* 更新任务数据必须有主键 instanceId + taskId
*/
boolean update(TaskDO task);
TaskDO selectByKey(String instanceId, String taskId);
List<TaskDO> simpleQuery(SimpleTaskQuery query);
}

View File

@ -0,0 +1,170 @@
package com.github.kfcfans.oms.worker.persistence;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.sql.*;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* 任务持久化实现层表名task_info
*
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class TaskDAOImpl implements TaskDAO {
@Override
public boolean initTable() {
String delTableSQL = "drop table if exists task_info";
String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content text, address varchar(20), status int(11), result text, created_time bigint(20), last_modified_time bigint(20), unique key pkey (instance_id, task_id))";
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL);
stat.execute(createTableSQL);
}catch (Exception e) {
log.error("[TaskDAO] initTable failed.", e);
return false;
}
return true;
}
@Override
public boolean save(TaskDO task) {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
fillInsertPreparedStatement(task, ps);
return ps.execute();
}catch (Exception e) {
log.error("[TaskDAO] insert failed.", e);
}
return false;
}
@Override
public boolean batchSave(Collection<TaskDO> tasks) {
String insertSQL = "insert into task_info(task_id, instance_id, job_id, task_name, task_content, address, status, result, created_time, last_modified_time) values (?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
for (TaskDO task : tasks) {
fillInsertPreparedStatement(task, ps);
ps.addBatch();
}
ps.executeBatch();
return true;
}catch (Exception e) {
log.error("[TaskDAO] insert failed.", e);
}
return false;
}
@Override
public boolean update(TaskDO task) {
return false;
}
@Override
public TaskDO selectByKey(String instanceId, String taskId) {
String selectSQL = "select * from task_info where instance_id = ? and task_id = ?";
ResultSet rs = null;
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(selectSQL)) {
ps.setString(1, instanceId);
ps.setString(2, taskId);
rs = ps.executeQuery();
if (rs.next()) {
return convert(rs);
}
}catch (Exception e) {
log.error("[TaskDAO] selectByKey failed(instanceId = {}, taskId = {}).", instanceId, taskId, e);
}finally {
if (rs != null) {
try {
rs.close();
}catch (Exception ignore) {
}
}
}
return null;
}
@Override
public List<TaskDO> simpleQuery(SimpleTaskQuery query) {
ResultSet rs = null;
String sql = query.getQuerySQL();
List<TaskDO> result = Lists.newLinkedList();
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
rs = ps.executeQuery();
while (rs.next()) {
result.add(convert(rs));
}
}catch (Exception e) {
log.error("[TaskDAO] simpleQuery failed(sql = {}).", sql, e);
}finally {
if (rs != null) {
try {
rs.close();
}catch (Exception ignore) {
}
}
}
return result;
}
private static TaskDO convert(ResultSet rs) throws SQLException {
TaskDO task = new TaskDO();
task.setTaskId(rs.getString("task_id"));
task.setInstanceId(rs.getString("instance_id"));
task.setJobId(rs.getString("job_id"));
task.setTaskName(rs.getString("task_name"));
task.setTaskContent(rs.getString("task_content"));
task.setAddress(rs.getString("address"));
task.setStatus(rs.getInt("status"));
task.setResult(rs.getString("result"));
task.setCreatedTime(rs.getLong("created_time"));
task.setLastModifiedTime(rs.getLong("last_modified_time"));
return task;
}
private static void fillInsertPreparedStatement(TaskDO task, PreparedStatement ps) throws SQLException {
ps.setString(1, task.getTaskId());
ps.setString(2, task.getInstanceId());
ps.setString(3, task.getJobId());
ps.setString(4, task.getTaskName());
ps.setString(5, task.getTaskContent());
ps.setString(6, task.getAddress());
ps.setInt(7, task.getStatus());
ps.setString(8, task.getResult());
ps.setLong(9, task.getCreatedTime());
ps.setLong(10, task.getLastModifiedTime());
}
public static void main(String[] args) throws Exception {
TaskDAOImpl taskDAO = new TaskDAOImpl();
taskDAO.initTable();
TaskDO taskDO = new TaskDO();
taskDO.setJobId("11");
taskDO.setInstanceId("22");
taskDO.setTaskId("2.1");
taskDO.setTaskName("zzz");
taskDO.setTaskContent("hhhh");
taskDAO.save(taskDO);
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId("22");
query.setTaskId("2.1");
System.out.println(taskDAO.simpleQuery(query));
Thread.sleep(100000);
}
}

View File

@ -0,0 +1,35 @@
package com.github.kfcfans.oms.worker.persistence;
import lombok.Data;
/**
* TaskDO为了简化 DAO 一张表实现两种功能
* 对于 TaskTrackertask_info 存储了当前 JobInstance 所有的子任务信息
* 对于普通的 Workertask_info 存储了当前无法处理的任务信息
*
* @author tjq
* @since 2020/3/17
*/
@Data
public class TaskDO {
// 层次命名法可以表示 Map 后的父子关系 0.1.2 代表 rootTask map 的第一个 task map 的第二个 task
private String taskId;
private String jobId;
private String instanceId;
// 任务名称
private String taskName;
// 任务参数
private String taskContent;
// 对于JobTracker为workerAddress对于普通Worker为jobTrackerAddress
private String address;
// 任务状态010代表 JobTracker 使用1120代表普通Worker使用
private int status;
// 执行结果
private String result;
// 创建时间
private long createdTime;
// 最后修改时间
private long lastModifiedTime;
}

View File

@ -0,0 +1,53 @@
package com.github.kfcfans.oms.worker.persistence;
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 任务持久化服务
*
* @author tjq
* @since 2020/3/17
*/
public class TaskPersistenceService {
private TaskDAO taskDAO = new TaskDAOImpl();
private static final int MAX_BATCH_SIZE = 50;
public boolean save(TaskDO task) {
return taskDAO.save(task);
}
public boolean batchSave(List<TaskDO> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return true;
}
if (tasks.size() <= MAX_BATCH_SIZE) {
return taskDAO.batchSave(tasks);
}
List<List<TaskDO>> partition = Lists.partition(tasks, MAX_BATCH_SIZE);
for (List<TaskDO> p : partition) {
boolean b = taskDAO.batchSave(p);
if (!b) {
return false;
}
}
return true;
}
/**
* 获取 TaskTracker 准备派发给 Worker 执行的 task
*/
public List<TaskDO> getNeedDispatchTask(String instanceId) {
SimpleTaskQuery query = new SimpleTaskQuery();
query.setInstanceId(instanceId);
query.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
query.setLimit(100);
return taskDAO.simpleQuery(query);
}
}

View File

@ -0,0 +1,33 @@
package com.github.kfcfans.oms.worker.pojo.model;
import lombok.Data;
/**
* 被调度执行的任务实例详情
*
* @author tjq
* @since 2020/3/16
*/
@Data
public class JobInstanceInfo {
private String jobId;
private String instanceId;
// 任务执行类型单机广播MR
private String executeType;
// 处理器类型JavaBeanJar脚本等
private String processorType;
// 处理器信息
private String processorInfo;
// 任务执行时间限制单位毫秒
private long timeLimit;
// 可用处理器地址可能多值逗号分隔
private String workerAddress;
/* *********************** Map/MapReduce 任务专用 *********************** */
// 每台机器的处理线程数上限
private int threadConcurrency;
// 子任务重试次数任务本身的重试机制由server控制
private int taskRetryNum;
}

View File

@ -0,0 +1,35 @@
package com.github.kfcfans.oms.worker.pojo.request;
/**
* 服务端调度任务请求一次任务处理的入口
*
* @author tjq
* @since 2020/3/17
*/
public class ServerScheduleJobReq {
// 调度的服务器地址默认通讯目标
private String serverAddress;
/* *********************** 任务相关属性 *********************** */
private String jobId;
private String instanceId;
// 任务执行类型单机广播MR
private String executeType;
// 处理器类型JavaBeanJar脚本等
private String processorType;
// 处理器信息
private String processorInfo;
// 任务执行时间限制单位毫秒
private long timeLimit;
// 可用处理器地址可能多值逗号分隔
private String workerAddress;
/* *********************** Map/MapReduce 任务专用 *********************** */
// 每台机器的处理线程数上限
private int threadConcurrency;
// 子任务重试次数任务本身的重试机制由server控制
private int taskRetryNum;
}

View File

@ -0,0 +1,10 @@
package com.github.kfcfans.oms.worker.pojo.request;
/**
* WorkerMapTaskRequest
*
* @author tjq
* @since 2020/3/17
*/
public class WorkerMapTaskRequest {
}

View File

@ -0,0 +1,15 @@
package com.github.kfcfans.oms.worker.tracker;
/**
* 广播任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class BroadcastTaskTracker extends TaskTracker {
@Override
public void dispatch() {
}
}

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.worker.tracker;
import com.github.kfcfans.oms.worker.pojo.request.WorkerMapTaskRequest;
/**
* MapReduce 任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class MapReduceTaskTracker extends StandaloneTaskTracker {
public void newTask(WorkerMapTaskRequest mapRequest) {
}
}

View File

@ -0,0 +1,15 @@
package com.github.kfcfans.oms.worker.tracker;
/**
* 单机任务使用的 TaskTracker
*
* @author tjq
* @since 2020/3/17
*/
public class StandaloneTaskTracker extends TaskTracker {
@Override
public void dispatch() {
}
}

View File

@ -0,0 +1,93 @@
package com.github.kfcfans.oms.worker.tracker;
import akka.actor.ActorRef;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.oms.worker.common.constants.CommonSJ;
import com.github.kfcfans.oms.worker.common.constants.TaskConstant;
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
import com.google.common.collect.Lists;
import java.util.List;
/**
* 负责管理 JobInstance 的运行主要包括任务的派发MR可能存在大量的任务和状态的更新
*
* @author tjq
* @since 2020/3/17
*/
public abstract class TaskTracker {
// 任务实例信息
protected JobInstanceInfo jobInstanceInfo;
protected ActorRef actor;
protected TaskPersistenceService taskPersistenceService;
/**
* 分发任务
*/
public abstract void dispatch();
public void updateTaskStatus() {
}
public boolean finished() {
return false;
}
/**
* 持久化根任务只有完成持久化才能视为任务开始running先持久化再报告server
*/
private void persistenceTask() {
ExecuteType executeType = ExecuteType.valueOf(jobInstanceInfo.getExecuteType());
boolean persistenceResult;
// 单机MR模型下根任务模型本机直接执行JobTracker一般为负载最小的机器且MR的根任务通常伴随着 map 操作本机执行可以有效减少网络I/O开销
if (executeType != ExecuteType.BROADCAST) {
TaskDO rootTask = new TaskDO();
rootTask.setStatus(1);
rootTask.setJobId(jobInstanceInfo.getJobId());
rootTask.setInstanceId(jobInstanceInfo.getInstanceId());
rootTask.setTaskId(TaskConstant.ROOT_TASK_ID);
rootTask.setAddress(NetUtils.getLocalHost());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setCreatedTime(System.currentTimeMillis());
persistenceResult = taskPersistenceService.save(rootTask);
}else {
List<TaskDO> taskList = Lists.newLinkedList();
List<String> addrList = CommonSJ.commaSplitter.splitToList(jobInstanceInfo.getWorkerAddress());
for (int i = 0; i < addrList.size(); i++) {
TaskDO task = new TaskDO();
task.setStatus(1);
task.setJobId(jobInstanceInfo.getJobId());
task.setInstanceId(jobInstanceInfo.getInstanceId());
task.setTaskId(String.valueOf(i));
task.setAddress(addrList.get(i));
task.setTaskName(TaskConstant.ROOT_TASK_NAME);
task.setCreatedTime(System.currentTimeMillis());
task.setCreatedTime(System.currentTimeMillis());
taskList.add(task);
}
persistenceResult = taskPersistenceService.batchSave(taskList);
}
if (!persistenceResult) {
throw new RuntimeException("create root task failed.");
}
}
/**
* 启动任务分发器
*/
private void initDispatcher() {
}
}

2
oh-my-scheduler.iml Normal file
View File

@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />

36
pom.xml Normal file
View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler</artifactId>
<version>1.0.0-SNAPSHOT</version>
<modules>
<module>oh-my-scheduler-worker</module>
<module>oh-my-scheduler-server</module>
<module>oh-my-scheduler-common</module>
</modules>
<packaging>pom</packaging>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<lombok.version>1.18.12</lombok.version>
</properties>
<dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>