mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
use java config to init jpa
This commit is contained in:
parent
8382e71da0
commit
fcacd96d37
@ -17,9 +17,8 @@
|
||||
<swagger.version>2.9.2</swagger.version>
|
||||
<springboot.version>2.2.6.RELEASE</springboot.version>
|
||||
<oms.common.version>1.0.0</oms.common.version>
|
||||
<hikaricp.version>3.4.2</hikaricp.version>
|
||||
<mysql.version>8.0.19</mysql.version>
|
||||
<curator.version>4.3.0</curator.version>
|
||||
<h2.db.version>1.4.200</h2.db.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
@ -31,19 +30,18 @@
|
||||
<version>${oms.common.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- HikariCP -->
|
||||
<dependency>
|
||||
<groupId>com.zaxxer</groupId>
|
||||
<artifactId>HikariCP</artifactId>
|
||||
<version>${hikaricp.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
|
||||
<!-- mysql -->
|
||||
<dependency>
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
<version>${mysql.version}</version>
|
||||
</dependency>
|
||||
<!-- h2 database -->
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>${h2.db.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringBoot -->
|
||||
<dependency>
|
||||
|
@ -0,0 +1,82 @@
|
||||
package com.github.kfcfans.oms.server.persistence.config;
|
||||
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
|
||||
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||||
import org.springframework.orm.jpa.JpaTransactionManager;
|
||||
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.sql.DataSource;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 核心数据库 JPA 配置
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/27
|
||||
*/
|
||||
@Configuration
|
||||
@EnableTransactionManagement
|
||||
@EnableJpaRepositories(
|
||||
// repository包名
|
||||
basePackages = CoreJpaConfig.PACKAGES,
|
||||
// 实体管理bean名称
|
||||
entityManagerFactoryRef = "coreEntityManagerFactory",
|
||||
// 事务管理bean名称
|
||||
transactionManagerRef = "coreTransactionManager"
|
||||
)
|
||||
public class CoreJpaConfig {
|
||||
|
||||
@Resource(name = "omsCoreDatasource")
|
||||
private DataSource omsCoreDatasource;
|
||||
|
||||
public static final String PACKAGES = "com.github.kfcfans.oms.server.persistence.core";
|
||||
|
||||
/**
|
||||
* 生成配置文件,包括 JPA配置文件和Hibernate配置文件,相当于一下三个配置
|
||||
* spring.jpa.show-sql=false
|
||||
* spring.jpa.open-in-view=false
|
||||
* spring.jpa.hibernate.ddl-auto=update
|
||||
*
|
||||
* @return 配置Map
|
||||
*/
|
||||
private Map<String, Object> genDatasourceProperties() {
|
||||
|
||||
JpaProperties jpaProperties = new JpaProperties();
|
||||
jpaProperties.setOpenInView(false);
|
||||
jpaProperties.setShowSql(false);
|
||||
|
||||
HibernateProperties hibernateProperties = new HibernateProperties();
|
||||
hibernateProperties.setDdlAuto("update");
|
||||
return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
|
||||
}
|
||||
|
||||
@Primary
|
||||
@Bean(name = "coreEntityManagerFactory")
|
||||
public LocalContainerEntityManagerFactoryBean initCoreEntityManagerFactory(EntityManagerFactoryBuilder builder) {
|
||||
|
||||
return builder
|
||||
.dataSource(omsCoreDatasource)
|
||||
.properties(genDatasourceProperties())
|
||||
.packages(PACKAGES)
|
||||
.persistenceUnit("corePersistenceUnit")
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@Primary
|
||||
@Bean(name = "coreTransactionManager")
|
||||
public PlatformTransactionManager initCoreTransactionManager(EntityManagerFactoryBuilder builder) {
|
||||
return new JpaTransactionManager(Objects.requireNonNull(initCoreEntityManagerFactory(builder).getObject()));
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package com.github.kfcfans.oms.server.persistence.config;
|
||||
|
||||
/**
|
||||
* 本地H2数据库配置
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/27
|
||||
*/
|
||||
public class LocalJpaConfig {
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
package com.github.kfcfans.oms.server.persistence.config;
|
||||
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.jdbc.DataSourceBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
|
||||
/**
|
||||
* 多重数据源配置
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/27
|
||||
*/
|
||||
@Configuration
|
||||
public class MultiDatasourceConfig {
|
||||
|
||||
private static final String H2_JDBC_URL = "jdbc:h2:file:~/oms/h2/oms_server_db";
|
||||
|
||||
@Primary
|
||||
@Bean("omsCoreDatasource")
|
||||
@ConfigurationProperties(prefix = "spring.datasource.core")
|
||||
public DataSource initOmsCoreDatasource() {
|
||||
return DataSourceBuilder.create().build();
|
||||
}
|
||||
|
||||
@Bean("omsLocalDatasource")
|
||||
public DataSource initOmsLocalDatasource() {
|
||||
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setDriverClassName("org.h2.Driver");
|
||||
config.setJdbcUrl(H2_JDBC_URL);
|
||||
config.setAutoCommit(true);
|
||||
// 池中最小空闲连接数量
|
||||
config.setMinimumIdle(4);
|
||||
// 池中最大连接数量
|
||||
config.setMaximumPoolSize(32);
|
||||
return new HikariDataSource(config);
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
@ -18,7 +18,7 @@ import java.util.Date;
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Table(name = "instance_log", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId")})
|
||||
public class InstanceLogDO {
|
||||
public class InstanceInfoDO {
|
||||
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
@ -53,7 +53,7 @@ public class InstanceLogDO {
|
||||
|
||||
|
||||
// 针对 只查询 jobId 的情况
|
||||
public InstanceLogDO(Long jobId) {
|
||||
public InstanceInfoDO(Long jobId) {
|
||||
this.jobId = jobId;
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
|
||||
import lombok.AllArgsConstructor;
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
@ -1,4 +1,4 @@
|
||||
package com.github.kfcfans.oms.server.persistence.model;
|
||||
package com.github.kfcfans.oms.server.persistence.core.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import java.util.List;
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
@ -18,14 +18,14 @@ import java.util.List;
|
||||
* @author tjq
|
||||
* @since 2020/4/1
|
||||
*/
|
||||
public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long> {
|
||||
public interface InstanceInfoRepository extends JpaRepository<InstanceInfoDO, Long> {
|
||||
|
||||
/**
|
||||
* 统计当前JOB有多少实例正在运行
|
||||
*/
|
||||
long countByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
|
||||
List<InstanceLogDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
List<InstanceInfoDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
|
||||
|
||||
|
||||
/**
|
||||
@ -56,16 +56,16 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
|
||||
|
||||
// 状态检查三兄弟,对应 WAITING_DISPATCH 、 WAITING_WORKER_RECEIVE 和 RUNNING 三阶段
|
||||
// 数据量一般不大,就不单独写SQL优化 IO 了
|
||||
List<InstanceLogDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceLogDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceLogDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
|
||||
List<InstanceInfoDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
|
||||
|
||||
InstanceLogDO findByInstanceId(long instanceId);
|
||||
InstanceInfoDO findByInstanceId(long instanceId);
|
||||
|
||||
Page<InstanceLogDO> findByAppId(long appId, Pageable pageable);
|
||||
Page<InstanceLogDO> findByJobId(long jobId, Pageable pageable);
|
||||
Page<InstanceInfoDO> findByAppId(long appId, Pageable pageable);
|
||||
Page<InstanceInfoDO> findByJobId(long jobId, Pageable pageable);
|
||||
// 只会有一条数据,只是为了统一
|
||||
Page<InstanceLogDO> findByInstanceId(long instanceId, Pageable pageable);
|
||||
Page<InstanceInfoDO> findByInstanceId(long instanceId, Pageable pageable);
|
||||
|
||||
// 数据统计
|
||||
long countByAppIdAndStatus(long appId, int status);
|
@ -1,12 +1,11 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Modifying;
|
||||
import org.springframework.data.jpa.repository.Query;
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.ServerInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.ServerInfoDO;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
/**
|
@ -1,6 +1,6 @@
|
||||
package com.github.kfcfans.oms.server.persistence.repository;
|
||||
package com.github.kfcfans.oms.server.persistence.core.repository;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.UserInfoDO;
|
||||
import org.springframework.data.jpa.repository.JpaRepository;
|
||||
|
||||
import java.util.List;
|
@ -0,0 +1,35 @@
|
||||
package com.github.kfcfans.oms.server.persistence.local;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import javax.persistence.*;
|
||||
|
||||
/**
|
||||
* 本地的运行时日志
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/27
|
||||
*/
|
||||
@Data
|
||||
@Entity
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Table(name = "local_instance_log", indexes = {@Index(columnList = "instanceId")})
|
||||
public class LocalInstanceLogDO {
|
||||
|
||||
@Id
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
private Long id;
|
||||
|
||||
private Long instanceId;
|
||||
/**
|
||||
* 日志时间
|
||||
*/
|
||||
private Long timestamp;
|
||||
/**
|
||||
* 日志内容
|
||||
*/
|
||||
private String content;
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
package com.github.kfcfans.oms.server.service;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -26,7 +26,7 @@ public class CacheService {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private final Cache<Long, String> jobId2JobNameCache;
|
||||
private final Cache<Long, Long> instanceId2AppId;
|
||||
@ -68,7 +68,7 @@ public class CacheService {
|
||||
return instanceId2AppId.get(instanceId, () -> {
|
||||
// 内部记录数据库异常
|
||||
try {
|
||||
InstanceLogDO instanceLog = instanceLogRepository.findByInstanceId(instanceId);
|
||||
InstanceInfoDO instanceLog = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
if (instanceLog != null) {
|
||||
return instanceLog.getAppId();
|
||||
}
|
||||
|
@ -4,8 +4,8 @@ import akka.actor.ActorSelection;
|
||||
import com.github.kfcfans.common.*;
|
||||
import com.github.kfcfans.common.request.ServerScheduleJobReq;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
|
||||
import com.google.common.base.Splitter;
|
||||
@ -35,7 +35,7 @@ import static com.github.kfcfans.common.InstanceStatus.*;
|
||||
public class DispatchService {
|
||||
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private static final Splitter commaSplitter = Splitter.on(",");
|
||||
|
||||
@ -55,13 +55,13 @@ public class DispatchService {
|
||||
log.info("[DispatchService] start to dispatch job: {}.", jobInfo);
|
||||
// 查询当前运行的实例数
|
||||
long current = System.currentTimeMillis();
|
||||
long runningInstanceCount = instanceLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
|
||||
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
|
||||
|
||||
// 超出最大同时运行限制,不执行调度
|
||||
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
|
||||
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
|
||||
instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ public class DispatchService {
|
||||
if (CollectionUtils.isEmpty(finalWorkers)) {
|
||||
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
|
||||
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
|
||||
instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -122,6 +122,6 @@ public class DispatchService {
|
||||
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
|
||||
|
||||
// 修改状态
|
||||
instanceLogRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress);
|
||||
instanceInfoRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress);
|
||||
}
|
||||
}
|
||||
|
@ -3,11 +3,10 @@ package com.github.kfcfans.oms.server.service;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.oms.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -39,7 +38,7 @@ public class JobService {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
/**
|
||||
* 手动立即运行某个任务
|
||||
@ -58,7 +57,7 @@ public class JobService {
|
||||
public long runJob(JobInfoDO jobInfo, String instanceParams) {
|
||||
long instanceId = idGenerateService.allocate();
|
||||
|
||||
InstanceLogDO executeLog = new InstanceLogDO();
|
||||
InstanceInfoDO executeLog = new InstanceInfoDO();
|
||||
executeLog.setJobId(jobInfo.getId());
|
||||
executeLog.setAppId(jobInfo.getAppId());
|
||||
executeLog.setInstanceId(instanceId);
|
||||
@ -67,7 +66,7 @@ public class JobService {
|
||||
executeLog.setGmtCreate(new Date());
|
||||
executeLog.setGmtModified(executeLog.getGmtCreate());
|
||||
|
||||
instanceLogRepository.saveAndFlush(executeLog);
|
||||
instanceInfoRepository.saveAndFlush(executeLog);
|
||||
dispatchService.dispatch(jobInfo, executeLog.getInstanceId(), 0, instanceParams);
|
||||
return instanceId;
|
||||
}
|
||||
@ -108,7 +107,7 @@ public class JobService {
|
||||
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
|
||||
return;
|
||||
}
|
||||
List<InstanceLogDO> executeLogs = instanceLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
|
||||
List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
|
||||
if (CollectionUtils.isEmpty(executeLogs)) {
|
||||
return;
|
||||
}
|
||||
|
@ -1,10 +1,9 @@
|
||||
package com.github.kfcfans.oms.server.service.alarm;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
@ -20,7 +19,7 @@ public class AlarmService {
|
||||
|
||||
private static List<Alarmable> alarmableList = Lists.newLinkedList();
|
||||
|
||||
public static void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog) {
|
||||
public static void alarm(JobInfoDO jobInfo, InstanceInfoDO instanceLog) {
|
||||
if (CollectionUtils.isEmpty(alarmableList)) {
|
||||
return;
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package com.github.kfcfans.oms.server.service.alarm;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
|
||||
/**
|
||||
* 报警接口
|
||||
@ -11,5 +11,5 @@ import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
*/
|
||||
public interface Alarmable {
|
||||
|
||||
void alarm(JobInfoDO jobInfo, InstanceLogDO instanceLog);
|
||||
void alarm(JobInfoDO jobInfo, InstanceInfoDO instanceLog);
|
||||
}
|
||||
|
@ -5,8 +5,8 @@ import akka.pattern.Patterns;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.akka.requests.Ping;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.lock.LockService;
|
||||
import com.google.common.collect.Sets;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -1,8 +1,8 @@
|
||||
package com.github.kfcfans.oms.server.service.id;
|
||||
|
||||
import com.github.kfcfans.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.server.persistence.model.ServerInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.ServerInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.ServerInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.ServerInfoRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -4,10 +4,10 @@ import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.utils.SpringUtils;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.github.kfcfans.oms.server.service.timing.schedule.HashedWheelTimerHolder;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -35,7 +35,7 @@ public class InstanceManager {
|
||||
|
||||
// Spring Bean
|
||||
private static DispatchService dispatchService;
|
||||
private static InstanceLogRepository instanceLogRepository;
|
||||
private static InstanceInfoRepository instanceInfoRepository;
|
||||
private static JobInfoRepository jobInfoRepository;
|
||||
|
||||
/**
|
||||
@ -92,11 +92,11 @@ public class InstanceManager {
|
||||
// FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行)
|
||||
// 综上,直接把 status 和 runningNum 同步到DB即可
|
||||
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
|
||||
getInstanceLogRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
|
||||
getInstanceInfoRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum());
|
||||
return;
|
||||
}
|
||||
|
||||
InstanceLogDO updateEntity = getInstanceLogRepository().findByInstanceId(instanceId);
|
||||
InstanceInfoDO updateEntity = getInstanceInfoRepository().findByInstanceId(instanceId);
|
||||
updateEntity.setStatus(newStatus.getV());
|
||||
updateEntity.setGmtModified(new Date());
|
||||
|
||||
@ -131,7 +131,7 @@ public class InstanceManager {
|
||||
}
|
||||
|
||||
// 同步状态变更信息到数据库
|
||||
getInstanceLogRepository().saveAndFlush(updateEntity);
|
||||
getInstanceInfoRepository().saveAndFlush(updateEntity);
|
||||
|
||||
// 清除已完成的实例信息
|
||||
if (finished) {
|
||||
@ -141,15 +141,15 @@ public class InstanceManager {
|
||||
}
|
||||
}
|
||||
|
||||
private static InstanceLogRepository getInstanceLogRepository() {
|
||||
while (instanceLogRepository == null) {
|
||||
private static InstanceInfoRepository getInstanceInfoRepository() {
|
||||
while (instanceInfoRepository == null) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
}catch (Exception ignore) {
|
||||
}
|
||||
instanceLogRepository = SpringUtils.getBean(InstanceLogRepository.class);
|
||||
instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class);
|
||||
}
|
||||
return instanceLogRepository;
|
||||
return instanceInfoRepository;
|
||||
}
|
||||
|
||||
private static JobInfoRepository getJobInfoRepository() {
|
||||
|
@ -10,8 +10,8 @@ import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
|
||||
import com.github.kfcfans.common.request.ServerStopInstanceReq;
|
||||
import com.github.kfcfans.common.response.AskResponse;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -36,7 +36,7 @@ import static com.github.kfcfans.common.InstanceStatus.STOPPED;
|
||||
public class InstanceService {
|
||||
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
/**
|
||||
* 停止任务实例
|
||||
@ -46,29 +46,29 @@ public class InstanceService {
|
||||
|
||||
try {
|
||||
|
||||
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
|
||||
if (instanceLogDO == null) {
|
||||
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
if (instanceInfoDO == null) {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
|
||||
// 判断状态,只有运行中才能停止
|
||||
if (!InstanceStatus.generalizedRunningStatus.contains(instanceLogDO.getStatus())) {
|
||||
if (!InstanceStatus.generalizedRunningStatus.contains(instanceInfoDO.getStatus())) {
|
||||
throw new IllegalArgumentException("can't stop finished instance!");
|
||||
}
|
||||
|
||||
// 更新数据库,将状态置为停止
|
||||
instanceLogDO.setStatus(STOPPED.getV());
|
||||
instanceLogDO.setGmtModified(new Date());
|
||||
instanceLogDO.setFinishedTime(System.currentTimeMillis());
|
||||
instanceLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
|
||||
instanceLogRepository.saveAndFlush(instanceLogDO);
|
||||
instanceInfoDO.setStatus(STOPPED.getV());
|
||||
instanceInfoDO.setGmtModified(new Date());
|
||||
instanceInfoDO.setFinishedTime(System.currentTimeMillis());
|
||||
instanceInfoDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
|
||||
instanceInfoRepository.saveAndFlush(instanceInfoDO);
|
||||
|
||||
/*
|
||||
不可靠通知停止 TaskTracker
|
||||
假如没有成功关闭,之后 TaskTracker 会再次 reportStatus,按照流程,instanceLog 会被更新为 RUNNING,开发者可以再次手动关闭
|
||||
*/
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress());
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfoDO.getTaskTrackerAddress());
|
||||
ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
|
||||
taskTrackerActor.tell(req, null);
|
||||
|
||||
@ -88,12 +88,12 @@ public class InstanceService {
|
||||
* @return 任务实例的状态
|
||||
*/
|
||||
public InstanceStatus getInstanceStatus(Long instanceId) {
|
||||
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
|
||||
if (instanceLogDO == null) {
|
||||
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
if (instanceInfoDO == null) {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
return InstanceStatus.of(instanceLogDO.getStatus());
|
||||
return InstanceStatus.of(instanceInfoDO.getStatus());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -103,33 +103,33 @@ public class InstanceService {
|
||||
*/
|
||||
public InstanceDetail getInstanceDetail(Long instanceId) {
|
||||
|
||||
InstanceLogDO instanceLogDO = instanceLogRepository.findByInstanceId(instanceId);
|
||||
if (instanceLogDO == null) {
|
||||
InstanceInfoDO instanceInfoDO = instanceInfoRepository.findByInstanceId(instanceId);
|
||||
if (instanceInfoDO == null) {
|
||||
log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
|
||||
throw new IllegalArgumentException("invalid instanceId: " + instanceId);
|
||||
}
|
||||
|
||||
InstanceStatus instanceStatus = InstanceStatus.of(instanceLogDO.getStatus());
|
||||
InstanceStatus instanceStatus = InstanceStatus.of(instanceInfoDO.getStatus());
|
||||
|
||||
InstanceDetail detail = new InstanceDetail();
|
||||
detail.setStatus(instanceStatus.getDes());
|
||||
|
||||
// 只要不是运行状态,只需要返回简要信息
|
||||
if (instanceStatus != RUNNING) {
|
||||
BeanUtils.copyProperties(instanceLogDO, detail);
|
||||
BeanUtils.copyProperties(instanceInfoDO, detail);
|
||||
return detail;
|
||||
}
|
||||
|
||||
// 运行状态下,交由 TaskTracker 返回相关信息
|
||||
try {
|
||||
ServerQueryInstanceStatusReq req = new ServerQueryInstanceStatusReq(instanceId);
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceLogDO.getTaskTrackerAddress());
|
||||
ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(instanceInfoDO.getTaskTrackerAddress());
|
||||
CompletionStage<Object> askCS = Patterns.ask(taskTrackerActor, req, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
|
||||
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
|
||||
if (askResponse.isSuccess()) {
|
||||
InstanceDetail instanceDetail = askResponse.getData(InstanceDetail.class);
|
||||
instanceDetail.setRunningTimes(instanceLogDO.getRunningTimes());
|
||||
instanceDetail.setRunningTimes(instanceInfoDO.getRunningTimes());
|
||||
return instanceDetail;
|
||||
}else {
|
||||
log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getMessage());
|
||||
@ -140,7 +140,7 @@ public class InstanceService {
|
||||
}
|
||||
|
||||
// 失败则返回基础版信息
|
||||
BeanUtils.copyProperties(instanceLogDO, detail);
|
||||
BeanUtils.copyProperties(instanceInfoDO, detail);
|
||||
return detail;
|
||||
}
|
||||
|
||||
|
@ -2,8 +2,8 @@ package com.github.kfcfans.oms.server.service.lock;
|
||||
|
||||
import com.github.kfcfans.common.utils.CommonUtils;
|
||||
import com.github.kfcfans.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.OmsLockRepository;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -5,12 +5,12 @@ import com.github.kfcfans.common.SystemInstanceResult;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -44,7 +44,7 @@ public class InstanceStatusCheckService {
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
|
||||
@ -73,7 +73,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 1. 检查等待 WAITING_DISPATCH 状态的任务
|
||||
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
|
||||
List<InstanceLogDO> waitingDispatchInstances = instanceLogRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
|
||||
List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
|
||||
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances);
|
||||
waitingDispatchInstances.forEach(instance -> {
|
||||
@ -92,7 +92,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
|
||||
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
|
||||
List<InstanceLogDO> waitingWorkerReceiveInstances = instanceLogRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
|
||||
List<InstanceInfoDO> waitingWorkerReceiveInstances = instanceInfoRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
|
||||
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) did n’t receive any reply from worker.", waitingWorkerReceiveInstances);
|
||||
waitingWorkerReceiveInstances.forEach(instance -> {
|
||||
@ -104,7 +104,7 @@ public class InstanceStatusCheckService {
|
||||
|
||||
// 3. 检查 RUNNING 状态的任务(一定时间没收到 TaskTracker 的状态报告,视为失败)
|
||||
threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
|
||||
List<InstanceLogDO> failedInstances = instanceLogRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
|
||||
List<InstanceInfoDO> failedInstances = instanceInfoRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
|
||||
if (!CollectionUtils.isEmpty(failedInstances)) {
|
||||
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
|
||||
failedInstances.forEach(instance -> {
|
||||
@ -134,7 +134,7 @@ public class InstanceStatusCheckService {
|
||||
/**
|
||||
* 处理上报超时而失败的任务实例
|
||||
*/
|
||||
private void updateFailedInstance(InstanceLogDO instance) {
|
||||
private void updateFailedInstance(InstanceInfoDO instance) {
|
||||
|
||||
log.warn("[InstanceStatusCheckService] detected instance(instanceId={},jobId={})'s TaskTracker report timeout,this instance is considered a failure.", instance.getInstanceId(), instance.getJobId());
|
||||
|
||||
@ -142,6 +142,6 @@ public class InstanceStatusCheckService {
|
||||
instance.setFinishedTime(System.currentTimeMillis());
|
||||
instance.setGmtModified(new Date());
|
||||
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
|
||||
instanceLogRepository.saveAndFlush(instance);
|
||||
instanceInfoRepository.saveAndFlush(instance);
|
||||
}
|
||||
}
|
||||
|
@ -6,12 +6,12 @@ import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.oms.server.service.JobService;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.DispatchService;
|
||||
import com.github.kfcfans.oms.server.service.id.IdGenerateService;
|
||||
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
|
||||
@ -55,7 +55,7 @@ public class JobScheduleService {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
@Resource
|
||||
private JobService jobService;
|
||||
@ -118,10 +118,10 @@ public class JobScheduleService {
|
||||
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
|
||||
log.info("[JobScheduleService] These cron jobs will be scheduled: {}.", jobInfos);
|
||||
|
||||
List<InstanceLogDO> executeLogs = Lists.newLinkedList();
|
||||
List<InstanceInfoDO> executeLogs = Lists.newLinkedList();
|
||||
jobInfos.forEach(jobInfoDO -> {
|
||||
|
||||
InstanceLogDO executeLog = new InstanceLogDO();
|
||||
InstanceInfoDO executeLog = new InstanceInfoDO();
|
||||
executeLog.setJobId(jobInfoDO.getId());
|
||||
executeLog.setAppId(jobInfoDO.getAppId());
|
||||
executeLog.setInstanceId(idGenerateService.allocate());
|
||||
@ -134,8 +134,8 @@ public class JobScheduleService {
|
||||
|
||||
jobId2InstanceId.put(executeLog.getJobId(), executeLog.getInstanceId());
|
||||
});
|
||||
instanceLogRepository.saveAll(executeLogs);
|
||||
instanceLogRepository.flush();
|
||||
instanceInfoRepository.saveAll(executeLogs);
|
||||
instanceInfoRepository.flush();
|
||||
|
||||
// 2. 推入时间轮中等待调度执行
|
||||
jobInfos.forEach(jobInfoDO -> {
|
||||
@ -193,7 +193,7 @@ public class JobScheduleService {
|
||||
// 查询所有的秒级任务(只包含ID)
|
||||
List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.frequentTypes);
|
||||
// 查询日志记录表中是否存在相关的任务
|
||||
List<Long> runningJobIdList = instanceLogRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
|
||||
List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.generalizedRunningStatus);
|
||||
Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);
|
||||
|
||||
List<Long> notRunningJobIds = Lists.newLinkedList();
|
||||
|
@ -1,7 +1,7 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.web.request.ModifyAppInfoRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -4,8 +4,8 @@ import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.common.model.InstanceDetail;
|
||||
import com.github.kfcfans.oms.server.persistence.PageResult;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.CacheService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
import com.github.kfcfans.oms.server.web.request.QueryInstanceRequest;
|
||||
@ -20,7 +20,6 @@ import org.springframework.web.bind.annotation.*;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
||||
/**
|
||||
@ -38,7 +37,7 @@ public class InstanceController {
|
||||
@Resource
|
||||
private CacheService cacheService;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
|
||||
|
||||
@ -61,19 +60,19 @@ public class InstanceController {
|
||||
|
||||
// 查询全部数据
|
||||
if (request.getJobId() == null && request.getInstanceId() == null) {
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByAppId(request.getAppId(), pageable)));
|
||||
return ResultDTO.success(convertPage(instanceInfoRepository.findByAppId(request.getAppId(), pageable)));
|
||||
}
|
||||
|
||||
// 根据JobId查询
|
||||
if (request.getJobId() != null) {
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByJobId(request.getJobId(), pageable)));
|
||||
return ResultDTO.success(convertPage(instanceInfoRepository.findByJobId(request.getJobId(), pageable)));
|
||||
}
|
||||
|
||||
// 根据InstanceId查询
|
||||
return ResultDTO.success(convertPage(instanceLogRepository.findByInstanceId(request.getInstanceId(), pageable)));
|
||||
return ResultDTO.success(convertPage(instanceInfoRepository.findByInstanceId(request.getInstanceId(), pageable)));
|
||||
}
|
||||
|
||||
private PageResult<InstanceLogVO> convertPage(Page<InstanceLogDO> page) {
|
||||
private PageResult<InstanceLogVO> convertPage(Page<InstanceInfoDO> page) {
|
||||
List<InstanceLogVO> content = page.getContent().stream().map(instanceLogDO -> {
|
||||
InstanceLogVO instanceLogVO = new InstanceLogVO();
|
||||
BeanUtils.copyProperties(instanceLogDO, instanceLogVO);
|
||||
|
@ -6,9 +6,9 @@ import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.oms.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.oms.server.persistence.PageResult;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.service.JobService;
|
||||
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
|
||||
import com.github.kfcfans.oms.server.web.request.QueryJobInfoRequest;
|
||||
|
@ -3,8 +3,8 @@ package com.github.kfcfans.oms.server.web.controller;
|
||||
import com.github.kfcfans.common.InstanceStatus;
|
||||
import com.github.kfcfans.common.OpenAPIConstant;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.CacheService;
|
||||
import com.github.kfcfans.oms.server.service.JobService;
|
||||
import com.github.kfcfans.oms.server.service.instance.InstanceService;
|
||||
|
@ -1,8 +1,8 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.service.ha.ServerSelectService;
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
|
@ -10,10 +10,10 @@ import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.common.utils.JsonUtils;
|
||||
import com.github.kfcfans.oms.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq;
|
||||
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.web.response.SystemOverviewVO;
|
||||
import com.github.kfcfans.oms.server.web.response.WorkerStatusVO;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -46,7 +46,7 @@ public class SystemInfoController {
|
||||
@Resource
|
||||
private JobInfoRepository jobInfoRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
@GetMapping("/listWorker")
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
@ -98,10 +98,10 @@ public class SystemInfoController {
|
||||
// 总任务数量
|
||||
overview.setJobCount(jobInfoRepository.countByAppId(appId));
|
||||
// 运行任务数
|
||||
overview.setRunningInstanceCount(instanceLogRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
|
||||
overview.setRunningInstanceCount(instanceInfoRepository.countByAppIdAndStatus(appId, InstanceStatus.RUNNING.getV()));
|
||||
// 近期失败任务数(24H内)
|
||||
Date date = DateUtils.addDays(new Date(), -1);
|
||||
overview.setFailedInstanceCount(instanceLogRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
|
||||
overview.setFailedInstanceCount(instanceInfoRepository.countByAppIdAndStatusAndGmtCreateAfter(appId, InstanceStatus.FAILED.getV(), date));
|
||||
|
||||
return ResultDTO.success(overview);
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
package com.github.kfcfans.oms.server.web.controller;
|
||||
|
||||
import com.github.kfcfans.common.response.ResultDTO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.UserInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.UserInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.UserInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.UserInfoRepository;
|
||||
import com.github.kfcfans.oms.server.web.request.ModifyUserInfoRequest;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.AllArgsConstructor;
|
||||
|
@ -1,16 +1,14 @@
|
||||
# server port config
|
||||
####### server port config #######
|
||||
server.port=7700
|
||||
|
||||
# db config
|
||||
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
|
||||
####### database config #######
|
||||
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
|
||||
# JDBC配置不支持utf8mb4,需要更改my.conf
|
||||
spring.datasource.url=jdbc:mysql://remotehost:3391/oms?useUnicode=true&characterEncoding=UTF-8
|
||||
spring.datasource.username=root
|
||||
spring.datasource.password=No1Bug2Please3!
|
||||
# Hikari 数据源专用配置
|
||||
spring.datasource.hikari.maximum-pool-size=20
|
||||
spring.datasource.hikari.minimum-idle=5
|
||||
# JPA 相关配置
|
||||
spring.jpa.show-sql=false
|
||||
spring.jpa.hibernate.ddl-auto=update
|
||||
spring.jpa.open-in-view=false
|
||||
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/oms?useUnicode=true&characterEncoding=UTF-8
|
||||
spring.datasource.core.username=root
|
||||
spring.datasource.core.password=No1Bug2Please3!
|
||||
spring.datasource.core.hikari.maximum-pool-size=20
|
||||
spring.datasource.core.hikari.minimum-idle=5
|
||||
|
||||
####### mangoDB config #######
|
||||
|
||||
|
@ -3,12 +3,12 @@ package com.github.kfcfans.oms.server.test;
|
||||
import com.github.kfcfans.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.server.common.constans.JobStatus;
|
||||
import com.github.kfcfans.common.TimeExpressionType;
|
||||
import com.github.kfcfans.oms.server.persistence.model.InstanceLogDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.InstanceLogRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.repository.OmsLockRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.model.OmsLockDO;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.oms.server.persistence.core.repository.OmsLockRepository;
|
||||
import org.assertj.core.util.Lists;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -33,7 +33,7 @@ public class RepositoryTest {
|
||||
@Resource
|
||||
private OmsLockRepository omsLockRepository;
|
||||
@Resource
|
||||
private InstanceLogRepository instanceLogRepository;
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
/**
|
||||
* 需要证明批量写入失败后会回滚
|
||||
@ -58,17 +58,17 @@ public class RepositoryTest {
|
||||
|
||||
@Test
|
||||
public void testUpdate() {
|
||||
InstanceLogDO updateEntity = new InstanceLogDO();
|
||||
InstanceInfoDO updateEntity = new InstanceInfoDO();
|
||||
updateEntity.setId(22L);
|
||||
updateEntity.setActualTriggerTime(System.currentTimeMillis());
|
||||
updateEntity.setResult("hahaha");
|
||||
instanceLogRepository.saveAndFlush(updateEntity);
|
||||
instanceInfoRepository.saveAndFlush(updateEntity);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteLogUpdate() {
|
||||
instanceLogRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL");
|
||||
instanceLogRepository.update4FrequentJob(1586310419650L, 2, 200);
|
||||
instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL");
|
||||
instanceInfoRepository.update4FrequentJob(1586310419650L, 2, 200);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Queues;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
@ -20,6 +21,7 @@ import java.util.concurrent.TimeUnit;
|
||||
* @author tjq
|
||||
* @since 2020/4/21
|
||||
*/
|
||||
@Slf4j
|
||||
public class OmsLogHandler {
|
||||
|
||||
private OmsLogHandler() {
|
||||
@ -60,6 +62,7 @@ public class OmsLogHandler {
|
||||
for (int i = 0; i < 1024; i++) {
|
||||
logQueue.remove();
|
||||
}
|
||||
log.warn("[OmsLogHandler] because there is no available server to report logs which leads to queue accumulation, oms discarded some logs.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user