mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
Merge branch 'v4.2.0'
This commit is contained in:
commit
dc98f5f37a
@ -14,7 +14,7 @@ services:
|
||||
container_name: powerjob-mysql
|
||||
image: powerjob/powerjob-mysql:4.1.1
|
||||
ports:
|
||||
- "3306:3306"
|
||||
- "3307:3306"
|
||||
volumes:
|
||||
- ./powerjob-data/powerjob-mysql:/var/lib/mysql
|
||||
command: --lower_case_table_names=1
|
||||
|
@ -110,6 +110,8 @@ CREATE TABLE `job_info`
|
||||
`task_retry_num` int not NULL default 0 COMMENT 'Task重试次数',
|
||||
`time_expression` varchar(255) default NULL COMMENT '时间表达式,内容取决于time_expression_type,1:CRON/2:NULL/3:LONG/4:LONG',
|
||||
`time_expression_type` int not NULL COMMENT '时间表达式类型,1:CRON/2:API/3:FIX_RATE/4:FIX_DELAY,5:WORKFLOW\n)',
|
||||
`tag` varchar(255) DEFAULT NULL COMMENT 'TAG',
|
||||
`log_config` varchar(255) DEFAULT NULL COMMENT '日志配置',
|
||||
`extra` varchar(255) DEFAULT NULL COMMENT '扩展字段',
|
||||
`gmt_create` datetime not NULL COMMENT '创建时间',
|
||||
`gmt_modified` datetime not NULL COMMENT '更新时间',
|
||||
|
@ -1,32 +1,31 @@
|
||||
#!/bin/bash
|
||||
cd `dirname $0`/../.. || exit
|
||||
echo "================== 构建 jar =================="
|
||||
mvn clean package -Pdev -DskipTests -U -e
|
||||
mvn clean package -Pdev -DskipTests -e
|
||||
echo "================== 拷贝 jar =================="
|
||||
/bin/cp -rf powerjob-server/powerjob-server-starter/target/*.jar powerjob-server/docker/powerjob-server.jar
|
||||
/bin/cp -rf powerjob-worker-agent/target/*.jar powerjob-worker-agent/powerjob-agent.jar
|
||||
echo "================== 关闭老应用 =================="
|
||||
docker stop powerjob-server
|
||||
docker stop powerjob-agent
|
||||
docker stop powerjob-agent2
|
||||
docker stop powerjob-worker-samples
|
||||
docker stop powerjob-worker-samples2
|
||||
echo "================== 删除老容器 =================="
|
||||
docker container rm powerjob-server
|
||||
docker container rm powerjob-agent
|
||||
docker container rm powerjob-agent2
|
||||
docker container rm powerjob-worker-samples
|
||||
docker container rm powerjob-worker-samples2
|
||||
echo "================== 删除旧镜像 =================="
|
||||
docker rmi -f tjqq/powerjob-server:latest
|
||||
docker rmi -f tjqq/powerjob-agent:latest
|
||||
docker rmi -f tjqq/powerjob-worker-samples:latest
|
||||
echo "================== 构建 powerjob-server 镜像 =================="
|
||||
docker build -t tjqq/powerjob-server:latest powerjob-server/docker/. || exit
|
||||
echo "================== 构建 powerjob-agent 镜像 =================="
|
||||
docker build -t tjqq/powerjob-agent:latest powerjob-worker-agent/. || exit
|
||||
echo "================== 构建 powerjob-worker-samples 镜像 =================="
|
||||
docker build -t tjqq/powerjob-worker-samples:latest powerjob-worker-samples/. || exit
|
||||
echo "================== 准备启动 powerjob-server =================="
|
||||
docker run -d \
|
||||
--restart=always \
|
||||
--name powerjob-server \
|
||||
-p 7700:7700 -p 10086:10086 -p 5001:5005 -p 10001:10000 \
|
||||
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
|
||||
-e PARAMS="--oms.swagger.enable=true --spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \
|
||||
-e PARAMS="--oms.swagger.enable=true --spring.profiles.active=product --spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-product?useUnicode=true&characterEncoding=UTF-8 --oms.mongodb.enable=false --spring.data.mongodb.uri=mongodb://remotehost:27017/powerjob-product" \
|
||||
-v ~/docker/powerjob-server:/root/powerjob/server -v ~/.m2:/root/.m2 \
|
||||
tjqq/powerjob-server:latest
|
||||
sleep 60
|
||||
@ -37,19 +36,19 @@ echo "使用的Server地址:$serverAddress"
|
||||
|
||||
docker run -d \
|
||||
--restart=always \
|
||||
--name powerjob-agent \
|
||||
--name powerjob-worker-samples \
|
||||
-p 27777:27777 -p 5002:5005 -p 10002:10000 \
|
||||
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
|
||||
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
|
||||
-v ~/docker/powerjob-agent:/root \
|
||||
tjqq/powerjob-agent:latest
|
||||
-e PARAMS="--powerjob.worker.server-address=$serverAddress" \
|
||||
-v ~/docker/powerjob-worker-samples:/root \
|
||||
tjqq/powerjob-worker-samples:latest
|
||||
|
||||
docker run -d \
|
||||
--restart=always \
|
||||
--name powerjob-agent2 \
|
||||
--name powerjob-worker-samples2 \
|
||||
-p 27778:27777 -p 5003:5005 -p 10003:10000 \
|
||||
-e JVMOPTIONS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=10000 -Dcom.sun.management.jmxremote.rmi.port=10000 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
|
||||
-e PARAMS="--app powerjob-agent-test --server $serverAddress" \
|
||||
-v ~/docker/powerjob-agent2:/root \
|
||||
tjqq/powerjob-agent:latest
|
||||
-e PARAMS="--powerjob.worker.server-address=$serverAddress" \
|
||||
-v ~/docker/powerjob-worker-samples2:/root \
|
||||
tjqq/powerjob-worker-samples:latest
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
-- Upgrade SQL FROM 4.0.x to 4.1.x
|
||||
-- ----------------------------
|
||||
-- Table change for workflow_instance_info
|
||||
-- ----------------------------
|
6
others/sql/upgrade/v4.1.x-v4.2.x.sql
Normal file
6
others/sql/upgrade/v4.1.x-v4.2.x.sql
Normal file
@ -0,0 +1,6 @@
|
||||
-- Upgrade SQL FROM 4.1.x to 4.2.x
|
||||
-- ----------------------------
|
||||
-- Table change for job_info
|
||||
-- ----------------------------
|
||||
alter table job_info add tag varchar(255) comment 'TAG' default null;
|
||||
alter table job_info add log_config varchar(255) comment 'logConfig' default null;
|
@ -10,13 +10,13 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-client</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<junit.version>5.6.1</junit.version>
|
||||
<fastjson.version>1.2.83</fastjson.version>
|
||||
<powerjob.common.version>4.1.1</powerjob.common.version>
|
||||
<powerjob.common.version>4.2.0</powerjob.common.version>
|
||||
|
||||
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
|
||||
</properties>
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-common</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
|
@ -18,7 +18,8 @@ public enum LogLevel {
|
||||
DEBUG(1),
|
||||
INFO(2),
|
||||
WARN(3),
|
||||
ERROR(4);
|
||||
ERROR(4),
|
||||
OFF(99);
|
||||
|
||||
private final int v;
|
||||
|
||||
|
@ -0,0 +1,35 @@
|
||||
package tech.powerjob.common.enums;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* LogType
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/10/3
|
||||
*/
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
public enum LogType {
|
||||
ONLINE(1),
|
||||
LOCAL(2),
|
||||
STDOUT(3),
|
||||
|
||||
NULL(999);
|
||||
private final Integer v;
|
||||
|
||||
public static LogType of(Integer type) {
|
||||
|
||||
if (type == null) {
|
||||
return ONLINE;
|
||||
}
|
||||
|
||||
for (LogType logType : values()) {
|
||||
if (logType.v.equals(type)) {
|
||||
return logType;
|
||||
}
|
||||
}
|
||||
return ONLINE;
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package tech.powerjob.common.model;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 任务日志配置
|
||||
*
|
||||
* @author yhz
|
||||
* @since 2022/9/16
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@Accessors(chain = true)
|
||||
public class LogConfig {
|
||||
/**
|
||||
* log type {@link tech.powerjob.common.enums.LogType}
|
||||
*/
|
||||
private Integer type;
|
||||
/**
|
||||
* log level {@link tech.powerjob.common.enums.LogLevel}
|
||||
*/
|
||||
private Integer level;
|
||||
|
||||
private String loggerName;
|
||||
|
||||
|
||||
}
|
@ -94,6 +94,10 @@ public class ServerScheduleJobReq implements PowerSerializable {
|
||||
*/
|
||||
private String alarmConfig;
|
||||
|
||||
/**
|
||||
* 日志配置
|
||||
*/
|
||||
private String logConfig;
|
||||
|
||||
@Override
|
||||
public String path() {
|
||||
|
@ -5,6 +5,7 @@ import tech.powerjob.common.enums.ExecuteType;
|
||||
import tech.powerjob.common.enums.ProcessorType;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.AlarmConfig;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.common.model.LifeCycle;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import lombok.Data;
|
||||
@ -139,6 +140,16 @@ public class SaveJobInfoRequest {
|
||||
*/
|
||||
private AlarmConfig alarmConfig;
|
||||
|
||||
/**
|
||||
* 任务归类,开放给接入方自由定制
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 日志配置,包括日志级别、日志方式等配置信息
|
||||
*/
|
||||
private LogConfig logConfig;
|
||||
|
||||
|
||||
/**
|
||||
* Check non-null properties.
|
||||
|
@ -50,4 +50,6 @@ public class JobInfoQuery extends PowerQuery {
|
||||
private Date gmtModifiedGt;
|
||||
|
||||
private Integer dispatchStrategyEq;
|
||||
|
||||
private String tagEq;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package tech.powerjob.common.response;
|
||||
|
||||
import lombok.Data;
|
||||
import tech.powerjob.common.model.AlarmConfig;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@ -125,4 +126,15 @@ public class JobInfoDTO {
|
||||
private String lifecycle;
|
||||
|
||||
private AlarmConfig alarmConfig;
|
||||
|
||||
/**
|
||||
* 任务归类,开放给接入方自由定制
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 日志配置,包括日志级别、日志方式等配置信息
|
||||
*/
|
||||
private LogConfig logConfig;
|
||||
|
||||
}
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-official-processors</artifactId>
|
||||
<version>1.2.1</version>
|
||||
<version>1.2.2</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
@ -20,7 +20,7 @@
|
||||
<!-- 不会被打包的部分,scope 只能是 test 或 provide -->
|
||||
<junit.version>5.6.1</junit.version>
|
||||
<logback.version>1.2.3</logback.version>
|
||||
<powerjob.worker.version>4.1.1</powerjob.worker.version>
|
||||
<powerjob.worker.version>4.2.0</powerjob.worker.version>
|
||||
<spring.jdbc.version>5.2.9.RELEASE</spring.jdbc.version>
|
||||
<h2.db.version>1.4.200</h2.db.version>
|
||||
<mysql.version>8.0.28</mysql.version>
|
||||
|
@ -0,0 +1,84 @@
|
||||
package tech.powerjob.official.processors.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.official.processors.util.CommonUtils;
|
||||
import tech.powerjob.worker.core.processor.ProcessResult;
|
||||
import tech.powerjob.worker.core.processor.TaskContext;
|
||||
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 配置处理器
|
||||
* 超简易的配置中心,用于配置的下发,需要配合秒级 + 广播任务使用!
|
||||
* 超低成本下的解决方案,强配置 or 高SLA 场景,请使用标准的配置管理中间件。
|
||||
* 外部调用方法 {@link ConfigProcessor#fetchConfig()}
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/17
|
||||
*/
|
||||
@Slf4j
|
||||
public class ConfigProcessor implements BroadcastProcessor {
|
||||
|
||||
/**
|
||||
* 获取配置
|
||||
* @return 控制台下发的配置
|
||||
*/
|
||||
public static Map<String, Object> fetchConfig() {
|
||||
if (config == null) {
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
return Optional.ofNullable(config.getConfig()).orElse(Maps.newHashMap());
|
||||
}
|
||||
|
||||
private static Config config;
|
||||
|
||||
@Override
|
||||
public ProcessResult process(TaskContext context) throws Exception {
|
||||
|
||||
Config newCfg = JsonUtils.parseObject(CommonUtils.parseParams(context), Config.class);
|
||||
context.getOmsLogger().info("[ConfigProcessor] receive and update config: {}", config);
|
||||
|
||||
// 空场景不更新
|
||||
final Map<String, Object> realConfig = newCfg.config;
|
||||
if (realConfig == null) {
|
||||
return new ProcessResult(false, "CONFIG_IS_NULL");
|
||||
}
|
||||
|
||||
config = newCfg;
|
||||
|
||||
if (StringUtils.isNotEmpty(config.persistentFileName)) {
|
||||
final File file = new File(config.persistentFileName);
|
||||
|
||||
String content = JSONObject.toJSONString(realConfig);
|
||||
FileUtils.copyToFile(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)), file);
|
||||
}
|
||||
|
||||
return new ProcessResult(true, "UPDATE_SUCCESS");
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Config implements Serializable {
|
||||
|
||||
/**
|
||||
* 原始配置
|
||||
*/
|
||||
private Map<String, Object> config;
|
||||
|
||||
/**
|
||||
* 持久到本地的全路径名称
|
||||
*/
|
||||
private String persistentFileName;
|
||||
}
|
||||
}
|
@ -13,7 +13,9 @@ import tech.powerjob.official.processors.util.CommonUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
|
||||
@ -30,6 +32,7 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||
private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors());
|
||||
private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp");
|
||||
protected static final String SH_SHELL = "/bin/sh";
|
||||
protected static final String CMD_SHELL = "cmd.exe";
|
||||
|
||||
private static final String WORKER_DIR = System.getProperty("user.home") + "/powerjob/worker/official_script_processor/";
|
||||
|
||||
@ -55,13 +58,16 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||
}
|
||||
|
||||
// 授权
|
||||
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
|
||||
// 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
|
||||
chmodPb.start().waitFor();
|
||||
omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
|
||||
|
||||
if ( !SystemUtils.IS_OS_WINDOWS) {
|
||||
ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
|
||||
// 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁)
|
||||
chmodPb.start().waitFor();
|
||||
omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
|
||||
}
|
||||
// 2. 执行目标脚本
|
||||
ProcessBuilder pb = new ProcessBuilder(getRunCommand(), scriptPath);
|
||||
ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ?
|
||||
new ProcessBuilder(getRunCommand(), "/c", scriptPath)
|
||||
: new ProcessBuilder(getRunCommand(), scriptPath);
|
||||
Process process = pb.start();
|
||||
|
||||
StringBuilder inputBuilder = new StringBuilder();
|
||||
@ -70,10 +76,11 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||
boolean success = true;
|
||||
String result;
|
||||
|
||||
final Charset charset = getCharset();
|
||||
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
|
||||
|
||||
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger));
|
||||
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger));
|
||||
POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset));
|
||||
POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset));
|
||||
|
||||
success = process.waitFor() == 0;
|
||||
|
||||
@ -106,17 +113,27 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
// 非下载链接,为 processInfo 生成可执行文件
|
||||
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
|
||||
bw.write(processorInfo);
|
||||
bw.flush();
|
||||
final Charset charset = getCharset();
|
||||
|
||||
if(charset != null)
|
||||
{
|
||||
try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
|
||||
out.write(processorInfo);
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
else {
|
||||
try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
|
||||
bw.write(processorInfo);
|
||||
bw.flush();
|
||||
}
|
||||
}
|
||||
return scriptPath;
|
||||
}
|
||||
|
||||
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) {
|
||||
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
|
||||
String line;
|
||||
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
|
||||
try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
|
||||
while ((line = br.readLine()) != null) {
|
||||
sb.append(line);
|
||||
// 同步到在线日志
|
||||
@ -142,4 +159,12 @@ public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
|
||||
* @return 执行脚本的命令
|
||||
*/
|
||||
protected abstract String getRunCommand();
|
||||
|
||||
/**
|
||||
* 默认不指定
|
||||
* @return Charset
|
||||
*/
|
||||
protected Charset getCharset() {
|
||||
return StandardCharsets.UTF_8;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,27 @@
|
||||
package tech.powerjob.official.processors.impl.script;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* CMDProcessor
|
||||
*
|
||||
* @author fddc
|
||||
* @since 2021/5/14
|
||||
*/
|
||||
public class CMDProcessor extends AbstractScriptProcessor {
|
||||
|
||||
@Override
|
||||
protected String getScriptName(Long instanceId) {
|
||||
return String.format("cmd_%d.bat", instanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRunCommand() {
|
||||
return "cmd.exe";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Charset getCharset() {
|
||||
return Charset.defaultCharset();
|
||||
}
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package tech.powerjob.official.processors.impl.script;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
|
||||
/**
|
||||
* PowerShellProcessor
|
||||
*
|
||||
* @author fddc
|
||||
* @since 2021/5/14
|
||||
*/
|
||||
public class PowerShellProcessor extends AbstractScriptProcessor {
|
||||
|
||||
@Override
|
||||
protected String getScriptName(Long instanceId) {
|
||||
return String.format("powershell_%d.ps1", instanceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getRunCommand() {
|
||||
return "powershell.exe";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Charset getCharset() {
|
||||
return Charset.defaultCharset();
|
||||
}
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package tech.powerjob.official.processors;
|
||||
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.worker.core.processor.TaskContext;
|
||||
import tech.powerjob.worker.core.processor.WorkflowContext;
|
||||
import tech.powerjob.worker.log.impl.OmsLocalLogger;
|
||||
@ -24,7 +25,7 @@ public class TestUtils {
|
||||
taskContext.setJobParams(jobParams);
|
||||
taskContext.setTaskId("0.0");
|
||||
taskContext.setTaskName("TEST_TASK");
|
||||
taskContext.setOmsLogger(new OmsLocalLogger());
|
||||
taskContext.setOmsLogger(new OmsLocalLogger(new LogConfig()));
|
||||
taskContext.setWorkflowContext(new WorkflowContext(null, null));
|
||||
return taskContext;
|
||||
}
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<modules>
|
||||
@ -27,8 +27,8 @@
|
||||
|
||||
<properties>
|
||||
<swagger.version>2.9.2</swagger.version>
|
||||
<springboot.version>2.3.4.RELEASE</springboot.version>
|
||||
<powerjob.common.version>4.1.1</powerjob.common.version>
|
||||
<springboot.version>2.7.4</springboot.version>
|
||||
<powerjob.common.version>4.2.0</powerjob.common.version>
|
||||
<!-- MySQL version that corresponds to spring-boot-dependencies version. -->
|
||||
<mysql.version>8.0.28</mysql.version>
|
||||
<ojdbc.version>19.7.0.0</ojdbc.version>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -1,6 +1,7 @@
|
||||
package tech.powerjob.server.core.service;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.jpa.domain.Specification;
|
||||
@ -107,6 +108,10 @@ public class JobService {
|
||||
}
|
||||
jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
|
||||
}
|
||||
// 日志配置
|
||||
if (request.getLogConfig() != null) {
|
||||
jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
|
||||
}
|
||||
JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
|
||||
return res.getId();
|
||||
}
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -145,4 +145,13 @@ public class JobInfoDO {
|
||||
*/
|
||||
private String alarmConfig;
|
||||
|
||||
/**
|
||||
* 任务归类,开放给接入方自由定制
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 日志配置,包括日志级别、日志方式等配置信息
|
||||
*/
|
||||
private String logConfig;
|
||||
}
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<groupId>tech.powerjob</groupId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -54,8 +54,6 @@ public class InstanceController {
|
||||
@Resource
|
||||
private CacheService cacheService;
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
@GetMapping("/stop")
|
||||
|
@ -1,10 +1,13 @@
|
||||
package tech.powerjob.server.web.response;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import tech.powerjob.common.enums.ExecuteType;
|
||||
import tech.powerjob.common.enums.ProcessorType;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.AlarmConfig;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.common.model.LifeCycle;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.server.common.SJ;
|
||||
@ -14,7 +17,6 @@ import tech.powerjob.server.persistence.remote.model.JobInfoDO;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.Data;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
@ -144,6 +146,16 @@ public class JobInfoVO {
|
||||
|
||||
private AlarmConfig alarmConfig;
|
||||
|
||||
/**
|
||||
* 任务归类,开放给接入方自由定制
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 日志配置,包括日志级别、日志方式等配置信息
|
||||
*/
|
||||
private LogConfig logConfig;
|
||||
|
||||
public static JobInfoVO from(JobInfoDO jobInfoDO) {
|
||||
JobInfoVO jobInfoVO = new JobInfoVO();
|
||||
BeanUtils.copyProperties(jobInfoDO, jobInfoVO);
|
||||
@ -168,11 +180,20 @@ public class JobInfoVO {
|
||||
|
||||
if (!StringUtils.isEmpty(jobInfoDO.getAlarmConfig())){
|
||||
jobInfoVO.setAlarmConfig(JSON.parseObject(jobInfoDO.getAlarmConfig(),AlarmConfig.class));
|
||||
} else {
|
||||
jobInfoVO.setAlarmConfig(new AlarmConfig());
|
||||
}
|
||||
if (!StringUtils.isEmpty(jobInfoDO.getLifecycle())){
|
||||
jobInfoVO.setLifeCycle(LifeCycle.parse(jobInfoDO.getLifecycle()));
|
||||
}
|
||||
|
||||
if (!StringUtils.isEmpty(jobInfoDO.getLogConfig())) {
|
||||
jobInfoVO.setLogConfig(JSONObject.parseObject(jobInfoDO.getLogConfig(), LogConfig.class));
|
||||
} else {
|
||||
// 不存在 job 配置时防止前端报错
|
||||
jobInfoVO.setLogConfig(new LogConfig());
|
||||
}
|
||||
|
||||
return jobInfoVO;
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,9 @@ spring.servlet.multipart.file-size-threshold=0
|
||||
spring.servlet.multipart.max-file-size=209715200
|
||||
spring.servlet.multipart.max-request-size=209715200
|
||||
|
||||
# temporary skip circular references check
|
||||
spring.main.allow-circular-references=true
|
||||
|
||||
###### PowerJob self-owned configuration (The following properties should exist in application.properties only). ######
|
||||
# Akka ActorSystem port.
|
||||
oms.akka.port=10086
|
||||
|
@ -12,3 +12,4 @@ ${AnsiColor.BRIGHT_RED}
|
||||
* OfficialWebsite: http://www.powerjob.tech/
|
||||
* SourceCode: https://github.com/PowerJob/PowerJob
|
||||
* PoweredBy: SpringBoot${spring-boot.formatted-version} & Akka (v2.6.12) & Vert.x (v4.0.2)
|
||||
${AnsiColor.DEFAULT}
|
@ -6,7 +6,7 @@
|
||||
<meta name="viewport" content="width=device-width,initial-scale=1.0">
|
||||
<link rel="icon" href="/favicon.ico">
|
||||
<title>PowerJob</title>
|
||||
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/10.js" rel="prefetch"><link href="/js/11.js" rel="prefetch"><link href="/js/12.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/4.js" rel="prefetch"><link href="/js/5.js" rel="prefetch"><link href="/js/6.js" rel="prefetch"><link href="/js/7.js" rel="prefetch"><link href="/js/8.js" rel="prefetch"><link href="/js/9.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
|
||||
<link href="/js/0.js" rel="prefetch"><link href="/js/1.js" rel="prefetch"><link href="/js/10.js" rel="prefetch"><link href="/js/11.js" rel="prefetch"><link href="/js/2.js" rel="prefetch"><link href="/js/3.js" rel="prefetch"><link href="/js/4.js" rel="prefetch"><link href="/js/5.js" rel="prefetch"><link href="/js/6.js" rel="prefetch"><link href="/js/7.js" rel="prefetch"><link href="/js/8.js" rel="prefetch"><link href="/js/9.js" rel="prefetch"><link href="/js/app.js" rel="preload" as="script"><link href="/js/chunk-vendors.js" rel="preload" as="script"></head>
|
||||
<body>
|
||||
<noscript>
|
||||
<strong>We're sorry but oms-console doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -10,18 +10,18 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker-agent</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<properties>
|
||||
<powerjob.worker.version>4.1.1</powerjob.worker.version>
|
||||
<powerjob.worker.version>4.2.0</powerjob.worker.version>
|
||||
<logback.version>1.2.9</logback.version>
|
||||
<picocli.version>4.3.2</picocli.version>
|
||||
|
||||
<spring.boot.version>2.3.4.RELEASE</spring.boot.version>
|
||||
|
||||
<powerjob.official.processors.version>1.2.1</powerjob.official.processors.version>
|
||||
<powerjob.official.processors.version>1.2.2</powerjob.official.processors.version>
|
||||
|
||||
<!-- dependency for dynamic sql processor -->
|
||||
<mysql.version>8.0.28</mysql.version>
|
||||
|
@ -36,6 +36,9 @@ public class MainApplication implements Runnable {
|
||||
@Option(names = {"-l", "--length"}, description = "ProcessResult#msg max length")
|
||||
private int length = 1024;
|
||||
|
||||
@Option(names = {"-t", "--tag"}, description = "worker-agent's tag")
|
||||
private String tag;
|
||||
|
||||
public static void main(String[] args) {
|
||||
CommandLine commandLine = new CommandLine(new MainApplication());
|
||||
commandLine.execute(args);
|
||||
@ -52,6 +55,7 @@ public class MainApplication implements Runnable {
|
||||
cfg.setServerAddress(Splitter.on(",").splitToList(server));
|
||||
cfg.setStoreStrategy(StoreStrategy.MEMORY.name().equals(storeStrategy) ? StoreStrategy.MEMORY : StoreStrategy.DISK);
|
||||
cfg.setMaxResultLength(length);
|
||||
cfg.setTag(tag);
|
||||
|
||||
PowerJobWorker worker = new PowerJobWorker();
|
||||
worker.setConfig(cfg);
|
||||
|
@ -10,11 +10,11 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>powerjob-worker-samples</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
|
||||
<properties>
|
||||
<springboot.version>2.3.4.RELEASE</springboot.version>
|
||||
<powerjob.worker.starter.version>4.1.1</powerjob.worker.starter.version>
|
||||
<springboot.version>2.7.4</springboot.version>
|
||||
<powerjob.worker.starter.version>4.2.0</powerjob.worker.starter.version>
|
||||
<fastjson.version>1.2.83</fastjson.version>
|
||||
<powerjob.official.processors.version>1.2.1</powerjob.official.processors.version>
|
||||
|
||||
|
@ -87,7 +87,7 @@ public class MapProcessorDemo implements MapProcessor {
|
||||
@Getter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
private static class SubTask {
|
||||
public static class SubTask {
|
||||
private Integer siteId;
|
||||
private List<Integer> itemIds;
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ public class MapReduceProcessorDemo implements MapReduceProcessor {
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
private static class TestSubTask {
|
||||
public static class TestSubTask {
|
||||
private String name;
|
||||
private int age;
|
||||
}
|
||||
|
@ -0,0 +1,41 @@
|
||||
package tech.powerjob.samples.processors.test;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.springframework.stereotype.Component;
|
||||
import tech.powerjob.official.processors.util.CommonUtils;
|
||||
import tech.powerjob.worker.core.processor.ProcessResult;
|
||||
import tech.powerjob.worker.core.processor.TaskContext;
|
||||
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* LogTestProcessor
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/18
|
||||
*/
|
||||
@Component
|
||||
public class LogTestProcessor implements BasicProcessor {
|
||||
|
||||
@Override
|
||||
public ProcessResult process(TaskContext context) throws Exception {
|
||||
|
||||
final OmsLogger omsLogger = context.getOmsLogger();
|
||||
final String parseParams = CommonUtils.parseParams(context);
|
||||
final JSONObject config = Optional.ofNullable(JSONObject.parseObject(parseParams)).orElse(new JSONObject());
|
||||
|
||||
final long loopTimes = Optional.ofNullable(config.getLong("loopTimes")).orElse(1000L);
|
||||
|
||||
for (int i = 0; i < loopTimes; i++) {
|
||||
omsLogger.debug("[DEBUG] one DEBUG log in {}", new Date());
|
||||
omsLogger.info("[INFO] one INFO log in {}", new Date());
|
||||
omsLogger.warn("[WARN] one WARN log in {}", new Date());
|
||||
omsLogger.error("[ERROR] one ERROR log in {}", new Date());
|
||||
}
|
||||
|
||||
return new ProcessResult(true);
|
||||
}
|
||||
}
|
@ -10,12 +10,12 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<powerjob.worker.version>4.1.1</powerjob.worker.version>
|
||||
<springboot.version>2.3.4.RELEASE</springboot.version>
|
||||
<powerjob.worker.version>4.2.0</powerjob.worker.version>
|
||||
<springboot.version>2.7.4</springboot.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
@ -10,12 +10,12 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker</artifactId>
|
||||
<version>4.1.1</version>
|
||||
<version>4.2.0</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<spring.version>5.2.4.RELEASE</spring.version>
|
||||
<powerjob.common.version>4.1.1</powerjob.common.version>
|
||||
<spring.version>5.3.23</spring.version>
|
||||
<powerjob.common.version>4.2.0</powerjob.common.version>
|
||||
<h2.db.version>1.4.200</h2.db.version>
|
||||
<hikaricp.version>3.4.2</hikaricp.version>
|
||||
<junit.version>5.6.1</junit.version>
|
||||
|
@ -35,7 +35,7 @@ public class OmsLogHandler {
|
||||
// 上报锁,只需要一个线程上报即可
|
||||
private final Lock reportLock = new ReentrantLock();
|
||||
// 生产者消费者模式,异步上传日志
|
||||
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue();
|
||||
private final BlockingQueue<InstanceLogContent> logQueue = Queues.newLinkedBlockingQueue(10240);
|
||||
|
||||
// 每次上报携带的数据条数
|
||||
private static final int BATCH_SIZE = 20;
|
||||
@ -61,7 +61,10 @@ public class OmsLogHandler {
|
||||
}
|
||||
|
||||
InstanceLogContent tuple = new InstanceLogContent(instanceId, System.currentTimeMillis(), logLevel.getV(), logContent);
|
||||
logQueue.offer(tuple);
|
||||
boolean offerRet = logQueue.offer(tuple);
|
||||
if (!offerRet) {
|
||||
log.warn("[OmsLogHandler] [{}] submit log failed, maybe your log speed is too fast!", instanceId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -87,7 +87,7 @@ public class ServerDiscoveryService {
|
||||
}
|
||||
|
||||
if (StringUtils.isEmpty(result)) {
|
||||
log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
|
||||
log.warn("[PowerDiscovery] can't find any available server, this worker[appId={}] has been quarantined.", appId);
|
||||
|
||||
// 在 Server 高可用的前提下,连续失败多次,说明该节点与外界失联,Server已经将秒级任务转移到其他Worker,需要杀死本地的任务
|
||||
if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
|
||||
@ -108,7 +108,7 @@ public class ServerDiscoveryService {
|
||||
} else {
|
||||
// 重置失败次数
|
||||
FAILED_COUNT = 0;
|
||||
log.debug("[PowerDiscovery] current server is {}.", result);
|
||||
log.debug("[PowerDiscovery] appId={}, current server is {}.", appId, result);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -11,6 +11,8 @@ import tech.powerjob.common.RemoteConstant;
|
||||
import tech.powerjob.common.enums.ExecuteType;
|
||||
import tech.powerjob.common.enums.ProcessorType;
|
||||
import tech.powerjob.common.enums.TimeExpressionType;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.common.utils.CommonUtils;
|
||||
import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.common.constants.TaskStatus;
|
||||
@ -22,6 +24,7 @@ import tech.powerjob.worker.core.ProcessorBeanFactory;
|
||||
import tech.powerjob.worker.core.executor.ProcessorRunnable;
|
||||
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
import tech.powerjob.worker.log.OmsLoggerFactory;
|
||||
import tech.powerjob.worker.log.impl.OmsServerLogger;
|
||||
import tech.powerjob.worker.persistence.TaskDO;
|
||||
import tech.powerjob.worker.pojo.model.InstanceInfo;
|
||||
@ -116,7 +119,7 @@ public class ProcessorTracker {
|
||||
String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
|
||||
this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
|
||||
|
||||
this.omsLogger = new OmsServerLogger(instanceId, workerRuntime.getOmsLogHandler());
|
||||
this.omsLogger = OmsLoggerFactory.build(instanceId, request.getLogConfig(), workerRuntime);
|
||||
this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
|
||||
this.lastIdleTime = -1L;
|
||||
this.lastCompletedTaskCount = 0L;
|
||||
|
@ -0,0 +1,44 @@
|
||||
package tech.powerjob.worker.log;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import tech.powerjob.common.enums.LogType;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.common.serialize.JsonUtils;
|
||||
import tech.powerjob.worker.common.WorkerRuntime;
|
||||
import tech.powerjob.worker.log.impl.OmsLocalLogger;
|
||||
import tech.powerjob.worker.log.impl.OmsNullLogger;
|
||||
import tech.powerjob.worker.log.impl.OmsServerLogger;
|
||||
import tech.powerjob.worker.log.impl.OmsStdOutLogger;
|
||||
|
||||
/**
|
||||
* OmsLoggerFactory
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/17
|
||||
*/
|
||||
public class OmsLoggerFactory {
|
||||
|
||||
public static OmsLogger build(Long instanceId, String logConfig, WorkerRuntime workerRuntime) {
|
||||
LogConfig cfg;
|
||||
if (StringUtils.isEmpty(logConfig)) {
|
||||
cfg = new LogConfig();
|
||||
} else {
|
||||
try {
|
||||
cfg = JsonUtils.parseObject(logConfig, LogConfig.class);
|
||||
} catch (Exception ignore) {
|
||||
cfg = new LogConfig();
|
||||
}
|
||||
}
|
||||
|
||||
switch (LogType.of(cfg.getType())) {
|
||||
case LOCAL:
|
||||
return new OmsLocalLogger(cfg);
|
||||
case STDOUT:
|
||||
return new OmsStdOutLogger(cfg);
|
||||
case NULL:
|
||||
return new OmsNullLogger();
|
||||
default:
|
||||
return new OmsServerLogger(cfg, instanceId, workerRuntime.getOmsLogHandler());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,89 @@
|
||||
package tech.powerjob.worker.log.impl;
|
||||
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.slf4j.helpers.FormattingTuple;
|
||||
import org.slf4j.helpers.MessageFormatter;
|
||||
import tech.powerjob.common.enums.LogLevel;
|
||||
import tech.powerjob.common.enums.LogType;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
|
||||
/**
|
||||
* AbstractOmsLogger
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/9/16
|
||||
*/
|
||||
public abstract class AbstractOmsLogger implements OmsLogger {
|
||||
|
||||
private final LogConfig logConfig;
|
||||
|
||||
public AbstractOmsLogger(LogConfig logConfig) {
|
||||
this.logConfig = logConfig;
|
||||
|
||||
// 兼容空数据场景,添加默认值,尽量与原有逻辑保持兼容
|
||||
if (logConfig.getLevel() == null) {
|
||||
logConfig.setLevel(LogLevel.INFO.getV());
|
||||
}
|
||||
if (logConfig.getType() == null) {
|
||||
logConfig.setType(LogType.ONLINE.getV());
|
||||
}
|
||||
}
|
||||
|
||||
abstract void debug0(String messagePattern, Object... args);
|
||||
|
||||
abstract void info0(String messagePattern, Object... args);
|
||||
|
||||
abstract void warn0(String messagePattern, Object... args);
|
||||
|
||||
abstract void error0(String messagePattern, Object... args);
|
||||
|
||||
@Override
|
||||
public void debug(String messagePattern, Object... args) {
|
||||
if (LogLevel.DEBUG.getV() < logConfig.getLevel()) {
|
||||
return;
|
||||
}
|
||||
debug0(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String messagePattern, Object... args) {
|
||||
if (LogLevel.INFO.getV() < logConfig.getLevel()) {
|
||||
return;
|
||||
}
|
||||
info0(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String messagePattern, Object... args) {
|
||||
if (LogLevel.WARN.getV() < logConfig.getLevel()) {
|
||||
return;
|
||||
}
|
||||
warn0(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String messagePattern, Object... args) {
|
||||
if (LogLevel.ERROR.getV() < logConfig.getLevel()) {
|
||||
return;
|
||||
}
|
||||
error0(messagePattern, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成日志内容
|
||||
* @param messagePattern 日志格式
|
||||
* @param arg 填充参数
|
||||
* @return 生成完毕的日志内容
|
||||
*/
|
||||
protected static String genLogContent(String messagePattern, Object... arg) {
|
||||
// 借用 Slf4J 直接生成日志信息
|
||||
FormattingTuple formattingTuple = MessageFormatter.arrayFormat(messagePattern, arg);
|
||||
if (formattingTuple.getThrowable() != null) {
|
||||
String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable());
|
||||
return formattingTuple.getMessage() + System.lineSeparator() + stackTrace;
|
||||
}else {
|
||||
return formattingTuple.getMessage();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,33 +1,46 @@
|
||||
package tech.powerjob.worker.log.impl;
|
||||
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
|
||||
/**
|
||||
* for local test
|
||||
* Many user feedback when the task volume server timeout serious. After pressure testing, we found that there is no bottleneck in the server processing scheduling tasks, and it is assumed that the large amount of logs is causing a serious bottleneck. Therefore, we need to provide local logging API for large MR tasks.
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2021/2/4
|
||||
*/
|
||||
@Slf4j
|
||||
public class OmsLocalLogger implements OmsLogger {
|
||||
@Override
|
||||
public void debug(String messagePattern, Object... args) {
|
||||
log.debug(messagePattern, args);
|
||||
public class OmsLocalLogger extends AbstractOmsLogger {
|
||||
|
||||
private final Logger LOGGER;
|
||||
|
||||
private static final String DEFAULT_LOGGER_NAME = OmsLocalLogger.class.getName();
|
||||
|
||||
public OmsLocalLogger(LogConfig logConfig) {
|
||||
super(logConfig);
|
||||
|
||||
String loggerName = StringUtils.isEmpty(logConfig.getLoggerName()) ? DEFAULT_LOGGER_NAME : logConfig.getLoggerName();
|
||||
LOGGER = LoggerFactory.getLogger(loggerName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String messagePattern, Object... args) {
|
||||
log.info(messagePattern, args);
|
||||
public void debug0(String messagePattern, Object... args) {
|
||||
LOGGER.debug(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String messagePattern, Object... args) {
|
||||
log.warn(messagePattern, args);
|
||||
public void info0(String messagePattern, Object... args) {
|
||||
LOGGER.info(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String messagePattern, Object... args) {
|
||||
log.error(messagePattern, args);
|
||||
public void warn0(String messagePattern, Object... args) {
|
||||
LOGGER.warn(messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error0(String messagePattern, Object... args) {
|
||||
LOGGER.error(messagePattern, args);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,28 @@
|
||||
package tech.powerjob.worker.log.impl;
|
||||
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
|
||||
/**
|
||||
* DO NOTHING
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/10/3
|
||||
*/
|
||||
public class OmsNullLogger implements OmsLogger {
|
||||
|
||||
@Override
|
||||
public void debug(String messagePattern, Object... args) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String messagePattern, Object... args) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String messagePattern, Object... args) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String messagePattern, Object... args) {
|
||||
}
|
||||
}
|
@ -1,63 +1,49 @@
|
||||
package tech.powerjob.worker.log.impl;
|
||||
|
||||
import tech.powerjob.common.enums.LogLevel;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
import tech.powerjob.worker.background.OmsLogHandler;
|
||||
import tech.powerjob.worker.log.OmsLogger;
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.slf4j.helpers.FormattingTuple;
|
||||
import org.slf4j.helpers.MessageFormatter;
|
||||
|
||||
|
||||
/**
|
||||
* PowerJob 在线日志,直接上报到 Server,可在控制台直接查看
|
||||
* WARN:Please do not use this logger to print large amounts of logs! <br/>
|
||||
* WARN:Please do not use this logger to print large amounts of logs! <br/>
|
||||
* WARN:Please do not use this logger to print large amounts of logs! <br/>
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/21
|
||||
* @since 2022/9/16
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class OmsServerLogger implements OmsLogger {
|
||||
public class OmsServerLogger extends AbstractOmsLogger {
|
||||
|
||||
private final long instanceId;
|
||||
private final OmsLogHandler omsLogHandler;
|
||||
|
||||
public OmsServerLogger(LogConfig logConfig, long instanceId, OmsLogHandler omsLogHandler) {
|
||||
super(logConfig);
|
||||
this.instanceId = instanceId;
|
||||
this.omsLogHandler = omsLogHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void debug(String messagePattern, Object... args) {
|
||||
public void debug0(String messagePattern, Object... args) {
|
||||
process(LogLevel.DEBUG, messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void info(String messagePattern, Object... args) {
|
||||
public void info0(String messagePattern, Object... args) {
|
||||
process(LogLevel.INFO, messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void warn(String messagePattern, Object... args) {
|
||||
public void warn0(String messagePattern, Object... args) {
|
||||
process(LogLevel.WARN, messagePattern, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(String messagePattern, Object... args) {
|
||||
public void error0(String messagePattern, Object... args) {
|
||||
process(LogLevel.ERROR, messagePattern, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成日志内容
|
||||
* @param messagePattern 日志格式
|
||||
* @param arg 填充参数
|
||||
* @return 生成完毕的日志内容
|
||||
*/
|
||||
private static String genLogContent(String messagePattern, Object... arg) {
|
||||
// 借用 Slf4J 直接生成日志信息
|
||||
FormattingTuple formattingTuple = MessageFormatter.arrayFormat(messagePattern, arg);
|
||||
if (formattingTuple.getThrowable() != null) {
|
||||
String stackTrace = ExceptionUtils.getStackTrace(formattingTuple.getThrowable());
|
||||
return formattingTuple.getMessage() + System.lineSeparator() + stackTrace;
|
||||
}else {
|
||||
return formattingTuple.getMessage();
|
||||
}
|
||||
}
|
||||
|
||||
private void process(LogLevel level, String messagePattern, Object... args) {
|
||||
String logContent = genLogContent(messagePattern, args);
|
||||
omsLogHandler.submitLog(instanceId, level, logContent);
|
||||
|
@ -0,0 +1,43 @@
|
||||
package tech.powerjob.worker.log.impl;
|
||||
|
||||
import tech.powerjob.common.enums.LogLevel;
|
||||
import tech.powerjob.common.model.LogConfig;
|
||||
|
||||
/**
|
||||
* use java.lang.System#out or java.lang.System#err to print log info
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2022/10/3
|
||||
*/
|
||||
public class OmsStdOutLogger extends AbstractOmsLogger {
|
||||
|
||||
private static final String PREFIX = "[PowerJob] [%s] ";
|
||||
|
||||
public OmsStdOutLogger(LogConfig logConfig) {
|
||||
super(logConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
void debug0(String messagePattern, Object... args) {
|
||||
System.out.println(buildStdOut(LogLevel.DEBUG, messagePattern, args));
|
||||
}
|
||||
|
||||
@Override
|
||||
void info0(String messagePattern, Object... args) {
|
||||
System.out.println(buildStdOut(LogLevel.INFO, messagePattern, args));
|
||||
}
|
||||
|
||||
@Override
|
||||
void warn0(String messagePattern, Object... args) {
|
||||
System.out.println(buildStdOut(LogLevel.WARN, messagePattern, args));
|
||||
}
|
||||
|
||||
@Override
|
||||
void error0(String messagePattern, Object... args) {
|
||||
System.err.println(buildStdOut(LogLevel.ERROR, messagePattern, args));
|
||||
}
|
||||
|
||||
private static String buildStdOut(LogLevel logLevel, String messagePattern, Object... args) {
|
||||
return String.format(PREFIX, logLevel.name()).concat(genLogContent(messagePattern, args));
|
||||
}
|
||||
}
|
@ -51,4 +51,6 @@ public class InstanceInfo implements Serializable {
|
||||
private int threadConcurrency;
|
||||
// 子任务重试次数(任务本身的重试机制由server控制)
|
||||
private int taskRetryNum;
|
||||
|
||||
private String logConfig;
|
||||
}
|
||||
|
@ -32,6 +32,8 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
|
||||
// 秒级任务专用
|
||||
private long subInstanceId;
|
||||
|
||||
private String logConfig;
|
||||
|
||||
|
||||
/**
|
||||
* 创建 TaskTrackerStartTaskReq,该构造方法必须在 TaskTracker 节点调用
|
||||
@ -47,5 +49,7 @@ public class TaskTrackerStartTaskReq implements PowerSerializable {
|
||||
|
||||
this.taskCurrentRetryNums = task.getFailedCnt();
|
||||
this.subInstanceId = task.getSubInstanceId();
|
||||
|
||||
this.logConfig = instanceInfo.getLogConfig();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user