merge: fix-k8s-postgresql by tanwenhai

This commit is contained in:
tjq 2020-12-19 22:10:15 +08:00
commit 9f2d3134d8
13 changed files with 151 additions and 11 deletions

View File

@ -119,6 +119,11 @@
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.Type;
import javax.persistence.*;
import java.util.Date;
@ -36,6 +37,7 @@ public class InstanceInfoDO {
// 任务实例参数
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String instanceParams;
// 该任务实例的类型普通/工作流InstanceType
@ -49,6 +51,7 @@ public class InstanceInfoDO {
// 执行结果允许存储稍大的结果
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String result;
// 预计触发时间
private Long expectedTriggerTime;

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.Type;
import javax.persistence.*;
import java.util.Date;
@ -52,6 +53,7 @@ public class JobInfoDO {
// 执行器信息可能需要存储整个脚本文件
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String processorInfo;
/* ************************** 运行时配置 ************************** */

View File

@ -0,0 +1,9 @@
package com.github.kfcfans.powerjob.server.persistence.core.model;
/**
* @see package-info.java
* @author user
*/
public final class TypeDefConstant {
public static final String STRING_TYPE = "string-type";
}

View File

@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.Type;
import javax.persistence.*;
import java.util.Date;
@ -35,6 +36,7 @@ public class WorkflowInfoDO {
// 工作流的DAG图信息点线式DAG的json
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String peDAG;
/* ************************** 定时参数 ************************** */

View File

@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.Type;
import javax.persistence.*;
import java.util.Date;
@ -39,13 +40,16 @@ public class WorkflowInstanceInfoDO {
// 工作流启动参数
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String wfInitParams;
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String dag;
@Lob
@Column
@Type(type = TypeDefConstant.STRING_TYPE)
private String result;
// 预计触发时间

View File

@ -0,0 +1,7 @@
@TypeDefs({
@TypeDef(name = TypeDefConstant.STRING_TYPE, typeClass = org.hibernate.type.StringType.class)
})
package com.github.kfcfans.powerjob.server.persistence.core.model;
import org.hibernate.annotations.TypeDef;
import org.hibernate.annotations.TypeDefs;

View File

@ -2,6 +2,11 @@ package com.github.kfcfans.powerjob.server.persistence.core.repository;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Query;
import javax.persistence.LockModeType;
import java.util.List;
/**
* 服务器信息 数据操作层
@ -11,4 +16,8 @@ import org.springframework.data.jpa.repository.JpaRepository;
*/
public interface ServerInfoRepository extends JpaRepository<ServerInfoDO, Long> {
ServerInfoDO findByIp(String ip);
@Query("select t from ServerInfoDO as t order by t.id asc")
@Lock(LockModeType.PESSIMISTIC_WRITE)
List<ServerInfoDO> findAllAndLockTable();
}

View File

@ -0,0 +1,39 @@
package com.github.kfcfans.powerjob.server.service.id;
import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
/**
* @author user
*/
public class DefaultServerIdProvider implements ServerIdProvider {
private final ServerInfoRepository serverInfoRepository;
private volatile Long id;
public DefaultServerIdProvider(ServerInfoRepository serverInfoRepository) {
this.serverInfoRepository = serverInfoRepository;
}
@Override
public long serverId() {
if (id == null) {
synchronized (this) {
if (id == null) {
String ip = NetUtils.getLocalHost();
ServerInfoDO server = serverInfoRepository.findByIp(ip);
if (server == null) {
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
server = serverInfoRepository.saveAndFlush(newServerInfo);
}
id = server.getId();
}
}
}
return id;
}
}

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
/**
@ -24,18 +25,10 @@ public class IdGenerateService {
private static final int DATA_CENTER_ID = 0;
@Autowired
public IdGenerateService(ServerInfoRepository serverInfoRepository) {
String ip = NetUtils.getLocalHost();
ServerInfoDO server = serverInfoRepository.findByIp(ip);
if (server == null) {
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
server = serverInfoRepository.saveAndFlush(newServerInfo);
}
Long id = server.getId();
public IdGenerateService(@Qualifier("serverIdProvider") ServerIdProvider serverIdProvider) {
long id = serverIdProvider.serverId();
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
String ip = NetUtils.getLocalHost();
log.info("[IdGenerateService] init snowflake for server(address={}) by machineId({}).", ip, id);
}

View File

@ -0,0 +1,12 @@
package com.github.kfcfans.powerjob.server.service.id;
/**
* @author user
*/
public interface ServerIdProvider {
/**
* get number for IdGenerateService
* @return serverId
*/
long serverId();
}

View File

@ -0,0 +1,25 @@
package com.github.kfcfans.powerjob.server.service.id;
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author user
*/
@Configuration
public class ServerIdProviderConfiguration {
@ConditionalOnProperty(prefix = "oms.server-id", name = "provider", havingValue = "hostname")
@Bean(name = "serverIdProvider")
public ServerIdProvider statefulSetServerIdProvider() {
return new StatefulSetServerIdProvider();
}
@ConditionalOnMissingBean(ServerIdProvider.class)
@Bean(name = "serverIdProvider")
public ServerIdProvider defaultServerIdProvider(ServerInfoRepository serverInfoRepository) {
return new DefaultServerIdProvider(serverInfoRepository);
}
}

View File

@ -0,0 +1,30 @@
package com.github.kfcfans.powerjob.server.service.id;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author user
*/
public class StatefulSetServerIdProvider implements ServerIdProvider {
/**
* xxx-1,aa-bb-2
*/
private static final Pattern HOSTNAME_PATTERN = Pattern.compile("^.*-([0-9]+)$");
@Override
public long serverId() {
try {
String hostname = InetAddress.getLocalHost().getHostName();
Matcher matcher = HOSTNAME_PATTERN.matcher(hostname);
if (matcher.matches()) {
return Long.parseLong(matcher.group(1));
}
throw new RuntimeException(String.format("hostname=%s not match %s", hostname, HOSTNAME_PATTERN.toString()));
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
}