[fix] use fastjson to serialize references

tjq 2020-05-31 19:53:54 +08:00
parent aa805062b1
commit e722004d10
11 changed files with 152 additions and 27 deletions
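Why fastjson: the computational WorkflowDAG contains shared nodes (for a diamond such as 1 -> 2/3 -> 4, both branches point at the same node 4), and fastjson can serialize such shared references as "$ref" and restore object identity on parse, which Jackson does not do out of the box. A minimal standalone sketch of that behaviour, using a hypothetical Node class rather than the project's WorkflowDAG.Node:

import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.List;

public class FastJsonRefSketch {

    public static class Node {
        private long jobId;
        private List<Node> successors = new ArrayList<>();
        public Node() { }
        public Node(long jobId) { this.jobId = jobId; }
        public long getJobId() { return jobId; }
        public void setJobId(long jobId) { this.jobId = jobId; }
        public List<Node> getSuccessors() { return successors; }
        public void setSuccessors(List<Node> successors) { this.successors = successors; }
    }

    public static void main(String[] args) {
        // diamond DAG: 1 -> 2 -> 4 and 1 -> 3 -> 4, node 4 is shared
        Node root = new Node(1);
        Node left = new Node(2);
        Node right = new Node(3);
        Node shared = new Node(4);
        left.getSuccessors().add(shared);
        right.getSuccessors().add(shared);
        root.getSuccessors().add(left);
        root.getSuccessors().add(right);

        // with fastjson's default circular-reference detection, the second occurrence
        // of node 4 is written as {"$ref":"$.successors[0].successors[0]"}
        String json = JSON.toJSONString(root);
        System.out.println(json);

        // on parse the $ref is resolved, so both branches end at the same instance again
        Node parsed = JSON.parseObject(json, Node.class);
        System.out.println(parsed.getSuccessors().get(0).getSuccessors().get(0)
                == parsed.getSuccessors().get(1).getSuccessors().get(0));   // expected: true
    }
}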

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.kfcfans.oms.common.OmsException;
@ -16,7 +17,7 @@ public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
}
public static String toJSONString(Object obj) {
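With ALLOW_SINGLE_QUOTES enabled, the shared ObjectMapper tolerates single-quoted JSON input. A rough standalone sketch of what that permits (demo values, not the project's JsonUtils itself):

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class SingleQuoteSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
        // strict JSON would reject this input because of the single quotes
        Map<?, ?> map = mapper.readValue("{'jobName': 'demo'}", Map.class);
        System.out.println(map);   // {jobName=demo}
    }
}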

View File

@ -23,6 +23,7 @@
<jgit.version>5.7.0.202003110725-r</jgit.version>
<mvn.invoker.version>3.0.1</mvn.invoker.version>
<commons.net.version>3.6</commons.net.version>
<fastjson.version>1.2.68</fastjson.version>
<!-- skip this module when deploying -->
<maven.deploy.skip>true</maven.deploy.skip>
@ -122,6 +123,13 @@
<version>${mvn.invoker.version}</version>
</dependency>
<!-- fastJson (introduced to serialize DAG references) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- swagger2 -->
<dependency>

View File

@ -2,8 +2,10 @@ package com.github.kfcfans.oms.server.service;
import com.github.kfcfans.oms.server.persistence.core.model.InstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.InstanceInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInfoRepository;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.extern.slf4j.Slf4j;
@ -26,9 +28,12 @@ public class CacheService {
@Resource
private JobInfoRepository jobInfoRepository;
@Resource
private WorkflowInfoRepository workflowInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
private final Cache<Long, String> jobId2JobNameCache;
private final Cache<Long, String> workflowId2WorkflowNameCache;
private final Cache<Long, Long> instanceId2AppId;
private final Cache<Long, Long> jobId2AppId;
@ -38,6 +43,11 @@ public class CacheService {
.maximumSize(1024)
.build();
workflowId2WorkflowNameCache = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(1))
.maximumSize(1024)
.build();
instanceId2AppId = CacheBuilder.newBuilder()
.maximumSize(4096)
.build();
@ -48,6 +58,8 @@ public class CacheService {
/**
* Query the jobName by jobId (no data-consistency guarantee; strictly speaking, the cached value is guaranteed to be stale once the data changes, hhh)
* @param jobId job ID
* @return job name
*/
public String getJobName(Long jobId) {
try {
@ -57,7 +69,25 @@ public class CacheService {
return jobInfoDOOptional.map(JobInfoDO::getJobName).orElse("");
});
}catch (Exception e) {
log.error("[CacheService] getAppIdByInstanceId for {} failed.", jobId, e);
log.error("[CacheService] getJobName for {} failed.", jobId, e);
}
return null;
}
/**
* Query the workflow name by workflowId
* @param workflowId workflow ID
* @return workflow name
*/
public String getWorkflowName(Long workflowId) {
try {
return workflowId2WorkflowNameCache.get(workflowId, () -> {
Optional<WorkflowInfoDO> workflowInfoOptional = workflowInfoRepository.findById(workflowId);
// guard against cache penetration, hhh (misbehaves if the workflow doesn't exist at first and is created later, but the impact is minor, so it's ignored here)
return workflowInfoOptional.map(WorkflowInfoDO::getWfName).orElse("");
});
}catch (Exception e) {
log.error("[CacheService] getWorkflowName for {} failed.", workflowId, e);
}
return null;
}
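The new workflowId2WorkflowNameCache follows the same get-with-loader pattern as the job-name cache: load from the repository on a miss, then serve the cached name for up to a minute. A small standalone sketch of that Guava behaviour (demo values only, no repository involved):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.time.Duration;

public class NameCacheSketch {
    public static void main(String[] args) throws Exception {
        Cache<Long, String> workflowId2Name = CacheBuilder.newBuilder()
                .expireAfterWrite(Duration.ofMinutes(1))
                .maximumSize(1024)
                .build();

        // get(key, loader) only runs the loader on a cache miss
        String first = workflowId2Name.get(1L, () -> {
            System.out.println("load from repository");
            return "demo-workflow";
        });
        String second = workflowId2Name.get(1L, () -> "never-called");

        System.out.println(first + " / " + second);   // demo-workflow / demo-workflow, loader ran once
    }
}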

View File

@ -1,11 +1,12 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.persistence.core.model.JobInfoDO;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInfoDO;
@ -44,6 +45,11 @@ public class WorkflowInstanceManager {
@Resource
private WorkflowInstanceInfoRepository workflowInstanceInfoRepository;
static {
// Enable FastJSON's reference serialization (on by default in the current version, but declared explicitly to be safe)
JSONObject.DEFAULT_GENERATE_FEATURE = SerializerFeature.DisableCircularReferenceDetect.getMask();
}
/**
* Create a workflow instance
* @param wfInfo workflow metadata (the workflow's descriptive information)
@ -65,8 +71,8 @@ public class WorkflowInstanceManager {
try {
// Convert the presentation (PE) DAG into the DAG used for computation
WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JsonUtils.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class));
newWfInstance.setDag(JsonUtils.toJSONString(workflowDAG));
WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class));
newWfInstance.setDag(JSONObject.toJSONString(workflowDAG));
newWfInstance.setStatus(WorkflowInstanceStatus.WAITING.getV());
}catch (Exception e) {
@ -112,7 +118,7 @@ public class WorkflowInstanceManager {
}
try {
WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JsonUtils.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class));
WorkflowDAG workflowDAG = WorkflowDAGUtils.convert(JSONObject.parseObject(wfInfo.getPeDAG(), PEWorkflowDAG.class));
// Run the root task; fail immediately if the root task cannot be found
WorkflowDAG.Node root = workflowDAG.getRoot();
@ -123,7 +129,7 @@ public class WorkflowInstanceManager {
// Persist
wfInstanceInfo.setStatus(WorkflowInstanceStatus.RUNNING.getV());
wfInstanceInfo.setDag(JsonUtils.toJSONStringUnsafe(workflowDAG));
wfInstanceInfo.setDag(JSONObject.toJSONString(workflowDAG));
workflowInstanceInfoRepository.saveAndFlush(wfInstanceInfo);
log.info("[Workflow-{}] start workflow successfully, wfInstanceId={}", wfInfo.getId(), wfInstanceId);
@ -161,7 +167,7 @@ public class WorkflowInstanceManager {
log.debug("[Workflow-{}] one task in dag finished, wfInstanceId={},instanceId={},success={},result={}", wfId, wfInstanceId, instanceId, success, result);
try {
WorkflowDAG dag = JsonUtils.parseObject(wfInstance.getDag(), WorkflowDAG.class);
WorkflowDAG dag = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
// Work out whether any new nodes need to be dispatched; relyMap is a bottom-up mapping used to check whether all parent nodes have finished
Map<Long, WorkflowDAG.Node> jobId2Node = Maps.newHashMap();
@ -186,7 +192,7 @@ public class WorkflowInstanceManager {
// A task failed, so the DAG flow is interrupted and the whole workflow instance fails
if (!success) {
wfInstance.setDag(JsonUtils.toJSONStringUnsafe(dag));
wfInstance.setDag(JSONObject.toJSONString(dag));
wfInstance.setStatus(WorkflowInstanceStatus.FAILED.getV());
wfInstance.setResult(SystemInstanceResult.MIDDLE_JOB_FAILED);
wfInstance.setFinishedTime(System.currentTimeMillis());
@ -221,11 +227,11 @@ public class WorkflowInstanceManager {
// Build the next task's input params: predecessor jobId -> result
relyMap.get(jobId).forEach(jid -> preJobId2Result.put(jid, jobId2Node.get(jid).getResult()));
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JsonUtils.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
Long newInstanceId = instanceService.create(jobId, wfInstance.getAppId(), JSONObject.toJSONString(preJobId2Result), wfInstanceId, System.currentTimeMillis());
jobId2Node.get(jobId).setInstanceId(newInstanceId);
jobId2InstanceId.put(jobId, newInstanceId);
jobId2InstanceParams.put(jobId, JsonUtils.toJSONString(preJobId2Result));
jobId2InstanceParams.put(jobId, JSONObject.toJSONString(preJobId2Result));
log.debug("[Workflow-{}] workflowInstance(wfInstanceId={}) start to process new node(jobId={},instanceId={})", wfId, wfInstanceId, jobId, newInstanceId);
});
@ -238,7 +244,7 @@ public class WorkflowInstanceManager {
log.info("[Workflow-{}] workflowInstance(wfInstanceId={}) process successfully.", wfId, wfInstanceId);
}
wfInstance.setDag(JsonUtils.toJSONString(dag));
wfInstance.setDag(JSONObject.toJSONString(dag));
workflowInstanceInfoRepository.saveAndFlush(wfInstance);
// After persisting, start dispatching all runnable tasks

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.SystemInstanceResult;
import com.github.kfcfans.oms.common.WorkflowInstanceStatus;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
@ -54,7 +54,7 @@ public class WorkflowInstanceService {
wfInstanceInfoRepository.saveAndFlush(wfInstance);
// Stop all instances that have started but not yet finished
WorkflowDAG workflowDAG = JsonUtils.parseObjectUnsafe(wfInstance.getDag(), WorkflowDAG.class);
WorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), WorkflowDAG.class);
Queue<WorkflowDAG.Node> queue = Queues.newLinkedBlockingQueue();
queue.add(workflowDAG.getRoot());
while (!queue.isEmpty()) {

View File

@ -1,9 +1,9 @@
package com.github.kfcfans.oms.server.service.workflow;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.OmsException;
import com.github.kfcfans.oms.common.TimeExpressionType;
import com.github.kfcfans.oms.common.request.http.SaveWorkflowRequest;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.github.kfcfans.oms.server.common.SJ;
import com.github.kfcfans.oms.server.common.constans.SwitchableStatus;
@ -53,7 +53,7 @@ public class WorkflowService {
BeanUtils.copyProperties(req, wf);
wf.setGmtModified(new Date());
wf.setPeDAG(JsonUtils.toJSONString(req.getPEWorkflowDAG()));
wf.setPeDAG(JSONObject.toJSONString(req.getPEWorkflowDAG()));
wf.setStatus(req.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
wf.setTimeExpressionType(req.getTimeExpressionType().getV());

View File

@ -137,6 +137,11 @@ public class InstanceController {
// Convert IDs to String (long IDs lose precision in JS)
instanceInfoVO.setJobId(instanceLogDO.getJobId().toString());
instanceInfoVO.setInstanceId(instanceLogDO.getInstanceId().toString());
if (instanceLogDO.getWfInstanceId() == null) {
instanceInfoVO.setWfInstanceId(OmsConstant.NONE);
}else {
instanceInfoVO.setWfInstanceId(String.valueOf(instanceLogDO.getWfInstanceId()));
}
// Format timestamps
if (instanceLogDO.getActualTriggerTime() == null) {
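wfInstanceId is exposed as a String for the same reason as jobId and instanceId: JavaScript numbers are IEEE-754 doubles, so integer IDs above 2^53 - 1 silently lose precision in the browser. A one-method illustration of the effect (arbitrary example ID):

public class JsPrecisionSketch {
    public static void main(String[] args) {
        // arbitrary snowflake-style ID, well above 2^53 - 1 (Number.MAX_SAFE_INTEGER in JS)
        long wfInstanceId = 123456789012345678L;
        // adjacent long values collapse onto the same double, which is all the front end would see
        System.out.println((double) wfInstanceId == (double) (wfInstanceId + 1));   // true
    }
}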

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.oms.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.PageResult;
import com.github.kfcfans.oms.server.persistence.core.model.WorkflowInstanceInfoDO;
import com.github.kfcfans.oms.server.persistence.core.repository.WorkflowInstanceInfoRepository;
import com.github.kfcfans.oms.server.service.CacheService;
import com.github.kfcfans.oms.server.service.workflow.WorkflowInstanceService;
import com.github.kfcfans.oms.server.web.request.QueryWorkflowInstanceRequest;
import com.github.kfcfans.oms.server.web.response.WorkflowInstanceInfoVO;
@ -25,6 +26,8 @@ import java.util.stream.Collectors;
@RequestMapping("/wfInstance")
public class WorkflowInstanceController {
@Resource
private CacheService cacheService;
@Resource
private WorkflowInstanceService workflowInstanceService;
@Resource
@ -52,9 +55,9 @@ public class WorkflowInstanceController {
return ResultDTO.success(convertPage(ps));
}
private static PageResult<WorkflowInstanceInfoVO> convertPage(Page<WorkflowInstanceInfoDO> ps) {
private PageResult<WorkflowInstanceInfoVO> convertPage(Page<WorkflowInstanceInfoDO> ps) {
PageResult<WorkflowInstanceInfoVO> pr = new PageResult<>(ps);
pr.setData(ps.getContent().stream().map(WorkflowInstanceInfoVO::from).collect(Collectors.toList()));
pr.setData(ps.getContent().stream().map(x -> WorkflowInstanceInfoVO.from(x, cacheService.getWorkflowName(x.getWorkflowId()))).collect(Collectors.toList()));
return pr;
}
}

View File

@ -18,7 +18,7 @@ public class InstanceInfoVO {
// Task instance ID (String to avoid JS precision loss)
private String instanceId;
// ID of the workflow instance this task instance belongs to (only present for workflow tasks)
private Long wfInstanceId;
private String wfInstanceId;
// Execution result
private String result;

View File

@ -20,16 +20,16 @@ import org.springframework.beans.BeanUtils;
@Data
public class WorkflowInstanceInfoVO {
// ID of the app the task belongs to (redundant, kept to speed up queries)
private Long appId;
// workflowInstanceId (instance tables each use their own ID as the primary key, to allow for potential table sharding)
private Long wfInstanceId;
private String wfInstanceId;
private Long workflowId;
private String workflowId;
// Workflow name (looked up via workflowId)
private String workflowName;
// Workflow status (WorkflowInstanceStatus)
private String status;
private Integer status;
private String statusStr;
private PEWorkflowDAG pEWorkflowDAG;
private String result;
@ -39,13 +39,18 @@ public class WorkflowInstanceInfoVO {
// Finish time (likewise needs formatting)
private String finishedTime;
public static WorkflowInstanceInfoVO from(WorkflowInstanceInfoDO wfInstanceDO) {
public static WorkflowInstanceInfoVO from(WorkflowInstanceInfoDO wfInstanceDO, String workflowName) {
WorkflowInstanceInfoVO vo = new WorkflowInstanceInfoVO();
BeanUtils.copyProperties(wfInstanceDO, vo);
vo.setStatus(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes());
vo.setWorkflowName(workflowName);
vo.setStatusStr(WorkflowInstanceStatus.of(wfInstanceDO.getStatus()).getDes());
vo.setPEWorkflowDAG(WorkflowDAGUtils.convert2PE(JsonUtils.parseObjectUnsafe(wfInstanceDO.getDag(), WorkflowDAG.class)));
// Avoid the JS precision-loss problem
vo.setWfInstanceId(String.valueOf(wfInstanceDO.getWfInstanceId()));
vo.setWorkflowId(String.valueOf(wfInstanceDO.getWorkflowId()));
// Format timestamps
vo.setActualTriggerTime(DateFormatUtils.format(wfInstanceDO.getActualTriggerTime(), OmsConstant.TIME_PATTERN));
if (wfInstanceDO.getFinishedTime() == null) {

View File

@ -0,0 +1,67 @@
package com.github.kfcfans.oms.server.test;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.common.model.PEWorkflowDAG;
import com.github.kfcfans.oms.common.model.WorkflowDAG;
import com.github.kfcfans.oms.common.utils.JsonUtils;
import com.github.kfcfans.oms.common.utils.WorkflowDAGUtils;
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.List;
/**
* DAG graph algorithm test suite
*
* @author tjq
* @since 2020/5/31
*/
public class DAGTest {
@Test
public void testDAGUtils() throws Exception {
List<PEWorkflowDAG.Node> nodes = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges = Lists.newLinkedList();
// Graph 1: 1 -> 2 -> 1, should be rejected as invalid
nodes.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
edges.add(new PEWorkflowDAG.Edge(1L, 2L));
edges.add(new PEWorkflowDAG.Edge(2L, 1L));
System.out.println(WorkflowDAGUtils.valid(new PEWorkflowDAG(nodes, edges)));
// Graph 2: 1 -> 2/3 -> 4
List<PEWorkflowDAG.Node> nodes2 = Lists.newLinkedList();
List<PEWorkflowDAG.Edge> edges2 = Lists.newLinkedList();
nodes2.add(new PEWorkflowDAG.Node(1L, "1", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(2L, "2", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(3L, "3", null, false, null));
nodes2.add(new PEWorkflowDAG.Node(4L, "4", null, false, null));
edges2.add(new PEWorkflowDAG.Edge(1L, 2L));
edges2.add(new PEWorkflowDAG.Edge(1L, 3L));
edges2.add(new PEWorkflowDAG.Edge(2L, 4L));
edges2.add(new PEWorkflowDAG.Edge(3L, 4L));
PEWorkflowDAG validPEDAG = new PEWorkflowDAG(nodes2, edges2);
System.out.println(WorkflowDAGUtils.valid(validPEDAG));
WorkflowDAG wfDAG = WorkflowDAGUtils.convert(validPEDAG);
System.out.println("jackson");
System.out.println(JsonUtils.toJSONString(wfDAG));
// Jackson doesn't know how to serialize references, so give up on it and use FastJSON's $ref reference serialization instead
WorkflowDAG wfDAGByJackSon = JsonUtils.parseObject(JsonUtils.toJSONString(wfDAG), WorkflowDAG.class);
System.out.println("fastJson");
System.out.println(JSONObject.toJSONString(wfDAG));
WorkflowDAG wfDAGByFastJSON = JSONObject.parseObject(JSONObject.toJSONString(wfDAG), WorkflowDAG.class);
// Set a breakpoint here to inspect the reference relationships
System.out.println(wfDAGByJackSon);
System.out.println(wfDAGByFastJSON);
}
}