[release] v3.2.2

This commit is contained in:
朱八 2020-08-03 01:34:05 +08:00
commit 4ddf65a44e
69 changed files with 629 additions and 268 deletions

View File

@ -6,10 +6,10 @@
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob</artifactId>
<version>1.0.0</version>
<version>2.0.0</version>
<packaging>pom</packaging>
<name>powerjob</name>
<url>https://github.com/KFCFans/PowerJob</url>
<url>http://www.powerjob.tech</url>
<description>Distributed scheduling and execution framework</description>
<licenses>
<license>

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.2.1</powerjob.common.version>
<powerjob.common.version>3.2.2</powerjob.common.version>
<junit.version>5.6.1</junit.version>
</properties>

View File

@ -228,6 +228,22 @@ public class OhMyClient {
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
* 重试任务实例
* 只有完成状态成功失败手动停止被取消的任务才能被重试且暂不支持工作流内任务实例的重试
* @param instanceId 任务实例ID
* @return true 代表取消成功false 取消失败
* @throws Exception 异常
*/
public ResultDTO<Void> retryInstance(Long instanceId) throws Exception {
RequestBody body = new FormBody.Builder()
.add("instanceId", instanceId.toString())
.add("appId", appId.toString())
.build();
String post = postHA(OpenAPIConstant.RETRY_INSTANCE, body);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
* 查询任务实例状态
* @param instanceId 应用实例ID
@ -410,6 +426,6 @@ public class OhMyClient {
}
log.error("[OhMyClient] do post for path: {} failed because of no server available in {}.", path, allAddress);
throw new OmsException("no server available");
throw new OmsException("no server available when send post");
}
}

View File

@ -109,4 +109,10 @@ public class TestClient {
ResultDTO<Void> cancelRes = ohMyClient.cancelInstance(startRes.getData());
System.out.println("cancelJob result: " + JsonUtils.toJSONString(cancelRes));
}
@Test
public void testRetryInstance() throws Exception {
ResultDTO<Void> res = ohMyClient.retryInstance(169557545206153344L);
System.out.println(res);
}
}

View File

@ -5,12 +5,12 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>

View File

@ -23,6 +23,7 @@ public class OpenAPIConstant {
/* ************* Instance 区 ************* */
public static final String STOP_INSTANCE = "/stopInstance";
public static final String CANCEL_INSTANCE = "/cancelInstance";
public static final String RETRY_INSTANCE = "/retryInstance";
public static final String FETCH_INSTANCE_STATUS = "/fetchInstanceStatus";
public static final String FETCH_INSTANCE_INFO = "/fetchInstanceInfo";

View File

@ -14,7 +14,7 @@ public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics>
// CPU核心数量
private int cpuProcessors;
// CPU负载需要处以核心数
// CPU负载负载 使用率 是两个完全不同的概念Java 无法获取 CPU 使用率只能获取负载
private double cpuLoad;
// 内存单位 GB
@ -34,11 +34,12 @@ public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics>
@Override
public int compareTo(SystemMetrics that) {
return this.calculateScore() - that.calculateScore();
// 降序排列
return that.calculateScore() - this.calculateScore();
}
/**
* 计算得分情况内存 then CPU then 磁盘磁盘必须有1G以上的剩余空间
* 计算得分情况内存 & CPU (磁盘不参与计算)
* @return 得分情况
*/
public int calculateScore() {
@ -47,13 +48,17 @@ public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics>
return score;
}
double availableCPUCores = cpuProcessors * cpuLoad;
double availableMemory = jvmMaxMemory - jvmUsedMemory;
// 对于 TaskTracker 来说内存是任务顺利完成的关键因此内存 2 块钱 1GB
double memScore = (jvmMaxMemory - jvmUsedMemory) * 2;
// CPU 剩余负载1 块钱 1
double cpuScore = cpuProcessors - cpuLoad;
// Indian Windows 无法获取 CpuLoad -1固定为 1
if (cpuScore > cpuProcessors) {
cpuScore = 1;
}
// Windows下无法获取CPU可用核心数值固定为-1
cpuLoad = Math.max(0, cpuLoad);
return (int) (availableMemory * 2 + availableCPUCores);
score = (int) (memScore + cpuScore);
return score;
}
/**
@ -65,9 +70,17 @@ public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics>
*/
public boolean available(double minCPUCores, double minMemorySpace, double minDiskSpace) {
double currentCpuCores = Math.max(cpuLoad * cpuProcessors, 0);
double currentMemory = jvmMaxMemory - jvmUsedMemory;
double currentDisk = diskTotal - diskUsed;
return currentCpuCores >= minCPUCores && currentMemory >= minMemorySpace && currentDisk >= minDiskSpace;
double availableMemory = jvmMaxMemory - jvmUsedMemory;
double availableDisk = diskTotal - diskUsed;
if (availableMemory < minMemorySpace || availableDisk < minDiskSpace) {
return false;
}
// cpuLoad 为负数代表无法获取不判断等于 0 为最理想情况CPU 空载不需要判断
if (cpuLoad <= 0 || minCPUCores <= 0) {
return true;
}
return minCPUCores < (cpuProcessors - cpuLoad);
}
}

View File

@ -7,6 +7,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Collection;
import java.util.UUID;
import java.util.function.Supplier;
@ -146,4 +147,12 @@ public class CommonUtils {
return OmsConstant.NONE;
}
/**
* 生成 UUID
* @return uuid
*/
public static String genUUID() {
return StringUtils.replace(UUID.randomUUID().toString(), "-", "");
}
}

View File

@ -5,20 +5,26 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.2.1</powerjob.common.version>
<powerjob.common.version>3.2.2</powerjob.common.version>
<!-- 数据库驱动版本使用的是spring-boot-dependencies管理的版本 -->
<mysql.version>8.0.19</mysql.version>
<ojdbc.version>19.7.0.0</ojdbc.version>
<mssql-jdbc.version>7.4.1.jre8</mssql-jdbc.version>
<db2-jdbc.version>11.5.0.0</db2-jdbc.version>
<postgresql.version>42.2.14</postgresql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>
<jgit.version>5.7.0.202003110725-r</jgit.version>
<mvn.invoker.version>3.0.1</mvn.invoker.version>
@ -44,6 +50,30 @@
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- oracle -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${ojdbc.version}</version>
</dependency>
<!-- sqlserver -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
</dependency>
<!-- db2 -->
<dependency>
<groupId>com.ibm.db2</groupId>
<artifactId>jcc</artifactId>
<version>${db2-jdbc.version}</version>
</dependency>
<!-- postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<!-- h2 database -->
<dependency>
<groupId>com.h2database</groupId>

