diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 68036a10..080e85d1 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -119,6 +119,11 @@ spring-boot-starter-data-mongodb ${springboot.version} + + org.springframework.boot + spring-boot-starter-actuator + ${springboot.version} + org.springframework.boot spring-boot-starter-mail diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index 5af69945..a956830c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -5,6 +5,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; +import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -36,6 +37,7 @@ public class InstanceInfoDO { // 任务实例参数 @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String instanceParams; // 该任务实例的类型,普通/工作流(InstanceType) @@ -49,6 +51,7 @@ public class InstanceInfoDO { // 执行结果(允许存储稍大的结果) @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 private Long expectedTriggerTime; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index b5d2b171..ddccbf7a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -5,6 +5,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; +import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -52,6 +53,7 @@ public class JobInfoDO { // 执行器信息(可能需要存储整个脚本文件) @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String processorInfo; /* ************************** 运行时配置 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java new file mode 100644 index 00000000..fbe4b199 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/TypeDefConstant.java @@ -0,0 +1,9 @@ +package com.github.kfcfans.powerjob.server.persistence.core.model; + +/** + * @see package-info.java + * @author user + */ +public final class TypeDefConstant { + public static final String STRING_TYPE = "string-type"; +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java index f3c5d560..ee8c53a8 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; +import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -35,6 +36,7 @@ public class WorkflowInfoDO { // 工作流的DAG图信息(点线式DAG的json) @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String peDAG; /* ************************** 定时参数 ************************** */ diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index e6f483c4..d6290205 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.hibernate.annotations.GenericGenerator; +import org.hibernate.annotations.Type; import javax.persistence.*; import java.util.Date; @@ -39,13 +40,16 @@ public class WorkflowInstanceInfoDO { // 工作流启动参数 @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String wfInitParams; @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String dag; @Lob @Column + @Type(type = TypeDefConstant.STRING_TYPE) private String result; // 预计触发时间 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java new file mode 100644 index 00000000..543fd8a0 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/package-info.java @@ -0,0 +1,7 @@ +@TypeDefs({ + @TypeDef(name = TypeDefConstant.STRING_TYPE, typeClass = org.hibernate.type.StringType.class) +}) +package com.github.kfcfans.powerjob.server.persistence.core.model; + +import org.hibernate.annotations.TypeDef; +import org.hibernate.annotations.TypeDefs; \ No newline at end of file diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java index 615933f2..7f0b70ba 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/ServerInfoRepository.java @@ -2,6 +2,11 @@ package com.github.kfcfans.powerjob.server.persistence.core.repository; import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; + +import javax.persistence.LockModeType; +import java.util.List; /** * 服务器信息 数据操作层 @@ -11,4 +16,8 @@ import org.springframework.data.jpa.repository.JpaRepository; */ public interface ServerInfoRepository extends JpaRepository { ServerInfoDO findByIp(String ip); + + @Query("select t from ServerInfoDO as t order by t.id asc") + @Lock(LockModeType.PESSIMISTIC_WRITE) + List findAllAndLockTable(); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java new file mode 100644 index 00000000..1c018370 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/DefaultServerIdProvider.java @@ -0,0 +1,39 @@ +package com.github.kfcfans.powerjob.server.service.id; + +import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; + +/** + * @author user + */ +public class DefaultServerIdProvider implements ServerIdProvider { + private final ServerInfoRepository serverInfoRepository; + + private volatile Long id; + + public DefaultServerIdProvider(ServerInfoRepository serverInfoRepository) { + this.serverInfoRepository = serverInfoRepository; + } + + @Override + public long serverId() { + if (id == null) { + synchronized (this) { + if (id == null) { + String ip = NetUtils.getLocalHost(); + ServerInfoDO server = serverInfoRepository.findByIp(ip); + + if (server == null) { + ServerInfoDO newServerInfo = new ServerInfoDO(ip); + server = serverInfoRepository.saveAndFlush(newServerInfo); + } + + id = server.getId(); + } + } + } + + return id; + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java index e56462fa..c334d01b 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/IdGenerateService.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; /** @@ -24,18 +25,10 @@ public class IdGenerateService { private static final int DATA_CENTER_ID = 0; @Autowired - public IdGenerateService(ServerInfoRepository serverInfoRepository) { - - String ip = NetUtils.getLocalHost(); - ServerInfoDO server = serverInfoRepository.findByIp(ip); - - if (server == null) { - ServerInfoDO newServerInfo = new ServerInfoDO(ip); - server = serverInfoRepository.saveAndFlush(newServerInfo); - } - - Long id = server.getId(); + public IdGenerateService(@Qualifier("serverIdProvider") ServerIdProvider serverIdProvider) { + long id = serverIdProvider.serverId(); snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id); + String ip = NetUtils.getLocalHost(); log.info("[IdGenerateService] init snowflake for server(address={}) by machineId({}).", ip, id); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProvider.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProvider.java new file mode 100644 index 00000000..f7fcbd2e --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProvider.java @@ -0,0 +1,12 @@ +package com.github.kfcfans.powerjob.server.service.id; + +/** + * @author user + */ +public interface ServerIdProvider { + /** + * get number for IdGenerateService + * @return serverId + */ + long serverId(); +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProviderConfiguration.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProviderConfiguration.java new file mode 100644 index 00000000..e2e6f5c4 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/ServerIdProviderConfiguration.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.powerjob.server.service.id; + +import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author user + */ +@Configuration +public class ServerIdProviderConfiguration { + @ConditionalOnProperty(prefix = "oms.server-id", name = "provider", havingValue = "hostname") + @Bean(name = "serverIdProvider") + public ServerIdProvider statefulSetServerIdProvider() { + return new StatefulSetServerIdProvider(); + } + + @ConditionalOnMissingBean(ServerIdProvider.class) + @Bean(name = "serverIdProvider") + public ServerIdProvider defaultServerIdProvider(ServerInfoRepository serverInfoRepository) { + return new DefaultServerIdProvider(serverInfoRepository); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/StatefulSetServerIdProvider.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/StatefulSetServerIdProvider.java new file mode 100644 index 00000000..9e796e36 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/id/StatefulSetServerIdProvider.java @@ -0,0 +1,30 @@ +package com.github.kfcfans.powerjob.server.service.id; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author user + */ +public class StatefulSetServerIdProvider implements ServerIdProvider { + /** + * xxx-1,aa-bb-2 + */ + private static final Pattern HOSTNAME_PATTERN = Pattern.compile("^.*-([0-9]+)$"); + + @Override + public long serverId() { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + Matcher matcher = HOSTNAME_PATTERN.matcher(hostname); + if (matcher.matches()) { + return Long.parseLong(matcher.group(1)); + } + throw new RuntimeException(String.format("hostname=%s not match %s", hostname, HOSTNAME_PATTERN.toString())); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } +}