[merge] Merge branch 'v3.0.1'

This commit is contained in:
tjq 2020-06-16 22:19:07 +08:00
commit de70c9467c
38 changed files with 428 additions and 353 deletions

View File

@ -10,12 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-client</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.common.version>3.0.0</powerjob.common.version>
<junit.version>5.6.1</junit.version>
<powerjob.common.version>3.0.1</powerjob.common.version>
</properties>
<dependencies>
@ -25,13 +24,6 @@
<artifactId>powerjob-common</artifactId>
<version>${powerjob.common.version}</version>
</dependency>
<!-- Junit 测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -10,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-common</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
@ -20,6 +20,7 @@
<guava.version>29.0-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
<akka.version>2.6.4</akka.version>
<junit.version>5.6.1</junit.version>
</properties>
<dependencies>
@ -68,13 +69,20 @@
<version>${akka.version}</version>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
<!-- Junit 测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,61 @@
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Tests for the segmented lock (SegmentLock).
 *
 * NOTE(review): these tests have no assertions — correctness is checked by
 * reading the console output ordering and by the fixed Thread.sleep waits,
 * so they rely on manual inspection rather than automated verification.
 *
 * @author tjq
 * @since 2020/6/15
 */
public class SegmentLockTest {
// Two tasks contend for the same lock id (10086) on a 4-segment lock.
// The first holder never unlocks, so "after lock AA" is expected to never
// print within the 10s observation window — presumably demonstrating
// mutual exclusion within one segment (confirm by console output).
@Test
public void testLock() throws Exception {
int lockId = 10086;
SegmentLock lock = new SegmentLock(4);
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
System.out.println("before lock A");
lock.lockInterruptibleSafe(lockId);
System.out.println("after lock A");
});
pool.execute(() -> {
System.out.println("before lock AA");
lock.lockInterruptibleSafe(lockId);
System.out.println("after lock AA");
});
// Observation window only; the pool is never shut down.
Thread.sleep(10000);
}
// Same contention scenario, but the first holder releases the lock after
// 5 seconds — "after lock AA" is expected to print once unlock(lockId)
// runs, within the 10s observation window.
@Test
public void testUnLock() throws Exception {
int lockId = 10086;
SegmentLock lock = new SegmentLock(4);
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
System.out.println("before lock A");
lock.lockInterruptibleSafe(lockId);
System.out.println("after lock A");
try {
// Hold the lock for 5s before releasing it to the waiting task.
Thread.sleep(5000);
}catch (Exception ignore) {
}
lock.unlock(lockId);
});
pool.execute(() -> {
System.out.println("before lock AA");
lock.lockInterruptibleSafe(lockId);
System.out.println("after lock AA");
});
// Observation window only; the pool is never shut down.
Thread.sleep(10000);
}
}

View File

@ -10,13 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-server</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.common.version>3.0.0</powerjob.common.version>
<powerjob.common.version>3.0.1</powerjob.common.version>
<mysql.version>8.0.19</mysql.version>
<h2.db.version>1.4.200</h2.db.version>
<zip4j.version>2.5.2</zip4j.version>

View File