View File

@ -1,15 +1,11 @@
package com.github.kfcfans.powerjob.server;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.io.File;
/**
* SpringBoot 启动入口
*
@ -47,13 +43,6 @@ public class OhMyApplication {
private static void pre() {
log.info(TIPS);
// 删除历史遗留的 H2 数据库文件
try {
FileUtils.forceDeleteOnExit(new File(OmsFileUtils.genH2Path()));
}catch (Exception e) {
log.warn("[PowerJob] delete h2 workspace({}) failed, if server can't startup successfully, please delete it manually", OmsFileUtils.genH2Path(), e);
}
}
}

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.actors.FriendActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerActor;
import com.github.kfcfans.powerjob.server.akka.actors.ServerTroubleshootingActor;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
@ -44,7 +45,7 @@ public class OhMyServer {
// 解析配置文件
PropertyUtils.init();
Properties properties = PropertyUtils.getProperties();
int port = Integer.parseInt(properties.getProperty("oms.akka.port", "10086"));
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, "10086"));
// 启动 ActorSystem
Map<String, Object> overrideConfig = Maps.newHashMap();

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.powerjob.server.common;
/**
* 配置文件 key
*
* @author tjq
* @since 2020/8/2
*/
public class PowerJobServerConfigKey {
/**
* akka 端口号
*/
public static final String AKKA_PORT = "oms.akka.port";
/**
* alarm bean 名称多值逗号分隔
*/
public static final String ALARM_BEAN_NAMES = "oms.alarm.bean.names";
/**
* 自定义数据库表前缀
*/
public static final String TABLE_PREFIX = "oms.table-prefix";
/**
* 是否使用 mongoDB
*/
public static final String MONGODB_ENABLE = "oms.mongodb.enable";
}

View File

