try to do cluster test

This commit is contained in:
tjq 2020-04-11 12:14:36 +08:00
parent 3b331e70cc
commit cd49e1ad98
26 changed files with 297 additions and 213 deletions

View File

@ -16,6 +16,7 @@
<properties> <properties>
<slf4j.version>1.7.30</slf4j.version> <slf4j.version>1.7.30</slf4j.version>
<commons.lang.version>3.10</commons.lang.version> <commons.lang.version>3.10</commons.lang.version>
<guava.version>28.2-jre</guava.version>
</properties> </properties>
<dependencies> <dependencies>
@ -32,6 +33,13 @@
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version> <version>${commons.lang.version}</version>
</dependency> </dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,8 +1,11 @@
package com.github.kfcfans.common; package com.github.kfcfans.common;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import java.util.List;
/** /**
* 任务运行状态 * 任务运行状态
* *
@ -13,7 +16,7 @@ import lombok.Getter;
@AllArgsConstructor @AllArgsConstructor
public enum InstanceStatus { public enum InstanceStatus {
WAITING_DISPATCH(1, "等待任务派发任务处理Server时间轮中"), WAITING_DISPATCH(1, "等待任务派发"),
WAITING_WORKER_RECEIVE(2, "Server已完成任务派发等待Worker接收"), WAITING_WORKER_RECEIVE(2, "Server已完成任务派发等待Worker接收"),
RUNNING(3, "Worker接收成功正在运行任务"), RUNNING(3, "Worker接收成功正在运行任务"),
FAILED(4, "任务运行失败"), FAILED(4, "任务运行失败"),
@ -23,6 +26,9 @@ public enum InstanceStatus {
private int v; private int v;
private String des; private String des;
// 广义的运行状态
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
public static InstanceStatus of(int v) { public static InstanceStatus of(int v) {
for (InstanceStatus is : values()) { for (InstanceStatus is : values()) {
if (v == is.v) { if (v == is.v) {

View File

@ -0,0 +1,29 @@
package com.github.kfcfans.common;
/**
 * Result messages that the system itself generates for a job instance
 * (as opposed to results produced by the user's own processor).
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 *
 * @author tjq
 * @since 2020/4/11
 */
public final class SystemInstanceResult {

    /**
     * Too many instances of the same job are running at the same time.
     * Format string taking two integer arguments: the running count and the allowed maximum.
     */
    public static final String TOO_MUCH_INSTANCE = "too much instance(%d>%d)";

    /** No worker is available. */
    public static final String NO_WORKER_AVAILABLE = "no worker available";

    /** The instance exceeded its execution time limit. */
    public static final String INSTANCE_EXECUTE_TIMEOUT = "instance execute timeout";

    /** Creating the root task failed. */
    public static final String TASK_INIT_FAILED = "create root task failed";

    /** Unknown error. */
    public static final String UNKNOWN_BUG = "unknown bug";

    /** The TaskTracker has not reported for a long time. */
    public static final String REPORT_TIMEOUT = "worker report timeout, maybe TaskTracker down";

    /** The instance was stopped manually by a user. */
    public static final String STOPPED_BY_USER = "stopped by user";

    private SystemInstanceResult() {
        // constants only, never instantiated
    }
}

View File

@ -59,7 +59,7 @@ public class OhMyServer {
* @param address IP:port * @param address IP:port
* @return ActorSelection * @return ActorSelection
*/ */
public static ActorSelection getServerActor(String address) { public static ActorSelection getFriendActor(String address) {
String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME); String path = String.format(AKKA_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, address, RemoteConstant.SERVER_FRIEND_ACTOR_NAME);
return actorSystem.actorSelection(path); return actorSystem.actorSelection(path);
} }

View File

