mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
fix: timeout params not take effect for HttpProcessor #200
This commit is contained in:
parent
3cc195ee33
commit
4a4ef9ba13
@ -72,6 +72,14 @@
|
|||||||
<version>${junit.version}</version>
|
<version>${junit.version}</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!-- log for test stage -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
<version>${logback.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
|
|||||||
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
|
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
|
||||||
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
|
import com.github.kfcfans.powerjob.worker.log.OmsLogger;
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||||
import tech.powerjob.official.processors.util.CommonUtils;
|
import tech.powerjob.official.processors.util.CommonUtils;
|
||||||
|
|
||||||
@ -14,23 +15,30 @@ import tech.powerjob.official.processors.util.CommonUtils;
|
|||||||
* @author tjq
|
* @author tjq
|
||||||
* @since 2021/1/30
|
* @since 2021/1/30
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
public abstract class CommonBasicProcessor implements BasicProcessor {
|
public abstract class CommonBasicProcessor implements BasicProcessor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ProcessResult process(TaskContext taskContext) throws Exception {
|
public ProcessResult process(TaskContext taskContext) throws Exception {
|
||||||
|
|
||||||
|
String status = "unknown";
|
||||||
|
Stopwatch sw = Stopwatch.createStarted();
|
||||||
|
|
||||||
String clzName = this.getClass().getSimpleName();
|
String clzName = this.getClass().getSimpleName();
|
||||||
OmsLogger omsLogger = taskContext.getOmsLogger();
|
OmsLogger omsLogger = taskContext.getOmsLogger();
|
||||||
omsLogger.info("[{}] using params: {}", clzName, CommonUtils.parseParams(taskContext));
|
omsLogger.info("[{}] using params: {}", clzName, CommonUtils.parseParams(taskContext));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Stopwatch sw = Stopwatch.createStarted();
|
|
||||||
ProcessResult result = process0(taskContext);
|
ProcessResult result = process0(taskContext);
|
||||||
omsLogger.info("[{}] execute succeed, using {}, result: {}", clzName, sw, result);
|
omsLogger.info("[{}] execute succeed, using {}, result: {}", clzName, sw, result);
|
||||||
|
status = result.isSuccess() ? "succeed" : "failed";
|
||||||
return result;
|
return result;
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
status = "exception";
|
||||||
omsLogger.error("[{}] execute failed!", clzName, t);
|
omsLogger.error("[{}] execute failed!", clzName, t);
|
||||||
return new ProcessResult(false, ExceptionUtils.getMessage(t));
|
return new ProcessResult(false, ExceptionUtils.getMessage(t));
|
||||||
|
} finally {
|
||||||
|
log.info("[{}] status: {}, cost: {}", clzName, status, sw);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
import tech.powerjob.official.processors.CommonBasicProcessor;
|
import tech.powerjob.official.processors.CommonBasicProcessor;
|
||||||
import tech.powerjob.official.processors.util.CommonUtils;
|
import tech.powerjob.official.processors.util.CommonUtils;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
@ -130,6 +131,9 @@ public class HttpProcessor extends CommonBasicProcessor {
|
|||||||
|
|
||||||
private static OkHttpClient getClient(Integer timeout) {
|
private static OkHttpClient getClient(Integer timeout) {
|
||||||
return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder()
|
return CLIENT_STORE.computeIfAbsent(timeout, ignore -> new OkHttpClient.Builder()
|
||||||
|
.connectTimeout(Duration.ZERO)
|
||||||
|
.readTimeout(Duration.ZERO)
|
||||||
|
.writeTimeout(Duration.ZERO)
|
||||||
.callTimeout(timeout, TimeUnit.SECONDS)
|
.callTimeout(timeout, TimeUnit.SECONDS)
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||||||
public class CommonUtils {
|
public class CommonUtils {
|
||||||
|
|
||||||
public static String parseParams(TaskContext context) {
|
public static String parseParams(TaskContext context) {
|
||||||
if (StringUtils.isEmpty(context.getInstanceParams())) {
|
if (StringUtils.isNotEmpty(context.getInstanceParams())) {
|
||||||
return context.getInstanceParams();
|
return context.getInstanceParams();
|
||||||
}
|
}
|
||||||
return context.getJobParams();
|
return context.getJobParams();
|
||||||
|
@ -0,0 +1,43 @@
|
|||||||
|
package tech.powerjob.official.processors.impl;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSONArray;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.github.kfcfans.powerjob.worker.core.processor.TaskContext;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import tech.powerjob.official.processors.TestUtils;
|
||||||
|
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* description
|
||||||
|
*
|
||||||
|
* @author tjq
|
||||||
|
* @since 2021/2/1
|
||||||
|
*/
|
||||||
|
class FileCleanupProcessorTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testProcess() throws Exception {
|
||||||
|
JSONObject params = new JSONObject();
|
||||||
|
params.put("dirPath", "/Users/tjq/logs/oms-server");
|
||||||
|
params.put("filePattern", "*");
|
||||||
|
params.put("retentionTime", 10);
|
||||||
|
JSONArray array = new JSONArray();
|
||||||
|
array.add(params);
|
||||||
|
|
||||||
|
String paramsStr = array.toJSONString();
|
||||||
|
System.out.println(paramsStr);
|
||||||
|
|
||||||
|
TaskContext taskContext = TestUtils.genTaskContext(paramsStr);
|
||||||
|
System.out.println(new FileCleanupProcessor().process(taskContext));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testPatternCompile() throws Exception {
|
||||||
|
String fileName = "abc.log";
|
||||||
|
System.out.println(fileName.matches("[a-z]*\\.log"));
|
||||||
|
System.out.println(Pattern.matches("\\*", fileName));
|
||||||
|
}
|
||||||
|
}
|
@ -33,4 +33,14 @@ class HttpProcessorTest {
|
|||||||
|
|
||||||
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testTimeout() throws Exception {
|
||||||
|
String url = "http://localhost:7700/tmp/sleep";
|
||||||
|
JSONObject params = new JSONObject();
|
||||||
|
params.put("url", url);
|
||||||
|
params.put("method", "GET");
|
||||||
|
params.put("timeout", 20);
|
||||||
|
System.out.println(new HttpProcessor().process(TestUtils.genTaskContext(params.toJSONString())));
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user