diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInfoRepository.java index aa0c7cfe..849c3784 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInfoRepository.java @@ -18,7 +18,14 @@ public interface WorkflowInfoRepository extends JpaRepository findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); - // 对外查询(list)三兄弟 + /** + * 查询指定 APP 下所有的工作流信息 + * @param appId APP ID + * @return 该 APP 下的所有工作流信息 + */ + List findByAppId(Long appId); + + /** 对外查询(list)三兄弟 */ Page findByAppIdAndStatusNot(Long appId, int nStatus, Pageable pageable); Page findByIdAndStatusNot(Long id, int nStatus, Pageable pageable); Page findByAppIdAndStatusNotAndWfNameLike(Long appId, int nStatus, String condition, Pageable pageable); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/migrate/MigrateService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/migrate/MigrateService.java new file mode 100644 index 00000000..cab49e15 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/migrate/MigrateService.java @@ -0,0 +1,257 @@ +package com.github.kfcfans.powerjob.server.service.migrate; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.github.kfcfans.powerjob.common.PowerJobException; +import com.github.kfcfans.powerjob.common.ProcessorType; +import com.github.kfcfans.powerjob.common.model.PEWorkflowDAG; +import com.github.kfcfans.powerjob.server.extension.LockService; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowNodeInfoDO; +import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowNodeInfoRepository; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import javax.annotation.Resource; +import javax.persistence.criteria.Predicate; +import javax.transaction.Transactional; +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * Help users upgrade from a low version of powerjob-server to a high version of powerjob-server + * + * @author tjq + * @author Echo009 + * @since 2021/3/5 + */ +@Service +@Slf4j +public class MigrateService { + + private static final String MIGRATE_LOCK_TEMPLATE = "migrateLock-%s-%s"; + + @Resource + private LockService lockService; + @Resource + private JobInfoRepository jobInfoRepository; + @Resource + private WorkflowInfoRepository workflowInfoRepository; + @Resource + private WorkflowNodeInfoRepository workflowNodeInfoRepository; + /** + * 避免内部方法调用导致事务不生效 + */ + @Resource + private MigrateService self; + + /* ********************** 3.x => 4.x ********************** */ + + /** + * 修复该 APP 下使用了弃用的处理器类型 {@link ProcessorType#SHELL} 以及 {@link ProcessorType#PYTHON} 的任务 + * 将其替换为官方提供的 Processor + */ + @Transactional(rollbackOn = Exception.class) + public JSONObject fixDeprecatedProcessType(Long appId) { + + final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixDeprecatedProcessType", appId); + // 120 s + boolean getLock = lockService.tryLock(lock, 120000); + if (!getLock) { + throw new PowerJobException("get lock failed, maybe other migrate job is running"); + } + try { + JSONObject resultLog = new JSONObject(); + resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor"); + resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'"); + + Set convertedJobIds = Sets.newHashSet(); + + Specification specification = (root, query, criteriaBuilder) -> { + List predicates = Lists.newLinkedList(); + List scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV()); + predicates.add(criteriaBuilder.equal(root.get("appId"), appId)); + predicates.add(root.get("processorType").in(scriptJobTypes)); + return query.where(predicates.toArray(new Predicate[0])).getRestriction(); + }; + List scriptJobs = jobInfoRepository.findAll(specification); + resultLog.put("scriptJobsNum", scriptJobs.size()); + + Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("[FixDeprecatedProcessType] start to fix the job info whose processor type is deprecated,total number : {}", scriptJobs.size()); + scriptJobs.forEach(job -> { + + ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType()); + + job.setJobParams(job.getProcessorInfo()); + job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV()); + + if (oldProcessorType == ProcessorType.PYTHON) { + job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor"); + } else { + job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor"); + } + + jobInfoRepository.saveAndFlush(job); + convertedJobIds.add(job.getId()); + }); + resultLog.put("convertedJobIds", convertedJobIds); + stopwatch.stop(); + log.info("[FixDeprecatedProcessType] fix the job info successfully,used time: {}s", stopwatch.elapsed(TimeUnit.SECONDS)); + return resultLog; + } catch (Exception e) { + // log + log.error("[FixDeprecatedProcessType] fail to fix the job info of app {}", appId, e); + // rethrow + throw e; + } finally { + lockService.unlock(lock); + } + } + + + /** + * 修复该 APP 下的工作流信息,允许部分修复成功 + * 1、自动生成对应的节点信息 {@link WorkflowNodeInfoDO} + * 2、修复 DAG 信息(边+节点ID) + */ + @SuppressWarnings("squid:S1141") + public JSONObject fixWorkflowInfoFromV3ToV4(Long appId) { + + final String lock = String.format(MIGRATE_LOCK_TEMPLATE, "fixWorkflowInfoFromV3ToV4", appId); + // 180 s + boolean getLock = lockService.tryLock(lock, 180000); + if (!getLock) { + throw new PowerJobException("get lock failed, maybe other migrate job is running"); + } + + try { + JSONObject resultLog = new JSONObject(); + Set fixedWorkflowIds = Sets.newHashSet(); + + List workflowInfoList = workflowInfoRepository.findByAppId(appId); + resultLog.put("totalNum", workflowInfoList.size()); + Stopwatch stopwatch = Stopwatch.createStarted(); + log.info("[FixWorkflowInfoFromV3ToV4] start to fix the workflow info, total number : {}", workflowInfoList.size()); + + + HashMap jobId2NodeIdMap = new HashMap<>(64); + HashMap failureReasonMap = new HashMap<>(workflowInfoList.size() / 2 + 1); + + for (WorkflowInfoDO workflowInfo : workflowInfoList) { + + try { + boolean fixed = self.fixWorkflowInfoCoreFromV3ToV4(workflowInfo, jobId2NodeIdMap); + if (fixed) { + fixedWorkflowIds.add(workflowInfo.getId()); + } + } catch (Exception e) { + // 记录失败原因 + failureReasonMap.put(workflowInfo.getId(), e.toString()); + } + // 清空映射关系 + jobId2NodeIdMap.clear(); + } + stopwatch.stop(); + log.info("[FixWorkflowInfoFromV3ToV4] fix the workflow info successfully, total number : {}, fixed number : {}, used time: {}s", workflowInfoList.size(), fixedWorkflowIds.size(), stopwatch.elapsed(TimeUnit.SECONDS)); + resultLog.put("fixedWorkflowIds", fixedWorkflowIds); + resultLog.put("failureWorkflowInfo", failureReasonMap); + return resultLog; + } catch (Exception e) { + // log + log.error("[FixWorkflowInfoFromV3ToV4] fail to fix the workflow info of app {}", appId, e); + // rethrow + throw e; + } finally { + lockService.unlock(lock); + } + + } + + /** + * 有两种情况会修复失败 + * 1、节点对应 job 信息缺失(被物理删除) + * 2、图中一部分节点有 nodeId,一部分没有 + */ + @Transactional(rollbackOn = Exception.class) + public boolean fixWorkflowInfoCoreFromV3ToV4(WorkflowInfoDO workflowInfo, Map jobId2NodeIdMap) { + + String dag = workflowInfo.getPeDAG(); + PEWorkflowDAG peDag; + try { + peDag = JSON.parseObject(dag, PEWorkflowDAG.class); + } catch (Exception e) { + throw new PowerJobException("invalid DAG!"); + } + if (peDag == null || CollectionUtils.isEmpty(peDag.getNodes())) { + // 不需要修复 + return false; + } + // 只要有任意一个节点中存在 nodeId ,那么就不需要修复 + // 如果没有直接在 DB 改过数据,那么不可能出现一部分节点有 id,一部分没有的情况 + boolean needFix = false; + boolean existNodeId = false; + for (PEWorkflowDAG.Node node : peDag.getNodes()) { + if (node.getNodeId() == null) { + needFix = true; + } else { + existNodeId = true; + } + } + // 存在错误数据(一部分节点有 id,一部分没有),这种情况下只能让用户手工修复数据了 + if (needFix && existNodeId) { + throw new PowerJobException("sorry,we can't fix this workflow info automatically whose node info is wrong! you need to fix them by yourself."); + } + // 不需要修复,所有节点 id 均存在 + if (!needFix) { + return false; + } + + // 修复节点信息 + for (PEWorkflowDAG.Node node : peDag.getNodes()) { + JobInfoDO jobInfo = jobInfoRepository.findById(node.getJobId()).orElseThrow(() -> new PowerJobException("can't find job by id " + node.getJobId())); + + WorkflowNodeInfoDO nodeInfo = new WorkflowNodeInfoDO(); + nodeInfo.setWorkflowId(workflowInfo.getId()); + nodeInfo.setAppId(workflowInfo.getAppId()); + nodeInfo.setJobId(jobInfo.getId()); + // 默认启用,不允许失败跳过,参数和 Job 保持一致 + nodeInfo.setNodeAlias(jobInfo.getJobName()); + nodeInfo.setNodeParams(jobInfo.getJobParams()); + nodeInfo.setEnable(true); + nodeInfo.setSkipWhenFailed(false); + nodeInfo.setGmtCreate(new Date()); + nodeInfo.setGmtModified(new Date()); + + nodeInfo = workflowNodeInfoRepository.saveAndFlush(nodeInfo); + // 更新节点 ID + node.setNodeId(nodeInfo.getId()); + node.setJobName(nodeInfo.getNodeAlias()); + + jobId2NodeIdMap.put(node.getJobId(), node.getNodeId()); + } + if (!CollectionUtils.isEmpty(peDag.getEdges())) { + // 修复边信息 + for (PEWorkflowDAG.Edge edge : peDag.getEdges()) { + // 转换为节点 ID + edge.setFrom(jobId2NodeIdMap.get(edge.getFrom())); + edge.setTo(jobId2NodeIdMap.get(edge.getTo())); + } + } + workflowInfo.setPeDAG(JSON.toJSONString(peDag)); + workflowInfo.setGmtModified(new Date()); + workflowInfoRepository.saveAndFlush(workflowInfo); + return true; + + } + + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java index 8d0c40e8..fb7552f5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/MigrateController.java @@ -1,29 +1,21 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.alibaba.fastjson.JSONObject; -import com.github.kfcfans.powerjob.common.ProcessorType; import com.github.kfcfans.powerjob.common.response.ResultDTO; -import com.github.kfcfans.powerjob.server.extension.LockService; -import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; -import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.github.kfcfans.powerjob.server.service.migrate.MigrateService; import lombok.extern.slf4j.Slf4j; -import org.springframework.data.jpa.domain.Specification; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; -import javax.persistence.criteria.Predicate; -import java.util.List; -import java.util.Set; /** * Help users upgrade from a low version of powerjob-server to a high version of powerjob-server * v4 means that this interface was upgraded from version v3.x to v4.x, and so on * * @author tjq + * @author Echo009 * @since 2021/2/23 */ @Slf4j @@ -31,59 +23,25 @@ import java.util.Set; @RequestMapping("/migrate") public class MigrateController { + @Resource - private LockService lockService; - @Resource - private JobInfoRepository jobInfoRepository; + private MigrateService migrateService; - @GetMapping("/v4/script") - public ResultDTO migrateScriptFromV3ToV4(Long appId) { - - JSONObject resultLog = new JSONObject(); - resultLog.put("docs", "https://www.yuque.com/powerjob/guidence/official_processor"); - resultLog.put("tips", "please add the maven dependency of 'powerjob-official-processors'"); - - String lock = "migrateScriptFromV3ToV4-" + appId; - boolean getLock = lockService.tryLock(lock, 60000); - if (!getLock) { - return ResultDTO.failed("get lock failed, maybe other migrate job is running"); - } - try { - Set convertedJobIds = Sets.newHashSet(); - - Specification specification = (Specification) (root, query, criteriaBuilder) -> { - List predicates = Lists.newLinkedList(); - List scriptJobTypes = Lists.newArrayList(ProcessorType.SHELL.getV(), ProcessorType.PYTHON.getV()); - predicates.add(criteriaBuilder.equal(root.get("appId"), appId)); - predicates.add(root.get("processorType").in(scriptJobTypes)); - return query.where(predicates.toArray(new Predicate[0])).getRestriction(); - }; - List scriptJobs = jobInfoRepository.findAll(specification); - - resultLog.put("scriptJobsNum", scriptJobs.size()); - resultLog.put("convertedJobIds", convertedJobIds); - - log.info("[MigrateScriptFromV3ToV4] script job num: {}", scriptJobs.size()); - scriptJobs.forEach(job -> { - - ProcessorType oldProcessorType = ProcessorType.of(job.getProcessorType()); - - job.setJobParams(job.getProcessorInfo()); - job.setProcessorType(ProcessorType.EMBEDDED_JAVA.getV()); - - if (oldProcessorType == ProcessorType.PYTHON) { - job.setProcessorInfo("tech.powerjob.official.processors.impl.script.PythonProcessor"); - } else { - job.setProcessorInfo("tech.powerjob.official.processors.impl.script.ShellProcessor"); - } - - jobInfoRepository.saveAndFlush(job); - convertedJobIds.add(job.getId()); - }); - return ResultDTO.success(resultLog); - } finally { - lockService.unlock(lock); - } + /** + * 修复对应 APP 下的任务信息 + */ + @RequestMapping("/v4/job") + public ResultDTO fixJobInfoFromV3ToV4(@RequestParam Long appId) { + return ResultDTO.success(migrateService.fixDeprecatedProcessType(appId)); } + /** + * 修复对应 APP 下的工作流信息 + */ + @RequestMapping("/v4/workflow") + public ResultDTO fixWorkflowInfoFromV3ToV4(@RequestParam Long appId){ + return ResultDTO.success(migrateService.fixWorkflowInfoFromV3ToV4(appId)); + } + + }