@ -1,25 +1,9 @@
package com.github.kfcfans.oms.server.akka.actors; package com.github.kfcfans.oms.server.akka.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.pattern.Patterns;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.request.WorkerHeartbeat;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.akka.requests.Ping; import com.github.kfcfans.oms.server.akka.requests.Ping;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerQueryInstanceStatusReq;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq;
import com.github.kfcfans.oms.server.service.ha.WorkerManagerService;
import com.github.kfcfans.oms.server.service.instance.InstanceManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static com.github.kfcfans.common.RemoteConstant.DEFAULT_TIMEOUT_MS;
/** /**
* 处理朋友们的信息处理服务器与服务器之间的通讯 * 处理朋友们的信息处理服务器与服务器之间的通讯
@ -33,7 +17,6 @@ public class FriendActor extends AbstractActor {
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
.match(Ping.class, this::onReceivePing) .match(Ping.class, this::onReceivePing)
.match(RedirectServerStopInstanceReq.class, this::onReceiveRedirectServerStopInstanceReq)
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj)) .matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
.build(); .build();
} }
@ -47,47 +30,4 @@ public class FriendActor extends AbstractActor {
askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime()); askResponse.setExtra(System.currentTimeMillis() - ping.getCurrentTime());
getSender().tell(askResponse, getSelf()); getSender().tell(askResponse, getSelf());
} }
/**
* 处理停止任务实例的请求
*/
private void onReceiveRedirectServerStopInstanceReq(RedirectServerStopInstanceReq req) {
Long instanceId = req.getServerStopInstanceReq().getInstanceId();
String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
// 非空发请求停止任务实例
if (StringUtils.isNotEmpty(taskTrackerAddress)) {
OhMyServer.getTaskTrackerActor(taskTrackerAddress).tell(req.getServerStopInstanceReq(), getSelf());
return;
}
// 可能刚经历 Server 变更 TaskTracker 宕机先忽略吧打条日志压压惊
log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId);
}
/**
* 处理Server查询任务实例运行情况的请求
*/
private void onReceiveRedirectServerQueryInstanceStatusReq(RedirectServerQueryInstanceStatusReq req) {
Long instanceId = req.getReq().getInstanceId();
String taskTrackerAddress = InstanceManager.getTaskTrackerAddress(instanceId);
AskResponse response = new AskResponse();
if (StringUtils.isEmpty(taskTrackerAddress)) {
response.setSuccess(false);
response.setExtra("can't find TaskTracker");
log.warn("[FriendActor] can't find TaskTracker's address for instance(instanceId={}).", instanceId);
}else {
try {
CompletionStage<Object> ask = Patterns.ask(OhMyServer.getTaskTrackerActor(taskTrackerAddress), req.getReq(), Duration.ofMillis(DEFAULT_TIMEOUT_MS));
response = (AskResponse) ask.toCompletableFuture().get(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}catch (Exception e) {
log.warn("[FriendActor] Ask TaskTracker for instance(instanceId={}) status failed.", instanceId, e);
response.setSuccess(false);
response.setExtra(e.getMessage());
}
}
getSender().tell(response, getSelf());
}
} }

View File

@ -1,17 +0,0 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.request.ServerQueryInstanceStatusReq;
import lombok.Data;
import java.io.Serializable;
/**
* 重定向 ServerQueryInstanceStatusReq
*
* @author tjq
* @since 2020/4/10
*/
@Data
public class RedirectServerQueryInstanceStatusReq implements Serializable {
private ServerQueryInstanceStatusReq req;
}

View File

@ -1,18 +0,0 @@
package com.github.kfcfans.oms.server.akka.requests;
import com.github.kfcfans.common.request.ServerStopInstanceReq;
import lombok.Data;
import java.io.Serializable;
/**
* 重定向 ServerStopInstanceReq
* 被HTTP请求停止任务实例的机器需要将请求转发到该实例对应的Server上处理由该Server下发到Worker只有该Server持有Worker的地址信息
*
* @author tjq
* @since 2020/4/9
*/
@Data
public class RedirectServerStopInstanceReq implements Serializable {
private ServerStopInstanceReq serverStopInstanceReq;
}

View File

@ -38,6 +38,8 @@ public class ExecuteLogDO {
private Long actualTriggerTime; private Long actualTriggerTime;
// 结束时间 // 结束时间
private Long finishedTime; private Long finishedTime;
// TaskTracker地址
private String taskTrackerAddress;
// 总共执行的次数用于重试判断 // 总共执行的次数用于重试判断
private Long runningTimes; private Long runningTimes;

View File

@ -22,7 +22,7 @@ public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long>
*/ */
long countByJobIdAndStatusIn(long jobId, List<Integer> status); long countByJobIdAndStatusIn(long jobId, List<Integer> status);
List<ExecuteLogDO> findByJobIdIn(List<Long> jobIds); List<ExecuteLogDO> findByJobIdAndStatusIn(long jobId, List<Integer> status);
/** /**
@ -35,8 +35,8 @@ public interface ExecuteLogRepository extends JpaRepository<ExecuteLogDO, Long>
*/ */
@Transactional @Transactional
@Modifying @Modifying
@Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), result = ?4, gmt_modified = now() where instance_id = ?1", nativeQuery = true) @Query(value = "update execute_log set status = ?2, running_times = ?3, actual_trigger_time = now(), task_tracker_address = ?4, result = ?5, gmt_modified = now() where instance_id = ?1", nativeQuery = true)
int update4Trigger(long instanceId, int status, long runningTimes, String result); int update4Trigger(long instanceId, int status, long runningTimes, String taskTrackerAddress, String result);
@Modifying @Modifying
@Transactional @Transactional

