tjq 2020-04-11 14:04:57 +08:00
parent 4ad42a4d6a
commit dc00291a82
8 changed files with 45 additions and 12 deletions

View File

@@ -52,8 +52,13 @@ public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
double availableMemory = jvmMaxMemory - jvmUsedMemory;
double availableDisk = diskTotal - diskUsage;
// Protective check: the number of available CPU cores cannot be obtained on Windows, so fix it at 0.5 for now
if (availableCPUCores < 0) {
availableCPUCores = 0.5;
}
// Minimum requirements to run: 1 GB of disk & 0.5 GB of memory & one available CPU core
if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 1) {
if (availableDisk < 1 || availableMemory < 0.5 || availableCPUCores < 0.5) {
score = MIN_SCORE;
} else {
// The disk only needs to meet the minimum requirement

View File

@@ -32,9 +32,6 @@ public class DispatchService {
@Resource
private ExecuteLogRepository executeLogRepository;
// The first three statuses are all treated as running
private static final List<Integer> runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV());
private static final String EMPTY_RESULT = "";
/**
@@ -45,16 +42,16 @@ public class DispatchService {
*/
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
Long jobId = jobInfo.getId();
log.info("[DispatchService] start to dispatch job: {}.", jobInfo);
// Query the number of currently running instances
long current = System.currentTimeMillis();
long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus);
long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus);
// Exceeds the maximum number of concurrently running instances, skip this dispatch
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount);
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result);
return;
}
@@ -64,7 +61,8 @@ public class DispatchService {
List<String> allAvailableWorker = WorkerManagerService.getAllAvailableWorker(jobInfo.getAppId());
if (StringUtils.isEmpty(taskTrackerAddress)) {
log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo);
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
return;
}

View File

@@ -117,4 +117,12 @@ public class ClusterStatusHolder {
}
return false;
}
/**
* Get a brief description of the whole cluster.
* @return the cluster description
*/
public String getClusterDescription() {
return String.format("appName:%s,clusterStatus:%s", appName, address2Metrics.toString());
}
}

View File

@@ -66,4 +66,18 @@ public class WorkerManagerService {
appId2ClusterStatus.entrySet().removeIf(entry -> !keys.contains(entry.getKey()));
}
/**
* Get the status description of the Worker cluster for a given application.
* @param appId application ID
* @return cluster status description
*/
public static String getWorkerClusterStatusDescription(Long appId) {
ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.get(appId);
if (clusterStatusHolder == null) {
return "CAN'T_FIND_ANY_WORKER";
}
return clusterStatusHolder.getClusterDescription();
}
}

View File

@@ -96,13 +96,12 @@ public class JobScheduleService {
List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, JobStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), timeThreshold);
if (CollectionUtils.isEmpty(jobInfos)) {
log.info("[JobScheduleService] no cron job need to schedule");
return;
}
// 1. Write the execution log table in batch
Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
log.info("[JobScheduleService] try to schedule some cron jobs, they are {}.", jobInfos);
log.info("[JobScheduleService] These cron jobs will be scheduled {}.", jobInfos);
List<ExecuteLogDO> executeLogs = Lists.newLinkedList();
jobInfos.forEach(jobInfoDO -> {
@@ -159,7 +158,7 @@ public class JobScheduleService {
updatedJobInfos.add(updatedJobInfo);
} catch (Exception e) {
log.error("[JobScheduleService] calculate next trigger time for job({}) failed.", jobInfoDO, e);
log.error("[JobScheduleService] calculate next trigger time for job(jobId={}) failed.", jobInfoDO.getId(), e);
}
});
jobInfoRepository.saveAll(updatedJobInfos);

View File

@@ -10,5 +10,5 @@ spring.datasource.password=No1Bug2Please3!
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
# JPA related configuration
spring.jpa.show-sql=true
spring.jpa.show-sql=false
spring.jpa.hibernate.ddl-auto=update

View File

@@ -117,6 +117,7 @@ public class TaskTrackerActor extends AbstractActor {
return;
}
log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
// Create atomically to prevent multiple instances from existing
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req));
}

View File

@@ -41,3 +41,11 @@ java.lang.RuntimeException: create root task failed.
```
***
Cause and solution: the destroy method called scheduledPool.shutdownNow(), which forcibly shut down the thread pool that was running the method, so the method itself was interrupted as well; the deletion stopped halfway with data left behind, breaking the database state, so the subsequent inserts naturally failed too.
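A minimal sketch of the kind of fix this implies, assuming the cleanup task runs on the same `scheduledPool` that `destroy()` tears down (the class and field names here are hypothetical, not taken from this repository): prefer `shutdown()` plus a grace period over an immediate `shutdownNow()`, so an in-flight delete can finish before the pool stops.
```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class GracefulDestroyExample {

    private final ScheduledExecutorService scheduledPool;

    public GracefulDestroyExample(ScheduledExecutorService scheduledPool) {
        this.scheduledPool = scheduledPool;
    }

    /**
     * shutdown() stops accepting new tasks but lets running ones (e.g. a
     * half-finished batch delete) complete; only fall back to shutdownNow()
     * after a grace period, instead of interrupting the cleanup mid-delete.
     */
    public void destroy() {
        scheduledPool.shutdown();
        try {
            if (!scheduledPool.awaitTermination(30, TimeUnit.SECONDS)) {
                scheduledPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduledPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```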
# 2020.4.11 "Cluster" test
#### Job retry mechanism not working
Cause: the now() function in SQL returns a Datetime, which cannot be received with an int/bigint...
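A small illustration of that mismatch, not taken from this repository (the JDBC URL and credentials below are placeholders): `NOW()` must be read as a timestamp, while a numeric expression such as `UNIX_TIMESTAMP(NOW()) * 1000` maps cleanly onto a Java `long`.
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class NowVsTimestampExample {
    public static void main(String[] args) throws Exception {
        // placeholder connection settings
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pass");
             Statement st = conn.createStatement()) {

            // NOW() is a DATETIME: read it as a timestamp, not as an int/bigint.
            try (ResultSet rs = st.executeQuery("SELECT NOW()")) {
                rs.next();
                System.out.println(rs.getTimestamp(1));
            }

            // A numeric epoch value is safe to read as long and to compare
            // against System.currentTimeMillis().
            try (ResultSet rs = st.executeQuery("SELECT UNIX_TIMESTAMP(NOW()) * 1000")) {
                rs.next();
                long epochMillis = rs.getLong(1);
                System.out.println(epochMillis);
            }
        }
    }
}
```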
#### SystemMetric scoring issue
Problem: java.lang.management.OperatingSystemMXBean#getSystemLoadAverage cannot always obtain the current CPU load; it may return a negative number, which means the value is unavailable...
Solution: on Windows, getSystemLoadAverage() always returns -1... what a pitfall... add a protective check for now and keep testing...
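A standalone sketch of that protective check; how the commit actually derives the available-core figure is not shown here, so the `cores - load` estimate below is an assumption for illustration only.
```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class CpuCoreProbe {

    /**
     * getSystemLoadAverage() may return a negative value (notably on Windows),
     * which means "unavailable"; fall back to a fixed 0.5 cores in that case,
     * mirroring the guard added to SystemMetrics in this commit.
     */
    public static double availableCpuCores() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        double load = os.getSystemLoadAverage();
        if (load < 0) {
            return 0.5;
        }
        return Math.max(os.getAvailableProcessors() - load, 0);
    }

    public static void main(String[] args) {
        System.out.println("available CPU cores ~= " + availableCpuCores());
    }
}
```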