mirror of
https://github.com/PowerJob/PowerJob.git
synced 2025-07-17 00:00:04 +08:00
[release] v3.4.1
This commit is contained in:
commit
11054e9761
@ -1,8 +1,8 @@
|
||||
<p align="center">
|
||||
<p style="text-align: center">
|
||||
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/logo.png" alt="PowerJob" title="PowerJob" width="557"/>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
<p style="text-align: center">
|
||||
<a href="https://github.com/KFCFans/PowerJob/actions"><img src="https://github.com/KFCFans/PowerJob/workflows/Java%20CI%20with%20Maven/badge.svg?branch=master" alt="actions"></a>
|
||||
<a href="https://search.maven.org/search?q=com.github.kfcfans"><img alt="Maven Central" src="https://img.shields.io/maven-central/v/com.github.kfcfans/powerjob-worker"></a>
|
||||
<a href="https://github.com/KFCFans/PowerJob/releases"><img alt="GitHub release (latest SemVer)" src="https://img.shields.io/github/v/release/kfcfans/powerjob?color=%23E59866"></a>
|
||||
@ -14,50 +14,70 @@
|
||||
- Have you ever felt helpless when batches of business tasks require handling?
|
||||
- Have you ever felt depressed about tasks that carry with complex dependencies?
|
||||
|
||||
Well, PowerJob is there for you, it is the choice of a new generation.It is a powerful, business-oriented scheduling framework that provides distributed computing ability.Based on Akka architecture, it makes everything with scheduling easier.Just with several steps, PowerJob could be deployed and work for you!
|
||||
Well, PowerJob is there for you, it is the choice of a new generation. It is a powerful, business-oriented scheduling framework that provides distributed computing ability. Based on Akka architecture, it makes everything with scheduling easier. Just with several steps, PowerJob could be deployed and work for you!
|
||||
|
||||
# Introduction
|
||||
|
||||
### Features
|
||||
- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor task status, and view operation logs online.
|
||||
- Simple to use: PowerJob provides a friendly front-end Web that allows developers to visually manage tasks (Create, Read, Update and Delete), monitor tasks, and view logs online.
|
||||
- Complete timing strategy: PowerJob supports four different scheduling strategies, including CRON expression, fixed frequency timing, fixed delay timing as well as the Open API.
|
||||
- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With the completion of several lines of codes, developers could take full advantage of PowerJob's distributed computing ability.
|
||||
- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred between tasks on the flow.
|
||||
- Various execution modes: PowerJob supports four execution modes: stand-alone, broadcast, Map, and MapReduce. It's worth mentioning the Map and MapReduce modes. With several lines of codes, developers could take full advantage of PowerJob's distributed computing ability.
|
||||
- Complete workflow support. PowerJob supports DAG(Directed acyclic graph) based online task configuration. Developers could arrange tasks on the console, while data could be transferred among tasks on the flow.
|
||||
- Extensive executor support: PowerJob supports multiple processors, including Spring Beans, ordinary Java objects, Shell, Python and so on.
|
||||
- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing huge online logs.
|
||||
- High availability and performance: Unlike traditional job-scheduling frameworks which rely on database locks, PowerJob server is lock-free when scheduling. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance just by deploying as many PowerJob server instances as you need.
|
||||
- Simple in dependency: PowerJob aims to be simple in dependency. The only dependency is merely database (MySQL / Oracle / MS SQLServer ...), with MongoDB being the extra dependency for storing large log files online.
|
||||
- High availability and performance: Unlike traditional job-scheduling frameworks that rely on database locks, PowerJob server is lock-free. PowerJob supports unlimited horizontal expansion. It's easy to achieve high availability and performance by deploying as many PowerJob server instances as you need.
|
||||
- Quick failover and recovery support: Whenever any task failed, PowerJob server would retry according to the configured strategy. As long as there were enough nodes in the cluster, the failed tasks could execute successfully finally.
|
||||
- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency for developers significantly.
|
||||
- Convenient to run and maintain: PowerJob supports online logging. Logs generated by the worker would be transferred and displayed on the console instantly, therefore reducing the cost of debugging and improving the efficiency significantly.
|
||||
|
||||
### Applicable scene
|
||||
### Applicable scenes
|
||||
|
||||
- Scenarios with timed tasks: such as full synchronization of data at midnight, generating business reports at desired time.
|
||||
- Scenarios that require all machines to run tasks simultaneously: such as log cleanup.
|
||||
- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied while the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ablility of whole cluster.
|
||||
- Scenarios that require distributed processing: For example, a large amount of data requires updating, while the stand-alone execution takes quite a lot of time. The Map/MapReduce mode could be applied in which the workers would join the cluster for PowerJob server to dispatch, to speed up the time-consuming process, therefore improving the computing ability of the whole cluster.
|
||||
- Scenarios with delayed tasks: For instance, disposal of overdue orders.
|
||||
|
||||
### Comparison of similar products
|
||||
### Design goals
|
||||
|
||||
PowerJob aims to be an enterprise scheduling middleware. By deploying PowerJob-server as the scheduling center,
|
||||
all the applications could gain scheduling and distributed computing ability relying on PowerJob-worker.
|
||||
|
||||
### Online trial
|
||||
|
||||
Trial address: [Online Trial Address](http://try.powerjob.tech/)
|
||||
Application name: powerjob-agent-test
|
||||
Application password: 123
|
||||
|
||||
### Comparison with similar products
|
||||
|
||||
| | QuartZ | xxl-job | SchedulerX 2.0 | PowerJob |
|
||||
| ---------------------------------- | --------------------------------------------------------- | --------------------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
|
||||
| Timing type | CRON | CRON | CRON, fixed frequency, fixed delay, OpenAPI | **CRON, fixed frequency, fixed delay, OpenAPI** |
|
||||
| Task type | Built-in Java | Built-in Java, GLUE Java, Shell, Python and other scripts | Built-in Java, external Java (FatJar), Shell, Python and other scripts | **Built-in Java, external Java (container), Shell, Python and other scripts** |
|
||||
| Distributed strategy | Unsupported | Static sharding | MapReduce dynamic sharding | **MapReduce dynamic sharding** |
|
||||
| Online task management | Unsupported | Supported | Supported | **Supported** |
|
||||
| Online task management | Unsupported | Supported | Supported | **Supported** |
|
||||
| Online logging | Unsupported | Supported | Unsupported | **Supported** |
|
||||
| Scheduling methods and performance | Based on database lock, there is a performance bottleneck | Based on database lock, there is a performance bottleneck | Unknown | **Lock-free design, powerful performance without upper limit** |
|
||||
| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, Dingtalk. An interface is provided for customization.** |
|
||||
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (free during public beta, hey, help to advertise) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
|
||||
| workflow | Unsupported | Unsupported | Supported | **Supported** |
|
||||
| Alarm monitoring | Unsupported | Email | SMS | **Email, WebHook, DingTalk. An interface is provided for customization.** |
|
||||
| System dependence | Any relational database (MySQL, Oracle ...) supported by JDBC | MySQL | RMB (Public Beta version for free, hey, helping to promote) | **Any relational database (MySQL, Oracle ...) supported by Spring Data Jpa** |
|
||||
| workflow | Unsupported | Unsupported | Supported | **Supported** |
|
||||
|
||||
# Document
|
||||
**[GitHub Wiki](https://github.com/KFCFans/PowerJob/wiki)**
|
||||
|
||||
**[中文文档](https://www.yuque.com/powerjob/product)**
|
||||
|
||||
# User Registration
|
||||
[Click to register as PowerJob user and contribute to PowerJob!](https://github.com/KFCFans/PowerJob/issues/6)
|
||||
ღ( ´・ᴗ・\` )ღ Many thanks to the following registered users. ღ( ´・ᴗ・\` )ღ
|
||||
<p style="text-align: center">
|
||||
<img src="https://raw.githubusercontent.com/KFCFans/PowerJob/master/others/images/user.png" alt="PowerJob User" title="PowerJob User"/>
|
||||
</p>
|
||||
|
||||
|
||||
# Others
|
||||
|
||||
- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, use or deploy!
|
||||
- Owner of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs!
|
||||
- PowerJob is permanently open source software(Apache License, Version 2.0), please feel free to try, deploy and put into production!
|
||||
- Author of PowerJob (@KFCFans) has abundant time for maintenance, and is willing to provide technical support if you have needs!
|
||||
- Welcome to contribute to PowerJob, both Pull Requests and Issues are precious.
|
||||
- Please STAR PowerJob if it is valuable. ~ =  ̄ω ̄ =
|
||||
- Do you need any help or want to propose suggestions? Please raise Github issues or contact the Author @KFCFans-> `tengjiqi@gmail.com` directly.
|
@ -10,13 +10,13 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-client</artifactId>
|
||||
<version>3.4.0</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<junit.version>5.6.1</junit.version>
|
||||
<fastjson.version>1.2.68</fastjson.version>
|
||||
<powerjob.common.version>3.4.0</powerjob.common.version>
|
||||
<powerjob.common.version>3.4.1</powerjob.common.version>
|
||||
|
||||
<mvn.shade.plugin.version>3.2.4</mvn.shade.plugin.version>
|
||||
</properties>
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-common</artifactId>
|
||||
<version>3.4.0</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
|
@ -24,8 +24,8 @@ public enum InstanceStatus {
|
||||
CANCELED(9, "取消"),
|
||||
STOPPED(10, "手动停止");
|
||||
|
||||
private int v;
|
||||
private String des;
|
||||
private final int v;
|
||||
private final String des;
|
||||
|
||||
// 广义的运行状态
|
||||
public static final List<Integer> generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
|
||||
|
@ -3,7 +3,7 @@ package com.github.kfcfans.powerjob.common;
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* OMS 序列化标记接口
|
||||
* PowerJob 序列化标记接口
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/16
|
||||
|
@ -10,13 +10,13 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-server</artifactId>
|
||||
<version>3.4.0</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<swagger.version>2.9.2</swagger.version>
|
||||
<springboot.version>2.3.4.RELEASE</springboot.version>
|
||||
<powerjob.common.version>3.4.0</powerjob.common.version>
|
||||
<powerjob.common.version>3.4.1</powerjob.common.version>
|
||||
<!-- 数据库驱动版本,使用的是spring-boot-dependencies管理的版本 -->
|
||||
<mysql.version>8.0.19</mysql.version>
|
||||
<ojdbc.version>19.7.0.0</ojdbc.version>
|
||||
@ -119,6 +119,11 @@
|
||||
<artifactId>spring-boot-starter-data-mongodb</artifactId>
|
||||
<version>${springboot.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
<version>${springboot.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-mail</artifactId>
|
||||
|
@ -18,6 +18,7 @@ import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
@ -51,6 +52,11 @@ public class OhMyServer {
|
||||
PropertyUtils.init();
|
||||
Properties properties = PropertyUtils.getProperties();
|
||||
int port = Integer.parseInt(properties.getProperty(PowerJobServerConfigKey.AKKA_PORT, "10086"));
|
||||
String portFromJVM = System.getProperty(PowerJobServerConfigKey.AKKA_PORT);
|
||||
if (StringUtils.isNotEmpty(portFromJVM)) {
|
||||
log.info("[OhMyWorker] use port from jvm params: {}", portFromJVM);
|
||||
port = Integer.parseInt(portFromJVM);
|
||||
}
|
||||
|
||||
// 启动 ActorSystem
|
||||
Map<String, Object> overrideConfig = Maps.newHashMap();
|
||||
|
@ -1,18 +1,19 @@
|
||||
package com.github.kfcfans.powerjob.server.akka.actors;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.powerjob.common.model.SystemMetrics;
|
||||
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.FriendQueryWorkerClusterStatusReq;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.Ping;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.RemoteProcessReq;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.SpringUtils;
|
||||
import com.github.kfcfans.powerjob.server.service.JobService;
|
||||
import com.github.kfcfans.powerjob.server.service.ha.WorkerManagerService;
|
||||
import com.github.kfcfans.powerjob.server.service.workflow.WorkflowService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -27,7 +28,7 @@ public class FriendActor extends AbstractActor {
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Ping.class, this::onReceivePing)
|
||||
.match(RunJobOrWorkflowReq.class, this::onReceiveFriendResendRunRequest)
|
||||
.match(RemoteProcessReq.class, this::onReceiveRemoteProcessReq)
|
||||
.match(FriendQueryWorkerClusterStatusReq.class, this::onReceiveFriendQueryWorkerClusterStatusReq)
|
||||
.matchAny(obj -> log.warn("[FriendActor] receive unknown request: {}.", obj))
|
||||
.build();
|
||||
@ -49,26 +50,39 @@ public class FriendActor extends AbstractActor {
|
||||
getSender().tell(askResponse, getSelf());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理 run 转发
|
||||
*/
|
||||
private void onReceiveFriendResendRunRequest(RunJobOrWorkflowReq req) {
|
||||
private void onReceiveRemoteProcessReq(RemoteProcessReq req) {
|
||||
|
||||
AskResponse response = new AskResponse();
|
||||
response.setSuccess(true);
|
||||
try {
|
||||
Long resultId;
|
||||
switch (req.getType()) {
|
||||
case RunJobOrWorkflowReq.WORKFLOW:
|
||||
resultId = SpringUtils.getBean(WorkflowService.class).runWorkflow(req.getId(), req.getAppId(), req.getParams(), req.getDelay());
|
||||
break;
|
||||
case RunJobOrWorkflowReq.JOB:
|
||||
resultId = SpringUtils.getBean(JobService.class).runJob(req.getId(), req.getParams(), req.getDelay());
|
||||
break;
|
||||
default:
|
||||
throw new PowerJobException("unknown type: " + req.getType());
|
||||
|
||||
Object[] args = req.getArgs();
|
||||
String[] parameterTypes = req.getParameterTypes();
|
||||
Class<?>[] parameters = new Class[parameterTypes.length];
|
||||
|
||||
for (int i = 0; i < parameterTypes.length; i++) {
|
||||
parameters[i] = Class.forName(parameterTypes[i]);
|
||||
Object arg = args[i];
|
||||
if (arg != null) {
|
||||
args[i] = JSONObject.parseObject(JSONObject.toJSONBytes(arg), parameters[i]);
|
||||
}
|
||||
}
|
||||
getSender().tell(AskResponse.succeed(String.valueOf(resultId)), getSelf());
|
||||
} catch (Exception e) {
|
||||
log.error("[FriendActor] process run request [{}] failed!", req, e);
|
||||
getSender().tell(AskResponse.failed(e.getMessage()), getSelf());
|
||||
|
||||
Class<?> clz = Class.forName(req.getClassName());
|
||||
|
||||
Object bean = SpringUtils.getBean(clz);
|
||||
Method method = ReflectionUtils.findMethod(clz, req.getMethodName(), parameters);
|
||||
|
||||
assert method != null;
|
||||
Object invokeResult = ReflectionUtils.invokeMethod(method, bean, args);
|
||||
|
||||
response.setData(JSONObject.toJSONBytes(invokeResult));
|
||||
|
||||
} catch (Throwable t) {
|
||||
log.error("[FriendActor] process remote request[{}] failed!", req, t);
|
||||
response.setSuccess(false);
|
||||
response.setMessage(ExceptionUtils.getMessage(t));
|
||||
}
|
||||
getSender().tell(response, getSelf());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,25 @@
|
||||
package com.github.kfcfans.powerjob.server.akka.requests;
|
||||
|
||||
import com.github.kfcfans.powerjob.common.OmsSerializable;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* 原创执行命令
|
||||
*
|
||||
* @author tjq
|
||||
* @since 12/13/20
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@Accessors(chain = true)
|
||||
public class RemoteProcessReq implements OmsSerializable {
|
||||
|
||||
private String className;
|
||||
private String methodName;
|
||||
private String[] parameterTypes;
|
||||
|
||||
private Object[] args;
|
||||
|
||||
}
|
@ -1,27 +0,0 @@
|
||||
package com.github.kfcfans.powerjob.server.akka.requests;
|
||||
|
||||
import com.github.kfcfans.powerjob.common.OmsSerializable;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 运行 Job 或 工作流,需要转发到 server 进行,否则没有集群信息
|
||||
*
|
||||
* @author tjq
|
||||
* @since 11/7/20
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class RunJobOrWorkflowReq implements OmsSerializable {
|
||||
public static final int JOB = 1;
|
||||
public static final int WORKFLOW = 2;
|
||||
|
||||
private int type;
|
||||
private long id;
|
||||
private long delay;
|
||||
private String params;
|
||||
|
||||
private long appId;
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package com.github.kfcfans.powerjob.server.common.redirect;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* 需要在指定的服务器运行
|
||||
* 注意:该注解所在方法的参数必须为对象,不可以是 long 等基本类型
|
||||
*
|
||||
* @author tjq
|
||||
* @since 12/13/20
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
public @interface DesignateServer {
|
||||
|
||||
/**
|
||||
* 转发请求需要 AppInfo 下的 currentServer 信息,因此必须要有 appId 作为入参,该字段指定了 appId 字段的参数名称
|
||||
* @return appId 参数名称
|
||||
*/
|
||||
String appIdParameterName();
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
package com.github.kfcfans.powerjob.server.common.redirect;
|
||||
|
||||
import akka.pattern.Patterns;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.RemoteConstant;
|
||||
import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.RemoteProcessReq;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.Signature;
|
||||
import org.aspectj.lang.annotation.Around;
|
||||
import org.aspectj.lang.annotation.Aspect;
|
||||
import org.aspectj.lang.reflect.MethodSignature;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
||||
/**
|
||||
* 执行服务器运行切面
|
||||
*
|
||||
* @author tjq
|
||||
* @since 12/13/20
|
||||
*/
|
||||
@Slf4j
|
||||
@Aspect
|
||||
@Component
|
||||
public class DesignateServerAspect {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
|
||||
@Around(value = "@annotation(designateServer))")
|
||||
public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable {
|
||||
|
||||
// 参数
|
||||
Object[] args = point.getArgs();
|
||||
// 方法名
|
||||
String methodName = point.getSignature().getName();
|
||||
// 类名
|
||||
String className = point.getSignature().getDeclaringTypeName();
|
||||
Signature signature = point.getSignature();
|
||||
// 方法签名
|
||||
MethodSignature methodSignature = (MethodSignature) signature;
|
||||
String[] parameterNames = methodSignature.getParameterNames();
|
||||
String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new);
|
||||
|
||||
Long appId = null;
|
||||
for (int i = 0; i < parameterNames.length; i++) {
|
||||
if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) {
|
||||
appId = Long.parseLong(String.valueOf(args[i]));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (appId == null) {
|
||||
throw new PowerJobException("can't find appId in params for:" + signature.toString());
|
||||
}
|
||||
|
||||
// 获取执行机器
|
||||
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info"));
|
||||
String targetServer = appInfo.getCurrentServer();
|
||||
|
||||
// 目标IP与本地符合则本地执行
|
||||
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
|
||||
return point.proceed();
|
||||
}
|
||||
|
||||
log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer);
|
||||
// 转发请求,远程执行后返回结果
|
||||
RemoteProcessReq remoteProcessReq = new RemoteProcessReq()
|
||||
.setClassName(className)
|
||||
.setMethodName(methodName)
|
||||
.setParameterTypes(parameterTypes)
|
||||
.setArgs(args);
|
||||
|
||||
CompletionStage<Object> askCS = Patterns.ask(OhMyServer.getFriendActor(targetServer), remoteProcessReq, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
|
||||
AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get();
|
||||
|
||||
if (!askResponse.isSuccess()) {
|
||||
throw new PowerJobException("remote process failed: " + askResponse.getMessage());
|
||||
}
|
||||
return JSONObject.parseObject(askResponse.getData(), methodSignature.getReturnType());
|
||||
}
|
||||
}
|
@ -1,6 +1,8 @@
|
||||
package com.github.kfcfans.powerjob.server.service.alarm;
|
||||
package com.github.kfcfans.powerjob.server.extension;
|
||||
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.AlarmCenter;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
|
||||
import java.util.List;
|
@ -0,0 +1,14 @@
|
||||
package com.github.kfcfans.powerjob.server.extension;
|
||||
|
||||
/**
|
||||
* provide unique server ip in the cluster for IdGenerateService
|
||||
* @author user
|
||||
*/
|
||||
public interface ServerIdProvider {
|
||||
|
||||
/**
|
||||
* get number for IdGenerateService
|
||||
* @return serverId, must in range [0, 16384)
|
||||
*/
|
||||
long getServerId();
|
||||
}
|
@ -81,6 +81,7 @@ public class DispatchService {
|
||||
if (maxInstanceNum > 0) {
|
||||
|
||||
// 这个 runningInstanceCount 已经包含了本 instance
|
||||
// 不统计 WAITING_DISPATCH 的状态:使用 OpenAPI 触发的延迟任务不应该统计进去(比如 delay 是 1 天)
|
||||
long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV()));
|
||||
// 超出最大同时运行限制,不执行调度
|
||||
if (runningInstanceCount > maxInstanceNum) {
|
||||
|
@ -4,7 +4,9 @@ import com.github.kfcfans.powerjob.common.OmsConstant;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.model.InstanceLogContent;
|
||||
import com.github.kfcfans.powerjob.common.utils.CommonUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.common.utils.SegmentLock;
|
||||
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.StringPage;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
|
||||
@ -21,6 +23,7 @@ import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.apache.commons.lang3.time.FastDateFormat;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -45,6 +48,9 @@ import java.util.stream.Stream;
|
||||
@Service
|
||||
public class InstanceLogService {
|
||||
|
||||
@Value("${server.port}")
|
||||
private int port;
|
||||
|
||||
@Resource
|
||||
private InstanceMetadataService instanceMetadataService;
|
||||
@Resource
|
||||
@ -99,11 +105,13 @@ public class InstanceLogService {
|
||||
|
||||
/**
|
||||
* 获取任务实例运行日志(默认存在本地数据,需要由生成完成请求的路由与转发)
|
||||
* @param appId appId,AOP 专用
|
||||
* @param instanceId 任务实例ID
|
||||
* @param index 页码,从0开始
|
||||
* @return 文本字符串
|
||||
*/
|
||||
public StringPage fetchInstanceLog(Long instanceId, long index) {
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
|
||||
try {
|
||||
Future<File> fileFuture = prepareLogFile(instanceId);
|
||||
// 超时并不会打断正在执行的任务
|
||||
@ -125,7 +133,7 @@ public class InstanceLogService {
|
||||
++lines;
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.warn("[InstanceLog-{}] read logFile from disk failed.", instanceId, e);
|
||||
log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
|
||||
return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
|
||||
}
|
||||
|
||||
@ -140,6 +148,19 @@ public class InstanceLogService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取日志的下载链接
|
||||
* @param appId AOP 专用
|
||||
* @param instanceId 任务实例 ID
|
||||
* @return 下载链接
|
||||
*/
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
public String fetchDownloadUrl(Long appId, Long instanceId) {
|
||||
String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
|
||||
log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
|
||||
return url;
|
||||
}
|
||||
|
||||
/**
|
||||
* 下载全部的任务日志文件
|
||||
* @param instanceId 任务实例ID
|
||||
|
@ -5,15 +5,12 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest;
|
||||
import com.github.kfcfans.powerjob.common.response.JobInfoDTO;
|
||||
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
|
||||
import com.github.kfcfans.powerjob.server.common.SJ;
|
||||
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
|
||||
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceService;
|
||||
@ -26,7 +23,6 @@ import org.springframework.util.CollectionUtils;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
@ -49,8 +45,6 @@ public class JobService {
|
||||
@Resource
|
||||
private InstanceInfoRepository instanceInfoRepository;
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
|
||||
/**
|
||||
* 保存/修改任务
|
||||
@ -108,30 +102,13 @@ public class JobService {
|
||||
* @param delay 延迟时间,单位 毫秒
|
||||
* @return 任务实例ID
|
||||
*/
|
||||
public long runJob(Long jobId, String instanceParams, long delay) {
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {
|
||||
|
||||
delay = delay == null ? 0 : delay;
|
||||
JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));
|
||||
|
||||
AppInfoDO appInfo = appInfoRepository.findById(jobInfo.getAppId()).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + jobInfo.getAppId()));
|
||||
String targetServer = appInfo.getCurrentServer();
|
||||
|
||||
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
|
||||
return realRunJob(jobInfo, instanceParams, delay);
|
||||
}
|
||||
|
||||
// 转发请求
|
||||
log.info("[Job-{}] redirect run request[params={}] to target server: {}", jobId, instanceParams, targetServer);
|
||||
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.JOB, jobId, delay, instanceParams, jobInfo.getAppId());
|
||||
try {
|
||||
return Long.parseLong(OhMyServer.askFriend(targetServer, req));
|
||||
}catch (Exception e) {
|
||||
log.error("[Job-{}] redirect run request[params={}] to target server[{}] failed!", jobId, instanceParams, targetServer);
|
||||
throw new PowerJobException("redirect run request failed!", e);
|
||||
}
|
||||
}
|
||||
|
||||
private long realRunJob(JobInfoDO jobInfo, String instanceParams, long delay) {
|
||||
log.info("[Job-{}] try to run job, instanceParams={},delay={} ms.", jobInfo.getId(), instanceParams, delay);
|
||||
log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
|
||||
Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
|
||||
instanceInfoRepository.flush();
|
||||
if (delay <= 0) {
|
||||
@ -145,6 +122,7 @@ public class JobService {
|
||||
return instanceId;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 删除某个任务
|
||||
* @param jobId 任务ID
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.github.kfcfans.powerjob.server.service.alarm;
|
||||
|
||||
import com.github.kfcfans.powerjob.server.extension.Alarmable;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Queues;
|
||||
|
@ -8,7 +8,7 @@ import com.github.kfcfans.powerjob.server.common.SJ;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.DingTalkUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
||||
import com.github.kfcfans.powerjob.server.extension.Alarmable;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -2,7 +2,7 @@ package com.github.kfcfans.powerjob.server.service.alarm.impl;
|
||||
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
||||
import com.github.kfcfans.powerjob.server.extension.Alarmable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.env.Environment;
|
||||
|
@ -5,7 +5,7 @@ import com.github.kfcfans.powerjob.common.OmsConstant;
|
||||
import com.github.kfcfans.powerjob.common.utils.HttpUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.UserInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarm;
|
||||
import com.github.kfcfans.powerjob.server.service.alarm.Alarmable;
|
||||
import com.github.kfcfans.powerjob.server.extension.Alarmable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.RequestBody;
|
||||
|
@ -0,0 +1,37 @@
|
||||
package com.github.kfcfans.powerjob.server.service.id;
|
||||
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.server.extension.ServerIdProvider;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 默认服务器 ID 生成策略,不适用于 Server 频繁重启且变化 IP 的场景
|
||||
* @author user
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class DefaultServerIdProvider implements ServerIdProvider {
|
||||
|
||||
private final Long id;
|
||||
|
||||
public DefaultServerIdProvider(ServerInfoRepository serverInfoRepository) {
|
||||
String ip = NetUtils.getLocalHost();
|
||||
ServerInfoDO server = serverInfoRepository.findByIp(ip);
|
||||
|
||||
if (server == null) {
|
||||
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
|
||||
server = serverInfoRepository.saveAndFlush(newServerInfo);
|
||||
}
|
||||
this.id = server.getId();
|
||||
|
||||
log.info("[DefaultServerIdProvider] address:{},id:{}", ip, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getServerId() {
|
||||
return id;
|
||||
}
|
||||
}
|
@ -1,16 +1,19 @@
|
||||
package com.github.kfcfans.powerjob.server.service.id;
|
||||
|
||||
import com.github.kfcfans.powerjob.common.utils.NetUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.ServerInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.ServerInfoRepository;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.server.extension.ServerIdProvider;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 唯一ID生成服务,使用 Twitter snowflake 算法
|
||||
* 机房ID:固定为0,占用2位
|
||||
* 机器ID:数据库自增,占用14位(如果频繁部署需要删除数据库重置id)
|
||||
* 机器ID:由 ServerIdProvider 提供
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/4/6
|
||||
@ -20,24 +23,37 @@ import org.springframework.stereotype.Service;
|
||||
public class IdGenerateService {
|
||||
|
||||
private final SnowFlakeIdGenerator snowFlakeIdGenerator;
|
||||
|
||||
private static final int DATA_CENTER_ID = 0;
|
||||
|
||||
@Autowired
|
||||
public IdGenerateService(ServerInfoRepository serverInfoRepository) {
|
||||
public IdGenerateService(List<ServerIdProvider> serverIdProviders) {
|
||||
|
||||
String ip = NetUtils.getLocalHost();
|
||||
ServerInfoDO server = serverInfoRepository.findByIp(ip);
|
||||
|
||||
if (server == null) {
|
||||
ServerInfoDO newServerInfo = new ServerInfoDO(ip);
|
||||
server = serverInfoRepository.saveAndFlush(newServerInfo);
|
||||
if (CollectionUtils.isEmpty(serverIdProviders)) {
|
||||
throw new PowerJobException("can't find any ServerIdProvider!");
|
||||
}
|
||||
|
||||
Long id = server.getId();
|
||||
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
|
||||
ServerIdProvider serverIdProvider;
|
||||
int severIpProviderNums = serverIdProviders.size();
|
||||
if (severIpProviderNums == 1) {
|
||||
serverIdProvider = serverIdProviders.get(0);
|
||||
} else {
|
||||
List<ServerIdProvider> extendServerIpProviders = Lists.newArrayList();
|
||||
for (ServerIdProvider sp : serverIdProviders) {
|
||||
if (sp instanceof DefaultServerIdProvider) {
|
||||
continue;
|
||||
}
|
||||
extendServerIpProviders.add(sp);
|
||||
}
|
||||
int extNum = extendServerIpProviders.size();
|
||||
if (extNum != 1) {
|
||||
throw new PowerJobException(String.format("find %d ServerIdProvider but just need one, please delete the useless ServerIdProvider!", extNum));
|
||||
}
|
||||
serverIdProvider = extendServerIpProviders.get(0);
|
||||
}
|
||||
|
||||
log.info("[IdGenerateService] init snowflake for server(address={}) by machineId({}).", ip, id);
|
||||
long id = serverIdProvider.getServerId();
|
||||
snowFlakeIdGenerator = new SnowFlakeIdGenerator(DATA_CENTER_ID, id);
|
||||
log.info("[IdGenerateService] initialize IdGenerateService successfully, ServerIdProvider:{},ID:{}", serverIdProvider.getClass().getSimpleName(), id);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -34,8 +34,8 @@ class SnowFlakeIdGenerator {
|
||||
private final static long DATA_CENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
|
||||
private final static long TIMESTAMP_LEFT = DATA_CENTER_LEFT + DATA_CENTER_BIT;
|
||||
|
||||
private long dataCenterId; //数据中心
|
||||
private long machineId; //机器标识
|
||||
private final long dataCenterId; //数据中心
|
||||
private final long machineId; //机器标识
|
||||
private long sequence = 0L; //序列号
|
||||
private long lastTimestamp = -1L;//上一次时间戳
|
||||
|
||||
|
@ -13,6 +13,7 @@ import com.github.kfcfans.powerjob.common.response.AskResponse;
|
||||
import com.github.kfcfans.powerjob.common.response.InstanceInfoDTO;
|
||||
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.powerjob.server.common.constans.InstanceType;
|
||||
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.timewheel.TimerFuture;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.InstanceInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO;
|
||||
@ -132,7 +133,11 @@ public class InstanceService {
|
||||
* 重试任务(只有结束的任务运行重试)
|
||||
* @param instanceId 任务实例ID
|
||||
*/
|
||||
public void retryInstance(Long instanceId) {
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
public void retryInstance(Long appId, Long instanceId) {
|
||||
|
||||
log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);
|
||||
|
||||
InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
|
||||
if (!InstanceStatus.finishedStatus.contains(instanceInfo.getStatus())) {
|
||||
throw new PowerJobException("Only stopped instance can be retry!");
|
||||
|
@ -30,10 +30,7 @@ import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -239,7 +236,10 @@ public class OmsScheduleService {
|
||||
}
|
||||
|
||||
log.info("[FrequentScheduler] These frequent jobs will be scheduled: {}.", notRunningJobIds);
|
||||
notRunningJobIds.forEach(jobId -> jobService.runJob(jobId, null, 0));
|
||||
notRunningJobIds.forEach(jobId -> {
|
||||
Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
|
||||
jobInfoOpt.ifPresent(jobInfoDO -> jobService.runJob(jobInfoDO.getAppId(), jobId, null, 0L));
|
||||
});
|
||||
}catch (Exception e) {
|
||||
log.error("[FrequentScheduler] schedule frequent job failed.", e);
|
||||
}
|
||||
|
@ -5,15 +5,12 @@ import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.TimeExpressionType;
|
||||
import com.github.kfcfans.powerjob.common.request.http.SaveWorkflowRequest;
|
||||
import com.github.kfcfans.powerjob.common.response.WorkflowInfoDTO;
|
||||
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.powerjob.server.akka.requests.RunJobOrWorkflowReq;
|
||||
import com.github.kfcfans.powerjob.server.common.SJ;
|
||||
import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus;
|
||||
import com.github.kfcfans.powerjob.server.common.redirect.DesignateServer;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.CronExpression;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.WorkflowDAGUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.AppInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.model.WorkflowInfoDO;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.AppInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInfoRepository;
|
||||
import com.github.kfcfans.powerjob.server.service.instance.InstanceTimeWheelService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -22,7 +19,6 @@ import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Date;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Workflow 服务
|
||||
@ -34,8 +30,6 @@ import java.util.Objects;
|
||||
@Service
|
||||
public class WorkflowService {
|
||||
|
||||
@Resource
|
||||
private AppInfoRepository appInfoRepository;
|
||||
@Resource
|
||||
private WorkflowInstanceManager workflowInstanceManager;
|
||||
@Resource
|
||||
@ -144,29 +138,12 @@ public class WorkflowService {
|
||||
* @param delay 延迟时间
|
||||
* @return 该 workflow 实例的 instanceId(wfInstanceId)
|
||||
*/
|
||||
public Long runWorkflow(Long wfId, Long appId, String initParams, long delay) {
|
||||
@DesignateServer(appIdParameterName = "appId")
|
||||
public Long runWorkflow(Long wfId, Long appId, String initParams, Long delay) {
|
||||
|
||||
delay = delay == null ? 0 : delay;
|
||||
WorkflowInfoDO wfInfo = permissionCheck(wfId, appId);
|
||||
|
||||
AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find appInfo by appId: " + appId));
|
||||
|
||||
String targetServer = appInfo.getCurrentServer();
|
||||
if (Objects.equals(targetServer, OhMyServer.getActorSystemAddress())) {
|
||||
return realRunWorkflow(wfInfo, initParams, delay);
|
||||
}
|
||||
|
||||
log.info("[WorkflowService-{}] redirect run request[initParams={}] to target server: {}", wfId, initParams, targetServer);
|
||||
// 转发请求
|
||||
RunJobOrWorkflowReq req = new RunJobOrWorkflowReq(RunJobOrWorkflowReq.WORKFLOW, wfId, delay, initParams, appId);
|
||||
try {
|
||||
return Long.valueOf(OhMyServer.askFriend(targetServer, req));
|
||||
}catch (Exception e) {
|
||||
log.error("[WorkflowService-{}] redirect run request[params={}] to target server[{}] failed!", wfId, initParams, targetServer);
|
||||
throw new PowerJobException("redirect run request failed!", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Long realRunWorkflow(WorkflowInfoDO wfInfo, String initParams, long delay) {
|
||||
log.info("[WorkflowService-{}] try to run workflow, initParams={},delay={} ms.", wfInfo.getId(), initParams, delay);
|
||||
Long wfInstanceId = workflowInstanceManager.create(wfInfo, initParams, System.currentTimeMillis() + delay);
|
||||
if (delay <= 0) {
|
||||
|
@ -3,7 +3,6 @@ package com.github.kfcfans.powerjob.server.web.controller;
|
||||
import com.github.kfcfans.powerjob.common.InstanceStatus;
|
||||
import com.github.kfcfans.powerjob.common.PowerJobException;
|
||||
import com.github.kfcfans.powerjob.common.response.ResultDTO;
|
||||
import com.github.kfcfans.powerjob.server.akka.OhMyServer;
|
||||
import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils;
|
||||
import com.github.kfcfans.powerjob.server.persistence.PageResult;
|
||||
import com.github.kfcfans.powerjob.server.persistence.StringPage;
|
||||
@ -19,7 +18,6 @@ import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO;
|
||||
import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.domain.Example;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
@ -46,8 +44,7 @@ import java.util.stream.Collectors;
|
||||
@RequestMapping("/instance")
|
||||
public class InstanceController {
|
||||
|
||||
@Value("${server.port}")
|
||||
private int port;
|
||||
|
||||
|
||||
@Resource
|
||||
private InstanceService instanceService;
|
||||
@ -68,8 +65,8 @@ public class InstanceController {
|
||||
}
|
||||
|
||||
@GetMapping("/retry")
|
||||
public ResultDTO<Void> retryInstance(Long instanceId) {
|
||||
instanceService.retryInstance(instanceId);
|
||||
public ResultDTO<Void> retryInstance(String appId, Long instanceId) {
|
||||
instanceService.retryInstance(Long.valueOf(appId), instanceId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@ -79,32 +76,13 @@ public class InstanceController {
|
||||
}
|
||||
|
||||
@GetMapping("/log")
|
||||
public ResultDTO<StringPage> getInstanceLog(Long instanceId, Long index, HttpServletResponse response) {
|
||||
|
||||
String targetServer = getTargetServer(instanceId);
|
||||
|
||||
// 转发HTTP请求(如果使用Akka,则需要传输两次,而转发HTTP请求只需要传输一次"大"数据包)
|
||||
if (!OhMyServer.getActorSystemAddress().equals(targetServer)) {
|
||||
String ip = targetServer.split(":")[0];
|
||||
String url = String.format("http://%s:%s/instance/log?instanceId=%d&index=%d", ip, port, instanceId, index);
|
||||
try {
|
||||
response.sendRedirect(url);
|
||||
return ResultDTO.success(StringPage.simple("redirecting..."));
|
||||
}catch (Exception e) {
|
||||
log.warn("[Instance-{}] redirect request to url[{}] failed, please ensure all server has the same http port!", instanceId, url, e);
|
||||
return ResultDTO.failed(e);
|
||||
}
|
||||
}
|
||||
|
||||
return ResultDTO.success(instanceLogService.fetchInstanceLog(instanceId, index));
|
||||
public ResultDTO<StringPage> getInstanceLog(Long appId, Long instanceId, Long index) {
|
||||
return ResultDTO.success(instanceLogService.fetchInstanceLog(appId, instanceId, index));
|
||||
}
|
||||
|
||||
@GetMapping("/downloadLogUrl")
|
||||
public ResultDTO<String> getDownloadUrl(Long instanceId) {
|
||||
String targetServer = getTargetServer(instanceId);
|
||||
String ip = targetServer.split(":")[0];
|
||||
String url = "http://" + ip + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
|
||||
return ResultDTO.success(url);
|
||||
public ResultDTO<String> getDownloadUrl(Long appId, Long instanceId) {
|
||||
return ResultDTO.success(instanceLogService.fetchDownloadUrl(appId, instanceId));
|
||||
}
|
||||
|
||||
@GetMapping("/downloadLog")
|
||||
|
@ -57,8 +57,8 @@ public class JobController {
|
||||
}
|
||||
|
||||
@GetMapping("/run")
|
||||
public ResultDTO<Long> runImmediately(String jobId) {
|
||||
return ResultDTO.success(jobService.runJob(Long.valueOf(jobId), null, 0));
|
||||
public ResultDTO<Long> runImmediately(String appId, String jobId) {
|
||||
return ResultDTO.success(jobService.runJob(Long.valueOf(appId), Long.valueOf(jobId), null, 0L));
|
||||
}
|
||||
|
||||
@PostMapping("/list")
|
||||
|
@ -82,7 +82,7 @@ public class OpenAPIController {
|
||||
@PostMapping(OpenAPIConstant.RUN_JOB)
|
||||
public ResultDTO<Long> runJob(Long appId, Long jobId, @RequestParam(required = false) String instanceParams, @RequestParam(required = false) Long delay) {
|
||||
checkJobIdValid(jobId, appId);
|
||||
return ResultDTO.success(jobService.runJob(jobId, instanceParams, delay == null ? 0 : delay));
|
||||
return ResultDTO.success(jobService.runJob(appId, jobId, instanceParams, delay));
|
||||
}
|
||||
|
||||
/* ************* Instance 区 ************* */
|
||||
@ -104,7 +104,7 @@ public class OpenAPIController {
|
||||
@PostMapping(OpenAPIConstant.RETRY_INSTANCE)
|
||||
public ResultDTO<Void> retryInstance(Long instanceId, Long appId) {
|
||||
checkInstanceIdValid(instanceId, appId);
|
||||
instanceService.retryInstance(instanceId);
|
||||
instanceService.retryInstance(appId, instanceId);
|
||||
return ResultDTO.success(null);
|
||||
}
|
||||
|
||||
@ -151,7 +151,7 @@ public class OpenAPIController {
|
||||
|
||||
@PostMapping(OpenAPIConstant.RUN_WORKFLOW)
|
||||
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId, @RequestParam(required = false) String initParams, @RequestParam(required = false) Long delay) {
|
||||
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay == null ? 0 : delay));
|
||||
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, initParams, delay));
|
||||
}
|
||||
|
||||
/* ************* Workflow Instance 区 ************* */
|
||||
|
@ -79,7 +79,7 @@ public class WorkflowController {
|
||||
|
||||
@GetMapping("/run")
|
||||
public ResultDTO<Long> runWorkflow(Long workflowId, Long appId) {
|
||||
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0));
|
||||
return ResultDTO.success(workflowService.runWorkflow(workflowId, appId, null, 0L));
|
||||
}
|
||||
|
||||
private static PageResult<WorkflowInfoVO> convertPage(Page<WorkflowInfoDO> originPage) {
|
||||
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -10,12 +10,12 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker-agent</artifactId>
|
||||
<version>3.4.0-bugfix</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
|
||||
<properties>
|
||||
<powerjob.worker.version>3.4.0-bugfix</powerjob.worker.version>
|
||||
<powerjob.worker.version>3.4.1</powerjob.worker.version>
|
||||
<logback.version>1.2.3</logback.version>
|
||||
<picocli.version>4.3.2</picocli.version>
|
||||
|
||||
|
@ -10,11 +10,11 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>powerjob-worker-samples</artifactId>
|
||||
<version>3.4.0</version>
|
||||
<version>3.4.1</version>
|
||||
|
||||
<properties>
|
||||
<springboot.version>2.2.6.RELEASE</springboot.version>
|
||||
<powerjob.worker.starter.version>3.4.0-bugfix</powerjob.worker.starter.version>
|
||||
<powerjob.worker.starter.version>3.4.1</powerjob.worker.starter.version>
|
||||
<fastjson.version>1.2.68</fastjson.version>
|
||||
|
||||
<!-- 部署时跳过该module -->
|
||||
|
@ -10,11 +10,11 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
|
||||
<version>3.4.0-bugfix</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<powerjob.worker.version>3.4.0-bugfix</powerjob.worker.version>
|
||||
<powerjob.worker.version>3.4.1</powerjob.worker.version>
|
||||
<springboot.version>2.2.6.RELEASE</springboot.version>
|
||||
</properties>
|
||||
|
||||
|
@ -10,12 +10,12 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>powerjob-worker</artifactId>
|
||||
<version>3.4.0-bugfix</version>
|
||||
<version>3.4.1</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<spring.version>5.2.4.RELEASE</spring.version>
|
||||
<powerjob.common.version>3.4.0</powerjob.common.version>
|
||||
<powerjob.common.version>3.4.1</powerjob.common.version>
|
||||
<h2.db.version>1.4.200</h2.db.version>
|
||||
<hikaricp.version>3.4.2</hikaricp.version>
|
||||
<junit.version>5.6.1</junit.version>
|
||||
|
@ -1,15 +1,15 @@
|
||||
package com.github.kfcfans.powerjob.worker.common.utils;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Spring ApplicationContext 工具类
|
||||
*
|
||||
* @author tjq
|
||||
* @since 2020/3/16
|
||||
*/
|
||||
@Slf4j
|
||||
public class SpringUtils {
|
||||
|
||||
private static boolean supportSpringBean = false;
|
||||
@ -43,7 +43,9 @@ public class SpringUtils {
|
||||
// 小写转大写
|
||||
char[] cs = beanName.toCharArray();
|
||||
cs[0] += 32;
|
||||
return (T) context.getBean(String.valueOf(cs));
|
||||
String beanName0 = String.valueOf(cs);
|
||||
log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", context, beanName0);
|
||||
return (T) context.getBean(beanName0);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package com.github.kfcfans.powerjob.worker.core;
|
||||
import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -37,7 +38,8 @@ public class ProcessorBeanFactory {
|
||||
return (BasicProcessor) clz.getDeclaredConstructor().newInstance();
|
||||
|
||||
}catch (Exception e) {
|
||||
log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed, reason is {}", className, e.getMessage());
|
||||
log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e);
|
||||
ExceptionUtils.rethrow(e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
@ -24,6 +24,7 @@ import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor;
|
||||
import com.google.common.collect.Queues;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
@ -96,10 +97,10 @@ public class ProcessorTracker {
|
||||
initProcessor();
|
||||
|
||||
log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", instanceId);
|
||||
}catch (Throwable e) {
|
||||
log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, e);
|
||||
} catch (Throwable t) {
|
||||
log.warn("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", instanceId, t);
|
||||
lethal = true;
|
||||
lethalReason = e.toString();
|
||||
lethalReason = ExceptionUtils.getMessage(t);
|
||||
}
|
||||
}
|
||||
|
||||
@ -291,7 +292,7 @@ public class ProcessorTracker {
|
||||
try {
|
||||
processor = SpringUtils.getBean(processorInfo);
|
||||
}catch (Exception e) {
|
||||
log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, e.toString());
|
||||
log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", instanceId, processorInfo, ExceptionUtils.getMessage(e));
|
||||
}
|
||||
}
|
||||
// 反射加载
|
||||
|
Loading…
x
Reference in New Issue
Block a user