refactor: change name from OhMyWorker to PowerJobWorker

This commit is contained in:
tjq 2021-03-19 23:40:55 +08:00
parent d8811c7d77
commit fe439721d0
17 changed files with 76 additions and 76 deletions

View File

@ -53,7 +53,7 @@
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${springboot.version}</version>
<configuration>
<mainClass>tech.powerjob.server.OhMyApplication</mainClass>
<mainClass>tech.powerjob.server.PowerJobServerApplication</mainClass>
</configuration>
<executions>
<execution>

View File

@ -17,7 +17,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@Slf4j
@EnableScheduling
@SpringBootApplication
public class OhMyApplication {
public class PowerJobServerApplication {
private static final String TIPS = "\n\n" +
"******************* PowerJob Tips *******************\n" +
@ -35,7 +35,7 @@ public class OhMyApplication {
// Start SpringBoot application.
try {
SpringApplication.run(OhMyApplication.class, args);
SpringApplication.run(PowerJobServerApplication.class, args);
} catch (Throwable t) {
log.error(TIPS);
throw t;

View File

@ -1,8 +1,8 @@
package tech.powerjob.agent;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.constants.StoreStrategy;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
@ -44,7 +44,7 @@ public class MainApplication implements Runnable {
@Override
public void run() {
OhMyConfig cfg = new OhMyConfig();
PowerJobWorkerConfig cfg = new PowerJobWorkerConfig();
try {
cfg.setAppName(appName);
@ -53,10 +53,10 @@ public class MainApplication implements Runnable {
cfg.setStoreStrategy(StoreStrategy.MEMORY.name().equals(storeStrategy) ? StoreStrategy.MEMORY : StoreStrategy.DISK);
cfg.setMaxResultLength(length);
OhMyWorker ohMyWorker = new OhMyWorker();
ohMyWorker.setConfig(cfg);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(cfg);
ohMyWorker.init();
worker.init();
}catch (Exception e) {
log.error("[OhMyAgent] startup failed by config: {}.", cfg, e);
ExceptionUtils.rethrow(e);

View File

@ -10,7 +10,7 @@ import org.springframework.context.annotation.Configuration;
* @since 2020/4/17
*/
@Configuration
public class OhMySchedulerConfig {
public class PowerJobWorkerConfig {
/*

View File

@ -2,8 +2,8 @@ package tech.powerjob.worker.autoconfigure;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ -28,7 +28,7 @@ public class PowerJobAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public OhMyWorker initPowerJob(PowerJobProperties properties) {
public PowerJobWorker initPowerJob(PowerJobProperties properties) {
PowerJobProperties.Worker worker = properties.getWorker();
@ -42,7 +42,7 @@ public class PowerJobAutoConfiguration {
/*
* Create OhMyConfig object for setting properties.
*/
OhMyConfig config = new OhMyConfig();
PowerJobWorkerConfig config = new PowerJobWorkerConfig();
/*
* Configuration of worker port. Random port is enabled when port is set with non-positive number.
*/
@ -76,7 +76,7 @@ public class PowerJobAutoConfiguration {
/*
* Create OhMyWorker object and set properties.
*/
OhMyWorker ohMyWorker = new OhMyWorker();
PowerJobWorker ohMyWorker = new PowerJobWorker();
ohMyWorker.setConfig(config);
return ohMyWorker;
}

View File

@ -1,6 +1,6 @@
package tech.powerjob.worker.autoconfigure;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.PowerJobWorker;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
@ -15,7 +15,7 @@ class PowerJobAutoConfigurationTest {
@Test
void testAutoConfiguration() {
ConfigurableApplicationContext run = SpringApplication.run(PowerJobAutoConfigurationTest.class);
OhMyWorker worker = run.getBean(OhMyWorker.class);
PowerJobWorker worker = run.getBean(PowerJobWorker.class);
Assertions.assertNotNull(worker);
}

View File

@ -19,7 +19,7 @@ import tech.powerjob.worker.actors.WorkerActor;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.background.ServerDiscoveryService;
import tech.powerjob.worker.background.WorkerHealthReporter;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.PowerBannerPrinter;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.utils.SpringUtils;
@ -51,7 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @since 2020/3/16
*/
@Slf4j
public class OhMyWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
public class PowerJobWorker implements ApplicationContextAware, InitializingBean, DisposableBean {
private ScheduledExecutorService timingPool;
private final WorkerRuntime workerRuntime = new WorkerRuntime();
@ -70,14 +70,14 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
public void init() throws Exception {
if (!initialized.compareAndSet(false, true)) {
log.warn("[OhMyWorker] please do not repeat the initialization");
log.warn("[PowerJobWorker] please do not repeat the initialization");
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("[OhMyWorker] start to initialize OhMyWorker...");
log.info("[PowerJobWorker] start to initialize PowerJobWorker...");
OhMyConfig config = workerRuntime.getOhMyConfig();
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
CommonUtils.requireNonNull(config, "can't find OhMyConfig, please set OhMyConfig first");
try {
@ -86,7 +86,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
if (!config.isEnableTestMode()) {
assertAppName();
}else {
log.warn("[OhMyWorker] using TestMode now, it's dangerous if this is production env.");
log.warn("[PowerJobWorker] using TestMode now, it's dangerous if this is production env.");
}
// 初始化元数据
@ -98,7 +98,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
timingPool = Executors.newScheduledThreadPool(3, timingPoolFactory);
// 连接 server
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getOhMyConfig());
ServerDiscoveryService serverDiscoveryService = new ServerDiscoveryService(workerRuntime.getAppId(), workerRuntime.getWorkerConfig());
serverDiscoveryService.start(timingPool);
workerRuntime.setServerDiscoveryService(serverDiscoveryService);
@ -128,38 +128,38 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME);
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress);
log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
log.info("[PowerJobWorker] akka-remote listening address: {}", workerAddress);
log.info("[PowerJobWorker] akka ActorSystem({}) initialized successfully.", actorSystem);
// 初始化日志系统
OmsLogHandler omsLogHandler = new OmsLogHandler(workerAddress, actorSystem, serverDiscoveryService);
workerRuntime.setOmsLogHandler(omsLogHandler);
// 初始化存储
TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getOhMyConfig().getStoreStrategy());
TaskPersistenceService taskPersistenceService = new TaskPersistenceService(workerRuntime.getWorkerConfig().getStoreStrategy());
taskPersistenceService.init();
workerRuntime.setTaskPersistenceService(taskPersistenceService);
log.info("[OhMyWorker] local storage initialized successfully.");
log.info("[PowerJobWorker] local storage initialized successfully.");
// 初始化定时任务
timingPool.scheduleAtFixedRate(new WorkerHealthReporter(workerRuntime), 0, 15, TimeUnit.SECONDS);
timingPool.scheduleWithFixedDelay(omsLogHandler.logSubmitter, 0, 5, TimeUnit.SECONDS);
log.info("[OhMyWorker] OhMyWorker initialized successfully, using time: {}, congratulations!", stopwatch);
log.info("[PowerJobWorker] PowerJobWorker initialized successfully, using time: {}, congratulations!", stopwatch);
}catch (Exception e) {
log.error("[OhMyWorker] initialize OhMyWorker failed, using {}.", stopwatch, e);
log.error("[PowerJobWorker] initialize PowerJobWorker failed, using {}.", stopwatch, e);
throw e;
}
}
public void setConfig(OhMyConfig config) {
workerRuntime.setOhMyConfig(config);
public void setConfig(PowerJobWorkerConfig config) {
workerRuntime.setWorkerConfig(config);
}
@SuppressWarnings("rawtypes")
private void assertAppName() {
OhMyConfig config = workerRuntime.getOhMyConfig();
PowerJobWorkerConfig config = workerRuntime.getWorkerConfig();
String appName = config.getAppName();
Objects.requireNonNull(appName, "appName can't be empty!");
@ -171,20 +171,20 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di
ResultDTO resultDTO = JsonUtils.parseObject(resultDTOStr, ResultDTO.class);
if (resultDTO.isSuccess()) {
Long appId = Long.valueOf(resultDTO.getData().toString());
log.info("[OhMyWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
log.info("[PowerJobWorker] assert appName({}) succeed, the appId for this application is {}.", appName, appId);
workerRuntime.setAppId(appId);
return;
}else {
log.error("[OhMyWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
log.error("[PowerJobWorker] assert appName failed, this appName is invalid, please register the appName {} first.", appName);
throw new PowerJobException(resultDTO.getMessage());
}
}catch (PowerJobException oe) {
throw oe;
}catch (Exception ignore) {
log.warn("[OhMyWorker] assert appName by url({}) failed, please check the server address.", realUrl);
log.warn("[PowerJobWorker] assert appName by url({}) failed, please check the server address.", realUrl);
}
}
log.error("[OhMyWorker] no available server in {}.", config.getServerAddress());
log.error("[PowerJobWorker] no available server in {}.", config.getServerAddress());
throw new PowerJobException("no server available!");
}

View File

@ -5,7 +5,7 @@ import tech.powerjob.common.response.ResultDTO;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.common.utils.JsonUtils;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.core.tracker.task.TaskTracker;
import tech.powerjob.worker.core.tracker.task.TaskTrackerPool;
import com.google.common.collect.Maps;
@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
public class ServerDiscoveryService {
private final Long appId;
private final OhMyConfig config;
private final PowerJobWorkerConfig config;
private String currentServerAddress;
@ -41,7 +41,7 @@ public class ServerDiscoveryService {
// 最大失败次数
private static final int MAX_FAILED_COUNT = 3;
public ServerDiscoveryService(Long appId, OhMyConfig config) {
public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
this.appId = appId;
this.config = config;
}

View File

@ -36,23 +36,23 @@ public class WorkerHealthReporter implements Runnable {
SystemMetrics systemMetrics;
if (workerRuntime.getOhMyConfig().getSystemMetricsCollector() == null) {
if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
systemMetrics = SystemInfoUtils.getSystemMetrics();
} else {
systemMetrics = workerRuntime.getOhMyConfig().getSystemMetricsCollector().collect();
systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
}
WorkerHeartbeat heartbeat = new WorkerHeartbeat();
heartbeat.setSystemMetrics(systemMetrics);
heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
heartbeat.setAppName(workerRuntime.getOhMyConfig().getAppName());
heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
heartbeat.setAppId(workerRuntime.getAppId());
heartbeat.setHeartbeatTime(System.currentTimeMillis());
heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
heartbeat.setProtocol(Protocol.AKKA.name());
heartbeat.setClient("Atlantis");
heartbeat.setTag(workerRuntime.getOhMyConfig().getTag());
heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());
// 获取当前加载的容器列表
heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());

View File

@ -19,7 +19,7 @@ import java.util.List;
*/
@Getter
@Setter
public class OhMyConfig {
public class PowerJobWorkerConfig {
/**
* AppName, recommend to use the name of this project
* Applications should be registered by powerjob-console in advance to prevent error.

View File

@ -17,7 +17,7 @@ public class WorkerRuntime {
private Long appId;
private OhMyConfig ohMyConfig;
private PowerJobWorkerConfig workerConfig;
private String workerAddress;

View File

@ -115,7 +115,7 @@ public class ProcessorRunnable implements Runnable {
if (task.getTaskContent() != null && task.getTaskContent().length > 0) {
taskContext.setSubTask(SerializerUtils.deSerialized(task.getTaskContent()));
}
taskContext.setUserContext(workerRuntime.getOhMyConfig().getUserContext());
taskContext.setUserContext(workerRuntime.getWorkerConfig().getUserContext());
return taskContext;
}
@ -211,8 +211,8 @@ public class ProcessorRunnable implements Runnable {
req.setReportTime(System.currentTimeMillis());
req.setCmd(cmd);
// 检查追加的上下文大小是否超出限制
if (instanceInfo.getWfInstanceId() !=null && WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getOhMyConfig().getMaxAppendedWfContextLength())) {
log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getOhMyConfig().getMaxAppendedWfContextLength());
if (instanceInfo.getWfInstanceId() !=null && WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
appendedWfContext = Collections.emptyMap();
}
@ -256,7 +256,7 @@ public class ProcessorRunnable implements Runnable {
if (StringUtils.isEmpty(result)) {
return "";
}
final int maxLength = workerRuntime.getOhMyConfig().getMaxResultLength();
final int maxLength = workerRuntime.getWorkerConfig().getMaxResultLength();
if (result.length() <= maxLength) {
return result;
}

View File

@ -1,7 +1,7 @@
package tech.powerjob.worker.core.processor;
import com.fasterxml.jackson.annotation.JsonIgnore;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.log.OmsLogger;
import lombok.Getter;
import lombok.Setter;
@ -64,7 +64,7 @@ public class TaskContext {
@JsonIgnore
private OmsLogger omsLogger;
/**
* 用户自定义上下文通过 {@link OhMyConfig} 初始化
* 用户自定义上下文通过 {@link PowerJobWorkerConfig} 初始化
*/
private Object userContext;
/**

View File

@ -190,8 +190,8 @@ public abstract class TaskTracker {
return;
}
// 检查追加的上下文大小是否超出限制
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getOhMyConfig().getMaxAppendedWfContextLength())) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getOhMyConfig().getMaxAppendedWfContextLength());
if (WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
log.warn("[TaskTracker-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!",instanceInfo.getInstanceId(), workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength());
// ignore appended workflow context data
return;
}

View File

@ -5,8 +5,8 @@ import akka.actor.ActorSystem;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.common.utils.NetUtils;
import com.google.common.collect.Lists;
@ -27,13 +27,13 @@ public class CommonTaskTrackerTest {
@BeforeAll
public static void init() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
ohMyConfig.setEnableTestMode(true);
PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig();
workerConfig.setAppName("oms-test");
workerConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
workerConfig.setEnableTestMode(true);
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(workerConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));

View File

@ -6,8 +6,8 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import tech.powerjob.worker.pojo.model.InstanceInfo;
import tech.powerjob.worker.pojo.request.TaskTrackerStartTaskReq;
@ -28,12 +28,12 @@ public class CommonTest {
@BeforeAll
public static void startWorker() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setEnableTestMode(true);
PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig();
workerConfig.setAppName("oms-test");
workerConfig.setEnableTestMode(true);
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(workerConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));

View File

@ -6,8 +6,8 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.worker.OhMyWorker;
import tech.powerjob.worker.common.OhMyConfig;
import tech.powerjob.worker.PowerJobWorker;
import tech.powerjob.worker.common.PowerJobWorkerConfig;
import tech.powerjob.worker.common.utils.AkkaUtils;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
@ -27,11 +27,11 @@ public class FrequentTaskTrackerTest {
@BeforeAll
public static void init() throws Exception {
OhMyConfig ohMyConfig = new OhMyConfig();
ohMyConfig.setAppName("oms-test");
ohMyConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
OhMyWorker worker = new OhMyWorker();
worker.setConfig(ohMyConfig);
PowerJobWorkerConfig workerConfig = new PowerJobWorkerConfig();
workerConfig.setAppName("oms-test");
workerConfig.setServerAddress(Lists.newArrayList("127.0.0.1:7700"));
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(workerConfig);
worker.init();
ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf"));