feat: migrate data service (v3.x to v4.x)

This commit is contained in:
Echo009 2021-03-05 23:54:50 +08:00
parent 218d974122
commit 5b09865b42
3 changed files with 285 additions and 63 deletions

View File

@ -18,7 +18,14 @@ public interface WorkflowInfoRepository extends JpaRepository<WorkflowInfoDO, Lo
List<WorkflowInfoDO> findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List<Long> appIds, int status, int timeExpressionType, long time);
// 对外查询list三兄弟
/**
* 查询指定 APP 下所有的工作流信息
* @param appId APP ID
* @return APP 下的所有工作流信息
*/
List<WorkflowInfoDO> findByAppId(Long appId);
/** 对外查询list三兄弟 */
Page<WorkflowInfoDO> findByAppIdAndStatusNot(Long appId, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByIdAndStatusNot(Long id, int nStatus, Pageable pageable);
Page<WorkflowInfoDO> findByAppIdAndStatusNotAndWfNameLike(Long appId, int nStatus, String condition, Pageable pageable);

View File

@ -0,0 +1,257 @@
package com.github.kfcfans.powerjob.server.service.migrate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowNodeInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;
import javax.transaction.Transactional;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Help users upgrade from a low version of powerjob-server to a high version of powerjob-server
*
* @author tjq
* @author Echo009
* @since 2021/3/5
*/
@Service
@Slf4j
public class MigrateService {
private static final String MIGRATE_LOCK_TEMPLATE = "migrateLock-%s-%s";
@Resource
private LockService lockService;
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private WorkflowNodeInfoRepository workflowNodeInfoRepository;
/**
* 避免内部方法调用导致事务不生效
*/
@Resource
private MigrateService self;
/* ********************** 3.x => 4.x ********************** */
/**
* 修复该 APP 下使用了弃用的处理器类型 {@link ProcessorType#SHELL} 以及 {@link ProcessorType#PYTHON} 的任务
* 将其替换为官方提供的 Processor
*/
@Transactional(rollbackOn = Exception.class)
public JSONObject fixDeprecatedProcessType(Long appId) {
final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixDeprecatedProcessType", appId);
// 120 s
boolean getLock = lockService.tryLock(lock, 120000);
if (!getLock) {
throw new PowerJobException("get lock failed, maybe other migrate job is running");
}
try {
JSONObject resultLog = new JSONObject();
resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor");
resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'");
Set<Long> convertedJobIds = Sets.newHashSet();
Specification<JobInfoDO> specification = (root, query, criteriaBuilder) -> {
List<Predicate> predicates = Lists.newLinkedList();
List<Integer> scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV());
predicates.add(criteriaBuilder.equal(root.get("appId"), appId));
predicates.add(root.get("processorType").in(scriptJobTypes));
return query.where(predicates.toArray(new Predicate[0])).getRestriction();
};
List<JobInfoDO> scriptJobs = jobInfoRepository.findAll(specification);
resultLog.put("scriptJobsNum", scriptJobs.size());
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[FixDeprecatedProcessType] start to fix the job info whose processor type is deprecated,total number : {}", scriptJobs.size());
scriptJobs.forEach(job -> {
ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType());
job.setJobParams(job.getProcessorInfo());
job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV());
if (oldProcessorType == ProcessorType.PYTHON) {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor");
} else {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor");
}
jobInfoRepository.saveAndFlush(job);
convertedJobIds.add(job.getId());
});
resultLog.put("convertedJobIds", convertedJobIds);
stopwatch.stop();
log.info("[FixDeprecatedProcessType] fix the job info successfully,used time: {}s", stopwatch.elapsed(TimeUnit.SECONDS));
return resultLog;
} catch (Exception e) {
// log
log.error("[FixDeprecatedProcessType] fail to fix the job info of app {}", appId, e);
// rethrow
throw e;
} finally {
lockService.unlock(lock);
}
}
/**
* 修复该 APP 下的工作流信息允许部分修复成功
* 1自动生成对应的节点信息 {@link WorkflowNodeInfoDO}
* 2修复 DAG 信息+节点ID
*/
@SuppressWarnings("squid:S1141")
public JSONObject fixWorkflowInfoFromV3ToV4(Long appId) {
final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixWorkflowInfoFromV3ToV4", appId);
// 180 s
boolean getLock = lockService.tryLock(lock, 180000);
if (!getLock) {
throw new PowerJobException("get lock failed, maybe other migrate job is running");
}
try {
JSONObject resultLog = new JSONObject();
Set<Long> fixedWorkflowIds = Sets.newHashSet();
List<WorkflowInfoDO> workflowInfoList = workflowInfoRepository.findByAppId(appId);
resultLog.put("totalNum", workflowInfoList.size());
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[FixWorkflowInfoFromV3ToV4] start to fix the workflow info, total number : {}", workflowInfoList.size());
HashMap<Long, Long> jobId2NodeIdMap = new HashMap<>(64);
HashMap<Long, String> failureReasonMap = new HashMap<>(workflowInfoList.size() / 2 + 1);
for (WorkflowInfoDO workflowInfo : workflowInfoList) {
try {
boolean fixed = self.fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap);
if (fixed) {
fixedWorkflowIds.add(workflowInfo.getId());
}
} catch (Exception e) {
// 记录失败原因
failureReasonMap.put(workflowInfo.getId(), e.toString());
}
// 清空映射关系
jobId2NodeIdMap.clear();
}
stopwatch.stop();
log.info("[FixWorkflowInfoFromV3ToV4] fix the workflow info successfully, total number : {}, fixed number : {}, used time: {}s", workflowInfoList.size(), fixedWorkflowIds.size(), stopwatch.elapsed(TimeUnit.SECONDS));
resultLog.put("fixedWorkflowIds", fixedWorkflowIds);
resultLog.put("failureWorkflowInfo", failureReasonMap);
return resultLog;
} catch (Exception e) {
// log
log.error("[FixWorkflowInfoFromV3ToV4] fail to fix the workflow info of app {}", appId, e);
// rethrow
throw e;
} finally {
lockService.unlock(lock);
}
}
/**
* 有两种情况会修复失败
* 1节点对应 job 信息缺失被物理删除
* 2图中一部分节点有 nodeId一部分没有
*/
@Transactional(rollbackOn = Exception.class)
public boolean fixWorkflowInfoCoreFromV3ToV4(WorkflowInfoDO workflowInfo, Map<Long, Long> jobId2NodeIdMap) {
String dag = workflowInfo.getPeDAG();
PEWorkflowDAG peDag;
try {
peDag = JSON.parseObject(dag, PEWorkflowDAG.class);
} catch (Exception e) {
throw new PowerJobException("invalid DAG!");
}
if (peDag == null || CollectionUtils.isEmpty(peDag.getNodes())) {
// 不需要修复
return false;
}
// 只要有任意一个节点中存在 nodeId 那么就不需要修复
// 如果没有直接在 DB 改过数据那么不可能出现一部分节点有 id一部分没有的情况
boolean needFix = false;
boolean existNodeId = false;
for (PEWorkflowDAG.Node node : peDag.getNodes()) {
if (node.getNodeId() == null) {
needFix = true;
} else {
existNodeId = true;
}
}
// 存在错误数据(一部分节点有 id一部分没有)这种情况下只能让用户手工修复数据了
if (needFix && existNodeId) {
throw new PowerJobException("sorry,we can't fix this workflow info automatically whose node info is wrong! you need to fix them by yourself.");
}
// 不需要修复所有节点 id 均存在
if (!needFix) {
return false;
}
// 修复节点信息
for (PEWorkflowDAG.Node node : peDag.getNodes()) {
JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseThrow(() -> new PowerJobException("can't find job by id " + node.getJobId()));
WorkflowNodeInfoDO nodeInfo = new WorkflowNodeInfoDO();
nodeInfo.setWorkflowId(workflowInfo.getId());
nodeInfo.setAppId(workflowInfo.getAppId());
nodeInfo.setJobId(jobInfo.getId());
// 默认启用不允许失败跳过参数和 Job 保持一致
nodeInfo.setNodeAlias(jobInfo.getJobName());
nodeInfo.setNodeParams(jobInfo.getJobParams());
nodeInfo.setEnable(true);
nodeInfo.setSkipWhenFailed(false);
nodeInfo.setGmtCreate(new Date());
nodeInfo.setGmtModified(new Date());
nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo);
// 更新节点 ID
node.setNodeId(nodeInfo.getId());
node.setJobName(nodeInfo.getNodeAlias());
jobId2NodeIdMap.put(node.getJobId(), node.getNodeId());
}
if (!CollectionUtils.isEmpty(peDag.getEdges())) {
// 修复边信息
for (PEWorkflowDAG.Edge edge : peDag.getEdges()) {
// 转换为节点 ID
edge.setFrom(jobId2NodeIdMap.get(edge.getFrom()));
edge.setTo(jobId2NodeIdMap.get(edge.getTo()));
}
}
workflowInfo.setPeDAG(JSON.toJSONString(peDag));
workflowInfo.setGmtModified(new Date());
workflowInfoRepository.saveAndFlush(workflowInfo);
return true;
}
}

