change serialization framwork(built-in java -> jackson-cbor)

This commit is contained in:
tjq 2020-04-16 19:37:41 +08:00
parent a8354f4cf4
commit 241dce57fa
45 changed files with 191 additions and 108 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@ -16,7 +16,6 @@
<properties>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<junit.version>5.6.1</junit.version>
<fastjson.version>1.2.68</fastjson.version>
</properties>
<dependencies>
@ -26,12 +25,6 @@
<artifactId>oh-my-scheduler-common</artifactId>
<version>${oms.common.version}</version>
</dependency>
<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Junit 测试 -->
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -1,9 +1,9 @@
package com.github.kfcfans.oms.client;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.OpenAPIConstant;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.HttpUtils;
import com.github.kfcfans.common.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import okhttp3.FormBody;
import okhttp3.RequestBody;
@ -43,7 +43,7 @@ public class OhMyClient {
String url = getUrl(OpenAPIConstant.ASSERT) + "?appName=" + appName;
String result = HttpUtils.get(url);
if (StringUtils.isNotEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
appId = Long.parseLong(resultDTO.getData().toString());
}else {
@ -72,7 +72,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
@ -88,7 +88,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
@ -108,7 +108,7 @@ public class OhMyClient {
builder.add("instanceParams", instanceParams);
}
String post = HttpUtils.post(url, builder.build());
return JSONObject.parseObject(post, ResultDTO.class);
return JsonUtils.parseObject(post, ResultDTO.class);
}
public ResultDTO<Long> runJob(Long jobId) throws Exception {
return runJob(jobId, null);
@ -128,7 +128,7 @@ public class OhMyClient {
.add("appId", appId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JsonUtils.parseObject(post, ResultDTO.class);
}
/**
@ -143,6 +143,6 @@ public class OhMyClient {
.add("instanceId", instanceId.toString())
.build();
String post = HttpUtils.post(url, body);
return JSONObject.parseObject(post, ResultDTO.class);
return JsonUtils.parseObject(post, ResultDTO.class);
}
}

View File

@ -18,6 +18,7 @@
<commons.lang.version>3.10</commons.lang.version>
<guava.version>28.2-jre</guava.version>
<okhttp.version>4.4.1</okhttp.version>
<akka.version>2.6.4</akka.version>
</properties>
<dependencies>
@ -48,6 +49,18 @@
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-serialization-jackson_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,12 @@
package com.github.kfcfans.common;
import java.io.Serializable;
/**
* OMS 序列化接口
*
* @author tjq
* @since 2020/4/16
*/
public interface OmsSerializable extends Serializable {
}

View File

@ -1,11 +1,10 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 服务器查询实例运行状态需要返回详细的运行数据
*
@ -15,6 +14,6 @@ import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerQueryInstanceStatusReq implements Serializable {
public class ServerQueryInstanceStatusReq implements OmsSerializable {
private Long instanceId;
}

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
@ -12,7 +12,7 @@ import java.util.List;
* @since 2020/3/17
*/
@Data
public class ServerScheduleJobReq implements Serializable {
public class ServerScheduleJobReq implements OmsSerializable {
// 可用处理器地址可能多值逗号分隔
private List<String> allWorkerAddress;

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 服务器要求任务实例停止执行请求
@ -15,6 +15,6 @@ import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerStopInstanceReq implements Serializable {
public class ServerStopInstanceReq implements OmsSerializable {
private Long instanceId;
}

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
/**
* TaskTracker 将状态上报给服务器
@ -11,7 +11,7 @@ import java.io.Serializable;
* @since 2020/3/17
*/
@Data
public class TaskTrackerReportInstanceStatusReq implements Serializable {
public class TaskTrackerReportInstanceStatusReq implements OmsSerializable {
private Long jobId;
private Long instanceId;

View File

@ -1,9 +1,9 @@
package com.github.kfcfans.common.request;
import com.github.kfcfans.common.OmsSerializable;
import com.github.kfcfans.common.model.SystemMetrics;
import lombok.Data;
import java.io.Serializable;
/**
* Worker 上报健康信息worker定时发送的heartbeat
@ -12,7 +12,7 @@ import java.io.Serializable;
* @since 2020/3/25
*/
@Data
public class WorkerHeartbeat implements Serializable {
public class WorkerHeartbeat implements OmsSerializable {
// 本机地址 -> IP:port
private String workerAddress;

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.common.response;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* Pattens.ask 的响应
@ -15,7 +15,7 @@ import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AskResponse implements Serializable {
public class AskResponse implements OmsSerializable {
private boolean success;
private Object extra;

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.common.response;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@ -14,7 +15,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
@Getter
@Setter
@ToString
public class ResultDTO<T> {
public class ResultDTO<T> implements OmsSerializable {
private boolean success;
// 数据success为 true 时存在

View File

@ -0,0 +1,27 @@
package com.github.kfcfans.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* JSON工具类
*
* @author tjq
* @since 2020/4/16
*/
public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
public static String toJSONString(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
}catch (Exception ignore) {
}
return null;
}
public static <T> T parseObject(String json, Class<T> clz) throws JsonProcessingException {
return objectMapper.readValue(json, clz);
}
}

View File

@ -15,7 +15,6 @@
<properties>
<swagger.version>2.9.2</swagger.version>
<akka.version>2.6.4</akka.version>
<springboot.version>2.2.6.RELEASE</springboot.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<hikaricp.version>3.4.2</hikaricp.version>
@ -46,13 +45,6 @@
<version>${mysql.version}</version>
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
@ -84,7 +76,6 @@
<version>${swagger.version}</version>
</dependency>
</dependencies>
<!-- SpringBoot专用的打包插件 -->

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 查询 Worker 集群状态
@ -15,6 +15,6 @@ import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FriendQueryWorkerClusterStatusReq implements Serializable {
public class FriendQueryWorkerClusterStatusReq implements OmsSerializable {
private Long appId;
}

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
/**
* 检测目标机器是否存活
@ -11,6 +11,6 @@ import java.io.Serializable;
* @since 2020/4/5
*/
@Data
public class Ping implements Serializable {
public class Ping implements OmsSerializable {
private long currentTime;
}

View File

@ -46,9 +46,9 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
int update4FrequentJob(long instanceId, int status, long runningTimes);
// 状态检查三兄弟对应 WAITING_DISPATCH WAITING_WORKER_RECEIVE RUNNING 三阶段
List<InstanceLogDO> findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByJobIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByJobIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
List<InstanceLogDO> findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByAppIdInAndStatusAndActualTriggerTimeLessThan(List<Long> jobIds, int status, long time);
List<InstanceLogDO> findByAppIdInAndStatusAndGmtModifiedBefore(List<Long> jobIds, int status, Date time);
InstanceLogDO findByInstanceId(long instanceId);

View File

@ -73,7 +73,7 @@ public class InstanceStatusCheckService {
// 1. 检查等待 WAITING_DISPATCH 状态的任务
long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
List<InstanceLogDO> waitingDispatchInstances = instanceLogRepository.findByJobIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
List<InstanceLogDO> waitingDispatchInstances = instanceLogRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingDispatchInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) is not triggered as expected.", waitingDispatchInstances);
waitingDispatchInstances.forEach(instance -> {
@ -85,7 +85,7 @@ public class InstanceStatusCheckService {
// 2. 检查 WAITING_WORKER_RECEIVE 状态的任务
threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
List<InstanceLogDO> waitingWorkerReceiveInstances = instanceLogRepository.findByJobIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
List<InstanceLogDO> waitingWorkerReceiveInstances = instanceLogRepository.findByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold);
if (!CollectionUtils.isEmpty(waitingWorkerReceiveInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) did nt receive any reply from worker.", waitingWorkerReceiveInstances);
waitingWorkerReceiveInstances.forEach(instance -> {
@ -97,7 +97,7 @@ public class InstanceStatusCheckService {
// 3. 检查 RUNNING 状态的任务一定时间没收到 TaskTracker 的状态报告视为失败
threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
List<InstanceLogDO> failedInstances = instanceLogRepository.findByJobIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
List<InstanceLogDO> failedInstances = instanceLogRepository.findByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold));
if (!CollectionUtils.isEmpty(failedInstances)) {
log.warn("[InstanceStatusCheckService] instances({}) has not received status report for a long time.", failedInstances);
failedInstances.forEach(instance -> {

View File

@ -3,7 +3,8 @@ server.port=7700
# db config
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://remotehost:3391/oms?charset=utf8mb4&useSSL=false
# JDBC配置不支持utf8mb4需要更改my.conf
spring.datasource.url=jdbc:mysql://remotehost:3391/oms?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=No1Bug2Please3!
# Hikari 数据源专用配置

View File

@ -3,7 +3,11 @@ akka {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
# TODO : 临时使用 Java 序列化,开发完成后切换到 protocol-buffers
allow-java-serialization = on
allow-java-serialization = off
serialization-bindings {
"com.github.kfcfans.common.OmsSerializable" = jackson-cbor
}
}
remote {
artery {

View File

@ -15,14 +15,11 @@
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<akka.version>2.6.4</akka.version>
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>
<junit.version>5.6.1</junit.version>
<kryo.version>5.0.0-RC5</kryo.version>
<fastjson.version>1.2.68</fastjson.version>
<okhttp.version>4.4.1</okhttp.version>
<commons.io.version>2.6</commons.io.version>
</properties>
@ -36,13 +33,6 @@
<!-- <scope>provided</scope>-->
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- oms-common -->
<dependency>
<groupId>com.github.kfcfans</groupId>
@ -78,20 +68,6 @@
<scope>test</scope>
</dependency>
<!-- OKHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- commons-io -->
<dependency>
<groupId>commons-io</groupId>
@ -99,8 +75,6 @@
<version>${commons.io.version}</version>
</dependency>
<!-- 开发阶段输出日志 -->
<dependency>
<groupId>ch.qos.logback</groupId>

View File

@ -2,9 +2,9 @@ package com.github.kfcfans.oms.worker;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.actors.ProcessorTrackerActor;
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
import com.github.kfcfans.oms.worker.background.ServerDiscoveryService;
@ -131,7 +131,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
String realUrl = String.format(url, server, appName);
try {
String resultDTOStr = CommonUtils.executeWithRetry0(() -> HttpUtils.get(realUrl));
ResultDTO resultDTO = JSONObject.parseObject(resultDTOStr, ResultDTO.class);
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.worker.background;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.CommonUtils;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.common.utils.HttpUtils;
import com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker;
@ -95,9 +95,12 @@ public class ServerDiscoveryService {
}catch (Exception ignore) {
}
if (!StringUtils.isEmpty(result)) {
ResultDTO resultDTO = JSONObject.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
try {
ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
if (resultDTO.isSuccess()) {
return resultDTO.getData().toString();
}
}catch (Exception ignore) {
}
}
return null;

View File

@ -27,7 +27,7 @@ public class TaskPersistenceService {
// 默认重试参数
private static final int RETRY_TIMES = 3;
private static final long RETRY_INTERVAL_MS = 100;
private static final long RETRY_INTERVAL_MS = 200;
private static volatile boolean initialized = false;
public static TaskPersistenceService INSTANCE = new TaskPersistenceService();

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
/**
* 广播任务 preExecute 结束信息
@ -11,7 +11,7 @@ import java.io.Serializable;
* @since 2020/3/23
*/
@Data
public class BroadcastTaskPreExecuteFinishedReq implements Serializable {
public class BroadcastTaskPreExecuteFinishedReq implements OmsSerializable {
private Long instanceId;
private Long subInstanceId;

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.OmsSerializable;
import com.github.kfcfans.oms.worker.common.ThreadLocalStore;
import com.github.kfcfans.oms.worker.common.utils.SerializerUtils;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
@ -8,7 +9,6 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
@ -19,7 +19,7 @@ import java.util.List;
*/
@Getter
@NoArgsConstructor
public class ProcessorMapTaskRequest implements Serializable {
public class ProcessorMapTaskRequest implements OmsSerializable {
private Long instanceId;
private Long subInstanceId;

View File

@ -1,10 +1,10 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* worker 上报 task 执行情况
@ -15,7 +15,7 @@ import java.io.Serializable;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProcessorReportTaskStatusReq implements Serializable {
public class ProcessorReportTaskStatusReq implements OmsSerializable {
private Long instanceId;
private String taskId;

View File

@ -1,11 +1,10 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.utils.NetUtils;
import com.github.kfcfans.common.OmsSerializable;
import com.github.kfcfans.oms.worker.OhMyWorker;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* ProcessorTracker 定时向 TaskTracker 上报健康状态
@ -15,7 +14,7 @@ import java.io.Serializable;
*/
@Data
@NoArgsConstructor
public class ProcessorTrackerStatusReportReq implements Serializable {
public class ProcessorTrackerStatusReportReq implements OmsSerializable {
private Long instanceId;

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.OmsSerializable;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.persistence.TaskDO;
import com.github.kfcfans.oms.worker.pojo.model.InstanceInfo;
@ -7,7 +8,6 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* TaskTracker 派发 task 进行执行
@ -18,7 +18,7 @@ import java.io.Serializable;
@Getter
@Setter
@NoArgsConstructor
public class TaskTrackerStartTaskReq implements Serializable {
public class TaskTrackerStartTaskReq implements OmsSerializable {
// TaskTracker 地址
private String taskTrackerAddress;

View File

@ -1,8 +1,8 @@
package com.github.kfcfans.oms.worker.pojo.request;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
/**
* TaskTracker 停止 ProcessorTracker释放相关资源
@ -12,7 +12,7 @@ import java.io.Serializable;
* @since 2020/3/25
*/
@Data
public class TaskTrackerStopInstanceReq implements Serializable {
public class TaskTrackerStopInstanceReq implements OmsSerializable {
private Long instanceId;
// 保留字段暂时没用

View File

@ -2,8 +2,11 @@ akka {
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
# TODO : 临时使用 Java 序列化,开发完成后切换到 protocol-buffers
allow-java-serialization = on
allow-java-serialization = off
serialization-bindings {
"com.github.kfcfans.common.OmsSerializable" = jackson-cbor
}
}
remote {
artery {

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
@ -16,7 +16,7 @@ public class TestBasicProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("======== BasicProcessor#process ========");
System.out.println("TaskContext: " + JSONObject.toJSONString(context) + ";time = " + System.currentTimeMillis());
System.out.println("TaskContext: " + JsonUtils.toJSONString(context) + ";time = " + System.currentTimeMillis());
return new ProcessResult(true, System.currentTimeMillis() + "success");
}

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
@ -17,14 +17,14 @@ public class TestBroadcastProcessor implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext taskContext) throws Exception {
System.out.println("=============== TestBroadcastProcessor#preProcess ===============");
System.out.println("taskContext:" + JSONObject.toJSONString(taskContext));
System.out.println("taskContext:" + JsonUtils.toJSONString(taskContext));
return new ProcessResult(true, "preProcess success");
}
@Override
public ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception {
System.out.println("=============== TestBroadcastProcessor#postProcess ===============");
System.out.println("taskContext:" + JSONObject.toJSONString(taskContext));
System.out.println("taskContext:" + JsonUtils.toJSONString(taskContext));
System.out.println("taskId2Result:" + taskId2Result);
return new ProcessResult(true, "postProcess success");
}
@ -32,7 +32,7 @@ public class TestBroadcastProcessor implements BroadcastProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("=============== TestBroadcastProcessor#process ===============");
System.out.println("taskContext:" + JSONObject.toJSONString(context));
System.out.println("taskContext:" + JsonUtils.toJSONString(context));
return new ProcessResult(true, "processSuccess");
}
}

View File

@ -1,6 +1,6 @@
package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
@ -24,7 +24,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
@Override
public ProcessResult reduce(TaskContext taskContext, Map<String, String> taskId2Result) {
System.out.println("============== TestMapReduceProcessor#reduce ==============");
System.out.println("taskContext:" + JSONObject.toJSONString(taskContext));
System.out.println("taskContext:" + JsonUtils.toJSONString(taskContext));
System.out.println("taskId2Result:" + taskId2Result);
return new ProcessResult(true, "REDUCE_SUCCESS");
}
@ -33,7 +33,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("============== TestMapReduceProcessor#process ==============");
System.out.println("isRootTask:" + isRootTask());
System.out.println("taskContext:" + JSONObject.toJSONString(context));
System.out.println("taskContext:" + JsonUtils.toJSONString(context));
if (isRootTask()) {
System.out.println("==== MAP ====");
@ -50,7 +50,7 @@ public class TestMapReduceProcessor extends MapReduceProcessor {
return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("==== NORMAL_PROCESS ====");
System.out.println("subTask: " + JSONObject.toJSONString(context.getSubTask()));
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
Thread.sleep(1000);
if (context.getCurrentRetryTimes() == 0) {
return new ProcessResult(false, "FIRST_FAILED");

BIN
oh-my-scheduler-worker/~/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1 @@
print 'Hello World!'

View File

@ -0,0 +1 @@
ls -a

View File

@ -0,0 +1,17 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="/favicon.ico">
<title>oms-console</title>
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
<body>
<noscript>
<strong>We're sorry but oms-console doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>
<div id="app"></div>
<!-- built files will be auto injected -->
<script type="text/javascript" src="/js/chunk-vendors.js"></script><script type="text/javascript" src="/js/app.js"></script></body>
</html>

View File

@ -0,0 +1 @@
ls -a

View File

@ -0,0 +1 @@
pwd

BIN
oh-my-scheduler-worker/~/oms/.DS_Store vendored Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -48,4 +48,46 @@ java.lang.RuntimeException: create root task failed.
#### SystemMetric算分问题
问题java.lang.management.OperatingSystemMXBean#getSystemLoadAverage 不一定能获取CPU当前负载可能返回负数代表不可用...
解决方案印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧...
解决方案印度Windows上getSystemLoadAverage()固定返回-1...太坑了...先做个保护性判断继续测试吧...
#### 未知的数组越界问题(可能是数据库性能问题)
秒级Broadcast任务在第四次执行时当Processor完成执行上报状态时TaskTracker报错错误的本质原因是无法从数据库中找到这个task对应的记录...
时间表达式FIX_DELAY对应的TaskTracker为FrequentTaskTracker
异常堆栈
```text
2020-04-16 18:05:09 ERROR - [TaskPersistenceService] getTaskStatus failed, instanceId=1586857062542,taskId=4.
java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
at java.util.LinkedList.checkElementIndex(LinkedList.java:555)
at java.util.LinkedList.get(LinkedList.java:476)
at com.github.kfcfans.oms.worker.persistence.TaskPersistenceService.lambda$getTaskStatus$10(TaskPersistenceService.java:214)
at com.github.kfcfans.common.utils.CommonUtils.executeWithRetry(CommonUtils.java:37)
at com.github.kfcfans.oms.worker.persistence.TaskPersistenceService.execute(TaskPersistenceService.java:310)
at com.github.kfcfans.oms.worker.persistence.TaskPersistenceService.getTaskStatus(TaskPersistenceService.java:212)
at com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker.updateTaskStatus(TaskTracker.java:107)
at com.github.kfcfans.oms.worker.core.tracker.task.TaskTracker.broadcast(TaskTracker.java:214)
at com.github.kfcfans.oms.worker.actors.TaskTrackerActor.onReceiveBroadcastTaskPreExecuteFinishedReq(TaskTrackerActor.java:106)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:187)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:186)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:241)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:242)
at akka.actor.Actor.aroundReceive(Actor.scala:534)
at akka.actor.Actor.aroundReceive$(Actor.scala:532)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2020-04-16 18:05:09 WARN - [TaskTracker-1586857062542] query TaskStatus from DB failed when try to update new TaskStatus(taskId=4,newStatus=6).
```