Merge branch '4.2.0-main-upgrade-spring' into v4.2.0

This commit is contained in:
tjq 2022-10-03 13:57:24 +08:00
commit 050190ba89
46 changed files with 15793 additions and 17125 deletions

View File

@ -1,32 +1,31 @@
#!/bin/bash
cd `dirname $0`/../.. || exit
echo "================== 构建 jar =================="
mvn clean package -Pdev -DskipTests -U -e
mvn clean package -Pdev -DskipTests -e
echo "================== 拷贝 jar =================="
/bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar
/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar
echo "================== 关闭老应用 =================="
docker stop powerjob-server
docker stop powerjob-agent
docker stop powerjob-agent2
docker stop powerjob-worker-samples
docker stop powerjob-worker-samples2
echo "================== 删除老容器 =================="
docker container rm powerjob-server
docker container rm powerjob-agent
docker container rm powerjob-agent2
docker container rm powerjob-worker-samples
docker container rm powerjob-worker-samples2
echo "================== 删除旧镜像 =================="
docker rmi -f tjqq/powerjob-server:latest
docker rmi -f tjqq/powerjob-agent:latest
docker rmi -f tjqq/powerjob-worker-samples:latest
echo "================== 构建 powerjob-server 镜像 =================="
docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit
echo "================== 构建 powerjob-agent 镜像 =================="
docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit
echo "================== 构建 powerjob-worker-samples 镜像 =================="
docker build -t tjqq/powerjob-worker-samples:latest powerjob-worker-samples/. || exit
echo "================== 准备启动 powerjob-server =================="
docker run -d \
--restart=always \
--name powerjob-server \
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--oms.swagger.enable=true --spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \
-e PARAMS="--oms.swagger.enable=true --spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --oms.mongodb.enable=false --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \
-v ~/docker/powerjob-server:/root/powerjob/server -v ~/.m2:/root/.m2 \
tjqq/powerjob-server:latest
sleep 60
@ -37,19 +36,19 @@ echo "使用的Server地址$serverAddress"
docker run -d \
--restart=always \
--name powerjob-agent \
--name powerjob-worker-samples \
-p 27777:27777 -p 5002:5005 -p 10002:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent:/root \
tjqq/powerjob-agent:latest
-e PARAMS="--powerjob.worker.server-address=$serverAddress" \
-v ~/docker/powerjob-worker-samples:/root \
tjqq/powerjob-worker-samples:latest
docker run -d \
--restart=always \
--name powerjob-agent2 \
--name powerjob-worker-samples2 \
-p 27778:27777 -p 5003:5005 -p 10003:10000 \
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
-v ~/docker/powerjob-agent2:/root \
tjqq/powerjob-agent:latest
-e PARAMS="--powerjob.worker.server-address=$serverAddress" \
-v ~/docker/powerjob-worker-samples2:/root \
tjqq/powerjob-worker-samples:latest

View File

@ -18,7 +18,8 @@ public enum LogLevel {
DEBUG(1),
INFO(2),
WARN(3),
ERROR(4);
ERROR(4),
OFF(99);
private final int v;

View File

@ -0,0 +1,47 @@
package tech.powerjob.common.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
/**
* 任务日志配置
*
* @author yhz
* @since 2022/9/16
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class LogConfig {
/**
* log type {@link LogType}
*/
private Integer type;
/**
* log level {@link tech.powerjob.common.enums.LogLevel}
*/
private Integer level;
private String loggerName;
@Getter
@AllArgsConstructor
public enum LogType {
ONLINE(1),
LOCAL(2);
private final Integer v;
public LogType of(Integer type) {
for (LogType logType : values()) {
if (logType.v.equals(type)) {
return logType;
}
}
return ONLINE;
}
}
}

View File