View File

@ -1,9 +1,7 @@
package com.github.kfcfans.oms.server.service; package com.github.kfcfans.oms.server.service;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.*;
import com.github.kfcfans.common.ProcessorType;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
@ -37,23 +35,26 @@ public class DispatchService {
// 前三个状态都视为运行中 // 前三个状态都视为运行中
private static final List<Integer> runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()); private static final List<Integer> runningStatus = Lists.newArrayList(WAITING_DISPATCH.getV(), WAITING_WORKER_RECEIVE.getV(), RUNNING.getV());
private static final String TOO_MUCH_REASON = "too much instance(%d>%d)";
private static final String NO_WORKER_REASON = "no worker available";
private static final String EMPTY_RESULT = ""; private static final String EMPTY_RESULT = "";
/**
* 将任务从Server派发到WorkerTaskTracker
* @param jobInfo 任务的元信息
* @param instanceId 任务实例ID
* @param currentRunningTimes 当前运行的次数
*/
public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) { public void dispatch(JobInfoDO jobInfo, long instanceId, long currentRunningTimes) {
log.info("[DispatchService] start to dispatch job -> {}.", jobInfo); log.info("[DispatchService] start to dispatch job: {}.", jobInfo);
// 查询当前运行的实例数 // 查询当前运行的实例数
long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus); long runningInstanceCount = executeLogRepository.countByJobIdAndStatusIn(jobInfo.getId(), runningStatus);
// 超出最大同时运行限制不执行调度 // 超出最大同时运行限制不执行调度
if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { if (runningInstanceCount > jobInfo.getMaxInstanceNum()) {
String result = String.format(TOO_MUCH_REASON, runningInstanceCount, jobInfo.getMaxInstanceNum()); String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum());
log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount); log.warn("[DispatchService] cancel dispatch job({}) due to too much instance(num={}) is running.", jobInfo, runningInstanceCount);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, result); executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, RemoteConstant.EMPTY_ADDRESS, result);
return; return;
} }
@ -63,7 +64,7 @@ public class DispatchService {
if (StringUtils.isEmpty(taskTrackerAddress)) { if (StringUtils.isEmpty(taskTrackerAddress)) {
log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo); log.warn("[DispatchService] cancel dispatch job({}) due to no worker available.", jobInfo);
executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, NO_WORKER_REASON); executeLogRepository.update4Trigger(instanceId, FAILED.getV(), currentRunningTimes, RemoteConstant.EMPTY_ADDRESS, SystemInstanceResult.NO_WORKER_AVAILABLE);
return; return;
} }
@ -90,6 +91,6 @@ public class DispatchService {
log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString()); log.debug("[DispatchService] send request({}) to TaskTracker({}) succeed.", req, taskTrackerActor.pathString());
// 修改状态 // 修改状态
executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, EMPTY_RESULT); executeLogRepository.update4Trigger(instanceId, WAITING_WORKER_RECEIVE.getV(), currentRunningTimes + 1, taskTrackerAddress, EMPTY_RESULT);
} }
} }

View File

@ -114,7 +114,7 @@ public class ServerSelectService {
Ping ping = new Ping(); Ping ping = new Ping();
ping.setCurrentTime(System.currentTimeMillis()); ping.setCurrentTime(System.currentTimeMillis());
ActorSelection serverActor = OhMyServer.getServerActor(serverAddress); ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress);
try { try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS); AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,42 @@
package com.github.kfcfans.oms.server.service.instance;
import lombok.Data;
/**
 * Detailed run-time information about a job instance, exposed to external callers.
 *
 * @author tjq
 * @since 2020/4/11
 */
@Data
public class InstanceDetail {

    // Overall start time of the instance.
    // Boxed on purpose: this object is filled via BeanUtils.copyProperties from ExecuteLogDO,
    // whose actualTriggerTime is a nullable Long; a primitive setter would fail on null.
    private Long actualTriggerTime;
    // Overall finish time of the instance (may be absent, hence boxed and nullable).
    private Long finishedTime;
    // Instance status (Chinese description).
    private String status;
    // Execution result of the instance (may be absent).
    private String result;
    // Address of the TaskTracker.
    private String taskTrackerAddress;

    // Type-dependent extra information; see the nested classes below for the intended shapes.
    private Object extra;

