From 2ed0391d1536455f7149c44cbd1163e0a799ef71 Mon Sep 17 00:00:00 2001 From: jiangjining Date: Tue, 9 Mar 2021 11:02:04 +0800 Subject: [PATCH 1/5] feat: Official HttpProcessor adds alarm support. #223 --- .../processors/impl/HttpProcessor.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java index a9583f48..19535e78 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java @@ -20,19 +20,27 @@ import java.util.concurrent.TimeUnit; * common http processor * * @author tjq + * @author Jiang Jining * @since 2021/1/30 */ public class HttpProcessor extends CommonBasicProcessor { - // 60 seconds + /** + * Default timeout is 60 seconds. + */ private static final int DEFAULT_TIMEOUT = 60; + private static final int HTTP_SUCCESS_CODE = 200; private static final Map CLIENT_STORE = new ConcurrentHashMap<>(); @Override public ProcessResult process0(TaskContext taskContext) throws Exception { OmsLogger omsLogger = taskContext.getOmsLogger(); HttpParams httpParams = JSONObject.parseObject(CommonUtils.parseParams(taskContext), HttpParams.class); - + if (httpParams == null) { + String message = "httpParams is null, please check jobParam configuration."; + omsLogger.warn(message); + return new ProcessResult(false, message); + } if (StringUtils.isEmpty(httpParams.url)) { return new ProcessResult(false, "url can't be empty!"); } @@ -95,9 +103,15 @@ public class HttpProcessor extends CommonBasicProcessor { msgBody = response.body().string(); } - String res = String.format("code:%d,body:%s", response.code(), msgBody); - - return new ProcessResult(true, res); + int responseCode = response.code(); + String res = String.format("code:%d, body:%s", responseCode, msgBody); + boolean success = true; + if (responseCode != HTTP_SUCCESS_CODE) { + success = false; + omsLogger.warn("{} url: {} failed, response code is {}, response body is {}", + httpParams.method, httpParams.url, responseCode, msgBody); + } + return new ProcessResult(success, res); } @Data From 65e0d118c39e7b8ad999b4a8828faa98006486c1 Mon Sep 17 00:00:00 2001 From: jiangjining Date: Tue, 9 Mar 2021 11:09:28 +0800 Subject: [PATCH 2/5] feat: Add windows script support.#161 --- .../impl/script/AbstractScriptProcessor.java | 44 +++++++++++++------ .../impl/script/ShellProcessor.java | 2 +- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java index 8cef3bc4..5c00969b 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java @@ -7,6 +7,8 @@ import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; @@ -20,47 +22,62 @@ import java.util.concurrent.ForkJoinPool; * 脚本处理器 * * @author tjq + * @author Jiang Jining * @since 2020/4/16 */ @Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { - private static final ForkJoinPool pool = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); + private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); + static final String SH_SHELL = "/bin/sh"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); omsLogger.info("SYSTEM ===> ScriptProcessor start to process"); - - String scriptPath = prepareScriptFile(context.getInstanceId(), CommonUtils.parseParams(context)); - - // 1. 授权 - ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); - // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) - chmodPb.start().waitFor(); + String scriptParams = CommonUtils.parseParams(context); + if (scriptParams == null) { + String message = "scriptParams is null, please check jobParam configuration."; + omsLogger.warn(message); + return new ProcessResult(false, message); + } + String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams); + + if (SystemUtils.IS_OS_WINDOWS) { + if (StringUtils.equals(getRunCommand(), SH_SHELL)) { + String message = String.format("Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); + omsLogger.warn(message); + return new ProcessResult(false, message); + } + } else { + // 1. 授权 + ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); + // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) + chmodPb.start().waitFor(); + } // 2. 执行目标脚本 ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); - StringBuilder inputSB = new StringBuilder(); - StringBuilder errorSB = new StringBuilder(); + StringBuilder inputBuilder = new StringBuilder(); + StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { - pool.execute(() -> copyStream(is, inputSB, omsLogger)); - pool.execute(() -> copyStream(es, errorSB, omsLogger)); + POOL.execute(() -> copyStream(is, inputBuilder, omsLogger)); + POOL.execute(() -> copyStream(es, errorBuilder, omsLogger)); success = process.waitFor() == 0; } catch (InterruptedException ie) { omsLogger.info("SYSTEM ===> ScriptProcessor has been interrupted"); } finally { - result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); + result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); } return new ProcessResult(success, result); } @@ -112,6 +129,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor { /** * 生成脚本名称 + * @param instanceId id of instance * @return 文件名称 */ protected abstract String getScriptName(Long instanceId); diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java index fd25bda0..8aa4cd61 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/script/ShellProcessor.java @@ -15,6 +15,6 @@ public class ShellProcessor extends AbstractScriptProcessor { @Override protected String getRunCommand() { - return "/bin/sh"; + return SH_SHELL; } } From d07ed2b013dc80e4c9e45cc9cdae9ee69dd289e2 Mon Sep 17 00:00:00 2001 From: jiangjining Date: Thu, 11 Mar 2021 18:28:36 +0800 Subject: [PATCH 3/5] refactor: Rename Task_TRACKER_ACTOR_NAME --- .../java/com/github/kfcfans/powerjob/common/RemoteConstant.java | 2 +- .../powerjob/server/remote/transport/starter/AkkaStarter.java | 2 +- .../java/com/github/kfcfans/powerjob/worker/OhMyWorker.java | 2 +- .../powerjob/worker/core/processor/sdk/MapProcessor.java | 2 +- .../worker/core/tracker/processor/ProcessorTracker.java | 2 +- .../java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java | 2 +- .../src/test/java/com/github/kfcfans/powerjob/CommonTest.java | 2 +- .../com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index 814e104a..fb5c2de4 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -14,7 +14,7 @@ public class RemoteConstant { public static final String WORKER_ACTOR_SYSTEM_NAME = "oms"; - public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; + public static final String TASK_TRACKER_ACTOR_NAME = "task_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/starter/AkkaStarter.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/starter/AkkaStarter.java index 03a4c695..6408dbae 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/starter/AkkaStarter.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/remote/transport/starter/AkkaStarter.java @@ -90,7 +90,7 @@ public class AkkaStarter { } public static ActorSelection getTaskTrackerActor(String address) { - String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String path = String.format(AKKA_PATH, RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, address, RemoteConstant.TASK_TRACKER_ACTOR_NAME); return actorSystem.actorSelection(path); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 7fe1fe1f..8d5190d7 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -101,7 +101,7 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); actorSystem.actorOf(Props.create(TaskTrackerActor.class) .withDispatcher("akka.task-tracker-dispatcher") - .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); + .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.TASK_TRACKER_ACTOR_NAME); actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) .withDispatcher("akka.processor-tracker-dispatcher") .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java index cbe0ff58..1af9e184 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/sdk/MapProcessor.java @@ -46,7 +46,7 @@ public abstract class MapProcessor implements BasicProcessor { ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName); // 2. 可靠发送请求(任务不允许丢失,需要使用 ask 方法,失败抛异常) - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(task.getAddress(), RemoteConstant.TASK_TRACKER_ACTOR_NAME); boolean requestSucceed = AkkaUtils.reliableTransmit(OhMyWorker.actorSystem.actorSelection(akkaRemotePath), req); if (requestSucceed) { diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java index c7ed24aa..1a5853a8 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/processor/ProcessorTracker.java @@ -84,7 +84,7 @@ public class ProcessorTracker { this.instanceInfo = request.getInstanceInfo(); this.instanceId = request.getInstanceInfo().getInstanceId(); this.taskTrackerAddress = request.getTaskTrackerAddress(); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME); this.taskTrackerActorRef = OhMyWorker.actorSystem.actorSelection(akkaRemotePath); this.omsLogger = new OmsServerLogger(instanceId); diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java index eacdec3f..d9508498 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTaskTrackerTest.java @@ -37,7 +37,7 @@ public class CommonTaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); } diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java index ba884113..065f6123 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/CommonTest.java @@ -40,7 +40,7 @@ public class CommonTest { String address = NetUtils.getLocalHost() + ":27777"; remoteProcessorTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME)); - remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.Task_TRACKER_ACTOR_NAME)); + remoteTaskTracker = testAS.actorSelection(AkkaUtils.getAkkaWorkerPath(address, RemoteConstant.TASK_TRACKER_ACTOR_NAME)); } @AfterAll diff --git a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java index 2e377764..ae2c1e3c 100644 --- a/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java +++ b/powerjob-worker/src/test/java/com/github/kfcfans/powerjob/FrequentTaskTrackerTest.java @@ -35,7 +35,7 @@ public class FrequentTaskTrackerTest { worker.init(); ActorSystem testAS = ActorSystem.create("oms-test", ConfigFactory.load("oms-akka-test.conf")); - String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.Task_TRACKER_ACTOR_NAME); + String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(NetUtils.getLocalHost() + ":" + RemoteConstant.DEFAULT_WORKER_PORT, RemoteConstant.TASK_TRACKER_ACTOR_NAME); remoteTaskTracker = testAS.actorSelection(akkaRemotePath); } From 0f1d760dbe870c21345497440c52d4f79841e6ef Mon Sep 17 00:00:00 2001 From: Jiang Jining Date: Sat, 13 Mar 2021 00:43:51 +0800 Subject: [PATCH 4/5] test: Add test for HttpProcessor --- .../official/processors/impl/HttpProcessorTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java index 68c6fd67..7d8ea2d6 100644 --- a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java +++ b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java @@ -11,6 +11,14 @@ import tech.powerjob.official.processors.TestUtils; * @since 2021/1/31 */ class HttpProcessorTest { + + @Test + void testDefaultMethod() throws Exception { + String url = "https://www.baidu.com"; + JSONObject params = new JSONObject(); + params.put("url", url); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } @Test void testGet() throws Exception { From 5834963fdd634537e8af9c40a10f7c350b4da810 Mon Sep 17 00:00:00 2001 From: Jiang Jining Date: Sat, 13 Mar 2021 00:47:08 +0800 Subject: [PATCH 5/5] fix: Set default request body to prevent NullPointerException --- .../processors/impl/HttpProcessor.java | 9 +++++++-- .../processors/impl/HttpProcessorTest.java | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java index 19535e78..8505be0e 100644 --- a/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java +++ b/powerjob-official-processors/src/main/java/tech/powerjob/official/processors/impl/HttpProcessor.java @@ -60,8 +60,13 @@ public class HttpProcessor extends CommonBasicProcessor { } // set default mediaType - if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType)) { - if (JSONValidator.from(httpParams.body).validate()) { + if (!"GET".equals(httpParams.method)) { + // set default request body + if (StringUtils.isEmpty(httpParams.body)) { + httpParams.body = new JSONObject().toJSONString(); + omsLogger.warn("try to use default request body:{}", httpParams.body); + } + if (JSONValidator.from(httpParams.body).validate() && StringUtils.isEmpty(httpParams.mediaType)) { httpParams.mediaType = "application/json"; omsLogger.warn("try to use 'application/json' as media type"); } diff --git a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java index 7d8ea2d6..bc938ed0 100644 --- a/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java +++ b/powerjob-official-processors/src/test/java/tech/powerjob/official/processors/impl/HttpProcessorTest.java @@ -41,6 +41,25 @@ class HttpProcessorTest { System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); } + + @Test + void testPostDefaultJson() throws Exception { + String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true"; + JSONObject params = new JSONObject(); + params.put("url", url); + params.put("method", "POST"); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } + + @Test + void testPostDefaultWithMediaType() throws Exception { + String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true"; + JSONObject params = new JSONObject(); + params.put("url", url); + params.put("method", "POST"); + params.put("mediaType", "application/json"); + System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString()))); + } @Test void testTimeout() throws Exception {