Merge branch '5.1.0-bugfix2' into 5.1.1_v2

This commit is contained in:
tjq 2024-12-07 17:00:45 +08:00
commit 7333ee3951
4 changed files with 29 additions and 16 deletions

View File

@ -64,4 +64,6 @@ public class PowerJobDKey {
public static final String WORKER_RUNTIME_SWAP_TASK_SCHEDULE_INTERVAL_MS = "powerjob.worker.swap.scan-interval";
public static final String SERVER_TEST_ACCOUNT_USERNAME = "powerjob.server.test-accounts";
}

View File

@ -1,14 +1,17 @@
package tech.powerjob.server.web.service.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.enums.ErrorCodes;
import tech.powerjob.server.auth.common.PowerJobAuthException;
import tech.powerjob.common.utils.DigestUtils;
import tech.powerjob.server.auth.common.PowerJobAuthException;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.PwjbUserInfoDO;
import tech.powerjob.server.persistence.remote.repository.PwjbUserInfoRepository;
import tech.powerjob.server.web.request.ChangePasswordRequest;
@ -88,7 +91,12 @@ public class PwjbUserWebServiceImplImpl implements PwjbUserWebService {
}
// 测试账号特殊处理
if (NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS.contains(username)) {
Set<String> testAccounts = Sets.newHashSet(NOT_ALLOWED_CHANGE_PASSWORD_ACCOUNTS);
String testAccountsStr = System.getProperty(PowerJobDKey.SERVER_TEST_ACCOUNT_USERNAME);
if (StringUtils.isNotEmpty(testAccountsStr)) {
testAccounts.addAll(Lists.newArrayList(SJ.COMMA_SPLITTER.split(testAccountsStr)));
}
if (testAccounts.contains(username)) {
throw new IllegalArgumentException("this account not allowed change the password");
}

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>5.1.0-bugfix</version>
<version>5.1.0-bugfix2-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>

View File

@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -478,6 +479,7 @@ public abstract class HeavyTaskTracker extends TaskTracker {
long currentDispatchNum = 0;
long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
AtomicInteger index = new AtomicInteger(0);
AtomicBoolean skipThisRound = new AtomicBoolean(false);
// 4. 循环查询数据库获取需要派发的任务
while (maxDispatchNum > currentDispatchNum) {
@ -490,8 +492,15 @@ public abstract class HeavyTaskTracker extends TaskTracker {
// 获取 ProcessorTracker 地址如果 Task 中自带了 Address则使用该 Address
String ptAddress = task.getAddress();
if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
if (taskNeedByPassTaskTracker(availablePtIps)) {
if (taskNeedByPassTaskTracker()) {
int loopTime = 0;
do {
loopTime++;
if (loopTime > 3) {
log.warn("[TaskTracker-{}] The cluster has no available workers other than master, so this round dispatch is skipped.", instanceId);
skipThisRound.set(true);
return;
}
ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
} while (workerRuntime.getWorkerAddress().equals(ptAddress));
} else {
@ -501,6 +510,10 @@ public abstract class HeavyTaskTracker extends TaskTracker {
dispatchTask(task, ptAddress);
});
if (skipThisRound.get()) {
break;
}
// 数量不足 查询失败则终止循环
if (needDispatchTasks.size() < dbQueryLimit) {
break;
@ -510,18 +523,8 @@ public abstract class HeavyTaskTracker extends TaskTracker {
log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
}
/**
* padding的生效条件 在map || mapReduce 的情况下 且是该appId的worker是 非单机运行时才生效
* fix: 当该appId的worker是单机运行 padding时 导致Dispatcher分发任务处于死循环中 致使无法分发任务状态一直为运行中
* 且该线程不能通过停止任务的方式去停止只能通过重启该work实例的方式释放该线程
*/
private boolean taskNeedByPassTaskTracker(List<String> availablePtIps) {
private boolean taskNeedByPassTaskTracker() {
if (ExecuteType.MAP.equals(executeType) || ExecuteType.MAP_REDUCE.equals(executeType)) {
if (availablePtIps.size() <= 1) {
return false;
}
return TaskTrackerBehavior.PADDLING.getV().equals(advancedRuntimeConfig.getTaskTrackerBehavior());
}
return false;