feat: support for starting multiple workers on a single java application #217

This commit is contained in:
tjq 2021-03-07 21:17:19 +08:00
parent 17cb9ea626
commit 30f2d2404e
28 changed files with 343 additions and 203 deletions

View File

@ -21,6 +21,8 @@ import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter; import com.github.kfcfans.powerjob.worker.background.WorkerHealthReporter;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig; import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.PowerBannerPrinter; import com.github.kfcfans.powerjob.worker.common.PowerBannerPrinter;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils; import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService; import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
@ -28,14 +30,12 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.util.StringUtils;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -53,17 +53,8 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean { public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
@Getter private ScheduledExecutorService timingPool;
private static OhMyConfig config; private final RuntimeMeta runtimeMeta = new RuntimeMeta();
@Getter
private static String currentServer;
@Getter
private static String workerAddress;
public static ActorSystem actorSystem;
@Getter
private static Long appId;
private static ScheduledExecutorService timingPool;
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
@ -79,33 +70,54 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker..."); log.info("[OhMyWorker] start to initialize OhMyWorker...");
OhMyConfig config = runtimeMeta.getOhMyConfig();
CommonUtils.requireNonNull(config, "can't find OhMyConfig, please set OhMyConfig first");
try { try {
PowerBannerPrinter.print(); PowerBannerPrinter.print();
// 校验 appName // 校验 appName
if (!config.isEnableTestMode()) { if (!config.isEnableTestMode()) {
appId = assertAppName(); assertAppName();
}else { }else {
log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env."); log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env.");
} }
// 初始化文件系统
OmsWorkerFileUtils.init(runtimeMeta.getOhMyConfig());
// 初始化元数据
String workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
runtimeMeta.setWorkerAddress(workerAddress);
// 初始化定时线程池
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build();
timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory);
// 连接 server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(runtimeMeta.getAppId(), runtimeMeta.getOhMyConfig());
serverDiscoveryService.start(timingPool);
runtimeMeta.setServerDiscoveryService(serverDiscoveryService);
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了 // 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了
Map<String, Object> overrideConfig = Maps.newHashMap(); Map<String, Object> overrideConfig = Maps.newHashMap();
overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost()); overrideConfig.put("akka.remote.artery.canonical.hostname", NetUtils.getLocalHost());
overrideConfig.put("akka.remote.artery.canonical.port", config.getPort()); overrideConfig.put("akka.remote.artery.canonical.port", config.getPort());
workerAddress = NetUtils.getLocalHost() + ":" + config.getPort();
Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
int cores = Runtime.getRuntime().availableProcessors(); int cores = Runtime.getRuntime().availableProcessors();
actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); ActorSystem actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig);
actorSystem.actorOf(Props.create(TaskTrackerActor.class) runtimeMeta.setActorSystem(actorSystem);
ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(runtimeMeta)
.withDispatcher("akka.task-tracker-dispatcher") .withDispatcher("akka.task-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) actorSystem.actorOf(ProcessorTrackerActor.props(runtimeMeta)
.withDispatcher("akka.processor-tracker-dispatcher") .withDispatcher("akka.processor-tracker-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
actorSystem.actorOf(Props.create(WorkerActor.class) actorSystem.actorOf(WorkerActor.props(taskTrackerActorRef)
.withDispatcher("akka.worker-common-dispatcher") .withDispatcher("akka.worker-common-dispatcher")
.withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME); .withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME);
@ -116,23 +128,19 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// 初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);
runtimeMeta.setOmsLogHandler(omsLogHandler);
// 初始化存储 // 初始化存储
TaskPersistenceService.INSTANCE.init(); TaskPersistenceService taskPersistenceService = new TaskPersistenceService(runtimeMeta.getOhMyConfig().getStoreStrategy());
taskPersistenceService.init();
runtimeMeta.setTaskPersistenceService(taskPersistenceService);
log.info("[OhMyWorker] local storage initialized successfully."); log.info("[OhMyWorker] local storage initialized successfully.");
// 服务发现
currentServer = ServerDiscoveryService.discovery();
if (StringUtils.isEmpty(currentServer) && !config.isEnableTestMode()) {
throw new RuntimeException("can't find any available server, this worker has been quarantined.");
}
log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer);
// 初始化定时任务 // 初始化定时任务
ThreadFactory timingPoolFactory = new ThreadFactoryBuilder().setNameFormat("oms-worker-timing-pool-%d").build(); timingPool.scheduleAtFixedRate(new WorkerHealthReporter(runtimeMeta), 0, 15, TimeUnit.SECONDS);
timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory); timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(new WorkerHealthReporter(), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleAtFixedRate(() -> currentServer = ServerDiscoveryService.discovery(), 10, 10, TimeUnit.SECONDS);
timingPool.scheduleWithFixedDelay(OmsLogHandler.INSTANCE.logSubmitter, 0, 5, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch); log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) { }catch (Exception e) {
@ -142,12 +150,13 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
} }
public void setConfig(OhMyConfig config) { public void setConfig(OhMyConfig config) {
OhMyWorker.config = config; runtimeMeta.setOhMyConfig(config);
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private Long assertAppName() { private void assertAppName() {
OhMyConfig config = runtimeMeta.getOhMyConfig();
String appName = config.getAppName(); String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!"); Objects.requireNonNull(appName, "appName can't be empty!");
@ -160,7 +169,8 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
if (resultDTO.isSuccess()) { if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString()); Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId); log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
return appId; runtimeMeta.setAppId(appId);
return;
}else { }else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName); log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage()); throw new PowerJobException(resultDTO.getMessage());

View File

