lost the last connection with my shine

This commit is contained in:
tjq 2020-04-02 09:19:31 +08:00
parent 1267b3a34f
commit 02de19357c
30 changed files with 516 additions and 88 deletions

View File

@ -1,13 +1,20 @@
package com.github.kfcfans.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 任务执行类型
*
* @author tjq
* @since 2020/3/17
*/
@Getter
@AllArgsConstructor
public enum ExecuteType {
STANDALONE,
BROADCAST,
MAP_REDUCE
STANDALONE(1),
BROADCAST(2),
MAP_REDUCE(3);
int v;
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.common;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 处理器类型
@ -8,10 +9,12 @@ import lombok.AllArgsConstructor;
* @author tjq
* @since 2020/3/23
*/
@Getter
@AllArgsConstructor
public enum ProcessorType {
EMBEDDED_JAVA("内置Java对象");
EMBEDDED_JAVA(1, "内置Java对象");
private int v;
private String des;
}

View File

@ -20,13 +20,13 @@ public class RemoteConstant {
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
public static final String AKKA_CONFIG_NAME = "oms-akka-application.conf";
public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
/* ************************ AKKA SERVER ************************ */
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
public static final String SERVER_ACTOR_NAME = "server_actor";
public static final String SERVER_AKKA_CONFIG_NAME = "oms-worker.akka.conf";
/* ************************ OTHERS ************************ */

View File

@ -12,10 +12,12 @@ import java.io.Serializable;
* @since 2020/3/25
*/
@Data
public class WorkerHealthReportReq implements Serializable {
public class WorkerHeartbeat implements Serializable {
// 本机地址 -> IP:port
private String totalAddress;
private String workerAddress;
// 当前 appName
private String appName;
private SystemMetrics systemMetrics;
}

View File

@ -38,6 +38,10 @@ public class CommonUtils {
return executor.get();
}
public static <T> T executeWithRetry0(SupplierPlus<T> executor) throws Exception {
return executeWithRetry(executor, 3, 100);
}
/**
* 重试执行仅适用于根据返回值决定是否执行成功的方法
* @param booleanExecutor 需要执行的方法其返回值决定了执行是否成功

View File

@ -15,21 +15,67 @@
<properties>
<swagger.version>2.9.2</swagger.version>
<akka.version>2.6.4</akka.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<hikaricp.version>3.4.2</hikaricp.version>
<mysql.version>8.0.19</mysql.version>
<commons.lang.version>3.10</commons.lang.version>
</properties>
<dependencies>
<!-- oms-common -->
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler-common</artifactId>
<version>${oms.common.version}</version>
</dependency>
<!-- HikariCP -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
@ -46,6 +92,7 @@
<version>${swagger.version}</version>
</dependency>
</dependencies>
<!-- SpringBoot专用的打包插件 -->

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.oms.server.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import lombok.extern.slf4j.Slf4j;
/**
* 处理 Worker 请求
*
* @author tjq
* @since 2020/3/30
*/
@Slf4j
public class ServerActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(WorkerHeartbeat.class, this::onReceiveWorkerHeartbeat)
.matchAny(obj -> log.warn("[ServerActor] receive unknown request: {}.", obj))
.build();
}
private void onReceiveWorkerHeartbeat(WorkerHeartbeat heartbeat) {
}
}

View File

@ -0,0 +1,22 @@
package com.github.kfcfans.oms.server.common.constans;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 时间表达式类型
*
* @author tjq
* @since 2020/3/30
*/
@Getter
@AllArgsConstructor
public enum TimeExpressionType {
API(1),
CRON(2),
FIX_RATE(3),
FIX_DELAY(4);
int v;
}

View File

@ -1,22 +0,0 @@
package com.github.kfcfans.oms.server.model;
import java.util.Date;
/**
* 数据库实体类基类
*
* @author tjq
* @since 2020/3/29
*/
public abstract class BaseDO {
/**
* 主键
*/
private Long id;
private Date gmtCreate;
private Date gmtModified;
private String attribute;
}

View File

@ -1,20 +0,0 @@
package com.github.kfcfans.oms.server.model;
/**
* 分组信息表
*
* @author tjq
* @since 2020/3/29
*/
public class GroupInfoDO extends BaseDO {
/**
* 分组名称
*/
private String groupName;
/**
* 分组描述
*/
private String groupDescription;
}

View File

@ -1,14 +0,0 @@
package com.github.kfcfans.oms.server.model;
/**
* 注册信息表用于服务发现
*
* @author tjq
* @since 2020/3/30
*/
public class RegisterInfoDO extends BaseDO {
private String appName;
private String currentServerAddress;
private String lastServerAddress;
}

View File

@ -0,0 +1,28 @@
package com.github.kfcfans.oms.server.persistence.model;
import lombok.Data;
import javax.persistence.*;
import java.util.Date;
/**
* 应用信息表
*
* @author tjq
* @since 2020/3/30
*/
@Data
@Entity
@Table(name = "app_info")
public class AppInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String appName;
private String description;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -0,0 +1,58 @@
package com.github.kfcfans.oms.server.persistence.model;
import lombok.Data;
import javax.persistence.*;
import java.util.Date;
/**
* 任务信息表
*
* @author tjq
* @since 2020/3/29
*/
@Data
@Entity
@Table(name = "job_info")
public class JobInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
private String jobName;
// 任务描述
private String jobDescription;
// 任务所属的应用ID
private Long appId;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
/* ************************** 执行方式 ************************** */
// 执行类型单机/广播/MR
private Integer executeType;
// 执行器类型Java/Shell
private Integer processorType;
// 执行器信息
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 并发度同时执行的线程数量
private Integer concurrency;
// 任务整体超时时间
private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private Long taskTimeLimit;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -1,4 +1,7 @@
package com.github.kfcfans.oms.server.model;
package com.github.kfcfans.oms.server.persistence.model;
import javax.persistence.*;
import java.util.Date;
/**
* 任务运行日志表
@ -6,7 +9,13 @@ package com.github.kfcfans.oms.server.model;
* @author tjq
* @since 2020/3/30
*/
public class JobLogDO extends BaseDO {
@Entity
@Table(name = "job_log")
public class JobLogDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
// 任务ID
private Long jobId;
@ -19,4 +28,7 @@ public class JobLogDO extends BaseDO {
// 耗时
private Long usedTime;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* AppInfo 数据访问层
*
* @author tjq
* @since 2020/4/1
*/
public interface AppInfoRepository extends JpaRepository<AppInfoDO, Long> {
}

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* JobInfo 数据访问层
*
* @author tjq
* @since 2020/4/1
*/
public interface JobInfoRepository extends JpaRepository<JobInfoDO, Long> {
}

View File

@ -0,0 +1,13 @@
package com.github.kfcfans.oms.server.persistence.repository;
import com.github.kfcfans.oms.server.persistence.model.JobLogDO;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* JobLog 数据访问层
*
* @author tjq
* @since 2020/4/1
*/
public interface JobLogRepository extends JpaRepository<JobLogDO, Long> {
}

View File

@ -0,0 +1,39 @@
package com.github.kfcfans.oms.server.pojo;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* 请求返回的结果对象
*
* @author tjq
* @since 2020/3/30
*/
@Getter
@Setter
public class ResultDTO<T> {
private boolean success;
private T data;
private String message;
public static <T> ResultDTO<T> success(T data) {
ResultDTO<T> r = new ResultDTO<>();
r.success = true;
r.data = data;
return r;
}
public static <T> ResultDTO<T> failed(String message) {
ResultDTO<T> r = new ResultDTO<>();
r.success = false;
r.message = message;
return r;
}
public static <T> ResultDTO<T> failed(Throwable t) {
return failed(ExceptionUtils.getStackTrace(t));
}
}

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.oms.server.web;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* 统一处理 web 层异常信息
*
* @author tjq
* @since 2020/3/30
*/
@Slf4j
@ControllerAdvice
public class ControllerExceptionHandler {
@ResponseBody
@ExceptionHandler(Exception.class)
public ResultDTO<Void> exceptionHandler(Exception e) {
log.error("[ControllerException] http request failed.", e);
return ResultDTO.failed(e);
}
}

View File

@ -0,0 +1,61 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import com.github.kfcfans.oms.server.web.request.ModifyAppInfoRequest;
import lombok.Data;
import org.springframework.beans.BeanUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* AppName Controller
*
* @author tjq
* @since 2020/4/1
*/
@RestController
@RequestMapping("/appInfo")
public class AppInfoController {
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping("/save")
public ResultDTO<Void> saveAppInfo(ModifyAppInfoRequest appInfoRequest) {
AppInfoDO appInfoDO = new AppInfoDO();
BeanUtils.copyProperties(appInfoRequest, appInfoDO);
appInfoRepository.saveAndFlush(appInfoDO);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteAppInfo(Long appId) {
appInfoRepository.deleteById(appId);
return ResultDTO.success(null);
}
@GetMapping("/list")
public ResultDTO<List<AppInfoVO>> listAppInfo() {
List<AppInfoVO> result = appInfoRepository.findAll().stream().map(appInfoDO -> {
AppInfoVO appInfoVO = new AppInfoVO();
BeanUtils.copyProperties(appInfoDO, appInfoVO);
return appInfoVO;
}).collect(Collectors.toList());
return ResultDTO.success(result);
}
@Data
private static class AppInfoVO {
private Long id;
private String appName;
}
}

View File

@ -0,0 +1,52 @@
package com.github.kfcfans.oms.server.web.controller;
import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.oms.server.common.constans.TimeExpressionType;
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.pojo.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import org.springframework.beans.BeanUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 任务信息管理 Controller
*
* @author tjq
* @since 2020/3/30
*/
@RestController()
@RequestMapping("job")
public class JobController {
@Resource
private JobInfoRepository jobInfoRepository;
@PostMapping("/save")
public ResultDTO<Void> saveJobInfo(ModifyJobInfoRequest request) {
JobInfoDO jobInfoDO = new JobInfoDO();
BeanUtils.copyProperties(request, jobInfoDO);
// 拷贝枚举值
jobInfoDO.setExecuteType(ExecuteType.valueOf(request.getExecuteType()).getV());
jobInfoDO.setProcessorType(ProcessorType.valueOf(request.getProcessorType()).getV());
jobInfoDO.setTimeExpressionType(TimeExpressionType.valueOf(request.getTimeExpression()).getV());
jobInfoRepository.saveAndFlush(jobInfoDO);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteJobInfo(Long jobId) {
jobInfoRepository.deleteById(jobId);
return ResultDTO.success(null);
}
}

View File

@ -0,0 +1,7 @@
/**
* CRUD较为简单就不单独搞 Service 层了
*
* @author tjq
* @since 2020/4/2
*/
package com.github.kfcfans.oms.server.web.controller;

View File

@ -0,0 +1,17 @@
package com.github.kfcfans.oms.server.web.request;
import lombok.Data;
/**
* 修改应用信息请求
*
* @author tjq
* @since 2020/4/1
*/
@Data
public class ModifyAppInfoRequest {
private Long id;
private String appName;
private String description;
}

View File

@ -1,43 +1,46 @@
package com.github.kfcfans.oms.server.model;
package com.github.kfcfans.oms.server.web.request;
import lombok.Data;
/**
* 任务信息表
* 创建/修改 JobInfo 请求
*
* @author tjq
* @since 2020/3/29
* @since 2020/3/30
*/
public class JobInfoDO extends BaseDO {
@Data
public class ModifyJobInfoRequest {
/* ************************** 任务基本信息 ************************** */
// 任务名称
private String jobName;
// 任务描述
private String jobDescription;
// 任务分组名称
// 任务所属的应用ID
private Long appId;
// 任务分组名称仅用于前端展示的分组
private String groupName;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
private int timeExpressionType;
private String timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
/* ************************** 执行方式 ************************** */
// 执行类型单机/广播/MR
private int executeType;
private String executeType;
// 执行器类型Java/Shell
private int processorType;
private String processorType;
// 执行器信息
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 并发度同时执行的线程数量
private int concurrency;
private Integer concurrency;
// 任务整体超时时间
private long instanceTimeLimit;
private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private long taskTimeLimit;
private Long taskTimeLimit;
}

View File

@ -1,2 +1,14 @@
# server port config
server.port=7700
# db config
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://remotehost:3391/oms?charset=utf8mb4&useSSL=false
spring.datasource.username=root
spring.datasource.password=No1Bug2Please3!
# Hikari 数据源专用配置
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
# JPA 相关配置
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=update

View File

@ -0,0 +1,16 @@
akka {
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
# TODO : 临时使用 Java 序列化,开发完成后切换到 protocol-buffers
allow-java-serialization = on
}
remote {
artery {
transport = tcp # See Selecting a transport below
# over write by code
canonical.hostname = "127.0.0.1"
canonical.port = 10086
}
}
}

View File

@ -45,7 +45,7 @@
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>${oms.common.version}</version>
</dependency>
<!-- h2 database -->

View File

@ -74,7 +74,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
workerAddress = localIP + ":" + port;
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.AKKA_CONFIG_NAME);
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
actorSystem = ActorSystem.create(RemoteConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.oms.worker.background;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.request.WorkerHealthReportReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
import com.github.kfcfans.oms.worker.common.utils.SystemInfoUtils;
@ -25,13 +25,14 @@ public class WorkerHealthReportRunnable implements Runnable {
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
WorkerHealthReportReq reportReq = new WorkerHealthReportReq();
reportReq.setSystemMetrics(systemMetrics);
reportReq.setTotalAddress(OhMyWorker.getWorkerAddress());
WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics);
heartbeat.setWorkerAddress(OhMyWorker.getWorkerAddress());
heartbeat.setAppName(OhMyWorker.getConfig().getAppName());
// 发送请求
String serverPath = AkkaUtils.getAkkaServerNodePath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath);
actorSelection.tell(reportReq, null);
actorSelection.tell(heartbeat, null);
}
}