    // Intended "extra" payload for second-level (frequent) jobs -> List<SubInstanceDetail>.
    // NOTE(review): not referenced by any code visible here; looks like a placeholder.
    private static class SubInstanceDetail {
        private long startTime;
        private long finishedTime;
        private String status;
        private String result;
    }

    // Intended "extra" payload for MapReduce and Broadcast jobs.
    // NOTE(review): not referenced by any code visible here; looks like a placeholder.
    private static class ClusterDetail {
        private long totalTaskNum;
        private long succeedTaskNum;
        private long failedTaskNum;
    }
}

View File

@ -15,6 +15,7 @@ import org.springframework.beans.BeanUtils;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.Optional;
/** /**
* 管理被调度的服务 * 管理被调度的服务
@ -63,8 +64,13 @@ public class InstanceManager {
if (!instanceId2JobInfo.containsKey(instanceId)) { if (!instanceId2JobInfo.containsKey(instanceId)) {
log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId); log.warn("[InstanceManager] can't find any register info for instance(jobId={},instanceId={}), maybe change the server.", jobId, instanceId);
JobInfoDO JobInfoDo = getJobInfoRepository().findById(jobId).orElseGet(JobInfoDO::new); Optional<JobInfoDO> jobInfoDOOptional = getJobInfoRepository().findById(jobId);
if (jobInfoDOOptional.isPresent()) {
JobInfoDO JobInfoDo = jobInfoDOOptional.get();
instanceId2JobInfo.put(instanceId, JobInfoDo); instanceId2JobInfo.put(instanceId, JobInfoDo);
}else {
throw new IllegalArgumentException("can't find JobIno by jobId:" + jobId);
}
} }
// 更新本地保存的任务实例状态用于未完成任务前的详细信息查询和缓存加速 // 更新本地保存的任务实例状态用于未完成任务前的详细信息查询和缓存加速

View File

@ -0,0 +1,87 @@
package com.github.kfcfans.oms.server.service.instance;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.SystemInstanceResult;
import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.common.request.ServerStopInstanceReq;
import com.github.kfcfans.oms.server.akka.OhMyServer;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
import static com.github.kfcfans.common.InstanceStatus.RUNNING;
import static com.github.kfcfans.common.InstanceStatus.STOPPED;
/**
 * Service for operating on job instances: stopping them and querying their details.
 *
 * @author tjq
 * @since 2020/4/11
 */
@Slf4j
@Service
public class InstanceService {

    @Resource
    private AppInfoRepository appInfoRepository;
    @Resource
    private ExecuteLogRepository executeLogRepository;

    /**
     * Stops a job instance: marks it as stopped in the database, then asks its TaskTracker to stop.
     *
     * @param instanceId job instance ID
     * @throws IllegalArgumentException if no execute log exists for the given instance ID
     */
    public void stopInstance(Long instanceId) {

        ExecuteLogDO executeLogDO = findExecuteLog(instanceId);

        // Update the database first: set the status to STOPPED.
        executeLogDO.setStatus(STOPPED.getV());
        executeLogDO.setGmtModified(new Date());
        executeLogDO.setFinishedTime(System.currentTimeMillis());
        executeLogDO.setResult(SystemInstanceResult.STOPPED_BY_USER);
        executeLogRepository.saveAndFlush(executeLogDO);

        // An instance that was never dispatched has no TaskTracker to notify; the database
        // record is already marked as stopped, so just log and return instead of building an
        // actor path from a missing address.
        // NOTE(review): DispatchService stores RemoteConstant.EMPTY_ADDRESS for failed dispatches;
        // if that constant is a non-empty placeholder it should be checked here as well - confirm.
        String taskTrackerAddress = executeLogDO.getTaskTrackerAddress();
        if (taskTrackerAddress == null || taskTrackerAddress.isEmpty()) {
            log.warn("[InstanceService] can't find TaskTracker's address for instance(instanceId={}), so stop instance may fail.", instanceId);
            return;
        }

        // Tell the TaskTracker to stop (fire-and-forget, no sender).
        ActorSelection taskTrackerActor = OhMyServer.getTaskTrackerActor(taskTrackerAddress);
        ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
        taskTrackerActor.tell(req, null);
    }

