diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index 814e104a..fb5c2de4 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -14,7 +14,7 @@ public class RemoteConstant { public static final String WORKER_ACTOR_SYSTEM_NAME = "oms"; - public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; + public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java index 6f51017e..5e108544 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java @@ -1,6 +1,7 @@ package tech.powerjob.official.processors.impl; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONValidator; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; @@ -20,14 +21,16 @@ import java.util.concurrent.TimeUnit; * common http processor * * @author tjq + * @author Jiang Jining * @since 2021/1/30 */ public class HttpProcessor extends CommonBasicProcessor { /** - * 60 seconds + * Default timeout is 60 seconds. */ private static final int DEFAULT_TIMEOUT = 60; + private static final int HTTP_SUCCESS_CODE = 200; private static final Map CLIENT_STORE = new ConcurrentHashMap<>(); @Override @@ -35,6 +38,12 @@ public class HttpProcessor extends CommonBasicProcessor { OmsLogger omsLogger = taskContext.getOmsLogger(); HttpParams httpParams = JSON.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class); + if (httpParams == null) { + String message = "httpParams is null, please check jobParam configuration."; + omsLogger.warn(message); + return new ProcessResult(false, message); + } + if (StringUtils.isEmpty(httpParams.url)) { return new ProcessResult(false, "url can't be empty!"); } @@ -54,9 +63,16 @@ public class HttpProcessor extends CommonBasicProcessor { } // set default mediaType - if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType) && JSONValidator.from(httpParams.body).validate()) { - httpParams.mediaType = "application/json"; - omsLogger.warn("try to use 'application/json' as media type"); + if (!"GET".equals(httpParams.method)) { + // set default request body + if (StringUtils.isEmpty(httpParams.body)) { + httpParams.body = new JSONObject().toJSONString(); + omsLogger.warn("try to use default request body:{}", httpParams.body); + } + if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) { + httpParams.mediaType = "application/json"; + omsLogger.warn("try to use 'application/json' as media type"); + } } // set default timeout @@ -95,9 +111,15 @@ public class HttpProcessor extends CommonBasicProcessor { msgBody = response.body().string(); } - String res = String.format("code:%d,body:%s", response.code(), msgBody); - - return new ProcessResult(true, res); + int responseCode = response.code(); + String res = String.format("code:%d, body:%s", responseCode, msgBody); + boolean success = true; + if (responseCode != HTTP_SUCCESS_CODE) { + success = false; + omsLogger.warn("{} url: {} failed, response code is {}, response body is {}", + httpParams.method, httpParams.url, responseCode, msgBody); + } + return new ProcessResult(success, res); } @Data diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 8cef3bc4..5c00969b 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -7,6 +7,8 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; @@ -20,47 +22,62 @@ import java.util.concurrent.ForkJoinPool; * 脚本处理器 * * @author tjq + * @author Jiang Jining * @since 2020/4/16 */ @Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { - private static final ForkJoinPool pool = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); + private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); + static final String SH_SHELL = "/bin/sh"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info("SYSTEM ===> ScriptProcessor start to process"); - - String scriptPath = prepareScriptFile(context.getInstanceId(), CommonUtils.parseParams(context)); - - // 1. 授权 - ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodPb.start().waitFor(); + String scriptParams = CommonUtils.parseParams(context); + if (scriptParams == null) { + String message = "scriptParams is null, please check jobParam configuration."; + omsLogger.warn(message); + return new ProcessResult(false, message); + } + String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams); + + if (SystemUtils.IS_OS_WINDOWS) { + if (StringUtils.equals(getRunCommand(), SH_SHELL)) { + String message = String.format("Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); + omsLogger.warn(message); + return new ProcessResult(false, message); + } + } else { + // 1. 授权 + ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) + chmodPb.start().waitFor(); + } // 2. 执行目标脚本 ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); - StringBuilder inputSB = new StringBuilder(); - StringBuilder errorSB = new StringBuilder(); + StringBuilder inputBuilder = new StringBuilder(); + StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { - pool.execute(() -> copyStream(is, inputSB, omsLogger)); - pool.execute(() -> copyStream(es, errorSB, omsLogger)); + POOL.execute(() -> copyStream(is, inputBuilder, omsLogger)); + POOL.execute(() -> copyStream(es, errorBuilder, omsLogger)); success = process.waitFor() == 0; } catch (InterruptedException ie) { omsLogger.info("SYSTEM ===> ScriptProcessor has been interrupted"); } finally { - result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); + result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); } return new ProcessResult(success, result); } @@ -112,6 +129,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { /** * 生成脚本名称 + * @param instanceId id of instance * @return 文件名称 */ protected abstract String getScriptName(Long instanceId); diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java index fd25bda0..8aa4cd61 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java @@ -15,6 +15,6 @@ public class ShellProcessor extends AbstractScriptProcessor { @Override protected String getRunCommand() { - return "/bin/sh"; + return SH_SHELL; } } diff --git a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java index 68c6fd67..bc938ed0 100644 --- a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java +++ b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java @@ -11,6 +11,14 @@ import tech.powerjob.official.processors.TestUtils; * @since 2021/1/31 */ class HttpProcessorTest { + + @Test + void testDefaultMethod() throws Exception { + String url = "https://www.baidu.com"; + JSONObject params = new JSONObject(); + params.put("url", url); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } @Test void testGet() throws Exception { @@ -33,6 +41,25 @@ class HttpProcessorTest { System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); } + + @Test + void testPostDefaultJson() throws Exception { + String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true"; + JSONObject params = new JSONObject(); + params.put("url", url); + params.put("method", "POST"); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } + + @Test + void testPostDefaultWithMediaType() throws Exception { + String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true"; + JSONObject params = new JSONObject(); + params.put("url", url); + params.put("method", "POST"); + params.put("mediaType", "application/json"); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } @Test void testTimeout() throws Exception { diff --git a/powerjob-server/powerjob-server-common/pom.xml b/powerjob-server/powerjob-server-common/pom.xml index 7c0237b5..824644d4 100644 --- a/powerjob-server/powerjob-server-common/pom.xml +++ b/powerjob-server/powerjob-server-common/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-core/pom.xml b/powerjob-server/powerjob-server-core/pom.xml index 50757c6e..8be12ab1 100644 --- a/powerjob-server/powerjob-server-core/pom.xml +++ b/powerjob-server/powerjob-server-core/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-extension/pom.xml b/powerjob-server/powerjob-server-extension/pom.xml index dedd4834..61372a89 100644 --- a/powerjob-server/powerjob-server-extension/pom.xml +++ b/powerjob-server/powerjob-server-extension/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-migrate/pom.xml b/powerjob-server/powerjob-server-migrate/pom.xml index 420a6733..b655f578 100644 --- a/powerjob-server/powerjob-server-migrate/pom.xml +++ b/powerjob-server/powerjob-server-migrate/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-persistence/pom.xml b/powerjob-server/powerjob-server-persistence/pom.xml index c3f1e948..d6ac3c14 100644 --- a/powerjob-server/powerjob-server-persistence/pom.xml +++ b/powerjob-server/powerjob-server-persistence/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-remote/pom.xml b/powerjob-server/powerjob-server-remote/pom.xml index 3258319a..f274922a 100644 --- a/powerjob-server/powerjob-server-remote/pom.xml +++ b/powerjob-server/powerjob-server-remote/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-server/powerjob-server-starter/pom.xml b/powerjob-server/powerjob-server-starter/pom.xml index 13a786e9..0a7afd77 100644 --- a/powerjob-server/powerjob-server-starter/pom.xml +++ b/powerjob-server/powerjob-server-starter/pom.xml @@ -5,7 +5,8 @@ powerjob-server com.github.kfcfans - 3.4.6 + 4.0.0 + ../pom.xml 4.0.0 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 49426e06..585fefc4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -120,7 +120,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di ActorRef taskTrackerActorRef = actorSystem.actorOf(TaskTrackerActor.props(workerRuntime) .withDispatcher("akka.task-tracker-dispatcher") - .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); + .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME); actorSystem.actorOf(ProcessorTrackerActor.props(workerRuntime) .withDispatcher("akka.processor-tracker-dispatcher") .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java index cb098739..f843186d 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -53,7 +53,8 @@ public abstract class MapProcessor implements BasicProcessor { ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME); boolean requestSucceed = AkkaUtils.reliableTransmit(workerRuntime.getActorSystem().actorSelection(akkaRemotePath), req); if (requestSucceed) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index 0ec35c5b..dc6854d4 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -108,7 +108,8 @@ public class ProcessorTracker { this.instanceInfo = request.getInstanceInfo(); this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); + + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath); this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler()); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java index eacdec3f..d9508498 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java @@ -37,7 +37,7 @@ public class CommonTaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); } diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java index ba884113..065f6123 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java @@ -40,7 +40,7 @@ public class CommonTest { String address = NetUtils.getLocalHost() + ":27777"; remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME)); - remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME)); + remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME)); } @AfterAll diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java index 2e377764..ae2c1e3c 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java @@ -35,7 +35,7 @@ public class FrequentTaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); }