feat: 更新实体模型以及 DAG 相关数据模型

This commit is contained in:
Echo009 2021-01-25 17:09:40 +08:00
parent e1fc805a0b
commit 56993335e3
15 changed files with 359 additions and 105 deletions

View File

@ -46,8 +46,8 @@ class TestWorkflow extends ClientInitializer {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
nodes.add(new PEWorkflowDAG.Node(1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, "DAG-Node-2"));
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "DAG-Node-1"));
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "DAG-Node-2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));

View File

@ -38,25 +38,45 @@ public class PEWorkflowDAG implements Serializable {
@NoArgsConstructor
@AllArgsConstructor
public static class Node implements Serializable {
/**
* node id
* @since 20210128
*/
private Long nodeId;
private Long jobId;
/**
* job alias or job name
*/
private String jobName;
/**
* Instance running param, which is not required by DAG.
*/
/* Instance running param, which is not required by DAG. */
@JsonSerialize(using= ToStringSerializer.class)
private Long instanceId;
private Integer status;
private String result;
public Node(Long jobId, String jobName) {
private String jobParams;
private Integer status;
private String result;
/**
* instanceId will be null if disable .
*/
private Boolean enable;
private Boolean skipWhenFailed;
public Node(Long nodeId,Long jobId, String jobName) {
this.nodeId = nodeId;
this.jobId = jobId;
this.jobName = jobName;
}
}
/**
* Edge formed by two job ids.
* Edge formed by two node ids.
*/
@Data
@NoArgsConstructor

View File

@ -23,6 +23,7 @@ public class WorkflowDAGUtils {
/**
* 获取所有根节点
*
* @param peWorkflowDAG 点线表示法的DAG图
* @return 根节点列表
*/
@ -37,6 +38,7 @@ public class WorkflowDAGUtils {
/**
* 校验 DAG 是否有效
*
* @param peWorkflowDAG 点线表示法的 DAG
* @return true/false
*/
@ -61,13 +63,14 @@ public class WorkflowDAGUtils {
}
}
return true;
}catch (Exception ignore) {
} catch (Exception ignore) {
}
return false;
}
/**
* 将点线表示法的DAG图转化为引用表达法的DAG图
*
* @param PEWorkflowDAG 点线表示法的DAG图
* @return 引用表示法的DAG图
*/
@ -82,7 +85,7 @@ public class WorkflowDAGUtils {
// 创建节点
PEWorkflowDAG.getNodes().forEach(node -> {
Long jobId = node.getJobId();
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), jobId, node.getJobName(), null, InstanceStatus.WAITING_DISPATCH.getV(), null);
WorkflowDAG.Node n = new WorkflowDAG.Node(Lists.newLinkedList(), node.getNodeId(), jobId, node.getJobName(), InstanceStatus.WAITING_DISPATCH.getV());
id2Node.put(jobId, n);
// 初始阶段每一个点都设为顶点

View File

@ -18,21 +18,52 @@ import java.util.List;
@AllArgsConstructor
public class WorkflowDAG {
// DAG允许存在多个顶点
/**
* DAG允许存在多个顶点
*/
private List<Node> roots;
@Data
@NoArgsConstructor
@AllArgsConstructor
public static final class Node {
// 后继者子节点
private List<Node> successors;
private Long jobId;
private String jobName;
// 运行时参数
private Long instanceId; // 任务实例ID
private int status; // 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED
public Node(List<Node> successors, Long nodeId, Long jobId, String jobName, int status) {
this.successors = successors;
this.nodeId = nodeId;
this.jobId = jobId;
this.jobName = jobName;
this.status = status;
}
/**
* 后继者子节点
*/
private List<Node> successors;
/**
* node id
*
* @since 20210128
*/
private Long nodeId;
private Long jobId;
private String jobName;
/**
* 运行时信息
*/
private Long instanceId;
/**
* 状态 WAITING_DISPATCH -> RUNNING -> SUCCEED/FAILED/STOPPED
*/
private int status;
private String result;
/**
* instanceId will be null if disable .
*/
private Boolean enable;
private Boolean skipWhenFailed;
}
}

View File

@ -23,7 +23,9 @@ public class AppInfoDO {
private Long id;
private String appName;
// 应用分组密码
/**
* 应用分组密码
*/
private String password;
/**
@ -34,5 +36,6 @@ public class AppInfoDO {
private String currentServer;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -22,25 +22,38 @@ public class ContainerInfoDO {
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 所属的应用ID
/**
* 所属的应用ID
*/
private Long appId;
private String containerName;
// 容器类型枚举值为 ContainerSourceType
/**
* 容器类型枚举值为 ContainerSourceType
*/
private Integer sourceType;
// sourceType 决定JarFile -> String存储文件名称Git -> JSON包括 URLbranchusernamepassword
/**
* sourceType 决定JarFile -> String存储文件名称Git -> JSON包括 URLbranchusernamepassword
*/
private String sourceInfo;
// 版本 Jar包使用md5Git使用commitId前者32位后者40位不会产生碰撞
/**
* 版本 Jar包使用md5Git使用commitId前者32位后者40位不会产生碰撞
*/
private String version;
// 状态枚举值为 ContainerStatus
/**
* 状态枚举值为 ContainerStatus
*/
private Integer status;
// 上一次部署时间
/**
* 上一次部署时间
*/
private Date lastDeployTime;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -26,45 +26,70 @@ public class InstanceInfoDO {
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 任务ID
/**
* 任务ID
*/
private Long jobId;
// 任务所属应用的ID冗余提高查询效率
/**
* 任务所属应用的ID冗余提高查询效率
*/
private Long appId;
// 任务实例ID
/**
* 任务所属应用的ID冗余提高查询效率
*/
private Long instanceId;
// 任务实例参数
/**
* 任务实例参数动态
*/
@Lob
@Column
private String instanceParams;
// 该任务实例的类型普通/工作流InstanceType
/**
* 该任务实例的类型普通/工作流InstanceType
*/
private Integer type;
// 该任务实例所属的 workflow ID workflow 任务存在
/**
* 该任务实例所属的 workflow ID workflow 任务存在
*/
private Long wfInstanceId;
/**
* 任务状态 {@link InstanceStatus}
*/
private Integer status;
// 执行结果允许存储稍大的结果
/**
* 执行结果允许存储稍大的结果
*/
@Lob
@Column
private String result;
// 预计触发时间
/**
* 预计触发时间
*/
private Long expectedTriggerTime;
// 实际触发时间
/**
* 实际触发时间
*/
private Long actualTriggerTime;
// 结束时间
/**
* 结束时间
*/
private Long finishedTime;
// 最后上报时间
/**
* 最后上报时间
*/
private Long lastReportTime;
// TaskTracker地址
/**
* TaskTracker 地址
*/
private String taskTrackerAddress;
// 总共执行的次数用于重试判断
/**
* 总共执行的次数用于重试判断
*/
private Long runningTimes;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -29,66 +29,106 @@ public class JobInfoDO {
private Long id;
/* ************************** 任务基本信息 ************************** */
// 任务名称
/**
* 任务名称
*/
private String jobName;
// 任务描述
/**
* 任务描述
*/
private String jobDescription;
// 任务所属的应用ID
/**
* 任务所属的应用ID
*/
private Long appId;
// 任务自带的参数
/**
* 任务自带的参数
*/
private String jobParams;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
/**
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
/**
* 时间表达式CRON/NULL/LONG/LONG
*/
private String timeExpression;
/* ************************** 执行方式 ************************** */
// 执行类型单机/广播/MR
/**
* 执行类型单机/广播/MR
*/
private Integer executeType;
// 执行器类型Java/Shell
/**
* 执行器类型Java/Shell
*/
private Integer processorType;
// 执行器信息可能需要存储整个脚本文件
/**
* 执行器信息可能需要存储整个脚本文件
*/
@Lob
@Column
private String processorInfo;
/* ************************** 运行时配置 ************************** */
// 最大同时运行任务数默认 1
/**
* 最大同时运行任务数默认 1
*/
private Integer maxInstanceNum;
// 并发度同时执行某个任务的最大线程数量
/**
* 并发度同时执行某个任务的最大线程数量
*/
private Integer concurrency;
// 任务整体超时时间
/**
* 任务整体超时时间
*/
private Long instanceTimeLimit;
/* ************************** 重试配置 ************************** */
private Integer instanceRetryNum;
private Integer taskRetryNum;
// 1 正常运行2 停止不再调度
/**
* 1 正常运行2 停止不再调度
*/
private Integer status;
// 下一次调度时间
/**
* 下一次调度时间
*/
private Long nextTriggerTime;
/* ************************** 繁忙机器配置 ************************** */
// 最低CPU核心数量0代表不限
/**
* 最低CPU核心数量0代表不限
*/
private double minCpuCores;
// 最低内存空间单位 GB0代表不限
/**
* 最低内存空间单位 GB0代表不限
*/
private double minMemorySpace;
// 最低磁盘空间单位 GB0代表不限
/**
* 最低磁盘空间单位 GB0代表不限
*/
private double minDiskSpace;
/* ************************** 集群配置 ************************** */
// 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割
/**
* 指定机器运行空代表不限非空则只会使用其中的机器运行多值逗号分割
*/
private String designatedWorkers;
// 最大机器数量
/**
* 最大机器数量
*/
private Integer maxWorkerCount;
// 报警用户ID列表多值逗号分隔
/**
* 报警用户ID列表多值逗号分隔
*/
private String notifyUserIds;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -25,12 +25,15 @@ public class OmsLockDO {
private Long id;
private String lockName;
private String ownerIP;
// 最长持有锁的时间
private String ownerIP;
/**
* 最长持有锁的时间
*/
private Long maxLockTime;
private Date gmtCreate;
private Date gmtModified;
public OmsLockDO(String lockName, String ownerIP, Long maxLockTime) {

View File

@ -23,13 +23,13 @@ public class ServerInfoDO {
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
/**
* 服务器IP地址
*/
private String ip;
private Date gmtCreate;
private Date gmtModified;
public ServerInfoDO(String ip) {

View File

@ -23,18 +23,26 @@ public class UserInfoDO {
private Long id;
private String username;
private String password;
// 手机号
/**
* 手机号
*/
private String phone;
// 邮箱地址
/**
* 邮箱地址
*/
private String email;
// webHook
/**
* webHook
*/
private String webHook;
// 扩展字段
/**
* 扩展字段
*/
private String extra;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -27,33 +27,50 @@ public class WorkflowInfoDO {
private Long id;
private String wfName;
private String wfDescription;
// 所属应用ID
/**
* 所属应用ID
*/
private Long appId;
// 工作流的DAG图信息点线式DAG的json
/**
* 工作流的DAG图信息点线式DAG的json
*/
@Lob
@Column
private String peDAG;
/* ************************** 定时参数 ************************** */
// 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
/**
* 时间表达式类型CRON/API/FIX_RATE/FIX_DELAY
*/
private Integer timeExpressionType;
// 时间表达式CRON/NULL/LONG/LONG
/**
* 时间表达式CRON/NULL/LONG/LONG
*/
private String timeExpression;
// 最大同时运行的工作流个数默认 1
/**
* 最大同时运行的工作流个数默认 1
*/
private Integer maxWfInstanceNum;
// 1 正常运行2 停止不再调度
/**
* 1 正常运行2 停止不再调度
*/
private Integer status;
// 下一次调度时间
/**
* 下一次调度时间
*/
private Long nextTriggerTime;
// 工作流整体失败的报警
/**
* 工作流整体失败的报警
*/
private String notifyUserIds;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -25,36 +25,54 @@ public class WorkflowInstanceInfoDO {
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
// 任务所属应用的ID冗余提高查询效率
/**
* 任务所属应用的ID冗余提高查询效率
*/
private Long appId;
// workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
/**
* workflowInstanceId任务实例表都使用单独的ID作为主键以支持潜在的分表需求
*/
private Long wfInstanceId;
private Long workflowId;
// workflow 状态WorkflowInstanceStatus
/**
* workflow 状态WorkflowInstanceStatus
*/
private Integer status;
// 工作流启动参数
/**
* 工作流启动参数
*/
@Lob
@Column
private String wfInitParams;
/**
* 工作流上下文
*/
@Lob
@Column
private String wfContext;
@Lob
@Column
private String dag;
@Lob
@Column
private String result;
// 预计触发时间
/**
* 预计触发时间
*/
private Long expectedTriggerTime;
// 实际触发时间
/**
* 实际触发时间
*/
private Long actualTriggerTime;
// 结束时间
/**
* 结束时间
*/
private Long finishedTime;
private Date gmtCreate;
private Date gmtModified;
}

View File

@ -0,0 +1,73 @@
package com.github.kfcfans.powerjob.server.persistence.core.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
/**
* 工作流节点信息
* 记录了工作流中的任务节点个性化的配置信息
*
* @author Echo009
* @since 2021/1/23
*/
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(indexes = {@Index(columnList = "appId"), @Index(columnList = "workflowId")})
public class WorkflowNodeInfoDO {
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
@Column(nullable = false)
private Long appId;
@Column(nullable = false)
private Long workflowId;
/**
* 任务 ID
*/
@Column(nullable = false)
private Long jobId;
/**
* 节点别名默认为对应的任务名称
*/
@Column(nullable = false)
private String nodeAlias;
/**
* 节点参数
*/
@Lob
private String nodeParams;
/**
* 是否启用
*/
@Column(nullable = false)
private Boolean enable;
/**
* 是否允许失败跳过
*/
@Column(nullable = false)
private Boolean skipWhenFailed;
/**
* 创建时间
*/
@Column(nullable = false)
private Date gmtCreate;
/**
* 更新时间
*/
@Column(nullable = false)
private Date gmtModified;
}

View File

@ -25,8 +25,8 @@ public class DAGTest {
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
// 图1 1 -> 2 -> 1理论上报错
nodes.add(new PEWorkflowDAG.Node(1L, "1"));
nodes.add(new PEWorkflowDAG.Node(2L, "2"));
nodes.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
nodes.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
edges.add(new PEWorkflowDAG.Edge(2L, 1L));
System.out.println(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges)));
@ -35,10 +35,10 @@ public class DAGTest {
List<PEWorkflowDAG.Node> nodes2 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges2 = Lists.newLinkedList();
nodes2.add(new PEWorkflowDAG.Node(1L, "1"));
nodes2.add(new PEWorkflowDAG.Node(2L, "2"));
nodes2.add(new PEWorkflowDAG.Node(3L, "3"));
nodes2.add(new PEWorkflowDAG.Node(4L, "4"));
nodes2.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
nodes2.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
nodes2.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
nodes2.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
edges2.add(new PEWorkflowDAG.Edge(1L, 2L));
edges2.add(new PEWorkflowDAG.Edge(1L, 3L));
edges2.add(new PEWorkflowDAG.Edge(2L, 4L));
@ -66,10 +66,10 @@ public class DAGTest {
List<PEWorkflowDAG.Node> nodes3 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges3 = Lists.newLinkedList();
nodes3.add(new PEWorkflowDAG.Node(1L, "1"));
nodes3.add(new PEWorkflowDAG.Node(2L, "2"));
nodes3.add(new PEWorkflowDAG.Node(3L, "3"));
nodes3.add(new PEWorkflowDAG.Node(4L, "4"));
nodes3.add(new PEWorkflowDAG.Node(1L, 1L, "1"));
nodes3.add(new PEWorkflowDAG.Node(2L, 2L, "2"));
nodes3.add(new PEWorkflowDAG.Node(3L, 3L, "3"));
nodes3.add(new PEWorkflowDAG.Node(4L, 4L, "4"));
edges3.add(new PEWorkflowDAG.Edge(1L, 3L));
edges3.add(new PEWorkflowDAG.Edge(2L, 4L));