    /**
     * Returns the details of a job instance.
     *
     * @param instanceId job instance ID
     * @return brief details copied from the execute log when the instance is not in RUNNING
     *         status; {@code null} for RUNNING instances (detailed view is not implemented yet)
     * @throws IllegalArgumentException if no execute log exists for the given instance ID
     */
    public InstanceDetail getInstanceDetail(Long instanceId) {

        ExecuteLogDO executeLogDO = findExecuteLog(instanceId);

        InstanceStatus instanceStatus = InstanceStatus.of(executeLogDO.getStatus());

        InstanceDetail detail = new InstanceDetail();
        detail.setStatus(instanceStatus.getDes());

        // Anything other than RUNNING only needs the brief information.
        if (instanceStatus != RUNNING) {
            BeanUtils.copyProperties(executeLogDO, detail);
            return detail;
        }

        // RUNNING: MapReduce, Broadcast and second-level jobs each need their own detailed view.
        // Not implemented yet, so callers currently receive null here.
        return null;
    }

    /**
     * Loads the execute log of an instance, rejecting unknown instance IDs.
     */
    private ExecuteLogDO findExecuteLog(Long instanceId) {
        ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
        if (executeLogDO == null) {
            log.warn("[InstanceService] can't find execute log for instanceId: {}.", instanceId);
            throw new IllegalArgumentException("invalid instanceId: " + instanceId);
        }
        return executeLogDO;
    }
}

View File

@ -1,6 +1,7 @@
package com.github.kfcfans.oms.server.service.timing; package com.github.kfcfans.oms.server.service.timing;
import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.SystemInstanceResult;
import com.github.kfcfans.common.TimeExpressionType; import com.github.kfcfans.common.TimeExpressionType;
import com.github.kfcfans.oms.server.common.constans.JobStatus; import com.github.kfcfans.oms.server.common.constans.JobStatus;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.akka.OhMyServer;
@ -129,11 +130,14 @@ public class InstanceStatusCheckService {
}); });
} }
/**
* 处理上报超时而失败的任务实例
*/
private void updateFailedInstance(ExecuteLogDO instance) { private void updateFailedInstance(ExecuteLogDO instance) {
instance.setStatus(InstanceStatus.FAILED.getV()); instance.setStatus(InstanceStatus.FAILED.getV());
instance.setFinishedTime(System.currentTimeMillis()); instance.setFinishedTime(System.currentTimeMillis());
instance.setGmtModified(new Date()); instance.setGmtModified(new Date());
instance.setResult("worker report timeout, maybe all worker down"); instance.setResult(SystemInstanceResult.REPORT_TIMEOUT);
executeLogRepository.saveAndFlush(instance); executeLogRepository.saveAndFlush(instance);
} }
} }

View File