@ -94,6 +94,10 @@ public class ServerScheduleJobReq implements PowerSerializable {
*/
private String alarmConfig;
/**
* 日志配置
*/
private String logConfig;
@Override
public String path() {

View File

@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
import lombok.Data;
@ -139,6 +140,16 @@ public class SaveJobInfoRequest {
*/
private AlarmConfig alarmConfig;
/**
* 任务归类开放给接入方自由定制
*/
private String tag;
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private LogConfig logConfig;
/**
* Check non-null properties.

View File

@ -50,4 +50,6 @@ public class JobInfoQuery extends PowerQuery {
private Date gmtModifiedGt;
private Integer dispatchStrategyEq;
private String tagEq;
}

View File

@ -2,6 +2,7 @@ package tech.powerjob.common.response;
import lombok.Data;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LogConfig;
import java.util.Date;
@ -125,4 +126,15 @@ public class JobInfoDTO {
private String lifecycle;
private AlarmConfig alarmConfig;
/**
* 任务归类开放给接入方自由定制
*/
private String tag;
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private LogConfig logConfig;
}

View File

@ -0,0 +1,84 @@
package tech.powerjob.official.processors.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
/**
* 配置处理器
* 超简易的配置中心用于配置的下发需要配合秒级 + 广播任务使用
* 超低成本下的解决方案强配置 or 高SLA 场景请使用标准的配置管理中间件
* 外部调用方法 {@link ConfigProcessor#fetchConfig()}
*
* @author tjq
* @since 2022/9/17
*/
@Slf4j
public class ConfigProcessor implements BroadcastProcessor {
/**
* 获取配置
* @return 控制台下发的配置
*/
public static Map<String, Object> fetchConfig() {
if (config == null) {
return Maps.newHashMap();
}
return Optional.ofNullable(config.getConfig()).orElse(Maps.newHashMap());
}
private static Config config;
@Override
public ProcessResult process(TaskContext context) throws Exception {
Config newCfg = JsonUtils.parseObject(CommonUtils.parseParams(context), Config.class);
context.getOmsLogger().info("[ConfigProcessor] receive and update config: {}", config);
// 空场景不更新
final Map<String, Object> realConfig = newCfg.config;
if (realConfig == null) {
return new ProcessResult(false, "CONFIG_IS_NULL");
}
config = newCfg;
if (StringUtils.isNotEmpty(config.persistentFileName)) {
final File file = new File(config.persistentFileName);
String content = JSONObject.toJSONString(realConfig);
FileUtils.copyToFile(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), file);
}
return new ProcessResult(true, "UPDATE_SUCCESS");
}
@Data
public static class Config implements Serializable {
/**
* 原始配置
*/
private Map<String, Object> config;
/**
* 持久到本地的全路径名称
*/
private String persistentFileName;
}
}

View File

@ -13,7 +13,9 @@ import tech.powerjob.official.processors.util.CommonUtils;
import java.io.*;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
@ -30,6 +32,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
protected static final String SH_SHELL = "/bin/sh";
protected static final String CMD_SHELL = "cmd.exe";
private static final String WORKER_DIR = System.getProperty("user.home") + "/powerjob/worker/official_script_processor/";
@ -55,13 +58,16 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
}
// 授权
if ( !SystemUtils.IS_OS_WINDOWS) {
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
// 等待返回这里不可能导致死锁shell产生大量数据可能导致死锁
chmodPb.start().waitFor();
omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
}
// 2. 执行目标脚本
ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath);
ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
new ProcessBuilder(getRunCommand(), "/c", scriptPath)
: new ProcessBuilder(getRunCommand(), scriptPath);
Process process = pb.start();
StringBuilder inputBuilder = new StringBuilder();
@ -70,10 +76,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
boolean success = true;
String result;
final Charset charset = getCharset();
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger));
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger));
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset));
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset));
success = process.waitFor() == 0;
@ -106,17 +113,27 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
}
}
// 非下载链接 processInfo 生成可执行文件
final Charset charset = getCharset();
if(charset != null)
{
try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
out.write(processorInfo);
out.flush();
}
}
else {
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
bw.write(processorInfo);
bw.flush();
}
}
return scriptPath;
}
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) {
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
String line;
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
while ((line = br.readLine()) != null) {
sb.append(line);
// 同步到在线日志
@ -142,4 +159,12 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
* @return 执行脚本的命令
*/
protected abstract String getRunCommand();
/**
* 默认不指定
* @return Charset
*/
protected Charset getCharset() {
return StandardCharsets.UTF_8;
}
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.official.processors.impl.script;
import java.nio.charset.Charset;
/**
* python processor
*
* @author fddc
* @since 2021/5/14
*/
public class CMDProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("cmd_%d.bat", instanceId);
}
@Override
protected String getRunCommand() {
return "cmd.exe";
}
@Override
protected Charset getCharset() {
return Charset.defaultCharset();
}
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.official.processors.impl.script;
import java.nio.charset.Charset;
/**
* python processor
*
* @author fddc
* @since 2021/5/14
*/
public class PowerShellProcessor extends AbstractScriptProcessor {
@Override
protected String getScriptName(Long instanceId) {
return String.format("powershell_%d.ps1", instanceId);
}
@Override
protected String getRunCommand() {
return "powershell.exe";
}
@Override
protected Charset getCharset() {
return Charset.defaultCharset();
}
}

View File

@ -1,5 +1,6 @@
package tech.powerjob.official.processors;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.log.impl.OmsLocalLogger;
@ -24,7 +25,7 @@ public class TestUtils {
taskContext.setJobParams(jobParams);
taskContext.setTaskId("0.0");
taskContext.setTaskName("TEST_TASK");
taskContext.setOmsLogger(new OmsLocalLogger());
taskContext.setOmsLogger(new OmsLocalLogger(new LogConfig()));
taskContext.setWorkflowContext(new WorkflowContext(null, null));
return taskContext;
}

View File

@ -27,7 +27,7 @@
<properties>
<swagger.version>2.9.2</swagger.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<springboot.version>2.7.4</springboot.version>
<powerjob.common.version>4.1.1</powerjob.common.version>
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
<mysql.version>8.0.28</mysql.version>

View File

@ -1,6 +1,7 @@
package tech.powerjob.server.core.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.data.jpa.domain.Specification;
@ -107,6 +108,10 @@ public class JobService {
}
jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
}
// 日志配置
if (request.getLogConfig() != null) {
jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
}
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
return res.getId();
}

