Optimize InstanceDetail & fix some bugs

tjq 2020-04-17 19:46:19 +08:00
parent 53d1ac82ec
commit 593ee714b8
32 changed files with 491 additions and 74 deletions

.DS_Store vendored (binary file, not shown)

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.common.model;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
/**
* Detailed runtime information of a job instance (exposed externally)
@ -11,7 +14,8 @@ import java.io.Serializable;
* @since 2020/4/11
*/
@Data
public class InstanceDetail implements Serializable {
@NoArgsConstructor
public class InstanceDetail implements OmsSerializable {
// overall start time of the job instance
private Long actualTriggerTime;
@ -24,21 +28,27 @@ public class InstanceDetail implements Serializable {
// TaskTracker address
private String taskTrackerAddress;
private Object extra;
// dedicated to MapReduce / Broadcast jobs
private TaskDetail taskDetail;
// dedicated to frequent (second-level) jobs
private List<SubInstanceDetail> subInstanceDetails;
// extra of a frequent (second-level) job -> List<SubInstanceDetail>
@Data
public static class SubInstanceDetail implements Serializable {
private long startTime;
private long finishedTime;
@NoArgsConstructor
public static class SubInstanceDetail implements OmsSerializable {
private long subInstanceId;
private String startTime;
private String finishedTime;
private String result;
private String status;
}
// extra of a MapReduce / Broadcast job -> TaskDetail
@Data
public static class TaskDetail implements Serializable {
@NoArgsConstructor
public static class TaskDetail implements OmsSerializable {
private long totalTaskNum;
private long succeedTaskNum;
private long failedTaskNum;
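
A minimal construction sketch (not part of the commit; class name and values are illustrative) showing how the new typed fields replace the old untyped extra. The TaskTracker changes further below populate them the same way:

import com.github.kfcfans.common.model.InstanceDetail;
import java.util.Collections;

public class InstanceDetailSketch {
    public static void main(String[] args) {
        InstanceDetail detail = new InstanceDetail();
        detail.setActualTriggerTime(System.currentTimeMillis());
        detail.setTaskTrackerAddress("192.168.1.1:27777");

        // MapReduce / Broadcast jobs fill in a TaskDetail (hypothetical numbers)
        InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
        taskDetail.setTotalTaskNum(100);
        taskDetail.setSucceedTaskNum(90);
        taskDetail.setFailedTaskNum(10);
        detail.setTaskDetail(taskDetail);

        // frequent (second-level) jobs fill in a list of SubInstanceDetail instead
        InstanceDetail.SubInstanceDetail sub = new InstanceDetail.SubInstanceDetail();
        sub.setSubInstanceId(1);
        sub.setStartTime("2020-04-17 19:00:00");
        sub.setFinishedTime("N/A");
        sub.setStatus("RUNNING");
        detail.setSubInstanceDetails(Collections.singletonList(sub));
    }
}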

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.common.model;
import com.github.kfcfans.common.OmsSerializable;
import lombok.Data;
import java.io.Serializable;
@ -11,7 +12,7 @@ import java.io.Serializable;
* @since 2020/3/25
*/
@Data
public class SystemMetrics implements Serializable, Comparable<SystemMetrics> {
public class SystemMetrics implements OmsSerializable, Comparable<SystemMetrics> {
// number of CPU cores
private int cpuProcessors;

View File

@ -1,9 +1,8 @@
package com.github.kfcfans.common.response;
import com.github.kfcfans.common.OmsSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.github.kfcfans.common.utils.JsonUtils;
import lombok.*;
/**
@ -16,10 +15,36 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
public class AskResponse implements OmsSerializable {
private boolean success;
private Object extra;
public AskResponse(boolean success) {
this.success = success;
private boolean success;
/*
 - Using Object directly fails with java.lang.ClassCastException: scala.collection.immutable.HashMap cannot be cast to XXX, so the payload has to be serialized/deserialized manually.
 - Nested generic types such as Map<String, B> are also awkward: if B is a complex object it comes back as LinkedHashMap after deserialization, so it is converted to JSON and back instead.
 */
private byte[] data;
// error message
private String message;
public static AskResponse succeed(Object data) {
AskResponse r = new AskResponse();
r.success = true;
if (data != null) {
r.data = JsonUtils.toBytes(data);
}
return r;
}
public static AskResponse failed(String msg) {
AskResponse r = new AskResponse();
r.success = false;
r.message = msg;
return r;
}
public <T> T getData(Class<T> clz) throws Exception {
return JsonUtils.parseObject(data, clz);
}
}
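
A round-trip usage sketch for the new byte[] payload (only the factory methods and getData shown above are assumed; the demo class itself is hypothetical). The payload travels as JSON bytes, which sidesteps the scala.collection.immutable.HashMap ClassCastException mentioned in the comment:

import com.github.kfcfans.common.model.InstanceDetail;
import com.github.kfcfans.common.response.AskResponse;

public class AskResponseSketch {
    public static void main(String[] args) throws Exception {
        InstanceDetail detail = new InstanceDetail();
        detail.setActualTriggerTime(System.currentTimeMillis());

        // sender side: payload is serialized to JSON bytes
        AskResponse response = AskResponse.succeed(detail);

        // receiver side: deserialize back into the concrete type
        if (response.isSuccess()) {
            InstanceDetail received = response.getData(InstanceDetail.class);
            System.out.println(received.getActualTriggerTime());
        } else {
            System.out.println(response.getMessage());
        }
    }
}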

View File

@ -1,6 +1,8 @@
package com.github.kfcfans.common.utils;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
@ -13,6 +15,10 @@ public class JsonUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
static {
}
public static String toJSONString(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
@ -21,7 +27,19 @@ public class JsonUtils {
return null;
}
public static byte[] toBytes(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
}catch (Exception ignore) {
}
return null;
}
public static <T> T parseObject(String json, Class<T> clz) throws JsonProcessingException {
return objectMapper.readValue(json, clz);
}
public static <T> T parseObject(byte[] b, Class<T> clz) throws Exception {
return objectMapper.readValue(b, clz);
}
}
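
A short sketch (hypothetical demo class) of the nested-generics caveat noted in AskResponse: map values come back as LinkedHashMap, so they are re-converted through JSON before use, which is the trick SystemInfoController applies further below:

import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.utils.JsonUtils;
import java.util.HashMap;
import java.util.Map;

public class JsonUtilsSketch {
    public static void main(String[] args) throws Exception {
        Map<String, SystemMetrics> input = new HashMap<>();
        input.put("192.168.1.1:27777", new SystemMetrics());

        byte[] bytes = JsonUtils.toBytes(input);
        Map<?, ?> raw = JsonUtils.parseObject(bytes, Map.class);   // values are LinkedHashMap here
        for (Object v : raw.values()) {
            // convert to JSON and back to recover the concrete type
            SystemMetrics metrics = JsonUtils.parseObject(JsonUtils.toJSONString(v), SystemMetrics.class);
            System.out.println(metrics.getCpuProcessors());
        }
    }
}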

View File

@ -31,10 +31,7 @@ public class FriendActor extends AbstractActor {
* Handle liveness-check (Ping) requests
*/
private void onReceivePing(Ping ping) {
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(true);
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
getSender().tell(askResponse, getSelf());
getSender().tell(AskResponse.succeed(System.currentTimeMillis() - ping.getCurrentTime()), getSelf());
}
/**
@ -42,9 +39,7 @@ public class FriendActor extends AbstractActor {
*/
private void onReceiveFriendQueryWorkerClusterStatusReq(FriendQueryWorkerClusterStatusReq req) {
Map<String, SystemMetrics> workerInfo = WorkerManagerService.getActiveWorkerInfo(req.getAppId());
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(true);
askResponse.setExtra(workerInfo);
AskResponse askResponse = AskResponse.succeed(workerInfo);
getSender().tell(askResponse, getSelf());
}
}

View File

@ -46,9 +46,7 @@ public class ServerActor extends AbstractActor {
InstanceManager.updateStatus(req);
// reply that the status report was received successfully
AskResponse askResponse = new AskResponse();
askResponse.setSuccess(true);
getSender().tell(askResponse, getSelf());
getSender().tell(AskResponse.succeed(null), getSelf());
}catch (Exception e) {
log.error("[ServerActor] update instance status failed for request: {}.", req, e);
}

View File

@ -39,8 +39,14 @@ public interface InstanceLogRepository extends JpaRepository<InstanceLogDO, Long
@Transactional
@Modifying
@CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, result = ?6, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4Trigger(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress, String result);
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, finished_time = ?5, task_tracker_address = ?6, result = ?7, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4TriggerFailed(long instanceId, int status, long runningTimes, long actualTriggerTime, long finishedTime, String taskTrackerAddress, String result);
@Transactional
@Modifying
@CanIgnoreReturnValue
@Query(value = "update instance_log set status = ?2, running_times = ?3, actual_trigger_time = ?4, task_tracker_address = ?5, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4TriggerSucceed(long instanceId, int status, long runningTimes, long actualTriggerTime, String taskTrackerAddress);
@Modifying
@Transactional

View File

@ -62,7 +62,7 @@ public class DispatchService {
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to too much instance(num={}) is running.", jobId, runningInstanceCount);
instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, result);
instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result);
return;
}
@ -85,7 +85,7 @@ public class DispatchService {
if (CollectionUtils.isEmpty(finalWorkers)) {
String clusterStatusDescription = WorkerManagerService.getWorkerClusterStatusDescription(jobInfo.getAppId());
log.warn("[DispatchService] cancel dispatch job(jobId={}) due to no worker available, clusterStatus is {}.", jobId, clusterStatusDescription);
instanceLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
instanceLogRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
return;
}
@ -120,6 +120,6 @@ public class DispatchService {
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
// update the instance status
instanceLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress, EMPTY_RESULT);
instanceLogRepository.update4TriggerSucceed(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, current, taskTrackerAddress);
}
}

View File

@ -114,9 +114,9 @@ public class InstanceService {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
return (InstanceDetail) askResponse.getExtra();
return askResponse.getData(InstanceDetail.class);
}else {
log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getExtra());
log.warn("[InstanceService] ask InstanceStatus from TaskTracker failed, the message is {}.", askResponse.getMessage());
}
}catch (Exception e) {

View File

@ -106,18 +106,12 @@ public class InstanceStatusCheckService {
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
JobStatus jobStatus = JobStatus.of(jobInfoDO.getStatus());
// if the job is disabled, do not retry; just mark the instance as failed
if (jobStatus != JobStatus.ENABLE) {
// if the job is disabled, do not retry; just mark the instance as failed; frequent (second-level) jobs are also marked as failed directly and will be scheduled again by the dispatcher
if (jobStatus != JobStatus.ENABLE || TimeExpressionType.frequentTypes.contains(timeExpressionType.getV())) {
updateFailedInstance(instance);
return;
}
// frequent (second-level) jobs retry indefinitely; dispatch directly
if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) {
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
return;
}
// CRON and API jobs alike: failure count + 1, retry according to the retry configuration
if (instance.getRunningTimes() > jobInfoDO.getInstanceRetryNum()) {
dispatchService.dispatch(jobInfoDO, instance.getInstanceId(), instance.getRunningTimes());
@ -134,6 +128,9 @@ public class InstanceStatusCheckService {
* Handle instances that failed because the TaskTracker status report timed out
*/
private void updateFailedInstance(InstanceLogDO instance) {
log.warn("[InstanceStatusCheckService] detected instance(instanceId={},jobId={})'s TaskTracker report timeout,this instance is considered a failure.", instance.getInstanceId(), instance.getJobId());
instance.setStatus(InstanceStatus.FAILED.getV());
instance.setFinishedTime(System.currentTimeMillis());
instance.setGmtModified(new Date());

View File

@ -49,8 +49,8 @@ public class InstanceController {
}
@GetMapping("/status")
public ResultDTO<InstanceDetail> getRunningStatus(Long instanceId) {
return ResultDTO.success(instanceService.getInstanceDetail(instanceId));
public ResultDTO<InstanceDetail> getRunningStatus(String instanceId) {
return ResultDTO.success(instanceService.getInstanceDetail(Long.valueOf(instanceId)));
}
@PostMapping("/list")
@ -79,10 +79,14 @@ public class InstanceController {
BeanUtils.copyProperties(instanceLogDO, instanceLogVO);
// convert the status code into a readable description
instanceLogVO.setStatus(InstanceStatus.of(instanceLogDO.getStatus()).getDes());
instanceLogVO.setStatusStr(InstanceStatus.of(instanceLogDO.getStatus()).getDes());
// additionally set the job name for readability
instanceLogVO.setJobName(cacheService.getJobName(instanceLogDO.getJobId()));
// convert IDs to String (JS loses precision on long integers)
instanceLogVO.setJobId(instanceLogDO.getJobId().toString());
instanceLogVO.setInstanceId(instanceLogDO.getInstanceId().toString());
// format timestamps
if (instanceLogDO.getActualTriggerTime() == null) {
instanceLogVO.setActualTriggerTime("N/A");
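
Why the IDs are now returned as String: JavaScript stores numbers as IEEE-754 doubles, so integers above 2^53 - 1 silently lose precision in the browser. A quick illustration (class name and id value are hypothetical):

public class JsPrecisionSketch {
    public static void main(String[] args) {
        long instanceId = 1586310414570001234L;    // a 19-digit id, well above 2^53 - 1
        double asJsNumber = (double) instanceId;   // roughly what a JS Number would hold
        System.out.println(instanceId);            // exact value
        System.out.println((long) asJsNumber);     // last digits differ: precision lost
    }
}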

View File

@ -7,6 +7,7 @@ import com.github.kfcfans.common.RemoteConstant;
import com.github.kfcfans.common.model.SystemMetrics;
import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.FriendQueryWorkerClusterStatusReq;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
@ -16,6 +17,7 @@ import com.github.kfcfans.oms.server.persistence.repository.JobInfoRepository;
import com.github.kfcfans.oms.server.web.response.SystemOverviewVO;
import com.github.kfcfans.oms.server.web.response.WorkerStatusVO;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
@ -34,6 +36,7 @@ import java.util.concurrent.TimeUnit;
* @author tjq
* @since 2020/4/14
*/
@Slf4j
@RestController
@RequestMapping("/system")
public class SystemInfoController {
@ -46,7 +49,7 @@ public class SystemInfoController {
private InstanceLogRepository instanceLogRepository;
@GetMapping("/listWorker")
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
public ResultDTO<List<WorkerStatusVO>> listWorker(Long appId) {
Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
if (!appInfoOpt.isPresent()) {
@ -67,16 +70,22 @@ public class SystemInfoController {
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (askResponse.isSuccess()) {
Map<String, SystemMetrics> address2Info = (Map<String, SystemMetrics>) askResponse.getExtra();
Map address2Info = askResponse.getData(Map.class);
List<WorkerStatusVO> result = Lists.newLinkedList();
address2Info.forEach((address, metrics) -> {
WorkerStatusVO info = new WorkerStatusVO(address, metrics);
result.add(info);
address2Info.forEach((address, m) -> {
try {
SystemMetrics metrics = JsonUtils.parseObject(JsonUtils.toJSONString(m), SystemMetrics.class);
WorkerStatusVO info = new WorkerStatusVO(String.valueOf(address), metrics);
result.add(info);
}catch (Exception e) {
e.printStackTrace();
}
});
return ResultDTO.success(result);
}
return ResultDTO.failed(String.valueOf(askResponse.getExtra()));
return ResultDTO.failed(askResponse.getMessage());
}catch (Exception e) {
log.error("[SystemInfoController] listWorker for appId:{} failed.", appId, e);
return ResultDTO.failed("no worker or server available");
}
}
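
The per-entry conversion above could also be extracted into a small helper; a hypothetical sketch (the class and method names are not part of the commit):

import com.github.kfcfans.common.utils.JsonUtils;

public class LooseTypeConverter {
    // re-serialize a loosely typed value (e.g. the LinkedHashMap produced by generic
    // deserialization) and parse it into the requested class
    public static <T> T convert(Object raw, Class<T> clz) throws Exception {
        return JsonUtils.parseObject(JsonUtils.toJSONString(raw), clz);
    }
}

Usage inside listWorker would then read: SystemMetrics metrics = LooseTypeConverter.convert(m, SystemMetrics.class);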

View File

@ -13,12 +13,12 @@ import java.util.Date;
@Data
public class InstanceLogVO {
// job ID
private Long jobId;
// job ID (as String, to avoid JS precision loss)
private String jobId;
// job name
private String jobName;
// instance ID
private Long instanceId;
// instance ID (as String, to avoid JS precision loss)
private String instanceId;
// execution result
private String result;
@ -28,9 +28,10 @@ public class InstanceLogVO {
// total number of executions (used for retry decisions)
private Long runningTimes;
private int status;
/* ********** fields that differ from the persistence object ********** */
private String status;
private String statusStr;
// actual trigger time, formatted into a human-readable string
private String actualTriggerTime;
// finish time, formatted likewise

View File

@ -67,7 +67,7 @@ public class RepositoryTest {
@Test
public void testExecuteLogUpdate() {
instanceLogRepository.update4Trigger(1586310414570L, 2, 100, System.currentTimeMillis(), "192.168.1.1", "NULL");
instanceLogRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL");
instanceLogRepository.update4FrequentJob(1586310419650L, 2, 200);
}

View File

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oh-my-scheduler</artifactId>
<groupId>com.github.kfcfans</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>oh-my-scheduler-worker-samples</artifactId>
<version>1.0.0</version>
<properties>
<springboot.version>2.2.6.RELEASE</springboot.version>
<oms.worker.version>1.0.0</oms.worker.version>
<fastjson.version>1.2.68</fastjson.version>
</properties>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${springboot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${springboot.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>oh-my-scheduler-worker</artifactId>
<version>${oms.worker.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,38 @@
package com.github.kfcfans.oms.server;
import com.github.kfcfans.oms.worker.OhMyWorker;
import com.github.kfcfans.oms.worker.common.OhMyConfig;
import com.github.kfcfans.oms.worker.common.constants.StoreStrategy;
import com.google.common.collect.Lists;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* OMS-Worker configuration
*
* @author tjq
* @since 2020/4/17
*/
@Configuration
public class OhMySchedulerConfig {
@Bean
public OhMyWorker initOMS() throws Exception {
List<String> serverAddress = Lists.newArrayList("192.168.1.6:7700", "127.0.0.1:7700");
// 1. create the configuration
OhMyConfig config = new OhMyConfig();
config.setAppName("oms-test");
config.setServerAddress(serverAddress);
config.setStoreStrategy(StoreStrategy.DISK);
// 2. create the Worker instance and apply the configuration
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}
}

View File

@ -0,0 +1,19 @@
package com.github.kfcfans.oms.server;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Main class
*
* @author tjq
* @since 2020/4/17
*/
@EnableScheduling
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class, args);
}
}

View File

@ -0,0 +1,52 @@
package com.github.kfcfans.oms.server.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.TaskResult;
import com.github.kfcfans.oms.worker.core.processor.sdk.BroadcastProcessor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* Broadcast processor example
* com.github.kfcfans.oms.server.processors.BroadcastProcessorDemo
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
public class BroadcastProcessorDemo extends BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
System.out.println("================ BroadcastProcessorDemo#preProcess ================");
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("================ BroadcastProcessorDemo#process ================");
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
System.out.println("================ BroadcastProcessorDemo#postProcess ================");
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
System.out.println("List<TaskResult>: " + JSONObject.toJSONString(taskResults));
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
}

View File

@ -0,0 +1,84 @@
package com.github.kfcfans.oms.server.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.common.utils.JsonUtils;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.TaskResult;
import com.github.kfcfans.oms.worker.core.processor.sdk.MapReduceProcessor;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* MapReduce processor example
* com.github.kfcfans.oms.server.processors.MapReduceProcessorDemo
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
public class MapReduceProcessorDemo extends MapReduceProcessor {
// size of each batch of subtasks to send
private static final int batchSize = 100;
// number of batches to send
private static final int batchNum = 2;
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("============== TestMapReduceProcessor#process ==============");
System.out.println("isRootTask:" + isRootTask());
System.out.println("taskContext:" + JsonUtils.toJSONString(context));
if (isRootTask()) {
System.out.println("==== MAP ====");
List<TestSubTask> subTasks = Lists.newLinkedList();
for (int j = 0; j < batchNum; j++) {
for (int i = 0; i < batchSize; i++) {
int x = j * batchSize + i;
subTasks.add(new TestSubTask("name" + x, x));
}
ProcessResult mapResult = map(subTasks, "MAP_TEST_TASK");
System.out.println("mapResult: " + mapResult);
subTasks.clear();
}
return new ProcessResult(true, "MAP_SUCCESS");
}else {
System.out.println("==== NORMAL_PROCESS ====");
System.out.println("subTask: " + JsonUtils.toJSONString(context.getSubTask()));
Thread.sleep(1000);
if (context.getCurrentRetryTimes() == 0) {
return new ProcessResult(false, "FIRST_FAILED");
}else {
return new ProcessResult(true, "PROCESS_SUCCESS");
}
}
}
@Override
public ProcessResult reduce(TaskContext context, List<TaskResult> taskResults) {
log.info("================ MapReduceProcessorDemo#postProcess ================");
log.info("TaskContext: {}", JSONObject.toJSONString(context));
log.info("List<TaskResult>: {}", JSONObject.toJSONString(taskResults));
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
private static class TestSubTask {
private String name;
private int age;
}
}

View File

@ -0,0 +1,30 @@
package com.github.kfcfans.oms.server.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.core.processor.ProcessResult;
import com.github.kfcfans.oms.worker.core.processor.TaskContext;
import com.github.kfcfans.oms.worker.core.processor.sdk.BasicProcessor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadLocalRandom;
/**
* Standalone processor example
* com.github.kfcfans.oms.server.processors.StandaloneProcessorDemo
*
* @author tjq
* @since 2020/4/17
*/
@Slf4j
public class StandaloneProcessorDemo implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("================ StandaloneProcessorDemo#process ================");
System.out.println("TaskContext: " + JSONObject.toJSONString(context));
boolean success = ThreadLocalRandom.current().nextBoolean();
return new ProcessResult(success, context + ": " + success);
}
}

View File

@ -0,0 +1 @@
server.port=8081

View File

@ -84,5 +84,4 @@
</dependencies>
</project>

View File

@ -89,7 +89,8 @@ public class TaskTrackerActor extends AbstractActor {
log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
}
AskResponse response = new AskResponse(success);
AskResponse response = new AskResponse();
response.setSuccess(success);
getSender().tell(response, getSelf());
}
@ -151,16 +152,14 @@ public class TaskTrackerActor extends AbstractActor {
* Query the running status of a job instance
*/
private void onReceiveServerQueryInstanceStatusReq(ServerQueryInstanceStatusReq req) {
AskResponse askResponse = new AskResponse();
AskResponse askResponse;
TaskTracker taskTracker = TaskTrackerPool.getTaskTrackerPool(req.getInstanceId());
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ServerQueryInstanceStatusReq({}) but system can't find TaskTracker.", req);
askResponse.setSuccess(false);
askResponse.setExtra("can't find TaskTracker");
askResponse = AskResponse.failed("can't find TaskTracker");
}else {
InstanceDetail instanceDetail = taskTracker.fetchRunningStatus();
askResponse.setSuccess(true);
askResponse.setExtra(instanceDetail);
askResponse = AskResponse.succeed(instanceDetail);
}
getSender().tell(askResponse, getSelf());
}

View File

@ -44,7 +44,7 @@ public abstract class MapProcessor implements BasicProcessor {
}
if (taskList.size() > RECOMMEND_BATCH_SIZE) {
log.warn("[MapReduceProcessor] map task size is too large, network maybe overload... please try to split the tasks.");
log.warn("[MapProcessor] map task size is too large, network maybe overload... please try to split the tasks.");
}
TaskDO task = ThreadLocalStore.getTask();
@ -61,7 +61,7 @@ public abstract class MapProcessor implements BasicProcessor {
AskResponse respObj = (AskResponse) requestCS.toCompletableFuture().get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
requestSucceed = respObj.isSuccess();
}catch (Exception e) {
log.warn("[MapReduceProcessor] map failed.", e);
log.warn("[MapProcessor] map failed.", e);
}
if (requestSucceed) {

View File

@ -73,7 +73,7 @@ public class CommonTaskTracker extends TaskTracker {
taskDetail.setSucceedTaskNum(holder.succeedNum);
taskDetail.setFailedTaskNum(holder.failedNum);
taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
detail.setExtra(taskDetail);
detail.setTaskDetail(taskDetail);
return detail;
}

View File

@ -20,6 +20,8 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;
@ -111,15 +113,25 @@ public class FrequentTaskTracker extends TaskTracker {
detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress());
List<InstanceDetail.SubInstanceDetail> history = Lists.newLinkedList();
recentSubInstanceInfo.forEach((ignore, subInstanceInfo) -> {
recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> {
InstanceDetail.SubInstanceDetail subDetail = new InstanceDetail.SubInstanceDetail();
BeanUtils.copyProperties(subInstanceInfo, subDetail);
subDetail.setStatus(InstanceStatus.of(subInstanceInfo.status).getDes());
InstanceStatus status = InstanceStatus.of(subInstanceInfo.status);
subDetail.setStatus(status.getDes());
subDetail.setSubInstanceId(subId);
// set the time fields
subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), "yyyy-MM-dd HH:mm:ss"));
if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) {
subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), "yyyy-MM-dd HH:mm:ss"));
}else {
subDetail.setFinishedTime("N/A");
}
history.add(subDetail);
});
detail.setExtra(history);
detail.setSubInstanceDetails(history);
return detail;
}

View File

@ -183,11 +183,8 @@ public abstract class TaskTracker {
}
// update the status, retrying on failure (if even the DB write fails, there is no further retry... just bad luck...)
TaskDO updateEntity = new TaskDO();
updateEntity.setStatus(nTaskStatus.getValue());
updateEntity.setResult(result);
updateEntity.setLastReportTime(reportTime);
boolean updateResult = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
result = result == null ? "" : result;
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
log.warn("[TaskTracker-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, taskId);

View File

@ -39,4 +39,9 @@ public interface TaskDAO {
*/
List<TaskResult> getAllTaskResult(Long instanceId, Long subInstanceId) throws SQLException;
/**
* Update task status. result may contain all kinds of odd characters (such as '), so a purpose-built SQL statement has to write it directly
*/
boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException;
}

View File

@ -163,6 +163,22 @@ public class TaskDAOImpl implements TaskDAO {
return taskResults;
}
@Override
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException {
String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, status);
ps.setLong(2, lastReportTime);
ps.setString(3, result);
ps.setLong(4, lastReportTime);
ps.setLong(5, instanceId);
ps.setString(6, taskId);
ps.executeUpdate();
return true;
}
}
private static TaskDO convert(ResultSet rs) throws SQLException {
TaskDO task = new TaskDO();
task.setTaskId(rs.getString("task_id"));

View File

@ -69,7 +69,7 @@ public class TaskPersistenceService {
}
/**
* Update a Task by primary key
* Update a Task by primary key (any update that does not involve result can use this method)
*/
public boolean updateTask(Long instanceId, String taskId, TaskDO updateEntity) {
try {
@ -82,6 +82,18 @@ public class TaskPersistenceService {
return false;
}
/**
* Update task status
*/
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
try {
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
}
return false;
}
/**
* Update tasks that were dispatched to ProcessorTrackers that have since gone offline, so they run again
* update task_info

pom.xml
View File

@ -12,6 +12,7 @@
<module>oh-my-scheduler-server</module>
<module>oh-my-scheduler-common</module>
<module>oh-my-scheduler-client</module>
<module>oh-my-scheduler-worker-samples</module>
</modules>
<packaging>pom</packaging>
@ -19,6 +20,8 @@
<java.version>1.8</java.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<lombok.version>1.18.12</lombok.version>
@ -32,6 +35,36 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<testSource>${java.version}</testSource>
<testTarget>${java.version}</testTarget>
</configuration>
</plugin>
<!-- package the source code -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>