feat: offical HttpProcessor

This commit is contained in:
tjq 2021-01-31 18:44:40 +08:00
parent c4098a60e7
commit 1af361444d
5 changed files with 338 additions and 0 deletions

View File

@ -13,5 +13,108 @@
<version>3.4.5</version>
<packaging>jar</packaging>
<properties>
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
<!-- 不会被打包的部分scope 只能是 test 或 provide -->
<junit.version>5.6.1</junit.version>
<logback.version>1.2.3</logback.version>
<powerjob.worker.version>3.4.4</powerjob.worker.version>
<!-- 全部 shade 化,避免依赖冲突 -->
<fastjson.version>1.2.68</fastjson.version>
<okhttp.version>3.14.9</okhttp.version>
<guava.version>29.0-jre</guava.version>
<commons.lang.version>3.10</commons.lang.version>
</properties>
<dependencies>
<!-- fastJson(used for serialization of DAG) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- OKHttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${okhttp.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang.version}</version>
</dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.github.kfcfans</groupId>
<artifactId>powerjob-worker</artifactId>
<version>${powerjob.worker.version}</version>
<scope>provided</scope>
</dependency>
<!-- Junit tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${mvn.shade.plugin.version}</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>okhttp3</pattern>
<shadedPattern>shade.powerjob.okhttp3</shadedPattern>
</relocation>
<relocation>
<pattern>okio</pattern>
<shadedPattern>shade.powerjob.okio</shadedPattern>
</relocation>
<relocation>
<pattern>com</pattern>
<shadedPattern>shade.powerjob.com</shadedPattern>
</relocation>
<relocation>
<pattern>org</pattern>
<shadedPattern>shade.powerjob.org</shadedPattern>
</relocation>
<relocation>
<pattern>javax</pattern>
<shadedPattern>shade.powerjob.javax</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,46 @@
package tech.powerjob.offical.processors;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import com.google.common.base.Stopwatch;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* CommonBasicProcessor
*
* @author tjq
* @since 2021/1/30
*/
public abstract class CommonBasicProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext taskContext) throws Exception {
String clzName = this.getClass().getSimpleName();
OmsLogger omsLogger = taskContext.getOmsLogger();
omsLogger.info("[{}] using params: {}", clzName, taskContext.getJobParams());
try {
Stopwatch sw = Stopwatch.createStarted();
ProcessResult result = process0(taskContext);
omsLogger.info("[{}] execute succeed, using {}, result: {}", clzName, sw, result);
return suit(result);
} catch (Throwable t) {
omsLogger.error("[{}] execute failed!", clzName, t);
return new ProcessResult(false, ExceptionUtils.getMessage(t));
}
}
private static ProcessResult suit(ProcessResult processResult) {
if (processResult.getMsg() == null || processResult.getMsg().length() < 1024) {
return processResult;
}
processResult.setMsg(processResult.getMsg().substring(0, 1024) + "...");
return processResult;
}
protected abstract ProcessResult process0(TaskContext taskContext) throws Exception;
}

View File

@ -0,0 +1,121 @@
package tech.powerjob.offical.processors.impl;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONValidator;
import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
import lombok.Data;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.offical.processors.CommonBasicProcessor;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* common http processor
*
* @author tjq
* @since 2021/1/30
*/
public class HttpProcessor extends CommonBasicProcessor {
private static final OkHttpClient client;
static {
client = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.build();
}
@Override
public ProcessResult process0(TaskContext taskContext) throws Exception {
OmsLogger omsLogger = taskContext.getOmsLogger();
HttpParams httpParams = JSONObject.parseObject(taskContext.getJobParams(), HttpParams.class);
if (StringUtils.isEmpty(httpParams.url)) {
return new ProcessResult(false, "url can't be empty!");
}
if (!httpParams.url.startsWith("http")) {
httpParams.url = "http://" + httpParams.url;
}
omsLogger.info("[HttpProcessor] request url: {}", httpParams.url);
// set default method
if (StringUtils.isEmpty(httpParams.method)) {
httpParams.method = "GET";
omsLogger.info("[HttpProcessor] using default request method: GET");
} else {
omsLogger.info("[HttpProcessor] request method: {}", httpParams.method);
}
// set default mediaType
if (!"GET".equals(httpParams.method) && StringUtils.isEmpty(httpParams.mediaType)) {
if (JSONValidator.from(httpParams.body).validate()) {
httpParams.mediaType = "application/json";
omsLogger.warn("[HttpProcessor] try to use 'application/json' as media type");
}
}
Request.Builder builder = new Request.Builder().url(httpParams.url);
if (httpParams.headers != null) {
httpParams.headers.forEach((k, v) -> {
builder.addHeader(k, v);
omsLogger.info("[HttpProcessor] add header {}:{}", k, v);
});
}
switch (httpParams.method) {
case "PUT":
case "DELETE":
case "POST":
MediaType mediaType = MediaType.parse(httpParams.mediaType);
omsLogger.info("[HttpProcessor] mediaType: {}", mediaType);
RequestBody requestBody = RequestBody.create(mediaType, httpParams.body);
builder.method(httpParams.method, requestBody);
break;
default:
builder.get();
}
Response response = client.newCall(builder.build()).execute();
omsLogger.info("[HttpProcessor] response: {}", response);
String msgBody = "";
if (response.body() != null) {
msgBody = response.body().string();
}
String res = String.format("code:%d,body:%s", response.code(), msgBody);
omsLogger.info("[HttpProcessor] process result: {}", res);
return new ProcessResult(true, res);
}
@Data
public static class HttpParams {
/**
* POST / GET / PUT / DELETE
*/
private String method;
/**
* the request url
*/
private String url;
/**
* application/json
* application/xml
* image/png
* image/jpeg
* image/gif
*/
private String mediaType;
private String body;
private Map<String, String> headers;
}
}

View File

@ -0,0 +1,30 @@
package tech.powerjob.offical.processors;
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
import com.github.kfcfans.powerjob.worker.log.impl.OmsServerLogger;
import java.util.concurrent.ThreadLocalRandom;
/**
* TestUtils
*
* @author tjq
* @since 2021/1/30
*/
public class TestUtils {
public static TaskContext genTaskContext(String jobParams) {
long jobId = ThreadLocalRandom.current().nextLong();
TaskContext taskContext = new TaskContext();
taskContext.setJobId(jobId);
taskContext.setInstanceId(jobId);
taskContext.setJobParams(jobParams);
taskContext.setTaskId("0.0");
taskContext.setTaskName("TEST_TASK");
taskContext.setOmsLogger(new OmsServerLogger(jobId));
return taskContext;
}
}

View File

@ -0,0 +1,38 @@
package tech.powerjob.offical.processors.impl;
import com.alibaba.fastjson.JSONObject;
import org.junit.jupiter.api.Test;
import tech.powerjob.offical.processors.TestUtils;
import static org.junit.jupiter.api.Assertions.*;
/**
* HttpProcessorTest
*
* @author tjq
* @since 2021/1/31
*/
class HttpProcessorTest {
@Test
void testGet() throws Exception {
String url = "https://www.baidu.com";
JSONObject params = new JSONObject();
params.put("url", url);
params.put("method", "GET");
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
}
@Test
void testPost() throws Exception {
String url = "https://mock.uutool.cn/4f5qfgcdahj0?test=true";
JSONObject params = new JSONObject();
params.put("url", url);
params.put("method", "POST");
params.put("mediaType", "application/json");
params.put("body", params.toJSONString());
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
}
}