View File

@ -145,4 +145,13 @@ public class JobInfoDO {
*/
private String alarmConfig;
/**
* 任务归类开放给接入方自由定制
*/
private String tag;
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private String logConfig;
}

View File

@ -54,8 +54,6 @@ public class InstanceController {
@Resource
private CacheService cacheService;
@Resource
private AppInfoRepository appInfoRepository;
@Resource
private InstanceInfoRepository instanceInfoRepository;
@GetMapping("/stop")

View File

@ -1,10 +1,12 @@
package tech.powerjob.server.web.response;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.model.LifeCycle;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.server.common.SJ;
@ -144,6 +146,16 @@ public class JobInfoVO {
private AlarmConfig alarmConfig;
/**
* 任务归类开放给接入方自由定制
*/
private String tag;
/**
* 日志配置包括日志级别日志方式等配置信息
*/
private LogConfig logConfig;
public static JobInfoVO from(JobInfoDO jobInfoDO) {
JobInfoVO jobInfoVO = new JobInfoVO();
BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
@ -173,6 +185,10 @@ public class JobInfoVO {
jobInfoVO.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle()));
}
if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) {
jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), LogConfig.class));
}
return jobInfoVO;
}
}

View File

@ -6,7 +6,7 @@
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="/favicon.ico">
<title>PowerJob</title>
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/10.js" rel="prefetch"><link href="/js/11.js" rel="prefetch"><link href="/js/12.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/4.js" rel="prefetch"><link href="/js/5.js" rel="prefetch"><link href="/js/6.js" rel="prefetch"><link href="/js/7.js" rel="prefetch"><link href="/js/8.js" rel="prefetch"><link href="/js/9.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/10.js" rel="prefetch"><link href="/js/11.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/4.js" rel="prefetch"><link href="/js/5.js" rel="prefetch"><link href="/js/6.js" rel="prefetch"><link href="/js/7.js" rel="prefetch"><link href="/js/8.js" rel="prefetch"><link href="/js/9.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
<body>
<noscript>
<strong>We're sorry but oms-console doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -36,6 +36,9 @@ public class MainApplication implements Runnable {
@Option(names = {"-l", "--length"}, description = "ProcessResult#msg max length")
private int length = 1024;
@Option(names = {"-t", "--tag"}, description = "worker-agent's tag")
private String tag;
public static void main(String[] args) {
CommandLine commandLine = new CommandLine(new MainApplication());
commandLine.execute(args);
@ -52,6 +55,7 @@ public class MainApplication implements Runnable {
cfg.setServerAddress(Splitter.on(",").splitToList(server));
cfg.setStoreStrategy(StoreStrategy.MEMORY.name().equals(storeStrategy) ? StoreStrategy.MEMORY : StoreStrategy.DISK);
cfg.setMaxResultLength(length);
cfg.setTag(tag);
PowerJobWorker worker = new PowerJobWorker();
worker.setConfig(cfg);

View File

@ -0,0 +1,41 @@
package tech.powerjob.samples.processors.test;
import com.alibaba.fastjson.JSONObject;
import org.springframework.stereotype.Component;
import tech.powerjob.official.processors.util.CommonUtils;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.Date;
import java.util.Optional;
/**
* LogTestProcessor
*
* @author tjq
* @since 2022/9/18
*/
@Component
public class LogTestProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
final String parseParams = CommonUtils.parseParams(context);
final JSONObject config = Optional.ofNullable(JSONObject.parseObject(parseParams)).orElse(new JSONObject());
final long loopTimes = Optional.ofNullable(config.getLong("loopTimes")).orElse(1000L);
for (int i = 0; i < loopTimes; i++) {
omsLogger.debug("[DEBUG] one DEBUG log in {}", new Date());
omsLogger.info("[INFO] one INFO log in {}", new Date());
omsLogger.warn("[WARN] one WARN log in {}", new Date());
omsLogger.error("[ERROR] one ERROR log in {}", new Date());
}
return new ProcessResult(true);
}
}

