diff --git a/README.md b/README.md index c6ea7baf..658653a3 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ PS:感谢文档翻译平台[breword](https://www.breword.com/)对本项目英 * [广播执行](https://yq.aliyun.com/articles/716203?spm=a2c4e.11153959.teamhomeleft.40.371960c9qhB1mB):运行清理日志脚本什么的,也太实用了8~ # 其他 -* 产品永久开源(Apache License, Version 2.0),免费使用,且目前开发者@KFCFans有充足的时间进行项目维护和提供无偿技术支持(All In 了解一下),欢迎各位试用! +* 开源许可证:Apache License, Version 2.0 * 欢迎共同参与本项目的贡献,PR和Issue都大大滴欢迎(求求了)~ * 觉得还不错的话,可以点个Star支持一下哦~ = ̄ω ̄= * 联系方式@KFCFans -> `tengjiqi@gmail.com` diff --git a/pom.xml b/pom.xml index ab2a2670..99022ea4 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ 1.0.0 pom powerjob - https://github.com/KFCFans/OhMyScheduler + https://github.com/KFCFans/PowerJob Distributed scheduling and execution framework @@ -19,8 +19,8 @@ - https://github.com/KFCFans/OhMyScheduler - https://github.com/KFCFans/OhMyScheduler.git + https://github.com/KFCFans/PowerJob + https://github.com/KFCFans/PowerJob.git diff --git a/powerjob-client/pom.xml b/powerjob-client/pom.xml index 376f6977..39ee4363 100644 --- a/powerjob-client/pom.xml +++ b/powerjob-client/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-client - 3.1.3 + 3.2.0 jar - 3.1.3 + 3.2.0 5.6.1 diff --git a/powerjob-common/pom.xml b/powerjob-common/pom.xml index b4ebcc1f..3e8766b4 100644 --- a/powerjob-common/pom.xml +++ b/powerjob-common/pom.xml @@ -10,7 +10,7 @@ 4.0.0 powerjob-common - 3.1.3 + 3.2.0 jar diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java index 7729b75b..749926db 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/InstanceStatus.java @@ -28,6 +28,8 @@ public enum InstanceStatus { // 广义的运行状态 public static final List generalizedRunningStatus = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v); + // 结束状态 + public static final List finishedStatus = Lists.newArrayList(FAILED.v, SUCCEED.v, STOPPED.v); public static InstanceStatus of(int v) { for (InstanceStatus is : values()) { diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java index b409347e..3fa65ab0 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/RemoteConstant.java @@ -17,6 +17,7 @@ public class RemoteConstant { public static final String Task_TRACKER_ACTOR_NAME = "task_tracker"; public static final String PROCESSOR_TRACKER_ACTOR_NAME = "processor_tracker"; public static final String WORKER_ACTOR_NAME = "worker"; + public static final String TROUBLESHOOTING_ACTOR_NAME = "troubleshooting"; public static final String WORKER_AKKA_CONFIG_NAME = "oms-worker.akka.conf"; @@ -26,6 +27,7 @@ public class RemoteConstant { public static final String SERVER_ACTOR_NAME = "server_actor"; public static final String SERVER_FRIEND_ACTOR_NAME = "friend_actor"; + public static final String SERVER_TROUBLESHOOTING_ACTOR_NAME = "server_troubleshooting_actor"; public static final String SERVER_AKKA_CONFIG_NAME = "oms-server.akka.conf"; diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java index c9ab4eaa..efa4686f 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/model/InstanceDetail.java @@ -7,7 +7,7 @@ import lombok.NoArgsConstructor; import java.util.List; /** - * 任务实例的运行详细信息(对外) + * 任务实例的运行详细信息 * * @author tjq * @since 2020/4/11 @@ -20,7 +20,7 @@ public class InstanceDetail implements OmsSerializable { private Long actualTriggerTime; // 任务整体结束时间(可能不存在) private Long finishedTime; - // 任务状态(中文) + // 任务状态 private Integer status; // 任务执行结果(可能不存在) private String result; @@ -35,13 +35,16 @@ public class InstanceDetail implements OmsSerializable { // 重试次数 private Long runningTimes; + // 扩展字段,中间件升级不易,最好不要再改 common 包了...否则 server worker 版本不兼容 + private String extra; + // 秒级任务的 extra -> List @Data @NoArgsConstructor public static class SubInstanceDetail implements OmsSerializable { private long subInstanceId; - private String startTime; - private String finishedTime; + private Long startTime; + private Long finishedTime; private String result; private int status; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerHeartbeat.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerHeartbeat.java index 831088ea..815988ea 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerHeartbeat.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/WorkerHeartbeat.java @@ -27,6 +27,10 @@ public class WorkerHeartbeat implements OmsSerializable { private long heartbeatTime; // 当前加载的容器(容器名称 -> 容器版本) private List containerInfos; + // worker 版本信息 + private String version; + // 扩展字段 + private String extra; private SystemMetrics systemMetrics; } diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java index ca3b5c2c..ae7a24e2 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/request/http/SaveJobInfoRequest.java @@ -46,8 +46,8 @@ public class SaveJobInfoRequest { /* ************************** 运行时配置 ************************** */ - // 最大同时运行任务数 - private Integer maxInstanceNum = 1; + // 最大同时运行任务数,0 代表不限 + private Integer maxInstanceNum = 0; // 并发度,同时执行的线程数量 private Integer concurrency = 5; // 任务整体超时时间 diff --git a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java index 52277188..4627ee53 100644 --- a/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java +++ b/powerjob-common/src/main/java/com/github/kfcfans/powerjob/common/utils/CommonUtils.java @@ -1,11 +1,12 @@ package com.github.kfcfans.powerjob.common.utils; +import com.github.kfcfans.powerjob.common.OmsConstant; import com.github.kfcfans.powerjob.common.OmsException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DateFormatUtils; import java.util.Collection; -import java.util.Objects; import java.util.function.Supplier; @@ -129,4 +130,20 @@ public class CommonUtils { return obj; } + /** + * 格式化时间,将时间戳转化为可阅读时间 + * @param ts 时间戳 + * @return 可阅读时间 + */ + public static String formatTime(Long ts) { + if (ts == null || ts <= 0) { + return OmsConstant.NONE; + } + try { + return DateFormatUtils.format(ts, OmsConstant.TIME_PATTERN); + }catch (Exception ignore) { + } + return OmsConstant.NONE; + } + } diff --git a/powerjob-server/pom.xml b/powerjob-server/pom.xml index 7f3cdbf5..86e187f4 100644 --- a/powerjob-server/pom.xml +++ b/powerjob-server/pom.xml @@ -10,13 +10,13 @@ 4.0.0 powerjob-server - 3.1.3 + 3.2.0 jar 2.9.2 2.2.6.RELEASE - 3.1.3 + 3.2.0 8.0.19 1.4.200 2.5.2 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java index d173801f..98269a15 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/OhMyServer.java @@ -1,12 +1,12 @@ package com.github.kfcfans.powerjob.server.akka; -import akka.actor.ActorSelection; -import akka.actor.ActorSystem; -import akka.actor.Props; +import akka.actor.*; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.utils.NetUtils; import com.github.kfcfans.powerjob.server.akka.actors.FriendActor; import com.github.kfcfans.powerjob.server.akka.actors.ServerActor; +import com.github.kfcfans.powerjob.server.akka.actors.ServerTroubleshootingActor; import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; @@ -58,9 +58,15 @@ public class OhMyServer { Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); actorSystem = ActorSystem.create(RemoteConstant.SERVER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(ServerActor.class), RemoteConstant.SERVER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ServerActor.class) + .withDispatcher("akka.server-actor-dispatcher") + .withRouter(new RoundRobinPool(Runtime.getRuntime().availableProcessors() * 4)), RemoteConstant.SERVER_ACTOR_NAME); actorSystem.actorOf(Props.create(FriendActor.class), RemoteConstant.SERVER_FRIEND_ACTOR_NAME); + // 处理系统中产生的异常情况 + ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(ServerTroubleshootingActor.class), RemoteConstant.SERVER_TROUBLESHOOTING_ACTOR_NAME); + actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); + log.info("[OhMyServer] OhMyServer's akka system start successfully, using time {}.", stopwatch); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java index fc83f9a2..fdd82196 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerActor.java @@ -63,7 +63,7 @@ public class ServerActor extends AbstractActor { getInstanceManager().updateStatus(req); // 结束状态(成功/失败)需要回复消息 - if (!InstanceStatus.generalizedRunningStatus.contains(req.getInstanceStatus())) { + if (InstanceStatus.finishedStatus.contains(req.getInstanceStatus())) { getSender().tell(AskResponse.succeed(null), getSelf()); } }catch (Exception e) { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerTroubleshootingActor.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerTroubleshootingActor.java new file mode 100644 index 00000000..31ec3ef8 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/akka/actors/ServerTroubleshootingActor.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.powerjob.server.akka.actors; + +import akka.actor.AbstractActor; +import akka.actor.DeadLetter; +import lombok.extern.slf4j.Slf4j; + +/** + * 处理 server 异常信息的 actor + * + * @author tjq + * @since 2020/7/18 + */ +@Slf4j +public class ServerTroubleshootingActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeadLetter.class, this::onReceiveDeadLetter) + .build(); + } + + public void onReceiveDeadLetter(DeadLetter dl) { + log.warn("[ServerTroubleshootingActor] receive DeadLetter: {}", dl); + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/CoreJpaConfig.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/CoreJpaConfig.java index 6e11c2a4..22398920 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/CoreJpaConfig.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/CoreJpaConfig.java @@ -57,7 +57,11 @@ public class CoreJpaConfig { HibernateProperties hibernateProperties = new HibernateProperties(); hibernateProperties.setDdlAuto("update"); - return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()); + + // 配置JPA自定义表名称策略 + hibernateProperties.getNaming().setPhysicalStrategy(PowerJobPhysicalNamingStrategy.class.getName()); + HibernateSettings hibernateSettings = new HibernateSettings(); + return hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), hibernateSettings); } @Primary @@ -78,5 +82,4 @@ public class CoreJpaConfig { public PlatformTransactionManager initCoreTransactionManager(EntityManagerFactoryBuilder builder) { return new JpaTransactionManager(Objects.requireNonNull(initCoreEntityManagerFactory(builder).getObject())); } - } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/PowerJobPhysicalNamingStrategy.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/PowerJobPhysicalNamingStrategy.java new file mode 100644 index 00000000..4eec2266 --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/config/PowerJobPhysicalNamingStrategy.java @@ -0,0 +1,46 @@ +package com.github.kfcfans.powerjob.server.persistence.config; + +import com.github.kfcfans.powerjob.server.common.utils.PropertyUtils; +import org.hibernate.boot.model.naming.Identifier; +import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment; +import org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy; +import org.springframework.util.StringUtils; + +import java.io.Serializable; + +/** + * 自定义表前缀,配置项 oms.table-prefix 不配置时,不增加表前缀。 + * 参考实现:{@link org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy} + *

+ * 1. 继承 PhysicalNamingStrategy 类,实现自定义表前缀; + *

+ *

+ * 2. 修改@Query(nativeQuery = true)和其SQL,用对象名和属性名代替表名和数据库字段名。 + *

+ * + * @author songyinyin + * @since 2020/7/18 + */ +public class PowerJobPhysicalNamingStrategy extends SpringPhysicalNamingStrategy implements Serializable { + + + /** + * 映射物理表名称,如:把实体表 AppInfoDO 的 DO 去掉,再加上表前缀 + * + * @param name 实体名称 + * @param jdbcEnvironment jdbc环境变量 + * @return 映射后的物理表 + */ + @Override + public Identifier toPhysicalTableName(Identifier name, JdbcEnvironment jdbcEnvironment) { + + String tablePrefix = PropertyUtils.getProperties().getProperty("oms.table-prefix"); + + String text = name.getText(); + String noDOText = StringUtils.endsWithIgnoreCase(text, "do") ? text.substring(0, text.length() - 2) : text; + String newText = StringUtils.hasLength(tablePrefix) ? tablePrefix + noDOText : noDOText; + return super.toPhysicalTableName(new Identifier(newText, name.isQuoted()), jdbcEnvironment); + } + + +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java index d018b36e..650c63e7 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/AppInfoDO.java @@ -13,7 +13,7 @@ import java.util.Date; */ @Data @Entity -@Table(name = "app_info", uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})}) +@Table(uniqueConstraints = {@UniqueConstraint(name = "appNameUK", columnNames = {"appName"})}) public class AppInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ContainerInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ContainerInfoDO.java index 3424a4a6..6e46e094 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ContainerInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ContainerInfoDO.java @@ -13,7 +13,7 @@ import java.util.Date; */ @Data @Entity -@Table(name = "container_info", indexes = {@Index(columnList = "appId")}) +@Table(indexes = {@Index(columnList = "appId")}) public class ContainerInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java index f06a4e50..92c83797 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/InstanceInfoDO.java @@ -18,7 +18,7 @@ import java.util.Date; @Entity @NoArgsConstructor @AllArgsConstructor -@Table(name = "instance_info", indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")}) +@Table(indexes = {@Index(columnList = "jobId"), @Index(columnList = "appId"), @Index(columnList = "instanceId")}) public class InstanceInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java index 5d214cf6..7b9ed512 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/JobInfoDO.java @@ -18,7 +18,7 @@ import java.util.Date; @Entity @NoArgsConstructor @AllArgsConstructor -@Table(name = "job_info", indexes = {@Index(columnList = "appId")}) +@Table(indexes = {@Index(columnList = "appId")}) public class JobInfoDO { diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/OmsLockDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/OmsLockDO.java index 03a1f6b7..95949f50 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/OmsLockDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/OmsLockDO.java @@ -15,7 +15,7 @@ import java.util.Date; @Data @Entity @NoArgsConstructor -@Table(name = "oms_lock", uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})}) +@Table(uniqueConstraints = {@UniqueConstraint(name = "lockNameUK", columnNames = {"lockName"})}) public class OmsLockDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ServerInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ServerInfoDO.java index be61cb34..3a6882d4 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ServerInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/ServerInfoDO.java @@ -15,7 +15,7 @@ import java.util.Date; @Data @Entity @NoArgsConstructor -@Table(name = "server_info", uniqueConstraints = {@UniqueConstraint(columnNames = "ip")}) +@Table(uniqueConstraints = {@UniqueConstraint(columnNames = "ip")}) public class ServerInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java index 883a223f..bed8a473 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/UserInfoDO.java @@ -13,7 +13,7 @@ import java.util.Date; */ @Data @Entity -@Table(name = "user_info") +@Table public class UserInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java index 606efad7..f87f2f99 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInfoDO.java @@ -17,7 +17,7 @@ import java.util.Date; @Entity @NoArgsConstructor @AllArgsConstructor -@Table(name = "workflow_info", indexes = {@Index(columnList = "appId")}) +@Table(indexes = {@Index(columnList = "appId")}) public class WorkflowInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java index fca69561..ff9d5651 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/model/WorkflowInstanceInfoDO.java @@ -17,7 +17,7 @@ import java.util.Date; @Entity @NoArgsConstructor @AllArgsConstructor -@Table(name = "workflow_instance_info") +@Table public class WorkflowInstanceInfoDO { @Id diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java index e5ef8513..f65fb299 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/InstanceInfoRepository.java @@ -37,19 +37,19 @@ public interface InstanceInfoRepository extends JpaRepository findByJobIdInAndStatusIn(List jobIds, List status); // 删除历史数据,JPA自带的删除居然是根据ID循环删,2000条数据删了几秒,也太拉垮了吧... // 结果只能用 int 接收 @Modifying @Transactional - @Query(value = "delete from instance_info where gmt_modified < ?1", nativeQuery = true) + @Query(value = "delete from InstanceInfoDO where gmtModified < ?1") int deleteAllByGmtModifiedBefore(Date time); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java index 3dbd2254..1e033282 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/JobInfoRepository.java @@ -20,7 +20,7 @@ public interface JobInfoRepository extends JpaRepository { // 调度专用 List findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(List appIds, int status, int timeExpressionType, long time); - @Query(value = "select id from job_info where app_id in ?1 and status = ?2 and time_expression_type in ?3", nativeQuery = true) + @Query(value = "select id from JobInfoDO where appId in ?1 and status = ?2 and timeExpressionType in ?3") List findByAppIdInAndStatusAndTimeExpressionTypeIn(List appIds, int status, List timeTypes); Page findByAppIdAndStatusNot(Long appId, int status, Pageable pageable); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/OmsLockRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/OmsLockRepository.java index 1a2ce112..3f247323 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/OmsLockRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/OmsLockRepository.java @@ -17,7 +17,7 @@ public interface OmsLockRepository extends JpaRepository { @Modifying @Transactional - @Query(value = "delete from oms_lock where lock_name = ?1", nativeQuery = true) + @Query(value = "delete from OmsLockDO where lockName = ?1") int deleteByLockName(String lockName); OmsLockDO findByLockName(String lockName); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java index 04910339..809bfdfc 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/persistence/core/repository/WorkflowInstanceInfoRepository.java @@ -24,7 +24,7 @@ public interface WorkflowInstanceInfoRepository extends JpaRepository status); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java index 7f5200a8..13281259 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/DispatchService.java @@ -68,16 +68,20 @@ public class DispatchService { // 查询当前运行的实例数 long current = System.currentTimeMillis(); - long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, generalizedRunningStatus); - // 超出最大同时运行限制,不执行调度 - if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { - String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); - log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); - instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + // 0 代表不限制在线任务,还能省去一次 DB 查询 + if (jobInfo.getMaxInstanceNum() > 0) { - instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); - return; + long runningInstanceCount = instanceInfoRepository.countByJobIdAndStatusIn(jobId, Lists.newArrayList(WAITING_WORKER_RECEIVE.getV(), RUNNING.getV())); + // 超出最大同时运行限制,不执行调度 + if (runningInstanceCount > jobInfo.getMaxInstanceNum()) { + String result = String.format(SystemInstanceResult.TOO_MUCH_INSTANCE, runningInstanceCount, jobInfo.getMaxInstanceNum()); + log.warn("[Dispatcher-{}|{}] cancel dispatch job due to too much instance(num={}) is running.", jobId, instanceId, runningInstanceCount); + instanceInfoRepository.update4TriggerFailed(instanceId, FAILED.getV(), currentRunningTimes, current, current, RemoteConstant.EMPTY_ADDRESS, result, dbInstanceParams, now); + + instanceManager.processFinishedInstance(instanceId, wfInstanceId, FAILED, result); + return; + } } // 获取当前所有可用的Worker diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java index 526e4ac6..c152a321 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/instance/InstanceService.java @@ -194,7 +194,7 @@ public class InstanceService { } }catch (Exception e) { - log.error("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString()); + log.warn("[Instance-{}] ask InstanceStatus from TaskTracker failed, exception is {}", instanceId, e.toString()); } // 失败则返回基础版信息 diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java index 781dd5e5..da429f08 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/timing/CleanService.java @@ -35,17 +35,14 @@ public class CleanService { @Resource private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; - @Value("${oms.log.retention.local}") - private int localLogRetentionDay; - @Value("${oms.log.retention.remote}") - private int remoteLogRetentionDay; + @Value("${oms.instanceinfo.retention}") + private int instanceInfoRetentionDay; + @Value("${oms.container.retention.local}") private int localContainerRetentionDay; @Value("${oms.container.retention.remote}") private int remoteContainerRetentionDay; - @Value("${oms.instanceinfo.retention}") - private int instanceInfoRetentionDay; private static final int TEMPORARY_RETENTION_DAY = 3; @@ -65,12 +62,12 @@ public class CleanService { cleanWorkflowInstanceLog(); // 释放磁盘空间 - cleanLocal(OmsFileUtils.genLogDirPath(), localLogRetentionDay); + cleanLocal(OmsFileUtils.genLogDirPath(), instanceInfoRetentionDay); cleanLocal(OmsFileUtils.genContainerJarPath(), localContainerRetentionDay); cleanLocal(OmsFileUtils.genTemporaryPath(), TEMPORARY_RETENTION_DAY); // 删除 GridFS 过期文件 - cleanRemote(GridFsManager.LOG_BUCKET, remoteLogRetentionDay); + cleanRemote(GridFsManager.LOG_BUCKET, instanceInfoRetentionDay); cleanRemote(GridFsManager.CONTAINER_BUCKET, remoteContainerRetentionDay); } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java index f574d95e..08a20494 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/service/workflow/WorkflowInstanceService.java @@ -48,12 +48,16 @@ public class WorkflowInstanceService { // 停止所有已启动且未完成的服务 PEWorkflowDAG workflowDAG = JSONObject.parseObject(wfInstance.getDag(), PEWorkflowDAG.class); WorkflowDAGUtils.listRoots(workflowDAG).forEach(node -> { - if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) { - log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); - node.setStatus(InstanceStatus.STOPPED.getV()); - node.setResult(SystemInstanceResult.STOPPED_BY_USER); + try { + if (node.getInstanceId() != null && InstanceStatus.generalizedRunningStatus.contains(node.getStatus())) { + log.debug("[WfInstance-{}] instance({}) is running, try to stop it now.", wfInstanceId, node.getInstanceId()); + node.setStatus(InstanceStatus.STOPPED.getV()); + node.setResult(SystemInstanceResult.STOPPED_BY_USER); - instanceService.stopInstance(node.getInstanceId()); + instanceService.stopInstance(node.getInstanceId()); + } + }catch (Exception e) { + log.warn("[WfInstance-{}] stop instance({}) failed.", wfInstanceId, JSONObject.toJSONString(node), e); } }); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java index d8693f46..1b83499a 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/WebLogAspect.java @@ -8,9 +8,11 @@ import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; +import org.springframework.core.io.Resource; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; +import org.springframework.web.multipart.MultipartFile; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -86,6 +88,11 @@ public class WebLogAspect { if (obj instanceof HttpServletRequest || obj instanceof HttpServletResponse) { break; } + // FatJar + if (obj instanceof MultipartFile || obj instanceof Resource) { + break; + } + objList.add(obj); } return JSONObject.toJSONString(objList); diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java index 537710e8..500a2ab5 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/InstanceController.java @@ -1,7 +1,6 @@ package com.github.kfcfans.powerjob.server.web.controller; import com.github.kfcfans.powerjob.common.InstanceStatus; -import com.github.kfcfans.powerjob.common.model.InstanceDetail; import com.github.kfcfans.powerjob.common.response.ResultDTO; import com.github.kfcfans.powerjob.server.akka.OhMyServer; import com.github.kfcfans.powerjob.server.common.utils.OmsFileUtils; @@ -15,6 +14,7 @@ import com.github.kfcfans.powerjob.server.service.CacheService; import com.github.kfcfans.powerjob.server.service.InstanceLogService; import com.github.kfcfans.powerjob.server.service.instance.InstanceService; import com.github.kfcfans.powerjob.server.web.request.QueryInstanceRequest; +import com.github.kfcfans.powerjob.server.web.response.InstanceDetailVO; import com.github.kfcfans.powerjob.server.web.response.InstanceInfoVO; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Value; @@ -65,8 +65,8 @@ public class InstanceController { } @GetMapping("/detail") - public ResultDTO getInstanceDetail(String instanceId) { - return ResultDTO.success(instanceService.getInstanceDetail(Long.valueOf(instanceId))); + public ResultDTO getInstanceDetail(String instanceId) { + return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(Long.valueOf(instanceId)))); } @GetMapping("/log") diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java index c8afcab9..9a0af481 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/controller/JobController.java @@ -1,11 +1,7 @@ package com.github.kfcfans.powerjob.server.web.controller; -import com.github.kfcfans.powerjob.common.ExecuteType; -import com.github.kfcfans.powerjob.common.ProcessorType; -import com.github.kfcfans.powerjob.common.TimeExpressionType; import com.github.kfcfans.powerjob.common.request.http.SaveJobInfoRequest; import com.github.kfcfans.powerjob.common.response.ResultDTO; -import com.github.kfcfans.powerjob.server.common.SJ; import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; import com.github.kfcfans.powerjob.server.persistence.PageResult; import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; @@ -15,7 +11,6 @@ import com.github.kfcfans.powerjob.server.web.request.QueryJobInfoRequest; import com.github.kfcfans.powerjob.server.web.response.JobInfoVO; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeanUtils; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Sort; @@ -90,7 +85,7 @@ public class JobController { if (jobInfoOpt.isPresent()) { result.setTotalItems(1); result.setTotalPages(1); - result.setData(Lists.newArrayList(convert(jobInfoOpt.get()))); + result.setData(Lists.newArrayList(JobInfoVO.from(jobInfoOpt.get()))); }else { result.setTotalPages(0); result.setTotalItems(0); @@ -108,34 +103,11 @@ public class JobController { private static PageResult convertPage(Page jobInfoPage) { - List jobInfoVOList = jobInfoPage.getContent().stream().map(JobController::convert).collect(Collectors.toList()); + List jobInfoVOList = jobInfoPage.getContent().stream().map(JobInfoVO::from).collect(Collectors.toList()); PageResult pageResult = new PageResult<>(jobInfoPage); pageResult.setData(jobInfoVOList); return pageResult; } - private static JobInfoVO convert(JobInfoDO jobInfoDO) { - JobInfoVO jobInfoVO = new JobInfoVO(); - BeanUtils.copyProperties(jobInfoDO, jobInfoVO); - - TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); - ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType()); - ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType()); - - jobInfoVO.setTimeExpressionType(timeExpressionType.name()); - jobInfoVO.setExecuteType(executeType.name()); - jobInfoVO.setProcessorType(processorType.name()); - jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV()); - - if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { - jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); - }else { - jobInfoVO.setNotifyUserIds(Lists.newLinkedList()); - } - - return jobInfoVO; - } - - } diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java new file mode 100644 index 00000000..ad61500c --- /dev/null +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/InstanceDetailVO.java @@ -0,0 +1,98 @@ +package com.github.kfcfans.powerjob.server.web.response; + +import com.github.kfcfans.powerjob.common.OmsSerializable; +import com.github.kfcfans.powerjob.common.model.InstanceDetail; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.google.common.collect.Lists; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.beans.BeanUtils; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +/** + * 任务实例的运行详细信息(对外展示对象) + * 注意:日期的格式化全部需要在 server 完成,不能在浏览器完成,否则会有时区问题(当 server 与 browser 时区不一致时显示会有问题) + * + * @author tjq + * @since 2020/7/18 + */ +@Data +@NoArgsConstructor +public class InstanceDetailVO { + + // 任务整体开始时间 + private String actualTriggerTime; + // 任务整体结束时间(可能不存在) + private String finishedTime; + // 任务状态 + private Integer status; + // 任务执行结果(可能不存在) + private String result; + // TaskTracker地址 + private String taskTrackerAddress; + + // MR或BD任务专用 + private InstanceDetailVO.TaskDetail taskDetail; + // 秒级任务专用 + private List subInstanceDetails; + + // 重试次数 + private Long runningTimes; + + // 秒级任务的 extra -> List + @Data + @NoArgsConstructor + public static class SubInstanceDetail implements OmsSerializable { + private long subInstanceId; + private String startTime; + private String finishedTime; + private String result; + private int status; + } + + // MapReduce 和 Broadcast 任务的 extra -> + @Data + @NoArgsConstructor + public static class TaskDetail implements OmsSerializable { + private long totalTaskNum; + private long succeedTaskNum; + private long failedTaskNum; + } + + public static InstanceDetailVO from(InstanceDetail origin) { + InstanceDetailVO vo = new InstanceDetailVO(); + BeanUtils.copyProperties(origin, vo); + + // 格式化时间 + vo.setFinishedTime(CommonUtils.formatTime(origin.getFinishedTime())); + vo.setActualTriggerTime(CommonUtils.formatTime(origin.getActualTriggerTime())); + + // 拷贝 TaskDetail + if (origin.getTaskDetail() != null) { + TaskDetail voDetail = new TaskDetail(); + BeanUtils.copyProperties(origin.getTaskDetail(), voDetail); + vo.setTaskDetail(voDetail); + } + + // 拷贝秒级任务数据 + if (!CollectionUtils.isEmpty(origin.getSubInstanceDetails())) { + vo.subInstanceDetails = Lists.newLinkedList(); + + origin.getSubInstanceDetails().forEach(subDetail -> { + + SubInstanceDetail voSubDetail = new SubInstanceDetail(); + BeanUtils.copyProperties(subDetail, voSubDetail); + + // 格式化时间 + voSubDetail.setStartTime(CommonUtils.formatTime(subDetail.getStartTime())); + voSubDetail.setFinishedTime(CommonUtils.formatTime(subDetail.getFinishedTime())); + + vo.subInstanceDetails.add(voSubDetail); + }); + } + + return vo; + } +} diff --git a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java index 5d384f86..7145ae26 100644 --- a/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java +++ b/powerjob-server/src/main/java/com/github/kfcfans/powerjob/server/web/response/JobInfoVO.java @@ -1,6 +1,16 @@ package com.github.kfcfans.powerjob.server.web.response; +import com.github.kfcfans.powerjob.common.ExecuteType; +import com.github.kfcfans.powerjob.common.ProcessorType; +import com.github.kfcfans.powerjob.common.TimeExpressionType; +import com.github.kfcfans.powerjob.common.utils.CommonUtils; +import com.github.kfcfans.powerjob.server.common.SJ; +import com.github.kfcfans.powerjob.server.common.constans.SwitchableStatus; +import com.github.kfcfans.powerjob.server.persistence.core.model.JobInfoDO; +import com.google.common.collect.Lists; import lombok.Data; +import org.springframework.beans.BeanUtils; +import org.springframework.util.StringUtils; import java.util.Date; import java.util.List; @@ -56,6 +66,8 @@ public class JobInfoVO { private boolean enable; // 下一次调度时间 private Long nextTriggerTime; + // 下一次调度时间(文字版) + private String nextTriggerTimeStr; /* ************************** 繁忙机器配置 ************************** */ // 最低CPU核心数量,0代表不限 @@ -76,4 +88,27 @@ public class JobInfoVO { // 报警用户ID列表 private List notifyUserIds; + + public static JobInfoVO from(JobInfoDO jobInfoDO) { + JobInfoVO jobInfoVO = new JobInfoVO(); + BeanUtils.copyProperties(jobInfoDO, jobInfoVO); + + TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoDO.getTimeExpressionType()); + ExecuteType executeType = ExecuteType.of(jobInfoDO.getExecuteType()); + ProcessorType processorType = ProcessorType.of(jobInfoDO.getProcessorType()); + + jobInfoVO.setTimeExpressionType(timeExpressionType.name()); + jobInfoVO.setExecuteType(executeType.name()); + jobInfoVO.setProcessorType(processorType.name()); + jobInfoVO.setEnable(jobInfoDO.getStatus() == SwitchableStatus.ENABLE.getV()); + + if (!StringUtils.isEmpty(jobInfoDO.getNotifyUserIds())) { + jobInfoVO.setNotifyUserIds(SJ.commaSplitter.splitToList(jobInfoDO.getNotifyUserIds())); + }else { + jobInfoVO.setNotifyUserIds(Lists.newLinkedList()); + } + jobInfoVO.setNextTriggerTimeStr(CommonUtils.formatTime(jobInfoDO.getNextTriggerTime())); + + return jobInfoVO; + } } diff --git a/powerjob-server/src/main/resources/application-daily.properties b/powerjob-server/src/main/resources/application-daily.properties index 87036311..5d3a6146 100644 --- a/powerjob-server/src/main/resources/application-daily.properties +++ b/powerjob-server/src/main/resources/application-daily.properties @@ -3,7 +3,7 @@ logging.config=classpath:logback-dev.xml ####### 数据库配置 ####### spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver -spring.datasource.core.jdbc-url=jdbc:mysql://remotehost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 +spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8 spring.datasource.core.username=root spring.datasource.core.password=No1Bug2Please3! spring.datasource.core.hikari.maximum-pool-size=20 @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=1 -oms.log.retention.remote=1 +oms.instanceinfo.retention=1 oms.container.retention.local=1 oms.container.retention.remote=-1 -oms.instanceinfo.retention=1 ####### 缓存配置 ####### oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-pre.properties b/powerjob-server/src/main/resources/application-pre.properties index 5e2291d8..f2d33b88 100644 --- a/powerjob-server/src/main/resources/application-pre.properties +++ b/powerjob-server/src/main/resources/application-pre.properties @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=3 -oms.log.retention.remote=3 +oms.instanceinfo.retention=3 oms.container.retention.local=3 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 ####### 缓存配置 ####### oms.instance.metadata.cache.size=1024 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application-product.properties b/powerjob-server/src/main/resources/application-product.properties index 8df29624..0a4e2556 100644 --- a/powerjob-server/src/main/resources/application-product.properties +++ b/powerjob-server/src/main/resources/application-product.properties @@ -21,11 +21,9 @@ spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true ####### 资源清理配置 ####### -oms.log.retention.local=7 -oms.log.retention.remote=7 +oms.instanceinfo.retention=7 oms.container.retention.local=7 oms.container.retention.remote=-1 -oms.instanceinfo.retention=3 ####### 缓存配置 ####### oms.instance.metadata.cache.size=2048 \ No newline at end of file diff --git a/powerjob-server/src/main/resources/application.properties b/powerjob-server/src/main/resources/application.properties index b068bee6..1078fa1d 100644 --- a/powerjob-server/src/main/resources/application.properties +++ b/powerjob-server/src/main/resources/application.properties @@ -11,8 +11,10 @@ spring.servlet.multipart.file-size-threshold=0 spring.servlet.multipart.max-file-size=209715200 spring.servlet.multipart.max-request-size=209715200 -###### OhMyScheduler 自身配置(该配置只允许存在于 application.properties 文件中) ###### +###### PowerJob 自身配置(该配置只允许存在于 application.properties 文件中) ###### # akka ActorSystem 服务端口 oms.akka.port=10086 # 报警服务 bean名称 -oms.alarm.bean.names=omsDefaultMailAlarmService \ No newline at end of file +oms.alarm.bean.names=omsDefaultMailAlarmService +# 表前缀(默认无表前缀,有需求直接填入表前缀即可,比如 pj_ ) +oms.table-prefix= \ No newline at end of file diff --git a/powerjob-server/src/main/resources/oms-server.akka.conf b/powerjob-server/src/main/resources/oms-server.akka.conf index fe6ab702..34cd48bc 100644 --- a/powerjob-server/src/main/resources/oms-server.akka.conf +++ b/powerjob-server/src/main/resources/oms-server.akka.conf @@ -20,4 +20,24 @@ akka { canonical.port = 0 } } + + server-actor-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 128 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 10 + } } \ No newline at end of file diff --git a/powerjob-server/src/main/resources/static/js/1.js b/powerjob-server/src/main/resources/static/js/1.js index 48e4e67d..eb98814f 100644 --- a/powerjob-server/src/main/resources/static/js/1.js +++ b/powerjob-server/src/main/resources/static/js/1.js @@ -20,7 +20,7 @@ eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n/ /***/ (function(module, __webpack_exports__, __webpack_require__) { "use strict"; -eval("__webpack_require__.r(__webpack_exports__);\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"render\", function() { return render; });\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"staticRenderFns\", function() { return staticRenderFns; });\nvar render = function() {\n var _vm = this\n var _h = _vm.$createElement\n var _c = _vm._self._c || _h\n return _c(\n \"div\",\n [\n _c(\n \"el-row\",\n [\n _c(\n \"el-col\",\n { attrs: { offset: 20 } },\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.fetchInstanceDetail }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.refresh\")))]\n )\n ],\n 1\n )\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.instanceId\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceId))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n { staticStyle: { \"margin-top\": \"-20px\" } },\n [\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.status\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\n _vm._s(\n this.common.translateInstanceStatus(_vm.instanceDetail.status)\n )\n )\n ])\n ]),\n _c(\"el-col\", { attrs: { span: 16 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.runningTimes\")) + \":\"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceDetail.runningTimes))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.taskTrackerAddress\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\" \" + _vm._s(_vm.instanceDetail.taskTrackerAddress))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.startTime\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\n \" \" +\n _vm._s(\n this.common.timestamp2Str(\n _vm.instanceDetail.actualTriggerTime\n )\n )\n )\n ])\n ]),\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.finishedTime\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\n _vm._s(\n this.common.timestamp2Str(_vm.instanceDetail.finishedTime)\n )\n )\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.result\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\" \" + _vm._s(_vm.instanceDetail.result))\n ])\n ])\n ],\n 1\n ),\n _c(\"el-row\", { attrs: { id: \"taskDetail\" } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.subTaskInfo\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceDetail.taskDetail))\n ])\n ]),\n _c(\n \"el-row\",\n [\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.$t(\"message.secondlyJobHistory\")) + \":\")\n ]),\n _c(\n \"el-table\",\n {\n staticStyle: { width: \"100%\" },\n attrs: { data: _vm.instanceDetail.subInstanceDetails }\n },\n [\n _c(\"el-table-column\", {\n attrs: {\n prop: \"subInstanceId\",\n label: _vm.$t(\"message.subInstanceId\"),\n width: \"120\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: {\n prop: \"startTime\",\n label: _vm.$t(\"message.startTime\"),\n width: \"160\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: {\n prop: \"finishedTime\",\n label: _vm.$t(\"message.finishedTime\"),\n width: \"160\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.status\"), width: \"160\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _vm._v(\n \" \" +\n _vm._s(\n _vm.common.translateInstanceStatus(\n scope.row.status\n )\n ) +\n \" \"\n )\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { prop: \"result\", label: _vm.$t(\"message.result\") }\n })\n ],\n 1\n )\n ],\n 1\n )\n ],\n 1\n )\n}\nvar staticRenderFns = []\nrender._withStripped = true\n\n\n\n//# sourceURL=webpack:///./src/components/common/InstanceDetail.vue?./node_modules/cache-loader/dist/cjs.js?%7B%22cacheDirectory%22:%22node_modules/.cache/vue-loader%22,%22cacheIdentifier%22:%2241f1f4da-vue-loader-template%22%7D!./node_modules/vue-loader/lib/loaders/templateLoader.js??vue-loader-options!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); +eval("__webpack_require__.r(__webpack_exports__);\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"render\", function() { return render; });\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"staticRenderFns\", function() { return staticRenderFns; });\nvar render = function() {\n var _vm = this\n var _h = _vm.$createElement\n var _c = _vm._self._c || _h\n return _c(\n \"div\",\n [\n _c(\n \"el-row\",\n [\n _c(\n \"el-col\",\n { attrs: { offset: 20 } },\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.fetchInstanceDetail }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.refresh\")))]\n )\n ],\n 1\n )\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.instanceId\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceId))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n { staticStyle: { \"margin-top\": \"-20px\" } },\n [\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.status\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\n _vm._s(\n this.common.translateInstanceStatus(_vm.instanceDetail.status)\n )\n )\n ])\n ]),\n _c(\"el-col\", { attrs: { span: 16 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.runningTimes\")) + \":\"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceDetail.runningTimes))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.taskTrackerAddress\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\" \" + _vm._s(_vm.instanceDetail.taskTrackerAddress))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.startTime\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\" \" + _vm._s(_vm.instanceDetail.actualTriggerTime))\n ])\n ]),\n _c(\"el-col\", { attrs: { span: 8 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.finishedTime\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceDetail.finishedTime))\n ])\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-col\", { attrs: { span: 24 } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.result\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(\" \" + _vm._s(_vm.instanceDetail.result))\n ])\n ])\n ],\n 1\n ),\n _c(\"el-row\", { attrs: { id: \"taskDetail\" } }, [\n _vm._v(\" \" + _vm._s(_vm.$t(\"message.subTaskInfo\")) + \": \"),\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.instanceDetail.taskDetail))\n ])\n ]),\n _c(\n \"el-row\",\n [\n _c(\"span\", { staticClass: \"title\" }, [\n _vm._v(_vm._s(_vm.$t(\"message.secondlyJobHistory\")) + \":\")\n ]),\n _c(\n \"el-table\",\n {\n staticStyle: { width: \"100%\" },\n attrs: { data: _vm.instanceDetail.subInstanceDetails }\n },\n [\n _c(\"el-table-column\", {\n attrs: {\n prop: \"subInstanceId\",\n label: _vm.$t(\"message.subInstanceId\"),\n width: \"120\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: {\n prop: \"startTime\",\n label: _vm.$t(\"message.startTime\"),\n width: \"160\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: {\n prop: \"finishedTime\",\n label: _vm.$t(\"message.finishedTime\"),\n width: \"160\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.status\"), width: \"160\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _vm._v(\n \" \" +\n _vm._s(\n _vm.common.translateInstanceStatus(\n scope.row.status\n )\n ) +\n \" \"\n )\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { prop: \"result\", label: _vm.$t(\"message.result\") }\n })\n ],\n 1\n )\n ],\n 1\n )\n ],\n 1\n )\n}\nvar staticRenderFns = []\nrender._withStripped = true\n\n\n\n//# sourceURL=webpack:///./src/components/common/InstanceDetail.vue?./node_modules/cache-loader/dist/cjs.js?%7B%22cacheDirectory%22:%22node_modules/.cache/vue-loader%22,%22cacheIdentifier%22:%2241f1f4da-vue-loader-template%22%7D!./node_modules/vue-loader/lib/loaders/templateLoader.js??vue-loader-options!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); /***/ }), diff --git a/powerjob-server/src/main/resources/static/js/10.js b/powerjob-server/src/main/resources/static/js/10.js index 2764670c..0802fc5b 100644 --- a/powerjob-server/src/main/resources/static/js/10.js +++ b/powerjob-server/src/main/resources/static/js/10.js @@ -8,7 +8,7 @@ /***/ (function(module, __webpack_exports__, __webpack_require__) { "use strict"; -eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"JobManager\",\n data: function data() {\n return {\n modifiedJobFormVisible: false,\n // 新建任务对象\n modifiedJobForm: {\n id: undefined,\n jobName: \"\",\n jobDescription: \"\",\n appId: this.$store.state.appInfo.id,\n jobParams: \"\",\n timeExpressionType: \"\",\n timeExpression: \"\",\n executeType: \"\",\n processorType: \"\",\n processorInfo: \"\",\n maxInstanceNum: 1,\n concurrency: 5,\n instanceTimeLimit: 0,\n instanceRetryNum: 0,\n taskRetryNum: 1,\n minCpuCores: 0,\n minMemorySpace: 0,\n minDiskSpace: 0,\n enable: true,\n designatedWorkers: \"\",\n maxWorkerCount: 0,\n notifyUserIds: []\n },\n // 任务查询请求对象\n jobQueryContent: {\n appId: this.$store.state.appInfo.id,\n index: 0,\n pageSize: 10,\n jobId: undefined,\n keyword: undefined\n },\n // 任务列表(查询结果),包含index、pageSize、totalPages、totalItems、data(List类型)\n jobInfoPageResult: {\n pageSize: 10,\n totalItems: 0,\n data: []\n },\n // 时间表达式选择类型\n timeExpressionTypeOptions: [{\n key: \"API\",\n label: \"API\"\n }, {\n key: \"CRON\",\n label: \"CRON\"\n }, {\n key: \"FIX_RATE\",\n label: this.$t('message.fixRate')\n }, {\n key: \"FIX_DELAY\",\n label: this.$t('message.fixDelay')\n }, {\n key: \"WORKFLOW\",\n label: this.$t('message.workflow')\n }],\n // 处理器类型\n processorTypeOptions: [{\n key: \"EMBEDDED_JAVA\",\n label: \"JAVA\"\n }, {\n key: \"JAVA_CONTAINER\",\n label: this.$t('message.javaContainer')\n }, {\n key: \"SHELL\",\n label: \"SHELL\"\n }, {\n key: \"PYTHON\",\n label: \"PYTHON\"\n }],\n // 执行方式类型\n executeTypeOptions: [{\n key: \"STANDALONE\",\n label: this.$t('message.standalone')\n }, {\n key: \"BROADCAST\",\n label: this.$t('message.broadcast')\n }, {\n key: \"MAP\",\n label: this.$t('message.map')\n }, {\n key: \"MAP_REDUCE\",\n label: this.$t('message.mapReduce')\n }],\n // 用户列表\n userList: []\n };\n },\n methods: {\n // 保存变更,包括新增和修改\n saveJob: function saveJob() {\n var _this = this;\n\n var that = this;\n this.axios.post(\"/job/save\", this.modifiedJobForm).then(function () {\n that.modifiedJobFormVisible = false;\n that.$message.success(_this.$t('message.success')); // 重新加载数据\n\n that.listJobInfos();\n }, function () {\n return that.modifiedJobFormVisible = false;\n });\n },\n // 列出符合当前搜索条件的任务\n listJobInfos: function listJobInfos() {\n var that = this;\n this.axios.post(\"/job/list\", this.jobQueryContent).then(function (res) {\n that.jobInfoPageResult = res;\n });\n },\n // 修改任务状态\n changeJobStatus: function changeJobStatus(data) {\n // switch 会自动更改 enable 的值\n var that = this;\n\n if (data.enable === false) {\n // 仅有,有特殊逻辑(关闭秒级任务),走单独接口\n that.axios.get(\"/job/disable?jobId=\" + data.id).then(function () {\n return that.listJobInfos();\n });\n } else {\n // 启用,则发起正常的保存操作\n this.modifiedJobForm = data;\n this.saveJob();\n }\n },\n // 新增任务,去除旧数据\n onClickNewJob: function onClickNewJob() {\n this.modifiedJobForm.id = undefined;\n this.modifiedJobForm.jobName = undefined;\n this.modifiedJobForm.jobDescription = undefined;\n this.modifiedJobForm.jobParams = undefined;\n this.modifiedJobForm.timeExpression = undefined;\n this.modifiedJobForm.timeExpressionType = undefined;\n this.modifiedJobForm.processorInfo = undefined;\n this.modifiedJobForm.processorType = undefined;\n this.modifiedJobForm.executeType = undefined;\n this.modifiedJobFormVisible = true;\n },\n // 点击 编辑按钮\n onClickModify: function onClickModify(data) {\n // 修复点击编辑后再点击新增 行数据被清空 的问题\n this.modifiedJobForm = JSON.parse(JSON.stringify(data));\n this.modifiedJobFormVisible = true;\n },\n // 点击 立即运行按钮\n onClickRun: function onClickRun(data) {\n var _this2 = this;\n\n var that = this;\n var url = \"/job/run?jobId=\" + data.id;\n this.axios.get(url).then(function () {\n return that.$message.success(_this2.$t('message.success'));\n });\n },\n // 点击 删除任务\n onClickDeleteJob: function onClickDeleteJob(data) {\n var _this3 = this;\n\n var that = this;\n var url = \"/job/delete?jobId=\" + data.id;\n this.axios.get(url).then(function () {\n that.$message.success(_this3.$t('message.success'));\n that.listJobInfos();\n });\n },\n // 点击 换页\n onClickChangePage: function onClickChangePage(index) {\n // 后端从0开始,前端从1开始\n this.jobQueryContent.index = index - 1;\n this.listJobInfos();\n },\n // 点击重置按钮\n onClickReset: function onClickReset() {\n this.jobQueryContent.keyword = undefined;\n this.jobQueryContent.jobId = undefined;\n this.listJobInfos();\n },\n verifyPlaceholder: function verifyPlaceholder(processorType) {\n var res;\n\n switch (processorType) {\n case \"EMBEDDED_JAVA\":\n res = this.$t('message.javaProcessorInfoPLH');\n break;\n\n case \"JAVA_CONTAINER\":\n res = this.$t('message.containerProcessorInfoPLH');\n break;\n\n case \"SHELL\":\n res = this.$t('message.shellProcessorInfoPLH');\n break;\n\n case \"PYTHON\":\n res = this.$t('message.pythonProcessorInfoPLH');\n }\n\n return res;\n },\n // 翻译执行类型\n translateExecuteType: function translateExecuteType(executeType) {\n switch (executeType) {\n case \"STANDALONE\":\n return this.$t('message.standalone');\n\n case \"BROADCAST\":\n return this.$t('message.broadcast');\n\n case \"MAP_REDUCE\":\n return this.$t('message.mapReduce');\n\n case \"MAP\":\n return this.$t('message.map');\n\n default:\n return \"UNKNOWN\";\n }\n },\n // 翻译处理器类型\n translateProcessorType: function translateProcessorType(processorType) {\n if (processorType === \"JAVA_CONTAINER\") {\n return this.$t('message.javaContainer');\n }\n\n return processorType;\n }\n },\n mounted: function mounted() {\n // 加载用户信息\n var that = this;\n that.axios.get(\"/user/list\").then(function (res) {\n return that.userList = res;\n }); // 加载任务信息\n\n this.listJobInfos();\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/JobManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); +eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"JobManager\",\n data: function data() {\n return {\n modifiedJobFormVisible: false,\n // 新建任务对象\n modifiedJobForm: {\n id: undefined,\n jobName: \"\",\n jobDescription: \"\",\n appId: this.$store.state.appInfo.id,\n jobParams: \"\",\n timeExpressionType: \"\",\n timeExpression: \"\",\n executeType: \"\",\n processorType: \"\",\n processorInfo: \"\",\n maxInstanceNum: 0,\n concurrency: 5,\n instanceTimeLimit: 0,\n instanceRetryNum: 0,\n taskRetryNum: 1,\n minCpuCores: 0,\n minMemorySpace: 0,\n minDiskSpace: 0,\n enable: true,\n designatedWorkers: \"\",\n maxWorkerCount: 0,\n notifyUserIds: []\n },\n // 任务查询请求对象\n jobQueryContent: {\n appId: this.$store.state.appInfo.id,\n index: 0,\n pageSize: 10,\n jobId: undefined,\n keyword: undefined\n },\n // 任务列表(查询结果),包含index、pageSize、totalPages、totalItems、data(List类型)\n jobInfoPageResult: {\n pageSize: 10,\n totalItems: 0,\n data: []\n },\n // 时间表达式选择类型\n timeExpressionTypeOptions: [{\n key: \"API\",\n label: \"API\"\n }, {\n key: \"CRON\",\n label: \"CRON\"\n }, {\n key: \"FIX_RATE\",\n label: this.$t('message.fixRate')\n }, {\n key: \"FIX_DELAY\",\n label: this.$t('message.fixDelay')\n }, {\n key: \"WORKFLOW\",\n label: this.$t('message.workflow')\n }],\n // 处理器类型\n processorTypeOptions: [{\n key: \"EMBEDDED_JAVA\",\n label: \"JAVA\"\n }, {\n key: \"JAVA_CONTAINER\",\n label: this.$t('message.javaContainer')\n }, {\n key: \"SHELL\",\n label: \"SHELL\"\n }, {\n key: \"PYTHON\",\n label: \"PYTHON\"\n }],\n // 执行方式类型\n executeTypeOptions: [{\n key: \"STANDALONE\",\n label: this.$t('message.standalone')\n }, {\n key: \"BROADCAST\",\n label: this.$t('message.broadcast')\n }, {\n key: \"MAP\",\n label: this.$t('message.map')\n }, {\n key: \"MAP_REDUCE\",\n label: this.$t('message.mapReduce')\n }],\n // 用户列表\n userList: []\n };\n },\n methods: {\n // 保存变更,包括新增和修改\n saveJob: function saveJob() {\n var _this = this;\n\n var that = this;\n this.axios.post(\"/job/save\", this.modifiedJobForm).then(function () {\n that.modifiedJobFormVisible = false;\n that.$message.success(_this.$t('message.success')); // 重新加载数据\n\n that.listJobInfos();\n }, function () {\n return that.modifiedJobFormVisible = false;\n });\n },\n // 列出符合当前搜索条件的任务\n listJobInfos: function listJobInfos() {\n var that = this;\n this.axios.post(\"/job/list\", this.jobQueryContent).then(function (res) {\n that.jobInfoPageResult = res;\n });\n },\n // 修改任务状态\n changeJobStatus: function changeJobStatus(data) {\n // switch 会自动更改 enable 的值\n var that = this;\n\n if (data.enable === false) {\n // 仅有,有特殊逻辑(关闭秒级任务),走单独接口\n that.axios.get(\"/job/disable?jobId=\" + data.id).then(function () {\n return that.listJobInfos();\n });\n } else {\n // 启用,则发起正常的保存操作\n this.modifiedJobForm = data;\n this.saveJob();\n }\n },\n // 新增任务,去除旧数据\n onClickNewJob: function onClickNewJob() {\n this.modifiedJobForm.id = undefined;\n this.modifiedJobForm.jobName = undefined;\n this.modifiedJobForm.jobDescription = undefined;\n this.modifiedJobForm.jobParams = undefined;\n this.modifiedJobForm.timeExpression = undefined;\n this.modifiedJobForm.timeExpressionType = undefined;\n this.modifiedJobForm.processorInfo = undefined;\n this.modifiedJobForm.processorType = undefined;\n this.modifiedJobForm.executeType = undefined;\n this.modifiedJobFormVisible = true;\n },\n // 点击 编辑按钮\n onClickModify: function onClickModify(data) {\n // 修复点击编辑后再点击新增 行数据被清空 的问题\n this.modifiedJobForm = JSON.parse(JSON.stringify(data));\n this.modifiedJobFormVisible = true;\n },\n // 点击 立即运行按钮\n onClickRun: function onClickRun(data) {\n var _this2 = this;\n\n var that = this;\n var url = \"/job/run?jobId=\" + data.id;\n this.axios.get(url).then(function () {\n return that.$message.success(_this2.$t('message.success'));\n });\n },\n // 点击 删除任务\n onClickDeleteJob: function onClickDeleteJob(data) {\n var _this3 = this;\n\n var that = this;\n var url = \"/job/delete?jobId=\" + data.id;\n this.axios.get(url).then(function () {\n that.$message.success(_this3.$t('message.success'));\n that.listJobInfos();\n });\n },\n // 点击 换页\n onClickChangePage: function onClickChangePage(index) {\n // 后端从0开始,前端从1开始\n this.jobQueryContent.index = index - 1;\n this.listJobInfos();\n },\n // 点击重置按钮\n onClickReset: function onClickReset() {\n this.jobQueryContent.keyword = undefined;\n this.jobQueryContent.jobId = undefined;\n this.listJobInfos();\n },\n verifyPlaceholder: function verifyPlaceholder(processorType) {\n var res;\n\n switch (processorType) {\n case \"EMBEDDED_JAVA\":\n res = this.$t('message.javaProcessorInfoPLH');\n break;\n\n case \"JAVA_CONTAINER\":\n res = this.$t('message.containerProcessorInfoPLH');\n break;\n\n case \"SHELL\":\n res = this.$t('message.shellProcessorInfoPLH');\n break;\n\n case \"PYTHON\":\n res = this.$t('message.pythonProcessorInfoPLH');\n }\n\n return res;\n },\n // 翻译执行类型\n translateExecuteType: function translateExecuteType(executeType) {\n switch (executeType) {\n case \"STANDALONE\":\n return this.$t('message.standalone');\n\n case \"BROADCAST\":\n return this.$t('message.broadcast');\n\n case \"MAP_REDUCE\":\n return this.$t('message.mapReduce');\n\n case \"MAP\":\n return this.$t('message.map');\n\n default:\n return \"UNKNOWN\";\n }\n },\n // 翻译处理器类型\n translateProcessorType: function translateProcessorType(processorType) {\n if (processorType === \"JAVA_CONTAINER\") {\n return this.$t('message.javaContainer');\n }\n\n return processorType;\n }\n },\n mounted: function mounted() {\n // 加载用户信息\n var that = this;\n that.axios.get(\"/user/list\").then(function (res) {\n return that.userList = res;\n }); // 加载任务信息\n\n this.listJobInfos();\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/JobManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); /***/ }), diff --git a/powerjob-server/src/main/resources/static/js/11.js b/powerjob-server/src/main/resources/static/js/11.js index d9848f12..1a5fa90e 100644 --- a/powerjob-server/src/main/resources/static/js/11.js +++ b/powerjob-server/src/main/resources/static/js/11.js @@ -8,7 +8,7 @@ /***/ (function(module, __webpack_exports__, __webpack_require__) { "use strict"; -eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"WorkflowManager\",\n data: function data() {\n return {\n // 查询条件\n workflowQueryContent: {\n appId: this.$store.state.appInfo.id,\n index: 0,\n pageSize: 10,\n workflowId: undefined,\n keyword: undefined\n },\n // 工作流查询结果\n workflowPageResult: {\n pageSize: 10,\n totalItems: 0,\n data: []\n },\n // 新建工作流对象\n workflowObj: {}\n };\n },\n methods: {\n // 查询工作流\n listWorkflow: function listWorkflow() {\n var that = this;\n this.axios.post(\"/workflow/list\", this.workflowQueryContent).then(function (res) {\n that.workflowPageResult = res;\n });\n },\n // 点击重置\n onClickReset: function onClickReset() {\n this.workflowQueryContent.workflowId = undefined;\n this.workflowQueryContent.keyword = undefined;\n },\n // 开关工作流\n switchWorkflow: function switchWorkflow(data) {\n var that = this;\n var path = data.enable ? \"enable\" : \"disable\";\n var url = \"/workflow/\" + path + \"?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url, function (res) {\n console.log(res);\n that.listWorkflow();\n });\n },\n // 编辑工作流\n onClickModifyWorkflow: function onClickModifyWorkflow(data) {\n this.$router.push({\n name: 'workflowEditor',\n params: {\n modify: true,\n workflowInfo: data\n }\n });\n },\n // 立即运行工作流\n onClickRunWorkflow: function onClickRunWorkflow(data) {\n var _this = this;\n\n var that = this;\n var url = \"/workflow/run?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url).then(function () {\n return that.$message.success(_this.$t('message.success'));\n });\n },\n // 删除工作流\n onClickDeleteWorkflow: function onClickDeleteWorkflow(data) {\n var _this2 = this;\n\n var that = this;\n var url = \"/workflow/delete?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url).then(function () {\n that.$message.success(_this2.$t('message.success'));\n that.listWorkflow();\n });\n },\n // 新建工作流\n onClickNewWorkflow: function onClickNewWorkflow() {\n this.$router.push({\n name: 'workflowEditor',\n params: {\n modify: false\n }\n });\n }\n },\n mounted: function mounted() {\n this.listWorkflow();\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/WorkflowManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); +eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n//\n/* harmony default export */ __webpack_exports__[\"default\"] = ({\n name: \"WorkflowManager\",\n data: function data() {\n return {\n // 查询条件\n workflowQueryContent: {\n appId: this.$store.state.appInfo.id,\n index: 0,\n pageSize: 10,\n workflowId: undefined,\n keyword: undefined\n },\n // 工作流查询结果\n workflowPageResult: {\n pageSize: 10,\n totalItems: 0,\n data: []\n },\n // 新建工作流对象\n workflowObj: {}\n };\n },\n methods: {\n // 查询工作流\n listWorkflow: function listWorkflow() {\n var that = this;\n this.axios.post(\"/workflow/list\", this.workflowQueryContent).then(function (res) {\n that.workflowPageResult = res;\n });\n },\n // 点击重置\n onClickReset: function onClickReset() {\n this.workflowQueryContent.workflowId = undefined;\n this.workflowQueryContent.keyword = undefined;\n },\n // 开关工作流\n switchWorkflow: function switchWorkflow(data) {\n var that = this;\n var path = data.enable ? \"enable\" : \"disable\";\n var url = \"/workflow/\" + path + \"?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url, function (res) {\n console.log(res);\n that.listWorkflow();\n });\n },\n // 编辑工作流\n onClickModifyWorkflow: function onClickModifyWorkflow(data) {\n this.$router.push({\n name: 'workflowEditor',\n params: {\n modify: true,\n workflowInfo: data\n }\n });\n },\n // 立即运行工作流\n onClickRunWorkflow: function onClickRunWorkflow(data) {\n var _this = this;\n\n var that = this;\n var url = \"/workflow/run?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url).then(function () {\n return that.$message.success(_this.$t('message.success'));\n });\n },\n // 删除工作流\n onClickDeleteWorkflow: function onClickDeleteWorkflow(data) {\n var _this2 = this;\n\n var that = this;\n var url = \"/workflow/delete?appId=\" + this.$store.state.appInfo.id + \"&workflowId=\" + data.id;\n this.axios.get(url).then(function () {\n that.$message.success(_this2.$t('message.success'));\n that.listWorkflow();\n });\n },\n // 新建工作流\n onClickNewWorkflow: function onClickNewWorkflow() {\n this.$router.push({\n name: 'workflowEditor',\n params: {\n modify: false\n }\n });\n },\n // 点击换页\n onClickChangePage: function onClickChangePage(index) {\n // 后端从0开始,前端从1开始\n this.workflowQueryContent.index = index - 1;\n this.listWorkflow();\n }\n },\n mounted: function mounted() {\n this.listWorkflow();\n }\n});\n\n//# sourceURL=webpack:///./src/components/views/WorkflowManager.vue?./node_modules/cache-loader/dist/cjs.js??ref--12-0!./node_modules/babel-loader/lib!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); /***/ }), @@ -20,7 +20,7 @@ eval("__webpack_require__.r(__webpack_exports__);\n//\n//\n//\n//\n//\n//\n//\n/ /***/ (function(module, __webpack_exports__, __webpack_require__) { "use strict"; -eval("__webpack_require__.r(__webpack_exports__);\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"render\", function() { return render; });\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"staticRenderFns\", function() { return staticRenderFns; });\nvar render = function() {\n var _vm = this\n var _h = _vm.$createElement\n var _c = _vm._self._c || _h\n return _c(\n \"div\",\n { attrs: { id: \"workflow_manager\" } },\n [\n _c(\n \"el-row\",\n { attrs: { gutter: 20 } },\n [\n _c(\n \"el-col\",\n { attrs: { span: 20 } },\n [\n _c(\n \"el-form\",\n {\n staticClass: \"el-form--inline\",\n attrs: { inline: true, model: _vm.workflowQueryContent }\n },\n [\n _c(\n \"el-form-item\",\n { attrs: { label: _vm.$t(\"message.wfId\") } },\n [\n _c(\"el-input\", {\n attrs: { placeholder: _vm.$t(\"message.wfId\") },\n model: {\n value: _vm.workflowQueryContent.workflowId,\n callback: function($$v) {\n _vm.$set(\n _vm.workflowQueryContent,\n \"workflowId\",\n $$v\n )\n },\n expression: \"workflowQueryContent.workflowId\"\n }\n })\n ],\n 1\n ),\n _c(\n \"el-form-item\",\n { attrs: { label: _vm.$t(\"message.keyword\") } },\n [\n _c(\"el-input\", {\n attrs: { placeholder: _vm.$t(\"message.keyword\") },\n model: {\n value: _vm.workflowQueryContent.keyword,\n callback: function($$v) {\n _vm.$set(_vm.workflowQueryContent, \"keyword\", $$v)\n },\n expression: \"workflowQueryContent.keyword\"\n }\n })\n ],\n 1\n ),\n _c(\n \"el-form-item\",\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.listWorkflow }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.query\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { type: \"cancel\" },\n on: { click: _vm.onClickReset }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.reset\")))]\n )\n ],\n 1\n )\n ],\n 1\n )\n ],\n 1\n ),\n _c(\"el-col\", { attrs: { span: 4 } }, [\n _c(\n \"div\",\n { staticStyle: { float: \"right\", \"padding-right\": \"10px\" } },\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.onClickNewWorkflow }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.newWorkflow\")))]\n )\n ],\n 1\n )\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\n \"el-table\",\n {\n staticStyle: { width: \"100%\" },\n attrs: { data: _vm.workflowPageResult.data }\n },\n [\n _c(\"el-table-column\", {\n attrs: {\n prop: \"id\",\n label: _vm.$t(\"message.wfId\"),\n width: \"120\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: { prop: \"wfName\", label: _vm.$t(\"message.wfName\") }\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.scheduleInfo\") },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _vm._v(\n \" \" +\n _vm._s(scope.row.timeExpressionType) +\n \" \" +\n _vm._s(scope.row.timeExpression) +\n \" \"\n )\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.status\"), width: \"80\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _c(\"el-switch\", {\n attrs: {\n \"active-color\": \"#13ce66\",\n \"inactive-color\": \"#ff4949\"\n },\n on: {\n change: function($event) {\n return _vm.switchWorkflow(scope.row)\n }\n },\n model: {\n value: scope.row.enable,\n callback: function($$v) {\n _vm.$set(scope.row, \"enable\", $$v)\n },\n expression: \"scope.row.enable\"\n }\n })\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.operation\"), width: \"300\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\" },\n on: {\n click: function($event) {\n return _vm.onClickModifyWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.edit\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\" },\n on: {\n click: function($event) {\n return _vm.onClickRunWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.run\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\", type: \"danger\" },\n on: {\n click: function($event) {\n return _vm.onClickDeleteWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.delete\")))]\n )\n ]\n }\n }\n ])\n })\n ],\n 1\n )\n ],\n 1\n )\n ],\n 1\n )\n}\nvar staticRenderFns = []\nrender._withStripped = true\n\n\n\n//# sourceURL=webpack:///./src/components/views/WorkflowManager.vue?./node_modules/cache-loader/dist/cjs.js?%7B%22cacheDirectory%22:%22node_modules/.cache/vue-loader%22,%22cacheIdentifier%22:%2241f1f4da-vue-loader-template%22%7D!./node_modules/vue-loader/lib/loaders/templateLoader.js??vue-loader-options!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); +eval("__webpack_require__.r(__webpack_exports__);\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"render\", function() { return render; });\n/* harmony export (binding) */ __webpack_require__.d(__webpack_exports__, \"staticRenderFns\", function() { return staticRenderFns; });\nvar render = function() {\n var _vm = this\n var _h = _vm.$createElement\n var _c = _vm._self._c || _h\n return _c(\n \"div\",\n { attrs: { id: \"workflow_manager\" } },\n [\n _c(\n \"el-row\",\n { attrs: { gutter: 20 } },\n [\n _c(\n \"el-col\",\n { attrs: { span: 20 } },\n [\n _c(\n \"el-form\",\n {\n staticClass: \"el-form--inline\",\n attrs: { inline: true, model: _vm.workflowQueryContent }\n },\n [\n _c(\n \"el-form-item\",\n { attrs: { label: _vm.$t(\"message.wfId\") } },\n [\n _c(\"el-input\", {\n attrs: { placeholder: _vm.$t(\"message.wfId\") },\n model: {\n value: _vm.workflowQueryContent.workflowId,\n callback: function($$v) {\n _vm.$set(\n _vm.workflowQueryContent,\n \"workflowId\",\n $$v\n )\n },\n expression: \"workflowQueryContent.workflowId\"\n }\n })\n ],\n 1\n ),\n _c(\n \"el-form-item\",\n { attrs: { label: _vm.$t(\"message.keyword\") } },\n [\n _c(\"el-input\", {\n attrs: { placeholder: _vm.$t(\"message.keyword\") },\n model: {\n value: _vm.workflowQueryContent.keyword,\n callback: function($$v) {\n _vm.$set(_vm.workflowQueryContent, \"keyword\", $$v)\n },\n expression: \"workflowQueryContent.keyword\"\n }\n })\n ],\n 1\n ),\n _c(\n \"el-form-item\",\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.listWorkflow }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.query\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { type: \"cancel\" },\n on: { click: _vm.onClickReset }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.reset\")))]\n )\n ],\n 1\n )\n ],\n 1\n )\n ],\n 1\n ),\n _c(\"el-col\", { attrs: { span: 4 } }, [\n _c(\n \"div\",\n { staticStyle: { float: \"right\", \"padding-right\": \"10px\" } },\n [\n _c(\n \"el-button\",\n {\n attrs: { type: \"primary\" },\n on: { click: _vm.onClickNewWorkflow }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.newWorkflow\")))]\n )\n ],\n 1\n )\n ])\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\n \"el-table\",\n {\n staticStyle: { width: \"100%\" },\n attrs: { data: _vm.workflowPageResult.data }\n },\n [\n _c(\"el-table-column\", {\n attrs: {\n prop: \"id\",\n label: _vm.$t(\"message.wfId\"),\n width: \"120\"\n }\n }),\n _c(\"el-table-column\", {\n attrs: { prop: \"wfName\", label: _vm.$t(\"message.wfName\") }\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.scheduleInfo\") },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _vm._v(\n \" \" +\n _vm._s(scope.row.timeExpressionType) +\n \" \" +\n _vm._s(scope.row.timeExpression) +\n \" \"\n )\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.status\"), width: \"80\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _c(\"el-switch\", {\n attrs: {\n \"active-color\": \"#13ce66\",\n \"inactive-color\": \"#ff4949\"\n },\n on: {\n change: function($event) {\n return _vm.switchWorkflow(scope.row)\n }\n },\n model: {\n value: scope.row.enable,\n callback: function($$v) {\n _vm.$set(scope.row, \"enable\", $$v)\n },\n expression: \"scope.row.enable\"\n }\n })\n ]\n }\n }\n ])\n }),\n _c(\"el-table-column\", {\n attrs: { label: _vm.$t(\"message.operation\"), width: \"300\" },\n scopedSlots: _vm._u([\n {\n key: \"default\",\n fn: function(scope) {\n return [\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\" },\n on: {\n click: function($event) {\n return _vm.onClickModifyWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.edit\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\" },\n on: {\n click: function($event) {\n return _vm.onClickRunWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.run\")))]\n ),\n _c(\n \"el-button\",\n {\n attrs: { size: \"medium\", type: \"danger\" },\n on: {\n click: function($event) {\n return _vm.onClickDeleteWorkflow(scope.row)\n }\n }\n },\n [_vm._v(_vm._s(_vm.$t(\"message.delete\")))]\n )\n ]\n }\n }\n ])\n })\n ],\n 1\n )\n ],\n 1\n ),\n _c(\n \"el-row\",\n [\n _c(\"el-pagination\", {\n attrs: {\n layout: \"prev, pager, next\",\n total: this.workflowPageResult.totalItems,\n \"page-size\": this.workflowPageResult.pageSize,\n \"hide-on-single-page\": true\n },\n on: { \"current-change\": _vm.onClickChangePage }\n })\n ],\n 1\n )\n ],\n 1\n )\n}\nvar staticRenderFns = []\nrender._withStripped = true\n\n\n\n//# sourceURL=webpack:///./src/components/views/WorkflowManager.vue?./node_modules/cache-loader/dist/cjs.js?%7B%22cacheDirectory%22:%22node_modules/.cache/vue-loader%22,%22cacheIdentifier%22:%2241f1f4da-vue-loader-template%22%7D!./node_modules/vue-loader/lib/loaders/templateLoader.js??vue-loader-options!./node_modules/cache-loader/dist/cjs.js??ref--0-0!./node_modules/vue-loader/lib??vue-loader-options"); /***/ }), diff --git a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java index d44986d8..51cd52b3 100644 --- a/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java +++ b/powerjob-server/src/test/java/com/github/kfcfans/powerjob/server/test/RepositoryTest.java @@ -9,11 +9,13 @@ import com.github.kfcfans.powerjob.server.persistence.core.model.OmsLockDO; import com.github.kfcfans.powerjob.server.persistence.core.repository.InstanceInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.JobInfoRepository; import com.github.kfcfans.powerjob.server.persistence.core.repository.OmsLockRepository; +import com.github.kfcfans.powerjob.server.persistence.core.repository.WorkflowInstanceInfoRepository; import org.assertj.core.util.Lists; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.Date; @@ -36,11 +38,14 @@ public class RepositoryTest { private OmsLockRepository omsLockRepository; @Resource private InstanceInfoRepository instanceInfoRepository; + @Resource + private WorkflowInstanceInfoRepository workflowInstanceInfoRepository; /** * 需要证明批量写入失败后会回滚 */ @Test + @Transactional public void testBatchLock() { List locks = Lists.newArrayList(); @@ -52,6 +57,14 @@ public class RepositoryTest { omsLockRepository.flush(); } + @Test + public void testDeleteLock() { + String lockName = "test-lock"; + OmsLockDO lockDO = new OmsLockDO(lockName, NetUtils.getLocalHost(), 10000L); + omsLockRepository.save(lockDO); + omsLockRepository.deleteByLockName(lockName); + } + @Test public void testSelectCronJobSQL() { List result = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(Lists.newArrayList(1L), SwitchableStatus.ENABLE.getV(), TimeExpressionType.CRON.getV(), System.currentTimeMillis()); @@ -59,6 +72,7 @@ public class RepositoryTest { } @Test + @Transactional public void testUpdate() { InstanceInfoDO updateEntity = new InstanceInfoDO(); updateEntity.setId(22L); @@ -68,6 +82,7 @@ public class RepositoryTest { } @Test + @Transactional public void testExecuteLogUpdate() { instanceInfoRepository.update4TriggerFailed(1586310414570L, 2, 100, System.currentTimeMillis(), System.currentTimeMillis(), "192.168.1.1", "NULL", "", new Date()); instanceInfoRepository.update4FrequentJob(1586310419650L, 2, 200, new Date()); @@ -81,4 +96,20 @@ public class RepositoryTest { System.out.println(res); } + @Test + public void testFindByJobIdInAndStatusIn() { + List res = instanceInfoRepository.findByJobIdInAndStatusIn(Lists.newArrayList(1L, 2L, 3L, 4L), Lists.newArrayList(1, 2, 3, 4, 5)); + System.out.println(res); + } + + @Test + public void testDeleteInstanceInfo() { + instanceInfoRepository.deleteAllByGmtModifiedBefore(new Date()); + } + + @Test + public void testDeleteWorkflowInstanceInfo() { + workflowInstanceInfoRepository.deleteAllByGmtModifiedBefore(new Date()); + } + } diff --git a/powerjob-server/src/test/resources/application.properties b/powerjob-server/src/test/resources/application.properties index 6047d1a7..4cbbeca3 100644 --- a/powerjob-server/src/test/resources/application.properties +++ b/powerjob-server/src/test/resources/application.properties @@ -33,4 +33,6 @@ oms.log.retention.local=0 oms.log.retention.remote=0 oms.container.retention.local=0 oms.container.retention.remote=0 -oms.instanceinfo.retention=0; \ No newline at end of file +oms.instanceinfo.retention=0; +# 表前缀 +#oms.table-prefix=pj_ \ No newline at end of file diff --git a/powerjob-worker-agent/pom.xml b/powerjob-worker-agent/pom.xml index 4efd4128..4b4c7db2 100644 --- a/powerjob-worker-agent/pom.xml +++ b/powerjob-worker-agent/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker-agent - 3.1.3 + 3.2.0 jar - 3.1.3 + 3.2.0 1.2.3 4.3.2 diff --git a/powerjob-worker-samples/pom.xml b/powerjob-worker-samples/pom.xml index b6dc739a..e11ca2cc 100644 --- a/powerjob-worker-samples/pom.xml +++ b/powerjob-worker-samples/pom.xml @@ -10,11 +10,11 @@ 4.0.0 powerjob-worker-samples - 3.1.3 + 3.2.0 2.2.6.RELEASE - 3.1.3 + 3.2.0 1.2.68 diff --git a/powerjob-worker/pom.xml b/powerjob-worker/pom.xml index 6e7e9236..e4b55ef1 100644 --- a/powerjob-worker/pom.xml +++ b/powerjob-worker/pom.xml @@ -10,12 +10,12 @@ 4.0.0 powerjob-worker - 3.1.3 + 3.2.0 jar 5.2.4.RELEASE - 3.1.3 + 3.2.0 1.4.200 3.4.2 5.6.1 diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java index 4175bfe7..90722fee 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/OhMyWorker.java @@ -1,7 +1,10 @@ package com.github.kfcfans.powerjob.worker; +import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.DeadLetter; import akka.actor.Props; +import akka.routing.RoundRobinPool; import com.github.kfcfans.powerjob.common.OmsException; import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.response.ResultDTO; @@ -9,6 +12,7 @@ import com.github.kfcfans.powerjob.common.utils.CommonUtils; import com.github.kfcfans.powerjob.common.utils.HttpUtils; import com.github.kfcfans.powerjob.common.utils.JsonUtils; import com.github.kfcfans.powerjob.common.utils.NetUtils; +import com.github.kfcfans.powerjob.worker.actors.TroubleshootingActor; import com.github.kfcfans.powerjob.worker.actors.ProcessorTrackerActor; import com.github.kfcfans.powerjob.worker.actors.TaskTrackerActor; import com.github.kfcfans.powerjob.worker.actors.WorkerActor; @@ -93,10 +97,21 @@ public class OhMyWorker implements ApplicationContextAware, InitializingBean, Di Config akkaBasicConfig = ConfigFactory.load(RemoteConstant.WORKER_AKKA_CONFIG_NAME); Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig); + int cores = Runtime.getRuntime().availableProcessors(); actorSystem = ActorSystem.create(RemoteConstant.WORKER_ACTOR_SYSTEM_NAME, akkaFinalConfig); - actorSystem.actorOf(Props.create(TaskTrackerActor.class), RemoteConstant.Task_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(ProcessorTrackerActor.class), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); - actorSystem.actorOf(Props.create(WorkerActor.class), RemoteConstant.WORKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(TaskTrackerActor.class) + .withDispatcher("akka.task-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores * 2)), RemoteConstant.Task_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(ProcessorTrackerActor.class) + .withDispatcher("akka.processor-tracker-dispatcher") + .withRouter(new RoundRobinPool(cores)), RemoteConstant.PROCESSOR_TRACKER_ACTOR_NAME); + actorSystem.actorOf(Props.create(WorkerActor.class) + .withDispatcher("akka.worker-common-dispatcher") + .withRouter(new RoundRobinPool(cores)), RemoteConstant.WORKER_ACTOR_NAME); + + // 处理系统中产生的异常情况 + ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(TroubleshootingActor.class), RemoteConstant.TROUBLESHOOTING_ACTOR_NAME); + actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class); log.info("[OhMyWorker] akka-remote listening address: {}", workerAddress); log.info("[OhMyWorker] akka ActorSystem({}) initialized successfully.", actorSystem); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java new file mode 100644 index 00000000..0c6ec1ed --- /dev/null +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/actors/TroubleshootingActor.java @@ -0,0 +1,25 @@ +package com.github.kfcfans.powerjob.worker.actors; + +import akka.actor.AbstractActor; +import akka.actor.DeadLetter; +import lombok.extern.slf4j.Slf4j; + +/** + * 处理系统异常的 Actor + * + * @author 朱八 + * @since 2020/7/16 + */ +@Slf4j +public class TroubleshootingActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(DeadLetter.class, this::onReceiveDeadLetter) + .build(); + } + + public void onReceiveDeadLetter(DeadLetter dl) { + log.warn("[TroubleshootingActor] receive DeadLetter: {}", dl); + } +} diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java index 74c84dbe..9caafca1 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/background/WorkerHealthReporter.java @@ -5,6 +5,7 @@ import com.github.kfcfans.powerjob.common.RemoteConstant; import com.github.kfcfans.powerjob.common.model.SystemMetrics; import com.github.kfcfans.powerjob.common.request.WorkerHeartbeat; import com.github.kfcfans.powerjob.worker.OhMyWorker; +import com.github.kfcfans.powerjob.worker.common.OmsWorkerVersion; import com.github.kfcfans.powerjob.worker.common.utils.AkkaUtils; import com.github.kfcfans.powerjob.worker.common.utils.SystemInfoUtils; import com.github.kfcfans.powerjob.worker.container.OmsContainerFactory; @@ -39,6 +40,7 @@ public class WorkerHealthReporter implements Runnable { heartbeat.setAppName(OhMyWorker.getConfig().getAppName()); heartbeat.setAppId(OhMyWorker.getAppId()); heartbeat.setHeartbeatTime(System.currentTimeMillis()); + heartbeat.setVersion(OmsWorkerVersion.getVersion()); // 获取当前加载的容器列表 heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos()); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java index ca92c950..2b890d90 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/ProcessorBeanFactory.java @@ -37,7 +37,7 @@ public class ProcessorBeanFactory { return (BasicProcessor) clz.getDeclaredConstructor().newInstance(); }catch (Exception e) { - log.error("[ProcessorBeanFactory] load local Processor(className = {}) failed.", className, e); + log.warn("[ProcessorBeanFactory] load local Processor(className = {}) failed, reason is {}", className, e.getMessage()); } return null; }); diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java index 317319c2..8973efad 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/PythonProcessor.java @@ -13,7 +13,7 @@ public class PythonProcessor extends ScriptProcessor { } @Override - protected String genScriptName(Long instanceId) { + protected String genScriptName() { return String.format("python_%d.py", instanceId); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java index 32e8120c..402b39ec 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ScriptProcessor.java @@ -4,6 +4,7 @@ import com.github.kfcfans.powerjob.worker.common.utils.OmsWorkerFileUtils; import com.github.kfcfans.powerjob.worker.core.processor.ProcessResult; import com.github.kfcfans.powerjob.worker.core.processor.TaskContext; import com.github.kfcfans.powerjob.worker.core.processor.sdk.BasicProcessor; +import com.github.kfcfans.powerjob.worker.log.OmsLogger; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -23,7 +24,7 @@ import java.util.concurrent.TimeUnit; @Slf4j public abstract class ScriptProcessor implements BasicProcessor { - private final Long instanceId; + protected final Long instanceId; // 脚本绝对路径 private final String scriptPath; private final long timeout; @@ -33,7 +34,7 @@ public abstract class ScriptProcessor implements BasicProcessor { public ScriptProcessor(Long instanceId, String processorInfo, long timeout) throws Exception { this.instanceId = instanceId; - this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(instanceId); + this.scriptPath = OmsWorkerFileUtils.getScriptDir() + genScriptName(); this.timeout = timeout; File script = new File(scriptPath); @@ -66,6 +67,10 @@ public abstract class ScriptProcessor implements BasicProcessor { @Override public ProcessResult process(TaskContext context) throws Exception { + OmsLogger omsLogger = context.getOmsLogger(); + + omsLogger.info("SYSTEM===> ScriptProcessor start to process"); + // 1. 授权 ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) @@ -80,40 +85,45 @@ public abstract class ScriptProcessor implements BasicProcessor { // 为了代码优雅而牺牲那么一点点点点点点点点性能 // 从外部传入线程池总感觉怪怪的...内部创建嘛又要考虑考虑资源释放问题,想来想去还是直接创建算了。 - new Thread(() -> copyStream(process.getInputStream(), inputSB)).start(); - new Thread(() -> copyStream(process.getErrorStream(), errorSB)).start(); + new Thread(() -> copyStream(process.getInputStream(), inputSB, omsLogger)).start(); + new Thread(() -> copyStream(process.getErrorStream(), errorSB, omsLogger)).start(); try { boolean s = process.waitFor(timeout, TimeUnit.MILLISECONDS); if (!s) { + omsLogger.info("SYSTEM===> process timeout"); return new ProcessResult(false, "TIMEOUT"); } String result = String.format("[INPUT]: %s;[ERROR]: %s", inputSB.toString(), errorSB.toString()); - log.debug("[ScriptProcessor] process result for instance(instanceId={}) is {}.", instanceId, result); + return new ProcessResult(true, result); }catch (InterruptedException ie) { + omsLogger.info("SYSTEM===> ScriptProcessor has been interrupted"); return new ProcessResult(false, "Interrupted"); } } - private void copyStream(InputStream is, StringBuilder sb) { + private void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { while ((line = br.readLine()) != null) { sb.append(line); + // 同步到在线日志 + omsLogger.info(line); } } catch (Exception e) { log.warn("[ScriptProcessor] copyStream failed.", e); + omsLogger.warn("[ScriptProcessor] copyStream failed.", e); + sb.append("Exception: ").append(e); } } /** * 生成脚本名称 - * @param instanceId 任务实例ID,作为文件名称(使用JobId会有更改不生效的问题) * @return 文件名称 */ - protected abstract String genScriptName(Long instanceId); + protected abstract String genScriptName(); /** * 获取运行命令(eg,shell返回 /bin/sh) diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java index 02478e0c..d712bd5c 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/processor/built/ShellProcessor.java @@ -17,7 +17,7 @@ public class ShellProcessor extends ScriptProcessor { } @Override - protected String genScriptName(Long instanceId) { + protected String genScriptName() { return String.format("shell_%d.sh", instanceId); } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java index 78b20322..01e1f9ac 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/CommonTaskTracker.java @@ -37,6 +37,10 @@ public class CommonTaskTracker extends TaskTracker { // 可以是除 ROOT_TASK_ID 的任何数字 private static final String LAST_TASK_ID = "1111"; + // 连续上报多次失败后放弃上报,视为结果不可达,TaskTracker down + private int reportFailedCnt = 0; + private static final int MAX_REPORT_FAILED_THRESHOLD = 5; + protected CommonTaskTracker(ServerScheduleJobReq req) { super(req); } @@ -232,6 +236,10 @@ public class CommonTaskTracker extends TaskTracker { // 服务器未接受上报,则等待下次重新上报 if (!serverAccepted) { + if (++reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) { + log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", instanceId, success, result); + destroy(); + } return; } diff --git a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java index ed72f3f9..3e484b84 100644 --- a/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java +++ b/powerjob-worker/src/main/java/com/github/kfcfans/powerjob/worker/core/tracker/task/FrequentTaskTracker.java @@ -17,7 +17,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.BeanUtils; import org.springframework.util.StringUtils; @@ -121,14 +120,6 @@ public class FrequentTaskTracker extends TaskTracker { subDetail.setStatus(status.getV()); subDetail.setSubInstanceId(subId); - // 设置时间 - subDetail.setStartTime(DateFormatUtils.format(subInstanceInfo.getStartTime(), OmsConstant.TIME_PATTERN)); - if (status == InstanceStatus.SUCCEED || status == InstanceStatus.FAILED) { - subDetail.setFinishedTime(DateFormatUtils.format(subInstanceInfo.getFinishedTime(), OmsConstant.TIME_PATTERN)); - }else { - subDetail.setFinishedTime("N/A"); - } - history.add(subDetail); }); diff --git a/powerjob-worker/src/main/resources/oms-worker.akka.conf b/powerjob-worker/src/main/resources/oms-worker.akka.conf index 94bf50af..03308428 100644 --- a/powerjob-worker/src/main/resources/oms-worker.akka.conf +++ b/powerjob-worker/src/main/resources/oms-worker.akka.conf @@ -20,4 +20,47 @@ akka { canonical.port = 25520 } } + + # dispatcher + task-tracker-dispatcher { + # Dispatcher is the name of the event-based dispatcher + type = Dispatcher + # What kind of ExecutionService to use + executor = "fork-join-executor" + # Configuration for the fork join pool + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 2 + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 4.0 + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + } + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 10 + } + + processor-tracker-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 64 + } + throughput = 10 + } + + worker-common-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-factor = 2.0 + parallelism-max = 8 + } + throughput = 10 + } } \ No newline at end of file