@ -1,25 +1,14 @@
package com.github.kfcfans.oms.server.web.controller; package com.github.kfcfans.oms.server.web.controller;
import akka.actor.ActorSelection;
import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.request.ServerStopInstanceReq;
import com.github.kfcfans.common.response.ResultDTO; import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.akka.OhMyServer; import com.github.kfcfans.oms.server.service.instance.InstanceDetail;
import com.github.kfcfans.oms.server.akka.requests.RedirectServerStopInstanceReq; import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.persistence.model.AppInfoDO;
import com.github.kfcfans.oms.server.persistence.model.ExecuteLogDO;
import com.github.kfcfans.oms.server.persistence.repository.AppInfoRepository;
import com.github.kfcfans.oms.server.persistence.repository.ExecuteLogRepository;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date;
import static com.github.kfcfans.common.InstanceStatus.*;
/** /**
* 任务实例 Controller * 任务实例 Controller
@ -32,51 +21,16 @@ import static com.github.kfcfans.common.InstanceStatus.*;
public class InstanceController { public class InstanceController {
@Resource @Resource
private ExecuteLogRepository executeLogRepository; private InstanceService instanceService;
@Resource
private AppInfoRepository appInfoRepository;
@GetMapping("/stop") @GetMapping("/stop")
public ResultDTO<Void> stopInstance(Long instanceId) { public ResultDTO<Void> stopInstance(Long instanceId) {
instanceService.stopInstance(instanceId);
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
if (executeLogDO == null) {
return ResultDTO.failed("invalid instanceId: " + instanceId);
}
// 更新数据库将状态置为停止
executeLogDO.setStatus(STOPPED.getV());
executeLogDO.setGmtModified(new Date());
executeLogDO.setFinishedTime(System.currentTimeMillis());
executeLogDO.setResult("STOPPED_BY_USER");
executeLogRepository.saveAndFlush(executeLogDO);
// 获取Server地址准备转发请求
AppInfoDO appInfoDO = appInfoRepository.findById(executeLogDO.getAppId()).orElse(new AppInfoDO());
if (StringUtils.isEmpty(appInfoDO.getCurrentServer())) {
return ResultDTO.failed("can't find server");
}
// 将请求转发给目标ServerHTTP -> AKKA
ActorSelection serverActor = OhMyServer.getServerActor(appInfoDO.getCurrentServer());
RedirectServerStopInstanceReq req = new RedirectServerStopInstanceReq();
req.setServerStopInstanceReq(new ServerStopInstanceReq(instanceId));
serverActor.tell(req, null);
return ResultDTO.success(null); return ResultDTO.success(null);
} }
@GetMapping("/status") @GetMapping("/status")
public ResultDTO<Void> getRunningStatus(Long instanceId) { public ResultDTO<InstanceDetail> getRunningStatus(Long instanceId) {
return ResultDTO.success(instanceService.getInstanceDetail(instanceId));
ExecuteLogDO executeLogDO = executeLogRepository.findByInstanceId(instanceId);
if (executeLogDO == null) {
return ResultDTO.failed("invalid instanceId: " + instanceId);
}
InstanceStatus status = InstanceStatus.of(executeLogDO.getStatus());
if (status == FAILED || status == SUCCEED || status == STOPPED) {
}
return null;
} }
} }

View File

@ -13,8 +13,11 @@ import com.github.kfcfans.common.response.ResultDTO;
import com.github.kfcfans.oms.server.persistence.model.JobInfoDO; import com.github.kfcfans.oms.server.persistence.model.JobInfoDO;
import com.github.kfcfans.oms.server.service.DispatchService; import com.github.kfcfans.oms.server.service.DispatchService;
import com.github.kfcfans.oms.server.service.IdGenerateService; import com.github.kfcfans.oms.server.service.IdGenerateService;
import com.github.kfcfans.oms.server.service.instance.InstanceService;
import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest; import com.github.kfcfans.oms.server.web.request.ModifyJobInfoRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -22,6 +25,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.Optional; import java.util.Optional;
/** /**
@ -30,12 +34,16 @@ import java.util.Optional;
* @author tjq * @author tjq
* @since 2020/3/30 * @since 2020/3/30
*/ */
@Slf4j
@RestController @RestController
@RequestMapping("job") @RequestMapping("job")
public class JobController { public class JobController {
@Resource @Resource
private DispatchService dispatchService; private DispatchService dispatchService;
@Resource
private InstanceService instanceService;
@Resource @Resource
private JobInfoRepository jobInfoRepository; private JobInfoRepository jobInfoRepository;
@Resource @Resource
@ -68,7 +76,38 @@ public class JobController {
// 秒级任务直接调度执行 // 秒级任务直接调度执行
if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) { if (timeExpressionType == TimeExpressionType.FIX_RATE || timeExpressionType == TimeExpressionType.FIX_DELAY) {
runJobImmediately(jobInfoDO);
}
return ResultDTO.success(null);
}
@GetMapping("/stop")
public ResultDTO<Void> stopJob(Long jobId) throws Exception {
shutdownOrStopJob(jobId, JobStatus.STOPPED);
return ResultDTO.success(null);
}
@GetMapping("/delete")
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
shutdownOrStopJob(jobId, JobStatus.DELETED);
return ResultDTO.success(null);
}
@GetMapping("/run")
public ResultDTO<Void> runImmediately(Long jobId) {
Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
if (!jobInfoOPT.isPresent()) {
throw new IllegalArgumentException("can't find job by jobId:" + jobId);
}
runJobImmediately(jobInfoOPT.get());
return ResultDTO.success(null);
}
/**
* 立即运行JOB
*/
private void runJobImmediately(JobInfoDO jobInfoDO) {
ExecuteLogDO executeLog = new ExecuteLogDO(); ExecuteLogDO executeLog = new ExecuteLogDO();
executeLog.setJobId(jobInfoDO.getId()); executeLog.setJobId(jobInfoDO.getId());
executeLog.setAppId(jobInfoDO.getAppId()); executeLog.setAppId(jobInfoDO.getAppId());
@ -82,30 +121,42 @@ public class JobController {
dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0); dispatchService.dispatch(jobInfoDO, executeLog.getInstanceId(), 0);
} }
return ResultDTO.success(null); /**
} * 停止或删除某个JOB
* 秒级任务还要额外停止正在运行的任务实例
*/
private void shutdownOrStopJob(Long jobId, JobStatus status) throws IllegalArgumentException {
@GetMapping("/stop") // 1. 先更新 job_info
public ResultDTO<Void> stopJob(Long jobId) throws Exception { Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
updateJobStatus(jobId, JobStatus.STOPPED); if (!jobInfoOPT.isPresent()) {
return ResultDTO.success(null); throw new IllegalArgumentException("can't find job by jobId:" + jobId);
} }
JobInfoDO jobInfoDO = jobInfoOPT.get();
@GetMapping("/delete")
public ResultDTO<Void> deleteJob(Long jobId) throws Exception {
updateJobStatus(jobId, JobStatus.DELETED);
return ResultDTO.success(null);
}
private void updateJobStatus(Long jobId, JobStatus status) {
JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> {
throw new IllegalArgumentException("can't find job which id is " + jobId);
});
jobInfoDO.setStatus(status.getV()); jobInfoDO.setStatus(status.getV());
jobInfoRepository.saveAndFlush(jobInfoDO); jobInfoRepository.saveAndFlush(jobInfoDO);
// TODO: 关闭秒级任务 // 2. 关闭秒级任务
TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType());
if (timeExpressionType == TimeExpressionType.CRON || timeExpressionType == TimeExpressionType.API) {
return;
}
List<ExecuteLogDO> executeLogs = executeLogRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.generalizedRunningStatus);
if (CollectionUtils.isEmpty(executeLogs)) {
return;
}
if (executeLogs.size() > 1) {
log.warn("[JobController] frequent job has multi instance, there must ha");
}
executeLogs.forEach(instance -> {
try {
// 重复查询了数据库不过问题不大这个调用量很小
instanceService.stopInstance(instance.getInstanceId());
}catch (Exception ignore) {
}
});
} }
} }