@ -14,7 +14,7 @@ import java.util.concurrent.*;
* 公用线程池配置
* omsTimingPool用于执行定时任务的线程池
* omsCommonPool用于执行普通任务的线程池
* omsCommonPool用于执行后台任务的线程池这类任务对时间不敏感慢慢执行细水长流即可
* omsBackgroundPool用于执行后台任务的线程池这类任务对时间不敏感慢慢执行细水长流即可
* taskScheduler用于定时调度的线程池
*
* @author tjq
@ -59,7 +59,7 @@ public class ThreadPoolConfig {
public Executor initBackgroundPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(8192);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("omsBackgroundPool-");

View File

@ -1,13 +1,12 @@
package com.github.kfcfans.powerjob.server.common.utils;
import org.apache.commons.lang3.StringUtils;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.util.DigestUtils;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.URLEncoder;
import java.util.UUID;
/**
* 文件工具类统一文件存放地址
@ -49,17 +48,19 @@ public class OmsFileUtils {
* @return 临时目录
*/
public static String genTemporaryWorkPath() {
String uuid = StringUtils.replace(UUID.randomUUID().toString(), "-", "");
return genTemporaryPath() + uuid + "/";
return genTemporaryPath() + CommonUtils.genUUID() + "/";
}
/**
* 获取 H2 数据库工作目录
* @return H2 工作目录
*/
public static String genH2Path() {
public static String genH2BasePath() {
return COMMON_PATH + "h2/";
}
public static String genH2WorkPath() {
return genH2BasePath() + CommonUtils.genUUID() + "/";
}
/**
* 将文本写入文件

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.persistence.config;
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.io.FileUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
@ -10,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.io.File;
/**
* 多重数据源配置
@ -34,14 +36,21 @@ public class MultiDatasourceConfig {
@Bean("omsLocalDatasource")
public DataSource initOmsLocalDatasource() {
String h2Path = OmsFileUtils.genH2WorkPath();
HikariConfig config = new HikariConfig();
config.setDriverClassName(H2_DRIVER_CLASS_NAME);
config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, OmsFileUtils.genH2Path()));
config.setJdbcUrl(String.format(H2_JDBC_URL_PATTERN, h2Path));
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(H2_MIN_SIZE);
// 池中最大连接数量
config.setMaximumPoolSize(H2_MAX_ACTIVE_SIZE);
// JVM 关闭时删除文件
try {
FileUtils.forceDeleteOnExit(new File(h2Path));
}catch (Exception ignore) {
}
return new HikariDataSource(config);
}
}

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.persistence.config;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils;
import org.hibernate.boot.model.naming.Identifier;
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
@ -34,7 +35,7 @@ public class PowerJobPhysicalNamingStrategy extends SpringPhysicalNamingStrategy
@Override
public Identifier toPhysicalTableName(Identifier name, JdbcEnvironment jdbcEnvironment) {
String tablePrefix = PropertyUtils.getProperties().getProperty("oms.table-prefix");
String tablePrefix = PropertyUtils.getProperties().getProperty(PowerJobServerConfigKey.TABLE_PREFIX);
String text = name.getText();
String noDOText = StringUtils.endsWithIgnoreCase(text, "do") ? text.substring(0, text.length() - 2) : text;

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.Data;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -17,7 +18,8 @@ import java.util.Date;
public class AppInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String appName;

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.Data;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -17,7 +18,8 @@ import java.util.Date;
public class ContainerInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 所属的应用ID

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.common.InstanceStatus;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -22,7 +23,8 @@ import java.util.Date;
public class InstanceInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 任务ID
@ -33,7 +35,7 @@ public class InstanceInfoDO {
private Long instanceId;
// 任务实例参数
@Lob
@Column(columnDefinition="TEXT")
@Column
private String instanceParams;
// 该任务实例的类型普通/工作流InstanceType
@ -46,7 +48,7 @@ public class InstanceInfoDO {
private Integer status;
// 执行结果允许存储稍大的结果
@Lob
@Column(columnDefinition="TEXT")
@Column
private String result;
// 预计触发时间
private Long expectedTriggerTime;

View File

@ -4,6 +4,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -23,7 +24,8 @@ public class JobInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
/* ************************** 任务基本信息 ************************** */
@ -49,7 +51,7 @@ public class JobInfoDO {
private Integer processorType;
// 执行器信息可能需要存储整个脚本文件
@Lob
@Column(columnDefinition="TEXT")
@Column
private String processorInfo;
/* ************************** 运行时配置 ************************** */

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -19,7 +20,8 @@ import java.util.Date;
public class OmsLockDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String lockName;

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -19,7 +20,8 @@ import java.util.Date;
public class ServerInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
/**

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.Data;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -17,7 +18,8 @@ import java.util.Date;
public class UserInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String username;

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -21,7 +22,8 @@ import java.util.Date;
public class WorkflowInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
private String wfName;
@ -32,7 +34,7 @@ public class WorkflowInfoDO {
// 工作流的DAG图信息点线式DAG的json
@Lob
@Column(columnDefinition="TEXT")
@Column
private String peDAG;
/* ************************** 定时参数 ************************** */

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
@ -21,7 +22,8 @@ import java.util.Date;
public class WorkflowInstanceInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 任务所属应用的ID冗余提高查询效率
private Long appId;
@ -35,10 +37,10 @@ public class WorkflowInstanceInfoDO {
private Integer status;
@Lob
@Column(columnDefinition="TEXT")
@Column
private String dag;
@Lob
@Column(columnDefinition="TEXT")
@Column
private String result;
// 实际触发时间

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.persistence.mongodb;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Maps;
import com.mongodb.client.MongoDatabase;
@ -13,10 +14,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.*;
import java.util.Date;
import java.util.Map;
@ -30,18 +34,24 @@ import java.util.function.Consumer;
*/
@Slf4j
@Service
public class GridFsManager {
public class GridFsManager implements InitializingBean {
@Resource
private Environment environment;
private MongoDatabase db;
private boolean available;
private Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
private final Map<String, GridFSBucket> bucketCache = Maps.newConcurrentMap();
public static final String LOG_BUCKET = "log";
public static final String CONTAINER_BUCKET = "container";
@Autowired(required = false)
public void setMongoTemplate(MongoTemplate mongoTemplate) {
this.db = mongoTemplate.getDb();
if (mongoTemplate != null) {
this.db = mongoTemplate.getDb();
}
}
/**
@ -49,7 +59,7 @@ public class GridFsManager {
* @return true可用false不可用
*/
public boolean available() {
return db != null;
return available;
}
/**
@ -109,12 +119,12 @@ public class GridFsManager {
ObjectId objectId = gridFSFile.getObjectId();
try {
bucket.delete(objectId);
log.info("[GridFsHelper] deleted {}#{}", bucketName, objectId);
log.info("[GridFsManager] deleted {}#{}", bucketName, objectId);
}catch (Exception e) {
log.error("[GridFsHelper] deleted {}#{} failed.", bucketName, objectId, e);
log.error("[GridFsManager] deleted {}#{} failed.", bucketName, objectId, e);
}
});
log.info("[GridFsHelper] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
log.info("[GridFsManager] clean bucket({}) successfully, delete all files before {}, using {}.", bucketName, date, sw.stop());
}
public boolean exists(String bucketName, String fileName) {
@ -131,4 +141,11 @@ public class GridFsManager {
private GridFSBucket getBucket(String bucketName) {
return bucketCache.computeIfAbsent(bucketName, ignore -> GridFSBuckets.create(db, bucketName));
}
@Override
public void afterPropertiesSet() throws Exception {
String enable = environment.getProperty(PowerJobServerConfigKey.MONGODB_ENABLE, Boolean.FALSE.toString());
available = Boolean.TRUE.toString().equals(enable) && db != null;
log.info("[GridFsManager] available: {}, db: {}", available, db);
}
}

View File

@ -193,6 +193,7 @@ public class InstanceLogService {
try {
instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
log.warn("[InstanceLog-{}] delete local instanceLog successfully.", instanceId);
}catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
}
@ -308,7 +309,7 @@ public class InstanceLogService {
@Async("omsTimingPool")
@Scheduled(fixedDelay = 60000)
@Scheduled(fixedDelay = 120000)
public void timingCheck() {
// 定时删除秒级任务的日志

View File

@ -0,0 +1,36 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.StringUtils;
/**
* 报警内容
*
* @author tjq
* @since 2020/8/1
*/
public interface Alarm extends OmsSerializable {
String fetchTitle();
default String fetchContent() {
StringBuilder sb = new StringBuilder();
JSONObject content = JSONObject.parseObject(JSONObject.toJSONString(this));
content.forEach((key, originWord) -> {
sb.append(key).append(": ");
String word = String.valueOf(originWord);
if (StringUtils.endsWithIgnoreCase(key, "time") || StringUtils.endsWithIgnoreCase(key, "date")) {
try {
if (originWord instanceof Long) {
word = CommonUtils.formatTime((Long) originWord);
}
}catch (Exception ignore) {
}
}
sb.append(word).append("\n\r");
});
return sb.toString();
}
}

View File

@ -12,17 +12,6 @@ import java.util.List;
*/
public interface Alarmable {
/**
* 任务执行失败报警
* @param content 任务实例相关信息
* @param targetUserList 目标用户列表
*/
void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList);
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
/**
* 工作流执行失败报警
* @param content 工作流实例相关信息
* @param targetUserList 目标用户列表
*/
void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList);
}

View File

@ -1,15 +1,16 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
/**
@ -22,37 +23,18 @@ import java.util.List;
@Service("omsDefaultMailAlarmService")
public class DefaultMailAlarmService implements Alarmable {
@Resource
private Environment environment;
private JavaMailSender javaMailSender;
@Value("${spring.mail.username}")
private String from;
private static final String MAIL_TITLE = "OhMyScheduler AlarmService";
private static final String JOB_INSTANCE_FAILED_CONTENT_PATTERN = "Job run failed, detail is: %s";
private static final String WF_INSTANCE_FAILED_CONTENT_PATTERN = "Workflow run failed, detail is: %s";
@Autowired(required = false)
public DefaultMailAlarmService(JavaMailSender javaMailSender) {
this.javaMailSender = javaMailSender;
}
private static final String FROM_KEY = "spring.mail.username";
@Override
public void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
String msg = String.format(JOB_INSTANCE_FAILED_CONTENT_PATTERN, JsonUtils.toJSONString(content));
sendMail(msg, targetUserList);
}
@Override
public void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
String msg = String.format(WF_INSTANCE_FAILED_CONTENT_PATTERN, JsonUtils.toJSONString(content));
sendMail(msg, targetUserList);
}
private void sendMail(String msg, List<UserInfoDO> targetUserList) {
log.debug("[OmsMailAlarmService] msg: {}, to: {}", msg, targetUserList);
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null) {
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
initFrom();
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
return;
}
@ -60,12 +42,24 @@ public class DefaultMailAlarmService implements Alarmable {
try {
sm.setFrom(from);
sm.setTo(targetUserList.stream().map(UserInfoDO::getEmail).toArray(String[]::new));
sm.setSubject(MAIL_TITLE);
sm.setText(msg);
sm.setSubject(alarm.fetchTitle());
sm.setText(alarm.fetchContent());
javaMailSender.send(sm);
}catch (Exception e) {
log.error("[OmsMailAlarmService] send mail({}) failed, reason is {}", sm, e.getMessage());
}
}
@Autowired(required = false)
public void setJavaMailSender(JavaMailSender javaMailSender) {
this.javaMailSender = javaMailSender;
}
// 不能直接使用 @Value 注入不存在的时候会报错
private void initFrom() {
if (StringUtils.isEmpty(from)) {
from = environment.getProperty(FROM_KEY);
}
}
}

View File

@ -9,7 +9,7 @@ import lombok.Data;
* @since 2020/4/30
*/
@Data
public class JobInstanceAlarmContent {
public class JobInstanceAlarm implements Alarm {
// 应用ID
private long appId;
// 任务ID
@ -43,4 +43,9 @@ public class JobInstanceAlarmContent {
private Long finishedTime;
// TaskTracker地址
private String taskTrackerAddress;
@Override
public String fetchTitle() {
return "PowerJob AlarmService: Job Running Failed";
}
}

View File

@ -1,15 +1,17 @@
package com.github.kfcfans.powerjob.server.service.alarm;
import com.github.kfcfans.powerjob.server.common.PowerJobServerConfigKey;
import com.github.kfcfans.powerjob.server.common.SJ;
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
@ -22,36 +24,19 @@ import java.util.List;
@Service("omsCenterAlarmService")
public class OmsCenterAlarmService implements Alarmable {
@Setter
@Value("${oms.alarm.bean.names}")
private String beanNames;
@Resource
private Environment environment;
private List<Alarmable> alarmableList;
private volatile boolean initialized = false;
public OmsCenterAlarmService() {
}
@Async("omsCommonPool")
@Override
public void onJobInstanceFailed(JobInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
init();
alarmableList.forEach(alarmable -> {
try {
alarmable.onJobInstanceFailed(content, targetUserList);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] alarm failed.", e);
}
});
}
@Async("omsCommonPool")
@Override
public void onWorkflowInstanceFailed(WorkflowInstanceAlarmContent content, List<UserInfoDO> targetUserList) {
init();
alarmableList.forEach(alarmable -> {
try {
alarmable.onWorkflowInstanceFailed(content, targetUserList);
alarmable.onFailed(alarm, targetUserList);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] alarm failed.", e);
}
@ -73,17 +58,20 @@ public class OmsCenterAlarmService implements Alarmable {
}
alarmableList = Lists.newLinkedList();
Splitter.on(",").split(beanNames).forEach(beanName -> {
try {
Alarmable bean = (Alarmable) SpringUtils.getBean(beanName);
alarmableList.add(bean);
log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e);
}
});
String beanNames = environment.getProperty(PowerJobServerConfigKey.ALARM_BEAN_NAMES);
if (StringUtils.isNotEmpty(beanNames)) {
SJ.commaSplitter.split(beanNames).forEach(beanName -> {
try {
Alarmable bean = (Alarmable) SpringUtils.getBean(beanName);
alarmableList.add(bean);
log.info("[OmsCenterAlarmService] load Alarmable for bean: {} successfully.", beanName);
}catch (Exception e) {
log.warn("[OmsCenterAlarmService] initialize Alarmable for bean: {} failed.", beanName, e);
}
});
}
initialized = true;
}
}
}

View File

@ -10,7 +10,7 @@ import lombok.Data;
* @since 2020/6/12
*/
@Data
public class WorkflowInstanceAlarmContent {
public class WorkflowInstanceAlarm implements Alarm {
private String workflowName;
@ -34,4 +34,9 @@ public class WorkflowInstanceAlarmContent {
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
private String timeExpression;
@Override
public String fetchTitle() {
return "PowerJob AlarmService: Workflow Running Failed";
}
}

View File

@ -22,13 +22,13 @@ import java.util.Map;
public class ClusterStatusHolder {
// 集群所属的应用名称
private String appName;
private final String appName;
// 集群中所有机器的健康状态
private Map<String, SystemMetrics> address2Metrics;
private final Map<String, SystemMetrics> address2Metrics;
// 集群中所有机器的容器部署状态 containerId -> (workerAddress -> containerInfo)
private Map<Long, Map<String, DeployedContainerInfo>> containerId2Infos;
// 集群中所有机器的最后心跳时间
private Map<String, Long> address2ActiveTime;
private final Map<String, Long> address2ActiveTime;
private static final long WORKER_TIMEOUT_MS = 60000;
@ -78,11 +78,14 @@ public class ClusterStatusHolder {
address2Metrics.forEach((address, metrics) -> {
if (timeout(address)) {
log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered because of timeout, last active time is {}.", address, metrics, address2ActiveTime.get(address));
return;
}
// 判断指标
if (metrics.available(minCPUCores, minMemorySpace, minDiskSpace)) {
workers.add(address);
}else {
log.info("[ClusterStatusHolder] worker(address={},metrics={}) was filtered by config(minCPUCores={},minMemory={},minDiskSpace={})", address, metrics, minCPUCores, minMemorySpace, minDiskSpace);
}
});

View File

@ -99,4 +99,8 @@ public class WorkerManagerService {
public static void cleanUp() {
appId2ClusterStatus.values().forEach(ClusterStatusHolder::release);
}
public static Map<Long, ClusterStatusHolder> getAppId2ClusterStatus() {
return appId2ClusterStatus;
}
}

View File

@ -19,7 +19,7 @@ import org.springframework.stereotype.Service;
@Service
public class IdGenerateService {
private SnowFlakeIdGenerator snowFlakeIdGenerator;
private final SnowFlakeIdGenerator snowFlakeIdGenerator;
private static final int DATA_CENTER_ID = 0;

View File

@ -12,7 +12,7 @@ import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.InstanceLogService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import lombok.extern.slf4j.Slf4j;
@ -162,12 +162,12 @@ public class InstanceManager {
}
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInstanceAlarmContent content = new JobInstanceAlarmContent();
JobInstanceAlarm content = new JobInstanceAlarm();
BeanUtils.copyProperties(jobInfo, content);
BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
omsCenterAlarmService.onJobInstanceFailed(content, userList);
omsCenterAlarmService.onFailed(content, userList);
}
// 主动移除缓存减小内存占用

View File

@ -15,7 +15,10 @@ import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@ -40,11 +43,15 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.STOPPED;
@Service
public class InstanceService {
@Resource
private DispatchService dispatchService;
@Resource
private IdGenerateService idGenerateService;
@Resource
private InstanceManager instanceManager;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
/**
@ -121,6 +128,34 @@ public class InstanceService {
}
}
/**
* 重试任务只有结束的任务运行重试
* @param instanceId 任务实例ID
*/
public void retryInstance(Long instanceId) {
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
throw new OmsException("Only stopped instance can be retry!");
}
// 暂时不支持工作流任务的重试
if (instanceInfo.getWfInstanceId() != null) {
throw new OmsException("Workflow's instance do not support retry!");
}
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
instanceInfo.setExpectedTriggerTime(System.currentTimeMillis());
instanceInfo.setFinishedTime(null);
instanceInfo.setActualTriggerTime(null);
instanceInfo.setTaskTrackerAddress(null);
instanceInfo.setResult(null);
instanceInfoRepository.saveAndFlush(instanceInfo);
// 派发任务
Long jobId = instanceInfo.getJobId();
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new OmsException("can't find job info by jobId: " + jobId));
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}
/**
* 取消任务实例的运行
* 接口使用条件调用接口时间与待取消任务的预计执行时间有一定时间间隔否则不保证可靠性

View File

@ -20,7 +20,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowIn
import com.github.kfcfans.powerjob.server.service.DispatchService;
import com.github.kfcfans.powerjob.server.service.UserService;
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarmContent;
import com.github.kfcfans.powerjob.server.service.alarm.WorkflowInstanceAlarm;
import com.github.kfcfans.powerjob.server.service.id.IdGenerateService;
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
import com.google.common.collect.LinkedListMultimap;
@ -332,14 +332,14 @@ public class WorkflowInstanceManager {
// 报警
try {
workflowInfoRepository.findById(wfInstance.getWorkflowId()).ifPresent(wfInfo -> {
WorkflowInstanceAlarmContent content = new WorkflowInstanceAlarmContent();
WorkflowInstanceAlarm content = new WorkflowInstanceAlarm();
BeanUtils.copyProperties(wfInfo, content);
BeanUtils.copyProperties(wfInstance, content);
content.setResult(result);
List<UserInfoDO> userList = userService.fetchNotifyUserList(wfInfo.getNotifyUserIds());
omsCenterAlarmService.onWorkflowInstanceFailed(content, userList);
omsCenterAlarmService.onFailed(content, userList);
});
}catch (Exception ignore) {
}

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.response.ResultDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.converter.HttpMessageNotReadableException;
import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException;
import org.springframework.web.HttpRequestMethodNotSupportedException;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
@ -28,6 +29,8 @@ public class ControllerExceptionHandler {
log.warn("[ControllerException] http request failed, message is {}.", e.getMessage());
} else if (e instanceof HttpMessageNotReadableException || e instanceof MethodArgumentTypeMismatchException) {
log.warn("[ControllerException] invalid http request params, exception is {}.", e.getMessage());
} else if (e instanceof HttpRequestMethodNotSupportedException) {
log.warn("[ControllerException] invalid http request method, exception is {}.", e.getMessage());
} else {
log.error("[ControllerException] http request failed.", e);
}

View File

@ -36,7 +36,7 @@ public class AppInfoController {
@Resource
private AppInfoRepository appInfoRepository;
private static final int MAX_APP_NUM = 50;
private static final int MAX_APP_NUM = 200;
@PostMapping("/save")
public ResultDTO<Void> saveAppInfo(@RequestBody ModifyAppInfoRequest req) {

View File

@ -64,6 +64,12 @@ public class InstanceController {
return ResultDTO.success(null);
}
@GetMapping("/retry")
public ResultDTO<Void> retryInstance(Long instanceId) {
instanceService.retryInstance(instanceId);
return ResultDTO.success(null);
}
@GetMapping("/detail")
public ResultDTO<InstanceDetailVO> getInstanceDetail(String instanceId) {
return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(Long.valueOf(instanceId))));

View File

@ -101,6 +101,13 @@ public class OpenAPIController {
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.RETRY_INSTANCE)
public ResultDTO<Void> retryInstance(Long instanceId, Long appId) {
checkInstanceIdValid(instanceId, appId);
instanceService.retryInstance(instanceId);
return ResultDTO.success(null);
}
@PostMapping(OpenAPIConstant.FETCH_INSTANCE_STATUS)
public ResultDTO<Integer> fetchInstanceStatus(Long instanceId) {
InstanceStatus instanceStatus = instanceService.getInstanceStatus(instanceId);

View File

@ -1,15 +1,18 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
import com.github.kfcfans.powerjob.server.service.ha.ServerSelectService;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@ -51,12 +54,17 @@ public class ServerController {
}
@GetMapping("/hello")
public ResultDTO<JSONObject> ping() {
public ResultDTO<JSONObject> ping(@RequestParam(required = false) boolean debug) {
JSONObject res = new JSONObject();
res.put("localHost", NetUtils.getLocalHost());
res.put("actorSystemAddress", OhMyServer.getActorSystemAddress());
res.put("serverTime", CommonUtils.formatTime(System.currentTimeMillis()));
res.put("serverTimeZone", TimeZone.getDefault().getDisplayName());
res.put("appIds", WorkerManagerService.getAppId2ClusterStatus().keySet());
if (debug) {
res.put("appId2ClusterInfo", JSON.parseObject(JSON.toJSONString(WorkerManagerService.getAppId2ClusterStatus())));
}
return ResultDTO.success(res);
}

View File

@ -25,37 +25,35 @@ public class WorkerStatusVO {
private int status;
// 12.3%(4 cores)
private static final String CPU_FORMAT = "%s%%(%d cores)";
private static final String CPU_FORMAT = "%s / %s cores";
// 27.7%(2.9/8.0 GB)
private static final String OTHER_FORMAT = "%s%%%s/%s GB";
private static final String OTHER_FORMAT = "%s%%%s / %s GB";
private static final DecimalFormat df = new DecimalFormat("#.#");
private static final double threshold = 0.8;
private static final double THRESHOLD = 0.8;
public WorkerStatusVO(String address, SystemMetrics systemMetrics) {
this.status = 1;
this.address = address;
String cpuL = df.format(systemMetrics.getCpuLoad() * 100);
this.cpuLoad = String.format(CPU_FORMAT, cpuL, systemMetrics.getCpuProcessors());
this.cpuLoad = String.format(CPU_FORMAT, df.format(systemMetrics.getCpuLoad()), systemMetrics.getCpuProcessors());
if (systemMetrics.getCpuLoad() > systemMetrics.getCpuProcessors() * THRESHOLD) {
this.status ++;
}
String menL = df.format(systemMetrics.getJvmMemoryUsage() * 100);
String menUsed = df.format(systemMetrics.getJvmUsedMemory());
String menMax = df.format(systemMetrics.getJvmMaxMemory());
this.memoryLoad = String.format(OTHER_FORMAT, menL, menUsed, menMax);
if (systemMetrics.getJvmMemoryUsage() > THRESHOLD) {
this.status ++;
}
String diskL = df.format(systemMetrics.getDiskUsage() * 100);
String diskUsed = df.format(systemMetrics.getDiskUsed());
String diskMax = df.format(systemMetrics.getDiskTotal());
this.diskLoad = String.format(OTHER_FORMAT, diskL, diskUsed, diskMax);
if (systemMetrics.getCpuLoad() < threshold && systemMetrics.getDiskUsage() < threshold && systemMetrics.getJvmMemoryUsage() < threshold) {
status = 1;
}else if (systemMetrics.getCpuLoad() > threshold && systemMetrics.getDiskUsage() > threshold && systemMetrics.getJvmMemoryUsage() > threshold) {
status = 3;
}else {
status = 2;
if (systemMetrics.getDiskUsage() > THRESHOLD) {
this.status ++;
}
}
}

View File

@ -3,19 +3,20 @@ logging.config=classpath:logback-dev.xml
####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 邮件配置(启用邮件报警则需要 #######
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错 #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq
spring.mail.password=qqz
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true

View File

@ -3,16 +3,17 @@ logging.config=classpath:logback-product.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-pre?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
####### 邮件配置(启用邮件报警则需要 #######
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错 #######
spring.mail.host=smtp.qq.com
spring.mail.username=zqq
spring.mail.password=qqz

View File

@ -3,16 +3,17 @@ logging.config=classpath:logback-product.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
####### mongoDB配置非核心依赖通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product
####### 邮件配置(启用邮件报警则需要 #######
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错 #######
spring.mail.host=smtp.qq.com
spring.mail.username=zqq
spring.mail.password=qqz

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -8,7 +8,7 @@
/***/ (function(module, __webpack_exports__, __webpack_require__) {
"use strict";
eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Home\",\n data: function data() {\n return {\n systemInfo: {\n jobCount: \"N/A\",\n runningInstanceCount: \"N/A\",\n failedInstanceCount: \"N/A\",\n serverTime: \"UNKNOWN\",\n timezone: \"UNKNOWN\"\n },\n activeWorkerCount: \"N/A\",\n workerList: []\n };\n },\n methods: {\n workerTableRowClassName: function workerTableRowClassName(_ref) {\n var row = _ref.row;\n\n switch (row.status) {\n case 1:\n return 'success-row';\n\n case 2:\n return 'warning-row';\n\n case 3:\n return 'error-row';\n }\n }\n },\n mounted: function mounted() {\n var that = this;\n var appId = that.$store.state.appInfo.id; // 请求 Worker 列表\n\n that.axios.get(\"/system/listWorker?appId=\" + appId).then(function (res) {\n that.workerList = res;\n that.activeWorkerCount = that.workerList.length;\n }); // 请求 Overview\n\n that.axios.get(\"/system/overview?appId=\" + appId).then(function (res) {\n that.systemInfo = res; // 对比服务器时间和本地时间,误差超过一定时间弹窗警告\n // let localTime=new Date().getTime();\n // let serverTime = res.serverTime;\n // console.log(\"localTime: %o, serverTime: %o\", localTime, serverTime);\n //\n // let offset = localTime - serverTime;\n // if (Math.abs(offset) > 60000) {\n // this.$notify({\n // title: '警告',\n // message: '调度中心服务器与本地存在时间差,可能影响任务调度准确性,建议排查时间问题!',\n // type: 'warning',\n // duration: 0\n // });\n // }\n });\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/Home.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"Home\",\n data: function data() {\n return {\n systemInfo: {\n jobCount: \"N/A\",\n runningInstanceCount: \"N/A\",\n failedInstanceCount: \"N/A\",\n serverTime: \"UNKNOWN\",\n timezone: \"UNKNOWN\"\n },\n activeWorkerCount: \"N/A\",\n workerList: []\n };\n },\n methods: {\n workerTableRowClassName: function workerTableRowClassName(_ref) {\n var row = _ref.row;\n\n switch (row.status) {\n case 1:\n return 'success-row';\n\n case 2:\n return 'warning-row';\n\n default:\n return 'error-row';\n }\n }\n },\n mounted: function mounted() {\n var that = this;\n var appId = that.$store.state.appInfo.id; // 请求 Worker 列表\n\n that.axios.get(\"/system/listWorker?appId=\" + appId).then(function (res) {\n that.workerList = res;\n that.activeWorkerCount = that.workerList.length;\n }); // 请求 Overview\n\n that.axios.get(\"/system/overview?appId=\" + appId).then(function (res) {\n that.systemInfo = res; // 对比服务器时间和本地时间,误差超过一定时间弹窗警告\n // let localTime=new Date().getTime();\n // let serverTime = res.serverTime;\n // console.log(\"localTime: %o, serverTime: %o\", localTime, serverTime);\n //\n // let offset = localTime - serverTime;\n // if (Math.abs(offset) > 60000) {\n // this.$notify({\n // title: '警告',\n // message: '调度中心服务器与本地存在时间差,可能影响任务调度准确性,建议排查时间问题!',\n // type: 'warning',\n // duration: 0\n // });\n // }\n });\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/Home.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options");
/***/ }),

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob.server.test;
import com.github.kfcfans.powerjob.server.OhMyApplication;
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.HashedWheelTimer;
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;

View File

@ -5,17 +5,17 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<powerjob.worker.version>3.2.2</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<powerjob.worker.starter.version>3.2.2</powerjob.worker.starter.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->
@ -43,8 +43,8 @@
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-worker</artifactId>
<version>${powerjob.worker.version}</version>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${powerjob.worker.starter.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>

View File

@ -1,17 +1,10 @@
package com.github.kfcfans.powerjob.samples;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* OMS-Worker 配置
* powerjob-worker 配置
* 代码配置示例SpringBoot 项目支持使用 starter只需要在 application.properties 中完成配置即可
*
* @author tjq
* @since 2020/4/17
@ -19,8 +12,7 @@ import java.util.List;
@Configuration
public class OhMySchedulerConfig {
@Value("${powerjob.akka.port}")
private int port;
/*
@Bean
public OhMyWorker initOMS() throws Exception {
@ -30,16 +22,18 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(port);
config.setPort(27777);
config.setAppName("powerjob-agent-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算
// 为了本地模拟多个实例只能使用 MEMORY 启动文件只能由一个应用占有
config.setStoreStrategy(StoreStrategy.MEMORY);
config.setStoreStrategy(StoreStrategy.DISK);
// 2. 创建 Worker 对象设置配置文件
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}
*/
}

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.powerjob.samples.tester;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import org.springframework.stereotype.Component;
/**
* 测试用户反馈的无法停止实例的问题
* https://github.com/KFCFans/PowerJob/issues/37
*
* @author tjq
* @since 2020/7/30
*/
@Component
public class StopInstanceTester implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
int i = 0;
while (true) {
System.out.println(i++);
Thread.sleep(1000*10);
}
}
}

View File

@ -2,4 +2,12 @@ server.port=8081
spring.jpa.open-in-view=false
powerjob.akka.port=27777
########### powerjob-worker 配置 ###########
# akka 工作端口,可选,默认 27777
powerjob.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.app-name=powerjob-agent-test
# 调度服务器地址IP:Port 或 域名,多值逗号分隔
powerjob.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk
powerjob.store-strategy=disk

View File

@ -5,16 +5,16 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.2.1</powerjob.worker.version>
<powerjob.worker.version>3.2.2</powerjob.worker.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
</properties>

View File

@ -5,17 +5,17 @@
<parent>
<artifactId>powerjob</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
<version>2.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.2.1</version>
<version>3.2.2</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.2.1</powerjob.common.version>
<powerjob.common.version>3.2.2</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -12,16 +12,15 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor;
import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor;
import com.github.kfcfans.powerjob.worker.actors.WorkerActor;
import com.github.kfcfans.powerjob.worker.background.OmsLogHandler;
import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.OmsBannerPrinter;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch;
@ -31,7 +30,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@ -39,7 +37,6 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import java.io.File;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
@ -83,7 +80,6 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker...");
try {
pre();
OmsBannerPrinter.print();
// 校验 appName
if (!config.isEnableTestMode()) {
@ -183,14 +179,4 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
public void destroy() throws Exception {
timingPool.shutdownNow();
}
private static void pre() {
// 删除历史遗留的 H2 数据库文件
String h2Path = OmsWorkerFileUtils.getH2Dir();
try {
FileUtils.forceDeleteOnExit(new File(h2Path));
}catch (Exception e) {
log.warn("[PowerJob] delete h2 workspace({}) failed, if worker can't startup successfully, please delete it manually", h2Path, e);
}
}
}

View File

@ -1,5 +1,7 @@
package com.github.kfcfans.powerjob.worker.common;
import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.net.JarURLConnection;
@ -17,6 +19,8 @@ import java.util.jar.JarFile;
*/
public final class OmsWorkerVersion {
private static String CACHE = null;
/**
* Return the full version string of the present OhMyScheduler-Worker codebase, or {@code null}
* if it cannot be determined.
@ -24,7 +28,10 @@ public final class OmsWorkerVersion {
* @see Package#getImplementationVersion()
*/
public static String getVersion() {
return determineSpringBootVersion();
if (StringUtils.isEmpty(CACHE)) {
CACHE = determineSpringBootVersion();
}
return CACHE;
}
private static String determineSpringBootVersion() {

View File

@ -1,5 +1,8 @@
package com.github.kfcfans.powerjob.worker.common.utils;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
/**
* 文件工具类
*
@ -9,7 +12,7 @@ package com.github.kfcfans.powerjob.worker.common.utils;
public class OmsWorkerFileUtils {
private static final String USER_HOME = System.getProperty("user.home", "powerjob");
private static final String WORKER_DIR = USER_HOME + "/powerjob/";
private static final String WORKER_DIR = USER_HOME + "/powerjob/" + OhMyWorker.getConfig().getAppName() + "/";
public static String getScriptDir() {
return WORKER_DIR + "script/";
@ -19,7 +22,7 @@ public class OmsWorkerFileUtils {
return WORKER_DIR + "container/";
}
public static String getH2Dir() {
return WORKER_DIR + "h2/";
public static String getH2WorkDir() {
return WORKER_DIR + "h2/" + CommonUtils.genUUID() + "/";
}
}

View File

@ -5,6 +5,8 @@ import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.math.RoundingMode;
import java.text.NumberFormat;
/**
* 系统信息工具用于采集监控指标
@ -14,26 +16,47 @@ import java.lang.management.OperatingSystemMXBean;
*/
public class SystemInfoUtils {
private static final NumberFormat NF = NumberFormat.getNumberInstance();
static {
NF.setMaximumFractionDigits(4);
NF.setMinimumFractionDigits(4);
NF.setRoundingMode(RoundingMode.HALF_UP);
}
// JMX bean can be accessed externally and is meant for management tools like hyperic ( or even nagios ) - It would delegate to Runtime anyway.
private static final Runtime runtime = Runtime.getRuntime();
private static OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
public static SystemMetrics getSystemMetrics() {
SystemMetrics metrics = new SystemMetrics();
// CPU 信息
fillCPUInfo(metrics);
fillMemoryInfo(metrics);
fillDiskInfo(metrics);
// 在Worker完成分数计算减小Server压力
metrics.calculateScore();
return metrics;
}
private static void fillCPUInfo(SystemMetrics metrics) {
metrics.setCpuProcessors(osMXBean.getAvailableProcessors());
metrics.setCpuLoad(osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors());
metrics.setCpuLoad(miniDouble(osMXBean.getSystemLoadAverage()));
}
private static void fillMemoryInfo(SystemMetrics metrics) {
// JVM内存信息(maxMemory指JVM能从操作系统获取的最大内存-Xmx参数设置的值totalMemory指JVM当前持久的总内存)
metrics.setJvmMaxMemory(bytes2GB(runtime.maxMemory()));
long maxMemory = runtime.maxMemory();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
metrics.setJvmMaxMemory(bytes2GB(maxMemory));
// 已使用内存当前申请总量 - 当前空余量
metrics.setJvmUsedMemory(bytes2GB(runtime.totalMemory() - runtime.freeMemory()));
// 百分比直接 * 100
metrics.setJvmMemoryUsage(1.0 * metrics.getJvmUsedMemory() / runtime.maxMemory());
metrics.setJvmUsedMemory(bytes2GB(usedMemory));
// 已用内存比例
metrics.setJvmMemoryUsage(miniDouble((double) usedMemory / maxMemory));
}
// 磁盘信息
private static void fillDiskInfo(SystemMetrics metrics) {
long free = 0;
long total = 0;
File[] roots = File.listRoots();
@ -44,16 +67,15 @@ public class SystemInfoUtils {
metrics.setDiskUsed(bytes2GB(total - free));
metrics.setDiskTotal(bytes2GB(total));
metrics.setDiskUsage(metrics.getDiskUsed() / metrics.getDiskTotal() * 1.0);
// 在Worker完成分数计算减小Server压力
metrics.calculateScore();
return metrics;
metrics.setDiskUsage(miniDouble(metrics.getDiskUsed() / metrics.getDiskTotal()));
}
private static double bytes2GB(long bytes) {
return bytes / 1024.0 / 1024 / 1024;
return miniDouble(bytes / 1024.0 / 1024 / 1024);
}
private static double miniDouble(double origin) {
return Double.parseDouble(NF.format(origin));
}
}

View File

@ -5,8 +5,12 @@ import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.h2.Driver;
import javax.sql.DataSource;
import java.io.File;
import java.sql.Connection;
import java.sql.SQLException;
@ -16,12 +20,14 @@ import java.sql.SQLException;
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class ConnectionFactory {
private static volatile DataSource dataSource;
private static final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db", OmsWorkerFileUtils.getH2Dir());
private static final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db", OmsWorkerFileUtils.getH2Dir());
private static final String H2_PATH = OmsWorkerFileUtils.getH2WorkDir();
private static final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
private static final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
public static Connection getConnection() throws SQLException {
return getDataSource().getConnection();
@ -38,7 +44,7 @@ public class ConnectionFactory {
StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.h2.Driver");
config.setDriverClassName(Driver.class.getName());
config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);
config.setAutoCommit(true);
// 池中最小空闲连接数量
@ -46,6 +52,14 @@ public class ConnectionFactory {
// 池中最大连接数量
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
log.info("[OmsDatasource] init h2 datasource successfully, use url: {}", config.getJdbcUrl());
// JVM 关闭时删除数据库文件
try {
FileUtils.forceDeleteOnExit(new File(H2_PATH));
}catch (Exception ignore) {
}
}
}
return dataSource;

View File

@ -36,7 +36,7 @@ public class TaskPersistenceService {
private TaskPersistenceService() {
}
private TaskDAO taskDAO = new TaskDAOImpl();
private final TaskDAO taskDAO = new TaskDAOImpl();
public void init() throws Exception {
if (initialized) {
@ -207,8 +207,8 @@ public class TaskPersistenceService {
Map<TaskStatus, Long> result = Maps.newHashMap();
dbRES.forEach(row -> {
// H2 数据库都是大写...
int status = Integer.parseInt(String.valueOf(row.get("STATUS")));
long num = Long.parseLong(String.valueOf(row.get("NUM")));
int status = Integer.parseInt(String.valueOf(row.get("status")));
long num = Long.parseLong(String.valueOf(row.get("num")));
result.put(TaskStatus.of(status), num);
});
return result;
@ -238,10 +238,10 @@ public class TaskPersistenceService {
try {
SimpleTaskQuery query = genKeyQuery(instanceId, taskId);
query.setQueryContent("STATUS");
query.setQueryContent("status");
return execute(() -> {
List<Map<String, Object>> rows = taskDAO.simpleQueryPlus(query);
return Optional.of(TaskStatus.of((int) rows.get(0).get("STATUS")));
return Optional.of(TaskStatus.of((int) rows.get(0).get("status")));
});
}catch (Exception e) {
log.error("[TaskPersistenceService] getTaskStatus failed, instanceId={},taskId={}.", instanceId, taskId, e);

View File

@ -0,0 +1,78 @@
package com.github.kfcfans.powerjob.function;
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.util.Collections;
import java.util.List;
/**
* 测试监控指标
*
* @author tjq
* @since 2020/8/1
*/
public class MonitorTest {
private static final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
private static final Runtime runtime = Runtime.getRuntime();
@Test
public void testGetSystemLoadAverage() {
for (int i = 0; i < 10000; i++) {
double average = osMXBean.getSystemLoadAverage();
System.out.println(average);
System.out.println(average / osMXBean.getAvailableProcessors());
try {
Thread.sleep(1000);
}catch (Exception ignore) {
}
}
}
@Test
public void testListDisk() {
Stopwatch sw = Stopwatch.createStarted();
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
System.out.println(JsonUtils.toJSONString(systemMetrics));
System.out.println(sw.stop());
Stopwatch sw2 = Stopwatch.createStarted();
System.out.println(systemMetrics.calculateScore());
System.out.println(sw2.stop());
}
@Test
public void testMemory() {
System.out.println("- used:" + (runtime.totalMemory() - runtime.freeMemory()));
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
System.out.println("heap used: " + memoryMXBean.getHeapMemoryUsage());
System.out.println("noheap used: " + memoryMXBean.getNonHeapMemoryUsage());
}
@Test
public void testFetchMetrics() {
SystemMetrics systemMetrics = SystemInfoUtils.getSystemMetrics();
System.out.println(JsonUtils.toJSONString(systemMetrics));
}
@Test
public void testSortMetrics() {
SystemMetrics high = new SystemMetrics();
high.setScore(100);
SystemMetrics low = new SystemMetrics();
low.setScore(1);
List<SystemMetrics> list = Lists.newArrayList(high, low);
list.sort((o1, o2) -> o2.calculateScore() - o1.calculateScore());
System.out.println(list);
Collections.sort(list);
System.out.println(list);
}
}