@ -31,6 +31,8 @@ import java.util.Optional;
@Slf4j
public class ServerActor extends AbstractActor {
private InstanceManager instanceManager;
@Override
public Receive createReceive() {
return receiveBuilder()
@ -57,7 +59,7 @@ public class ServerActor extends AbstractActor {
*/
private void onReceiveTaskTrackerReportInstanceStatusReq(TaskTrackerReportInstanceStatusReq req) {
try {
InstanceManager.updateStatus(req);
getInstanceManager().updateStatus(req);
// 结束状态成功/失败需要回复消息
if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) {
@ -105,4 +107,12 @@ public class ServerActor extends AbstractActor {
getSender().tell(askResponse, getSelf());
}
// Lazily fetch the InstanceManager bean from the Spring context.
// No locking needed: fetching the same bean from the Spring IOC container
// repeatedly is harmless (worst case, concurrent callers each resolve the
// same singleton and the field is written more than once with the same value).
private InstanceManager getInstanceManager() {
if (instanceManager == null) {
instanceManager = SpringUtils.getBean(InstanceManager.class);
}
return instanceManager;
}
}

View File

@ -54,6 +54,8 @@ public class InstanceInfoDO {
private Long actualTriggerTime;
// 结束时间
private Long finishedTime;
// 最后上报时间
private Long lastReportTime;
// TaskTracker地址
private String taskTrackerAddress;

View File

@ -36,6 +36,8 @@ import static com.github.kfcfans.powerjob.common.InstanceStatus.*;
@Service
public class DispatchService {
@Resource
private InstanceManager instanceManager;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@ -71,7 +73,7 @@ public class DispatchService {
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now);
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result);
return;
}
@ -96,7 +98,7 @@ public class DispatchService {
log.warn("[Dispatcher-{}|{}] cancel dispatch job due to no worker available, clusterStatus is {}.", jobId, instanceId, clusterStatusDescription);
instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE, dbInstanceParams, now);
InstanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, SystemInstanceResult.NO_WORKER_AVAILABLE);
return;
}
@ -107,9 +109,6 @@ public class DispatchService {
}
}
// 注册到任务实例管理中心
InstanceManager.register(instanceId, jobInfo);
// 构造请求
ServerScheduleJobReq req = new ServerScheduleJobReq();
BeanUtils.copyProperties(jobInfo, req);

View File

@ -45,6 +45,8 @@ import java.util.stream.Stream;
@Service
public class InstanceLogService {
@Resource
private InstanceManager instanceManager;
@Resource
private GridFsManager gridFsManager;
// 本地数据库操作bean
@ -195,8 +197,8 @@ public class InstanceLogService {
}
// 删除本地数据库数据
try {
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
instanceId2LastReportTime.remove(instanceId);
CommonUtils.executeWithRetry0(() -> localInstanceLogRepository.deleteByInstanceId(instanceId));
}catch (Exception e) {
log.warn("[InstanceLog-{}] delete local instanceLog failed.", instanceId, e);
}
@ -315,10 +317,10 @@ public class InstanceLogService {
@Scheduled(fixedDelay = 60000)
public void timingCheck() {
// 1. 定时删除秒级任务的日志
// 定时删除秒级任务的日志
List<Long> frequentInstanceIds = Lists.newLinkedList();
instanceId2LastReportTime.keySet().forEach(instanceId -> {
JobInfoDO jobInfo = InstanceManager.fetchJobInfo(instanceId);
JobInfoDO jobInfo = instanceManager.fetchJobInfo(instanceId);
if (jobInfo == null) {
return;
}
@ -340,7 +342,7 @@ public class InstanceLogService {
});
}
// 2. 删除长时间未 REPORT 的日志
// 删除长时间未 REPORT 的日志必要性考证中......
}

View File

@ -16,13 +16,15 @@ import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
import com.github.kfcfans.powerjob.server.service.alarm.JobInstanceAlarmContent;
import com.github.kfcfans.powerjob.server.service.timing.schedule.HashedWheelTimerHolder;
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowInstanceManager;
import com.google.common.collect.Maps;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -33,114 +35,112 @@ import java.util.concurrent.TimeUnit;
* @since 2020/4/7
*/
@Slf4j
@Service
public class InstanceManager {
// 存储 instanceId 对应的 Job 信息便于重试
private static Map<Long, JobInfoDO> instanceId2JobInfo = Maps.newConcurrentMap();
// 存储 instance 的状态暂时只用到了 lastReportTime
private static Map<Long, InstanceStatusHolder> instanceId2StatusHolder = Maps.newConcurrentMap();
private static Cache<Long, JobInfoDO> instanceId2JobInfo;
// Spring Bean
private static DispatchService dispatchService;
private static InstanceLogService instanceLogService;
private static InstanceInfoRepository instanceInfoRepository;
private static JobInfoRepository jobInfoRepository;
private static Alarmable omsCenterAlarmService;
private static WorkflowInstanceManager workflowInstanceManager;
@Resource
private DispatchService dispatchService;
@Resource
private InstanceLogService instanceLogService;
@Resource(name = "omsCenterAlarmService")
private Alarmable omsCenterAlarmService;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
/**
* 注册到任务实例管理器
* @param instanceId 即将运行的任务实例ID
* @param jobInfoDO 即将运行的任务实例对应的任务元数据
*/
public static void register(Long instanceId, JobInfoDO jobInfoDO) {
private static final int CACHE_CONCURRENCY_LEVEL = 8;
private static final int CACHE_MAX_SIZE = 4096;
InstanceStatusHolder statusHolder = new InstanceStatusHolder();
statusHolder.setInstanceId(instanceId);
statusHolder.setInstanceStatus(InstanceStatus.WAITING_DISPATCH.getV());
instanceId2JobInfo.put(instanceId, jobInfoDO);
instanceId2StatusHolder.put(instanceId, statusHolder);
static {
instanceId2JobInfo = CacheBuilder.newBuilder()
.concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
.maximumSize(CACHE_MAX_SIZE)
.build();
}
/**
* 更新任务状态
* @param req TaskTracker上报任务实例状态的请求
*/
public static void updateStatus(TaskTrackerReportInstanceStatusReq req) {
public void updateStatus(TaskTrackerReportInstanceStatusReq req) throws Exception {
Long jobId = req.getJobId();
Long instanceId = req.getInstanceId();
// 不存在可能该任务实例刚经历Server变更需要重新构建基础信息
if (!instanceId2JobInfo.containsKey(instanceId)) {
log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId);
// 获取相关数据
JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId, () -> {
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
return jobInfoOpt.orElseThrow(() -> new IllegalArgumentException("can't find JobIno by jobId: " + jobId));
});
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
Optional<JobInfoDO> jobInfoDOOptional = getJobInfoRepository().findById(jobId);
if (jobInfoDOOptional.isPresent()) {
JobInfoDO JobInfoDo = jobInfoDOOptional.get();
instanceId2JobInfo.put(instanceId, JobInfoDo);
}else {
throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId);
}
// 丢弃过期的上报数据
if (req.getReportTime() <= instanceInfo.getLastReportTime()) {
log.warn("[InstanceManager-{}] receive the expired status report request: {}, this report will br dropped.", instanceId, req);
return;
}
// 更新本地保存的任务实例状态用于未完成任务前的详细信息查询和缓存加速
InstanceStatusHolder statusHolder = instanceId2StatusHolder.computeIfAbsent(instanceId, ignore -> new InstanceStatusHolder());
if (req.getReportTime() > statusHolder.getLastReportTime()) {
BeanUtils.copyProperties(req, statusHolder);
statusHolder.setLastReportTime(req.getReportTime());
}else {
log.warn("[InstanceManager] receive the expired status report request: {}.", req);
// 丢弃非目标 TaskTracker 的上报数据脑裂情况
if (!req.getSourceAddress().equals(instanceInfo.getTaskTrackerAddress())) {
log.warn("[InstanceManager-{}] receive the other TaskTracker's report: {}, but current TaskTracker is {}, this report will br dropped.", instanceId, req, instanceInfo.getTaskTrackerAddress());
return;
}
InstanceStatus newStatus = InstanceStatus.of(req.getInstanceStatus());
Integer timeExpressionType = instanceId2JobInfo.get(instanceId).getTimeExpressionType();
Integer timeExpressionType = jobInfo.getTimeExpressionType();
instanceInfo.setStatus(newStatus.getV());
instanceInfo.setLastReportTime(req.getReportTime());
instanceInfo.setGmtModified(new Date());
// FREQUENT 任务没有失败重试机制TaskTracker一直运行即可只需要将存活信息同步到DB即可
// FREQUENT 任务的 newStatus 只有2中情况一种是 RUNNING一种是 FAILED表示该机器 overload需要重新选一台机器执行
// 综上直接把 status runningNum 同步到DB即可
if (TimeExpressionType.frequentTypes.contains(timeExpressionType)) {
getInstanceInfoRepository().update4FrequentJob(instanceId, newStatus.getV(), req.getTotalTaskNum(), new Date());
instanceInfo.setRunningTimes(req.getTotalTaskNum());
instanceInfoRepository.saveAndFlush(instanceInfo);
return;
}
InstanceInfoDO updateEntity = getInstanceInfoRepository().findByInstanceId(instanceId);
updateEntity.setStatus(newStatus.getV());
updateEntity.setGmtModified(new Date());
boolean finished = false;
if (newStatus == InstanceStatus.SUCCEED) {
updateEntity.setResult(req.getResult());
updateEntity.setFinishedTime(System.currentTimeMillis());
instanceInfo.setResult(req.getResult());
instanceInfo.setFinishedTime(System.currentTimeMillis());
finished = true;
}else if (newStatus == InstanceStatus.FAILED) {
// 当前重试次数 <= 最大重试次数进行重试 第一次运行runningTimes为1重试一次instanceRetryNum也为1故需要 =
if (updateEntity.getRunningTimes() <= instanceId2JobInfo.get(instanceId).getInstanceRetryNum()) {
if (instanceInfo.getRunningTimes() <= jobInfo.getInstanceRetryNum()) {
log.info("[InstanceManager] instance(instanceId={}) execute failed but will take the {}th retry.", instanceId, updateEntity.getRunningTimes());
log.info("[InstanceManager-{}] instance execute failed but will take the {}th retry.", instanceId, instanceInfo.getRunningTimes());
// 延迟10S重试由于重试不改变 instanceId如果派发到同一台机器上一个 TaskTracker 还处于资源释放阶段无法创建新的TaskTracker任务失败
HashedWheelTimerHolder.TIMER.schedule(() -> {
getDispatchService().redispatch(instanceId2JobInfo.get(instanceId), instanceId, updateEntity.getRunningTimes());
dispatchService.redispatch(jobInfo, instanceId, instanceInfo.getRunningTimes());
}, 10, TimeUnit.SECONDS);
// 修改状态为 等待派发正式开始重试
// 问题会丢失以往的调度记录actualTriggerTime什么的都会被覆盖
updateEntity.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
}else {
updateEntity.setResult(req.getResult());
updateEntity.setFinishedTime(System.currentTimeMillis());
instanceInfo.setResult(req.getResult());
instanceInfo.setFinishedTime(System.currentTimeMillis());
finished = true;
log.info("[InstanceManager] instance(instanceId={}) execute failed and have no chance to retry.", instanceId);
log.info("[InstanceManager-{}] instance execute failed and have no chance to retry.", instanceId);
}
}
// 同步状态变更信息到数据库
getInstanceInfoRepository().saveAndFlush(updateEntity);
instanceInfoRepository.saveAndFlush(instanceInfo);
if (finished) {
// 这里的 InstanceStatus 只有 成功/失败 两种手动停止不会由 TaskTracker 上报
@ -155,53 +155,53 @@ public class InstanceManager {
* @param status 任务状态 成功/失败/手动停止
* @param result 执行结果
*/
public static void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {
public void processFinishedInstance(Long instanceId, Long wfInstanceId, InstanceStatus status, String result) {
log.info("[Instance-{}] process finished, final status is {}.", instanceId, status.name());
// 清除已完成的实例信息
instanceId2StatusHolder.remove(instanceId);
// 这一步也可能导致后面取不到 JobInfoDO
JobInfoDO jobInfo = instanceId2JobInfo.remove(instanceId);
// 上报日志数据
getInstanceLogService().sync(instanceId);
instanceLogService.sync(instanceId);
// workflow 特殊处理
if (wfInstanceId != null) {
// 手动停止在工作流中也认为是失败理论上不应该发生
getWorkflowInstanceManager().move(wfInstanceId, instanceId, status, result);
workflowInstanceManager.move(wfInstanceId, instanceId, status, result);
}
// 告警
if (status == InstanceStatus.FAILED) {
if (jobInfo == null) {
jobInfo = fetchJobInfo(instanceId);
}
JobInfoDO jobInfo = fetchJobInfo(instanceId);
if (jobInfo == null) {
log.warn("[InstanceManager] can't find jobInfo by instanceId({}), alarm failed.", instanceId);
return;
}
InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId);
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
JobInstanceAlarmContent content = new JobInstanceAlarmContent();
BeanUtils.copyProperties(jobInfo, content);
BeanUtils.copyProperties(instanceInfo, content);
List<UserInfoDO> userList = SpringUtils.getBean(UserService.class).fetchNotifyUserList(jobInfo.getNotifyUserIds());
getAlarmService().onJobInstanceFailed(content, userList);
omsCenterAlarmService.onJobInstanceFailed(content, userList);
}
// 过期缓存
instanceId2JobInfo.invalidate(instanceId);
}
public static JobInfoDO fetchJobInfo(Long instanceId) {
JobInfoDO jobInfo = instanceId2JobInfo.get(instanceId);
/**
* 根据任务实例ID查询任务相关信息
* @param instanceId 任务实例ID
* @return 任务元数据
*/
public JobInfoDO fetchJobInfo(Long instanceId) {
JobInfoDO jobInfo = instanceId2JobInfo.getIfPresent(instanceId);
if (jobInfo != null) {
return jobInfo;
}
InstanceInfoDO instanceInfo = getInstanceInfoRepository().findByInstanceId(instanceId);
InstanceInfoDO instanceInfo = instanceInfoRepository.findByInstanceId(instanceId);
if (instanceInfo != null) {
return getJobInfoRepository().findById(instanceInfo.getJobId()).orElse(null);
return jobInfoRepository.findById(instanceInfo.getJobId()).orElse(null);
}
return null;
}
@ -209,74 +209,7 @@ public class InstanceManager {
/**
* 释放本地缓存防止内存泄漏
*/
public static void releaseInstanceInfos() {
instanceId2JobInfo = Maps.newConcurrentMap();
instanceId2StatusHolder = Maps.newConcurrentMap();
}
private static InstanceInfoRepository getInstanceInfoRepository() {
while (instanceInfoRepository == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
instanceInfoRepository = SpringUtils.getBean(InstanceInfoRepository.class);
}
return instanceInfoRepository;
}
private static JobInfoRepository getJobInfoRepository() {
while (jobInfoRepository == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
jobInfoRepository = SpringUtils.getBean(JobInfoRepository.class);
}
return jobInfoRepository;
}
private static DispatchService getDispatchService() {
while (dispatchService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
dispatchService = SpringUtils.getBean(DispatchService.class);
}
return dispatchService;
}
private static InstanceLogService getInstanceLogService() {
while (instanceLogService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
instanceLogService = SpringUtils.getBean(InstanceLogService.class);
}
return instanceLogService;
}
private static Alarmable getAlarmService() {
while (omsCenterAlarmService == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
omsCenterAlarmService = (Alarmable) SpringUtils.getBean("omsCenterAlarmService");
}
return omsCenterAlarmService;
}
private static WorkflowInstanceManager getWorkflowInstanceManager() {
while (workflowInstanceManager == null) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
workflowInstanceManager = SpringUtils.getBean(WorkflowInstanceManager.class);
}
return workflowInstanceManager;
public static void releaseCache() {
instanceId2JobInfo.cleanUp();
}
}

View File

@ -41,6 +41,8 @@ public class InstanceService {
@Resource
private IdGenerateService idGenerateService;
@Resource
private InstanceManager instanceManager;
@Resource
private InstanceInfoRepository instanceInfoRepository;
/**
@ -67,6 +69,7 @@ public class InstanceService {
newInstanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
newInstanceInfo.setExpectedTriggerTime(expectTriggerTime);
newInstanceInfo.setLastReportTime(-1L);
newInstanceInfo.setGmtCreate(now);
newInstanceInfo.setGmtModified(now);
@ -101,7 +104,7 @@ public class InstanceService {
instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER);
instanceInfoRepository.saveAndFlush(instanceInfo);
InstanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);
instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);
/*
不可靠通知停止 TaskTracker

View File

@ -1,29 +0,0 @@
package com.github.kfcfans.powerjob.server.service.instance;
import lombok.Data;
/**
 * Holds the status information of a job instance.
 *
 * @author tjq
 * @since 2020/4/7
 */
@Data
public class InstanceStatusHolder {
// Job instance id this status belongs to.
private long instanceId;
// Instance status code (presumably maps to InstanceStatus values — confirm against caller).
private int instanceStatus;
// Execution result text.
private String result;
/* ********* statistics ********* */
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
// Instance start time (epoch millis — TODO confirm unit).
private long startTime;
// Time of the last status report.
private long lastReportTime;
// Source address (the TaskTracker's address).
private String sourceAddress;
}

View File

@ -60,7 +60,7 @@ public class CleanService {
// 释放本地缓存
WorkerManagerService.releaseContainerInfos();
InstanceManager.releaseInstanceInfos();
InstanceManager.releaseCache();
// 删除数据库运行记录
cleanInstanceLog();

View File

@ -44,6 +44,8 @@ public class InstanceStatusCheckService {
@Resource
private DispatchService dispatchService;
@Resource
private InstanceManager instanceManager;
@Resource
private WorkflowInstanceManager workflowInstanceManager;
@Resource
@ -139,7 +141,7 @@ public class InstanceStatusCheckService {
}
// CRON API一样失败次数 + 1根据重试配置进行重试
if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) {
if (instance.getRunningTimes() < jobInfoDO.getInstanceRetryNum()) {
dispatchService.redispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
}else {
updateFailedInstance(instance);
@ -190,6 +192,6 @@ public class InstanceStatusCheckService {
instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
instanceInfoRepository.saveAndFlush(instance);
InstanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!");
instanceManager.processFinishedInstance(instance.getInstanceId(), instance.getWfInstanceId(), InstanceStatus.FAILED, "timeout, maybe TaskTracker down!");
}
}

View File

@ -21,6 +21,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.scheduling.annotation.Async;
@ -169,10 +170,8 @@ public class OmsScheduleService {
jobInfos.forEach(jobInfoDO -> {
try {
CronExpression cronExpression = new CronExpression(jobInfoDO.getTimeExpression());
Date benchmarkTime = new Date(jobInfoDO.getNextTriggerTime());
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(benchmarkTime);
Date nextTriggerTime = calculateNextTriggerTime(jobInfoDO.getNextTriggerTime(), jobInfoDO.getTimeExpression());
JobInfoDO updatedJobInfo = new JobInfoDO();
BeanUtils.copyProperties(jobInfoDO, updatedJobInfo);
@ -221,8 +220,7 @@ public class OmsScheduleService {
// 3. 重新计算下一次调度时间并更新
try {
CronExpression cronExpression = new CronExpression(wfInfo.getTimeExpression());
Date nextTriggerTime = cronExpression.getNextValidTimeAfter(new Date(wfInfo.getNextTriggerTime()));
Date nextTriggerTime = calculateNextTriggerTime(wfInfo.getNextTriggerTime(), wfInfo.getTimeExpression());
WorkflowInfoDO updateEntity = new WorkflowInfoDO();
BeanUtils.copyProperties(wfInfo, updateEntity);
@ -267,4 +265,18 @@ public class OmsScheduleService {
});
}
/**
 * Computes the next trigger time for a CRON-scheduled job/workflow.
 *
 * @param preTriggerTime the previous trigger time (epoch millis)
 * @param cronExpression the CRON expression
 * @return the next scheduling time
 * @throws Exception if the CRON expression cannot be parsed
 */
private static Date calculateNextTriggerTime(Long preTriggerTime, String cronExpression) throws Exception {
CronExpression ce = new CronExpression(cronExpression);
// Take the max of "now" and the previous trigger time so that a job which has
// not been scheduled for a long while (e.g. a previously DISABLED job that was
// just re-enabled) is not fired repeatedly to "catch up" on every missed slot.
long benchmarkTime = Math.max(System.currentTimeMillis(), preTriggerTime);
return ce.getNextValidTimeAfter(new Date(benchmarkTime));
}
}

View File

@ -36,7 +36,7 @@ public class WebLogAspect {
* 第三个*所有的方法
* 最后的两个点所有类型的参数
*/
@Pointcut("execution(public * com.github.kfcfans.oms.server.web.controller..*.*(..))")
@Pointcut("execution(public * com.github.kfcfans.powerjob.server.web.controller..*.*(..))")
public void include() {
}

View File

@ -3,14 +3,14 @@ logging.config=classpath:logback-dev.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-daily?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://remotehost:27017/oms-daily
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-daily
####### 邮件配置(启用邮件报警则需要) #######
spring.mail.host=smtp.163.com

View File

@ -3,14 +3,14 @@ logging.config=classpath:logback-product.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3391/powerjob-pre?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://remotehost:27017/oms-pre
spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-pre
####### 邮件配置(启用邮件报警则需要) #######
spring.mail.host=smtp.qq.com

View File

@ -3,14 +3,14 @@ logging.config=classpath:logback-product.xml
####### 数据库配置 #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置非核心依赖可移除 #######
spring.data.mongodb.uri=mongodb://localhost:27017/oms-product
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-product
####### 邮件配置(启用邮件报警则需要) #######
spring.mail.host=smtp.qq.com

View File

@ -9,5 +9,5 @@ ${AnsiColor.GREEN}
░░ ░░░░░░ ░░░ ░░░ ░░░░░░ ░░░ ░░░░░ ░░░░░░ ░░░░░
${AnsiColor.BRIGHT_RED}
* Maintainer: tengjiqi@gmail.com
* SourceCode: https://github.com/KFCFans/OhMyScheduler
* SourceCode: https://github.com/KFCFans/PowerJob
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.4)

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-agent</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
<powerjob.worker.version>3.0.0</powerjob.worker.version>
<powerjob.worker.version>3.0.1</powerjob.worker.version>
<logback.version>1.2.3</logback.version>
<picocli.version>4.3.2</picocli.version>

View File

@ -10,11 +10,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker-samples</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<powerjob.worker.version>3.0.0</powerjob.worker.version>
<powerjob.worker.version>3.0.1</powerjob.worker.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- 部署时跳过该module -->

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -17,6 +18,10 @@ import java.util.List;
*/
@Configuration
public class OhMySchedulerConfig {
@Value("${powerjob.akka.port}")
private int port;
@Bean
public OhMyWorker initOMS() throws Exception {
@ -25,7 +30,7 @@ public class OhMySchedulerConfig {
// 1. 创建配置文件
OhMyConfig config = new OhMyConfig();
config.setPort(27777);
config.setPort(port);
config.setAppName("oms-test");
config.setServerAddress(serverAddress);
// 如果没有大型 Map/MapReduce 的需求建议使用内存来加速计算

View File

@ -1,3 +1,5 @@
server.port=8081
spring.jpa.open-in-view=false
powerjob.akka.port=27777

View File

@ -10,12 +10,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-worker</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<powerjob.common.version>3.0.0</powerjob.common.version>
<powerjob.common.version>3.0.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>

View File

@ -7,6 +7,9 @@ import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 普通计算节点处理来自 TaskTracker 的请求
@ -28,13 +31,14 @@ public class ProcessorTrackerActor extends AbstractActor {
/**
* 处理来自TaskTracker的task执行请求
* @param req 请求
*/
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
Long instanceId = req.getInstanceInfo().getInstanceId();
// 创建 ProcessorTracker 一定能成功且每个任务实例只会创建一个 ProcessorTracker
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req));
// 创建 ProcessorTracker 一定能成功
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req));
TaskDO task = new TaskDO();
@ -47,14 +51,18 @@ public class ProcessorTrackerActor extends AbstractActor {
processorTracker.submitTask(task);
}
/**
* 处理来自TaskTracker停止任务的请求
* @param req 请求
*/
private void onReceiveTaskTrackerStopInstanceReq(TaskTrackerStopInstanceReq req) {
Long instanceId = req.getInstanceId();
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId);
if (processorTracker == null) {
List<ProcessorTracker> removedPts = ProcessorTrackerPool.removeProcessorTracker(instanceId);
if (CollectionUtils.isEmpty(removedPts)) {
log.warn("[ProcessorTrackerActor] ProcessorTracker for instance(instanceId={}) already destroyed.", instanceId);
}else {
processorTracker.destroy();
removedPts.forEach(ProcessorTracker::destroy);
}
}
}

View File

@ -1,19 +1,21 @@
package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists;
import javafx.concurrent.Task;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@ -34,7 +36,6 @@ public class TaskTrackerActor extends AbstractActor {
.match(ServerScheduleJobReq.class, this::onReceiveServerScheduleJobReq)
.match(ProcessorMapTaskRequest.class, this::onReceiveProcessorMapTaskRequest)
.match(ProcessorTrackerStatusReportReq.class, this::onReceiveProcessorTrackerStatusReportReq)
.match(BroadcastTaskPreExecuteFinishedReq.class, this::onReceiveBroadcastTaskPreExecuteFinishedReq)
.match(ServerStopInstanceReq.class, this::onReceiveServerStopInstanceReq)
.match(ServerQueryInstanceStatusReq.class, this::onReceiveServerQueryInstanceStatusReq)
.matchAny(obj -> log.warn("[ServerRequestActor] receive unknown request: {}.", obj))
@ -47,13 +48,26 @@ public class TaskTrackerActor extends AbstractActor {
*/
private void onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {
int taskStatus = req.getStatus();
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
// 结束状态需要回复接受成功
if (TaskStatus.finishedStatus.contains(taskStatus)) {
AskResponse askResponse = AskResponse.succeed(null);
getSender().tell(askResponse, getSelf());
}
// 手动停止 TaskTracker 的情况下会出现这种情况
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
} else {
taskTracker.updateTaskStatus(req.getTaskId(), req.getStatus(), req.getReportTime(), req.getResult());
return;
}
if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
taskTracker.updateTaskStatus(req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
}
/**
@ -94,19 +108,6 @@ public class TaskTrackerActor extends AbstractActor {
getSender().tell(response, getSelf());
}
/**
* 广播任务前置任务执行完毕 处理器
*/
private void onReceiveBroadcastTaskPreExecuteFinishedReq(BroadcastTaskPreExecuteFinishedReq req) {
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive BroadcastTaskPreExecuteFinishedReq({}) but system can't find TaskTracker.", req);
return;
}
taskTracker.broadcast(req.isSuccess(), req.getSubInstanceId(), req.getTaskId(), req.getReportTime(), req.getMsg());
}
/**
* 服务器任务调度处理器
*/

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.powerjob.worker.common.constants;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.List;
/**
* 任务状态task_info 表中 status 字段的枚举值
*
@ -20,6 +23,8 @@ public enum TaskStatus {
WORKER_PROCESS_FAILED(5, "worker执行失败"),
WORKER_PROCESS_SUCCESS(6, "worker执行成功");
public static final List<Integer> finishedStatus = Lists.newArrayList(WORKER_PROCESS_FAILED.value, WORKER_PROCESS_SUCCESS.value);
private int value;
private String des;

View File

@ -1,15 +1,24 @@
package com.github.kfcfans.powerjob.worker.common.utils;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* AKKA 工具类
*
* @author tjq
* @since 2020/3/17
*/
@Slf4j
public class AkkaUtils {
/**
@ -28,4 +37,21 @@ public class AkkaUtils {
return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName);
}
/**
* 可靠传输
* @param remote 远程 AKKA 节点
* @param msg 需要传输的对象
* @return true: 对方接收成功 / false: 对方接收失败可能传输成功但对方处理失败需要协同处理 AskResponse 返回值
*/
public static boolean reliableTransmit(ActorSelection remote, Object msg) {
try {
CompletionStage<Object> ask = Patterns.ask(remote, msg, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
AskResponse response = (AskResponse) ask.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
return response.isSuccess();
}catch (Exception e) {
log.warn("[Oms-Transmitter] transmit {} failed, reason is {}", msg, e.toString());
}
return false;
}
}

View File

@ -133,7 +133,7 @@ public class OmsJarContainer implements OmsContainer {
Thread.currentThread().setContextClassLoader(oldCL);
}
log.info("[OmsJarContainer] init container(name={},jarPath={}) successfully", containerId, localJarFile.getPath());
log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());
}
@Override

View File

@ -6,13 +6,13 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SerializerUtils;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.github.kfcfans.powerjob.worker.pojo.model.InstanceInfo;
import com.github.kfcfans.powerjob.worker.pojo.request.BroadcastTaskPreExecuteFinishedReq;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatusReq;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
@ -26,6 +26,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Queue;
/**
* Processor 执行器
@ -45,6 +46,8 @@ public class ProcessorRunnable implements Runnable {
private final OmsLogger omsLogger;
// 类加载器
private final ClassLoader classLoader;
// 重试队列ProcessorTracker 将会定期重新上报处理结果
private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
public void innerRun() throws InterruptedException {
@ -68,40 +71,31 @@ public class ProcessorRunnable implements Runnable {
taskContext.setUserContext(OhMyWorker.getConfig().getUserContext());
ThreadLocalStore.setTask(task);
reportStatus(TaskStatus.WORKER_PROCESSING, null);
reportStatus(TaskStatus.WORKER_PROCESSING, null, null);
// 1. 根任务特殊处理
ProcessResult processResult;
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName())) {
// 广播执行先选本机执行 preProcess完成后TaskTracker再为所有Worker生成子Task
if (executeType == ExecuteType.BROADCAST) {
BroadcastTaskPreExecuteFinishedReq spReq = new BroadcastTaskPreExecuteFinishedReq();
spReq.setTaskId(taskId);
spReq.setInstanceId(instanceId);
spReq.setSubInstanceId(task.getSubInstanceId());
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
try {
ProcessResult processResult = broadcastProcessor.preProcess(taskContext);
spReq.setSuccess(processResult.isSuccess());
spReq.setMsg(suit(processResult.getMsg()));
}catch (Exception e) {
processResult = broadcastProcessor.preProcess(taskContext);
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, e);
spReq.setSuccess(false);
spReq.setMsg(e.toString());
processResult = new ProcessResult(false, e.toString());
}
}else {
spReq.setSuccess(true);
spReq.setMsg("NO_PREPOST_TASK");
processResult = new ProcessResult(true, "NO_PREPOST_TASK");
}
spReq.setReportTime(System.currentTimeMillis());
taskTrackerActor.tell(spReq, null);
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST);
// 广播执行的第一个 task 只执行 preProcess 部分
return;
}
@ -113,7 +107,6 @@ public class ProcessorRunnable implements Runnable {
Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
ProcessResult lastResult;
List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId());
try {
switch (executeType) {
@ -121,30 +114,30 @@ public class ProcessorRunnable implements Runnable {
if (processor instanceof BroadcastProcessor) {
BroadcastProcessor broadcastProcessor = (BroadcastProcessor) processor;
lastResult = broadcastProcessor.postProcess(taskContext, taskResults);
processResult = broadcastProcessor.postProcess(taskContext, taskResults);
}else {
lastResult = BroadcastProcessor.defaultResult(taskResults);
processResult = BroadcastProcessor.defaultResult(taskResults);
}
break;
case MAP_REDUCE:
if (processor instanceof MapReduceProcessor) {
MapReduceProcessor mapReduceProcessor = (MapReduceProcessor) processor;
lastResult = mapReduceProcessor.reduce(taskContext, taskResults);
processResult = mapReduceProcessor.reduce(taskContext, taskResults);
}else {
lastResult = new ProcessResult(false, "not implement the MapReduceProcessor");
processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
}
break;
default:
lastResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
}
}catch (Exception e) {
lastResult = new ProcessResult(false, e.toString());
}catch (Throwable e) {
processResult = new ProcessResult(false, e.toString());
log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, e);
}
TaskStatus status = lastResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(lastResult.getMsg()));
TaskStatus status = processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
reportStatus(status, suit(processResult.getMsg()), null);
log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
return;
@ -152,29 +145,43 @@ public class ProcessorRunnable implements Runnable {
// 3. 正式提交运行
ProcessResult processResult;
try {
processResult = processor.process(taskContext);
}catch (Exception e) {
}catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()));
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null);
}
/**
* 上报状态给 TaskTracker
* @param status Task状态
* @param result 执行结果只有结束时才存在
* @param cmd 特殊需求比如广播执行需要创建广播任务
*/
private void reportStatus(TaskStatus status, String result) {
private void reportStatus(TaskStatus status, String result, Integer cmd) {
ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
req.setInstanceId(task.getInstanceId());
req.setSubInstanceId(task.getSubInstanceId());
req.setTaskId(task.getTaskId());
req.setStatus(status.getValue());
req.setResult(result);
req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd);
taskTrackerActor.tell(req, null);
// 最终结束状态要求可靠发送
if (TaskStatus.finishedStatus.contains(status.getValue())) {
boolean success = AkkaUtils.reliableTransmit(taskTrackerActor, req);
if (!success) {
// 插入重试队列等待重试
statusReportRetryQueue.add(req);
log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed.", task.getInstanceId(), task.getTaskId(), status, result);
}
}else {
taskTrackerActor.tell(req, null);
}
}
@Override
@ -184,8 +191,9 @@ public class ProcessorRunnable implements Runnable {
try {
innerRun();
}catch (InterruptedException ignore) {
}catch (Exception e) {
log.error("[ProcessorRunnable-{}] execute failed, please fix this bug @tjq!", task.getInstanceId(), e);
}catch (Throwable e) {
reportStatus(TaskStatus.WORKER_PROCESS_FAILED, e.toString(), null);
log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", task.getInstanceId(), e);
}finally {
ThreadLocalStore.clear();
}

View File

@ -1,9 +1,6 @@
package com.github.kfcfans.powerjob.worker.core.processor.sdk;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
@ -14,10 +11,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorMapTaskRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Map 处理器允许开发者自定义拆分任务进行分布式执行
@ -29,7 +23,6 @@ import java.util.concurrent.TimeUnit;
public abstract class MapProcessor implements BasicProcessor {
private static final int RECOMMEND_BATCH_SIZE = 200;
private static final int REQUEST_TIMEOUT_MS = 5000;
/**
* 分发子任务
@ -53,20 +46,13 @@ public abstract class MapProcessor implements BasicProcessor {
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
boolean requestSucceed = false;
try {
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
CompletionStage<Object> requestCS = Patterns.ask(actorSelection, req, Duration.ofMillis(REQUEST_TIMEOUT_MS));
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
requestSucceed = respObj.isSuccess();
}catch (Exception e) {
log.warn("[MapProcessor] map failed, exception is {}.", e.toString());
}
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req);
if (requestSucceed) {
return new ProcessResult(true, "MAP_SUCCESS");
}else {
log.warn("[MapProcessor] map failed for {}", taskName);
return new ProcessResult(false, "MAP_FAILED");
}
}

View File

@ -21,11 +21,13 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
/**
@ -50,6 +52,8 @@ public class ProcessorTracker {
private OmsContainer omsContainer;
// 在线日志
private OmsLogger omsLogger;
// ProcessResult 上报失败的重试队列
private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
private String taskTrackerAddress;
private ActorSelection taskTrackerActorRef;
@ -77,6 +81,7 @@ public class ProcessorTracker {
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
// 初始化 线程池TimingPool 启动的任务会检查 ThreadPool所以必须先初始化线程池否则NPE
initThreadPool();
@ -86,7 +91,7 @@ public class ProcessorTracker {
initProcessor();
log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
}catch (Exception e) {
}catch (Throwable e) {
log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e);
lethal = true;
lethalReason = e.toString();
@ -108,7 +113,7 @@ public class ProcessorTracker {
// 一旦 ProcessorTracker 出现异常所有提交到此处的任务直接返回失败防止形成死锁
// 死锁分析TT创建PTPT创建失败无法定期汇报心跳TT长时间未收到PT心跳认为PT宕机确实宕机了无法选择可用的PT再次派发任务死锁形成GG斯密达 T_T
if (lethal) {
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis());
ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq(instanceId, newTask.getSubInstanceId(), newTask.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), lethalReason, System.currentTimeMillis(), null);
taskTrackerActorRef.tell(report, null);
return;
}
@ -119,7 +124,7 @@ public class ProcessorTracker {
newTask.setAddress(taskTrackerAddress);
ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader);
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue);
try {
threadPool.submit(processorRunnable);
success = true;
@ -134,6 +139,7 @@ public class ProcessorTracker {
if (success) {
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
reportReq.setInstanceId(instanceId);
reportReq.setSubInstanceId(newTask.getSubInstanceId());
reportReq.setTaskId(newTask.getTaskId());
reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
reportReq.setReportTime(System.currentTimeMillis());
@ -165,6 +171,7 @@ public class ProcessorTracker {
// 2. 去除顶层引用送入GC世界
taskTrackerActorRef = null;
statusReportRetryQueue.clear();
ProcessorTrackerPool.removeProcessorTracker(instanceId);
log.info("[ProcessorTracker-{}] ProcessorTracker already destroyed!", instanceId);
@ -224,6 +231,19 @@ public class ProcessorTracker {
}
}
// 上报状态之前先重新发送失败的任务只要有结果堆积就不上报状态 PT 认为该 TT 失联然后重试相关任务
while (!statusReportRetryQueue.isEmpty()) {
ProcessorReportTaskStatusReq req = statusReportRetryQueue.poll();
if (req != null) {
req.setReportTime(System.currentTimeMillis());
if (!AkkaUtils.reliableTransmit(taskTrackerActorRef, req)) {
statusReportRetryQueue.add(req);
return;
}
}
}
// 上报当前 ProcessorTracker 负载
long waitingNum = threadPool.getQueue().size();
ProcessorTrackerStatusReportReq req = new ProcessorTrackerStatusReportReq(instanceId, waitingNum);
taskTrackerActorRef.tell(req, null);
@ -247,7 +267,7 @@ public class ProcessorTracker {
try {
processor = SpringUtils.getBean(processorInfo);
}catch (Exception e) {
log.warn("[ProcessorRunnable-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString());
log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString());
}
}
// 反射加载
@ -263,18 +283,20 @@ public class ProcessorTracker {
break;
case JAVA_CONTAINER:
String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
omsContainer = OmsContainerFactory.getContainer(Long.valueOf(split[0]));
if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]);
}
break;
default:
log.warn("[ProcessorRunnable-{}] unknown processor type: {}.", instanceId, processorType);
log.warn("[ProcessorTracker-{}] unknown processor type: {}.", instanceId, processorType);
throw new OmsException("unknown processor type of " + processorType);
}
if (processor == null) {
log.warn("[ProcessorRunnable-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", instanceId, processorType, processorInfo);
throw new OmsException("fetch Processor failed");
}
}

View File

@ -1,9 +1,13 @@
package com.github.kfcfans.powerjob.worker.core.tracker.processor;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* 持有 Processor 对象
@ -14,23 +18,36 @@ import java.util.function.Function;
*/
public class ProcessorTrackerPool {
private static final Map<Long, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap();
// instanceId -> (TaskTrackerAddress -> ProcessorTracker)
// 处理脑裂情况下同一个 Instance 存在多个 TaskTracker 的情况
private static final Map<Long, Map<String, ProcessorTracker>> processorTrackerPool = Maps.newHashMap();
/**
* 获取 ProcessorTracker如果不存在则创建
*/
public static ProcessorTracker getProcessorTracker(Long instanceId, Function<Long, ProcessorTracker> creator) {
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
public static ProcessorTracker getProcessorTracker(Long instanceId, String address, Supplier<ProcessorTracker> creator) {
ProcessorTracker processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address);
if (processorTracker == null) {
synchronized (ProcessorTrackerPool.class) {
processorTracker = processorTrackerPool.getOrDefault(instanceId, Collections.emptyMap()).get(address);
if (processorTracker == null) {
processorTracker = creator.get();
processorTrackerPool.computeIfAbsent(instanceId, ignore -> Maps.newHashMap()).put(address, processorTracker);
}
}
}
return processorTracker;
}
/**
* 获取 ProcessorTracker
*/
public static ProcessorTracker getProcessorTracker(Long instanceId) {
return instanceId2ProcessorTracker.get(instanceId);
}
public static List<ProcessorTracker> removeProcessorTracker(Long instanceId) {
public static void removeProcessorTracker(Long instanceId) {
instanceId2ProcessorTracker.remove(instanceId);
List<ProcessorTracker> res = Lists.newLinkedList();
Map<String, ProcessorTracker> ttAddress2Pt = processorTrackerPool.remove(instanceId);
if (ttAddress2Pt != null) {
res.addAll(ttAddress2Pt.values());
ttAddress2Pt.clear();
}
return res;
}
}

View File

@ -47,7 +47,9 @@ public class CommonTaskTracker extends TaskTracker {
@Override
protected void initTaskTracker(ServerScheduleJobReq req) {
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build();
// CommonTaskTrackerTimingPool 缩写
String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(2, factory);
// 持久化根任务

View File

@ -82,7 +82,8 @@ public class FrequentTaskTracker extends TaskTracker {
subInstanceId2TimeHolder = Maps.newConcurrentMap();
// 1. 初始化定时调度线程池
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("oms-TaskTrackerTimingPool-%d").build();
String poolName = String.format("ftttp-%d", req.getInstanceId()) + "-%d";
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
this.scheduledPool = Executors.newScheduledThreadPool(3, factory);
// 2. 启动任务发射器

View File

@ -2,10 +2,12 @@ package com.github.kfcfans.powerjob.worker.core.tracker.task;
import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.InstanceStatus;
import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
@ -97,7 +99,7 @@ public abstract class TaskTracker {
// 子类自定义初始化操作
initTaskTracker(req);
log.info("[TaskTracker-{}] create TaskTracker from request({}) successfully.", instanceId, req);
log.info("[TaskTracker-{}] create TaskTracker successfully.", instanceId);
}
/**
@ -106,12 +108,30 @@ public abstract class TaskTracker {
* @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker
*/
public static TaskTracker create(ServerScheduleJobReq req) {
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
switch (timeExpressionType) {
case FIX_RATE:
case FIX_DELAY:return new FrequentTaskTracker(req);
default:return new CommonTaskTracker(req);
try {
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
switch (timeExpressionType) {
case FIX_RATE:
case FIX_DELAY:return new FrequentTaskTracker(req);
default:return new CommonTaskTracker(req);
}
}catch (Exception e) {
log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e);
// 直接发送失败请求
TaskTrackerReportInstanceStatusReq response = new TaskTrackerReportInstanceStatusReq();
BeanUtils.copyProperties(req, response);
response.setInstanceStatus(InstanceStatus.FAILED.getV());
response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString()));
response.setReportTime(System.currentTimeMillis());
response.setStartTime(System.currentTimeMillis());
response.setSourceAddress(OhMyWorker.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
serverActor.tell(response, null);
}
return null;
}
/* *************************** 对外方法区 *************************** */
@ -210,6 +230,8 @@ public abstract class TaskTracker {
}
} catch (InterruptedException ignore) {
} catch (Exception e) {
log.warn("[TaskTracker-{}] update task status failed.", instanceId, e);
} finally {
segmentLock.unlock(lockId);
}
@ -254,10 +276,9 @@ public abstract class TaskTracker {
* @param preExecuteSuccess 预执行广播任务运行状态
* @param subInstanceId 子实例ID
* @param preTaskId 预执行广播任务的taskId
* @param reportTime 上报时间
* @param result 预执行广播任务的结果
*/
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, long reportTime, String result) {
public void broadcast(boolean preExecuteSuccess, long subInstanceId, String preTaskId, String result) {
if (finished.get()) {
return;
@ -265,7 +286,7 @@ public abstract class TaskTracker {
log.info("[TaskTracker-{}] finished broadcast's preProcess.", instanceId);
// 1. 生成集群子任务
// 生成集群子任务
if (preExecuteSuccess) {
List<String> allWorkerAddress = ptStatusHolder.getAllProcessorTrackers();
List<TaskDO> subTaskList = Lists.newLinkedList();
@ -280,10 +301,6 @@ public abstract class TaskTracker {
}else {
log.debug("[TaskTracker-{}] BroadcastTask failed because of preProcess failed, preProcess result={}.", instanceId, result);
}
// 2. 更新根任务状态广播任务的根任务为 preProcess 任务
int status = preExecuteSuccess ? TaskStatus.WORKER_PROCESS_SUCCESS.getValue() : TaskStatus.WORKER_PROCESS_FAILED.getValue();
updateTaskStatus(preTaskId, status, reportTime, result);
}
/**

View File

@ -1,25 +0,0 @@
package com.github.kfcfans.powerjob.worker.pojo.request;
import com.github.kfcfans.powerjob.common.OmsSerializable;
import lombok.Data;
/**
* 广播任务 preExecute 结束信息
*
* @author tjq
* @since 2020/3/23
*/
@Data
public class BroadcastTaskPreExecuteFinishedReq implements OmsSerializable {
private Long instanceId;
private Long subInstanceId;
private String taskId;
private boolean success;
private String msg;
// 上报时间
private long reportTime;
}

View File

@ -17,7 +17,10 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class ProcessorReportTaskStatusReq implements OmsSerializable {
public static final Integer BROADCAST = 1;
private Long instanceId;
private Long subInstanceId;
private String taskId;
private int status;
@ -29,4 +32,6 @@ public class ProcessorReportTaskStatusReq implements OmsSerializable {
// 上报时间
private long reportTime;
// 特殊请求名称
private Integer cmd;
}