mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
parse akka config
This commit is contained in:
parent
2e348b411d
commit
2ca2b901b7
@ -1,14 +1,30 @@
|
||||
package com.github.kfcfans.oms.worker;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import com.github.kfcfans.oms.worker.actors.TaskTrackerActor;
|
||||
import com.github.kfcfans.oms.worker.common.OhMyConfig;
|
||||
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
|
||||
import com.github.kfcfans.oms.worker.common.utils.NetUtils;
|
||||
import com.github.kfcfans.oms.worker.common.utils.SpringUtils;
|
||||
import com.github.kfcfans.oms.worker.core.tracker.processor.ProcessorTracker;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.util.StopWatch;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 客户端启动类
|
||||
@ -16,12 +32,11 @@ import org.springframework.context.ApplicationContextAware;
|
||||
* @author KFCFans
|
||||
* @since 2020/3/16
|
||||
*/
|
||||
@Slf4j
|
||||
public class OhMyWorker implements ApplicationContextAware, InitializingBean {
|
||||
|
||||
public static ActorSystem actorSystem;
|
||||
@Getter
|
||||
@Setter
|
||||
private static OhMyConfig config;
|
||||
public static ActorSystem actorSystem;
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
@ -35,5 +50,39 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean {
|
||||
|
||||
public void init() {
|
||||
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
log.info("[OhMyWorker] start to initialize OhMyWorker...");
|
||||
|
||||
try {
|
||||
|
||||
// 初始化 ActorSystem
|
||||
Map<String, Object> overrideConfig = Maps.newHashMap();
|
||||
String localIP = StringUtils.isEmpty(config.getListeningIP()) ? NetUtils.getLocalHost() : config.getListeningIP();
|
||||
overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
|
||||
if (config.getListeningPort() != null) {
|
||||
overrideConfig.put("akka.remote.artery.canonical.port", config.getListeningPort());
|
||||
}
|
||||
log.info("[OhMyWorker] akka-remote listening address config: {}", overrideConfig);
|
||||
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG_NAME);
|
||||
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
|
||||
|
||||
actorSystem = ActorSystem.create(AkkaConstant.ACTOR_SYSTEM_NAME, akkaFinalConfig);
|
||||
actorSystem.actorOf(Props.create(TaskTrackerActor.class));
|
||||
actorSystem.actorOf(Props.create(ProcessorTracker.class));
|
||||
|
||||
// 初始化存储
|
||||
TaskPersistenceService.INSTANCE.init();
|
||||
|
||||
}catch (Exception e) {
|
||||
log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static OhMyConfig getConfig() {
|
||||
return config;
|
||||
}
|
||||
public void setConfig(OhMyConfig cfg) {
|
||||
config = cfg;
|
||||
}
|
||||
}
|
||||
|
@ -27,8 +27,13 @@ public class ProcessorTrackerActor extends AbstractActor {
|
||||
* 处理来自TaskTracker的task执行请求
|
||||
*/
|
||||
private void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {
|
||||
String jobId = req.getJobId();
|
||||
String instanceId = req.getInstanceId();
|
||||
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> new ProcessorTracker(req.getThreadConcurrency()));
|
||||
processorTracker.submitTask(req, getSender());
|
||||
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, ignore -> {
|
||||
ProcessorTracker pt = new ProcessorTracker(req);
|
||||
log.info("[ProcessorTrackerActor] create ProcessorTracker for instance(jobId={}&instanceId={}) success.", jobId, instanceId);
|
||||
return pt;
|
||||
});
|
||||
processorTracker.submitTask(req);
|
||||
}
|
||||
}
|
||||
|
@ -21,5 +21,6 @@ public class OhMyConfig {
|
||||
/**
|
||||
* 通讯端口
|
||||
*/
|
||||
private int listeningPort;
|
||||
private String listeningIP;
|
||||
private Integer listeningPort;
|
||||
}
|
||||
|
@ -14,6 +14,8 @@ public class AkkaConstant {
|
||||
public static final String ACTOR_SYSTEM_NAME = "oms";
|
||||
|
||||
public static final String Task_TRACKER_ACTOR_NAME = "task_tracker";
|
||||
public static final String WORKER_ACTOR_NAME = "worker";
|
||||
public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker";
|
||||
|
||||
public static final String AKKA_CONFIG_NAME = "oms-akka-application.conf";
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.github.kfcfans.oms.worker.core.executor;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSelection;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.common.ExecuteType;
|
||||
import com.github.kfcfans.common.ProcessorType;
|
||||
@ -31,7 +32,7 @@ import org.springframework.beans.BeanUtils;
|
||||
@AllArgsConstructor
|
||||
public class ProcessorRunnable implements Runnable {
|
||||
|
||||
private final ActorRef taskTrackerActor;
|
||||
private final ActorSelection taskTrackerActor;
|
||||
|
||||
@Getter
|
||||
private final TaskTrackerStartTaskReq request;
|
||||
@ -85,6 +86,7 @@ public class ProcessorRunnable implements Runnable {
|
||||
TaskContext taskContext = new TaskContext();
|
||||
BeanUtils.copyProperties(request, taskContext);
|
||||
taskContext.setSubTask(JSONObject.parse(request.getSubTaskContent()));
|
||||
|
||||
ThreadLocalStore.TASK_CONTEXT_THREAD_LOCAL.set(taskContext);
|
||||
|
||||
// 5. 正式提交运行
|
||||
|
@ -1,14 +1,22 @@
|
||||
package com.github.kfcfans.oms.worker.core.tracker.processor;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSelection;
|
||||
import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
import com.github.kfcfans.oms.worker.common.constants.AkkaConstant;
|
||||
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.github.kfcfans.oms.worker.common.utils.AkkaUtils;
|
||||
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
@ -17,20 +25,59 @@ import java.util.concurrent.*;
|
||||
* @author tjq
|
||||
* @since 2020/3/20
|
||||
*/
|
||||
@Slf4j
|
||||
public class ProcessorTracker {
|
||||
|
||||
private ExecutorService threadPool;
|
||||
// 记录创建时间
|
||||
private long startTime;
|
||||
private long jobTimeLimitMS;
|
||||
|
||||
public ProcessorTracker(int threadConcurrency) {
|
||||
// 记录该 Job 相关信息
|
||||
private String instanceId;
|
||||
private String executeType;
|
||||
private String processorType;
|
||||
private String processorInfo;
|
||||
private int threadConcurrency;
|
||||
private String jobParams;
|
||||
private String instanceParams;
|
||||
private int maxRetryTimes;
|
||||
|
||||
// 初始化运行用的线程池
|
||||
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10);
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
|
||||
RejectedProcessorHandler rejectHandler = new RejectedProcessorHandler(TaskPersistenceService.INSTANCE);
|
||||
threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectHandler);
|
||||
private String taskTrackerAddress;
|
||||
private ActorSelection taskTrackerActorRef;
|
||||
|
||||
private ThreadPoolExecutor threadPool;
|
||||
private static final int MAX_QUEUE_SIZE = 20;
|
||||
|
||||
/**
|
||||
* 创建 ProcessorTracker(其实就是创建了个执行用的线程池 T_T)
|
||||
*/
|
||||
public ProcessorTracker(TaskTrackerStartTaskReq request) {
|
||||
|
||||
// 赋值
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.jobTimeLimitMS = request.getJobTimeLimitMS();
|
||||
this.instanceId = request.getInstanceId();
|
||||
this.executeType = request.getExecuteType();
|
||||
this.processorType = request.getProcessorType();
|
||||
this.processorInfo = request.getProcessorInfo();
|
||||
this.threadConcurrency = request.getThreadConcurrency();
|
||||
this.jobParams = request.getJobParams();
|
||||
this.instanceParams = request.getInstanceParams();
|
||||
this.maxRetryTimes = request.getMaxRetryTimes();
|
||||
this.taskTrackerAddress = request.getTaskTrackerAddress();
|
||||
|
||||
String akkaRemotePath = AkkaUtils.getAkkaRemotePath(taskTrackerAddress, AkkaConstant.Task_TRACKER_ACTOR_NAME);
|
||||
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath);
|
||||
|
||||
// 初始化
|
||||
initProcessorPool();
|
||||
initTimingJob();
|
||||
}
|
||||
|
||||
public void submitTask(TaskTrackerStartTaskReq newTaskReq, ActorRef taskTrackerActorRef) {
|
||||
/**
|
||||
* 提交任务
|
||||
*/
|
||||
public void submitTask(TaskTrackerStartTaskReq newTaskReq) {
|
||||
|
||||
// 1. 回复接受成功
|
||||
ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
|
||||
@ -38,8 +85,114 @@ public class ProcessorTracker {
|
||||
reportReq.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
taskTrackerActorRef.tell(reportReq, null);
|
||||
|
||||
// 2. 提交线程池执行
|
||||
// 2.1 内存控制,持久化
|
||||
if (threadPool.getQueue().size() > MAX_QUEUE_SIZE) {
|
||||
|
||||
TaskDO newTask = new TaskDO();
|
||||
BeanUtils.copyProperties(newTaskReq, newTask);
|
||||
newTask.setTaskContent(newTaskReq.getSubTaskContent());
|
||||
newTask.setAddress(newTaskReq.getTaskTrackerAddress());
|
||||
newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
newTask.setFailedCnt(newTaskReq.getCurrentRetryTimes());
|
||||
newTask.setCreatedTime(System.currentTimeMillis());
|
||||
newTask.setLastModifiedTime(System.currentTimeMillis());
|
||||
|
||||
boolean save = TaskPersistenceService.INSTANCE.save(newTask);
|
||||
if (save) {
|
||||
log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask);
|
||||
}else {
|
||||
log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 2.2 提交执行
|
||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, newTaskReq);
|
||||
threadPool.submit(processorRunnable);
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务是否超时
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - startTime > jobTimeLimitMS;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 初始化线程池
|
||||
*/
|
||||
private void initProcessorPool() {
|
||||
// 待执行队列,为了防止对内存造成较大压力,内存队列不能太大
|
||||
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
|
||||
// 自定义线程池中线程名称
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-pool-%d").build();
|
||||
threadPool = new ThreadPoolExecutor(threadConcurrency, threadConcurrency, 60L, TimeUnit.SECONDS, queue, threadFactory);
|
||||
// 当没有任务执行时,允许销毁核心线程(即线程池最终存活线程个数可能为0)
|
||||
threadPool.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化定时任务
|
||||
*/
|
||||
private void initTimingJob() {
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("oms-processor-timing-pool-%d").build();
|
||||
ScheduledExecutorService timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
|
||||
|
||||
timingPool.scheduleWithFixedDelay(new PoolStatusCheckRunnable(), 60, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 定期检查线程池运行状态(内存中的任务数量不足,则即使从数据库中获取并提交执行)
|
||||
*/
|
||||
private class PoolStatusCheckRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
int queueSize = threadPool.getQueue().size();
|
||||
if (queueSize >= MAX_QUEUE_SIZE / 2) {
|
||||
return;
|
||||
}
|
||||
|
||||
TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE;
|
||||
List<TaskDO> taskDOList = taskPersistenceService.getNeedRunTask(instanceId, MAX_QUEUE_SIZE / 2);
|
||||
|
||||
if (CollectionUtils.isEmpty(taskDOList)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> deletedIds = Lists.newLinkedList();
|
||||
|
||||
log.debug("[ProcessorTracker] timing add task to thread pool.");
|
||||
|
||||
// 提交到线程池执行
|
||||
taskDOList.forEach(task -> {
|
||||
runTask(task);
|
||||
deletedIds.add(task.getTaskId());
|
||||
});
|
||||
|
||||
// 删除任务
|
||||
taskPersistenceService.batchDelete(instanceId, deletedIds);
|
||||
}
|
||||
|
||||
private void runTask(TaskDO task) {
|
||||
TaskTrackerStartTaskReq req = new TaskTrackerStartTaskReq();
|
||||
BeanUtils.copyProperties(task, req);
|
||||
|
||||
req.setExecuteType(executeType);
|
||||
req.setProcessorType(processorType);
|
||||
req.setProcessorInfo(processorInfo);
|
||||
req.setTaskTrackerAddress(taskTrackerAddress);
|
||||
req.setJobParams(jobParams);
|
||||
req.setInstanceParams(instanceParams);
|
||||
req.setSubTaskContent(task.getTaskContent());
|
||||
req.setMaxRetryTimes(maxRetryTimes);
|
||||
req.setCurrentRetryTimes(task.getFailedCnt());
|
||||
|
||||
ProcessorRunnable processorRunnable = new ProcessorRunnable(taskTrackerActorRef, req);
|
||||
threadPool.submit(processorRunnable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,9 @@ public class ProcessorTrackerPool {
|
||||
|
||||
private static final Map<String, ProcessorTracker> instanceId2ProcessorTracker = Maps.newConcurrentMap();
|
||||
|
||||
/**
|
||||
* 获取 ProcessorTracker,如果不存在则创建
|
||||
*/
|
||||
public static ProcessorTracker getProcessorTracker(String instanceId, Function<String, ProcessorTracker> creator) {
|
||||
return instanceId2ProcessorTracker.computeIfAbsent(instanceId, creator);
|
||||
}
|
||||
|
@ -1,51 +0,0 @@
|
||||
package com.github.kfcfans.oms.worker.core.tracker.processor;
|
||||
|
||||
import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.github.kfcfans.oms.worker.core.executor.ProcessorRunnable;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskDO;
|
||||
import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* 线程池拒绝策略 -> 持久化到本地H2数据库
|
||||
* 出于内存占用考虑,线程池的阻塞队列容量不大,大量子任务涌入时需要持久化到本地数据库
|
||||
* 第一版先直接写H2,如果发现有性能问题再转变为 内存队列 + 批量写入 模式
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/23
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RejectedProcessorHandler implements RejectedExecutionHandler {
|
||||
|
||||
private final TaskPersistenceService taskPersistenceService;
|
||||
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
|
||||
|
||||
ProcessorRunnable processorRunnable = (ProcessorRunnable) r;
|
||||
TaskTrackerStartTaskReq startTaskReq = processorRunnable.getRequest();
|
||||
|
||||
TaskDO newTask = new TaskDO();
|
||||
BeanUtils.copyProperties(startTaskReq, newTask);
|
||||
newTask.setTaskContent(startTaskReq.getSubTaskContent());
|
||||
newTask.setAddress(startTaskReq.getTaskTrackerAddress());
|
||||
newTask.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
newTask.setFailedCnt(0);
|
||||
newTask.setCreatedTime(System.currentTimeMillis());
|
||||
newTask.setLastModifiedTime(System.currentTimeMillis());
|
||||
|
||||
boolean save = taskPersistenceService.save(newTask);
|
||||
if (save) {
|
||||
log.debug("[RejectedProcessorHandler] persistent task({}) succeed.", newTask);
|
||||
}else {
|
||||
log.warn("[RejectedProcessorHandler] persistent task({}) failed.", newTask);
|
||||
}
|
||||
}
|
||||
}
|
@ -2,7 +2,6 @@ package com.github.kfcfans.oms.worker.core.tracker.task;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSelection;
|
||||
import com.github.kfcfans.common.ExecuteType;
|
||||
import com.github.kfcfans.common.JobInstanceStatus;
|
||||
import com.github.kfcfans.common.request.TaskTrackerReportInstanceStatusReq;
|
||||
import com.github.kfcfans.oms.worker.OhMyWorker;
|
||||
@ -17,7 +16,6 @@ import com.github.kfcfans.oms.worker.persistence.TaskPersistenceService;
|
||||
import com.github.kfcfans.oms.worker.pojo.model.JobInstanceInfo;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.TaskTrackerStartTaskReq;
|
||||
import com.github.kfcfans.oms.worker.pojo.request.ProcessorReportTaskStatusReq;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.StringUtils;
|
||||
@ -36,6 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@Slf4j
|
||||
public abstract class TaskTracker {
|
||||
|
||||
protected long startTime;
|
||||
protected long jobTimeLimitMS;
|
||||
|
||||
|
||||
// 任务实例信息
|
||||
protected JobInstanceInfo jobInstanceInfo;
|
||||
protected ActorRef taskTrackerActorRef;
|
||||
@ -49,6 +51,9 @@ public abstract class TaskTracker {
|
||||
|
||||
public TaskTracker(JobInstanceInfo jobInstanceInfo, ActorRef taskTrackerActorRef) {
|
||||
|
||||
this.startTime = System.currentTimeMillis();
|
||||
this.jobTimeLimitMS = jobInstanceInfo.getTimeLimit();
|
||||
|
||||
this.jobInstanceInfo = jobInstanceInfo;
|
||||
this.taskTrackerActorRef = taskTrackerActorRef;
|
||||
this.taskPersistenceService = TaskPersistenceService.INSTANCE;
|
||||
@ -92,6 +97,13 @@ public abstract class TaskTracker {
|
||||
return finished.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务是否超时
|
||||
*/
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - startTime > jobTimeLimitMS;
|
||||
}
|
||||
|
||||
/**
|
||||
* 持久化根任务,只有完成持久化才能视为任务开始running(先持久化,再报告server)
|
||||
*/
|
||||
@ -135,7 +147,7 @@ public abstract class TaskTracker {
|
||||
if (StringUtils.isEmpty(targetIP)) {
|
||||
targetIP = allWorkerAddress.get(ThreadLocalRandom.current().nextInt(allWorkerAddress.size()));
|
||||
}
|
||||
String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.WORKER_ACTOR_NAME);
|
||||
String targetPath = AkkaUtils.getAkkaRemotePath(targetIP, AkkaConstant.PROCESSOR_TRACKER_ACTOR_NAME);
|
||||
ActorSelection targetActor = OhMyWorker.actorSystem.actorSelection(targetPath);
|
||||
|
||||
// 发送请求(Akka的tell是至少投递一次,经实验表明无法投递消息也不会报错...印度啊...)
|
||||
|
@ -15,7 +15,7 @@ public interface TaskDAO {
|
||||
/**
|
||||
* 初始化任务表
|
||||
*/
|
||||
boolean initTable();
|
||||
void initTable() throws Exception;
|
||||
|
||||
/**
|
||||
* 插入任务数据
|
||||
@ -28,6 +28,8 @@ public interface TaskDAO {
|
||||
*/
|
||||
boolean update(TaskDO task);
|
||||
|
||||
int batchDelete(String instanceId, List<String> taskIds);
|
||||
|
||||
TaskDO selectByKey(String instanceId, String taskId);
|
||||
|
||||
List<TaskDO> simpleQuery(SimpleTaskQuery query);
|
||||
|
@ -4,6 +4,7 @@ import com.github.kfcfans.oms.worker.common.constants.TaskStatus;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.Collection;
|
||||
@ -20,7 +21,7 @@ import java.util.Map;
|
||||
public class TaskDAOImpl implements TaskDAO {
|
||||
|
||||
@Override
|
||||
public boolean initTable() {
|
||||
public void initTable() throws Exception {
|
||||
|
||||
String delTableSQL = "drop table if exists task_info";
|
||||
String createTableSQL = "create table task_info (task_id varchar(20), instance_id varchar(20), job_id varchar(20), task_name varchar(20), task_content blob, address varchar(20), status int(11), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), unique KEY pkey (instance_id, task_id))";
|
||||
@ -28,11 +29,7 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
|
||||
stat.execute(delTableSQL);
|
||||
stat.execute(createTableSQL);
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskDAO] initTable failed.", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -73,6 +70,20 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int batchDelete(String instanceId, List<String> taskIds) {
|
||||
String deleteSQL = "delete from task_info where instance_id = %s and task_id in %s";
|
||||
String sql = String.format(deleteSQL, instanceId, getInStringCondition(taskIds));
|
||||
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) {
|
||||
|
||||
return stat.executeUpdate(sql);
|
||||
|
||||
}catch (Exception e) {
|
||||
log.error("[TaskDAO] batchDelete failed(instanceId = {}, taskIds = {}).", instanceId, taskIds, e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskDO selectByKey(String instanceId, String taskId) {
|
||||
String selectSQL = "select * from task_info where instance_id = ? and task_id = ?";
|
||||
@ -195,7 +206,19 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
ps.setLong(11, task.getLastModifiedTime());
|
||||
}
|
||||
|
||||
private static String getInStringCondition(Collection<String> collection) {
|
||||
if (CollectionUtils.isEmpty(collection)) {
|
||||
return "()";
|
||||
}
|
||||
StringBuilder sb = new StringBuilder(" ( ");
|
||||
collection.forEach(str -> sb.append("'").append(str).append("',"));
|
||||
return sb.replace(sb.length() -1, sb.length(), " ) ").toString();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
System.out.println(getInStringCondition(Lists.newArrayList("2.1")));
|
||||
|
||||
TaskDAOImpl taskDAO = new TaskDAOImpl();
|
||||
taskDAO.initTable();
|
||||
|
||||
@ -234,6 +257,9 @@ public class TaskDAOImpl implements TaskDAO {
|
||||
List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
|
||||
System.out.println(dbRES);
|
||||
|
||||
System.out.println("=========== start to delete ===========");
|
||||
System.out.println(taskDAO.batchDelete("22", Lists.newArrayList("2.1")));;
|
||||
|
||||
Thread.sleep(100000);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ import java.util.Map;
|
||||
*/
|
||||
public class TaskPersistenceService {
|
||||
|
||||
private static volatile boolean initialized = false;
|
||||
public static TaskPersistenceService INSTANCE = new TaskPersistenceService();
|
||||
|
||||
private TaskPersistenceService() {
|
||||
@ -25,6 +26,13 @@ public class TaskPersistenceService {
|
||||
private TaskDAO taskDAO = new TaskDAOImpl();
|
||||
private static final int MAX_BATCH_SIZE = 50;
|
||||
|
||||
public void init() throws Exception {
|
||||
if (initialized) {
|
||||
return;
|
||||
}
|
||||
taskDAO.initTable();
|
||||
}
|
||||
|
||||
public boolean save(TaskDO task) {
|
||||
boolean success = taskDAO.save(task);
|
||||
if (!success) {
|
||||
@ -97,4 +105,21 @@ public class TaskPersistenceService {
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取需要被执行的任务
|
||||
*/
|
||||
public List<TaskDO> getNeedRunTask(String instanceId, int limit) {
|
||||
|
||||
SimpleTaskQuery query = new SimpleTaskQuery();
|
||||
query.setInstanceId(instanceId);
|
||||
query.setStatus(TaskStatus.RECEIVE_SUCCESS.getValue());
|
||||
query.setLimit(limit);
|
||||
|
||||
return taskDAO.simpleQuery(query);
|
||||
}
|
||||
|
||||
public int batchDelete(String instanceId, List<String> taskIds) {
|
||||
return taskDAO.batchDelete(instanceId, taskIds);
|
||||
}
|
||||
}
|
||||
|
@ -28,6 +28,8 @@ public class TaskTrackerStartTaskReq {
|
||||
private int threadConcurrency;
|
||||
// TaskTracker 地址
|
||||
private String taskTrackerAddress;
|
||||
// 任务超时时间
|
||||
private long jobTimeLimitMS;
|
||||
|
||||
private String jobParams;
|
||||
private String instanceParams;
|
||||
@ -48,6 +50,7 @@ public class TaskTrackerStartTaskReq {
|
||||
threadConcurrency = instanceInfo.getThreadConcurrency();
|
||||
executeType = instanceInfo.getExecuteType();
|
||||
taskTrackerAddress = NetUtils.getLocalHost();
|
||||
jobTimeLimitMS = instanceInfo.getTimeLimit();
|
||||
|
||||
jobParams = instanceInfo.getJobParams();
|
||||
instanceParams = instanceInfo.getInstanceParams();
|
||||
|
Loading…
x
Reference in New Issue
Block a user