View File

@ -15,7 +15,7 @@
<properties>
<powerjob.worker.version>4.1.1</powerjob.worker.version>
<springboot.version>2.3.4.RELEASE</springboot.version>
<springboot.version>2.7.4</springboot.version>
</properties>
<dependencies>

View File

@ -14,7 +14,7 @@
<packaging>jar</packaging>
<properties>
<spring.version>5.2.4.RELEASE</spring.version>
<spring.version>5.3.23</spring.version>
<powerjob.common.version>4.1.1</powerjob.common.version>
<h2.db.version>1.4.200</h2.db.version>
<hikaricp.version>3.4.2</hikaricp.version>

View File

@ -35,7 +35,7 @@ public class OmsLogHandler {
// 上报锁只需要一个线程上报即可
private final Lock reportLock = new ReentrantLock();
// 生产者消费者模式异步上传日志
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
// 每次上报携带的数据条数
private static final int BATCH_SIZE = 20;
@ -61,7 +61,10 @@ public class OmsLogHandler {
}
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
logQueue.offer(tuple);
boolean offerRet = logQueue.offer(tuple);
if (!offerRet) {
log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
}
}

View File

@ -87,7 +87,7 @@ public class ServerDiscoveryService {
}
if (StringUtils.isEmpty(result)) {
log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
log.warn("[PowerDiscovery] can't find any available server, this worker[appId={}] has been quarantined.", appId);
// Server 高可用的前提下连续失败多次说明该节点与外界失联Server已经将秒级任务转移到其他Worker需要杀死本地的任务
if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
@ -108,7 +108,7 @@ public class ServerDiscoveryService {
} else {
// 重置失败次数
FAILED_COUNT = 0;
log.debug("[PowerDiscovery] current server is {}.", result);
log.debug("[PowerDiscovery] appId={}, current server is {}.", appId, result);
return result;
}
}

View File

@ -11,6 +11,8 @@ import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.enums.ExecuteType;
import tech.powerjob.common.enums.ProcessorType;
import tech.powerjob.common.enums.TimeExpressionType;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.common.constants.TaskStatus;
@ -22,6 +24,7 @@ import tech.powerjob.worker.core.ProcessorBeanFactory;
import tech.powerjob.worker.core.executor.ProcessorRunnable;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
import tech.powerjob.worker.log.OmsLoggerFactory;
import tech.powerjob.worker.log.impl.OmsServerLogger;
import tech.powerjob.worker.persistence.TaskDO;
import tech.powerjob.worker.pojo.model.InstanceInfo;
@ -116,7 +119,7 @@ public class ProcessorTracker {
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler());
this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
this.lastIdleTime = -1L;
this.lastCompletedTaskCount = 0L;

View File

@ -0,0 +1,34 @@
package tech.powerjob.worker.log;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.worker.common.WorkerRuntime;
import tech.powerjob.worker.log.impl.OmsLocalLogger;
import tech.powerjob.worker.log.impl.OmsServerLogger;
/**
* OmsLoggerFactory
*
* @author tjq
* @since 2022/9/17
*/
public class OmsLoggerFactory {
public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
LogConfig cfg;
if (logConfig == null) {
cfg = new LogConfig();
} else {
try {
cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
} catch (Exception ignore) {
cfg = new LogConfig();
}
}
if (LogConfig.LogType.LOCAL.getV().equals(cfg.getType())) {
return new OmsLocalLogger(cfg);
}
return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
}
}

View File