View File

@ -12,7 +12,8 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
/** /**
* 处理内部请求的 Controller * 处理Worker请求的 Controller
* Worker启动时先请求assert验证appName的可用性再根据得到的appId获取Server地址
* *
* @author tjq * @author tjq
* @since 2020/4/4 * @since 2020/4/4

View File

@ -1,7 +0,0 @@
/**
* CRUD较为简单就不单独搞 Service 层了
*
* @author tjq
* @since 2020/4/2
*/
package com.github.kfcfans.oms.server.web.controller;

View File

@ -49,8 +49,6 @@ public class ModifyJobInfoRequest {
private Integer concurrency; private Integer concurrency;
// 任务整体超时时间 // 任务整体超时时间
private Long instanceTimeLimit; private Long instanceTimeLimit;
// 任务的每一个Task超时时间
private Long taskTimeLimit;
/* ************************** 重试配置 ************************** */ /* ************************** 重试配置 ************************** */
private Integer instanceRetryNum; private Integer instanceRetryNum;

View File

@ -19,7 +19,6 @@
<oms.common.version>1.0.0-SNAPSHOT</oms.common.version> <oms.common.version>1.0.0-SNAPSHOT</oms.common.version>
<h2.db.version>1.4.200</h2.db.version> <h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version> <hikaricp.version>3.4.2</hikaricp.version>
<guava.version>28.2-jre</guava.version>
<junit.version>5.6.1</junit.version> <junit.version>5.6.1</junit.version>
<kryo.version>5.0.0-RC5</kryo.version> <kryo.version>5.0.0-RC5</kryo.version>
<fastjson.version>1.2.68</fastjson.version> <fastjson.version>1.2.68</fastjson.version>
@ -63,13 +62,6 @@
<version>${hikaricp.version}</version> <version>${hikaricp.version}</version>
</dependency> </dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- kryo 超超超高性能序列化框架 --> <!-- kryo 超超超高性能序列化框架 -->
<dependency> <dependency>
<groupId>com.esotericsoftware</groupId> <groupId>com.esotericsoftware</groupId>

View File

@ -4,6 +4,7 @@ import akka.actor.ActorSelection;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import com.github.kfcfans.common.ExecuteType; import com.github.kfcfans.common.ExecuteType;
import com.github.kfcfans.common.InstanceStatus; import com.github.kfcfans.common.InstanceStatus;
import com.github.kfcfans.common.SystemInstanceResult;
import com.github.kfcfans.common.request.ServerScheduleJobReq; import com.github.kfcfans.common.request.ServerScheduleJobReq;
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.common.response.AskResponse; import com.github.kfcfans.common.response.AskResponse;
@ -21,7 +22,6 @@ import org.springframework.util.CollectionUtils;
import java.time.Duration; import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.*; import java.util.concurrent.*;
@ -125,7 +125,7 @@ public class CommonTaskTracker extends TaskTracker {
if (finishedNum == 0) { if (finishedNum == 0) {
finished.set(true); finished.set(true);
success = false; success = false;
result = "CREATE_ROOT_TASK_FAILED"; result = SystemInstanceResult.TASK_INIT_FAILED;
}else { }else {
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType()); ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
@ -137,7 +137,7 @@ public class CommonTaskTracker extends TaskTracker {
List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId); List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) { if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
success = false; success = false;
result = "UNKNOWN BUG"; result = SystemInstanceResult.UNKNOWN_BUG;
log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId); log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
}else { }else {
result = allTask.get(0).getResult(); result = allTask.get(0).getResult();
@ -178,7 +178,7 @@ public class CommonTaskTracker extends TaskTracker {
if (isTimeout()) { if (isTimeout()) {
finished.set(true); finished.set(true);
success = false; success = false;
result = "TIMEOUT"; result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
} }
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);

View File

@ -57,4 +57,10 @@ public class CommonTaskTrackerTest {
Thread.sleep(5000000); Thread.sleep(5000000);
} }
@Test
public void testBroadcast() throws Exception {
remoteTaskTracker.tell(TestUtils.genServerScheduleJobReq(ExecuteType.BROADCAST, TimeExpressionType.CRON), null);
Thread.sleep(5000000);
}
} }

