diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 40693039..c231b063 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,12 +10,11 @@ 4.0.0 powerjob-client - 3.0.0 + 3.0.1 jar - 3.0.0 - 5.6.1 + 3.0.1 @@ -25,13 +24,6 @@ powerjob-common ${powerjob.common.version} - - - org.junit.jupiter - junit-jupiter-api - ${junit.version} - test - \ No newline at end of file diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index aa7ebbf5..be200502 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.0.0 + 3.0.1 jar @@ -20,6 +20,7 @@ 29.0-jre 4.4.1 2.6.4 + 5.6.1 @@ -68,13 +69,20 @@ ${akka.version} - commons-io commons-io ${commons.io.version} + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + \ No newline at end of file diff --git a/powerjob-common/src/test/java/SegmentLockTest.java b/powerjob-common/src/test/java/SegmentLockTest.java new file mode 100644 index 00000000..2867064d --- /dev/null +++ b/powerjob-common/src/test/java/SegmentLockTest.java @@ -0,0 +1,61 @@ +import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.common.utils.SegmentLock; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * 分段锁测试 + * + * @author tjq + * @since 2020/6/15 + */ +public class SegmentLockTest { + + @Test + public void testLock() throws Exception { + int lockId = 10086; + SegmentLock lock = new SegmentLock(4); + ExecutorService pool = Executors.newFixedThreadPool(2); + pool.execute(() -> { + System.out.println("before lock A"); + lock.lockInterruptibleSafe(lockId); + System.out.println("after lock A"); + }); + + pool.execute(() -> { + System.out.println("before lock AA"); + lock.lockInterruptibleSafe(lockId); + System.out.println("after lock AA"); + }); + + Thread.sleep(10000); + } + + @Test + public void testUnLock() throws Exception { + int lockId = 10086; + SegmentLock lock = new SegmentLock(4); + ExecutorService pool = Executors.newFixedThreadPool(2); + pool.execute(() -> { + System.out.println("before lock A"); + lock.lockInterruptibleSafe(lockId); + System.out.println("after lock A"); + try { + Thread.sleep(5000); + }catch (Exception ignore) { + } + lock.unlock(lockId); + }); + + pool.execute(() -> { + System.out.println("before lock AA"); + lock.lockInterruptibleSafe(lockId); + System.out.println("after lock AA"); + }); + + Thread.sleep(10000); + } + +} diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index f4b20453..efb56e95 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.0.0 + 3.0.1 jar 2.9.2 2.2.6.RELEASE - 3.0.0 + 3.0.1 8.0.19 1.4.200 2.5.2 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index 6c99e680..ba5b876c 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -31,6 +31,8 @@ import java.util.Optional; @Slf4j public class ServerActor extends AbstractActor { + private InstanceManager instanceManager; + @Override public Receive createReceive() { return receiveBuilder() @@ -57,7 +59,7 @@ public class ServerActor extends AbstractActor { */ private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) { try { - InstanceManager.updateStatus(req); + getInstanceManager().updateStatus(req); // 结束状态(成功/失败)需要回复消息 if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { @@ -105,4 +107,12 @@ public class ServerActor extends AbstractActor { getSender().tell(askResponse, getSelf()); } + + // 不需要加锁,从 Spring IOC 中重复取并没什么问题 + private InstanceManager getInstanceManager() { + if (instanceManager == null) { + instanceManager = SpringUtils.getBean(InstanceManager.class); + } + return instanceManager; + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index 39f8d0a6..f06a4e50 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -54,6 +54,8 @@ public class InstanceInfoDO { private Long actualTriggerTime; // 结束时间 private Long finishedTime; + // 最后上报时间 + private Long lastReportTime; // TaskTracker地址 private String taskTrackerAddress; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index f5c60e1f..1f94bd77 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -36,6 +36,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.*; @Service public class DispatchService { + @Resource + private InstanceManager instanceManager; @Resource private InstanceInfoRepository instanceInfoRepository; @@ -71,7 +73,7 @@ public class DispatchService { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); - InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); return; } @@ -96,7 +98,7 @@ public class DispatchService { log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription); instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now); - InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE); return; } @@ -107,9 +109,6 @@ public class DispatchService { } } - // 注册到任务实例管理中心 - InstanceManager.register(instanceId, jobInfo); - // 构造请求 ServerScheduleJobReq req = new ServerScheduleJobReq(); BeanUtils.copyProperties(jobInfo, req); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java index 3db9d990..da3c03d5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/InstanceLogService.java @@ -45,6 +45,8 @@ import java.util.stream.Stream; @Service public class InstanceLogService { + @Resource + private InstanceManager instanceManager; @Resource private GridFsManager gridFsManager; // 本地数据库操作bean @@ -195,8 +197,8 @@ public class InstanceLogService { } // 删除本地数据库数据 try { - CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); instanceId2LastReportTime.remove(instanceId); + CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId)); }catch (Exception e) { log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e); } @@ -315,10 +317,10 @@ public class InstanceLogService { @Scheduled(fixedDelay = 60000) public void timingCheck() { - // 1. 定时删除秒级任务的日志 + // 定时删除秒级任务的日志 List frequentInstanceIds = Lists.newLinkedList(); instanceId2LastReportTime.keySet().forEach(instanceId -> { - JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId); + JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId); if (jobInfo == null) { return; } @@ -340,7 +342,7 @@ public class InstanceLogService { }); } - // 2. 删除长时间未 REPORT 的日志 + // 删除长时间未 REPORT 的日志(必要性考证中......) } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java index 522cb263..cb386261 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceManager.java @@ -16,13 +16,15 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable; import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent; import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder; import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager; -import com.google.common.collect.Maps; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; +import org.springframework.stereotype.Service; +import javax.annotation.Resource; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -33,114 +35,112 @@ import java.util.concurrent.TimeUnit; * @since 2020/4/7 */ @Slf4j +@Service public class InstanceManager { // 存储 instanceId 对应的 Job 信息,便于重试 - private static Map instanceId2JobInfo = Maps.newConcurrentMap(); - // 存储 instance 的状态(暂时只用到了 lastReportTime) - private static Map instanceId2StatusHolder = Maps.newConcurrentMap(); + private static Cache instanceId2JobInfo; // Spring Bean - private static DispatchService dispatchService; - private static InstanceLogService instanceLogService; - private static InstanceInfoRepository instanceInfoRepository; - private static JobInfoRepository jobInfoRepository; - private static Alarmable omsCenterAlarmService; - private static WorkflowInstanceManager workflowInstanceManager; + @Resource + private DispatchService dispatchService; + @Resource + private InstanceLogService instanceLogService; + @Resource(name = "omsCenterAlarmService") + private Alarmable omsCenterAlarmService; + @Resource + private InstanceInfoRepository instanceInfoRepository; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private WorkflowInstanceManager workflowInstanceManager; - /** - * 注册到任务实例管理器 - * @param instanceId 即将运行的任务实例ID - * @param jobInfoDO 即将运行的任务实例对应的任务元数据 - */ - public static void register(Long instanceId, JobInfoDO jobInfoDO) { + private static final int CACHE_CONCURRENCY_LEVEL = 8; + private static final int CACHE_MAX_SIZE = 4096; - InstanceStatusHolder statusHolder = new InstanceStatusHolder(); - statusHolder.setInstanceId(instanceId); - statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV()); - - instanceId2JobInfo.put(instanceId, jobInfoDO); - instanceId2StatusHolder.put(instanceId, statusHolder); + static { + instanceId2JobInfo = CacheBuilder.newBuilder() + .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) + .maximumSize(CACHE_MAX_SIZE) + .build(); } /** * 更新任务状态 * @param req TaskTracker上报任务实例状态的请求 */ - public static void updateStatus(TaskTrackerReportInstanceStatusReq req) { + public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception { Long jobId = req.getJobId(); Long instanceId = req.getInstanceId(); - // 不存在,可能该任务实例刚经历Server变更,需要重新构建基础信息 - if (!instanceId2JobInfo.containsKey(instanceId)) { - log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); + // 获取相关数据 + JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> { + Optional jobInfoOpt = jobInfoRepository.findById(jobId); + return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId)); + }); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); - Optional jobInfoDOOptional = getJobInfoRepository().findById(jobId); - if (jobInfoDOOptional.isPresent()) { - JobInfoDO JobInfoDo = jobInfoDOOptional.get(); - instanceId2JobInfo.put(instanceId, JobInfoDo); - }else { - throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId); - } + // 丢弃过期的上报数据 + if (req.getReportTime() <= instanceInfo.getLastReportTime()) { + log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req); + return; } - // 更新本地保存的任务实例状态(用于未完成任务前的详细信息查询和缓存加速) - InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder()); - if (req.getReportTime() > statusHolder.getLastReportTime()) { - BeanUtils.copyProperties(req, statusHolder); - statusHolder.setLastReportTime(req.getReportTime()); - }else { - log.warn("[InstanceManager] receive the expired status report request: {}.", req); + // 丢弃非目标 TaskTracker 的上报数据(脑裂情况) + if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) { + log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress()); return; } InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus()); - Integer timeExpressionType = instanceId2JobInfo.get(instanceId).getTimeExpressionType(); + Integer timeExpressionType = jobInfo.getTimeExpressionType(); + + instanceInfo.setStatus(newStatus.getV()); + instanceInfo.setLastReportTime(req.getReportTime()); + instanceInfo.setGmtModified(new Date()); // FREQUENT 任务没有失败重试机制,TaskTracker一直运行即可,只需要将存活信息同步到DB即可 // FREQUENT 任务的 newStatus 只有2中情况,一种是 RUNNING,一种是 FAILED(表示该机器 overload,需要重新选一台机器执行) // 综上,直接把 status 和 runningNum 同步到DB即可 if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) { - getInstanceInfoRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum(), new Date()); + + instanceInfo.setRunningTimes(req.getTotalTaskNum()); + instanceInfoRepository.saveAndFlush(instanceInfo); return; } - InstanceInfoDO updateEntity = getInstanceInfoRepository().findByInstanceId(instanceId); - updateEntity.setStatus(newStatus.getV()); - updateEntity.setGmtModified(new Date()); - boolean finished = false; if (newStatus == InstanceStatus.SUCCEED) { - updateEntity.setResult(req.getResult()); - updateEntity.setFinishedTime(System.currentTimeMillis()); + instanceInfo.setResult(req.getResult()); + instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; }else if (newStatus == InstanceStatus.FAILED) { // 当前重试次数 <= 最大重试次数,进行重试 (第一次运行,runningTimes为1,重试一次,instanceRetryNum也为1,故需要 =) - if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) { + if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) { - log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes()); + log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes()); // 延迟10S重试(由于重试不改变 instanceId,如果派发到同一台机器,上一个 TaskTracker 还处于资源释放阶段,无法创建新的TaskTracker,任务失败) HashedWheelTimerHolder.TIMER.schedule(() -> { - getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes()); + dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes()); }, 10, TimeUnit.SECONDS); // 修改状态为 等待派发,正式开始重试 // 问题:会丢失以往的调度记录(actualTriggerTime什么的都会被覆盖) - updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); + instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); }else { - updateEntity.setResult(req.getResult()); - updateEntity.setFinishedTime(System.currentTimeMillis()); + instanceInfo.setResult(req.getResult()); + instanceInfo.setFinishedTime(System.currentTimeMillis()); finished = true; - log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId); + log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId); } } // 同步状态变更信息到数据库 - getInstanceInfoRepository().saveAndFlush(updateEntity); + instanceInfoRepository.saveAndFlush(instanceInfo); if (finished) { // 这里的 InstanceStatus 只有 成功/失败 两种,手动停止不会由 TaskTracker 上报 @@ -155,53 +155,53 @@ public class InstanceManager { * @param status 任务状态,有 成功/失败/手动停止 * @param result 执行结果 */ - public static void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { + public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) { log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name()); - // 清除已完成的实例信息 - instanceId2StatusHolder.remove(instanceId); - // 这一步也可能导致后面取不到 JobInfoDO - JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId); - // 上报日志数据 - getInstanceLogService().sync(instanceId); + instanceLogService.sync(instanceId); // workflow 特殊处理 if (wfInstanceId != null) { // 手动停止在工作流中也认为是失败(理论上不应该发生) - getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result); + workflowInstanceManager.move(wfInstanceId, instanceId, status, result); } // 告警 if (status == InstanceStatus.FAILED) { - - if (jobInfo == null) { - jobInfo = fetchJobInfo(instanceId); - } + JobInfoDO jobInfo = fetchJobInfo(instanceId); if (jobInfo == null) { log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId); return; } - InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); JobInstanceAlarmContent content = new JobInstanceAlarmContent(); BeanUtils.copyProperties(jobInfo, content); BeanUtils.copyProperties(instanceInfo, content); List userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds()); - getAlarmService().onJobInstanceFailed(content, userList); + omsCenterAlarmService.onJobInstanceFailed(content, userList); } + + // 过期缓存 + instanceId2JobInfo.invalidate(instanceId); } - public static JobInfoDO fetchJobInfo(Long instanceId) { - JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId); + /** + * 根据任务实例ID查询任务相关信息 + * @param instanceId 任务实例ID + * @return 任务元数据 + */ + public JobInfoDO fetchJobInfo(Long instanceId) { + JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId); if (jobInfo != null) { return jobInfo; } - InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId); + InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId); if (instanceInfo != null) { - return getJobInfoRepository().findById(instanceInfo.getJobId()).orElse(null); + return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null); } return null; } @@ -209,74 +209,7 @@ public class InstanceManager { /** * 释放本地缓存,防止内存泄漏 */ - public static void releaseInstanceInfos() { - instanceId2JobInfo = Maps.newConcurrentMap(); - instanceId2StatusHolder = Maps.newConcurrentMap(); - } - - private static InstanceInfoRepository getInstanceInfoRepository() { - while (instanceInfoRepository == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class); - } - return instanceInfoRepository; - } - - private static JobInfoRepository getJobInfoRepository() { - while (jobInfoRepository == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class); - } - return jobInfoRepository; - } - - private static DispatchService getDispatchService() { - while (dispatchService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - dispatchService = SpringUtils.getBean(DispatchService.class); - } - return dispatchService; - } - - private static InstanceLogService getInstanceLogService() { - while (instanceLogService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - instanceLogService = SpringUtils.getBean(InstanceLogService.class); - } - return instanceLogService; - } - - private static Alarmable getAlarmService() { - while (omsCenterAlarmService == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - omsCenterAlarmService = (Alarmable) SpringUtils.getBean("omsCenterAlarmService"); - } - return omsCenterAlarmService; - } - - private static WorkflowInstanceManager getWorkflowInstanceManager() { - while (workflowInstanceManager == null) { - try { - Thread.sleep(100); - }catch (Exception ignore) { - } - workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class); - } - return workflowInstanceManager; + public static void releaseCache() { + instanceId2JobInfo.cleanUp(); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index 3765c81c..526e4ac6 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -41,6 +41,8 @@ public class InstanceService { @Resource private IdGenerateService idGenerateService; @Resource + private InstanceManager instanceManager; + @Resource private InstanceInfoRepository instanceInfoRepository; /** @@ -67,6 +69,7 @@ public class InstanceService { newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV()); newInstanceInfo.setExpectedTriggerTime(expectTriggerTime); + newInstanceInfo.setLastReportTime(-1L); newInstanceInfo.setGmtCreate(now); newInstanceInfo.setGmtModified(now); @@ -101,7 +104,7 @@ public class InstanceService { instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER); instanceInfoRepository.saveAndFlush(instanceInfo); - InstanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER); + instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER); /* 不可靠通知停止 TaskTracker diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java deleted file mode 100644 index 75bb0803..00000000 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceStatusHolder.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.kfcfans.powerjob.server.service.instance; - -import lombok.Data; - -/** - * 保存任务实例的状态信息 - * - * @author tjq - * @since 2020/4/7 - */ -@Data -public class InstanceStatusHolder { - - private long instanceId; - private int instanceStatus; - private String result; - - /* ********* 统计信息 ********* */ - private long totalTaskNum; - private long succeedTaskNum; - private long failedTaskNum; - - // 任务开始时间 - private long startTime; - // 上次上报时间 - private long lastReportTime; - // 源地址(TaskTracker 地址) - private String sourceAddress; -} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 6d1993eb..e3b9fd88 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -60,7 +60,7 @@ public class CleanService { // 释放本地缓存 WorkerManagerService.releaseContainerInfos(); - InstanceManager.releaseInstanceInfos(); + InstanceManager.releaseCache(); // 删除数据库运行记录 cleanInstanceLog(); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java index 47ca26ff..ad677415 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/InstanceStatusCheckService.java @@ -44,6 +44,8 @@ public class InstanceStatusCheckService { @Resource private DispatchService dispatchService; @Resource + private InstanceManager instanceManager; + @Resource private WorkflowInstanceManager workflowInstanceManager; @Resource @@ -139,7 +141,7 @@ public class InstanceStatusCheckService { } // CRON 和 API一样,失败次数 + 1,根据重试配置进行重试 - if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) { + if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) { dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes()); }else { updateFailedInstance(instance); @@ -190,6 +192,6 @@ public class InstanceStatusCheckService { instance.setResult(SystemInstanceResult.REPORT_TIMEOUT); instanceInfoRepository.saveAndFlush(instance); - InstanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); + instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!"); } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java index d5de14b7..0ebcd68a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/schedule/OmsScheduleService.java @@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.scheduling.annotation.Async; @@ -169,10 +170,8 @@ public class OmsScheduleService { jobInfos.forEach(jobInfoDO -> { try { - CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression()); - Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime()); - Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime); + Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression()); JobInfoDO updatedJobInfo = new JobInfoDO(); BeanUtils.copyProperties(jobInfoDO, updatedJobInfo); @@ -221,8 +220,7 @@ public class OmsScheduleService { // 3. 重新计算下一次调度时间并更新 try { - CronExpression cronExpression = new CronExpression(wfInfo.getTimeExpression()); - Date nextTriggerTime = cronExpression.getNextValidTimeAfter(new Date(wfInfo.getNextTriggerTime())); + Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression()); WorkflowInfoDO updateEntity = new WorkflowInfoDO(); BeanUtils.copyProperties(wfInfo, updateEntity); @@ -267,4 +265,18 @@ public class OmsScheduleService { }); } + /** + * 计算下次触发时间 + * @param preTriggerTime 前一次触发时间 + * @param cronExpression CRON 表达式 + * @return 下一次调度时间 + * @throws Exception 异常 + */ + private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception { + + CronExpression ce = new CronExpression(cronExpression); + // 取最大值,防止长时间未调度任务被连续调度(原来DISABLE的任务突然被打开,不取最大值会补上过去所有的调度) + long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime); + return ce.getNextValidTimeAfter(new Date(benchmarkTime)); + } } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java index a91c7001..d8693f46 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java @@ -36,7 +36,7 @@ public class WebLogAspect { * 第三个*:所有的方法 * 最后的两个点:所有类型的参数 */ - @Pointcut("execution(public * com.github.kfcfans.oms.server.web.controller..*.*(..))") + @Pointcut("execution(public * com.github.kfcfans.powerjob.server.web.controller..*.*(..))") public void include() { } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index d07ab9ad..be9b0491 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -3,14 +3,14 @@ logging.config=classpath:logback-dev.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,可移除 ####### -spring.data.mongodb.uri=mongodb://remotehost:27017/oms-daily +spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily ####### 邮件配置(启用邮件报警则需要) ####### spring.mail.host=smtp.163.com diff --git a/powerjob-server/src/main/resources/application-pre.properties b/powerjob-server/src/main/resources/application-pre.properties index 3a25034c..4cad8276 100644 --- a/powerjob-server/src/main/resources/application-pre.properties +++ b/powerjob-server/src/main/resources/application-pre.properties @@ -3,14 +3,14 @@ logging.config=classpath:logback-product.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,可移除 ####### -spring.data.mongodb.uri=mongodb://remotehost:27017/oms-pre +spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre ####### 邮件配置(启用邮件报警则需要) ####### spring.mail.host=smtp.qq.com diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index d6987df5..ed1e8d00 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -3,14 +3,14 @@ logging.config=classpath:logback-product.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 spring.datasource.core.hikari.minimum-idle=5 ####### mongoDB配置,非核心依赖,可移除 ####### -spring.data.mongodb.uri=mongodb://localhost:27017/oms-product +spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product ####### 邮件配置(启用邮件报警则需要) ####### spring.mail.host=smtp.qq.com diff --git a/powerjob-server/src/main/resources/banner.txt b/powerjob-server/src/main/resources/banner.txt index e0e1d9e7..dffacc32 100644 --- a/powerjob-server/src/main/resources/banner.txt +++ b/powerjob-server/src/main/resources/banner.txt @@ -9,5 +9,5 @@ ${AnsiColor.GREEN} ░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░ ${AnsiColor.BRIGHT_RED} * Maintainer: tengjiqi@gmail.com -* SourceCode: https://github.com/KFCFans/OhMyScheduler +* SourceCode: https://github.com/KFCFans/PowerJob * PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4) diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index b7a8806f..cdc8df20 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.0.0 + 3.0.1 jar - 3.0.0 + 3.0.1 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index 270cea43..700cd387 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.0.0 + 3.0.1 2.2.6.RELEASE - 3.0.0 + 3.0.1 1.2.68 diff --git a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java index 10bc9c26..7c7b410b 100644 --- a/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java +++ b/powerjob-worker-samples/src/main/java/com/github/kfcfans/powerjob/samples/OhMySchedulerConfig.java @@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy; import com.google.common.collect.Lists; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -17,6 +18,10 @@ import java.util.List; */ @Configuration public class OhMySchedulerConfig { + + @Value("${powerjob.akka.port}") + private int port; + @Bean public OhMyWorker initOMS() throws Exception { @@ -25,7 +30,7 @@ public class OhMySchedulerConfig { // 1. 创建配置文件 OhMyConfig config = new OhMyConfig(); - config.setPort(27777); + config.setPort(port); config.setAppName("oms-test"); config.setServerAddress(serverAddress); // 如果没有大型 Map/MapReduce 的需求,建议使用内存来加速计算 diff --git a/powerjob-worker-samples/src/main/resources/application.properties b/powerjob-worker-samples/src/main/resources/application.properties index 8e87e783..7d804f17 100644 --- a/powerjob-worker-samples/src/main/resources/application.properties +++ b/powerjob-worker-samples/src/main/resources/application.properties @@ -1,3 +1,5 @@ server.port=8081 -spring.jpa.open-in-view=false \ No newline at end of file +spring.jpa.open-in-view=false + +powerjob.akka.port=27777 \ No newline at end of file diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 58a21d0f..5c8ab6af 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.0.0 + 3.0.1 jar 5.2.4.RELEASE - 3.0.0 + 3.0.1 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java index 8d4a1796..3df267c7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/ProcessorTrackerActor.java @@ -7,6 +7,9 @@ import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.CollectionUtils; + +import java.util.List; /** * 普通计算节点,处理来自 TaskTracker 的请求 @@ -28,13 +31,14 @@ public class ProcessorTrackerActor extends AbstractActor { /** * 处理来自TaskTracker的task执行请求 + * @param req 请求 */ private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) { Long instanceId = req.getInstanceInfo().getInstanceId(); - // 创建 ProcessorTracker 一定能成功,且每个任务实例只会创建一个 ProcessorTracker - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req)); + // 创建 ProcessorTracker 一定能成功 + ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req)); TaskDO task = new TaskDO(); @@ -47,14 +51,18 @@ public class ProcessorTrackerActor extends AbstractActor { processorTracker.submitTask(task); } + /** + * 处理来自TaskTracker停止任务的请求 + * @param req 请求 + */ private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) { Long instanceId = req.getInstanceId(); - ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId); - if (processorTracker == null) { + List removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId); + if (CollectionUtils.isEmpty(removedPts)) { log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId); }else { - processorTracker.destroy(); + removedPts.forEach(ProcessorTracker::destroy); } } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java index ec3d296d..8ae65fcc 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TaskTrackerActor.java @@ -1,19 +1,21 @@ package com.github.kfcfans.powerjob.worker.actors; import akka.actor.AbstractActor; +import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; +import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; -import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.google.common.collect.Lists; +import javafx.concurrent.Task; import lombok.extern.slf4j.Slf4j; import java.util.List; @@ -34,7 +36,6 @@ public class TaskTrackerActor extends AbstractActor { .match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq) .match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest) .match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq) - .match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq) .match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq) .match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq) .matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj)) @@ -47,13 +48,26 @@ public class TaskTrackerActor extends AbstractActor { */ private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) { + int taskStatus = req.getStatus(); TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); + + // 结束状态需要回复接受成功 + if (TaskStatus.finishedStatus.contains(taskStatus)) { + AskResponse askResponse = AskResponse.succeed(null); + getSender().tell(askResponse, getSelf()); + } + // 手动停止 TaskTracker 的情况下会出现这种情况 if (taskTracker == null) { log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req); - } else { - taskTracker.updateTaskStatus(req.getTaskId(), req.getStatus(), req.getReportTime(), req.getResult()); + return; } + + if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) { + taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult()); + } + + taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult()); } /** @@ -94,19 +108,6 @@ public class TaskTrackerActor extends AbstractActor { getSender().tell(response, getSelf()); } - /** - * 广播任务前置任务执行完毕 处理器 - */ - private void onReceiveBroadcastTaskPreExecuteFinishedReq(BroadcastTaskPreExecuteFinishedReq req) { - - TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId()); - if (taskTracker == null) { - log.warn("[TaskTrackerActor] receive BroadcastTaskPreExecuteFinishedReq({}) but system can't find TaskTracker.", req); - return; - } - taskTracker.broadcast(req.isSuccess(), req.getSubInstanceId(), req.getTaskId(), req.getReportTime(), req.getMsg()); - } - /** * 服务器任务调度处理器 */ diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java index 2dada51d..1ec13f93 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/constants/TaskStatus.java @@ -1,8 +1,11 @@ package com.github.kfcfans.powerjob.worker.common.constants; +import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Getter; +import java.util.List; + /** * 任务状态,task_info 表中 status 字段的枚举值 * @@ -20,6 +23,8 @@ public enum TaskStatus { WORKER_PROCESS_FAILED(5, "worker执行失败"), WORKER_PROCESS_SUCCESS(6, "worker执行成功"); + public static final List finishedStatus = Lists.newArrayList(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value); + private int value; private String des; diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java index a0c168fb..42114b90 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/common/utils/AkkaUtils.java @@ -1,15 +1,24 @@ package com.github.kfcfans.powerjob.worker.common.utils; +import akka.actor.ActorSelection; +import akka.pattern.Patterns; +import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.common.RemoteConstant; +import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + /** * AKKA 工具类 * * @author tjq * @since 2020/3/17 */ +@Slf4j public class AkkaUtils { /** @@ -28,4 +37,21 @@ public class AkkaUtils { return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); } + /** + * 可靠传输 + * @param remote 远程 AKKA 节点 + * @param msg 需要传输的对象 + * @return true: 对方接收成功 / false: 对方接收失败(可能传输成功但对方处理失败,需要协同处理 AskResponse 返回值) + */ + public static boolean reliableTransmit(ActorSelection remote, Object msg) { + try { + CompletionStage ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); + AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); + return response.isSuccess(); + }catch (Exception e) { + log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString()); + } + return false; + } + } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java index 9d2f4ce1..f1eb08f9 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/container/OmsJarContainer.java @@ -133,7 +133,7 @@ public class OmsJarContainer implements OmsContainer { Thread.currentThread().setContextClassLoader(oldCL); } - log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", containerId, localJarFile.getPath()); + log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath()); } @Override diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java index 45eee519..4000dd47 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/executor/ProcessorRunnable.java @@ -6,13 +6,13 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; +import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo; -import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; @@ -26,6 +26,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; import java.util.List; +import java.util.Queue; /** * Processor 执行器 @@ -45,6 +46,8 @@ public class ProcessorRunnable implements Runnable { private final OmsLogger omsLogger; // 类加载器 private final ClassLoader classLoader; + // 重试队列,ProcessorTracker 将会定期重新上报处理结果 + private final Queue statusReportRetryQueue; public void innerRun() throws InterruptedException { @@ -68,40 +71,31 @@ public class ProcessorRunnable implements Runnable { taskContext.setUserContext(OhMyWorker.getConfig().getUserContext()); ThreadLocalStore.setTask(task); - reportStatus(TaskStatus.WORKER_PROCESSING, null); + reportStatus(TaskStatus.WORKER_PROCESSING, null, null); // 1. 根任务特殊处理 + ProcessResult processResult; ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) { // 广播执行:先选本机执行 preProcess,完成后TaskTracker再为所有Worker生成子Task if (executeType == ExecuteType.BROADCAST) { - BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq(); - spReq.setTaskId(taskId); - spReq.setInstanceId(instanceId); - spReq.setSubInstanceId(task.getSubInstanceId()); - if (processor instanceof BroadcastProcessor) { BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; try { - ProcessResult processResult = broadcastProcessor.preProcess(taskContext); - spReq.setSuccess(processResult.isSuccess()); - spReq.setMsg(suit(processResult.getMsg())); - }catch (Exception e) { + processResult = broadcastProcessor.preProcess(taskContext); + }catch (Throwable e) { log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e); - spReq.setSuccess(false); - spReq.setMsg(e.toString()); + processResult = new ProcessResult(false, e.toString()); } }else { - spReq.setSuccess(true); - spReq.setMsg("NO_PREPOST_TASK"); + processResult = new ProcessResult(true, "NO_PREPOST_TASK"); } - spReq.setReportTime(System.currentTimeMillis()); - taskTrackerActor.tell(spReq, null); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST); // 广播执行的第一个 task 只执行 preProcess 部分 return; } @@ -113,7 +107,6 @@ public class ProcessorRunnable implements Runnable { Stopwatch stopwatch = Stopwatch.createStarted(); log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); - ProcessResult lastResult; List taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); try { switch (executeType) { @@ -121,30 +114,30 @@ public class ProcessorRunnable implements Runnable { if (processor instanceof BroadcastProcessor) { BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor; - lastResult = broadcastProcessor.postProcess(taskContext, taskResults); + processResult = broadcastProcessor.postProcess(taskContext, taskResults); }else { - lastResult = BroadcastProcessor.defaultResult(taskResults); + processResult = BroadcastProcessor.defaultResult(taskResults); } break; case MAP_REDUCE: if (processor instanceof MapReduceProcessor) { MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor; - lastResult = mapReduceProcessor.reduce(taskContext, taskResults); + processResult = mapReduceProcessor.reduce(taskContext, taskResults); }else { - lastResult = new ProcessResult(false, "not implement the MapReduceProcessor"); + processResult = new ProcessResult(false, "not implement the MapReduceProcessor"); } break; default: - lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); + processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG"); } - }catch (Exception e) { - lastResult = new ProcessResult(false, e.toString()); + }catch (Throwable e) { + processResult = new ProcessResult(false, e.toString()); log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e); } - TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; - reportStatus(status, suit(lastResult.getMsg())); + TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED; + reportStatus(status, suit(processResult.getMsg()), null); log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch); return; @@ -152,29 +145,43 @@ public class ProcessorRunnable implements Runnable { // 3. 正式提交运行 - ProcessResult processResult; try { processResult = processor.process(taskContext); - }catch (Exception e) { + }catch (Throwable e) { log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e); processResult = new ProcessResult(false, e.toString()); } - reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg())); + reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null); } /** * 上报状态给 TaskTracker + * @param status Task状态 + * @param result 执行结果,只有结束时才存在 + * @param cmd 特殊需求,比如广播执行需要创建广播任务 */ - private void reportStatus(TaskStatus status, String result) { + private void reportStatus(TaskStatus status, String result, Integer cmd) { ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq(); req.setInstanceId(task.getInstanceId()); + req.setSubInstanceId(task.getSubInstanceId()); req.setTaskId(task.getTaskId()); req.setStatus(status.getValue()); req.setResult(result); req.setReportTime(System.currentTimeMillis()); + req.setCmd(cmd); - taskTrackerActor.tell(req, null); + // 最终结束状态要求可靠发送 + if (TaskStatus.finishedStatus.contains(status.getValue())) { + boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req); + if (!success) { + // 插入重试队列,等待重试 + statusReportRetryQueue.add(req); + log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", task.getInstanceId(), task.getTaskId(), status, result); + } + }else { + taskTrackerActor.tell(req, null); + } } @Override @@ -184,8 +191,9 @@ public class ProcessorRunnable implements Runnable { try { innerRun(); }catch (InterruptedException ignore) { - }catch (Exception e) { - log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e); + }catch (Throwable e) { + reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null); + log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e); }finally { ThreadLocalStore.clear(); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java index a7a84f69..cbe0ff58 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -1,9 +1,6 @@ package com.github.kfcfans.powerjob.worker.core.processor.sdk; -import akka.actor.ActorSelection; -import akka.pattern.Patterns; import com.github.kfcfans.powerjob.common.RemoteConstant; -import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; @@ -14,10 +11,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; -import java.time.Duration; import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; /** * Map 处理器,允许开发者自定义拆分任务进行分布式执行 @@ -29,7 +23,6 @@ import java.util.concurrent.TimeUnit; public abstract class MapProcessor implements BasicProcessor { private static final int RECOMMEND_BATCH_SIZE = 200; - private static final int REQUEST_TIMEOUT_MS = 5000; /** * 分发子任务 @@ -53,20 +46,13 @@ public abstract class MapProcessor implements BasicProcessor { ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - boolean requestSucceed = false; - try { - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); - ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); - CompletionStage requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS)); - AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - requestSucceed = respObj.isSuccess(); - }catch (Exception e) { - log.warn("[MapProcessor] map failed, exception is {}.", e.toString()); - } + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req); if (requestSucceed) { return new ProcessResult(true, "MAP_SUCCESS"); }else { + log.warn("[MapProcessor] map failed for {}", taskName); return new ProcessResult(false, "MAP_FAILED"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 0acfd455..d0df40bf 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -21,11 +21,13 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.util.CollectionUtils; import java.util.List; +import java.util.Queue; import java.util.concurrent.*; /** @@ -50,6 +52,8 @@ public class ProcessorTracker { private OmsContainer omsContainer; // 在线日志 private OmsLogger omsLogger; + // ProcessResult 上报失败的重试队列 + private Queue statusReportRetryQueue; private String taskTrackerAddress; private ActorSelection taskTrackerActorRef; @@ -77,6 +81,7 @@ public class ProcessorTracker { this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.omsLogger = new OmsServerLogger(instanceId); + this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); // 初始化 线程池,TimingPool 启动的任务会检查 ThreadPool,所以必须先初始化线程池,否则NPE initThreadPool(); @@ -86,7 +91,7 @@ public class ProcessorTracker { initProcessor(); log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId); - }catch (Exception e) { + }catch (Throwable e) { log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e); lethal = true; lethalReason = e.toString(); @@ -108,7 +113,7 @@ public class ProcessorTracker { // 一旦 ProcessorTracker 出现异常,所有提交到此处的任务直接返回失败,防止形成死锁 // 死锁分析:TT创建PT,PT创建失败,无法定期汇报心跳,TT长时间未收到PT心跳,认为PT宕机(确实宕机了),无法选择可用的PT再次派发任务,死锁形成,GG斯密达 T_T if (lethal) { - ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis()); + ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null); taskTrackerActorRef.tell(report, null); return; } @@ -119,7 +124,7 @@ public class ProcessorTracker { newTask.setAddress(taskTrackerAddress); ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader(); - ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader); + ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue); try { threadPool.submit(processorRunnable); success = true; @@ -134,6 +139,7 @@ public class ProcessorTracker { if (success) { ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq(); reportReq.setInstanceId(instanceId); + reportReq.setSubInstanceId(newTask.getSubInstanceId()); reportReq.setTaskId(newTask.getTaskId()); reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue()); reportReq.setReportTime(System.currentTimeMillis()); @@ -165,6 +171,7 @@ public class ProcessorTracker { // 2. 去除顶层引用,送入GC世界 taskTrackerActorRef = null; + statusReportRetryQueue.clear(); ProcessorTrackerPool.removeProcessorTracker(instanceId); log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId); @@ -224,6 +231,19 @@ public class ProcessorTracker { } } + // 上报状态之前,先重新发送失败的任务,只要有结果堆积,就不上报状态(让 PT 认为该 TT 失联然后重试相关任务) + while (!statusReportRetryQueue.isEmpty()) { + ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll(); + if (req != null) { + req.setReportTime(System.currentTimeMillis()); + if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) { + statusReportRetryQueue.add(req); + return; + } + } + } + + // 上报当前 ProcessorTracker 负载 long waitingNum = threadPool.getQueue().size(); ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum); taskTrackerActorRef.tell(req, null); @@ -247,7 +267,7 @@ public class ProcessorTracker { try { processor = SpringUtils.getBean(processorInfo); }catch (Exception e) { - log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString()); + log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString()); } } // 反射加载 @@ -263,18 +283,20 @@ public class ProcessorTracker { break; case JAVA_CONTAINER: String[] split = processorInfo.split("#"); + log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]); + omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0])); if (omsContainer != null) { processor = omsContainer.getProcessor(split[1]); } break; default: - log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType); + log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType); throw new OmsException("unknown processor type of " + processorType); } if (processor == null) { - log.warn("[ProcessorRunnable-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); + log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo); throw new OmsException("fetch Processor failed"); } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java index 9c2bdc0d..e6ffe539 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTrackerPool.java @@ -1,9 +1,13 @@ package com.github.kfcfans.powerjob.worker.core.tracker.processor; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Supplier; /** * 持有 Processor 对象 @@ -14,23 +18,36 @@ import java.util.function.Function; */ public class ProcessorTrackerPool { - private static final Map instanceId2ProcessorTracker = Maps.newConcurrentMap(); + // instanceId -> (TaskTrackerAddress -> ProcessorTracker) + // 处理脑裂情况下同一个 Instance 存在多个 TaskTracker 的情况 + private static final Map> processorTrackerPool = Maps.newHashMap(); /** * 获取 ProcessorTracker,如果不存在则创建 */ - public static ProcessorTracker getProcessorTracker(Long instanceId, Function creator) { - return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator); + public static ProcessorTracker getProcessorTracker(Long instanceId, String address, Supplier creator) { + + ProcessorTracker processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address); + if (processorTracker == null) { + synchronized (ProcessorTrackerPool.class) { + processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address); + if (processorTracker == null) { + processorTracker = creator.get(); + processorTrackerPool.computeIfAbsent(instanceId, ignore -> Maps.newHashMap()).put(address, processorTracker); + } + } + } + return processorTracker; } - /** - * 获取 ProcessorTracker - */ - public static ProcessorTracker getProcessorTracker(Long instanceId) { - return instanceId2ProcessorTracker.get(instanceId); - } + public static List removeProcessorTracker(Long instanceId) { - public static void removeProcessorTracker(Long instanceId) { - instanceId2ProcessorTracker.remove(instanceId); + List res = Lists.newLinkedList(); + Map ttAddress2Pt = processorTrackerPool.remove(instanceId); + if (ttAddress2Pt != null) { + res.addAll(ttAddress2Pt.values()); + ttAddress2Pt.clear(); + } + return res; } } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 530b421d..848bd9e4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -47,7 +47,9 @@ public class CommonTaskTracker extends TaskTracker { @Override protected void initTaskTracker(ServerScheduleJobReq req) { - ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); + // CommonTaskTrackerTimingPool 缩写 + String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d"; + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build(); this.scheduledPool = Executors.newScheduledThreadPool(2, factory); // 持久化根任务 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index 88e97861..2d60b3ce 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -82,7 +82,8 @@ public class FrequentTaskTracker extends TaskTracker { subInstanceId2TimeHolder = Maps.newConcurrentMap(); // 1. 初始化定时调度线程池 - ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build(); + String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d"; + ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build(); this.scheduledPool = Executors.newScheduledThreadPool(3, factory); // 2. 启动任务发射器 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java index f57e5811..cfb8b907 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/TaskTracker.java @@ -2,10 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task; import akka.actor.ActorSelection; import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.InstanceStatus; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; +import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.worker.OhMyWorker; @@ -97,7 +99,7 @@ public abstract class TaskTracker { // 子类自定义初始化操作 initTaskTracker(req); - log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", instanceId, req); + log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId); } /** @@ -106,12 +108,30 @@ public abstract class TaskTracker { * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker */ public static TaskTracker create(ServerScheduleJobReq req) { - TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); - switch (timeExpressionType) { - case FIX_RATE: - case FIX_DELAY:return new FrequentTaskTracker(req); - default:return new CommonTaskTracker(req); + try { + TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); + switch (timeExpressionType) { + case FIX_RATE: + case FIX_DELAY:return new FrequentTaskTracker(req); + default:return new CommonTaskTracker(req); + } + }catch (Exception e) { + log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e); + + // 直接发送失败请求 + TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq(); + BeanUtils.copyProperties(req, response); + response.setInstanceStatus(InstanceStatus.FAILED.getV()); + response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString())); + response.setReportTime(System.currentTimeMillis()); + response.setStartTime(System.currentTimeMillis()); + response.setSourceAddress(OhMyWorker.getWorkerAddress()); + + String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); + ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); + serverActor.tell(response, null); } + return null; } /* *************************** 对外方法区 *************************** */ @@ -210,6 +230,8 @@ public abstract class TaskTracker { } } catch (InterruptedException ignore) { + } catch (Exception e) { + log.warn("[TaskTracker-{}] update task status failed.", instanceId, e); } finally { segmentLock.unlock(lockId); } @@ -254,10 +276,9 @@ public abstract class TaskTracker { * @param preExecuteSuccess 预执行广播任务运行状态 * @param subInstanceId 子实例ID * @param preTaskId 预执行广播任务的taskId - * @param reportTime 上报时间 * @param result 预执行广播任务的结果 */ - public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) { + public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) { if (finished.get()) { return; @@ -265,7 +286,7 @@ public abstract class TaskTracker { log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId); - // 1. 生成集群子任务 + // 生成集群子任务 if (preExecuteSuccess) { List allWorkerAddress = ptStatusHolder.getAllProcessorTrackers(); List subTaskList = Lists.newLinkedList(); @@ -280,10 +301,6 @@ public abstract class TaskTracker { }else { log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, result); } - - // 2. 更新根任务状态(广播任务的根任务为 preProcess 任务) - int status = preExecuteSuccess ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue(); - updateTaskStatus(preTaskId, status, reportTime, result); } /** diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java deleted file mode 100644 index c0f671b8..00000000 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/BroadcastTaskPreExecuteFinishedReq.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.github.kfcfans.powerjob.worker.pojo.request; - -import com.github.kfcfans.powerjob.common.OmsSerializable; -import lombok.Data; - - -/** - * 广播任务 preExecute 结束信息 - * - * @author tjq - * @since 2020/3/23 - */ -@Data -public class BroadcastTaskPreExecuteFinishedReq implements OmsSerializable { - - private Long instanceId; - private Long subInstanceId; - private String taskId; - - private boolean success; - private String msg; - - // 上报时间 - private long reportTime; -} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java index c936f644..9d33b8b4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/pojo/request/ProcessorReportTaskStatusReq.java @@ -17,7 +17,10 @@ import lombok.NoArgsConstructor; @AllArgsConstructor public class ProcessorReportTaskStatusReq implements OmsSerializable { + public static final Integer BROADCAST = 1; + private Long instanceId; + private Long subInstanceId; private String taskId; private int status; @@ -29,4 +32,6 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable { // 上报时间 private long reportTime; + // 特殊请求名称 + private Integer cmd; }