@ -1,11 +1,14 @@
package com.github.kfcfans.powerjob.worker.actors; package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.Props;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTrackerPool; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTrackerPool;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq; import com.github.kfcfans.powerjob.worker.pojo.request.TaskTrackerStopInstanceReq;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -18,8 +21,15 @@ import java.util.List;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j @Slf4j
@AllArgsConstructor
public class ProcessorTrackerActor extends AbstractActor { public class ProcessorTrackerActor extends AbstractActor {
private final RuntimeMeta runtimeMeta;
public static Props props(RuntimeMeta runtimeMeta) {
return Props.create(ProcessorTrackerActor.class, () -> new ProcessorTrackerActor(runtimeMeta));
}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -38,7 +48,10 @@ public class ProcessorTrackerActor extends AbstractActor {
Long instanceId = req.getInstanceInfo().getInstanceId(); Long instanceId = req.getInstanceInfo().getInstanceId();
// 创建 ProcessorTracker 一定能成功 // 创建 ProcessorTracker 一定能成功
ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(instanceId, req.getTaskTrackerAddress(), () -> new ProcessorTracker(req)); ProcessorTracker processorTracker = ProcessorTrackerPool.getProcessorTracker(
instanceId,
req.getTaskTrackerAddress(),
() -> new ProcessorTracker(req, runtimeMeta));
TaskDO task = new TaskDO(); TaskDO task = new TaskDO();

View File

@ -1,10 +1,12 @@
package com.github.kfcfans.powerjob.worker.actors; package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.Props;
import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.ServerQueryInstanceStatusReq;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq; import com.github.kfcfans.powerjob.common.request.ServerStopInstanceReq;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
@ -14,6 +16,7 @@ import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorReportTaskStatus
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
@ -25,8 +28,15 @@ import java.util.List;
* @since 2020/3/17 * @since 2020/3/17
*/ */
@Slf4j @Slf4j
@AllArgsConstructor
public class TaskTrackerActor extends AbstractActor { public class TaskTrackerActor extends AbstractActor {
private final RuntimeMeta runtimeMeta;
public static Props props(RuntimeMeta runtimeMeta) {
return Props.create(TaskTrackerActor.class, () -> new TaskTrackerActor(runtimeMeta));
}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()
@ -123,7 +133,7 @@ public class TaskTrackerActor extends AbstractActor {
log.debug("[TaskTrackerActor] server schedule job by request: {}.", req); log.debug("[TaskTrackerActor] server schedule job by request: {}.", req);
// 原子创建防止多实例的存在 // 原子创建防止多实例的存在
TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req)); TaskTrackerPool.atomicCreateTaskTracker(instanceId, ignore -> TaskTracker.create(req, runtimeMeta));
} }
/** /**

View File

@ -1,9 +1,13 @@
package com.github.kfcfans.powerjob.worker.actors; package com.github.kfcfans.powerjob.worker.actors;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest; import com.github.kfcfans.powerjob.common.request.ServerDeployContainerRequest;
import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest; import com.github.kfcfans.powerjob.common.request.ServerDestroyContainerRequest;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -13,8 +17,15 @@ import lombok.extern.slf4j.Slf4j;
* @since 2020/3/24 * @since 2020/3/24
*/ */
@Slf4j @Slf4j
@AllArgsConstructor
public class WorkerActor extends AbstractActor { public class WorkerActor extends AbstractActor {
private final ActorRef taskTrackerActorRef;
public static Props props(ActorRef taskTrackerActorRef) {
return Props.create(WorkerActor.class, () -> new WorkerActor(taskTrackerActorRef));
}
@Override @Override
public Receive createReceive() { public Receive createReceive() {
return receiveBuilder() return receiveBuilder()

View File

@ -1,11 +1,11 @@
package com.github.kfcfans.powerjob.worker.background; package com.github.kfcfans.powerjob.worker.background;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import com.github.kfcfans.powerjob.common.LogLevel; import com.github.kfcfans.powerjob.common.LogLevel;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.InstanceLogContent; import com.github.kfcfans.powerjob.common.model.InstanceLogContent;
import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq; import com.github.kfcfans.powerjob.common.request.WorkerLogReportReq;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Queues; import com.google.common.collect.Queues;
@ -27,23 +27,28 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
public class OmsLogHandler { public class OmsLogHandler {
private OmsLogHandler() { private final String workerAddress;
} private final ActorSystem actorSystem;
private final ServerDiscoveryService serverDiscoveryService;
// 单例
public static final OmsLogHandler INSTANCE = new OmsLogHandler();
// 生产者消费者模式异步上传日志
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
// 处理线程需要通过线程池启动 // 处理线程需要通过线程池启动
public final Runnable logSubmitter = new LogSubmitter(); public final Runnable logSubmitter = new LogSubmitter();
// 上报锁只需要一个线程上报即可 // 上报锁只需要一个线程上报即可
private final Lock reportLock = new ReentrantLock(); private final Lock reportLock = new ReentrantLock();
// 生产者消费者模式异步上传日志
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
// 每次上报携带的数据条数 // 每次上报携带的数据条数
private static final int BATCH_SIZE = 20; private static final int BATCH_SIZE = 20;
// 本地囤积阈值 // 本地囤积阈值
private static final int REPORT_SIZE = 1024; private static final int REPORT_SIZE = 1024;
public OmsLogHandler(String workerAddress, ActorSystem actorSystem, ServerDiscoveryService serverDiscoveryService) {
this.workerAddress = workerAddress;
this.actorSystem = actorSystem;
this.serverDiscoveryService = serverDiscoveryService;
}
/** /**
* 提交日志 * 提交日志
* @param instanceId 任务实例ID * @param instanceId 任务实例ID
@ -74,7 +79,7 @@ public class OmsLogHandler {
try { try {
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(serverDiscoveryService.getCurrentServerAddress());
// 当前无可用 Server // 当前无可用 Server
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {
if (!logQueue.isEmpty()) { if (!logQueue.isEmpty()) {
@ -84,7 +89,7 @@ public class OmsLogHandler {
return; return;
} }
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = actorSystem.actorSelection(serverPath);
List<InstanceLogContent> logs = Lists.newLinkedList(); List<InstanceLogContent> logs = Lists.newLinkedList();
while (!logQueue.isEmpty()) { while (!logQueue.isEmpty()) {
@ -93,7 +98,7 @@ public class OmsLogHandler {
logs.add(logContent); logs.add(logContent);
if (logs.size() >= BATCH_SIZE) { if (logs.size() >= BATCH_SIZE) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), Lists.newLinkedList(logs)); WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, Lists.newLinkedList(logs));
// 不可靠请求WEB日志不追求极致 // 不可靠请求WEB日志不追求极致
serverActor.tell(req, null); serverActor.tell(req, null);
logs.clear(); logs.clear();
@ -105,7 +110,7 @@ public class OmsLogHandler {
} }
if (!logs.isEmpty()) { if (!logs.isEmpty()) {
WorkerLogReportReq req = new WorkerLogReportReq(OhMyWorker.getWorkerAddress(), logs); WorkerLogReportReq req = new WorkerLogReportReq(workerAddress, logs);
serverActor.tell(req, null); serverActor.tell(req, null);
} }

View File

@ -1,19 +1,22 @@
package com.github.kfcfans.powerjob.worker.background; package com.github.kfcfans.powerjob.worker.background;
import com.github.kfcfans.powerjob.common.PowerJobException;
import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.common.response.ResultDTO;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/** /**
* 服务发现 * 服务发现
@ -24,8 +27,13 @@ import java.util.Map;
@Slf4j @Slf4j
public class ServerDiscoveryService { public class ServerDiscoveryService {
// 配置的可发起HTTP请求的ServerIP:Port private final Long appId;
private static final Map<String, String> IP2ADDRESS = Maps.newHashMap(); private final OhMyConfig config;
private String currentServerAddress;
private final Map<String, String> ip2Address = Maps.newHashMap();
// 服务发现地址 // 服务发现地址
private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s&protocol=AKKA"; private static final String DISCOVERY_URL = "http://%s/server/acquire?appId=%d&currentServer=%s&protocol=AKKA";
// 失败次数 // 失败次数
@ -33,27 +41,44 @@ public class ServerDiscoveryService {
// 最大失败次数 // 最大失败次数
private static final int MAX_FAILED_COUNT = 3; private static final int MAX_FAILED_COUNT = 3;
public ServerDiscoveryService(Long appId, OhMyConfig config) {
this.appId = appId;
this.config = config;
}
public static String discovery() { public void start(ScheduledExecutorService timingPool) {
this.currentServerAddress = discovery();
if (org.springframework.util.StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
throw new PowerJobException("can't find any available server, this worker has been quarantined.");
}
timingPool.scheduleAtFixedRate(() -> this.currentServerAddress = discovery(), 10, 10, TimeUnit.SECONDS);
}
if (IP2ADDRESS.isEmpty()) { public String getCurrentServerAddress() {
OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":")[0], x)); return currentServerAddress;
}
private String discovery() {
if (ip2Address.isEmpty()) {
config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
} }
String result = null; String result = null;
// 先对当前机器发起请求 // 先对当前机器发起请求
String currentServer = OhMyWorker.getCurrentServer(); String currentServer = currentServerAddress;
if (!StringUtils.isEmpty(currentServer)) { if (!StringUtils.isEmpty(currentServer)) {
String ip = currentServer.split(":")[0]; String ip = currentServer.split(":")[0];
// 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担 // 直接请求当前Server的HTTP服务可以少一次网络开销减轻Server负担
String firstServerAddress = IP2ADDRESS.get(ip); String firstServerAddress = ip2Address.get(ip);
if (firstServerAddress != null) { if (firstServerAddress != null) {
result = acquire(firstServerAddress); result = acquire(firstServerAddress);
} }
} }
for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) { for (String httpServerAddress : config.getServerAddress()) {
if (StringUtils.isEmpty(result)) { if (StringUtils.isEmpty(result)) {
result = acquire(httpServerAddress); result = acquire(httpServerAddress);
}else { }else {
@ -62,36 +87,36 @@ public class ServerDiscoveryService {
} }
if (StringUtils.isEmpty(result)) { if (StringUtils.isEmpty(result)) {
log.warn("[OmsServerDiscovery] can't find any available server, this worker has been quarantined."); log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
// Server 高可用的前提下连续失败多次说明该节点与外界失联Server已经将秒级任务转移到其他Worker需要杀死本地的任务 // Server 高可用的前提下连续失败多次说明该节点与外界失联Server已经将秒级任务转移到其他Worker需要杀死本地的任务
if (FAILED_COUNT++ > MAX_FAILED_COUNT) { if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
log.warn("[OmsServerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker."); log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys(); List<Long> frequentInstanceIds = TaskTrackerPool.getAllFrequentTaskTrackerKeys();
if (!CollectionUtils.isEmpty(frequentInstanceIds)) { if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
frequentInstanceIds.forEach(instanceId -> { frequentInstanceIds.forEach(instanceId -> {
TaskTracker taskTracker = TaskTrackerPool.remove(instanceId); TaskTracker taskTracker = TaskTrackerPool.remove(instanceId);
taskTracker.destroy(); taskTracker.destroy();
log.warn("[OmsServerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId); log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
}); });
} }
FAILED_COUNT = 0; FAILED_COUNT = 0;
} }
return null; return null;
}else { } else {
// 重置失败次数 // 重置失败次数
FAILED_COUNT = 0; FAILED_COUNT = 0;
log.debug("[OmsServerDiscovery] current server is {}.", result); log.debug("[PowerDiscovery] current server is {}.", result);
return result; return result;
} }
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
private static String acquire(String httpServerAddress) { private String acquire(String httpServerAddress) {
String result = null; String result = null;
String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer()); String url = String.format(DISCOVERY_URL, httpServerAddress, appId, currentServerAddress);
try { try {
result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url)); result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
}catch (Exception ignore) { }catch (Exception ignore) {

View File

@ -5,8 +5,8 @@ import com.github.kfcfans.powerjob.common.Protocol;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.model.SystemMetrics;
import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat;
import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.PowerJobWorkerVersion; import com.github.kfcfans.powerjob.worker.common.PowerJobWorkerVersion;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils;
import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory;
@ -24,29 +24,31 @@ import org.springframework.util.StringUtils;
@AllArgsConstructor @AllArgsConstructor
public class WorkerHealthReporter implements Runnable { public class WorkerHealthReporter implements Runnable {
private final RuntimeMeta runtimeMeta;
@Override @Override
public void run() { public void run() {
// 没有可用Server无法上报 // 没有可用Server无法上报
String currentServer = OhMyWorker.getCurrentServer(); String currentServer = runtimeMeta.getServerDiscoveryService().getCurrentServerAddress();
if (StringUtils.isEmpty(currentServer)) { if (StringUtils.isEmpty(currentServer)) {
return; return;
} }
SystemMetrics systemMetrics; SystemMetrics systemMetrics;
if (OhMyWorker.getConfig().getSystemMetricsCollector() == null) { if (runtimeMeta.getOhMyConfig().getSystemMetricsCollector() == null) {
systemMetrics = SystemInfoUtils.getSystemMetrics(); systemMetrics = SystemInfoUtils.getSystemMetrics();
} else { } else {
systemMetrics = OhMyWorker.getConfig().getSystemMetricsCollector().collect(); systemMetrics = runtimeMeta.getOhMyConfig().getSystemMetricsCollector().collect();
} }
WorkerHeartbeat heartbeat = new WorkerHeartbeat(); WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics); heartbeat.setSystemMetrics(systemMetrics);
heartbeat.setWorkerAddress(OhMyWorker.getWorkerAddress()); heartbeat.setWorkerAddress(runtimeMeta.getWorkerAddress());
heartbeat.setAppName(OhMyWorker.getConfig().getAppName()); heartbeat.setAppName(runtimeMeta.getOhMyConfig().getAppName());
heartbeat.setAppId(OhMyWorker.getAppId()); heartbeat.setAppId(runtimeMeta.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis()); heartbeat.setHeartbeatTime(System.currentTimeMillis());
heartbeat.setVersion(PowerJobWorkerVersion.getVersion()); heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
heartbeat.setProtocol(Protocol.AKKA.name()); heartbeat.setProtocol(Protocol.AKKA.name());
@ -56,11 +58,11 @@ public class WorkerHealthReporter implements Runnable {
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos()); heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
// 发送请求 // 发送请求
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(currentServer);
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {
return; return;
} }
ActorSelection actorSelection = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection actorSelection = runtimeMeta.getActorSystem().actorSelection(serverPath);
actorSelection.tell(heartbeat, null); actorSelection.tell(heartbeat, null);
} }
} }

View File

@ -0,0 +1,28 @@
package com.github.kfcfans.powerjob.worker.common;
import akka.actor.ActorSystem;
import com.github.kfcfans.powerjob.worker.background.OmsLogHandler;
import com.github.kfcfans.powerjob.worker.background.ServerDiscoveryService;
import com.github.kfcfans.powerjob.worker.persistence.TaskPersistenceService;
import lombok.Data;
/**
* store worker's runtime meta info
*
* @author tjq
* @since 2021/3/7
*/
@Data
public class RuntimeMeta {
private Long appId;
private OhMyConfig ohMyConfig;
private String workerAddress;
private ActorSystem actorSystem;
private OmsLogHandler omsLogHandler;
private ServerDiscoveryService serverDiscoveryService;
private TaskPersistenceService taskPersistenceService;
}

View File

@ -15,6 +15,8 @@ public class ThreadLocalStore {
private static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>(); private static final ThreadLocal<TaskDO> TASK_THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<RuntimeMeta> RUNTIME_META_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>(); private static final ThreadLocal<AtomicLong> TASK_ID_THREAD_LOCAL = new ThreadLocal<>();
@ -26,6 +28,14 @@ public class ThreadLocalStore {
TASK_THREAD_LOCAL.set(task); TASK_THREAD_LOCAL.set(task);
} }
public static RuntimeMeta getRuntimeMeta() {
return RUNTIME_META_LOCAL.get();
}
public static void setRuntimeMeta(RuntimeMeta runtimeMeta) {
RUNTIME_META_LOCAL.set(runtimeMeta);
}
public static AtomicLong getTaskIDAddr() { public static AtomicLong getTaskIDAddr() {
if (TASK_ID_THREAD_LOCAL.get() == null) { if (TASK_ID_THREAD_LOCAL.get() == null) {
TASK_ID_THREAD_LOCAL.set(new AtomicLong(0)); TASK_ID_THREAD_LOCAL.set(new AtomicLong(0));
@ -35,6 +45,7 @@ public class ThreadLocalStore {
public static void clear() { public static void clear() {
TASK_ID_THREAD_LOCAL.remove(); TASK_ID_THREAD_LOCAL.remove();
RUNTIME_META_LOCAL.remove();
TASK_THREAD_LOCAL.remove(); TASK_THREAD_LOCAL.remove();
} }

View File

@ -31,11 +31,11 @@ public class AkkaUtils {
return String.format(AKKA_NODE_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, actorName); return String.format(AKKA_NODE_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, actorName);
} }
public static String getAkkaServerPath(String actorName) { public static String getServerActorPath(String serverAddress) {
if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { if (StringUtils.isEmpty(serverAddress)) {
return null; return null;
} }
return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, OhMyWorker.getCurrentServer(), actorName); return String.format(AKKA_NODE_PATH, RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, serverAddress, RemoteConstant.SERVER_ACTOR_NAME);
} }
/** /**

View File

@ -2,6 +2,8 @@ package com.github.kfcfans.powerjob.worker.common.utils;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import lombok.extern.slf4j.Slf4j;
/** /**
* 文件工具类 * 文件工具类
@ -9,20 +11,26 @@ import com.github.kfcfans.powerjob.worker.OhMyWorker;
* @author tjq * @author tjq
* @since 2020/5/16 * @since 2020/5/16
*/ */
@Slf4j
public class OmsWorkerFileUtils { public class OmsWorkerFileUtils {
private static final String USER_HOME = System.getProperty("user.home", "powerjob"); private static String basePath;
private static final String WORKER_DIR = USER_HOME + "/powerjob/" + OhMyWorker.getConfig().getAppName() + "/";
public static void init(OhMyConfig config) {
String userHome = System.getProperty("user.home", "powerjob");
basePath = userHome + "/powerjob/" + config.getAppName() + "/";
log.info("[PowerFile] use base file path: {}", basePath);
}
public static String getScriptDir() { public static String getScriptDir() {
return WORKER_DIR + "script/"; return basePath + "script/";
} }
public static String getContainerDir() { public static String getContainerDir() {
return WORKER_DIR + "container/"; return basePath + "container/";
} }
public static String getH2WorkDir() { public static String getH2WorkDir() {
return WORKER_DIR + "h2/" + CommonUtils.genUUID() + "/"; return basePath + "h2/" + CommonUtils.genUUID() + "/";
} }
} }

View File

@ -18,16 +18,15 @@ public class WorkflowContextUtils {
} }
public static boolean isExceededLengthLimit(Map<String, String> appendedWfContext) { public static boolean isExceededLengthLimit(Map<String, String> appendedWfContext, int maxLength) {
String jsonString = JsonUtils.toJSONString(appendedWfContext); String jsonString = JsonUtils.toJSONString(appendedWfContext);
if (jsonString == null) { if (jsonString == null) {
// impossible // impossible
return true; return true;
} }
int maxAppendedWfContextLength = OhMyWorker.getConfig().getMaxAppendedWfContextLength();
return maxAppendedWfContextLength < jsonString.length(); return maxLength < jsonString.length();
} }

View File

@ -39,17 +39,17 @@ public class OmsContainerFactory {
/** /**
* 获取容器 * 获取容器
* @param containerId 容器ID * @param containerId 容器ID
* @param loadFromServer 当本地不存在时尝试从 server 加载 * @param serverActor 当容器不存在且 serverActor 非空时尝试从服务端重新拉取容器
* @return 容器示例可能为 null * @return 容器示例可能为 null
*/ */
public static OmsContainer fetchContainer(Long containerId, boolean loadFromServer) { public static OmsContainer fetchContainer(Long containerId, ActorSelection serverActor) {
OmsContainer omsContainer = CARGO.get(containerId); OmsContainer omsContainer = CARGO.get(containerId);
if (omsContainer != null) { if (omsContainer != null) {
return omsContainer; return omsContainer;
} }
if (!loadFromServer) { if (serverActor == null) {
return null; return null;
} }
@ -57,11 +57,6 @@ public class OmsContainerFactory {
log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId); log.info("[OmsContainer-{}] can't find the container in factory, try to deploy from server.", containerId);
WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId); WorkerNeedDeployContainerRequest request = new WorkerNeedDeployContainerRequest(containerId);
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME);
if (StringUtils.isEmpty(serverPath)) {
return null;
}
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath);
try { try {
CompletionStage<Object> askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)); CompletionStage<Object> askCS = Patterns.ask(serverActor, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));

View File

@ -190,7 +190,7 @@ public class OmsJarContainer implements OmsContainer {
// 需要满足的条件引用计数器减为0 & 有更新的容器出现 // 需要满足的条件引用计数器减为0 & 有更新的容器出现
if (referenceCount.decrementAndGet() <= 0) { if (referenceCount.decrementAndGet() <= 0) {
OmsContainer container = OmsContainerFactory.fetchContainer(containerId, false); OmsContainer container = OmsContainerFactory.fetchContainer(containerId, null);
if (container != this) { if (container != this) {
try { try {
destroy(); destroy();

View File

@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.executor;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
@ -57,6 +58,7 @@ public class ProcessorRunnable implements Runnable {
* 重试队列ProcessorTracker 将会定期重新上报处理结果 * 重试队列ProcessorTracker 将会定期重新上报处理结果
*/ */
private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue; private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
private final RuntimeMeta runtimeMeta;
public void innerRun() throws InterruptedException { public void innerRun() throws InterruptedException {
@ -65,6 +67,8 @@ public class ProcessorRunnable implements Runnable {
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName()); log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
ThreadLocalStore.setTask(task); ThreadLocalStore.setTask(task);
ThreadLocalStore.setRuntimeMeta(runtimeMeta);
// 0. 构造任务上下文 // 0. 构造任务上下文
WorkflowContext workflowContext = constructWorkflowContext(); WorkflowContext workflowContext = constructWorkflowContext();
TaskContext taskContext = constructTaskContext(); TaskContext taskContext = constructTaskContext();
@ -113,7 +117,7 @@ public class ProcessorRunnable implements Runnable {
if (task.getTaskContent() != null && task.getTaskContent().length > 0) { if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent())); taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
} }
taskContext.setUserContext(OhMyWorker.getConfig().getUserContext()); taskContext.setUserContext(runtimeMeta.getOhMyConfig().getUserContext());
return taskContext; return taskContext;
} }
@ -131,7 +135,7 @@ public class ProcessorRunnable implements Runnable {
Stopwatch stopwatch = Stopwatch.createStarted(); Stopwatch stopwatch = Stopwatch.createStarted();
log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId); log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
List<TaskResult> taskResults = TaskPersistenceService.INSTANCE.getAllTaskResult(instanceId, task.getSubInstanceId()); List<TaskResult> taskResults = runtimeMeta.getTaskPersistenceService().getAllTaskResult(instanceId, task.getSubInstanceId());
try { try {
switch (executeType) { switch (executeType) {
case BROADCAST: case BROADCAST:
@ -209,8 +213,8 @@ public class ProcessorRunnable implements Runnable {
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd); req.setCmd(cmd);
// 检查追加的上下文大小是否超出限制 // 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength())) {
log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data // ignore appended workflow context data
appendedWfContext = Collections.emptyMap(); appendedWfContext = Collections.emptyMap();
} }
@ -254,7 +258,7 @@ public class ProcessorRunnable implements Runnable {
if (StringUtils.isEmpty(result)) { if (StringUtils.isEmpty(result)) {
return ""; return "";
} }
final int maxLength = OhMyWorker.getConfig().getMaxResultLength(); final int maxLength = runtimeMeta.getOhMyConfig().getMaxResultLength();
if (result.length() <= maxLength) { if (result.length() <= maxLength) {
return result; return result;
} }

View File

@ -2,6 +2,7 @@ package com.github.kfcfans.powerjob.worker.core.processor.sdk;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore; import com.github.kfcfans.powerjob.worker.common.ThreadLocalStore;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -47,13 +48,14 @@ public abstract class MapProcessor implements BasicProcessor {
} }
TaskDO task = ThreadLocalStore.getTask(); TaskDO task = ThreadLocalStore.getTask();
RuntimeMeta runtimeMeta = ThreadLocalStore.getRuntimeMeta();
// 1. 构造请求 // 1. 构造请求
ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);
// 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常 // 2. 可靠发送请求任务不允许丢失需要使用 ask 方法失败抛异常
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME);
boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req); boolean requestSucceed = AkkaUtils.reliableTransmit(runtimeMeta.getActorSystem().actorSelection(akkaRemotePath), req);
if (requestSucceed) { if (requestSucceed) {
return new ProcessResult(true, "MAP_SUCCESS"); return new ProcessResult(true, "MAP_SUCCESS");

View File

@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.worker.core.tracker.processor;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import com.github.kfcfans.powerjob.common.*; import com.github.kfcfans.powerjob.common.*;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils; import com.github.kfcfans.powerjob.worker.common.utils.SpringUtils;
@ -42,6 +42,7 @@ public class ProcessorTracker {
* 记录创建时间 * 记录创建时间
*/ */
private long startTime; private long startTime;
private RuntimeMeta runtimeMeta;
/** /**
* 任务实例信息 * 任务实例信息
*/ */
@ -99,17 +100,18 @@ public class ProcessorTracker {
* 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T * 创建 ProcessorTracker其实就是创建了个执行用的线程池 T_T
*/ */
@SuppressWarnings("squid:S1181") @SuppressWarnings("squid:S1181")
public ProcessorTracker(TaskTrackerStartTaskReq request) { public ProcessorTracker(TaskTrackerStartTaskReq request, RuntimeMeta runtimeMeta) {
try { try {
// 赋值 // 赋值
this.startTime = System.currentTimeMillis(); this.startTime = System.currentTimeMillis();
this.runtimeMeta = runtimeMeta;
this.instanceInfo = request.getInstanceInfo(); this.instanceInfo = request.getInstanceInfo();
this.instanceId = request.getInstanceInfo().getInstanceId(); this.instanceId = request.getInstanceInfo().getInstanceId();
this.taskTrackerAddress = request.getTaskTrackerAddress(); this.taskTrackerAddress = request.getTaskTrackerAddress();
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.taskTrackerActorRef = runtimeMeta.getActorSystem().actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId); this.omsLogger = new OmsServerLogger(instanceId, runtimeMeta.getOmsLogHandler());
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue(); this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L; this.lastIdleTime = -1L;
this.lastCompletedTaskCount = 0L; this.lastCompletedTaskCount = 0L;
@ -162,7 +164,7 @@ public class ProcessorTracker {
newTask.setAddress(taskTrackerAddress); newTask.setAddress(taskTrackerAddress);
ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader(); ClassLoader classLoader = omsContainer == null ? getClass().getClassLoader() : omsContainer.getContainerClassLoader();
ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue); ProcessorRunnable processorRunnable = new ProcessorRunnable(instanceInfo, taskTrackerActorRef, newTask, processor, omsLogger, classLoader, statusReportRetryQueue, runtimeMeta);
try { try {
threadPool.submit(processorRunnable); threadPool.submit(processorRunnable);
success = true; success = true;
@ -284,7 +286,9 @@ public class ProcessorTracker {
log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime); log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", instanceId, idleTime);
// 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受 // 不可靠通知如果该请求失败则整个任务处理集群缺失一个 ProcessorTracker影响可接受
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildIdleReport(instanceId), null); ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildIdleReport(instanceId);
statusReportReq.setAddress(runtimeMeta.getWorkerAddress());
taskTrackerActorRef.tell(statusReportReq, null);
destroy(); destroy();
return; return;
} }
@ -306,7 +310,9 @@ public class ProcessorTracker {
// 上报当前 ProcessorTracker 负载 // 上报当前 ProcessorTracker 负载
long waitingNum = threadPool.getQueue().size(); long waitingNum = threadPool.getQueue().size();
taskTrackerActorRef.tell(ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum), null); ProcessorTrackerStatusReportReq statusReportReq = ProcessorTrackerStatusReportReq.buildLoadReport(instanceId, waitingNum);
statusReportReq.setAddress(runtimeMeta.getWorkerAddress());
taskTrackerActorRef.tell(statusReportReq, null);
log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum); log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", instanceId, waitingNum);
} }
@ -339,7 +345,9 @@ public class ProcessorTracker {
String[] split = processorInfo.split("#"); String[] split = processorInfo.split("#");
log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]); log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", instanceId, split[1], split[0]);
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), true); String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection actorSelection = runtimeMeta.getActorSystem().actorSelection(serverPath);
omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), actorSelection);
if (omsContainer != null) { if (omsContainer != null) {
processor = omsContainer.getProcessor(split[1]); processor = omsContainer.getProcessor(split[1]);
} else { } else {

View File

@ -8,6 +8,7 @@ import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.common.response.AskResponse; import com.github.kfcfans.powerjob.common.response.AskResponse;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -49,8 +50,8 @@ public class CommonTaskTracker extends TaskTracker {
private static final int MAX_REPORT_FAILED_THRESHOLD = 5; private static final int MAX_REPORT_FAILED_THRESHOLD = 5;
protected CommonTaskTracker(ServerScheduleJobReq req) { protected CommonTaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) {
super(req); super(req, runtimeMeta);
} }
@Override @Override
@ -85,7 +86,7 @@ public class CommonTaskTracker extends TaskTracker {
// 填充基础信息 // 填充基础信息
detail.setActualTriggerTime(createTime); detail.setActualTriggerTime(createTime);
detail.setStatus(InstanceStatus.RUNNING.getV()); detail.setStatus(InstanceStatus.RUNNING.getV());
detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); detail.setTaskTrackerAddress(runtimeMeta.getWorkerAddress());
// 填充详细信息 // 填充详细信息
InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId); InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
@ -116,7 +117,7 @@ public class CommonTaskTracker extends TaskTracker {
rootTask.setInstanceId(instanceInfo.getInstanceId()); rootTask.setInstanceId(instanceInfo.getInstanceId());
rootTask.setTaskId(ROOT_TASK_ID); rootTask.setTaskId(ROOT_TASK_ID);
rootTask.setFailedCnt(0); rootTask.setFailedCnt(0);
rootTask.setAddress(OhMyWorker.getWorkerAddress()); rootTask.setAddress(runtimeMeta.getWorkerAddress());
rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME); rootTask.setTaskName(TaskConstant.ROOT_TASK_NAME);
rootTask.setCreatedTime(System.currentTimeMillis()); rootTask.setCreatedTime(System.currentTimeMillis());
rootTask.setLastModifiedTime(System.currentTimeMillis()); rootTask.setLastModifiedTime(System.currentTimeMillis());
@ -158,7 +159,7 @@ public class CommonTaskTracker extends TaskTracker {
req.setFailedTaskNum(holder.failedNum); req.setFailedTaskNum(holder.failedNum);
req.setReportTime(System.currentTimeMillis()); req.setReportTime(System.currentTimeMillis());
req.setStartTime(createTime); req.setStartTime(createTime);
req.setSourceAddress(OhMyWorker.getWorkerAddress()); req.setSourceAddress(runtimeMeta.getWorkerAddress());
boolean success = false; boolean success = false;
String result = null; String result = null;
@ -217,7 +218,7 @@ public class CommonTaskTracker extends TaskTracker {
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(LAST_TASK_ID); newLastTask.setTaskId(LAST_TASK_ID);
newLastTask.setSubInstanceId(instanceId); newLastTask.setSubInstanceId(instanceId);
newLastTask.setAddress(OhMyWorker.getWorkerAddress()); newLastTask.setAddress(runtimeMeta.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask)); submitTask(Lists.newArrayList(newLastTask));
} }
} }
@ -231,8 +232,8 @@ public class CommonTaskTracker extends TaskTracker {
result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT; result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
} }
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath);
// 4. 执行完毕报告服务器 // 4. 执行完毕报告服务器
if (finished.get()) { if (finished.get()) {

View File

@ -6,6 +6,7 @@ import com.github.kfcfans.powerjob.common.model.InstanceDetail;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq; import com.github.kfcfans.powerjob.common.request.TaskTrackerReportInstanceStatusReq;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -62,8 +63,8 @@ public class FrequentTaskTracker extends TaskTracker {
private static final String LAST_TASK_ID_PREFIX = "L"; private static final String LAST_TASK_ID_PREFIX = "L";
private static final int MIN_INTERVAL = 50; private static final int MIN_INTERVAL = 50;
protected FrequentTaskTracker(ServerScheduleJobReq req) { protected FrequentTaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) {
super(req); super(req, runtimeMeta);
} }
@Override @Override
@ -112,7 +113,7 @@ public class FrequentTaskTracker extends TaskTracker {
// 填充基础信息 // 填充基础信息
detail.setActualTriggerTime(createTime); detail.setActualTriggerTime(createTime);
detail.setStatus(InstanceStatus.RUNNING.getV()); detail.setStatus(InstanceStatus.RUNNING.getV());
detail.setTaskTrackerAddress(OhMyWorker.getWorkerAddress()); detail.setTaskTrackerAddress(runtimeMeta.getWorkerAddress());
List<InstanceDetail.SubInstanceDetail> history = Lists.newLinkedList(); List<InstanceDetail.SubInstanceDetail> history = Lists.newLinkedList();
recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> { recentSubInstanceInfo.forEach((subId, subInstanceInfo) -> {
@ -152,7 +153,7 @@ public class FrequentTaskTracker extends TaskTracker {
subInstanceInfo.startTime = System.currentTimeMillis(); subInstanceInfo.startTime = System.currentTimeMillis();
recentSubInstanceInfo.put(subInstanceId, subInstanceInfo); recentSubInstanceInfo.put(subInstanceId, subInstanceInfo);
String myAddress = OhMyWorker.getWorkerAddress(); String myAddress = runtimeMeta.getWorkerAddress();
String taskId = String.valueOf(subInstanceId); String taskId = String.valueOf(subInstanceId);
TaskDO newRootTask = new TaskDO(); TaskDO newRootTask = new TaskDO();
@ -301,7 +302,7 @@ public class FrequentTaskTracker extends TaskTracker {
newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME); newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId); newLastTask.setTaskId(LAST_TASK_ID_PREFIX + subInstanceId);
newLastTask.setSubInstanceId(subInstanceId); newLastTask.setSubInstanceId(subInstanceId);
newLastTask.setAddress(OhMyWorker.getWorkerAddress()); newLastTask.setAddress(runtimeMeta.getWorkerAddress());
submitTask(Lists.newArrayList(newLastTask)); submitTask(Lists.newArrayList(newLastTask));
} }
} }
@ -313,7 +314,8 @@ public class FrequentTaskTracker extends TaskTracker {
private void reportStatus() { private void reportStatus() {
if (StringUtils.isEmpty(OhMyWorker.getCurrentServer())) { String currentServerAddress = runtimeMeta.getServerDiscoveryService().getCurrentServerAddress();
if (StringUtils.isEmpty(currentServerAddress)) {
return; return;
} }
@ -327,14 +329,14 @@ public class FrequentTaskTracker extends TaskTracker {
req.setTotalTaskNum(triggerTimes.get()); req.setTotalTaskNum(triggerTimes.get());
req.setSucceedTaskNum(succeedTimes.get()); req.setSucceedTaskNum(succeedTimes.get());
req.setFailedTaskNum(failedTimes.get()); req.setFailedTaskNum(failedTimes.get());
req.setSourceAddress(OhMyWorker.getWorkerAddress()); req.setSourceAddress(runtimeMeta.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(currentServerAddress);
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {
return; return;
} }
// 非可靠通知Server挂掉后任务的kill工作交由其他线程去做 // 非可靠通知Server挂掉后任务的kill工作交由其他线程去做
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath);
serverActor.tell(req, null); serverActor.tell(req, null);
} }

View File

@ -15,6 +15,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils;
import com.github.kfcfans.powerjob.common.utils.SegmentLock; import com.github.kfcfans.powerjob.common.utils.SegmentLock;
import com.github.kfcfans.powerjob.worker.OhMyWorker; import com.github.kfcfans.powerjob.worker.OhMyWorker;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils;
@ -59,6 +60,10 @@ public abstract class TaskTracker {
* TaskTracker创建时间 * TaskTracker创建时间
*/ */
protected final long createTime; protected final long createTime;
/**
* worker 运行时元数据
*/
protected final RuntimeMeta runtimeMeta;
/** /**
* 任务实例ID使用频率过高 InstanceInfo 提取出来单独保存一份 * 任务实例ID使用频率过高 InstanceInfo 提取出来单独保存一份
*/ */
@ -100,10 +105,11 @@ public abstract class TaskTracker {
private final SegmentLock segmentLock; private final SegmentLock segmentLock;
private static final int UPDATE_CONCURRENCY = 4; private static final int UPDATE_CONCURRENCY = 4;
protected TaskTracker(ServerScheduleJobReq req) { protected TaskTracker(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) {
// 初始化成员变量 // 初始化成员变量
this.createTime = System.currentTimeMillis(); this.createTime = System.currentTimeMillis();
this.runtimeMeta = runtimeMeta;
this.instanceId = req.getInstanceId(); this.instanceId = req.getInstanceId();
this.instanceInfo = new InstanceInfo(); this.instanceInfo = new InstanceInfo();
BeanUtils.copyProperties(req, instanceInfo); BeanUtils.copyProperties(req, instanceInfo);
@ -118,7 +124,7 @@ public abstract class TaskTracker {
instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency())); instanceInfo.setThreadConcurrency(Math.max(1, instanceInfo.getThreadConcurrency()));
this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress()); this.ptStatusHolder = new ProcessorTrackerStatusHolder(req.getAllWorkerAddress());
this.taskPersistenceService = TaskPersistenceService.INSTANCE; this.taskPersistenceService = runtimeMeta.getTaskPersistenceService();
this.finished = new AtomicBoolean(false); this.finished = new AtomicBoolean(false);
// 只有工作流中的任务允许向工作流中追加上下文数据 // 只有工作流中的任务允许向工作流中追加上下文数据
this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap(); this.appendedWfContext = req.getWfInstanceId() == null ? Collections.emptyMap() : Maps.newConcurrentMap();
@ -140,15 +146,15 @@ public abstract class TaskTracker {
* @param req 服务端调度任务请求 * @param req 服务端调度任务请求
* @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker * @return API/CRON -> CommonTaskTracker, FIX_RATE/FIX_DELAY -> FrequentTaskTracker
*/ */
public static TaskTracker create(ServerScheduleJobReq req) { public static TaskTracker create(ServerScheduleJobReq req, RuntimeMeta runtimeMeta) {
try { try {
TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType()); TimeExpressionType timeExpressionType = TimeExpressionType.valueOf(req.getTimeExpressionType());
switch (timeExpressionType) { switch (timeExpressionType) {
case FIXED_RATE: case FIXED_RATE:
case FIXED_DELAY: case FIXED_DELAY:
return new FrequentTaskTracker(req); return new FrequentTaskTracker(req, runtimeMeta);
default: default:
return new CommonTaskTracker(req); return new CommonTaskTracker(req, runtimeMeta);
} }
} catch (Exception e) { } catch (Exception e) {
log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e); log.warn("[TaskTracker-{}] create TaskTracker from request({}) failed.", req.getInstanceId(), req, e);
@ -160,10 +166,10 @@ public abstract class TaskTracker {
response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString())); response.setResult(String.format("init TaskTracker failed, reason: %s", e.toString()));
response.setReportTime(System.currentTimeMillis()); response.setReportTime(System.currentTimeMillis());
response.setStartTime(System.currentTimeMillis()); response.setStartTime(System.currentTimeMillis());
response.setSourceAddress(OhMyWorker.getWorkerAddress()); response.setSourceAddress(runtimeMeta.getWorkerAddress());
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress());
ActorSelection serverActor = OhMyWorker.actorSystem.actorSelection(serverPath); ActorSelection serverActor = runtimeMeta.getActorSystem().actorSelection(serverPath);
serverActor.tell(response, null); serverActor.tell(response, null);
} }
return null; return null;
@ -185,8 +191,8 @@ public abstract class TaskTracker {
return; return;
} }
// 检查追加的上下文大小是否超出限制 // 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext)) { if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength())) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(),OhMyWorker.getConfig().getMaxAppendedWfContextLength()); log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), runtimeMeta.getOhMyConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data // ignore appended workflow context data
return; return;
} }
@ -345,7 +351,7 @@ public abstract class TaskTracker {
String idlePtAddress = heartbeatReq.getAddress(); String idlePtAddress = heartbeatReq.getAddress();
// ProcessorTracker 已销毁重置为初始状态 // ProcessorTracker 已销毁重置为初始状态
ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false); ptStatusHolder.getProcessorTrackerStatus(idlePtAddress).setDispatched(false);
List<TaskDO> unfinishedTask = TaskPersistenceService.INSTANCE.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress); List<TaskDO> unfinishedTask = taskPersistenceService.getAllUnFinishedTaskByAddress(instanceId, idlePtAddress);
if (!CollectionUtils.isEmpty(unfinishedTask)) { if (!CollectionUtils.isEmpty(unfinishedTask)) {
log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask); log.warn("[TaskTracker-{}] ProcessorTracker({}) is idle now but have unfinished tasks: {}", instanceId, idlePtAddress, unfinishedTask);
unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result")); unfinishedTask.forEach(task -> updateTaskStatus(task.getSubInstanceId(), task.getTaskId(), TaskStatus.WORKER_PROCESS_FAILED.getValue(), System.currentTimeMillis(), "SYSTEM: unreceived process result"));
@ -404,7 +410,7 @@ public abstract class TaskTracker {
stopRequest.setInstanceId(instanceId); stopRequest.setInstanceId(instanceId);
ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> { ptStatusHolder.getAllProcessorTrackers().forEach(ptIP -> {
String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); String ptPath = AkkaUtils.getAkkaWorkerPath(ptIP, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptPath); ActorSelection ptActor = runtimeMeta.getActorSystem().actorSelection(ptPath);
// 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭 // 不可靠通知ProcessorTracker 也可以靠自己的定时任务/问询等方式关闭
ptActor.tell(stopRequest, null); ptActor.tell(stopRequest, null);
}); });
@ -456,9 +462,9 @@ public abstract class TaskTracker {
taskId2LastReportTime.put(task.getTaskId(), -1L); taskId2LastReportTime.put(task.getTaskId(), -1L);
// 4. 任务派发 // 4. 任务派发
TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task); TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, runtimeMeta.getWorkerAddress());
String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); String ptActorPath = AkkaUtils.getAkkaWorkerPath(processorTrackerAddress, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME);
ActorSelection ptActor = OhMyWorker.actorSystem.actorSelection(ptActorPath); ActorSelection ptActor = runtimeMeta.getActorSystem().actorSelection(ptActorPath);
ptActor.tell(startTaskReq, null); ptActor.tell(startTaskReq, null);
log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName()); log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
@ -549,13 +555,13 @@ public abstract class TaskTracker {
protected class WorkerDetector implements Runnable { protected class WorkerDetector implements Runnable {
@Override @Override
public void run() { public void run() {
String serverPath = AkkaUtils.getAkkaServerPath(RemoteConstant.SERVER_ACTOR_NAME); String serverPath = AkkaUtils.getServerActorPath(runtimeMeta.getServerDiscoveryService().getCurrentServerAddress());
if (StringUtils.isEmpty(serverPath)) { if (StringUtils.isEmpty(serverPath)) {
log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId); log.warn("[TaskTracker-{}] no server available, won't start worker detective!", instanceId);
return; return;
} }
WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(OhMyWorker.getAppId(), instanceInfo.getJobId()); WorkerQueryExecutorClusterReq req = new WorkerQueryExecutorClusterReq(runtimeMeta.getAppId(), instanceInfo.getJobId());
AskResponse response = AkkaUtils.easyAsk(OhMyWorker.actorSystem.actorSelection(serverPath), req); AskResponse response = AkkaUtils.easyAsk(runtimeMeta.getActorSystem().actorSelection(serverPath), req);
if (!response.isSuccess()) { if (!response.isSuccess()) {
log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage()); log.warn("[TaskTracker-{}] detective failed due to ask failed, message is {}", instanceId, response.getMessage());
return; return;

View File

@ -19,6 +19,7 @@ import org.slf4j.helpers.MessageFormatter;
public class OmsServerLogger implements OmsLogger { public class OmsServerLogger implements OmsLogger {
private final long instanceId; private final long instanceId;
private final OmsLogHandler omsLogHandler;
@Override @Override
public void debug(String messagePattern, Object... args) { public void debug(String messagePattern, Object... args) {
@ -59,7 +60,7 @@ public class OmsServerLogger implements OmsLogger {
private void process(LogLevel level, String messagePattern, Object... args) { private void process(LogLevel level, String messagePattern, Object... args) {
String logContent = genLogContent(messagePattern, args); String logContent = genLogContent(messagePattern, args);
OmsLogHandler.INSTANCE.submitLog(instanceId, level, logContent); omsLogHandler.submitLog(instanceId, level, logContent);
} }
} }

View File

@ -23,46 +23,37 @@ import java.sql.SQLException;
@Slf4j @Slf4j
public class ConnectionFactory { public class ConnectionFactory {
private static volatile DataSource dataSource; private volatile DataSource dataSource;
private static final String H2_PATH = OmsWorkerFileUtils.getH2WorkDir(); private final String H2_PATH = OmsWorkerFileUtils.getH2WorkDir();
private static final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); private final String DISK_JDBC_URL = String.format("jdbc:h2:file:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
private static final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH); private final String MEMORY_JDBC_URL = String.format("jdbc:h2:mem:%spowerjob_worker_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false", H2_PATH);
public static Connection getConnection() throws SQLException { public Connection getConnection() throws SQLException {
return getDataSource().getConnection(); return dataSource.getConnection();
} }
private static DataSource getDataSource() { public synchronized void initDatasource(StoreStrategy strategy) {
if (dataSource != null) { // 兼容单元测试否则没办法单独测试 DAO 层了
return dataSource; strategy = strategy == null ? StoreStrategy.DISK : strategy;
HikariConfig config = new HikariConfig();
config.setDriverClassName(Driver.class.getName());
config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(2);
// 池中最大连接数量
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
log.info("[PowerDatasource] init h2 datasource successfully, use url: {}", config.getJdbcUrl());
// JVM 关闭时删除数据库文件
try {
FileUtils.forceDeleteOnExit(new File(H2_PATH));
}catch (Exception ignore) {
} }
synchronized (ConnectionFactory.class) {
if (dataSource == null) {
// 兼容单元测试否则没办法单独测试 DAO 层了
StoreStrategy strategy = OhMyWorker.getConfig() == null ? StoreStrategy.DISK : OhMyWorker.getConfig().getStoreStrategy();
HikariConfig config = new HikariConfig();
config.setDriverClassName(Driver.class.getName());
config.setJdbcUrl(strategy == StoreStrategy.DISK ? DISK_JDBC_URL : MEMORY_JDBC_URL);
config.setAutoCommit(true);
// 池中最小空闲连接数量
config.setMinimumIdle(2);
// 池中最大连接数量
config.setMaximumPoolSize(32);
dataSource = new HikariDataSource(config);
log.info("[OmsDatasource] init h2 datasource successfully, use url: {}", config.getJdbcUrl());
// JVM 关闭时删除数据库文件
try {
FileUtils.forceDeleteOnExit(new File(H2_PATH));
}catch (Exception ignore) {
}
}
}
return dataSource;
} }
} }

View File

@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import java.sql.*; import java.sql.*;
import java.util.Collection; import java.util.Collection;
@ -16,7 +17,10 @@ import java.util.Map;
* @author tjq * @author tjq
* @since 2020/3/17 * @since 2020/3/17
*/ */
@AllArgsConstructor
public class TaskDAOImpl implements TaskDAO { public class TaskDAOImpl implements TaskDAO {
private final ConnectionFactory connectionFactory;
@Override @Override
public void initTable() throws Exception { public void initTable() throws Exception {
@ -26,7 +30,7 @@ public class TaskDAOImpl implements TaskDAO {
// bigint(20) Java Long 取值范围完全一致 // bigint(20) Java Long 取值范围完全一致
String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint(20), sub_instance_id bigint(20), task_name varchar(255), task_content blob, address varchar(255), status int(5), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), last_report_time bigint(20), unique KEY pkey (instance_id, task_id))"; String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint(20), sub_instance_id bigint(20), task_name varchar(255), task_content blob, address varchar(255), status int(5), result text, failed_cnt int(11), created_time bigint(20), last_modified_time bigint(20), last_report_time bigint(20), unique KEY pkey (instance_id, task_id))";
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL); stat.execute(delTableSQL);
stat.execute(createTableSQL); stat.execute(createTableSQL);
} }
@ -35,7 +39,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override @Override
public boolean save(TaskDO task) throws SQLException { public boolean save(TaskDO task) throws SQLException {
String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)"; String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
fillInsertPreparedStatement(task, ps); fillInsertPreparedStatement(task, ps);
return ps.executeUpdate() == 1; return ps.executeUpdate() == 1;
} }
@ -44,7 +48,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override @Override
public boolean batchSave(Collection<TaskDO> tasks) throws SQLException { public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)"; String insertSQL = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(insertSQL)) {
for (TaskDO task : tasks) { for (TaskDO task : tasks) {
@ -63,7 +67,7 @@ public class TaskDAOImpl implements TaskDAO {
public boolean simpleDelete(SimpleTaskQuery condition) throws SQLException { public boolean simpleDelete(SimpleTaskQuery condition) throws SQLException {
String deleteSQL = "delete from task_info where %s"; String deleteSQL = "delete from task_info where %s";
String sql = String.format(deleteSQL, condition.getQueryCondition()); String sql = String.format(deleteSQL, condition.getQueryCondition());
try (Connection conn = ConnectionFactory.getConnection(); Statement stat = conn.createStatement()) { try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.executeUpdate(sql); stat.executeUpdate(sql);
return true; return true;
} }
@ -74,7 +78,7 @@ public class TaskDAOImpl implements TaskDAO {
ResultSet rs = null; ResultSet rs = null;
String sql = "select * from task_info where " + query.getQueryCondition(); String sql = "select * from task_info where " + query.getQueryCondition();
List<TaskDO> result = Lists.newLinkedList(); List<TaskDO> result = Lists.newLinkedList();
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
rs = ps.executeQuery(); rs = ps.executeQuery();
while (rs.next()) { while (rs.next()) {
result.add(convert(rs)); result.add(convert(rs));
@ -96,7 +100,7 @@ public class TaskDAOImpl implements TaskDAO {
String sqlFormat = "select %s from task_info where %s"; String sqlFormat = "select %s from task_info where %s";
String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition()); String sql = String.format(sqlFormat, query.getQueryContent(), query.getQueryCondition());
List<Map<String, Object>> result = Lists.newLinkedList(); List<Map<String, Object>> result = Lists.newLinkedList();
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
rs = ps.executeQuery(); rs = ps.executeQuery();
// 原数据包含了列名 // 原数据包含了列名
ResultSetMetaData metaData = rs.getMetaData(); ResultSetMetaData metaData = rs.getMetaData();
@ -125,7 +129,7 @@ public class TaskDAOImpl implements TaskDAO {
public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException { public boolean simpleUpdate(SimpleTaskQuery condition, TaskDO updateField) throws SQLException {
String sqlFormat = "update task_info set %s where %s"; String sqlFormat = "update task_info set %s where %s";
String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition()); String updateSQL = String.format(sqlFormat, updateField.getUpdateSQL(), condition.getQueryCondition());
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement stat = conn.prepareStatement(updateSQL)) {
stat.executeUpdate(); stat.executeUpdate();
return true; return true;
} }
@ -136,7 +140,7 @@ public class TaskDAOImpl implements TaskDAO {
ResultSet rs = null; ResultSet rs = null;
List<TaskResult> taskResults = Lists.newLinkedList(); List<TaskResult> taskResults = Lists.newLinkedList();
String sql = "select task_id, status, result from task_info where instance_id = ? and sub_instance_id = ?"; String sql = "select task_id, status, result from task_info where instance_id = ? and sub_instance_id = ?";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setLong(1, instanceId); ps.setLong(1, instanceId);
ps.setLong(2, subInstanceId); ps.setLong(2, subInstanceId);
rs = ps.executeQuery(); rs = ps.executeQuery();
@ -168,7 +172,7 @@ public class TaskDAOImpl implements TaskDAO {
@Override @Override
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException { public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException {
String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?"; String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?";
try (Connection conn = ConnectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, status); ps.setInt(1, status);
ps.setLong(2, lastReportTime); ps.setLong(2, lastReportTime);

View File

@ -4,11 +4,14 @@ package com.github.kfcfans.powerjob.worker.persistence;
import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.RemoteConstant;
import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.CommonUtils;
import com.github.kfcfans.powerjob.common.utils.SupplierPlus; import com.github.kfcfans.powerjob.common.utils.SupplierPlus;
import com.github.kfcfans.powerjob.worker.common.OhMyConfig;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant; import com.github.kfcfans.powerjob.worker.common.constants.TaskConstant;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.worker.core.processor.TaskResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskResult;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -26,24 +29,25 @@ import java.util.Optional;
@Slf4j @Slf4j
public class TaskPersistenceService { public class TaskPersistenceService {
private final StoreStrategy strategy;
// 默认重试参数 // 默认重试参数
private static final int RETRY_TIMES = 3; private static final int RETRY_TIMES = 3;
private static final long RETRY_INTERVAL_MS = 100; private static final long RETRY_INTERVAL_MS = 100;
private static volatile boolean initialized = false; private TaskDAO taskDAO;
public static TaskPersistenceService INSTANCE = new TaskPersistenceService();
private TaskPersistenceService() { public TaskPersistenceService(StoreStrategy strategy) {
this.strategy = strategy;
} }
private final TaskDAO taskDAO = new TaskDAOImpl();
public void init() throws Exception { public void init() throws Exception {
if (initialized) {
return; ConnectionFactory connectionFactory = new ConnectionFactory();
} connectionFactory.initDatasource(strategy);
taskDAO = new TaskDAOImpl(connectionFactory);
taskDAO.initTable(); taskDAO.initTable();
initialized = true;
} }
public boolean save(TaskDO task) { public boolean save(TaskDO task) {

View File

@ -45,7 +45,6 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
req.type = IDLE; req.type = IDLE;
req.instanceId = instanceId; req.instanceId = instanceId;
req.time = System.currentTimeMillis(); req.time = System.currentTimeMillis();
req.address = OhMyWorker.getWorkerAddress();
req.setRemainTaskNum(0); req.setRemainTaskNum(0);
return req; return req;
} }
@ -55,7 +54,6 @@ public class ProcessorTrackerStatusReportReq implements OmsSerializable {
req.type = LOAD; req.type = LOAD;
req.instanceId = instanceId; req.instanceId = instanceId;
req.time = System.currentTimeMillis(); req.time = System.currentTimeMillis();
req.address = OhMyWorker.getWorkerAddress();
req.setRemainTaskNum(remainTaskNum); req.setRemainTaskNum(remainTaskNum);
return req; return req;
} }

View File

@ -37,9 +37,9 @@ public class TaskTrackerStartTaskReq implements OmsSerializable {
/** /**
* 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用 * 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用
*/ */
public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task) { public TaskTrackerStartTaskReq(InstanceInfo instanceInfo, TaskDO task, String taskTrackerAddress) {
this.taskTrackerAddress = OhMyWorker.getWorkerAddress(); this.taskTrackerAddress = taskTrackerAddress;
this.instanceInfo = instanceInfo; this.instanceInfo = instanceInfo;
this.taskId = task.getTaskId(); this.taskId = task.getTaskId();

View File

@ -1,5 +1,6 @@
package com.github.kfcfans.powerjob; package com.github.kfcfans.powerjob;
import com.github.kfcfans.powerjob.worker.common.constants.StoreStrategy;
import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus; import com.github.kfcfans.powerjob.worker.common.constants.TaskStatus;
import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils;
import com.github.kfcfans.powerjob.worker.persistence.TaskDO; import com.github.kfcfans.powerjob.worker.persistence.TaskDO;
@ -18,7 +19,7 @@ import java.util.concurrent.ThreadLocalRandom;
*/ */
public class PersistenceServiceTest { public class PersistenceServiceTest {
private static TaskPersistenceService taskPersistenceService = TaskPersistenceService.INSTANCE; private static TaskPersistenceService taskPersistenceService = new TaskPersistenceService(StoreStrategy.DISK);
@BeforeAll @BeforeAll
public static void initTable() throws Exception { public static void initTable() throws Exception {

View File

@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.TestUtils;
import com.github.kfcfans.powerjob.common.ExecuteType; import com.github.kfcfans.powerjob.common.ExecuteType;
import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.TimeExpressionType;
import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq; import com.github.kfcfans.powerjob.common.request.ServerScheduleJobReq;
import com.github.kfcfans.powerjob.worker.common.RuntimeMeta;
import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker; import com.github.kfcfans.powerjob.worker.core.tracker.processor.ProcessorTracker;
import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker; import com.github.kfcfans.powerjob.worker.core.tracker.task.TaskTracker;
import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq; import com.github.kfcfans.powerjob.worker.pojo.request.ProcessorTrackerStatusReportReq;
@ -22,7 +23,7 @@ public class IdleTest extends CommonTest {
@Test @Test
public void testProcessorTrackerSendIdleReport() throws Exception { public void testProcessorTrackerSendIdleReport() throws Exception {
TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor"); TaskTrackerStartTaskReq req = genTaskTrackerStartTaskReq("com.github.kfcfans.powerjob.processors.TestBasicProcessor");
ProcessorTracker pt = new ProcessorTracker(req); ProcessorTracker pt = new ProcessorTracker(req, new RuntimeMeta());
Thread.sleep(300000); Thread.sleep(300000);
} }
@ -32,7 +33,7 @@ public class IdleTest extends CommonTest {
ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L); ProcessorTrackerStatusReportReq req = ProcessorTrackerStatusReportReq.buildIdleReport(10086L);
ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API); ServerScheduleJobReq serverScheduleJobReq = TestUtils.genServerScheduleJobReq(ExecuteType.STANDALONE, TimeExpressionType.API);
TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq); TaskTracker taskTracker = TaskTracker.create(serverScheduleJobReq, new RuntimeMeta());
if (taskTracker != null) { if (taskTracker != null) {
taskTracker.receiveProcessorTrackerHeartbeat(req); taskTracker.receiveProcessorTrackerHeartbeat(req);
} }