fix: limit worker num failed in map/mapreduce job #450

This commit is contained in:
tjq 2022-09-10 09:55:26 +08:00
parent dfd1fd069b
commit ac1b1fe0c8
4 changed files with 65 additions and 10 deletions

View File

@ -20,6 +20,11 @@ public class ServerScheduleJobReq implements PowerSerializable {
*/ */
private List<String> allWorkerAddress; private List<String> allWorkerAddress;
/**
* 最大机器数量
*/
private Integer maxWorkerCount;
/* *********************** 任务相关属性 *********************** */ /* *********************** 任务相关属性 *********************** */
/** /**

View File

@ -181,6 +181,7 @@ public class DispatchService {
} }
req.setInstanceId(instanceInfo.getInstanceId()); req.setInstanceId(instanceInfo.getInstanceId());
req.setAllWorkerAddress(finalWorkersIpList); req.setAllWorkerAddress(finalWorkersIpList);
req.setMaxWorkerCount(jobInfo.getMaxWorkerCount());
// 设置工作流ID // 设置工作流ID
req.setWfInstanceId(instanceInfo.getWfInstanceId()); req.setWfInstanceId(instanceInfo.getWfInstanceId());

View File

@ -17,10 +17,15 @@ import java.util.Map;
@Slf4j @Slf4j
public class ProcessorTrackerStatusHolder { public class ProcessorTrackerStatusHolder {
private final Long instanceId;
private final Integer maxWorkerCount;
// ProcessorTracker的address(IP:Port) -> 状态 // ProcessorTracker的address(IP:Port) -> 状态
private final Map<String, ProcessorTrackerStatus> address2Status; private final Map<String, ProcessorTrackerStatus> address2Status;
public ProcessorTrackerStatusHolder(List<String> allWorkerAddress) { public ProcessorTrackerStatusHolder(Long instanceId, Integer maxWorkerCount, List<String> allWorkerAddress) {
this.instanceId = instanceId;
this.maxWorkerCount = maxWorkerCount;
address2Status = Maps.newConcurrentMap(); address2Status = Maps.newConcurrentMap();
allWorkerAddress.forEach(address -> { allWorkerAddress.forEach(address -> {
@ -38,7 +43,7 @@ public class ProcessorTrackerStatusHolder {
public ProcessorTrackerStatus getProcessorTrackerStatus(String address) { public ProcessorTrackerStatus getProcessorTrackerStatus(String address) {
// remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况0.001% 概率 // remove 前突然收到了 PT 心跳同时立即被派发才可能出现这种情况0.001% 概率
return address2Status.computeIfAbsent(address, ignore -> { return address2Status.computeIfAbsent(address, ignore -> {
log.warn("[ProcessorTrackerStatusHolder] unregistered worker: {}", address); log.warn("[PTStatusHolder-{}] unregistered worker: {}", instanceId, address);
ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus(); ProcessorTrackerStatus processorTrackerStatus = new ProcessorTrackerStatus();
processorTrackerStatus.init(address); processorTrackerStatus.init(address);
return processorTrackerStatus; return processorTrackerStatus;
@ -90,9 +95,9 @@ public class ProcessorTrackerStatusHolder {
/** /**
* 注册新的执行节点 * 注册新的执行节点
* @param address 新的执行节点地址 * @param address 新的执行节点地址
* @return true: 注册成功 / false已存在 * @return true: register successfully / false: already exists
*/ */
public boolean register(String address) { private boolean registerOne(String address) {
ProcessorTrackerStatus pts = address2Status.get(address); ProcessorTrackerStatus pts = address2Status.get(address);
if (pts != null) { if (pts != null) {
return false; return false;
@ -100,9 +105,50 @@ public class ProcessorTrackerStatusHolder {
pts = new ProcessorTrackerStatus(); pts = new ProcessorTrackerStatus();
pts.init(address); pts.init(address);
address2Status.put(address, pts); address2Status.put(address, pts);
log.info("[PTStatusHolder-{}] register new worker: {}", instanceId, address);
return true; return true;
} }
public void register(List<String> workerIpList) {
if (endlessWorkerNum()) {
workerIpList.forEach(this::registerOne);
return;
}
List<String> availableProcessorTrackers = getAvailableProcessorTrackers();
int currentWorkerSize = availableProcessorTrackers.size();
int needMoreNum = maxWorkerCount - currentWorkerSize;
if (needMoreNum <= 0) {
return;
}
log.info("[PTStatusHolder-{}] currentWorkerSize: {}, needMoreNum: {}", instanceId, currentWorkerSize, needMoreNum);
for (String newIp : workerIpList) {
boolean success = registerOne(newIp);
if (success) {
needMoreNum --;
}
if (needMoreNum <= 0) {
return;
}
}
}
/**
* 检查是否需要动态加载新的执行器
* @return check need more workers
*/
public boolean checkNeedMoreWorker() {
if (endlessWorkerNum()) {
return true;
}
return getAvailableProcessorTrackers().size() < maxWorkerCount;
}
private boolean endlessWorkerNum() {
return maxWorkerCount == null || maxWorkerCount == 0;
}
public void remove(List<String> addressList) { public void remove(List<String> addressList) {
addressList.forEach(address2Status::remove); addressList.forEach(address2Status::remove);
} }

View File

@ -124,7 +124,7 @@ public abstract class TaskTracker {
// 保护性操作 // 保护性操作
instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.ptStatusHolder = new ProcessorTrackerStatusHolder(instanceId, req.getMaxWorkerCount(), req.getAllWorkerAddress());
this.taskPersistenceService = workerRuntime.getTaskPersistenceService(); this.taskPersistenceService = workerRuntime.getTaskPersistenceService();
this.finished = new AtomicBoolean(false); this.finished = new AtomicBoolean(false);
// 只有工作流中的任务允许向工作流中追加上下文数据 // 只有工作流中的任务允许向工作流中追加上下文数据
@ -562,6 +562,13 @@ public abstract class TaskTracker {
protected class WorkerDetector implements Runnable { protected class WorkerDetector implements Runnable {
@Override @Override
public void run() { public void run() {
boolean needMoreWorker = ptStatusHolder.checkNeedMoreWorker();
log.info("[TaskTracker-{}] checkNeedMoreWorker: {}", instanceId, needMoreWorker);
if (!needMoreWorker) {
return;
}
String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress()); String serverPath = AkkaUtils.getServerActorPath(workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {
log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
@ -575,11 +582,7 @@ public abstract class TaskTracker {
} }
try { try {
List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {}); List<String> workerList = JsonUtils.parseObject(response.getData(), new TypeReference<List<String>>() {});
workerList.forEach(address -> { ptStatusHolder.register(workerList);
if (ptStatusHolder.register(address)) {
log.info("[TaskTracker-{}] detective new worker: {}", instanceId, address);
}
});
} catch (Exception e) { } catch (Exception e) {
log.warn("[TaskTracker-{}] detective failed!", instanceId, e); log.warn("[TaskTracker-{}] detective failed!", instanceId, e);
} }