View File

@ -1,29 +1,21 @@
package com.github.kfcfans.powerjob.server.web.controller;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.powerjob.common.ProcessorType;
import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.server.extension.LockService;
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.github.kfcfans.powerjob.server.service.migrate.MigrateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.persistence.criteria.Predicate;
import java.util.List;
import java.util.Set;
/**
* Help users upgrade from a low version of powerjob-server to a high version of powerjob-server
* v4 means that this interface was upgraded from version v3.x to v4.x, and so on
*
* @author tjq
* @author Echo009
* @since 2021/2/23
*/
@Slf4j
@ -31,59 +23,25 @@ import java.util.Set;
@RequestMapping("/migrate")
public class MigrateController {
@Resource
private LockService lockService;
@Resource
private JobInfoRepository jobInfoRepository;
private MigrateService migrateService;
@GetMapping("/v4/script")
public ResultDTO<JSONObject> migrateScriptFromV3ToV4(Long appId) {
JSONObject resultLog = new JSONObject();
resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor");
resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'");
String lock = "migrateScriptFromV3ToV4-" + appId;
boolean getLock = lockService.tryLock(lock, 60000);
if (!getLock) {
return ResultDTO.failed("get lock failed, maybe other migrate job is running");
}
try {
Set<Long> convertedJobIds = Sets.newHashSet();
Specification<JobInfoDO> specification = (Specification<JobInfoDO>) (root, query, criteriaBuilder) -> {
List<Predicate> predicates = Lists.newLinkedList();
List<Integer> scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV());
predicates.add(criteriaBuilder.equal(root.get("appId"), appId));
predicates.add(root.get("processorType").in(scriptJobTypes));
return query.where(predicates.toArray(new Predicate[0])).getRestriction();
};
List<JobInfoDO> scriptJobs = jobInfoRepository.findAll(specification);
resultLog.put("scriptJobsNum", scriptJobs.size());
resultLog.put("convertedJobIds", convertedJobIds);
log.info("[MigrateScriptFromV3ToV4] script job num: {}", scriptJobs.size());
scriptJobs.forEach(job -> {
ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType());
job.setJobParams(job.getProcessorInfo());
job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV());
if (oldProcessorType == ProcessorType.PYTHON) {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor");
} else {
job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor");
}
jobInfoRepository.saveAndFlush(job);
convertedJobIds.add(job.getId());
});
return ResultDTO.success(resultLog);
} finally {
lockService.unlock(lock);
}
/**
* 修复对应 APP 下的任务信息
*/
@RequestMapping("/v4/job")
public ResultDTO<JSONObject> fixJobInfoFromV3ToV4(@RequestParam Long appId) {
return ResultDTO.success(migrateService.fixDeprecatedProcessType(appId));
}
/**
* 修复对应 APP 下的工作流信息
*/
@RequestMapping("/v4/workflow")
public ResultDTO<JSONObject> fixWorkflowInfoFromV3ToV4(@RequestParam Long appId){
return ResultDTO.success(migrateService.fixWorkflowInfoFromV3ToV4(appId));
}
}