View File

@ -72,7 +72,6 @@ public class ProcessorTrackerTest {
instanceInfo.setProcessorInfo(processor); instanceInfo.setProcessorInfo(processor);
instanceInfo.setInstanceTimeoutMS(500000); instanceInfo.setInstanceTimeoutMS(500000);
instanceInfo.setTaskTimeoutMS(5000000);
instanceInfo.setThreadConcurrency(5); instanceInfo.setThreadConcurrency(5);
instanceInfo.setTaskRetryNum(3); instanceInfo.setTaskRetryNum(3);

View File

@ -29,7 +29,6 @@ public class TestUtils {
req.setTaskRetryNum(3); req.setTaskRetryNum(3);
req.setThreadConcurrency(10); req.setThreadConcurrency(10);
req.setInstanceTimeoutMS(500000); req.setInstanceTimeoutMS(500000);
req.setTaskTimeoutMS(500000);
req.setTimeExpressionType(timeExpressionType.name()); req.setTimeExpressionType(timeExpressionType.name());
switch (timeExpressionType) { switch (timeExpressionType) {
case CRON:req.setTimeExpression("0 * * * * ? "); case CRON:req.setTimeExpression("0 * * * * ? ");

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.oms.processors; package com.github.kfcfans.oms.processors;
import com.alibaba.fastjson.JSONObject;
import com.github.kfcfans.oms.worker.sdk.ProcessResult; import com.github.kfcfans.oms.worker.sdk.ProcessResult;
import com.github.kfcfans.oms.worker.sdk.TaskContext; import com.github.kfcfans.oms.worker.sdk.TaskContext;
import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor; import com.github.kfcfans.oms.worker.sdk.api.BroadcastProcessor;
@ -16,14 +17,14 @@ public class TestBroadcastProcessor implements BroadcastProcessor {
@Override @Override
public ProcessResult preProcess(TaskContext taskContext) throws Exception { public ProcessResult preProcess(TaskContext taskContext) throws Exception {
System.out.println("=============== TestBroadcastProcessor#preProcess ==============="); System.out.println("=============== TestBroadcastProcessor#preProcess ===============");
System.out.println("taskContext:" + taskContext); System.out.println("taskContext:" + JSONObject.toJSONString(taskContext));
return new ProcessResult(true, "preProcess success"); return new ProcessResult(true, "preProcess success");
} }
@Override @Override
public ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception { public ProcessResult postProcess(TaskContext taskContext, Map<String, String> taskId2Result) throws Exception {
System.out.println("=============== TestBroadcastProcessor#postProcess ==============="); System.out.println("=============== TestBroadcastProcessor#postProcess ===============");
System.out.println("taskContext:" + taskContext); System.out.println("taskContext:" + JSONObject.toJSONString(taskContext));
System.out.println("taskId2Result:" + taskId2Result); System.out.println("taskId2Result:" + taskId2Result);
return new ProcessResult(true, "postProcess success"); return new ProcessResult(true, "postProcess success");
} }
@ -31,7 +32,7 @@ public class TestBroadcastProcessor implements BroadcastProcessor {
@Override @Override
public ProcessResult process(TaskContext context) throws Exception { public ProcessResult process(TaskContext context) throws Exception {
System.out.println("=============== TestBroadcastProcessor#process ==============="); System.out.println("=============== TestBroadcastProcessor#process ===============");
System.out.println("taskContext:" + context); System.out.println("taskContext:" + JSONObject.toJSONString(context));
return new ProcessResult(true, "processSuccess"); return new ProcessResult(true, "processSuccess");
} }
} }