@ -0,0 +1,68 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.log.OmsLogger;
/**
* AbstractOmsLogger
*
* @author tjq
* @since 2022/9/16
*/
public abstract class AbstractOmsLogger implements OmsLogger {
private final LogConfig logConfig;
public AbstractOmsLogger(LogConfig logConfig) {
this.logConfig = logConfig;
// 兼容空数据场景添加默认值尽量与原有逻辑保持兼容
if (logConfig.getLevel() == null) {
logConfig.setLevel(LogLevel.INFO.getV());
}
if (logConfig.getType() == null) {
logConfig.setType(LogConfig.LogType.ONLINE.getV());
}
}
abstract void debug0(String messagePattern, Object... args);
abstract void info0(String messagePattern, Object... args);
abstract void warn0(String messagePattern, Object... args);
abstract void error0(String messagePattern, Object... args);
@Override
public void debug(String messagePattern, Object... args) {
if (LogLevel.DEBUG.getV() < logConfig.getLevel()) {
return;
}
debug0(messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
if (LogLevel.INFO.getV() < logConfig.getLevel()) {
return;
}
info0(messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
if (LogLevel.WARN.getV() < logConfig.getLevel()) {
return;
}
warn0(messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
if (LogLevel.ERROR.getV() < logConfig.getLevel()) {
return;
}
error0(messagePattern, args);
}
}

View File

@ -1,33 +1,46 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.worker.log.OmsLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.powerjob.common.model.LogConfig;
/**
* for local test
* More user feedback when the task volume server timeout serious. After pressure testing, we found that there is no bottleneck in the server processing scheduling tasks, and it is assumed that the large amount of logs is causing a serious bottleneck. Therefore, we need to provide local logging API for large MR tasks.
*
* @author tjq
* @since 2021/2/4
*/
@Slf4j
public class OmsLocalLogger implements OmsLogger {
@Override
public void debug(String messagePattern, Object... args) {
log.debug(messagePattern, args);
public class OmsLocalLogger extends AbstractOmsLogger {
private final Logger LOGGER;
private static final String DEFAULT_LOGGER_NAME = OmsLocalLogger.class.getName();
public OmsLocalLogger(LogConfig logConfig) {
super(logConfig);
String loggerName = StringUtils.isEmpty(logConfig.getLoggerName()) ? DEFAULT_LOGGER_NAME : logConfig.getLoggerName();
LOGGER = LoggerFactory.getLogger(loggerName);
}
@Override
public void info(String messagePattern, Object... args) {
log.info(messagePattern, args);
public void debug0(String messagePattern, Object... args) {
LOGGER.debug(messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
log.warn(messagePattern, args);
public void info0(String messagePattern, Object... args) {
LOGGER.info(messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
log.error(messagePattern, args);
public void warn0(String messagePattern, Object... args) {
LOGGER.warn(messagePattern, args);
}
@Override
public void error0(String messagePattern, Object... args) {
LOGGER.error(messagePattern, args);
}
}

View File

@ -1,43 +1,49 @@
package tech.powerjob.worker.log.impl;
import tech.powerjob.common.enums.LogLevel;
import tech.powerjob.common.model.LogConfig;
import tech.powerjob.worker.background.OmsLogHandler;
import tech.powerjob.worker.log.OmsLogger;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
/**
* PowerJob 在线日志直接上报到 Server可在控制台直接查看
* WARNPlease do not use this logger to print large amounts of logs!
* WARNPlease do not use this logger to print large amounts of logs!
* WARNPlease do not use this logger to print large amounts of logs!
*
* @author tjq
* @since 2020/4/21
* @since 2022/9/16
*/
@AllArgsConstructor
public class OmsServerLogger implements OmsLogger {
public class OmsServerLogger extends AbstractOmsLogger {
private final long instanceId;
private final OmsLogHandler omsLogHandler;
public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
super(logConfig);
this.instanceId = instanceId;
this.omsLogHandler = omsLogHandler;
}
@Override
public void debug(String messagePattern, Object... args) {
public void debug0(String messagePattern, Object... args) {
process(LogLevel.DEBUG, messagePattern, args);
}
@Override
public void info(String messagePattern, Object... args) {
public void info0(String messagePattern, Object... args) {
process(LogLevel.INFO, messagePattern, args);
}
@Override
public void warn(String messagePattern, Object... args) {
public void warn0(String messagePattern, Object... args) {
process(LogLevel.WARN, messagePattern, args);
}
@Override
public void error(String messagePattern, Object... args) {
public void error0(String messagePattern, Object... args) {
process(LogLevel.ERROR, messagePattern, args);
}

View File

@ -51,4 +51,6 @@ public class InstanceInfo implements Serializable {
private int threadConcurrency;
// 子任务重试次数任务本身的重试机制由server控制
private int taskRetryNum;
private String logConfig;
}

View File

@ -32,6 +32,8 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
// 秒级任务专用
private long subInstanceId;
private String logConfig;
/**
* 创建 TaskTrackerStartTaskReq该构造方法必须在 TaskTracker 节点调用
@ -47,5 +49,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
this.taskCurrentRetryNums = task.getFailedCnt();
this.subInstanceId = task.getSubInstanceId();
this.logConfig = instanceInfo.getLogConfig();
}
}