部分代码优化

1.邮件通知服务,优雅注入发件人
2.雪花算法,对时钟回拨情况做优化,避免服务直接不可用
3.扫描数据库task,部分代码调整减少重复计算性能消耗
4.部分枚举类,增强代码安全性
5.其它,规范部分代码
This commit is contained in:
ZhangJun 2022-12-17 16:11:56 +08:00 committed by Echo009
parent 5ba4ce5457
commit 3f95ee8a33
8 changed files with 34 additions and 22 deletions

View File

@ -26,8 +26,8 @@ public enum ExecuteType {
MAP_REDUCE(3, "MapReduce"), MAP_REDUCE(3, "MapReduce"),
MAP(4, "Map"); MAP(4, "Map");
int v; private final int v;
String des; private final String des;
public static ExecuteType of(int v) { public static ExecuteType of(int v) {
for (ExecuteType type : values()) { for (ExecuteType type : values()) {

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.ToString; import lombok.ToString;
import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
@ -24,13 +25,13 @@ public enum TimeExpressionType {
FIXED_DELAY(4), FIXED_DELAY(4),
WORKFLOW(5); WORKFLOW(5);
int v; private final int v;
public static final List<Integer> FREQUENT_TYPES = Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v); public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));
/** /**
* 首次计算触发时间时必须计算出一个有效值 * 首次计算触发时间时必须计算出一个有效值
*/ */
public static final List<Integer> INSPECT_TYPES = Lists.newArrayList(CRON.v); public static final List<Integer> INSPECT_TYPES = Collections.unmodifiableList(Lists.newArrayList(CRON.v));
public static TimeExpressionType of(int v) { public static TimeExpressionType of(int v) {
for (TimeExpressionType type : values()) { for (TimeExpressionType type : values()) {

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
@ -27,11 +28,11 @@ public enum WorkflowInstanceStatus {
/** /**
* 广义的运行状态 * 广义的运行状态
*/ */
public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING.v, RUNNING.v); public static final List<Integer> GENERALIZED_RUNNING_STATUS = Collections.unmodifiableList(Lists.newArrayList(WAITING.v, RUNNING.v));
/** /**
* 结束状态 * 结束状态
*/ */
public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); public static final List<Integer> FINISHED_STATUS = Collections.unmodifiableList(Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v));
private final int v; private final int v;

View File

@ -84,7 +84,9 @@ public class ContainerService {
// 并发部署的机器数量 // 并发部署的机器数量
private static final int DEPLOY_BATCH_NUM = 50; private static final int DEPLOY_BATCH_NUM = 50;
// 部署间隔 // 部署间隔
private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000; private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000L;
// 最长部署时间
private static final long DEPLOY_MAX_COST_TIME = 10 * 60 * 1000L;
/** /**
* 保存容器 * 保存容器
@ -208,14 +210,13 @@ public class ContainerService {
String deployLock = "containerDeployLock-" + containerId; String deployLock = "containerDeployLock-" + containerId;
RemoteEndpoint.Async remote = session.getAsyncRemote(); RemoteEndpoint.Async remote = session.getAsyncRemote();
// 最长部署时间10分钟 // 最长部署时间10分钟
boolean lock = lockService.tryLock(deployLock, 10 * 60 * 1000); boolean lock = lockService.tryLock(deployLock, DEPLOY_MAX_COST_TIME);
if (!lock) { if (!lock) {
remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished."); remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
return; return;
} }
try { try {
Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(containerId); Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(containerId);
if (!containerInfoOpt.isPresent()) { if (!containerInfoOpt.isPresent()) {
remote.sendText("SYSTEM: can't find container by id: " + containerId); remote.sendText("SYSTEM: can't find container by id: " + containerId);

View File

@ -94,7 +94,7 @@ public class CleanService {
*/ */
private void cleanByOneServer() { private void cleanByOneServer() {
// 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了 // 只要第一个server抢到锁其他server就会返回所以锁10分钟应该足够了
boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000); boolean lock = lockService.tryLock(HISTORY_DELETE_LOCK, 10 * 60 * 1000L);
if (!lock) { if (!lock) {
log.info("[CleanService] clean job is already running, just return."); log.info("[CleanService] clean job is already running, just return.");
return; return;

View File

@ -69,7 +69,7 @@ public class SnowFlakeIdGenerator {
public synchronized long nextId() { public synchronized long nextId() {
long currStamp = getNewStamp(); long currStamp = getNewStamp();
if (currStamp < lastTimestamp) { if (currStamp < lastTimestamp) {
throw new RuntimeException("clock moved backwards, refusing to generate id"); return futureId();
} }
if (currStamp == lastTimestamp) { if (currStamp == lastTimestamp) {
@ -92,6 +92,22 @@ public class SnowFlakeIdGenerator {
| sequence; //序列号部分 | sequence; //序列号部分
} }
/**
* 发生时钟回拨时借用未来时间生成Id避免运行过程中任务调度和工作流直接进入不可用状态
* 该方式不可解决原算法中停服状态下时钟回拨导致的重复id问题
*/
private long futureId() {
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0L) {
lastTimestamp = lastTimestamp + 1;
}
return (lastTimestamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分
| dataCenterId << DATA_CENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}
private long getNextMill() { private long getNextMill() {
long mill = getNewStamp(); long mill = getNewStamp();
while (mill <= lastTimestamp) { while (mill <= lastTimestamp) {

View File

@ -1,5 +1,6 @@
package tech.powerjob.server.extension.defaultimpl.alarm.impl; package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import org.springframework.beans.factory.annotation.Value;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO; import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm; import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
@ -31,12 +32,11 @@ public class MailAlarmService implements Alarmable {
private JavaMailSender javaMailSender; private JavaMailSender javaMailSender;
@Value("${spring.mail.username:''}")
private String from; private String from;
private static final String FROM_KEY = "spring.mail.username";
@Override @Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) { public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
initFrom();
if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) { if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
return; return;
} }
@ -59,10 +59,4 @@ public class MailAlarmService implements Alarmable {
this.javaMailSender = javaMailSender; this.javaMailSender = javaMailSender;
} }
// 不能直接使用 @Value 注入不存在的时候会报错
private void initFrom() {
if (StringUtils.isEmpty(from)) {
from = environment.getProperty(FROM_KEY);
}
}
} }

View File

@ -527,12 +527,11 @@ public abstract class TaskTracker {
// 3. 避免大查询分批派发任务 // 3. 避免大查询分批派发任务
long currentDispatchNum = 0; long currentDispatchNum = 0;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L; long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
AtomicInteger index = new AtomicInteger(0); AtomicInteger index = new AtomicInteger(0);
// 4. 循环查询数据库获取需要派发的任务 // 4. 循环查询数据库获取需要派发的任务
while (maxDispatchNum > currentDispatchNum) { while (maxDispatchNum > currentDispatchNum) {
int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit); List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
currentDispatchNum += needDispatchTasks.size(); currentDispatchNum += needDispatchTasks.size();